tokio_stream_util/futures_ordered.rs
1//! An unbounded queue of futures which yields results in submission order.
2//! This is similar to `FuturesUnordered`, but imposes a FIFO order on top of
3//! the set of futures.
4//!
5//! Futures are pushed into this queue and their realized values are yielded in
6//! order. This structure is optimized to manage a large number of futures.
7//! Futures managed by `FuturesOrdered` will only be polled when they generate
8//! notifications. This reduces the required amount of work needed to coordinate
9//! large numbers of futures.
10//!
11//! When a `FuturesOrdered` is first created, it does not contain any futures.
12//! Calling `poll_next` in this state will result in `Poll::Ready(None)`
13//! to be returned. Futures are submitted to the queue using `push_back` (or
14//! `push_front`); however, the future will **not** be polled at this point.
15//! `FuturesOrdered` will only poll managed futures when `poll_next` is called.
16//! As such, it is important to call `poll_next` after pushing new futures.
17//! If `poll_next` returns `Poll::Ready(None)` this means that the queue is
18//! currently not managing any futures. A future may be submitted to the queue
19//! at a later time. At that point, a call to `poll_next` will either
20//! return the future's resolved value **or** `Poll::Pending` if the future has
21//! not yet completed. When multiple futures are submitted to the queue,
22//! `poll_next` will return `Poll::Pending` until the first future completes, even if
23//! some of the later futures have already completed.
//!
//! Note that you can create a ready-made `FuturesOrdered` via the
//! `collect` method, or you can start with an empty queue with the
//! `FuturesOrdered::new` constructor.
//!
//! This type is only available when the `std` or `alloc` feature of this
//! library is activated, and it is activated by default.
//!
//! This type is similar to the `FuturesOrdered` type in the `futures`
//! crate, but is adapted to work in the `tokio` ecosystem.
//!
//! # Examples
//! ```
//! use tokio_stream_util::FuturesOrdered;
//! use tokio_stream::StreamExt;
//!
//! #[tokio::main]
//! async fn main() {
//!     let mut futures = FuturesOrdered::new();
//!     futures.push_back(tokio::task::spawn(async move { 1 }));
//!     futures.push_back(tokio::task::spawn(async move { 2 }));
//!     assert_eq!(futures.len(), 2);
//!     assert_eq!(futures.is_empty(), false);
//!     assert_eq!(futures.next().await.unwrap().unwrap(), 1);
//! }
//! ```
49
50use crate::futures_unordered::FuturesUnordered;
51use crate::FusedStream;
52use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
53use core::cmp::Ordering;
54use core::fmt::{self, Debug};
55use core::future::Future;
56use core::iter::FromIterator;
57use core::pin::Pin;
58use core::task::{Context, Poll};
59use tokio_stream::Stream;
60
/// A value tagged with the queue position it was submitted at.
///
/// `data` is either a not-yet-finished future or, after that future has been
/// polled to completion, its output. Equality and ordering look **only** at
/// `index`; `data` never takes part in comparisons.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
struct Ordered<T> {
    data: T, // A future or a future's output
    // Use i64 for index since isize may overflow in 32-bit targets.
    index: i64,
}

/// Field-wise view of a pinned [`Ordered`], as produced by `Ordered::project`.
///
/// `data` stays pinned; `index` is handed out as an ordinary mutable
/// reference.
pub(crate) struct OrderedProj<'pin, T: ?Sized> {
    data: Pin<&'pin mut T>,
    index: &'pin mut i64,
}

impl<T> Ordered<T> {
    /// Splits a `Pin<&mut Ordered<T>>` into a pinned reference to `data` and a
    /// plain mutable reference to `index`.
    pub fn project<'pin>(self: Pin<&'pin mut Self>) -> OrderedProj<'pin, T> {
        // SAFETY: the reference is only used to borrow the two fields below;
        // nothing is moved out of the pinned value.
        let this = unsafe { self.get_unchecked_mut() };
        OrderedProj {
            // SAFETY: `data` lives inside a value that is already pinned and
            // is only ever exposed re-wrapped in `Pin`, so it cannot be moved
            // through this projection.
            data: unsafe { Pin::new_unchecked(&mut this.data) },
            index: &mut this.index,
        }
    }
}

impl<T> PartialEq for Ordered<T> {
    /// Two entries are equal when they carry the same index.
    fn eq(&self, other: &Self) -> bool {
        matches!(self.index.cmp(&other.index), Ordering::Equal)
    }
}

impl<T> Eq for Ordered<T> {}

impl<T> PartialOrd for Ordered<T> {
    /// Always `Some`: delegates to the total order defined by [`Ord`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl<T> Ord for Ordered<T> {
    /// Orders entries by *descending* index.
    fn cmp(&self, other: &Self) -> Ordering {
        // `BinaryHeap` is a max-heap; reversing the natural order makes the
        // entry with the smallest index surface first.
        self.index.cmp(&other.index).reverse()
    }
}

impl<T: Future> Future for Ordered<T> {
    type Output = Ordered<T::Output>;

    /// Polls the wrapped future and, once it completes, returns its output
    /// tagged with the same index the future carried.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let OrderedProj { data, index } = self.project();
        // Copy the index out before polling so it can be moved into the result.
        let index = *index;
        match data.poll(cx) {
            Poll::Ready(output) => Poll::Ready(Ordered {
                data: output,
                index,
            }),
            Poll::Pending => Poll::Pending,
        }
    }
}
137
138/// An unbounded queue of futures.
139///
140/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO
141/// order on top of the set of futures. While futures in the set will race to
142/// completion in parallel, results will only be returned in the order their
143/// originating futures were added to the queue.
144///
145/// Futures are pushed into this queue and their realized values are yielded in
146/// order. This structure is optimized to manage a large number of futures.
147/// Futures managed by [`FuturesOrdered`] will only be polled when they generate
148/// notifications. This reduces the required amount of work needed to coordinate
149/// large numbers of futures.
150///
151/// When a [`FuturesOrdered`] is first created, it does not contain any futures.
152/// Calling [`poll_next`](FuturesOrdered::poll_next) in this state will result
153/// in [`Poll::Ready(None)`](Poll::Ready) to be returned. Futures are submitted
154/// to the queue using [`push_back`](FuturesOrdered::push_back) (or
155/// [`push_front`](FuturesOrdered::push_front)); however, the future will
156/// **not** be polled at this point. [`FuturesOrdered`] will only poll managed
157/// futures when [`FuturesOrdered::poll_next`] is called. As such, it
158/// is important to call [`poll_next`](FuturesOrdered::poll_next) after pushing
159/// new futures.
160///
161/// If [`FuturesOrdered::poll_next`] returns [`Poll::Ready(None)`](Poll::Ready)
162/// this means that the queue is currently not managing any futures. A future
163/// may be submitted to the queue at a later time. At that point, a call to
164/// [`FuturesOrdered::poll_next`] will either return the future's resolved value
165/// **or** [`Poll::Pending`] if the future has not yet completed. When
166/// multiple futures are submitted to the queue, [`FuturesOrdered::poll_next`]
167/// will return [`Poll::Pending`] until the first future completes, even if
168/// some of the later futures have already completed.
169///
170/// Note that you can create a ready-made [`FuturesOrdered`] via the
171/// [`collect`](Iterator::collect) method, or you can start with an empty queue
172/// with the [`FuturesOrdered::new`] constructor.
173///
174/// This type is only available when the `std` or `alloc` feature of this
175/// library is activated, and it is activated by default.
176#[must_use = "streams do nothing unless polled"]
177pub struct FuturesOrdered<T: Future> {
178 in_progress_queue: FuturesUnordered<Ordered<T>>,
179 queued_outputs: BinaryHeap<Ordered<T::Output>>,
180 next_incoming_index: i64,
181 next_outgoing_index: i64,
182}
183
184pub(crate) struct FuturesOrderedProj<'pin, T: Future> {
185 in_progress_queue: Pin<&'pin mut FuturesUnordered<Ordered<T>>>,
186 queued_outputs: &'pin mut BinaryHeap<Ordered<T::Output>>,
187 #[allow(dead_code)]
188 next_incoming_index: &'pin mut i64,
189 next_outgoing_index: &'pin mut i64,
190}
191
192impl<T: Future> Unpin for FuturesOrdered<T> {}
193
194impl<Fut: Future> FuturesOrdered<Fut> {
195 /// Constructs a new, empty `FuturesOrdered`
196 ///
197 /// The returned [`FuturesOrdered`] does not contain any futures and, in
198 /// this state, [`FuturesOrdered::poll_next`] will return
199 /// [`Poll::Ready(None)`](Poll::Ready).
200 pub fn new() -> Self {
201 Self {
202 in_progress_queue: FuturesUnordered::new(),
203 queued_outputs: BinaryHeap::new(),
204 next_incoming_index: 0,
205 next_outgoing_index: 0,
206 }
207 }
208
209 /// Returns the number of futures contained in the queue.
210 ///
211 /// This represents the total number of in-flight futures, both
212 /// those currently processing and those that have completed but
213 /// which are waiting for earlier futures to complete.
214 pub fn len(&self) -> usize {
215 self.in_progress_queue.len() + self.queued_outputs.len()
216 }
217
218 /// Returns `true` if the queue contains no futures
219 pub fn is_empty(&self) -> bool {
220 self.in_progress_queue.is_empty() && self.queued_outputs.is_empty()
221 }
222
223 pub(crate) fn project(self: Pin<&mut Self>) -> FuturesOrderedProj<'_, Fut> {
224 unsafe {
225 let this = self.get_unchecked_mut();
226 FuturesOrderedProj {
227 in_progress_queue: Pin::new_unchecked(&mut this.in_progress_queue),
228 queued_outputs: &mut this.queued_outputs,
229 next_incoming_index: &mut this.next_incoming_index,
230 next_outgoing_index: &mut this.next_outgoing_index,
231 }
232 }
233 }
234
235 /// Push a future into the queue.
236 ///
237 /// This function submits the given future to the internal set for managing.
238 /// This function will not call [`poll`](Future::poll) on the submitted
239 /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is
240 /// called in order to receive task notifications.
241 #[deprecated(note = "use `push_back` instead")]
242 pub fn push(&mut self, future: Fut) {
243 self.push_back(future);
244 }
245
246 /// Pushes a future to the back of the queue.
247 ///
248 /// This function submits the given future to the internal set for managing.
249 /// This function will not call [`poll`](Future::poll) on the submitted
250 /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is
251 /// called in order to receive task notifications.
252 pub fn push_back(&mut self, future: Fut) {
253 let wrapped = Ordered {
254 data: future,
255 index: self.next_incoming_index,
256 };
257 self.next_incoming_index += 1;
258 self.in_progress_queue.push(wrapped);
259 }
260
261 /// Pushes a future to the front of the queue.
262 ///
263 /// This function submits the given future to the internal set for managing.
264 /// This function will not call [`poll`](Future::poll) on the submitted
265 /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is
266 /// called in order to receive task notifications. This future will be
267 /// the next future to be returned complete.
268 pub fn push_front(&mut self, future: Fut) {
269 let wrapped = Ordered {
270 data: future,
271 index: self.next_outgoing_index - 1,
272 };
273 self.next_outgoing_index -= 1;
274 self.in_progress_queue.push(wrapped);
275 }
276}
277
278impl<Fut: Future> Default for FuturesOrdered<Fut> {
279 fn default() -> Self {
280 Self::new()
281 }
282}
283
284impl<Fut: Future> Stream for FuturesOrdered<Fut> {
285 type Item = Fut::Output;
286
287 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
288 let mut this = self.project();
289
290 // Check to see if we've already received the next value
291 if let Some(next_output) = this.queued_outputs.peek_mut() {
292 if next_output.index == *this.next_outgoing_index {
293 *this.next_outgoing_index += 1;
294 return Poll::Ready(Some(PeekMut::pop(next_output).data));
295 }
296 }
297
298 loop {
299 match this.in_progress_queue.as_mut().poll_next(cx) {
300 Poll::Ready(Some(output)) => {
301 if output.index == *this.next_outgoing_index {
302 *this.next_outgoing_index += 1;
303 break Poll::Ready(Some(output.data));
304 } else {
305 this.queued_outputs.push(output)
306 }
307 }
308 Poll::Pending => break Poll::Pending,
309 Poll::Ready(None) => break Poll::Ready(None),
310 }
311 }
312 }
313
314 fn size_hint(&self) -> (usize, Option<usize>) {
315 let len = self.len();
316 (len, Some(len))
317 }
318}
319
320impl<Fut: Future> Debug for FuturesOrdered<Fut> {
321 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
322 write!(f, "FuturesOrdered {{ ... }}")
323 }
324}
325
326impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
327 fn from_iter<T>(iter: T) -> Self
328 where
329 T: IntoIterator<Item = Fut>,
330 {
331 let acc = Self::new();
332 iter.into_iter().fold(acc, |mut acc, item| {
333 acc.push_back(item);
334 acc
335 })
336 }
337}
338
339impl<Fut: Future> FusedStream for FuturesOrdered<Fut> {
340 fn is_terminated(&self) -> bool {
341 self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty()
342 }
343}
344
345impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
346 fn extend<I>(&mut self, iter: I)
347 where
348 I: IntoIterator<Item = Fut>,
349 {
350 for item in iter {
351 self.push_back(item);
352 }
353 }
354}