futures_buffered/
futures_ordered_bounded.rs

1use crate::FuturesUnorderedBounded;
2use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
3use core::cmp::Ordering;
4use core::fmt;
5use core::iter::FromIterator;
6use core::num::Wrapping;
7use core::pin::Pin;
8use futures_core::future::Future;
9use futures_core::ready;
10use futures_core::stream::Stream;
11use futures_core::{
12    task::{Context, Poll},
13    FusedStream,
14};
15use pin_project_lite::pin_project;
16
pin_project! {
    /// Pairs a value with the sequence index used to restore submission order.
    ///
    /// The same wrapper is used for a pending future and, once that future
    /// resolves, for its output, so the index travels with the value.
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    #[derive(Debug)]
    pub(crate) struct OrderWrapper<T> {
        // Structurally pinned so a wrapped future can be polled through `Pin<&mut Self>`.
        #[pin]
        pub data: T, // A future or a future's output
        // Position of this entry in the ordered sequence; the only field the
        // comparison traits below look at.
        pub index: usize,
    }
}
26
27impl<T> PartialEq for OrderWrapper<T> {
28    fn eq(&self, other: &Self) -> bool {
29        self.index == other.index
30    }
31}
32
33impl<T> Eq for OrderWrapper<T> {}
34
35impl<T> PartialOrd for OrderWrapper<T> {
36    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
37        Some(self.cmp(other))
38    }
39}
40
41impl<T> Ord for OrderWrapper<T> {
42    fn cmp(&self, other: &Self) -> Ordering {
43        // BinaryHeap is a max heap, so compare backwards here.
44        other.index.cmp(&self.index)
45    }
46}
47
48impl<T> Future for OrderWrapper<T>
49where
50    T: Future,
51{
52    type Output = OrderWrapper<T::Output>;
53
54    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
55        let index = self.index;
56        self.project().data.poll(cx).map(|output| OrderWrapper {
57            data: output,
58            index,
59        })
60    }
61}
62
/// A bounded queue of futures whose outputs are yielded in submission order.
///
/// This "combinator" is similar to `FuturesUnordered`, but it imposes an order
/// on top of the set of futures. While futures in the set will race to
/// completion in parallel, results will only be returned in the order their
/// originating futures were added to the queue.
///
/// Futures are pushed into this queue and their realized values are yielded in
/// order. This structure is optimized to manage a large number of futures.
/// Futures managed by `FuturesOrderedBounded` will only be polled when they generate
/// notifications. This reduces the required amount of work needed to coordinate
/// large numbers of futures.
///
/// When a `FuturesOrderedBounded` is first created, it does not contain any futures.
/// Calling `poll_next` in this state will result in `Poll::Ready(None)` being
/// returned. Futures are submitted to the queue using `push_back` or
/// `push_front` (or their fallible `try_` variants); however, the
/// future will **not** be polled at this point. `FuturesOrderedBounded` will only
/// poll managed futures when `FuturesOrderedBounded::poll_next` is called. As such, it
/// is important to call `poll_next` after pushing new futures.
///
/// If `FuturesOrderedBounded::poll_next` returns `Poll::Ready(None)` this means that
/// the queue is currently not managing any futures. A future may be submitted
/// to the queue at a later time. At that point, a call to
/// `FuturesOrderedBounded::poll_next` will either return the future's resolved value
/// **or** `Poll::Pending` if the future has not yet completed. When
/// multiple futures are submitted to the queue, `FuturesOrderedBounded::poll_next` will
/// return `Poll::Pending` until the first future completes, even if
/// some of the later futures have already completed.
///
/// Note that you can create a ready-made `FuturesOrderedBounded` via the
/// [`collect`](Iterator::collect) method, or you can start with an empty queue
/// of a chosen capacity with the `FuturesOrderedBounded::new` constructor.
#[must_use = "streams do nothing unless polled"]
pub struct FuturesOrderedBounded<T: Future> {
    // Futures that have not resolved yet, each tagged with its sequence index.
    pub(crate) in_progress_queue: FuturesUnorderedBounded<OrderWrapper<T>>,
    // Outputs that resolved ahead of their turn, held until their index is next.
    queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
    // Index assigned to the next future pushed at the back.
    pub(crate) next_incoming_index: Wrapping<usize>,
    // Index of the output that must be yielded next.
    next_outgoing_index: Wrapping<usize>,
}
102
103impl<T: Future> Unpin for FuturesOrderedBounded<T> {}
104
105impl<Fut: Future> FuturesOrderedBounded<Fut> {
106    /// Constructs a new, empty `FuturesOrderedBounded`
107    ///
108    /// The returned `FuturesOrderedBounded` does not contain any futures and, in this
109    /// state, `FuturesOrderedBounded::poll_next` will return `Poll::Ready(None)`.
110    pub fn new(capacity: usize) -> Self {
111        Self {
112            in_progress_queue: FuturesUnorderedBounded::new(capacity),
113            queued_outputs: BinaryHeap::with_capacity(capacity - 1),
114            next_incoming_index: Wrapping(0),
115            next_outgoing_index: Wrapping(0),
116        }
117    }
118
119    /// Returns the number of futures contained in the queue.
120    ///
121    /// This represents the total number of in-flight futures, both
122    /// those currently processing and those that have completed but
123    /// which are waiting for earlier futures to complete.
124    pub fn len(&self) -> usize {
125        self.in_progress_queue.len() + self.queued_outputs.len()
126    }
127
128    /// Returns `true` if the queue contains no futures
129    pub fn is_empty(&self) -> bool {
130        self.in_progress_queue.is_empty() && self.queued_outputs.is_empty()
131    }
132
133    /// Pushes a future to the back of the queue.
134    ///
135    /// This function submits the given future to the internal set for managing.
136    /// This function will not call `poll` on the submitted future. The caller
137    /// must ensure that `FuturesOrderedBounded::poll` is called in order to receive
138    /// task notifications.
139    ///
140    /// # Errors
141    /// This method will error if the buffer is currently full, returning the future back
142    pub fn try_push_back(&mut self, future: Fut) -> Result<(), Fut> {
143        self.in_progress_queue.try_push_with(future, |future| {
144            let wrapped = OrderWrapper {
145                data: future,
146                index: self.next_incoming_index.0,
147            };
148            self.next_incoming_index += 1;
149            wrapped
150        })
151    }
152
153    /// Pushes a future to the front of the queue.
154    ///
155    /// This function submits the given future to the internal set for managing.
156    /// This function will not call `poll` on the submitted future. The caller
157    /// must ensure that `FuturesOrderedBounded::poll` is called in order to receive
158    /// task notifications. This future will be the next future to be returned
159    /// complete.
160    ///
161    /// # Errors
162    /// This method will error if the buffer is currently full, returning the future back
163    pub fn try_push_front(&mut self, future: Fut) -> Result<(), Fut> {
164        self.in_progress_queue.try_push_with(future, |future| {
165            self.next_outgoing_index -= 1;
166            OrderWrapper {
167                data: future,
168                index: self.next_outgoing_index.0,
169            }
170        })
171    }
172
173    /// Pushes a future to the back of the queue.
174    ///
175    /// This function submits the given future to the internal set for managing.
176    /// This function will not call `poll` on the submitted future. The caller
177    /// must ensure that `FuturesOrderedBounded::poll` is called in order to receive
178    /// task notifications.
179    ///
180    /// # Panics
181    /// This method will panic if the buffer is currently full. See [`FuturesOrderedBounded::try_push_back`] to get a result instead
182    #[track_caller]
183    pub fn push_back(&mut self, future: Fut) {
184        if self.try_push_back(future).is_err() {
185            panic!("attempted to push into a full `FuturesOrderedBounded`");
186        }
187    }
188
189    /// Pushes a future to the front of the queue.
190    ///
191    /// This function submits the given future to the internal set for managing.
192    /// This function will not call `poll` on the submitted future. The caller
193    /// must ensure that `FuturesOrderedBounded::poll` is called in order to receive
194    /// task notifications. This future will be the next future to be returned
195    /// complete.
196    ///
197    /// # Panics
198    /// This method will panic if the buffer is currently full. See [`FuturesOrderedBounded::try_push_front`] to get a result instead
199    #[track_caller]
200    pub fn push_front(&mut self, future: Fut) {
201        if self.try_push_front(future).is_err() {
202            panic!("attempted to push into a full `FuturesOrderedBounded`");
203        }
204    }
205}
206
207impl<Fut: Future> Stream for FuturesOrderedBounded<Fut> {
208    type Item = Fut::Output;
209
210    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
211        const MSB: usize = !(usize::MAX >> 1);
212
213        let this = &mut *self;
214
215        // house keeping if the indices gets too high
216        if this.next_outgoing_index.0 & MSB == MSB {
217            let mut ready_queue = core::mem::take(&mut this.queued_outputs).into_vec();
218            for entry in &mut ready_queue {
219                entry.index ^= MSB;
220            }
221            this.queued_outputs = ready_queue.into();
222
223            for task in this.in_progress_queue.tasks.iter_mut() {
224                *task.project().index ^= MSB;
225            }
226
227            this.next_outgoing_index.0 ^= MSB;
228            this.next_incoming_index.0 ^= MSB;
229        }
230
231        // Check to see if we've already received the next value
232        if let Some(next_output) = this.queued_outputs.peek_mut() {
233            if next_output.index == this.next_outgoing_index.0 {
234                this.next_outgoing_index += 1;
235                return Poll::Ready(Some(PeekMut::pop(next_output).data));
236            }
237        }
238
239        loop {
240            match ready!(Pin::new(&mut this.in_progress_queue).poll_next(cx)) {
241                Some(output) => {
242                    if output.index == this.next_outgoing_index.0 {
243                        this.next_outgoing_index += 1;
244                        return Poll::Ready(Some(output.data));
245                    }
246
247                    this.queued_outputs.push(output);
248                }
249                None => return Poll::Ready(None),
250            }
251        }
252    }
253
254    fn size_hint(&self) -> (usize, Option<usize>) {
255        let len = self.len();
256        (len, Some(len))
257    }
258}
259
260impl<Fut: Future> fmt::Debug for FuturesOrderedBounded<Fut> {
261    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
262        write!(f, "FuturesOrderedBounded {{ ... }}")
263    }
264}
265
266impl<Fut: Future> FromIterator<Fut> for FuturesOrderedBounded<Fut> {
267    fn from_iter<T>(iter: T) -> Self
268    where
269        T: IntoIterator<Item = Fut>,
270    {
271        let mut index = Wrapping(0);
272        let in_progress_queue = FuturesUnorderedBounded::from_iter(iter.into_iter().map(|data| {
273            let next_index = index + Wrapping(1);
274            OrderWrapper {
275                data,
276                index: core::mem::replace(&mut index, next_index).0,
277            }
278        }));
279        Self {
280            in_progress_queue,
281            queued_outputs: BinaryHeap::new(),
282            next_incoming_index: index,
283            next_outgoing_index: Wrapping(0),
284        }
285    }
286}
287
288impl<Fut: Future> FusedStream for FuturesOrderedBounded<Fut> {
289    fn is_terminated(&self) -> bool {
290        self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty()
291    }
292}
293
294impl<Fut: Future> Extend<Fut> for FuturesOrderedBounded<Fut> {
295    fn extend<I>(&mut self, iter: I)
296    where
297        I: IntoIterator<Item = Fut>,
298    {
299        for item in iter {
300            self.push_back(item);
301        }
302    }
303}
304
#[cfg(test)]
mod tests {
    use crate::FuturesOrderedBounded;
    use core::{future::ready, task::Poll};
    use futures::{Stream, StreamExt};
    use futures_test::task::noop_context;

    /// Futures pushed at the back come out in the order they went in.
    #[test]
    fn ordered() {
        let mut queue = FuturesOrderedBounded::new(10);
        for value in 0..10 {
            queue.push_back(ready(value));
        }

        let mut cx = noop_context();
        for expected in 0..10 {
            let polled = queue.poll_next_unpin(&mut cx);
            assert_eq!(polled, Poll::Ready(Some(expected)));
        }
    }

    /// Futures pushed at the front come out in reverse push order.
    #[test]
    fn ordered_front() {
        let mut queue = FuturesOrderedBounded::new(10);
        for value in 0..10 {
            queue.push_front(ready(value));
        }

        let mut cx = noop_context();
        for expected in (0..10).rev() {
            let polled = queue.poll_next_unpin(&mut cx);
            assert_eq!(polled, Poll::Ready(Some(expected)));
        }
    }

    /// `push_back` panics once the single slot is taken.
    #[test]
    #[should_panic(expected = "attempted to push into a full `FuturesOrderedBounded`")]
    fn full_back() {
        let mut queue = FuturesOrderedBounded::new(1);
        queue.push_back(ready(()));
        queue.push_back(ready(()));
    }

    /// `push_front` panics once the single slot is taken.
    #[test]
    #[should_panic(expected = "attempted to push into a full `FuturesOrderedBounded`")]
    fn full_front() {
        let mut queue = FuturesOrderedBounded::new(1);
        queue.push_front(ready(()));
        queue.push_front(ready(()));
    }

    /// Collecting from an iterator reports every future via `len` and `size_hint`.
    #[test]
    fn from_iter() {
        let queue: FuturesOrderedBounded<_> = (0..10).map(|_| ready(())).collect();

        assert_eq!(queue.len(), 10);
        assert_eq!(queue.size_hint(), (10, Some(10)));
    }
}