Skip to main content

mabi_knx/
error_tracker.rs

1//! Send Error Tracker with consecutive and sliding window error rate monitoring.
2//!
3//! This module provides production-grade error tracking for KNXnet/IP frame
4//! transmission, supporting two complementary monitoring strategies:
5//!
6//! ## Consecutive Error Tracking
7//!
8//! Tracks consecutive send failures. When the count reaches a configurable
9//! threshold, a tunnel restart is triggered (matching knxd behavior where
10//! 5 consecutive errors trigger reconnection).
11//!
12//! ## Sliding Window Error Rate
13//!
14//! Monitors the error rate over a configurable time window (default: 60 seconds).
15//! This catches intermittent but persistent error patterns that wouldn't trigger
16//! the consecutive threshold.
17//!
18//! ## Per-Channel Tracking
19//!
20//! Each tunnel connection (channel) has independent error tracking. This allows
21//! the simulator to inject channel-specific failure patterns for testing.
22//!
23//! ## Integration
24//!
25//! The tracker is integrated into the server's send pipeline:
26//! - `on_send_success(channel_id)` — resets consecutive counter
27//! - `on_send_failure(channel_id, category)` — increments counters, checks thresholds
28//! - `on_ack_error(channel_id, status)` — tracks ACK-level errors
29//! - `on_confirmation_nack(channel_id)` — tracks L_Data.con failures
30
31use std::collections::VecDeque;
32use std::fmt;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::time::{Duration, Instant};
35
36use dashmap::DashMap;
37use parking_lot::RwLock;
38use serde::{Deserialize, Serialize};
39
40// ============================================================================
41// Error Event
42// ============================================================================
43
44/// A recorded error event with timestamp and classification.
45#[derive(Debug, Clone)]
46struct ErrorEvent {
47    /// When the error occurred.
48    timestamp: Instant,
49    /// Error category — stored for per-category window analysis.
50    #[allow(dead_code)]
51    category: ErrorCategory,
52}
53
/// Classification of a failed frame transmission.
///
/// Declaration order matters: variants are cast to `usize` and used as
/// indices into per-category counter arrays.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorCategory {
    /// The UDP send itself failed (I/O error).
    SendFailure,
    /// A TUNNELLING_ACK arrived carrying a non-zero status.
    AckError,
    /// No ACK arrived within the timeout.
    AckTimeout,
    /// The bus answered with an L_Data.con NACK.
    ConfirmationNack,
    /// No L_Data.con confirmation arrived.
    ConfirmationTimeout,
    /// The filter chain dropped the frame (flow control).
    FlowControlDrop,
}

impl ErrorCategory {
    /// Returns the variant's identifier as a static string.
    pub fn name(&self) -> &'static str {
        match *self {
            ErrorCategory::SendFailure => "SendFailure",
            ErrorCategory::AckError => "AckError",
            ErrorCategory::AckTimeout => "AckTimeout",
            ErrorCategory::ConfirmationNack => "ConfirmationNack",
            ErrorCategory::ConfirmationTimeout => "ConfirmationTimeout",
            ErrorCategory::FlowControlDrop => "FlowControlDrop",
        }
    }

    /// Returns every category, in declaration order.
    pub fn all() -> &'static [ErrorCategory] {
        static EVERY_CATEGORY: [ErrorCategory; 6] = [
            ErrorCategory::SendFailure,
            ErrorCategory::AckError,
            ErrorCategory::AckTimeout,
            ErrorCategory::ConfirmationNack,
            ErrorCategory::ConfirmationTimeout,
            ErrorCategory::FlowControlDrop,
        ];
        &EVERY_CATEGORY
    }
}

impl fmt::Display for ErrorCategory {
    /// Writes the same text as [`ErrorCategory::name`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.name())
    }
}
102
103// ============================================================================
104// Channel Error State
105// ============================================================================
106
107/// Per-channel error tracking state.
108struct ChannelErrorState {
109    /// Consecutive error counter — resets on success.
110    consecutive_errors: u32,
111    /// Sliding window of recent error events.
112    recent_errors: VecDeque<ErrorEvent>,
113    /// Total successes.
114    total_successes: u64,
115    /// Total failures.
116    total_failures: u64,
117    /// Per-category failure counts.
118    category_counts: [u64; 6],
119    /// Timestamp of last successful send.
120    last_success_at: Option<Instant>,
121    /// Timestamp of last error.
122    last_error_at: Option<Instant>,
123    /// Whether this channel has been flagged for restart.
124    restart_triggered: bool,
125}
126
127impl ChannelErrorState {
128    fn new() -> Self {
129        Self {
130            consecutive_errors: 0,
131            recent_errors: VecDeque::with_capacity(128),
132            total_successes: 0,
133            total_failures: 0,
134            category_counts: [0; 6],
135            last_success_at: None,
136            last_error_at: None,
137            restart_triggered: false,
138        }
139    }
140
141    /// Record a successful send.
142    fn record_success(&mut self) {
143        self.consecutive_errors = 0;
144        self.total_successes += 1;
145        self.last_success_at = Some(Instant::now());
146        self.restart_triggered = false;
147    }
148
149    /// Record a failed send.
150    fn record_failure(&mut self, category: ErrorCategory) {
151        self.consecutive_errors += 1;
152        self.total_failures += 1;
153        self.last_error_at = Some(Instant::now());
154        self.category_counts[category as usize] += 1;
155        self.recent_errors.push_back(ErrorEvent {
156            timestamp: Instant::now(),
157            category,
158        });
159    }
160
161    /// Prune events older than the window duration.
162    fn prune_old_events(&mut self, window: Duration) {
163        let cutoff = Instant::now() - window;
164        while let Some(front) = self.recent_errors.front() {
165            if front.timestamp < cutoff {
166                self.recent_errors.pop_front();
167            } else {
168                break;
169            }
170        }
171    }
172
173    /// Get the error count within the sliding window.
174    fn window_error_count(&self, window: Duration) -> usize {
175        let cutoff = Instant::now() - window;
176        self.recent_errors
177            .iter()
178            .filter(|e| e.timestamp >= cutoff)
179            .count()
180    }
181
182    /// Calculate error rate within the sliding window.
183    /// Returns (errors_in_window, total_in_window, rate).
184    fn window_error_rate(&self, window: Duration, total_sends_in_window: u64) -> f64 {
185        if total_sends_in_window == 0 {
186            return 0.0;
187        }
188        let errors = self.window_error_count(window) as f64;
189        errors / total_sends_in_window as f64
190    }
191
192    /// Reset all state for this channel.
193    fn reset(&mut self) {
194        self.consecutive_errors = 0;
195        self.recent_errors.clear();
196        self.restart_triggered = false;
197    }
198}
199
200// ============================================================================
201// Tracker Configuration
202// ============================================================================
203
204/// SendErrorTracker configuration.
205#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct SendErrorTrackerConfig {
207    /// Whether the error tracker is enabled.
208    #[serde(default = "default_true")]
209    pub enabled: bool,
210
211    /// Consecutive error threshold for tunnel restart.
212    /// When consecutive errors reach this count, the tracker signals a restart.
213    /// Default: 5 (matches knxd behavior).
214    #[serde(default = "default_consecutive_threshold")]
215    pub consecutive_threshold: u32,
216
217    /// Sliding window duration in milliseconds.
218    /// Errors older than this are pruned from the window.
219    /// Default: 60000 (60 seconds).
220    #[serde(default = "default_window_ms")]
221    pub window_ms: u64,
222
223    /// Error rate threshold within the sliding window (0.0 - 1.0).
224    /// When the error rate exceeds this within the window, a warning is triggered.
225    /// Default: 0.5 (50% error rate).
226    #[serde(default = "default_rate_threshold")]
227    pub rate_threshold: f64,
228
229    /// Minimum number of events in the window before rate threshold applies.
230    /// Prevents false positives when only a few events have occurred.
231    /// Default: 10.
232    #[serde(default = "default_min_events_for_rate")]
233    pub min_events_for_rate: u64,
234}
235
/// Serde default for boolean switches that are on unless stated otherwise.
fn default_true() -> bool {
    const ON: bool = true;
    ON
}
239
/// Serde default for `consecutive_threshold`: five consecutive failures.
fn default_consecutive_threshold() -> u32 {
    const CONSECUTIVE_FAILURES: u32 = 5;
    CONSECUTIVE_FAILURES
}
243
/// Serde default for `window_ms`: a 60-second window, in milliseconds.
fn default_window_ms() -> u64 {
    const SECONDS: u64 = 60;
    SECONDS * 1_000
}
247
/// Serde default for `rate_threshold`: one half (50 %).
fn default_rate_threshold() -> f64 {
    const HALF: f64 = 0.5;
    HALF
}
251
/// Serde default for `min_events_for_rate`: ten events.
fn default_min_events_for_rate() -> u64 {
    const MIN_EVENTS: u64 = 10;
    MIN_EVENTS
}
255
256impl Default for SendErrorTrackerConfig {
257    fn default() -> Self {
258        Self {
259            enabled: true,
260            consecutive_threshold: default_consecutive_threshold(),
261            window_ms: default_window_ms(),
262            rate_threshold: default_rate_threshold(),
263            min_events_for_rate: default_min_events_for_rate(),
264        }
265    }
266}
267
268impl SendErrorTrackerConfig {
269    /// Get the sliding window duration.
270    pub fn window(&self) -> Duration {
271        Duration::from_millis(self.window_ms)
272    }
273
274    /// Validate the configuration.
275    pub fn validate(&self) -> Result<(), String> {
276        if self.consecutive_threshold == 0 {
277            return Err("SendErrorTracker consecutive_threshold must be > 0".to_string());
278        }
279        if self.window_ms == 0 {
280            return Err("SendErrorTracker window_ms must be > 0".to_string());
281        }
282        if self.rate_threshold < 0.0 || self.rate_threshold > 1.0 {
283            return Err(format!(
284                "SendErrorTracker rate_threshold must be 0.0-1.0, got {}",
285                self.rate_threshold
286            ));
287        }
288        Ok(())
289    }
290}
291
292// ============================================================================
293// Tracker Result
294// ============================================================================
295
/// Outcome of recording one failure.
///
/// `on_send_failure()` returns this so the caller can decide how to react.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TrackingResult {
    /// The failure was recorded and no threshold was crossed.
    Recorded,
    /// The consecutive-failure threshold was reached — a tunnel restart is
    /// recommended.
    ConsecutiveThresholdExceeded {
        /// Length of the failure streak when the threshold was reached.
        consecutive_errors: u32,
        /// The configured threshold.
        threshold: u32,
    },
    /// The sliding-window error rate is above the configured threshold —
    /// warning state.
    RateThresholdExceeded {
        /// Number of errors currently inside the window.
        error_count: usize,
        /// Configured window length in milliseconds.
        window_ms: u64,
        /// Rate as percentage (0-100).
        rate: u32,
    },
}

impl TrackingResult {
    /// `true` only for `ConsecutiveThresholdExceeded`, the one outcome
    /// that recommends restarting the tunnel.
    pub fn requires_restart(&self) -> bool {
        match self {
            Self::ConsecutiveThresholdExceeded { .. } => true,
            Self::Recorded | Self::RateThresholdExceeded { .. } => false,
        }
    }

    /// `true` only for `RateThresholdExceeded`, the warning outcome.
    pub fn is_warning(&self) -> bool {
        match self {
            Self::RateThresholdExceeded { .. } => true,
            Self::Recorded | Self::ConsecutiveThresholdExceeded { .. } => false,
        }
    }
}
327
328// ============================================================================
329// SendErrorTracker
330// ============================================================================
331
332/// Production-grade send error tracker with per-channel state.
333///
334/// Thread-safe — uses DashMap for per-channel state and atomics for global counters.
335pub struct SendErrorTracker {
336    /// Configuration.
337    config: SendErrorTrackerConfig,
338    /// Per-channel error state.
339    channels: DashMap<u8, RwLock<ChannelErrorState>>,
340    /// Global statistics.
341    stats: TrackerStats,
342}
343
344/// Global tracker statistics.
345pub struct TrackerStats {
346    /// Total successes across all channels.
347    pub total_successes: AtomicU64,
348    /// Total failures across all channels.
349    pub total_failures: AtomicU64,
350    /// Number of times consecutive threshold was exceeded.
351    pub consecutive_triggers: AtomicU64,
352    /// Number of times rate threshold was exceeded.
353    pub rate_triggers: AtomicU64,
354}
355
356impl TrackerStats {
357    fn new() -> Self {
358        Self {
359            total_successes: AtomicU64::new(0),
360            total_failures: AtomicU64::new(0),
361            consecutive_triggers: AtomicU64::new(0),
362            rate_triggers: AtomicU64::new(0),
363        }
364    }
365
366    /// Take a snapshot.
367    pub fn snapshot(&self) -> TrackerStatsSnapshot {
368        TrackerStatsSnapshot {
369            total_successes: self.total_successes.load(Ordering::Relaxed),
370            total_failures: self.total_failures.load(Ordering::Relaxed),
371            consecutive_triggers: self.consecutive_triggers.load(Ordering::Relaxed),
372            rate_triggers: self.rate_triggers.load(Ordering::Relaxed),
373        }
374    }
375}
376
/// Plain-value copy of the tracker's global counters at one point in time.
#[derive(Debug, Clone)]
pub struct TrackerStatsSnapshot {
    /// Successful sends across all channels.
    pub total_successes: u64,
    /// Failures across all channels.
    pub total_failures: u64,
    /// Times the consecutive threshold was exceeded.
    pub consecutive_triggers: u64,
    /// Times the rate threshold was exceeded.
    pub rate_triggers: u64,
}

impl TrackerStatsSnapshot {
    /// Fraction of all events that were failures; `0.0` when nothing has
    /// been recorded.
    pub fn error_rate(&self) -> f64 {
        match self.total_successes + self.total_failures {
            0 => 0.0,
            events => self.total_failures as f64 / events as f64,
        }
    }

    /// Sum of successes and failures.
    pub fn total_events(&self) -> u64 {
        self.total_successes + self.total_failures
    }
}
401
402/// Per-channel error summary.
403#[derive(Debug, Clone)]
404pub struct ChannelErrorSummary {
405    pub channel_id: u8,
406    pub consecutive_errors: u32,
407    pub total_successes: u64,
408    pub total_failures: u64,
409    pub window_error_count: usize,
410    pub restart_triggered: bool,
411    pub last_success_elapsed_ms: Option<u64>,
412    pub last_error_elapsed_ms: Option<u64>,
413    pub category_counts: [(ErrorCategory, u64); 6],
414}
415
416impl SendErrorTracker {
417    /// Create a new tracker with the given configuration.
418    pub fn new(config: SendErrorTrackerConfig) -> Self {
419        Self {
420            channels: DashMap::new(),
421            config,
422            stats: TrackerStats::new(),
423        }
424    }
425
426    /// Create a tracker with default configuration.
427    pub fn with_defaults() -> Self {
428        Self::new(SendErrorTrackerConfig::default())
429    }
430
431    /// Check if the tracker is enabled.
432    pub fn is_enabled(&self) -> bool {
433        self.config.enabled
434    }
435
436    /// Get the configuration.
437    pub fn config(&self) -> &SendErrorTrackerConfig {
438        &self.config
439    }
440
441    /// Record a successful send for a channel.
442    pub fn on_send_success(&self, channel_id: u8) {
443        if !self.config.enabled {
444            return;
445        }
446
447        self.ensure_channel(channel_id);
448
449        if let Some(state_lock) = self.channels.get(&channel_id) {
450            let mut state = state_lock.write();
451            state.record_success();
452        }
453
454        self.stats.total_successes.fetch_add(1, Ordering::Relaxed);
455    }
456
457    /// Record a send failure for a channel.
458    ///
459    /// Returns a `TrackingResult` indicating whether any threshold was exceeded.
460    pub fn on_send_failure(&self, channel_id: u8, category: ErrorCategory) -> TrackingResult {
461        if !self.config.enabled {
462            return TrackingResult::Recorded;
463        }
464
465        self.ensure_channel(channel_id);
466        self.stats.total_failures.fetch_add(1, Ordering::Relaxed);
467
468        let result = if let Some(state_lock) = self.channels.get(&channel_id) {
469            let mut state = state_lock.write();
470            state.record_failure(category);
471
472            // Prune old events
473            let window = self.config.window();
474            state.prune_old_events(window);
475
476            // Check consecutive threshold
477            if state.consecutive_errors >= self.config.consecutive_threshold
478                && !state.restart_triggered
479            {
480                state.restart_triggered = true;
481                self.stats
482                    .consecutive_triggers
483                    .fetch_add(1, Ordering::Relaxed);
484                return TrackingResult::ConsecutiveThresholdExceeded {
485                    consecutive_errors: state.consecutive_errors,
486                    threshold: self.config.consecutive_threshold,
487                };
488            }
489
490            // Check sliding window rate threshold
491            let total_in_window = state.total_successes + state.total_failures;
492            if total_in_window >= self.config.min_events_for_rate {
493                let error_count = state.window_error_count(window);
494                let rate = state.window_error_rate(window, total_in_window);
495                if rate > self.config.rate_threshold {
496                    self.stats.rate_triggers.fetch_add(1, Ordering::Relaxed);
497                    return TrackingResult::RateThresholdExceeded {
498                        error_count,
499                        window_ms: self.config.window_ms,
500                        rate: (rate * 100.0) as u32,
501                    };
502                }
503            }
504
505            TrackingResult::Recorded
506        } else {
507            TrackingResult::Recorded
508        };
509
510        result
511    }
512
513    /// Record an ACK error.
514    pub fn on_ack_error(&self, channel_id: u8, _status: u8) -> TrackingResult {
515        self.on_send_failure(channel_id, ErrorCategory::AckError)
516    }
517
518    /// Record an ACK timeout.
519    pub fn on_ack_timeout(&self, channel_id: u8) -> TrackingResult {
520        self.on_send_failure(channel_id, ErrorCategory::AckTimeout)
521    }
522
523    /// Record a confirmation NACK.
524    pub fn on_confirmation_nack(&self, channel_id: u8) -> TrackingResult {
525        self.on_send_failure(channel_id, ErrorCategory::ConfirmationNack)
526    }
527
528    /// Record a confirmation timeout.
529    pub fn on_confirmation_timeout(&self, channel_id: u8) -> TrackingResult {
530        self.on_send_failure(channel_id, ErrorCategory::ConfirmationTimeout)
531    }
532
533    /// Record a flow control drop.
534    pub fn on_flow_control_drop(&self, channel_id: u8) -> TrackingResult {
535        self.on_send_failure(channel_id, ErrorCategory::FlowControlDrop)
536    }
537
538    /// Get the consecutive error count for a channel.
539    pub fn consecutive_errors(&self, channel_id: u8) -> u32 {
540        self.channels
541            .get(&channel_id)
542            .map(|s| s.read().consecutive_errors)
543            .unwrap_or(0)
544    }
545
546    /// Get the error count in the current sliding window for a channel.
547    pub fn window_error_count(&self, channel_id: u8) -> usize {
548        let window = self.config.window();
549        self.channels
550            .get(&channel_id)
551            .map(|s| s.read().window_error_count(window))
552            .unwrap_or(0)
553    }
554
555    /// Check if a channel has triggered a restart.
556    pub fn is_restart_triggered(&self, channel_id: u8) -> bool {
557        self.channels
558            .get(&channel_id)
559            .map(|s| s.read().restart_triggered)
560            .unwrap_or(false)
561    }
562
563    /// Reset error state for a channel.
564    pub fn reset_channel(&self, channel_id: u8) {
565        if let Some(state_lock) = self.channels.get(&channel_id) {
566            state_lock.write().reset();
567        }
568    }
569
570    /// Remove a channel's tracking state entirely.
571    pub fn remove_channel(&self, channel_id: u8) {
572        self.channels.remove(&channel_id);
573    }
574
575    /// Get a summary for a specific channel.
576    pub fn channel_summary(&self, channel_id: u8) -> Option<ChannelErrorSummary> {
577        let window = self.config.window();
578        self.channels.get(&channel_id).map(|state_lock| {
579            let state = state_lock.read();
580            let categories = ErrorCategory::all();
581            let mut category_counts = [(ErrorCategory::SendFailure, 0u64); 6];
582            for (i, cat) in categories.iter().enumerate() {
583                category_counts[i] = (*cat, state.category_counts[i]);
584            }
585
586            ChannelErrorSummary {
587                channel_id,
588                consecutive_errors: state.consecutive_errors,
589                total_successes: state.total_successes,
590                total_failures: state.total_failures,
591                window_error_count: state.window_error_count(window),
592                restart_triggered: state.restart_triggered,
593                last_success_elapsed_ms: state
594                    .last_success_at
595                    .map(|t| t.elapsed().as_millis() as u64),
596                last_error_elapsed_ms: state.last_error_at.map(|t| t.elapsed().as_millis() as u64),
597                category_counts,
598            }
599        })
600    }
601
602    /// Get global statistics.
603    pub fn stats_snapshot(&self) -> TrackerStatsSnapshot {
604        self.stats.snapshot()
605    }
606
607    /// Ensure a channel state entry exists.
608    fn ensure_channel(&self, channel_id: u8) {
609        self.channels
610            .entry(channel_id)
611            .or_insert_with(|| RwLock::new(ChannelErrorState::new()));
612    }
613}
614
615impl fmt::Debug for SendErrorTracker {
616    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
617        f.debug_struct("SendErrorTracker")
618            .field("enabled", &self.config.enabled)
619            .field("channels", &self.channels.len())
620            .field("consecutive_threshold", &self.config.consecutive_threshold)
621            .field("window_ms", &self.config.window_ms)
622            .field("rate_threshold", &self.config.rate_threshold)
623            .finish()
624    }
625}
626
627impl Default for SendErrorTracker {
628    fn default() -> Self {
629        Self::with_defaults()
630    }
631}
632
633// ============================================================================
634// Tests
635// ============================================================================
636
#[cfg(test)]
mod tests {
    use super::*;

    /// Tracker whose only non-default setting is the consecutive threshold.
    fn tracker_with_threshold(consecutive_threshold: u32) -> SendErrorTracker {
        SendErrorTracker::new(SendErrorTrackerConfig {
            consecutive_threshold,
            ..Default::default()
        })
    }

    /// Records `count` `SendFailure` events on a channel and returns every
    /// result in order.
    fn fail_times(tracker: &SendErrorTracker, channel_id: u8, count: usize) -> Vec<TrackingResult> {
        (0..count)
            .map(|_| tracker.on_send_failure(channel_id, ErrorCategory::SendFailure))
            .collect()
    }

    /// Looks up the counter reported for one category in a summary.
    fn count_for(summary: &ChannelErrorSummary, wanted: ErrorCategory) -> u64 {
        summary
            .category_counts
            .iter()
            .find(|(category, _)| *category == wanted)
            .map(|(_, count)| *count)
            .unwrap()
    }

    #[test]
    fn test_error_category_names() {
        let expected = [
            (ErrorCategory::SendFailure, "SendFailure"),
            (ErrorCategory::AckError, "AckError"),
            (ErrorCategory::AckTimeout, "AckTimeout"),
            (ErrorCategory::ConfirmationNack, "ConfirmationNack"),
            (ErrorCategory::ConfirmationTimeout, "ConfirmationTimeout"),
            (ErrorCategory::FlowControlDrop, "FlowControlDrop"),
        ];
        for (category, name) in expected.iter() {
            assert_eq!(category.name(), *name);
        }
    }

    #[test]
    fn test_error_category_display() {
        assert_eq!(ErrorCategory::SendFailure.to_string(), "SendFailure");
    }

    #[test]
    fn test_error_category_all() {
        assert_eq!(ErrorCategory::all().len(), 6);
    }

    #[test]
    fn test_tracker_success_resets_consecutive() {
        let tracker = SendErrorTracker::with_defaults();

        fail_times(&tracker, 1, 2);
        assert_eq!(tracker.consecutive_errors(1), 2);

        tracker.on_send_success(1);
        assert_eq!(tracker.consecutive_errors(1), 0);
    }

    #[test]
    fn test_tracker_consecutive_threshold() {
        let tracker = tracker_with_threshold(3);

        assert_eq!(
            tracker.on_send_failure(1, ErrorCategory::SendFailure),
            TrackingResult::Recorded
        );
        assert_eq!(
            tracker.on_send_failure(1, ErrorCategory::AckError),
            TrackingResult::Recorded
        );

        // The third failure in a row reaches the threshold.
        let third = tracker.on_send_failure(1, ErrorCategory::AckTimeout);
        assert_eq!(
            third,
            TrackingResult::ConsecutiveThresholdExceeded {
                consecutive_errors: 3,
                threshold: 3,
            }
        );
        assert!(third.requires_restart());
        assert!(!third.is_warning());
    }

    #[test]
    fn test_tracker_restart_only_triggers_once() {
        let tracker = tracker_with_threshold(2);

        let first = tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert_eq!(first, TrackingResult::Recorded);

        let second = tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert!(matches!(
            second,
            TrackingResult::ConsecutiveThresholdExceeded { .. }
        ));

        // With the restart flag already set, further failures may report
        // Recorded or RateThresholdExceeded, but never the consecutive
        // threshold again.
        let third = tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert!(!matches!(
            third,
            TrackingResult::ConsecutiveThresholdExceeded { .. }
        ));
    }

    #[test]
    fn test_tracker_success_clears_restart_flag() {
        let tracker = tracker_with_threshold(2);

        fail_times(&tracker, 1, 2);
        assert!(tracker.is_restart_triggered(1));

        tracker.on_send_success(1);
        assert!(!tracker.is_restart_triggered(1));

        // A fresh streak can reach the threshold again.
        let second_streak = fail_times(&tracker, 1, 2);
        assert!(matches!(
            second_streak[1],
            TrackingResult::ConsecutiveThresholdExceeded { .. }
        ));
    }

    #[test]
    fn test_tracker_multi_channel() {
        let tracker = SendErrorTracker::with_defaults();

        fail_times(&tracker, 1, 2);
        tracker.on_send_failure(2, ErrorCategory::AckError);

        assert_eq!(tracker.consecutive_errors(1), 2);
        assert_eq!(tracker.consecutive_errors(2), 1);
        // Channel 3 was never used.
        assert_eq!(tracker.consecutive_errors(3), 0);
    }

    #[test]
    fn test_tracker_window_error_count() {
        let tracker = SendErrorTracker::new(SendErrorTrackerConfig {
            window_ms: 60_000, // 60 seconds
            ..Default::default()
        });

        for category in [
            ErrorCategory::SendFailure,
            ErrorCategory::AckError,
            ErrorCategory::AckTimeout,
        ]
        .iter()
        {
            tracker.on_send_failure(1, *category);
        }

        assert_eq!(tracker.window_error_count(1), 3);
    }

    #[test]
    fn test_tracker_convenience_methods() {
        // Threshold high enough that nothing triggers.
        let tracker = tracker_with_threshold(100);

        let outcomes = [
            tracker.on_ack_error(1, 0x21),
            tracker.on_ack_timeout(1),
            tracker.on_confirmation_nack(1),
            tracker.on_confirmation_timeout(1),
            tracker.on_flow_control_drop(1),
        ];
        for outcome in outcomes.iter() {
            assert_eq!(*outcome, TrackingResult::Recorded);
        }

        assert_eq!(tracker.consecutive_errors(1), 5);
    }

    #[test]
    fn test_tracker_reset_channel() {
        let tracker = SendErrorTracker::with_defaults();

        fail_times(&tracker, 1, 2);
        assert_eq!(tracker.consecutive_errors(1), 2);

        tracker.reset_channel(1);
        assert_eq!(tracker.consecutive_errors(1), 0);
        assert!(!tracker.is_restart_triggered(1));
    }

    #[test]
    fn test_tracker_remove_channel() {
        let tracker = SendErrorTracker::with_defaults();

        fail_times(&tracker, 1, 1);
        assert_eq!(tracker.consecutive_errors(1), 1);

        tracker.remove_channel(1);
        assert_eq!(tracker.consecutive_errors(1), 0);
    }

    #[test]
    fn test_tracker_channel_summary() {
        let tracker = SendErrorTracker::with_defaults();

        tracker.on_send_success(1);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::AckError);

        let summary = tracker.channel_summary(1).unwrap();
        assert_eq!(summary.channel_id, 1);
        assert_eq!(summary.consecutive_errors, 2);
        assert_eq!(summary.total_successes, 1);
        assert_eq!(summary.total_failures, 2);
        assert!(!summary.restart_triggered);
        assert!(summary.last_success_elapsed_ms.is_some());
        assert!(summary.last_error_elapsed_ms.is_some());
    }

    #[test]
    fn test_tracker_global_stats() {
        let tracker = SendErrorTracker::with_defaults();

        tracker.on_send_success(1);
        tracker.on_send_success(2);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);

        let stats = tracker.stats_snapshot();
        assert_eq!(stats.total_successes, 2);
        assert_eq!(stats.total_failures, 1);
        assert_eq!(stats.total_events(), 3);

        let deviation = (stats.error_rate() - 1.0 / 3.0).abs();
        assert!(deviation < 0.01);
    }

    #[test]
    fn test_tracker_stats_empty() {
        let stats = TrackerStatsSnapshot {
            total_successes: 0,
            total_failures: 0,
            consecutive_triggers: 0,
            rate_triggers: 0,
        };
        assert_eq!(stats.error_rate(), 0.0);
        assert_eq!(stats.total_events(), 0);
    }

    #[test]
    fn test_tracker_disabled() {
        let tracker = SendErrorTracker::new(SendErrorTrackerConfig {
            enabled: false,
            ..Default::default()
        });

        tracker.on_send_success(1);
        let result = tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert_eq!(result, TrackingResult::Recorded);
        assert_eq!(tracker.consecutive_errors(1), 0);
    }

    #[test]
    fn test_config_validate() {
        assert!(SendErrorTrackerConfig::default().validate().is_ok());

        let invalid = vec![
            SendErrorTrackerConfig {
                consecutive_threshold: 0,
                ..Default::default()
            },
            SendErrorTrackerConfig {
                window_ms: 0,
                ..Default::default()
            },
            SendErrorTrackerConfig {
                rate_threshold: 1.5,
                ..Default::default()
            },
            SendErrorTrackerConfig {
                rate_threshold: -0.1,
                ..Default::default()
            },
        ];
        for config in &invalid {
            assert!(config.validate().is_err());
        }
    }

    #[test]
    fn test_config_defaults() {
        let config = SendErrorTrackerConfig::default();
        assert!(config.enabled);
        assert_eq!(config.consecutive_threshold, 5);
        assert_eq!(config.window_ms, 60_000);
        assert_eq!(config.rate_threshold, 0.5);
        assert_eq!(config.min_events_for_rate, 10);
    }

    #[test]
    fn test_tracker_debug() {
        let rendered = format!("{:?}", SendErrorTracker::with_defaults());
        assert!(rendered.contains("SendErrorTracker"));
        assert!(rendered.contains("enabled"));
    }

    #[test]
    fn test_tracking_result_properties() {
        let recorded = TrackingResult::Recorded;
        assert!(!recorded.requires_restart());
        assert!(!recorded.is_warning());

        let consecutive = TrackingResult::ConsecutiveThresholdExceeded {
            consecutive_errors: 5,
            threshold: 5,
        };
        assert!(consecutive.requires_restart());
        assert!(!consecutive.is_warning());

        let rate = TrackingResult::RateThresholdExceeded {
            error_count: 10,
            window_ms: 60_000,
            rate: 75,
        };
        assert!(!rate.requires_restart());
        assert!(rate.is_warning());
    }

    #[test]
    fn test_category_counts_in_summary() {
        let tracker = SendErrorTracker::with_defaults();

        fail_times(&tracker, 1, 2);
        tracker.on_send_failure(1, ErrorCategory::AckError);
        tracker.on_send_failure(1, ErrorCategory::ConfirmationNack);

        let summary = tracker.channel_summary(1).unwrap();

        assert_eq!(count_for(&summary, ErrorCategory::SendFailure), 2);
        assert_eq!(count_for(&summary, ErrorCategory::AckError), 1);
    }
}