use std::sync::Arc;
use chrono::{Duration, Utc};
use tokio::time::interval;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use crate::core::Job;
use crate::error::{QmlError, Result};
use crate::storage::Storage;
/// Batch size that [`RecurringJobPoller::new`] assigns to a freshly built
/// poller; it is the value passed to `Storage::fetch_due_recurring_jobs` on
/// every poll.
pub const DEFAULT_RECURRING_BATCH_SIZE: usize = 256;
/// Periodically looks up due recurring jobs in storage, enqueues a concrete
/// job for each one, and writes the updated recurring-job record back.
pub struct RecurringJobPoller {
// Backend used to fetch due recurring jobs, enqueue the resulting jobs and
// upsert the updated recurring-job records.
storage: Arc<dyn Storage>,
// Time between polls (a chrono `Duration`; converted to a std duration when
// the polling loop starts).
poll_interval: Duration,
// Batch size handed to `fetch_due_recurring_jobs` on each poll.
batch_size: usize,
}
impl RecurringJobPoller {
    /// Creates a poller backed by `storage` that polls every `poll_interval`.
    ///
    /// The batch size is initialised to [`DEFAULT_RECURRING_BATCH_SIZE`].
    /// `poll_interval` is not validated here; validation happens when
    /// [`run_until_cancelled`](Self::run_until_cancelled) starts.
    pub fn new(storage: Arc<dyn Storage>, poll_interval: Duration) -> Self {
        Self {
            storage,
            poll_interval,
            batch_size: DEFAULT_RECURRING_BATCH_SIZE,
        }
    }

    /// Runs the polling loop until `cancel` is triggered.
    ///
    /// On every timer tick [`tick_once`](Self::tick_once) is invoked. A failed
    /// tick is logged and does not stop the loop. Returns `Ok(())` once
    /// cancellation is observed.
    ///
    /// # Errors
    ///
    /// Returns `QmlError::ConfigurationError` when `poll_interval` cannot be
    /// converted to a `std::time::Duration` or when it is zero.
    pub async fn run_until_cancelled(&self, cancel: CancellationToken) -> Result<()> {
        info!(
            "Starting recurring-job poller with poll interval: {:?}",
            self.poll_interval
        );

        let period = self
            .poll_interval
            .to_std()
            .map_err(|e| QmlError::ConfigurationError {
                message: format!("Invalid recurring poll interval: {}", e),
            })?;

        // `tokio::time::interval` panics on a zero period; report the bad
        // configuration as an error instead of aborting the task.
        if period.is_zero() {
            return Err(QmlError::ConfigurationError {
                message: "Invalid recurring poll interval: must be greater than zero".to_string(),
            });
        }

        let mut tick = interval(period);

        loop {
            tokio::select! {
                // `biased` polls the branches in declaration order, so a
                // pending cancellation wins over a ready timer tick.
                biased;
                _ = cancel.cancelled() => {
                    debug!("Recurring poller exiting on cancellation");
                    return Ok(());
                }
                _ = tick.tick() => {}
            }

            if let Err(e) = self.tick_once().await {
                error!("Recurring poller tick failed: {}", e);
            }
        }
    }

    /// Performs a single poll: fetches the recurring jobs that are due now,
    /// enqueues one job per entry and persists the advanced schedule.
    ///
    /// Returns the number of due recurring jobs that were fetched, whether or
    /// not each individual enqueue succeeded.
    ///
    /// Per-job failures are logged and do not abort the batch:
    /// * if enqueueing fails, the recurring job is left untouched so that it
    ///   is still due on the next poll and the firing is retried rather than
    ///   lost;
    /// * if advancing the schedule fails, the recurring job is disabled
    ///   before being written back.
    ///
    /// # Errors
    ///
    /// Returns `QmlError::StorageError` when the due jobs cannot be fetched.
    pub async fn tick_once(&self) -> Result<usize> {
        let now = Utc::now();
        let due = self
            .storage
            .fetch_due_recurring_jobs(now, self.batch_size)
            .await
            .map_err(|e| QmlError::StorageError {
                message: format!("Failed to fetch due recurring jobs: {}", e),
            })?;
        let count = due.len();

        for mut r in due {
            // NOTE(review): the trailing `0, 0` arguments are passed through
            // as in the original code; confirm their meaning against
            // `Job::with_config`.
            let job = Job::with_config(&r.method, r.payload.clone(), &r.queue, 0, 0);

            if let Err(e) = self.storage.enqueue(&job).await {
                error!("Failed to enqueue recurring job {} (firing): {}", r.id, e);
                // Do not record a run that never happened and do not move the
                // schedule forward: the stored record stays due and will be
                // picked up again by the next poll.
                continue;
            }
            debug!(
                "Enqueued recurring firing: recurring={} job={} method={}",
                r.id, job.id, r.method
            );

            r.last_run_at = Some(now);
            if let Err(e) = r.advance(now) {
                warn!(
                    "Failed to advance recurring job {} ({}); disabling",
                    r.id, e
                );
                r.enabled = false;
            }

            if let Err(e) = self.storage.upsert_recurring_job(&r).await {
                error!("Failed to upsert recurring job {}: {}", r.id, e);
            }
        }

        Ok(count)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::RecurringJob;
    use crate::storage::MemoryStorage;

    /// A recurring job whose `next_run_at` lies in the past must be reported
    /// as fired, produce a job in storage, and be written back with a later
    /// `next_run_at` and a populated `last_run_at`.
    #[tokio::test]
    async fn tick_once_materializes_due_recurring_job() {
        let storage: Arc<dyn Storage> = Arc::new(MemoryStorage::new());
        let mut r = RecurringJob::new(
            "every-second",
            "* * * * * *",
            "tick",
            serde_json::json!({"x": 1}),
            "default",
        )
        .unwrap();

        // Force the job to be due, and remember the exact value so the
        // "schedule advanced" check below does not depend on the wall clock.
        let due_at = Utc::now() - Duration::seconds(1);
        r.next_run_at = due_at;
        storage.upsert_recurring_job(&r).await.unwrap();

        let poller = RecurringJobPoller::new(storage.clone(), Duration::seconds(1));
        let fired = poller.tick_once().await.unwrap();
        assert_eq!(fired, 1);

        let jobs = storage.list(None, None, None).await.unwrap();
        assert!(jobs.iter().any(|j| j.method == "tick"));

        let listed = storage.list_recurring_jobs().await.unwrap();
        assert_eq!(listed.len(), 1);
        assert!(listed[0].next_run_at > due_at);
        assert!(listed[0].last_run_at.is_some());
    }
}