extern crate rand;
extern crate timely;
use std::cmp::Ordering;
use rand::{Rng, SeedableRng, StdRng};
use timely::dataflow::*;
use timely::dataflow::operators::{Input, Exchange, Probe};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
/// Runs the streaming union-find example as a timely dataflow computation.
///
/// Positional command-line arguments:
///
/// 1. `nodes` - upper bound handed to `gen_range(0, nodes)` for both endpoints
///    of every generated edge,
/// 2. `edges` - total number of edges; each worker generates `edges / peers`,
/// 3. `batch` - number of edges a worker sends before advancing its input epoch.
///
/// All remaining arguments are forwarded to `timely::execute_from_args`.
///
/// # Panics
///
/// Panics with a message naming the offending argument when one of the three
/// leading arguments is missing or does not parse as `usize`, when `batch` is
/// zero, and when `timely::execute_from_args` returns an error.
fn main() {
    // Reads the positional argument at `position` as a `usize`. Failures name
    // the argument instead of surfacing as an anonymous `unwrap` panic.
    let parse_arg = |position: usize, name: &str| -> usize {
        let raw = std::env::args().nth(position).unwrap_or_else(|| {
            panic!(
                "missing <{}>; expected arguments: <nodes> <edges> <batch> [timely options]",
                name
            )
        });
        raw.parse::<usize>().unwrap_or_else(|err| {
            panic!("<{}> must be a non-negative integer, got {:?}: {}", name, raw, err)
        })
    };

    let nodes = parse_arg(1, "nodes");
    let edges = parse_arg(2, "edges");
    let batch = parse_arg(3, "batch");

    // The driver loop evaluates `edge % batch` and `batch - 1`; a zero batch
    // size would divide by zero, so reject it up front with a clear message.
    assert!(batch > 0, "<batch> must be at least 1");

    timely::execute_from_args(std::env::args().skip(4), move |worker| {
        let index = worker.index();
        let peers = worker.peers();

        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // Two union-find stages: the first runs on whatever worker produced
        // the edge, the second runs after an exchange whose routing key is the
        // constant 0 for every record.
        worker.dataflow(|scope| {
            scope.input_from(&mut input)
                .union_find()
                .exchange(|_| 0)
                .union_find()
                .probe_with(&mut probe);
        });

        // The worker index is part of the seed, so each worker draws its own
        // sequence of edges.
        let seed: &[_] = &[1, 2, 3, index];
        let mut rng: StdRng = SeedableRng::from_seed(seed);

        for edge in 0..(edges / peers) {
            input.send((rng.gen_range(0, nodes), rng.gen_range(0, nodes)));

            // After every `batch` edges, move to the next epoch and keep
            // stepping the worker while the probe still reports outstanding
            // times earlier than the input's current time.
            if edge % batch == (batch - 1) {
                let next = input.epoch() + 1;
                input.advance_to(next);
                while probe.less_than(input.time()) {
                    worker.step();
                }
            }
        }
    }).unwrap();
}
/// Extension trait that adds a `union_find` method to the types implementing it.
trait UnionFind {
/// Produces a new value of the implementing type from `self`.
///
/// The implementation in this file, for streams of `(usize, usize)` edges,
/// emits a pair of component roots for each input edge that joins two
/// previously separate components.
fn union_find(&self) -> Self;
}
impl<G: Scope> UnionFind for Stream<G, (usize, usize)> {
    /// Attaches a "UnionFind" unary operator to this stream of edges.
    ///
    /// The operator keeps a parent table and a rank table in its closure state.
    /// For every incoming `(a, b)` pair it looks up the current root of each
    /// endpoint; when the roots differ it emits the pair of roots and links one
    /// root under the other using union by rank. Pairs whose endpoints already
    /// share a root produce no output.
    fn union_find(&self) -> Stream<G, (usize, usize)> {
        /// Follows parent links from `node` until reaching an element that is
        /// its own parent, and returns that element.
        fn find_root(parents: &[usize], mut node: usize) -> usize {
            while parents[node] != node {
                node = parents[node];
            }
            node
        }

        self.unary(Pipeline, "UnionFind", |_, _| {
            // `parents[i]` is the parent of element `i` (roots point at
            // themselves); `ranks[i]` is the rank used to pick the new root.
            let mut parents = vec![];
            let mut ranks = vec![];

            move |input, output| {
                while let Some((time, data)) = input.next() {
                    let mut session = output.session(&time);

                    for &(src, dst) in data.iter() {
                        // Grow both tables so that every index up to the larger
                        // endpoint exists, each new element starting as its own
                        // root with rank zero.
                        let largest = ::std::cmp::max(src, dst);
                        while parents.len() <= largest {
                            let fresh = parents.len();
                            parents.push(fresh);
                            ranks.push(0);
                        }

                        let src_root = find_root(&parents, src);
                        let dst_root = find_root(&parents, dst);
                        if src_root == dst_root {
                            // Already in the same component: nothing to report.
                            continue;
                        }

                        // Report the roots being merged, then hang the
                        // lower-ranked root beneath the higher-ranked one.
                        session.give((src_root, dst_root));
                        match ranks[src_root].cmp(&ranks[dst_root]) {
                            Ordering::Less => parents[src_root] = dst_root,
                            Ordering::Greater => parents[dst_root] = src_root,
                            Ordering::Equal => {
                                // Tie: `src_root` becomes the root and its rank grows.
                                parents[dst_root] = src_root;
                                ranks[src_root] += 1;
                            }
                        }
                    }
                }
            }
        })
    }
}