qml_rs/processing/
recurring.rs1use std::sync::Arc;
13
14use chrono::{Duration, Utc};
15use tokio::time::interval;
16use tokio_util::sync::CancellationToken;
17use tracing::{debug, error, info, warn};
18
19use crate::core::Job;
20use crate::error::{QmlError, Result};
21use crate::storage::Storage;
22
/// Default batch size a [`RecurringJobPoller`] passes to
/// `Storage::fetch_due_recurring_jobs` on each polling pass.
pub const DEFAULT_RECURRING_BATCH_SIZE: usize = 256;
27
28pub struct RecurringJobPoller {
30 storage: Arc<dyn Storage>,
31 poll_interval: Duration,
32 batch_size: usize,
33}
34
35impl RecurringJobPoller {
36 pub fn new(storage: Arc<dyn Storage>, poll_interval: Duration) -> Self {
37 Self {
38 storage,
39 poll_interval,
40 batch_size: DEFAULT_RECURRING_BATCH_SIZE,
41 }
42 }
43
44 pub async fn run_until_cancelled(&self, cancel: CancellationToken) -> Result<()> {
46 info!(
47 "Starting recurring-job poller with poll interval: {:?}",
48 self.poll_interval
49 );
50
51 let mut tick =
52 interval(
53 self.poll_interval
54 .to_std()
55 .map_err(|e| QmlError::ConfigurationError {
56 message: format!("Invalid recurring poll interval: {}", e),
57 })?,
58 );
59
60 loop {
61 tokio::select! {
62 biased;
63 _ = cancel.cancelled() => {
64 debug!("Recurring poller exiting on cancellation");
65 return Ok(());
66 }
67 _ = tick.tick() => {}
68 }
69
70 if let Err(e) = self.tick_once().await {
71 error!("Recurring poller tick failed: {}", e);
72 }
73 }
74 }
75
76 pub async fn tick_once(&self) -> Result<usize> {
79 let now = Utc::now();
80 let due = self
81 .storage
82 .fetch_due_recurring_jobs(now, self.batch_size)
83 .await
84 .map_err(|e| QmlError::StorageError {
85 message: format!("Failed to fetch due recurring jobs: {}", e),
86 })?;
87
88 let count = due.len();
89 for mut r in due {
90 let job = Job::with_config(&r.method, r.payload.clone(), &r.queue, 0, 0);
91 if let Err(e) = self.storage.enqueue(&job).await {
92 error!("Failed to enqueue recurring job {} (firing): {}", r.id, e);
93 } else {
96 debug!(
97 "Enqueued recurring firing: recurring={} job={} method={}",
98 r.id, job.id, r.method
99 );
100 }
101
102 r.last_run_at = Some(now);
103 if let Err(e) = r.advance(now) {
104 warn!(
105 "Failed to advance recurring job {} ({}); disabling",
106 r.id, e
107 );
108 r.enabled = false;
109 }
110 if let Err(e) = self.storage.upsert_recurring_job(&r).await {
111 error!("Failed to upsert recurring job {}: {}", r.id, e);
112 }
113 }
114 Ok(count)
115 }
116}
117
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::RecurringJob;
    use crate::storage::MemoryStorage;

    /// A recurring job whose `next_run_at` lies in the past must be turned
    /// into a concrete job by one polling pass, and its stored record must be
    /// rescheduled and stamped with a last-run time.
    #[tokio::test]
    async fn tick_once_materializes_due_recurring_job() {
        let storage: Arc<dyn Storage> = Arc::new(MemoryStorage::new());
        let mut r = RecurringJob::new(
            "every-second",
            "* * * * * *",
            "tick",
            serde_json::json!({"x": 1}),
            "default",
        )
        .unwrap();
        // Backdate the schedule so the job is already due when the poller runs.
        let stale_next_run = Utc::now() - Duration::seconds(1);
        r.next_run_at = stale_next_run;
        storage.upsert_recurring_job(&r).await.unwrap();

        let poller = RecurringJobPoller::new(storage.clone(), Duration::seconds(1));
        let fired = poller.tick_once().await.unwrap();
        assert_eq!(fired, 1);

        // The pass must have enqueued a concrete job for the recurring entry.
        let jobs = storage.list(None, None, None).await.unwrap();
        assert!(jobs.iter().any(|j| j.method == "tick"));

        // The stored record must have moved past the backdated schedule.
        // Comparing against the value captured above (not a fresh clock
        // reading) keeps the check independent of how long the test takes.
        let listed = storage.list_recurring_jobs().await.unwrap();
        assert_eq!(listed.len(), 1);
        assert!(listed[0].next_run_at > stale_next_run);
        assert!(listed[0].last_run_at.is_some());
    }
}