Skip to main content

logging_send/
logging-send.rs

1use std::time::Duration;
2
3use timely::dataflow::{InputHandle, ProbeHandle};
4use timely::dataflow::operators::{Input, Exchange, Probe};
5use timely::logging::{TimelyEventBuilder, TimelyProgressEventBuilder, TimelySummaryEventBuilder};
6use timely::container::CapacityContainerBuilder;
7use timely::progress::reachability::logging::TrackerEventBuilder;
8
9fn main() {
10    // initializes and runs a timely dataflow.
11    timely::execute_from_args(std::env::args(), |worker| {
12
13        let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
14        let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
15        let mut input = InputHandle::new();
16        let probe = ProbeHandle::new();
17
18        // Register timely worker logging.
19        worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
20            if let Some(data) = data {
21                data.iter().for_each(|x| println!("LOG1: {:?}", x))
22            }
23            else {
24                println!("LOG1: Flush {time:?}");
25            }
26        );
27
28        // Register timely progress logging.
29        // Less generally useful: intended for debugging advanced custom operators or timely
30        // internals.
31        worker.log_register().unwrap().insert::<TimelyProgressEventBuilder<usize>,_>("timely/progress/usize", |time, data|
32            if let Some(data) = data {
33                data.iter().for_each(|x| {
34                    println!("PROGRESS: {:?}", x);
35                    let (_, ev) = x;
36                    print!("PROGRESS: TYPED MESSAGES: ");
37                    for (n, p, t, d) in ev.messages.iter() {
38                        print!("{:?}, ", (n, p, t, d));
39                    }
40                    println!();
41                    print!("PROGRESS: TYPED INTERNAL: ");
42                    for (n, p, t, d) in ev.internal.iter() {
43                        print!("{:?}, ", (n, p, t, d));
44                    }
45                    println!();
46                })
47            }
48            else {
49                println!("PROGRESS: Flush {time:?}");
50            }
51        );
52
53        worker.log_register().unwrap().insert::<TrackerEventBuilder<usize>,_>("timely/reachability/usize", |time, data|
54            if let Some(data) = data {
55                data.iter().for_each(|x| {
56                    println!("REACHABILITY: {:?}", x);
57                })
58            }
59            else {
60                println!("REACHABILITY: Flush {time:?}");
61            }
62        );
63
64        worker.log_register().unwrap().insert::<TimelySummaryEventBuilder<usize>,_>("timely/summary/usize", |time, data|
65            if let Some(data) = data {
66                data.iter().for_each(|(_, x)| {
67                    println!("SUMMARY: {:?}", x);
68                })
69            }
70            else {
71                println!("SUMMARY: Flush {time:?}");
72            }
73        );
74
75        // create a new input, exchange data, and inspect its output
76        worker.dataflow(|scope| {
77            scope
78                .input_from(&mut input)
79                .container::<Vec<_>>()
80                .exchange(|&x| x as u64)
81                .probe_with(&probe);
82        });
83
84        // Register timely worker logging.
85        worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
86            if let Some(data) = data {
87                data.iter().for_each(|x| println!("LOG2: {:?}", x))
88            }
89            else {
90                println!("LOG2: Flush {time:?}");
91            }
92        );
93
94        // create a new input, exchange data, and inspect its output
95        worker.dataflow(|scope| {
96            scope
97                .input_from(&mut input)
98                .exchange(|&x| x as u64)
99                .probe_with(&probe);
100        });
101
102        // Register user-level logging.
103        type MyBuilder = CapacityContainerBuilder<Vec<(Duration, ())>>;
104        worker.log_register().unwrap().insert::<MyBuilder,_>("input", |time, data|
105            if let Some(data) = data {
106                for element in data.iter() {
107                    println!("Round tick at: {:?}", element.0);
108                }
109            }
110            else {
111                println!("Round flush at: {time:?}");
112            }
113        );
114
115        let input_logger = worker.log_register().unwrap().get::<MyBuilder>("input").expect("Input logger absent");
116
117        let timer = std::time::Instant::now();
118
119        for round in 0 .. rounds {
120
121            for i in 0 .. batch {
122                input.send(i);
123            }
124            input.advance_to(round);
125            input_logger.log(());
126
127            while probe.less_than(input.time()) {
128                worker.step();
129            }
130
131        }
132
133        let volume = (rounds * batch) as f64;
134        let elapsed = timer.elapsed();
135        let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
136
137        println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
138
139    }).unwrap();
140}