futures_buffered/
futures_ordered.rs

1use crate::futures_ordered_bounded::OrderWrapper;
2use crate::FuturesUnordered;
3use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
4use core::fmt;
5use core::iter::FromIterator;
6use core::num::Wrapping;
7use core::pin::Pin;
8use futures_core::future::Future;
9use futures_core::ready;
10use futures_core::stream::Stream;
11use futures_core::{
12    task::{Context, Poll},
13    FusedStream,
14};
15
16/// An unbounded queue of futures.
17///
18/// This "combinator" is similar to `FuturesUnordered`, but it imposes an order
19/// on top of the set of futures. While futures in the set will race to
20/// completion in parallel, results will only be returned in the order their
21/// originating futures were added to the queue.
22///
23/// Futures are pushed into this queue and their realized values are yielded in
24/// order. This structure is optimized to manage a large number of futures.
25/// Futures managed by `FuturesOrdered` will only be polled when they generate
26/// notifications. This reduces the required amount of work needed to coordinate
27/// large numbers of futures.
28///
29/// When a `FuturesOrdered` is first created, it does not contain any futures.
30/// Calling `poll` in this state will result in `Poll::Ready(None))` to be
31/// returned. Futures are submitted to the queue using `push`; however, the
32/// future will **not** be polled at this point. `FuturesOrdered` will only
33/// poll managed futures when `FuturesOrdered::poll` is called. As such, it
34/// is important to call `poll` after pushing new futures.
35///
36/// If `FuturesOrdered::poll` returns `Poll::Ready(None)` this means that
37/// the queue is currently not managing any futures. A future may be submitted
38/// to the queue at a later time. At that point, a call to
39/// `FuturesOrdered::poll` will either return the future's resolved value
40/// **or** `Poll::Pending` if the future has not yet completed. When
41/// multiple futures are submitted to the queue, `FuturesOrdered::poll` will
42/// return `Poll::Pending` until the first future completes, even if
43/// some of the later futures have already completed.
44///
45/// Note that you can create a ready-made `FuturesOrdered` via the
46/// [`collect`](Iterator::collect) method, or you can start with an empty queue
47/// with the `FuturesOrdered::new` constructor.
48#[must_use = "streams do nothing unless polled"]
49pub struct FuturesOrdered<T: Future> {
50    in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
51    queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
52    next_incoming_index: Wrapping<usize>,
53    next_outgoing_index: Wrapping<usize>,
54}
55
56impl<T: Future> Unpin for FuturesOrdered<T> {}
57
58impl<Fut: Future> FuturesOrdered<Fut> {
59    /// Constructs a new, empty `FuturesOrdered`
60    ///
61    /// The returned `FuturesOrdered` does not contain any futures and, in this
62    /// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`.
63    pub fn new() -> Self {
64        // todo: make const
65        Self {
66            in_progress_queue: FuturesUnordered::new(),
67            queued_outputs: BinaryHeap::new(),
68            next_incoming_index: Wrapping(0),
69            next_outgoing_index: Wrapping(0),
70        }
71    }
72
73    /// Constructs a new, empty `FuturesOrdered`
74    ///
75    /// The returned `FuturesOrdered` does not contain any futures and, in this
76    /// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`.
77    pub fn with_capacity(capacity: usize) -> Self {
78        Self {
79            in_progress_queue: FuturesUnordered::with_capacity(capacity),
80            queued_outputs: BinaryHeap::with_capacity(capacity - 1),
81            next_incoming_index: Wrapping(0),
82            next_outgoing_index: Wrapping(0),
83        }
84    }
85
86    /// Returns the number of futures contained in the queue.
87    ///
88    /// This represents the total number of in-flight futures, both
89    /// those currently processing and those that have completed but
90    /// which are waiting for earlier futures to complete.
91    pub fn len(&self) -> usize {
92        self.in_progress_queue.len() + self.queued_outputs.len()
93    }
94
95    /// Returns `true` if the queue contains no futures
96    pub fn is_empty(&self) -> bool {
97        self.in_progress_queue.is_empty() && self.queued_outputs.is_empty()
98    }
99
100    /// Pushes a future to the back of the queue.
101    ///
102    /// This function submits the given future to the internal set for managing.
103    /// This function will not call `poll` on the submitted future. The caller
104    /// must ensure that `FuturesOrderedBounded::poll` is called in order to receive
105    /// task notifications.
106    pub fn push_back(&mut self, future: Fut) {
107        self.in_progress_queue.push(OrderWrapper {
108            data: future,
109            index: self.next_incoming_index.0,
110        });
111        self.next_incoming_index += 1;
112    }
113
114    /// Pushes a future to the front of the queue.
115    ///
116    /// This function submits the given future to the internal set for managing.
117    /// This function will not call `poll` on the submitted future. The caller
118    /// must ensure that `FuturesOrderedBounded::poll` is called in order to receive
119    /// task notifications. This future will be the next future to be returned
120    /// complete.
121    pub fn push_front(&mut self, future: Fut) {
122        self.next_outgoing_index -= 1;
123        self.in_progress_queue.push(OrderWrapper {
124            data: future,
125            index: self.next_outgoing_index.0,
126        });
127    }
128}
129
130impl<Fut: Future> Default for FuturesOrdered<Fut> {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136impl<Fut: Future> Stream for FuturesOrdered<Fut> {
137    type Item = Fut::Output;
138
139    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
140        const MSB: usize = !(usize::MAX >> 1);
141
142        let this = &mut *self;
143
144        // house keeping if the indices gets too high
145        if this.next_outgoing_index.0 & MSB == MSB {
146            let mut ready_queue = core::mem::take(&mut this.queued_outputs).into_vec();
147            for entry in &mut ready_queue {
148                entry.index ^= MSB;
149            }
150            this.queued_outputs = ready_queue.into();
151
152            for group in &mut this.in_progress_queue.groups {
153                for task in group.tasks.iter_mut() {
154                    *task.project().index ^= MSB;
155                }
156            }
157
158            this.next_outgoing_index.0 ^= MSB;
159            this.next_incoming_index.0 ^= MSB;
160        }
161
162        // Check to see if we've already received the next value
163        if let Some(next_output) = this.queued_outputs.peek_mut() {
164            if next_output.index == this.next_outgoing_index.0 {
165                this.next_outgoing_index += 1;
166                return Poll::Ready(Some(PeekMut::pop(next_output).data));
167            }
168        }
169
170        loop {
171            match ready!(Pin::new(&mut this.in_progress_queue).poll_next(cx)) {
172                Some(output) => {
173                    if output.index == this.next_outgoing_index.0 {
174                        this.next_outgoing_index += 1;
175                        return Poll::Ready(Some(output.data));
176                    }
177
178                    this.queued_outputs.push(output);
179                }
180                None => return Poll::Ready(None),
181            }
182        }
183    }
184
185    fn size_hint(&self) -> (usize, Option<usize>) {
186        let len = self.len();
187        (len, Some(len))
188    }
189}
190
191impl<Fut: Future> fmt::Debug for FuturesOrdered<Fut> {
192    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
193        write!(f, "FuturesOrdered {{ ... }}")
194    }
195}
196
197impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
198    fn from_iter<T>(iter: T) -> Self
199    where
200        T: IntoIterator<Item = Fut>,
201    {
202        let mut index = Wrapping(0);
203        let in_progress_queue = FuturesUnordered::from_iter(iter.into_iter().map(|data| {
204            let next_index = index + Wrapping(1);
205            OrderWrapper {
206                data,
207                index: core::mem::replace(&mut index, next_index).0,
208            }
209        }));
210        Self {
211            in_progress_queue,
212            queued_outputs: BinaryHeap::new(),
213            next_incoming_index: index,
214            next_outgoing_index: Wrapping(0),
215        }
216    }
217}
218
219impl<Fut: Future> FusedStream for FuturesOrdered<Fut> {
220    fn is_terminated(&self) -> bool {
221        self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty()
222    }
223}
224
225impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
226    fn extend<I>(&mut self, iter: I)
227    where
228        I: IntoIterator<Item = Fut>,
229    {
230        for item in iter {
231            self.push_back(item);
232        }
233    }
234}
235
236#[cfg(test)]
237mod tests {
238    use crate::FuturesOrdered;
239    use core::{future::ready, task::Poll};
240    use futures::{Stream, StreamExt};
241    use futures_test::task::noop_context;
242
243    #[test]
244    fn ordered() {
245        let mut buffer = FuturesOrdered::with_capacity(1);
246
247        for i in 0..10 {
248            buffer.push_back(ready(i));
249        }
250
251        for i in 0..10 {
252            assert_eq!(
253                buffer.poll_next_unpin(&mut noop_context()),
254                Poll::Ready(Some(i))
255            );
256        }
257    }
258
259    #[test]
260    fn ordered_front() {
261        let mut buffer = FuturesOrdered::with_capacity(1);
262
263        for i in 0..10 {
264            buffer.push_front(ready(i));
265        }
266
267        for i in (0..10).rev() {
268            assert_eq!(
269                buffer.poll_next_unpin(&mut noop_context()),
270                Poll::Ready(Some(i))
271            );
272        }
273    }
274
275    #[test]
276    fn from_iter() {
277        let buffer = FuturesOrdered::from_iter((0..10).map(|_| ready(())));
278
279        assert_eq!(buffer.len(), 10);
280        assert_eq!(buffer.size_hint(), (10, Some(10)));
281    }
282}