Function redis_queue::utils::queue_trim_consumed
source · pub async fn queue_trim_consumed(queue: Queue, max_retry: u32)
Expand description
Utility to trim redis stream of all messages that are consumed by everyone
Algorithm:
- Enumerate all groups using the queue to find
last-delivered-id
which will show ID of last messaged which was read from queue; - Enumerate all pending messages within group to find lower consumed id of every consumer;
- If no message delivered yet, then do nothing
- Else If no pending message is present, then perform
XTRIM MINID min(last_delivered_ids) + 1
resulting in all messages, includingmin(last_delivered_ids)
deleted from queue - Otherwise if least on pending message is present, then perform
XTRIM MINID min(lowest_ids)
resulting in all messages exceptmin(lowest_ids)
Re-try mechanism
if max_retry
is above 1, then, if redis temporary unavailable, task will sleep 1s * try_number
and try again.