Skip to main content

rill_core/queues/
spsc.rs

1use std::fmt;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3
4use super::{OverflowPolicy, QueueError, QueueResult, QueueStatsSnapshot, RtQueueBase};
5use crate::buffer::AtomicCell;
6
7// =============================================================================
8// Main structure
9// =============================================================================
10
11/// A lock-free single-producer single-consumer queue.
12///
13/// Uses atomic operations for real-time safe push/pop without blocking.
14/// The capacity must be a power of two for efficient mask-based indexing.
15#[repr(C, align(64))]
16pub struct SpscQueue<T: Copy, const CAP: usize> {
17    /// Ring buffer of atomic cells holding queue elements.
18    buffer: [AtomicCell<T>; CAP],
19    /// Producer index (written by producer, read by consumer).
20    head: AtomicUsize,
21    /// Consumer index (written by consumer, read by producer).
22    tail: AtomicUsize,
23    /// Flag indicating whether the queue is full.
24    full: AtomicBool,
25    /// Bitmask for wrapping (CAP - 1, requires CAP to be a power of two).
26    mask: usize,
27    /// Behaviour when a push would overflow the queue.
28    overflow_policy: OverflowPolicy,
29    /// Default value returned when popping from an empty queue.
30    default_value: Option<T>,
31}
32
33impl<T: Copy + Default, const CAP: usize> SpscQueue<T, CAP> {
34    /// Create a new SPSC queue with default policies.
35    ///
36    /// The overflow policy defaults to [`OverflowPolicy::OverwriteOldest`].
37    ///
38    /// # Panics
39    /// Panics if `CAP` is not a power of two.
40    pub fn new() -> Self {
41        assert!(CAP.is_power_of_two(), "CAP must be a power of two");
42
43        let buffer = std::array::from_fn(|_| AtomicCell::new(T::default()));
44
45        Self {
46            buffer,
47            head: AtomicUsize::new(0),
48            tail: AtomicUsize::new(0),
49            full: AtomicBool::new(false),
50            mask: CAP - 1,
51            overflow_policy: OverflowPolicy::OverwriteOldest,
52            default_value: None,
53        }
54    }
55
56    /// Create a queue with custom overflow policy and default value.
57    pub fn with_policies(overflow_policy: OverflowPolicy, default_value: Option<T>) -> Self {
58        let mut queue = Self::new();
59        queue.overflow_policy = overflow_policy;
60        queue.default_value = default_value;
61        queue
62    }
63
64    /// Push a value into the queue.
65    ///
66    /// If the queue is full, behaviour depends on [`OverflowPolicy`].
67    ///
68    /// # Errors
69    /// Returns `QueueFull` when the policy is [`OverflowPolicy::DropNewest`]
70    /// or [`OverflowPolicy::Block`] and the queue is full.
71    ///
72    /// # Panics
73    /// Panics when the policy is [`OverflowPolicy::Panic`] and the queue is full.
74    pub fn push(&self, value: T) -> QueueResult<()> {
75        let head = self.head.load(Ordering::Relaxed);
76        let next_head = (head + 1) & self.mask;
77
78        if self.full.load(Ordering::Acquire) {
79            match self.overflow_policy {
80                OverflowPolicy::OverwriteOldest => {
81                    let _ = self.tail.fetch_add(1, Ordering::Release) & self.mask;
82                    self.full.store(false, Ordering::Release);
83                }
84
85                OverflowPolicy::DropNewest => {
86                    return Err(QueueError::QueueFull);
87                }
88
89                OverflowPolicy::Panic => {
90                    panic!("SpscQueue overflow (capacity: {})", CAP);
91                }
92
93                OverflowPolicy::Block => {
94                    return Err(QueueError::QueueFull);
95                }
96            }
97        }
98
99        self.buffer[head].store(value);
100
101        self.head.store(next_head, Ordering::Release);
102
103        if next_head == self.tail.load(Ordering::Acquire) {
104            self.full.store(true, Ordering::Release);
105        }
106
107        Ok(())
108    }
109
110    /// Pop a value from the queue, or return the default value if empty.
111    pub fn pop(&self) -> Option<T> {
112        if self.is_empty() {
113            return self.default_value;
114        }
115
116        let tail = self.tail.load(Ordering::Relaxed);
117        let value = self.buffer[tail].load();
118
119        let next_tail = (tail + 1) & self.mask;
120        self.tail.store(next_tail, Ordering::Release);
121
122        self.full.store(false, Ordering::Release);
123
124        Some(value)
125    }
126
127    /// Peek at the front value without removing it.
128    pub fn peek(&self) -> Option<T> {
129        if self.is_empty() {
130            None
131        } else {
132            let tail = self.tail.load(Ordering::Acquire);
133            Some(self.buffer[tail].load())
134        }
135    }
136
137    /// Return the current number of elements in the queue.
138    pub fn len(&self) -> usize {
139        if self.full.load(Ordering::Acquire) {
140            CAP
141        } else {
142            let head = self.head.load(Ordering::Acquire);
143            let tail = self.tail.load(Ordering::Acquire);
144
145            if head >= tail {
146                head - tail
147            } else {
148                CAP - tail + head
149            }
150        }
151    }
152
153    /// Return the fixed capacity of the queue.
154    pub const fn capacity(&self) -> usize {
155        CAP
156    }
157
158    /// Return true if the queue is empty.
159    pub fn is_empty(&self) -> bool {
160        !self.full.load(Ordering::Acquire)
161            && self.head.load(Ordering::Acquire) == self.tail.load(Ordering::Acquire)
162    }
163
164    /// Return true if the queue is full.
165    pub fn is_full(&self) -> bool {
166        self.full.load(Ordering::Acquire)
167    }
168
169    /// Clear the queue, resetting both head and tail pointers.
170    pub fn clear(&self) {
171        self.head.store(0, Ordering::Relaxed);
172        self.tail.store(0, Ordering::Relaxed);
173        self.full.store(false, Ordering::Relaxed);
174    }
175
176    /// Return a statistics snapshot (currently always empty).
177    pub fn stats(&self) -> QueueStatsSnapshot {
178        QueueStatsSnapshot::default()
179    }
180
181    /// Set the default value returned when popping from an empty queue.
182    pub fn set_default(&mut self, value: T) {
183        self.default_value = Some(value);
184    }
185
186    /// Return the current overflow policy.
187    pub fn overflow_policy(&self) -> OverflowPolicy {
188        self.overflow_policy
189    }
190
191    /// Set the overflow policy.
192    pub fn set_overflow_policy(&mut self, policy: OverflowPolicy) {
193        self.overflow_policy = policy;
194    }
195}
196
197impl<T: Copy + Default + Send + Sync, const CAP: usize> RtQueueBase<T> for SpscQueue<T, CAP> {
198    fn push(&self, value: T) -> QueueResult<()> {
199        self.push(value)
200    }
201
202    fn pop(&self) -> Option<T> {
203        self.pop()
204    }
205
206    fn len(&self) -> usize {
207        self.len()
208    }
209
210    fn capacity(&self) -> usize {
211        CAP
212    }
213
214    fn clear(&self) {
215        self.clear();
216    }
217}
218
219impl<T: Copy + Default + fmt::Debug, const CAP: usize> fmt::Debug for SpscQueue<T, CAP> {
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        f.debug_struct("SpscQueue")
222            .field("head", &self.head.load(Ordering::Relaxed))
223            .field("tail", &self.tail.load(Ordering::Relaxed))
224            .field("capacity", &CAP)
225            .field("len", &self.len())
226            .field("overflow_policy", &self.overflow_policy)
227            .field("default_value", &self.default_value)
228            .finish()
229    }
230}
231
232impl<T: Copy + Default, const CAP: usize> Default for SpscQueue<T, CAP> {
233    fn default() -> Self {
234        Self::new()
235    }
236}
237#[allow(unsafe_code)]
238unsafe impl<T: Copy + Send, const CAP: usize> Send for SpscQueue<T, CAP> {}
239#[allow(unsafe_code)]
240unsafe impl<T: Copy + Sync, const CAP: usize> Sync for SpscQueue<T, CAP> {}
241
242// =============================================================================
243// Tests
244// =============================================================================
245
#[cfg(test)]
mod tests {
    use super::*;

    /// Fill a 4-slot queue, then drain it, checking the state flags on the way.
    #[test]
    fn test_spsc_basic() {
        let q = SpscQueue::<i32, 4>::new();

        assert!(q.is_empty());
        assert_eq!(q.capacity(), 4);
        assert_eq!(q.len(), 0);

        q.push(1).unwrap();
        assert_eq!(q.len(), 1);
        assert!(!q.is_empty());
        // One element out of four: not full yet.
        assert!(!q.is_full());

        for value in 2..=4 {
            q.push(value).unwrap();
        }

        // All four slots are now in use.
        assert!(q.is_full());
        assert_eq!(q.len(), 4);

        for expected in 1..=4 {
            assert_eq!(q.pop(), Some(expected));
        }
        assert_eq!(q.pop(), None);
        assert!(q.is_empty());
    }

    /// With the default policy a push into a full queue replaces the oldest entry.
    #[test]
    fn test_spsc_overwrite_policy() {
        // `new()` selects OverwriteOldest.
        let q = SpscQueue::<i32, 2>::new();

        for value in [1, 2] {
            q.push(value).unwrap();
        }
        assert!(q.is_full());

        // This push evicts 1.
        q.push(3).unwrap();
        assert_eq!(q.len(), 2);

        // Remaining contents are [2, 3], with 2 now the oldest.
        for expected in [2, 3] {
            assert_eq!(q.pop(), Some(expected));
        }
        assert_eq!(q.pop(), None);
    }

    /// DropNewest rejects the push and keeps the existing contents.
    #[test]
    fn test_spsc_drop_newest_policy() {
        let q = SpscQueue::<i32, 2>::with_policies(OverflowPolicy::DropNewest, None);

        for value in [1, 2] {
            q.push(value).unwrap();
        }
        assert!(q.is_full());

        // The third push must fail without storing anything.
        assert!(q.push(3).is_err());

        // Contents are still [1, 2], in insertion order.
        for expected in [1, 2] {
            assert_eq!(q.pop(), Some(expected));
        }
        assert_eq!(q.pop(), None);
    }

    /// Indices wrap around the end of the ring without disturbing FIFO order.
    #[test]
    fn test_spsc_wraparound() {
        let q = SpscQueue::<i32, 4>::new();

        // Fill every slot.
        for value in 0..4 {
            q.push(value).unwrap();
        }
        assert!(q.is_full());

        // Free two slots at the front.
        for expected in 0..2 {
            assert_eq!(q.pop(), Some(expected));
        }

        // Refill them; the writes wrap to the start of the buffer.
        for value in 4..6 {
            q.push(value).unwrap();
        }
        assert!(q.is_full());

        // Everything comes back out in insertion order.
        for expected in 2..6 {
            assert_eq!(q.pop(), Some(expected));
        }
        assert_eq!(q.pop(), None);
    }

    /// `peek` shows the front element and leaves the queue unchanged.
    #[test]
    fn test_spsc_peek() {
        let q = SpscQueue::<i32, 4>::new();

        assert_eq!(q.peek(), None);

        q.push(42).unwrap();
        assert_eq!(q.peek(), Some(42));
        assert_eq!(q.len(), 1);
        assert_eq!(q.pop(), Some(42));
        assert_eq!(q.peek(), None);
    }

    /// `clear` empties a partially filled queue.
    #[test]
    fn test_spsc_clear() {
        let q = SpscQueue::<i32, 4>::new();

        for value in 1..=3 {
            q.push(value).unwrap();
        }

        assert_eq!(q.len(), 3);

        q.clear();
        assert_eq!(q.len(), 0);
        assert!(q.is_empty());
    }

    /// An empty queue hands out the configured default value on `pop`.
    #[test]
    fn test_spsc_default_value() {
        let fallback = Some(-1);
        let q = SpscQueue::<i32, 4>::with_policies(OverflowPolicy::OverwriteOldest, fallback);

        assert_eq!(q.pop(), Some(-1));

        q.push(42).unwrap();
        assert_eq!(q.pop(), Some(42));
        assert_eq!(q.pop(), Some(-1));
    }

    /// The overflow policy can be replaced after construction.
    #[test]
    fn test_spsc_policy_change() {
        let mut q = SpscQueue::<i32, 2>::new();
        assert_eq!(q.overflow_policy(), OverflowPolicy::OverwriteOldest);

        q.set_overflow_policy(OverflowPolicy::DropNewest);
        assert_eq!(q.overflow_policy(), OverflowPolicy::DropNewest);
    }

    /// A capacity that is not a power of two is rejected at construction.
    #[test]
    #[should_panic(expected = "CAP must be a power of two")]
    fn test_spsc_invalid_capacity() {
        let _ = SpscQueue::<i32, 3>::new();
    }
}