Skip to main content

loopdemo/
loopdemo.rs

1use timely::dataflow::{InputHandle, ProbeHandle};
2use timely::dataflow::operators::{Input, Feedback, Concat, ConnectLoop, Probe};
3use timely::dataflow::operators::vec::{Map, Filter};
4
5fn main() {
6
7    let mut args = std::env::args();
8    args.next();
9    let rate: usize = args.next().expect("Must specify rate").parse().expect("Rate must be an usize");
10    let duration_s: usize = args.next().expect("Must specify duration_s").parse().expect("duration_s must be an usize");
11
12    timely::execute_from_args(args, move |worker| {
13
14        let index = worker.index();
15        let peers = worker.peers();
16
17        let timer = std::time::Instant::now();
18
19        let mut input = InputHandle::new();
20        let probe = ProbeHandle::new();
21
22        // Create a dataflow that discards input data (just synchronizes).
23        worker.dataflow(|scope| {
24
25            let stream = scope.input_from(&mut input);
26
27            let (loop_handle, loop_stream) = scope.feedback(1);
28
29            let step =
30            stream
31                .concat(loop_stream)
32                .map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 })
33                .filter(|x| x > &1);
34
35            step.probe_with(&probe)
36                .connect_loop(loop_handle);
37        });
38
39        let ns_per_request = 1_000_000_000 / rate;
40        let mut insert_counter = index;           // counts up as we insert records.
41        let mut retire_counter = index;           // counts up as we retire records.
42
43        let mut inserted_ns = 0;
44
45        // We repeatedly consult the elapsed time, and introduce any data now considered available.
46        // At the same time, we observe the output and record which inputs are considered retired.
47
48        let mut counts = vec![[0u64; 16]; 64];
49
50        let counter_limit = rate * duration_s;
51        while retire_counter < counter_limit {
52
53            // Open-loop latency-throughput test, parameterized by offered rate `ns_per_request`.
54            let elapsed = timer.elapsed();
55            let elapsed_ns = elapsed.as_secs() * 1_000_000_000 + (elapsed.subsec_nanos() as u64);
56
57            // Determine completed ns.
58            let acknowledged_ns: u64 = probe.with_frontier(|frontier| frontier[0]);
59
60            // Notice any newly-retired records.
61            while ((retire_counter * ns_per_request) as u64) < acknowledged_ns && retire_counter < counter_limit {
62                let requested_at = (retire_counter * ns_per_request) as u64;
63                let latency_ns = elapsed_ns - requested_at;
64
65                let count_index = latency_ns.next_power_of_two().trailing_zeros() as usize;
66                let low_bits = ((elapsed_ns - requested_at) >> (count_index - 5)) & 0xF;
67                counts[count_index][low_bits as usize] += 1;
68
69                retire_counter += peers;
70            }
71
72            // Now, should we introduce more records before stepping the worker?
73            // Three choices here:
74            //
75            //   1. Wait until previous batch acknowledged.
76            //   2. Tick at most once every millisecond-ish.
77            //   3. Geometrically increasing outstanding batches.
78
79            // Technique 1:
80            // let target_ns = if acknowledged_ns >= inserted_ns { elapsed_ns } else { inserted_ns };
81
82            // Technique 2:
83            // let target_ns = elapsed_ns & !((1 << 20) - 1);
84
85            // Technique 3:
86            let scale = (inserted_ns - acknowledged_ns).next_power_of_two();
87            let target_ns = elapsed_ns & !(scale - 1);
88
89            if inserted_ns < target_ns {
90
91                while ((insert_counter * ns_per_request) as u64) < target_ns {
92                    input.send(insert_counter);
93                    insert_counter += peers;
94                }
95                input.advance_to(target_ns);
96                inserted_ns = target_ns;
97            }
98
99            worker.step();
100        }
101
102        // Report observed latency measurements.
103        if index == 0 {
104
105            let mut results = Vec::new();
106            let total = counts.iter().map(|x| x.iter().sum::<u64>()).sum();
107            let mut sum = 0;
108            for index in (10 .. counts.len()).rev() {
109                for sub in (0 .. 16).rev() {
110                    if sum > 0 && sum < total {
111                        let latency = (1 << (index-1)) + (sub << (index-5));
112                        let fraction = (sum as f64) / (total as f64);
113                        results.push((latency, fraction));
114                    }
115                    sum += counts[index][sub];
116                }
117            }
118            for (latency, fraction) in results.drain(..).rev() {
119                println!("{}\t{}", latency, fraction);
120            }
121        }
122
123    }).unwrap();
124}