// memscope_rs/export/parallel_shard_processor.rs

//! Parallel shard processor: high-performance parallel JSON serialization.
//!
//! This module splits large allocation data sets into fixed-size shards and serializes the
//! shards in parallel, which significantly improves JSON serialization performance,
//! especially on multi-core systems.
5
use crate::core::types::{AllocationInfo, TrackingError, TrackingResult};
use crate::export::data_localizer::LocalizedExportData;
use rayon::prelude::*;
use serde_json;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
12
/// Configuration for the parallel shard processor.
///
/// All fields are public so callers can build a value with struct-update syntax on top of
/// [`Default::default`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParallelShardConfig {
    /// Number of allocations placed in each shard.
    pub shard_size: usize,
    /// Parallel processing threshold: shards are processed in parallel only when the number
    /// of allocations is at least this value.
    pub parallel_threshold: usize,
    /// Maximum number of worker threads (`None` means use the system default).
    pub max_threads: Option<usize>,
    /// Whether to log per-shard progress while processing.
    pub enable_monitoring: bool,
    /// Estimated serialized JSON size of one allocation, in bytes; used to pre-allocate the
    /// output buffer of each shard.
    pub estimated_json_size_per_allocation: usize,
}
27
28impl Default for ParallelShardConfig {
29    fn default() -> Self {
30        Self {
31            shard_size: 1000,                        // Each shard contains 1000 allocations
32            parallel_threshold: 2000, // Only enable parallel processing if the number of allocations exceeds 2000
33            max_threads: None,        // Use system default number of threads
34            enable_monitoring: true,  // Enable performance monitoring
35            estimated_json_size_per_allocation: 200, // Estimated JSON size per allocation (for pre-allocation)
36        }
37    }
38}
39
/// Result of serializing one shard of allocations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProcessedShard {
    /// Serialized JSON data (a JSON array holding the shard's allocations).
    pub data: Vec<u8>,
    /// Number of allocations in the shard.
    pub allocation_count: usize,
    /// Zero-based position of the shard within the original allocation list.
    pub shard_index: usize,
    /// Time spent serializing the shard, in whole milliseconds.
    pub processing_time_ms: u64,
}
52
/// Summary statistics for one processing run.
#[derive(Debug, Clone, PartialEq)]
pub struct ParallelProcessingStats {
    /// Total number of allocations processed.
    pub total_allocations: usize,
    /// Number of shards the allocations were split into.
    pub shard_count: usize,
    /// Number of threads used (1 when processing sequentially).
    pub threads_used: usize,
    /// Total wall-clock processing time, in milliseconds.
    pub total_processing_time_ms: u64,
    /// Average processing time per shard, in milliseconds.
    pub avg_shard_processing_time_ms: f64,
    /// Parallel efficiency: estimated speedup over a single thread divided by the number
    /// of threads, capped at 1.0.
    pub parallel_efficiency: f64,
    /// Throughput, in allocations per second.
    pub throughput_allocations_per_sec: f64,
    /// Whether parallel processing was used.
    pub used_parallel_processing: bool,
    /// Total size of all serialized shards, in bytes.
    pub total_output_size_bytes: usize,
}
75
76/// Parallel shard processor
77pub struct ParallelShardProcessor {
78    /// Configuration
79    config: ParallelShardConfig,
80    /// Processing counter (for monitoring)
81    processed_count: AtomicUsize,
82}
83
84impl ParallelShardProcessor {
85    /// Create a new parallel shard processor
86    pub fn new(config: ParallelShardConfig) -> Self {
87        // If a maximum thread count is specified, set the rayon thread pool
88        if let Some(max_threads) = config.max_threads {
89            rayon::ThreadPoolBuilder::new()
90                .num_threads(max_threads)
91                .build_global()
92                .unwrap_or_else(|e| {
93                    tracing::warn!(
94                        "⚠️ Failed to set thread pool size to {}: {}",
95                        max_threads,
96                        e
97                    );
98                });
99        }
100
101        Self {
102            config,
103            processed_count: AtomicUsize::new(0),
104        }
105    }
106
107    /// Process allocations in parallel
108    pub fn process_allocations_parallel(
109        &self,
110        data: &LocalizedExportData,
111    ) -> TrackingResult<(Vec<ProcessedShard>, ParallelProcessingStats)> {
112        let start_time = Instant::now();
113        let allocations = &data.allocations;
114
115        tracing::info!(
116            "🔄 Starting parallel shard processing for {} allocations...",
117            allocations.len()
118        );
119
120        // Determine whether to use parallel processing
121        let use_parallel = allocations.len() >= self.config.parallel_threshold;
122        let actual_threads = if use_parallel {
123            rayon::current_num_threads()
124        } else {
125            1
126        };
127
128        tracing::info!(
129            "   Parallel mode: {}, threads: {}, shard size: {}",
130            if use_parallel { "enabled" } else { "disabled" },
131            actual_threads,
132            self.config.shard_size
133        );
134
135        // Reset counter for monitoring
136        self.processed_count.store(0, Ordering::Relaxed);
137
138        // Split data into shards
139        let shards: Vec<&[AllocationInfo]> = allocations.chunks(self.config.shard_size).collect();
140
141        tracing::info!("   Shard count: {}", shards.len());
142
143        // Parallel or serial processing of shards
144        let processed_shards: TrackingResult<Vec<ProcessedShard>> = if use_parallel {
145            shards
146                .into_par_iter()
147                .enumerate()
148                .map(|(index, shard)| self.process_shard_optimized(shard, index))
149                .collect()
150        } else {
151            shards
152                .into_iter()
153                .enumerate()
154                .map(|(index, shard)| self.process_shard_optimized(shard, index))
155                .collect()
156        };
157
158        let processed_shards = processed_shards?;
159        let total_time = start_time.elapsed();
160
161        // Calculate statistics
162        let stats = self.calculate_processing_stats(
163            &processed_shards,
164            allocations.len(),
165            actual_threads,
166            total_time.as_millis() as u64,
167            use_parallel,
168        );
169
170        // Print performance statistics
171        self.print_performance_stats(&stats);
172
173        Ok((processed_shards, stats))
174    }
175
176    /// Optimized shard processing method
177    fn process_shard_optimized(
178        &self,
179        shard: &[AllocationInfo],
180        shard_index: usize,
181    ) -> TrackingResult<ProcessedShard> {
182        let shard_start = Instant::now();
183
184        // Estimate output size and preallocate buffers
185        let estimated_size = shard.len() * self.config.estimated_json_size_per_allocation;
186        let mut output_buffer = Vec::with_capacity(estimated_size);
187
188        // Use serde_json's efficient API to serialize directly to byte vector
189        // This is more reliable than manual formatting and performs well
190        serde_json::to_writer(&mut output_buffer, shard).map_err(|e| {
191            TrackingError::ExportError(format!("Shard {shard_index} serialization failed: {e}"))
192        })?;
193
194        let processing_time = shard_start.elapsed();
195
196        // Update processed counter
197        self.processed_count
198            .fetch_add(shard.len(), Ordering::Relaxed);
199
200        // If monitoring is enabled, print progress
201        if self.config.enable_monitoring && shard_index % 10 == 0 {
202            let _processed = self.processed_count.load(Ordering::Relaxed);
203            tracing::info!(
204                "   Shard {shard_index} completed: {} allocations, {} bytes, {processing_time:?}",
205                shard.len(),
206                output_buffer.len(),
207            );
208        }
209
210        Ok(ProcessedShard {
211            data: output_buffer,
212            allocation_count: shard.len(),
213            shard_index,
214            processing_time_ms: processing_time.as_millis() as u64,
215        })
216    }
217
218    /// Calculate processing statistics
219    fn calculate_processing_stats(
220        &self,
221        shards: &[ProcessedShard],
222        total_allocations: usize,
223        threads_used: usize,
224        total_time_ms: u64,
225        used_parallel: bool,
226    ) -> ParallelProcessingStats {
227        let total_output_size: usize = shards.iter().map(|s| s.data.len()).sum();
228        let avg_shard_time: f64 = if !shards.is_empty() {
229            shards
230                .iter()
231                .map(|s| s.processing_time_ms as f64)
232                .sum::<f64>()
233                / shards.len() as f64
234        } else {
235            0.0
236        };
237
238        let throughput = if total_time_ms > 0 {
239            (total_allocations as f64 * 1000.0) / total_time_ms as f64
240        } else {
241            0.0
242        };
243
244        // Estimate parallel efficiency (simplified calculation)
245        let parallel_efficiency = if used_parallel && threads_used > 1 {
246            // In an ideal scenario, N threads should provide close to N times the performance
247            // Actual efficiency = Actual acceleration ratio / Theoretical acceleration ratio
248            let theoretical_speedup = threads_used as f64;
249            let estimated_sequential_time = avg_shard_time * shards.len() as f64;
250            let actual_speedup = if total_time_ms > 0 {
251                estimated_sequential_time / total_time_ms as f64
252            } else {
253                1.0
254            };
255            (actual_speedup / theoretical_speedup).min(1.0)
256        } else {
257            1.0 // Single thread efficiency is 100%
258        };
259
260        ParallelProcessingStats {
261            total_allocations,
262            shard_count: shards.len(),
263            threads_used,
264            total_processing_time_ms: total_time_ms,
265            avg_shard_processing_time_ms: avg_shard_time,
266            parallel_efficiency,
267            throughput_allocations_per_sec: throughput,
268            used_parallel_processing: used_parallel,
269            total_output_size_bytes: total_output_size,
270        }
271    }
272
273    /// Print performance statistics
274    fn print_performance_stats(&self, stats: &ParallelProcessingStats) {
275        tracing::info!("✅ Parallel shard processing completed:");
276        tracing::info!("   Total allocations: {}", stats.total_allocations);
277        tracing::info!("   Shard count: {}", stats.shard_count);
278        tracing::info!("   Threads used: {}", stats.threads_used);
279        tracing::info!("   Total time: {}ms", stats.total_processing_time_ms);
280        tracing::info!(
281            "   Average shard time: {:.2}ms",
282            stats.avg_shard_processing_time_ms
283        );
284        tracing::info!(
285            "   Throughput: {:.0} allocations/sec",
286            stats.throughput_allocations_per_sec
287        );
288        tracing::info!(
289            "   Output size: {:.2} MB",
290            stats.total_output_size_bytes as f64 / 1024.0 / 1024.0
291        );
292
293        if stats.used_parallel_processing {
294            tracing::info!(
295                "   Parallel efficiency: {:.1}%",
296                stats.parallel_efficiency * 100.0
297            );
298            let speedup = stats.parallel_efficiency * stats.threads_used as f64;
299            tracing::info!("   Actual speedup: {:.2}x", speedup);
300        }
301    }
302
303    /// Get current configuration
304    pub fn get_config(&self) -> &ParallelShardConfig {
305        &self.config
306    }
307
308    /// Update configuration
309    pub fn update_config(&mut self, config: ParallelShardConfig) {
310        self.config = config;
311    }
312
313    /// Get processed count
314    pub fn get_processed_count(&self) -> usize {
315        self.processed_count.load(Ordering::Relaxed)
316    }
317}
318
319impl Default for ParallelShardProcessor {
320    fn default() -> Self {
321        Self::new(ParallelShardConfig::default())
322    }
323}
324
325/// Convenience function: Fast parallel processing of allocation data
326pub fn process_allocations_fast(
327    data: &LocalizedExportData,
328) -> TrackingResult<(Vec<ProcessedShard>, ParallelProcessingStats)> {
329    let processor = ParallelShardProcessor::default();
330    processor.process_allocations_parallel(data)
331}
332
333/// Convenience function: Parallel processing with custom configuration
334pub fn process_allocations_with_config(
335    data: &LocalizedExportData,
336    config: ParallelShardConfig,
337) -> TrackingResult<(Vec<ProcessedShard>, ParallelProcessingStats)> {
338    let processor = ParallelShardProcessor::new(config);
339    processor.process_allocations_parallel(data)
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analysis::unsafe_ffi_tracker::UnsafeFFIStats;
    use crate::core::types::{MemoryStats, ScopeInfo};
    use std::time::Instant;

    /// Build export data holding `allocation_count` synthetic allocations.
    ///
    /// Pointer, size, names and thread id vary with the index; every optional analysis
    /// field is left empty.
    fn create_test_data(allocation_count: usize) -> LocalizedExportData {
        let allocations: Vec<AllocationInfo> = (0..allocation_count)
            .map(|i| AllocationInfo {
                ptr: 0x1000 + i,
                size: 64 + (i % 100),
                type_name: Some(format!("TestType{}", i % 10)),
                var_name: Some(format!("var_{i}")),
                scope_name: Some(format!("scope_{}", i % 5)),
                timestamp_alloc: 1000000 + i as u64,
                timestamp_dealloc: None,
                thread_id: format!("test_thread_{}", i % 3),
                borrow_count: 0,
                stack_trace: None,
                is_leaked: false,
                lifetime_ms: None,
                borrow_info: None,
                clone_info: None,
                ownership_history_available: false,
                smart_pointer_info: None,
                memory_layout: None,
                generic_info: None,
                dynamic_type_info: None,
                runtime_state: None,
                stack_allocation: None,
                temporary_object: None,
                fragmentation_analysis: None,
                generic_instantiation: None,
                type_relationships: None,
                type_usage: None,
                function_call_tracking: None,
                lifecycle_tracking: None,
                access_tracking: None,
                drop_chain_analysis: None,
            })
            .collect();

        LocalizedExportData {
            allocations,
            enhanced_allocations: Vec::new(),
            stats: MemoryStats::default(),
            ffi_stats: UnsafeFFIStats::default(),
            scope_info: Vec::<ScopeInfo>::new(),
            timestamp: Instant::now(),
        }
    }

    #[test]
    fn test_parallel_shard_processor_creation() {
        let config = ParallelShardConfig::default();
        let processor = ParallelShardProcessor::new(config);
        assert_eq!(processor.get_config().shard_size, 1000);
    }

    #[test]
    fn test_small_dataset_sequential_processing() {
        let data = create_test_data(100); // Small dataset, should use sequential processing
        let processor = ParallelShardProcessor::default();

        let (shards, stats) = processor
            .process_allocations_parallel(&data)
            .expect("Failed to process allocations");
        assert_eq!(stats.total_allocations, 100);
        assert!(!stats.used_parallel_processing); // Should use sequential processing
        assert_eq!(shards.len(), 1); // Only one shard
    }

    #[test]
    fn test_parallel_processing() {
        // Lower the threshold so that a small, fast dataset still takes the parallel path.
        let config = ParallelShardConfig {
            shard_size: 5,
            parallel_threshold: 10,
            enable_monitoring: false,
            ..Default::default()
        };
        let data = create_test_data(20);
        let processor = ParallelShardProcessor::new(config);

        let (shards, stats) = processor
            .process_allocations_parallel(&data)
            .expect("Test operation failed");
        assert_eq!(stats.total_allocations, 20);
        assert!(stats.used_parallel_processing);
        assert_eq!(stats.shard_count, 4); // 20 / 5 = 4 shards
        assert_eq!(shards.len(), 4);

        // Every allocation must end up in exactly one shard.
        let serialized: usize = shards.iter().map(|s| s.allocation_count).sum();
        assert_eq!(serialized, 20);
    }

    #[test]
    fn test_custom_config() {
        let config = ParallelShardConfig {
            shard_size: 500,
            parallel_threshold: 1000,
            max_threads: Some(2),
            enable_monitoring: false,
            estimated_json_size_per_allocation: 150,
        };

        let data = create_test_data(2000);
        let processor = ParallelShardProcessor::new(config);

        let (shards, stats) = processor
            .process_allocations_parallel(&data)
            .expect("Test operation failed");
        assert_eq!(stats.total_allocations, 2000);
        assert_eq!(shards.len(), 4); // 2000 / 500 = 4 shards
    }

    #[test]
    fn test_convenience_functions() {
        let data = create_test_data(1500);

        // Test fast processing function
        let result = process_allocations_fast(&data);
        assert!(result.is_ok());

        // Test custom configuration function
        let config = ParallelShardConfig {
            shard_size: 300,
            ..Default::default()
        };
        let (shards, _) =
            process_allocations_with_config(&data, config).expect("Test operation failed");
        assert_eq!(shards.len(), 5); // 1500 / 300 = 5 shards
    }

    #[test]
    fn test_processed_shard_structure() {
        let data = create_test_data(100);
        let processor = ParallelShardProcessor::default();

        let (shards, _) = processor
            .process_allocations_parallel(&data)
            .expect("Failed to process allocations");
        assert_eq!(shards.len(), 1);

        let shard = &shards[0];
        assert_eq!(shard.allocation_count, 100);
        assert_eq!(shard.shard_index, 0);
        assert!(!shard.data.is_empty());
        // processing_time_ms is u64, always >= 0, so just check it exists
        assert!(shard.processing_time_ms < u64::MAX);

        // Verify that the JSON data is valid
        let parsed: Vec<AllocationInfo> =
            serde_json::from_slice(&shard.data).expect("Failed to parse JSON");
        assert_eq!(parsed.len(), 100);
    }
}