Skip to main content

mabi_knx/
error_tracker.rs

1//! Send Error Tracker with consecutive and sliding window error rate monitoring.
2//!
3//! This module provides production-grade error tracking for KNXnet/IP frame
4//! transmission, supporting two complementary monitoring strategies:
5//!
6//! ## Consecutive Error Tracking
7//!
8//! Tracks consecutive send failures. When the count reaches a configurable
9//! threshold, a tunnel restart is triggered (matching knxd behavior where
10//! 5 consecutive errors trigger reconnection).
11//!
12//! ## Sliding Window Error Rate
13//!
14//! Monitors the error rate over a configurable time window (default: 60 seconds).
15//! This catches intermittent but persistent error patterns that wouldn't trigger
16//! the consecutive threshold.
17//!
18//! ## Per-Channel Tracking
19//!
20//! Each tunnel connection (channel) has independent error tracking. This allows
21//! the simulator to inject channel-specific failure patterns for testing.
22//!
23//! ## Integration
24//!
25//! The tracker is integrated into the server's send pipeline:
26//! - `on_send_success(channel_id)` — resets consecutive counter
27//! - `on_send_failure(channel_id, category)` — increments counters, checks thresholds
28//! - `on_ack_error(channel_id, status)` — tracks ACK-level errors
29//! - `on_confirmation_nack(channel_id)` — tracks L_Data.con failures
30
31use std::collections::VecDeque;
32use std::fmt;
33use std::sync::atomic::{AtomicU64, Ordering};
34use std::time::{Duration, Instant};
35
36use dashmap::DashMap;
37use parking_lot::RwLock;
38use serde::{Deserialize, Serialize};
39
40// ============================================================================
41// Error Event
42// ============================================================================
43
44/// A recorded error event with timestamp and classification.
45#[derive(Debug, Clone)]
46struct ErrorEvent {
47    /// When the error occurred.
48    timestamp: Instant,
49    /// Error category — stored for per-category window analysis.
50    #[allow(dead_code)]
51    category: ErrorCategory,
52}
53
/// Classification of a failed transmission.
///
/// The discriminants are dense and start at zero because a category is cast
/// with `as usize` to index the per-channel counter array; keep them that way
/// when adding variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorCategory {
    /// The UDP send itself failed (I/O error).
    SendFailure = 0,
    /// A TUNNELLING_ACK arrived with a non-zero status.
    AckError = 1,
    /// No ACK arrived within the timeout.
    AckTimeout = 2,
    /// The bus answered with a negative L_Data.con.
    ConfirmationNack = 3,
    /// No L_Data.con arrived.
    ConfirmationTimeout = 4,
    /// The filter chain dropped the frame (flow control).
    FlowControlDrop = 5,
}
70
71impl ErrorCategory {
72    /// Human-readable name.
73    pub fn name(&self) -> &'static str {
74        match self {
75            Self::SendFailure => "SendFailure",
76            Self::AckError => "AckError",
77            Self::AckTimeout => "AckTimeout",
78            Self::ConfirmationNack => "ConfirmationNack",
79            Self::ConfirmationTimeout => "ConfirmationTimeout",
80            Self::FlowControlDrop => "FlowControlDrop",
81        }
82    }
83
84    /// All categories.
85    pub fn all() -> &'static [ErrorCategory] {
86        &[
87            Self::SendFailure,
88            Self::AckError,
89            Self::AckTimeout,
90            Self::ConfirmationNack,
91            Self::ConfirmationTimeout,
92            Self::FlowControlDrop,
93        ]
94    }
95}
96
97impl fmt::Display for ErrorCategory {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        write!(f, "{}", self.name())
100    }
101}
102
103// ============================================================================
104// Channel Error State
105// ============================================================================
106
107/// Per-channel error tracking state.
108struct ChannelErrorState {
109    /// Consecutive error counter — resets on success.
110    consecutive_errors: u32,
111    /// Sliding window of recent error events.
112    recent_errors: VecDeque<ErrorEvent>,
113    /// Total successes.
114    total_successes: u64,
115    /// Total failures.
116    total_failures: u64,
117    /// Per-category failure counts.
118    category_counts: [u64; 6],
119    /// Timestamp of last successful send.
120    last_success_at: Option<Instant>,
121    /// Timestamp of last error.
122    last_error_at: Option<Instant>,
123    /// Whether this channel has been flagged for restart.
124    restart_triggered: bool,
125}
126
127impl ChannelErrorState {
128    fn new() -> Self {
129        Self {
130            consecutive_errors: 0,
131            recent_errors: VecDeque::with_capacity(128),
132            total_successes: 0,
133            total_failures: 0,
134            category_counts: [0; 6],
135            last_success_at: None,
136            last_error_at: None,
137            restart_triggered: false,
138        }
139    }
140
141    /// Record a successful send.
142    fn record_success(&mut self) {
143        self.consecutive_errors = 0;
144        self.total_successes += 1;
145        self.last_success_at = Some(Instant::now());
146        self.restart_triggered = false;
147    }
148
149    /// Record a failed send.
150    fn record_failure(&mut self, category: ErrorCategory) {
151        self.consecutive_errors += 1;
152        self.total_failures += 1;
153        self.last_error_at = Some(Instant::now());
154        self.category_counts[category as usize] += 1;
155        self.recent_errors.push_back(ErrorEvent {
156            timestamp: Instant::now(),
157            category,
158        });
159    }
160
161    /// Prune events older than the window duration.
162    fn prune_old_events(&mut self, window: Duration) {
163        let cutoff = Instant::now() - window;
164        while let Some(front) = self.recent_errors.front() {
165            if front.timestamp < cutoff {
166                self.recent_errors.pop_front();
167            } else {
168                break;
169            }
170        }
171    }
172
173    /// Get the error count within the sliding window.
174    fn window_error_count(&self, window: Duration) -> usize {
175        let cutoff = Instant::now() - window;
176        self.recent_errors
177            .iter()
178            .filter(|e| e.timestamp >= cutoff)
179            .count()
180    }
181
182    /// Calculate error rate within the sliding window.
183    /// Returns (errors_in_window, total_in_window, rate).
184    fn window_error_rate(&self, window: Duration, total_sends_in_window: u64) -> f64 {
185        if total_sends_in_window == 0 {
186            return 0.0;
187        }
188        let errors = self.window_error_count(window) as f64;
189        errors / total_sends_in_window as f64
190    }
191
192    /// Reset all state for this channel.
193    fn reset(&mut self) {
194        self.consecutive_errors = 0;
195        self.recent_errors.clear();
196        self.restart_triggered = false;
197    }
198}
199
200// ============================================================================
201// Tracker Configuration
202// ============================================================================
203
204/// SendErrorTracker configuration.
205#[derive(Debug, Clone, Serialize, Deserialize)]
206pub struct SendErrorTrackerConfig {
207    /// Whether the error tracker is enabled.
208    #[serde(default = "default_true")]
209    pub enabled: bool,
210
211    /// Consecutive error threshold for tunnel restart.
212    /// When consecutive errors reach this count, the tracker signals a restart.
213    /// Default: 5 (matches knxd behavior).
214    #[serde(default = "default_consecutive_threshold")]
215    pub consecutive_threshold: u32,
216
217    /// Sliding window duration in milliseconds.
218    /// Errors older than this are pruned from the window.
219    /// Default: 60000 (60 seconds).
220    #[serde(default = "default_window_ms")]
221    pub window_ms: u64,
222
223    /// Error rate threshold within the sliding window (0.0 - 1.0).
224    /// When the error rate exceeds this within the window, a warning is triggered.
225    /// Default: 0.5 (50% error rate).
226    #[serde(default = "default_rate_threshold")]
227    pub rate_threshold: f64,
228
229    /// Minimum number of events in the window before rate threshold applies.
230    /// Prevents false positives when only a few events have occurred.
231    /// Default: 10.
232    #[serde(default = "default_min_events_for_rate")]
233    pub min_events_for_rate: u64,
234}
235
/// Serde default for `enabled`.
fn default_true() -> bool {
    true
}

/// Serde default for `consecutive_threshold`.
fn default_consecutive_threshold() -> u32 {
    5
}

/// Serde default for `window_ms`: sixty seconds.
fn default_window_ms() -> u64 {
    60 * 1_000
}

/// Serde default for `rate_threshold`: one half.
fn default_rate_threshold() -> f64 {
    0.5
}

/// Serde default for `min_events_for_rate`.
fn default_min_events_for_rate() -> u64 {
    10
}
255
256impl Default for SendErrorTrackerConfig {
257    fn default() -> Self {
258        Self {
259            enabled: true,
260            consecutive_threshold: default_consecutive_threshold(),
261            window_ms: default_window_ms(),
262            rate_threshold: default_rate_threshold(),
263            min_events_for_rate: default_min_events_for_rate(),
264        }
265    }
266}
267
268impl SendErrorTrackerConfig {
269    /// Get the sliding window duration.
270    pub fn window(&self) -> Duration {
271        Duration::from_millis(self.window_ms)
272    }
273
274    /// Validate the configuration.
275    pub fn validate(&self) -> Result<(), String> {
276        if self.consecutive_threshold == 0 {
277            return Err("SendErrorTracker consecutive_threshold must be > 0".to_string());
278        }
279        if self.window_ms == 0 {
280            return Err("SendErrorTracker window_ms must be > 0".to_string());
281        }
282        if self.rate_threshold < 0.0 || self.rate_threshold > 1.0 {
283            return Err(format!(
284                "SendErrorTracker rate_threshold must be 0.0-1.0, got {}",
285                self.rate_threshold
286            ));
287        }
288        Ok(())
289    }
290}
291
292// ============================================================================
293// Tracker Result
294// ============================================================================
295
/// Outcome of recording a failure.
///
/// `on_send_failure()` and its category-specific wrappers return this to
/// tell the caller whether a threshold fired.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TrackingResult {
    /// The failure was recorded and no threshold fired.
    Recorded,
    /// The consecutive-failure threshold was reached; a tunnel restart is
    /// recommended.
    ConsecutiveThresholdExceeded {
        /// Length of the current failure streak.
        consecutive_errors: u32,
        /// Configured threshold that was reached.
        threshold: u32,
    },
    /// The sliding-window error rate is above the configured threshold
    /// (warning state).
    RateThresholdExceeded {
        /// Failures currently inside the window.
        error_count: usize,
        /// Configured window length in milliseconds.
        window_ms: u64,
        /// Error rate as a percentage (0-100).
        rate: u32,
    },
}
315
316impl TrackingResult {
317    /// Whether a tunnel restart is recommended.
318    pub fn requires_restart(&self) -> bool {
319        matches!(self, Self::ConsecutiveThresholdExceeded { .. })
320    }
321
322    /// Whether this is a warning state.
323    pub fn is_warning(&self) -> bool {
324        matches!(self, Self::RateThresholdExceeded { .. })
325    }
326}
327
328// ============================================================================
329// SendErrorTracker
330// ============================================================================
331
332/// Production-grade send error tracker with per-channel state.
333///
334/// Thread-safe — uses DashMap for per-channel state and atomics for global counters.
335pub struct SendErrorTracker {
336    /// Configuration.
337    config: SendErrorTrackerConfig,
338    /// Per-channel error state.
339    channels: DashMap<u8, RwLock<ChannelErrorState>>,
340    /// Global statistics.
341    stats: TrackerStats,
342}
343
/// Counters aggregated over all channels.
pub struct TrackerStats {
    /// Successful sends, summed over every channel.
    pub total_successes: AtomicU64,
    /// Failed sends, summed over every channel.
    pub total_failures: AtomicU64,
    /// How many times the consecutive-failure threshold fired.
    pub consecutive_triggers: AtomicU64,
    /// How many times the window rate threshold fired.
    pub rate_triggers: AtomicU64,
}
355
356impl TrackerStats {
357    fn new() -> Self {
358        Self {
359            total_successes: AtomicU64::new(0),
360            total_failures: AtomicU64::new(0),
361            consecutive_triggers: AtomicU64::new(0),
362            rate_triggers: AtomicU64::new(0),
363        }
364    }
365
366    /// Take a snapshot.
367    pub fn snapshot(&self) -> TrackerStatsSnapshot {
368        TrackerStatsSnapshot {
369            total_successes: self.total_successes.load(Ordering::Relaxed),
370            total_failures: self.total_failures.load(Ordering::Relaxed),
371            consecutive_triggers: self.consecutive_triggers.load(Ordering::Relaxed),
372            rate_triggers: self.rate_triggers.load(Ordering::Relaxed),
373        }
374    }
375}
376
/// Plain-value copy of [`TrackerStats`] taken at one point in time.
#[derive(Debug, Clone)]
pub struct TrackerStatsSnapshot {
    /// Successful sends over all channels.
    pub total_successes: u64,
    /// Failed sends over all channels.
    pub total_failures: u64,
    /// Times the consecutive-failure threshold fired.
    pub consecutive_triggers: u64,
    /// Times the window rate threshold fired.
    pub rate_triggers: u64,
}
385
386impl TrackerStatsSnapshot {
387    /// Overall error rate.
388    pub fn error_rate(&self) -> f64 {
389        let total = self.total_successes + self.total_failures;
390        if total == 0 {
391            return 0.0;
392        }
393        self.total_failures as f64 / total as f64
394    }
395
396    /// Total events.
397    pub fn total_events(&self) -> u64 {
398        self.total_successes + self.total_failures
399    }
400}
401
402/// Per-channel error summary.
403#[derive(Debug, Clone)]
404pub struct ChannelErrorSummary {
405    pub channel_id: u8,
406    pub consecutive_errors: u32,
407    pub total_successes: u64,
408    pub total_failures: u64,
409    pub window_error_count: usize,
410    pub restart_triggered: bool,
411    pub last_success_elapsed_ms: Option<u64>,
412    pub last_error_elapsed_ms: Option<u64>,
413    pub category_counts: [(ErrorCategory, u64); 6],
414}
415
416impl SendErrorTracker {
417    /// Create a new tracker with the given configuration.
418    pub fn new(config: SendErrorTrackerConfig) -> Self {
419        Self {
420            channels: DashMap::new(),
421            config,
422            stats: TrackerStats::new(),
423        }
424    }
425
426    /// Create a tracker with default configuration.
427    pub fn with_defaults() -> Self {
428        Self::new(SendErrorTrackerConfig::default())
429    }
430
431    /// Check if the tracker is enabled.
432    pub fn is_enabled(&self) -> bool {
433        self.config.enabled
434    }
435
436    /// Get the configuration.
437    pub fn config(&self) -> &SendErrorTrackerConfig {
438        &self.config
439    }
440
441    /// Record a successful send for a channel.
442    pub fn on_send_success(&self, channel_id: u8) {
443        if !self.config.enabled {
444            return;
445        }
446
447        self.ensure_channel(channel_id);
448
449        if let Some(state_lock) = self.channels.get(&channel_id) {
450            let mut state = state_lock.write();
451            state.record_success();
452        }
453
454        self.stats.total_successes.fetch_add(1, Ordering::Relaxed);
455    }
456
457    /// Record a send failure for a channel.
458    ///
459    /// Returns a `TrackingResult` indicating whether any threshold was exceeded.
460    pub fn on_send_failure(
461        &self,
462        channel_id: u8,
463        category: ErrorCategory,
464    ) -> TrackingResult {
465        if !self.config.enabled {
466            return TrackingResult::Recorded;
467        }
468
469        self.ensure_channel(channel_id);
470        self.stats.total_failures.fetch_add(1, Ordering::Relaxed);
471
472        let result = if let Some(state_lock) = self.channels.get(&channel_id) {
473            let mut state = state_lock.write();
474            state.record_failure(category);
475
476            // Prune old events
477            let window = self.config.window();
478            state.prune_old_events(window);
479
480            // Check consecutive threshold
481            if state.consecutive_errors >= self.config.consecutive_threshold
482                && !state.restart_triggered
483            {
484                state.restart_triggered = true;
485                self.stats.consecutive_triggers.fetch_add(1, Ordering::Relaxed);
486                return TrackingResult::ConsecutiveThresholdExceeded {
487                    consecutive_errors: state.consecutive_errors,
488                    threshold: self.config.consecutive_threshold,
489                };
490            }
491
492            // Check sliding window rate threshold
493            let total_in_window = state.total_successes + state.total_failures;
494            if total_in_window >= self.config.min_events_for_rate {
495                let error_count = state.window_error_count(window);
496                let rate = state.window_error_rate(window, total_in_window);
497                if rate > self.config.rate_threshold {
498                    self.stats.rate_triggers.fetch_add(1, Ordering::Relaxed);
499                    return TrackingResult::RateThresholdExceeded {
500                        error_count,
501                        window_ms: self.config.window_ms,
502                        rate: (rate * 100.0) as u32,
503                    };
504                }
505            }
506
507            TrackingResult::Recorded
508        } else {
509            TrackingResult::Recorded
510        };
511
512        result
513    }
514
515    /// Record an ACK error.
516    pub fn on_ack_error(&self, channel_id: u8, _status: u8) -> TrackingResult {
517        self.on_send_failure(channel_id, ErrorCategory::AckError)
518    }
519
520    /// Record an ACK timeout.
521    pub fn on_ack_timeout(&self, channel_id: u8) -> TrackingResult {
522        self.on_send_failure(channel_id, ErrorCategory::AckTimeout)
523    }
524
525    /// Record a confirmation NACK.
526    pub fn on_confirmation_nack(&self, channel_id: u8) -> TrackingResult {
527        self.on_send_failure(channel_id, ErrorCategory::ConfirmationNack)
528    }
529
530    /// Record a confirmation timeout.
531    pub fn on_confirmation_timeout(&self, channel_id: u8) -> TrackingResult {
532        self.on_send_failure(channel_id, ErrorCategory::ConfirmationTimeout)
533    }
534
535    /// Record a flow control drop.
536    pub fn on_flow_control_drop(&self, channel_id: u8) -> TrackingResult {
537        self.on_send_failure(channel_id, ErrorCategory::FlowControlDrop)
538    }
539
540    /// Get the consecutive error count for a channel.
541    pub fn consecutive_errors(&self, channel_id: u8) -> u32 {
542        self.channels
543            .get(&channel_id)
544            .map(|s| s.read().consecutive_errors)
545            .unwrap_or(0)
546    }
547
548    /// Get the error count in the current sliding window for a channel.
549    pub fn window_error_count(&self, channel_id: u8) -> usize {
550        let window = self.config.window();
551        self.channels
552            .get(&channel_id)
553            .map(|s| s.read().window_error_count(window))
554            .unwrap_or(0)
555    }
556
557    /// Check if a channel has triggered a restart.
558    pub fn is_restart_triggered(&self, channel_id: u8) -> bool {
559        self.channels
560            .get(&channel_id)
561            .map(|s| s.read().restart_triggered)
562            .unwrap_or(false)
563    }
564
565    /// Reset error state for a channel.
566    pub fn reset_channel(&self, channel_id: u8) {
567        if let Some(state_lock) = self.channels.get(&channel_id) {
568            state_lock.write().reset();
569        }
570    }
571
572    /// Remove a channel's tracking state entirely.
573    pub fn remove_channel(&self, channel_id: u8) {
574        self.channels.remove(&channel_id);
575    }
576
577    /// Get a summary for a specific channel.
578    pub fn channel_summary(&self, channel_id: u8) -> Option<ChannelErrorSummary> {
579        let window = self.config.window();
580        self.channels.get(&channel_id).map(|state_lock| {
581            let state = state_lock.read();
582            let categories = ErrorCategory::all();
583            let mut category_counts = [(ErrorCategory::SendFailure, 0u64); 6];
584            for (i, cat) in categories.iter().enumerate() {
585                category_counts[i] = (*cat, state.category_counts[i]);
586            }
587
588            ChannelErrorSummary {
589                channel_id,
590                consecutive_errors: state.consecutive_errors,
591                total_successes: state.total_successes,
592                total_failures: state.total_failures,
593                window_error_count: state.window_error_count(window),
594                restart_triggered: state.restart_triggered,
595                last_success_elapsed_ms: state.last_success_at.map(|t| t.elapsed().as_millis() as u64),
596                last_error_elapsed_ms: state.last_error_at.map(|t| t.elapsed().as_millis() as u64),
597                category_counts,
598            }
599        })
600    }
601
602    /// Get global statistics.
603    pub fn stats_snapshot(&self) -> TrackerStatsSnapshot {
604        self.stats.snapshot()
605    }
606
607    /// Ensure a channel state entry exists.
608    fn ensure_channel(&self, channel_id: u8) {
609        self.channels
610            .entry(channel_id)
611            .or_insert_with(|| RwLock::new(ChannelErrorState::new()));
612    }
613}
614
615impl fmt::Debug for SendErrorTracker {
616    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
617        f.debug_struct("SendErrorTracker")
618            .field("enabled", &self.config.enabled)
619            .field("channels", &self.channels.len())
620            .field("consecutive_threshold", &self.config.consecutive_threshold)
621            .field("window_ms", &self.config.window_ms)
622            .field("rate_threshold", &self.config.rate_threshold)
623            .finish()
624    }
625}
626
627impl Default for SendErrorTracker {
628    fn default() -> Self {
629        Self::with_defaults()
630    }
631}
632
633// ============================================================================
634// Tests
635// ============================================================================
636
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a tracker whose only non-default setting is the consecutive
    /// threshold.
    fn tracker_with_threshold(consecutive_threshold: u32) -> SendErrorTracker {
        SendErrorTracker::new(SendErrorTrackerConfig {
            consecutive_threshold,
            ..Default::default()
        })
    }

    /// Looks up the count of one category in a channel summary.
    fn count_for(summary: &ChannelErrorSummary, wanted: ErrorCategory) -> u64 {
        summary
            .category_counts
            .iter()
            .find(|(cat, _)| *cat == wanted)
            .map(|(_, count)| *count)
            .unwrap()
    }

    #[test]
    fn test_error_category_names() {
        assert_eq!(ErrorCategory::SendFailure.name(), "SendFailure");
        assert_eq!(ErrorCategory::AckError.name(), "AckError");
        assert_eq!(ErrorCategory::AckTimeout.name(), "AckTimeout");
        assert_eq!(ErrorCategory::ConfirmationNack.name(), "ConfirmationNack");
        assert_eq!(ErrorCategory::ConfirmationTimeout.name(), "ConfirmationTimeout");
        assert_eq!(ErrorCategory::FlowControlDrop.name(), "FlowControlDrop");
    }

    #[test]
    fn test_error_category_display() {
        assert_eq!(format!("{}", ErrorCategory::SendFailure), "SendFailure");
    }

    #[test]
    fn test_error_category_all() {
        assert_eq!(ErrorCategory::all().len(), 6);
    }

    #[test]
    fn test_tracker_success_resets_consecutive() {
        let tracker = SendErrorTracker::with_defaults();

        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert_eq!(tracker.consecutive_errors(1), 2);

        tracker.on_send_success(1);
        assert_eq!(tracker.consecutive_errors(1), 0);
    }

    #[test]
    fn test_tracker_consecutive_threshold() {
        let tracker = tracker_with_threshold(3);

        let r1 = tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert_eq!(r1, TrackingResult::Recorded);

        let r2 = tracker.on_send_failure(1, ErrorCategory::AckError);
        assert_eq!(r2, TrackingResult::Recorded);

        let r3 = tracker.on_send_failure(1, ErrorCategory::AckTimeout);
        assert!(matches!(
            r3,
            TrackingResult::ConsecutiveThresholdExceeded {
                consecutive_errors: 3,
                threshold: 3,
            }
        ));

        assert!(r3.requires_restart());
        assert!(!r3.is_warning());
    }

    #[test]
    fn test_tracker_restart_only_triggers_once() {
        let tracker = tracker_with_threshold(2);

        let r1 = tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert_eq!(r1, TrackingResult::Recorded);

        let r2 = tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert!(matches!(r2, TrackingResult::ConsecutiveThresholdExceeded { .. }));

        // Once the flag is set, further failures in the same streak may be
        // Recorded or RateThresholdExceeded, but never a second restart.
        let r3 = tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert!(!matches!(r3, TrackingResult::ConsecutiveThresholdExceeded { .. }));
    }

    #[test]
    fn test_tracker_success_clears_restart_flag() {
        let tracker = tracker_with_threshold(2);

        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert!(tracker.is_restart_triggered(1));

        tracker.on_send_success(1);
        assert!(!tracker.is_restart_triggered(1));

        // A fresh streak can trigger again.
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        let r = tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert!(matches!(r, TrackingResult::ConsecutiveThresholdExceeded { .. }));
    }

    #[test]
    fn test_tracker_consecutive_trigger_counter() {
        let tracker = tracker_with_threshold(2);

        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::SendFailure); // fires
        tracker.on_send_failure(1, ErrorCategory::SendFailure); // suppressed
        assert_eq!(tracker.stats_snapshot().consecutive_triggers, 1);

        tracker.on_send_success(1);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::SendFailure); // fires again
        assert_eq!(tracker.stats_snapshot().consecutive_triggers, 2);
    }

    #[test]
    fn test_tracker_rate_threshold_exceeded() {
        let tracker = SendErrorTracker::new(SendErrorTrackerConfig {
            consecutive_threshold: 100, // keep the restart path out of the way
            min_events_for_rate: 4,
            rate_threshold: 0.5,
            ..Default::default()
        });

        tracker.on_send_success(1);

        // Two and three events: below `min_events_for_rate`.
        assert_eq!(
            tracker.on_send_failure(1, ErrorCategory::SendFailure),
            TrackingResult::Recorded
        );
        assert_eq!(
            tracker.on_send_failure(1, ErrorCategory::AckTimeout),
            TrackingResult::Recorded
        );

        // Four events, three of them failures: 75% > 50%.
        let result = tracker.on_send_failure(1, ErrorCategory::AckError);
        assert_eq!(
            result,
            TrackingResult::RateThresholdExceeded {
                error_count: 3,
                window_ms: 60_000,
                rate: 75,
            }
        );
        assert!(result.is_warning());
        assert!(!result.requires_restart());
        assert_eq!(tracker.stats_snapshot().rate_triggers, 1);
    }

    #[test]
    fn test_tracker_multi_channel() {
        let tracker = SendErrorTracker::with_defaults();

        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(2, ErrorCategory::AckError);

        assert_eq!(tracker.consecutive_errors(1), 2);
        assert_eq!(tracker.consecutive_errors(2), 1);
        assert_eq!(tracker.consecutive_errors(3), 0); // Non-existent channel
    }

    #[test]
    fn test_tracker_window_error_count() {
        let tracker = SendErrorTracker::new(SendErrorTrackerConfig {
            window_ms: 60_000, // 60 seconds
            ..Default::default()
        });

        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::AckError);
        tracker.on_send_failure(1, ErrorCategory::AckTimeout);

        assert_eq!(tracker.window_error_count(1), 3);
    }

    #[test]
    fn test_tracker_convenience_methods() {
        // High threshold so the restart path never fires.
        let tracker = tracker_with_threshold(100);

        let r1 = tracker.on_ack_error(1, 0x21);
        assert_eq!(r1, TrackingResult::Recorded);

        let r2 = tracker.on_ack_timeout(1);
        assert_eq!(r2, TrackingResult::Recorded);

        let r3 = tracker.on_confirmation_nack(1);
        assert_eq!(r3, TrackingResult::Recorded);

        let r4 = tracker.on_confirmation_timeout(1);
        assert_eq!(r4, TrackingResult::Recorded);

        let r5 = tracker.on_flow_control_drop(1);
        assert_eq!(r5, TrackingResult::Recorded);

        assert_eq!(tracker.consecutive_errors(1), 5);
    }

    #[test]
    fn test_tracker_reset_channel() {
        let tracker = SendErrorTracker::with_defaults();

        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert_eq!(tracker.consecutive_errors(1), 2);

        tracker.reset_channel(1);
        assert_eq!(tracker.consecutive_errors(1), 0);
        assert!(!tracker.is_restart_triggered(1));
    }

    #[test]
    fn test_tracker_remove_channel() {
        let tracker = SendErrorTracker::with_defaults();

        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert_eq!(tracker.consecutive_errors(1), 1);

        tracker.remove_channel(1);
        assert_eq!(tracker.consecutive_errors(1), 0);
    }

    #[test]
    fn test_tracker_channel_summary() {
        let tracker = SendErrorTracker::with_defaults();

        tracker.on_send_success(1);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::AckError);

        let summary = tracker.channel_summary(1).unwrap();
        assert_eq!(summary.channel_id, 1);
        assert_eq!(summary.consecutive_errors, 2);
        assert_eq!(summary.total_successes, 1);
        assert_eq!(summary.total_failures, 2);
        assert!(!summary.restart_triggered);
        assert!(summary.last_success_elapsed_ms.is_some());
        assert!(summary.last_error_elapsed_ms.is_some());
    }

    #[test]
    fn test_tracker_global_stats() {
        let tracker = SendErrorTracker::with_defaults();

        tracker.on_send_success(1);
        tracker.on_send_success(2);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);

        let stats = tracker.stats_snapshot();
        assert_eq!(stats.total_successes, 2);
        assert_eq!(stats.total_failures, 1);
        assert_eq!(stats.total_events(), 3);

        let rate = stats.error_rate();
        assert!((rate - 1.0 / 3.0).abs() < 0.01);
    }

    #[test]
    fn test_tracker_stats_empty() {
        let stats = TrackerStatsSnapshot {
            total_successes: 0,
            total_failures: 0,
            consecutive_triggers: 0,
            rate_triggers: 0,
        };
        assert_eq!(stats.error_rate(), 0.0);
        assert_eq!(stats.total_events(), 0);
    }

    #[test]
    fn test_tracker_disabled() {
        let tracker = SendErrorTracker::new(SendErrorTrackerConfig {
            enabled: false,
            ..Default::default()
        });

        tracker.on_send_success(1);
        let result = tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert_eq!(result, TrackingResult::Recorded);
        assert_eq!(tracker.consecutive_errors(1), 0);
    }

    #[test]
    fn test_config_validate() {
        assert!(SendErrorTrackerConfig::default().validate().is_ok());

        let zero_threshold = SendErrorTrackerConfig {
            consecutive_threshold: 0,
            ..Default::default()
        };
        assert!(zero_threshold.validate().is_err());

        let zero_window = SendErrorTrackerConfig {
            window_ms: 0,
            ..Default::default()
        };
        assert!(zero_window.validate().is_err());

        let rate_too_high = SendErrorTrackerConfig {
            rate_threshold: 1.5,
            ..Default::default()
        };
        assert!(rate_too_high.validate().is_err());

        let rate_negative = SendErrorTrackerConfig {
            rate_threshold: -0.1,
            ..Default::default()
        };
        assert!(rate_negative.validate().is_err());
    }

    #[test]
    fn test_config_defaults() {
        let config = SendErrorTrackerConfig::default();
        assert!(config.enabled);
        assert_eq!(config.consecutive_threshold, 5);
        assert_eq!(config.window_ms, 60_000);
        assert_eq!(config.rate_threshold, 0.5);
        assert_eq!(config.min_events_for_rate, 10);
    }

    #[test]
    fn test_tracker_debug() {
        let tracker = SendErrorTracker::with_defaults();
        let debug_str = format!("{:?}", tracker);
        assert!(debug_str.contains("SendErrorTracker"));
        assert!(debug_str.contains("enabled"));
    }

    #[test]
    fn test_tracking_result_properties() {
        let recorded = TrackingResult::Recorded;
        assert!(!recorded.requires_restart());
        assert!(!recorded.is_warning());

        let consecutive = TrackingResult::ConsecutiveThresholdExceeded {
            consecutive_errors: 5,
            threshold: 5,
        };
        assert!(consecutive.requires_restart());
        assert!(!consecutive.is_warning());

        let rate = TrackingResult::RateThresholdExceeded {
            error_count: 10,
            window_ms: 60_000,
            rate: 75,
        };
        assert!(!rate.requires_restart());
        assert!(rate.is_warning());
    }

    #[test]
    fn test_category_counts_in_summary() {
        let tracker = SendErrorTracker::with_defaults();

        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::AckError);
        tracker.on_send_failure(1, ErrorCategory::ConfirmationNack);

        let summary = tracker.channel_summary(1).unwrap();

        assert_eq!(count_for(&summary, ErrorCategory::SendFailure), 2);
        assert_eq!(count_for(&summary, ErrorCategory::AckError), 1);
    }
}