use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tracing::{debug, info, trace, warn};
use super::chain::{FilterResult, FrameEnvelope};
/// State of the retry filter's circuit breaker.
///
/// Transitions are driven by `RetryFilter`: `Closed` → `Open` when consecutive failures reach
/// the threshold, `Open` → `HalfOpen` once the recovery timeout elapses, and `HalfOpen` →
/// `Closed` (enough successes) or back to `Open` (a failed probe).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitBreakerState {
    /// Normal operation: frames flow and failures are counted.
    Closed,
    /// Tripped: frames are dropped.
    Open {
        /// Moment the circuit tripped; the recovery timeout is measured from here.
        opened_at: Instant,
    },
    /// Recovery probing: frames are let through so their outcome can decide the next state.
    HalfOpen,
}
impl CircuitBreakerState {
    /// Static name of the variant, without the `Open` timestamp.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Open { .. } => "Open",
            Self::HalfOpen => "HalfOpen",
            Self::Closed => "Closed",
        }
    }

    /// `true` unless the circuit is `Open`. `Closed` and `HalfOpen` both admit traffic.
    pub fn allows_traffic(&self) -> bool {
        !matches!(self, Self::Open { .. })
    }
}
impl fmt::Display for CircuitBreakerState {
    /// Human-readable state. `Open` also reports how long ago it tripped, in whole milliseconds.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let Self::Open { opened_at } = self {
            let millis = opened_at.elapsed().as_millis();
            return write!(f, "Open({}ms ago)", millis);
        }
        let label = if matches!(self, Self::Closed) {
            "Closed"
        } else {
            "HalfOpen"
        };
        f.write_str(label)
    }
}
/// Configuration for `RetryFilter`: retry backoff plus circuit-breaker thresholds.
///
/// Every field carries a serde default. A partial or empty config therefore deserializes to
/// the same values that `RetryFilterConfig::default()` produces.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryFilterConfig {
    /// Master switch. When `false`, the filter passes every frame and ignores outcomes.
    #[serde(default = "default_true")]
    pub enabled: bool,
    /// A frame may be retried while its `retry_count` is below this value.
    #[serde(default = "default_max_retries")]
    pub max_retries: u8,
    /// Base backoff delay, in milliseconds.
    #[serde(default = "default_retry_delay_ms")]
    pub retry_delay_ms: u64,
    /// When `true`, the base delay doubles per attempt (exponent capped at 10).
    #[serde(default = "default_true")]
    pub exponential_backoff: bool,
    /// Upper bound on any computed backoff delay, in milliseconds.
    #[serde(default = "default_max_retry_delay_ms")]
    pub max_retry_delay_ms: u64,
    /// Consecutive failures (while Closed) that trip the circuit to Open. Must be > 0.
    #[serde(default = "default_failure_threshold")]
    pub failure_threshold: u32,
    /// Time the circuit stays Open before it becomes HalfOpen, in milliseconds. Must be > 0.
    #[serde(default = "default_recovery_timeout_ms")]
    pub recovery_timeout_ms: u64,
    /// Successes needed while HalfOpen to close the circuit again. Must be > 0.
    #[serde(default = "default_success_threshold")]
    pub success_threshold: u32,
}
/// Serde default for flags that are on unless explicitly disabled.
fn default_true() -> bool {
    true
}

/// Default `max_retries`.
fn default_max_retries() -> u8 {
    3_u8
}

/// Default base backoff delay (ms).
fn default_retry_delay_ms() -> u64 {
    100_u64
}

/// Default backoff ceiling (ms).
fn default_max_retry_delay_ms() -> u64 {
    5_000_u64
}

/// Default number of consecutive failures that trips the circuit.
fn default_failure_threshold() -> u32 {
    5_u32
}

/// Default Open → HalfOpen recovery timeout (ms).
fn default_recovery_timeout_ms() -> u64 {
    10_000_u64
}

/// Default number of HalfOpen successes needed to close the circuit.
fn default_success_threshold() -> u32 {
    1_u32
}
impl Default for RetryFilterConfig {
fn default() -> Self {
Self {
enabled: true,
max_retries: default_max_retries(),
retry_delay_ms: default_retry_delay_ms(),
exponential_backoff: true,
max_retry_delay_ms: default_max_retry_delay_ms(),
failure_threshold: default_failure_threshold(),
recovery_timeout_ms: default_recovery_timeout_ms(),
success_threshold: default_success_threshold(),
}
}
}
impl RetryFilterConfig {
    /// Backoff delay for 0-based retry `attempt`.
    ///
    /// With exponential backoff the delay is `retry_delay_ms * 2^min(attempt, 10)`, using a
    /// saturating multiply. Otherwise it is `retry_delay_ms`. Either way the result is capped
    /// at `max_retry_delay_ms`.
    pub fn retry_delay(&self, attempt: u8) -> Duration {
        let multiplier: u64 = if self.exponential_backoff {
            // Exponent capped at 10 (x1024) so the shift can never overflow.
            1u64 << attempt.min(10)
        } else {
            1
        };
        let uncapped_ms = self.retry_delay_ms.saturating_mul(multiplier);
        Duration::from_millis(uncapped_ms.min(self.max_retry_delay_ms))
    }

    /// `recovery_timeout_ms` as a `Duration`.
    pub fn recovery_timeout(&self) -> Duration {
        Duration::from_millis(self.recovery_timeout_ms)
    }

    /// Rejects zero values for the circuit-breaker thresholds and the recovery timeout.
    ///
    /// # Errors
    /// Returns a message naming the first offending field, checked in the order
    /// `failure_threshold`, `recovery_timeout_ms`, `success_threshold`.
    pub fn validate(&self) -> Result<(), String> {
        let rules: [(bool, &str); 3] = [
            (
                self.failure_threshold == 0,
                "RetryFilter failure_threshold must be > 0",
            ),
            (
                self.recovery_timeout_ms == 0,
                "RetryFilter recovery_timeout_ms must be > 0",
            ),
            (
                self.success_threshold == 0,
                "RetryFilter success_threshold must be > 0",
            ),
        ];
        for &(violated, message) in rules.iter() {
            if violated {
                return Err(message.to_string());
            }
        }
        Ok(())
    }
}
/// Live, lock-free counters updated by `RetryFilter`; read them via `stats_snapshot`.
#[derive(Debug, Default)]
pub struct RetryFilterStats {
    /// Frames passed while the circuit was Closed (retries included).
    pub direct_pass: AtomicU64,
    /// Frames dropped because the circuit was Open.
    pub circuit_open_drops: AtomicU64,
    /// Frames let through as probes while HalfOpen.
    pub probe_frames: AtomicU64,
    /// Closed-state passes of frames with `retry_count > 0`.
    pub retry_attempts: AtomicU64,
    /// `on_success` calls while the filter is enabled.
    pub successes: AtomicU64,
    /// `on_failure` calls while the filter is enabled.
    pub failures: AtomicU64,
    /// Automatic circuit state changes (trips, recovery-timeout promotions, resets).
    pub state_transitions: AtomicU64,
    /// Transitions into Open (from Closed or HalfOpen).
    pub circuit_trips: AtomicU64,
    /// HalfOpen → Closed recoveries.
    pub circuit_resets: AtomicU64,
}
/// Point-in-time copy of `RetryFilterStats`.
///
/// Each counter is loaded independently, so the values need not be mutually consistent under
/// concurrent updates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryFilterStatsSnapshot {
    /// See `RetryFilterStats::direct_pass`.
    pub direct_pass: u64,
    /// See `RetryFilterStats::circuit_open_drops`.
    pub circuit_open_drops: u64,
    /// See `RetryFilterStats::probe_frames`.
    pub probe_frames: u64,
    /// See `RetryFilterStats::retry_attempts`.
    pub retry_attempts: u64,
    /// See `RetryFilterStats::successes`.
    pub successes: u64,
    /// See `RetryFilterStats::failures`.
    pub failures: u64,
    /// See `RetryFilterStats::state_transitions`.
    pub state_transitions: u64,
    /// See `RetryFilterStats::circuit_trips`.
    pub circuit_trips: u64,
    /// See `RetryFilterStats::circuit_resets`.
    pub circuit_resets: u64,
}
/// Send-path filter that delays retried frames with backoff and guards the link with a
/// circuit breaker (Closed → Open → HalfOpen → Closed).
///
/// All mutable state sits behind `parking_lot` locks or atomics, so every method takes
/// `&self`.
pub struct RetryFilter {
    /// Never mutated after construction.
    config: RetryFilterConfig,
    /// Current breaker state. When held together with a counter lock, this one is taken first.
    circuit_state: RwLock<CircuitBreakerState>,
    /// Consecutive failures while Closed; still incremented by failures reported while Open.
    failure_count: RwLock<u32>,
    /// Successes observed during the current HalfOpen period.
    success_count: RwLock<u32>,
    /// Monotonic counters exposed through `stats_snapshot`.
    stats: RetryFilterStats,
}
impl RetryFilter {
pub fn new(config: RetryFilterConfig) -> Self {
Self {
config,
circuit_state: RwLock::new(CircuitBreakerState::Closed),
failure_count: RwLock::new(0),
success_count: RwLock::new(0),
stats: RetryFilterStats::default(),
}
}
pub fn circuit_state(&self) -> CircuitBreakerState {
let state = *self.circuit_state.read();
if let CircuitBreakerState::Open { opened_at } = state {
if opened_at.elapsed() >= self.config.recovery_timeout() {
let mut state_w = self.circuit_state.write();
if let CircuitBreakerState::Open { opened_at: oa } = *state_w {
if oa.elapsed() >= self.config.recovery_timeout() {
*state_w = CircuitBreakerState::HalfOpen;
*self.success_count.write() = 0;
self.stats.state_transitions.fetch_add(1, Ordering::Relaxed);
info!(
recovery_timeout_ms = self.config.recovery_timeout_ms,
"CircuitBreaker: Open → HalfOpen (recovery timeout elapsed)"
);
return CircuitBreakerState::HalfOpen;
}
}
}
}
state
}
pub fn process_send(&self, envelope: &FrameEnvelope) -> FilterResult {
if !self.config.enabled {
return FilterResult::pass();
}
let state = self.circuit_state();
match state {
CircuitBreakerState::Closed => {
let delay = if envelope.retry_count > 0 {
let retry_delay = self.config.retry_delay(envelope.retry_count - 1);
self.stats.retry_attempts.fetch_add(1, Ordering::Relaxed);
trace!(
retry_count = envelope.retry_count,
delay_ms = retry_delay.as_millis(),
"RetryFilter: retry with delay"
);
retry_delay
} else {
Duration::ZERO
};
self.stats.direct_pass.fetch_add(1, Ordering::Relaxed);
FilterResult::pass_with_delay(delay)
}
CircuitBreakerState::Open { .. } => {
self.stats
.circuit_open_drops
.fetch_add(1, Ordering::Relaxed);
debug!(
channel_id = envelope.channel_id,
"RetryFilter: circuit breaker OPEN, dropping frame"
);
FilterResult::Dropped {
reason: format!(
"CircuitBreaker: circuit is Open (failures >= {})",
self.config.failure_threshold
),
}
}
CircuitBreakerState::HalfOpen => {
self.stats.probe_frames.fetch_add(1, Ordering::Relaxed);
debug!(
channel_id = envelope.channel_id,
"RetryFilter: circuit breaker HalfOpen, allowing probe frame"
);
FilterResult::pass()
}
}
}
pub fn process_recv(&self, _envelope: &FrameEnvelope) -> FilterResult {
FilterResult::pass()
}
pub fn on_success(&self) {
if !self.config.enabled {
return;
}
self.stats.successes.fetch_add(1, Ordering::Relaxed);
let mut state = self.circuit_state.write();
match *state {
CircuitBreakerState::Closed => {
*self.failure_count.write() = 0;
}
CircuitBreakerState::HalfOpen => {
let mut sc = self.success_count.write();
*sc += 1;
if *sc >= self.config.success_threshold {
*state = CircuitBreakerState::Closed;
*self.failure_count.write() = 0;
*sc = 0;
self.stats.state_transitions.fetch_add(1, Ordering::Relaxed);
self.stats.circuit_resets.fetch_add(1, Ordering::Relaxed);
info!(
success_threshold = self.config.success_threshold,
"CircuitBreaker: HalfOpen → Closed (recovery successful)"
);
}
}
CircuitBreakerState::Open { .. } => {
}
}
}
pub fn on_failure(&self) {
if !self.config.enabled {
return;
}
self.stats.failures.fetch_add(1, Ordering::Relaxed);
let mut state = self.circuit_state.write();
match *state {
CircuitBreakerState::Closed => {
let mut fc = self.failure_count.write();
*fc += 1;
if *fc >= self.config.failure_threshold {
*state = CircuitBreakerState::Open {
opened_at: Instant::now(),
};
self.stats.state_transitions.fetch_add(1, Ordering::Relaxed);
self.stats.circuit_trips.fetch_add(1, Ordering::Relaxed);
warn!(
failure_count = *fc,
threshold = self.config.failure_threshold,
recovery_timeout_ms = self.config.recovery_timeout_ms,
"CircuitBreaker: Closed → Open (failure threshold reached)"
);
}
}
CircuitBreakerState::HalfOpen => {
*state = CircuitBreakerState::Open {
opened_at: Instant::now(),
};
*self.success_count.write() = 0;
self.stats.state_transitions.fetch_add(1, Ordering::Relaxed);
self.stats.circuit_trips.fetch_add(1, Ordering::Relaxed);
warn!("CircuitBreaker: HalfOpen → Open (probe failed)");
}
CircuitBreakerState::Open { .. } => {
*self.failure_count.write() += 1;
}
}
}
pub fn can_retry(&self, envelope: &FrameEnvelope) -> bool {
if !self.config.enabled {
return false;
}
envelope.retry_count < self.config.max_retries && self.circuit_state().allows_traffic()
}
pub fn retry_delay(&self, attempt: u8) -> Duration {
self.config.retry_delay(attempt)
}
pub fn failure_count(&self) -> u32 {
*self.failure_count.read()
}
pub fn success_count(&self) -> u32 {
*self.success_count.read()
}
pub fn force_state(&self, state: CircuitBreakerState) {
*self.circuit_state.write() = state;
}
pub fn reset(&self) {
*self.circuit_state.write() = CircuitBreakerState::Closed;
*self.failure_count.write() = 0;
*self.success_count.write() = 0;
}
pub fn stats_snapshot(&self) -> RetryFilterStatsSnapshot {
RetryFilterStatsSnapshot {
direct_pass: self.stats.direct_pass.load(Ordering::Relaxed),
circuit_open_drops: self.stats.circuit_open_drops.load(Ordering::Relaxed),
probe_frames: self.stats.probe_frames.load(Ordering::Relaxed),
retry_attempts: self.stats.retry_attempts.load(Ordering::Relaxed),
successes: self.stats.successes.load(Ordering::Relaxed),
failures: self.stats.failures.load(Ordering::Relaxed),
state_transitions: self.stats.state_transitions.load(Ordering::Relaxed),
circuit_trips: self.stats.circuit_trips.load(Ordering::Relaxed),
circuit_resets: self.stats.circuit_resets.load(Ordering::Relaxed),
}
}
}
impl fmt::Debug for RetryFilter {
    /// Diagnostic view: the enabled flag, breaker state, both counters and the two main limits.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Guards are taken in the order state, failures, successes and are all held until
        // formatting finishes.
        let state_guard = self.circuit_state.read();
        let failures_guard = self.failure_count.read();
        let successes_guard = self.success_count.read();
        let mut out = f.debug_struct("RetryFilter");
        out.field("enabled", &self.config.enabled);
        out.field("circuit_state", &*state_guard);
        out.field("failure_count", &*failures_guard);
        out.field("success_count", &*successes_guard);
        out.field("max_retries", &self.config.max_retries);
        out.field("failure_threshold", &self.config.failure_threshold);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::address::{GroupAddress, IndividualAddress};
    use crate::cemi::CemiFrame;

    /// Builds a group-value-write envelope via `FrameEnvelope::new`.
    fn make_envelope() -> FrameEnvelope {
        let cemi = CemiFrame::group_value_write(
            IndividualAddress::new(1, 1, 1),
            GroupAddress::three_level(1, 0, 1),
            vec![0x01],
        );
        FrameEnvelope::new(cemi, 1, "192.168.1.100:3671".parse().unwrap())
    }

    /// Same as `make_envelope`, but with `retry_count` set.
    fn make_retry_envelope(retry_count: u8) -> FrameEnvelope {
        let mut env = make_envelope();
        env.retry_count = retry_count;
        env
    }

    #[test]
    fn test_circuit_breaker_state_names() {
        assert_eq!(CircuitBreakerState::Closed.name(), "Closed");
        assert_eq!(
            CircuitBreakerState::Open {
                opened_at: Instant::now()
            }
            .name(),
            "Open"
        );
        assert_eq!(CircuitBreakerState::HalfOpen.name(), "HalfOpen");
    }

    #[test]
    fn test_circuit_breaker_allows_traffic() {
        assert!(CircuitBreakerState::Closed.allows_traffic());
        assert!(CircuitBreakerState::HalfOpen.allows_traffic());
        assert!(!CircuitBreakerState::Open {
            opened_at: Instant::now()
        }
        .allows_traffic());
    }

    #[test]
    fn test_circuit_breaker_display() {
        let s = CircuitBreakerState::Closed.to_string();
        assert_eq!(s, "Closed");
        let s = CircuitBreakerState::HalfOpen.to_string();
        assert_eq!(s, "HalfOpen");
        let s = CircuitBreakerState::Open {
            opened_at: Instant::now(),
        }
        .to_string();
        assert!(s.starts_with("Open("));
    }

    #[test]
    fn test_config_defaults() {
        let config = RetryFilterConfig::default();
        assert!(config.enabled);
        assert_eq!(config.max_retries, 3);
        assert_eq!(config.retry_delay_ms, 100);
        assert!(config.exponential_backoff);
        assert_eq!(config.max_retry_delay_ms, 5000);
        assert_eq!(config.failure_threshold, 5);
        assert_eq!(config.recovery_timeout_ms, 10000);
        assert_eq!(config.success_threshold, 1);
    }

    #[test]
    fn test_config_retry_delay_constant() {
        let config = RetryFilterConfig {
            exponential_backoff: false,
            retry_delay_ms: 100,
            ..RetryFilterConfig::default()
        };
        assert_eq!(config.retry_delay(0), Duration::from_millis(100));
        assert_eq!(config.retry_delay(1), Duration::from_millis(100));
        assert_eq!(config.retry_delay(5), Duration::from_millis(100));
    }

    #[test]
    fn test_config_retry_delay_exponential() {
        let config = RetryFilterConfig::default();
        // 100 ms base, doubled per attempt.
        assert_eq!(config.retry_delay(0), Duration::from_millis(100));
        assert_eq!(config.retry_delay(1), Duration::from_millis(200));
        assert_eq!(config.retry_delay(2), Duration::from_millis(400));
        assert_eq!(config.retry_delay(3), Duration::from_millis(800));
    }

    #[test]
    fn test_config_retry_delay_capped() {
        let config = RetryFilterConfig {
            max_retry_delay_ms: 500,
            ..RetryFilterConfig::default()
        };
        assert_eq!(config.retry_delay(0), Duration::from_millis(100));
        assert_eq!(config.retry_delay(1), Duration::from_millis(200));
        assert_eq!(config.retry_delay(2), Duration::from_millis(400));
        // 800 ms and beyond are clamped to the 500 ms ceiling.
        assert_eq!(config.retry_delay(3), Duration::from_millis(500));
        assert_eq!(config.retry_delay(10), Duration::from_millis(500));
    }

    #[test]
    fn test_config_validate() {
        assert!(RetryFilterConfig::default().validate().is_ok());

        let bad = RetryFilterConfig {
            failure_threshold: 0,
            ..RetryFilterConfig::default()
        };
        assert!(bad.validate().is_err());

        let bad = RetryFilterConfig {
            recovery_timeout_ms: 0,
            ..RetryFilterConfig::default()
        };
        assert!(bad.validate().is_err());

        let bad = RetryFilterConfig {
            success_threshold: 0,
            ..RetryFilterConfig::default()
        };
        assert!(bad.validate().is_err());
    }

    #[test]
    fn test_retry_filter_disabled() {
        let filter = RetryFilter::new(RetryFilterConfig {
            enabled: false,
            ..RetryFilterConfig::default()
        });
        let envelope = make_envelope();
        let result = filter.process_send(&envelope);
        assert!(result.should_continue());
    }

    #[test]
    fn test_retry_filter_closed_passthrough() {
        let filter = RetryFilter::new(RetryFilterConfig::default());
        let envelope = make_envelope();
        let result = filter.process_send(&envelope);
        assert!(matches!(
            result,
            FilterResult::Pass { delay } if delay == Duration::ZERO
        ));
        let stats = filter.stats_snapshot();
        assert_eq!(stats.direct_pass, 1);
    }

    #[test]
    fn test_retry_filter_retry_with_delay() {
        let filter = RetryFilter::new(RetryFilterConfig::default());
        // Retry #2 uses 0-based attempt 1: 100 ms * 2 = 200 ms.
        let envelope = make_retry_envelope(2);
        let result = filter.process_send(&envelope);
        match result {
            FilterResult::Pass { delay } => {
                assert_eq!(delay, Duration::from_millis(200));
            }
            _ => panic!("Expected Pass with delay"),
        }
        let stats = filter.stats_snapshot();
        assert_eq!(stats.retry_attempts, 1);
    }

    #[test]
    fn test_circuit_breaker_trip() {
        let filter = RetryFilter::new(RetryFilterConfig {
            failure_threshold: 3,
            ..RetryFilterConfig::default()
        });
        filter.on_failure();
        assert!(matches!(filter.circuit_state(), CircuitBreakerState::Closed));
        filter.on_failure();
        assert!(matches!(filter.circuit_state(), CircuitBreakerState::Closed));
        filter.on_failure();
        assert!(matches!(
            filter.circuit_state(),
            CircuitBreakerState::Open { .. }
        ));
        let stats = filter.stats_snapshot();
        assert_eq!(stats.failures, 3);
        assert_eq!(stats.circuit_trips, 1);
    }

    #[test]
    fn test_circuit_breaker_open_drops_frames() {
        let filter = RetryFilter::new(RetryFilterConfig {
            failure_threshold: 1,
            ..RetryFilterConfig::default()
        });
        // A single failure trips the circuit at threshold 1.
        filter.on_failure();
        assert!(matches!(
            filter.circuit_state(),
            CircuitBreakerState::Open { .. }
        ));
        let envelope = make_envelope();
        let result = filter.process_send(&envelope);
        assert!(matches!(result, FilterResult::Dropped { .. }));
        let stats = filter.stats_snapshot();
        assert_eq!(stats.circuit_open_drops, 1);
    }

    #[test]
    fn test_circuit_breaker_halfopen_recovery() {
        let filter = RetryFilter::new(RetryFilterConfig {
            failure_threshold: 1,
            // 1 ms recovery keeps the sleep below short.
            recovery_timeout_ms: 1,
            success_threshold: 1,
            ..RetryFilterConfig::default()
        });
        filter.on_failure();
        assert!(matches!(
            filter.circuit_state(),
            CircuitBreakerState::Open { .. }
        ));
        std::thread::sleep(Duration::from_millis(5));
        let state = filter.circuit_state();
        assert!(matches!(state, CircuitBreakerState::HalfOpen));
        let envelope = make_envelope();
        let result = filter.process_send(&envelope);
        assert!(result.should_continue());
        let stats = filter.stats_snapshot();
        assert_eq!(stats.probe_frames, 1);
        filter.on_success();
        assert!(matches!(filter.circuit_state(), CircuitBreakerState::Closed));
        assert_eq!(filter.stats_snapshot().circuit_resets, 1);
    }

    #[test]
    fn test_circuit_breaker_halfopen_failure() {
        let filter = RetryFilter::new(RetryFilterConfig {
            failure_threshold: 1,
            recovery_timeout_ms: 1,
            ..RetryFilterConfig::default()
        });
        filter.on_failure();
        std::thread::sleep(Duration::from_millis(5));
        assert!(matches!(
            filter.circuit_state(),
            CircuitBreakerState::HalfOpen
        ));
        filter.on_failure();
        assert!(matches!(
            filter.circuit_state(),
            CircuitBreakerState::Open { .. }
        ));
    }

    #[test]
    fn test_success_resets_failure_count() {
        let filter = RetryFilter::new(RetryFilterConfig {
            failure_threshold: 5,
            ..RetryFilterConfig::default()
        });
        filter.on_failure();
        filter.on_failure();
        assert_eq!(filter.failure_count(), 2);
        filter.on_success();
        assert_eq!(filter.failure_count(), 0);
    }

    #[test]
    fn test_can_retry() {
        let filter = RetryFilter::new(RetryFilterConfig {
            max_retries: 3,
            ..RetryFilterConfig::default()
        });
        let mut env = make_envelope();
        env.retry_count = 0;
        assert!(filter.can_retry(&env));
        env.retry_count = 2;
        assert!(filter.can_retry(&env));
        env.retry_count = 3;
        assert!(!filter.can_retry(&env));
    }

    #[test]
    fn test_can_retry_circuit_open() {
        let filter = RetryFilter::new(RetryFilterConfig {
            failure_threshold: 1,
            ..RetryFilterConfig::default()
        });
        filter.on_failure();
        let env = make_envelope();
        assert!(!filter.can_retry(&env));
    }

    #[test]
    fn test_force_state() {
        let filter = RetryFilter::new(RetryFilterConfig::default());
        filter.force_state(CircuitBreakerState::HalfOpen);
        assert!(matches!(
            filter.circuit_state(),
            CircuitBreakerState::HalfOpen
        ));
        filter.force_state(CircuitBreakerState::Closed);
        assert!(matches!(filter.circuit_state(), CircuitBreakerState::Closed));
    }

    #[test]
    fn test_reset() {
        let filter = RetryFilter::new(RetryFilterConfig {
            failure_threshold: 1,
            ..RetryFilterConfig::default()
        });
        filter.on_failure();
        assert!(matches!(
            filter.circuit_state(),
            CircuitBreakerState::Open { .. }
        ));
        filter.reset();
        assert!(matches!(filter.circuit_state(), CircuitBreakerState::Closed));
        assert_eq!(filter.failure_count(), 0);
        assert_eq!(filter.success_count(), 0);
    }

    #[test]
    fn test_retry_filter_recv_passthrough() {
        let filter = RetryFilter::new(RetryFilterConfig::default());
        let envelope = make_envelope();
        let result = filter.process_recv(&envelope);
        assert!(result.should_continue());
    }

    #[test]
    fn test_retry_filter_debug() {
        let filter = RetryFilter::new(RetryFilterConfig::default());
        let debug_str = format!("{:?}", filter);
        assert!(debug_str.contains("RetryFilter"));
        assert!(debug_str.contains("Closed"));
    }

    #[test]
    fn test_retry_filter_stats_snapshot() {
        let filter = RetryFilter::new(RetryFilterConfig::default());
        filter.on_success();
        filter.on_failure();
        let stats = filter.stats_snapshot();
        assert_eq!(stats.successes, 1);
        assert_eq!(stats.failures, 1);
    }
}