Skip to main content

futures_buffered/
futures_unordered_bounded.rs

1use core::{
2    fmt,
3    future::Future,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use crate::{slot_map::PinSlotMap, waker_list::WakerList};
9use futures_core::{FusedStream, Stream};
10
11/// A set of futures which may complete in any order.
12///
13/// Much like [`futures::stream::FuturesUnordered`](https://docs.rs/futures/0.3.25/futures/stream/struct.FuturesUnordered.html),
14/// this is a thread-safe, `Pin` friendly, lifetime friendly, concurrent processing stream.
15///
16/// The is different to `FuturesUnordered` in that `FuturesUnorderedBounded` has a fixed capacity for processing count.
17/// This means it's less flexible, but produces better memory efficiency.
18///
19/// ## Benchmarks
20///
21/// ### Speed
22///
23/// Running 65536 100us timers with 256 concurrent jobs in a single threaded tokio runtime:
24///
25/// ```text
26/// FuturesUnordered         time:   [420.47 ms 422.21 ms 423.99 ms]
27/// FuturesUnorderedBounded  time:   [366.02 ms 367.54 ms 369.05 ms]
28/// ```
29///
30/// ### Memory usage
31///
32/// Running 512000 `Ready<i32>` futures with 256 concurrent jobs.
33///
34/// - count: the number of times alloc/dealloc was called
35/// - alloc: the number of cumulative bytes allocated
36/// - dealloc: the number of cumulative bytes deallocated
37///
38/// ```text
39/// FuturesUnordered
40///     count:    1024002
41///     alloc:    40960144 B
42///     dealloc:  40960000 B
43///
44/// FuturesUnorderedBounded
45///     count:    2
46///     alloc:    8264 B
47///     dealloc:  0 B
48/// ```
49///
50/// ### Conclusion
51///
52/// As you can see, `FuturesUnorderedBounded` massively reduces you memory overhead while providing a significant performance gain.
53/// Perfect for if you want a fixed batch size
54///
55/// # Example
56///
57/// Making 1024 total HTTP requests, with a max concurrency of 128
58///
59/// ```
60/// use futures::future::Future;
61/// use futures::stream::StreamExt;
62/// use futures_buffered::FuturesUnorderedBounded;
63/// use hyper::client::conn::http1::{handshake, SendRequest};
64/// use hyper::body::Incoming;
65/// use hyper::{Request, Response};
66/// use hyper_util::rt::TokioIo;
67/// use tokio::net::TcpStream;
68///
69/// # #[cfg(miri)] fn main() {}
70/// # #[cfg(not(miri))] #[tokio::main]
71/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
72/// // create a tcp connection
73/// let stream = TcpStream::connect("example.com:80").await?;
74///
75/// // perform the http handshakes
76/// let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
77/// tokio::spawn(conn);
78///
79/// /// make http request to example.com and read the response
80/// fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
81///     let req = Request::builder()
82///         .header("Host", "example.com")
83///         .method("GET")
84///         .body(String::new())
85///         .unwrap();
86///     rs.send_request(req)
87/// }
88///
89/// // create a queue that can hold 128 concurrent requests
90/// let mut queue = FuturesUnorderedBounded::new(128);
91///
92/// // start up 128 requests
93/// for _ in 0..128 {
94///     queue.push(make_req(&mut rs));
95/// }
96/// // wait for a request to finish and start another to fill its place - up to 1024 total requests
97/// for _ in 128..1024 {
98///     queue.next().await;
99///     queue.push(make_req(&mut rs));
100/// }
101/// // wait for the tail end to finish
102/// for _ in 0..128 {
103///     queue.next().await;
104/// }
105/// # Ok(()) }
106/// ```
107pub struct FuturesUnorderedBounded<F> {
108    pub(crate) tasks: PinSlotMap<F>,
109    pub(crate) shared: WakerList,
110}
111
112impl<F> Unpin for FuturesUnorderedBounded<F> {}
113
114impl<F> FuturesUnorderedBounded<F> {
115    /// Constructs a new, empty [`FuturesUnorderedBounded`] with the given fixed capacity.
116    ///
117    /// The returned [`FuturesUnorderedBounded`] does not contain any futures.
118    /// In this state, [`FuturesUnorderedBounded::poll_next`](Stream::poll_next) will
119    /// return [`Poll::Ready(None)`](Poll::Ready).
120    pub fn new(cap: usize) -> Self {
121        Self {
122            tasks: PinSlotMap::new(cap),
123            shared: WakerList::new(cap),
124        }
125    }
126
127    /// Push a future into the set.
128    ///
129    /// This method adds the given future to the set. This method will not
130    /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must
131    /// ensure that [`FuturesUnorderedBounded::poll_next`](Stream::poll_next) is called
132    /// in order to receive wake-up notifications for the given future.
133    ///
134    /// # Panics
135    /// This method will panic if the buffer is currently full. See [`FuturesUnorderedBounded::try_push`] to get a result instead
136    #[track_caller]
137    pub fn push(&mut self, fut: F) {
138        if self.try_push(fut).is_err() {
139            panic!("attempted to push into a full `FuturesUnorderedBounded`");
140        }
141    }
142
143    /// Push a future into the set.
144    ///
145    /// This method adds the given future to the set. This method will not
146    /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must
147    /// ensure that [`FuturesUnorderedBounded::poll_next`](Stream::poll_next) is called
148    /// in order to receive wake-up notifications for the given future.
149    ///
150    /// # Errors
151    /// This method will error if the buffer is currently full, returning the future back
152    pub fn try_push(&mut self, fut: F) -> Result<(), F> {
153        self.try_push_with(fut, core::convert::identity)
154    }
155
156    #[inline]
157    pub(crate) fn try_push_with<T>(&mut self, t: T, f: impl FnMut(T) -> F) -> Result<(), T> {
158        let i = self.tasks.insert_with(t, f)?;
159        // safety: i is always within capacity
160        unsafe {
161            self.shared.push(i);
162        }
163        Ok(())
164    }
165
166    /// Returns `true` if the set contains no futures.
167    pub fn is_empty(&self) -> bool {
168        self.tasks.is_empty()
169    }
170
171    /// Returns the number of futures contained in the set.
172    ///
173    /// This represents the total number of in-flight futures.
174    pub fn len(&self) -> usize {
175        self.tasks.len()
176    }
177
178    /// Returns the number of futures that can be contained in the set.
179    pub fn capacity(&self) -> usize {
180        self.tasks.capacity()
181    }
182}
183
184type PollFn<F, O> = fn(Pin<&mut F>, cx: &mut Context<'_>) -> Poll<O>;
185
186impl<F> FuturesUnorderedBounded<F> {
187    pub(crate) fn poll_inner_no_remove<O>(
188        &mut self,
189        cx: &mut Context<'_>,
190        poll_fn: PollFn<F, O>,
191    ) -> Poll<Option<(usize, O)>> {
192        const MAX: usize = 61;
193
194        if self.is_empty() {
195            return Poll::Ready(None);
196        }
197
198        self.shared.register(cx.waker());
199
200        let mut count = 0;
201        loop {
202            count += 1;
203            // if we are in a pending only loop - let's break out.
204            // we do this even with tokio-coop in case of unconstrained tasks, or non-tokio runtimes.
205            if count > MAX {
206                cx.waker().wake_by_ref();
207                return Poll::Pending;
208            }
209
210            #[cfg(feature = "tokio-coop")]
211            let coop = core::task::ready!(tokio::task::coop::poll_proceed(cx));
212
213            match unsafe { self.shared.pop() } {
214                crate::waker_list::ReadySlot::None => return Poll::Pending,
215                crate::waker_list::ReadySlot::Inconsistent => {
216                    cx.waker().wake_by_ref();
217                    return Poll::Pending;
218                }
219                crate::waker_list::ReadySlot::Ready((i, waker)) => {
220                    #[cfg(feature = "tokio-coop")]
221                    coop.made_progress();
222
223                    if let Some(task) = self.tasks.get(i) {
224                        let mut cx = Context::from_waker(&waker);
225
226                        let res = poll_fn(task, &mut cx);
227
228                        if let Poll::Ready(x) = res {
229                            return Poll::Ready(Some((i, x)));
230                        }
231                    }
232                }
233            }
234        }
235    }
236}
237
238impl<F: Future> FuturesUnorderedBounded<F> {
239    pub(crate) fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, F::Output)>> {
240        match self.poll_inner_no_remove(cx, F::poll) {
241            Poll::Ready(Some((i, x))) => {
242                self.tasks.remove(i);
243                Poll::Ready(Some((i, x)))
244            }
245            p => p,
246        }
247    }
248}
249
250impl<F: Future> Stream for FuturesUnorderedBounded<F> {
251    type Item = F::Output;
252
253    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
254        match self.poll_inner(cx) {
255            Poll::Ready(Some((_, x))) => Poll::Ready(Some(x)),
256            Poll::Ready(None) => Poll::Ready(None),
257            Poll::Pending => Poll::Pending,
258        }
259    }
260
261    fn size_hint(&self) -> (usize, Option<usize>) {
262        let len = self.len();
263        (len, Some(len))
264    }
265}
266
267impl<F> FromIterator<F> for FuturesUnorderedBounded<F> {
268    /// Constructs a new, empty [`FuturesUnorderedBounded`] with a fixed capacity that is the length of the iterator.
269    ///
270    /// # Example
271    ///
272    /// Making 1024 total HTTP requests, with a max concurrency of 128
273    ///
274    /// ```
275    /// use futures::future::Future;
276    /// use futures::stream::StreamExt;
277    /// use futures_buffered::FuturesUnorderedBounded;
278    /// use hyper::client::conn::http1::{handshake, SendRequest};
279    /// use hyper::body::Incoming;
280    /// use hyper::{Request, Response};
281    /// use hyper_util::rt::TokioIo;
282    /// use tokio::net::TcpStream;
283    ///
284    /// # #[cfg(miri)] fn main() {}
285    /// # #[cfg(not(miri))] #[tokio::main]
286    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
287    /// // create a tcp connection
288    /// let stream = TcpStream::connect("example.com:80").await?;
289    ///
290    /// // perform the http handshakes
291    /// let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
292    /// tokio::spawn(conn);
293    ///
294    /// /// make http request to example.com and read the response
295    /// fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
296    ///     let req = Request::builder()
297    ///         .header("Host", "example.com")
298    ///         .method("GET")
299    ///         .body(String::new())
300    ///         .unwrap();
301    ///     rs.send_request(req)
302    /// }
303    ///
304    /// // create a queue with an initial 128 concurrent requests
305    /// let mut queue: FuturesUnorderedBounded<_> = (0..128).map(|_| make_req(&mut rs)).collect();
306    ///
307    /// // wait for a request to finish and start another to fill its place - up to 1024 total requests
308    /// for _ in 128..1024 {
309    ///     queue.next().await;
310    ///     queue.push(make_req(&mut rs));
311    /// }
312    /// // wait for the tail end to finish
313    /// for _ in 0..128 {
314    ///     queue.next().await;
315    /// }
316    /// # Ok(()) }
317    /// ```
318    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
319        // store the futures in our task list
320        let tasks = PinSlotMap::from_iter(iter);
321
322        // determine the actual capacity and create the shared state
323        let cap = tasks.len();
324        let shared = WakerList::new(cap);
325
326        for i in 0..cap {
327            // safety: i is always within capacity
328            unsafe {
329                shared.push(i);
330            }
331        }
332
333        // create the queue
334        Self { tasks, shared }
335    }
336}
337
338impl<Fut: Future> FusedStream for FuturesUnorderedBounded<Fut> {
339    fn is_terminated(&self) -> bool {
340        self.is_empty()
341    }
342}
343
344impl<Fut> fmt::Debug for FuturesUnorderedBounded<Fut> {
345    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346        f.debug_struct("FuturesUnorderedBounded")
347            .field("len", &self.tasks.len())
348            .finish_non_exhaustive()
349    }
350}
351
352#[cfg(test)]
353mod tests {
354    use super::*;
355    use core::{
356        cell::Cell,
357        future::{poll_fn, ready},
358        time::Duration,
359    };
360    use futures::{channel::oneshot, StreamExt};
361    use futures_test::task::noop_context;
362    use pin_project_lite::pin_project;
363    use std::time::Instant;
364
365    pin_project!(
366        struct PollCounter<'c, F> {
367            count: &'c Cell<usize>,
368            #[pin]
369            inner: F,
370        }
371    );
372
373    impl<F: Future> Future for PollCounter<'_, F> {
374        type Output = F::Output;
375        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
376            self.count.set(self.count.get() + 1);
377            self.project().inner.poll(cx)
378        }
379    }
380
381    struct Yield {
382        done: bool,
383    }
384    impl Unpin for Yield {}
385    impl Future for Yield {
386        type Output = ();
387
388        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
389            if self.as_mut().done {
390                Poll::Ready(())
391            } else {
392                cx.waker().wake_by_ref();
393                self.as_mut().done = true;
394                Poll::Pending
395            }
396        }
397    }
398
399    fn yield_now(count: &Cell<usize>) -> PollCounter<'_, Yield> {
400        PollCounter {
401            count,
402            inner: Yield { done: false },
403        }
404    }
405
406    #[test]
407    fn single() {
408        let c = Cell::new(0);
409
410        let mut buffer = FuturesUnorderedBounded::new(10);
411        buffer.push(yield_now(&c));
412        futures::executor::block_on(buffer.next());
413
414        drop(buffer);
415        assert_eq!(c.into_inner(), 2);
416    }
417
418    #[test]
419    #[should_panic(expected = "attempted to push into a full `FuturesUnorderedBounded`")]
420    fn full() {
421        let mut buffer = FuturesUnorderedBounded::new(1);
422        buffer.push(ready(()));
423        buffer.push(ready(()));
424    }
425
426    #[test]
427    fn len() {
428        let mut buffer = FuturesUnorderedBounded::new(1);
429
430        assert_eq!(buffer.len(), 0);
431        assert!(buffer.is_empty());
432        assert_eq!(buffer.capacity(), 1);
433        assert_eq!(buffer.size_hint(), (0, Some(0)));
434        assert!(buffer.is_terminated());
435
436        buffer.push(ready(()));
437
438        assert_eq!(buffer.len(), 1);
439        assert!(!buffer.is_empty());
440        assert_eq!(buffer.capacity(), 1);
441        assert_eq!(buffer.size_hint(), (1, Some(1)));
442        assert!(!buffer.is_terminated());
443
444        futures::executor::block_on(buffer.next());
445
446        assert_eq!(buffer.len(), 0);
447        assert!(buffer.is_empty());
448        assert_eq!(buffer.capacity(), 1);
449        assert_eq!(buffer.size_hint(), (0, Some(0)));
450        assert!(buffer.is_terminated());
451    }
452
453    #[test]
454    fn from_iter() {
455        let buffer = FuturesUnorderedBounded::from_iter((0..10).map(|_| ready(())));
456
457        assert_eq!(buffer.len(), 10);
458        assert_eq!(buffer.capacity(), 10);
459        assert_eq!(buffer.size_hint(), (10, Some(10)));
460    }
461
462    #[test]
463    fn drop_while_waiting() {
464        let mut buffer = FuturesUnorderedBounded::new(10);
465        let waker = Cell::new(None);
466        buffer.push(poll_fn(|cx| {
467            waker.set(Some(cx.waker().clone()));
468            Poll::<()>::Pending
469        }));
470
471        assert_eq!(buffer.poll_next_unpin(&mut noop_context()), Poll::Pending);
472        drop(buffer);
473
474        let cx = waker.take().unwrap();
475        drop(cx);
476    }
477
478    #[test]
479    fn multi() {
480        fn wait(count: &Cell<usize>) -> PollCounter<'_, Yield> {
481            yield_now(count)
482        }
483
484        let c = Cell::new(0);
485
486        let mut buffer = FuturesUnorderedBounded::new(10);
487        // build up
488        for _ in 0..10 {
489            buffer.push(wait(&c));
490        }
491        // poll and insert
492        for _ in 0..100 {
493            assert!(futures::executor::block_on(buffer.next()).is_some());
494            buffer.push(wait(&c));
495        }
496        // drain down
497        for _ in 0..10 {
498            assert!(futures::executor::block_on(buffer.next()).is_some());
499        }
500
501        let count = c.into_inner();
502        assert_eq!(count, 220);
503    }
504
505    #[test]
506    fn very_slow_task() {
507        let c = Cell::new(0);
508
509        let now = Instant::now();
510
511        let mut buffer = FuturesUnorderedBounded::new(10);
512        // build up
513        for _ in 0..9 {
514            buffer.push(yield_now(&c));
515        }
516        // spawn a slow future among a bunch of fast ones.
517        // the test is to make sure this doesn't block the rest getting completed
518        buffer.push(yield_now(&c));
519        // poll and insert
520        for _ in 0..100 {
521            assert!(futures::executor::block_on(buffer.next()).is_some());
522            buffer.push(yield_now(&c));
523        }
524        // drain down
525        for _ in 0..10 {
526            assert!(futures::executor::block_on(buffer.next()).is_some());
527        }
528
529        let dur = now.elapsed();
530        assert!(dur < Duration::from_millis(2050));
531
532        let count = c.into_inner();
533        assert_eq!(count, 220);
534    }
535
536    #[cfg(not(miri))]
537    #[tokio::test]
538    async fn unordered_large() {
539        for i in 0..256 {
540            let mut queue: FuturesUnorderedBounded<_> = ((0..i).map(|_| async move {
541                tokio::time::sleep(Duration::from_nanos(1)).await;
542            }))
543            .collect();
544            for _ in 0..i {
545                queue.next().await.unwrap();
546            }
547        }
548    }
549
550    #[test]
551    fn correct_fairer_order() {
552        const LEN: usize = 256;
553
554        let mut buffer = FuturesUnorderedBounded::new(LEN);
555        let mut txs = vec![];
556        for _ in 0..LEN {
557            let (tx, rx) = oneshot::channel();
558            buffer.push(rx);
559            txs.push(tx);
560        }
561
562        for _ in 0..=(LEN / 61) {
563            assert!(buffer.poll_next_unpin(&mut noop_context()).is_pending());
564        }
565
566        for (i, tx) in txs.into_iter().enumerate() {
567            let _ = tx.send(i);
568        }
569
570        for i in 0..LEN {
571            let poll = buffer.poll_next_unpin(&mut noop_context());
572            assert_eq!(poll, Poll::Ready(Some(Ok(i))));
573        }
574    }
575}