use std::fmt::Debug;
use std::ops::Deref;
use timely::progress::{Timestamp, PathSummary};
use timely::order::Product;
use timely::dataflow::*;
use timely::dataflow::scopes::child::Iterative;
use timely::dataflow::operators::{Feedback, ConnectLoop, Map};
use timely::dataflow::operators::feedback::Handle;
use ::{Data, Collection};
use ::difference::{Monoid, Abelian};
use lattice::Lattice;
/// Extension trait that adds an `iterate` method for building a looped computation.
///
/// `G` is the enclosing scope, `D` the record type and `R` the difference type of
/// the collections involved.
pub trait Iterate<G, D, R>
where
    G: Scope,
    D: Data,
    R: Monoid,
{
    /// Applies `logic` inside a nested scope and returns a collection in scope `G`.
    ///
    /// `logic` is called once. It receives a reference to a collection living in an
    /// `Iterative<'a, G, u64>` scope and must return a collection in that same scope.
    /// How the collection handed to `logic` is seeded, and what is done with the
    /// collection it returns, is decided by each implementor.
    fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
    where
        G::Timestamp: Lattice,
        for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>)
            -> Collection<Iterative<'a, G, u64>, D, R>;
}
/// `iterate` for a collection: the loop variable starts out as the collection itself.
impl<G, D, R> Iterate<G, D, R> for Collection<G, D, R>
where
    G: Scope,
    D: Ord + Data + Debug,
    R: Abelian,
{
    /// Builds a child scope named "Iterate", seeds a [`Variable`] with `self`,
    /// applies `logic` to it, feeds the outcome back into the variable and returns
    /// the outcome after it has left the child scope.
    fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
    where
        G::Timestamp: Lattice,
        for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>)
            -> Collection<Iterative<'a, G, u64>, D, R>,
    {
        let mut outer = self.inner.scope();
        outer.scoped("Iterate", |inner| {
            // Bring `self` into the nested scope; it becomes the variable's source.
            let entered = self.enter(inner);
            // Summary applied on the feedback edge: default outer part, inner part 1.
            let step = Product::new(Default::default(), 1);
            let variable = Variable::new_from(entered, step);

            // `&variable` derefs to the collection the variable wraps.
            let output = logic(&variable);

            // Close the loop. The collection returned by `set` is not needed here;
            // `output` is what gets returned to the outer scope.
            variable.set(&output);
            output.leave()
        })
    }
}
/// `iterate` for a scope: the loop variable has no source collection, only feedback.
impl<G, D, R> Iterate<G, D, R> for G
where
    G: Scope,
    D: Ord + Data + Debug,
    R: Monoid,
{
    /// Builds a child scope named "Iterate" on a clone of `self`, creates a
    /// [`MonoidVariable`] in it, applies `logic` to that variable, feeds the outcome
    /// back into the variable and returns the outcome after it has left the child scope.
    fn iterate<F>(&self, logic: F) -> Collection<G, D, R>
    where
        G::Timestamp: Lattice,
        for<'a> F: FnOnce(&Collection<Iterative<'a, G, u64>, D, R>)
            -> Collection<Iterative<'a, G, u64>, D, R>,
    {
        // `scoped` is invoked on a mutable binding, while only `&self` is available
        // here, so the call goes through a clone of the scope.
        let mut scope = self.clone();
        scope.scoped("Iterate", |inner| {
            let variable = MonoidVariable::new(inner, Product::new(Default::default(), 1));

            // `&variable` derefs to the collection the variable wraps.
            let output = logic(&variable);

            // Close the loop; the collection returned by `set` is not needed here.
            variable.set(&output);
            output.leave()
        })
    }
}
/// A collection that can be used before it is defined, backed by a feedback edge
/// and seeded from a source collection.
///
/// The variable dereferences to `collection`; calling `set` consumes the variable
/// and connects the feedback edge.
pub struct Variable<G, D, R>
where
    G: Scope,
    G::Timestamp: Lattice,
    D: Data,
    R: Abelian,
{
    /// The feedback stream concatenated with `source`; what users of the variable see.
    collection: Collection<G, D, R>,
    /// Handle for the not-yet-connected feedback edge; consumed by `set`.
    feedback: Handle<G, (D, G::Timestamp, R)>,
    /// The collection the variable was created from; negated and merged in by `set`.
    source: Collection<G, D, R>,
    /// Summary given to the feedback edge and applied to timestamps in `set`.
    step: <G::Timestamp as Timestamp>::Summary,
}
impl<G, D, R> Variable<G, D, R>
where
    G: Scope,
    G::Timestamp: Lattice,
    D: Data,
    R: Abelian,
{
    /// Creates a variable in `scope` whose source is an empty collection.
    ///
    /// `step` is the summary used for the feedback edge.
    pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
        use collection::AsCollection;

        let no_updates = ::timely::dataflow::operators::generic::operator::empty(scope);
        Self::new_from(no_updates.as_collection(), step)
    }

    /// Creates a variable seeded with `source`.
    ///
    /// A feedback edge with summary `step` is opened in `source`'s scope, and the
    /// variable's collection is that feedback stream concatenated with `source`.
    pub fn new_from(source: Collection<G, D, R>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
        let mut scope = source.inner.scope();
        let (feedback, cycle) = scope.feedback(step.clone());
        let collection = Collection::new(cycle).concat(&source);
        Self { collection, feedback, source, step }
    }

    /// Connects the feedback edge and returns the variable's collection.
    ///
    /// What is sent around the loop is `result` with `source` subtracted (the negated
    /// source concatenated with `result`). The variable's collection already contains
    /// `source` (see `new_from`), which is presumably why it is removed here.
    /// Each update's timestamp is advanced by `step`; updates for which
    /// `results_in` yields `None` are dropped.
    pub fn set(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
        let Variable { collection, feedback, source, step } = self;

        let delta = source.negate().concat(result);
        delta
            .inner
            .flat_map(move |(datum, time, diff)| {
                step.results_in(&time).map(|advanced| (datum, advanced, diff))
            })
            .connect_loop(feedback);

        collection
    }
}
/// Lets a `Variable` be used wherever a `&Collection` is expected.
impl<G, D, R> Deref for Variable<G, D, R>
where
    G: Scope,
    G::Timestamp: Lattice,
    D: Data,
    R: Abelian,
{
    type Target = Collection<G, D, R>;

    /// Borrows the collection wrapped by the variable.
    fn deref(&self) -> &Collection<G, D, R> {
        &self.collection
    }
}
/// A collection that can be used before it is defined, backed only by a feedback edge.
///
/// Unlike `Variable` it holds no source collection and requires only `R: Monoid`.
/// The variable dereferences to `collection`; calling `set` consumes the variable
/// and connects the feedback edge.
pub struct MonoidVariable<G, D, R>
where
    G: Scope,
    G::Timestamp: Lattice,
    D: Data,
    R: Monoid,
{
    /// The feedback stream wrapped as a collection; what users of the variable see.
    collection: Collection<G, D, R>,
    /// Handle for the not-yet-connected feedback edge; consumed by `set`.
    feedback: Handle<G, (D, G::Timestamp, R)>,
    /// Summary given to the feedback edge and applied to timestamps in `set`.
    step: <G::Timestamp as Timestamp>::Summary,
}
impl<G, D, R> MonoidVariable<G, D, R>
where
    G: Scope,
    G::Timestamp: Lattice,
    D: Data,
    R: Monoid,
{
    /// Creates a variable in `scope` by opening a feedback edge with summary `step`.
    ///
    /// The variable's collection is exactly the feedback stream.
    pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
        let (feedback, cycle) = scope.feedback(step.clone());
        Self {
            collection: Collection::new(cycle),
            feedback,
            step,
        }
    }

    /// Connects the feedback edge to `result` and returns the variable's collection.
    ///
    /// Each update's timestamp is advanced by `step`; updates for which
    /// `results_in` yields `None` are dropped.
    pub fn set(self, result: &Collection<G, D, R>) -> Collection<G, D, R> {
        let MonoidVariable { collection, feedback, step } = self;

        result
            .inner
            .flat_map(move |(datum, time, diff)| {
                step.results_in(&time).map(|advanced| (datum, advanced, diff))
            })
            .connect_loop(feedback);

        collection
    }
}
/// Lets a `MonoidVariable` be used wherever a `&Collection` is expected.
impl<G, D, R> Deref for MonoidVariable<G, D, R>
where
    G: Scope,
    G::Timestamp: Lattice,
    D: Data,
    R: Monoid,
{
    type Target = Collection<G, D, R>;

    /// Borrows the collection wrapped by the variable.
    fn deref(&self) -> &Collection<G, D, R> {
        &self.collection
    }
}