1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use std::collections::HashSet;
use std::borrow::Borrow;
use futures::{Poll, Async, Stream};
use futures::stream::BoxStream;
use {Address, Error};
/// Combines several address streams into a single stream of merged addresses.
///
/// Every time at least one of the `streams` produces a new `Address`, the
/// returned stream yields the result of `union_addresses` applied to the most
/// recent address seen from each input (inputs that have not produced
/// anything yet are skipped).
///
/// The returned stream ends once every input stream has ended; with an empty
/// `streams` vector it ends immediately.
pub fn union_stream(streams: Vec<BoxStream<Address, Error>>)
    -> Union
{
    let count = streams.len();
    Union {
        buf: vec![None; count],
        finished: vec![false; count],
        dirty: false,
        streams: streams,
    }
}

/// Stream returned by `union_stream`.
///
/// `buf`, `finished` and `streams` always have the same length; index `i` in
/// each of them refers to the same input stream.
pub struct Union {
    /// The input streams being merged.
    streams: Vec<BoxStream<Address, Error>>,
    /// Latest address received from each input, `None` until the first one.
    buf: Vec<Option<Address>>,
    /// Set once the corresponding input returned `Ready(None)`, so that it is
    /// never polled again.
    finished: Vec<bool>,
    /// True when `buf` holds an update that has not been emitted yet.
    dirty: bool,
}

impl Stream for Union {
    type Item = Address;
    type Error = Error;

    /// Polls every input that is still running and emits the merged address
    /// if any of them produced a new value.
    ///
    /// An error from any input is returned immediately; values stored from
    /// other inputs earlier in the same pass stay in `buf` and are emitted by
    /// the next successful call.
    fn poll(&mut self) -> Poll<Option<Address>, Error> {
        for (i, s) in self.streams.iter_mut().enumerate() {
            // Polling a stream after it reported the end is not allowed by
            // the `Stream` contract, so exhausted inputs are skipped.
            if self.finished[i] {
                continue;
            }
            match s.poll()? {
                Async::Ready(Some(a)) => {
                    self.buf[i] = Some(a);
                    self.dirty = true;
                }
                Async::NotReady => {}
                // The last address of a finished input stays in `buf` and
                // keeps contributing to the union.
                Async::Ready(None) => {
                    self.finished[i] = true;
                }
            }
        }
        if self.dirty {
            self.dirty = false;
            return Ok(Async::Ready(Some(union_addresses(
                self.buf.iter().filter_map(|x| x.as_ref())))));
        }
        // With every input exhausted nobody will ever wake this task up
        // again, so returning `NotReady` here would hang the consumer.
        if self.finished.iter().all(|&done| done) {
            return Ok(Async::Ready(None));
        }
        Ok(Async::NotReady)
    }
}
/// Merges several addresses into one, dropping duplicates.
///
/// For every item of `iter` the entries returned by `at(0).addresses()` are
/// gathered into a `HashSet`, and the resulting unique entries are collected
/// into a new `Address`. Because the entries pass through a hash set, the
/// order in which they are fed into the result is unspecified.
///
/// Only the set at index `0` of each input is consulted; anything an input
/// holds at other indices does not appear in the result.
// NOTE(review): index 0 is presumably the highest-priority group — confirm
// against `Address::at`.
pub fn union_addresses<'x, I, B>(iter: I) -> Address
    where I: Iterator<Item=B>,
          B: Borrow<Address>,
{
    let unique = iter.fold(HashSet::new(), |mut seen, item| {
        // `item` must outlive the borrowed iterator, so the set is extended
        // right here rather than through a lazy adaptor.
        seen.extend(item.borrow().at(0).addresses());
        seen
    });
    unique.into_iter().collect()
}
#[cfg(test)]
mod test {
    use Address;
    use super::union_addresses;
    use std::net::SocketAddr;
    use std::str::FromStr;
    use std::collections::HashSet;

    /// Parses an `ip:port` literal, panicking if it is malformed.
    fn sock(text: &str) -> SocketAddr {
        SocketAddr::from_str(text).unwrap()
    }

    /// Builds an `Address` out of `ip:port` literals.
    fn address(items: &[&str]) -> Address {
        items.iter().map(|x| sock(x)).collect::<Address>()
    }

    /// Two addresses sharing one endpoint must merge into three unique
    /// endpoints, with the shared one appearing only once.
    #[test]
    fn test_union() {
        let first = address(&[ "127.0.0.1:1234", "10.0.0.1:3456" ]);
        let second = address(&[ "127.0.0.2:1234", "10.0.0.1:3456" ]);
        let merged = union_addresses([first, second].iter());

        let expected = vec![
            sock("127.0.0.1:1234"),
            sock("127.0.0.2:1234"),
            sock("10.0.0.1:3456"),
        ].into_iter().collect::<HashSet<_>>();
        let actual = merged.at(0).addresses().collect::<HashSet<_>>();
        assert_eq!(actual, expected);

        // The set comparison above would hide duplicates, so count the raw
        // entries as well.
        assert_eq!(merged.at(0).addresses().count(), 3);
    }
}