use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Feedback, Concat, ConnectLoop, Probe};
use timely::dataflow::operators::vec::{Map, Filter};
/// Maps a latency in nanoseconds to its histogram cell `(exponent, mantissa)`.
///
/// `exponent` is the bit length of `latency_ns`, so the cell covers
/// `[2^(exponent-1), 2^exponent)`; it is clamped to 63 to stay inside the
/// 64-row table. `mantissa` is the four bits just below the leading bit.
/// This agrees with the `2^(exponent-1) + mantissa * 2^(exponent-5)` decoding
/// that `main` uses when it prints the results.
fn latency_bucket(latency_ns: u64) -> (usize, usize) {
    // Bit length rather than `next_power_of_two().trailing_zeros()`: the latter puts an
    // exact power of two one row too low (e.g. 1024ns would be reported as 512ns).
    let bit_length = (u64::BITS - latency_ns.leading_zeros()) as usize;
    let exponent = bit_length.min(63);
    // Latencies under 32ns have fewer than five significant bits; saturate instead of
    // underflowing the shift amount.
    let shift = exponent.saturating_sub(5);
    let mantissa = ((latency_ns >> shift) & 0xF) as usize;
    (exponent, mantissa)
}

/// Open-loop latency benchmark over a looping Collatz dataflow.
///
/// Command line: `<rate> <duration_s> [timely arguments...]`. `rate` is the number of
/// requests per second offered across all workers and `duration_s` is how many seconds'
/// worth of requests each worker waits to see retired. The remaining arguments are
/// handed to `timely::execute_from_args`.
///
/// Worker 0 prints one `latency<TAB>fraction` line per populated histogram cell, where
/// `fraction` is the share of all recorded requests that fell into higher-latency cells.
///
/// # Panics
///
/// Panics if either argument is missing or not a `usize`, or if `rate` is outside
/// `1..=1_000_000_000`.
fn main() {
    let mut args = std::env::args();
    // Skip the program name.
    args.next();
    let rate: usize = args.next().expect("Must specify rate").parse().expect("Rate must be an usize");
    // A rate of 0 would divide by zero below; a rate above one request per nanosecond would
    // truncate `ns_per_request` to 0 and make the insertion loop spin forever.
    assert!(
        (1..=1_000_000_000).contains(&rate),
        "Rate must be between 1 and 1000000000 requests per second"
    );
    let duration_s: usize = args.next().expect("Must specify duration_s").parse().expect("duration_s must be an usize");

    timely::execute_from_args(args, move |worker| {
        let index = worker.index();
        let peers = worker.peers();
        let timer = std::time::Instant::now();

        let mut input = InputHandle::new();
        let probe = ProbeHandle::new();

        // Each record takes one Collatz step per trip around the loop and is dropped once
        // it reaches 1 (or below); the probe observes the stream that is fed back.
        worker.dataflow(|scope| {
            let stream = scope.input_from(&mut input);
            let (loop_handle, loop_stream) = scope.feedback(1);
            let step = stream
                .concat(loop_stream)
                .map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 })
                .filter(|x| x > &1);
            step.probe_with(&probe).connect_loop(loop_handle);
        });

        let ns_per_request = 1_000_000_000 / rate;
        // Request `k` is due at `k * ns_per_request`; this worker owns every `peers`-th
        // request starting from its own index.
        let mut insert_counter = index;
        let mut retire_counter = index;
        let mut inserted_ns = 0;

        // counts[exponent][mantissa]: latency histogram, see `latency_bucket`.
        let mut counts = vec![[0u64; 16]; 64];

        let counter_limit = rate * duration_s;
        while retire_counter < counter_limit {
            // Nanoseconds since this worker started; the cast cannot truncate for any
            // realistic run length.
            let elapsed_ns = timer.elapsed().as_nanos() as u64;

            // Smallest timestamp still outstanding at the probe.
            let acknowledged_ns: u64 = probe.with_frontier(|frontier| frontier[0]);

            // Retire every owned request whose due time is now behind the frontier.
            while retire_counter < counter_limit
                && ((retire_counter * ns_per_request) as u64) < acknowledged_ns
            {
                let requested_at = (retire_counter * ns_per_request) as u64;
                let latency_ns = elapsed_ns - requested_at;
                let (exponent, mantissa) = latency_bucket(latency_ns);
                counts[exponent][mantissa] += 1;
                retire_counter += peers;
            }

            // Round the current time down to a granularity that grows with the amount of
            // inserted-but-unacknowledged time, so input batches get coarser under backlog.
            let scale = (inserted_ns - acknowledged_ns).next_power_of_two();
            let target_ns = elapsed_ns & !(scale - 1);

            if inserted_ns < target_ns {
                // Introduce every owned request that has come due before `target_ns`.
                while ((insert_counter * ns_per_request) as u64) < target_ns {
                    input.send(insert_counter);
                    insert_counter += peers;
                }
                input.advance_to(target_ns);
                inserted_ns = target_ns;
            }

            worker.step();
        }

        // Only worker 0 reports, and only its own measurements.
        if index == 0 {
            let mut results = Vec::new();
            let total: u64 = counts.iter().map(|row| row.iter().sum::<u64>()).sum();
            let mut sum = 0u64;
            // Walk cells from highest latency downwards; rows below 10 (under 512ns) are
            // counted in `total` but never printed.
            for exponent in (10..counts.len()).rev() {
                for sub in (0..16usize).rev() {
                    if sum > 0 && sum < total {
                        let latency = (1u64 << (exponent - 1)) + ((sub as u64) << (exponent - 5));
                        let fraction = (sum as f64) / (total as f64);
                        results.push((latency, fraction));
                    }
                    sum += counts[exponent][sub];
                }
            }
            // Print in increasing latency order.
            for (latency, fraction) in results.into_iter().rev() {
                println!("{}\t{}", latency, fraction);
            }
        }
    }).unwrap();
}