Skip to main content

mabi_knx/filter/
queue.rs

1//! QueueFilter — 3-priority FIFO with WaitingForAck backpressure.
2//!
3//! The QueueFilter provides priority-based frame queuing that simulates
4//! real KNX gateway behavior. When the tunnel connection is in
5//! WaitingForAck state, frames are queued instead of being sent directly.
6//!
7//! ## Priority Levels
8//!
9//! - **High**: L_Data.con confirmations, M_Reset.ind, system frames
10//! - **Normal**: L_Data.req group value writes/reads, L_Data.ind broadcasts
11//! - **Low**: Bus monitor frames, property service responses, diagnostics
12//!
13//! ## Backpressure
14//!
15//! When `waiting_for_ack` is active for a channel, the QueueFilter holds
16//! frames instead of passing them through. This prevents frame loss when
17//! the client hasn't yet acknowledged the previous frame.
18//!
19//! ## Queue Limits
20//!
21//! Each priority level has a configurable maximum depth. When a queue is
22//! full, the oldest frame in the lowest priority queue is dropped to make
23//! room (priority-based eviction).
24
25use std::collections::{HashMap, VecDeque};
26use std::fmt;
27use std::sync::atomic::{AtomicU64, Ordering};
28
29use parking_lot::RwLock;
30use serde::{Deserialize, Serialize};
31use tracing::{debug, trace};
32
33use crate::cemi::MessageCode;
34
35use super::chain::{FilterResult, FrameEnvelope};
36
37// ============================================================================
38// Queue Priority
39// ============================================================================
40
41/// Frame priority for queue ordering.
42///
43/// Frames are dequeued in strict priority order: High → Normal → Low.
44/// Within the same priority, frames are dequeued in FIFO order.
45#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
46pub enum QueuePriority {
47    /// Lowest priority: diagnostics, bus monitor, property responses.
48    Low = 0,
49    /// Default priority: regular data frames.
50    Normal = 1,
51    /// Highest priority: confirmations, resets, system frames.
52    High = 2,
53}
54
55impl QueuePriority {
56    /// Classify a frame by its message code into a priority level.
57    pub fn classify(message_code: MessageCode) -> Self {
58        match message_code {
59            // High priority: confirmations and resets
60            MessageCode::LDataCon
61            | MessageCode::LDataReq  // Original requests get high priority for retransmit
62            | MessageCode::MResetInd
63            | MessageCode::MResetReq => Self::High,
64
65            // Normal priority: data indications and standard requests
66            MessageCode::LDataInd
67            | MessageCode::MPropReadReq
68            | MessageCode::MPropWriteReq
69            | MessageCode::MPropReadCon
70            | MessageCode::MPropWriteCon => Self::Normal,
71
72            // Low priority: bus monitor, raw, and everything else
73            MessageCode::LBusmonInd
74            | MessageCode::LRawReq
75            | MessageCode::LRawCon
76            | MessageCode::LRawInd => Self::Low,
77        }
78    }
79
80    /// All priority levels in descending order (highest first).
81    pub fn all_descending() -> &'static [QueuePriority] {
82        &[Self::High, Self::Normal, Self::Low]
83    }
84}
85
86impl fmt::Display for QueuePriority {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        match self {
89            Self::Low => write!(f, "Low"),
90            Self::Normal => write!(f, "Normal"),
91            Self::High => write!(f, "High"),
92        }
93    }
94}
95
96// ============================================================================
97// Per-Channel Queue State
98// ============================================================================
99
100/// Per-channel queue state tracking.
101#[derive(Debug)]
102struct ChannelQueueState {
103    /// Whether this channel is currently waiting for an ACK.
104    waiting_for_ack: bool,
105    /// Per-priority FIFO queues.
106    queues: [VecDeque<FrameEnvelope>; 3],
107}
108
109impl ChannelQueueState {
110    fn new() -> Self {
111        Self {
112            waiting_for_ack: false,
113            queues: [
114                VecDeque::new(), // Low (index 0)
115                VecDeque::new(), // Normal (index 1)
116                VecDeque::new(), // High (index 2)
117            ],
118        }
119    }
120
121    /// Get the queue for a given priority level.
122    fn queue_mut(&mut self, priority: QueuePriority) -> &mut VecDeque<FrameEnvelope> {
123        &mut self.queues[priority as usize]
124    }
125
126    /// Get the total number of queued frames across all priorities.
127    fn total_queued(&self) -> usize {
128        self.queues.iter().map(|q| q.len()).sum()
129    }
130
131    /// Drain up to `max_count` frames in priority order (High → Normal → Low).
132    fn drain(&mut self, max_count: usize) -> Vec<FrameEnvelope> {
133        let mut result = Vec::with_capacity(max_count);
134
135        for priority in QueuePriority::all_descending() {
136            let queue = &mut self.queues[*priority as usize];
137            while result.len() < max_count {
138                match queue.pop_front() {
139                    Some(envelope) => result.push(envelope),
140                    None => break,
141                }
142            }
143            if result.len() >= max_count {
144                break;
145            }
146        }
147
148        result
149    }
150
151    /// Drop the oldest frame from the lowest priority queue that has frames.
152    /// Returns the priority level of the dropped frame, or None if all queues are empty.
153    fn evict_lowest(&mut self) -> Option<QueuePriority> {
154        // Try Low first, then Normal (never evict High)
155        for &priority in &[QueuePriority::Low, QueuePriority::Normal] {
156            let queue = &mut self.queues[priority as usize];
157            if queue.pop_front().is_some() {
158                return Some(priority);
159            }
160        }
161        None
162    }
163}
164
165// ============================================================================
166// QueueFilter Configuration
167// ============================================================================
168
169/// QueueFilter configuration.
170#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct QueueFilterConfig {
172    /// Whether the QueueFilter is enabled.
173    #[serde(default = "default_true")]
174    pub enabled: bool,
175
176    /// Maximum queue depth per priority level per channel.
177    #[serde(default = "default_max_queue_depth")]
178    pub max_queue_depth: usize,
179
180    /// Whether to enable WaitingForAck backpressure.
181    /// When true, frames are queued when the connection is WaitingForAck.
182    #[serde(default = "default_true")]
183    pub backpressure_enabled: bool,
184
185    /// Whether to evict lower-priority frames when queue is full.
186    /// When false, new frames are dropped if queue is full.
187    #[serde(default = "default_true")]
188    pub priority_eviction_enabled: bool,
189
190    /// Maximum total frames across all channels and priorities.
191    /// 0 = unlimited (bounded only by per-channel limits).
192    #[serde(default = "default_max_total_frames")]
193    pub max_total_frames: usize,
194}
195
196fn default_true() -> bool {
197    true
198}
199
200fn default_max_queue_depth() -> usize {
201    64
202}
203
204fn default_max_total_frames() -> usize {
205    1024
206}
207
208impl Default for QueueFilterConfig {
209    fn default() -> Self {
210        Self {
211            enabled: true,
212            max_queue_depth: default_max_queue_depth(),
213            backpressure_enabled: true,
214            priority_eviction_enabled: true,
215            max_total_frames: default_max_total_frames(),
216        }
217    }
218}
219
220impl QueueFilterConfig {
221    /// Validate the configuration.
222    pub fn validate(&self) -> Result<(), String> {
223        if self.max_queue_depth == 0 {
224            return Err("QueueFilter max_queue_depth must be > 0".to_string());
225        }
226        Ok(())
227    }
228}
229
230// ============================================================================
231// QueueFilter Statistics
232// ============================================================================
233
234/// Lock-free QueueFilter statistics.
235#[derive(Debug, Default)]
236pub struct QueueFilterStats {
237    /// Frames that passed through without queuing.
238    pub direct_pass: AtomicU64,
239    /// Frames that were queued due to backpressure.
240    pub queued_frames: AtomicU64,
241    /// Frames that were dropped because queue was full.
242    pub dropped_full: AtomicU64,
243    /// Frames evicted from lower-priority queues to make room.
244    pub evicted_frames: AtomicU64,
245    /// Frames successfully drained from queues.
246    pub drained_frames: AtomicU64,
247    /// High-priority frames processed.
248    pub high_priority: AtomicU64,
249    /// Normal-priority frames processed.
250    pub normal_priority: AtomicU64,
251    /// Low-priority frames processed.
252    pub low_priority: AtomicU64,
253}
254
255/// Snapshot of QueueFilter statistics.
256#[derive(Debug, Clone, Copy, PartialEq, Eq)]
257pub struct QueueFilterStatsSnapshot {
258    pub direct_pass: u64,
259    pub queued_frames: u64,
260    pub dropped_full: u64,
261    pub evicted_frames: u64,
262    pub drained_frames: u64,
263    pub high_priority: u64,
264    pub normal_priority: u64,
265    pub low_priority: u64,
266}
267
268// ============================================================================
269// QueueFilter
270// ============================================================================
271
272/// 3-priority FIFO queue with WaitingForAck backpressure.
273///
274/// The QueueFilter classifies frames by priority and queues them when
275/// the target connection is in WaitingForAck state. This prevents
276/// frame loss from the server overwhelming the client before it has
277/// acknowledged the previous frame.
278///
279/// When backpressure is released (ACK received), queued frames are
280/// drained in strict priority order: High → Normal → Low.
281pub struct QueueFilter {
282    config: QueueFilterConfig,
283    /// Per-channel queue state.
284    channels: RwLock<HashMap<u8, ChannelQueueState>>,
285    stats: QueueFilterStats,
286}
287
288impl QueueFilter {
289    /// Create a new QueueFilter with the given configuration.
290    pub fn new(config: QueueFilterConfig) -> Self {
291        Self {
292            config,
293            channels: RwLock::new(HashMap::new()),
294            stats: QueueFilterStats::default(),
295        }
296    }
297
298    /// Process a frame in the send direction.
299    ///
300    /// 1. Classify the frame's priority based on message code
301    /// 2. If backpressure is active (WaitingForAck), queue the frame
302    /// 3. Otherwise, pass through with priority set on envelope
303    pub fn process_send(&self, envelope: &mut FrameEnvelope) -> FilterResult {
304        if !self.config.enabled {
305            return FilterResult::pass();
306        }
307
308        // Classify priority
309        let priority = QueuePriority::classify(envelope.cemi.message_code);
310        envelope.priority = priority;
311
312        // Record priority stats
313        match priority {
314            QueuePriority::High => self.stats.high_priority.fetch_add(1, Ordering::Relaxed),
315            QueuePriority::Normal => self.stats.normal_priority.fetch_add(1, Ordering::Relaxed),
316            QueuePriority::Low => self.stats.low_priority.fetch_add(1, Ordering::Relaxed),
317        };
318
319        // Check backpressure
320        if self.config.backpressure_enabled {
321            let mut channels = self.channels.write();
322            let total_pending: usize = channels.values().map(|cs| cs.total_queued()).sum();
323
324            let channel_state = channels
325                .entry(envelope.channel_id)
326                .or_insert_with(ChannelQueueState::new);
327
328            if channel_state.waiting_for_ack {
329                // Backpressure active — queue the frame
330                return self.enqueue_frame(channel_state, envelope.clone(), total_pending);
331            }
332        }
333
334        self.stats.direct_pass.fetch_add(1, Ordering::Relaxed);
335        FilterResult::pass()
336    }
337
338    /// Process a frame in the recv direction.
339    ///
340    /// On the receive path, we just pass through. The actual backpressure
341    /// release happens via `on_ack_received()`.
342    pub fn process_recv(&self, _envelope: &FrameEnvelope) -> FilterResult {
343        FilterResult::pass()
344    }
345
346    /// Enqueue a frame with priority-based eviction.
347    ///
348    /// `total_pending` is the total number of frames across all channels,
349    /// computed before this call to avoid deadlocking on the channels lock.
350    fn enqueue_frame(
351        &self,
352        channel_state: &mut ChannelQueueState,
353        envelope: FrameEnvelope,
354        total_pending: usize,
355    ) -> FilterResult {
356        let priority = envelope.priority;
357        let queue = channel_state.queue_mut(priority);
358
359        // Check per-priority queue depth
360        if queue.len() >= self.config.max_queue_depth {
361            if self.config.priority_eviction_enabled {
362                // Try to evict from a lower-priority queue
363                if let Some(evicted_priority) = channel_state.evict_lowest() {
364                    self.stats.evicted_frames.fetch_add(1, Ordering::Relaxed);
365                    debug!(
366                        channel_id = envelope.channel_id,
367                        evicted_priority = %evicted_priority,
368                        enqueue_priority = %priority,
369                        "QueueFilter: evicted lower-priority frame to make room"
370                    );
371                } else {
372                    // All lower queues empty, and current queue is full — drop
373                    self.stats.dropped_full.fetch_add(1, Ordering::Relaxed);
374                    return FilterResult::Dropped {
375                        reason: format!(
376                            "QueueFilter: {} queue full ({} frames), no lower priority to evict",
377                            priority,
378                            self.config.max_queue_depth,
379                        ),
380                    };
381                }
382            } else {
383                // No eviction — drop
384                self.stats.dropped_full.fetch_add(1, Ordering::Relaxed);
385                return FilterResult::Dropped {
386                    reason: format!(
387                        "QueueFilter: {} queue full ({} frames)",
388                        priority,
389                        self.config.max_queue_depth,
390                    ),
391                };
392            }
393        }
394
395        // Check total frame limit
396        if self.config.max_total_frames > 0 && total_pending >= self.config.max_total_frames {
397            self.stats.dropped_full.fetch_add(1, Ordering::Relaxed);
398            return FilterResult::Dropped {
399                reason: format!(
400                    "QueueFilter: total frame limit reached ({} >= {})",
401                    total_pending, self.config.max_total_frames,
402                ),
403            };
404        }
405
406        // Enqueue
407        let queue = channel_state.queue_mut(priority);
408        queue.push_back(envelope);
409        self.stats.queued_frames.fetch_add(1, Ordering::Relaxed);
410
411        trace!(
412            priority = %priority,
413            queue_depth = queue.len(),
414            "QueueFilter: frame queued"
415        );
416
417        FilterResult::Queued
418    }
419
420    /// Set a channel's WaitingForAck backpressure flag.
421    pub fn set_waiting_for_ack(&self, channel_id: u8, waiting: bool) {
422        if !self.config.enabled || !self.config.backpressure_enabled {
423            return;
424        }
425
426        let mut channels = self.channels.write();
427        let channel_state = channels
428            .entry(channel_id)
429            .or_insert_with(ChannelQueueState::new);
430        channel_state.waiting_for_ack = waiting;
431
432        trace!(
433            channel_id,
434            waiting_for_ack = waiting,
435            "QueueFilter: backpressure state changed"
436        );
437    }
438
439    /// Notify that an ACK was received for a channel.
440    ///
441    /// This releases backpressure and allows queued frames to be drained.
442    pub fn on_ack_received(&self, channel_id: u8) {
443        if !self.config.enabled {
444            return;
445        }
446
447        let mut channels = self.channels.write();
448        if let Some(channel_state) = channels.get_mut(&channel_id) {
449            channel_state.waiting_for_ack = false;
450            trace!(
451                channel_id,
452                pending = channel_state.total_queued(),
453                "QueueFilter: ACK received, backpressure released"
454            );
455        }
456    }
457
458    /// Notify that a send error occurred for a channel.
459    ///
460    /// This does not change backpressure state — the connection may retry.
461    pub fn on_send_error(&self, channel_id: u8) {
462        trace!(channel_id, "QueueFilter: send error recorded");
463    }
464
465    /// Check if there are pending frames for a channel.
466    pub fn has_pending(&self, channel_id: u8) -> bool {
467        let channels = self.channels.read();
468        channels
469            .get(&channel_id)
470            .map(|cs| cs.total_queued() > 0)
471            .unwrap_or(false)
472    }
473
474    /// Get the total number of pending frames across all channels.
475    pub fn total_pending(&self) -> usize {
476        let channels = self.channels.read();
477        channels.values().map(|cs| cs.total_queued()).sum()
478    }
479
480    /// Drain up to `max_count` frames from a channel in priority order.
481    ///
482    /// Returns frames in descending priority order: High → Normal → Low.
483    pub fn drain(&self, channel_id: u8, max_count: usize) -> Vec<FrameEnvelope> {
484        if !self.config.enabled {
485            return Vec::new();
486        }
487
488        let mut channels = self.channels.write();
489        let result = match channels.get_mut(&channel_id) {
490            Some(channel_state) => {
491                let drained = channel_state.drain(max_count);
492                self.stats
493                    .drained_frames
494                    .fetch_add(drained.len() as u64, Ordering::Relaxed);
495
496                trace!(
497                    channel_id,
498                    drained_count = drained.len(),
499                    remaining = channel_state.total_queued(),
500                    "QueueFilter: frames drained"
501                );
502
503                drained
504            }
505            None => Vec::new(),
506        };
507
508        result
509    }
510
511    /// Get pending frame counts per priority for a channel.
512    pub fn pending_by_priority(&self, channel_id: u8) -> [usize; 3] {
513        let channels = self.channels.read();
514        match channels.get(&channel_id) {
515            Some(cs) => [
516                cs.queues[0].len(), // Low
517                cs.queues[1].len(), // Normal
518                cs.queues[2].len(), // High
519            ],
520            None => [0, 0, 0],
521        }
522    }
523
524    /// Remove all queued frames for a channel (e.g., on disconnect).
525    pub fn clear_channel(&self, channel_id: u8) {
526        let mut channels = self.channels.write();
527        channels.remove(&channel_id);
528    }
529
530    /// Get a snapshot of the statistics.
531    pub fn stats_snapshot(&self) -> QueueFilterStatsSnapshot {
532        QueueFilterStatsSnapshot {
533            direct_pass: self.stats.direct_pass.load(Ordering::Relaxed),
534            queued_frames: self.stats.queued_frames.load(Ordering::Relaxed),
535            dropped_full: self.stats.dropped_full.load(Ordering::Relaxed),
536            evicted_frames: self.stats.evicted_frames.load(Ordering::Relaxed),
537            drained_frames: self.stats.drained_frames.load(Ordering::Relaxed),
538            high_priority: self.stats.high_priority.load(Ordering::Relaxed),
539            normal_priority: self.stats.normal_priority.load(Ordering::Relaxed),
540            low_priority: self.stats.low_priority.load(Ordering::Relaxed),
541        }
542    }
543}
544
545impl fmt::Debug for QueueFilter {
546    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
547        f.debug_struct("QueueFilter")
548            .field("enabled", &self.config.enabled)
549            .field("max_queue_depth", &self.config.max_queue_depth)
550            .field("backpressure_enabled", &self.config.backpressure_enabled)
551            .field("total_pending", &self.total_pending())
552            .finish()
553    }
554}
555
556// ============================================================================
557// Tests
558// ============================================================================
559
560#[cfg(test)]
561mod tests {
562    use super::*;
563    use crate::address::{GroupAddress, IndividualAddress};
564    use crate::cemi::CemiFrame;
565
566    fn make_envelope(channel_id: u8) -> FrameEnvelope {
567        let cemi = CemiFrame::group_value_write(
568            IndividualAddress::new(1, 1, 1),
569            GroupAddress::three_level(1, 0, 1),
570            vec![0x01],
571        );
572        FrameEnvelope::new(cemi, channel_id, "192.168.1.100:3671".parse().unwrap())
573    }
574
575    fn make_confirmation_envelope(channel_id: u8) -> FrameEnvelope {
576        let mut cemi = CemiFrame::group_value_write(
577            IndividualAddress::new(1, 1, 1),
578            GroupAddress::three_level(1, 0, 1),
579            vec![0x01],
580        );
581        cemi.message_code = MessageCode::LDataCon;
582        FrameEnvelope::new(cemi, channel_id, "192.168.1.100:3671".parse().unwrap())
583    }
584
585    fn make_busmon_envelope(channel_id: u8) -> FrameEnvelope {
586        let cemi = CemiFrame::bus_monitor_indication(&[0x11, 0x00, 0x00], 0x00, 0);
587        FrameEnvelope::new(cemi, channel_id, "192.168.1.100:3671".parse().unwrap())
588    }
589
590    #[test]
591    fn test_queue_priority_classify() {
592        assert_eq!(
593            QueuePriority::classify(MessageCode::LDataCon),
594            QueuePriority::High
595        );
596        assert_eq!(
597            QueuePriority::classify(MessageCode::MResetInd),
598            QueuePriority::High
599        );
600        assert_eq!(
601            QueuePriority::classify(MessageCode::LDataInd),
602            QueuePriority::Normal
603        );
604        assert_eq!(
605            QueuePriority::classify(MessageCode::MPropReadReq),
606            QueuePriority::Normal
607        );
608        assert_eq!(
609            QueuePriority::classify(MessageCode::LBusmonInd),
610            QueuePriority::Low
611        );
612        assert_eq!(
613            QueuePriority::classify(MessageCode::LRawReq),
614            QueuePriority::Low
615        );
616    }
617
618    #[test]
619    fn test_queue_priority_ordering() {
620        assert!(QueuePriority::High > QueuePriority::Normal);
621        assert!(QueuePriority::Normal > QueuePriority::Low);
622    }
623
624    #[test]
625    fn test_queue_priority_display() {
626        assert_eq!(QueuePriority::High.to_string(), "High");
627        assert_eq!(QueuePriority::Normal.to_string(), "Normal");
628        assert_eq!(QueuePriority::Low.to_string(), "Low");
629    }
630
631    #[test]
632    fn test_queue_filter_disabled() {
633        let mut config = QueueFilterConfig::default();
634        config.enabled = false;
635        let filter = QueueFilter::new(config);
636
637        let mut envelope = make_envelope(1);
638        let result = filter.process_send(&mut envelope);
639        assert!(result.should_continue());
640    }
641
642    #[test]
643    fn test_queue_filter_passthrough_no_backpressure() {
644        let config = QueueFilterConfig::default();
645        let filter = QueueFilter::new(config);
646
647        let mut envelope = make_envelope(1);
648        let result = filter.process_send(&mut envelope);
649        assert!(result.should_continue());
650
651        let stats = filter.stats_snapshot();
652        assert_eq!(stats.direct_pass, 1);
653        assert_eq!(stats.queued_frames, 0);
654    }
655
656    #[test]
657    fn test_queue_filter_backpressure_queues_frames() {
658        let config = QueueFilterConfig::default();
659        let filter = QueueFilter::new(config);
660
661        // Enable backpressure for channel 1
662        filter.set_waiting_for_ack(1, true);
663
664        let mut envelope = make_envelope(1);
665        let result = filter.process_send(&mut envelope);
666        assert!(matches!(result, FilterResult::Queued));
667
668        let stats = filter.stats_snapshot();
669        assert_eq!(stats.queued_frames, 1);
670        assert!(filter.has_pending(1));
671        assert_eq!(filter.total_pending(), 1);
672    }
673
674    #[test]
675    fn test_queue_filter_backpressure_release() {
676        let config = QueueFilterConfig::default();
677        let filter = QueueFilter::new(config);
678
679        // Enable backpressure
680        filter.set_waiting_for_ack(1, true);
681
682        // Queue some frames
683        for _ in 0..3 {
684            let mut envelope = make_envelope(1);
685            filter.process_send(&mut envelope);
686        }
687        assert_eq!(filter.total_pending(), 3);
688
689        // Release backpressure
690        filter.on_ack_received(1);
691
692        // Drain queued frames
693        let drained = filter.drain(1, 10);
694        assert_eq!(drained.len(), 3);
695        assert_eq!(filter.total_pending(), 0);
696    }
697
698    #[test]
699    fn test_queue_filter_priority_ordering() {
700        let config = QueueFilterConfig::default();
701        let filter = QueueFilter::new(config);
702
703        // Enable backpressure
704        filter.set_waiting_for_ack(1, true);
705
706        // Queue frames of different priorities
707        let mut low_env = make_busmon_envelope(1);
708        filter.process_send(&mut low_env);
709
710        let mut normal_env = make_envelope(1);
711        filter.process_send(&mut normal_env);
712
713        let mut high_env = make_confirmation_envelope(1);
714        filter.process_send(&mut high_env);
715
716        // Drain should return High → Normal → Low
717        let drained = filter.drain(1, 10);
718        assert_eq!(drained.len(), 3);
719        assert_eq!(drained[0].priority, QueuePriority::High);
720        assert_eq!(drained[1].priority, QueuePriority::Normal);
721        assert_eq!(drained[2].priority, QueuePriority::Low);
722    }
723
724    #[test]
725    fn test_queue_filter_queue_full_drop() {
726        let mut config = QueueFilterConfig::default();
727        config.max_queue_depth = 2;
728        config.priority_eviction_enabled = false; // Disable eviction
729        let filter = QueueFilter::new(config);
730
731        filter.set_waiting_for_ack(1, true);
732
733        // Fill the queue
734        for _ in 0..2 {
735            let mut envelope = make_envelope(1);
736            let result = filter.process_send(&mut envelope);
737            assert!(matches!(result, FilterResult::Queued));
738        }
739
740        // Third frame should be dropped
741        let mut envelope = make_envelope(1);
742        let result = filter.process_send(&mut envelope);
743        assert!(matches!(result, FilterResult::Dropped { .. }));
744
745        let stats = filter.stats_snapshot();
746        assert_eq!(stats.dropped_full, 1);
747    }
748
749    #[test]
750    fn test_queue_filter_priority_eviction() {
751        let mut config = QueueFilterConfig::default();
752        config.max_queue_depth = 1;
753        config.priority_eviction_enabled = true;
754        let filter = QueueFilter::new(config);
755
756        filter.set_waiting_for_ack(1, true);
757
758        // Queue a low-priority frame (fills Low queue to max_queue_depth=1)
759        let mut low_env = make_busmon_envelope(1);
760        let result = filter.process_send(&mut low_env);
761        assert!(matches!(result, FilterResult::Queued));
762
763        // Queue a normal-priority frame (Normal queue is empty, direct enqueue)
764        let mut normal_env1 = make_envelope(1);
765        let result = filter.process_send(&mut normal_env1);
766        assert!(matches!(result, FilterResult::Queued));
767
768        // Verify both queued: Low=1, Normal=1
769        let counts = filter.pending_by_priority(1);
770        assert_eq!(counts[0], 1); // Low: still present
771        assert_eq!(counts[1], 1); // Normal: queued
772
773        // Now queue ANOTHER normal-priority frame — Normal queue is full (1 >= max_queue_depth=1)
774        // This triggers eviction from the Low queue to make room
775        let mut normal_env2 = make_envelope(1);
776        let result = filter.process_send(&mut normal_env2);
777        assert!(matches!(result, FilterResult::Queued));
778
779        // Low was evicted, second normal enqueued into Normal queue
780        let counts = filter.pending_by_priority(1);
781        assert_eq!(counts[0], 0); // Low: evicted
782        assert_eq!(counts[1], 2); // Normal: 2 frames (the eviction freed a slot, allowing enqueue)
783
784        let stats = filter.stats_snapshot();
785        assert_eq!(stats.evicted_frames, 1);
786    }
787
788    #[test]
789    fn test_queue_filter_multi_channel() {
790        let config = QueueFilterConfig::default();
791        let filter = QueueFilter::new(config);
792
793        filter.set_waiting_for_ack(1, true);
794        filter.set_waiting_for_ack(2, true);
795
796        // Queue frames for both channels
797        let mut env1 = make_envelope(1);
798        filter.process_send(&mut env1);
799        let mut env2 = make_envelope(2);
800        filter.process_send(&mut env2);
801
802        assert_eq!(filter.total_pending(), 2);
803        assert!(filter.has_pending(1));
804        assert!(filter.has_pending(2));
805
806        // Drain channel 1 only
807        let drained = filter.drain(1, 10);
808        assert_eq!(drained.len(), 1);
809        assert!(!filter.has_pending(1));
810        assert!(filter.has_pending(2));
811    }
812
813    #[test]
814    fn test_queue_filter_clear_channel() {
815        let config = QueueFilterConfig::default();
816        let filter = QueueFilter::new(config);
817
818        filter.set_waiting_for_ack(1, true);
819        let mut env = make_envelope(1);
820        filter.process_send(&mut env);
821
822        filter.clear_channel(1);
823        assert!(!filter.has_pending(1));
824        assert_eq!(filter.total_pending(), 0);
825    }
826
827    #[test]
828    fn test_queue_filter_pending_by_priority() {
829        let config = QueueFilterConfig::default();
830        let filter = QueueFilter::new(config);
831
832        filter.set_waiting_for_ack(1, true);
833
834        // Queue frames of different types
835        let mut env1 = make_busmon_envelope(1);    // Low
836        let mut env2 = make_envelope(1);           // Normal
837        let mut env3 = make_confirmation_envelope(1); // High
838
839        filter.process_send(&mut env1);
840        filter.process_send(&mut env2);
841        filter.process_send(&mut env3);
842
843        let counts = filter.pending_by_priority(1);
844        assert_eq!(counts[0], 1); // Low
845        assert_eq!(counts[1], 1); // Normal
846        assert_eq!(counts[2], 1); // High
847    }
848
849    #[test]
850    fn test_queue_filter_config_validate() {
851        let config = QueueFilterConfig::default();
852        assert!(config.validate().is_ok());
853
854        let mut bad_config = QueueFilterConfig::default();
855        bad_config.max_queue_depth = 0;
856        assert!(bad_config.validate().is_err());
857    }
858
859    #[test]
860    fn test_queue_filter_recv_passthrough() {
861        let config = QueueFilterConfig::default();
862        let filter = QueueFilter::new(config);
863
864        let envelope = make_envelope(1);
865        let result = filter.process_recv(&envelope);
866        assert!(result.should_continue());
867    }
868
869    #[test]
870    fn test_queue_filter_debug() {
871        let config = QueueFilterConfig::default();
872        let filter = QueueFilter::new(config);
873        let debug_str = format!("{:?}", filter);
874        assert!(debug_str.contains("QueueFilter"));
875        assert!(debug_str.contains("enabled"));
876    }
877
878    #[test]
879    fn test_queue_filter_stats() {
880        let config = QueueFilterConfig::default();
881        let filter = QueueFilter::new(config);
882
883        // Several operations
884        let mut env = make_envelope(1);
885        filter.process_send(&mut env);
886
887        filter.set_waiting_for_ack(1, true);
888        let mut env2 = make_envelope(1);
889        filter.process_send(&mut env2);
890
891        let stats = filter.stats_snapshot();
892        assert_eq!(stats.direct_pass, 1);
893        assert_eq!(stats.queued_frames, 1);
894    }
895
896    #[test]
897    fn test_max_total_frames_limit() {
898        let mut config = QueueFilterConfig::default();
899        config.max_total_frames = 2;
900        let filter = QueueFilter::new(config);
901
902        filter.set_waiting_for_ack(1, true);
903
904        // Queue 2 frames
905        for _ in 0..2 {
906            let mut env = make_envelope(1);
907            let result = filter.process_send(&mut env);
908            assert!(matches!(result, FilterResult::Queued));
909        }
910
911        // Third should be dropped (total limit)
912        let mut env = make_envelope(1);
913        let result = filter.process_send(&mut env);
914        assert!(matches!(result, FilterResult::Dropped { .. }));
915    }
916}