Skip to main content

actionqueue_executor_local/
pool.rs

1//! Bounded, single-threaded dispatch queue for attempt coordination.
2//! This type is NOT thread-safe. For concurrent access, wrap in a Mutex.
3
4use std::collections::VecDeque;
5use std::error::Error;
6use std::fmt;
7use std::num::NonZeroUsize;
8
9/// Typed errors emitted by [`DispatchQueue`].
10#[derive(Debug, Clone, PartialEq, Eq)]
11pub enum DispatchQueueError {
12    /// Intake was rejected because the queue reached configured capacity.
13    Backpressure {
14        /// Maximum number of queued work items allowed.
15        capacity: usize,
16    },
17    /// Operation was rejected because the dispatch queue has been shut down.
18    Shutdown,
19}
20
21impl fmt::Display for DispatchQueueError {
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        match self {
24            Self::Backpressure { capacity } => {
25                write!(f, "dispatch queue capacity reached ({capacity})")
26            }
27            Self::Shutdown => write!(f, "dispatch queue is shut down"),
28        }
29    }
30}
31
32impl Error for DispatchQueueError {}
33
34/// Deterministic FIFO dispatch queue for attempt coordination.
35///
36/// # Invariants
37///
38/// - Intake is bounded to `capacity`.
39/// - Dequeue order is deterministic FIFO.
40/// - Shutdown state is explicit and inspectable.
41/// - This type does not apply execution or retry policy.
42#[derive(Debug)]
43pub struct DispatchQueue<T> {
44    capacity: usize,
45    queue: VecDeque<T>,
46    shutdown: bool,
47}
48
49impl<T> DispatchQueue<T> {
50    /// Creates a dispatch queue with bounded intake capacity.
51    pub fn new(capacity: NonZeroUsize) -> Self {
52        Self {
53            capacity: capacity.get(),
54            queue: VecDeque::with_capacity(capacity.get()),
55            shutdown: false,
56        }
57    }
58
59    /// Enqueues one work item.
60    ///
61    /// Returns:
62    /// - [`DispatchQueueError::Shutdown`] if the dispatch queue has been closed.
63    /// - [`DispatchQueueError::Backpressure`] if queue capacity is already reached.
64    pub fn enqueue(&mut self, item: T) -> Result<(), DispatchQueueError> {
65        if self.shutdown {
66            return Err(DispatchQueueError::Shutdown);
67        }
68
69        if self.queue.len() >= self.capacity {
70            return Err(DispatchQueueError::Backpressure { capacity: self.capacity });
71        }
72
73        self.queue.push_back(item);
74        Ok(())
75    }
76
77    /// Dequeues one work item in deterministic FIFO order.
78    ///
79    /// Returns [`DispatchQueueError::Shutdown`] only when the dispatch queue has been shut down
80    /// and no queued work remains.
81    pub fn dequeue(&mut self) -> Result<Option<T>, DispatchQueueError> {
82        if let Some(item) = self.queue.pop_front() {
83            return Ok(Some(item));
84        }
85
86        if self.shutdown {
87            return Err(DispatchQueueError::Shutdown);
88        }
89
90        Ok(None)
91    }
92
93    /// Marks the dispatch queue as shut down.
94    ///
95    /// After shutdown:
96    /// - New intake is rejected.
97    /// - Existing queued items remain available to be drained in FIFO order.
98    pub fn shutdown(&mut self) {
99        self.shutdown = true;
100    }
101
102    /// Returns queue capacity.
103    pub fn capacity(&self) -> usize {
104        self.capacity
105    }
106
107    /// Returns queued item count.
108    pub fn len(&self) -> usize {
109        self.queue.len()
110    }
111
112    /// Returns true when no items are queued.
113    pub fn is_empty(&self) -> bool {
114        self.queue.is_empty()
115    }
116
117    /// Returns true when dispatch queue has been marked shut down.
118    pub fn is_shutdown(&self) -> bool {
119        self.shutdown
120    }
121}
122
123#[cfg(test)]
124mod tests {
125    use std::num::NonZeroUsize;
126
127    use super::{DispatchQueue, DispatchQueueError};
128
129    #[test]
130    fn dequeue_is_fifo() {
131        let mut queue = DispatchQueue::new(NonZeroUsize::new(3).expect("non-zero"));
132        queue.enqueue(10).expect("first enqueue should succeed");
133        queue.enqueue(20).expect("second enqueue should succeed");
134        queue.enqueue(30).expect("third enqueue should succeed");
135
136        assert_eq!(queue.dequeue().expect("dequeue should succeed"), Some(10));
137        assert_eq!(queue.dequeue().expect("dequeue should succeed"), Some(20));
138        assert_eq!(queue.dequeue().expect("dequeue should succeed"), Some(30));
139        assert_eq!(queue.dequeue().expect("dequeue should succeed"), None);
140    }
141
142    #[test]
143    fn enqueue_returns_backpressure_at_capacity() {
144        let mut queue = DispatchQueue::new(NonZeroUsize::new(1).expect("non-zero"));
145
146        queue.enqueue(7).expect("enqueue should succeed");
147
148        assert_eq!(queue.enqueue(8), Err(DispatchQueueError::Backpressure { capacity: 1 }));
149    }
150
151    #[test]
152    fn shutdown_rejects_intake_but_allows_drain() {
153        let mut queue = DispatchQueue::new(NonZeroUsize::new(2).expect("non-zero"));
154        queue.enqueue(1).expect("enqueue should succeed");
155        queue.shutdown();
156
157        assert_eq!(queue.enqueue(2), Err(DispatchQueueError::Shutdown));
158        assert_eq!(queue.dequeue().expect("drain should succeed"), Some(1));
159        assert_eq!(queue.dequeue(), Err(DispatchQueueError::Shutdown));
160    }
161}