use std::fmt::Debug;
use std::ops::Deref;
use timely::progress::nested::product::Product;
use timely::dataflow::*;
use timely::dataflow::scopes::Child;
use timely::dataflow::operators::*;
use timely::dataflow::operators::feedback::Handle;
use ::{Data, Collection, Diff};
use lattice::Lattice;
pub trait Iterate<G: Scope, D: Data, R: Diff> {
fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
where G::Timestamp: Lattice,
for<'a> F: FnOnce(&Collection<Child<'a, G, u64>, D, R>)->Collection<Child<'a, G, u64>, D, R>;
}
impl<G: Scope, D: Ord+Data+Debug, R: Diff> Iterate<G, D, R> for Collection<G, D, R> {
fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
where G::Timestamp: Lattice,
for<'a> F: FnOnce(&Collection<Child<'a, G, u64>, D, R>)->Collection<Child<'a, G, u64>, D, R> {
self.inner.scope().scoped(|subgraph| {
let variable = Variable::from(self.enter(subgraph));
let result = logic(&variable);
variable.set(&result);
result.leave()
})
}
}
pub struct Variable<'a, G: Scope, D: Data, R: Diff>
where G::Timestamp: Lattice {
collection: Collection<Child<'a, G, u64>, D, R>,
feedback: Handle<G::Timestamp, u64,(D, Product<G::Timestamp, u64>, R)>,
source: Collection<Child<'a, G, u64>, D, R>,
}
impl<'a, G: Scope, D: Data, R: Diff> Variable<'a, G, D, R> where G::Timestamp: Lattice {
pub fn from(source: Collection<Child<'a, G, u64>, D, R>) -> Variable<'a, G, D, R> {
let (feedback, updates) = source.inner.scope().loop_variable(u64::max_value(), 1);
let collection = Collection::new(updates).concat(&source);
Variable { collection: collection, feedback: feedback, source: source }
}
pub fn set(self, result: &Collection<Child<'a, G, u64>, D, R>) -> Collection<Child<'a, G, u64>, D, R> {
self.source.negate()
.concat(result)
.inner
.map(|(x,t,d)| (x, Product::new(t.outer, t.inner+1), d))
.connect_loop(self.feedback);
self.collection
}
}
impl<'a, G: Scope, D: Data, R: Diff> Deref for Variable<'a, G, D, R> where G::Timestamp: Lattice {
type Target = Collection<Child<'a, G, u64>, D, R>;
fn deref(&self) -> &Self::Target {
&self.collection
}
}