futures_util/stream/futures_ordered.rs
1use std::cmp::{Eq, PartialEq, PartialOrd, Ord, Ordering};
2use std::collections::BinaryHeap;
3use std::fmt::{self, Debug};
4use std::iter::FromIterator;
5
6use futures_core::{Async, Future, IntoFuture, Poll, Stream};
7use futures_core::task;
8
9use stream::FuturesUnordered;
10
/// Tags a value with the position at which it was submitted.
///
/// Equality and ordering consider only `index`; `item` never takes part in a
/// comparison. The ordering is deliberately reversed so that a max-heap such
/// as `BinaryHeap` surfaces the entry with the smallest `index` first.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
struct OrderWrapper<T> {
    // The wrapped payload (a future, or the value it resolved to).
    item: T,
    // Submission position used for every comparison.
    index: usize,
}

impl<T> PartialEq for OrderWrapper<T> {
    fn eq(&self, rhs: &Self) -> bool {
        self.index.eq(&rhs.index)
    }
}

impl<T> Eq for OrderWrapper<T> {}

impl<T> PartialOrd for OrderWrapper<T> {
    fn partial_cmp(&self, rhs: &Self) -> Option<Ordering> {
        // Total order: always defer to `Ord`.
        Some(Ord::cmp(self, rhs))
    }
}

impl<T> Ord for OrderWrapper<T> {
    fn cmp(&self, rhs: &Self) -> Ordering {
        // Flip the natural index order: a lower index compares as "greater",
        // which puts it at the top of a max-heap.
        self.index.cmp(&rhs.index).reverse()
    }
}
38
39impl<T> Future for OrderWrapper<T>
40 where T: Future
41{
42 type Item = OrderWrapper<T::Item>;
43 type Error = T::Error;
44
45 fn poll(&mut self, cx: &mut task::Context) -> Poll<Self::Item, Self::Error> {
46 let result = try_ready!(self.item.poll(cx));
47 Ok(Async::Ready(OrderWrapper {
48 item: result,
49 index: self.index
50 }))
51 }
52}
53
54/// An unbounded queue of futures.
55///
56/// This "combinator" is similar to `FuturesUnordered`, but it imposes an order
57/// on top of the set of futures. While futures in the set will race to
58/// completion in parallel, results will only be returned in the order their
59/// originating futures were added to the queue.
60///
61/// Futures are pushed into this queue and their realized values are yielded in
62/// order. This structure is optimized to manage a large number of futures.
63/// Futures managed by `FuturesOrdered` will only be polled when they generate
64/// notifications. This reduces the required amount of work needed to coordinate
65/// large numbers of futures.
66///
67/// When a `FuturesOrdered` is first created, it does not contain any futures.
68/// Calling `poll` in this state will result in `Ok(Async::Ready(None))` to be
69/// returned. Futures are submitted to the queue using `push`; however, the
70/// future will **not** be polled at this point. `FuturesOrdered` will only
71/// poll managed futures when `FuturesOrdered::poll` is called. As such, it
72/// is important to call `poll` after pushing new futures.
73///
74/// If `FuturesOrdered::poll` returns `Ok(Async::Ready(None))` this means that
75/// the queue is currently not managing any futures. A future may be submitted
76/// to the queue at a later time. At that point, a call to
77/// `FuturesOrdered::poll` will either return the future's resolved value
78/// **or** `Ok(Async::Pending)` if the future has not yet completed. When
79/// multiple futures are submitted to the queue, `FuturesOrdered::poll` will
80/// return `Ok(Async::Pending)` until the first future completes, even if
81/// some of the later futures have already completed.
82///
83/// Note that you can create a ready-made `FuturesOrdered` via the
84/// `futures_ordered` function in the `stream` module, or you can start with an
85/// empty queue with the `FuturesOrdered::new` constructor.
86#[must_use = "streams do nothing unless polled"]
87pub struct FuturesOrdered<T>
88 where T: Future
89{
90 in_progress: FuturesUnordered<OrderWrapper<T>>,
91 queued_results: BinaryHeap<OrderWrapper<T::Item>>,
92 next_incoming_index: usize,
93 next_outgoing_index: usize,
94}
95
96/// Converts a list of futures into a `Stream` of results from the futures.
97///
98/// This function will take an list of futures (e.g. a vector, an iterator,
99/// etc), and return a stream. The stream will yield items as they become
100/// available on the futures internally, in the order that their originating
101/// futures were submitted to the queue. If the futures complete out of order,
102/// items will be stored internally within `FuturesOrdered` until all preceding
103/// items have been yielded.
104///
105/// Note that the returned queue can also be used to dynamically push more
106/// futures into the queue as they become available.
107pub fn futures_ordered<I>(futures: I) -> FuturesOrdered<<I::Item as IntoFuture>::Future>
108where
109 I: IntoIterator,
110 I::Item: IntoFuture,
111{
112 futures.into_iter().map(|f| f.into_future()).collect()
113}
114
115impl<T> FuturesOrdered<T>
116 where T: Future
117{
118 /// Constructs a new, empty `FuturesOrdered`
119 ///
120 /// The returned `FuturesOrdered` does not contain any futures and, in this
121 /// state, `FuturesOrdered::poll` will return `Ok(Async::Ready(None))`.
122 pub fn new() -> FuturesOrdered<T> {
123 FuturesOrdered {
124 in_progress: FuturesUnordered::new(),
125 queued_results: BinaryHeap::new(),
126 next_incoming_index: 0,
127 next_outgoing_index: 0,
128 }
129 }
130
131 /// Returns the number of futures contained in the queue.
132 ///
133 /// This represents the total number of in-flight futures, both
134 /// those currently processing and those that have completed but
135 /// which are waiting for earlier futures to complete.
136 pub fn len(&self) -> usize {
137 self.in_progress.len() + self.queued_results.len()
138 }
139
140 /// Returns `true` if the queue contains no futures
141 pub fn is_empty(&self) -> bool {
142 self.in_progress.is_empty() && self.queued_results.is_empty()
143 }
144
145 /// Push a future into the queue.
146 ///
147 /// This function submits the given future to the internal set for managing.
148 /// This function will not call `poll` on the submitted future. The caller
149 /// must ensure that `FuturesOrdered::poll` is called in order to receive
150 /// task notifications.
151 pub fn push(&mut self, future: T) {
152 let wrapped = OrderWrapper {
153 item: future,
154 index: self.next_incoming_index,
155 };
156 self.next_incoming_index += 1;
157 self.in_progress.push(wrapped);
158 }
159}
160
161impl<T> Stream for FuturesOrdered<T>
162 where T: Future
163{
164 type Item = T::Item;
165 type Error = T::Error;
166
167 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
168 // Get any completed futures from the unordered set.
169 loop {
170 match self.in_progress.poll_next(cx)? {
171 Async::Ready(Some(result)) => self.queued_results.push(result),
172 Async::Ready(None) | Async::Pending => break,
173 }
174 }
175
176 if let Some(next_result) = self.queued_results.peek() {
177 // PeekMut::pop is not stable yet QQ
178 if next_result.index != self.next_outgoing_index {
179 return Ok(Async::Pending);
180 }
181 } else if !self.in_progress.is_empty() {
182 return Ok(Async::Pending);
183 } else {
184 return Ok(Async::Ready(None));
185 }
186
187 let next_result = self.queued_results.pop().unwrap();
188 self.next_outgoing_index += 1;
189 Ok(Async::Ready(Some(next_result.item)))
190 }
191}
192
193impl<T: Debug> Debug for FuturesOrdered<T>
194 where T: Future
195{
196 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
197 write!(fmt, "FuturesOrdered {{ ... }}")
198 }
199}
200
201impl<F: Future> FromIterator<F> for FuturesOrdered<F> {
202 fn from_iter<T>(iter: T) -> Self
203 where
204 T: IntoIterator<Item = F>,
205 {
206 let acc = FuturesOrdered::new();
207 iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc })
208 }
209}