futures_buffered/buffered/
ordered.rs

1use crate::FuturesOrderedBounded;
2use core::{
3    future::Future,
4    pin::Pin,
5    task::{ready, Context, Poll},
6};
7use futures_core::Stream;
8use pin_project_lite::pin_project;
9
10pin_project! {
11    /// Stream for the [`buffered_ordered`](crate::BufferedStreamExt::buffered_ordered) method.
12    #[must_use = "streams do nothing unless polled"]
13    pub struct BufferedOrdered<St>
14    where
15        St: Stream,
16        St::Item: Future,
17    {
18        #[pin]
19        pub(crate) stream: Option<St>,
20        pub(crate) in_progress_queue: FuturesOrderedBounded<St::Item>,
21    }
22}
23
24impl<St> Stream for BufferedOrdered<St>
25where
26    St: Stream,
27    St::Item: Future,
28{
29    type Item = <St::Item as Future>::Output;
30
31    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32        let mut this = self.project();
33
34        // First up, try to spawn off as many futures as possible by filling up
35        // our queue of futures.
36        let ordered = this.in_progress_queue;
37        while ordered.in_progress_queue.tasks.len() < ordered.in_progress_queue.tasks.capacity() {
38            if let Some(s) = this.stream.as_mut().as_pin_mut() {
39                match s.poll_next(cx) {
40                    Poll::Ready(Some(fut)) => {
41                        ordered.push_back(fut);
42                        continue;
43                    }
44                    Poll::Ready(None) => this.stream.as_mut().set(None),
45                    Poll::Pending => {}
46                }
47            }
48            break;
49        }
50
51        // Attempt to pull the next value from the in_progress_queue
52        let res = Pin::new(ordered).poll_next(cx);
53        if let Some(val) = ready!(res) {
54            return Poll::Ready(Some(val));
55        }
56
57        // If more values are still coming from the stream, we're not done yet
58        if this.stream.is_none() {
59            Poll::Ready(None)
60        } else {
61            Poll::Pending
62        }
63    }
64
65    fn size_hint(&self) -> (usize, Option<usize>) {
66        let queue_len = self.in_progress_queue.len();
67        let (lower, upper) = self
68            .stream
69            .as_ref()
70            .map(|s| s.size_hint())
71            .unwrap_or((0, Some(0)));
72        let lower = lower.saturating_add(queue_len);
73        let upper = match upper {
74            Some(x) => x.checked_add(queue_len),
75            None => None,
76        };
77        (lower, upper)
78    }
79}
80
81#[cfg(test)]
82mod tests {
83    use crate::BufferedStreamExt;
84
85    use super::*;
86    use futures::{channel::oneshot, stream, StreamExt};
87    use futures_test::task::noop_context;
88
89    #[test]
90    fn buffered_ordered() {
91        let (send_one, recv_one) = oneshot::channel();
92        let (send_two, recv_two) = oneshot::channel();
93
94        let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
95        let mut buffered = stream_of_futures.buffered_ordered(10);
96        let mut cx = noop_context();
97
98        // sized properly
99        assert_eq!(buffered.size_hint(), (2, Some(2)));
100
101        // make sure it returns pending
102        assert_eq!(buffered.poll_next_unpin(&mut cx), Poll::Pending);
103
104        // returns in a fixed order
105        send_two.send(2i32).unwrap();
106        assert_eq!(buffered.poll_next_unpin(&mut cx), Poll::Pending);
107
108        send_one.send(1i32).unwrap();
109        assert_eq!(
110            buffered.poll_next_unpin(&mut cx),
111            Poll::Ready(Some(Ok(1i32)))
112        );
113        assert_eq!(
114            buffered.poll_next_unpin(&mut cx),
115            Poll::Ready(Some(Ok(2i32)))
116        );
117
118        // completes properly
119        assert_eq!(buffered.poll_next_unpin(&mut cx), Poll::Ready(None));
120    }
121}