differential_dataflow/algorithms/graphs/
sequential.rs1use std::hash::Hash;
4
5use timely::progress::Timestamp;
6
7use crate::{VecCollection, ExchangeData};
8use crate::lattice::Lattice;
9use crate::operators::*;
10use crate::hashable::Hashable;
11
12fn _color<'scope, T, N>(edges: VecCollection<'scope, T, (N,N)>) -> VecCollection<'scope, T,(N,Option<u32>)>
13where
14 T: Timestamp + Lattice + Ord + Hash,
15 N: ExchangeData+Hash,
16{
17 let start = edges.clone()
19 .map(|(x,_y)| (x,u32::max_value()))
20 .distinct();
21
22 sequence(start, edges, |_node, vals| {
24
25 (1u32 ..)
29 .find(|&i| vals.get(i as usize - 1).map(|x| *x.0) != Some(i))
30 .unwrap()
31 })
32}
33
34pub fn sequence<'scope, T, N, V, F>(
44 state: VecCollection<'scope, T, (N,V)>,
45 edges: VecCollection<'scope, T, (N,N)>,
46 logic: F) -> VecCollection<'scope, T, (N,Option<V>)>
47where
48 T: Timestamp + Lattice + Hash + Ord,
49 N: ExchangeData+Hashable,
50 V: ExchangeData,
51 F: Fn(&N, &[(&V, isize)])->V+'static
52{
53
54 state
56 .clone()
57 .map(|(node, _state)| (node, None))
58 .iterate(|scope, new_state| {
59 let edges = edges.enter(scope);
61 let old_state = state.enter(scope);
62 let forward = edges.clone().filter(|edge| edge.0 < edge.1);
66 let reverse = edges.filter(|edge| edge.0 > edge.1);
67
68 let new_messages = new_state.join_map(forward, |_k,v,d| (d.clone(),v.clone()));
70
71 let incomplete = new_messages.clone().filter(|x| x.1.is_none()).map(|x| x.0).distinct();
72 let new_messages = new_messages.filter(|x| x.1.is_some()).map(|x| (x.0, x.1.unwrap()));
73
74 let old_messages = old_state.join_map(reverse, |_k,v,d| (d.clone(),v.clone()));
75
76 let messages = new_messages.concat(old_messages).antijoin(incomplete.clone());
77
78 messages
83 .reduce(move |k, vs, t| t.push((Some(logic(k,vs)),1)))
86 .concat(incomplete.map(|x| (x, None)))
87 })
88}