futures_stream_select_all_send/
lib.rs

1extern crate futures;
2use futures::stream::{self, Stream};
3
4pub fn select_all<I, T, E>(streams: I) -> Box<Stream<Item = T, Error = E> + Send>
5    where I: IntoIterator,
6          I::Item: Stream<Item = T, Error = E> + Send + 'static,
7          T: 'static + Send,
8          E: 'static + Send,
9{
10    struct Level<T, E> {
11        power: usize,
12        stream: Box<Stream<Item = T, Error = E> + Send>,
13    }
14
15    let mut stack: Vec<Level<T, E>> = Vec::new();
16    for stream in streams {
17        let mut lev_a = Level { power: 0, stream: Box::new(stream), };
18        while stack.last().map(|l| lev_a.power == l.power).unwrap_or(false) {
19            let lev_b = stack.pop().unwrap();
20            lev_a = Level {
21                power: lev_b.power + 1,
22                stream: Box::new(lev_b.stream.select(lev_a.stream)),
23            }
24        }
25        stack.push(lev_a);
26    }
27
28    if let Some(tree_lev) = stack.pop() {
29        let mut tree = tree_lev.stream;
30        while let Some(node) = stack.pop() {
31            tree = Box::new(tree.select(node.stream))
32        }
33        tree
34    } else {
35        Box::new(stream::empty())
36    }
37}
38
#[cfg(test)]
mod tests {
    use futures::{stream, Stream, Future};
    use super::select_all;

    /// Three error-free streams: every item must come out of the merged
    /// stream. The output is sorted before comparing because the test does
    /// not depend on the interleaving order.
    #[test]
    fn happy_path() {
        let inputs = vec![
            stream::iter_result(vec![Ok(0), Ok(1)]),
            stream::iter_result(vec![Ok(2), Ok(3), Ok(4)]),
            stream::iter_result(vec![Ok(5)]),
        ];

        let merged = select_all::<_, _, ()>(inputs);
        let mut collected = merged.collect().wait().unwrap();
        collected.sort();

        assert_eq!(collected, vec![0, 1, 2, 3, 4, 5]);
    }

    /// One of the inputs yields an error: collecting the merged stream must
    /// report that error.
    #[test]
    fn an_error() {
        let inputs = vec![
            stream::iter_result(vec![Ok(0), Ok(1)]),
            stream::iter_result(vec![Ok(2), Err("fail"), Ok(4)]),
            stream::iter_result(vec![Ok(5)]),
        ];

        let outcome = select_all(inputs).collect().wait();

        assert_eq!(outcome, Err("fail"));
    }
}
69}