use std::str::FromStr;
use chrono::Utc;
use cron::Schedule;
use keel_events::{EventQueueHandle, KeelInput};
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
/// Configuration for one scheduled job driven by `CronRunner`.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct CronJobConfig {
    /// Identifier of the job; emitted as the `cron_id` field in log events.
    pub id: String,
    /// Job name; logged at start-up and passed to `KeelInput::cron` on every trigger.
    pub name: String,
    /// Cron expression, parsed with `cron::Schedule::from_str`.
    pub expression: String,
    /// Prompt text passed to `KeelInput::cron` on every trigger.
    pub prompt: String,
    /// When `false`, the runner logs that the job is disabled and exits without scheduling.
    pub enabled: bool,
    /// Optional project to attach to each generated input via `with_project`.
    pub project_id: Option<String>,
}
pub struct CronRunner;
impl CronRunner {
pub fn spawn(
config: CronJobConfig,
queue_handle: EventQueueHandle,
shutdown: watch::Receiver<bool>,
) -> anyhow::Result<tokio::task::JoinHandle<()>> {
Schedule::from_str(&config.expression)
.map_err(|e| anyhow::anyhow!("Invalid cron expression '{}': {}", config.expression, e))?;
let handle = tokio::spawn(async move {
Self::run(config, queue_handle, shutdown).await;
});
Ok(handle)
}
async fn run(
config: CronJobConfig,
queue_handle: EventQueueHandle,
mut shutdown: watch::Receiver<bool>,
) {
if !config.enabled {
info!(cron_id = %config.id, "Cron job disabled, not running");
return;
}
let schedule = match Schedule::from_str(&config.expression) {
Ok(s) => s,
Err(e) => {
error!(
cron_id = %config.id,
expression = %config.expression,
error = %e,
"Failed to parse cron expression"
);
return;
}
};
info!(
cron_id = %config.id,
name = %config.name,
expression = %config.expression,
"Cron runner started"
);
loop {
let next = match schedule.upcoming(Utc).next() {
Some(t) => t,
None => {
warn!(cron_id = %config.id, "No future occurrences, stopping");
return;
}
};
let now = Utc::now();
let delay = next.signed_duration_since(now);
let sleep_duration = if delay.num_milliseconds() > 0 {
std::time::Duration::from_millis(delay.num_milliseconds() as u64)
} else {
std::time::Duration::from_millis(0)
};
debug!(
cron_id = %config.id,
next = %next,
sleep_ms = sleep_duration.as_millis() as u64,
"Sleeping until next cron trigger"
);
tokio::select! {
_ = tokio::time::sleep(sleep_duration) => {}
_ = shutdown.changed() => {
if *shutdown.borrow() {
info!(cron_id = %config.id, "Cron runner shutting down");
return;
}
}
}
let mut input = KeelInput::cron(&config.name, &config.prompt);
if let Some(ref project_id) = config.project_id {
input = input.with_project(project_id.clone());
}
if let Err(e) = queue_handle.push(input) {
error!(
cron_id = %config.id,
error = %e,
"Failed to push cron input"
);
return;
}
debug!(cron_id = %config.id, "Cron job fired");
}
}
}