qml_rs/processing/
cleanup.rs1use std::sync::Arc;
11
12use chrono::{Duration, Utc};
13use tokio::time::interval;
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, error, info};
16
17use crate::error::{QmlError, Result};
18use crate::storage::Storage;
19
20pub const DEFAULT_CLEANUP_INTERVAL: Duration = Duration::minutes(1);
22
23pub const DEFAULT_SUCCEEDED_TTL: Duration = Duration::hours(24);
25
26pub const DEFAULT_FAILED_TTL: Duration = Duration::days(7);
28
29pub struct CleanupWorker {
30 storage: Arc<dyn Storage>,
31 interval: Duration,
32}
33
34impl CleanupWorker {
35 pub fn new(storage: Arc<dyn Storage>, interval: Duration) -> Self {
36 Self { storage, interval }
37 }
38
39 pub async fn run_until_cancelled(&self, cancel: CancellationToken) -> Result<()> {
41 info!("Starting cleanup worker with interval: {:?}", self.interval);
42
43 let mut tick =
44 interval(
45 self.interval
46 .to_std()
47 .map_err(|e| QmlError::ConfigurationError {
48 message: format!("Invalid cleanup interval: {}", e),
49 })?,
50 );
51
52 loop {
53 tokio::select! {
54 biased;
55 _ = cancel.cancelled() => {
56 debug!("Cleanup worker exiting on cancellation");
57 return Ok(());
58 }
59 _ = tick.tick() => {}
60 }
61
62 if let Err(e) = self.sweep_once().await {
63 error!("Cleanup sweep failed: {}", e);
64 }
65 }
66 }
67
68 pub async fn sweep_once(&self) -> Result<usize> {
79 let now = Utc::now();
80 let removed =
81 self.storage
82 .delete_expired_jobs(now)
83 .await
84 .map_err(|e| QmlError::StorageError {
85 message: format!("Failed to delete expired jobs: {}", e),
86 })?;
87 if removed > 0 {
88 info!("Cleanup worker removed {} expired jobs", removed);
89 }
90
91 match self.storage.cleanup_expired_named_locks(now).await {
92 Ok(0) => {}
93 Ok(n) => info!("Cleanup worker removed {} expired named locks", n),
94 Err(e) => error!("Failed to clean up expired named locks: {}", e),
98 }
99
100 Ok(removed)
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107 use crate::core::{Job, JobState};
108 use crate::storage::MemoryStorage;
109
110 #[tokio::test]
111 async fn sweep_once_removes_expired_jobs() {
112 let storage: Arc<dyn Storage> = Arc::new(MemoryStorage::new());
113
114 let mut expired = Job::new("x", serde_json::Value::Null);
115 expired.state = JobState::succeeded(1, None);
116 expired.expires_at = Some(Utc::now() - Duration::seconds(10));
117 storage.enqueue(&expired).await.unwrap();
118
119 let mut fresh = Job::new("x", serde_json::Value::Null);
120 fresh.state = JobState::succeeded(1, None);
121 fresh.expires_at = Some(Utc::now() + Duration::hours(1));
122 storage.enqueue(&fresh).await.unwrap();
123
124 let worker = CleanupWorker::new(storage.clone(), Duration::seconds(60));
125 let removed = worker.sweep_once().await.unwrap();
126 assert_eq!(removed, 1);
127 assert!(storage.get(&fresh.id).await.unwrap().is_some());
128 assert!(storage.get(&expired.id).await.unwrap().is_none());
129 }
130}