redis_queue/manager/utils.rs
1use core::sync::atomic::{AtomicU32, Ordering};
2
3use crate::Queue;
4use crate::utils::queue_trim_consumed;
5use crate::manager::ConsumerKind;
6
7///Wrapper for `queue_trim_consumed` to perform consuming logic depending on `ConsumerKind` of `manager`
8///
9///This function is also guarded against performing multiple concurrent trims.
10///If task is ongoing, it will skip trim and exit task early.
11///Otherwise it starts trim with number of retries as specified via `retry_num`
12pub async fn trim_queue_task(queue: Queue, kind: ConsumerKind, retry_num: u32) {
13 static STATE: AtomicU32 = AtomicU32::new(0);
14
15 if let ConsumerKind::Extra = kind {
16 return;
17 }
18
19 let state = STATE.fetch_add(1, Ordering::SeqCst);
20 if state == 0 {
21 //Only call trim if no previous call exists.
22 //If trimming is ongoing there is no need to call again, wait for next opportunity
23 queue_trim_consumed(queue, retry_num).await
24 }
25 STATE.fetch_sub(1, Ordering::SeqCst);
26}