use crate::buffer::MirrorBuffer;
use core::marker::PhantomData;
use core::ptr;
use core::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
#[repr(align(64))]
pub(crate) struct PaddedAtomic(pub(crate) AtomicUsize);
impl core::ops::Deref for PaddedAtomic {
type Target = AtomicUsize;
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub(crate) struct SharedState<T> {
pub(crate) buffer: MirrorBuffer,
pub(crate) head: PaddedAtomic,
pub(crate) tail: PaddedAtomic,
pub(crate) capacity: usize,
pub(crate) mask: usize,
pub(crate) _marker: PhantomData<T>,
}
unsafe impl<T: Send> Send for SharedState<T> {}
unsafe impl<T: Send> Sync for SharedState<T> {}
pub struct PicoSPSC<T> {
shared: Arc<SharedState<T>>,
}
pub struct PicoProducer<T> {
shared: Arc<SharedState<T>>,
}
pub struct PicoConsumer<T> {
shared: Arc<SharedState<T>>,
}
impl<T> PicoSPSC<T> {
pub fn new(capacity_count: usize) -> Result<Self, String> {
let item_size = core::mem::size_of::<T>();
if item_size == 0 {
return Err("Zero sized types are not supported in PicoSPSC".into());
}
let total_bytes = capacity_count
.checked_mul(item_size)
.ok_or_else(|| "Requested capacity is too large (overflow)".to_string())?;
let buffer = MirrorBuffer::new(total_bytes)?;
let actual_capacity = buffer.size() / item_size;
let mask = if actual_capacity > 0 && (actual_capacity & (actual_capacity - 1)) == 0 {
actual_capacity - 1
} else {
0
};
let shared = Arc::new(SharedState {
buffer,
head: PaddedAtomic(AtomicUsize::new(0)),
tail: PaddedAtomic(AtomicUsize::new(0)),
capacity: actual_capacity,
mask,
_marker: PhantomData,
});
Ok(Self { shared })
}
pub fn split(self) -> (PicoProducer<T>, PicoConsumer<T>) {
(
PicoProducer {
shared: self.shared.clone(),
},
PicoConsumer {
shared: self.shared,
},
)
}
}
impl<T> PicoProducer<T> {
#[inline]
fn wrap(&self, val: usize) -> usize {
if self.shared.mask != 0 {
val & self.shared.mask
} else {
val % self.shared.capacity
}
}
#[inline]
pub fn push(&self, item: T) -> bool {
let head = self.shared.head.load(Ordering::Relaxed);
let tail = self.shared.tail.load(Ordering::Acquire);
let next_head = self.wrap(head + 1);
if next_head == tail {
return false;
}
unsafe {
let ptr = (self.shared.buffer.as_mut_ptr() as *mut T).add(head);
ptr::write(ptr, item);
}
self.shared.head.store(next_head, Ordering::Release);
true
}
pub fn available_space(&self) -> usize {
let head = self.shared.head.load(Ordering::Relaxed);
let tail = self.shared.tail.load(Ordering::Acquire);
if head >= tail {
self.shared.capacity - (head - tail) - 1
} else {
tail - head - 1
}
}
#[inline]
pub fn writable_slice(&self) -> &mut [T] {
let head = self.shared.head.load(Ordering::Relaxed);
let tail = self.shared.tail.load(Ordering::Acquire);
let space = if head >= tail {
self.shared.capacity - (head - tail) - 1
} else {
tail - head - 1
};
unsafe {
let ptr = (self.shared.buffer.as_mut_ptr() as *mut T).add(head);
core::slice::from_raw_parts_mut(ptr, space)
}
}
#[inline]
pub fn advance_head(&self, n: usize) {
let head = self.shared.head.load(Ordering::Relaxed);
self.shared
.head
.store(self.wrap(head + n), Ordering::Release);
}
}
impl<T: Copy> PicoProducer<T> {
pub fn push_slice(&self, data: &[T]) -> bool {
let n = data.len();
let head = self.shared.head.load(Ordering::Relaxed);
let tail = self.shared.tail.load(Ordering::Acquire);
let current_len = if head >= tail {
head - tail
} else {
self.shared.capacity - (tail - head)
};
if self.shared.capacity - current_len - 1 < n {
return false;
}
unsafe {
let dest_ptr = (self.shared.buffer.as_mut_ptr() as *mut T).add(head);
ptr::copy_nonoverlapping(data.as_ptr(), dest_ptr, n);
}
self.shared
.head
.store(self.wrap(head + n), Ordering::Release);
true
}
}
impl<T> PicoConsumer<T> {
#[inline]
fn wrap(&self, val: usize) -> usize {
if self.shared.mask != 0 {
val & self.shared.mask
} else {
val % self.shared.capacity
}
}
#[inline]
pub fn pop(&self) -> Option<T> {
let tail = self.shared.tail.load(Ordering::Relaxed);
let head = self.shared.head.load(Ordering::Acquire);
if head == tail {
return None;
}
let item = unsafe {
let ptr = (self.shared.buffer.as_mut_ptr() as *const T).add(tail);
ptr::read(ptr)
};
self.shared
.tail
.store(self.wrap(tail + 1), Ordering::Release);
Some(item)
}
#[inline]
pub fn len(&self) -> usize {
let head = self.shared.head.load(Ordering::Acquire);
let tail = self.shared.tail.load(Ordering::Relaxed);
if head >= tail {
head - tail
} else {
self.shared.capacity - (tail - head)
}
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn readable_slice(&self) -> &[T] {
let head = self.shared.head.load(Ordering::Acquire);
let tail = self.shared.tail.load(Ordering::Relaxed);
let len = if head >= tail {
head - tail
} else {
self.shared.capacity - (tail - head)
};
unsafe {
let ptr = (self.shared.buffer.as_mut_ptr() as *const T).add(tail);
core::slice::from_raw_parts(ptr, len)
}
}
#[inline]
pub fn advance_tail(&self, n: usize) {
let tail = self.shared.tail.load(Ordering::Relaxed);
self.shared
.tail
.store(self.wrap(tail + n), Ordering::Release);
}
}
pub fn create_spsc<T>(capacity_count: usize) -> Result<(PicoProducer<T>, PicoConsumer<T>), String> {
Ok(PicoSPSC::new(capacity_count)?.split())
}
impl<T, const N: usize> crate::ring::PicoRing<T, N> {
pub fn into_spsc(self) -> (PicoProducer<T>, PicoConsumer<T>) {
let shared = Arc::new(SharedState {
buffer: self.buffer,
head: PaddedAtomic(AtomicUsize::new(self.head)),
tail: PaddedAtomic(AtomicUsize::new(self.tail)),
capacity: self.capacity,
mask: self.mask,
_marker: PhantomData,
});
(
PicoProducer {
shared: shared.clone(),
},
PicoConsumer { shared },
)
}
pub fn into_mpsc(self) -> (crate::mpsc::PicoMpscProducer<T>, crate::mpsc::PicoMpscConsumer<T>) {
let shared = Arc::new(crate::mpsc::SharedState {
buffer: self.buffer,
head: crate::mpsc::PaddedAtomic(AtomicUsize::new(self.head)),
tail: crate::mpsc::PaddedAtomic(AtomicUsize::new(self.tail)),
commit: crate::mpsc::PaddedAtomic(AtomicUsize::new(self.head)),
capacity: self.capacity,
mask: self.mask,
_marker: PhantomData,
});
(
crate::mpsc::PicoMpscProducer {
shared: shared.clone(),
},
crate::mpsc::PicoMpscConsumer { shared },
)
}
}