use timely::dataflow::Scope;
use timely::dataflow::scopes::Child;
use timely::dataflow::operators::vec::{Filter, Map};
use differential_dataflow::{AsCollection, VecCollection, Data};
use differential_dataflow::difference::Abelian;
use crate::altneu::AltNeu;
pub trait Differentiate<G: Scope, D: Data, R: Abelian> {
fn differentiate<'a>(self, child: &Child<'a, G, AltNeu<G::Timestamp>>) -> VecCollection<Child<'a, G, AltNeu<G::Timestamp>>, D, R>;
}
pub trait Integrate<G: Scope, D: Data, R: Abelian> {
fn integrate(self) -> VecCollection<G, D, R>;
}
impl<G, D, R> Differentiate<G, D, R> for VecCollection<G, D, R>
where
G: Scope,
D: Data,
R: Abelian + 'static,
{
fn differentiate<'a>(self, child: &Child<'a, G, AltNeu<G::Timestamp>>) -> VecCollection<Child<'a, G, AltNeu<G::Timestamp>>, D, R> {
self.enter(child)
.inner
.flat_map(|(data, time, diff)| {
let mut neg_diff = diff.clone();
neg_diff.negate();
let neu = (data.clone(), AltNeu::neu(time.time.clone()), neg_diff);
let alt = (data, time, diff);
Some(alt).into_iter().chain(Some(neu))
})
.as_collection()
}
}
impl<'a, G, D, R> Integrate<G, D, R> for VecCollection<Child<'a, G, AltNeu<G::Timestamp>>, D, R>
where
G: Scope,
D: Data,
R: Abelian + 'static,
{
fn integrate(self) -> VecCollection<G, D, R> {
self.inner
.filter(|(_d,t,_r)| !t.neu)
.as_collection()
.leave()
}
}