use std::fmt::Debug;
use std::ops::Deref;
use timely::order::Product;
use timely::progress::Timestamp;
use timely::Container;
use timely::dataflow::operators::feedback::Handle;
use timely::dataflow::operators::{ConnectLoop, Feedback};
use timely::dataflow::scopes::child::Iterative;
use timely::dataflow::*;
use crate::difference::{Abelian, Semigroup};
use crate::lattice::Lattice;
use crate::{Collection, Data, VecCollection};
pub trait Iterate<G: Scope<Timestamp: Lattice>, D: Data, R: Semigroup> {
fn iterate<F>(&self, logic: F) -> VecCollection<G, D, R>
where
for<'a> F: FnOnce(
&VecCollection<Iterative<'a, G, u64>, D, R>,
) -> VecCollection<Iterative<'a, G, u64>, D, R>;
}
impl<G: Scope<Timestamp: Lattice>, D: Ord + Data + Debug, R: Abelian + 'static> Iterate<G, D, R>
for VecCollection<G, D, R>
{
fn iterate<F>(&self, logic: F) -> VecCollection<G, D, R>
where
for<'a> F: FnOnce(
&VecCollection<Iterative<'a, G, u64>, D, R>,
) -> VecCollection<Iterative<'a, G, u64>, D, R>,
{
self.inner.scope().scoped("Iterate", |subgraph| {
let variable =
Variable::new_from(self.enter(subgraph), Product::new(Default::default(), 1));
let result = logic(&variable);
variable.set(&result);
result.leave()
})
}
}
impl<G: Scope<Timestamp: Lattice>, D: Ord + Data + Debug, R: Semigroup + 'static> Iterate<G, D, R>
for G
{
fn iterate<F>(&self, logic: F) -> VecCollection<G, D, R>
where
for<'a> F: FnOnce(
&VecCollection<Iterative<'a, G, u64>, D, R>,
) -> VecCollection<Iterative<'a, G, u64>, D, R>,
{
let mut clone = self.clone();
clone.scoped("Iterate", |subgraph| {
let variable = SemigroupVariable::new(subgraph, Product::new(Default::default(), 1));
let result = logic(&variable);
variable.set(&result);
result.leave()
})
}
}
pub struct Variable<G, C>
where
G: Scope<Timestamp: Lattice>,
C: Container,
{
collection: Collection<G, C>,
feedback: Handle<G, C>,
source: Option<Collection<G, C>>,
step: <G::Timestamp as Timestamp>::Summary,
}
pub type VecVariable<G, D, R> = Variable<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;
impl<G, C: Container> Variable<G, C>
where
G: Scope<Timestamp: Lattice>,
C: crate::collection::containers::Negate
+ crate::collection::containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
{
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = scope.feedback(step.clone());
let collection = Collection::<G, C>::new(updates);
Self {
collection,
feedback,
source: None,
step,
}
}
pub fn new_from(source: Collection<G, C>, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = source.inner.scope().feedback(step.clone());
let collection = Collection::<G, C>::new(updates).concat(&source);
Variable {
collection,
feedback,
source: Some(source),
step,
}
}
pub fn set(self, result: &Collection<G, C>) -> Collection<G, C> {
let mut in_result = result.clone();
if let Some(source) = &self.source {
in_result = in_result.concat(&source.negate());
}
self.set_concat(&in_result)
}
pub fn set_concat(self, result: &Collection<G, C>) -> Collection<G, C> {
let step = self.step;
result.results_in(step).inner.connect_loop(self.feedback);
self.collection
}
}
impl<G: Scope<Timestamp: Lattice>, C: Container> Deref for Variable<G, C> {
type Target = Collection<G, C>;
fn deref(&self) -> &Self::Target {
&self.collection
}
}
pub struct SemigroupVariable<G, C>
where
G: Scope<Timestamp: Lattice>,
C: Container,
{
collection: Collection<G, C>,
feedback: Handle<G, C>,
step: <G::Timestamp as Timestamp>::Summary,
}
impl<G, C: Container> SemigroupVariable<G, C>
where
G: Scope<Timestamp: Lattice>,
C: crate::collection::containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
{
pub fn new(scope: &mut G, step: <G::Timestamp as Timestamp>::Summary) -> Self {
let (feedback, updates) = scope.feedback(step.clone());
let collection = Collection::<G, C>::new(updates);
SemigroupVariable {
collection,
feedback,
step,
}
}
pub fn set(self, result: &Collection<G, C>) -> Collection<G, C> {
let step = self.step;
result.results_in(step).inner.connect_loop(self.feedback);
self.collection
}
}
impl<G: Scope, C: Container> Deref for SemigroupVariable<G, C>
where
G::Timestamp: Lattice,
{
type Target = Collection<G, C>;
fn deref(&self) -> &Self::Target {
&self.collection
}
}