memscope_rs/export/
parallel_shard_processor.rs

1//! Parallel shard processor - high performance parallel JSON serialization
2//!
3//! This module implements parallel shard processing functionality, dividing large allocation data into shards for parallel processing,
4//! significantly improving JSON serialization performance, especially on multi-core systems.
5
6use crate::core::types::{AllocationInfo, TrackingError, TrackingResult};
7use crate::export::data_localizer::LocalizedExportData;
8use rayon::prelude::*;
9use serde_json;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::time::Instant;
12
13/// Parallel shard processor configuration
14#[derive(Debug, Clone)]
15pub struct ParallelShardConfig {
16    /// Size of each shard (allocation count)
17    pub shard_size: usize,
18    /// Parallel processing threshold (only enable parallel processing if the number of allocations exceeds this value)
19    pub parallel_threshold: usize,
20    /// Maximum number of threads (None means use system default)
21    pub max_threads: Option<usize>,
22    /// Whether to enable performance monitoring
23    pub enable_monitoring: bool,
24    /// Estimated JSON size per allocation (for pre-allocation)
25    pub estimated_json_size_per_allocation: usize,
26}
27
28impl Default for ParallelShardConfig {
29    fn default() -> Self {
30        Self {
31            shard_size: 1000,                        // Each shard contains 1000 allocations
32            parallel_threshold: 2000, // Only enable parallel processing if the number of allocations exceeds 2000
33            max_threads: None,        // Use system default number of threads
34            enable_monitoring: true,  // Enable performance monitoring
35            estimated_json_size_per_allocation: 200, // Estimated JSON size per allocation (for pre-allocation)
36        }
37    }
38}
39
40/// Processed shard data
41#[derive(Debug, Clone)]
42pub struct ProcessedShard {
43    /// Serialized JSON data
44    pub data: Vec<u8>,
45    /// Number of allocations in the shard
46    pub allocation_count: usize,
47    /// Shard index
48    pub shard_index: usize,
49    /// Processing time (milliseconds)
50    pub processing_time_ms: u64,
51}
52
53/// Parallel processing statistics
54#[derive(Debug, Clone)]
55pub struct ParallelProcessingStats {
56    /// Total number of allocations
57    pub total_allocations: usize,
58    /// Number of shards
59    pub shard_count: usize,
60    /// Number of threads used
61    pub threads_used: usize,
62    /// Total processing time (milliseconds)
63    pub total_processing_time_ms: u64,
64    /// Average processing time per shard (milliseconds)
65    pub avg_shard_processing_time_ms: f64,
66    /// Parallel efficiency (acceleration ratio compared to single thread)
67    pub parallel_efficiency: f64,
68    /// Throughput (allocations per second)
69    pub throughput_allocations_per_sec: f64,
70    /// Whether parallel processing was used
71    pub used_parallel_processing: bool,
72    /// Total output size (bytes)
73    pub total_output_size_bytes: usize,
74}
75
76/// Parallel shard processor
77pub struct ParallelShardProcessor {
78    /// Configuration
79    config: ParallelShardConfig,
80    /// Processing counter (for monitoring)
81    processed_count: AtomicUsize,
82}
83
84impl ParallelShardProcessor {
85    /// Create a new parallel shard processor
86    pub fn new(config: ParallelShardConfig) -> Self {
87        // If a maximum thread count is specified, set the rayon thread pool
88        if let Some(max_threads) = config.max_threads {
89            rayon::ThreadPoolBuilder::new()
90                .num_threads(max_threads)
91                .build_global()
92                .unwrap_or_else(|e| {
93                    eprintln!(
94                        "⚠️ Failed to set thread pool size to {}: {}",
95                        max_threads, e
96                    );
97                });
98        }
99
100        Self {
101            config,
102            processed_count: AtomicUsize::new(0),
103        }
104    }
105
106    /// Process allocations in parallel
107    pub fn process_allocations_parallel(
108        &self,
109        data: &LocalizedExportData,
110    ) -> TrackingResult<(Vec<ProcessedShard>, ParallelProcessingStats)> {
111        let start_time = Instant::now();
112        let allocations = &data.allocations;
113
114        println!(
115            "🔄 Starting parallel shard processing for {} allocations...",
116            allocations.len()
117        );
118
119        // Determine whether to use parallel processing
120        let use_parallel = allocations.len() >= self.config.parallel_threshold;
121        let actual_threads = if use_parallel {
122            rayon::current_num_threads()
123        } else {
124            1
125        };
126
127        println!(
128            "   Parallel mode: {}, threads: {}, shard size: {}",
129            if use_parallel { "enabled" } else { "disabled" },
130            actual_threads,
131            self.config.shard_size
132        );
133
134        // Reset counter for monitoring
135        self.processed_count.store(0, Ordering::Relaxed);
136
137        // Split data into shards
138        let shards: Vec<&[AllocationInfo]> = allocations.chunks(self.config.shard_size).collect();
139
140        println!("   Shard count: {}", shards.len());
141
142        // Parallel or serial processing of shards
143        let processed_shards: TrackingResult<Vec<ProcessedShard>> = if use_parallel {
144            shards
145                .into_par_iter()
146                .enumerate()
147                .map(|(index, shard)| self.process_shard_optimized(shard, index))
148                .collect()
149        } else {
150            shards
151                .into_iter()
152                .enumerate()
153                .map(|(index, shard)| self.process_shard_optimized(shard, index))
154                .collect()
155        };
156
157        let processed_shards = processed_shards?;
158        let total_time = start_time.elapsed();
159
160        // Calculate statistics
161        let stats = self.calculate_processing_stats(
162            &processed_shards,
163            allocations.len(),
164            actual_threads,
165            total_time.as_millis() as u64,
166            use_parallel,
167        );
168
169        // Print performance statistics
170        self.print_performance_stats(&stats);
171
172        Ok((processed_shards, stats))
173    }
174
175    /// Optimized shard processing method
176    fn process_shard_optimized(
177        &self,
178        shard: &[AllocationInfo],
179        shard_index: usize,
180    ) -> TrackingResult<ProcessedShard> {
181        let shard_start = Instant::now();
182
183        // Estimate output size and preallocate buffers
184        let estimated_size = shard.len() * self.config.estimated_json_size_per_allocation;
185        let mut output_buffer = Vec::with_capacity(estimated_size);
186
187        // Use serde_json's efficient API to serialize directly to byte vector
188        // This is more reliable than manual formatting and performs well
189        serde_json::to_writer(&mut output_buffer, shard).map_err(|e| {
190            TrackingError::ExportError(format!("Shard {} serialization failed: {}", shard_index, e))
191        })?;
192
193        let processing_time = shard_start.elapsed();
194
195        // Update processed counter
196        self.processed_count
197            .fetch_add(shard.len(), Ordering::Relaxed);
198
199        // If monitoring is enabled, print progress
200        if self.config.enable_monitoring && shard_index % 10 == 0 {
201            let _processed = self.processed_count.load(Ordering::Relaxed);
202            println!(
203                "   Shard {} completed: {} allocations, {} bytes, {:?}",
204                shard_index,
205                shard.len(),
206                output_buffer.len(),
207                processing_time
208            );
209        }
210
211        Ok(ProcessedShard {
212            data: output_buffer,
213            allocation_count: shard.len(),
214            shard_index,
215            processing_time_ms: processing_time.as_millis() as u64,
216        })
217    }
218
219    /// Calculate processing statistics
220    fn calculate_processing_stats(
221        &self,
222        shards: &[ProcessedShard],
223        total_allocations: usize,
224        threads_used: usize,
225        total_time_ms: u64,
226        used_parallel: bool,
227    ) -> ParallelProcessingStats {
228        let total_output_size: usize = shards.iter().map(|s| s.data.len()).sum();
229        let avg_shard_time: f64 = if !shards.is_empty() {
230            shards
231                .iter()
232                .map(|s| s.processing_time_ms as f64)
233                .sum::<f64>()
234                / shards.len() as f64
235        } else {
236            0.0
237        };
238
239        let throughput = if total_time_ms > 0 {
240            (total_allocations as f64 * 1000.0) / total_time_ms as f64
241        } else {
242            0.0
243        };
244
245        // Estimate parallel efficiency (simplified calculation)
246        let parallel_efficiency = if used_parallel && threads_used > 1 {
247            // In an ideal scenario, N threads should provide close to N times the performance
248            // Actual efficiency = Actual acceleration ratio / Theoretical acceleration ratio
249            let theoretical_speedup = threads_used as f64;
250            let estimated_sequential_time = avg_shard_time * shards.len() as f64;
251            let actual_speedup = if total_time_ms > 0 {
252                estimated_sequential_time / total_time_ms as f64
253            } else {
254                1.0
255            };
256            (actual_speedup / theoretical_speedup).min(1.0)
257        } else {
258            1.0 // Single thread efficiency is 100%
259        };
260
261        ParallelProcessingStats {
262            total_allocations,
263            shard_count: shards.len(),
264            threads_used,
265            total_processing_time_ms: total_time_ms,
266            avg_shard_processing_time_ms: avg_shard_time,
267            parallel_efficiency,
268            throughput_allocations_per_sec: throughput,
269            used_parallel_processing: used_parallel,
270            total_output_size_bytes: total_output_size,
271        }
272    }
273
274    /// Print performance statistics
275    fn print_performance_stats(&self, stats: &ParallelProcessingStats) {
276        println!("✅ Parallel shard processing completed:");
277        println!("   Total allocations: {}", stats.total_allocations);
278        println!("   Shard count: {}", stats.shard_count);
279        println!("   Threads used: {}", stats.threads_used);
280        println!("   Total time: {}ms", stats.total_processing_time_ms);
281        println!(
282            "   Average shard time: {:.2}ms",
283            stats.avg_shard_processing_time_ms
284        );
285        println!(
286            "   Throughput: {:.0} allocations/sec",
287            stats.throughput_allocations_per_sec
288        );
289        println!(
290            "   Output size: {:.2} MB",
291            stats.total_output_size_bytes as f64 / 1024.0 / 1024.0
292        );
293
294        if stats.used_parallel_processing {
295            println!(
296                "   Parallel efficiency: {:.1}%",
297                stats.parallel_efficiency * 100.0
298            );
299            let speedup = stats.parallel_efficiency * stats.threads_used as f64;
300            println!("   Actual speedup: {:.2}x", speedup);
301        }
302    }
303
304    /// Get current configuration
305    pub fn get_config(&self) -> &ParallelShardConfig {
306        &self.config
307    }
308
309    /// Update configuration
310    pub fn update_config(&mut self, config: ParallelShardConfig) {
311        self.config = config;
312    }
313
314    /// Get processed count
315    pub fn get_processed_count(&self) -> usize {
316        self.processed_count.load(Ordering::Relaxed)
317    }
318}
319
320impl Default for ParallelShardProcessor {
321    fn default() -> Self {
322        Self::new(ParallelShardConfig::default())
323    }
324}
325
326/// Convenience function: Fast parallel processing of allocation data
327pub fn process_allocations_fast(
328    data: &LocalizedExportData,
329) -> TrackingResult<(Vec<ProcessedShard>, ParallelProcessingStats)> {
330    let processor = ParallelShardProcessor::default();
331    processor.process_allocations_parallel(data)
332}
333
334/// Convenience function: Parallel processing with custom configuration
335pub fn process_allocations_with_config(
336    data: &LocalizedExportData,
337    config: ParallelShardConfig,
338) -> TrackingResult<(Vec<ProcessedShard>, ParallelProcessingStats)> {
339    let processor = ParallelShardProcessor::new(config);
340    processor.process_allocations_parallel(data)
341}
342
343#[cfg(test)]
344mod tests {
345    use super::*;
346    use crate::analysis::unsafe_ffi_tracker::UnsafeFFIStats;
347    use crate::core::types::{MemoryStats, ScopeInfo};
348    use std::time::Instant;
349
350    fn create_test_data(allocation_count: usize) -> LocalizedExportData {
351        let mut allocations = Vec::new();
352        for i in 0..allocation_count {
353            allocations.push(AllocationInfo {
354                ptr: 0x1000 + i,
355                size: 64 + (i % 100),
356                type_name: Some(format!("TestType{}", i % 10)),
357                var_name: Some(format!("var_{}", i)),
358                scope_name: Some(format!("scope_{}", i % 5)),
359                timestamp_alloc: 1000000 + i as u64,
360                timestamp_dealloc: None,
361                thread_id: format!("test_thread_{}", i % 3),
362                borrow_count: 0,
363                stack_trace: None,
364                is_leaked: false,
365                lifetime_ms: None,
366                smart_pointer_info: None,
367                memory_layout: None,
368                generic_info: None,
369                dynamic_type_info: None,
370                runtime_state: None,
371                stack_allocation: None,
372                temporary_object: None,
373                fragmentation_analysis: None,
374                generic_instantiation: None,
375                type_relationships: None,
376                type_usage: None,
377                function_call_tracking: None,
378                lifecycle_tracking: None,
379                access_tracking: None,
380            });
381        }
382
383        LocalizedExportData {
384            allocations,
385            enhanced_allocations: Vec::new(),
386            stats: MemoryStats::default(),
387            ffi_stats: UnsafeFFIStats::default(),
388            scope_info: Vec::<ScopeInfo>::new(),
389            timestamp: Instant::now(),
390        }
391    }
392
393    #[test]
394    fn test_parallel_shard_processor_creation() {
395        let config = ParallelShardConfig::default();
396        let processor = ParallelShardProcessor::new(config);
397        assert_eq!(processor.get_config().shard_size, 1000);
398    }
399
400    #[test]
401    fn test_small_dataset_sequential_processing() {
402        let data = create_test_data(100); // Small dataset, should use sequential processing
403        let processor = ParallelShardProcessor::default();
404
405        let result = processor.process_allocations_parallel(&data);
406        assert!(result.is_ok());
407
408        let (shards, stats) = result.unwrap();
409        assert_eq!(stats.total_allocations, 100);
410        assert!(!stats.used_parallel_processing); // Should use sequential processing
411        assert_eq!(shards.len(), 1); // Only one shard
412    }
413
414    #[test]
415    fn test_large_dataset_parallel_processing() {
416        let data = create_test_data(5000); // Large dataset, should use parallel processing
417        let processor = ParallelShardProcessor::default();
418
419        let result = processor.process_allocations_parallel(&data);
420        assert!(result.is_ok());
421
422        let (shards, stats) = result.unwrap();
423        assert_eq!(stats.total_allocations, 5000);
424        assert!(stats.used_parallel_processing); // Should use parallel processing
425        assert!(shards.len() > 1); // Should have multiple shards
426
427        // Verify that the total number of allocations processed equals the original data
428        let total_processed: usize = shards.iter().map(|s| s.allocation_count).sum();
429        assert_eq!(total_processed, 5000);
430    }
431
432    #[test]
433    fn test_custom_config() {
434        let config = ParallelShardConfig {
435            shard_size: 500,
436            parallel_threshold: 1000,
437            max_threads: Some(2),
438            enable_monitoring: false,
439            estimated_json_size_per_allocation: 150,
440        };
441
442        let data = create_test_data(2000);
443        let processor = ParallelShardProcessor::new(config);
444
445        let result = processor.process_allocations_parallel(&data);
446        assert!(result.is_ok());
447
448        let (shards, stats) = result.unwrap();
449        assert_eq!(stats.total_allocations, 2000);
450        assert_eq!(shards.len(), 4); // 2000 / 500 = 4 shards
451    }
452
453    #[test]
454    fn test_convenience_functions() {
455        let data = create_test_data(1500);
456
457        // Test fast processing function
458        let result = process_allocations_fast(&data);
459        assert!(result.is_ok());
460
461        // Test custom configuration function
462        let config = ParallelShardConfig {
463            shard_size: 300,
464            ..Default::default()
465        };
466        let result = process_allocations_with_config(&data, config);
467        assert!(result.is_ok());
468
469        let (shards, _) = result.unwrap();
470        assert_eq!(shards.len(), 5); // 1500 / 300 = 5 shards
471    }
472
473    #[test]
474    fn test_processed_shard_structure() {
475        let data = create_test_data(100);
476        let processor = ParallelShardProcessor::default();
477
478        let result = processor.process_allocations_parallel(&data);
479        assert!(result.is_ok());
480
481        let (shards, _) = result.unwrap();
482        assert_eq!(shards.len(), 1);
483
484        let shard = &shards[0];
485        assert_eq!(shard.allocation_count, 100);
486        assert_eq!(shard.shard_index, 0);
487        assert!(!shard.data.is_empty());
488        // processing_time_ms is u64, always >= 0, so just check it exists
489        assert!(shard.processing_time_ms < u64::MAX);
490
491        // Verify that the JSON data is valid
492        let parsed: Result<Vec<AllocationInfo>, _> = serde_json::from_slice(&shard.data);
493        assert!(parsed.is_ok());
494        assert_eq!(parsed.unwrap().len(), 100);
495    }
496}