use chrono::{DateTime, Utc};
use cron::Schedule as CronSchedule;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use uuid::Uuid;
use super::{get_global_clock, VirtualClock};
/// Action performed when a cron job fires.
///
/// The scheduler in this file stores one action per job id and dispatches on
/// the variant each time the job is executed.
pub enum CronJobAction {
/// Closure called with the scheduler clock's current time. An `Err` return
/// is propagated as the job's execution error.
Callback(Box<dyn Fn(DateTime<Utc>) -> Result<(), String> + Send + Sync>),
/// Response handed to the configured `ResponseScheduler`, using the firing
/// time as its trigger time. Without a configured scheduler it is only logged.
ScheduledResponse {
/// JSON body of the response.
body: serde_json::Value,
/// Status code of the response (presumably an HTTP status; confirm).
status: u16,
/// Headers attached to the response.
headers: HashMap<String, String>,
},
/// Data mutation request. The scheduler in this file only logs it; no
/// mutation is carried out here.
DataMutation {
/// Name of the entity the mutation targets.
entity: String,
/// Name of the operation to apply.
operation: String,
},
}
/// Serializable description and bookkeeping state of a single cron job.
///
/// Apart from `id`, `name` and `schedule`, every field has a serde default, so
/// a minimal document only needs those three.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CronJob {
/// Identifier of the job; the scheduler uses it as the map key.
pub id: String,
/// Name of the job (not interpreted by the code in this file).
pub name: String,
/// Cron expression, trimmed and parsed with `cron::Schedule`.
// NOTE(review): the `cron` crate documents a seconds-first format with six or
// seven fields; confirm that five-field expressions are accepted as intended.
pub schedule: String,
/// Whether the job may fire. Defaults to `true` when absent from the input.
#[serde(default = "default_true")]
pub enabled: bool,
/// Optional free-form description (not interpreted by the code in this file).
#[serde(default)]
pub description: Option<String>,
/// Clock time of the most recent successful execution, if any.
#[serde(default)]
pub last_execution: Option<DateTime<Utc>>,
/// Next time the job is due; `None` when disabled or when the schedule
/// could not be evaluated.
#[serde(default)]
pub next_execution: Option<DateTime<Utc>>,
/// Number of successful executions recorded by the scheduler.
#[serde(default)]
pub execution_count: usize,
/// Label for the kind of action; empty by default and not read in this file.
#[serde(default)]
pub action_type: String,
/// Arbitrary JSON attached to the action; not read in this file.
#[serde(default)]
pub action_metadata: serde_json::Value,
}
/// Serde default used for `CronJob::enabled`, so that jobs deserialized
/// without an `enabled` key start out enabled.
fn default_true() -> bool {
    true
}
impl CronJob {
    /// Builds an enabled job from its id, name and cron expression.
    ///
    /// All bookkeeping starts empty: no description, no recorded or pending
    /// execution, a zero execution count, an empty action type and `Null`
    /// action metadata.
    pub fn new(id: String, name: String, schedule: String) -> Self {
        Self {
            // Identity supplied by the caller.
            id,
            name,
            schedule,
            // Everything else starts from a neutral state.
            description: None,
            enabled: true,
            execution_count: 0,
            last_execution: None,
            next_execution: None,
            action_metadata: serde_json::Value::Null,
            action_type: String::new(),
        }
    }

    /// Returns the first scheduled time strictly after `from`.
    ///
    /// Yields `None` when the job is disabled, when the (trimmed) expression
    /// fails to parse (a warning is logged in that case), or when the
    /// schedule has no further occurrence.
    // NOTE(review): the `cron` crate documents a seconds-first format with six
    // or seven fields; confirm that five-field expressions parse as intended.
    pub fn calculate_next_execution(&self, from: DateTime<Utc>) -> Option<DateTime<Utc>> {
        // Disabled jobs are never due.
        if !self.enabled {
            return None;
        }

        let expression = self.schedule.trim();
        let parsed = match CronSchedule::from_str(expression) {
            Ok(parsed) => parsed,
            Err(err) => {
                warn!("Invalid cron schedule '{}' for job '{}': {}", expression, self.id, err);
                return None;
            }
        };

        parsed.after(&from).next()
    }
}
/// Registry of cron jobs that fires due jobs against a `VirtualClock`.
pub struct CronScheduler {
/// Source of "now" for scheduling decisions and execution timestamps.
clock: Arc<VirtualClock>,
/// Job metadata keyed by job id.
jobs: Arc<RwLock<HashMap<String, CronJob>>>,
/// Action for each job, keyed by the same job id as `jobs`.
actions: Arc<RwLock<HashMap<String, Arc<CronJobAction>>>>,
/// Optional scheduler that receives `CronJobAction::ScheduledResponse` actions.
response_scheduler: Option<Arc<super::ResponseScheduler>>,
/// Optional, type-erased mutation rule manager. The code in this file only
/// checks whether it is present.
mutation_rule_manager: Option<Arc<dyn std::any::Any + Send + Sync>>,
}
impl CronScheduler {
pub fn new(clock: Arc<VirtualClock>) -> Self {
Self {
clock,
jobs: Arc::new(RwLock::new(HashMap::new())),
actions: Arc::new(RwLock::new(HashMap::new())),
response_scheduler: None,
mutation_rule_manager: None,
}
}
pub fn with_response_scheduler(mut self, scheduler: Arc<super::ResponseScheduler>) -> Self {
self.response_scheduler = Some(scheduler);
self
}
pub fn with_mutation_rule_manager(
mut self,
manager: Arc<dyn std::any::Any + Send + Sync>,
) -> Self {
self.mutation_rule_manager = Some(manager);
self
}
pub fn new_with_global_clock() -> Self {
let clock = get_global_clock().unwrap_or_else(|| Arc::new(VirtualClock::new()));
Self::new(clock)
}
pub async fn add_job(&self, job: CronJob, action: CronJobAction) -> Result<(), String> {
let now = self.clock.now();
let next_execution = job.calculate_next_execution(now);
if next_execution.is_none() {
warn!("Warning: Unable to calculate next execution for cron job '{}' with schedule '{}'. The job will be added but may not execute.", job.id, job.schedule);
}
let mut job_with_next = job;
job_with_next.next_execution = next_execution;
let job_id = job_with_next.id.clone();
let mut jobs = self.jobs.write().await;
jobs.insert(job_id.clone(), job_with_next);
let mut actions = self.actions.write().await;
actions.insert(job_id.clone(), Arc::new(action));
info!("Added cron job '{}' with schedule '{}'", job_id, jobs[&job_id].schedule);
Ok(())
}
pub async fn remove_job(&self, job_id: &str) -> bool {
let mut jobs = self.jobs.write().await;
let mut actions = self.actions.write().await;
let removed = jobs.remove(job_id).is_some();
actions.remove(job_id);
if removed {
info!("Removed cron job '{}'", job_id);
}
removed
}
pub async fn get_job(&self, job_id: &str) -> Option<CronJob> {
let jobs = self.jobs.read().await;
jobs.get(job_id).cloned()
}
pub async fn list_jobs(&self) -> Vec<CronJob> {
let jobs = self.jobs.read().await;
jobs.values().cloned().collect()
}
pub async fn set_job_enabled(&self, job_id: &str, enabled: bool) -> Result<(), String> {
let mut jobs = self.jobs.write().await;
if let Some(job) = jobs.get_mut(job_id) {
job.enabled = enabled;
if enabled {
let now = self.clock.now();
job.next_execution = job.calculate_next_execution(now);
} else {
job.next_execution = None;
}
info!("Cron job '{}' {}", job_id, if enabled { "enabled" } else { "disabled" });
Ok(())
} else {
Err(format!("Cron job '{}' not found", job_id))
}
}
pub async fn check_and_execute(&self) -> Result<usize, String> {
let now = self.clock.now();
let mut executed = 0;
let mut jobs_to_execute = Vec::new();
{
let jobs = self.jobs.read().await;
for job in jobs.values() {
if !job.enabled {
continue;
}
if let Some(next) = job.next_execution {
if now >= next {
jobs_to_execute.push(job.id.clone());
}
}
}
}
for job_id in jobs_to_execute {
if let Err(e) = self.execute_job(&job_id).await {
warn!("Error executing cron job '{}': {}", job_id, e);
} else {
executed += 1;
}
}
Ok(executed)
}
async fn execute_job(&self, job_id: &str) -> Result<(), String> {
let now = self.clock.now();
let (_job, action) = {
let jobs = self.jobs.read().await;
let actions = self.actions.read().await;
let job = jobs.get(job_id).ok_or_else(|| format!("Job '{}' not found", job_id))?;
let action = actions
.get(job_id)
.ok_or_else(|| format!("Action for job '{}' not found", job_id))?;
(job.clone(), Arc::clone(action))
};
match action.as_ref() {
CronJobAction::Callback(callback) => {
debug!("Executing callback for cron job '{}'", job_id);
callback(now)?;
}
CronJobAction::ScheduledResponse {
body,
status,
headers,
} => {
debug!("Scheduled response for cron job '{}'", job_id);
if let Some(ref scheduler) = self.response_scheduler {
let scheduled_response = super::ScheduledResponse {
id: format!("cron-{}-{}", job_id, Uuid::new_v4()),
trigger_time: now,
body: body.clone(),
status: *status,
headers: headers.clone(),
name: Some(format!("Cron job: {}", job_id)),
repeat: None,
};
match scheduler.schedule(scheduled_response) {
Ok(response_id) => {
info!("Cron job '{}' scheduled response: {}", job_id, response_id);
}
Err(e) => {
warn!("Failed to schedule response for cron job '{}': {}", job_id, e);
}
}
} else {
warn!("Cron job '{}' triggered scheduled response but ResponseScheduler not configured", job_id);
info!("Cron job '{}' triggered scheduled response: {} (ResponseScheduler not available)", job_id, status);
}
}
CronJobAction::DataMutation { entity, operation } => {
debug!("Data mutation for cron job '{}': {} on {}", job_id, operation, entity);
if self.mutation_rule_manager.is_some() {
info!("Cron job '{}' triggered data mutation: {} on {} (MutationRuleManager available but execution requires database and registry)", job_id, operation, entity);
} else {
warn!("Cron job '{}' triggered data mutation but MutationRuleManager not configured", job_id);
info!("Cron job '{}' triggered data mutation: {} on {} (MutationRuleManager not available)", job_id, operation, entity);
}
}
}
{
let mut jobs = self.jobs.write().await;
if let Some(job) = jobs.get_mut(job_id) {
job.last_execution = Some(now);
job.execution_count += 1;
job.next_execution = job.calculate_next_execution(now);
}
}
info!("Executed cron job '{}'", job_id);
Ok(())
}
pub fn clock(&self) -> Arc<VirtualClock> {
self.clock.clone()
}
}
/// Parses a cron expression after trimming surrounding whitespace.
///
/// # Errors
///
/// Returns `"Invalid cron expression: …"` carrying the parser's message when
/// the expression is rejected by `cron::Schedule`.
#[allow(dead_code)]
pub(crate) fn parse_cron_schedule(schedule: &str) -> Result<CronSchedule, String> {
    match CronSchedule::from_str(schedule.trim()) {
        Ok(parsed) => Ok(parsed),
        Err(err) => Err(format!("Invalid cron expression: {}", err)),
    }
}
pub use cron::Schedule;
use std::str::FromStr;
#[cfg(test)]
mod tests {
    use super::*;

    /// `CronJob::new` stores the identifying fields verbatim and starts the
    /// job enabled with empty bookkeeping.
    #[test]
    fn test_cron_job_creation() {
        let job =
            CronJob::new("test-1".to_string(), "Test Job".to_string(), "0 3 * * *".to_string());
        assert_eq!(job.id, "test-1");
        assert_eq!(job.name, "Test Job");
        assert_eq!(job.schedule, "0 3 * * *");
        assert!(job.enabled);
        assert!(job.last_execution.is_none());
        assert!(job.next_execution.is_none());
        assert_eq!(job.execution_count, 0);
    }

    /// Exercises the actual parsing paths: a seconds-first expression is
    /// accepted (with or without surrounding whitespace), garbage is rejected,
    /// and the next execution is strictly in the future for enabled jobs only.
    #[test]
    fn test_cron_schedule_parsing() {
        // Seconds-first, six-field form: every day at 03:00:00.
        assert!(parse_cron_schedule("0 0 3 * * *").is_ok());
        assert!(parse_cron_schedule("  0 0 3 * * *  ").is_ok());
        assert!(parse_cron_schedule("not a cron expression").is_err());

        let from = Utc::now();
        let mut job =
            CronJob::new("test".to_string(), "Test".to_string(), "0 0 3 * * *".to_string());
        let next = job
            .calculate_next_execution(from)
            .expect("a valid daily schedule always has a next occurrence");
        assert!(next > from);

        // Disabled jobs never report a next execution.
        job.enabled = false;
        assert!(job.calculate_next_execution(from).is_none());

        // An unparsable schedule yields no next execution instead of panicking.
        let broken =
            CronJob::new("bad".to_string(), "Bad".to_string(), "not a cron expression".to_string());
        assert!(broken.calculate_next_execution(from).is_none());
    }

    /// A job added to the scheduler is visible through `list_jobs`.
    #[tokio::test]
    async fn test_cron_scheduler_add_job() {
        let clock = Arc::new(VirtualClock::new());
        clock.enable_and_set(Utc::now());
        let scheduler = CronScheduler::new(clock);
        let job =
            CronJob::new("test-1".to_string(), "Test Job".to_string(), "0 3 * * *".to_string());
        let action = CronJobAction::Callback(Box::new(|_| {
            println!("Test callback");
            Ok(())
        }));
        scheduler.add_job(job, action).await.unwrap();
        let jobs = scheduler.list_jobs().await;
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].id, "test-1");
    }
}