futures_buffered/buffered/
unordered.rs

1use core::{
2    future::Future,
3    pin::Pin,
4    task::{Context, Poll},
5};
6use futures_core::Stream;
7use pin_project_lite::pin_project;
8
9use crate::FuturesUnorderedBounded;
10
pin_project!(
    /// Stream for the [`buffered_unordered`](crate::BufferedStreamExt::buffered_unordered)
    /// method.
    ///
    /// Futures are pulled from the wrapped stream into a fixed-capacity queue;
    /// as the example below shows, outputs are yielded as the futures complete,
    /// not in the order the stream produced them.
    ///
    /// # Examples
    ///
    /// ```
    /// # futures::executor::block_on(async {
    /// use futures::channel::oneshot;
    /// use futures::stream::{self, StreamExt};
    /// use futures_buffered::BufferedStreamExt;
    ///
    /// let (send_one, recv_one) = oneshot::channel();
    /// let (send_two, recv_two) = oneshot::channel();
    ///
    /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
    /// let mut buffered = stream_of_futures.buffered_unordered(10);
    ///
    /// send_two.send(2i32)?;
    /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
    ///
    /// send_one.send(1i32)?;
    /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
    ///
    /// assert_eq!(buffered.next().await, None);
    /// # Ok::<(), i32>(()) }).unwrap();
    /// ```
    ///
    /// ## Benchmarks
    ///
    /// ### Speed
    ///
    /// Running 65536 100us timers with 256 concurrent jobs in a single threaded tokio runtime:
    ///
    /// ```text
    /// futures::stream::BufferUnordered    time:   [420.33 ms 422.57 ms 424.83 ms]
    /// futures_buffered::BufferUnordered   time:   [363.39 ms 365.59 ms 367.78 ms]
    /// ```
    ///
    /// ### Memory usage
    ///
    /// Running 512000 `Ready<i32>` futures with 256 concurrent jobs.
    ///
    /// - count: the number of times alloc/dealloc was called
    /// - alloc: the number of cumulative bytes allocated
    /// - dealloc: the number of cumulative bytes deallocated
    ///
    /// ```text
    /// futures::stream::BufferUnordered
    ///     count:    1024002
    ///     alloc:    40960144 B
    ///     dealloc:  40960000 B
    ///
    /// futures_buffered::BufferUnordered
    ///     count:    2
    ///     alloc:    8264 B
    ///     dealloc:  0 B
    /// ```
    #[must_use = "streams do nothing unless polled"]
    pub struct BufferUnordered<S: Stream> {
        // Source of futures. `poll_next` replaces it with `None` once it has
        // yielded `None`, so a finished stream is never polled again.
        #[pin]
        pub(crate) stream: Option<S>,
        // Futures taken from `stream` that have not yet produced their output.
        // `poll_next` stops refilling once `tasks.len()` reaches `tasks.capacity()`.
        pub(crate) in_progress_queue: FuturesUnorderedBounded<S::Item>,
    }
);
76
77impl<St> Stream for BufferUnordered<St>
78where
79    St: Stream,
80    St::Item: Future,
81{
82    type Item = <St::Item as Future>::Output;
83
84    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
85        let mut this = self.project();
86
87        // First up, try to spawn off as many futures as possible by filling up
88        // our queue of futures.
89        let unordered = this.in_progress_queue;
90        while unordered.tasks.len() < unordered.tasks.capacity() {
91            if let Some(s) = this.stream.as_mut().as_pin_mut() {
92                match s.poll_next(cx) {
93                    Poll::Ready(Some(fut)) => {
94                        unordered.push(fut);
95                        continue;
96                    }
97                    Poll::Ready(None) => this.stream.as_mut().set(None),
98                    Poll::Pending => {}
99                }
100            }
101            break;
102        }
103
104        // Attempt to pull the next value from the in_progress_queue
105        match Pin::new(unordered).poll_next(cx) {
106            x @ (Poll::Pending | Poll::Ready(Some(_))) => return x,
107            Poll::Ready(None) => {}
108        }
109
110        // If more values are still coming from the stream, we're not done yet
111        if this.stream.as_pin_mut().is_none() {
112            Poll::Ready(None)
113        } else {
114            Poll::Pending
115        }
116    }
117
118    fn size_hint(&self) -> (usize, Option<usize>) {
119        let queue_len = self.in_progress_queue.len();
120        let (lower, upper) = self
121            .stream
122            .as_ref()
123            .map(|s| s.size_hint())
124            .unwrap_or((0, Some(0)));
125        let lower = lower.saturating_add(queue_len);
126        let upper = match upper {
127            Some(x) => x.checked_add(queue_len),
128            None => None,
129        };
130        (lower, upper)
131    }
132}
133
#[cfg(test)]
mod tests {
    use crate::BufferedStreamExt;

    use super::*;
    use futures::{channel::oneshot, stream, StreamExt};
    use futures_test::task::noop_context;
    use rand::{rng, Rng};
    use tokio::task::JoinSet;

    /// Drives a two-future stream by hand and checks the size hint, the
    /// pending state, out-of-order completion and termination.
    #[test]
    fn buffered_unordered() {
        let (send_one, recv_one) = oneshot::channel();
        let (send_two, recv_two) = oneshot::channel();

        let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
        let mut buffered = stream_of_futures.buffered_unordered(10);
        let mut cx = noop_context();

        // sized properly
        assert_eq!(buffered.size_hint(), (2, Some(2)));

        // make sure it returns pending
        assert_eq!(buffered.poll_next_unpin(&mut cx), Poll::Pending);

        // returns in any order
        send_two.send(2i32).unwrap();
        assert_eq!(
            buffered.poll_next_unpin(&mut cx),
            Poll::Ready(Some(Ok(2i32)))
        );

        send_one.send(1i32).unwrap();
        assert_eq!(
            buffered.poll_next_unpin(&mut cx),
            Poll::Ready(Some(Ok(1i32)))
        );

        // completes properly
        assert_eq!(buffered.poll_next_unpin(&mut cx), Poll::Ready(None));
    }

    /// Stress test: 32 spawned tasks each push `n` timer-based jobs through a
    /// `buffered_unordered(c)` stream. Every job is wrapped in a timeout longer
    /// than its own sleeps, so a job that times out indicates a stalled future.
    #[cfg(not(miri))]
    // Alternative harness, kept for manual runs:
    // #[tokio::test(flavor = "multi_thread")]
    #[tokio::test(start_paused = true)]
    async fn high_concurrency() {
        let now = tokio::time::Instant::now();
        let dur = std::time::Duration::from_millis(10);
        let n = 1024 * 16;
        let c = 32;

        // Rough expected runtime in seconds for one task: the mean multiplier
        // of a uniform 1..=20 draw is 10.5, each job sleeps 4 times, and `n`
        // jobs run `c` at a time.
        let estimated = dur.as_secs_f64() * 10.5 * (n as f64) / (c as f64) * 4.0;
        dbg!(estimated);

        let mut js = JoinSet::new();

        for _ in 0..32 {
            js.spawn(async move {
                let x = futures::stream::repeat_with(|| {
                    let n = rng().random_range(1..=20);
                    let fut = async move {
                        for _ in 0..4 {
                            tokio::time::sleep(n * dur).await;
                        }
                    };
                    // The job needs 4 * n * dur, so 5 * n * dur leaves headroom.
                    tokio::time::timeout(dur * (5 * n), fut)
                });
                let x = x.take(n as usize).buffered_unordered(c as usize);
                x.for_each(|res| async { res.unwrap() }).await;
            });
        }

        // A panic inside a spawned task (e.g. a timed-out job hitting
        // `res.unwrap()`) is only reported through the `JoinError` returned by
        // `join_next`. It has to be surfaced here, otherwise this test can
        // never fail.
        while let Some(joined) = js.join_next().await {
            joined.expect("worker task panicked or was cancelled");
        }

        let elapsed = now.elapsed().as_secs_f64();
        dbg!(elapsed);
    }
}