use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use slatedb::{Db, IsolationLevel};
use tokio::sync::{Notify, watch};
use tracing::{debug, warn};
use crate::clock::Clock;
use crate::error::Result;
use crate::job::{JobRecord, JobStatus};
use crate::queue::{QueueConfig, dead_key, job_index_key, pending_key};
use crate::stats::update_stats;
/// Background worker state for lease reaping and retention sweeps.
///
/// Consumed by [`Reaper::run`], which periodically re-queues or dead-letters
/// jobs whose lease has expired and deletes finished/dead jobs that have
/// outlived their configured retention.
pub(crate) struct Reaper {
/// Database handle passed to the reap and sweep functions.
pub(crate) db: Arc<Db>,
/// How long `run` sleeps between passes.
pub(crate) interval: Duration,
/// Configuration used for any queue that has no entry in `queue_configs`.
pub(crate) default_queue_config: QueueConfig,
/// Per-queue configuration overrides, looked up by queue name.
pub(crate) queue_configs: HashMap<String, QueueConfig>,
/// Time source; `now_ms()` is what leases and retention cutoffs are compared against.
pub(crate) clock: Arc<dyn Clock>,
/// Waiters are woken after a pass in which `reap_expired` reported a non-zero count.
pub(crate) job_available: Arc<Notify>,
/// Waiters are woken when an expired job is moved to the dead state.
pub(crate) completion_notify: Arc<Notify>,
}
impl Reaper {
pub(crate) async fn run(self, mut shutdown: watch::Receiver<bool>) {
let Reaper {
db,
interval,
default_queue_config,
queue_configs,
clock,
job_available,
completion_notify,
} = self;
let any_keep_done = default_queue_config.keep_done_jobs.is_some()
|| queue_configs.values().any(|c| c.keep_done_jobs.is_some());
let any_dead_retention = default_queue_config.dead_retention.is_some()
|| queue_configs.values().any(|c| c.dead_retention.is_some());
let keep_done_for = |queue: &str| -> Option<Duration> {
queue_configs
.get(queue)
.unwrap_or(&default_queue_config)
.keep_done_jobs
};
let dead_retention_for = |queue: &str| -> Option<Duration> {
queue_configs
.get(queue)
.unwrap_or(&default_queue_config)
.dead_retention
};
loop {
tokio::select! {
_ = tokio::time::sleep(interval) => {
match reap_expired(&db, clock.as_ref(), &completion_notify).await {
Ok(0) => {}
Ok(_) => job_available.notify_waiters(),
Err(e) => warn!("lease reaper error: {e}"),
}
if any_keep_done {
if let Err(e) = sweep_done(&db, clock.as_ref(), &keep_done_for).await {
warn!("done retention sweep error: {e}");
}
}
if any_dead_retention {
if let Err(e) = sweep_dead(&db, clock.as_ref(), &dead_retention_for).await {
warn!("dead retention sweep error: {e}");
}
}
}
_ = shutdown.changed() => break,
}
}
debug!("lease reaper stopped");
}
}
/// Collects every claim whose lease has expired and hands each to `reap_job`.
///
/// Keys under the `claimed:` prefix are read as `claimed:<lease_expiry>:...`,
/// with the expiry parsed as a `u64` and compared against `clock.now_ms()`.
/// Keys that are not UTF-8 or whose expiry does not parse are skipped. The
/// scan stops at the first key whose expiry is still in the future, so it
/// depends on the scan yielding keys in expiry order.
/// NOTE(review): presumably the timestamp is zero-padded in the key so that
/// byte order matches numeric order — confirm against the key builder.
///
/// Returns the number of expired keys collected. An error from the scan or
/// from any `reap_job` call is returned immediately; keys after the failing
/// one are not processed by this call.
pub(crate) async fn reap_expired(
    db: &Db,
    clock: &dyn Clock,
    completion_notify: &Notify,
) -> Result<usize> {
    let now = clock.now_ms();

    // Gather the keys first; the block scope releases the scan iterator
    // before any transaction is opened.
    let expired = {
        let mut scan = db.scan_prefix(b"claimed:").await?;
        let mut keys = Vec::new();
        while let Some(kv) = scan.next().await? {
            let lease_expiry = std::str::from_utf8(&kv.key)
                .ok()
                .and_then(|key| key.strip_prefix("claimed:"))
                .and_then(|rest| rest.split(':').next())
                .and_then(|ts| ts.parse::<u64>().ok());
            match lease_expiry {
                None => continue,
                Some(expiry) if expiry > now => break,
                Some(_) => keys.push(kv.key.clone()),
            }
        }
        keys
    };

    let total = expired.len();
    for claimed_key in expired {
        reap_job(db, clock, &claimed_key, completion_notify).await?;
    }
    Ok(total)
}
/// Resolves one expired claim inside a snapshot-isolation transaction.
///
/// If the claim key no longer exists the transaction is rolled back and the
/// call succeeds without doing anything. Otherwise the claim is deleted and
/// the job is either:
///
/// * dead-lettered, when `attempts >= max_attempts`: status becomes `Dead`,
///   `last_error` is set to `"lease expired"`, `failed_at` to the current
///   clock time, and the record is written under its dead key; or
/// * re-queued: status becomes `Pending`, the claim/lease timestamps are
///   cleared, and the record is written under its pending key.
///
/// In both cases the job index entry is pointed at the new key and the queue
/// stats are adjusted in the same transaction. A commit that fails with
/// `ErrorKind::Transaction` restarts the whole attempt from a fresh
/// transaction; any other error is returned.
///
/// The outcome is logged, and `completion_notify` waiters are woken for a
/// dead-lettered job, only after the commit has succeeded, so a conflicting
/// attempt that is retried or abandoned never reports a transition that did
/// not happen.
async fn reap_job(
    db: &Db,
    clock: &dyn Clock,
    claimed_key_bytes: &[u8],
    completion_notify: &Notify,
) -> Result<()> {
    loop {
        let txn = db.begin(IsolationLevel::Snapshot).await?;
        let Some(raw) = txn.get(claimed_key_bytes).await? else {
            // The claim is already gone; nothing to reap.
            txn.rollback();
            return Ok(());
        };
        let mut job: JobRecord = rmp_serde::from_slice(&raw)?;
        txn.delete(claimed_key_bytes)?;

        let exhausted = job.attempts >= job.max_attempts;
        if exhausted {
            job.status = JobStatus::Dead;
            job.last_error = Some("lease expired".to_string());
            job.failed_at = Some(clock.now_ms());
            let dead = dead_key(&job.queue, &job.id);
            let value = rmp_serde::to_vec_named(&job)?;
            txn.put(dead.as_bytes(), &value)?;
            txn.put(job_index_key(&job.id).as_bytes(), dead.as_bytes())?;
            update_stats(
                &txn,
                &job.queue,
                &[(JobStatus::Claimed, -1), (JobStatus::Dead, 1)],
            )?;
        } else {
            job.status = JobStatus::Pending;
            job.claimed_at = None;
            job.lease_expires_at = None;
            let pending = pending_key(&job.queue, job.priority, &job.id);
            let value = rmp_serde::to_vec_named(&job)?;
            txn.put(pending.as_bytes(), &value)?;
            txn.put(job_index_key(&job.id).as_bytes(), pending.as_bytes())?;
            update_stats(
                &txn,
                &job.queue,
                &[(JobStatus::Pending, 1), (JobStatus::Claimed, -1)],
            )?;
        }

        match txn.commit().await {
            Ok(_) => {
                // Report only what was actually committed.
                if exhausted {
                    warn!(
                        queue = %job.queue,
                        job_id = %job.id,
                        attempts = job.attempts,
                        "lease expired: job dead-lettered"
                    );
                    completion_notify.notify_waiters();
                } else {
                    debug!(
                        queue = %job.queue,
                        job_id = %job.id,
                        attempts = job.attempts,
                        "lease expired: job re-queued"
                    );
                }
                return Ok(());
            }
            // Conflict with a concurrent writer: retry from a fresh snapshot.
            Err(e) if e.kind() == slatedb::ErrorKind::Transaction => continue,
            Err(e) => return Err(e.into()),
        }
    }
}
/// Deletes completed jobs that have outlived their queue's retention.
///
/// Scans every record under the `done:` prefix. A record is selected when it
/// decodes as a `JobRecord`, has a `completed_at` timestamp, its queue has a
/// retention according to `keep_done_for`, and `completed_at` is older than
/// `now - retention`. Records that fail to decode, have no `completed_at`,
/// or belong to a queue without a retention are left untouched.
///
/// Each selected record is removed in its own snapshot transaction together
/// with its job index entry, after re-checking that the key still exists. A
/// transaction conflict skips that record; any other error aborts the sweep
/// and is returned.
async fn sweep_done(
    db: &Db,
    clock: &dyn Clock,
    keep_done_for: &(dyn Fn(&str) -> Option<Duration> + Sync),
) -> Result<()> {
    let now = clock.now_ms();

    // Collect first so the scan iterator is released before transactions start.
    let mut victims: Vec<(Vec<u8>, String)> = Vec::new();
    let mut iter = db.scan_prefix(b"done:").await?;
    while let Some(kv) = iter.next().await? {
        let Ok(job) = rmp_serde::from_slice::<JobRecord>(&kv.value) else {
            continue;
        };
        let Some(completed_at) = job.completed_at else {
            continue;
        };
        let Some(retention) = keep_done_for(&job.queue) else {
            continue;
        };
        // `as_millis` is a u128; clamp before narrowing so an enormous
        // retention saturates (nothing is ever old enough) instead of
        // wrapping around to a short one.
        let retention_ms = retention.as_millis().min(u128::from(u64::MAX)) as u64;
        let cutoff = now.saturating_sub(retention_ms);
        if completed_at < cutoff {
            victims.push((kv.key.to_vec(), job.id.clone()));
        }
    }
    drop(iter);

    for (key, id) in victims {
        let txn = db.begin(IsolationLevel::Snapshot).await?;
        // The record may have been removed since the scan.
        if txn.get(&key).await?.is_some() {
            txn.delete(&key)?;
            txn.delete(job_index_key(&id).as_bytes())?;
        }
        match txn.commit().await {
            Ok(_) => {}
            // Conflict: leave this record as it is and move on.
            Err(e) if e.kind() == slatedb::ErrorKind::Transaction => continue,
            Err(e) => return Err(e.into()),
        }
    }
    Ok(())
}
/// Deletes dead-lettered jobs that have outlived their queue's retention.
///
/// Scans every record under the `dead:` prefix. A record is selected when it
/// decodes as a `JobRecord`, has a `failed_at` timestamp, its queue has a
/// retention according to `dead_retention_for`, and `failed_at` is older than
/// `now - retention`. Records that fail to decode, have no `failed_at`, or
/// belong to a queue without a retention are left untouched.
///
/// Each selected record is removed in its own snapshot transaction together
/// with its job index entry, and the queue's `Dead` stat is decremented,
/// after re-checking that the key still exists. A transaction conflict skips
/// that record; any other error aborts the sweep and is returned.
async fn sweep_dead(
    db: &Db,
    clock: &dyn Clock,
    dead_retention_for: &(dyn Fn(&str) -> Option<Duration> + Sync),
) -> Result<()> {
    let now = clock.now_ms();

    // Collect first so the scan iterator is released before transactions start.
    let mut victims: Vec<(Vec<u8>, String, String)> = Vec::new();
    let mut iter = db.scan_prefix(b"dead:").await?;
    while let Some(kv) = iter.next().await? {
        let Ok(job) = rmp_serde::from_slice::<JobRecord>(&kv.value) else {
            continue;
        };
        let Some(failed_at) = job.failed_at else {
            continue;
        };
        let Some(retention) = dead_retention_for(&job.queue) else {
            continue;
        };
        // `as_millis` is a u128; clamp before narrowing so an enormous
        // retention saturates (nothing is ever old enough) instead of
        // wrapping around to a short one.
        let retention_ms = retention.as_millis().min(u128::from(u64::MAX)) as u64;
        let cutoff = now.saturating_sub(retention_ms);
        if failed_at < cutoff {
            victims.push((kv.key.to_vec(), job.queue.clone(), job.id.clone()));
        }
    }
    drop(iter);

    for (key, queue, id) in victims {
        let txn = db.begin(IsolationLevel::Snapshot).await?;
        // The record may have been removed since the scan.
        if txn.get(&key).await?.is_some() {
            txn.delete(&key)?;
            txn.delete(job_index_key(&id).as_bytes())?;
            update_stats(&txn, &queue, &[(JobStatus::Dead, -1)])?;
        }
        match txn.commit().await {
            Ok(_) => {}
            // Conflict: leave this record as it is and move on.
            Err(e) if e.kind() == slatedb::ErrorKind::Transaction => continue,
            Err(e) => return Err(e.into()),
        }
    }
    Ok(())
}