memscope_rs/export/
batch_processor.rs

1//! Batch processor for optimized unsafe/FFI data processing
2//!
3//! This module provides high-performance batch processing capabilities for
4//! large datasets of unsafe and FFI memory allocations, with support for
5//! parallel processing and performance monitoring.
6
7use crate::analysis::unsafe_ffi_tracker::{
8    AllocationSource, BoundaryEvent, EnhancedAllocationInfo, LibCHookInfo, MemoryPassport,
9    RiskAssessment, RiskLevel,
10};
11use crate::core::types::{TrackingError, TrackingResult};
12
13use rayon::prelude::*;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::{Arc, Mutex};
17use std::time::Instant;
18
/// Tunable settings that control how a batch processor splits and schedules work.
#[derive(Debug, Clone)]
pub struct BatchProcessorConfig {
    /// Number of items grouped into one processing batch.
    pub batch_size: usize,
    /// Input length at or above which the parallel path is taken.
    pub parallel_threshold: usize,
    /// Thread count for a dedicated pool; `None` means no dedicated pool is
    /// built and the default rayon pool is used.
    pub max_threads: Option<usize>,
    /// Whether performance monitoring is switched on.
    pub enable_monitoring: bool,
    /// Optional per-batch memory ceiling, expressed in bytes.
    pub memory_limit_per_batch: Option<usize>,
}
33
34impl Default for BatchProcessorConfig {
35    fn default() -> Self {
36        Self {
37            batch_size: 1000,
38            parallel_threshold: 5000,
39            max_threads: None, // Use system default
40            enable_monitoring: true,
41            memory_limit_per_batch: Some(64 * 1024 * 1024), // 64MB per batch
42        }
43    }
44}
45
46/// Performance metrics for batch processing
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct BatchProcessingMetrics {
49    /// Total number of items processed
50    pub total_items: usize,
51    /// Number of batches processed
52    pub batch_count: usize,
53    /// Total processing time in milliseconds
54    pub total_processing_time_ms: u64,
55    /// Average processing time per batch in milliseconds
56    pub avg_batch_time_ms: f64,
57    /// Peak memory usage during processing
58    pub peak_memory_usage_bytes: usize,
59    /// Whether parallel processing was used
60    pub parallel_processing_used: bool,
61    /// Number of threads used
62    pub threads_used: usize,
63    /// Processing throughput (items per second)
64    pub throughput_items_per_sec: f64,
65}
66
67/// Processed unsafe allocation data
68#[derive(Debug, Clone, Serialize)]
69pub struct ProcessedUnsafeData {
70    /// Total number of unsafe allocations
71    pub total_allocations: usize,
72    /// Total memory allocated in unsafe blocks
73    pub total_memory: usize,
74    /// Risk distribution across all allocations
75    pub risk_distribution: RiskDistribution,
76    /// Information about unsafe blocks
77    pub unsafe_blocks: Vec<UnsafeBlockInfo>,
78    /// Processed unsafe allocations
79    pub allocations: Vec<ProcessedUnsafeAllocation>,
80    /// Performance metrics for processing
81    pub performance_metrics: UnsafePerformanceMetrics,
82}
83
84/// Processed FFI allocation data
85#[derive(Debug, Clone, Serialize)]
86pub struct ProcessedFFIData {
87    /// Total number of FFI allocations
88    pub total_allocations: usize,
89    /// Total memory allocated through FFI
90    pub total_memory: usize,
91    /// Libraries involved in FFI operations
92    pub libraries_involved: Vec<LibraryInfo>,
93    /// Hook statistics
94    pub hook_statistics: HookStatistics,
95    /// Processed FFI allocations
96    pub allocations: Vec<ProcessedFFIAllocation>,
97    /// Performance metrics for processing
98    pub performance_metrics: FFIPerformanceMetrics,
99}
100
101/// Processed boundary event data
102#[derive(Debug, Clone, Serialize)]
103pub struct ProcessedBoundaryData {
104    /// Total number of boundary crossings
105    pub total_crossings: usize,
106    /// Transfer patterns analysis
107    pub transfer_patterns: TransferPatterns,
108    /// Risk analysis for boundary operations
109    pub risk_analysis: BoundaryRiskAnalysis,
110    /// Processed boundary events
111    pub events: Vec<ProcessedBoundaryEvent>,
112    /// Performance impact analysis
113    pub performance_impact: BoundaryPerformanceImpact,
114}
115
116/// Risk distribution statistics
117#[derive(Debug, Clone, Serialize)]
118pub struct RiskDistribution {
119    /// Number of low risk allocations
120    pub low_risk: usize,
121    /// Number of medium risk allocations
122    pub medium_risk: usize,
123    /// Number of high risk allocations
124    pub high_risk: usize,
125    /// Number of critical risk allocations
126    pub critical_risk: usize,
127    /// Overall risk score (0.0 to 10.0)
128    pub overall_risk_score: f64,
129}
130
131/// Information about an unsafe block
132#[derive(Debug, Clone, Serialize)]
133pub struct UnsafeBlockInfo {
134    /// Location of the unsafe block
135    pub location: String,
136    /// Number of allocations in this block
137    pub allocation_count: usize,
138    /// Total memory allocated in this block
139    pub total_memory: usize,
140    /// Risk level of this block
141    pub risk_level: RiskLevel,
142    /// Functions called within this block
143    pub functions_called: Vec<String>,
144}
145
146/// Processed unsafe allocation
147#[derive(Debug, Clone, Serialize)]
148pub struct ProcessedUnsafeAllocation {
149    /// Memory pointer (as hex string)
150    pub ptr: String,
151    /// Allocation size
152    pub size: usize,
153    /// Type name if available
154    pub type_name: Option<String>,
155    /// Unsafe block location
156    pub unsafe_block_location: String,
157    /// Call stack information
158    pub call_stack: Vec<String>,
159    /// Risk assessment
160    pub risk_assessment: RiskAssessment,
161    /// Lifetime information
162    pub lifetime_info: LifetimeInfo,
163    /// Memory layout information
164    pub memory_layout: Option<MemoryLayoutInfo>,
165}
166
167/// Library information for FFI operations
168#[derive(Debug, Clone, Serialize)]
169pub struct LibraryInfo {
170    /// Name of the library
171    pub name: String,
172    /// Number of allocations from this library
173    pub allocation_count: usize,
174    /// Total memory allocated by this library
175    pub total_memory: usize,
176    /// Functions used from this library
177    pub functions_used: Vec<String>,
178    /// Average allocation size
179    pub avg_allocation_size: usize,
180}
181
182/// Hook statistics
183#[derive(Debug, Clone, Serialize)]
184pub struct HookStatistics {
185    /// Total number of hooks installed
186    pub total_hooks: usize,
187    /// Hook success rate
188    pub success_rate: f64,
189    /// Average hook overhead in nanoseconds
190    pub avg_overhead_ns: f64,
191    /// Hook methods used
192    pub methods_used: HashMap<String, usize>,
193}
194
195/// Processed FFI allocation
196#[derive(Debug, Clone, Serialize)]
197pub struct ProcessedFFIAllocation {
198    /// Memory pointer (as hex string)
199    pub ptr: String,
200    /// Allocation size
201    pub size: usize,
202    /// Library name
203    pub library_name: String,
204    /// Function name
205    pub function_name: String,
206    /// Call stack information
207    pub call_stack: Vec<String>,
208    /// Hook information
209    pub hook_info: LibCHookInfo,
210    /// Ownership information
211    pub ownership_info: OwnershipInfo,
212    /// Interop metadata
213    pub interop_metadata: InteropMetadata,
214}
215
216/// Transfer patterns analysis
217#[derive(Debug, Clone, Serialize)]
218pub struct TransferPatterns {
219    /// Most common transfer direction
220    pub dominant_direction: String,
221    /// Transfer frequency by type
222    pub frequency_by_type: HashMap<String, usize>,
223    /// Average transfer size
224    pub avg_transfer_size: usize,
225    /// Peak transfer activity time
226    pub peak_activity_time: Option<u128>,
227}
228
229/// Boundary risk analysis
230#[derive(Debug, Clone, Serialize)]
231pub struct BoundaryRiskAnalysis {
232    /// Overall boundary risk score
233    pub overall_risk_score: f64,
234    /// High risk transfer count
235    pub high_risk_transfers: usize,
236    /// Common risk patterns
237    pub common_risk_patterns: Vec<String>,
238    /// Mitigation recommendations
239    pub mitigation_recommendations: Vec<String>,
240}
241
242/// Processed boundary event
243#[derive(Debug, Clone, Serialize)]
244pub struct ProcessedBoundaryEvent {
245    /// Event identifier
246    pub event_id: String,
247    /// Event type
248    pub event_type: String,
249    /// Timestamp
250    pub timestamp: u128,
251    /// Source context information
252    pub from_context: ContextInfo,
253    /// Destination context information
254    pub to_context: ContextInfo,
255    /// Memory passport information
256    pub memory_passport: Option<MemoryPassport>,
257    /// Risk factors
258    pub risk_factors: Vec<String>,
259}
260
261/// Context information for boundary events
262#[derive(Debug, Clone, Serialize)]
263pub struct ContextInfo {
264    /// Context name (Rust/FFI)
265    pub name: String,
266    /// Function or module name
267    pub function: String,
268    /// Additional metadata
269    pub metadata: HashMap<String, String>,
270}
271
272/// Performance metrics for unsafe operations
273#[derive(Debug, Clone, Serialize)]
274pub struct UnsafePerformanceMetrics {
275    /// Processing time for unsafe analysis
276    pub processing_time_ms: u64,
277    /// Memory usage during processing
278    pub memory_usage_bytes: usize,
279    /// Number of risk assessments performed
280    pub risk_assessments_performed: usize,
281    /// Average risk assessment time
282    pub avg_risk_assessment_time_ns: f64,
283}
284
285/// Performance metrics for FFI operations
286#[derive(Debug, Clone, Serialize)]
287pub struct FFIPerformanceMetrics {
288    /// Processing time for FFI analysis
289    pub processing_time_ms: u64,
290    /// Memory usage during processing
291    pub memory_usage_bytes: usize,
292    /// Number of hook operations processed
293    pub hook_operations_processed: usize,
294    /// Average hook processing time
295    pub avg_hook_processing_time_ns: f64,
296}
297
298/// Performance impact of boundary operations
299#[derive(Debug, Clone, Serialize)]
300pub struct BoundaryPerformanceImpact {
301    /// Total boundary processing time
302    pub total_processing_time_ms: u64,
303    /// Average time per boundary crossing
304    pub avg_crossing_time_ns: f64,
305    /// Performance overhead percentage
306    pub overhead_percentage: f64,
307    /// Optimization opportunities
308    pub optimization_opportunities: Vec<String>,
309}
310
311/// Lifetime information for allocations
312#[derive(Debug, Clone, Serialize)]
313pub struct LifetimeInfo {
314    /// Allocation timestamp
315    pub allocated_at: u128,
316    /// Deallocation timestamp (if deallocated)
317    pub deallocated_at: Option<u128>,
318    /// Lifetime duration in nanoseconds
319    pub lifetime_ns: Option<u64>,
320    /// Scope information
321    pub scope: String,
322}
323
324/// Memory layout information
325#[derive(Debug, Clone, Serialize)]
326pub struct MemoryLayoutInfo {
327    /// Total size of the allocation
328    pub total_size: usize,
329    /// Memory alignment
330    pub alignment: usize,
331    /// Padding information
332    pub padding_bytes: usize,
333    /// Layout efficiency score
334    pub efficiency_score: f64,
335}
336
337/// Ownership information for FFI allocations
338#[derive(Debug, Clone, Serialize)]
339pub struct OwnershipInfo {
340    /// Current owner context
341    pub owner_context: String,
342    /// Owner function
343    pub owner_function: String,
344    /// Transfer timestamp
345    pub transfer_timestamp: u128,
346    /// Expected lifetime
347    pub expected_lifetime: Option<u128>,
348}
349
350/// Interop metadata for FFI operations
351#[derive(Debug, Clone, Serialize)]
352pub struct InteropMetadata {
353    /// Data marshalling information
354    pub marshalling_info: String,
355    /// Type conversion details
356    pub type_conversion: String,
357    /// Performance impact
358    pub performance_impact: String,
359    /// Safety considerations
360    pub safety_considerations: Vec<String>,
361}
362
363/// High-performance batch processor for unsafe/FFI data
364pub struct BatchProcessor {
365    /// Configuration for batch processing
366    config: BatchProcessorConfig,
367    /// Performance monitoring data
368    metrics: Arc<Mutex<BatchProcessingMetrics>>,
369    /// Thread pool for parallel processing
370    thread_pool: Option<rayon::ThreadPool>,
371}
372
373impl BatchProcessor {
374    /// Create a new batch processor with default configuration
375    pub fn new() -> Self {
376        Self::with_config(BatchProcessorConfig::default())
377    }
378
379    /// Create a new batch processor with custom configuration
380    pub fn with_config(config: BatchProcessorConfig) -> Self {
381        let thread_pool = config.max_threads.map(|max_threads| {
382            rayon::ThreadPoolBuilder::new()
383                .num_threads(max_threads)
384                .build()
385                .expect("Failed to create thread pool")
386        });
387
388        let metrics = Arc::new(Mutex::new(BatchProcessingMetrics {
389            total_items: 0,
390            batch_count: 0,
391            total_processing_time_ms: 0,
392            avg_batch_time_ms: 0.0,
393            peak_memory_usage_bytes: 0,
394            parallel_processing_used: false,
395            threads_used: 1,
396            throughput_items_per_sec: 0.0,
397        }));
398
399        Self {
400            config,
401            metrics,
402            thread_pool,
403        }
404    }
405
406    /// Process unsafe allocations in batches
407    pub fn process_unsafe_allocations(
408        &self,
409        allocations: &[EnhancedAllocationInfo],
410    ) -> TrackingResult<ProcessedUnsafeData> {
411        let start_time = Instant::now();
412        let use_parallel = allocations.len() >= self.config.parallel_threshold;
413
414        // Update metrics
415        if let Ok(mut metrics) = self.metrics.lock() {
416            metrics.total_items = allocations.len();
417            metrics.parallel_processing_used = use_parallel;
418            metrics.threads_used = if use_parallel {
419                self.thread_pool
420                    .as_ref()
421                    .map(|p| p.current_num_threads())
422                    .unwrap_or_else(rayon::current_num_threads)
423            } else {
424                1
425            };
426        }
427
428        let processed_allocations = if use_parallel {
429            self.process_unsafe_parallel(allocations)?
430        } else {
431            self.process_unsafe_sequential(allocations)?
432        };
433
434        let processing_time = start_time.elapsed();
435
436        // Calculate statistics
437        let total_memory: usize = processed_allocations.iter().map(|a| a.size).sum();
438        let risk_distribution = self.calculate_risk_distribution(&processed_allocations);
439        let unsafe_blocks = self.analyze_unsafe_blocks(&processed_allocations);
440
441        let performance_metrics = UnsafePerformanceMetrics {
442            processing_time_ms: processing_time.as_millis() as u64,
443            memory_usage_bytes: self.estimate_memory_usage(allocations.len()),
444            risk_assessments_performed: processed_allocations.len(),
445            avg_risk_assessment_time_ns: if processed_allocations.is_empty() {
446                0.0
447            } else {
448                processing_time.as_nanos() as f64 / processed_allocations.len() as f64
449            },
450        };
451
452        // Update final metrics
453        if let Ok(mut metrics) = self.metrics.lock() {
454            metrics.total_processing_time_ms = processing_time.as_millis() as u64;
455            metrics.avg_batch_time_ms = processing_time.as_millis() as f64
456                / ((allocations.len() / self.config.batch_size).max(1)) as f64;
457            metrics.throughput_items_per_sec = if processing_time.as_secs_f64() > 0.0 {
458                allocations.len() as f64 / processing_time.as_secs_f64()
459            } else {
460                0.0
461            };
462        }
463
464        Ok(ProcessedUnsafeData {
465            total_allocations: allocations.len(),
466            total_memory,
467            risk_distribution,
468            unsafe_blocks,
469            allocations: processed_allocations,
470            performance_metrics,
471        })
472    }
473
474    /// Process FFI allocations in batches
475    pub fn process_ffi_allocations(
476        &self,
477        allocations: &[EnhancedAllocationInfo],
478    ) -> TrackingResult<ProcessedFFIData> {
479        let start_time = Instant::now();
480        let use_parallel = allocations.len() >= self.config.parallel_threshold;
481
482        let processed_allocations = if use_parallel {
483            self.process_ffi_parallel(allocations)?
484        } else {
485            self.process_ffi_sequential(allocations)?
486        };
487
488        let processing_time = start_time.elapsed();
489
490        // Calculate statistics
491        let total_memory: usize = processed_allocations.iter().map(|a| a.size).sum();
492        let libraries_involved = self.analyze_libraries(&processed_allocations);
493        let hook_statistics = self.calculate_hook_statistics(&processed_allocations);
494
495        let performance_metrics = FFIPerformanceMetrics {
496            processing_time_ms: processing_time.as_millis() as u64,
497            memory_usage_bytes: self.estimate_memory_usage(allocations.len()),
498            hook_operations_processed: processed_allocations.len(),
499            avg_hook_processing_time_ns: if processed_allocations.is_empty() {
500                0.0
501            } else {
502                processing_time.as_nanos() as f64 / processed_allocations.len() as f64
503            },
504        };
505
506        Ok(ProcessedFFIData {
507            total_allocations: allocations.len(),
508            total_memory,
509            libraries_involved,
510            hook_statistics,
511            allocations: processed_allocations,
512            performance_metrics,
513        })
514    }
515
516    /// Process boundary events in batches
517    pub fn process_boundary_events(
518        &self,
519        allocations: &[EnhancedAllocationInfo],
520    ) -> TrackingResult<ProcessedBoundaryData> {
521        let start_time = Instant::now();
522
523        // Extract all boundary events from allocations
524        let mut all_events = Vec::new();
525        for allocation in allocations {
526            for event in &allocation.cross_boundary_events {
527                all_events.push((allocation, event));
528            }
529        }
530
531        let use_parallel = all_events.len() >= self.config.parallel_threshold;
532
533        let processed_events = if use_parallel {
534            self.process_boundary_parallel(&all_events)?
535        } else {
536            self.process_boundary_sequential(&all_events)?
537        };
538
539        let processing_time = start_time.elapsed();
540
541        // Calculate statistics
542        let transfer_patterns = self.analyze_transfer_patterns(&processed_events);
543        let risk_analysis = self.analyze_boundary_risks(&processed_events);
544
545        let performance_impact = BoundaryPerformanceImpact {
546            total_processing_time_ms: processing_time.as_millis() as u64,
547            avg_crossing_time_ns: if processed_events.is_empty() {
548                0.0
549            } else {
550                processing_time.as_nanos() as f64 / processed_events.len() as f64
551            },
552            overhead_percentage: 5.0, // Estimated overhead
553            optimization_opportunities: vec![
554                "Reduce boundary crossings".to_string(),
555                "Batch transfer operations".to_string(),
556            ],
557        };
558
559        Ok(ProcessedBoundaryData {
560            total_crossings: processed_events.len(),
561            transfer_patterns,
562            risk_analysis,
563            events: processed_events,
564            performance_impact,
565        })
566    }
567
568    /// Get current processing metrics
569    pub fn get_metrics(&self) -> TrackingResult<BatchProcessingMetrics> {
570        self.metrics
571            .lock()
572            .map(|m| m.clone())
573            .map_err(|e| TrackingError::LockError(e.to_string()))
574    }
575
576    /// Reset processing metrics
577    pub fn reset_metrics(&self) -> TrackingResult<()> {
578        if let Ok(mut metrics) = self.metrics.lock() {
579            *metrics = BatchProcessingMetrics {
580                total_items: 0,
581                batch_count: 0,
582                total_processing_time_ms: 0,
583                avg_batch_time_ms: 0.0,
584                peak_memory_usage_bytes: 0,
585                parallel_processing_used: false,
586                threads_used: 1,
587                throughput_items_per_sec: 0.0,
588            };
589        }
590        Ok(())
591    }
592}
593impl BatchProcessor {
594    /// Process unsafe allocations sequentially
595    fn process_unsafe_sequential(
596        &self,
597        allocations: &[EnhancedAllocationInfo],
598    ) -> TrackingResult<Vec<ProcessedUnsafeAllocation>> {
599        let mut processed = Vec::with_capacity(allocations.len());
600
601        for allocation in allocations {
602            if let AllocationSource::UnsafeRust {
603                unsafe_block_location,
604                call_stack,
605                risk_assessment,
606            } = &allocation.source
607            {
608                processed.push(ProcessedUnsafeAllocation {
609                    ptr: format!("0x{:x}", allocation.base.ptr),
610                    size: allocation.base.size,
611                    type_name: allocation.base.type_name.clone(),
612                    unsafe_block_location: unsafe_block_location.clone(),
613                    call_stack: call_stack
614                        .get_frames()
615                        .unwrap_or_default()
616                        .iter()
617                        .map(|f| f.function_name.clone())
618                        .collect(),
619                    risk_assessment: risk_assessment.clone(),
620                    lifetime_info: LifetimeInfo {
621                        allocated_at: allocation.base.timestamp_alloc as u128,
622                        deallocated_at: allocation.base.timestamp_dealloc.map(|t| t as u128),
623                        lifetime_ns: allocation
624                            .base
625                            .timestamp_dealloc
626                            .map(|dealloc| (dealloc - allocation.base.timestamp_alloc) * 1_000_000),
627                        scope: allocation
628                            .base
629                            .scope_name
630                            .clone()
631                            .unwrap_or_else(|| "unknown".to_string()),
632                    },
633                    memory_layout: Some(MemoryLayoutInfo {
634                        total_size: allocation.base.size,
635                        alignment: 8,          // Default alignment
636                        padding_bytes: 0,      // Simplified
637                        efficiency_score: 0.9, // Estimated
638                    }),
639                });
640            }
641        }
642
643        Ok(processed)
644    }
645
646    /// Process unsafe allocations in parallel
647    fn process_unsafe_parallel(
648        &self,
649        allocations: &[EnhancedAllocationInfo],
650    ) -> TrackingResult<Vec<ProcessedUnsafeAllocation>> {
651        let processed: Result<Vec<_>, TrackingError> = if let Some(pool) = &self.thread_pool {
652            pool.install(|| {
653                allocations
654                    .par_chunks(self.config.batch_size)
655                    .map(|chunk| self.process_unsafe_sequential(chunk))
656                    .collect::<Result<Vec<_>, _>>()
657                    .map(|batches| batches.into_iter().flatten().collect())
658            })
659        } else {
660            allocations
661                .par_chunks(self.config.batch_size)
662                .map(|chunk| self.process_unsafe_sequential(chunk))
663                .collect::<Result<Vec<_>, _>>()
664                .map(|batches| batches.into_iter().flatten().collect())
665        };
666
667        processed
668    }
669
670    /// Process FFI allocations sequentially
671    fn process_ffi_sequential(
672        &self,
673        allocations: &[EnhancedAllocationInfo],
674    ) -> TrackingResult<Vec<ProcessedFFIAllocation>> {
675        let mut processed = Vec::with_capacity(allocations.len());
676
677        for allocation in allocations {
678            if let AllocationSource::FfiC {
679                resolved_function,
680                call_stack,
681                libc_hook_info,
682            } = &allocation.source
683            {
684                processed.push(ProcessedFFIAllocation {
685                    ptr: format!("0x{:x}", allocation.base.ptr),
686                    size: allocation.base.size,
687                    library_name: resolved_function.library_name.clone(),
688                    function_name: resolved_function.function_name.clone(),
689                    call_stack: call_stack
690                        .get_frames()
691                        .unwrap_or_default()
692                        .iter()
693                        .map(|f| f.function_name.clone())
694                        .collect(),
695                    hook_info: libc_hook_info.clone(),
696                    ownership_info: OwnershipInfo {
697                        owner_context: "FFI".to_string(),
698                        owner_function: resolved_function.function_name.clone(),
699                        transfer_timestamp: allocation.base.timestamp_alloc as u128,
700                        expected_lifetime: None,
701                    },
702                    interop_metadata: InteropMetadata {
703                        marshalling_info: "C-compatible".to_string(),
704                        type_conversion: "Direct".to_string(),
705                        performance_impact: "Low".to_string(),
706                        safety_considerations: vec![
707                            "Manual memory management required".to_string(),
708                            "Potential for memory leaks".to_string(),
709                        ],
710                    },
711                });
712            }
713        }
714
715        Ok(processed)
716    }
717
718    /// Process FFI allocations in parallel
719    fn process_ffi_parallel(
720        &self,
721        allocations: &[EnhancedAllocationInfo],
722    ) -> TrackingResult<Vec<ProcessedFFIAllocation>> {
723        let processed: Result<Vec<_>, TrackingError> = if let Some(pool) = &self.thread_pool {
724            pool.install(|| {
725                allocations
726                    .par_chunks(self.config.batch_size)
727                    .map(|chunk| self.process_ffi_sequential(chunk))
728                    .collect::<Result<Vec<_>, _>>()
729                    .map(|batches| batches.into_iter().flatten().collect())
730            })
731        } else {
732            allocations
733                .par_chunks(self.config.batch_size)
734                .map(|chunk| self.process_ffi_sequential(chunk))
735                .collect::<Result<Vec<_>, _>>()
736                .map(|batches| batches.into_iter().flatten().collect())
737        };
738
739        processed
740    }
741
742    /// Process boundary events sequentially
743    fn process_boundary_sequential(
744        &self,
745        events: &[(&EnhancedAllocationInfo, &BoundaryEvent)],
746    ) -> TrackingResult<Vec<ProcessedBoundaryEvent>> {
747        let mut processed = Vec::with_capacity(events.len());
748
749        for (allocation, event) in events {
750            processed.push(ProcessedBoundaryEvent {
751                event_id: format!("boundary_{:x}_{}", allocation.base.ptr, event.timestamp),
752                event_type: format!("{:?}", event.event_type),
753                timestamp: event.timestamp,
754                from_context: ContextInfo {
755                    name: event.from_context.clone(),
756                    function: "unknown".to_string(),
757                    metadata: HashMap::new(),
758                },
759                to_context: ContextInfo {
760                    name: event.to_context.clone(),
761                    function: "unknown".to_string(),
762                    metadata: HashMap::new(),
763                },
764                memory_passport: allocation.memory_passport.clone(),
765                risk_factors: vec!["Cross-boundary transfer".to_string()],
766            });
767        }
768
769        Ok(processed)
770    }
771
772    /// Process boundary events in parallel
773    fn process_boundary_parallel(
774        &self,
775        events: &[(&EnhancedAllocationInfo, &BoundaryEvent)],
776    ) -> TrackingResult<Vec<ProcessedBoundaryEvent>> {
777        let processed: Result<Vec<_>, TrackingError> = if let Some(pool) = &self.thread_pool {
778            pool.install(|| {
779                events
780                    .par_chunks(self.config.batch_size)
781                    .map(|chunk| self.process_boundary_sequential(chunk))
782                    .collect::<Result<Vec<_>, _>>()
783                    .map(|batches| batches.into_iter().flatten().collect())
784            })
785        } else {
786            events
787                .par_chunks(self.config.batch_size)
788                .map(|chunk| self.process_boundary_sequential(chunk))
789                .collect::<Result<Vec<_>, _>>()
790                .map(|batches| batches.into_iter().flatten().collect())
791        };
792
793        processed
794    }
795
796    /// Calculate risk distribution from processed allocations
797    fn calculate_risk_distribution(
798        &self,
799        allocations: &[ProcessedUnsafeAllocation],
800    ) -> RiskDistribution {
801        let mut low_risk = 0;
802        let mut medium_risk = 0;
803        let mut high_risk = 0;
804        let mut critical_risk = 0;
805        let mut total_risk_score = 0.0;
806
807        for allocation in allocations {
808            match allocation.risk_assessment.risk_level {
809                RiskLevel::Low => low_risk += 1,
810                RiskLevel::Medium => medium_risk += 1,
811                RiskLevel::High => high_risk += 1,
812                RiskLevel::Critical => critical_risk += 1,
813            }
814
815            // Calculate risk score based on level
816            let risk_score = match allocation.risk_assessment.risk_level {
817                RiskLevel::Low => 2.0,
818                RiskLevel::Medium => 5.0,
819                RiskLevel::High => 8.0,
820                RiskLevel::Critical => 10.0,
821            };
822            total_risk_score += risk_score;
823        }
824
825        let overall_risk_score = if allocations.is_empty() {
826            0.0
827        } else {
828            total_risk_score / allocations.len() as f64
829        };
830
831        RiskDistribution {
832            low_risk,
833            medium_risk,
834            high_risk,
835            critical_risk,
836            overall_risk_score,
837        }
838    }
839
840    /// Analyze unsafe blocks from processed allocations
841    fn analyze_unsafe_blocks(
842        &self,
843        allocations: &[ProcessedUnsafeAllocation],
844    ) -> Vec<UnsafeBlockInfo> {
845        let mut blocks: HashMap<String, UnsafeBlockInfo> = HashMap::new();
846
847        for allocation in allocations {
848            let entry = blocks
849                .entry(allocation.unsafe_block_location.clone())
850                .or_insert_with(|| UnsafeBlockInfo {
851                    location: allocation.unsafe_block_location.clone(),
852                    allocation_count: 0,
853                    total_memory: 0,
854                    risk_level: RiskLevel::Low,
855                    functions_called: Vec::new(),
856                });
857
858            entry.allocation_count += 1;
859            entry.total_memory += allocation.size;
860
861            // Update risk level to highest found
862            if matches!(allocation.risk_assessment.risk_level, RiskLevel::Critical) {
863                entry.risk_level = RiskLevel::Critical;
864            } else if matches!(allocation.risk_assessment.risk_level, RiskLevel::High)
865                && !matches!(entry.risk_level, RiskLevel::Critical)
866            {
867                entry.risk_level = RiskLevel::High;
868            } else if matches!(allocation.risk_assessment.risk_level, RiskLevel::Medium)
869                && matches!(entry.risk_level, RiskLevel::Low)
870            {
871                entry.risk_level = RiskLevel::Medium;
872            }
873
874            // Add unique functions from call stack
875            for func in &allocation.call_stack {
876                if !entry.functions_called.contains(func) {
877                    entry.functions_called.push(func.clone());
878                }
879            }
880        }
881
882        blocks.into_values().collect()
883    }
884
885    /// Analyze libraries from processed FFI allocations
886    fn analyze_libraries(&self, allocations: &[ProcessedFFIAllocation]) -> Vec<LibraryInfo> {
887        let mut libraries: HashMap<String, LibraryInfo> = HashMap::new();
888
889        for allocation in allocations {
890            let entry = libraries
891                .entry(allocation.library_name.clone())
892                .or_insert_with(|| LibraryInfo {
893                    name: allocation.library_name.clone(),
894                    allocation_count: 0,
895                    total_memory: 0,
896                    functions_used: Vec::new(),
897                    avg_allocation_size: 0,
898                });
899
900            entry.allocation_count += 1;
901            entry.total_memory += allocation.size;
902
903            if !entry.functions_used.contains(&allocation.function_name) {
904                entry.functions_used.push(allocation.function_name.clone());
905            }
906        }
907
908        // Calculate average allocation sizes
909        for library in libraries.values_mut() {
910            library.avg_allocation_size = if library.allocation_count > 0 {
911                library.total_memory / library.allocation_count
912            } else {
913                0
914            };
915        }
916
917        libraries.into_values().collect()
918    }
919
920    /// Calculate hook statistics from processed FFI allocations
921    fn calculate_hook_statistics(&self, allocations: &[ProcessedFFIAllocation]) -> HookStatistics {
922        let mut methods_used = HashMap::new();
923        let mut total_overhead = 0.0;
924        let mut overhead_count = 0;
925
926        for allocation in allocations {
927            let method_name = format!("{:?}", allocation.hook_info.hook_method);
928            *methods_used.entry(method_name).or_insert(0) += 1;
929
930            if let Some(overhead) = allocation.hook_info.hook_overhead_ns {
931                total_overhead += overhead as f64;
932                overhead_count += 1;
933            }
934        }
935
936        let avg_overhead_ns = if overhead_count > 0 {
937            total_overhead / overhead_count as f64
938        } else {
939            0.0
940        };
941
942        HookStatistics {
943            total_hooks: allocations.len(),
944            success_rate: 0.95, // Estimated success rate
945            avg_overhead_ns,
946            methods_used,
947        }
948    }
949
950    /// Analyze transfer patterns from boundary events
951    fn analyze_transfer_patterns(&self, events: &[ProcessedBoundaryEvent]) -> TransferPatterns {
952        let mut frequency_by_type = HashMap::new();
953        let mut total_size = 0;
954        let mut rust_to_ffi = 0;
955        let mut ffi_to_rust = 0;
956
957        for event in events {
958            *frequency_by_type
959                .entry(event.event_type.clone())
960                .or_insert(0) += 1;
961
962            if event.from_context.name.contains("Rust") && event.to_context.name.contains("FFI") {
963                rust_to_ffi += 1;
964            } else if event.from_context.name.contains("FFI")
965                && event.to_context.name.contains("Rust")
966            {
967                ffi_to_rust += 1;
968            }
969
970            // Estimate transfer size (simplified)
971            total_size += 64; // Average estimated size
972        }
973
974        let dominant_direction = if rust_to_ffi > ffi_to_rust {
975            "Rust -> FFI".to_string()
976        } else if ffi_to_rust > rust_to_ffi {
977            "FFI -> Rust".to_string()
978        } else {
979            "Balanced".to_string()
980        };
981
982        let avg_transfer_size = if events.is_empty() {
983            0
984        } else {
985            total_size / events.len()
986        };
987
988        TransferPatterns {
989            dominant_direction,
990            frequency_by_type,
991            avg_transfer_size,
992            peak_activity_time: None, // Could be calculated from timestamps
993        }
994    }
995
996    /// Analyze boundary risks from processed events
997    fn analyze_boundary_risks(&self, events: &[ProcessedBoundaryEvent]) -> BoundaryRiskAnalysis {
998        let high_risk_transfers = events.iter().filter(|e| e.risk_factors.len() > 1).count();
999
1000        let overall_risk_score = if events.is_empty() {
1001            0.0
1002        } else {
1003            (high_risk_transfers as f64 / events.len() as f64) * 10.0
1004        };
1005
1006        BoundaryRiskAnalysis {
1007            overall_risk_score,
1008            high_risk_transfers,
1009            common_risk_patterns: vec![
1010                "Unvalidated pointer transfer".to_string(),
1011                "Size mismatch potential".to_string(),
1012                "Ownership ambiguity".to_string(),
1013            ],
1014            mitigation_recommendations: vec![
1015                "Implement pointer validation".to_string(),
1016                "Add size checks at boundaries".to_string(),
1017                "Clarify ownership semantics".to_string(),
1018            ],
1019        }
1020    }
1021
1022    /// Estimate memory usage for processing
1023    fn estimate_memory_usage(&self, item_count: usize) -> usize {
1024        // Rough estimate: 1KB per item for processing overhead
1025        item_count * 1024
1026    }
1027}
1028
1029impl Default for BatchProcessor {
1030    fn default() -> Self {
1031        Self::new()
1032    }
1033}
1034
1035#[cfg(test)]
1036mod tests {
1037    use super::*;
1038    use crate::analysis::ffi_function_resolver::ResolvedFfiFunction;
1039    use crate::analysis::unsafe_ffi_tracker::{
1040        AllocationMetadata, AllocationSource, BoundaryEvent, BoundaryEventType,
1041        EnhancedAllocationInfo, HookMethod, LibCHookInfo, MemoryProtectionFlags, RiskAssessment,
1042        RiskFactor, RiskFactorType, RiskLevel,
1043    };
1044    use crate::analysis::FfiFunctionCategory;
1045    use crate::core::types::AllocationInfo;
1046    use crate::core::CallStackRef;
1047    use std::time::SystemTime;
1048
1049    fn create_test_allocation_info(ptr: usize, size: usize, type_name: &str) -> AllocationInfo {
1050        AllocationInfo {
1051            ptr,
1052            size,
1053            var_name: Some("test_var".to_string()),
1054            type_name: Some(type_name.to_string()),
1055            scope_name: Some("test_scope".to_string()),
1056            timestamp_alloc: SystemTime::now()
1057                .duration_since(SystemTime::UNIX_EPOCH)
1058                .unwrap()
1059                .as_millis() as u64,
1060            timestamp_dealloc: None,
1061            thread_id: "test_thread".to_string(),
1062            borrow_count: 0,
1063            stack_trace: None,
1064            is_leaked: false,
1065            lifetime_ms: None,
1066            borrow_info: None,
1067            clone_info: None,
1068            ownership_history_available: false,
1069            smart_pointer_info: None,
1070            memory_layout: None,
1071            generic_info: None,
1072            dynamic_type_info: None,
1073            runtime_state: None,
1074            stack_allocation: None,
1075            temporary_object: None,
1076            fragmentation_analysis: None,
1077            generic_instantiation: None,
1078            type_relationships: None,
1079            type_usage: None,
1080            function_call_tracking: None,
1081            lifecycle_tracking: None,
1082            access_tracking: None,
1083            drop_chain_analysis: None,
1084        }
1085    }
1086
1087    fn create_test_unsafe_allocation(ptr: usize, size: usize) -> EnhancedAllocationInfo {
1088        let call_stack = CallStackRef::new(0, Some(1));
1089        EnhancedAllocationInfo {
1090            base: create_test_allocation_info(ptr, size, "TestType"),
1091            source: AllocationSource::UnsafeRust {
1092                unsafe_block_location: "test.rs:42".to_string(),
1093                call_stack: call_stack.clone(),
1094                risk_assessment: RiskAssessment {
1095                    risk_level: RiskLevel::Medium,
1096                    risk_factors: vec![RiskFactor {
1097                        factor_type: RiskFactorType::RawPointerDeref,
1098                        severity: 5.0,
1099                        description: "Raw pointer usage".to_string(),
1100                        source_location: Some("test.rs:42".to_string()),
1101                    }],
1102                    mitigation_suggestions: vec!["Add bounds checking".to_string()],
1103                    confidence_score: 0.8,
1104                    assessment_timestamp: SystemTime::now()
1105                        .duration_since(SystemTime::UNIX_EPOCH)
1106                        .unwrap()
1107                        .as_nanos(),
1108                },
1109            },
1110            call_stack,
1111            cross_boundary_events: Vec::new(),
1112            safety_violations: Vec::new(),
1113            ffi_tracked: false,
1114            memory_passport: None,
1115            ownership_history: None,
1116        }
1117    }
1118
1119    fn create_test_ffi_allocation(ptr: usize, size: usize) -> EnhancedAllocationInfo {
1120        let call_stack = CallStackRef::new(0, Some(1));
1121        EnhancedAllocationInfo {
1122            base: create_test_allocation_info(ptr, size, "CType"),
1123            source: AllocationSource::FfiC {
1124                resolved_function: ResolvedFfiFunction {
1125                    library_name: "libc".to_string(),
1126                    function_name: "malloc".to_string(),
1127                    signature: None,
1128                    category: FfiFunctionCategory::MemoryManagement,
1129                    risk_level: crate::analysis::FfiRiskLevel::Medium,
1130                    metadata: std::collections::HashMap::new(),
1131                },
1132                call_stack: call_stack.clone(),
1133                libc_hook_info: LibCHookInfo {
1134                    hook_method: HookMethod::DynamicLinker,
1135                    original_function: "malloc".to_string(),
1136                    hook_timestamp: SystemTime::now()
1137                        .duration_since(SystemTime::UNIX_EPOCH)
1138                        .unwrap()
1139                        .as_nanos(),
1140                    allocation_metadata: AllocationMetadata {
1141                        requested_size: size,
1142                        actual_size: size,
1143                        alignment: 8,
1144                        allocator_info: "libc".to_string(),
1145                        protection_flags: Some(MemoryProtectionFlags {
1146                            readable: true,
1147                            writable: true,
1148                            executable: false,
1149                            shared: false,
1150                        }),
1151                    },
1152                    hook_overhead_ns: Some(100),
1153                },
1154            },
1155            call_stack,
1156            cross_boundary_events: Vec::new(),
1157            safety_violations: Vec::new(),
1158            ffi_tracked: true,
1159            memory_passport: None,
1160            ownership_history: None,
1161        }
1162    }
1163
1164    fn create_test_allocation_with_boundary_events(
1165        ptr: usize,
1166        size: usize,
1167    ) -> EnhancedAllocationInfo {
1168        let mut allocation = create_test_unsafe_allocation(ptr, size);
1169        let call_stack = CallStackRef::new(0, Some(1));
1170        allocation.cross_boundary_events = vec![
1171            BoundaryEvent {
1172                event_type: BoundaryEventType::RustToFfi,
1173                timestamp: 1000,
1174                from_context: "Rust".to_string(),
1175                to_context: "FFI".to_string(),
1176                stack: call_stack.clone(),
1177            },
1178            BoundaryEvent {
1179                event_type: BoundaryEventType::FfiToRust,
1180                timestamp: 2000,
1181                from_context: "FFI".to_string(),
1182                to_context: "Rust".to_string(),
1183                stack: call_stack,
1184            },
1185        ];
1186        allocation
1187    }
1188
1189    #[test]
1190    fn test_batch_processor_config_default() {
1191        let config = BatchProcessorConfig::default();
1192        assert_eq!(config.batch_size, 1000);
1193        assert_eq!(config.parallel_threshold, 5000);
1194        assert!(config.max_threads.is_none());
1195        assert!(config.enable_monitoring);
1196        assert_eq!(config.memory_limit_per_batch, Some(64 * 1024 * 1024));
1197    }
1198
1199    #[test]
1200    fn test_batch_processor_creation() {
1201        let processor = BatchProcessor::new();
1202        assert!(processor.config.enable_monitoring);
1203        assert_eq!(processor.config.batch_size, 1000);
1204    }
1205
1206    #[test]
1207    fn test_batch_processor_with_custom_config() {
1208        let config = BatchProcessorConfig {
1209            batch_size: 500,
1210            parallel_threshold: 2000,
1211            max_threads: Some(4),
1212            enable_monitoring: false,
1213            memory_limit_per_batch: Some(32 * 1024 * 1024),
1214        };
1215
1216        let processor = BatchProcessor::with_config(config.clone());
1217        assert_eq!(processor.config.batch_size, 500);
1218        assert_eq!(processor.config.parallel_threshold, 2000);
1219        assert_eq!(processor.config.max_threads, Some(4));
1220        assert!(!processor.config.enable_monitoring);
1221    }
1222
1223    #[test]
1224    fn test_process_unsafe_allocations_empty() {
1225        let processor = BatchProcessor::new();
1226        let allocations = vec![];
1227
1228        let result = processor.process_unsafe_allocations(&allocations);
1229        assert!(result.is_ok());
1230
1231        let processed = result.unwrap();
1232        assert_eq!(processed.total_allocations, 0);
1233        assert_eq!(processed.total_memory, 0);
1234        assert_eq!(processed.allocations.len(), 0);
1235        assert_eq!(processed.risk_distribution.low_risk, 0);
1236    }
1237
1238    #[test]
1239    fn test_process_unsafe_allocations_single() {
1240        let processor = BatchProcessor::new();
1241        let allocations = vec![create_test_unsafe_allocation(0x1000, 64)];
1242
1243        let result = processor.process_unsafe_allocations(&allocations);
1244        assert!(result.is_ok());
1245
1246        let processed = result.unwrap();
1247        assert_eq!(processed.total_allocations, 1);
1248        assert_eq!(processed.total_memory, 64);
1249        assert_eq!(processed.allocations.len(), 1);
1250        assert_eq!(processed.risk_distribution.medium_risk, 1);
1251
1252        let allocation = &processed.allocations[0];
1253        assert_eq!(allocation.ptr, "0x1000");
1254        assert_eq!(allocation.size, 64);
1255        assert_eq!(allocation.unsafe_block_location, "test.rs:42");
1256    }
1257
1258    #[test]
1259    fn test_process_unsafe_allocations_multiple() {
1260        let processor = BatchProcessor::new();
1261        let allocations = vec![
1262            create_test_unsafe_allocation(0x1000, 64),
1263            create_test_unsafe_allocation(0x2000, 128),
1264            create_test_unsafe_allocation(0x3000, 256),
1265        ];
1266
1267        let result = processor.process_unsafe_allocations(&allocations);
1268        assert!(result.is_ok());
1269
1270        let processed = result.unwrap();
1271        assert_eq!(processed.total_allocations, 3);
1272        assert_eq!(processed.total_memory, 448); // 64 + 128 + 256
1273        assert_eq!(processed.allocations.len(), 3);
1274        assert_eq!(processed.risk_distribution.medium_risk, 3);
1275    }
1276
1277    #[test]
1278    fn test_process_ffi_allocations_empty() {
1279        let processor = BatchProcessor::new();
1280        let allocations = vec![];
1281
1282        let result = processor.process_ffi_allocations(&allocations);
1283        assert!(result.is_ok());
1284
1285        let processed = result.unwrap();
1286        assert_eq!(processed.total_allocations, 0);
1287        assert_eq!(processed.total_memory, 0);
1288        assert_eq!(processed.allocations.len(), 0);
1289        assert_eq!(processed.libraries_involved.len(), 0);
1290    }
1291
1292    #[test]
1293    fn test_process_ffi_allocations_single() {
1294        let processor = BatchProcessor::new();
1295        let allocations = vec![create_test_ffi_allocation(0x4000, 512)];
1296
1297        let result = processor.process_ffi_allocations(&allocations);
1298        assert!(result.is_ok());
1299
1300        let processed = result.unwrap();
1301        assert_eq!(processed.total_allocations, 1);
1302        assert_eq!(processed.total_memory, 512);
1303        assert_eq!(processed.allocations.len(), 1);
1304        assert_eq!(processed.libraries_involved.len(), 1);
1305
1306        let allocation = &processed.allocations[0];
1307        assert_eq!(allocation.ptr, "0x4000");
1308        assert_eq!(allocation.size, 512);
1309        assert_eq!(allocation.library_name, "libc");
1310        assert_eq!(allocation.function_name, "malloc");
1311
1312        let library = &processed.libraries_involved[0];
1313        assert_eq!(library.name, "libc");
1314        assert_eq!(library.allocation_count, 1);
1315        assert_eq!(library.total_memory, 512);
1316    }
1317
1318    #[test]
1319    fn test_process_ffi_allocations_multiple_libraries() {
1320        let processor = BatchProcessor::new();
1321        let alloc1 = create_test_ffi_allocation(0x4000, 512);
1322        let mut alloc2 = create_test_ffi_allocation(0x5000, 256);
1323
1324        // Change second allocation to different library
1325        if let AllocationSource::FfiC {
1326            resolved_function, ..
1327        } = &mut alloc2.source
1328        {
1329            resolved_function.library_name = "libssl".to_string();
1330            resolved_function.function_name = "OPENSSL_malloc".to_string();
1331        }
1332
1333        let allocations = vec![alloc1, alloc2];
1334
1335        let result = processor.process_ffi_allocations(&allocations);
1336        assert!(result.is_ok());
1337
1338        let processed = result.unwrap();
1339        assert_eq!(processed.total_allocations, 2);
1340        assert_eq!(processed.total_memory, 768); // 512 + 256
1341        assert_eq!(processed.allocations.len(), 2);
1342        assert_eq!(processed.libraries_involved.len(), 2);
1343
1344        let library_names: Vec<&String> = processed
1345            .libraries_involved
1346            .iter()
1347            .map(|l| &l.name)
1348            .collect();
1349        assert!(library_names.contains(&&"libc".to_string()));
1350        assert!(library_names.contains(&&"libssl".to_string()));
1351    }
1352
1353    #[test]
1354    fn test_process_boundary_events_empty() {
1355        let processor = BatchProcessor::new();
1356        let allocations = vec![];
1357
1358        let result = processor.process_boundary_events(&allocations);
1359        assert!(result.is_ok());
1360
1361        let processed = result.unwrap();
1362        assert_eq!(processed.total_crossings, 0);
1363        assert_eq!(processed.events.len(), 0);
1364        assert_eq!(processed.transfer_patterns.avg_transfer_size, 0);
1365    }
1366
1367    #[test]
1368    fn test_process_boundary_events_with_events() {
1369        let processor = BatchProcessor::new();
1370        let allocations = vec![
1371            create_test_allocation_with_boundary_events(0x6000, 128),
1372            create_test_allocation_with_boundary_events(0x7000, 256),
1373        ];
1374
1375        let result = processor.process_boundary_events(&allocations);
1376        assert!(result.is_ok());
1377
1378        let processed = result.unwrap();
1379        assert_eq!(processed.total_crossings, 4); // 2 allocations * 2 events each
1380        assert_eq!(processed.events.len(), 4);
1381        assert!(processed.transfer_patterns.avg_transfer_size > 0);
1382        assert!(processed.risk_analysis.overall_risk_score >= 0.0);
1383
1384        // Check that events are properly processed
1385        let event = &processed.events[0];
1386        assert!(event.event_id.contains("boundary_"));
1387        assert!(!event.event_type.is_empty());
1388        assert!(event.timestamp > 0);
1389    }
1390
1391    #[test]
1392    fn test_risk_distribution_calculation() {
1393        let processor = BatchProcessor::new();
1394
1395        // Create allocations with different risk levels
1396        let mut low_risk_alloc = create_test_unsafe_allocation(0x1000, 64);
1397        let mut high_risk_alloc = create_test_unsafe_allocation(0x2000, 128);
1398        let mut critical_risk_alloc = create_test_unsafe_allocation(0x3000, 256);
1399
1400        // Modify risk levels
1401        if let AllocationSource::UnsafeRust {
1402            risk_assessment, ..
1403        } = &mut low_risk_alloc.source
1404        {
1405            risk_assessment.risk_level = RiskLevel::Low;
1406        }
1407        if let AllocationSource::UnsafeRust {
1408            risk_assessment, ..
1409        } = &mut high_risk_alloc.source
1410        {
1411            risk_assessment.risk_level = RiskLevel::High;
1412        }
1413        if let AllocationSource::UnsafeRust {
1414            risk_assessment, ..
1415        } = &mut critical_risk_alloc.source
1416        {
1417            risk_assessment.risk_level = RiskLevel::Critical;
1418        }
1419
1420        let allocations = vec![low_risk_alloc, high_risk_alloc, critical_risk_alloc];
1421
1422        let result = processor.process_unsafe_allocations(&allocations);
1423        assert!(result.is_ok());
1424
1425        let processed = result.unwrap();
1426        assert_eq!(processed.risk_distribution.low_risk, 1);
1427        assert_eq!(processed.risk_distribution.medium_risk, 0);
1428        assert_eq!(processed.risk_distribution.high_risk, 1);
1429        assert_eq!(processed.risk_distribution.critical_risk, 1);
1430        assert!(processed.risk_distribution.overall_risk_score > 0.0);
1431    }
1432
1433    #[test]
1434    fn test_unsafe_blocks_analysis() {
1435        let processor = BatchProcessor::new();
1436
1437        let mut alloc1 = create_test_unsafe_allocation(0x1000, 64);
1438        let mut alloc2 = create_test_unsafe_allocation(0x2000, 128);
1439        let mut alloc3 = create_test_unsafe_allocation(0x3000, 256);
1440
1441        // Set different unsafe block locations
1442        if let AllocationSource::UnsafeRust {
1443            unsafe_block_location,
1444            ..
1445        } = &mut alloc1.source
1446        {
1447            *unsafe_block_location = "test.rs:10".to_string();
1448        }
1449        if let AllocationSource::UnsafeRust {
1450            unsafe_block_location,
1451            ..
1452        } = &mut alloc2.source
1453        {
1454            *unsafe_block_location = "test.rs:10".to_string(); // Same block
1455        }
1456        if let AllocationSource::UnsafeRust {
1457            unsafe_block_location,
1458            ..
1459        } = &mut alloc3.source
1460        {
1461            *unsafe_block_location = "test.rs:20".to_string(); // Different block
1462        }
1463
1464        let allocations = vec![alloc1, alloc2, alloc3];
1465
1466        let result = processor.process_unsafe_allocations(&allocations);
1467        assert!(result.is_ok());
1468
1469        let processed = result.unwrap();
1470        assert_eq!(processed.unsafe_blocks.len(), 2); // Two different blocks
1471
1472        // Find the block with 2 allocations
1473        let block_with_two = processed
1474            .unsafe_blocks
1475            .iter()
1476            .find(|b| b.allocation_count == 2)
1477            .expect("Should find block with 2 allocations");
1478
1479        assert_eq!(block_with_two.location, "test.rs:10");
1480        assert_eq!(block_with_two.total_memory, 192); // 64 + 128
1481    }
1482
1483    #[test]
1484    fn test_hook_statistics_calculation() {
1485        let processor = BatchProcessor::new();
1486
1487        let alloc1 = create_test_ffi_allocation(0x4000, 512);
1488        let mut alloc2 = create_test_ffi_allocation(0x5000, 256);
1489
1490        // Modify hook info
1491        let mut alloc1 = alloc1;
1492        if let AllocationSource::FfiC { libc_hook_info, .. } = &mut alloc1.source {
1493            libc_hook_info.hook_overhead_ns = Some(150);
1494        }
1495        if let AllocationSource::FfiC { libc_hook_info, .. } = &mut alloc2.source {
1496            libc_hook_info.hook_overhead_ns = Some(200);
1497            libc_hook_info.hook_method = HookMethod::LdPreload;
1498        }
1499
1500        let allocations = vec![alloc1, alloc2];
1501
1502        let result = processor.process_ffi_allocations(&allocations);
1503        assert!(result.is_ok());
1504
1505        let processed = result.unwrap();
1506        assert_eq!(processed.hook_statistics.total_hooks, 2);
1507        assert!(processed.hook_statistics.avg_overhead_ns > 0.0);
1508        assert_eq!(processed.hook_statistics.methods_used.len(), 2);
1509    }
1510
1511    #[test]
1512    fn test_transfer_patterns_analysis() {
1513        let processor = BatchProcessor::new();
1514        let allocations = vec![
1515            create_test_allocation_with_boundary_events(0x6000, 128),
1516            create_test_allocation_with_boundary_events(0x7000, 256),
1517        ];
1518
1519        let result = processor.process_boundary_events(&allocations);
1520        assert!(result.is_ok());
1521
1522        let processed = result.unwrap();
1523        let patterns = &processed.transfer_patterns;
1524
1525        assert!(!patterns.dominant_direction.is_empty());
1526        assert!(!patterns.frequency_by_type.is_empty());
1527        assert!(patterns.avg_transfer_size > 0);
1528    }
1529
1530    #[test]
1531    fn test_metrics_tracking() {
1532        let processor = BatchProcessor::new();
1533
1534        // Initial metrics should be empty
1535        let initial_metrics = processor.get_metrics().unwrap();
1536        assert_eq!(initial_metrics.total_items, 0);
1537        assert_eq!(initial_metrics.batch_count, 0);
1538
1539        // Process some allocations
1540        let allocations = vec![
1541            create_test_unsafe_allocation(0x1000, 64),
1542            create_test_unsafe_allocation(0x2000, 128),
1543        ];
1544
1545        let _result = processor.process_unsafe_allocations(&allocations);
1546
1547        // Check updated metrics
1548        let updated_metrics = processor.get_metrics().unwrap();
1549        assert_eq!(updated_metrics.total_items, 2);
1550        assert!(updated_metrics.throughput_items_per_sec > 0.0);
1551    }
1552
1553    #[test]
1554    fn test_metrics_reset() {
1555        let processor = BatchProcessor::new();
1556
1557        // Process some allocations to generate metrics
1558        let allocations = vec![create_test_unsafe_allocation(0x1000, 64)];
1559        let _result = processor.process_unsafe_allocations(&allocations);
1560
1561        // Verify metrics are not empty
1562        let metrics_before = processor.get_metrics().unwrap();
1563        assert!(metrics_before.total_items > 0);
1564
1565        // Reset metrics
1566        let reset_result = processor.reset_metrics();
1567        assert!(reset_result.is_ok());
1568
1569        // Verify metrics are reset
1570        let metrics_after = processor.get_metrics().unwrap();
1571        assert_eq!(metrics_after.total_items, 0);
1572        assert_eq!(metrics_after.total_processing_time_ms, 0);
1573    }
1574
1575    #[test]
1576    fn test_parallel_processing_threshold() {
1577        let config = BatchProcessorConfig {
1578            batch_size: 10,
1579            parallel_threshold: 5, // Low threshold for testing
1580            max_threads: Some(2),
1581            enable_monitoring: true,
1582            memory_limit_per_batch: None,
1583        };
1584
1585        let processor = BatchProcessor::with_config(config);
1586
1587        // Create enough allocations to trigger parallel processing
1588        let allocations: Vec<_> = (0..10)
1589            .map(|i| create_test_unsafe_allocation(0x1000 + i * 0x100, 64))
1590            .collect();
1591
1592        let result = processor.process_unsafe_allocations(&allocations);
1593        assert!(result.is_ok());
1594
1595        let processed = result.unwrap();
1596        assert_eq!(processed.total_allocations, 10);
1597
1598        // Check that parallel processing was used
1599        let metrics = processor.get_metrics().unwrap();
1600        assert!(metrics.parallel_processing_used);
1601        assert!(metrics.threads_used > 1);
1602    }
1603
1604    #[test]
1605    fn test_memory_usage_estimation() {
1606        let processor = BatchProcessor::new();
1607
1608        // Test the private method through public interface
1609        let allocations = vec![
1610            create_test_unsafe_allocation(0x1000, 64),
1611            create_test_unsafe_allocation(0x2000, 128),
1612        ];
1613
1614        let result = processor.process_unsafe_allocations(&allocations);
1615        assert!(result.is_ok());
1616
1617        let processed = result.unwrap();
1618        assert!(processed.performance_metrics.memory_usage_bytes > 0);
1619        // Should be roughly 2 * 1024 bytes for 2 items
1620        assert!(processed.performance_metrics.memory_usage_bytes >= 2048);
1621    }
1622
1623    #[test]
1624    fn test_boundary_risk_analysis() {
1625        let processor = BatchProcessor::new();
1626        let allocations = vec![create_test_allocation_with_boundary_events(0x6000, 128)];
1627
1628        let result = processor.process_boundary_events(&allocations);
1629        assert!(result.is_ok());
1630
1631        let processed = result.unwrap();
1632        let risk_analysis = &processed.risk_analysis;
1633
1634        assert!(risk_analysis.overall_risk_score >= 0.0);
1635        assert!(risk_analysis.overall_risk_score <= 10.0);
1636        assert!(!risk_analysis.common_risk_patterns.is_empty());
1637        assert!(!risk_analysis.mitigation_recommendations.is_empty());
1638    }
1639
1640    #[test]
1641    fn test_performance_metrics_structure() {
1642        let processor = BatchProcessor::new();
1643        let allocations = vec![create_test_unsafe_allocation(0x1000, 64)];
1644
1645        let result = processor.process_unsafe_allocations(&allocations);
1646        assert!(result.is_ok());
1647
1648        let processed = result.unwrap();
1649        let metrics = &processed.performance_metrics;
1650
1651        assert!(metrics.memory_usage_bytes > 0);
1652        assert_eq!(metrics.risk_assessments_performed, 1);
1653        assert!(metrics.avg_risk_assessment_time_ns > 0.0);
1654    }
1655
1656    #[test]
1657    fn test_default_implementation() {
1658        let processor1 = BatchProcessor::default();
1659        let processor2 = BatchProcessor::new();
1660
1661        // Both should have the same configuration
1662        assert_eq!(processor1.config.batch_size, processor2.config.batch_size);
1663        assert_eq!(
1664            processor1.config.parallel_threshold,
1665            processor2.config.parallel_threshold
1666        );
1667        assert_eq!(
1668            processor1.config.enable_monitoring,
1669            processor2.config.enable_monitoring
1670        );
1671    }
1672}