use core::{
marker::PhantomData,
ops::{Deref, DerefMut},
ptr::NonNull,
};
use crate::traits::{
bbqhdl::BbqHandle,
coordination::{Coord, ReadGrantError, WriteGrantError},
notifier::{AsyncNotifier, Notifier},
storage::Storage,
};
pub unsafe trait LenHeader: Into<usize> + Copy + Ord {
type Bytes;
fn to_le_bytes(&self) -> Self::Bytes;
fn from_le_bytes(by: Self::Bytes) -> Self;
}
pub struct FramedProducer<Q, H = u16>
where
Q: BbqHandle,
H: LenHeader,
{
pub(crate) bbq: Q::Target,
pub(crate) pd: PhantomData<H>,
}
pub struct FramedConsumer<Q, H = u16>
where
Q: BbqHandle,
H: LenHeader,
{
pub(crate) bbq: Q::Target,
pub(crate) pd: PhantomData<H>,
}
#[must_use = "Write Grants must be committed to be effective"]
pub struct FramedGrantW<Q, H = u16>
where
Q: BbqHandle,
H: LenHeader,
{
bbq: Q::Target,
base_ptr: NonNull<u8>,
hdr: H,
}
#[must_use = "Read Grants must be released to free space"]
pub struct FramedGrantR<Q, H = u16>
where
Q: BbqHandle,
H: LenHeader,
{
bbq: Q::Target,
body_ptr: NonNull<u8>,
hdr: H,
}
unsafe impl LenHeader for u16 {
type Bytes = [u8; 2];
#[inline(always)]
fn to_le_bytes(&self) -> Self::Bytes {
u16::to_le_bytes(*self)
}
#[inline(always)]
fn from_le_bytes(by: Self::Bytes) -> Self {
u16::from_le_bytes(by)
}
}
unsafe impl LenHeader for usize {
type Bytes = [u8; core::mem::size_of::<usize>()];
#[inline(always)]
fn to_le_bytes(&self) -> Self::Bytes {
usize::to_le_bytes(*self)
}
#[inline(always)]
fn from_le_bytes(by: Self::Bytes) -> Self {
usize::from_le_bytes(by)
}
}
impl<Q, H> FramedProducer<Q, H>
where
Q: BbqHandle,
H: LenHeader,
{
pub fn grant(&self, sz: H) -> Result<FramedGrantW<Q, H>, WriteGrantError> {
let (ptr, cap) = unsafe { self.bbq.sto.ptr_len() };
let needed = sz.into() + core::mem::size_of::<H>();
let offset = self.bbq.cor.grant_exact(cap, needed)?;
let base_ptr = unsafe {
let p = ptr.as_ptr().byte_add(offset);
NonNull::new_unchecked(p)
};
Ok(FramedGrantW {
bbq: self.bbq.clone(),
base_ptr,
hdr: sz,
})
}
pub fn capacity(&self) -> usize {
self.bbq.capacity()
}
}
impl<Q, H> FramedProducer<Q, H>
where
Q: BbqHandle,
Q::Notifier: AsyncNotifier,
H: LenHeader,
{
pub async fn wait_grant(&self, sz: H) -> FramedGrantW<Q, H> {
self.bbq.not.wait_for_not_full(|| self.grant(sz).ok()).await
}
}
impl<Q, H> FramedConsumer<Q, H>
where
Q: BbqHandle,
H: LenHeader,
{
pub fn read(&self) -> Result<FramedGrantR<Q, H>, ReadGrantError> {
let (ptr, _cap) = unsafe { self.bbq.sto.ptr_len() };
let (offset, grant_len) = self.bbq.cor.read()?;
let hdr_sz = const { core::mem::size_of::<H>() };
if hdr_sz > grant_len {
self.bbq.cor.release_inner(0);
return Err(ReadGrantError::InconsistentFrameHeader);
}
let ptr = unsafe { ptr.as_ptr().byte_add(offset) };
let hdr: H = unsafe { ptr.cast::<H>().read_unaligned() };
if (hdr_sz + hdr.into()) > grant_len {
self.bbq.cor.release_inner(0);
return Err(ReadGrantError::InconsistentFrameHeader);
}
let body_ptr = unsafe {
let p = ptr.byte_add(hdr_sz);
core::ptr::NonNull::new_unchecked(p)
};
Ok(FramedGrantR {
bbq: self.bbq.clone(),
body_ptr,
hdr,
})
}
pub fn capacity(&self) -> usize {
self.bbq.capacity()
}
}
impl<Q, H> FramedConsumer<Q, H>
where
Q: BbqHandle,
Q::Notifier: AsyncNotifier,
H: LenHeader,
{
pub async fn wait_read(&self) -> FramedGrantR<Q, H> {
self.bbq.not.wait_for_not_empty(|| self.read().ok()).await
}
}
impl<Q, H> FramedGrantW<Q, H>
where
Q: BbqHandle,
H: LenHeader,
{
pub fn commit(self, used: H) {
let (_ptr, cap) = unsafe { self.bbq.sto.ptr_len() };
let hdrlen: usize = const { core::mem::size_of::<H>() };
let grant_len = hdrlen + self.hdr.into();
let clamp_hdr = self.hdr.min(used);
let used_len: usize = hdrlen + clamp_hdr.into();
unsafe {
self.base_ptr
.cast::<H>()
.as_ptr()
.write_unaligned(clamp_hdr);
}
self.bbq.cor.commit_inner(cap, grant_len, used_len);
self.bbq.not.wake_one_consumer();
core::mem::forget(self);
}
pub fn abort(self) {
}
}
impl<Q, H> Deref for FramedGrantW<Q, H>
where
Q: BbqHandle,
H: LenHeader,
{
type Target = [u8];
fn deref(&self) -> &Self::Target {
let len = self.hdr.into();
let body_ptr = unsafe {
let hdr_sz = const { core::mem::size_of::<H>() };
self.base_ptr.as_ptr().byte_add(hdr_sz)
};
unsafe { core::slice::from_raw_parts(body_ptr, len) }
}
}
impl<Q, H> DerefMut for FramedGrantW<Q, H>
where
Q: BbqHandle,
H: LenHeader,
{
fn deref_mut(&mut self) -> &mut Self::Target {
let len = self.hdr.into();
let body_ptr = unsafe {
let hdr_sz = const { core::mem::size_of::<H>() };
self.base_ptr.as_ptr().byte_add(hdr_sz)
};
unsafe { core::slice::from_raw_parts_mut(body_ptr, len) }
}
}
impl<Q, H> Drop for FramedGrantW<Q, H>
where
Q: BbqHandle,
H: LenHeader,
{
fn drop(&mut self) {
let (_ptr, cap) = unsafe { self.bbq.sto.ptr_len() };
let hdrlen: usize = const { core::mem::size_of::<H>() };
let grant_len = hdrlen + self.hdr.into();
self.bbq.cor.commit_inner(cap, grant_len, 0);
}
}
unsafe impl<Q, H> Send for FramedGrantW<Q, H>
where
Q: BbqHandle,
Q::Target: Send,
H: LenHeader + Send,
{
}
impl<Q, H> FramedGrantR<Q, H>
where
Q: BbqHandle,
H: LenHeader,
{
pub fn release(self) {
let len: usize = self.hdr.into();
let hdrlen: usize = const { core::mem::size_of::<H>() };
let used = len + hdrlen;
self.bbq.cor.release_inner(used);
self.bbq.not.wake_one_producer();
core::mem::forget(self);
}
pub fn keep(self) {
}
}
impl<Q, H> Deref for FramedGrantR<Q, H>
where
Q: BbqHandle,
H: LenHeader,
{
type Target = [u8];
fn deref(&self) -> &Self::Target {
let len: usize = self.hdr.into();
unsafe { core::slice::from_raw_parts(self.body_ptr.as_ptr(), len) }
}
}
impl<Q, H> DerefMut for FramedGrantR<Q, H>
where
Q: BbqHandle,
H: LenHeader,
{
fn deref_mut(&mut self) -> &mut Self::Target {
let len: usize = self.hdr.into();
unsafe { core::slice::from_raw_parts_mut(self.body_ptr.as_ptr(), len) }
}
}
impl<Q, H> Drop for FramedGrantR<Q, H>
where
Q: BbqHandle,
H: LenHeader,
{
fn drop(&mut self) {
self.bbq.cor.release_inner(0);
}
}
unsafe impl<Q, H> Send for FramedGrantR<Q, H>
where
Q: BbqHandle,
Q::Target: Send,
H: LenHeader + Send,
{
}