pub use generic_array::typenum::consts;
use core::{
cell::UnsafeCell,
marker::PhantomData,
mem::{size_of, MaybeUninit, forget},
ops::{Deref, DerefMut},
ptr::NonNull,
slice::from_raw_parts,
slice::from_raw_parts_mut,
sync::atomic::{
AtomicBool, AtomicUsize,
Ordering::{Acquire, Relaxed, Release}
},
};
use generic_array::{ArrayLength, GenericArray};
use crate::{Error, Result};
pub struct BBBuffer<N: ArrayLength<u8>> (
#[doc(hidden)] pub ConstBBBuffer<GenericArray<u8, N>>,
);
unsafe impl<A> Sync for ConstBBBuffer<A> {}
impl<'a, N> BBBuffer<N>
where
N: ArrayLength<u8>,
{
pub fn try_split(&'a self) -> Result<(Producer<'a, N>, Consumer<'a, N>)> {
if self.0.already_split.swap(true, Relaxed) {
return Err(Error::AlreadySplit);
} else {
unsafe {
let mu_ptr = self.0.buf.get();
(*mu_ptr).as_mut_ptr().write_bytes(0u8, 1);
let nn1 = NonNull::new_unchecked(self as *const _ as *mut _);
let nn2 = NonNull::new_unchecked(self as *const _ as *mut _);
Ok((
Producer {
bbq: nn1,
pd: PhantomData,
},
Consumer {
bbq: nn2,
pd: PhantomData,
},
))
}
}
}
}
pub struct ConstBBBuffer<A> {
buf: UnsafeCell<MaybeUninit<A>>,
write: AtomicUsize,
read: AtomicUsize,
last: AtomicUsize,
reserve: AtomicUsize,
read_in_progress: AtomicBool,
already_split: AtomicBool,
}
impl<A> ConstBBBuffer<A> {
pub const fn new() -> Self {
Self {
buf: UnsafeCell::new(MaybeUninit::uninit()),
write: AtomicUsize::new(0),
read: AtomicUsize::new(0),
last: AtomicUsize::new(size_of::<A>()),
reserve: AtomicUsize::new(0),
read_in_progress: AtomicBool::new(false),
already_split: AtomicBool::new(false),
}
}
}
pub struct Producer<'a, N>
where
N: ArrayLength<u8>,
{
bbq: NonNull<BBBuffer<N>>,
pd: PhantomData<&'a ()>,
}
unsafe impl<'a, N> Send for Producer<'a, N>
where
N: ArrayLength<u8>
{ }
impl<'a, N> Producer<'a, N>
where
N: ArrayLength<u8>,
{
pub fn grant(&mut self, sz: usize) -> Result<GrantW<N>> {
let inner = unsafe { &self.bbq.as_ref().0 };
let write = inner.write.load(Relaxed);
if inner.reserve.load(Relaxed) != write {
return Err(Error::GrantInProgress);
}
let read = inner.read.load(Acquire);
let max = N::to_usize();
let already_inverted = write < read;
let start = if already_inverted {
if (write + sz) < read {
write
} else {
return Err(Error::InsufficientSize);
}
} else {
if write + sz <= max {
write
} else {
if sz < read {
0
} else {
return Err(Error::InsufficientSize);
}
}
};
inner.reserve.store(start + sz, Relaxed);
let c = unsafe { (*inner.buf.get()).as_mut_ptr().cast::<u8>() };
let d =
unsafe { from_raw_parts_mut(c.offset(start as isize), sz) };
Ok(GrantW { buf: d, bbq: self.bbq })
}
}
pub struct Consumer<'a, N>
where
N: ArrayLength<u8>,
{
bbq: NonNull<BBBuffer<N>>,
pd: PhantomData<&'a ()>,
}
unsafe impl<'a, N> Send for Consumer<'a, N>
where
N: ArrayLength<u8>
{ }
impl<'a, N> Consumer<'a, N>
where
N: ArrayLength<u8>,
{
pub fn read(&mut self) -> Result<GrantR<N>> {
let inner = unsafe { &self.bbq.as_ref().0 };
if inner.read_in_progress.load(Relaxed) {
return Err(Error::GrantInProgress);
}
let write = inner.write.load(Acquire);
let last = inner.last.load(Acquire);
let mut read = inner.read.load(Relaxed);
if (read == last) && (write < read) {
read = 0;
inner.read.store(0, Release);
}
let sz = if write < read {
last
} else {
write
} - read;
if sz == 0 {
return Err(Error::InsufficientSize);
}
inner.read_in_progress.store(true, Relaxed);
let c = unsafe { (*inner.buf.get()).as_ptr().cast::<u8>() };
let d = unsafe { from_raw_parts(c.offset(read as isize), sz) };
Ok(GrantR { buf: d, bbq: self.bbq })
}
}
impl<N> BBBuffer<N>
where
N: ArrayLength<u8>,
{
pub fn capacity(&self) -> usize {
N::to_usize()
}
}
impl<N> BBBuffer<N>
where
N: ArrayLength<u8>,
{
pub fn new() -> Self {
Self(
ConstBBBuffer::new(),
)
}
}
#[derive(Debug, PartialEq)]
pub struct GrantW<'a, N>
where
N: ArrayLength<u8>
{
buf: &'a mut [u8],
bbq: NonNull<BBBuffer<N>>
}
#[derive(Debug, PartialEq)]
pub struct GrantR<'a, N>
where
N: ArrayLength<u8>
{
buf: &'a [u8],
bbq: NonNull<BBBuffer<N>>
}
impl<'a, N> GrantW<'a, N>
where
N: ArrayLength<u8>
{
pub fn commit(mut self, used: usize) {
self.commit_inner(used);
forget(self);
}
#[inline(always)]
fn commit_inner(&mut self, used: usize) {
let inner = unsafe { &self.bbq.as_ref().0 };
let len = self.buf.len();
assert!(len >= used);
let write = inner.write.load(Relaxed);
inner.reserve.fetch_sub(len - used, Relaxed);
let max = N::to_usize();
let last = inner.last.load(Relaxed);
if (inner.reserve.load(Relaxed) < write) && (write != max) {
inner.last.store(write, Release);
} else if write > last {
inner.last.store(max, Release);
}
inner.write.store(inner.reserve.load(Relaxed), Release);
}
}
impl<'a, N> GrantR<'a, N>
where
N: ArrayLength<u8>
{
pub fn release(mut self, used: usize) {
self.release_inner(used);
forget(self);
}
#[inline(always)]
fn release_inner(&mut self, used: usize) {
let inner = unsafe { &self.bbq.as_ref().0 };
assert!(used <= self.buf.len());
let _ = inner.read.fetch_add(used, Release);
inner.read_in_progress.store(false, Relaxed);
}
}
impl<'a, N> Drop for GrantW<'a, N>
where
N: ArrayLength<u8>,
{
fn drop(&mut self) {
self.commit_inner(0)
}
}
impl<'a, N> Drop for GrantR<'a, N>
where
N: ArrayLength<u8>,
{
fn drop(&mut self) {
self.release_inner(0)
}
}
impl<'a, N> Deref for GrantW<'a, N>
where
N: ArrayLength<u8>,
{
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.buf
}
}
impl<'a, N> DerefMut for GrantW<'a, N>
where
N: ArrayLength<u8>,
{
fn deref_mut(&mut self) -> &mut [u8] {
self.buf
}
}
impl<'a, N> Deref for GrantR<'a, N>
where
N: ArrayLength<u8>,
{
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.buf
}
}