pub async fn queue_trim_consumed(queue: Queue, max_retry: u32)

Utility to trim a Redis stream of all messages that have already been consumed by every consumer.

Algorithm:

  • Enumerate all groups using the queue to find each group's last-delivered-id, which is the ID of the last message that was read from the queue;
  • Enumerate all pending messages within each group to find the lowest pending ID of every consumer;
  • If no message has been delivered yet, do nothing;
  • Else, if no pending message is present, perform XTRIM MINID min(last_delivered_ids) + 1, which deletes all messages up to and including min(last_delivered_ids) from the queue;
  • Otherwise, if at least one pending message is present, perform XTRIM MINID min(lowest_ids), which deletes all messages older than min(lowest_ids) and keeps min(lowest_ids) itself.
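
The decision made in the last three steps can be sketched as a pure function over the IDs gathered in the first two. This is only an illustration, not the crate's implementation: `StreamId`, `next` and `trim_threshold` are hypothetical names, and the sketch assumes that "no message delivered yet" shows up either as an empty group list or as a last-delivered-id of 0-0.

```rust
/// Hypothetical stand-in for a Redis stream ID of the form `<ms>-<seq>`.
/// Field order matters: the derived `Ord` compares `ms` first, then `seq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct StreamId {
    ms: u64,
    seq: u64,
}

impl StreamId {
    /// Smallest ID strictly greater than `self` (the "+ 1" in the algorithm).
    fn next(self) -> StreamId {
        match self.seq.checked_add(1) {
            Some(seq) => StreamId { ms: self.ms, seq },
            // Sequence part overflowed: move on to the next millisecond.
            None => StreamId { ms: self.ms.saturating_add(1), seq: 0 },
        }
    }
}

/// Returns the threshold to pass to `XTRIM ... MINID`, or `None` when
/// nothing should be trimmed.
///
/// `last_delivered` holds one last-delivered-id per group;
/// `lowest_pending` holds the lowest pending ID of each consumer that has
/// pending messages.
fn trim_threshold(
    last_delivered: &[StreamId],
    lowest_pending: &[StreamId],
) -> Option<StreamId> {
    let zero = StreamId { ms: 0, seq: 0 };

    // Assumption: no groups, or a group still at 0-0, means that no message
    // has been delivered yet, so there is nothing to trim.
    let min_delivered = last_delivered.iter().copied().min()?;
    if min_delivered == zero {
        return None;
    }

    match lowest_pending.iter().copied().min() {
        // At least one pending message: keep it and everything after it.
        Some(min_pending) => Some(min_pending),
        // No pending messages: drop everything up to and including
        // min(last_delivered_ids).
        None => Some(min_delivered.next()),
    }
}
```

For example, with last-delivered IDs 5-2 and 7-0 and no pending messages, the sketch yields 5-3, so XTRIM MINID removes every entry up to and including 5-2. If a consumer still holds 4-1 as pending, the threshold becomes 4-1 and that entry is kept.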

Re-try mechanism

If max_retry is above 1 and Redis is temporarily unavailable, the task sleeps for 1s * try_number and then tries again.
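
A minimal sketch of this linear back-off follows. It is synchronous so that it stays self-contained, whereas the real function is async. `OpError`, `backoff_delay` and `with_retry` are hypothetical names, and the sketch assumes that max_retry counts the total number of attempts and that try_number starts at 1.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical error type: only `Temporary` failures are retried.
#[derive(Debug, PartialEq)]
enum OpError {
    Temporary,
    Fatal,
}

/// Delay before the next attempt: `base * try_number`.
/// With `base` set to one second this is the `1s * try_number` rule.
fn backoff_delay(base: Duration, try_number: u32) -> Duration {
    base * try_number
}

/// Runs `op` up to `max_retry` times (assumption: total attempts),
/// sleeping between attempts while the failure is temporary.
fn with_retry<T>(
    max_retry: u32,
    base: Duration,
    mut op: impl FnMut() -> Result<T, OpError>,
) -> Result<T, OpError> {
    let mut try_number = 1;
    loop {
        match op() {
            // Temporary failure with attempts left: wait, then retry.
            Err(OpError::Temporary) if try_number < max_retry => {
                sleep(backoff_delay(base, try_number));
                try_number += 1;
            }
            // Success, fatal error, or attempts exhausted: give up here.
            other => return other,
        }
    }
}
```

Under these assumptions, `with_retry(3, Duration::from_secs(1), op)` calls `op` at most three times and sleeps 1s after the first temporary failure and 2s after the second. With max_retry set to 1 the operation is attempted once and never retried.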