use std::hash::Hash;
use timely::dataflow::*;
use crate::difference::{Abelian, Multiply};
use crate::lattice::Lattice;
use crate::operators::arrange::arrangement::ArrangeByKey;
use crate::{ExchangeData, VecCollection};
pub fn propagate<G, N, L, R>(
edges: &VecCollection<G, (N, N), R>,
nodes: &VecCollection<G, (N, L), R>,
) -> VecCollection<G, (N, L), R>
where
G: Scope<Timestamp: Lattice + Ord + Hash>,
N: ExchangeData + Hash,
R: ExchangeData + Abelian,
R: Multiply<R, Output = R>,
R: From<i8>,
L: ExchangeData,
{
propagate_core(&edges.arrange_by_key(), nodes, |_label| 0)
}
pub fn propagate_at<G, N, L, F, R>(
edges: &VecCollection<G, (N, N), R>,
nodes: &VecCollection<G, (N, L), R>,
logic: F,
) -> VecCollection<G, (N, L), R>
where
G: Scope<Timestamp: Lattice + Ord + Hash>,
N: ExchangeData + Hash,
R: ExchangeData + Abelian,
R: Multiply<R, Output = R>,
R: From<i8>,
L: ExchangeData,
F: Fn(&L) -> u64 + Clone + 'static,
{
propagate_core(&edges.arrange_by_key(), nodes, logic)
}
use crate::operators::arrange::arrangement::Arranged;
use crate::trace::TraceReader;
pub fn propagate_core<G, N, L, Tr, F, R>(
edges: &Arranged<G, Tr>,
nodes: &VecCollection<G, (N, L), R>,
logic: F,
) -> VecCollection<G, (N, L), R>
where
G: Scope<Timestamp = Tr::Time>,
N: ExchangeData + Hash,
R: ExchangeData + Abelian,
R: Multiply<R, Output = R>,
R: From<i8>,
L: ExchangeData,
Tr: for<'a> TraceReader<Key<'a> = &'a N, Val<'a> = &'a N, Time: Hash, Diff = R>
+ Clone
+ 'static,
F: Fn(&L) -> u64 + Clone + 'static,
{
nodes.scope().iterative::<usize, _, _>(|scope| {
use crate::operators::iterate::SemigroupVariable;
use crate::operators::reduce::ReduceCore;
use crate::trace::implementations::{ValBuilder, ValSpine};
use timely::order::Product;
let edges = edges.enter(scope);
let nodes = nodes.enter_at(scope, move |r| {
256 * (64 - (logic(&r.1)).leading_zeros() as usize)
});
let proposals = SemigroupVariable::new(scope, Product::new(Default::default(), 1usize));
let labels = proposals
.concat(&nodes)
.reduce_abelian::<_, ValBuilder<_, _, _, _>, ValSpine<_, _, _, _>>(
"Propagate",
|_, s, t| t.push((s[0].0.clone(), R::from(1_i8))),
);
let propagate: VecCollection<_, (N, L), R> =
labels.join_core(&edges, |_k, l: &L, d| Some((d.clone(), l.clone())));
proposals.set(&propagate);
labels.as_collection(|k, v| (k.clone(), v.clone())).leave()
})
}