ringbuf_blocking/wrap/
prod.rs

1use super::{BlockingWrap, WaitError};
2use crate::{rb::BlockingRbRef, sync::Semaphore};
3use core::time::Duration;
4#[cfg(feature = "std")]
5use ringbuf::traits::Based;
6use ringbuf::{
7    traits::{observer::DelegateObserver, producer::DelegateProducer, Observer, Producer},
8    wrap::Wrap,
9};
10#[cfg(feature = "std")]
11use std::io;
12
13pub type BlockingProd<R> = BlockingWrap<R, true, false>;
14
15impl<R: BlockingRbRef> DelegateObserver for BlockingProd<R> {}
16impl<R: BlockingRbRef> DelegateProducer for BlockingProd<R> {}
17
/// Builds the iterator that paces every blocking loop in this file.
///
/// Expands to `take_iter(..)` on the ring buffer's `read` member, passing the
/// wrapper's current timeout, followed by `.reset()`.
/// NOTE(review): presumably each yielded item is one opportunity to retry and
/// the iterator ends once the timeout elapses — confirm against
/// `Semaphore::take_iter`.
macro_rules! wait_iter {
    ($this:expr) => {
        $this
            .rb
            .rb()
            .read
            .take_iter($this.timeout())
            .reset()
    };
}
23
24impl<R: BlockingRbRef> BlockingProd<R> {
25    pub fn is_closed(&self) -> bool {
26        !self.read_is_held()
27    }
28
29    pub fn set_timeout(&mut self, timeout: Option<Duration>) {
30        self.timeout = timeout;
31    }
32    pub fn timeout(&self) -> Option<Duration> {
33        self.timeout
34    }
35
36    pub fn wait_vacant(&mut self, count: usize) -> Result<(), WaitError> {
37        debug_assert!(count <= self.rb().capacity().get());
38        for _ in wait_iter!(self) {
39            if self.base.vacant_len() >= count {
40                return Ok(());
41            }
42            if self.is_closed() {
43                return Err(WaitError::Closed);
44            }
45        }
46        Err(WaitError::TimedOut)
47    }
48
49    pub fn push(&mut self, mut item: <Self as Observer>::Item) -> Result<(), (WaitError, <Self as Observer>::Item)> {
50        for _ in wait_iter!(self) {
51            item = match self.base.try_push(item) {
52                Ok(()) => return Ok(()),
53                Err(item) => item,
54            };
55            if self.is_closed() {
56                return Err((WaitError::Closed, item));
57            }
58        }
59        Err((WaitError::TimedOut, item))
60    }
61
62    pub fn push_all_iter<I: Iterator<Item = <Self as Observer>::Item>>(&mut self, iter: I) -> usize {
63        let mut iter = iter.peekable();
64        if iter.peek().is_none() {
65            return 0;
66        }
67        let mut count = 0;
68        for _ in wait_iter!(self) {
69            if self.is_closed() {
70                break;
71            }
72
73            count += self.base.push_iter(&mut iter);
74
75            if iter.peek().is_none() {
76                break;
77            }
78        }
79        count
80    }
81}
82impl<R: BlockingRbRef> BlockingProd<R>
83where
84    <Self as Observer>::Item: Copy,
85{
86    pub fn push_exact(&mut self, mut slice: &[<Self as Observer>::Item]) -> usize {
87        if slice.is_empty() {
88            return 0;
89        }
90
91        let mut count = 0;
92        for _ in wait_iter!(self) {
93            if self.is_closed() {
94                break;
95            }
96
97            let n = self.base.push_slice(slice);
98            slice = &slice[n..];
99            count += n;
100
101            if slice.is_empty() {
102                break;
103            }
104        }
105        count
106    }
107}
108
#[cfg(feature = "std")]
impl<R: BlockingRbRef> io::Write for BlockingProd<R>
where
    <Self as Based>::Base: Producer<Item = u8>,
{
    /// Writes as many bytes of `buf` as currently fit, waiting for space if
    /// none is available.
    ///
    /// Returns `Ok(n)` with `n > 0` as soon as any bytes were pushed. Returns
    /// `Ok(0)` when `buf` is empty or when the wrapper is closed.
    ///
    /// # Errors
    ///
    /// [`io::ErrorKind::TimedOut`] if the wait iterator ends before any byte
    /// could be pushed.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // An empty buffer can never push more than zero bytes, so the loop
        // below could only leave via close or the end of the wait iterator.
        // Answer right away instead, as `push_exact` does for empty input.
        if buf.is_empty() {
            return Ok(0);
        }
        for _ in wait_iter!(self) {
            if self.is_closed() {
                return Ok(0);
            }
            let n = self.base.push_slice(buf);
            if n > 0 {
                return Ok(n);
            }
        }
        Err(io::ErrorKind::TimedOut.into())
    }

    /// No-op: this impl keeps no intermediate buffer of its own to flush.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}