// openloop.rs — timely dataflow open-loop latency/throughput benchmark.
// (Web-page navigation chrome from the scraped copy removed.)
1use timely::dataflow::{InputHandle, ProbeHandle};
2use timely::dataflow::operators::{Input, Probe};
3use timely::dataflow::operators::vec::Filter;
4
5fn main() {
6
7    let mut args = std::env::args();
8    args.next();
9    let rate: usize = args.next().expect("Must specify rate").parse().expect("Rate must be an usize");
10    let duration_s: usize = args.next().expect("Must specify duration_s").parse().expect("duration_s must be an usize");
11
12    timely::execute_from_args(args, move |worker| {
13
14        let index = worker.index();
15        let peers = worker.peers();
16
17        // re-synchronize all workers (account for start-up).
18        timely::synchronization::Barrier::new(worker).wait();
19
20        let timer = std::time::Instant::now();
21
22        let mut input = InputHandle::new();
23        let probe = ProbeHandle::new();
24
25        // Create a dataflow that discards input data (just synchronizes).
26        worker.dataflow(|scope| {
27            scope
28                .input_from(&mut input)     // read input.
29                .filter(|_| false)          // do nothing.
30                .probe_with(&probe);    // observe output.
31        });
32
33        let ns_per_request = 1_000_000_000 / rate;
34        let mut insert_counter = index;           // counts up as we insert records.
35        let mut retire_counter = index;           // counts up as we retire records.
36
37        let mut inserted_ns = 0;
38
39        // We repeatedly consult the elapsed time, and introduce any data now considered available.
40        // At the same time, we observe the output and record which inputs are considered retired.
41
42        let mut counts = vec![[0u64; 16]; 64];
43
44        let counter_limit = rate * duration_s;
45        while retire_counter < counter_limit {
46
47            // Open-loop latency-throughput test, parameterized by offered rate `ns_per_request`.
48            let elapsed = timer.elapsed();
49            let elapsed_ns = elapsed.as_secs() * 1_000_000_000 + (elapsed.subsec_nanos() as u64);
50
51            // Determine completed ns.
52            let acknowledged_ns: u64 = probe.with_frontier(|frontier| frontier[0]);
53
54            // Notice any newly-retired records.
55            while ((retire_counter * ns_per_request) as u64) < acknowledged_ns && retire_counter < counter_limit {
56                let requested_at = (retire_counter * ns_per_request) as u64;
57                let latency_ns = elapsed_ns - requested_at;
58
59                let count_index = latency_ns.next_power_of_two().trailing_zeros() as usize;
60                let low_bits = ((elapsed_ns - requested_at) >> (count_index - 5)) & 0xF;
61                counts[count_index][low_bits as usize] += 1;
62
63                retire_counter += peers;
64            }
65
66            // Now, should we introduce more records before stepping the worker?
67            // Three choices here:
68            //
69            //   1. Wait until previous batch acknowledged.
70            //   2. Tick at most once every millisecond-ish.
71            //   3. Geometrically increase outstanding batches.
72
73            // Technique 1:
74            // let target_ns = if acknowledged_ns >= inserted_ns { elapsed_ns } else { inserted_ns };
75
76            // Technique 2:
77            // let target_ns = elapsed_ns & !((1 << 20) - 1);
78
79            // Technique 3:
80            let target_ns = {
81                let delta: u64 = inserted_ns - acknowledged_ns;
82                let bits = ::std::mem::size_of::<u64>() * 8 - delta.leading_zeros() as usize;
83                let scale = ::std::cmp::max((1 << bits) / 4, 1024);
84                elapsed_ns & !(scale - 1)
85            };
86
87            // Common for each technique.
88            if inserted_ns < target_ns {
89
90                while ((insert_counter * ns_per_request) as u64) < target_ns {
91                    input.send(insert_counter);
92                    insert_counter += peers;
93                }
94                input.advance_to(target_ns);
95                inserted_ns = target_ns;
96            }
97
98            worker.step();
99        }
100
101        // Report observed latency measurements.
102        if index == 0 {
103
104            let mut results = Vec::new();
105            let total = counts.iter().map(|x| x.iter().sum::<u64>()).sum();
106            let mut sum = 0;
107            for index in (10 .. counts.len()).rev() {
108                for sub in (0 .. 16).rev() {
109                    if sum > 0 && sum < total {
110                        let latency = (1 << (index-1)) + (sub << (index-5));
111                        let fraction = (sum as f64) / (total as f64);
112                        results.push((latency, fraction));
113                    }
114                    sum += counts[index][sub];
115                }
116            }
117            for (latency, fraction) in results.drain(..).rev() {
118                println!("{}\t{}", latency, fraction);
119            }
120        }
121
122    }).unwrap();
123}