1use std::fmt;
2use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
3
4use super::{OverflowPolicy, QueueError, QueueResult, QueueStatsSnapshot, RtQueueBase};
5use crate::buffer::AtomicCell;
6
/// Fixed-capacity ring-buffer queue over `Copy` values.
///
/// `CAP` is the number of slots; the constructor requires it to be a power of
/// two so indices can be wrapped with a bit mask. The name suggests
/// single-producer/single-consumer use — NOTE(review): that threading contract
/// is not enforced by the type; confirm against callers.
#[repr(C, align(64))]
pub struct SpscQueue<T: Copy, const CAP: usize> {
    /// Slot storage; every slot is initialised with `T::default()`.
    buffer: [AtomicCell<T>; CAP],
    /// Index of the slot the next `push` writes to.
    head: AtomicUsize,
    /// Index of the slot the next `pop` reads from.
    tail: AtomicUsize,
    /// Set when `head` has caught up with `tail` after a push; distinguishes a
    /// full queue from an empty one (both have `head == tail`).
    full: AtomicBool,
    /// `CAP - 1`, used to wrap indices.
    mask: usize,
    /// What `push` does when the queue is already full.
    overflow_policy: OverflowPolicy,
    /// Value returned by `pop` when the queue is empty (`None` by default).
    default_value: Option<T>,
}
21
22impl<T: Copy + Default, const CAP: usize> SpscQueue<T, CAP> {
23 pub fn new() -> Self {
25 assert!(CAP.is_power_of_two(), "CAP must be a power of two");
26
27 let buffer = std::array::from_fn(|_| AtomicCell::new(T::default()));
28
29 Self {
30 buffer,
31 head: AtomicUsize::new(0),
32 tail: AtomicUsize::new(0),
33 full: AtomicBool::new(false),
34 mask: CAP - 1,
35 overflow_policy: OverflowPolicy::OverwriteOldest,
36 default_value: None,
37 }
38 }
39
40 pub fn with_policies(overflow_policy: OverflowPolicy, default_value: Option<T>) -> Self {
42 let mut queue = Self::new();
43 queue.overflow_policy = overflow_policy;
44 queue.default_value = default_value;
45 queue
46 }
47
48 pub fn push(&self, value: T) -> QueueResult<()> {
50 let head = self.head.load(Ordering::Relaxed);
51 let next_head = (head + 1) & self.mask;
52
53 if self.full.load(Ordering::Acquire) {
55 match self.overflow_policy {
56 OverflowPolicy::OverwriteOldest => {
57 let _ = self.tail.fetch_add(1, Ordering::Release) & self.mask;
60 self.full.store(false, Ordering::Release);
61 }
62
63 OverflowPolicy::DropNewest => {
64 return Err(QueueError::QueueFull);
65 }
66
67 OverflowPolicy::Panic => {
68 panic!("SpscQueue overflow (capacity: {})", CAP);
69 }
70
71 OverflowPolicy::Block => {
72 return Err(QueueError::QueueFull);
73 }
74 }
75 }
76
77 self.buffer[head].store(value);
78
79 self.head.store(next_head, Ordering::Release);
81
82 if next_head == self.tail.load(Ordering::Acquire) {
84 self.full.store(true, Ordering::Release);
85 }
86
87 Ok(())
88 }
89
90 pub fn pop(&self) -> Option<T> {
92 if self.is_empty() {
93 return self.default_value;
94 }
95
96 let tail = self.tail.load(Ordering::Relaxed);
97 let value = self.buffer[tail].load();
98
99 let next_tail = (tail + 1) & self.mask;
100 self.tail.store(next_tail, Ordering::Release);
101
102 self.full.store(false, Ordering::Release);
104
105 Some(value)
106 }
107
108 pub fn peek(&self) -> Option<T> {
110 if self.is_empty() {
111 None
112 } else {
113 let tail = self.tail.load(Ordering::Acquire);
114 Some(self.buffer[tail].load())
115 }
116 }
117
118 pub fn len(&self) -> usize {
120 if self.full.load(Ordering::Acquire) {
121 CAP
122 } else {
123 let head = self.head.load(Ordering::Acquire);
124 let tail = self.tail.load(Ordering::Acquire);
125
126 if head >= tail {
127 head - tail
128 } else {
129 CAP - tail + head
130 }
131 }
132 }
133
134 pub const fn capacity(&self) -> usize {
136 CAP
137 }
138
139 pub fn is_empty(&self) -> bool {
141 !self.full.load(Ordering::Acquire)
142 && self.head.load(Ordering::Acquire) == self.tail.load(Ordering::Acquire)
143 }
144
145 pub fn is_full(&self) -> bool {
147 self.full.load(Ordering::Acquire)
148 }
149
150 pub fn clear(&self) {
152 self.head.store(0, Ordering::Relaxed);
153 self.tail.store(0, Ordering::Relaxed);
154 self.full.store(false, Ordering::Relaxed);
155 }
156
157 pub fn stats(&self) -> QueueStatsSnapshot {
159 QueueStatsSnapshot::default()
160 }
161
162 pub fn set_default(&mut self, value: T) {
164 self.default_value = Some(value);
165 }
166
167 pub fn overflow_policy(&self) -> OverflowPolicy {
169 self.overflow_policy
170 }
171
172 pub fn set_overflow_policy(&mut self, policy: OverflowPolicy) {
174 self.overflow_policy = policy;
175 }
176}
177
178impl<T: Copy + Default + Send + Sync, const CAP: usize> RtQueueBase<T> for SpscQueue<T, CAP> {
179 fn push(&self, value: T) -> QueueResult<()> {
180 self.push(value)
181 }
182
183 fn pop(&self) -> Option<T> {
184 self.pop()
185 }
186
187 fn len(&self) -> usize {
188 self.len()
189 }
190
191 fn capacity(&self) -> usize {
192 CAP
193 }
194
195 fn clear(&self) {
196 self.clear();
197 }
198}
199
200impl<T: Copy + Default + fmt::Debug, const CAP: usize> fmt::Debug for SpscQueue<T, CAP> {
201 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202 f.debug_struct("SpscQueue")
203 .field("head", &self.head.load(Ordering::Relaxed))
204 .field("tail", &self.tail.load(Ordering::Relaxed))
205 .field("capacity", &CAP)
206 .field("len", &self.len())
207 .field("overflow_policy", &self.overflow_policy)
208 .field("default_value", &self.default_value)
209 .finish()
210 }
211}
212
213impl<T: Copy + Default, const CAP: usize> Default for SpscQueue<T, CAP> {
214 fn default() -> Self {
215 Self::new()
216 }
217}
// SAFETY: NOTE(review) — these manual impls are only sound if `AtomicCell<T>`
// (from `crate::buffer`, not visible here) may be accessed through `&self`
// from several threads at once, and if callers respect whatever
// producer/consumer discipline the queue expects — TODO confirm both.
// `unsafe_code` is allowed locally because the impls below need `unsafe`.
#[allow(unsafe_code)]
unsafe impl<T: Copy + Send, const CAP: usize> Send for SpscQueue<T, CAP> {}
// NOTE(review): sharing the queue hands values of `T` from one thread to
// another, which usually calls for a `T: Send` bound on the `Sync` impl; the
// bound here is `T: Sync` — confirm this is intended.
#[allow(unsafe_code)]
unsafe impl<T: Copy + Sync, const CAP: usize> Sync for SpscQueue<T, CAP> {}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// Pushes every value in order, failing the test if any push is rejected.
    fn push_all<const CAP: usize>(queue: &SpscQueue<i32, CAP>, values: &[i32]) {
        for &value in values {
            queue.push(value).unwrap();
        }
    }

    /// Pops once per expected value, checking FIFO order, then checks that one
    /// more pop yields `None`.
    fn assert_drains_to<const CAP: usize>(queue: &SpscQueue<i32, CAP>, expected: &[i32]) {
        for &value in expected {
            assert_eq!(queue.pop(), Some(value));
        }
        assert_eq!(queue.pop(), None);
    }

    /// Fill the queue one element at a time, then drain it in FIFO order.
    #[test]
    fn test_spsc_basic() {
        let queue: SpscQueue<i32, 4> = SpscQueue::new();

        assert!(queue.is_empty());
        assert_eq!(queue.capacity(), 4);
        assert_eq!(queue.len(), 0);

        push_all(&queue, &[1]);
        assert_eq!(queue.len(), 1);
        assert!(!queue.is_empty());
        assert!(!queue.is_full());

        push_all(&queue, &[2, 3, 4]);
        assert!(queue.is_full());
        assert_eq!(queue.len(), 4);

        assert_drains_to(&queue, &[1, 2, 3, 4]);
        assert!(queue.is_empty());
    }

    /// With the default policy a push into a full queue evicts the oldest item.
    #[test]
    fn test_spsc_overwrite_policy() {
        let queue: SpscQueue<i32, 2> = SpscQueue::new();

        push_all(&queue, &[1, 2]);
        assert!(queue.is_full());

        push_all(&queue, &[3]);
        assert_eq!(queue.len(), 2);

        assert_drains_to(&queue, &[2, 3]);
    }

    /// `DropNewest` rejects the push and keeps the stored items untouched.
    #[test]
    fn test_spsc_drop_newest_policy() {
        let queue: SpscQueue<i32, 2> = SpscQueue::with_policies(OverflowPolicy::DropNewest, None);

        push_all(&queue, &[1, 2]);
        assert!(queue.is_full());

        assert!(queue.push(3).is_err());

        assert_drains_to(&queue, &[1, 2]);
    }

    /// Indices wrap around the end of the buffer without losing order.
    #[test]
    fn test_spsc_wraparound() {
        let queue: SpscQueue<i32, 4> = SpscQueue::new();

        push_all(&queue, &[0, 1, 2, 3]);
        assert!(queue.is_full());

        // Free two slots at the front so the next pushes wrap.
        for expected in [0, 1] {
            assert_eq!(queue.pop(), Some(expected));
        }

        push_all(&queue, &[4, 5]);
        assert!(queue.is_full());

        assert_drains_to(&queue, &[2, 3, 4, 5]);
    }

    /// `peek` shows the oldest item without consuming it.
    #[test]
    fn test_spsc_peek() {
        let queue: SpscQueue<i32, 4> = SpscQueue::new();

        assert_eq!(queue.peek(), None);

        push_all(&queue, &[42]);
        assert_eq!(queue.peek(), Some(42));
        assert_eq!(queue.len(), 1);
        assert_eq!(queue.pop(), Some(42));
        assert_eq!(queue.peek(), None);
    }

    /// `clear` empties a partially filled queue.
    #[test]
    fn test_spsc_clear() {
        let queue: SpscQueue<i32, 4> = SpscQueue::new();

        push_all(&queue, &[1, 2, 3]);
        assert_eq!(queue.len(), 3);

        queue.clear();
        assert_eq!(queue.len(), 0);
        assert!(queue.is_empty());
    }

    /// An empty queue pops the configured default value.
    #[test]
    fn test_spsc_default_value() {
        let queue: SpscQueue<i32, 4> =
            SpscQueue::with_policies(OverflowPolicy::OverwriteOldest, Some(-1));

        assert_eq!(queue.pop(), Some(-1));

        push_all(&queue, &[42]);
        assert_eq!(queue.pop(), Some(42));
        assert_eq!(queue.pop(), Some(-1));
    }

    /// The overflow policy can be read back and replaced.
    #[test]
    fn test_spsc_policy_change() {
        let mut queue: SpscQueue<i32, 2> = SpscQueue::new();
        assert_eq!(queue.overflow_policy(), OverflowPolicy::OverwriteOldest);

        queue.set_overflow_policy(OverflowPolicy::DropNewest);
        assert_eq!(queue.overflow_policy(), OverflowPolicy::DropNewest);
    }

    /// A capacity that is not a power of two is rejected at construction.
    #[test]
    #[should_panic(expected = "CAP must be a power of two")]
    fn test_spsc_invalid_capacity() {
        let _ = SpscQueue::<i32, 3>::new();
    }
}