async_ringbuf/traits/
producer.rs1use core::{
2 future::Future,
3 iter::Peekable,
4 pin::Pin,
5 task::{Context, Poll, Waker},
6};
7use futures_util::future::FusedFuture;
8use ringbuf::traits::Producer;
9#[cfg(feature = "std")]
10use std::io;
11
12pub trait AsyncProducer: Producer {
13 fn register_waker(&self, waker: &Waker);
14
15 fn close(&mut self);
16 fn is_closed(&self) -> bool {
18 !self.read_is_held()
19 }
20
21 fn push(&mut self, item: Self::Item) -> PushFuture<'_, Self> {
31 PushFuture {
32 owner: self,
33 item: Some(item),
34 }
35 }
36
37 fn push_iter_all<I: Iterator<Item = Self::Item>>(&mut self, iter: I) -> PushIterFuture<'_, Self, I> {
49 PushIterFuture {
50 owner: self,
51 iter: Some(iter.peekable()),
52 }
53 }
54
55 fn wait_vacant(&mut self, count: usize) -> WaitVacantFuture<'_, Self> {
65 debug_assert!(count <= self.capacity().get());
66 WaitVacantFuture {
67 owner: self,
68 count,
69 done: false,
70 }
71 }
72
73 fn push_exact<'a: 'b, 'b>(&'a mut self, slice: &'b [Self::Item]) -> PushSliceFuture<'a, 'b, Self>
84 where
85 Self::Item: Copy,
86 {
87 PushSliceFuture {
88 owner: self,
89 slice: Some(slice),
90 count: 0,
91 }
92 }
93
94 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
96 let mut waker_registered = false;
97 loop {
98 if self.is_closed() {
99 break Poll::Ready(false);
100 }
101 if !self.is_full() {
102 break Poll::Ready(true);
103 }
104 if waker_registered {
105 break Poll::Pending;
106 }
107 self.register_waker(cx.waker());
108 waker_registered = true;
109 }
110 }
111
112 #[cfg(feature = "std")]
114 fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
115 where
116 Self: AsyncProducer<Item = u8> + Unpin,
117 {
118 let mut waker_registered = false;
119 loop {
120 if self.is_closed() {
121 break Poll::Ready(Ok(0));
122 }
123 let count = self.push_slice(buf);
124 if count > 0 {
125 break Poll::Ready(Ok(count));
126 }
127 if waker_registered {
128 break Poll::Pending;
129 }
130 self.register_waker(cx.waker());
131 waker_registered = true;
132 }
133 }
134}
135
136pub struct PushFuture<'a, A: AsyncProducer + ?Sized> {
140 owner: &'a mut A,
141 item: Option<A::Item>,
142}
143impl<A: AsyncProducer> Unpin for PushFuture<'_, A> {}
144impl<A: AsyncProducer> FusedFuture for PushFuture<'_, A> {
145 fn is_terminated(&self) -> bool {
146 self.item.is_none()
147 }
148}
149impl<A: AsyncProducer> Future for PushFuture<'_, A> {
150 type Output = Result<(), A::Item>;
151
152 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
153 let mut waker_registered = false;
154 loop {
155 let item = self.item.take().unwrap();
156 if self.owner.is_closed() {
157 break Poll::Ready(Err(item));
158 }
159 let push_result = self.owner.try_push(item);
160 if push_result.is_ok() {
161 break Poll::Ready(Ok(()));
162 }
163 self.item.replace(push_result.unwrap_err());
164 if waker_registered {
165 break Poll::Pending;
166 }
167 self.owner.register_waker(cx.waker());
168 waker_registered = true;
169 }
170 }
171}
172
173pub struct PushSliceFuture<'a, 'b, A: AsyncProducer + ?Sized>
177where
178 A::Item: Copy,
179{
180 owner: &'a mut A,
181 slice: Option<&'b [A::Item]>,
182 count: usize,
183}
184impl<A: AsyncProducer> Unpin for PushSliceFuture<'_, '_, A> where A::Item: Copy {}
185impl<A: AsyncProducer> FusedFuture for PushSliceFuture<'_, '_, A>
186where
187 A::Item: Copy,
188{
189 fn is_terminated(&self) -> bool {
190 self.slice.is_none()
191 }
192}
193impl<A: AsyncProducer> Future for PushSliceFuture<'_, '_, A>
194where
195 A::Item: Copy,
196{
197 type Output = Result<(), usize>;
198
199 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
200 let mut waker_registered = false;
201 loop {
202 let mut slice = self.slice.take().unwrap();
203 if self.owner.is_closed() {
204 break Poll::Ready(Err(self.count));
205 }
206 let len = self.owner.push_slice(slice);
207 slice = &slice[len..];
208 self.count += len;
209 if slice.is_empty() {
210 break Poll::Ready(Ok(()));
211 }
212 self.slice.replace(slice);
213 if waker_registered {
214 break Poll::Pending;
215 }
216 self.owner.register_waker(cx.waker());
217 waker_registered = true;
218 }
219 }
220}
221impl<A: AsyncProducer> PushSliceFuture<'_, '_, A>
222where
223 A::Item: Copy,
224{
225 pub fn count(&self) -> usize {
226 self.count
227 }
228}
229
230pub struct PushIterFuture<'a, A: AsyncProducer + ?Sized, I: Iterator<Item = A::Item>> {
234 owner: &'a mut A,
235 iter: Option<Peekable<I>>,
236}
237impl<A: AsyncProducer, I: Iterator<Item = A::Item>> Unpin for PushIterFuture<'_, A, I> {}
238impl<A: AsyncProducer, I: Iterator<Item = A::Item>> FusedFuture for PushIterFuture<'_, A, I> {
239 fn is_terminated(&self) -> bool {
240 self.iter.is_none() || self.owner.is_closed()
241 }
242}
243impl<A: AsyncProducer, I: Iterator<Item = A::Item>> Future for PushIterFuture<'_, A, I> {
244 type Output = bool;
245
246 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
247 let mut waker_registered = false;
248 loop {
249 let mut iter = self.iter.take().unwrap();
250 if self.owner.is_closed() {
251 break Poll::Ready(false);
252 }
253 self.owner.push_iter(&mut iter);
254 if iter.peek().is_none() {
255 break Poll::Ready(true);
256 }
257 self.iter.replace(iter);
258 if waker_registered {
259 break Poll::Pending;
260 }
261 self.owner.register_waker(cx.waker());
262 waker_registered = true;
263 }
264 }
265}
266impl<A: AsyncProducer, I: Iterator<Item = A::Item>> PushIterFuture<'_, A, I> {
267 pub fn inner(&self) -> &Peekable<I> {
268 self.iter.as_ref().unwrap()
269 }
270 pub fn inner_mut(&mut self) -> &mut Peekable<I> {
271 self.iter.as_mut().unwrap()
272 }
273 pub fn into_inner(self) -> Peekable<I> {
274 self.iter.unwrap()
275 }
276}
277
278pub struct WaitVacantFuture<'a, A: AsyncProducer + ?Sized> {
282 owner: &'a A,
283 count: usize,
284 done: bool,
285}
286impl<A: AsyncProducer> Unpin for WaitVacantFuture<'_, A> {}
287impl<A: AsyncProducer> FusedFuture for WaitVacantFuture<'_, A> {
288 fn is_terminated(&self) -> bool {
289 self.done
290 }
291}
292impl<A: AsyncProducer> Future for WaitVacantFuture<'_, A> {
293 type Output = ();
294
295 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
296 let mut waker_registered = false;
297 loop {
298 assert!(!self.done);
299 let closed = self.owner.is_closed();
300 if self.count <= self.owner.vacant_len() || closed {
301 break Poll::Ready(());
302 }
303 if waker_registered {
304 break Poll::Pending;
305 }
306 self.owner.register_waker(cx.waker());
307 waker_registered = true;
308 }
309 }
310}