mockforge_core/time_travel/
cron.rs1use chrono::{DateTime, Utc};
32use cron::Schedule as CronSchedule;
33use serde::{Deserialize, Serialize};
34use std::collections::HashMap;
35use std::sync::Arc;
36use tokio::sync::RwLock;
37use tracing::{debug, info, warn};
38
39use super::{get_global_clock, VirtualClock};
40
41pub enum CronJobAction {
43 Callback(Box<dyn Fn(DateTime<Utc>) -> Result<(), String> + Send + Sync>),
45 ScheduledResponse {
47 body: serde_json::Value,
49 status: u16,
51 headers: HashMap<String, String>,
53 },
54 DataMutation {
56 entity: String,
58 operation: String,
60 },
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct CronJob {
69 pub id: String,
71 pub name: String,
73 pub schedule: String,
75 #[serde(default = "default_true")]
77 pub enabled: bool,
78 #[serde(default)]
80 pub description: Option<String>,
81 #[serde(default)]
83 pub last_execution: Option<DateTime<Utc>>,
84 #[serde(default)]
86 pub next_execution: Option<DateTime<Utc>>,
87 #[serde(default)]
89 pub execution_count: usize,
90 #[serde(default)]
92 pub action_type: String,
93 #[serde(default)]
95 pub action_metadata: serde_json::Value,
96}
97
98fn default_true() -> bool {
99 true
100}
101
102impl CronJob {
103 pub fn new(id: String, name: String, schedule: String) -> Self {
105 Self {
106 id,
107 name,
108 schedule,
109 enabled: true,
110 description: None,
111 last_execution: None,
112 next_execution: None,
113 execution_count: 0,
114 action_type: String::new(),
115 action_metadata: serde_json::Value::Null,
116 }
117 }
118
119 pub fn calculate_next_execution(&self, from: DateTime<Utc>) -> Option<DateTime<Utc>> {
121 if !self.enabled {
122 return None;
123 }
124
125 match CronSchedule::from_str(&self.schedule) {
126 Ok(schedule) => {
127 schedule.after(&from).next()
129 }
130 Err(e) => {
131 warn!("Invalid cron schedule '{}' for job '{}': {}", self.schedule, self.id, e);
132 None
133 }
134 }
135 }
136}
137
138pub struct CronScheduler {
140 clock: Arc<VirtualClock>,
142 jobs: Arc<RwLock<HashMap<String, CronJob>>>,
144 actions: Arc<RwLock<HashMap<String, Arc<CronJobAction>>>>,
146}
147
148impl CronScheduler {
149 pub fn new(clock: Arc<VirtualClock>) -> Self {
151 Self {
152 clock,
153 jobs: Arc::new(RwLock::new(HashMap::new())),
154 actions: Arc::new(RwLock::new(HashMap::new())),
155 }
156 }
157
158 pub fn new_with_global_clock() -> Self {
163 let clock = get_global_clock().unwrap_or_else(|| Arc::new(VirtualClock::new()));
164 Self::new(clock)
165 }
166
167 pub async fn add_job(&self, job: CronJob, action: CronJobAction) -> Result<(), String> {
169 CronSchedule::from_str(&job.schedule)
171 .map_err(|e| format!("Invalid cron expression '{}': {}", job.schedule, e))?;
172
173 let now = self.clock.now();
175 let next_execution = job.calculate_next_execution(now);
176
177 let mut job_with_next = job;
178 job_with_next.next_execution = next_execution;
179
180 let job_id = job_with_next.id.clone();
181
182 let mut jobs = self.jobs.write().await;
184 jobs.insert(job_id.clone(), job_with_next);
185
186 let mut actions = self.actions.write().await;
187 actions.insert(job_id.clone(), Arc::new(action));
188
189 info!("Added cron job '{}' with schedule '{}'", job_id, jobs[&job_id].schedule);
190 Ok(())
191 }
192
193 pub async fn remove_job(&self, job_id: &str) -> bool {
195 let mut jobs = self.jobs.write().await;
196 let mut actions = self.actions.write().await;
197
198 let removed = jobs.remove(job_id).is_some();
199 actions.remove(job_id);
200
201 if removed {
202 info!("Removed cron job '{}'", job_id);
203 }
204
205 removed
206 }
207
208 pub async fn get_job(&self, job_id: &str) -> Option<CronJob> {
210 let jobs = self.jobs.read().await;
211 jobs.get(job_id).cloned()
212 }
213
214 pub async fn list_jobs(&self) -> Vec<CronJob> {
216 let jobs = self.jobs.read().await;
217 jobs.values().cloned().collect()
218 }
219
220 pub async fn set_job_enabled(&self, job_id: &str, enabled: bool) -> Result<(), String> {
222 let mut jobs = self.jobs.write().await;
223
224 if let Some(job) = jobs.get_mut(job_id) {
225 job.enabled = enabled;
226
227 if enabled {
229 let now = self.clock.now();
230 job.next_execution = job.calculate_next_execution(now);
231 } else {
232 job.next_execution = None;
233 }
234
235 info!("Cron job '{}' {}", job_id, if enabled { "enabled" } else { "disabled" });
236 Ok(())
237 } else {
238 Err(format!("Cron job '{}' not found", job_id))
239 }
240 }
241
242 pub async fn check_and_execute(&self) -> Result<usize, String> {
247 let now = self.clock.now();
248 let mut executed = 0;
249
250 let mut jobs_to_execute = Vec::new();
252
253 {
254 let jobs = self.jobs.read().await;
255 for job in jobs.values() {
256 if !job.enabled {
257 continue;
258 }
259
260 if let Some(next) = job.next_execution {
261 if now >= next {
262 jobs_to_execute.push(job.id.clone());
263 }
264 }
265 }
266 }
267
268 for job_id in jobs_to_execute {
270 if let Err(e) = self.execute_job(&job_id).await {
271 warn!("Error executing cron job '{}': {}", job_id, e);
272 } else {
273 executed += 1;
274 }
275 }
276
277 Ok(executed)
278 }
279
280 async fn execute_job(&self, job_id: &str) -> Result<(), String> {
282 let now = self.clock.now();
283
284 let (job, action) = {
286 let jobs = self.jobs.read().await;
287 let actions = self.actions.read().await;
288
289 let job = jobs.get(job_id).ok_or_else(|| format!("Job '{}' not found", job_id))?;
290 let action = actions
291 .get(job_id)
292 .ok_or_else(|| format!("Action for job '{}' not found", job_id))?;
293
294 (job.clone(), Arc::clone(action))
295 };
296
297 match action.as_ref() {
299 CronJobAction::Callback(callback) => {
300 debug!("Executing callback for cron job '{}'", job_id);
301 callback(now)?;
302 }
303 CronJobAction::ScheduledResponse {
304 body,
305 status,
306 headers,
307 } => {
308 debug!("Scheduled response for cron job '{}'", job_id);
309 info!("Cron job '{}' triggered scheduled response: {}", job_id, status);
312 }
313 CronJobAction::DataMutation { entity, operation } => {
314 debug!("Data mutation for cron job '{}': {} on {}", job_id, operation, entity);
315 info!("Cron job '{}' triggered data mutation: {} on {}", job_id, operation, entity);
318 }
319 }
320
321 {
323 let mut jobs = self.jobs.write().await;
324 if let Some(job) = jobs.get_mut(job_id) {
325 job.last_execution = Some(now);
326 job.execution_count += 1;
327
328 job.next_execution = job.calculate_next_execution(now);
330 }
331 }
332
333 info!("Executed cron job '{}'", job_id);
334 Ok(())
335 }
336
337 pub fn clock(&self) -> Arc<VirtualClock> {
339 self.clock.clone()
340 }
341}
342
343fn parse_cron_schedule(schedule: &str) -> Result<CronSchedule, String> {
345 CronSchedule::from_str(schedule).map_err(|e| format!("Invalid cron expression: {}", e))
346}
347
348pub use cron::Schedule;
350
351use std::str::FromStr;
353
354#[cfg(test)]
355mod tests {
356 use super::*;
357
358 #[test]
359 fn test_cron_job_creation() {
360 let job =
361 CronJob::new("test-1".to_string(), "Test Job".to_string(), "0 3 * * *".to_string());
362
363 assert_eq!(job.id, "test-1");
364 assert_eq!(job.name, "Test Job");
365 assert_eq!(job.schedule, "0 3 * * *");
366 assert!(job.enabled);
367 }
368
369 #[test]
370 fn test_cron_schedule_parsing() {
371 let schedule = CronSchedule::from_str("0 3 * * *").unwrap();
372 let now = Utc::now();
373 let next = schedule.after(&now).next();
374 assert!(next.is_some());
375 }
376
377 #[tokio::test]
378 async fn test_cron_scheduler_add_job() {
379 let clock = Arc::new(VirtualClock::new());
380 clock.enable_and_set(Utc::now());
381 let scheduler = CronScheduler::new(clock);
382
383 let job =
384 CronJob::new("test-1".to_string(), "Test Job".to_string(), "0 3 * * *".to_string());
385
386 let action = CronJobAction::Callback(Box::new(|_| {
387 println!("Test callback");
388 Ok(())
389 }));
390
391 scheduler.add_job(job, action).await.unwrap();
392
393 let jobs = scheduler.list_jobs().await;
394 assert_eq!(jobs.len(), 1);
395 assert_eq!(jobs[0].id, "test-1");
396 }
397}