1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use std::iter::FromIterator;

use futurestream::FutureStream;
use spawner::{Spawner, ThreadSpawner};
use super::{future_promise, Future};

use Pollresult::*;

/// Return first available `Future` from an iterator of `Future`s.
///
/// Given an iterator producing a stream of `Future<T>` values, return the first resolved value.
/// All other values are discarded.  `Future`s which resolve without values are ignored.
///
/// Note 1: This lazily consumes the input iterator so it can be infinite. However each unresolved `Future`
/// takes memory so `Future`s should resolve in bounded time (they need not resolve with values;
/// valueless `Future`s are discarded).
///
/// Note 2: `futures.into_iter()` should avoid blocking, as that will block this function even if
/// other `Future`s resolve. (FIXME)
pub fn any<T, I>(futures: I) -> Option<T>
    where I: IntoIterator<Item=Future<T>>, T: Send + 'static
{
    let stream = FutureStream::new();

    // XXX TODO need way to select on futures.into_iter() and stream.wait()...
    for fut in futures {
        // Check to see if future has already resolved; if it has a value return it immediately, or
        // discard it if it never will. Otherwise, if its unresolved, add it to the stream.
        match fut.poll() {
            Unresolved(fut) => stream.add(fut), // add to stream
            Resolved(v@Some(_)) => return v,    // return value
            Resolved(None) => (),               // skip
        };

        // Check to see if anything has become resolved
        while let Some(fut) = stream.poll() {
            match fut.poll() {
                Unresolved(_) => panic!("FutureStream.poll returned unresolved Future"),
                Resolved(v@Some(_)) => return v,
                Resolved(None) => (),
            }
        }
    }

    // Consumed whole input iterator, wait for something to finish
    while let Some(fut) = stream.wait() {
        match fut.poll() {
            Unresolved(_) => panic!("FutureStream.wait returned unresolved Future"),
            Resolved(v@Some(_)) => return v,
            Resolved(None) => (),
        }
    }
    None
}

/// Return a Future of all values in an iterator of `Future`s.
///
/// Take an iterator producing `Future<T>` values and return a `Future<Vec<T>>`. The elements in the
/// returned vector is undefined; typically it will be the order in which they resolved.
///
/// This function is non-blocking; the blocking occurs within a thread. Pass a type which implements
/// `Spawner` which is used to produce the thread.
pub fn all_with<T, I, S, F>(futures: I, spawner: S) -> Future<F>
    where S: Spawner,
          T: Send + 'static,
          I: IntoIterator<Item=Future<T>> + Send + 'static,
          F: FromIterator<T> + Send + 'static
{
    let (f, p) = future_promise();
    let stream = FutureStream::from_iter(futures);

    spawner.spawn(move || {
        let waiter = stream.waiter();
        p.set(waiter.into_iter().collect());
    });

    f
}

/// Return a Future of all values in an iterator of `Future`s.
///
/// Take an iterator producing `Future<T>` values and return a `Future<Vec<T>>`.
///
/// This function is non-blocking; the blocking occurs within a thread. This uses
/// `std::thread::spawn()` to create the thread needed to block.
pub fn all<T, I, F>(futures: I) -> Future<F>
    where T: Send + 'static,
          I: IntoIterator<Item=Future<T>> + Send + 'static,
          F: FromIterator<T> + Send + 'static
{
    all_with(futures, ThreadSpawner)
}