use core::cell::{Cell, UnsafeCell};
use core::marker::PhantomData;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
/// State shared between the [`Producer`] and the [`Consumer`] of one channel.
///
/// The ring is a fixed array of `capacity` slots. `head` and `tail` are
/// *positions* in the range `0..2 * capacity`, not free-running counters:
///
/// * a position maps to slot `pos` when `pos < capacity`, and to slot
///   `pos - capacity` otherwise;
/// * a position advances by one and wraps back to `0` after
///   `2 * capacity - 1`;
/// * the number of stored items is `(tail - head) mod (2 * capacity)`, which
///   is always in `0..=capacity`.
///
/// Using twice the capacity as the modulus lets "empty" (`head == tail`) be
/// told apart from "full" without sacrificing a slot. Because the wrap point
/// is a multiple of `capacity`, the position-to-slot mapping stays continuous
/// for every capacity. A free-running `usize` counter reduced with
/// `% capacity` does not have that property: when the counter overflows, the
/// slot sequence jumps unless `capacity` is a power of two, which would make
/// the producer overwrite an unread slot.
struct Inner<T> {
    /// Slot storage. A slot is initialised exactly when its position lies in
    /// the logical range `head..tail`.
    buffer: Box<[UnsafeCell<MaybeUninit<T>>]>,
    /// Number of slots; equal to `buffer.len()` and at most `usize::MAX / 2`.
    capacity: usize,
    /// Position of the next slot to read. Stored only by the consumer.
    head: AtomicUsize,
    /// Position of the next slot to write. Stored only by the producer.
    tail: AtomicUsize,
}

impl<T> Inner<T> {
    /// Maps a position in `0..2 * capacity` to its slot index in `0..capacity`.
    fn slot(&self, pos: usize) -> usize {
        if pos < self.capacity {
            pos
        } else {
            pos - self.capacity
        }
    }

    /// Returns the position that follows `pos`, wrapping at `2 * capacity`.
    fn advance(&self, pos: usize) -> usize {
        // `pos < 2 * capacity <= usize::MAX`, so `pos + 1` cannot overflow.
        let next = pos + 1;
        if next == 2 * self.capacity {
            0
        } else {
            next
        }
    }

    /// Number of items stored between `head` and `tail`.
    fn occupied(&self, head: usize, tail: usize) -> usize {
        if tail >= head {
            tail - head
        } else {
            // `tail` has wrapped and `head` has not yet; written in this order
            // so that no intermediate value exceeds `2 * capacity`.
            2 * self.capacity - (head - tail)
        }
    }
}

// SAFETY: the only shared mutable state is the slot array. A slot is written
// by the producer strictly before it publishes the slot with a `Release` store
// to `tail`, and read by the consumer strictly after an `Acquire` load of
// `tail` (and symmetrically through `head` when a slot is handed back), so the
// two sides never access the same slot concurrently. Values of `T` move from
// the producer's thread to the consumer's, hence `T: Send`.
unsafe impl<T: Send> Sync for Inner<T> {}

impl<T> Drop for Inner<T> {
    /// Drops every item that was pushed but never popped.
    fn drop(&mut self) {
        let tail = *self.tail.get_mut();
        let mut pos = *self.head.get_mut();
        while pos != tail {
            let idx = self.slot(pos);
            // SAFETY: `&mut self` gives exclusive access, and every position in
            // `head..tail` refers to a slot that was written by `try_push` and
            // not yet moved out by `try_pop`.
            unsafe {
                (*self.buffer[idx].get()).assume_init_drop();
            }
            pos = self.advance(pos);
        }
    }
}

/// Sending half of a bounded single-producer / single-consumer channel.
///
/// Created by [`channel`]. The handle can be moved to another thread but is
/// neither `Sync` nor `Clone`, so at most one thread can push at a time.
pub struct Producer<T> {
    inner: Arc<Inner<T>>,
    /// `Cell` is `!Sync`, which makes this handle `!Sync`: `&Producer` cannot
    /// be shared across threads, so `try_push(&self)` never races with itself.
    _not_sync: PhantomData<Cell<()>>,
}

/// Receiving half of a bounded single-producer / single-consumer channel.
///
/// Created by [`channel`]. The handle can be moved to another thread but is
/// neither `Sync` nor `Clone`, so at most one thread can pop at a time.
pub struct Consumer<T> {
    inner: Arc<Inner<T>>,
    /// `Cell` is `!Sync`, which makes this handle `!Sync`: `&Consumer` cannot
    /// be shared across threads, so `try_pop(&self)` never races with itself.
    _not_sync: PhantomData<Cell<()>>,
}

// SAFETY: a handle owns only an `Arc<Inner<T>>`; moving it to another thread
// moves the single right to push (or pop) with it, and `Inner<T>` is `Sync`
// for `T: Send`.
unsafe impl<T: Send> Send for Producer<T> {}
// SAFETY: see the `Producer` impl above; the same argument applies.
unsafe impl<T: Send> Send for Consumer<T> {}

/// Creates a bounded single-producer / single-consumer channel holding at most
/// `capacity` items.
///
/// # Panics
///
/// Panics if `capacity` is zero or greater than `usize::MAX / 2` (positions
/// are tracked modulo `2 * capacity`).
pub fn channel<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
    assert!(capacity > 0, "channel capacity must be non-zero");
    assert!(
        capacity <= usize::MAX / 2,
        "channel capacity must not exceed usize::MAX / 2"
    );
    let buffer: Box<[UnsafeCell<MaybeUninit<T>>]> = (0..capacity)
        .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
        .collect();
    let inner = Arc::new(Inner {
        buffer,
        capacity,
        head: AtomicUsize::new(0),
        tail: AtomicUsize::new(0),
    });
    let producer = Producer {
        inner: Arc::clone(&inner),
        _not_sync: PhantomData,
    };
    let consumer = Consumer {
        inner,
        _not_sync: PhantomData,
    };
    (producer, consumer)
}

impl<T> Producer<T> {
    /// Appends `value` to the channel without blocking.
    ///
    /// # Errors
    ///
    /// Returns `Err(value)`, handing the value back, when the channel already
    /// holds `capacity` items.
    pub fn try_push(&self, value: T) -> Result<(), T> {
        let inner = &*self.inner;
        // `tail` is only ever stored by this producer, so `Relaxed` suffices.
        let tail = inner.tail.load(Ordering::Relaxed);
        // `Acquire` pairs with the consumer's `Release` store to `head`: once
        // a slot is seen as free, the consumer's read of it has completed.
        let head = inner.head.load(Ordering::Acquire);
        if inner.occupied(head, tail) >= inner.capacity {
            return Err(value);
        }
        let idx = inner.slot(tail);
        // SAFETY: the slot at `tail` lies outside `head..tail`, so it holds no
        // live value and the consumer will not touch it until `tail` is
        // published below. This handle is `!Sync` and not `Clone`, so no other
        // producer exists.
        unsafe {
            (*inner.buffer[idx].get()).write(value);
        }
        // `Release` publishes the slot write above to the consumer.
        inner.tail.store(inner.advance(tail), Ordering::Release);
        Ok(())
    }

    /// Maximum number of items the channel can hold.
    pub fn capacity(&self) -> usize {
        self.inner.capacity
    }
}

impl<T> Consumer<T> {
    /// Removes and returns the oldest item without blocking, or `None` when
    /// the channel is empty.
    pub fn try_pop(&self) -> Option<T> {
        let inner = &*self.inner;
        // `head` is only ever stored by this consumer, so `Relaxed` suffices.
        let head = inner.head.load(Ordering::Relaxed);
        // `Acquire` pairs with the producer's `Release` store to `tail`, making
        // the slot contents written before that store visible here.
        let tail = inner.tail.load(Ordering::Acquire);
        if head == tail {
            return None;
        }
        let idx = inner.slot(head);
        // SAFETY: `head != tail`, so the slot at `head` was initialised by
        // `try_push` and has not been read yet; the producer will not reuse it
        // until `head` is advanced below. Ownership of the value moves out
        // exactly once because `head` is advanced immediately afterwards.
        let value = unsafe { (*inner.buffer[idx].get()).assume_init_read() };
        // `Release` tells the producer the slot may be overwritten.
        inner.head.store(inner.advance(head), Ordering::Release);
        Some(value)
    }

    /// Maximum number of items the channel can hold.
    pub fn capacity(&self) -> usize {
        self.inner.capacity
    }
}
// NOTE(review): this function has an empty body and no visible caller. Its
// name suggests it exists only as an item to attach `compile_fail` doctests to
// (for example, proving the channel handles cannot be shared across threads),
// but no doc comment is present here — confirm whether those doctests were
// lost and restore them, or remove the function.
// `dead_code` is allowed because the function is intentionally never called.
#[allow(dead_code)]
fn _doc_compile_fail_anchors() {}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc as StdArc;
    use std::thread;

    /// Items come out in the order they were pushed.
    #[test]
    fn push_pop_preserves_fifo_order() {
        let (tx, rx) = channel::<u32>(4);
        for i in 0..4 {
            tx.try_push(i).unwrap();
        }
        let mut received = Vec::new();
        while let Some(v) = rx.try_pop() {
            received.push(v);
        }
        assert_eq!(received, vec![0, 1, 2, 3]);
    }

    /// A push into a full channel hands the value back.
    #[test]
    fn push_fails_when_full() {
        let (tx, _rx) = channel::<u8>(3);
        assert!(tx.try_push(1).is_ok());
        assert!(tx.try_push(2).is_ok());
        assert!(tx.try_push(3).is_ok());
        assert_eq!(tx.try_push(4), Err(4));
    }

    /// After a rejected push, popping one item frees exactly one slot.
    #[test]
    fn push_succeeds_again_after_pop_frees_a_slot() {
        let (tx, rx) = channel::<u8>(3);
        for i in 1..=3 {
            tx.try_push(i).unwrap();
        }
        assert_eq!(tx.try_push(4), Err(4));
        assert_eq!(rx.try_pop(), Some(1));
        assert!(tx.try_push(4).is_ok());
        assert_eq!(tx.try_push(5), Err(5));
        assert_eq!(rx.try_pop(), Some(2));
        assert_eq!(rx.try_pop(), Some(3));
        assert_eq!(rx.try_pop(), Some(4));
        assert_eq!(rx.try_pop(), None);
    }

    /// Popping from a fresh channel yields nothing.
    #[test]
    fn pop_returns_none_when_empty() {
        let (_tx, rx) = channel::<u8>(3);
        assert_eq!(rx.try_pop(), None);
    }

    /// Many single push/pop cycles walk the indices around the ring repeatedly.
    #[test]
    fn alternating_push_pop_handles_wrap_around() {
        let (tx, rx) = channel::<usize>(4);
        for i in 0..1_000 {
            tx.try_push(i).unwrap();
            assert_eq!(rx.try_pop(), Some(i));
        }
        assert_eq!(rx.try_pop(), None);
    }

    /// Same as above, but with a capacity that is not a power of two and with
    /// bursts of varying size so the stored range straddles the ring boundary.
    #[test]
    fn non_power_of_two_capacity_handles_wrap_around() {
        let (tx, rx) = channel::<usize>(3);
        let mut next_in = 0;
        let mut next_out = 0;
        for round in 0..500 {
            let burst = round % 3 + 1;
            for _ in 0..burst {
                tx.try_push(next_in).unwrap();
                next_in += 1;
            }
            for _ in 0..burst {
                assert_eq!(rx.try_pop(), Some(next_out));
                next_out += 1;
            }
        }
        assert_eq!(rx.try_pop(), None);
    }

    /// Both handles report the capacity the channel was created with.
    #[test]
    fn capacity_reported_through_both_ends() {
        let (tx, rx) = channel::<u8>(7);
        assert_eq!(tx.capacity(), 7);
        assert_eq!(rx.capacity(), 7);
    }

    /// A zero-capacity channel is rejected at construction.
    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        let _ = channel::<u8>(0);
    }

    /// One producer thread and one consumer thread transfer every item, in
    /// order, through a small ring.
    #[test]
    fn concurrent_push_pop_preserves_order_and_count() {
        const ITEMS: usize = 50_000;
        let (tx, rx) = channel::<usize>(16);
        let producer = thread::spawn(move || {
            let mut i = 0;
            while i < ITEMS {
                if tx.try_push(i).is_ok() {
                    i += 1;
                } else {
                    // Ring is full: let the consumer run instead of spinning
                    // through the rest of this thread's time slice.
                    thread::yield_now();
                }
            }
        });
        let mut received = Vec::with_capacity(ITEMS);
        while received.len() < ITEMS {
            match rx.try_pop() {
                Some(v) => received.push(v),
                // Ring is empty: give the producer a chance to run.
                None => thread::yield_now(),
            }
        }
        producer.join().unwrap();
        assert_eq!(received.len(), ITEMS);
        for (i, v) in received.iter().enumerate() {
            assert_eq!(*v, i, "out-of-order item at position {i}");
        }
    }

    /// Increments the shared counter each time an instance is dropped.
    #[derive(Debug)]
    struct DropCounter(StdArc<AtomicUsize>);

    impl Drop for DropCounter {
        fn drop(&mut self) {
            self.0.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Items still in the ring when both handles go away are dropped.
    #[test]
    fn drop_runs_for_un_popped_items() {
        let counter = StdArc::new(AtomicUsize::new(0));
        {
            let (tx, _rx) = channel::<DropCounter>(8);
            for _ in 0..5 {
                tx.try_push(DropCounter(StdArc::clone(&counter))).unwrap();
            }
        }
        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }

    /// Popped items drop when the caller drops them; the remainder drop with
    /// the channel; nothing is dropped twice.
    #[test]
    fn every_item_drops_exactly_once_across_pop_and_inner_drop() {
        let counter = StdArc::new(AtomicUsize::new(0));
        {
            let (tx, rx) = channel::<DropCounter>(8);
            for _ in 0..4 {
                tx.try_push(DropCounter(StdArc::clone(&counter))).unwrap();
            }
            for _ in 0..3 {
                let _ = rx.try_pop().expect("queue should not be empty");
            }
            assert_eq!(
                counter.load(Ordering::SeqCst),
                3,
                "popped items must drop on pop"
            );
        }
        assert_eq!(
            counter.load(Ordering::SeqCst),
            4,
            "every item must drop exactly once across pop + Inner::drop"
        );
    }

    /// Compile-time check that both handles can be moved across threads.
    #[test]
    fn producer_is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<Producer<u32>>();
        assert_send::<Consumer<u32>>();
    }
}