futures_stream_select_all_send/
lib.rs1extern crate futures;
2use futures::stream::{self, Stream};
3
4pub fn select_all<I, T, E>(streams: I) -> Box<Stream<Item = T, Error = E> + Send>
5 where I: IntoIterator,
6 I::Item: Stream<Item = T, Error = E> + Send + 'static,
7 T: 'static + Send,
8 E: 'static + Send,
9{
10 struct Level<T, E> {
11 power: usize,
12 stream: Box<Stream<Item = T, Error = E> + Send>,
13 }
14
15 let mut stack: Vec<Level<T, E>> = Vec::new();
16 for stream in streams {
17 let mut lev_a = Level { power: 0, stream: Box::new(stream), };
18 while stack.last().map(|l| lev_a.power == l.power).unwrap_or(false) {
19 let lev_b = stack.pop().unwrap();
20 lev_a = Level {
21 power: lev_b.power + 1,
22 stream: Box::new(lev_b.stream.select(lev_a.stream)),
23 }
24 }
25 stack.push(lev_a);
26 }
27
28 if let Some(tree_lev) = stack.pop() {
29 let mut tree = tree_lev.stream;
30 while let Some(node) = stack.pop() {
31 tree = Box::new(tree.select(node.stream))
32 }
33 tree
34 } else {
35 Box::new(stream::empty())
36 }
37}
38
#[cfg(test)]
mod tests {
    use futures::{stream, Stream, Future};
    use super::select_all;

    /// All items from all inputs come out of the merged stream when none fails.
    #[test]
    fn happy_path() {
        let inputs = vec![
            stream::iter_result(vec![Ok(0), Ok(1)]),
            stream::iter_result(vec![Ok(2), Ok(3), Ok(4)]),
            stream::iter_result(vec![Ok(5)]),
        ];

        let merged = select_all::<_, _, ()>(inputs);
        let mut collected = merged.collect().wait().unwrap();

        // Sort before comparing so the check does not depend on interleaving.
        collected.sort();
        assert_eq!(collected, vec![0, 1, 2, 3, 4, 5]);
    }

    /// An error produced by one input is what collecting the merged stream returns.
    #[test]
    fn an_error() {
        let inputs = vec![
            stream::iter_result(vec![Ok(0), Ok(1)]),
            stream::iter_result(vec![Ok(2), Err("fail"), Ok(4)]),
            stream::iter_result(vec![Ok(5)]),
        ];

        let outcome = select_all(inputs).collect().wait();
        assert_eq!(outcome, Err("fail"));
    }
}
69}