use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::backends::redis::client::{RedisClient, RedisConnection};
use crate::error::Result;
use crate::retry::Backoff;
use super::constants::AUTOCLAIM_COUNT;
fn reaper_consumer_name(group: &str) -> String {
format!("shove-reaper-{group}")
}
#[doc(hidden)]
pub fn spawn_reaper(
client: RedisClient,
streams: Vec<String>,
group: String,
interval: Duration,
min_idle_ms: u64,
shutdown: CancellationToken,
) -> JoinHandle<()> {
let reaper = reaper_consumer_name(&group);
tokio::spawn(async move {
let mut conn = match acquire_conn_with_retry(&client, &shutdown).await {
Some(c) => c,
None => return,
};
loop {
tokio::select! {
_ = shutdown.cancelled() => return,
_ = tokio::time::sleep(interval) => {}
}
let mut needs_reconnect = false;
for stream in &streams {
if shutdown.is_cancelled() {
return;
}
if let Err(e) =
autoclaim_all(&mut conn, stream, &group, &reaper, min_idle_ms, &shutdown).await
{
tracing::warn!(
stream,
error = %e,
"reaper: XAUTOCLAIM failed, reconnecting",
);
needs_reconnect = true;
break;
}
}
if needs_reconnect {
match acquire_conn_with_retry(&client, &shutdown).await {
Some(c) => conn = c,
None => return,
}
}
}
})
}
async fn autoclaim_all(
conn: &mut RedisConnection,
stream: &str,
group: &str,
consumer: &str,
min_idle_ms: u64,
shutdown: &CancellationToken,
) -> Result<()> {
use crate::error::ShoveError;
use redis::streams::StreamAutoClaimReply;
let mut cursor = "0-0".to_owned();
loop {
if shutdown.is_cancelled() {
return Ok(());
}
let reply: StreamAutoClaimReply = conn
.query(
redis::cmd("XAUTOCLAIM")
.arg(stream)
.arg(group)
.arg(consumer)
.arg(min_idle_ms)
.arg(&cursor)
.arg("COUNT")
.arg(AUTOCLAIM_COUNT),
)
.await
.map_err(|e| ShoveError::Connection(format!("XAUTOCLAIM failed: {e}")))?;
if reply.next_stream_id == "0-0" || reply.next_stream_id.is_empty() {
break;
}
cursor = reply.next_stream_id;
}
Ok(())
}
async fn acquire_conn_with_retry(
client: &RedisClient,
shutdown: &CancellationToken,
) -> Option<RedisConnection> {
let mut backoff = Backoff::default();
loop {
match client.multiplexed_conn().await {
Ok(c) => return Some(c),
Err(e) => {
if shutdown.is_cancelled() {
return None;
}
let delay = backoff.next().expect("backoff is infinite");
tracing::warn!(
"reaper: connection failed ({}), retrying in {:.1}s",
e,
delay.as_secs_f64()
);
tokio::select! {
_ = tokio::time::sleep(delay) => {}
_ = shutdown.cancelled() => return None,
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn reaper_consumer_name_is_stable_per_group() {
assert_eq!(reaper_consumer_name("orders"), "shove-reaper-orders");
assert_eq!(reaper_consumer_name("shove"), "shove-reaper-shove");
assert_eq!(reaper_consumer_name("g"), reaper_consumer_name("g"));
}
}