pollable_map/futures/
ordered.rs1use futures::Stream;
2use std::collections::VecDeque;
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll, Waker};
6
7pub struct OrderedFutureSet<F> {
10 queue: VecDeque<F>,
11 current_future: Option<F>,
12 waker: Option<Waker>,
13}
14
15impl<F> Default for OrderedFutureSet<F> {
16 fn default() -> Self {
17 Self {
18 queue: VecDeque::new(),
19 current_future: None,
20 waker: None,
21 }
22 }
23}
24
25impl<F> OrderedFutureSet<F> {
26 pub fn new() -> Self {
28 Self::default()
29 }
30
31 pub fn push(&mut self, fut: F) {
33 self.queue.push_back(fut);
34 if let Some(waker) = self.waker.take() {
35 waker.wake();
36 }
37 }
38}
39
40impl<F> FromIterator<F> for OrderedFutureSet<F>
41where
42 F: Future + Send + Unpin + 'static,
43{
44 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
45 let mut ordered = Self::new();
46 for fut in iter {
47 ordered.push(fut);
48 }
49 ordered
50 }
51}
52
53impl<F> Stream for OrderedFutureSet<F>
54where
55 F: Future + Send + Unpin + 'static,
56{
57 type Item = F::Output;
58 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
59 let this = &mut *self;
60
61 loop {
62 if this.current_future.is_none() {
63 let Some(fut) = this.queue.pop_front() else {
64 break;
65 };
66 this.current_future.replace(fut);
67 }
68
69 match this.current_future.as_mut() {
70 Some(fut) => {
71 let output = futures::ready!(Pin::new(fut).poll(cx));
72 this.current_future.take();
73 cx.waker().wake_by_ref();
74 return Poll::Ready(Some(output));
75 }
76 None => {
77 this.waker.replace(cx.waker().clone());
78 }
79 }
80 }
81
82 this.waker.replace(cx.waker().clone());
83 Poll::Pending
84 }
85
86 fn size_hint(&self) -> (usize, Option<usize>) {
87 (self.queue.len(), None)
88 }
89}
90
91#[cfg(test)]
92mod tests {
93 use crate::futures::ordered::OrderedFutureSet;
94 use futures::{FutureExt, StreamExt};
95
96 #[test]
97 fn fifo_futures() {
98 futures::executor::block_on(async move {
99 let mut fifo = OrderedFutureSet::new();
100 fifo.push(futures::future::ready(1));
101 fifo.push(futures::future::ready(2));
102 fifo.push(futures::future::ready(4));
103 fifo.push(futures::future::ready(3));
104
105 let items = fifo.take(4).collect::<Vec<u8>>().now_or_never().unwrap();
106
107 assert_eq!(items, vec![1, 2, 4, 3]);
108 });
109 }
110}