Struct StreamingProcessor 

pub struct StreamingProcessor { /* private fields */ }

Memory-bounded streaming processor

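Tracks estimated memory usage and provides backpressure signals so that streaming record processing stays within a fixed memory bound. The processor does not allocate or free memory itself: callers report allocations and deallocations, and the processor signals when the estimate nears the limit. This supports processing multi-GB COBOL files with steady-state memory usage below 256 MiB.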

§Purpose

Helps prevent unbounded memory growth when processing large data files by:

  • Tracking estimated memory usage across buffers and in-flight data
  • Providing memory pressure signals for throttling/flushing
  • Collecting processing statistics for monitoring

§Usage Pattern

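  1. Create a processor with a memory limit: StreamingProcessor::new(max_memory_mb) or StreamingProcessor::with_default_limit()
  2. Report allocations: update_memory_usage() with a positive delta
  3. Check for pressure before processing: is_memory_pressure()
  4. Throttle input or flush buffers when pressure is detected
  5. Record completed work: record_processed()
  6. Report deallocations: update_memory_usage() with a negative delta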

§Memory Tracking

What to track:

  • Input buffers (record data)
  • Output buffers (JSON strings, encoded data)
  • Reorder buffers (parallel processing)
  • Scratch buffers (codec working memory)

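Pressure threshold: is_memory_pressure() returns true once the tracked usage exceeds 80% of max_memory_bytes (about 204.8 MiB with the default 256 MiB limit).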
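The threshold arithmetic can be modeled in a few lines. This is a minimal sketch, not the crate's implementation: it assumes the limit is held as max_memory_mb * 1024 * 1024 bytes (consistent with the with_default_limit example) and that "exceeds 80%" means strictly greater than, evaluated here with integer math. The helpers limit_bytes and is_pressure are hypothetical names.

```rust
/// Hypothetical helper: converts a MiB limit to bytes, as new(max_memory_mb)
/// is assumed to do (256 MiB -> 268_435_456 bytes).
fn limit_bytes(max_memory_mb: usize) -> usize {
    max_memory_mb * 1024 * 1024
}

/// Hypothetical model of the 80% pressure check.
/// `current * 5 > max * 4` expresses `current / max > 0.8` without floating point.
fn is_pressure(current_bytes: usize, max_bytes: usize) -> bool {
    current_bytes.saturating_mul(5) > max_bytes.saturating_mul(4)
}

fn main() {
    let max = limit_bytes(256);
    // 80% of 256 MiB is 214_748_364.8 bytes, so 214_748_365 is the first count that trips.
    assert!(!is_pressure(214_748_364, max));
    assert!(is_pressure(214_748_365, max));
    println!("256 MiB = {max} bytes");
}
```

Integer math avoids rounding surprises at the boundary; whether the real crate uses the same comparison is an assumption.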

§Examples

§Basic Streaming with Memory Bounds

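use copybook_codec_memory::StreamingProcessor;

// Hypothetical stand-ins for the caller's own I/O and codec functions.
fn get_records() -> Vec<Vec<u8>> { vec![vec![0u8; 1024]; 10] }
fn flush_buffers() {}
fn process_record(_record: &[u8]) {}

let mut processor = StreamingProcessor::with_default_limit(); // 256 MiB
// stats() reports current usage, so the peak is tracked by the caller
let mut peak_bytes = 0;

for record in get_records() {
    // Apply backpressure before taking on more data
    if processor.is_memory_pressure() {
        // Flush output buffers or throttle input
        flush_buffers();
    }

    // Track the record's allocation and update the observed peak
    processor.update_memory_usage(record.len() as isize);
    peak_bytes = std::cmp::max(peak_bytes, processor.stats().current_memory_bytes);

    // Process the record
    process_record(&record);

    // Count it as completed
    processor.record_processed(record.len());

    // Release its allocation from the estimate
    processor.update_memory_usage(-(record.len() as isize));
}

let stats = processor.stats();
println!("Processed {} records ({} bytes), peak {} MiB",
         stats.records_processed,
         stats.bytes_processed,
         peak_bytes / 1024 / 1024);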

§File Processing with Adaptive Batching

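use copybook_codec_memory::StreamingProcessor;

// Hypothetical stand-ins for the caller's own reader and writer.
fn file_records() -> Vec<Vec<u8>> { vec![vec![0u8; 4096]; 2500] }
fn process_and_write(_record: &[u8]) {}

let mut processor = StreamingProcessor::new(512); // 512 MiB limit
let mut batch: Vec<Vec<u8>> = Vec::new();

for record in file_records() {
    // Track the allocation, then move the record into the batch (no clone needed)
    processor.update_memory_usage(record.len() as isize);
    batch.push(record);

    // Adaptive batching: flush early under memory pressure, otherwise every 1000 records
    if processor.is_memory_pressure() || batch.len() >= 1000 {
        for rec in batch.drain(..) {
            process_and_write(&rec);
            processor.record_processed(rec.len());
            processor.update_memory_usage(-(rec.len() as isize));
        }
    }
}

// Process remaining records and release their tracked memory as well
for rec in batch {
    process_and_write(&rec);
    processor.record_processed(rec.len());
    processor.update_memory_usage(-(rec.len() as isize));
}

assert_eq!(processor.stats().current_memory_bytes, 0);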

Implementations§

impl StreamingProcessor

pub fn new(max_memory_mb: usize) -> Self

Creates a new streaming processor with the given memory limit.

§Arguments
  • max_memory_mb - Maximum memory usage in mebibytes (MiB); stats() reports it as max_memory_bytes = max_memory_mb * 1024 * 1024
§Examples
use copybook_codec_memory::StreamingProcessor;

let processor = StreamingProcessor::new(512); // 512 MiB limit
assert!(!processor.is_memory_pressure());

pub fn with_default_limit() -> Self

Creates a processor with the default 256 MiB limit.

The default limit matches the copybook-rs steady-state memory target for processing multi-GB COBOL files.

§Examples
use copybook_codec_memory::StreamingProcessor;

let processor = StreamingProcessor::with_default_limit();
let stats = processor.stats();
assert_eq!(stats.max_memory_bytes, 256 * 1024 * 1024);

pub fn is_memory_pressure(&self) -> bool

Checks whether memory usage is approaching the limit.

Returns true when current memory usage exceeds 80% of the maximum limit. This is the signal to apply backpressure: flush buffers, throttle input, or reduce batch sizes.
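One way to act on the "reduce batch sizes" signal is to halve the batch size while under pressure and double it (up to a cap) once pressure clears. The sketch below is a hypothetical policy, not part of copybook_codec_memory: next_batch_size and its min/max bounds are assumptions, and the under_pressure flag stands in for the value returned by is_memory_pressure().

```rust
/// Hypothetical batch-size policy driven by a pressure flag
/// (for example, the value returned by is_memory_pressure()).
fn next_batch_size(current: usize, under_pressure: bool, min: usize, max: usize) -> usize {
    let next = if under_pressure {
        current / 2 // back off quickly to shed buffered data
    } else {
        current.saturating_mul(2) // grow again once pressure clears
    };
    next.clamp(min, max)
}

fn main() {
    let mut size = 1000;
    // Simulated pressure readings: two high, then three low.
    // Batch sizes: 500, 250, 500, 1000, 2000
    for pressure in [true, true, false, false, false] {
        size = next_batch_size(size, pressure, 16, 4096);
        println!("pressure={pressure} -> batch size {size}");
    }
}
```

Backing off fast and recovering gradually keeps the estimate below the threshold without permanently shrinking batches; the exact growth and shrink factors are tuning choices, not documented values.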

§Examples
use copybook_codec_memory::StreamingProcessor;

let mut processor = StreamingProcessor::new(1); // 1 MiB limit

processor.update_memory_usage(500 * 1024); // 500 KiB
assert!(!processor.is_memory_pressure()); // ~49% of 1 MiB, below 80%

processor.update_memory_usage(400 * 1024); // +400 KiB = 900 KiB total
assert!(processor.is_memory_pressure()); // ~88% of 1 MiB, above 80%

pub fn update_memory_usage(&mut self, bytes_delta: isize)

Updates the memory usage estimate.

Report allocations with a positive delta and deallocations with a negative delta to keep the current estimate accurate. Track every significant buffer in the processing pipeline this way.

Performance: designed for the hot path, with minimal branching.
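Because every positive delta must eventually be matched by a negative one, it can help to tie the two together with a guard that reports the allocation when created and the release when dropped. The sketch below uses a hypothetical Tracker stand-in that mirrors the saturating behavior shown in the example; Tracker and TrackedBytes are illustrative names, not crate types, and wrapping the real StreamingProcessor the same way is an untested assumption.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for the processor's usage estimate.
#[derive(Default)]
struct Tracker {
    current_bytes: usize,
}

impl Tracker {
    /// Mirrors the documented saturating behavior: never goes below zero.
    fn update_memory_usage(&mut self, bytes_delta: isize) {
        if bytes_delta >= 0 {
            self.current_bytes = self.current_bytes.saturating_add(bytes_delta as usize);
        } else {
            self.current_bytes = self.current_bytes.saturating_sub(bytes_delta.unsigned_abs());
        }
    }
}

/// Reports `bytes` on creation and releases them on drop, so the negative
/// delta cannot be forgotten on early returns or `?` paths.
struct TrackedBytes<'a> {
    tracker: &'a RefCell<Tracker>,
    bytes: isize,
}

impl<'a> TrackedBytes<'a> {
    fn new(tracker: &'a RefCell<Tracker>, bytes: usize) -> Self {
        let bytes = bytes as isize;
        tracker.borrow_mut().update_memory_usage(bytes);
        TrackedBytes { tracker, bytes }
    }
}

impl Drop for TrackedBytes<'_> {
    fn drop(&mut self) {
        self.tracker.borrow_mut().update_memory_usage(-self.bytes);
    }
}

fn main() {
    let tracker = RefCell::new(Tracker::default());
    {
        let _buf = TrackedBytes::new(&tracker, 8192);
        assert_eq!(tracker.borrow().current_bytes, 8192);
    } // guard dropped here, releasing the 8 KiB
    assert_eq!(tracker.borrow().current_bytes, 0);
}
```

RefCell lets the guard hold a shared reference while other code still reads the estimate; a single-threaded pipeline is assumed.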

§Arguments
  • bytes_delta - Signed byte count change (positive = allocation, negative = deallocation)
§Examples
use copybook_codec_memory::StreamingProcessor;

let mut processor = StreamingProcessor::new(256);

// Allocate an 8 KiB buffer
processor.update_memory_usage(8192);
assert_eq!(processor.stats().current_memory_bytes, 8192);

// Release 4 KiB
processor.update_memory_usage(-4096);
assert_eq!(processor.stats().current_memory_bytes, 4096);

// Releasing more than is tracked saturates at zero (no underflow)
processor.update_memory_usage(-10000);
assert_eq!(processor.stats().current_memory_bytes, 0);

pub fn record_processed(&mut self, record_size: usize)

Records that one record has been processed.

Increments records_processed by one and adds record_size to bytes_processed. Call this after each record is processed successfully.

§Arguments
  • record_size - Size of the processed record in bytes
§Examples
use copybook_codec_memory::StreamingProcessor;

let mut processor = StreamingProcessor::with_default_limit();

processor.record_processed(1024);
processor.record_processed(2048);

let stats = processor.stats();
assert_eq!(stats.records_processed, 2);
assert_eq!(stats.bytes_processed, 3072);

pub fn stats(&self) -> StreamingProcessorStats

Returns a snapshot of the processing statistics.

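The snapshot includes the current and maximum memory usage (current_memory_bytes, max_memory_bytes), the utilization percentage (memory_utilization_percent), and cumulative counters of processed records and bytes (records_processed, bytes_processed).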
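The record and byte counters are cumulative totals, so a rate such as records per second has to be derived by the caller from elapsed time. A minimal sketch, assuming the caller measures elapsed time (for example with std::time::Instant) and converts the counter values to u64; throughput is a hypothetical helper.

```rust
use std::time::Duration;

/// Hypothetical helper: converts cumulative counters into rates.
/// Returns (records per second, MiB per second); zero elapsed time yields zeros.
fn throughput(records: u64, bytes: u64, elapsed: Duration) -> (f64, f64) {
    let secs = elapsed.as_secs_f64();
    if secs == 0.0 {
        return (0.0, 0.0);
    }
    let mib = bytes as f64 / (1024.0 * 1024.0);
    (records as f64 / secs, mib / secs)
}

fn main() {
    // Illustrative values standing in for stats().records_processed,
    // stats().bytes_processed, and a measured Instant::elapsed().
    let (rps, mibps) = throughput(2_000, 3 * 1024 * 1024, Duration::from_secs(2));
    println!("{rps:.0} records/s, {mibps:.2} MiB/s"); // prints "1000 records/s, 1.50 MiB/s"
}
```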

§Examples
use copybook_codec_memory::StreamingProcessor;

let mut processor = StreamingProcessor::new(100); // 100 MiB
processor.update_memory_usage(50 * 1024 * 1024); // 50 MiB
processor.record_processed(1000);

let stats = processor.stats();
assert_eq!(stats.memory_utilization_percent, 50);
assert_eq!(stats.records_processed, 1);
assert_eq!(stats.bytes_processed, 1000);

Trait Implementations§

impl Debug for StreamingProcessor

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.