use std::collections::HashMap;

use timely::dataflow::operators::{ToStream, Concat, Feedback, ConnectLoop};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Exchange;

7fn main() {
8
9 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
11 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
12
13 timely::execute_from_args(std::env::args().skip(3), move |worker| {
15
16 let index = worker.index();
17 let peers = worker.peers();
18
19 let mut edge_list = Vec::new();
21 let mut node_lists = HashMap::new();
22
23 let mut offsets = Vec::new();
25 let mut targets = Vec::new();
26
27 let mut done = vec![u32::MAX; 1 + (nodes / peers)];
29
30 let start = std::time::Instant::now();
31
32 worker.dataflow::<usize,_,_>(move |scope| {
33
34 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
36 let hasher = BuildHasherDefault::<DefaultHasher>::new();
37 let graph =
38 (0..edges/peers)
39 .map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
40 hasher.hash_one(&(i,index,1)) as usize % nodes))
41 .map(|(src,dst)| (src as u32, dst as u32))
42 .to_stream(scope)
43 .container::<Vec<_>>();
44
45 let (handle, stream) = scope.feedback(1usize);
47
48 graph.binary_notify(
50 stream,
51 Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
52 Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
53 "BFS",
54 vec![],
55 move |input1, input2, output, notify| {
56
57 input1.for_each_time(|time, data| {
59 notify.notify_at(time.retain(output.output_index()));
60 edge_list.extend(data.map(std::mem::take));
61 });
62
63 input2.for_each_time(|time, data| {
65 node_lists.entry(*time.time())
66 .or_insert_with(|| {
67 notify.notify_at(time.retain(output.output_index()));
68 Vec::new()
69 })
70 .extend(data.map(std::mem::take));
71 });
72
73 notify.for_each(|time, _num, _notify| {
74
75 if *time == 0 {
77
78 if index == 0 { println!("{:?}:\tsorting", start.elapsed()); }
80
81 edge_list.sort();
83
84 let mut count = 0;
85 for buffer in &edge_list { count += buffer.len(); }
86
87 offsets = Vec::with_capacity(1 + (nodes / peers));
89 targets = Vec::with_capacity(count);
90
91 offsets.push(0);
93 let mut prev_node = 0;
94 for buffer in edge_list.drain(..) {
95 for (node, edge) in buffer {
96 let temp = node / peers as u32;
97 while prev_node < temp {
98 prev_node += 1;
99 offsets.push(targets.len() as u32)
100 }
101 targets.push(edge);
102 }
103 }
104 while offsets.len() < offsets.capacity() {
105 offsets.push(targets.len() as u32);
106 }
107 }
108
109 if index == 0 { println!("{:?}:\ttime: {:?}", start.elapsed(), time.time()); }
111
112 if let Some(mut todo) = node_lists.remove(&time) {
113 let mut session = output.session(&time);
114
115 for buffer in todo.drain(..) {
119 for (node, prev) in buffer {
120 let temp = (node as usize) / peers;
121 if done[temp] == u32::MAX {
122 done[temp] = prev;
123 let lower = offsets[temp] as usize;
124 let upper = offsets[temp + 1] as usize;
125 for &target in &targets[lower..upper] {
126 session.give((target, node));
127 }
128 }
129 }
130 }
131 }
132 });
133 }
134 )
135 .concat((0..1).map(|x| (x,x)).to_stream(scope))
136 .connect_loop(handle);
137 });
138 }).unwrap(); }