differential_dataflow/algorithms/graphs/sequential.rs
1//! Sequential (non-concurrent) graph algorithms.
2
3use std::hash::Hash;
4
5use timely::dataflow::*;
6
7use crate::{Collection, ExchangeData};
8use crate::lattice::Lattice;
9use crate::operators::*;
10use crate::hashable::Hashable;
11
12fn _color<G, N>(edges: &Collection<G, (N,N)>) -> Collection<G,(N,Option<u32>)>
13where
14 G: Scope<Timestamp: Lattice+Ord>,
15 N: ExchangeData+Hash,
16{
17 // need some bogus initial values.
18 let start = edges.map(|(x,_y)| (x,u32::max_value()))
19 .distinct();
20
21 // repeatedly apply color-picking logic.
22 sequence(&start, edges, |_node, vals| {
23
24 // look for the first absent positive integer.
25 // start at 1 in case we ever use NonZero<u32>.
26
27 (1u32 ..)
28 .find(|&i| vals.get(i as usize - 1).map(|x| *x.0) != Some(i))
29 .unwrap()
30 })
31}
32
33/// Applies `logic` to nodes sequentially, in order of node identifiers.
34///
35/// The `logic` function updates a node's state as a function of its
36/// neighbor states. It will only be called on complete input.
37///
38/// Internally, this method performs a fixed-point computation in which
39/// a node "fires" once all of its neighbors with lower identifier have
40/// fired, and we apply `logic` to the new state of lower neighbors and
41/// the old state (input) of higher neighbors.
42pub fn sequence<G, N, V, F>(
43 state: &Collection<G, (N,V)>,
44 edges: &Collection<G, (N,N)>,
45 logic: F) -> Collection<G, (N,Option<V>)>
46where
47 G: Scope<Timestamp: Lattice+Hash+Ord>,
48 N: ExchangeData+Hashable,
49 V: ExchangeData,
50 F: Fn(&N, &[(&V, isize)])->V+'static
51{
52
53 // start iteration with None messages for all.
54 state
55 .map(|(node, _state)| (node, None))
56 .iterate(|new_state| {
57 // immutable content: edges and initial state.
58 let edges = edges.enter(&new_state.scope());
59 let old_state = state.enter(&new_state.scope());
60 // .map(|x| (x.0, Some(x.1)));
61
62 // break edges into forward and reverse directions.
63 let forward = edges.filter(|edge| edge.0 < edge.1);
64 let reverse = edges.filter(|edge| edge.0 > edge.1);
65
66 // new state goes along forward edges, old state along reverse edges
67 let new_messages = new_state.join_map(&forward, |_k,v,d| (d.clone(),v.clone()));
68
69 let incomplete = new_messages.filter(|x| x.1.is_none()).map(|x| x.0).distinct();
70 let new_messages = new_messages.filter(|x| x.1.is_some()).map(|x| (x.0, x.1.unwrap()));
71
72 let old_messages = old_state.join_map(&reverse, |_k,v,d| (d.clone(),v.clone()));
73
74 let messages = new_messages.concat(&old_messages).antijoin(&incomplete);
75
76 // // determine who has incoming `None` messages, and suppress all of them.
77 // let incomplete = new_messages.filter(|x| x.1.is_none()).map(|x| x.0).distinct();
78
79 // merge messages; suppress computation if not all inputs available yet.
80 messages
81 // .concat(&old_messages) // /-- possibly too clever: None if any inputs None.
82 // .antijoin(&incomplete)
83 .reduce(move |k, vs, t| t.push((Some(logic(k,vs)),1)))
84 .concat(&incomplete.map(|x| (x, None)))
85 })
86}