1use crate::error::AwaError;
7use crate::job::JobRow;
8use chrono::{DateTime, Utc};
9use croner::Cron;
10use serde::Serialize;
11use sqlx::PgExecutor;
12
13#[derive(Debug, Clone)]
17pub struct PeriodicJob {
18 pub name: String,
20 pub cron_expr: String,
22 pub timezone: String,
24 pub kind: String,
26 pub queue: String,
28 pub args: serde_json::Value,
30 pub priority: i16,
32 pub max_attempts: i16,
34 pub tags: Vec<String>,
36 pub metadata: serde_json::Value,
38}
39
40impl PeriodicJob {
41 pub fn builder(name: impl Into<String>, cron_expr: impl Into<String>) -> PeriodicJobBuilder {
46 PeriodicJobBuilder {
47 name: name.into(),
48 cron_expr: cron_expr.into(),
49 timezone: "UTC".to_string(),
50 queue: "default".to_string(),
51 priority: 2,
52 max_attempts: 25,
53 tags: Vec::new(),
54 metadata: serde_json::json!({}),
55 }
56 }
57
58 pub fn latest_fire_time(
64 &self,
65 now: DateTime<Utc>,
66 after: Option<DateTime<Utc>>,
67 ) -> Option<DateTime<Utc>> {
68 let cron = Cron::new(&self.cron_expr)
69 .with_seconds_optional()
70 .parse()
71 .expect("cron_expr was validated at build time");
72
73 let tz: chrono_tz::Tz = self
74 .timezone
75 .parse()
76 .expect("timezone was validated at build time");
77
78 let now_tz = now.with_timezone(&tz);
79
80 let search_start = match after {
84 Some(after_time) => after_time.with_timezone(&tz),
85 None => now_tz - chrono::Duration::hours(24),
87 };
88
89 let mut latest_fire: Option<DateTime<Utc>> = None;
90
91 for fire_time in cron.clone().iter_from(search_start) {
93 let fire_utc = fire_time.with_timezone(&Utc);
94
95 if fire_utc > now {
97 break;
98 }
99
100 if let Some(after_time) = after {
102 if fire_utc <= after_time {
103 continue;
104 }
105 }
106
107 latest_fire = Some(fire_utc);
108 }
109
110 latest_fire
111 }
112}
113
114#[derive(Debug, Clone)]
116pub struct PeriodicJobBuilder {
117 name: String,
118 cron_expr: String,
119 timezone: String,
120 queue: String,
121 priority: i16,
122 max_attempts: i16,
123 tags: Vec<String>,
124 metadata: serde_json::Value,
125}
126
127impl PeriodicJobBuilder {
128 pub fn timezone(mut self, timezone: impl Into<String>) -> Self {
130 self.timezone = timezone.into();
131 self
132 }
133
134 pub fn queue(mut self, queue: impl Into<String>) -> Self {
136 self.queue = queue.into();
137 self
138 }
139
140 pub fn priority(mut self, priority: i16) -> Self {
142 self.priority = priority;
143 self
144 }
145
146 pub fn max_attempts(mut self, max_attempts: i16) -> Self {
148 self.max_attempts = max_attempts;
149 self
150 }
151
152 pub fn tags(mut self, tags: Vec<String>) -> Self {
154 self.tags = tags;
155 self
156 }
157
158 pub fn metadata(mut self, metadata: serde_json::Value) -> Self {
160 self.metadata = metadata;
161 self
162 }
163
164 pub fn build(self, args: &impl crate::JobArgs) -> Result<PeriodicJob, AwaError> {
169 self.build_raw(args.kind_str().to_string(), args.to_args()?)
170 }
171
172 pub fn build_raw(self, kind: String, args: serde_json::Value) -> Result<PeriodicJob, AwaError> {
174 Cron::new(&self.cron_expr)
176 .with_seconds_optional()
177 .parse()
178 .map_err(|err| AwaError::Validation(format!("invalid cron expression: {err}")))?;
179
180 self.timezone
182 .parse::<chrono_tz::Tz>()
183 .map_err(|err| AwaError::Validation(format!("invalid timezone: {err}")))?;
184
185 if !(1..=4).contains(&self.priority) {
187 return Err(AwaError::Validation(format!(
188 "priority must be between 1 and 4, got {}",
189 self.priority
190 )));
191 }
192
193 if !(1..=1000).contains(&self.max_attempts) {
195 return Err(AwaError::Validation(format!(
196 "max_attempts must be between 1 and 1000, got {}",
197 self.max_attempts
198 )));
199 }
200
201 Ok(PeriodicJob {
202 name: self.name,
203 cron_expr: self.cron_expr,
204 timezone: self.timezone,
205 kind,
206 queue: self.queue,
207 args,
208 priority: self.priority,
209 max_attempts: self.max_attempts,
210 tags: self.tags,
211 metadata: self.metadata,
212 })
213 }
214}
215
216#[derive(Debug, Clone, sqlx::FromRow, Serialize)]
218pub struct CronJobRow {
219 pub name: String,
220 pub cron_expr: String,
221 pub timezone: String,
222 pub kind: String,
223 pub queue: String,
224 pub args: serde_json::Value,
225 pub priority: i16,
226 pub max_attempts: i16,
227 pub tags: Vec<String>,
228 pub metadata: serde_json::Value,
229 pub last_enqueued_at: Option<DateTime<Utc>>,
230 pub created_at: DateTime<Utc>,
231 pub updated_at: DateTime<Utc>,
232}
233
234pub async fn upsert_cron_job<'e, E>(executor: E, job: &PeriodicJob) -> Result<(), AwaError>
238where
239 E: PgExecutor<'e>,
240{
241 sqlx::query(
242 r#"
243 INSERT INTO awa.cron_jobs (name, cron_expr, timezone, kind, queue, args, priority, max_attempts, tags, metadata)
244 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
245 ON CONFLICT (name) DO UPDATE SET
246 cron_expr = EXCLUDED.cron_expr,
247 timezone = EXCLUDED.timezone,
248 kind = EXCLUDED.kind,
249 queue = EXCLUDED.queue,
250 args = EXCLUDED.args,
251 priority = EXCLUDED.priority,
252 max_attempts = EXCLUDED.max_attempts,
253 tags = EXCLUDED.tags,
254 metadata = EXCLUDED.metadata,
255 updated_at = now()
256 "#,
257 )
258 .bind(&job.name)
259 .bind(&job.cron_expr)
260 .bind(&job.timezone)
261 .bind(&job.kind)
262 .bind(&job.queue)
263 .bind(&job.args)
264 .bind(job.priority)
265 .bind(job.max_attempts)
266 .bind(&job.tags)
267 .bind(&job.metadata)
268 .execute(executor)
269 .await?;
270
271 Ok(())
272}
273
274pub fn next_fire_time(cron_expr: &str, timezone: &str) -> Option<DateTime<Utc>> {
278 next_fire_time_after(cron_expr, timezone, Utc::now())
279}
280
281pub fn next_fire_time_after(
286 cron_expr: &str,
287 timezone: &str,
288 after: DateTime<Utc>,
289) -> Option<DateTime<Utc>> {
290 let cron = Cron::new(cron_expr).with_seconds_optional().parse().ok()?;
291 let tz: chrono_tz::Tz = timezone.parse().ok()?;
292 let after_tz = after.with_timezone(&tz);
293 let next = cron.iter_from(after_tz).next()?;
294 Some(next.with_timezone(&Utc))
295}
296
297pub async fn list_cron_jobs<'e, E>(executor: E) -> Result<Vec<CronJobRow>, AwaError>
299where
300 E: PgExecutor<'e>,
301{
302 let rows = sqlx::query_as::<_, CronJobRow>("SELECT * FROM awa.cron_jobs ORDER BY name")
303 .fetch_all(executor)
304 .await?;
305 Ok(rows)
306}
307
308pub async fn delete_cron_job<'e, E>(executor: E, name: &str) -> Result<bool, AwaError>
310where
311 E: PgExecutor<'e>,
312{
313 let result = sqlx::query("DELETE FROM awa.cron_jobs WHERE name = $1")
314 .bind(name)
315 .execute(executor)
316 .await?;
317 Ok(result.rows_affected() > 0)
318}
319
320pub async fn atomic_enqueue<'e, E>(
330 executor: E,
331 cron_name: &str,
332 fire_time: DateTime<Utc>,
333 previous_enqueued_at: Option<DateTime<Utc>>,
334) -> Result<Option<JobRow>, AwaError>
335where
336 E: PgExecutor<'e>,
337{
338 let row = sqlx::query_as::<_, JobRow>(
339 r#"
340 WITH mark AS (
341 UPDATE awa.cron_jobs
342 SET last_enqueued_at = $2, updated_at = now()
343 WHERE name = $1
344 AND (last_enqueued_at IS NOT DISTINCT FROM $3)
345 RETURNING name, kind, queue, args, priority, max_attempts, tags, metadata
346 )
347 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, tags, metadata)
348 SELECT kind, queue, args, 'available', priority, max_attempts, tags,
349 metadata || jsonb_build_object('cron_name', name, 'cron_fire_time', $2::text)
350 FROM mark
351 RETURNING *
352 "#,
353 )
354 .bind(cron_name)
355 .bind(fire_time)
356 .bind(previous_enqueued_at)
357 .fetch_optional(executor)
358 .await?;
359
360 Ok(row)
361}
362
363pub async fn trigger_cron_job<'e, E>(executor: E, name: &str) -> Result<JobRow, AwaError>
369where
370 E: PgExecutor<'e>,
371{
372 let row = sqlx::query_as::<_, JobRow>(
373 r#"
374 WITH cron AS (
375 SELECT name, kind, queue, args, priority, max_attempts, tags, metadata
376 FROM awa.cron_jobs
377 WHERE name = $1
378 )
379 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, tags, metadata)
380 SELECT kind, queue, args, 'available', priority, max_attempts, tags,
381 metadata || jsonb_build_object('cron_name', name, 'triggered_manually', true)
382 FROM cron
383 RETURNING *
384 "#,
385 )
386 .bind(name)
387 .fetch_optional(executor)
388 .await?;
389
390 row.ok_or_else(|| AwaError::Validation(format!("cron job not found: {name}")))
391}
392
393#[cfg(test)]
394mod tests {
395 use super::*;
396 use chrono::TimeZone;
397
398 fn make_periodic(cron_expr: &str, timezone: &str) -> PeriodicJob {
399 PeriodicJob {
400 name: "test".to_string(),
401 cron_expr: cron_expr.to_string(),
402 timezone: timezone.to_string(),
403 kind: "test_job".to_string(),
404 queue: "default".to_string(),
405 args: serde_json::json!({}),
406 priority: 2,
407 max_attempts: 25,
408 tags: vec![],
409 metadata: serde_json::json!({}),
410 }
411 }
412
413 #[test]
414 fn test_valid_cron_expression() {
415 let result = PeriodicJob::builder("test", "0 9 * * *")
416 .build_raw("test_job".to_string(), serde_json::json!({}));
417 assert!(result.is_ok());
418 }
419
420 #[test]
421 fn test_invalid_cron_expression() {
422 let result = PeriodicJob::builder("test", "not a cron")
423 .build_raw("test_job".to_string(), serde_json::json!({}));
424 assert!(result.is_err());
425 let err = result.unwrap_err();
426 assert!(
427 err.to_string().contains("invalid cron expression"),
428 "got: {err}"
429 );
430 }
431
432 #[test]
433 fn test_invalid_timezone() {
434 let result = PeriodicJob::builder("test", "0 9 * * *")
435 .timezone("Not/A/Timezone")
436 .build_raw("test_job".to_string(), serde_json::json!({}));
437 assert!(result.is_err());
438 let err = result.unwrap_err();
439 assert!(err.to_string().contains("invalid timezone"), "got: {err}");
440 }
441
442 #[test]
443 fn test_builder_defaults() {
444 let job = PeriodicJob::builder("daily_report", "0 9 * * *")
445 .build_raw(
446 "daily_report".to_string(),
447 serde_json::json!({"format": "pdf"}),
448 )
449 .unwrap();
450 assert_eq!(job.name, "daily_report");
451 assert_eq!(job.timezone, "UTC");
452 assert_eq!(job.queue, "default");
453 assert_eq!(job.priority, 2);
454 assert_eq!(job.max_attempts, 25);
455 assert!(job.tags.is_empty());
456 }
457
458 #[test]
459 fn test_builder_custom_fields() {
460 let job = PeriodicJob::builder("report", "0 9 * * *")
461 .timezone("Pacific/Auckland")
462 .queue("reports")
463 .priority(1)
464 .max_attempts(3)
465 .tags(vec!["important".to_string()])
466 .metadata(serde_json::json!({"source": "cron"}))
467 .build_raw("daily_report".to_string(), serde_json::json!({}))
468 .unwrap();
469 assert_eq!(job.timezone, "Pacific/Auckland");
470 assert_eq!(job.queue, "reports");
471 assert_eq!(job.priority, 1);
472 assert_eq!(job.max_attempts, 3);
473 assert_eq!(job.tags, vec!["important"]);
474 }
475
476 #[test]
477 fn test_latest_fire_time_finds_past_fire() {
478 let pj = make_periodic("0 * * * *", "UTC");
480 let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
481 let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 13, 0, 0).unwrap());
482
483 let fire = pj.latest_fire_time(now, after);
484 assert_eq!(
485 fire,
486 Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap())
487 );
488 }
489
490 #[test]
491 fn test_no_fire_when_next_is_future() {
492 let pj = make_periodic("0 * * * *", "UTC");
494 let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
495 let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());
497
498 let fire = pj.latest_fire_time(now, after);
499 assert!(fire.is_none(), "Should not fire until 15:00");
500 }
501
502 #[test]
503 fn test_first_registration_null_last_enqueued() {
504 let pj = make_periodic("0 * * * *", "UTC");
506 let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
507
508 let fire = pj.latest_fire_time(now, None);
509 assert_eq!(
510 fire,
511 Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap()),
512 "Should enqueue the most recent past fire on first registration"
513 );
514 }
515
516 #[test]
517 fn test_no_backfill_only_latest_fire() {
518 let pj = make_periodic("* * * * *", "UTC");
520 let now = Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap();
521 let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());
522
523 let fire = pj.latest_fire_time(now, after);
524 assert_eq!(
526 fire,
527 Some(Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap())
528 );
529 }
530
531 #[test]
532 fn test_timezone_aware_fire_time() {
533 let pj = make_periodic("0 9 * * *", "Pacific/Auckland");
535 let now = Utc.with_ymd_and_hms(2025, 6, 15, 21, 30, 0).unwrap();
538 let after = Some(Utc.with_ymd_and_hms(2025, 6, 14, 21, 0, 0).unwrap());
539
540 let fire = pj.latest_fire_time(now, after);
541 assert_eq!(
543 fire,
544 Some(Utc.with_ymd_and_hms(2025, 6, 15, 21, 0, 0).unwrap())
545 );
546 }
547
548 #[test]
549 fn test_dst_spring_forward() {
550 let pj = make_periodic("30 2 * * *", "US/Eastern");
554 let now = Utc.with_ymd_and_hms(2025, 3, 9, 12, 0, 0).unwrap();
555 let after = Some(Utc.with_ymd_and_hms(2025, 3, 8, 12, 0, 0).unwrap());
556
557 let fire = pj.latest_fire_time(now, after);
558 let fire_count = if fire.is_some() { 1 } else { 0 };
561 assert!(
562 fire_count <= 1,
563 "Should fire at most once during spring-forward"
564 );
565 }
566
567 #[test]
568 fn test_dst_fall_back() {
569 let pj = make_periodic("30 1 * * *", "US/Eastern");
572 let now = Utc.with_ymd_and_hms(2025, 11, 2, 12, 0, 0).unwrap();
573 let after = Some(Utc.with_ymd_and_hms(2025, 11, 1, 12, 0, 0).unwrap());
574
575 let fire = pj.latest_fire_time(now, after);
576 assert!(fire.is_some(), "Should fire once during fall-back");
577
578 let fire_time = fire.unwrap();
580 let second_fire = pj.latest_fire_time(now, Some(fire_time));
581 assert!(
582 second_fire.is_none(),
583 "Should not fire a second time during fall-back"
584 );
585 }
586
587 #[test]
588 fn test_invalid_priority() {
589 let result = PeriodicJob::builder("test", "0 9 * * *")
590 .priority(5)
591 .build_raw("test_job".to_string(), serde_json::json!({}));
592 assert!(result.is_err());
593 }
594
595 #[test]
596 fn test_invalid_max_attempts() {
597 let result = PeriodicJob::builder("test", "0 9 * * *")
598 .max_attempts(0)
599 .build_raw("test_job".to_string(), serde_json::json!({}));
600 assert!(result.is_err());
601 }
602
603 #[test]
604 fn test_next_fire_time_exact() {
605 let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
607 let next = next_fire_time_after("0 * * * *", "UTC", now);
608 assert_eq!(
609 next,
610 Some(Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap())
611 );
612 }
613
614 #[test]
615 fn test_next_fire_time_respects_timezone() {
616 let now = Utc.with_ymd_and_hms(2025, 6, 15, 20, 0, 0).unwrap();
619 let next = next_fire_time_after("0 9 * * *", "Pacific/Auckland", now);
620 assert_eq!(
621 next,
622 Some(Utc.with_ymd_and_hms(2025, 6, 15, 21, 0, 0).unwrap())
623 );
624
625 let next_utc = next_fire_time_after("0 9 * * *", "UTC", now);
627 assert_eq!(
628 next_utc,
629 Some(Utc.with_ymd_and_hms(2025, 6, 16, 9, 0, 0).unwrap())
630 );
631 }
632
633 #[test]
634 fn test_next_fire_time_dst_boundary() {
635 let now = Utc.with_ymd_and_hms(2025, 3, 9, 6, 30, 0).unwrap();
638 let next = next_fire_time_after("0 * * * *", "US/Eastern", now);
639 assert!(next.is_some());
640 let next = next.unwrap();
642 assert!(
643 next >= Utc.with_ymd_and_hms(2025, 3, 9, 7, 0, 0).unwrap(),
644 "should skip the non-existent 2:00 AM, got {next}"
645 );
646 }
647
648 #[test]
649 fn test_next_fire_time_invalid_input() {
650 let now = Utc::now();
651 assert!(next_fire_time_after("not a cron", "UTC", now).is_none());
652 assert!(next_fire_time_after("* * * * *", "Not/A/Zone", now).is_none());
653 }
654
655 #[test]
658 fn test_six_field_cron_accepted_by_builder() {
659 let result = PeriodicJob::builder("test", "30 0 9 * * *")
660 .build_raw("test_job".to_string(), serde_json::json!({}));
661 assert!(result.is_ok(), "6-field cron should be accepted");
662 }
663
664 #[test]
665 fn test_six_field_cron_every_15_seconds() {
666 let result = PeriodicJob::builder("fast", "*/15 * * * * *")
668 .build_raw("fast_job".to_string(), serde_json::json!({}));
669 assert!(result.is_ok(), "every-15-seconds cron should be accepted");
670 }
671
672 #[test]
673 fn test_six_field_next_fire_time() {
674 let now = Utc.with_ymd_and_hms(2025, 6, 15, 8, 0, 0).unwrap();
676 let next = next_fire_time_after("30 0 9 * * *", "UTC", now);
677 assert_eq!(
678 next,
679 Some(
680 Utc.with_ymd_and_hms(2025, 6, 15, 9, 0, 0).unwrap() + chrono::Duration::seconds(30)
681 ),
682 "should fire at 09:00:30"
683 );
684 }
685
686 #[test]
687 fn test_six_field_every_15_seconds_next_fire() {
688 let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 7).unwrap();
690 let next = next_fire_time_after("*/15 * * * * *", "UTC", now);
691 assert_eq!(
692 next,
693 Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 15).unwrap()),
694 "should fire at next 15-second boundary"
695 );
696 }
697
698 #[test]
699 fn test_five_field_still_works() {
700 let result = PeriodicJob::builder("classic", "0 9 * * *")
702 .build_raw("classic_job".to_string(), serde_json::json!({}));
703 assert!(result.is_ok(), "5-field cron should still be accepted");
704
705 let now = Utc.with_ymd_and_hms(2025, 6, 15, 8, 0, 0).unwrap();
706 let next = next_fire_time_after("0 9 * * *", "UTC", now);
707 assert_eq!(
708 next,
709 Some(Utc.with_ymd_and_hms(2025, 6, 15, 9, 0, 0).unwrap())
710 );
711 }
712
713 #[test]
714 fn test_six_field_latest_fire_time() {
715 let job = PeriodicJob::builder("sec_job", "0 */5 * * * *")
717 .build_raw("sec_job".to_string(), serde_json::json!({}))
718 .unwrap();
719
720 let now = Utc.with_ymd_and_hms(2025, 6, 15, 12, 7, 0).unwrap();
721 let latest = job.latest_fire_time(now, None);
722 assert!(
723 latest.is_some(),
724 "6-field cron should produce a latest fire time"
725 );
726 assert_eq!(
729 latest.unwrap(),
730 Utc.with_ymd_and_hms(2025, 6, 15, 12, 5, 0).unwrap()
731 );
732 }
733}