pub struct StreamingProcessor { /* private fields */ }
Expand description
Memory-bounded streaming processor
Tracks memory usage and provides backpressure signals to maintain bounded memory consumption during streaming record processing. Enables processing multi-GB COBOL files with steady-state memory usage <256 MiB.
§Purpose
Prevents unbounded memory growth when processing large data files by:
- Tracking estimated memory usage across buffers and in-flight data
- Providing memory pressure signals for throttling/flushing
- Collecting processing statistics for monitoring
§Usage Pattern
1. Create processor with memory limit: StreamingProcessor::new()
2. Track memory allocations: update_memory_usage()
3. Check pressure before processing: is_memory_pressure()
4. Throttle input or flush buffers when pressure is detected
5. Record completed work: record_processed()
§Memory Tracking
What to track:
- Input buffers (record data)
- Output buffers (JSON strings, encoded data)
- Reorder buffers (parallel processing)
- Scratch buffers (codec working memory)
Pressure threshold: 80% of max_memory_bytes
§Examples
§Basic Streaming with Memory Bounds
use copybook_codec_memory::StreamingProcessor;
let mut processor = StreamingProcessor::with_default_limit(); // 256 MiB
for record in get_records() {
// Check memory pressure
if processor.is_memory_pressure() {
// Flush output buffers or throttle input
flush_buffers();
}
// Track record allocation
processor.update_memory_usage(record.len() as isize);
// Process record
process_record(&record);
// Record completion
processor.record_processed(record.len());
// Track deallocation
processor.update_memory_usage(-(record.len() as isize));
}
let stats = processor.stats();
println!("Processed {} records, current {} MiB",
stats.records_processed,
stats.current_memory_bytes / 1024 / 1024);
§File Processing with Adaptive Batching
use copybook_codec_memory::StreamingProcessor;
let mut processor = StreamingProcessor::new(512); // 512 MiB limit
let mut batch = Vec::new();
for record in file_records() {
batch.push(record.clone());
processor.update_memory_usage(record.len() as isize);
// Adaptive batching based on memory pressure
if processor.is_memory_pressure() || batch.len() >= 1000 {
// Process batch
for rec in batch.drain(..) {
process_and_write(&rec);
processor.record_processed(rec.len());
processor.update_memory_usage(-(rec.len() as isize));
}
}
}
// Process remaining records
for rec in batch {
process_and_write(&rec);
processor.record_processed(rec.len());
processor.update_memory_usage(-(rec.len() as isize));
}
Implementations§
Source§impl StreamingProcessor
impl StreamingProcessor
Sourcepub fn with_default_limit() -> Self
pub fn with_default_limit() -> Self
Create with default 256 MiB limit
Default limit matches copybook-rs steady-state memory target for processing multi-GB COBOL files.
§Examples
use copybook_codec_memory::StreamingProcessor;
let processor = StreamingProcessor::with_default_limit();
let stats = processor.stats();
assert_eq!(stats.max_memory_bytes, 256 * 1024 * 1024);
Sourcepub fn is_memory_pressure(&self) -> bool
pub fn is_memory_pressure(&self) -> bool
Check if we’re approaching memory limit
Returns true when current memory usage exceeds 80% of the maximum limit.
This is the signal to apply backpressure: flush buffers, throttle input,
or reduce batch sizes.
§Examples
use copybook_codec_memory::StreamingProcessor;
let mut processor = StreamingProcessor::new(1); // 1 MiB limit
processor.update_memory_usage(500 * 1024); // 500 KB
assert!(!processor.is_memory_pressure()); // <80%
processor.update_memory_usage(400 * 1024); // +400 KB = 900 KB total
assert!(processor.is_memory_pressure()); // >80%
Sourcepub fn update_memory_usage(&mut self, bytes_delta: isize)
pub fn update_memory_usage(&mut self, bytes_delta: isize)
Update memory usage estimate
Track memory allocations (+) and deallocations (-) to maintain current memory usage estimate. Use this to track all significant buffers in the processing pipeline.
Performance optimization: Optimized for hot path with minimal branching.
§Arguments
bytes_delta - Signed byte count change (positive = allocation, negative = deallocation)
§Examples
use copybook_codec_memory::StreamingProcessor;
let mut processor = StreamingProcessor::new(256);
// Allocate 8 KB buffer
processor.update_memory_usage(8192);
assert_eq!(processor.stats().current_memory_bytes, 8192);
// Deallocate 4 KB
processor.update_memory_usage(-4096);
assert_eq!(processor.stats().current_memory_bytes, 4096);
// Saturating behavior (no underflow)
processor.update_memory_usage(-10000);
assert_eq!(processor.stats().current_memory_bytes, 0);
Sourcepub fn record_processed(&mut self, record_size: usize)
pub fn record_processed(&mut self, record_size: usize)
Record processing of a record
Updates counters for completed record processing. Call this after successfully processing each record.
§Arguments
record_size - Size of the processed record in bytes
§Examples
use copybook_codec_memory::StreamingProcessor;
let mut processor = StreamingProcessor::with_default_limit();
processor.record_processed(1024);
processor.record_processed(2048);
let stats = processor.stats();
assert_eq!(stats.records_processed, 2);
assert_eq!(stats.bytes_processed, 3072);
Sourcepub fn stats(&self) -> StreamingProcessorStats
pub fn stats(&self) -> StreamingProcessorStats
Get processing statistics
Returns current operational statistics including memory usage, utilization percentage, and processing throughput metrics.
§Examples
use copybook_codec_memory::StreamingProcessor;
let mut processor = StreamingProcessor::new(100); // 100 MiB
processor.update_memory_usage(50 * 1024 * 1024); // 50 MiB
processor.record_processed(1000);
let stats = processor.stats();
assert_eq!(stats.memory_utilization_percent, 50);
assert_eq!(stats.records_processed, 1);
assert_eq!(stats.bytes_processed, 1000);