// bfs/bfs.rs

1use std::collections::HashMap;
2
3use timely::dataflow::operators::{ToStream, Concat, Feedback, ConnectLoop};
4use timely::dataflow::operators::generic::operator::Operator;
5use timely::dataflow::channels::pact::Exchange;
6
7fn main() {
8
9    // command-line args: numbers of nodes and edges in the random graph.
10    let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
11    let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
12
13    // let logging = ::timely::logging::to_tcp_socket();
14    timely::execute_from_args(std::env::args().skip(3), move |worker| {
15
16        let index = worker.index();
17        let peers = worker.peers();
18
19        // pending edges and node updates.
20        let mut edge_list = Vec::new();
21        let mut node_lists = HashMap::new();
22
23        // graph data; offsets into targets.
24        let mut offsets = Vec::new();
25        let mut targets = Vec::new();
26
27        // holds the bfs parent of each node, or u32::MAX if unset.
28        let mut done = vec![u32::MAX; 1 + (nodes / peers)];
29
30        let start = std::time::Instant::now();
31
32        worker.dataflow::<usize,_,_>(move |scope| {
33
34            // generate part of a random graph.
35            use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
36            let hasher = BuildHasherDefault::<DefaultHasher>::new();
37            let graph =
38            (0..edges/peers)
39                .map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
40                               hasher.hash_one(&(i,index,1)) as usize % nodes))
41                .map(|(src,dst)| (src as u32, dst as u32))
42                .to_stream(scope)
43                .container::<Vec<_>>();
44
45            // define a loop variable, for the (node, worker) pairs.
46            let (handle, stream) = scope.feedback(1usize);
47
48            // use the stream of edges
49            graph.binary_notify(
50                stream,
51                Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
52                Exchange::new(|x: &(u32, u32)| u64::from(x.0)),
53                "BFS",
54                vec![],
55                move |input1, input2, output, notify| {
56
57                    // receive edges, start to sort them
58                    input1.for_each_time(|time, data| {
59                        notify.notify_at(time.retain(output.output_index()));
60                        edge_list.extend(data.map(std::mem::take));
61                    });
62
63                    // receive (node, worker) pairs, note any new ones.
64                    input2.for_each_time(|time, data| {
65                        node_lists.entry(*time.time())
66                                  .or_insert_with(|| {
67                                      notify.notify_at(time.retain(output.output_index()));
68                                      Vec::new()
69                                  })
70                                  .extend(data.map(std::mem::take));
71                    });
72
73                    notify.for_each(|time, _num, _notify| {
74
75                        // maybe process the graph
76                        if *time == 0 {
77
78                            // print some diagnostic timing information
79                            if index == 0 { println!("{:?}:\tsorting", start.elapsed()); }
80
81                            // sort the edges (previously: radix sorted).
82                            edge_list.sort();
83
84                            let mut count = 0;
85                            for buffer in &edge_list { count += buffer.len(); }
86
87                            // allocate sufficient memory, to avoid resizing.
88                            offsets = Vec::with_capacity(1 + (nodes / peers));
89                            targets = Vec::with_capacity(count);
90
91                            // construct the graph
92                            offsets.push(0);
93                            let mut prev_node = 0;
94                            for buffer in edge_list.drain(..) {
95                                for (node, edge) in buffer {
96                                    let temp = node / peers as u32;
97                                    while prev_node < temp {
98                                        prev_node += 1;
99                                        offsets.push(targets.len() as u32)
100                                    }
101                                    targets.push(edge);
102                                }
103                            }
104                            while offsets.len() < offsets.capacity() {
105                                offsets.push(targets.len() as u32);
106                            }
107                        }
108
109                        // print some diagnostic timing information
110                        if index == 0 { println!("{:?}:\ttime: {:?}", start.elapsed(), time.time()); }
111
112                        if let Some(mut todo) = node_lists.remove(&time) {
113                            let mut session = output.session(&time);
114
115                            // we could sort these, or not (previously: radix sorted).
116                            // todo.sort();
117
118                            for buffer in todo.drain(..) {
119                                for (node, prev) in buffer {
120                                    let temp = (node as usize) / peers;
121                                    if done[temp] == u32::MAX {
122                                        done[temp] = prev;
123                                        let lower = offsets[temp] as usize;
124                                        let upper = offsets[temp + 1] as usize;
125                                        for &target in &targets[lower..upper] {
126                                            session.give((target, node));
127                                        }
128                                    }
129                                }
130                            }
131                        }
132                    });
133                }
134            )
135            .concat((0..1).map(|x| (x,x)).to_stream(scope))
136            .connect_loop(handle);
137        });
138    }).unwrap(); // asserts error-free execution;
139}