1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
use future::{Future, IntoFuture};
use stream::Stream;
use poll::Poll;
use Async;
use stack::{Stack, Drain};
use std::sync::Arc;
use task::{self, UnparkEvent};

use std::prelude::v1::*;

/// An adaptor for a stream of futures to execute the futures concurrently, if
/// possible, delivering results as they become available.
///
/// This adaptor will return their results in the order that they complete.
/// This is created by the `futures` method.
///
#[must_use = "streams do nothing unless polled"]
pub struct FuturesUnordered<F>
    where F: Future
{
    futures: Vec<Option<F>>,
    stack: Arc<Stack<usize>>,
    pending: Option<Drain<usize>>,
    active: usize,
}

/// Converts a list of futures into a `Stream` of results from the futures.
///
/// This function will take an list of futures (e.g. a vector, an iterator,
/// etc), and return a stream. The stream will yield items as they become
/// available on the futures internally, in the order that they become
/// available. This function is similar to `buffer_unordered` in that it may
/// return items in a different order than in the list specified.
pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
    where I: IntoIterator,
          I::Item: IntoFuture
{
    let futures = futures.into_iter()
                         .map(IntoFuture::into_future)
                         .map(Some)
                         .collect::<Vec<_>>();
    let stack = Arc::new(Stack::new());
    for i in 0..futures.len() {
        stack.push(i);
    }
    FuturesUnordered {
        active: futures.len(),
        futures: futures,
        pending: None,
        stack: stack,
    }
}

impl<F> FuturesUnordered<F>
    where F: Future
{
    fn poll_pending(&mut self, mut drain: Drain<usize>)
                    -> Option<Poll<Option<F::Item>, F::Error>> {
        while let Some(id) = drain.next() {
            // If this future was already done just skip the notification
            if self.futures[id].is_none() {
                continue
            }
            let event = UnparkEvent::new(self.stack.clone(), id);
            let ret = match task::with_unpark_event(event, || {
                self.futures[id]
                    .as_mut()
                    .unwrap()
                    .poll()
            }) {
                Ok(Async::NotReady) => continue,
                Ok(Async::Ready(val)) => Ok(Async::Ready(Some(val))),
                Err(e) => Err(e),
            };
            self.pending = Some(drain);
            self.active -= 1;
            self.futures[id] = None;
            return Some(ret)
        }
        None
    }
}

impl<F> Stream for FuturesUnordered<F>
    where F: Future
{
    type Item = F::Item;
    type Error = F::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        if self.active == 0 {
            return Ok(Async::Ready(None))
        }
        if let Some(drain) = self.pending.take() {
            if let Some(ret) = self.poll_pending(drain) {
                return ret
            }
        }
        let drain = self.stack.drain();
        if let Some(ret) = self.poll_pending(drain) {
            return ret
        }
        assert!(self.active > 0);
        Ok(Async::NotReady)
    }
}