1use std::cmp::Ordering;
2
3use timely::dataflow::*;
4use timely::dataflow::operators::{Input, Exchange, Probe};
5use timely::dataflow::operators::generic::operator::Operator;
6use timely::dataflow::channels::pact::Pipeline;
7
8fn main() {
9
10 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15 timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17 let index = worker.index();
18 let peers = worker.peers();
19
20 let mut input = InputHandle::new();
21 let probe = ProbeHandle::new();
22
23 worker.dataflow(|scope| {
24 scope.input_from(&mut input)
25 .union_find()
27 .exchange(|_| 0)
28 .union_find()
29 .probe_with(&probe);
30 });
31
32 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34 let hasher = BuildHasherDefault::<DefaultHasher>::new();
35 let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36 hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38 for (edge, arc) in insert.take(edges / peers).enumerate() {
39 input.send(arc);
40 if edge % batch == (batch - 1) {
41 let next = input.epoch() + 1;
42 input.advance_to(next);
43 while probe.less_than(input.time()) {
44 worker.step();
45 }
46 }
47 }
48
49 }).unwrap(); }
51
/// Extension trait that adds a chainable `union_find` step to a value.
///
/// The method consumes `self` and hands back a value of the same type, which is what
/// allows it to be applied more than once in a single method chain.
trait UnionFind {
    /// Consumes `self` and returns the result of applying the union-find step to it.
    fn union_find(self) -> Self;
}
55
56impl<G: Scope> UnionFind for StreamVec<G, (usize, usize)> {
57 fn union_find(self) -> StreamVec<G, (usize, usize)> {
58
59 self.unary(Pipeline, "UnionFind", |_,_| {
60
61 let mut roots = vec![]; let mut ranks = vec![]; move |input, output| {
65
66 input.for_each_time(|time, data| {
67 let mut session = output.session(&time);
68 for &mut (mut x, mut y) in data.flatten() {
69
70 let m = ::std::cmp::max(x, y);
72 for i in roots.len() .. (m + 1) {
73 roots.push(i);
74 ranks.push(0);
75 }
76
77 while x != roots[x] { x = roots[x]; }
79 while y != roots[y] { y = roots[y]; }
80
81 if x != y {
82 session.give((x, y));
83 match ranks[x].cmp(&ranks[y]) {
84 Ordering::Less => { roots[x] = y },
85 Ordering::Greater => { roots[y] = x },
86 Ordering::Equal => { roots[y] = x; ranks[x] += 1 },
87 }
88 }
89 }
90 });
91 }
92 })
93 }
94}