arrow_graph/algorithms/tests.rs

1#[cfg(test)]
2mod tests {
3    use crate::algorithms::pathfinding::{ShortestPath, AllPaths};
4    use crate::algorithms::centrality::{PageRank, BetweennessCentrality, EigenvectorCentrality, ClosenessCentrality};
5    use crate::algorithms::components::{WeaklyConnectedComponents, StronglyConnectedComponents};
6    use crate::algorithms::community::LeidenCommunityDetection;
7    use crate::algorithms::aggregation::{TriangleCount, ClusteringCoefficient};
8    use crate::algorithms::vectorized::{VectorizedPageRank, VectorizedBatchOperations};
9    use crate::algorithms::sampling::{RandomWalk, Node2VecWalk, GraphSampling};
10    use crate::graph::{StreamingGraphProcessor, StreamingGraphSystem, StreamUpdate};
11    use crate::{GraphAlgorithm, AlgorithmParams};
12    use crate::ArrowGraph;
13    use arrow::record_batch::RecordBatch;
14    use arrow::array::{StringArray, Float64Array, UInt32Array, UInt64Array};
15    use arrow::datatypes::{DataType, Field, Schema};
16    use std::sync::Arc;
17
18    fn create_test_graph() -> ArrowGraph {
19        let schema = Arc::new(Schema::new(vec![
20            Field::new("source", DataType::Utf8, false),
21            Field::new("target", DataType::Utf8, false),
22            Field::new("weight", DataType::Float64, true),
23        ]));
24
25        let source_array = StringArray::from(vec!["A", "A", "B", "C", "D"]);
26        let target_array = StringArray::from(vec!["B", "C", "C", "D", "E"]);
27        let weight_array = Float64Array::from(vec![
28            Some(1.0), 
29            Some(4.0), 
30            Some(2.0), 
31            Some(1.0),
32            Some(1.0)
33        ]);
34
35        let edges = RecordBatch::try_new(
36            schema,
37            vec![
38                Arc::new(source_array) as Arc<dyn arrow::array::Array>, 
39                Arc::new(target_array) as Arc<dyn arrow::array::Array>, 
40                Arc::new(weight_array) as Arc<dyn arrow::array::Array>
41            ],
42        ).unwrap();
43
44        ArrowGraph::from_edges(edges).unwrap()
45    }
46
47    fn create_disconnected_graph() -> ArrowGraph {
48        let schema = Arc::new(Schema::new(vec![
49            Field::new("source", DataType::Utf8, false),
50            Field::new("target", DataType::Utf8, false),
51            Field::new("weight", DataType::Float64, true),
52        ]));
53
54        // Create two disconnected components: A-B and C-D
55        let source_array = StringArray::from(vec!["A", "C"]);
56        let target_array = StringArray::from(vec!["B", "D"]);
57        let weight_array = Float64Array::from(vec![Some(1.0), Some(1.0)]);
58
59        let edges = RecordBatch::try_new(
60            schema,
61            vec![
62                Arc::new(source_array) as Arc<dyn arrow::array::Array>, 
63                Arc::new(target_array) as Arc<dyn arrow::array::Array>, 
64                Arc::new(weight_array) as Arc<dyn arrow::array::Array>
65            ],
66        ).unwrap();
67
68        ArrowGraph::from_edges(edges).unwrap()
69    }
70
71    #[test]
72    fn test_shortest_path_single_target() {
73        let graph = create_test_graph();
74        let algorithm = ShortestPath;
75        
76        let params = AlgorithmParams::new()
77            .with_param("source", "A")
78            .with_param("target", "D");
79        
80        let result = algorithm.execute(&graph, &params).unwrap();
81        
82        assert_eq!(result.num_rows(), 1);
83        assert_eq!(result.num_columns(), 4);
84        
85        // Check that we got a result with source A, target D
86        let source_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
87        let target_col = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
88        let distance_col = result.column(2).as_any().downcast_ref::<Float64Array>().unwrap();
89        
90        assert_eq!(source_col.value(0), "A");
91        assert_eq!(target_col.value(0), "D");
92        assert_eq!(distance_col.value(0), 4.0); // A->B->C->D = 1+2+1 = 4, A->C->D = 4+1 = 5, so shortest is A->B->C->D = 4
93    }
94
95    #[test]
96    fn test_shortest_path_all_targets() {
97        let graph = create_test_graph();
98        let algorithm = ShortestPath;
99        
100        let params = AlgorithmParams::new()
101            .with_param("source", "A");
102        
103        let result = algorithm.execute(&graph, &params).unwrap();
104        
105        // Should have paths to all reachable nodes except source
106        assert!(result.num_rows() > 0);
107        assert_eq!(result.num_columns(), 3);
108        
109        let source_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
110        
111        // All sources should be "A"
112        for i in 0..result.num_rows() {
113            assert_eq!(source_col.value(i), "A");
114        }
115    }
116
117    #[test]
118    fn test_all_paths() {
119        let graph = create_test_graph();
120        let algorithm = AllPaths;
121        
122        let params = AlgorithmParams::new()
123            .with_param("source", "A")
124            .with_param("target", "D")
125            .with_param("max_hops", 5);
126        
127        let result = algorithm.execute(&graph, &params).unwrap();
128        
129        // Should find multiple paths from A to D
130        assert!(result.num_rows() >= 1);
131        assert_eq!(result.num_columns(), 4);
132        
133        let source_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
134        let target_col = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
135        
136        for i in 0..result.num_rows() {
137            assert_eq!(source_col.value(i), "A");
138            assert_eq!(target_col.value(i), "D");
139        }
140    }
141
142    #[test]
143    fn test_shortest_path_no_path() {
144        let graph = create_test_graph();
145        let algorithm = ShortestPath;
146        
147        let params = AlgorithmParams::new()
148            .with_param("source", "E")
149            .with_param("target", "A");
150        
151        let result = algorithm.execute(&graph, &params);
152        
153        // Should return an error since there's no path from E to A
154        assert!(result.is_err());
155    }
156
157    #[test]
158    fn test_pagerank() {
159        let graph = create_test_graph();
160        let algorithm = PageRank;
161        
162        let params = AlgorithmParams::new()
163            .with_param("damping_factor", 0.85)
164            .with_param("max_iterations", 100)
165            .with_param("tolerance", 1e-6);
166        
167        let result = algorithm.execute(&graph, &params).unwrap();
168        
169        // Should have PageRank scores for all nodes
170        assert_eq!(result.num_rows(), graph.node_count());
171        assert_eq!(result.num_columns(), 2);
172        
173        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
174        let score_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
175        
176        // Verify all nodes are present
177        let mut found_nodes = std::collections::HashSet::new();
178        let mut total_score = 0.0;
179        
180        for i in 0..result.num_rows() {
181            let node_id = node_col.value(i);
182            let score = score_col.value(i);
183            
184            found_nodes.insert(node_id);
185            total_score += score;
186            
187            // PageRank scores should be positive
188            assert!(score > 0.0);
189        }
190        
191        // Check that we have all expected nodes
192        assert!(found_nodes.contains("A"));
193        assert!(found_nodes.contains("B"));
194        assert!(found_nodes.contains("C"));
195        assert!(found_nodes.contains("D"));
196        assert!(found_nodes.contains("E"));
197        
198        // PageRank scores should sum to approximately 1.0
199        assert!((total_score - 1.0).abs() < 1e-10);
200    }
201
202    #[test]
203    fn test_pagerank_with_custom_params() {
204        let graph = create_test_graph();
205        let algorithm = PageRank;
206        
207        let params = AlgorithmParams::new()
208            .with_param("damping_factor", 0.5)
209            .with_param("max_iterations", 10)
210            .with_param("tolerance", 1e-3);
211        
212        let result = algorithm.execute(&graph, &params).unwrap();
213        
214        assert!(result.num_rows() > 0);
215        
216        let score_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
217        let total_score: f64 = (0..result.num_rows()).map(|i| score_col.value(i)).sum();
218        
219        // Should still sum to approximately 1.0 even with different parameters
220        assert!((total_score - 1.0).abs() < 1e-2);
221    }
222
223    #[test]
224    fn test_pagerank_invalid_params() {
225        let graph = create_test_graph();
226        let algorithm = PageRank;
227        
228        // Test invalid damping factor
229        let params = AlgorithmParams::new().with_param("damping_factor", 1.5);
230        let result = algorithm.execute(&graph, &params);
231        assert!(result.is_err());
232        
233        // Test invalid max_iterations
234        let params = AlgorithmParams::new().with_param("max_iterations", 0);
235        let result = algorithm.execute(&graph, &params);
236        assert!(result.is_err());
237        
238        // Test invalid tolerance
239        let params = AlgorithmParams::new().with_param("tolerance", -1.0);
240        let result = algorithm.execute(&graph, &params);
241        assert!(result.is_err());
242    }
243
244    #[test]
245    fn test_weakly_connected_components() {
246        let graph = create_test_graph();
247        let algorithm = WeaklyConnectedComponents;
248        
249        let params = AlgorithmParams::new();
250        let result = algorithm.execute(&graph, &params).unwrap();
251        
252        // Should have one component containing all nodes
253        assert_eq!(result.num_rows(), graph.node_count());
254        assert_eq!(result.num_columns(), 2);
255        
256        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
257        let component_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
258        
259        // All nodes should be in the same component (0) since the graph is connected
260        let mut found_nodes = std::collections::HashSet::new();
261        for i in 0..result.num_rows() {
262            let node_id = node_col.value(i);
263            let component_id = component_col.value(i);
264            
265            found_nodes.insert(node_id);
266            assert_eq!(component_id, 0); // Single component
267        }
268        
269        // Check that we have all expected nodes
270        assert!(found_nodes.contains("A"));
271        assert!(found_nodes.contains("B"));
272        assert!(found_nodes.contains("C"));
273        assert!(found_nodes.contains("D"));
274        assert!(found_nodes.contains("E"));
275    }
276
277    #[test]
278    fn test_weakly_connected_components_disconnected() {
279        let graph = create_disconnected_graph();
280        let algorithm = WeaklyConnectedComponents;
281        
282        let params = AlgorithmParams::new();
283        let result = algorithm.execute(&graph, &params).unwrap();
284        
285        // Should have two components: A-B and C-D
286        assert_eq!(result.num_rows(), 4);
287        assert_eq!(result.num_columns(), 2);
288        
289        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
290        let component_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
291        
292        let mut component_map: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
293        for i in 0..result.num_rows() {
294            let node_id = node_col.value(i).to_string();
295            let component_id = component_col.value(i);
296            component_map.insert(node_id, component_id);
297        }
298        
299        // A and B should be in the same component
300        assert_eq!(component_map.get("A"), component_map.get("B"));
301        
302        // C and D should be in the same component
303        assert_eq!(component_map.get("C"), component_map.get("D"));
304        
305        // A-B and C-D should be in different components
306        assert_ne!(component_map.get("A"), component_map.get("C"));
307    }
308
309    #[test]
310    fn test_strongly_connected_components() {
311        let graph = create_test_graph();
312        let algorithm = StronglyConnectedComponents;
313        
314        let params = AlgorithmParams::new();
315        let result = algorithm.execute(&graph, &params).unwrap();
316        
317        // Our test graph is acyclic, so each node should be its own SCC
318        assert_eq!(result.num_rows(), graph.node_count());
319        assert_eq!(result.num_columns(), 2);
320        
321        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
322        let component_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
323        
324        // Each node should be in a different component since the graph is acyclic
325        let mut component_ids = std::collections::HashSet::new();
326        for i in 0..result.num_rows() {
327            let component_id = component_col.value(i);
328            component_ids.insert(component_id);
329        }
330        
331        // Should have as many unique component IDs as nodes
332        assert_eq!(component_ids.len(), graph.node_count());
333    }
334
335    fn create_strongly_connected_graph() -> ArrowGraph {
336        let schema = Arc::new(Schema::new(vec![
337            Field::new("source", DataType::Utf8, false),
338            Field::new("target", DataType::Utf8, false),
339            Field::new("weight", DataType::Float64, true),
340        ]));
341
342        // Create a graph with a cycle: A->B->C->A and D->E->D
343        let source_array = StringArray::from(vec!["A", "B", "C", "D", "E"]);
344        let target_array = StringArray::from(vec!["B", "C", "A", "E", "D"]);
345        let weight_array = Float64Array::from(vec![
346            Some(1.0), 
347            Some(1.0), 
348            Some(1.0), 
349            Some(1.0),
350            Some(1.0)
351        ]);
352
353        let edges = RecordBatch::try_new(
354            schema,
355            vec![
356                Arc::new(source_array) as Arc<dyn arrow::array::Array>, 
357                Arc::new(target_array) as Arc<dyn arrow::array::Array>, 
358                Arc::new(weight_array) as Arc<dyn arrow::array::Array>
359            ],
360        ).unwrap();
361
362        ArrowGraph::from_edges(edges).unwrap()
363    }
364
365    #[test]
366    fn test_strongly_connected_components_with_cycles() {
367        let graph = create_strongly_connected_graph();
368        let algorithm = StronglyConnectedComponents;
369        
370        let params = AlgorithmParams::new();
371        let result = algorithm.execute(&graph, &params).unwrap();
372        
373        assert_eq!(result.num_rows(), 5);
374        assert_eq!(result.num_columns(), 2);
375        
376        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
377        let component_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
378        
379        let mut component_map: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
380        for i in 0..result.num_rows() {
381            let node_id = node_col.value(i).to_string();
382            let component_id = component_col.value(i);
383            component_map.insert(node_id, component_id);
384        }
385        
386        // A, B, C should be in the same SCC (they form a cycle)
387        assert_eq!(component_map.get("A"), component_map.get("B"));
388        assert_eq!(component_map.get("B"), component_map.get("C"));
389        
390        // D, E should be in the same SCC (they form a cycle)
391        assert_eq!(component_map.get("D"), component_map.get("E"));
392        
393        // The two cycles should be in different SCCs
394        assert_ne!(component_map.get("A"), component_map.get("D"));
395    }
396
397    fn create_community_graph() -> ArrowGraph {
398        let schema = Arc::new(Schema::new(vec![
399            Field::new("source", DataType::Utf8, false),
400            Field::new("target", DataType::Utf8, false),
401            Field::new("weight", DataType::Float64, true),
402        ]));
403
404        // Create a graph with clear community structure:
405        // Community 1: A-B-C (triangle)
406        // Community 2: D-E-F (triangle)  
407        // Bridge: C-D (connects communities)
408        let source_array = StringArray::from(vec!["A", "B", "C", "A", "C", "D", "E", "F", "D", "E"]);
409        let target_array = StringArray::from(vec!["B", "C", "A", "C", "D", "E", "F", "D", "F", "F"]);
410        let weight_array = Float64Array::from(vec![
411            Some(1.0), Some(1.0), Some(1.0), Some(1.0), Some(0.5), // Community 1 + bridge
412            Some(1.0), Some(1.0), Some(1.0), Some(1.0), Some(1.0)  // Community 2
413        ]);
414
415        let edges = RecordBatch::try_new(
416            schema,
417            vec![
418                Arc::new(source_array) as Arc<dyn arrow::array::Array>, 
419                Arc::new(target_array) as Arc<dyn arrow::array::Array>, 
420                Arc::new(weight_array) as Arc<dyn arrow::array::Array>
421            ],
422        ).unwrap();
423
424        ArrowGraph::from_edges(edges).unwrap()
425    }
426
427    #[test]
428    fn test_leiden_community_detection() {
429        let graph = create_community_graph();
430        let algorithm = LeidenCommunityDetection;
431        
432        let params = AlgorithmParams::new()
433            .with_param("resolution", 1.0)
434            .with_param("max_iterations", 1);
435        
436        let result = algorithm.execute(&graph, &params).unwrap();
437        
438        // Should detect communities
439        assert_eq!(result.num_rows(), 6); // 6 nodes
440        assert_eq!(result.num_columns(), 2);
441        
442        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
443        let community_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
444        
445        let mut community_map: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
446        for i in 0..result.num_rows() {
447            let node_id = node_col.value(i).to_string();
448            let community_id = community_col.value(i);
449            community_map.insert(node_id, community_id);
450        }
451        
452        // Should have found at least 2 communities
453        let unique_communities: std::collections::HashSet<u32> = community_map.values().cloned().collect();
454        assert!(unique_communities.len() >= 2);
455        
456        // Verify all nodes are assigned
457        assert!(community_map.contains_key("A"));
458        assert!(community_map.contains_key("B"));
459        assert!(community_map.contains_key("C"));
460        assert!(community_map.contains_key("D"));
461        assert!(community_map.contains_key("E"));
462        assert!(community_map.contains_key("F"));
463    }
464
465    #[test]
466    fn test_leiden_with_custom_resolution() {
467        let graph = create_test_graph();
468        let algorithm = LeidenCommunityDetection;
469        
470        let params = AlgorithmParams::new()
471            .with_param("resolution", 0.5)
472            .with_param("max_iterations", 10);
473        
474        let result = algorithm.execute(&graph, &params).unwrap();
475        
476        assert!(result.num_rows() > 0);
477        assert_eq!(result.num_columns(), 2);
478    }
479
480    #[test]
481    fn test_leiden_invalid_params() {
482        let graph = create_test_graph();
483        let algorithm = LeidenCommunityDetection;
484        
485        // Test invalid resolution
486        let params = AlgorithmParams::new().with_param("resolution", -1.0);
487        let result = algorithm.execute(&graph, &params);
488        assert!(result.is_err());
489        
490        // Test invalid max_iterations
491        let params = AlgorithmParams::new().with_param("max_iterations", 0);
492        let result = algorithm.execute(&graph, &params);
493        assert!(result.is_err());
494    }
495
496
497    fn create_triangle_graph() -> ArrowGraph {
498        let schema = Arc::new(Schema::new(vec![
499            Field::new("source", DataType::Utf8, false),
500            Field::new("target", DataType::Utf8, false),
501            Field::new("weight", DataType::Float64, true),
502        ]));
503
504        // Create a graph with triangles: A-B-C-A (triangle) and B-D
505        let source_array = StringArray::from(vec!["A", "B", "C", "B"]);
506        let target_array = StringArray::from(vec!["B", "C", "A", "D"]);
507        let weight_array = Float64Array::from(vec![
508            Some(1.0), Some(1.0), Some(1.0), Some(1.0)
509        ]);
510
511        let edges = RecordBatch::try_new(
512            schema,
513            vec![
514                Arc::new(source_array) as Arc<dyn arrow::array::Array>, 
515                Arc::new(target_array) as Arc<dyn arrow::array::Array>, 
516                Arc::new(weight_array) as Arc<dyn arrow::array::Array>
517            ],
518        ).unwrap();
519
520        ArrowGraph::from_edges(edges).unwrap()
521    }
522
523    #[test]
524    fn test_triangle_count() {
525        let graph = create_triangle_graph();
526        let algorithm = TriangleCount;
527        
528        let params = AlgorithmParams::new();
529        let result = algorithm.execute(&graph, &params).unwrap();
530        
531        assert_eq!(result.num_rows(), 1);
532        assert_eq!(result.num_columns(), 2);
533        
534        let metric_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
535        let value_col = result.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
536        
537        assert_eq!(metric_col.value(0), "triangle_count");
538        assert_eq!(value_col.value(0), 1); // One triangle: A-B-C
539    }
540
541    #[test]
542    fn test_clustering_coefficient_local() {
543        let graph = create_triangle_graph();
544        let algorithm = ClusteringCoefficient;
545        
546        let params = AlgorithmParams::new().with_param("mode", "local");
547        let result = algorithm.execute(&graph, &params).unwrap();
548        
549        assert_eq!(result.num_rows(), 4); // 4 nodes: A, B, C, D
550        assert_eq!(result.num_columns(), 2);
551        
552        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
553        let coeff_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
554        
555        // Build coefficient map
556        let mut coeff_map = std::collections::HashMap::new();
557        for i in 0..result.num_rows() {
558            let node_id = node_col.value(i);
559            let coefficient = coeff_col.value(i);
560            coeff_map.insert(node_id, coefficient);
561        }
562        
563        // Verify clustering coefficients are computed
564        // All coefficients should be between 0.0 and 1.0
565        for (node_id, coefficient) in &coeff_map {
566            assert!(*coefficient >= 0.0, "Node {} has negative coefficient: {}", node_id, coefficient);
567            assert!(*coefficient <= 1.0, "Node {} has coefficient > 1.0: {}", node_id, coefficient);
568        }
569        
570        // D has degree 1, so clustering coefficient should be 0.0
571        assert_eq!(coeff_map.get("D").unwrap(), &0.0);
572    }
573
574    #[test]
575    fn test_clustering_coefficient_global() {
576        let graph = create_triangle_graph();
577        let algorithm = ClusteringCoefficient;
578        
579        let params = AlgorithmParams::new().with_param("mode", "global");
580        let result = algorithm.execute(&graph, &params).unwrap();
581        
582        assert_eq!(result.num_rows(), 1);
583        assert_eq!(result.num_columns(), 2);
584        
585        let metric_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
586        let value_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
587        
588        assert_eq!(metric_col.value(0), "global_clustering_coefficient");
589        
590        // Global clustering = 3 * triangles / connected_triples
591        // We have 1 triangle and several connected triples
592        let global_coefficient = value_col.value(0);
593        assert!(global_coefficient >= 0.0, "Global coefficient should be non-negative: {}", global_coefficient);
594        assert!(global_coefficient <= 1.0, "Global coefficient should be <= 1.0: {}", global_coefficient);
595    }
596
597    #[test]
598    fn test_clustering_coefficient_invalid_mode() {
599        let graph = create_test_graph();
600        let algorithm = ClusteringCoefficient;
601        
602        let params = AlgorithmParams::new().with_param("mode", "invalid");
603        let result = algorithm.execute(&graph, &params);
604        assert!(result.is_err());
605    }
606
607    #[test]
608    fn test_triangle_count_no_triangles() {
609        let graph = create_test_graph(); // Linear graph, no triangles
610        let algorithm = TriangleCount;
611        
612        let params = AlgorithmParams::new();
613        let result = algorithm.execute(&graph, &params).unwrap();
614        
615        let value_col = result.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
616        // The linear graph A→B, A→C, B→C, C→D, D→E may form triangles when treated as undirected
617        // Check that the result is non-negative
618        assert!(value_col.value(0) >= 0);
619    }
620
621    #[test]
622    fn test_betweenness_centrality() {
623        let graph = create_test_graph();
624        let algorithm = BetweennessCentrality;
625        
626        let params = AlgorithmParams::new();
627        let result = algorithm.execute(&graph, &params).unwrap();
628        
629        assert_eq!(result.num_rows(), graph.node_count());
630        assert_eq!(result.num_columns(), 2);
631        
632        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
633        let centrality_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
634        
635        // Verify all nodes are present and centrality values are non-negative
636        let mut found_nodes = std::collections::HashSet::new();
637        for i in 0..result.num_rows() {
638            let node_id = node_col.value(i);
639            let centrality = centrality_col.value(i);
640            
641            found_nodes.insert(node_id);
642            assert!(centrality >= 0.0, "Betweenness centrality should be non-negative: {}", centrality);
643        }
644        
645        assert!(found_nodes.contains("A"));
646        assert!(found_nodes.contains("B"));
647        assert!(found_nodes.contains("C"));
648        assert!(found_nodes.contains("D"));
649        assert!(found_nodes.contains("E"));
650    }
651
652    #[test]
653    fn test_eigenvector_centrality() {
654        let graph = create_test_graph();
655        let algorithm = EigenvectorCentrality;
656        
657        let params = AlgorithmParams::new()
658            .with_param("max_iterations", 50)
659            .with_param("tolerance", 1e-6);
660        
661        let result = algorithm.execute(&graph, &params).unwrap();
662        
663        assert_eq!(result.num_rows(), graph.node_count());
664        assert_eq!(result.num_columns(), 2);
665        
666        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
667        let centrality_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
668        
669        // Verify all nodes are present and centrality values are non-negative
670        let mut found_nodes = std::collections::HashSet::new();
671        for i in 0..result.num_rows() {
672            let node_id = node_col.value(i);
673            let centrality = centrality_col.value(i);
674            
675            found_nodes.insert(node_id);
676            assert!(centrality >= 0.0, "Eigenvector centrality should be non-negative: {}", centrality);
677        }
678        
679        assert!(found_nodes.contains("A"));
680        assert!(found_nodes.contains("B"));
681        assert!(found_nodes.contains("C"));
682        assert!(found_nodes.contains("D"));
683        assert!(found_nodes.contains("E"));
684    }
685
686    #[test]
687    fn test_closeness_centrality() {
688        let graph = create_test_graph();
689        let algorithm = ClosenessCentrality;
690        
691        let params = AlgorithmParams::new();
692        let result = algorithm.execute(&graph, &params).unwrap();
693        
694        assert_eq!(result.num_rows(), graph.node_count());
695        assert_eq!(result.num_columns(), 2);
696        
697        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
698        let centrality_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
699        
700        // Verify all nodes are present and centrality values are non-negative
701        let mut found_nodes = std::collections::HashSet::new();
702        for i in 0..result.num_rows() {
703            let node_id = node_col.value(i);
704            let centrality = centrality_col.value(i);
705            
706            found_nodes.insert(node_id);
707            assert!(centrality >= 0.0, "Closeness centrality should be non-negative: {}", centrality);
708        }
709        
710        assert!(found_nodes.contains("A"));
711        assert!(found_nodes.contains("B"));
712        assert!(found_nodes.contains("C"));
713        assert!(found_nodes.contains("D"));
714        assert!(found_nodes.contains("E"));
715    }
716
717    #[test]
718    fn test_centrality_invalid_params() {
719        let graph = create_test_graph();
720        
721        // Test eigenvector centrality with invalid parameters
722        let algorithm = EigenvectorCentrality;
723        
724        let params = AlgorithmParams::new().with_param("max_iterations", 0);
725        let result = algorithm.execute(&graph, &params);
726        assert!(result.is_err());
727        
728        let params = AlgorithmParams::new().with_param("tolerance", -1.0);
729        let result = algorithm.execute(&graph, &params);
730        assert!(result.is_err());
731    }
732
733    #[test]
734    fn test_algorithm_names_with_centrality() {
735        let shortest_path = ShortestPath;
736        let all_paths = AllPaths;
737        let pagerank = PageRank;
738        let betweenness = BetweennessCentrality;
739        let eigenvector = EigenvectorCentrality;
740        let closeness = ClosenessCentrality;
741        let weakly_connected = WeaklyConnectedComponents;
742        let strongly_connected = StronglyConnectedComponents;
743        let leiden = LeidenCommunityDetection;
744        let triangle_count = TriangleCount;
745        let clustering = ClusteringCoefficient;
746        
747        assert_eq!(shortest_path.name(), "shortest_path");
748        assert_eq!(all_paths.name(), "all_paths");
749        assert_eq!(pagerank.name(), "pagerank");
750        assert_eq!(betweenness.name(), "betweenness_centrality");
751        assert_eq!(eigenvector.name(), "eigenvector_centrality");
752        assert_eq!(closeness.name(), "closeness_centrality");
753        assert_eq!(weakly_connected.name(), "weakly_connected_components");
754        assert_eq!(strongly_connected.name(), "strongly_connected_components");
755        assert_eq!(leiden.name(), "leiden");
756        assert_eq!(triangle_count.name(), "triangle_count");
757        assert_eq!(clustering.name(), "clustering_coefficient");
758    }
759
760    #[test]
761    fn test_graph_mutations_add_node() {
762        let mut graph = create_test_graph();
763        let initial_count = graph.node_count();
764        
765        // Add a new node
766        graph.add_node("F".to_string()).unwrap();
767        
768        assert_eq!(graph.node_count(), initial_count + 1);
769        assert!(graph.has_node("F"));
770        
771        // Try to add duplicate node (should fail)
772        let result = graph.add_node("F".to_string());
773        assert!(result.is_err());
774    }
775
776    #[test]
777    fn test_graph_mutations_remove_node() {
778        let mut graph = create_test_graph();
779        let initial_count = graph.node_count();
780        let initial_edge_count = graph.edge_count();
781        
782        // Remove a node (this should also remove its edges)
783        graph.remove_node("A").unwrap();
784        
785        assert_eq!(graph.node_count(), initial_count - 1);
786        assert!(!graph.has_node("A"));
787        assert!(graph.edge_count() < initial_edge_count); // Some edges removed
788        
789        // Try to remove non-existent node (should fail)
790        let result = graph.remove_node("Z");
791        assert!(result.is_err());
792    }
793
794    #[test]
795    fn test_graph_mutations_add_edge() {
796        let mut graph = create_test_graph();
797        let initial_edge_count = graph.edge_count();
798        
799        // Add a new edge between existing nodes
800        graph.add_edge("A".to_string(), "E".to_string(), Some(2.5)).unwrap();
801        
802        assert_eq!(graph.edge_count(), initial_edge_count + 1);
803        assert_eq!(graph.edge_weight("A", "E"), Some(2.5));
804        
805        // Add edge with new nodes
806        graph.add_edge("F".to_string(), "G".to_string(), None).unwrap();
807        assert!(graph.has_node("F"));
808        assert!(graph.has_node("G"));
809        
810        // Try to add duplicate edge (should fail)
811        let result = graph.add_edge("A".to_string(), "E".to_string(), Some(1.0));
812        assert!(result.is_err());
813    }
814
815    #[test]
816    fn test_graph_mutations_remove_edge() {
817        let mut graph = create_test_graph();
818        let initial_edge_count = graph.edge_count();
819        
820        // Remove an existing edge
821        graph.remove_edge("A", "B").unwrap();
822        
823        assert_eq!(graph.edge_count(), initial_edge_count - 1);
824        assert!(graph.edge_weight("A", "B").is_none());
825        
826        // Try to remove non-existent edge (should fail)
827        let result = graph.remove_edge("A", "Z");
828        assert!(result.is_err());
829    }
830
831    #[test]
832    fn test_graph_mutations_complex_operations() {
833        let mut graph = create_test_graph();
834        
835        // Complex sequence of operations
836        graph.add_node("F".to_string()).unwrap();
837        graph.add_edge("F".to_string(), "A".to_string(), Some(3.0)).unwrap();
838        graph.add_edge("B".to_string(), "F".to_string(), Some(1.5)).unwrap();
839        
840        assert!(graph.has_node("F"));
841        assert_eq!(graph.edge_weight("F", "A"), Some(3.0));
842        assert_eq!(graph.edge_weight("B", "F"), Some(1.5));
843        
844        // Remove a node and ensure its edges are gone
845        let initial_node_count = graph.node_count();
846        graph.remove_node("F").unwrap();
847        
848        assert_eq!(graph.node_count(), initial_node_count - 1);
849        assert!(!graph.has_node("F"));
850        assert!(graph.edge_weight("F", "A").is_none());
851        assert!(graph.edge_weight("B", "F").is_none());
852    }
853
854    #[test]
855    fn test_vectorized_pagerank() {
856        let graph = create_test_graph();
857        let algorithm = VectorizedPageRank;
858        
859        let params = AlgorithmParams::new()
860            .with_param("damping_factor", 0.85)
861            .with_param("max_iterations", 50)
862            .with_param("tolerance", 1e-6);
863        
864        let result = algorithm.execute(&graph, &params).unwrap();
865        
866        assert_eq!(result.num_rows(), graph.node_count());
867        assert_eq!(result.num_columns(), 2);
868        
869        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
870        let score_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
871        
872        // Verify PageRank properties
873        let mut total_score = 0.0;
874        let mut found_nodes = std::collections::HashSet::new();
875        
876        for i in 0..result.num_rows() {
877            let node_id = node_col.value(i);
878            let score = score_col.value(i);
879            
880            found_nodes.insert(node_id);
881            total_score += score;
882            
883            // PageRank scores should be positive
884            assert!(score > 0.0, "PageRank score should be positive: {}", score);
885        }
886        
887        // Check that we have all expected nodes
888        assert!(found_nodes.contains("A"));
889        assert!(found_nodes.contains("B"));
890        assert!(found_nodes.contains("C"));
891        assert!(found_nodes.contains("D"));
892        assert!(found_nodes.contains("E"));
893        
894        // PageRank scores should sum to approximately 1.0
895        assert!((total_score - 1.0).abs() < 1e-8, "PageRank scores should sum to 1.0, got: {}", total_score);
896    }
897
898    #[test]
899    fn test_vectorized_batch_centralities() {
900        let graph = create_test_graph();
901        let algorithm = VectorizedBatchOperations;
902        
903        let params = AlgorithmParams::new();
904        let result = algorithm.execute(&graph, &params).unwrap();
905        
906        assert_eq!(result.num_rows(), graph.node_count());
907        assert_eq!(result.num_columns(), 4);
908        
909        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
910        let degree_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
911        let eigenvector_col = result.column(2).as_any().downcast_ref::<Float64Array>().unwrap();
912        let closeness_col = result.column(3).as_any().downcast_ref::<Float64Array>().unwrap();
913        
914        // Verify all centrality measures are computed
915        let mut found_nodes = std::collections::HashSet::new();
916        
917        for i in 0..result.num_rows() {
918            let node_id = node_col.value(i);
919            let degree_centrality = degree_col.value(i);
920            let eigenvector_centrality = eigenvector_col.value(i);
921            let closeness_centrality = closeness_col.value(i);
922            
923            found_nodes.insert(node_id);
924            
925            // All centrality measures should be non-negative and <= 1.0
926            assert!(degree_centrality >= 0.0 && degree_centrality <= 1.0, 
927                "Degree centrality should be in [0,1]: {}", degree_centrality);
928            assert!(eigenvector_centrality >= 0.0, 
929                "Eigenvector centrality should be non-negative: {}", eigenvector_centrality);
930            assert!(closeness_centrality >= 0.0 && closeness_centrality <= 1.0, 
931                "Closeness centrality should be in [0,1]: {}", closeness_centrality);
932        }
933        
934        // Check that we have all expected nodes
935        assert!(found_nodes.contains("A"));
936        assert!(found_nodes.contains("B"));
937        assert!(found_nodes.contains("C"));
938        assert!(found_nodes.contains("D"));
939        assert!(found_nodes.contains("E"));
940    }
941
942    #[test]
943    fn test_vectorized_vs_regular_pagerank_consistency() {
944        let graph = create_test_graph();
945        
946        let params = AlgorithmParams::new()
947            .with_param("damping_factor", 0.85)
948            .with_param("max_iterations", 100)
949            .with_param("tolerance", 1e-8);
950        
951        // Regular PageRank
952        let regular_pagerank = PageRank;
953        let regular_result = regular_pagerank.execute(&graph, &params).unwrap();
954        
955        // Vectorized PageRank
956        let vectorized_pagerank = VectorizedPageRank;
957        let vectorized_result = vectorized_pagerank.execute(&graph, &params).unwrap();
958        
959        // Results should be similar (allowing for small numerical differences)
960        assert_eq!(regular_result.num_rows(), vectorized_result.num_rows());
961        
962        let regular_scores = regular_result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
963        let vectorized_scores = vectorized_result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
964        
965        for i in 0..regular_result.num_rows() {
966            let regular_score = regular_scores.value(i);
967            let vectorized_score = vectorized_scores.value(i);
968            let diff = (regular_score - vectorized_score).abs();
969            
970            assert!(diff < 1e-6, 
971                "PageRank scores should be similar: regular={}, vectorized={}, diff={}", 
972                regular_score, vectorized_score, diff);
973        }
974    }
975
976    #[test]
977    fn test_random_walk() {
978        let graph = create_test_graph();
979        let algorithm = RandomWalk;
980        
981        let params = AlgorithmParams::new()
982            .with_param("walk_length", 5)
983            .with_param("num_walks", 3)
984            .with_param("seed", 42u64);
985        
986        let result = algorithm.execute(&graph, &params).unwrap();
987        
988        // Check basic structure
989        assert_eq!(result.num_columns(), 3);
990        
991        let walk_id_col = result.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
992        let step_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
993        let node_col = result.column(2).as_any().downcast_ref::<StringArray>().unwrap();
994        
995        // Verify walks structure
996        let mut walks: std::collections::HashMap<u32, Vec<(u32, String)>> = std::collections::HashMap::new();
997        
998        for i in 0..result.num_rows() {
999            let walk_id = walk_id_col.value(i);
1000            let step = step_col.value(i);
1001            let node_id = node_col.value(i).to_string();
1002            
1003            walks.entry(walk_id).or_insert_with(Vec::new).push((step, node_id));
1004        }
1005        
1006        // Should have walks from each node (5 nodes * 3 walks each = 15 total walks)
1007        assert_eq!(walks.len(), 15);
1008        
1009        // Each walk should have valid steps
1010        for (_, walk_steps) in walks {
1011            assert!(!walk_steps.is_empty());
1012            assert!(walk_steps.len() <= 5); // walk_length parameter
1013            
1014            // Steps should be sequential starting from 0
1015            let mut sorted_steps = walk_steps;
1016            sorted_steps.sort_by_key(|(step, _)| *step);
1017            
1018            for (i, (step, _)) in sorted_steps.iter().enumerate() {
1019                assert_eq!(*step, i as u32);
1020            }
1021        }
1022    }
1023    
1024    #[test]
1025    fn test_node2vec_walk() {
1026        let graph = create_test_graph();
1027        let algorithm = Node2VecWalk;
1028        
1029        let params = AlgorithmParams::new()
1030            .with_param("walk_length", 10)
1031            .with_param("num_walks", 2)
1032            .with_param("p", 1.0)
1033            .with_param("q", 1.0)
1034            .with_param("seed", 42u64);
1035        
1036        let result = algorithm.execute(&graph, &params).unwrap();
1037        
1038        // Check basic structure
1039        assert_eq!(result.num_columns(), 5);
1040        
1041        let walk_id_col = result.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
1042        let step_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
1043        let node_col = result.column(2).as_any().downcast_ref::<StringArray>().unwrap();
1044        let p_col = result.column(3).as_any().downcast_ref::<Float64Array>().unwrap();
1045        let q_col = result.column(4).as_any().downcast_ref::<Float64Array>().unwrap();
1046        
1047        // Verify Node2Vec parameters are preserved
1048        for i in 0..result.num_rows() {
1049            assert_eq!(p_col.value(i), 1.0);
1050            assert_eq!(q_col.value(i), 1.0);
1051        }
1052        
1053        // Should have walks from each node
1054        let walk_count = walk_id_col.iter().max().unwrap().unwrap() + 1;
1055        assert_eq!(walk_count, 10); // 5 nodes * 2 walks each
1056    }
1057    
1058    #[test]
1059    fn test_graph_sampling_random_node() {
1060        let graph = create_test_graph();
1061        let algorithm = GraphSampling;
1062        
1063        let params = AlgorithmParams::new()
1064            .with_param("method", "random_node".to_string())
1065            .with_param("sample_size", 3)
1066            .with_param("seed", 42u64);
1067        
1068        let result = algorithm.execute(&graph, &params).unwrap();
1069        
1070        assert_eq!(result.num_columns(), 1);
1071        assert_eq!(result.num_rows(), 3);
1072        
1073        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
1074        
1075        // Verify all sampled nodes exist in original graph
1076        for i in 0..result.num_rows() {
1077            let node_id = node_col.value(i);
1078            assert!(graph.has_node(node_id), "Sampled node {} should exist in graph", node_id);
1079        }
1080    }
1081    
1082    #[test]
1083    fn test_graph_sampling_random_edge() {
1084        let graph = create_test_graph();
1085        let algorithm = GraphSampling;
1086        
1087        let params = AlgorithmParams::new()
1088            .with_param("method", "random_edge".to_string())
1089            .with_param("sample_ratio", 0.6)
1090            .with_param("seed", 42u64);
1091        
1092        let result = algorithm.execute(&graph, &params).unwrap();
1093        
1094        assert_eq!(result.num_columns(), 3);
1095        
1096        let source_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
1097        let target_col = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
1098        let weight_col = result.column(2).as_any().downcast_ref::<Float64Array>().unwrap();
1099        
1100        // Verify all sampled edges exist in original graph
1101        for i in 0..result.num_rows() {
1102            let source = source_col.value(i);
1103            let target = target_col.value(i);
1104            let weight = weight_col.value(i);
1105            
1106            assert!(graph.has_node(source), "Source node {} should exist", source);
1107            assert!(graph.has_node(target), "Target node {} should exist", target);
1108            assert!(graph.edge_weight(source, target).is_some(), 
1109                "Edge {}→{} should exist in graph", source, target);
1110            assert!(weight > 0.0, "Edge weight should be positive");
1111        }
1112    }
1113    
1114    #[test]
1115    fn test_graph_sampling_snowball() {
1116        let graph = create_test_graph();
1117        let algorithm = GraphSampling;
1118        
1119        let params = AlgorithmParams::new()
1120            .with_param("method", "snowball".to_string())
1121            .with_param("seed_nodes", vec!["A".to_string()])
1122            .with_param("k_hops", 2)
1123            .with_param("max_nodes", 4);
1124        
1125        let result = algorithm.execute(&graph, &params).unwrap();
1126        
1127        assert_eq!(result.num_columns(), 1);
1128        assert!(result.num_rows() <= 4); // Respects max_nodes
1129        
1130        let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
1131        
1132        // Should include the seed node
1133        let sampled_nodes: Vec<String> = (0..result.num_rows())
1134            .map(|i| node_col.value(i).to_string())
1135            .collect();
1136        
1137        assert!(sampled_nodes.contains(&"A".to_string()), 
1138            "Snowball sampling should include seed node A");
1139        
1140        // All sampled nodes should exist in graph
1141        for node_id in &sampled_nodes {
1142            assert!(graph.has_node(node_id), "Sampled node {} should exist in graph", node_id);
1143        }
1144    }
1145    
1146    #[test]
1147    fn test_sampling_invalid_parameters() {
1148        let graph = create_test_graph();
1149        
1150        // Test RandomWalk with invalid walk_length
1151        let algorithm = RandomWalk;
1152        let params = AlgorithmParams::new().with_param("walk_length", 0);
1153        let result = algorithm.execute(&graph, &params);
1154        assert!(result.is_err());
1155        
1156        // Test Node2VecWalk with invalid p parameter
1157        let algorithm = Node2VecWalk;
1158        let params = AlgorithmParams::new().with_param("p", -1.0);
1159        let result = algorithm.execute(&graph, &params);
1160        assert!(result.is_err());
1161        
1162        // Test GraphSampling with invalid sample_ratio
1163        let algorithm = GraphSampling;
1164        let params = AlgorithmParams::new()
1165            .with_param("method", "random_edge".to_string())
1166            .with_param("sample_ratio", 1.5);
1167        let result = algorithm.execute(&graph, &params);
1168        assert!(result.is_err());
1169    }
1170    
1171    #[test]
1172    fn test_sampling_algorithm_names() {
1173        let random_walk = RandomWalk;
1174        let node2vec = Node2VecWalk;
1175        let sampling = GraphSampling;
1176        
1177        assert_eq!(random_walk.name(), "random_walk");
1178        assert_eq!(node2vec.name(), "node2vec");
1179        assert_eq!(sampling.name(), "graph_sampling");
1180        
1181        // Check descriptions
1182        assert!(!random_walk.description().is_empty());
1183        assert!(!node2vec.description().is_empty());
1184        assert!(!sampling.description().is_empty());
1185    }
1186
1187    #[test]
1188    fn test_streaming_graph_processor_basic_operations() {
1189        let initial_graph = create_test_graph();
1190        let mut processor = StreamingGraphProcessor::new(initial_graph);
1191        
1192        let initial_node_count = processor.graph().node_count();
1193        let initial_edge_count = processor.graph().edge_count();
1194        
1195        // Test adding a node
1196        let result = processor.apply_update(StreamUpdate::AddNode { 
1197            node_id: "F".to_string() 
1198        }).unwrap();
1199        
1200        assert_eq!(result.operation, "add_node");
1201        assert_eq!(result.nodes_added, 1);
1202        assert_eq!(result.nodes_removed, 0);
1203        assert_eq!(processor.graph().node_count(), initial_node_count + 1);
1204        assert!(processor.graph().has_node("F"));
1205        
1206        // Test adding an edge
1207        let result = processor.apply_update(StreamUpdate::AddEdge { 
1208            source: "F".to_string(),
1209            target: "A".to_string(),
1210            weight: Some(2.0),
1211        }).unwrap();
1212        
1213        assert_eq!(result.operation, "add_edge");
1214        assert_eq!(result.edges_added, 1);
1215        assert_eq!(processor.graph().edge_count(), initial_edge_count + 1);
1216        assert_eq!(processor.graph().edge_weight("F", "A"), Some(2.0));
1217        
1218        // Test removing an edge
1219        let result = processor.apply_update(StreamUpdate::RemoveEdge { 
1220            source: "F".to_string(),
1221            target: "A".to_string(),
1222        }).unwrap();
1223        
1224        assert_eq!(result.operation, "remove_edge");
1225        assert_eq!(result.edges_removed, 1);
1226        assert_eq!(processor.graph().edge_count(), initial_edge_count);
1227        assert!(processor.graph().edge_weight("F", "A").is_none());
1228        
1229        // Test removing a node
1230        let result = processor.apply_update(StreamUpdate::RemoveNode { 
1231            node_id: "F".to_string() 
1232        }).unwrap();
1233        
1234        assert_eq!(result.operation, "remove_node");
1235        assert_eq!(result.nodes_removed, 1);
1236        assert_eq!(processor.graph().node_count(), initial_node_count);
1237        assert!(!processor.graph().has_node("F"));
1238    }
1239    
1240    #[test]
1241    fn test_streaming_graph_processor_empty_graph() {
1242        let mut processor = StreamingGraphProcessor::empty().unwrap();
1243        
1244        assert_eq!(processor.graph().node_count(), 0);
1245        assert_eq!(processor.graph().edge_count(), 0);
1246        
1247        // Add first node
1248        processor.apply_update(StreamUpdate::AddNode { 
1249            node_id: "A".to_string() 
1250        }).unwrap();
1251        
1252        assert_eq!(processor.graph().node_count(), 1);
1253        assert!(processor.graph().has_node("A"));
1254        
1255        // Add second node and edge
1256        processor.apply_update(StreamUpdate::AddNode { 
1257            node_id: "B".to_string() 
1258        }).unwrap();
1259        
1260        processor.apply_update(StreamUpdate::AddEdge { 
1261            source: "A".to_string(),
1262            target: "B".to_string(),
1263            weight: Some(1.0),
1264        }).unwrap();
1265        
1266        assert_eq!(processor.graph().node_count(), 2);
1267        assert_eq!(processor.graph().edge_count(), 1);
1268        assert_eq!(processor.graph().edge_weight("A", "B"), Some(1.0));
1269    }
1270    
1271    #[test]
1272    fn test_streaming_graph_processor_batch_operations() {
1273        let initial_graph = create_test_graph();
1274        let mut processor = StreamingGraphProcessor::new(initial_graph);
1275        
1276        let batch_operations = vec![
1277            StreamUpdate::AddNode { node_id: "F".to_string() },
1278            StreamUpdate::AddNode { node_id: "G".to_string() },
1279            StreamUpdate::AddEdge { 
1280                source: "F".to_string(),
1281                target: "G".to_string(),
1282                weight: Some(1.5),
1283            },
1284        ];
1285        
1286        let result = processor.apply_update(StreamUpdate::Batch { 
1287            operations: batch_operations 
1288        }).unwrap();
1289        
1290        assert_eq!(result.operation, "batch");
1291        assert_eq!(result.nodes_added, 2);
1292        assert_eq!(result.edges_added, 1);
1293        
1294        assert!(processor.graph().has_node("F"));
1295        assert!(processor.graph().has_node("G"));
1296        assert_eq!(processor.graph().edge_weight("F", "G"), Some(1.5));
1297    }
1298    
1299    #[test]
1300    fn test_streaming_graph_processor_change_log() {
1301        let initial_graph = create_test_graph();
1302        let mut processor = StreamingGraphProcessor::new(initial_graph);
1303        
1304        // Enable change logging
1305        processor.set_change_log_enabled(true);
1306        
1307        // Apply some updates
1308        processor.apply_update(StreamUpdate::AddNode { 
1309            node_id: "F".to_string() 
1310        }).unwrap();
1311        
1312        processor.apply_update(StreamUpdate::AddEdge { 
1313            source: "F".to_string(),
1314            target: "A".to_string(),
1315            weight: Some(2.0),
1316        }).unwrap();
1317        
1318        // Check change log
1319        let change_log = processor.get_change_log_since(0);
1320        assert_eq!(change_log.len(), 2);
1321        
1322        // Check statistics
1323        let stats = processor.get_statistics();
1324        assert_eq!(stats.total_updates, 2);
1325        assert_eq!(stats.change_log_size, 2);
1326        assert!(stats.change_log_enabled);
1327        
1328        // Test compaction
1329        processor.compact_change_log(1);
1330        let change_log_after_compact = processor.get_change_log_since(0);
1331        assert_eq!(change_log_after_compact.len(), 1);
1332    }
1333    
1334    #[test]
1335    fn test_streaming_graph_processor_snapshots() {
1336        let initial_graph = create_test_graph();
1337        let mut processor = StreamingGraphProcessor::new(initial_graph);
1338        
1339        let initial_node_count = processor.graph().node_count();
1340        
1341        // Create snapshot
1342        let snapshot = processor.create_snapshot().unwrap();
1343        assert_eq!(snapshot.update_count, 0);
1344        assert_eq!(snapshot.graph.node_count(), initial_node_count);
1345        
1346        // Make some changes
1347        processor.apply_update(StreamUpdate::AddNode { 
1348            node_id: "F".to_string() 
1349        }).unwrap();
1350        processor.apply_update(StreamUpdate::AddNode { 
1351            node_id: "G".to_string() 
1352        }).unwrap();
1353        
1354        assert_eq!(processor.graph().node_count(), initial_node_count + 2);
1355        assert_eq!(processor.update_count(), 2);
1356        
1357        // Restore from snapshot
1358        processor.restore_from_snapshot(snapshot);
1359        
1360        assert_eq!(processor.graph().node_count(), initial_node_count);
1361        assert_eq!(processor.update_count(), 0);
1362        assert!(!processor.graph().has_node("F"));
1363        assert!(!processor.graph().has_node("G"));
1364    }
1365    
1366    #[test]
1367    fn test_streaming_graph_system_with_cache() {
1368        let initial_graph = create_test_graph();
1369        let mut system = StreamingGraphSystem::new(initial_graph);
1370        
1371        // Test basic operations work
1372        let result = system.apply_update_with_cache_invalidation(StreamUpdate::AddNode { 
1373            node_id: "F".to_string() 
1374        }).unwrap();
1375        
1376        assert_eq!(result.operation, "add_node");
1377        assert!(system.graph_processor().graph().has_node("F"));
1378        
1379        // Test cache statistics
1380        let cache_stats = system.algorithm_processor().get_cache_statistics();
1381        assert_eq!(cache_stats.cached_algorithms, 0); // No algorithms cached yet
1382        
1383        // Test algorithm processor methods
1384        assert!(!system.algorithm_processor().is_cache_valid("pagerank", 100));
1385        system.algorithm_processor_mut().set_invalidation_threshold(20);
1386        
1387        let cache_stats_after = system.algorithm_processor().get_cache_statistics();
1388        assert_eq!(cache_stats_after.invalidation_threshold, 20);
1389    }
1390    
1391    #[test]
1392    fn test_streaming_update_result_conversion() {
1393        let initial_graph = create_test_graph();
1394        let mut processor = StreamingGraphProcessor::new(initial_graph);
1395        
1396        let result = processor.apply_update(StreamUpdate::AddNode { 
1397            node_id: "F".to_string() 
1398        }).unwrap();
1399        
1400        // Test conversion to RecordBatch
1401        let record_batch = result.to_record_batch().unwrap();
1402        assert_eq!(record_batch.num_rows(), 1);
1403        assert_eq!(record_batch.num_columns(), 6);
1404        
1405        let operation_col = record_batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
1406        assert_eq!(operation_col.value(0), "add_node");
1407        
1408        let nodes_added_col = record_batch.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
1409        assert_eq!(nodes_added_col.value(0), 1);
1410    }
1411    
1412    #[test]
1413    fn test_streaming_graph_system_empty() {
1414        let mut system = StreamingGraphSystem::empty().unwrap();
1415        
1416        assert_eq!(system.graph_processor().graph().node_count(), 0);
1417        assert_eq!(system.graph_processor().graph().edge_count(), 0);
1418        
1419        // Build a small graph through streaming updates
1420        system.apply_update_with_cache_invalidation(StreamUpdate::AddNode { 
1421            node_id: "A".to_string() 
1422        }).unwrap();
1423        
1424        system.apply_update_with_cache_invalidation(StreamUpdate::AddNode { 
1425            node_id: "B".to_string() 
1426        }).unwrap();
1427        
1428        system.apply_update_with_cache_invalidation(StreamUpdate::AddEdge { 
1429            source: "A".to_string(),
1430            target: "B".to_string(),
1431            weight: Some(1.0),
1432        }).unwrap();
1433        
1434        assert_eq!(system.graph_processor().graph().node_count(), 2);
1435        assert_eq!(system.graph_processor().graph().edge_count(), 1);
1436        assert!(system.graph_processor().graph().has_node("A"));
1437        assert!(system.graph_processor().graph().has_node("B"));
1438        assert_eq!(system.graph_processor().graph().edge_weight("A", "B"), Some(1.0));
1439    }
1440    
1441    #[test]
1442    fn test_streaming_invalid_operations() {
1443        let initial_graph = create_test_graph();
1444        let mut processor = StreamingGraphProcessor::new(initial_graph);
1445        
1446        // Try to remove non-existent node
1447        let result = processor.apply_update(StreamUpdate::RemoveNode { 
1448            node_id: "Z".to_string() 
1449        });
1450        assert!(result.is_err());
1451        
1452        // Try to remove non-existent edge
1453        let result = processor.apply_update(StreamUpdate::RemoveEdge { 
1454            source: "A".to_string(),
1455            target: "Z".to_string(),
1456        });
1457        assert!(result.is_err());
1458        
1459        // Try to add duplicate edge
1460        let result = processor.apply_update(StreamUpdate::AddEdge { 
1461            source: "A".to_string(),
1462            target: "B".to_string(),
1463            weight: Some(2.0),
1464        });
1465        assert!(result.is_err()); // Should fail because A->B already exists
1466    }
1467}