Skip to main content

differential_dataflow/algorithms/graphs/
sequential.rs

1//! Sequential (non-concurrent) graph algorithms.
2
3use std::hash::Hash;
4
5use timely::dataflow::*;
6
7use crate::{VecCollection, ExchangeData};
8use crate::lattice::Lattice;
9use crate::operators::*;
10use crate::hashable::Hashable;
11
12fn _color<G, N>(edges: VecCollection<G, (N,N)>) -> VecCollection<G,(N,Option<u32>)>
13where
14    G: Scope<Timestamp: Lattice+Ord+Hash>,
15    N: ExchangeData+Hash,
16{
17    // need some bogus initial values.
18    let start = edges.clone()
19                     .map(|(x,_y)| (x,u32::max_value()))
20                     .distinct();
21
22    // repeatedly apply color-picking logic.
23    sequence(start, edges, |_node, vals| {
24
25        // look for the first absent positive integer.
26        // start at 1 in case we ever use NonZero<u32>.
27
28        (1u32 ..)
29            .find(|&i| vals.get(i as usize - 1).map(|x| *x.0) != Some(i))
30            .unwrap()
31    })
32}
33
34/// Applies `logic` to nodes sequentially, in order of node identifiers.
35///
36/// The `logic` function updates a node's state as a function of its
37/// neighbor states. It will only be called on complete input.
38///
39/// Internally, this method performs a fixed-point computation in which
40/// a node "fires" once all of its neighbors with lower identifier have
41/// fired, and we apply `logic` to the new state of lower neighbors and
42/// the old state (input) of higher neighbors.
43pub fn sequence<G, N, V, F>(
44    state: VecCollection<G, (N,V)>,
45    edges: VecCollection<G, (N,N)>,
46    logic: F) -> VecCollection<G, (N,Option<V>)>
47where
48    G: Scope<Timestamp: Lattice+Hash+Ord>,
49    N: ExchangeData+Hashable,
50    V: ExchangeData,
51    F: Fn(&N, &[(&V, isize)])->V+'static
52{
53
54    // start iteration with None messages for all.
55    state
56        .clone()
57        .map(|(node, _state)| (node, None))
58        .iterate(|scope, new_state| {
59            // immutable content: edges and initial state.
60            let edges = edges.enter(&scope);
61            let old_state = state.enter(&scope);
62                                 // .map(|x| (x.0, Some(x.1)));
63
64            // break edges into forward and reverse directions.
65            let forward = edges.clone().filter(|edge| edge.0 < edge.1);
66            let reverse = edges.filter(|edge| edge.0 > edge.1);
67
68            // new state goes along forward edges, old state along reverse edges
69            let new_messages = new_state.join_map(forward, |_k,v,d| (d.clone(),v.clone()));
70
71            let incomplete = new_messages.clone().filter(|x| x.1.is_none()).map(|x| x.0).distinct();
72            let new_messages = new_messages.filter(|x| x.1.is_some()).map(|x| (x.0, x.1.unwrap()));
73
74            let old_messages = old_state.join_map(reverse, |_k,v,d| (d.clone(),v.clone()));
75
76            let messages = new_messages.concat(old_messages).antijoin(incomplete.clone());
77
78            // // determine who has incoming `None` messages, and suppress all of them.
79            // let incomplete = new_messages.filter(|x| x.1.is_none()).map(|x| x.0).distinct();
80
81            // merge messages; suppress computation if not all inputs available yet.
82            messages
83                // .concat(old_messages)  // /-- possibly too clever: None if any inputs None.
84                // .antijoin(incomplete)
85                .reduce(move |k, vs, t| t.push((Some(logic(k,vs)),1)))
86                .concat(incomplete.map(|x| (x, None)))
87        })
88}