1use std::cmp::Ordering;
2
3use timely::dataflow::*;
4use timely::progress::Timestamp;
5use timely::dataflow::operators::{Input, Exchange, Probe};
6use timely::dataflow::operators::generic::operator::Operator;
7use timely::dataflow::channels::pact::Pipeline;
8
9fn main() {
10
11 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
13 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
14 let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
15
16 timely::execute_from_args(std::env::args().skip(4), move |worker| {
17
18 let index = worker.index();
19 let peers = worker.peers();
20
21 let mut input = InputHandle::new();
22 let probe = ProbeHandle::new();
23
24 worker.dataflow(|scope| {
25 scope.input_from(&mut input)
26 .union_find()
28 .exchange(|_| 0)
29 .union_find()
30 .probe_with(&probe);
31 });
32
33 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
35 let hasher = BuildHasherDefault::<DefaultHasher>::new();
36 let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
37 hasher.hash_one(&(i,index,1)) as usize % nodes));
38
39 for (edge, arc) in insert.take(edges / peers).enumerate() {
40 input.send(arc);
41 if edge % batch == (batch - 1) {
42 let next = input.time() + 1;
43 input.advance_to(next);
44 while probe.less_than(input.time()) {
45 worker.step();
46 }
47 }
48 }
49
50 }).unwrap(); }
52
/// Extension trait that adds a union-find filtering step to a value.
///
/// The single method consumes the receiver and hands back a value of the
/// same type, which lets several applications be chained one after another.
trait UnionFind {
    /// Consumes `self` and returns the union-find-processed result.
    fn union_find(self) -> Self;
}
56
57impl<'scope, T: Timestamp> UnionFind for StreamVec<'scope, T, (usize, usize)> {
58 fn union_find(self) -> Self {
59
60 self.unary(Pipeline, "UnionFind", |_,_| {
61
62 let mut roots = vec![]; let mut ranks = vec![]; move |input, output| {
66
67 input.for_each_time(|time, data| {
68 let mut session = output.session(&time);
69 for &mut (mut x, mut y) in data.flatten() {
70
71 let m = ::std::cmp::max(x, y);
73 for i in roots.len() .. (m + 1) {
74 roots.push(i);
75 ranks.push(0);
76 }
77
78 while x != roots[x] { x = roots[x]; }
80 while y != roots[y] { y = roots[y]; }
81
82 if x != y {
83 session.give((x, y));
84 match ranks[x].cmp(&ranks[y]) {
85 Ordering::Less => { roots[x] = y },
86 Ordering::Greater => { roots[y] = x },
87 Ordering::Equal => { roots[y] = x; ranks[x] += 1 },
88 }
89 }
90 }
91 });
92 }
93 })
94 }
95}