mockforge_core/time_travel/
cron.rs

1//! Cron scheduler for simulated recurring events
2//!
3//! This module provides a cron-like scheduler that integrates with the virtual clock
4//! to support time-based recurring events. It works alongside the ResponseScheduler
5//! to handle complex recurring schedules while ResponseScheduler handles one-time
6//! and simple interval responses.
7//!
8//! ## Usage
9//!
10//! ```rust,no_run
11//! use mockforge_core::time_travel::{CronScheduler, CronJob, CronJobAction};
12//! use std::sync::Arc;
13//!
14//! let scheduler = CronScheduler::new(clock.clone());
15//!
16//! // Schedule a job that runs every day at 3am
17//! let job = CronJob {
18//!     id: "daily-cleanup".to_string(),
19//!     name: "Daily Cleanup".to_string(),
20//!     schedule: "0 3 * * *".to_string(), // 3am every day
21//!     action: CronJobAction::Callback(Box::new(|_| {
22//!         println!("Running daily cleanup");
23//!         Ok(())
24//!     })),
25//!     enabled: true,
26//! };
27//!
28//! scheduler.add_job(job).await?;
29//! ```
30
31use chrono::{DateTime, Utc};
32use cron::Schedule as CronSchedule;
33use serde::{Deserialize, Serialize};
34use std::collections::HashMap;
35use std::sync::Arc;
36use tokio::sync::RwLock;
37use tracing::{debug, info, warn};
38use uuid::Uuid;
39
40use super::{get_global_clock, VirtualClock};
41
42/// Action to execute when a cron job triggers
43pub enum CronJobAction {
44    /// Execute a callback function
45    Callback(Box<dyn Fn(DateTime<Utc>) -> Result<(), String> + Send + Sync>),
46    /// Send a scheduled response (integrated with ResponseScheduler)
47    ScheduledResponse {
48        /// Response body
49        body: serde_json::Value,
50        /// HTTP status code
51        status: u16,
52        /// Response headers
53        headers: HashMap<String, String>,
54    },
55    /// Trigger a data mutation (for VBR integration)
56    DataMutation {
57        /// Entity name
58        entity: String,
59        /// Mutation operation
60        operation: String,
61    },
62}
63
64// Note: CronJobAction cannot be Serialized/Deserialized due to the callback.
65// For persistence, we'll need to store job metadata separately.
66
67/// A cron job definition
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct CronJob {
70    /// Unique identifier for this job
71    pub id: String,
72    /// Human-readable name
73    pub name: String,
74    /// Cron expression (e.g., "0 3 * * *" for 3am daily)
75    pub schedule: String,
76    /// Whether this job is enabled
77    #[serde(default = "default_true")]
78    pub enabled: bool,
79    /// Optional description
80    #[serde(default)]
81    pub description: Option<String>,
82    /// Last execution time
83    #[serde(default)]
84    pub last_execution: Option<DateTime<Utc>>,
85    /// Next scheduled execution time
86    #[serde(default)]
87    pub next_execution: Option<DateTime<Utc>>,
88    /// Number of times this job has executed
89    #[serde(default)]
90    pub execution_count: usize,
91    /// Action type (for serialization - actual action stored separately)
92    #[serde(default)]
93    pub action_type: String,
94    /// Action metadata (for serialization)
95    #[serde(default)]
96    pub action_metadata: serde_json::Value,
97}
98
99fn default_true() -> bool {
100    true
101}
102
103impl CronJob {
104    /// Create a new cron job
105    pub fn new(id: String, name: String, schedule: String) -> Self {
106        Self {
107            id,
108            name,
109            schedule,
110            enabled: true,
111            description: None,
112            last_execution: None,
113            next_execution: None,
114            execution_count: 0,
115            action_type: String::new(),
116            action_metadata: serde_json::Value::Null,
117        }
118    }
119
120    /// Calculate the next execution time based on the cron schedule
121    pub fn calculate_next_execution(&self, from: DateTime<Utc>) -> Option<DateTime<Utc>> {
122        if !self.enabled {
123            return None;
124        }
125
126        match CronSchedule::from_str(&self.schedule) {
127            Ok(schedule) => {
128                // Get the next occurrence after the given time
129                schedule.after(&from).next()
130            }
131            Err(e) => {
132                warn!("Invalid cron schedule '{}' for job '{}': {}", self.schedule, self.id, e);
133                None
134            }
135        }
136    }
137}
138
139/// Cron scheduler that integrates with the virtual clock
140pub struct CronScheduler {
141    /// Virtual clock reference
142    clock: Arc<VirtualClock>,
143    /// Registered cron jobs
144    jobs: Arc<RwLock<HashMap<String, CronJob>>>,
145    /// Job actions (stored separately since they can't be serialized)
146    actions: Arc<RwLock<HashMap<String, Arc<CronJobAction>>>>,
147    /// Optional ResponseScheduler for scheduled response integration
148    response_scheduler: Option<Arc<super::ResponseScheduler>>,
149    /// Optional MutationRuleManager for VBR mutation integration
150    /// Note: This requires database and registry to be passed when executing mutations
151    mutation_rule_manager: Option<Arc<dyn std::any::Any + Send + Sync>>,
152}
153
154impl CronScheduler {
155    /// Create a new cron scheduler
156    pub fn new(clock: Arc<VirtualClock>) -> Self {
157        Self {
158            clock,
159            jobs: Arc::new(RwLock::new(HashMap::new())),
160            actions: Arc::new(RwLock::new(HashMap::new())),
161            response_scheduler: None,
162            mutation_rule_manager: None,
163        }
164    }
165
166    /// Set the ResponseScheduler for scheduled response integration
167    pub fn with_response_scheduler(mut self, scheduler: Arc<super::ResponseScheduler>) -> Self {
168        self.response_scheduler = Some(scheduler);
169        self
170    }
171
172    /// Set the MutationRuleManager for VBR mutation integration
173    /// Note: This is stored as Any since MutationRuleManager is in a different crate
174    /// The actual execution requires database and registry to be passed separately
175    pub fn with_mutation_rule_manager(mut self, manager: Arc<dyn std::any::Any + Send + Sync>) -> Self {
176        self.mutation_rule_manager = Some(manager);
177        self
178    }
179
180    /// Create a new cron scheduler using the global clock
181    ///
182    /// This will use the global virtual clock registry if available,
183    /// or create a new clock if none is registered.
184    pub fn new_with_global_clock() -> Self {
185        let clock = get_global_clock().unwrap_or_else(|| Arc::new(VirtualClock::new()));
186        Self::new(clock)
187    }
188
189    /// Add a cron job
190    pub async fn add_job(&self, job: CronJob, action: CronJobAction) -> Result<(), String> {
191        // Validate cron expression
192        CronSchedule::from_str(&job.schedule)
193            .map_err(|e| format!("Invalid cron expression '{}': {}", job.schedule, e))?;
194
195        // Calculate next execution time
196        let now = self.clock.now();
197        let next_execution = job.calculate_next_execution(now);
198
199        let mut job_with_next = job;
200        job_with_next.next_execution = next_execution;
201
202        let job_id = job_with_next.id.clone();
203
204        // Store job and action
205        let mut jobs = self.jobs.write().await;
206        jobs.insert(job_id.clone(), job_with_next);
207
208        let mut actions = self.actions.write().await;
209        actions.insert(job_id.clone(), Arc::new(action));
210
211        info!("Added cron job '{}' with schedule '{}'", job_id, jobs[&job_id].schedule);
212        Ok(())
213    }
214
215    /// Remove a cron job
216    pub async fn remove_job(&self, job_id: &str) -> bool {
217        let mut jobs = self.jobs.write().await;
218        let mut actions = self.actions.write().await;
219
220        let removed = jobs.remove(job_id).is_some();
221        actions.remove(job_id);
222
223        if removed {
224            info!("Removed cron job '{}'", job_id);
225        }
226
227        removed
228    }
229
230    /// Get a cron job by ID
231    pub async fn get_job(&self, job_id: &str) -> Option<CronJob> {
232        let jobs = self.jobs.read().await;
233        jobs.get(job_id).cloned()
234    }
235
236    /// List all cron jobs
237    pub async fn list_jobs(&self) -> Vec<CronJob> {
238        let jobs = self.jobs.read().await;
239        jobs.values().cloned().collect()
240    }
241
242    /// Enable or disable a cron job
243    pub async fn set_job_enabled(&self, job_id: &str, enabled: bool) -> Result<(), String> {
244        let mut jobs = self.jobs.write().await;
245
246        if let Some(job) = jobs.get_mut(job_id) {
247            job.enabled = enabled;
248
249            // Recalculate next execution if enabling
250            if enabled {
251                let now = self.clock.now();
252                job.next_execution = job.calculate_next_execution(now);
253            } else {
254                job.next_execution = None;
255            }
256
257            info!("Cron job '{}' {}", job_id, if enabled { "enabled" } else { "disabled" });
258            Ok(())
259        } else {
260            Err(format!("Cron job '{}' not found", job_id))
261        }
262    }
263
264    /// Check for jobs that should execute now and execute them
265    ///
266    /// This should be called periodically (e.g., every second) to check
267    /// if any jobs are due for execution.
268    pub async fn check_and_execute(&self) -> Result<usize, String> {
269        let now = self.clock.now();
270        let mut executed = 0;
271
272        // Get jobs that need to execute
273        let mut jobs_to_execute = Vec::new();
274
275        {
276            let jobs = self.jobs.read().await;
277            for job in jobs.values() {
278                if !job.enabled {
279                    continue;
280                }
281
282                if let Some(next) = job.next_execution {
283                    if now >= next {
284                        jobs_to_execute.push(job.id.clone());
285                    }
286                }
287            }
288        }
289
290        // Execute jobs
291        for job_id in jobs_to_execute {
292            if let Err(e) = self.execute_job(&job_id).await {
293                warn!("Error executing cron job '{}': {}", job_id, e);
294            } else {
295                executed += 1;
296            }
297        }
298
299        Ok(executed)
300    }
301
302    /// Execute a specific cron job
303    async fn execute_job(&self, job_id: &str) -> Result<(), String> {
304        let now = self.clock.now();
305
306        // Get job and action
307        let (job, action) = {
308            let jobs = self.jobs.read().await;
309            let actions = self.actions.read().await;
310
311            let job = jobs.get(job_id).ok_or_else(|| format!("Job '{}' not found", job_id))?;
312            let action = actions
313                .get(job_id)
314                .ok_or_else(|| format!("Action for job '{}' not found", job_id))?;
315
316            (job.clone(), Arc::clone(action))
317        };
318
319        // Execute the action
320        match action.as_ref() {
321            CronJobAction::Callback(callback) => {
322                debug!("Executing callback for cron job '{}'", job_id);
323                callback(now)?;
324            }
325            CronJobAction::ScheduledResponse {
326                body,
327                status,
328                headers,
329            } => {
330                debug!("Scheduled response for cron job '{}'", job_id);
331
332                // Integrate with ResponseScheduler if available
333                if let Some(ref scheduler) = self.response_scheduler {
334                    // Create a ScheduledResponse for immediate execution (trigger_time = now)
335                    let scheduled_response = super::ScheduledResponse {
336                        id: format!("cron-{}-{}", job_id, Uuid::new_v4()),
337                        trigger_time: now,
338                        body: body.clone(),
339                        status: *status,
340                        headers: headers.clone(),
341                        name: Some(format!("Cron job: {}", job_id)),
342                        repeat: None,
343                    };
344
345                    match scheduler.schedule(scheduled_response) {
346                        Ok(response_id) => {
347                            info!("Cron job '{}' scheduled response: {}", job_id, response_id);
348                        }
349                        Err(e) => {
350                            warn!("Failed to schedule response for cron job '{}': {}", job_id, e);
351                        }
352                    }
353                } else {
354                    warn!("Cron job '{}' triggered scheduled response but ResponseScheduler not configured", job_id);
355                    info!("Cron job '{}' triggered scheduled response: {} (ResponseScheduler not available)", job_id, status);
356                }
357            }
358            CronJobAction::DataMutation { entity, operation } => {
359                debug!("Data mutation for cron job '{}': {} on {}", job_id, operation, entity);
360
361                // Note: VBR mutation execution requires database and registry
362                // which are not available in the cron scheduler context.
363                // The mutation_rule_manager is stored but cannot be executed here
364                // without the database and registry dependencies.
365                //
366                // For full integration, mutation rules should be created separately
367                // and referenced by ID, or the cron scheduler should be extended
368                // to accept database and registry as dependencies.
369                if self.mutation_rule_manager.is_some() {
370                    info!("Cron job '{}' triggered data mutation: {} on {} (MutationRuleManager available but execution requires database and registry)", job_id, operation, entity);
371                } else {
372                    warn!("Cron job '{}' triggered data mutation but MutationRuleManager not configured", job_id);
373                    info!("Cron job '{}' triggered data mutation: {} on {} (MutationRuleManager not available)", job_id, operation, entity);
374                }
375            }
376        }
377
378        // Update job state
379        {
380            let mut jobs = self.jobs.write().await;
381            if let Some(job) = jobs.get_mut(job_id) {
382                job.last_execution = Some(now);
383                job.execution_count += 1;
384
385                // Calculate next execution
386                job.next_execution = job.calculate_next_execution(now);
387            }
388        }
389
390        info!("Executed cron job '{}'", job_id);
391        Ok(())
392    }
393
394    /// Get the virtual clock
395    pub fn clock(&self) -> Arc<VirtualClock> {
396        self.clock.clone()
397    }
398}
399
400// Helper function to parse cron schedule string
401fn parse_cron_schedule(schedule: &str) -> Result<CronSchedule, String> {
402    CronSchedule::from_str(schedule).map_err(|e| format!("Invalid cron expression: {}", e))
403}
404
405// Re-export Schedule for convenience
406pub use cron::Schedule;
407
408// Import Schedule::from_str
409use std::str::FromStr;
410
411#[cfg(test)]
412mod tests {
413    use super::*;
414
415    #[test]
416    fn test_cron_job_creation() {
417        let job =
418            CronJob::new("test-1".to_string(), "Test Job".to_string(), "0 3 * * *".to_string());
419
420        assert_eq!(job.id, "test-1");
421        assert_eq!(job.name, "Test Job");
422        assert_eq!(job.schedule, "0 3 * * *");
423        assert!(job.enabled);
424    }
425
426    #[test]
427    fn test_cron_schedule_parsing() {
428        let schedule = CronSchedule::from_str("0 3 * * *").unwrap();
429        let now = Utc::now();
430        let next = schedule.after(&now).next();
431        assert!(next.is_some());
432    }
433
434    #[tokio::test]
435    async fn test_cron_scheduler_add_job() {
436        let clock = Arc::new(VirtualClock::new());
437        clock.enable_and_set(Utc::now());
438        let scheduler = CronScheduler::new(clock);
439
440        let job =
441            CronJob::new("test-1".to_string(), "Test Job".to_string(), "0 3 * * *".to_string());
442
443        let action = CronJobAction::Callback(Box::new(|_| {
444            println!("Test callback");
445            Ok(())
446        }));
447
448        scheduler.add_job(job, action).await.unwrap();
449
450        let jobs = scheduler.list_jobs().await;
451        assert_eq!(jobs.len(), 1);
452        assert_eq!(jobs[0].id, "test-1");
453    }
454}