Skip to main content

qml_rs/processing/
recurring.rs

1//! Recurring-job poller.
2//!
3//! [`RecurringJobPoller`] is the runtime counterpart to the
4//! [`RecurringJob`](crate::core::RecurringJob) storage contract. Every
5//! `poll_interval` it asks the storage backend for due templates,
6//! materializes each one into a normal [`Job`] via [`Storage::enqueue`],
7//! then advances `next_run_at` and upserts the row back. Backends
8//! implement `fetch_due_recurring_jobs` with locking (Postgres: `FOR
9//! UPDATE SKIP LOCKED`, Redis: per-row `SET NX`) so two servers running
10//! the poller in parallel cannot double-fire the same tick.
11
12use std::sync::Arc;
13
14use chrono::{Duration, Utc};
15use tokio::time::interval;
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, error, info, warn};
18
19use crate::core::Job;
20use crate::error::{QmlError, Result};
21use crate::storage::Storage;
22
/// Upper bound on how many due recurring templates a single poll tick
/// asks the storage backend for.
///
/// When more templates are due than this (for instance after the
/// process has been down for a while), the remainder is picked up by
/// subsequent ticks instead of being claimed in one go.
pub const DEFAULT_RECURRING_BATCH_SIZE: usize = 256;
27
28/// Background poller that materializes due [`RecurringJob`] templates.
29pub struct RecurringJobPoller {
30    storage: Arc<dyn Storage>,
31    poll_interval: Duration,
32    batch_size: usize,
33}
34
35impl RecurringJobPoller {
36    pub fn new(storage: Arc<dyn Storage>, poll_interval: Duration) -> Self {
37        Self {
38            storage,
39            poll_interval,
40            batch_size: DEFAULT_RECURRING_BATCH_SIZE,
41        }
42    }
43
44    /// Run the poll loop until `cancel` fires.
45    pub async fn run_until_cancelled(&self, cancel: CancellationToken) -> Result<()> {
46        info!(
47            "Starting recurring-job poller with poll interval: {:?}",
48            self.poll_interval
49        );
50
51        let mut tick =
52            interval(
53                self.poll_interval
54                    .to_std()
55                    .map_err(|e| QmlError::ConfigurationError {
56                        message: format!("Invalid recurring poll interval: {}", e),
57                    })?,
58            );
59
60        loop {
61            tokio::select! {
62                biased;
63                _ = cancel.cancelled() => {
64                    debug!("Recurring poller exiting on cancellation");
65                    return Ok(());
66                }
67                _ = tick.tick() => {}
68            }
69
70            if let Err(e) = self.tick_once().await {
71                error!("Recurring poller tick failed: {}", e);
72            }
73        }
74    }
75
76    /// One poll iteration — claim due templates, enqueue a Job for each,
77    /// advance `next_run_at`, and upsert. Exposed for tests.
78    pub async fn tick_once(&self) -> Result<usize> {
79        let now = Utc::now();
80        let due = self
81            .storage
82            .fetch_due_recurring_jobs(now, self.batch_size)
83            .await
84            .map_err(|e| QmlError::StorageError {
85                message: format!("Failed to fetch due recurring jobs: {}", e),
86            })?;
87
88        let count = due.len();
89        for mut r in due {
90            let job = Job::with_config(&r.method, r.payload.clone(), &r.queue, 0, 0);
91            if let Err(e) = self.storage.enqueue(&job).await {
92                error!("Failed to enqueue recurring job {} (firing): {}", r.id, e);
93                // Still advance next_run_at so we don't hammer the same
94                // tick on every cycle.
95            } else {
96                debug!(
97                    "Enqueued recurring firing: recurring={} job={} method={}",
98                    r.id, job.id, r.method
99                );
100            }
101
102            r.last_run_at = Some(now);
103            if let Err(e) = r.advance(now) {
104                warn!(
105                    "Failed to advance recurring job {} ({}); disabling",
106                    r.id, e
107                );
108                r.enabled = false;
109            }
110            if let Err(e) = self.storage.upsert_recurring_job(&r).await {
111                error!("Failed to upsert recurring job {}: {}", r.id, e);
112            }
113        }
114        Ok(count)
115    }
116}
117
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::RecurringJob;
    use crate::storage::MemoryStorage;

    /// Seeds one overdue template into in-memory storage, runs a single
    /// poll iteration, and checks that a job was enqueued and the
    /// template's schedule bookkeeping was updated.
    #[tokio::test]
    async fn tick_once_materializes_due_recurring_job() {
        let storage: Arc<dyn Storage> = Arc::new(MemoryStorage::new());

        // Register a template and back-date it so it is already due.
        let payload = serde_json::json!({"x": 1});
        let mut template =
            RecurringJob::new("every-second", "* * * * * *", "tick", payload, "default").unwrap();
        template.next_run_at = Utc::now() - Duration::seconds(1);
        storage.upsert_recurring_job(&template).await.unwrap();

        // Exactly one template should be claimed by the iteration.
        let poller = RecurringJobPoller::new(storage.clone(), Duration::seconds(1));
        let claimed = poller.tick_once().await.unwrap();
        assert_eq!(claimed, 1);

        // The firing must show up as a regular job with method `tick`.
        let enqueued = storage.list(None, None, None).await.unwrap();
        assert!(enqueued.iter().any(|job| job.method == "tick"));

        // The stored template must have moved past its back-dated
        // `next_run_at` and must carry a `last_run_at` stamp.
        let templates = storage.list_recurring_jobs().await.unwrap();
        assert_eq!(templates.len(), 1);
        let stored = &templates[0];
        assert!(stored.next_run_at > Utc::now() - Duration::seconds(1));
        assert!(stored.last_run_at.is_some());
    }
}