actionqueue_executor_local/
pool.rs1use std::collections::VecDeque;
5use std::error::Error;
6use std::fmt;
7use std::num::NonZeroUsize;
8
9#[derive(Debug, Clone, PartialEq, Eq)]
11pub enum DispatchQueueError {
12 Backpressure {
14 capacity: usize,
16 },
17 Shutdown,
19}
20
21impl fmt::Display for DispatchQueueError {
22 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23 match self {
24 Self::Backpressure { capacity } => {
25 write!(f, "dispatch queue capacity reached ({capacity})")
26 }
27 Self::Shutdown => write!(f, "dispatch queue is shut down"),
28 }
29 }
30}
31
32impl Error for DispatchQueueError {}
33
34#[derive(Debug)]
43pub struct DispatchQueue<T> {
44 capacity: usize,
45 queue: VecDeque<T>,
46 shutdown: bool,
47}
48
49impl<T> DispatchQueue<T> {
50 pub fn new(capacity: NonZeroUsize) -> Self {
52 Self {
53 capacity: capacity.get(),
54 queue: VecDeque::with_capacity(capacity.get()),
55 shutdown: false,
56 }
57 }
58
59 pub fn enqueue(&mut self, item: T) -> Result<(), DispatchQueueError> {
65 if self.shutdown {
66 return Err(DispatchQueueError::Shutdown);
67 }
68
69 if self.queue.len() >= self.capacity {
70 return Err(DispatchQueueError::Backpressure { capacity: self.capacity });
71 }
72
73 self.queue.push_back(item);
74 Ok(())
75 }
76
77 pub fn dequeue(&mut self) -> Result<Option<T>, DispatchQueueError> {
82 if let Some(item) = self.queue.pop_front() {
83 return Ok(Some(item));
84 }
85
86 if self.shutdown {
87 return Err(DispatchQueueError::Shutdown);
88 }
89
90 Ok(None)
91 }
92
93 pub fn shutdown(&mut self) {
99 self.shutdown = true;
100 }
101
102 pub fn capacity(&self) -> usize {
104 self.capacity
105 }
106
107 pub fn len(&self) -> usize {
109 self.queue.len()
110 }
111
112 pub fn is_empty(&self) -> bool {
114 self.queue.is_empty()
115 }
116
117 pub fn is_shutdown(&self) -> bool {
119 self.shutdown
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use std::num::NonZeroUsize;
126
127 use super::{DispatchQueue, DispatchQueueError};
128
129 #[test]
130 fn dequeue_is_fifo() {
131 let mut queue = DispatchQueue::new(NonZeroUsize::new(3).expect("non-zero"));
132 queue.enqueue(10).expect("first enqueue should succeed");
133 queue.enqueue(20).expect("second enqueue should succeed");
134 queue.enqueue(30).expect("third enqueue should succeed");
135
136 assert_eq!(queue.dequeue().expect("dequeue should succeed"), Some(10));
137 assert_eq!(queue.dequeue().expect("dequeue should succeed"), Some(20));
138 assert_eq!(queue.dequeue().expect("dequeue should succeed"), Some(30));
139 assert_eq!(queue.dequeue().expect("dequeue should succeed"), None);
140 }
141
142 #[test]
143 fn enqueue_returns_backpressure_at_capacity() {
144 let mut queue = DispatchQueue::new(NonZeroUsize::new(1).expect("non-zero"));
145
146 queue.enqueue(7).expect("enqueue should succeed");
147
148 assert_eq!(queue.enqueue(8), Err(DispatchQueueError::Backpressure { capacity: 1 }));
149 }
150
151 #[test]
152 fn shutdown_rejects_intake_but_allows_drain() {
153 let mut queue = DispatchQueue::new(NonZeroUsize::new(2).expect("non-zero"));
154 queue.enqueue(1).expect("enqueue should succeed");
155 queue.shutdown();
156
157 assert_eq!(queue.enqueue(2), Err(DispatchQueueError::Shutdown));
158 assert_eq!(queue.dequeue().expect("drain should succeed"), Some(1));
159 assert_eq!(queue.dequeue(), Err(DispatchQueueError::Shutdown));
160 }
161}