Skip to main content

canlink_hal/queue/
bounded.rs

1//! Bounded queue implementation (FR-011, FR-017)
2//!
3//! Provides a bounded message queue with configurable overflow policies.
4
5use std::collections::VecDeque;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use crate::error::QueueError;
9use crate::message::CanMessage;
10
11use super::QueueOverflowPolicy;
12
13/// Default queue capacity (FR-017)
14pub const DEFAULT_QUEUE_CAPACITY: usize = 1000;
15
16/// Queue statistics
17///
18/// Tracks queue operations for monitoring and debugging.
19#[derive(Debug, Clone, Default)]
20pub struct QueueStats {
21    /// Total messages enqueued
22    pub enqueued: u64,
23    /// Total messages dequeued
24    pub dequeued: u64,
25    /// Total messages dropped due to overflow
26    pub dropped: u64,
27    /// Number of times the queue was full
28    pub overflow_count: u64,
29}
30
31/// Internal atomic stats for thread-safe updates
32struct AtomicQueueStats {
33    enqueued: AtomicU64,
34    dequeued: AtomicU64,
35    dropped: AtomicU64,
36    overflow_count: AtomicU64,
37}
38
39impl AtomicQueueStats {
40    fn new() -> Self {
41        Self {
42            enqueued: AtomicU64::new(0),
43            dequeued: AtomicU64::new(0),
44            dropped: AtomicU64::new(0),
45            overflow_count: AtomicU64::new(0),
46        }
47    }
48
49    fn snapshot(&self) -> QueueStats {
50        QueueStats {
51            enqueued: self.enqueued.load(Ordering::Relaxed),
52            dequeued: self.dequeued.load(Ordering::Relaxed),
53            dropped: self.dropped.load(Ordering::Relaxed),
54            overflow_count: self.overflow_count.load(Ordering::Relaxed),
55        }
56    }
57
58    fn inc_enqueued(&self) {
59        self.enqueued.fetch_add(1, Ordering::Relaxed);
60    }
61
62    fn inc_dequeued(&self) {
63        self.dequeued.fetch_add(1, Ordering::Relaxed);
64    }
65
66    fn inc_dropped(&self) {
67        self.dropped.fetch_add(1, Ordering::Relaxed);
68    }
69
70    fn inc_overflow(&self) {
71        self.overflow_count.fetch_add(1, Ordering::Relaxed);
72    }
73}
74
75/// Bounded message queue
76///
77/// A queue with a fixed maximum capacity and configurable overflow policy.
78///
79/// # Example
80///
81/// ```rust
82/// use canlink_hal::queue::{BoundedQueue, QueueOverflowPolicy};
83/// use canlink_hal::message::CanMessage;
84///
85/// // Create queue with default policy (DropOldest)
86/// let mut queue = BoundedQueue::new(100);
87///
88/// // Create queue with custom policy
89/// let mut queue = BoundedQueue::with_policy(100, QueueOverflowPolicy::DropNewest);
90/// ```
91pub struct BoundedQueue {
92    buffer: VecDeque<CanMessage>,
93    capacity: usize,
94    policy: QueueOverflowPolicy,
95    stats: AtomicQueueStats,
96}
97
98impl BoundedQueue {
99    /// Create a new bounded queue with default overflow policy
100    ///
101    /// # Arguments
102    ///
103    /// * `capacity` - Maximum number of messages the queue can hold
104    ///
105    /// # Example
106    ///
107    /// ```rust
108    /// use canlink_hal::queue::BoundedQueue;
109    ///
110    /// let queue = BoundedQueue::new(1000);
111    /// assert_eq!(queue.capacity(), 1000);
112    /// ```
113    #[must_use]
114    pub fn new(capacity: usize) -> Self {
115        Self::with_policy(capacity, QueueOverflowPolicy::default())
116    }
117
118    /// Create a new bounded queue with specified overflow policy
119    ///
120    /// # Arguments
121    ///
122    /// * `capacity` - Maximum number of messages the queue can hold
123    /// * `policy` - Overflow handling policy
124    #[must_use]
125    pub fn with_policy(capacity: usize, policy: QueueOverflowPolicy) -> Self {
126        Self {
127            buffer: VecDeque::with_capacity(capacity),
128            capacity,
129            policy,
130            stats: AtomicQueueStats::new(),
131        }
132    }
133
134    /// Get the queue capacity
135    pub fn capacity(&self) -> usize {
136        self.capacity
137    }
138
139    /// Get the current number of messages in the queue
140    pub fn len(&self) -> usize {
141        self.buffer.len()
142    }
143
144    /// Check if the queue is empty
145    pub fn is_empty(&self) -> bool {
146        self.buffer.is_empty()
147    }
148
149    /// Check if the queue is full
150    pub fn is_full(&self) -> bool {
151        self.buffer.len() >= self.capacity
152    }
153
154    /// Get the overflow policy
155    pub fn policy(&self) -> QueueOverflowPolicy {
156        self.policy
157    }
158
159    /// Get queue statistics
160    pub fn stats(&self) -> QueueStats {
161        self.stats.snapshot()
162    }
163
164    /// Push a message to the queue
165    ///
166    /// Behavior depends on the overflow policy:
167    /// - `DropOldest`: Removes the oldest message if full
168    /// - `DropNewest`: Rejects the new message if full
169    /// - `Block`: Returns `QueueError::QueueFull` (async version handles blocking)
170    ///
171    /// # Errors
172    ///
173    /// - Returns `QueueError::QueueFull` if using `Block` policy and queue is full
174    /// - Returns `QueueError::MessageDropped` if using `DropNewest` policy and queue is full
175    pub fn push(&mut self, message: CanMessage) -> Result<(), QueueError> {
176        if self.is_full() {
177            self.stats.inc_overflow();
178
179            match self.policy {
180                QueueOverflowPolicy::DropOldest => {
181                    // Remove oldest message
182                    #[allow(unused_variables)]
183                    if let Some(dropped) = self.buffer.pop_front() {
184                        self.stats.inc_dropped();
185                        #[cfg(feature = "tracing")]
186                        crate::log_queue_overflow!(self.policy, dropped.id().raw());
187                    }
188                    // Continue to add new message
189                }
190                QueueOverflowPolicy::DropNewest => {
191                    self.stats.inc_dropped();
192                    #[cfg(feature = "tracing")]
193                    crate::log_queue_overflow!(self.policy, message.id().raw());
194                    return Err(QueueError::MessageDropped {
195                        id: message.id().raw(),
196                        reason: "Queue full, DropNewest policy".to_string(),
197                    });
198                }
199                QueueOverflowPolicy::Block { .. } => {
200                    // Synchronous push cannot block, return error
201                    return Err(QueueError::QueueFull {
202                        capacity: self.capacity,
203                    });
204                }
205            }
206        }
207
208        self.buffer.push_back(message);
209        self.stats.inc_enqueued();
210        Ok(())
211    }
212
213    /// Pop a message from the queue
214    ///
215    /// Returns `None` if the queue is empty.
216    pub fn pop(&mut self) -> Option<CanMessage> {
217        let msg = self.buffer.pop_front();
218        if msg.is_some() {
219            self.stats.inc_dequeued();
220        }
221        msg
222    }
223
224    /// Peek at the next message without removing it
225    pub fn peek(&self) -> Option<&CanMessage> {
226        self.buffer.front()
227    }
228
229    /// Clear all messages from the queue
230    pub fn clear(&mut self) {
231        self.buffer.clear();
232    }
233
234    /// Adjust the queue capacity
235    ///
236    /// If the new capacity is smaller than the current number of messages,
237    /// excess messages are removed according to the overflow policy.
238    ///
239    /// # Arguments
240    ///
241    /// * `new_capacity` - New maximum capacity
242    pub fn adjust_capacity(&mut self, new_capacity: usize) {
243        while self.buffer.len() > new_capacity {
244            match self.policy {
245                QueueOverflowPolicy::DropOldest | QueueOverflowPolicy::Block { .. } => {
246                    if self.buffer.pop_front().is_some() {
247                        self.stats.inc_dropped();
248                    }
249                }
250                QueueOverflowPolicy::DropNewest => {
251                    if self.buffer.pop_back().is_some() {
252                        self.stats.inc_dropped();
253                    }
254                }
255            }
256        }
257        self.capacity = new_capacity;
258    }
259
260    /// Iterate over messages without removing them
261    pub fn iter(&self) -> impl Iterator<Item = &CanMessage> {
262        self.buffer.iter()
263    }
264}
265
266impl Default for BoundedQueue {
267    fn default() -> Self {
268        Self::new(DEFAULT_QUEUE_CAPACITY)
269    }
270}
271
272#[cfg(test)]
273mod tests {
274    use super::*;
275    use crate::message::CanId;
276
277    fn make_test_message(id: u16) -> CanMessage {
278        CanMessage::new_standard(id, &[0u8; 8]).unwrap()
279    }
280
281    #[test]
282    fn test_new_queue() {
283        let queue = BoundedQueue::new(100);
284        assert_eq!(queue.capacity(), 100);
285        assert!(queue.is_empty());
286        assert!(!queue.is_full());
287    }
288
289    #[test]
290    fn test_push_pop() {
291        let mut queue = BoundedQueue::new(10);
292        let msg = make_test_message(0x123);
293
294        assert!(queue.push(msg.clone()).is_ok());
295        assert_eq!(queue.len(), 1);
296
297        let popped = queue.pop();
298        assert!(popped.is_some());
299        assert_eq!(popped.unwrap().id(), CanId::Standard(0x123));
300        assert!(queue.is_empty());
301    }
302
303    #[test]
304    fn test_drop_oldest_policy() {
305        let mut queue = BoundedQueue::with_policy(3, QueueOverflowPolicy::DropOldest);
306
307        // Fill the queue
308        queue.push(make_test_message(1)).unwrap();
309        queue.push(make_test_message(2)).unwrap();
310        queue.push(make_test_message(3)).unwrap();
311        assert!(queue.is_full());
312
313        // Push one more - should drop oldest (1)
314        queue.push(make_test_message(4)).unwrap();
315        assert_eq!(queue.len(), 3);
316
317        // Verify oldest was dropped
318        assert_eq!(queue.pop().unwrap().id(), CanId::Standard(2));
319        assert_eq!(queue.pop().unwrap().id(), CanId::Standard(3));
320        assert_eq!(queue.pop().unwrap().id(), CanId::Standard(4));
321
322        let stats = queue.stats();
323        assert_eq!(stats.dropped, 1);
324        assert_eq!(stats.overflow_count, 1);
325    }
326
327    #[test]
328    fn test_drop_newest_policy() {
329        let mut queue = BoundedQueue::with_policy(3, QueueOverflowPolicy::DropNewest);
330
331        // Fill the queue
332        queue.push(make_test_message(1)).unwrap();
333        queue.push(make_test_message(2)).unwrap();
334        queue.push(make_test_message(3)).unwrap();
335
336        // Push one more - should reject it
337        let result = queue.push(make_test_message(4));
338        assert!(result.is_err());
339        assert!(matches!(
340            result.unwrap_err(),
341            QueueError::MessageDropped { .. }
342        ));
343
344        // Verify queue unchanged
345        assert_eq!(queue.pop().unwrap().id(), CanId::Standard(1));
346        assert_eq!(queue.pop().unwrap().id(), CanId::Standard(2));
347        assert_eq!(queue.pop().unwrap().id(), CanId::Standard(3));
348    }
349
350    #[test]
351    fn test_block_policy_sync() {
352        use std::time::Duration;
353
354        let mut queue = BoundedQueue::with_policy(
355            2,
356            QueueOverflowPolicy::Block {
357                timeout: Duration::from_millis(100),
358            },
359        );
360
361        queue.push(make_test_message(1)).unwrap();
362        queue.push(make_test_message(2)).unwrap();
363
364        // Sync push should return QueueFull error
365        let result = queue.push(make_test_message(3));
366        assert!(result.is_err());
367        assert!(matches!(result.unwrap_err(), QueueError::QueueFull { .. }));
368    }
369
370    #[test]
371    fn test_adjust_capacity() {
372        let mut queue = BoundedQueue::new(10);
373
374        // Add 5 messages
375        for i in 0..5u16 {
376            queue.push(make_test_message(i)).unwrap();
377        }
378
379        // Reduce capacity to 3
380        queue.adjust_capacity(3);
381        assert_eq!(queue.capacity(), 3);
382        assert_eq!(queue.len(), 3);
383
384        // With DropOldest, oldest messages should be removed
385        assert_eq!(queue.pop().unwrap().id(), CanId::Standard(2));
386        assert_eq!(queue.pop().unwrap().id(), CanId::Standard(3));
387        assert_eq!(queue.pop().unwrap().id(), CanId::Standard(4));
388    }
389
390    #[test]
391    fn test_stats() {
392        let mut queue = BoundedQueue::with_policy(2, QueueOverflowPolicy::DropOldest);
393
394        queue.push(make_test_message(1)).unwrap();
395        queue.push(make_test_message(2)).unwrap();
396        queue.push(make_test_message(3)).unwrap(); // Causes overflow
397        queue.pop();
398
399        let stats = queue.stats();
400        assert_eq!(stats.enqueued, 3);
401        assert_eq!(stats.dequeued, 1);
402        assert_eq!(stats.dropped, 1);
403        assert_eq!(stats.overflow_count, 1);
404    }
405
406    #[test]
407    fn test_default_queue() {
408        let queue = BoundedQueue::default();
409        assert_eq!(queue.capacity(), DEFAULT_QUEUE_CAPACITY);
410    }
411}