1use std::collections::HashMap;
2
3use timely::dataflow::*;
4use timely::dataflow::operators::{Input, Probe};
5use timely::dataflow::operators::generic::Operator;
6use timely::dataflow::channels::pact::Exchange;
7
8fn main() {
9
10 let keys: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
12 let vals: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15 timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17 let index = worker.index();
18 let peers = worker.peers();
19
20 let mut input1 = InputHandle::new();
21 let mut input2 = InputHandle::new();
22 let probe = ProbeHandle::new();
23
24 worker.dataflow(|scope| {
25
26 let stream1 = scope.input_from(&mut input1);
27 let stream2 = scope.input_from(&mut input2);
28
29 let exchange1 = Exchange::new(|x: &(u64, u64)| x.0);
30 let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);
31
32 stream1
33 .binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
34
35 let mut map1 = HashMap::<u64, Vec<u64>>::new();
36 let mut map2 = HashMap::<u64, Vec<u64>>::new();
37
38 move |input1, input2, output| {
39
40 input1.for_each_time(|time, data| {
42 let mut session = output.session(&time);
43 for (key, val1) in data.flat_map(|d| d.drain(..)) {
44 if let Some(values) = map2.get(&key) {
45 for val2 in values.iter() {
46 session.give((val1, *val2));
47 }
48 }
49
50 map1.entry(key).or_default().push(val1);
51 }
52 });
53
54 input2.for_each_time(|time, data| {
56 let mut session = output.session(&time);
57 for (key, val2) in data.flat_map(|d| d.drain(..)) {
58 if let Some(values) = map1.get(&key) {
59 for val1 in values.iter() {
60 session.give((*val1, val2));
61 }
62 }
63
64 map2.entry(key).or_default().push(val2);
65 }
66 });
67 }
68 })
69 .container::<Vec<_>>()
70 .probe_with(&probe);
71 });
72
73 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
75 let hasher = BuildHasherDefault::<DefaultHasher>::new();
76 let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) % keys,
77 hasher.hash_one(&(i,index,1)) % keys,
78 hasher.hash_one(&(i,index,2)) % keys,
79 hasher.hash_one(&(i,index,3)) % keys));
80
81 let timer = std::time::Instant::now();
82
83 let mut sent = 0;
84 while sent < (vals / peers) {
85
86 let to_send = std::cmp::min(batch, vals/peers - sent);
88 for (src0, dst0, src1, dst1) in (&mut insert).take(to_send) {
89 input1.send((src0, dst0));
90 input2.send((src1, dst1));
91 }
92 sent += to_send;
93
94 let next = input1.epoch() + 1;
96 input1.advance_to(next);
97 input2.advance_to(next);
98 while probe.less_than(input1.time()) {
99 worker.step();
100 }
101
102 println!("{:?}\tworker {} batch complete", timer.elapsed(), index)
103 }
104
105 }).unwrap(); }