use crate::context::Context;
use crate::cron_evaluator::CronEvaluator;
use crate::dal::DAL;
use crate::error::ValidationError;
use crate::executor::{PipelineError, PipelineExecutor};
use crate::models::cron_schedule::{CatchupPolicy, CronSchedule};
use chrono::{DateTime, Utc};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
/// Tunable settings for [`CronScheduler`].
#[derive(Debug, Clone)]
pub struct CronSchedulerConfig {
/// Period between polling passes; the polling loop builds its
/// `tokio::time::interval` from this value.
pub poll_interval: Duration,
/// Limit passed to the cron evaluator when enumerating missed executions
/// for schedules that use the `RunAll` catch-up policy.
pub max_catchup_executions: usize,
/// NOTE(review): not read by any scheduler code visible in this file;
/// presumably the lateness tolerated before a run counts as delayed — confirm
/// against other users of this config.
pub max_acceptable_delay: Duration,
}
impl Default for CronSchedulerConfig {
fn default() -> Self {
Self {
poll_interval: Duration::from_secs(30),
max_catchup_executions: 100,
max_acceptable_delay: Duration::from_secs(300), }
}
}
/// Polls the data layer for due cron schedules and hands each scheduled
/// workflow to a pipeline executor.
#[derive(Clone)]
pub struct CronScheduler {
/// Data access layer; used here through `cron_schedule()` and `cron_execution()`.
dal: Arc<DAL>,
/// Executor that receives the workflow name and context for every scheduled run.
executor: Arc<dyn PipelineExecutor>,
/// Polling and catch-up settings.
config: CronSchedulerConfig,
/// Watch channel observed by the polling loop; a `true` value ends the loop.
shutdown: watch::Receiver<bool>,
}
impl CronScheduler {
pub fn new(
dal: Arc<DAL>,
executor: Arc<dyn PipelineExecutor>,
config: CronSchedulerConfig,
shutdown: watch::Receiver<bool>,
) -> Self {
Self {
dal,
executor,
config,
shutdown,
}
}
/// Assembles a scheduler that uses `CronSchedulerConfig::default()`.
///
/// Equivalent to calling [`CronScheduler::new`] with the default configuration.
pub fn with_defaults(
    dal: Arc<DAL>,
    executor: Arc<dyn PipelineExecutor>,
    shutdown: watch::Receiver<bool>,
) -> Self {
    let config = CronSchedulerConfig::default();
    Self::new(dal, executor, config, shutdown)
}
/// Runs the scheduler until shutdown is requested.
///
/// On every `poll_interval` tick the due schedules are fetched and processed;
/// a failed pass is logged and the loop keeps running. The loop ends when the
/// shutdown channel carries `true`, or when the sending half of the channel
/// has been dropped (no further shutdown signal can ever arrive).
///
/// # Errors
///
/// Currently always returns `Ok(())`; per-pass errors are logged rather than
/// propagated.
pub async fn run_polling_loop(&mut self) -> Result<(), PipelineError> {
    info!(
        "Starting cron scheduler polling loop (interval: {:?})",
        self.config.poll_interval
    );
    let mut interval = tokio::time::interval(self.config.poll_interval);
    // If a pass overruns the interval, skip the missed ticks instead of
    // firing them back-to-back.
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
    loop {
        tokio::select! {
            _ = interval.tick() => {
                if let Err(e) = self.check_and_execute_schedules().await {
                    error!("Error processing cron schedules: {}", e);
                }
            }
            changed = self.shutdown.changed() => {
                if *self.shutdown.borrow() {
                    info!("Cron scheduler received shutdown signal");
                    break;
                }
                // `changed()` fails once the sender is gone, and then fails
                // again immediately on every subsequent call. Ignoring that
                // error would turn this branch into a busy loop, so treat a
                // closed channel as a request to stop.
                if changed.is_err() {
                    warn!("Cron scheduler shutdown channel closed; stopping polling loop");
                    break;
                }
            }
        }
    }
    info!("Cron scheduler polling loop stopped");
    Ok(())
}
/// Performs one polling pass: loads the schedules that are due at the
/// current UTC time and processes them one after another.
///
/// # Errors
///
/// Returns `PipelineError::ExecutionFailed` when the due schedules cannot be
/// loaded. A failure while processing an individual schedule is logged and
/// does not abort the pass.
async fn check_and_execute_schedules(&self) -> Result<(), PipelineError> {
    let now = Utc::now();
    debug!("Checking for due cron schedules at {}", now);

    let lookup = self.dal.cron_schedule().get_due_schedules(now).await;
    let due_schedules = match lookup {
        Ok(found) => found,
        Err(e) => {
            return Err(PipelineError::ExecutionFailed {
                message: e.to_string(),
            });
        }
    };

    if due_schedules.is_empty() {
        debug!("No due schedules found");
        return Ok(());
    }
    info!("Found {} due cron schedule(s)", due_schedules.len());

    for schedule in due_schedules {
        let outcome = self.process_schedule(&schedule, now).await;
        if let Err(e) = outcome {
            error!("Failed to process schedule {}: {}", schedule.id, e);
        }
    }
    Ok(())
}
/// Processes a single due schedule.
///
/// Steps, in order:
/// 1. Skip the schedule when `now` lies outside its start/end window.
/// 2. Compute the execution times owed under its catch-up policy.
/// 3. Compute the next run time and claim the schedule through
///    `claim_and_update`; when the claim is not granted, return without
///    executing anything.
/// 4. For each execution time: create an audit record, run the workflow, then
///    link the audit record to the resulting pipeline execution.
///
/// # Errors
///
/// Returns an error when the execution times or next run cannot be computed,
/// or when the claim call fails. Failures of individual executions (audit
/// creation, workflow execution, audit completion) are logged and do not stop
/// the remaining executions.
async fn process_schedule(
    &self,
    schedule: &CronSchedule,
    now: DateTime<Utc>,
) -> Result<(), PipelineError> {
    debug!(
        "Processing schedule: {} (workflow: {})",
        schedule.id, schedule.workflow_name
    );
    if !self.is_schedule_active(schedule, now) {
        debug!(
            "Schedule {} is outside its active time window, skipping",
            schedule.id
        );
        return Ok(());
    }
    let execution_times = self.calculate_execution_times(schedule, now)?;
    if execution_times.is_empty() {
        debug!("No execution times calculated for schedule {}", schedule.id);
        return Ok(());
    }
    let next_run = self.calculate_next_run(schedule, now)?;
    // The claim happens before any workflow is started; a `false` result is
    // treated as "someone else already took this run".
    let claimed = self
        .dal
        .cron_schedule()
        .claim_and_update(schedule.id, now, now, next_run)
        .await
        .map_err(|e| PipelineError::ExecutionFailed {
            message: e.to_string(),
        })?;
    if !claimed {
        debug!(
            "Schedule {} was already claimed by another instance",
            schedule.id
        );
        return Ok(());
    }
    info!(
        "Successfully claimed schedule {} for {} execution(s)",
        schedule.id,
        execution_times.len()
    );
    for scheduled_time in execution_times {
        // The audit record is written first so that a started execution always
        // has a record, even if the hand-off to the executor fails afterwards.
        let audit_record_id = match self
            .create_execution_audit(schedule.id, scheduled_time)
            .await
        {
            Ok(id) => id,
            Err(e) => {
                error!(
                    "Failed to create execution audit for schedule {} at {}: {}",
                    schedule.id, scheduled_time, e
                );
                continue;
            }
        };
        match self.execute_workflow(schedule, scheduled_time).await {
            Ok(pipeline_execution_id) => {
                // Report "executed and audited" only when the audit record was
                // actually completed; previously the success line was logged
                // even right after the audit-completion error.
                match self
                    .complete_execution_audit(audit_record_id, pipeline_execution_id)
                    .await
                {
                    Ok(()) => {
                        info!(
                            "Successfully executed and audited workflow {} for schedule {} (scheduled: {})",
                            schedule.workflow_name, schedule.id, scheduled_time
                        );
                    }
                    Err(e) => {
                        error!(
                            "Failed to complete audit trail for schedule {} execution: {}",
                            schedule.id, e
                        );
                    }
                }
            }
            Err(e) => {
                error!(
                    "Failed to execute workflow {} for schedule {} (scheduled: {}): {}",
                    schedule.workflow_name, schedule.id, scheduled_time, e
                );
                error!(
                    "Execution lost: audit record {} exists but pipeline execution failed",
                    audit_record_id
                );
            }
        }
    }
    Ok(())
}
/// Reports whether `now` falls inside the schedule's optional time window.
///
/// A missing `start_date` or `end_date` leaves that side of the window open.
/// Both bounds are inclusive: the schedule is inactive only when `now` is
/// strictly before the start or strictly after the end.
fn is_schedule_active(&self, schedule: &CronSchedule, now: DateTime<Utc>) -> bool {
    let before_start = matches!(&schedule.start_date, Some(start) if now < start.0);
    let after_end = matches!(&schedule.end_date, Some(end) if now > end.0);
    !(before_start || after_end)
}
/// Lists the execution times owed for `schedule` as of `now`, according to
/// its catch-up policy.
///
/// * `Skip` - a single entry: the schedule's stored `next_run_at`.
/// * `RunAll` - every occurrence the cron evaluator reports between the last
///   run (or the creation time when the schedule has never run) and `now`,
///   limited by `max_catchup_executions`.
///
/// # Errors
///
/// Returns `PipelineError::ExecutionFailed` when the cron expression cannot be
/// evaluated.
fn calculate_execution_times(
    &self,
    schedule: &CronSchedule,
    now: DateTime<Utc>,
) -> Result<Vec<DateTime<Utc>>, PipelineError> {
    // Shared wrapper for every evaluator failure in this function.
    fn cron_error<E: std::fmt::Display>(e: E) -> PipelineError {
        PipelineError::ExecutionFailed {
            message: format!("Cron evaluation error: {}", e),
        }
    }

    let limit = self.config.max_catchup_executions;
    match CatchupPolicy::from(schedule.catchup_policy.clone()) {
        CatchupPolicy::Skip => Ok(vec![schedule.next_run_at.0]),
        CatchupPolicy::RunAll => {
            let evaluator = CronEvaluator::new(&schedule.cron_expression, &schedule.timezone)
                .map_err(cron_error)?;
            // Catch-up starts from the last recorded run, or from creation
            // when the schedule has never run.
            let window_start = match schedule.last_run_at {
                Some(last_run) => last_run.0,
                None => schedule.created_at.0,
            };
            let executions: Vec<DateTime<Utc>> = evaluator
                .executions_between(window_start, now, limit)
                .map_err(cron_error)?
                .into_iter()
                .collect();
            if executions.len() >= limit {
                warn!(
                    "Limited catchup executions to {} for schedule {} (policy: RunAll)",
                    self.config.max_catchup_executions, schedule.id
                );
            }
            Ok(executions)
        }
    }
}
/// Asks the cron evaluator for the schedule's next execution relative to
/// `after`, using the schedule's cron expression and timezone.
///
/// # Errors
///
/// Returns `PipelineError::ExecutionFailed` when the evaluator cannot be
/// built or cannot produce a next execution.
fn calculate_next_run(
    &self,
    schedule: &CronSchedule,
    after: DateTime<Utc>,
) -> Result<DateTime<Utc>, PipelineError> {
    // Shared wrapper for both evaluator failure points.
    fn cron_error<E: std::fmt::Display>(e: E) -> PipelineError {
        PipelineError::ExecutionFailed {
            message: format!("Cron evaluation error: {}", e),
        }
    }

    CronEvaluator::new(&schedule.cron_expression, &schedule.timezone)
        .map_err(cron_error)?
        .next_execution(after)
        .map_err(cron_error)
}
/// Hands the schedule's workflow to the executor for one scheduled time.
///
/// The execution context is populated with four entries, inserted in this
/// order: `scheduled_time` (RFC 3339 string), `schedule_id`,
/// `schedule_timezone` and `schedule_expression`.
///
/// Returns the execution id reported by the executor, wrapped in a
/// `UniversalUuid`.
///
/// # Errors
///
/// Returns `PipelineError::ExecutionFailed` when a context entry cannot be
/// inserted, or whatever error the executor returns.
async fn execute_workflow(
    &self,
    schedule: &CronSchedule,
    scheduled_time: DateTime<Utc>,
) -> Result<crate::database::UniversalUuid, PipelineError> {
    let entries = [
        (
            "scheduled_time",
            serde_json::json!(scheduled_time.to_rfc3339()),
        ),
        ("schedule_id", serde_json::json!(schedule.id.to_string())),
        ("schedule_timezone", serde_json::json!(schedule.timezone)),
        (
            "schedule_expression",
            serde_json::json!(schedule.cron_expression),
        ),
    ];

    let mut context = Context::new();
    for (key, value) in entries {
        context
            .insert(key, value)
            .map_err(|e| PipelineError::ExecutionFailed {
                message: format!("Context error: {}", e),
            })?;
    }

    let workflow = &schedule.workflow_name;
    info!(
        "Executing workflow '{}' for schedule {} (scheduled time: {})",
        workflow, schedule.id, scheduled_time
    );
    let handoff = self.executor.execute(workflow, context).await?;
    debug!(
        "Successfully handed off workflow '{}' to executor (execution_id: {})",
        workflow, handoff.execution_id
    );
    Ok(crate::database::UniversalUuid(handoff.execution_id))
}
/// Records that `schedule_id` is about to run for `scheduled_time` and
/// returns the id of the new audit record.
///
/// # Errors
///
/// Propagates the error from the data layer when the record cannot be created.
async fn create_execution_audit(
    &self,
    schedule_id: crate::database::UniversalUuid,
    scheduled_time: DateTime<Utc>,
) -> Result<crate::database::UniversalUuid, ValidationError> {
    use crate::database::universal_types::UniversalTimestamp;
    use crate::models::cron_execution::NewCronExecution;

    let scheduled_at = UniversalTimestamp(scheduled_time);
    let created = self
        .dal
        .cron_execution()
        .create(NewCronExecution::new(schedule_id, scheduled_at))
        .await?;
    debug!(
        "Created execution audit record {} for schedule {} (scheduled: {})",
        created.id, schedule_id, scheduled_time
    );
    Ok(created.id)
}
/// Links an existing audit record to the pipeline execution that was started
/// for it.
///
/// # Errors
///
/// Propagates the error from the data layer when the update fails.
async fn complete_execution_audit(
    &self,
    audit_record_id: crate::database::UniversalUuid,
    pipeline_execution_id: crate::database::UniversalUuid,
) -> Result<(), ValidationError> {
    let update = self
        .dal
        .cron_execution()
        .update_pipeline_execution_id(audit_record_id, pipeline_execution_id)
        .await;
    update?;

    debug!(
        "Completed execution audit record {} -> pipeline {}",
        audit_record_id, pipeline_execution_id
    );
    Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::database::universal_types::{current_timestamp, UniversalBool, UniversalUuid};
    use tokio::sync::watch;

    /// Builds an enabled `test_workflow` schedule with the `skip` catch-up
    /// policy, no start/end window, and all timestamps set to the same
    /// current instant.
    fn create_test_schedule(cron_expr: &str, timezone: &str) -> CronSchedule {
        let stamp = current_timestamp();
        CronSchedule {
            id: UniversalUuid::new_v4(),
            workflow_name: String::from("test_workflow"),
            cron_expression: cron_expr.to_owned(),
            timezone: timezone.to_owned(),
            enabled: UniversalBool::new(true),
            catchup_policy: String::from("skip"),
            start_date: None,
            end_date: None,
            next_run_at: stamp,
            last_run_at: None,
            created_at: stamp,
            updated_at: stamp,
        }
    }

    /// The default configuration keeps its documented values.
    #[test]
    fn test_cron_scheduler_config_default() {
        let CronSchedulerConfig {
            poll_interval,
            max_catchup_executions,
            max_acceptable_delay,
        } = CronSchedulerConfig::default();
        assert_eq!(poll_interval, std::time::Duration::from_secs(30));
        assert_eq!(max_catchup_executions, 100);
        assert_eq!(max_acceptable_delay, std::time::Duration::from_secs(300));
    }

    // The three tests below are placeholders: they only build fixtures and
    // assert nothing about scheduler behaviour yet.

    #[test]
    fn test_is_schedule_active() {
        let (_shutdown_tx, _shutdown_rx) = watch::channel(false);
        let _schedule = create_test_schedule("0 * * * *", "UTC");
        let _now = Utc::now();
    }

    #[test]
    fn test_calculate_execution_times_skip_policy() {
        let _schedule = create_test_schedule("0 * * * *", "UTC");
        let _now = Utc::now();
    }

    #[test]
    fn test_calculate_execution_times_run_all_policy() {
        let mut _schedule = create_test_schedule("0 * * * *", "UTC");
        _schedule.catchup_policy = String::from("run_all");
    }
}