use chrono::{Datelike, Timelike};
use serde::{Deserialize, Serialize};
use crate::types::EventPayload;
#[derive(Debug, Clone)]
enum CronField {
Any,
Value(u32),
Step { base: Option<u32>, step: u32 },
List(Vec<u32>),
Range { start: u32, end: u32 },
}
impl CronField {
fn matches(&self, value: u32) -> bool {
match self {
CronField::Any => true,
CronField::Value(v) => *v == value,
CronField::Step { base, step } => {
let start = base.unwrap_or(0);
if value < start {
return false;
}
(value - start).is_multiple_of(*step)
},
CronField::List(values) => values.contains(&value),
CronField::Range { start, end } => value >= *start && value <= *end,
}
}
fn parse(field: &str) -> Result<Self, String> {
if field == "*" {
return Ok(CronField::Any);
}
if let Some(step_idx) = field.find('/') {
let (base_part, step_part) = field.split_at(step_idx);
let step_str = &step_part[1..];
let step = step_str
.parse::<u32>()
.map_err(|_| format!("Invalid step value: {}", step_str))?;
let base = if base_part == "*" {
None
} else {
Some(
base_part
.parse::<u32>()
.map_err(|_| format!("Invalid base value: {}", base_part))?,
)
};
return Ok(CronField::Step { base, step });
}
if let Some(dash_idx) = field.find('-') {
let (start_str, end_str) = field.split_at(dash_idx);
let end_str = &end_str[1..];
let start = start_str
.parse::<u32>()
.map_err(|_| format!("Invalid range start: {}", start_str))?;
let end =
end_str.parse::<u32>().map_err(|_| format!("Invalid range end: {}", end_str))?;
return Ok(CronField::Range { start, end });
}
if field.contains(',') {
let values = field
.split(',')
.map(|v| v.parse::<u32>().map_err(|_| format!("Invalid list value: {}", v)))
.collect::<Result<Vec<u32>, String>>()?;
return Ok(CronField::List(values));
}
let value = field
.parse::<u32>()
.map_err(|_| format!("Invalid cron field value: {}", field))?;
Ok(CronField::Value(value))
}
}
#[derive(Debug, Clone)]
pub struct CronSchedule {
pub expression: String,
minute: CronField,
hour: CronField,
day: CronField,
month: CronField,
weekday: CronField,
}
impl CronSchedule {
pub fn parse(expression: &str) -> Result<Self, String> {
let parts: Vec<&str> = expression.split_whitespace().collect();
if parts.len() != 5 {
return Err(format!("Cron expression must have 5 fields, got {}", parts.len()));
}
let minute = CronField::parse(parts[0])?;
let hour = CronField::parse(parts[1])?;
let day = CronField::parse(parts[2])?;
let month = CronField::parse(parts[3])?;
let weekday = CronField::parse(parts[4])?;
Ok(CronSchedule {
expression: expression.to_string(),
minute,
hour,
day,
month,
weekday,
})
}
#[must_use]
pub fn matches(&self, datetime: &chrono::DateTime<chrono::Utc>) -> bool {
let minute = datetime.minute();
let hour = datetime.hour();
let day = datetime.day();
let month = datetime.month();
let weekday = datetime.weekday().number_from_sunday();
self.minute.matches(minute)
&& self.hour.matches(hour)
&& self.day.matches(day)
&& self.month.matches(month)
&& self.weekday.matches(weekday)
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CronExecutionState {
pub last_executed: Option<chrono::DateTime<chrono::Utc>>,
}
impl CronExecutionState {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn should_execute(
&self,
schedule: &CronSchedule,
exec_time: &chrono::DateTime<chrono::Utc>,
) -> bool {
if !schedule.matches(exec_time) {
return false;
}
let Some(last_exec) = self.last_executed else {
return true;
};
let window_start = Self::find_schedule_window(schedule, exec_time);
last_exec >= window_start
}
fn find_schedule_window(
schedule: &CronSchedule,
time: &chrono::DateTime<chrono::Utc>,
) -> chrono::DateTime<chrono::Utc> {
let mut current = *time;
for _ in 0..60 {
current -= chrono::Duration::minutes(1);
if schedule.matches(¤t) {
return current;
}
}
*time
}
#[allow(clippy::missing_const_for_fn)] pub fn record_execution(&mut self, time: chrono::DateTime<chrono::Utc>) {
self.last_executed = Some(time);
}
#[must_use]
pub fn find_missed_executions(
&self,
schedule: &CronSchedule,
since: &chrono::DateTime<chrono::Utc>,
until: &chrono::DateTime<chrono::Utc>,
) -> Vec<chrono::DateTime<chrono::Utc>> {
let mut missed = Vec::new();
let mut current = *since + chrono::Duration::minutes(1);
while current < *until {
if schedule.matches(¤t) {
missed.push(current);
}
current += chrono::Duration::minutes(1);
}
missed
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CronTrigger {
pub function_name: String,
pub schedule: String,
pub timezone: String,
}
impl CronTrigger {
#[must_use]
pub fn build_payload(&self, exec_time: &chrono::DateTime<chrono::Utc>) -> EventPayload {
let trigger_type = format!("cron:{}", self.function_name);
let data = serde_json::json!({
"schedule": self.schedule,
"timezone": self.timezone,
"executed_at": exec_time.to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
});
EventPayload {
trigger_type,
entity: "cron".to_string(),
event_kind: "scheduled".to_string(),
data,
timestamp: chrono::Utc::now(),
}
}
}
pub struct CronScheduler {
triggers: Vec<(CronTrigger, CronExecutionState)>,
}
impl CronScheduler {
#[must_use]
pub fn new(triggers: Vec<CronTrigger>) -> Self {
let triggers = triggers.into_iter().map(|t| (t, CronExecutionState::new())).collect();
Self { triggers }
}
#[must_use]
pub const fn trigger_count(&self) -> usize {
self.triggers.len()
}
#[must_use]
pub fn start(
self,
observer: std::sync::Arc<crate::observer::FunctionObserver>,
module_registry: std::collections::HashMap<String, crate::types::FunctionModule>,
) -> CronSchedulerHandle {
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
tokio::spawn(cron_scheduler_task(self, observer, module_registry, shutdown_rx));
CronSchedulerHandle {
shutdown_tx: Some(shutdown_tx),
}
}
}
async fn cron_scheduler_task(
mut scheduler: CronScheduler,
observer: std::sync::Arc<crate::observer::FunctionObserver>,
module_registry: std::collections::HashMap<String, crate::types::FunctionModule>,
mut shutdown_rx: tokio::sync::oneshot::Receiver<()>,
) {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
interval.tick().await;
loop {
tokio::select! {
_ = interval.tick() => {
let now = chrono::Utc::now();
for (trigger, state) in &mut scheduler.triggers {
let schedule = match CronSchedule::parse(&trigger.schedule) {
Ok(s) => s,
Err(e) => {
tracing::warn!(
function = %trigger.function_name,
expression = %trigger.schedule,
error = %e,
"Invalid cron expression — skipping trigger"
);
continue;
}
};
if !state.should_execute(&schedule, &now) {
continue;
}
state.record_execution(now);
let Some(module) = module_registry.get(&trigger.function_name) else {
tracing::warn!(
function = %trigger.function_name,
"Cron trigger fired but function module not found — skipping"
);
continue;
};
let payload = trigger.build_payload(&now);
let observer_clone = std::sync::Arc::clone(&observer);
let module_clone = module.clone();
let fn_name = trigger.function_name.clone();
tokio::spawn(async move {
let host = crate::host::NoopHostContext::new(payload.clone());
match observer_clone
.invoke(
&module_clone,
payload,
&host,
crate::types::ResourceLimits::default(),
)
.await
{
Ok(_) => {
tracing::debug!(function = %fn_name, "Cron function completed");
}
Err(e) => {
let chain = error_source_chain(&e);
tracing::error!(
function = %fn_name,
error = %e,
error.debug = ?e,
error.chain = %chain,
"Cron function invocation failed"
);
}
}
});
}
}
_ = &mut shutdown_rx => {
tracing::debug!("Cron scheduler received shutdown signal — stopping");
break;
}
}
}
}
fn error_source_chain(err: &(dyn std::error::Error + 'static)) -> String {
use std::fmt::Write as _;
let mut chain = String::new();
let mut current: Option<&dyn std::error::Error> = err.source();
while let Some(source) = current {
if !chain.is_empty() {
chain.push_str(" → ");
}
let _ = write!(chain, "{source}");
current = source.source();
}
if chain.is_empty() {
"<no source>".to_string()
} else {
chain
}
}
pub struct CronSchedulerHandle {
shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
}
impl CronSchedulerHandle {
pub fn stop(mut self) {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
}
}
}
impl Drop for CronSchedulerHandle {
fn drop(&mut self) {
if let Some(tx) = self.shutdown_tx.take() {
let _ = tx.send(());
}
}
}