mockforge_core/time_travel/
cron.rs

1//! Cron scheduler for simulated recurring events
2//!
3//! This module provides a cron-like scheduler that integrates with the virtual clock
4//! to support time-based recurring events. It works alongside the ResponseScheduler
5//! to handle complex recurring schedules while ResponseScheduler handles one-time
6//! and simple interval responses.
7//!
8//! ## Usage
9//!
10//! ```rust,no_run
//! use mockforge_core::time_travel::{CronJob, CronJobAction, CronScheduler, VirtualClock};
//! use std::sync::Arc;
//!
//! # async fn example() -> Result<(), String> {
//! let clock = Arc::new(VirtualClock::new());
//! let scheduler = CronScheduler::new(clock);
//!
//! // Schedule a job that runs every day at 3am.
//! // Fields: sec min hour day-of-month month day-of-week
//! let job = CronJob::new(
//!     "daily-cleanup".to_string(),
//!     "Daily Cleanup".to_string(),
//!     "0 0 3 * * *".to_string(), // 3am every day
//! );
//!
//! // The action is passed separately from the (serializable) job metadata
//! let action = CronJobAction::Callback(Box::new(|_| {
//!     println!("Running daily cleanup");
//!     Ok(())
//! }));
//!
//! scheduler.add_job(job, action).await?;
//! # Ok(())
//! # }
29//! ```
30
31use chrono::{DateTime, Utc};
32use cron::Schedule as CronSchedule;
33use serde::{Deserialize, Serialize};
34use std::collections::HashMap;
35use std::sync::Arc;
36use tokio::sync::RwLock;
37use tracing::{debug, info, warn};
38
39use super::{get_global_clock, VirtualClock};
40
41/// Action to execute when a cron job triggers
42pub enum CronJobAction {
43    /// Execute a callback function
44    Callback(Box<dyn Fn(DateTime<Utc>) -> Result<(), String> + Send + Sync>),
45    /// Send a scheduled response (integrated with ResponseScheduler)
46    ScheduledResponse {
47        /// Response body
48        body: serde_json::Value,
49        /// HTTP status code
50        status: u16,
51        /// Response headers
52        headers: HashMap<String, String>,
53    },
54    /// Trigger a data mutation (for VBR integration)
55    DataMutation {
56        /// Entity name
57        entity: String,
58        /// Mutation operation
59        operation: String,
60    },
61}
62
63// Note: CronJobAction cannot be Serialized/Deserialized due to the callback.
64// For persistence, we'll need to store job metadata separately.
65
66/// A cron job definition
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct CronJob {
69    /// Unique identifier for this job
70    pub id: String,
71    /// Human-readable name
72    pub name: String,
73    /// Cron expression (e.g., "0 3 * * *" for 3am daily)
74    pub schedule: String,
75    /// Whether this job is enabled
76    #[serde(default = "default_true")]
77    pub enabled: bool,
78    /// Optional description
79    #[serde(default)]
80    pub description: Option<String>,
81    /// Last execution time
82    #[serde(default)]
83    pub last_execution: Option<DateTime<Utc>>,
84    /// Next scheduled execution time
85    #[serde(default)]
86    pub next_execution: Option<DateTime<Utc>>,
87    /// Number of times this job has executed
88    #[serde(default)]
89    pub execution_count: usize,
90    /// Action type (for serialization - actual action stored separately)
91    #[serde(default)]
92    pub action_type: String,
93    /// Action metadata (for serialization)
94    #[serde(default)]
95    pub action_metadata: serde_json::Value,
96}
97
/// Serde default for `CronJob::enabled`: a job is enabled unless the input says otherwise.
fn default_true() -> bool {
    true
}
101
102impl CronJob {
103    /// Create a new cron job
104    pub fn new(id: String, name: String, schedule: String) -> Self {
105        Self {
106            id,
107            name,
108            schedule,
109            enabled: true,
110            description: None,
111            last_execution: None,
112            next_execution: None,
113            execution_count: 0,
114            action_type: String::new(),
115            action_metadata: serde_json::Value::Null,
116        }
117    }
118
119    /// Calculate the next execution time based on the cron schedule
120    pub fn calculate_next_execution(&self, from: DateTime<Utc>) -> Option<DateTime<Utc>> {
121        if !self.enabled {
122            return None;
123        }
124
125        match CronSchedule::from_str(&self.schedule) {
126            Ok(schedule) => {
127                // Get the next occurrence after the given time
128                schedule.after(&from).next()
129            }
130            Err(e) => {
131                warn!("Invalid cron schedule '{}' for job '{}': {}", self.schedule, self.id, e);
132                None
133            }
134        }
135    }
136}
137
138/// Cron scheduler that integrates with the virtual clock
139pub struct CronScheduler {
140    /// Virtual clock reference
141    clock: Arc<VirtualClock>,
142    /// Registered cron jobs
143    jobs: Arc<RwLock<HashMap<String, CronJob>>>,
144    /// Job actions (stored separately since they can't be serialized)
145    actions: Arc<RwLock<HashMap<String, Arc<CronJobAction>>>>,
146}
147
148impl CronScheduler {
149    /// Create a new cron scheduler
150    pub fn new(clock: Arc<VirtualClock>) -> Self {
151        Self {
152            clock,
153            jobs: Arc::new(RwLock::new(HashMap::new())),
154            actions: Arc::new(RwLock::new(HashMap::new())),
155        }
156    }
157
158    /// Create a new cron scheduler using the global clock
159    ///
160    /// This will use the global virtual clock registry if available,
161    /// or create a new clock if none is registered.
162    pub fn new_with_global_clock() -> Self {
163        let clock = get_global_clock().unwrap_or_else(|| Arc::new(VirtualClock::new()));
164        Self::new(clock)
165    }
166
167    /// Add a cron job
168    pub async fn add_job(&self, job: CronJob, action: CronJobAction) -> Result<(), String> {
169        // Validate cron expression
170        CronSchedule::from_str(&job.schedule)
171            .map_err(|e| format!("Invalid cron expression '{}': {}", job.schedule, e))?;
172
173        // Calculate next execution time
174        let now = self.clock.now();
175        let next_execution = job.calculate_next_execution(now);
176
177        let mut job_with_next = job;
178        job_with_next.next_execution = next_execution;
179
180        let job_id = job_with_next.id.clone();
181
182        // Store job and action
183        let mut jobs = self.jobs.write().await;
184        jobs.insert(job_id.clone(), job_with_next);
185
186        let mut actions = self.actions.write().await;
187        actions.insert(job_id.clone(), Arc::new(action));
188
189        info!("Added cron job '{}' with schedule '{}'", job_id, jobs[&job_id].schedule);
190        Ok(())
191    }
192
193    /// Remove a cron job
194    pub async fn remove_job(&self, job_id: &str) -> bool {
195        let mut jobs = self.jobs.write().await;
196        let mut actions = self.actions.write().await;
197
198        let removed = jobs.remove(job_id).is_some();
199        actions.remove(job_id);
200
201        if removed {
202            info!("Removed cron job '{}'", job_id);
203        }
204
205        removed
206    }
207
208    /// Get a cron job by ID
209    pub async fn get_job(&self, job_id: &str) -> Option<CronJob> {
210        let jobs = self.jobs.read().await;
211        jobs.get(job_id).cloned()
212    }
213
214    /// List all cron jobs
215    pub async fn list_jobs(&self) -> Vec<CronJob> {
216        let jobs = self.jobs.read().await;
217        jobs.values().cloned().collect()
218    }
219
220    /// Enable or disable a cron job
221    pub async fn set_job_enabled(&self, job_id: &str, enabled: bool) -> Result<(), String> {
222        let mut jobs = self.jobs.write().await;
223
224        if let Some(job) = jobs.get_mut(job_id) {
225            job.enabled = enabled;
226
227            // Recalculate next execution if enabling
228            if enabled {
229                let now = self.clock.now();
230                job.next_execution = job.calculate_next_execution(now);
231            } else {
232                job.next_execution = None;
233            }
234
235            info!("Cron job '{}' {}", job_id, if enabled { "enabled" } else { "disabled" });
236            Ok(())
237        } else {
238            Err(format!("Cron job '{}' not found", job_id))
239        }
240    }
241
242    /// Check for jobs that should execute now and execute them
243    ///
244    /// This should be called periodically (e.g., every second) to check
245    /// if any jobs are due for execution.
246    pub async fn check_and_execute(&self) -> Result<usize, String> {
247        let now = self.clock.now();
248        let mut executed = 0;
249
250        // Get jobs that need to execute
251        let mut jobs_to_execute = Vec::new();
252
253        {
254            let jobs = self.jobs.read().await;
255            for job in jobs.values() {
256                if !job.enabled {
257                    continue;
258                }
259
260                if let Some(next) = job.next_execution {
261                    if now >= next {
262                        jobs_to_execute.push(job.id.clone());
263                    }
264                }
265            }
266        }
267
268        // Execute jobs
269        for job_id in jobs_to_execute {
270            if let Err(e) = self.execute_job(&job_id).await {
271                warn!("Error executing cron job '{}': {}", job_id, e);
272            } else {
273                executed += 1;
274            }
275        }
276
277        Ok(executed)
278    }
279
280    /// Execute a specific cron job
281    async fn execute_job(&self, job_id: &str) -> Result<(), String> {
282        let now = self.clock.now();
283
284        // Get job and action
285        let (job, action) = {
286            let jobs = self.jobs.read().await;
287            let actions = self.actions.read().await;
288
289            let job = jobs.get(job_id).ok_or_else(|| format!("Job '{}' not found", job_id))?;
290            let action = actions
291                .get(job_id)
292                .ok_or_else(|| format!("Action for job '{}' not found", job_id))?;
293
294            (job.clone(), Arc::clone(action))
295        };
296
297        // Execute the action
298        match action.as_ref() {
299            CronJobAction::Callback(callback) => {
300                debug!("Executing callback for cron job '{}'", job_id);
301                callback(now)?;
302            }
303            CronJobAction::ScheduledResponse {
304                body,
305                status,
306                headers,
307            } => {
308                debug!("Scheduled response for cron job '{}'", job_id);
309                // TODO: Integrate with ResponseScheduler
310                // For now, just log
311                info!("Cron job '{}' triggered scheduled response: {}", job_id, status);
312            }
313            CronJobAction::DataMutation { entity, operation } => {
314                debug!("Data mutation for cron job '{}': {} on {}", job_id, operation, entity);
315                // TODO: Integrate with VBR mutation rules
316                // For now, just log
317                info!("Cron job '{}' triggered data mutation: {} on {}", job_id, operation, entity);
318            }
319        }
320
321        // Update job state
322        {
323            let mut jobs = self.jobs.write().await;
324            if let Some(job) = jobs.get_mut(job_id) {
325                job.last_execution = Some(now);
326                job.execution_count += 1;
327
328                // Calculate next execution
329                job.next_execution = job.calculate_next_execution(now);
330            }
331        }
332
333        info!("Executed cron job '{}'", job_id);
334        Ok(())
335    }
336
337    /// Get the virtual clock
338    pub fn clock(&self) -> Arc<VirtualClock> {
339        self.clock.clone()
340    }
341}
342
343// Helper function to parse cron schedule string
344fn parse_cron_schedule(schedule: &str) -> Result<CronSchedule, String> {
345    CronSchedule::from_str(schedule).map_err(|e| format!("Invalid cron expression: {}", e))
346}
347
348// Re-export Schedule for convenience
349pub use cron::Schedule;
350
351// Import Schedule::from_str
352use std::str::FromStr;
353
#[cfg(test)]
mod tests {
    use super::*;

    // The `cron` crate documents its expressions with a leading seconds field,
    // so the tests use six fields: sec min hour day-of-month month day-of-week.
    const DAILY_3AM: &str = "0 0 3 * * *";

    /// `CronJob::new` stores its arguments and starts enabled.
    #[test]
    fn test_cron_job_creation() {
        let job =
            CronJob::new("test-1".to_string(), "Test Job".to_string(), DAILY_3AM.to_string());

        assert_eq!(job.id, "test-1");
        assert_eq!(job.name, "Test Job");
        assert_eq!(job.schedule, DAILY_3AM);
        assert!(job.enabled);
    }

    /// A daily schedule parses and always has an upcoming occurrence.
    #[test]
    fn test_cron_schedule_parsing() {
        let schedule = CronSchedule::from_str(DAILY_3AM).unwrap();
        let now = Utc::now();
        let next = schedule.after(&now).next();
        assert!(next.is_some());
    }

    /// A valid job can be added and then shows up in `list_jobs`.
    #[tokio::test]
    async fn test_cron_scheduler_add_job() {
        let clock = Arc::new(VirtualClock::new());
        clock.enable_and_set(Utc::now());
        let scheduler = CronScheduler::new(clock);

        let job =
            CronJob::new("test-1".to_string(), "Test Job".to_string(), DAILY_3AM.to_string());

        let action = CronJobAction::Callback(Box::new(|_| {
            println!("Test callback");
            Ok(())
        }));

        scheduler.add_job(job, action).await.unwrap();

        let jobs = scheduler.list_jobs().await;
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].id, "test-1");
    }

    /// A job with an unparsable schedule is rejected and not stored.
    #[tokio::test]
    async fn test_cron_scheduler_rejects_invalid_schedule() {
        let clock = Arc::new(VirtualClock::new());
        clock.enable_and_set(Utc::now());
        let scheduler = CronScheduler::new(clock);

        let job = CronJob::new(
            "bad-1".to_string(),
            "Bad Job".to_string(),
            "not a cron expression".to_string(),
        );
        let action = CronJobAction::Callback(Box::new(|_| Ok(())));

        assert!(scheduler.add_job(job, action).await.is_err());
        assert!(scheduler.list_jobs().await.is_empty());
    }
}
397}