extern crate rand;
extern crate timely;
extern crate differential_dataflow;
use timely::dataflow::*;
use timely::dataflow::operators::*;
use rand::{Rng, SeedableRng, StdRng};
use differential_dataflow::{Collection, AsCollection};
use differential_dataflow::operators::group::{CountUnsigned, Count};
use differential_dataflow::operators::join::JoinUnsigned;
use differential_dataflow::operators::threshold::ThresholdUnsigned;
use differential_dataflow::operators::iterate::IterateExt;
use differential_dataflow::collection::LeastUpperBound;
/// Maintains the degree distribution of the `k`-core of a random graph while
/// the graph changes as a sliding window of random edges.
///
/// Positional command-line arguments:
///
/// 1. `nodes` – node identifiers are drawn uniformly from `0..nodes`.
/// 2. `edges` – number of edges introduced (across all workers) before epoch 1.
/// 3. `batch` – number of edge changes per round; `0` disables the update phase.
/// 4. `k`     – threshold passed to [`kcore`].
///
/// Any further arguments are handed to `timely::execute_from_args`.
///
/// # Panics
///
/// Panics with a descriptive message when one of the four positional
/// arguments is missing or cannot be parsed, and when the timely computation
/// itself returns an error.
fn main() {
    let nodes: u32 = parse_arg(1, "nodes");
    let edges: usize = parse_arg(2, "edges");
    let batch: usize = parse_arg(3, "batch");
    let k: i32 = parse_arg(4, "k");

    // Skip the program name and the four arguments consumed above.
    timely::execute_from_args(std::env::args().skip(5), move |computation| {
        let index = computation.index();
        let peers = computation.peers();

        // Build the dataflow: edges -> k-core -> per-node counts -> count of each count.
        let (mut input, probe) = computation.scoped(|scope| {
            let (input, edges) = scope.new_input();
            let edges = edges.as_collection();
            let edges = kcore(&edges, k);

            // Each edge contributes both of its endpoints; counting them gives
            // the number of edge endpoints per node.
            let degrs = edges
                .flat_map(|(src, dst)| Some(src).into_iter().chain(Some(dst).into_iter()))
                .count_u();

            // Count how many nodes share each per-node count.
            let distr = degrs.map(|(_, cnt)| cnt as u32).count_u();

            let probe = distr
                .inspect(|x| println!("observed: {:?}", x))
                .probe()
                .0;

            (input, probe)
        });

        // Both generators start from the same per-worker seed. `rng1` produces
        // the edges that are added; `rng2` is only used below for retractions,
        // so it is expected to replay the edges `rng1` produced, oldest first.
        let seed: &[_] = &[1, 2, 3, index];
        let mut rng1: StdRng = SeedableRng::from_seed(seed);
        let mut rng2: StdRng = SeedableRng::from_seed(seed);

        // Initial load: each worker introduces the edges whose position is
        // congruent to its index modulo the number of workers.
        for edge in 0..edges {
            if edge % peers == index {
                input.send(((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1));
            }
            // Periodically let the computation make progress while loading.
            if edge % 10000 == 9999 {
                computation.step();
            }
        }

        // NOTE: the timer starts after the edges have been sent, so the time
        // reported below covers only the work remaining to complete epoch 0.
        let timer = ::std::time::Instant::now();
        input.advance_to(1);
        computation.step_while(|| probe.lt(input.time()));
        if index == 0 {
            println!("Loading finished after {:?}", timer.elapsed());
        }

        // Update phase; the `batch > 0` guard also protects `edge % batch`
        // from a division by zero. The loop has no exit condition.
        if batch > 0 {
            for edge in 0usize.. {
                if edge % peers == index {
                    // One fresh edge in, one edge out.
                    input.send(((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1));
                    input.send(((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)), -1));
                }
                // Close the round after every `batch` positions and wait for
                // the probe to catch up with the input.
                if edge % batch == (batch - 1) {
                    let timer = ::std::time::Instant::now();
                    let next = input.epoch() + 1;
                    input.advance_to(next);
                    computation.step_while(|| probe.lt(input.time()));
                    if index == 0 {
                        println!("Round {} finished after {:?}", next - 1, timer.elapsed());
                    }
                }
            }
        }
    }).unwrap();
}

/// Reads the command-line argument at `position` and parses it as `T`.
///
/// `name` is only used in the panic messages.
///
/// # Panics
///
/// Panics, naming the argument and showing the expected usage, when the
/// argument is absent or does not parse as `T`.
fn parse_arg<T>(position: usize, name: &str) -> T
where
    T: ::std::str::FromStr,
    T::Err: ::std::fmt::Debug,
{
    let raw = match ::std::env::args().nth(position) {
        Some(raw) => raw,
        None => panic!(
            "missing argument {} (<{}>); usage: <nodes> <edges> <batch> <k> [timely arguments]",
            position, name
        ),
    };
    match raw.parse::<T>() {
        Ok(value) => value,
        Err(err) => panic!(
            "invalid value {:?} for argument {} (<{}>): {:?}",
            raw, position, name, err
        ),
    }
}
/// Iteratively restricts `edges` to those whose two endpoints both remain
/// "active".
///
/// In every round each remaining edge contributes its two endpoints; a node
/// stays active when the count handed to the threshold closure is at least
/// `k`. The original `edges` are then filtered twice with a semijoin against
/// the active nodes: once keyed by the first endpoint and once, after swapping
/// the pair, keyed by the second. The result of the iteration is returned.
fn kcore<G: Scope>(edges: &Collection<G, (u32, u32)>, k: i32) -> Collection<G, (u32, u32)>
where G::Timestamp: LeastUpperBound {
    edges.iterate(|candidates| {
        // Every candidate edge yields both of its endpoints.
        let endpoints = candidates.flat_map(|(a, b)| Some(a).into_iter().chain(Some(b)));

        // Nodes whose count falls below `k` are dropped, all others kept once.
        let survivors = endpoints.threshold_u(move |_, degree| if degree < k { 0 } else { 1 });

        // Filter the original edges on their first component ...
        let source_ok = edges.enter(&candidates.scope()).semijoin_u(&survivors);

        // ... then swap, filter on the other component, and swap back.
        let both_ok = source_ok.map(|(a, b)| (b, a)).semijoin_u(&survivors);
        both_ok.map(|(b, a)| (a, b))
    })
}