Skip to main content

awa_model/
cron.rs

1//! Periodic/cron job types and database operations.
2//!
3//! Schedules are defined in application code, synced to `awa.cron_jobs` via UPSERT,
4//! and evaluated by the leader to atomically enqueue jobs.
5
6use crate::error::AwaError;
7use crate::job::JobRow;
8use chrono::{DateTime, Utc};
9use croner::Cron;
10use serde::Serialize;
11use sqlx::PgExecutor;
12
/// A periodic job schedule definition.
///
/// Normally created via `PeriodicJob::builder(name, cron_expr).build(args)`,
/// which validates the cron expression, timezone, priority and
/// `max_attempts`. Every field is public, so a value can also be assembled
/// by hand; doing so skips that validation.
#[derive(Debug, Clone)]
pub struct PeriodicJob {
    /// Unique name identifying this schedule (e.g., "daily_report").
    pub name: String,
    /// Cron expression (e.g., "0 9 * * *").
    pub cron_expr: String,
    /// IANA timezone (e.g., "Pacific/Auckland") in which `cron_expr` is
    /// evaluated. The builder defaults this to "UTC".
    pub timezone: String,
    /// Job kind (derived from the `JobArgs` trait when built via `build`).
    pub kind: String,
    /// Target queue. The builder defaults this to "default".
    pub queue: String,
    /// Serialized job arguments.
    pub args: serde_json::Value,
    /// Job priority. The builder accepts 1 through 4 and defaults to 2.
    pub priority: i16,
    /// Max retry attempts. The builder accepts 1 through 1000 and defaults to 25.
    pub max_attempts: i16,
    /// Tags attached to created jobs. Empty by default.
    pub tags: Vec<String>,
    /// Extra metadata merged into created jobs. The builder defaults this to
    /// an empty JSON object.
    pub metadata: serde_json::Value,
}
39
40impl PeriodicJob {
41    /// Start building a periodic job with a name and cron expression.
42    ///
43    /// The cron expression is validated eagerly — invalid expressions
44    /// cause `build()` to return an error.
45    pub fn builder(name: impl Into<String>, cron_expr: impl Into<String>) -> PeriodicJobBuilder {
46        PeriodicJobBuilder {
47            name: name.into(),
48            cron_expr: cron_expr.into(),
49            timezone: "UTC".to_string(),
50            queue: "default".to_string(),
51            priority: 2,
52            max_attempts: 25,
53            tags: Vec::new(),
54            metadata: serde_json::json!({}),
55        }
56    }
57
58    /// Compute the latest fire time <= `now` that is strictly after `after`.
59    ///
60    /// Returns `None` if no fire time exists in the range (after, now].
61    /// This handles both "first registration" (after=None → find latest past fire)
62    /// and "regular evaluation" (after=Some(last_enqueued_at)).
63    pub fn latest_fire_time(
64        &self,
65        now: DateTime<Utc>,
66        after: Option<DateTime<Utc>>,
67    ) -> Option<DateTime<Utc>> {
68        let cron = Cron::new(&self.cron_expr)
69            .parse()
70            .expect("cron_expr was validated at build time");
71
72        let tz: chrono_tz::Tz = self
73            .timezone
74            .parse()
75            .expect("timezone was validated at build time");
76
77        let now_tz = now.with_timezone(&tz);
78
79        // Walk backwards from now to find the most recent fire time.
80        // croner doesn't have a "previous" iterator, so we find the fire time
81        // by iterating forward from a start point.
82        let search_start = match after {
83            Some(after_time) => after_time.with_timezone(&tz),
84            // For first registration, search from 24h ago to avoid unbounded iteration
85            None => now_tz - chrono::Duration::hours(24),
86        };
87
88        let mut latest_fire: Option<DateTime<Utc>> = None;
89
90        // Iterate forward from search_start, collecting fire times <= now
91        for fire_time in cron.clone().iter_from(search_start) {
92            let fire_utc = fire_time.with_timezone(&Utc);
93
94            // Stop once we've passed now
95            if fire_utc > now {
96                break;
97            }
98
99            // Skip fires at or before the `after` boundary
100            if let Some(after_time) = after {
101                if fire_utc <= after_time {
102                    continue;
103                }
104            }
105
106            latest_fire = Some(fire_utc);
107        }
108
109        latest_fire
110    }
111}
112
/// Builder for `PeriodicJob`.
///
/// Obtained from `PeriodicJob::builder`, which fills in the name, the cron
/// expression and a default for every other field. The setter methods
/// override those defaults; `build` / `build_raw` validate the collected
/// values and produce the `PeriodicJob`.
#[derive(Debug, Clone)]
pub struct PeriodicJobBuilder {
    /// Schedule name.
    name: String,
    /// Cron expression; checked when the job is built.
    cron_expr: String,
    /// IANA timezone name; checked when the job is built.
    timezone: String,
    /// Target queue.
    queue: String,
    /// Job priority; must be within 1..=4 when the job is built.
    priority: i16,
    /// Max retry attempts; must be within 1..=1000 when the job is built.
    max_attempts: i16,
    /// Tags attached to created jobs.
    tags: Vec<String>,
    /// Extra metadata for created jobs.
    metadata: serde_json::Value,
}
125
126impl PeriodicJobBuilder {
127    /// Set the IANA timezone (e.g., "Pacific/Auckland").
128    pub fn timezone(mut self, timezone: impl Into<String>) -> Self {
129        self.timezone = timezone.into();
130        self
131    }
132
133    /// Set the target queue.
134    pub fn queue(mut self, queue: impl Into<String>) -> Self {
135        self.queue = queue.into();
136        self
137    }
138
139    /// Set the job priority (1-4).
140    pub fn priority(mut self, priority: i16) -> Self {
141        self.priority = priority;
142        self
143    }
144
145    /// Set the max retry attempts.
146    pub fn max_attempts(mut self, max_attempts: i16) -> Self {
147        self.max_attempts = max_attempts;
148        self
149    }
150
151    /// Set tags for created jobs.
152    pub fn tags(mut self, tags: Vec<String>) -> Self {
153        self.tags = tags;
154        self
155    }
156
157    /// Set extra metadata for created jobs.
158    pub fn metadata(mut self, metadata: serde_json::Value) -> Self {
159        self.metadata = metadata;
160        self
161    }
162
163    /// Build the periodic job, validating the cron expression and timezone.
164    ///
165    /// The `args` parameter must implement `JobArgs` — the kind is derived
166    /// from the type and args are serialized to JSON.
167    pub fn build(self, args: &impl crate::JobArgs) -> Result<PeriodicJob, AwaError> {
168        self.build_raw(args.kind_str().to_string(), args.to_args()?)
169    }
170
171    /// Build from raw kind and args JSON (used by Python bindings).
172    pub fn build_raw(self, kind: String, args: serde_json::Value) -> Result<PeriodicJob, AwaError> {
173        // Validate cron expression
174        Cron::new(&self.cron_expr)
175            .parse()
176            .map_err(|err| AwaError::Validation(format!("invalid cron expression: {err}")))?;
177
178        // Validate timezone
179        self.timezone
180            .parse::<chrono_tz::Tz>()
181            .map_err(|err| AwaError::Validation(format!("invalid timezone: {err}")))?;
182
183        // Validate priority
184        if !(1..=4).contains(&self.priority) {
185            return Err(AwaError::Validation(format!(
186                "priority must be between 1 and 4, got {}",
187                self.priority
188            )));
189        }
190
191        // Validate max_attempts
192        if !(1..=1000).contains(&self.max_attempts) {
193            return Err(AwaError::Validation(format!(
194                "max_attempts must be between 1 and 1000, got {}",
195                self.max_attempts
196            )));
197        }
198
199        Ok(PeriodicJob {
200            name: self.name,
201            cron_expr: self.cron_expr,
202            timezone: self.timezone,
203            kind,
204            queue: self.queue,
205            args,
206            priority: self.priority,
207            max_attempts: self.max_attempts,
208            tags: self.tags,
209            metadata: self.metadata,
210        })
211    }
212}
213
/// A row from the `awa.cron_jobs` table.
///
/// The first ten fields mirror `PeriodicJob` and are the columns written by
/// `upsert_cron_job`; the remaining three are bookkeeping columns.
#[derive(Debug, Clone, sqlx::FromRow, Serialize)]
pub struct CronJobRow {
    /// Schedule name; the conflict key used by `upsert_cron_job`.
    pub name: String,
    /// Cron expression of the schedule.
    pub cron_expr: String,
    /// IANA timezone name of the schedule.
    pub timezone: String,
    /// Kind given to jobs created from this schedule.
    pub kind: String,
    /// Queue that created jobs are inserted into.
    pub queue: String,
    /// Argument payload copied into created jobs.
    pub args: serde_json::Value,
    /// Priority copied into created jobs.
    pub priority: i16,
    /// Max attempts copied into created jobs.
    pub max_attempts: i16,
    /// Tags copied into created jobs.
    pub tags: Vec<String>,
    /// Metadata that created jobs start from.
    pub metadata: serde_json::Value,
    /// Fire time most recently recorded by `atomic_enqueue`.
    /// NOTE(review): presumably NULL until the first enqueue, since
    /// `upsert_cron_job` never writes this column — confirm against the schema.
    pub last_enqueued_at: Option<DateTime<Utc>>,
    /// NOTE(review): not written by any query in this module; presumably
    /// filled by a column default — confirm against the schema.
    pub created_at: DateTime<Utc>,
    /// Set to `now()` when `upsert_cron_job` updates an existing row and when
    /// `atomic_enqueue` marks a fire.
    pub updated_at: DateTime<Utc>,
}
231
232/// Upsert a periodic job schedule into `awa.cron_jobs`.
233///
234/// Additive only — never deletes rows not in the input set.
235pub async fn upsert_cron_job<'e, E>(executor: E, job: &PeriodicJob) -> Result<(), AwaError>
236where
237    E: PgExecutor<'e>,
238{
239    sqlx::query(
240        r#"
241        INSERT INTO awa.cron_jobs (name, cron_expr, timezone, kind, queue, args, priority, max_attempts, tags, metadata)
242        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
243        ON CONFLICT (name) DO UPDATE SET
244            cron_expr = EXCLUDED.cron_expr,
245            timezone = EXCLUDED.timezone,
246            kind = EXCLUDED.kind,
247            queue = EXCLUDED.queue,
248            args = EXCLUDED.args,
249            priority = EXCLUDED.priority,
250            max_attempts = EXCLUDED.max_attempts,
251            tags = EXCLUDED.tags,
252            metadata = EXCLUDED.metadata,
253            updated_at = now()
254        "#,
255    )
256    .bind(&job.name)
257    .bind(&job.cron_expr)
258    .bind(&job.timezone)
259    .bind(&job.kind)
260    .bind(&job.queue)
261    .bind(&job.args)
262    .bind(job.priority)
263    .bind(job.max_attempts)
264    .bind(&job.tags)
265    .bind(&job.metadata)
266    .execute(executor)
267    .await?;
268
269    Ok(())
270}
271
272/// Load all cron job rows from `awa.cron_jobs`.
273pub async fn list_cron_jobs<'e, E>(executor: E) -> Result<Vec<CronJobRow>, AwaError>
274where
275    E: PgExecutor<'e>,
276{
277    let rows = sqlx::query_as::<_, CronJobRow>("SELECT * FROM awa.cron_jobs ORDER BY name")
278        .fetch_all(executor)
279        .await?;
280    Ok(rows)
281}
282
283/// Delete a cron job schedule by name.
284pub async fn delete_cron_job<'e, E>(executor: E, name: &str) -> Result<bool, AwaError>
285where
286    E: PgExecutor<'e>,
287{
288    let result = sqlx::query("DELETE FROM awa.cron_jobs WHERE name = $1")
289        .bind(name)
290        .execute(executor)
291        .await?;
292    Ok(result.rows_affected() > 0)
293}
294
295/// Atomically mark a cron job as enqueued AND insert the resulting job.
296///
297/// Uses a single CTE so that both the UPDATE and INSERT happen in one
298/// atomic operation. If the process crashes mid-transaction, Postgres
299/// rolls back both. If another leader already claimed this fire time
300/// (last_enqueued_at no longer matches), the UPDATE matches 0 rows
301/// and the INSERT produces nothing.
302///
303/// Returns the inserted job row, or `None` if the fire was already claimed.
304pub async fn atomic_enqueue<'e, E>(
305    executor: E,
306    cron_name: &str,
307    fire_time: DateTime<Utc>,
308    previous_enqueued_at: Option<DateTime<Utc>>,
309) -> Result<Option<JobRow>, AwaError>
310where
311    E: PgExecutor<'e>,
312{
313    let row = sqlx::query_as::<_, JobRow>(
314        r#"
315        WITH mark AS (
316            UPDATE awa.cron_jobs
317            SET last_enqueued_at = $2, updated_at = now()
318            WHERE name = $1
319              AND (last_enqueued_at IS NOT DISTINCT FROM $3)
320            RETURNING name, kind, queue, args, priority, max_attempts, tags, metadata
321        )
322        INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, tags, metadata)
323        SELECT kind, queue, args, 'available', priority, max_attempts, tags,
324               metadata || jsonb_build_object('cron_name', name, 'cron_fire_time', $2::text)
325        FROM mark
326        RETURNING *
327        "#,
328    )
329    .bind(cron_name)
330    .bind(fire_time)
331    .bind(previous_enqueued_at)
332    .fetch_optional(executor)
333    .await?;
334
335    Ok(row)
336}
337
338/// Trigger an immediate run of a cron job without updating last_enqueued_at.
339///
340/// Reads the cron job config from `awa.cron_jobs` and inserts a new job
341/// directly. Does NOT update `last_enqueued_at` so the normal schedule
342/// is unaffected.
343pub async fn trigger_cron_job<'e, E>(executor: E, name: &str) -> Result<JobRow, AwaError>
344where
345    E: PgExecutor<'e>,
346{
347    let row = sqlx::query_as::<_, JobRow>(
348        r#"
349        WITH cron AS (
350            SELECT name, kind, queue, args, priority, max_attempts, tags, metadata
351            FROM awa.cron_jobs
352            WHERE name = $1
353        )
354        INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, tags, metadata)
355        SELECT kind, queue, args, 'available', priority, max_attempts, tags,
356               metadata || jsonb_build_object('cron_name', name, 'triggered_manually', true)
357        FROM cron
358        RETURNING *
359        "#,
360    )
361    .bind(name)
362    .fetch_optional(executor)
363    .await?;
364
365    row.ok_or_else(|| AwaError::Validation(format!("cron job not found: {name}")))
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Build a `PeriodicJob` directly (bypassing the builder's validation)
    /// with the given schedule and otherwise fixed, default-like fields.
    fn make_periodic(cron_expr: &str, timezone: &str) -> PeriodicJob {
        PeriodicJob {
            name: "test".to_string(),
            cron_expr: cron_expr.to_string(),
            timezone: timezone.to_string(),
            kind: "test_job".to_string(),
            queue: "default".to_string(),
            args: serde_json::json!({}),
            priority: 2,
            max_attempts: 25,
            tags: vec![],
            metadata: serde_json::json!({}),
        }
    }

    #[test]
    fn test_valid_cron_expression() {
        let result = PeriodicJob::builder("test", "0 9 * * *")
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_ok());
    }

    #[test]
    fn test_invalid_cron_expression() {
        let result = PeriodicJob::builder("test", "not a cron")
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("invalid cron expression"),
            "got: {err}"
        );
    }

    #[test]
    fn test_invalid_timezone() {
        let result = PeriodicJob::builder("test", "0 9 * * *")
            .timezone("Not/A/Timezone")
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("invalid timezone"), "got: {err}");
    }

    #[test]
    fn test_builder_defaults() {
        let job = PeriodicJob::builder("daily_report", "0 9 * * *")
            .build_raw(
                "daily_report".to_string(),
                serde_json::json!({"format": "pdf"}),
            )
            .unwrap();
        assert_eq!(job.name, "daily_report");
        assert_eq!(job.cron_expr, "0 9 * * *");
        assert_eq!(job.kind, "daily_report");
        assert_eq!(job.args, serde_json::json!({"format": "pdf"}));
        assert_eq!(job.timezone, "UTC");
        assert_eq!(job.queue, "default");
        assert_eq!(job.priority, 2);
        assert_eq!(job.max_attempts, 25);
        assert!(job.tags.is_empty());
        assert_eq!(job.metadata, serde_json::json!({}));
    }

    #[test]
    fn test_builder_custom_fields() {
        let job = PeriodicJob::builder("report", "0 9 * * *")
            .timezone("Pacific/Auckland")
            .queue("reports")
            .priority(1)
            .max_attempts(3)
            .tags(vec!["important".to_string()])
            .metadata(serde_json::json!({"source": "cron"}))
            .build_raw("daily_report".to_string(), serde_json::json!({}))
            .unwrap();
        assert_eq!(job.timezone, "Pacific/Auckland");
        assert_eq!(job.queue, "reports");
        assert_eq!(job.priority, 1);
        assert_eq!(job.max_attempts, 3);
        assert_eq!(job.tags, vec!["important"]);
        assert_eq!(job.metadata, serde_json::json!({"source": "cron"}));
    }

    #[test]
    fn test_latest_fire_time_finds_past_fire() {
        // Every hour at :00
        let pj = make_periodic("0 * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 13, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap())
        );
    }

    #[test]
    fn test_no_fire_when_next_is_future() {
        // Every hour at :00
        let pj = make_periodic("0 * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
        // Already fired at 14:00
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        assert!(fire.is_none(), "Should not fire until 15:00");
    }

    #[test]
    fn test_first_registration_null_last_enqueued() {
        // Every hour at :00, registered at 14:35 with no previous fire
        let pj = make_periodic("0 * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();

        let fire = pj.latest_fire_time(now, None);
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap()),
            "Should enqueue the most recent past fire on first registration"
        );
    }

    #[test]
    fn test_no_backfill_only_latest_fire() {
        // Every minute, last enqueued 1 hour ago
        let pj = make_periodic("* * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        // Should return 15:00, not 14:01 — only the latest missed fire
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap())
        );
    }

    #[test]
    fn test_timezone_aware_fire_time() {
        // 9 AM daily in Auckland timezone
        let pj = make_periodic("0 9 * * *", "Pacific/Auckland");
        // It's 2025-06-15 21:30 UTC = 2025-06-16 09:30 NZST
        // So 09:00 NZST on June 16 = 21:00 UTC on June 15
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 21, 30, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 14, 21, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        // 09:00 NZST on June 16 = 21:00 UTC on June 15
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 21, 0, 0).unwrap())
        );
    }

    #[test]
    fn test_dst_spring_forward() {
        // 2:30 AM US/Eastern on March 9 2025 — clocks spring forward from 2:00 to 3:00,
        // so 2:30 AM local time does not exist on that day. The cron library may
        // skip the fire entirely or move it to a valid time; this test does not
        // pin down which. It checks what must hold either way: the call does
        // not panic, any reported fire lies inside (after, now], and once that
        // fire is recorded nothing further is due.
        let pj = make_periodic("30 2 * * *", "US/Eastern");
        let now = Utc.with_ymd_and_hms(2025, 3, 9, 12, 0, 0).unwrap();
        let after_time = Utc.with_ymd_and_hms(2025, 3, 8, 12, 0, 0).unwrap();

        let fire = pj.latest_fire_time(now, Some(after_time));

        if let Some(fire_time) = fire {
            assert!(
                fire_time > after_time && fire_time <= now,
                "fire {fire_time} must lie within ({after_time}, {now}]"
            );
            let second_fire = pj.latest_fire_time(now, Some(fire_time));
            assert!(
                second_fire.is_none(),
                "Should fire at most once during spring-forward"
            );
        }
    }

    #[test]
    fn test_dst_fall_back() {
        // 1:30 AM US/Eastern on Nov 2 2025 — clocks fall back from 2:00 to 1:00
        // 1:30 AM happens twice. Should fire exactly once.
        let pj = make_periodic("30 1 * * *", "US/Eastern");
        let now = Utc.with_ymd_and_hms(2025, 11, 2, 12, 0, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 11, 1, 12, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        assert!(fire.is_some(), "Should fire once during fall-back");

        // Verify it's only one fire by checking that after this fire, no more fires exist
        let fire_time = fire.unwrap();
        let second_fire = pj.latest_fire_time(now, Some(fire_time));
        assert!(
            second_fire.is_none(),
            "Should not fire a second time during fall-back"
        );
    }

    #[test]
    fn test_invalid_priority() {
        // Both sides of the accepted 1..=4 range must be rejected.
        for bad_priority in [0, 5] {
            let result = PeriodicJob::builder("test", "0 9 * * *")
                .priority(bad_priority)
                .build_raw("test_job".to_string(), serde_json::json!({}));
            assert!(result.is_err(), "priority {bad_priority} should be rejected");
            let err = result.unwrap_err();
            assert!(
                err.to_string().contains("priority must be between 1 and 4"),
                "got: {err}"
            );
        }
    }

    #[test]
    fn test_invalid_max_attempts() {
        // Both sides of the accepted 1..=1000 range must be rejected.
        for bad_max_attempts in [0, 1001] {
            let result = PeriodicJob::builder("test", "0 9 * * *")
                .max_attempts(bad_max_attempts)
                .build_raw("test_job".to_string(), serde_json::json!({}));
            assert!(
                result.is_err(),
                "max_attempts {bad_max_attempts} should be rejected"
            );
            let err = result.unwrap_err();
            assert!(
                err.to_string()
                    .contains("max_attempts must be between 1 and 1000"),
                "got: {err}"
            );
        }
    }
}