use crate::context::Context;
use crate::cron_evaluator::CronEvaluator;
use crate::dal::DAL;
use crate::database::universal_types::{UniversalTimestamp, UniversalUuid};
use crate::error::ValidationError;
use crate::executor::{WorkflowExecutionError, WorkflowExecutor};
use crate::models::schedule::{CatchupPolicy, NewSchedule, NewScheduleExecution, Schedule};
use crate::runtime::Runtime;
use crate::trigger::{Trigger, TriggerError};
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
#[derive(Debug, Clone)]
pub struct SchedulerConfig {
pub cron_poll_interval: Duration,
pub max_catchup_executions: usize,
pub max_acceptable_delay: Duration,
pub trigger_base_poll_interval: Duration,
pub trigger_poll_timeout: Duration,
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
cron_poll_interval: Duration::from_secs(30),
max_catchup_executions: 100,
max_acceptable_delay: Duration::from_secs(300), trigger_base_poll_interval: Duration::from_secs(1),
trigger_poll_timeout: Duration::from_secs(30),
}
}
}
#[derive(Clone)]
pub struct Scheduler {
dal: Arc<DAL>,
executor: Arc<dyn WorkflowExecutor>,
config: SchedulerConfig,
shutdown: watch::Receiver<bool>,
runtime: Arc<Runtime>,
last_poll_times: HashMap<String, Instant>,
last_cron_check: Option<Instant>,
}
impl Scheduler {
pub fn new(
dal: Arc<DAL>,
executor: Arc<dyn WorkflowExecutor>,
config: SchedulerConfig,
shutdown: watch::Receiver<bool>,
runtime: Arc<Runtime>,
) -> Self {
Self {
dal,
executor,
config,
shutdown,
runtime,
last_poll_times: HashMap::new(),
last_cron_check: None,
}
}
pub fn with_defaults(
dal: Arc<DAL>,
executor: Arc<dyn WorkflowExecutor>,
shutdown: watch::Receiver<bool>,
runtime: Arc<Runtime>,
) -> Self {
Self::new(dal, executor, SchedulerConfig::default(), shutdown, runtime)
}
pub async fn run_polling_loop(&mut self) -> Result<(), WorkflowExecutionError> {
info!(
"Starting unified scheduler (cron interval: {:?}, trigger base interval: {:?})",
self.config.cron_poll_interval, self.config.trigger_base_poll_interval,
);
let mut interval = tokio::time::interval(self.config.trigger_base_poll_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_ = interval.tick() => {
let now = Instant::now();
let should_check_cron = match self.last_cron_check {
Some(last) => now.duration_since(last) >= self.config.cron_poll_interval,
None => true,
};
if should_check_cron {
self.last_cron_check = Some(now);
if let Err(e) = self.check_and_execute_cron_schedules().await {
error!("Error processing cron schedules: {}", e);
}
}
if let Err(e) = self.check_and_process_triggers().await {
error!("Error processing triggers: {}", e);
}
}
_ = self.shutdown.changed() => {
if *self.shutdown.borrow() {
info!("Unified scheduler received shutdown signal");
break;
}
}
}
}
info!("Unified scheduler polling loop stopped");
Ok(())
}
async fn check_and_execute_cron_schedules(&self) -> Result<(), WorkflowExecutionError> {
let now = Utc::now();
debug!("Checking for due cron schedules at {}", now);
let due_schedules = self
.dal
.schedule()
.get_due_cron_schedules(now)
.await
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: e.to_string(),
})?;
if due_schedules.is_empty() {
debug!("No due cron schedules found");
return Ok(());
}
info!("Found {} due cron schedule(s)", due_schedules.len());
for schedule in due_schedules {
if let Err(e) = self.process_cron_schedule(&schedule, now).await {
error!("Failed to process cron schedule {}: {}", schedule.id, e);
}
}
Ok(())
}
async fn process_cron_schedule(
&self,
schedule: &Schedule,
now: DateTime<Utc>,
) -> Result<(), WorkflowExecutionError> {
debug!(
"Processing cron schedule: {} (workflow: {})",
schedule.id, schedule.workflow_name
);
if !self.is_cron_schedule_active(schedule, now) {
debug!(
"Cron schedule {} is outside its active time window, skipping",
schedule.id
);
return Ok(());
}
let execution_times = self.calculate_execution_times(schedule, now)?;
if execution_times.is_empty() {
debug!(
"No execution times calculated for cron schedule {}",
schedule.id
);
return Ok(());
}
let next_run = self.calculate_next_run(schedule, now)?;
let claimed = self
.dal
.schedule()
.claim_and_update_cron(schedule.id, now, now, next_run)
.await
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: e.to_string(),
})?;
if !claimed {
debug!(
"Cron schedule {} was already claimed by another instance",
schedule.id
);
return Ok(());
}
info!(
"Successfully claimed cron schedule {} for {} execution(s)",
schedule.id,
execution_times.len()
);
for scheduled_time in execution_times {
let audit_record_id = match self
.create_cron_execution_audit(schedule.id, scheduled_time)
.await
{
Ok(id) => id,
Err(e) => {
error!(
"Failed to create execution audit for cron schedule {} at {}: {}",
schedule.id, scheduled_time, e
);
continue;
}
};
match self.execute_cron_workflow(schedule, scheduled_time).await {
Ok(workflow_execution_id) => {
if let Err(e) = self
.dal
.schedule_execution()
.update_workflow_execution_id(audit_record_id, workflow_execution_id)
.await
{
error!(
"Failed to complete audit trail for cron schedule {} execution: {}",
schedule.id, e
);
}
info!(
"Successfully executed and audited workflow {} for cron schedule {} (scheduled: {})",
schedule.workflow_name, schedule.id, scheduled_time
);
}
Err(e) => {
error!(
"Failed to execute workflow {} for cron schedule {} (scheduled: {}): {}",
schedule.workflow_name, schedule.id, scheduled_time, e
);
error!(
"Execution lost: audit record {} exists but workflow execution failed",
audit_record_id
);
}
}
}
Ok(())
}
fn is_cron_schedule_active(&self, schedule: &Schedule, now: DateTime<Utc>) -> bool {
if let Some(start) = &schedule.start_date {
if now < start.0 {
return false;
}
}
if let Some(end) = &schedule.end_date {
if now > end.0 {
return false;
}
}
true
}
fn calculate_execution_times(
&self,
schedule: &Schedule,
now: DateTime<Utc>,
) -> Result<Vec<DateTime<Utc>>, WorkflowExecutionError> {
let policy_str = schedule.catchup_policy.as_deref().unwrap_or("skip");
let policy = CatchupPolicy::from(policy_str.to_string());
match policy {
CatchupPolicy::Skip => {
let next_run = schedule.next_run_at.map(|t| t.0).unwrap_or(now);
Ok(vec![next_run])
}
CatchupPolicy::RunAll => {
let cron_expr = schedule.cron_expression.as_deref().unwrap_or("* * * * *");
let tz = schedule.timezone.as_deref().unwrap_or("UTC");
let evaluator = CronEvaluator::new(cron_expr, tz).map_err(|e| {
WorkflowExecutionError::ExecutionFailed {
message: format!("Cron evaluation error: {}", e),
}
})?;
let start_time = schedule
.last_run_at
.map(|t| t.0)
.unwrap_or(schedule.created_at.0);
let missed_executions = evaluator
.executions_between(start_time, now, self.config.max_catchup_executions)
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Cron evaluation error: {}", e),
})?;
if missed_executions.len() >= self.config.max_catchup_executions {
warn!(
"Limited catchup executions to {} for cron schedule {} (policy: RunAll)",
self.config.max_catchup_executions, schedule.id
);
}
Ok(missed_executions)
}
}
}
fn calculate_next_run(
&self,
schedule: &Schedule,
after: DateTime<Utc>,
) -> Result<DateTime<Utc>, WorkflowExecutionError> {
let cron_expr = schedule.cron_expression.as_deref().unwrap_or("* * * * *");
let tz = schedule.timezone.as_deref().unwrap_or("UTC");
let evaluator = CronEvaluator::new(cron_expr, tz).map_err(|e| {
WorkflowExecutionError::ExecutionFailed {
message: format!("Cron evaluation error: {}", e),
}
})?;
evaluator
.next_execution(after)
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Cron evaluation error: {}", e),
})
}
async fn execute_cron_workflow(
&self,
schedule: &Schedule,
scheduled_time: DateTime<Utc>,
) -> Result<UniversalUuid, WorkflowExecutionError> {
let mut context = Context::new();
context
.insert(
"scheduled_time",
serde_json::json!(scheduled_time.to_rfc3339()),
)
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Context error: {}", e),
})?;
context
.insert("schedule_id", serde_json::json!(schedule.id.to_string()))
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Context error: {}", e),
})?;
context
.insert(
"schedule_timezone",
serde_json::json!(schedule.timezone.as_deref().unwrap_or("UTC")),
)
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Context error: {}", e),
})?;
context
.insert(
"schedule_expression",
serde_json::json!(schedule.cron_expression.as_deref().unwrap_or("")),
)
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Context error: {}", e),
})?;
info!(
"Executing workflow '{}' for cron schedule {} (scheduled time: {})",
schedule.workflow_name, schedule.id, scheduled_time
);
let workflow_result = self
.executor
.execute(&schedule.workflow_name, context)
.await?;
debug!(
"Successfully handed off workflow '{}' to executor (execution_id: {})",
schedule.workflow_name, workflow_result.execution_id
);
Ok(UniversalUuid(workflow_result.execution_id))
}
async fn create_cron_execution_audit(
&self,
schedule_id: UniversalUuid,
scheduled_time: DateTime<Utc>,
) -> Result<UniversalUuid, ValidationError> {
let new_execution = NewScheduleExecution {
schedule_id,
workflow_execution_id: None,
scheduled_time: Some(UniversalTimestamp(scheduled_time)),
claimed_at: Some(UniversalTimestamp(Utc::now())),
context_hash: None,
};
let audit_record = self.dal.schedule_execution().create(new_execution).await?;
debug!(
"Created cron execution audit record {} for schedule {} (scheduled: {})",
audit_record.id, schedule_id, scheduled_time
);
Ok(audit_record.id)
}
async fn check_and_process_triggers(&mut self) -> Result<(), WorkflowExecutionError> {
debug!("Checking trigger schedules");
let schedules = self
.dal
.schedule()
.get_enabled_triggers()
.await
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Failed to get trigger schedules: {}", e),
})?;
if schedules.is_empty() {
debug!("No enabled trigger schedules found");
return Ok(());
}
let now = Instant::now();
for schedule in schedules {
let trigger_name = schedule
.trigger_name
.as_deref()
.unwrap_or("unknown")
.to_string();
let poll_interval = schedule
.poll_interval()
.unwrap_or(self.config.trigger_base_poll_interval);
let last_poll = self.last_poll_times.get(&trigger_name);
let should_poll = match last_poll {
Some(last) => now.duration_since(*last) >= poll_interval,
None => true,
};
if !should_poll {
continue;
}
if let Err(e) = self.process_trigger(&schedule).await {
error!("Failed to process trigger '{}': {}", trigger_name, e);
}
self.last_poll_times.insert(trigger_name, now);
}
Ok(())
}
async fn process_trigger(&self, schedule: &Schedule) -> Result<(), TriggerError> {
let trigger_name = schedule.trigger_name.as_deref().unwrap_or("unknown");
debug!(
"Processing trigger '{}' (workflow: {})",
trigger_name, schedule.workflow_name
);
let trigger = self.runtime.get_trigger(trigger_name).ok_or_else(|| {
TriggerError::TriggerNotFound {
name: trigger_name.to_string(),
}
})?;
let poll_result = tokio::time::timeout(self.config.trigger_poll_timeout, trigger.poll())
.await
.map_err(|_| TriggerError::PollError {
message: format!(
"Trigger '{}' poll timed out after {:?}",
trigger_name, self.config.trigger_poll_timeout
),
})?
.map_err(|e| {
error!("Trigger '{}' poll error: {}", trigger_name, e);
e
})?;
let now = Utc::now();
if let Err(e) = self.dal.schedule().update_last_poll(schedule.id, now).await {
warn!(
"Failed to update last_poll_at for trigger '{}': {}",
trigger_name, e
);
}
if !poll_result.should_fire() {
debug!("Trigger '{}' returned Skip", trigger_name);
return Ok(());
}
let context_hash = poll_result.context_hash();
if !schedule.allows_concurrent() {
let has_active = self
.dal
.schedule_execution()
.has_active_execution(schedule.id, &context_hash)
.await
.map_err(|e| TriggerError::ConnectionPool(e.to_string()))?;
if has_active {
debug!(
"Trigger '{}' has active execution with same context hash, skipping",
trigger_name
);
return Ok(());
}
}
info!(
"Trigger '{}' fired, scheduling workflow '{}'",
trigger_name, schedule.workflow_name
);
let execution = self
.create_trigger_execution_audit(schedule.id, &context_hash)
.await?;
let context = poll_result.into_context().unwrap_or_else(Context::new);
match self.execute_trigger_workflow(schedule, context).await {
Ok(workflow_execution_id) => {
if let Err(e) = self
.dal
.schedule_execution()
.update_workflow_execution_id(execution.id, workflow_execution_id)
.await
{
warn!(
"Failed to link schedule execution to workflow execution: {}",
e
);
}
info!(
"Successfully scheduled workflow '{}' for trigger '{}' (execution: {})",
schedule.workflow_name, trigger_name, workflow_execution_id
);
}
Err(e) => {
error!(
"Failed to execute workflow '{}' for trigger '{}': {}",
schedule.workflow_name, trigger_name, e
);
if let Err(e) = self
.dal
.schedule_execution()
.complete(execution.id, Utc::now())
.await
{
warn!(
"Failed to mark schedule execution as completed after failure: {}",
e
);
}
return Err(TriggerError::WorkflowSchedulingFailed {
workflow: schedule.workflow_name.clone(),
message: e.to_string(),
});
}
}
Ok(())
}
async fn create_trigger_execution_audit(
&self,
schedule_id: UniversalUuid,
context_hash: &str,
) -> Result<crate::models::schedule::ScheduleExecution, TriggerError> {
let new_execution = NewScheduleExecution {
schedule_id,
workflow_execution_id: None,
scheduled_time: None,
claimed_at: None,
context_hash: Some(context_hash.to_string()),
};
let execution = self
.dal
.schedule_execution()
.create(new_execution)
.await
.map_err(|e| TriggerError::ConnectionPool(e.to_string()))?;
debug!(
"Created trigger execution audit record {} for schedule {}",
execution.id, schedule_id
);
Ok(execution)
}
async fn execute_trigger_workflow(
&self,
schedule: &Schedule,
mut context: Context<serde_json::Value>,
) -> Result<UniversalUuid, WorkflowExecutionError> {
let trigger_name = schedule.trigger_name.as_deref().unwrap_or("unknown");
context
.insert("trigger_name", serde_json::json!(trigger_name))
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Context error: {}", e),
})?;
context
.insert("triggered_at", serde_json::json!(Utc::now().to_rfc3339()))
.map_err(|e| WorkflowExecutionError::ExecutionFailed {
message: format!("Context error: {}", e),
})?;
let result = self
.executor
.execute(&schedule.workflow_name, context)
.await?;
debug!(
"Successfully handed off workflow '{}' to executor (execution_id: {})",
schedule.workflow_name, result.execution_id
);
Ok(UniversalUuid(result.execution_id))
}
pub async fn register_trigger(
&self,
trigger: &dyn Trigger,
workflow_name: &str,
) -> Result<Schedule, ValidationError> {
let mut new_schedule =
NewSchedule::trigger(trigger.name(), workflow_name, trigger.poll_interval());
new_schedule.allow_concurrent = Some(crate::database::universal_types::UniversalBool::new(
trigger.allow_concurrent(),
));
self.dal.schedule().upsert_trigger(new_schedule).await
}
pub async fn disable_trigger(&self, trigger_name: &str) -> Result<(), ValidationError> {
if let Some(schedule) = self
.dal
.schedule()
.get_by_trigger_name(trigger_name)
.await?
{
self.dal.schedule().disable(schedule.id).await?;
info!("Disabled trigger '{}'", trigger_name);
}
Ok(())
}
pub async fn enable_trigger(&self, trigger_name: &str) -> Result<(), ValidationError> {
if let Some(schedule) = self
.dal
.schedule()
.get_by_trigger_name(trigger_name)
.await?
{
self.dal.schedule().enable(schedule.id).await?;
info!("Enabled trigger '{}'", trigger_name);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::database::universal_types::{current_timestamp, UniversalBool};
fn create_test_cron_schedule(cron_expr: &str, timezone: &str) -> Schedule {
let now = current_timestamp();
Schedule {
id: UniversalUuid::new_v4(),
schedule_type: "cron".to_string(),
workflow_name: "test_workflow".to_string(),
enabled: UniversalBool::new(true),
cron_expression: Some(cron_expr.to_string()),
timezone: Some(timezone.to_string()),
catchup_policy: Some("skip".to_string()),
start_date: None,
end_date: None,
trigger_name: None,
poll_interval_ms: None,
allow_concurrent: None,
next_run_at: Some(now),
last_run_at: None,
last_poll_at: None,
created_at: now,
updated_at: now,
}
}
fn create_test_trigger_schedule(trigger_name: &str) -> Schedule {
let now = current_timestamp();
Schedule {
id: UniversalUuid::new_v4(),
schedule_type: "trigger".to_string(),
workflow_name: "test_workflow".to_string(),
enabled: UniversalBool::new(true),
cron_expression: None,
timezone: None,
catchup_policy: None,
start_date: None,
end_date: None,
trigger_name: Some(trigger_name.to_string()),
poll_interval_ms: Some(5000),
allow_concurrent: Some(UniversalBool::new(false)),
next_run_at: None,
last_run_at: None,
last_poll_at: None,
created_at: now,
updated_at: now,
}
}
#[test]
fn test_scheduler_config_default() {
let config = SchedulerConfig::default();
assert_eq!(config.cron_poll_interval, Duration::from_secs(30));
assert_eq!(config.max_catchup_executions, 100);
assert_eq!(config.max_acceptable_delay, Duration::from_secs(300));
assert_eq!(config.trigger_base_poll_interval, Duration::from_secs(1));
assert_eq!(config.trigger_poll_timeout, Duration::from_secs(30));
}
#[test]
fn test_is_cron_schedule_active_no_window() {
let schedule = create_test_cron_schedule("0 * * * *", "UTC");
let now = Utc::now();
let config = SchedulerConfig::default();
let (_shutdown_tx, shutdown_rx) = watch::channel(false);
assert!(schedule.start_date.is_none());
assert!(schedule.end_date.is_none());
let active = schedule.start_date.as_ref().is_none_or(|s| now >= s.0)
&& schedule.end_date.as_ref().is_none_or(|e| now <= e.0);
assert!(active);
let _ = config;
let _ = shutdown_rx;
}
#[test]
fn test_is_cron_schedule_active_with_start_date_future() {
let mut schedule = create_test_cron_schedule("0 * * * *", "UTC");
let future = Utc::now() + chrono::Duration::hours(1);
schedule.start_date = Some(UniversalTimestamp(future));
let now = Utc::now();
let active = schedule.start_date.as_ref().is_none_or(|s| now >= s.0)
&& schedule.end_date.as_ref().is_none_or(|e| now <= e.0);
assert!(!active);
}
#[test]
fn test_is_cron_schedule_active_with_end_date_past() {
let mut schedule = create_test_cron_schedule("0 * * * *", "UTC");
let past = Utc::now() - chrono::Duration::hours(1);
schedule.end_date = Some(UniversalTimestamp(past));
let now = Utc::now();
let active = schedule.start_date.as_ref().is_none_or(|s| now >= s.0)
&& schedule.end_date.as_ref().is_none_or(|e| now <= e.0);
assert!(!active);
}
#[test]
fn test_catchup_policy_from_schedule() {
let schedule = create_test_cron_schedule("0 * * * *", "UTC");
let policy_str = schedule.catchup_policy.as_deref().unwrap_or("skip");
let policy = CatchupPolicy::from(policy_str.to_string());
assert_eq!(policy, CatchupPolicy::Skip);
}
#[test]
fn test_catchup_policy_run_all() {
let mut schedule = create_test_cron_schedule("0 * * * *", "UTC");
schedule.catchup_policy = Some("run_all".to_string());
let policy_str = schedule.catchup_policy.as_deref().unwrap_or("skip");
let policy = CatchupPolicy::from(policy_str.to_string());
assert_eq!(policy, CatchupPolicy::RunAll);
}
#[test]
fn test_trigger_schedule_helpers() {
let schedule = create_test_trigger_schedule("file_watcher");
assert!(schedule.is_trigger());
assert!(!schedule.is_cron());
assert!(schedule.is_enabled());
assert_eq!(schedule.poll_interval(), Some(Duration::from_secs(5)));
assert!(!schedule.allows_concurrent());
}
#[test]
fn test_trigger_schedule_trigger_name_fallback() {
let mut schedule = create_test_trigger_schedule("file_watcher");
assert_eq!(
schedule.trigger_name.as_deref().unwrap_or("unknown"),
"file_watcher"
);
schedule.trigger_name = None;
assert_eq!(
schedule.trigger_name.as_deref().unwrap_or("unknown"),
"unknown"
);
}
#[test]
fn test_scheduler_config_custom() {
let config = SchedulerConfig {
cron_poll_interval: Duration::from_secs(60),
max_catchup_executions: 50,
max_acceptable_delay: Duration::from_secs(120),
trigger_base_poll_interval: Duration::from_secs(5),
trigger_poll_timeout: Duration::from_secs(10),
};
assert_eq!(config.cron_poll_interval, Duration::from_secs(60));
assert_eq!(config.max_catchup_executions, 50);
assert_eq!(config.max_acceptable_delay, Duration::from_secs(120));
assert_eq!(config.trigger_base_poll_interval, Duration::from_secs(5));
assert_eq!(config.trigger_poll_timeout, Duration::from_secs(10));
}
#[test]
fn test_scheduler_config_clone() {
let config = SchedulerConfig::default();
let cloned = config.clone();
assert_eq!(cloned.cron_poll_interval, config.cron_poll_interval);
assert_eq!(cloned.max_catchup_executions, config.max_catchup_executions);
assert_eq!(cloned.max_acceptable_delay, config.max_acceptable_delay);
assert_eq!(
cloned.trigger_base_poll_interval,
config.trigger_base_poll_interval
);
assert_eq!(cloned.trigger_poll_timeout, config.trigger_poll_timeout);
}
#[test]
fn test_scheduler_config_debug() {
let config = SchedulerConfig::default();
let debug_str = format!("{:?}", config);
assert!(debug_str.contains("SchedulerConfig"));
assert!(debug_str.contains("cron_poll_interval"));
}
#[test]
fn test_is_cron_schedule_active_both_bounds_containing_now() {
let mut schedule = create_test_cron_schedule("0 * * * *", "UTC");
let past = Utc::now() - chrono::Duration::hours(1);
let future = Utc::now() + chrono::Duration::hours(1);
schedule.start_date = Some(UniversalTimestamp(past));
schedule.end_date = Some(UniversalTimestamp(future));
let now = Utc::now();
let active = schedule.start_date.as_ref().is_none_or(|s| now >= s.0)
&& schedule.end_date.as_ref().is_none_or(|e| now <= e.0);
assert!(active);
}
#[test]
fn test_is_cron_schedule_active_both_bounds_excluding_now() {
let mut schedule = create_test_cron_schedule("0 * * * *", "UTC");
let future1 = Utc::now() + chrono::Duration::hours(1);
let future2 = Utc::now() + chrono::Duration::hours(2);
schedule.start_date = Some(UniversalTimestamp(future1));
schedule.end_date = Some(UniversalTimestamp(future2));
let now = Utc::now();
let active = schedule.start_date.as_ref().is_none_or(|s| now >= s.0)
&& schedule.end_date.as_ref().is_none_or(|e| now <= e.0);
assert!(!active);
}
#[test]
fn test_catchup_policy_unknown_defaults_to_skip() {
let policy = CatchupPolicy::from("unknown_policy".to_string());
assert_eq!(policy, CatchupPolicy::Skip);
}
#[test]
fn test_catchup_policy_none_defaults_to_skip() {
let schedule = create_test_cron_schedule("0 * * * *", "UTC");
let policy_str = schedule.catchup_policy.as_deref().unwrap_or("skip");
assert_eq!(policy_str, "skip");
}
#[test]
fn test_catchup_policy_missing_defaults_correctly() {
let mut schedule = create_test_cron_schedule("0 * * * *", "UTC");
schedule.catchup_policy = None;
let policy_str = schedule.catchup_policy.as_deref().unwrap_or("skip");
let policy = CatchupPolicy::from(policy_str.to_string());
assert_eq!(policy, CatchupPolicy::Skip);
}
#[test]
fn test_cron_schedule_helpers() {
let schedule = create_test_cron_schedule("*/5 * * * *", "America/New_York");
assert!(schedule.is_cron());
assert!(!schedule.is_trigger());
assert!(schedule.is_enabled());
assert_eq!(schedule.cron_expression.as_deref(), Some("*/5 * * * *"));
assert_eq!(schedule.timezone.as_deref(), Some("America/New_York"));
}
#[test]
fn test_trigger_schedule_no_poll_interval() {
let mut schedule = create_test_trigger_schedule("webhook");
schedule.poll_interval_ms = None;
assert_eq!(schedule.poll_interval(), None);
}
#[test]
fn test_trigger_schedule_allows_concurrent() {
let mut schedule = create_test_trigger_schedule("queue_trigger");
schedule.allow_concurrent = Some(UniversalBool::new(true));
assert!(schedule.allows_concurrent());
}
#[test]
fn test_trigger_schedule_no_concurrent_flag_defaults_false() {
let mut schedule = create_test_trigger_schedule("queue_trigger");
schedule.allow_concurrent = None;
assert!(!schedule.allows_concurrent());
}
}