memscope_rs/export/binary/
streaming_field_processor.rs

1//! Streaming field processor for constant memory field processing
2//!
3//! This module provides streaming field processing capabilities that maintain
4//! constant memory usage by processing records one at a time and immediately
5//! discarding processed data. It includes LRU field caching and optimized
6//! record structures for high-performance processing.
7
8use crate::export::binary::error::BinaryExportError;
9use crate::export::binary::field_parser::PartialAllocationInfo;
10use crate::export::binary::selective_reader::AllocationField;
11use std::collections::{HashMap, HashSet};
12use std::time::Instant;
13
14/// Configuration for streaming field processor
15#[derive(Debug, Clone)]
16pub struct StreamingFieldProcessorConfig {
17    /// Maximum size of the LRU field cache
18    pub max_cache_size: usize,
19
20    /// Enable field value caching
21    pub enable_field_caching: bool,
22
23    /// Enable pre-formatted field optimization
24    pub enable_preformatted_fields: bool,
25
26    /// Maximum memory usage before forcing cache eviction (bytes)
27    pub max_memory_usage: usize,
28
29    /// Enable streaming statistics collection
30    pub enable_statistics: bool,
31
32    /// Batch size for processing multiple records
33    pub batch_size: usize,
34}
35
36impl Default for StreamingFieldProcessorConfig {
37    fn default() -> Self {
38        Self {
39            max_cache_size: 1000,
40            enable_field_caching: true,
41            enable_preformatted_fields: true,
42            max_memory_usage: 16 * 1024 * 1024, // 16MB
43            enable_statistics: true,
44            batch_size: 100,
45        }
46    }
47}
48
49/// Statistics for streaming field processing
50#[derive(Debug, Clone, Default)]
51pub struct StreamingFieldProcessorStats {
52    /// Total records processed
53    pub records_processed: u64,
54
55    /// Total fields processed
56    pub fields_processed: u64,
57
58    /// Number of cache hits
59    pub cache_hits: u64,
60
61    /// Number of cache misses
62    pub cache_misses: u64,
63
64    /// Number of cache evictions
65    pub cache_evictions: u64,
66
67    /// Total processing time in microseconds
68    pub total_processing_time_us: u64,
69
70    /// Time spent on field formatting (microseconds)
71    pub field_formatting_time_us: u64,
72
73    /// Memory usage in bytes
74    pub current_memory_usage: usize,
75
76    /// Peak memory usage in bytes
77    pub peak_memory_usage: usize,
78
79    /// Number of preformatted fields used
80    pub preformatted_fields_used: u64,
81
82    /// Number of records immediately discarded
83    pub records_discarded: u64,
84}
85
86impl StreamingFieldProcessorStats {
87    /// Calculate cache hit rate as percentage
88    pub fn cache_hit_rate(&self) -> f64 {
89        let total_requests = self.cache_hits + self.cache_misses;
90        if total_requests == 0 {
91            0.0
92        } else {
93            (self.cache_hits as f64 / total_requests as f64) * 100.0
94        }
95    }
96
97    /// Calculate processing throughput (records per second)
98    pub fn processing_throughput(&self) -> f64 {
99        if self.total_processing_time_us == 0 {
100            0.0
101        } else {
102            (self.records_processed as f64 * 1_000_000.0) / self.total_processing_time_us as f64
103        }
104    }
105
106    /// Calculate field processing efficiency
107    pub fn field_processing_efficiency(&self) -> f64 {
108        if self.fields_processed == 0 {
109            0.0
110        } else {
111            (self.preformatted_fields_used as f64 / self.fields_processed as f64) * 100.0
112        }
113    }
114
115    /// Calculate memory efficiency (bytes per record)
116    pub fn memory_efficiency(&self) -> f64 {
117        if self.records_processed == 0 {
118            0.0
119        } else {
120            self.current_memory_usage as f64 / self.records_processed as f64
121        }
122    }
123}
124
125/// Cached field value with LRU metadata
126#[derive(Debug, Clone)]
127struct CachedFieldValue {
128    /// The formatted field value
129    value: String,
130
131    /// Last access time for LRU eviction
132    last_accessed: Instant,
133
134    /// Access count for popularity tracking
135    access_count: u32,
136
137    /// Estimated memory usage of this cache entry
138    memory_usage: usize,
139}
140
141impl CachedFieldValue {
142    fn new(value: String) -> Self {
143        let memory_usage = value.len() + std::mem::size_of::<Self>();
144        Self {
145            value,
146            last_accessed: Instant::now(),
147            access_count: 1,
148            memory_usage,
149        }
150    }
151
152    fn access(&mut self) -> &str {
153        self.last_accessed = Instant::now();
154        self.access_count += 1;
155        &self.value
156    }
157}
158
159/// Optimized record structure with pre-formatted fields
160#[derive(Debug, Clone)]
161pub struct OptimizedRecord {
162    /// Original allocation info
163    pub allocation: PartialAllocationInfo,
164
165    /// Pre-formatted field values for common fields
166    pub preformatted_fields: HashMap<AllocationField, String>,
167
168    /// Timestamp when this record was created
169    pub created_at: Instant,
170
171    /// Estimated memory usage of this record
172    pub memory_usage: usize,
173}
174
175impl OptimizedRecord {
176    /// Create a new optimized record
177    pub fn new(allocation: PartialAllocationInfo) -> Self {
178        let mut record = Self {
179            allocation,
180            preformatted_fields: HashMap::new(),
181            created_at: Instant::now(),
182            memory_usage: 0,
183        };
184
185        // Estimate memory usage
186        record.memory_usage = record.estimate_memory_usage();
187        record
188    }
189
190    /// Pre-format commonly used fields
191    pub fn preformat_fields(&mut self, fields: &HashSet<AllocationField>) {
192        for field in fields {
193            if let Some(formatted) = self.format_field(field) {
194                self.preformatted_fields.insert(*field, formatted);
195            }
196        }
197
198        // Update memory usage estimate
199        self.memory_usage = self.estimate_memory_usage();
200    }
201
202    /// Get a formatted field value (from cache or format on demand)
203    pub fn get_formatted_field(&self, field: &AllocationField) -> Option<String> {
204        // Check preformatted fields first
205        if let Some(cached) = self.preformatted_fields.get(field) {
206            return Some(cached.clone());
207        }
208
209        // Format on demand
210        self.format_field(field)
211    }
212
213    /// Format a specific field
214    fn format_field(&self, field: &AllocationField) -> Option<String> {
215        match field {
216            AllocationField::Ptr => self.allocation.ptr.map(|ptr| format!("\"0x{ptr:x}\"")),
217            AllocationField::Size => self.allocation.size.map(|size| size.to_string()),
218            AllocationField::VarName => {
219                self.allocation
220                    .var_name
221                    .as_ref()
222                    .map(|var_name| match var_name {
223                        Some(name) => format!("\"{}\"", name.replace('"', "\\\"")),
224                        None => "null".to_string(),
225                    })
226            }
227            AllocationField::TypeName => {
228                self.allocation
229                    .type_name
230                    .as_ref()
231                    .map(|type_name| match type_name {
232                        Some(name) => format!("\"{}\"", name.replace('"', "\\\"")),
233                        None => "null".to_string(),
234                    })
235            }
236            AllocationField::ScopeName => {
237                self.allocation
238                    .scope_name
239                    .as_ref()
240                    .map(|scope_name| match scope_name {
241                        Some(name) => format!("\"{}\"", name.replace('"', "\\\"")),
242                        None => "null".to_string(),
243                    })
244            }
245            AllocationField::TimestampAlloc => {
246                self.allocation.timestamp_alloc.map(|ts| ts.to_string())
247            }
248            AllocationField::TimestampDealloc => {
249                self.allocation
250                    .timestamp_dealloc
251                    .as_ref()
252                    .map(|ts_opt| match ts_opt {
253                        Some(ts) => ts.to_string(),
254                        None => "null".to_string(),
255                    })
256            }
257            AllocationField::ThreadId => self
258                .allocation
259                .thread_id
260                .as_ref()
261                .map(|thread_id| format!("\"{}\"", thread_id.replace('"', "\\\""))),
262            AllocationField::BorrowCount => {
263                self.allocation.borrow_count.map(|count| count.to_string())
264            }
265            AllocationField::IsLeaked => self
266                .allocation
267                .is_leaked
268                .map(|leaked| if leaked { "true" } else { "false" }.to_string()),
269            AllocationField::StackTrace => {
270                self.allocation
271                    .stack_trace
272                    .as_ref()
273                    .map(|stack_trace_opt| match stack_trace_opt {
274                        Some(trace) => {
275                            let trace_json: Vec<String> = trace
276                                .iter()
277                                .map(|s| format!("\"{}\"", s.replace('"', "\\\"")))
278                                .collect();
279                            format!("[{}]", trace_json.join(", "))
280                        }
281                        None => "null".to_string(),
282                    })
283            }
284            AllocationField::LifetimeMs => {
285                self.allocation
286                    .lifetime_ms
287                    .as_ref()
288                    .map(|lifetime_opt| match lifetime_opt {
289                        Some(ms) => ms.to_string(),
290                        None => "null".to_string(),
291                    })
292            }
293            _ => None, // Advanced fields not implemented yet
294        }
295    }
296
297    /// Estimate memory usage of this record
298    fn estimate_memory_usage(&self) -> usize {
299        let mut usage = std::mem::size_of::<Self>();
300
301        // Add preformatted fields memory
302        for value in self.preformatted_fields.values() {
303            usage += value.len() + std::mem::size_of::<String>();
304        }
305
306        // Add allocation info memory (simplified estimation)
307        usage += 256; // Approximate size of PartialAllocationInfo
308
309        usage
310    }
311}
312
313/// Streaming field processor with constant memory usage
314pub struct StreamingFieldProcessor {
315    /// Configuration
316    config: StreamingFieldProcessorConfig,
317
318    /// LRU cache for field values
319    field_cache: HashMap<String, CachedFieldValue>,
320
321    /// Statistics
322    stats: StreamingFieldProcessorStats,
323
324    /// Current memory usage estimate
325    current_memory_usage: usize,
326}
327
328impl StreamingFieldProcessor {
329    /// Create a new streaming field processor
330    pub fn new() -> Self {
331        Self::with_config(StreamingFieldProcessorConfig::default())
332    }
333
334    /// Create a new streaming field processor with custom configuration
335    pub fn with_config(config: StreamingFieldProcessorConfig) -> Self {
336        Self {
337            config,
338            field_cache: HashMap::new(),
339            stats: StreamingFieldProcessorStats::default(),
340            current_memory_usage: 0,
341        }
342    }
343
344    /// Process a single record with streaming (constant memory)
345    pub fn process_record_streaming<F>(
346        &mut self,
347        allocation: PartialAllocationInfo,
348        requested_fields: &HashSet<AllocationField>,
349        mut processor: F,
350    ) -> Result<(), BinaryExportError>
351    where
352        F: FnMut(&OptimizedRecord) -> Result<(), BinaryExportError>,
353    {
354        let start_time = Instant::now();
355
356        // Create optimized record
357        let mut record = OptimizedRecord::new(allocation);
358
359        // Pre-format requested fields
360        if self.config.enable_preformatted_fields {
361            record.preformat_fields(requested_fields);
362        }
363
364        // Update memory usage
365        self.current_memory_usage += record.memory_usage;
366        if self.current_memory_usage > self.stats.peak_memory_usage {
367            self.stats.peak_memory_usage = self.current_memory_usage;
368        }
369
370        // Process the record
371        processor(&record)?;
372
373        // Immediately discard the record to maintain constant memory
374        self.current_memory_usage -= record.memory_usage;
375        self.stats.records_discarded += 1;
376
377        // Update statistics
378        self.stats.records_processed += 1;
379        self.stats.fields_processed += requested_fields.len() as u64;
380        self.stats.total_processing_time_us += start_time.elapsed().as_micros() as u64;
381
382        // Check if we need to evict cache entries
383        if self.current_memory_usage > self.config.max_memory_usage {
384            self.evict_cache_entries()?;
385        }
386
387        Ok(())
388    }
389
390    /// Process multiple records in streaming fashion
391    pub fn process_records_streaming<F>(
392        &mut self,
393        allocations: Vec<PartialAllocationInfo>,
394        requested_fields: &HashSet<AllocationField>,
395        mut processor: F,
396    ) -> Result<(), BinaryExportError>
397    where
398        F: FnMut(&OptimizedRecord) -> Result<(), BinaryExportError>,
399    {
400        // Process records in batches to maintain constant memory
401        for batch in allocations.chunks(self.config.batch_size) {
402            for allocation in batch {
403                self.process_record_streaming(
404                    allocation.clone(),
405                    requested_fields,
406                    &mut processor,
407                )?;
408            }
409
410            // Force cache cleanup between batches
411            if self.current_memory_usage > self.config.max_memory_usage / 2 {
412                self.evict_cache_entries()?;
413            }
414        }
415
416        Ok(())
417    }
418
419    /// Get a cached field value or format and cache it
420    pub fn get_or_format_field(
421        &mut self,
422        cache_key: &str,
423        field: &AllocationField,
424        allocation: &PartialAllocationInfo,
425    ) -> Result<Option<String>, BinaryExportError> {
426        if !self.config.enable_field_caching {
427            // Format directly without caching
428            return Ok(self.format_field_direct(field, allocation));
429        }
430
431        // Check cache first
432        if let Some(cached) = self.field_cache.get_mut(cache_key) {
433            self.stats.cache_hits += 1;
434            return Ok(Some(cached.access().to_string()));
435        }
436
437        // Format and cache
438        if let Some(formatted) = self.format_field_direct(field, allocation) {
439            let cached_value = CachedFieldValue::new(formatted.clone());
440            self.current_memory_usage += cached_value.memory_usage;
441
442            self.field_cache.insert(cache_key.to_string(), cached_value);
443            self.stats.cache_misses += 1;
444
445            // Check if we need to evict
446            if self.field_cache.len() > self.config.max_cache_size {
447                self.evict_lru_entry()?;
448            }
449
450            Ok(Some(formatted))
451        } else {
452            self.stats.cache_misses += 1;
453            Ok(None)
454        }
455    }
456
457    /// Get current statistics
458    pub fn get_stats(&self) -> &StreamingFieldProcessorStats {
459        &self.stats
460    }
461
462    /// Reset statistics
463    pub fn reset_stats(&mut self) {
464        self.stats = StreamingFieldProcessorStats::default();
465    }
466
467    /// Clear all caches
468    pub fn clear_cache(&mut self) {
469        self.field_cache.clear();
470        self.current_memory_usage = 0;
471    }
472
473    /// Get current cache size
474    pub fn cache_size(&self) -> usize {
475        self.field_cache.len()
476    }
477
478    /// Get current memory usage
479    pub fn memory_usage(&self) -> usize {
480        self.current_memory_usage
481    }
482
483    // Private helper methods
484
485    /// Format a field directly without caching
486    fn format_field_direct(
487        &self,
488        field: &AllocationField,
489        allocation: &PartialAllocationInfo,
490    ) -> Option<String> {
491        // Create a temporary OptimizedRecord for formatting
492        let temp_record = OptimizedRecord::new(allocation.clone());
493        temp_record.format_field(field)
494    }
495
496    /// Evict cache entries to reduce memory usage
497    fn evict_cache_entries(&mut self) -> Result<(), BinaryExportError> {
498        let target_size = self.config.max_cache_size / 2;
499
500        while self.field_cache.len() > target_size {
501            self.evict_lru_entry()?;
502        }
503
504        Ok(())
505    }
506
507    /// Evict the least recently used cache entry
508    fn evict_lru_entry(&mut self) -> Result<(), BinaryExportError> {
509        if let Some((lru_key, lru_value)) = self
510            .field_cache
511            .iter()
512            .min_by_key(|(_, v)| (v.access_count, v.last_accessed))
513            .map(|(k, v)| (k.clone(), v.clone()))
514        {
515            self.current_memory_usage -= lru_value.memory_usage;
516            self.field_cache.remove(&lru_key);
517            self.stats.cache_evictions += 1;
518        }
519
520        Ok(())
521    }
522}
523
524impl Default for StreamingFieldProcessor {
525    fn default() -> Self {
526        Self::new()
527    }
528}
529
530/// Builder for streaming field processor configuration
531pub struct StreamingFieldProcessorConfigBuilder {
532    config: StreamingFieldProcessorConfig,
533}
534
535impl StreamingFieldProcessorConfigBuilder {
536    /// Create a new configuration builder
537    pub fn new() -> Self {
538        Self {
539            config: StreamingFieldProcessorConfig::default(),
540        }
541    }
542
543    /// Set maximum cache size
544    pub fn max_cache_size(mut self, size: usize) -> Self {
545        self.config.max_cache_size = size;
546        self
547    }
548
549    /// Enable or disable field caching
550    pub fn field_caching(mut self, enabled: bool) -> Self {
551        self.config.enable_field_caching = enabled;
552        self
553    }
554
555    /// Enable or disable preformatted fields
556    pub fn preformatted_fields(mut self, enabled: bool) -> Self {
557        self.config.enable_preformatted_fields = enabled;
558        self
559    }
560
561    /// Set maximum memory usage
562    pub fn max_memory_usage(mut self, bytes: usize) -> Self {
563        self.config.max_memory_usage = bytes;
564        self
565    }
566
567    /// Enable or disable statistics collection
568    pub fn statistics(mut self, enabled: bool) -> Self {
569        self.config.enable_statistics = enabled;
570        self
571    }
572
573    /// Set batch size
574    pub fn batch_size(mut self, size: usize) -> Self {
575        self.config.batch_size = size;
576        self
577    }
578
579    /// Build the configuration
580    pub fn build(self) -> StreamingFieldProcessorConfig {
581        self.config
582    }
583}
584
585impl Default for StreamingFieldProcessorConfigBuilder {
586    fn default() -> Self {
587        Self::new()
588    }
589}
590
591#[cfg(test)]
592mod tests {
593    use super::*;
594
595    #[test]
596    fn test_streaming_field_processor_creation() {
597        let processor = StreamingFieldProcessor::new();
598        assert_eq!(processor.cache_size(), 0);
599        assert_eq!(processor.memory_usage(), 0);
600    }
601
602    #[test]
603    fn test_config_builder() {
604        let config = StreamingFieldProcessorConfigBuilder::new()
605            .max_cache_size(500)
606            .field_caching(false)
607            .preformatted_fields(false)
608            .max_memory_usage(8 * 1024 * 1024)
609            .build();
610
611        assert_eq!(config.max_cache_size, 500);
612        assert!(!config.enable_field_caching);
613        assert!(!config.enable_preformatted_fields);
614        assert_eq!(config.max_memory_usage, 8 * 1024 * 1024);
615    }
616
617    #[test]
618    fn test_optimized_record_creation() {
619        let allocation = PartialAllocationInfo {
620            ptr: Some(0x1000),
621            size: Some(1024),
622            var_name: Some(Some("test_var".to_string())),
623            type_name: Some(Some("i32".to_string())),
624            scope_name: Some(None),
625            timestamp_alloc: Some(1234567890),
626            timestamp_dealloc: Some(None),
627            thread_id: Some("main".to_string()),
628            borrow_count: Some(0),
629            stack_trace: Some(None),
630            is_leaked: Some(false),
631            lifetime_ms: Some(None),
632            // improve.md extensions
633            borrow_info: None,
634            clone_info: None,
635            ownership_history_available: Some(false),
636        };
637
638        let record = OptimizedRecord::new(allocation);
639        assert!(record.memory_usage > 0);
640        assert!(record.preformatted_fields.is_empty());
641    }
642
643    #[test]
644    fn test_field_formatting() {
645        let allocation = PartialAllocationInfo {
646            ptr: Some(0x1000),
647            size: Some(1024),
648            var_name: Some(Some("test_var".to_string())),
649            type_name: Some(Some("i32".to_string())),
650            scope_name: Some(None),
651            timestamp_alloc: Some(1234567890),
652            timestamp_dealloc: Some(None),
653            thread_id: Some("main".to_string()),
654            borrow_count: Some(0),
655            stack_trace: Some(None),
656            is_leaked: Some(false),
657            lifetime_ms: Some(None),
658            // improve.md extensions
659            borrow_info: None,
660            clone_info: None,
661            ownership_history_available: Some(false),
662        };
663
664        let record = OptimizedRecord::new(allocation);
665
666        assert_eq!(
667            record.get_formatted_field(&AllocationField::Ptr),
668            Some("\"0x1000\"".to_string())
669        );
670        assert_eq!(
671            record.get_formatted_field(&AllocationField::Size),
672            Some("1024".to_string())
673        );
674        assert_eq!(
675            record.get_formatted_field(&AllocationField::VarName),
676            Some("\"test_var\"".to_string())
677        );
678        assert_eq!(
679            record.get_formatted_field(&AllocationField::IsLeaked),
680            Some("false".to_string())
681        );
682    }
683
684    #[test]
685    fn test_preformatted_fields() {
686        let allocation = PartialAllocationInfo {
687            ptr: Some(0x1000),
688            size: Some(1024),
689            var_name: Some(Some("test".to_string())),
690            type_name: Some(Some("i32".to_string())),
691            scope_name: Some(None),
692            timestamp_alloc: Some(1234567890),
693            timestamp_dealloc: Some(None),
694            thread_id: Some("main".to_string()),
695            borrow_count: Some(0),
696            stack_trace: Some(None),
697            is_leaked: Some(false),
698            lifetime_ms: Some(None),
699            // improve.md extensions
700            borrow_info: None,
701            clone_info: None,
702            ownership_history_available: Some(false),
703        };
704
705        let mut record = OptimizedRecord::new(allocation);
706        let fields = [AllocationField::Ptr, AllocationField::Size]
707            .into_iter()
708            .collect();
709
710        record.preformat_fields(&fields);
711
712        assert_eq!(record.preformatted_fields.len(), 2);
713        assert!(record
714            .preformatted_fields
715            .contains_key(&AllocationField::Ptr));
716        assert!(record
717            .preformatted_fields
718            .contains_key(&AllocationField::Size));
719    }
720
721    #[test]
722    fn test_streaming_processing() {
723        let mut processor = StreamingFieldProcessor::new();
724
725        let allocation = PartialAllocationInfo {
726            ptr: Some(0x1000),
727            size: Some(1024),
728            var_name: Some(Some("test".to_string())),
729            type_name: Some(Some("i32".to_string())),
730            scope_name: Some(None),
731            timestamp_alloc: Some(1234567890),
732            timestamp_dealloc: Some(None),
733            thread_id: Some("main".to_string()),
734            borrow_count: Some(0),
735            stack_trace: Some(None),
736            is_leaked: Some(false),
737            lifetime_ms: Some(None),
738            // improve.md extensions
739            borrow_info: None,
740            clone_info: None,
741            ownership_history_available: Some(false),
742        };
743
744        let fields = [AllocationField::Ptr, AllocationField::Size]
745            .into_iter()
746            .collect();
747
748        let mut processed_count = 0;
749        processor
750            .process_record_streaming(allocation, &fields, |_record| {
751                processed_count += 1;
752                Ok(())
753            })
754            .expect("Test operation failed");
755
756        assert_eq!(processed_count, 1);
757        assert_eq!(processor.get_stats().records_processed, 1);
758        assert_eq!(processor.get_stats().records_discarded, 1);
759        assert_eq!(processor.get_stats().fields_processed, 2);
760    }
761
762    #[test]
763    fn test_cache_operations() {
764        let mut processor = StreamingFieldProcessor::new();
765
766        let allocation = PartialAllocationInfo {
767            ptr: Some(0x1000),
768            size: Some(1024),
769            var_name: Some(Some("test".to_string())),
770            type_name: Some(Some("i32".to_string())),
771            scope_name: Some(None),
772            timestamp_alloc: Some(1234567890),
773            timestamp_dealloc: Some(None),
774            thread_id: Some("main".to_string()),
775            borrow_count: Some(0),
776            stack_trace: Some(None),
777            is_leaked: Some(false),
778            lifetime_ms: Some(None),
779            // improve.md extensions
780            borrow_info: None,
781            clone_info: None,
782            ownership_history_available: Some(false),
783        };
784
785        // Test cache miss
786        let result1 = processor
787            .get_or_format_field("test_key", &AllocationField::Ptr, &allocation)
788            .expect("Test operation failed");
789        assert_eq!(result1, Some("\"0x1000\"".to_string()));
790        assert_eq!(processor.get_stats().cache_misses, 1);
791        assert_eq!(processor.cache_size(), 1);
792
793        // Test cache hit
794        let result2 = processor
795            .get_or_format_field("test_key", &AllocationField::Ptr, &allocation)
796            .expect("Test operation failed");
797        assert_eq!(result2, Some("\"0x1000\"".to_string()));
798        assert_eq!(processor.get_stats().cache_hits, 1);
799        assert_eq!(processor.cache_size(), 1);
800    }
801
802    #[test]
803    fn test_statistics() {
804        let stats = StreamingFieldProcessorStats {
805            cache_hits: 8,
806            cache_misses: 2,
807            records_processed: 100,
808            fields_processed: 500,
809            preformatted_fields_used: 300,
810            total_processing_time_us: 1_000_000, // 1 second
811            current_memory_usage: 1024,
812            ..Default::default()
813        };
814
815        assert_eq!(stats.cache_hit_rate(), 80.0);
816        assert_eq!(stats.processing_throughput(), 100.0); // 100 records per second
817        assert_eq!(stats.field_processing_efficiency(), 60.0); // 60% preformatted
818        assert_eq!(stats.memory_efficiency(), 10.24); // 10.24 bytes per record
819    }
820
821    #[test]
822    fn test_memory_management() {
823        let config = StreamingFieldProcessorConfigBuilder::new()
824            .max_memory_usage(1024) // Very small limit for testing
825            .max_cache_size(5)
826            .build();
827
828        let mut processor = StreamingFieldProcessor::with_config(config);
829
830        // Test that memory usage is tracked
831        let initial_memory = processor.memory_usage();
832        assert_eq!(initial_memory, 0);
833
834        // Test cache clearing
835        processor.clear_cache();
836        assert_eq!(processor.cache_size(), 0);
837        assert_eq!(processor.memory_usage(), 0);
838    }
839}