tokio_stream_util/
futures_ordered.rs

1//! An unbounded queue of futures which yields results in submission order.
2//! This is similar to `FuturesUnordered`, but imposes a FIFO order on top of
3//! the set of futures.
4//!
5//! Futures are pushed into this queue and their realized values are yielded in
6//! order. This structure is optimized to manage a large number of futures.
7//! Futures managed by `FuturesOrdered` will only be polled when they generate
8//! notifications. This reduces the required amount of work needed to coordinate
9//! large numbers of futures.
10//!
//! When a `FuturesOrdered` is first created, it does not contain any futures.
//! Calling `poll_next` in this state returns `Poll::Ready(None)`.
//! Futures are submitted to the queue using `push_back` (or
14//! `push_front`); however, the future will **not** be polled at this point.
15//! `FuturesOrdered` will only poll managed futures when `poll_next` is called.
16//! As such, it is important to call `poll_next` after pushing new futures.
17//! If `poll_next` returns `Poll::Ready(None)` this means that the queue is
18//! currently not managing any futures. A future may be submitted to the queue
19//! at a later time. At that point, a call to `poll_next` will either
20//! return the future's resolved value **or** `Poll::Pending` if the future has
21//! not yet completed. When multiple futures are submitted to the queue,
22//! `poll_next` will return `Poll::Pending` until the first future completes, even if
23//! some of the later futures have already completed.
24//! Note that you can create a ready-made `FuturesOrdered` via the
25//! `collect` method, or you can start with an empty queue with the
26//! `FuturesOrdered::new` constructor.
27//! This type is only available when the `std` or `alloc` feature of this
28//! library is activated, and it is activated by default.
29//!
30//! This type is similar to the `FuturesOrdered` type in the `futures`
31//! crate, but is adapted to work in the `tokio` ecosystem.
//!
//! # Examples
//!
//! ```
//! use tokio_stream_util::FuturesOrdered;
//! use tokio_stream::StreamExt;
//!
//! #[tokio::main]
//! async fn main() {
//!     let mut futures = FuturesOrdered::new();
//!     futures.push_back(tokio::task::spawn(async move { 1 }));
//!     futures.push_back(tokio::task::spawn(async move { 2 }));
//!     assert_eq!(futures.len(), 2);
//!     assert_eq!(futures.is_empty(), false);
//!     assert_eq!(futures.next().await.unwrap().unwrap(), 1);
//! }
//! ```
49
50use crate::futures_unordered::FuturesUnordered;
51use crate::FusedStream;
52use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
53use core::cmp::Ordering;
54use core::fmt::{self, Debug};
55use core::future::Future;
56use core::iter::FromIterator;
57use core::pin::Pin;
58use core::task::{Context, Poll};
59use tokio_stream::Stream;
60
/// Pairs a value with the queue position it was assigned on submission.
///
/// `data` is a future while the entry is in flight and that future's output
/// once it has completed; the `Future` impl for this type maps one to the
/// other while carrying `index` across unchanged. All comparison traits on
/// this type look at `index` only.
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct Ordered<T> {
    data: T, // A future or a future's output
    // Use i64 for index since isize may overflow in 32-bit targets.
    index: i64,
}
67
/// Projection type returned by `Ordered::project`.
///
/// Splits a pinned `Ordered<T>` into a pinned reference to its `data` field
/// and an ordinary mutable reference to its `index` field.
pub(crate) struct OrderedProj<'pin, T: ?Sized> {
    // Kept behind `Pin` so that a wrapped future can be polled in place.
    data: Pin<&'pin mut T>,
    // Plain integer, so it is exposed as a regular mutable reference.
    index: &'pin mut i64,
}
73
74impl<T> Ordered<T> {
75    /// Project a `Pin<&mut Ordered<T>>` to its fields.
76    ///
77    /// Safety: this uses `get_unchecked_mut` and `Pin::new_unchecked` to
78    /// construct the projection. The caller must ensure the projection is
79    /// used according to `Pin` invariants.
80    pub fn project<'pin>(self: Pin<&'pin mut Self>) -> OrderedProj<'pin, T> {
81        unsafe {
82            let Self { data, index } = Pin::get_unchecked_mut(self);
83            OrderedProj {
84                data: Pin::new_unchecked(data),
85                index,
86            }
87        }
88    }
89}
90
91impl<T: Debug> Debug for Ordered<T> {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        f.debug_struct("Ordered")
94            .field("data", &self.data)
95            .field("index", &self.index)
96            .finish()
97    }
98}
99
100impl<T> PartialEq for Ordered<T> {
101    fn eq(&self, other: &Self) -> bool {
102        self.index == other.index
103    }
104}
105
// Equality is defined on the `i64` index alone (see `PartialEq`), so it is a
// full equivalence relation regardless of `T`.
impl<T> Eq for Ordered<T> {}
107
108impl<T> PartialOrd for Ordered<T> {
109    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
110        Some(self.cmp(other))
111    }
112}
113
114impl<T> Ord for Ordered<T> {
115    fn cmp(&self, other: &Self) -> Ordering {
116        // BinaryHeap is a max heap, so compare backwards here.
117        other.index.cmp(&self.index)
118    }
119}
120
121impl<T> Future for Ordered<T>
122where
123    T: Future,
124{
125    type Output = Ordered<T::Output>;
126
127    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
128        // Project to access pinned `data` and unpinned `index`.
129        let proj = self.project();
130        let index = *proj.index;
131        proj.data.poll(cx).map(|output| Ordered {
132            data: output,
133            index,
134        })
135    }
136}
137
/// An unbounded queue of futures.
///
/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO
/// order on top of the set of futures. While futures in the set will race to
/// completion in parallel, results will only be returned in the order their
/// originating futures were added to the queue.
///
/// Futures are pushed into this queue and their realized values are yielded in
/// order. This structure is optimized to manage a large number of futures.
/// Futures managed by [`FuturesOrdered`] will only be polled when they generate
/// notifications. This reduces the required amount of work needed to coordinate
/// large numbers of futures.
///
/// When a [`FuturesOrdered`] is first created, it does not contain any futures.
/// Calling [`poll_next`](FuturesOrdered::poll_next) in this state returns
/// [`Poll::Ready(None)`](Poll::Ready). Futures are submitted
/// to the queue using [`push_back`](FuturesOrdered::push_back) (or
/// [`push_front`](FuturesOrdered::push_front)); however, the future will
/// **not** be polled at this point. [`FuturesOrdered`] will only poll managed
/// futures when [`FuturesOrdered::poll_next`] is called. As such, it
/// is important to call [`poll_next`](FuturesOrdered::poll_next) after pushing
/// new futures.
///
/// If [`FuturesOrdered::poll_next`] returns [`Poll::Ready(None)`](Poll::Ready)
/// this means that the queue is currently not managing any futures. A future
/// may be submitted to the queue at a later time. At that point, a call to
/// [`FuturesOrdered::poll_next`] will either return the future's resolved value
/// **or** [`Poll::Pending`] if the future has not yet completed. When
/// multiple futures are submitted to the queue, [`FuturesOrdered::poll_next`]
/// will return [`Poll::Pending`] until the first future completes, even if
/// some of the later futures have already completed.
///
/// Note that you can create a ready-made [`FuturesOrdered`] via the
/// [`collect`](Iterator::collect) method, or you can start with an empty queue
/// with the [`FuturesOrdered::new`] constructor.
///
/// This type is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
#[must_use = "streams do nothing unless polled"]
pub struct FuturesOrdered<T: Future> {
    // Futures that have not completed yet, each tagged with its queue index.
    in_progress_queue: FuturesUnordered<Ordered<T>>,
    // Outputs that finished ahead of their turn. `Ordered`'s reversed `Ord`
    // makes the entry with the smallest index the top of this heap.
    queued_outputs: BinaryHeap<Ordered<T::Output>>,
    // Index assigned to the next future passed to `push_back`.
    next_incoming_index: i64,
    // Index of the output `poll_next` yields next; `push_front` decrements
    // it, so it can become negative.
    next_outgoing_index: i64,
}
183
/// Projection type returned by `FuturesOrdered::project`.
///
/// Gives field-by-field access to a pinned `FuturesOrdered`: the in-progress
/// queue stays behind a `Pin`, the remaining fields are plain mutable
/// references.
pub(crate) struct FuturesOrderedProj<'pin, T: Future> {
    in_progress_queue: Pin<&'pin mut FuturesUnordered<Ordered<T>>>,
    queued_outputs: &'pin mut BinaryHeap<Ordered<T::Output>>,
    // Not read through the projection by any code visible in this file
    // (`poll_next` only needs the outgoing index), hence the lint allowance.
    #[allow(dead_code)]
    next_incoming_index: &'pin mut i64,
    next_outgoing_index: &'pin mut i64,
}
191
// `FuturesOrdered` is `Unpin` regardless of whether the futures it manages are.
// NOTE(review): `project` still hands out a pinned reference to
// `in_progress_queue`; this presumably relies on `FuturesUnordered` not
// needing a stable address of its own — confirm against its definition.
impl<T: Future> Unpin for FuturesOrdered<T> {}
193
194impl<Fut: Future> FuturesOrdered<Fut> {
195    /// Constructs a new, empty `FuturesOrdered`
196    ///
197    /// The returned [`FuturesOrdered`] does not contain any futures and, in
198    /// this state, [`FuturesOrdered::poll_next`] will return
199    /// [`Poll::Ready(None)`](Poll::Ready).
200    pub fn new() -> Self {
201        Self {
202            in_progress_queue: FuturesUnordered::new(),
203            queued_outputs: BinaryHeap::new(),
204            next_incoming_index: 0,
205            next_outgoing_index: 0,
206        }
207    }
208
209    /// Returns the number of futures contained in the queue.
210    ///
211    /// This represents the total number of in-flight futures, both
212    /// those currently processing and those that have completed but
213    /// which are waiting for earlier futures to complete.
214    pub fn len(&self) -> usize {
215        self.in_progress_queue.len() + self.queued_outputs.len()
216    }
217
218    /// Returns `true` if the queue contains no futures
219    pub fn is_empty(&self) -> bool {
220        self.in_progress_queue.is_empty() && self.queued_outputs.is_empty()
221    }
222
223    pub(crate) fn project(self: Pin<&mut Self>) -> FuturesOrderedProj<'_, Fut> {
224        unsafe {
225            let this = self.get_unchecked_mut();
226            FuturesOrderedProj {
227                in_progress_queue: Pin::new_unchecked(&mut this.in_progress_queue),
228                queued_outputs: &mut this.queued_outputs,
229                next_incoming_index: &mut this.next_incoming_index,
230                next_outgoing_index: &mut this.next_outgoing_index,
231            }
232        }
233    }
234
235    /// Push a future into the queue.
236    ///
237    /// This function submits the given future to the internal set for managing.
238    /// This function will not call [`poll`](Future::poll) on the submitted
239    /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is
240    /// called in order to receive task notifications.
241    #[deprecated(note = "use `push_back` instead")]
242    pub fn push(&mut self, future: Fut) {
243        self.push_back(future);
244    }
245
246    /// Pushes a future to the back of the queue.
247    ///
248    /// This function submits the given future to the internal set for managing.
249    /// This function will not call [`poll`](Future::poll) on the submitted
250    /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is
251    /// called in order to receive task notifications.
252    pub fn push_back(&mut self, future: Fut) {
253        let wrapped = Ordered {
254            data: future,
255            index: self.next_incoming_index,
256        };
257        self.next_incoming_index += 1;
258        self.in_progress_queue.push(wrapped);
259    }
260
261    /// Pushes a future to the front of the queue.
262    ///
263    /// This function submits the given future to the internal set for managing.
264    /// This function will not call [`poll`](Future::poll) on the submitted
265    /// future. The caller must ensure that [`FuturesOrdered::poll_next`] is
266    /// called in order to receive task notifications. This future will be
267    /// the next future to be returned complete.
268    pub fn push_front(&mut self, future: Fut) {
269        let wrapped = Ordered {
270            data: future,
271            index: self.next_outgoing_index - 1,
272        };
273        self.next_outgoing_index -= 1;
274        self.in_progress_queue.push(wrapped);
275    }
276}
277
278impl<Fut: Future> Default for FuturesOrdered<Fut> {
279    fn default() -> Self {
280        Self::new()
281    }
282}
283
284impl<Fut: Future> Stream for FuturesOrdered<Fut> {
285    type Item = Fut::Output;
286
287    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
288        let mut this = self.project();
289
290        // Check to see if we've already received the next value
291        if let Some(next_output) = this.queued_outputs.peek_mut() {
292            if next_output.index == *this.next_outgoing_index {
293                *this.next_outgoing_index += 1;
294                return Poll::Ready(Some(PeekMut::pop(next_output).data));
295            }
296        }
297
298        loop {
299            match this.in_progress_queue.as_mut().poll_next(cx) {
300                Poll::Ready(Some(output)) => {
301                    if output.index == *this.next_outgoing_index {
302                        *this.next_outgoing_index += 1;
303                        break Poll::Ready(Some(output.data));
304                    } else {
305                        this.queued_outputs.push(output)
306                    }
307                }
308                Poll::Pending => break Poll::Pending,
309                Poll::Ready(None) => break Poll::Ready(None),
310            }
311        }
312    }
313
314    fn size_hint(&self) -> (usize, Option<usize>) {
315        let len = self.len();
316        (len, Some(len))
317    }
318}
319
320impl<Fut: Future> Debug for FuturesOrdered<Fut> {
321    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
322        write!(f, "FuturesOrdered {{ ... }}")
323    }
324}
325
326impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
327    fn from_iter<T>(iter: T) -> Self
328    where
329        T: IntoIterator<Item = Fut>,
330    {
331        let acc = Self::new();
332        iter.into_iter().fold(acc, |mut acc, item| {
333            acc.push_back(item);
334            acc
335        })
336    }
337}
338
339impl<Fut: Future> FusedStream for FuturesOrdered<Fut> {
340    fn is_terminated(&self) -> bool {
341        self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty()
342    }
343}
344
345impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
346    fn extend<I>(&mut self, iter: I)
347    where
348        I: IntoIterator<Item = Fut>,
349    {
350        for item in iter {
351            self.push_back(item);
352        }
353    }
354}