logging_send/
logging-send.rs1use std::time::Duration;
2
3use timely::dataflow::{InputHandle, ProbeHandle};
4use timely::dataflow::operators::{Input, Exchange, Probe};
5use timely::logging::{TimelyEventBuilder, TimelyProgressEventBuilder, TimelySummaryEventBuilder};
6use timely::container::CapacityContainerBuilder;
7use timely::progress::reachability::logging::TrackerEventBuilder;
8
9fn main() {
10 timely::execute_from_args(std::env::args(), |worker| {
12
13 let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
14 let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
15 let mut input = InputHandle::new();
16 let probe = ProbeHandle::new();
17
18 worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
20 if let Some(data) = data {
21 data.iter().for_each(|x| println!("LOG1: {:?}", x))
22 }
23 else {
24 println!("LOG1: Flush {time:?}");
25 }
26 );
27
28 worker.log_register().unwrap().insert::<TimelyProgressEventBuilder<usize>,_>("timely/progress/usize", |time, data|
32 if let Some(data) = data {
33 data.iter().for_each(|x| {
34 println!("PROGRESS: {:?}", x);
35 let (_, ev) = x;
36 print!("PROGRESS: TYPED MESSAGES: ");
37 for (n, p, t, d) in ev.messages.iter() {
38 print!("{:?}, ", (n, p, t, d));
39 }
40 println!();
41 print!("PROGRESS: TYPED INTERNAL: ");
42 for (n, p, t, d) in ev.internal.iter() {
43 print!("{:?}, ", (n, p, t, d));
44 }
45 println!();
46 })
47 }
48 else {
49 println!("PROGRESS: Flush {time:?}");
50 }
51 );
52
53 worker.log_register().unwrap().insert::<TrackerEventBuilder<usize>,_>("timely/reachability/usize", |time, data|
54 if let Some(data) = data {
55 data.iter().for_each(|x| {
56 println!("REACHABILITY: {:?}", x);
57 })
58 }
59 else {
60 println!("REACHABILITY: Flush {time:?}");
61 }
62 );
63
64 worker.log_register().unwrap().insert::<TimelySummaryEventBuilder<usize>,_>("timely/summary/usize", |time, data|
65 if let Some(data) = data {
66 data.iter().for_each(|(_, x)| {
67 println!("SUMMARY: {:?}", x);
68 })
69 }
70 else {
71 println!("SUMMARY: Flush {time:?}");
72 }
73 );
74
75 worker.dataflow(|scope| {
77 scope
78 .input_from(&mut input)
79 .container::<Vec<_>>()
80 .exchange(|&x| x as u64)
81 .probe_with(&probe);
82 });
83
84 worker.log_register().unwrap().insert::<TimelyEventBuilder,_>("timely", |time, data|
86 if let Some(data) = data {
87 data.iter().for_each(|x| println!("LOG2: {:?}", x))
88 }
89 else {
90 println!("LOG2: Flush {time:?}");
91 }
92 );
93
94 worker.dataflow(|scope| {
96 scope
97 .input_from(&mut input)
98 .exchange(|&x| x as u64)
99 .probe_with(&probe);
100 });
101
102 type MyBuilder = CapacityContainerBuilder<Vec<(Duration, ())>>;
104 worker.log_register().unwrap().insert::<MyBuilder,_>("input", |time, data|
105 if let Some(data) = data {
106 for element in data.iter() {
107 println!("Round tick at: {:?}", element.0);
108 }
109 }
110 else {
111 println!("Round flush at: {time:?}");
112 }
113 );
114
115 let input_logger = worker.log_register().unwrap().get::<MyBuilder>("input").expect("Input logger absent");
116
117 let timer = std::time::Instant::now();
118
119 for round in 0 .. rounds {
120
121 for i in 0 .. batch {
122 input.send(i);
123 }
124 input.advance_to(round);
125 input_logger.log(());
126
127 while probe.less_than(input.time()) {
128 worker.step();
129 }
130
131 }
132
133 let volume = (rounds * batch) as f64;
134 let elapsed = timer.elapsed();
135 let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
136
137 println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
138
139 }).unwrap();
140}