redis_queue/
utils.rs

1//!Redis queue utilities
2
3use crate::queue::Queue;
4use crate::types::TrimMethod;
5
6use core::time;
7
8const SLEEP_INTERVAL: time::Duration = time::Duration::from_secs(1);
9
10#[tracing::instrument(skip(queue))]
11///Utility to trim redis stream of all messages that are consumed by everyone
12///
13///## Algorithm:
14///
15///- Enumerate all groups using the queue to find `last-delivered-id` which will show ID of last
16///messaged which was read from queue;
17///- Enumerate all pending messages within group to find lower consumed id of every consumer;
18///- If no message delivered yet, then do nothing
19///- Else If no pending message is present, then perform `XTRIM MINID min(last_delivered_ids) + 1`
20///resulting in all messages, including `min(last_delivered_ids)` deleted from queue
21///- Otherwise if least on pending message is present, then perform `XTRIM MINID min(lowest_ids)`
22///resulting in all messages except `min(lowest_ids)`
23///
24///## Re-try mechanism
25///
26///if `max_retry` is above 1, then, if redis temporary unavailable, task will sleep `1s * try_number` and try again.
27pub async fn queue_trim_consumed(queue: Queue, max_retry: u32) {
28    match queue.len().await {
29        //Nothing to do as there is no entries
30        Ok(0) => return,
31        Err(error) => {
32            tracing::warn!("redis len failed: {error}");
33            return;
34        }
35        Ok(_) => (),
36    }
37
38    let mut retries = 1;
39
40    let groups = loop {
41        match queue.groups_info().await {
42            Ok(groups) => break groups,
43            Err(error) => {
44                if retries <= max_retry {
45                    if error.is_timeout() || error.is_connection_dropped() || error.is_connection_refusal() {
46                        tracing::info!("redis temp. unavailable: {error}");
47                        tokio::time::sleep(SLEEP_INTERVAL * retries).await;
48                        retries += 1;
49                        continue;
50                    }
51                }
52                tracing::warn!("group info failed: {error}");
53                return;
54            }
55        }
56    };
57
58    if groups.is_empty() {
59        return;
60    }
61
62    let mut lowest_ids = Vec::with_capacity(groups.len());
63    let mut last_delivered_ids = Vec::with_capacity(groups.len());
64    for group in groups.iter() {
65        //If id is 0-0 it means there is yet to be a single message delivery
66        //as such we cannot even consider this ID for use, so skip it and do not try to fetch
67        //pending status
68        if group.last_delivered_id.is_nil() {
69            continue;
70        }
71
72        last_delivered_ids.push(group.last_delivered_id);
73
74        retries = 1;
75        let stats = loop {
76            match queue.pending_stats(&group.name).await {
77                Ok(stats) => break stats,
78                Err(error) => {
79                    if retries <= max_retry {
80                        if error.is_timeout() || error.is_connection_dropped() || error.is_connection_refusal() {
81                            tracing::info!("redis temp. unavailable: {error}");
82                            tokio::time::sleep(SLEEP_INTERVAL * retries).await;
83                            retries += 1;
84                            continue;
85                        }
86                    }
87                    tracing::warn!("group pending stats failed: {error}");
88                    return;
89                }
90            }
91        };
92        //if length is 0, there is no ID of interest, skip it
93        if stats.len != 0 {
94            lowest_ids.push(stats.lowest_id);
95        }
96    }
97
98    if last_delivered_ids.is_empty() {
99        //No message delivered yet, skip
100        return;
101    }
102
103    //We sort ids in order to get common ID between all groups that consumes messages from this
104    //stream, so that we only delete message only once all groups consumed message.
105    lowest_ids.sort_unstable();
106    last_delivered_ids.sort_unstable();
107
108    let trim = match lowest_ids.get(0) {
109        None => {
110            //If there are no pending message at all it means that user consumed all delivered messages
111            //hence trim queue of messages with id <= last_delivered_id
112            let last_delivered_id = last_delivered_ids[0];
113            TrimMethod::MinId(last_delivered_id.next())
114        }
115        Some(lowest_pending_id) => {
116            //if lowest_ids present, it means there are pending messages, so we need to limit deletion to
117            //only removing messages with id < lowest_pending_id
118            TrimMethod::MinId(*lowest_pending_id)
119        }
120    };
121
122    retries = 0;
123    loop {
124        match queue.trim(trim).await {
125            Ok(number) => {
126                tracing::info!("Removed {number} entries");
127                return;
128            }
129            Err(error) => {
130                if retries <= max_retry {
131                    if error.is_timeout() || error.is_connection_dropped() || error.is_connection_refusal() {
132                        tracing::info!("redis temp. unavailable: {error}");
133                        tokio::time::sleep(SLEEP_INTERVAL * retries).await;
134                        retries += 1;
135                        continue;
136                    }
137                }
138                tracing::warn!("stream trim failed: {error}");
139                return;
140            }
141        }
142    }
143}