1use std::collections::VecDeque;
32use std::fmt;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::time::{Duration, Instant};
35
36use dashmap::DashMap;
37use parking_lot::RwLock;
38use serde::{Deserialize, Serialize};
39
40#[derive(Debug, Clone)]
46struct ErrorEvent {
47 timestamp: Instant,
49 #[allow(dead_code)]
51 category: ErrorCategory,
52}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
56pub enum ErrorCategory {
57 SendFailure,
59 AckError,
61 AckTimeout,
63 ConfirmationNack,
65 ConfirmationTimeout,
67 FlowControlDrop,
69}
70
71impl ErrorCategory {
72 pub fn name(&self) -> &'static str {
74 match self {
75 Self::SendFailure => "SendFailure",
76 Self::AckError => "AckError",
77 Self::AckTimeout => "AckTimeout",
78 Self::ConfirmationNack => "ConfirmationNack",
79 Self::ConfirmationTimeout => "ConfirmationTimeout",
80 Self::FlowControlDrop => "FlowControlDrop",
81 }
82 }
83
84 pub fn all() -> &'static [ErrorCategory] {
86 &[
87 Self::SendFailure,
88 Self::AckError,
89 Self::AckTimeout,
90 Self::ConfirmationNack,
91 Self::ConfirmationTimeout,
92 Self::FlowControlDrop,
93 ]
94 }
95}
96
97impl fmt::Display for ErrorCategory {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 write!(f, "{}", self.name())
100 }
101}
102
103struct ChannelErrorState {
109 consecutive_errors: u32,
111 recent_errors: VecDeque<ErrorEvent>,
113 total_successes: u64,
115 total_failures: u64,
117 category_counts: [u64; 6],
119 last_success_at: Option<Instant>,
121 last_error_at: Option<Instant>,
123 restart_triggered: bool,
125}
126
127impl ChannelErrorState {
128 fn new() -> Self {
129 Self {
130 consecutive_errors: 0,
131 recent_errors: VecDeque::with_capacity(128),
132 total_successes: 0,
133 total_failures: 0,
134 category_counts: [0; 6],
135 last_success_at: None,
136 last_error_at: None,
137 restart_triggered: false,
138 }
139 }
140
141 fn record_success(&mut self) {
143 self.consecutive_errors = 0;
144 self.total_successes += 1;
145 self.last_success_at = Some(Instant::now());
146 self.restart_triggered = false;
147 }
148
149 fn record_failure(&mut self, category: ErrorCategory) {
151 self.consecutive_errors += 1;
152 self.total_failures += 1;
153 self.last_error_at = Some(Instant::now());
154 self.category_counts[category as usize] += 1;
155 self.recent_errors.push_back(ErrorEvent {
156 timestamp: Instant::now(),
157 category,
158 });
159 }
160
161 fn prune_old_events(&mut self, window: Duration) {
163 let cutoff = Instant::now() - window;
164 while let Some(front) = self.recent_errors.front() {
165 if front.timestamp < cutoff {
166 self.recent_errors.pop_front();
167 } else {
168 break;
169 }
170 }
171 }
172
173 fn window_error_count(&self, window: Duration) -> usize {
175 let cutoff = Instant::now() - window;
176 self.recent_errors
177 .iter()
178 .filter(|e| e.timestamp >= cutoff)
179 .count()
180 }
181
182 fn window_error_rate(&self, window: Duration, total_sends_in_window: u64) -> f64 {
185 if total_sends_in_window == 0 {
186 return 0.0;
187 }
188 let errors = self.window_error_count(window) as f64;
189 errors / total_sends_in_window as f64
190 }
191
192 fn reset(&mut self) {
194 self.consecutive_errors = 0;
195 self.recent_errors.clear();
196 self.restart_triggered = false;
197 }
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct SendErrorTrackerConfig {
207 #[serde(default = "default_true")]
209 pub enabled: bool,
210
211 #[serde(default = "default_consecutive_threshold")]
215 pub consecutive_threshold: u32,
216
217 #[serde(default = "default_window_ms")]
221 pub window_ms: u64,
222
223 #[serde(default = "default_rate_threshold")]
227 pub rate_threshold: f64,
228
229 #[serde(default = "default_min_events_for_rate")]
233 pub min_events_for_rate: u64,
234}
235
236fn default_true() -> bool {
237 true
238}
239
240fn default_consecutive_threshold() -> u32 {
241 5
242}
243
244fn default_window_ms() -> u64 {
245 60_000
246}
247
248fn default_rate_threshold() -> f64 {
249 0.5
250}
251
252fn default_min_events_for_rate() -> u64 {
253 10
254}
255
256impl Default for SendErrorTrackerConfig {
257 fn default() -> Self {
258 Self {
259 enabled: true,
260 consecutive_threshold: default_consecutive_threshold(),
261 window_ms: default_window_ms(),
262 rate_threshold: default_rate_threshold(),
263 min_events_for_rate: default_min_events_for_rate(),
264 }
265 }
266}
267
268impl SendErrorTrackerConfig {
269 pub fn window(&self) -> Duration {
271 Duration::from_millis(self.window_ms)
272 }
273
274 pub fn validate(&self) -> Result<(), String> {
276 if self.consecutive_threshold == 0 {
277 return Err("SendErrorTracker consecutive_threshold must be > 0".to_string());
278 }
279 if self.window_ms == 0 {
280 return Err("SendErrorTracker window_ms must be > 0".to_string());
281 }
282 if self.rate_threshold < 0.0 || self.rate_threshold > 1.0 {
283 return Err(format!(
284 "SendErrorTracker rate_threshold must be 0.0-1.0, got {}",
285 self.rate_threshold
286 ));
287 }
288 Ok(())
289 }
290}
291
292#[derive(Debug, Clone, PartialEq, Eq)]
300pub enum TrackingResult {
301 Recorded,
303 ConsecutiveThresholdExceeded {
305 consecutive_errors: u32,
306 threshold: u32,
307 },
308 RateThresholdExceeded {
310 error_count: usize,
311 window_ms: u64,
312 rate: u32, },
314}
315
316impl TrackingResult {
317 pub fn requires_restart(&self) -> bool {
319 matches!(self, Self::ConsecutiveThresholdExceeded { .. })
320 }
321
322 pub fn is_warning(&self) -> bool {
324 matches!(self, Self::RateThresholdExceeded { .. })
325 }
326}
327
328pub struct SendErrorTracker {
336 config: SendErrorTrackerConfig,
338 channels: DashMap<u8, RwLock<ChannelErrorState>>,
340 stats: TrackerStats,
342}
343
344pub struct TrackerStats {
346 pub total_successes: AtomicU64,
348 pub total_failures: AtomicU64,
350 pub consecutive_triggers: AtomicU64,
352 pub rate_triggers: AtomicU64,
354}
355
356impl TrackerStats {
357 fn new() -> Self {
358 Self {
359 total_successes: AtomicU64::new(0),
360 total_failures: AtomicU64::new(0),
361 consecutive_triggers: AtomicU64::new(0),
362 rate_triggers: AtomicU64::new(0),
363 }
364 }
365
366 pub fn snapshot(&self) -> TrackerStatsSnapshot {
368 TrackerStatsSnapshot {
369 total_successes: self.total_successes.load(Ordering::Relaxed),
370 total_failures: self.total_failures.load(Ordering::Relaxed),
371 consecutive_triggers: self.consecutive_triggers.load(Ordering::Relaxed),
372 rate_triggers: self.rate_triggers.load(Ordering::Relaxed),
373 }
374 }
375}
376
377#[derive(Debug, Clone)]
379pub struct TrackerStatsSnapshot {
380 pub total_successes: u64,
381 pub total_failures: u64,
382 pub consecutive_triggers: u64,
383 pub rate_triggers: u64,
384}
385
386impl TrackerStatsSnapshot {
387 pub fn error_rate(&self) -> f64 {
389 let total = self.total_successes + self.total_failures;
390 if total == 0 {
391 return 0.0;
392 }
393 self.total_failures as f64 / total as f64
394 }
395
396 pub fn total_events(&self) -> u64 {
398 self.total_successes + self.total_failures
399 }
400}
401
402#[derive(Debug, Clone)]
404pub struct ChannelErrorSummary {
405 pub channel_id: u8,
406 pub consecutive_errors: u32,
407 pub total_successes: u64,
408 pub total_failures: u64,
409 pub window_error_count: usize,
410 pub restart_triggered: bool,
411 pub last_success_elapsed_ms: Option<u64>,
412 pub last_error_elapsed_ms: Option<u64>,
413 pub category_counts: [(ErrorCategory, u64); 6],
414}
415
416impl SendErrorTracker {
417 pub fn new(config: SendErrorTrackerConfig) -> Self {
419 Self {
420 channels: DashMap::new(),
421 config,
422 stats: TrackerStats::new(),
423 }
424 }
425
426 pub fn with_defaults() -> Self {
428 Self::new(SendErrorTrackerConfig::default())
429 }
430
431 pub fn is_enabled(&self) -> bool {
433 self.config.enabled
434 }
435
436 pub fn config(&self) -> &SendErrorTrackerConfig {
438 &self.config
439 }
440
441 pub fn on_send_success(&self, channel_id: u8) {
443 if !self.config.enabled {
444 return;
445 }
446
447 self.ensure_channel(channel_id);
448
449 if let Some(state_lock) = self.channels.get(&channel_id) {
450 let mut state = state_lock.write();
451 state.record_success();
452 }
453
454 self.stats.total_successes.fetch_add(1, Ordering::Relaxed);
455 }
456
457 pub fn on_send_failure(
461 &self,
462 channel_id: u8,
463 category: ErrorCategory,
464 ) -> TrackingResult {
465 if !self.config.enabled {
466 return TrackingResult::Recorded;
467 }
468
469 self.ensure_channel(channel_id);
470 self.stats.total_failures.fetch_add(1, Ordering::Relaxed);
471
472 let result = if let Some(state_lock) = self.channels.get(&channel_id) {
473 let mut state = state_lock.write();
474 state.record_failure(category);
475
476 let window = self.config.window();
478 state.prune_old_events(window);
479
480 if state.consecutive_errors >= self.config.consecutive_threshold
482 && !state.restart_triggered
483 {
484 state.restart_triggered = true;
485 self.stats.consecutive_triggers.fetch_add(1, Ordering::Relaxed);
486 return TrackingResult::ConsecutiveThresholdExceeded {
487 consecutive_errors: state.consecutive_errors,
488 threshold: self.config.consecutive_threshold,
489 };
490 }
491
492 let total_in_window = state.total_successes + state.total_failures;
494 if total_in_window >= self.config.min_events_for_rate {
495 let error_count = state.window_error_count(window);
496 let rate = state.window_error_rate(window, total_in_window);
497 if rate > self.config.rate_threshold {
498 self.stats.rate_triggers.fetch_add(1, Ordering::Relaxed);
499 return TrackingResult::RateThresholdExceeded {
500 error_count,
501 window_ms: self.config.window_ms,
502 rate: (rate * 100.0) as u32,
503 };
504 }
505 }
506
507 TrackingResult::Recorded
508 } else {
509 TrackingResult::Recorded
510 };
511
512 result
513 }
514
515 pub fn on_ack_error(&self, channel_id: u8, _status: u8) -> TrackingResult {
517 self.on_send_failure(channel_id, ErrorCategory::AckError)
518 }
519
520 pub fn on_ack_timeout(&self, channel_id: u8) -> TrackingResult {
522 self.on_send_failure(channel_id, ErrorCategory::AckTimeout)
523 }
524
525 pub fn on_confirmation_nack(&self, channel_id: u8) -> TrackingResult {
527 self.on_send_failure(channel_id, ErrorCategory::ConfirmationNack)
528 }
529
530 pub fn on_confirmation_timeout(&self, channel_id: u8) -> TrackingResult {
532 self.on_send_failure(channel_id, ErrorCategory::ConfirmationTimeout)
533 }
534
535 pub fn on_flow_control_drop(&self, channel_id: u8) -> TrackingResult {
537 self.on_send_failure(channel_id, ErrorCategory::FlowControlDrop)
538 }
539
540 pub fn consecutive_errors(&self, channel_id: u8) -> u32 {
542 self.channels
543 .get(&channel_id)
544 .map(|s| s.read().consecutive_errors)
545 .unwrap_or(0)
546 }
547
548 pub fn window_error_count(&self, channel_id: u8) -> usize {
550 let window = self.config.window();
551 self.channels
552 .get(&channel_id)
553 .map(|s| s.read().window_error_count(window))
554 .unwrap_or(0)
555 }
556
557 pub fn is_restart_triggered(&self, channel_id: u8) -> bool {
559 self.channels
560 .get(&channel_id)
561 .map(|s| s.read().restart_triggered)
562 .unwrap_or(false)
563 }
564
565 pub fn reset_channel(&self, channel_id: u8) {
567 if let Some(state_lock) = self.channels.get(&channel_id) {
568 state_lock.write().reset();
569 }
570 }
571
572 pub fn remove_channel(&self, channel_id: u8) {
574 self.channels.remove(&channel_id);
575 }
576
577 pub fn channel_summary(&self, channel_id: u8) -> Option<ChannelErrorSummary> {
579 let window = self.config.window();
580 self.channels.get(&channel_id).map(|state_lock| {
581 let state = state_lock.read();
582 let categories = ErrorCategory::all();
583 let mut category_counts = [(ErrorCategory::SendFailure, 0u64); 6];
584 for (i, cat) in categories.iter().enumerate() {
585 category_counts[i] = (*cat, state.category_counts[i]);
586 }
587
588 ChannelErrorSummary {
589 channel_id,
590 consecutive_errors: state.consecutive_errors,
591 total_successes: state.total_successes,
592 total_failures: state.total_failures,
593 window_error_count: state.window_error_count(window),
594 restart_triggered: state.restart_triggered,
595 last_success_elapsed_ms: state.last_success_at.map(|t| t.elapsed().as_millis() as u64),
596 last_error_elapsed_ms: state.last_error_at.map(|t| t.elapsed().as_millis() as u64),
597 category_counts,
598 }
599 })
600 }
601
602 pub fn stats_snapshot(&self) -> TrackerStatsSnapshot {
604 self.stats.snapshot()
605 }
606
607 fn ensure_channel(&self, channel_id: u8) {
609 self.channels
610 .entry(channel_id)
611 .or_insert_with(|| RwLock::new(ChannelErrorState::new()));
612 }
613}
614
615impl fmt::Debug for SendErrorTracker {
616 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
617 f.debug_struct("SendErrorTracker")
618 .field("enabled", &self.config.enabled)
619 .field("channels", &self.channels.len())
620 .field("consecutive_threshold", &self.config.consecutive_threshold)
621 .field("window_ms", &self.config.window_ms)
622 .field("rate_threshold", &self.config.rate_threshold)
623 .finish()
624 }
625}
626
627impl Default for SendErrorTracker {
628 fn default() -> Self {
629 Self::with_defaults()
630 }
631}
632
633#[cfg(test)]
638mod tests {
639 use super::*;
640
641 #[test]
642 fn test_error_category_names() {
643 assert_eq!(ErrorCategory::SendFailure.name(), "SendFailure");
644 assert_eq!(ErrorCategory::AckError.name(), "AckError");
645 assert_eq!(ErrorCategory::AckTimeout.name(), "AckTimeout");
646 assert_eq!(ErrorCategory::ConfirmationNack.name(), "ConfirmationNack");
647 assert_eq!(ErrorCategory::ConfirmationTimeout.name(), "ConfirmationTimeout");
648 assert_eq!(ErrorCategory::FlowControlDrop.name(), "FlowControlDrop");
649 }
650
651 #[test]
652 fn test_error_category_display() {
653 assert_eq!(format!("{}", ErrorCategory::SendFailure), "SendFailure");
654 }
655
656 #[test]
657 fn test_error_category_all() {
658 assert_eq!(ErrorCategory::all().len(), 6);
659 }
660
661 #[test]
662 fn test_tracker_success_resets_consecutive() {
663 let tracker = SendErrorTracker::with_defaults();
664
665 tracker.on_send_failure(1, ErrorCategory::SendFailure);
666 tracker.on_send_failure(1, ErrorCategory::SendFailure);
667 assert_eq!(tracker.consecutive_errors(1), 2);
668
669 tracker.on_send_success(1);
670 assert_eq!(tracker.consecutive_errors(1), 0);
671 }
672
673 #[test]
674 fn test_tracker_consecutive_threshold() {
675 let config = SendErrorTrackerConfig {
676 consecutive_threshold: 3,
677 ..Default::default()
678 };
679 let tracker = SendErrorTracker::new(config);
680
681 let r1 = tracker.on_send_failure(1, ErrorCategory::SendFailure);
682 assert_eq!(r1, TrackingResult::Recorded);
683
684 let r2 = tracker.on_send_failure(1, ErrorCategory::AckError);
685 assert_eq!(r2, TrackingResult::Recorded);
686
687 let r3 = tracker.on_send_failure(1, ErrorCategory::AckTimeout);
688 assert!(matches!(r3, TrackingResult::ConsecutiveThresholdExceeded {
689 consecutive_errors: 3,
690 threshold: 3,
691 }));
692
693 assert!(r3.requires_restart());
694 assert!(!r3.is_warning());
695 }
696
697 #[test]
698 fn test_tracker_restart_only_triggers_once() {
699 let config = SendErrorTrackerConfig {
700 consecutive_threshold: 2,
701 ..Default::default()
702 };
703 let tracker = SendErrorTracker::new(config);
704
705 let r1 = tracker.on_send_failure(1, ErrorCategory::SendFailure);
706 assert_eq!(r1, TrackingResult::Recorded);
707
708 let r2 = tracker.on_send_failure(1, ErrorCategory::SendFailure);
709 assert!(matches!(r2, TrackingResult::ConsecutiveThresholdExceeded { .. }));
710
711 let r3 = tracker.on_send_failure(1, ErrorCategory::SendFailure);
713 assert!(!matches!(r3, TrackingResult::ConsecutiveThresholdExceeded { .. }));
715 }
716
717 #[test]
718 fn test_tracker_success_clears_restart_flag() {
719 let config = SendErrorTrackerConfig {
720 consecutive_threshold: 2,
721 ..Default::default()
722 };
723 let tracker = SendErrorTracker::new(config);
724
725 tracker.on_send_failure(1, ErrorCategory::SendFailure);
726 tracker.on_send_failure(1, ErrorCategory::SendFailure);
727 assert!(tracker.is_restart_triggered(1));
728
729 tracker.on_send_success(1);
730 assert!(!tracker.is_restart_triggered(1));
731
732 tracker.on_send_failure(1, ErrorCategory::SendFailure);
734 let r = tracker.on_send_failure(1, ErrorCategory::SendFailure);
735 assert!(matches!(r, TrackingResult::ConsecutiveThresholdExceeded { .. }));
736 }
737
738 #[test]
739 fn test_tracker_multi_channel() {
740 let tracker = SendErrorTracker::with_defaults();
741
742 tracker.on_send_failure(1, ErrorCategory::SendFailure);
743 tracker.on_send_failure(1, ErrorCategory::SendFailure);
744 tracker.on_send_failure(2, ErrorCategory::AckError);
745
746 assert_eq!(tracker.consecutive_errors(1), 2);
747 assert_eq!(tracker.consecutive_errors(2), 1);
748 assert_eq!(tracker.consecutive_errors(3), 0); }
750
751 #[test]
752 fn test_tracker_window_error_count() {
753 let config = SendErrorTrackerConfig {
754 window_ms: 60_000, ..Default::default()
756 };
757 let tracker = SendErrorTracker::new(config);
758
759 tracker.on_send_failure(1, ErrorCategory::SendFailure);
760 tracker.on_send_failure(1, ErrorCategory::AckError);
761 tracker.on_send_failure(1, ErrorCategory::AckTimeout);
762
763 assert_eq!(tracker.window_error_count(1), 3);
764 }
765
766 #[test]
767 fn test_tracker_convenience_methods() {
768 let config = SendErrorTrackerConfig {
769 consecutive_threshold: 100, ..Default::default()
771 };
772 let tracker = SendErrorTracker::new(config);
773
774 let r1 = tracker.on_ack_error(1, 0x21);
775 assert_eq!(r1, TrackingResult::Recorded);
776
777 let r2 = tracker.on_ack_timeout(1);
778 assert_eq!(r2, TrackingResult::Recorded);
779
780 let r3 = tracker.on_confirmation_nack(1);
781 assert_eq!(r3, TrackingResult::Recorded);
782
783 let r4 = tracker.on_confirmation_timeout(1);
784 assert_eq!(r4, TrackingResult::Recorded);
785
786 let r5 = tracker.on_flow_control_drop(1);
787 assert_eq!(r5, TrackingResult::Recorded);
788
789 assert_eq!(tracker.consecutive_errors(1), 5);
790 }
791
792 #[test]
793 fn test_tracker_reset_channel() {
794 let tracker = SendErrorTracker::with_defaults();
795
796 tracker.on_send_failure(1, ErrorCategory::SendFailure);
797 tracker.on_send_failure(1, ErrorCategory::SendFailure);
798 assert_eq!(tracker.consecutive_errors(1), 2);
799
800 tracker.reset_channel(1);
801 assert_eq!(tracker.consecutive_errors(1), 0);
802 assert!(!tracker.is_restart_triggered(1));
803 }
804
805 #[test]
806 fn test_tracker_remove_channel() {
807 let tracker = SendErrorTracker::with_defaults();
808
809 tracker.on_send_failure(1, ErrorCategory::SendFailure);
810 assert_eq!(tracker.consecutive_errors(1), 1);
811
812 tracker.remove_channel(1);
813 assert_eq!(tracker.consecutive_errors(1), 0);
814 }
815
816 #[test]
817 fn test_tracker_channel_summary() {
818 let tracker = SendErrorTracker::with_defaults();
819
820 tracker.on_send_success(1);
821 tracker.on_send_failure(1, ErrorCategory::SendFailure);
822 tracker.on_send_failure(1, ErrorCategory::AckError);
823
824 let summary = tracker.channel_summary(1).unwrap();
825 assert_eq!(summary.channel_id, 1);
826 assert_eq!(summary.consecutive_errors, 2);
827 assert_eq!(summary.total_successes, 1);
828 assert_eq!(summary.total_failures, 2);
829 assert!(!summary.restart_triggered);
830 assert!(summary.last_success_elapsed_ms.is_some());
831 assert!(summary.last_error_elapsed_ms.is_some());
832 }
833
834 #[test]
835 fn test_tracker_global_stats() {
836 let tracker = SendErrorTracker::with_defaults();
837
838 tracker.on_send_success(1);
839 tracker.on_send_success(2);
840 tracker.on_send_failure(1, ErrorCategory::SendFailure);
841
842 let stats = tracker.stats_snapshot();
843 assert_eq!(stats.total_successes, 2);
844 assert_eq!(stats.total_failures, 1);
845 assert_eq!(stats.total_events(), 3);
846
847 let rate = stats.error_rate();
848 assert!((rate - 1.0 / 3.0).abs() < 0.01);
849 }
850
851 #[test]
852 fn test_tracker_stats_empty() {
853 let stats = TrackerStatsSnapshot {
854 total_successes: 0,
855 total_failures: 0,
856 consecutive_triggers: 0,
857 rate_triggers: 0,
858 };
859 assert_eq!(stats.error_rate(), 0.0);
860 assert_eq!(stats.total_events(), 0);
861 }
862
863 #[test]
864 fn test_tracker_disabled() {
865 let config = SendErrorTrackerConfig {
866 enabled: false,
867 ..Default::default()
868 };
869 let tracker = SendErrorTracker::new(config);
870
871 tracker.on_send_success(1);
872 let result = tracker.on_send_failure(1, ErrorCategory::SendFailure);
873 assert_eq!(result, TrackingResult::Recorded);
874 assert_eq!(tracker.consecutive_errors(1), 0);
875 }
876
877 #[test]
878 fn test_config_validate() {
879 assert!(SendErrorTrackerConfig::default().validate().is_ok());
880
881 assert!(SendErrorTrackerConfig {
882 consecutive_threshold: 0,
883 ..Default::default()
884 }.validate().is_err());
885
886 assert!(SendErrorTrackerConfig {
887 window_ms: 0,
888 ..Default::default()
889 }.validate().is_err());
890
891 assert!(SendErrorTrackerConfig {
892 rate_threshold: 1.5,
893 ..Default::default()
894 }.validate().is_err());
895
896 assert!(SendErrorTrackerConfig {
897 rate_threshold: -0.1,
898 ..Default::default()
899 }.validate().is_err());
900 }
901
902 #[test]
903 fn test_config_defaults() {
904 let config = SendErrorTrackerConfig::default();
905 assert!(config.enabled);
906 assert_eq!(config.consecutive_threshold, 5);
907 assert_eq!(config.window_ms, 60_000);
908 assert_eq!(config.rate_threshold, 0.5);
909 assert_eq!(config.min_events_for_rate, 10);
910 }
911
912 #[test]
913 fn test_tracker_debug() {
914 let tracker = SendErrorTracker::with_defaults();
915 let debug_str = format!("{:?}", tracker);
916 assert!(debug_str.contains("SendErrorTracker"));
917 assert!(debug_str.contains("enabled"));
918 }
919
920 #[test]
921 fn test_tracking_result_properties() {
922 let recorded = TrackingResult::Recorded;
923 assert!(!recorded.requires_restart());
924 assert!(!recorded.is_warning());
925
926 let consecutive = TrackingResult::ConsecutiveThresholdExceeded {
927 consecutive_errors: 5,
928 threshold: 5,
929 };
930 assert!(consecutive.requires_restart());
931 assert!(!consecutive.is_warning());
932
933 let rate = TrackingResult::RateThresholdExceeded {
934 error_count: 10,
935 window_ms: 60_000,
936 rate: 75,
937 };
938 assert!(!rate.requires_restart());
939 assert!(rate.is_warning());
940 }
941
942 #[test]
943 fn test_category_counts_in_summary() {
944 let tracker = SendErrorTracker::with_defaults();
945
946 tracker.on_send_failure(1, ErrorCategory::SendFailure);
947 tracker.on_send_failure(1, ErrorCategory::SendFailure);
948 tracker.on_send_failure(1, ErrorCategory::AckError);
949 tracker.on_send_failure(1, ErrorCategory::ConfirmationNack);
950
951 let summary = tracker.channel_summary(1).unwrap();
952
953 let send_failures = summary.category_counts
955 .iter()
956 .find(|(cat, _)| *cat == ErrorCategory::SendFailure)
957 .map(|(_, count)| *count)
958 .unwrap();
959 assert_eq!(send_failures, 2);
960
961 let ack_errors = summary.category_counts
962 .iter()
963 .find(|(cat, _)| *cat == ErrorCategory::AckError)
964 .map(|(_, count)| *count)
965 .unwrap();
966 assert_eq!(ack_errors, 1);
967 }
968}