1use crate::error::AwaError;
7use crate::job::JobRow;
8use chrono::{DateTime, Utc};
9use croner::Cron;
10use serde::Serialize;
11use sqlx::PgExecutor;
12
13#[derive(Debug, Clone)]
17pub struct PeriodicJob {
18 pub name: String,
20 pub cron_expr: String,
22 pub timezone: String,
24 pub kind: String,
26 pub queue: String,
28 pub args: serde_json::Value,
30 pub priority: i16,
32 pub max_attempts: i16,
34 pub tags: Vec<String>,
36 pub metadata: serde_json::Value,
38}
39
40impl PeriodicJob {
41 pub fn builder(name: impl Into<String>, cron_expr: impl Into<String>) -> PeriodicJobBuilder {
46 PeriodicJobBuilder {
47 name: name.into(),
48 cron_expr: cron_expr.into(),
49 timezone: "UTC".to_string(),
50 queue: "default".to_string(),
51 priority: 2,
52 max_attempts: 25,
53 tags: Vec::new(),
54 metadata: serde_json::json!({}),
55 }
56 }
57
58 pub fn latest_fire_time(
64 &self,
65 now: DateTime<Utc>,
66 after: Option<DateTime<Utc>>,
67 ) -> Option<DateTime<Utc>> {
68 let cron = Cron::new(&self.cron_expr)
69 .parse()
70 .expect("cron_expr was validated at build time");
71
72 let tz: chrono_tz::Tz = self
73 .timezone
74 .parse()
75 .expect("timezone was validated at build time");
76
77 let now_tz = now.with_timezone(&tz);
78
79 let search_start = match after {
83 Some(after_time) => after_time.with_timezone(&tz),
84 None => now_tz - chrono::Duration::hours(24),
86 };
87
88 let mut latest_fire: Option<DateTime<Utc>> = None;
89
90 for fire_time in cron.clone().iter_from(search_start) {
92 let fire_utc = fire_time.with_timezone(&Utc);
93
94 if fire_utc > now {
96 break;
97 }
98
99 if let Some(after_time) = after {
101 if fire_utc <= after_time {
102 continue;
103 }
104 }
105
106 latest_fire = Some(fire_utc);
107 }
108
109 latest_fire
110 }
111}
112
113#[derive(Debug, Clone)]
115pub struct PeriodicJobBuilder {
116 name: String,
117 cron_expr: String,
118 timezone: String,
119 queue: String,
120 priority: i16,
121 max_attempts: i16,
122 tags: Vec<String>,
123 metadata: serde_json::Value,
124}
125
126impl PeriodicJobBuilder {
127 pub fn timezone(mut self, timezone: impl Into<String>) -> Self {
129 self.timezone = timezone.into();
130 self
131 }
132
133 pub fn queue(mut self, queue: impl Into<String>) -> Self {
135 self.queue = queue.into();
136 self
137 }
138
139 pub fn priority(mut self, priority: i16) -> Self {
141 self.priority = priority;
142 self
143 }
144
145 pub fn max_attempts(mut self, max_attempts: i16) -> Self {
147 self.max_attempts = max_attempts;
148 self
149 }
150
151 pub fn tags(mut self, tags: Vec<String>) -> Self {
153 self.tags = tags;
154 self
155 }
156
157 pub fn metadata(mut self, metadata: serde_json::Value) -> Self {
159 self.metadata = metadata;
160 self
161 }
162
163 pub fn build(self, args: &impl crate::JobArgs) -> Result<PeriodicJob, AwaError> {
168 self.build_raw(args.kind_str().to_string(), args.to_args()?)
169 }
170
171 pub fn build_raw(self, kind: String, args: serde_json::Value) -> Result<PeriodicJob, AwaError> {
173 Cron::new(&self.cron_expr)
175 .parse()
176 .map_err(|err| AwaError::Validation(format!("invalid cron expression: {err}")))?;
177
178 self.timezone
180 .parse::<chrono_tz::Tz>()
181 .map_err(|err| AwaError::Validation(format!("invalid timezone: {err}")))?;
182
183 if !(1..=4).contains(&self.priority) {
185 return Err(AwaError::Validation(format!(
186 "priority must be between 1 and 4, got {}",
187 self.priority
188 )));
189 }
190
191 if !(1..=1000).contains(&self.max_attempts) {
193 return Err(AwaError::Validation(format!(
194 "max_attempts must be between 1 and 1000, got {}",
195 self.max_attempts
196 )));
197 }
198
199 Ok(PeriodicJob {
200 name: self.name,
201 cron_expr: self.cron_expr,
202 timezone: self.timezone,
203 kind,
204 queue: self.queue,
205 args,
206 priority: self.priority,
207 max_attempts: self.max_attempts,
208 tags: self.tags,
209 metadata: self.metadata,
210 })
211 }
212}
213
214#[derive(Debug, Clone, sqlx::FromRow, Serialize)]
216pub struct CronJobRow {
217 pub name: String,
218 pub cron_expr: String,
219 pub timezone: String,
220 pub kind: String,
221 pub queue: String,
222 pub args: serde_json::Value,
223 pub priority: i16,
224 pub max_attempts: i16,
225 pub tags: Vec<String>,
226 pub metadata: serde_json::Value,
227 pub last_enqueued_at: Option<DateTime<Utc>>,
228 pub created_at: DateTime<Utc>,
229 pub updated_at: DateTime<Utc>,
230}
231
232pub async fn upsert_cron_job<'e, E>(executor: E, job: &PeriodicJob) -> Result<(), AwaError>
236where
237 E: PgExecutor<'e>,
238{
239 sqlx::query(
240 r#"
241 INSERT INTO awa.cron_jobs (name, cron_expr, timezone, kind, queue, args, priority, max_attempts, tags, metadata)
242 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
243 ON CONFLICT (name) DO UPDATE SET
244 cron_expr = EXCLUDED.cron_expr,
245 timezone = EXCLUDED.timezone,
246 kind = EXCLUDED.kind,
247 queue = EXCLUDED.queue,
248 args = EXCLUDED.args,
249 priority = EXCLUDED.priority,
250 max_attempts = EXCLUDED.max_attempts,
251 tags = EXCLUDED.tags,
252 metadata = EXCLUDED.metadata,
253 updated_at = now()
254 "#,
255 )
256 .bind(&job.name)
257 .bind(&job.cron_expr)
258 .bind(&job.timezone)
259 .bind(&job.kind)
260 .bind(&job.queue)
261 .bind(&job.args)
262 .bind(job.priority)
263 .bind(job.max_attempts)
264 .bind(&job.tags)
265 .bind(&job.metadata)
266 .execute(executor)
267 .await?;
268
269 Ok(())
270}
271
272pub fn next_fire_time(cron_expr: &str, timezone: &str) -> Option<DateTime<Utc>> {
276 next_fire_time_after(cron_expr, timezone, Utc::now())
277}
278
279pub fn next_fire_time_after(
284 cron_expr: &str,
285 timezone: &str,
286 after: DateTime<Utc>,
287) -> Option<DateTime<Utc>> {
288 let cron = Cron::new(cron_expr).parse().ok()?;
289 let tz: chrono_tz::Tz = timezone.parse().ok()?;
290 let after_tz = after.with_timezone(&tz);
291 let next = cron.iter_from(after_tz).next()?;
292 Some(next.with_timezone(&Utc))
293}
294
295pub async fn list_cron_jobs<'e, E>(executor: E) -> Result<Vec<CronJobRow>, AwaError>
297where
298 E: PgExecutor<'e>,
299{
300 let rows = sqlx::query_as::<_, CronJobRow>("SELECT * FROM awa.cron_jobs ORDER BY name")
301 .fetch_all(executor)
302 .await?;
303 Ok(rows)
304}
305
306pub async fn delete_cron_job<'e, E>(executor: E, name: &str) -> Result<bool, AwaError>
308where
309 E: PgExecutor<'e>,
310{
311 let result = sqlx::query("DELETE FROM awa.cron_jobs WHERE name = $1")
312 .bind(name)
313 .execute(executor)
314 .await?;
315 Ok(result.rows_affected() > 0)
316}
317
318pub async fn atomic_enqueue<'e, E>(
328 executor: E,
329 cron_name: &str,
330 fire_time: DateTime<Utc>,
331 previous_enqueued_at: Option<DateTime<Utc>>,
332) -> Result<Option<JobRow>, AwaError>
333where
334 E: PgExecutor<'e>,
335{
336 let row = sqlx::query_as::<_, JobRow>(
337 r#"
338 WITH mark AS (
339 UPDATE awa.cron_jobs
340 SET last_enqueued_at = $2, updated_at = now()
341 WHERE name = $1
342 AND (last_enqueued_at IS NOT DISTINCT FROM $3)
343 RETURNING name, kind, queue, args, priority, max_attempts, tags, metadata
344 )
345 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, tags, metadata)
346 SELECT kind, queue, args, 'available', priority, max_attempts, tags,
347 metadata || jsonb_build_object('cron_name', name, 'cron_fire_time', $2::text)
348 FROM mark
349 RETURNING *
350 "#,
351 )
352 .bind(cron_name)
353 .bind(fire_time)
354 .bind(previous_enqueued_at)
355 .fetch_optional(executor)
356 .await?;
357
358 Ok(row)
359}
360
361pub async fn trigger_cron_job<'e, E>(executor: E, name: &str) -> Result<JobRow, AwaError>
367where
368 E: PgExecutor<'e>,
369{
370 let row = sqlx::query_as::<_, JobRow>(
371 r#"
372 WITH cron AS (
373 SELECT name, kind, queue, args, priority, max_attempts, tags, metadata
374 FROM awa.cron_jobs
375 WHERE name = $1
376 )
377 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, tags, metadata)
378 SELECT kind, queue, args, 'available', priority, max_attempts, tags,
379 metadata || jsonb_build_object('cron_name', name, 'triggered_manually', true)
380 FROM cron
381 RETURNING *
382 "#,
383 )
384 .bind(name)
385 .fetch_optional(executor)
386 .await?;
387
388 row.ok_or_else(|| AwaError::Validation(format!("cron job not found: {name}")))
389}
390
391#[cfg(test)]
392mod tests {
393 use super::*;
394 use chrono::TimeZone;
395
396 fn make_periodic(cron_expr: &str, timezone: &str) -> PeriodicJob {
397 PeriodicJob {
398 name: "test".to_string(),
399 cron_expr: cron_expr.to_string(),
400 timezone: timezone.to_string(),
401 kind: "test_job".to_string(),
402 queue: "default".to_string(),
403 args: serde_json::json!({}),
404 priority: 2,
405 max_attempts: 25,
406 tags: vec![],
407 metadata: serde_json::json!({}),
408 }
409 }
410
411 #[test]
412 fn test_valid_cron_expression() {
413 let result = PeriodicJob::builder("test", "0 9 * * *")
414 .build_raw("test_job".to_string(), serde_json::json!({}));
415 assert!(result.is_ok());
416 }
417
418 #[test]
419 fn test_invalid_cron_expression() {
420 let result = PeriodicJob::builder("test", "not a cron")
421 .build_raw("test_job".to_string(), serde_json::json!({}));
422 assert!(result.is_err());
423 let err = result.unwrap_err();
424 assert!(
425 err.to_string().contains("invalid cron expression"),
426 "got: {err}"
427 );
428 }
429
430 #[test]
431 fn test_invalid_timezone() {
432 let result = PeriodicJob::builder("test", "0 9 * * *")
433 .timezone("Not/A/Timezone")
434 .build_raw("test_job".to_string(), serde_json::json!({}));
435 assert!(result.is_err());
436 let err = result.unwrap_err();
437 assert!(err.to_string().contains("invalid timezone"), "got: {err}");
438 }
439
440 #[test]
441 fn test_builder_defaults() {
442 let job = PeriodicJob::builder("daily_report", "0 9 * * *")
443 .build_raw(
444 "daily_report".to_string(),
445 serde_json::json!({"format": "pdf"}),
446 )
447 .unwrap();
448 assert_eq!(job.name, "daily_report");
449 assert_eq!(job.timezone, "UTC");
450 assert_eq!(job.queue, "default");
451 assert_eq!(job.priority, 2);
452 assert_eq!(job.max_attempts, 25);
453 assert!(job.tags.is_empty());
454 }
455
456 #[test]
457 fn test_builder_custom_fields() {
458 let job = PeriodicJob::builder("report", "0 9 * * *")
459 .timezone("Pacific/Auckland")
460 .queue("reports")
461 .priority(1)
462 .max_attempts(3)
463 .tags(vec!["important".to_string()])
464 .metadata(serde_json::json!({"source": "cron"}))
465 .build_raw("daily_report".to_string(), serde_json::json!({}))
466 .unwrap();
467 assert_eq!(job.timezone, "Pacific/Auckland");
468 assert_eq!(job.queue, "reports");
469 assert_eq!(job.priority, 1);
470 assert_eq!(job.max_attempts, 3);
471 assert_eq!(job.tags, vec!["important"]);
472 }
473
474 #[test]
475 fn test_latest_fire_time_finds_past_fire() {
476 let pj = make_periodic("0 * * * *", "UTC");
478 let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
479 let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 13, 0, 0).unwrap());
480
481 let fire = pj.latest_fire_time(now, after);
482 assert_eq!(
483 fire,
484 Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap())
485 );
486 }
487
488 #[test]
489 fn test_no_fire_when_next_is_future() {
490 let pj = make_periodic("0 * * * *", "UTC");
492 let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
493 let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());
495
496 let fire = pj.latest_fire_time(now, after);
497 assert!(fire.is_none(), "Should not fire until 15:00");
498 }
499
500 #[test]
501 fn test_first_registration_null_last_enqueued() {
502 let pj = make_periodic("0 * * * *", "UTC");
504 let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
505
506 let fire = pj.latest_fire_time(now, None);
507 assert_eq!(
508 fire,
509 Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap()),
510 "Should enqueue the most recent past fire on first registration"
511 );
512 }
513
514 #[test]
515 fn test_no_backfill_only_latest_fire() {
516 let pj = make_periodic("* * * * *", "UTC");
518 let now = Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap();
519 let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());
520
521 let fire = pj.latest_fire_time(now, after);
522 assert_eq!(
524 fire,
525 Some(Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap())
526 );
527 }
528
529 #[test]
530 fn test_timezone_aware_fire_time() {
531 let pj = make_periodic("0 9 * * *", "Pacific/Auckland");
533 let now = Utc.with_ymd_and_hms(2025, 6, 15, 21, 30, 0).unwrap();
536 let after = Some(Utc.with_ymd_and_hms(2025, 6, 14, 21, 0, 0).unwrap());
537
538 let fire = pj.latest_fire_time(now, after);
539 assert_eq!(
541 fire,
542 Some(Utc.with_ymd_and_hms(2025, 6, 15, 21, 0, 0).unwrap())
543 );
544 }
545
546 #[test]
547 fn test_dst_spring_forward() {
548 let pj = make_periodic("30 2 * * *", "US/Eastern");
552 let now = Utc.with_ymd_and_hms(2025, 3, 9, 12, 0, 0).unwrap();
553 let after = Some(Utc.with_ymd_and_hms(2025, 3, 8, 12, 0, 0).unwrap());
554
555 let fire = pj.latest_fire_time(now, after);
556 let fire_count = if fire.is_some() { 1 } else { 0 };
559 assert!(
560 fire_count <= 1,
561 "Should fire at most once during spring-forward"
562 );
563 }
564
565 #[test]
566 fn test_dst_fall_back() {
567 let pj = make_periodic("30 1 * * *", "US/Eastern");
570 let now = Utc.with_ymd_and_hms(2025, 11, 2, 12, 0, 0).unwrap();
571 let after = Some(Utc.with_ymd_and_hms(2025, 11, 1, 12, 0, 0).unwrap());
572
573 let fire = pj.latest_fire_time(now, after);
574 assert!(fire.is_some(), "Should fire once during fall-back");
575
576 let fire_time = fire.unwrap();
578 let second_fire = pj.latest_fire_time(now, Some(fire_time));
579 assert!(
580 second_fire.is_none(),
581 "Should not fire a second time during fall-back"
582 );
583 }
584
585 #[test]
586 fn test_invalid_priority() {
587 let result = PeriodicJob::builder("test", "0 9 * * *")
588 .priority(5)
589 .build_raw("test_job".to_string(), serde_json::json!({}));
590 assert!(result.is_err());
591 }
592
593 #[test]
594 fn test_invalid_max_attempts() {
595 let result = PeriodicJob::builder("test", "0 9 * * *")
596 .max_attempts(0)
597 .build_raw("test_job".to_string(), serde_json::json!({}));
598 assert!(result.is_err());
599 }
600
601 #[test]
602 fn test_next_fire_time_exact() {
603 let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
605 let next = next_fire_time_after("0 * * * *", "UTC", now);
606 assert_eq!(
607 next,
608 Some(Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap())
609 );
610 }
611
612 #[test]
613 fn test_next_fire_time_respects_timezone() {
614 let now = Utc.with_ymd_and_hms(2025, 6, 15, 20, 0, 0).unwrap();
617 let next = next_fire_time_after("0 9 * * *", "Pacific/Auckland", now);
618 assert_eq!(
619 next,
620 Some(Utc.with_ymd_and_hms(2025, 6, 15, 21, 0, 0).unwrap())
621 );
622
623 let next_utc = next_fire_time_after("0 9 * * *", "UTC", now);
625 assert_eq!(
626 next_utc,
627 Some(Utc.with_ymd_and_hms(2025, 6, 16, 9, 0, 0).unwrap())
628 );
629 }
630
631 #[test]
632 fn test_next_fire_time_dst_boundary() {
633 let now = Utc.with_ymd_and_hms(2025, 3, 9, 6, 30, 0).unwrap();
636 let next = next_fire_time_after("0 * * * *", "US/Eastern", now);
637 assert!(next.is_some());
638 let next = next.unwrap();
640 assert!(
641 next >= Utc.with_ymd_and_hms(2025, 3, 9, 7, 0, 0).unwrap(),
642 "should skip the non-existent 2:00 AM, got {next}"
643 );
644 }
645
646 #[test]
647 fn test_next_fire_time_invalid_input() {
648 let now = Utc::now();
649 assert!(next_fire_time_after("not a cron", "UTC", now).is_none());
650 assert!(next_fire_time_after("* * * * *", "Not/A/Zone", now).is_none());
651 }
652}