use std::collections::HashMap;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Feedback, ConnectLoop, Probe};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Exchange;

8fn main() {
9
10 timely::execute_from_args(std::env::args().skip(3), move |worker| {
11
12 let mut input = InputHandle::new();
13 let probe = ProbeHandle::new();
14
15 worker.dataflow::<usize,_,_>(|scope| {
16
17 let edge_stream = input.to_stream(scope);
19
20 let (handle, rank_stream) = scope.feedback(1);
22
23 let changes = edge_stream.binary_frontier(
25 rank_stream,
26 Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
27 Exchange::new(|x: &(usize, i64)| x.0 as u64),
28 "PageRank",
29 |_capability, _info| {
30
31 let mut edge_stash: HashMap<_, Vec<_>> = HashMap::new();
33 let mut rank_stash: HashMap<_, Vec<_>> = HashMap::new();
34
35 let mut edges = Vec::new();
37 let mut ranks = Vec::new();
38 let mut diffs = Vec::new(); let mut delta = Vec::new();
40
41 let timer = ::std::time::Instant::now();
42
43 move |(input1, frontier1), (input2, frontier2), output| {
44
45 input1.for_each_time(|time, data| {
47 let entry = edge_stash.entry(time.retain(output.output_index())).or_default();
48 data.for_each(|data| entry.append(data));
49 });
50
51 input2.for_each_time(|time, data| {
53 let entry = rank_stash.entry(time.retain(output.output_index())).or_default();
54 data.for_each(|data| entry.append(data));
55 });
56
57 let frontiers = &[frontier1, frontier2];
58
59 for (time, edge_changes) in edge_stash.iter_mut() {
60 if frontiers.iter().all(|f| !f.less_equal(time)) {
61
62 let mut session = output.session(time);
63
64 compact(edge_changes);
65
66 for ((src, dst), diff) in edge_changes.drain(..) {
67
68 while edges.len() <= src { edges.push(Vec::new()); }
70 while ranks.len() <= src { ranks.push(1_000); }
71 while diffs.len() <= src { diffs.push(0); }
72
73 allocate(ranks[src], &edges[src][..], &mut delta);
75 for x in delta.iter_mut() { x.1 *= -1; }
76
77 edges[src].push((dst, diff));
79 compact(&mut edges[src]);
80
81 allocate(ranks[src], &edges[src][..], &mut delta);
83
84 compact(&mut delta);
86 for (dst, diff) in delta.drain(..) {
87 session.give((dst, diff));
88 }
89 }
90 }
91 }
92
93 edge_stash.retain(|_key, val| !val.is_empty());
94
95 for (time, rank_changes) in rank_stash.iter_mut() {
96 if frontiers.iter().all(|f| !f.less_equal(time)) {
97
98 let mut session = output.session(time);
99
100 compact(rank_changes);
101
102 let mut cnt = 0;
103 let mut sum = 0;
104 let mut max = 0;
105
106 for (src, diff) in rank_changes.drain(..) {
107
108 cnt += 1;
109 sum += diff.abs();
110 max = if max < diff.abs() { diff.abs() } else { max };
111
112 while edges.len() <= src { edges.push(Vec::new()); }
114 while ranks.len() <= src { ranks.push(1_000); }
115 while diffs.len() <= src { diffs.push(0); }
116
117 allocate(ranks[src], &edges[src][..], &mut delta);
119 for x in delta.iter_mut() { x.1 *= -1; }
120
121 diffs[src] += diff;
123 if diffs[src].abs() >= 6 {
124 ranks[src] += diffs[src];
125 diffs[src] = 0;
126 }
127
128 allocate(ranks[src], &edges[src][..], &mut delta);
130
131 compact(&mut delta);
133 for (dst, diff) in delta.drain(..) {
134 session.give((dst, diff));
135 }
136 }
137
138 println!("{:?}:\t{:?}\t{}\t{}\t{}", timer.elapsed(), time.time(), cnt, sum, max);
139 }
140 }
141
142 rank_stash.retain(|_key, val| !val.is_empty());
143
144 }
145 }
146 );
147
148 changes
149 .probe_with(&probe)
150 .connect_loop(handle);
151
152 });
153
154 let nodes: usize = std::env::args().nth(1).unwrap().parse().unwrap();
155 let edges: usize = std::env::args().nth(2).unwrap().parse().unwrap();
156
157 let index = worker.index();
158 use std::hash::{BuildHasher, BuildHasherDefault, DefaultHasher};
160 let hasher = BuildHasherDefault::<DefaultHasher>::new();
161 let mut insert = (0..).map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes,
162 hasher.hash_one(&(i,index,1)) as usize % nodes));
163 let remove = insert.clone();
164
165 for ins in (&mut insert).take(edges / worker.peers()) {
166 input.send((ins, 1));
167 }
168
169 input.advance_to(1);
170
171 while probe.less_than(input.time()) {
172 worker.step();
173 }
174
175 for (ins, del) in insert.zip(remove).take(1000) {
176 input.send((ins, 1));
177 input.send((del,-1));
178 input.advance_to(input.time() + 1);
179 while probe.less_than(input.time()) {
180 worker.step();
181 }
182 }
183
184 }).unwrap(); }
186
/// Consolidates a list of `(key, diff)` updates in place.
///
/// After the call the list is sorted by key, holds at most one entry per key
/// whose diff is the sum of all diffs that were recorded for that key, and
/// contains no entry whose summed diff is zero.
fn compact<T: Ord>(list: &mut Vec<(T, i64)>) {
    // Equal keys are merged below, so their relative order is irrelevant and
    // the unstable sort is sufficient.
    list.sort_unstable_by(|x, y| x.0.cmp(&y.0));

    // `dedup_by` hands over the later element first; fold its diff into the
    // earlier, retained element whenever the keys match.
    list.dedup_by(|later, earlier| {
        let same_key = later.0 == earlier.0;
        if same_key {
            earlier.1 += later.1;
        }
        same_key
    });

    // Updates that cancelled out completely carry no information.
    list.retain(|x| x.1 != 0);
}

/// Splits five sixths of `rank` between the entries of `edges`, appending one
/// `(destination, amount)` pair per entry to `send`.
///
/// The distributed total `(rank * 5) / 6` is divided by the number of entries;
/// the first `total % len` entries receive one extra unit. Each entry's amount
/// is then multiplied by the count stored in that entry. `send` is appended
/// to, never cleared. Nothing happens when `edges` is empty.
///
/// # Panics
///
/// Panics when `edges` is non-empty and either `rank` is negative or any
/// entry has a non-positive count.
fn allocate(rank: i64, edges: &[(usize, i64)], send: &mut Vec<(usize, i64)>) {
    if edges.is_empty() {
        return;
    }

    assert!(rank >= 0);
    assert!(edges.iter().all(|x| x.1 > 0));

    let distribute = (rank * 5) / 6;
    let degree = edges.len() as i64;
    let share = distribute / degree;
    // Non-negative and smaller than `edges.len()` because `distribute >= 0`
    // and `degree > 0`, so the conversion to `usize` is lossless.
    let remainder = (distribute % degree) as usize;

    send.extend(edges.iter().enumerate().map(|(i, &(dst, count))| {
        let units = if i < remainder { share + 1 } else { share };
        (dst, count * units)
    }));
}