memscope_rs/export/
batch_processor.rs

1//! Batch processor for optimized unsafe/FFI data processing
2//!
3//! This module provides high-performance batch processing capabilities for
4//! large datasets of unsafe and FFI memory allocations, with support for
5//! parallel processing and performance monitoring.
6
7use crate::analysis::unsafe_ffi_tracker::{
8    AllocationSource, BoundaryEvent, EnhancedAllocationInfo, LibCHookInfo, MemoryPassport,
9    RiskAssessment, RiskLevel,
10};
11use crate::core::types::{TrackingError, TrackingResult};
12use rayon::prelude::*;
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex};
16use std::time::Instant;
17
/// Configuration for batch processing operations.
///
/// Passed to `BatchProcessor::with_config`; the `Default` implementation supplies
/// the values used by `BatchProcessor::new`.
#[derive(Debug, Clone)]
pub struct BatchProcessorConfig {
    /// Size of each processing batch: the parallel code paths split their input
    /// into chunks of this many items.
    pub batch_size: usize,
    /// Threshold for enabling parallel processing: inputs with at least this many
    /// items take the parallel path, smaller inputs are processed sequentially.
    pub parallel_threshold: usize,
    /// Maximum number of threads to use for parallel processing. `Some(n)` makes
    /// the processor build its own rayon thread pool with `n` threads; with `None`
    /// no dedicated pool is created.
    pub max_threads: Option<usize>,
    /// Enable performance monitoring.
    // NOTE(review): no code visible in this part of the file reads this flag — confirm where it is honoured.
    pub enable_monitoring: bool,
    /// Memory usage limit per batch (in bytes).
    // NOTE(review): no code visible in this part of the file enforces this limit — confirm where it is honoured.
    pub memory_limit_per_batch: Option<usize>,
}
32
33impl Default for BatchProcessorConfig {
34    fn default() -> Self {
35        Self {
36            batch_size: 1000,
37            parallel_threshold: 5000,
38            max_threads: None, // Use system default
39            enable_monitoring: true,
40            memory_limit_per_batch: Some(64 * 1024 * 1024), // 64MB per batch
41        }
42    }
43}
44
/// Performance metrics for batch processing.
///
/// A `BatchProcessor` keeps one instance behind a mutex; `get_metrics` returns a
/// clone of it and `reset_metrics` restores the initial values (all zero, with
/// `threads_used` at 1). In the code visible here only
/// `process_unsafe_allocations` writes to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchProcessingMetrics {
    /// Total number of items processed (the length of the input slice of the
    /// most recent run).
    pub total_items: usize,
    /// Number of batches processed
    pub batch_count: usize,
    /// Total processing time in milliseconds
    pub total_processing_time_ms: u64,
    /// Average processing time per batch in milliseconds (total time divided by
    /// a batch count derived from the item count and the configured batch size).
    pub avg_batch_time_ms: f64,
    /// Peak memory usage during processing
    pub peak_memory_usage_bytes: usize,
    /// Whether parallel processing was used
    pub parallel_processing_used: bool,
    /// Number of threads used: 1 for a sequential run, otherwise the thread
    /// count of the dedicated pool or of rayon's current pool.
    pub threads_used: usize,
    /// Processing throughput (items per second); 0.0 when the elapsed time is zero.
    pub throughput_items_per_sec: f64,
}
65
/// Processed unsafe allocation data.
///
/// Result of `BatchProcessor::process_unsafe_allocations`.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessedUnsafeData {
    /// Total number of unsafe allocations. Filled with the length of the input
    /// slice, so it also counts input entries that were skipped because their
    /// source is not unsafe Rust.
    pub total_allocations: usize,
    /// Total memory allocated in unsafe blocks (sum of `size` over `allocations`).
    pub total_memory: usize,
    /// Risk distribution across all allocations
    pub risk_distribution: RiskDistribution,
    /// Information about unsafe blocks, one entry per distinct block location.
    pub unsafe_blocks: Vec<UnsafeBlockInfo>,
    /// Processed unsafe allocations
    pub allocations: Vec<ProcessedUnsafeAllocation>,
    /// Performance metrics for processing
    pub performance_metrics: UnsafePerformanceMetrics,
}
82
/// Processed FFI allocation data.
///
/// Result of `BatchProcessor::process_ffi_allocations`.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessedFFIData {
    /// Total number of FFI allocations. Filled with the length of the input
    /// slice, so it also counts input entries that were skipped because their
    /// source is not an FFI call.
    pub total_allocations: usize,
    /// Total memory allocated through FFI (sum of `size` over `allocations`).
    pub total_memory: usize,
    /// Libraries involved in FFI operations
    pub libraries_involved: Vec<LibraryInfo>,
    /// Hook statistics
    pub hook_statistics: HookStatistics,
    /// Processed FFI allocations
    pub allocations: Vec<ProcessedFFIAllocation>,
    /// Performance metrics for processing
    pub performance_metrics: FFIPerformanceMetrics,
}
99
/// Processed boundary event data.
///
/// Result of `BatchProcessor::process_boundary_events`.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessedBoundaryData {
    /// Total number of boundary crossings (equals the length of `events`).
    pub total_crossings: usize,
    /// Transfer patterns analysis
    pub transfer_patterns: TransferPatterns,
    /// Risk analysis for boundary operations
    pub risk_analysis: BoundaryRiskAnalysis,
    /// Processed boundary events
    pub events: Vec<ProcessedBoundaryEvent>,
    /// Performance impact analysis
    pub performance_impact: BoundaryPerformanceImpact,
}
114
/// Risk distribution statistics.
///
/// Each processed unsafe allocation is counted in exactly one of the four
/// buckets, according to the risk level of its assessment.
#[derive(Debug, Clone, Serialize)]
pub struct RiskDistribution {
    /// Number of low risk allocations
    pub low_risk: usize,
    /// Number of medium risk allocations
    pub medium_risk: usize,
    /// Number of high risk allocations
    pub high_risk: usize,
    /// Number of critical risk allocations
    pub critical_risk: usize,
    /// Overall risk score (0.0 to 10.0): the mean of the per-allocation weights
    /// Low = 2.0, Medium = 5.0, High = 8.0, Critical = 10.0, or 0.0 when there
    /// are no allocations.
    pub overall_risk_score: f64,
}
129
/// Information about an unsafe block.
///
/// Aggregates every processed allocation that reports the same unsafe block
/// location.
#[derive(Debug, Clone, Serialize)]
pub struct UnsafeBlockInfo {
    /// Location of the unsafe block (also the grouping key).
    pub location: String,
    /// Number of allocations in this block
    pub allocation_count: usize,
    /// Total memory allocated in this block
    pub total_memory: usize,
    /// Risk level of this block: the highest level found among its allocations,
    /// starting from `Low`.
    pub risk_level: RiskLevel,
    /// Functions called within this block: the distinct function names taken
    /// from the allocations' call stacks, in first-seen order.
    pub functions_called: Vec<String>,
}
144
/// Processed unsafe allocation.
///
/// Export-friendly view of one tracked allocation whose source is unsafe Rust.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessedUnsafeAllocation {
    /// Memory pointer (as hex string, lower-case with a `0x` prefix).
    pub ptr: String,
    /// Allocation size
    pub size: usize,
    /// Type name if available
    pub type_name: Option<String>,
    /// Unsafe block location
    pub unsafe_block_location: String,
    /// Call stack information: the function name of each recorded frame.
    pub call_stack: Vec<String>,
    /// Risk assessment (cloned from the allocation source).
    pub risk_assessment: RiskAssessment,
    /// Lifetime information
    pub lifetime_info: LifetimeInfo,
    /// Memory layout information. The sequential processor always fills this
    /// with estimated values rather than measured ones.
    pub memory_layout: Option<MemoryLayoutInfo>,
}
165
/// Library information for FFI operations.
///
/// Aggregates the processed FFI allocations that share a library name.
// NOTE(review): the aggregation routine is only partly visible in this part of the
// file; how `functions_used` and `avg_allocation_size` are finalised should be
// confirmed there.
#[derive(Debug, Clone, Serialize)]
pub struct LibraryInfo {
    /// Name of the library (also the grouping key).
    pub name: String,
    /// Number of allocations from this library
    pub allocation_count: usize,
    /// Total memory allocated by this library
    pub total_memory: usize,
    /// Functions used from this library
    pub functions_used: Vec<String>,
    /// Average allocation size
    pub avg_allocation_size: usize,
}
180
/// Hook statistics.
///
/// Summary computed from the processed FFI allocations and stored in
/// `ProcessedFFIData::hook_statistics`.
// NOTE(review): the routine that fills these fields is not visible in this part of
// the file; the units and ranges below restate the field names only.
#[derive(Debug, Clone, Serialize)]
pub struct HookStatistics {
    /// Total number of hooks installed
    pub total_hooks: usize,
    /// Hook success rate
    pub success_rate: f64,
    /// Average hook overhead in nanoseconds
    pub avg_overhead_ns: f64,
    /// Hook methods used (method name mapped to a count).
    pub methods_used: HashMap<String, usize>,
}
193
/// Processed FFI allocation.
///
/// Export-friendly view of one tracked allocation whose source is an FFI call.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessedFFIAllocation {
    /// Memory pointer (as hex string, lower-case with a `0x` prefix).
    pub ptr: String,
    /// Allocation size
    pub size: usize,
    /// Library name
    pub library_name: String,
    /// Function name
    pub function_name: String,
    /// Call stack information: the function name of each recorded frame.
    pub call_stack: Vec<String>,
    /// Hook information (cloned from the allocation source).
    pub hook_info: LibCHookInfo,
    /// Ownership information
    pub ownership_info: OwnershipInfo,
    /// Interop metadata
    pub interop_metadata: InteropMetadata,
}
214
/// Transfer patterns analysis.
///
/// Summary computed from the processed boundary events and stored in
/// `ProcessedBoundaryData::transfer_patterns`.
// NOTE(review): the routine that fills these fields is not visible in this part of
// the file; confirm the exact meaning of each value there.
#[derive(Debug, Clone, Serialize)]
pub struct TransferPatterns {
    /// Most common transfer direction
    pub dominant_direction: String,
    /// Transfer frequency by type
    pub frequency_by_type: HashMap<String, usize>,
    /// Average transfer size
    pub avg_transfer_size: usize,
    /// Peak transfer activity time
    pub peak_activity_time: Option<u128>,
}
227
/// Boundary risk analysis.
///
/// Summary computed from the processed boundary events and stored in
/// `ProcessedBoundaryData::risk_analysis`.
// NOTE(review): the routine that fills these fields is not visible in this part of
// the file; the score range in particular should be confirmed there.
#[derive(Debug, Clone, Serialize)]
pub struct BoundaryRiskAnalysis {
    /// Overall boundary risk score
    pub overall_risk_score: f64,
    /// High risk transfer count
    pub high_risk_transfers: usize,
    /// Common risk patterns
    pub common_risk_patterns: Vec<String>,
    /// Mitigation recommendations
    pub mitigation_recommendations: Vec<String>,
}
240
/// Processed boundary event.
///
/// Export-friendly view of one cross-boundary event together with data from the
/// allocation it belongs to.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessedBoundaryEvent {
    /// Event identifier, built as `boundary_<allocation pointer in hex>_<event timestamp>`.
    pub event_id: String,
    /// Event type (the `Debug` rendering of the source event's type).
    pub event_type: String,
    /// Timestamp (copied from the source event).
    pub timestamp: u128,
    /// Source context information
    pub from_context: ContextInfo,
    /// Destination context information
    pub to_context: ContextInfo,
    /// Memory passport information (cloned from the owning allocation).
    pub memory_passport: Option<MemoryPassport>,
    /// Risk factors. The sequential processor records the single fixed entry
    /// "Cross-boundary transfer".
    pub risk_factors: Vec<String>,
}
259
/// Context information for boundary events.
///
/// Describes one side (source or destination) of a boundary crossing.
#[derive(Debug, Clone, Serialize)]
pub struct ContextInfo {
    /// Context name (Rust/FFI), copied from the source event.
    pub name: String,
    /// Function or module name. The sequential processor always stores "unknown".
    pub function: String,
    /// Additional metadata. The sequential processor leaves this map empty.
    pub metadata: HashMap<String, String>,
}
270
/// Performance metrics for unsafe operations.
///
/// Timing covers the conversion of the input allocations only; the statistics
/// derived afterwards are not included in the measured interval.
#[derive(Debug, Clone, Serialize)]
pub struct UnsafePerformanceMetrics {
    /// Processing time for unsafe analysis (elapsed wall-clock time, whole milliseconds).
    pub processing_time_ms: u64,
    /// Memory usage during processing. This is an estimate derived from the
    /// number of input items, not a measurement.
    pub memory_usage_bytes: usize,
    /// Number of risk assessments performed (the number of processed allocations).
    pub risk_assessments_performed: usize,
    /// Average risk assessment time: elapsed nanoseconds divided by the number
    /// of processed allocations, or 0.0 when there are none.
    pub avg_risk_assessment_time_ns: f64,
}
283
/// Performance metrics for FFI operations.
///
/// Timing covers the conversion of the input allocations only; the statistics
/// derived afterwards are not included in the measured interval.
#[derive(Debug, Clone, Serialize)]
pub struct FFIPerformanceMetrics {
    /// Processing time for FFI analysis (elapsed wall-clock time, whole milliseconds).
    pub processing_time_ms: u64,
    /// Memory usage during processing. This is an estimate derived from the
    /// number of input items, not a measurement.
    pub memory_usage_bytes: usize,
    /// Number of hook operations processed (the number of processed allocations).
    pub hook_operations_processed: usize,
    /// Average hook processing time: elapsed nanoseconds divided by the number
    /// of processed allocations, or 0.0 when there are none.
    pub avg_hook_processing_time_ns: f64,
}
296
/// Performance impact of boundary operations.
///
/// Produced by `BatchProcessor::process_boundary_events`.
#[derive(Debug, Clone, Serialize)]
pub struct BoundaryPerformanceImpact {
    /// Total boundary processing time (elapsed wall-clock time, whole milliseconds).
    pub total_processing_time_ms: u64,
    /// Average time per boundary crossing: elapsed nanoseconds divided by the
    /// number of processed events, or 0.0 when there are none.
    pub avg_crossing_time_ns: f64,
    /// Performance overhead percentage. Currently a fixed estimate (5.0), not a
    /// measured value.
    pub overhead_percentage: f64,
    /// Optimization opportunities. Currently a fixed list of two generic hints.
    pub optimization_opportunities: Vec<String>,
}
309
/// Lifetime information for allocations.
///
/// Derived from the allocation and deallocation timestamps of the tracked
/// allocation.
#[derive(Debug, Clone, Serialize)]
pub struct LifetimeInfo {
    /// Allocation timestamp (the tracked timestamp widened to `u128`).
    pub allocated_at: u128,
    /// Deallocation timestamp (if deallocated)
    pub deallocated_at: Option<u128>,
    /// Lifetime duration in nanoseconds: the difference between the
    /// deallocation and allocation timestamps multiplied by 1_000_000; `None`
    /// when no deallocation timestamp is recorded.
    // NOTE(review): the 1_000_000 factor implies the tracked timestamps are in
    // milliseconds — confirm against the tracker.
    pub lifetime_ns: Option<u64>,
    /// Scope information: the recorded scope name, or "unknown" when absent.
    pub scope: String,
}
322
/// Memory layout information.
///
/// The sequential unsafe processor fills this with placeholder estimates
/// (alignment 8, no padding, efficiency 0.9) next to the real allocation size.
#[derive(Debug, Clone, Serialize)]
pub struct MemoryLayoutInfo {
    /// Total size of the allocation
    pub total_size: usize,
    /// Memory alignment
    pub alignment: usize,
    /// Padding information
    pub padding_bytes: usize,
    /// Layout efficiency score
    pub efficiency_score: f64,
}
335
/// Ownership information for FFI allocations.
///
/// Attached to every `ProcessedFFIAllocation`.
#[derive(Debug, Clone, Serialize)]
pub struct OwnershipInfo {
    /// Current owner context. The sequential FFI processor always stores "FFI".
    pub owner_context: String,
    /// Owner function (the FFI function name of the allocation).
    pub owner_function: String,
    /// Transfer timestamp (the allocation timestamp widened to `u128`).
    pub transfer_timestamp: u128,
    /// Expected lifetime. The sequential FFI processor always stores `None`.
    pub expected_lifetime: Option<u128>,
}
348
/// Interop metadata for FFI operations.
///
/// The sequential FFI processor fills every field with fixed descriptive text;
/// nothing here is derived from the individual allocation.
#[derive(Debug, Clone, Serialize)]
pub struct InteropMetadata {
    /// Data marshalling information
    pub marshalling_info: String,
    /// Type conversion details
    pub type_conversion: String,
    /// Performance impact
    pub performance_impact: String,
    /// Safety considerations
    pub safety_considerations: Vec<String>,
}
361
/// High-performance batch processor for unsafe/FFI data.
///
/// Converts tracked allocations into export-friendly records, switching from
/// sequential to chunked parallel processing once the input reaches the
/// configured threshold.
pub struct BatchProcessor {
    /// Configuration for batch processing
    config: BatchProcessorConfig,
    /// Performance monitoring data, shared behind a mutex so it can be updated
    /// through `&self`.
    metrics: Arc<Mutex<BatchProcessingMetrics>>,
    /// Thread pool for parallel processing. `Some` only when the configuration
    /// sets `max_threads`; otherwise parallel work runs without a dedicated pool.
    thread_pool: Option<rayon::ThreadPool>,
}
371
372impl BatchProcessor {
373    /// Create a new batch processor with default configuration
374    pub fn new() -> Self {
375        Self::with_config(BatchProcessorConfig::default())
376    }
377
378    /// Create a new batch processor with custom configuration
379    pub fn with_config(config: BatchProcessorConfig) -> Self {
380        let thread_pool = if let Some(max_threads) = config.max_threads {
381            Some(
382                rayon::ThreadPoolBuilder::new()
383                    .num_threads(max_threads)
384                    .build()
385                    .expect("Failed to create thread pool"),
386            )
387        } else {
388            None
389        };
390
391        let metrics = Arc::new(Mutex::new(BatchProcessingMetrics {
392            total_items: 0,
393            batch_count: 0,
394            total_processing_time_ms: 0,
395            avg_batch_time_ms: 0.0,
396            peak_memory_usage_bytes: 0,
397            parallel_processing_used: false,
398            threads_used: 1,
399            throughput_items_per_sec: 0.0,
400        }));
401
402        Self {
403            config,
404            metrics,
405            thread_pool,
406        }
407    }
408
409    /// Process unsafe allocations in batches
410    pub fn process_unsafe_allocations(
411        &self,
412        allocations: &[EnhancedAllocationInfo],
413    ) -> TrackingResult<ProcessedUnsafeData> {
414        let start_time = Instant::now();
415        let use_parallel = allocations.len() >= self.config.parallel_threshold;
416
417        // Update metrics
418        if let Ok(mut metrics) = self.metrics.lock() {
419            metrics.total_items = allocations.len();
420            metrics.parallel_processing_used = use_parallel;
421            metrics.threads_used = if use_parallel {
422                self.thread_pool
423                    .as_ref()
424                    .map(|p| p.current_num_threads())
425                    .unwrap_or_else(|| rayon::current_num_threads())
426            } else {
427                1
428            };
429        }
430
431        let processed_allocations = if use_parallel {
432            self.process_unsafe_parallel(allocations)?
433        } else {
434            self.process_unsafe_sequential(allocations)?
435        };
436
437        let processing_time = start_time.elapsed();
438
439        // Calculate statistics
440        let total_memory: usize = processed_allocations.iter().map(|a| a.size).sum();
441        let risk_distribution = self.calculate_risk_distribution(&processed_allocations);
442        let unsafe_blocks = self.analyze_unsafe_blocks(&processed_allocations);
443
444        let performance_metrics = UnsafePerformanceMetrics {
445            processing_time_ms: processing_time.as_millis() as u64,
446            memory_usage_bytes: self.estimate_memory_usage(allocations.len()),
447            risk_assessments_performed: processed_allocations.len(),
448            avg_risk_assessment_time_ns: if processed_allocations.is_empty() {
449                0.0
450            } else {
451                processing_time.as_nanos() as f64 / processed_allocations.len() as f64
452            },
453        };
454
455        // Update final metrics
456        if let Ok(mut metrics) = self.metrics.lock() {
457            metrics.total_processing_time_ms = processing_time.as_millis() as u64;
458            metrics.avg_batch_time_ms = processing_time.as_millis() as f64
459                / ((allocations.len() / self.config.batch_size).max(1)) as f64;
460            metrics.throughput_items_per_sec = if processing_time.as_secs_f64() > 0.0 {
461                allocations.len() as f64 / processing_time.as_secs_f64()
462            } else {
463                0.0
464            };
465        }
466
467        Ok(ProcessedUnsafeData {
468            total_allocations: allocations.len(),
469            total_memory,
470            risk_distribution,
471            unsafe_blocks,
472            allocations: processed_allocations,
473            performance_metrics,
474        })
475    }
476
477    /// Process FFI allocations in batches
478    pub fn process_ffi_allocations(
479        &self,
480        allocations: &[EnhancedAllocationInfo],
481    ) -> TrackingResult<ProcessedFFIData> {
482        let start_time = Instant::now();
483        let use_parallel = allocations.len() >= self.config.parallel_threshold;
484
485        let processed_allocations = if use_parallel {
486            self.process_ffi_parallel(allocations)?
487        } else {
488            self.process_ffi_sequential(allocations)?
489        };
490
491        let processing_time = start_time.elapsed();
492
493        // Calculate statistics
494        let total_memory: usize = processed_allocations.iter().map(|a| a.size).sum();
495        let libraries_involved = self.analyze_libraries(&processed_allocations);
496        let hook_statistics = self.calculate_hook_statistics(&processed_allocations);
497
498        let performance_metrics = FFIPerformanceMetrics {
499            processing_time_ms: processing_time.as_millis() as u64,
500            memory_usage_bytes: self.estimate_memory_usage(allocations.len()),
501            hook_operations_processed: processed_allocations.len(),
502            avg_hook_processing_time_ns: if processed_allocations.is_empty() {
503                0.0
504            } else {
505                processing_time.as_nanos() as f64 / processed_allocations.len() as f64
506            },
507        };
508
509        Ok(ProcessedFFIData {
510            total_allocations: allocations.len(),
511            total_memory,
512            libraries_involved,
513            hook_statistics,
514            allocations: processed_allocations,
515            performance_metrics,
516        })
517    }
518
519    /// Process boundary events in batches
520    pub fn process_boundary_events(
521        &self,
522        allocations: &[EnhancedAllocationInfo],
523    ) -> TrackingResult<ProcessedBoundaryData> {
524        let start_time = Instant::now();
525
526        // Extract all boundary events from allocations
527        let mut all_events = Vec::new();
528        for allocation in allocations {
529            for event in &allocation.cross_boundary_events {
530                all_events.push((allocation, event));
531            }
532        }
533
534        let use_parallel = all_events.len() >= self.config.parallel_threshold;
535
536        let processed_events = if use_parallel {
537            self.process_boundary_parallel(&all_events)?
538        } else {
539            self.process_boundary_sequential(&all_events)?
540        };
541
542        let processing_time = start_time.elapsed();
543
544        // Calculate statistics
545        let transfer_patterns = self.analyze_transfer_patterns(&processed_events);
546        let risk_analysis = self.analyze_boundary_risks(&processed_events);
547
548        let performance_impact = BoundaryPerformanceImpact {
549            total_processing_time_ms: processing_time.as_millis() as u64,
550            avg_crossing_time_ns: if processed_events.is_empty() {
551                0.0
552            } else {
553                processing_time.as_nanos() as f64 / processed_events.len() as f64
554            },
555            overhead_percentage: 5.0, // Estimated overhead
556            optimization_opportunities: vec![
557                "Reduce boundary crossings".to_string(),
558                "Batch transfer operations".to_string(),
559            ],
560        };
561
562        Ok(ProcessedBoundaryData {
563            total_crossings: processed_events.len(),
564            transfer_patterns,
565            risk_analysis,
566            events: processed_events,
567            performance_impact,
568        })
569    }
570
571    /// Get current processing metrics
572    pub fn get_metrics(&self) -> TrackingResult<BatchProcessingMetrics> {
573        self.metrics
574            .lock()
575            .map(|m| m.clone())
576            .map_err(|e| TrackingError::LockError(e.to_string()))
577    }
578
579    /// Reset processing metrics
580    pub fn reset_metrics(&self) -> TrackingResult<()> {
581        if let Ok(mut metrics) = self.metrics.lock() {
582            *metrics = BatchProcessingMetrics {
583                total_items: 0,
584                batch_count: 0,
585                total_processing_time_ms: 0,
586                avg_batch_time_ms: 0.0,
587                peak_memory_usage_bytes: 0,
588                parallel_processing_used: false,
589                threads_used: 1,
590                throughput_items_per_sec: 0.0,
591            };
592        }
593        Ok(())
594    }
595}
596impl BatchProcessor {
597    /// Process unsafe allocations sequentially
598    fn process_unsafe_sequential(
599        &self,
600        allocations: &[EnhancedAllocationInfo],
601    ) -> TrackingResult<Vec<ProcessedUnsafeAllocation>> {
602        let mut processed = Vec::with_capacity(allocations.len());
603
604        for allocation in allocations {
605            if let AllocationSource::UnsafeRust {
606                unsafe_block_location,
607                call_stack,
608                risk_assessment,
609            } = &allocation.source
610            {
611                processed.push(ProcessedUnsafeAllocation {
612                    ptr: format!("0x{:x}", allocation.base.ptr),
613                    size: allocation.base.size,
614                    type_name: allocation.base.type_name.clone(),
615                    unsafe_block_location: unsafe_block_location.clone(),
616                    call_stack: call_stack.iter().map(|f| f.function_name.clone()).collect(),
617                    risk_assessment: risk_assessment.clone(),
618                    lifetime_info: LifetimeInfo {
619                        allocated_at: allocation.base.timestamp_alloc as u128,
620                        deallocated_at: allocation.base.timestamp_dealloc.map(|t| t as u128),
621                        lifetime_ns: allocation
622                            .base
623                            .timestamp_dealloc
624                            .map(|dealloc| (dealloc - allocation.base.timestamp_alloc) * 1_000_000),
625                        scope: allocation
626                            .base
627                            .scope_name
628                            .clone()
629                            .unwrap_or_else(|| "unknown".to_string()),
630                    },
631                    memory_layout: Some(MemoryLayoutInfo {
632                        total_size: allocation.base.size,
633                        alignment: 8,          // Default alignment
634                        padding_bytes: 0,      // Simplified
635                        efficiency_score: 0.9, // Estimated
636                    }),
637                });
638            }
639        }
640
641        Ok(processed)
642    }
643
644    /// Process unsafe allocations in parallel
645    fn process_unsafe_parallel(
646        &self,
647        allocations: &[EnhancedAllocationInfo],
648    ) -> TrackingResult<Vec<ProcessedUnsafeAllocation>> {
649        let processed: Result<Vec<_>, TrackingError> = if let Some(pool) = &self.thread_pool {
650            pool.install(|| {
651                allocations
652                    .par_chunks(self.config.batch_size)
653                    .map(|chunk| self.process_unsafe_sequential(chunk))
654                    .collect::<Result<Vec<_>, _>>()
655                    .map(|batches| batches.into_iter().flatten().collect())
656            })
657        } else {
658            allocations
659                .par_chunks(self.config.batch_size)
660                .map(|chunk| self.process_unsafe_sequential(chunk))
661                .collect::<Result<Vec<_>, _>>()
662                .map(|batches| batches.into_iter().flatten().collect())
663        };
664
665        processed
666    }
667
668    /// Process FFI allocations sequentially
669    fn process_ffi_sequential(
670        &self,
671        allocations: &[EnhancedAllocationInfo],
672    ) -> TrackingResult<Vec<ProcessedFFIAllocation>> {
673        let mut processed = Vec::with_capacity(allocations.len());
674
675        for allocation in allocations {
676            if let AllocationSource::FfiC {
677                library_name,
678                function_name,
679                call_stack,
680                libc_hook_info,
681            } = &allocation.source
682            {
683                processed.push(ProcessedFFIAllocation {
684                    ptr: format!("0x{:x}", allocation.base.ptr),
685                    size: allocation.base.size,
686                    library_name: library_name.clone(),
687                    function_name: function_name.clone(),
688                    call_stack: call_stack.iter().map(|f| f.function_name.clone()).collect(),
689                    hook_info: libc_hook_info.clone(),
690                    ownership_info: OwnershipInfo {
691                        owner_context: "FFI".to_string(),
692                        owner_function: function_name.clone(),
693                        transfer_timestamp: allocation.base.timestamp_alloc as u128,
694                        expected_lifetime: None,
695                    },
696                    interop_metadata: InteropMetadata {
697                        marshalling_info: "C-compatible".to_string(),
698                        type_conversion: "Direct".to_string(),
699                        performance_impact: "Low".to_string(),
700                        safety_considerations: vec![
701                            "Manual memory management required".to_string(),
702                            "Potential for memory leaks".to_string(),
703                        ],
704                    },
705                });
706            }
707        }
708
709        Ok(processed)
710    }
711
712    /// Process FFI allocations in parallel
713    fn process_ffi_parallel(
714        &self,
715        allocations: &[EnhancedAllocationInfo],
716    ) -> TrackingResult<Vec<ProcessedFFIAllocation>> {
717        let processed: Result<Vec<_>, TrackingError> = if let Some(pool) = &self.thread_pool {
718            pool.install(|| {
719                allocations
720                    .par_chunks(self.config.batch_size)
721                    .map(|chunk| self.process_ffi_sequential(chunk))
722                    .collect::<Result<Vec<_>, _>>()
723                    .map(|batches| batches.into_iter().flatten().collect())
724            })
725        } else {
726            allocations
727                .par_chunks(self.config.batch_size)
728                .map(|chunk| self.process_ffi_sequential(chunk))
729                .collect::<Result<Vec<_>, _>>()
730                .map(|batches| batches.into_iter().flatten().collect())
731        };
732
733        processed
734    }
735
736    /// Process boundary events sequentially
737    fn process_boundary_sequential(
738        &self,
739        events: &[(&EnhancedAllocationInfo, &BoundaryEvent)],
740    ) -> TrackingResult<Vec<ProcessedBoundaryEvent>> {
741        let mut processed = Vec::with_capacity(events.len());
742
743        for (allocation, event) in events {
744            processed.push(ProcessedBoundaryEvent {
745                event_id: format!("boundary_{:x}_{}", allocation.base.ptr, event.timestamp),
746                event_type: format!("{:?}", event.event_type),
747                timestamp: event.timestamp,
748                from_context: ContextInfo {
749                    name: event.from_context.clone(),
750                    function: "unknown".to_string(),
751                    metadata: HashMap::new(),
752                },
753                to_context: ContextInfo {
754                    name: event.to_context.clone(),
755                    function: "unknown".to_string(),
756                    metadata: HashMap::new(),
757                },
758                memory_passport: allocation.memory_passport.clone(),
759                risk_factors: vec!["Cross-boundary transfer".to_string()],
760            });
761        }
762
763        Ok(processed)
764    }
765
766    /// Process boundary events in parallel
767    fn process_boundary_parallel(
768        &self,
769        events: &[(&EnhancedAllocationInfo, &BoundaryEvent)],
770    ) -> TrackingResult<Vec<ProcessedBoundaryEvent>> {
771        let processed: Result<Vec<_>, TrackingError> = if let Some(pool) = &self.thread_pool {
772            pool.install(|| {
773                events
774                    .par_chunks(self.config.batch_size)
775                    .map(|chunk| self.process_boundary_sequential(chunk))
776                    .collect::<Result<Vec<_>, _>>()
777                    .map(|batches| batches.into_iter().flatten().collect())
778            })
779        } else {
780            events
781                .par_chunks(self.config.batch_size)
782                .map(|chunk| self.process_boundary_sequential(chunk))
783                .collect::<Result<Vec<_>, _>>()
784                .map(|batches| batches.into_iter().flatten().collect())
785        };
786
787        processed
788    }
789
790    /// Calculate risk distribution from processed allocations
791    fn calculate_risk_distribution(
792        &self,
793        allocations: &[ProcessedUnsafeAllocation],
794    ) -> RiskDistribution {
795        let mut low_risk = 0;
796        let mut medium_risk = 0;
797        let mut high_risk = 0;
798        let mut critical_risk = 0;
799        let mut total_risk_score = 0.0;
800
801        for allocation in allocations {
802            match allocation.risk_assessment.risk_level {
803                RiskLevel::Low => low_risk += 1,
804                RiskLevel::Medium => medium_risk += 1,
805                RiskLevel::High => high_risk += 1,
806                RiskLevel::Critical => critical_risk += 1,
807            }
808
809            // Calculate risk score based on level
810            let risk_score = match allocation.risk_assessment.risk_level {
811                RiskLevel::Low => 2.0,
812                RiskLevel::Medium => 5.0,
813                RiskLevel::High => 8.0,
814                RiskLevel::Critical => 10.0,
815            };
816            total_risk_score += risk_score;
817        }
818
819        let overall_risk_score = if allocations.is_empty() {
820            0.0
821        } else {
822            total_risk_score / allocations.len() as f64
823        };
824
825        RiskDistribution {
826            low_risk,
827            medium_risk,
828            high_risk,
829            critical_risk,
830            overall_risk_score,
831        }
832    }
833
834    /// Analyze unsafe blocks from processed allocations
835    fn analyze_unsafe_blocks(
836        &self,
837        allocations: &[ProcessedUnsafeAllocation],
838    ) -> Vec<UnsafeBlockInfo> {
839        let mut blocks: HashMap<String, UnsafeBlockInfo> = HashMap::new();
840
841        for allocation in allocations {
842            let entry = blocks
843                .entry(allocation.unsafe_block_location.clone())
844                .or_insert_with(|| UnsafeBlockInfo {
845                    location: allocation.unsafe_block_location.clone(),
846                    allocation_count: 0,
847                    total_memory: 0,
848                    risk_level: RiskLevel::Low,
849                    functions_called: Vec::new(),
850                });
851
852            entry.allocation_count += 1;
853            entry.total_memory += allocation.size;
854
855            // Update risk level to highest found
856            if matches!(allocation.risk_assessment.risk_level, RiskLevel::Critical) {
857                entry.risk_level = RiskLevel::Critical;
858            } else if matches!(allocation.risk_assessment.risk_level, RiskLevel::High)
859                && !matches!(entry.risk_level, RiskLevel::Critical)
860            {
861                entry.risk_level = RiskLevel::High;
862            } else if matches!(allocation.risk_assessment.risk_level, RiskLevel::Medium)
863                && matches!(entry.risk_level, RiskLevel::Low)
864            {
865                entry.risk_level = RiskLevel::Medium;
866            }
867
868            // Add unique functions from call stack
869            for func in &allocation.call_stack {
870                if !entry.functions_called.contains(func) {
871                    entry.functions_called.push(func.clone());
872                }
873            }
874        }
875
876        blocks.into_values().collect()
877    }
878
879    /// Analyze libraries from processed FFI allocations
880    fn analyze_libraries(&self, allocations: &[ProcessedFFIAllocation]) -> Vec<LibraryInfo> {
881        let mut libraries: HashMap<String, LibraryInfo> = HashMap::new();
882
883        for allocation in allocations {
884            let entry = libraries
885                .entry(allocation.library_name.clone())
886                .or_insert_with(|| LibraryInfo {
887                    name: allocation.library_name.clone(),
888                    allocation_count: 0,
889                    total_memory: 0,
890                    functions_used: Vec::new(),
891                    avg_allocation_size: 0,
892                });
893
894            entry.allocation_count += 1;
895            entry.total_memory += allocation.size;
896
897            if !entry.functions_used.contains(&allocation.function_name) {
898                entry.functions_used.push(allocation.function_name.clone());
899            }
900        }
901
902        // Calculate average allocation sizes
903        for library in libraries.values_mut() {
904            library.avg_allocation_size = if library.allocation_count > 0 {
905                library.total_memory / library.allocation_count
906            } else {
907                0
908            };
909        }
910
911        libraries.into_values().collect()
912    }
913
914    /// Calculate hook statistics from processed FFI allocations
915    fn calculate_hook_statistics(&self, allocations: &[ProcessedFFIAllocation]) -> HookStatistics {
916        let mut methods_used = HashMap::new();
917        let mut total_overhead = 0.0;
918        let mut overhead_count = 0;
919
920        for allocation in allocations {
921            let method_name = format!("{:?}", allocation.hook_info.hook_method);
922            *methods_used.entry(method_name).or_insert(0) += 1;
923
924            if let Some(overhead) = allocation.hook_info.hook_overhead_ns {
925                total_overhead += overhead as f64;
926                overhead_count += 1;
927            }
928        }
929
930        let avg_overhead_ns = if overhead_count > 0 {
931            total_overhead / overhead_count as f64
932        } else {
933            0.0
934        };
935
936        HookStatistics {
937            total_hooks: allocations.len(),
938            success_rate: 0.95, // Estimated success rate
939            avg_overhead_ns,
940            methods_used,
941        }
942    }
943
944    /// Analyze transfer patterns from boundary events
945    fn analyze_transfer_patterns(&self, events: &[ProcessedBoundaryEvent]) -> TransferPatterns {
946        let mut frequency_by_type = HashMap::new();
947        let mut total_size = 0;
948        let mut rust_to_ffi = 0;
949        let mut ffi_to_rust = 0;
950
951        for event in events {
952            *frequency_by_type
953                .entry(event.event_type.clone())
954                .or_insert(0) += 1;
955
956            if event.from_context.name.contains("Rust") && event.to_context.name.contains("FFI") {
957                rust_to_ffi += 1;
958            } else if event.from_context.name.contains("FFI")
959                && event.to_context.name.contains("Rust")
960            {
961                ffi_to_rust += 1;
962            }
963
964            // Estimate transfer size (simplified)
965            total_size += 64; // Average estimated size
966        }
967
968        let dominant_direction = if rust_to_ffi > ffi_to_rust {
969            "Rust -> FFI".to_string()
970        } else if ffi_to_rust > rust_to_ffi {
971            "FFI -> Rust".to_string()
972        } else {
973            "Balanced".to_string()
974        };
975
976        let avg_transfer_size = if events.is_empty() {
977            0
978        } else {
979            total_size / events.len()
980        };
981
982        TransferPatterns {
983            dominant_direction,
984            frequency_by_type,
985            avg_transfer_size,
986            peak_activity_time: None, // Could be calculated from timestamps
987        }
988    }
989
990    /// Analyze boundary risks from processed events
991    fn analyze_boundary_risks(&self, events: &[ProcessedBoundaryEvent]) -> BoundaryRiskAnalysis {
992        let high_risk_transfers = events.iter().filter(|e| e.risk_factors.len() > 1).count();
993
994        let overall_risk_score = if events.is_empty() {
995            0.0
996        } else {
997            (high_risk_transfers as f64 / events.len() as f64) * 10.0
998        };
999
1000        BoundaryRiskAnalysis {
1001            overall_risk_score,
1002            high_risk_transfers,
1003            common_risk_patterns: vec![
1004                "Unvalidated pointer transfer".to_string(),
1005                "Size mismatch potential".to_string(),
1006                "Ownership ambiguity".to_string(),
1007            ],
1008            mitigation_recommendations: vec![
1009                "Implement pointer validation".to_string(),
1010                "Add size checks at boundaries".to_string(),
1011                "Clarify ownership semantics".to_string(),
1012            ],
1013        }
1014    }
1015
1016    /// Estimate memory usage for processing
1017    fn estimate_memory_usage(&self, item_count: usize) -> usize {
1018        // Rough estimate: 1KB per item for processing overhead
1019        item_count * 1024
1020    }
1021}
1022
1023impl Default for BatchProcessor {
1024    fn default() -> Self {
1025        Self::new()
1026    }
1027}