// scirs2_graph/performance.rs

//! Performance optimizations for large graph operations
//!
//! This module provides performance-optimized algorithms and data structures
//! specifically designed for handling large graphs efficiently.

use crate::base::{EdgeWeight, Graph, Node};
use crate::error::{GraphError, Result};
use scirs2_core::parallel_ops::*;
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Configuration for parallel processing
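///
/// # Example
///
/// A minimal construction sketch (not compiled as a doctest; the re-export
/// path `scirs2_graph::performance` is an assumption):
///
/// ```ignore
/// use scirs2_graph::performance::ParallelConfig;
///
/// let config = ParallelConfig {
///     num_threads: Some(4), // currently ignored; the thread pool is managed globally
///     chunk_size: 500,
///     enable_simd: true,
/// };
/// assert_eq!(ParallelConfig::default().chunk_size, 1000);
/// ```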
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Number of threads to use (None = use all available cores)
    pub num_threads: Option<usize>,
    /// Chunk size for parallel operations
    pub chunk_size: usize,
    /// Enable SIMD optimizations where available
    pub enable_simd: bool,
}

impl Default for ParallelConfig {
    fn default() -> Self {
        ParallelConfig {
            num_threads: None, // Use all available cores
            chunk_size: 1000,
            enable_simd: true,
        }
    }
}

/// Memory-efficient iterator for large graph traversals
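///
/// # Example
///
/// A chunked-traversal sketch (not compiled as a doctest; `graph` and
/// `process` stand in for the caller's data and handler):
///
/// ```ignore
/// use scirs2_graph::performance::LargeGraphIterator;
///
/// let mut iter = LargeGraphIterator::new(&graph, 1024);
/// while let Some(chunk) = iter.next_chunk() {
///     // Each chunk holds at most 1024 (source, target, weight) triples.
///     process(chunk);
/// }
/// iter.reset(); // rewind for a second pass
/// ```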
pub struct LargeGraphIterator<N: Node, E: EdgeWeight> {
    /// Current position in iteration
    position: usize,
    /// Owned edge list extracted from the graph
    graph_data: Vec<(N, N, E)>,
    /// Chunk size for memory efficiency
    chunk_size: usize,
}

impl<N: Node, E: EdgeWeight> LargeGraphIterator<N, E> {
    /// Create a new iterator for large graphs
    pub fn new<Ix>(graph: &Graph<N, E, Ix>, chunk_size: usize) -> Self
    where
        N: Clone + std::fmt::Debug,
        E: Clone,
        Ix: petgraph::graph::IndexType,
    {
        let graph_data = graph
            .edges()
            .into_iter()
            .map(|edge| (edge.source, edge.target, edge.weight))
            .collect();

        LargeGraphIterator {
            position: 0,
            graph_data,
            chunk_size,
        }
    }

    /// Get the next chunk of edges
    pub fn next_chunk(&mut self) -> Option<&[(N, N, E)]> {
        if self.position >= self.graph_data.len() {
            return None;
        }

        let end = (self.position + self.chunk_size).min(self.graph_data.len());
        let chunk = &self.graph_data[self.position..end];
        self.position = end;

        if chunk.is_empty() {
            None
        } else {
            Some(chunk)
        }
    }

    /// Reset iterator to beginning
    pub fn reset(&mut self) {
        self.position = 0;
    }
}

/// Parallel degree computation for large graphs
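///
/// # Example
///
/// A minimal sketch (not compiled as a doctest; `graph` is assumed to be an
/// already-built `Graph`):
///
/// ```ignore
/// use scirs2_graph::performance::{parallel_degree_computation, ParallelConfig};
///
/// let degrees = parallel_degree_computation(&graph, &ParallelConfig::default())?;
/// for (node, degree) in &degrees {
///     println!("{node:?}: {degree}");
/// }
/// ```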
#[allow(dead_code)]
pub fn parallel_degree_computation<N, E, Ix>(
    graph: &Graph<N, E, Ix>,
    config: &ParallelConfig,
) -> Result<HashMap<N, usize>>
where
    N: Node + Clone + Send + Sync + std::fmt::Debug,
    E: EdgeWeight + Send + Sync,
    Ix: petgraph::graph::IndexType + Send + Sync,
{
    // Note: Thread pool configuration is handled globally by scirs2-core
    // The num_threads config parameter is preserved for future use but currently ignored

    let nodes: Vec<_> = graph.nodes().into_iter().cloned().collect();

    // Parallel computation of degrees
    let degrees: HashMap<N, usize> = nodes
        .par_chunks(config.chunk_size)
        .map(|chunk| {
            let mut local_degrees = HashMap::new();
            for node in chunk {
                let degree = graph.degree(node);
                local_degrees.insert(node.clone(), degree);
            }
            local_degrees
        })
        .reduce(HashMap::new, |mut acc, local| {
            acc.extend(local);
            acc
        });

    Ok(degrees)
}

/// Memory-efficient parallel shortest path computation
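///
/// # Example
///
/// A multi-source sketch (not compiled as a doctest; `graph` and the integer
/// node IDs are assumptions):
///
/// ```ignore
/// use scirs2_graph::performance::{parallel_shortest_paths, ParallelConfig};
///
/// let sources = vec![1, 2];
/// let paths = parallel_shortest_paths(&graph, &sources, &ParallelConfig::default())?;
/// // paths[&1] maps each reachable target to the total weight of the
/// // shortest path from node 1.
/// ```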
#[allow(dead_code)]
pub fn parallel_shortest_paths<N, E, Ix>(
    graph: &Graph<N, E, Ix>,
    sources: &[N],
    _config: &ParallelConfig,
) -> Result<HashMap<N, HashMap<N, E>>>
where
    N: Node + Clone + Send + Sync + std::fmt::Debug,
    E: EdgeWeight
        + Clone
        + Send
        + Sync
        + num_traits::Zero
        + num_traits::One
        + std::ops::Add<Output = E>
        + PartialOrd
        + std::marker::Copy
        + std::fmt::Debug
        + std::default::Default,
    Ix: petgraph::graph::IndexType + Send + Sync,
{
    use crate::algorithms::shortest_path::dijkstra_path;

    // Note: Thread pool configuration is handled globally by scirs2-core
    // The num_threads field of `_config` is preserved for future use but currently ignored

    let all_nodes: Vec<_> = graph.nodes().into_iter().cloned().collect();

    // Parallel computation of shortest paths from multiple sources.
    // Note: this runs one Dijkstra search per (source, target) pair;
    // unreachable targets are silently skipped.
    let results: HashMap<N, HashMap<N, E>> = sources
        .par_iter()
        .map(|source| {
            let mut paths_from_source = HashMap::new();

            for target in &all_nodes {
                if let Ok(Some(path)) = dijkstra_path(graph, source, target) {
                    paths_from_source.insert(target.clone(), path.total_weight);
                }
            }

            (source.clone(), paths_from_source)
        })
        .collect();

    Ok(results)
}

/// Cache-friendly adjacency matrix computation for large graphs
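///
/// # Example
///
/// A dense-matrix sketch (not compiled as a doctest; `graph` is assumed to be
/// small, since the result is an n x n matrix):
///
/// ```ignore
/// use scirs2_graph::performance::cache_friendly_adjacency_matrix;
///
/// let matrix = cache_friendly_adjacency_matrix(&graph)?;
/// // The matrix is filled symmetrically, so matrix[i][j] == matrix[j][i].
/// ```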
#[allow(dead_code)]
pub fn cache_friendly_adjacency_matrix<N, E, Ix>(graph: &Graph<N, E, Ix>) -> Result<Vec<Vec<E>>>
where
    N: Node + Clone + std::fmt::Debug,
    E: EdgeWeight + Clone + num_traits::Zero + Copy,
    Ix: petgraph::graph::IndexType,
{
    let n = graph.node_count();
    if n == 0 {
        return Ok(vec![]);
    }

    // Pre-allocate matrix with cache-friendly access patterns
    let mut matrix = vec![vec![E::zero(); n]; n];

    // Node to index mapping
    let node_to_index: HashMap<N, usize> = graph
        .nodes()
        .into_iter()
        .enumerate()
        .map(|(i, node)| (node.clone(), i))
        .collect();

    // Fill matrix in row-major order for cache efficiency
    for edge in graph.edges() {
        if let (Some(&src_idx), Some(&tgt_idx)) = (
            node_to_index.get(&edge.source),
            node_to_index.get(&edge.target),
        ) {
            matrix[src_idx][tgt_idx] = edge.weight;
            matrix[tgt_idx][src_idx] = edge.weight; // Undirected graph: mirror the entry
        }
    }

    Ok(matrix)
}

/// Streaming algorithm for processing very large graphs
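///
/// # Example
///
/// A streaming sketch (not compiled as a doctest; `edge_stream` stands in for
/// any source of edges too large to hold in memory at once):
///
/// ```ignore
/// use scirs2_graph::performance::StreamingGraphProcessor;
///
/// let mut processor: StreamingGraphProcessor<u64, f64> = StreamingGraphProcessor::new(10_000);
/// for (src, dst, w) in edge_stream {
///     processor.add_edge(src, dst, w)?;
/// }
/// let (edge_count, degrees) = processor.finish()?;
/// ```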
pub struct StreamingGraphProcessor<N: Node, E: EdgeWeight> {
    /// Current batch of edges being processed
    current_batch: Vec<(N, N, E)>,
    /// Maximum batch size
    batch_size: usize,
    /// Running statistics
    edge_count: AtomicUsize,
    /// Degree accumulator
    degree_counter: Arc<parking_lot::Mutex<HashMap<N, usize>>>,
}

impl<N: Node, E: EdgeWeight> StreamingGraphProcessor<N, E>
where
    N: Clone + Send + Sync,
    E: Clone + Send + Sync,
{
    /// Create a new streaming processor
    pub fn new(batch_size: usize) -> Self {
        StreamingGraphProcessor {
            current_batch: Vec::with_capacity(batch_size),
            batch_size,
            edge_count: AtomicUsize::new(0),
            degree_counter: Arc::new(parking_lot::Mutex::new(HashMap::new())),
        }
    }

    /// Add an edge to the streaming processor
    pub fn add_edge(&mut self, source: N, target: N, weight: E) -> Result<()> {
        self.current_batch.push((source, target, weight));

        if self.current_batch.len() >= self.batch_size {
            self.process_batch()?;
        }

        Ok(())
    }

    /// Process the current batch of edges
    fn process_batch(&mut self) -> Result<()> {
        if self.current_batch.is_empty() {
            return Ok(());
        }

        // Update edge count
        self.edge_count
            .fetch_add(self.current_batch.len(), Ordering::Relaxed);

        // Update degree counts
        {
            let mut degrees = self.degree_counter.lock();
            for (source, target, _) in &self.current_batch {
                *degrees.entry(source.clone()).or_insert(0) += 1;
                *degrees.entry(target.clone()).or_insert(0) += 1;
            }
        }

        // Clear current batch
        self.current_batch.clear();

        Ok(())
    }

    /// Finish processing and return final statistics
    pub fn finish(mut self) -> Result<(usize, HashMap<N, usize>)> {
        // Process remaining edges
        self.process_batch()?;

        let total_edges = self.edge_count.load(Ordering::Relaxed);
        let degrees = Arc::try_unwrap(self.degree_counter)
            .map_err(|_| GraphError::AlgorithmError("Failed to unwrap degree counter".to_string()))?
            .into_inner();

        Ok((total_edges, degrees))
    }

    /// Get current edge count
    pub fn edge_count(&self) -> usize {
        self.edge_count.load(Ordering::Relaxed)
    }
}

/// SIMD-optimized operations for numeric graph computations
#[cfg(target_arch = "x86_64")]
pub mod simd_ops {
    #[allow(unused_imports)]
    use super::*;
    use scirs2_core::simd_ops::SimdUnifiedOps;

    /// SIMD-optimized vector addition for graph metrics
    #[allow(dead_code)]
    pub fn simd_vector_add(a: &[f64], b: &[f64]) -> Vec<f64> {
        assert_eq!(a.len(), b.len());

        // Convert slices to ArrayView1 for SIMD operations
        let a_view = ndarray::ArrayView1::from(a);
        let b_view = ndarray::ArrayView1::from(b);

        // Use scirs2-core SIMD operations for optimal performance
        let result = f64::simd_add(&a_view, &b_view);

        // Convert back to Vec<f64>
        result.to_vec()
    }

    /// SIMD-optimized dot product for similarity computations
    #[allow(dead_code)]
    pub fn simd_dot_product(a: &[f64], b: &[f64]) -> f64 {
        assert_eq!(a.len(), b.len());
        let a_view = ndarray::ArrayView1::from(a);
        let b_view = ndarray::ArrayView1::from(b);

        // Use scirs2-core SIMD optimized dot product
        f64::simd_dot(&a_view, &b_view)
    }

    /// SIMD-optimized vector normalization
    #[allow(dead_code)]
    pub fn simd_normalize(vector: &mut [f64]) {
        let vector_view = ndarray::ArrayView1::from(&*vector);
        let norm = f64::simd_norm(&vector_view);
        if norm > 0.0 {
            for val in vector.iter_mut() {
                *val /= norm;
            }
        }
    }

    /// SIMD-optimized cosine similarity
    ///
    /// Returns 0.0 if either vector has zero norm, matching the scalar fallback.
    #[allow(dead_code)]
    pub fn simd_cosine_similarity(a: &[f64], b: &[f64]) -> f64 {
        assert_eq!(a.len(), b.len());
        let a_view = ndarray::ArrayView1::from(a);
        let b_view = ndarray::ArrayView1::from(b);
        let dot_product = f64::simd_dot(&a_view, &b_view);
        let norm_a = f64::simd_norm(&a_view);
        let norm_b = f64::simd_norm(&b_view);
        if norm_a == 0.0 || norm_b == 0.0 {
            0.0
        } else {
            dot_product / (norm_a * norm_b)
        }
    }

    /// SIMD-optimized euclidean distance
    #[allow(dead_code)]
    pub fn simd_euclidean_distance(a: &[f64], b: &[f64]) -> f64 {
        assert_eq!(a.len(), b.len());
        let a_view = ndarray::ArrayView1::from(a);
        let b_view = ndarray::ArrayView1::from(b);
        let diff = f64::simd_sub(&a_view, &b_view);
        f64::simd_norm(&diff.view())
    }

    /// SIMD-optimized batch centrality computation
    #[allow(dead_code)]
    pub fn simd_batch_centrality_update(
        centralities: &mut [f64],
        contributions: &[f64],
        weights: &[f64],
    ) {
        assert_eq!(centralities.len(), contributions.len());
        assert_eq!(centralities.len(), weights.len());

        // Multiply contributions by weights and add to centralities
        let contrib_view = ndarray::ArrayView1::from(contributions);
        let weights_view = ndarray::ArrayView1::from(weights);
        let weighted_contribs = f64::simd_mul(&contrib_view, &weights_view);

        // Manual add-assign since there's no direct simd_add_assign
        for (c, w) in centralities.iter_mut().zip(weighted_contribs.iter()) {
            *c += *w;
        }
    }

    /// SIMD-optimized matrix-vector multiplication for PageRank-style algorithms
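    ///
    /// The matrix is expected in CSR form: `row_ptr` has one entry per row plus
    /// a trailing sentinel, while `col_idx` and `values` list the column index
    /// and value of each stored entry.
    ///
    /// # Example
    ///
    /// A 2x2 sketch (not compiled as a doctest; mirrors the CSR layout used in
    /// this module's tests):
    ///
    /// ```ignore
    /// // [1 2]    row_ptr = [0, 2, 3]
    /// // [0 3]    col_idx = [0, 1, 1], values = [1.0, 2.0, 3.0]
    /// let mut y = vec![0.0; 2];
    /// simd_sparse_matvec(&[0, 2, 3], &[0, 1, 1], &[1.0, 2.0, 3.0], &[1.0, 1.0], &mut y);
    /// assert_eq!(y, vec![3.0, 3.0]);
    /// ```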
    #[allow(dead_code)]
    pub fn simd_sparse_matvec(
        row_ptr: &[usize],
        col_idx: &[usize],
        values: &[f64],
        x: &[f64],
        y: &mut [f64],
    ) {
        y.fill(0.0);

        for (i, y_i) in y.iter_mut().enumerate() {
            let row_start = row_ptr[i];
            let row_end = row_ptr[i + 1];

            // Process row elements in SIMD-optimized chunks
            let row_values = &values[row_start..row_end];
            let row_indices = &col_idx[row_start..row_end];

            // Gather x values corresponding to column indices
            let x_vals: Vec<f64> = row_indices.iter().map(|&j| x[j]).collect();

            // SIMD dot product for this row
            let row_view = ndarray::ArrayView1::from(row_values);
            let x_view = ndarray::ArrayView1::from(&x_vals);
            *y_i = f64::simd_dot(&row_view, &x_view);
        }
    }

    /// SIMD-optimized degree computation for multiple nodes
    #[allow(dead_code)]
    pub fn simd_batch_degree_computation(row_ptr: &[usize], degrees: &mut [usize]) {
        for (i, degree) in degrees.iter_mut().enumerate() {
            *degree = row_ptr[i + 1] - row_ptr[i];
        }
    }
}

/// Non-x86_64 fallback implementations
#[cfg(not(target_arch = "x86_64"))]
pub mod simd_ops {
    /// Fallback vector addition
    #[allow(dead_code)]
    pub fn simd_vector_add(a: &[f64], b: &[f64]) -> Vec<f64> {
        assert_eq!(a.len(), b.len());
        a.iter().zip(b.iter()).map(|(&x, &y)| x + y).collect()
    }

    /// Fallback dot product
    #[allow(dead_code)]
    pub fn simd_dot_product(a: &[f64], b: &[f64]) -> f64 {
        assert_eq!(a.len(), b.len());
        a.iter().zip(b.iter()).map(|(&x, &y)| x * y).sum()
    }

    /// Fallback normalization
    #[allow(dead_code)]
    pub fn simd_normalize(vector: &mut [f64]) {
        let norm: f64 = vector.iter().map(|x| x * x).sum::<f64>().sqrt();
        if norm > 0.0 {
            for x in vector.iter_mut() {
                *x /= norm;
            }
        }
    }

    /// Fallback cosine similarity
    #[allow(dead_code)]
    pub fn simd_cosine_similarity(a: &[f64], b: &[f64]) -> f64 {
        assert_eq!(a.len(), b.len());
        let dot: f64 = a.iter().zip(b.iter()).map(|(&x, &y)| x * y).sum();
        let norm_a: f64 = a.iter().map(|x| x * x).sum::<f64>().sqrt();
        let norm_b: f64 = b.iter().map(|x| x * x).sum::<f64>().sqrt();

        if norm_a == 0.0 || norm_b == 0.0 {
            0.0
        } else {
            dot / (norm_a * norm_b)
        }
    }

    /// Fallback euclidean distance
    #[allow(dead_code)]
    pub fn simd_euclidean_distance(a: &[f64], b: &[f64]) -> f64 {
        assert_eq!(a.len(), b.len());
        a.iter()
            .zip(b.iter())
            .map(|(&x, &y)| (x - y) * (x - y))
            .sum::<f64>()
            .sqrt()
    }

    /// Fallback batch centrality update
    #[allow(dead_code)]
    pub fn simd_batch_centrality_update(
        centralities: &mut [f64],
        contributions: &[f64],
        weights: &[f64],
    ) {
        for ((cent, &contrib), &weight) in centralities
            .iter_mut()
            .zip(contributions.iter())
            .zip(weights.iter())
        {
            *cent += contrib * weight;
        }
    }

    /// Fallback sparse matrix-vector multiplication
    #[allow(dead_code)]
    pub fn simd_sparse_matvec(
        row_ptr: &[usize],
        col_idx: &[usize],
        values: &[f64],
        x: &[f64],
        y: &mut [f64],
    ) {
        y.fill(0.0);

        for (i, y_i) in y.iter_mut().enumerate() {
            let row_start = row_ptr[i];
            let row_end = row_ptr[i + 1];

            for j in row_start..row_end {
                *y_i += values[j] * x[col_idx[j]];
            }
        }
    }

    /// Fallback batch degree computation
    #[allow(dead_code)]
    pub fn simd_batch_degree_computation(row_ptr: &[usize], degrees: &mut [usize]) {
        for (i, degree) in degrees.iter_mut().enumerate() {
            *degree = row_ptr[i + 1] - row_ptr[i];
        }
    }
}

/// Lazy evaluation wrapper for expensive graph computations
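///
/// # Example
///
/// A lazy-computation sketch (not compiled as a doctest;
/// `expensive_diameter_estimate` is a hypothetical stand-in):
///
/// ```ignore
/// use scirs2_graph::performance::LazyGraphMetric;
///
/// let metric = LazyGraphMetric::new(|| Ok(expensive_diameter_estimate()));
/// assert!(!metric.is_computed());
/// let value = metric.get()?; // computed exactly once, then cached
/// ```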
pub struct LazyGraphMetric<T> {
    /// The computed value stored in a thread-safe cell
    value: std::sync::OnceLock<std::result::Result<T, GraphError>>,
    /// Computation function stored in a mutex for thread safety
    #[allow(clippy::type_complexity)]
    compute_fn: std::sync::Mutex<Option<Box<dyn FnOnce() -> Result<T> + Send + 'static>>>,
}

impl<T> LazyGraphMetric<T>
where
    T: Send + 'static,
{
    /// Create a new lazy metric
    pub fn new<F>(compute_fn: F) -> Self
    where
        F: FnOnce() -> Result<T> + Send + 'static,
    {
        LazyGraphMetric {
            value: std::sync::OnceLock::new(),
            compute_fn: std::sync::Mutex::new(Some(Box::new(compute_fn))),
        }
    }

    /// Get the value, computing it if necessary (thread-safe)
    pub fn get(&self) -> Result<&T> {
        let result = self.value.get_or_init(|| {
            // Extract the computation function from the mutex
            let mut fn_guard = self.compute_fn.lock().unwrap();
            if let Some(compute_fn) = fn_guard.take() {
                // Execute the computation
                compute_fn()
            } else {
                // Function already consumed; this shouldn't happen in normal usage
                Err(GraphError::AlgorithmError(
                    "Computation function already consumed".to_string(),
                ))
            }
        });

        match result {
            Ok(value) => Ok(value),
            Err(e) => Err(GraphError::AlgorithmError(format!(
                "Lazy computation failed: {e}"
            ))),
        }
    }

    /// Check if the value has been computed
    pub fn is_computed(&self) -> bool {
        self.value.get().is_some()
    }

    /// Force computation if not already done
    pub fn force(&self) -> Result<()> {
        self.get().map(|_| ())
    }

    /// Get the cached result if available, without triggering computation
    pub fn try_get(&self) -> Option<std::result::Result<&T, &GraphError>> {
        self.value.get().map(|result| match result {
            Ok(value) => Ok(value),
            Err(error) => Err(error),
        })
    }
}

/// Performance-focused memory profiling metrics
#[derive(Debug, Clone)]
pub struct MemoryMetrics {
    /// Current memory usage in bytes
    pub current_bytes: usize,
    /// Peak memory usage during operation
    pub peak_bytes: usize,
    /// Average memory usage
    pub average_bytes: usize,
    /// Number of allocations
    pub allocation_count: usize,
    /// Number of deallocations
    pub deallocation_count: usize,
    /// Memory growth rate (bytes per second)
    pub growth_rate: f64,
    /// Potential memory leaks (allocations - deallocations)
    pub potential_leaks: isize,
}

impl Default for MemoryMetrics {
    fn default() -> Self {
        MemoryMetrics {
            current_bytes: 0,
            peak_bytes: 0,
            average_bytes: 0,
            allocation_count: 0,
            deallocation_count: 0,
            growth_rate: 0.0,
            potential_leaks: 0,
        }
    }
}

/// Real-time memory profiler for graph operations
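///
/// # Example
///
/// A sampling sketch (not compiled as a doctest; the byte counts are made up):
///
/// ```ignore
/// use scirs2_graph::performance::RealTimeMemoryProfiler;
///
/// let mut profiler = RealTimeMemoryProfiler::new(100);
/// profiler.sample_memory(1_000_000);
/// profiler.record_allocation(4096);
/// let metrics = profiler.generate_metrics();
/// println!("peak: {} bytes", metrics.peak_bytes);
/// ```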
pub struct RealTimeMemoryProfiler {
    /// Memory samples over time
    samples: Vec<(std::time::Instant, usize)>,
    /// Start time
    start_time: std::time::Instant,
    /// Allocation tracking
    allocations: AtomicUsize,
    /// Deallocation tracking
    deallocations: AtomicUsize,
    /// Sampling interval in milliseconds
    #[allow(dead_code)]
    sample_interval_ms: u64,
}

impl RealTimeMemoryProfiler {
    /// Create a new real-time profiler
    pub fn new(sample_interval_ms: u64) -> Self {
        RealTimeMemoryProfiler {
            samples: Vec::new(),
            start_time: std::time::Instant::now(),
            allocations: AtomicUsize::new(0),
            deallocations: AtomicUsize::new(0),
            sample_interval_ms,
        }
    }

    /// Record a memory measurement
    pub fn sample_memory(&mut self, current_memory: usize) {
        self.samples
            .push((std::time::Instant::now(), current_memory));
    }

    /// Record an allocation (only the count is tracked; the size is currently unused)
    pub fn record_allocation(&self, _size: usize) {
        self.allocations.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a deallocation (only the count is tracked; the size is currently unused)
    pub fn record_deallocation(&self, _size: usize) {
        self.deallocations.fetch_add(1, Ordering::Relaxed);
    }

    /// Generate comprehensive memory metrics
    pub fn generate_metrics(&self) -> MemoryMetrics {
        if self.samples.is_empty() {
            return MemoryMetrics::default();
        }

        let current_bytes = self.samples.last().map(|(_, mem)| *mem).unwrap_or(0);
        let peak_bytes = self.samples.iter().map(|(_, mem)| *mem).max().unwrap_or(0);
        // Safe to divide: the empty case returned early above
        let average_bytes =
            self.samples.iter().map(|(_, mem)| *mem).sum::<usize>() / self.samples.len();

        let allocation_count = self.allocations.load(Ordering::Relaxed);
        let deallocation_count = self.deallocations.load(Ordering::Relaxed);
        let potential_leaks = allocation_count as isize - deallocation_count as isize;

        // Growth rate: bytes per second between the first and last samples
        let growth_rate = if self.samples.len() >= 2 {
            let first = &self.samples[0];
            let last = &self.samples[self.samples.len() - 1];
            let time_diff = last.0.duration_since(first.0).as_secs_f64();
            let memory_diff = last.1 as f64 - first.1 as f64;
            if time_diff > 0.0 {
                memory_diff / time_diff
            } else {
                0.0
            }
        } else {
            0.0
        };

        MemoryMetrics {
            current_bytes,
            peak_bytes,
            average_bytes,
            allocation_count,
            deallocation_count,
            growth_rate,
            potential_leaks,
        }
    }

    /// Check for potential memory issues
    pub fn analyze_memory_health(&self) -> Vec<String> {
        let metrics = self.generate_metrics();
        let mut warnings = Vec::new();

        // Check for rapid memory growth (> 1 MB/second)
        if metrics.growth_rate > 1_000_000.0 {
            warnings.push(format!(
                "High memory growth rate: {:.2} bytes/second",
                metrics.growth_rate
            ));
        }

        // Check for potential leaks
        if metrics.potential_leaks > 1000 {
            warnings.push(format!(
                "Potential memory leak detected: {} unmatched allocations",
                metrics.potential_leaks
            ));
        }

        // Check for excessive peak memory (> 1 GB)
        if metrics.peak_bytes > 1_000_000_000 {
            warnings.push(format!(
                "High peak memory usage: {:.2} MB",
                metrics.peak_bytes as f64 / 1_000_000.0
            ));
        }

        warnings
    }

    /// Export memory timeline for visualization
    pub fn export_timeline(&self) -> Vec<(f64, usize)> {
        self.samples
            .iter()
            .map(|(time, memory)| {
                let elapsed = time.duration_since(self.start_time).as_secs_f64();
                (elapsed, *memory)
            })
            .collect()
    }
}

/// Performance monitoring utilities with enhanced memory profiling
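///
/// # Example
///
/// A monitoring sketch (not compiled as a doctest; the operation name and byte
/// counts are made up):
///
/// ```ignore
/// use scirs2_graph::performance::PerformanceMonitor;
///
/// let mut monitor = PerformanceMonitor::start("bfs".to_string());
/// monitor.record_memory(1_048_576);
/// // ... run the operation being measured ...
/// let report = monitor.finish();
/// println!("{} took {:?}", report.operation_name, report.duration);
/// ```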
pub struct PerformanceMonitor {
    /// Start time of current operation
    start_time: std::time::Instant,
    /// Operation name
    operation_name: String,
    /// Real-time memory profiler
    memory_profiler: RealTimeMemoryProfiler,
    /// Flag indicating whether memory sampling is active
    sampling_active: Arc<std::sync::atomic::AtomicBool>,
}

impl PerformanceMonitor {
    /// Start monitoring a new operation with memory profiling
    pub fn start(operation_name: String) -> Self {
        Self::start_with_config(operation_name, 100) // Sample every 100ms by default
    }

    /// Start monitoring with custom sampling interval
    pub fn start_with_config(operation_name: String, sample_interval_ms: u64) -> Self {
        PerformanceMonitor {
            start_time: std::time::Instant::now(),
            operation_name,
            memory_profiler: RealTimeMemoryProfiler::new(sample_interval_ms),
            sampling_active: Arc::new(std::sync::atomic::AtomicBool::new(true)),
        }
    }

    /// Manually record current memory usage
    pub fn record_memory(&mut self, current_memory: usize) {
        self.memory_profiler.sample_memory(current_memory);
    }

    /// Record an allocation event
    pub fn record_allocation(&self, size: usize) {
        self.memory_profiler.record_allocation(size);
    }

    /// Record a deallocation event
    pub fn record_deallocation(&self, size: usize) {
        self.memory_profiler.record_deallocation(size);
    }

    /// Get current memory metrics
    pub fn get_memory_metrics(&self) -> MemoryMetrics {
        self.memory_profiler.generate_metrics()
    }

    /// Check for memory health issues
    pub fn check_memory_health(&self) -> Vec<String> {
        self.memory_profiler.analyze_memory_health()
    }

    /// Get memory timeline for analysis
    pub fn get_memory_timeline(&self) -> Vec<(f64, usize)> {
        self.memory_profiler.export_timeline()
    }

    /// Update peak memory usage (legacy method)
    pub fn update_memory(&mut self, current_memory: usize) {
        self.record_memory(current_memory);
    }

    /// Finish monitoring and return comprehensive performance metrics
    pub fn finish(self) -> PerformanceReport {
        self.sampling_active.store(false, Ordering::Relaxed);

        let duration = self.start_time.elapsed();
        let memory_metrics = self.memory_profiler.generate_metrics();
        let memory_warnings = self.memory_profiler.analyze_memory_health();
        let timeline = self.memory_profiler.export_timeline();

        let report = PerformanceReport {
            operation_name: self.operation_name.clone(),
            duration,
            memory_metrics,
            memory_warnings: memory_warnings.clone(),
            timeline,
        };

        println!(
            "Operation '{}' completed in {:?}",
            self.operation_name, duration
        );
        println!(
            "Memory: peak={:.2}MB, avg={:.2}MB, current={:.2}MB",
            report.memory_metrics.peak_bytes as f64 / 1_000_000.0,
            report.memory_metrics.average_bytes as f64 / 1_000_000.0,
            report.memory_metrics.current_bytes as f64 / 1_000_000.0
        );

        if !memory_warnings.is_empty() {
            println!("Memory warnings:");
            for warning in &memory_warnings {
                println!("  - {warning}");
            }
        }

        report
    }
}

/// Comprehensive performance report
#[derive(Debug)]
pub struct PerformanceReport {
    /// Operation name
    pub operation_name: String,
    /// Total execution duration
    pub duration: std::time::Duration,
    /// Memory metrics
    pub memory_metrics: MemoryMetrics,
    /// Memory health warnings
    pub memory_warnings: Vec<String>,
    /// Memory usage timeline
    pub timeline: Vec<(f64, usize)>,
}

/// Optimized graph algorithms trait for large graphs
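///
/// # Example
///
/// A trait-method sketch (not compiled as a doctest; `graph` is assumed to
/// satisfy the trait bounds):
///
/// ```ignore
/// use scirs2_graph::performance::{LargeGraphOps, ParallelConfig};
///
/// let degrees = graph.parallel_degrees(&ParallelConfig::default())?;
/// let mut edges = graph.iter_edges_chunked(4096);
/// let matrix = graph.cache_friendly_matrix()?;
/// ```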
pub trait LargeGraphOps<N: Node, E: EdgeWeight> {
    /// Parallel computation of node degrees
    fn parallel_degrees(&self, config: &ParallelConfig) -> Result<HashMap<N, usize>>;

    /// Memory-efficient iteration over edges
    fn iter_edges_chunked(&self, chunk_size: usize) -> LargeGraphIterator<N, E>;

    /// Cache-friendly matrix representation
    fn cache_friendly_matrix(&self) -> Result<Vec<Vec<E>>>;
}

impl<N: Node + std::fmt::Debug, E: EdgeWeight, Ix: petgraph::graph::IndexType + Send + Sync>
    LargeGraphOps<N, E> for Graph<N, E, Ix>
where
    N: Clone + Send + Sync + std::fmt::Debug,
    E: Clone + Send + Sync + num_traits::Zero + Copy,
{
    fn parallel_degrees(&self, config: &ParallelConfig) -> Result<HashMap<N, usize>> {
        parallel_degree_computation(self, config)
    }

    fn iter_edges_chunked(&self, chunk_size: usize) -> LargeGraphIterator<N, E> {
        LargeGraphIterator::new(self, chunk_size)
    }

    fn cache_friendly_matrix(&self) -> Result<Vec<Vec<E>>> {
        cache_friendly_adjacency_matrix(self)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parallel_config() {
        let config = ParallelConfig::default();
        assert_eq!(config.chunk_size, 1000);
        assert!(config.enable_simd);
    }

    #[test]
    fn test_large_graph_iterator() {
        let mut graph: Graph<i32, f64> = Graph::new();
        graph.add_edge(1, 2, 1.0).unwrap();
        graph.add_edge(2, 3, 2.0).unwrap();
        graph.add_edge(3, 4, 3.0).unwrap();

        let mut iterator = LargeGraphIterator::new(&graph, 2);

        let chunk1 = iterator.next_chunk();
        assert!(chunk1.is_some());
        assert_eq!(chunk1.unwrap().len(), 2);

        let chunk2 = iterator.next_chunk();
        assert!(chunk2.is_some());
        assert_eq!(chunk2.unwrap().len(), 1);

        let chunk3 = iterator.next_chunk();
        assert!(chunk3.is_none());
    }

    #[test]
    fn test_parallel_degree_computation() {
        let mut graph: Graph<i32, f64> = Graph::new();
        graph.add_edge(1, 2, 1.0).unwrap();
        graph.add_edge(2, 3, 2.0).unwrap();
        graph.add_edge(3, 1, 3.0).unwrap();

        let config = ParallelConfig::default();
        let degrees = graph.parallel_degrees(&config).unwrap();

        assert_eq!(degrees[&1], 2);
        assert_eq!(degrees[&2], 2);
        assert_eq!(degrees[&3], 2);
    }

    #[test]
    fn test_streaming_processor() {
        let mut processor: StreamingGraphProcessor<i32, f64> = StreamingGraphProcessor::new(2);

        processor.add_edge(1, 2, 1.0).unwrap();
        assert_eq!(processor.edge_count(), 0); // Not yet processed

        processor.add_edge(2, 3, 2.0).unwrap();
        assert_eq!(processor.edge_count(), 2); // Batch processed

        let (total_edges, degrees) = processor.finish().unwrap();
        assert_eq!(total_edges, 2);
        assert_eq!(degrees[&1], 1);
        assert_eq!(degrees[&2], 2);
        assert_eq!(degrees[&3], 1);
    }

    #[test]
    fn test_cache_friendly_matrix() {
        let mut graph: Graph<i32, f64> = Graph::new();
        graph.add_edge(0, 1, 1.0).unwrap();
        graph.add_edge(1, 2, 2.0).unwrap();

        let matrix = graph.cache_friendly_matrix().unwrap();
        assert_eq!(matrix.len(), 3);
        assert_eq!(matrix[0][1], 1.0);
        assert_eq!(matrix[1][2], 2.0);
        assert_eq!(matrix[2][1], 2.0); // Undirected
    }

    #[test]
    fn test_performance_monitor() {
        let mut monitor = PerformanceMonitor::start("test_operation".to_string());

        // Simulate memory usage
        monitor.record_memory(1024);
        monitor.record_memory(2048);
        monitor.record_memory(1536);

        // Simulate allocations
        monitor.record_allocation(1024);
        monitor.record_allocation(512);
        monitor.record_deallocation(256);

        std::thread::sleep(std::time::Duration::from_millis(10));
        let report = monitor.finish();

        assert!(report.duration.as_millis() >= 10);
        assert_eq!(report.memory_metrics.peak_bytes, 2048);
        assert_eq!(report.memory_metrics.current_bytes, 1536);
        assert_eq!(report.memory_metrics.allocation_count, 2);
        assert_eq!(report.memory_metrics.deallocation_count, 1);
        assert_eq!(report.memory_metrics.potential_leaks, 1);
    }

    #[test]
    fn test_real_time_memory_profiler() {
        let mut profiler = RealTimeMemoryProfiler::new(50);

        // Record memory samples
        profiler.sample_memory(1000);
        std::thread::sleep(std::time::Duration::from_millis(10));
        profiler.sample_memory(2000);
        std::thread::sleep(std::time::Duration::from_millis(10));
        profiler.sample_memory(1500);

        // Record allocations/deallocations
        profiler.record_allocation(1000);
        profiler.record_allocation(500);
        profiler.record_deallocation(200);

        let metrics = profiler.generate_metrics();
        assert_eq!(metrics.current_bytes, 1500);
        assert_eq!(metrics.peak_bytes, 2000);
        assert!(metrics.average_bytes > 0);
        assert_eq!(metrics.allocation_count, 2);
        assert_eq!(metrics.deallocation_count, 1);
        assert_eq!(metrics.potential_leaks, 1);

        // Test timeline export
        let timeline = profiler.export_timeline();
        assert_eq!(timeline.len(), 3);
        assert_eq!(timeline[0].1, 1000);
        assert_eq!(timeline[1].1, 2000);
        assert_eq!(timeline[2].1, 1500);
    }

    #[test]
    fn test_memory_health_analysis() {
        let mut profiler = RealTimeMemoryProfiler::new(100);

        // Simulate high memory growth
        profiler.sample_memory(100_000_000);
        std::thread::sleep(std::time::Duration::from_millis(50));
        profiler.sample_memory(200_000_000);

        // Simulate many unmatched allocations
        for _ in 0..1500 {
            profiler.record_allocation(1024);
        }

        let warnings = profiler.analyze_memory_health();
        assert!(!warnings.is_empty());

        // Should warn about high growth rate and potential leaks
        let has_growth_warning = warnings.iter().any(|w| w.contains("growth rate"));
        let has_leak_warning = warnings.iter().any(|w| w.contains("leak"));

        assert!(has_growth_warning);
        assert!(has_leak_warning);
    }

    #[test]
    fn test_memory_metrics_calculation() {
        let mut profiler = RealTimeMemoryProfiler::new(100);

        // Create a clear growth pattern
        profiler.sample_memory(1000);
        std::thread::sleep(std::time::Duration::from_millis(100));
        profiler.sample_memory(2000);
        std::thread::sleep(std::time::Duration::from_millis(100));
        profiler.sample_memory(3000);

        let metrics = profiler.generate_metrics();

        // Should have positive growth rate
        assert!(metrics.growth_rate > 0.0);

        // Average should be around 2000
        assert!(metrics.average_bytes >= 1500 && metrics.average_bytes <= 2500);

        // Peak should be 3000
        assert_eq!(metrics.peak_bytes, 3000);

        // Current should be 3000
        assert_eq!(metrics.current_bytes, 3000);
    }

    #[test]
    fn test_simd_operations() {
        use crate::performance::simd_ops::*;

        let a = vec![1.0, 2.0, 3.0];
        let b = vec![4.0, 5.0, 6.0];

        // Test vector addition
        let sum = simd_vector_add(&a, &b);
        assert_eq!(sum, vec![5.0, 7.0, 9.0]);

        // Test dot product
        let dot = simd_dot_product(&a, &b);
        assert_eq!(dot, 32.0); // 1*4 + 2*5 + 3*6

        // Test cosine similarity
        let similarity = simd_cosine_similarity(&a, &b);
        assert!((similarity - 0.9746318461970762).abs() < 1e-10); // Known cosine similarity

        // Test euclidean distance
        let distance = simd_euclidean_distance(&a, &b);
        assert!((distance - 5.196152422706632).abs() < 1e-10); // sqrt((4-1)^2 + (5-2)^2 + (6-3)^2)

        // Test vector normalization
        let mut vector = vec![3.0, 4.0, 0.0];
        simd_normalize(&mut vector);
        let norm_after =
            (vector[0] * vector[0] + vector[1] * vector[1] + vector[2] * vector[2]).sqrt();
        assert!((norm_after - 1.0).abs() < 1e-10);

        // Test batch centrality update
        let mut centralities = vec![1.0, 2.0, 3.0];
        let contributions = vec![0.5, 1.0, 1.5];
        let weights = vec![2.0, 2.0, 2.0];
        simd_batch_centrality_update(&mut centralities, &contributions, &weights);
        assert_eq!(centralities, vec![2.0, 4.0, 6.0]); // 1+0.5*2, 2+1*2, 3+1.5*2
    }

    #[test]
    fn test_sparse_matvec() {
        use crate::performance::simd_ops::*;

        // Create a simple 3x3 sparse matrix in CSR format:
        // [1 0 2]
        // [0 3 0]
        // [1 0 4]
        let row_ptr = vec![0, 2, 3, 5];
        let col_idx = vec![0, 2, 1, 0, 2];
        let values = vec![1.0, 2.0, 3.0, 1.0, 4.0];
        let x = vec![1.0, 1.0, 1.0];
        let mut y = vec![0.0; 3];

        simd_sparse_matvec(&row_ptr, &col_idx, &values, &x, &mut y);

        // Expected: [1*1 + 2*1, 3*1, 1*1 + 4*1] = [3, 3, 5]
        assert_eq!(y, vec![3.0, 3.0, 5.0]);
    }

    #[test]
    fn test_batch_degree_computation() {
        use crate::performance::simd_ops::*;

        // Row pointers for nodes with degrees [2, 1, 2]
        let row_ptr = vec![0, 2, 3, 5];
        let mut degrees = vec![0; 3];

        simd_batch_degree_computation(&row_ptr, &mut degrees);

        assert_eq!(degrees, vec![2, 1, 2]);
    }

    #[test]
    fn test_lazy_graph_metric() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;

        // Test basic lazy evaluation
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();

        let lazy_metric = LazyGraphMetric::new(move || {
            counter_clone.fetch_add(1, Ordering::Relaxed);
            Ok(42i32)
        });

        // Initially not computed
        assert!(!lazy_metric.is_computed());
        assert_eq!(counter.load(Ordering::Relaxed), 0);

        // First access computes the value
        let result1 = lazy_metric.get().unwrap();
        assert_eq!(*result1, 42);
        assert!(lazy_metric.is_computed());
        assert_eq!(counter.load(Ordering::Relaxed), 1);

        // Second access returns cached value
        let result2 = lazy_metric.get().unwrap();
        assert_eq!(*result2, 42);
        assert_eq!(counter.load(Ordering::Relaxed), 1); // Not incremented again

        // try_get should return the cached value
        assert!(lazy_metric.try_get().is_some());
    }

    #[test]
    fn test_lazy_graph_metric_error() {
        let lazy_metric: LazyGraphMetric<String> =
            LazyGraphMetric::new(|| Err(GraphError::AlgorithmError("Test error".to_string())));

        // Should propagate the error
        let result = lazy_metric.get();
        assert!(result.is_err());

        // Subsequent calls should return the same error
        let result2 = lazy_metric.get();
        assert!(result2.is_err());
    }

    #[test]
    fn test_lazy_graph_metric_thread_safety() {
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::sync::Arc;
        use std::thread;

        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = counter.clone();

        let lazy_metric = Arc::new(LazyGraphMetric::new(move || {
            counter_clone.fetch_add(1, Ordering::Relaxed);
            std::thread::sleep(std::time::Duration::from_millis(10)); // Simulate work
            Ok(100i32)
        }));

        // Spawn multiple threads that try to access the value
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let metric = lazy_metric.clone();
                thread::spawn(move || *metric.get().unwrap())
            })
            .collect();

        // Wait for all threads and collect results
        let results: Vec<i32> = handles.into_iter().map(|h| h.join().unwrap()).collect();

        // All threads should get the same value
        assert!(results.iter().all(|&x| x == 100));

        // Computation should only happen once
        assert_eq!(counter.load(Ordering::Relaxed), 1);
    }
}