Skip to main content

pagerank/
pagerank.rs

1use std::collections::HashMap;
2
3use timely::dataflow::{InputHandle, ProbeHandle};
4use timely::dataflow::operators::{Feedback, ConnectLoop, Probe};
5use timely::dataflow::operators::generic::Operator;
6use timely::dataflow::channels::pact::Exchange;
7
8fn main() {
9
10    timely::execute_from_args(std::env::args().skip(3), move |worker| {
11
12        let mut input = InputHandle::new();
13        let probe = ProbeHandle::new();
14
15        worker.dataflow::<usize,_,_>(|scope| {
16
17            // create a new input, into which we can push edge changes.
18            let edge_stream = input.to_stream(scope);
19
20            // create a new feedback stream, which will be changes to ranks.
21            let (handle, rank_stream) = scope.feedback(1);
22
23            // bring edges and ranks together!
24            let changes = edge_stream.binary_frontier(
25                rank_stream,
26                Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
27                Exchange::new(|x: &(usize, i64)| x.0 as u64),
28                "PageRank",
29                |_capability, _info| {
30
31                    // where we stash out-of-order data.
32                    let mut edge_stash: HashMap<_, Vec<_>> = HashMap::new();
33                    let mut rank_stash: HashMap<_, Vec<_>> = HashMap::new();
34
35                    // lists of edges, ranks, and changes.
36                    let mut edges = Vec::new();
37                    let mut ranks = Vec::new();
38                    let mut diffs = Vec::new(); // for received but un-acted upon deltas.
39                    let mut delta = Vec::new();
40
41                    let timer = ::std::time::Instant::now();
42
43                    move |(input1, frontier1), (input2, frontier2), output| {
44
45                        // hold on to edge changes until it is time.
46                        input1.for_each_time(|time, data| {
47                            let entry = edge_stash.entry(time.retain(output.output_index())).or_default();
48                            data.for_each(|data| entry.append(data));
49                        });
50
51                        // hold on to rank changes until it is time.
52                        input2.for_each_time(|time, data| {
53                            let entry = rank_stash.entry(time.retain(output.output_index())).or_default();
54                            data.for_each(|data| entry.append(data));
55                        });
56
57                        let frontiers = &[frontier1, frontier2];
58
59                        for (time, edge_changes) in edge_stash.iter_mut() {
60                            if frontiers.iter().all(|f| !f.less_equal(time)) {
61
62                                let mut session = output.session(time);
63
64                                compact(edge_changes);
65
66                                for ((src, dst), diff) in edge_changes.drain(..) {
67
68                                    // 0. ensure enough state allocated
69                                    while edges.len() <= src { edges.push(Vec::new()); }
70                                    while ranks.len() <= src { ranks.push(1_000); }
71                                    while diffs.len() <= src { diffs.push(0); }
72
73                                    // 1. subtract previous distribution.
74                                    allocate(ranks[src], &edges[src][..], &mut delta);
75                                    for x in delta.iter_mut() { x.1 *= -1; }
76
77                                    // 2. update edges.
78                                    edges[src].push((dst, diff));
79                                    compact(&mut edges[src]);
80
81                                    // 3. re-distribute allocations.
82                                    allocate(ranks[src], &edges[src][..], &mut delta);
83
84                                    // 4. compact down and send cumulative changes.
85                                    compact(&mut delta);
86                                    for (dst, diff) in delta.drain(..) {
87                                        session.give((dst, diff));
88                                    }
89                                }
90                            }
91                        }
92
93                        edge_stash.retain(|_key, val| !val.is_empty());
94
95                        for (time, rank_changes) in rank_stash.iter_mut() {
96                            if frontiers.iter().all(|f| !f.less_equal(time)) {
97
98                                let mut session = output.session(time);
99
100                                compact(rank_changes);
101
102                                let mut cnt = 0;
103                                let mut sum = 0;
104                                let mut max = 0;
105
106                                for (src, diff) in rank_changes.drain(..) {
107
108                                    cnt += 1;
109                                    sum += diff.abs();
110                                    max = if max < diff.abs() { diff.abs() } else { max };
111
112                                    // 0. ensure enough state allocated
113                                    while edges.len() <= src { edges.push(Vec::new()); }
114                                    while ranks.len() <= src { ranks.push(1_000); }
115                                    while diffs.len() <= src { diffs.push(0); }
116
117                                    // 1. subtract previous distribution.
118                                    allocate(ranks[src], &edges[src][..], &mut delta);
119                                    for x in delta.iter_mut() { x.1 *= -1; }
120
121                                    // 2. update ranks.
122                                    diffs[src] += diff;
123                                    if diffs[src].abs() >= 6 {
124                                        ranks[src] += diffs[src];
125                                        diffs[src] = 0;
126                                    }
127
128                                    // 3. re-distribute allocations.
129                                    allocate(ranks[src], &edges[src][..], &mut delta);
130
131                                    // 4. compact down and send cumulative changes.
132                                    compact(&mut delta);
133                                    for (dst, diff) in delta.drain(..) {
134                                        session.give((dst, diff));
135                                    }
136                                }
137
138                                println!("{:?}:\t{:?}\t{}\t{}\t{}", timer.elapsed(), time.time(), cnt, sum, max);
139                            }
140                        }
141
142                        rank_stash.retain(|_key, val| !val.is_empty());
143
144                    }
145                }
146            );
147
148            changes
149                .probe_with(&probe)
150                .connect_loop(handle);
151
152        });
153
154        let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
155        let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
156
157        let index = worker.index();
158        // Generate roughly random data.
159        use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
160        let hasher = BuildHasherDefault::<DefaultHasher>::new();
161        let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
162                                             hasher.hash_one(&(i,index,1)) as usize % nodes));
163        let remove = insert.clone();
164
165        for ins in (&mut insert).take(edges / worker.peers()) {
166            input.send((ins, 1));
167        }
168
169        input.advance_to(1);
170
171        while probe.less_than(input.time()) {
172            worker.step();
173        }
174
175        for (ins, del) in insert.zip(remove).take(1000) {
176            input.send((ins, 1));
177            input.send((del,-1));
178            input.advance_to(input.time() + 1);
179            while probe.less_than(input.time()) {
180                worker.step();
181            }
182        }
183
184    }).unwrap(); // asserts error-free execution;
185}
186
/// Consolidates a list of `(key, diff)` pairs in place.
///
/// The list is sorted by key, the diffs of entries sharing a key are summed
/// into a single entry, and every entry whose total diff is zero is removed.
/// An empty list is left untouched.
fn compact<T: Ord>(list: &mut Vec<(T, i64)>) {
    if list.is_empty() {
        return;
    }

    list.sort_by(|a, b| a.0.cmp(&b.0));

    // `dedup_by` hands the closure (later, earlier); when the keys match, the
    // later entry is discarded after its diff is folded into the earlier one.
    list.dedup_by(|later, earlier| {
        let same_key = later.0 == earlier.0;
        if same_key {
            earlier.1 += later.1;
        }
        same_key
    });

    list.retain(|&(_, diff)| diff != 0);
}
199
/// Splits five sixths of `rank` among the out-edges in `edges`, appending one
/// `(destination, amount)` pair per edge to `send`.
///
/// Each entry of `edges` is `(dst, multiplicity)`. Rank is divided per unit of
/// multiplicity, so an edge present `k` times receives `k` shares. The integer
/// remainder of the division is handed out one unit at a time, starting from
/// the first edge. The amounts pushed therefore always sum to exactly
/// `(rank * 5) / 6`. When every multiplicity is 1, each edge gets
/// `distribute / len`, plus one for the first `distribute % len` edges.
///
/// Nothing is pushed when `edges` is empty. Existing contents of `send` are
/// kept; new pairs are appended after them.
///
/// # Panics
///
/// Panics if `edges` is non-empty and either `rank` is negative or any
/// multiplicity is not positive.
fn allocate(rank: i64, edges: &[(usize, i64)], send: &mut Vec<(usize, i64)>) {
    if edges.is_empty() {
        return;
    }
    assert!(rank >= 0);
    assert!(edges.iter().all(|x| x.1 > 0));

    let distribute = (rank * 5) / 6;
    // The degree counts every parallel copy of an edge. Counting only distinct
    // destinations (as before) made the per-edge amounts, which are scaled by
    // multiplicity, add up to more than `distribute`.
    let degree: i64 = edges.iter().map(|&(_, mult)| mult).sum();
    let share = distribute / degree;
    let mut remainder = distribute % degree;
    for &(dst, mult) in edges {
        // Each unit of multiplicity can absorb at most one unit of remainder.
        let extra = remainder.min(mult);
        remainder -= extra;
        send.push((dst, mult * share + extra));
    }
}