pollable_map/futures/
ordered.rs

1use futures::Stream;
2use std::collections::VecDeque;
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll, Waker};
6
7/// An unbounded queue of futures imposed a FIFO order while polling one future at a time
8/// and returning the output to stream before popping the next future in queue to be polled.
9pub struct OrderedFutureSet<F> {
10    queue: VecDeque<F>,
11    current_future: Option<F>,
12    waker: Option<Waker>,
13}
14
15impl<F> Default for OrderedFutureSet<F> {
16    fn default() -> Self {
17        Self {
18            queue: VecDeque::new(),
19            current_future: None,
20            waker: None,
21        }
22    }
23}
24
25impl<F> OrderedFutureSet<F> {
26    /// Constructs a new, empty [`OrderedFutureSet`]
27    pub fn new() -> Self {
28        Self::default()
29    }
30
31    /// Push a future to the back of the queue
32    pub fn push(&mut self, fut: F) {
33        self.queue.push_back(fut);
34        if let Some(waker) = self.waker.take() {
35            waker.wake();
36        }
37    }
38
39    /// Remove a future from the front of the queue
40    pub fn pop_front(&mut self) -> Option<F> {
41        let fut = self.queue.pop_front();
42        if let Some(waker) = self.waker.take() {
43            waker.wake();
44        }
45        fut
46    }
47
48    /// Remove a future from the back of the queue
49    pub fn pop_back(&mut self) -> Option<F> {
50        let fut = self.queue.pop_back();
51        if let Some(waker) = self.waker.take() {
52            waker.wake();
53        }
54        fut
55    }
56}
57
58impl<F> FromIterator<F> for OrderedFutureSet<F>
59where
60    F: Future + Send + Unpin + 'static,
61{
62    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
63        let mut ordered = Self::new();
64        for fut in iter {
65            ordered.push(fut);
66        }
67        ordered
68    }
69}
70
71impl<F> Stream for OrderedFutureSet<F>
72where
73    F: Future + Send + Unpin + 'static,
74{
75    type Item = F::Output;
76    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
77        let this = &mut *self;
78
79        loop {
80            if this.current_future.is_none() {
81                let Some(fut) = this.queue.pop_front() else {
82                    break;
83                };
84                this.current_future.replace(fut);
85            }
86
87            match this.current_future.as_mut() {
88                Some(fut) => {
89                    let output = futures::ready!(Pin::new(fut).poll(cx));
90                    this.current_future.take();
91                    cx.waker().wake_by_ref();
92                    return Poll::Ready(Some(output));
93                }
94                None => {
95                    this.waker.replace(cx.waker().clone());
96                }
97            }
98        }
99
100        this.waker.replace(cx.waker().clone());
101        Poll::Pending
102    }
103
104    fn size_hint(&self) -> (usize, Option<usize>) {
105        (self.queue.len(), None)
106    }
107}
108
109#[cfg(test)]
110mod tests {
111    use crate::futures::ordered::OrderedFutureSet;
112    use futures::StreamExt;
113
114    #[test]
115    fn fifo_futures() {
116        futures::executor::block_on(async move {
117            let mut fifo = OrderedFutureSet::new();
118            fifo.push(futures::future::ready(1));
119            fifo.push(futures::future::ready(2));
120            fifo.push(futures::future::ready(4));
121            fifo.push(futures::future::ready(3));
122
123            let items = fifo.take(4).collect::<Vec<u8>>().await;
124
125            assert_eq!(items, vec![1, 2, 4, 3]);
126        });
127    }
128
129    #[test]
130    fn remove_front_entry() {
131        futures::executor::block_on(async move {
132            let mut fifo = OrderedFutureSet::new();
133            fifo.push(futures::future::ready(1));
134            fifo.push(futures::future::ready(2));
135            fifo.push(futures::future::ready(4));
136            fifo.push(futures::future::ready(3));
137
138            let front_fut = fifo.pop_front();
139            // TODO: Write a `Ready` future that supports `Eq` and `PartialEq` for tests
140            //       to use `assert_eq(front_fut, Some(futures::future::ready(1)));`
141            assert!(front_fut.is_some());
142
143            let items = fifo.take(3).collect::<Vec<u8>>().await;
144
145            assert_eq!(items, vec![2, 4, 3]);
146        })
147    }
148
149    #[test]
150    fn remove_back_entry() {
151        futures::executor::block_on(async move {
152            let mut fifo = OrderedFutureSet::new();
153            fifo.push(futures::future::ready(1));
154            fifo.push(futures::future::ready(2));
155            fifo.push(futures::future::ready(4));
156            fifo.push(futures::future::ready(3));
157
158            let front_fut = fifo.pop_back();
159            // TODO: Write a `Ready` future that supports `Eq` and `PartialEq` for tests
160            //       to use `assert_eq(front_fut, Some(futures::future::ready(3)));`
161            assert!(front_fut.is_some());
162
163            let items = fifo.take(3).collect::<Vec<u8>>().await;
164
165            assert_eq!(items, vec![1, 2, 4]);
166        })
167    }
168}