1pub mod config;
53pub mod stats;
54pub mod targeting;
55
56pub mod crc_corruption;
58pub mod delayed_response;
59pub mod exception_injection;
60pub mod extra_data;
61pub mod no_response;
62pub mod partial_frame;
63pub mod truncated_response;
64pub mod wrong_function_code;
65pub mod wrong_transaction_id;
66pub mod wrong_unit_id;
67
68pub mod connection_disruption;
70pub mod rtu_timing;
71
72use std::sync::Arc;
73use std::time::Duration;
74
75use tracing::{debug, trace};
76
77pub use config::*;
78pub use stats::{FaultStats, FaultStatsSnapshot};
79pub use targeting::FaultTarget;
80
81pub use crc_corruption::CrcCorruptionFault;
83pub use delayed_response::DelayedResponseFault;
84pub use exception_injection::ExceptionInjectionFault;
85pub use extra_data::ExtraDataFault;
86pub use no_response::NoResponseFault;
87pub use partial_frame::PartialFrameFault;
88pub use truncated_response::TruncatedResponseFault;
89pub use wrong_function_code::WrongFunctionCodeFault;
90pub use wrong_transaction_id::WrongTransactionIdFault;
91pub use wrong_unit_id::WrongUnitIdFault;
92
/// What should happen to a response after fault injection has run.
///
/// Individual faults return one of these from `ModbusFault::apply`, and
/// `FaultPipeline::apply` folds the actions of all activated faults into a
/// single final action.
///
/// The type is a plain value (byte buffers, a duration, an integer), so it
/// implements `PartialEq`/`Eq` to allow direct comparison in callers and tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FaultAction {
    /// Send the contained bytes as the response PDU (possibly modified by
    /// one or more faults).
    SendResponse(Vec<u8>),

    /// Send no response at all. The pipeline returns this immediately when
    /// any fault produces it.
    DropResponse,

    /// Wait for `delay`, then send `response`.
    DelayThenSend {
        /// How long to wait before sending. The pipeline sums the delays of
        /// all activated faults.
        delay: Duration,
        /// The PDU to send once the delay has elapsed.
        response: Vec<u8>,
    },

    /// Send only `bytes`. The pipeline returns this immediately when any
    /// fault produces it.
    // NOTE(review): presumably the transport sends these bytes and nothing
    // more (an incomplete frame) — confirm against the transport code.
    SendPartial {
        /// The bytes to put on the wire.
        bytes: Vec<u8>,
    },

    /// Send the contained bytes as-is.
    // NOTE(review): presumably bypasses normal framing — confirm against the
    // transport code, which is not part of this module.
    SendRawBytes(Vec<u8>),

    /// Send `response`, but with `transaction_id` in place of the request's
    /// transaction id.
    OverrideTransactionId {
        /// Transaction id to use for the outgoing response.
        transaction_id: u16,
        /// The PDU to send.
        response: Vec<u8>,
    },
}
131
/// The Modbus transport a request/response pair travels over.
///
/// Faults can restrict themselves to one transport via
/// `ModbusFault::compatible_transport`; the pipeline compares that value
/// against `ModbusFaultContext::transport`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransportKind {
    /// Modbus TCP. Contexts built with `ModbusFaultContext::tcp` carry a
    /// caller-supplied transaction id.
    Tcp,
    /// Modbus RTU. Contexts built with `ModbusFaultContext::rtu` always have
    /// `transaction_id == 0`.
    Rtu,
}
140
/// Everything a fault gets to see about one request/response exchange.
///
/// Build instances with [`ModbusFaultContext::rtu`] or
/// [`ModbusFaultContext::tcp`]; both copy the PDU slices into owned buffers.
#[derive(Debug, Clone)]
pub struct ModbusFaultContext {
    /// Transport the exchange is happening on.
    pub transport: TransportKind,

    /// Unit (slave) id of the request.
    pub unit_id: u8,

    /// Function code of the request.
    pub function_code: u8,

    /// Request PDU bytes as supplied by the caller.
    // NOTE(review): the test fixtures start this buffer with the function
    // code byte — presumably "function code + data"; confirm with callers.
    pub request_pdu: Vec<u8>,

    /// Response PDU bytes. When the pipeline chains mutation faults, each
    /// fault's `apply` sees the output of the previous fault here.
    pub response_pdu: Vec<u8>,

    /// Transaction id of the request. Always `0` for contexts built with
    /// [`ModbusFaultContext::rtu`].
    pub transaction_id: u16,

    /// Set by both constructors to `unit_id == 0`.
    pub is_broadcast: bool,

    /// Caller-supplied request number.
    // NOTE(review): presumably a running per-connection or per-server
    // counter used for "every Nth request" targeting — confirm with callers.
    pub request_number: u64,
}
172
173impl ModbusFaultContext {
174 pub fn rtu(
176 unit_id: u8,
177 function_code: u8,
178 request_pdu: &[u8],
179 response_pdu: &[u8],
180 request_number: u64,
181 ) -> Self {
182 Self {
183 transport: TransportKind::Rtu,
184 unit_id,
185 function_code,
186 request_pdu: request_pdu.to_vec(),
187 response_pdu: response_pdu.to_vec(),
188 transaction_id: 0,
189 is_broadcast: unit_id == 0,
190 request_number,
191 }
192 }
193
194 pub fn tcp(
196 unit_id: u8,
197 function_code: u8,
198 request_pdu: &[u8],
199 response_pdu: &[u8],
200 transaction_id: u16,
201 request_number: u64,
202 ) -> Self {
203 Self {
204 transport: TransportKind::Tcp,
205 unit_id,
206 function_code,
207 request_pdu: request_pdu.to_vec(),
208 response_pdu: response_pdu.to_vec(),
209 transaction_id,
210 is_broadcast: unit_id == 0,
211 request_number,
212 }
213 }
214}
215
/// A single injectable fault.
///
/// Faults are stored as `Arc<dyn ModbusFault>` inside `FaultPipeline`, and
/// every method takes `&self`, so implementations that keep mutable state
/// (enable flag, statistics) need interior mutability.
///
/// For each exchange the pipeline skips faults that are disabled or whose
/// [`compatible_transport`](ModbusFault::compatible_transport) does not match,
/// asks the remaining ones [`should_activate`](ModbusFault::should_activate),
/// and calls [`apply`](ModbusFault::apply) only on those that said yes.
pub trait ModbusFault: Send + Sync {
    /// Static identifier of this fault. Used as a log field and as the key
    /// in `FaultPipeline::all_stats`.
    fn fault_type(&self) -> &'static str;

    /// Whether this fault is currently enabled. Disabled faults are skipped
    /// by the pipeline before `should_activate` is ever called.
    fn is_enabled(&self) -> bool;

    /// Enables or disables this fault.
    fn set_enabled(&self, enabled: bool);

    /// Decides whether the fault fires for this exchange.
    ///
    /// The pipeline always passes the caller's original context here, even
    /// when earlier mutation faults have already changed the response.
    fn should_activate(&self, ctx: &ModbusFaultContext) -> bool;

    /// Produces the action for this exchange.
    ///
    /// For mutation (non-short-circuit) faults, `ctx.response_pdu` holds the
    /// response as left by the previously activated faults.
    fn apply(&self, ctx: &ModbusFaultContext) -> FaultAction;

    /// Returns a snapshot of this fault's statistics.
    fn stats(&self) -> FaultStatsSnapshot;

    /// Resets this fault's statistics.
    fn reset_stats(&self);

    /// Whether this fault short-circuits the pipeline.
    ///
    /// Short-circuit faults are evaluated before all others; the first one
    /// that activates determines the pipeline result and no further fault is
    /// consulted. Defaults to `false`.
    fn is_short_circuit(&self) -> bool {
        false
    }

    /// The only transport this fault applies to, or `None` (the default) if
    /// it applies to every transport.
    fn compatible_transport(&self) -> Option<TransportKind> {
        None
    }
}
267
/// An ordered collection of faults that is applied to each exchange.
///
/// `apply` first gives short-circuit faults a chance (first activation
/// wins), then chains all remaining activated faults in insertion order.
pub struct FaultPipeline {
    /// Registered faults, in the order they were added.
    faults: Vec<Arc<dyn ModbusFault>>,
    /// Master switch; while `false`, `apply` returns `None` without
    /// consulting any fault. Atomic so it can be toggled through `&self`.
    enabled: std::sync::atomic::AtomicBool,
}
287
288impl FaultPipeline {
289 pub fn new() -> Self {
291 Self {
292 faults: Vec::new(),
293 enabled: std::sync::atomic::AtomicBool::new(true),
294 }
295 }
296
297 pub fn with_fault(mut self, fault: Arc<dyn ModbusFault>) -> Self {
299 self.faults.push(fault);
300 self
301 }
302
303 pub fn with_faults(mut self, faults: Vec<Arc<dyn ModbusFault>>) -> Self {
305 self.faults.extend(faults);
306 self
307 }
308
309 pub fn is_enabled(&self) -> bool {
311 self.enabled.load(std::sync::atomic::Ordering::Acquire)
312 }
313
314 pub fn set_enabled(&self, enabled: bool) {
316 self.enabled
317 .store(enabled, std::sync::atomic::Ordering::Release);
318 }
319
320 pub fn len(&self) -> usize {
322 self.faults.len()
323 }
324
325 pub fn is_empty(&self) -> bool {
327 self.faults.is_empty()
328 }
329
330 pub fn apply(&self, ctx: &ModbusFaultContext) -> Option<FaultAction> {
340 if !self.is_enabled() {
341 return None;
342 }
343
344 for fault in &self.faults {
346 if !fault.is_enabled() {
347 continue;
348 }
349 if !fault.is_short_circuit() {
350 continue;
351 }
352 if let Some(required) = fault.compatible_transport() {
354 if required != ctx.transport {
355 continue;
356 }
357 }
358 if fault.should_activate(ctx) {
359 let action = fault.apply(ctx);
360 debug!(
361 fault_type = fault.fault_type(),
362 unit_id = ctx.unit_id,
363 fc = ctx.function_code,
364 "Short-circuit fault activated"
365 );
366 return Some(action);
367 }
368 }
369
370 let mut current_response = ctx.response_pdu.clone();
372 let mut any_activated = false;
373 let mut total_delay = Duration::ZERO;
374 let mut override_tid: Option<u16> = None;
375 let mut send_raw = false;
376
377 for fault in &self.faults {
378 if !fault.is_enabled() {
379 continue;
380 }
381 if fault.is_short_circuit() {
382 continue;
383 }
384 if let Some(required) = fault.compatible_transport() {
386 if required != ctx.transport {
387 continue;
388 }
389 }
390 if !fault.should_activate(ctx) {
391 continue;
392 }
393
394 let mut sub_ctx = ctx.clone();
396 sub_ctx.response_pdu = current_response.clone();
397
398 let action = fault.apply(&sub_ctx);
399 any_activated = true;
400
401 trace!(
402 fault_type = fault.fault_type(),
403 unit_id = ctx.unit_id,
404 fc = ctx.function_code,
405 "Mutation fault activated"
406 );
407
408 match action {
409 FaultAction::SendResponse(pdu) => {
410 current_response = pdu;
411 }
412 FaultAction::DelayThenSend { delay, response } => {
413 total_delay += delay;
414 current_response = response;
415 }
416 FaultAction::OverrideTransactionId {
417 transaction_id,
418 response,
419 } => {
420 override_tid = Some(transaction_id);
421 current_response = response;
422 }
423 FaultAction::SendRawBytes(bytes) => {
424 current_response = bytes;
425 send_raw = true;
426 }
427 FaultAction::DropResponse => {
429 return Some(FaultAction::DropResponse);
430 }
431 FaultAction::SendPartial { bytes } => {
432 return Some(FaultAction::SendPartial { bytes });
433 }
434 }
435 }
436
437 if !any_activated {
438 return None;
439 }
440
441 let action = if send_raw {
443 if total_delay > Duration::ZERO {
444 FaultAction::DelayThenSend {
446 delay: total_delay,
447 response: current_response,
448 }
449 } else {
450 FaultAction::SendRawBytes(current_response)
451 }
452 } else if let Some(tid) = override_tid {
453 if total_delay > Duration::ZERO {
454 FaultAction::DelayThenSend {
455 delay: total_delay,
456 response: current_response,
457 }
458 } else {
459 FaultAction::OverrideTransactionId {
460 transaction_id: tid,
461 response: current_response,
462 }
463 }
464 } else if total_delay > Duration::ZERO {
465 FaultAction::DelayThenSend {
466 delay: total_delay,
467 response: current_response,
468 }
469 } else {
470 FaultAction::SendResponse(current_response)
471 };
472
473 debug!(
474 unit_id = ctx.unit_id,
475 fc = ctx.function_code,
476 "Fault pipeline produced action"
477 );
478
479 Some(action)
480 }
481
482 pub fn all_stats(&self) -> Vec<(&'static str, FaultStatsSnapshot)> {
484 self.faults
485 .iter()
486 .map(|f| (f.fault_type(), f.stats()))
487 .collect()
488 }
489
490 pub fn reset_all_stats(&self) {
492 for fault in &self.faults {
493 fault.reset_stats();
494 }
495 }
496
497 pub fn from_config(config: &FaultInjectionConfig) -> Self {
499 let mut pipeline = Self::new();
500
501 if !config.enabled {
502 pipeline
503 .enabled
504 .store(false, std::sync::atomic::Ordering::Release);
505 return pipeline;
506 }
507
508 for fault_cfg in &config.faults {
509 let fault: Arc<dyn ModbusFault> = match fault_cfg.fault_type {
510 FaultType::CrcCorruption => Arc::new(CrcCorruptionFault::from_config(
511 &fault_cfg.config,
512 fault_cfg.target.clone(),
513 )),
514 FaultType::WrongUnitId => Arc::new(WrongUnitIdFault::from_config(
515 &fault_cfg.config,
516 fault_cfg.target.clone(),
517 )),
518 FaultType::WrongFunctionCode => Arc::new(WrongFunctionCodeFault::from_config(
519 &fault_cfg.config,
520 fault_cfg.target.clone(),
521 )),
522 FaultType::WrongTransactionId => Arc::new(WrongTransactionIdFault::from_config(
523 &fault_cfg.config,
524 fault_cfg.target.clone(),
525 )),
526 FaultType::TruncatedResponse => Arc::new(TruncatedResponseFault::from_config(
527 &fault_cfg.config,
528 fault_cfg.target.clone(),
529 )),
530 FaultType::ExtraData => Arc::new(ExtraDataFault::from_config(
531 &fault_cfg.config,
532 fault_cfg.target.clone(),
533 )),
534 FaultType::DelayedResponse => Arc::new(DelayedResponseFault::from_config(
535 &fault_cfg.config,
536 fault_cfg.target.clone(),
537 )),
538 FaultType::NoResponse => Arc::new(NoResponseFault::new(fault_cfg.target.clone())),
539 FaultType::ExceptionInjection => Arc::new(
540 ExceptionInjectionFault::from_config(&fault_cfg.config, fault_cfg.target.clone()),
541 ),
542 FaultType::PartialFrame => Arc::new(PartialFrameFault::from_config(
543 &fault_cfg.config,
544 fault_cfg.target.clone(),
545 )),
546 };
547
548 pipeline.faults.push(fault);
549 }
550
551 pipeline
552 }
553}
554
impl Default for FaultPipeline {
    /// Equivalent to [`FaultPipeline::new`]: an empty, enabled pipeline.
    fn default() -> Self {
        Self::new()
    }
}
560
561impl std::fmt::Debug for FaultPipeline {
562 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
563 f.debug_struct("FaultPipeline")
564 .field("enabled", &self.is_enabled())
565 .field("fault_count", &self.faults.len())
566 .field(
567 "faults",
568 &self
569 .faults
570 .iter()
571 .map(|f| f.fault_type())
572 .collect::<Vec<_>>(),
573 )
574 .finish()
575 }
576}
577
// Unit tests for the pipeline's two-pass evaluation, built on small mock
// faults that always activate and record their calls in `FaultStats`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Short-circuit mock: always activates and always drops the response.
    struct AlwaysDropFault {
        stats: FaultStats,
    }

    impl AlwaysDropFault {
        fn new() -> Self {
            Self {
                stats: FaultStats::new(),
            }
        }
    }

    impl ModbusFault for AlwaysDropFault {
        fn fault_type(&self) -> &'static str {
            "always_drop"
        }
        fn is_enabled(&self) -> bool {
            self.stats.is_enabled()
        }
        fn set_enabled(&self, enabled: bool) {
            self.stats.set_enabled(enabled);
        }
        fn should_activate(&self, _ctx: &ModbusFaultContext) -> bool {
            // Count every consultation so tests can tell whether the
            // pipeline asked this fault at all.
            self.stats.record_check();
            true
        }
        fn apply(&self, _ctx: &ModbusFaultContext) -> FaultAction {
            self.stats.record_activation();
            self.stats.record_affected();
            FaultAction::DropResponse
        }
        fn stats(&self) -> FaultStatsSnapshot {
            self.stats.snapshot()
        }
        fn reset_stats(&self) {
            self.stats.reset();
        }
        fn is_short_circuit(&self) -> bool {
            true
        }
    }

    /// Mutation mock: always activates and prepends `byte` to whatever
    /// response it is handed.
    struct PrependByteFault {
        byte: u8,
        stats: FaultStats,
    }

    impl PrependByteFault {
        fn new(byte: u8) -> Self {
            Self {
                byte,
                stats: FaultStats::new(),
            }
        }
    }

    impl ModbusFault for PrependByteFault {
        fn fault_type(&self) -> &'static str {
            "prepend_byte"
        }
        fn is_enabled(&self) -> bool {
            self.stats.is_enabled()
        }
        fn set_enabled(&self, enabled: bool) {
            self.stats.set_enabled(enabled);
        }
        fn should_activate(&self, _ctx: &ModbusFaultContext) -> bool {
            self.stats.record_check();
            true
        }
        fn apply(&self, ctx: &ModbusFaultContext) -> FaultAction {
            self.stats.record_activation();
            self.stats.record_affected();
            // Build on `ctx.response_pdu` so chaining is observable: the
            // pipeline passes the previous fault's output here.
            let mut response = vec![self.byte];
            response.extend_from_slice(&ctx.response_pdu);
            FaultAction::SendResponse(response)
        }
        fn stats(&self) -> FaultStatsSnapshot {
            self.stats.snapshot()
        }
        fn reset_stats(&self) {
            self.stats.reset();
        }
    }

    /// TCP context for unit 1, function code 0x03, transaction id 1,
    /// request number 1, with a 4-byte response PDU.
    fn test_ctx() -> ModbusFaultContext {
        ModbusFaultContext::tcp(1, 0x03, &[0x03, 0x00, 0x00, 0x00, 0x01], &[0x03, 0x02, 0x00, 0x64], 1, 1)
    }

    // A pipeline without faults never produces an action.
    #[test]
    fn test_empty_pipeline() {
        let pipeline = FaultPipeline::new();
        assert!(pipeline.is_empty());
        assert_eq!(pipeline.len(), 0);
        assert!(pipeline.apply(&test_ctx()).is_none());
    }

    // The master switch suppresses even an always-activating fault.
    #[test]
    fn test_pipeline_disabled() {
        let pipeline = FaultPipeline::new()
            .with_fault(Arc::new(AlwaysDropFault::new()));
        pipeline.set_enabled(false);
        assert!(pipeline.apply(&test_ctx()).is_none());
    }

    // Short-circuit faults win regardless of registration order, and the
    // mutation fault is never even consulted.
    #[test]
    fn test_short_circuit_priority() {
        let drop_fault = Arc::new(AlwaysDropFault::new());
        let prepend_fault = Arc::new(PrependByteFault::new(0xFF));

        // Mutation fault is registered first on purpose.
        let pipeline = FaultPipeline::new()
            .with_fault(prepend_fault.clone())
            .with_fault(drop_fault.clone());

        let action = pipeline.apply(&test_ctx());
        assert!(matches!(action, Some(FaultAction::DropResponse)));

        // The short-circuit fault was checked and activated exactly once.
        assert_eq!(drop_fault.stats().checks, 1);
        assert_eq!(drop_fault.stats().activations, 1);

        // The mutation pass never ran.
        assert_eq!(prepend_fault.stats().checks, 0);
    }

    // Mutation faults are chained in insertion order: the second fault
    // prepends to the output of the first.
    #[test]
    fn test_mutation_chaining() {
        let fault1 = Arc::new(PrependByteFault::new(0xAA));
        let fault2 = Arc::new(PrependByteFault::new(0xBB));

        let pipeline = FaultPipeline::new()
            .with_fault(fault1.clone())
            .with_fault(fault2.clone());

        let ctx = test_ctx();
        let action = pipeline.apply(&ctx).unwrap();

        match action {
            FaultAction::SendResponse(pdu) => {
                // Last applied fault ends up outermost.
                assert_eq!(pdu[0], 0xBB);
                assert_eq!(pdu[1], 0xAA);
                assert_eq!(&pdu[2..], &ctx.response_pdu);
            }
            _ => panic!("Expected SendResponse"),
        }
    }

    // A disabled fault is skipped before `should_activate` is called.
    #[test]
    fn test_disabled_fault_skipped() {
        let fault = Arc::new(PrependByteFault::new(0xFF));
        fault.set_enabled(false);

        let pipeline = FaultPipeline::new().with_fault(fault.clone());
        assert!(pipeline.apply(&test_ctx()).is_none());
        assert_eq!(fault.stats().checks, 0);
    }

    // `all_stats` reports one entry per fault, in pipeline order.
    #[test]
    fn test_all_stats() {
        let fault1 = Arc::new(PrependByteFault::new(0xAA));
        let fault2 = Arc::new(PrependByteFault::new(0xBB));

        let pipeline = FaultPipeline::new()
            .with_fault(fault1)
            .with_fault(fault2);

        pipeline.apply(&test_ctx());

        let stats = pipeline.all_stats();
        assert_eq!(stats.len(), 2);
        assert_eq!(stats[0].0, "prepend_byte");
        assert_eq!(stats[0].1.activations, 1);
        assert_eq!(stats[1].0, "prepend_byte");
        assert_eq!(stats[1].1.activations, 1);
    }

    // `reset_all_stats` clears the counters of registered faults.
    #[test]
    fn test_reset_all_stats() {
        let fault = Arc::new(PrependByteFault::new(0xAA));
        let pipeline = FaultPipeline::new().with_fault(fault.clone());

        pipeline.apply(&test_ctx());
        assert_eq!(fault.stats().activations, 1);

        pipeline.reset_all_stats();
        assert_eq!(fault.stats().activations, 0);
    }

    // The RTU constructor fixes the transaction id to 0 and treats only
    // unit id 0 as broadcast.
    #[test]
    fn test_rtu_context() {
        let ctx = ModbusFaultContext::rtu(1, 0x03, &[0x03], &[0x03, 0x02], 5);
        assert_eq!(ctx.transport, TransportKind::Rtu);
        assert_eq!(ctx.unit_id, 1);
        assert_eq!(ctx.transaction_id, 0);
        assert!(!ctx.is_broadcast);
        assert_eq!(ctx.request_number, 5);
    }

    // The TCP constructor keeps the given transaction id; unit id 0 is
    // flagged as broadcast here as well.
    #[test]
    fn test_tcp_context() {
        let ctx = ModbusFaultContext::tcp(0, 0x03, &[0x03], &[0x03, 0x02], 42, 10);
        assert_eq!(ctx.transport, TransportKind::Tcp);
        assert_eq!(ctx.unit_id, 0);
        assert!(ctx.is_broadcast);
        assert_eq!(ctx.transaction_id, 42);
        assert_eq!(ctx.request_number, 10);
    }

    // A fault bound to one transport is ignored on the other.
    #[test]
    fn test_transport_compatibility() {
        /// Mutation mock restricted to RTU; echoes the response unchanged.
        struct RtuOnlyFault {
            stats: FaultStats,
        }
        impl ModbusFault for RtuOnlyFault {
            fn fault_type(&self) -> &'static str { "rtu_only" }
            fn is_enabled(&self) -> bool { self.stats.is_enabled() }
            fn set_enabled(&self, enabled: bool) { self.stats.set_enabled(enabled); }
            fn should_activate(&self, _: &ModbusFaultContext) -> bool {
                self.stats.record_check();
                true
            }
            fn apply(&self, ctx: &ModbusFaultContext) -> FaultAction {
                self.stats.record_activation();
                FaultAction::SendResponse(ctx.response_pdu.clone())
            }
            fn stats(&self) -> FaultStatsSnapshot { self.stats.snapshot() }
            fn reset_stats(&self) { self.stats.reset(); }
            fn compatible_transport(&self) -> Option<TransportKind> {
                Some(TransportKind::Rtu)
            }
        }

        let fault = Arc::new(RtuOnlyFault { stats: FaultStats::new() });
        let pipeline = FaultPipeline::new().with_fault(fault.clone());

        // TCP exchange: the RTU-only fault must be skipped.
        let tcp_ctx = test_ctx();
        assert!(pipeline.apply(&tcp_ctx).is_none());

        // RTU exchange: the fault activates and yields an action.
        let rtu_ctx = ModbusFaultContext::rtu(1, 0x03, &[0x03], &[0x03, 0x02], 1);
        assert!(pipeline.apply(&rtu_ctx).is_some());
    }
}