1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Probe};
use timely::dataflow::operators::vec::Filter;
fn main() {
    // Open-loop latency/throughput benchmark: offers records to a trivial
    // timely dataflow at a fixed aggregate rate, and records the latency of
    // each record (request time -> frontier acknowledgement) in a
    // log-scaled histogram, printed as a complementary CDF on completion.
    //
    // Usage: <binary> <rate> <duration_s> [timely args...]
    //   rate:       offered load in records per second (across all workers).
    //   duration_s: experiment duration in seconds.
    let mut args = std::env::args();
    args.next();
    let rate: usize = args.next().expect("Must specify rate").parse().expect("Rate must be an usize");
    let duration_s: usize = args.next().expect("Must specify duration_s").parse().expect("duration_s must be an usize");

    timely::execute_from_args(args, move |worker| {
        let index = worker.index();
        let peers = worker.peers();

        // Re-synchronize all workers (account for start-up), then start the
        // clock; timestamps are nanoseconds since this instant.
        timely::synchronization::Barrier::new(worker).wait();
        let timer = std::time::Instant::now();

        let mut input = InputHandle::new();
        let probe = ProbeHandle::new();

        // Create a dataflow that discards input data (just synchronizes).
        worker.dataflow(|scope| {
            scope
                .input_from(&mut input) // read input.
                .filter(|_| false)      // do nothing.
                .probe_with(&probe);    // observe output.
        });

        let ns_per_request = 1_000_000_000 / rate;
        let mut insert_counter = index; // counts up as we insert records.
        let mut retire_counter = index; // counts up as we retire records.
        let mut inserted_ns = 0;

        // Latency histogram: counts[i][j] counts latencies whose
        // next-power-of-two exponent is `i`, sub-bucketed by the next four
        // significant bits `j` (16 sub-buckets per power of two).
        let mut counts = vec![[0u64; 16]; 64];
        let counter_limit = rate * duration_s;

        // We repeatedly consult the elapsed time, and introduce any data now
        // considered available. At the same time, we observe the output and
        // record which inputs are considered retired.
        while retire_counter < counter_limit {
            // Open-loop latency-throughput test, parameterized by offered rate `ns_per_request`.
            let elapsed = timer.elapsed();
            let elapsed_ns = elapsed.as_secs() * 1_000_000_000 + (elapsed.subsec_nanos() as u64);

            // Determine completed ns: the probe's frontier holds the earliest
            // timestamp that may still produce output, so every record
            // requested strictly before it has been fully retired.
            let acknowledged_ns: u64 = probe.with_frontier(|frontier| frontier[0]);

            // Notice any newly-retired records and record their latencies.
            while ((retire_counter * ns_per_request) as u64) < acknowledged_ns && retire_counter < counter_limit {
                let requested_at = (retire_counter * ns_per_request) as u64;
                let latency_ns = elapsed_ns - requested_at;
                let count_index = latency_ns.next_power_of_two().trailing_zeros() as usize;
                // `saturating_sub` guards sub-32ns latencies (`count_index < 5`),
                // where the original `count_index - 5` would underflow and
                // panic (debug) or overflow the shift (release). Identical to
                // the original for all count_index >= 5.
                let low_bits = (latency_ns >> count_index.saturating_sub(5)) & 0xF;
                counts[count_index][low_bits as usize] += 1;
                retire_counter += peers;
            }

            // Now, should we introduce more records before stepping the worker?
            // Three choices here:
            //
            // 1. Wait until previous batch acknowledged.
            // 2. Tick at most once every millisecond-ish.
            // 3. Geometrically increase outstanding batches.

            // Technique 1:
            // let target_ns = if acknowledged_ns >= inserted_ns { elapsed_ns } else { inserted_ns };

            // Technique 2:
            // let target_ns = elapsed_ns & !((1 << 20) - 1);

            // Technique 3: round the target down to a granularity that grows
            // with the amount of currently-outstanding (unacknowledged) work,
            // so batches grow geometrically under backlog. `inserted_ns` never
            // lags the frontier, so `delta` cannot underflow.
            let target_ns = {
                let delta: u64 = inserted_ns - acknowledged_ns;
                let bits = ::std::mem::size_of::<u64>() * 8 - delta.leading_zeros() as usize;
                let scale = ::std::cmp::max((1 << bits) / 4, 1024);
                elapsed_ns & !(scale - 1)
            };

            // Common for each technique: introduce every record whose request
            // time falls before `target_ns`, then advance the input clock.
            if inserted_ns < target_ns {
                while ((insert_counter * ns_per_request) as u64) < target_ns {
                    input.send(insert_counter);
                    insert_counter += peers;
                }
                input.advance_to(target_ns);
                inserted_ns = target_ns;
            }

            worker.step();
        }

        // Report observed latency measurements (worker zero only): for each
        // histogram bucket above ~1us (index 10), the fraction of samples with
        // a strictly larger latency — a complementary CDF, printed ascending.
        if index == 0 {
            let mut results = Vec::new();
            let total = counts.iter().map(|x| x.iter().sum::<u64>()).sum();
            let mut sum = 0;
            for index in (10 .. counts.len()).rev() {
                for sub in (0 .. 16).rev() {
                    if sum > 0 && sum < total {
                        // Reconstruct the bucket's lower-bound latency from its
                        // power-of-two index and four-bit sub-bucket.
                        let latency = (1 << (index-1)) + (sub << (index-5));
                        let fraction = (sum as f64) / (total as f64);
                        results.push((latency, fraction));
                    }
                    sum += counts[index][sub];
                }
            }
            for (latency, fraction) in results.drain(..).rev() {
                println!("{}\t{}", latency, fraction);
            }
        }
    }).unwrap();
}