Skip to main content

logging_recv/
logging-recv.rs

1use std::net::TcpListener;
2use std::time::Duration;
3
4use timely::dataflow::operators::Inspect;
5use timely::dataflow::operators::capture::{EventReader, Replay};
6use timely::logging::{TimelySetup, TimelyEvent};
7
8fn main() {
9    timely::execute_from_args(std::env::args(), |worker| {
10
11        let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
12
13        // create replayers from disjoint partition of source worker identifiers.
14        let replayers =
15        (0 .. source_peers)
16            .filter(|i| i % worker.peers() == worker.index())
17            .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
18            .collect::<Vec<_>>()
19            .into_iter()
20            .map(|l| l.incoming().next().unwrap().unwrap())
21            .map(EventReader::<Duration,Vec<(Duration,TimelySetup,TimelyEvent)>,_>::new)
22            .collect::<Vec<_>>();
23
24        worker.dataflow(|scope| {
25            replayers
26                .replay_into(scope)
27                .inspect(|x| println!("replayed: {:?}", x));
28        })
29    }).unwrap(); // asserts error-free execution
30}