mockforge_chaos/
scenario_orchestrator.rs

1//! Scenario orchestration for composing and chaining chaos scenarios
2
3use crate::{config::ChaosConfig, scenarios::ChaosScenario};
4use chrono::{DateTime, Utc};
5use parking_lot::RwLock;
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use tokio::sync::mpsc;
9use tokio::time::sleep;
10use tracing::{debug, info, warn};
11
/// Scenario step in an orchestration: one chaos scenario plus the timing
/// and failure-handling options that apply while it runs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScenarioStep {
    /// Step name
    pub name: String,
    /// Scenario to execute
    pub scenario: ChaosScenario,
    /// Duration override in seconds. `None` means "use the scenario's own
    /// duration"; `Some(0)` means the step finishes without waiting.
    pub duration_seconds: Option<u64>,
    /// Delay before starting this step (in seconds)
    pub delay_before_seconds: u64,
    /// Continue with the following steps if this one fails (consulted by
    /// sequential execution)
    pub continue_on_failure: bool,
}
26
27impl ScenarioStep {
28    /// Create a new scenario step
29    pub fn new(name: impl Into<String>, scenario: ChaosScenario) -> Self {
30        Self {
31            name: name.into(),
32            scenario,
33            duration_seconds: None,
34            delay_before_seconds: 0,
35            continue_on_failure: false,
36        }
37    }
38
39    /// Set duration
40    pub fn with_duration(mut self, seconds: u64) -> Self {
41        self.duration_seconds = Some(seconds);
42        self
43    }
44
45    /// Set delay before step
46    pub fn with_delay_before(mut self, seconds: u64) -> Self {
47        self.delay_before_seconds = seconds;
48        self
49    }
50
51    /// Set continue on failure
52    pub fn continue_on_failure(mut self) -> Self {
53        self.continue_on_failure = true;
54        self
55    }
56}
57
/// Orchestrated scenario composition: an ordered list of steps plus the
/// options controlling how they are run (sequential/parallel, looping).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestratedScenario {
    /// Orchestration name
    pub name: String,
    /// Description
    pub description: Option<String>,
    /// Steps to execute in order
    pub steps: Vec<ScenarioStep>,
    /// Execute steps in parallel
    pub parallel: bool,
    /// Loop the orchestration
    pub loop_orchestration: bool,
    /// Maximum iterations (0 = infinite). Only consulted when
    /// `loop_orchestration` is true; otherwise the steps run once.
    pub max_iterations: usize,
    /// Tags
    pub tags: Vec<String>,
}
76
77impl OrchestratedScenario {
78    /// Create a new orchestrated scenario
79    pub fn new(name: impl Into<String>) -> Self {
80        Self {
81            name: name.into(),
82            description: None,
83            steps: Vec::new(),
84            parallel: false,
85            loop_orchestration: false,
86            max_iterations: 1,
87            tags: Vec::new(),
88        }
89    }
90
91    /// Set description
92    pub fn with_description(mut self, description: impl Into<String>) -> Self {
93        self.description = Some(description.into());
94        self
95    }
96
97    /// Add a step
98    pub fn add_step(mut self, step: ScenarioStep) -> Self {
99        self.steps.push(step);
100        self
101    }
102
103    /// Execute steps in parallel
104    pub fn with_parallel_execution(mut self) -> Self {
105        self.parallel = true;
106        self
107    }
108
109    /// Loop the orchestration
110    pub fn with_loop(mut self, max_iterations: usize) -> Self {
111        self.loop_orchestration = true;
112        self.max_iterations = max_iterations;
113        self
114    }
115
116    /// Add tags
117    pub fn with_tags(mut self, tags: Vec<String>) -> Self {
118        self.tags = tags;
119        self
120    }
121
122    /// Export to JSON
123    pub fn to_json(&self) -> Result<String, serde_json::Error> {
124        serde_json::to_string_pretty(self)
125    }
126
127    /// Export to YAML
128    pub fn to_yaml(&self) -> Result<String, serde_yaml::Error> {
129        serde_yaml::to_string(self)
130    }
131
132    /// Import from JSON
133    pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
134        serde_json::from_str(json)
135    }
136
137    /// Import from YAML
138    pub fn from_yaml(yaml: &str) -> Result<Self, serde_yaml::Error> {
139        serde_yaml::from_str(yaml)
140    }
141}
142
/// Orchestration execution status: a snapshot of how far a running
/// orchestration has progressed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestrationStatus {
    /// Orchestration name
    pub name: String,
    /// Current iteration (counting starts at 1)
    pub current_iteration: usize,
    /// Current step index (set to `index + 1` once a sequential step finishes)
    pub current_step: usize,
    /// Total steps
    pub total_steps: usize,
    /// Execution start time
    pub started_at: DateTime<Utc>,
    /// Is currently running
    pub is_running: bool,
    /// Names of the steps that failed
    pub failed_steps: Vec<String>,
    /// Progress (0.0 - 1.0), i.e. finished steps divided by total steps
    pub progress: f64,
}
163
/// Scenario orchestrator: runs one [`OrchestratedScenario`] at a time in a
/// background task and exposes its status and currently active chaos config.
pub struct ScenarioOrchestrator {
    /// Current orchestration status (`None` when nothing is running)
    status: Arc<RwLock<Option<OrchestrationStatus>>>,
    /// Active config (current step's config)
    active_config: Arc<RwLock<Option<ChaosConfig>>>,
    /// Control channel to the background task (`None` until the first
    /// `execute` call)
    control_tx: Option<mpsc::Sender<OrchestrationControl>>,
}
173
/// Orchestration control commands sent from the orchestrator to its
/// background task.
enum OrchestrationControl {
    /// Ask the running orchestration to stop
    Stop,
}
178
179impl ScenarioOrchestrator {
180    /// Create a new orchestrator
181    pub fn new() -> Self {
182        Self {
183            status: Arc::new(RwLock::new(None)),
184            active_config: Arc::new(RwLock::new(None)),
185            control_tx: None,
186        }
187    }
188
189    /// Execute an orchestrated scenario
190    pub async fn execute(&mut self, orchestrated: OrchestratedScenario) -> Result<(), String> {
191        // Check if already running
192        {
193            let status = self.status.read();
194            if status.is_some() {
195                return Err("Orchestration already in progress".to_string());
196            }
197        }
198
199        let orchestration_name = orchestrated.name.clone();
200        let total_steps = orchestrated.steps.len();
201
202        if total_steps == 0 {
203            return Err("No steps to execute".to_string());
204        }
205
206        info!(
207            "Starting orchestration '{}' ({} steps, parallel: {})",
208            orchestration_name, total_steps, orchestrated.parallel
209        );
210
211        // Initialize status
212        {
213            let mut status = self.status.write();
214            *status = Some(OrchestrationStatus {
215                name: orchestration_name.clone(),
216                current_iteration: 1,
217                current_step: 0,
218                total_steps,
219                started_at: Utc::now(),
220                is_running: true,
221                failed_steps: Vec::new(),
222                progress: 0.0,
223            });
224        }
225
226        // Create control channel
227        let (control_tx, mut control_rx) = mpsc::channel::<OrchestrationControl>(10);
228        self.control_tx = Some(control_tx);
229
230        // Clone Arc for the async task
231        let status_arc = Arc::clone(&self.status);
232        let config_arc = Arc::clone(&self.active_config);
233
234        // Spawn orchestration task
235        tokio::spawn(async move {
236            Self::orchestration_task(orchestrated, status_arc, config_arc, &mut control_rx).await;
237        });
238
239        Ok(())
240    }
241
242    /// Orchestration task (runs in background)
243    async fn orchestration_task(
244        orchestrated: OrchestratedScenario,
245        status: Arc<RwLock<Option<OrchestrationStatus>>>,
246        active_config: Arc<RwLock<Option<ChaosConfig>>>,
247        control_rx: &mut mpsc::Receiver<OrchestrationControl>,
248    ) {
249        let max_iterations = if orchestrated.loop_orchestration {
250            orchestrated.max_iterations
251        } else {
252            1
253        };
254
255        let mut iteration = 1;
256
257        loop {
258            // Check if reached max iterations
259            if max_iterations > 0 && iteration > max_iterations {
260                break;
261            }
262
263            info!(
264                "Starting iteration {}/{} of orchestration '{}'",
265                iteration,
266                if max_iterations > 0 {
267                    max_iterations.to_string()
268                } else {
269                    "∞".to_string()
270                },
271                orchestrated.name
272            );
273
274            // Update iteration
275            Self::update_status(&status, |s| s.current_iteration = iteration);
276
277            if orchestrated.parallel {
278                // Execute steps in parallel
279                Self::execute_steps_parallel(&orchestrated, &status, &active_config).await;
280            } else {
281                // Execute steps sequentially
282                if !Self::execute_steps_sequential(
283                    &orchestrated,
284                    &status,
285                    &active_config,
286                    control_rx,
287                )
288                .await
289                {
290                    // Stopped by control command
291                    break;
292                }
293            }
294
295            iteration += 1;
296
297            // Check if should loop
298            if !orchestrated.loop_orchestration {
299                break;
300            }
301        }
302
303        info!("Orchestration '{}' completed", orchestrated.name);
304        Self::clear_status(&status);
305        Self::clear_config(&active_config);
306    }
307
308    /// Execute steps sequentially
309    async fn execute_steps_sequential(
310        orchestrated: &OrchestratedScenario,
311        status: &Arc<RwLock<Option<OrchestrationStatus>>>,
312        active_config: &Arc<RwLock<Option<ChaosConfig>>>,
313        control_rx: &mut mpsc::Receiver<OrchestrationControl>,
314    ) -> bool {
315        for (index, step) in orchestrated.steps.iter().enumerate() {
316            // Check for control commands
317            if let Ok(cmd) = control_rx.try_recv() {
318                match cmd {
319                    OrchestrationControl::Stop => {
320                        info!("Orchestration stopped");
321                        return false;
322                    }
323                }
324            }
325
326            // Execute step
327            let success = Self::execute_step(step, status, active_config).await;
328
329            if !success && !step.continue_on_failure {
330                warn!("Step '{}' failed, stopping orchestration", step.name);
331                Self::update_status(status, |s| s.failed_steps.push(step.name.clone()));
332                return false;
333            }
334
335            // Update progress
336            Self::update_status(status, |s| {
337                s.current_step = index + 1;
338                s.progress = (index + 1) as f64 / orchestrated.steps.len() as f64;
339            });
340        }
341
342        true
343    }
344
345    /// Execute steps in parallel
346    async fn execute_steps_parallel(
347        orchestrated: &OrchestratedScenario,
348        status: &Arc<RwLock<Option<OrchestrationStatus>>>,
349        active_config: &Arc<RwLock<Option<ChaosConfig>>>,
350    ) {
351        let mut handles = Vec::new();
352
353        for step in &orchestrated.steps {
354            let step_clone = step.clone();
355            let status_clone = Arc::clone(status);
356            let config_clone = Arc::clone(active_config);
357
358            let handle = tokio::spawn(async move {
359                Self::execute_step(&step_clone, &status_clone, &config_clone).await
360            });
361
362            handles.push(handle);
363        }
364
365        // Wait for all steps to complete
366        for handle in handles {
367            let _ = handle.await;
368        }
369    }
370
371    /// Execute a single step
372    async fn execute_step(
373        step: &ScenarioStep,
374        _status: &Arc<RwLock<Option<OrchestrationStatus>>>,
375        active_config: &Arc<RwLock<Option<ChaosConfig>>>,
376    ) -> bool {
377        info!("Executing step: {}", step.name);
378
379        // Delay before step
380        if step.delay_before_seconds > 0 {
381            debug!("Waiting {}s before step '{}'", step.delay_before_seconds, step.name);
382            sleep(std::time::Duration::from_secs(step.delay_before_seconds)).await;
383        }
384
385        // Apply the step's chaos config
386        {
387            let mut config = active_config.write();
388            *config = Some(step.scenario.chaos_config.clone());
389        }
390
391        // Determine duration
392        let duration = step.duration_seconds.or(Some(step.scenario.duration_seconds)).unwrap_or(0);
393
394        if duration > 0 {
395            debug!("Running step '{}' for {}s", step.name, duration);
396            sleep(std::time::Duration::from_secs(duration)).await;
397        }
398
399        info!("Completed step: {}", step.name);
400        true
401    }
402
403    /// Update status
404    fn update_status<F>(status: &Arc<RwLock<Option<OrchestrationStatus>>>, f: F)
405    where
406        F: FnOnce(&mut OrchestrationStatus),
407    {
408        let mut status_guard = status.write();
409        if let Some(ref mut s) = *status_guard {
410            f(s);
411        }
412    }
413
414    /// Clear status
415    fn clear_status(status: &Arc<RwLock<Option<OrchestrationStatus>>>) {
416        let mut status_guard = status.write();
417        *status_guard = None;
418    }
419
420    /// Clear config
421    fn clear_config(config: &Arc<RwLock<Option<ChaosConfig>>>) {
422        let mut config_guard = config.write();
423        *config_guard = None;
424    }
425
426    /// Get current orchestration status
427    pub fn get_status(&self) -> Option<OrchestrationStatus> {
428        self.status.read().clone()
429    }
430
431    /// Get currently active chaos config
432    pub fn get_active_config(&self) -> Option<ChaosConfig> {
433        self.active_config.read().clone()
434    }
435
436    /// Check if orchestration is running
437    pub fn is_running(&self) -> bool {
438        self.status.read().is_some()
439    }
440
441    /// Stop orchestration
442    pub async fn stop(&self) -> Result<(), String> {
443        if let Some(ref tx) = self.control_tx {
444            tx.send(OrchestrationControl::Stop)
445                .await
446                .map_err(|e| format!("Failed to stop: {}", e))?;
447            Ok(())
448        } else {
449            Err("No orchestration in progress".to_string())
450        }
451    }
452}
453
454impl Default for ScenarioOrchestrator {
455    fn default() -> Self {
456        Self::new()
457    }
458}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// A new step keeps its name and starts with the builder defaults.
    #[test]
    fn test_scenario_step_creation() {
        let step = ScenarioStep::new("step1", ChaosScenario::new("test", ChaosConfig::default()));

        assert_eq!(step.name, "step1");
        assert_eq!(step.delay_before_seconds, 0);
        assert!(!step.continue_on_failure);
    }

    /// A new orchestration is empty, sequential, and non-looping.
    #[test]
    fn test_orchestrated_scenario_creation() {
        let orchestration = OrchestratedScenario::new("test_orchestration");

        assert_eq!(orchestration.name, "test_orchestration");
        assert_eq!(orchestration.steps.len(), 0);
        assert!(!orchestration.parallel);
        assert!(!orchestration.loop_orchestration);
    }

    /// `add_step` appends each step it is given.
    #[test]
    fn test_add_steps() {
        let first =
            ScenarioStep::new("step1", ChaosScenario::new("scenario1", ChaosConfig::default()));
        let second =
            ScenarioStep::new("step2", ChaosScenario::new("scenario2", ChaosConfig::default()));

        let orchestration = OrchestratedScenario::new("test").add_step(first).add_step(second);

        assert_eq!(orchestration.steps.len(), 2);
    }

    /// An orchestration survives a JSON export/import round trip.
    #[test]
    fn test_json_export_import() {
        let original = OrchestratedScenario::new("test_orchestration")
            .with_description("Test description")
            .add_step(ScenarioStep::new(
                "step1",
                ChaosScenario::new("test", ChaosConfig::default()),
            ));

        let encoded = original.to_json().unwrap();
        let decoded = OrchestratedScenario::from_json(&encoded).unwrap();

        assert_eq!(decoded.name, "test_orchestration");
        assert_eq!(decoded.steps.len(), 1);
    }

    /// A freshly created orchestrator reports that nothing is running.
    #[tokio::test]
    async fn test_orchestrator_creation() {
        assert!(!ScenarioOrchestrator::new().is_running());
    }
}
518}