use crate::{config::ChaosConfig, scenarios::ChaosScenario};
use chrono::{DateTime, Utc};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing::{debug, info, warn};
/// One step of an orchestrated chaos run: a scenario plus timing and
/// failure-handling options. Serializable via serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScenarioStep {
/// Label used in log messages and in the status' list of failed steps.
pub name: String,
/// Scenario whose `chaos_config` becomes the active configuration while the step runs.
pub scenario: ChaosScenario,
/// How long the step runs, in seconds. When `None`, the scenario's own
/// `duration_seconds` is used instead.
pub duration_seconds: Option<u64>,
/// Seconds to wait before the step's configuration is applied (0 = no wait).
pub delay_before_seconds: u64,
/// When `true`, a failure of this step does not halt the orchestration.
pub continue_on_failure: bool,
}
impl ScenarioStep {
    /// Builds a step named `name` around `scenario`.
    ///
    /// The step starts with no duration override, no delay before it runs,
    /// and `continue_on_failure` switched off.
    pub fn new(name: impl Into<String>, scenario: ChaosScenario) -> Self {
        let name = name.into();
        Self {
            name,
            scenario,
            duration_seconds: None,
            delay_before_seconds: 0,
            continue_on_failure: false,
        }
    }

    /// Returns the step with its duration override set to `seconds`.
    pub fn with_duration(self, seconds: u64) -> Self {
        Self {
            duration_seconds: Some(seconds),
            ..self
        }
    }

    /// Returns the step with a pause of `seconds` inserted before it runs.
    pub fn with_delay_before(self, seconds: u64) -> Self {
        Self {
            delay_before_seconds: seconds,
            ..self
        }
    }

    /// Returns the step flagged so that its failure does not halt the orchestration.
    pub fn continue_on_failure(self) -> Self {
        Self {
            continue_on_failure: true,
            ..self
        }
    }
}
/// A named collection of [`ScenarioStep`]s together with the options that
/// control how they are run. Serializable via serde (JSON and YAML helpers
/// are provided on the type).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestratedScenario {
/// Name used in log messages and in the reported status.
pub name: String,
/// Optional human-readable description.
pub description: Option<String>,
/// Steps to run, in insertion order.
pub steps: Vec<ScenarioStep>,
/// When `true`, all steps of an iteration are spawned concurrently;
/// otherwise they run one after another.
pub parallel: bool,
/// When `true`, the whole step list is repeated.
pub loop_orchestration: bool,
/// Iteration limit, consulted only when `loop_orchestration` is `true`.
/// A value of 0 means no limit.
pub max_iterations: usize,
/// Free-form labels; not interpreted by the code in this file.
pub tags: Vec<String>,
}
impl OrchestratedScenario {
    /// Creates an empty orchestration called `name`.
    ///
    /// Defaults: no description, no steps, sequential execution, no looping
    /// (`max_iterations` is 1) and no tags.
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            description: None,
            steps: Vec::new(),
            parallel: false,
            loop_orchestration: false,
            max_iterations: 1,
            tags: Vec::new(),
        }
    }

    /// Returns the orchestration with `description` attached.
    pub fn with_description(self, description: impl Into<String>) -> Self {
        Self {
            description: Some(description.into()),
            ..self
        }
    }

    /// Returns the orchestration with `step` appended to the end of its step list.
    pub fn add_step(self, step: ScenarioStep) -> Self {
        let mut updated = self;
        updated.steps.push(step);
        updated
    }

    /// Returns the orchestration switched to parallel step execution.
    pub fn with_parallel_execution(self) -> Self {
        Self {
            parallel: true,
            ..self
        }
    }

    /// Returns the orchestration with looping enabled and the iteration
    /// limit set to `max_iterations` (0 means no limit).
    pub fn with_loop(self, max_iterations: usize) -> Self {
        Self {
            loop_orchestration: true,
            max_iterations,
            ..self
        }
    }

    /// Returns the orchestration with its tag list replaced by `tags`.
    pub fn with_tags(self, tags: Vec<String>) -> Self {
        Self { tags, ..self }
    }

    /// Serializes the orchestration as pretty-printed JSON.
    pub fn to_json(&self) -> Result<String, serde_json::Error> {
        serde_json::to_string_pretty::<Self>(self)
    }

    /// Serializes the orchestration as YAML.
    pub fn to_yaml(&self) -> Result<String, serde_yaml::Error> {
        serde_yaml::to_string::<Self>(self)
    }

    /// Parses an orchestration from a JSON document.
    pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
        serde_json::from_str::<Self>(json)
    }

    /// Parses an orchestration from a YAML document.
    pub fn from_yaml(yaml: &str) -> Result<Self, serde_yaml::Error> {
        serde_yaml::from_str::<Self>(yaml)
    }
}
/// Snapshot of a running orchestration, as returned by
/// `ScenarioOrchestrator::get_status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OrchestrationStatus {
/// Name of the orchestration being run.
pub name: String,
/// Iteration currently in progress, starting at 1.
pub current_iteration: usize,
/// Number of steps finished so far (0 when the run starts).
pub current_step: usize,
/// Number of steps in the orchestration.
pub total_steps: usize,
/// UTC time at which the orchestration was started.
pub started_at: DateTime<Utc>,
/// Set to `true` when the run is registered; the status is removed
/// entirely once the run ends.
pub is_running: bool,
/// Names of steps that failed.
pub failed_steps: Vec<String>,
/// Fraction of steps finished, from 0.0 to 1.0.
pub progress: f64,
}
/// Runs an [`OrchestratedScenario`] on a background tokio task and exposes
/// its progress and the currently applied chaos configuration.
pub struct ScenarioOrchestrator {
// `Some` while an orchestration is registered; shared with the background task.
status: Arc<RwLock<Option<OrchestrationStatus>>>,
// Chaos configuration of the step most recently started; shared with the background task.
active_config: Arc<RwLock<Option<ChaosConfig>>>,
// Sending half of the control channel created by the latest `execute` call.
control_tx: Option<mpsc::Sender<OrchestrationControl>>,
}
/// Commands sent from the orchestrator handle to its background task.
enum OrchestrationControl {
/// Ask the running orchestration to stop.
Stop,
}
impl ScenarioOrchestrator {
pub fn new() -> Self {
Self {
status: Arc::new(RwLock::new(None)),
active_config: Arc::new(RwLock::new(None)),
control_tx: None,
}
}
pub async fn execute(&mut self, orchestrated: OrchestratedScenario) -> Result<(), String> {
{
let status = self.status.read();
if status.is_some() {
return Err("Orchestration already in progress".to_string());
}
}
let orchestration_name = orchestrated.name.clone();
let total_steps = orchestrated.steps.len();
if total_steps == 0 {
return Err("No steps to execute".to_string());
}
info!(
"Starting orchestration '{}' ({} steps, parallel: {})",
orchestration_name, total_steps, orchestrated.parallel
);
{
let mut status = self.status.write();
*status = Some(OrchestrationStatus {
name: orchestration_name.clone(),
current_iteration: 1,
current_step: 0,
total_steps,
started_at: Utc::now(),
is_running: true,
failed_steps: Vec::new(),
progress: 0.0,
});
}
let (control_tx, mut control_rx) = mpsc::channel::<OrchestrationControl>(10);
self.control_tx = Some(control_tx);
let status_arc = Arc::clone(&self.status);
let config_arc = Arc::clone(&self.active_config);
tokio::spawn(async move {
Self::orchestration_task(orchestrated, status_arc, config_arc, &mut control_rx).await;
});
Ok(())
}
async fn orchestration_task(
orchestrated: OrchestratedScenario,
status: Arc<RwLock<Option<OrchestrationStatus>>>,
active_config: Arc<RwLock<Option<ChaosConfig>>>,
control_rx: &mut mpsc::Receiver<OrchestrationControl>,
) {
let max_iterations = if orchestrated.loop_orchestration {
orchestrated.max_iterations
} else {
1
};
let mut iteration = 1;
loop {
if max_iterations > 0 && iteration > max_iterations {
break;
}
info!(
"Starting iteration {}/{} of orchestration '{}'",
iteration,
if max_iterations > 0 {
max_iterations.to_string()
} else {
"∞".to_string()
},
orchestrated.name
);
Self::update_status(&status, |s| s.current_iteration = iteration);
if orchestrated.parallel {
Self::execute_steps_parallel(&orchestrated, &status, &active_config).await;
} else {
if !Self::execute_steps_sequential(
&orchestrated,
&status,
&active_config,
control_rx,
)
.await
{
break;
}
}
iteration += 1;
if !orchestrated.loop_orchestration {
break;
}
}
info!("Orchestration '{}' completed", orchestrated.name);
Self::clear_status(&status);
Self::clear_config(&active_config);
}
async fn execute_steps_sequential(
orchestrated: &OrchestratedScenario,
status: &Arc<RwLock<Option<OrchestrationStatus>>>,
active_config: &Arc<RwLock<Option<ChaosConfig>>>,
control_rx: &mut mpsc::Receiver<OrchestrationControl>,
) -> bool {
for (index, step) in orchestrated.steps.iter().enumerate() {
if let Ok(cmd) = control_rx.try_recv() {
match cmd {
OrchestrationControl::Stop => {
info!("Orchestration stopped");
return false;
}
}
}
let success = Self::execute_step(step, status, active_config).await;
if !success && !step.continue_on_failure {
warn!("Step '{}' failed, stopping orchestration", step.name);
Self::update_status(status, |s| s.failed_steps.push(step.name.clone()));
return false;
}
Self::update_status(status, |s| {
s.current_step = index + 1;
s.progress = (index + 1) as f64 / orchestrated.steps.len() as f64;
});
}
true
}
async fn execute_steps_parallel(
orchestrated: &OrchestratedScenario,
status: &Arc<RwLock<Option<OrchestrationStatus>>>,
active_config: &Arc<RwLock<Option<ChaosConfig>>>,
) {
let mut handles = Vec::new();
for step in &orchestrated.steps {
let step_clone = step.clone();
let status_clone = Arc::clone(status);
let config_clone = Arc::clone(active_config);
let handle = tokio::spawn(async move {
Self::execute_step(&step_clone, &status_clone, &config_clone).await
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
}
async fn execute_step(
step: &ScenarioStep,
_status: &Arc<RwLock<Option<OrchestrationStatus>>>,
active_config: &Arc<RwLock<Option<ChaosConfig>>>,
) -> bool {
info!("Executing step: {}", step.name);
if step.delay_before_seconds > 0 {
debug!("Waiting {}s before step '{}'", step.delay_before_seconds, step.name);
sleep(std::time::Duration::from_secs(step.delay_before_seconds)).await;
}
{
let mut config = active_config.write();
*config = Some(step.scenario.chaos_config.clone());
}
let duration = step.duration_seconds.or(Some(step.scenario.duration_seconds)).unwrap_or(0);
if duration > 0 {
debug!("Running step '{}' for {}s", step.name, duration);
sleep(std::time::Duration::from_secs(duration)).await;
}
info!("Completed step: {}", step.name);
true
}
fn update_status<F>(status: &Arc<RwLock<Option<OrchestrationStatus>>>, f: F)
where
F: FnOnce(&mut OrchestrationStatus),
{
let mut status_guard = status.write();
if let Some(ref mut s) = *status_guard {
f(s);
}
}
fn clear_status(status: &Arc<RwLock<Option<OrchestrationStatus>>>) {
let mut status_guard = status.write();
*status_guard = None;
}
fn clear_config(config: &Arc<RwLock<Option<ChaosConfig>>>) {
let mut config_guard = config.write();
*config_guard = None;
}
pub fn get_status(&self) -> Option<OrchestrationStatus> {
self.status.read().clone()
}
pub fn get_active_config(&self) -> Option<ChaosConfig> {
self.active_config.read().clone()
}
pub fn is_running(&self) -> bool {
self.status.read().is_some()
}
pub async fn stop(&self) -> Result<(), String> {
if let Some(ref tx) = self.control_tx {
tx.send(OrchestrationControl::Stop)
.await
.map_err(|e| format!("Failed to stop: {}", e))?;
Ok(())
} else {
Err("No orchestration in progress".to_string())
}
}
}
impl Default for ScenarioOrchestrator {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a step called `step_name` around a default-configured scenario
    /// called `scenario_name`.
    fn make_step(step_name: &str, scenario_name: &str) -> ScenarioStep {
        ScenarioStep::new(
            step_name,
            ChaosScenario::new(scenario_name, ChaosConfig::default()),
        )
    }

    /// A new step carries its name and the documented defaults.
    #[test]
    fn test_scenario_step_creation() {
        let step = make_step("step1", "test");

        assert_eq!(step.name, "step1");
        assert_eq!(step.delay_before_seconds, 0);
        assert!(!step.continue_on_failure);
    }

    /// A new orchestration is empty, sequential and non-looping.
    #[test]
    fn test_orchestrated_scenario_creation() {
        let orchestrated = OrchestratedScenario::new("test_orchestration");

        assert_eq!(orchestrated.name, "test_orchestration");
        assert!(orchestrated.steps.is_empty());
        assert!(!orchestrated.parallel);
        assert!(!orchestrated.loop_orchestration);
    }

    /// `add_step` appends each step it is given.
    #[test]
    fn test_add_steps() {
        let orchestrated = OrchestratedScenario::new("test")
            .add_step(make_step("step1", "scenario1"))
            .add_step(make_step("step2", "scenario2"));

        assert_eq!(orchestrated.steps.len(), 2);
    }

    /// An orchestration survives a JSON round trip.
    #[test]
    fn test_json_export_import() {
        let original = OrchestratedScenario::new("test_orchestration")
            .with_description("Test description")
            .add_step(make_step("step1", "test"));

        let json = original.to_json().unwrap();
        let imported = OrchestratedScenario::from_json(&json).unwrap();

        assert_eq!(imported.name, "test_orchestration");
        assert_eq!(imported.steps.len(), 1);
    }

    /// A freshly created orchestrator reports that nothing is running.
    #[tokio::test]
    async fn test_orchestrator_creation() {
        let orchestrator = ScenarioOrchestrator::new();

        assert!(!orchestrator.is_running());
    }
}