1use std::collections::{HashMap, VecDeque};
26use std::fmt;
27use std::sync::atomic::{AtomicU64, Ordering};
28
29use parking_lot::RwLock;
30use serde::{Deserialize, Serialize};
31use tracing::{debug, trace};
32
33use crate::cemi::MessageCode;
34
35use super::chain::{FilterResult, FrameEnvelope};
36
/// Scheduling priority assigned to a frame while it waits in a channel queue.
///
/// The explicit discriminants are used as indices into the per-channel queue
/// array (`priority as usize`), and the derived ordering follows them, so
/// `Low < Normal < High`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum QueuePriority {
    /// Lowest priority: first candidate for eviction, drained last.
    Low = 0,
    /// Default priority: evicted only when no `Low` frame is queued.
    Normal = 1,
    /// Highest priority: drained first and never chosen for eviction.
    High = 2,
}
54
55impl QueuePriority {
56 pub fn classify(message_code: MessageCode) -> Self {
58 match message_code {
59 MessageCode::LDataCon
61 | MessageCode::LDataReq | MessageCode::MResetInd
63 | MessageCode::MResetReq => Self::High,
64
65 MessageCode::LDataInd
67 | MessageCode::MPropReadReq
68 | MessageCode::MPropWriteReq
69 | MessageCode::MPropReadCon
70 | MessageCode::MPropWriteCon => Self::Normal,
71
72 MessageCode::LBusmonInd
74 | MessageCode::LRawReq
75 | MessageCode::LRawCon
76 | MessageCode::LRawInd => Self::Low,
77 }
78 }
79
80 pub fn all_descending() -> &'static [QueuePriority] {
82 &[Self::High, Self::Normal, Self::Low]
83 }
84}
85
86impl fmt::Display for QueuePriority {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 match self {
89 Self::Low => write!(f, "Low"),
90 Self::Normal => write!(f, "Normal"),
91 Self::High => write!(f, "High"),
92 }
93 }
94}
95
/// Queue state tracked for a single channel.
#[derive(Debug)]
struct ChannelQueueState {
    /// `true` while the channel is waiting for an ACK; outgoing frames are
    /// queued instead of passed through while this is set.
    waiting_for_ack: bool,
    /// One FIFO queue per priority, indexed by `QueuePriority as usize`
    /// (0 = Low, 1 = Normal, 2 = High).
    queues: [VecDeque<FrameEnvelope>; 3],
}
108
109impl ChannelQueueState {
110 fn new() -> Self {
111 Self {
112 waiting_for_ack: false,
113 queues: [
114 VecDeque::new(), VecDeque::new(), VecDeque::new(), ],
118 }
119 }
120
121 fn queue_mut(&mut self, priority: QueuePriority) -> &mut VecDeque<FrameEnvelope> {
123 &mut self.queues[priority as usize]
124 }
125
126 fn total_queued(&self) -> usize {
128 self.queues.iter().map(|q| q.len()).sum()
129 }
130
131 fn drain(&mut self, max_count: usize) -> Vec<FrameEnvelope> {
133 let mut result = Vec::with_capacity(max_count);
134
135 for priority in QueuePriority::all_descending() {
136 let queue = &mut self.queues[*priority as usize];
137 while result.len() < max_count {
138 match queue.pop_front() {
139 Some(envelope) => result.push(envelope),
140 None => break,
141 }
142 }
143 if result.len() >= max_count {
144 break;
145 }
146 }
147
148 result
149 }
150
151 fn evict_lowest(&mut self) -> Option<QueuePriority> {
154 for &priority in &[QueuePriority::Low, QueuePriority::Normal] {
156 let queue = &mut self.queues[priority as usize];
157 if queue.pop_front().is_some() {
158 return Some(priority);
159 }
160 }
161 None
162 }
163}
164
/// Configuration for [`QueueFilter`].
///
/// Every field has a serde default, so a partial (or empty) configuration
/// deserializes to the same values as [`Default::default`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueFilterConfig {
    /// Master switch. When `false`, `process_send` passes every frame through
    /// and `drain` returns nothing. Defaults to `true`.
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Maximum number of frames in a single priority queue of a channel before
    /// a new frame of that priority triggers eviction or is dropped.
    /// Must be greater than zero (see `validate`). Defaults to 64.
    #[serde(default = "default_max_queue_depth")]
    pub max_queue_depth: usize,

    /// When `true`, frames sent on a channel that is waiting for an ACK are
    /// queued instead of passed through. Defaults to `true`.
    #[serde(default = "default_true")]
    pub backpressure_enabled: bool,

    /// When `true`, a full queue makes room by evicting the oldest `Low` (then
    /// `Normal`) frame instead of dropping the incoming frame outright.
    /// Defaults to `true`.
    #[serde(default = "default_true")]
    pub priority_eviction_enabled: bool,

    /// Upper bound on the number of frames queued across all channels.
    /// A value of `0` disables this check. Defaults to 1024.
    #[serde(default = "default_max_total_frames")]
    pub max_total_frames: usize,
}
195
/// Serde default for boolean options that are switched on unless configured otherwise.
fn default_true() -> bool {
    const ON_BY_DEFAULT: bool = true;
    ON_BY_DEFAULT
}
199
/// Serde default for `max_queue_depth`: 64 frames per priority queue.
fn default_max_queue_depth() -> usize {
    const DEFAULT_QUEUE_DEPTH: usize = 64;
    DEFAULT_QUEUE_DEPTH
}
203
/// Serde default for `max_total_frames`: 1024 frames across all channels.
fn default_max_total_frames() -> usize {
    const DEFAULT_TOTAL_FRAMES: usize = 1024;
    DEFAULT_TOTAL_FRAMES
}
207
208impl Default for QueueFilterConfig {
209 fn default() -> Self {
210 Self {
211 enabled: true,
212 max_queue_depth: default_max_queue_depth(),
213 backpressure_enabled: true,
214 priority_eviction_enabled: true,
215 max_total_frames: default_max_total_frames(),
216 }
217 }
218}
219
220impl QueueFilterConfig {
221 pub fn validate(&self) -> Result<(), String> {
223 if self.max_queue_depth == 0 {
224 return Err("QueueFilter max_queue_depth must be > 0".to_string());
225 }
226 Ok(())
227 }
228}
229
/// Live counters maintained by the queue filter.
///
/// All counters are atomics so they can be updated through a shared reference.
#[derive(Debug, Default)]
pub struct QueueFilterStats {
    /// Frames that `process_send` let through without queuing.
    pub direct_pass: AtomicU64,
    /// Frames appended to a queue.
    pub queued_frames: AtomicU64,
    /// Frames rejected because a queue or the global limit was full.
    pub dropped_full: AtomicU64,
    /// Queued frames discarded to make room for a newer frame.
    pub evicted_frames: AtomicU64,
    /// Frames handed back to the caller by `drain`.
    pub drained_frames: AtomicU64,
    /// Frames classified as `High` priority.
    pub high_priority: AtomicU64,
    /// Frames classified as `Normal` priority.
    pub normal_priority: AtomicU64,
    /// Frames classified as `Low` priority.
    pub low_priority: AtomicU64,
}
254
/// Plain-value copy of [`QueueFilterStats`] taken at one point in time.
///
/// Each field mirrors the counter of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueueFilterStatsSnapshot {
    /// Frames passed through without queuing.
    pub direct_pass: u64,
    /// Frames appended to a queue.
    pub queued_frames: u64,
    /// Frames rejected because a queue or the global limit was full.
    pub dropped_full: u64,
    /// Queued frames discarded to make room for a newer frame.
    pub evicted_frames: u64,
    /// Frames returned by `drain`.
    pub drained_frames: u64,
    /// Frames classified as `High` priority.
    pub high_priority: u64,
    /// Frames classified as `Normal` priority.
    pub normal_priority: u64,
    /// Frames classified as `Low` priority.
    pub low_priority: u64,
}
267
/// Send-side filter that holds frames back per channel while the channel is
/// waiting for an ACK, and releases them in priority order on `drain`.
pub struct QueueFilter {
    /// Settings fixed at construction time.
    config: QueueFilterConfig,
    /// Per-channel queue state keyed by channel id, guarded by a read/write lock.
    channels: RwLock<HashMap<u8, ChannelQueueState>>,
    /// Counters describing what the filter has done so far.
    stats: QueueFilterStats,
}
287
288impl QueueFilter {
289 pub fn new(config: QueueFilterConfig) -> Self {
291 Self {
292 config,
293 channels: RwLock::new(HashMap::new()),
294 stats: QueueFilterStats::default(),
295 }
296 }
297
298 pub fn process_send(&self, envelope: &mut FrameEnvelope) -> FilterResult {
304 if !self.config.enabled {
305 return FilterResult::pass();
306 }
307
308 let priority = QueuePriority::classify(envelope.cemi.message_code);
310 envelope.priority = priority;
311
312 match priority {
314 QueuePriority::High => self.stats.high_priority.fetch_add(1, Ordering::Relaxed),
315 QueuePriority::Normal => self.stats.normal_priority.fetch_add(1, Ordering::Relaxed),
316 QueuePriority::Low => self.stats.low_priority.fetch_add(1, Ordering::Relaxed),
317 };
318
319 if self.config.backpressure_enabled {
321 let mut channels = self.channels.write();
322 let total_pending: usize = channels.values().map(|cs| cs.total_queued()).sum();
323
324 let channel_state = channels
325 .entry(envelope.channel_id)
326 .or_insert_with(ChannelQueueState::new);
327
328 if channel_state.waiting_for_ack {
329 return self.enqueue_frame(channel_state, envelope.clone(), total_pending);
331 }
332 }
333
334 self.stats.direct_pass.fetch_add(1, Ordering::Relaxed);
335 FilterResult::pass()
336 }
337
338 pub fn process_recv(&self, _envelope: &FrameEnvelope) -> FilterResult {
343 FilterResult::pass()
344 }
345
346 fn enqueue_frame(
351 &self,
352 channel_state: &mut ChannelQueueState,
353 envelope: FrameEnvelope,
354 total_pending: usize,
355 ) -> FilterResult {
356 let priority = envelope.priority;
357 let queue = channel_state.queue_mut(priority);
358
359 if queue.len() >= self.config.max_queue_depth {
361 if self.config.priority_eviction_enabled {
362 if let Some(evicted_priority) = channel_state.evict_lowest() {
364 self.stats.evicted_frames.fetch_add(1, Ordering::Relaxed);
365 debug!(
366 channel_id = envelope.channel_id,
367 evicted_priority = %evicted_priority,
368 enqueue_priority = %priority,
369 "QueueFilter: evicted lower-priority frame to make room"
370 );
371 } else {
372 self.stats.dropped_full.fetch_add(1, Ordering::Relaxed);
374 return FilterResult::Dropped {
375 reason: format!(
376 "QueueFilter: {} queue full ({} frames), no lower priority to evict",
377 priority,
378 self.config.max_queue_depth,
379 ),
380 };
381 }
382 } else {
383 self.stats.dropped_full.fetch_add(1, Ordering::Relaxed);
385 return FilterResult::Dropped {
386 reason: format!(
387 "QueueFilter: {} queue full ({} frames)",
388 priority,
389 self.config.max_queue_depth,
390 ),
391 };
392 }
393 }
394
395 if self.config.max_total_frames > 0 && total_pending >= self.config.max_total_frames {
397 self.stats.dropped_full.fetch_add(1, Ordering::Relaxed);
398 return FilterResult::Dropped {
399 reason: format!(
400 "QueueFilter: total frame limit reached ({} >= {})",
401 total_pending, self.config.max_total_frames,
402 ),
403 };
404 }
405
406 let queue = channel_state.queue_mut(priority);
408 queue.push_back(envelope);
409 self.stats.queued_frames.fetch_add(1, Ordering::Relaxed);
410
411 trace!(
412 priority = %priority,
413 queue_depth = queue.len(),
414 "QueueFilter: frame queued"
415 );
416
417 FilterResult::Queued
418 }
419
420 pub fn set_waiting_for_ack(&self, channel_id: u8, waiting: bool) {
422 if !self.config.enabled || !self.config.backpressure_enabled {
423 return;
424 }
425
426 let mut channels = self.channels.write();
427 let channel_state = channels
428 .entry(channel_id)
429 .or_insert_with(ChannelQueueState::new);
430 channel_state.waiting_for_ack = waiting;
431
432 trace!(
433 channel_id,
434 waiting_for_ack = waiting,
435 "QueueFilter: backpressure state changed"
436 );
437 }
438
439 pub fn on_ack_received(&self, channel_id: u8) {
443 if !self.config.enabled {
444 return;
445 }
446
447 let mut channels = self.channels.write();
448 if let Some(channel_state) = channels.get_mut(&channel_id) {
449 channel_state.waiting_for_ack = false;
450 trace!(
451 channel_id,
452 pending = channel_state.total_queued(),
453 "QueueFilter: ACK received, backpressure released"
454 );
455 }
456 }
457
458 pub fn on_send_error(&self, channel_id: u8) {
462 trace!(channel_id, "QueueFilter: send error recorded");
463 }
464
465 pub fn has_pending(&self, channel_id: u8) -> bool {
467 let channels = self.channels.read();
468 channels
469 .get(&channel_id)
470 .map(|cs| cs.total_queued() > 0)
471 .unwrap_or(false)
472 }
473
474 pub fn total_pending(&self) -> usize {
476 let channels = self.channels.read();
477 channels.values().map(|cs| cs.total_queued()).sum()
478 }
479
480 pub fn drain(&self, channel_id: u8, max_count: usize) -> Vec<FrameEnvelope> {
484 if !self.config.enabled {
485 return Vec::new();
486 }
487
488 let mut channels = self.channels.write();
489 let result = match channels.get_mut(&channel_id) {
490 Some(channel_state) => {
491 let drained = channel_state.drain(max_count);
492 self.stats
493 .drained_frames
494 .fetch_add(drained.len() as u64, Ordering::Relaxed);
495
496 trace!(
497 channel_id,
498 drained_count = drained.len(),
499 remaining = channel_state.total_queued(),
500 "QueueFilter: frames drained"
501 );
502
503 drained
504 }
505 None => Vec::new(),
506 };
507
508 result
509 }
510
511 pub fn pending_by_priority(&self, channel_id: u8) -> [usize; 3] {
513 let channels = self.channels.read();
514 match channels.get(&channel_id) {
515 Some(cs) => [
516 cs.queues[0].len(), cs.queues[1].len(), cs.queues[2].len(), ],
520 None => [0, 0, 0],
521 }
522 }
523
524 pub fn clear_channel(&self, channel_id: u8) {
526 let mut channels = self.channels.write();
527 channels.remove(&channel_id);
528 }
529
530 pub fn stats_snapshot(&self) -> QueueFilterStatsSnapshot {
532 QueueFilterStatsSnapshot {
533 direct_pass: self.stats.direct_pass.load(Ordering::Relaxed),
534 queued_frames: self.stats.queued_frames.load(Ordering::Relaxed),
535 dropped_full: self.stats.dropped_full.load(Ordering::Relaxed),
536 evicted_frames: self.stats.evicted_frames.load(Ordering::Relaxed),
537 drained_frames: self.stats.drained_frames.load(Ordering::Relaxed),
538 high_priority: self.stats.high_priority.load(Ordering::Relaxed),
539 normal_priority: self.stats.normal_priority.load(Ordering::Relaxed),
540 low_priority: self.stats.low_priority.load(Ordering::Relaxed),
541 }
542 }
543}
544
545impl fmt::Debug for QueueFilter {
546 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
547 f.debug_struct("QueueFilter")
548 .field("enabled", &self.config.enabled)
549 .field("max_queue_depth", &self.config.max_queue_depth)
550 .field("backpressure_enabled", &self.config.backpressure_enabled)
551 .field("total_pending", &self.total_pending())
552 .finish()
553 }
554}
555
// Unit tests covering priority classification, backpressure queuing,
// eviction, the global frame limit and the statistics counters.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::address::{GroupAddress, IndividualAddress};
    use crate::cemi::CemiFrame;

    // Builds a group-value-write frame on the given channel. The tests below
    // expect this frame to be classified as Normal priority.
    fn make_envelope(channel_id: u8) -> FrameEnvelope {
        let cemi = CemiFrame::group_value_write(
            IndividualAddress::new(1, 1, 1),
            GroupAddress::three_level(1, 0, 1),
            vec![0x01],
        );
        FrameEnvelope::new(cemi, channel_id, "192.168.1.100:3671".parse().unwrap())
    }

    // Same frame as `make_envelope`, but with the message code forced to
    // `LDataCon`, which classifies as High priority.
    fn make_confirmation_envelope(channel_id: u8) -> FrameEnvelope {
        let mut cemi = CemiFrame::group_value_write(
            IndividualAddress::new(1, 1, 1),
            GroupAddress::three_level(1, 0, 1),
            vec![0x01],
        );
        cemi.message_code = MessageCode::LDataCon;
        FrameEnvelope::new(cemi, channel_id, "192.168.1.100:3671".parse().unwrap())
    }

    // Builds a bus-monitor indication frame. The tests below expect this
    // frame to be classified as Low priority.
    fn make_busmon_envelope(channel_id: u8) -> FrameEnvelope {
        let cemi = CemiFrame::bus_monitor_indication(&[0x11, 0x00, 0x00], 0x00, 0);
        FrameEnvelope::new(cemi, channel_id, "192.168.1.100:3671".parse().unwrap())
    }

    // One or two representative message codes per priority class.
    #[test]
    fn test_queue_priority_classify() {
        assert_eq!(
            QueuePriority::classify(MessageCode::LDataCon),
            QueuePriority::High
        );
        assert_eq!(
            QueuePriority::classify(MessageCode::MResetInd),
            QueuePriority::High
        );
        assert_eq!(
            QueuePriority::classify(MessageCode::LDataInd),
            QueuePriority::Normal
        );
        assert_eq!(
            QueuePriority::classify(MessageCode::MPropReadReq),
            QueuePriority::Normal
        );
        assert_eq!(
            QueuePriority::classify(MessageCode::LBusmonInd),
            QueuePriority::Low
        );
        assert_eq!(
            QueuePriority::classify(MessageCode::LRawReq),
            QueuePriority::Low
        );
    }

    // The derived ordering must follow the discriminants: High > Normal > Low.
    #[test]
    fn test_queue_priority_ordering() {
        assert!(QueuePriority::High > QueuePriority::Normal);
        assert!(QueuePriority::Normal > QueuePriority::Low);
    }

    #[test]
    fn test_queue_priority_display() {
        assert_eq!(QueuePriority::High.to_string(), "High");
        assert_eq!(QueuePriority::Normal.to_string(), "Normal");
        assert_eq!(QueuePriority::Low.to_string(), "Low");
    }

    // A disabled filter lets every frame through.
    #[test]
    fn test_queue_filter_disabled() {
        let mut config = QueueFilterConfig::default();
        config.enabled = false;
        let filter = QueueFilter::new(config);

        let mut envelope = make_envelope(1);
        let result = filter.process_send(&mut envelope);
        assert!(result.should_continue());
    }

    // Without a pending ACK the frame passes directly and nothing is queued.
    #[test]
    fn test_queue_filter_passthrough_no_backpressure() {
        let config = QueueFilterConfig::default();
        let filter = QueueFilter::new(config);

        let mut envelope = make_envelope(1);
        let result = filter.process_send(&mut envelope);
        assert!(result.should_continue());

        let stats = filter.stats_snapshot();
        assert_eq!(stats.direct_pass, 1);
        assert_eq!(stats.queued_frames, 0);
    }

    // While the channel waits for an ACK, sent frames are queued.
    #[test]
    fn test_queue_filter_backpressure_queues_frames() {
        let config = QueueFilterConfig::default();
        let filter = QueueFilter::new(config);

        // Put channel 1 into the waiting state.
        filter.set_waiting_for_ack(1, true);

        let mut envelope = make_envelope(1);
        let result = filter.process_send(&mut envelope);
        assert!(matches!(result, FilterResult::Queued));

        let stats = filter.stats_snapshot();
        assert_eq!(stats.queued_frames, 1);
        assert!(filter.has_pending(1));
        assert_eq!(filter.total_pending(), 1);
    }

    // After the ACK, all queued frames can be drained.
    #[test]
    fn test_queue_filter_backpressure_release() {
        let config = QueueFilterConfig::default();
        let filter = QueueFilter::new(config);

        filter.set_waiting_for_ack(1, true);

        // Queue three frames while waiting.
        for _ in 0..3 {
            let mut envelope = make_envelope(1);
            filter.process_send(&mut envelope);
        }
        assert_eq!(filter.total_pending(), 3);

        // The ACK clears the waiting flag but leaves the frames queued.
        filter.on_ack_received(1);

        // Draining with a limit above the queue size returns everything.
        let drained = filter.drain(1, 10);
        assert_eq!(drained.len(), 3);
        assert_eq!(filter.total_pending(), 0);
    }

    // Frames are drained highest priority first, regardless of send order.
    #[test]
    fn test_queue_filter_priority_ordering() {
        let config = QueueFilterConfig::default();
        let filter = QueueFilter::new(config);

        filter.set_waiting_for_ack(1, true);

        // Send in ascending priority order: Low, Normal, High.
        let mut low_env = make_busmon_envelope(1);
        filter.process_send(&mut low_env);

        let mut normal_env = make_envelope(1);
        filter.process_send(&mut normal_env);

        let mut high_env = make_confirmation_envelope(1);
        filter.process_send(&mut high_env);

        // Expect descending priority order on the way out.
        let drained = filter.drain(1, 10);
        assert_eq!(drained.len(), 3);
        assert_eq!(drained[0].priority, QueuePriority::High);
        assert_eq!(drained[1].priority, QueuePriority::Normal);
        assert_eq!(drained[2].priority, QueuePriority::Low);
    }

    // With eviction disabled, a frame arriving at a full queue is dropped.
    #[test]
    fn test_queue_filter_queue_full_drop() {
        let mut config = QueueFilterConfig::default();
        config.max_queue_depth = 2;
        config.priority_eviction_enabled = false;
        let filter = QueueFilter::new(config);

        filter.set_waiting_for_ack(1, true);

        // Fill the Normal queue up to its depth limit.
        for _ in 0..2 {
            let mut envelope = make_envelope(1);
            let result = filter.process_send(&mut envelope);
            assert!(matches!(result, FilterResult::Queued));
        }

        // The third frame does not fit and must be dropped.
        let mut envelope = make_envelope(1);
        let result = filter.process_send(&mut envelope);
        assert!(matches!(result, FilterResult::Dropped { .. }));

        let stats = filter.stats_snapshot();
        assert_eq!(stats.dropped_full, 1);
    }

    // With eviction enabled, a full Normal queue evicts a queued Low frame
    // and the new Normal frame is still accepted.
    #[test]
    fn test_queue_filter_priority_eviction() {
        let mut config = QueueFilterConfig::default();
        config.max_queue_depth = 1;
        config.priority_eviction_enabled = true;
        let filter = QueueFilter::new(config);

        filter.set_waiting_for_ack(1, true);

        // One Low frame fills the Low queue.
        let mut low_env = make_busmon_envelope(1);
        let result = filter.process_send(&mut low_env);
        assert!(matches!(result, FilterResult::Queued));

        // One Normal frame fills the Normal queue.
        let mut normal_env1 = make_envelope(1);
        let result = filter.process_send(&mut normal_env1);
        assert!(matches!(result, FilterResult::Queued));

        let counts = filter.pending_by_priority(1);
        assert_eq!(counts[0], 1); // Low
        assert_eq!(counts[1], 1); // Normal

        // A second Normal frame finds its queue full and triggers eviction.
        let mut normal_env2 = make_envelope(1);
        let result = filter.process_send(&mut normal_env2);
        assert!(matches!(result, FilterResult::Queued));

        // The Low frame was evicted; the Normal queue now holds both frames.
        let counts = filter.pending_by_priority(1);
        assert_eq!(counts[0], 0); // Low
        assert_eq!(counts[1], 2); // Normal

        let stats = filter.stats_snapshot();
        assert_eq!(stats.evicted_frames, 1);
    }

    // Channels are queued and drained independently of each other.
    #[test]
    fn test_queue_filter_multi_channel() {
        let config = QueueFilterConfig::default();
        let filter = QueueFilter::new(config);

        filter.set_waiting_for_ack(1, true);
        filter.set_waiting_for_ack(2, true);

        // One frame per channel.
        let mut env1 = make_envelope(1);
        filter.process_send(&mut env1);
        let mut env2 = make_envelope(2);
        filter.process_send(&mut env2);

        assert_eq!(filter.total_pending(), 2);
        assert!(filter.has_pending(1));
        assert!(filter.has_pending(2));

        // Draining channel 1 must leave channel 2 untouched.
        let drained = filter.drain(1, 10);
        assert_eq!(drained.len(), 1);
        assert!(!filter.has_pending(1));
        assert!(filter.has_pending(2));
    }

    // Clearing a channel discards its queued frames.
    #[test]
    fn test_queue_filter_clear_channel() {
        let config = QueueFilterConfig::default();
        let filter = QueueFilter::new(config);

        filter.set_waiting_for_ack(1, true);
        let mut env = make_envelope(1);
        filter.process_send(&mut env);

        filter.clear_channel(1);
        assert!(!filter.has_pending(1));
        assert_eq!(filter.total_pending(), 0);
    }

    // `pending_by_priority` reports the queue lengths as [low, normal, high].
    #[test]
    fn test_queue_filter_pending_by_priority() {
        let config = QueueFilterConfig::default();
        let filter = QueueFilter::new(config);

        filter.set_waiting_for_ack(1, true);

        let mut env1 = make_busmon_envelope(1); // Low
        let mut env2 = make_envelope(1); // Normal
        let mut env3 = make_confirmation_envelope(1); // High
        filter.process_send(&mut env1);
        filter.process_send(&mut env2);
        filter.process_send(&mut env3);

        let counts = filter.pending_by_priority(1);
        assert_eq!(counts[0], 1); // Low
        assert_eq!(counts[1], 1); // Normal
        assert_eq!(counts[2], 1); // High
    }

    // The default configuration is valid; a zero queue depth is rejected.
    #[test]
    fn test_queue_filter_config_validate() {
        let config = QueueFilterConfig::default();
        assert!(config.validate().is_ok());

        let mut bad_config = QueueFilterConfig::default();
        bad_config.max_queue_depth = 0;
        assert!(bad_config.validate().is_err());
    }

    // The receive path never holds frames back.
    #[test]
    fn test_queue_filter_recv_passthrough() {
        let config = QueueFilterConfig::default();
        let filter = QueueFilter::new(config);

        let envelope = make_envelope(1);
        let result = filter.process_recv(&envelope);
        assert!(result.should_continue());
    }

    // The hand-written Debug impl names the type and its `enabled` field.
    #[test]
    fn test_queue_filter_debug() {
        let config = QueueFilterConfig::default();
        let filter = QueueFilter::new(config);
        let debug_str = format!("{:?}", filter);
        assert!(debug_str.contains("QueueFilter"));
        assert!(debug_str.contains("enabled"));
    }

    // One direct pass followed by one queued frame shows up in the counters.
    #[test]
    fn test_queue_filter_stats() {
        let config = QueueFilterConfig::default();
        let filter = QueueFilter::new(config);

        // Not waiting yet: the frame passes directly.
        let mut env = make_envelope(1);
        filter.process_send(&mut env);

        // Waiting: the next frame is queued.
        filter.set_waiting_for_ack(1, true);
        let mut env2 = make_envelope(1);
        filter.process_send(&mut env2);

        let stats = filter.stats_snapshot();
        assert_eq!(stats.direct_pass, 1);
        assert_eq!(stats.queued_frames, 1);
    }

    // The global frame limit rejects frames even when the queue has room.
    #[test]
    fn test_max_total_frames_limit() {
        let mut config = QueueFilterConfig::default();
        config.max_total_frames = 2;
        let filter = QueueFilter::new(config);

        filter.set_waiting_for_ack(1, true);

        // Two frames reach the global limit.
        for _ in 0..2 {
            let mut env = make_envelope(1);
            let result = filter.process_send(&mut env);
            assert!(matches!(result, FilterResult::Queued));
        }

        // The third frame exceeds it and is dropped.
        let mut env = make_envelope(1);
        let result = filter.process_send(&mut env);
        assert!(matches!(result, FilterResult::Dropped { .. }));
    }
}