Skip to main content

mabi_chaos/
engine.rs

1//! Chaos engine for orchestrating fault injection.
2
3use std::sync::Arc;
4use std::time::Instant;
5
6use parking_lot::RwLock;
7use serde::{Deserialize, Serialize};
8use tokio::sync::broadcast;
9
10use crate::context::FaultContext;
11use crate::error::{ChaosError, ChaosResult};
12use crate::fault::{BoxedFault, Fault, FaultBehavior, FaultStatistics};
13use crate::registry::{FaultFilter, FaultRegistry};
14use crate::scheduler::{ChaosEvent, ChaosSchedule, ChaosScheduler, ChaosType};
15
16// =============================================================================
17// Engine State
18// =============================================================================
19
20/// State of the chaos engine.
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
22#[serde(rename_all = "lowercase")]
23pub enum EngineState {
24    /// Engine is stopped.
25    #[default]
26    Stopped,
27
28    /// Engine is running.
29    Running,
30
31    /// Engine is paused.
32    Paused,
33}
34
35// =============================================================================
36// Engine Event
37// =============================================================================
38
39/// Events emitted by the chaos engine.
40#[derive(Debug, Clone)]
41pub enum EngineEvent {
42    /// Engine started.
43    Started,
44
45    /// Engine stopped.
46    Stopped,
47
48    /// Engine paused.
49    Paused,
50
51    /// Engine resumed.
52    Resumed,
53
54    /// Fault registered.
55    FaultRegistered { fault_id: String },
56
57    /// Fault unregistered.
58    FaultUnregistered { fault_id: String },
59
60    /// Fault activated.
61    FaultActivated { fault_id: String, target: String },
62
63    /// Fault deactivated.
64    FaultDeactivated { fault_id: String, target: String },
65
66    /// Fault applied.
67    FaultApplied {
68        fault_id: String,
69        target: String,
70        behavior: FaultBehavior,
71    },
72
73    /// Schedule event.
74    ScheduleEvent(ChaosEvent),
75
76    /// Error occurred.
77    Error { message: String },
78}
79
80// =============================================================================
81// Chaos Engine
82// =============================================================================
83
84/// Central orchestrator for chaos engineering.
85///
86/// The `ChaosEngine` manages fault registration, activation, and application.
87/// It coordinates between the fault registry and scheduler.
88///
89/// # Example
90///
91/// ```rust,ignore
92/// use mabi_chaos::prelude::*;
93///
94/// // Create engine
95/// let engine = ChaosEngine::builder()
96///     .add_fault("latency", latency_fault)
97///     .add_fault("loss", loss_fault)
98///     .build();
99///
100/// // Start the engine
101/// engine.start().await?;
102///
103/// // Enable faults for targets
104/// engine.enable("latency", "device-*").await?;
105///
106/// // Process a request through chaos
107/// let behavior = engine.process(&mut context).await?;
108/// ```
109#[derive(Debug)]
110pub struct ChaosEngine {
111    /// Fault registry.
112    registry: Arc<FaultRegistry>,
113
114    /// Optional scheduler.
115    scheduler: Arc<RwLock<Option<ChaosScheduler>>>,
116
117    /// Engine state.
118    state: Arc<RwLock<EngineState>>,
119
120    /// Event broadcaster.
121    event_tx: broadcast::Sender<EngineEvent>,
122
123    /// Start time.
124    started_at: Arc<RwLock<Option<Instant>>>,
125
126    /// Whether to continue on fault errors.
127    continue_on_error: bool,
128}
129
130impl ChaosEngine {
131    /// Create a new chaos engine.
132    pub fn new() -> Self {
133        let (event_tx, _) = broadcast::channel(1024);
134        Self {
135            registry: Arc::new(FaultRegistry::new()),
136            scheduler: Arc::new(RwLock::new(None)),
137            state: Arc::new(RwLock::new(EngineState::Stopped)),
138            event_tx,
139            started_at: Arc::new(RwLock::new(None)),
140            continue_on_error: true,
141        }
142    }
143
144    /// Create a builder.
145    pub fn builder() -> ChaosEngineBuilder {
146        ChaosEngineBuilder::new()
147    }
148
149    /// Get the fault registry.
150    pub fn registry(&self) -> &FaultRegistry {
151        &self.registry
152    }
153
154    /// Get current state.
155    pub fn state(&self) -> EngineState {
156        *self.state.read()
157    }
158
159    /// Check if running.
160    pub fn is_running(&self) -> bool {
161        *self.state.read() == EngineState::Running
162    }
163
164    /// Subscribe to events.
165    pub fn subscribe(&self) -> broadcast::Receiver<EngineEvent> {
166        self.event_tx.subscribe()
167    }
168
169    /// Start the engine.
170    pub async fn start(&self) -> ChaosResult<()> {
171        let mut state = self.state.write();
172        if *state == EngineState::Running {
173            return Err(ChaosError::EngineAlreadyRunning);
174        }
175
176        *state = EngineState::Running;
177        *self.started_at.write() = Some(Instant::now());
178
179        // Start scheduler if present
180        if let Some(ref mut scheduler) = *self.scheduler.write() {
181            scheduler.start();
182        }
183
184        let _ = self.event_tx.send(EngineEvent::Started);
185        tracing::info!("Chaos engine started");
186
187        Ok(())
188    }
189
190    /// Stop the engine.
191    pub async fn stop(&self) -> ChaosResult<()> {
192        let mut state = self.state.write();
193        if *state == EngineState::Stopped {
194            return Err(ChaosError::EngineNotRunning);
195        }
196
197        // Stop scheduler
198        if let Some(ref mut scheduler) = *self.scheduler.write() {
199            scheduler.stop();
200        }
201
202        *state = EngineState::Stopped;
203        *self.started_at.write() = None;
204
205        // Reset all faults
206        self.registry.reset_all();
207
208        let _ = self.event_tx.send(EngineEvent::Stopped);
209        tracing::info!("Chaos engine stopped");
210
211        Ok(())
212    }
213
214    /// Pause the engine.
215    pub fn pause(&self) -> ChaosResult<()> {
216        let mut state = self.state.write();
217        if *state != EngineState::Running {
218            return Err(ChaosError::EngineNotRunning);
219        }
220
221        *state = EngineState::Paused;
222        let _ = self.event_tx.send(EngineEvent::Paused);
223        tracing::info!("Chaos engine paused");
224
225        Ok(())
226    }
227
228    /// Resume the engine.
229    pub fn resume(&self) -> ChaosResult<()> {
230        let mut state = self.state.write();
231        if *state != EngineState::Paused {
232            return Err(ChaosError::InvalidStateTransition {
233                from: format!("{:?}", *state),
234                to: "Running".to_string(),
235            });
236        }
237
238        *state = EngineState::Running;
239        let _ = self.event_tx.send(EngineEvent::Resumed);
240        tracing::info!("Chaos engine resumed");
241
242        Ok(())
243    }
244
245    /// Register a fault.
246    pub fn register(&self, id: impl Into<String>, fault: BoxedFault) -> ChaosResult<()> {
247        let id = id.into();
248        self.registry.register(&id, fault)?;
249        let _ = self.event_tx.send(EngineEvent::FaultRegistered {
250            fault_id: id,
251        });
252        Ok(())
253    }
254
255    /// Unregister a fault.
256    pub fn unregister(&self, id: &str) -> ChaosResult<BoxedFault> {
257        let fault = self.registry.unregister(id)?;
258        let _ = self.event_tx.send(EngineEvent::FaultUnregistered {
259            fault_id: id.to_string(),
260        });
261        Ok(fault)
262    }
263
264    /// Enable a fault for a target.
265    pub async fn enable(&self, fault_id: &str, target: impl Into<String>) -> ChaosResult<()> {
266        let target = target.into();
267        self.registry.activate(fault_id, &target)?;
268        let _ = self.event_tx.send(EngineEvent::FaultActivated {
269            fault_id: fault_id.to_string(),
270            target,
271        });
272        Ok(())
273    }
274
275    /// Disable a fault for a target.
276    pub async fn disable(&self, fault_id: &str, target: &str) -> ChaosResult<()> {
277        self.registry.deactivate(fault_id, target)?;
278        let _ = self.event_tx.send(EngineEvent::FaultDeactivated {
279            fault_id: fault_id.to_string(),
280            target: target.to_string(),
281        });
282        Ok(())
283    }
284
285    /// Enable a fault globally.
286    pub async fn enable_globally(&self, fault_id: &str) -> ChaosResult<()> {
287        self.registry.activate_globally(fault_id)?;
288        let _ = self.event_tx.send(EngineEvent::FaultActivated {
289            fault_id: fault_id.to_string(),
290            target: "*".to_string(),
291        });
292        Ok(())
293    }
294
295    /// Disable a fault globally.
296    pub async fn disable_globally(&self, fault_id: &str) -> ChaosResult<()> {
297        self.registry.deactivate_globally(fault_id)?;
298        let _ = self.event_tx.send(EngineEvent::FaultDeactivated {
299            fault_id: fault_id.to_string(),
300            target: "*".to_string(),
301        });
302        Ok(())
303    }
304
305    /// Set a schedule.
306    pub fn set_schedule(&self, schedule: ChaosSchedule) {
307        *self.scheduler.write() = Some(ChaosScheduler::new(schedule));
308    }
309
310    /// Process scheduler tick.
311    pub fn tick_scheduler(&self) -> Vec<ChaosEvent> {
312        if let Some(ref mut scheduler) = *self.scheduler.write() {
313            let events = scheduler.tick();
314            for event in &events {
315                let _ = self.event_tx.send(EngineEvent::ScheduleEvent(event.clone()));
316            }
317            events
318        } else {
319            Vec::new()
320        }
321    }
322
323    /// Process a request through chaos injection.
324    ///
325    /// This is the main entry point for applying faults.
326    pub async fn process(&self, ctx: &mut FaultContext) -> ChaosResult<FaultBehavior> {
327        // Check state
328        if *self.state.read() != EngineState::Running {
329            return Ok(FaultBehavior::Continue);
330        }
331
332        // Get active faults for this target
333        let active_fault_ids = self.registry.active_for(ctx.target.identifier());
334
335        let mut final_behavior = FaultBehavior::Continue;
336
337        for fault_id in active_fault_ids {
338            if ctx.skip_remaining {
339                break;
340            }
341
342            if let Some(entry) = self.registry.get(&fault_id) {
343                // Check if should activate
344                let should_activate = match entry.fault.should_activate(ctx).await {
345                    Ok(v) => v,
346                    Err(e) => {
347                        if !self.continue_on_error {
348                            return Err(e);
349                        }
350                        let _ = self.event_tx.send(EngineEvent::Error {
351                            message: e.to_string(),
352                        });
353                        continue;
354                    }
355                };
356
357                if should_activate {
358                    // Apply the fault
359                    let behavior = match entry.fault.before_operation(ctx).await {
360                        Ok(b) => b,
361                        Err(e) => {
362                            if !self.continue_on_error {
363                                return Err(e);
364                            }
365                            let _ = self.event_tx.send(EngineEvent::Error {
366                                message: e.to_string(),
367                            });
368                            continue;
369                        }
370                    };
371
372                    // Emit event
373                    let _ = self.event_tx.send(EngineEvent::FaultApplied {
374                        fault_id: fault_id.clone(),
375                        target: ctx.target.device_id.clone(),
376                        behavior: behavior.clone(),
377                    });
378
379                    // Determine final behavior (most severe wins)
380                    final_behavior = merge_behaviors(final_behavior, behavior);
381
382                    // Check if should stop processing
383                    if !final_behavior.should_proceed() {
384                        break;
385                    }
386                }
387            }
388        }
389
390        Ok(final_behavior)
391    }
392
393    /// Process after operation (for response modification).
394    pub async fn process_after(&self, ctx: &mut FaultContext) -> ChaosResult<()> {
395        if *self.state.read() != EngineState::Running {
396            return Ok(());
397        }
398
399        ctx.transition_to_after();
400
401        let active_fault_ids = self.registry.active_for(ctx.target.identifier());
402
403        for fault_id in active_fault_ids {
404            if let Some(entry) = self.registry.get(&fault_id) {
405                if let Err(e) = entry.fault.after_operation(ctx).await {
406                    if !self.continue_on_error {
407                        return Err(e);
408                    }
409                    let _ = self.event_tx.send(EngineEvent::Error {
410                        message: e.to_string(),
411                    });
412                }
413            }
414        }
415
416        Ok(())
417    }
418
419    /// Get statistics for all faults.
420    pub fn statistics(&self) -> Vec<(String, FaultStatistics)> {
421        self.registry
422            .ids()
423            .into_iter()
424            .filter_map(|id| {
425                self.registry.get(&id).map(|entry| (id, entry.fault.statistics()))
426            })
427            .collect()
428    }
429}
430
431impl Default for ChaosEngine {
432    fn default() -> Self {
433        Self::new()
434    }
435}
436
437/// Merge two fault behaviors, keeping the most severe.
438fn merge_behaviors(a: FaultBehavior, b: FaultBehavior) -> FaultBehavior {
439    use FaultBehavior::*;
440
441    match (&a, &b) {
442        // Abort always wins
443        (Abort { .. }, _) => a,
444        (_, Abort { .. }) => b,
445
446        // Skip beats delay and continue
447        (Skip, _) => a,
448        (_, Skip) => b,
449
450        // ReturnError beats delay and continue
451        (ReturnError { .. }, _) => a,
452        (_, ReturnError { .. }) => b,
453
454        // Delay - take longer delay
455        (Delay { duration_ms: d1 }, Delay { duration_ms: d2 }) => {
456            Delay {
457                duration_ms: (*d1).max(*d2),
458            }
459        }
460        (Delay { .. }, _) => a,
461        (_, Delay { .. }) => b,
462
463        // Modify beats continue
464        (Modify, _) => a,
465        (_, Modify) => b,
466
467        // Continue is default
468        _ => Continue,
469    }
470}
471
472// =============================================================================
473// Builder
474// =============================================================================
475
476/// Builder for chaos engine.
477#[derive(Debug, Default)]
478pub struct ChaosEngineBuilder {
479    faults: Vec<(String, BoxedFault)>,
480    schedule: Option<ChaosSchedule>,
481    continue_on_error: bool,
482}
483
484impl ChaosEngineBuilder {
485    /// Create a new builder.
486    pub fn new() -> Self {
487        Self {
488            faults: Vec::new(),
489            schedule: None,
490            continue_on_error: true,
491        }
492    }
493
494    /// Add a fault.
495    pub fn add_fault(mut self, id: impl Into<String>, fault: impl Fault + 'static) -> Self {
496        self.faults.push((id.into(), Box::new(fault)));
497        self
498    }
499
500    /// Add a boxed fault.
501    pub fn add_boxed_fault(mut self, id: impl Into<String>, fault: BoxedFault) -> Self {
502        self.faults.push((id.into(), fault));
503        self
504    }
505
506    /// Set a schedule.
507    pub fn schedule(mut self, schedule: ChaosSchedule) -> Self {
508        self.schedule = Some(schedule);
509        self
510    }
511
512    /// Set whether to continue on error.
513    pub fn continue_on_error(mut self, continue_on_error: bool) -> Self {
514        self.continue_on_error = continue_on_error;
515        self
516    }
517
518    /// Build the engine.
519    pub fn build(self) -> ChaosEngine {
520        let (event_tx, _) = broadcast::channel(1024);
521        let registry = Arc::new(FaultRegistry::new());
522
523        for (id, fault) in self.faults {
524            let _ = registry.register(id, fault);
525        }
526
527        let scheduler = self.schedule.map(ChaosScheduler::new);
528
529        ChaosEngine {
530            registry,
531            scheduler: Arc::new(RwLock::new(scheduler)),
532            state: Arc::new(RwLock::new(EngineState::Stopped)),
533            event_tx,
534            started_at: Arc::new(RwLock::new(None)),
535            continue_on_error: self.continue_on_error,
536        }
537    }
538}
539
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::{OperationType, TargetInfo};
    use crate::network::NetworkLatencyFault;
    use mabi_core::Protocol;

    /// Build a read request against `device` for the activation tests.
    fn read_ctx(device: &str) -> FaultContext {
        FaultContext::new(
            TargetInfo::device(device),
            OperationType::Read {
                point_id: "temp".to_string(),
            },
            Protocol::ModbusTcp,
        )
    }

    #[tokio::test]
    async fn test_engine_lifecycle() {
        let engine = ChaosEngine::new();

        assert_eq!(engine.state(), EngineState::Stopped);

        engine.start().await.unwrap();
        assert_eq!(engine.state(), EngineState::Running);

        engine.pause().unwrap();
        assert_eq!(engine.state(), EngineState::Paused);

        engine.resume().unwrap();
        assert_eq!(engine.state(), EngineState::Running);

        engine.stop().await.unwrap();
        assert_eq!(engine.state(), EngineState::Stopped);
    }

    #[tokio::test]
    async fn test_invalid_transitions() {
        let engine = ChaosEngine::new();

        // Nothing to pause, resume or stop while stopped.
        assert!(matches!(engine.pause(), Err(ChaosError::EngineNotRunning)));
        assert!(matches!(
            engine.resume(),
            Err(ChaosError::InvalidStateTransition { .. })
        ));
        assert!(matches!(engine.stop().await, Err(ChaosError::EngineNotRunning)));
        assert_eq!(engine.state(), EngineState::Stopped);

        engine.start().await.unwrap();
        assert!(matches!(
            engine.start().await,
            Err(ChaosError::EngineAlreadyRunning)
        ));
        // Resuming an engine that is not paused is rejected.
        assert!(matches!(
            engine.resume(),
            Err(ChaosError::InvalidStateTransition { .. })
        ));
        assert_eq!(engine.state(), EngineState::Running);
    }

    #[tokio::test]
    async fn test_fault_registration() {
        let engine = ChaosEngine::new();

        let fault = NetworkLatencyFault::builder()
            .id("test")
            .base_ms(100)
            .build();

        engine.register("test", Box::new(fault)).unwrap();
        assert!(engine.registry().contains("test"));

        engine.unregister("test").unwrap();
        assert!(!engine.registry().contains("test"));
    }

    #[tokio::test]
    async fn test_fault_activation() {
        let engine = ChaosEngine::new();

        let fault = NetworkLatencyFault::builder()
            .id("test")
            .base_ms(10)
            .build();

        engine.register("test", Box::new(fault)).unwrap();
        engine.start().await.unwrap();
        engine.enable("test", "device-001").await.unwrap();

        let mut ctx = read_ctx("device-001");
        engine.process(&mut ctx).await.unwrap();
        assert!(ctx.was_affected());
    }

    #[tokio::test]
    async fn test_paused_engine_passes_through() {
        let engine = ChaosEngine::new();

        let fault = NetworkLatencyFault::builder()
            .id("test")
            .base_ms(10)
            .build();

        engine.register("test", Box::new(fault)).unwrap();
        engine.start().await.unwrap();
        engine.enable("test", "device-001").await.unwrap();
        engine.pause().unwrap();

        let mut ctx = read_ctx("device-001");
        let behavior = engine.process(&mut ctx).await.unwrap();
        assert!(matches!(behavior, FaultBehavior::Continue));
    }

    #[test]
    fn test_behavior_merging() {
        use FaultBehavior::*;

        // Abort wins
        assert!(matches!(
            merge_behaviors(Abort { error: "a".into() }, Continue),
            Abort { .. }
        ));
        assert!(matches!(
            merge_behaviors(Skip, Abort { error: "a".into() }),
            Abort { .. }
        ));

        // Skip beats delay
        assert!(matches!(
            merge_behaviors(Delay { duration_ms: 100 }, Skip),
            Skip
        ));

        // Longer delay wins
        match merge_behaviors(Delay { duration_ms: 100 }, Delay { duration_ms: 200 }) {
            Delay { duration_ms } => assert_eq!(duration_ms, 200),
            _ => panic!("Expected Delay"),
        }

        // Modify beats continue, in either order
        assert!(matches!(merge_behaviors(Continue, Modify), Modify));
        assert!(matches!(merge_behaviors(Modify, Continue), Modify));

        // Nothing to merge
        assert!(matches!(merge_behaviors(Continue, Continue), Continue));
    }
}