async-ringbuf 0.3.6

Async SPSC FIFO ring buffer
Documentation
use core::{
    future::Future,
    pin::Pin,
    task::{Context, Poll, Waker},
};
use futures_util::future::FusedFuture;
use ringbuf::traits::Consumer;
#[cfg(feature = "std")]
use std::io;

pub trait AsyncConsumer: Consumer {
    fn register_waker(&self, waker: &Waker);

    fn close(&mut self);
    /// Whether the corresponding producer was closed.
    fn is_closed(&self) -> bool {
        !self.write_is_held()
    }

    /// Pop item from the ring buffer waiting asynchronously if the buffer is empty.
    ///
    /// Future returns:
    /// + `Some(item)` - an item is taken.
    /// + `None` - the buffer is empty and the corresponding producer was dropped.
    ///
    /// # Cancel safety
    ///
    /// If future is cancelled then no item removed from the ring buffer.
    fn pop(&mut self) -> PopFuture<'_, Self> {
        PopFuture { owner: self, done: false }
    }

    /// Wait for the buffer to contain at least `count` items or to close.
    ///
    /// In debug mode panics if `count` is greater than buffer capacity.
    ///
    /// The method takes `&mut self` because only single [`WaitOccupiedFuture`] is allowed at a time.
    ///
    /// # Cancel safety
    ///
    /// The future can be safely cancelled.
    fn wait_occupied(&mut self, count: usize) -> WaitOccupiedFuture<'_, Self> {
        debug_assert!(count <= self.capacity().get());
        WaitOccupiedFuture {
            owner: self,
            count,
            done: false,
        }
    }

    /// Fill slice with items from the ring buffer waiting asynchronously until slice filled or corresponding producer closed.
    ///
    /// Future returns:
    /// + `Ok` - the whole slice is filled with the items from the buffer.
    /// + `Err(count)` - the buffer is empty and the corresponding producer was dropped, number of items copied to slice is returned.
    ///
    /// # Cancel safety
    ///
    /// If future is cancelled then slice can be partially filled.
    /// The number of items already copied can be examined by [`PopSliceFuture::count`].
    fn pop_exact<'a: 'b, 'b>(&'a mut self, slice: &'b mut [Self::Item]) -> PopSliceFuture<'a, 'b, Self>
    where
        Self::Item: Copy,
    {
        PopSliceFuture {
            owner: self,
            slice: Some(slice),
            count: 0,
        }
    }

    /// Fill `vec` with items from the ring buffer waiting asynchronously until corresponding producer closed.
    ///
    /// # Cancel safety
    ///
    /// If future is cancelled then `vec` contains items taken from RB before cancellation.
    #[cfg(feature = "alloc")]
    fn pop_until_end<'a: 'b, 'b>(&'a mut self, vec: &'b mut alloc::vec::Vec<Self::Item>) -> PopVecFuture<'a, 'b, Self> {
        PopVecFuture {
            owner: self,
            vec: Some(vec),
        }
    }

    /// Poll for the next item in the ring buffer.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
    where
        Self: Unpin,
    {
        let mut waker_registered = false;
        loop {
            let closed = self.is_closed();
            if let Some(item) = self.try_pop() {
                break Poll::Ready(Some(item));
            }
            if closed {
                break Poll::Ready(None);
            }
            if waker_registered {
                break Poll::Pending;
            }
            self.register_waker(cx.waker());
            waker_registered = true;
        }
    }

    /// Poll reading bytes from byte buffer.
    #[cfg(feature = "std")]
    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>
    where
        Self: AsyncConsumer<Item = u8> + Unpin,
    {
        let mut waker_registered = false;
        loop {
            let closed = self.is_closed();
            let len = self.pop_slice(buf);
            if len != 0 || closed {
                break Poll::Ready(Ok(len));
            }
            if waker_registered {
                break Poll::Pending;
            }
            self.register_waker(cx.waker());
            waker_registered = true;
        }
    }
}

/// # Cancel safety
///
/// If future is cancelled then no item removed from the ring buffer.
pub struct PopFuture<'a, A: AsyncConsumer + ?Sized> {
    owner: &'a mut A,
    done: bool,
}
impl<A: AsyncConsumer> Unpin for PopFuture<'_, A> {}
impl<A: AsyncConsumer> FusedFuture for PopFuture<'_, A> {
    fn is_terminated(&self) -> bool {
        self.done || self.owner.is_closed()
    }
}
impl<A: AsyncConsumer> Future for PopFuture<'_, A> {
    type Output = Option<A::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut waker_registered = false;
        loop {
            assert!(!self.done);
            let closed = self.owner.is_closed();
            if let Some(item) = self.owner.try_pop() {
                self.done = true;
                break Poll::Ready(Some(item));
            }
            if closed {
                break Poll::Ready(None);
            }
            if waker_registered {
                break Poll::Pending;
            }
            self.owner.register_waker(cx.waker());
            waker_registered = true;
        }
    }
}

/// # Cancel safety
///
/// If future is cancelled then slice can be partially filled.
pub struct PopSliceFuture<'a, 'b, A: AsyncConsumer + ?Sized>
where
    A::Item: Copy,
{
    owner: &'a mut A,
    slice: Option<&'b mut [A::Item]>,
    count: usize,
}
impl<A: AsyncConsumer> Unpin for PopSliceFuture<'_, '_, A> where A::Item: Copy {}
impl<A: AsyncConsumer> FusedFuture for PopSliceFuture<'_, '_, A>
where
    A::Item: Copy,
{
    fn is_terminated(&self) -> bool {
        self.slice.is_none()
    }
}
impl<A: AsyncConsumer> Future for PopSliceFuture<'_, '_, A>
where
    A::Item: Copy,
{
    type Output = Result<(), usize>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut waker_registered = false;
        loop {
            let closed = self.owner.is_closed();
            let mut slice = self.slice.take().unwrap();
            let len = self.owner.pop_slice(slice);
            slice = &mut slice[len..];
            self.count += len;
            if slice.is_empty() {
                break Poll::Ready(Ok(()));
            }
            if closed {
                break Poll::Ready(Err(self.count));
            }
            self.slice.replace(slice);
            if waker_registered {
                break Poll::Pending;
            }
            self.owner.register_waker(cx.waker());
            waker_registered = true;
        }
    }
}
impl<A: AsyncConsumer> PopSliceFuture<'_, '_, A>
where
    A::Item: Copy,
{
    /// Number of items already copied from the ring buufer to the slice provided.
    pub fn count(&self) -> usize {
        self.count
    }
}

/// # Cancel safety
///
/// If future is cancelled then `vec` contains items taken from RB before cancellation.
#[cfg(feature = "alloc")]
pub struct PopVecFuture<'a, 'b, A: AsyncConsumer + ?Sized> {
    owner: &'a mut A,
    vec: Option<&'b mut alloc::vec::Vec<A::Item>>,
}
#[cfg(feature = "alloc")]
impl<A: AsyncConsumer> Unpin for PopVecFuture<'_, '_, A> {}
#[cfg(feature = "alloc")]
impl<A: AsyncConsumer> FusedFuture for PopVecFuture<'_, '_, A> {
    fn is_terminated(&self) -> bool {
        self.vec.is_none()
    }
}
#[cfg(feature = "alloc")]
impl<A: AsyncConsumer> Future for PopVecFuture<'_, '_, A> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut waker_registered = false;
        loop {
            let closed = self.owner.is_closed();
            let vec = self.vec.take().unwrap();

            loop {
                if vec.len() == vec.capacity() {
                    vec.reserve(vec.capacity().max(16));
                }
                let n = self.owner.pop_slice_uninit(vec.spare_capacity_mut());
                if n == 0 {
                    break;
                }
                unsafe { vec.set_len(vec.len() + n) };
            }

            if closed {
                break Poll::Ready(());
            }
            self.vec.replace(vec);
            if waker_registered {
                break Poll::Pending;
            }
            self.owner.register_waker(cx.waker());
            waker_registered = true;
        }
    }
}

/// # Cancel safety
///
/// The future can be safely cancelled.
pub struct WaitOccupiedFuture<'a, A: AsyncConsumer + ?Sized> {
    owner: &'a A,
    count: usize,
    done: bool,
}
impl<A: AsyncConsumer> Unpin for WaitOccupiedFuture<'_, A> {}
impl<A: AsyncConsumer> FusedFuture for WaitOccupiedFuture<'_, A> {
    fn is_terminated(&self) -> bool {
        self.done
    }
}
impl<A: AsyncConsumer> Future for WaitOccupiedFuture<'_, A> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut waker_registered = false;
        loop {
            assert!(!self.done);
            let closed = self.owner.is_closed();
            if self.count <= self.owner.occupied_len() || closed {
                self.done = true;
                break Poll::Ready(());
            }
            if waker_registered {
                break Poll::Pending;
            }
            self.owner.register_waker(cx.waker());
            waker_registered = true;
        }
    }
}