1#[cfg(test)]
2mod tests {
3 use crate::algorithms::pathfinding::{ShortestPath, AllPaths};
4 use crate::algorithms::centrality::{PageRank, BetweennessCentrality, EigenvectorCentrality, ClosenessCentrality};
5 use crate::algorithms::components::{WeaklyConnectedComponents, StronglyConnectedComponents};
6 use crate::algorithms::community::LeidenCommunityDetection;
7 use crate::algorithms::aggregation::{TriangleCount, ClusteringCoefficient};
8 use crate::algorithms::vectorized::{VectorizedPageRank, VectorizedBatchOperations};
9 use crate::algorithms::sampling::{RandomWalk, Node2VecWalk, GraphSampling};
10 use crate::graph::{StreamingGraphProcessor, StreamingGraphSystem, StreamUpdate};
11 use crate::{GraphAlgorithm, AlgorithmParams};
12 use crate::ArrowGraph;
13 use arrow::record_batch::RecordBatch;
14 use arrow::array::{StringArray, Float64Array, UInt32Array, UInt64Array};
15 use arrow::datatypes::{DataType, Field, Schema};
16 use std::sync::Arc;
17
18 fn create_test_graph() -> ArrowGraph {
19 let schema = Arc::new(Schema::new(vec![
20 Field::new("source", DataType::Utf8, false),
21 Field::new("target", DataType::Utf8, false),
22 Field::new("weight", DataType::Float64, true),
23 ]));
24
25 let source_array = StringArray::from(vec!["A", "A", "B", "C", "D"]);
26 let target_array = StringArray::from(vec!["B", "C", "C", "D", "E"]);
27 let weight_array = Float64Array::from(vec![
28 Some(1.0),
29 Some(4.0),
30 Some(2.0),
31 Some(1.0),
32 Some(1.0)
33 ]);
34
35 let edges = RecordBatch::try_new(
36 schema,
37 vec![
38 Arc::new(source_array) as Arc<dyn arrow::array::Array>,
39 Arc::new(target_array) as Arc<dyn arrow::array::Array>,
40 Arc::new(weight_array) as Arc<dyn arrow::array::Array>
41 ],
42 ).unwrap();
43
44 ArrowGraph::from_edges(edges).unwrap()
45 }
46
47 fn create_disconnected_graph() -> ArrowGraph {
48 let schema = Arc::new(Schema::new(vec![
49 Field::new("source", DataType::Utf8, false),
50 Field::new("target", DataType::Utf8, false),
51 Field::new("weight", DataType::Float64, true),
52 ]));
53
54 let source_array = StringArray::from(vec!["A", "C"]);
56 let target_array = StringArray::from(vec!["B", "D"]);
57 let weight_array = Float64Array::from(vec![Some(1.0), Some(1.0)]);
58
59 let edges = RecordBatch::try_new(
60 schema,
61 vec![
62 Arc::new(source_array) as Arc<dyn arrow::array::Array>,
63 Arc::new(target_array) as Arc<dyn arrow::array::Array>,
64 Arc::new(weight_array) as Arc<dyn arrow::array::Array>
65 ],
66 ).unwrap();
67
68 ArrowGraph::from_edges(edges).unwrap()
69 }
70
71 #[test]
72 fn test_shortest_path_single_target() {
73 let graph = create_test_graph();
74 let algorithm = ShortestPath;
75
76 let params = AlgorithmParams::new()
77 .with_param("source", "A")
78 .with_param("target", "D");
79
80 let result = algorithm.execute(&graph, ¶ms).unwrap();
81
82 assert_eq!(result.num_rows(), 1);
83 assert_eq!(result.num_columns(), 4);
84
85 let source_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
87 let target_col = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
88 let distance_col = result.column(2).as_any().downcast_ref::<Float64Array>().unwrap();
89
90 assert_eq!(source_col.value(0), "A");
91 assert_eq!(target_col.value(0), "D");
92 assert_eq!(distance_col.value(0), 4.0); }
94
95 #[test]
96 fn test_shortest_path_all_targets() {
97 let graph = create_test_graph();
98 let algorithm = ShortestPath;
99
100 let params = AlgorithmParams::new()
101 .with_param("source", "A");
102
103 let result = algorithm.execute(&graph, ¶ms).unwrap();
104
105 assert!(result.num_rows() > 0);
107 assert_eq!(result.num_columns(), 3);
108
109 let source_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
110
111 for i in 0..result.num_rows() {
113 assert_eq!(source_col.value(i), "A");
114 }
115 }
116
117 #[test]
118 fn test_all_paths() {
119 let graph = create_test_graph();
120 let algorithm = AllPaths;
121
122 let params = AlgorithmParams::new()
123 .with_param("source", "A")
124 .with_param("target", "D")
125 .with_param("max_hops", 5);
126
127 let result = algorithm.execute(&graph, ¶ms).unwrap();
128
129 assert!(result.num_rows() >= 1);
131 assert_eq!(result.num_columns(), 4);
132
133 let source_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
134 let target_col = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
135
136 for i in 0..result.num_rows() {
137 assert_eq!(source_col.value(i), "A");
138 assert_eq!(target_col.value(i), "D");
139 }
140 }
141
142 #[test]
143 fn test_shortest_path_no_path() {
144 let graph = create_test_graph();
145 let algorithm = ShortestPath;
146
147 let params = AlgorithmParams::new()
148 .with_param("source", "E")
149 .with_param("target", "A");
150
151 let result = algorithm.execute(&graph, ¶ms);
152
153 assert!(result.is_err());
155 }
156
157 #[test]
158 fn test_pagerank() {
159 let graph = create_test_graph();
160 let algorithm = PageRank;
161
162 let params = AlgorithmParams::new()
163 .with_param("damping_factor", 0.85)
164 .with_param("max_iterations", 100)
165 .with_param("tolerance", 1e-6);
166
167 let result = algorithm.execute(&graph, ¶ms).unwrap();
168
169 assert_eq!(result.num_rows(), graph.node_count());
171 assert_eq!(result.num_columns(), 2);
172
173 let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
174 let score_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
175
176 let mut found_nodes = std::collections::HashSet::new();
178 let mut total_score = 0.0;
179
180 for i in 0..result.num_rows() {
181 let node_id = node_col.value(i);
182 let score = score_col.value(i);
183
184 found_nodes.insert(node_id);
185 total_score += score;
186
187 assert!(score > 0.0);
189 }
190
191 assert!(found_nodes.contains("A"));
193 assert!(found_nodes.contains("B"));
194 assert!(found_nodes.contains("C"));
195 assert!(found_nodes.contains("D"));
196 assert!(found_nodes.contains("E"));
197
198 assert!((total_score - 1.0).abs() < 1e-10);
200 }
201
202 #[test]
203 fn test_pagerank_with_custom_params() {
204 let graph = create_test_graph();
205 let algorithm = PageRank;
206
207 let params = AlgorithmParams::new()
208 .with_param("damping_factor", 0.5)
209 .with_param("max_iterations", 10)
210 .with_param("tolerance", 1e-3);
211
212 let result = algorithm.execute(&graph, ¶ms).unwrap();
213
214 assert!(result.num_rows() > 0);
215
216 let score_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
217 let total_score: f64 = (0..result.num_rows()).map(|i| score_col.value(i)).sum();
218
219 assert!((total_score - 1.0).abs() < 1e-2);
221 }
222
223 #[test]
224 fn test_pagerank_invalid_params() {
225 let graph = create_test_graph();
226 let algorithm = PageRank;
227
228 let params = AlgorithmParams::new().with_param("damping_factor", 1.5);
230 let result = algorithm.execute(&graph, ¶ms);
231 assert!(result.is_err());
232
233 let params = AlgorithmParams::new().with_param("max_iterations", 0);
235 let result = algorithm.execute(&graph, ¶ms);
236 assert!(result.is_err());
237
238 let params = AlgorithmParams::new().with_param("tolerance", -1.0);
240 let result = algorithm.execute(&graph, ¶ms);
241 assert!(result.is_err());
242 }
243
244 #[test]
245 fn test_weakly_connected_components() {
246 let graph = create_test_graph();
247 let algorithm = WeaklyConnectedComponents;
248
249 let params = AlgorithmParams::new();
250 let result = algorithm.execute(&graph, ¶ms).unwrap();
251
252 assert_eq!(result.num_rows(), graph.node_count());
254 assert_eq!(result.num_columns(), 2);
255
256 let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
257 let component_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
258
259 let mut found_nodes = std::collections::HashSet::new();
261 for i in 0..result.num_rows() {
262 let node_id = node_col.value(i);
263 let component_id = component_col.value(i);
264
265 found_nodes.insert(node_id);
266 assert_eq!(component_id, 0); }
268
269 assert!(found_nodes.contains("A"));
271 assert!(found_nodes.contains("B"));
272 assert!(found_nodes.contains("C"));
273 assert!(found_nodes.contains("D"));
274 assert!(found_nodes.contains("E"));
275 }
276
277 #[test]
278 fn test_weakly_connected_components_disconnected() {
279 let graph = create_disconnected_graph();
280 let algorithm = WeaklyConnectedComponents;
281
282 let params = AlgorithmParams::new();
283 let result = algorithm.execute(&graph, ¶ms).unwrap();
284
285 assert_eq!(result.num_rows(), 4);
287 assert_eq!(result.num_columns(), 2);
288
289 let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
290 let component_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
291
292 let mut component_map: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
293 for i in 0..result.num_rows() {
294 let node_id = node_col.value(i).to_string();
295 let component_id = component_col.value(i);
296 component_map.insert(node_id, component_id);
297 }
298
299 assert_eq!(component_map.get("A"), component_map.get("B"));
301
302 assert_eq!(component_map.get("C"), component_map.get("D"));
304
305 assert_ne!(component_map.get("A"), component_map.get("C"));
307 }
308
309 #[test]
310 fn test_strongly_connected_components() {
311 let graph = create_test_graph();
312 let algorithm = StronglyConnectedComponents;
313
314 let params = AlgorithmParams::new();
315 let result = algorithm.execute(&graph, ¶ms).unwrap();
316
317 assert_eq!(result.num_rows(), graph.node_count());
319 assert_eq!(result.num_columns(), 2);
320
321 let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
322 let component_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
323
324 let mut component_ids = std::collections::HashSet::new();
326 for i in 0..result.num_rows() {
327 let component_id = component_col.value(i);
328 component_ids.insert(component_id);
329 }
330
331 assert_eq!(component_ids.len(), graph.node_count());
333 }
334
335 fn create_strongly_connected_graph() -> ArrowGraph {
336 let schema = Arc::new(Schema::new(vec![
337 Field::new("source", DataType::Utf8, false),
338 Field::new("target", DataType::Utf8, false),
339 Field::new("weight", DataType::Float64, true),
340 ]));
341
342 let source_array = StringArray::from(vec!["A", "B", "C", "D", "E"]);
344 let target_array = StringArray::from(vec!["B", "C", "A", "E", "D"]);
345 let weight_array = Float64Array::from(vec![
346 Some(1.0),
347 Some(1.0),
348 Some(1.0),
349 Some(1.0),
350 Some(1.0)
351 ]);
352
353 let edges = RecordBatch::try_new(
354 schema,
355 vec![
356 Arc::new(source_array) as Arc<dyn arrow::array::Array>,
357 Arc::new(target_array) as Arc<dyn arrow::array::Array>,
358 Arc::new(weight_array) as Arc<dyn arrow::array::Array>
359 ],
360 ).unwrap();
361
362 ArrowGraph::from_edges(edges).unwrap()
363 }
364
365 #[test]
366 fn test_strongly_connected_components_with_cycles() {
367 let graph = create_strongly_connected_graph();
368 let algorithm = StronglyConnectedComponents;
369
370 let params = AlgorithmParams::new();
371 let result = algorithm.execute(&graph, ¶ms).unwrap();
372
373 assert_eq!(result.num_rows(), 5);
374 assert_eq!(result.num_columns(), 2);
375
376 let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
377 let component_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
378
379 let mut component_map: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
380 for i in 0..result.num_rows() {
381 let node_id = node_col.value(i).to_string();
382 let component_id = component_col.value(i);
383 component_map.insert(node_id, component_id);
384 }
385
386 assert_eq!(component_map.get("A"), component_map.get("B"));
388 assert_eq!(component_map.get("B"), component_map.get("C"));
389
390 assert_eq!(component_map.get("D"), component_map.get("E"));
392
393 assert_ne!(component_map.get("A"), component_map.get("D"));
395 }
396
397 fn create_community_graph() -> ArrowGraph {
398 let schema = Arc::new(Schema::new(vec![
399 Field::new("source", DataType::Utf8, false),
400 Field::new("target", DataType::Utf8, false),
401 Field::new("weight", DataType::Float64, true),
402 ]));
403
404 let source_array = StringArray::from(vec!["A", "B", "C", "A", "C", "D", "E", "F", "D", "E"]);
409 let target_array = StringArray::from(vec!["B", "C", "A", "C", "D", "E", "F", "D", "F", "F"]);
410 let weight_array = Float64Array::from(vec![
411 Some(1.0), Some(1.0), Some(1.0), Some(1.0), Some(0.5), Some(1.0), Some(1.0), Some(1.0), Some(1.0), Some(1.0) ]);
414
415 let edges = RecordBatch::try_new(
416 schema,
417 vec![
418 Arc::new(source_array) as Arc<dyn arrow::array::Array>,
419 Arc::new(target_array) as Arc<dyn arrow::array::Array>,
420 Arc::new(weight_array) as Arc<dyn arrow::array::Array>
421 ],
422 ).unwrap();
423
424 ArrowGraph::from_edges(edges).unwrap()
425 }
426
427 #[test]
428 fn test_leiden_community_detection() {
429 let graph = create_community_graph();
430 let algorithm = LeidenCommunityDetection;
431
432 let params = AlgorithmParams::new()
433 .with_param("resolution", 1.0)
434 .with_param("max_iterations", 1);
435
436 let result = algorithm.execute(&graph, ¶ms).unwrap();
437
438 assert_eq!(result.num_rows(), 6); assert_eq!(result.num_columns(), 2);
441
442 let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
443 let community_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
444
445 let mut community_map: std::collections::HashMap<String, u32> = std::collections::HashMap::new();
446 for i in 0..result.num_rows() {
447 let node_id = node_col.value(i).to_string();
448 let community_id = community_col.value(i);
449 community_map.insert(node_id, community_id);
450 }
451
452 let unique_communities: std::collections::HashSet<u32> = community_map.values().cloned().collect();
454 assert!(unique_communities.len() >= 2);
455
456 assert!(community_map.contains_key("A"));
458 assert!(community_map.contains_key("B"));
459 assert!(community_map.contains_key("C"));
460 assert!(community_map.contains_key("D"));
461 assert!(community_map.contains_key("E"));
462 assert!(community_map.contains_key("F"));
463 }
464
465 #[test]
466 fn test_leiden_with_custom_resolution() {
467 let graph = create_test_graph();
468 let algorithm = LeidenCommunityDetection;
469
470 let params = AlgorithmParams::new()
471 .with_param("resolution", 0.5)
472 .with_param("max_iterations", 10);
473
474 let result = algorithm.execute(&graph, ¶ms).unwrap();
475
476 assert!(result.num_rows() > 0);
477 assert_eq!(result.num_columns(), 2);
478 }
479
480 #[test]
481 fn test_leiden_invalid_params() {
482 let graph = create_test_graph();
483 let algorithm = LeidenCommunityDetection;
484
485 let params = AlgorithmParams::new().with_param("resolution", -1.0);
487 let result = algorithm.execute(&graph, ¶ms);
488 assert!(result.is_err());
489
490 let params = AlgorithmParams::new().with_param("max_iterations", 0);
492 let result = algorithm.execute(&graph, ¶ms);
493 assert!(result.is_err());
494 }
495
496
497 fn create_triangle_graph() -> ArrowGraph {
498 let schema = Arc::new(Schema::new(vec![
499 Field::new("source", DataType::Utf8, false),
500 Field::new("target", DataType::Utf8, false),
501 Field::new("weight", DataType::Float64, true),
502 ]));
503
504 let source_array = StringArray::from(vec!["A", "B", "C", "B"]);
506 let target_array = StringArray::from(vec!["B", "C", "A", "D"]);
507 let weight_array = Float64Array::from(vec![
508 Some(1.0), Some(1.0), Some(1.0), Some(1.0)
509 ]);
510
511 let edges = RecordBatch::try_new(
512 schema,
513 vec![
514 Arc::new(source_array) as Arc<dyn arrow::array::Array>,
515 Arc::new(target_array) as Arc<dyn arrow::array::Array>,
516 Arc::new(weight_array) as Arc<dyn arrow::array::Array>
517 ],
518 ).unwrap();
519
520 ArrowGraph::from_edges(edges).unwrap()
521 }
522
523 #[test]
524 fn test_triangle_count() {
525 let graph = create_triangle_graph();
526 let algorithm = TriangleCount;
527
528 let params = AlgorithmParams::new();
529 let result = algorithm.execute(&graph, ¶ms).unwrap();
530
531 assert_eq!(result.num_rows(), 1);
532 assert_eq!(result.num_columns(), 2);
533
534 let metric_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
535 let value_col = result.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
536
537 assert_eq!(metric_col.value(0), "triangle_count");
538 assert_eq!(value_col.value(0), 1); }
540
541 #[test]
542 fn test_clustering_coefficient_local() {
543 let graph = create_triangle_graph();
544 let algorithm = ClusteringCoefficient;
545
546 let params = AlgorithmParams::new().with_param("mode", "local");
547 let result = algorithm.execute(&graph, ¶ms).unwrap();
548
549 assert_eq!(result.num_rows(), 4); assert_eq!(result.num_columns(), 2);
551
552 let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
553 let coeff_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
554
555 let mut coeff_map = std::collections::HashMap::new();
557 for i in 0..result.num_rows() {
558 let node_id = node_col.value(i);
559 let coefficient = coeff_col.value(i);
560 coeff_map.insert(node_id, coefficient);
561 }
562
563 for (node_id, coefficient) in &coeff_map {
566 assert!(*coefficient >= 0.0, "Node {} has negative coefficient: {}", node_id, coefficient);
567 assert!(*coefficient <= 1.0, "Node {} has coefficient > 1.0: {}", node_id, coefficient);
568 }
569
570 assert_eq!(coeff_map.get("D").unwrap(), &0.0);
572 }
573
574 #[test]
575 fn test_clustering_coefficient_global() {
576 let graph = create_triangle_graph();
577 let algorithm = ClusteringCoefficient;
578
579 let params = AlgorithmParams::new().with_param("mode", "global");
580 let result = algorithm.execute(&graph, ¶ms).unwrap();
581
582 assert_eq!(result.num_rows(), 1);
583 assert_eq!(result.num_columns(), 2);
584
585 let metric_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
586 let value_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
587
588 assert_eq!(metric_col.value(0), "global_clustering_coefficient");
589
590 let global_coefficient = value_col.value(0);
593 assert!(global_coefficient >= 0.0, "Global coefficient should be non-negative: {}", global_coefficient);
594 assert!(global_coefficient <= 1.0, "Global coefficient should be <= 1.0: {}", global_coefficient);
595 }
596
597 #[test]
598 fn test_clustering_coefficient_invalid_mode() {
599 let graph = create_test_graph();
600 let algorithm = ClusteringCoefficient;
601
602 let params = AlgorithmParams::new().with_param("mode", "invalid");
603 let result = algorithm.execute(&graph, ¶ms);
604 assert!(result.is_err());
605 }
606
607 #[test]
608 fn test_triangle_count_no_triangles() {
609 let graph = create_test_graph(); let algorithm = TriangleCount;
611
612 let params = AlgorithmParams::new();
613 let result = algorithm.execute(&graph, ¶ms).unwrap();
614
615 let value_col = result.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
616 assert!(value_col.value(0) >= 0);
619 }
620
621 #[test]
622 fn test_betweenness_centrality() {
623 let graph = create_test_graph();
624 let algorithm = BetweennessCentrality;
625
626 let params = AlgorithmParams::new();
627 let result = algorithm.execute(&graph, ¶ms).unwrap();
628
629 assert_eq!(result.num_rows(), graph.node_count());
630 assert_eq!(result.num_columns(), 2);
631
632 let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
633 let centrality_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
634
635 let mut found_nodes = std::collections::HashSet::new();
637 for i in 0..result.num_rows() {
638 let node_id = node_col.value(i);
639 let centrality = centrality_col.value(i);
640
641 found_nodes.insert(node_id);
642 assert!(centrality >= 0.0, "Betweenness centrality should be non-negative: {}", centrality);
643 }
644
645 assert!(found_nodes.contains("A"));
646 assert!(found_nodes.contains("B"));
647 assert!(found_nodes.contains("C"));
648 assert!(found_nodes.contains("D"));
649 assert!(found_nodes.contains("E"));
650 }
651
652 #[test]
653 fn test_eigenvector_centrality() {
654 let graph = create_test_graph();
655 let algorithm = EigenvectorCentrality;
656
657 let params = AlgorithmParams::new()
658 .with_param("max_iterations", 50)
659 .with_param("tolerance", 1e-6);
660
661 let result = algorithm.execute(&graph, ¶ms).unwrap();
662
663 assert_eq!(result.num_rows(), graph.node_count());
664 assert_eq!(result.num_columns(), 2);
665
666 let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
667 let centrality_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
668
669 let mut found_nodes = std::collections::HashSet::new();
671 for i in 0..result.num_rows() {
672 let node_id = node_col.value(i);
673 let centrality = centrality_col.value(i);
674
675 found_nodes.insert(node_id);
676 assert!(centrality >= 0.0, "Eigenvector centrality should be non-negative: {}", centrality);
677 }
678
679 assert!(found_nodes.contains("A"));
680 assert!(found_nodes.contains("B"));
681 assert!(found_nodes.contains("C"));
682 assert!(found_nodes.contains("D"));
683 assert!(found_nodes.contains("E"));
684 }
685
686 #[test]
687 fn test_closeness_centrality() {
688 let graph = create_test_graph();
689 let algorithm = ClosenessCentrality;
690
691 let params = AlgorithmParams::new();
692 let result = algorithm.execute(&graph, ¶ms).unwrap();
693
694 assert_eq!(result.num_rows(), graph.node_count());
695 assert_eq!(result.num_columns(), 2);
696
697 let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
698 let centrality_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
699
700 let mut found_nodes = std::collections::HashSet::new();
702 for i in 0..result.num_rows() {
703 let node_id = node_col.value(i);
704 let centrality = centrality_col.value(i);
705
706 found_nodes.insert(node_id);
707 assert!(centrality >= 0.0, "Closeness centrality should be non-negative: {}", centrality);
708 }
709
710 assert!(found_nodes.contains("A"));
711 assert!(found_nodes.contains("B"));
712 assert!(found_nodes.contains("C"));
713 assert!(found_nodes.contains("D"));
714 assert!(found_nodes.contains("E"));
715 }
716
717 #[test]
718 fn test_centrality_invalid_params() {
719 let graph = create_test_graph();
720
721 let algorithm = EigenvectorCentrality;
723
724 let params = AlgorithmParams::new().with_param("max_iterations", 0);
725 let result = algorithm.execute(&graph, ¶ms);
726 assert!(result.is_err());
727
728 let params = AlgorithmParams::new().with_param("tolerance", -1.0);
729 let result = algorithm.execute(&graph, ¶ms);
730 assert!(result.is_err());
731 }
732
733 #[test]
734 fn test_algorithm_names_with_centrality() {
735 let shortest_path = ShortestPath;
736 let all_paths = AllPaths;
737 let pagerank = PageRank;
738 let betweenness = BetweennessCentrality;
739 let eigenvector = EigenvectorCentrality;
740 let closeness = ClosenessCentrality;
741 let weakly_connected = WeaklyConnectedComponents;
742 let strongly_connected = StronglyConnectedComponents;
743 let leiden = LeidenCommunityDetection;
744 let triangle_count = TriangleCount;
745 let clustering = ClusteringCoefficient;
746
747 assert_eq!(shortest_path.name(), "shortest_path");
748 assert_eq!(all_paths.name(), "all_paths");
749 assert_eq!(pagerank.name(), "pagerank");
750 assert_eq!(betweenness.name(), "betweenness_centrality");
751 assert_eq!(eigenvector.name(), "eigenvector_centrality");
752 assert_eq!(closeness.name(), "closeness_centrality");
753 assert_eq!(weakly_connected.name(), "weakly_connected_components");
754 assert_eq!(strongly_connected.name(), "strongly_connected_components");
755 assert_eq!(leiden.name(), "leiden");
756 assert_eq!(triangle_count.name(), "triangle_count");
757 assert_eq!(clustering.name(), "clustering_coefficient");
758 }
759
760 #[test]
761 fn test_graph_mutations_add_node() {
762 let mut graph = create_test_graph();
763 let initial_count = graph.node_count();
764
765 graph.add_node("F".to_string()).unwrap();
767
768 assert_eq!(graph.node_count(), initial_count + 1);
769 assert!(graph.has_node("F"));
770
771 let result = graph.add_node("F".to_string());
773 assert!(result.is_err());
774 }
775
776 #[test]
777 fn test_graph_mutations_remove_node() {
778 let mut graph = create_test_graph();
779 let initial_count = graph.node_count();
780 let initial_edge_count = graph.edge_count();
781
782 graph.remove_node("A").unwrap();
784
785 assert_eq!(graph.node_count(), initial_count - 1);
786 assert!(!graph.has_node("A"));
787 assert!(graph.edge_count() < initial_edge_count); let result = graph.remove_node("Z");
791 assert!(result.is_err());
792 }
793
794 #[test]
795 fn test_graph_mutations_add_edge() {
796 let mut graph = create_test_graph();
797 let initial_edge_count = graph.edge_count();
798
799 graph.add_edge("A".to_string(), "E".to_string(), Some(2.5)).unwrap();
801
802 assert_eq!(graph.edge_count(), initial_edge_count + 1);
803 assert_eq!(graph.edge_weight("A", "E"), Some(2.5));
804
805 graph.add_edge("F".to_string(), "G".to_string(), None).unwrap();
807 assert!(graph.has_node("F"));
808 assert!(graph.has_node("G"));
809
810 let result = graph.add_edge("A".to_string(), "E".to_string(), Some(1.0));
812 assert!(result.is_err());
813 }
814
815 #[test]
816 fn test_graph_mutations_remove_edge() {
817 let mut graph = create_test_graph();
818 let initial_edge_count = graph.edge_count();
819
820 graph.remove_edge("A", "B").unwrap();
822
823 assert_eq!(graph.edge_count(), initial_edge_count - 1);
824 assert!(graph.edge_weight("A", "B").is_none());
825
826 let result = graph.remove_edge("A", "Z");
828 assert!(result.is_err());
829 }
830
831 #[test]
832 fn test_graph_mutations_complex_operations() {
833 let mut graph = create_test_graph();
834
835 graph.add_node("F".to_string()).unwrap();
837 graph.add_edge("F".to_string(), "A".to_string(), Some(3.0)).unwrap();
838 graph.add_edge("B".to_string(), "F".to_string(), Some(1.5)).unwrap();
839
840 assert!(graph.has_node("F"));
841 assert_eq!(graph.edge_weight("F", "A"), Some(3.0));
842 assert_eq!(graph.edge_weight("B", "F"), Some(1.5));
843
844 let initial_node_count = graph.node_count();
846 graph.remove_node("F").unwrap();
847
848 assert_eq!(graph.node_count(), initial_node_count - 1);
849 assert!(!graph.has_node("F"));
850 assert!(graph.edge_weight("F", "A").is_none());
851 assert!(graph.edge_weight("B", "F").is_none());
852 }
853
854 #[test]
855 fn test_vectorized_pagerank() {
856 let graph = create_test_graph();
857 let algorithm = VectorizedPageRank;
858
859 let params = AlgorithmParams::new()
860 .with_param("damping_factor", 0.85)
861 .with_param("max_iterations", 50)
862 .with_param("tolerance", 1e-6);
863
864 let result = algorithm.execute(&graph, ¶ms).unwrap();
865
866 assert_eq!(result.num_rows(), graph.node_count());
867 assert_eq!(result.num_columns(), 2);
868
869 let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
870 let score_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
871
872 let mut total_score = 0.0;
874 let mut found_nodes = std::collections::HashSet::new();
875
876 for i in 0..result.num_rows() {
877 let node_id = node_col.value(i);
878 let score = score_col.value(i);
879
880 found_nodes.insert(node_id);
881 total_score += score;
882
883 assert!(score > 0.0, "PageRank score should be positive: {}", score);
885 }
886
887 assert!(found_nodes.contains("A"));
889 assert!(found_nodes.contains("B"));
890 assert!(found_nodes.contains("C"));
891 assert!(found_nodes.contains("D"));
892 assert!(found_nodes.contains("E"));
893
894 assert!((total_score - 1.0).abs() < 1e-8, "PageRank scores should sum to 1.0, got: {}", total_score);
896 }
897
898 #[test]
899 fn test_vectorized_batch_centralities() {
900 let graph = create_test_graph();
901 let algorithm = VectorizedBatchOperations;
902
903 let params = AlgorithmParams::new();
904 let result = algorithm.execute(&graph, ¶ms).unwrap();
905
906 assert_eq!(result.num_rows(), graph.node_count());
907 assert_eq!(result.num_columns(), 4);
908
909 let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
910 let degree_col = result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
911 let eigenvector_col = result.column(2).as_any().downcast_ref::<Float64Array>().unwrap();
912 let closeness_col = result.column(3).as_any().downcast_ref::<Float64Array>().unwrap();
913
914 let mut found_nodes = std::collections::HashSet::new();
916
917 for i in 0..result.num_rows() {
918 let node_id = node_col.value(i);
919 let degree_centrality = degree_col.value(i);
920 let eigenvector_centrality = eigenvector_col.value(i);
921 let closeness_centrality = closeness_col.value(i);
922
923 found_nodes.insert(node_id);
924
925 assert!(degree_centrality >= 0.0 && degree_centrality <= 1.0,
927 "Degree centrality should be in [0,1]: {}", degree_centrality);
928 assert!(eigenvector_centrality >= 0.0,
929 "Eigenvector centrality should be non-negative: {}", eigenvector_centrality);
930 assert!(closeness_centrality >= 0.0 && closeness_centrality <= 1.0,
931 "Closeness centrality should be in [0,1]: {}", closeness_centrality);
932 }
933
934 assert!(found_nodes.contains("A"));
936 assert!(found_nodes.contains("B"));
937 assert!(found_nodes.contains("C"));
938 assert!(found_nodes.contains("D"));
939 assert!(found_nodes.contains("E"));
940 }
941
942 #[test]
943 fn test_vectorized_vs_regular_pagerank_consistency() {
944 let graph = create_test_graph();
945
946 let params = AlgorithmParams::new()
947 .with_param("damping_factor", 0.85)
948 .with_param("max_iterations", 100)
949 .with_param("tolerance", 1e-8);
950
951 let regular_pagerank = PageRank;
953 let regular_result = regular_pagerank.execute(&graph, ¶ms).unwrap();
954
955 let vectorized_pagerank = VectorizedPageRank;
957 let vectorized_result = vectorized_pagerank.execute(&graph, ¶ms).unwrap();
958
959 assert_eq!(regular_result.num_rows(), vectorized_result.num_rows());
961
962 let regular_scores = regular_result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
963 let vectorized_scores = vectorized_result.column(1).as_any().downcast_ref::<Float64Array>().unwrap();
964
965 for i in 0..regular_result.num_rows() {
966 let regular_score = regular_scores.value(i);
967 let vectorized_score = vectorized_scores.value(i);
968 let diff = (regular_score - vectorized_score).abs();
969
970 assert!(diff < 1e-6,
971 "PageRank scores should be similar: regular={}, vectorized={}, diff={}",
972 regular_score, vectorized_score, diff);
973 }
974 }
975
976 #[test]
977 fn test_random_walk() {
978 let graph = create_test_graph();
979 let algorithm = RandomWalk;
980
981 let params = AlgorithmParams::new()
982 .with_param("walk_length", 5)
983 .with_param("num_walks", 3)
984 .with_param("seed", 42u64);
985
986 let result = algorithm.execute(&graph, ¶ms).unwrap();
987
988 assert_eq!(result.num_columns(), 3);
990
991 let walk_id_col = result.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
992 let step_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
993 let node_col = result.column(2).as_any().downcast_ref::<StringArray>().unwrap();
994
995 let mut walks: std::collections::HashMap<u32, Vec<(u32, String)>> = std::collections::HashMap::new();
997
998 for i in 0..result.num_rows() {
999 let walk_id = walk_id_col.value(i);
1000 let step = step_col.value(i);
1001 let node_id = node_col.value(i).to_string();
1002
1003 walks.entry(walk_id).or_insert_with(Vec::new).push((step, node_id));
1004 }
1005
1006 assert_eq!(walks.len(), 15);
1008
1009 for (_, walk_steps) in walks {
1011 assert!(!walk_steps.is_empty());
1012 assert!(walk_steps.len() <= 5); let mut sorted_steps = walk_steps;
1016 sorted_steps.sort_by_key(|(step, _)| *step);
1017
1018 for (i, (step, _)) in sorted_steps.iter().enumerate() {
1019 assert_eq!(*step, i as u32);
1020 }
1021 }
1022 }
1023
1024 #[test]
1025 fn test_node2vec_walk() {
1026 let graph = create_test_graph();
1027 let algorithm = Node2VecWalk;
1028
1029 let params = AlgorithmParams::new()
1030 .with_param("walk_length", 10)
1031 .with_param("num_walks", 2)
1032 .with_param("p", 1.0)
1033 .with_param("q", 1.0)
1034 .with_param("seed", 42u64);
1035
1036 let result = algorithm.execute(&graph, ¶ms).unwrap();
1037
1038 assert_eq!(result.num_columns(), 5);
1040
1041 let walk_id_col = result.column(0).as_any().downcast_ref::<UInt32Array>().unwrap();
1042 let step_col = result.column(1).as_any().downcast_ref::<UInt32Array>().unwrap();
1043 let node_col = result.column(2).as_any().downcast_ref::<StringArray>().unwrap();
1044 let p_col = result.column(3).as_any().downcast_ref::<Float64Array>().unwrap();
1045 let q_col = result.column(4).as_any().downcast_ref::<Float64Array>().unwrap();
1046
1047 for i in 0..result.num_rows() {
1049 assert_eq!(p_col.value(i), 1.0);
1050 assert_eq!(q_col.value(i), 1.0);
1051 }
1052
1053 let walk_count = walk_id_col.iter().max().unwrap().unwrap() + 1;
1055 assert_eq!(walk_count, 10); }
1057
1058 #[test]
1059 fn test_graph_sampling_random_node() {
1060 let graph = create_test_graph();
1061 let algorithm = GraphSampling;
1062
1063 let params = AlgorithmParams::new()
1064 .with_param("method", "random_node".to_string())
1065 .with_param("sample_size", 3)
1066 .with_param("seed", 42u64);
1067
1068 let result = algorithm.execute(&graph, ¶ms).unwrap();
1069
1070 assert_eq!(result.num_columns(), 1);
1071 assert_eq!(result.num_rows(), 3);
1072
1073 let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
1074
1075 for i in 0..result.num_rows() {
1077 let node_id = node_col.value(i);
1078 assert!(graph.has_node(node_id), "Sampled node {} should exist in graph", node_id);
1079 }
1080 }
1081
1082 #[test]
1083 fn test_graph_sampling_random_edge() {
1084 let graph = create_test_graph();
1085 let algorithm = GraphSampling;
1086
1087 let params = AlgorithmParams::new()
1088 .with_param("method", "random_edge".to_string())
1089 .with_param("sample_ratio", 0.6)
1090 .with_param("seed", 42u64);
1091
1092 let result = algorithm.execute(&graph, ¶ms).unwrap();
1093
1094 assert_eq!(result.num_columns(), 3);
1095
1096 let source_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
1097 let target_col = result.column(1).as_any().downcast_ref::<StringArray>().unwrap();
1098 let weight_col = result.column(2).as_any().downcast_ref::<Float64Array>().unwrap();
1099
1100 for i in 0..result.num_rows() {
1102 let source = source_col.value(i);
1103 let target = target_col.value(i);
1104 let weight = weight_col.value(i);
1105
1106 assert!(graph.has_node(source), "Source node {} should exist", source);
1107 assert!(graph.has_node(target), "Target node {} should exist", target);
1108 assert!(graph.edge_weight(source, target).is_some(),
1109 "Edge {}→{} should exist in graph", source, target);
1110 assert!(weight > 0.0, "Edge weight should be positive");
1111 }
1112 }
1113
1114 #[test]
1115 fn test_graph_sampling_snowball() {
1116 let graph = create_test_graph();
1117 let algorithm = GraphSampling;
1118
1119 let params = AlgorithmParams::new()
1120 .with_param("method", "snowball".to_string())
1121 .with_param("seed_nodes", vec!["A".to_string()])
1122 .with_param("k_hops", 2)
1123 .with_param("max_nodes", 4);
1124
1125 let result = algorithm.execute(&graph, ¶ms).unwrap();
1126
1127 assert_eq!(result.num_columns(), 1);
1128 assert!(result.num_rows() <= 4); let node_col = result.column(0).as_any().downcast_ref::<StringArray>().unwrap();
1131
1132 let sampled_nodes: Vec<String> = (0..result.num_rows())
1134 .map(|i| node_col.value(i).to_string())
1135 .collect();
1136
1137 assert!(sampled_nodes.contains(&"A".to_string()),
1138 "Snowball sampling should include seed node A");
1139
1140 for node_id in &sampled_nodes {
1142 assert!(graph.has_node(node_id), "Sampled node {} should exist in graph", node_id);
1143 }
1144 }
1145
1146 #[test]
1147 fn test_sampling_invalid_parameters() {
1148 let graph = create_test_graph();
1149
1150 let algorithm = RandomWalk;
1152 let params = AlgorithmParams::new().with_param("walk_length", 0);
1153 let result = algorithm.execute(&graph, ¶ms);
1154 assert!(result.is_err());
1155
1156 let algorithm = Node2VecWalk;
1158 let params = AlgorithmParams::new().with_param("p", -1.0);
1159 let result = algorithm.execute(&graph, ¶ms);
1160 assert!(result.is_err());
1161
1162 let algorithm = GraphSampling;
1164 let params = AlgorithmParams::new()
1165 .with_param("method", "random_edge".to_string())
1166 .with_param("sample_ratio", 1.5);
1167 let result = algorithm.execute(&graph, ¶ms);
1168 assert!(result.is_err());
1169 }
1170
1171 #[test]
1172 fn test_sampling_algorithm_names() {
1173 let random_walk = RandomWalk;
1174 let node2vec = Node2VecWalk;
1175 let sampling = GraphSampling;
1176
1177 assert_eq!(random_walk.name(), "random_walk");
1178 assert_eq!(node2vec.name(), "node2vec");
1179 assert_eq!(sampling.name(), "graph_sampling");
1180
1181 assert!(!random_walk.description().is_empty());
1183 assert!(!node2vec.description().is_empty());
1184 assert!(!sampling.description().is_empty());
1185 }
1186
1187 #[test]
1188 fn test_streaming_graph_processor_basic_operations() {
1189 let initial_graph = create_test_graph();
1190 let mut processor = StreamingGraphProcessor::new(initial_graph);
1191
1192 let initial_node_count = processor.graph().node_count();
1193 let initial_edge_count = processor.graph().edge_count();
1194
1195 let result = processor.apply_update(StreamUpdate::AddNode {
1197 node_id: "F".to_string()
1198 }).unwrap();
1199
1200 assert_eq!(result.operation, "add_node");
1201 assert_eq!(result.nodes_added, 1);
1202 assert_eq!(result.nodes_removed, 0);
1203 assert_eq!(processor.graph().node_count(), initial_node_count + 1);
1204 assert!(processor.graph().has_node("F"));
1205
1206 let result = processor.apply_update(StreamUpdate::AddEdge {
1208 source: "F".to_string(),
1209 target: "A".to_string(),
1210 weight: Some(2.0),
1211 }).unwrap();
1212
1213 assert_eq!(result.operation, "add_edge");
1214 assert_eq!(result.edges_added, 1);
1215 assert_eq!(processor.graph().edge_count(), initial_edge_count + 1);
1216 assert_eq!(processor.graph().edge_weight("F", "A"), Some(2.0));
1217
1218 let result = processor.apply_update(StreamUpdate::RemoveEdge {
1220 source: "F".to_string(),
1221 target: "A".to_string(),
1222 }).unwrap();
1223
1224 assert_eq!(result.operation, "remove_edge");
1225 assert_eq!(result.edges_removed, 1);
1226 assert_eq!(processor.graph().edge_count(), initial_edge_count);
1227 assert!(processor.graph().edge_weight("F", "A").is_none());
1228
1229 let result = processor.apply_update(StreamUpdate::RemoveNode {
1231 node_id: "F".to_string()
1232 }).unwrap();
1233
1234 assert_eq!(result.operation, "remove_node");
1235 assert_eq!(result.nodes_removed, 1);
1236 assert_eq!(processor.graph().node_count(), initial_node_count);
1237 assert!(!processor.graph().has_node("F"));
1238 }
1239
1240 #[test]
1241 fn test_streaming_graph_processor_empty_graph() {
1242 let mut processor = StreamingGraphProcessor::empty().unwrap();
1243
1244 assert_eq!(processor.graph().node_count(), 0);
1245 assert_eq!(processor.graph().edge_count(), 0);
1246
1247 processor.apply_update(StreamUpdate::AddNode {
1249 node_id: "A".to_string()
1250 }).unwrap();
1251
1252 assert_eq!(processor.graph().node_count(), 1);
1253 assert!(processor.graph().has_node("A"));
1254
1255 processor.apply_update(StreamUpdate::AddNode {
1257 node_id: "B".to_string()
1258 }).unwrap();
1259
1260 processor.apply_update(StreamUpdate::AddEdge {
1261 source: "A".to_string(),
1262 target: "B".to_string(),
1263 weight: Some(1.0),
1264 }).unwrap();
1265
1266 assert_eq!(processor.graph().node_count(), 2);
1267 assert_eq!(processor.graph().edge_count(), 1);
1268 assert_eq!(processor.graph().edge_weight("A", "B"), Some(1.0));
1269 }
1270
1271 #[test]
1272 fn test_streaming_graph_processor_batch_operations() {
1273 let initial_graph = create_test_graph();
1274 let mut processor = StreamingGraphProcessor::new(initial_graph);
1275
1276 let batch_operations = vec![
1277 StreamUpdate::AddNode { node_id: "F".to_string() },
1278 StreamUpdate::AddNode { node_id: "G".to_string() },
1279 StreamUpdate::AddEdge {
1280 source: "F".to_string(),
1281 target: "G".to_string(),
1282 weight: Some(1.5),
1283 },
1284 ];
1285
1286 let result = processor.apply_update(StreamUpdate::Batch {
1287 operations: batch_operations
1288 }).unwrap();
1289
1290 assert_eq!(result.operation, "batch");
1291 assert_eq!(result.nodes_added, 2);
1292 assert_eq!(result.edges_added, 1);
1293
1294 assert!(processor.graph().has_node("F"));
1295 assert!(processor.graph().has_node("G"));
1296 assert_eq!(processor.graph().edge_weight("F", "G"), Some(1.5));
1297 }
1298
1299 #[test]
1300 fn test_streaming_graph_processor_change_log() {
1301 let initial_graph = create_test_graph();
1302 let mut processor = StreamingGraphProcessor::new(initial_graph);
1303
1304 processor.set_change_log_enabled(true);
1306
1307 processor.apply_update(StreamUpdate::AddNode {
1309 node_id: "F".to_string()
1310 }).unwrap();
1311
1312 processor.apply_update(StreamUpdate::AddEdge {
1313 source: "F".to_string(),
1314 target: "A".to_string(),
1315 weight: Some(2.0),
1316 }).unwrap();
1317
1318 let change_log = processor.get_change_log_since(0);
1320 assert_eq!(change_log.len(), 2);
1321
1322 let stats = processor.get_statistics();
1324 assert_eq!(stats.total_updates, 2);
1325 assert_eq!(stats.change_log_size, 2);
1326 assert!(stats.change_log_enabled);
1327
1328 processor.compact_change_log(1);
1330 let change_log_after_compact = processor.get_change_log_since(0);
1331 assert_eq!(change_log_after_compact.len(), 1);
1332 }
1333
1334 #[test]
1335 fn test_streaming_graph_processor_snapshots() {
1336 let initial_graph = create_test_graph();
1337 let mut processor = StreamingGraphProcessor::new(initial_graph);
1338
1339 let initial_node_count = processor.graph().node_count();
1340
1341 let snapshot = processor.create_snapshot().unwrap();
1343 assert_eq!(snapshot.update_count, 0);
1344 assert_eq!(snapshot.graph.node_count(), initial_node_count);
1345
1346 processor.apply_update(StreamUpdate::AddNode {
1348 node_id: "F".to_string()
1349 }).unwrap();
1350 processor.apply_update(StreamUpdate::AddNode {
1351 node_id: "G".to_string()
1352 }).unwrap();
1353
1354 assert_eq!(processor.graph().node_count(), initial_node_count + 2);
1355 assert_eq!(processor.update_count(), 2);
1356
1357 processor.restore_from_snapshot(snapshot);
1359
1360 assert_eq!(processor.graph().node_count(), initial_node_count);
1361 assert_eq!(processor.update_count(), 0);
1362 assert!(!processor.graph().has_node("F"));
1363 assert!(!processor.graph().has_node("G"));
1364 }
1365
1366 #[test]
1367 fn test_streaming_graph_system_with_cache() {
1368 let initial_graph = create_test_graph();
1369 let mut system = StreamingGraphSystem::new(initial_graph);
1370
1371 let result = system.apply_update_with_cache_invalidation(StreamUpdate::AddNode {
1373 node_id: "F".to_string()
1374 }).unwrap();
1375
1376 assert_eq!(result.operation, "add_node");
1377 assert!(system.graph_processor().graph().has_node("F"));
1378
1379 let cache_stats = system.algorithm_processor().get_cache_statistics();
1381 assert_eq!(cache_stats.cached_algorithms, 0); assert!(!system.algorithm_processor().is_cache_valid("pagerank", 100));
1385 system.algorithm_processor_mut().set_invalidation_threshold(20);
1386
1387 let cache_stats_after = system.algorithm_processor().get_cache_statistics();
1388 assert_eq!(cache_stats_after.invalidation_threshold, 20);
1389 }
1390
1391 #[test]
1392 fn test_streaming_update_result_conversion() {
1393 let initial_graph = create_test_graph();
1394 let mut processor = StreamingGraphProcessor::new(initial_graph);
1395
1396 let result = processor.apply_update(StreamUpdate::AddNode {
1397 node_id: "F".to_string()
1398 }).unwrap();
1399
1400 let record_batch = result.to_record_batch().unwrap();
1402 assert_eq!(record_batch.num_rows(), 1);
1403 assert_eq!(record_batch.num_columns(), 6);
1404
1405 let operation_col = record_batch.column(0).as_any().downcast_ref::<StringArray>().unwrap();
1406 assert_eq!(operation_col.value(0), "add_node");
1407
1408 let nodes_added_col = record_batch.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
1409 assert_eq!(nodes_added_col.value(0), 1);
1410 }
1411
1412 #[test]
1413 fn test_streaming_graph_system_empty() {
1414 let mut system = StreamingGraphSystem::empty().unwrap();
1415
1416 assert_eq!(system.graph_processor().graph().node_count(), 0);
1417 assert_eq!(system.graph_processor().graph().edge_count(), 0);
1418
1419 system.apply_update_with_cache_invalidation(StreamUpdate::AddNode {
1421 node_id: "A".to_string()
1422 }).unwrap();
1423
1424 system.apply_update_with_cache_invalidation(StreamUpdate::AddNode {
1425 node_id: "B".to_string()
1426 }).unwrap();
1427
1428 system.apply_update_with_cache_invalidation(StreamUpdate::AddEdge {
1429 source: "A".to_string(),
1430 target: "B".to_string(),
1431 weight: Some(1.0),
1432 }).unwrap();
1433
1434 assert_eq!(system.graph_processor().graph().node_count(), 2);
1435 assert_eq!(system.graph_processor().graph().edge_count(), 1);
1436 assert!(system.graph_processor().graph().has_node("A"));
1437 assert!(system.graph_processor().graph().has_node("B"));
1438 assert_eq!(system.graph_processor().graph().edge_weight("A", "B"), Some(1.0));
1439 }
1440
1441 #[test]
1442 fn test_streaming_invalid_operations() {
1443 let initial_graph = create_test_graph();
1444 let mut processor = StreamingGraphProcessor::new(initial_graph);
1445
1446 let result = processor.apply_update(StreamUpdate::RemoveNode {
1448 node_id: "Z".to_string()
1449 });
1450 assert!(result.is_err());
1451
1452 let result = processor.apply_update(StreamUpdate::RemoveEdge {
1454 source: "A".to_string(),
1455 target: "Z".to_string(),
1456 });
1457 assert!(result.is_err());
1458
1459 let result = processor.apply_update(StreamUpdate::AddEdge {
1461 source: "A".to_string(),
1462 target: "B".to_string(),
1463 weight: Some(2.0),
1464 });
1465 assert!(result.is_err()); }
1467}