// copybook_codec_memory/streaming.rs
// SPDX-License-Identifier: AGPL-3.0-or-later
/// Memory-bounded streaming processor
///
/// Tracks memory usage and provides backpressure signals to maintain
/// bounded memory consumption during streaming record processing. Enables
/// processing multi-GB COBOL files with steady-state memory usage <256 MiB.
///
/// # Purpose
///
/// Prevents unbounded memory growth when processing large data files by:
/// - Tracking estimated memory usage across buffers and in-flight data
/// - Providing memory pressure signals for throttling/flushing
/// - Collecting processing statistics for monitoring
///
/// # Usage Pattern
///
/// 1. Create processor with memory limit: [`StreamingProcessor::new()`]
/// 2. Track memory allocations: [`update_memory_usage()`](StreamingProcessor::update_memory_usage)
/// 3. Check pressure before processing: [`is_memory_pressure()`](StreamingProcessor::is_memory_pressure)
/// 4. Throttle input or flush buffers when pressure detected
/// 5. Record completed work: [`record_processed()`](StreamingProcessor::record_processed)
///
/// # Memory Tracking
///
/// **What to track**:
/// - Input buffers (record data)
/// - Output buffers (JSON strings, encoded data)
/// - Reorder buffers (parallel processing)
/// - Scratch buffers (codec working memory)
///
/// **Pressure threshold**: 80% of `max_memory_bytes`
///
/// # Examples
///
/// ## Basic Streaming with Memory Bounds
///
/// ```rust
/// use copybook_codec_memory::StreamingProcessor;
///
/// let mut processor = StreamingProcessor::with_default_limit(); // 256 MiB
///
/// for record in get_records() {
///     // Check memory pressure
///     if processor.is_memory_pressure() {
///         // Flush output buffers or throttle input
///         flush_buffers();
///     }
///
///     // Track record allocation
///     processor.update_memory_usage(record.len() as isize);
///
///     // Process record
///     process_record(&record);
///
///     // Record completion
///     processor.record_processed(record.len());
///
///     // Track deallocation
///     processor.update_memory_usage(-(record.len() as isize));
/// }
///
/// let stats = processor.stats();
/// // Note: `current_memory_bytes` is the present estimate, not a peak
/// // high-water mark — peak usage is not tracked by this processor.
/// println!("Processed {} records, current {} MiB",
///          stats.records_processed,
///          stats.current_memory_bytes / 1024 / 1024);
/// # fn get_records() -> Vec<Vec<u8>> { vec![] }
/// # fn flush_buffers() {}
/// # fn process_record(_: &[u8]) {}
/// ```
///
/// ## File Processing with Adaptive Batching
///
/// ```rust
/// use copybook_codec_memory::StreamingProcessor;
///
/// let mut processor = StreamingProcessor::new(512); // 512 MiB limit
/// let mut batch = Vec::new();
///
/// for record in file_records() {
///     batch.push(record.clone());
///     processor.update_memory_usage(record.len() as isize);
///
///     // Adaptive batching based on memory pressure
///     if processor.is_memory_pressure() || batch.len() >= 1000 {
///         // Process batch
///         for rec in batch.drain(..) {
///             process_and_write(&rec);
///             processor.record_processed(rec.len());
///             processor.update_memory_usage(-(rec.len() as isize));
///         }
///     }
/// }
///
/// // Process remaining records
/// for rec in batch {
///     process_and_write(&rec);
///     processor.record_processed(rec.len());
/// }
/// # fn file_records() -> Vec<Vec<u8>> { vec![] }
/// # fn process_and_write(_: &[u8]) {}
/// ```
#[derive(Debug)]
pub struct StreamingProcessor {
    /// Maximum memory usage target (bytes)
    max_memory_bytes: usize,

    /// Current estimated memory usage
    ///
    /// Updated via `update_memory_usage()` as data flows through the system.
    current_memory_bytes: usize,

    /// Record processing statistics
    ///
    /// Count of records successfully processed.
    records_processed: u64,

    /// Bytes processed
    ///
    /// Total byte volume processed (input record sizes).
    bytes_processed: u64,
}
122
123impl StreamingProcessor {
124    /// Create a new streaming processor with memory limit
125    ///
126    /// # Arguments
127    ///
128    /// * `max_memory_mb` - Maximum memory usage in megabytes (MiB)
129    ///
130    /// # Examples
131    ///
132    /// ```rust
133    /// use copybook_codec_memory::StreamingProcessor;
134    ///
135    /// let processor = StreamingProcessor::new(512); // 512 MiB limit
136    /// assert!(!processor.is_memory_pressure());
137    /// ```
138    #[inline]
139    #[must_use]
140    pub fn new(max_memory_mb: usize) -> Self {
141        Self {
142            max_memory_bytes: max_memory_mb * 1024 * 1024,
143            current_memory_bytes: 0,
144            records_processed: 0,
145            bytes_processed: 0,
146        }
147    }
148
149    /// Create with default 256 MiB limit
150    ///
151    /// Default limit matches copybook-rs steady-state memory target for
152    /// processing multi-GB COBOL files.
153    ///
154    /// # Examples
155    ///
156    /// ```rust
157    /// use copybook_codec_memory::StreamingProcessor;
158    ///
159    /// let processor = StreamingProcessor::with_default_limit();
160    /// let stats = processor.stats();
161    /// assert_eq!(stats.max_memory_bytes, 256 * 1024 * 1024);
162    /// ```
163    #[inline]
164    #[must_use]
165    pub fn with_default_limit() -> Self {
166        Self::new(256)
167    }
168
169    /// Check if we're approaching memory limit
170    ///
171    /// Returns `true` when current memory usage exceeds 80% of the maximum limit.
172    /// This is the signal to apply backpressure: flush buffers, throttle input,
173    /// or reduce batch sizes.
174    ///
175    /// # Examples
176    ///
177    /// ```rust
178    /// use copybook_codec_memory::StreamingProcessor;
179    ///
180    /// let mut processor = StreamingProcessor::new(1); // 1 MiB limit
181    ///
182    /// processor.update_memory_usage(500 * 1024); // 500 KB
183    /// assert!(!processor.is_memory_pressure()); // <80%
184    ///
185    /// processor.update_memory_usage(400 * 1024); // +400 KB = 900 KB total
186    /// assert!(processor.is_memory_pressure()); // >80%
187    /// ```
188    #[inline]
189    #[must_use]
190    pub fn is_memory_pressure(&self) -> bool {
191        self.current_memory_bytes > (self.max_memory_bytes * 80 / 100) // 80% threshold
192    }
193
194    /// Update memory usage estimate
195    ///
196    /// Track memory allocations (+) and deallocations (-) to maintain
197    /// current memory usage estimate. Use this to track all significant
198    /// buffers in the processing pipeline.
199    ///
200    /// **Performance optimization**: Optimized for hot path with minimal branching.
201    ///
202    /// # Arguments
203    ///
204    /// * `bytes_delta` - Signed byte count change (positive = allocation, negative = deallocation)
205    ///
206    /// # Examples
207    ///
208    /// ```rust
209    /// use copybook_codec_memory::StreamingProcessor;
210    ///
211    /// let mut processor = StreamingProcessor::new(256);
212    ///
213    /// // Allocate 8 KB buffer
214    /// processor.update_memory_usage(8192);
215    /// assert_eq!(processor.stats().current_memory_bytes, 8192);
216    ///
217    /// // Deallocate 4 KB
218    /// processor.update_memory_usage(-4096);
219    /// assert_eq!(processor.stats().current_memory_bytes, 4096);
220    ///
221    /// // Saturating behavior (no underflow)
222    /// processor.update_memory_usage(-10000);
223    /// assert_eq!(processor.stats().current_memory_bytes, 0);
224    /// ```
225    #[inline]
226    pub fn update_memory_usage(&mut self, bytes_delta: isize) {
227        if let Ok(increase) = usize::try_from(bytes_delta) {
228            self.current_memory_bytes = self.current_memory_bytes.saturating_add(increase);
229        } else {
230            let decrease = bytes_delta.unsigned_abs();
231            self.current_memory_bytes = self.current_memory_bytes.saturating_sub(decrease);
232        }
233    }
234
235    /// Record processing of a record
236    ///
237    /// Updates counters for completed record processing. Call this after
238    /// successfully processing each record.
239    ///
240    /// # Arguments
241    ///
242    /// * `record_size` - Size of the processed record in bytes
243    ///
244    /// # Examples
245    ///
246    /// ```rust
247    /// use copybook_codec_memory::StreamingProcessor;
248    ///
249    /// let mut processor = StreamingProcessor::with_default_limit();
250    ///
251    /// processor.record_processed(1024);
252    /// processor.record_processed(2048);
253    ///
254    /// let stats = processor.stats();
255    /// assert_eq!(stats.records_processed, 2);
256    /// assert_eq!(stats.bytes_processed, 3072);
257    /// ```
258    #[inline]
259    pub fn record_processed(&mut self, record_size: usize) {
260        self.records_processed += 1;
261        self.bytes_processed += record_size as u64;
262    }
263
264    /// Get processing statistics
265    ///
266    /// Returns current operational statistics including memory usage,
267    /// utilization percentage, and processing throughput metrics.
268    ///
269    /// # Examples
270    ///
271    /// ```rust
272    /// use copybook_codec_memory::StreamingProcessor;
273    ///
274    /// let mut processor = StreamingProcessor::new(100); // 100 MiB
275    /// processor.update_memory_usage(50 * 1024 * 1024); // 50 MiB
276    /// processor.record_processed(1000);
277    ///
278    /// let stats = processor.stats();
279    /// assert_eq!(stats.memory_utilization_percent, 50);
280    /// assert_eq!(stats.records_processed, 1);
281    /// assert_eq!(stats.bytes_processed, 1000);
282    /// ```
283    #[inline]
284    #[must_use]
285    pub fn stats(&self) -> StreamingProcessorStats {
286        StreamingProcessorStats {
287            max_memory_bytes: self.max_memory_bytes,
288            current_memory_bytes: self.current_memory_bytes,
289            memory_utilization_percent: (self.current_memory_bytes * 100) / self.max_memory_bytes,
290            records_processed: self.records_processed,
291            bytes_processed: self.bytes_processed,
292        }
293    }
294}
295
/// Statistics about streaming processor operation
///
/// Snapshot of current memory usage and processing metrics.
#[derive(Debug, Clone)]
pub struct StreamingProcessorStats {
    /// Maximum memory limit (bytes)
    pub max_memory_bytes: usize,

    /// Current memory usage (bytes)
    ///
    /// Estimated total memory across tracked buffers.
    pub current_memory_bytes: usize,

    /// Memory utilization percentage
    ///
    /// Current usage as percentage of max (0-100).
    pub memory_utilization_percent: usize,

    /// Records processed
    ///
    /// Total count of successfully processed records.
    pub records_processed: u64,

    /// Bytes processed
    ///
    /// Total input byte volume processed.
    pub bytes_processed: u64,
}