redis_queue/manager/
utils.rs

1use core::sync::atomic::{AtomicU32, Ordering};
2
3use crate::Queue;
4use crate::utils::queue_trim_consumed;
5use crate::manager::ConsumerKind;
6
7///Wrapper for `queue_trim_consumed` to perform consuming logic depending on `ConsumerKind` of `manager`
8///
9///This function is also guarded against performing multiple concurrent trims.
10///If task is ongoing, it will skip trim and exit task early.
11///Otherwise it starts trim with number of retries as specified via `retry_num`
12pub async fn trim_queue_task(queue: Queue, kind: ConsumerKind, retry_num: u32) {
13    static STATE: AtomicU32 = AtomicU32::new(0);
14
15    if let ConsumerKind::Extra = kind {
16        return;
17    }
18
19    let state = STATE.fetch_add(1, Ordering::SeqCst);
20    if state == 0 {
21        //Only call trim if no previous call exists.
22        //If trimming is ongoing there is no need to call again, wait for next opportunity
23        queue_trim_consumed(queue, retry_num).await
24    }
25    STATE.fetch_sub(1, Ordering::SeqCst);
26}