use std::fmt::Debug;
use timely::Container;
use timely::progress::Timestamp;
use timely::order::Product;
use timely::dataflow::*;
use timely::dataflow::scope::Iterative;
use timely::dataflow::operators::{Feedback, ConnectLoop};
use timely::dataflow::operators::feedback::Handle;
use crate::{Data, VecCollection, Collection};
use crate::difference::{Semigroup, Abelian};
use crate::lattice::Lattice;
pub trait Iterate<'scope, T: Timestamp + Lattice, D: Data, R: Semigroup> {
fn iterate<F>(self, logic: F) -> VecCollection<'scope, T, D, R>
where
for<'inner> F: FnOnce(Iterative<'inner, T, u64>, VecCollection<'inner, Product<T, u64>, D, R>)->VecCollection<'inner, Product<T, u64>, D, R>;
}
impl<'scope, T: Timestamp + Lattice, D: Ord+Data+Debug, R: Abelian+'static> Iterate<'scope, T, D, R> for VecCollection<'scope, T, D, R> {
fn iterate<F>(self, logic: F) -> VecCollection<'scope, T, D, R>
where
for<'inner> F: FnOnce(Iterative<'inner, T, u64>, VecCollection<'inner, Product<T, u64>, D, R>)->VecCollection<'inner, Product<T, u64>, D, R>,
{
let outer = self.inner.scope();
outer.scoped("Iterate", |subgraph| {
let (variable, collection) = Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1));
let result = logic(subgraph, collection);
variable.set(result.clone());
result.leave(outer)
})
}
}
impl<'scope, T: Timestamp + Lattice, D: Ord+Data+Debug, R: Semigroup+'static> Iterate<'scope, T, D, R> for Scope<'scope, T> {
fn iterate<F>(self, logic: F) -> VecCollection<'scope, T, D, R>
where
for<'inner> F: FnOnce(Iterative<'inner, T, u64>, VecCollection<'inner, Product<T, u64>, D, R>)->VecCollection<'inner, Product<T, u64>, D, R>,
{
let outer = self;
self.scoped("Iterate", |subgraph| {
let (variable, collection) = Variable::new(subgraph, Product::new(Default::default(), 1));
let result = logic(subgraph, collection);
variable.set(result.clone());
result.leave(outer)
}
)
}
}
pub struct Variable<'scope, T, C>
where
T: Timestamp + Lattice,
C: Container,
{
feedback: Handle<'scope, T, C>,
source: Option<Collection<'scope, T, C>>,
step: T::Summary,
}
pub type VecVariable<'scope, T, D, R> = Variable<'scope, T, Vec<(D, T, R)>>;
impl<'scope, T, C: Container> Variable<'scope, T, C>
where
T: Timestamp + Lattice,
C: crate::collection::containers::ResultsIn<T::Summary>,
{
pub fn new(scope: Scope<'scope, T>, step: T::Summary) -> (Self, Collection<'scope, T, C>) {
let (feedback, updates) = scope.feedback(step.clone());
let collection = Collection::<T, C>::new(updates);
(Self { feedback, source: None, step }, collection)
}
pub fn new_from(source: Collection<'scope, T, C>, step: T::Summary) -> (Self, Collection<'scope, T, C>) where C: Clone + crate::collection::containers::Negate {
let (feedback, updates) = source.inner.scope().feedback(step.clone());
let collection = Collection::<T, C>::new(updates).concat(source.clone());
(Variable { feedback, source: Some(source.negate()), step }, collection)
}
pub fn set(mut self, mut result: Collection<'scope, T, C>) {
if let Some(source) = self.source.take() {
result = result.concat(source);
}
result
.results_in(self.step)
.inner
.connect_loop(self.feedback);
}
}