Skip to main content

async_ringbuf/traits/
consumer.rs

1use core::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll, Waker},
5};
6use futures_util::future::FusedFuture;
7use ringbuf::traits::Consumer;
8#[cfg(feature = "std")]
9use std::io;
10
11pub trait AsyncConsumer: Consumer {
12    fn register_waker(&self, waker: &Waker);
13
14    fn close(&mut self);
15    /// Whether the corresponding producer was closed.
16    fn is_closed(&self) -> bool {
17        !self.write_is_held()
18    }
19
20    /// Pop item from the ring buffer waiting asynchronously if the buffer is empty.
21    ///
22    /// Future returns:
23    /// + `Some(item)` - an item is taken.
24    /// + `None` - the buffer is empty and the corresponding producer was dropped.
25    ///
26    /// # Cancel safety
27    ///
28    /// If future is cancelled then no item removed from the ring buffer.
29    fn pop(&mut self) -> PopFuture<'_, Self> {
30        PopFuture { owner: self, done: false }
31    }
32
33    /// Wait for the buffer to contain at least `count` items or to close.
34    ///
35    /// In debug mode panics if `count` is greater than buffer capacity.
36    ///
37    /// The method takes `&mut self` because only single [`WaitOccupiedFuture`] is allowed at a time.
38    ///
39    /// # Cancel safety
40    ///
41    /// The future can be safely cancelled.
42    fn wait_occupied(&mut self, count: usize) -> WaitOccupiedFuture<'_, Self> {
43        debug_assert!(count <= self.capacity().get());
44        WaitOccupiedFuture {
45            owner: self,
46            count,
47            done: false,
48        }
49    }
50
51    /// Fill slice with items from the ring buffer waiting asynchronously until slice filled or corresponding producer closed.
52    ///
53    /// Future returns:
54    /// + `Ok` - the whole slice is filled with the items from the buffer.
55    /// + `Err(count)` - the buffer is empty and the corresponding producer was dropped, number of items copied to slice is returned.
56    ///
57    /// # Cancel safety
58    ///
59    /// If future is cancelled then slice can be partially filled.
60    /// The number of items already copied can be examined by [`PopSliceFuture::count`].
61    fn pop_exact<'a: 'b, 'b>(&'a mut self, slice: &'b mut [Self::Item]) -> PopSliceFuture<'a, 'b, Self>
62    where
63        Self::Item: Copy,
64    {
65        PopSliceFuture {
66            owner: self,
67            slice: Some(slice),
68            count: 0,
69        }
70    }
71
72    /// Fill `vec` with items from the ring buffer waiting asynchronously until corresponding producer closed.
73    ///
74    /// # Cancel safety
75    ///
76    /// If future is cancelled then `vec` contains items taken from RB before cancellation.
77    #[cfg(feature = "alloc")]
78    fn pop_until_end<'a: 'b, 'b>(&'a mut self, vec: &'b mut alloc::vec::Vec<Self::Item>) -> PopVecFuture<'a, 'b, Self> {
79        PopVecFuture {
80            owner: self,
81            vec: Some(vec),
82        }
83    }
84
85    /// Poll for the next item in the ring buffer.
86    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
87    where
88        Self: Unpin,
89    {
90        let mut waker_registered = false;
91        loop {
92            let closed = self.is_closed();
93            if let Some(item) = self.try_pop() {
94                break Poll::Ready(Some(item));
95            }
96            if closed {
97                break Poll::Ready(None);
98            }
99            if waker_registered {
100                break Poll::Pending;
101            }
102            self.register_waker(cx.waker());
103            waker_registered = true;
104        }
105    }
106
107    /// Poll reading bytes from byte buffer.
108    #[cfg(feature = "std")]
109    fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>>
110    where
111        Self: AsyncConsumer<Item = u8> + Unpin,
112    {
113        let mut waker_registered = false;
114        loop {
115            let closed = self.is_closed();
116            let len = self.pop_slice(buf);
117            if len != 0 || closed {
118                break Poll::Ready(Ok(len));
119            }
120            if waker_registered {
121                break Poll::Pending;
122            }
123            self.register_waker(cx.waker());
124            waker_registered = true;
125        }
126    }
127}
128
129/// # Cancel safety
130///
131/// If future is cancelled then no item removed from the ring buffer.
132pub struct PopFuture<'a, A: AsyncConsumer + ?Sized> {
133    owner: &'a mut A,
134    done: bool,
135}
136impl<A: AsyncConsumer> Unpin for PopFuture<'_, A> {}
137impl<A: AsyncConsumer> FusedFuture for PopFuture<'_, A> {
138    fn is_terminated(&self) -> bool {
139        self.done || self.owner.is_closed()
140    }
141}
142impl<A: AsyncConsumer> Future for PopFuture<'_, A> {
143    type Output = Option<A::Item>;
144
145    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
146        let mut waker_registered = false;
147        loop {
148            assert!(!self.done);
149            let closed = self.owner.is_closed();
150            if let Some(item) = self.owner.try_pop() {
151                self.done = true;
152                break Poll::Ready(Some(item));
153            }
154            if closed {
155                break Poll::Ready(None);
156            }
157            if waker_registered {
158                break Poll::Pending;
159            }
160            self.owner.register_waker(cx.waker());
161            waker_registered = true;
162        }
163    }
164}
165
166/// # Cancel safety
167///
168/// If future is cancelled then slice can be partially filled.
169pub struct PopSliceFuture<'a, 'b, A: AsyncConsumer + ?Sized>
170where
171    A::Item: Copy,
172{
173    owner: &'a mut A,
174    slice: Option<&'b mut [A::Item]>,
175    count: usize,
176}
177impl<A: AsyncConsumer> Unpin for PopSliceFuture<'_, '_, A> where A::Item: Copy {}
178impl<A: AsyncConsumer> FusedFuture for PopSliceFuture<'_, '_, A>
179where
180    A::Item: Copy,
181{
182    fn is_terminated(&self) -> bool {
183        self.slice.is_none()
184    }
185}
186impl<A: AsyncConsumer> Future for PopSliceFuture<'_, '_, A>
187where
188    A::Item: Copy,
189{
190    type Output = Result<(), usize>;
191
192    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
193        let mut waker_registered = false;
194        loop {
195            let closed = self.owner.is_closed();
196            let mut slice = self.slice.take().unwrap();
197            let len = self.owner.pop_slice(slice);
198            slice = &mut slice[len..];
199            self.count += len;
200            if slice.is_empty() {
201                break Poll::Ready(Ok(()));
202            }
203            if closed {
204                break Poll::Ready(Err(self.count));
205            }
206            self.slice.replace(slice);
207            if waker_registered {
208                break Poll::Pending;
209            }
210            self.owner.register_waker(cx.waker());
211            waker_registered = true;
212        }
213    }
214}
215impl<A: AsyncConsumer> PopSliceFuture<'_, '_, A>
216where
217    A::Item: Copy,
218{
219    /// Number of items already copied from the ring buufer to the slice provided.
220    pub fn count(&self) -> usize {
221        self.count
222    }
223}
224
225/// # Cancel safety
226///
227/// If future is cancelled then `vec` contains items taken from RB before cancellation.
228#[cfg(feature = "alloc")]
229pub struct PopVecFuture<'a, 'b, A: AsyncConsumer + ?Sized> {
230    owner: &'a mut A,
231    vec: Option<&'b mut alloc::vec::Vec<A::Item>>,
232}
233#[cfg(feature = "alloc")]
234impl<A: AsyncConsumer> Unpin for PopVecFuture<'_, '_, A> {}
235#[cfg(feature = "alloc")]
236impl<A: AsyncConsumer> FusedFuture for PopVecFuture<'_, '_, A> {
237    fn is_terminated(&self) -> bool {
238        self.vec.is_none()
239    }
240}
241#[cfg(feature = "alloc")]
242impl<A: AsyncConsumer> Future for PopVecFuture<'_, '_, A> {
243    type Output = ();
244
245    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
246        let mut waker_registered = false;
247        loop {
248            let closed = self.owner.is_closed();
249            let vec = self.vec.take().unwrap();
250
251            loop {
252                if vec.len() == vec.capacity() {
253                    vec.reserve(vec.capacity().max(16));
254                }
255                let n = self.owner.pop_slice_uninit(vec.spare_capacity_mut());
256                if n == 0 {
257                    break;
258                }
259                unsafe { vec.set_len(vec.len() + n) };
260            }
261
262            if closed {
263                break Poll::Ready(());
264            }
265            self.vec.replace(vec);
266            if waker_registered {
267                break Poll::Pending;
268            }
269            self.owner.register_waker(cx.waker());
270            waker_registered = true;
271        }
272    }
273}
274
275/// # Cancel safety
276///
277/// The future can be safely cancelled.
278pub struct WaitOccupiedFuture<'a, A: AsyncConsumer + ?Sized> {
279    owner: &'a A,
280    count: usize,
281    done: bool,
282}
283impl<A: AsyncConsumer> Unpin for WaitOccupiedFuture<'_, A> {}
284impl<A: AsyncConsumer> FusedFuture for WaitOccupiedFuture<'_, A> {
285    fn is_terminated(&self) -> bool {
286        self.done
287    }
288}
289impl<A: AsyncConsumer> Future for WaitOccupiedFuture<'_, A> {
290    type Output = ();
291
292    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
293        let mut waker_registered = false;
294        loop {
295            assert!(!self.done);
296            let closed = self.owner.is_closed();
297            if self.count <= self.owner.occupied_len() || closed {
298                self.done = true;
299                break Poll::Ready(());
300            }
301            if waker_registered {
302                break Poll::Pending;
303            }
304            self.owner.register_waker(cx.waker());
305            waker_registered = true;
306        }
307    }
308}