mockforge_chaos/
scenario_scheduler.rs

1//! Time-based scenario scheduling
2
3use crate::scenarios::ChaosScenario;
4use chrono::{DateTime, Duration, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::mpsc;
10use tokio::time::interval;
11use tracing::{debug, info, warn};
12
13/// Schedule type
14#[derive(Debug, Clone, Serialize, Deserialize)]
15#[serde(tag = "type")]
16pub enum ScheduleType {
17    /// Run once at a specific time
18    Once { at: DateTime<Utc> },
19    /// Run after a delay
20    Delayed { delay_seconds: u64 },
21    /// Run periodically with interval
22    Periodic {
23        interval_seconds: u64,
24        /// Maximum executions (0 = infinite)
25        max_executions: usize,
26    },
27    /// Run on a cron-like schedule (simplified)
28    Cron {
29        /// Hour (0-23)
30        hour: Option<u8>,
31        /// Minute (0-59)
32        minute: Option<u8>,
33        /// Day of week (0-6, 0 = Sunday)
34        day_of_week: Option<u8>,
35        /// Maximum executions (0 = infinite)
36        max_executions: usize,
37    },
38}
39
40/// Scheduled scenario
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct ScheduledScenario {
43    /// Unique ID
44    pub id: String,
45    /// Scenario to execute
46    pub scenario: ChaosScenario,
47    /// Schedule configuration
48    pub schedule: ScheduleType,
49    /// Is enabled
50    pub enabled: bool,
51    /// Number of times executed
52    pub execution_count: usize,
53    /// Last execution time
54    pub last_executed: Option<DateTime<Utc>>,
55    /// Next scheduled execution
56    pub next_execution: Option<DateTime<Utc>>,
57}
58
59impl ScheduledScenario {
60    /// Create a new scheduled scenario
61    pub fn new(id: impl Into<String>, scenario: ChaosScenario, schedule: ScheduleType) -> Self {
62        let mut scheduled = Self {
63            id: id.into(),
64            scenario,
65            schedule,
66            enabled: true,
67            execution_count: 0,
68            last_executed: None,
69            next_execution: None,
70        };
71
72        scheduled.calculate_next_execution();
73        scheduled
74    }
75
76    /// Calculate next execution time
77    pub fn calculate_next_execution(&mut self) {
78        let now = Utc::now();
79
80        self.next_execution = match &self.schedule {
81            ScheduleType::Once { at } => {
82                if *at > now && self.execution_count == 0 {
83                    Some(*at)
84                } else {
85                    None
86                }
87            }
88            ScheduleType::Delayed { delay_seconds } => {
89                if self.execution_count == 0 {
90                    Some(now + Duration::seconds(*delay_seconds as i64))
91                } else {
92                    None
93                }
94            }
95            ScheduleType::Periodic {
96                interval_seconds,
97                max_executions,
98            } => {
99                if *max_executions == 0 || self.execution_count < *max_executions {
100                    Some(now + Duration::seconds(*interval_seconds as i64))
101                } else {
102                    None
103                }
104            }
105            ScheduleType::Cron {
106                hour: _,
107                minute: _,
108                day_of_week: _,
109                max_executions,
110            } => {
111                if *max_executions > 0 && self.execution_count >= *max_executions {
112                    None
113                } else {
114                    // Simplified cron calculation - just add 1 hour for next execution
115                    // In a production system, you'd use a cron library
116                    let next = now + Duration::hours(1);
117
118                    // This is a simplified implementation
119                    // For full cron support, integrate with a cron parsing library like `cron`
120                    Some(next)
121                }
122            }
123        };
124    }
125
126    /// Check if should execute now
127    pub fn should_execute(&self) -> bool {
128        if !self.enabled {
129            return false;
130        }
131
132        if let Some(next) = self.next_execution {
133            Utc::now() >= next
134        } else {
135            false
136        }
137    }
138
139    /// Mark as executed
140    pub fn mark_executed(&mut self) {
141        self.execution_count += 1;
142        self.last_executed = Some(Utc::now());
143        self.calculate_next_execution();
144    }
145}
146
147/// Scenario scheduler
148pub struct ScenarioScheduler {
149    /// Scheduled scenarios
150    schedules: Arc<RwLock<HashMap<String, ScheduledScenario>>>,
151    /// Execution callback channel
152    execution_tx: Arc<RwLock<Option<mpsc::Sender<ScheduledScenario>>>>,
153    /// Scheduler task handle
154    task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
155}
156
157impl ScenarioScheduler {
158    /// Create a new scheduler
159    pub fn new() -> Self {
160        Self {
161            schedules: Arc::new(RwLock::new(HashMap::new())),
162            execution_tx: Arc::new(RwLock::new(None)),
163            task_handle: Arc::new(RwLock::new(None)),
164        }
165    }
166
167    /// Add a scheduled scenario
168    pub fn add_schedule(&self, scheduled: ScheduledScenario) {
169        let id = scheduled.id.clone();
170        let mut schedules = self.schedules.write();
171        schedules.insert(id.clone(), scheduled);
172        info!("Added scheduled scenario: {}", id);
173    }
174
175    /// Remove a scheduled scenario
176    pub fn remove_schedule(&self, id: &str) -> Option<ScheduledScenario> {
177        let mut schedules = self.schedules.write();
178        let removed = schedules.remove(id);
179        if removed.is_some() {
180            info!("Removed scheduled scenario: {}", id);
181        }
182        removed
183    }
184
185    /// Get a scheduled scenario
186    pub fn get_schedule(&self, id: &str) -> Option<ScheduledScenario> {
187        let schedules = self.schedules.read();
188        schedules.get(id).cloned()
189    }
190
191    /// Get all scheduled scenarios
192    pub fn get_all_schedules(&self) -> Vec<ScheduledScenario> {
193        let schedules = self.schedules.read();
194        schedules.values().cloned().collect()
195    }
196
197    /// Enable a scheduled scenario
198    pub fn enable_schedule(&self, id: &str) -> Result<(), String> {
199        let mut schedules = self.schedules.write();
200        if let Some(scheduled) = schedules.get_mut(id) {
201            scheduled.enabled = true;
202            scheduled.calculate_next_execution();
203            info!("Enabled scheduled scenario: {}", id);
204            Ok(())
205        } else {
206            Err(format!("Schedule '{}' not found", id))
207        }
208    }
209
210    /// Disable a scheduled scenario
211    pub fn disable_schedule(&self, id: &str) -> Result<(), String> {
212        let mut schedules = self.schedules.write();
213        if let Some(scheduled) = schedules.get_mut(id) {
214            scheduled.enabled = false;
215            info!("Disabled scheduled scenario: {}", id);
216            Ok(())
217        } else {
218            Err(format!("Schedule '{}' not found", id))
219        }
220    }
221
222    /// Start the scheduler with a callback
223    pub async fn start<F>(&self, callback: F)
224    where
225        F: Fn(ScheduledScenario) + Send + 'static,
226    {
227        // Check if already running
228        {
229            let task_handle = self.task_handle.read();
230            if task_handle.is_some() {
231                warn!("Scheduler already running");
232                return;
233            }
234        }
235
236        info!("Starting scenario scheduler");
237
238        let (tx, rx) = mpsc::channel::<ScheduledScenario>(100);
239
240        // Store execution channel
241        {
242            let mut execution_tx = self.execution_tx.write();
243            *execution_tx = Some(tx);
244        }
245
246        // Spawn scheduler task
247        let schedules = Arc::clone(&self.schedules);
248        let handle = tokio::spawn(async move {
249            Self::scheduler_task(schedules, rx, callback).await;
250        });
251
252        // Store task handle
253        {
254            let mut task_handle = self.task_handle.write();
255            *task_handle = Some(handle);
256        }
257    }
258
259    /// Scheduler task (runs in background)
260    async fn scheduler_task<F>(
261        schedules: Arc<RwLock<HashMap<String, ScheduledScenario>>>,
262        mut rx: mpsc::Receiver<ScheduledScenario>,
263        callback: F,
264    ) where
265        F: Fn(ScheduledScenario),
266    {
267        let mut interval = interval(std::time::Duration::from_secs(1));
268
269        loop {
270            tokio::select! {
271                _ = interval.tick() => {
272                    // Check for scenarios to execute
273                    let mut to_execute = Vec::new();
274
275                    {
276                        let mut schedules_guard = schedules.write();
277
278                        for (id, scheduled) in schedules_guard.iter_mut() {
279                            if scheduled.should_execute() {
280                                debug!("Triggering scheduled scenario: {}", id);
281                                to_execute.push(scheduled.clone());
282                                scheduled.mark_executed();
283                            }
284                        }
285                    }
286
287                    // Execute scenarios
288                    for scheduled in to_execute {
289                        info!("Executing scheduled scenario: {}", scheduled.id);
290                        callback(scheduled);
291                    }
292                }
293
294                Some(scheduled) = rx.recv() => {
295                    // Manual execution request
296                    info!("Manual execution of scheduled scenario: {}", scheduled.id);
297                    callback(scheduled);
298                }
299
300                else => break,
301            }
302        }
303
304        info!("Scheduler task stopped");
305    }
306
307    /// Stop the scheduler
308    pub async fn stop(&self) {
309        info!("Stopping scenario scheduler");
310
311        // Clear execution channel
312        {
313            let mut execution_tx = self.execution_tx.write();
314            *execution_tx = None;
315        }
316
317        // Abort task
318        let mut task_handle = self.task_handle.write();
319        if let Some(handle) = task_handle.take() {
320            handle.abort();
321        }
322    }
323
324    /// Manually trigger a scheduled scenario
325    pub async fn trigger_now(&self, id: &str) -> Result<(), String> {
326        let scheduled = {
327            let schedules = self.schedules.read();
328            schedules.get(id).cloned()
329        };
330
331        if let Some(scheduled) = scheduled {
332            let tx = self.execution_tx.read().as_ref().cloned();
333            if let Some(tx) = tx {
334                tx.send(scheduled).await.map_err(|e| format!("Failed to trigger: {}", e))?;
335                Ok(())
336            } else {
337                Err("Scheduler not started".to_string())
338            }
339        } else {
340            Err(format!("Schedule '{}' not found", id))
341        }
342    }
343
344    /// Get next scheduled execution
345    pub fn get_next_execution(&self) -> Option<(String, DateTime<Utc>)> {
346        let schedules = self.schedules.read();
347        schedules
348            .iter()
349            .filter_map(|(id, s)| s.next_execution.map(|t| (id.clone(), t)))
350            .min_by_key(|(_, t)| *t)
351    }
352}
353
354impl Default for ScenarioScheduler {
355    fn default() -> Self {
356        Self::new()
357    }
358}
359
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ChaosConfig;

    /// Build a throwaway scenario with the default chaos configuration.
    fn sample_scenario() -> ChaosScenario {
        ChaosScenario::new("test", ChaosConfig::default())
    }

    /// Build a scheduled entry named `sched1` that fires after a ten second delay.
    fn delayed_entry() -> ScheduledScenario {
        ScheduledScenario::new(
            "sched1",
            sample_scenario(),
            ScheduleType::Delayed { delay_seconds: 10 },
        )
    }

    // A one-shot schedule in the future starts enabled, unexecuted and pending.
    #[test]
    fn test_scheduled_scenario_once() {
        let at = Utc::now() + Duration::hours(1);
        let entry = ScheduledScenario::new("sched1", sample_scenario(), ScheduleType::Once { at });

        assert_eq!(entry.id, "sched1");
        assert!(entry.enabled);
        assert_eq!(entry.execution_count, 0);
        assert!(entry.next_execution.is_some());
    }

    // A periodic schedule below its execution cap has a pending run.
    #[test]
    fn test_scheduled_scenario_periodic() {
        let entry = ScheduledScenario::new(
            "sched1",
            sample_scenario(),
            ScheduleType::Periodic {
                interval_seconds: 60,
                max_executions: 10,
            },
        );

        assert!(entry.next_execution.is_some());
    }

    // Entries can be looked up after being added and are gone after removal.
    #[test]
    fn test_scheduler_add_remove() {
        let scheduler = ScenarioScheduler::new();

        scheduler.add_schedule(delayed_entry());
        assert!(scheduler.get_schedule("sched1").is_some());

        let removed = scheduler.remove_schedule("sched1");
        assert!(removed.is_some());
        assert!(scheduler.get_schedule("sched1").is_none());
    }

    // Disabling and re-enabling flips the `enabled` flag on the stored entry.
    #[test]
    fn test_enable_disable() {
        let scheduler = ScenarioScheduler::new();
        scheduler.add_schedule(delayed_entry());

        scheduler.disable_schedule("sched1").unwrap();
        let disabled = scheduler.get_schedule("sched1").unwrap();
        assert!(!disabled.enabled);

        scheduler.enable_schedule("sched1").unwrap();
        let enabled = scheduler.get_schedule("sched1").unwrap();
        assert!(enabled.enabled);
    }
}