use std::cmp::Ordering;
use timely::dataflow::*;
use timely::progress::Timestamp;
use timely::dataflow::operators::{Input, Exchange, Probe};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;
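
// The first three arguments are the node count, edge count, and batch size; any
// remaining arguments are handed to timely (e.g. `-w 2` for two worker threads).
// If compiled as a cargo example named `unionfind`, an invocation might look like:
//
//   cargo run --example unionfind -- 1000000 2000000 10000 -w 2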
fn main() {
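
    // command-line parameters: node count, edge count, and batch size.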
    let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
    let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
    let batch: usize = std::env::args().nth(3).unwrap().parse().unwrap();

    timely::execute_from_args(std::env::args().skip(4), move |worker| {

        let index = worker.index();
        let peers = worker.peers();

        let mut input = InputHandle::new();
        let probe = ProbeHandle::new();
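
        // build the dataflow: each worker runs union-find on its local edges, then
        // the surviving edges are exchanged to worker zero for a final union-find.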
        worker.dataflow(|scope| {
            scope.input_from(&mut input)
                 .union_find()
                 .exchange(|_| 0)
                 .union_find()
                 .probe_with(&probe);
        });
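
        // generate a deterministic, worker-specific stream of pseudo-random edges
        // by hashing (edge index, worker index, endpoint) triples.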
        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
        let hasher = BuildHasherDefault::<DefaultHasher>::new();
        let insert = (0..).map(move |i| (hasher.hash_one(&(i, index, 0)) as usize % nodes,
                                         hasher.hash_one(&(i, index, 1)) as usize % nodes));
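
        // introduce edges in batches; after each batch, advance the input and step
        // the worker until the probe indicates the batch has been fully processed.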
        for (edge, arc) in insert.take(edges / peers).enumerate() {
            input.send(arc);
            if edge % batch == (batch - 1) {
                let next = input.time() + 1;
                input.advance_to(next);
                while probe.less_than(input.time()) {
                    worker.step();
                }
            }
        }

    }).unwrap();
}

trait UnionFind {
    fn union_find(self) -> Self;
}
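
// A unary operator that maintains union-find state across batches and forwards
// only those edges whose endpoints were not already connected.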
impl<'scope, T: Timestamp> UnionFind for StreamVec<'scope, T, (usize, usize)> {
    fn union_find(self) -> Self {
        self.unary(Pipeline, "UnionFind", |_, _| {

            let mut roots = vec![];  // each node's parent; a root points at itself.
            let mut ranks = vec![];  // upper bound on the depth of each tree.

            move |input, output| {
                input.for_each_time(|time, data| {
                    let mut session = output.session(&time);
                    for &mut (mut x, mut y) in data.flatten() {

                        // grow `roots` and `ranks` to cover the larger endpoint.
                        let m = ::std::cmp::max(x, y);
                        for i in roots.len() .. (m + 1) {
                            roots.push(i);
                            ranks.push(0);
                        }

                        // walk `x` and `y` up to their respective roots.
                        while x != roots[x] { x = roots[x]; }
                        while y != roots[y] { y = roots[y]; }

                        // distinct roots: this edge connects two components.
                        // emit it, and merge the components by rank.
                        if x != y {
                            session.give((x, y));
                            match ranks[x].cmp(&ranks[y]) {
                                Ordering::Less    => { roots[x] = y },
                                Ordering::Greater => { roots[y] = x },
                                Ordering::Equal   => { roots[y] = x; ranks[x] += 1 },
                            }
                        }
                    }
                });
            }
        })
    }
}