use std::str::FromStr;
use std::sync::Arc;
use anyhow::Result;
use cron::Schedule;
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, warn};
use crate::store::WorkflowStore;
use crate::types::{WorkflowEvent, WorkflowRecord};
const SCHEDULER_POLL_SECS: u64 = 15;
pub async fn run_scheduler<S: WorkflowStore>(store: Arc<S>) {
let mut tick = interval(Duration::from_secs(SCHEDULER_POLL_SECS));
info!("Cron scheduler started (poll every {SCHEDULER_POLL_SECS}s)");
loop {
tick.tick().await;
match store.try_acquire_scheduler_lock().await {
Ok(true) => {
if let Err(e) = evaluate_schedules(&*store).await {
error!("Scheduler error: {e}");
}
}
Ok(false) => {
debug!("Scheduler: not the leader, skipping cron evaluation");
}
Err(e) => {
error!("Scheduler: leader election failed: {e}");
}
}
}
}
async fn evaluate_schedules<S: WorkflowStore>(store: &S) -> Result<()> {
let namespaces = store.list_namespaces().await?;
let now = timestamp_now();
for ns in &namespaces {
if let Err(e) = evaluate_namespace_schedules(store, &ns.name, now).await {
error!("Scheduler error in namespace '{}': {e}", ns.name);
}
}
Ok(())
}
async fn evaluate_namespace_schedules<S: WorkflowStore>(store: &S, namespace: &str, now: f64) -> Result<()> {
let schedules = store.list_schedules(namespace).await?;
for sched in schedules {
if sched.paused {
continue;
}
let is_due = match sched.next_run_at {
Some(next) => now >= next,
None => true, };
if !is_due {
continue;
}
let next_run = match compute_next_run(&sched.cron_expr, &sched.timezone) {
Some(t) => t,
None => {
warn!(
"Invalid cron expression or timezone for schedule '{}': expr={} tz={}",
sched.name, sched.cron_expr, sched.timezone
);
continue;
}
};
if sched.overlap_policy == "skip"
&& let Some(ref last_wf_id) = sched.last_workflow_id
&& let Some(wf) = store.get_workflow(last_wf_id).await?
&& !crate::types::WorkflowStatus::from_str(&wf.status)
.map(|s| s.is_terminal())
.unwrap_or(true)
{
debug!(
"Schedule '{}': skipping — previous run {} still {}",
sched.name, last_wf_id, wf.status
);
store
.update_schedule_last_run(namespace, &sched.name, now, next_run, last_wf_id)
.await?;
continue;
}
let workflow_id = format!("{}-{}", sched.name, now as u64);
let run_id = format!("run-{workflow_id}");
let wf = WorkflowRecord {
id: workflow_id.clone(),
namespace: namespace.to_string(),
run_id,
workflow_type: sched.workflow_type.clone(),
task_queue: sched.task_queue.clone(),
status: "PENDING".to_string(),
input: sched.input.clone(),
result: None,
error: None,
parent_id: None,
claimed_by: None,
search_attributes: None,
archived_at: None,
archive_uri: None,
created_at: now,
updated_at: now,
completed_at: None,
};
store.create_workflow(&wf).await?;
store
.append_event(&WorkflowEvent {
id: None,
workflow_id: workflow_id.clone(),
seq: 1,
event_type: "WorkflowStarted".to_string(),
payload: sched.input.clone(),
timestamp: now,
})
.await?;
store.mark_workflow_dispatchable(&workflow_id).await?;
store
.update_schedule_last_run(namespace, &sched.name, now, next_run, &workflow_id)
.await?;
info!(
"Schedule '{}': started workflow {workflow_id} (type: {})",
sched.name, sched.workflow_type
);
}
Ok(())
}
fn compute_next_run(cron_expr: &str, timezone: &str) -> Option<f64> {
let schedule = Schedule::from_str(cron_expr).ok()?;
if timezone.is_empty() || timezone.eq_ignore_ascii_case("UTC") {
let next = schedule.upcoming(chrono::Utc).next()?;
return Some(next.timestamp() as f64);
}
let tz: chrono_tz::Tz = timezone.parse().ok()?;
let next = schedule.upcoming(tz).next()?;
Some(next.timestamp() as f64)
}
fn timestamp_now() -> f64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs_f64()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn compute_next_run_honors_timezone() {
let utc_next = compute_next_run("0 0 2 * * *", "UTC").expect("utc next_run");
let berlin_next =
compute_next_run("0 0 2 * * *", "Europe/Berlin").expect("berlin next_run");
assert_ne!(
utc_next, berlin_next,
"02:00 UTC and 02:00 Europe/Berlin should not coincide"
);
}
#[test]
fn compute_next_run_empty_timezone_defaults_to_utc() {
let utc_next = compute_next_run("0 0 2 * * *", "UTC").expect("utc next_run");
let default_next = compute_next_run("0 0 2 * * *", "").expect("empty next_run");
assert_eq!(utc_next, default_next);
}
#[test]
fn compute_next_run_invalid_timezone_returns_none() {
assert!(compute_next_run("0 0 2 * * *", "Not/AZone").is_none());
}
#[test]
fn compute_next_run_invalid_cron_returns_none() {
assert!(compute_next_run("not a cron", "UTC").is_none());
}
}