use crate::{Error, Result};
use core::{
cell::UnsafeCell,
cmp::min,
marker::PhantomData,
mem::{forget, transmute, MaybeUninit},
ops::{Deref, DerefMut},
ptr::NonNull,
slice::from_raw_parts,
slice::from_raw_parts_mut,
sync::atomic::{
AtomicBool, AtomicUsize,
Ordering::{AcqRel, Acquire, Release},
},
};
pub use generic_array::typenum::consts;
use generic_array::{ArrayLength, GenericArray};
/// Backing storage for a lock-free SPSC queue, with the byte capacity
/// carried in the type parameter `N` (a `typenum` unsigned, see `consts`).
///
/// All runtime state lives in the inner [`ConstBBBuffer`]; this wrapper
/// exists so the capacity is a compile-time property of the type.
pub struct BBBuffer<N: ArrayLength<u8>>(
    // Hidden from docs: reachable for const/static construction, but not
    // part of the intended public API surface.
    #[doc(hidden)] pub ConstBBBuffer<GenericArray<u8, N>>,
);
// SAFETY: all cross-thread coordination goes through the atomic fields of
// `ConstBBBuffer`; the bytes behind the `UnsafeCell` are only handed out as
// disjoint producer/consumer grant ranges guarded by those atomics.
// NOTE(review): there is no `A: Sync`/`A: Send` bound here, so this asserts
// `Sync` for *any* payload type — sound for the `GenericArray<u8, N>` use
// above; confirm before instantiating with anything else.
unsafe impl<A> Sync for ConstBBBuffer<A> {}
impl<'a, N> BBBuffer<N>
where
    N: ArrayLength<u8>,
{
    /// Splits the buffer into its single `Producer` / `Consumer` pair.
    ///
    /// Succeeds at most once per buffer: the first caller atomically flips
    /// `already_split`, and every later call fails.
    ///
    /// # Errors
    /// Returns `Error::AlreadySplit` if the buffer was split before.
    pub fn try_split(&'a self) -> Result<(Producer<'a, N>, Consumer<'a, N>)> {
        // `swap(true)` claims the buffer and reports the previous value in
        // one atomic RMW, so there is no check-then-set race window.
        if self.0.already_split.swap(true, AcqRel) {
            return Err(Error::AlreadySplit);
        } else {
            unsafe {
                // Zero-fill the storage: `write_bytes(0u8, 1)` writes one
                // whole `GenericArray<u8, N>` element's worth of zero
                // bytes, turning the `MaybeUninit` contents into defined
                // memory before any slice is handed out.
                let mu_ptr = self.0.buf.get();
                (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1);
                // Both halves get a raw back-pointer to `self`. The shared
                // `&self` is cast to `*mut` only so `NonNull` can hold it;
                // the two handles coordinate exclusively via the atomics.
                let nn1 = NonNull::new_unchecked(self as *const _ as *mut _);
                let nn2 = NonNull::new_unchecked(self as *const _ as *mut _);
                Ok((
                    Producer {
                        bbq: nn1,
                        pd: PhantomData,
                    },
                    Consumer {
                        bbq: nn2,
                        pd: PhantomData,
                    },
                ))
            }
        }
    }
}
/// The shared state of a queue: the storage plus the atomic cursors that
/// implement the lock-free single-producer/single-consumer protocol.
pub struct ConstBBBuffer<A> {
    // Raw storage. Starts uninitialized (it is zeroed during `try_split`)
    // and is only ever accessed through grants guarded by the atomics below.
    buf: UnsafeCell<MaybeUninit<A>>,
    // One past the last *committed* byte; advanced only by the producer.
    write: AtomicUsize,
    // Index of the next unread byte; advanced only by the consumer.
    read: AtomicUsize,
    // High-water mark: when the producer wraps to index 0 before reaching
    // the end, `last` records where the valid data in the tail stops.
    last: AtomicUsize,
    // One past the end of the producer's outstanding (uncommitted) grant;
    // equals `write` whenever no write grant is active.
    reserve: AtomicUsize,
    // Set while a read grant is outstanding; cleared on release.
    read_in_progress: AtomicBool,
    // Set by the first successful `try_split`; never cleared.
    already_split: AtomicBool,
}
impl<A> ConstBBBuffer<A> {
    /// Creates an empty, never-split buffer in a `const` context, so a
    /// `ConstBBBuffer` can be placed in a `static`.
    ///
    /// The storage starts uninitialized (it is zeroed when the buffer is
    /// split); every cursor starts at zero and both flags start cleared.
    pub const fn new() -> Self {
        Self {
            // Deliberately uninitialized here; `try_split` zero-fills it
            // before any slice into it is created.
            buf: UnsafeCell::new(MaybeUninit::uninit()),
            // No split has happened and no read grant is outstanding.
            already_split: AtomicBool::new(false),
            read_in_progress: AtomicBool::new(false),
            // All cursors begin at index 0: empty queue, nothing reserved.
            write: AtomicUsize::new(0),
            read: AtomicUsize::new(0),
            last: AtomicUsize::new(0),
            reserve: AtomicUsize::new(0),
        }
    }
}
/// The write half of a split [`BBBuffer`]: hands out write grants.
pub struct Producer<'a, N>
where
    N: ArrayLength<u8>,
{
    // Raw back-pointer to the shared buffer this producer belongs to.
    bbq: NonNull<BBBuffer<N>>,
    // Ties this handle's lifetime to the buffer borrow taken in `try_split`.
    pd: PhantomData<&'a ()>,
}
// SAFETY: a `Producer` only writes the producer-owned state (`reserve`,
// `write`, `last`) and the byte range it has reserved, so sending it to
// another thread cannot race the single `Consumer`.
unsafe impl<'a, N> Send for Producer<'a, N> where N: ArrayLength<u8> {}
impl<'a, N> Producer<'a, N>
where
    N: ArrayLength<u8>,
{
    /// Requests a writable grant of *exactly* `sz` contiguous bytes.
    ///
    /// Only one write grant may be outstanding at a time.
    ///
    /// # Errors
    /// * `Error::GrantInProgress` — a previous write grant has not been
    ///   committed or dropped yet (`reserve != write`).
    /// * `Error::InsufficientSize` — no contiguous run of `sz` free bytes
    ///   exists, neither at the current write position nor after wrapping
    ///   to the start of the storage.
    pub fn grant_exact(&mut self, sz: usize) -> Result<GrantW<'a, N>> {
        let inner = unsafe { &self.bbq.as_ref().0 };
        let write = inner.write.load(Acquire);
        // `reserve` differs from `write` only while a grant is open.
        if inner.reserve.load(Acquire) != write {
            return Err(Error::GrantInProgress);
        }
        let read = inner.read.load(Acquire);
        let max = N::to_usize();
        // "Inverted": the producer already wrapped past the end and is now
        // filling the region below the consumer's position.
        let already_inverted = write < read;
        let start = if already_inverted {
            // Strict `<` keeps one byte of slack so that `write == read`
            // still unambiguously means "empty".
            if (write + sz) < read {
                write
            } else {
                return Err(Error::InsufficientSize);
            }
        } else {
            if write + sz <= max {
                // The request fits before the end of the storage.
                write
            } else {
                // Doesn't fit at the tail; try wrapping to index 0, again
                // keeping the one-byte gap below `read`.
                if sz < read {
                    0
                } else {
                    return Err(Error::InsufficientSize);
                }
            }
        };
        // Publish the reservation; `commit` later moves `write` up to it.
        inner.reserve.store(start + sz, Release);
        let start_of_buf_ptr = inner.buf.get().cast::<u8>();
        // SAFETY: `start..start + sz` was just checked to lie inside the
        // `max`-byte storage and not to overlap the consumer's range.
        let grant_slice =
            unsafe { from_raw_parts_mut(start_of_buf_ptr.offset(start as isize), sz) };
        Ok(GrantW {
            buf: grant_slice,
            bbq: self.bbq,
        })
    }
    /// Requests a writable grant of *up to* `sz` contiguous bytes.
    ///
    /// Unlike `grant_exact`, the request shrinks to fit: `sz` is clamped
    /// to the largest contiguous free region, so the returned grant may be
    /// smaller than asked for.
    ///
    /// # Errors
    /// * `Error::GrantInProgress` — a previous write grant is still open.
    /// * `Error::InsufficientSize` — zero contiguous bytes are free.
    pub fn grant_max_remaining(&mut self, mut sz: usize) -> Result<GrantW<'a, N>> {
        let inner = unsafe { &self.bbq.as_ref().0 };
        let write = inner.write.load(Acquire);
        // An uncommitted grant is outstanding while `reserve != write`.
        if inner.reserve.load(Acquire) != write {
            return Err(Error::GrantInProgress);
        }
        let read = inner.read.load(Acquire);
        let max = N::to_usize();
        let already_inverted = write < read;
        let start = if already_inverted {
            // Wrapped already: free space is the gap up to `read`, minus
            // one byte so `write == read` still means "empty".
            let remain = read - write - 1;
            if remain != 0 {
                sz = min(remain, sz);
                write
            } else {
                return Err(Error::InsufficientSize);
            }
        } else {
            if write != max {
                // Space remains before the end of the storage.
                sz = min(max - write, sz);
                write
            } else {
                // Exactly at the end; wrap to 0 if the reader has moved on
                // (again preserving the one-byte gap).
                if read > 1 {
                    sz = min(read - 1, sz);
                    0
                } else {
                    return Err(Error::InsufficientSize);
                }
            }
        };
        // Publish the (possibly shrunken) reservation.
        inner.reserve.store(start + sz, Release);
        let start_of_buf_ptr = inner.buf.get().cast::<u8>();
        // SAFETY: `start..start + sz` is inside the storage and disjoint
        // from the consumer's readable range, per the checks above.
        let grant_slice =
            unsafe { from_raw_parts_mut(start_of_buf_ptr.offset(start as isize), sz) };
        Ok(GrantW {
            buf: grant_slice,
            bbq: self.bbq,
        })
    }
}
/// The read half of a split [`BBBuffer`]: hands out read grants.
pub struct Consumer<'a, N>
where
    N: ArrayLength<u8>,
{
    // Raw back-pointer to the shared buffer this consumer belongs to.
    bbq: NonNull<BBBuffer<N>>,
    // Ties this handle's lifetime to the buffer borrow taken in `try_split`.
    pd: PhantomData<&'a ()>,
}
// SAFETY: a `Consumer` only writes the consumer-owned state (`read`,
// `read_in_progress`) and only reads bytes the producer has committed, so
// sending it to another thread cannot race the single `Producer`.
unsafe impl<'a, N> Send for Consumer<'a, N> where N: ArrayLength<u8> {}
impl<'a, N> Consumer<'a, N>
where
    N: ArrayLength<u8>,
{
    /// Requests a read grant over all currently committed, *contiguous*
    /// data. Data written after a wraparound becomes visible only after
    /// the pre-wrap region has been consumed.
    ///
    /// # Errors
    /// * `Error::GrantInProgress` — an earlier read grant has not been
    ///   released yet.
    /// * `Error::InsufficientSize` — no committed bytes are available.
    pub fn read(&mut self) -> Result<GrantR<'a, N>> {
        let inner = unsafe { &self.bbq.as_ref().0 };
        if inner.read_in_progress.load(Acquire) {
            return Err(Error::GrantInProgress);
        }
        let write = inner.write.load(Acquire);
        let last = inner.last.load(Acquire);
        let mut read = inner.read.load(Acquire);
        // The producer wrapped and everything up to the high-water mark
        // `last` has been consumed: follow the wrap back to index 0.
        if (read == last) && (write < read) {
            read = 0;
            inner.read.store(0, Release);
        }
        // While inverted (`write < read`) the readable region ends at the
        // high-water mark `last`; otherwise it ends at `write`.
        let sz = if write < read {
            last
        } else {
            write
        } - read;
        if sz == 0 {
            return Err(Error::InsufficientSize);
        }
        // A plain load+store (not a swap) suffices: only this single
        // `Consumer` ever sets the flag.
        inner.read_in_progress.store(true, Release);
        let start_of_buf_ptr = inner.buf.get().cast::<u8>();
        // SAFETY: `read..read + sz` covers only bytes the producer has
        // already committed, and lies inside the storage.
        let grant_slice = unsafe { from_raw_parts(start_of_buf_ptr.offset(read as isize), sz) };
        Ok(GrantR {
            buf: grant_slice,
            bbq: self.bbq,
        })
    }
}
// Consolidated from two separate `impl<N> BBBuffer<N>` blocks with
// identical bounds: one impl block per type keeps inherent methods
// discoverable in one place.
impl<N> BBBuffer<N>
where
    N: ArrayLength<u8>,
{
    /// Creates a new, unsplit buffer with zeroed cursors — the runtime
    /// counterpart of building a `static` from [`ConstBBBuffer::new`].
    pub fn new() -> Self {
        Self(ConstBBBuffer::new())
    }

    /// Returns the total capacity of the buffer in bytes (the type-level
    /// constant `N`).
    pub fn capacity(&self) -> usize {
        N::to_usize()
    }
}
/// An exclusive, contiguous writable region handed out by
/// `Producer::grant_exact` / `Producer::grant_max_remaining`.
///
/// Call [`GrantW::commit`] to publish up to `used` bytes to the consumer;
/// dropping the grant instead commits nothing and returns the whole
/// reservation to the buffer.
#[derive(Debug, PartialEq)]
pub struct GrantW<'a, N>
where
    N: ArrayLength<u8>,
{
    // The reserved byte range; points into the shared storage.
    buf: &'a mut [u8],
    // Back-pointer used by commit/drop to update the buffer's atomics.
    bbq: NonNull<BBBuffer<N>>,
}
/// A contiguous readable region of committed data handed out by
/// `Consumer::read`.
///
/// Call [`GrantR::release`] to mark up to `used` bytes as consumed;
/// dropping the grant instead releases zero bytes, leaving the data for a
/// later read.
#[derive(Debug, PartialEq)]
pub struct GrantR<'a, N>
where
    N: ArrayLength<u8>,
{
    // The readable byte range; points into the shared storage.
    buf: &'a [u8],
    // Back-pointer used by release/drop to update the buffer's atomics.
    bbq: NonNull<BBBuffer<N>>,
}
impl<'a, N> GrantW<'a, N>
where
    N: ArrayLength<u8>,
{
    /// Commits the first `used` bytes of the grant (clamped to the grant
    /// length), making them visible to the consumer, and consumes the
    /// grant.
    pub fn commit(mut self, used: usize) {
        self.commit_inner(used);
        // Skip `Drop`, which would run `commit_inner(0)` a second time.
        forget(self);
    }
    /// The writable bytes of this grant.
    pub fn buf(&mut self) -> &mut [u8] {
        self.buf
    }
    /// Extends the granted slice to the `'static` lifetime.
    ///
    /// # Safety
    /// This erases the borrow checker's oversight: the caller must ensure
    /// the slice is not used after the grant is committed/dropped, and
    /// that the underlying `BBBuffer` actually outlives every use (e.g.
    /// it lives in a `static`).
    pub unsafe fn as_static_mut_buf(&mut self) -> &'static mut [u8] {
        transmute::<&mut [u8], &'static mut [u8]>(self.buf)
    }
    #[inline(always)]
    fn commit_inner(&mut self, used: usize) {
        let inner = unsafe { &self.bbq.as_ref().0 };
        let len = self.buf.len();
        // Never commit more than was granted.
        let used = min(len, used);
        let write = inner.write.load(Acquire);
        // Hand back the unused tail of the reservation.
        inner.reserve.fetch_sub(len - used, AcqRel);
        let max = N::to_usize();
        let last = inner.last.load(Acquire);
        let new_write = inner.reserve.load(Acquire);
        if (new_write < write) && (write != max) {
            // This grant wrapped to index 0 mid-buffer: record where the
            // previous run of data stops so the reader knows to wrap too.
            inner.last.store(write, Release);
        } else if new_write > last {
            // Moved past the old high-water mark; push it to the end.
            inner.last.store(max, Release);
        }
        // Publish the new write position *after* the bookkeeping above so
        // the consumer never observes bytes before `last` is consistent.
        inner.write.store(new_write, Release);
    }
}
impl<'a, N> GrantR<'a, N>
where
    N: ArrayLength<u8>,
{
    /// Releases the first `used` bytes of the grant (clamped to the grant
    /// length) back to the producer and consumes the grant.
    pub fn release(mut self, used: usize) {
        self.release_inner(used);
        // Skip `Drop`, which would run `release_inner(0)` a second time.
        forget(self);
    }
    /// The readable bytes of this grant.
    pub fn buf(&self) -> &[u8] {
        self.buf
    }
    /// Extends the granted slice to the `'static` lifetime.
    ///
    /// # Safety
    /// This erases the borrow checker's oversight: the caller must ensure
    /// the slice is not used after the grant is released/dropped, and
    /// that the underlying `BBBuffer` actually outlives every use (e.g.
    /// it lives in a `static`).
    pub unsafe fn as_static_buf(&self) -> &'static [u8] {
        transmute::<&[u8], &'static [u8]>(self.buf)
    }
    #[inline(always)]
    fn release_inner(&mut self, used: usize) {
        let inner = unsafe { &self.bbq.as_ref().0 };
        // Never release more than was granted.
        let used = min(self.buf.len(), used);
        // Advance the read cursor past the consumed bytes...
        let _ = inner.read.fetch_add(used, Release);
        // ...then allow the next read grant to be taken.
        inner.read_in_progress.store(false, Release);
    }
}
impl<'a, N> Drop for GrantW<'a, N>
where
    N: ArrayLength<u8>,
{
    /// Dropping an uncommitted write grant commits zero bytes, returning
    /// the entire reservation to the buffer.
    fn drop(&mut self) {
        // Fully-qualified form of `self.commit_inner(0)`.
        Self::commit_inner(self, 0);
    }
}
impl<'a, N> Drop for GrantR<'a, N>
where
    N: ArrayLength<u8>,
{
    /// Dropping an unreleased read grant releases zero bytes, leaving the
    /// data in place for a later read.
    fn drop(&mut self) {
        // Fully-qualified form of `self.release_inner(0)`.
        Self::release_inner(self, 0);
    }
}
impl<'a, N> Deref for GrantW<'a, N>
where
    N: ArrayLength<u8>,
{
    type Target = [u8];

    /// Read-only view of the granted bytes.
    fn deref(&self) -> &Self::Target {
        // Reslice the stored `&mut [u8]` as a shared borrow.
        &self.buf[..]
    }
}
impl<'a, N> DerefMut for GrantW<'a, N>
where
    N: ArrayLength<u8>,
{
    /// Mutable view of the granted bytes.
    fn deref_mut(&mut self) -> &mut [u8] {
        // Explicit reborrow of the stored `&mut [u8]`.
        &mut self.buf[..]
    }
}
impl<'a, N> Deref for GrantR<'a, N>
where
    N: ArrayLength<u8>,
{
    type Target = [u8];

    /// Read-only view of the granted bytes.
    fn deref(&self) -> &Self::Target {
        // Reslice the stored `&[u8]`.
        &self.buf[..]
    }
}