use std::collections::HashMap;
use timely::dataflow::*;
use timely::dataflow::operators::{Input, Probe};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Exchange;
/// Benchmarks a symmetric streaming hash join in timely dataflow.
///
/// Usage: `<keys> <vals> <batch> [timely args...]`
///
/// * `keys`  - size of the key space; every generated field is reduced modulo `keys`.
/// * `vals`  - total number of records fed into *each* of the two inputs, split across all
///             workers.
/// * `batch` - maximum number of records a worker sends per epoch before stepping the dataflow
///             until that epoch has been processed.
///
/// All remaining arguments are forwarded to `timely::execute_from_args` (e.g. worker count).
///
/// # Panics
///
/// Panics if any positional argument is missing or not a valid number, if `keys` is zero
/// (fields are reduced modulo `keys`), or if `batch` is zero (the send loop could never make
/// progress).
fn main() {
    let keys: u64 = parse_arg(1, "keys");
    let vals: usize = parse_arg(2, "vals");
    let batch: usize = parse_arg(3, "batch");
    // Validate on the main thread before spawning workers: `% keys` would otherwise panic in
    // every worker, and `batch == 0` would make `to_send` always zero so the loop never ends.
    assert!(keys > 0, "`keys` must be positive");
    assert!(batch > 0, "`batch` must be positive");

    timely::execute_from_args(std::env::args().skip(4), move |worker| {
        let index = worker.index();
        let peers = worker.peers();

        let mut input1 = InputHandle::new();
        let mut input2 = InputHandle::new();
        let probe = ProbeHandle::new();

        worker.dataflow(|scope| {
            let stream1 = scope.input_from(&mut input1);
            let stream2 = scope.input_from(&mut input2);

            // Route both inputs by the first tuple field (the join key) so that records with
            // equal keys are handled by the same worker.
            let exchange1 = Exchange::new(|x: &(u64, u64)| x.0);
            let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);

            stream1
                .binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
                    // Every value received so far on each side, grouped by key. Entries are
                    // never removed, so state grows with the number of records received.
                    let mut map1 = HashMap::<u64, Vec<u64>>::new();
                    let mut map2 = HashMap::<u64, Vec<u64>>::new();

                    move |input1, input2, output| {
                        // For each new record: emit a pair for every stored value on the other
                        // side with the same key, then store the record so later arrivals on
                        // the other input can match it. Pairs are always ordered
                        // (value from input1, value from input2); the key is not emitted.
                        input1.for_each_time(|time, data| {
                            let mut session = output.session(&time);
                            for (key, val1) in data.flat_map(|d| d.drain(..)) {
                                if let Some(values) = map2.get(&key) {
                                    for val2 in values.iter() {
                                        session.give((val1, *val2));
                                    }
                                }
                                map1.entry(key).or_default().push(val1);
                            }
                        });
                        input2.for_each_time(|time, data| {
                            let mut session = output.session(&time);
                            for (key, val2) in data.flat_map(|d| d.drain(..)) {
                                if let Some(values) = map1.get(&key) {
                                    for val1 in values.iter() {
                                        session.give((*val1, val2));
                                    }
                                }
                                map2.entry(key).or_default().push(val2);
                            }
                        });
                    }
                })
                .container::<Vec<_>>()
                .probe_with(&probe);
        });

        // Deterministic pseudo-random data: hash (sequence number, worker index, field tag)
        // with the std hasher, so each worker produces its own stream without an RNG crate.
        // The sequence number is an explicit `u64`; left unannotated it would default to `i32`
        // and overflow (panic in debug, wrap and repeat in release) after 2^31 records.
        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
        let hasher = BuildHasherDefault::<DefaultHasher>::new();
        let mut insert = (0u64..).map(move |i| {
            (
                hasher.hash_one(&(i, index, 0)) % keys,
                hasher.hash_one(&(i, index, 1)) % keys,
                hasher.hash_one(&(i, index, 2)) % keys,
                hasher.hash_one(&(i, index, 3)) % keys,
            )
        });

        // This worker's portion of `vals`. The remainder is spread over the lowest-indexed
        // workers, so exactly `vals` records per input are sent in total.
        let share = worker_share(vals, index, peers);

        let timer = std::time::Instant::now();
        let mut sent = 0;
        while sent < share {
            // Send at most `batch` records into each input for the current epoch.
            let to_send = std::cmp::min(batch, share - sent);
            for (src0, dst0, src1, dst1) in (&mut insert).take(to_send) {
                input1.send((src0, dst0));
                input2.send((src1, dst1));
            }
            sent += to_send;

            // Close the epoch on both inputs, then step the worker until the probe's frontier
            // is no longer behind the new epoch (i.e. earlier epochs are fully processed).
            let next = input1.time() + 1;
            input1.advance_to(next);
            input2.advance_to(next);
            while probe.less_than(input1.time()) {
                worker.step();
            }

            println!("{:?}\tworker {} batch complete", timer.elapsed(), index);
        }
    })
    .unwrap();
}

/// Parses the positional command-line argument at `position` (1-based; 0 is the program name).
///
/// # Panics
///
/// Panics with a message naming `name` if the argument is missing or fails to parse as `T`.
fn parse_arg<T>(position: usize, name: &str) -> T
where
    T: std::str::FromStr,
    T::Err: std::fmt::Display,
{
    let raw = std::env::args().nth(position).unwrap_or_else(|| {
        panic!("missing argument #{position} `{name}`; usage: <keys> <vals> <batch> [timely args]")
    });
    raw.parse()
        .unwrap_or_else(|err| panic!("argument `{name}` ({raw:?}) is not valid: {err}"))
}

/// Number of records the worker at `index` (out of `peers` workers) should send so that the
/// shares of all workers sum to exactly `total`; the first `total % peers` workers send one extra.
fn worker_share(total: usize, index: usize, peers: usize) -> usize {
    total / peers + usize::from(index < total % peers)
}