use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::watch;
use tracing::{debug, info, warn};
use crate::dal::DAL;
/// Tunable settings for the stale claim sweeper.
#[derive(Debug, Clone)]
pub struct StaleClaimSweeperConfig {
    /// Period of the sweeper's timer; one sweep is attempted per tick.
    pub sweep_interval: Duration,
    /// Staleness threshold handed to the stale-claim lookup.
    ///
    /// The sweeper also uses this value as its startup grace period: no sweep
    /// is performed until the sweeper has existed for at least this long.
    pub stale_threshold: Duration,
}
impl Default for StaleClaimSweeperConfig {
fn default() -> Self {
Self {
sweep_interval: Duration::from_secs(30),
stale_threshold: Duration::from_secs(60),
}
}
}
/// Periodic background task that looks up stale task claims through the DAL,
/// releases them, and marks the affected tasks ready again.
pub struct StaleClaimSweeper {
    /// Shared data-access layer used for every lookup and update.
    dal: Arc<DAL>,
    /// Sweep cadence and staleness threshold.
    config: StaleClaimSweeperConfig,
    /// Shutdown signal; the run loop exits once this holds `true`.
    shutdown_rx: watch::Receiver<bool>,
    /// Construction time, used to measure the startup grace period.
    ready_at: Instant,
}
impl StaleClaimSweeper {
    /// Creates a sweeper over `dal` with the given `config`.
    ///
    /// `shutdown_rx` is watched by [`run`](Self::run); a value of `true`
    /// stops the loop. The construction instant is recorded and is the
    /// starting point of the startup grace period applied by
    /// [`sweep`](Self::sweep).
    pub fn new(
        dal: Arc<DAL>,
        config: StaleClaimSweeperConfig,
        shutdown_rx: watch::Receiver<bool>,
    ) -> Self {
        Self {
            dal,
            config,
            shutdown_rx,
            ready_at: Instant::now(),
        }
    }

    /// Runs the sweep loop until shutdown is requested.
    ///
    /// One sweep is attempted per `sweep_interval` tick; ticks that were
    /// missed (for example because a sweep overran) are skipped rather than
    /// replayed. The loop ends when the shutdown channel holds `true` or when
    /// its sender has been dropped.
    ///
    /// # Panics
    ///
    /// Panics if `sweep_interval` is zero, because `tokio::time::interval`
    /// rejects a zero period.
    pub async fn run(&mut self) {
        // A receiver created or cloned after `true` was sent treats that value
        // as already seen, so `changed()` would never fire for it. Check the
        // current value up front so such a sweeper does not run forever.
        if *self.shutdown_rx.borrow() {
            info!("Stale claim sweeper not started: shutdown already requested");
            return;
        }

        info!(
            "Starting stale claim sweeper (interval: {}s, threshold: {}s, grace period: {}s)",
            self.config.sweep_interval.as_secs(),
            self.config.stale_threshold.as_secs(),
            // The grace period is defined as one full stale threshold.
            self.config.stale_threshold.as_secs(),
        );

        let mut interval = tokio::time::interval(self.config.sweep_interval);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    self.sweep().await;
                }
                changed = self.shutdown_rx.changed() => {
                    // Once the sender is dropped, `changed()` resolves to `Err`
                    // immediately on every call. Ignoring it would turn this
                    // loop into a busy spin, so a closed channel stops the
                    // sweeper.
                    if changed.is_err() {
                        warn!("Stale claim sweeper shutdown channel closed; stopping");
                        break;
                    }
                    if *self.shutdown_rx.borrow() {
                        info!("Stale claim sweeper shutting down");
                        break;
                    }
                }
            }
        }
    }

    /// Performs a single sweep.
    ///
    /// Does nothing while the sweeper is younger than `stale_threshold`
    /// (startup grace period). Otherwise it asks the DAL for stale claims and,
    /// for each one, releases the runner claim and then marks the task ready.
    /// Failures are logged and never propagated; a failure on one claim does
    /// not stop the remaining claims from being processed.
    pub async fn sweep(&self) {
        let uptime = self.ready_at.elapsed();
        if uptime < self.config.stale_threshold {
            debug!(
                "Stale claim sweeper in grace period ({:.1}s / {}s) — skipping sweep",
                uptime.as_secs_f64(),
                self.config.stale_threshold.as_secs()
            );
            return;
        }

        let stale_claims = match self
            .dal
            .task_execution()
            .find_stale_claims(self.config.stale_threshold)
            .await
        {
            Ok(claims) => claims,
            Err(e) => {
                warn!("Stale claim sweep failed: {}", e);
                return;
            }
        };

        if stale_claims.is_empty() {
            debug!("Stale claim sweep: no stale claims found");
            return;
        }

        info!(
            "Stale claim sweep found {} stale claims",
            stale_claims.len()
        );

        // Count real successes so the summary does not report failed releases
        // as released.
        let mut released = 0usize;

        for claim in &stale_claims {
            if let Err(e) = self
                .dal
                .task_execution()
                .release_runner_claim(claim.task_id)
                .await
            {
                warn!(
                    "Failed to release stale claim on task {}: {}",
                    claim.task_id, e
                );
                continue;
            }

            // NOTE(review): release and mark-ready are two separate calls. If
            // this second call fails the claim is already gone, so a later
            // sweep presumably will not find the task again — confirm that
            // another recovery path covers this case.
            if let Err(e) = self.dal.task_execution().mark_ready(claim.task_id).await {
                warn!(
                    "Failed to reset task {} to Ready after stale claim release: {}",
                    claim.task_id, e
                );
                continue;
            }

            released += 1;
            let age = chrono::Utc::now() - claim.heartbeat_at;
            info!(
                "Released stale claim: task {} (runner {}, last heartbeat {}s ago)",
                claim.task_id,
                claim.claimed_by,
                age.num_seconds()
            );
        }

        let failed = stale_claims.len() - released;
        if failed == 0 {
            info!("Stale claim sweep complete: {} claims released", released);
        } else {
            warn!(
                "Stale claim sweep complete: {} claims released, {} failed",
                released, failed
            );
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `Default` yields a 30-second sweep interval and a 60-second threshold.
    #[test]
    fn config_defaults() {
        let StaleClaimSweeperConfig {
            sweep_interval,
            stale_threshold,
        } = StaleClaimSweeperConfig::default();

        assert_eq!(sweep_interval, Duration::from_secs(30));
        assert_eq!(stale_threshold, Duration::from_secs(60));
    }

    /// Explicitly supplied values are stored unchanged.
    #[test]
    fn config_custom_values() {
        let interval = Duration::from_secs(10);
        let threshold = Duration::from_secs(120);

        let config = StaleClaimSweeperConfig {
            sweep_interval: interval,
            stale_threshold: threshold,
        };

        assert_eq!(config.sweep_interval, interval);
        assert_eq!(config.stale_threshold, threshold);
    }

    /// A clone carries the same field values as its source.
    #[test]
    fn config_clone() {
        let original = StaleClaimSweeperConfig::default();
        let duplicate = original.clone();

        assert_eq!(original.sweep_interval, duplicate.sweep_interval);
        assert_eq!(original.stale_threshold, duplicate.stale_threshold);
    }
}