extern crate indexmap;
extern crate timely;
extern crate differential_dataflow;
use std::io::{BufRead, BufReader};
use std::fs::File;
use indexmap::IndexMap;
use timely::progress::Timestamp;
use timely::order::Product;
use timely::dataflow::Scope;
use timely::dataflow::scopes::ScopeParent;
use differential_dataflow::Collection;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::input::{Input, InputSession};
use differential_dataflow::operators::arrange::{ArrangeByKey, ArrangeBySelf};
use differential_dataflow::operators::iterate::Variable;
use differential_dataflow::operators::{Threshold, JoinCore, Consolidate};
/// Identifier of a graph node.
type Node = usize;
/// A pair of nodes; `main` inserts these as `(src, dst)`.
type Edge = (Node, Node);
/// Timestamp coordinate type of the iterative scope opened in `Query::render_in`.
type Iter = usize;
/// Difference type used for the collections, arrangements and input sessions in this file.
type Diff = isize;
/// One symbol on the right-hand side of a production rule.
#[derive(Debug)]
pub enum Relation {
    /// A relation referenced by its plain name.
    Forward(String),
    /// A relation referenced as `~name`; the stored string excludes the `~`.
    Reverse(String),
}

/// A single rule: a target relation and the sequence of relations that derive it.
#[derive(Debug)]
pub struct Production {
    /// Name of the relation this rule contributes to (the first token of the rule text).
    pub left_hand: String,
    /// The remaining tokens of the rule text, in order; may be empty.
    pub relations: Vec<Relation>,
}

impl<'a> From<&'a str> for Production {
    /// Parses whitespace-separated rule text of the form `target rel1 rel2 ...`.
    ///
    /// The first token becomes `left_hand` verbatim. Every following token
    /// becomes a `Relation::Reverse` if it begins with `~` (the `~` is dropped)
    /// and a `Relation::Forward` otherwise.
    ///
    /// # Panics
    ///
    /// Panics if `text` contains no tokens at all.
    fn from(text: &'a str) -> Production {
        let mut tokens = text.split_whitespace();
        let left_hand = tokens
            .next()
            .expect("All rules must have a target.")
            .to_string();

        let mut relations = Vec::new();
        for token in tokens {
            let relation = if token.starts_with('~') {
                // '~' is a single byte, so slicing from index 1 drops exactly the marker.
                Relation::Reverse(token[1..].to_string())
            } else {
                Relation::Forward(token.to_string())
            };
            relations.push(relation);
        }

        Production { left_hand, relations }
    }
}
/// A set of production rules, rendered into a dataflow by `Query::render_in`.
#[derive(Debug)]
pub struct Query {
/// The rules, in the order they were supplied to `Query::build_from`.
pub productions: Vec<Production>,
}
use differential_dataflow::trace::implementations::ord::{OrdValSpine, OrdKeySpine};
use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
/// Trace handle over an `OrdKeySpine` with keys `K`, timestamps `T` and differences `R`.
type TraceKeyHandle<K,T,R> = TraceAgent<OrdKeySpine<K, T, R>>;
/// Trace handle over an `OrdValSpine` with keys `K`, values `V`, timestamps `T` and differences `R`.
type TraceValHandle<K,V,T,R> = TraceAgent<OrdValSpine<K, V, T, R>>;
/// An arrangement in scope `G` from keys `K` to values `V`, timestamped with the scope's own timestamp type.
type Arrange<G,K,V,R> = Arranged<G, TraceValHandle<K, V, <G as ScopeParent>::Timestamp, R>>;
/// Per-relation state used while `Query::render_in` assembles the iterative dataflow.
///
/// Bundles the iteration variable for a relation, the collection that will
/// eventually define it, and arrangements of the variable that are built on
/// first request.
pub struct EdgeVariable<G: Scope> where G::Timestamp : Lattice {
/// The iteration variable; it is bound to a definition in `complete`.
variable: Variable<G, Edge, Diff>,
/// The source collection concatenated with every production added so far.
current: Collection<G, Edge, Diff>,
/// Arrangement of `variable` by the first node of each edge; `None` until `forward` is first called.
forward: Option<Arrange<G, Node, Node, Diff>>,
/// Arrangement of `variable` with each edge swapped to `(y, x)`; `None` until `reverse` is first called.
reverse: Option<Arrange<G, Node, Node, Diff>>,
}
impl<G: Scope> EdgeVariable<G> where G::Timestamp : Lattice {
    /// Creates the state for one relation.
    ///
    /// A fresh iteration variable is allocated in `source`'s scope using
    /// `step` as its summary, and `source` itself becomes the initial
    /// contents of the relation's definition.
    pub fn from(source: &Collection<G, Edge>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
        EdgeVariable {
            variable: Variable::new(&mut source.scope(), step),
            current: source.clone(),
            forward: None,
            reverse: None,
        }
    }

    /// Concatenates the edges in `production` onto the relation's definition.
    pub fn add_production(&mut self, production: &Collection<G, Edge, Diff>) {
        let combined = self.current.concat(production);
        self.current = combined;
    }

    /// Binds the iteration variable to the distinct edges of everything accumulated so far.
    ///
    /// Consumes `self`, so no further productions can be added afterwards.
    pub fn complete(self) {
        let EdgeVariable { variable, current, .. } = self;
        variable.set(&current.distinct());
    }

    /// Returns the variable arranged by key, i.e. by the first node of each edge.
    ///
    /// The arrangement is built on the first call and reused afterwards.
    pub fn forward(&mut self) -> &Arrange<G, Node, Node, Diff> {
        // Borrow the field separately so the closure does not need all of `self`.
        let variable = &self.variable;
        self.forward.get_or_insert_with(|| variable.arrange_by_key())
    }

    /// Returns the variable arranged by key after swapping each edge `(x, y)` to `(y, x)`.
    ///
    /// The arrangement is built on the first call and reused afterwards.
    pub fn reverse(&mut self) -> &Arrange<G, Node, Node, Diff> {
        // Borrow the field separately so the closure does not need all of `self`.
        let variable = &self.variable;
        self.reverse
            .get_or_insert_with(|| variable.map(|(x, y)| (y, x)).arrange_by_key())
    }
}
/// The externally visible handles for one relation, as returned by `Query::render_in`.
pub struct RelationHandles<T: Timestamp+Lattice> {
/// Session through which edges of this relation are inserted.
pub input: InputSession<T, Edge, Diff>,
/// Trace of the relation as it leaves the iterative scope; `main` imports it into a second dataflow.
pub trace: TraceKeyHandle<Edge, T, Diff>,
}
impl Query {
pub fn build_from<'a>(iterator: impl IntoIterator<Item=&'a str>) -> Self {
Query { productions: iterator.into_iter().map(|text| Production::from(text)).collect() }
}
pub fn render_in<G: Scope>(&self, scope: &mut G) -> IndexMap<String, RelationHandles<G::Timestamp>>
where G::Timestamp: Lattice+::timely::order::TotalOrder {
let mut input_map = IndexMap::new();
for production in self.productions.iter() {
input_map.entry(production.left_hand.clone()).or_insert_with(|| scope.new_collection());
}
scope.iterative::<Iter,_,_>(|subscope| {
let mut result_map = IndexMap::new();
let mut variable_map = IndexMap::new();
for (name, (input, collection)) in input_map.drain(..) {
let edge_variable = EdgeVariable::from(&collection.enter(subscope), Product::new(Default::default(), 1));
let trace = edge_variable.variable.leave().arrange_by_self().trace;
result_map.insert(name.clone(), RelationHandles { input, trace });
variable_map.insert(name.clone(), edge_variable);
}
for production in self.productions.iter() {
let name = &production.left_hand;
let rule = &production.relations;
if rule.len() > 0 {
let mut transposed = match &rule[0] {
Relation::Forward(name) => variable_map[name].reverse().clone(),
Relation::Reverse(name) => variable_map[name].forward().clone(),
};
for relation in rule[1..].iter() {
let to_join = match relation {
Relation::Forward(name) => variable_map[name].forward(),
Relation::Reverse(name) => variable_map[name].reverse(),
};
transposed =
transposed
.join_core(to_join, |_k,&x,&y| Some((y,x)))
.arrange_by_key();
}
variable_map[name].add_production(&transposed.as_collection(|&dst,&src| (src,dst)));
}
}
for (_name, variable) in variable_map.drain(..) {
variable.complete();
}
result_map
})
}
}
fn main() {
timely::execute_from_args(std::env::args(), move |worker| {
let timer = ::std::time::Instant::now();
let peers = worker.peers();
let index = worker.index();
let query_filename = std::env::args().nth(1).expect("Argument 1 (query filename) missing.");
let query_text = std::fs::read_to_string(query_filename).expect("Failed to read query file");
let query = Query::build_from(query_text.lines());
let mut relation_map = worker.dataflow::<(),_,_>(|scope| query.render_in(scope));
if index == 0 { println!("{:?}:\tDataflow assembled for {:?}", timer.elapsed(), query); }
worker.dataflow(|scope| {
for (name, data) in relation_map.iter_mut() {
let name = name.to_string();
data.trace
.import(scope)
.as_collection(|&_kv,&()| ())
.consolidate()
.inspect(move |x| println!("{:?}\tfinal size of relation '{}': {:?}", timer.elapsed(), name, x.2));
}
});
let data_filename = std::env::args().nth(2).expect("Argument 2 (data filename) missing.");
let file = BufReader::new(File::open(data_filename).expect("Failed to read data file"));
for readline in file.lines() {
let line = readline.ok().expect("read error");
if !line.starts_with('#') && line.len() > 0 {
let mut elts = line[..].split_whitespace();
let src = elts.next().expect("data line with no src (1st) element").parse().expect("malformed src");
if (src as usize) % peers == index {
let dst = elts.next().expect("data line with no dst (2nd) element").parse().expect("malformed dst");
let val: &str = elts.next().expect("data line with no val (3rd) element");
if let Some(handle) = relation_map.get_mut(val) {
handle.input.insert((src, dst));
}
else {
panic!("couldn't find the named relation: {:?}", val);
}
}
}
}
if index == 0 { println!("{:?}:\tData loaded", timer.elapsed()); }
}).expect("Timely computation did not complete cleanly");
}