use crate::queue::Queue;
use crate::types::TrimMethod;
use core::time;
const SLEEP_INTERVAL: time::Duration = time::Duration::from_secs(1);
#[tracing::instrument(skip(queue))]
pub async fn queue_trim_consumed(queue: Queue, max_retry: u32) {
match queue.len().await {
Ok(0) => return,
Err(error) => {
tracing::warn!("redis len failed: {error}");
return;
}
Ok(_) => (),
}
let mut retries = 1;
let groups = loop {
match queue.groups_info().await {
Ok(groups) => break groups,
Err(error) => {
if retries <= max_retry {
if error.is_timeout() || error.is_connection_dropped() || error.is_connection_refusal() {
tracing::info!("redis temp. unavailable: {error}");
tokio::time::sleep(SLEEP_INTERVAL * retries).await;
retries += 1;
continue;
}
}
tracing::warn!("group info failed: {error}");
return;
}
}
};
if groups.is_empty() {
return;
}
let mut lowest_ids = Vec::with_capacity(groups.len());
let mut last_delivered_ids = Vec::with_capacity(groups.len());
for group in groups.iter() {
if group.last_delivered_id.is_nil() {
continue;
}
last_delivered_ids.push(group.last_delivered_id);
retries = 1;
let stats = loop {
match queue.pending_stats(&group.name).await {
Ok(stats) => break stats,
Err(error) => {
if retries <= max_retry {
if error.is_timeout() || error.is_connection_dropped() || error.is_connection_refusal() {
tracing::info!("redis temp. unavailable: {error}");
tokio::time::sleep(SLEEP_INTERVAL * retries).await;
retries += 1;
continue;
}
}
tracing::warn!("group pending stats failed: {error}");
return;
}
}
};
if stats.len != 0 {
lowest_ids.push(stats.lowest_id);
}
}
if last_delivered_ids.is_empty() {
return;
}
lowest_ids.sort_unstable();
last_delivered_ids.sort_unstable();
let trim = match lowest_ids.get(0) {
None => {
let last_delivered_id = last_delivered_ids[0];
TrimMethod::MinId(last_delivered_id.next())
}
Some(lowest_pending_id) => {
TrimMethod::MinId(*lowest_pending_id)
}
};
retries = 0;
loop {
match queue.trim(trim).await {
Ok(number) => {
tracing::info!("Removed {number} entries");
return;
}
Err(error) => {
if retries <= max_retry {
if error.is_timeout() || error.is_connection_dropped() || error.is_connection_refusal() {
tracing::info!("redis temp. unavailable: {error}");
tokio::time::sleep(SLEEP_INTERVAL * retries).await;
retries += 1;
continue;
}
}
tracing::warn!("stream trim failed: {error}");
return;
}
}
}
}