use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
pub struct SpscRingBuffer<T: Send> {
slots: Arc<Mutex<Vec<Option<T>>>>,
cap: usize,
mask: usize,
head: Arc<AtomicUsize>,
tail: Arc<AtomicUsize>,
}
impl<T: Send> SpscRingBuffer<T> {
pub fn new(capacity: usize) -> Self {
let cap = (capacity.max(1) + 1).next_power_of_two();
let mut slots = Vec::with_capacity(cap);
for _ in 0..cap {
slots.push(None);
}
Self {
slots: Arc::new(Mutex::new(slots)),
cap,
mask: cap - 1,
head: Arc::new(AtomicUsize::new(0)),
tail: Arc::new(AtomicUsize::new(0)),
}
}
pub fn try_push(&self, value: T) -> Result<(), T> {
let head = self.head.load(Ordering::Relaxed);
let next_head = (head + 1) & self.mask;
if next_head == self.tail.load(Ordering::Acquire) {
return Err(value);
}
{
let mut guard = self
.slots
.lock()
.unwrap_or_else(|poison| poison.into_inner());
guard[head] = Some(value);
}
self.head.store(next_head, Ordering::Release);
Ok(())
}
pub fn try_pop(&self) -> Option<T> {
let tail = self.tail.load(Ordering::Relaxed);
if tail == self.head.load(Ordering::Acquire) {
return None;
}
let value = {
let mut guard = self
.slots
.lock()
.unwrap_or_else(|poison| poison.into_inner());
guard[tail].take()
};
let next_tail = (tail + 1) & self.mask;
self.tail.store(next_tail, Ordering::Release);
value
}
pub fn is_empty(&self) -> bool {
self.tail.load(Ordering::Acquire) == self.head.load(Ordering::Acquire)
}
pub fn is_full(&self) -> bool {
let head = self.head.load(Ordering::Acquire);
let tail = self.tail.load(Ordering::Acquire);
((head + 1) & self.mask) == tail
}
pub fn len(&self) -> usize {
let head = self.head.load(Ordering::Acquire);
let tail = self.tail.load(Ordering::Acquire);
(head.wrapping_sub(tail)) & self.mask
}
pub fn capacity(&self) -> usize {
self.cap - 1
}
}
pub struct SpscProducer<T: Send> {
pub(crate) inner: Arc<SpscRingBuffer<T>>,
}
pub struct SpscConsumer<T: Send> {
pub(crate) inner: Arc<SpscRingBuffer<T>>,
}
impl<T: Send> SpscProducer<T> {
pub fn try_send(&self, value: T) -> Result<(), T> {
self.inner.try_push(value)
}
}
impl<T: Send> SpscConsumer<T> {
pub fn try_recv(&self) -> Option<T> {
self.inner.try_pop()
}
}
pub fn spsc_channel<T: Send>(capacity: usize) -> (SpscProducer<T>, SpscConsumer<T>) {
let ring = Arc::new(SpscRingBuffer::new(capacity));
(
SpscProducer {
inner: Arc::clone(&ring),
},
SpscConsumer { inner: ring },
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_spsc_basic() {
let (tx, rx) = spsc_channel::<u32>(16);
for i in 0u32..10 {
tx.try_send(i).expect("send should succeed");
}
for i in 0u32..10 {
assert_eq!(rx.try_recv(), Some(i), "item {i} out of order");
}
assert_eq!(rx.try_recv(), None, "queue should be empty after draining");
}
#[test]
fn test_spsc_full_boundary() {
let (tx, rx) = spsc_channel::<u32>(4);
let cap = tx.inner.capacity();
assert!(cap >= 4, "usable capacity must be at least 4, got {cap}");
for i in 0..cap {
tx.try_send(i as u32)
.unwrap_or_else(|_| panic!("send {i} should succeed within capacity"));
}
assert!(
tx.try_send(99).is_err(),
"push to full buffer must return Err"
);
let first = rx.try_recv().expect("first pop should succeed");
assert_eq!(first, 0);
tx.try_send(99).expect("send after pop should succeed");
}
#[test]
fn test_spsc_empty_boundary() {
let (_tx, rx) = spsc_channel::<i64>(8);
assert_eq!(rx.try_recv(), None, "pop from empty queue must be None");
}
#[test]
fn test_spsc_concurrent() {
use std::thread;
const N: u64 = 10_000;
let (tx, rx) = spsc_channel::<u64>(64);
let producer = thread::spawn(move || {
let mut i: u64 = 0;
while i < N {
if tx.try_send(i).is_ok() {
i += 1;
} else {
thread::yield_now();
}
}
});
let consumer = thread::spawn(move || {
let mut received: Vec<u64> = Vec::with_capacity(N as usize);
while received.len() < N as usize {
if let Some(v) = rx.try_recv() {
received.push(v);
} else {
thread::yield_now();
}
}
received
});
producer.join().expect("producer thread panicked");
let received = consumer.join().expect("consumer thread panicked");
assert_eq!(received.len(), N as usize);
for (idx, &val) in received.iter().enumerate() {
assert_eq!(val, idx as u64, "item at position {idx} is out of order");
}
}
}