1#![feature(noop_waker)]
8
9use std::{
10 marker::Unpin,
11 task::{Poll, Context},
12 pin::Pin,
13 boxed::Box,
14};
15
16use futures::{Future, FutureExt};
17use log::debug;
18
19pub async fn join_windowed_unordered<F: Future>(window: usize, futures: impl Iterator<Item=F>) -> Vec<<F as Future>::Output> {
24 exec_windowed_unordered(window, futures, |f| f).await
25}
26
27pub async fn join_windowed_ordered<F: Future>(window: usize, futures: impl Iterator<Item=F>) -> Vec<<F as Future>::Output> {
32 exec_windowed_ordered(window, futures, |f| f).await
34}
35
36pub async fn exec_windowed_unordered<I, R: Future>(window: usize, inputs: impl Iterator<Item=I>, f: impl Fn(I) -> R) -> Vec<<R as Future>::Output> {
38 let w = FutureWindow::new(window, inputs, f);
39 w.await
40}
41
42pub async fn exec_windowed_ordered<I, R: Future>(window: usize, inputs: impl Iterator<Item=I>, f: impl Fn(I) -> R) -> Vec<<R as Future>::Output> {
44 let w = FutureWindow::new(window, inputs.enumerate(), |(n, i)| {
46 let t = f(i);
47 Box::pin(async move {
48 let r = t.await;
49 (n, r)
50 })
51 });
52 let mut r = w.await;
53
54 r.sort_by_key(|(n, _r)| *n);
56 r.drain(..).map(|(_n, r)| r).collect()
57}
58
59pub struct FutureWindow<I: Iterator, R: Future, F: Fn(<I as Iterator>::Item) -> R> {
61 window: usize,
63
64 inputs: I,
66
67 f: F,
69
70 current: Vec<Pin<Box<R>>>,
72
73 results: Vec<<R as Future>::Output>,
75
76 count: usize,
78}
79
80
81impl <I: Iterator, R: Future, F: Fn(<I as Iterator>::Item) -> R> Unpin for FutureWindow<I, R, F> {}
82
83impl <I: Iterator, R: Future, F: Fn(<I as Iterator>::Item) -> R> FutureWindow<I, R, F> {
84 pub fn new(n: usize, inputs: I, f: F) -> Self {
90 Self {
91 window: n,
92 inputs,
93 f,
94 current: Vec::new(),
95 results: Vec::new(),
96 count: 0,
97 }
98 }
99
100 fn update(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Vec<<R as Future>::Output>> {
101 let mut pending_tasks = true;
102
103 println!("poll!");
104
105 while self.current.len() < self.window {
107 match self.inputs.next() {
108 Some(v) => {
110 println!("new task {}", self.count);
111
112 let f = (self.f)(v);
113 self.current.push(Box::pin(f));
114 self.count += 1;
115
116 cx.waker().clone().wake();
118 },
119 None => {
121 println!("no pending tasks");
122 pending_tasks = false;
123 break;
124 },
125 }
126 }
127
128 let mut current: Vec<_> = self.current.drain(..).collect();
130 for mut c in current.drain(..) {
131 match c.poll_unpin(cx) {
133 Poll::Ready(v) => {
134 println!("completed task");
135 self.results.push(v);
137 },
138 Poll::Pending => {
139 self.current.push(c);
141 },
142 }
143 }
144
145 if self.current.is_empty() && !pending_tasks {
147 debug!("{} tasks complete", self.results.len());
148 Poll::Ready(self.results.drain(..).collect())
149
150 } else if self.current.len() < self.window && pending_tasks {
152 cx.waker().clone().wake();
153 Poll::Pending
154
155 } else {
156 Poll::Pending
157 }
158 }
159}
160
161impl <I: Iterator, R: Future, F: Fn(<I as Iterator>::Item) -> R> Future for FutureWindow<I, R, F> {
163 type Output = Vec<<R as Future>::Output>;
164
165 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
166 Self::update(&mut self.as_mut(), cx)
167 }
168}
169
170
171#[cfg(test)]
172mod tests {
173 use std::task::Waker;
174
175 use super::*;
176
177 struct NPollMan {
179 index: usize,
180 polls: usize,
181 }
182
183 impl NPollMan {
184 fn one_poll(index: usize) -> Self {
186 Self{ index, polls: 1 }
187 }
188
189 fn n_poll(index: usize, polls: usize) -> Self {
191 Self{ index, polls }
192 }
193 }
194
195 impl Future for NPollMan {
196 type Output = usize;
197
198 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
199 match self.polls == 0 {
201 true => Poll::Ready(self.index),
203 false => {
205 self.as_mut().polls -= 1;
206 cx.waker().clone().wake();
207
208 Poll::Pending
209 }
210 }
211 }
212 }
213
214 async fn one_poll(index: usize) -> usize {
215 let o = NPollMan::one_poll(index);
216 o.await
217 }
218
219 #[test]
220 fn test_window_internals() {
221 let waker = Waker::noop();
222 let mut ctx = Context::from_waker(&waker);
223
224 let n = 2;
225
226 let mut w = FutureWindow::new(n, 0..5, |n| one_poll(n) );
228
229 assert_eq!(w.current.len(), 0);
231 assert_eq!(w.results.len(), 0);
232
233 assert_eq!(w.poll_unpin(&mut ctx).is_pending(), true);
235 assert_eq!(w.current.len(), n);
236 assert_eq!(w.results.len(), 0);
237
238 assert_eq!(w.poll_unpin(&mut ctx).is_pending(), true);
240 assert_eq!(w.current.len(), 0);
241 assert_eq!(w.results.len(), n);
242
243 assert_eq!(w.poll_unpin(&mut ctx).is_pending(), true);
245 assert_eq!(w.current.len(), n);
246 assert_eq!(w.results.len(), n);
247
248 assert_eq!(w.poll_unpin(&mut ctx).is_pending(), true);
250 assert_eq!(w.current.len(), 0);
251 assert_eq!(w.results.len(), 2 * n);
252
253 assert_eq!(w.poll_unpin(&mut ctx).is_pending(), true);
255 assert_eq!(w.current.len(), 1);
256 assert_eq!(w.results.len(), 2 * n);
257
258 let r = w.poll_unpin(&mut ctx);
260 assert_eq!(r.is_pending(), false);
261 assert_eq!(w.current.len(), 0);
262 assert_eq!(r, Poll::Ready(vec![0, 1, 2, 3, 4]));
263 }
264
265 async fn reverse_poll(index: usize, count: usize) -> usize {
266 let o = NPollMan::n_poll(index, count + 1 - index);
267 o.await
268 }
269
270 #[tokio::test]
271 async fn test_window_ordered() {
272 let w = exec_windowed_unordered(2, 0..5, |n| reverse_poll(n, 5) ).await;
274 assert_ne!(w, vec![0, 1, 2, 3, 4]);
275
276 let w = exec_windowed_ordered(2, 0..5, |n| reverse_poll(n, 5) ).await;
278 assert_eq!(w, vec![0, 1, 2, 3, 4]);
279 }
280}