Skip to main content

unionfind/
unionfind.rs

1use std::cmp::Ordering;
2
3use timely::dataflow::*;
4use timely::dataflow::operators::{Input, Exchange, Probe};
5use timely::dataflow::operators::generic::operator::Operator;
6use timely::dataflow::channels::pact::Pipeline;
7
8fn main() {
9
10    // command-line args: numbers of nodes and edges in the random graph.
11    let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
12    let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
13    let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();
14
15    timely::execute_from_args(std::env::args().skip(4), move |worker| {
16
17        let index = worker.index();
18        let peers = worker.peers();
19
20        let mut input = InputHandle::new();
21        let probe = ProbeHandle::new();
22
23        worker.dataflow(|scope| {
24            scope.input_from(&mut input)
25                //  .exchange(move |x: &(usize, usize)| (x.0 % (peers - 1)) as u64 + 1)
26                 .union_find()
27                 .exchange(|_| 0)
28                 .union_find()
29                 .probe_with(&probe);
30        });
31
32        // Generate roughly random data.
33        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
34        let hasher = BuildHasherDefault::<DefaultHasher>::new();
35        let insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
36                                         hasher.hash_one(&(i,index,1)) as usize % nodes));
37
38        for (edge, arc) in insert.take(edges / peers).enumerate() {
39            input.send(arc);
40            if edge % batch == (batch - 1) {
41                let next = input.epoch() + 1;
42                input.advance_to(next);
43                while probe.less_than(input.time()) {
44                    worker.step();
45                }
46            }
47        }
48
49    }).unwrap(); // asserts error-free execution;
50}
51
/// Extension trait that adds a `union_find` stage to a dataflow stream.
///
/// The method consumes the receiver and returns a value of the same type, so
/// calls can be chained (`stream.union_find().exchange(..).union_find()`).
trait UnionFind {
    /// Applies a union-find pass to `self` and returns the resulting stream.
    fn union_find(self) -> Self;
}
55
56impl<G: Scope> UnionFind for StreamVec<G, (usize, usize)> {
57    fn union_find(self) -> StreamVec<G, (usize, usize)> {
58
59        self.unary(Pipeline, "UnionFind", |_,_| {
60
61            let mut roots = vec![];  // u32 works, and is smaller than uint/u64
62            let mut ranks = vec![];  // u8 should be large enough (n < 2^256)
63
64            move |input, output| {
65
66                input.for_each_time(|time, data| {
67                    let mut session = output.session(&time);
68                    for &mut (mut x, mut y) in data.flatten() {
69
70                        // grow arrays if required.
71                        let m = ::std::cmp::max(x, y);
72                        for i in roots.len() .. (m + 1) {
73                            roots.push(i);
74                            ranks.push(0);
75                        }
76
77                        // look up roots for `x` and `y`.
78                        while x != roots[x] { x = roots[x]; }
79                        while y != roots[y] { y = roots[y]; }
80
81                        if x != y {
82                            session.give((x, y));
83                            match ranks[x].cmp(&ranks[y]) {
84                                Ordering::Less    => { roots[x] = y },
85                                Ordering::Greater => { roots[y] = x },
86                                Ordering::Equal   => { roots[y] = x; ranks[x] += 1 },
87                            }
88                        }
89                    }
90                });
91            }
92        })
93    }
94}