1use timely::dataflow::{InputHandle, ProbeHandle};
2use timely::dataflow::operators::{Input, Probe};
3use timely::dataflow::operators::vec::Filter;
4
5fn main() {
6
7 let mut args = std::env::args();
8 args.next();
9 let rate: usize = args.next().expect("Must specify rate").parse().expect("Rate must be an usize");
10 let duration_s: usize = args.next().expect("Must specify duration_s").parse().expect("duration_s must be an usize");
11
12 timely::execute_from_args(args, move |worker| {
13
14 let index = worker.index();
15 let peers = worker.peers();
16
17 timely::synchronization::Barrier::new(worker).wait();
19
20 let timer = std::time::Instant::now();
21
22 let mut input = InputHandle::new();
23 let probe = ProbeHandle::new();
24
25 worker.dataflow(|scope| {
27 scope
28 .input_from(&mut input) .filter(|_| false) .probe_with(&probe); });
32
33 let ns_per_request = 1_000_000_000 / rate;
34 let mut insert_counter = index; let mut retire_counter = index; let mut inserted_ns = 0;
38
39 let mut counts = vec![[0u64; 16]; 64];
43
44 let counter_limit = rate * duration_s;
45 while retire_counter < counter_limit {
46
47 let elapsed = timer.elapsed();
49 let elapsed_ns = elapsed.as_secs() * 1_000_000_000 + (elapsed.subsec_nanos() as u64);
50
51 let acknowledged_ns: u64 = probe.with_frontier(|frontier| frontier[0]);
53
54 while ((retire_counter * ns_per_request) as u64) < acknowledged_ns && retire_counter < counter_limit {
56 let requested_at = (retire_counter * ns_per_request) as u64;
57 let latency_ns = elapsed_ns - requested_at;
58
59 let count_index = latency_ns.next_power_of_two().trailing_zeros() as usize;
60 let low_bits = ((elapsed_ns - requested_at) >> (count_index - 5)) & 0xF;
61 counts[count_index][low_bits as usize] += 1;
62
63 retire_counter += peers;
64 }
65
66 let target_ns = {
81 let delta: u64 = inserted_ns - acknowledged_ns;
82 let bits = ::std::mem::size_of::<u64>() * 8 - delta.leading_zeros() as usize;
83 let scale = ::std::cmp::max((1 << bits) / 4, 1024);
84 elapsed_ns & !(scale - 1)
85 };
86
87 if inserted_ns < target_ns {
89
90 while ((insert_counter * ns_per_request) as u64) < target_ns {
91 input.send(insert_counter);
92 insert_counter += peers;
93 }
94 input.advance_to(target_ns);
95 inserted_ns = target_ns;
96 }
97
98 worker.step();
99 }
100
101 if index == 0 {
103
104 let mut results = Vec::new();
105 let total = counts.iter().map(|x| x.iter().sum::<u64>()).sum();
106 let mut sum = 0;
107 for index in (10 .. counts.len()).rev() {
108 for sub in (0 .. 16).rev() {
109 if sum > 0 && sum < total {
110 let latency = (1 << (index-1)) + (sub << (index-5));
111 let fraction = (sum as f64) / (total as f64);
112 results.push((latency, fraction));
113 }
114 sum += counts[index][sub];
115 }
116 }
117 for (latency, fraction) in results.drain(..).rev() {
118 println!("{}\t{}", latency, fraction);
119 }
120 }
121
122 }).unwrap();
123}