use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use chrono::{DateTime, Utc};
use cron::Schedule;
use parking_lot::RwLock;
use rand::Rng;
use tokio::sync::Notify;
use tokio::time::interval;
use super::cron_types::*;
use super::job_store::{JobStore, JobStoreError, SqliteJobStore};
use super::{AgentScheduler, DefaultAgentScheduler};
use crate::types::ExecutionMode;
#[derive(Debug, Clone)]
pub struct CronSchedulerConfig {
pub tick_interval: Duration,
pub max_concurrent_cron_jobs: usize,
pub job_store_path: Option<std::path::PathBuf>,
pub enable_missed_run_catchup: bool,
}
impl Default for CronSchedulerConfig {
fn default() -> Self {
Self {
tick_interval: Duration::from_secs(1),
max_concurrent_cron_jobs: 100,
job_store_path: None,
enable_missed_run_catchup: true,
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum CronSchedulerError {
#[error("invalid cron expression (expected 6-field: sec min hour day month weekday): {0}")]
InvalidCron(String),
#[error("invalid timezone: {0}")]
InvalidTimezone(String),
#[error("job store error: {0}")]
Store(#[from] JobStoreError),
#[error("scheduler error: {0}")]
Scheduler(String),
#[error("job not found: {0}")]
NotFound(CronJobId),
#[error("identity verification failed for job {0}: {1}")]
IdentityVerificationFailed(CronJobId, String),
}
#[derive(Debug, Clone, Default)]
pub struct CronMetrics {
pub jobs_total: u64,
pub jobs_active: u64,
pub jobs_paused: u64,
pub jobs_dead_letter: u64,
pub runs_total: u64,
pub runs_succeeded: u64,
pub runs_failed: u64,
pub runs_skipped_concurrency: u64,
pub runs_skipped_identity: u64,
pub average_execution_time_ms: f64,
pub longest_run_ms: u64,
}
pub struct CronScheduler {
store: Arc<SqliteJobStore>,
agent_scheduler: Arc<DefaultAgentScheduler>,
config: CronSchedulerConfig,
shutdown_notify: Arc<Notify>,
is_running: Arc<RwLock<bool>>,
active_runs: Arc<RwLock<usize>>,
per_job_active: Arc<RwLock<HashMap<CronJobId, usize>>>,
metrics: Arc<RwLock<CronMetrics>>,
}
impl CronScheduler {
pub async fn new(
config: CronSchedulerConfig,
agent_scheduler: Arc<DefaultAgentScheduler>,
) -> Result<Self, CronSchedulerError> {
let path = config
.job_store_path
.clone()
.unwrap_or_else(SqliteJobStore::default_path);
let store = Arc::new(SqliteJobStore::open(&path)?);
let scheduler = Self {
store,
agent_scheduler,
config,
shutdown_notify: Arc::new(Notify::new()),
is_running: Arc::new(RwLock::new(true)),
active_runs: Arc::new(RwLock::new(0)),
per_job_active: Arc::new(RwLock::new(HashMap::new())),
metrics: Arc::new(RwLock::new(CronMetrics::default())),
};
scheduler.start_tick_loop();
Ok(scheduler)
}
#[cfg(test)]
pub async fn new_in_memory(
config: CronSchedulerConfig,
agent_scheduler: Arc<DefaultAgentScheduler>,
) -> Result<Self, CronSchedulerError> {
let store = Arc::new(SqliteJobStore::open_in_memory()?);
let scheduler = Self {
store,
agent_scheduler,
config,
shutdown_notify: Arc::new(Notify::new()),
is_running: Arc::new(RwLock::new(true)),
active_runs: Arc::new(RwLock::new(0)),
per_job_active: Arc::new(RwLock::new(HashMap::new())),
metrics: Arc::new(RwLock::new(CronMetrics::default())),
};
scheduler.start_tick_loop();
Ok(scheduler)
}
pub async fn add_job(
&self,
mut job: CronJobDefinition,
) -> Result<CronJobId, CronSchedulerError> {
Self::parse_cron(&job.cron_expression)?;
Self::validate_timezone(&job.timezone)?;
job.next_run = self.compute_next_run(&job.cron_expression, &job.timezone, None)?;
self.store.save_job(&job).await?;
tracing::info!(
"Added cron job {} ({}) — next run: {:?}",
job.job_id,
job.name,
job.next_run
);
Ok(job.job_id)
}
pub async fn remove_job(&self, job_id: CronJobId) -> Result<(), CronSchedulerError> {
if !self.store.delete_job(job_id).await? {
return Err(CronSchedulerError::NotFound(job_id));
}
tracing::info!("Removed cron job {}", job_id);
Ok(())
}
pub async fn pause_job(&self, job_id: CronJobId) -> Result<(), CronSchedulerError> {
let mut job = self
.store
.get_job(job_id)
.await?
.ok_or(CronSchedulerError::NotFound(job_id))?;
job.status = CronJobStatus::Paused;
job.enabled = false;
job.updated_at = Utc::now();
self.store.save_job(&job).await?;
tracing::info!("Paused cron job {}", job_id);
Ok(())
}
pub async fn resume_job(&self, job_id: CronJobId) -> Result<(), CronSchedulerError> {
let mut job = self
.store
.get_job(job_id)
.await?
.ok_or(CronSchedulerError::NotFound(job_id))?;
job.status = CronJobStatus::Active;
job.enabled = true;
job.failure_count = 0; job.next_run = self.compute_next_run(&job.cron_expression, &job.timezone, None)?;
job.updated_at = Utc::now();
self.store.save_job(&job).await?;
tracing::info!("Resumed cron job {} — next run: {:?}", job_id, job.next_run);
Ok(())
}
pub async fn update_job(&self, mut job: CronJobDefinition) -> Result<(), CronSchedulerError> {
Self::parse_cron(&job.cron_expression)?;
Self::validate_timezone(&job.timezone)?;
job.next_run = self.compute_next_run(&job.cron_expression, &job.timezone, None)?;
job.updated_at = Utc::now();
self.store.save_job(&job).await?;
Ok(())
}
pub async fn get_job(
&self,
job_id: CronJobId,
) -> Result<CronJobDefinition, CronSchedulerError> {
self.store
.get_job(job_id)
.await?
.ok_or(CronSchedulerError::NotFound(job_id))
}
pub async fn list_jobs(&self) -> Result<Vec<CronJobDefinition>, CronSchedulerError> {
Ok(self.store.list_jobs(None).await?)
}
pub fn get_next_runs(
&self,
cron_expression: &str,
timezone: &str,
count: usize,
) -> Result<Vec<DateTime<Utc>>, CronSchedulerError> {
let schedule = Self::parse_cron(cron_expression)?;
let tz: chrono_tz::Tz = timezone
.parse()
.map_err(|_| CronSchedulerError::InvalidTimezone(timezone.to_string()))?;
let now = Utc::now().with_timezone(&tz);
let runs: Vec<DateTime<Utc>> = schedule
.after(&now)
.take(count)
.map(|dt| dt.with_timezone(&Utc))
.collect();
Ok(runs)
}
pub async fn trigger_now(&self, job_id: CronJobId) -> Result<(), CronSchedulerError> {
let job = self
.store
.get_job(job_id)
.await?
.ok_or(CronSchedulerError::NotFound(job_id))?;
tracing::info!("Force-triggering cron job {} ({})", job_id, job.name);
let mut run_config = job.agent_config.clone();
run_config.execution_mode = ExecutionMode::CronScheduled {
cron_expression: job.cron_expression.clone(),
timezone: job.timezone.clone(),
};
run_config
.metadata
.insert("trigger".to_string(), "cron_manual".to_string());
run_config
.metadata
.insert("cron_job_id".to_string(), job.job_id.to_string());
run_config
.metadata
.insert("session_id".to_string(), uuid::Uuid::new_v4().to_string());
run_config.metadata.insert(
"session_mode".to_string(),
format!("{:?}", job.session_mode),
);
let started_at = Utc::now();
let run_id = uuid::Uuid::new_v4();
let agent_id = job.agent_config.id;
let result = self.agent_scheduler.schedule_agent(run_config).await;
let (status, error) = match &result {
Ok(_) => (JobRunStatus::Succeeded, None),
Err(e) => (JobRunStatus::Failed, Some(e.to_string())),
};
let completed_at = Utc::now();
let exec_ms = (completed_at - started_at).num_milliseconds().max(0) as u64;
let record = JobRunRecord {
run_id,
job_id: job.job_id,
agent_id,
started_at,
completed_at: Some(completed_at),
status,
error,
execution_time_ms: Some(exec_ms),
};
self.store.save_run_record(&record).await?;
result
.map(|_| ())
.map_err(|e| CronSchedulerError::Scheduler(e.to_string()))
}
pub async fn shutdown(&self) {
{
let is_running = self.is_running.read();
if !*is_running {
return;
}
}
*self.is_running.write() = false;
self.shutdown_notify.notify_waiters();
tracing::info!("CronScheduler shutdown complete");
}
pub async fn get_run_history(
&self,
job_id: CronJobId,
limit: usize,
) -> Result<Vec<JobRunRecord>, CronSchedulerError> {
Ok(self.store.get_run_history(job_id, limit).await?)
}
pub fn metrics(&self) -> CronMetrics {
self.metrics.read().clone()
}
pub async fn check_health(&self) -> Result<CronSchedulerHealth, CronSchedulerError> {
let jobs = self.store.list_jobs(None).await?;
let active = jobs
.iter()
.filter(|j| j.status == CronJobStatus::Active)
.count();
let paused = jobs
.iter()
.filter(|j| j.status == CronJobStatus::Paused)
.count();
let dead = jobs
.iter()
.filter(|j| j.status == CronJobStatus::DeadLetter)
.count();
Ok(CronSchedulerHealth {
is_running: *self.is_running.read(),
store_accessible: true,
jobs_total: jobs.len(),
jobs_active: active,
jobs_paused: paused,
jobs_dead_letter: dead,
global_active_runs: *self.active_runs.read(),
max_concurrent: self.config.max_concurrent_cron_jobs,
})
}
fn start_tick_loop(&self) {
let store = self.store.clone();
let agent_scheduler = self.agent_scheduler.clone();
let shutdown = self.shutdown_notify.clone();
let is_running = self.is_running.clone();
let tick = self.config.tick_interval;
let max_concurrent = self.config.max_concurrent_cron_jobs;
let active_runs = self.active_runs.clone();
let per_job_active = self.per_job_active.clone();
let metrics = self.metrics.clone();
tokio::spawn(async move {
let mut ticker = interval(tick);
loop {
tokio::select! {
_ = ticker.tick() => {
if !*is_running.read() {
break;
}
let current_active = *active_runs.read();
if current_active >= max_concurrent {
tracing::debug!("CronScheduler: at capacity ({}/{}), skipping tick",
current_active, max_concurrent);
continue;
}
let now = Utc::now();
let due_jobs = match store.get_due_jobs(now).await {
Ok(jobs) => jobs,
Err(e) => {
tracing::error!("CronScheduler: failed to query due jobs: {}", e);
continue;
}
};
for job in due_jobs {
let remaining = max_concurrent.saturating_sub(*active_runs.read());
if remaining == 0 {
break;
}
{
let pja = per_job_active.read();
let job_active = pja.get(&job.job_id).copied().unwrap_or(0);
if job_active >= job.max_concurrent as usize {
tracing::debug!(
"CronScheduler: job {} at per-job concurrency limit ({}/{}), skipping",
job.job_id, job_active, job.max_concurrent
);
metrics.write().runs_skipped_concurrency += 1;
continue;
}
}
let next_run = compute_next_run_static(
&job.cron_expression,
&job.timezone,
Some(now),
);
let new_status = if job.one_shot {
CronJobStatus::Completed
} else {
CronJobStatus::Active
};
let enabled = !job.one_shot;
if let Err(e) = store
.update_run_state(
job.job_id,
now,
next_run,
job.run_count + 1,
new_status,
enabled,
)
.await
{
tracing::error!("CronScheduler: failed to update run state for {}: {}", job.job_id, e);
continue;
}
let store_c = store.clone();
let sched_c = agent_scheduler.clone();
let active_c = active_runs.clone();
let pja_c = per_job_active.clone();
let metrics_c = metrics.clone();
*active_c.write() += 1;
{
let mut pja = pja_c.write();
*pja.entry(job.job_id).or_insert(0) += 1;
}
tokio::spawn(async move {
if job.jitter_max_secs > 0 {
let jitter_ms = {
let mut rng = rand::thread_rng();
rng.gen_range(0..=(job.jitter_max_secs as u64 * 1000))
};
tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
}
let started_at = Utc::now();
let run_id = uuid::Uuid::new_v4();
let agent_id = job.agent_config.id;
let session_id = uuid::Uuid::new_v4();
let mut run_config = job.agent_config.clone();
run_config.execution_mode = ExecutionMode::CronScheduled {
cron_expression: job.cron_expression.clone(),
timezone: job.timezone.clone(),
};
run_config.metadata.insert(
"trigger".to_string(),
"cron".to_string(),
);
run_config.metadata.insert(
"cron_job_id".to_string(),
job.job_id.to_string(),
);
run_config.metadata.insert(
"cron_expression".to_string(),
job.cron_expression.clone(),
);
run_config.metadata.insert(
"session_id".to_string(),
session_id.to_string(),
);
run_config.metadata.insert(
"session_mode".to_string(),
format!("{:?}", job.session_mode),
);
let result: Result<crate::types::AgentId, crate::types::SchedulerError> =
sched_c.schedule_agent(run_config).await;
let (status, error) = match &result {
Ok(_) => (JobRunStatus::Succeeded, None),
Err(e) => (JobRunStatus::Failed, Some(e.to_string())),
};
let completed_at = Utc::now();
let exec_ms = (completed_at - started_at).num_milliseconds().max(0) as u64;
{
let mut m = metrics_c.write();
m.runs_total += 1;
match status {
JobRunStatus::Succeeded => m.runs_succeeded += 1,
JobRunStatus::Failed => m.runs_failed += 1,
_ => {}
}
if exec_ms > m.longest_run_ms {
m.longest_run_ms = exec_ms;
}
if m.runs_total > 0 {
m.average_execution_time_ms = m.average_execution_time_ms
+ (exec_ms as f64 - m.average_execution_time_ms)
/ m.runs_total as f64;
}
}
let record = JobRunRecord {
run_id,
job_id: job.job_id,
agent_id,
started_at,
completed_at: Some(completed_at),
status,
error: error.clone(),
execution_time_ms: Some(exec_ms),
};
if let Err(e) = store_c.save_run_record(&record).await {
tracing::error!("CronScheduler: failed to save run record: {}", e);
}
if result.is_err() {
let new_fail = job.failure_count + 1;
let fail_status = if new_fail >= job.max_retries as u64 {
tracing::warn!(
"CronScheduler: job {} exceeded max_retries ({}), moving to dead letter",
job.job_id, job.max_retries
);
CronJobStatus::DeadLetter
} else {
CronJobStatus::Active
};
let _ = store_c.record_failure(job.job_id, new_fail, fail_status).await;
}
*active_c.write() -= 1;
{
let mut pja = pja_c.write();
if let Some(count) = pja.get_mut(&job.job_id) {
*count = count.saturating_sub(1);
if *count == 0 {
pja.remove(&job.job_id);
}
}
}
});
}
}
_ = shutdown.notified() => {
tracing::info!("CronScheduler tick loop shutting down");
break;
}
}
}
});
}
fn parse_cron(expr: &str) -> Result<Schedule, CronSchedulerError> {
Schedule::from_str(expr)
.map_err(|e| CronSchedulerError::InvalidCron(format!("{expr}: {e}")))
}
fn validate_timezone(tz: &str) -> Result<chrono_tz::Tz, CronSchedulerError> {
tz.parse::<chrono_tz::Tz>()
.map_err(|_| CronSchedulerError::InvalidTimezone(tz.to_string()))
}
fn compute_next_run(
&self,
cron_expression: &str,
timezone: &str,
after: Option<DateTime<Utc>>,
) -> Result<Option<DateTime<Utc>>, CronSchedulerError> {
Ok(compute_next_run_static(cron_expression, timezone, after))
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CronSchedulerHealth {
pub is_running: bool,
pub store_accessible: bool,
pub jobs_total: usize,
pub jobs_active: usize,
pub jobs_paused: usize,
pub jobs_dead_letter: usize,
pub global_active_runs: usize,
pub max_concurrent: usize,
}
fn compute_next_run_static(
cron_expression: &str,
timezone: &str,
after: Option<DateTime<Utc>>,
) -> Option<DateTime<Utc>> {
let schedule = Schedule::from_str(cron_expression).ok()?;
let tz: chrono_tz::Tz = timezone.parse().ok()?;
let reference = after.unwrap_or_else(Utc::now).with_timezone(&tz);
schedule
.after(&reference)
.next()
.map(|dt| dt.with_timezone(&Utc))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::scheduler::heartbeat::HeartbeatContextMode;
use crate::scheduler::SchedulerConfig;
use crate::types::{AgentConfig, AgentId, Priority, ResourceLimits, SecurityTier};
use std::collections::HashMap;
fn test_agent_config() -> AgentConfig {
AgentConfig {
id: AgentId::new(),
name: "cron_agent".to_string(),
dsl_source: "agent cron_agent {}".to_string(),
execution_mode: ExecutionMode::Ephemeral,
security_tier: SecurityTier::Tier1,
resource_limits: ResourceLimits::default(),
capabilities: vec![],
policies: vec![],
metadata: HashMap::new(),
priority: Priority::Normal,
}
}
async fn make_scheduler() -> (CronScheduler, Arc<DefaultAgentScheduler>) {
let sched = Arc::new(
DefaultAgentScheduler::new(SchedulerConfig::default())
.await
.unwrap(),
);
let config = CronSchedulerConfig {
tick_interval: Duration::from_millis(100),
..Default::default()
};
let store = Arc::new(SqliteJobStore::open_in_memory().unwrap());
let cron = CronScheduler {
store,
agent_scheduler: sched.clone(),
config,
shutdown_notify: Arc::new(Notify::new()),
is_running: Arc::new(RwLock::new(true)),
active_runs: Arc::new(RwLock::new(0)),
per_job_active: Arc::new(RwLock::new(HashMap::new())),
metrics: Arc::new(RwLock::new(CronMetrics::default())),
};
cron.start_tick_loop();
(cron, sched)
}
#[test]
fn parse_valid_seven_field_cron_expressions() {
assert!(CronScheduler::parse_cron("0 * * * * * *").is_ok());
assert!(CronScheduler::parse_cron("0 */5 * * * * *").is_ok());
assert!(CronScheduler::parse_cron("0 0 12 * * Mon 2027").is_ok());
}
#[test]
fn parse_valid_six_field_cron_expressions() {
assert!(CronScheduler::parse_cron("0 * * * * *").is_ok());
assert!(CronScheduler::parse_cron("0 */5 * * * *").is_ok());
assert!(CronScheduler::parse_cron("*/30 * * * * *").is_ok());
assert!(CronScheduler::parse_cron("0 0 9 * * Mon-Fri").is_ok());
assert!(CronScheduler::parse_cron("0 0 0 1 * *").is_ok());
}
#[test]
fn reject_invalid_cron() {
assert!(CronScheduler::parse_cron("not a cron").is_err());
}
#[test]
fn reject_five_field_unix_cron() {
assert!(CronScheduler::parse_cron("*/5 * * * *").is_err());
assert!(CronScheduler::parse_cron("0 12 * * Mon").is_err());
}
#[test]
fn reject_empty_and_whitespace_cron() {
assert!(CronScheduler::parse_cron("").is_err());
assert!(CronScheduler::parse_cron(" ").is_err());
}
#[test]
fn error_message_includes_format_hint() {
let err = CronScheduler::parse_cron("bad").unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("6-field"),
"error should mention 6-field format, got: {msg}"
);
assert!(
msg.contains("sec min hour"),
"error should include field names, got: {msg}"
);
}
#[test]
fn validate_timezone() {
assert!(CronScheduler::validate_timezone("UTC").is_ok());
assert!(CronScheduler::validate_timezone("America/New_York").is_ok());
assert!(CronScheduler::validate_timezone("Asia/Kathmandu").is_ok());
assert!(CronScheduler::validate_timezone("Bogus/Zone").is_err());
}
#[test]
fn compute_next_run_returns_future_time_seven_field() {
let next = compute_next_run_static("0 * * * * * *", "UTC", None);
assert!(next.is_some());
assert!(next.unwrap() > Utc::now());
}
#[test]
fn compute_next_run_returns_future_time_six_field() {
let next = compute_next_run_static("0 * * * * *", "UTC", None);
assert!(next.is_some());
assert!(next.unwrap() > Utc::now());
}
#[test]
fn compute_next_run_respects_after_parameter() {
let reference = Utc::now() + chrono::Duration::hours(1);
let next = compute_next_run_static("0 * * * * *", "UTC", Some(reference));
assert!(next.is_some());
assert!(next.unwrap() > reference);
}
#[test]
fn compute_next_run_with_different_timezones() {
let utc_next = compute_next_run_static("0 0 12 * * *", "UTC", None);
let eastern_next = compute_next_run_static("0 0 12 * * *", "America/New_York", None);
assert!(utc_next.is_some());
assert!(eastern_next.is_some());
assert_ne!(utc_next.unwrap(), eastern_next.unwrap());
}
#[test]
fn compute_next_run_returns_none_for_invalid_expression() {
let next = compute_next_run_static("bad", "UTC", None);
assert!(next.is_none());
}
#[test]
fn compute_next_run_returns_none_for_invalid_timezone() {
let next = compute_next_run_static("0 * * * * *", "Mars/Olympus", None);
assert!(next.is_none());
}
#[test]
fn get_next_runs_returns_multiple() {
let runs = {
let schedule = Schedule::from_str("0 * * * * *").unwrap();
let tz: chrono_tz::Tz = "UTC".parse().unwrap();
let now = Utc::now().with_timezone(&tz);
schedule
.after(&now)
.take(5)
.map(|dt| dt.with_timezone(&Utc))
.collect::<Vec<_>>()
};
assert_eq!(runs.len(), 5);
for pair in runs.windows(2) {
assert!(pair[1] > pair[0]);
}
}
#[test]
fn six_and_seven_field_produce_equivalent_schedules() {
let now = Some(Utc::now());
let six = compute_next_run_static("0 */5 * * * *", "UTC", now);
let seven = compute_next_run_static("0 */5 * * * * *", "UTC", now);
assert_eq!(six, seven);
}
#[tokio::test]
async fn add_and_list_jobs() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"test_job".to_string(),
"0 * * * * *".to_string(), "UTC".to_string(),
test_agent_config(),
);
let id = cron.add_job(job).await.unwrap();
let jobs = cron.list_jobs().await.unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].job_id, id);
assert!(jobs[0].next_run.is_some());
cron.shutdown().await;
}
#[tokio::test]
async fn pause_and_resume() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"pause_test".to_string(),
"0 * * * * *".to_string(), "UTC".to_string(),
test_agent_config(),
);
let id = cron.add_job(job).await.unwrap();
cron.pause_job(id).await.unwrap();
let paused = cron.get_job(id).await.unwrap();
assert_eq!(paused.status, CronJobStatus::Paused);
assert!(!paused.enabled);
cron.resume_job(id).await.unwrap();
let resumed = cron.get_job(id).await.unwrap();
assert_eq!(resumed.status, CronJobStatus::Active);
assert!(resumed.enabled);
assert!(resumed.next_run.is_some());
cron.shutdown().await;
}
#[tokio::test]
async fn remove_job() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"remove_test".to_string(),
"0 * * * * *".to_string(), "UTC".to_string(),
test_agent_config(),
);
let id = cron.add_job(job).await.unwrap();
cron.remove_job(id).await.unwrap();
assert!(matches!(
cron.get_job(id).await,
Err(CronSchedulerError::NotFound(_))
));
cron.shutdown().await;
}
#[tokio::test]
async fn one_shot_lifecycle() {
let (cron, _sched) = make_scheduler().await;
let mut job = CronJobDefinition::new(
"one_shot".to_string(),
"* * * * * * *".to_string(),
"UTC".to_string(),
test_agent_config(),
);
job.one_shot = true;
let id = cron.add_job(job).await.unwrap();
cron.store
.update_run_state(
id,
Utc::now(),
Some(Utc::now() - chrono::Duration::seconds(5)),
0,
CronJobStatus::Active,
true,
)
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(2)).await;
let loaded = cron.get_job(id).await.unwrap();
assert_eq!(loaded.status, CronJobStatus::Completed);
assert!(!loaded.enabled);
cron.shutdown().await;
}
#[tokio::test]
async fn trigger_now_fires_immediately() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"trigger_now".to_string(),
"0 0 0 1 1 * 2099".to_string(), "UTC".to_string(),
test_agent_config(),
);
let id = cron.add_job(job).await.unwrap();
cron.trigger_now(id).await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
let history = cron.get_run_history(id, 10).await.unwrap();
assert!(!history.is_empty());
cron.shutdown().await;
}
#[tokio::test]
async fn reject_invalid_cron_on_add() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"bad_cron".to_string(),
"invalid".to_string(),
"UTC".to_string(),
test_agent_config(),
);
assert!(cron.add_job(job).await.is_err());
cron.shutdown().await;
}
#[tokio::test]
async fn reject_five_field_cron_on_add() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"five_field".to_string(),
"*/5 * * * *".to_string(), "UTC".to_string(),
test_agent_config(),
);
assert!(cron.add_job(job).await.is_err());
cron.shutdown().await;
}
#[tokio::test]
async fn reject_invalid_timezone_on_add() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"bad_tz".to_string(),
"0 * * * * *".to_string(),
"Mars/Olympus".to_string(),
test_agent_config(),
);
assert!(cron.add_job(job).await.is_err());
cron.shutdown().await;
}
#[tokio::test]
async fn shutdown_is_idempotent() {
let (cron, _sched) = make_scheduler().await;
cron.shutdown().await;
cron.shutdown().await; }
#[tokio::test]
async fn metrics_increment_on_runs() {
let (cron, _sched) = make_scheduler().await;
let m = cron.metrics();
assert_eq!(m.runs_total, 0);
assert_eq!(m.runs_succeeded, 0);
cron.shutdown().await;
}
#[tokio::test]
async fn health_check_returns_valid() {
let (cron, _sched) = make_scheduler().await;
let health = cron.check_health().await.unwrap();
assert!(health.is_running);
assert!(health.store_accessible);
assert_eq!(health.jobs_total, 0);
cron.shutdown().await;
}
#[tokio::test]
async fn session_mode_persists() {
let (cron, _sched) = make_scheduler().await;
let mut job = CronJobDefinition::new(
"session_test".to_string(),
"0 * * * * * *".to_string(),
"UTC".to_string(),
test_agent_config(),
);
job.session_mode = HeartbeatContextMode::FullyEphemeral;
let id = cron.add_job(job).await.unwrap();
let loaded = cron.get_job(id).await.unwrap();
assert_eq!(loaded.session_mode, HeartbeatContextMode::FullyEphemeral);
cron.shutdown().await;
}
#[tokio::test]
async fn dead_letter_status_roundtrip() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"dl_test".to_string(),
"0 * * * * * *".to_string(),
"UTC".to_string(),
test_agent_config(),
);
let id = cron.add_job(job).await.unwrap();
cron.store
.record_failure(id, 10, CronJobStatus::DeadLetter)
.await
.unwrap();
let loaded = cron.get_job(id).await.unwrap();
assert_eq!(loaded.status, CronJobStatus::DeadLetter);
cron.resume_job(id).await.unwrap();
let resumed = cron.get_job(id).await.unwrap();
assert_eq!(resumed.status, CronJobStatus::Active);
assert_eq!(resumed.failure_count, 0);
cron.shutdown().await;
}
#[tokio::test]
async fn jitter_field_persists() {
let (cron, _sched) = make_scheduler().await;
let mut job = CronJobDefinition::new(
"jitter_test".to_string(),
"0 * * * * * *".to_string(),
"UTC".to_string(),
test_agent_config(),
);
job.jitter_max_secs = 5;
let id = cron.add_job(job).await.unwrap();
let loaded = cron.get_job(id).await.unwrap();
assert_eq!(loaded.jitter_max_secs, 5);
cron.shutdown().await;
}
#[tokio::test]
async fn add_job_with_six_field_cron() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"six_field_job".to_string(),
"0 */5 * * * *".to_string(), "UTC".to_string(),
test_agent_config(),
);
let id = cron.add_job(job).await.unwrap();
let loaded = cron.get_job(id).await.unwrap();
assert_eq!(loaded.cron_expression, "0 */5 * * * *");
assert!(loaded.next_run.is_some());
cron.shutdown().await;
}
#[tokio::test]
async fn trigger_now_with_six_field_cron() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"trigger_six".to_string(),
"0 0 0 1 1 *".to_string(), "UTC".to_string(),
test_agent_config(),
);
let id = cron.add_job(job).await.unwrap();
cron.trigger_now(id).await.unwrap();
let history = cron.get_run_history(id, 10).await.unwrap();
assert!(!history.is_empty());
assert_eq!(history[0].status, JobRunStatus::Succeeded);
cron.shutdown().await;
}
#[tokio::test]
async fn update_job_changes_cron_expression() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"update_cron".to_string(),
"0 * * * * *".to_string(),
"UTC".to_string(),
test_agent_config(),
);
let id = cron.add_job(job).await.unwrap();
let original = cron.get_job(id).await.unwrap();
let mut updated = original.clone();
updated.cron_expression = "0 */10 * * * *".to_string();
cron.update_job(updated).await.unwrap();
let loaded = cron.get_job(id).await.unwrap();
assert_eq!(loaded.cron_expression, "0 */10 * * * *");
assert!(loaded.next_run.is_some());
assert!(loaded.updated_at > original.updated_at);
cron.shutdown().await;
}
#[tokio::test]
async fn update_job_rejects_invalid_cron() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"update_bad".to_string(),
"0 * * * * *".to_string(),
"UTC".to_string(),
test_agent_config(),
);
let id = cron.add_job(job).await.unwrap();
let mut bad = cron.get_job(id).await.unwrap();
bad.cron_expression = "nope".to_string();
assert!(cron.update_job(bad).await.is_err());
let loaded = cron.get_job(id).await.unwrap();
assert_eq!(loaded.cron_expression, "0 * * * * *");
cron.shutdown().await;
}
#[tokio::test]
async fn update_job_rejects_invalid_timezone() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"update_bad_tz".to_string(),
"0 * * * * *".to_string(),
"UTC".to_string(),
test_agent_config(),
);
let id = cron.add_job(job).await.unwrap();
let mut bad = cron.get_job(id).await.unwrap();
bad.timezone = "Fake/Zone".to_string();
assert!(cron.update_job(bad).await.is_err());
cron.shutdown().await;
}
#[tokio::test]
async fn update_job_changes_timezone() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"tz_update".to_string(),
"0 0 12 * * *".to_string(),
"UTC".to_string(),
test_agent_config(),
);
let id = cron.add_job(job).await.unwrap();
let original_next = cron.get_job(id).await.unwrap().next_run;
let mut updated = cron.get_job(id).await.unwrap();
updated.timezone = "America/New_York".to_string();
cron.update_job(updated).await.unwrap();
let loaded = cron.get_job(id).await.unwrap();
assert_eq!(loaded.timezone, "America/New_York");
assert_ne!(loaded.next_run, original_next);
cron.shutdown().await;
}
#[tokio::test]
async fn get_next_runs_api_six_field() {
let (cron, _sched) = make_scheduler().await;
let runs = cron.get_next_runs("0 * * * * *", "UTC", 3).unwrap();
assert_eq!(runs.len(), 3);
for pair in runs.windows(2) {
assert!(pair[1] > pair[0]);
}
cron.shutdown().await;
}
#[tokio::test]
async fn get_next_runs_api_rejects_bad_expression() {
let (cron, _sched) = make_scheduler().await;
assert!(cron.get_next_runs("bad", "UTC", 3).is_err());
cron.shutdown().await;
}
#[tokio::test]
async fn get_next_runs_api_rejects_bad_timezone() {
let (cron, _sched) = make_scheduler().await;
assert!(cron.get_next_runs("0 * * * * *", "Bogus/Tz", 3).is_err());
cron.shutdown().await;
}
#[tokio::test]
async fn multiple_jobs_coexist() {
let (cron, _sched) = make_scheduler().await;
let job_a = CronJobDefinition::new(
"job_a".to_string(),
"0 * * * * *".to_string(),
"UTC".to_string(),
test_agent_config(),
);
let job_b = CronJobDefinition::new(
"job_b".to_string(),
"0 */10 * * * *".to_string(),
"America/Chicago".to_string(),
test_agent_config(),
);
let job_c = CronJobDefinition::new(
"job_c".to_string(),
"0 0 9 * * Mon-Fri".to_string(),
"Europe/London".to_string(),
test_agent_config(),
);
let id_a = cron.add_job(job_a).await.unwrap();
let id_b = cron.add_job(job_b).await.unwrap();
let id_c = cron.add_job(job_c).await.unwrap();
let all = cron.list_jobs().await.unwrap();
assert_eq!(all.len(), 3);
cron.remove_job(id_b).await.unwrap();
let remaining = cron.list_jobs().await.unwrap();
assert_eq!(remaining.len(), 2);
let ids: Vec<_> = remaining.iter().map(|j| j.job_id).collect();
assert!(ids.contains(&id_a));
assert!(ids.contains(&id_c));
cron.shutdown().await;
}
#[tokio::test]
async fn health_check_reflects_job_states() {
let (cron, _sched) = make_scheduler().await;
let active_job = CronJobDefinition::new(
"active".to_string(),
"0 * * * * *".to_string(),
"UTC".to_string(),
test_agent_config(),
);
cron.add_job(active_job).await.unwrap();
let pause_job = CronJobDefinition::new(
"paused".to_string(),
"0 * * * * *".to_string(),
"UTC".to_string(),
test_agent_config(),
);
let pid = cron.add_job(pause_job).await.unwrap();
cron.pause_job(pid).await.unwrap();
let dl_job = CronJobDefinition::new(
"dead_letter".to_string(),
"0 * * * * *".to_string(),
"UTC".to_string(),
test_agent_config(),
);
let did = cron.add_job(dl_job).await.unwrap();
cron.store
.record_failure(did, 10, CronJobStatus::DeadLetter)
.await
.unwrap();
let health = cron.check_health().await.unwrap();
assert_eq!(health.jobs_total, 3);
assert_eq!(health.jobs_active, 1);
assert_eq!(health.jobs_paused, 1);
assert_eq!(health.jobs_dead_letter, 1);
cron.shutdown().await;
}
#[tokio::test]
async fn pause_nonexistent_job_returns_not_found() {
let (cron, _sched) = make_scheduler().await;
let bogus_id = CronJobId::new();
assert!(matches!(
cron.pause_job(bogus_id).await,
Err(CronSchedulerError::NotFound(_))
));
cron.shutdown().await;
}
#[tokio::test]
async fn resume_nonexistent_job_returns_not_found() {
let (cron, _sched) = make_scheduler().await;
let bogus_id = CronJobId::new();
assert!(matches!(
cron.resume_job(bogus_id).await,
Err(CronSchedulerError::NotFound(_))
));
cron.shutdown().await;
}
#[tokio::test]
async fn remove_nonexistent_job_returns_not_found() {
let (cron, _sched) = make_scheduler().await;
let bogus_id = CronJobId::new();
assert!(matches!(
cron.remove_job(bogus_id).await,
Err(CronSchedulerError::NotFound(_))
));
cron.shutdown().await;
}
#[tokio::test]
async fn trigger_now_nonexistent_job_returns_not_found() {
let (cron, _sched) = make_scheduler().await;
let bogus_id = CronJobId::new();
assert!(matches!(
cron.trigger_now(bogus_id).await,
Err(CronSchedulerError::NotFound(_))
));
cron.shutdown().await;
}
#[tokio::test]
async fn double_pause_is_idempotent() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"double_pause".to_string(),
"0 * * * * *".to_string(),
"UTC".to_string(),
test_agent_config(),
);
let id = cron.add_job(job).await.unwrap();
cron.pause_job(id).await.unwrap();
cron.pause_job(id).await.unwrap(); let loaded = cron.get_job(id).await.unwrap();
assert_eq!(loaded.status, CronJobStatus::Paused);
cron.shutdown().await;
}
#[tokio::test]
async fn metrics_update_after_trigger_now() {
let (cron, _sched) = make_scheduler().await;
let job = CronJobDefinition::new(
"metrics_trigger".to_string(),
"0 0 0 1 1 * 2099".to_string(),
"UTC".to_string(),
test_agent_config(),
);
let id = cron.add_job(job).await.unwrap();
let before = cron.metrics();
assert_eq!(before.runs_total, 0);
cron.trigger_now(id).await.unwrap();
let history = cron.get_run_history(id, 10).await.unwrap();
assert_eq!(history.len(), 1);
assert_eq!(history[0].status, JobRunStatus::Succeeded);
assert!(history[0].execution_time_ms.is_some());
cron.shutdown().await;
}
#[tokio::test]
async fn health_check_after_shutdown() {
let (cron, _sched) = make_scheduler().await;
cron.shutdown().await;
let health = cron.check_health().await.unwrap();
assert!(!health.is_running);
assert!(health.store_accessible);
}
}