use std::sync::Arc;
use chrono::{Duration, Utc};
use tokio::time::interval;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};
use crate::error::{QmlError, Result};
use crate::storage::Storage;
pub const DEFAULT_CLEANUP_INTERVAL: Duration = Duration::minutes(1);
pub const DEFAULT_SUCCEEDED_TTL: Duration = Duration::hours(24);
pub const DEFAULT_FAILED_TTL: Duration = Duration::days(7);
pub struct CleanupWorker {
storage: Arc<dyn Storage>,
interval: Duration,
}
impl CleanupWorker {
pub fn new(storage: Arc<dyn Storage>, interval: Duration) -> Self {
Self { storage, interval }
}
pub async fn run_until_cancelled(&self, cancel: CancellationToken) -> Result<()> {
info!("Starting cleanup worker with interval: {:?}", self.interval);
let mut tick =
interval(
self.interval
.to_std()
.map_err(|e| QmlError::ConfigurationError {
message: format!("Invalid cleanup interval: {}", e),
})?,
);
loop {
tokio::select! {
biased;
_ = cancel.cancelled() => {
debug!("Cleanup worker exiting on cancellation");
return Ok(());
}
_ = tick.tick() => {}
}
if let Err(e) = self.sweep_once().await {
error!("Cleanup sweep failed: {}", e);
}
}
}
pub async fn sweep_once(&self) -> Result<usize> {
let now = Utc::now();
let removed =
self.storage
.delete_expired_jobs(now)
.await
.map_err(|e| QmlError::StorageError {
message: format!("Failed to delete expired jobs: {}", e),
})?;
if removed > 0 {
info!("Cleanup worker removed {} expired jobs", removed);
}
match self.storage.cleanup_expired_named_locks(now).await {
Ok(0) => {}
Ok(n) => info!("Cleanup worker removed {} expired named locks", n),
Err(e) => error!("Failed to clean up expired named locks: {}", e),
}
Ok(removed)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::{Job, JobState};
use crate::storage::MemoryStorage;
#[tokio::test]
async fn sweep_once_removes_expired_jobs() {
let storage: Arc<dyn Storage> = Arc::new(MemoryStorage::new());
let mut expired = Job::new("x", serde_json::Value::Null);
expired.state = JobState::succeeded(1, None);
expired.expires_at = Some(Utc::now() - Duration::seconds(10));
storage.enqueue(&expired).await.unwrap();
let mut fresh = Job::new("x", serde_json::Value::Null);
fresh.state = JobState::succeeded(1, None);
fresh.expires_at = Some(Utc::now() + Duration::hours(1));
storage.enqueue(&fresh).await.unwrap();
let worker = CleanupWorker::new(storage.clone(), Duration::seconds(60));
let removed = worker.sweep_once().await.unwrap();
assert_eq!(removed, 1);
assert!(storage.get(&fresh.id).await.unwrap().is_some());
assert!(storage.get(&expired.id).await.unwrap().is_none());
}
}