1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
use std::collections::HashMap;
use timely::dataflow::operators::{ToStream, Concat, Feedback, ConnectLoop};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Exchange;
fn main() {
// command-line args: numbers of nodes and edges in the random graph.
let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
// let logging = ::timely::logging::to_tcp_socket();
timely::execute_from_args(std::env::args().skip(3), move |worker| {
let index = worker.index();
let peers = worker.peers();
// pending edges and node updates.
let mut edge_list = Vec::new();
let mut node_lists = HashMap::new();
// graph data; offsets into targets.
let mut offsets = Vec::new();
let mut targets = Vec::new();
// holds the bfs parent of each node, or u32::MAX if unset.
let mut done = vec![u32::MAX; 1 + (nodes / peers)];
let start = std::time::Instant::now();
worker.dataflow::<usize,_,_>(move |scope| {
// generate part of a random graph.
use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
let hasher = BuildHasherDefault::<DefaultHasher>::new();
let graph =
(0..edges/peers)
.map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
hasher.hash_one(&(i,index,1)) as usize % nodes))
.map(|(src,dst)| (src as u32, dst as u32))
.to_stream(scope)
.container::<Vec<_>>();
// define a loop variable, for the (node, worker) pairs.
let (handle, stream) = scope.feedback(1usize);
// use the stream of edges
graph.binary_notify(
stream,
Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
"BFS",
vec![],
move |input1, input2, output, notify| {
// receive edges, start to sort them
input1.for_each_time(|time, data| {
notify.notify_at(time.retain(output.output_index()));
edge_list.extend(data.map(std::mem::take));
});
// receive (node, worker) pairs, note any new ones.
input2.for_each_time(|time, data| {
node_lists.entry(*time.time())
.or_insert_with(|| {
notify.notify_at(time.retain(output.output_index()));
Vec::new()
})
.extend(data.map(std::mem::take));
});
notify.for_each(|time, _num, _notify| {
// maybe process the graph
if *time == 0 {
// print some diagnostic timing information
if index == 0 { println!("{:?}:\tsorting", start.elapsed()); }
// sort the edges (previously: radix sorted).
edge_list.sort();
let mut count = 0;
for buffer in &edge_list { count += buffer.len(); }
// allocate sufficient memory, to avoid resizing.
offsets = Vec::with_capacity(1 + (nodes / peers));
targets = Vec::with_capacity(count);
// construct the graph
offsets.push(0);
let mut prev_node = 0;
for buffer in edge_list.drain(..) {
for (node, edge) in buffer {
let temp = node / peers as u32;
while prev_node < temp {
prev_node += 1;
offsets.push(targets.len() as u32)
}
targets.push(edge);
}
}
while offsets.len() < offsets.capacity() {
offsets.push(targets.len() as u32);
}
}
// print some diagnostic timing information
if index == 0 { println!("{:?}:\ttime: {:?}", start.elapsed(), time.time()); }
if let Some(mut todo) = node_lists.remove(&time) {
let mut session = output.session(&time);
// we could sort these, or not (previously: radix sorted).
// todo.sort();
for buffer in todo.drain(..) {
for (node, prev) in buffer {
let temp = (node as usize) / peers;
if done[temp] == u32::MAX {
done[temp] = prev;
let lower = offsets[temp] as usize;
let upper = offsets[temp + 1] as usize;
for &target in &targets[lower..upper] {
session.give((target, node));
}
}
}
}
}
});
}
)
.concat((0..1).map(|x| (x,x)).to_stream(scope))
.connect_loop(handle);
});
}).unwrap(); // asserts error-free execution;
}