1use std::sync::Arc;
4use std::time::Instant;
5
6use parking_lot::RwLock;
7use serde::{Deserialize, Serialize};
8use tokio::sync::broadcast;
9
10use crate::context::FaultContext;
11use crate::error::{ChaosError, ChaosResult};
12use crate::fault::{BoxedFault, Fault, FaultBehavior, FaultStatistics};
13use crate::registry::{FaultFilter, FaultRegistry};
14use crate::scheduler::{ChaosEvent, ChaosSchedule, ChaosScheduler, ChaosType};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
22#[serde(rename_all = "lowercase")]
23pub enum EngineState {
24 #[default]
26 Stopped,
27
28 Running,
30
31 Paused,
33}
34
35#[derive(Debug, Clone)]
41pub enum EngineEvent {
42 Started,
44
45 Stopped,
47
48 Paused,
50
51 Resumed,
53
54 FaultRegistered { fault_id: String },
56
57 FaultUnregistered { fault_id: String },
59
60 FaultActivated { fault_id: String, target: String },
62
63 FaultDeactivated { fault_id: String, target: String },
65
66 FaultApplied {
68 fault_id: String,
69 target: String,
70 behavior: FaultBehavior,
71 },
72
73 ScheduleEvent(ChaosEvent),
75
76 Error { message: String },
78}
79
80#[derive(Debug)]
110pub struct ChaosEngine {
111 registry: Arc<FaultRegistry>,
113
114 scheduler: Arc<RwLock<Option<ChaosScheduler>>>,
116
117 state: Arc<RwLock<EngineState>>,
119
120 event_tx: broadcast::Sender<EngineEvent>,
122
123 started_at: Arc<RwLock<Option<Instant>>>,
125
126 continue_on_error: bool,
128}
129
130impl ChaosEngine {
131 pub fn new() -> Self {
133 let (event_tx, _) = broadcast::channel(1024);
134 Self {
135 registry: Arc::new(FaultRegistry::new()),
136 scheduler: Arc::new(RwLock::new(None)),
137 state: Arc::new(RwLock::new(EngineState::Stopped)),
138 event_tx,
139 started_at: Arc::new(RwLock::new(None)),
140 continue_on_error: true,
141 }
142 }
143
144 pub fn builder() -> ChaosEngineBuilder {
146 ChaosEngineBuilder::new()
147 }
148
149 pub fn registry(&self) -> &FaultRegistry {
151 &self.registry
152 }
153
154 pub fn state(&self) -> EngineState {
156 *self.state.read()
157 }
158
159 pub fn is_running(&self) -> bool {
161 *self.state.read() == EngineState::Running
162 }
163
164 pub fn subscribe(&self) -> broadcast::Receiver<EngineEvent> {
166 self.event_tx.subscribe()
167 }
168
169 pub async fn start(&self) -> ChaosResult<()> {
171 let mut state = self.state.write();
172 if *state == EngineState::Running {
173 return Err(ChaosError::EngineAlreadyRunning);
174 }
175
176 *state = EngineState::Running;
177 *self.started_at.write() = Some(Instant::now());
178
179 if let Some(ref mut scheduler) = *self.scheduler.write() {
181 scheduler.start();
182 }
183
184 let _ = self.event_tx.send(EngineEvent::Started);
185 tracing::info!("Chaos engine started");
186
187 Ok(())
188 }
189
190 pub async fn stop(&self) -> ChaosResult<()> {
192 let mut state = self.state.write();
193 if *state == EngineState::Stopped {
194 return Err(ChaosError::EngineNotRunning);
195 }
196
197 if let Some(ref mut scheduler) = *self.scheduler.write() {
199 scheduler.stop();
200 }
201
202 *state = EngineState::Stopped;
203 *self.started_at.write() = None;
204
205 self.registry.reset_all();
207
208 let _ = self.event_tx.send(EngineEvent::Stopped);
209 tracing::info!("Chaos engine stopped");
210
211 Ok(())
212 }
213
214 pub fn pause(&self) -> ChaosResult<()> {
216 let mut state = self.state.write();
217 if *state != EngineState::Running {
218 return Err(ChaosError::EngineNotRunning);
219 }
220
221 *state = EngineState::Paused;
222 let _ = self.event_tx.send(EngineEvent::Paused);
223 tracing::info!("Chaos engine paused");
224
225 Ok(())
226 }
227
228 pub fn resume(&self) -> ChaosResult<()> {
230 let mut state = self.state.write();
231 if *state != EngineState::Paused {
232 return Err(ChaosError::InvalidStateTransition {
233 from: format!("{:?}", *state),
234 to: "Running".to_string(),
235 });
236 }
237
238 *state = EngineState::Running;
239 let _ = self.event_tx.send(EngineEvent::Resumed);
240 tracing::info!("Chaos engine resumed");
241
242 Ok(())
243 }
244
245 pub fn register(&self, id: impl Into<String>, fault: BoxedFault) -> ChaosResult<()> {
247 let id = id.into();
248 self.registry.register(&id, fault)?;
249 let _ = self.event_tx.send(EngineEvent::FaultRegistered {
250 fault_id: id,
251 });
252 Ok(())
253 }
254
255 pub fn unregister(&self, id: &str) -> ChaosResult<BoxedFault> {
257 let fault = self.registry.unregister(id)?;
258 let _ = self.event_tx.send(EngineEvent::FaultUnregistered {
259 fault_id: id.to_string(),
260 });
261 Ok(fault)
262 }
263
264 pub async fn enable(&self, fault_id: &str, target: impl Into<String>) -> ChaosResult<()> {
266 let target = target.into();
267 self.registry.activate(fault_id, &target)?;
268 let _ = self.event_tx.send(EngineEvent::FaultActivated {
269 fault_id: fault_id.to_string(),
270 target,
271 });
272 Ok(())
273 }
274
275 pub async fn disable(&self, fault_id: &str, target: &str) -> ChaosResult<()> {
277 self.registry.deactivate(fault_id, target)?;
278 let _ = self.event_tx.send(EngineEvent::FaultDeactivated {
279 fault_id: fault_id.to_string(),
280 target: target.to_string(),
281 });
282 Ok(())
283 }
284
285 pub async fn enable_globally(&self, fault_id: &str) -> ChaosResult<()> {
287 self.registry.activate_globally(fault_id)?;
288 let _ = self.event_tx.send(EngineEvent::FaultActivated {
289 fault_id: fault_id.to_string(),
290 target: "*".to_string(),
291 });
292 Ok(())
293 }
294
295 pub async fn disable_globally(&self, fault_id: &str) -> ChaosResult<()> {
297 self.registry.deactivate_globally(fault_id)?;
298 let _ = self.event_tx.send(EngineEvent::FaultDeactivated {
299 fault_id: fault_id.to_string(),
300 target: "*".to_string(),
301 });
302 Ok(())
303 }
304
305 pub fn set_schedule(&self, schedule: ChaosSchedule) {
307 *self.scheduler.write() = Some(ChaosScheduler::new(schedule));
308 }
309
310 pub fn tick_scheduler(&self) -> Vec<ChaosEvent> {
312 if let Some(ref mut scheduler) = *self.scheduler.write() {
313 let events = scheduler.tick();
314 for event in &events {
315 let _ = self.event_tx.send(EngineEvent::ScheduleEvent(event.clone()));
316 }
317 events
318 } else {
319 Vec::new()
320 }
321 }
322
323 pub async fn process(&self, ctx: &mut FaultContext) -> ChaosResult<FaultBehavior> {
327 if *self.state.read() != EngineState::Running {
329 return Ok(FaultBehavior::Continue);
330 }
331
332 let active_fault_ids = self.registry.active_for(ctx.target.identifier());
334
335 let mut final_behavior = FaultBehavior::Continue;
336
337 for fault_id in active_fault_ids {
338 if ctx.skip_remaining {
339 break;
340 }
341
342 if let Some(entry) = self.registry.get(&fault_id) {
343 let should_activate = match entry.fault.should_activate(ctx).await {
345 Ok(v) => v,
346 Err(e) => {
347 if !self.continue_on_error {
348 return Err(e);
349 }
350 let _ = self.event_tx.send(EngineEvent::Error {
351 message: e.to_string(),
352 });
353 continue;
354 }
355 };
356
357 if should_activate {
358 let behavior = match entry.fault.before_operation(ctx).await {
360 Ok(b) => b,
361 Err(e) => {
362 if !self.continue_on_error {
363 return Err(e);
364 }
365 let _ = self.event_tx.send(EngineEvent::Error {
366 message: e.to_string(),
367 });
368 continue;
369 }
370 };
371
372 let _ = self.event_tx.send(EngineEvent::FaultApplied {
374 fault_id: fault_id.clone(),
375 target: ctx.target.device_id.clone(),
376 behavior: behavior.clone(),
377 });
378
379 final_behavior = merge_behaviors(final_behavior, behavior);
381
382 if !final_behavior.should_proceed() {
384 break;
385 }
386 }
387 }
388 }
389
390 Ok(final_behavior)
391 }
392
393 pub async fn process_after(&self, ctx: &mut FaultContext) -> ChaosResult<()> {
395 if *self.state.read() != EngineState::Running {
396 return Ok(());
397 }
398
399 ctx.transition_to_after();
400
401 let active_fault_ids = self.registry.active_for(ctx.target.identifier());
402
403 for fault_id in active_fault_ids {
404 if let Some(entry) = self.registry.get(&fault_id) {
405 if let Err(e) = entry.fault.after_operation(ctx).await {
406 if !self.continue_on_error {
407 return Err(e);
408 }
409 let _ = self.event_tx.send(EngineEvent::Error {
410 message: e.to_string(),
411 });
412 }
413 }
414 }
415
416 Ok(())
417 }
418
419 pub fn statistics(&self) -> Vec<(String, FaultStatistics)> {
421 self.registry
422 .ids()
423 .into_iter()
424 .filter_map(|id| {
425 self.registry.get(&id).map(|entry| (id, entry.fault.statistics()))
426 })
427 .collect()
428 }
429}
430
431impl Default for ChaosEngine {
432 fn default() -> Self {
433 Self::new()
434 }
435}
436
437fn merge_behaviors(a: FaultBehavior, b: FaultBehavior) -> FaultBehavior {
439 use FaultBehavior::*;
440
441 match (&a, &b) {
442 (Abort { .. }, _) => a,
444 (_, Abort { .. }) => b,
445
446 (Skip, _) => a,
448 (_, Skip) => b,
449
450 (ReturnError { .. }, _) => a,
452 (_, ReturnError { .. }) => b,
453
454 (Delay { duration_ms: d1 }, Delay { duration_ms: d2 }) => {
456 Delay {
457 duration_ms: (*d1).max(*d2),
458 }
459 }
460 (Delay { .. }, _) => a,
461 (_, Delay { .. }) => b,
462
463 (Modify, _) => a,
465 (_, Modify) => b,
466
467 _ => Continue,
469 }
470}
471
472#[derive(Debug, Default)]
478pub struct ChaosEngineBuilder {
479 faults: Vec<(String, BoxedFault)>,
480 schedule: Option<ChaosSchedule>,
481 continue_on_error: bool,
482}
483
484impl ChaosEngineBuilder {
485 pub fn new() -> Self {
487 Self {
488 faults: Vec::new(),
489 schedule: None,
490 continue_on_error: true,
491 }
492 }
493
494 pub fn add_fault(mut self, id: impl Into<String>, fault: impl Fault + 'static) -> Self {
496 self.faults.push((id.into(), Box::new(fault)));
497 self
498 }
499
500 pub fn add_boxed_fault(mut self, id: impl Into<String>, fault: BoxedFault) -> Self {
502 self.faults.push((id.into(), fault));
503 self
504 }
505
506 pub fn schedule(mut self, schedule: ChaosSchedule) -> Self {
508 self.schedule = Some(schedule);
509 self
510 }
511
512 pub fn continue_on_error(mut self, continue_on_error: bool) -> Self {
514 self.continue_on_error = continue_on_error;
515 self
516 }
517
518 pub fn build(self) -> ChaosEngine {
520 let (event_tx, _) = broadcast::channel(1024);
521 let registry = Arc::new(FaultRegistry::new());
522
523 for (id, fault) in self.faults {
524 let _ = registry.register(id, fault);
525 }
526
527 let scheduler = self.schedule.map(ChaosScheduler::new);
528
529 ChaosEngine {
530 registry,
531 scheduler: Arc::new(RwLock::new(scheduler)),
532 state: Arc::new(RwLock::new(EngineState::Stopped)),
533 event_tx,
534 started_at: Arc::new(RwLock::new(None)),
535 continue_on_error: self.continue_on_error,
536 }
537 }
538}
539
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::{OperationType, TargetInfo};
    use crate::network::NetworkLatencyFault;
    use mabi_core::Protocol;

    /// Builds a Modbus TCP read context targeting `device`.
    fn read_context(device: &str) -> FaultContext {
        FaultContext::new(
            TargetInfo::device(device),
            OperationType::Read {
                point_id: "temp".to_string(),
            },
            Protocol::ModbusTcp,
        )
    }

    #[tokio::test]
    async fn test_engine_lifecycle() {
        let engine = ChaosEngine::new();

        assert_eq!(engine.state(), EngineState::Stopped);

        engine.start().await.unwrap();
        assert_eq!(engine.state(), EngineState::Running);

        engine.pause().unwrap();
        assert_eq!(engine.state(), EngineState::Paused);

        engine.resume().unwrap();
        assert_eq!(engine.state(), EngineState::Running);

        engine.stop().await.unwrap();
        assert_eq!(engine.state(), EngineState::Stopped);
    }

    #[tokio::test]
    async fn test_invalid_transitions() {
        let engine = ChaosEngine::new();

        // Nothing but `start` is valid from the stopped state.
        assert!(engine.stop().await.is_err());
        assert!(engine.pause().is_err());
        assert!(engine.resume().is_err());

        engine.start().await.unwrap();
        assert!(matches!(
            engine.start().await,
            Err(ChaosError::EngineAlreadyRunning)
        ));
        assert!(engine.resume().is_err());
        assert_eq!(engine.state(), EngineState::Running);
    }

    #[tokio::test]
    async fn test_lifecycle_events_are_broadcast() {
        let engine = ChaosEngine::new();
        let mut events = engine.subscribe();

        engine.start().await.unwrap();
        engine.stop().await.unwrap();

        assert!(matches!(events.try_recv(), Ok(EngineEvent::Started)));
        assert!(matches!(events.try_recv(), Ok(EngineEvent::Stopped)));
    }

    #[tokio::test]
    async fn test_fault_registration() {
        let engine = ChaosEngine::new();

        let fault = NetworkLatencyFault::builder()
            .id("test")
            .base_ms(100)
            .build();

        engine.register("test", Box::new(fault)).unwrap();
        assert!(engine.registry().contains("test"));

        engine.unregister("test").unwrap();
        assert!(!engine.registry().contains("test"));
    }

    #[tokio::test]
    async fn test_fault_activation() {
        let engine = ChaosEngine::new();

        let fault = NetworkLatencyFault::builder()
            .id("test")
            .base_ms(10)
            .build();

        engine.register("test", Box::new(fault)).unwrap();
        engine.start().await.unwrap();
        engine.enable("test", "device-001").await.unwrap();

        let mut ctx = read_context("device-001");

        engine.process(&mut ctx).await.unwrap();
        assert!(ctx.was_affected());
    }

    #[tokio::test]
    async fn test_process_is_passthrough_when_stopped() {
        let engine = ChaosEngine::new();

        let fault = NetworkLatencyFault::builder()
            .id("test")
            .base_ms(10)
            .build();

        engine.register("test", Box::new(fault)).unwrap();
        engine.enable("test", "device-001").await.unwrap();

        let mut ctx = read_context("device-001");

        let behavior = engine.process(&mut ctx).await.unwrap();
        assert!(matches!(behavior, FaultBehavior::Continue));
        assert!(!ctx.was_affected());
    }

    #[test]
    fn test_behavior_merging() {
        use FaultBehavior::*;

        assert!(matches!(
            merge_behaviors(Abort { error: "a".into() }, Continue),
            Abort { .. }
        ));

        assert!(matches!(
            merge_behaviors(Delay { duration_ms: 100 }, Skip),
            Skip
        ));

        match merge_behaviors(Delay { duration_ms: 100 }, Delay { duration_ms: 200 }) {
            Delay { duration_ms } => assert_eq!(duration_ms, 200),
            _ => panic!("Expected Delay"),
        }

        assert!(matches!(merge_behaviors(Continue, Continue), Continue));
        assert!(matches!(merge_behaviors(Continue, Modify), Modify));
        assert!(matches!(
            merge_behaviors(Modify, Delay { duration_ms: 5 }),
            Delay { duration_ms: 5 }
        ));
    }
}