Skip to main content

qml_rs/processing/
cleanup.rs

//! Expired-job cleanup worker.
//!
//! [`CleanupWorker`] periodically deletes jobs whose `expires_at` has
//! passed, along with generic named locks whose own `expires_at` has
//! passed. A job's `expires_at` is stamped by
//! [`JobProcessor`](crate::processing::JobProcessor) when the job enters a
//! final state, using the server's configured `succeeded_ttl` /
//! `failed_ttl`. Running this out-of-band (rather than sweeping on every
//! enqueue) keeps the enqueue hot path O(1).

10use std::sync::Arc;
11
12use chrono::{Duration, Utc};
13use tokio::time::interval;
14use tokio_util::sync::CancellationToken;
15use tracing::{debug, error, info};
16
17use crate::error::{QmlError, Result};
18use crate::storage::Storage;
19
20/// Default interval between cleanup sweeps.
21pub const DEFAULT_CLEANUP_INTERVAL: Duration = Duration::minutes(1);
22
23/// Default time-to-live for successfully completed jobs.
24pub const DEFAULT_SUCCEEDED_TTL: Duration = Duration::hours(24);
25
26/// Default time-to-live for permanently-failed jobs.
27pub const DEFAULT_FAILED_TTL: Duration = Duration::days(7);
28
29pub struct CleanupWorker {
30    storage: Arc<dyn Storage>,
31    interval: Duration,
32}
33
34impl CleanupWorker {
35    pub fn new(storage: Arc<dyn Storage>, interval: Duration) -> Self {
36        Self { storage, interval }
37    }
38
39    /// Run the cleanup loop until `cancel` fires.
40    pub async fn run_until_cancelled(&self, cancel: CancellationToken) -> Result<()> {
41        info!("Starting cleanup worker with interval: {:?}", self.interval);
42
43        let mut tick =
44            interval(
45                self.interval
46                    .to_std()
47                    .map_err(|e| QmlError::ConfigurationError {
48                        message: format!("Invalid cleanup interval: {}", e),
49                    })?,
50            );
51
52        loop {
53            tokio::select! {
54                biased;
55                _ = cancel.cancelled() => {
56                    debug!("Cleanup worker exiting on cancellation");
57                    return Ok(());
58                }
59                _ = tick.tick() => {}
60            }
61
62            if let Err(e) = self.sweep_once().await {
63                error!("Cleanup sweep failed: {}", e);
64            }
65        }
66    }
67
68    /// One cleanup sweep. Exposed for tests.
69    ///
70    /// Two pieces of work, in order:
71    /// 1. Delete jobs whose `expires_at` has passed (final-state sweep).
72    /// 2. Delete generic named locks whose `expires_at` has passed.
73    ///
74    /// Both share a single `now` so a tick is consistent. Returns the
75    /// number of expired *jobs* removed (preserved for backward
76    /// compatibility with the existing test); named-lock removal is
77    /// logged but doesn't change the return.
78    pub async fn sweep_once(&self) -> Result<usize> {
79        let now = Utc::now();
80        let removed =
81            self.storage
82                .delete_expired_jobs(now)
83                .await
84                .map_err(|e| QmlError::StorageError {
85                    message: format!("Failed to delete expired jobs: {}", e),
86                })?;
87        if removed > 0 {
88            info!("Cleanup worker removed {} expired jobs", removed);
89        }
90
91        match self.storage.cleanup_expired_named_locks(now).await {
92            Ok(0) => {}
93            Ok(n) => info!("Cleanup worker removed {} expired named locks", n),
94            // A failed lock sweep shouldn't poison the whole tick — we
95            // already swept jobs successfully and a future tick will
96            // retry the lock cleanup.
97            Err(e) => error!("Failed to clean up expired named locks: {}", e),
98        }
99
100        Ok(removed)
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{Job, JobState};
    use crate::storage::MemoryStorage;

    /// Build a succeeded job of type `"x"` whose `expires_at` lies `offset`
    /// away from now (a negative offset means it has already expired).
    fn succeeded_job_expiring_in(offset: Duration) -> Job {
        let mut job = Job::new("x", serde_json::Value::Null);
        job.state = JobState::succeeded(1, None);
        job.expires_at = Some(Utc::now() + offset);
        job
    }

    #[tokio::test]
    async fn sweep_once_removes_expired_jobs() {
        let storage: Arc<dyn Storage> = Arc::new(MemoryStorage::new());

        let stale = succeeded_job_expiring_in(-Duration::seconds(10));
        let live = succeeded_job_expiring_in(Duration::hours(1));
        for job in [&stale, &live] {
            storage.enqueue(job).await.unwrap();
        }

        let worker = CleanupWorker::new(Arc::clone(&storage), Duration::seconds(60));
        assert_eq!(worker.sweep_once().await.unwrap(), 1);

        // The unexpired job survives; the expired one is gone.
        assert!(storage.get(&live.id).await.unwrap().is_some());
        assert!(storage.get(&stale.id).await.unwrap().is_none());
    }
}