extern crate differential_dataflow;
extern crate timely;
use std::fmt::Debug;
use std::collections::BTreeMap;
use timely::dataflow::operators::probe::Handle;
use timely::progress::frontier::AntichainRef;
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::*;
use differential_dataflow::trace::cursor::CursorDebug;
use differential_dataflow::trace::TraceReader;
type Node = u32;
type Edge = (Node, Node);
type Time = u32;
type Diff = isize;
fn main() {
let rounds: u32 = std::env::args().nth(1).unwrap().parse().unwrap();
let mut summaries = timely::execute_from_args(std::env::args(), move |worker| {
let mut probe = Handle::new();
let (mut graph, mut graph_trace) = worker.dataflow(|scope| {
let (graph_input, graph) = scope.new_collection();
let graph_arr = graph.map(|(x, y): Edge| (x, (x, y))).arrange_by_key();
let graph_trace = graph_arr.trace.clone();
graph_arr
.as_collection(|_, v| *v)
.consolidate()
.probe_with(&mut probe);
(graph_input, graph_trace)
});
if worker.index() != 0 {
graph.close();
for i in 1..rounds + 1 {
graph_trace.set_physical_compaction(AntichainRef::new(&[i]));
graph_trace.set_logical_compaction(AntichainRef::new(&[i]));
worker.step_while(|| probe.less_than(&i));
dump_cursor(i, worker.index(), &mut graph_trace);
}
} else {
for i in 1..rounds + 1 {
graph.insert((i, i + 1));
graph.insert((i + 1, i));
if i > 1 {
graph.remove((i - 1, i));
}
graph.advance_to(i);
graph.flush();
graph_trace.set_physical_compaction(AntichainRef::new(&[i]));
graph_trace.set_logical_compaction(AntichainRef::new(&[i]));
worker.step_while(|| probe.less_than(graph.time()));
dump_cursor(i, worker.index(), &mut graph_trace);
}
}
let (mut cursor, storage) = graph_trace.cursor();
cursor.to_vec(&storage)
})
.unwrap().join();
let mut graph_content: BTreeMap<Edge, Diff> = BTreeMap::new();
for summary in summaries.drain(..) {
let mut summary_vec: Vec<((Node, Edge), Vec<(Time, Diff)>)> = summary.unwrap();
for ((_, edge), timestamps) in summary_vec.drain(..) {
let diff: Diff = timestamps.iter().map(|(_,d)|d).sum();
if diff != 0 {
*graph_content.entry(edge).or_insert(0) += diff;
}
}
}
println!("Final graph: {:?}", graph_content);
let mut expected_graph_content: BTreeMap<Edge, Diff> = BTreeMap::new();
for i in 1..rounds+1 {
expected_graph_content.insert((i+1, i), 1);
}
expected_graph_content.insert((rounds, rounds+1), 1);
assert_eq!(graph_content, expected_graph_content);
}
fn dump_cursor<Tr>(round: u32, index: usize, trace: &mut Tr)
where
Tr: TraceReader,
Tr::Key: Debug + Clone,
Tr::Val: Debug + Clone,
Tr::Time: Debug + Clone,
Tr::R: Debug + Clone,
{
let (mut cursor, storage) = trace.cursor();
for ((k, v), diffs) in cursor.to_vec(&storage).iter() {
println!("round {}, w{} {:?}:{:?}: {:?}", round, index, *k, *v, diffs);
}
}