use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::error::QueueError;
use crate::message::CanMessage;
use super::QueueOverflowPolicy;
pub const DEFAULT_QUEUE_CAPACITY: usize = 1000;
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
pub enqueued: u64,
pub dequeued: u64,
pub dropped: u64,
pub overflow_count: u64,
}
struct AtomicQueueStats {
enqueued: AtomicU64,
dequeued: AtomicU64,
dropped: AtomicU64,
overflow_count: AtomicU64,
}
impl AtomicQueueStats {
fn new() -> Self {
Self {
enqueued: AtomicU64::new(0),
dequeued: AtomicU64::new(0),
dropped: AtomicU64::new(0),
overflow_count: AtomicU64::new(0),
}
}
fn snapshot(&self) -> QueueStats {
QueueStats {
enqueued: self.enqueued.load(Ordering::Relaxed),
dequeued: self.dequeued.load(Ordering::Relaxed),
dropped: self.dropped.load(Ordering::Relaxed),
overflow_count: self.overflow_count.load(Ordering::Relaxed),
}
}
fn inc_enqueued(&self) {
self.enqueued.fetch_add(1, Ordering::Relaxed);
}
fn inc_dequeued(&self) {
self.dequeued.fetch_add(1, Ordering::Relaxed);
}
fn inc_dropped(&self) {
self.dropped.fetch_add(1, Ordering::Relaxed);
}
fn inc_overflow(&self) {
self.overflow_count.fetch_add(1, Ordering::Relaxed);
}
}
pub struct BoundedQueue {
buffer: VecDeque<CanMessage>,
capacity: usize,
policy: QueueOverflowPolicy,
stats: AtomicQueueStats,
}
impl BoundedQueue {
#[must_use]
pub fn new(capacity: usize) -> Self {
Self::with_policy(capacity, QueueOverflowPolicy::default())
}
#[must_use]
pub fn with_policy(capacity: usize, policy: QueueOverflowPolicy) -> Self {
Self {
buffer: VecDeque::with_capacity(capacity),
capacity,
policy,
stats: AtomicQueueStats::new(),
}
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn len(&self) -> usize {
self.buffer.len()
}
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
pub fn is_full(&self) -> bool {
self.buffer.len() >= self.capacity
}
pub fn policy(&self) -> QueueOverflowPolicy {
self.policy
}
pub fn stats(&self) -> QueueStats {
self.stats.snapshot()
}
pub fn push(&mut self, message: CanMessage) -> Result<(), QueueError> {
if self.is_full() {
self.stats.inc_overflow();
match self.policy {
QueueOverflowPolicy::DropOldest => {
#[allow(unused_variables)]
if let Some(dropped) = self.buffer.pop_front() {
self.stats.inc_dropped();
#[cfg(feature = "tracing")]
crate::log_queue_overflow!(self.policy, dropped.id().raw());
}
}
QueueOverflowPolicy::DropNewest => {
self.stats.inc_dropped();
#[cfg(feature = "tracing")]
crate::log_queue_overflow!(self.policy, message.id().raw());
return Err(QueueError::MessageDropped {
id: message.id().raw(),
reason: "Queue full, DropNewest policy".to_string(),
});
}
QueueOverflowPolicy::Block { .. } => {
return Err(QueueError::QueueFull {
capacity: self.capacity,
});
}
}
}
self.buffer.push_back(message);
self.stats.inc_enqueued();
Ok(())
}
pub fn pop(&mut self) -> Option<CanMessage> {
let msg = self.buffer.pop_front();
if msg.is_some() {
self.stats.inc_dequeued();
}
msg
}
pub fn peek(&self) -> Option<&CanMessage> {
self.buffer.front()
}
pub fn clear(&mut self) {
self.buffer.clear();
}
pub fn adjust_capacity(&mut self, new_capacity: usize) {
while self.buffer.len() > new_capacity {
match self.policy {
QueueOverflowPolicy::DropOldest | QueueOverflowPolicy::Block { .. } => {
if self.buffer.pop_front().is_some() {
self.stats.inc_dropped();
}
}
QueueOverflowPolicy::DropNewest => {
if self.buffer.pop_back().is_some() {
self.stats.inc_dropped();
}
}
}
}
self.capacity = new_capacity;
}
pub fn iter(&self) -> impl Iterator<Item = &CanMessage> {
self.buffer.iter()
}
}
impl Default for BoundedQueue {
fn default() -> Self {
Self::new(DEFAULT_QUEUE_CAPACITY)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::message::CanId;
fn make_test_message(id: u16) -> CanMessage {
CanMessage::new_standard(id, &[0u8; 8]).unwrap()
}
#[test]
fn test_new_queue() {
let queue = BoundedQueue::new(100);
assert_eq!(queue.capacity(), 100);
assert!(queue.is_empty());
assert!(!queue.is_full());
}
#[test]
fn test_push_pop() {
let mut queue = BoundedQueue::new(10);
let msg = make_test_message(0x123);
assert!(queue.push(msg.clone()).is_ok());
assert_eq!(queue.len(), 1);
let popped = queue.pop();
assert!(popped.is_some());
assert_eq!(popped.unwrap().id(), CanId::Standard(0x123));
assert!(queue.is_empty());
}
#[test]
fn test_drop_oldest_policy() {
let mut queue = BoundedQueue::with_policy(3, QueueOverflowPolicy::DropOldest);
queue.push(make_test_message(1)).unwrap();
queue.push(make_test_message(2)).unwrap();
queue.push(make_test_message(3)).unwrap();
assert!(queue.is_full());
queue.push(make_test_message(4)).unwrap();
assert_eq!(queue.len(), 3);
assert_eq!(queue.pop().unwrap().id(), CanId::Standard(2));
assert_eq!(queue.pop().unwrap().id(), CanId::Standard(3));
assert_eq!(queue.pop().unwrap().id(), CanId::Standard(4));
let stats = queue.stats();
assert_eq!(stats.dropped, 1);
assert_eq!(stats.overflow_count, 1);
}
#[test]
fn test_drop_newest_policy() {
let mut queue = BoundedQueue::with_policy(3, QueueOverflowPolicy::DropNewest);
queue.push(make_test_message(1)).unwrap();
queue.push(make_test_message(2)).unwrap();
queue.push(make_test_message(3)).unwrap();
let result = queue.push(make_test_message(4));
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
QueueError::MessageDropped { .. }
));
assert_eq!(queue.pop().unwrap().id(), CanId::Standard(1));
assert_eq!(queue.pop().unwrap().id(), CanId::Standard(2));
assert_eq!(queue.pop().unwrap().id(), CanId::Standard(3));
}
#[test]
fn test_block_policy_sync() {
use std::time::Duration;
let mut queue = BoundedQueue::with_policy(
2,
QueueOverflowPolicy::Block {
timeout: Duration::from_millis(100),
},
);
queue.push(make_test_message(1)).unwrap();
queue.push(make_test_message(2)).unwrap();
let result = queue.push(make_test_message(3));
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), QueueError::QueueFull { .. }));
}
#[test]
fn test_adjust_capacity() {
let mut queue = BoundedQueue::new(10);
for i in 0..5u16 {
queue.push(make_test_message(i)).unwrap();
}
queue.adjust_capacity(3);
assert_eq!(queue.capacity(), 3);
assert_eq!(queue.len(), 3);
assert_eq!(queue.pop().unwrap().id(), CanId::Standard(2));
assert_eq!(queue.pop().unwrap().id(), CanId::Standard(3));
assert_eq!(queue.pop().unwrap().id(), CanId::Standard(4));
}
#[test]
fn test_stats() {
let mut queue = BoundedQueue::with_policy(2, QueueOverflowPolicy::DropOldest);
queue.push(make_test_message(1)).unwrap();
queue.push(make_test_message(2)).unwrap();
queue.push(make_test_message(3)).unwrap(); queue.pop();
let stats = queue.stats();
assert_eq!(stats.enqueued, 3);
assert_eq!(stats.dequeued, 1);
assert_eq!(stats.dropped, 1);
assert_eq!(stats.overflow_count, 1);
}
#[test]
fn test_default_queue() {
let queue = BoundedQueue::default();
assert_eq!(queue.capacity(), DEFAULT_QUEUE_CAPACITY);
}
}