use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use super::{OverflowPolicy, QueueError, QueueResult, QueueStatsSnapshot, RtQueueBase};
use crate::buffer::AtomicCell;
/// Fixed-capacity ring-buffer queue holding up to `CAP` values of a `Copy` type.
///
/// All bookkeeping (`head`, `tail`, `full`) lives in atomics so the queue can be
/// driven through `&self`. The name suggests a single-producer/single-consumer
/// usage contract; nothing in the type itself enforces that — NOTE(review):
/// confirm the intended threading contract with the callers.
///
/// The struct is laid out with C field order and 64-byte alignment
/// (presumably to match a common cache-line size — TODO confirm).
#[repr(C, align(64))]
pub struct SpscQueue<T: Copy, const CAP: usize> {
/// Slot storage; one cell per queue position.
buffer: [AtomicCell<T>; CAP],
/// Index of the slot the next `push` writes to.
head: AtomicUsize,
/// Index of the slot the next `pop` / `peek` reads from.
tail: AtomicUsize,
/// Set by `push` when `head` catches up with `tail`, cleared by `pop`.
/// Disambiguates "full" from "empty", which both have `head == tail`.
full: AtomicBool,
/// `CAP - 1`; ANDed with an index to wrap it around the buffer.
mask: usize,
/// What `push` does when the queue is already full.
overflow_policy: OverflowPolicy,
/// Value `pop` returns when the queue is empty (`None` means `pop` returns `None`).
default_value: Option<T>,
}
impl<T: Copy + Default, const CAP: usize> SpscQueue<T, CAP> {
    /// Creates an empty queue with every slot initialised to `T::default()`.
    ///
    /// The queue starts with [`OverflowPolicy::OverwriteOldest`] and no default
    /// pop value.
    ///
    /// # Panics
    ///
    /// Panics if `CAP` is not a power of two (this includes `CAP == 0`), because
    /// indices are wrapped with the bit mask `CAP - 1`.
    pub fn new() -> Self {
        assert!(CAP.is_power_of_two(), "CAP must be a power of two");
        let buffer = std::array::from_fn(|_| AtomicCell::new(T::default()));
        Self {
            buffer,
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
            full: AtomicBool::new(false),
            mask: CAP - 1,
            overflow_policy: OverflowPolicy::OverwriteOldest,
            default_value: None,
        }
    }

    /// Creates an empty queue with an explicit overflow policy and an optional
    /// value for [`pop`](Self::pop) to return while the queue is empty.
    ///
    /// # Panics
    ///
    /// Panics if `CAP` is not a power of two (see [`new`](Self::new)).
    pub fn with_policies(overflow_policy: OverflowPolicy, default_value: Option<T>) -> Self {
        let mut queue = Self::new();
        queue.overflow_policy = overflow_policy;
        queue.default_value = default_value;
        queue
    }

    /// Appends `value` at the head of the queue.
    ///
    /// When the queue is already full the configured overflow policy decides:
    ///
    /// * `OverwriteOldest` – the oldest element is discarded to make room.
    /// * `DropNewest` – `value` is rejected.
    /// * `Block` – `value` is rejected as well; this method never waits.
    /// * `Panic` – the call panics.
    ///
    /// # Errors
    ///
    /// Returns [`QueueError::QueueFull`] when the queue is full and the policy
    /// is `DropNewest` or `Block`.
    ///
    /// # Panics
    ///
    /// Panics when the queue is full and the policy is `Panic`.
    pub fn push(&self, value: T) -> QueueResult<()> {
        let head = self.head.load(Ordering::Relaxed);
        let next_head = (head + 1) & self.mask;

        if self.full.load(Ordering::Acquire) {
            match self.overflow_policy {
                OverflowPolicy::OverwriteOldest => {
                    // Drop the oldest element by stepping `tail` forward. The
                    // stored index must be wrapped with the mask: an unmasked
                    // increment lets `tail` reach `CAP`, after which the
                    // `next_head == tail` check below can never match and
                    // `pop` / `peek` index past the end of `buffer`.
                    let _ = self.tail.fetch_update(
                        Ordering::Release,
                        Ordering::Relaxed,
                        |tail| Some((tail + 1) & self.mask),
                    );
                    self.full.store(false, Ordering::Release);
                }
                OverflowPolicy::DropNewest => {
                    return Err(QueueError::QueueFull);
                }
                OverflowPolicy::Panic => {
                    panic!("SpscQueue overflow (capacity: {})", CAP);
                }
                OverflowPolicy::Block => {
                    // No waiting is implemented here; report the condition and
                    // leave any retry strategy to the caller.
                    return Err(QueueError::QueueFull);
                }
            }
        }

        self.buffer[head].store(value);
        self.head.store(next_head, Ordering::Release);

        // The write index caught up with the read index: the buffer is full.
        if next_head == self.tail.load(Ordering::Acquire) {
            self.full.store(true, Ordering::Release);
        }
        Ok(())
    }

    /// Removes and returns the oldest element.
    ///
    /// When the queue is empty this returns the configured default value,
    /// which is `None` unless one was supplied through
    /// [`with_policies`](Self::with_policies) or [`set_default`](Self::set_default).
    /// With a default configured, an empty queue therefore yields
    /// `Some(default)`, which cannot be told apart from a real element.
    pub fn pop(&self) -> Option<T> {
        if self.is_empty() {
            return self.default_value;
        }
        let tail = self.tail.load(Ordering::Relaxed);
        let value = self.buffer[tail].load();
        let next_tail = (tail + 1) & self.mask;
        self.tail.store(next_tail, Ordering::Release);
        // A slot was just freed, so the queue can no longer be full.
        self.full.store(false, Ordering::Release);
        Some(value)
    }

    /// Returns the oldest element without removing it, or `None` when the
    /// queue is empty. Unlike [`pop`](Self::pop), the default value is never
    /// substituted.
    pub fn peek(&self) -> Option<T> {
        if self.is_empty() {
            None
        } else {
            let tail = self.tail.load(Ordering::Acquire);
            Some(self.buffer[tail].load())
        }
    }

    /// Returns the number of elements currently stored.
    pub fn len(&self) -> usize {
        if self.full.load(Ordering::Acquire) {
            CAP
        } else {
            let head = self.head.load(Ordering::Acquire);
            let tail = self.tail.load(Ordering::Acquire);
            if head >= tail {
                head - tail
            } else {
                // `head` has wrapped around the end of the buffer, `tail` has not.
                CAP - tail + head
            }
        }
    }

    /// Returns the fixed capacity `CAP`.
    pub const fn capacity(&self) -> usize {
        CAP
    }

    /// Returns `true` when the queue holds no elements, i.e. the read and
    /// write indices coincide and the `full` flag is not set.
    pub fn is_empty(&self) -> bool {
        !self.full.load(Ordering::Acquire)
            && self.head.load(Ordering::Acquire) == self.tail.load(Ordering::Acquire)
    }

    /// Returns `true` when the `full` flag is set.
    pub fn is_full(&self) -> bool {
        self.full.load(Ordering::Acquire)
    }

    /// Resets both indices and the `full` flag so the queue reads as empty.
    ///
    /// Slot contents are left untouched. The three stores are individually
    /// `Relaxed` and not performed as one atomic step — NOTE(review): callers
    /// presumably must not run this concurrently with `push` / `pop`; confirm.
    pub fn clear(&self) {
        self.head.store(0, Ordering::Relaxed);
        self.tail.store(0, Ordering::Relaxed);
        self.full.store(false, Ordering::Relaxed);
    }

    /// Returns a default-constructed statistics snapshot.
    ///
    /// This type keeps no counters of its own, so the snapshot never reflects
    /// actual queue activity.
    pub fn stats(&self) -> QueueStatsSnapshot {
        QueueStatsSnapshot::default()
    }

    /// Sets the value [`pop`](Self::pop) returns while the queue is empty.
    pub fn set_default(&mut self, value: T) {
        self.default_value = Some(value);
    }

    /// Returns the policy applied when pushing into a full queue.
    pub fn overflow_policy(&self) -> OverflowPolicy {
        self.overflow_policy
    }

    /// Replaces the policy applied when pushing into a full queue.
    pub fn set_overflow_policy(&mut self, policy: OverflowPolicy) {
        self.overflow_policy = policy;
    }
}
/// Exposes the queue through the shared `RtQueueBase` interface.
///
/// Every method forwards to the inherent method of the same name. The calls
/// are spelled as explicit paths; inherent associated functions take priority
/// over trait methods during resolution, so none of these recurse.
impl<T: Copy + Default + Send + Sync, const CAP: usize> RtQueueBase<T> for SpscQueue<T, CAP> {
    /// Forwards to the inherent `push`.
    fn push(&self, value: T) -> QueueResult<()> {
        Self::push(self, value)
    }

    /// Forwards to the inherent `pop`.
    fn pop(&self) -> Option<T> {
        Self::pop(self)
    }

    /// Forwards to the inherent `len`.
    fn len(&self) -> usize {
        Self::len(self)
    }

    /// Returns the compile-time capacity `CAP`.
    fn capacity(&self) -> usize {
        Self::capacity(self)
    }

    /// Forwards to the inherent `clear`.
    fn clear(&self) {
        Self::clear(self)
    }
}
impl<T: Copy + Default + fmt::Debug, const CAP: usize> fmt::Debug for SpscQueue<T, CAP> {
    /// Writes a snapshot of the queue's bookkeeping: indices, capacity, length,
    /// overflow policy and default value. Slot contents, the wrap mask and the
    /// `full` flag are not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Sample the atomics up front, in the same order they are reported.
        let head_index = self.head.load(Ordering::Relaxed);
        let tail_index = self.tail.load(Ordering::Relaxed);
        let current_len = self.len();

        let mut out = f.debug_struct("SpscQueue");
        out.field("head", &head_index);
        out.field("tail", &tail_index);
        out.field("capacity", &CAP);
        out.field("len", &current_len);
        out.field("overflow_policy", &self.overflow_policy);
        out.field("default_value", &self.default_value);
        out.finish()
    }
}
impl<T: Copy + Default, const CAP: usize> Default for SpscQueue<T, CAP> {
fn default() -> Self {
Self::new()
}
}
// Asserts that a queue may be moved to another thread whenever `T: Send`.
// SAFETY: NOTE(review) — every field other than `buffer` is an atomic or plain
// `Copy` data; soundness therefore rests on `AtomicCell<T>` (defined in
// `crate::buffer`, not visible here) being safe to transfer between threads.
// Confirm that, and check whether the auto trait would already apply, which
// would make this manual impl unnecessary.
// The `allow` presumably overrides a crate-level `unsafe_code` lint — TODO confirm.
#[allow(unsafe_code)]
unsafe impl<T: Copy + Send, const CAP: usize> Send for SpscQueue<T, CAP> {}
// Asserts that `&SpscQueue` may be shared between threads whenever `T: Sync`.
// SAFETY: NOTE(review) — all mutation through `&self` goes via atomics and
// `AtomicCell<T>`; soundness depends on `AtomicCell::load` / `store` being safe
// under concurrent access, which cannot be verified from this file.
// NOTE(review): sharing the queue lets values of `T` travel from the pushing
// thread to the popping thread, which conventionally calls for a `T: Send`
// bound rather than `T: Sync` — confirm the bound is the intended one.
// The `allow` presumably overrides a crate-level `unsafe_code` lint — TODO confirm.
#[allow(unsafe_code)]
unsafe impl<T: Copy + Sync, const CAP: usize> Sync for SpscQueue<T, CAP> {}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fills a four-slot queue one element at a time, then drains it in FIFO order.
    #[test]
    fn test_spsc_basic() {
        let q = SpscQueue::<i32, 4>::new();
        assert!(q.is_empty());
        assert_eq!(q.capacity(), 4);
        assert_eq!(q.len(), 0);

        q.push(1).unwrap();
        assert_eq!(q.len(), 1);
        assert!(!q.is_empty());
        assert!(!q.is_full());

        for item in 2..=4 {
            q.push(item).unwrap();
        }
        assert!(q.is_full());
        assert_eq!(q.len(), 4);

        for expected in 1..=4 {
            assert_eq!(q.pop(), Some(expected));
        }
        assert_eq!(q.pop(), None);
        assert!(q.is_empty());
    }

    /// With the default policy, pushing into a full queue discards the oldest element.
    #[test]
    fn test_spsc_overwrite_policy() {
        let q = SpscQueue::<i32, 2>::new();
        for item in 1..=2 {
            q.push(item).unwrap();
        }
        assert!(q.is_full());

        q.push(3).unwrap();
        assert_eq!(q.len(), 2);

        for expected in 2..=3 {
            assert_eq!(q.pop(), Some(expected));
        }
        assert_eq!(q.pop(), None);
    }

    /// With `DropNewest`, pushing into a full queue fails and keeps the stored elements.
    #[test]
    fn test_spsc_drop_newest_policy() {
        let q = SpscQueue::<i32, 2>::with_policies(OverflowPolicy::DropNewest, None);
        for item in 1..=2 {
            q.push(item).unwrap();
        }
        assert!(q.is_full());
        assert!(q.push(3).is_err());

        for expected in 1..=2 {
            assert_eq!(q.pop(), Some(expected));
        }
        assert_eq!(q.pop(), None);
    }

    /// Indices wrap around the end of the buffer without disturbing FIFO order.
    #[test]
    fn test_spsc_wraparound() {
        let q = SpscQueue::<i32, 4>::new();
        for item in 0..4 {
            q.push(item).unwrap();
        }
        assert!(q.is_full());

        for expected in 0..2 {
            assert_eq!(q.pop(), Some(expected));
        }

        for item in 4..6 {
            q.push(item).unwrap();
        }
        assert!(q.is_full());

        for expected in 2..6 {
            assert_eq!(q.pop(), Some(expected));
        }
        assert_eq!(q.pop(), None);
    }

    /// `peek` shows the oldest element without consuming it.
    #[test]
    fn test_spsc_peek() {
        let q = SpscQueue::<i32, 4>::new();
        assert_eq!(q.peek(), None);

        q.push(42).unwrap();
        assert_eq!(q.peek(), Some(42));
        assert_eq!(q.len(), 1);

        assert_eq!(q.pop(), Some(42));
        assert_eq!(q.peek(), None);
    }

    /// `clear` makes a partially filled queue read as empty.
    #[test]
    fn test_spsc_clear() {
        let q = SpscQueue::<i32, 4>::new();
        for item in 1..=3 {
            q.push(item).unwrap();
        }
        assert_eq!(q.len(), 3);

        q.clear();
        assert_eq!(q.len(), 0);
        assert!(q.is_empty());
    }

    /// An empty queue with a configured default pops that default instead of `None`.
    #[test]
    fn test_spsc_default_value() {
        let q = SpscQueue::<i32, 4>::with_policies(OverflowPolicy::OverwriteOldest, Some(-1));
        assert_eq!(q.pop(), Some(-1));

        q.push(42).unwrap();
        assert_eq!(q.pop(), Some(42));
        assert_eq!(q.pop(), Some(-1));
    }

    /// The overflow policy can be read back and replaced after construction.
    #[test]
    fn test_spsc_policy_change() {
        let mut q = SpscQueue::<i32, 2>::new();
        assert_eq!(q.overflow_policy(), OverflowPolicy::OverwriteOldest);

        q.set_overflow_policy(OverflowPolicy::DropNewest);
        assert_eq!(q.overflow_policy(), OverflowPolicy::DropNewest);
    }

    /// Constructing a queue whose capacity is not a power of two panics.
    #[test]
    #[should_panic(expected = "CAP must be a power of two")]
    fn test_spsc_invalid_capacity() {
        let _ = SpscQueue::<i32, 3>::new();
    }
}