1use std::fmt;
18use std::net::SocketAddr;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use serde::{Deserialize, Serialize};
24use tracing::{debug, trace};
25
26use crate::cemi::CemiFrame;
27
28use super::pace::{PaceFilter, PaceFilterConfig};
29use super::queue::{QueueFilter, QueueFilterConfig, QueuePriority};
30use super::retry::{RetryFilter, RetryFilterConfig};
31
32#[derive(Debug, Clone)]
42pub struct FrameEnvelope {
43 pub cemi: CemiFrame,
45 pub channel_id: u8,
47 pub target_addr: SocketAddr,
49 pub priority: QueuePriority,
51 pub enqueued_at: Instant,
53 pub accumulated_delay: Duration,
55 pub retry_count: u8,
57 pub requires_ack: bool,
59 pub frame_size_bytes: usize,
61}
62
63impl FrameEnvelope {
64 pub fn new(
66 cemi: CemiFrame,
67 channel_id: u8,
68 target_addr: SocketAddr,
69 ) -> Self {
70 let frame_size_bytes = cemi.encode().len();
71 Self {
72 cemi,
73 channel_id,
74 target_addr,
75 priority: QueuePriority::Normal,
76 enqueued_at: Instant::now(),
77 accumulated_delay: Duration::ZERO,
78 retry_count: 0,
79 requires_ack: true,
80 frame_size_bytes,
81 }
82 }
83
84 pub fn with_priority(mut self, priority: QueuePriority) -> Self {
86 self.priority = priority;
87 self
88 }
89
90 pub fn with_ack_required(mut self, requires_ack: bool) -> Self {
92 self.requires_ack = requires_ack;
93 self
94 }
95
96 pub fn pipeline_duration(&self) -> Duration {
98 self.enqueued_at.elapsed()
99 }
100}
101
102#[derive(Debug, Clone)]
108pub enum FilterResult {
109 Pass {
111 delay: Duration,
113 },
114 Queued,
117 Dropped {
119 reason: String,
121 },
122 Error {
124 message: String,
126 },
127}
128
129impl FilterResult {
130 pub fn pass() -> Self {
132 Self::Pass {
133 delay: Duration::ZERO,
134 }
135 }
136
137 pub fn pass_with_delay(delay: Duration) -> Self {
139 Self::Pass { delay }
140 }
141
142 pub fn should_continue(&self) -> bool {
144 matches!(self, Self::Pass { .. })
145 }
146}
147
148#[derive(Debug, Clone, Copy, PartialEq, Eq)]
154pub enum FilterDirection {
155 Send,
157 Recv,
159}
160
161impl fmt::Display for FilterDirection {
162 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 match self {
164 Self::Send => write!(f, "send"),
165 Self::Recv => write!(f, "recv"),
166 }
167 }
168}
169
170#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct FilterChainConfig {
177 #[serde(default)]
180 pub enabled: bool,
181
182 #[serde(default)]
184 pub pace: PaceFilterConfig,
185
186 #[serde(default)]
188 pub queue: QueueFilterConfig,
189
190 #[serde(default)]
192 pub retry: RetryFilterConfig,
193}
194
195impl Default for FilterChainConfig {
196 fn default() -> Self {
197 Self {
198 enabled: false,
199 pace: PaceFilterConfig::default(),
200 queue: QueueFilterConfig::default(),
201 retry: RetryFilterConfig::default(),
202 }
203 }
204}
205
206impl FilterChainConfig {
207 pub fn enabled() -> Self {
209 Self {
210 enabled: true,
211 ..Default::default()
212 }
213 }
214
215 pub fn validate(&self) -> Result<(), String> {
217 self.pace.validate()?;
218 self.queue.validate()?;
219 self.retry.validate()?;
220 Ok(())
221 }
222}
223
224#[derive(Debug, Default)]
230pub struct FilterChainStats {
231 pub frames_sent: AtomicU64,
233 pub frames_received: AtomicU64,
235 pub frames_dropped: AtomicU64,
237 pub frames_queued: AtomicU64,
239 pub total_delay_us: AtomicU64,
241 pub bypass_count: AtomicU64,
243}
244
245#[derive(Debug, Clone, Copy, PartialEq, Eq)]
247pub struct FilterChainStatsSnapshot {
248 pub frames_sent: u64,
249 pub frames_received: u64,
250 pub frames_dropped: u64,
251 pub frames_queued: u64,
252 pub total_delay_us: u64,
253 pub bypass_count: u64,
254}
255
256pub struct FilterChain {
273 config: FilterChainConfig,
274 queue_filter: QueueFilter,
275 pace_filter: PaceFilter,
276 retry_filter: RetryFilter,
277 stats: Arc<FilterChainStats>,
278}
279
280impl FilterChain {
281 pub fn new(config: FilterChainConfig) -> Self {
283 Self {
284 queue_filter: QueueFilter::new(config.queue.clone()),
285 pace_filter: PaceFilter::new(config.pace.clone()),
286 retry_filter: RetryFilter::new(config.retry.clone()),
287 config,
288 stats: Arc::new(FilterChainStats::default()),
289 }
290 }
291
292 pub fn is_enabled(&self) -> bool {
294 self.config.enabled
295 }
296
297 pub fn send(&self, envelope: &mut FrameEnvelope) -> FilterResult {
304 if !self.config.enabled {
305 self.stats.bypass_count.fetch_add(1, Ordering::Relaxed);
306 return FilterResult::pass();
307 }
308
309 self.stats.frames_sent.fetch_add(1, Ordering::Relaxed);
310
311 let queue_result = self.queue_filter.process_send(envelope);
313 if !queue_result.should_continue() {
314 match &queue_result {
315 FilterResult::Queued => {
316 self.stats.frames_queued.fetch_add(1, Ordering::Relaxed);
317 trace!(
318 channel_id = envelope.channel_id,
319 priority = ?envelope.priority,
320 "Frame queued by QueueFilter"
321 );
322 }
323 FilterResult::Dropped { reason } => {
324 self.stats.frames_dropped.fetch_add(1, Ordering::Relaxed);
325 debug!(
326 channel_id = envelope.channel_id,
327 reason = %reason,
328 "Frame dropped by QueueFilter"
329 );
330 }
331 _ => {}
332 }
333 return queue_result;
334 }
335
336 let pace_result = self.pace_filter.process_send(envelope);
338 if !pace_result.should_continue() {
339 match &pace_result {
340 FilterResult::Dropped { reason } => {
341 self.stats.frames_dropped.fetch_add(1, Ordering::Relaxed);
342 debug!(
343 channel_id = envelope.channel_id,
344 reason = %reason,
345 "Frame dropped by PaceFilter"
346 );
347 }
348 _ => {}
349 }
350 return pace_result;
351 }
352 if let FilterResult::Pass { delay } = &pace_result {
353 envelope.accumulated_delay += *delay;
354 }
355
356 let retry_result = self.retry_filter.process_send(envelope);
358 if !retry_result.should_continue() {
359 match &retry_result {
360 FilterResult::Dropped { reason } => {
361 self.stats.frames_dropped.fetch_add(1, Ordering::Relaxed);
362 debug!(
363 channel_id = envelope.channel_id,
364 reason = %reason,
365 "Frame dropped by RetryFilter (circuit breaker)"
366 );
367 }
368 _ => {}
369 }
370 return retry_result;
371 }
372 if let FilterResult::Pass { delay } = &retry_result {
373 envelope.accumulated_delay += *delay;
374 }
375
376 let total_delay = envelope.accumulated_delay;
378 if total_delay > Duration::ZERO {
379 self.stats
380 .total_delay_us
381 .fetch_add(total_delay.as_micros() as u64, Ordering::Relaxed);
382 }
383
384 trace!(
385 channel_id = envelope.channel_id,
386 delay_ms = total_delay.as_millis(),
387 priority = ?envelope.priority,
388 "Frame passed through filter chain (send)"
389 );
390
391 FilterResult::pass_with_delay(total_delay)
392 }
393
394 pub fn recv(&self, envelope: &mut FrameEnvelope) -> FilterResult {
401 if !self.config.enabled {
402 self.stats.bypass_count.fetch_add(1, Ordering::Relaxed);
403 return FilterResult::pass();
404 }
405
406 self.stats.frames_received.fetch_add(1, Ordering::Relaxed);
407
408 let retry_result = self.retry_filter.process_recv(envelope);
410 if !retry_result.should_continue() {
411 return retry_result;
412 }
413
414 let pace_result = self.pace_filter.process_recv(envelope);
416 if !pace_result.should_continue() {
417 return pace_result;
418 }
419
420 let queue_result = self.queue_filter.process_recv(envelope);
422 if !queue_result.should_continue() {
423 return queue_result;
424 }
425
426 trace!(
427 channel_id = envelope.channel_id,
428 "Frame passed through filter chain (recv)"
429 );
430
431 FilterResult::pass()
432 }
433
434 pub fn on_send_success(&self, channel_id: u8) {
439 if !self.config.enabled {
440 return;
441 }
442 self.retry_filter.on_success();
443 self.pace_filter.on_frame_completed();
444 self.queue_filter.on_ack_received(channel_id);
445
446 trace!(channel_id, "Filter chain: send success");
447 }
448
449 pub fn on_send_failure(&self, channel_id: u8, error: &str) {
454 if !self.config.enabled {
455 return;
456 }
457 self.retry_filter.on_failure();
458 self.queue_filter.on_send_error(channel_id);
459
460 debug!(
461 channel_id,
462 error,
463 "Filter chain: send failure"
464 );
465 }
466
467 pub fn drain_pending(&self, channel_id: u8, max_count: usize) -> Vec<FrameEnvelope> {
473 if !self.config.enabled {
474 return Vec::new();
475 }
476 self.queue_filter.drain(channel_id, max_count)
477 }
478
479 pub fn has_pending(&self, channel_id: u8) -> bool {
481 if !self.config.enabled {
482 return false;
483 }
484 self.queue_filter.has_pending(channel_id)
485 }
486
487 pub fn pending_count(&self) -> usize {
489 if !self.config.enabled {
490 return 0;
491 }
492 self.queue_filter.total_pending()
493 }
494
495 pub fn pace_state(&self) -> super::pace::PaceState {
497 self.pace_filter.state()
498 }
499
500 pub fn circuit_breaker_state(&self) -> super::retry::CircuitBreakerState {
502 self.retry_filter.circuit_state()
503 }
504
505 pub fn queue_filter(&self) -> &QueueFilter {
507 &self.queue_filter
508 }
509
510 pub fn pace_filter(&self) -> &PaceFilter {
512 &self.pace_filter
513 }
514
515 pub fn retry_filter(&self) -> &RetryFilter {
517 &self.retry_filter
518 }
519
520 pub fn stats_snapshot(&self) -> FilterChainStatsSnapshot {
522 FilterChainStatsSnapshot {
523 frames_sent: self.stats.frames_sent.load(Ordering::Relaxed),
524 frames_received: self.stats.frames_received.load(Ordering::Relaxed),
525 frames_dropped: self.stats.frames_dropped.load(Ordering::Relaxed),
526 frames_queued: self.stats.frames_queued.load(Ordering::Relaxed),
527 total_delay_us: self.stats.total_delay_us.load(Ordering::Relaxed),
528 bypass_count: self.stats.bypass_count.load(Ordering::Relaxed),
529 }
530 }
531}
532
533impl fmt::Debug for FilterChain {
534 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
535 f.debug_struct("FilterChain")
536 .field("enabled", &self.config.enabled)
537 .field("pace_state", &self.pace_filter.state())
538 .field("circuit_breaker", &self.retry_filter.circuit_state())
539 .field("pending_frames", &self.queue_filter.total_pending())
540 .finish()
541 }
542}
543
544#[cfg(test)]
549mod tests {
550 use super::*;
551 use crate::address::{GroupAddress, IndividualAddress};
552 use crate::cemi::CemiFrame;
553
554 fn make_envelope() -> FrameEnvelope {
555 let cemi = CemiFrame::group_value_write(
556 IndividualAddress::new(1, 1, 1),
557 GroupAddress::three_level(1, 0, 1),
558 vec![0x01],
559 );
560 FrameEnvelope::new(
561 cemi,
562 1,
563 "192.168.1.100:3671".parse().unwrap(),
564 )
565 }
566
567 #[test]
568 fn test_filter_chain_disabled() {
569 let chain = FilterChain::new(FilterChainConfig::default());
570 assert!(!chain.is_enabled());
571
572 let mut envelope = make_envelope();
573 let result = chain.send(&mut envelope);
574
575 assert!(matches!(result, FilterResult::Pass { delay } if delay == Duration::ZERO));
576
577 let stats = chain.stats_snapshot();
578 assert_eq!(stats.bypass_count, 1);
579 assert_eq!(stats.frames_sent, 0);
580 }
581
582 #[test]
583 fn test_filter_chain_enabled_passthrough() {
584 let chain = FilterChain::new(FilterChainConfig::enabled());
585 assert!(chain.is_enabled());
586
587 let mut envelope = make_envelope();
588 let result = chain.send(&mut envelope);
589
590 assert!(result.should_continue());
591
592 let stats = chain.stats_snapshot();
593 assert_eq!(stats.frames_sent, 1);
594 assert_eq!(stats.bypass_count, 0);
595 }
596
597 #[test]
598 fn test_filter_chain_send_recv_cycle() {
599 let chain = FilterChain::new(FilterChainConfig::enabled());
600
601 let mut send_env = make_envelope();
603 let send_result = chain.send(&mut send_env);
604 assert!(send_result.should_continue());
605
606 let mut recv_env = make_envelope();
608 let recv_result = chain.recv(&mut recv_env);
609 assert!(recv_result.should_continue());
610
611 let stats = chain.stats_snapshot();
612 assert_eq!(stats.frames_sent, 1);
613 assert_eq!(stats.frames_received, 1);
614 }
615
616 #[test]
617 fn test_filter_chain_success_callback() {
618 let chain = FilterChain::new(FilterChainConfig::enabled());
619
620 let mut envelope = make_envelope();
621 chain.send(&mut envelope);
622 chain.on_send_success(1);
623
624 }
626
627 #[test]
628 fn test_filter_chain_failure_callback() {
629 let chain = FilterChain::new(FilterChainConfig::enabled());
630
631 let mut envelope = make_envelope();
632 chain.send(&mut envelope);
633 chain.on_send_failure(1, "test error");
634
635 }
637
638 #[test]
639 fn test_filter_chain_debug() {
640 let chain = FilterChain::new(FilterChainConfig::enabled());
641 let debug_str = format!("{:?}", chain);
642 assert!(debug_str.contains("FilterChain"));
643 assert!(debug_str.contains("enabled"));
644 }
645
646 #[test]
647 fn test_filter_chain_config_validate() {
648 let config = FilterChainConfig::enabled();
649 assert!(config.validate().is_ok());
650 }
651
652 #[test]
653 fn test_frame_envelope_creation() {
654 let envelope = make_envelope();
655 assert_eq!(envelope.channel_id, 1);
656 assert_eq!(envelope.priority, QueuePriority::Normal);
657 assert!(envelope.requires_ack);
658 assert!(envelope.frame_size_bytes > 0);
659 assert_eq!(envelope.retry_count, 0);
660 assert_eq!(envelope.accumulated_delay, Duration::ZERO);
661 }
662
663 #[test]
664 fn test_frame_envelope_builders() {
665 let envelope = make_envelope()
666 .with_priority(QueuePriority::High)
667 .with_ack_required(false);
668
669 assert_eq!(envelope.priority, QueuePriority::High);
670 assert!(!envelope.requires_ack);
671 }
672
673 #[test]
674 fn test_filter_result_variants() {
675 let pass = FilterResult::pass();
676 assert!(pass.should_continue());
677
678 let delayed = FilterResult::pass_with_delay(Duration::from_millis(50));
679 assert!(delayed.should_continue());
680
681 let queued = FilterResult::Queued;
682 assert!(!queued.should_continue());
683
684 let dropped = FilterResult::Dropped {
685 reason: "test".to_string(),
686 };
687 assert!(!dropped.should_continue());
688
689 let error = FilterResult::Error {
690 message: "test error".to_string(),
691 };
692 assert!(!error.should_continue());
693 }
694
695 #[test]
696 fn test_filter_direction_display() {
697 assert_eq!(FilterDirection::Send.to_string(), "send");
698 assert_eq!(FilterDirection::Recv.to_string(), "recv");
699 }
700
701 #[test]
702 fn test_drain_pending_disabled() {
703 let chain = FilterChain::new(FilterChainConfig::default());
704 let drained = chain.drain_pending(1, 10);
705 assert!(drained.is_empty());
706 }
707
708 #[test]
709 fn test_has_pending_disabled() {
710 let chain = FilterChain::new(FilterChainConfig::default());
711 assert!(!chain.has_pending(1));
712 }
713
714 #[test]
715 fn test_pending_count_disabled() {
716 let chain = FilterChain::new(FilterChainConfig::default());
717 assert_eq!(chain.pending_count(), 0);
718 }
719}