use std::fmt;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use tracing::{debug, trace};
use crate::cemi::CemiFrame;
use super::pace::{PaceFilter, PaceFilterConfig};
use super::queue::{QueueFilter, QueueFilterConfig, QueuePriority};
use super::retry::{RetryFilter, RetryFilterConfig};
/// A cEMI frame bundled with the routing and bookkeeping data that accompanies
/// it through the filter chain.
#[derive(Debug, Clone)]
pub struct FrameEnvelope {
/// The cEMI frame being carried.
pub cemi: CemiFrame,
/// Identifier of the channel this frame belongs to.
pub channel_id: u8,
/// Socket address stored with the frame (named as its target).
pub target_addr: SocketAddr,
/// Queue priority; `FrameEnvelope::new` starts it at `QueuePriority::Normal`.
pub priority: QueuePriority,
/// Instant captured when the envelope was constructed; `pipeline_duration`
/// measures elapsed time from here.
pub enqueued_at: Instant,
/// Running total of delay attributed to this frame; starts at zero and is
/// increased by `FilterChain::send`.
pub accumulated_delay: Duration,
/// Retry counter; starts at 0 (not modified anywhere in this file).
pub retry_count: u8,
/// Whether an acknowledgement is required; `FrameEnvelope::new` defaults it
/// to `true`.
pub requires_ack: bool,
/// Length of the encoded frame (`cemi.encode().len()`), captured once at
/// construction.
pub frame_size_bytes: usize,
}
impl FrameEnvelope {
    /// Wraps `cemi` in an envelope for `channel_id` / `target_addr`.
    ///
    /// The envelope starts with normal priority, zero accumulated delay, no
    /// retries and an acknowledgement required. The encoded length of the
    /// frame is measured once here and cached in `frame_size_bytes`.
    pub fn new(cemi: CemiFrame, channel_id: u8, target_addr: SocketAddr) -> Self {
        Self {
            // Listed first on purpose: field initialisers run in source order,
            // so the frame is measured (by reference) before it is moved in.
            frame_size_bytes: cemi.encode().len(),
            cemi,
            channel_id,
            target_addr,
            priority: QueuePriority::Normal,
            enqueued_at: Instant::now(),
            accumulated_delay: Duration::ZERO,
            retry_count: 0,
            requires_ack: true,
        }
    }

    /// Builder-style setter: returns the envelope with `priority` replaced.
    pub fn with_priority(self, priority: QueuePriority) -> Self {
        Self { priority, ..self }
    }

    /// Builder-style setter: returns the envelope with `requires_ack` replaced.
    pub fn with_ack_required(self, requires_ack: bool) -> Self {
        Self {
            requires_ack,
            ..self
        }
    }

    /// Time elapsed since the envelope was constructed.
    pub fn pipeline_duration(&self) -> Duration {
        let created = self.enqueued_at;
        Instant::now().duration_since(created)
    }
}
/// Outcome reported for a single frame by a filter, and by the chain as a whole.
#[derive(Debug, Clone)]
pub enum FilterResult {
/// The frame may continue; this is the only variant for which
/// `should_continue` returns `true`.
Pass {
/// Delay reported together with the pass; zero when built via
/// `FilterResult::pass`.
delay: Duration,
},
/// The frame was queued instead of being passed on.
Queued,
/// The frame was dropped.
Dropped {
/// Explanation of the drop; `FilterChain::send` includes it in its log line.
reason: String,
},
/// The filter reported an error.
Error {
/// Description of the error.
message: String,
},
}
impl FilterResult {
pub fn pass() -> Self {
Self::Pass {
delay: Duration::ZERO,
}
}
pub fn pass_with_delay(delay: Duration) -> Self {
Self::Pass { delay }
}
pub fn should_continue(&self) -> bool {
matches!(self, Self::Pass { .. })
}
}
/// Direction in which a frame travels through the filter chain.
///
/// Its `Display` implementation renders the variants as `"send"` and `"recv"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterDirection {
/// Outgoing direction.
Send,
/// Incoming direction.
Recv,
}
impl fmt::Display for FilterDirection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Send => write!(f, "send"),
Self::Recv => write!(f, "recv"),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FilterChainConfig {
#[serde(default)]
pub enabled: bool,
#[serde(default)]
pub pace: PaceFilterConfig,
#[serde(default)]
pub queue: QueueFilterConfig,
#[serde(default)]
pub retry: RetryFilterConfig,
}
impl Default for FilterChainConfig {
fn default() -> Self {
Self {
enabled: false,
pace: PaceFilterConfig::default(),
queue: QueueFilterConfig::default(),
retry: RetryFilterConfig::default(),
}
}
}
impl FilterChainConfig {
    /// Default configuration with the master switch turned on.
    pub fn enabled() -> Self {
        let defaults = Self::default();
        Self {
            enabled: true,
            ..defaults
        }
    }

    /// Validates each filter section in turn (pace, then queue, then retry).
    ///
    /// # Errors
    ///
    /// Returns the first error reported by a section; later sections are not
    /// checked once one has failed.
    pub fn validate(&self) -> Result<(), String> {
        self.pace.validate()?;
        self.queue.validate()?;
        self.retry.validate()?;

        Ok(())
    }
}
/// Live counters maintained by `FilterChain`; all start at zero and are
/// updated atomically.
#[derive(Debug, Default)]
pub struct FilterChainStats {
/// Incremented for every `FilterChain::send` call made while the chain is
/// enabled, before any filter has run.
pub frames_sent: AtomicU64,
/// Incremented for every `FilterChain::recv` call made while the chain is
/// enabled, before any filter has run.
pub frames_received: AtomicU64,
/// Incremented when a filter on the send path returns `FilterResult::Dropped`.
pub frames_dropped: AtomicU64,
/// Incremented when the queue filter returns `FilterResult::Queued` on send.
pub frames_queued: AtomicU64,
/// Sum, in microseconds, of the accumulated delay of frames that passed the
/// whole send path.
pub total_delay_us: AtomicU64,
/// Incremented for every `send`/`recv` call made while the chain is disabled.
pub bypass_count: AtomicU64,
}
/// Plain-value copy of `FilterChainStats`, produced by
/// `FilterChain::stats_snapshot`.
///
/// Each field is loaded separately from its atomic counter, so the snapshot is
/// a set of individual readings.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FilterChainStatsSnapshot {
/// Value of `FilterChainStats::frames_sent` when read.
pub frames_sent: u64,
/// Value of `FilterChainStats::frames_received` when read.
pub frames_received: u64,
/// Value of `FilterChainStats::frames_dropped` when read.
pub frames_dropped: u64,
/// Value of `FilterChainStats::frames_queued` when read.
pub frames_queued: u64,
/// Value of `FilterChainStats::total_delay_us` when read (microseconds).
pub total_delay_us: u64,
/// Value of `FilterChainStats::bypass_count` when read.
pub bypass_count: u64,
}
/// Composes the queue, pace and retry filters into one pipeline.
///
/// On send the filters run queue, then pace, then retry; on receive they run
/// in the opposite order.
pub struct FilterChain {
// Configuration the chain was built from; `enabled` gates every operation.
config: FilterChainConfig,
// First stage on send, last on receive; also owns the pending frames.
queue_filter: QueueFilter,
// Second stage in both directions.
pace_filter: PaceFilter,
// Last stage on send, first on receive; exposes the circuit-breaker state.
retry_filter: RetryFilter,
// Counters updated by `send` / `recv`.
stats: Arc<FilterChainStats>,
}
impl FilterChain {
pub fn new(config: FilterChainConfig) -> Self {
Self {
queue_filter: QueueFilter::new(config.queue.clone()),
pace_filter: PaceFilter::new(config.pace.clone()),
retry_filter: RetryFilter::new(config.retry.clone()),
config,
stats: Arc::new(FilterChainStats::default()),
}
}
pub fn is_enabled(&self) -> bool {
self.config.enabled
}
pub fn send(&self, envelope: &mut FrameEnvelope) -> FilterResult {
if !self.config.enabled {
self.stats.bypass_count.fetch_add(1, Ordering::Relaxed);
return FilterResult::pass();
}
self.stats.frames_sent.fetch_add(1, Ordering::Relaxed);
let queue_result = self.queue_filter.process_send(envelope);
if !queue_result.should_continue() {
match &queue_result {
FilterResult::Queued => {
self.stats.frames_queued.fetch_add(1, Ordering::Relaxed);
trace!(
channel_id = envelope.channel_id,
priority = ?envelope.priority,
"Frame queued by QueueFilter"
);
}
FilterResult::Dropped { reason } => {
self.stats.frames_dropped.fetch_add(1, Ordering::Relaxed);
debug!(
channel_id = envelope.channel_id,
reason = %reason,
"Frame dropped by QueueFilter"
);
}
_ => {}
}
return queue_result;
}
let pace_result = self.pace_filter.process_send(envelope);
if !pace_result.should_continue() {
match &pace_result {
FilterResult::Dropped { reason } => {
self.stats.frames_dropped.fetch_add(1, Ordering::Relaxed);
debug!(
channel_id = envelope.channel_id,
reason = %reason,
"Frame dropped by PaceFilter"
);
}
_ => {}
}
return pace_result;
}
if let FilterResult::Pass { delay } = &pace_result {
envelope.accumulated_delay += *delay;
}
let retry_result = self.retry_filter.process_send(envelope);
if !retry_result.should_continue() {
match &retry_result {
FilterResult::Dropped { reason } => {
self.stats.frames_dropped.fetch_add(1, Ordering::Relaxed);
debug!(
channel_id = envelope.channel_id,
reason = %reason,
"Frame dropped by RetryFilter (circuit breaker)"
);
}
_ => {}
}
return retry_result;
}
if let FilterResult::Pass { delay } = &retry_result {
envelope.accumulated_delay += *delay;
}
let total_delay = envelope.accumulated_delay;
if total_delay > Duration::ZERO {
self.stats
.total_delay_us
.fetch_add(total_delay.as_micros() as u64, Ordering::Relaxed);
}
trace!(
channel_id = envelope.channel_id,
delay_ms = total_delay.as_millis(),
priority = ?envelope.priority,
"Frame passed through filter chain (send)"
);
FilterResult::pass_with_delay(total_delay)
}
pub fn recv(&self, envelope: &mut FrameEnvelope) -> FilterResult {
if !self.config.enabled {
self.stats.bypass_count.fetch_add(1, Ordering::Relaxed);
return FilterResult::pass();
}
self.stats.frames_received.fetch_add(1, Ordering::Relaxed);
let retry_result = self.retry_filter.process_recv(envelope);
if !retry_result.should_continue() {
return retry_result;
}
let pace_result = self.pace_filter.process_recv(envelope);
if !pace_result.should_continue() {
return pace_result;
}
let queue_result = self.queue_filter.process_recv(envelope);
if !queue_result.should_continue() {
return queue_result;
}
trace!(
channel_id = envelope.channel_id,
"Frame passed through filter chain (recv)"
);
FilterResult::pass()
}
pub fn on_send_success(&self, channel_id: u8) {
if !self.config.enabled {
return;
}
self.retry_filter.on_success();
self.pace_filter.on_frame_completed();
self.queue_filter.on_ack_received(channel_id);
trace!(channel_id, "Filter chain: send success");
}
pub fn on_send_failure(&self, channel_id: u8, error: &str) {
if !self.config.enabled {
return;
}
self.retry_filter.on_failure();
self.queue_filter.on_send_error(channel_id);
debug!(channel_id, error, "Filter chain: send failure");
}
pub fn drain_pending(&self, channel_id: u8, max_count: usize) -> Vec<FrameEnvelope> {
if !self.config.enabled {
return Vec::new();
}
self.queue_filter.drain(channel_id, max_count)
}
pub fn has_pending(&self, channel_id: u8) -> bool {
if !self.config.enabled {
return false;
}
self.queue_filter.has_pending(channel_id)
}
pub fn pending_count(&self) -> usize {
if !self.config.enabled {
return 0;
}
self.queue_filter.total_pending()
}
pub fn pace_state(&self) -> super::pace::PaceState {
self.pace_filter.state()
}
pub fn circuit_breaker_state(&self) -> super::retry::CircuitBreakerState {
self.retry_filter.circuit_state()
}
pub fn queue_filter(&self) -> &QueueFilter {
&self.queue_filter
}
pub fn pace_filter(&self) -> &PaceFilter {
&self.pace_filter
}
pub fn retry_filter(&self) -> &RetryFilter {
&self.retry_filter
}
pub fn stats_snapshot(&self) -> FilterChainStatsSnapshot {
FilterChainStatsSnapshot {
frames_sent: self.stats.frames_sent.load(Ordering::Relaxed),
frames_received: self.stats.frames_received.load(Ordering::Relaxed),
frames_dropped: self.stats.frames_dropped.load(Ordering::Relaxed),
frames_queued: self.stats.frames_queued.load(Ordering::Relaxed),
total_delay_us: self.stats.total_delay_us.load(Ordering::Relaxed),
bypass_count: self.stats.bypass_count.load(Ordering::Relaxed),
}
}
}
impl fmt::Debug for FilterChain {
    /// Prints a compact summary (enabled flag, pace state, circuit-breaker
    /// state, pending frame count) instead of dumping the filters themselves.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut summary = f.debug_struct("FilterChain");
        summary.field("enabled", &self.config.enabled);
        summary.field("pace_state", &self.pace_filter.state());
        summary.field("circuit_breaker", &self.retry_filter.circuit_state());
        // Read straight from the queue filter, so the count is shown even
        // when the chain is disabled.
        summary.field("pending_frames", &self.queue_filter.total_pending());
        summary.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::address::{GroupAddress, IndividualAddress};
    use crate::cemi::CemiFrame;

    /// Builds a one-byte group-value-write envelope on channel 1.
    fn make_envelope() -> FrameEnvelope {
        let individual = IndividualAddress::new(1, 1, 1);
        let group = GroupAddress::three_level(1, 0, 1);
        let frame = CemiFrame::group_value_write(individual, group, vec![0x01]);
        let target = "192.168.1.100:3671".parse().unwrap();
        FrameEnvelope::new(frame, 1, target)
    }

    /// A disabled chain passes frames straight through and only counts bypasses.
    #[test]
    fn test_filter_chain_disabled() {
        let chain = FilterChain::new(FilterChainConfig::default());
        assert!(!chain.is_enabled());

        let mut frame = make_envelope();
        let outcome = chain.send(&mut frame);
        assert!(matches!(outcome, FilterResult::Pass { delay } if delay == Duration::ZERO));

        let FilterChainStatsSnapshot {
            bypass_count,
            frames_sent,
            ..
        } = chain.stats_snapshot();
        assert_eq!(bypass_count, 1);
        assert_eq!(frames_sent, 0);
    }

    /// An enabled chain with default filters lets a single frame through.
    #[test]
    fn test_filter_chain_enabled_passthrough() {
        let chain = FilterChain::new(FilterChainConfig::enabled());
        assert!(chain.is_enabled());

        let mut frame = make_envelope();
        let outcome = chain.send(&mut frame);
        assert!(outcome.should_continue());

        let FilterChainStatsSnapshot {
            frames_sent,
            bypass_count,
            ..
        } = chain.stats_snapshot();
        assert_eq!(frames_sent, 1);
        assert_eq!(bypass_count, 0);
    }

    /// One send followed by one receive is reflected in both counters.
    #[test]
    fn test_filter_chain_send_recv_cycle() {
        let chain = FilterChain::new(FilterChainConfig::enabled());

        let mut outgoing = make_envelope();
        let sent = chain.send(&mut outgoing);
        assert!(sent.should_continue());

        let mut incoming = make_envelope();
        let received = chain.recv(&mut incoming);
        assert!(received.should_continue());

        let FilterChainStatsSnapshot {
            frames_sent,
            frames_received,
            ..
        } = chain.stats_snapshot();
        assert_eq!(frames_sent, 1);
        assert_eq!(frames_received, 1);
    }

    /// Smoke test: the success callback can be invoked after a send.
    #[test]
    fn test_filter_chain_success_callback() {
        let chain = FilterChain::new(FilterChainConfig::enabled());
        let mut frame = make_envelope();
        let _ = chain.send(&mut frame);
        chain.on_send_success(1);
    }

    /// Smoke test: the failure callback can be invoked after a send.
    #[test]
    fn test_filter_chain_failure_callback() {
        let chain = FilterChain::new(FilterChainConfig::enabled());
        let mut frame = make_envelope();
        let _ = chain.send(&mut frame);
        chain.on_send_failure(1, "test error");
    }

    /// The custom `Debug` output names the type and the `enabled` field.
    #[test]
    fn test_filter_chain_debug() {
        let chain = FilterChain::new(FilterChainConfig::enabled());
        let rendered = format!("{:?}", chain);
        assert!(rendered.contains("FilterChain"));
        assert!(rendered.contains("enabled"));
    }

    /// The stock enabled configuration validates cleanly.
    #[test]
    fn test_filter_chain_config_validate() {
        let outcome = FilterChainConfig::enabled().validate();
        assert!(outcome.is_ok());
    }

    /// A fresh envelope carries the documented defaults.
    #[test]
    fn test_frame_envelope_creation() {
        let fresh = make_envelope();
        assert_eq!(fresh.channel_id, 1);
        assert_eq!(fresh.priority, QueuePriority::Normal);
        assert!(fresh.requires_ack);
        assert!(fresh.frame_size_bytes > 0);
        assert_eq!(fresh.retry_count, 0);
        assert_eq!(fresh.accumulated_delay, Duration::ZERO);
    }

    /// The builder methods override priority and the ack flag.
    #[test]
    fn test_frame_envelope_builders() {
        let base = make_envelope();
        let customised = base
            .with_priority(QueuePriority::High)
            .with_ack_required(false);
        assert_eq!(customised.priority, QueuePriority::High);
        assert!(!customised.requires_ack);
    }

    /// Only the `Pass` variants allow processing to continue.
    #[test]
    fn test_filter_result_variants() {
        assert!(FilterResult::pass().should_continue());
        assert!(FilterResult::pass_with_delay(Duration::from_millis(50)).should_continue());

        assert!(!FilterResult::Queued.should_continue());

        let dropped = FilterResult::Dropped {
            reason: "test".to_string(),
        };
        assert!(!dropped.should_continue());

        let failed = FilterResult::Error {
            message: "test error".to_string(),
        };
        assert!(!failed.should_continue());
    }

    /// `Display` renders the lowercase direction names.
    #[test]
    fn test_filter_direction_display() {
        assert_eq!(FilterDirection::Send.to_string(), "send");
        assert_eq!(FilterDirection::Recv.to_string(), "recv");
    }

    /// Draining a disabled chain yields nothing.
    #[test]
    fn test_drain_pending_disabled() {
        let chain = FilterChain::new(FilterChainConfig::default());
        let drained = chain.drain_pending(1, 10);
        assert!(drained.is_empty());
    }

    /// A disabled chain never reports pending frames for a channel.
    #[test]
    fn test_has_pending_disabled() {
        let chain = FilterChain::new(FilterChainConfig::default());
        assert!(!chain.has_pending(1));
    }

    /// A disabled chain reports a pending count of zero.
    #[test]
    fn test_pending_count_disabled() {
        let chain = FilterChain::new(FilterChainConfig::default());
        assert_eq!(chain.pending_count(), 0);
    }
}