1use crate::base::{EdgeWeight, Graph, Node};
7use crate::error::{GraphError, Result};
8use scirs2_core::parallel_ops::*;
9use std::collections::HashMap;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::Arc;
12
13#[derive(Debug, Clone)]
15pub struct ParallelConfig {
16 pub num_threads: Option<usize>,
18 pub chunk_size: usize,
20 pub enable_simd: bool,
22}
23
24impl Default for ParallelConfig {
25 fn default() -> Self {
26 ParallelConfig {
27 num_threads: None, chunk_size: 1000,
29 enable_simd: true,
30 }
31 }
32}
33
34pub struct LargeGraphIterator<N: Node, E: EdgeWeight> {
36 position: usize,
38 graph_data: Vec<(N, N, E)>,
40 chunk_size: usize,
42}
43
44impl<N: Node, E: EdgeWeight> LargeGraphIterator<N, E> {
45 pub fn new<Ix>(graph: &Graph<N, E, Ix>, chunk_size: usize) -> Self
47 where
48 N: Clone + std::fmt::Debug,
49 E: Clone,
50 Ix: petgraph::graph::IndexType,
51 {
52 let graph_data = graph
53 .edges()
54 .into_iter()
55 .map(|edge| (edge.source, edge.target, edge.weight))
56 .collect();
57
58 LargeGraphIterator {
59 position: 0,
60 graph_data,
61 chunk_size,
62 }
63 }
64
65 pub fn next_chunk(&mut self) -> Option<&[(N, N, E)]> {
67 if self.position >= self.graph_data.len() {
68 return None;
69 }
70
71 let end = (self.position + self.chunk_size).min(self.graph_data.len());
72 let chunk = &self.graph_data[self.position..end];
73 self.position = end;
74
75 if chunk.is_empty() {
76 None
77 } else {
78 Some(chunk)
79 }
80 }
81
82 pub fn reset(&mut self) {
84 self.position = 0;
85 }
86}
87
88#[allow(dead_code)]
90pub fn parallel_degree_computation<N, E, Ix>(
91 graph: &Graph<N, E, Ix>,
92 config: &ParallelConfig,
93) -> Result<HashMap<N, usize>>
94where
95 N: Node + Clone + Send + Sync + std::fmt::Debug,
96 E: EdgeWeight + Send + Sync,
97 Ix: petgraph::graph::IndexType + Send + Sync,
98{
99 let nodes: Vec<_> = graph.nodes().into_iter().cloned().collect();
103
104 let degrees: HashMap<N, usize> = nodes
106 .par_chunks(config.chunk_size)
107 .map(|chunk| {
108 let mut local_degrees = HashMap::new();
109 for node in chunk {
110 let degree = graph.degree(node);
111 local_degrees.insert(node.clone(), degree);
112 }
113 local_degrees
114 })
115 .reduce(HashMap::new, |mut acc, local| {
116 acc.extend(local);
117 acc
118 });
119
120 Ok(degrees)
121}
122
123#[allow(dead_code)]
125pub fn parallel_shortest_paths<N, E, Ix>(
126 graph: &Graph<N, E, Ix>,
127 sources: &[N],
128 _config: &ParallelConfig,
129) -> Result<HashMap<N, HashMap<N, E>>>
130where
131 N: Node + Clone + Send + Sync + std::fmt::Debug,
132 E: EdgeWeight
133 + Clone
134 + Send
135 + Sync
136 + num_traits::Zero
137 + num_traits::One
138 + std::ops::Add<Output = E>
139 + PartialOrd
140 + std::marker::Copy
141 + std::fmt::Debug
142 + std::default::Default,
143 Ix: petgraph::graph::IndexType + Send + Sync,
144{
145 use crate::algorithms::shortest_path::dijkstra_path;
146
147 let all_nodes: Vec<_> = graph.nodes().into_iter().cloned().collect();
151
152 let results: HashMap<N, HashMap<N, E>> = sources
154 .par_iter()
155 .map(|source| {
156 let mut paths_from_source = HashMap::new();
157
158 for target in &all_nodes {
159 if let Ok(Some(path)) = dijkstra_path(graph, source, target) {
160 paths_from_source.insert(target.clone(), path.total_weight);
161 }
162 }
163
164 (source.clone(), paths_from_source)
165 })
166 .collect();
167
168 Ok(results)
169}
170
171#[allow(dead_code)]
173pub fn cache_friendly_adjacency_matrix<N, E, Ix>(graph: &Graph<N, E, Ix>) -> Result<Vec<Vec<E>>>
174where
175 N: Node + Clone + std::fmt::Debug,
176 E: EdgeWeight + Clone + num_traits::Zero + Copy,
177 Ix: petgraph::graph::IndexType,
178{
179 let n = graph.node_count();
180 if n == 0 {
181 return Ok(vec![]);
182 }
183
184 let mut matrix = vec![vec![E::zero(); n]; n];
186
187 let node_to_index: HashMap<N, usize> = graph
189 .nodes()
190 .into_iter()
191 .enumerate()
192 .map(|(i, node)| (node.clone(), i))
193 .collect();
194
195 for edge in graph.edges() {
197 if let (Some(&src_idx), Some(&tgt_idx)) = (
198 node_to_index.get(&edge.source),
199 node_to_index.get(&edge.target),
200 ) {
201 matrix[src_idx][tgt_idx] = edge.weight;
202 matrix[tgt_idx][src_idx] = edge.weight; }
204 }
205
206 Ok(matrix)
207}
208
209pub struct StreamingGraphProcessor<N: Node, E: EdgeWeight> {
211 current_batch: Vec<(N, N, E)>,
213 batch_size: usize,
215 edge_count: AtomicUsize,
217 degree_counter: Arc<parking_lot::Mutex<HashMap<N, usize>>>,
219}
220
221impl<N: Node, E: EdgeWeight> StreamingGraphProcessor<N, E>
222where
223 N: Clone + Send + Sync,
224 E: Clone + Send + Sync,
225{
226 pub fn new(batch_size: usize) -> Self {
228 StreamingGraphProcessor {
229 current_batch: Vec::with_capacity(batch_size),
230 batch_size,
231 edge_count: AtomicUsize::new(0),
232 degree_counter: Arc::new(parking_lot::Mutex::new(HashMap::new())),
233 }
234 }
235
236 pub fn add_edge(&mut self, source: N, target: N, weight: E) -> Result<()> {
238 self.current_batch.push((source, target, weight));
239
240 if self.current_batch.len() >= self.batch_size {
241 self.process_batch()?;
242 }
243
244 Ok(())
245 }
246
247 fn process_batch(&mut self) -> Result<()> {
249 if self.current_batch.is_empty() {
250 return Ok(());
251 }
252
253 self.edge_count
255 .fetch_add(self.current_batch.len(), Ordering::Relaxed);
256
257 {
259 let mut degrees = self.degree_counter.lock();
260 for (source, target_, _) in &self.current_batch {
261 *degrees.entry(source.clone()).or_insert(0) += 1;
262 *degrees.entry(target_.clone()).or_insert(0) += 1;
263 }
264 }
265
266 self.current_batch.clear();
268
269 Ok(())
270 }
271
272 pub fn finish(mut self) -> Result<(usize, HashMap<N, usize>)> {
274 self.process_batch()?;
276
277 let total_edges = self.edge_count.load(Ordering::Relaxed);
278 let degrees = Arc::try_unwrap(self.degree_counter)
279 .map_err(|_| GraphError::AlgorithmError("Failed to unwrap degree counter".to_string()))?
280 .into_inner();
281
282 Ok((total_edges, degrees))
283 }
284
285 pub fn edge_count(&self) -> usize {
287 self.edge_count.load(Ordering::Relaxed)
288 }
289}
290
291#[cfg(target_arch = "x86_64")]
293pub mod simd_ops {
294 #[allow(unused_imports)]
295 use super::*;
296 use scirs2_core::simd_ops::SimdUnifiedOps;
297
298 #[allow(dead_code)]
300 pub fn simd_vector_add(a: &[f64], b: &[f64]) -> Vec<f64> {
301 assert_eq!(a.len(), b.len());
302
303 let a_view = ndarray::ArrayView1::from(a);
305 let b_view = ndarray::ArrayView1::from(b);
306
307 let result = f64::simd_add(&a_view, &b_view);
309
310 result.to_vec()
312 }
313
314 #[allow(dead_code)]
316 pub fn simd_dot_product(a: &[f64], b: &[f64]) -> f64 {
317 assert_eq!(a.len(), b.len());
318 let a_view = ndarray::ArrayView1::from(a);
319 let b_view = ndarray::ArrayView1::from(b);
320
321 f64::simd_dot(&a_view, &b_view)
323 }
324
325 #[allow(dead_code)]
327 pub fn simd_normalize(vector: &mut [f64]) {
328 let vector_view = ndarray::ArrayView1::from(&*vector);
329 let norm = f64::simd_norm(&vector_view);
330 if norm > 0.0 {
331 for val in vector.iter_mut() {
332 *val /= norm;
333 }
334 }
335 }
336
337 #[allow(dead_code)]
339 pub fn simd_cosine_similarity(a: &[f64], b: &[f64]) -> f64 {
340 assert_eq!(a.len(), b.len());
341 let a_view = ndarray::ArrayView1::from(a);
342 let b_view = ndarray::ArrayView1::from(b);
343 let dot_product = f64::simd_dot(&a_view, &b_view);
344 let norm_a = f64::simd_norm(&a_view);
345 let norm_b = f64::simd_norm(&b_view);
346 dot_product / (norm_a * norm_b)
347 }
348
349 #[allow(dead_code)]
351 pub fn simd_euclidean_distance(a: &[f64], b: &[f64]) -> f64 {
352 assert_eq!(a.len(), b.len());
353 let a_view = ndarray::ArrayView1::from(a);
354 let b_view = ndarray::ArrayView1::from(b);
355 let diff = f64::simd_sub(&a_view, &b_view);
356 f64::simd_norm(&diff.view())
357 }
358
359 #[allow(dead_code)]
361 pub fn simd_batch_centrality_update(
362 centralities: &mut [f64],
363 contributions: &[f64],
364 weights: &[f64],
365 ) {
366 assert_eq!(centralities.len(), contributions.len());
367 assert_eq!(centralities.len(), weights.len());
368
369 let contrib_view = ndarray::ArrayView1::from(contributions);
371 let weights_view = ndarray::ArrayView1::from(weights);
372 let weighted_contribs = f64::simd_mul(&contrib_view, &weights_view);
373
374 for (c, w) in centralities.iter_mut().zip(weighted_contribs.iter()) {
376 *c += *w;
377 }
378 }
379
380 #[allow(dead_code)]
382 pub fn simd_sparse_matvec(
383 row_ptr: &[usize],
384 col_idx: &[usize],
385 values: &[f64],
386 x: &[f64],
387 y: &mut [f64],
388 ) {
389 y.fill(0.0);
390
391 for (i, y_i) in y.iter_mut().enumerate() {
392 let row_start = row_ptr[i];
393 let row_end = row_ptr[i + 1];
394
395 let row_values = &values[row_start..row_end];
397 let row_indices = &col_idx[row_start..row_end];
398
399 let x_vals: Vec<f64> = row_indices.iter().map(|&j| x[j]).collect();
401
402 let row_view = ndarray::ArrayView1::from(row_values);
404 let x_view = ndarray::ArrayView1::from(&x_vals);
405 *y_i = f64::simd_dot(&row_view, &x_view);
406 }
407 }
408
409 #[allow(dead_code)]
411 pub fn simd_batch_degree_computation(_rowptr: &[usize], degrees: &mut [usize]) {
412 for (i, degree) in degrees.iter_mut().enumerate() {
413 *degree = _rowptr[i + 1] - _rowptr[i];
414 }
415 }
416}
417
418#[cfg(not(target_arch = "x86_64"))]
420pub mod simd_ops {
421 #[allow(dead_code)]
423 pub fn simd_vector_add(a: &[f64], b: &[f64]) -> Vec<f64> {
424 assert_eq!(a.len(), b.len());
425 a.iter().zip(b.iter()).map(|(&x, &y)| x + y).collect()
426 }
427
428 #[allow(dead_code)]
430 pub fn simd_dot_product(a: &[f64], b: &[f64]) -> f64 {
431 assert_eq!(a.len(), b.len());
432 a.iter().zip(b.iter()).map(|(&x, &y)| x * y).sum()
433 }
434
435 #[allow(dead_code)]
437 pub fn simd_normalize(vector: &mut [f64]) {
438 let norm: f64 = vector.iter().map(|x| x * x).sum::<f64>().sqrt();
439 if norm > 0.0 {
440 for x in vector.iter_mut() {
441 *x /= norm;
442 }
443 }
444 }
445
446 #[allow(dead_code)]
448 pub fn simd_cosine_similarity(a: &[f64], b: &[f64]) -> f64 {
449 assert_eq!(a.len(), b.len());
450 let dot: f64 = a.iter().zip(b.iter()).map(|(&x, &y)| x * y).sum();
451 let norm_a: f64 = a.iter().map(|x| x * x).sum::<f64>().sqrt();
452 let norm_b: f64 = b.iter().map(|x| x * x).sum::<f64>().sqrt();
453
454 if norm_a == 0.0 || norm_b == 0.0 {
455 0.0
456 } else {
457 dot / (norm_a * norm_b)
458 }
459 }
460
461 #[allow(dead_code)]
463 pub fn simd_euclidean_distance(a: &[f64], b: &[f64]) -> f64 {
464 assert_eq!(a.len(), b.len());
465 a.iter()
466 .zip(b.iter())
467 .map(|(&x, &y)| (x - y) * (x - y))
468 .sum::<f64>()
469 .sqrt()
470 }
471
472 #[allow(dead_code)]
474 pub fn simd_batch_centrality_update(
475 centralities: &mut [f64],
476 contributions: &[f64],
477 weights: &[f64],
478 ) {
479 for ((cent, &contrib), &weight) in centralities
480 .iter_mut()
481 .zip(contributions.iter())
482 .zip(weights.iter())
483 {
484 *cent += contrib * weight;
485 }
486 }
487
488 #[allow(dead_code)]
490 pub fn simd_sparse_matvec(
491 row_ptr: &[usize],
492 col_idx: &[usize],
493 values: &[f64],
494 x: &[f64],
495 y: &mut [f64],
496 ) {
497 y.fill(0.0);
498
499 for (i, y_i) in y.iter_mut().enumerate() {
500 let row_start = row_ptr[i];
501 let row_end = row_ptr[i + 1];
502
503 for j in row_start..row_end {
504 *y_i += values[j] * x[col_idx[j]];
505 }
506 }
507 }
508
509 #[allow(dead_code)]
511 pub fn simd_batch_degree_computation(_rowptr: &[usize], degrees: &mut [usize]) {
512 for (i, degree) in degrees.iter_mut().enumerate() {
513 *degree = _rowptr[i + 1] - _rowptr[i];
514 }
515 }
516}
517
518pub struct LazyGraphMetric<T> {
520 value: std::sync::OnceLock<std::result::Result<T, GraphError>>,
522 #[allow(clippy::type_complexity)]
524 compute_fn: std::sync::Mutex<Option<Box<dyn FnOnce() -> Result<T> + Send + 'static>>>,
525}
526
527impl<T> LazyGraphMetric<T>
528where
529 T: Send + 'static,
530{
531 pub fn new<F>(_computefn: F) -> Self
533 where
534 F: FnOnce() -> Result<T> + Send + 'static,
535 {
536 LazyGraphMetric {
537 value: std::sync::OnceLock::new(),
538 compute_fn: std::sync::Mutex::new(Some(Box::new(_computefn))),
539 }
540 }
541
542 pub fn get(&self) -> Result<&T> {
544 let result = self.value.get_or_init(|| {
545 let mut fn_guard = self.compute_fn.lock().unwrap();
547 if let Some(compute_fn) = fn_guard.take() {
548 compute_fn()
550 } else {
551 Err(GraphError::AlgorithmError(
553 "Computation function already consumed".to_string(),
554 ))
555 }
556 });
557
558 match result {
559 Ok(value) => Ok(value),
560 Err(e) => Err(GraphError::AlgorithmError(format!(
561 "Lazy computation failed: {e}"
562 ))),
563 }
564 }
565
566 pub fn is_computed(&self) -> bool {
568 self.value.get().is_some()
569 }
570
571 pub fn force(&self) -> Result<()> {
573 self.get().map(|_| ())
574 }
575
576 pub fn try_get(&self) -> Option<std::result::Result<&T, &GraphError>> {
578 self.value.get().map(|result| match result {
579 Ok(value) => Ok(value),
580 Err(error) => Err(error),
581 })
582 }
583}
584
585#[derive(Debug, Clone)]
587pub struct MemoryMetrics {
588 pub current_bytes: usize,
590 pub peak_bytes: usize,
592 pub average_bytes: usize,
594 pub allocation_count: usize,
596 pub deallocation_count: usize,
598 pub growth_rate: f64,
600 pub potential_leaks: isize,
602}
603
604impl Default for MemoryMetrics {
605 fn default() -> Self {
606 MemoryMetrics {
607 current_bytes: 0,
608 peak_bytes: 0,
609 average_bytes: 0,
610 allocation_count: 0,
611 deallocation_count: 0,
612 growth_rate: 0.0,
613 potential_leaks: 0,
614 }
615 }
616}
617
618pub struct RealTimeMemoryProfiler {
620 samples: Vec<(std::time::Instant, usize)>,
622 start_time: std::time::Instant,
624 allocations: AtomicUsize,
626 deallocations: AtomicUsize,
628 #[allow(dead_code)]
630 sample_interval_ms: u64,
631}
632
633impl RealTimeMemoryProfiler {
634 pub fn new(sample_interval_ms: u64) -> Self {
636 RealTimeMemoryProfiler {
637 samples: Vec::new(),
638 start_time: std::time::Instant::now(),
639 allocations: AtomicUsize::new(0),
640 deallocations: AtomicUsize::new(0),
641 sample_interval_ms,
642 }
643 }
644
645 pub fn sample_memory(&mut self, currentmemory: usize) {
647 self.samples
648 .push((std::time::Instant::now(), currentmemory));
649 }
650
651 pub fn record_allocation(&self, size: usize) {
653 self.allocations.fetch_add(1, Ordering::Relaxed);
654 }
655
656 pub fn record_deallocation(&self, size: usize) {
658 self.deallocations.fetch_add(1, Ordering::Relaxed);
659 }
660
661 pub fn generate_metrics(&self) -> MemoryMetrics {
663 if self.samples.is_empty() {
664 return MemoryMetrics::default();
665 }
666
667 let current_bytes = self.samples.last().map(|(_, mem)| *mem).unwrap_or(0);
668 let peak_bytes = self.samples.iter().map(|(_, mem)| *mem).max().unwrap_or(0);
669 let average_bytes = if !self.samples.is_empty() {
670 self.samples.iter().map(|(_, mem)| *mem).sum::<usize>() / self.samples.len()
671 } else {
672 0
673 };
674
675 let allocation_count = self.allocations.load(Ordering::Relaxed);
676 let deallocation_count = self.deallocations.load(Ordering::Relaxed);
677 let potential_leaks = allocation_count as isize - deallocation_count as isize;
678
679 let growth_rate = if self.samples.len() >= 2 {
681 let first = &self.samples[0];
682 let last = &self.samples[self.samples.len() - 1];
683 let time_diff = last.0.duration_since(first.0).as_secs_f64();
684 let memory_diff = last.1 as f64 - first.1 as f64;
685 if time_diff > 0.0 {
686 memory_diff / time_diff
687 } else {
688 0.0
689 }
690 } else {
691 0.0
692 };
693
694 MemoryMetrics {
695 current_bytes,
696 peak_bytes,
697 average_bytes,
698 allocation_count,
699 deallocation_count,
700 growth_rate,
701 potential_leaks,
702 }
703 }
704
705 pub fn analyze_memory_health(&self) -> Vec<String> {
707 let metrics = self.generate_metrics();
708 let mut warnings = Vec::new();
709
710 if metrics.growth_rate > 1_000_000.0 {
712 warnings.push(format!(
714 "High memory growth rate: {:.2} bytes/second",
715 metrics.growth_rate
716 ));
717 }
718
719 if metrics.potential_leaks > 1000 {
721 warnings.push(format!(
722 "Potential memory leak detected: {} unmatched allocations",
723 metrics.potential_leaks
724 ));
725 }
726
727 if metrics.peak_bytes > 1_000_000_000 {
729 warnings.push(format!(
731 "High peak memory usage: {:.2} MB",
732 metrics.peak_bytes as f64 / 1_000_000.0
733 ));
734 }
735
736 warnings
737 }
738
739 pub fn export_timeline(&self) -> Vec<(f64, usize)> {
741 self.samples
742 .iter()
743 .map(|(time, memory)| {
744 let elapsed = time.duration_since(self.start_time).as_secs_f64();
745 (elapsed, *memory)
746 })
747 .collect()
748 }
749}
750
751pub struct PerformanceMonitor {
753 start_time: std::time::Instant,
755 operation_name: String,
757 memory_profiler: RealTimeMemoryProfiler,
759 sampling_active: Arc<std::sync::atomic::AtomicBool>,
761}
762
763impl PerformanceMonitor {
764 pub fn start(_operationname: String) -> Self {
766 Self::start_with_config(_operationname, 100) }
768
769 pub fn start_with_config(operation_name: String, sample_intervalms: u64) -> Self {
771 PerformanceMonitor {
772 start_time: std::time::Instant::now(),
773 operation_name,
774 memory_profiler: RealTimeMemoryProfiler::new(sample_intervalms),
775 sampling_active: Arc::new(std::sync::atomic::AtomicBool::new(true)),
776 }
777 }
778
779 pub fn record_memory(&mut self, currentmemory: usize) {
781 self.memory_profiler.sample_memory(currentmemory);
782 }
783
784 pub fn record_allocation(&self, size: usize) {
786 self.memory_profiler.record_allocation(size);
787 }
788
789 pub fn record_deallocation(&self, size: usize) {
791 self.memory_profiler.record_deallocation(size);
792 }
793
794 pub fn get_memory_metrics(&self) -> MemoryMetrics {
796 self.memory_profiler.generate_metrics()
797 }
798
799 pub fn check_memory_health(&self) -> Vec<String> {
801 self.memory_profiler.analyze_memory_health()
802 }
803
804 pub fn get_memory_timeline(&self) -> Vec<(f64, usize)> {
806 self.memory_profiler.export_timeline()
807 }
808
809 pub fn update_memory(&mut self, currentmemory: usize) {
811 self.record_memory(currentmemory);
812 }
813
814 pub fn finish(self) -> PerformanceReport {
816 self.sampling_active.store(false, Ordering::Relaxed);
817
818 let duration = self.start_time.elapsed();
819 let memory_metrics = self.memory_profiler.generate_metrics();
820 let memory_warnings = self.memory_profiler.analyze_memory_health();
821 let timeline = self.memory_profiler.export_timeline();
822
823 let report = PerformanceReport {
824 operation_name: self.operation_name.clone(),
825 duration,
826 memory_metrics,
827 memory_warnings: memory_warnings.clone(),
828 timeline,
829 };
830
831 println!(
832 "Operation '{}' completed in {:?}",
833 self.operation_name, duration
834 );
835 println!(
836 "Memory: peak={:.2}MB, avg={:.2}MB, current={:.2}MB",
837 report.memory_metrics.peak_bytes as f64 / 1_000_000.0,
838 report.memory_metrics.average_bytes as f64 / 1_000_000.0,
839 report.memory_metrics.current_bytes as f64 / 1_000_000.0
840 );
841
842 if !memory_warnings.is_empty() {
843 println!("Memory warnings:");
844 for warning in &memory_warnings {
845 println!(" - {warning}");
846 }
847 }
848
849 report
850 }
851}
852
853#[derive(Debug)]
855pub struct PerformanceReport {
856 pub operation_name: String,
858 pub duration: std::time::Duration,
860 pub memory_metrics: MemoryMetrics,
862 pub memory_warnings: Vec<String>,
864 pub timeline: Vec<(f64, usize)>,
866}
867
868pub trait LargeGraphOps<N: Node, E: EdgeWeight> {
870 fn parallel_degrees(&self, config: &ParallelConfig) -> Result<HashMap<N, usize>>;
872
873 fn iter_edges_chunked(&self, chunksize: usize) -> LargeGraphIterator<N, E>;
875
876 fn cache_friendly_matrix(&self) -> Result<Vec<Vec<E>>>;
878}
879
880impl<N: Node + std::fmt::Debug, E: EdgeWeight, Ix: petgraph::graph::IndexType + Send + Sync>
881 LargeGraphOps<N, E> for Graph<N, E, Ix>
882where
883 N: Clone + Send + Sync + std::fmt::Debug,
884 E: Clone + Send + Sync + num_traits::Zero + Copy,
885{
886 fn parallel_degrees(&self, config: &ParallelConfig) -> Result<HashMap<N, usize>> {
887 parallel_degree_computation(self, config)
888 }
889
890 fn iter_edges_chunked(&self, chunksize: usize) -> LargeGraphIterator<N, E> {
891 LargeGraphIterator::new(self, chunksize)
892 }
893
894 fn cache_friendly_matrix(&self) -> Result<Vec<Vec<E>>> {
895 cache_friendly_adjacency_matrix(self)
896 }
897}
898
899#[cfg(test)]
900mod tests {
901 use super::*;
902
903 #[test]
904 fn test_parallel_config() {
905 let config = ParallelConfig::default();
906 assert_eq!(config.chunk_size, 1000);
907 assert!(config.enable_simd);
908 }
909
910 #[test]
911 fn test_large_graph_iterator() {
912 let mut graph: Graph<i32, f64> = Graph::new();
913 graph.add_edge(1, 2, 1.0).unwrap();
914 graph.add_edge(2, 3, 2.0).unwrap();
915 graph.add_edge(3, 4, 3.0).unwrap();
916
917 let mut iterator = LargeGraphIterator::new(&graph, 2);
918
919 let chunk1 = iterator.next_chunk();
920 assert!(chunk1.is_some());
921 assert_eq!(chunk1.unwrap().len(), 2);
922
923 let chunk2 = iterator.next_chunk();
924 assert!(chunk2.is_some());
925 assert_eq!(chunk2.unwrap().len(), 1);
926
927 let chunk3 = iterator.next_chunk();
928 assert!(chunk3.is_none());
929 }
930
931 #[test]
932 fn test_parallel_degree_computation() {
933 let mut graph: Graph<i32, f64> = Graph::new();
934 graph.add_edge(1, 2, 1.0).unwrap();
935 graph.add_edge(2, 3, 2.0).unwrap();
936 graph.add_edge(3, 1, 3.0).unwrap();
937
938 let config = ParallelConfig::default();
939 let degrees = graph.parallel_degrees(&config).unwrap();
940
941 assert_eq!(degrees[&1], 2);
942 assert_eq!(degrees[&2], 2);
943 assert_eq!(degrees[&3], 2);
944 }
945
946 #[test]
947 fn test_streaming_processor() {
948 let mut processor: StreamingGraphProcessor<i32, f64> = StreamingGraphProcessor::new(2);
949
950 processor.add_edge(1, 2, 1.0).unwrap();
951 assert_eq!(processor.edge_count(), 0); processor.add_edge(2, 3, 2.0).unwrap();
954 assert_eq!(processor.edge_count(), 2); let (total_edges, degrees) = processor.finish().unwrap();
957 assert_eq!(total_edges, 2);
958 assert_eq!(degrees[&1], 1);
959 assert_eq!(degrees[&2], 2);
960 assert_eq!(degrees[&3], 1);
961 }
962
963 #[test]
964 fn test_cache_friendly_matrix() {
965 let mut graph: Graph<i32, f64> = Graph::new();
966 graph.add_edge(0, 1, 1.0).unwrap();
967 graph.add_edge(1, 2, 2.0).unwrap();
968
969 let matrix = graph.cache_friendly_matrix().unwrap();
970 assert_eq!(matrix.len(), 3);
971 assert_eq!(matrix[0][1], 1.0);
972 assert_eq!(matrix[1][2], 2.0);
973 assert_eq!(matrix[2][1], 2.0); }
975
976 #[test]
977 fn test_performance_monitor() {
978 let mut monitor = PerformanceMonitor::start("test_operation".to_string());
979
980 monitor.record_memory(1024);
982 monitor.record_memory(2048);
983 monitor.record_memory(1536);
984
985 monitor.record_allocation(1024);
987 monitor.record_allocation(512);
988 monitor.record_deallocation(256);
989
990 std::thread::sleep(std::time::Duration::from_millis(10));
991 let report = monitor.finish();
992
993 assert!(report.duration.as_millis() >= 10);
994 assert_eq!(report.memory_metrics.peak_bytes, 2048);
995 assert_eq!(report.memory_metrics.current_bytes, 1536);
996 assert_eq!(report.memory_metrics.allocation_count, 2);
997 assert_eq!(report.memory_metrics.deallocation_count, 1);
998 assert_eq!(report.memory_metrics.potential_leaks, 1);
999 }
1000
1001 #[test]
1002 fn test_real_time_memory_profiler() {
1003 let mut profiler = RealTimeMemoryProfiler::new(50);
1004
1005 profiler.sample_memory(1000);
1007 std::thread::sleep(std::time::Duration::from_millis(10));
1008 profiler.sample_memory(2000);
1009 std::thread::sleep(std::time::Duration::from_millis(10));
1010 profiler.sample_memory(1500);
1011
1012 profiler.record_allocation(1000);
1014 profiler.record_allocation(500);
1015 profiler.record_deallocation(200);
1016
1017 let metrics = profiler.generate_metrics();
1018 assert_eq!(metrics.current_bytes, 1500);
1019 assert_eq!(metrics.peak_bytes, 2000);
1020 assert!(metrics.average_bytes > 0);
1021 assert_eq!(metrics.allocation_count, 2);
1022 assert_eq!(metrics.deallocation_count, 1);
1023 assert_eq!(metrics.potential_leaks, 1);
1024
1025 let timeline = profiler.export_timeline();
1027 assert_eq!(timeline.len(), 3);
1028 assert_eq!(timeline[0].1, 1000);
1029 assert_eq!(timeline[1].1, 2000);
1030 assert_eq!(timeline[2].1, 1500);
1031 }
1032
1033 #[test]
1034 fn test_memory_health_analysis() {
1035 let mut profiler = RealTimeMemoryProfiler::new(100);
1036
1037 profiler.sample_memory(100_000_000);
1039 std::thread::sleep(std::time::Duration::from_millis(50));
1040 profiler.sample_memory(200_000_000);
1041
1042 for _ in 0..1500 {
1044 profiler.record_allocation(1024);
1045 }
1046
1047 let warnings = profiler.analyze_memory_health();
1048 assert!(!warnings.is_empty());
1049
1050 let has_growth_warning = warnings.iter().any(|w| w.contains("growth rate"));
1052 let has_leak_warning = warnings.iter().any(|w| w.contains("leak"));
1053
1054 assert!(has_growth_warning);
1055 assert!(has_leak_warning);
1056 }
1057
1058 #[test]
1059 fn test_memory_metrics_calculation() {
1060 let mut profiler = RealTimeMemoryProfiler::new(100);
1061
1062 profiler.sample_memory(1000);
1064 std::thread::sleep(std::time::Duration::from_millis(100));
1065 profiler.sample_memory(2000);
1066 std::thread::sleep(std::time::Duration::from_millis(100));
1067 profiler.sample_memory(3000);
1068
1069 let metrics = profiler.generate_metrics();
1070
1071 assert!(metrics.growth_rate > 0.0);
1073
1074 assert!(metrics.average_bytes >= 1500 && metrics.average_bytes <= 2500);
1076
1077 assert_eq!(metrics.peak_bytes, 3000);
1079
1080 assert_eq!(metrics.current_bytes, 3000);
1082 }
1083
1084 #[test]
1085 fn test_simd_operations() {
1086 use crate::performance::simd_ops::*;
1087
1088 let a = vec![1.0, 2.0, 3.0];
1089 let b = vec![4.0, 5.0, 6.0];
1090
1091 let sum = simd_vector_add(&a, &b);
1093 assert_eq!(sum, vec![5.0, 7.0, 9.0]);
1094
1095 let dot = simd_dot_product(&a, &b);
1097 assert_eq!(dot, 32.0); let similarity = simd_cosine_similarity(&a, &b);
1101 assert!((similarity - 0.9746318461970762).abs() < 1e-10); let distance = simd_euclidean_distance(&a, &b);
1105 assert!((distance - 5.196152422706632).abs() < 1e-10); let mut vector = vec![3.0, 4.0, 0.0];
1109 simd_normalize(&mut vector);
1110 let expected_norm =
1111 (vector[0] * vector[0] + vector[1] * vector[1] + vector[2] * vector[2]).sqrt();
1112 assert!((expected_norm - 1.0).abs() < 1e-10);
1113
1114 let mut centralities = vec![1.0, 2.0, 3.0];
1116 let contributions = vec![0.5, 1.0, 1.5];
1117 let weights = vec![2.0, 2.0, 2.0];
1118 simd_batch_centrality_update(&mut centralities, &contributions, &weights);
1119 assert_eq!(centralities, vec![2.0, 4.0, 6.0]); }
1121
1122 #[test]
1123 fn test_sparse_matvec() {
1124 use crate::performance::simd_ops::*;
1125
1126 let row_ptr = vec![0, 2, 3, 5];
1131 let col_idx = vec![0, 2, 1, 0, 2];
1132 let values = vec![1.0, 2.0, 3.0, 1.0, 4.0];
1133 let x = vec![1.0, 1.0, 1.0];
1134 let mut y = vec![0.0; 3];
1135
1136 simd_sparse_matvec(&row_ptr, &col_idx, &values, &x, &mut y);
1137
1138 assert_eq!(y, vec![3.0, 3.0, 5.0]);
1140 }
1141
1142 #[test]
1143 fn test_batch_degree_computation() {
1144 use crate::performance::simd_ops::*;
1145
1146 let row_ptr = vec![0, 2, 3, 5];
1148 let mut degrees = vec![0; 3];
1149
1150 simd_batch_degree_computation(&row_ptr, &mut degrees);
1151
1152 assert_eq!(degrees, vec![2, 1, 2]);
1153 }
1154
1155 #[test]
1156 fn test_lazy_graph_metric() {
1157 use std::sync::atomic::{AtomicUsize, Ordering};
1158 use std::sync::Arc;
1159
1160 let counter = Arc::new(AtomicUsize::new(0));
1162 let counter_clone = counter.clone();
1163
1164 let lazy_metric = LazyGraphMetric::new(move || {
1165 counter_clone.fetch_add(1, Ordering::Relaxed);
1166 Ok(42i32)
1167 });
1168
1169 assert!(!lazy_metric.is_computed());
1171 assert_eq!(counter.load(Ordering::Relaxed), 0);
1172
1173 let result1 = lazy_metric.get().unwrap();
1175 assert_eq!(*result1, 42);
1176 assert!(lazy_metric.is_computed());
1177 assert_eq!(counter.load(Ordering::Relaxed), 1);
1178
1179 let result2 = lazy_metric.get().unwrap();
1181 assert_eq!(*result2, 42);
1182 assert_eq!(counter.load(Ordering::Relaxed), 1); assert!(lazy_metric.try_get().is_some());
1186 }
1187
1188 #[test]
1189 fn test_lazy_graph_metric_error() {
1190 let lazy_metric: LazyGraphMetric<String> =
1191 LazyGraphMetric::new(|| Err(GraphError::AlgorithmError("Test error".to_string())));
1192
1193 let result = lazy_metric.get();
1195 assert!(result.is_err());
1196
1197 let result2 = lazy_metric.get();
1199 assert!(result2.is_err());
1200 }
1201
1202 #[test]
1203 fn test_lazy_graph_metric_thread_safety() {
1204 use std::sync::atomic::{AtomicUsize, Ordering};
1205 use std::sync::Arc;
1206 use std::thread;
1207
1208 let counter = Arc::new(AtomicUsize::new(0));
1209 let counter_clone = counter.clone();
1210
1211 let lazy_metric = Arc::new(LazyGraphMetric::new(move || {
1212 counter_clone.fetch_add(1, Ordering::Relaxed);
1213 std::thread::sleep(std::time::Duration::from_millis(10)); Ok(100i32)
1215 }));
1216
1217 let handles: Vec<_> = (0..10)
1219 .map(|_| {
1220 let metric = lazy_metric.clone();
1221 thread::spawn(move || *metric.get().unwrap())
1222 })
1223 .collect();
1224
1225 let results: Vec<i32> = handles.into_iter().map(|h| h.join().unwrap()).collect();
1227
1228 assert!(results.iter().all(|&x| x == 100));
1230
1231 assert_eq!(counter.load(Ordering::Relaxed), 1);
1233 }
1234}