memscope_rs/lockfree/resource_integration.rs

//! Integration layer that combines memory tracking with platform resource monitoring
//!
//! This module provides a unified API for comprehensive system analysis in
//! multi-threaded environments

use super::analysis::LockfreeAnalysis;
use super::platform_resources::{PlatformResourceCollector, PlatformResourceMetrics};
use super::tracker::ThreadLocalTracker;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;

/// Comprehensive analysis combining memory and system resource data
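///
/// The whole struct (de)serializes with serde. A persistence sketch, assuming
/// `serde_json` is among the crate's dependencies (an assumption; only `serde`
/// is imported here) and `analysis` comes from a finished profiling session:
///
/// ```no_run
/// # fn persist(analysis: &memscope_rs::lockfree::resource_integration::ComprehensiveAnalysis)
/// # -> Result<(), Box<dyn std::error::Error>> {
/// // Serialize the combined memory + resource analysis to a JSON file
/// let json = serde_json::to_string_pretty(analysis)?;
/// std::fs::write("comprehensive_analysis.json", json)?;
/// # Ok(())
/// # }
/// ```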
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComprehensiveAnalysis {
    pub memory_analysis: LockfreeAnalysis,
    pub resource_timeline: Vec<PlatformResourceMetrics>,
    pub correlation_metrics: CorrelationMetrics,
    pub performance_insights: PerformanceInsights,
}

/// Correlation analysis between memory operations and system resources
///
/// Values are heuristic scores in the 0.0..=1.0 range, not true statistical
/// correlation coefficients (see the calculation methods below).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CorrelationMetrics {
    pub memory_cpu_correlation: f64,
    pub memory_gpu_correlation: f64,
    pub memory_io_correlation: f64,
    pub allocation_rate_vs_cpu_usage: f64,
    pub deallocation_rate_vs_memory_pressure: f64,
}

/// Performance insights derived from combined analysis
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceInsights {
    pub primary_bottleneck: BottleneckType,
    pub cpu_efficiency_score: f32,
    pub memory_efficiency_score: f32,
    pub io_efficiency_score: f32,
    pub recommendations: Vec<String>,
    pub thread_performance_ranking: Vec<ThreadPerformanceMetric>,
}

/// Type of performance bottleneck identified
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum BottleneckType {
    CpuBound,
    MemoryBound,
    IoBound,
    GpuBound,
    ContentionBound,
    Balanced,
}

/// Per-thread performance ranking
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThreadPerformanceMetric {
    pub thread_id: u64,
    pub thread_name: Option<String>,
    pub efficiency_score: f32,
    pub resource_usage_score: f32,
    pub allocation_efficiency: f32,
}

/// Integrated profiling session that tracks both memory and system resources
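///
/// A minimal lifecycle sketch (`no_run`: resource collection may be
/// unavailable on some platforms, and the import path assumes this module is
/// publicly reachable as `memscope_rs::lockfree::resource_integration`):
///
/// ```no_run
/// use memscope_rs::lockfree::resource_integration::IntegratedProfilingSession;
///
/// let mut session = IntegratedProfilingSession::new(std::path::Path::new("./profile_out"))?;
/// session.start_profiling()?;
/// // ... run the workload under measurement ...
/// let analysis = session.stop_profiling_and_analyze()?;
/// println!("Primary bottleneck: {:?}", analysis.performance_insights.primary_bottleneck);
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```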
pub struct IntegratedProfilingSession {
    memory_trackers: HashMap<u64, ThreadLocalTracker>,
    /// Collector created up front so construction fails early on platforms
    /// without resource monitoring; the background thread builds its own.
    #[allow(dead_code)]
    resource_collector: PlatformResourceCollector,
    resource_timeline: Arc<Mutex<Vec<PlatformResourceMetrics>>>,
    is_active: Arc<AtomicBool>,
    collection_thread: Option<thread::JoinHandle<()>>,
    start_time: Instant,
    output_directory: std::path::PathBuf,
}

impl IntegratedProfilingSession {
    /// Create a new integrated profiling session
    pub fn new(output_dir: &std::path::Path) -> Result<Self, Box<dyn std::error::Error>> {
        let resource_collector = PlatformResourceCollector::new()?;

        Ok(Self {
            memory_trackers: HashMap::new(),
            resource_collector,
            resource_timeline: Arc::new(Mutex::new(Vec::new())),
            is_active: Arc::new(AtomicBool::new(false)),
            collection_thread: None,
            start_time: Instant::now(),
            output_directory: output_dir.to_path_buf(),
        })
    }

    /// Start comprehensive profiling
    pub fn start_profiling(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        self.is_active.store(true, Ordering::SeqCst);

        // Start the resource collection thread with its own collector, since
        // the collector owned by the session cannot be moved out of `self`
        let is_active = self.is_active.clone();
        let resource_timeline = self.resource_timeline.clone();
        let mut collector = PlatformResourceCollector::new()?;

        let handle = thread::Builder::new()
            .name("resource_collector".to_string())
            .spawn(move || {
                let collection_interval = collector.get_optimal_collection_interval();

                while is_active.load(Ordering::Relaxed) {
                    let start = Instant::now();

                    match collector.collect_metrics() {
                        Ok(metrics) => {
                            // Store metrics in the shared timeline
                            if let Ok(mut timeline) = resource_timeline.lock() {
                                timeline.push(metrics);
                            }
                        }
                        Err(_e) => {
                            // Transient collection failures are tolerated:
                            // skip this sample and retry next interval
                        }
                    }

                    // Pace the loop so samples land roughly one interval apart
                    let elapsed = start.elapsed();
                    if elapsed < collection_interval {
                        thread::sleep(collection_interval - elapsed);
                    }
                }
            })?;

        self.collection_thread = Some(handle);
        Ok(())
    }

    /// Stop profiling and generate comprehensive analysis
    pub fn stop_profiling_and_analyze(
        &mut self,
    ) -> Result<ComprehensiveAnalysis, Box<dyn std::error::Error>> {
        self.is_active.store(false, Ordering::SeqCst);

        // Wait for collection thread to finish
        if let Some(handle) = self.collection_thread.take() {
            let _ = handle.join();
        }

        // Finalize all memory trackers
        for (_, mut tracker) in self.memory_trackers.drain() {
            let _ = tracker.finalize();
        }

        // Generate comprehensive analysis
        self.generate_comprehensive_analysis()
    }

    fn generate_comprehensive_analysis(
        &self,
    ) -> Result<ComprehensiveAnalysis, Box<dyn std::error::Error>> {
        // Aggregate memory analysis from all threads
        let aggregator = super::aggregator::LockfreeAggregator::new(self.output_directory.clone());
        let memory_analysis = aggregator.aggregate_all_threads()?;

        // Calculate correlations
        let correlation_metrics = self.calculate_correlations(&memory_analysis)?;

        // Generate performance insights
        let performance_insights =
            self.generate_performance_insights(&memory_analysis, &correlation_metrics)?;

        // Get resource timeline data
        let resource_timeline = self
            .resource_timeline
            .lock()
            .map_err(|_| "Failed to lock resource timeline")?
            .clone();

        Ok(ComprehensiveAnalysis {
            memory_analysis,
            resource_timeline,
            correlation_metrics,
            performance_insights,
        })
    }

    fn calculate_correlations(
        &self,
        memory_analysis: &LockfreeAnalysis,
    ) -> Result<CorrelationMetrics, Box<dyn std::error::Error>> {
        // Calculate correlation between memory operations and system resources
        let memory_cpu_correlation = self.calculate_memory_cpu_correlation(memory_analysis);
        let memory_gpu_correlation = self.calculate_memory_gpu_correlation(memory_analysis);
        let memory_io_correlation = self.calculate_memory_io_correlation(memory_analysis);
        let allocation_rate_vs_cpu_usage =
            self.calculate_allocation_cpu_correlation(memory_analysis);
        let deallocation_rate_vs_memory_pressure =
            self.calculate_deallocation_pressure_correlation(memory_analysis);

        Ok(CorrelationMetrics {
            memory_cpu_correlation,
            memory_gpu_correlation,
            memory_io_correlation,
            allocation_rate_vs_cpu_usage,
            deallocation_rate_vs_memory_pressure,
        })
    }

    fn calculate_memory_cpu_correlation(&self, _memory_analysis: &LockfreeAnalysis) -> f64 {
        let timeline = match self.resource_timeline.lock() {
            Ok(timeline) => timeline,
            Err(_) => return 0.0,
        };

        if timeline.is_empty() {
            return 0.0;
        }

        // Calculate correlation between allocation rate and CPU usage
        let avg_cpu_usage: f32 = timeline
            .iter()
            .map(|r| r.cpu_metrics.overall_usage_percent)
            .sum::<f32>()
            / timeline.len() as f32;

        // Heuristic proxy based on average CPU usage. A real implementation
        // would compute a proper statistical correlation between the two
        // sample series; see the `pearson_correlation` sketch below.
        match avg_cpu_usage {
            usage if usage > 80.0 => 0.8,
            usage if usage > 60.0 => 0.6,
            usage if usage > 40.0 => 0.4,
            _ => 0.2,
        }
    }
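
    // A minimal sketch of the "proper statistical correlation" the heuristic
    // above stands in for: the Pearson coefficient of two equal-length sample
    // series (e.g. per-interval allocation rates vs. CPU usage samples). How
    // the two series would be aligned per collection interval is an
    // assumption; this helper is illustrative and not yet wired into the
    // analysis.
    #[allow(dead_code)]
    fn pearson_correlation(xs: &[f64], ys: &[f64]) -> f64 {
        let n = xs.len().min(ys.len());
        if n < 2 {
            return 0.0;
        }
        let mean_x = xs[..n].iter().sum::<f64>() / n as f64;
        let mean_y = ys[..n].iter().sum::<f64>() / n as f64;
        let (mut cov, mut var_x, mut var_y) = (0.0, 0.0, 0.0);
        for i in 0..n {
            let dx = xs[i] - mean_x;
            let dy = ys[i] - mean_y;
            cov += dx * dy;
            var_x += dx * dx;
            var_y += dy * dy;
        }
        if var_x == 0.0 || var_y == 0.0 {
            return 0.0; // constant series carry no correlation signal
        }
        cov / (var_x.sqrt() * var_y.sqrt())
    }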

    fn calculate_memory_gpu_correlation(&self, _memory_analysis: &LockfreeAnalysis) -> f64 {
        let timeline = match self.resource_timeline.lock() {
            Ok(timeline) => timeline,
            Err(_) => return 0.0,
        };

        let gpu_samples = timeline.iter().filter(|r| r.gpu_metrics.is_some()).count();

        if gpu_samples == 0 {
            return 0.0;
        }

        // Calculate correlation with GPU usage
        let avg_gpu_usage: f32 = timeline
            .iter()
            .filter_map(|r| r.gpu_metrics.as_ref())
            .map(|g| g.compute_usage_percent)
            .sum::<f32>()
            / gpu_samples as f32;

        if avg_gpu_usage > 50.0 {
            0.5
        } else {
            0.1
        }
    }

    fn calculate_memory_io_correlation(&self, _memory_analysis: &LockfreeAnalysis) -> f64 {
        let timeline = match self.resource_timeline.lock() {
            Ok(timeline) => timeline,
            Err(_) => return 0.0,
        };

        if timeline.is_empty() {
            return 0.0;
        }

        // Calculate correlation with I/O activity
        let avg_io_activity: u64 = timeline
            .iter()
            .map(|r| r.io_metrics.disk_read_bytes_per_sec + r.io_metrics.disk_write_bytes_per_sec)
            .sum::<u64>()
            / timeline.len() as u64;

        match avg_io_activity {
            activity if activity > 100_000_000 => 0.7, // > 100MB/s
            activity if activity > 10_000_000 => 0.4,  // > 10MB/s
            _ => 0.1,
        }
    }

    fn calculate_allocation_cpu_correlation(&self, memory_analysis: &LockfreeAnalysis) -> f64 {
        // Correlation between allocation frequency and CPU usage
        let allocation_count = memory_analysis.summary.total_allocations;
        // Use fractional seconds so short sessions are not truncated to zero
        let duration_secs = self.start_time.elapsed().as_secs_f64();
        let allocation_rate = allocation_count as f64 / duration_secs.max(1.0);

        // Higher allocation rates typically correlate with higher CPU usage
        match allocation_rate {
            rate if rate > 10000.0 => 0.8,
            rate if rate > 1000.0 => 0.6,
            rate if rate > 100.0 => 0.4,
            _ => 0.2,
        }
    }

    fn calculate_deallocation_pressure_correlation(
        &self,
        memory_analysis: &LockfreeAnalysis,
    ) -> f64 {
        // Correlation between deallocation rate and memory pressure
        let deallocation_count = memory_analysis.summary.total_deallocations;
        let peak_memory = memory_analysis.summary.peak_memory_usage;

        // Higher memory pressure typically leads to more frequent deallocations
        if peak_memory > 1_000_000_000 && deallocation_count > 1000 {
            // > 1GB peak
            0.7
        } else if peak_memory > 100_000_000 && deallocation_count > 100 {
            // > 100MB peak
            0.4
        } else {
            0.1
        }
    }

    fn generate_performance_insights(
        &self,
        memory_analysis: &LockfreeAnalysis,
        correlations: &CorrelationMetrics,
    ) -> Result<PerformanceInsights, Box<dyn std::error::Error>> {
        // Identify primary bottleneck
        let primary_bottleneck = self.identify_primary_bottleneck(correlations);

        // Calculate efficiency scores
        let cpu_efficiency_score = self.calculate_cpu_efficiency_score();
        let memory_efficiency_score = self.calculate_memory_efficiency_score(memory_analysis);
        let io_efficiency_score = self.calculate_io_efficiency_score();

        // Generate recommendations
        let recommendations = self.generate_recommendations(&primary_bottleneck, memory_analysis);

        // Rank thread performance
        let thread_performance_ranking = self.rank_thread_performance(memory_analysis);

        Ok(PerformanceInsights {
            primary_bottleneck,
            cpu_efficiency_score,
            memory_efficiency_score,
            io_efficiency_score,
            recommendations,
            thread_performance_ranking,
        })
    }

    fn identify_primary_bottleneck(&self, correlations: &CorrelationMetrics) -> BottleneckType {
        // Determine bottleneck based on correlation strengths.
        // Note: `ContentionBound` is not produced by this heuristic; it would
        // require contention data beyond these correlation scores.
        let cpu_score =
            correlations.memory_cpu_correlation + correlations.allocation_rate_vs_cpu_usage;
        let memory_score = correlations.deallocation_rate_vs_memory_pressure;
        let io_score = correlations.memory_io_correlation;
        let gpu_score = correlations.memory_gpu_correlation;

        if cpu_score > 1.0 {
            BottleneckType::CpuBound
        } else if memory_score > 0.5 {
            BottleneckType::MemoryBound
        } else if io_score > 0.5 {
            BottleneckType::IoBound
        } else if gpu_score > 0.4 {
            BottleneckType::GpuBound
        } else {
            BottleneckType::Balanced
        }
    }

    fn calculate_cpu_efficiency_score(&self) -> f32 {
        let timeline = match self.resource_timeline.lock() {
            Ok(timeline) => timeline,
            Err(_) => return 0.0,
        };

        if timeline.is_empty() {
            return 0.0;
        }

        let avg_cpu_usage: f32 = timeline
            .iter()
            .map(|r| r.cpu_metrics.overall_usage_percent)
            .sum::<f32>()
            / timeline.len() as f32;

        // Efficiency is good when CPU usage is moderate (not too low, not maxed out)
        match avg_cpu_usage {
            usage if (70.0..=85.0).contains(&usage) => 90.0,
            usage if (50.0..=95.0).contains(&usage) => 75.0,
            usage if usage >= 30.0 => 60.0,
            _ => 40.0,
        }
    }

    fn calculate_memory_efficiency_score(&self, memory_analysis: &LockfreeAnalysis) -> f32 {
        let allocation_count = memory_analysis.summary.total_allocations;
        let deallocation_count = memory_analysis.summary.total_deallocations;

        if allocation_count == 0 {
            return 0.0;
        }

        // Good efficiency when allocations and deallocations are balanced
        let balance_ratio = deallocation_count as f32 / allocation_count as f32;
        match balance_ratio {
            ratio if (0.9..=1.1).contains(&ratio) => 95.0,
            ratio if (0.8..=1.2).contains(&ratio) => 85.0,
            ratio if (0.7..=1.3).contains(&ratio) => 70.0,
            _ => 50.0,
        }
    }

    fn calculate_io_efficiency_score(&self) -> f32 {
        let timeline = match self.resource_timeline.lock() {
            Ok(timeline) => timeline,
            Err(_) => return 0.0,
        };

        if timeline.is_empty() {
            return 0.0;
        }

        // Calculate average I/O throughput
        let avg_io_throughput: u64 = timeline
            .iter()
            .map(|r| r.io_metrics.disk_read_bytes_per_sec + r.io_metrics.disk_write_bytes_per_sec)
            .sum::<u64>()
            / timeline.len() as u64;

        // Efficiency based on consistent I/O patterns (not too bursty)
        match avg_io_throughput {
            throughput if throughput > 0 && throughput < 1_000_000_000 => 80.0, // < 1GB/s
            throughput if throughput > 0 => 60.0,
            _ => 40.0,
        }
    }

    fn generate_recommendations(
        &self,
        bottleneck: &BottleneckType,
        memory_analysis: &LockfreeAnalysis,
    ) -> Vec<String> {
        let mut recommendations = Vec::new();

        match bottleneck {
            BottleneckType::CpuBound => {
                recommendations.push(
                    "Consider reducing CPU-intensive operations in memory allocation paths"
                        .to_string(),
                );
                recommendations
                    .push("Optimize hot allocation patterns identified in analysis".to_string());
            }
            BottleneckType::MemoryBound => {
                recommendations
                    .push("Reduce memory fragmentation by using memory pools".to_string());
                recommendations.push(
                    "Consider implementing object recycling for frequently allocated types"
                        .to_string(),
                );
            }
            BottleneckType::IoBound => {
                recommendations
                    .push("Reduce I/O operations during memory-intensive phases".to_string());
                recommendations.push(
                    "Consider async I/O patterns to avoid blocking memory operations".to_string(),
                );
            }
            BottleneckType::GpuBound => {
                recommendations
                    .push("Optimize GPU memory transfers and synchronization".to_string());
                recommendations.push("Consider reducing GPU-CPU memory copying".to_string());
            }
            BottleneckType::ContentionBound => {
                recommendations
                    .push("Reduce lock contention in memory allocation paths".to_string());
                recommendations.push("Consider using lock-free data structures".to_string());
            }
            BottleneckType::Balanced => {
                recommendations.push("System performance appears well-balanced".to_string());
                recommendations
                    .push("Monitor for performance regression in future updates".to_string());
            }
        }

        // Add memory-specific recommendations. Peak usage divided by allocation
        // count is only a rough proxy for average allocation size, but it is
        // enough to separate "few large" from "many small" allocation patterns.
        let avg_allocation_size = if memory_analysis.summary.total_allocations > 0 {
            memory_analysis.summary.peak_memory_usage
                / memory_analysis.summary.total_allocations as usize
        } else {
            0
        };

        if avg_allocation_size > 1024 * 1024 {
            // > 1MB average
            recommendations.push(
                "Large average allocation size detected - consider memory streaming".to_string(),
            );
        } else if avg_allocation_size < 64 {
            // < 64 bytes average
            recommendations
                .push("Many small allocations detected - consider object pooling".to_string());
        }

        recommendations
    }

    fn rank_thread_performance(
        &self,
        memory_analysis: &LockfreeAnalysis,
    ) -> Vec<ThreadPerformanceMetric> {
        let mut rankings = Vec::new();

        // Get thread statistics from memory analysis
        for (thread_id, thread_stats) in &memory_analysis.thread_stats {
            let allocation_efficiency = if thread_stats.total_allocations > 0 {
                (thread_stats.total_deallocations as f32 / thread_stats.total_allocations as f32
                    * 100.0)
                    .min(100.0)
            } else {
                0.0
            };

            let resource_usage_score = self.calculate_thread_resource_score(*thread_id);

            // Overall efficiency score combining allocation patterns and resource usage
            let efficiency_score = (allocation_efficiency + resource_usage_score) / 2.0;

            // Try to get thread name from resource timeline
            let thread_name = self.get_thread_name(*thread_id);

            rankings.push(ThreadPerformanceMetric {
                thread_id: *thread_id,
                thread_name,
                efficiency_score,
                resource_usage_score,
                allocation_efficiency,
            });
        }

        // Sort by efficiency score (highest first)
        rankings.sort_by(|a, b| {
            b.efficiency_score
                .partial_cmp(&a.efficiency_score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        rankings
    }

    fn get_thread_name(&self, thread_id: u64) -> Option<String> {
        let timeline = match self.resource_timeline.lock() {
            Ok(timeline) => timeline,
            Err(_) => return None,
        };

        // Look for the thread name in any of the resource snapshots
        for snapshot in timeline.iter() {
            if let Some(thread_metrics) = snapshot.thread_metrics.get(&thread_id) {
                if let Some(ref name) = thread_metrics.thread_name {
                    if !name.trim().is_empty() {
                        return Some(name.clone());
                    }
                }
            }
        }

        None
    }

    fn calculate_thread_resource_score(&self, thread_id: u64) -> f32 {
        // Calculate resource usage score for a specific thread
        let timeline = match self.resource_timeline.lock() {
            Ok(timeline) => timeline,
            Err(_) => return 50.0,
        };

        let mut total_cpu_usage = 0.0f32;
        let mut sample_count = 0;

        for resource_snapshot in timeline.iter() {
            if let Some(thread_metrics) = resource_snapshot.thread_metrics.get(&thread_id) {
                total_cpu_usage += thread_metrics.cpu_usage_percent;
                sample_count += 1;
            }
        }

        if sample_count > 0 {
            let avg_cpu_usage = total_cpu_usage / sample_count as f32;
            // Score based on moderate CPU usage (not idle, not maxed out)
            match avg_cpu_usage {
                usage if (30.0..=70.0).contains(&usage) => 90.0,
                usage if (20.0..=80.0).contains(&usage) => 75.0,
                usage if (10.0..=90.0).contains(&usage) => 60.0,
                _ => 40.0,
            }
        } else {
            50.0 // Default score when no data available
        }
    }
}

/// Convenience function for quick comprehensive profiling
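///
/// A minimal usage sketch (`no_run`: resource collection may be unavailable on
/// some platforms, and the import path assumes this module is publicly
/// reachable as `memscope_rs::lockfree::resource_integration`):
///
/// ```no_run
/// use memscope_rs::lockfree::resource_integration::comprehensive_profile_execution;
///
/// let (len, analysis) = comprehensive_profile_execution(
///     std::path::Path::new("./profile_out"), // illustrative output directory
///     || (0..1_000).map(|i| vec![i; 100]).collect::<Vec<_>>().len(),
/// )?;
/// assert_eq!(len, 1_000);
/// println!("{} resource samples collected", analysis.resource_timeline.len());
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```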
pub fn comprehensive_profile_execution<F, R>(
    output_dir: &std::path::Path,
    execution_fn: F,
) -> Result<(R, ComprehensiveAnalysis), Box<dyn std::error::Error>>
where
    F: FnOnce() -> R,
{
    let mut session = IntegratedProfilingSession::new(output_dir)?;
    session.start_profiling()?;

    let result = execution_fn();

    let analysis = session.stop_profiling_and_analyze()?;

    Ok((result, analysis))
}

#[cfg(test)]
mod tests {
    use super::*;
    // Test module for resource integration

    #[test]
    fn test_integrated_profiling_session_creation() {
        let temp_dir = std::env::temp_dir().join("memscope_test");
        let result = IntegratedProfilingSession::new(&temp_dir);

        match result {
            Ok(_session) => {
                // Session created successfully
            }
            Err(e) => {
                // May fail on platforms without resource monitoring support
                println!("Session creation failed: {}", e);
            }
        }
    }

    #[test]
    fn test_comprehensive_profiling_function() {
        let temp_dir = std::env::temp_dir().join("memscope_comprehensive_test");

        let result = comprehensive_profile_execution(&temp_dir, || {
            // Simulate some work
            let mut data = Vec::new();
            for i in 0..1000 {
                data.push(vec![i; 100]);
            }
            data.len()
        });

        match result {
            Ok((work_result, _analysis)) => {
                assert_eq!(work_result, 1000);
            }
            Err(e) => {
                // May fail on platforms without full support
                println!("Comprehensive profiling failed: {}", e);
            }
        }
    }

    #[test]
    fn test_bottleneck_identification() {
        let correlations = CorrelationMetrics {
            memory_cpu_correlation: 0.8,
            memory_gpu_correlation: 0.1,
            memory_io_correlation: 0.2,
            allocation_rate_vs_cpu_usage: 0.7,
            deallocation_rate_vs_memory_pressure: 0.3,
        };

        let temp_dir = std::env::temp_dir().join("memscope_bottleneck_test");
        if let Ok(session) = IntegratedProfilingSession::new(&temp_dir) {
            let bottleneck = session.identify_primary_bottleneck(&correlations);
            // cpu_score = 0.8 + 0.7 > 1.0, so this must classify as CPU-bound
            assert!(matches!(bottleneck, BottleneckType::CpuBound));
        }
    }
}