1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use std::hash::Hash;
use timely::dataflow::*;
use ::{Collection, ExchangeData};
use ::lattice::Lattice;
use ::operators::*;
use hashable::Hashable;
fn _color<G, N>(edges: &Collection<G, (N,N)>) -> Collection<G,(N,Option<u32>)>
where
G: Scope,
G::Timestamp: Lattice+Ord,
N: ExchangeData+Hash,
{
let start = edges.map(|(x,_y)| (x,u32::max_value()))
.distinct();
sequence(&start, &edges, |_node, vals| {
(1u32 ..)
.filter(|&i| vals.get(i as usize - 1).map(|x| *x.0) != Some(i))
.next()
.unwrap()
})
}
pub fn sequence<G, N, V, F>(
state: &Collection<G, (N,V)>,
edges: &Collection<G, (N,N)>,
logic: F) -> Collection<G, (N,Option<V>)>
where
G: Scope,
G::Timestamp: Lattice+Hash+Ord,
N: ExchangeData+Hashable,
V: ExchangeData,
F: Fn(&N, &[(&V, isize)])->V+'static
{
let timer = ::std::time::Instant::now();
state
.map(|(node, _state)| (node, None))
.iterate(|new_state| {
new_state.map(|x| x.1.is_some()).consolidate().inspect(move |x| println!("{:?}\t{:?}", timer.elapsed(), x));
let edges = edges.enter(&new_state.scope());
let old_state = state.enter(&new_state.scope());
let forward = edges.filter(|edge| edge.0 < edge.1);
let reverse = edges.filter(|edge| edge.0 > edge.1);
let new_messages = new_state.join_map(&forward, |_k,v,d| (d.clone(),v.clone()));
let incomplete = new_messages.filter(|x| x.1.is_none()).map(|x| x.0).distinct();
let new_messages = new_messages.filter(|x| x.1.is_some()).map(|x| (x.0, x.1.unwrap()));
let old_messages = old_state.join_map(&reverse, |_k,v,d| (d.clone(),v.clone()));
let messages = new_messages.concat(&old_messages).antijoin(&incomplete);
messages
.reduce(move |k, vs, t| t.push((Some(logic(k,vs)),1)))
.concat(&incomplete.map(|x| (x, None)))
})
}