logging_replay/
logging_replay.rs1use std::sync::Arc;
8use std::time::Duration;
9
10use timely::dataflow::operators::{Exchange, Inspect, ToStream};
11use timely::dataflow::operators::capture::event::link_sync::EventLink;
12use timely::dataflow::operators::capture::Replay;
13use timely::logging::{BatchLogger, TimelyEventBuilder, TimelyEvent};
14
15fn main() {
16
17 let source_workers = 2usize;
18 let sink_workers = 1usize;
19
20 let event_links: Vec<_> = (0..source_workers)
22 .map(|_| Arc::new(EventLink::<Duration, Vec<(Duration, TimelyEvent)>>::new()))
23 .collect();
24
25 let readers: Vec<_> = event_links.iter().map(Arc::clone).collect();
27
28 std::thread::scope(|scope| {
29
30 let source = scope.spawn(move || {
32 timely::execute(timely::Config::process(source_workers), move |worker| {
33
34 let link = event_links[worker.index()].clone();
36 let mut logger = BatchLogger::new(link);
37 worker.log_register()
38 .unwrap()
39 .insert::<TimelyEventBuilder, _>("timely", move |time, data| {
40 logger.publish_batch(time, data);
41 });
42
43 worker.dataflow::<u64,_,_>(|scope| {
45 (0..100u64)
46 .to_stream(scope)
47 .container::<Vec<_>>()
48 .exchange(|&x| x)
49 .inspect(|_x| { });
50 });
51
52 }).expect("source execution failed");
53 });
54
55 let sink = scope.spawn(move || {
57 timely::execute(timely::Config::process(sink_workers), move |worker| {
58
59 let replayers: Vec<_> = readers.iter().enumerate()
61 .filter(|(i, _)| i % worker.peers() == worker.index())
62 .map(|(_, r)| Arc::clone(r))
63 .collect();
64
65 worker.dataflow::<Duration,_,_>(|scope| {
66 replayers
67 .replay_into(scope)
68 .inspect(|event| {
69 println!(" {:?}", event);
70 });
71 });
72
73 }).expect("sink execution failed");
74 });
75
76 source.join().expect("source panicked");
77 sink.join().expect("sink panicked");
78 });
79
80 println!("Done.");
81}