use std::hash::Hash;
use std::ops::Mul;
use timely::dataflow::*;
use ::{Collection, ExchangeData};
use ::operators::*;
use ::lattice::Lattice;
use ::difference::Abelian;
use ::operators::arrange::arrangement::ArrangeByKey;
/// Propagates node labels along `edges`, introducing every label in the first round.
///
/// This is a convenience wrapper: it arranges `edges` by key and hands the work to
/// `propagate_core` with a `logic` that maps every label to `0`, so no label has its
/// introduction into the iteration delayed.
///
/// # Parameters
///
/// * `edges` - `(source, destination)` pairs; arranged by their first component here.
/// * `nodes` - `(node, label)` pairs providing the initial labels.
///
/// # Returns
///
/// The `(node, label)` collection produced by `propagate_core`.
pub fn propagate<G, N, L, R>(edges: &Collection<G, (N,N), R>, nodes: &Collection<G,(N,L),R>) -> Collection<G,(N,L),R>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    N: ExchangeData+Hash,
    R: ExchangeData+Abelian,
    R: Mul<R, Output=R>,
    R: From<i8>,
    L: ExchangeData,
{
    // Build the keyed arrangement once, then let the core routine consume it.
    let arranged_edges = edges.arrange_by_key();

    // A constant `0` has 64 leading zeros, which the core routine turns into round 0.
    propagate_core(&arranged_edges, nodes, |_label| 0)
}
/// Propagates node labels along `edges`, with `logic` controlling when each label is introduced.
///
/// This is a convenience wrapper: it arranges `edges` by key and hands the work to
/// `propagate_core`, forwarding `logic` unchanged. `propagate_core` converts the `u64`
/// that `logic` returns for a label into the iteration round at which that label enters.
///
/// # Parameters
///
/// * `edges` - `(source, destination)` pairs; arranged by their first component here.
/// * `nodes` - `(node, label)` pairs providing the initial labels.
/// * `logic` - maps a label to the `u64` used to schedule its introduction.
///
/// # Returns
///
/// The `(node, label)` collection produced by `propagate_core`.
pub fn propagate_at<G, N, L, F, R>(edges: &Collection<G, (N,N), R>, nodes: &Collection<G,(N,L),R>, logic: F) -> Collection<G,(N,L),R>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    N: ExchangeData+Hash,
    R: ExchangeData+Abelian,
    R: Mul<R, Output=R>,
    R: From<i8>,
    L: ExchangeData,
    F: Fn(&L)->u64+Clone+'static,
{
    // Build the keyed arrangement once, then let the core routine consume it.
    let arranged_edges = edges.arrange_by_key();

    propagate_core(&arranged_edges, nodes, logic)
}
use trace::TraceReader;
use operators::arrange::arrangement::Arranged;
/// Iteratively pushes labels from `nodes` across a pre-arranged edge collection.
///
/// The computation runs inside an iterative scope whose round counter is a `usize`:
///
/// 1. Each `(node, label)` record of `nodes` enters the scope at round `256 * b`, where
///    `b = 64 - logic(&label).leading_zeros()` is the bit length of `logic`'s output
///    (`0` for an output of `0`, up to `64` when the top bit is set).
/// 2. For every node, a reduction over the entering labels and the fed-back proposals
///    keeps the label found at the front of the reduction input, with multiplicity
///    `R::from(1)`. NOTE(review): this is presumably the least label under `L`'s
///    ordering, assuming the reduce operator presents values in sorted order; confirm.
/// 3. The kept labels are joined with `edges` on the node key, proposing each label to
///    the edge's value (its destination), and those proposals are fed back into step 2.
///
/// # Parameters
///
/// * `edges` - arrangement keyed by an edge's source node with the destination as value.
/// * `nodes` - `(node, label)` pairs providing the initial labels.
/// * `logic` - maps a label to the `u64` whose bit length schedules its introduction.
///
/// # Returns
///
/// The kept `(node, label)` records, brought back out of the iterative scope.
pub fn propagate_core<G, N, L, Tr, F, R>(edges: &Arranged<G,Tr>, nodes: &Collection<G,(N,L),R>, logic: F) -> Collection<G,(N,L),R>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    N: ExchangeData+Hash,
    R: ExchangeData+Abelian,
    R: Mul<R, Output=R>,
    R: From<i8>,
    L: ExchangeData,
    Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=R>+Clone+'static,
    Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
    Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
    F: Fn(&L)->u64+Clone+'static,
{
    nodes.scope().iterative::<usize,_,_>(|scope| {

        use timely::order::Product;

        use crate::operators::iterate::SemigroupVariable;
        use crate::operators::reduce::ReduceCore;
        use crate::trace::implementations::ord::OrdValSpine as DefaultValTrace;

        // Bring both inputs into the iterative scope. Labels are staggered: the bit
        // length of `logic`'s output selects a block of 256 rounds to start in.
        let inner_edges = edges.enter(scope);
        let inner_nodes = nodes.enter_at(scope, move |record| {
            let bit_length = 64 - (logic(&record.1)).leading_zeros() as usize;
            256 * bit_length
        });

        // Feedback variable holding the labels proposed to each node by its neighbours.
        // NOTE(review): the summary presumably advances the round counter by one; confirm.
        let step = Product::new(Default::default(), 1usize);
        let proposals = SemigroupVariable::new(scope, step);

        // Per node, keep only the label at the front of the reduction input.
        let candidates = proposals.concat(&inner_nodes);
        let labels = candidates.reduce_abelian::<_,DefaultValTrace<_,_,_,_>>(
            "Propagate",
            |_node, input, output| {
                let kept = input[0].0.clone();
                output.push((kept, R::from(1 as i8)));
            },
        );

        // Offer each kept label to the destinations of the node's outgoing edges.
        let next_proposals: Collection<_, (N, L), R> = labels.join_core(
            &inner_edges,
            |_node, label: &L, dest| Some((dest.clone(), label.clone())),
        );

        proposals.set(&next_proposals);

        // The result is the arrangement of kept labels, not the proposals.
        let kept_labels = labels.as_collection(|node, label| (node.clone(), label.clone()));
        kept_labels.leave()
    })
}