// copybook_codec_memory/streaming.rs
// SPDX-License-Identifier: AGPL-3.0-or-later
/// Memory-bounded streaming processor
///
/// Tracks memory usage and provides backpressure signals to maintain
/// bounded memory consumption during streaming record processing. Enables
/// processing multi-GB COBOL files with steady-state memory usage <256 MiB.
///
/// # Purpose
///
/// Prevents unbounded memory growth when processing large data files by:
/// - Tracking estimated memory usage across buffers and in-flight data
/// - Providing memory pressure signals for throttling/flushing
/// - Collecting processing statistics for monitoring
///
/// # Usage Pattern
///
/// 1. Create processor with memory limit: [`StreamingProcessor::new()`]
/// 2. Track memory allocations: [`update_memory_usage()`](StreamingProcessor::update_memory_usage)
/// 3. Check pressure before processing: [`is_memory_pressure()`](StreamingProcessor::is_memory_pressure)
/// 4. Throttle input or flush buffers when pressure detected
/// 5. Record completed work: [`record_processed()`](StreamingProcessor::record_processed)
///
/// # Memory Tracking
///
/// **What to track**:
/// - Input buffers (record data)
/// - Output buffers (JSON strings, encoded data)
/// - Reorder buffers (parallel processing)
/// - Scratch buffers (codec working memory)
///
/// **Pressure threshold**: 80% of `max_memory_bytes`
///
/// # Examples
///
/// ## Basic Streaming with Memory Bounds
///
/// ```rust
/// use copybook_codec_memory::StreamingProcessor;
///
/// let mut processor = StreamingProcessor::with_default_limit(); // 256 MiB
///
/// for record in get_records() {
///     // Check memory pressure
///     if processor.is_memory_pressure() {
///         // Flush output buffers or throttle input
///         flush_buffers();
///     }
///
///     // Track record allocation
///     processor.update_memory_usage(record.len() as isize);
///
///     // Process record
///     process_record(&record);
///
///     // Record completion
///     processor.record_processed(record.len());
///
///     // Track deallocation
///     processor.update_memory_usage(-(record.len() as isize));
/// }
///
/// let stats = processor.stats();
/// println!("Processed {} records, current {} MiB",
///     stats.records_processed,
///     stats.current_memory_bytes / 1024 / 1024);
/// # fn get_records() -> Vec<Vec<u8>> { vec![] }
/// # fn flush_buffers() {}
/// # fn process_record(_: &[u8]) {}
/// ```
///
/// ## File Processing with Adaptive Batching
///
/// ```rust
/// use copybook_codec_memory::StreamingProcessor;
///
/// let mut processor = StreamingProcessor::new(512); // 512 MiB limit
/// let mut batch = Vec::new();
///
/// for record in file_records() {
///     batch.push(record.clone());
///     processor.update_memory_usage(record.len() as isize);
///
///     // Adaptive batching based on memory pressure
///     if processor.is_memory_pressure() || batch.len() >= 1000 {
///         // Process batch
///         for rec in batch.drain(..) {
///             process_and_write(&rec);
///             processor.record_processed(rec.len());
///             processor.update_memory_usage(-(rec.len() as isize));
///         }
///     }
/// }
///
/// // Process remaining records
/// for rec in batch {
///     process_and_write(&rec);
///     processor.record_processed(rec.len());
/// }
/// # fn file_records() -> Vec<Vec<u8>> { vec![] }
/// # fn process_and_write(_: &[u8]) {}
/// ```
#[derive(Debug)]
pub struct StreamingProcessor {
    /// Hard memory-usage target in bytes; pressure is signalled once usage
    /// crosses 80% of this value.
    max_memory_bytes: usize,

    /// Running estimate of bytes currently held by tracked buffers.
    ///
    /// Maintained externally through `update_memory_usage()` as data flows
    /// through the pipeline.
    current_memory_bytes: usize,

    /// Total number of records successfully processed so far.
    records_processed: u64,

    /// Total input byte volume processed so far (sum of record sizes).
    bytes_processed: u64,
}
122
123impl StreamingProcessor {
124 /// Create a new streaming processor with memory limit
125 ///
126 /// # Arguments
127 ///
128 /// * `max_memory_mb` - Maximum memory usage in megabytes (MiB)
129 ///
130 /// # Examples
131 ///
132 /// ```rust
133 /// use copybook_codec_memory::StreamingProcessor;
134 ///
135 /// let processor = StreamingProcessor::new(512); // 512 MiB limit
136 /// assert!(!processor.is_memory_pressure());
137 /// ```
138 #[inline]
139 #[must_use]
140 pub fn new(max_memory_mb: usize) -> Self {
141 Self {
142 max_memory_bytes: max_memory_mb * 1024 * 1024,
143 current_memory_bytes: 0,
144 records_processed: 0,
145 bytes_processed: 0,
146 }
147 }
148
149 /// Create with default 256 MiB limit
150 ///
151 /// Default limit matches copybook-rs steady-state memory target for
152 /// processing multi-GB COBOL files.
153 ///
154 /// # Examples
155 ///
156 /// ```rust
157 /// use copybook_codec_memory::StreamingProcessor;
158 ///
159 /// let processor = StreamingProcessor::with_default_limit();
160 /// let stats = processor.stats();
161 /// assert_eq!(stats.max_memory_bytes, 256 * 1024 * 1024);
162 /// ```
163 #[inline]
164 #[must_use]
165 pub fn with_default_limit() -> Self {
166 Self::new(256)
167 }
168
169 /// Check if we're approaching memory limit
170 ///
171 /// Returns `true` when current memory usage exceeds 80% of the maximum limit.
172 /// This is the signal to apply backpressure: flush buffers, throttle input,
173 /// or reduce batch sizes.
174 ///
175 /// # Examples
176 ///
177 /// ```rust
178 /// use copybook_codec_memory::StreamingProcessor;
179 ///
180 /// let mut processor = StreamingProcessor::new(1); // 1 MiB limit
181 ///
182 /// processor.update_memory_usage(500 * 1024); // 500 KB
183 /// assert!(!processor.is_memory_pressure()); // <80%
184 ///
185 /// processor.update_memory_usage(400 * 1024); // +400 KB = 900 KB total
186 /// assert!(processor.is_memory_pressure()); // >80%
187 /// ```
188 #[inline]
189 #[must_use]
190 pub fn is_memory_pressure(&self) -> bool {
191 self.current_memory_bytes > (self.max_memory_bytes * 80 / 100) // 80% threshold
192 }
193
194 /// Update memory usage estimate
195 ///
196 /// Track memory allocations (+) and deallocations (-) to maintain
197 /// current memory usage estimate. Use this to track all significant
198 /// buffers in the processing pipeline.
199 ///
200 /// **Performance optimization**: Optimized for hot path with minimal branching.
201 ///
202 /// # Arguments
203 ///
204 /// * `bytes_delta` - Signed byte count change (positive = allocation, negative = deallocation)
205 ///
206 /// # Examples
207 ///
208 /// ```rust
209 /// use copybook_codec_memory::StreamingProcessor;
210 ///
211 /// let mut processor = StreamingProcessor::new(256);
212 ///
213 /// // Allocate 8 KB buffer
214 /// processor.update_memory_usage(8192);
215 /// assert_eq!(processor.stats().current_memory_bytes, 8192);
216 ///
217 /// // Deallocate 4 KB
218 /// processor.update_memory_usage(-4096);
219 /// assert_eq!(processor.stats().current_memory_bytes, 4096);
220 ///
221 /// // Saturating behavior (no underflow)
222 /// processor.update_memory_usage(-10000);
223 /// assert_eq!(processor.stats().current_memory_bytes, 0);
224 /// ```
225 #[inline]
226 pub fn update_memory_usage(&mut self, bytes_delta: isize) {
227 if let Ok(increase) = usize::try_from(bytes_delta) {
228 self.current_memory_bytes = self.current_memory_bytes.saturating_add(increase);
229 } else {
230 let decrease = bytes_delta.unsigned_abs();
231 self.current_memory_bytes = self.current_memory_bytes.saturating_sub(decrease);
232 }
233 }
234
235 /// Record processing of a record
236 ///
237 /// Updates counters for completed record processing. Call this after
238 /// successfully processing each record.
239 ///
240 /// # Arguments
241 ///
242 /// * `record_size` - Size of the processed record in bytes
243 ///
244 /// # Examples
245 ///
246 /// ```rust
247 /// use copybook_codec_memory::StreamingProcessor;
248 ///
249 /// let mut processor = StreamingProcessor::with_default_limit();
250 ///
251 /// processor.record_processed(1024);
252 /// processor.record_processed(2048);
253 ///
254 /// let stats = processor.stats();
255 /// assert_eq!(stats.records_processed, 2);
256 /// assert_eq!(stats.bytes_processed, 3072);
257 /// ```
258 #[inline]
259 pub fn record_processed(&mut self, record_size: usize) {
260 self.records_processed += 1;
261 self.bytes_processed += record_size as u64;
262 }
263
264 /// Get processing statistics
265 ///
266 /// Returns current operational statistics including memory usage,
267 /// utilization percentage, and processing throughput metrics.
268 ///
269 /// # Examples
270 ///
271 /// ```rust
272 /// use copybook_codec_memory::StreamingProcessor;
273 ///
274 /// let mut processor = StreamingProcessor::new(100); // 100 MiB
275 /// processor.update_memory_usage(50 * 1024 * 1024); // 50 MiB
276 /// processor.record_processed(1000);
277 ///
278 /// let stats = processor.stats();
279 /// assert_eq!(stats.memory_utilization_percent, 50);
280 /// assert_eq!(stats.records_processed, 1);
281 /// assert_eq!(stats.bytes_processed, 1000);
282 /// ```
283 #[inline]
284 #[must_use]
285 pub fn stats(&self) -> StreamingProcessorStats {
286 StreamingProcessorStats {
287 max_memory_bytes: self.max_memory_bytes,
288 current_memory_bytes: self.current_memory_bytes,
289 memory_utilization_percent: (self.current_memory_bytes * 100) / self.max_memory_bytes,
290 records_processed: self.records_processed,
291 bytes_processed: self.bytes_processed,
292 }
293 }
294}
295
/// Statistics about streaming processor operation
///
/// A point-in-time snapshot of memory usage and throughput counters taken
/// from a streaming processor.
#[derive(Debug, Clone)]
pub struct StreamingProcessorStats {
    /// Configured memory ceiling in bytes.
    pub max_memory_bytes: usize,

    /// Estimated bytes currently held across tracked buffers.
    pub current_memory_bytes: usize,

    /// Current usage expressed as a percentage of the maximum (0-100).
    pub memory_utilization_percent: usize,

    /// Count of records successfully processed.
    pub records_processed: u64,

    /// Total input byte volume processed.
    pub bytes_processed: u64,
}