1use crate::core::types::{AllocationInfo, TrackingError, TrackingResult};
7use crate::export::data_localizer::LocalizedExportData;
8use rayon::prelude::*;
9use serde_json;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::time::Instant;
12
/// Configuration for parallel shard processing of allocation export data.
#[derive(Debug, Clone)]
pub struct ParallelShardConfig {
    /// Number of allocations placed in each shard (chunk size).
    pub shard_size: usize,
    /// Minimum total allocation count required before the parallel
    /// (rayon) path is used; smaller inputs are processed sequentially.
    pub parallel_threshold: usize,
    /// Optional cap on rayon's *global* thread pool size; `None` lets
    /// rayon choose.
    pub max_threads: Option<usize>,
    /// When `true`, emits per-shard progress logs (every 10th shard).
    pub enable_monitoring: bool,
    /// Estimated serialized JSON bytes per allocation, used to pre-size
    /// each shard's output buffer.
    pub estimated_json_size_per_allocation: usize,
}
27
28impl Default for ParallelShardConfig {
29 fn default() -> Self {
30 Self {
31 shard_size: 1000, parallel_threshold: 2000, max_threads: None, enable_monitoring: true, estimated_json_size_per_allocation: 200, }
37 }
38}
39
/// One shard of allocations, serialized to JSON bytes.
#[derive(Debug, Clone)]
pub struct ProcessedShard {
    /// Serialized JSON for this shard (a JSON array of allocations).
    pub data: Vec<u8>,
    /// Number of allocations contained in this shard.
    pub allocation_count: usize,
    /// Zero-based position of this shard within the input sequence.
    pub shard_index: usize,
    /// Wall-clock time spent serializing this shard, in milliseconds.
    pub processing_time_ms: u64,
}
52
/// Aggregate timing and throughput statistics for one processing run.
#[derive(Debug, Clone)]
pub struct ParallelProcessingStats {
    /// Total number of allocations processed.
    pub total_allocations: usize,
    /// Number of shards produced.
    pub shard_count: usize,
    /// Threads used for this run (1 when the sequential path was taken).
    pub threads_used: usize,
    /// Total wall-clock processing time in milliseconds.
    pub total_processing_time_ms: u64,
    /// Mean per-shard serialization time in milliseconds.
    pub avg_shard_processing_time_ms: f64,
    /// Estimated parallel efficiency in [0.0, 1.0]; 1.0 for sequential runs.
    pub parallel_efficiency: f64,
    /// Throughput in allocations per second (0.0 if the run took < 1 ms).
    pub throughput_allocations_per_sec: f64,
    /// Whether the parallel (rayon) path was taken.
    pub used_parallel_processing: bool,
    /// Total size of all serialized shard buffers, in bytes.
    pub total_output_size_bytes: usize,
}
75
/// Splits allocation data into shards and serializes them to JSON,
/// in parallel via rayon when the input is large enough.
pub struct ParallelShardProcessor {
    /// Processing configuration (shard size, threshold, monitoring, …).
    config: ParallelShardConfig,
    /// Running count of allocations processed, shared across worker threads.
    processed_count: AtomicUsize,
}
83
84impl ParallelShardProcessor {
85 pub fn new(config: ParallelShardConfig) -> Self {
87 if let Some(max_threads) = config.max_threads {
89 rayon::ThreadPoolBuilder::new()
90 .num_threads(max_threads)
91 .build_global()
92 .unwrap_or_else(|e| {
93 tracing::warn!(
94 "⚠️ Failed to set thread pool size to {}: {}",
95 max_threads,
96 e
97 );
98 });
99 }
100
101 Self {
102 config,
103 processed_count: AtomicUsize::new(0),
104 }
105 }
106
107 pub fn process_allocations_parallel(
109 &self,
110 data: &LocalizedExportData,
111 ) -> TrackingResult<(Vec<ProcessedShard>, ParallelProcessingStats)> {
112 let start_time = Instant::now();
113 let allocations = &data.allocations;
114
115 tracing::info!(
116 "🔄 Starting parallel shard processing for {} allocations...",
117 allocations.len()
118 );
119
120 let use_parallel = allocations.len() >= self.config.parallel_threshold;
122 let actual_threads = if use_parallel {
123 rayon::current_num_threads()
124 } else {
125 1
126 };
127
128 tracing::info!(
129 " Parallel mode: {}, threads: {}, shard size: {}",
130 if use_parallel { "enabled" } else { "disabled" },
131 actual_threads,
132 self.config.shard_size
133 );
134
135 self.processed_count.store(0, Ordering::Relaxed);
137
138 let shards: Vec<&[AllocationInfo]> = allocations.chunks(self.config.shard_size).collect();
140
141 tracing::info!(" Shard count: {}", shards.len());
142
143 let processed_shards: TrackingResult<Vec<ProcessedShard>> = if use_parallel {
145 shards
146 .into_par_iter()
147 .enumerate()
148 .map(|(index, shard)| self.process_shard_optimized(shard, index))
149 .collect()
150 } else {
151 shards
152 .into_iter()
153 .enumerate()
154 .map(|(index, shard)| self.process_shard_optimized(shard, index))
155 .collect()
156 };
157
158 let processed_shards = processed_shards?;
159 let total_time = start_time.elapsed();
160
161 let stats = self.calculate_processing_stats(
163 &processed_shards,
164 allocations.len(),
165 actual_threads,
166 total_time.as_millis() as u64,
167 use_parallel,
168 );
169
170 self.print_performance_stats(&stats);
172
173 Ok((processed_shards, stats))
174 }
175
176 fn process_shard_optimized(
178 &self,
179 shard: &[AllocationInfo],
180 shard_index: usize,
181 ) -> TrackingResult<ProcessedShard> {
182 let shard_start = Instant::now();
183
184 let estimated_size = shard.len() * self.config.estimated_json_size_per_allocation;
186 let mut output_buffer = Vec::with_capacity(estimated_size);
187
188 serde_json::to_writer(&mut output_buffer, shard).map_err(|e| {
191 TrackingError::ExportError(format!("Shard {shard_index} serialization failed: {e}"))
192 })?;
193
194 let processing_time = shard_start.elapsed();
195
196 self.processed_count
198 .fetch_add(shard.len(), Ordering::Relaxed);
199
200 if self.config.enable_monitoring && shard_index % 10 == 0 {
202 let _processed = self.processed_count.load(Ordering::Relaxed);
203 tracing::info!(
204 " Shard {shard_index} completed: {} allocations, {} bytes, {processing_time:?}",
205 shard.len(),
206 output_buffer.len(),
207 );
208 }
209
210 Ok(ProcessedShard {
211 data: output_buffer,
212 allocation_count: shard.len(),
213 shard_index,
214 processing_time_ms: processing_time.as_millis() as u64,
215 })
216 }
217
218 fn calculate_processing_stats(
220 &self,
221 shards: &[ProcessedShard],
222 total_allocations: usize,
223 threads_used: usize,
224 total_time_ms: u64,
225 used_parallel: bool,
226 ) -> ParallelProcessingStats {
227 let total_output_size: usize = shards.iter().map(|s| s.data.len()).sum();
228 let avg_shard_time: f64 = if !shards.is_empty() {
229 shards
230 .iter()
231 .map(|s| s.processing_time_ms as f64)
232 .sum::<f64>()
233 / shards.len() as f64
234 } else {
235 0.0
236 };
237
238 let throughput = if total_time_ms > 0 {
239 (total_allocations as f64 * 1000.0) / total_time_ms as f64
240 } else {
241 0.0
242 };
243
244 let parallel_efficiency = if used_parallel && threads_used > 1 {
246 let theoretical_speedup = threads_used as f64;
249 let estimated_sequential_time = avg_shard_time * shards.len() as f64;
250 let actual_speedup = if total_time_ms > 0 {
251 estimated_sequential_time / total_time_ms as f64
252 } else {
253 1.0
254 };
255 (actual_speedup / theoretical_speedup).min(1.0)
256 } else {
257 1.0 };
259
260 ParallelProcessingStats {
261 total_allocations,
262 shard_count: shards.len(),
263 threads_used,
264 total_processing_time_ms: total_time_ms,
265 avg_shard_processing_time_ms: avg_shard_time,
266 parallel_efficiency,
267 throughput_allocations_per_sec: throughput,
268 used_parallel_processing: used_parallel,
269 total_output_size_bytes: total_output_size,
270 }
271 }
272
273 fn print_performance_stats(&self, stats: &ParallelProcessingStats) {
275 tracing::info!("✅ Parallel shard processing completed:");
276 tracing::info!(" Total allocations: {}", stats.total_allocations);
277 tracing::info!(" Shard count: {}", stats.shard_count);
278 tracing::info!(" Threads used: {}", stats.threads_used);
279 tracing::info!(" Total time: {}ms", stats.total_processing_time_ms);
280 tracing::info!(
281 " Average shard time: {:.2}ms",
282 stats.avg_shard_processing_time_ms
283 );
284 tracing::info!(
285 " Throughput: {:.0} allocations/sec",
286 stats.throughput_allocations_per_sec
287 );
288 tracing::info!(
289 " Output size: {:.2} MB",
290 stats.total_output_size_bytes as f64 / 1024.0 / 1024.0
291 );
292
293 if stats.used_parallel_processing {
294 tracing::info!(
295 " Parallel efficiency: {:.1}%",
296 stats.parallel_efficiency * 100.0
297 );
298 let speedup = stats.parallel_efficiency * stats.threads_used as f64;
299 tracing::info!(" Actual speedup: {:.2}x", speedup);
300 }
301 }
302
303 pub fn get_config(&self) -> &ParallelShardConfig {
305 &self.config
306 }
307
308 pub fn update_config(&mut self, config: ParallelShardConfig) {
310 self.config = config;
311 }
312
313 pub fn get_processed_count(&self) -> usize {
315 self.processed_count.load(Ordering::Relaxed)
316 }
317}
318
319impl Default for ParallelShardProcessor {
320 fn default() -> Self {
321 Self::new(ParallelShardConfig::default())
322 }
323}
324
325pub fn process_allocations_fast(
327 data: &LocalizedExportData,
328) -> TrackingResult<(Vec<ProcessedShard>, ParallelProcessingStats)> {
329 let processor = ParallelShardProcessor::default();
330 processor.process_allocations_parallel(data)
331}
332
333pub fn process_allocations_with_config(
335 data: &LocalizedExportData,
336 config: ParallelShardConfig,
337) -> TrackingResult<(Vec<ProcessedShard>, ParallelProcessingStats)> {
338 let processor = ParallelShardProcessor::new(config);
339 processor.process_allocations_parallel(data)
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::analysis::unsafe_ffi_tracker::UnsafeFFIStats;
    use crate::core::types::{MemoryStats, ScopeInfo};
    use std::time::Instant;

    /// Builds a `LocalizedExportData` with `allocation_count` synthetic
    /// allocations cycling through a handful of types/scopes/threads.
    fn create_test_data(allocation_count: usize) -> LocalizedExportData {
        let mut allocations = Vec::new();
        for i in 0..allocation_count {
            allocations.push(AllocationInfo {
                ptr: 0x1000 + i,
                size: 64 + (i % 100),
                type_name: Some(format!("TestType{}", i % 10)),
                var_name: Some(format!("var_{i}")),
                scope_name: Some(format!("scope_{}", i % 5)),
                timestamp_alloc: 1000000 + i as u64,
                timestamp_dealloc: None,
                thread_id: format!("test_thread_{}", i % 3),
                borrow_count: 0,
                stack_trace: None,
                is_leaked: false,
                lifetime_ms: None,
                borrow_info: None,
                clone_info: None,
                ownership_history_available: false,
                smart_pointer_info: None,
                memory_layout: None,
                generic_info: None,
                dynamic_type_info: None,
                runtime_state: None,
                stack_allocation: None,
                temporary_object: None,
                fragmentation_analysis: None,
                generic_instantiation: None,
                type_relationships: None,
                type_usage: None,
                function_call_tracking: None,
                lifecycle_tracking: None,
                access_tracking: None,
                drop_chain_analysis: None,
            });
        }

        LocalizedExportData {
            allocations,
            enhanced_allocations: Vec::new(),
            stats: MemoryStats::default(),
            ffi_stats: UnsafeFFIStats::default(),
            scope_info: Vec::<ScopeInfo>::new(),
            timestamp: Instant::now(),
        }
    }

    #[test]
    fn test_parallel_shard_processor_creation() {
        let config = ParallelShardConfig::default();
        let processor = ParallelShardProcessor::new(config);
        assert_eq!(processor.get_config().shard_size, 1000);
    }

    #[test]
    fn test_small_dataset_sequential_processing() {
        // 100 < default parallel_threshold (2000): sequential path, 1 shard.
        let data = create_test_data(100);
        let processor = ParallelShardProcessor::default();

        let result = processor.process_allocations_parallel(&data);
        assert!(result.is_ok());

        let (shards, stats) = result.expect("Failed to process allocations");
        assert_eq!(stats.total_allocations, 100);
        assert!(!stats.used_parallel_processing);
        assert_eq!(shards.len(), 1);
    }

    #[test]
    fn test_parallel_processing() {
        // The default parallel_threshold is 2000, so 3000 allocations must
        // take the parallel path. (The previous version used 20 allocations,
        // which exercised only the sequential branch despite the test name.)
        let data = create_test_data(3000);
        let processor = ParallelShardProcessor::default();

        let result = processor.process_allocations_parallel(&data);
        assert!(result.is_ok());

        let (shards, stats) = result.expect("Test operation failed");
        assert_eq!(stats.total_allocations, 3000);
        assert!(stats.used_parallel_processing);
        assert_eq!(shards.len(), 3); // 3000 / shard_size (1000)
    }

    #[test]
    fn test_custom_config() {
        let config = ParallelShardConfig {
            shard_size: 500,
            parallel_threshold: 1000,
            max_threads: Some(2),
            enable_monitoring: false,
            estimated_json_size_per_allocation: 150,
        };

        let data = create_test_data(2000);
        let processor = ParallelShardProcessor::new(config);

        let result = processor.process_allocations_parallel(&data);
        assert!(result.is_ok());

        let (shards, stats) = result.expect("Test operation failed");
        assert_eq!(stats.total_allocations, 2000);
        assert_eq!(shards.len(), 4); // 2000 / shard_size (500)
    }

    #[test]
    fn test_convenience_functions() {
        let data = create_test_data(1500);

        let result = process_allocations_fast(&data);
        assert!(result.is_ok());

        let config = ParallelShardConfig {
            shard_size: 300,
            ..Default::default()
        };
        let result = process_allocations_with_config(&data, config);
        assert!(result.is_ok());

        let (shards, _) = result.expect("Test operation failed");
        assert_eq!(shards.len(), 5); // 1500 / shard_size (300)
    }

    #[test]
    fn test_processed_shard_structure() {
        let data = create_test_data(100);
        let processor = ParallelShardProcessor::default();

        let result = processor.process_allocations_parallel(&data);
        assert!(result.is_ok());

        let (shards, _) = result.expect("Failed to process allocations");
        assert_eq!(shards.len(), 1);

        let shard = &shards[0];
        assert_eq!(shard.allocation_count, 100);
        assert_eq!(shard.shard_index, 0);
        assert!(!shard.data.is_empty());

        // The serialized bytes must round-trip as a JSON array of
        // AllocationInfo with the original element count.
        let parsed: Result<Vec<AllocationInfo>, _> = serde_json::from_slice(&shard.data);
        assert!(parsed.is_ok());
        assert_eq!(parsed.expect("Failed to parse JSON").len(), 100);
    }
}