1pub mod config;
53pub mod stats;
54pub mod targeting;
55
56pub mod crc_corruption;
58pub mod delayed_response;
59pub mod exception_injection;
60pub mod extra_data;
61pub mod no_response;
62pub mod partial_frame;
63pub mod truncated_response;
64pub mod wrong_function_code;
65pub mod wrong_transaction_id;
66pub mod wrong_unit_id;
67
68pub mod connection_disruption;
70pub mod rtu_timing;
71
72use std::sync::Arc;
73use std::time::Duration;
74
75use tracing::{debug, trace};
76
77pub use config::*;
78pub use stats::{FaultStats, FaultStatsSnapshot};
79pub use targeting::FaultTarget;
80
81pub use crc_corruption::CrcCorruptionFault;
83pub use delayed_response::DelayedResponseFault;
84pub use exception_injection::ExceptionInjectionFault;
85pub use extra_data::ExtraDataFault;
86pub use no_response::NoResponseFault;
87pub use partial_frame::PartialFrameFault;
88pub use truncated_response::TruncatedResponseFault;
89pub use wrong_function_code::WrongFunctionCodeFault;
90pub use wrong_transaction_id::WrongTransactionIdFault;
91pub use wrong_unit_id::WrongUnitIdFault;
92
/// What should happen to a response after fault injection has been applied.
///
/// Individual faults return one of these from their `apply` method, and the
/// pipeline returns one as its combined verdict. How each variant is carried
/// out on the wire is decided by the code that consumes the action, which is
/// not part of this module.
#[derive(Debug, Clone)]
pub enum FaultAction {
    /// Send the carried bytes as the response. The pipeline treats the payload
    /// as the (possibly mutated) response PDU and feeds it to the next fault.
    SendResponse(Vec<u8>),

    /// Send no response at all.
    DropResponse,

    /// Send `response` after waiting for `delay`.
    DelayThenSend { delay: Duration, response: Vec<u8> },

    /// Send only `bytes`; presumably an incomplete frame — confirm against the
    /// consumer of this action.
    SendPartial { bytes: Vec<u8> },

    /// Send the carried bytes; presumably written to the wire without further
    /// framing — confirm against the consumer of this action.
    SendRawBytes(Vec<u8>),

    /// Send `response`, but with `transaction_id` in place of the ID taken
    /// from the request.
    OverrideTransactionId {
        transaction_id: u16,
        response: Vec<u8>,
    },
}
126
/// Modbus transport a request arrived on.
///
/// Stored in the fault context and compared against the optional transport
/// restriction a fault may declare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransportKind {
    /// Modbus TCP.
    Tcp,
    /// Modbus RTU.
    Rtu,
}
135
/// Snapshot of one request/response exchange handed to faults for inspection.
///
/// Faults receive this by shared reference; the context owns copies of the
/// request and response PDUs.
#[derive(Debug, Clone)]
pub struct ModbusFaultContext {
    /// Transport the exchange belongs to.
    pub transport: TransportKind,

    /// Unit (slave) identifier of the request.
    pub unit_id: u8,

    /// Function code of the request.
    pub function_code: u8,

    /// Request PDU bytes.
    pub request_pdu: Vec<u8>,

    /// Response PDU bytes. When faults are chained by the pipeline this holds
    /// the response as mutated by the faults that ran earlier.
    pub response_pdu: Vec<u8>,

    /// Transaction identifier; the RTU constructor sets this to `0`.
    pub transaction_id: u16,

    /// Whether the request is a broadcast; the constructors set this to
    /// `unit_id == 0`.
    pub is_broadcast: bool,

    /// Caller-supplied request counter; presumably a running sequence number —
    /// confirm against the callers of the constructors.
    pub request_number: u64,
}
167
168impl ModbusFaultContext {
169 pub fn rtu(
171 unit_id: u8,
172 function_code: u8,
173 request_pdu: &[u8],
174 response_pdu: &[u8],
175 request_number: u64,
176 ) -> Self {
177 Self {
178 transport: TransportKind::Rtu,
179 unit_id,
180 function_code,
181 request_pdu: request_pdu.to_vec(),
182 response_pdu: response_pdu.to_vec(),
183 transaction_id: 0,
184 is_broadcast: unit_id == 0,
185 request_number,
186 }
187 }
188
189 pub fn tcp(
191 unit_id: u8,
192 function_code: u8,
193 request_pdu: &[u8],
194 response_pdu: &[u8],
195 transaction_id: u16,
196 request_number: u64,
197 ) -> Self {
198 Self {
199 transport: TransportKind::Tcp,
200 unit_id,
201 function_code,
202 request_pdu: request_pdu.to_vec(),
203 response_pdu: response_pdu.to_vec(),
204 transaction_id,
205 is_broadcast: unit_id == 0,
206 request_number,
207 }
208 }
209}
210
/// A single injectable fault.
///
/// Implementations are shared behind `Arc<dyn ModbusFault>` and must be
/// `Send + Sync`. Every method takes `&self`, so any state an implementation
/// changes (the enabled flag, statistics) needs interior mutability.
pub trait ModbusFault: Send + Sync {
    /// Short static label identifying the kind of fault; used in log output
    /// and as the key in per-fault statistics listings.
    fn fault_type(&self) -> &'static str;

    /// Whether this fault is currently switched on.
    fn is_enabled(&self) -> bool;

    /// Switches this fault on or off.
    fn set_enabled(&self, enabled: bool);

    /// Decides whether the fault fires for the exchange described by `ctx`.
    fn should_activate(&self, ctx: &ModbusFaultContext) -> bool;

    /// Produces the action for this exchange. `ctx.response_pdu` is the
    /// response the fault should work from.
    fn apply(&self, ctx: &ModbusFaultContext) -> FaultAction;

    /// Returns a snapshot of this fault's statistics.
    fn stats(&self) -> FaultStatsSnapshot;

    /// Resets this fault's statistics.
    fn reset_stats(&self);

    /// Whether this fault's action replaces the whole response rather than
    /// being chained with other faults. Defaults to `false`.
    fn is_short_circuit(&self) -> bool {
        false
    }

    /// Transport this fault is restricted to, or `None` (the default) when it
    /// applies to every transport.
    fn compatible_transport(&self) -> Option<TransportKind> {
        None
    }
}
262
/// Ordered set of faults evaluated against each request/response exchange.
pub struct FaultPipeline {
    /// Registered faults, in insertion order; evaluation walks them in this order.
    faults: Vec<Arc<dyn ModbusFault>>,
    /// Master switch for the whole pipeline; atomic so it can be flipped
    /// through a shared reference.
    enabled: std::sync::atomic::AtomicBool,
}
282
283impl FaultPipeline {
284 pub fn new() -> Self {
286 Self {
287 faults: Vec::new(),
288 enabled: std::sync::atomic::AtomicBool::new(true),
289 }
290 }
291
292 pub fn with_fault(mut self, fault: Arc<dyn ModbusFault>) -> Self {
294 self.faults.push(fault);
295 self
296 }
297
298 pub fn with_faults(mut self, faults: Vec<Arc<dyn ModbusFault>>) -> Self {
300 self.faults.extend(faults);
301 self
302 }
303
304 pub fn is_enabled(&self) -> bool {
306 self.enabled.load(std::sync::atomic::Ordering::Acquire)
307 }
308
309 pub fn set_enabled(&self, enabled: bool) {
311 self.enabled
312 .store(enabled, std::sync::atomic::Ordering::Release);
313 }
314
315 pub fn len(&self) -> usize {
317 self.faults.len()
318 }
319
320 pub fn is_empty(&self) -> bool {
322 self.faults.is_empty()
323 }
324
325 pub fn apply(&self, ctx: &ModbusFaultContext) -> Option<FaultAction> {
335 if !self.is_enabled() {
336 return None;
337 }
338
339 for fault in &self.faults {
341 if !fault.is_enabled() {
342 continue;
343 }
344 if !fault.is_short_circuit() {
345 continue;
346 }
347 if let Some(required) = fault.compatible_transport() {
349 if required != ctx.transport {
350 continue;
351 }
352 }
353 if fault.should_activate(ctx) {
354 let action = fault.apply(ctx);
355 debug!(
356 fault_type = fault.fault_type(),
357 unit_id = ctx.unit_id,
358 fc = ctx.function_code,
359 "Short-circuit fault activated"
360 );
361 return Some(action);
362 }
363 }
364
365 let mut current_response = ctx.response_pdu.clone();
367 let mut any_activated = false;
368 let mut total_delay = Duration::ZERO;
369 let mut override_tid: Option<u16> = None;
370 let mut send_raw = false;
371
372 for fault in &self.faults {
373 if !fault.is_enabled() {
374 continue;
375 }
376 if fault.is_short_circuit() {
377 continue;
378 }
379 if let Some(required) = fault.compatible_transport() {
381 if required != ctx.transport {
382 continue;
383 }
384 }
385 if !fault.should_activate(ctx) {
386 continue;
387 }
388
389 let mut sub_ctx = ctx.clone();
391 sub_ctx.response_pdu = current_response.clone();
392
393 let action = fault.apply(&sub_ctx);
394 any_activated = true;
395
396 trace!(
397 fault_type = fault.fault_type(),
398 unit_id = ctx.unit_id,
399 fc = ctx.function_code,
400 "Mutation fault activated"
401 );
402
403 match action {
404 FaultAction::SendResponse(pdu) => {
405 current_response = pdu;
406 }
407 FaultAction::DelayThenSend { delay, response } => {
408 total_delay += delay;
409 current_response = response;
410 }
411 FaultAction::OverrideTransactionId {
412 transaction_id,
413 response,
414 } => {
415 override_tid = Some(transaction_id);
416 current_response = response;
417 }
418 FaultAction::SendRawBytes(bytes) => {
419 current_response = bytes;
420 send_raw = true;
421 }
422 FaultAction::DropResponse => {
424 return Some(FaultAction::DropResponse);
425 }
426 FaultAction::SendPartial { bytes } => {
427 return Some(FaultAction::SendPartial { bytes });
428 }
429 }
430 }
431
432 if !any_activated {
433 return None;
434 }
435
436 let action = if send_raw {
438 if total_delay > Duration::ZERO {
439 FaultAction::DelayThenSend {
441 delay: total_delay,
442 response: current_response,
443 }
444 } else {
445 FaultAction::SendRawBytes(current_response)
446 }
447 } else if let Some(tid) = override_tid {
448 if total_delay > Duration::ZERO {
449 FaultAction::DelayThenSend {
450 delay: total_delay,
451 response: current_response,
452 }
453 } else {
454 FaultAction::OverrideTransactionId {
455 transaction_id: tid,
456 response: current_response,
457 }
458 }
459 } else if total_delay > Duration::ZERO {
460 FaultAction::DelayThenSend {
461 delay: total_delay,
462 response: current_response,
463 }
464 } else {
465 FaultAction::SendResponse(current_response)
466 };
467
468 debug!(
469 unit_id = ctx.unit_id,
470 fc = ctx.function_code,
471 "Fault pipeline produced action"
472 );
473
474 Some(action)
475 }
476
477 pub fn all_stats(&self) -> Vec<(&'static str, FaultStatsSnapshot)> {
479 self.faults
480 .iter()
481 .map(|f| (f.fault_type(), f.stats()))
482 .collect()
483 }
484
485 pub fn reset_all_stats(&self) {
487 for fault in &self.faults {
488 fault.reset_stats();
489 }
490 }
491
492 pub fn from_config(config: &FaultInjectionConfig) -> Self {
494 let mut pipeline = Self::new();
495
496 if !config.enabled {
497 pipeline
498 .enabled
499 .store(false, std::sync::atomic::Ordering::Release);
500 return pipeline;
501 }
502
503 for fault_cfg in &config.faults {
504 let fault: Arc<dyn ModbusFault> = match fault_cfg.fault_type {
505 FaultType::CrcCorruption => Arc::new(CrcCorruptionFault::from_config(
506 &fault_cfg.config,
507 fault_cfg.target.clone(),
508 )),
509 FaultType::WrongUnitId => Arc::new(WrongUnitIdFault::from_config(
510 &fault_cfg.config,
511 fault_cfg.target.clone(),
512 )),
513 FaultType::WrongFunctionCode => Arc::new(WrongFunctionCodeFault::from_config(
514 &fault_cfg.config,
515 fault_cfg.target.clone(),
516 )),
517 FaultType::WrongTransactionId => Arc::new(WrongTransactionIdFault::from_config(
518 &fault_cfg.config,
519 fault_cfg.target.clone(),
520 )),
521 FaultType::TruncatedResponse => Arc::new(TruncatedResponseFault::from_config(
522 &fault_cfg.config,
523 fault_cfg.target.clone(),
524 )),
525 FaultType::ExtraData => Arc::new(ExtraDataFault::from_config(
526 &fault_cfg.config,
527 fault_cfg.target.clone(),
528 )),
529 FaultType::DelayedResponse => Arc::new(DelayedResponseFault::from_config(
530 &fault_cfg.config,
531 fault_cfg.target.clone(),
532 )),
533 FaultType::NoResponse => Arc::new(NoResponseFault::new(fault_cfg.target.clone())),
534 FaultType::ExceptionInjection => Arc::new(ExceptionInjectionFault::from_config(
535 &fault_cfg.config,
536 fault_cfg.target.clone(),
537 )),
538 FaultType::PartialFrame => Arc::new(PartialFrameFault::from_config(
539 &fault_cfg.config,
540 fault_cfg.target.clone(),
541 )),
542 };
543
544 pipeline.faults.push(fault);
545 }
546
547 pipeline
548 }
549}
550
551impl Default for FaultPipeline {
552 fn default() -> Self {
553 Self::new()
554 }
555}
556
557impl std::fmt::Debug for FaultPipeline {
558 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
559 f.debug_struct("FaultPipeline")
560 .field("enabled", &self.is_enabled())
561 .field("fault_count", &self.faults.len())
562 .field(
563 "faults",
564 &self
565 .faults
566 .iter()
567 .map(|f| f.fault_type())
568 .collect::<Vec<_>>(),
569 )
570 .finish()
571 }
572}
573
#[cfg(test)]
mod tests {
    use super::*;

    // Short-circuit test fault: always activates and always drops the response.
    struct AlwaysDropFault {
        stats: FaultStats,
    }

    impl AlwaysDropFault {
        fn new() -> Self {
            Self {
                stats: FaultStats::new(),
            }
        }
    }

    impl ModbusFault for AlwaysDropFault {
        fn fault_type(&self) -> &'static str {
            "always_drop"
        }
        fn is_enabled(&self) -> bool {
            self.stats.is_enabled()
        }
        fn set_enabled(&self, enabled: bool) {
            self.stats.set_enabled(enabled);
        }
        fn should_activate(&self, _ctx: &ModbusFaultContext) -> bool {
            // Recording the check lets tests observe whether the pipeline
            // consulted this fault at all.
            self.stats.record_check();
            true
        }
        fn apply(&self, _ctx: &ModbusFaultContext) -> FaultAction {
            self.stats.record_activation();
            self.stats.record_affected();
            FaultAction::DropResponse
        }
        fn stats(&self) -> FaultStatsSnapshot {
            self.stats.snapshot()
        }
        fn reset_stats(&self) {
            self.stats.reset();
        }
        fn is_short_circuit(&self) -> bool {
            true
        }
    }

    // Mutation test fault: always activates and puts `byte` in front of the
    // response PDU it is given.
    struct PrependByteFault {
        byte: u8,
        stats: FaultStats,
    }

    impl PrependByteFault {
        fn new(byte: u8) -> Self {
            Self {
                byte,
                stats: FaultStats::new(),
            }
        }
    }

    impl ModbusFault for PrependByteFault {
        fn fault_type(&self) -> &'static str {
            "prepend_byte"
        }
        fn is_enabled(&self) -> bool {
            self.stats.is_enabled()
        }
        fn set_enabled(&self, enabled: bool) {
            self.stats.set_enabled(enabled);
        }
        fn should_activate(&self, _ctx: &ModbusFaultContext) -> bool {
            self.stats.record_check();
            true
        }
        fn apply(&self, ctx: &ModbusFaultContext) -> FaultAction {
            self.stats.record_activation();
            self.stats.record_affected();
            let mut response = vec![self.byte];
            response.extend_from_slice(&ctx.response_pdu);
            FaultAction::SendResponse(response)
        }
        fn stats(&self) -> FaultStatsSnapshot {
            self.stats.snapshot()
        }
        fn reset_stats(&self) {
            self.stats.reset();
        }
    }

    // TCP exchange shared by most tests: unit 1, function code 0x03,
    // transaction ID 1, request number 1.
    fn test_ctx() -> ModbusFaultContext {
        ModbusFaultContext::tcp(
            1,
            0x03,
            &[0x03, 0x00, 0x00, 0x00, 0x01],
            &[0x03, 0x02, 0x00, 0x64],
            1,
            1,
        )
    }

    // A pipeline without faults never produces an action.
    #[test]
    fn test_empty_pipeline() {
        let pipeline = FaultPipeline::new();
        assert!(pipeline.is_empty());
        assert_eq!(pipeline.len(), 0);
        assert!(pipeline.apply(&test_ctx()).is_none());
    }

    // The master switch suppresses even a fault that always activates.
    #[test]
    fn test_pipeline_disabled() {
        let pipeline = FaultPipeline::new().with_fault(Arc::new(AlwaysDropFault::new()));
        pipeline.set_enabled(false);
        assert!(pipeline.apply(&test_ctx()).is_none());
    }

    // A short-circuit fault wins over a mutation fault regardless of order.
    #[test]
    fn test_short_circuit_priority() {
        let drop_fault = Arc::new(AlwaysDropFault::new());
        let prepend_fault = Arc::new(PrependByteFault::new(0xFF));

        // The mutation fault is registered first on purpose.
        let pipeline = FaultPipeline::new()
            .with_fault(prepend_fault.clone())
            .with_fault(drop_fault.clone());

        let action = pipeline.apply(&test_ctx());
        assert!(matches!(action, Some(FaultAction::DropResponse)));

        // The short-circuit fault was checked and applied exactly once.
        assert_eq!(drop_fault.stats().checks, 1);
        assert_eq!(drop_fault.stats().activations, 1);

        // The mutation fault was never consulted.
        assert_eq!(prepend_fault.stats().checks, 0);
    }

    // Mutation faults run in insertion order, each seeing the previous output.
    #[test]
    fn test_mutation_chaining() {
        let fault1 = Arc::new(PrependByteFault::new(0xAA));
        let fault2 = Arc::new(PrependByteFault::new(0xBB));

        let pipeline = FaultPipeline::new()
            .with_fault(fault1.clone())
            .with_fault(fault2.clone());

        let ctx = test_ctx();
        let action = pipeline.apply(&ctx).unwrap();

        match action {
            FaultAction::SendResponse(pdu) => {
                // The second fault prepends last, so its byte comes first.
                assert_eq!(pdu[0], 0xBB);
                assert_eq!(pdu[1], 0xAA);
                assert_eq!(&pdu[2..], &ctx.response_pdu);
            }
            _ => panic!("Expected SendResponse"),
        }
    }

    // A disabled fault is skipped before `should_activate` is called.
    #[test]
    fn test_disabled_fault_skipped() {
        let fault = Arc::new(PrependByteFault::new(0xFF));
        fault.set_enabled(false);

        let pipeline = FaultPipeline::new().with_fault(fault.clone());
        assert!(pipeline.apply(&test_ctx()).is_none());
        assert_eq!(fault.stats().checks, 0);
    }

    // `all_stats` reports one entry per fault, in pipeline order.
    #[test]
    fn test_all_stats() {
        let fault1 = Arc::new(PrependByteFault::new(0xAA));
        let fault2 = Arc::new(PrependByteFault::new(0xBB));

        let pipeline = FaultPipeline::new().with_fault(fault1).with_fault(fault2);

        pipeline.apply(&test_ctx());

        let stats = pipeline.all_stats();
        assert_eq!(stats.len(), 2);
        assert_eq!(stats[0].0, "prepend_byte");
        assert_eq!(stats[0].1.activations, 1);
        assert_eq!(stats[1].0, "prepend_byte");
        assert_eq!(stats[1].1.activations, 1);
    }

    // `reset_all_stats` clears the counters of the registered faults.
    #[test]
    fn test_reset_all_stats() {
        let fault = Arc::new(PrependByteFault::new(0xAA));
        let pipeline = FaultPipeline::new().with_fault(fault.clone());

        pipeline.apply(&test_ctx());
        assert_eq!(fault.stats().activations, 1);

        pipeline.reset_all_stats();
        assert_eq!(fault.stats().activations, 0);
    }

    // The RTU constructor fixes the transport and zeroes the transaction ID.
    #[test]
    fn test_rtu_context() {
        let ctx = ModbusFaultContext::rtu(1, 0x03, &[0x03], &[0x03, 0x02], 5);
        assert_eq!(ctx.transport, TransportKind::Rtu);
        assert_eq!(ctx.unit_id, 1);
        assert_eq!(ctx.transaction_id, 0);
        assert!(!ctx.is_broadcast);
        assert_eq!(ctx.request_number, 5);
    }

    // The TCP constructor keeps the transaction ID; unit 0 counts as broadcast.
    #[test]
    fn test_tcp_context() {
        let ctx = ModbusFaultContext::tcp(0, 0x03, &[0x03], &[0x03, 0x02], 42, 10);
        assert_eq!(ctx.transport, TransportKind::Tcp);
        assert_eq!(ctx.unit_id, 0);
        assert!(ctx.is_broadcast);
        assert_eq!(ctx.transaction_id, 42);
        assert_eq!(ctx.request_number, 10);
    }

    // A fault restricted to one transport is ignored on the other.
    #[test]
    fn test_transport_compatibility() {
        // Mutation fault that returns the response unchanged but only on RTU.
        struct RtuOnlyFault {
            stats: FaultStats,
        }
        impl ModbusFault for RtuOnlyFault {
            fn fault_type(&self) -> &'static str {
                "rtu_only"
            }
            fn is_enabled(&self) -> bool {
                self.stats.is_enabled()
            }
            fn set_enabled(&self, enabled: bool) {
                self.stats.set_enabled(enabled);
            }
            fn should_activate(&self, _: &ModbusFaultContext) -> bool {
                self.stats.record_check();
                true
            }
            fn apply(&self, ctx: &ModbusFaultContext) -> FaultAction {
                self.stats.record_activation();
                FaultAction::SendResponse(ctx.response_pdu.clone())
            }
            fn stats(&self) -> FaultStatsSnapshot {
                self.stats.snapshot()
            }
            fn reset_stats(&self) {
                self.stats.reset();
            }
            fn compatible_transport(&self) -> Option<TransportKind> {
                Some(TransportKind::Rtu)
            }
        }

        let fault = Arc::new(RtuOnlyFault {
            stats: FaultStats::new(),
        });
        let pipeline = FaultPipeline::new().with_fault(fault.clone());

        // TCP exchange: the fault is skipped, so no action is produced.
        let tcp_ctx = test_ctx();
        assert!(pipeline.apply(&tcp_ctx).is_none());

        // RTU exchange: the fault activates.
        let rtu_ctx = ModbusFaultContext::rtu(1, 0x03, &[0x03], &[0x03, 0x02], 1);
        assert!(pipeline.apply(&rtu_ctx).is_some());
    }
}