Skip to main content

qml_rs/core/
recurring.rs

1//! Recurring job definition.
2//!
3//! A [`RecurringJob`] is a cron-scheduled template that the
4//! [`RecurringJobPoller`](crate::processing::RecurringJobPoller) materializes
5//! into a regular [`Job`](crate::core::Job) each time its cron schedule
6//! fires. The template itself is persisted in a dedicated table/keyspace
7//! separate from the job queue.
8//!
9//! ## Example
10//! ```rust
11//! use qml_rs::RecurringJob;
12//! use serde_json::json;
13//!
14//! // Every day at 09:00 UTC
15//! let r = RecurringJob::new(
16//!     "daily-report",
17//!     "0 0 9 * * *",
18//!     "generate_report",
19//!     json!({ "kind": "daily" }),
20//!     "default",
21//! ).unwrap();
22//! assert_eq!(r.id, "daily-report");
23//! ```
24
25use crate::error::{QmlError, Result};
26use chrono::{DateTime, Utc};
27use cron::Schedule;
28use serde::{Deserialize, Serialize};
29use serde_json::Value as JsonValue;
30use std::str::FromStr;
31
32/// A cron-scheduled job template.
33#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
34pub struct RecurringJob {
35    /// Stable recurring-job id (caller-chosen, e.g. `"daily-report"`).
36    pub id: String,
37    /// Cron expression in the [`cron`] crate's 6/7-field format
38    /// (`sec min hour day month dow [year]`).
39    pub cron: String,
40    /// Method name registered with the worker registry.
41    pub method: String,
42    /// JSON payload passed to the worker on each firing.
43    pub payload: JsonValue,
44    /// Queue the materialized jobs land in.
45    pub queue: String,
46    /// Next firing time (computed from `cron`).
47    pub next_run_at: DateTime<Utc>,
48    /// When the template last produced a job, or `None` if never fired.
49    pub last_run_at: Option<DateTime<Utc>>,
50    /// When the template was first created.
51    pub created_at: DateTime<Utc>,
52    /// When the template was last mutated.
53    pub updated_at: DateTime<Utc>,
54    /// Disabled templates are ignored by the poller.
55    pub enabled: bool,
56}
57
58impl RecurringJob {
59    /// Creates a new recurring-job template. Parses `cron` and computes
60    /// `next_run_at` from `now`. Returns a validation error if the
61    /// expression is invalid or has no future occurrences.
62    pub fn new(
63        id: impl Into<String>,
64        cron: impl Into<String>,
65        method: impl Into<String>,
66        payload: JsonValue,
67        queue: impl Into<String>,
68    ) -> Result<Self> {
69        let cron = cron.into();
70        let next_run_at = next_after(&cron, Utc::now())?;
71        let now = Utc::now();
72        Ok(Self {
73            id: id.into(),
74            cron,
75            method: method.into(),
76            payload,
77            queue: queue.into(),
78            next_run_at,
79            last_run_at: None,
80            created_at: now,
81            updated_at: now,
82            enabled: true,
83        })
84    }
85
86    /// Recomputes `next_run_at` to the next occurrence strictly after
87    /// `from`, updating `updated_at` in the process. Returns an error if
88    /// the stored cron expression is no longer parseable or has no future
89    /// occurrence (the latter shouldn't happen for sane expressions).
90    pub fn advance(&mut self, from: DateTime<Utc>) -> Result<()> {
91        self.next_run_at = next_after(&self.cron, from)?;
92        self.updated_at = Utc::now();
93        Ok(())
94    }
95
96    /// Parses the stored cron expression. Useful when a caller needs more
97    /// than just "next firing" (e.g. listing upcoming runs).
98    pub fn schedule(&self) -> Result<Schedule> {
99        parse_schedule(&self.cron)
100    }
101}
102
103fn parse_schedule(expr: &str) -> Result<Schedule> {
104    Schedule::from_str(expr).map_err(|e| QmlError::InvalidJobData {
105        message: format!("Invalid cron expression `{}`: {}", expr, e),
106    })
107}
108
109fn next_after(expr: &str, from: DateTime<Utc>) -> Result<DateTime<Utc>> {
110    let schedule = parse_schedule(expr)?;
111    schedule
112        .after(&from)
113        .next()
114        .ok_or_else(|| QmlError::InvalidJobData {
115            message: format!("Cron expression `{}` has no future occurrences", expr),
116        })
117}
118
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A valid expression yields an enabled, never-fired template whose
    /// first firing lies after the moment construction started.
    #[test]
    fn new_parses_cron_and_computes_next_run() {
        // Take the reference timestamp BEFORE constructing the template.
        // Comparing against a clock read taken afterwards is racy for an
        // every-second schedule: if a second boundary passes between
        // construction and the assertion, `next_run_at` is already in the
        // past and the test fails spuriously.
        let before = Utc::now();
        let r = RecurringJob::new(
            "every-second",
            "* * * * * *",
            "tick",
            json!(null),
            "default",
        )
        .unwrap();
        assert!(r.next_run_at > before);
        assert!(r.enabled);
        assert!(r.last_run_at.is_none());
    }

    /// An unparseable expression is rejected with `InvalidJobData`.
    #[test]
    fn new_rejects_invalid_cron() {
        let err = RecurringJob::new("bad", "not a cron", "x", json!(null), "default").unwrap_err();
        assert!(matches!(err, QmlError::InvalidJobData { .. }));
    }

    /// Advancing from the current `next_run_at` must land on a strictly
    /// later occurrence.
    #[test]
    fn advance_moves_next_run_forward() {
        let mut r = RecurringJob::new("m", "0 * * * * *", "tick", json!(null), "default").unwrap();
        let before = r.next_run_at;
        r.advance(before).unwrap();
        assert!(r.next_run_at > before);
    }
}