Skip to main content

mabi_knx/filter/
chain.rs

//! Bidirectional filter chain pipeline.
//!
//! The [`FilterChain`] orchestrates frame processing through a pipeline of
//! filters in both the send (forward) and receive (reverse) directions.
//!
//! ```text
//! send():  Client → Filter[0] → Filter[1] → ... → Filter[N] → Bus
//! recv():  Client ← Filter[0] ← Filter[1] ← ... ← Filter[N] ← Bus
//! ```
//!
//! Each filter can:
//! - **Pass** the frame through (possibly with delay)
//! - **Queue** the frame for later delivery
//! - **Drop** the frame (e.g., circuit breaker open)
//! - **Error** with a specific reason
16
17use std::fmt;
18use std::net::SocketAddr;
19use std::sync::atomic::{AtomicU64, Ordering};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use serde::{Deserialize, Serialize};
24use tracing::{debug, trace};
25
26use crate::cemi::CemiFrame;
27
28use super::pace::{PaceFilter, PaceFilterConfig};
29use super::queue::{QueueFilter, QueueFilterConfig, QueuePriority};
30use super::retry::{RetryFilter, RetryFilterConfig};
31
32// ============================================================================
33// Frame Envelope — carries frame + metadata through the pipeline
34// ============================================================================
35
36/// Frame envelope carrying a cEMI frame with pipeline metadata.
37///
38/// The envelope tracks the frame's journey through the filter chain,
39/// including original timestamps, priority, channel association, and
40/// the number of pipeline stages it has passed through.
41#[derive(Debug, Clone)]
42pub struct FrameEnvelope {
43    /// The cEMI frame payload.
44    pub cemi: CemiFrame,
45    /// Channel ID of the originating connection.
46    pub channel_id: u8,
47    /// Target client address for delivery.
48    pub target_addr: SocketAddr,
49    /// Frame priority (determined by QueueFilter classification).
50    pub priority: QueuePriority,
51    /// Timestamp when the frame entered the pipeline.
52    pub enqueued_at: Instant,
53    /// Accumulated delay from all filters in the chain.
54    pub accumulated_delay: Duration,
55    /// Number of retry attempts (managed by RetryFilter).
56    pub retry_count: u8,
57    /// Whether this frame requires an ACK from the client.
58    pub requires_ack: bool,
59    /// Original frame size in bytes (for PaceFilter timing calculation).
60    pub frame_size_bytes: usize,
61}
62
63impl FrameEnvelope {
64    /// Create a new frame envelope.
65    pub fn new(
66        cemi: CemiFrame,
67        channel_id: u8,
68        target_addr: SocketAddr,
69    ) -> Self {
70        let frame_size_bytes = cemi.encode().len();
71        Self {
72            cemi,
73            channel_id,
74            target_addr,
75            priority: QueuePriority::Normal,
76            enqueued_at: Instant::now(),
77            accumulated_delay: Duration::ZERO,
78            retry_count: 0,
79            requires_ack: true,
80            frame_size_bytes,
81        }
82    }
83
84    /// Create with explicit priority.
85    pub fn with_priority(mut self, priority: QueuePriority) -> Self {
86        self.priority = priority;
87        self
88    }
89
90    /// Create with ACK requirement flag.
91    pub fn with_ack_required(mut self, requires_ack: bool) -> Self {
92        self.requires_ack = requires_ack;
93        self
94    }
95
96    /// Get total time in pipeline.
97    pub fn pipeline_duration(&self) -> Duration {
98        self.enqueued_at.elapsed()
99    }
100}
101
102// ============================================================================
103// Filter Result
104// ============================================================================
105
/// Outcome reported by a filter after it has looked at a frame.
///
/// Only [`FilterResult::Pass`] lets the frame continue to the next stage;
/// every other variant stops the pipeline at the reporting filter.
#[derive(Debug, Clone)]
pub enum FilterResult {
    /// The frame may proceed, optionally after waiting for `delay`.
    Pass {
        /// Extra time to wait before the frame is actually transmitted.
        delay: Duration,
    },
    /// The filter kept the frame in its own queue for later delivery.
    /// The pipeline must stop here; the filter now owns the frame.
    Queued,
    /// The frame was discarded (e.g., circuit breaker open, queue full).
    Dropped {
        /// Human-readable explanation of why the frame was discarded.
        reason: String,
    },
    /// The filter failed while processing the frame.
    Error {
        /// Description of the failure.
        message: String,
    },
}

impl FilterResult {
    /// Shorthand for a `Pass` result that adds no delay.
    pub fn pass() -> Self {
        Self::pass_with_delay(Duration::ZERO)
    }

    /// Shorthand for a `Pass` result carrying the given `delay`.
    pub fn pass_with_delay(delay: Duration) -> Self {
        Self::Pass { delay }
    }

    /// Returns `true` only for `Pass`, i.e. when the next stage should run.
    pub fn should_continue(&self) -> bool {
        match self {
            Self::Pass { .. } => true,
            Self::Queued | Self::Dropped { .. } | Self::Error { .. } => false,
        }
    }
}
147
148// ============================================================================
149// Filter Direction
150// ============================================================================
151
/// Which way a frame is travelling through the filter chain.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterDirection {
    /// Outbound, Client → Bus. Filters run in forward order.
    Send,
    /// Inbound, Bus → Client. Filters run in reverse order.
    Recv,
}

impl fmt::Display for FilterDirection {
    /// Writes the lowercase direction name: `"send"` or `"recv"`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Send => "send",
            Self::Recv => "recv",
        };
        f.write_str(label)
    }
}
169
170// ============================================================================
171// Filter Chain Configuration
172// ============================================================================
173
174/// Configuration for the complete filter chain.
175#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct FilterChainConfig {
177    /// Whether the filter chain is enabled at all.
178    /// When disabled, frames pass through with zero overhead.
179    #[serde(default)]
180    pub enabled: bool,
181
182    /// PaceFilter configuration (bus timing simulation).
183    #[serde(default)]
184    pub pace: PaceFilterConfig,
185
186    /// QueueFilter configuration (priority queuing).
187    #[serde(default)]
188    pub queue: QueueFilterConfig,
189
190    /// RetryFilter configuration (retry + circuit breaker).
191    #[serde(default)]
192    pub retry: RetryFilterConfig,
193}
194
195impl Default for FilterChainConfig {
196    fn default() -> Self {
197        Self {
198            enabled: false,
199            pace: PaceFilterConfig::default(),
200            queue: QueueFilterConfig::default(),
201            retry: RetryFilterConfig::default(),
202        }
203    }
204}
205
206impl FilterChainConfig {
207    /// Create an enabled filter chain with default sub-filter configs.
208    pub fn enabled() -> Self {
209        Self {
210            enabled: true,
211            ..Default::default()
212        }
213    }
214
215    /// Validate the configuration.
216    pub fn validate(&self) -> Result<(), String> {
217        self.pace.validate()?;
218        self.queue.validate()?;
219        self.retry.validate()?;
220        Ok(())
221    }
222}
223
224// ============================================================================
225// Filter Chain Statistics
226// ============================================================================
227
/// Live counters for the filter chain, kept in atomics so they can be
/// updated through a shared reference without taking a lock.
#[derive(Debug, Default)]
pub struct FilterChainStats {
    /// Number of frames that entered the send direction.
    pub frames_sent: AtomicU64,
    /// Number of frames that entered the recv direction.
    pub frames_received: AtomicU64,
    /// Number of frames dropped, summed over all filters.
    pub frames_dropped: AtomicU64,
    /// Number of frames queued, summed over all filters.
    pub frames_queued: AtomicU64,
    /// Sum of all delays applied to frames, in microseconds.
    pub total_delay_us: AtomicU64,
    /// Number of frames that bypassed the chain because it was disabled.
    pub bypass_count: AtomicU64,
}
244
/// Plain-value copy of the filter chain counters, with one `u64` per
/// counter in `FilterChainStats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FilterChainStatsSnapshot {
    /// Frames that entered the send direction.
    pub frames_sent: u64,
    /// Frames that entered the recv direction.
    pub frames_received: u64,
    /// Frames dropped, summed over all filters.
    pub frames_dropped: u64,
    /// Frames queued, summed over all filters.
    pub frames_queued: u64,
    /// Total delay applied to frames, in microseconds.
    pub total_delay_us: u64,
    /// Frames that bypassed the chain because it was disabled.
    pub bypass_count: u64,
}
255
256// ============================================================================
257// Filter Chain
258// ============================================================================
259
260/// Bidirectional filter chain for KNX bus flow control.
261///
262/// The chain processes frames through three stages:
263/// 1. **QueueFilter**: Priority-based queuing with backpressure
264/// 2. **PaceFilter**: Bus timing enforcement (inter-frame delay)
265/// 3. **RetryFilter**: Retry logic with circuit breaker protection
266///
267/// In the send direction (Client → Bus), filters are applied in order:
268///   Queue → Pace → Retry
269///
270/// In the recv direction (Bus → Client), filters are applied in reverse:
271///   Retry → Pace → Queue
272pub struct FilterChain {
273    config: FilterChainConfig,
274    queue_filter: QueueFilter,
275    pace_filter: PaceFilter,
276    retry_filter: RetryFilter,
277    stats: Arc<FilterChainStats>,
278}
279
280impl FilterChain {
281    /// Create a new filter chain with the given configuration.
282    pub fn new(config: FilterChainConfig) -> Self {
283        Self {
284            queue_filter: QueueFilter::new(config.queue.clone()),
285            pace_filter: PaceFilter::new(config.pace.clone()),
286            retry_filter: RetryFilter::new(config.retry.clone()),
287            config,
288            stats: Arc::new(FilterChainStats::default()),
289        }
290    }
291
292    /// Check if the filter chain is enabled.
293    pub fn is_enabled(&self) -> bool {
294        self.config.enabled
295    }
296
297    /// Process a frame through the send (forward) direction.
298    ///
299    /// Pipeline order: Queue → Pace → Retry
300    ///
301    /// Returns the total delay to apply before transmission, or an error
302    /// if the frame was dropped/queued.
303    pub fn send(&self, envelope: &mut FrameEnvelope) -> FilterResult {
304        if !self.config.enabled {
305            self.stats.bypass_count.fetch_add(1, Ordering::Relaxed);
306            return FilterResult::pass();
307        }
308
309        self.stats.frames_sent.fetch_add(1, Ordering::Relaxed);
310
311        // Stage 1: QueueFilter — priority classification + backpressure
312        let queue_result = self.queue_filter.process_send(envelope);
313        if !queue_result.should_continue() {
314            match &queue_result {
315                FilterResult::Queued => {
316                    self.stats.frames_queued.fetch_add(1, Ordering::Relaxed);
317                    trace!(
318                        channel_id = envelope.channel_id,
319                        priority = ?envelope.priority,
320                        "Frame queued by QueueFilter"
321                    );
322                }
323                FilterResult::Dropped { reason } => {
324                    self.stats.frames_dropped.fetch_add(1, Ordering::Relaxed);
325                    debug!(
326                        channel_id = envelope.channel_id,
327                        reason = %reason,
328                        "Frame dropped by QueueFilter"
329                    );
330                }
331                _ => {}
332            }
333            return queue_result;
334        }
335
336        // Stage 2: PaceFilter — bus timing enforcement
337        let pace_result = self.pace_filter.process_send(envelope);
338        if !pace_result.should_continue() {
339            match &pace_result {
340                FilterResult::Dropped { reason } => {
341                    self.stats.frames_dropped.fetch_add(1, Ordering::Relaxed);
342                    debug!(
343                        channel_id = envelope.channel_id,
344                        reason = %reason,
345                        "Frame dropped by PaceFilter"
346                    );
347                }
348                _ => {}
349            }
350            return pace_result;
351        }
352        if let FilterResult::Pass { delay } = &pace_result {
353            envelope.accumulated_delay += *delay;
354        }
355
356        // Stage 3: RetryFilter — circuit breaker check
357        let retry_result = self.retry_filter.process_send(envelope);
358        if !retry_result.should_continue() {
359            match &retry_result {
360                FilterResult::Dropped { reason } => {
361                    self.stats.frames_dropped.fetch_add(1, Ordering::Relaxed);
362                    debug!(
363                        channel_id = envelope.channel_id,
364                        reason = %reason,
365                        "Frame dropped by RetryFilter (circuit breaker)"
366                    );
367                }
368                _ => {}
369            }
370            return retry_result;
371        }
372        if let FilterResult::Pass { delay } = &retry_result {
373            envelope.accumulated_delay += *delay;
374        }
375
376        // Record total accumulated delay
377        let total_delay = envelope.accumulated_delay;
378        if total_delay > Duration::ZERO {
379            self.stats
380                .total_delay_us
381                .fetch_add(total_delay.as_micros() as u64, Ordering::Relaxed);
382        }
383
384        trace!(
385            channel_id = envelope.channel_id,
386            delay_ms = total_delay.as_millis(),
387            priority = ?envelope.priority,
388            "Frame passed through filter chain (send)"
389        );
390
391        FilterResult::pass_with_delay(total_delay)
392    }
393
394    /// Process a frame through the recv (reverse) direction.
395    ///
396    /// Pipeline order: Retry → Pace → Queue
397    ///
398    /// For the receive path, we apply filters in reverse to maintain
399    /// the bidirectional pipeline semantics.
400    pub fn recv(&self, envelope: &mut FrameEnvelope) -> FilterResult {
401        if !self.config.enabled {
402            self.stats.bypass_count.fetch_add(1, Ordering::Relaxed);
403            return FilterResult::pass();
404        }
405
406        self.stats.frames_received.fetch_add(1, Ordering::Relaxed);
407
408        // Stage 1 (reverse): RetryFilter — record successful delivery
409        let retry_result = self.retry_filter.process_recv(envelope);
410        if !retry_result.should_continue() {
411            return retry_result;
412        }
413
414        // Stage 2 (reverse): PaceFilter — update timing state
415        let pace_result = self.pace_filter.process_recv(envelope);
416        if !pace_result.should_continue() {
417            return pace_result;
418        }
419
420        // Stage 3 (reverse): QueueFilter — dequeue acknowledgment
421        let queue_result = self.queue_filter.process_recv(envelope);
422        if !queue_result.should_continue() {
423            return queue_result;
424        }
425
426        trace!(
427            channel_id = envelope.channel_id,
428            "Frame passed through filter chain (recv)"
429        );
430
431        FilterResult::pass()
432    }
433
434    /// Notify the filter chain that a frame transmission succeeded.
435    ///
436    /// This feeds back into the RetryFilter's circuit breaker and
437    /// the PaceFilter's state machine.
438    pub fn on_send_success(&self, channel_id: u8) {
439        if !self.config.enabled {
440            return;
441        }
442        self.retry_filter.on_success();
443        self.pace_filter.on_frame_completed();
444        self.queue_filter.on_ack_received(channel_id);
445
446        trace!(channel_id, "Filter chain: send success");
447    }
448
449    /// Notify the filter chain that a frame transmission failed.
450    ///
451    /// This feeds into the RetryFilter's circuit breaker to potentially
452    /// trip it open.
453    pub fn on_send_failure(&self, channel_id: u8, error: &str) {
454        if !self.config.enabled {
455            return;
456        }
457        self.retry_filter.on_failure();
458        self.queue_filter.on_send_error(channel_id);
459
460        debug!(
461            channel_id,
462            error,
463            "Filter chain: send failure"
464        );
465    }
466
467    /// Drain pending frames from the QueueFilter for a specific channel.
468    ///
469    /// Returns frames in priority order (High → Normal → Low).
470    /// Used by the server to poll for queued frames when the connection
471    /// transitions from WaitingForAck back to Idle.
472    pub fn drain_pending(&self, channel_id: u8, max_count: usize) -> Vec<FrameEnvelope> {
473        if !self.config.enabled {
474            return Vec::new();
475        }
476        self.queue_filter.drain(channel_id, max_count)
477    }
478
479    /// Check if the chain has pending frames for a channel.
480    pub fn has_pending(&self, channel_id: u8) -> bool {
481        if !self.config.enabled {
482            return false;
483        }
484        self.queue_filter.has_pending(channel_id)
485    }
486
487    /// Get the total pending frame count across all channels.
488    pub fn pending_count(&self) -> usize {
489        if !self.config.enabled {
490            return 0;
491        }
492        self.queue_filter.total_pending()
493    }
494
495    /// Get the current PaceFilter state.
496    pub fn pace_state(&self) -> super::pace::PaceState {
497        self.pace_filter.state()
498    }
499
500    /// Get the current CircuitBreaker state.
501    pub fn circuit_breaker_state(&self) -> super::retry::CircuitBreakerState {
502        self.retry_filter.circuit_state()
503    }
504
505    /// Get a reference to the QueueFilter for inspection.
506    pub fn queue_filter(&self) -> &QueueFilter {
507        &self.queue_filter
508    }
509
510    /// Get a reference to the PaceFilter for inspection.
511    pub fn pace_filter(&self) -> &PaceFilter {
512        &self.pace_filter
513    }
514
515    /// Get a reference to the RetryFilter for inspection.
516    pub fn retry_filter(&self) -> &RetryFilter {
517        &self.retry_filter
518    }
519
520    /// Get a snapshot of the filter chain statistics.
521    pub fn stats_snapshot(&self) -> FilterChainStatsSnapshot {
522        FilterChainStatsSnapshot {
523            frames_sent: self.stats.frames_sent.load(Ordering::Relaxed),
524            frames_received: self.stats.frames_received.load(Ordering::Relaxed),
525            frames_dropped: self.stats.frames_dropped.load(Ordering::Relaxed),
526            frames_queued: self.stats.frames_queued.load(Ordering::Relaxed),
527            total_delay_us: self.stats.total_delay_us.load(Ordering::Relaxed),
528            bypass_count: self.stats.bypass_count.load(Ordering::Relaxed),
529        }
530    }
531}
532
533impl fmt::Debug for FilterChain {
534    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
535        f.debug_struct("FilterChain")
536            .field("enabled", &self.config.enabled)
537            .field("pace_state", &self.pace_filter.state())
538            .field("circuit_breaker", &self.retry_filter.circuit_state())
539            .field("pending_frames", &self.queue_filter.total_pending())
540            .finish()
541    }
542}
543
544// ============================================================================
545// Tests
546// ============================================================================
547
#[cfg(test)]
mod tests {
    use super::*;
    use crate::address::{GroupAddress, IndividualAddress};
    use crate::cemi::CemiFrame;

    /// Build a group-value-write envelope on channel 1 for the test client.
    fn sample_envelope() -> FrameEnvelope {
        let source = IndividualAddress::new(1, 1, 1);
        let destination = GroupAddress::three_level(1, 0, 1);
        let frame = CemiFrame::group_value_write(source, destination, vec![0x01]);
        let client = SocketAddr::from(([192, 168, 1, 100], 3671));
        FrameEnvelope::new(frame, 1, client)
    }

    /// A disabled chain passes frames with zero delay and only counts a bypass.
    #[test]
    fn test_filter_chain_disabled() {
        let chain = FilterChain::new(FilterChainConfig::default());
        assert!(!chain.is_enabled());

        let mut env = sample_envelope();
        match chain.send(&mut env) {
            FilterResult::Pass { delay } => assert_eq!(delay, Duration::ZERO),
            other => panic!("expected zero-delay Pass, got {:?}", other),
        }

        let snapshot = chain.stats_snapshot();
        assert_eq!(snapshot.bypass_count, 1);
        assert_eq!(snapshot.frames_sent, 0);
    }

    /// An enabled chain with default filters lets a single frame through.
    #[test]
    fn test_filter_chain_enabled_passthrough() {
        let chain = FilterChain::new(FilterChainConfig::enabled());
        assert!(chain.is_enabled());

        let mut env = sample_envelope();
        let outcome = chain.send(&mut env);
        assert!(outcome.should_continue());

        let snapshot = chain.stats_snapshot();
        assert_eq!(snapshot.frames_sent, 1);
        assert_eq!(snapshot.bypass_count, 0);
    }

    /// One send followed by one recv is reflected in both counters.
    #[test]
    fn test_filter_chain_send_recv_cycle() {
        let chain = FilterChain::new(FilterChainConfig::enabled());

        // Outbound frame.
        let mut outbound = sample_envelope();
        assert!(chain.send(&mut outbound).should_continue());

        // Inbound frame.
        let mut inbound = sample_envelope();
        assert!(chain.recv(&mut inbound).should_continue());

        let snapshot = chain.stats_snapshot();
        assert_eq!(snapshot.frames_sent, 1);
        assert_eq!(snapshot.frames_received, 1);
    }

    /// Smoke test: the success callback must not panic after a send.
    #[test]
    fn test_filter_chain_success_callback() {
        let chain = FilterChain::new(FilterChainConfig::enabled());

        let mut env = sample_envelope();
        let _ = chain.send(&mut env);
        chain.on_send_success(1);

        // Nothing to assert — reaching this point without a panic is the test.
    }

    /// Smoke test: the failure callback must not panic after a send.
    #[test]
    fn test_filter_chain_failure_callback() {
        let chain = FilterChain::new(FilterChainConfig::enabled());

        let mut env = sample_envelope();
        let _ = chain.send(&mut env);
        chain.on_send_failure(1, "test error");

        // Nothing to assert — reaching this point without a panic is the test.
    }

    /// The Debug summary names the type and includes the enabled flag.
    #[test]
    fn test_filter_chain_debug() {
        let chain = FilterChain::new(FilterChainConfig::enabled());
        let rendered = format!("{:?}", chain);
        for needle in ["FilterChain", "enabled"] {
            assert!(rendered.contains(needle));
        }
    }

    /// The default-enabled configuration validates cleanly.
    #[test]
    fn test_filter_chain_config_validate() {
        assert!(FilterChainConfig::enabled().validate().is_ok());
    }

    /// A fresh envelope carries the documented defaults.
    #[test]
    fn test_frame_envelope_creation() {
        let env = sample_envelope();
        assert_eq!(env.channel_id, 1);
        assert_eq!(env.priority, QueuePriority::Normal);
        assert!(env.requires_ack);
        assert!(env.frame_size_bytes > 0);
        assert_eq!(env.retry_count, 0);
        assert_eq!(env.accumulated_delay, Duration::ZERO);
    }

    /// The builder-style setters override priority and the ACK flag.
    #[test]
    fn test_frame_envelope_builders() {
        let env = sample_envelope().with_priority(QueuePriority::High);
        let env = env.with_ack_required(false);

        assert_eq!(env.priority, QueuePriority::High);
        assert!(!env.requires_ack);
    }

    /// Only the `Pass` variants let the pipeline continue.
    #[test]
    fn test_filter_result_variants() {
        let continuing = [
            FilterResult::pass(),
            FilterResult::pass_with_delay(Duration::from_millis(50)),
        ];
        for result in &continuing {
            assert!(result.should_continue());
        }

        let halting = [
            FilterResult::Queued,
            FilterResult::Dropped {
                reason: "test".to_string(),
            },
            FilterResult::Error {
                message: "test error".to_string(),
            },
        ];
        for result in &halting {
            assert!(!result.should_continue());
        }
    }

    /// Directions render as lowercase words.
    #[test]
    fn test_filter_direction_display() {
        assert_eq!(format!("{}", FilterDirection::Send), "send");
        assert_eq!(format!("{}", FilterDirection::Recv), "recv");
    }

    /// Draining a disabled chain yields nothing.
    #[test]
    fn test_drain_pending_disabled() {
        let chain = FilterChain::new(FilterChainConfig::default());
        assert!(chain.drain_pending(1, 10).is_empty());
    }

    /// A disabled chain never reports pending frames for a channel.
    #[test]
    fn test_has_pending_disabled() {
        let chain = FilterChain::new(FilterChainConfig::default());
        assert!(!chain.has_pending(1));
    }

    /// A disabled chain reports a total pending count of zero.
    #[test]
    fn test_pending_count_disabled() {
        let chain = FilterChain::new(FilterChainConfig::default());
        assert_eq!(chain.pending_count(), 0);
    }
}