1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
//! Directed label reachability.

use std::hash::Hash;
use std::ops::Mul;

use timely::dataflow::*;

use ::{Collection, ExchangeData};
use ::operators::*;
use ::lattice::Lattice;
use ::difference::Abelian;
use ::operators::arrange::arrangement::ArrangeByKey;

/// Propagates labels forward along `edges`, retaining the minimum label at each node.
///
/// Every label is scheduled for the very first round, so all labels spread simultaneously,
/// as in standard label propagation. For finer control over when labels are introduced, or to
/// re-use an existing arrangement of the edges, use `propagate_at` or `propagate_core`.
pub fn propagate<G, N, L, R>(edges: &Collection<G, (N,N), R>, nodes: &Collection<G,(N,L),R>) -> Collection<G,(N,L),R>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    N: ExchangeData+Hash,
    R: ExchangeData+Abelian,
    R: Mul<R, Output=R>,
    R: From<i8>,
    L: ExchangeData,
{
    // A constant schedule of zero places every label in the first round.
    let introduce_immediately = |_: &L| 0u64;
    let arranged_edges = edges.arrange_by_key();
    propagate_core(&arranged_edges, nodes, introduce_immediately)
}

/// Propagates labels forward, retaining the minimum label, with control over when labels start.
///
/// Like `propagate`, but `logic` assigns each label a `u64` that schedules the iteration in which
/// the label is introduced. A label whose value has `b` significant bits (so `b` lies in `[0, 64]`,
/// and the value `0` has `b = 0`) enters at inner iteration `256 * b`. Returning `0` for every
/// label recovers the behavior of `propagate`.
///
/// The edge collection is arranged internally on each call. To re-use an existing arrangement of
/// the edges, call `propagate_core` directly.
pub fn propagate_at<G, N, L, F, R>(edges: &Collection<G, (N,N), R>, nodes: &Collection<G,(N,L),R>, logic: F) -> Collection<G,(N,L),R>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    N: ExchangeData+Hash,
    R: ExchangeData+Abelian,
    R: Mul<R, Output=R>,
    R: From<i8>,
    L: ExchangeData,
    F: Fn(&L)->u64+Clone+'static,
{
    propagate_core(&edges.arrange_by_key(), nodes, logic)
}

use trace::TraceReader;
use operators::arrange::arrangement::Arranged;

/// Propagates labels forward, retaining the minimum label.
///
/// This variant takes a pre-arranged edge collection, to facilitate re-use, and allows
/// a method `logic` to specify the rounds in which the various labels are introduced.
///
/// `logic` may return any `u64`. A label whose value has `b` significant bits
/// (`b = 64 - value.leading_zeros()`, so `b` lies in `[0, 64]` and the value `0` has `b = 0`)
/// is introduced at inner iteration `256 * b`. Returning `0` for every label introduces all
/// labels at once.
///
/// Labels flow from each edge's key to its value. A node labelled `l` with an arranged edge
/// `(node, dst)` proposes `l` to `dst`.
pub fn propagate_core<G, N, L, Tr, F, R>(edges: &Arranged<G,Tr>, nodes: &Collection<G,(N,L),R>, logic: F) -> Collection<G,(N,L),R>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    N: ExchangeData+Hash,
    R: ExchangeData+Abelian,
    R: Mul<R, Output=R>,
    R: From<i8>,
    L: ExchangeData,
    Tr: TraceReader<Key=N, Val=N, Time=G::Timestamp, R=R>+Clone+'static,
    Tr::Batch: crate::trace::BatchReader<N, N, G::Timestamp, Tr::R>+'static,
    Tr::Cursor: crate::trace::Cursor<N, N, G::Timestamp, Tr::R>+'static,
    F: Fn(&L)->u64+Clone+'static,
{
    // Morally the code performs the following iterative computation. However, in the interest of a simplified
    // dataflow graph and reduced memory footprint we instead have a wordier version below. The core differences
    // between the two are that 1. the former filters its input and pretends to perform non-monotonic computation,
    // whereas the latter creates an initially empty monotonic iteration variable, and 2. the latter rotates the
    // iterative computation so that the arrangement produced by `reduce` can be re-used.

    // nodes.filter(|_| false)
    //      .iterate(|inner| {
    //          let edges = edges.enter(&inner.scope());
    //          let nodes = nodes.enter_at(&inner.scope(), move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as u64));
    //          inner.join_map(&edges, |_k,l,d| (d.clone(),l.clone()))
    //               .concat(&nodes)
    //               .reduce(|_, s, t| t.push((s[0].0.clone(), 1)))
    //      })

    nodes.scope().iterative::<usize,_,_>(|scope| {

        use crate::operators::reduce::ReduceCore;
        use crate::operators::iterate::SemigroupVariable;
        use crate::trace::implementations::ord::OrdValSpine as DefaultValTrace;

        use timely::order::Product;

        let edges = edges.enter(scope);
        // Schedule each label's introduction: `64 - leading_zeros` is the bit length `b` of
        // `logic(label)` (in `[0, 64]`), and the label enters at inner iteration `256 * b`.
        let nodes = nodes.enter_at(scope, move |r| 256 * (64 - (logic(&r.1)).leading_zeros() as usize));

        // Labels proposed by neighbours. The variable starts empty, and whatever is `set` into it
        // re-enters the loop one inner iteration later.
        let proposals = SemigroupVariable::new(scope, Product::new(Default::default(), 1usize));

        // Per node, keep only the minimum label among the proposals and the node's own label.
        let labels =
        proposals
            .concat(&nodes)
            .reduce_abelian::<_,DefaultValTrace<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1i8))));

        // Push each node's current label along its edges: `(node, label)` joined with
        // `(node, dst)` yields the proposal `(dst, label)`.
        let propagate: Collection<_, (N, L), R> =
        labels
            .join_core(&edges, |_k, l: &L, d| Some((d.clone(), l.clone())));

        // Close the loop: this iteration's proposals feed the next iteration's reduce.
        proposals.set(&propagate);

        labels
            .as_collection(|k,v| (k.clone(), v.clone()))
            .leave()
    })
}