futures_buffered/
try_buffered.rs

1use core::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use crate::{FuturesOrderedBounded, TryStream};
7use crate::{FuturesUnorderedBounded, TryFuture};
8use futures_core::ready;
9use futures_core::Stream;
10use pin_project_lite::pin_project;
11
12impl<T: ?Sized + TryStream> BufferedTryStreamExt for T {}
13
14/// An extension trait for `Stream`s that provides a variety of convenient
15/// combinator functions.
16pub trait BufferedTryStreamExt: TryStream {
17    /// An adaptor for creating a buffered list of pending futures.
18    ///
19    /// If this stream's item can be converted into a future, then this adaptor
20    /// will buffer up to at most `n` futures and then return the outputs in the
21    /// same order as the underlying stream. No more than `n` futures will be
22    /// buffered at any point in time, and less than `n` may also be buffered
23    /// depending on the state of each future.
24    ///
25    /// The returned stream will be a stream of each future's output.
26    fn try_buffered_ordered(self, n: usize) -> TryBufferedOrdered<Self>
27    where
28        Self::Ok: TryFuture<Err = Self::Err>,
29        Self: Sized,
30    {
31        TryBufferedOrdered {
32            stream: Some(self),
33            in_progress_queue: FuturesOrderedBounded::new(n),
34        }
35    }
36
37    /// An adaptor for creating a buffered list of pending futures (unordered).
38    ///
39    /// If this stream's item can be converted into a future, then this adaptor
40    /// will buffer up to `n` futures and then return the outputs in the order
41    /// in which they complete. No more than `n` futures will be buffered at
42    /// any point in time, and less than `n` may also be buffered depending on
43    /// the state of each future.
44    ///
45    /// The returned stream will be a stream of each future's output.
46    fn try_buffered_unordered(self, n: usize) -> TryBufferUnordered<Self>
47    where
48        Self::Ok: TryFuture<Err = Self::Err>,
49        Self: Sized,
50    {
51        TryBufferUnordered {
52            stream: Some(self),
53            in_progress_queue: FuturesUnorderedBounded::new(n),
54        }
55    }
56}
57
58pin_project! {
59    /// Stream for the [`try_buffered_ordered`](BufferedTryStreamExt::try_buffered_ordered) method.
60    #[must_use = "streams do nothing unless polled"]
61    pub struct TryBufferedOrdered<St>
62    where
63        St: TryStream,
64        St::Ok: TryFuture,
65    {
66        #[pin]
67        stream: Option<St>,
68        in_progress_queue: FuturesOrderedBounded<St::Ok>,
69    }
70}
71
72impl<St> Stream for TryBufferedOrdered<St>
73where
74    St: TryStream,
75    St::Ok: TryFuture<Err = St::Err>,
76{
77    type Item = Result<<St::Ok as TryFuture>::Ok, St::Err>;
78
79    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
80        let mut this = self.project();
81
82        // First up, try to spawn off as many futures as possible by filling up
83        // our queue of futures.
84        let ordered = this.in_progress_queue;
85        while ordered.in_progress_queue.tasks.len() < ordered.in_progress_queue.tasks.capacity() {
86            if let Some(s) = this.stream.as_mut().as_pin_mut() {
87                match s.poll_next(cx)? {
88                    Poll::Ready(Some(fut)) => {
89                        ordered.push_back(fut);
90                        continue;
91                    }
92                    Poll::Ready(None) => this.stream.as_mut().set(None),
93                    Poll::Pending => {}
94                }
95            }
96            break;
97        }
98
99        // Attempt to pull the next value from the in_progress_queue
100        let res = Pin::new(ordered).poll_next(cx);
101        if let Some(val) = ready!(res) {
102            return Poll::Ready(Some(val));
103        }
104
105        // If more values are still coming from the stream, we're not done yet
106        if this.stream.is_none() {
107            Poll::Ready(None)
108        } else {
109            Poll::Pending
110        }
111    }
112
113    fn size_hint(&self) -> (usize, Option<usize>) {
114        match &self.stream {
115            Some(s) => {
116                let queue_len = self.in_progress_queue.len();
117                let (lower, upper) = s.size_hint();
118                let lower = lower.saturating_add(queue_len);
119                let upper = match upper {
120                    Some(x) => x.checked_add(queue_len),
121                    None => None,
122                };
123                (lower, upper)
124            }
125            _ => (0, Some(0)),
126        }
127    }
128}
129
130pin_project!(
131    /// Stream for the [`try_buffered_unordered`](BufferedTryStreamExt::try_buffered_unordered) method.
132    #[must_use = "streams do nothing unless polled"]
133    pub struct TryBufferUnordered<S: TryStream> {
134        #[pin]
135        stream: Option<S>,
136        in_progress_queue: FuturesUnorderedBounded<S::Ok>,
137    }
138);
139
140impl<St> Stream for TryBufferUnordered<St>
141where
142    St: TryStream,
143    St::Ok: TryFuture<Err = St::Err>,
144{
145    type Item = Result<<St::Ok as TryFuture>::Ok, St::Err>;
146
147    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
148        let mut this = self.project();
149
150        // First up, try to spawn off as many futures as possible by filling up
151        // our queue of futures.
152        let unordered = this.in_progress_queue;
153        while unordered.tasks.len() < unordered.tasks.capacity() {
154            if let Some(s) = this.stream.as_mut().as_pin_mut() {
155                match s.poll_next(cx)? {
156                    Poll::Ready(Some(fut)) => {
157                        unordered.push(fut);
158                        continue;
159                    }
160                    Poll::Ready(None) => this.stream.as_mut().set(None),
161                    Poll::Pending => {}
162                }
163            }
164            break;
165        }
166
167        // Attempt to pull the next value from the in_progress_queue
168        match Pin::new(unordered).poll_next(cx) {
169            x @ (Poll::Pending | Poll::Ready(Some(_))) => return x,
170            Poll::Ready(None) => {}
171        }
172
173        // If more values are still coming from the stream, we're not done yet
174        if this.stream.as_pin_mut().is_none() {
175            Poll::Ready(None)
176        } else {
177            Poll::Pending
178        }
179    }
180
181    fn size_hint(&self) -> (usize, Option<usize>) {
182        match &self.stream {
183            Some(s) => {
184                let queue_len = self.in_progress_queue.len();
185                let (lower, upper) = s.size_hint();
186                let lower = lower.saturating_add(queue_len);
187                let upper = match upper {
188                    Some(x) => x.checked_add(queue_len),
189                    None => None,
190                };
191                (lower, upper)
192            }
193            _ => (0, Some(0)),
194        }
195    }
196}
197
#[cfg(test)]
mod tests {
    use super::*;
    use core::task::Poll;
    use futures::{
        channel::oneshot::{self, Canceled},
        stream, TryFutureExt, TryStreamExt,
    };
    use futures_test::task::noop_context;

    /// Fallback for a cancelled oneshot receiver: resolve to `Ok(0)`.
    fn _else(_: Canceled) -> Result<i32, i32> {
        Ok(0)
    }

    #[test]
    fn buffered_ordered() {
        let (tx_first, rx_first) = oneshot::channel();
        let (tx_second, rx_second) = oneshot::channel();

        // Source: a pending future, an immediate error, another pending future.
        let source = stream::iter(vec![
            Ok(rx_first.unwrap_or_else(_else)),
            Err(0),
            Ok(rx_second.unwrap_or_else(_else)),
        ]);
        let mut buffered = source.try_buffered_ordered(10);
        let mut cx = noop_context();

        // Before any polling, the hint mirrors the three source items.
        let hint = buffered.size_hint();
        assert_eq!(hint, (3, Some(3)));

        // The stream-level error surfaces first.
        let first_poll = buffered.try_poll_next_unpin(&mut cx);
        assert_eq!(first_poll, Poll::Ready(Some(Err(0))));

        // Neither future has resolved yet.
        let second_poll = buffered.try_poll_next_unpin(&mut cx);
        assert_eq!(second_poll, Poll::Pending);

        // Completing the second future alone must not yield anything:
        // output order is fixed.
        tx_second.send(Ok(2)).unwrap();
        let third_poll = buffered.try_poll_next_unpin(&mut cx);
        assert_eq!(third_poll, Poll::Pending);

        // Once the first future resolves, both come out in source order.
        tx_first.send(Err(1)).unwrap();
        let fourth_poll = buffered.try_poll_next_unpin(&mut cx);
        assert_eq!(fourth_poll, Poll::Ready(Some(Err(1))));
        let fifth_poll = buffered.try_poll_next_unpin(&mut cx);
        assert_eq!(fifth_poll, Poll::Ready(Some(Ok(2))));

        // Nothing left: the stream terminates.
        let last_poll = buffered.try_poll_next_unpin(&mut cx);
        assert_eq!(last_poll, Poll::Ready(None));
    }

    #[test]
    fn buffered_unordered() {
        let (tx_first, rx_first) = oneshot::channel();
        let (tx_second, rx_second) = oneshot::channel();

        // Source: a pending future, an immediate error, another pending future.
        let source = stream::iter(vec![
            Ok(rx_first.unwrap_or_else(_else)),
            Err(0),
            Ok(rx_second.unwrap_or_else(_else)),
        ]);
        let mut buffered = source.try_buffered_unordered(10);
        let mut cx = noop_context();

        // Before any polling, the hint mirrors the three source items.
        let hint = buffered.size_hint();
        assert_eq!(hint, (3, Some(3)));

        // The stream-level error surfaces first.
        let first_poll = buffered.try_poll_next_unpin(&mut cx);
        assert_eq!(first_poll, Poll::Ready(Some(Err(0))));

        // Neither future has resolved yet.
        let second_poll = buffered.try_poll_next_unpin(&mut cx);
        assert_eq!(second_poll, Poll::Pending);

        // Outputs arrive in completion order: the second future finishes first.
        tx_second.send(Ok(2)).unwrap();
        let third_poll = buffered.try_poll_next_unpin(&mut cx);
        assert_eq!(third_poll, Poll::Ready(Some(Ok(2))));

        tx_first.send(Ok(1)).unwrap();
        let fourth_poll = buffered.try_poll_next_unpin(&mut cx);
        assert_eq!(fourth_poll, Poll::Ready(Some(Ok(1))));

        // Nothing left: the stream terminates.
        let last_poll = buffered.try_poll_next_unpin(&mut cx);
        assert_eq!(last_poll, Poll::Ready(None));
    }
}