ringbuf_blocking/wrap/
prod.rs1use super::{BlockingWrap, WaitError};
2use crate::{rb::BlockingRbRef, sync::Semaphore};
3use core::time::Duration;
4#[cfg(feature = "std")]
5use ringbuf::traits::Based;
6use ringbuf::{
7 traits::{observer::DelegateObserver, producer::DelegateProducer, Observer, Producer},
8 wrap::Wrap,
9};
10#[cfg(feature = "std")]
11use std::io;
12
13pub type BlockingProd<R> = BlockingWrap<R, true, false>;
14
15impl<R: BlockingRbRef> DelegateObserver for BlockingProd<R> {}
16impl<R: BlockingRbRef> DelegateProducer for BlockingProd<R> {}
17
// Builds the iterator that paces every blocking loop in this file.
//
// The expansion reaches the ring buffer's `read` member, calls `take_iter`
// with the wrapper's current timeout, and then calls `reset()` on the result.
//
// NOTE(review): presumably each iteration step waits for the consumer side to
// signal progress and iteration ends once the timeout elapses — confirm
// against the `Semaphore` implementation.
macro_rules! wait_iter {
    ($wrap:expr) => {
        $wrap.rb.rb().read.take_iter($wrap.timeout()).reset()
    };
}
23
24impl<R: BlockingRbRef> BlockingProd<R> {
25 pub fn is_closed(&self) -> bool {
26 !self.read_is_held()
27 }
28
29 pub fn set_timeout(&mut self, timeout: Option<Duration>) {
30 self.timeout = timeout;
31 }
32 pub fn timeout(&self) -> Option<Duration> {
33 self.timeout
34 }
35
36 pub fn wait_vacant(&mut self, count: usize) -> Result<(), WaitError> {
37 debug_assert!(count <= self.rb().capacity().get());
38 for _ in wait_iter!(self) {
39 if self.base.vacant_len() >= count {
40 return Ok(());
41 }
42 if self.is_closed() {
43 return Err(WaitError::Closed);
44 }
45 }
46 Err(WaitError::TimedOut)
47 }
48
49 pub fn push(&mut self, mut item: <Self as Observer>::Item) -> Result<(), (WaitError, <Self as Observer>::Item)> {
50 for _ in wait_iter!(self) {
51 item = match self.base.try_push(item) {
52 Ok(()) => return Ok(()),
53 Err(item) => item,
54 };
55 if self.is_closed() {
56 return Err((WaitError::Closed, item));
57 }
58 }
59 Err((WaitError::TimedOut, item))
60 }
61
62 pub fn push_all_iter<I: Iterator<Item = <Self as Observer>::Item>>(&mut self, iter: I) -> usize {
63 let mut iter = iter.peekable();
64 if iter.peek().is_none() {
65 return 0;
66 }
67 let mut count = 0;
68 for _ in wait_iter!(self) {
69 if self.is_closed() {
70 break;
71 }
72
73 count += self.base.push_iter(&mut iter);
74
75 if iter.peek().is_none() {
76 break;
77 }
78 }
79 count
80 }
81}
82impl<R: BlockingRbRef> BlockingProd<R>
83where
84 <Self as Observer>::Item: Copy,
85{
86 pub fn push_exact(&mut self, mut slice: &[<Self as Observer>::Item]) -> usize {
87 if slice.is_empty() {
88 return 0;
89 }
90
91 let mut count = 0;
92 for _ in wait_iter!(self) {
93 if self.is_closed() {
94 break;
95 }
96
97 let n = self.base.push_slice(slice);
98 slice = &slice[n..];
99 count += n;
100
101 if slice.is_empty() {
102 break;
103 }
104 }
105 count
106 }
107}
108
#[cfg(feature = "std")]
impl<R: BlockingRbRef> io::Write for BlockingProd<R>
where
    <Self as Based>::Base: Producer<Item = u8>,
{
    /// Writes as many bytes of `buf` as currently fit, waiting until at least
    /// one byte can be pushed.
    ///
    /// Returns:
    ///
    /// * `Ok(0)` immediately when `buf` is empty;
    /// * `Ok(0)` when [`is_closed`](BlockingProd::is_closed) returns `true`;
    /// * `Ok(n)` with `n > 0` once `push_slice` accepted `n` bytes.
    ///
    /// # Errors
    ///
    /// Returns an [`io::ErrorKind::TimedOut`] error when the wait iterator ends
    /// before any byte could be pushed.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // An empty buffer can never make progress: `push_slice` would return 0
        // on every round, so the loop below would only end by timing out (or
        // never, without a timeout). Report the empty write right away.
        if buf.is_empty() {
            return Ok(0);
        }
        for _ in wait_iter!(self) {
            if self.is_closed() {
                return Ok(0);
            }
            let n = self.base.push_slice(buf);
            if n > 0 {
                return Ok(n);
            }
        }
        Err(io::ErrorKind::TimedOut.into())
    }

    /// Does nothing and always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
129}