1use std::collections::{HashMap, VecDeque};
26use std::fmt;
27use std::sync::atomic::{AtomicU64, Ordering};
28
29use parking_lot::RwLock;
30use serde::{Deserialize, Serialize};
31use tracing::{debug, trace};
32
33use crate::cemi::MessageCode;
34
35use super::chain::{FilterResult, FrameEnvelope};
36
/// Scheduling class of a frame while it waits in a channel queue.
///
/// The explicit discriminants are used as indices into the per-channel queue
/// array (`priority as usize`), and the declaration order makes the derived
/// ordering rank `Low < Normal < High`.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub enum QueuePriority {
    /// Drained last and evicted first.
    Low = 0,
    /// Drained after `High`; evicted only when no `Low` frame is queued.
    Normal = 1,
    /// Drained first and never evicted.
    High = 2,
}
54
55impl QueuePriority {
56 pub fn classify(message_code: MessageCode) -> Self {
58 match message_code {
59 MessageCode::LDataCon
61 | MessageCode::LDataReq | MessageCode::MResetInd
63 | MessageCode::MResetReq => Self::High,
64
65 MessageCode::LDataInd
67 | MessageCode::MPropReadReq
68 | MessageCode::MPropWriteReq
69 | MessageCode::MPropReadCon
70 | MessageCode::MPropWriteCon => Self::Normal,
71
72 MessageCode::LBusmonInd
74 | MessageCode::LRawReq
75 | MessageCode::LRawCon
76 | MessageCode::LRawInd => Self::Low,
77 }
78 }
79
80 pub fn all_descending() -> &'static [QueuePriority] {
82 &[Self::High, Self::Normal, Self::Low]
83 }
84}
85
86impl fmt::Display for QueuePriority {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 match self {
89 Self::Low => write!(f, "Low"),
90 Self::Normal => write!(f, "Normal"),
91 Self::High => write!(f, "High"),
92 }
93 }
94}
95
96#[derive(Debug)]
102struct ChannelQueueState {
103 waiting_for_ack: bool,
105 queues: [VecDeque<FrameEnvelope>; 3],
107}
108
109impl ChannelQueueState {
110 fn new() -> Self {
111 Self {
112 waiting_for_ack: false,
113 queues: [
114 VecDeque::new(), VecDeque::new(), VecDeque::new(), ],
118 }
119 }
120
121 fn queue_mut(&mut self, priority: QueuePriority) -> &mut VecDeque<FrameEnvelope> {
123 &mut self.queues[priority as usize]
124 }
125
126 fn total_queued(&self) -> usize {
128 self.queues.iter().map(|q| q.len()).sum()
129 }
130
131 fn drain(&mut self, max_count: usize) -> Vec<FrameEnvelope> {
133 let mut result = Vec::with_capacity(max_count);
134
135 for priority in QueuePriority::all_descending() {
136 let queue = &mut self.queues[*priority as usize];
137 while result.len() < max_count {
138 match queue.pop_front() {
139 Some(envelope) => result.push(envelope),
140 None => break,
141 }
142 }
143 if result.len() >= max_count {
144 break;
145 }
146 }
147
148 result
149 }
150
151 fn evict_lowest(&mut self) -> Option<QueuePriority> {
154 for &priority in &[QueuePriority::Low, QueuePriority::Normal] {
156 let queue = &mut self.queues[priority as usize];
157 if queue.pop_front().is_some() {
158 return Some(priority);
159 }
160 }
161 None
162 }
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
171pub struct QueueFilterConfig {
172 #[serde(default = "default_true")]
174 pub enabled: bool,
175
176 #[serde(default = "default_max_queue_depth")]
178 pub max_queue_depth: usize,
179
180 #[serde(default = "default_true")]
183 pub backpressure_enabled: bool,
184
185 #[serde(default = "default_true")]
188 pub priority_eviction_enabled: bool,
189
190 #[serde(default = "default_max_total_frames")]
193 pub max_total_frames: usize,
194}
195
/// Serde default for boolean options that are switched on unless configured
/// otherwise.
fn default_true() -> bool {
    true
}

/// Serde default for `max_queue_depth`.
fn default_max_queue_depth() -> usize {
    const DEPTH: usize = 64;
    DEPTH
}

/// Serde default for `max_total_frames`.
fn default_max_total_frames() -> usize {
    const TOTAL: usize = 1024;
    TOTAL
}
207
208impl Default for QueueFilterConfig {
209 fn default() -> Self {
210 Self {
211 enabled: true,
212 max_queue_depth: default_max_queue_depth(),
213 backpressure_enabled: true,
214 priority_eviction_enabled: true,
215 max_total_frames: default_max_total_frames(),
216 }
217 }
218}
219
220impl QueueFilterConfig {
221 pub fn validate(&self) -> Result<(), String> {
223 if self.max_queue_depth == 0 {
224 return Err("QueueFilter max_queue_depth must be > 0".to_string());
225 }
226 Ok(())
227 }
228}
229
/// Live counters maintained by [`QueueFilter`].
///
/// All counters start at zero and are updated with relaxed atomic operations.
#[derive(Default, Debug)]
pub struct QueueFilterStats {
    /// Frames that `process_send` let through without queueing.
    pub direct_pass: AtomicU64,
    /// Frames accepted into a queue.
    pub queued_frames: AtomicU64,
    /// Frames rejected because a queue or the global frame limit was full.
    pub dropped_full: AtomicU64,
    /// Queued frames discarded to make room for a new frame.
    pub evicted_frames: AtomicU64,
    /// Frames handed back by `drain`.
    pub drained_frames: AtomicU64,
    /// Frames classified as `High` by `process_send`.
    pub high_priority: AtomicU64,
    /// Frames classified as `Normal` by `process_send`.
    pub normal_priority: AtomicU64,
    /// Frames classified as `Low` by `process_send`.
    pub low_priority: AtomicU64,
}
254
/// Plain-value copy of [`QueueFilterStats`] taken at one point in time.
///
/// Each field mirrors the counter of the same name.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct QueueFilterStatsSnapshot {
    /// Frames passed through without queueing.
    pub direct_pass: u64,
    /// Frames accepted into a queue.
    pub queued_frames: u64,
    /// Frames rejected because a limit was reached.
    pub dropped_full: u64,
    /// Queued frames discarded to make room.
    pub evicted_frames: u64,
    /// Frames handed back by `drain`.
    pub drained_frames: u64,
    /// Frames classified as `High`.
    pub high_priority: u64,
    /// Frames classified as `Normal`.
    pub normal_priority: u64,
    /// Frames classified as `Low`.
    pub low_priority: u64,
}
267
268pub struct QueueFilter {
282 config: QueueFilterConfig,
283 channels: RwLock<HashMap<u8, ChannelQueueState>>,
285 stats: QueueFilterStats,
286}
287
288impl QueueFilter {
289 pub fn new(config: QueueFilterConfig) -> Self {
291 Self {
292 config,
293 channels: RwLock::new(HashMap::new()),
294 stats: QueueFilterStats::default(),
295 }
296 }
297
298 pub fn process_send(&self, envelope: &mut FrameEnvelope) -> FilterResult {
304 if !self.config.enabled {
305 return FilterResult::pass();
306 }
307
308 let priority = QueuePriority::classify(envelope.cemi.message_code);
310 envelope.priority = priority;
311
312 match priority {
314 QueuePriority::High => self.stats.high_priority.fetch_add(1, Ordering::Relaxed),
315 QueuePriority::Normal => self.stats.normal_priority.fetch_add(1, Ordering::Relaxed),
316 QueuePriority::Low => self.stats.low_priority.fetch_add(1, Ordering::Relaxed),
317 };
318
319 if self.config.backpressure_enabled {
321 let mut channels = self.channels.write();
322 let total_pending: usize = channels.values().map(|cs| cs.total_queued()).sum();
323
324 let channel_state = channels
325 .entry(envelope.channel_id)
326 .or_insert_with(ChannelQueueState::new);
327
328 if channel_state.waiting_for_ack {
329 return self.enqueue_frame(channel_state, envelope.clone(), total_pending);
331 }
332 }
333
334 self.stats.direct_pass.fetch_add(1, Ordering::Relaxed);
335 FilterResult::pass()
336 }
337
338 pub fn process_recv(&self, _envelope: &FrameEnvelope) -> FilterResult {
343 FilterResult::pass()
344 }
345
346 fn enqueue_frame(
351 &self,
352 channel_state: &mut ChannelQueueState,
353 envelope: FrameEnvelope,
354 total_pending: usize,
355 ) -> FilterResult {
356 let priority = envelope.priority;
357 let queue = channel_state.queue_mut(priority);
358
359 if queue.len() >= self.config.max_queue_depth {
361 if self.config.priority_eviction_enabled {
362 if let Some(evicted_priority) = channel_state.evict_lowest() {
364 self.stats.evicted_frames.fetch_add(1, Ordering::Relaxed);
365 debug!(
366 channel_id = envelope.channel_id,
367 evicted_priority = %evicted_priority,
368 enqueue_priority = %priority,
369 "QueueFilter: evicted lower-priority frame to make room"
370 );
371 } else {
372 self.stats.dropped_full.fetch_add(1, Ordering::Relaxed);
374 return FilterResult::Dropped {
375 reason: format!(
376 "QueueFilter: {} queue full ({} frames), no lower priority to evict",
377 priority, self.config.max_queue_depth,
378 ),
379 };
380 }
381 } else {
382 self.stats.dropped_full.fetch_add(1, Ordering::Relaxed);
384 return FilterResult::Dropped {
385 reason: format!(
386 "QueueFilter: {} queue full ({} frames)",
387 priority, self.config.max_queue_depth,
388 ),
389 };
390 }
391 }
392
393 if self.config.max_total_frames > 0 && total_pending >= self.config.max_total_frames {
395 self.stats.dropped_full.fetch_add(1, Ordering::Relaxed);
396 return FilterResult::Dropped {
397 reason: format!(
398 "QueueFilter: total frame limit reached ({} >= {})",
399 total_pending, self.config.max_total_frames,
400 ),
401 };
402 }
403
404 let queue = channel_state.queue_mut(priority);
406 queue.push_back(envelope);
407 self.stats.queued_frames.fetch_add(1, Ordering::Relaxed);
408
409 trace!(
410 priority = %priority,
411 queue_depth = queue.len(),
412 "QueueFilter: frame queued"
413 );
414
415 FilterResult::Queued
416 }
417
418 pub fn set_waiting_for_ack(&self, channel_id: u8, waiting: bool) {
420 if !self.config.enabled || !self.config.backpressure_enabled {
421 return;
422 }
423
424 let mut channels = self.channels.write();
425 let channel_state = channels
426 .entry(channel_id)
427 .or_insert_with(ChannelQueueState::new);
428 channel_state.waiting_for_ack = waiting;
429
430 trace!(
431 channel_id,
432 waiting_for_ack = waiting,
433 "QueueFilter: backpressure state changed"
434 );
435 }
436
437 pub fn on_ack_received(&self, channel_id: u8) {
441 if !self.config.enabled {
442 return;
443 }
444
445 let mut channels = self.channels.write();
446 if let Some(channel_state) = channels.get_mut(&channel_id) {
447 channel_state.waiting_for_ack = false;
448 trace!(
449 channel_id,
450 pending = channel_state.total_queued(),
451 "QueueFilter: ACK received, backpressure released"
452 );
453 }
454 }
455
456 pub fn on_send_error(&self, channel_id: u8) {
460 trace!(channel_id, "QueueFilter: send error recorded");
461 }
462
463 pub fn has_pending(&self, channel_id: u8) -> bool {
465 let channels = self.channels.read();
466 channels
467 .get(&channel_id)
468 .map(|cs| cs.total_queued() > 0)
469 .unwrap_or(false)
470 }
471
472 pub fn total_pending(&self) -> usize {
474 let channels = self.channels.read();
475 channels.values().map(|cs| cs.total_queued()).sum()
476 }
477
478 pub fn drain(&self, channel_id: u8, max_count: usize) -> Vec<FrameEnvelope> {
482 if !self.config.enabled {
483 return Vec::new();
484 }
485
486 let mut channels = self.channels.write();
487 let result = match channels.get_mut(&channel_id) {
488 Some(channel_state) => {
489 let drained = channel_state.drain(max_count);
490 self.stats
491 .drained_frames
492 .fetch_add(drained.len() as u64, Ordering::Relaxed);
493
494 trace!(
495 channel_id,
496 drained_count = drained.len(),
497 remaining = channel_state.total_queued(),
498 "QueueFilter: frames drained"
499 );
500
501 drained
502 }
503 None => Vec::new(),
504 };
505
506 result
507 }
508
509 pub fn pending_by_priority(&self, channel_id: u8) -> [usize; 3] {
511 let channels = self.channels.read();
512 match channels.get(&channel_id) {
513 Some(cs) => [
514 cs.queues[0].len(), cs.queues[1].len(), cs.queues[2].len(), ],
518 None => [0, 0, 0],
519 }
520 }
521
522 pub fn clear_channel(&self, channel_id: u8) {
524 let mut channels = self.channels.write();
525 channels.remove(&channel_id);
526 }
527
528 pub fn stats_snapshot(&self) -> QueueFilterStatsSnapshot {
530 QueueFilterStatsSnapshot {
531 direct_pass: self.stats.direct_pass.load(Ordering::Relaxed),
532 queued_frames: self.stats.queued_frames.load(Ordering::Relaxed),
533 dropped_full: self.stats.dropped_full.load(Ordering::Relaxed),
534 evicted_frames: self.stats.evicted_frames.load(Ordering::Relaxed),
535 drained_frames: self.stats.drained_frames.load(Ordering::Relaxed),
536 high_priority: self.stats.high_priority.load(Ordering::Relaxed),
537 normal_priority: self.stats.normal_priority.load(Ordering::Relaxed),
538 low_priority: self.stats.low_priority.load(Ordering::Relaxed),
539 }
540 }
541}
542
543impl fmt::Debug for QueueFilter {
544 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
545 f.debug_struct("QueueFilter")
546 .field("enabled", &self.config.enabled)
547 .field("max_queue_depth", &self.config.max_queue_depth)
548 .field("backpressure_enabled", &self.config.backpressure_enabled)
549 .field("total_pending", &self.total_pending())
550 .finish()
551 }
552}
553
#[cfg(test)]
mod tests {
    use super::*;
    use crate::address::{GroupAddress, IndividualAddress};
    use crate::cemi::CemiFrame;

    /// Peer address attached to every test envelope.
    const PEER_ADDR: &str = "192.168.1.100:3671";

    /// Group-value-write frame shared by the envelope helpers.
    fn group_write_frame() -> CemiFrame {
        CemiFrame::group_value_write(
            IndividualAddress::new(1, 1, 1),
            GroupAddress::three_level(1, 0, 1),
            vec![0x01],
        )
    }

    /// Envelope that the tests below expect to land in the `Normal` queue.
    fn make_envelope(channel_id: u8) -> FrameEnvelope {
        FrameEnvelope::new(group_write_frame(), channel_id, PEER_ADDR.parse().unwrap())
    }

    /// Envelope carrying an `LDataCon` frame (`High` priority).
    fn make_confirmation_envelope(channel_id: u8) -> FrameEnvelope {
        let mut cemi = group_write_frame();
        cemi.message_code = MessageCode::LDataCon;
        FrameEnvelope::new(cemi, channel_id, PEER_ADDR.parse().unwrap())
    }

    /// Envelope carrying a bus-monitor indication (`Low` priority).
    fn make_busmon_envelope(channel_id: u8) -> FrameEnvelope {
        let cemi = CemiFrame::bus_monitor_indication(&[0x11, 0x00, 0x00], 0x00, 0);
        FrameEnvelope::new(cemi, channel_id, PEER_ADDR.parse().unwrap())
    }

    /// Builds a filter from `config` and puts `channel_id` under backpressure.
    fn backpressured(config: QueueFilterConfig, channel_id: u8) -> QueueFilter {
        let filter = QueueFilter::new(config);
        filter.set_waiting_for_ack(channel_id, true);
        filter
    }

    /// Runs `envelope` through `process_send` and returns the verdict.
    fn send(filter: &QueueFilter, mut envelope: FrameEnvelope) -> FilterResult {
        filter.process_send(&mut envelope)
    }

    #[test]
    fn test_queue_priority_classify() {
        let expectations = [
            (MessageCode::LDataCon, QueuePriority::High),
            (MessageCode::MResetInd, QueuePriority::High),
            (MessageCode::LDataInd, QueuePriority::Normal),
            (MessageCode::MPropReadReq, QueuePriority::Normal),
            (MessageCode::LBusmonInd, QueuePriority::Low),
            (MessageCode::LRawReq, QueuePriority::Low),
        ];

        for &(code, expected) in expectations.iter() {
            assert_eq!(QueuePriority::classify(code), expected);
        }
    }

    #[test]
    fn test_queue_priority_ordering() {
        assert!(QueuePriority::High > QueuePriority::Normal);
        assert!(QueuePriority::Normal > QueuePriority::Low);
    }

    #[test]
    fn test_queue_priority_display() {
        assert_eq!(QueuePriority::High.to_string(), "High");
        assert_eq!(QueuePriority::Normal.to_string(), "Normal");
        assert_eq!(QueuePriority::Low.to_string(), "Low");
    }

    #[test]
    fn test_queue_filter_disabled() {
        let filter = QueueFilter::new(QueueFilterConfig {
            enabled: false,
            ..QueueFilterConfig::default()
        });

        let result = send(&filter, make_envelope(1));
        assert!(result.should_continue());
    }

    #[test]
    fn test_queue_filter_passthrough_no_backpressure() {
        let filter = QueueFilter::new(QueueFilterConfig::default());

        let result = send(&filter, make_envelope(1));
        assert!(result.should_continue());

        let stats = filter.stats_snapshot();
        assert_eq!(stats.direct_pass, 1);
        assert_eq!(stats.queued_frames, 0);
    }

    #[test]
    fn test_queue_filter_backpressure_queues_frames() {
        let filter = backpressured(QueueFilterConfig::default(), 1);

        let result = send(&filter, make_envelope(1));
        assert!(matches!(result, FilterResult::Queued));

        let stats = filter.stats_snapshot();
        assert_eq!(stats.queued_frames, 1);
        assert!(filter.has_pending(1));
        assert_eq!(filter.total_pending(), 1);
    }

    #[test]
    fn test_queue_filter_backpressure_release() {
        let filter = backpressured(QueueFilterConfig::default(), 1);

        for _ in 0..3 {
            send(&filter, make_envelope(1));
        }
        assert_eq!(filter.total_pending(), 3);

        filter.on_ack_received(1);

        let drained = filter.drain(1, 10);
        assert_eq!(drained.len(), 3);
        assert_eq!(filter.total_pending(), 0);
    }

    #[test]
    fn test_queue_filter_priority_ordering() {
        let filter = backpressured(QueueFilterConfig::default(), 1);

        // Enqueue in ascending priority so the drain has to reorder.
        send(&filter, make_busmon_envelope(1));
        send(&filter, make_envelope(1));
        send(&filter, make_confirmation_envelope(1));

        let order: Vec<QueuePriority> = filter
            .drain(1, 10)
            .iter()
            .map(|envelope| envelope.priority)
            .collect();
        assert_eq!(
            order,
            vec![
                QueuePriority::High,
                QueuePriority::Normal,
                QueuePriority::Low
            ]
        );
    }

    #[test]
    fn test_queue_filter_queue_full_drop() {
        let filter = backpressured(
            QueueFilterConfig {
                max_queue_depth: 2,
                priority_eviction_enabled: false,
                ..QueueFilterConfig::default()
            },
            1,
        );

        // The first two frames fill the queue.
        for _ in 0..2 {
            let result = send(&filter, make_envelope(1));
            assert!(matches!(result, FilterResult::Queued));
        }

        // The third one no longer fits and eviction is off.
        let result = send(&filter, make_envelope(1));
        assert!(matches!(result, FilterResult::Dropped { .. }));

        assert_eq!(filter.stats_snapshot().dropped_full, 1);
    }

    #[test]
    fn test_queue_filter_priority_eviction() {
        let filter = backpressured(
            QueueFilterConfig {
                max_queue_depth: 1,
                priority_eviction_enabled: true,
                ..QueueFilterConfig::default()
            },
            1,
        );

        let result = send(&filter, make_busmon_envelope(1));
        assert!(matches!(result, FilterResult::Queued));

        let result = send(&filter, make_envelope(1));
        assert!(matches!(result, FilterResult::Queued));

        let before = filter.pending_by_priority(1);
        assert_eq!(before[0], 1);
        assert_eq!(before[1], 1);

        // The Normal queue is full, so the queued Low frame is evicted.
        let result = send(&filter, make_envelope(1));
        assert!(matches!(result, FilterResult::Queued));

        let after = filter.pending_by_priority(1);
        assert_eq!(after[0], 0);
        assert_eq!(after[1], 2);

        assert_eq!(filter.stats_snapshot().evicted_frames, 1);
    }

    #[test]
    fn test_queue_filter_multi_channel() {
        let filter = QueueFilter::new(QueueFilterConfig::default());
        for channel_id in 1..=2u8 {
            filter.set_waiting_for_ack(channel_id, true);
        }

        send(&filter, make_envelope(1));
        send(&filter, make_envelope(2));

        assert_eq!(filter.total_pending(), 2);
        assert!(filter.has_pending(1));
        assert!(filter.has_pending(2));

        // Draining one channel must leave the other untouched.
        let drained = filter.drain(1, 10);
        assert_eq!(drained.len(), 1);
        assert!(!filter.has_pending(1));
        assert!(filter.has_pending(2));
    }

    #[test]
    fn test_queue_filter_clear_channel() {
        let filter = backpressured(QueueFilterConfig::default(), 1);
        send(&filter, make_envelope(1));

        filter.clear_channel(1);
        assert!(!filter.has_pending(1));
        assert_eq!(filter.total_pending(), 0);
    }

    #[test]
    fn test_queue_filter_pending_by_priority() {
        let filter = backpressured(QueueFilterConfig::default(), 1);

        send(&filter, make_busmon_envelope(1));
        send(&filter, make_envelope(1));
        send(&filter, make_confirmation_envelope(1));

        assert_eq!(filter.pending_by_priority(1), [1, 1, 1]);
    }

    #[test]
    fn test_queue_filter_config_validate() {
        assert!(QueueFilterConfig::default().validate().is_ok());

        let bad_config = QueueFilterConfig {
            max_queue_depth: 0,
            ..QueueFilterConfig::default()
        };
        assert!(bad_config.validate().is_err());
    }

    #[test]
    fn test_queue_filter_recv_passthrough() {
        let filter = QueueFilter::new(QueueFilterConfig::default());

        let envelope = make_envelope(1);
        assert!(filter.process_recv(&envelope).should_continue());
    }

    #[test]
    fn test_queue_filter_debug() {
        let filter = QueueFilter::new(QueueFilterConfig::default());

        let debug_str = format!("{:?}", filter);
        assert!(debug_str.contains("QueueFilter"));
        assert!(debug_str.contains("enabled"));
    }

    #[test]
    fn test_queue_filter_stats() {
        let filter = QueueFilter::new(QueueFilterConfig::default());

        // Without backpressure the frame passes straight through...
        send(&filter, make_envelope(1));

        // ...and with backpressure the next one is queued.
        filter.set_waiting_for_ack(1, true);
        send(&filter, make_envelope(1));

        let stats = filter.stats_snapshot();
        assert_eq!(stats.direct_pass, 1);
        assert_eq!(stats.queued_frames, 1);
    }

    #[test]
    fn test_max_total_frames_limit() {
        let filter = backpressured(
            QueueFilterConfig {
                max_total_frames: 2,
                ..QueueFilterConfig::default()
            },
            1,
        );

        for _ in 0..2 {
            let result = send(&filter, make_envelope(1));
            assert!(matches!(result, FilterResult::Queued));
        }

        // The global limit is reached; the next frame is rejected.
        let result = send(&filter, make_envelope(1));
        assert!(matches!(result, FilterResult::Dropped { .. }));
    }
}