// pollable_map/futures/ordered.rs

use futures::Stream;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{ready, Context, Poll, Waker};
6
/// An unbounded FIFO queue of futures that is driven as a [`Stream`].
///
/// Only one future is polled at a time: the future at the front of the queue is driven to
/// completion and its output is yielded by the stream before the next queued future is
/// polled, so outputs are produced in the order the futures were pushed.
#[must_use = "streams do nothing unless polled"]
pub struct OrderedFutureSet<F> {
    /// Futures waiting for their turn, oldest at the front.
    queue: VecDeque<F>,
    /// The future currently being driven, if any.
    current_future: Option<F>,
    /// Waker recorded when the stream was polled with nothing to drive; `push` takes it
    /// and wakes the task.
    waker: Option<Waker>,
}
14
15impl<F> Default for OrderedFutureSet<F> {
16    fn default() -> Self {
17        Self {
18            queue: VecDeque::new(),
19            current_future: None,
20            waker: None,
21        }
22    }
23}
24
25impl<F> OrderedFutureSet<F> {
26    /// Constructs a new, empty [`OrderedFutureSet`]
27    pub fn new() -> Self {
28        Self::default()
29    }
30
31    /// Furshes a future to the back of the queue
32    pub fn push(&mut self, fut: F) {
33        self.queue.push_back(fut);
34        if let Some(waker) = self.waker.take() {
35            waker.wake();
36        }
37    }
38}
39
40impl<F> FromIterator<F> for OrderedFutureSet<F>
41where
42    F: Future + Send + Unpin + 'static,
43{
44    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
45        let mut ordered = Self::new();
46        for fut in iter {
47            ordered.push(fut);
48        }
49        ordered
50    }
51}
52
53impl<F> Stream for OrderedFutureSet<F>
54where
55    F: Future + Send + Unpin + 'static,
56{
57    type Item = F::Output;
58    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
59        let this = &mut *self;
60
61        loop {
62            if this.current_future.is_none() {
63                let Some(fut) = this.queue.pop_front() else {
64                    break;
65                };
66                this.current_future.replace(fut);
67            }
68
69            match this.current_future.as_mut() {
70                Some(fut) => {
71                    let output = futures::ready!(Pin::new(fut).poll(cx));
72                    this.current_future.take();
73                    cx.waker().wake_by_ref();
74                    return Poll::Ready(Some(output));
75                }
76                None => {
77                    this.waker.replace(cx.waker().clone());
78                }
79            }
80        }
81
82        this.waker.replace(cx.waker().clone());
83        Poll::Pending
84    }
85
86    fn size_hint(&self) -> (usize, Option<usize>) {
87        (self.queue.len(), None)
88    }
89}
90
#[cfg(test)]
mod tests {
    use super::OrderedFutureSet;
    use futures::{FutureExt, StreamExt};

    /// Outputs are yielded in push order, not in value order.
    #[test]
    fn fifo_futures() {
        let mut fifo = OrderedFutureSet::new();
        fifo.push(futures::future::ready(1));
        fifo.push(futures::future::ready(2));
        fifo.push(futures::future::ready(4));
        fifo.push(futures::future::ready(3));

        // Every future is already complete, so a single poll of `collect` must finish;
        // no executor is needed.
        let items = fifo
            .take(4)
            .collect::<Vec<u8>>()
            .now_or_never()
            .expect("all four futures are immediately ready");

        assert_eq!(items, vec![1, 2, 4, 3]);
    }

    /// An idle set stays pending instead of ending the stream, and yields again once a
    /// new future is pushed.
    #[test]
    fn pending_while_empty_then_resumes_after_push() {
        let mut fifo = OrderedFutureSet::<futures::future::Ready<u8>>::new();
        assert!(fifo.next().now_or_never().is_none());

        fifo.push(futures::future::ready(7));
        assert_eq!(fifo.next().now_or_never(), Some(Some(7)));
        assert!(fifo.next().now_or_never().is_none());
    }
}