pollable_map/futures/
ordered.rs1use futures::Stream;
2use std::collections::VecDeque;
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll, Waker};
6
7pub struct OrderedFutureSet<F> {
10 queue: VecDeque<F>,
11 current_future: Option<F>,
12 waker: Option<Waker>,
13}
14
15impl<F> Default for OrderedFutureSet<F> {
16 fn default() -> Self {
17 Self {
18 queue: VecDeque::new(),
19 current_future: None,
20 waker: None,
21 }
22 }
23}
24
25impl<F> OrderedFutureSet<F> {
26 pub fn new() -> Self {
28 Self::default()
29 }
30
31 pub fn push(&mut self, fut: F) {
33 self.queue.push_back(fut);
34 if let Some(waker) = self.waker.take() {
35 waker.wake();
36 }
37 }
38
39 pub fn pop_front(&mut self) -> Option<F> {
41 let fut = self.queue.pop_front();
42 if let Some(waker) = self.waker.take() {
43 waker.wake();
44 }
45 fut
46 }
47
48 pub fn pop_back(&mut self) -> Option<F> {
50 let fut = self.queue.pop_back();
51 if let Some(waker) = self.waker.take() {
52 waker.wake();
53 }
54 fut
55 }
56}
57
58impl<F> FromIterator<F> for OrderedFutureSet<F>
59where
60 F: Future + Send + Unpin + 'static,
61{
62 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
63 let mut ordered = Self::new();
64 for fut in iter {
65 ordered.push(fut);
66 }
67 ordered
68 }
69}
70
71impl<F> Stream for OrderedFutureSet<F>
72where
73 F: Future + Send + Unpin + 'static,
74{
75 type Item = F::Output;
76 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
77 let this = &mut *self;
78
79 loop {
80 if this.current_future.is_none() {
81 let Some(fut) = this.queue.pop_front() else {
82 break;
83 };
84 this.current_future.replace(fut);
85 }
86
87 match this.current_future.as_mut() {
88 Some(fut) => {
89 let output = futures::ready!(Pin::new(fut).poll(cx));
90 this.current_future.take();
91 cx.waker().wake_by_ref();
92 return Poll::Ready(Some(output));
93 }
94 None => {
95 this.waker.replace(cx.waker().clone());
96 }
97 }
98 }
99
100 this.waker.replace(cx.waker().clone());
101 Poll::Pending
102 }
103
104 fn size_hint(&self) -> (usize, Option<usize>) {
105 (self.queue.len(), None)
106 }
107}
108
109#[cfg(test)]
110mod tests {
111 use crate::futures::ordered::OrderedFutureSet;
112 use futures::StreamExt;
113
114 #[test]
115 fn fifo_futures() {
116 futures::executor::block_on(async move {
117 let mut fifo = OrderedFutureSet::new();
118 fifo.push(futures::future::ready(1));
119 fifo.push(futures::future::ready(2));
120 fifo.push(futures::future::ready(4));
121 fifo.push(futures::future::ready(3));
122
123 let items = fifo.take(4).collect::<Vec<u8>>().await;
124
125 assert_eq!(items, vec![1, 2, 4, 3]);
126 });
127 }
128
129 #[test]
130 fn remove_front_entry() {
131 futures::executor::block_on(async move {
132 let mut fifo = OrderedFutureSet::new();
133 fifo.push(futures::future::ready(1));
134 fifo.push(futures::future::ready(2));
135 fifo.push(futures::future::ready(4));
136 fifo.push(futures::future::ready(3));
137
138 let front_fut = fifo.pop_front();
139 assert!(front_fut.is_some());
142
143 let items = fifo.take(3).collect::<Vec<u8>>().await;
144
145 assert_eq!(items, vec![2, 4, 3]);
146 })
147 }
148
149 #[test]
150 fn remove_back_entry() {
151 futures::executor::block_on(async move {
152 let mut fifo = OrderedFutureSet::new();
153 fifo.push(futures::future::ready(1));
154 fifo.push(futures::future::ready(2));
155 fifo.push(futures::future::ready(4));
156 fifo.push(futures::future::ready(3));
157
158 let front_fut = fifo.pop_back();
159 assert!(front_fut.is_some());
162
163 let items = fifo.take(3).collect::<Vec<u8>>().await;
164
165 assert_eq!(items, vec![1, 2, 4]);
166 })
167 }
168}