1use crate::error::AwaError;
7use crate::job::JobRow;
8use chrono::{DateTime, Utc};
9use croner::Cron;
10use serde::Serialize;
11use sqlx::PgExecutor;
12
13#[derive(Debug, Clone)]
17pub struct PeriodicJob {
18 pub name: String,
20 pub cron_expr: String,
22 pub timezone: String,
24 pub kind: String,
26 pub queue: String,
28 pub args: serde_json::Value,
30 pub priority: i16,
32 pub max_attempts: i16,
34 pub tags: Vec<String>,
36 pub metadata: serde_json::Value,
38}
39
40impl PeriodicJob {
41 pub fn builder(name: impl Into<String>, cron_expr: impl Into<String>) -> PeriodicJobBuilder {
46 PeriodicJobBuilder {
47 name: name.into(),
48 cron_expr: cron_expr.into(),
49 timezone: "UTC".to_string(),
50 queue: "default".to_string(),
51 priority: 2,
52 max_attempts: 25,
53 tags: Vec::new(),
54 metadata: serde_json::json!({}),
55 }
56 }
57
58 pub fn latest_fire_time(
64 &self,
65 now: DateTime<Utc>,
66 after: Option<DateTime<Utc>>,
67 ) -> Option<DateTime<Utc>> {
68 let cron = Cron::new(&self.cron_expr)
69 .parse()
70 .expect("cron_expr was validated at build time");
71
72 let tz: chrono_tz::Tz = self
73 .timezone
74 .parse()
75 .expect("timezone was validated at build time");
76
77 let now_tz = now.with_timezone(&tz);
78
79 let search_start = match after {
83 Some(after_time) => after_time.with_timezone(&tz),
84 None => now_tz - chrono::Duration::hours(24),
86 };
87
88 let mut latest_fire: Option<DateTime<Utc>> = None;
89
90 for fire_time in cron.clone().iter_from(search_start) {
92 let fire_utc = fire_time.with_timezone(&Utc);
93
94 if fire_utc > now {
96 break;
97 }
98
99 if let Some(after_time) = after {
101 if fire_utc <= after_time {
102 continue;
103 }
104 }
105
106 latest_fire = Some(fire_utc);
107 }
108
109 latest_fire
110 }
111}
112
113#[derive(Debug, Clone)]
115pub struct PeriodicJobBuilder {
116 name: String,
117 cron_expr: String,
118 timezone: String,
119 queue: String,
120 priority: i16,
121 max_attempts: i16,
122 tags: Vec<String>,
123 metadata: serde_json::Value,
124}
125
126impl PeriodicJobBuilder {
127 pub fn timezone(mut self, timezone: impl Into<String>) -> Self {
129 self.timezone = timezone.into();
130 self
131 }
132
133 pub fn queue(mut self, queue: impl Into<String>) -> Self {
135 self.queue = queue.into();
136 self
137 }
138
139 pub fn priority(mut self, priority: i16) -> Self {
141 self.priority = priority;
142 self
143 }
144
145 pub fn max_attempts(mut self, max_attempts: i16) -> Self {
147 self.max_attempts = max_attempts;
148 self
149 }
150
151 pub fn tags(mut self, tags: Vec<String>) -> Self {
153 self.tags = tags;
154 self
155 }
156
157 pub fn metadata(mut self, metadata: serde_json::Value) -> Self {
159 self.metadata = metadata;
160 self
161 }
162
163 pub fn build(self, args: &impl crate::JobArgs) -> Result<PeriodicJob, AwaError> {
168 self.build_raw(args.kind_str().to_string(), args.to_args()?)
169 }
170
171 pub fn build_raw(self, kind: String, args: serde_json::Value) -> Result<PeriodicJob, AwaError> {
173 Cron::new(&self.cron_expr)
175 .parse()
176 .map_err(|err| AwaError::Validation(format!("invalid cron expression: {err}")))?;
177
178 self.timezone
180 .parse::<chrono_tz::Tz>()
181 .map_err(|err| AwaError::Validation(format!("invalid timezone: {err}")))?;
182
183 if !(1..=4).contains(&self.priority) {
185 return Err(AwaError::Validation(format!(
186 "priority must be between 1 and 4, got {}",
187 self.priority
188 )));
189 }
190
191 if !(1..=1000).contains(&self.max_attempts) {
193 return Err(AwaError::Validation(format!(
194 "max_attempts must be between 1 and 1000, got {}",
195 self.max_attempts
196 )));
197 }
198
199 Ok(PeriodicJob {
200 name: self.name,
201 cron_expr: self.cron_expr,
202 timezone: self.timezone,
203 kind,
204 queue: self.queue,
205 args,
206 priority: self.priority,
207 max_attempts: self.max_attempts,
208 tags: self.tags,
209 metadata: self.metadata,
210 })
211 }
212}
213
214#[derive(Debug, Clone, sqlx::FromRow, Serialize)]
216pub struct CronJobRow {
217 pub name: String,
218 pub cron_expr: String,
219 pub timezone: String,
220 pub kind: String,
221 pub queue: String,
222 pub args: serde_json::Value,
223 pub priority: i16,
224 pub max_attempts: i16,
225 pub tags: Vec<String>,
226 pub metadata: serde_json::Value,
227 pub last_enqueued_at: Option<DateTime<Utc>>,
228 pub created_at: DateTime<Utc>,
229 pub updated_at: DateTime<Utc>,
230}
231
232pub async fn upsert_cron_job<'e, E>(executor: E, job: &PeriodicJob) -> Result<(), AwaError>
236where
237 E: PgExecutor<'e>,
238{
239 sqlx::query(
240 r#"
241 INSERT INTO awa.cron_jobs (name, cron_expr, timezone, kind, queue, args, priority, max_attempts, tags, metadata)
242 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
243 ON CONFLICT (name) DO UPDATE SET
244 cron_expr = EXCLUDED.cron_expr,
245 timezone = EXCLUDED.timezone,
246 kind = EXCLUDED.kind,
247 queue = EXCLUDED.queue,
248 args = EXCLUDED.args,
249 priority = EXCLUDED.priority,
250 max_attempts = EXCLUDED.max_attempts,
251 tags = EXCLUDED.tags,
252 metadata = EXCLUDED.metadata,
253 updated_at = now()
254 "#,
255 )
256 .bind(&job.name)
257 .bind(&job.cron_expr)
258 .bind(&job.timezone)
259 .bind(&job.kind)
260 .bind(&job.queue)
261 .bind(&job.args)
262 .bind(job.priority)
263 .bind(job.max_attempts)
264 .bind(&job.tags)
265 .bind(&job.metadata)
266 .execute(executor)
267 .await?;
268
269 Ok(())
270}
271
272pub async fn list_cron_jobs<'e, E>(executor: E) -> Result<Vec<CronJobRow>, AwaError>
274where
275 E: PgExecutor<'e>,
276{
277 let rows = sqlx::query_as::<_, CronJobRow>("SELECT * FROM awa.cron_jobs ORDER BY name")
278 .fetch_all(executor)
279 .await?;
280 Ok(rows)
281}
282
283pub async fn delete_cron_job<'e, E>(executor: E, name: &str) -> Result<bool, AwaError>
285where
286 E: PgExecutor<'e>,
287{
288 let result = sqlx::query("DELETE FROM awa.cron_jobs WHERE name = $1")
289 .bind(name)
290 .execute(executor)
291 .await?;
292 Ok(result.rows_affected() > 0)
293}
294
295pub async fn atomic_enqueue<'e, E>(
305 executor: E,
306 cron_name: &str,
307 fire_time: DateTime<Utc>,
308 previous_enqueued_at: Option<DateTime<Utc>>,
309) -> Result<Option<JobRow>, AwaError>
310where
311 E: PgExecutor<'e>,
312{
313 let row = sqlx::query_as::<_, JobRow>(
314 r#"
315 WITH mark AS (
316 UPDATE awa.cron_jobs
317 SET last_enqueued_at = $2, updated_at = now()
318 WHERE name = $1
319 AND (last_enqueued_at IS NOT DISTINCT FROM $3)
320 RETURNING name, kind, queue, args, priority, max_attempts, tags, metadata
321 )
322 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, tags, metadata)
323 SELECT kind, queue, args, 'available', priority, max_attempts, tags,
324 metadata || jsonb_build_object('cron_name', name, 'cron_fire_time', $2::text)
325 FROM mark
326 RETURNING *
327 "#,
328 )
329 .bind(cron_name)
330 .bind(fire_time)
331 .bind(previous_enqueued_at)
332 .fetch_optional(executor)
333 .await?;
334
335 Ok(row)
336}
337
338pub async fn trigger_cron_job<'e, E>(executor: E, name: &str) -> Result<JobRow, AwaError>
344where
345 E: PgExecutor<'e>,
346{
347 let row = sqlx::query_as::<_, JobRow>(
348 r#"
349 WITH cron AS (
350 SELECT name, kind, queue, args, priority, max_attempts, tags, metadata
351 FROM awa.cron_jobs
352 WHERE name = $1
353 )
354 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, tags, metadata)
355 SELECT kind, queue, args, 'available', priority, max_attempts, tags,
356 metadata || jsonb_build_object('cron_name', name, 'triggered_manually', true)
357 FROM cron
358 RETURNING *
359 "#,
360 )
361 .bind(name)
362 .fetch_optional(executor)
363 .await?;
364
365 row.ok_or_else(|| AwaError::Validation(format!("cron job not found: {name}")))
366}
367
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Builds a `PeriodicJob` directly (bypassing the builder) with fixed
    /// template fields, so tests only vary the schedule and timezone.
    fn make_periodic(cron_expr: &str, timezone: &str) -> PeriodicJob {
        PeriodicJob {
            name: "test".to_string(),
            cron_expr: cron_expr.to_string(),
            timezone: timezone.to_string(),
            kind: "test_job".to_string(),
            queue: "default".to_string(),
            args: serde_json::json!({}),
            priority: 2,
            max_attempts: 25,
            tags: vec![],
            metadata: serde_json::json!({}),
        }
    }

    #[test]
    fn test_valid_cron_expression() {
        let result = PeriodicJob::builder("test", "0 9 * * *")
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_ok());
    }

    #[test]
    fn test_invalid_cron_expression() {
        let result = PeriodicJob::builder("test", "not a cron")
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("invalid cron expression"),
            "got: {err}"
        );
    }

    #[test]
    fn test_invalid_timezone() {
        let result = PeriodicJob::builder("test", "0 9 * * *")
            .timezone("Not/A/Timezone")
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("invalid timezone"), "got: {err}");
    }

    #[test]
    fn test_builder_defaults() {
        let job = PeriodicJob::builder("daily_report", "0 9 * * *")
            .build_raw(
                "daily_report".to_string(),
                serde_json::json!({"format": "pdf"}),
            )
            .unwrap();
        assert_eq!(job.name, "daily_report");
        assert_eq!(job.cron_expr, "0 9 * * *");
        assert_eq!(job.timezone, "UTC");
        assert_eq!(job.kind, "daily_report");
        assert_eq!(job.queue, "default");
        assert_eq!(job.args, serde_json::json!({"format": "pdf"}));
        assert_eq!(job.priority, 2);
        assert_eq!(job.max_attempts, 25);
        assert!(job.tags.is_empty());
        assert_eq!(job.metadata, serde_json::json!({}));
    }

    #[test]
    fn test_builder_custom_fields() {
        let job = PeriodicJob::builder("report", "0 9 * * *")
            .timezone("Pacific/Auckland")
            .queue("reports")
            .priority(1)
            .max_attempts(3)
            .tags(vec!["important".to_string()])
            .metadata(serde_json::json!({"source": "cron"}))
            .build_raw("daily_report".to_string(), serde_json::json!({}))
            .unwrap();
        assert_eq!(job.timezone, "Pacific/Auckland");
        assert_eq!(job.queue, "reports");
        assert_eq!(job.priority, 1);
        assert_eq!(job.max_attempts, 3);
        assert_eq!(job.tags, vec!["important"]);
        assert_eq!(job.metadata, serde_json::json!({"source": "cron"}));
    }

    #[test]
    fn test_latest_fire_time_finds_past_fire() {
        let pj = make_periodic("0 * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 13, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap())
        );
    }

    #[test]
    fn test_no_fire_when_next_is_future() {
        let pj = make_periodic("0 * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
        // `after` is exclusive: the 14:00 fire has already been handled.
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        assert!(fire.is_none(), "Should not fire until 15:00");
    }

    #[test]
    fn test_first_registration_null_last_enqueued() {
        let pj = make_periodic("0 * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();

        let fire = pj.latest_fire_time(now, None);
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap()),
            "Should enqueue the most recent past fire on first registration"
        );
    }

    #[test]
    fn test_no_backfill_only_latest_fire() {
        // Sixty fires were missed; only the newest one (== now) is returned.
        let pj = make_periodic("* * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap())
        );
    }

    #[test]
    fn test_timezone_aware_fire_time() {
        // 09:00 in Auckland (UTC+12 in June) is 21:00 UTC on the previous day.
        let pj = make_periodic("0 9 * * *", "Pacific/Auckland");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 21, 30, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 14, 21, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 21, 0, 0).unwrap())
        );
    }

    #[test]
    fn test_dst_spring_forward() {
        // 02:30 local time does not exist on 2025-03-09 in US/Eastern.
        let pj = make_periodic("30 2 * * *", "US/Eastern");
        let now = Utc.with_ymd_and_hms(2025, 3, 9, 12, 0, 0).unwrap();
        let after = Utc.with_ymd_and_hms(2025, 3, 8, 12, 0, 0).unwrap();

        // Whether the gap is skipped or shifted is up to the cron library, so
        // only the contract of `latest_fire_time` is pinned: a returned fire
        // lies in `(after, now]` and is not handed out a second time.
        if let Some(fire_time) = pj.latest_fire_time(now, Some(after)) {
            assert!(
                after < fire_time && fire_time <= now,
                "fire {fire_time} must lie in ({after}, {now}]"
            );
            assert!(
                pj.latest_fire_time(now, Some(fire_time)).is_none(),
                "Should fire at most once during spring-forward"
            );
        }
    }

    #[test]
    fn test_dst_fall_back() {
        // 01:30 local time occurs twice on 2025-11-02 in US/Eastern.
        let pj = make_periodic("30 1 * * *", "US/Eastern");
        let now = Utc.with_ymd_and_hms(2025, 11, 2, 12, 0, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 11, 1, 12, 0, 0).unwrap());

        let fire = pj.latest_fire_time(now, after);
        assert!(fire.is_some(), "Should fire once during fall-back");

        let fire_time = fire.unwrap();
        let second_fire = pj.latest_fire_time(now, Some(fire_time));
        assert!(
            second_fire.is_none(),
            "Should not fire a second time during fall-back"
        );
    }

    #[test]
    fn test_invalid_priority() {
        let result = PeriodicJob::builder("test", "0 9 * * *")
            .priority(5)
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("priority must be between 1 and 4"),
            "got: {err}"
        );
    }

    #[test]
    fn test_invalid_max_attempts() {
        let result = PeriodicJob::builder("test", "0 9 * * *")
            .max_attempts(0)
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string()
                .contains("max_attempts must be between 1 and 1000"),
            "got: {err}"
        );
    }

    #[test]
    fn test_priority_and_max_attempts_bounds_accepted() {
        for (priority, max_attempts) in [(1, 1), (4, 1000)] {
            let result = PeriodicJob::builder("test", "0 9 * * *")
                .priority(priority)
                .max_attempts(max_attempts)
                .build_raw("test_job".to_string(), serde_json::json!({}));
            assert!(
                result.is_ok(),
                "priority {priority} / max_attempts {max_attempts} should be accepted"
            );
        }
    }

    #[test]
    fn test_priority_and_max_attempts_just_outside_bounds() {
        let low_priority = PeriodicJob::builder("test", "0 9 * * *")
            .priority(0)
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(low_priority.is_err());

        let high_attempts = PeriodicJob::builder("test", "0 9 * * *")
            .max_attempts(1001)
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(high_attempts.is_err());
    }
}