1use crate::export::binary::error::BinaryExportError;
8use crate::export::binary::field_parser::{FieldParser, PartialAllocationInfo};
9use crate::export::binary::selective_reader::AllocationField;
10use std::collections::HashSet;
11use std::io::{Read, Seek, SeekFrom};
12
13pub struct BatchProcessor {
15 field_parser: FieldParser,
17
18 config: BatchProcessorConfig,
20
21 stats: BatchProcessorStats,
23
24 record_buffer: Vec<u8>,
26
27 record_cache: Vec<CachedRecord>,
29}
30
/// Tunable settings for a `BatchProcessor`.
#[derive(Debug, Clone)]
pub struct BatchProcessorConfig {
    /// Number of record offsets grouped into one processing chunk.
    pub batch_size: usize,

    /// Capacity of the read buffer in bytes; also caps a single prefetch read.
    pub buffer_size: usize,

    /// Whether the prefetching entry point reads ahead before parsing.
    pub enable_prefetching: bool,

    /// Number of record offsets grouped into one prefetch chunk.
    pub prefetch_count: usize,

    /// Whether parsed records are remembered for later lookups.
    pub enable_record_caching: bool,

    /// Cache entry count at which an existing entry is evicted before a new
    /// one is stored.
    pub max_cache_size: usize,

    /// Not read by any code in this file.
    /// NOTE(review): presumably a hint for cache-friendly processing - confirm.
    pub optimize_cpu_cache: bool,

    /// Not read by any code in this file.
    /// NOTE(review): presumably selects memory-mapped reads - confirm.
    pub use_memory_mapping: bool,
}

impl Default for BatchProcessorConfig {
    /// Returns the stock settings: 1000-record batches, a 64 KiB buffer,
    /// prefetching in chunks of 100, a 5000-entry record cache, CPU cache
    /// optimisation on and memory mapping off.
    fn default() -> Self {
        const KIB: usize = 1024;

        BatchProcessorConfig {
            batch_size: 1000,
            buffer_size: 64 * KIB,
            enable_prefetching: true,
            prefetch_count: 100,
            enable_record_caching: true,
            max_cache_size: 5000,
            optimize_cpu_cache: true,
            use_memory_mapping: false,
        }
    }
}
73
/// Counters and timings collected while records are processed.
#[derive(Debug, Clone, Default)]
pub struct BatchProcessorStats {
    /// Number of completed batch calls.
    pub batches_processed: u64,

    /// Number of records returned by completed batch calls.
    pub records_processed: u64,

    /// Lookups answered from the record cache.
    pub cache_hits: u64,

    /// Lookups that had to be parsed from the reader.
    pub cache_misses: u64,

    /// Number of prefetch reads issued.
    pub prefetch_operations: u64,

    /// Wall-clock time spent in batch calls, in microseconds.
    pub total_processing_time_us: u64,

    /// Time attributed to reader I/O, in microseconds.
    pub io_time_us: u64,

    /// Time spent inside the field parser, in microseconds.
    pub parsing_time_us: u64,

    /// Bytes pulled in by prefetch reads.
    pub bytes_read: u64,

    /// Not updated by any code in this file.
    pub allocations_avoided: u64,
}

impl BatchProcessorStats {
    /// Cache hits as a percentage (0.0 to 100.0) of all cache lookups.
    ///
    /// Returns `0.0` when no lookup has been recorded.
    pub fn cache_hit_rate(&self) -> f64 {
        match self.cache_hits + self.cache_misses {
            0 => 0.0,
            lookups => (self.cache_hits as f64 / lookups as f64) * 100.0,
        }
    }

    /// Mean number of records per processed batch, or `0.0` before the first
    /// batch.
    pub fn avg_records_per_batch(&self) -> f64 {
        if self.batches_processed != 0 {
            self.records_processed as f64 / self.batches_processed as f64
        } else {
            0.0
        }
    }

    /// Records processed per second of recorded processing time, or `0.0`
    /// when no processing time has been recorded.
    pub fn processing_throughput(&self) -> f64 {
        let elapsed_us = self.total_processing_time_us;
        if elapsed_us != 0 {
            // Scale by 1e6 to convert "per microsecond" into "per second".
            (self.records_processed as f64 * 1_000_000.0) / elapsed_us as f64
        } else {
            0.0
        }
    }

    /// Bytes read per microsecond of recorded I/O time, or `0.0` when no I/O
    /// time has been recorded.
    pub fn io_efficiency(&self) -> f64 {
        match self.io_time_us {
            0 => 0.0,
            io_us => self.bytes_read as f64 / io_us as f64,
        }
    }
}
146
147#[derive(Debug, Clone)]
149struct CachedRecord {
150 offset: u64,
152
153 data: PartialAllocationInfo,
155
156 cached_at: std::time::Instant,
158
159 access_count: u32,
161}
162
163#[derive(Debug)]
165pub struct RecordBatch {
166 pub records: Vec<PartialAllocationInfo>,
168
169 pub metadata: BatchMetadata,
171}
172
173#[derive(Debug, Clone)]
175pub struct BatchMetadata {
176 pub start_offset: u64,
178
179 pub end_offset: u64,
181
182 pub record_count: usize,
184
185 pub total_size: u64,
187
188 pub parsed_fields: HashSet<AllocationField>,
190}
191
192impl BatchProcessor {
193 pub fn new() -> Self {
195 Self::with_config(BatchProcessorConfig::default())
196 }
197
198 pub fn with_config(config: BatchProcessorConfig) -> Self {
200 Self {
201 field_parser: FieldParser::new(),
202 config: config.clone(),
203 stats: BatchProcessorStats::default(),
204 record_buffer: Vec::with_capacity(config.buffer_size),
205 record_cache: Vec::new(),
206 }
207 }
208
209 pub fn process_batch<R: Read + Seek>(
211 &mut self,
212 reader: &mut R,
213 record_offsets: &[u64],
214 requested_fields: &HashSet<AllocationField>,
215 ) -> Result<RecordBatch, BinaryExportError> {
216 let start_time = std::time::Instant::now();
217
218 let mut sorted_offsets = record_offsets.to_vec();
220 sorted_offsets.sort_unstable();
221
222 let mut records = Vec::with_capacity(sorted_offsets.len());
223 let mut start_offset = u64::MAX;
224 let mut end_offset = 0u64;
225 let mut total_size = 0u64;
226
227 for chunk in sorted_offsets.chunks(self.config.batch_size) {
229 let chunk_records = self.process_record_chunk(reader, chunk, requested_fields)?;
230
231 for (offset, record) in chunk.iter().zip(chunk_records.iter()) {
232 start_offset = start_offset.min(*offset);
233 end_offset = end_offset.max(*offset);
234 total_size += self.estimate_record_size(record);
235 }
236
237 records.extend(chunk_records);
238 }
239
240 let metadata = BatchMetadata {
241 start_offset,
242 end_offset,
243 record_count: records.len(),
244 total_size,
245 parsed_fields: requested_fields.clone(),
246 };
247
248 self.stats.batches_processed += 1;
249 self.stats.records_processed += records.len() as u64;
250 self.stats.total_processing_time_us += start_time.elapsed().as_micros() as u64;
251
252 Ok(RecordBatch { records, metadata })
253 }
254
255 pub fn process_with_prefetch<R: Read + Seek>(
257 &mut self,
258 reader: &mut R,
259 record_offsets: &[u64],
260 requested_fields: &HashSet<AllocationField>,
261 ) -> Result<Vec<PartialAllocationInfo>, BinaryExportError> {
262 if !self.config.enable_prefetching {
263 let batch = self.process_batch(reader, record_offsets, requested_fields)?;
264 return Ok(batch.records);
265 }
266
267 let mut results = Vec::with_capacity(record_offsets.len());
268
269 for chunk in record_offsets.chunks(self.config.prefetch_count) {
271 if chunk.len() == self.config.prefetch_count {
273 self.prefetch_records(reader, chunk)?;
274 }
275
276 let chunk_results = self.process_record_chunk(reader, chunk, requested_fields)?;
277 results.extend(chunk_results);
278 }
279
280 Ok(results)
281 }
282
283 pub fn process_streaming<R: Read + Seek, F>(
285 &mut self,
286 reader: &mut R,
287 record_offsets: &[u64],
288 requested_fields: &HashSet<AllocationField>,
289 mut callback: F,
290 ) -> Result<usize, BinaryExportError>
291 where
292 F: FnMut(&RecordBatch) -> Result<bool, BinaryExportError>, {
294 let mut processed_count = 0;
295
296 for chunk in record_offsets.chunks(self.config.batch_size) {
298 let batch = self.process_batch(reader, chunk, requested_fields)?;
299 processed_count += batch.records.len();
300
301 if !callback(&batch)? {
302 break;
303 }
304 }
305
306 Ok(processed_count)
307 }
308
309 pub fn get_stats(&self) -> &BatchProcessorStats {
311 &self.stats
312 }
313
314 pub fn reset_stats(&mut self) {
316 self.stats = BatchProcessorStats::default();
317 }
318
319 pub fn clear_cache(&mut self) {
321 self.record_cache.clear();
322 }
323
324 pub fn cache_size(&self) -> usize {
326 self.record_cache.len()
327 }
328
329 fn process_record_chunk<R: Read + Seek>(
333 &mut self,
334 reader: &mut R,
335 offsets: &[u64],
336 requested_fields: &HashSet<AllocationField>,
337 ) -> Result<Vec<PartialAllocationInfo>, BinaryExportError> {
338 let mut records = Vec::with_capacity(offsets.len());
339
340 for &offset in offsets {
341 if let Some(cached_record) = self.get_cached_record(offset) {
343 records.push(cached_record);
344 self.stats.cache_hits += 1;
345 continue;
346 }
347
348 let io_start = std::time::Instant::now();
350 reader.seek(SeekFrom::Start(offset))?;
351 self.stats.io_time_us += io_start.elapsed().as_micros() as u64;
352
353 let parse_start = std::time::Instant::now();
354 let record = self
355 .field_parser
356 .parse_selective_fields(reader, requested_fields)?;
357 self.stats.parsing_time_us += parse_start.elapsed().as_micros() as u64;
358
359 if self.config.enable_record_caching {
361 self.cache_record(offset, record.clone());
362 }
363
364 records.push(record);
365 self.stats.cache_misses += 1;
366 }
367
368 Ok(records)
369 }
370
371 fn prefetch_records<R: Read + Seek>(
373 &mut self,
374 reader: &mut R,
375 offsets: &[u64],
376 ) -> Result<(), BinaryExportError> {
377 if offsets.is_empty() {
378 return Ok(());
379 }
380
381 let min_offset = *offsets.iter().min().expect("Operation failed");
383 let max_offset = *offsets.iter().max().expect("Operation failed");
384
385 let prefetch_size = (max_offset - min_offset + 1024).min(self.config.buffer_size as u64);
387
388 reader.seek(SeekFrom::Start(min_offset))?;
390 self.record_buffer.clear();
391 self.record_buffer.resize(prefetch_size as usize, 0);
392
393 let bytes_read = reader.read(&mut self.record_buffer)?;
394 self.record_buffer.truncate(bytes_read);
395
396 self.stats.prefetch_operations += 1;
397 self.stats.bytes_read += bytes_read as u64;
398
399 Ok(())
400 }
401
402 fn get_cached_record(&mut self, offset: u64) -> Option<PartialAllocationInfo> {
404 if !self.config.enable_record_caching {
405 return None;
406 }
407
408 if let Some(index) = self.record_cache.iter().position(|r| r.offset == offset) {
409 let cached = &mut self.record_cache[index];
410 cached.access_count += 1;
411 Some(cached.data.clone())
412 } else {
413 None
414 }
415 }
416
417 fn cache_record(&mut self, offset: u64, record: PartialAllocationInfo) {
419 if !self.config.enable_record_caching {
420 return;
421 }
422
423 if self.record_cache.len() >= self.config.max_cache_size {
425 self.evict_lru_record();
426 }
427
428 let cached_record = CachedRecord {
429 offset,
430 data: record,
431 cached_at: std::time::Instant::now(),
432 access_count: 1,
433 };
434
435 self.record_cache.push(cached_record);
436 }
437
438 fn evict_lru_record(&mut self) {
440 if let Some(lru_index) = self
441 .record_cache
442 .iter()
443 .enumerate()
444 .min_by_key(|(_, r)| (r.access_count, r.cached_at))
445 .map(|(i, _)| i)
446 {
447 self.record_cache.remove(lru_index);
448 }
449 }
450
451 fn estimate_record_size(&self, record: &PartialAllocationInfo) -> u64 {
453 let mut size = 24; if let Some(Some(ref var_name)) = record.var_name {
457 size += var_name.len() as u64 + 4; }
459
460 if let Some(Some(ref type_name)) = record.type_name {
461 size += type_name.len() as u64 + 4;
462 }
463
464 if let Some(ref thread_id) = record.thread_id {
465 size += thread_id.len() as u64 + 4;
466 }
467
468 if let Some(Some(ref stack_trace)) = record.stack_trace {
469 size += stack_trace.iter().map(|s| s.len() as u64 + 4).sum::<u64>() + 4;
470 }
472
473 size += 16; size
476 }
477}
478
479impl Default for BatchProcessor {
480 fn default() -> Self {
481 Self::new()
482 }
483}
484
485pub struct BatchProcessorBuilder {
487 config: BatchProcessorConfig,
488}
489
490impl BatchProcessorBuilder {
491 pub fn new() -> Self {
493 Self {
494 config: BatchProcessorConfig::default(),
495 }
496 }
497
498 pub fn batch_size(mut self, size: usize) -> Self {
500 self.config.batch_size = size;
501 self
502 }
503
504 pub fn buffer_size(mut self, size: usize) -> Self {
506 self.config.buffer_size = size;
507 self
508 }
509
510 pub fn prefetching(mut self, enabled: bool) -> Self {
512 self.config.enable_prefetching = enabled;
513 self
514 }
515
516 pub fn prefetch_count(mut self, count: usize) -> Self {
518 self.config.prefetch_count = count;
519 self
520 }
521
522 pub fn caching(mut self, enabled: bool) -> Self {
524 self.config.enable_record_caching = enabled;
525 self
526 }
527
528 pub fn max_cache_size(mut self, size: usize) -> Self {
530 self.config.max_cache_size = size;
531 self
532 }
533
534 pub fn cpu_cache_optimization(mut self, enabled: bool) -> Self {
536 self.config.optimize_cpu_cache = enabled;
537 self
538 }
539
540 pub fn memory_mapping(mut self, enabled: bool) -> Self {
542 self.config.use_memory_mapping = enabled;
543 self
544 }
545
546 pub fn build(self) -> BatchProcessor {
548 BatchProcessor::with_config(self.config)
549 }
550}
551
552impl Default for BatchProcessorBuilder {
553 fn default() -> Self {
554 Self::new()
555 }
556}
557
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Writes five synthetic allocations to a temporary binary export and
    /// returns the file's bytes together with the record offsets taken from
    /// its index.
    ///
    /// No test below calls this helper, hence `allow(dead_code)`.
    #[allow(dead_code)]
    fn create_test_binary_data() -> (Vec<u8>, Vec<u64>) {
        use crate::core::types::AllocationInfo;
        use crate::export::binary::index_builder::BinaryIndexBuilder;
        use crate::export::binary::writer::BinaryWriter;
        use tempfile::NamedTempFile;

        let allocations: Vec<AllocationInfo> = (0..5)
            .map(|i| AllocationInfo {
                ptr: 0x1000 + i * 0x100,
                size: 1024 + i * 100,
                var_name: Some(format!("var_{i}")),
                type_name: Some(format!("Type{i}")),
                scope_name: None,
                timestamp_alloc: 1234567890 + i as u64,
                timestamp_dealloc: None,
                thread_id: format!("thread_{i}"),
                borrow_count: i,
                stack_trace: None,
                is_leaked: false,
                lifetime_ms: None,
                borrow_info: None,
                clone_info: None,
                ownership_history_available: false,
                smart_pointer_info: None,
                memory_layout: None,
                generic_info: None,
                dynamic_type_info: None,
                runtime_state: None,
                stack_allocation: None,
                temporary_object: None,
                fragmentation_analysis: None,
                generic_instantiation: None,
                type_relationships: None,
                type_usage: None,
                function_call_tracking: None,
                lifecycle_tracking: None,
                access_tracking: None,
                drop_chain_analysis: None,
            })
            .collect();

        let temp_file = NamedTempFile::new().expect("Failed to create temp file");

        // Serialise header and records into the temporary file.
        let config = crate::export::binary::BinaryExportConfig::minimal();
        let mut writer = BinaryWriter::new_with_config(temp_file.path(), &config)
            .expect("Failed to create temp file");
        writer
            .write_header(allocations.len() as u32)
            .expect("Operation failed");
        for alloc in &allocations {
            writer
                .write_allocation(alloc)
                .expect("Failed to write allocation");
        }
        writer.finish().expect("Failed to finish writing");

        // Index the file to learn where each record starts.
        let index = BinaryIndexBuilder::new()
            .build_index(temp_file.path())
            .expect("Operation failed");
        let offsets: Vec<u64> = (0..allocations.len())
            .filter_map(|i| index.get_record_offset(i))
            .collect();

        let data = std::fs::read(temp_file.path()).expect("Operation failed");
        (data, offsets)
    }

    /// A freshly created processor has an empty cache and zeroed statistics.
    #[test]
    fn test_batch_processor_creation() {
        let fresh = BatchProcessor::new();
        assert_eq!(fresh.cache_size(), 0);
        assert_eq!(fresh.get_stats().batches_processed, 0);
    }

    /// Builder setters end up in the processor's configuration.
    #[test]
    fn test_batch_processor_builder() {
        let built = BatchProcessorBuilder::new()
            .batch_size(500)
            .buffer_size(32 * 1024)
            .prefetching(false)
            .caching(false)
            .build();

        let cfg = &built.config;
        assert_eq!(cfg.batch_size, 500);
        assert_eq!(cfg.buffer_size, 32 * 1024);
        assert!(!cfg.enable_prefetching);
        assert!(!cfg.enable_record_caching);
    }

    /// Caching a record grows the cache and clearing empties it again.
    #[test]
    fn test_batch_processing() {
        let mut processor = BatchProcessor::new();

        assert_eq!(processor.get_stats().batches_processed, 0);
        assert_eq!(processor.cache_size(), 0);

        processor.cache_record(100, PartialAllocationInfo::new());
        assert_eq!(processor.cache_size(), 1);

        processor.clear_cache();
        assert_eq!(processor.cache_size(), 0);
    }

    /// Prefetch settings are stored and no prefetch has happened yet.
    #[test]
    fn test_prefetch_processing() {
        let built = BatchProcessorBuilder::new()
            .prefetching(true)
            .prefetch_count(3)
            .build();

        assert!(built.config.enable_prefetching);
        assert_eq!(built.config.prefetch_count, 3);
        assert_eq!(built.get_stats().prefetch_operations, 0);
    }

    /// Streaming over no offsets never invokes the callback.
    #[test]
    fn test_streaming_processing() {
        let mut processor = BatchProcessor::new();

        let mut cursor = Cursor::new(Vec::<u8>::new());
        let record_offsets: Vec<u64> = Vec::new();
        let requested_fields: HashSet<AllocationField> =
            [AllocationField::Ptr].into_iter().collect();

        let mut callback_calls = 0;
        let processed = processor
            .process_streaming(&mut cursor, &record_offsets, &requested_fields, |_batch| {
                callback_calls += 1;
                Ok(true)
            })
            .expect("Test operation failed");

        assert_eq!(callback_calls, 0);
        assert_eq!(processed, 0);
    }

    /// Cache settings are stored and the cache starts empty.
    #[test]
    fn test_caching() {
        let built = BatchProcessorBuilder::new()
            .caching(true)
            .max_cache_size(10)
            .build();

        assert!(built.config.enable_record_caching);
        assert_eq!(built.config.max_cache_size, 10);
        assert_eq!(built.cache_size(), 0);
    }

    /// Default batch size is 1000 and no batch has been processed.
    #[test]
    fn test_batch_metadata() {
        let fresh = BatchProcessor::new();

        assert_eq!(fresh.config.batch_size, 1000);
        assert_eq!(fresh.get_stats().batches_processed, 0);
    }

    /// All counters and derived metrics are zero on a new processor.
    #[test]
    fn test_batch_processor_stats() {
        let fresh = BatchProcessor::new();
        let stats = fresh.get_stats();

        assert_eq!(stats.batches_processed, 0);
        assert_eq!(stats.records_processed, 0);
        assert_eq!(stats.total_processing_time_us, 0);
        assert_eq!(stats.avg_records_per_batch(), 0.0);

        assert_eq!(stats.cache_hit_rate(), 0.0);
        assert_eq!(stats.processing_throughput(), 0.0);
    }

    /// Insert-then-clear round trip with caching explicitly enabled.
    #[test]
    fn test_cache_operations() {
        let mut processor = BatchProcessorBuilder::new().caching(true).build();
        assert_eq!(processor.cache_size(), 0);

        processor.cache_record(100, PartialAllocationInfo::new());
        assert_eq!(processor.cache_size(), 1);

        processor.clear_cache();
        assert_eq!(processor.cache_size(), 0);
    }

    /// `reset_stats` zeroes counters that were set beforehand.
    #[test]
    fn test_stats_reset() {
        let mut processor = BatchProcessor::new();
        assert_eq!(processor.get_stats().batches_processed, 0);

        processor.stats.batches_processed = 5;
        processor.stats.records_processed = 100;

        assert_eq!(processor.get_stats().batches_processed, 5);
        assert_eq!(processor.get_stats().records_processed, 100);

        processor.reset_stats();

        assert_eq!(processor.get_stats().batches_processed, 0);
        assert_eq!(processor.get_stats().records_processed, 0);
    }
}