ringbuf_blocking/wrap/
cons.rs1use super::{BlockingWrap, WaitError};
2use crate::{rb::BlockingRbRef, sync::Semaphore};
3use core::time::Duration;
4#[cfg(feature = "std")]
5use ringbuf::traits::Based;
6use ringbuf::{
7 traits::{consumer::DelegateConsumer, observer::DelegateObserver, Consumer, Observer},
8 wrap::Wrap,
9};
10#[cfg(feature = "std")]
11use std::io;
12
13pub type BlockingCons<R> = BlockingWrap<R, false, true>;
14
15impl<R: BlockingRbRef> DelegateObserver for BlockingCons<R> {}
16impl<R: BlockingRbRef> DelegateConsumer for BlockingCons<R> {}
17
18macro_rules! wait_iter {
19 ($self:expr) => {
20 $self.rb.rb().write.take_iter($self.timeout()).reset()
21 };
22}
23
24impl<R: BlockingRbRef> BlockingCons<R> {
25 pub fn is_closed(&self) -> bool {
26 !self.write_is_held()
27 }
28
29 pub fn set_timeout(&mut self, timeout: Option<Duration>) {
30 self.timeout = timeout;
31 }
32 pub fn timeout(&self) -> Option<Duration> {
33 self.timeout
34 }
35
36 pub fn wait_occupied(&mut self, count: usize) -> Result<(), WaitError> {
37 debug_assert!(count <= self.rb().capacity().get());
38 for _ in wait_iter!(self) {
39 if self.base.occupied_len() >= count {
40 return Ok(());
41 }
42 if self.is_closed() {
43 return Err(WaitError::Closed);
44 }
45 }
46 Err(WaitError::TimedOut)
47 }
48
49 pub fn pop(&mut self) -> Result<<Self as Observer>::Item, WaitError> {
50 for _ in wait_iter!(self) {
51 if let Some(item) = self.base.try_pop() {
52 return Ok(item);
53 }
54 if self.is_closed() {
55 return Err(WaitError::Closed);
56 }
57 }
58 Err(WaitError::TimedOut)
59 }
60
61 pub fn pop_all_iter(&mut self) -> PopAllIter<'_, R> {
62 PopAllIter { owner: self }
63 }
64}
65
66impl<R: BlockingRbRef> BlockingCons<R>
67where
68 <Self as Observer>::Item: Copy,
69{
70 pub fn pop_exact(&mut self, mut slice: &mut [<Self as Observer>::Item]) -> usize {
71 if slice.is_empty() {
72 return 0;
73 }
74 let mut count = 0;
75 for _ in wait_iter!(self) {
76 let n = self.base.pop_slice(slice);
77 slice = &mut slice[n..];
78 count += n;
79
80 if slice.is_empty() || (self.is_closed() && self.is_empty()) {
81 break;
82 }
83 }
84 count
85 }
86
87 #[cfg(feature = "alloc")]
88 pub fn pop_until_end(&mut self, vec: &mut alloc::vec::Vec<<Self as Observer>::Item>) {
89 if self.is_closed() && self.is_empty() {
90 return;
91 }
92 for _ in wait_iter!(self) {
93 loop {
94 if vec.len() == vec.capacity() {
95 vec.reserve(vec.capacity().max(16));
96 }
97 let n = self.base.pop_slice_uninit(vec.spare_capacity_mut());
98 if n == 0 {
99 break;
100 }
101 unsafe { vec.set_len(vec.len() + n) };
102 }
103 if self.is_closed() && self.is_empty() {
104 break;
105 }
106 }
107 }
108}
109
110#[cfg(feature = "std")]
111impl<R: BlockingRbRef> io::Read for BlockingCons<R>
112where
113 <Self as Based>::Base: Consumer<Item = u8>,
114{
115 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
116 for _ in wait_iter!(self) {
117 let n = self.base.pop_slice(buf);
118 if n > 0 {
119 return Ok(n);
120 }
121 if self.is_closed() {
122 return Ok(0);
123 }
124 }
125 Err(io::ErrorKind::TimedOut.into())
126 }
127}
128
129pub struct PopAllIter<'a, R: BlockingRbRef> {
130 owner: &'a mut BlockingCons<R>,
131}
132
133impl<R: BlockingRbRef> Iterator for PopAllIter<'_, R> {
134 type Item = <R::Rb as Observer>::Item;
135
136 fn next(&mut self) -> Option<Self::Item> {
137 self.owner.pop().ok()
138 }
139}