use crate::core::types::{AllocationInfo, TrackingError, TrackingResult};
use crate::export::data_localizer::LocalizedExportData;
use rayon::prelude::*;
use serde_json;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

/// Configuration for parallel shard processing.
#[derive(Debug, Clone)]
pub struct ParallelShardConfig {
    /// Number of allocations per shard.
    pub shard_size: usize,
    /// Minimum number of allocations before parallel processing is used.
    pub parallel_threshold: usize,
    /// Maximum number of worker threads (`None` keeps rayon's default).
    pub max_threads: Option<usize>,
    /// Whether to print per-shard progress while processing.
    pub enable_monitoring: bool,
    /// Estimated JSON size per allocation in bytes, used to pre-size output buffers.
    pub estimated_json_size_per_allocation: usize,
}

impl Default for ParallelShardConfig {
    fn default() -> Self {
        Self {
            shard_size: 1000,
            parallel_threshold: 2000,
            max_threads: None,
            enable_monitoring: true,
            estimated_json_size_per_allocation: 200,
        }
    }
}

/// A single shard of allocations serialized to JSON.
#[derive(Debug, Clone)]
pub struct ProcessedShard {
    /// Serialized JSON bytes for this shard.
    pub data: Vec<u8>,
    /// Number of allocations contained in this shard.
    pub allocation_count: usize,
    /// Index of this shard within the original allocation list.
    pub shard_index: usize,
    /// Time spent serializing this shard, in milliseconds.
    pub processing_time_ms: u64,
}

/// Summary statistics for a parallel shard processing run.
#[derive(Debug, Clone)]
pub struct ParallelProcessingStats {
    /// Total number of allocations processed.
    pub total_allocations: usize,
    /// Number of shards produced.
    pub shard_count: usize,
    /// Number of threads used.
    pub threads_used: usize,
    /// Total wall-clock processing time in milliseconds.
    pub total_processing_time_ms: u64,
    /// Average per-shard processing time in milliseconds.
    pub avg_shard_processing_time_ms: f64,
    /// Estimated parallel efficiency in the range 0.0..=1.0.
    pub parallel_efficiency: f64,
    /// Throughput in allocations per second.
    pub throughput_allocations_per_sec: f64,
    /// Whether parallel processing was actually used.
    pub used_parallel_processing: bool,
    /// Total size of the serialized output in bytes.
    pub total_output_size_bytes: usize,
}

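/// Splits allocation data into shards and serializes each shard to JSON,
/// using rayon for parallel processing once the data set exceeds the
/// configured threshold.
///
/// Minimal usage sketch (illustrative only; `localized_data` is assumed to be
/// a `LocalizedExportData` obtained elsewhere):
///
/// ```ignore
/// let processor = ParallelShardProcessor::default();
/// let (shards, stats) = processor.process_allocations_parallel(&localized_data)?;
/// println!("{} shards, {} bytes total", shards.len(), stats.total_output_size_bytes);
/// ```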
pub struct ParallelShardProcessor {
    config: ParallelShardConfig,
    processed_count: AtomicUsize,
}

impl ParallelShardProcessor {
    pub fn new(config: ParallelShardConfig) -> Self {
        // `build_global` fails if rayon's global thread pool has already been
        // initialized; the error is reported but otherwise ignored.
        if let Some(max_threads) = config.max_threads {
            rayon::ThreadPoolBuilder::new()
                .num_threads(max_threads)
                .build_global()
                .unwrap_or_else(|e| {
                    eprintln!(
                        "⚠️ Failed to set thread pool size to {}: {}",
                        max_threads, e
                    );
                });
        }

        Self {
            config,
            processed_count: AtomicUsize::new(0),
        }
    }

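    /// Processes all allocations in `data`, returning the serialized shards
    /// together with timing and throughput statistics. Data sets smaller than
    /// `parallel_threshold` are processed sequentially on the calling thread.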
    pub fn process_allocations_parallel(
        &self,
        data: &LocalizedExportData,
    ) -> TrackingResult<(Vec<ProcessedShard>, ParallelProcessingStats)> {
        let start_time = Instant::now();
        let allocations = &data.allocations;

        println!(
            "🔄 Starting parallel shard processing for {} allocations...",
            allocations.len()
        );

        // Fall back to sequential processing below the configured threshold.
        let use_parallel = allocations.len() >= self.config.parallel_threshold;
        let actual_threads = if use_parallel {
            rayon::current_num_threads()
        } else {
            1
        };

        println!(
            " Parallel mode: {}, threads: {}, shard size: {}",
            if use_parallel { "enabled" } else { "disabled" },
            actual_threads,
            self.config.shard_size
        );

        self.processed_count.store(0, Ordering::Relaxed);

        // Split the allocation list into fixed-size shards.
        let shards: Vec<&[AllocationInfo]> = allocations.chunks(self.config.shard_size).collect();

        println!(" Shard count: {}", shards.len());

        let processed_shards: TrackingResult<Vec<ProcessedShard>> = if use_parallel {
            shards
                .into_par_iter()
                .enumerate()
                .map(|(index, shard)| self.process_shard_optimized(shard, index))
                .collect()
        } else {
            shards
                .into_iter()
                .enumerate()
                .map(|(index, shard)| self.process_shard_optimized(shard, index))
                .collect()
        };

        let processed_shards = processed_shards?;
        let total_time = start_time.elapsed();

        let stats = self.calculate_processing_stats(
            &processed_shards,
            allocations.len(),
            actual_threads,
            total_time.as_millis() as u64,
            use_parallel,
        );

        self.print_performance_stats(&stats);

        Ok((processed_shards, stats))
    }

    fn process_shard_optimized(
        &self,
        shard: &[AllocationInfo],
        shard_index: usize,
    ) -> TrackingResult<ProcessedShard> {
        let shard_start = Instant::now();

        // Pre-size the output buffer from the configured per-allocation estimate.
        let estimated_size = shard.len() * self.config.estimated_json_size_per_allocation;
        let mut output_buffer = Vec::with_capacity(estimated_size);

        serde_json::to_writer(&mut output_buffer, shard).map_err(|e| {
            TrackingError::ExportError(format!("Shard {} serialization failed: {}", shard_index, e))
        })?;

        let processing_time = shard_start.elapsed();

        self.processed_count
            .fetch_add(shard.len(), Ordering::Relaxed);

        // Periodic progress output (every 10th shard) when monitoring is enabled.
        if self.config.enable_monitoring && shard_index % 10 == 0 {
            let processed = self.processed_count.load(Ordering::Relaxed);
            println!(
                " Shard {} completed: {} allocations, {} bytes, {:?} ({} processed so far)",
                shard_index,
                shard.len(),
                output_buffer.len(),
                processing_time,
                processed
            );
        }

        Ok(ProcessedShard {
            data: output_buffer,
            allocation_count: shard.len(),
            shard_index,
            processing_time_ms: processing_time.as_millis() as u64,
        })
    }

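    /// Aggregates per-shard results into overall statistics. Parallel
    /// efficiency is the estimated speedup (sum of per-shard times divided by
    /// wall-clock time) normalized by the thread count and capped at 1.0.
    ///
    /// Illustrative numbers: 8 shards averaging 10 ms each give an estimated
    /// sequential time of 80 ms; if the measured wall-clock time is 25 ms on
    /// 4 threads, the speedup is 3.2x and the efficiency is 3.2 / 4 = 0.8.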
    fn calculate_processing_stats(
        &self,
        shards: &[ProcessedShard],
        total_allocations: usize,
        threads_used: usize,
        total_time_ms: u64,
        used_parallel: bool,
    ) -> ParallelProcessingStats {
        let total_output_size: usize = shards.iter().map(|s| s.data.len()).sum();
        let avg_shard_time: f64 = if !shards.is_empty() {
            shards
                .iter()
                .map(|s| s.processing_time_ms as f64)
                .sum::<f64>()
                / shards.len() as f64
        } else {
            0.0
        };

        let throughput = if total_time_ms > 0 {
            (total_allocations as f64 * 1000.0) / total_time_ms as f64
        } else {
            0.0
        };

        let parallel_efficiency = if used_parallel && threads_used > 1 {
            let theoretical_speedup = threads_used as f64;
            let estimated_sequential_time = avg_shard_time * shards.len() as f64;
            let actual_speedup = if total_time_ms > 0 {
                estimated_sequential_time / total_time_ms as f64
            } else {
                1.0
            };
            (actual_speedup / theoretical_speedup).min(1.0)
        } else {
            1.0
        };

        ParallelProcessingStats {
            total_allocations,
            shard_count: shards.len(),
            threads_used,
            total_processing_time_ms: total_time_ms,
            avg_shard_processing_time_ms: avg_shard_time,
            parallel_efficiency,
            throughput_allocations_per_sec: throughput,
            used_parallel_processing: used_parallel,
            total_output_size_bytes: total_output_size,
        }
    }

    fn print_performance_stats(&self, stats: &ParallelProcessingStats) {
        println!("✅ Parallel shard processing completed:");
        println!(" Total allocations: {}", stats.total_allocations);
        println!(" Shard count: {}", stats.shard_count);
        println!(" Threads used: {}", stats.threads_used);
        println!(" Total time: {}ms", stats.total_processing_time_ms);
        println!(
            " Average shard time: {:.2}ms",
            stats.avg_shard_processing_time_ms
        );
        println!(
            " Throughput: {:.0} allocations/sec",
            stats.throughput_allocations_per_sec
        );
        println!(
            " Output size: {:.2} MB",
            stats.total_output_size_bytes as f64 / 1024.0 / 1024.0
        );

        if stats.used_parallel_processing {
            println!(
                " Parallel efficiency: {:.1}%",
                stats.parallel_efficiency * 100.0
            );
            let speedup = stats.parallel_efficiency * stats.threads_used as f64;
            println!(" Actual speedup: {:.2}x", speedup);
        }
    }

    /// Returns the current configuration.
    pub fn get_config(&self) -> &ParallelShardConfig {
        &self.config
    }

    /// Replaces the current configuration.
    pub fn update_config(&mut self, config: ParallelShardConfig) {
        self.config = config;
    }

    /// Returns the number of allocations processed so far.
    pub fn get_processed_count(&self) -> usize {
        self.processed_count.load(Ordering::Relaxed)
    }
}

impl Default for ParallelShardProcessor {
    fn default() -> Self {
        Self::new(ParallelShardConfig::default())
    }
}

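/// Convenience wrapper that processes `data` with the default
/// `ParallelShardConfig`.
///
/// Usage sketch (illustrative only):
///
/// ```ignore
/// let (shards, stats) = process_allocations_fast(&localized_data)?;
/// ```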
pub fn process_allocations_fast(
    data: &LocalizedExportData,
) -> TrackingResult<(Vec<ProcessedShard>, ParallelProcessingStats)> {
    let processor = ParallelShardProcessor::default();
    processor.process_allocations_parallel(data)
}

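/// Convenience wrapper that processes `data` with a caller-supplied
/// `ParallelShardConfig`.
///
/// Usage sketch (illustrative only), overriding selected fields while keeping
/// the remaining defaults:
///
/// ```ignore
/// let config = ParallelShardConfig {
///     shard_size: 500,
///     max_threads: Some(4),
///     ..Default::default()
/// };
/// let (shards, stats) = process_allocations_with_config(&localized_data, config)?;
/// ```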
pub fn process_allocations_with_config(
    data: &LocalizedExportData,
    config: ParallelShardConfig,
) -> TrackingResult<(Vec<ProcessedShard>, ParallelProcessingStats)> {
    let processor = ParallelShardProcessor::new(config);
    processor.process_allocations_parallel(data)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::analysis::unsafe_ffi_tracker::UnsafeFFIStats;
    use crate::core::types::{MemoryStats, ScopeInfo};
    use std::time::Instant;

    fn create_test_data(allocation_count: usize) -> LocalizedExportData {
        let mut allocations = Vec::new();
        for i in 0..allocation_count {
            allocations.push(AllocationInfo {
                ptr: 0x1000 + i,
                size: 64 + (i % 100),
                type_name: Some(format!("TestType{}", i % 10)),
                var_name: Some(format!("var_{}", i)),
                scope_name: Some(format!("scope_{}", i % 5)),
                timestamp_alloc: 1000000 + i as u64,
                timestamp_dealloc: None,
                thread_id: format!("test_thread_{}", i % 3),
                borrow_count: 0,
                stack_trace: None,
                is_leaked: false,
                lifetime_ms: None,
                smart_pointer_info: None,
                memory_layout: None,
                generic_info: None,
                dynamic_type_info: None,
                runtime_state: None,
                stack_allocation: None,
                temporary_object: None,
                fragmentation_analysis: None,
                generic_instantiation: None,
                type_relationships: None,
                type_usage: None,
                function_call_tracking: None,
                lifecycle_tracking: None,
                access_tracking: None,
            });
        }

        LocalizedExportData {
            allocations,
            enhanced_allocations: Vec::new(),
            stats: MemoryStats::default(),
            ffi_stats: UnsafeFFIStats::default(),
            scope_info: Vec::<ScopeInfo>::new(),
            timestamp: Instant::now(),
        }
    }

    #[test]
    fn test_parallel_shard_processor_creation() {
        let config = ParallelShardConfig::default();
        let processor = ParallelShardProcessor::new(config);
        assert_eq!(processor.get_config().shard_size, 1000);
    }

    #[test]
    fn test_small_dataset_sequential_processing() {
        // Below the default parallel threshold of 2000 allocations.
        let data = create_test_data(100);
        let processor = ParallelShardProcessor::default();

        let result = processor.process_allocations_parallel(&data);
        assert!(result.is_ok());

        let (shards, stats) = result.unwrap();
        assert_eq!(stats.total_allocations, 100);
        assert!(!stats.used_parallel_processing);
        // 100 allocations fit into a single shard of size 1000.
        assert_eq!(shards.len(), 1);
    }

    #[test]
    fn test_large_dataset_parallel_processing() {
        let data = create_test_data(5000);
        let processor = ParallelShardProcessor::default();

        let result = processor.process_allocations_parallel(&data);
        assert!(result.is_ok());

        let (shards, stats) = result.unwrap();
        assert_eq!(stats.total_allocations, 5000);
        assert!(stats.used_parallel_processing);
        assert!(shards.len() > 1);

        let total_processed: usize = shards.iter().map(|s| s.allocation_count).sum();
        assert_eq!(total_processed, 5000);
    }

    #[test]
    fn test_custom_config() {
        let config = ParallelShardConfig {
            shard_size: 500,
            parallel_threshold: 1000,
            max_threads: Some(2),
            enable_monitoring: false,
            estimated_json_size_per_allocation: 150,
        };

        let data = create_test_data(2000);
        let processor = ParallelShardProcessor::new(config);

        let result = processor.process_allocations_parallel(&data);
        assert!(result.is_ok());

        let (shards, stats) = result.unwrap();
        assert_eq!(stats.total_allocations, 2000);
        // 2000 allocations / shard_size 500 = 4 shards.
        assert_eq!(shards.len(), 4);
    }

    #[test]
    fn test_convenience_functions() {
        let data = create_test_data(1500);

        let result = process_allocations_fast(&data);
        assert!(result.is_ok());

        let config = ParallelShardConfig {
            shard_size: 300,
            ..Default::default()
        };
        let result = process_allocations_with_config(&data, config);
        assert!(result.is_ok());

        let (shards, _) = result.unwrap();
        // 1500 allocations / shard_size 300 = 5 shards.
        assert_eq!(shards.len(), 5);
    }

    #[test]
    fn test_processed_shard_structure() {
        let data = create_test_data(100);
        let processor = ParallelShardProcessor::default();

        let result = processor.process_allocations_parallel(&data);
        assert!(result.is_ok());

        let (shards, _) = result.unwrap();
        assert_eq!(shards.len(), 1);

        let shard = &shards[0];
        assert_eq!(shard.allocation_count, 100);
        assert_eq!(shard.shard_index, 0);
        assert!(!shard.data.is_empty());
        assert!(shard.processing_time_ms < u64::MAX);

        // The shard payload must round-trip as JSON back into AllocationInfo values.
        let parsed: Result<Vec<AllocationInfo>, _> = serde_json::from_slice(&shard.data);
        assert!(parsed.is_ok());
        assert_eq!(parsed.unwrap().len(), 100);
    }
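
    // Additional illustrative test (not in the original): an empty data set
    // should produce no shards and zero-valued statistics.
    #[test]
    fn test_empty_dataset() {
        let data = create_test_data(0);
        let processor = ParallelShardProcessor::default();

        let result = processor.process_allocations_parallel(&data);
        assert!(result.is_ok());

        let (shards, stats) = result.unwrap();
        assert!(shards.is_empty());
        assert_eq!(stats.total_allocations, 0);
        assert!(!stats.used_parallel_processing);
    }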
}