Skip to main content

palimpsest_dataflow/algorithms/graphs/
sequential.rs

1//! Sequential (non-concurrent) graph algorithms.
2
3use std::hash::Hash;
4
5use timely::dataflow::*;
6
7use crate::hashable::Hashable;
8use crate::lattice::Lattice;
9use crate::operators::*;
10use crate::{ExchangeData, VecCollection};
11
12fn _color<G, N>(edges: &VecCollection<G, (N, N)>) -> VecCollection<G, (N, Option<u32>)>
13where
14    G: Scope<Timestamp: Lattice + Ord + Hash>,
15    N: ExchangeData + Hash,
16{
17    // need some bogus initial values.
18    let start = edges.map(|(x, _y)| (x, u32::max_value())).distinct();
19
20    // repeatedly apply color-picking logic.
21    sequence(&start, edges, |_node, vals| {
22        // look for the first absent positive integer.
23        // start at 1 in case we ever use NonZero<u32>.
24
25        (1u32..)
26            .find(|&i| vals.get(i as usize - 1).map(|x| *x.0) != Some(i))
27            .unwrap()
28    })
29}
30
31/// Applies `logic` to nodes sequentially, in order of node identifiers.
32///
33/// The `logic` function updates a node's state as a function of its
34/// neighbor states. It will only be called on complete input.
35///
36/// Internally, this method performs a fixed-point computation in which
37/// a node "fires" once all of its neighbors with lower identifier have
38/// fired, and we apply `logic` to the new state of lower neighbors and
39/// the old state (input) of higher neighbors.
40pub fn sequence<G, N, V, F>(
41    state: &VecCollection<G, (N, V)>,
42    edges: &VecCollection<G, (N, N)>,
43    logic: F,
44) -> VecCollection<G, (N, Option<V>)>
45where
46    G: Scope<Timestamp: Lattice + Hash + Ord>,
47    N: ExchangeData + Hashable,
48    V: ExchangeData,
49    F: Fn(&N, &[(&V, isize)]) -> V + 'static,
50{
51    // start iteration with None messages for all.
52    state
53        .map(|(node, _state)| (node, None))
54        .iterate(|new_state| {
55            // immutable content: edges and initial state.
56            let edges = edges.enter(&new_state.scope());
57            let old_state = state.enter(&new_state.scope());
58            // .map(|x| (x.0, Some(x.1)));
59
60            // break edges into forward and reverse directions.
61            let forward = edges.filter(|edge| edge.0 < edge.1);
62            let reverse = edges.filter(|edge| edge.0 > edge.1);
63
64            // new state goes along forward edges, old state along reverse edges
65            let new_messages = new_state.join_map(&forward, |_k, v, d| (d.clone(), v.clone()));
66
67            let incomplete = new_messages
68                .filter(|x| x.1.is_none())
69                .map(|x| x.0)
70                .distinct();
71            let new_messages = new_messages
72                .filter(|x| x.1.is_some())
73                .map(|x| (x.0, x.1.unwrap()));
74
75            let old_messages = old_state.join_map(&reverse, |_k, v, d| (d.clone(), v.clone()));
76
77            let messages = new_messages.concat(&old_messages).antijoin(&incomplete);
78
79            // // determine who has incoming `None` messages, and suppress all of them.
80            // let incomplete = new_messages.filter(|x| x.1.is_none()).map(|x| x.0).distinct();
81
82            // merge messages; suppress computation if not all inputs available yet.
83            messages
84                // .concat(&old_messages)  // /-- possibly too clever: None if any inputs None.
85                // .antijoin(&incomplete)
86                .reduce(move |k, vs, t| t.push((Some(logic(k,vs)),1)))
87                .concat(&incomplete.map(|x| (x, None)))
88        })
89}