Skip to main content

awa_model/
cron.rs

1//! Periodic/cron job types and database operations.
2//!
3//! Schedules are defined in application code, synced to `awa.cron_jobs` via UPSERT,
4//! and evaluated by the leader to atomically enqueue jobs.
5
6use crate::error::AwaError;
7use crate::job::JobRow;
8use chrono::{DateTime, Utc};
9use croner::Cron;
10use sqlx::PgExecutor;
11
12/// A periodic job schedule definition.
13///
14/// Created via `PeriodicJob::builder(name, cron_expr).build(args)`.
15#[derive(Debug, Clone)]
16pub struct PeriodicJob {
17    /// Unique name identifying this schedule (e.g., "daily_report").
18    pub name: String,
19    /// Cron expression (e.g., "0 9 * * *").
20    pub cron_expr: String,
21    /// IANA timezone (e.g., "Pacific/Auckland"). Defaults to "UTC".
22    pub timezone: String,
23    /// Job kind (derived from JobArgs trait).
24    pub kind: String,
25    /// Target queue. Defaults to "default".
26    pub queue: String,
27    /// Serialized job arguments.
28    pub args: serde_json::Value,
29    /// Job priority (1-4). Defaults to 2.
30    pub priority: i16,
31    /// Max retry attempts. Defaults to 25.
32    pub max_attempts: i16,
33    /// Tags attached to created jobs.
34    pub tags: Vec<String>,
35    /// Extra metadata merged into created jobs.
36    pub metadata: serde_json::Value,
37}
38
39impl PeriodicJob {
40    /// Start building a periodic job with a name and cron expression.
41    ///
42    /// The cron expression is validated eagerly — invalid expressions
43    /// cause `build()` to return an error.
44    pub fn builder(name: impl Into<String>, cron_expr: impl Into<String>) -> PeriodicJobBuilder {
45        PeriodicJobBuilder {
46            name: name.into(),
47            cron_expr: cron_expr.into(),
48            timezone: "UTC".to_string(),
49            queue: "default".to_string(),
50            priority: 2,
51            max_attempts: 25,
52            tags: Vec::new(),
53            metadata: serde_json::json!({}),
54        }
55    }
56
57    /// Compute the latest fire time <= `now` that is strictly after `after`.
58    ///
59    /// Returns `None` if no fire time exists in the range (after, now].
60    /// This handles both "first registration" (after=None → find latest past fire)
61    /// and "regular evaluation" (after=Some(last_enqueued_at)).
62    pub fn latest_fire_time(
63        &self,
64        now: DateTime<Utc>,
65        after: Option<DateTime<Utc>>,
66    ) -> Option<DateTime<Utc>> {
67        let cron = Cron::new(&self.cron_expr)
68            .parse()
69            .expect("cron_expr was validated at build time");
70
71        let tz: chrono_tz::Tz = self
72            .timezone
73            .parse()
74            .expect("timezone was validated at build time");
75
76        let now_tz = now.with_timezone(&tz);
77
78        // Walk backwards from now to find the most recent fire time.
79        // croner doesn't have a "previous" iterator, so we find the fire time
80        // by iterating forward from a start point.
81        let search_start = match after {
82            Some(after_time) => after_time.with_timezone(&tz),
83            // For first registration, search from 24h ago to avoid unbounded iteration
84            None => now_tz - chrono::Duration::hours(24),
85        };
86
87        let mut latest_fire: Option<DateTime<Utc>> = None;
88
89        // Iterate forward from search_start, collecting fire times <= now
90        for fire_time in cron.clone().iter_from(search_start) {
91            let fire_utc = fire_time.with_timezone(&Utc);
92
93            // Stop once we've passed now
94            if fire_utc > now {
95                break;
96            }
97
98            // Skip fires at or before the `after` boundary
99            if let Some(after_time) = after {
100                if fire_utc <= after_time {
101                    continue;
102                }
103            }
104
105            latest_fire = Some(fire_utc);
106        }
107
108        latest_fire
109    }
110}
111
112/// Builder for `PeriodicJob`.
113#[derive(Debug, Clone)]
114pub struct PeriodicJobBuilder {
115    name: String,
116    cron_expr: String,
117    timezone: String,
118    queue: String,
119    priority: i16,
120    max_attempts: i16,
121    tags: Vec<String>,
122    metadata: serde_json::Value,
123}
124
125impl PeriodicJobBuilder {
126    /// Set the IANA timezone (e.g., "Pacific/Auckland").
127    pub fn timezone(mut self, timezone: impl Into<String>) -> Self {
128        self.timezone = timezone.into();
129        self
130    }
131
132    /// Set the target queue.
133    pub fn queue(mut self, queue: impl Into<String>) -> Self {
134        self.queue = queue.into();
135        self
136    }
137
138    /// Set the job priority (1-4).
139    pub fn priority(mut self, priority: i16) -> Self {
140        self.priority = priority;
141        self
142    }
143
144    /// Set the max retry attempts.
145    pub fn max_attempts(mut self, max_attempts: i16) -> Self {
146        self.max_attempts = max_attempts;
147        self
148    }
149
150    /// Set tags for created jobs.
151    pub fn tags(mut self, tags: Vec<String>) -> Self {
152        self.tags = tags;
153        self
154    }
155
156    /// Set extra metadata for created jobs.
157    pub fn metadata(mut self, metadata: serde_json::Value) -> Self {
158        self.metadata = metadata;
159        self
160    }
161
162    /// Build the periodic job, validating the cron expression and timezone.
163    ///
164    /// The `args` parameter must implement `JobArgs` — the kind is derived
165    /// from the type and args are serialized to JSON.
166    pub fn build(self, args: &impl crate::JobArgs) -> Result<PeriodicJob, AwaError> {
167        self.build_raw(args.kind_str().to_string(), args.to_args()?)
168    }
169
170    /// Build from raw kind and args JSON (used by Python bindings).
171    pub fn build_raw(self, kind: String, args: serde_json::Value) -> Result<PeriodicJob, AwaError> {
172        // Validate cron expression
173        Cron::new(&self.cron_expr)
174            .parse()
175            .map_err(|err| AwaError::Validation(format!("invalid cron expression: {err}")))?;
176
177        // Validate timezone
178        self.timezone
179            .parse::<chrono_tz::Tz>()
180            .map_err(|err| AwaError::Validation(format!("invalid timezone: {err}")))?;
181
182        // Validate priority
183        if !(1..=4).contains(&self.priority) {
184            return Err(AwaError::Validation(format!(
185                "priority must be between 1 and 4, got {}",
186                self.priority
187            )));
188        }
189
190        // Validate max_attempts
191        if !(1..=1000).contains(&self.max_attempts) {
192            return Err(AwaError::Validation(format!(
193                "max_attempts must be between 1 and 1000, got {}",
194                self.max_attempts
195            )));
196        }
197
198        Ok(PeriodicJob {
199            name: self.name,
200            cron_expr: self.cron_expr,
201            timezone: self.timezone,
202            kind,
203            queue: self.queue,
204            args,
205            priority: self.priority,
206            max_attempts: self.max_attempts,
207            tags: self.tags,
208            metadata: self.metadata,
209        })
210    }
211}
212
213/// A row from the `awa.cron_jobs` table.
214#[derive(Debug, Clone, sqlx::FromRow)]
215pub struct CronJobRow {
216    pub name: String,
217    pub cron_expr: String,
218    pub timezone: String,
219    pub kind: String,
220    pub queue: String,
221    pub args: serde_json::Value,
222    pub priority: i16,
223    pub max_attempts: i16,
224    pub tags: Vec<String>,
225    pub metadata: serde_json::Value,
226    pub last_enqueued_at: Option<DateTime<Utc>>,
227    pub created_at: DateTime<Utc>,
228    pub updated_at: DateTime<Utc>,
229}
230
231/// Upsert a periodic job schedule into `awa.cron_jobs`.
232///
233/// Additive only — never deletes rows not in the input set.
234pub async fn upsert_cron_job<'e, E>(executor: E, job: &PeriodicJob) -> Result<(), AwaError>
235where
236    E: PgExecutor<'e>,
237{
238    sqlx::query(
239        r#"
240        INSERT INTO awa.cron_jobs (name, cron_expr, timezone, kind, queue, args, priority, max_attempts, tags, metadata)
241        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
242        ON CONFLICT (name) DO UPDATE SET
243            cron_expr = EXCLUDED.cron_expr,
244            timezone = EXCLUDED.timezone,
245            kind = EXCLUDED.kind,
246            queue = EXCLUDED.queue,
247            args = EXCLUDED.args,
248            priority = EXCLUDED.priority,
249            max_attempts = EXCLUDED.max_attempts,
250            tags = EXCLUDED.tags,
251            metadata = EXCLUDED.metadata,
252            updated_at = now()
253        "#,
254    )
255    .bind(&job.name)
256    .bind(&job.cron_expr)
257    .bind(&job.timezone)
258    .bind(&job.kind)
259    .bind(&job.queue)
260    .bind(&job.args)
261    .bind(job.priority)
262    .bind(job.max_attempts)
263    .bind(&job.tags)
264    .bind(&job.metadata)
265    .execute(executor)
266    .await?;
267
268    Ok(())
269}
270
271/// Load all cron job rows from `awa.cron_jobs`.
272pub async fn list_cron_jobs<'e, E>(executor: E) -> Result<Vec<CronJobRow>, AwaError>
273where
274    E: PgExecutor<'e>,
275{
276    let rows = sqlx::query_as::<_, CronJobRow>("SELECT * FROM awa.cron_jobs ORDER BY name")
277        .fetch_all(executor)
278        .await?;
279    Ok(rows)
280}
281
282/// Delete a cron job schedule by name.
283pub async fn delete_cron_job<'e, E>(executor: E, name: &str) -> Result<bool, AwaError>
284where
285    E: PgExecutor<'e>,
286{
287    let result = sqlx::query("DELETE FROM awa.cron_jobs WHERE name = $1")
288        .bind(name)
289        .execute(executor)
290        .await?;
291    Ok(result.rows_affected() > 0)
292}
293
294/// Atomically mark a cron job as enqueued AND insert the resulting job.
295///
296/// Uses a single CTE so that both the UPDATE and INSERT happen in one
297/// atomic operation. If the process crashes mid-transaction, Postgres
298/// rolls back both. If another leader already claimed this fire time
299/// (last_enqueued_at no longer matches), the UPDATE matches 0 rows
300/// and the INSERT produces nothing.
301///
302/// Returns the inserted job row, or `None` if the fire was already claimed.
303pub async fn atomic_enqueue<'e, E>(
304    executor: E,
305    cron_name: &str,
306    fire_time: DateTime<Utc>,
307    previous_enqueued_at: Option<DateTime<Utc>>,
308) -> Result<Option<JobRow>, AwaError>
309where
310    E: PgExecutor<'e>,
311{
312    let row = sqlx::query_as::<_, JobRow>(
313        r#"
314        WITH mark AS (
315            UPDATE awa.cron_jobs
316            SET last_enqueued_at = $2, updated_at = now()
317            WHERE name = $1
318              AND (last_enqueued_at IS NOT DISTINCT FROM $3)
319            RETURNING name, kind, queue, args, priority, max_attempts, tags, metadata
320        )
321        INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, tags, metadata)
322        SELECT kind, queue, args, 'available', priority, max_attempts, tags,
323               metadata || jsonb_build_object('cron_name', name, 'cron_fire_time', $2::text)
324        FROM mark
325        RETURNING *
326        "#,
327    )
328    .bind(cron_name)
329    .bind(fire_time)
330    .bind(previous_enqueued_at)
331    .fetch_optional(executor)
332    .await?;
333
334    Ok(row)
335}
336
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Build a `PeriodicJob` directly (bypassing builder validation) for fire-time tests.
    fn make_periodic(cron_expr: &str, timezone: &str) -> PeriodicJob {
        PeriodicJob {
            name: "test".to_string(),
            cron_expr: cron_expr.to_string(),
            timezone: timezone.to_string(),
            kind: "test_job".to_string(),
            queue: "default".to_string(),
            args: serde_json::json!({}),
            priority: 2,
            max_attempts: 25,
            tags: vec![],
            metadata: serde_json::json!({}),
        }
    }

    #[test]
    fn test_valid_cron_expression() {
        let result = PeriodicJob::builder("test", "0 9 * * *")
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_ok());
    }

    #[test]
    fn test_invalid_cron_expression() {
        let result = PeriodicJob::builder("test", "not a cron")
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("invalid cron expression"),
            "got: {err}"
        );
    }

    #[test]
    fn test_invalid_timezone() {
        let result = PeriodicJob::builder("test", "0 9 * * *")
            .timezone("Not/A/Timezone")
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("invalid timezone"), "got: {err}");
    }

    #[test]
    fn test_builder_defaults() {
        let job = PeriodicJob::builder("daily_report", "0 9 * * *")
            .build_raw(
                "daily_report".to_string(),
                serde_json::json!({"format": "pdf"}),
            )
            .unwrap();
        assert_eq!(job.name, "daily_report");
        assert_eq!(job.timezone, "UTC");
        assert_eq!(job.queue, "default");
        assert_eq!(job.priority, 2);
        assert_eq!(job.max_attempts, 25);
        assert!(job.tags.is_empty());
    }

    #[test]
    fn test_builder_custom_fields() {
        let job = PeriodicJob::builder("report", "0 9 * * *")
            .timezone("Pacific/Auckland")
            .queue("reports")
            .priority(1)
            .max_attempts(3)
            .tags(vec!["important".to_string()])
            .metadata(serde_json::json!({"source": "cron"}))
            .build_raw("daily_report".to_string(), serde_json::json!({}))
            .unwrap();
        assert_eq!(job.timezone, "Pacific/Auckland");
        assert_eq!(job.queue, "reports");
        assert_eq!(job.priority, 1);
        assert_eq!(job.max_attempts, 3);
        assert_eq!(job.tags, vec!["important"]);
    }

    #[test]
    fn test_latest_fire_time_finds_past_fire() {
        // Every hour at :00
        let pj = make_periodic("0 * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 13, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap())
        );
    }

    #[test]
    fn test_no_fire_when_next_is_future() {
        // Every hour at :00
        let pj = make_periodic("0 * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
        // Already fired at 14:00
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        assert!(fire.is_none(), "Should not fire until 15:00");
    }

    #[test]
    fn test_first_registration_null_last_enqueued() {
        // Every hour at :00, registered at 14:35 with no previous fire
        let pj = make_periodic("0 * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();

        let fire = pj.latest_fire_time(now, None);
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap()),
            "Should enqueue the most recent past fire on first registration"
        );
    }

    #[test]
    fn test_first_registration_only_looks_back_24h() {
        // Mondays at 09:00 UTC. 2025-06-15 is a Sunday, so the latest fire
        // (Monday 2025-06-09 09:00) is more than 24h before `now`.
        let pj = make_periodic("0 9 * * 1", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();

        assert!(
            pj.latest_fire_time(now, None).is_none(),
            "First registration should not look back further than 24h"
        );

        // An explicit `after` is not subject to the 24h bound.
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 1, 0, 0, 0).unwrap());
        assert_eq!(
            pj.latest_fire_time(now, after),
            Some(Utc.with_ymd_and_hms(2025, 6, 9, 9, 0, 0).unwrap())
        );
    }

    #[test]
    fn test_no_backfill_only_latest_fire() {
        // Every minute, last enqueued 1 hour ago
        let pj = make_periodic("* * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        // Should return 15:00, not 14:01 — only the latest missed fire
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap())
        );
    }

    #[test]
    fn test_timezone_aware_fire_time() {
        // 9 AM daily in Auckland timezone
        let pj = make_periodic("0 9 * * *", "Pacific/Auckland");
        // It's 2025-06-15 21:30 UTC = 2025-06-16 09:30 NZST
        // So 09:00 NZST on June 16 = 21:00 UTC on June 15
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 21, 30, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 14, 21, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        // 09:00 NZST on June 16 = 21:00 UTC on June 15
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 21, 0, 0).unwrap())
        );
    }

    #[test]
    fn test_dst_spring_forward() {
        // 2:30 AM US/Eastern does not exist on March 9 2025 (clocks jump 2:00 -> 3:00).
        // croner may skip that day's fire or move it to a later valid time. Either
        // way there must be at most one fire, and it must lie inside (after, now].
        let pj = make_periodic("30 2 * * *", "US/Eastern");
        let now = Utc.with_ymd_and_hms(2025, 3, 9, 12, 0, 0).unwrap();
        let after = Utc.with_ymd_and_hms(2025, 3, 8, 12, 0, 0).unwrap();

        if let Some(fire_time) = pj.latest_fire_time(now, Some(after)) {
            assert!(
                fire_time > after && fire_time <= now,
                "fire {fire_time} outside the evaluated window"
            );
            assert!(
                pj.latest_fire_time(now, Some(fire_time)).is_none(),
                "Should fire at most once during spring-forward"
            );
        }
    }

    #[test]
    fn test_dst_fall_back() {
        // 1:30 AM US/Eastern on Nov 2 2025 — clocks fall back from 2:00 to 1:00
        // 1:30 AM happens twice. Should fire exactly once.
        let pj = make_periodic("30 1 * * *", "US/Eastern");
        let now = Utc.with_ymd_and_hms(2025, 11, 2, 12, 0, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 11, 1, 12, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        assert!(fire.is_some(), "Should fire once during fall-back");

        // Verify it's only one fire by checking that after this fire, no more fires exist
        let fire_time = fire.unwrap();
        let second_fire = pj.latest_fire_time(now, Some(fire_time));
        assert!(
            second_fire.is_none(),
            "Should not fire a second time during fall-back"
        );
    }

    #[test]
    fn test_invalid_priority() {
        for priority in [0, 5] {
            let result = PeriodicJob::builder("test", "0 9 * * *")
                .priority(priority)
                .build_raw("test_job".to_string(), serde_json::json!({}));
            let err = result.expect_err("out-of-range priority must be rejected");
            assert!(
                err.to_string()
                    .contains("priority must be between 1 and 4"),
                "got: {err}"
            );
        }
    }

    #[test]
    fn test_invalid_max_attempts() {
        for max_attempts in [0, 1001] {
            let result = PeriodicJob::builder("test", "0 9 * * *")
                .max_attempts(max_attempts)
                .build_raw("test_job".to_string(), serde_json::json!({}));
            let err = result.expect_err("out-of-range max_attempts must be rejected");
            assert!(
                err.to_string()
                    .contains("max_attempts must be between 1 and 1000"),
                "got: {err}"
            );
        }
    }

    #[test]
    fn test_priority_and_max_attempts_bounds_are_inclusive() {
        for (priority, max_attempts) in [(1, 1), (4, 1000)] {
            let result = PeriodicJob::builder("test", "0 9 * * *")
                .priority(priority)
                .max_attempts(max_attempts)
                .build_raw("test_job".to_string(), serde_json::json!({}));
            assert!(
                result.is_ok(),
                "priority={priority} max_attempts={max_attempts} should be accepted"
            );
        }
    }
}