Skip to main content

windowed_futures/
lib.rs

1//! Helpers for windowed parallel execution of futures or tasks.
2//! 
3//! This provides helpers to spawn or join a set of futures or async tasks while limiting the number 
4//! of parallel operations to a configurable window size to avoid overwhelming the executor or
5//! target services.
6
7#![feature(noop_waker)]
8
9use std::{
10    marker::Unpin,
11    task::{Poll, Context},
12    pin::Pin,
13    boxed::Box,
14};
15
16use futures::{Future, FutureExt};
17use log::debug;
18
19/// Join a provided set of futures returning an unordered collection of results
20///
21/// This is provided for equivalence with [futures::future::join_all], you may prefer [exec_windowed_unordered] which
22/// will create each future at the time of execution instead of upfront.
23pub async fn join_windowed_unordered<F: Future>(window: usize, futures: impl Iterator<Item=F>) -> Vec<<F as Future>::Output> {
24    exec_windowed_unordered(window, futures, |f| f).await
25}
26
27/// Join a provided set of futures returning an collection of results matching the order of the input iterator
28/// 
29/// This is provided for equivalence with [futures::future::join_all], you may prefer [exec_windowed_ordered] which
30/// will create each future at the time of execution instead of upfront.
31pub async fn join_windowed_ordered<F: Future>(window: usize, futures: impl Iterator<Item=F>) -> Vec<<F as Future>::Output> {
32    // Execute windowing future with enumerated results
33    exec_windowed_ordered(window, futures, |f| f).await
34}
35
36/// Execute an async closure for a provided set of inputs returning an unordered collection of results
37pub async fn exec_windowed_unordered<I, R: Future>(window: usize, inputs: impl Iterator<Item=I>, f: impl Fn(I) -> R) -> Vec<<R as Future>::Output> {
38    let w = FutureWindow::new(window, inputs, f);
39    w.await
40}
41
42/// Execute an async closure for a provided set of inputs returning an collection of results matching the order of the input iterator
43pub async fn exec_windowed_ordered<I, R: Future>(window: usize, inputs: impl Iterator<Item=I>, f: impl Fn(I) -> R) -> Vec<<R as Future>::Output> {
44    // Execute windowing future with enumerated results
45    let w = FutureWindow::new(window, inputs.enumerate(), |(n, i)| {
46        let t = f(i);
47        Box::pin(async move {
48            let r = t.await;
49            (n, r)
50        })
51    });
52    let mut r = w.await;
53
54    // Re-sort results and drop indexes
55    r.sort_by_key(|(n, _r)| *n);
56    r.drain(..).map(|(_n, r)| r).collect()
57}
58
59/// Resolve a collection of futures using a window of maximum parallel tasks
60pub struct FutureWindow<I: Iterator, R: Future, F: Fn(<I as Iterator>::Item) -> R> {
61    /// Parallelisation window size (maximum number of parallel tasks)
62    window: usize,
63
64    /// Iterator over inputs for parallel operation
65    inputs: I,
66
67    /// Closure to be executed per iterator item to create a pollable future
68    f: F,
69
70    /// Currently executing futures
71    current: Vec<Pin<Box<R>>>,
72
73    /// Completed executor results
74    results: Vec<<R as Future>::Output>,
75
76    /// Task counter
77    count: usize,
78}
79
80
81impl <I: Iterator, R: Future, F: Fn(<I as Iterator>::Item) -> R> Unpin for FutureWindow<I, R, F> {}
82
83impl <I: Iterator, R: Future, F: Fn(<I as Iterator>::Item) -> R> FutureWindow<I, R, F> {
84    /// Create a new windowing future over an iterator of inputs with a function
85    /// translating inputs into futures for execution.
86    ///
87    /// This will execute each future ensuring only n futures are executed in parallel,
88    /// resolving into an **unsorted** array of results.
89    pub fn new(n: usize, inputs: I, f: F) -> Self {
90        Self {
91            window: n,
92            inputs,
93            f,
94            current: Vec::new(),
95            results: Vec::new(),
96            count: 0,
97        }
98    }
99
100    fn update(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Vec<<R as Future>::Output>> {
101        let mut pending_tasks = true;
102
103        println!("poll!");
104
105        // Ensure we're running n tasks
106        while self.current.len() < self.window {
107            match self.inputs.next() {
108                // If we have remaining inputs, start tasks
109                Some(v) => {
110                    println!("new task {}", self.count);
111
112                    let f = (self.f)(v);
113                    self.current.push(Box::pin(f));
114                    self.count += 1;
115
116                    // Ensure we wake any time we add a new task
117                    cx.waker().clone().wake();
118                },
119                // Otherwise, skip this and set flag indicating no new tasks are available
120                None => {
121                    println!("no pending tasks");
122                    pending_tasks = false;
123                    break;
124                },
125            }
126        }
127
128        // Poll for completion of current tasks
129        let mut current: Vec<_> = self.current.drain(..).collect();
130        for mut c in current.drain(..) {
131            // Handle completion or re-stack for next future
132            match c.poll_unpin(cx) {
133                Poll::Ready(v) => {
134                    println!("completed task");
135                    // Store result and drop future
136                    self.results.push(v);
137                },
138                Poll::Pending => {
139                    // Keep tracking future
140                    self.current.push(c);
141                },
142            }
143        }
144
145        // Complete when we have no pending tasks and the current list is empty
146        if self.current.is_empty() && !pending_tasks {
147            debug!("{} tasks complete", self.results.len());
148            Poll::Ready(self.results.drain(..).collect())
149
150        // Force wake if any tasks have completed but we still have some pending
151        } else if self.current.len() < self.window && pending_tasks {
152            cx.waker().clone().wake();
153            Poll::Pending
154
155        } else {
156            Poll::Pending
157        }
158    }
159}
160
161/// [Future] implementation for [FutureWindow]
162impl <I: Iterator, R: Future, F: Fn(<I as Iterator>::Item) -> R> Future for FutureWindow<I, R, F> {
163    type Output = Vec<<R as Future>::Output>;
164
165    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
166       Self::update(&mut self.as_mut(), cx)
167    }
168}
169
170
171#[cfg(test)]
172mod tests {
173    use std::task::Waker;
174
175    use super::*;
176
177    /// Poll-able helper that returns index after a configurable number of polls
178    struct NPollMan {
179        index: usize,
180        polls: usize,
181    }
182
183    impl NPollMan {
184        /// [NPollMan] that returns after one poll
185        fn one_poll(index: usize) -> Self {
186            Self{ index, polls: 1 }
187        }
188
189        /// [NPollMan] that returns after N polls
190        fn n_poll(index: usize, polls: usize) -> Self {
191            Self{ index, polls }
192        }
193    }
194
195    impl Future for NPollMan {
196        type Output = usize;
197
198        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
199            // Check remaining number of polls
200            match self.polls == 0 {
201                // Return ready when completed
202                true => Poll::Ready(self.index),
203                // Otherwise decrement and return pending
204                false => {
205                    self.as_mut().polls -= 1;
206                    cx.waker().clone().wake();
207
208                    Poll::Pending
209                }
210            }
211        }
212    }
213
214    async fn one_poll(index: usize) -> usize {
215        let o = NPollMan::one_poll(index);
216        o.await
217    }
218
219    #[test]
220    fn test_window_internals() {
221        let waker = Waker::noop();
222        let mut ctx = Context::from_waker(&waker);
223
224        let n = 2;
225
226        // Create unordered windowing executor
227        let mut w = FutureWindow::new(n, 0..5, |n| one_poll(n) );
228
229        // Check we're not yet executing anything
230        assert_eq!(w.current.len(), 0);
231        assert_eq!(w.results.len(), 0);
232
233        // First poll should queue up n tasks to run
234        assert_eq!(w.poll_unpin(&mut ctx).is_pending(), true);
235        assert_eq!(w.current.len(), n);
236        assert_eq!(w.results.len(), 0);
237        
238        // Second poll should complete n tasks
239        assert_eq!(w.poll_unpin(&mut ctx).is_pending(), true);
240        assert_eq!(w.current.len(), 0);
241        assert_eq!(w.results.len(), n);
242
243        // Next poll should enqueue the next N tasks
244        assert_eq!(w.poll_unpin(&mut ctx).is_pending(), true);
245        assert_eq!(w.current.len(), n);
246        assert_eq!(w.results.len(), n);
247
248        // Next poll should complete the next N tasks
249        assert_eq!(w.poll_unpin(&mut ctx).is_pending(), true);
250        assert_eq!(w.current.len(), 0);
251        assert_eq!(w.results.len(), 2 * n);
252
253        // Next poll should enqueue the final task
254        assert_eq!(w.poll_unpin(&mut ctx).is_pending(), true);
255        assert_eq!(w.current.len(), 1);
256        assert_eq!(w.results.len(), 2 * n);
257
258        // Next poll should complete the final task, returning the result
259        let r = w.poll_unpin(&mut ctx);
260        assert_eq!(r.is_pending(), false);
261        assert_eq!(w.current.len(), 0);
262        assert_eq!(r, Poll::Ready(vec![0, 1, 2, 3, 4]));
263    }
264
265    async fn reverse_poll(index: usize, count: usize) -> usize {
266        let o = NPollMan::n_poll(index, count + 1 - index);
267        o.await
268    }
269
270    #[tokio::test]
271    async fn test_window_ordered() {
272        // Check unordered execution
273        let w = exec_windowed_unordered(2, 0..5, |n| reverse_poll(n, 5) ).await;
274        assert_ne!(w, vec![0, 1, 2, 3, 4]);
275
276        // Check ordered execution
277        let w = exec_windowed_ordered(2, 0..5, |n| reverse_poll(n, 5) ).await;
278        assert_eq!(w, vec![0, 1, 2, 3, 4]);
279    }
280}