mockforge_core/time_travel/
cron.rs1use chrono::{DateTime, Utc};
32use cron::Schedule as CronSchedule;
33use serde::{Deserialize, Serialize};
34use std::collections::HashMap;
35use std::sync::Arc;
36use tokio::sync::RwLock;
37use tracing::{debug, info, warn};
38use uuid::Uuid;
39
40use super::{get_global_clock, VirtualClock};
41
42pub enum CronJobAction {
44 Callback(Box<dyn Fn(DateTime<Utc>) -> Result<(), String> + Send + Sync>),
46 ScheduledResponse {
48 body: serde_json::Value,
50 status: u16,
52 headers: HashMap<String, String>,
54 },
55 DataMutation {
57 entity: String,
59 operation: String,
61 },
62}
63
64#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct CronJob {
70 pub id: String,
72 pub name: String,
74 pub schedule: String,
76 #[serde(default = "default_true")]
78 pub enabled: bool,
79 #[serde(default)]
81 pub description: Option<String>,
82 #[serde(default)]
84 pub last_execution: Option<DateTime<Utc>>,
85 #[serde(default)]
87 pub next_execution: Option<DateTime<Utc>>,
88 #[serde(default)]
90 pub execution_count: usize,
91 #[serde(default)]
93 pub action_type: String,
94 #[serde(default)]
96 pub action_metadata: serde_json::Value,
97}
98
99fn default_true() -> bool {
100 true
101}
102
103impl CronJob {
104 pub fn new(id: String, name: String, schedule: String) -> Self {
106 Self {
107 id,
108 name,
109 schedule,
110 enabled: true,
111 description: None,
112 last_execution: None,
113 next_execution: None,
114 execution_count: 0,
115 action_type: String::new(),
116 action_metadata: serde_json::Value::Null,
117 }
118 }
119
120 pub fn calculate_next_execution(&self, from: DateTime<Utc>) -> Option<DateTime<Utc>> {
122 if !self.enabled {
123 return None;
124 }
125
126 let trimmed_schedule = self.schedule.trim();
128 match CronSchedule::from_str(trimmed_schedule) {
129 Ok(schedule) => {
130 schedule.after(&from).next()
132 }
133 Err(e) => {
134 warn!("Invalid cron schedule '{}' for job '{}': {}", trimmed_schedule, self.id, e);
135 None
136 }
137 }
138 }
139}
140
141pub struct CronScheduler {
143 clock: Arc<VirtualClock>,
145 jobs: Arc<RwLock<HashMap<String, CronJob>>>,
147 actions: Arc<RwLock<HashMap<String, Arc<CronJobAction>>>>,
149 response_scheduler: Option<Arc<super::ResponseScheduler>>,
151 mutation_rule_manager: Option<Arc<dyn std::any::Any + Send + Sync>>,
154}
155
156impl CronScheduler {
157 pub fn new(clock: Arc<VirtualClock>) -> Self {
159 Self {
160 clock,
161 jobs: Arc::new(RwLock::new(HashMap::new())),
162 actions: Arc::new(RwLock::new(HashMap::new())),
163 response_scheduler: None,
164 mutation_rule_manager: None,
165 }
166 }
167
168 pub fn with_response_scheduler(mut self, scheduler: Arc<super::ResponseScheduler>) -> Self {
170 self.response_scheduler = Some(scheduler);
171 self
172 }
173
174 pub fn with_mutation_rule_manager(
178 mut self,
179 manager: Arc<dyn std::any::Any + Send + Sync>,
180 ) -> Self {
181 self.mutation_rule_manager = Some(manager);
182 self
183 }
184
185 pub fn new_with_global_clock() -> Self {
190 let clock = get_global_clock().unwrap_or_else(|| Arc::new(VirtualClock::new()));
191 Self::new(clock)
192 }
193
194 pub async fn add_job(&self, job: CronJob, action: CronJobAction) -> Result<(), String> {
196 let now = self.clock.now();
201 let next_execution = job.calculate_next_execution(now);
202
203 if next_execution.is_none() {
206 warn!("Warning: Unable to calculate next execution for cron job '{}' with schedule '{}'. The job will be added but may not execute.", job.id, job.schedule);
207 }
208
209 let mut job_with_next = job;
210 job_with_next.next_execution = next_execution;
211
212 let job_id = job_with_next.id.clone();
213
214 let mut jobs = self.jobs.write().await;
216 jobs.insert(job_id.clone(), job_with_next);
217
218 let mut actions = self.actions.write().await;
219 actions.insert(job_id.clone(), Arc::new(action));
220
221 info!("Added cron job '{}' with schedule '{}'", job_id, jobs[&job_id].schedule);
222 Ok(())
223 }
224
225 pub async fn remove_job(&self, job_id: &str) -> bool {
227 let mut jobs = self.jobs.write().await;
228 let mut actions = self.actions.write().await;
229
230 let removed = jobs.remove(job_id).is_some();
231 actions.remove(job_id);
232
233 if removed {
234 info!("Removed cron job '{}'", job_id);
235 }
236
237 removed
238 }
239
240 pub async fn get_job(&self, job_id: &str) -> Option<CronJob> {
242 let jobs = self.jobs.read().await;
243 jobs.get(job_id).cloned()
244 }
245
246 pub async fn list_jobs(&self) -> Vec<CronJob> {
248 let jobs = self.jobs.read().await;
249 jobs.values().cloned().collect()
250 }
251
252 pub async fn set_job_enabled(&self, job_id: &str, enabled: bool) -> Result<(), String> {
254 let mut jobs = self.jobs.write().await;
255
256 if let Some(job) = jobs.get_mut(job_id) {
257 job.enabled = enabled;
258
259 if enabled {
261 let now = self.clock.now();
262 job.next_execution = job.calculate_next_execution(now);
263 } else {
264 job.next_execution = None;
265 }
266
267 info!("Cron job '{}' {}", job_id, if enabled { "enabled" } else { "disabled" });
268 Ok(())
269 } else {
270 Err(format!("Cron job '{}' not found", job_id))
271 }
272 }
273
274 pub async fn check_and_execute(&self) -> Result<usize, String> {
279 let now = self.clock.now();
280 let mut executed = 0;
281
282 let mut jobs_to_execute = Vec::new();
284
285 {
286 let jobs = self.jobs.read().await;
287 for job in jobs.values() {
288 if !job.enabled {
289 continue;
290 }
291
292 if let Some(next) = job.next_execution {
293 if now >= next {
294 jobs_to_execute.push(job.id.clone());
295 }
296 }
297 }
298 }
299
300 for job_id in jobs_to_execute {
302 if let Err(e) = self.execute_job(&job_id).await {
303 warn!("Error executing cron job '{}': {}", job_id, e);
304 } else {
305 executed += 1;
306 }
307 }
308
309 Ok(executed)
310 }
311
312 async fn execute_job(&self, job_id: &str) -> Result<(), String> {
314 let now = self.clock.now();
315
316 let (job, action) = {
318 let jobs = self.jobs.read().await;
319 let actions = self.actions.read().await;
320
321 let job = jobs.get(job_id).ok_or_else(|| format!("Job '{}' not found", job_id))?;
322 let action = actions
323 .get(job_id)
324 .ok_or_else(|| format!("Action for job '{}' not found", job_id))?;
325
326 (job.clone(), Arc::clone(action))
327 };
328
329 match action.as_ref() {
331 CronJobAction::Callback(callback) => {
332 debug!("Executing callback for cron job '{}'", job_id);
333 callback(now)?;
334 }
335 CronJobAction::ScheduledResponse {
336 body,
337 status,
338 headers,
339 } => {
340 debug!("Scheduled response for cron job '{}'", job_id);
341
342 if let Some(ref scheduler) = self.response_scheduler {
344 let scheduled_response = super::ScheduledResponse {
346 id: format!("cron-{}-{}", job_id, Uuid::new_v4()),
347 trigger_time: now,
348 body: body.clone(),
349 status: *status,
350 headers: headers.clone(),
351 name: Some(format!("Cron job: {}", job_id)),
352 repeat: None,
353 };
354
355 match scheduler.schedule(scheduled_response) {
356 Ok(response_id) => {
357 info!("Cron job '{}' scheduled response: {}", job_id, response_id);
358 }
359 Err(e) => {
360 warn!("Failed to schedule response for cron job '{}': {}", job_id, e);
361 }
362 }
363 } else {
364 warn!("Cron job '{}' triggered scheduled response but ResponseScheduler not configured", job_id);
365 info!("Cron job '{}' triggered scheduled response: {} (ResponseScheduler not available)", job_id, status);
366 }
367 }
368 CronJobAction::DataMutation { entity, operation } => {
369 debug!("Data mutation for cron job '{}': {} on {}", job_id, operation, entity);
370
371 if self.mutation_rule_manager.is_some() {
380 info!("Cron job '{}' triggered data mutation: {} on {} (MutationRuleManager available but execution requires database and registry)", job_id, operation, entity);
381 } else {
382 warn!("Cron job '{}' triggered data mutation but MutationRuleManager not configured", job_id);
383 info!("Cron job '{}' triggered data mutation: {} on {} (MutationRuleManager not available)", job_id, operation, entity);
384 }
385 }
386 }
387
388 {
390 let mut jobs = self.jobs.write().await;
391 if let Some(job) = jobs.get_mut(job_id) {
392 job.last_execution = Some(now);
393 job.execution_count += 1;
394
395 job.next_execution = job.calculate_next_execution(now);
397 }
398 }
399
400 info!("Executed cron job '{}'", job_id);
401 Ok(())
402 }
403
404 pub fn clock(&self) -> Arc<VirtualClock> {
406 self.clock.clone()
407 }
408}
409
410pub(crate) fn parse_cron_schedule(schedule: &str) -> Result<CronSchedule, String> {
412 let trimmed = schedule.trim();
414 CronSchedule::from_str(trimmed).map_err(|e| format!("Invalid cron expression: {}", e))
415}
416
417pub use cron::Schedule;
419
420use std::str::FromStr;
422
423#[cfg(test)]
424mod tests {
425 use super::*;
426
427 #[test]
428 fn test_cron_job_creation() {
429 let job =
430 CronJob::new("test-1".to_string(), "Test Job".to_string(), "0 3 * * *".to_string());
431
432 assert_eq!(job.id, "test-1");
433 assert_eq!(job.name, "Test Job");
434 assert_eq!(job.schedule, "0 3 * * *");
435 assert!(job.enabled);
436 }
437
438 #[test]
439 fn test_cron_schedule_parsing() {
440 let job = CronJob::new("test".to_string(), "Test".to_string(), "0 3 * * *".to_string());
445 assert_eq!(job.schedule, "0 3 * * *");
446 assert!(job.enabled);
447 }
450
451 #[tokio::test]
452 async fn test_cron_scheduler_add_job() {
453 let clock = Arc::new(VirtualClock::new());
454 clock.enable_and_set(Utc::now());
455 let scheduler = CronScheduler::new(clock);
456
457 let job =
458 CronJob::new("test-1".to_string(), "Test Job".to_string(), "0 3 * * *".to_string());
459
460 let action = CronJobAction::Callback(Box::new(|_| {
461 println!("Test callback");
462 Ok(())
463 }));
464
465 scheduler.add_job(job, action).await.unwrap();
466
467 let jobs = scheduler.list_jobs().await;
468 assert_eq!(jobs.len(), 1);
469 assert_eq!(jobs[0].id, "test-1");
470 }
471}