use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::backends::redis::client::{RedisClient, RedisConnection};
use crate::error::Result;
use super::client::acquire_conn_with_retry;
use super::constants::AUTOCLAIM_COUNT;
/// Builds the consumer name the sidecar task claims entries under for `group`.
///
/// The result is the fixed prefix `shove-reaper-` followed by the group name,
/// so the same group always maps to the same consumer.
fn reaper_consumer_name(group: &str) -> String {
    ["shove-reaper-", group].concat()
}
/// Spawns the background reaper task for `group` over `streams`.
///
/// Once per `interval` the task sweeps every stream and re-delivers entries
/// that have been pending for at least `min_idle_ms` milliseconds. This
/// variant never trims the streams. The task stops once `shutdown` is
/// cancelled.
#[doc(hidden)]
pub fn spawn_reaper(
    client: RedisClient,
    streams: Vec<String>,
    group: String,
    interval: Duration,
    min_idle_ms: u64,
    shutdown: CancellationToken,
) -> JoinHandle<()> {
    // The reaper always reclaims idle entries and leaves trimming disabled.
    let reclaim_after = Some(min_idle_ms);
    let trim = false;
    spawn_sidecar(
        client,
        streams,
        group,
        interval,
        reclaim_after,
        trim,
        shutdown,
    )
}
/// Spawns the background maintenance task for `group` over `streams`.
///
/// Once per `interval` the task sweeps every stream. When `min_idle_ms` is
/// `Some`, entries pending for at least that many milliseconds are
/// re-delivered first; when it is `None` that step is skipped. Trimming of
/// acknowledged entries is always enabled for this variant. The task stops
/// once `shutdown` is cancelled.
#[doc(hidden)]
pub fn spawn_maintenance(
    client: RedisClient,
    streams: Vec<String>,
    group: String,
    interval: Duration,
    min_idle_ms: Option<u64>,
    shutdown: CancellationToken,
) -> JoinHandle<()> {
    // Maintenance always trims; reclaiming is optional and caller-controlled.
    let trim = true;
    spawn_sidecar(
        client,
        streams,
        group,
        interval,
        min_idle_ms,
        trim,
        shutdown,
    )
}
#[allow(clippy::too_many_arguments)]
fn spawn_sidecar(
client: RedisClient,
streams: Vec<String>,
group: String,
interval: Duration,
min_idle_ms: Option<u64>,
trim: bool,
shutdown: CancellationToken,
) -> JoinHandle<()> {
let reaper = reaper_consumer_name(&group);
tokio::spawn(async move {
let mut conn = match acquire_conn_with_retry(&client, &shutdown, "reaper").await {
Some(c) => c,
None => return,
};
loop {
let mut needs_reconnect = false;
for stream in &streams {
if shutdown.is_cancelled() {
return;
}
let result = match min_idle_ms {
Some(idle) => {
autoclaim_all(&mut conn, stream, &group, &reaper, idle, &shutdown).await
}
None => Ok(()),
};
let result = match result {
Ok(()) if trim => trim_acked(&mut conn, stream).await,
other => other,
};
if let Err(e) = result {
tracing::warn!(
stream,
error = %e,
"reaper: sweep failed, reconnecting",
);
needs_reconnect = true;
break;
}
}
if needs_reconnect {
match acquire_conn_with_retry(&client, &shutdown, "reaper").await {
Some(c) => conn = c,
None => return,
}
}
tokio::select! {
_ = shutdown.cancelled() => return,
_ = tokio::time::sleep(interval) => {}
}
}
})
}
/// Re-delivers every entry of `stream` that has been pending in `group` for at
/// least `min_idle_ms` milliseconds.
///
/// The pending entries are paged through with `XAUTOCLAIM`, starting at cursor
/// `0-0` and requesting `AUTOCLAIM_COUNT` entries per call on behalf of
/// `consumer`. Each claimed entry is re-published with `XADD <stream> *`, and
/// only when that succeeds is the original entry `XACK`ed. Paging stops when
/// the returned cursor is `0-0` or empty.
///
/// `shutdown` is checked before every page; once it is cancelled the function
/// returns `Ok(())` without finishing the sweep.
///
/// # Errors
///
/// Returns `ShoveError::Connection` when an `XAUTOCLAIM` call fails. Failures
/// of the per-entry `XADD` / `XACK` calls are logged and do not abort the
/// sweep.
async fn autoclaim_all(
    conn: &mut RedisConnection,
    stream: &str,
    group: &str,
    consumer: &str,
    min_idle_ms: u64,
    shutdown: &CancellationToken,
) -> Result<()> {
    use crate::error::ShoveError;
    use redis::streams::StreamAutoClaimReply;

    let mut cursor = "0-0".to_owned();
    loop {
        if shutdown.is_cancelled() {
            return Ok(());
        }

        let reply: StreamAutoClaimReply = conn
            .query(
                redis::cmd("XAUTOCLAIM")
                    .arg(stream)
                    .arg(group)
                    .arg(consumer)
                    .arg(min_idle_ms)
                    .arg(&cursor)
                    .arg("COUNT")
                    .arg(AUTOCLAIM_COUNT),
            )
            .await
            .map_err(|e| ShoveError::Connection(format!("XAUTOCLAIM failed: {e}")))?;

        for entry in &reply.claimed {
            // Rebuild the entry as a fresh XADD carrying the same field/value pairs.
            let mut xadd_cmd = redis::cmd("XADD");
            xadd_cmd.arg(stream).arg("*");
            for (field, value) in &entry.map {
                match value {
                    redis::Value::BulkString(bytes) => {
                        xadd_cmd.arg(field.as_str()).arg(bytes.as_slice());
                    }
                    redis::Value::SimpleString(s) => {
                        xadd_cmd.arg(field.as_str()).arg(s.as_str());
                    }
                    _ => {
                        // The field cannot be copied into the new entry. Make
                        // the loss visible instead of dropping it silently,
                        // since the original is acknowledged further down.
                        tracing::warn!(
                            stream,
                            entry_id = %entry.id,
                            field = %field,
                            "reaper: dropping field with non-string value from redelivered entry",
                        );
                    }
                }
            }

            if let Err(e) = conn.query::<redis::Value>(&mut xadd_cmd).await {
                tracing::warn!(
                    stream,
                    entry_id = %entry.id,
                    error = %e,
                    "reaper: XADD redeliver failed — entry stays in PEL for next cycle",
                );
                continue;
            }

            // Only acknowledge the original once its copy is safely in the stream.
            if let Err(e) = conn
                .query::<i64>(redis::cmd("XACK").arg(stream).arg(group).arg(&entry.id))
                .await
            {
                tracing::warn!(
                    stream,
                    entry_id = %entry.id,
                    error = %e,
                    "reaper: XACK failed after redeliver — entry stays in PEL",
                );
            }
        }

        if reply.next_stream_id == "0-0" || reply.next_stream_id.is_empty() {
            break;
        }
        cursor = reply.next_stream_id;
    }
    Ok(())
}
/// Trims `stream` up to the lowest checkpoint shared by all of its consumer
/// groups.
///
/// For every group reported by `XINFO GROUPS` a checkpoint ID is chosen: the
/// `start_id` of the `XPENDING` summary when the group reports pending
/// entries, otherwise the group's `last_delivered_id`. The smallest checkpoint
/// across all groups is then passed to `XTRIM <stream> MINID`.
///
/// Nothing is trimmed (and `Ok(())` is returned) when the stream has no
/// groups, when any group's checkpoint is `0-0`, when a checkpoint cannot be
/// parsed, or when `XPENDING` answers with an unexpected reply shape.
///
/// # Errors
///
/// Propagates failures of the `XINFO`, `XPENDING` and `XTRIM` queries.
async fn trim_acked(conn: &mut RedisConnection, stream: &str) -> Result<()> {
    use redis::streams::{StreamInfoGroupsReply, StreamPendingReply};

    let info = conn
        .query::<StreamInfoGroupsReply>(redis::cmd("XINFO").arg("GROUPS").arg(stream))
        .await?;

    // Lowest checkpoint seen so far: the parsed form is used for ordering, the
    // raw form is what gets sent to XTRIM.
    let mut lowest: Option<((u64, u64), String)> = None;

    for g in info.groups {
        let checkpoint = if g.pending > 0 {
            let summary = conn
                .query::<StreamPendingReply>(redis::cmd("XPENDING").arg(stream).arg(&g.name))
                .await?;
            match summary {
                StreamPendingReply::Data(data) => data.start_id,
                StreamPendingReply::Empty => g.last_delivered_id,
                other => {
                    tracing::warn!(
                        stream,
                        group = %g.name,
                        reply = ?other,
                        "trim skipped: unexpected XPENDING reply shape — stream will not shrink this sweep"
                    );
                    return Ok(());
                }
            }
        } else {
            g.last_delivered_id
        };

        // A `0-0` checkpoint aborts the whole trim for this sweep.
        if checkpoint == "0-0" {
            return Ok(());
        }

        let Some(parsed) = super::stream_id::parse(&checkpoint) else {
            tracing::warn!(
                stream,
                group = %g.name,
                checkpoint,
                "trim skipped: unparseable group checkpoint — stream will not shrink this sweep"
            );
            return Ok(());
        };

        let is_new_minimum = lowest
            .as_ref()
            .is_none_or(|(current, _)| parsed < *current);
        if is_new_minimum {
            lowest = Some((parsed, checkpoint));
        }
    }

    // `None` here means the stream has no consumer groups at all.
    let Some((_, min_id)) = lowest else {
        return Ok(());
    };

    let trimmed = conn
        .query::<i64>(redis::cmd("XTRIM").arg(stream).arg("MINID").arg(&min_id))
        .await?;
    if trimmed > 0 {
        tracing::debug!(
            stream,
            trimmed,
            threshold = min_id,
            "reaper: trimmed acked entries"
        );
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The consumer name is the fixed prefix plus the group name, and repeated
    /// calls for the same group agree.
    #[test]
    fn reaper_consumer_name_is_stable_per_group() {
        let cases = [
            ("orders", "shove-reaper-orders"),
            ("shove", "shove-reaper-shove"),
        ];
        for (group, expected) in cases {
            assert_eq!(reaper_consumer_name(group), expected);
        }

        let first = reaper_consumer_name("g");
        let second = reaper_consumer_name("g");
        assert_eq!(first, second);
    }
}