use crate::{
framed::{FrameConsumer, FrameProducer},
Error, Result,
};
use core::{
cell::UnsafeCell,
cmp::min,
marker::PhantomData,
mem::{forget, transmute, MaybeUninit},
ops::{Deref, DerefMut},
ptr::NonNull,
result::Result as CoreResult,
slice::from_raw_parts_mut,
sync::atomic::{
AtomicBool, AtomicUsize,
Ordering::{AcqRel, Acquire, Release},
},
};
/// A backing structure for a BBQueue. Holds the storage and the indices
/// shared between the `Producer` and `Consumer` halves.
#[derive(Debug)]
pub struct BBBuffer<const N: usize> {
    buf: UnsafeCell<MaybeUninit<[u8; N]>>,
    /// Where the next byte will be written
    write: AtomicUsize,
    /// Where the next byte will be read from
    read: AtomicUsize,
    /// In the inverted case, marks the end of the readable region;
    /// otherwise equal to the buffer size
    last: AtomicUsize,
    /// End of the region reserved by an uncommitted write grant
    reserve: AtomicUsize,
    /// Is there an active read grant?
    read_in_progress: AtomicBool,
    /// Is there an active write grant?
    write_in_progress: AtomicBool,
    /// Have the `Producer`/`Consumer` halves already been handed out?
    already_split: AtomicBool,
}
// Safety: all state shared between the halves is accessed through atomics;
// the raw storage is only exposed through non-overlapping grants
unsafe impl<const A: usize> Sync for BBBuffer<A> {}
impl<'a, const N: usize> BBBuffer<N> {
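    /// Attempt to split the `BBBuffer` into `Producer` and `Consumer` halves,
    /// granting access to the buffer. Fails if the buffer has already been split.
    ///
    /// Note that the backing storage is zeroed when splitting, which may take
    /// measurable time for large buffers.
    ///
    /// A minimal usage sketch, assuming this type is re-exported at the crate
    /// root (as in the `bbqueue` crate):
    ///
    /// ```rust
    /// use bbqueue::BBBuffer;
    ///
    /// // Create and split a new buffer
    /// let buffer: BBBuffer<6> = BBBuffer::new();
    /// let (_prod, _cons) = buffer.try_split().unwrap();
    ///
    /// // Splitting twice is an error while the halves are live
    /// assert!(buffer.try_split().is_err());
    /// ```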
pub fn try_split(&'a self) -> Result<(Producer<'a, N>, Consumer<'a, N>)> {
        // Only one Producer/Consumer pair may exist at a time
        if atomic::swap(&self.already_split, true, AcqRel) {
            return Err(Error::AlreadySplit);
        }
        unsafe {
            // Zero the backing array (a count of 1 writes one whole `[u8; N]`)
            // so that grants never hand out uninitialized memory
            let mu_ptr = self.buf.get();
            (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1);
            let nn1 = NonNull::new_unchecked(self as *const _ as *mut _);
            let nn2 = NonNull::new_unchecked(self as *const _ as *mut _);
Ok((
Producer {
bbq: nn1,
pd: PhantomData,
},
Consumer {
bbq: nn2,
pd: PhantomData,
},
))
}
}
    /// Attempt to split the `BBBuffer` into `FrameProducer` and `FrameConsumer`
    /// halves, which exchange whole framed messages rather than raw byte runs.
    pub fn try_split_framed(&'a self) -> Result<(FrameProducer<'a, N>, FrameConsumer<'a, N>)> {
let (producer, consumer) = self.try_split()?;
Ok((FrameProducer { producer }, FrameConsumer { consumer }))
}
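    /// Attempt to release the `Producer` and `Consumer` halves so the buffer
    /// can be split again later. On failure (halves from a different buffer,
    /// or a grant still outstanding) the halves are returned unchanged.
    ///
    /// A usage sketch, assuming the crate-root re-export used above:
    ///
    /// ```rust
    /// use bbqueue::BBBuffer;
    ///
    /// let buffer: BBBuffer<6> = BBBuffer::new();
    /// let (prod, cons) = buffer.try_split().unwrap();
    ///
    /// // With no grants in flight, the halves can be handed back...
    /// assert!(buffer.try_release(prod, cons).is_ok());
    ///
    /// // ...after which the buffer may be split again
    /// assert!(buffer.try_split().is_ok());
    /// ```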
pub fn try_release(
&'a self,
prod: Producer<'a, N>,
cons: Consumer<'a, N>,
) -> CoreResult<(), (Producer<'a, N>, Consumer<'a, N>)> {
        // Both halves must have come from *this* buffer
        let our_prod = prod.bbq.as_ptr() as *const Self == self;
        let our_cons = cons.bbq.as_ptr() as *const Self == self;
        if !(our_prod && our_cons) {
            return Err((prod, cons));
        }
        // Refuse to release while any read or write grant is outstanding
        let wr_in_progress = self.write_in_progress.load(Acquire);
        let rd_in_progress = self.read_in_progress.load(Acquire);
        if wr_in_progress || rd_in_progress {
            return Err((prod, cons));
        }
        // Drop the halves and reset all indices so the buffer is as-new
        drop(prod);
        drop(cons);
        self.write.store(0, Release);
        self.read.store(0, Release);
        self.reserve.store(0, Release);
        self.last.store(0, Release);
        // Mark the buffer as splittable again; this must come last
        self.already_split.store(false, Release);
Ok(())
}
    /// Attempt to release the framed halves, under the same rules as `try_release`.
    pub fn try_release_framed(
&'a self,
prod: FrameProducer<'a, N>,
cons: FrameConsumer<'a, N>,
) -> CoreResult<(), (FrameProducer<'a, N>, FrameConsumer<'a, N>)> {
self.try_release(prod.producer, cons.consumer)
.map_err(|(producer, consumer)| {
(FrameProducer { producer }, FrameConsumer { consumer })
})
}
}
impl<const A: usize> BBBuffer<A> {
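    /// Create a new, unsplit `BBBuffer`.
    ///
    /// Since this is a `const fn`, the buffer can be placed at `static` scope,
    /// as in this sketch (assuming the crate-root re-export):
    ///
    /// ```rust
    /// use bbqueue::BBBuffer;
    ///
    /// static BUF: BBBuffer<6> = BBBuffer::new();
    ///
    /// let (_prod, _cons) = BUF.try_split().unwrap();
    /// ```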
pub const fn new() -> Self {
Self {
buf: UnsafeCell::new(MaybeUninit::uninit()),
write: AtomicUsize::new(0),
read: AtomicUsize::new(0),
last: AtomicUsize::new(0),
reserve: AtomicUsize::new(0),
read_in_progress: AtomicBool::new(false),
write_in_progress: AtomicBool::new(false),
already_split: AtomicBool::new(false),
}
}
}
/// The writer half of a split `BBBuffer`. Used to obtain write grants.
pub struct Producer<'a, const N: usize> {
    bbq: NonNull<BBBuffer<N>>,
    pd: PhantomData<&'a ()>,
}

// Safety: the Producer only touches the writer-side state of the buffer,
// which is synchronized with the Consumer through atomics
unsafe impl<'a, const N: usize> Send for Producer<'a, N> {}
impl<'a, const N: usize> Producer<'a, N> {
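    /// Request a write grant of exactly `sz` contiguous bytes, failing if the
    /// space is not available or another write grant is active.
    ///
    /// A usage sketch, assuming the crate-root re-export:
    ///
    /// ```rust
    /// use bbqueue::BBBuffer;
    ///
    /// let buffer: BBBuffer<6> = BBBuffer::new();
    /// let (mut prod, _cons) = buffer.try_split().unwrap();
    ///
    /// // Successfully obtain and commit a grant of four bytes
    /// let mut grant = prod.grant_exact(4).unwrap();
    /// grant.buf().copy_from_slice(&[1, 2, 3, 4]);
    /// grant.commit(4);
    ///
    /// // A further grant of three contiguous bytes cannot fit
    /// assert!(prod.grant_exact(3).is_err());
    /// ```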
pub fn grant_exact(&mut self, sz: usize) -> Result<GrantW<'a, N>> {
        let inner = unsafe { &self.bbq.as_ref() };
        // Only one write grant may be active at a time
        if atomic::swap(&inner.write_in_progress, true, AcqRel) {
            return Err(Error::GrantInProgress);
        }
        let write = inner.write.load(Acquire);
        let read = inner.read.load(Acquire);
        let max = N;
        let already_inverted = write < read;
        let start = if already_inverted {
            if (write + sz) < read {
                // Inverted: room remains between write and read
                write
            } else {
                // Inverted: no room left
                inner.write_in_progress.store(false, Release);
                return Err(Error::InsufficientSize);
            }
        } else {
            if write + sz <= max {
                // Not inverted: the grant fits before the end of the storage
                write
            } else {
                // Not inverted, but the tail is too small: try to invert by
                // wrapping to the front. NOTE: `sz < read`, not `<=`, because
                // write must never equal read in the inverted state, or we
                // could not tell an inverted buffer from an empty one
                if sz < read {
                    0
                } else {
                    inner.write_in_progress.store(false, Release);
                    return Err(Error::InsufficientSize);
                }
            }
        };
        // Record the end of the reserved region; commit() resolves it later
        inner.reserve.store(start + sz, Release);
        let start_of_buf_ptr = inner.buf.get().cast::<u8>();
        let grant_slice =
            unsafe { from_raw_parts_mut(start_of_buf_ptr.offset(start as isize), sz) };
Ok(GrantW {
buf: grant_slice,
bbq: self.bbq,
to_commit: 0,
})
}
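    /// Request a write grant of up to `sz` contiguous bytes, shrinking the
    /// grant to whatever contiguous space is available. Fails only if no
    /// space at all is available or another write grant is active.
    ///
    /// A usage sketch, assuming the crate-root re-export:
    ///
    /// ```rust
    /// use bbqueue::BBBuffer;
    ///
    /// let buffer: BBBuffer<6> = BBBuffer::new();
    /// let (mut prod, mut cons) = buffer.try_split().unwrap();
    ///
    /// // Commit four bytes, then read and release them
    /// prod.grant_max_remaining(4).unwrap().commit(4);
    /// cons.read().unwrap().release(4);
    ///
    /// // Asking for four more yields only the two bytes left at the tail
    /// let grant = prod.grant_max_remaining(4).unwrap();
    /// assert_eq!(grant.len(), 2);
    /// ```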
pub fn grant_max_remaining(&mut self, mut sz: usize) -> Result<GrantW<'a, N>> {
        let inner = unsafe { &self.bbq.as_ref() };
        // Only one write grant may be active at a time
        if atomic::swap(&inner.write_in_progress, true, AcqRel) {
            return Err(Error::GrantInProgress);
        }
        let write = inner.write.load(Acquire);
        let read = inner.read.load(Acquire);
        let max = N;
        let already_inverted = write < read;
        let start = if already_inverted {
            // Inverted: leave one byte so write never catches up to read
            let remain = read - write - 1;
            if remain != 0 {
                sz = min(remain, sz);
                write
            } else {
                // Inverted and completely full
                inner.write_in_progress.store(false, Release);
                return Err(Error::InsufficientSize);
            }
        } else {
            if write != max {
                // Not inverted: grant whatever remains at the tail
                sz = min(max - write, sz);
                write
            } else {
                // Tail exhausted: invert, again leaving one byte of headroom
                if read > 1 {
                    sz = min(read - 1, sz);
                    0
                } else {
                    inner.write_in_progress.store(false, Release);
                    return Err(Error::InsufficientSize);
                }
            }
        };
        // Record the end of the reserved region and hand out the slice
        inner.reserve.store(start + sz, Release);
        let start_of_buf_ptr = inner.buf.get().cast::<u8>();
        let grant_slice =
            unsafe { from_raw_parts_mut(start_of_buf_ptr.offset(start as isize), sz) };
Ok(GrantW {
buf: grant_slice,
bbq: self.bbq,
to_commit: 0,
})
}
}
/// The reader half of a split `BBBuffer`. Used to obtain read grants.
pub struct Consumer<'a, const N: usize> {
    bbq: NonNull<BBBuffer<N>>,
    pd: PhantomData<&'a ()>,
}

// Safety: the Consumer only touches the reader-side state of the buffer,
// which is synchronized with the Producer through atomics
unsafe impl<'a, const N: usize> Send for Consumer<'a, N> {}
impl<'a, const N: usize> Consumer<'a, N> {
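    /// Obtain a read grant over all contiguous committed bytes, failing if the
    /// buffer is empty or another read grant is active. Data that has wrapped
    /// around requires a second `read()` (or a `split_read()`) to reach.
    ///
    /// A usage sketch, assuming the crate-root re-export:
    ///
    /// ```rust
    /// use bbqueue::BBBuffer;
    ///
    /// let buffer: BBBuffer<6> = BBBuffer::new();
    /// let (mut prod, mut cons) = buffer.try_split().unwrap();
    ///
    /// let mut grant = prod.grant_exact(4).unwrap();
    /// grant.buf().copy_from_slice(&[1, 2, 3, 4]);
    /// grant.commit(4);
    ///
    /// // The committed bytes are now visible to the consumer
    /// let grant = cons.read().unwrap();
    /// assert_eq!(grant.buf(), &[1, 2, 3, 4]);
    /// ```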
pub fn read(&mut self) -> Result<GrantR<'a, N>> {
        let inner = unsafe { &self.bbq.as_ref() };
        // Only one read grant may be active at a time
        if atomic::swap(&inner.read_in_progress, true, AcqRel) {
            return Err(Error::GrantInProgress);
        }
        let write = inner.write.load(Acquire);
        let last = inner.last.load(Acquire);
        let mut read = inner.read.load(Acquire);
        // If we reached the end of the readable region while inverted,
        // wrap the read index back to the start of the storage
        if (read == last) && (write < read) {
            read = 0;
            inner.read.store(0, Release);
        }
        let sz = if write < read {
            // Inverted: the readable region ends at `last`
            last
        } else {
            // Not inverted: the readable region ends at `write`
            write
        } - read;
        if sz == 0 {
            inner.read_in_progress.store(false, Release);
            return Err(Error::InsufficientSize);
        }
        // Hand out a slice over the committed region
        let start_of_buf_ptr = inner.buf.get().cast::<u8>();
        let grant_slice = unsafe { from_raw_parts_mut(start_of_buf_ptr.offset(read as isize), sz) };
Ok(GrantR {
buf: grant_slice,
bbq: self.bbq,
to_release: 0,
})
}
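    /// Obtain a read grant over *all* committed bytes, even when the data has
    /// wrapped around, exposing the two regions as a pair of slices.
    ///
    /// A usage sketch, assuming the crate-root re-export:
    ///
    /// ```rust
    /// use bbqueue::BBBuffer;
    ///
    /// let buffer: BBBuffer<6> = BBBuffer::new();
    /// let (mut prod, mut cons) = buffer.try_split().unwrap();
    ///
    /// // Fill the buffer, then release the first four bytes
    /// let mut grant = prod.grant_exact(6).unwrap();
    /// grant.buf().copy_from_slice(&[1, 2, 3, 4, 5, 6]);
    /// grant.commit(6);
    /// cons.read().unwrap().release(4);
    ///
    /// // The next write wraps around to the front of the storage
    /// let mut grant = prod.grant_max_remaining(3).unwrap();
    /// grant.buf().copy_from_slice(&[7, 8, 9]);
    /// grant.commit(3);
    ///
    /// // A split read yields the tail and the wrapped region together
    /// let grant = cons.split_read().unwrap();
    /// let (first, second) = grant.bufs();
    /// assert_eq!(first, &[5, 6]);
    /// assert_eq!(second, &[7, 8, 9]);
    /// ```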
pub fn split_read(&mut self) -> Result<SplitGrantR<'a, N>> {
        let inner = unsafe { &self.bbq.as_ref() };
        // Only one read grant may be active at a time
        if atomic::swap(&inner.read_in_progress, true, AcqRel) {
            return Err(Error::GrantInProgress);
        }
        let write = inner.write.load(Acquire);
        let last = inner.last.load(Acquire);
        let mut read = inner.read.load(Acquire);
        // If we reached the end of the readable region while inverted,
        // wrap the read index back to the start of the storage
        if (read == last) && (write < read) {
            read = 0;
            inner.read.store(0, Release);
        }
        let (sz1, sz2) = if write < read {
            // Inverted: the tail up to `last`, plus the wrapped region up to `write`
            (last - read, write)
        } else {
            // Not inverted: a single region up to `write`
            (write - read, 0)
        };
        if sz1 == 0 {
            inner.read_in_progress.store(false, Release);
            return Err(Error::InsufficientSize);
        }
        // Hand out slices over both committed regions
        let start_of_buf_ptr = inner.buf.get().cast::<u8>();
        let grant_slice1 =
            unsafe { from_raw_parts_mut(start_of_buf_ptr.offset(read as isize), sz1) };
        let grant_slice2 = unsafe { from_raw_parts_mut(start_of_buf_ptr, sz2) };
Ok(SplitGrantR {
buf1: grant_slice1,
buf2: grant_slice2,
bbq: self.bbq,
to_release: 0,
})
}
}
impl<const N: usize> BBBuffer<N> {
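    /// Return the total capacity of the backing storage in bytes.
    ///
    /// A usage sketch, assuming the crate-root re-export:
    ///
    /// ```rust
    /// use bbqueue::BBBuffer;
    ///
    /// let buffer: BBBuffer<6> = BBBuffer::new();
    /// assert_eq!(buffer.capacity(), 6);
    /// ```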
pub const fn capacity(&self) -> usize {
N
}
}
/// A write grant: a contiguous, mutable view into the buffer that the
/// producer fills and then commits (explicitly, or on drop via `to_commit`).
#[derive(Debug, PartialEq)]
pub struct GrantW<'a, const N: usize> {
    pub(crate) buf: &'a mut [u8],
    bbq: NonNull<BBBuffer<N>>,
    pub(crate) to_commit: usize,
}

// Safety: each grant is an exclusive view into its region of the buffer
unsafe impl<'a, const N: usize> Send for GrantW<'a, N> {}

/// A read grant: a contiguous view of committed bytes that the consumer
/// processes and then releases (explicitly, or on drop via `to_release`).
#[derive(Debug, PartialEq)]
pub struct GrantR<'a, const N: usize> {
    pub(crate) buf: &'a mut [u8],
    bbq: NonNull<BBBuffer<N>>,
    pub(crate) to_release: usize,
}

/// A read grant over a wrapped (split) region of committed bytes.
#[derive(Debug, PartialEq)]
pub struct SplitGrantR<'a, const N: usize> {
    pub(crate) buf1: &'a mut [u8],
    pub(crate) buf2: &'a mut [u8],
    bbq: NonNull<BBBuffer<N>>,
    pub(crate) to_release: usize,
}

unsafe impl<'a, const N: usize> Send for GrantR<'a, N> {}
unsafe impl<'a, const N: usize> Send for SplitGrantR<'a, N> {}
impl<'a, const N: usize> GrantW<'a, N> {
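    /// Finalize the grant, making the first `used` bytes visible to the
    /// consumer. `used` is saturated to the grant length; the unused remainder
    /// of the reservation is handed back to the buffer.
    ///
    /// A sketch of the drop-based alternative, assuming the crate-root re-export:
    ///
    /// ```rust
    /// use bbqueue::BBBuffer;
    ///
    /// let buffer: BBBuffer<6> = BBBuffer::new();
    /// let (mut prod, mut cons) = buffer.try_split().unwrap();
    ///
    /// // Mark two bytes for automatic commit, then drop the grant
    /// let mut grant = prod.grant_exact(4).unwrap();
    /// grant.to_commit(2);
    /// drop(grant);
    ///
    /// // Only the two marked bytes became visible to the consumer
    /// assert_eq!(cons.read().unwrap().len(), 2);
    /// ```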
pub fn commit(mut self, used: usize) {
self.commit_inner(used);
forget(self);
}
    /// Obtain mutable access to the granted region
    pub fn buf(&mut self) -> &mut [u8] {
        self.buf
    }

    /// Obtain the granted region with an unbounded lifetime.
    ///
    /// # Safety
    ///
    /// The caller must ensure the slice is never used after the grant has
    /// been committed, as it would then alias the consumer's view.
    pub unsafe fn as_static_mut_buf(&mut self) -> &'static mut [u8] {
        transmute::<&mut [u8], &'static mut [u8]>(self.buf)
    }
#[inline(always)]
    pub(crate) fn commit_inner(&mut self, used: usize) {
        let inner = unsafe { &self.bbq.as_ref() };
        // If there is no grant in progress, return early. This generally
        // means the grant is being dropped inside a wrapper structure
        if !inner.write_in_progress.load(Acquire) {
            return;
        }
        // Saturate the commit to the grant length, then give back the
        // unused portion of the reservation
        let len = self.buf.len();
        let used = min(len, used);
        let write = inner.write.load(Acquire);
        atomic::fetch_sub(&inner.reserve, len - used, AcqRel);
        let max = N;
        let last = inner.last.load(Acquire);
        let new_write = inner.reserve.load(Acquire);
        if (new_write < write) && (write != max) {
            // We wrapped but skipped some bytes at the end of the ring:
            // mark `last` at the old write position to fence the reader
            inner.last.store(write, Release);
        } else if new_write > last {
            // We passed the old `last`, so the previously skipped region is
            // writable again; push `last` back to the end of the ring
            inner.last.store(max, Release);
        }
        // `write` must be updated AFTER `last`, or the reader could decide
        // to invert too early
        inner.write.store(new_write, Release);
        // Allow subsequent write grants
        inner.write_in_progress.store(false, Release);
    }

    /// Set the number of bytes to commit automatically when the grant drops
    pub fn to_commit(&mut self, amt: usize) {
self.to_commit = self.buf.len().min(amt);
}
}
impl<'a, const N: usize> GrantR<'a, N> {
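    /// Finalize the grant, releasing the first `used` bytes back to the buffer
    /// for reuse. `used` is saturated to the grant length; unreleased bytes
    /// reappear in the next read grant.
    ///
    /// A usage sketch, assuming the crate-root re-export:
    ///
    /// ```rust
    /// use bbqueue::BBBuffer;
    ///
    /// let buffer: BBBuffer<6> = BBBuffer::new();
    /// let (mut prod, mut cons) = buffer.try_split().unwrap();
    ///
    /// let mut grant = prod.grant_exact(4).unwrap();
    /// grant.buf().copy_from_slice(&[1, 2, 3, 4]);
    /// grant.commit(4);
    ///
    /// // Read all four bytes but release only two
    /// let grant = cons.read().unwrap();
    /// assert_eq!(grant.buf(), &[1, 2, 3, 4]);
    /// grant.release(2);
    ///
    /// // The unreleased bytes are returned by the next read
    /// assert_eq!(cons.read().unwrap().buf(), &[3, 4]);
    /// ```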
pub fn release(mut self, used: usize) {
let used = min(self.buf.len(), used);
self.release_inner(used);
forget(self);
}
    /// Shrink the grant to its first `len` bytes, dropping the rest from view
    pub(crate) fn shrink(&mut self, len: usize) {
        let mut new_buf: &mut [u8] = &mut [];
        core::mem::swap(&mut self.buf, &mut new_buf);
        let (new, _) = new_buf.split_at_mut(len);
        self.buf = new;
    }

    /// Obtain shared access to the granted bytes
    pub fn buf(&self) -> &[u8] {
        self.buf
    }

    /// Obtain mutable access to the granted bytes, e.g. for in-place processing
    pub fn buf_mut(&mut self) -> &mut [u8] {
        self.buf
    }

    /// Obtain the granted region with an unbounded lifetime.
    ///
    /// # Safety
    ///
    /// The caller must ensure the slice is never used after the grant has
    /// been released, as the producer may then overwrite it.
    pub unsafe fn as_static_buf(&self) -> &'static [u8] {
        transmute::<&[u8], &'static [u8]>(self.buf)
    }
#[inline(always)]
    pub(crate) fn release_inner(&mut self, used: usize) {
        let inner = unsafe { &self.bbq.as_ref() };
        // If there is no grant in progress, return early. This generally
        // means the grant is being dropped inside a wrapper structure
        if !inner.read_in_progress.load(Acquire) {
            return;
        }
        // The caller has already saturated `used`; just advance the read index
        debug_assert!(used <= self.buf.len());
        let _ = atomic::fetch_add(&inner.read, used, Release);
        // Allow subsequent read grants
        inner.read_in_progress.store(false, Release);
    }

    /// Set the number of bytes to release automatically when the grant drops
    pub fn to_release(&mut self, amt: usize) {
        self.to_release = self.buf.len().min(amt);
    }
}
impl<'a, const N: usize> SplitGrantR<'a, N> {
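    /// Finalize the grant, releasing the first `used` bytes back to the buffer.
    /// The release may span both regions; `used` is saturated to the combined
    /// length.
    ///
    /// A usage sketch, assuming the crate-root re-export:
    ///
    /// ```rust
    /// use bbqueue::BBBuffer;
    ///
    /// let buffer: BBBuffer<6> = BBBuffer::new();
    /// let (mut prod, mut cons) = buffer.try_split().unwrap();
    ///
    /// // Commit six bytes, release four, then write three wrapped bytes
    /// prod.grant_exact(6).unwrap().commit(6);
    /// cons.read().unwrap().release(4);
    /// prod.grant_max_remaining(3).unwrap().commit(3);
    ///
    /// // Release across both regions: two tail bytes plus one wrapped byte
    /// let grant = cons.split_read().unwrap();
    /// assert_eq!(grant.combined_len(), 5);
    /// grant.release(3);
    ///
    /// // Two wrapped bytes remain
    /// assert_eq!(cons.read().unwrap().len(), 2);
    /// ```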
pub fn release(mut self, used: usize) {
let used = min(self.combined_len(), used);
self.release_inner(used);
forget(self);
}
    /// Obtain shared access to both granted regions, in order
    pub fn bufs(&self) -> (&[u8], &[u8]) {
        (self.buf1, self.buf2)
    }

    /// Obtain mutable access to both granted regions, in order
    pub fn bufs_mut(&mut self) -> (&mut [u8], &mut [u8]) {
        (self.buf1, self.buf2)
    }
#[inline(always)]
    pub(crate) fn release_inner(&mut self, used: usize) {
        let inner = unsafe { &self.bbq.as_ref() };
        // If there is no grant in progress, return early. This generally
        // means the grant is being dropped inside a wrapper structure
        if !inner.read_in_progress.load(Acquire) {
            return;
        }
        debug_assert!(used <= self.combined_len());
        if used <= self.buf1.len() {
            // The release stays within the first (tail) region
            let _ = atomic::fetch_add(&inner.read, used, Release);
        } else {
            // The release reaches into the wrapped region: the read index
            // jumps to just past the released bytes at the front
            inner.read.store(used - self.buf1.len(), Release);
        }
        // Allow subsequent read grants
        inner.read_in_progress.store(false, Release);
    }

    /// Set the number of bytes to release automatically when the grant drops
    pub fn to_release(&mut self, amt: usize) {
        self.to_release = self.combined_len().min(amt);
    }

    /// Total length of both regions combined
    pub fn combined_len(&self) -> usize {
        self.buf1.len() + self.buf2.len()
    }
}
// Dropping a grant automatically commits/releases the amount previously
// set via `to_commit`/`to_release` (zero by default)
impl<'a, const N: usize> Drop for GrantW<'a, N> {
fn drop(&mut self) {
self.commit_inner(self.to_commit)
}
}
impl<'a, const N: usize> Drop for GrantR<'a, N> {
fn drop(&mut self) {
self.release_inner(self.to_release)
}
}
impl<'a, const N: usize> Drop for SplitGrantR<'a, N> {
fn drop(&mut self) {
self.release_inner(self.to_release)
}
}
impl<'a, const N: usize> Deref for GrantW<'a, N> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.buf
}
}
impl<'a, const N: usize> DerefMut for GrantW<'a, N> {
fn deref_mut(&mut self) -> &mut [u8] {
self.buf
}
}
impl<'a, const N: usize> Deref for GrantR<'a, N> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
self.buf
}
}
impl<'a, const N: usize> DerefMut for GrantR<'a, N> {
fn deref_mut(&mut self) -> &mut [u8] {
self.buf
}
}
#[cfg(feature = "thumbv6")]
mod atomic {
use core::sync::atomic::{
AtomicBool, AtomicUsize,
Ordering::{self, Acquire, Release},
};
use cortex_m::interrupt::free;
#[inline(always)]
pub fn fetch_add(atomic: &AtomicUsize, val: usize, _order: Ordering) -> usize {
free(|_| {
let prev = atomic.load(Acquire);
atomic.store(prev.wrapping_add(val), Release);
prev
})
}
#[inline(always)]
pub fn fetch_sub(atomic: &AtomicUsize, val: usize, _order: Ordering) -> usize {
free(|_| {
let prev = atomic.load(Acquire);
atomic.store(prev.wrapping_sub(val), Release);
prev
})
}
#[inline(always)]
pub fn swap(atomic: &AtomicBool, val: bool, _order: Ordering) -> bool {
free(|_| {
let prev = atomic.load(Acquire);
atomic.store(val, Release);
prev
})
}
}
// All other targets use the native atomic read-modify-write operations
#[cfg(not(feature = "thumbv6"))]
mod atomic {
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[inline(always)]
pub fn fetch_add(atomic: &AtomicUsize, val: usize, order: Ordering) -> usize {
atomic.fetch_add(val, order)
}
#[inline(always)]
pub fn fetch_sub(atomic: &AtomicUsize, val: usize, order: Ordering) -> usize {
atomic.fetch_sub(val, order)
}
#[inline(always)]
pub fn swap(atomic: &AtomicBool, val: bool, order: Ordering) -> bool {
atomic.swap(val, order)
}
}