ringbuf-blocking 0.1.0-rc.6

Blocking version of ringbuf
Documentation
use super::{BlockingWrap, WaitError};
use crate::{rb::BlockingRbRef, sync::Semaphore};
use core::time::Duration;
#[cfg(feature = "std")]
use ringbuf::traits::Based;
use ringbuf::{
    traits::{consumer::DelegateConsumer, observer::DelegateObserver, Consumer, Observer},
    wrap::Wrap,
};
#[cfg(feature = "std")]
use std::io;

pub type BlockingCons<R> = BlockingWrap<R, false, true>;

impl<R: BlockingRbRef> DelegateObserver for BlockingCons<R> {}
impl<R: BlockingRbRef> DelegateConsumer for BlockingCons<R> {}

macro_rules! wait_iter {
    ($self:expr) => {
        $self.rb.rb().write.take_iter($self.timeout()).reset()
    };
}

impl<R: BlockingRbRef> BlockingCons<R> {
    pub fn is_closed(&self) -> bool {
        !self.write_is_held()
    }

    pub fn set_timeout(&mut self, timeout: Option<Duration>) {
        self.timeout = timeout;
    }
    pub fn timeout(&self) -> Option<Duration> {
        self.timeout
    }

    pub fn wait_occupied(&mut self, count: usize) -> Result<(), WaitError> {
        debug_assert!(count <= self.rb().capacity().get());
        for _ in wait_iter!(self) {
            if self.base.occupied_len() >= count {
                return Ok(());
            }
            if self.is_closed() {
                return Err(WaitError::Closed);
            }
        }
        Err(WaitError::TimedOut)
    }

    pub fn pop(&mut self) -> Result<<Self as Observer>::Item, WaitError> {
        for _ in wait_iter!(self) {
            if let Some(item) = self.base.try_pop() {
                return Ok(item);
            }
            if self.is_closed() {
                return Err(WaitError::Closed);
            }
        }
        Err(WaitError::TimedOut)
    }

    pub fn pop_all_iter(&mut self) -> PopAllIter<'_, R> {
        PopAllIter { owner: self }
    }
}

impl<R: BlockingRbRef> BlockingCons<R>
where
    <Self as Observer>::Item: Copy,
{
    pub fn pop_exact(&mut self, mut slice: &mut [<Self as Observer>::Item]) -> usize {
        if slice.is_empty() {
            return 0;
        }
        let mut count = 0;
        for _ in wait_iter!(self) {
            let n = self.base.pop_slice(slice);
            slice = &mut slice[n..];
            count += n;

            if slice.is_empty() || (self.is_closed() && self.is_empty()) {
                break;
            }
        }
        count
    }

    #[cfg(feature = "alloc")]
    pub fn pop_until_end(&mut self, vec: &mut alloc::vec::Vec<<Self as Observer>::Item>) {
        if self.is_closed() && self.is_empty() {
            return;
        }
        for _ in wait_iter!(self) {
            loop {
                if vec.len() == vec.capacity() {
                    vec.reserve(vec.capacity().max(16));
                }
                let n = self.base.pop_slice_uninit(vec.spare_capacity_mut());
                if n == 0 {
                    break;
                }
                unsafe { vec.set_len(vec.len() + n) };
            }
            if self.is_closed() && self.is_empty() {
                break;
            }
        }
    }
}

#[cfg(feature = "std")]
impl<R: BlockingRbRef> io::Read for BlockingCons<R>
where
    <Self as Based>::Base: Consumer<Item = u8>,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        for _ in wait_iter!(self) {
            let n = self.base.pop_slice(buf);
            if n > 0 {
                return Ok(n);
            }
            if self.is_closed() {
                return Ok(0);
            }
        }
        Err(io::ErrorKind::TimedOut.into())
    }
}

pub struct PopAllIter<'a, R: BlockingRbRef> {
    owner: &'a mut BlockingCons<R>,
}

impl<R: BlockingRbRef> Iterator for PopAllIter<'_, R> {
    type Item = <R::Rb as Observer>::Item;

    fn next(&mut self) -> Option<Self::Item> {
        self.owner.pop().ok()
    }
}