use std::fmt::Debug;
use std::ops::Deref;
use timely::dataflow::*;
use timely::dataflow::scopes::Child;
use timely::dataflow::operators::*;
use timely::dataflow::operators::feedback::Handle;
use ::{Data, Collection};
use collection::LeastUpperBound;
/// Extension trait that adds an `iterate` method to a collection in scope `G`
/// holding records of type `D`.
pub trait IterateExt<G: Scope, D: Data> {
/// Applies `logic` to a view of this collection in a child scope of `G` and
/// returns a collection back in `G`.
///
/// `logic` is invoked once (it is an `FnOnce`). It receives a collection in
/// a `Child<'a, G, u64>` scope and must return a collection in that same
/// scope. The bound is higher-ranked over `'a`, so the closure cannot assume
/// anything about the lifetime of the child scope.
///
/// NOTE(review): the extra `u64` timestamp coordinate presumably counts
/// rounds of iteration -- confirm against the timely `Child` scope docs.
fn iterate<F>(&self, logic: F) -> Collection<G, D>
where G::Timestamp: LeastUpperBound,
for<'a> F: FnOnce(&Collection<Child<'a, G, u64>, D>)->Collection<Child<'a, G, u64>, D>;
}
impl<G: Scope, D: Ord+Data+Debug> IterateExt<G, D> for Collection<G, D> {
    /// Builds a nested scope, hands `logic` a loop variable seeded with this
    /// collection, feeds the output of `logic` back into that variable, and
    /// returns the output after it leaves the nested scope.
    fn iterate<F>(&self, logic: F) -> Collection<G, D>
    where
        G::Timestamp: LeastUpperBound,
        for<'a> F: FnOnce(&Collection<Child<'a, G, u64>, D>) -> Collection<Child<'a, G, u64>, D>
    {
        self.inner.scope().scoped(|inner_scope| {
            // Bring the input into the nested scope and wrap it in a variable
            // whose contents `logic` can read.
            let entered = self.enter(inner_scope);
            let variable = Variable::from(entered);

            // `Variable` dereferences to the collection `logic` expects.
            let next = logic(&*variable);

            // Close the loop. The collection returned by `set` is deliberately
            // discarded: the output of `logic` is what leaves the scope.
            variable.set(&next);
            next.leave()
        })
    }
}
/// A collection in a `Child<'a, G, u64>` scope that is defined through a
/// feedback edge.
///
/// Created by `Variable::from` and closed by `Variable::set`; dereferences to
/// the `collection` field.
pub struct Variable<'a, G: Scope, D: Data>
where G::Timestamp: LeastUpperBound {
/// `source` concatenated with the updates that arrive over the feedback edge.
collection: Collection<Child<'a, G, u64>, D>,
/// Handle for the feedback edge; consumed by `set` when the loop is connected.
feedback: Handle<G::Timestamp, u64,(D, i32)>,
/// The collection the variable was created from; `set` negates it before
/// concatenating it with the result that is fed back.
source: Collection<Child<'a, G, u64>, D>,
}
impl<'a, G: Scope, D: Data> Variable<'a, G, D> where G::Timestamp: LeastUpperBound {
    /// Creates a variable whose collection is `source` concatenated with
    /// whatever is later fed back through `set`.
    pub fn from(source: Collection<Child<'a, G, u64>, D>) -> Variable<'a, G, D> {
        // NOTE(review): the arguments to `loop_variable` are presumably the
        // iteration limit and the per-round timestamp increment -- confirm
        // against the timely documentation.
        let (feedback, updates) = source.inner.scope().loop_variable(u64::max_value(), 1);

        let cycle = Collection::new(updates);
        let combined = cycle.concat(&source);

        Variable {
            collection: combined,
            feedback: feedback,
            source: source,
        }
    }

    /// Connects the feedback edge and returns the variable's collection.
    ///
    /// What is sent around the loop is `result` concatenated with the negation
    /// of `source`. Because the variable's collection already includes
    /// `source`, the two contributions of `source` cancel there.
    pub fn set(self, result: &Collection<Child<'a, G, u64>, D>) -> Collection<Child<'a, G, u64>, D> {
        let minus_source = self.source.negate();
        let fed_back = minus_source.concat(result);
        fed_back.inner.connect_loop(self.feedback);

        self.collection
    }
}
impl<'a, G: Scope, D: Data> Deref for Variable<'a, G, D> where G::Timestamp: LeastUpperBound {
type Target = Collection<Child<'a, G, u64>, D>;
fn deref(&self) -> &Self::Target {
&self.collection
}
}