use std::collections::VecDeque;
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use dashmap::DashMap;
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
/// One failed send retained in a channel's sliding error window.
#[derive(Clone, Debug)]
struct ErrorEvent {
    /// Instant at which the failure was recorded.
    timestamp: Instant,
    /// Kind of failure. Nothing in this module reads it back (presumably it is kept
    /// for diagnostics via the derived `Debug`); derived `Debug`/`Clone` impls do not
    /// count as reads for the dead-code lint, hence the `allow`.
    #[allow(dead_code)]
    category: ErrorCategory,
}
/// Kind of send failure reported to the tracker.
///
/// Discriminants are spelled out because the tracker uses `category as usize` as an
/// index into its per-category counters; [`ErrorCategory::all`] lists the variants in
/// that same order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorCategory {
    SendFailure = 0,
    AckError = 1,
    AckTimeout = 2,
    ConfirmationNack = 3,
    ConfirmationTimeout = 4,
    FlowControlDrop = 5,
}

impl ErrorCategory {
    /// Stable, human-readable name of the category (identical to the variant name).
    pub fn name(&self) -> &'static str {
        match *self {
            ErrorCategory::SendFailure => "SendFailure",
            ErrorCategory::AckError => "AckError",
            ErrorCategory::AckTimeout => "AckTimeout",
            ErrorCategory::ConfirmationNack => "ConfirmationNack",
            ErrorCategory::ConfirmationTimeout => "ConfirmationTimeout",
            ErrorCategory::FlowControlDrop => "FlowControlDrop",
        }
    }

    /// Every category, ordered by discriminant.
    pub fn all() -> &'static [ErrorCategory] {
        const ALL: [ErrorCategory; 6] = [
            ErrorCategory::SendFailure,
            ErrorCategory::AckError,
            ErrorCategory::AckTimeout,
            ErrorCategory::ConfirmationNack,
            ErrorCategory::ConfirmationTimeout,
            ErrorCategory::FlowControlDrop,
        ];
        &ALL
    }
}

impl fmt::Display for ErrorCategory {
    /// Writes [`ErrorCategory::name`]; width/fill flags are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.name())
    }
}
/// Number of [`ErrorCategory`] variants; sizes the per-category counter arrays.
///
/// Counters are indexed by `category as usize`, and `ErrorCategory::all()` lists the
/// variants in that same order.
const CATEGORY_COUNT: usize = 6;

/// Successes arriving within `window / SUCCESS_BUCKETS_PER_WINDOW` of the newest
/// success bucket are folded into it, so a busy channel stores on the order of this
/// many success entries per window rather than one entry per send.
const SUCCESS_BUCKETS_PER_WINDOW: u32 = 64;

/// Returns the oldest instant that still lies inside `window`, counting back from now.
///
/// Returns `None` when `window` reaches back further than the platform's `Instant`
/// can represent (where `Instant::now() - window` would panic); callers then treat
/// every recorded event as in-window.
fn window_cutoff(window: Duration) -> Option<Instant> {
    Instant::now().checked_sub(window)
}

/// Whether an event stamped at `timestamp` lies inside the window whose oldest
/// instant is `cutoff` (`None` meaning the window covers all history).
fn is_within_window(timestamp: Instant, cutoff: Option<Instant>) -> bool {
    match cutoff {
        Some(cutoff) => timestamp >= cutoff,
        None => true,
    }
}

/// Milliseconds elapsed since `instant`, saturating at `u64::MAX`.
fn elapsed_ms(instant: Instant) -> u64 {
    u64::try_from(instant.elapsed().as_millis()).unwrap_or(u64::MAX)
}

/// Per-channel bookkeeping, guarded by the tracker's per-channel `RwLock`.
struct ChannelErrorState {
    /// Failures since the last success (or the last `reset`).
    consecutive_errors: u32,
    /// Recently recorded failures, oldest first; entries older than the window are
    /// pruned lazily whenever an event is recorded.
    recent_errors: VecDeque<ErrorEvent>,
    /// Recently recorded successes as `(bucket_start, count)`, oldest first; pruned
    /// together with `recent_errors`.
    recent_successes: VecDeque<(Instant, u64)>,
    /// Lifetime success count (kept across `reset`).
    total_successes: u64,
    /// Lifetime failure count (kept across `reset`).
    total_failures: u64,
    /// Lifetime failures per category, indexed by `category as usize`.
    category_counts: [u64; CATEGORY_COUNT],
    last_success_at: Option<Instant>,
    last_error_at: Option<Instant>,
    /// Set when the consecutive threshold fires and cleared by a success or `reset`,
    /// so the restart signal is emitted once per failure streak.
    restart_triggered: bool,
}

impl ChannelErrorState {
    fn new() -> Self {
        Self {
            consecutive_errors: 0,
            recent_errors: VecDeque::with_capacity(128),
            recent_successes: VecDeque::with_capacity(SUCCESS_BUCKETS_PER_WINDOW as usize + 1),
            total_successes: 0,
            total_failures: 0,
            category_counts: [0; CATEGORY_COUNT],
            last_success_at: None,
            last_error_at: None,
            restart_triggered: false,
        }
    }

    /// Records a successful send: ends the failure streak, re-arms the restart
    /// trigger and adds the success to the sliding window of length `window`.
    fn record_success(&mut self, window: Duration) {
        let now = Instant::now();
        self.consecutive_errors = 0;
        self.total_successes += 1;
        self.last_success_at = Some(now);
        self.restart_triggered = false;

        // Coalesce into the newest bucket when it is recent enough, bounding memory.
        let granularity = window / SUCCESS_BUCKETS_PER_WINDOW;
        match self.recent_successes.back_mut() {
            Some((bucket_start, count)) if now.duration_since(*bucket_start) < granularity => {
                *count += 1;
            }
            _ => self.recent_successes.push_back((now, 1)),
        }
        self.prune_old_events(window);
    }

    /// Records a failed send of `category` and adds it to the sliding window of
    /// length `window`.
    fn record_failure(&mut self, category: ErrorCategory, window: Duration) {
        let now = Instant::now();
        self.consecutive_errors = self.consecutive_errors.saturating_add(1);
        self.total_failures += 1;
        self.last_error_at = Some(now);
        self.category_counts[category as usize] += 1;
        self.recent_errors.push_back(ErrorEvent {
            timestamp: now,
            category,
        });
        self.prune_old_events(window);
    }

    /// Drops window entries older than `window`. Entries are appended under `&mut
    /// self` with monotonic `Instant::now()` stamps, so they are ordered and popping
    /// from the front is sufficient.
    fn prune_old_events(&mut self, window: Duration) {
        let cutoff = match window_cutoff(window) {
            Some(cutoff) => cutoff,
            // The window covers all representable history: nothing is stale.
            None => return,
        };
        while matches!(self.recent_errors.front(), Some(event) if event.timestamp < cutoff) {
            self.recent_errors.pop_front();
        }
        while matches!(self.recent_successes.front(), Some(&(start, _)) if start < cutoff) {
            self.recent_successes.pop_front();
        }
    }

    /// Failures recorded within the last `window`.
    fn window_error_count(&self, window: Duration) -> usize {
        let cutoff = window_cutoff(window);
        self.recent_errors
            .iter()
            .filter(|event| is_within_window(event.timestamp, cutoff))
            .count()
    }

    /// Successes recorded within the last `window`.
    ///
    /// Because successes are bucketed, a bucket that started just before the cutoff is
    /// excluded whole, so this may under-count by up to one bucket
    /// (`window / SUCCESS_BUCKETS_PER_WINDOW`) at the old edge of the window.
    fn window_success_count(&self, window: Duration) -> u64 {
        let cutoff = window_cutoff(window);
        self.recent_successes
            .iter()
            .filter(|&&(start, _)| is_within_window(start, cutoff))
            .map(|&(_, count)| count)
            .sum()
    }

    /// Clears the streak, windowed history and restart flag; lifetime totals and
    /// per-category counts are kept.
    fn reset(&mut self) {
        self.consecutive_errors = 0;
        self.recent_errors.clear();
        self.recent_successes.clear();
        self.restart_triggered = false;
    }
}

/// Configuration for [`SendErrorTracker`]. Fields missing from serialized input fall
/// back to the same values as [`SendErrorTrackerConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SendErrorTrackerConfig {
    /// When `false`, every `on_*` hook is a no-op returning `Recorded`.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// Consecutive failures on one channel that produce `ConsecutiveThresholdExceeded`.
    #[serde(default = "default_consecutive_threshold")]
    pub consecutive_threshold: u32,
    /// Length of the sliding window used for the error rate, in milliseconds.
    #[serde(default = "default_window_ms")]
    pub window_ms: u64,
    /// Windowed failure fraction (0.0-1.0) above which `RateThresholdExceeded` is reported.
    #[serde(default = "default_rate_threshold")]
    pub rate_threshold: f64,
    /// Minimum number of sends (successes + failures) inside the window before the
    /// error rate is evaluated.
    #[serde(default = "default_min_events_for_rate")]
    pub min_events_for_rate: u64,
}

fn default_true() -> bool {
    true
}

fn default_consecutive_threshold() -> u32 {
    5
}

fn default_window_ms() -> u64 {
    60_000
}

fn default_rate_threshold() -> f64 {
    0.5
}

fn default_min_events_for_rate() -> u64 {
    10
}

impl Default for SendErrorTrackerConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            consecutive_threshold: default_consecutive_threshold(),
            window_ms: default_window_ms(),
            rate_threshold: default_rate_threshold(),
            min_events_for_rate: default_min_events_for_rate(),
        }
    }
}

impl SendErrorTrackerConfig {
    /// The sliding window as a [`Duration`].
    pub fn window(&self) -> Duration {
        Duration::from_millis(self.window_ms)
    }

    /// Checks the configuration for values the tracker cannot use sensibly.
    ///
    /// # Errors
    /// Returns a description of the first invalid field: a zero
    /// `consecutive_threshold` or `window_ms`, or a `rate_threshold` outside
    /// `0.0..=1.0` (including NaN).
    pub fn validate(&self) -> Result<(), String> {
        if self.consecutive_threshold == 0 {
            return Err("SendErrorTracker consecutive_threshold must be > 0".to_string());
        }
        if self.window_ms == 0 {
            return Err("SendErrorTracker window_ms must be > 0".to_string());
        }
        // `contains` is false for NaN, which a pair of `<`/`>` comparisons would let through.
        if !(0.0..=1.0).contains(&self.rate_threshold) {
            return Err(format!(
                "SendErrorTracker rate_threshold must be 0.0-1.0, got {}",
                self.rate_threshold
            ));
        }
        Ok(())
    }
}

/// Outcome of recording a failure.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TrackingResult {
    /// The failure was recorded and no threshold fired.
    Recorded,
    /// The channel's failure streak reached the configured threshold.
    ConsecutiveThresholdExceeded {
        consecutive_errors: u32,
        threshold: u32,
    },
    /// The windowed error rate is above the configured threshold.
    RateThresholdExceeded {
        /// Failures inside the window.
        error_count: usize,
        /// Window length used, in milliseconds.
        window_ms: u64,
        /// Windowed error rate as a whole percentage (truncated towards zero).
        rate: u32,
    },
}

impl TrackingResult {
    /// Whether the caller should restart the channel.
    pub fn requires_restart(&self) -> bool {
        matches!(self, Self::ConsecutiveThresholdExceeded { .. })
    }

    /// Whether the result is a non-fatal warning.
    pub fn is_warning(&self) -> bool {
        matches!(self, Self::RateThresholdExceeded { .. })
    }
}

/// Tracks send failures per channel and reports when a channel crosses the
/// consecutive-failure (restart) or windowed error-rate (warning) threshold.
pub struct SendErrorTracker {
    config: SendErrorTrackerConfig,
    /// Per-channel state; the inner `RwLock` lets a channel be mutated through the
    /// shared reference returned by `DashMap::get`.
    channels: DashMap<u8, RwLock<ChannelErrorState>>,
    stats: TrackerStats,
}

/// Tracker-wide counters aggregated over all channels.
///
/// Updated with `Ordering::Relaxed`: each counter is individually accurate, but a
/// snapshot is not a consistent cut across counters under concurrent updates.
pub struct TrackerStats {
    pub total_successes: AtomicU64,
    pub total_failures: AtomicU64,
    pub consecutive_triggers: AtomicU64,
    pub rate_triggers: AtomicU64,
}

impl TrackerStats {
    fn new() -> Self {
        Self {
            total_successes: AtomicU64::new(0),
            total_failures: AtomicU64::new(0),
            consecutive_triggers: AtomicU64::new(0),
            rate_triggers: AtomicU64::new(0),
        }
    }

    /// Reads every counter into a plain-value snapshot.
    pub fn snapshot(&self) -> TrackerStatsSnapshot {
        TrackerStatsSnapshot {
            total_successes: self.total_successes.load(Ordering::Relaxed),
            total_failures: self.total_failures.load(Ordering::Relaxed),
            consecutive_triggers: self.consecutive_triggers.load(Ordering::Relaxed),
            rate_triggers: self.rate_triggers.load(Ordering::Relaxed),
        }
    }
}

/// Point-in-time copy of [`TrackerStats`].
#[derive(Debug, Clone)]
pub struct TrackerStatsSnapshot {
    pub total_successes: u64,
    pub total_failures: u64,
    pub consecutive_triggers: u64,
    pub rate_triggers: u64,
}

impl TrackerStatsSnapshot {
    /// Lifetime fraction of sends that failed, or `0.0` when nothing was recorded.
    pub fn error_rate(&self) -> f64 {
        match self.total_events() {
            0 => 0.0,
            total => self.total_failures as f64 / total as f64,
        }
    }

    /// Lifetime successes plus failures (saturating).
    pub fn total_events(&self) -> u64 {
        self.total_successes.saturating_add(self.total_failures)
    }
}

/// Read-only view of one channel's state, produced by
/// [`SendErrorTracker::channel_summary`].
#[derive(Debug, Clone)]
pub struct ChannelErrorSummary {
    pub channel_id: u8,
    pub consecutive_errors: u32,
    pub total_successes: u64,
    pub total_failures: u64,
    /// Failures inside the configured window at the time of the call.
    pub window_error_count: usize,
    pub restart_triggered: bool,
    pub last_success_elapsed_ms: Option<u64>,
    pub last_error_elapsed_ms: Option<u64>,
    /// Lifetime failures per category, in `ErrorCategory::all()` order.
    pub category_counts: [(ErrorCategory, u64); CATEGORY_COUNT],
}

impl SendErrorTracker {
    /// Creates a tracker using `config`.
    ///
    /// The config is used as-is; call [`SendErrorTrackerConfig::validate`] first if it
    /// comes from an untrusted source.
    pub fn new(config: SendErrorTrackerConfig) -> Self {
        Self {
            channels: DashMap::new(),
            config,
            stats: TrackerStats::new(),
        }
    }

    /// Creates a tracker with [`SendErrorTrackerConfig::default`].
    pub fn with_defaults() -> Self {
        Self::new(SendErrorTrackerConfig::default())
    }

    /// Whether tracking is enabled; when disabled every `on_*` hook is a no-op.
    pub fn is_enabled(&self) -> bool {
        self.config.enabled
    }

    /// The configuration this tracker was built with.
    pub fn config(&self) -> &SendErrorTrackerConfig {
        &self.config
    }

    /// Records a successful send on `channel_id`: ends its failure streak, re-arms
    /// the restart trigger and counts towards the windowed error rate.
    pub fn on_send_success(&self, channel_id: u8) {
        if !self.config.enabled {
            return;
        }
        let window = self.config.window();
        self.with_channel_mut(channel_id, |state| state.record_success(window));
        self.stats.total_successes.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a failed send of `category` on `channel_id` and reports whether a
    /// threshold was crossed.
    ///
    /// Returns, in priority order:
    /// - `ConsecutiveThresholdExceeded` the first time the channel's failure streak
    ///   reaches `consecutive_threshold` (once per streak; a success re-arms it);
    /// - `RateThresholdExceeded` while at least `min_events_for_rate` sends fall in the
    ///   window and the fraction of them that failed is above `rate_threshold`;
    /// - `Recorded` otherwise, and always when the tracker is disabled.
    pub fn on_send_failure(&self, channel_id: u8, category: ErrorCategory) -> TrackingResult {
        if !self.config.enabled {
            return TrackingResult::Recorded;
        }
        self.stats.total_failures.fetch_add(1, Ordering::Relaxed);
        let window = self.config.window();
        self.with_channel_mut(channel_id, |state| {
            state.record_failure(category, window);
            self.evaluate_thresholds(state, window)
        })
    }

    /// Records an ack error. `_status` is currently ignored; every ack error is
    /// counted as [`ErrorCategory::AckError`].
    pub fn on_ack_error(&self, channel_id: u8, _status: u8) -> TrackingResult {
        self.on_send_failure(channel_id, ErrorCategory::AckError)
    }

    /// Records an ack timeout as [`ErrorCategory::AckTimeout`].
    pub fn on_ack_timeout(&self, channel_id: u8) -> TrackingResult {
        self.on_send_failure(channel_id, ErrorCategory::AckTimeout)
    }

    /// Records a confirmation NACK as [`ErrorCategory::ConfirmationNack`].
    pub fn on_confirmation_nack(&self, channel_id: u8) -> TrackingResult {
        self.on_send_failure(channel_id, ErrorCategory::ConfirmationNack)
    }

    /// Records a confirmation timeout as [`ErrorCategory::ConfirmationTimeout`].
    pub fn on_confirmation_timeout(&self, channel_id: u8) -> TrackingResult {
        self.on_send_failure(channel_id, ErrorCategory::ConfirmationTimeout)
    }

    /// Records a flow-control drop as [`ErrorCategory::FlowControlDrop`].
    pub fn on_flow_control_drop(&self, channel_id: u8) -> TrackingResult {
        self.on_send_failure(channel_id, ErrorCategory::FlowControlDrop)
    }

    /// Current failure streak of `channel_id` (0 for unknown channels).
    pub fn consecutive_errors(&self, channel_id: u8) -> u32 {
        self.channels
            .get(&channel_id)
            .map(|s| s.read().consecutive_errors)
            .unwrap_or(0)
    }

    /// Failures of `channel_id` inside the configured window (0 for unknown channels).
    pub fn window_error_count(&self, channel_id: u8) -> usize {
        let window = self.config.window();
        self.channels
            .get(&channel_id)
            .map(|s| s.read().window_error_count(window))
            .unwrap_or(0)
    }

    /// Whether a restart has been signalled for the current failure streak of
    /// `channel_id` (false for unknown channels).
    pub fn is_restart_triggered(&self, channel_id: u8) -> bool {
        self.channels
            .get(&channel_id)
            .map(|s| s.read().restart_triggered)
            .unwrap_or(false)
    }

    /// Clears the streak, windowed history and restart flag of `channel_id`;
    /// lifetime totals are kept. No-op for unknown channels.
    pub fn reset_channel(&self, channel_id: u8) {
        if let Some(state_lock) = self.channels.get(&channel_id) {
            state_lock.write().reset();
        }
    }

    /// Forgets all state for `channel_id`.
    pub fn remove_channel(&self, channel_id: u8) {
        self.channels.remove(&channel_id);
    }

    /// Snapshot of one channel's state, or `None` if the channel has no state.
    pub fn channel_summary(&self, channel_id: u8) -> Option<ChannelErrorSummary> {
        let window = self.config.window();
        let state_lock = self.channels.get(&channel_id)?;
        let state = state_lock.read();

        let mut category_counts = [(ErrorCategory::SendFailure, 0u64); CATEGORY_COUNT];
        for ((slot, &category), &count) in category_counts
            .iter_mut()
            .zip(ErrorCategory::all())
            .zip(state.category_counts.iter())
        {
            *slot = (category, count);
        }

        Some(ChannelErrorSummary {
            channel_id,
            consecutive_errors: state.consecutive_errors,
            total_successes: state.total_successes,
            total_failures: state.total_failures,
            window_error_count: state.window_error_count(window),
            restart_triggered: state.restart_triggered,
            last_success_elapsed_ms: state.last_success_at.map(elapsed_ms),
            last_error_elapsed_ms: state.last_error_at.map(elapsed_ms),
            category_counts,
        })
    }

    /// Tracker-wide counters.
    pub fn stats_snapshot(&self) -> TrackerStatsSnapshot {
        self.stats.snapshot()
    }

    /// Decides which threshold, if any, the failure just recorded in `state` crossed.
    fn evaluate_thresholds(
        &self,
        state: &mut ChannelErrorState,
        window: Duration,
    ) -> TrackingResult {
        let threshold = self.config.consecutive_threshold;
        if state.consecutive_errors >= threshold && !state.restart_triggered {
            state.restart_triggered = true;
            self.stats
                .consecutive_triggers
                .fetch_add(1, Ordering::Relaxed);
            return TrackingResult::ConsecutiveThresholdExceeded {
                consecutive_errors: state.consecutive_errors,
                threshold,
            };
        }

        // Numerator and denominator are both restricted to the window: dividing by
        // lifetime totals would dilute a recent error burst on a long-lived channel.
        let error_count = state.window_error_count(window);
        let events_in_window = error_count as u64 + state.window_success_count(window);
        if events_in_window == 0 || events_in_window < self.config.min_events_for_rate {
            return TrackingResult::Recorded;
        }

        let rate = error_count as f64 / events_in_window as f64;
        if rate > self.config.rate_threshold {
            self.stats.rate_triggers.fetch_add(1, Ordering::Relaxed);
            TrackingResult::RateThresholdExceeded {
                error_count,
                window_ms: self.config.window_ms,
                rate: (rate * 100.0) as u32,
            }
        } else {
            TrackingResult::Recorded
        }
    }

    /// Runs `f` on the state of `channel_id` under its write lock, creating the
    /// channel first if needed.
    ///
    /// The common path only needs `DashMap::get`; `entry` (via `ensure_channel`) is
    /// used just when the channel is missing. Retrying the lookup after creation also
    /// covers a concurrent `remove_channel` landing in between, so the event is never
    /// silently dropped.
    fn with_channel_mut<R>(
        &self,
        channel_id: u8,
        f: impl FnOnce(&mut ChannelErrorState) -> R,
    ) -> R {
        loop {
            if let Some(state_lock) = self.channels.get(&channel_id) {
                let mut state = state_lock.write();
                return f(&mut *state);
            }
            self.ensure_channel(channel_id);
        }
    }

    /// Inserts fresh state for `channel_id` unless it already exists.
    fn ensure_channel(&self, channel_id: u8) {
        self.channels
            .entry(channel_id)
            .or_insert_with(|| RwLock::new(ChannelErrorState::new()));
    }
}
impl fmt::Debug for SendErrorTracker {
    /// Shows the configuration and the number of tracked channels; per-channel state
    /// is deliberately omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let cfg = &self.config;
        let mut out = f.debug_struct("SendErrorTracker");
        out.field("enabled", &cfg.enabled);
        out.field("channels", &self.channels.len());
        out.field("consecutive_threshold", &cfg.consecutive_threshold);
        out.field("window_ms", &cfg.window_ms);
        out.field("rate_threshold", &cfg.rate_threshold);
        out.finish()
    }
}
impl Default for SendErrorTracker {
fn default() -> Self {
Self::with_defaults()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Tracker with default settings except for the consecutive-failure threshold.
    fn tracker_with_threshold(consecutive_threshold: u32) -> SendErrorTracker {
        SendErrorTracker::new(SendErrorTrackerConfig {
            consecutive_threshold,
            ..Default::default()
        })
    }

    /// Count reported for `category` in `summary`.
    fn category_count(summary: &ChannelErrorSummary, category: ErrorCategory) -> u64 {
        summary
            .category_counts
            .iter()
            .find(|(cat, _)| *cat == category)
            .map(|(_, count)| *count)
            .expect("every category appears in the summary")
    }

    #[test]
    fn test_error_category_names() {
        assert_eq!(ErrorCategory::SendFailure.name(), "SendFailure");
        assert_eq!(ErrorCategory::AckError.name(), "AckError");
        assert_eq!(ErrorCategory::AckTimeout.name(), "AckTimeout");
        assert_eq!(ErrorCategory::ConfirmationNack.name(), "ConfirmationNack");
        assert_eq!(
            ErrorCategory::ConfirmationTimeout.name(),
            "ConfirmationTimeout"
        );
        assert_eq!(ErrorCategory::FlowControlDrop.name(), "FlowControlDrop");
    }

    #[test]
    fn test_error_category_display() {
        assert_eq!(format!("{}", ErrorCategory::SendFailure), "SendFailure");
    }

    #[test]
    fn test_error_category_all() {
        assert_eq!(ErrorCategory::all().len(), 6);
    }

    /// Per-category counters are indexed by discriminant, so `all()` must follow it.
    #[test]
    fn test_error_category_all_matches_discriminants() {
        for (index, category) in ErrorCategory::all().iter().enumerate() {
            assert_eq!(*category as usize, index);
        }
    }

    #[test]
    fn test_tracker_success_resets_consecutive() {
        let tracker = SendErrorTracker::with_defaults();
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert_eq!(tracker.consecutive_errors(1), 2);
        tracker.on_send_success(1);
        assert_eq!(tracker.consecutive_errors(1), 0);
    }

    #[test]
    fn test_tracker_consecutive_threshold() {
        let tracker = tracker_with_threshold(3);
        assert_eq!(
            tracker.on_send_failure(1, ErrorCategory::SendFailure),
            TrackingResult::Recorded
        );
        assert_eq!(
            tracker.on_send_failure(1, ErrorCategory::AckError),
            TrackingResult::Recorded
        );
        let third = tracker.on_send_failure(1, ErrorCategory::AckTimeout);
        assert_eq!(
            third,
            TrackingResult::ConsecutiveThresholdExceeded {
                consecutive_errors: 3,
                threshold: 3,
            }
        );
        assert!(third.requires_restart());
        assert!(!third.is_warning());
    }

    #[test]
    fn test_tracker_restart_only_triggers_once() {
        let tracker = tracker_with_threshold(2);
        assert_eq!(
            tracker.on_send_failure(1, ErrorCategory::SendFailure),
            TrackingResult::Recorded
        );
        assert!(tracker
            .on_send_failure(1, ErrorCategory::SendFailure)
            .requires_restart());
        assert!(!tracker
            .on_send_failure(1, ErrorCategory::SendFailure)
            .requires_restart());
        assert_eq!(tracker.stats_snapshot().consecutive_triggers, 1);
    }

    #[test]
    fn test_tracker_success_clears_restart_flag() {
        let tracker = tracker_with_threshold(2);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert!(tracker.is_restart_triggered(1));
        tracker.on_send_success(1);
        assert!(!tracker.is_restart_triggered(1));
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        let rearmed = tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert!(rearmed.requires_restart());
    }

    #[test]
    fn test_tracker_multi_channel() {
        let tracker = SendErrorTracker::with_defaults();
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(2, ErrorCategory::AckError);
        assert_eq!(tracker.consecutive_errors(1), 2);
        assert_eq!(tracker.consecutive_errors(2), 1);
        // Channel 3 was never touched.
        assert_eq!(tracker.consecutive_errors(3), 0);
    }

    #[test]
    fn test_tracker_window_error_count() {
        let tracker = SendErrorTracker::new(SendErrorTrackerConfig {
            window_ms: 60_000,
            ..Default::default()
        });
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::AckError);
        tracker.on_send_failure(1, ErrorCategory::AckTimeout);
        assert_eq!(tracker.window_error_count(1), 3);
    }

    #[test]
    fn test_tracker_convenience_methods() {
        // High threshold so only `Recorded` results are produced.
        let tracker = tracker_with_threshold(100);
        assert_eq!(tracker.on_ack_error(1, 0x21), TrackingResult::Recorded);
        assert_eq!(tracker.on_ack_timeout(1), TrackingResult::Recorded);
        assert_eq!(tracker.on_confirmation_nack(1), TrackingResult::Recorded);
        assert_eq!(tracker.on_confirmation_timeout(1), TrackingResult::Recorded);
        assert_eq!(tracker.on_flow_control_drop(1), TrackingResult::Recorded);
        assert_eq!(tracker.consecutive_errors(1), 5);
    }

    #[test]
    fn test_tracker_rate_threshold_exceeded() {
        let tracker = tracker_with_threshold(100);
        // Below `min_events_for_rate` (10) the rate is not evaluated.
        for _ in 0..9 {
            assert_eq!(
                tracker.on_send_failure(1, ErrorCategory::SendFailure),
                TrackingResult::Recorded
            );
        }
        let tenth = tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert_eq!(
            tenth,
            TrackingResult::RateThresholdExceeded {
                error_count: 10,
                window_ms: 60_000,
                rate: 100,
            }
        );
        assert!(tenth.is_warning());
        assert!(!tenth.requires_restart());
        assert_eq!(tracker.stats_snapshot().rate_triggers, 1);
    }

    #[test]
    fn test_tracker_rate_below_threshold() {
        let tracker = tracker_with_threshold(100);
        for _ in 0..10 {
            tracker.on_send_success(1);
        }
        // At most 5 of 15 sends failed: below the default 0.5 threshold.
        for _ in 0..5 {
            assert_eq!(
                tracker.on_send_failure(1, ErrorCategory::AckTimeout),
                TrackingResult::Recorded
            );
        }
        assert_eq!(tracker.stats_snapshot().rate_triggers, 0);
    }

    #[test]
    fn test_tracker_reset_channel() {
        let tracker = SendErrorTracker::with_defaults();
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert_eq!(tracker.consecutive_errors(1), 2);
        tracker.reset_channel(1);
        assert_eq!(tracker.consecutive_errors(1), 0);
        assert_eq!(tracker.window_error_count(1), 0);
        assert!(!tracker.is_restart_triggered(1));
    }

    #[test]
    fn test_tracker_remove_channel() {
        let tracker = SendErrorTracker::with_defaults();
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        assert_eq!(tracker.consecutive_errors(1), 1);
        tracker.remove_channel(1);
        assert_eq!(tracker.consecutive_errors(1), 0);
        assert!(tracker.channel_summary(1).is_none());
    }

    #[test]
    fn test_tracker_channel_summary() {
        let tracker = SendErrorTracker::with_defaults();
        tracker.on_send_success(1);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::AckError);
        let summary = tracker.channel_summary(1).expect("channel 1 exists");
        assert_eq!(summary.channel_id, 1);
        assert_eq!(summary.consecutive_errors, 2);
        assert_eq!(summary.total_successes, 1);
        assert_eq!(summary.total_failures, 2);
        assert!(!summary.restart_triggered);
        assert!(summary.last_success_elapsed_ms.is_some());
        assert!(summary.last_error_elapsed_ms.is_some());
    }

    #[test]
    fn test_tracker_global_stats() {
        let tracker = SendErrorTracker::with_defaults();
        tracker.on_send_success(1);
        tracker.on_send_success(2);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        let stats = tracker.stats_snapshot();
        assert_eq!(stats.total_successes, 2);
        assert_eq!(stats.total_failures, 1);
        assert_eq!(stats.total_events(), 3);
        assert!((stats.error_rate() - 1.0 / 3.0).abs() < 0.01);
    }

    #[test]
    fn test_tracker_stats_empty() {
        let stats = TrackerStatsSnapshot {
            total_successes: 0,
            total_failures: 0,
            consecutive_triggers: 0,
            rate_triggers: 0,
        };
        assert_eq!(stats.error_rate(), 0.0);
        assert_eq!(stats.total_events(), 0);
    }

    #[test]
    fn test_tracker_disabled() {
        let tracker = SendErrorTracker::new(SendErrorTrackerConfig {
            enabled: false,
            ..Default::default()
        });
        tracker.on_send_success(1);
        assert_eq!(
            tracker.on_send_failure(1, ErrorCategory::SendFailure),
            TrackingResult::Recorded
        );
        assert_eq!(tracker.consecutive_errors(1), 0);
        assert_eq!(tracker.stats_snapshot().total_events(), 0);
    }

    #[test]
    fn test_config_validate() {
        let invalid = |config: SendErrorTrackerConfig| config.validate().is_err();
        assert!(SendErrorTrackerConfig::default().validate().is_ok());
        assert!(invalid(SendErrorTrackerConfig {
            consecutive_threshold: 0,
            ..Default::default()
        }));
        assert!(invalid(SendErrorTrackerConfig {
            window_ms: 0,
            ..Default::default()
        }));
        assert!(invalid(SendErrorTrackerConfig {
            rate_threshold: 1.5,
            ..Default::default()
        }));
        assert!(invalid(SendErrorTrackerConfig {
            rate_threshold: -0.1,
            ..Default::default()
        }));
    }

    #[test]
    fn test_config_defaults() {
        let config = SendErrorTrackerConfig::default();
        assert!(config.enabled);
        assert_eq!(config.consecutive_threshold, 5);
        assert_eq!(config.window_ms, 60_000);
        assert_eq!(config.rate_threshold, 0.5);
        assert_eq!(config.min_events_for_rate, 10);
    }

    #[test]
    fn test_tracker_debug() {
        let rendered = format!("{:?}", SendErrorTracker::with_defaults());
        assert!(rendered.contains("SendErrorTracker"));
        assert!(rendered.contains("enabled"));
    }

    #[test]
    fn test_tracking_result_properties() {
        let recorded = TrackingResult::Recorded;
        assert!(!recorded.requires_restart());
        assert!(!recorded.is_warning());

        let consecutive = TrackingResult::ConsecutiveThresholdExceeded {
            consecutive_errors: 5,
            threshold: 5,
        };
        assert!(consecutive.requires_restart());
        assert!(!consecutive.is_warning());

        let rate = TrackingResult::RateThresholdExceeded {
            error_count: 10,
            window_ms: 60_000,
            rate: 75,
        };
        assert!(!rate.requires_restart());
        assert!(rate.is_warning());
    }

    #[test]
    fn test_category_counts_in_summary() {
        let tracker = SendErrorTracker::with_defaults();
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::SendFailure);
        tracker.on_send_failure(1, ErrorCategory::AckError);
        tracker.on_send_failure(1, ErrorCategory::ConfirmationNack);
        let summary = tracker.channel_summary(1).expect("channel 1 exists");
        assert_eq!(category_count(&summary, ErrorCategory::SendFailure), 2);
        assert_eq!(category_count(&summary, ErrorCategory::AckError), 1);
        assert_eq!(category_count(&summary, ErrorCategory::ConfirmationNack), 1);
        assert_eq!(category_count(&summary, ErrorCategory::AckTimeout), 0);
    }
}