//!Redis queue utilities
use crate::queue::Queue;
use crate::types::TrimMethod;
use core::time;
///Base delay between retry attempts; it is multiplied by the retry counter before sleeping.
const SLEEP_INTERVAL: time::Duration = time::Duration::from_secs(1);
///Utility to trim redis stream of all messages that are consumed by everyone
///
///## Algorithm:
///
///- Enumerate all groups using the queue to find `last-delivered-id`, which is the ID of the last
///message that was read from the queue by the group;
///- Enumerate all pending messages within every group to find its lowest pending (delivered but
///not yet acknowledged) ID;
///- Groups whose `last-delivered-id` is nil (no delivery happened yet) are ignored;
///- If no message has been delivered to any group yet, then do nothing;
///- Otherwise perform `XTRIM MINID threshold` where `threshold` is the smallest value among
///`min(last_delivered_ids) + 1` and, if there is at least one pending message, `min(lowest_ids)`.
///This removes only messages that are older than everything any considered group may still need:
///a group without pending messages keeps everything after its `last-delivered-id`, while a group
///with pending messages keeps everything starting from its lowest pending ID.
///
///## Re-try mechanism
///
///Fetching group information, fetching pending statistics and trimming are each retried up to
///`max_retry` times when redis is temporarily unavailable (timeout, dropped connection or
///connection refusal). Before the `n`-th retry the task sleeps `1s * n`.
///`max_retry = 0` disables retries. Any other error, or running out of retries, is logged and
///aborts the trim. The initial length check is never retried.
#[tracing::instrument(skip(queue))]
pub async fn queue_trim_consumed(queue: Queue, max_retry: u32) {
    match queue.len().await {
        //Nothing to do as there are no entries
        Ok(0) => return,
        Err(error) => {
            tracing::warn!("redis len failed: {error}");
            return;
        }
        Ok(_) => (),
    }

    let mut retries = 1;
    let groups = loop {
        match queue.groups_info().await {
            Ok(groups) => break groups,
            Err(error) => {
                let is_temporary = error.is_timeout() || error.is_connection_dropped() || error.is_connection_refusal();
                if retries <= max_retry && is_temporary {
                    tracing::info!("redis temp. unavailable: {error}");
                    tokio::time::sleep(SLEEP_INTERVAL * retries).await;
                    retries += 1;
                    continue;
                }
                tracing::warn!("group info failed: {error}");
                return;
            }
        }
    };

    if groups.is_empty() {
        return;
    }

    let mut lowest_ids = Vec::with_capacity(groups.len());
    let mut last_delivered_ids = Vec::with_capacity(groups.len());
    for group in groups.iter() {
        //If id is 0-0 it means there is yet to be a single message delivery
        //as such we cannot even consider this ID for use, so skip it and do not try to fetch
        //pending status
        if group.last_delivered_id.is_nil() {
            continue;
        }
        last_delivered_ids.push(group.last_delivered_id);

        //Every group gets its own retry budget
        retries = 1;
        let stats = loop {
            match queue.pending_stats(&group.name).await {
                Ok(stats) => break stats,
                Err(error) => {
                    let is_temporary = error.is_timeout() || error.is_connection_dropped() || error.is_connection_refusal();
                    if retries <= max_retry && is_temporary {
                        tracing::info!("redis temp. unavailable: {error}");
                        tokio::time::sleep(SLEEP_INTERVAL * retries).await;
                        retries += 1;
                        continue;
                    }
                    tracing::warn!("group pending stats failed: {error}");
                    return;
                }
            }
        };

        //if length is 0, there is no ID of interest, skip it
        if stats.len != 0 {
            lowest_ids.push(stats.lowest_id);
        }
    }

    //A group without pending messages has consumed everything up to and including its
    //last_delivered_id, so the first ID it may still need is the one right after it.
    //Taking the minimum across groups guarantees no group loses undelivered messages.
    let delivered_bound = match last_delivered_ids.iter().copied().min() {
        Some(last_delivered_id) => last_delivered_id.next(),
        //No message delivered yet, skip
        None => return,
    };

    let min_id = match lowest_ids.iter().copied().min() {
        //No pending message at all: every delivered message is consumed, hence trim queue of
        //messages with id <= min(last_delivered_ids)
        None => delivered_bound,
        //There are pending messages, so deletion must stop before the lowest pending id.
        //It must also stop before `delivered_bound`, otherwise a group that has no pending
        //messages but is lagging behind would lose messages it has not been delivered yet.
        Some(lowest_pending_id) => core::cmp::min(lowest_pending_id, delivered_bound),
    };
    let trim = TrimMethod::MinId(min_id);

    //Start from 1 like the other retry loops so that the first retry waits `1s * 1`
    //and at most `max_retry` retries are made.
    retries = 1;
    loop {
        match queue.trim(trim).await {
            Ok(number) => {
                tracing::info!("Removed {number} entries");
                return;
            }
            Err(error) => {
                let is_temporary = error.is_timeout() || error.is_connection_dropped() || error.is_connection_refusal();
                if retries <= max_retry && is_temporary {
                    tracing::info!("redis temp. unavailable: {error}");
                    tokio::time::sleep(SLEEP_INTERVAL * retries).await;
                    retries += 1;
                    continue;
                }
                tracing::warn!("stream trim failed: {error}");
                return;
            }
        }
    }
}