Skip to main content

logging_replay/
logging_replay.rs

1//! Demonstrates cross-thread capture and replay of timely logging events.
2//!
3//! A source timely instance (2 workers) runs a simple dataflow and captures its
4//! logging events using thread-safe `link_sync::EventLink`s. A sink timely instance
5//! (1 worker) replays those events and counts them.
6
7use std::sync::Arc;
8use std::time::Duration;
9
10use timely::dataflow::operators::{Exchange, Inspect, ToStream};
11use timely::dataflow::operators::capture::event::link_sync::EventLink;
12use timely::dataflow::operators::capture::Replay;
13use timely::logging::{BatchLogger, TimelyEventBuilder, TimelyEvent};
14
15fn main() {
16
17    let source_workers = 2usize;
18    let sink_workers = 1usize;
19
20    // One EventLink per source worker, shared between source (writer) and sink (reader).
21    let event_links: Vec<_> = (0..source_workers)
22        .map(|_| Arc::new(EventLink::<Duration, Vec<(Duration, TimelyEvent)>>::new()))
23        .collect();
24
25    // Clone reader handles (they start at the head; the writer will advance past them).
26    let readers: Vec<_> = event_links.iter().map(Arc::clone).collect();
27
28    std::thread::scope(|scope| {
29
30        // --- Source instance: 2 workers producing logging events ---
31        let source = scope.spawn(move || {
32            timely::execute(timely::Config::process(source_workers), move |worker| {
33
34                // Install logging: capture timely events into our shared EventLink.
35                let link = event_links[worker.index()].clone();
36                let mut logger = BatchLogger::new(link);
37                worker.log_register()
38                    .unwrap()
39                    .insert::<TimelyEventBuilder, _>("timely", move |time, data| {
40                        logger.publish_batch(time, data);
41                    });
42
43                // A trivial dataflow to generate some logging activity.
44                worker.dataflow::<u64,_,_>(|scope| {
45                    (0..100u64)
46                        .to_stream(scope)
47                        .container::<Vec<_>>()
48                        .exchange(|&x| x)
49                        .inspect(|_x| { });
50                });
51
52            }).expect("source execution failed");
53        });
54
55        // --- Sink instance: 1 worker replaying the captured logs ---
56        let sink = scope.spawn(move || {
57            timely::execute(timely::Config::process(sink_workers), move |worker| {
58
59                // Each sink worker replays a disjoint subset of the source streams.
60                let replayers: Vec<_> = readers.iter().enumerate()
61                    .filter(|(i, _)| i % worker.peers() == worker.index())
62                    .map(|(_, r)| Arc::clone(r))
63                    .collect();
64
65                worker.dataflow::<Duration,_,_>(|scope| {
66                    replayers
67                        .replay_into(scope)
68                        .inspect(|event| {
69                            println!("  {:?}", event);
70                        });
71                });
72
73            }).expect("sink execution failed");
74        });
75
76        source.join().expect("source panicked");
77        sink.join().expect("sink panicked");
78    });
79
80    println!("Done.");
81}