1use arrow::record_batch::RecordBatch;
2use arrow::array::{StringArray, Float64Array};
3use arrow::datatypes::{DataType, Field, Schema};
4use arrow::compute::sum;
5use std::sync::Arc;
6use std::collections::HashMap;
7use crate::algorithms::{GraphAlgorithm, AlgorithmParams};
8use crate::graph::ArrowGraph;
9use crate::error::{GraphError, Result};
10
11pub struct VectorizedPageRank;
13
14impl VectorizedPageRank {
15 fn compute_vectorized_pagerank(
17 &self,
18 graph: &ArrowGraph,
19 damping_factor: f64,
20 max_iterations: usize,
21 tolerance: f64,
22 ) -> Result<HashMap<String, f64>> {
23 let node_count = graph.node_count();
24 if node_count == 0 {
25 return Ok(HashMap::new());
26 }
27
28 let node_ids: Vec<String> = graph.node_ids().cloned().collect();
30 let mut node_to_index: HashMap<String, usize> = HashMap::new();
31 for (i, node_id) in node_ids.iter().enumerate() {
32 node_to_index.insert(node_id.clone(), i);
33 }
34
35 let mut adjacency_sources = Vec::new();
37 let mut adjacency_targets = Vec::new();
38 let mut adjacency_weights = Vec::new();
39
40 for (source_idx, source_id) in node_ids.iter().enumerate() {
41 if let Some(neighbors) = graph.neighbors(source_id) {
42 let out_degree = neighbors.len() as f64;
43 for neighbor in neighbors {
44 if let Some(&target_idx) = node_to_index.get(neighbor) {
45 adjacency_sources.push(source_idx as u32);
46 adjacency_targets.push(target_idx as u32);
47 adjacency_weights.push(damping_factor / out_degree);
49 }
50 }
51 }
52 }
53
54 let initial_score = 1.0 / node_count as f64;
56 let mut current_scores = vec![initial_score; node_count];
57 let mut next_scores = vec![(1.0 - damping_factor) / node_count as f64; node_count];
58
59 for iteration in 0..max_iterations {
61 let base_score = (1.0 - damping_factor) / node_count as f64;
63 for score in &mut next_scores {
64 *score = base_score;
65 }
66
67 for (source_idx, source_id) in node_ids.iter().enumerate() {
69 let source_score = current_scores[source_idx];
70
71 if let Some(neighbors) = graph.neighbors(source_id) {
72 let out_degree = neighbors.len() as f64;
73 if out_degree > 0.0 {
74 let contribution = source_score * damping_factor / out_degree;
75
76 for neighbor in neighbors {
77 if let Some(&target_idx) = node_to_index.get(neighbor) {
78 next_scores[target_idx] += contribution;
79 }
80 }
81 }
82 } else {
83 let dangling_contribution = source_score * damping_factor / node_count as f64;
85 for score in &mut next_scores {
86 *score += dangling_contribution;
87 }
88 }
89 }
90
91 let mut total_diff = 0.0;
93 for i in 0..node_count {
94 total_diff += (next_scores[i] - current_scores[i]).abs();
95 }
96
97 let diff_values: Vec<f64> = current_scores.iter()
99 .zip(next_scores.iter())
100 .map(|(current, next)| (next - current).abs())
101 .collect();
102 let diff_array = Float64Array::from(diff_values);
103 let total_diff_value = sum(&diff_array).unwrap_or(total_diff);
104
105 if total_diff_value < tolerance {
107 log::debug!("Vectorized PageRank converged after {} iterations", iteration + 1);
108 break;
109 }
110
111 std::mem::swap(&mut current_scores, &mut next_scores);
113 }
114
115 let mut result = HashMap::new();
117 for (i, score) in current_scores.iter().enumerate() {
118 result.insert(node_ids[i].clone(), *score);
119 }
120
121 Ok(result)
122 }
123}
124
125impl GraphAlgorithm for VectorizedPageRank {
126 fn execute(&self, graph: &ArrowGraph, params: &AlgorithmParams) -> Result<RecordBatch> {
127 let damping_factor: f64 = params.get("damping_factor").unwrap_or(0.85);
128 let max_iterations: usize = params.get("max_iterations").unwrap_or(100);
129 let tolerance: f64 = params.get("tolerance").unwrap_or(1e-6);
130
131 if !(0.0..=1.0).contains(&damping_factor) {
133 return Err(GraphError::invalid_parameter(
134 "damping_factor must be between 0.0 and 1.0"
135 ));
136 }
137
138 if max_iterations == 0 {
139 return Err(GraphError::invalid_parameter(
140 "max_iterations must be greater than 0"
141 ));
142 }
143
144 if tolerance <= 0.0 {
145 return Err(GraphError::invalid_parameter(
146 "tolerance must be greater than 0.0"
147 ));
148 }
149
150 let scores = self.compute_vectorized_pagerank(graph, damping_factor, max_iterations, tolerance)?;
151
152 let schema = Arc::new(Schema::new(vec![
154 Field::new("node_id", DataType::Utf8, false),
155 Field::new("pagerank_score", DataType::Float64, false),
156 ]));
157
158 let mut node_ids = Vec::new();
159 let mut pagerank_scores = Vec::new();
160
161 let mut sorted_scores: Vec<(&String, &f64)> = scores.iter().collect();
163 sorted_scores.sort_by(|a, b| b.1.partial_cmp(a.1).unwrap_or(std::cmp::Ordering::Equal));
164
165 for (node_id, score) in sorted_scores {
166 node_ids.push(node_id.clone());
167 pagerank_scores.push(*score);
168 }
169
170 RecordBatch::try_new(
171 schema,
172 vec![
173 Arc::new(StringArray::from(node_ids)),
174 Arc::new(Float64Array::from(pagerank_scores)),
175 ],
176 ).map_err(GraphError::from)
177 }
178
179 fn name(&self) -> &'static str {
180 "vectorized_pagerank"
181 }
182
183 fn description(&self) -> &'static str {
184 "Calculate PageRank scores using vectorized Arrow compute kernels for SIMD performance"
185 }
186}
187
188pub struct VectorizedDistanceCalculator;
190
191impl VectorizedDistanceCalculator {
192 pub fn compute_all_pairs_distances(&self, graph: &ArrowGraph) -> Result<Vec<Vec<f64>>> {
194 let node_count = graph.node_count();
195 let node_ids: Vec<String> = graph.node_ids().cloned().collect();
196
197 let mut distances = vec![vec![f64::INFINITY; node_count]; node_count];
199
200 for i in 0..node_count {
202 distances[i][i] = 0.0;
203 }
204
205 let mut node_to_index: HashMap<String, usize> = HashMap::new();
207 for (i, node_id) in node_ids.iter().enumerate() {
208 node_to_index.insert(node_id.clone(), i);
209 }
210
211 for (i, source_id) in node_ids.iter().enumerate() {
213 if let Some(neighbors) = graph.neighbors(source_id) {
214 for neighbor in neighbors {
215 if let Some(&j) = node_to_index.get(neighbor) {
216 let weight = graph.edge_weight(source_id, neighbor).unwrap_or(1.0);
217 distances[i][j] = weight;
218 }
219 }
220 }
221 }
222
223 for k in 0..node_count {
225 let _k_row = Float64Array::from(distances[k].clone());
227
228 for i in 0..node_count {
229 if distances[i][k] == f64::INFINITY {
230 continue;
231 }
232
233 for j in 0..node_count {
235 let via_k = distances[i][k] + distances[k][j];
236 distances[i][j] = distances[i][j].min(via_k);
237 }
238 }
239 }
240
241 Ok(distances)
242 }
243
244 pub fn compute_vectorized_betweenness(&self, graph: &ArrowGraph) -> Result<HashMap<String, f64>> {
246 let node_count = graph.node_count();
247 let node_ids: Vec<String> = graph.node_ids().cloned().collect();
248 let mut centrality: HashMap<String, f64> = HashMap::new();
249
250 for node_id in &node_ids {
252 centrality.insert(node_id.clone(), 0.0);
253 }
254
255 let distances = self.compute_all_pairs_distances(graph)?;
257
258 for s in 0..node_count {
260 for t in (s + 1)..node_count {
261 if distances[s][t] == f64::INFINITY {
262 continue; }
264
265 let shortest_distance = distances[s][t];
266
267 for v in 0..node_count {
269 if v == s || v == t {
270 continue;
271 }
272
273 if (distances[s][v] + distances[v][t] - shortest_distance).abs() < 1e-10 {
275 let v_centrality = centrality.get_mut(&node_ids[v]).unwrap();
276 *v_centrality += 1.0;
277 }
278 }
279 }
280 }
281
282 if node_count > 2 {
284 let normalization = 2.0 / ((node_count - 1) * (node_count - 2)) as f64;
285 for score in centrality.values_mut() {
286 *score *= normalization;
287 }
288 }
289
290 Ok(centrality)
291 }
292}
293
294pub struct VectorizedBatchOperations;
296
297impl VectorizedBatchOperations {
298 pub fn compute_batch_centralities(&self, graph: &ArrowGraph) -> Result<RecordBatch> {
300 let node_ids: Vec<String> = graph.node_ids().cloned().collect();
301 let node_count = node_ids.len();
302
303 if node_count == 0 {
304 let schema = Arc::new(Schema::new(vec![
305 Field::new("node_id", DataType::Utf8, false),
306 Field::new("degree_centrality", DataType::Float64, false),
307 Field::new("eigenvector_centrality", DataType::Float64, false),
308 Field::new("closeness_centrality", DataType::Float64, false),
309 ]));
310
311 return RecordBatch::try_new(
312 schema,
313 vec![
314 Arc::new(StringArray::from(Vec::<String>::new())),
315 Arc::new(Float64Array::from(Vec::<f64>::new())),
316 Arc::new(Float64Array::from(Vec::<f64>::new())),
317 Arc::new(Float64Array::from(Vec::<f64>::new())),
318 ],
319 ).map_err(GraphError::from);
320 }
321
322 let mut degrees = Vec::new();
324 for node_id in &node_ids {
325 let degree = graph.neighbors(node_id)
326 .map(|neighbors| neighbors.len())
327 .unwrap_or(0) as f64;
328 degrees.push(degree);
329 }
330
331 let max_possible_degree = (node_count - 1) as f64;
333 let degree_array = Float64Array::from(degrees);
334
335 let normalized_degree_values: Vec<f64> = degree_array.iter()
337 .map(|d| d.unwrap_or(0.0) / max_possible_degree)
338 .collect();
339 let normalized_degrees = Float64Array::from(normalized_degree_values);
340
341 let eigenvector_scores = vec![1.0 / (node_count as f64).sqrt(); node_count];
343
344 let distance_calc = VectorizedDistanceCalculator;
346 let distances = distance_calc.compute_all_pairs_distances(graph)?;
347
348 let mut closeness_scores = Vec::new();
349 for i in 0..node_count {
350 let mut total_distance = 0.0;
351 let mut reachable_count = 0;
352
353 for j in 0..node_count {
354 if i != j && distances[i][j] != f64::INFINITY {
355 total_distance += distances[i][j];
356 reachable_count += 1;
357 }
358 }
359
360 let closeness = if total_distance > 0.0 && reachable_count > 0 {
361 let avg_distance = total_distance / reachable_count as f64;
362 let connectivity = reachable_count as f64 / (node_count - 1) as f64;
363 connectivity / avg_distance
364 } else {
365 0.0
366 };
367
368 closeness_scores.push(closeness);
369 }
370
371 let schema = Arc::new(Schema::new(vec![
372 Field::new("node_id", DataType::Utf8, false),
373 Field::new("degree_centrality", DataType::Float64, false),
374 Field::new("eigenvector_centrality", DataType::Float64, false),
375 Field::new("closeness_centrality", DataType::Float64, false),
376 ]));
377
378 RecordBatch::try_new(
379 schema,
380 vec![
381 Arc::new(StringArray::from(node_ids)),
382 Arc::new(normalized_degrees),
383 Arc::new(Float64Array::from(eigenvector_scores)),
384 Arc::new(Float64Array::from(closeness_scores)),
385 ],
386 ).map_err(GraphError::from)
387 }
388}
389
390impl GraphAlgorithm for VectorizedBatchOperations {
391 fn execute(&self, graph: &ArrowGraph, _params: &AlgorithmParams) -> Result<RecordBatch> {
392 self.compute_batch_centralities(graph)
393 }
394
395 fn name(&self) -> &'static str {
396 "batch_centralities"
397 }
398
399 fn description(&self) -> &'static str {
400 "Compute multiple centrality measures using vectorized operations for optimal performance"
401 }
402}