// atomr_remote/send_queue.rs

//! Bounded outbound send-queue with `OverflowStrategy`. Phase 5.G of
//! `docs/full-port-plan.md`.
//!
//! The endpoint writer used to drain an unbounded mpsc which silently
//! grew under sustained back-pressure. This module wraps that channel in
//! a small VecDeque-backed queue with a bounded capacity and a
//! configurable [`SendQueueOverflow`] policy.
8
9use std::collections::VecDeque;
10
11use crate::error::{RemoteError, RemoteErrorKind};
12use crate::settings::SendQueueOverflow;
13
/// Outcome of a successful [`BoundedSendQueue::try_push`] call.
///
/// Every variant is returned inside `Ok`. A full queue under the `Fail`
/// overflow policy is reported as an `Err` instead and has no variant here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SendOutcome {
    /// The message was accepted; the payload is the queue length after the push.
    Enqueued(usize),
    /// The queue was full and the new message was discarded.
    DroppedNew,
    /// The queue was full; the oldest message was evicted to make room for the
    /// new one, so the queue length still equals its capacity.
    DroppedOld,
}
24
25/// Bounded send queue for outbound endpoint envelopes (typed via `T`).
26pub struct BoundedSendQueue<T> {
27    inner: VecDeque<T>,
28    capacity: usize,
29    policy: SendQueueOverflow,
30}
31
32impl<T> BoundedSendQueue<T> {
33    pub fn new(capacity: usize, policy: SendQueueOverflow) -> Self {
34        Self { inner: VecDeque::with_capacity(capacity), capacity: capacity.max(1), policy }
35    }
36
37    /// Try to enqueue `item`. Honours the configured overflow policy.
38    pub fn try_push(&mut self, item: T) -> Result<SendOutcome, RemoteError> {
39        if self.inner.len() < self.capacity {
40            self.inner.push_back(item);
41            return Ok(SendOutcome::Enqueued(self.inner.len()));
42        }
43        match self.policy {
44            SendQueueOverflow::DropNew => Ok(SendOutcome::DroppedNew),
45            SendQueueOverflow::DropOld => {
46                let _ = self.inner.pop_front();
47                self.inner.push_back(item);
48                Ok(SendOutcome::DroppedOld)
49            }
50            SendQueueOverflow::Fail => {
51                Err(RemoteError::new(RemoteErrorKind::BackPressure, "send queue full"))
52            }
53        }
54    }
55
56    pub fn pop(&mut self) -> Option<T> {
57        self.inner.pop_front()
58    }
59
60    pub fn len(&self) -> usize {
61        self.inner.len()
62    }
63
64    pub fn is_empty(&self) -> bool {
65        self.inner.is_empty()
66    }
67
68    pub fn capacity(&self) -> usize {
69        self.capacity
70    }
71}
72
#[cfg(test)]
mod tests {
    use super::*;

    /// With `Fail`, pushes succeed up to capacity and the next one errors
    /// without disturbing what is already queued.
    #[test]
    fn enqueue_until_full() {
        let mut q = BoundedSendQueue::<u32>::new(2, SendQueueOverflow::Fail);
        assert!(matches!(q.try_push(1), Ok(SendOutcome::Enqueued(1))));
        assert!(matches!(q.try_push(2), Ok(SendOutcome::Enqueued(2))));
        assert!(q.try_push(3).is_err());
        assert_eq!(q.len(), 2);
        // The rejected push must not have altered the queued items.
        assert_eq!(q.pop(), Some(1));
        assert_eq!(q.pop(), Some(2));
        assert!(q.is_empty());
    }

    /// With `DropNew`, an overflowing push is discarded and the old item stays.
    #[test]
    fn drop_new_keeps_oldest() {
        let mut q = BoundedSendQueue::<u32>::new(1, SendQueueOverflow::DropNew);
        q.try_push(1).unwrap();
        assert_eq!(q.try_push(2).unwrap(), SendOutcome::DroppedNew);
        assert_eq!(q.pop(), Some(1));
    }

    /// With `DropOld`, an overflowing push replaces the oldest item.
    #[test]
    fn drop_old_evicts_oldest() {
        let mut q = BoundedSendQueue::<u32>::new(1, SendQueueOverflow::DropOld);
        q.try_push(1).unwrap();
        assert_eq!(q.try_push(2).unwrap(), SendOutcome::DroppedOld);
        assert_eq!(q.pop(), Some(2));
    }

    /// `DropOld` evicts only the front item and keeps the rest in FIFO order.
    #[test]
    fn drop_old_preserves_fifo_order() {
        let mut q = BoundedSendQueue::<u32>::new(2, SendQueueOverflow::DropOld);
        q.try_push(1).unwrap();
        q.try_push(2).unwrap();
        assert_eq!(q.try_push(3).unwrap(), SendOutcome::DroppedOld);
        assert_eq!(q.len(), 2);
        assert_eq!(q.pop(), Some(2));
        assert_eq!(q.pop(), Some(3));
        assert_eq!(q.pop(), None);
    }

    /// A requested capacity of zero is raised to one.
    #[test]
    fn capacity_floor_is_one() {
        let mut q = BoundedSendQueue::<u32>::new(0, SendQueueOverflow::DropNew);
        assert_eq!(q.capacity(), 1);
        assert!(matches!(q.try_push(1), Ok(SendOutcome::Enqueued(1))));
        assert_eq!(q.try_push(2).unwrap(), SendOutcome::DroppedNew);
    }
}