Skip to main content

async_ringbuf/traits/
producer.rs

1use core::{
2    future::Future,
3    iter::Peekable,
4    pin::Pin,
5    task::{Context, Poll, Waker},
6};
7use futures_util::future::FusedFuture;
8use ringbuf::traits::Producer;
9#[cfg(feature = "std")]
10use std::io;
11
12pub trait AsyncProducer: Producer {
13    fn register_waker(&self, waker: &Waker);
14
15    fn close(&mut self);
16    /// Whether the corresponding consumer was closed.
17    fn is_closed(&self) -> bool {
18        !self.read_is_held()
19    }
20
21    /// Push item to the ring buffer waiting asynchronously if the buffer is full.
22    ///
23    /// Future returns:
24    /// + `Ok` - item successfully pushed.
25    /// + `Err(item)` - the corresponding consumer was dropped, item is returned back.
26    ///
27    /// # Cancel safety
28    ///
29    /// If future is cancelled no item pushed to the RB.
30    fn push(&mut self, item: Self::Item) -> PushFuture<'_, Self> {
31        PushFuture {
32            owner: self,
33            item: Some(item),
34        }
35    }
36
37    /// Push items from iterator waiting asynchronously if the buffer is full.
38    ///
39    /// Future returns:
40    /// + `true` - iterator ended.
41    /// + `false` - the corresponding consumer was dropped.
42    ///
43    /// # Cancel safety
44    ///
45    /// If future is cancelled then remaining items are left in iterator.
46    /// You can get the iterator by using [`PushIterFuture::inner`], [`PushIterFuture::inner_mut`] and [`PushIterFuture::into_inner`].
47    /// *Note that the iterator is [`Peekable`].*
48    fn push_iter_all<I: Iterator<Item = Self::Item>>(&mut self, iter: I) -> PushIterFuture<'_, Self, I> {
49        PushIterFuture {
50            owner: self,
51            iter: Some(iter.peekable()),
52        }
53    }
54
55    /// Wait for the buffer to have at least `count` free places for items or to close.
56    ///
57    /// In debug mode panics if `count` is greater than buffer capacity.
58    ///
59    /// The method takes `&mut self` because only single [`WaitVacantFuture`] is allowed at a time.
60    ///
61    /// # Cancel safety
62    ///
63    /// You can safely cancel this future.
64    fn wait_vacant(&mut self, count: usize) -> WaitVacantFuture<'_, Self> {
65        debug_assert!(count <= self.capacity().get());
66        WaitVacantFuture {
67            owner: self,
68            count,
69            done: false,
70        }
71    }
72
73    /// Copy slice contents to the buffer waiting asynchronously if the buffer is full.
74    ///
75    /// Future returns:
76    /// + `Ok` - all slice contents are copied.
77    /// + `Err(count)` - the corresponding consumer was dropped, number of copied items returned.
78    ///
79    /// # Cancel safety
80    ///
81    /// On cancel the slice can be copied partially.
82    /// The number of items already copied can be examined by [`PushSliceFuture::count`].
83    fn push_exact<'a: 'b, 'b>(&'a mut self, slice: &'b [Self::Item]) -> PushSliceFuture<'a, 'b, Self>
84    where
85        Self::Item: Copy,
86    {
87        PushSliceFuture {
88            owner: self,
89            slice: Some(slice),
90            count: 0,
91        }
92    }
93
94    /// Poll the ring buffer has free slot for at least one item and the corresponding consumer is not closed.
95    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
96        let mut waker_registered = false;
97        loop {
98            if self.is_closed() {
99                break Poll::Ready(false);
100            }
101            if !self.is_full() {
102                break Poll::Ready(true);
103            }
104            if waker_registered {
105                break Poll::Pending;
106            }
107            self.register_waker(cx.waker());
108            waker_registered = true;
109        }
110    }
111
112    /// Poll writing bytes into byte buffer.
113    #[cfg(feature = "std")]
114    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
115    where
116        Self: AsyncProducer<Item = u8> + Unpin,
117    {
118        let mut waker_registered = false;
119        loop {
120            if self.is_closed() {
121                break Poll::Ready(Ok(0));
122            }
123            let count = self.push_slice(buf);
124            if count > 0 {
125                break Poll::Ready(Ok(count));
126            }
127            if waker_registered {
128                break Poll::Pending;
129            }
130            self.register_waker(cx.waker());
131            waker_registered = true;
132        }
133    }
134}
135
136/// # Cancel safety
137///
138/// If future is cancelled no item pushed to the RB.
139pub struct PushFuture<'a, A: AsyncProducer + ?Sized> {
140    owner: &'a mut A,
141    item: Option<A::Item>,
142}
143impl<A: AsyncProducer> Unpin for PushFuture<'_, A> {}
144impl<A: AsyncProducer> FusedFuture for PushFuture<'_, A> {
145    fn is_terminated(&self) -> bool {
146        self.item.is_none()
147    }
148}
149impl<A: AsyncProducer> Future for PushFuture<'_, A> {
150    type Output = Result<(), A::Item>;
151
152    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
153        let mut waker_registered = false;
154        loop {
155            let item = self.item.take().unwrap();
156            if self.owner.is_closed() {
157                break Poll::Ready(Err(item));
158            }
159            let push_result = self.owner.try_push(item);
160            if push_result.is_ok() {
161                break Poll::Ready(Ok(()));
162            }
163            self.item.replace(push_result.unwrap_err());
164            if waker_registered {
165                break Poll::Pending;
166            }
167            self.owner.register_waker(cx.waker());
168            waker_registered = true;
169        }
170    }
171}
172
173/// # Cancel safety
174///
175/// On cancel the slice can be copied partially.
176pub struct PushSliceFuture<'a, 'b, A: AsyncProducer + ?Sized>
177where
178    A::Item: Copy,
179{
180    owner: &'a mut A,
181    slice: Option<&'b [A::Item]>,
182    count: usize,
183}
184impl<A: AsyncProducer> Unpin for PushSliceFuture<'_, '_, A> where A::Item: Copy {}
185impl<A: AsyncProducer> FusedFuture for PushSliceFuture<'_, '_, A>
186where
187    A::Item: Copy,
188{
189    fn is_terminated(&self) -> bool {
190        self.slice.is_none()
191    }
192}
193impl<A: AsyncProducer> Future for PushSliceFuture<'_, '_, A>
194where
195    A::Item: Copy,
196{
197    type Output = Result<(), usize>;
198
199    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
200        let mut waker_registered = false;
201        loop {
202            let mut slice = self.slice.take().unwrap();
203            if self.owner.is_closed() {
204                break Poll::Ready(Err(self.count));
205            }
206            let len = self.owner.push_slice(slice);
207            slice = &slice[len..];
208            self.count += len;
209            if slice.is_empty() {
210                break Poll::Ready(Ok(()));
211            }
212            self.slice.replace(slice);
213            if waker_registered {
214                break Poll::Pending;
215            }
216            self.owner.register_waker(cx.waker());
217            waker_registered = true;
218        }
219    }
220}
221impl<A: AsyncProducer> PushSliceFuture<'_, '_, A>
222where
223    A::Item: Copy,
224{
225    pub fn count(&self) -> usize {
226        self.count
227    }
228}
229
230/// # Cancel safety
231///
232/// If future is cancelled then remaining items are left in iterator.
233pub struct PushIterFuture<'a, A: AsyncProducer + ?Sized, I: Iterator<Item = A::Item>> {
234    owner: &'a mut A,
235    iter: Option<Peekable<I>>,
236}
237impl<A: AsyncProducer, I: Iterator<Item = A::Item>> Unpin for PushIterFuture<'_, A, I> {}
238impl<A: AsyncProducer, I: Iterator<Item = A::Item>> FusedFuture for PushIterFuture<'_, A, I> {
239    fn is_terminated(&self) -> bool {
240        self.iter.is_none() || self.owner.is_closed()
241    }
242}
243impl<A: AsyncProducer, I: Iterator<Item = A::Item>> Future for PushIterFuture<'_, A, I> {
244    type Output = bool;
245
246    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
247        let mut waker_registered = false;
248        loop {
249            let mut iter = self.iter.take().unwrap();
250            if self.owner.is_closed() {
251                break Poll::Ready(false);
252            }
253            self.owner.push_iter(&mut iter);
254            if iter.peek().is_none() {
255                break Poll::Ready(true);
256            }
257            self.iter.replace(iter);
258            if waker_registered {
259                break Poll::Pending;
260            }
261            self.owner.register_waker(cx.waker());
262            waker_registered = true;
263        }
264    }
265}
266impl<A: AsyncProducer, I: Iterator<Item = A::Item>> PushIterFuture<'_, A, I> {
267    pub fn inner(&self) -> &Peekable<I> {
268        self.iter.as_ref().unwrap()
269    }
270    pub fn inner_mut(&mut self) -> &mut Peekable<I> {
271        self.iter.as_mut().unwrap()
272    }
273    pub fn into_inner(self) -> Peekable<I> {
274        self.iter.unwrap()
275    }
276}
277
278/// # Cancel safety
279///
280/// You can safely cancel this future.
281pub struct WaitVacantFuture<'a, A: AsyncProducer + ?Sized> {
282    owner: &'a A,
283    count: usize,
284    done: bool,
285}
286impl<A: AsyncProducer> Unpin for WaitVacantFuture<'_, A> {}
287impl<A: AsyncProducer> FusedFuture for WaitVacantFuture<'_, A> {
288    fn is_terminated(&self) -> bool {
289        self.done
290    }
291}
292impl<A: AsyncProducer> Future for WaitVacantFuture<'_, A> {
293    type Output = ();
294
295    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
296        let mut waker_registered = false;
297        loop {
298            assert!(!self.done);
299            let closed = self.owner.is_closed();
300            if self.count <= self.owner.vacant_len() || closed {
301                break Poll::Ready(());
302            }
303            if waker_registered {
304                break Poll::Pending;
305            }
306            self.owner.register_waker(cx.waker());
307            waker_registered = true;
308        }
309    }
310}