redis_queue/utils.rs
1//!Redis queue utilities
2
3use crate::queue::Queue;
4use crate::types::TrimMethod;
5
6use core::time;
7
///Base pause between retries; the retry loops multiply it by the current attempt number.
const SLEEP_INTERVAL: time::Duration = time::Duration::from_millis(1_000);
9
10#[tracing::instrument(skip(queue))]
11///Utility to trim redis stream of all messages that are consumed by everyone
12///
13///## Algorithm:
14///
15///- Enumerate all groups using the queue to find `last-delivered-id` which will show ID of last
16///messaged which was read from queue;
17///- Enumerate all pending messages within group to find lower consumed id of every consumer;
18///- If no message delivered yet, then do nothing
19///- Else If no pending message is present, then perform `XTRIM MINID min(last_delivered_ids) + 1`
20///resulting in all messages, including `min(last_delivered_ids)` deleted from queue
21///- Otherwise if least on pending message is present, then perform `XTRIM MINID min(lowest_ids)`
22///resulting in all messages except `min(lowest_ids)`
23///
24///## Re-try mechanism
25///
26///if `max_retry` is above 1, then, if redis temporary unavailable, task will sleep `1s * try_number` and try again.
27pub async fn queue_trim_consumed(queue: Queue, max_retry: u32) {
28 match queue.len().await {
29 //Nothing to do as there is no entries
30 Ok(0) => return,
31 Err(error) => {
32 tracing::warn!("redis len failed: {error}");
33 return;
34 }
35 Ok(_) => (),
36 }
37
38 let mut retries = 1;
39
40 let groups = loop {
41 match queue.groups_info().await {
42 Ok(groups) => break groups,
43 Err(error) => {
44 if retries <= max_retry {
45 if error.is_timeout() || error.is_connection_dropped() || error.is_connection_refusal() {
46 tracing::info!("redis temp. unavailable: {error}");
47 tokio::time::sleep(SLEEP_INTERVAL * retries).await;
48 retries += 1;
49 continue;
50 }
51 }
52 tracing::warn!("group info failed: {error}");
53 return;
54 }
55 }
56 };
57
58 if groups.is_empty() {
59 return;
60 }
61
62 let mut lowest_ids = Vec::with_capacity(groups.len());
63 let mut last_delivered_ids = Vec::with_capacity(groups.len());
64 for group in groups.iter() {
65 //If id is 0-0 it means there is yet to be a single message delivery
66 //as such we cannot even consider this ID for use, so skip it and do not try to fetch
67 //pending status
68 if group.last_delivered_id.is_nil() {
69 continue;
70 }
71
72 last_delivered_ids.push(group.last_delivered_id);
73
74 retries = 1;
75 let stats = loop {
76 match queue.pending_stats(&group.name).await {
77 Ok(stats) => break stats,
78 Err(error) => {
79 if retries <= max_retry {
80 if error.is_timeout() || error.is_connection_dropped() || error.is_connection_refusal() {
81 tracing::info!("redis temp. unavailable: {error}");
82 tokio::time::sleep(SLEEP_INTERVAL * retries).await;
83 retries += 1;
84 continue;
85 }
86 }
87 tracing::warn!("group pending stats failed: {error}");
88 return;
89 }
90 }
91 };
92 //if length is 0, there is no ID of interest, skip it
93 if stats.len != 0 {
94 lowest_ids.push(stats.lowest_id);
95 }
96 }
97
98 if last_delivered_ids.is_empty() {
99 //No message delivered yet, skip
100 return;
101 }
102
103 //We sort ids in order to get common ID between all groups that consumes messages from this
104 //stream, so that we only delete message only once all groups consumed message.
105 lowest_ids.sort_unstable();
106 last_delivered_ids.sort_unstable();
107
108 let trim = match lowest_ids.get(0) {
109 None => {
110 //If there are no pending message at all it means that user consumed all delivered messages
111 //hence trim queue of messages with id <= last_delivered_id
112 let last_delivered_id = last_delivered_ids[0];
113 TrimMethod::MinId(last_delivered_id.next())
114 }
115 Some(lowest_pending_id) => {
116 //if lowest_ids present, it means there are pending messages, so we need to limit deletion to
117 //only removing messages with id < lowest_pending_id
118 TrimMethod::MinId(*lowest_pending_id)
119 }
120 };
121
122 retries = 0;
123 loop {
124 match queue.trim(trim).await {
125 Ok(number) => {
126 tracing::info!("Removed {number} entries");
127 return;
128 }
129 Err(error) => {
130 if retries <= max_retry {
131 if error.is_timeout() || error.is_connection_dropped() || error.is_connection_refusal() {
132 tracing::info!("redis temp. unavailable: {error}");
133 tokio::time::sleep(SLEEP_INTERVAL * retries).await;
134 retries += 1;
135 continue;
136 }
137 }
138 tracing::warn!("stream trim failed: {error}");
139 return;
140 }
141 }
142 }
143}