use std::str::FromStr;
use std::time::Duration;
use chrono::{DateTime, Utc};
use chrono_tz::Tz;
use cron::Schedule;
use mockforge_registry_core::models::test_run::EnqueueTestRun;
use mockforge_registry_core::models::{CloudWorkspace, TestRun, TestSchedule, TestSuite};
use sqlx::PgPool;
use tracing::{error, info, warn};
use crate::redis::RedisPool;
use crate::run_queue::{enqueue, EnqueuedJob};
/// Period of the background worker's polling loop: one schedule scan per tick.
const TICK_INTERVAL: Duration = Duration::from_secs(60);
/// Spawns the background task that periodically scans test schedules.
///
/// The task is detached: no handle is returned, and it loops until the
/// runtime shuts down. Each iteration waits for the next `TICK_INTERVAL`
/// boundary and then calls [`run_tick`]; a failed tick is logged and the loop
/// carries on with the next one.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime.
pub fn start_test_schedule_worker(pool: PgPool, redis: Option<RedisPool>) {
    info!("test_schedule worker started — ticking every {}s", TICK_INTERVAL.as_secs());

    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(TICK_INTERVAL);
        // A fresh tokio interval resolves its first tick immediately; consume
        // it so the first real scan happens one full period after startup.
        ticker.tick().await;

        loop {
            ticker.tick().await;
            match run_tick(&pool, redis.as_ref()).await {
                Ok(_) => {}
                Err(e) => {
                    error!(error = %e, "test_schedule tick failed");
                }
            }
        }
    });
}
/// Runs one scheduler pass over every enabled schedule.
///
/// For each schedule the window starts at `last_triggered_at` (falling back to
/// `created_at` when it has never fired) and ends at the current time. If the
/// cron expression has a fire time inside that window, the schedule is handed
/// to `trigger_scheduled_run`.
///
/// Returns the number of schedules for which `trigger_scheduled_run` returned
/// `Ok`. Note that this also counts the cases where that function returned
/// early without creating a run.
///
/// # Errors
///
/// Only a failure to list the enabled schedules is propagated. Per-schedule
/// problems (unparsable cron / timezone, trigger failure) are logged and the
/// pass continues with the next schedule.
pub async fn run_tick(pool: &PgPool, redis: Option<&RedisPool>) -> sqlx::Result<u32> {
    let schedules = TestSchedule::list_enabled(pool).await?;
    if schedules.is_empty() {
        return Ok(0);
    }

    // One timestamp for the whole pass so every schedule sees the same window end.
    let now = Utc::now();
    let mut fired: u32 = 0;

    for sched in schedules {
        let window_start = sched.last_triggered_at.unwrap_or(sched.created_at);

        let fire_at = match next_fire_in_window(&sched.cron, &sched.timezone, window_start, now) {
            Ok(Some(at)) => at,
            // Nothing due yet for this schedule.
            Ok(None) => continue,
            Err(e) => {
                warn!(
                    schedule_id = %sched.id,
                    cron = %sched.cron,
                    timezone = %sched.timezone,
                    error = %e,
                    "skipping schedule with unparsable cron / tz",
                );
                continue;
            }
        };

        match trigger_scheduled_run(pool, redis, &sched, fire_at).await {
            Ok(()) => {
                fired += 1;
            }
            Err(e) => {
                error!(
                    schedule_id = %sched.id,
                    error = %e,
                    "scheduled run trigger failed",
                );
            }
        }
    }

    if fired > 0 {
        info!(fired, "test_schedule tick triggered runs");
    }
    Ok(fired)
}
/// Fires a single schedule: records the trigger, creates the run row, and
/// pushes the job onto the queue.
///
/// Steps, in order:
/// 1. `TestSchedule::mark_triggered` with `fire_at`. A `None` result ends the
///    call with `Ok(())` and no run is created (presumably the schedule was
///    already advanced elsewhere — confirm against `mark_triggered`).
/// 2. Load the suite and its workspace. If either is missing, a warning is
///    logged and the call returns `Ok(())`.
/// 3. Insert a `TestRun` with `triggered_by = "schedule"`.
/// 4. Enqueue the job. The payload is the suite's config when that is a JSON
///    object (otherwise an empty object) with `schedule_id` added.
///
/// A failed queue push is logged but not returned as an error; the run row
/// has already been inserted by then.
///
/// # Errors
///
/// Propagates errors from `mark_triggered`, the suite lookup, the workspace
/// lookup (which is also logged here), and `TestRun::enqueue`.
async fn trigger_scheduled_run(
    pool: &PgPool,
    redis: Option<&RedisPool>,
    sched: &TestSchedule,
    fire_at: DateTime<Utc>,
) -> sqlx::Result<()> {
    if TestSchedule::mark_triggered(pool, sched.id, fire_at).await?.is_none() {
        return Ok(());
    }

    let Some(suite) = TestSuite::find_by_id(pool, sched.suite_id).await? else {
        warn!(
            schedule_id = %sched.id,
            suite_id = %sched.suite_id,
            "schedule references missing suite — skipping",
        );
        return Ok(());
    };

    let workspace_lookup = CloudWorkspace::find_by_id(pool, suite.workspace_id)
        .await
        .map_err(|e| {
            error!(
                schedule_id = %sched.id,
                error = %e,
                "DB error loading workspace for scheduled run",
            );
            e
        })?;
    let Some(workspace) = workspace_lookup else {
        warn!(
            schedule_id = %sched.id,
            workspace_id = %suite.workspace_id,
            "suite references missing workspace — skipping",
        );
        return Ok(());
    };

    let request = EnqueueTestRun {
        suite_id: suite.id,
        org_id: workspace.org_id,
        kind: &suite.kind,
        triggered_by: "schedule",
        triggered_by_user: None,
        git_ref: None,
        git_sha: None,
    };
    let run = TestRun::enqueue(pool, request).await?;

    // Start from the suite config when it is a JSON object, else from an
    // empty object, then add the id of the schedule that fired.
    let mut payload = suite.config.as_object().cloned().unwrap_or_default();
    payload.insert("schedule_id".into(), serde_json::json!(sched.id));

    let job = EnqueuedJob {
        run_id: run.id,
        org_id: run.org_id,
        source_id: suite.id,
        kind: &suite.kind,
        payload: serde_json::Value::Object(payload),
    };
    if let Err(e) = enqueue(redis, job).await {
        // Deliberately not propagated: the run row already exists.
        error!(
            schedule_id = %sched.id,
            run_id = %run.id,
            error = %e,
            "scheduled run inserted but Redis enqueue failed",
        );
    }

    Ok(())
}
/// Finds the first cron fire time strictly after `lower_bound` and returns it
/// if it is not later than `now`.
///
/// The cron expression is evaluated in the named `timezone`; the result is
/// converted back to UTC.
///
/// Returns `Ok(None)` when the schedule has no further fire time after
/// `lower_bound`, or when the next one is still in the future.
///
/// # Errors
///
/// Returns a message prefixed with `invalid timezone:` or `invalid cron:` when
/// the corresponding input fails to parse. The timezone is checked first.
fn next_fire_in_window(
    cron_expr: &str,
    timezone: &str,
    lower_bound: DateTime<Utc>,
    now: DateTime<Utc>,
) -> Result<Option<DateTime<Utc>>, String> {
    let zone = timezone
        .parse::<Tz>()
        .map_err(|e| format!("invalid timezone: {e}"))?;
    let parsed = Schedule::from_str(cron_expr).map_err(|e| format!("invalid cron: {e}"))?;

    let window_start = lower_bound.with_timezone(&zone);
    let due = parsed
        .after(&window_start)
        .next()
        .map(|local| local.with_timezone(&Utc))
        // Keep the candidate only if it has already come due.
        .filter(|fire_at| *fire_at <= now);

    Ok(due)
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Cron expression shared by the tests; the expectations below treat it as
    /// firing at second 0 of every minute.
    const EVERY_MINUTE: &str = "0 * * * * *";

    /// Builds a UTC timestamp on 2026-01-01 at the given time of day.
    fn jan_first(hour: u32, min: u32, sec: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 1, 1, hour, min, sec).unwrap()
    }

    /// Window 12:00:00..=12:01:30 contains the 12:01:00 fire.
    #[test]
    fn fire_in_window_returns_some_when_cron_ticked_since_lb() {
        let r = next_fire_in_window(
            EVERY_MINUTE,
            "UTC",
            jan_first(12, 0, 0),
            jan_first(12, 1, 30),
        )
        .unwrap();
        assert!(r.is_some(), "expected a fire in window");
    }

    /// Window starting at 12:01:01 has its next fire at 12:02:00, after `now`.
    #[test]
    fn fire_in_window_returns_none_when_lb_just_after_last_fire() {
        let r = next_fire_in_window(
            EVERY_MINUTE,
            "UTC",
            jan_first(12, 1, 1),
            jan_first(12, 1, 30),
        )
        .unwrap();
        assert!(r.is_none(), "expected no fire in window, got {r:?}");
    }

    /// A malformed cron expression is reported as an error.
    #[test]
    fn fire_in_window_rejects_invalid_cron() {
        let outcome =
            next_fire_in_window("not a cron", "UTC", jan_first(0, 0, 0), jan_first(1, 0, 0));
        assert!(outcome.is_err());
    }

    /// An unknown timezone name is reported as an error.
    #[test]
    fn fire_in_window_rejects_invalid_timezone() {
        let outcome = next_fire_in_window(
            EVERY_MINUTE,
            "Mars/Olympus",
            jan_first(0, 0, 0),
            jan_first(1, 0, 0),
        );
        assert!(outcome.is_err());
    }
}