1use crate::{config::ChaosConfig, scenarios::ChaosScenario};
4use chrono::{DateTime, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio::time::sleep;
10use tracing::{debug, info, warn};
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct ScenarioStep {
15 pub name: String,
17 pub scenario: ChaosScenario,
19 pub duration_seconds: Option<u64>,
21 pub delay_before_seconds: u64,
23 pub continue_on_failure: bool,
25}
26
27impl ScenarioStep {
28 pub fn new(name: impl Into<String>, scenario: ChaosScenario) -> Self {
30 Self {
31 name: name.into(),
32 scenario,
33 duration_seconds: None,
34 delay_before_seconds: 0,
35 continue_on_failure: false,
36 }
37 }
38
39 pub fn with_duration(mut self, seconds: u64) -> Self {
41 self.duration_seconds = Some(seconds);
42 self
43 }
44
45 pub fn with_delay_before(mut self, seconds: u64) -> Self {
47 self.delay_before_seconds = seconds;
48 self
49 }
50
51 pub fn continue_on_failure(mut self) -> Self {
53 self.continue_on_failure = true;
54 self
55 }
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct OrchestratedScenario {
61 pub name: String,
63 pub description: Option<String>,
65 pub steps: Vec<ScenarioStep>,
67 pub parallel: bool,
69 pub loop_orchestration: bool,
71 pub max_iterations: usize,
73 pub tags: Vec<String>,
75}
76
77impl OrchestratedScenario {
78 pub fn new(name: impl Into<String>) -> Self {
80 Self {
81 name: name.into(),
82 description: None,
83 steps: Vec::new(),
84 parallel: false,
85 loop_orchestration: false,
86 max_iterations: 1,
87 tags: Vec::new(),
88 }
89 }
90
91 pub fn with_description(mut self, description: impl Into<String>) -> Self {
93 self.description = Some(description.into());
94 self
95 }
96
97 pub fn add_step(mut self, step: ScenarioStep) -> Self {
99 self.steps.push(step);
100 self
101 }
102
103 pub fn with_parallel_execution(mut self) -> Self {
105 self.parallel = true;
106 self
107 }
108
109 pub fn with_loop(mut self, max_iterations: usize) -> Self {
111 self.loop_orchestration = true;
112 self.max_iterations = max_iterations;
113 self
114 }
115
116 pub fn with_tags(mut self, tags: Vec<String>) -> Self {
118 self.tags = tags;
119 self
120 }
121
122 pub fn to_json(&self) -> Result<String, serde_json::Error> {
124 serde_json::to_string_pretty(self)
125 }
126
127 pub fn to_yaml(&self) -> Result<String, serde_yaml::Error> {
129 serde_yaml::to_string(self)
130 }
131
132 pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
134 serde_json::from_str(json)
135 }
136
137 pub fn from_yaml(yaml: &str) -> Result<Self, serde_yaml::Error> {
139 serde_yaml::from_str(yaml)
140 }
141}
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct OrchestrationStatus {
146 pub name: String,
148 pub current_iteration: usize,
150 pub current_step: usize,
152 pub total_steps: usize,
154 pub started_at: DateTime<Utc>,
156 pub is_running: bool,
158 pub failed_steps: Vec<String>,
160 pub progress: f64,
162}
163
164pub struct ScenarioOrchestrator {
166 status: Arc<RwLock<Option<OrchestrationStatus>>>,
168 active_config: Arc<RwLock<Option<ChaosConfig>>>,
170 control_tx: Option<mpsc::Sender<OrchestrationControl>>,
172}
173
/// Commands sent from the orchestrator handle to its background task.
enum OrchestrationControl {
    /// Ask the running orchestration to stop.
    Stop,
}
178
179impl ScenarioOrchestrator {
180 pub fn new() -> Self {
182 Self {
183 status: Arc::new(RwLock::new(None)),
184 active_config: Arc::new(RwLock::new(None)),
185 control_tx: None,
186 }
187 }
188
189 pub async fn execute(&mut self, orchestrated: OrchestratedScenario) -> Result<(), String> {
191 {
193 let status = self.status.read();
194 if status.is_some() {
195 return Err("Orchestration already in progress".to_string());
196 }
197 }
198
199 let orchestration_name = orchestrated.name.clone();
200 let total_steps = orchestrated.steps.len();
201
202 if total_steps == 0 {
203 return Err("No steps to execute".to_string());
204 }
205
206 info!(
207 "Starting orchestration '{}' ({} steps, parallel: {})",
208 orchestration_name, total_steps, orchestrated.parallel
209 );
210
211 {
213 let mut status = self.status.write();
214 *status = Some(OrchestrationStatus {
215 name: orchestration_name.clone(),
216 current_iteration: 1,
217 current_step: 0,
218 total_steps,
219 started_at: Utc::now(),
220 is_running: true,
221 failed_steps: Vec::new(),
222 progress: 0.0,
223 });
224 }
225
226 let (control_tx, mut control_rx) = mpsc::channel::<OrchestrationControl>(10);
228 self.control_tx = Some(control_tx);
229
230 let status_arc = Arc::clone(&self.status);
232 let config_arc = Arc::clone(&self.active_config);
233
234 tokio::spawn(async move {
236 Self::orchestration_task(orchestrated, status_arc, config_arc, &mut control_rx).await;
237 });
238
239 Ok(())
240 }
241
242 async fn orchestration_task(
244 orchestrated: OrchestratedScenario,
245 status: Arc<RwLock<Option<OrchestrationStatus>>>,
246 active_config: Arc<RwLock<Option<ChaosConfig>>>,
247 control_rx: &mut mpsc::Receiver<OrchestrationControl>,
248 ) {
249 let max_iterations = if orchestrated.loop_orchestration {
250 orchestrated.max_iterations
251 } else {
252 1
253 };
254
255 let mut iteration = 1;
256
257 loop {
258 if max_iterations > 0 && iteration > max_iterations {
260 break;
261 }
262
263 info!(
264 "Starting iteration {}/{} of orchestration '{}'",
265 iteration,
266 if max_iterations > 0 {
267 max_iterations.to_string()
268 } else {
269 "∞".to_string()
270 },
271 orchestrated.name
272 );
273
274 Self::update_status(&status, |s| s.current_iteration = iteration);
276
277 if orchestrated.parallel {
278 Self::execute_steps_parallel(&orchestrated, &status, &active_config).await;
280 } else {
281 if !Self::execute_steps_sequential(
283 &orchestrated,
284 &status,
285 &active_config,
286 control_rx,
287 )
288 .await
289 {
290 break;
292 }
293 }
294
295 iteration += 1;
296
297 if !orchestrated.loop_orchestration {
299 break;
300 }
301 }
302
303 info!("Orchestration '{}' completed", orchestrated.name);
304 Self::clear_status(&status);
305 Self::clear_config(&active_config);
306 }
307
308 async fn execute_steps_sequential(
310 orchestrated: &OrchestratedScenario,
311 status: &Arc<RwLock<Option<OrchestrationStatus>>>,
312 active_config: &Arc<RwLock<Option<ChaosConfig>>>,
313 control_rx: &mut mpsc::Receiver<OrchestrationControl>,
314 ) -> bool {
315 for (index, step) in orchestrated.steps.iter().enumerate() {
316 if let Ok(cmd) = control_rx.try_recv() {
318 match cmd {
319 OrchestrationControl::Stop => {
320 info!("Orchestration stopped");
321 return false;
322 }
323 }
324 }
325
326 let success = Self::execute_step(step, status, active_config).await;
328
329 if !success && !step.continue_on_failure {
330 warn!("Step '{}' failed, stopping orchestration", step.name);
331 Self::update_status(status, |s| s.failed_steps.push(step.name.clone()));
332 return false;
333 }
334
335 Self::update_status(status, |s| {
337 s.current_step = index + 1;
338 s.progress = (index + 1) as f64 / orchestrated.steps.len() as f64;
339 });
340 }
341
342 true
343 }
344
345 async fn execute_steps_parallel(
347 orchestrated: &OrchestratedScenario,
348 status: &Arc<RwLock<Option<OrchestrationStatus>>>,
349 active_config: &Arc<RwLock<Option<ChaosConfig>>>,
350 ) {
351 let mut handles = Vec::new();
352
353 for step in &orchestrated.steps {
354 let step_clone = step.clone();
355 let status_clone = Arc::clone(status);
356 let config_clone = Arc::clone(active_config);
357
358 let handle = tokio::spawn(async move {
359 Self::execute_step(&step_clone, &status_clone, &config_clone).await
360 });
361
362 handles.push(handle);
363 }
364
365 for handle in handles {
367 let _ = handle.await;
368 }
369 }
370
371 async fn execute_step(
373 step: &ScenarioStep,
374 _status: &Arc<RwLock<Option<OrchestrationStatus>>>,
375 active_config: &Arc<RwLock<Option<ChaosConfig>>>,
376 ) -> bool {
377 info!("Executing step: {}", step.name);
378
379 if step.delay_before_seconds > 0 {
381 debug!("Waiting {}s before step '{}'", step.delay_before_seconds, step.name);
382 sleep(std::time::Duration::from_secs(step.delay_before_seconds)).await;
383 }
384
385 {
387 let mut config = active_config.write();
388 *config = Some(step.scenario.chaos_config.clone());
389 }
390
391 let duration = step.duration_seconds.or(Some(step.scenario.duration_seconds)).unwrap_or(0);
393
394 if duration > 0 {
395 debug!("Running step '{}' for {}s", step.name, duration);
396 sleep(std::time::Duration::from_secs(duration)).await;
397 }
398
399 info!("Completed step: {}", step.name);
400 true
401 }
402
403 fn update_status<F>(status: &Arc<RwLock<Option<OrchestrationStatus>>>, f: F)
405 where
406 F: FnOnce(&mut OrchestrationStatus),
407 {
408 let mut status_guard = status.write();
409 if let Some(ref mut s) = *status_guard {
410 f(s);
411 }
412 }
413
414 fn clear_status(status: &Arc<RwLock<Option<OrchestrationStatus>>>) {
416 let mut status_guard = status.write();
417 *status_guard = None;
418 }
419
420 fn clear_config(config: &Arc<RwLock<Option<ChaosConfig>>>) {
422 let mut config_guard = config.write();
423 *config_guard = None;
424 }
425
426 pub fn get_status(&self) -> Option<OrchestrationStatus> {
428 self.status.read().clone()
429 }
430
431 pub fn get_active_config(&self) -> Option<ChaosConfig> {
433 self.active_config.read().clone()
434 }
435
436 pub fn is_running(&self) -> bool {
438 self.status.read().is_some()
439 }
440
441 pub async fn stop(&self) -> Result<(), String> {
443 if let Some(ref tx) = self.control_tx {
444 tx.send(OrchestrationControl::Stop)
445 .await
446 .map_err(|e| format!("Failed to stop: {}", e))?;
447 Ok(())
448 } else {
449 Err("No orchestration in progress".to_string())
450 }
451 }
452}
453
454impl Default for ScenarioOrchestrator {
455 fn default() -> Self {
456 Self::new()
457 }
458}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built step carries the given name and the default options.
    #[test]
    fn test_scenario_step_creation() {
        let step = ScenarioStep::new("step1", ChaosScenario::new("test", ChaosConfig::default()));

        assert_eq!(step.name, "step1");
        assert_eq!(step.delay_before_seconds, 0);
        assert!(!step.continue_on_failure);
    }

    /// A new orchestration is empty, sequential and non-looping.
    #[test]
    fn test_orchestrated_scenario_creation() {
        let fresh = OrchestratedScenario::new("test_orchestration");

        assert_eq!(fresh.name, "test_orchestration");
        assert!(fresh.steps.is_empty());
        assert!(!fresh.parallel);
        assert!(!fresh.loop_orchestration);
    }

    /// `add_step` appends each step it is given.
    #[test]
    fn test_add_steps() {
        let first = ScenarioStep::new(
            "step1",
            ChaosScenario::new("scenario1", ChaosConfig::default()),
        );
        let second = ScenarioStep::new(
            "step2",
            ChaosScenario::new("scenario2", ChaosConfig::default()),
        );

        let built = OrchestratedScenario::new("test")
            .add_step(first)
            .add_step(second);

        assert_eq!(built.steps.len(), 2);
    }

    /// An orchestration survives a JSON round trip.
    #[test]
    fn test_json_export_import() {
        let only_step =
            ScenarioStep::new("step1", ChaosScenario::new("test", ChaosConfig::default()));

        let exported = OrchestratedScenario::new("test_orchestration")
            .with_description("Test description")
            .add_step(only_step)
            .to_json()
            .unwrap();
        let restored = OrchestratedScenario::from_json(&exported).unwrap();

        assert_eq!(restored.name, "test_orchestration");
        assert_eq!(restored.steps.len(), 1);
    }

    /// A new orchestrator reports that nothing is running.
    #[tokio::test]
    async fn test_orchestrator_creation() {
        let idle = ScenarioOrchestrator::new();
        assert!(!idle.is_running());
    }
}