ringbuf_blocking/wrap/
cons.rs

1use super::{BlockingWrap, WaitError};
2use crate::{rb::BlockingRbRef, sync::Semaphore};
3use core::time::Duration;
4#[cfg(feature = "std")]
5use ringbuf::traits::Based;
6use ringbuf::{
7    traits::{consumer::DelegateConsumer, observer::DelegateObserver, Consumer, Observer},
8    wrap::Wrap,
9};
10#[cfg(feature = "std")]
11use std::io;
12
13pub type BlockingCons<R> = BlockingWrap<R, false, true>;
14
15impl<R: BlockingRbRef> DelegateObserver for BlockingCons<R> {}
16impl<R: BlockingRbRef> DelegateConsumer for BlockingCons<R> {}
17
18macro_rules! wait_iter {
19    ($self:expr) => {
20        $self.rb.rb().write.take_iter($self.timeout()).reset()
21    };
22}
23
24impl<R: BlockingRbRef> BlockingCons<R> {
25    pub fn is_closed(&self) -> bool {
26        !self.write_is_held()
27    }
28
29    pub fn set_timeout(&mut self, timeout: Option<Duration>) {
30        self.timeout = timeout;
31    }
32    pub fn timeout(&self) -> Option<Duration> {
33        self.timeout
34    }
35
36    pub fn wait_occupied(&mut self, count: usize) -> Result<(), WaitError> {
37        debug_assert!(count <= self.rb().capacity().get());
38        for _ in wait_iter!(self) {
39            if self.base.occupied_len() >= count {
40                return Ok(());
41            }
42            if self.is_closed() {
43                return Err(WaitError::Closed);
44            }
45        }
46        Err(WaitError::TimedOut)
47    }
48
49    pub fn pop(&mut self) -> Result<<Self as Observer>::Item, WaitError> {
50        for _ in wait_iter!(self) {
51            if let Some(item) = self.base.try_pop() {
52                return Ok(item);
53            }
54            if self.is_closed() {
55                return Err(WaitError::Closed);
56            }
57        }
58        Err(WaitError::TimedOut)
59    }
60
61    pub fn pop_all_iter(&mut self) -> PopAllIter<'_, R> {
62        PopAllIter { owner: self }
63    }
64}
65
66impl<R: BlockingRbRef> BlockingCons<R>
67where
68    <Self as Observer>::Item: Copy,
69{
70    pub fn pop_exact(&mut self, mut slice: &mut [<Self as Observer>::Item]) -> usize {
71        if slice.is_empty() {
72            return 0;
73        }
74        let mut count = 0;
75        for _ in wait_iter!(self) {
76            let n = self.base.pop_slice(slice);
77            slice = &mut slice[n..];
78            count += n;
79
80            if slice.is_empty() || (self.is_closed() && self.is_empty()) {
81                break;
82            }
83        }
84        count
85    }
86
87    #[cfg(feature = "alloc")]
88    pub fn pop_until_end(&mut self, vec: &mut alloc::vec::Vec<<Self as Observer>::Item>) {
89        if self.is_closed() && self.is_empty() {
90            return;
91        }
92        for _ in wait_iter!(self) {
93            loop {
94                if vec.len() == vec.capacity() {
95                    vec.reserve(vec.capacity().max(16));
96                }
97                let n = self.base.pop_slice_uninit(vec.spare_capacity_mut());
98                if n == 0 {
99                    break;
100                }
101                unsafe { vec.set_len(vec.len() + n) };
102            }
103            if self.is_closed() && self.is_empty() {
104                break;
105            }
106        }
107    }
108}
109
110#[cfg(feature = "std")]
111impl<R: BlockingRbRef> io::Read for BlockingCons<R>
112where
113    <Self as Based>::Base: Consumer<Item = u8>,
114{
115    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
116        for _ in wait_iter!(self) {
117            let n = self.base.pop_slice(buf);
118            if n > 0 {
119                return Ok(n);
120            }
121            if self.is_closed() {
122                return Ok(0);
123            }
124        }
125        Err(io::ErrorKind::TimedOut.into())
126    }
127}
128
129pub struct PopAllIter<'a, R: BlockingRbRef> {
130    owner: &'a mut BlockingCons<R>,
131}
132
133impl<R: BlockingRbRef> Iterator for PopAllIter<'_, R> {
134    type Item = <R::Rb as Observer>::Item;
135
136    fn next(&mut self) -> Option<Self::Item> {
137        self.owner.pop().ok()
138    }
139}