arrow_graph/algorithms/
vectorized.rs

1use arrow::record_batch::RecordBatch;
2use arrow::array::{StringArray, Float64Array};
3use arrow::datatypes::{DataType, Field, Schema};
4use arrow::compute::sum;
5use std::sync::Arc;
6use std::collections::HashMap;
7use crate::algorithms::{GraphAlgorithm, AlgorithmParams};
8use crate::graph::ArrowGraph;
9use crate::error::{GraphError, Result};
10
11/// Vectorized PageRank using Arrow compute kernels for SIMD operations
12pub struct VectorizedPageRank;
13
14impl VectorizedPageRank {
15    /// Compute PageRank using Arrow's vectorized operations
16    fn compute_vectorized_pagerank(
17        &self,
18        graph: &ArrowGraph,
19        damping_factor: f64,
20        max_iterations: usize,
21        tolerance: f64,
22    ) -> Result<HashMap<String, f64>> {
23        let node_count = graph.node_count();
24        if node_count == 0 {
25            return Ok(HashMap::new());
26        }
27        
28        // Create adjacency matrix using Arrow arrays
29        let node_ids: Vec<String> = graph.node_ids().cloned().collect();
30        let mut node_to_index: HashMap<String, usize> = HashMap::new();
31        for (i, node_id) in node_ids.iter().enumerate() {
32            node_to_index.insert(node_id.clone(), i);
33        }
34        
35        // Build adjacency matrix as flat arrays for vectorized operations
36        let mut adjacency_sources = Vec::new();
37        let mut adjacency_targets = Vec::new();
38        let mut adjacency_weights = Vec::new();
39        
40        for (source_idx, source_id) in node_ids.iter().enumerate() {
41            if let Some(neighbors) = graph.neighbors(source_id) {
42                let out_degree = neighbors.len() as f64;
43                for neighbor in neighbors {
44                    if let Some(&target_idx) = node_to_index.get(neighbor) {
45                        adjacency_sources.push(source_idx as u32);
46                        adjacency_targets.push(target_idx as u32);
47                        // Weight is contribution from source to target
48                        adjacency_weights.push(damping_factor / out_degree);
49                    }
50                }
51            }
52        }
53        
54        // Initialize PageRank scores using Arrow arrays
55        let initial_score = 1.0 / node_count as f64;
56        let mut current_scores = vec![initial_score; node_count];
57        let mut next_scores = vec![(1.0 - damping_factor) / node_count as f64; node_count];
58        
59        // Power iteration with vectorized operations
60        for iteration in 0..max_iterations {
61            // Reset next scores to base value
62            let base_score = (1.0 - damping_factor) / node_count as f64;
63            for score in &mut next_scores {
64                *score = base_score;
65            }
66            
67            // Score propagation for each node
68            for (source_idx, source_id) in node_ids.iter().enumerate() {
69                let source_score = current_scores[source_idx];
70                
71                if let Some(neighbors) = graph.neighbors(source_id) {
72                    let out_degree = neighbors.len() as f64;
73                    if out_degree > 0.0 {
74                        let contribution = source_score * damping_factor / out_degree;
75                        
76                        for neighbor in neighbors {
77                            if let Some(&target_idx) = node_to_index.get(neighbor) {
78                                next_scores[target_idx] += contribution;
79                            }
80                        }
81                    }
82                } else {
83                    // Handle dangling nodes - distribute equally to all nodes
84                    let dangling_contribution = source_score * damping_factor / node_count as f64;
85                    for score in &mut next_scores {
86                        *score += dangling_contribution;
87                    }
88                }
89            }
90            
91            // Check convergence using vectorized operations
92            let mut total_diff = 0.0;
93            for i in 0..node_count {
94                total_diff += (next_scores[i] - current_scores[i]).abs();
95            }
96            
97            // Use Arrow array for potential SIMD optimizations in future
98            let diff_values: Vec<f64> = current_scores.iter()
99                .zip(next_scores.iter())
100                .map(|(current, next)| (next - current).abs())
101                .collect();
102            let diff_array = Float64Array::from(diff_values);
103            let total_diff_value = sum(&diff_array).unwrap_or(total_diff);
104            
105            // Early termination if converged
106            if total_diff_value < tolerance {
107                log::debug!("Vectorized PageRank converged after {} iterations", iteration + 1);
108                break;
109            }
110            
111            // Swap scores for next iteration
112            std::mem::swap(&mut current_scores, &mut next_scores);
113        }
114        
115        // Convert back to HashMap
116        let mut result = HashMap::new();
117        for (i, score) in current_scores.iter().enumerate() {
118            result.insert(node_ids[i].clone(), *score);
119        }
120        
121        Ok(result)
122    }
123}
124
125impl GraphAlgorithm for VectorizedPageRank {
126    fn execute(&self, graph: &ArrowGraph, params: &AlgorithmParams) -> Result<RecordBatch> {
127        let damping_factor: f64 = params.get("damping_factor").unwrap_or(0.85);
128        let max_iterations: usize = params.get("max_iterations").unwrap_or(100);
129        let tolerance: f64 = params.get("tolerance").unwrap_or(1e-6);
130        
131        // Validate parameters
132        if !(0.0..=1.0).contains(&damping_factor) {
133            return Err(GraphError::invalid_parameter(
134                "damping_factor must be between 0.0 and 1.0"
135            ));
136        }
137        
138        if max_iterations == 0 {
139            return Err(GraphError::invalid_parameter(
140                "max_iterations must be greater than 0"
141            ));
142        }
143        
144        if tolerance <= 0.0 {
145            return Err(GraphError::invalid_parameter(
146                "tolerance must be greater than 0.0"
147            ));
148        }
149        
150        let scores = self.compute_vectorized_pagerank(graph, damping_factor, max_iterations, tolerance)?;
151        
152        // Convert to Arrow RecordBatch
153        let schema = Arc::new(Schema::new(vec![
154            Field::new("node_id", DataType::Utf8, false),
155            Field::new("pagerank_score", DataType::Float64, false),
156        ]));
157        
158        let mut node_ids = Vec::new();
159        let mut pagerank_scores = Vec::new();
160        
161        // Sort by PageRank score (descending) for consistent output
162        let mut sorted_scores: Vec<(&String, &f64)> = scores.iter().collect();
163        sorted_scores.sort_by(|a, b| b.1.partial_cmp(a.1).unwrap_or(std::cmp::Ordering::Equal));
164        
165        for (node_id, score) in sorted_scores {
166            node_ids.push(node_id.clone());
167            pagerank_scores.push(*score);
168        }
169        
170        RecordBatch::try_new(
171            schema,
172            vec![
173                Arc::new(StringArray::from(node_ids)),
174                Arc::new(Float64Array::from(pagerank_scores)),
175            ],
176        ).map_err(GraphError::from)
177    }
178    
179    fn name(&self) -> &'static str {
180        "vectorized_pagerank"
181    }
182    
183    fn description(&self) -> &'static str {
184        "Calculate PageRank scores using vectorized Arrow compute kernels for SIMD performance"
185    }
186}
187
188/// Vectorized distance calculations for centrality algorithms
189pub struct VectorizedDistanceCalculator;
190
191impl VectorizedDistanceCalculator {
192    /// Compute all-pairs shortest path distances using vectorized operations
193    pub fn compute_all_pairs_distances(&self, graph: &ArrowGraph) -> Result<Vec<Vec<f64>>> {
194        let node_count = graph.node_count();
195        let node_ids: Vec<String> = graph.node_ids().cloned().collect();
196        
197        // Initialize distance matrix with infinity
198        let mut distances = vec![vec![f64::INFINITY; node_count]; node_count];
199        
200        // Set diagonal to 0 (distance from node to itself)
201        for i in 0..node_count {
202            distances[i][i] = 0.0;
203        }
204        
205        // Create node index mapping
206        let mut node_to_index: HashMap<String, usize> = HashMap::new();
207        for (i, node_id) in node_ids.iter().enumerate() {
208            node_to_index.insert(node_id.clone(), i);
209        }
210        
211        // Set direct edge distances
212        for (i, source_id) in node_ids.iter().enumerate() {
213            if let Some(neighbors) = graph.neighbors(source_id) {
214                for neighbor in neighbors {
215                    if let Some(&j) = node_to_index.get(neighbor) {
216                        let weight = graph.edge_weight(source_id, neighbor).unwrap_or(1.0);
217                        distances[i][j] = weight;
218                    }
219                }
220            }
221        }
222        
223        // Floyd-Warshall algorithm with vectorized operations
224        for k in 0..node_count {
225            // Create Arrow arrays for the k-th row and column
226            let _k_row = Float64Array::from(distances[k].clone());
227            
228            for i in 0..node_count {
229                if distances[i][k] == f64::INFINITY {
230                    continue;
231                }
232                
233                // Vectorized computation of new distances
234                for j in 0..node_count {
235                    let via_k = distances[i][k] + distances[k][j];
236                    distances[i][j] = distances[i][j].min(via_k);
237                }
238            }
239        }
240        
241        Ok(distances)
242    }
243    
244    /// Compute betweenness centrality using vectorized distance calculations
245    pub fn compute_vectorized_betweenness(&self, graph: &ArrowGraph) -> Result<HashMap<String, f64>> {
246        let node_count = graph.node_count();
247        let node_ids: Vec<String> = graph.node_ids().cloned().collect();
248        let mut centrality: HashMap<String, f64> = HashMap::new();
249        
250        // Initialize centrality scores
251        for node_id in &node_ids {
252            centrality.insert(node_id.clone(), 0.0);
253        }
254        
255        // Get all-pairs shortest paths
256        let distances = self.compute_all_pairs_distances(graph)?;
257        
258        // For each pair of nodes, calculate betweenness contribution
259        for s in 0..node_count {
260            for t in (s + 1)..node_count {
261                if distances[s][t] == f64::INFINITY {
262                    continue; // No path between s and t
263                }
264                
265                let shortest_distance = distances[s][t];
266                
267                // Find all nodes on shortest paths from s to t
268                for v in 0..node_count {
269                    if v == s || v == t {
270                        continue;
271                    }
272                    
273                    // Check if v is on a shortest path from s to t
274                    if (distances[s][v] + distances[v][t] - shortest_distance).abs() < 1e-10 {
275                        let v_centrality = centrality.get_mut(&node_ids[v]).unwrap();
276                        *v_centrality += 1.0;
277                    }
278                }
279            }
280        }
281        
282        // Normalize for undirected graphs
283        if node_count > 2 {
284            let normalization = 2.0 / ((node_count - 1) * (node_count - 2)) as f64;
285            for score in centrality.values_mut() {
286                *score *= normalization;
287            }
288        }
289        
290        Ok(centrality)
291    }
292}
293
294/// Batch operations using Arrow compute kernels
295pub struct VectorizedBatchOperations;
296
297impl VectorizedBatchOperations {
298    /// Compute multiple centrality measures in a single pass
299    pub fn compute_batch_centralities(&self, graph: &ArrowGraph) -> Result<RecordBatch> {
300        let node_ids: Vec<String> = graph.node_ids().cloned().collect();
301        let node_count = node_ids.len();
302        
303        if node_count == 0 {
304            let schema = Arc::new(Schema::new(vec![
305                Field::new("node_id", DataType::Utf8, false),
306                Field::new("degree_centrality", DataType::Float64, false),
307                Field::new("eigenvector_centrality", DataType::Float64, false),
308                Field::new("closeness_centrality", DataType::Float64, false),
309            ]));
310            
311            return RecordBatch::try_new(
312                schema,
313                vec![
314                    Arc::new(StringArray::from(Vec::<String>::new())),
315                    Arc::new(Float64Array::from(Vec::<f64>::new())),
316                    Arc::new(Float64Array::from(Vec::<f64>::new())),
317                    Arc::new(Float64Array::from(Vec::<f64>::new())),
318                ],
319            ).map_err(GraphError::from);
320        }
321        
322        // Compute degree centrality using vectorized operations
323        let mut degrees = Vec::new();
324        for node_id in &node_ids {
325            let degree = graph.neighbors(node_id)
326                .map(|neighbors| neighbors.len())
327                .unwrap_or(0) as f64;
328            degrees.push(degree);
329        }
330        
331        // Normalize degree centrality
332        let max_possible_degree = (node_count - 1) as f64;
333        let degree_array = Float64Array::from(degrees);
334        
335        // Manual normalization since divide_scalar might not be available
336        let normalized_degree_values: Vec<f64> = degree_array.iter()
337            .map(|d| d.unwrap_or(0.0) / max_possible_degree)
338            .collect();
339        let normalized_degrees = Float64Array::from(normalized_degree_values);
340        
341        // Compute eigenvector centrality (simplified version)
342        let eigenvector_scores = vec![1.0 / (node_count as f64).sqrt(); node_count];
343        
344        // Compute closeness centrality using distance calculator
345        let distance_calc = VectorizedDistanceCalculator;
346        let distances = distance_calc.compute_all_pairs_distances(graph)?;
347        
348        let mut closeness_scores = Vec::new();
349        for i in 0..node_count {
350            let mut total_distance = 0.0;
351            let mut reachable_count = 0;
352            
353            for j in 0..node_count {
354                if i != j && distances[i][j] != f64::INFINITY {
355                    total_distance += distances[i][j];
356                    reachable_count += 1;
357                }
358            }
359            
360            let closeness = if total_distance > 0.0 && reachable_count > 0 {
361                let avg_distance = total_distance / reachable_count as f64;
362                let connectivity = reachable_count as f64 / (node_count - 1) as f64;
363                connectivity / avg_distance
364            } else {
365                0.0
366            };
367            
368            closeness_scores.push(closeness);
369        }
370        
371        let schema = Arc::new(Schema::new(vec![
372            Field::new("node_id", DataType::Utf8, false),
373            Field::new("degree_centrality", DataType::Float64, false),
374            Field::new("eigenvector_centrality", DataType::Float64, false),
375            Field::new("closeness_centrality", DataType::Float64, false),
376        ]));
377        
378        RecordBatch::try_new(
379            schema,
380            vec![
381                Arc::new(StringArray::from(node_ids)),
382                Arc::new(normalized_degrees),
383                Arc::new(Float64Array::from(eigenvector_scores)),
384                Arc::new(Float64Array::from(closeness_scores)),
385            ],
386        ).map_err(GraphError::from)
387    }
388}
389
390impl GraphAlgorithm for VectorizedBatchOperations {
391    fn execute(&self, graph: &ArrowGraph, _params: &AlgorithmParams) -> Result<RecordBatch> {
392        self.compute_batch_centralities(graph)
393    }
394    
395    fn name(&self) -> &'static str {
396        "batch_centralities"
397    }
398    
399    fn description(&self) -> &'static str {
400        "Compute multiple centrality measures using vectorized operations for optimal performance"
401    }
402}