canlink_hal/queue/
bounded.rs1use std::collections::VecDeque;
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use crate::error::QueueError;
9use crate::message::CanMessage;
10
11use super::QueueOverflowPolicy;
12
/// Capacity, in messages, of the queue built by `BoundedQueue::default()`.
pub const DEFAULT_QUEUE_CAPACITY: usize = 1000;
15
/// Plain-value copy of a queue's activity counters.
///
/// All fields are simple `u64` totals, so two snapshots can be compared
/// directly with `==` (for example to check that nothing changed between two
/// points in time).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Total number of messages stored in the queue by successful pushes.
    pub enqueued: u64,
    /// Total number of messages removed from the queue by `pop`.
    pub dequeued: u64,
    /// Total number of messages discarded, either on overflow or when the
    /// capacity was reduced below the current length.
    pub dropped: u64,
    /// Total number of pushes that found the queue already full.
    pub overflow_count: u64,
}
30
/// Live activity counters for a queue.
///
/// Each counter is an `AtomicU64`, which lets the counters be updated through
/// a shared reference. `Default` yields all-zero counters and `Debug` prints
/// the current values, so the type can be embedded in other derivable types.
#[derive(Debug, Default)]
struct AtomicQueueStats {
    /// Messages stored by successful pushes.
    enqueued: AtomicU64,
    /// Messages removed by pops.
    dequeued: AtomicU64,
    /// Messages discarded on overflow or capacity reduction.
    dropped: AtomicU64,
    /// Pushes that found the queue full.
    overflow_count: AtomicU64,
}
38
39impl AtomicQueueStats {
40 fn new() -> Self {
41 Self {
42 enqueued: AtomicU64::new(0),
43 dequeued: AtomicU64::new(0),
44 dropped: AtomicU64::new(0),
45 overflow_count: AtomicU64::new(0),
46 }
47 }
48
49 fn snapshot(&self) -> QueueStats {
50 QueueStats {
51 enqueued: self.enqueued.load(Ordering::Relaxed),
52 dequeued: self.dequeued.load(Ordering::Relaxed),
53 dropped: self.dropped.load(Ordering::Relaxed),
54 overflow_count: self.overflow_count.load(Ordering::Relaxed),
55 }
56 }
57
58 fn inc_enqueued(&self) {
59 self.enqueued.fetch_add(1, Ordering::Relaxed);
60 }
61
62 fn inc_dequeued(&self) {
63 self.dequeued.fetch_add(1, Ordering::Relaxed);
64 }
65
66 fn inc_dropped(&self) {
67 self.dropped.fetch_add(1, Ordering::Relaxed);
68 }
69
70 fn inc_overflow(&self) {
71 self.overflow_count.fetch_add(1, Ordering::Relaxed);
72 }
73}
74
/// FIFO queue of [`CanMessage`]s with a capacity limit and a configurable
/// overflow policy.
///
/// Messages are appended at the back and removed from the front. When a push
/// finds the queue full, the stored [`QueueOverflowPolicy`] decides what
/// happens to the new or the oldest message.
pub struct BoundedQueue {
    /// Queued messages; the front is the oldest entry.
    buffer: VecDeque<CanMessage>,
    /// Limit that `is_full` compares `buffer.len()` against.
    capacity: usize,
    /// Behaviour applied by `push` when the queue is already full.
    policy: QueueOverflowPolicy,
    /// Running counters exposed through `stats()`.
    stats: AtomicQueueStats,
}
97
98impl BoundedQueue {
99 #[must_use]
114 pub fn new(capacity: usize) -> Self {
115 Self::with_policy(capacity, QueueOverflowPolicy::default())
116 }
117
118 #[must_use]
125 pub fn with_policy(capacity: usize, policy: QueueOverflowPolicy) -> Self {
126 Self {
127 buffer: VecDeque::with_capacity(capacity),
128 capacity,
129 policy,
130 stats: AtomicQueueStats::new(),
131 }
132 }
133
134 pub fn capacity(&self) -> usize {
136 self.capacity
137 }
138
139 pub fn len(&self) -> usize {
141 self.buffer.len()
142 }
143
144 pub fn is_empty(&self) -> bool {
146 self.buffer.is_empty()
147 }
148
149 pub fn is_full(&self) -> bool {
151 self.buffer.len() >= self.capacity
152 }
153
154 pub fn policy(&self) -> QueueOverflowPolicy {
156 self.policy
157 }
158
159 pub fn stats(&self) -> QueueStats {
161 self.stats.snapshot()
162 }
163
164 pub fn push(&mut self, message: CanMessage) -> Result<(), QueueError> {
176 if self.is_full() {
177 self.stats.inc_overflow();
178
179 match self.policy {
180 QueueOverflowPolicy::DropOldest => {
181 #[allow(unused_variables)]
183 if let Some(dropped) = self.buffer.pop_front() {
184 self.stats.inc_dropped();
185 #[cfg(feature = "tracing")]
186 crate::log_queue_overflow!(self.policy, dropped.id().raw());
187 }
188 }
190 QueueOverflowPolicy::DropNewest => {
191 self.stats.inc_dropped();
192 #[cfg(feature = "tracing")]
193 crate::log_queue_overflow!(self.policy, message.id().raw());
194 return Err(QueueError::MessageDropped {
195 id: message.id().raw(),
196 reason: "Queue full, DropNewest policy".to_string(),
197 });
198 }
199 QueueOverflowPolicy::Block { .. } => {
200 return Err(QueueError::QueueFull {
202 capacity: self.capacity,
203 });
204 }
205 }
206 }
207
208 self.buffer.push_back(message);
209 self.stats.inc_enqueued();
210 Ok(())
211 }
212
213 pub fn pop(&mut self) -> Option<CanMessage> {
217 let msg = self.buffer.pop_front();
218 if msg.is_some() {
219 self.stats.inc_dequeued();
220 }
221 msg
222 }
223
224 pub fn peek(&self) -> Option<&CanMessage> {
226 self.buffer.front()
227 }
228
229 pub fn clear(&mut self) {
231 self.buffer.clear();
232 }
233
234 pub fn adjust_capacity(&mut self, new_capacity: usize) {
243 while self.buffer.len() > new_capacity {
244 match self.policy {
245 QueueOverflowPolicy::DropOldest | QueueOverflowPolicy::Block { .. } => {
246 if self.buffer.pop_front().is_some() {
247 self.stats.inc_dropped();
248 }
249 }
250 QueueOverflowPolicy::DropNewest => {
251 if self.buffer.pop_back().is_some() {
252 self.stats.inc_dropped();
253 }
254 }
255 }
256 }
257 self.capacity = new_capacity;
258 }
259
260 pub fn iter(&self) -> impl Iterator<Item = &CanMessage> {
262 self.buffer.iter()
263 }
264}
265
266impl Default for BoundedQueue {
267 fn default() -> Self {
268 Self::new(DEFAULT_QUEUE_CAPACITY)
269 }
270}
271
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::CanId;

    /// Builds a standard-ID message carrying an eight-byte, all-zero payload.
    fn make_test_message(id: u16) -> CanMessage {
        let payload = [0u8; 8];
        CanMessage::new_standard(id, &payload).unwrap()
    }

    /// A fresh queue reports its capacity and is neither full nor populated.
    #[test]
    fn test_new_queue() {
        let queue = BoundedQueue::new(100);

        assert_eq!(queue.capacity(), 100);
        assert!(queue.is_empty());
        assert!(!queue.is_full());
    }

    /// A pushed message comes back out of `pop` and leaves the queue empty.
    #[test]
    fn test_push_pop() {
        let mut queue = BoundedQueue::new(10);

        let pushed = queue.push(make_test_message(0x123));
        assert!(pushed.is_ok());
        assert_eq!(queue.len(), 1);

        let popped_id = queue.pop().map(|message| message.id());
        assert_eq!(popped_id, Some(CanId::Standard(0x123)));
        assert!(queue.is_empty());
    }

    /// Overflow under `DropOldest` evicts the front message and keeps the rest.
    #[test]
    fn test_drop_oldest_policy() {
        let mut queue = BoundedQueue::with_policy(3, QueueOverflowPolicy::DropOldest);

        for id in 1..=3 {
            queue.push(make_test_message(id)).unwrap();
        }
        assert!(queue.is_full());

        // The fourth push overflows and should push message 1 out.
        queue.push(make_test_message(4)).unwrap();
        assert_eq!(queue.len(), 3);

        let remaining: Vec<_> = (0..3).map(|_| queue.pop().unwrap().id()).collect();
        assert_eq!(
            remaining,
            vec![
                CanId::Standard(2),
                CanId::Standard(3),
                CanId::Standard(4),
            ]
        );

        let QueueStats {
            dropped,
            overflow_count,
            ..
        } = queue.stats();
        assert_eq!(dropped, 1);
        assert_eq!(overflow_count, 1);
    }

    /// Overflow under `DropNewest` rejects the incoming message and keeps the
    /// queued ones untouched.
    #[test]
    fn test_drop_newest_policy() {
        let mut queue = BoundedQueue::with_policy(3, QueueOverflowPolicy::DropNewest);

        for id in 1..=3 {
            queue.push(make_test_message(id)).unwrap();
        }

        let rejected = queue.push(make_test_message(4));
        assert!(matches!(
            rejected,
            Err(QueueError::MessageDropped { .. })
        ));

        let remaining: Vec<_> = (0..3).map(|_| queue.pop().unwrap().id()).collect();
        assert_eq!(
            remaining,
            vec![
                CanId::Standard(1),
                CanId::Standard(2),
                CanId::Standard(3),
            ]
        );
    }

    /// The synchronous `push` reports `QueueFull` under the `Block` policy
    /// instead of waiting.
    #[test]
    fn test_block_policy_sync() {
        use std::time::Duration;

        let policy = QueueOverflowPolicy::Block {
            timeout: Duration::from_millis(100),
        };
        let mut queue = BoundedQueue::with_policy(2, policy);

        for id in 1..=2 {
            queue.push(make_test_message(id)).unwrap();
        }

        let rejected = queue.push(make_test_message(3));
        assert!(matches!(rejected, Err(QueueError::QueueFull { .. })));
    }

    /// Shrinking the capacity of a default-policy queue discards the oldest
    /// messages first.
    #[test]
    fn test_adjust_capacity() {
        let mut queue = BoundedQueue::new(10);
        (0..5u16).for_each(|id| queue.push(make_test_message(id)).unwrap());

        queue.adjust_capacity(3);
        assert_eq!(queue.capacity(), 3);
        assert_eq!(queue.len(), 3);

        let remaining: Vec<_> = (0..3).map(|_| queue.pop().unwrap().id()).collect();
        assert_eq!(
            remaining,
            vec![
                CanId::Standard(2),
                CanId::Standard(3),
                CanId::Standard(4),
            ]
        );
    }

    /// Counters reflect three pushes (one overflowing) followed by one pop.
    #[test]
    fn test_stats() {
        let mut queue = BoundedQueue::with_policy(2, QueueOverflowPolicy::DropOldest);

        for id in 1..=3 {
            queue.push(make_test_message(id)).unwrap();
        }
        let _ = queue.pop();

        let snapshot = queue.stats();
        assert_eq!(snapshot.enqueued, 3);
        assert_eq!(snapshot.dequeued, 1);
        assert_eq!(snapshot.dropped, 1);
        assert_eq!(snapshot.overflow_count, 1);
    }

    /// `Default` sizes the queue to `DEFAULT_QUEUE_CAPACITY`.
    #[test]
    fn test_default_queue() {
        let queue = BoundedQueue::default();
        assert_eq!(queue.capacity(), DEFAULT_QUEUE_CAPACITY);
    }
}