extern crate rand;
extern crate timely;
extern crate differential_dataflow;
use std::hash::Hash;
use std::time::Instant;
use timely::dataflow::*;
use rand::{Rng, SeedableRng, StdRng};
use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
type Node = usize;
type Edge = (Node, Node);
/// Benchmark driver: tracks, for every connected component of a randomly
/// generated "user mentions user" graph, the top topics tweeted about by that
/// component, and answers queries "what are the top topics of user q's component?".
///
/// Positional command-line arguments:
///   1. `users`   – number of users; also the number of tweets in the initial load
///   2. `topics`  – number of distinct topics
///   3. `batch`   – tweets inserted *and* retracted per wave (`0` stops after the
///                  initial load)
///   4. `inspect` – optional; the literal `inspect` prints the query results
///   5.. remaining arguments are forwarded to timely
///
/// # Panics
/// Panics if any of the first three arguments is missing or is not a valid `usize`.
fn main() {
    let users: usize = std::env::args().nth(1)
        .expect("missing argument 1: users")
        .parse().expect("argument 1 (users) must be an unsigned integer");
    let topics: usize = std::env::args().nth(2)
        .expect("missing argument 2: topics")
        .parse().expect("argument 2 (topics) must be an unsigned integer");
    let batch: usize = std::env::args().nth(3)
        .expect("missing argument 3: batch")
        .parse().expect("argument 3 (batch) must be an unsigned integer");
    // A missing fourth argument means "do not inspect" instead of a panic.
    let inspect: bool = std::env::args().nth(4).map_or(false, |arg| arg == "inspect");

    timely::execute_from_args(std::env::args().skip(5), move |worker| {
        let timer = Instant::now();
        let index = worker.index();
        let peers = worker.peers();

        // This worker's portion of `total` records. The first `total % peers` workers
        // each take one extra record, so all workers together produce exactly `total`.
        let share = |total: usize| total / peers + if index < total % peers { 1 } else { 0 };

        let (mut tweets, mut queries, probe) = worker.dataflow(|scope| {
            // Tweets are (user, mentioned user, topic) triples.
            let (tweet_input, tweets) = scope.new_collection();

            // Component label of every user in the mention graph.
            let labels = connected_components(&tweets.map(|(u, m, _)| (u, m)));

            // (component label, topic) for every tweet.
            let label_topics = tweets.map(|(u, _, t)| (u, t))
                                     .join_map(&labels, |_, &t, &l| (l, t));

            // Number of tweets per (component label, topic).
            let counts = label_topics.map(|x| (x, ()))
                                     .group(|_, s, t| t.push((s[0].1, 1)));

            // Keep the `k` most frequent topics of each component. Counts are negated
            // because the selection relies on `group` presenting values in ascending
            // order, which then lists the most frequent topics first. A component can
            // have fewer than `k` topics, so take *at most* `k` values. Slicing
            // `s[..k]` would panic in that case.
            let k = 5;
            let topk = counts.map(|((l, t), c)| (l, (-c, t)))
                             .group(move |_, s, t| {
                                 t.extend(s.iter().take(k).map(|&(&(_, topic), _)| (topic, 1)));
                             });

            // Queries name a user; resolve each one to the user's component label.
            let (query_input, queries) = scope.new_collection();
            let label_query = queries.map(|q| (q, ()))
                                     .join_map(&labels, |q, _, &l| (l, q.clone()));

            // (label, query, topic) for each top topic of each queried component.
            let mut query_topics = label_query.join_map(&topk, |l, q, &t| (l.clone(), q.clone(), t));

            // Without `inspect`, drop every result just before output, so nothing is printed.
            // The upstream computation still runs.
            if !inspect {
                query_topics = query_topics.filter(|_| false);
            }

            let probe = query_topics.consolidate()
                                    .inspect(|&((l, q, t), _, w)| println!("\t(query: {},\tlabel: {},\ttopic:{}\t(weight: {})", q, l, t, w))
                                    .probe();

            (tweet_input, query_input, probe)
        });

        // Each generator has an identically seeded twin. `*_rng1` produces new records,
        // and `*_rng2` later replays the same sequence to retract them in insertion
        // order. As a result, each input behaves like a sliding window of constant size.
        let tweet_seed: &[_] = &[0, 1, 2, index];
        let mut tweet_rng1: StdRng = SeedableRng::from_seed(tweet_seed);
        let mut tweet_rng2: StdRng = SeedableRng::from_seed(tweet_seed);
        let query_seed: &[_] = &[1, 2, 3, index];
        let mut query_rng1: StdRng = SeedableRng::from_seed(query_seed);
        let mut query_rng2: StdRng = SeedableRng::from_seed(query_seed);

        println!("performing AppealingDataflow with {} users, {} topics:", users, topics);

        // Initial load: `users` tweets in total, split across the workers.
        for _ in 0..share(users) {
            tweets.insert((tweet_rng1.gen_range(0, users),
                           tweet_rng1.gen_range(0, users),
                           tweet_rng1.gen_range(0, topics)));
        }
        // Worker 0 issues a single query.
        if index == 0 {
            queries.insert(query_rng1.gen_range(0, users));
        }
        println!("loaded; elapsed: {:?}", timer.elapsed());

        tweets.advance_to(1); tweets.flush();
        queries.advance_to(1); queries.flush();
        worker.step_while(|| probe.less_than(queries.time()));
        println!("stable; elapsed: {:?}", timer.elapsed());

        if batch > 0 {
            // This worker's share of each wave. It is the same for every wave.
            let my_batch = share(batch);
            for wave in 0.. {
                let start = Instant::now();
                let round = *tweets.epoch();
                // Add `my_batch` fresh tweets and retract this worker's `my_batch` oldest ones.
                for _ in 0..my_batch {
                    tweets.insert((tweet_rng1.gen_range(0, users),
                                   tweet_rng1.gen_range(0, users),
                                   tweet_rng1.gen_range(0, topics)));
                    tweets.remove((tweet_rng2.gen_range(0, users),
                                   tweet_rng2.gen_range(0, users),
                                   tweet_rng2.gen_range(0, topics)));
                }
                if index == 0 {
                    queries.insert(query_rng1.gen_range(0, users));
                    queries.remove(query_rng2.gen_range(0, users));
                }
                tweets.advance_to(round + 1); tweets.flush();
                queries.advance_to(round + 1); queries.flush();
                worker.step_while(|| probe.less_than(queries.time()));
                if index == 0 {
                    // Average time per tweet of the global batch. `batch > 0` holds here.
                    // NOTE(review): this assumes `batch` fits in a `u32`.
                    println!("wave {}: avg {:?}", wave, start.elapsed() / (batch as u32));
                }
            }
        }
    }).unwrap();
}
/// Labels every node reachable through `edges` with a connected-component identifier.
///
/// Edges are treated as undirected. Each edge seeds its smaller endpoint with itself
/// as a label. Labels then flow along edges, and every node keeps the first candidate
/// that `group` presents. Presumably this is the smallest label, assuming ascending
/// value order. The result is a collection of `(node, label)` pairs.
fn connected_components<G: Scope>(edges: &Collection<G, Edge>) -> Collection<G, (Node, Node)>
where G::Timestamp: Lattice+Hash+Ord {
    // Seed labels: every edge nominates its smaller endpoint, labelled by itself.
    let seeds = edges
        .map_in_place(|pair| {
            let smaller = std::cmp::min(pair.0, pair.1);
            *pair = (smaller, smaller);
        })
        .consolidate();

    // Undirected view of the graph: each edge together with its reverse.
    let reversed = edges.map_in_place(|edge| ::std::mem::swap(&mut edge.0, &mut edge.1));
    let undirected = reversed.concat(edges);

    // Begin from an empty collection of the right type and iterate to a fixed point.
    let empty = seeds.filter(|_| false);
    empty.iterate(|labels| {
        let graph = undirected.enter(&labels.scope());
        // A seed enters at an iteration equal to 256 times the bit length of its label.
        // Presumably this lets smaller labels start spreading before larger ones.
        let delayed_seeds = seeds.enter_at(&labels.scope(), |seed| 256 * (64 - seed.0.leading_zeros() as u64));

        // Every node offers its current label to each of its neighbours.
        let proposals = labels.join_map(&graph, |_node, label, neighbor| (*neighbor, *label));
        proposals
            .concat(&delayed_seeds)
            .group(|_, candidates, output| output.push((*candidates[0].0, 1)))
    })
}