use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tracing::{debug, trace};
use crate::cemi::MessageCode;
use super::chain::{FilterResult, FrameEnvelope};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum QueuePriority {
Low = 0,
Normal = 1,
High = 2,
}
impl QueuePriority {
pub fn classify(message_code: MessageCode) -> Self {
match message_code {
MessageCode::LDataCon
| MessageCode::LDataReq | MessageCode::MResetInd
| MessageCode::MResetReq => Self::High,
MessageCode::LDataInd
| MessageCode::MPropReadReq
| MessageCode::MPropWriteReq
| MessageCode::MPropReadCon
| MessageCode::MPropWriteCon => Self::Normal,
MessageCode::LBusmonInd
| MessageCode::LRawReq
| MessageCode::LRawCon
| MessageCode::LRawInd => Self::Low,
}
}
pub fn all_descending() -> &'static [QueuePriority] {
&[Self::High, Self::Normal, Self::Low]
}
}
impl fmt::Display for QueuePriority {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Low => write!(f, "Low"),
Self::Normal => write!(f, "Normal"),
Self::High => write!(f, "High"),
}
}
}
#[derive(Debug)]
struct ChannelQueueState {
waiting_for_ack: bool,
queues: [VecDeque<FrameEnvelope>; 3],
}
impl ChannelQueueState {
fn new() -> Self {
Self {
waiting_for_ack: false,
queues: [
VecDeque::new(), VecDeque::new(), VecDeque::new(), ],
}
}
fn queue_mut(&mut self, priority: QueuePriority) -> &mut VecDeque<FrameEnvelope> {
&mut self.queues[priority as usize]
}
fn total_queued(&self) -> usize {
self.queues.iter().map(|q| q.len()).sum()
}
fn drain(&mut self, max_count: usize) -> Vec<FrameEnvelope> {
let mut result = Vec::with_capacity(max_count);
for priority in QueuePriority::all_descending() {
let queue = &mut self.queues[*priority as usize];
while result.len() < max_count {
match queue.pop_front() {
Some(envelope) => result.push(envelope),
None => break,
}
}
if result.len() >= max_count {
break;
}
}
result
}
fn evict_lowest(&mut self) -> Option<QueuePriority> {
for &priority in &[QueuePriority::Low, QueuePriority::Normal] {
let queue = &mut self.queues[priority as usize];
if queue.pop_front().is_some() {
return Some(priority);
}
}
None
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueFilterConfig {
#[serde(default = "default_true")]
pub enabled: bool,
#[serde(default = "default_max_queue_depth")]
pub max_queue_depth: usize,
#[serde(default = "default_true")]
pub backpressure_enabled: bool,
#[serde(default = "default_true")]
pub priority_eviction_enabled: bool,
#[serde(default = "default_max_total_frames")]
pub max_total_frames: usize,
}
fn default_true() -> bool {
true
}
fn default_max_queue_depth() -> usize {
64
}
fn default_max_total_frames() -> usize {
1024
}
impl Default for QueueFilterConfig {
fn default() -> Self {
Self {
enabled: true,
max_queue_depth: default_max_queue_depth(),
backpressure_enabled: true,
priority_eviction_enabled: true,
max_total_frames: default_max_total_frames(),
}
}
}
impl QueueFilterConfig {
pub fn validate(&self) -> Result<(), String> {
if self.max_queue_depth == 0 {
return Err("QueueFilter max_queue_depth must be > 0".to_string());
}
Ok(())
}
}
#[derive(Debug, Default)]
pub struct QueueFilterStats {
pub direct_pass: AtomicU64,
pub queued_frames: AtomicU64,
pub dropped_full: AtomicU64,
pub evicted_frames: AtomicU64,
pub drained_frames: AtomicU64,
pub high_priority: AtomicU64,
pub normal_priority: AtomicU64,
pub low_priority: AtomicU64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueueFilterStatsSnapshot {
pub direct_pass: u64,
pub queued_frames: u64,
pub dropped_full: u64,
pub evicted_frames: u64,
pub drained_frames: u64,
pub high_priority: u64,
pub normal_priority: u64,
pub low_priority: u64,
}
pub struct QueueFilter {
config: QueueFilterConfig,
channels: RwLock<HashMap<u8, ChannelQueueState>>,
stats: QueueFilterStats,
}
impl QueueFilter {
pub fn new(config: QueueFilterConfig) -> Self {
Self {
config,
channels: RwLock::new(HashMap::new()),
stats: QueueFilterStats::default(),
}
}
pub fn process_send(&self, envelope: &mut FrameEnvelope) -> FilterResult {
if !self.config.enabled {
return FilterResult::pass();
}
let priority = QueuePriority::classify(envelope.cemi.message_code);
envelope.priority = priority;
match priority {
QueuePriority::High => self.stats.high_priority.fetch_add(1, Ordering::Relaxed),
QueuePriority::Normal => self.stats.normal_priority.fetch_add(1, Ordering::Relaxed),
QueuePriority::Low => self.stats.low_priority.fetch_add(1, Ordering::Relaxed),
};
if self.config.backpressure_enabled {
let mut channels = self.channels.write();
let total_pending: usize = channels.values().map(|cs| cs.total_queued()).sum();
let channel_state = channels
.entry(envelope.channel_id)
.or_insert_with(ChannelQueueState::new);
if channel_state.waiting_for_ack {
return self.enqueue_frame(channel_state, envelope.clone(), total_pending);
}
}
self.stats.direct_pass.fetch_add(1, Ordering::Relaxed);
FilterResult::pass()
}
pub fn process_recv(&self, _envelope: &FrameEnvelope) -> FilterResult {
FilterResult::pass()
}
fn enqueue_frame(
&self,
channel_state: &mut ChannelQueueState,
envelope: FrameEnvelope,
total_pending: usize,
) -> FilterResult {
let priority = envelope.priority;
let queue = channel_state.queue_mut(priority);
if queue.len() >= self.config.max_queue_depth {
if self.config.priority_eviction_enabled {
if let Some(evicted_priority) = channel_state.evict_lowest() {
self.stats.evicted_frames.fetch_add(1, Ordering::Relaxed);
debug!(
channel_id = envelope.channel_id,
evicted_priority = %evicted_priority,
enqueue_priority = %priority,
"QueueFilter: evicted lower-priority frame to make room"
);
} else {
self.stats.dropped_full.fetch_add(1, Ordering::Relaxed);
return FilterResult::Dropped {
reason: format!(
"QueueFilter: {} queue full ({} frames), no lower priority to evict",
priority, self.config.max_queue_depth,
),
};
}
} else {
self.stats.dropped_full.fetch_add(1, Ordering::Relaxed);
return FilterResult::Dropped {
reason: format!(
"QueueFilter: {} queue full ({} frames)",
priority, self.config.max_queue_depth,
),
};
}
}
if self.config.max_total_frames > 0 && total_pending >= self.config.max_total_frames {
self.stats.dropped_full.fetch_add(1, Ordering::Relaxed);
return FilterResult::Dropped {
reason: format!(
"QueueFilter: total frame limit reached ({} >= {})",
total_pending, self.config.max_total_frames,
),
};
}
let queue = channel_state.queue_mut(priority);
queue.push_back(envelope);
self.stats.queued_frames.fetch_add(1, Ordering::Relaxed);
trace!(
priority = %priority,
queue_depth = queue.len(),
"QueueFilter: frame queued"
);
FilterResult::Queued
}
pub fn set_waiting_for_ack(&self, channel_id: u8, waiting: bool) {
if !self.config.enabled || !self.config.backpressure_enabled {
return;
}
let mut channels = self.channels.write();
let channel_state = channels
.entry(channel_id)
.or_insert_with(ChannelQueueState::new);
channel_state.waiting_for_ack = waiting;
trace!(
channel_id,
waiting_for_ack = waiting,
"QueueFilter: backpressure state changed"
);
}
pub fn on_ack_received(&self, channel_id: u8) {
if !self.config.enabled {
return;
}
let mut channels = self.channels.write();
if let Some(channel_state) = channels.get_mut(&channel_id) {
channel_state.waiting_for_ack = false;
trace!(
channel_id,
pending = channel_state.total_queued(),
"QueueFilter: ACK received, backpressure released"
);
}
}
pub fn on_send_error(&self, channel_id: u8) {
trace!(channel_id, "QueueFilter: send error recorded");
}
pub fn has_pending(&self, channel_id: u8) -> bool {
let channels = self.channels.read();
channels
.get(&channel_id)
.map(|cs| cs.total_queued() > 0)
.unwrap_or(false)
}
pub fn total_pending(&self) -> usize {
let channels = self.channels.read();
channels.values().map(|cs| cs.total_queued()).sum()
}
pub fn drain(&self, channel_id: u8, max_count: usize) -> Vec<FrameEnvelope> {
if !self.config.enabled {
return Vec::new();
}
let mut channels = self.channels.write();
let result = match channels.get_mut(&channel_id) {
Some(channel_state) => {
let drained = channel_state.drain(max_count);
self.stats
.drained_frames
.fetch_add(drained.len() as u64, Ordering::Relaxed);
trace!(
channel_id,
drained_count = drained.len(),
remaining = channel_state.total_queued(),
"QueueFilter: frames drained"
);
drained
}
None => Vec::new(),
};
result
}
pub fn pending_by_priority(&self, channel_id: u8) -> [usize; 3] {
let channels = self.channels.read();
match channels.get(&channel_id) {
Some(cs) => [
cs.queues[0].len(), cs.queues[1].len(), cs.queues[2].len(), ],
None => [0, 0, 0],
}
}
pub fn clear_channel(&self, channel_id: u8) {
let mut channels = self.channels.write();
channels.remove(&channel_id);
}
pub fn stats_snapshot(&self) -> QueueFilterStatsSnapshot {
QueueFilterStatsSnapshot {
direct_pass: self.stats.direct_pass.load(Ordering::Relaxed),
queued_frames: self.stats.queued_frames.load(Ordering::Relaxed),
dropped_full: self.stats.dropped_full.load(Ordering::Relaxed),
evicted_frames: self.stats.evicted_frames.load(Ordering::Relaxed),
drained_frames: self.stats.drained_frames.load(Ordering::Relaxed),
high_priority: self.stats.high_priority.load(Ordering::Relaxed),
normal_priority: self.stats.normal_priority.load(Ordering::Relaxed),
low_priority: self.stats.low_priority.load(Ordering::Relaxed),
}
}
}
impl fmt::Debug for QueueFilter {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("QueueFilter")
.field("enabled", &self.config.enabled)
.field("max_queue_depth", &self.config.max_queue_depth)
.field("backpressure_enabled", &self.config.backpressure_enabled)
.field("total_pending", &self.total_pending())
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::address::{GroupAddress, IndividualAddress};
use crate::cemi::CemiFrame;
fn make_envelope(channel_id: u8) -> FrameEnvelope {
let cemi = CemiFrame::group_value_write(
IndividualAddress::new(1, 1, 1),
GroupAddress::three_level(1, 0, 1),
vec![0x01],
);
FrameEnvelope::new(cemi, channel_id, "192.168.1.100:3671".parse().unwrap())
}
fn make_confirmation_envelope(channel_id: u8) -> FrameEnvelope {
let mut cemi = CemiFrame::group_value_write(
IndividualAddress::new(1, 1, 1),
GroupAddress::three_level(1, 0, 1),
vec![0x01],
);
cemi.message_code = MessageCode::LDataCon;
FrameEnvelope::new(cemi, channel_id, "192.168.1.100:3671".parse().unwrap())
}
fn make_busmon_envelope(channel_id: u8) -> FrameEnvelope {
let cemi = CemiFrame::bus_monitor_indication(&[0x11, 0x00, 0x00], 0x00, 0);
FrameEnvelope::new(cemi, channel_id, "192.168.1.100:3671".parse().unwrap())
}
#[test]
fn test_queue_priority_classify() {
assert_eq!(
QueuePriority::classify(MessageCode::LDataCon),
QueuePriority::High
);
assert_eq!(
QueuePriority::classify(MessageCode::MResetInd),
QueuePriority::High
);
assert_eq!(
QueuePriority::classify(MessageCode::LDataInd),
QueuePriority::Normal
);
assert_eq!(
QueuePriority::classify(MessageCode::MPropReadReq),
QueuePriority::Normal
);
assert_eq!(
QueuePriority::classify(MessageCode::LBusmonInd),
QueuePriority::Low
);
assert_eq!(
QueuePriority::classify(MessageCode::LRawReq),
QueuePriority::Low
);
}
#[test]
fn test_queue_priority_ordering() {
assert!(QueuePriority::High > QueuePriority::Normal);
assert!(QueuePriority::Normal > QueuePriority::Low);
}
#[test]
fn test_queue_priority_display() {
assert_eq!(QueuePriority::High.to_string(), "High");
assert_eq!(QueuePriority::Normal.to_string(), "Normal");
assert_eq!(QueuePriority::Low.to_string(), "Low");
}
#[test]
fn test_queue_filter_disabled() {
let mut config = QueueFilterConfig::default();
config.enabled = false;
let filter = QueueFilter::new(config);
let mut envelope = make_envelope(1);
let result = filter.process_send(&mut envelope);
assert!(result.should_continue());
}
#[test]
fn test_queue_filter_passthrough_no_backpressure() {
let config = QueueFilterConfig::default();
let filter = QueueFilter::new(config);
let mut envelope = make_envelope(1);
let result = filter.process_send(&mut envelope);
assert!(result.should_continue());
let stats = filter.stats_snapshot();
assert_eq!(stats.direct_pass, 1);
assert_eq!(stats.queued_frames, 0);
}
#[test]
fn test_queue_filter_backpressure_queues_frames() {
let config = QueueFilterConfig::default();
let filter = QueueFilter::new(config);
filter.set_waiting_for_ack(1, true);
let mut envelope = make_envelope(1);
let result = filter.process_send(&mut envelope);
assert!(matches!(result, FilterResult::Queued));
let stats = filter.stats_snapshot();
assert_eq!(stats.queued_frames, 1);
assert!(filter.has_pending(1));
assert_eq!(filter.total_pending(), 1);
}
#[test]
fn test_queue_filter_backpressure_release() {
let config = QueueFilterConfig::default();
let filter = QueueFilter::new(config);
filter.set_waiting_for_ack(1, true);
for _ in 0..3 {
let mut envelope = make_envelope(1);
filter.process_send(&mut envelope);
}
assert_eq!(filter.total_pending(), 3);
filter.on_ack_received(1);
let drained = filter.drain(1, 10);
assert_eq!(drained.len(), 3);
assert_eq!(filter.total_pending(), 0);
}
#[test]
fn test_queue_filter_priority_ordering() {
let config = QueueFilterConfig::default();
let filter = QueueFilter::new(config);
filter.set_waiting_for_ack(1, true);
let mut low_env = make_busmon_envelope(1);
filter.process_send(&mut low_env);
let mut normal_env = make_envelope(1);
filter.process_send(&mut normal_env);
let mut high_env = make_confirmation_envelope(1);
filter.process_send(&mut high_env);
let drained = filter.drain(1, 10);
assert_eq!(drained.len(), 3);
assert_eq!(drained[0].priority, QueuePriority::High);
assert_eq!(drained[1].priority, QueuePriority::Normal);
assert_eq!(drained[2].priority, QueuePriority::Low);
}
#[test]
fn test_queue_filter_queue_full_drop() {
let mut config = QueueFilterConfig::default();
config.max_queue_depth = 2;
config.priority_eviction_enabled = false; let filter = QueueFilter::new(config);
filter.set_waiting_for_ack(1, true);
for _ in 0..2 {
let mut envelope = make_envelope(1);
let result = filter.process_send(&mut envelope);
assert!(matches!(result, FilterResult::Queued));
}
let mut envelope = make_envelope(1);
let result = filter.process_send(&mut envelope);
assert!(matches!(result, FilterResult::Dropped { .. }));
let stats = filter.stats_snapshot();
assert_eq!(stats.dropped_full, 1);
}
#[test]
fn test_queue_filter_priority_eviction() {
let mut config = QueueFilterConfig::default();
config.max_queue_depth = 1;
config.priority_eviction_enabled = true;
let filter = QueueFilter::new(config);
filter.set_waiting_for_ack(1, true);
let mut low_env = make_busmon_envelope(1);
let result = filter.process_send(&mut low_env);
assert!(matches!(result, FilterResult::Queued));
let mut normal_env1 = make_envelope(1);
let result = filter.process_send(&mut normal_env1);
assert!(matches!(result, FilterResult::Queued));
let counts = filter.pending_by_priority(1);
assert_eq!(counts[0], 1); assert_eq!(counts[1], 1);
let mut normal_env2 = make_envelope(1);
let result = filter.process_send(&mut normal_env2);
assert!(matches!(result, FilterResult::Queued));
let counts = filter.pending_by_priority(1);
assert_eq!(counts[0], 0); assert_eq!(counts[1], 2);
let stats = filter.stats_snapshot();
assert_eq!(stats.evicted_frames, 1);
}
#[test]
fn test_queue_filter_multi_channel() {
let config = QueueFilterConfig::default();
let filter = QueueFilter::new(config);
filter.set_waiting_for_ack(1, true);
filter.set_waiting_for_ack(2, true);
let mut env1 = make_envelope(1);
filter.process_send(&mut env1);
let mut env2 = make_envelope(2);
filter.process_send(&mut env2);
assert_eq!(filter.total_pending(), 2);
assert!(filter.has_pending(1));
assert!(filter.has_pending(2));
let drained = filter.drain(1, 10);
assert_eq!(drained.len(), 1);
assert!(!filter.has_pending(1));
assert!(filter.has_pending(2));
}
#[test]
fn test_queue_filter_clear_channel() {
let config = QueueFilterConfig::default();
let filter = QueueFilter::new(config);
filter.set_waiting_for_ack(1, true);
let mut env = make_envelope(1);
filter.process_send(&mut env);
filter.clear_channel(1);
assert!(!filter.has_pending(1));
assert_eq!(filter.total_pending(), 0);
}
#[test]
fn test_queue_filter_pending_by_priority() {
let config = QueueFilterConfig::default();
let filter = QueueFilter::new(config);
filter.set_waiting_for_ack(1, true);
let mut env1 = make_busmon_envelope(1); let mut env2 = make_envelope(1); let mut env3 = make_confirmation_envelope(1);
filter.process_send(&mut env1);
filter.process_send(&mut env2);
filter.process_send(&mut env3);
let counts = filter.pending_by_priority(1);
assert_eq!(counts[0], 1); assert_eq!(counts[1], 1); assert_eq!(counts[2], 1); }
#[test]
fn test_queue_filter_config_validate() {
let config = QueueFilterConfig::default();
assert!(config.validate().is_ok());
let mut bad_config = QueueFilterConfig::default();
bad_config.max_queue_depth = 0;
assert!(bad_config.validate().is_err());
}
#[test]
fn test_queue_filter_recv_passthrough() {
let config = QueueFilterConfig::default();
let filter = QueueFilter::new(config);
let envelope = make_envelope(1);
let result = filter.process_recv(&envelope);
assert!(result.should_continue());
}
#[test]
fn test_queue_filter_debug() {
let config = QueueFilterConfig::default();
let filter = QueueFilter::new(config);
let debug_str = format!("{:?}", filter);
assert!(debug_str.contains("QueueFilter"));
assert!(debug_str.contains("enabled"));
}
#[test]
fn test_queue_filter_stats() {
let config = QueueFilterConfig::default();
let filter = QueueFilter::new(config);
let mut env = make_envelope(1);
filter.process_send(&mut env);
filter.set_waiting_for_ack(1, true);
let mut env2 = make_envelope(1);
filter.process_send(&mut env2);
let stats = filter.stats_snapshot();
assert_eq!(stats.direct_pass, 1);
assert_eq!(stats.queued_frames, 1);
}
#[test]
fn test_max_total_frames_limit() {
let mut config = QueueFilterConfig::default();
config.max_total_frames = 2;
let filter = QueueFilter::new(config);
filter.set_waiting_for_ack(1, true);
for _ in 0..2 {
let mut env = make_envelope(1);
let result = filter.process_send(&mut env);
assert!(matches!(result, FilterResult::Queued));
}
let mut env = make_envelope(1);
let result = filter.process_send(&mut env);
assert!(matches!(result, FilterResult::Dropped { .. }));
}
}