extern crate timely;
extern crate differential_dataflow;
use timely::progress::nested::product::Product;
use timely::dataflow::*;
use timely::dataflow::scopes::Child;
use timely::dataflow::operators::*;
use timely::dataflow::operators::feedback::Handle;
use differential_dataflow::input::Input;
use differential_dataflow::{Data, Collection, Hashable};
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
pub struct Variable<'a, G: Scope, D: Default+Data+Hashable>
where G::Timestamp: Lattice+Ord {
feedback: Option<Handle<G::Timestamp, u64,(D, Product<G::Timestamp, u64>, isize)>>,
current: Collection<Child<'a, G, u64>, D>,
cycle: Collection<Child<'a, G, u64>, D>,
}
impl<'a, G: Scope, D: Default+Data+Hashable> Variable<'a, G, D> where G::Timestamp: Lattice+Ord {
pub fn from(source: &Collection<Child<'a, G, u64>, D>) -> Variable<'a, G, D> {
let (feedback, cycle) = source.inner.scope().loop_variable(u64::max_value(), 1);
let cycle = Collection::new(cycle);
let mut result = Variable { feedback: Some(feedback), current: cycle.clone(), cycle: cycle };
result.add(source);
result
}
pub fn add(&mut self, source: &Collection<Child<'a, G, u64>, D>) {
self.current = self.current.concat(source);
}
}
impl<'a, G: Scope, D: Default+Data+Hashable> ::std::ops::Deref for Variable<'a, G, D> where G::Timestamp: Lattice+Ord {
type Target = Collection<Child<'a, G, u64>, D>;
fn deref(&self) -> &Self::Target {
&self.cycle
}
}
impl<'a, G: Scope, D: Default+Data+Hashable> Drop for Variable<'a, G, D> where G::Timestamp: Lattice+Ord {
fn drop(&mut self) {
if let Some(feedback) = self.feedback.take() {
self.current.distinct()
.inner
.connect_loop(feedback);
}
}
}
fn main() {
timely::execute_from_args(std::env::args(), |worker| {
worker.dataflow::<u64,_,_>(move |outer| {
let (_cin, c) = outer.new_collection::<(u32,u32,u32),isize>();
let (_pin, p) = outer.new_collection::<(u32,u32),isize>();
let (_qin, q) = outer.new_collection::<(u32,u32,u32),isize>();
let (_rin, r) = outer.new_collection::<(u32,u32,u32),isize>();
let (_sin, s) = outer.new_collection::<(u32,u32),isize>();
let (_uin, u) = outer.new_collection::<(u32,u32,u32),isize>();
let (_p, _q) = outer.scoped::<u64,_,_>(|inner| {
let mut p = Variable::from(&p.enter(inner));
let mut q = Variable::from(&q.enter(inner));
let c = Variable::from(&c.enter(inner));
let r = Variable::from(&r.enter(inner));
let s = Variable::from(&s.enter(inner));
let u = Variable::from(&u.enter(inner));
let ir1 = p.map(|(x,y)| (y,x))
.join_map(&p, |_y,&x,&z| (x,z));
p.add(&ir1);
let ir2 = p.map(|(x,y)| (y,x))
.join_map(&q.map(|(y,r,z)| (y,(r,z))), |_y,&x,&(r,z)| (x,r,z));
q.add(&ir2);
let ir3 = p.map(|(y,w)| (w,y))
.join_map(&u.map(|(w,r,z)| (w,(r,z))), |_w,&y,&(r,z)| ((y,r),z))
.join_map(&q.map(|(x,r,y)| ((y,r),x)), |_yr,&z,&x| (x,z));
p.add(&ir3);
let ir4 = c.map(|(y,w,z)| (w,(y,z)))
.join_map(&p.map(|(x,w)| (w,x)), |_w,&(y,z),&x| ((x,y),z))
.semijoin(&p)
.map(|((x,_y),z)| (x,z));
p.add(&ir4);
let ir5 = q.map(|(x,r,z)| (r,(x,z)))
.join_map(&s, |_r,&(x,z),&q| (x,q,z));
q.add(&ir5);
let ir6 = q.map(|(x,y,z)| (y,(x,z)))
.join_map(&r.map(|(y,u,e)| (y,(u,e))), |_y,&(x,z),&(u,e)| ((z,u),(x,e)))
.join_map(&q.map(|(z,u,o)| ((z,u),o)), |_zu,&(x,e),&o| (x,e,o));
q.add(&ir6);
(p.leave(), q.leave())
});
});
}).unwrap();
}