1use crate::export::binary::error::BinaryExportError;
9use crate::export::binary::field_parser::PartialAllocationInfo;
10use crate::export::binary::selective_reader::AllocationField;
11use std::collections::{HashMap, HashSet};
12use std::time::Instant;
13
/// Tunable settings for the streaming field processor.
#[derive(Debug, Clone)]
pub struct StreamingFieldProcessorConfig {
    /// Upper bound on the number of entries kept in the formatted-field cache.
    pub max_cache_size: usize,

    /// When `false`, cached lookups are bypassed and fields are formatted on every call.
    pub enable_field_caching: bool,

    /// When `true`, the requested fields of each record are formatted up front.
    pub enable_preformatted_fields: bool,

    /// Tracked-memory threshold, in bytes, above which cache eviction is triggered.
    pub max_memory_usage: usize,

    /// Statistics toggle.
    /// NOTE(review): nothing visible in this file reads this flag; confirm whether it is
    /// consulted elsewhere.
    pub enable_statistics: bool,

    /// Number of records processed between batch-level memory checks.
    pub batch_size: usize,
}

impl Default for StreamingFieldProcessorConfig {
    /// Returns the stock configuration: a 1000-entry cache, a 16 MiB memory threshold,
    /// batches of 100 records, and every feature toggle switched on.
    fn default() -> Self {
        const DEFAULT_MEMORY_LIMIT_BYTES: usize = 16 * 1024 * 1024;

        Self {
            batch_size: 100,
            max_cache_size: 1000,
            max_memory_usage: DEFAULT_MEMORY_LIMIT_BYTES,
            enable_field_caching: true,
            enable_preformatted_fields: true,
            enable_statistics: true,
        }
    }
}
48
/// Counters and timings describing the work done by the streaming field processor.
#[derive(Debug, Clone, Default)]
pub struct StreamingFieldProcessorStats {
    /// Number of records handed to the processing callback successfully.
    pub records_processed: u64,

    /// Sum of the requested-field counts over all processed records.
    pub fields_processed: u64,

    /// Lookups answered from the field cache.
    pub cache_hits: u64,

    /// Lookups that were not found in the field cache.
    pub cache_misses: u64,

    /// Entries removed from the field cache by eviction.
    pub cache_evictions: u64,

    /// Accumulated per-record processing time, in microseconds.
    pub total_processing_time_us: u64,

    /// Time spent formatting fields, in microseconds.
    /// NOTE(review): not written by any code visible in this file.
    pub field_formatting_time_us: u64,

    /// Memory currently in use, in bytes.
    /// NOTE(review): not written by any code visible in this file.
    pub current_memory_usage: usize,

    /// Highest tracked memory usage observed, in bytes.
    pub peak_memory_usage: usize,

    /// Count of preformatted fields that were used.
    /// NOTE(review): not written by any code visible in this file.
    pub preformatted_fields_used: u64,

    /// Records dropped after their callback completed.
    pub records_discarded: u64,
}

impl StreamingFieldProcessorStats {
    /// Percentage (0.0–100.0) of cache lookups that were hits; `0.0` when no lookups
    /// have been recorded.
    pub fn cache_hit_rate(&self) -> f64 {
        match self.cache_hits + self.cache_misses {
            0 => 0.0,
            lookups => (self.cache_hits as f64 / lookups as f64) * 100.0,
        }
    }

    /// Records processed per second, derived from the accumulated microsecond timer;
    /// `0.0` when no time has been recorded.
    pub fn processing_throughput(&self) -> f64 {
        match self.total_processing_time_us {
            0 => 0.0,
            elapsed_us => (self.records_processed as f64 * 1_000_000.0) / elapsed_us as f64,
        }
    }

    /// Percentage of processed fields that were served preformatted; `0.0` when no
    /// fields have been processed.
    pub fn field_processing_efficiency(&self) -> f64 {
        match self.fields_processed {
            0 => 0.0,
            fields => (self.preformatted_fields_used as f64 / fields as f64) * 100.0,
        }
    }

    /// Current memory usage divided by the number of processed records (bytes per
    /// record); `0.0` when no records have been processed.
    pub fn memory_efficiency(&self) -> f64 {
        match self.records_processed {
            0 => 0.0,
            records => self.current_memory_usage as f64 / records as f64,
        }
    }
}
124
/// A formatted field value held in the cache together with its usage bookkeeping.
#[derive(Debug, Clone)]
struct CachedFieldValue {
    /// The formatted text being cached.
    value: String,

    /// When the entry was created or last read through [`CachedFieldValue::access`].
    last_accessed: Instant,

    /// Number of times the entry has been used; starts at 1 on creation.
    access_count: u32,

    /// Estimated size in bytes: the string's length plus the size of this struct.
    memory_usage: usize,
}

impl CachedFieldValue {
    /// Wraps `value` in a fresh cache entry stamped with the current time.
    fn new(value: String) -> Self {
        let memory_usage = value.len() + std::mem::size_of::<Self>();
        Self {
            value,
            last_accessed: Instant::now(),
            access_count: 1,
            memory_usage,
        }
    }

    /// Records a use of the entry and returns the cached text.
    ///
    /// The counter saturates at `u32::MAX` instead of overflowing: a wrapped counter
    /// would panic in debug builds and, in release builds, reset a very hot entry to
    /// zero so that it looks like the least-used one.
    fn access(&mut self) -> &str {
        self.last_accessed = Instant::now();
        self.access_count = self.access_count.saturating_add(1);
        &self.value
    }
}
158
159#[derive(Debug, Clone)]
161pub struct OptimizedRecord {
162 pub allocation: PartialAllocationInfo,
164
165 pub preformatted_fields: HashMap<AllocationField, String>,
167
168 pub created_at: Instant,
170
171 pub memory_usage: usize,
173}
174
175impl OptimizedRecord {
176 pub fn new(allocation: PartialAllocationInfo) -> Self {
178 let mut record = Self {
179 allocation,
180 preformatted_fields: HashMap::new(),
181 created_at: Instant::now(),
182 memory_usage: 0,
183 };
184
185 record.memory_usage = record.estimate_memory_usage();
187 record
188 }
189
190 pub fn preformat_fields(&mut self, fields: &HashSet<AllocationField>) {
192 for field in fields {
193 if let Some(formatted) = self.format_field(field) {
194 self.preformatted_fields.insert(*field, formatted);
195 }
196 }
197
198 self.memory_usage = self.estimate_memory_usage();
200 }
201
202 pub fn get_formatted_field(&self, field: &AllocationField) -> Option<String> {
204 if let Some(cached) = self.preformatted_fields.get(field) {
206 return Some(cached.clone());
207 }
208
209 self.format_field(field)
211 }
212
213 fn format_field(&self, field: &AllocationField) -> Option<String> {
215 match field {
216 AllocationField::Ptr => self.allocation.ptr.map(|ptr| format!("\"0x{ptr:x}\"")),
217 AllocationField::Size => self.allocation.size.map(|size| size.to_string()),
218 AllocationField::VarName => {
219 self.allocation
220 .var_name
221 .as_ref()
222 .map(|var_name| match var_name {
223 Some(name) => format!("\"{}\"", name.replace('"', "\\\"")),
224 None => "null".to_string(),
225 })
226 }
227 AllocationField::TypeName => {
228 self.allocation
229 .type_name
230 .as_ref()
231 .map(|type_name| match type_name {
232 Some(name) => format!("\"{}\"", name.replace('"', "\\\"")),
233 None => "null".to_string(),
234 })
235 }
236 AllocationField::ScopeName => {
237 self.allocation
238 .scope_name
239 .as_ref()
240 .map(|scope_name| match scope_name {
241 Some(name) => format!("\"{}\"", name.replace('"', "\\\"")),
242 None => "null".to_string(),
243 })
244 }
245 AllocationField::TimestampAlloc => {
246 self.allocation.timestamp_alloc.map(|ts| ts.to_string())
247 }
248 AllocationField::TimestampDealloc => {
249 self.allocation
250 .timestamp_dealloc
251 .as_ref()
252 .map(|ts_opt| match ts_opt {
253 Some(ts) => ts.to_string(),
254 None => "null".to_string(),
255 })
256 }
257 AllocationField::ThreadId => self
258 .allocation
259 .thread_id
260 .as_ref()
261 .map(|thread_id| format!("\"{}\"", thread_id.replace('"', "\\\""))),
262 AllocationField::BorrowCount => {
263 self.allocation.borrow_count.map(|count| count.to_string())
264 }
265 AllocationField::IsLeaked => self
266 .allocation
267 .is_leaked
268 .map(|leaked| if leaked { "true" } else { "false" }.to_string()),
269 AllocationField::StackTrace => {
270 self.allocation
271 .stack_trace
272 .as_ref()
273 .map(|stack_trace_opt| match stack_trace_opt {
274 Some(trace) => {
275 let trace_json: Vec<String> = trace
276 .iter()
277 .map(|s| format!("\"{}\"", s.replace('"', "\\\"")))
278 .collect();
279 format!("[{}]", trace_json.join(", "))
280 }
281 None => "null".to_string(),
282 })
283 }
284 AllocationField::LifetimeMs => {
285 self.allocation
286 .lifetime_ms
287 .as_ref()
288 .map(|lifetime_opt| match lifetime_opt {
289 Some(ms) => ms.to_string(),
290 None => "null".to_string(),
291 })
292 }
293 _ => None, }
295 }
296
297 fn estimate_memory_usage(&self) -> usize {
299 let mut usage = std::mem::size_of::<Self>();
300
301 for value in self.preformatted_fields.values() {
303 usage += value.len() + std::mem::size_of::<String>();
304 }
305
306 usage += 256; usage
310 }
311}
312
313pub struct StreamingFieldProcessor {
315 config: StreamingFieldProcessorConfig,
317
318 field_cache: HashMap<String, CachedFieldValue>,
320
321 stats: StreamingFieldProcessorStats,
323
324 current_memory_usage: usize,
326}
327
328impl StreamingFieldProcessor {
329 pub fn new() -> Self {
331 Self::with_config(StreamingFieldProcessorConfig::default())
332 }
333
334 pub fn with_config(config: StreamingFieldProcessorConfig) -> Self {
336 Self {
337 config,
338 field_cache: HashMap::new(),
339 stats: StreamingFieldProcessorStats::default(),
340 current_memory_usage: 0,
341 }
342 }
343
344 pub fn process_record_streaming<F>(
346 &mut self,
347 allocation: PartialAllocationInfo,
348 requested_fields: &HashSet<AllocationField>,
349 mut processor: F,
350 ) -> Result<(), BinaryExportError>
351 where
352 F: FnMut(&OptimizedRecord) -> Result<(), BinaryExportError>,
353 {
354 let start_time = Instant::now();
355
356 let mut record = OptimizedRecord::new(allocation);
358
359 if self.config.enable_preformatted_fields {
361 record.preformat_fields(requested_fields);
362 }
363
364 self.current_memory_usage += record.memory_usage;
366 if self.current_memory_usage > self.stats.peak_memory_usage {
367 self.stats.peak_memory_usage = self.current_memory_usage;
368 }
369
370 processor(&record)?;
372
373 self.current_memory_usage -= record.memory_usage;
375 self.stats.records_discarded += 1;
376
377 self.stats.records_processed += 1;
379 self.stats.fields_processed += requested_fields.len() as u64;
380 self.stats.total_processing_time_us += start_time.elapsed().as_micros() as u64;
381
382 if self.current_memory_usage > self.config.max_memory_usage {
384 self.evict_cache_entries()?;
385 }
386
387 Ok(())
388 }
389
390 pub fn process_records_streaming<F>(
392 &mut self,
393 allocations: Vec<PartialAllocationInfo>,
394 requested_fields: &HashSet<AllocationField>,
395 mut processor: F,
396 ) -> Result<(), BinaryExportError>
397 where
398 F: FnMut(&OptimizedRecord) -> Result<(), BinaryExportError>,
399 {
400 for batch in allocations.chunks(self.config.batch_size) {
402 for allocation in batch {
403 self.process_record_streaming(
404 allocation.clone(),
405 requested_fields,
406 &mut processor,
407 )?;
408 }
409
410 if self.current_memory_usage > self.config.max_memory_usage / 2 {
412 self.evict_cache_entries()?;
413 }
414 }
415
416 Ok(())
417 }
418
419 pub fn get_or_format_field(
421 &mut self,
422 cache_key: &str,
423 field: &AllocationField,
424 allocation: &PartialAllocationInfo,
425 ) -> Result<Option<String>, BinaryExportError> {
426 if !self.config.enable_field_caching {
427 return Ok(self.format_field_direct(field, allocation));
429 }
430
431 if let Some(cached) = self.field_cache.get_mut(cache_key) {
433 self.stats.cache_hits += 1;
434 return Ok(Some(cached.access().to_string()));
435 }
436
437 if let Some(formatted) = self.format_field_direct(field, allocation) {
439 let cached_value = CachedFieldValue::new(formatted.clone());
440 self.current_memory_usage += cached_value.memory_usage;
441
442 self.field_cache.insert(cache_key.to_string(), cached_value);
443 self.stats.cache_misses += 1;
444
445 if self.field_cache.len() > self.config.max_cache_size {
447 self.evict_lru_entry()?;
448 }
449
450 Ok(Some(formatted))
451 } else {
452 self.stats.cache_misses += 1;
453 Ok(None)
454 }
455 }
456
457 pub fn get_stats(&self) -> &StreamingFieldProcessorStats {
459 &self.stats
460 }
461
462 pub fn reset_stats(&mut self) {
464 self.stats = StreamingFieldProcessorStats::default();
465 }
466
467 pub fn clear_cache(&mut self) {
469 self.field_cache.clear();
470 self.current_memory_usage = 0;
471 }
472
473 pub fn cache_size(&self) -> usize {
475 self.field_cache.len()
476 }
477
478 pub fn memory_usage(&self) -> usize {
480 self.current_memory_usage
481 }
482
483 fn format_field_direct(
487 &self,
488 field: &AllocationField,
489 allocation: &PartialAllocationInfo,
490 ) -> Option<String> {
491 let temp_record = OptimizedRecord::new(allocation.clone());
493 temp_record.format_field(field)
494 }
495
496 fn evict_cache_entries(&mut self) -> Result<(), BinaryExportError> {
498 let target_size = self.config.max_cache_size / 2;
499
500 while self.field_cache.len() > target_size {
501 self.evict_lru_entry()?;
502 }
503
504 Ok(())
505 }
506
507 fn evict_lru_entry(&mut self) -> Result<(), BinaryExportError> {
509 if let Some((lru_key, lru_value)) = self
510 .field_cache
511 .iter()
512 .min_by_key(|(_, v)| (v.access_count, v.last_accessed))
513 .map(|(k, v)| (k.clone(), v.clone()))
514 {
515 self.current_memory_usage -= lru_value.memory_usage;
516 self.field_cache.remove(&lru_key);
517 self.stats.cache_evictions += 1;
518 }
519
520 Ok(())
521 }
522}
523
524impl Default for StreamingFieldProcessor {
525 fn default() -> Self {
526 Self::new()
527 }
528}
529
530pub struct StreamingFieldProcessorConfigBuilder {
532 config: StreamingFieldProcessorConfig,
533}
534
535impl StreamingFieldProcessorConfigBuilder {
536 pub fn new() -> Self {
538 Self {
539 config: StreamingFieldProcessorConfig::default(),
540 }
541 }
542
543 pub fn max_cache_size(mut self, size: usize) -> Self {
545 self.config.max_cache_size = size;
546 self
547 }
548
549 pub fn field_caching(mut self, enabled: bool) -> Self {
551 self.config.enable_field_caching = enabled;
552 self
553 }
554
555 pub fn preformatted_fields(mut self, enabled: bool) -> Self {
557 self.config.enable_preformatted_fields = enabled;
558 self
559 }
560
561 pub fn max_memory_usage(mut self, bytes: usize) -> Self {
563 self.config.max_memory_usage = bytes;
564 self
565 }
566
567 pub fn statistics(mut self, enabled: bool) -> Self {
569 self.config.enable_statistics = enabled;
570 self
571 }
572
573 pub fn batch_size(mut self, size: usize) -> Self {
575 self.config.batch_size = size;
576 self
577 }
578
579 pub fn build(self) -> StreamingFieldProcessorConfig {
581 self.config
582 }
583}
584
585impl Default for StreamingFieldProcessorConfigBuilder {
586 fn default() -> Self {
587 Self::new()
588 }
589}
590
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the allocation fixture shared by the tests; only the variable name varies.
    fn sample_allocation(var_name: &str) -> PartialAllocationInfo {
        PartialAllocationInfo {
            ptr: Some(0x1000),
            size: Some(1024),
            var_name: Some(Some(var_name.to_string())),
            type_name: Some(Some("i32".to_string())),
            scope_name: Some(None),
            timestamp_alloc: Some(1234567890),
            timestamp_dealloc: Some(None),
            thread_id: Some("main".to_string()),
            borrow_count: Some(0),
            stack_trace: Some(None),
            is_leaked: Some(false),
            lifetime_ms: Some(None),
            borrow_info: None,
            clone_info: None,
            ownership_history_available: Some(false),
        }
    }

    /// The two-field selection (`Ptr`, `Size`) used by several tests.
    fn ptr_and_size_fields() -> HashSet<AllocationField> {
        [AllocationField::Ptr, AllocationField::Size]
            .into_iter()
            .collect()
    }

    #[test]
    fn test_streaming_field_processor_creation() {
        let processor = StreamingFieldProcessor::new();

        assert_eq!(processor.cache_size(), 0);
        assert_eq!(processor.memory_usage(), 0);
    }

    #[test]
    fn test_config_builder() {
        let builder = StreamingFieldProcessorConfigBuilder::new()
            .max_cache_size(500)
            .field_caching(false)
            .preformatted_fields(false)
            .max_memory_usage(8 * 1024 * 1024);
        let config = builder.build();

        assert_eq!(config.max_cache_size, 500);
        assert!(!config.enable_field_caching);
        assert!(!config.enable_preformatted_fields);
        assert_eq!(config.max_memory_usage, 8 * 1024 * 1024);
    }

    #[test]
    fn test_optimized_record_creation() {
        let record = OptimizedRecord::new(sample_allocation("test_var"));

        assert!(record.memory_usage > 0);
        assert!(record.preformatted_fields.is_empty());
    }

    #[test]
    fn test_field_formatting() {
        let record = OptimizedRecord::new(sample_allocation("test_var"));

        let expectations = [
            (AllocationField::Ptr, "\"0x1000\""),
            (AllocationField::Size, "1024"),
            (AllocationField::VarName, "\"test_var\""),
            (AllocationField::IsLeaked, "false"),
        ];

        for (field, expected) in expectations {
            assert_eq!(
                record.get_formatted_field(&field),
                Some(expected.to_string())
            );
        }
    }

    #[test]
    fn test_preformatted_fields() {
        let mut record = OptimizedRecord::new(sample_allocation("test"));

        record.preformat_fields(&ptr_and_size_fields());

        assert_eq!(record.preformatted_fields.len(), 2);
        for field in [AllocationField::Ptr, AllocationField::Size] {
            assert!(record.preformatted_fields.contains_key(&field));
        }
    }

    #[test]
    fn test_streaming_processing() {
        let mut processor = StreamingFieldProcessor::new();
        let fields = ptr_and_size_fields();

        let mut processed_count = 0;
        processor
            .process_record_streaming(sample_allocation("test"), &fields, |_record| {
                processed_count += 1;
                Ok(())
            })
            .expect("Test operation failed");

        assert_eq!(processed_count, 1);

        let stats = processor.get_stats();
        assert_eq!(stats.records_processed, 1);
        assert_eq!(stats.records_discarded, 1);
        assert_eq!(stats.fields_processed, 2);
    }

    #[test]
    fn test_cache_operations() {
        let mut processor = StreamingFieldProcessor::new();
        let allocation = sample_allocation("test");
        let expected = Some("\"0x1000\"".to_string());

        // First lookup: nothing cached yet, so it is a miss that populates the cache.
        let first = processor
            .get_or_format_field("test_key", &AllocationField::Ptr, &allocation)
            .expect("Test operation failed");
        assert_eq!(first, expected);
        assert_eq!(processor.get_stats().cache_misses, 1);
        assert_eq!(processor.cache_size(), 1);

        // Second lookup with the same key: served from the cache.
        let second = processor
            .get_or_format_field("test_key", &AllocationField::Ptr, &allocation)
            .expect("Test operation failed");
        assert_eq!(second, expected);
        assert_eq!(processor.get_stats().cache_hits, 1);
        assert_eq!(processor.cache_size(), 1);
    }

    #[test]
    fn test_statistics() {
        let stats = StreamingFieldProcessorStats {
            records_processed: 100,
            fields_processed: 500,
            cache_hits: 8,
            cache_misses: 2,
            total_processing_time_us: 1_000_000,
            current_memory_usage: 1024,
            preformatted_fields_used: 300,
            ..Default::default()
        };

        assert_eq!(stats.cache_hit_rate(), 80.0);
        assert_eq!(stats.processing_throughput(), 100.0);
        assert_eq!(stats.field_processing_efficiency(), 60.0);
        assert_eq!(stats.memory_efficiency(), 10.24);
    }

    #[test]
    fn test_memory_management() {
        let config = StreamingFieldProcessorConfigBuilder::new()
            .max_memory_usage(1024)
            .max_cache_size(5)
            .build();
        let mut processor = StreamingFieldProcessor::with_config(config);

        assert_eq!(processor.memory_usage(), 0);

        processor.clear_cache();

        assert_eq!(processor.cache_size(), 0);
        assert_eq!(processor.memory_usage(), 0);
    }
}