1use crate::scenarios::ChaosScenario;
4use chrono::{DateTime, Duration, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::sync::Arc;
9use tokio::sync::mpsc;
10use tokio::time::interval;
11use tracing::{debug, info, warn};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15#[serde(tag = "type")]
16pub enum ScheduleType {
17 Once { at: DateTime<Utc> },
19 Delayed { delay_seconds: u64 },
21 Periodic {
23 interval_seconds: u64,
24 max_executions: usize,
26 },
27 Cron {
29 hour: Option<u8>,
31 minute: Option<u8>,
33 day_of_week: Option<u8>,
35 max_executions: usize,
37 },
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct ScheduledScenario {
43 pub id: String,
45 pub scenario: ChaosScenario,
47 pub schedule: ScheduleType,
49 pub enabled: bool,
51 pub execution_count: usize,
53 pub last_executed: Option<DateTime<Utc>>,
55 pub next_execution: Option<DateTime<Utc>>,
57}
58
59impl ScheduledScenario {
60 pub fn new(id: impl Into<String>, scenario: ChaosScenario, schedule: ScheduleType) -> Self {
62 let mut scheduled = Self {
63 id: id.into(),
64 scenario,
65 schedule,
66 enabled: true,
67 execution_count: 0,
68 last_executed: None,
69 next_execution: None,
70 };
71
72 scheduled.calculate_next_execution();
73 scheduled
74 }
75
76 pub fn calculate_next_execution(&mut self) {
78 let now = Utc::now();
79
80 self.next_execution = match &self.schedule {
81 ScheduleType::Once { at } => {
82 if *at > now && self.execution_count == 0 {
83 Some(*at)
84 } else {
85 None
86 }
87 }
88 ScheduleType::Delayed { delay_seconds } => {
89 if self.execution_count == 0 {
90 Some(now + Duration::seconds(*delay_seconds as i64))
91 } else {
92 None
93 }
94 }
95 ScheduleType::Periodic {
96 interval_seconds,
97 max_executions,
98 } => {
99 if *max_executions == 0 || self.execution_count < *max_executions {
100 Some(now + Duration::seconds(*interval_seconds as i64))
101 } else {
102 None
103 }
104 }
105 ScheduleType::Cron {
106 hour: _,
107 minute: _,
108 day_of_week: _,
109 max_executions,
110 } => {
111 if *max_executions > 0 && self.execution_count >= *max_executions {
112 None
113 } else {
114 let next = now + Duration::hours(1);
117
118 Some(next)
121 }
122 }
123 };
124 }
125
126 pub fn should_execute(&self) -> bool {
128 if !self.enabled {
129 return false;
130 }
131
132 if let Some(next) = self.next_execution {
133 Utc::now() >= next
134 } else {
135 false
136 }
137 }
138
139 pub fn mark_executed(&mut self) {
141 self.execution_count += 1;
142 self.last_executed = Some(Utc::now());
143 self.calculate_next_execution();
144 }
145}
146
147pub struct ScenarioScheduler {
149 schedules: Arc<RwLock<HashMap<String, ScheduledScenario>>>,
151 execution_tx: Arc<RwLock<Option<mpsc::Sender<ScheduledScenario>>>>,
153 task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
155}
156
157impl ScenarioScheduler {
158 pub fn new() -> Self {
160 Self {
161 schedules: Arc::new(RwLock::new(HashMap::new())),
162 execution_tx: Arc::new(RwLock::new(None)),
163 task_handle: Arc::new(RwLock::new(None)),
164 }
165 }
166
167 pub fn add_schedule(&self, scheduled: ScheduledScenario) {
169 let id = scheduled.id.clone();
170 let mut schedules = self.schedules.write();
171 schedules.insert(id.clone(), scheduled);
172 info!("Added scheduled scenario: {}", id);
173 }
174
175 pub fn remove_schedule(&self, id: &str) -> Option<ScheduledScenario> {
177 let mut schedules = self.schedules.write();
178 let removed = schedules.remove(id);
179 if removed.is_some() {
180 info!("Removed scheduled scenario: {}", id);
181 }
182 removed
183 }
184
185 pub fn get_schedule(&self, id: &str) -> Option<ScheduledScenario> {
187 let schedules = self.schedules.read();
188 schedules.get(id).cloned()
189 }
190
191 pub fn get_all_schedules(&self) -> Vec<ScheduledScenario> {
193 let schedules = self.schedules.read();
194 schedules.values().cloned().collect()
195 }
196
197 pub fn enable_schedule(&self, id: &str) -> Result<(), String> {
199 let mut schedules = self.schedules.write();
200 if let Some(scheduled) = schedules.get_mut(id) {
201 scheduled.enabled = true;
202 scheduled.calculate_next_execution();
203 info!("Enabled scheduled scenario: {}", id);
204 Ok(())
205 } else {
206 Err(format!("Schedule '{}' not found", id))
207 }
208 }
209
210 pub fn disable_schedule(&self, id: &str) -> Result<(), String> {
212 let mut schedules = self.schedules.write();
213 if let Some(scheduled) = schedules.get_mut(id) {
214 scheduled.enabled = false;
215 info!("Disabled scheduled scenario: {}", id);
216 Ok(())
217 } else {
218 Err(format!("Schedule '{}' not found", id))
219 }
220 }
221
222 pub async fn start<F>(&self, callback: F)
224 where
225 F: Fn(ScheduledScenario) + Send + 'static,
226 {
227 {
229 let task_handle = self.task_handle.read();
230 if task_handle.is_some() {
231 warn!("Scheduler already running");
232 return;
233 }
234 }
235
236 info!("Starting scenario scheduler");
237
238 let (tx, rx) = mpsc::channel::<ScheduledScenario>(100);
239
240 {
242 let mut execution_tx = self.execution_tx.write();
243 *execution_tx = Some(tx);
244 }
245
246 let schedules = Arc::clone(&self.schedules);
248 let handle = tokio::spawn(async move {
249 Self::scheduler_task(schedules, rx, callback).await;
250 });
251
252 {
254 let mut task_handle = self.task_handle.write();
255 *task_handle = Some(handle);
256 }
257 }
258
259 async fn scheduler_task<F>(
261 schedules: Arc<RwLock<HashMap<String, ScheduledScenario>>>,
262 mut rx: mpsc::Receiver<ScheduledScenario>,
263 callback: F,
264 ) where
265 F: Fn(ScheduledScenario),
266 {
267 let mut interval = interval(std::time::Duration::from_secs(1));
268
269 loop {
270 tokio::select! {
271 _ = interval.tick() => {
272 let mut to_execute = Vec::new();
274
275 {
276 let mut schedules_guard = schedules.write();
277
278 for (id, scheduled) in schedules_guard.iter_mut() {
279 if scheduled.should_execute() {
280 debug!("Triggering scheduled scenario: {}", id);
281 to_execute.push(scheduled.clone());
282 scheduled.mark_executed();
283 }
284 }
285 }
286
287 for scheduled in to_execute {
289 info!("Executing scheduled scenario: {}", scheduled.id);
290 callback(scheduled);
291 }
292 }
293
294 Some(scheduled) = rx.recv() => {
295 info!("Manual execution of scheduled scenario: {}", scheduled.id);
297 callback(scheduled);
298 }
299
300 else => break,
301 }
302 }
303
304 info!("Scheduler task stopped");
305 }
306
307 pub async fn stop(&self) {
309 info!("Stopping scenario scheduler");
310
311 {
313 let mut execution_tx = self.execution_tx.write();
314 *execution_tx = None;
315 }
316
317 let mut task_handle = self.task_handle.write();
319 if let Some(handle) = task_handle.take() {
320 handle.abort();
321 }
322 }
323
324 pub async fn trigger_now(&self, id: &str) -> Result<(), String> {
326 let scheduled = {
327 let schedules = self.schedules.read();
328 schedules.get(id).cloned()
329 };
330
331 if let Some(scheduled) = scheduled {
332 let tx = self.execution_tx.read().as_ref().cloned();
333 if let Some(tx) = tx {
334 tx.send(scheduled).await.map_err(|e| format!("Failed to trigger: {}", e))?;
335 Ok(())
336 } else {
337 Err("Scheduler not started".to_string())
338 }
339 } else {
340 Err(format!("Schedule '{}' not found", id))
341 }
342 }
343
344 pub fn get_next_execution(&self) -> Option<(String, DateTime<Utc>)> {
346 let schedules = self.schedules.read();
347 schedules
348 .iter()
349 .filter_map(|(id, s)| s.next_execution.map(|t| (id.clone(), t)))
350 .min_by_key(|(_, t)| *t)
351 }
352}
353
354impl Default for ScenarioScheduler {
355 fn default() -> Self {
356 Self::new()
357 }
358}
359
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ChaosConfig;

    /// Scenario fixture shared by all tests in this module.
    fn sample_scenario() -> ChaosScenario {
        ChaosScenario::new("test", ChaosConfig::default())
    }

    /// Schedule fixture with id `"sched1"` that fires once after ten seconds.
    fn delayed_schedule() -> ScheduledScenario {
        ScheduledScenario::new(
            "sched1",
            sample_scenario(),
            ScheduleType::Delayed { delay_seconds: 10 },
        )
    }

    // A one-shot schedule in the future starts enabled, unexecuted and with
    // a planned execution time.
    #[test]
    fn test_scheduled_scenario_once() {
        let at = Utc::now() + Duration::hours(1);
        let scheduled =
            ScheduledScenario::new("sched1", sample_scenario(), ScheduleType::Once { at });

        assert_eq!(scheduled.id, "sched1");
        assert!(scheduled.enabled);
        assert_eq!(scheduled.execution_count, 0);
        assert!(scheduled.next_execution.is_some());
    }

    // A periodic schedule below its execution cap has a planned execution.
    #[test]
    fn test_scheduled_scenario_periodic() {
        let scheduled = ScheduledScenario::new(
            "sched1",
            sample_scenario(),
            ScheduleType::Periodic {
                interval_seconds: 60,
                max_executions: 10,
            },
        );

        assert!(scheduled.next_execution.is_some());
    }

    // Adding makes the schedule retrievable; removing returns it and makes
    // it disappear.
    #[test]
    fn test_scheduler_add_remove() {
        let scheduler = ScenarioScheduler::new();

        scheduler.add_schedule(delayed_schedule());
        assert!(scheduler.get_schedule("sched1").is_some());

        assert!(scheduler.remove_schedule("sched1").is_some());
        assert!(scheduler.get_schedule("sched1").is_none());
    }

    // Disabling and re-enabling toggles the `enabled` flag.
    #[test]
    fn test_enable_disable() {
        let scheduler = ScenarioScheduler::new();
        scheduler.add_schedule(delayed_schedule());

        scheduler.disable_schedule("sched1").unwrap();
        let disabled = scheduler.get_schedule("sched1").unwrap();
        assert!(!disabled.enabled);

        scheduler.enable_schedule("sched1").unwrap();
        let enabled = scheduler.get_schedule("sched1").unwrap();
        assert!(enabled.enabled);
    }
}