use core::{
ops::{Deref, DerefMut},
ptr::NonNull,
};
use crate::traits::{
bbqhdl::BbqHandle,
coordination::{Coord, ReadGrantError, WriteGrantError},
notifier::{AsyncNotifier, Notifier},
storage::Storage,
};
/// Producer side of a byte-stream queue.
///
/// Write grants are obtained through this type; each grant carries its own
/// clone of the handle target stored here.
pub struct StreamProducer<Q: BbqHandle> {
    /// Handle target through which the queue's storage (`sto`),
    /// coordinator (`cor`) and notifier (`not`) are reached.
    pub(crate) bbq: Q::Target,
}
/// Consumer side of a byte-stream queue.
///
/// Read grants are obtained through this type; each grant carries its own
/// clone of the handle target stored here.
pub struct StreamConsumer<Q: BbqHandle> {
    /// Handle target through which the queue's storage (`sto`),
    /// coordinator (`cor`) and notifier (`not`) are reached.
    pub(crate) bbq: Q::Target,
}
/// A write grant: a region of the queue's storage handed to the producer.
///
/// The grant dereferences to a `[u8]` of `len` bytes. When it is dropped,
/// `to_commit` bytes (clamped to `len`) are committed through the coordinator.
#[must_use = "Write Grants must be committed to be effective"]
pub struct StreamGrantW<Q: BbqHandle> {
    /// Clone of the producer's handle target, used to commit and to notify.
    bbq: Q::Target,
    /// First byte of the granted region (storage base pointer plus the
    /// offset chosen by the coordinator).
    ptr: NonNull<u8>,
    /// Length of the granted region in bytes.
    len: usize,
    /// Number of bytes committed when the grant is dropped; grants are
    /// created with this set to 0.
    to_commit: usize,
}
/// A read grant: a region of the queue's storage handed to the consumer.
///
/// The grant dereferences to a `[u8]` of `len` bytes. When it is dropped,
/// `to_release` bytes (clamped to `len`) are released through the coordinator.
pub struct StreamGrantR<Q: BbqHandle> {
    /// Clone of the consumer's handle target, used to release and to notify.
    bbq: Q::Target,
    /// First byte of the readable region (storage base pointer plus the
    /// offset reported by the coordinator).
    ptr: NonNull<u8>,
    /// Length of the readable region in bytes.
    len: usize,
    /// Number of bytes released when the grant is dropped; grants are
    /// created with this set to 0.
    to_release: usize,
}
impl<Q> StreamProducer<Q>
where
    Q: BbqHandle,
{
    /// Requests a write grant of at most `max` bytes.
    ///
    /// The coordinator picks both the start offset and the actual length of
    /// the grant; the returned grant starts with nothing marked for commit.
    ///
    /// # Errors
    ///
    /// Returns the [`WriteGrantError`] reported by the coordinator.
    pub fn grant_max_remaining(&self, max: usize) -> Result<StreamGrantW<Q>, WriteGrantError> {
        // NOTE(review): relies on the `Storage::ptr_len` contract, which is
        // not visible here — confirm its safety requirements are met.
        let (base, capacity) = unsafe { self.bbq.sto.ptr_len() };
        let (start, granted) = self.bbq.cor.grant_max_remaining(capacity, max)?;
        // Advance the storage base pointer to the first byte of the grant.
        // NOTE(review): assumes the coordinator only returns offsets inside
        // the storage, so the result is in bounds and non-null — confirm.
        let ptr = unsafe { NonNull::new_unchecked(base.as_ptr().byte_add(start)) };
        Ok(StreamGrantW {
            bbq: self.bbq.clone(),
            ptr,
            len: granted,
            to_commit: 0,
        })
    }

    /// Requests a write grant of exactly `sz` bytes.
    ///
    /// The coordinator picks the start offset; the grant length is `sz` and
    /// the returned grant starts with nothing marked for commit.
    ///
    /// # Errors
    ///
    /// Returns the [`WriteGrantError`] reported by the coordinator.
    pub fn grant_exact(&self, sz: usize) -> Result<StreamGrantW<Q>, WriteGrantError> {
        // NOTE(review): relies on the `Storage::ptr_len` contract, which is
        // not visible here — confirm its safety requirements are met.
        let (base, capacity) = unsafe { self.bbq.sto.ptr_len() };
        let start = self.bbq.cor.grant_exact(capacity, sz)?;
        // Advance the storage base pointer to the first byte of the grant.
        // NOTE(review): assumes the coordinator only returns offsets inside
        // the storage, so the result is in bounds and non-null — confirm.
        let ptr = unsafe { NonNull::new_unchecked(base.as_ptr().byte_add(start)) };
        Ok(StreamGrantW {
            bbq: self.bbq.clone(),
            ptr,
            len: sz,
            to_commit: 0,
        })
    }

    /// Returns the capacity reported by the underlying queue.
    pub fn capacity(&self) -> usize {
        self.bbq.capacity()
    }
}
impl<Q> StreamProducer<Q>
where
    Q: BbqHandle,
    Q::Notifier: AsyncNotifier,
{
    /// Asynchronously obtains a write grant of at most `max` bytes.
    ///
    /// Hands the notifier's `wait_for_not_full` a closure that attempts
    /// [`grant_max_remaining`](Self::grant_max_remaining) and turns any
    /// error into `None`; the error value itself is discarded.
    ///
    /// NOTE(review): presumably the notifier re-runs the closure until it
    /// yields `Some` — confirm against `AsyncNotifier`.
    pub async fn wait_grant_max_remaining(&self, max: usize) -> StreamGrantW<Q> {
        let attempt = || self.grant_max_remaining(max).ok();
        self.bbq.not.wait_for_not_full(attempt).await
    }

    /// Asynchronously obtains a write grant of exactly `sz` bytes.
    ///
    /// Hands the notifier's `wait_for_not_full` a closure that attempts
    /// [`grant_exact`](Self::grant_exact) and turns any error into `None`;
    /// the error value itself is discarded.
    ///
    /// NOTE(review): if `sz` can never be granted (for example, larger than
    /// the capacity) this presumably never completes — confirm.
    pub async fn wait_grant_exact(&self, sz: usize) -> StreamGrantW<Q> {
        let attempt = || self.grant_exact(sz).ok();
        self.bbq.not.wait_for_not_full(attempt).await
    }
}
// Marks the producer as transferable between threads whenever the handle
// type is.
// NOTE(review): the only field is `Q::Target`, yet the bound is on `Q`;
// confirm that every `Send` handle also has a `Send` target.
unsafe impl<Q> Send for StreamProducer<Q> where Q: BbqHandle + Send {}
impl<Q> StreamConsumer<Q>
where
    Q: BbqHandle,
{
    /// Requests a read grant over the bytes the coordinator reports as
    /// readable.
    ///
    /// The returned grant starts with nothing marked for release.
    ///
    /// # Errors
    ///
    /// Returns the [`ReadGrantError`] reported by the coordinator.
    pub fn read(&self) -> Result<StreamGrantR<Q>, ReadGrantError> {
        // Only the base pointer is needed; the capacity is not used here.
        // NOTE(review): relies on the `Storage::ptr_len` contract, which is
        // not visible here — confirm its safety requirements are met.
        let (base, _capacity) = unsafe { self.bbq.sto.ptr_len() };
        let (start, readable) = self.bbq.cor.read()?;
        // Advance the storage base pointer to the first readable byte.
        // NOTE(review): assumes the coordinator only returns offsets inside
        // the storage, so the result is in bounds and non-null — confirm.
        let ptr = unsafe { NonNull::new_unchecked(base.as_ptr().byte_add(start)) };
        Ok(StreamGrantR {
            bbq: self.bbq.clone(),
            ptr,
            len: readable,
            to_release: 0,
        })
    }

    /// Returns the capacity reported by the underlying queue.
    pub fn capacity(&self) -> usize {
        self.bbq.capacity()
    }
}
impl<Q> StreamConsumer<Q>
where
    Q: BbqHandle,
    Q::Notifier: AsyncNotifier,
{
    /// Asynchronously obtains a read grant.
    ///
    /// Hands the notifier's `wait_for_not_empty` a closure that attempts
    /// [`read`](Self::read) and turns any error into `None`; the error value
    /// itself is discarded.
    ///
    /// NOTE(review): presumably the notifier re-runs the closure until it
    /// yields `Some` — confirm against `AsyncNotifier`.
    pub async fn wait_read(&self) -> StreamGrantR<Q> {
        let attempt = || self.read().ok();
        self.bbq.not.wait_for_not_empty(attempt).await
    }
}
// Marks the consumer as transferable between threads whenever the handle
// type is.
// NOTE(review): the only field is `Q::Target`, yet the bound is on `Q`;
// confirm that every `Send` handle also has a `Send` target.
unsafe impl<Q> Send for StreamConsumer<Q> where Q: BbqHandle + Send {}
impl<Q> StreamGrantW<Q>
where
    Q: BbqHandle,
{
    /// Commits `used` bytes of this write grant and consumes the grant.
    ///
    /// `used` is clamped to the grant length. When the clamped value is
    /// non-zero, one consumer is woken through the notifier.
    ///
    /// The commit is carried out by this type's `Drop` implementation, which
    /// clamps `to_commit`, calls the coordinator's `commit_inner` and wakes a
    /// consumer. Routing through `Drop` rather than `core::mem::forget`
    /// ensures the grant's handle target (`bbq`) is dropped as well; with
    /// `forget`, a handle target that owns a resource would be leaked on
    /// every explicit commit.
    pub fn commit(mut self, used: usize) {
        self.to_commit = used;
        // `self` goes out of scope here: `Drop` performs the commit exactly
        // once, after which the fields (including `bbq`) are dropped.
    }
}
impl<Q> Deref for StreamGrantW<Q>
where
    Q: BbqHandle,
{
    type Target = [u8];

    /// Views the granted region as a shared byte slice of `len` bytes.
    fn deref(&self) -> &Self::Target {
        let start = self.ptr.as_ptr();
        let count = self.len;
        // NOTE(review): assumes `ptr`/`len` still describe the region handed
        // out by the coordinator and that it stays valid while the grant
        // lives — confirm against the storage/coordinator contracts.
        unsafe { core::slice::from_raw_parts(start, count) }
    }
}
impl<Q> DerefMut for StreamGrantW<Q>
where
    Q: BbqHandle,
{
    /// Views the granted region as a mutable byte slice of `len` bytes.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let start = self.ptr.as_ptr();
        let count = self.len;
        // NOTE(review): assumes the grant has exclusive access to this
        // region for as long as it lives — confirm against the coordinator
        // contract.
        unsafe { core::slice::from_raw_parts_mut(start, count) }
    }
}
impl<Q> Drop for StreamGrantW<Q>
where
    Q: BbqHandle,
{
    /// Commits `to_commit` bytes (clamped to the grant length) and wakes one
    /// consumer when that amount is non-zero.
    fn drop(&mut self) {
        // Only the capacity is needed; the base pointer is ignored.
        // NOTE(review): relies on the `Storage::ptr_len` contract, which is
        // not visible here — confirm its safety requirements are met.
        let (_, capacity) = unsafe { self.bbq.sto.ptr_len() };
        let granted = self.len;
        let committed = core::cmp::min(self.to_commit, granted);
        self.bbq.cor.commit_inner(capacity, granted, committed);
        // Nothing new to read when zero bytes were committed, so skip the wake.
        if committed > 0 {
            self.bbq.not.wake_one_consumer();
        }
    }
}
// The `NonNull<u8>` field makes this type `!Send` by default, so `Send` is
// asserted manually.
// NOTE(review): soundness depends on the granted region being exclusively
// owned by this grant and on `Q::Target` being safe to move across threads
// whenever `Q` is `Send` — confirm.
unsafe impl<Q> Send for StreamGrantW<Q> where Q: BbqHandle + Send {}
impl<Q> StreamGrantR<Q>
where
    Q: BbqHandle,
{
    /// Releases `used` bytes of this read grant and consumes the grant.
    ///
    /// `used` is clamped to the grant length. When the clamped value is
    /// non-zero, one producer is woken through the notifier.
    ///
    /// The release is carried out by this type's `Drop` implementation, which
    /// clamps `to_release`, calls the coordinator's `release_inner` and wakes
    /// a producer. Routing through `Drop` rather than `core::mem::forget`
    /// ensures the grant's handle target (`bbq`) is dropped as well; with
    /// `forget`, a handle target that owns a resource would be leaked on
    /// every explicit release.
    pub fn release(mut self, used: usize) {
        self.to_release = used;
        // `self` goes out of scope here: `Drop` performs the release exactly
        // once, after which the fields (including `bbq`) are dropped.
    }
}
impl<Q> Deref for StreamGrantR<Q>
where
    Q: BbqHandle,
{
    type Target = [u8];

    /// Views the readable region as a shared byte slice of `len` bytes.
    fn deref(&self) -> &Self::Target {
        let start = self.ptr.as_ptr();
        let count = self.len;
        // NOTE(review): assumes `ptr`/`len` still describe the region
        // reported by the coordinator and that it stays valid while the
        // grant lives — confirm against the storage/coordinator contracts.
        unsafe { core::slice::from_raw_parts(start, count) }
    }
}
impl<Q> DerefMut for StreamGrantR<Q>
where
    Q: BbqHandle,
{
    /// Views the readable region as a mutable byte slice of `len` bytes,
    /// allowing the data to be modified in place before it is released.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let start = self.ptr.as_ptr();
        let count = self.len;
        // NOTE(review): assumes the grant has exclusive access to this
        // region for as long as it lives — confirm against the coordinator
        // contract.
        unsafe { core::slice::from_raw_parts_mut(start, count) }
    }
}
impl<Q> Drop for StreamGrantR<Q>
where
    Q: BbqHandle,
{
    /// Releases `to_release` bytes (clamped to the grant length) and wakes
    /// one producer when that amount is non-zero.
    fn drop(&mut self) {
        let released = core::cmp::min(self.to_release, self.len);
        self.bbq.cor.release_inner(released);
        // No space was released when the amount is zero, so skip the wake.
        if released > 0 {
            self.bbq.not.wake_one_producer();
        }
    }
}
// The `NonNull<u8>` field makes this type `!Send` by default, so `Send` is
// asserted manually.
// NOTE(review): soundness depends on the readable region being exclusively
// owned by this grant and on `Q::Target` being safe to move across threads
// whenever `Q` is `Send` — confirm.
unsafe impl<Q> Send for StreamGrantR<Q> where Q: BbqHandle + Send {}