// timely_util/perf.rs

/*
    Operators ("meters") for performance measurement of timely dataflow programs.

    These are not streaming operators: they compute their summaries only after the
    input is completely finished. They are designed this way for easier use in experiments.
*/
7
8use super::operators::{single_op_binary, window_all, window_all_parallel};
9use super::{Scope, Stream, SystemTime};
10use crate::util::time_util::nanos_timestamp;
11
12use std::cmp::max;
13use std::fmt::Debug;
14use timely::dataflow::operators::Inspect;
15
16/*
17    Meter which computes latency statistics for an output stream.
18*/
19pub fn latency_meter<G, D>(stream: &Stream<G, D>) -> Stream<G, f64>
20where
21    D: timely::Data + Debug,
22    G: Scope<Timestamp = u128>,
23{
24    let stream = window_all_parallel(
25        "Latency Meter",
26        stream,
27        Vec::new,
28        |latencies, time, data| {
29            let num_inputs = data.len();
30            let timestamp_now = nanos_timestamp(SystemTime::now());
31            let latency = timestamp_now - time;
32            for _i in 0..num_inputs {
33                latencies.push(latency);
34            }
35        },
36        |latencies| latencies.clone(),
37    );
38    let stream = window_all(
39        "Latency Meter Collect",
40        &stream,
41        Vec::new,
42        |latencies, _time, data| {
43            for latencies_other in data {
44                latencies.append(&mut latencies_other.clone());
45            }
46        },
47        |latencies| {
48            // println!("latencies: {:?}", latencies);
49            let sum: u128 = Iterator::sum(latencies.iter());
50            (sum as f64) / (1000000.0 * (latencies.len() as f64))
51        },
52    );
53    stream.inspect(|latency| println!("Avg Latency (ms): {:?}", latency))
54}
55
56/*
57    Meter which computes the total volume on a stream.
58*/
59pub fn volume_meter<G, D>(stream: &Stream<G, D>) -> Stream<G, usize>
60where
61    D: timely::Data + Debug,
62    G: Scope<Timestamp = u128>,
63{
64    let stream = window_all_parallel(
65        "Volume Meter",
66        stream,
67        || 0,
68        |count, _time, data| {
69            *count += data.len();
70        },
71        |count| *count,
72    );
73    let stream = window_all(
74        "Volume Meter Collect",
75        &stream,
76        || 0,
77        |count, _time, data| {
78            for count_other in data {
79                *count += count_other;
80            }
81        },
82        |count| *count,
83    );
84    stream.inspect(|count| println!("Volume (events): {:?}", count))
85}
86
87/*
88    Meter which computes the total completion time
89    (max timestamp - starting timestamp) on a stream.
90*/
91pub fn completion_meter<G, D>(stream: &Stream<G, D>) -> Stream<G, f64>
92where
93    D: timely::Data + Debug,
94    G: Scope<Timestamp = u128>,
95{
96    let start_timestamp = nanos_timestamp(SystemTime::now());
97    let stream = window_all_parallel(
98        "Completion Meter",
99        stream,
100        || (),
101        |_agg, _time, _data| {},
102        |_agg| nanos_timestamp(SystemTime::now()),
103    );
104    let stream = window_all(
105        "Completion Meter Collect",
106        &stream,
107        || 0,
108        |max_time, _time, data| {
109            for max_time_other in data {
110                *max_time = max(*max_time, max_time_other);
111            }
112        },
113        move |max_time| (((*max_time - start_timestamp) as f64) / 1000000.0),
114    );
115    stream.inspect(|compl_time| {
116        println!("Completion Time (ms): {:?}", compl_time)
117    })
118}
119
120/*
121    Meter which computes the throughput on a computation from an input
122    stream to an output stream.
123*/
124pub fn throughput_meter<D1, D2, G>(
125    in_stream: &Stream<G, D1>,
126    out_stream: &Stream<G, D2>,
127) -> Stream<G, f64>
128where
129    D1: timely::Data + Debug,
130    D2: timely::Data + Debug,
131    G: Scope<Timestamp = u128>,
132{
133    let volume = volume_meter(in_stream);
134    let compl_time = completion_meter(out_stream);
135    let throughput = single_op_binary(
136        "Throughput Meter Collect",
137        &volume,
138        &compl_time,
139        |volume, compl_time| (volume as f64) / compl_time,
140    );
141    throughput.inspect(|throughput| {
142        println!("Throughput (events/ms): {:?}", throughput)
143    })
144}
145
146/*
147    Meter for both latency and throughput.
148*/
149pub fn latency_throughput_meter<D1, D2, G>(
150    in_stream: &Stream<G, D1>,
151    out_stream: &Stream<G, D2>,
152) -> Stream<G, (f64, f64)>
153where
154    D1: timely::Data + Debug,
155    D2: timely::Data + Debug,
156    G: Scope<Timestamp = u128>,
157{
158    let latency = latency_meter(out_stream);
159    let throughput = throughput_meter(in_stream, out_stream);
160    single_op_binary(
161        "Latency-Throughput Meter Collect",
162        &latency,
163        &throughput,
164        |latency, throughput| (latency, throughput),
165    )
166}