promising_future/
util.rs

1use std::iter::FromIterator;
2
3use futurestream::FutureStream;
4use spawner::{Spawner, ThreadSpawner};
5use super::{future_promise, Future};
6
7use Pollresult::*;
8
9/// Return first available `Future` from an iterator of `Future`s.
10///
11/// Given an iterator producing a stream of `Future<T>` values, return the first resolved value.
12/// All other values are discarded.  `Future`s which resolve without values are ignored.
13///
14/// Note 1: This lazily consumes the input iterator so it can be infinite. However each unresolved `Future`
15/// takes memory so `Future`s should resolve in bounded time (they need not resolve with values;
16/// valueless `Future`s are discarded).
17///
18/// Note 2: `futures.into_iter()` should avoid blocking, as that will block this function even if
19/// other `Future`s resolve. (FIXME)
20pub fn any<T, I>(futures: I) -> Option<T>
21    where I: IntoIterator<Item=Future<T>>, T: Send + 'static
22{
23    let stream = FutureStream::new();
24
25    // XXX TODO need way to select on futures.into_iter() and stream.wait()...
26    for fut in futures {
27        // Check to see if future has already resolved; if it has a value return it immediately, or
28        // discard it if it never will. Otherwise, if its unresolved, add it to the stream.
29        match fut.poll() {
30            Unresolved(fut) => stream.add(fut), // add to stream
31            Resolved(v@Some(_)) => return v,    // return value
32            Resolved(None) => (),               // skip
33        };
34
35        // Check to see if anything has become resolved
36        while let Some(fut) = stream.poll() {
37            match fut.poll() {
38                Unresolved(_) => panic!("FutureStream.poll returned unresolved Future"),
39                Resolved(v@Some(_)) => return v,
40                Resolved(None) => (),
41            }
42        }
43    }
44
45    // Consumed whole input iterator, wait for something to finish
46    while let Some(fut) = stream.wait() {
47        match fut.poll() {
48            Unresolved(_) => panic!("FutureStream.wait returned unresolved Future"),
49            Resolved(v@Some(_)) => return v,
50            Resolved(None) => (),
51        }
52    }
53    None
54}
55
56/// Return a Future of all values in an iterator of `Future`s.
57///
58/// Take an iterator producing `Future<T>` values and return a `Future<Vec<T>>`. The elements in the
59/// returned vector is undefined; typically it will be the order in which they resolved.
60///
61/// This function is non-blocking; the blocking occurs within a thread. Pass a type which implements
62/// `Spawner` which is used to produce the thread.
63pub fn all_with<T, I, S, F>(futures: I, spawner: S) -> Future<F>
64    where S: Spawner,
65          T: Send + 'static,
66          I: IntoIterator<Item=Future<T>> + Send + 'static,
67          F: FromIterator<T> + Send + 'static
68{
69    let (f, p) = future_promise();
70    let stream = FutureStream::from_iter(futures);
71
72    spawner.spawn(move || {
73        let waiter = stream.waiter();
74        p.set(waiter.into_iter().collect());
75    });
76
77    f
78}
79
80/// Return a Future of all values in an iterator of `Future`s.
81///
82/// Take an iterator producing `Future<T>` values and return a `Future<Vec<T>>`.
83///
84/// This function is non-blocking; the blocking occurs within a thread. This uses
85/// `std::thread::spawn()` to create the thread needed to block.
86pub fn all<T, I, F>(futures: I) -> Future<F>
87    where T: Send + 'static,
88          I: IntoIterator<Item=Future<T>> + Send + 'static,
89          F: FromIterator<T> + Send + 'static
90{
91    all_with(futures, ThreadSpawner)
92}