qml-rs 1.1.0

A Rust implementation of QML background job processing
Documentation
//! Expired-job cleanup worker.
//!
//! [`CleanupWorker`] periodically deletes jobs whose `expires_at` has
//! passed. `expires_at` is stamped by
//! [`JobProcessor`](crate::processing::JobProcessor) when a job enters a
//! final state, using the server's configured `succeeded_ttl` /
//! `failed_ttl`. Running this out-of-band (rather than sweeping on every
//! enqueue) keeps the hot path O(1).

use std::sync::Arc;

use chrono::{Duration, Utc};
use tokio::time::interval;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info};

use crate::error::{QmlError, Result};
use crate::storage::Storage;

/// Default interval between cleanup sweeps.
pub const DEFAULT_CLEANUP_INTERVAL: Duration = Duration::minutes(1);

/// Default time-to-live for successfully completed jobs.
pub const DEFAULT_SUCCEEDED_TTL: Duration = Duration::hours(24);

/// Default time-to-live for permanently-failed jobs.
pub const DEFAULT_FAILED_TTL: Duration = Duration::days(7);

pub struct CleanupWorker {
    storage: Arc<dyn Storage>,
    interval: Duration,
}

impl CleanupWorker {
    pub fn new(storage: Arc<dyn Storage>, interval: Duration) -> Self {
        Self { storage, interval }
    }

    /// Run the cleanup loop until `cancel` fires.
    pub async fn run_until_cancelled(&self, cancel: CancellationToken) -> Result<()> {
        info!("Starting cleanup worker with interval: {:?}", self.interval);

        let mut tick =
            interval(
                self.interval
                    .to_std()
                    .map_err(|e| QmlError::ConfigurationError {
                        message: format!("Invalid cleanup interval: {}", e),
                    })?,
            );

        loop {
            tokio::select! {
                biased;
                _ = cancel.cancelled() => {
                    debug!("Cleanup worker exiting on cancellation");
                    return Ok(());
                }
                _ = tick.tick() => {}
            }

            if let Err(e) = self.sweep_once().await {
                error!("Cleanup sweep failed: {}", e);
            }
        }
    }

    /// One cleanup sweep. Exposed for tests.
    pub async fn sweep_once(&self) -> Result<usize> {
        let removed = self
            .storage
            .delete_expired_jobs(Utc::now())
            .await
            .map_err(|e| QmlError::StorageError {
                message: format!("Failed to delete expired jobs: {}", e),
            })?;
        if removed > 0 {
            info!("Cleanup worker removed {} expired jobs", removed);
        }
        Ok(removed)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::{Job, JobState};
    use crate::storage::MemoryStorage;

    #[tokio::test]
    async fn sweep_once_removes_expired_jobs() {
        let storage: Arc<dyn Storage> = Arc::new(MemoryStorage::new());

        let mut expired = Job::new("x", serde_json::Value::Null);
        expired.state = JobState::succeeded(1, None);
        expired.expires_at = Some(Utc::now() - Duration::seconds(10));
        storage.enqueue(&expired).await.unwrap();

        let mut fresh = Job::new("x", serde_json::Value::Null);
        fresh.state = JobState::succeeded(1, None);
        fresh.expires_at = Some(Utc::now() + Duration::hours(1));
        storage.enqueue(&fresh).await.unwrap();

        let worker = CleanupWorker::new(storage.clone(), Duration::seconds(60));
        let removed = worker.sweep_once().await.unwrap();
        assert_eq!(removed, 1);
        assert!(storage.get(&fresh.id).await.unwrap().is_some());
        assert!(storage.get(&expired.id).await.unwrap().is_none());
    }
}