mockforge_core/time_travel/
cron.rs1use chrono::{DateTime, Utc};
32use cron::Schedule as CronSchedule;
33use serde::{Deserialize, Serialize};
34use std::collections::HashMap;
35use std::sync::Arc;
36use tokio::sync::RwLock;
37use tracing::{debug, info, warn};
38use uuid::Uuid;
39
40use super::{get_global_clock, VirtualClock};
41
42pub enum CronJobAction {
44 Callback(Box<dyn Fn(DateTime<Utc>) -> Result<(), String> + Send + Sync>),
46 ScheduledResponse {
48 body: serde_json::Value,
50 status: u16,
52 headers: HashMap<String, String>,
54 },
55 DataMutation {
57 entity: String,
59 operation: String,
61 },
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct CronJob {
70 pub id: String,
72 pub name: String,
74 pub schedule: String,
76 #[serde(default = "default_true")]
78 pub enabled: bool,
79 #[serde(default)]
81 pub description: Option<String>,
82 #[serde(default)]
84 pub last_execution: Option<DateTime<Utc>>,
85 #[serde(default)]
87 pub next_execution: Option<DateTime<Utc>>,
88 #[serde(default)]
90 pub execution_count: usize,
91 #[serde(default)]
93 pub action_type: String,
94 #[serde(default)]
96 pub action_metadata: serde_json::Value,
97}
98
99fn default_true() -> bool {
100 true
101}
102
103impl CronJob {
104 pub fn new(id: String, name: String, schedule: String) -> Self {
106 Self {
107 id,
108 name,
109 schedule,
110 enabled: true,
111 description: None,
112 last_execution: None,
113 next_execution: None,
114 execution_count: 0,
115 action_type: String::new(),
116 action_metadata: serde_json::Value::Null,
117 }
118 }
119
120 pub fn calculate_next_execution(&self, from: DateTime<Utc>) -> Option<DateTime<Utc>> {
122 if !self.enabled {
123 return None;
124 }
125
126 match CronSchedule::from_str(&self.schedule) {
127 Ok(schedule) => {
128 schedule.after(&from).next()
130 }
131 Err(e) => {
132 warn!("Invalid cron schedule '{}' for job '{}': {}", self.schedule, self.id, e);
133 None
134 }
135 }
136 }
137}
138
139pub struct CronScheduler {
141 clock: Arc<VirtualClock>,
143 jobs: Arc<RwLock<HashMap<String, CronJob>>>,
145 actions: Arc<RwLock<HashMap<String, Arc<CronJobAction>>>>,
147 response_scheduler: Option<Arc<super::ResponseScheduler>>,
149 mutation_rule_manager: Option<Arc<dyn std::any::Any + Send + Sync>>,
152}
153
154impl CronScheduler {
155 pub fn new(clock: Arc<VirtualClock>) -> Self {
157 Self {
158 clock,
159 jobs: Arc::new(RwLock::new(HashMap::new())),
160 actions: Arc::new(RwLock::new(HashMap::new())),
161 response_scheduler: None,
162 mutation_rule_manager: None,
163 }
164 }
165
166 pub fn with_response_scheduler(mut self, scheduler: Arc<super::ResponseScheduler>) -> Self {
168 self.response_scheduler = Some(scheduler);
169 self
170 }
171
172 pub fn with_mutation_rule_manager(mut self, manager: Arc<dyn std::any::Any + Send + Sync>) -> Self {
176 self.mutation_rule_manager = Some(manager);
177 self
178 }
179
180 pub fn new_with_global_clock() -> Self {
185 let clock = get_global_clock().unwrap_or_else(|| Arc::new(VirtualClock::new()));
186 Self::new(clock)
187 }
188
189 pub async fn add_job(&self, job: CronJob, action: CronJobAction) -> Result<(), String> {
191 CronSchedule::from_str(&job.schedule)
193 .map_err(|e| format!("Invalid cron expression '{}': {}", job.schedule, e))?;
194
195 let now = self.clock.now();
197 let next_execution = job.calculate_next_execution(now);
198
199 let mut job_with_next = job;
200 job_with_next.next_execution = next_execution;
201
202 let job_id = job_with_next.id.clone();
203
204 let mut jobs = self.jobs.write().await;
206 jobs.insert(job_id.clone(), job_with_next);
207
208 let mut actions = self.actions.write().await;
209 actions.insert(job_id.clone(), Arc::new(action));
210
211 info!("Added cron job '{}' with schedule '{}'", job_id, jobs[&job_id].schedule);
212 Ok(())
213 }
214
215 pub async fn remove_job(&self, job_id: &str) -> bool {
217 let mut jobs = self.jobs.write().await;
218 let mut actions = self.actions.write().await;
219
220 let removed = jobs.remove(job_id).is_some();
221 actions.remove(job_id);
222
223 if removed {
224 info!("Removed cron job '{}'", job_id);
225 }
226
227 removed
228 }
229
230 pub async fn get_job(&self, job_id: &str) -> Option<CronJob> {
232 let jobs = self.jobs.read().await;
233 jobs.get(job_id).cloned()
234 }
235
236 pub async fn list_jobs(&self) -> Vec<CronJob> {
238 let jobs = self.jobs.read().await;
239 jobs.values().cloned().collect()
240 }
241
242 pub async fn set_job_enabled(&self, job_id: &str, enabled: bool) -> Result<(), String> {
244 let mut jobs = self.jobs.write().await;
245
246 if let Some(job) = jobs.get_mut(job_id) {
247 job.enabled = enabled;
248
249 if enabled {
251 let now = self.clock.now();
252 job.next_execution = job.calculate_next_execution(now);
253 } else {
254 job.next_execution = None;
255 }
256
257 info!("Cron job '{}' {}", job_id, if enabled { "enabled" } else { "disabled" });
258 Ok(())
259 } else {
260 Err(format!("Cron job '{}' not found", job_id))
261 }
262 }
263
264 pub async fn check_and_execute(&self) -> Result<usize, String> {
269 let now = self.clock.now();
270 let mut executed = 0;
271
272 let mut jobs_to_execute = Vec::new();
274
275 {
276 let jobs = self.jobs.read().await;
277 for job in jobs.values() {
278 if !job.enabled {
279 continue;
280 }
281
282 if let Some(next) = job.next_execution {
283 if now >= next {
284 jobs_to_execute.push(job.id.clone());
285 }
286 }
287 }
288 }
289
290 for job_id in jobs_to_execute {
292 if let Err(e) = self.execute_job(&job_id).await {
293 warn!("Error executing cron job '{}': {}", job_id, e);
294 } else {
295 executed += 1;
296 }
297 }
298
299 Ok(executed)
300 }
301
302 async fn execute_job(&self, job_id: &str) -> Result<(), String> {
304 let now = self.clock.now();
305
306 let (job, action) = {
308 let jobs = self.jobs.read().await;
309 let actions = self.actions.read().await;
310
311 let job = jobs.get(job_id).ok_or_else(|| format!("Job '{}' not found", job_id))?;
312 let action = actions
313 .get(job_id)
314 .ok_or_else(|| format!("Action for job '{}' not found", job_id))?;
315
316 (job.clone(), Arc::clone(action))
317 };
318
319 match action.as_ref() {
321 CronJobAction::Callback(callback) => {
322 debug!("Executing callback for cron job '{}'", job_id);
323 callback(now)?;
324 }
325 CronJobAction::ScheduledResponse {
326 body,
327 status,
328 headers,
329 } => {
330 debug!("Scheduled response for cron job '{}'", job_id);
331
332 if let Some(ref scheduler) = self.response_scheduler {
334 let scheduled_response = super::ScheduledResponse {
336 id: format!("cron-{}-{}", job_id, Uuid::new_v4()),
337 trigger_time: now,
338 body: body.clone(),
339 status: *status,
340 headers: headers.clone(),
341 name: Some(format!("Cron job: {}", job_id)),
342 repeat: None,
343 };
344
345 match scheduler.schedule(scheduled_response) {
346 Ok(response_id) => {
347 info!("Cron job '{}' scheduled response: {}", job_id, response_id);
348 }
349 Err(e) => {
350 warn!("Failed to schedule response for cron job '{}': {}", job_id, e);
351 }
352 }
353 } else {
354 warn!("Cron job '{}' triggered scheduled response but ResponseScheduler not configured", job_id);
355 info!("Cron job '{}' triggered scheduled response: {} (ResponseScheduler not available)", job_id, status);
356 }
357 }
358 CronJobAction::DataMutation { entity, operation } => {
359 debug!("Data mutation for cron job '{}': {} on {}", job_id, operation, entity);
360
361 if self.mutation_rule_manager.is_some() {
370 info!("Cron job '{}' triggered data mutation: {} on {} (MutationRuleManager available but execution requires database and registry)", job_id, operation, entity);
371 } else {
372 warn!("Cron job '{}' triggered data mutation but MutationRuleManager not configured", job_id);
373 info!("Cron job '{}' triggered data mutation: {} on {} (MutationRuleManager not available)", job_id, operation, entity);
374 }
375 }
376 }
377
378 {
380 let mut jobs = self.jobs.write().await;
381 if let Some(job) = jobs.get_mut(job_id) {
382 job.last_execution = Some(now);
383 job.execution_count += 1;
384
385 job.next_execution = job.calculate_next_execution(now);
387 }
388 }
389
390 info!("Executed cron job '{}'", job_id);
391 Ok(())
392 }
393
394 pub fn clock(&self) -> Arc<VirtualClock> {
396 self.clock.clone()
397 }
398}
399
400fn parse_cron_schedule(schedule: &str) -> Result<CronSchedule, String> {
402 CronSchedule::from_str(schedule).map_err(|e| format!("Invalid cron expression: {}", e))
403}
404
405pub use cron::Schedule;
407
408use std::str::FromStr;
410
411#[cfg(test)]
412mod tests {
413 use super::*;
414
415 #[test]
416 fn test_cron_job_creation() {
417 let job =
418 CronJob::new("test-1".to_string(), "Test Job".to_string(), "0 3 * * *".to_string());
419
420 assert_eq!(job.id, "test-1");
421 assert_eq!(job.name, "Test Job");
422 assert_eq!(job.schedule, "0 3 * * *");
423 assert!(job.enabled);
424 }
425
426 #[test]
427 fn test_cron_schedule_parsing() {
428 let schedule = CronSchedule::from_str("0 3 * * *").unwrap();
429 let now = Utc::now();
430 let next = schedule.after(&now).next();
431 assert!(next.is_some());
432 }
433
434 #[tokio::test]
435 async fn test_cron_scheduler_add_job() {
436 let clock = Arc::new(VirtualClock::new());
437 clock.enable_and_set(Utc::now());
438 let scheduler = CronScheduler::new(clock);
439
440 let job =
441 CronJob::new("test-1".to_string(), "Test Job".to_string(), "0 3 * * *".to_string());
442
443 let action = CronJobAction::Callback(Box::new(|_| {
444 println!("Test callback");
445 Ok(())
446 }));
447
448 scheduler.add_job(job, action).await.unwrap();
449
450 let jobs = scheduler.list_jobs().await;
451 assert_eq!(jobs.len(), 1);
452 assert_eq!(jobs[0].id, "test-1");
453 }
454}