use std::hash::Hash;
use timely::progress::Timestamp;
use crate::{VecCollection, ExchangeData};
use crate::lattice::Lattice;
use crate::difference::{Abelian, Multiply};
pub fn propagate<'scope, T, N, L, R>(edges: VecCollection<'scope, T, (N,N), R>, nodes: VecCollection<'scope, T,(N,L),R>) -> VecCollection<'scope, T,(N,L),R>
where
T: Timestamp + Lattice + Ord + Hash,
N: ExchangeData+Hash,
R: ExchangeData+Abelian,
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
{
propagate_core(edges.arrange_by_key(), nodes, |_label| 0)
}
pub fn propagate_at<'scope, T, N, L, F, R>(edges: VecCollection<'scope, T, (N,N), R>, nodes: VecCollection<'scope, T,(N,L),R>, logic: F) -> VecCollection<'scope, T,(N,L),R>
where
T: Timestamp + Lattice + Ord + Hash,
N: ExchangeData+Hash,
R: ExchangeData+Abelian,
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
F: Fn(&L)->u64+Clone+'static,
{
propagate_core(edges.arrange_by_key(), nodes, logic)
}
use crate::trace::TraceReader;
use crate::operators::arrange::arrangement::Arranged;
pub fn propagate_core<'scope, N, L, Tr, F, R>(edges: Arranged<'scope, Tr>, nodes: VecCollection<'scope, Tr::Time,(N,L),R>, logic: F) -> VecCollection<'scope, Tr::Time,(N,L),R>
where
N: ExchangeData+Hash,
R: ExchangeData+Abelian,
R: Multiply<R, Output=R>,
R: From<i8>,
L: ExchangeData,
Tr: for<'a> TraceReader<Key<'a>=&'a N, Val<'a>=&'a N, Diff=R, Time: Hash>+Clone+'static,
F: Fn(&L)->u64+Clone+'static,
{
use timely::order::Product;
let outer = nodes.scope();
outer.scoped::<Product<_, usize>,_,_>("Propagate", |scope| {
use crate::operators::iterate::Variable;
use crate::trace::implementations::{ValBuilder, ValSpine};
use timely::order::Product;
let edges = edges.enter(scope);
let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));
let (proposals_bind, proposals) = Variable::new(scope, Product::new(Default::default(), 1usize));
let labels =
proposals
.concat(nodes)
.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));
let propagate: VecCollection<_, (N, L), R> =
labels
.clone()
.join_core(edges, |_k, l: &L, d| Some((d.clone(), l.clone())));
proposals_bind.set(propagate);
labels
.as_collection(|k,v| (k.clone(), v.clone()))
.leave(outer)
})
}