futures_util/stream/
futures_ordered.rs

1use std::cmp::{Eq, PartialEq, PartialOrd, Ord, Ordering};
2use std::collections::BinaryHeap;
3use std::fmt::{self, Debug};
4use std::iter::FromIterator;
5
6use futures_core::{Async, Future, IntoFuture, Poll, Stream};
7use futures_core::task;
8
9use stream::FuturesUnordered;
10
11#[must_use = "futures do nothing unless polled"]
12#[derive(Debug)]
13struct OrderWrapper<T> {
14    item: T,
15    index: usize,
16}
17
18impl<T> PartialEq for OrderWrapper<T> {
19    fn eq(&self, other: &Self) -> bool {
20        self.index == other.index
21    }
22}
23
24impl<T> Eq for OrderWrapper<T> {}
25
26impl<T> PartialOrd for OrderWrapper<T> {
27    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
28        Some(self.cmp(other))
29    }
30}
31
32impl<T> Ord for OrderWrapper<T> {
33    fn cmp(&self, other: &Self) -> Ordering {
34        // BinaryHeap is a max heap, so compare backwards here.
35        other.index.cmp(&self.index)
36    }
37}
38
39impl<T> Future for OrderWrapper<T>
40    where T: Future
41{
42    type Item = OrderWrapper<T::Item>;
43    type Error = T::Error;
44
45    fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
46        let result = try_ready!(self.item.poll(cx));
47        Ok(Async::Ready(OrderWrapper {
48            item: result,
49            index: self.index
50        }))
51    }
52}
53
54/// An unbounded queue of futures.
55///
56/// This "combinator" is similar to `FuturesUnordered`, but it imposes an order
57/// on top of the set of futures. While futures in the set will race to
58/// completion in parallel, results will only be returned in the order their
59/// originating futures were added to the queue.
60///
61/// Futures are pushed into this queue and their realized values are yielded in
62/// order. This structure is optimized to manage a large number of futures.
63/// Futures managed by `FuturesOrdered` will only be polled when they generate
64/// notifications. This reduces the required amount of work needed to coordinate
65/// large numbers of futures.
66///
67/// When a `FuturesOrdered` is first created, it does not contain any futures.
68/// Calling `poll` in this state will result in `Ok(Async::Ready(None))` to be
69/// returned. Futures are submitted to the queue using `push`; however, the
70/// future will **not** be polled at this point. `FuturesOrdered` will only
71/// poll managed futures when `FuturesOrdered::poll` is called. As such, it
72/// is important to call `poll` after pushing new futures.
73///
74/// If `FuturesOrdered::poll` returns `Ok(Async::Ready(None))` this means that
75/// the queue is currently not managing any futures. A future may be submitted
76/// to the queue at a later time. At that point, a call to
77/// `FuturesOrdered::poll` will either return the future's resolved value
78/// **or** `Ok(Async::Pending)` if the future has not yet completed. When
79/// multiple futures are submitted to the queue, `FuturesOrdered::poll` will
80/// return `Ok(Async::Pending)` until the first future completes, even if
81/// some of the later futures have already completed.
82///
83/// Note that you can create a ready-made `FuturesOrdered` via the
84/// `futures_ordered` function in the `stream` module, or you can start with an
85/// empty queue with the `FuturesOrdered::new` constructor.
86#[must_use = "streams do nothing unless polled"]
87pub struct FuturesOrdered<T>
88    where T: Future
89{
90    in_progress: FuturesUnordered<OrderWrapper<T>>,
91    queued_results: BinaryHeap<OrderWrapper<T::Item>>,
92    next_incoming_index: usize,
93    next_outgoing_index: usize,
94}
95
96/// Converts a list of futures into a `Stream` of results from the futures.
97///
98/// This function will take an list of futures (e.g. a vector, an iterator,
99/// etc), and return a stream. The stream will yield items as they become
100/// available on the futures internally, in the order that their originating
101/// futures were submitted to the queue. If the futures complete out of order,
102/// items will be stored internally within `FuturesOrdered` until all preceding
103/// items have been yielded.
104///
105/// Note that the returned queue can also be used to dynamically push more
106/// futures into the queue as they become available.
107pub fn futures_ordered<I>(futures: I) -> FuturesOrdered<<I::Item as IntoFuture>::Future>
108where
109    I: IntoIterator,
110    I::Item: IntoFuture,
111{
112    futures.into_iter().map(|f| f.into_future()).collect()
113}
114
115impl<T> FuturesOrdered<T>
116    where T: Future
117{
118    /// Constructs a new, empty `FuturesOrdered`
119    ///
120    /// The returned `FuturesOrdered` does not contain any futures and, in this
121    /// state, `FuturesOrdered::poll` will return `Ok(Async::Ready(None))`.
122    pub fn new() -> FuturesOrdered<T> {
123        FuturesOrdered {
124            in_progress: FuturesUnordered::new(),
125            queued_results: BinaryHeap::new(),
126            next_incoming_index: 0,
127            next_outgoing_index: 0,
128        }
129    }
130
131    /// Returns the number of futures contained in the queue.
132    ///
133    /// This represents the total number of in-flight futures, both
134    /// those currently processing and those that have completed but
135    /// which are waiting for earlier futures to complete.
136    pub fn len(&self) -> usize {
137        self.in_progress.len() + self.queued_results.len()
138    }
139
140    /// Returns `true` if the queue contains no futures
141    pub fn is_empty(&self) -> bool {
142        self.in_progress.is_empty() && self.queued_results.is_empty()
143    }
144
145    /// Push a future into the queue.
146    ///
147    /// This function submits the given future to the internal set for managing.
148    /// This function will not call `poll` on the submitted future. The caller
149    /// must ensure that `FuturesOrdered::poll` is called in order to receive
150    /// task notifications.
151    pub fn push(&mut self, future: T) {
152        let wrapped = OrderWrapper {
153            item: future,
154            index: self.next_incoming_index,
155        };
156        self.next_incoming_index += 1;
157        self.in_progress.push(wrapped);
158    }
159}
160
161impl<T> Stream for FuturesOrdered<T>
162    where T: Future
163{
164    type Item = T::Item;
165    type Error = T::Error;
166
167    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
168        // Get any completed futures from the unordered set.
169        loop {
170            match self.in_progress.poll_next(cx)? {
171                Async::Ready(Some(result)) => self.queued_results.push(result),
172                Async::Ready(None) | Async::Pending => break,
173            }
174        }
175
176        if let Some(next_result) = self.queued_results.peek() {
177            // PeekMut::pop is not stable yet QQ
178            if next_result.index != self.next_outgoing_index {
179                return Ok(Async::Pending);
180            }
181        } else if !self.in_progress.is_empty() {
182            return Ok(Async::Pending);
183        } else {
184            return Ok(Async::Ready(None));
185        }
186
187        let next_result = self.queued_results.pop().unwrap();
188        self.next_outgoing_index += 1;
189        Ok(Async::Ready(Some(next_result.item)))
190    }
191}
192
193impl<T: Debug> Debug for FuturesOrdered<T>
194    where T: Future
195{
196    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
197        write!(fmt, "FuturesOrdered {{ ... }}")
198    }
199}
200
201impl<F: Future> FromIterator<F> for FuturesOrdered<F> {
202    fn from_iter<T>(iter: T) -> Self
203    where
204        T: IntoIterator<Item = F>,
205    {
206        let acc = FuturesOrdered::new();
207        iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc })
208    }
209}