1use super::operators::{single_op_binary, window_all, window_all_parallel};
9use super::{Scope, Stream, SystemTime};
10use crate::util::time_util::nanos_timestamp;
11
12use std::cmp::max;
13use std::fmt::Debug;
14use timely::dataflow::operators::Inspect;
15
16pub fn latency_meter<G, D>(stream: &Stream<G, D>) -> Stream<G, f64>
20where
21 D: timely::Data + Debug,
22 G: Scope<Timestamp = u128>,
23{
24 let stream = window_all_parallel(
25 "Latency Meter",
26 stream,
27 Vec::new,
28 |latencies, time, data| {
29 let num_inputs = data.len();
30 let timestamp_now = nanos_timestamp(SystemTime::now());
31 let latency = timestamp_now - time;
32 for _i in 0..num_inputs {
33 latencies.push(latency);
34 }
35 },
36 |latencies| latencies.clone(),
37 );
38 let stream = window_all(
39 "Latency Meter Collect",
40 &stream,
41 Vec::new,
42 |latencies, _time, data| {
43 for latencies_other in data {
44 latencies.append(&mut latencies_other.clone());
45 }
46 },
47 |latencies| {
48 let sum: u128 = Iterator::sum(latencies.iter());
50 (sum as f64) / (1000000.0 * (latencies.len() as f64))
51 },
52 );
53 stream.inspect(|latency| println!("Avg Latency (ms): {:?}", latency))
54}
55
56pub fn volume_meter<G, D>(stream: &Stream<G, D>) -> Stream<G, usize>
60where
61 D: timely::Data + Debug,
62 G: Scope<Timestamp = u128>,
63{
64 let stream = window_all_parallel(
65 "Volume Meter",
66 stream,
67 || 0,
68 |count, _time, data| {
69 *count += data.len();
70 },
71 |count| *count,
72 );
73 let stream = window_all(
74 "Volume Meter Collect",
75 &stream,
76 || 0,
77 |count, _time, data| {
78 for count_other in data {
79 *count += count_other;
80 }
81 },
82 |count| *count,
83 );
84 stream.inspect(|count| println!("Volume (events): {:?}", count))
85}
86
87pub fn completion_meter<G, D>(stream: &Stream<G, D>) -> Stream<G, f64>
92where
93 D: timely::Data + Debug,
94 G: Scope<Timestamp = u128>,
95{
96 let start_timestamp = nanos_timestamp(SystemTime::now());
97 let stream = window_all_parallel(
98 "Completion Meter",
99 stream,
100 || (),
101 |_agg, _time, _data| {},
102 |_agg| nanos_timestamp(SystemTime::now()),
103 );
104 let stream = window_all(
105 "Completion Meter Collect",
106 &stream,
107 || 0,
108 |max_time, _time, data| {
109 for max_time_other in data {
110 *max_time = max(*max_time, max_time_other);
111 }
112 },
113 move |max_time| (((*max_time - start_timestamp) as f64) / 1000000.0),
114 );
115 stream.inspect(|compl_time| {
116 println!("Completion Time (ms): {:?}", compl_time)
117 })
118}
119
120pub fn throughput_meter<D1, D2, G>(
125 in_stream: &Stream<G, D1>,
126 out_stream: &Stream<G, D2>,
127) -> Stream<G, f64>
128where
129 D1: timely::Data + Debug,
130 D2: timely::Data + Debug,
131 G: Scope<Timestamp = u128>,
132{
133 let volume = volume_meter(in_stream);
134 let compl_time = completion_meter(out_stream);
135 let throughput = single_op_binary(
136 "Throughput Meter Collect",
137 &volume,
138 &compl_time,
139 |volume, compl_time| (volume as f64) / compl_time,
140 );
141 throughput.inspect(|throughput| {
142 println!("Throughput (events/ms): {:?}", throughput)
143 })
144}
145
146pub fn latency_throughput_meter<D1, D2, G>(
150 in_stream: &Stream<G, D1>,
151 out_stream: &Stream<G, D2>,
152) -> Stream<G, (f64, f64)>
153where
154 D1: timely::Data + Debug,
155 D2: timely::Data + Debug,
156 G: Scope<Timestamp = u128>,
157{
158 let latency = latency_meter(out_stream);
159 let throughput = throughput_meter(in_stream, out_stream);
160 single_op_binary(
161 "Latency-Throughput Meter Collect",
162 &latency,
163 &throughput,
164 |latency, throughput| (latency, throughput),
165 )
166}