futures_01_ext/
futures_ordered.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under both the MIT license found in the
5 * LICENSE-MIT file in the root directory of this source tree and the Apache
6 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
7 * of this source tree.
8 */
9
10//! Definition of the FuturesOrdered combinator, executing each future in a sequence serially
11//! and streaming their results.
12
13use std::fmt;
14
15use futures::Async;
16use futures::Future;
17use futures::IntoFuture;
18use futures::Poll;
19use futures::Stream;
20
21/// A future which takes a list of futures, executes them serially, and
22/// resolves with a vector of the completed values.
23///
24/// This future is created with the `futures_ordered` method.
25#[must_use = "streams do nothing unless polled"]
26pub struct FuturesOrdered<I>
27where
28    I: IntoIterator,
29    I::Item: IntoFuture,
30{
31    elems: I::IntoIter,
32    current: Option<<I::Item as IntoFuture>::Future>,
33}
34
35impl<I> fmt::Debug for FuturesOrdered<I>
36where
37    I: IntoIterator,
38    I::Item: IntoFuture,
39    <<I as IntoIterator>::Item as IntoFuture>::Future: fmt::Debug,
40    <<I as IntoIterator>::Item as IntoFuture>::Item: fmt::Debug,
41{
42    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
43        fmt.debug_struct("FuturesOrdered")
44            .field("current", &self.current)
45            .finish()
46    }
47}
48
49/// Creates a stream which returns results of the futures given.
50///
51/// The returned stream will serially drive execution for all of its underlying
52/// futures. Errors from a future will be returned immediately, but the stream
53/// will still be valid and
54pub fn futures_ordered<I>(iter: I) -> FuturesOrdered<I>
55where
56    I: IntoIterator,
57    I::Item: IntoFuture,
58{
59    let mut elems = iter.into_iter();
60    let current = next_future(&mut elems);
61    FuturesOrdered { elems, current }
62}
63
64impl<I> Stream for FuturesOrdered<I>
65where
66    I: IntoIterator,
67    I::Item: IntoFuture,
68{
69    type Item = <I::Item as IntoFuture>::Item;
70    type Error = <I::Item as IntoFuture>::Error;
71
72    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
73        match self.current.take() {
74            Some(mut fut) => {
75                match fut.poll() {
76                    Ok(Async::Ready(v)) => {
77                        self.current = next_future(&mut self.elems);
78                        Ok(Async::Ready(Some(v)))
79                    }
80                    Ok(Async::NotReady) => {
81                        self.current = Some(fut);
82                        Ok(Async::NotReady)
83                    }
84                    Err(e) => {
85                        // Don't dump self.elems at this point because the
86                        // caller might want to keep going on.
87                        self.current = next_future(&mut self.elems);
88                        Err(e)
89                    }
90                }
91            }
92            None => {
93                // End of stream.
94                Ok(Async::Ready(None))
95            }
96        }
97    }
98}
99
100#[inline]
101fn next_future<I>(elems: &mut I) -> Option<<I::Item as IntoFuture>::Future>
102where
103    I: Iterator,
104    I::Item: IntoFuture,
105{
106    elems.next().map(IntoFuture::into_future)
107}
108
109#[cfg(test)]
110mod test {
111    use std::result;
112
113    use futures::sync::mpsc;
114    use futures::task;
115    use futures::Future;
116    use futures::Sink;
117    use futures::Stream;
118    use futures03::compat::Future01CompatExt;
119
120    use super::*;
121
122    #[test]
123    fn test_basic() {
124        let into_futs = vec![ok(1), ok(2)];
125        assert_eq!(futures_ordered(into_futs).collect().wait(), Ok(vec![1, 2]));
126
127        let into_futs = vec![ok(1), err(2), ok(3)];
128        assert_eq!(futures_ordered(into_futs).collect().wait(), Err(2));
129    }
130
131    #[test]
132    fn test_serial() {
133        let (tx, rx) = mpsc::channel(2);
134        // If both the futures returned in parallel, rx would have [20, 10].
135        // Note we move tx in. Once all tx handles have been dropped, the rx
136        // stream ends.
137        let futs = vec![delayed_future(10, tx.clone(), 4), delayed_future(20, tx, 2)];
138
139        let runtime = tokio::runtime::Runtime::new().unwrap();
140        runtime
141            .block_on(futures_ordered(futs).collect().compat())
142            .unwrap();
143        let results = runtime.block_on(rx.collect().compat());
144        assert_eq!(results, Ok(vec![10, 20]));
145    }
146
147    fn delayed_future<T>(v: T, tx: mpsc::Sender<T>, count: usize) -> DelayedFuture<T> {
148        DelayedFuture {
149            send: Some((v, tx)),
150            count,
151        }
152    }
153    #[must_use = "futures do nothing unless you `.await` or poll them"]
154    struct DelayedFuture<T> {
155        send: Option<(T, mpsc::Sender<T>)>,
156        count: usize,
157    }
158
159    impl<T> Future for DelayedFuture<T> {
160        type Item = ();
161        type Error = !;
162
163        fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
164            self.count -= 1;
165            if self.count == 0 {
166                let (v, tx) = self.send.take().unwrap();
167                // In production code tx.send(v) would return a future which we could forward the
168                // poll call to. In test code, this is fine.
169                tx.send(v).wait().unwrap();
170                Ok(Async::Ready(()))
171            } else {
172                // Make sure the computation moves forward.
173                task::current().notify();
174                Ok(Async::NotReady)
175            }
176        }
177    }
178
179    fn ok(v: i32) -> result::Result<i32, i32> {
180        Ok(v)
181    }
182
183    fn err(v: i32) -> result::Result<i32, i32> {
184        Err(v)
185    }
186}