promising_future/util.rs
1use std::iter::FromIterator;
2
3use futurestream::FutureStream;
4use spawner::{Spawner, ThreadSpawner};
5use super::{future_promise, Future};
6
7use Pollresult::*;
8
9/// Return first available `Future` from an iterator of `Future`s.
10///
11/// Given an iterator producing a stream of `Future<T>` values, return the first resolved value.
12/// All other values are discarded. `Future`s which resolve without values are ignored.
13///
14/// Note 1: This lazily consumes the input iterator so it can be infinite. However each unresolved `Future`
15/// takes memory so `Future`s should resolve in bounded time (they need not resolve with values;
16/// valueless `Future`s are discarded).
17///
18/// Note 2: `futures.into_iter()` should avoid blocking, as that will block this function even if
19/// other `Future`s resolve. (FIXME)
20pub fn any<T, I>(futures: I) -> Option<T>
21 where I: IntoIterator<Item=Future<T>>, T: Send + 'static
22{
23 let stream = FutureStream::new();
24
25 // XXX TODO need way to select on futures.into_iter() and stream.wait()...
26 for fut in futures {
27 // Check to see if future has already resolved; if it has a value return it immediately, or
28 // discard it if it never will. Otherwise, if its unresolved, add it to the stream.
29 match fut.poll() {
30 Unresolved(fut) => stream.add(fut), // add to stream
31 Resolved(v@Some(_)) => return v, // return value
32 Resolved(None) => (), // skip
33 };
34
35 // Check to see if anything has become resolved
36 while let Some(fut) = stream.poll() {
37 match fut.poll() {
38 Unresolved(_) => panic!("FutureStream.poll returned unresolved Future"),
39 Resolved(v@Some(_)) => return v,
40 Resolved(None) => (),
41 }
42 }
43 }
44
45 // Consumed whole input iterator, wait for something to finish
46 while let Some(fut) = stream.wait() {
47 match fut.poll() {
48 Unresolved(_) => panic!("FutureStream.wait returned unresolved Future"),
49 Resolved(v@Some(_)) => return v,
50 Resolved(None) => (),
51 }
52 }
53 None
54}
55
56/// Return a Future of all values in an iterator of `Future`s.
57///
58/// Take an iterator producing `Future<T>` values and return a `Future<Vec<T>>`. The elements in the
59/// returned vector is undefined; typically it will be the order in which they resolved.
60///
61/// This function is non-blocking; the blocking occurs within a thread. Pass a type which implements
62/// `Spawner` which is used to produce the thread.
63pub fn all_with<T, I, S, F>(futures: I, spawner: S) -> Future<F>
64 where S: Spawner,
65 T: Send + 'static,
66 I: IntoIterator<Item=Future<T>> + Send + 'static,
67 F: FromIterator<T> + Send + 'static
68{
69 let (f, p) = future_promise();
70 let stream = FutureStream::from_iter(futures);
71
72 spawner.spawn(move || {
73 let waiter = stream.waiter();
74 p.set(waiter.into_iter().collect());
75 });
76
77 f
78}
79
80/// Return a Future of all values in an iterator of `Future`s.
81///
82/// Take an iterator producing `Future<T>` values and return a `Future<Vec<T>>`.
83///
84/// This function is non-blocking; the blocking occurs within a thread. This uses
85/// `std::thread::spawn()` to create the thread needed to block.
86pub fn all<T, I, F>(futures: I) -> Future<F>
87 where T: Send + 'static,
88 I: IntoIterator<Item=Future<T>> + Send + 'static,
89 F: FromIterator<T> + Send + 'static
90{
91 all_with(futures, ThreadSpawner)
92}