1use std::fmt;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3
4use super::{OverflowPolicy, QueueError, QueueResult, QueueStatsSnapshot, RtQueueBase};
5use crate::buffer::AtomicCell;
6
7#[repr(C, align(64))]
16pub struct SpscQueue<T: Copy, const CAP: usize> {
17 buffer: [AtomicCell<T>; CAP],
19 head: AtomicUsize,
21 tail: AtomicUsize,
23 full: AtomicBool,
25 mask: usize,
27 overflow_policy: OverflowPolicy,
29 default_value: Option<T>,
31}
32
33impl<T: Copy + Default, const CAP: usize> SpscQueue<T, CAP> {
34 pub fn new() -> Self {
41 assert!(CAP.is_power_of_two(), "CAP must be a power of two");
42
43 let buffer = std::array::from_fn(|_| AtomicCell::new(T::default()));
44
45 Self {
46 buffer,
47 head: AtomicUsize::new(0),
48 tail: AtomicUsize::new(0),
49 full: AtomicBool::new(false),
50 mask: CAP - 1,
51 overflow_policy: OverflowPolicy::OverwriteOldest,
52 default_value: None,
53 }
54 }
55
56 pub fn with_policies(overflow_policy: OverflowPolicy, default_value: Option<T>) -> Self {
58 let mut queue = Self::new();
59 queue.overflow_policy = overflow_policy;
60 queue.default_value = default_value;
61 queue
62 }
63
64 pub fn push(&self, value: T) -> QueueResult<()> {
75 let head = self.head.load(Ordering::Relaxed);
76 let next_head = (head + 1) & self.mask;
77
78 if self.full.load(Ordering::Acquire) {
79 match self.overflow_policy {
80 OverflowPolicy::OverwriteOldest => {
81 let _ = self.tail.fetch_add(1, Ordering::Release) & self.mask;
82 self.full.store(false, Ordering::Release);
83 }
84
85 OverflowPolicy::DropNewest => {
86 return Err(QueueError::QueueFull);
87 }
88
89 OverflowPolicy::Panic => {
90 panic!("SpscQueue overflow (capacity: {})", CAP);
91 }
92
93 OverflowPolicy::Block => {
94 return Err(QueueError::QueueFull);
95 }
96 }
97 }
98
99 self.buffer[head].store(value);
100
101 self.head.store(next_head, Ordering::Release);
102
103 if next_head == self.tail.load(Ordering::Acquire) {
104 self.full.store(true, Ordering::Release);
105 }
106
107 Ok(())
108 }
109
110 pub fn pop(&self) -> Option<T> {
112 if self.is_empty() {
113 return self.default_value;
114 }
115
116 let tail = self.tail.load(Ordering::Relaxed);
117 let value = self.buffer[tail].load();
118
119 let next_tail = (tail + 1) & self.mask;
120 self.tail.store(next_tail, Ordering::Release);
121
122 self.full.store(false, Ordering::Release);
123
124 Some(value)
125 }
126
127 pub fn peek(&self) -> Option<T> {
129 if self.is_empty() {
130 None
131 } else {
132 let tail = self.tail.load(Ordering::Acquire);
133 Some(self.buffer[tail].load())
134 }
135 }
136
137 pub fn len(&self) -> usize {
139 if self.full.load(Ordering::Acquire) {
140 CAP
141 } else {
142 let head = self.head.load(Ordering::Acquire);
143 let tail = self.tail.load(Ordering::Acquire);
144
145 if head >= tail {
146 head - tail
147 } else {
148 CAP - tail + head
149 }
150 }
151 }
152
153 pub const fn capacity(&self) -> usize {
155 CAP
156 }
157
158 pub fn is_empty(&self) -> bool {
160 !self.full.load(Ordering::Acquire)
161 && self.head.load(Ordering::Acquire) == self.tail.load(Ordering::Acquire)
162 }
163
164 pub fn is_full(&self) -> bool {
166 self.full.load(Ordering::Acquire)
167 }
168
169 pub fn clear(&self) {
171 self.head.store(0, Ordering::Relaxed);
172 self.tail.store(0, Ordering::Relaxed);
173 self.full.store(false, Ordering::Relaxed);
174 }
175
176 pub fn stats(&self) -> QueueStatsSnapshot {
178 QueueStatsSnapshot::default()
179 }
180
181 pub fn set_default(&mut self, value: T) {
183 self.default_value = Some(value);
184 }
185
186 pub fn overflow_policy(&self) -> OverflowPolicy {
188 self.overflow_policy
189 }
190
191 pub fn set_overflow_policy(&mut self, policy: OverflowPolicy) {
193 self.overflow_policy = policy;
194 }
195}
196
197impl<T: Copy + Default + Send + Sync, const CAP: usize> RtQueueBase<T> for SpscQueue<T, CAP> {
198 fn push(&self, value: T) -> QueueResult<()> {
199 self.push(value)
200 }
201
202 fn pop(&self) -> Option<T> {
203 self.pop()
204 }
205
206 fn len(&self) -> usize {
207 self.len()
208 }
209
210 fn capacity(&self) -> usize {
211 CAP
212 }
213
214 fn clear(&self) {
215 self.clear();
216 }
217}
218
219impl<T: Copy + Default + fmt::Debug, const CAP: usize> fmt::Debug for SpscQueue<T, CAP> {
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 f.debug_struct("SpscQueue")
222 .field("head", &self.head.load(Ordering::Relaxed))
223 .field("tail", &self.tail.load(Ordering::Relaxed))
224 .field("capacity", &CAP)
225 .field("len", &self.len())
226 .field("overflow_policy", &self.overflow_policy)
227 .field("default_value", &self.default_value)
228 .finish()
229 }
230}
231
232impl<T: Copy + Default, const CAP: usize> Default for SpscQueue<T, CAP> {
233 fn default() -> Self {
234 Self::new()
235 }
236}
237#[allow(unsafe_code)]
238unsafe impl<T: Copy + Send, const CAP: usize> Send for SpscQueue<T, CAP> {}
239#[allow(unsafe_code)]
240unsafe impl<T: Copy + Sync, const CAP: usize> Sync for SpscQueue<T, CAP> {}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// Fill, inspect and drain a queue in FIFO order.
    #[test]
    fn test_spsc_basic() {
        let q: SpscQueue<i32, 4> = SpscQueue::new();

        assert!(q.is_empty());
        assert_eq!(q.capacity(), 4);
        assert_eq!(q.len(), 0);

        q.push(1).unwrap();
        assert_eq!(q.len(), 1);
        assert!(!q.is_empty());
        assert!(!q.is_full());

        for value in 2..=4 {
            q.push(value).unwrap();
        }

        assert!(q.is_full());
        assert_eq!(q.len(), 4);

        for expected in 1..=4 {
            assert_eq!(q.pop(), Some(expected));
        }
        assert_eq!(q.pop(), None);
        assert!(q.is_empty());
    }

    /// The default policy replaces the oldest element on overflow.
    #[test]
    fn test_spsc_overwrite_policy() {
        let q: SpscQueue<i32, 2> = SpscQueue::new();

        for value in [1, 2] {
            q.push(value).unwrap();
        }
        assert!(q.is_full());

        // Overflowing push: 1 is discarded, 3 is stored.
        q.push(3).unwrap();
        assert_eq!(q.len(), 2);

        for expected in [Some(2), Some(3), None] {
            assert_eq!(q.pop(), expected);
        }
    }

    /// `DropNewest` rejects the overflowing push and keeps the old contents.
    #[test]
    fn test_spsc_drop_newest_policy() {
        let q: SpscQueue<i32, 2> = SpscQueue::with_policies(OverflowPolicy::DropNewest, None);

        for value in [1, 2] {
            q.push(value).unwrap();
        }
        assert!(q.is_full());

        assert!(q.push(3).is_err());

        for expected in [Some(1), Some(2), None] {
            assert_eq!(q.pop(), expected);
        }
    }

    /// Indices wrap around the end of the buffer without losing order.
    #[test]
    fn test_spsc_wraparound() {
        let q: SpscQueue<i32, 4> = SpscQueue::new();

        for value in 0..4 {
            q.push(value).unwrap();
        }
        assert!(q.is_full());

        for expected in 0..2 {
            assert_eq!(q.pop(), Some(expected));
        }

        for value in 4..6 {
            q.push(value).unwrap();
        }
        assert!(q.is_full());

        for expected in 2..6 {
            assert_eq!(q.pop(), Some(expected));
        }
        assert_eq!(q.pop(), None);
    }

    /// `peek` reads the oldest element without consuming it.
    #[test]
    fn test_spsc_peek() {
        let q: SpscQueue<i32, 4> = SpscQueue::new();

        assert_eq!(q.peek(), None);

        q.push(42).unwrap();
        assert_eq!(q.peek(), Some(42));
        assert_eq!(q.len(), 1);
        assert_eq!(q.pop(), Some(42));
        assert_eq!(q.peek(), None);
    }

    /// `clear` empties a partially filled queue.
    #[test]
    fn test_spsc_clear() {
        let q: SpscQueue<i32, 4> = SpscQueue::new();

        for value in 1..=3 {
            q.push(value).unwrap();
        }

        assert_eq!(q.len(), 3);

        q.clear();
        assert_eq!(q.len(), 0);
        assert!(q.is_empty());
    }

    /// An empty queue hands out the configured default value on `pop`.
    #[test]
    fn test_spsc_default_value() {
        let q: SpscQueue<i32, 4> =
            SpscQueue::with_policies(OverflowPolicy::OverwriteOldest, Some(-1));

        assert_eq!(q.pop(), Some(-1));

        q.push(42).unwrap();
        assert_eq!(q.pop(), Some(42));
        assert_eq!(q.pop(), Some(-1));
    }

    /// The overflow policy can be replaced after construction.
    #[test]
    fn test_spsc_policy_change() {
        let mut q: SpscQueue<i32, 2> = SpscQueue::new();
        assert_eq!(q.overflow_policy(), OverflowPolicy::OverwriteOldest);

        q.set_overflow_policy(OverflowPolicy::DropNewest);
        assert_eq!(q.overflow_policy(), OverflowPolicy::DropNewest);
    }

    /// Capacities that are not a power of two are rejected at construction.
    #[test]
    #[should_panic(expected = "CAP must be a power of two")]
    fn test_spsc_invalid_capacity() {
        let _ = SpscQueue::<i32, 3>::new();
    }
}