use std::{
alloc,
cmp::min,
mem,
num::Wrapping,
ptr::NonNull,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
vec::Drain,
};
/// Fixed-capacity ring buffer storage shared by the producer and consumer ends.
///
/// `w_idx` and `r_idx` are free-running counters that wrap around on overflow;
/// they are reduced to a slot position with `msk` only when a slot is accessed.
/// The slots covered by the counter range `r_idx..w_idx` hold initialized values.
struct Cbuf<T> {
    /// Start of the slot array (dangling when `T` is zero-sized).
    buf: NonNull<T>,
    /// Number of slots; always a power of two.
    cap: usize,
    /// `cap - 1`, used to turn a counter into a slot position.
    msk: usize,
    /// Counter of items written so far.
    w_idx: AtomicUsize,
    /// Counter of items read so far.
    r_idx: AtomicUsize,
}

impl<T> Cbuf<T> {
    /// Allocates an empty buffer with room for `capacity` items.
    ///
    /// No memory is allocated when `T` is zero-sized.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is not a power of two (this includes zero) or if
    /// the total size of the slot array cannot be described by a `Layout`.
    /// Allocation failure is reported through `alloc::handle_alloc_error`.
    fn new(capacity: usize) -> Cbuf<T> {
        if !capacity.is_power_of_two() {
            panic!("capacity not a power of two")
        }
        let buf = if mem::size_of::<T>() == 0 {
            NonNull::dangling()
        } else {
            let layout = alloc::Layout::array::<T>(capacity)
                .expect("ring buffer size exceeds the maximum allocation size");
            // SAFETY: `T` is not zero-sized and `capacity >= 1`, so `layout`
            // has a non-zero size as `alloc::alloc` requires.
            let raw = unsafe { alloc::alloc(layout) } as *mut T;
            match NonNull::new(raw) {
                Some(ptr) => ptr,
                // Use the standard out-of-memory path instead of an
                // uninformative `unwrap` panic.
                None => alloc::handle_alloc_error(layout),
            }
        };
        Cbuf {
            buf,
            cap: capacity,
            msk: capacity - 1,
            w_idx: AtomicUsize::new(0),
            r_idx: AtomicUsize::new(0),
        }
    }
}

impl<T> Drop for Cbuf<T> {
    /// Drops every item that was written but never read, then frees the slots.
    fn drop(&mut self) {
        // `&mut self` guarantees exclusive access, so the counters can be read
        // without any atomic operation.
        let w_idx = Wrapping(*self.w_idx.get_mut());
        let mut r_idx = Wrapping(*self.r_idx.get_mut());

        // Zero-sized types may still implement `Drop`, so pending items are
        // dropped regardless of the size of `T`.
        if mem::needs_drop::<T>() {
            while r_idx != w_idx {
                // SAFETY: every slot in `r_idx..w_idx` was initialized by a
                // write and has not been moved out by a read; the masked
                // position is always below `cap`.
                unsafe {
                    self.buf.as_ptr().add(r_idx.0 & self.msk).drop_in_place();
                }
                r_idx += Wrapping(1);
            }
        }

        // Nothing was allocated for zero-sized types.
        if mem::size_of::<T>() != 0 {
            let layout = alloc::Layout::array::<T>(self.cap)
                .expect("layout was valid when the buffer was allocated");
            // SAFETY: `buf` was returned by `alloc::alloc` with this exact
            // layout in `Cbuf::new` and has not been freed since.
            unsafe { alloc::dealloc(self.buf.as_ptr() as *mut u8, layout) }
        }
    }
}
/// Producer end of the circular buffer: the handle used to put items in.
///
/// Created together with its matching [`ConsEnd`] by [`new_scbuf`].
pub struct ProdEnd<T> {
/// Storage shared with the consumer end.
cbuf: Arc<Cbuf<T>>,
}
/// Consumer end of the circular buffer: the handle used to take items out.
///
/// Created together with its matching [`ProdEnd`] by [`new_scbuf`].
pub struct ConsEnd<T> {
/// Storage shared with the producer end.
cbuf: Arc<Cbuf<T>>,
}
/// Creates a circular buffer holding up to `capacity` items and returns its
/// producer and consumer ends, which share the same underlying storage.
///
/// # Panics
///
/// Panics if `capacity` is not a power of two.
pub fn new_scbuf<T>(capacity: usize) -> (ProdEnd<T>, ConsEnd<T>) {
    let shared = Arc::new(Cbuf::<T>::new(capacity));
    let producer = ProdEnd {
        cbuf: Arc::clone(&shared),
    };
    // The consumer takes over the original handle; both ends keep the
    // storage alive until the last of them is dropped.
    let consumer = ConsEnd { cbuf: shared };
    (producer, consumer)
}
impl<T> ProdEnd<T> {
    /// Returns the number of free slots together with the current write index.
    fn space(&self) -> (usize, Wrapping<usize>) {
        let cbuf = self.cbuf.as_ref();
        // The write index is only ever stored through this end, so reading it
        // back needs no synchronization.
        let w_idx = Wrapping(cbuf.w_idx.load(Ordering::Relaxed));
        // Acquire pairs with the consumer's Release store of `r_idx`: the
        // consumer's read of a slot must happen-before this end overwrites
        // that slot, otherwise the two accesses would be a data race.
        let r_idx = Wrapping(cbuf.r_idx.load(Ordering::Acquire));
        (cbuf.cap - (w_idx - r_idx).0, w_idx)
    }

    /// Appends `item` to the buffer.
    ///
    /// Returns `None` when the item was stored. When the buffer is full the
    /// item is handed back unchanged as `Some(item)`.
    pub fn push(&mut self, item: T) -> Option<T> {
        let cbuf = self.cbuf.as_ref();
        let (space, w_idx) = self.space();
        if space == 0 {
            return Some(item);
        }
        // SAFETY: `space > 0` means the slot at `w_idx` is not occupied by an
        // unread item, and the masked position is always below `cap`.
        unsafe {
            cbuf.buf.as_ptr().add(w_idx.0 & cbuf.msk).write(item);
        }
        // Release publishes the slot contents before the new index is visible.
        cbuf.w_idx.store((w_idx + Wrapping(1)).0, Ordering::Release);
        None
    }

    /// Moves items from `data` into the buffer until either the buffer is
    /// full or `data` is exhausted.
    ///
    /// Items that do not fit are left in `data`.
    pub fn write(&mut self, data: &mut Drain<'_, T>) {
        let cbuf = self.cbuf.as_ref();
        let (space, mut w_idx) = self.space();
        let count = min(space, data.len());
        for item in data.by_ref().take(count) {
            // SAFETY: at most `space` items are written, so every targeted
            // slot is free, and the masked position is always below `cap`.
            unsafe {
                cbuf.buf.as_ptr().add(w_idx.0 & cbuf.msk).write(item);
            }
            w_idx += Wrapping(1);
        }
        // A single Release store publishes the whole batch at once.
        cbuf.w_idx.store(w_idx.0, Ordering::Release);
    }
}
impl<T> ConsEnd<T> {
    /// Reports how many items are waiting to be consumed, together with the
    /// current read index.
    fn fill(&self) -> (usize, Wrapping<usize>) {
        let ring = &*self.cbuf;
        let head = ring.r_idx.load(Ordering::Relaxed);
        // Acquire makes the slot contents published before the write index
        // was stored visible to the reads that follow.
        let tail = ring.w_idx.load(Ordering::Acquire);
        (tail.wrapping_sub(head), Wrapping(head))
    }

    /// Removes and returns the oldest item, or `None` if the buffer is empty.
    pub fn pop(&mut self) -> Option<T> {
        match self.fill() {
            (0, _) => None,
            (_, head) => {
                let ring = &*self.cbuf;
                // SAFETY: a non-zero fill level means the slot at `head` holds
                // an initialized item; the masked position is below `cap`.
                let item = unsafe { ring.buf.as_ptr().add(head.0 & ring.msk).read() };
                // Release hands the now-vacant slot back to the producer.
                ring.r_idx.store(head.0.wrapping_add(1), Ordering::Release);
                Some(item)
            }
        }
    }

    /// Moves buffered items into `data` without growing it: at most
    /// `data.capacity() - data.len()` items are transferred.
    pub fn read(&mut self, data: &mut Vec<T>) {
        let (available, head) = self.fill();
        let ring = &*self.cbuf;
        let spare = data.capacity() - data.len();
        let count = min(available, spare);
        for step in 0..count {
            let slot = (head + Wrapping(step)).0 & ring.msk;
            // SAFETY: `step < available`, so the slot holds an initialized
            // item that has not been read yet.
            data.push(unsafe { ring.buf.as_ptr().add(slot).read() });
        }
        // Publish the whole batch of vacated slots with one Release store.
        ring.r_idx
            .store((head + Wrapping(count)).0, Ordering::Release);
    }
}
/// Iterating over a consumer end pops buffered items one at a time.
impl<T> Iterator for ConsEnd<T> {
    type Item = T;

    /// Yields the oldest buffered item, or `None` when the buffer is
    /// currently empty.
    fn next(&mut self) -> Option<T> {
        Self::pop(self)
    }
}
// SAFETY: items are moved into the buffer by one end and moved out by the
// other, so an item may end up on a different thread than the one that
// created it. That is only sound when `T: Send`; without the bound, safe code
// could ship `!Send` values (e.g. `Rc`) across threads. Items are never
// accessed by shared reference from two threads, so `T: Sync` is not needed.
unsafe impl<T: Send> Send for Cbuf<T> {}
unsafe impl<T: Send> Sync for Cbuf<T> {}
// Unit tests are kept in the separate `tests` module and are compiled only
// for test builds.
#[cfg(test)]
mod tests;