futures_buffered/
futures_unordered.rs

1use alloc::vec::Vec;
2use core::{
3    fmt,
4    future::Future,
5    pin::Pin,
6    task::{Context, Poll},
7};
8
9use crate::FuturesUnorderedBounded;
10use futures_core::{FusedStream, Stream};
11
12/// A set of futures which may complete in any order.
13///
14/// Much like [`futures::stream::FuturesUnordered`](https://docs.rs/futures/0.3.25/futures/stream/struct.FuturesUnordered.html),
15/// this is a thread-safe, `Pin` friendly, lifetime friendly, concurrent processing stream.
16///
/// This is different from [`FuturesUnorderedBounded`] because it doesn't have a fixed capacity.
/// It still manages to achieve good efficiency, however.
19///
20/// ## Benchmarks
21///
22/// All benchmarks are run with `FuturesUnordered::new()`, no predefined capacity.
23///
24/// ### Speed
25///
26/// Running 65536 100us timers with 256 concurrent jobs in a single threaded tokio runtime:
27///
28/// ```text
29/// futures::FuturesUnordered time:   [412.52 ms 414.47 ms 416.41 ms]
30/// crate::FuturesUnordered   time:   [412.96 ms 414.69 ms 416.65 ms]
31/// FuturesUnorderedBounded   time:   [361.81 ms 362.96 ms 364.13 ms]
32/// ```
33///
34/// ### Memory usage
35///
36/// Running 512000 `Ready<i32>` futures with 256 concurrent jobs.
37///
38/// - count: the number of times alloc/dealloc was called
39/// - alloc: the number of cumulative bytes allocated
40/// - dealloc: the number of cumulative bytes deallocated
41///
42/// ```text
43/// futures::FuturesUnordered
44///     count:    1024002
45///     alloc:    40960144 B
46///     dealloc:  40960000 B
47///
48/// crate::FuturesUnordered
49///     count:    9
50///     alloc:    15840 B
51///     dealloc:  0 B
52/// ```
53///
54/// ### Conclusion
55///
/// As you can see, our `FuturesUnordered` massively reduces your memory overhead while maintaining good performance.
57///
58/// # Example
59///
60/// Making 1024 total HTTP requests, with a max concurrency of 128
61///
62/// ```
63/// use futures::future::Future;
64/// use futures::stream::StreamExt;
65/// use futures_buffered::FuturesUnordered;
66/// use hyper::client::conn::http1::{handshake, SendRequest};
67/// use hyper::body::Incoming;
68/// use hyper::{Request, Response};
69/// use hyper_util::rt::TokioIo;
70/// use tokio::net::TcpStream;
71///
72/// # #[cfg(miri)] fn main() {}
73/// # #[cfg(not(miri))] #[tokio::main]
74/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
75/// // create a tcp connection
76/// let stream = TcpStream::connect("example.com:80").await?;
77///
78/// // perform the http handshakes
79/// let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
80/// tokio::spawn(conn);
81///
82/// /// make http request to example.com and read the response
83/// fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
84///     let req = Request::builder()
85///         .header("Host", "example.com")
86///         .method("GET")
87///         .body(String::new())
88///         .unwrap();
89///     rs.send_request(req)
90/// }
91///
92/// // create a queue that can hold 128 concurrent requests
93/// let mut queue = FuturesUnordered::with_capacity(128);
94///
95/// // start up 128 requests
96/// for _ in 0..128 {
97///     queue.push(make_req(&mut rs));
98/// }
99/// // wait for a request to finish and start another to fill its place - up to 1024 total requests
100/// for _ in 128..1024 {
101///     queue.next().await;
102///     queue.push(make_req(&mut rs));
103/// }
104/// // wait for the tail end to finish
105/// for _ in 0..128 {
106///     queue.next().await;
107/// }
108/// # Ok(()) }
109/// ```
pub struct FuturesUnordered<F> {
    /// Number of futures currently held by the set: incremented by `push`,
    /// decremented each time `poll_next` yields an output.
    rem: usize,
    /// Backing storage, as a list of bounded sets. `push` only ever inserts
    /// into the last group and appends a new group with double the capacity
    /// when that insert is rejected.
    pub(crate) groups: Vec<FuturesUnorderedBounded<F>>,
    /// Index into `groups` of the group that `poll_next` polls first.
    poll_next: usize,
}
115
/// Capacity of the first group allocated when a future is pushed into a set
/// that has no groups yet; also the lower bound on the initial capacity chosen
/// by the `FromIterator` implementation.
pub(crate) const MIN_CAPACITY: usize = 32;
117
// `FuturesUnordered` is `Unpin` regardless of whether `F` is; `poll_next`
// relies on this to get `&mut Self` out of `Pin<&mut Self>`.
// NOTE(review): presumably sound because the futures live in storage owned by
// the groups and do not move when this struct moves — confirm against
// `FuturesUnorderedBounded`.
impl<F> Unpin for FuturesUnordered<F> {}
119
120impl<F> Default for FuturesUnordered<F> {
121    fn default() -> Self {
122        Self::new()
123    }
124}
125
126impl<F> FuturesUnordered<F> {
127    /// Constructs a new, empty [`FuturesUnordered`].
128    ///
129    /// The returned [`FuturesUnordered`] does not contain any futures.
130    /// In this state, [`FuturesUnordered::poll_next`](Stream::poll_next) will
131    /// return [`Poll::Ready(None)`](Poll::Ready).
132    pub const fn new() -> Self {
133        Self {
134            rem: 0,
135            groups: Vec::new(),
136            poll_next: 0,
137        }
138    }
139
140    /// Constructs a new, empty [`FuturesUnordered`] with the given fixed capacity.
141    ///
142    /// The returned [`FuturesUnordered`] does not contain any futures.
143    /// In this state, [`FuturesUnordered::poll_next`](Stream::poll_next) will
144    /// return [`Poll::Ready(None)`](Poll::Ready).
145    pub fn with_capacity(n: usize) -> Self {
146        if n > 0 {
147            Self {
148                rem: 0,
149                groups: Vec::from_iter([FuturesUnorderedBounded::new(n)]),
150                poll_next: 0,
151            }
152        } else {
153            Self::new()
154        }
155    }
156
157    /// Push a future into the set.
158    ///
159    /// This method adds the given future to the set. This method will not
160    /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must
161    /// ensure that [`FuturesUnordered::poll_next`](Stream::poll_next) is called
162    /// in order to receive wake-up notifications for the given future.
163    pub fn push(&mut self, fut: F) {
164        self.rem += 1;
165
166        let last = match self.groups.last_mut() {
167            Some(last) => last,
168            None => {
169                self.groups.push(FuturesUnorderedBounded::new(MIN_CAPACITY));
170                self.groups
171                    .last_mut()
172                    .expect("group should have at least one entry")
173            }
174        };
175        match last.try_push(fut) {
176            Ok(()) => {}
177            Err(future) => {
178                let mut next = FuturesUnorderedBounded::new(last.capacity() * 2);
179                next.push(future);
180                self.groups.push(next);
181            }
182        }
183    }
184
185    /// Returns `true` if the set contains no futures.
186    pub fn is_empty(&self) -> bool {
187        self.rem == 0
188    }
189
190    /// Returns the number of futures contained in the set.
191    ///
192    /// This represents the total number of in-flight futures.
193    pub fn len(&self) -> usize {
194        self.rem
195    }
196
197    /// Returns the number of futures that can be contained in the set.
198    pub fn capacity(&self) -> usize {
199        match self.groups.as_slice() {
200            [] => 0,
201            [only] => only.capacity(),
202            [.., last] => {
203                let spare_cap = last.capacity() - last.len();
204                self.rem + spare_cap
205            }
206        }
207    }
208}
209
210impl<F: Future> Stream for FuturesUnordered<F> {
211    type Item = F::Output;
212
213    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
214        let Self {
215            rem,
216            groups,
217            poll_next,
218        } = &mut *self;
219        if groups.is_empty() {
220            return Poll::Ready(None);
221        }
222
223        for _ in 0..groups.len() {
224            if *poll_next >= groups.len() {
225                *poll_next = 0;
226            }
227
228            let poll = Pin::new(&mut groups[*poll_next]).poll_next(cx);
229            match poll {
230                Poll::Ready(Some(x)) => {
231                    *rem -= 1;
232                    return Poll::Ready(Some(x));
233                }
234                Poll::Ready(None) => {
235                    let group = groups.remove(*poll_next);
236                    debug_assert!(group.is_empty());
237
238                    if groups.is_empty() {
239                        // group should contain at least 1 set
240                        groups.push(group);
241                        debug_assert_eq!(*rem, 0);
242                        return Poll::Ready(None);
243                    }
244
245                    // we do not want to drop the last set as it contains
246                    // the largest allocation that we want to keep a hold of
247                    if *poll_next == groups.len() {
248                        groups.push(group);
249                        *poll_next = 0;
250                    }
251                }
252                Poll::Pending => {
253                    *poll_next += 1;
254                }
255            }
256        }
257        Poll::Pending
258    }
259
260    fn size_hint(&self) -> (usize, Option<usize>) {
261        (self.rem, Some(self.rem))
262    }
263}
264impl<F: Future> FusedStream for FuturesUnordered<F> {
265    fn is_terminated(&self) -> bool {
266        self.is_empty()
267    }
268}
269
270impl<F> FromIterator<F> for FuturesUnordered<F> {
271    /// Constructs a new, empty [`FuturesUnordered`] with a fixed capacity that is the length of the iterator.
272    ///
273    /// # Example
274    ///
275    /// Making 1024 total HTTP requests, with a max concurrency of 128
276    ///
277    /// ```
278    /// use futures::future::Future;
279    /// use futures::stream::StreamExt;
280    /// use futures_buffered::FuturesUnordered;
281    /// use hyper::client::conn::http1::{handshake, SendRequest};
282    /// use hyper::body::Incoming;
283    /// use hyper::{Request, Response};
284    /// use hyper_util::rt::TokioIo;
285    /// use tokio::net::TcpStream;
286    ///
287    /// # #[cfg(miri)] fn main() {}
288    /// # #[cfg(not(miri))] #[tokio::main]
289    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
290    /// // create a tcp connection
291    /// let stream = TcpStream::connect("example.com:80").await?;
292    ///
293    /// // perform the http handshakes
294    /// let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
295    /// tokio::spawn(conn);
296    ///
297    /// /// make http request to example.com and read the response
298    /// fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
299    ///     let req = Request::builder()
300    ///         .header("Host", "example.com")
301    ///         .method("GET")
302    ///         .body(String::new())
303    ///         .unwrap();
304    ///     rs.send_request(req)
305    /// }
306    ///
307    /// // create a queue with an initial 128 concurrent requests
308    /// let mut queue: FuturesUnordered<_> = (0..128).map(|_| make_req(&mut rs)).collect();
309    ///
310    /// // wait for a request to finish and start another to fill its place - up to 1024 total requests
311    /// for _ in 128..1024 {
312    ///     queue.next().await;
313    ///     queue.push(make_req(&mut rs));
314    /// }
315    /// // wait for the tail end to finish
316    /// for _ in 0..128 {
317    ///     queue.next().await;
318    /// }
319    /// # Ok(()) }
320    /// ```
321    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
322        let iter = iter.into_iter();
323        let mut this =
324            FuturesUnordered::with_capacity(usize::max(iter.size_hint().0, MIN_CAPACITY));
325        for fut in iter {
326            this.push(fut);
327        }
328        this
329    }
330}
331
332impl<Fut> fmt::Debug for FuturesUnordered<Fut> {
333    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
334        f.debug_struct("FuturesUnordered")
335            .field("queues", &self.groups)
336            .field("len", &self.rem)
337            .finish_non_exhaustive()
338    }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use core::{cell::Cell, future::ready, time::Duration};
    use futures::StreamExt;
    use pin_project_lite::pin_project;
    use std::{thread, time::Instant};

    pin_project!(
        /// Wraps a future and bumps `count` every time the wrapper is polled.
        struct PollCounter<'c, F> {
            count: &'c Cell<usize>,
            #[pin]
            inner: F,
        }
    );

    impl<F: Future> Future for PollCounter<'_, F> {
        type Output = F::Output;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Record the poll before delegating to the wrapped future.
            self.count.set(self.count.get() + 1);
            self.project().inner.poll(cx)
        }
    }

    /// Future that completes once `until` has passed. While pending, each poll
    /// spawns a thread that sleeps until the deadline and then wakes the task.
    ///
    /// NOTE(review): no test in this module currently constructs `Sleep`; it
    /// looks like it was meant for `very_slow_task` — confirm whether it
    /// should be used there or removed.
    struct Sleep {
        until: Instant,
    }
    impl Unpin for Sleep {}
    impl Future for Sleep {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let until = self.until;
            if until > Instant::now() {
                let waker = cx.waker().clone();
                thread::spawn(move || {
                    thread::sleep(until.duration_since(Instant::now()));
                    waker.wake();
                });
                Poll::Pending
            } else {
                Poll::Ready(())
            }
        }
    }

    /// Future that returns `Pending` once (waking itself immediately) and
    /// `Ready` on the following poll.
    struct Yield {
        done: bool,
    }
    impl Unpin for Yield {}
    impl Future for Yield {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.as_mut().done {
                Poll::Ready(())
            } else {
                cx.waker().wake_by_ref();
                self.as_mut().done = true;
                Poll::Pending
            }
        }
    }

    /// A poll-counted [`Yield`]: completing it takes exactly two polls.
    fn yield_now(count: &Cell<usize>) -> PollCounter<'_, Yield> {
        PollCounter {
            count,
            inner: Yield { done: false },
        }
    }

    /// A single yielding future is polled exactly twice.
    #[test]
    fn single() {
        let c = Cell::new(0);

        let mut buffer = FuturesUnordered::new();
        buffer.push(yield_now(&c));
        futures::executor::block_on(buffer.next());

        drop(buffer);
        assert_eq!(c.into_inner(), 2);
    }

    /// `len`, `is_empty`, `capacity`, `size_hint` and `is_terminated` stay
    /// consistent while the set grows past its initial capacity and drains.
    #[test]
    fn len() {
        let mut buffer = FuturesUnordered::with_capacity(1);

        assert_eq!(buffer.len(), 0);
        assert!(buffer.is_empty());
        assert_eq!(buffer.capacity(), 1);
        assert_eq!(buffer.size_hint(), (0, Some(0)));
        assert!(buffer.is_terminated());

        buffer.push(ready(()));

        assert_eq!(buffer.len(), 1);
        assert!(!buffer.is_empty());
        assert_eq!(buffer.capacity(), 1);
        assert_eq!(buffer.size_hint(), (1, Some(1)));
        assert!(!buffer.is_terminated());

        // The first group (capacity 1) is full, so this push adds a second
        // group of capacity 2: two futures held plus one free slot.
        buffer.push(ready(()));

        assert_eq!(buffer.len(), 2);
        assert!(!buffer.is_empty());
        assert_eq!(buffer.capacity(), 3);
        assert_eq!(buffer.size_hint(), (2, Some(2)));
        assert!(!buffer.is_terminated());

        futures::executor::block_on(buffer.next());
        futures::executor::block_on(buffer.next());

        // Only the larger, most recent group is left once everything drains.
        assert_eq!(buffer.len(), 0);
        assert!(buffer.is_empty());
        assert_eq!(buffer.capacity(), 2);
        assert_eq!(buffer.size_hint(), (0, Some(0)));
        assert!(buffer.is_terminated());
    }

    /// Collecting fewer than `MIN_CAPACITY` futures still reserves `MIN_CAPACITY`.
    #[test]
    fn from_iter() {
        let buffer = FuturesUnordered::from_iter((0..10).map(|_| ready(())));

        assert_eq!(buffer.len(), 10);
        assert_eq!(buffer.capacity(), 32);
        assert_eq!(buffer.size_hint(), (10, Some(10)));
    }

    /// 110 yielding futures pushed through a set that starts with capacity 1
    /// are each polled exactly twice.
    #[test]
    fn multi() {
        fn wait(count: &Cell<usize>) -> PollCounter<'_, Yield> {
            yield_now(count)
        }

        let c = Cell::new(0);

        let mut buffer = FuturesUnordered::with_capacity(1);
        // build up
        for _ in 0..10 {
            buffer.push(wait(&c));
        }
        // poll and insert
        for _ in 0..100 {
            assert!(futures::executor::block_on(buffer.next()).is_some());
            buffer.push(wait(&c));
        }
        // drain down
        for _ in 0..10 {
            assert!(futures::executor::block_on(buffer.next()).is_some());
        }

        let count = c.into_inner();
        assert_eq!(count, 220);
    }

    #[test]
    fn very_slow_task() {
        let c = Cell::new(0);

        let now = Instant::now();

        let mut buffer = FuturesUnordered::with_capacity(1);
        // build up
        for _ in 0..9 {
            buffer.push(yield_now(&c));
        }
        // NOTE(review): despite the test name, the tenth future is the same
        // `yield_now` future as all the others, so no slow task is involved
        // and the elapsed-time assertion below is trivially satisfied.
        // Confirm whether a slow future (e.g. `Sleep`) was intended here.
        buffer.push(yield_now(&c));
        // poll and insert
        for _ in 0..100 {
            assert!(futures::executor::block_on(buffer.next()).is_some());
            buffer.push(yield_now(&c));
        }
        // drain down
        for _ in 0..10 {
            assert!(futures::executor::block_on(buffer.next()).is_some());
        }

        let dur = now.elapsed();
        assert!(dur < Duration::from_millis(2050));

        let count = c.into_inner();
        assert_eq!(count, 220);
    }

    /// Collects `0..256` timer futures into the set and drains every one.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn unordered_large() {
        for i in 0..256 {
            // Collect into `FuturesUnordered`, the type under test in this
            // module, rather than `FuturesUnorderedBounded`.
            let mut queue: FuturesUnordered<_> = ((0..i).map(|_| async move {
                tokio::time::sleep(Duration::from_nanos(1)).await;
            }))
            .collect();
            for _ in 0..i {
                queue.next().await.unwrap();
            }
        }
    }
}