use crate::error::AwaError;
use crate::job::JobRow;
use chrono::{DateTime, Utc};
use croner::Cron;
use serde::Serialize;
use sqlx::PgExecutor;
/// In-memory definition of a recurring job: a cron schedule plus the template
/// (kind, queue, args, ...) used for the job that is enqueued when it fires.
///
/// `PeriodicJob::builder` followed by `build`/`build_raw` validates the cron
/// expression, timezone, priority and max_attempts. Every field is public, so
/// a value can also be constructed directly; doing so skips that validation.
#[derive(Debug, Clone)]
pub struct PeriodicJob {
/// Schedule name; `upsert_cron_job` uses it as the conflict key in `awa.cron_jobs`.
pub name: String,
/// Cron expression, parsed with `croner` whenever fire times are computed.
pub cron_expr: String,
/// Timezone name, parsed as a `chrono_tz::Tz`; fire times are searched in this zone.
pub timezone: String,
/// Job kind stored with the schedule and copied onto each enqueued job.
pub kind: String,
/// Queue stored with the schedule and copied onto each enqueued job.
pub queue: String,
/// JSON arguments stored with the schedule and copied onto each enqueued job.
pub args: serde_json::Value,
/// Job priority; the builder only accepts values in `1..=4`.
pub priority: i16,
/// Attempt limit; the builder only accepts values in `1..=1000`.
pub max_attempts: i16,
/// Tags stored with the schedule and copied onto each enqueued job.
pub tags: Vec<String>,
/// JSON metadata; `atomic_enqueue` and `trigger_cron_job` merge extra keys
/// into the stored copy when they insert a job.
pub metadata: serde_json::Value,
}
impl PeriodicJob {
    /// Starts a [`PeriodicJobBuilder`] for the schedule `name` firing on `cron_expr`.
    ///
    /// The builder starts with timezone `"UTC"`, queue `"default"`, priority `2`,
    /// `max_attempts` `25`, no tags and an empty JSON object as metadata. Nothing
    /// is validated here; validation happens in `build` / `build_raw`.
    pub fn builder(name: impl Into<String>, cron_expr: impl Into<String>) -> PeriodicJobBuilder {
        PeriodicJobBuilder {
            name: name.into(),
            cron_expr: cron_expr.into(),
            timezone: "UTC".to_string(),
            queue: "default".to_string(),
            priority: 2,
            max_attempts: 25,
            tags: Vec::new(),
            metadata: serde_json::json!({}),
        }
    }

    /// Returns the most recent fire time `t` of this schedule with `t <= now`
    /// and, when `after` is given, `t > after`.
    ///
    /// * `now` - upper bound (inclusive) of the search window.
    /// * `after` - exclusive lower bound, e.g. the fire time that was enqueued
    ///   last. With `None` the search starts 24 hours before `now`.
    ///
    /// Only the latest matching fire time is returned; earlier fire times inside
    /// the window are skipped. Returns `None` when the window contains no fire
    /// time.
    ///
    /// # Panics
    ///
    /// Panics if `cron_expr` or `timezone` cannot be parsed. The builder rejects
    /// such values, but the fields are public and can be set without it.
    pub fn latest_fire_time(
        &self,
        now: DateTime<Utc>,
        after: Option<DateTime<Utc>>,
    ) -> Option<DateTime<Utc>> {
        let cron = Cron::new(&self.cron_expr)
            .parse()
            .expect("cron_expr was validated at build time");
        let tz: chrono_tz::Tz = self
            .timezone
            .parse()
            .expect("timezone was validated at build time");

        // The cron expression is evaluated in the job's own timezone, so the
        // starting point is converted before iterating.
        let search_start = match after {
            Some(after_time) => after_time.with_timezone(&tz),
            None => now.with_timezone(&tz) - chrono::Duration::hours(24),
        };

        let mut latest_fire: Option<DateTime<Utc>> = None;
        // `cron` is not needed after this loop, so it is used directly instead
        // of iterating over a clone.
        for fire_time in cron.iter_from(search_start) {
            let fire_utc = fire_time.with_timezone(&Utc);
            if fire_utc > now {
                // Fire times arrive in ascending order; the rest is in the future.
                break;
            }
            if let Some(after_time) = after {
                // The iterator may yield `after` itself; the lower bound is exclusive.
                if fire_utc <= after_time {
                    continue;
                }
            }
            latest_fire = Some(fire_utc);
        }
        latest_fire
    }
}
/// Builder for `PeriodicJob`, created by `PeriodicJob::builder`.
///
/// Holds everything except the job kind and arguments, which are supplied to
/// `build` / `build_raw`. Those methods also perform the validation.
#[derive(Debug, Clone)]
pub struct PeriodicJobBuilder {
// Schedule name, passed through to the built job unchanged.
name: String,
// Cron expression; checked by `build_raw`.
cron_expr: String,
// Timezone name; checked by `build_raw`. Starts as "UTC".
timezone: String,
// Target queue. Starts as "default".
queue: String,
// Must be within 1..=4 when built. Starts as 2.
priority: i16,
// Must be within 1..=1000 when built. Starts as 25.
max_attempts: i16,
// Starts empty.
tags: Vec<String>,
// Starts as an empty JSON object.
metadata: serde_json::Value,
}
impl PeriodicJobBuilder {
    /// Sets the timezone name the cron expression is evaluated in.
    ///
    /// The value is only checked when the job is built.
    pub fn timezone(self, timezone: impl Into<String>) -> Self {
        Self {
            timezone: timezone.into(),
            ..self
        }
    }

    /// Sets the queue the enqueued jobs are placed on.
    pub fn queue(self, queue: impl Into<String>) -> Self {
        Self {
            queue: queue.into(),
            ..self
        }
    }

    /// Sets the job priority; building fails unless it is within `1..=4`.
    pub fn priority(self, priority: i16) -> Self {
        Self { priority, ..self }
    }

    /// Sets the attempt limit; building fails unless it is within `1..=1000`.
    pub fn max_attempts(self, max_attempts: i16) -> Self {
        Self {
            max_attempts,
            ..self
        }
    }

    /// Replaces the tag list.
    pub fn tags(self, tags: Vec<String>) -> Self {
        Self { tags, ..self }
    }

    /// Replaces the JSON metadata.
    pub fn metadata(self, metadata: serde_json::Value) -> Self {
        Self { metadata, ..self }
    }

    /// Builds the job, taking kind and arguments from a typed `JobArgs` value.
    ///
    /// # Errors
    ///
    /// Propagates the error of `args.to_args()` and every error of
    /// [`build_raw`](Self::build_raw).
    pub fn build(self, args: &impl crate::JobArgs) -> Result<PeriodicJob, AwaError> {
        let kind = args.kind_str().to_string();
        let payload = args.to_args()?;
        self.build_raw(kind, payload)
    }

    /// Validates the collected settings and builds the job from an explicit
    /// `kind` and JSON `args`.
    ///
    /// # Errors
    ///
    /// Returns `AwaError::Validation` for the first check that fails, in this
    /// order: unparsable cron expression, unparsable timezone, priority outside
    /// `1..=4`, `max_attempts` outside `1..=1000`.
    pub fn build_raw(self, kind: String, args: serde_json::Value) -> Result<PeriodicJob, AwaError> {
        let Self {
            name,
            cron_expr,
            timezone,
            queue,
            priority,
            max_attempts,
            tags,
            metadata,
        } = self;

        // Parsed only for validation; the job keeps the original strings.
        if let Err(err) = Cron::new(&cron_expr).parse() {
            return Err(AwaError::Validation(format!(
                "invalid cron expression: {err}"
            )));
        }
        if let Err(err) = timezone.parse::<chrono_tz::Tz>() {
            return Err(AwaError::Validation(format!("invalid timezone: {err}")));
        }

        match priority {
            1..=4 => {}
            out_of_range => {
                return Err(AwaError::Validation(format!(
                    "priority must be between 1 and 4, got {}",
                    out_of_range
                )));
            }
        }
        match max_attempts {
            1..=1000 => {}
            out_of_range => {
                return Err(AwaError::Validation(format!(
                    "max_attempts must be between 1 and 1000, got {}",
                    out_of_range
                )));
            }
        }

        Ok(PeriodicJob {
            name,
            cron_expr,
            timezone,
            kind,
            queue,
            args,
            priority,
            max_attempts,
            tags,
            metadata,
        })
    }
}
/// One row of `awa.cron_jobs`, as loaded by `list_cron_jobs` (`SELECT *`).
///
/// Fields are mapped from the columns of the same name via `sqlx::FromRow`.
#[derive(Debug, Clone, sqlx::FromRow, Serialize)]
pub struct CronJobRow {
/// Schedule name; the conflict key used by `upsert_cron_job`.
pub name: String,
/// Stored cron expression.
pub cron_expr: String,
/// Stored timezone name.
pub timezone: String,
/// Kind of the jobs this schedule enqueues.
pub kind: String,
/// Queue of the jobs this schedule enqueues.
pub queue: String,
/// JSON arguments of the jobs this schedule enqueues.
pub args: serde_json::Value,
/// Priority of the jobs this schedule enqueues.
pub priority: i16,
/// Attempt limit of the jobs this schedule enqueues.
pub max_attempts: i16,
/// Tags of the jobs this schedule enqueues.
pub tags: Vec<String>,
/// Base JSON metadata of the jobs this schedule enqueues.
pub metadata: serde_json::Value,
/// Fire time written by the most recent successful `atomic_enqueue`.
/// NOTE(review): presumably `None` until the schedule has fired once - confirm against the table definition.
pub last_enqueued_at: Option<DateTime<Utc>>,
/// NOTE(review): not written by any query in this file; presumably a column default - confirm.
pub created_at: DateTime<Utc>,
/// Set to `now()` when `upsert_cron_job` updates the row and by `atomic_enqueue`.
pub updated_at: DateTime<Utc>,
}
pub async fn upsert_cron_job<'e, E>(executor: E, job: &PeriodicJob) -> Result<(), AwaError>
where
E: PgExecutor<'e>,
{
sqlx::query(
r#"
INSERT INTO awa.cron_jobs (name, cron_expr, timezone, kind, queue, args, priority, max_attempts, tags, metadata)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (name) DO UPDATE SET
cron_expr = EXCLUDED.cron_expr,
timezone = EXCLUDED.timezone,
kind = EXCLUDED.kind,
queue = EXCLUDED.queue,
args = EXCLUDED.args,
priority = EXCLUDED.priority,
max_attempts = EXCLUDED.max_attempts,
tags = EXCLUDED.tags,
metadata = EXCLUDED.metadata,
updated_at = now()
"#,
)
.bind(&job.name)
.bind(&job.cron_expr)
.bind(&job.timezone)
.bind(&job.kind)
.bind(&job.queue)
.bind(&job.args)
.bind(job.priority)
.bind(job.max_attempts)
.bind(&job.tags)
.bind(&job.metadata)
.execute(executor)
.await?;
Ok(())
}
/// Computes the upcoming fire time of `cron_expr` in `timezone`, measured from
/// the current instant.
///
/// Convenience wrapper around `next_fire_time_after` using `Utc::now()`;
/// returns `None` in the same cases.
pub fn next_fire_time(cron_expr: &str, timezone: &str) -> Option<DateTime<Utc>> {
    let now = Utc::now();
    next_fire_time_after(cron_expr, timezone, now)
}
/// Computes the first fire time of `cron_expr`, evaluated in `timezone`, when
/// searching forward from `after`. The result is expressed in UTC.
///
/// Returns `None` when the cron expression or the timezone name cannot be
/// parsed, or when the schedule yields no fire time.
///
/// NOTE(review): the search uses `Cron::iter_from`; if that includes its start
/// instant, an `after` lying exactly on a fire time is returned unchanged -
/// confirm against the croner version in use.
pub fn next_fire_time_after(
    cron_expr: &str,
    timezone: &str,
    after: DateTime<Utc>,
) -> Option<DateTime<Utc>> {
    let schedule = Cron::new(cron_expr).parse().ok()?;
    let zone = timezone.parse::<chrono_tz::Tz>().ok()?;

    // Search in the schedule's own timezone, then convert the hit back to UTC.
    let start = after.with_timezone(&zone);
    let upcoming = schedule.iter_from(start).next();
    upcoming.map(|fire| fire.with_timezone(&Utc))
}
pub async fn list_cron_jobs<'e, E>(executor: E) -> Result<Vec<CronJobRow>, AwaError>
where
E: PgExecutor<'e>,
{
let rows = sqlx::query_as::<_, CronJobRow>("SELECT * FROM awa.cron_jobs ORDER BY name")
.fetch_all(executor)
.await?;
Ok(rows)
}
/// Deletes the schedule called `name` from `awa.cron_jobs`.
///
/// Returns `true` when a row was removed and `false` when no row had that name.
///
/// # Errors
///
/// Returns the database error converted into `AwaError`.
pub async fn delete_cron_job<'e, E>(executor: E, name: &str) -> Result<bool, AwaError>
where
    E: PgExecutor<'e>,
{
    let removed = sqlx::query("DELETE FROM awa.cron_jobs WHERE name = $1")
        .bind(name)
        .execute(executor)
        .await?
        .rows_affected();
    Ok(removed != 0)
}
pub async fn atomic_enqueue<'e, E>(
executor: E,
cron_name: &str,
fire_time: DateTime<Utc>,
previous_enqueued_at: Option<DateTime<Utc>>,
) -> Result<Option<JobRow>, AwaError>
where
E: PgExecutor<'e>,
{
let row = sqlx::query_as::<_, JobRow>(
r#"
WITH mark AS (
UPDATE awa.cron_jobs
SET last_enqueued_at = $2, updated_at = now()
WHERE name = $1
AND (last_enqueued_at IS NOT DISTINCT FROM $3)
RETURNING name, kind, queue, args, priority, max_attempts, tags, metadata
)
INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, tags, metadata)
SELECT kind, queue, args, 'available', priority, max_attempts, tags,
metadata || jsonb_build_object('cron_name', name, 'cron_fire_time', $2::text)
FROM mark
RETURNING *
"#,
)
.bind(cron_name)
.bind(fire_time)
.bind(previous_enqueued_at)
.fetch_optional(executor)
.await?;
Ok(row)
}
/// Enqueues one job from the schedule called `name` right away.
///
/// The job is inserted with state `'available'` and the schedule's metadata
/// extended with `cron_name` and `triggered_manually: true`. The statement
/// only reads `awa.cron_jobs`; it does not assign `last_enqueued_at`.
///
/// # Errors
///
/// Returns `AwaError::Validation` when no schedule has that name, or the
/// database error converted into `AwaError`.
pub async fn trigger_cron_job<'e, E>(executor: E, name: &str) -> Result<JobRow, AwaError>
where
    E: PgExecutor<'e>,
{
    let inserted = sqlx::query_as::<_, JobRow>(
        r#"
WITH cron AS (
SELECT name, kind, queue, args, priority, max_attempts, tags, metadata
FROM awa.cron_jobs
WHERE name = $1
)
INSERT INTO awa.jobs (kind, queue, args, state, priority, max_attempts, tags, metadata)
SELECT kind, queue, args, 'available', priority, max_attempts, tags,
metadata || jsonb_build_object('cron_name', name, 'triggered_manually', true)
FROM cron
RETURNING *
"#,
    )
    .bind(name)
    .fetch_optional(executor)
    .await?;

    // No row back means the CTE matched no schedule, so nothing was inserted.
    match inserted {
        Some(job) => Ok(job),
        None => Err(AwaError::Validation(format!("cron job not found: {name}"))),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Builds a `PeriodicJob` directly, bypassing the builder, with fixed
    /// template fields and the given schedule.
    fn make_periodic(cron_expr: &str, timezone: &str) -> PeriodicJob {
        PeriodicJob {
            name: "test".to_string(),
            cron_expr: cron_expr.to_string(),
            timezone: timezone.to_string(),
            kind: "test_job".to_string(),
            queue: "default".to_string(),
            args: serde_json::json!({}),
            priority: 2,
            max_attempts: 25,
            tags: vec![],
            metadata: serde_json::json!({}),
        }
    }

    /// A well-formed five-field cron expression is accepted.
    #[test]
    fn test_valid_cron_expression() {
        let result = PeriodicJob::builder("test", "0 9 * * *")
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_ok());
    }

    /// An unparsable cron expression is rejected with a descriptive message.
    #[test]
    fn test_invalid_cron_expression() {
        let result = PeriodicJob::builder("test", "not a cron")
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("invalid cron expression"),
            "got: {err}"
        );
    }

    /// An unknown timezone name is rejected with a descriptive message.
    #[test]
    fn test_invalid_timezone() {
        let result = PeriodicJob::builder("test", "0 9 * * *")
            .timezone("Not/A/Timezone")
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err.to_string().contains("invalid timezone"), "got: {err}");
    }

    /// Fields that were not set explicitly keep the builder's initial values.
    #[test]
    fn test_builder_defaults() {
        let job = PeriodicJob::builder("daily_report", "0 9 * * *")
            .build_raw(
                "daily_report".to_string(),
                serde_json::json!({"format": "pdf"}),
            )
            .unwrap();
        assert_eq!(job.name, "daily_report");
        assert_eq!(job.kind, "daily_report");
        assert_eq!(job.timezone, "UTC");
        assert_eq!(job.queue, "default");
        assert_eq!(job.priority, 2);
        assert_eq!(job.max_attempts, 25);
        assert!(job.tags.is_empty());
    }

    /// Every setter's value reaches the built job.
    #[test]
    fn test_builder_custom_fields() {
        let job = PeriodicJob::builder("report", "0 9 * * *")
            .timezone("Pacific/Auckland")
            .queue("reports")
            .priority(1)
            .max_attempts(3)
            .tags(vec!["important".to_string()])
            .metadata(serde_json::json!({"source": "cron"}))
            .build_raw("daily_report".to_string(), serde_json::json!({}))
            .unwrap();
        assert_eq!(job.timezone, "Pacific/Auckland");
        assert_eq!(job.queue, "reports");
        assert_eq!(job.priority, 1);
        assert_eq!(job.max_attempts, 3);
        assert_eq!(job.tags, vec!["important"]);
        assert_eq!(job.metadata, serde_json::json!({"source": "cron"}));
    }

    /// Hourly schedule, last enqueued at 13:00, now 14:35: 14:00 is due.
    #[test]
    fn test_latest_fire_time_finds_past_fire() {
        let pj = make_periodic("0 * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 13, 0, 0).unwrap());
        let fire = pj.latest_fire_time(now, after);
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap())
        );
    }

    /// The lower bound is exclusive: 14:00 was already enqueued and 15:00 is
    /// still in the future, so nothing is due.
    #[test]
    fn test_no_fire_when_next_is_future() {
        let pj = make_periodic("0 * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());
        let fire = pj.latest_fire_time(now, after);
        assert!(fire.is_none(), "Should not fire until 15:00");
    }

    /// Without a previous enqueue time the 24-hour look-back finds the most
    /// recent fire time.
    #[test]
    fn test_first_registration_null_last_enqueued() {
        let pj = make_periodic("0 * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
        let fire = pj.latest_fire_time(now, None);
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap()),
            "Should enqueue the most recent past fire on first registration"
        );
    }

    /// An every-minute schedule that missed an hour yields only the newest
    /// fire time (inclusive of `now`), not one per missed minute.
    #[test]
    fn test_no_backfill_only_latest_fire() {
        let pj = make_periodic("* * * * *", "UTC");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 15, 14, 0, 0).unwrap());
        let fire = pj.latest_fire_time(now, after);
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap())
        );
    }

    /// 09:00 in Pacific/Auckland (UTC+12 in June) is 21:00 UTC the day before.
    #[test]
    fn test_timezone_aware_fire_time() {
        let pj = make_periodic("0 9 * * *", "Pacific/Auckland");
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 21, 30, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 6, 14, 21, 0, 0).unwrap());
        let fire = pj.latest_fire_time(now, after);
        assert_eq!(
            fire,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 21, 0, 0).unwrap())
        );
    }

    /// 02:30 local time does not exist on 2025-03-09 in US/Eastern.
    ///
    /// Whether the skipped slot fires at all is decided by the cron library, so
    /// both outcomes are accepted. If it does fire, the fire time must lie in
    /// `(after, now]`, and asking again from that fire time must yield nothing.
    #[test]
    fn test_dst_spring_forward() {
        let pj = make_periodic("30 2 * * *", "US/Eastern");
        let now = Utc.with_ymd_and_hms(2025, 3, 9, 12, 0, 0).unwrap();
        let after = Utc.with_ymd_and_hms(2025, 3, 8, 12, 0, 0).unwrap();
        if let Some(fire_time) = pj.latest_fire_time(now, Some(after)) {
            assert!(
                fire_time > after && fire_time <= now,
                "fire time must fall inside the search window, got {fire_time}"
            );
            assert!(
                pj.latest_fire_time(now, Some(fire_time)).is_none(),
                "Should fire at most once during spring-forward"
            );
        }
    }

    /// 01:30 local time occurs twice on 2025-11-02 in US/Eastern; the job must
    /// fire once, and not again once that fire time has been recorded.
    #[test]
    fn test_dst_fall_back() {
        let pj = make_periodic("30 1 * * *", "US/Eastern");
        let now = Utc.with_ymd_and_hms(2025, 11, 2, 12, 0, 0).unwrap();
        let after = Some(Utc.with_ymd_and_hms(2025, 11, 1, 12, 0, 0).unwrap());
        let fire = pj.latest_fire_time(now, after);
        assert!(fire.is_some(), "Should fire once during fall-back");
        let fire_time = fire.unwrap();
        let second_fire = pj.latest_fire_time(now, Some(fire_time));
        assert!(
            second_fire.is_none(),
            "Should not fire a second time during fall-back"
        );
    }

    /// A priority above the accepted range is rejected.
    #[test]
    fn test_invalid_priority() {
        let result = PeriodicJob::builder("test", "0 9 * * *")
            .priority(5)
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("priority must be between 1 and 4"),
            "got: {err}"
        );
    }

    /// An attempt limit below the accepted range is rejected.
    #[test]
    fn test_invalid_max_attempts() {
        let result = PeriodicJob::builder("test", "0 9 * * *")
            .max_attempts(0)
            .build_raw("test_job".to_string(), serde_json::json!({}));
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string()
                .contains("max_attempts must be between 1 and 1000"),
            "got: {err}"
        );
    }

    /// Hourly schedule searched from 14:35: the next fire time is 15:00.
    #[test]
    fn test_next_fire_time_exact() {
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 14, 35, 0).unwrap();
        let next = next_fire_time_after("0 * * * *", "UTC", now);
        assert_eq!(
            next,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 15, 0, 0).unwrap())
        );
    }

    /// The same expression resolves to different instants depending on the
    /// timezone it is evaluated in.
    #[test]
    fn test_next_fire_time_respects_timezone() {
        let now = Utc.with_ymd_and_hms(2025, 6, 15, 20, 0, 0).unwrap();
        let next = next_fire_time_after("0 9 * * *", "Pacific/Auckland", now);
        assert_eq!(
            next,
            Some(Utc.with_ymd_and_hms(2025, 6, 15, 21, 0, 0).unwrap())
        );
        let next_utc = next_fire_time_after("0 9 * * *", "UTC", now);
        assert_eq!(
            next_utc,
            Some(Utc.with_ymd_and_hms(2025, 6, 16, 9, 0, 0).unwrap())
        );
    }

    /// Searching from 01:30 EST on the spring-forward day must not return a
    /// fire time before 03:00 EDT (07:00 UTC), since 02:00 local is skipped.
    #[test]
    fn test_next_fire_time_dst_boundary() {
        let now = Utc.with_ymd_and_hms(2025, 3, 9, 6, 30, 0).unwrap();
        let next = next_fire_time_after("0 * * * *", "US/Eastern", now);
        assert!(next.is_some());
        let next = next.unwrap();
        assert!(
            next >= Utc.with_ymd_and_hms(2025, 3, 9, 7, 0, 0).unwrap(),
            "should skip the non-existent 2:00 AM, got {next}"
        );
    }

    /// Unparsable input yields `None` instead of panicking.
    #[test]
    fn test_next_fire_time_invalid_input() {
        let now = Utc::now();
        assert!(next_fire_time_after("not a cron", "UTC", now).is_none());
        assert!(next_fire_time_after("* * * * *", "Not/A/Zone", now).is_none());
    }
}