atomr_remote/
send_queue.rs1use std::collections::VecDeque;
10
11use crate::error::{RemoteError, RemoteErrorKind};
12use crate::settings::SendQueueOverflow;
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum SendOutcome {
17 Enqueued(usize),
19 DroppedNew,
21 DroppedOld,
23}
24
25pub struct BoundedSendQueue<T> {
27 inner: VecDeque<T>,
28 capacity: usize,
29 policy: SendQueueOverflow,
30}
31
32impl<T> BoundedSendQueue<T> {
33 pub fn new(capacity: usize, policy: SendQueueOverflow) -> Self {
34 Self { inner: VecDeque::with_capacity(capacity), capacity: capacity.max(1), policy }
35 }
36
37 pub fn try_push(&mut self, item: T) -> Result<SendOutcome, RemoteError> {
39 if self.inner.len() < self.capacity {
40 self.inner.push_back(item);
41 return Ok(SendOutcome::Enqueued(self.inner.len()));
42 }
43 match self.policy {
44 SendQueueOverflow::DropNew => Ok(SendOutcome::DroppedNew),
45 SendQueueOverflow::DropOld => {
46 let _ = self.inner.pop_front();
47 self.inner.push_back(item);
48 Ok(SendOutcome::DroppedOld)
49 }
50 SendQueueOverflow::Fail => {
51 Err(RemoteError::new(RemoteErrorKind::BackPressure, "send queue full"))
52 }
53 }
54 }
55
56 pub fn pop(&mut self) -> Option<T> {
57 self.inner.pop_front()
58 }
59
60 pub fn len(&self) -> usize {
61 self.inner.len()
62 }
63
64 pub fn is_empty(&self) -> bool {
65 self.inner.is_empty()
66 }
67
68 pub fn capacity(&self) -> usize {
69 self.capacity
70 }
71}
72
73#[cfg(test)]
74mod tests {
75 use super::*;
76
77 #[test]
78 fn enqueue_until_full() {
79 let mut q = BoundedSendQueue::<u32>::new(2, SendQueueOverflow::Fail);
80 assert!(matches!(q.try_push(1), Ok(SendOutcome::Enqueued(1))));
81 assert!(matches!(q.try_push(2), Ok(SendOutcome::Enqueued(2))));
82 assert!(q.try_push(3).is_err());
83 assert_eq!(q.len(), 2);
84 }
85
86 #[test]
87 fn drop_new_keeps_oldest() {
88 let mut q = BoundedSendQueue::<u32>::new(1, SendQueueOverflow::DropNew);
89 q.try_push(1).unwrap();
90 assert_eq!(q.try_push(2).unwrap(), SendOutcome::DroppedNew);
91 assert_eq!(q.pop(), Some(1));
92 }
93
94 #[test]
95 fn drop_old_evicts_oldest() {
96 let mut q = BoundedSendQueue::<u32>::new(1, SendQueueOverflow::DropOld);
97 q.try_push(1).unwrap();
98 assert_eq!(q.try_push(2).unwrap(), SendOutcome::DroppedOld);
99 assert_eq!(q.pop(), Some(2));
100 }
101
102 #[test]
103 fn capacity_floor_is_one() {
104 let mut q = BoundedSendQueue::<u32>::new(0, SendQueueOverflow::DropNew);
105 assert!(matches!(q.try_push(1), Ok(SendOutcome::Enqueued(1))));
106 assert_eq!(q.try_push(2).unwrap(), SendOutcome::DroppedNew);
107 }
108}