1use crate::error::AwaError;
7use crate::job::JobRow;
8use chrono::{DateTime, Utc};
9use croner::Cron;
10use sqlx::PgExecutor;
11
12#[derive(Debug, Clone)]
16pub struct PeriodicJob {
17 pub name: String,
19 pub cron_expr: String,
21 pub timezone: String,
23 pub kind: String,
25 pub queue: String,
27 pub args: serde_json::Value,
29 pub priority: i16,
31 pub max_attempts: i16,
33 pub tags: Vec<String>,
35 pub metadata: serde_json::Value,
37}
38
39impl PeriodicJob {
40 pub fn builder(name: impl Into<String>, cron_expr: impl Into<String>) -> PeriodicJobBuilder {
45 PeriodicJobBuilder {
46 name: name.into(),
47 cron_expr: cron_expr.into(),
48 timezone: "UTC".to_string(),
49 queue: "default".to_string(),
50 priority: 2,
51 max_attempts: 25,
52 tags: Vec::new(),
53 metadata: serde_json::json!({}),
54 }
55 }
56
57 pub fn latest_fire_time(
63 &self,
64 now: DateTime<Utc>,
65 after: Option<DateTime<Utc>>,
66 ) -> Option<DateTime<Utc>> {
67 let cron = Cron::new(&self.cron_expr)
68 .parse()
69 .expect("cron_expr was validated at build time");
70
71 let tz: chrono_tz::Tz = self
72 .timezone
73 .parse()
74 .expect("timezone was validated at build time");
75
76 let now_tz = now.with_timezone(&tz);
77
78 let search_start = match after {
82 Some(after_time) => after_time.with_timezone(&tz),
83 None => now_tz - chrono::Duration::hours(24),
85 };
86
87 let mut latest_fire: Option<DateTime<Utc>> = None;
88
89 for fire_time in cron.clone().iter_from(search_start) {
91 let fire_utc = fire_time.with_timezone(&Utc);
92
93 if fire_utc > now {
95 break;
96 }
97
98 if let Some(after_time) = after {
100 if fire_utc <= after_time {
101 continue;
102 }
103 }
104
105 latest_fire = Some(fire_utc);
106 }
107
108 latest_fire
109 }
110}
111
112#[derive(Debug, Clone)]
114pub struct PeriodicJobBuilder {
115 name: String,
116 cron_expr: String,
117 timezone: String,
118 queue: String,
119 priority: i16,
120 max_attempts: i16,
121 tags: Vec<String>,
122 metadata: serde_json::Value,
123}
124
125impl PeriodicJobBuilder {
126 pub fn timezone(mut self, timezone: impl Into<String>) -> Self {
128 self.timezone = timezone.into();
129 self
130 }
131
132 pub fn queue(mut self, queue: impl Into<String>) -> Self {
134 self.queue = queue.into();
135 self
136 }
137
138 pub fn priority(mut self, priority: i16) -> Self {
140 self.priority = priority;
141 self
142 }
143
144 pub fn max_attempts(mut self, max_attempts: i16) -> Self {
146 self.max_attempts = max_attempts;
147 self
148 }
149
150 pub fn tags(mut self, tags: Vec<String>) -> Self {
152 self.tags = tags;
153 self
154 }
155
156 pub fn metadata(mut self, metadata: serde_json::Value) -> Self {
158 self.metadata = metadata;
159 self
160 }
161
162 pub fn build(self, args: &impl crate::JobArgs) -> Result<PeriodicJob, AwaError> {
167 self.build_raw(args.kind_str().to_string(), args.to_args()?)
168 }
169
170 pub fn build_raw(self, kind: String, args: serde_json::Value) -> Result<PeriodicJob, AwaError> {
172 Cron::new(&self.cron_expr)
174 .parse()
175 .map_err(|err| AwaError::Validation(format!("invalid cron expression: {err}")))?;
176
177 self.timezone
179 .parse::<chrono_tz::Tz>()
180 .map_err(|err| AwaError::Validation(format!("invalid timezone: {err}")))?;
181
182 if !(1..=4).contains(&self.priority) {
184 return Err(AwaError::Validation(format!(
185 "priority must be between 1 and 4, got {}",
186 self.priority
187 )));
188 }
189
190 if !(1..=1000).contains(&self.max_attempts) {
192 return Err(AwaError::Validation(format!(
193 "max_attempts must be between 1 and 1000, got {}",
194 self.max_attempts
195 )));
196 }
197
198 Ok(PeriodicJob {
199 name: self.name,
200 cron_expr: self.cron_expr,
201 timezone: self.timezone,
202 kind,
203 queue: self.queue,
204 args,
205 priority: self.priority,
206 max_attempts: self.max_attempts,
207 tags: self.tags,
208 metadata: self.metadata,
209 })
210 }
211}
212
213#[derive(Debug, Clone, sqlx::FromRow)]
215pub struct CronJobRow {
216 pub name: String,
217 pub cron_expr: String,
218 pub timezone: String,
219 pub kind: String,
220 pub queue: String,
221 pub args: serde_json::Value,
222 pub priority: i16,
223 pub max_attempts: i16,
224 pub tags: Vec<String>,
225 pub metadata: serde_json::Value,
226 pub last_enqueued_at: Option<DateTime<Utc>>,
227 pub created_at: DateTime<Utc>,
228 pub updated_at: DateTime<Utc>,
229}
230
231pub async fn upsert_cron_job<'e, E>(executor: E, job: &PeriodicJob) -> Result<(), AwaError>
235where
236 E: PgExecutor<'e>,
237{
238 sqlx::query(
239 r#"
240 INSERT INTO awa.cron_jobs (name, cron_expr, timezone, kind, queue, args, priority, max_attempts, tags, metadata)
241 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
242 ON CONFLICT (name) DO UPDATE SET
243 cron_expr = EXCLUDED.cron_expr,
244 timezone = EXCLUDED.timezone,
245 kind = EXCLUDED.kind,
246 queue = EXCLUDED.queue,
247 args = EXCLUDED.args,
248 priority = EXCLUDED.priority,
249 max_attempts = EXCLUDED.max_attempts,
250 tags = EXCLUDED.tags,
251 metadata = EXCLUDED.metadata,
252 updated_at = now()
253 "#,
254 )
255 .bind(&job.name)
256 .bind(&job.cron_expr)
257 .bind(&job.timezone)
258 .bind(&job.kind)
259 .bind(&job.queue)
260 .bind(&job.args)
261 .bind(job.priority)
262 .bind(job.max_attempts)
263 .bind(&job.tags)
264 .bind(&job.metadata)
265 .execute(executor)
266 .await?;
267
268 Ok(())
269}
270
271pub async fn list_cron_jobs<'e, E>(executor: E) -> Result<Vec<CronJobRow>, AwaError>
273where
274 E: PgExecutor<'e>,
275{
276 let rows = sqlx::query_as::<_, CronJobRow>("SELECT * FROM awa.cron_jobs ORDER BY name")
277 .fetch_all(executor)
278 .await?;
279 Ok(rows)
280}
281
282pub async fn delete_cron_job<'e, E>(executor: E, name: &str) -> Result<bool, AwaError>
284where
285 E: PgExecutor<'e>,
286{
287 let result = sqlx::query("DELETE FROM awa.cron_jobs WHERE name = $1")
288 .bind(name)
289 .execute(executor)
290 .await?;
291 Ok(result.rows_affected() > 0)
292}
293
294pub async fn atomic_enqueue<'e, E>(
304 executor: E,
305 cron_name: &str,
306 fire_time: DateTime<Utc>,
307 previous_enqueued_at: Option<DateTime<Utc>>,
308) -> Result<Option<JobRow>, AwaError>
309where
310 E: PgExecutor<'e>,
311{
312 let row = sqlx::query_as::<_, JobRow>(
313 r#"
314 WITH mark AS (
315 UPDATE awa.cron_jobs
316 SET last_enqueued_at = $2, updated_at = now()
317 WHERE name = $1
318 AND (last_enqueued_at IS NOT DISTINCT FROM $3)
319 RETURNING name, kind, queue, args, priority, max_attempts, tags, metadata
320 )
321 INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, tags, metadata)
322 SELECT kind, queue, args, 'available', priority, max_attempts, tags,
323 metadata || jsonb_build_object('cron_name', name, 'cron_fire_time', $2::text)
324 FROM mark
325 RETURNING *
326 "#,
327 )
328 .bind(cron_name)
329 .bind(fire_time)
330 .bind(previous_enqueued_at)
331 .fetch_optional(executor)
332 .await?;
333
334 Ok(row)
335}
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340 use chrono::TimeZone;
341
342 fn make_periodic(cron_expr: &str, timezone: &str) -> PeriodicJob {
343 PeriodicJob {
344 name: "test".to_string(),
345 cron_expr: cron_expr.to_string(),
346 timezone: timezone.to_string(),
347 kind: "test_job".to_string(),
348 queue: "default".to_string(),
349 args: serde_json::json!({}),
350 priority: 2,
351 max_attempts: 25,
352 tags: vec![],
353 metadata: serde_json::json!({}),
354 }
355 }
356
357 #[test]
358 fn test_valid_cron_expression() {
359 let result = PeriodicJob::builder("test", "0 9 * * *")
360 .build_raw("test_job".to_string(), serde_json::json!({}));
361 assert!(result.is_ok());
362 }
363
364 #[test]
365 fn test_invalid_cron_expression() {
366 let result = PeriodicJob::builder("test", "not a cron")
367 .build_raw("test_job".to_string(), serde_json::json!({}));
368 assert!(result.is_err());
369 let err = result.unwrap_err();
370 assert!(
371 err.to_string().contains("invalid cron expression"),
372 "got: {err}"
373 );
374 }
375
376 #[test]
377 fn test_invalid_timezone() {
378 let result = PeriodicJob::builder("test", "0 9 * * *")
379 .timezone("Not/A/Timezone")
380 .build_raw("test_job".to_string(), serde_json::json!({}));
381 assert!(result.is_err());
382 let err = result.unwrap_err();
383 assert!(err.to_string().contains("invalid timezone"), "got: {err}");
384 }
385
386 #[test]
387 fn test_builder_defaults() {
388 let job = PeriodicJob::builder("daily_report", "0 9 * * *")
389 .build_raw(
390 "daily_report".to_string(),
391 serde_json::json!({"format": "pdf"}),
392 )
393 .unwrap();
394 assert_eq!(job.name, "daily_report");
395 assert_eq!(job.timezone, "UTC");
396 assert_eq!(job.queue, "default");
397 assert_eq!(job.priority, 2);
398 assert_eq!(job.max_attempts, 25);
399 assert!(job.tags.is_empty());
400 }
401
402 #[test]
403 fn test_builder_custom_fields() {
404 let job = PeriodicJob::builder("report", "0 9 * * *")
405 .timezone("Pacific/Auckland")
406 .queue("reports")
407 .priority(1)
408 .max_attempts(3)
409 .tags(vec!["important".to_string()])
410 .metadata(serde_json::json!({"source": "cron"}))
411 .build_raw("daily_report".to_string(), serde_json::json!({}))
412 .unwrap();
413 assert_eq!(job.timezone, "Pacific/Auckland");
414 assert_eq!(job.queue, "reports");
415 assert_eq!(job.priority, 1);
416 assert_eq!(job.max_attempts, 3);
417 assert_eq!(job.tags, vec!["important"]);
418 }
419
420 #[test]
421 fn test_latest_fire_time_finds_past_fire() {
422 let pj = make_periodic("0 * * * *", "UTC");
424 let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
425 let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 13, 0, 0).unwrap());
426
427 let fire = pj.latest_fire_time(now, after);
428 assert_eq!(
429 fire,
430 Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap())
431 );
432 }
433
434 #[test]
435 fn test_no_fire_when_next_is_future() {
436 let pj = make_periodic("0 * * * *", "UTC");
438 let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
439 let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());
441
442 let fire = pj.latest_fire_time(now, after);
443 assert!(fire.is_none(), "Should not fire until 15:00");
444 }
445
446 #[test]
447 fn test_first_registration_null_last_enqueued() {
448 let pj = make_periodic("0 * * * *", "UTC");
450 let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
451
452 let fire = pj.latest_fire_time(now, None);
453 assert_eq!(
454 fire,
455 Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap()),
456 "Should enqueue the most recent past fire on first registration"
457 );
458 }
459
460 #[test]
461 fn test_no_backfill_only_latest_fire() {
462 let pj = make_periodic("* * * * *", "UTC");
464 let now = Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap();
465 let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());
466
467 let fire = pj.latest_fire_time(now, after);
468 assert_eq!(
470 fire,
471 Some(Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap())
472 );
473 }
474
475 #[test]
476 fn test_timezone_aware_fire_time() {
477 let pj = make_periodic("0 9 * * *", "Pacific/Auckland");
479 let now = Utc.with_ymd_and_hms(2025, 6, 15, 21, 30, 0).unwrap();
482 let after = Some(Utc.with_ymd_and_hms(2025, 6, 14, 21, 0, 0).unwrap());
483
484 let fire = pj.latest_fire_time(now, after);
485 assert_eq!(
487 fire,
488 Some(Utc.with_ymd_and_hms(2025, 6, 15, 21, 0, 0).unwrap())
489 );
490 }
491
492 #[test]
493 fn test_dst_spring_forward() {
494 let pj = make_periodic("30 2 * * *", "US/Eastern");
498 let now = Utc.with_ymd_and_hms(2025, 3, 9, 12, 0, 0).unwrap();
499 let after = Some(Utc.with_ymd_and_hms(2025, 3, 8, 12, 0, 0).unwrap());
500
501 let fire = pj.latest_fire_time(now, after);
502 let fire_count = if fire.is_some() { 1 } else { 0 };
505 assert!(
506 fire_count <= 1,
507 "Should fire at most once during spring-forward"
508 );
509 }
510
511 #[test]
512 fn test_dst_fall_back() {
513 let pj = make_periodic("30 1 * * *", "US/Eastern");
516 let now = Utc.with_ymd_and_hms(2025, 11, 2, 12, 0, 0).unwrap();
517 let after = Some(Utc.with_ymd_and_hms(2025, 11, 1, 12, 0, 0).unwrap());
518
519 let fire = pj.latest_fire_time(now, after);
520 assert!(fire.is_some(), "Should fire once during fall-back");
521
522 let fire_time = fire.unwrap();
524 let second_fire = pj.latest_fire_time(now, Some(fire_time));
525 assert!(
526 second_fire.is_none(),
527 "Should not fire a second time during fall-back"
528 );
529 }
530
531 #[test]
532 fn test_invalid_priority() {
533 let result = PeriodicJob::builder("test", "0 9 * * *")
534 .priority(5)
535 .build_raw("test_job".to_string(), serde_json::json!({}));
536 assert!(result.is_err());
537 }
538
539 #[test]
540 fn test_invalid_max_attempts() {
541 let result = PeriodicJob::builder("test", "0 9 * * *")
542 .max_attempts(0)
543 .build_raw("test_job".to_string(), serde_json::json!({}));
544 assert!(result.is_err());
545 }
546}