capture_recv/
capture_recv.rs1use std::net::TcpListener;
2use timely::dataflow::operators::Inspect;
3use timely::dataflow::operators::capture::{EventReader, Replay};
4
5fn main() {
6 timely::execute_from_args(std::env::args(), |worker| {
7
8 let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9
10 let replayers =
12 (0 .. source_peers)
13 .filter(|i| i % worker.peers() == worker.index())
14 .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
15 .collect::<Vec<_>>()
16 .into_iter()
17 .map(|l| l.incoming().next().unwrap().unwrap())
18 .map(EventReader::<_,Vec<u64>,_>::new)
19 .collect::<Vec<_>>();
20
21 worker.dataflow::<u64,_,_>(|scope| {
22 replayers
23 .replay_into(scope)
24 .inspect(|x| println!("replayed: {:?}", x));
25 })
26 }).unwrap(); }