1use std::collections::VecDeque;
32use std::fmt;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::time::{Duration, Instant};
35
36use dashmap::DashMap;
37use parking_lot::RwLock;
38use serde::{Deserialize, Serialize};
39
40#[derive(Debug, Clone)]
46struct ErrorEvent {
47 timestamp: Instant,
49 #[allow(dead_code)]
51 category: ErrorCategory,
52}
53
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
56pub enum ErrorCategory {
57 SendFailure,
59 AckError,
61 AckTimeout,
63 ConfirmationNack,
65 ConfirmationTimeout,
67 FlowControlDrop,
69}
70
71impl ErrorCategory {
72 pub fn name(&self) -> &'static str {
74 match self {
75 Self::SendFailure => "SendFailure",
76 Self::AckError => "AckError",
77 Self::AckTimeout => "AckTimeout",
78 Self::ConfirmationNack => "ConfirmationNack",
79 Self::ConfirmationTimeout => "ConfirmationTimeout",
80 Self::FlowControlDrop => "FlowControlDrop",
81 }
82 }
83
84 pub fn all() -> &'static [ErrorCategory] {
86 &[
87 Self::SendFailure,
88 Self::AckError,
89 Self::AckTimeout,
90 Self::ConfirmationNack,
91 Self::ConfirmationTimeout,
92 Self::FlowControlDrop,
93 ]
94 }
95}
96
97impl fmt::Display for ErrorCategory {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 write!(f, "{}", self.name())
100 }
101}
102
103struct ChannelErrorState {
109 consecutive_errors: u32,
111 recent_errors: VecDeque<ErrorEvent>,
113 total_successes: u64,
115 total_failures: u64,
117 category_counts: [u64; 6],
119 last_success_at: Option<Instant>,
121 last_error_at: Option<Instant>,
123 restart_triggered: bool,
125}
126
127impl ChannelErrorState {
128 fn new() -> Self {
129 Self {
130 consecutive_errors: 0,
131 recent_errors: VecDeque::with_capacity(128),
132 total_successes: 0,
133 total_failures: 0,
134 category_counts: [0; 6],
135 last_success_at: None,
136 last_error_at: None,
137 restart_triggered: false,
138 }
139 }
140
141 fn record_success(&mut self) {
143 self.consecutive_errors = 0;
144 self.total_successes += 1;
145 self.last_success_at = Some(Instant::now());
146 self.restart_triggered = false;
147 }
148
149 fn record_failure(&mut self, category: ErrorCategory) {
151 self.consecutive_errors += 1;
152 self.total_failures += 1;
153 self.last_error_at = Some(Instant::now());
154 self.category_counts[category as usize] += 1;
155 self.recent_errors.push_back(ErrorEvent {
156 timestamp: Instant::now(),
157 category,
158 });
159 }
160
161 fn prune_old_events(&mut self, window: Duration) {
163 let cutoff = Instant::now() - window;
164 while let Some(front) = self.recent_errors.front() {
165 if front.timestamp < cutoff {
166 self.recent_errors.pop_front();
167 } else {
168 break;
169 }
170 }
171 }
172
173 fn window_error_count(&self, window: Duration) -> usize {
175 let cutoff = Instant::now() - window;
176 self.recent_errors
177 .iter()
178 .filter(|e| e.timestamp >= cutoff)
179 .count()
180 }
181
182 fn window_error_rate(&self, window: Duration, total_sends_in_window: u64) -> f64 {
185 if total_sends_in_window == 0 {
186 return 0.0;
187 }
188 let errors = self.window_error_count(window) as f64;
189 errors / total_sends_in_window as f64
190 }
191
192 fn reset(&mut self) {
194 self.consecutive_errors = 0;
195 self.recent_errors.clear();
196 self.restart_triggered = false;
197 }
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct SendErrorTrackerConfig {
207 #[serde(default = "default_true")]
209 pub enabled: bool,
210
211 #[serde(default = "default_consecutive_threshold")]
215 pub consecutive_threshold: u32,
216
217 #[serde(default = "default_window_ms")]
221 pub window_ms: u64,
222
223 #[serde(default = "default_rate_threshold")]
227 pub rate_threshold: f64,
228
229 #[serde(default = "default_min_events_for_rate")]
233 pub min_events_for_rate: u64,
234}
235
236fn default_true() -> bool {
237 true
238}
239
240fn default_consecutive_threshold() -> u32 {
241 5
242}
243
244fn default_window_ms() -> u64 {
245 60_000
246}
247
248fn default_rate_threshold() -> f64 {
249 0.5
250}
251
252fn default_min_events_for_rate() -> u64 {
253 10
254}
255
256impl Default for SendErrorTrackerConfig {
257 fn default() -> Self {
258 Self {
259 enabled: true,
260 consecutive_threshold: default_consecutive_threshold(),
261 window_ms: default_window_ms(),
262 rate_threshold: default_rate_threshold(),
263 min_events_for_rate: default_min_events_for_rate(),
264 }
265 }
266}
267
268impl SendErrorTrackerConfig {
269 pub fn window(&self) -> Duration {
271 Duration::from_millis(self.window_ms)
272 }
273
274 pub fn validate(&self) -> Result<(), String> {
276 if self.consecutive_threshold == 0 {
277 return Err("SendErrorTracker consecutive_threshold must be > 0".to_string());
278 }
279 if self.window_ms == 0 {
280 return Err("SendErrorTracker window_ms must be > 0".to_string());
281 }
282 if self.rate_threshold < 0.0 || self.rate_threshold > 1.0 {
283 return Err(format!(
284 "SendErrorTracker rate_threshold must be 0.0-1.0, got {}",
285 self.rate_threshold
286 ));
287 }
288 Ok(())
289 }
290}
291
292#[derive(Debug, Clone, PartialEq, Eq)]
300pub enum TrackingResult {
301 Recorded,
303 ConsecutiveThresholdExceeded {
305 consecutive_errors: u32,
306 threshold: u32,
307 },
308 RateThresholdExceeded {
310 error_count: usize,
311 window_ms: u64,
312 rate: u32, },
314}
315
316impl TrackingResult {
317 pub fn requires_restart(&self) -> bool {
319 matches!(self, Self::ConsecutiveThresholdExceeded { .. })
320 }
321
322 pub fn is_warning(&self) -> bool {
324 matches!(self, Self::RateThresholdExceeded { .. })
325 }
326}
327
328pub struct SendErrorTracker {
336 config: SendErrorTrackerConfig,
338 channels: DashMap<u8, RwLock<ChannelErrorState>>,
340 stats: TrackerStats,
342}
343
344pub struct TrackerStats {
346 pub total_successes: AtomicU64,
348 pub total_failures: AtomicU64,
350 pub consecutive_triggers: AtomicU64,
352 pub rate_triggers: AtomicU64,
354}
355
356impl TrackerStats {
357 fn new() -> Self {
358 Self {
359 total_successes: AtomicU64::new(0),
360 total_failures: AtomicU64::new(0),
361 consecutive_triggers: AtomicU64::new(0),
362 rate_triggers: AtomicU64::new(0),
363 }
364 }
365
366 pub fn snapshot(&self) -> TrackerStatsSnapshot {
368 TrackerStatsSnapshot {
369 total_successes: self.total_successes.load(Ordering::Relaxed),
370 total_failures: self.total_failures.load(Ordering::Relaxed),
371 consecutive_triggers: self.consecutive_triggers.load(Ordering::Relaxed),
372 rate_triggers: self.rate_triggers.load(Ordering::Relaxed),
373 }
374 }
375}
376
377#[derive(Debug, Clone)]
379pub struct TrackerStatsSnapshot {
380 pub total_successes: u64,
381 pub total_failures: u64,
382 pub consecutive_triggers: u64,
383 pub rate_triggers: u64,
384}
385
386impl TrackerStatsSnapshot {
387 pub fn error_rate(&self) -> f64 {
389 let total = self.total_successes + self.total_failures;
390 if total == 0 {
391 return 0.0;
392 }
393 self.total_failures as f64 / total as f64
394 }
395
396 pub fn total_events(&self) -> u64 {
398 self.total_successes + self.total_failures
399 }
400}
401
402#[derive(Debug, Clone)]
404pub struct ChannelErrorSummary {
405 pub channel_id: u8,
406 pub consecutive_errors: u32,
407 pub total_successes: u64,
408 pub total_failures: u64,
409 pub window_error_count: usize,
410 pub restart_triggered: bool,
411 pub last_success_elapsed_ms: Option<u64>,
412 pub last_error_elapsed_ms: Option<u64>,
413 pub category_counts: [(ErrorCategory, u64); 6],
414}
415
416impl SendErrorTracker {
417 pub fn new(config: SendErrorTrackerConfig) -> Self {
419 Self {
420 channels: DashMap::new(),
421 config,
422 stats: TrackerStats::new(),
423 }
424 }
425
426 pub fn with_defaults() -> Self {
428 Self::new(SendErrorTrackerConfig::default())
429 }
430
431 pub fn is_enabled(&self) -> bool {
433 self.config.enabled
434 }
435
436 pub fn config(&self) -> &SendErrorTrackerConfig {
438 &self.config
439 }
440
441 pub fn on_send_success(&self, channel_id: u8) {
443 if !self.config.enabled {
444 return;
445 }
446
447 self.ensure_channel(channel_id);
448
449 if let Some(state_lock) = self.channels.get(&channel_id) {
450 let mut state = state_lock.write();
451 state.record_success();
452 }
453
454 self.stats.total_successes.fetch_add(1, Ordering::Relaxed);
455 }
456
457 pub fn on_send_failure(&self, channel_id: u8, category: ErrorCategory) -> TrackingResult {
461 if !self.config.enabled {
462 return TrackingResult::Recorded;
463 }
464
465 self.ensure_channel(channel_id);
466 self.stats.total_failures.fetch_add(1, Ordering::Relaxed);
467
468 let result = if let Some(state_lock) = self.channels.get(&channel_id) {
469 let mut state = state_lock.write();
470 state.record_failure(category);
471
472 let window = self.config.window();
474 state.prune_old_events(window);
475
476 if state.consecutive_errors >= self.config.consecutive_threshold
478 && !state.restart_triggered
479 {
480 state.restart_triggered = true;
481 self.stats
482 .consecutive_triggers
483 .fetch_add(1, Ordering::Relaxed);
484 return TrackingResult::ConsecutiveThresholdExceeded {
485 consecutive_errors: state.consecutive_errors,
486 threshold: self.config.consecutive_threshold,
487 };
488 }
489
490 let total_in_window = state.total_successes + state.total_failures;
492 if total_in_window >= self.config.min_events_for_rate {
493 let error_count = state.window_error_count(window);
494 let rate = state.window_error_rate(window, total_in_window);
495 if rate > self.config.rate_threshold {
496 self.stats.rate_triggers.fetch_add(1, Ordering::Relaxed);
497 return TrackingResult::RateThresholdExceeded {
498 error_count,
499 window_ms: self.config.window_ms,
500 rate: (rate * 100.0) as u32,
501 };
502 }
503 }
504
505 TrackingResult::Recorded
506 } else {
507 TrackingResult::Recorded
508 };
509
510 result
511 }
512
513 pub fn on_ack_error(&self, channel_id: u8, _status: u8) -> TrackingResult {
515 self.on_send_failure(channel_id, ErrorCategory::AckError)
516 }
517
518 pub fn on_ack_timeout(&self, channel_id: u8) -> TrackingResult {
520 self.on_send_failure(channel_id, ErrorCategory::AckTimeout)
521 }
522
523 pub fn on_confirmation_nack(&self, channel_id: u8) -> TrackingResult {
525 self.on_send_failure(channel_id, ErrorCategory::ConfirmationNack)
526 }
527
528 pub fn on_confirmation_timeout(&self, channel_id: u8) -> TrackingResult {
530 self.on_send_failure(channel_id, ErrorCategory::ConfirmationTimeout)
531 }
532
533 pub fn on_flow_control_drop(&self, channel_id: u8) -> TrackingResult {
535 self.on_send_failure(channel_id, ErrorCategory::FlowControlDrop)
536 }
537
538 pub fn consecutive_errors(&self, channel_id: u8) -> u32 {
540 self.channels
541 .get(&channel_id)
542 .map(|s| s.read().consecutive_errors)
543 .unwrap_or(0)
544 }
545
546 pub fn window_error_count(&self, channel_id: u8) -> usize {
548 let window = self.config.window();
549 self.channels
550 .get(&channel_id)
551 .map(|s| s.read().window_error_count(window))
552 .unwrap_or(0)
553 }
554
555 pub fn is_restart_triggered(&self, channel_id: u8) -> bool {
557 self.channels
558 .get(&channel_id)
559 .map(|s| s.read().restart_triggered)
560 .unwrap_or(false)
561 }
562
563 pub fn reset_channel(&self, channel_id: u8) {
565 if let Some(state_lock) = self.channels.get(&channel_id) {
566 state_lock.write().reset();
567 }
568 }
569
570 pub fn remove_channel(&self, channel_id: u8) {
572 self.channels.remove(&channel_id);
573 }
574
575 pub fn channel_summary(&self, channel_id: u8) -> Option<ChannelErrorSummary> {
577 let window = self.config.window();
578 self.channels.get(&channel_id).map(|state_lock| {
579 let state = state_lock.read();
580 let categories = ErrorCategory::all();
581 let mut category_counts = [(ErrorCategory::SendFailure, 0u64); 6];
582 for (i, cat) in categories.iter().enumerate() {
583 category_counts[i] = (*cat, state.category_counts[i]);
584 }
585
586 ChannelErrorSummary {
587 channel_id,
588 consecutive_errors: state.consecutive_errors,
589 total_successes: state.total_successes,
590 total_failures: state.total_failures,
591 window_error_count: state.window_error_count(window),
592 restart_triggered: state.restart_triggered,
593 last_success_elapsed_ms: state
594 .last_success_at
595 .map(|t| t.elapsed().as_millis() as u64),
596 last_error_elapsed_ms: state.last_error_at.map(|t| t.elapsed().as_millis() as u64),
597 category_counts,
598 }
599 })
600 }
601
602 pub fn stats_snapshot(&self) -> TrackerStatsSnapshot {
604 self.stats.snapshot()
605 }
606
607 fn ensure_channel(&self, channel_id: u8) {
609 self.channels
610 .entry(channel_id)
611 .or_insert_with(|| RwLock::new(ChannelErrorState::new()));
612 }
613}
614
615impl fmt::Debug for SendErrorTracker {
616 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
617 f.debug_struct("SendErrorTracker")
618 .field("enabled", &self.config.enabled)
619 .field("channels", &self.channels.len())
620 .field("consecutive_threshold", &self.config.consecutive_threshold)
621 .field("window_ms", &self.config.window_ms)
622 .field("rate_threshold", &self.config.rate_threshold)
623 .finish()
624 }
625}
626
627impl Default for SendErrorTracker {
628 fn default() -> Self {
629 Self::with_defaults()
630 }
631}
632
633#[cfg(test)]
638mod tests {
639 use super::*;
640
641 #[test]
642 fn test_error_category_names() {
643 assert_eq!(ErrorCategory::SendFailure.name(), "SendFailure");
644 assert_eq!(ErrorCategory::AckError.name(), "AckError");
645 assert_eq!(ErrorCategory::AckTimeout.name(), "AckTimeout");
646 assert_eq!(ErrorCategory::ConfirmationNack.name(), "ConfirmationNack");
647 assert_eq!(
648 ErrorCategory::ConfirmationTimeout.name(),
649 "ConfirmationTimeout"
650 );
651 assert_eq!(ErrorCategory::FlowControlDrop.name(), "FlowControlDrop");
652 }
653
654 #[test]
655 fn test_error_category_display() {
656 assert_eq!(format!("{}", ErrorCategory::SendFailure), "SendFailure");
657 }
658
659 #[test]
660 fn test_error_category_all() {
661 assert_eq!(ErrorCategory::all().len(), 6);
662 }
663
664 #[test]
665 fn test_tracker_success_resets_consecutive() {
666 let tracker = SendErrorTracker::with_defaults();
667
668 tracker.on_send_failure(1, ErrorCategory::SendFailure);
669 tracker.on_send_failure(1, ErrorCategory::SendFailure);
670 assert_eq!(tracker.consecutive_errors(1), 2);
671
672 tracker.on_send_success(1);
673 assert_eq!(tracker.consecutive_errors(1), 0);
674 }
675
676 #[test]
677 fn test_tracker_consecutive_threshold() {
678 let config = SendErrorTrackerConfig {
679 consecutive_threshold: 3,
680 ..Default::default()
681 };
682 let tracker = SendErrorTracker::new(config);
683
684 let r1 = tracker.on_send_failure(1, ErrorCategory::SendFailure);
685 assert_eq!(r1, TrackingResult::Recorded);
686
687 let r2 = tracker.on_send_failure(1, ErrorCategory::AckError);
688 assert_eq!(r2, TrackingResult::Recorded);
689
690 let r3 = tracker.on_send_failure(1, ErrorCategory::AckTimeout);
691 assert!(matches!(
692 r3,
693 TrackingResult::ConsecutiveThresholdExceeded {
694 consecutive_errors: 3,
695 threshold: 3,
696 }
697 ));
698
699 assert!(r3.requires_restart());
700 assert!(!r3.is_warning());
701 }
702
703 #[test]
704 fn test_tracker_restart_only_triggers_once() {
705 let config = SendErrorTrackerConfig {
706 consecutive_threshold: 2,
707 ..Default::default()
708 };
709 let tracker = SendErrorTracker::new(config);
710
711 let r1 = tracker.on_send_failure(1, ErrorCategory::SendFailure);
712 assert_eq!(r1, TrackingResult::Recorded);
713
714 let r2 = tracker.on_send_failure(1, ErrorCategory::SendFailure);
715 assert!(matches!(
716 r2,
717 TrackingResult::ConsecutiveThresholdExceeded { .. }
718 ));
719
720 let r3 = tracker.on_send_failure(1, ErrorCategory::SendFailure);
722 assert!(!matches!(
724 r3,
725 TrackingResult::ConsecutiveThresholdExceeded { .. }
726 ));
727 }
728
729 #[test]
730 fn test_tracker_success_clears_restart_flag() {
731 let config = SendErrorTrackerConfig {
732 consecutive_threshold: 2,
733 ..Default::default()
734 };
735 let tracker = SendErrorTracker::new(config);
736
737 tracker.on_send_failure(1, ErrorCategory::SendFailure);
738 tracker.on_send_failure(1, ErrorCategory::SendFailure);
739 assert!(tracker.is_restart_triggered(1));
740
741 tracker.on_send_success(1);
742 assert!(!tracker.is_restart_triggered(1));
743
744 tracker.on_send_failure(1, ErrorCategory::SendFailure);
746 let r = tracker.on_send_failure(1, ErrorCategory::SendFailure);
747 assert!(matches!(
748 r,
749 TrackingResult::ConsecutiveThresholdExceeded { .. }
750 ));
751 }
752
753 #[test]
754 fn test_tracker_multi_channel() {
755 let tracker = SendErrorTracker::with_defaults();
756
757 tracker.on_send_failure(1, ErrorCategory::SendFailure);
758 tracker.on_send_failure(1, ErrorCategory::SendFailure);
759 tracker.on_send_failure(2, ErrorCategory::AckError);
760
761 assert_eq!(tracker.consecutive_errors(1), 2);
762 assert_eq!(tracker.consecutive_errors(2), 1);
763 assert_eq!(tracker.consecutive_errors(3), 0); }
765
766 #[test]
767 fn test_tracker_window_error_count() {
768 let config = SendErrorTrackerConfig {
769 window_ms: 60_000, ..Default::default()
771 };
772 let tracker = SendErrorTracker::new(config);
773
774 tracker.on_send_failure(1, ErrorCategory::SendFailure);
775 tracker.on_send_failure(1, ErrorCategory::AckError);
776 tracker.on_send_failure(1, ErrorCategory::AckTimeout);
777
778 assert_eq!(tracker.window_error_count(1), 3);
779 }
780
781 #[test]
782 fn test_tracker_convenience_methods() {
783 let config = SendErrorTrackerConfig {
784 consecutive_threshold: 100, ..Default::default()
786 };
787 let tracker = SendErrorTracker::new(config);
788
789 let r1 = tracker.on_ack_error(1, 0x21);
790 assert_eq!(r1, TrackingResult::Recorded);
791
792 let r2 = tracker.on_ack_timeout(1);
793 assert_eq!(r2, TrackingResult::Recorded);
794
795 let r3 = tracker.on_confirmation_nack(1);
796 assert_eq!(r3, TrackingResult::Recorded);
797
798 let r4 = tracker.on_confirmation_timeout(1);
799 assert_eq!(r4, TrackingResult::Recorded);
800
801 let r5 = tracker.on_flow_control_drop(1);
802 assert_eq!(r5, TrackingResult::Recorded);
803
804 assert_eq!(tracker.consecutive_errors(1), 5);
805 }
806
807 #[test]
808 fn test_tracker_reset_channel() {
809 let tracker = SendErrorTracker::with_defaults();
810
811 tracker.on_send_failure(1, ErrorCategory::SendFailure);
812 tracker.on_send_failure(1, ErrorCategory::SendFailure);
813 assert_eq!(tracker.consecutive_errors(1), 2);
814
815 tracker.reset_channel(1);
816 assert_eq!(tracker.consecutive_errors(1), 0);
817 assert!(!tracker.is_restart_triggered(1));
818 }
819
820 #[test]
821 fn test_tracker_remove_channel() {
822 let tracker = SendErrorTracker::with_defaults();
823
824 tracker.on_send_failure(1, ErrorCategory::SendFailure);
825 assert_eq!(tracker.consecutive_errors(1), 1);
826
827 tracker.remove_channel(1);
828 assert_eq!(tracker.consecutive_errors(1), 0);
829 }
830
831 #[test]
832 fn test_tracker_channel_summary() {
833 let tracker = SendErrorTracker::with_defaults();
834
835 tracker.on_send_success(1);
836 tracker.on_send_failure(1, ErrorCategory::SendFailure);
837 tracker.on_send_failure(1, ErrorCategory::AckError);
838
839 let summary = tracker.channel_summary(1).unwrap();
840 assert_eq!(summary.channel_id, 1);
841 assert_eq!(summary.consecutive_errors, 2);
842 assert_eq!(summary.total_successes, 1);
843 assert_eq!(summary.total_failures, 2);
844 assert!(!summary.restart_triggered);
845 assert!(summary.last_success_elapsed_ms.is_some());
846 assert!(summary.last_error_elapsed_ms.is_some());
847 }
848
849 #[test]
850 fn test_tracker_global_stats() {
851 let tracker = SendErrorTracker::with_defaults();
852
853 tracker.on_send_success(1);
854 tracker.on_send_success(2);
855 tracker.on_send_failure(1, ErrorCategory::SendFailure);
856
857 let stats = tracker.stats_snapshot();
858 assert_eq!(stats.total_successes, 2);
859 assert_eq!(stats.total_failures, 1);
860 assert_eq!(stats.total_events(), 3);
861
862 let rate = stats.error_rate();
863 assert!((rate - 1.0 / 3.0).abs() < 0.01);
864 }
865
866 #[test]
867 fn test_tracker_stats_empty() {
868 let stats = TrackerStatsSnapshot {
869 total_successes: 0,
870 total_failures: 0,
871 consecutive_triggers: 0,
872 rate_triggers: 0,
873 };
874 assert_eq!(stats.error_rate(), 0.0);
875 assert_eq!(stats.total_events(), 0);
876 }
877
878 #[test]
879 fn test_tracker_disabled() {
880 let config = SendErrorTrackerConfig {
881 enabled: false,
882 ..Default::default()
883 };
884 let tracker = SendErrorTracker::new(config);
885
886 tracker.on_send_success(1);
887 let result = tracker.on_send_failure(1, ErrorCategory::SendFailure);
888 assert_eq!(result, TrackingResult::Recorded);
889 assert_eq!(tracker.consecutive_errors(1), 0);
890 }
891
892 #[test]
893 fn test_config_validate() {
894 assert!(SendErrorTrackerConfig::default().validate().is_ok());
895
896 assert!(SendErrorTrackerConfig {
897 consecutive_threshold: 0,
898 ..Default::default()
899 }
900 .validate()
901 .is_err());
902
903 assert!(SendErrorTrackerConfig {
904 window_ms: 0,
905 ..Default::default()
906 }
907 .validate()
908 .is_err());
909
910 assert!(SendErrorTrackerConfig {
911 rate_threshold: 1.5,
912 ..Default::default()
913 }
914 .validate()
915 .is_err());
916
917 assert!(SendErrorTrackerConfig {
918 rate_threshold: -0.1,
919 ..Default::default()
920 }
921 .validate()
922 .is_err());
923 }
924
925 #[test]
926 fn test_config_defaults() {
927 let config = SendErrorTrackerConfig::default();
928 assert!(config.enabled);
929 assert_eq!(config.consecutive_threshold, 5);
930 assert_eq!(config.window_ms, 60_000);
931 assert_eq!(config.rate_threshold, 0.5);
932 assert_eq!(config.min_events_for_rate, 10);
933 }
934
935 #[test]
936 fn test_tracker_debug() {
937 let tracker = SendErrorTracker::with_defaults();
938 let debug_str = format!("{:?}", tracker);
939 assert!(debug_str.contains("SendErrorTracker"));
940 assert!(debug_str.contains("enabled"));
941 }
942
943 #[test]
944 fn test_tracking_result_properties() {
945 let recorded = TrackingResult::Recorded;
946 assert!(!recorded.requires_restart());
947 assert!(!recorded.is_warning());
948
949 let consecutive = TrackingResult::ConsecutiveThresholdExceeded {
950 consecutive_errors: 5,
951 threshold: 5,
952 };
953 assert!(consecutive.requires_restart());
954 assert!(!consecutive.is_warning());
955
956 let rate = TrackingResult::RateThresholdExceeded {
957 error_count: 10,
958 window_ms: 60_000,
959 rate: 75,
960 };
961 assert!(!rate.requires_restart());
962 assert!(rate.is_warning());
963 }
964
965 #[test]
966 fn test_category_counts_in_summary() {
967 let tracker = SendErrorTracker::with_defaults();
968
969 tracker.on_send_failure(1, ErrorCategory::SendFailure);
970 tracker.on_send_failure(1, ErrorCategory::SendFailure);
971 tracker.on_send_failure(1, ErrorCategory::AckError);
972 tracker.on_send_failure(1, ErrorCategory::ConfirmationNack);
973
974 let summary = tracker.channel_summary(1).unwrap();
975
976 let send_failures = summary
978 .category_counts
979 .iter()
980 .find(|(cat, _)| *cat == ErrorCategory::SendFailure)
981 .map(|(_, count)| *count)
982 .unwrap();
983 assert_eq!(send_failures, 2);
984
985 let ack_errors = summary
986 .category_counts
987 .iter()
988 .find(|(cat, _)| *cat == ErrorCategory::AckError)
989 .map(|(_, count)| *count)
990 .unwrap();
991 assert_eq!(ack_errors, 1);
992 }
993}