Skip to main content

hashjoin/
hashjoin.rs

1use std::collections::HashMap;
2
3use timely::dataflow::*;
4use timely::dataflow::operators::{Input, Probe};
5use timely::dataflow::operators::generic::Operator;
6use timely::dataflow::channels::pact::Exchange;
7
8fn main() {
9
10    // command-line args: numbers of nodes and edges in the random graph.
11    let keys: u64 = std::env::args().nth(1).unwrap().parse().unwrap();
12    let vals: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13    let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15    timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17        let index = worker.index();
18        let peers = worker.peers();
19
20        let mut input1 = InputHandle::new();
21        let mut input2 = InputHandle::new();
22        let probe = ProbeHandle::new();
23
24        worker.dataflow(|scope| {
25
26            let stream1 = scope.input_from(&mut input1);
27            let stream2 = scope.input_from(&mut input2);
28
29            let exchange1 = Exchange::new(|x: &(u64, u64)| x.0);
30            let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);
31
32            stream1
33                .binary(stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
34
35                    let mut map1 = HashMap::<u64, Vec<u64>>::new();
36                    let mut map2 = HashMap::<u64, Vec<u64>>::new();
37
38                    move |input1, input2, output| {
39
40                        // Drain first input, check second map, update first map.
41                        input1.for_each_time(|time, data| {
42                            let mut session = output.session(&time);
43                            for (key, val1) in data.flat_map(|d| d.drain(..)) {
44                                if let Some(values) = map2.get(&key) {
45                                    for val2 in values.iter() {
46                                        session.give((val1, *val2));
47                                    }
48                                }
49
50                                map1.entry(key).or_default().push(val1);
51                            }
52                        });
53
54                        // Drain second input, check first map, update second map.
55                        input2.for_each_time(|time, data| {
56                            let mut session = output.session(&time);
57                            for (key, val2) in data.flat_map(|d| d.drain(..)) {
58                                if let Some(values) = map1.get(&key) {
59                                    for val1 in values.iter() {
60                                        session.give((*val1, val2));
61                                    }
62                                }
63
64                                map2.entry(key).or_default().push(val2);
65                            }
66                        });
67                    }
68                })
69                .container::<Vec<_>>()
70                .probe_with(&probe);
71        });
72
73        // Generate roughly random data.
74        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
75        let hasher = BuildHasherDefault::<DefaultHasher>::new();
76        let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) % keys,
77                                             hasher.hash_one(&(i,index,1)) % keys,
78                                             hasher.hash_one(&(i,index,2)) % keys,
79                                             hasher.hash_one(&(i,index,3)) % keys));
80
81        let timer = std::time::Instant::now();
82
83        let mut sent = 0;
84        while sent < (vals / peers) {
85
86            // Send some amount of data, no more than `batch`.
87            let to_send = std::cmp::min(batch, vals/peers - sent);
88            for (src0, dst0, src1, dst1) in (&mut insert).take(to_send) {
89                input1.send((src0, dst0));
90                input2.send((src1, dst1));
91            }
92            sent += to_send;
93
94            // Advance input, iterate until data cleared.
95            let next = input1.epoch() + 1;
96            input1.advance_to(next);
97            input2.advance_to(next);
98            while probe.less_than(input1.time()) {
99                worker.step();
100            }
101
102            println!("{:?}\tworker {} batch complete", timer.elapsed(), index)
103        }
104
105    }).unwrap(); // asserts error-free execution;
106}