extern crate rand;
extern crate timely;
extern crate differential_dataflow;
extern crate graph_map;
use rand::{Rng, SeedableRng, StdRng};
use timely::dataflow::*;
use timely::dataflow::operators::*;
use differential_dataflow::input::Input;
use differential_dataflow::{Collection, AsCollection, Data};
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
use graph_map::GraphMMap;
type Node = u32;
type Edge = (Node, Node);
fn main() {
timely::execute_from_args(std::env::args(), move |worker| {
let (mut raw, mut trans, mut query, mut roots, query_probe, roots_probe) = worker.dataflow(|scope| {
let (raw_input, raw) = scope.new_collection();
let (trans_input, trans) = scope.new_collection();
let (handle, graph) = scope.loop_variable(u64::max_value(), 1);
let graph = graph.as_collection();
let aborts = transactional(&trans, &graph);
let writes = trans.filter(|x| !x.1)
.map(|x| (x.0, x.2))
.antijoin(&aborts)
.map(|(_,edge)| edge);
let graph = raw.concat(&writes);
graph.inner.connect_loop(handle);
let (query_input, query) = scope.new_collection();
let query_probe = graph.semijoin(&query)
.probe();
let (roots_input, roots) = scope.new_collection();
let result = reach(&graph, &roots);
let roots_probe = result.probe();
(raw_input, trans_input, query_input, roots_input, query_probe, roots_probe)
});
let filename = std::env::args().nth(1).unwrap();
let graph = GraphMMap::new(&filename);
if worker.index() == 0 {
for node in 0 .. graph.nodes() {
for &edge in graph.edges(node) {
raw.insert((node as u32, edge as u32));
}
}
}
raw.close();
let next = trans.epoch() + 2;
trans.advance_to(next); trans.flush();
query.advance_to(next); query.flush();
roots.advance_to(next); roots.flush();
while query_probe.less_than(&trans.time()) || roots_probe.less_than(&trans.time()) {
worker.step();
}
let seed: &[_] = &[1, 2, 3, 4];
let mut rng: StdRng = SeedableRng::from_seed(seed);
let block: u32 = std::env::args().nth(2).unwrap().parse().unwrap();
let mut reads = Vec::with_capacity(1000);
while reads.len() < reads.capacity() {
let mut buffer = vec![];
for i in 0 .. block as usize {
let key = rng.gen_range(0, graph.nodes() as u32);
if i % worker.peers() == worker.index() {
buffer.push(key);
}
}
reads.push(buffer);
}
let timer = ::std::time::Instant::now();
for buffer in &reads {
for &entry in buffer {
query.insert(entry);
}
let next = trans.epoch() + 1;
trans.advance_to(next); trans.flush();
query.advance_to(next); query.flush();
roots.advance_to(next); roots.flush();
while query_probe.less_than(&trans.time()) || roots_probe.less_than(&trans.time()) {
worker.step();
}
}
query.close();
println!("query elapsed: {:?} for 1,000 x {}", timer.elapsed(), block);
let mut travs = Vec::with_capacity(1000);
while travs.len() < travs.capacity() {
let mut temp = vec![];
for _ in 0 .. block {
temp.push(rng.gen_range(0, graph.nodes() as u32));
}
travs.push(temp);
}
let mut latencies = vec![];
let timer = ::std::time::Instant::now();
for buffer in &travs {
let inner_timer = ::std::time::Instant::now();
for &src in buffer {
roots.insert(src);
}
let next = trans.epoch() + 1;
trans.advance_to(next); trans.flush();
roots.advance_to(next); roots.flush();
while query_probe.less_than(&trans.time()) || roots_probe.less_than(&trans.time()) {
worker.step();
}
let elapsed = inner_timer.elapsed();
latencies.push(elapsed.as_secs() * 1000000000 + elapsed.subsec_nanos() as u64);
}
roots.close();
println!("travs elapsed: {:?} for 1,000 x {}", timer.elapsed(), block);
if worker.index() == 0 {
latencies[500 ..].sort();
for &x in latencies[500..].iter() {
println!("{}", (x as f64) / 1000000000.0f64);
}
}
let mut writes = Vec::with_capacity(1000);
let index = worker.index();
let peers = worker.peers();
let mut t_id = index;
while writes.len() < writes.capacity() {
let mut edge_set = vec![];
for _ in 0 .. block {
edge_set.push((rng.gen_range(0, graph.nodes() as u32), rng.gen_range(0, graph.nodes() as u32)));
}
let mut buffer = vec![];
for i in 0 .. block as usize {
let edge1 = edge_set[rng.gen_range(0, edge_set.len())];
let edge2 = edge_set[rng.gen_range(0, edge_set.len())];
if i % worker.peers() == worker.index() {
buffer.push((t_id, true, edge1));
buffer.push((t_id, false, edge2));
}
t_id += peers;
}
writes.push(buffer);
}
let timer = ::std::time::Instant::now();
for buffer in &writes {
for &entry in buffer {
trans.insert(entry);
}
let next = trans.epoch() + 1;
trans.advance_to(next); trans.flush();
while query_probe.less_than(&trans.time()) || roots_probe.less_than(&trans.time()) {
worker.step();
}
}
println!("trans elapsed: {:?} for 1,000 x {}", timer.elapsed(), block);
}).unwrap();
}
fn reach<G>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, Node)>
where G: Scope, G::Timestamp: Lattice+Ord {
let nodes = roots.map(|x| (x, x));
nodes.iterate(|inner| {
let edges = edges.enter(&inner.scope());
let nodes = nodes.enter(&inner.scope());
inner.join_map(&edges, |_k,&s,&d| (d, s))
.concat(&nodes)
.distinct()
})
}
fn transactional<G, D>(trans: &Collection<G, (usize, bool, D)>, state: &Collection<G, D>) -> Collection<G, usize>
where G: Scope, D: Data+Default+::std::hash::Hash, G::Timestamp: Lattice+Ord {
trans.filter(|_| false)
.map(|_| 0)
.iterate(|abort| {
let trans = trans.enter(&abort.scope());
let state = state.enter(&abort.scope())
.map(|x| (x, 0));
let reads = trans.filter(|x| x.1 == true)
.map(|x| (x.2, x.0));
let writes = trans.filter(|x| x.1 == false)
.map(|x| (x.0, x.2))
.antijoin(&abort)
.map(|x| (x.1, x.0));
let lookup = writes.concat(&state)
.join_map(&reads, |key, &tid1, &tid2| (key.clone(), tid1, tid2))
.filter(|&(_, tid1, tid2)| tid1 < tid2)
.map(|(k, _, tid2)| (k, tid2))
.distinct();
reads.map(|x| (x,()))
.antijoin(&lookup)
.map(|((_,t),())| t)
.distinct()
})
}