Skip to main content

copybook_fixed/
lib.rs

1#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::expect_used))]
2// SPDX-License-Identifier: AGPL-3.0-or-later
3//! Fixed-length record framing primitives.
4//!
5//! This crate intentionally focuses on one concern:
6//! reading and writing LRECL-framed records with deterministic padding and
7//! structured error mapping.
8//!
9//! Use [`FixedRecordReader`] to consume fixed-length records from a byte stream
10//! and [`FixedRecordWriter`] to produce them with automatic null-byte padding.
11
12use copybook_core::Schema;
13use copybook_error::{Error, ErrorCode, ErrorContext, Result};
14use std::convert::TryFrom;
15use std::io::{ErrorKind, Read, Write};
16use tracing::{debug, warn};
17
/// Streaming reader that splits a byte source into LRECL-sized records.
///
/// Each call to `read_record` yields one record of exactly `lrecl` bytes, and
/// the reader keeps a running count of the complete records it has returned.
#[derive(Debug)]
pub struct FixedRecordReader<R>
where
    R: Read,
{
    /// Underlying byte source the records are pulled from.
    input: R,
    /// Record length in bytes; `new` rejects a missing or zero value.
    lrecl: u32,
    /// Number of complete records returned so far.
    record_count: u64,
}
25
26impl<R: Read> FixedRecordReader<R> {
27    /// Create a new fixed record reader.
28    ///
29    /// # Errors
30    /// Returns an error if no LRECL is provided or if it is zero.
31    #[inline]
32    #[must_use = "Handle the Result or propagate the error"]
33    pub fn new(input: R, lrecl: Option<u32>) -> Result<Self> {
34        let lrecl = lrecl.ok_or_else(|| {
35            Error::new(
36                ErrorCode::CBKI001_INVALID_STATE,
37                "Fixed format requires LRECL",
38            )
39        })?;
40
41        if lrecl == 0 {
42            return Err(Error::new(
43                ErrorCode::CBKI001_INVALID_STATE,
44                "LRECL must be greater than zero",
45            ));
46        }
47
48        Ok(Self {
49            input,
50            lrecl,
51            record_count: 0,
52        })
53    }
54
55    /// Read the next record.
56    ///
57    /// # Errors
58    /// Returns an error if the record cannot be read due to I/O errors.
59    #[inline]
60    #[must_use = "Handle the Result or propagate the error"]
61    pub fn read_record(&mut self) -> Result<Option<Vec<u8>>> {
62        // Read one byte first so true EOF can be treated as `Ok(None)`.
63        let mut first_byte = [0u8; 1];
64        match self.input.read_exact(&mut first_byte) {
65            Ok(()) => {
66                let lrecl_len = self.lrecl_usize()?;
67                let mut buffer = vec![0u8; lrecl_len];
68                buffer[0] = first_byte[0];
69
70                if lrecl_len > 1 {
71                    match self.input.read_exact(&mut buffer[1..]) {
72                        Ok(()) => {
73                            self.record_count += 1;
74                            debug!(
75                                "Read fixed record {} of {} bytes",
76                                self.record_count, self.lrecl
77                            );
78                            Ok(Some(buffer))
79                        }
80                        Err(e) if e.kind() == ErrorKind::UnexpectedEof => Err(Error::new(
81                            ErrorCode::CBKF221_RDW_UNDERFLOW,
82                            format!(
83                                "Incomplete record at end of file: expected {} bytes",
84                                self.lrecl
85                            ),
86                        )
87                        .with_context(ErrorContext {
88                            record_index: Some(self.record_count + 1),
89                            field_path: None,
90                            byte_offset: None,
91                            line_number: None,
92                            details: Some("File ends with partial record".to_string()),
93                        })),
94                        Err(e) => Err(Error::new(
95                            ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
96                            format!("I/O error reading record: {e}"),
97                        )
98                        .with_context(ErrorContext {
99                            record_index: Some(self.record_count + 1),
100                            field_path: None,
101                            byte_offset: None,
102                            line_number: None,
103                            details: None,
104                        })),
105                    }
106                } else {
107                    self.record_count += 1;
108                    debug!(
109                        "Read fixed record {} of {} bytes",
110                        self.record_count, self.lrecl
111                    );
112                    Ok(Some(buffer))
113                }
114            }
115            Err(e) if e.kind() == ErrorKind::UnexpectedEof => {
116                debug!("Reached EOF after {} records", self.record_count);
117                Ok(None)
118            }
119            Err(e) => Err(Error::new(
120                ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
121                format!("I/O error reading record: {e}"),
122            )
123            .with_context(ErrorContext {
124                record_index: Some(self.record_count + 1),
125                field_path: None,
126                byte_offset: None,
127                line_number: None,
128                details: None,
129            })),
130        }
131    }
132
133    /// Validate record length against schema expectations.
134    ///
135    /// # Errors
136    /// Returns an error if the record length does not match configured LRECL.
137    #[inline]
138    #[must_use = "Handle the Result or propagate the error"]
139    pub fn validate_record_length(&self, schema: &Schema, record_data: &[u8]) -> Result<()> {
140        let lrecl_len = self.lrecl_usize()?;
141        if record_data.len() != lrecl_len {
142            return Err(Error::new(
143                ErrorCode::CBKF221_RDW_UNDERFLOW,
144                format!(
145                    "Record length mismatch: expected {}, got {}",
146                    self.lrecl,
147                    record_data.len()
148                ),
149            )
150            .with_context(ErrorContext {
151                record_index: Some(self.record_count),
152                field_path: None,
153                byte_offset: None,
154                line_number: None,
155                details: Some("Fixed record length validation failed".to_string()),
156            }));
157        }
158
159        if let Some(schema_lrecl) = schema.lrecl_fixed
160            && self.lrecl != schema_lrecl
161        {
162            warn!(
163                "LRECL mismatch: reader configured for {}, schema expects {}",
164                self.lrecl, schema_lrecl
165            );
166        }
167
168        if schema.tail_odo.is_some() {
169            debug!("Record has ODO tail, variable length within fixed LRECL is expected");
170        }
171
172        Ok(())
173    }
174
175    /// Get the current record count.
176    #[must_use]
177    #[inline]
178    pub fn record_count(&self) -> u64 {
179        self.record_count
180    }
181
182    /// Get the configured LRECL.
183    #[must_use]
184    #[inline]
185    pub fn lrecl(&self) -> u32 {
186        self.lrecl
187    }
188
189    #[inline]
190    fn lrecl_usize(&self) -> Result<usize> {
191        usize::try_from(self.lrecl).map_err(|_| {
192            Error::new(
193                ErrorCode::CBKP001_SYNTAX,
194                "LRECL exceeds platform addressable size",
195            )
196        })
197    }
198}
199
/// Streaming writer that emits records of a fixed LRECL to a byte sink.
///
/// Payloads shorter than `lrecl` are padded by `write_record`; the writer also
/// keeps a running count of the records it has written.
#[derive(Debug)]
pub struct FixedRecordWriter<W>
where
    W: Write,
{
    /// Underlying byte sink the records are written to.
    output: W,
    /// Record length in bytes; `new` rejects a missing or zero value.
    lrecl: u32,
    /// Number of records written so far.
    record_count: u64,
}
207
208impl<W: Write> FixedRecordWriter<W> {
209    /// Create a new fixed record writer.
210    ///
211    /// # Errors
212    /// Returns an error if no LRECL is provided or if it is zero.
213    #[inline]
214    #[must_use = "Handle the Result or propagate the error"]
215    pub fn new(output: W, lrecl: Option<u32>) -> Result<Self> {
216        let lrecl = lrecl.ok_or_else(|| {
217            Error::new(
218                ErrorCode::CBKI001_INVALID_STATE,
219                "Fixed format requires LRECL",
220            )
221        })?;
222
223        if lrecl == 0 {
224            return Err(Error::new(
225                ErrorCode::CBKI001_INVALID_STATE,
226                "LRECL must be greater than zero",
227            ));
228        }
229
230        Ok(Self {
231            output,
232            lrecl,
233            record_count: 0,
234        })
235    }
236
237    /// Write a record and pad with `0x00` to LRECL.
238    ///
239    /// # Errors
240    /// Returns an error if the record is longer than LRECL or I/O fails.
241    #[inline]
242    #[must_use = "Handle the Result or propagate the error"]
243    pub fn write_record(&mut self, data: &[u8]) -> Result<()> {
244        let data_len = data.len();
245        let lrecl = self.lrecl_usize()?;
246
247        if data_len > lrecl {
248            return Err(Error::new(
249                ErrorCode::CBKE501_JSON_TYPE_MISMATCH,
250                format!("Record too long: {data_len} bytes exceeds LRECL of {lrecl}"),
251            )
252            .with_context(ErrorContext {
253                record_index: Some(self.record_count + 1),
254                field_path: None,
255                byte_offset: None,
256                line_number: None,
257                details: Some("Record exceeds fixed length".to_string()),
258            }));
259        }
260
261        self.output.write_all(data).map_err(|e| {
262            Error::new(
263                ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
264                format!("I/O error writing record: {e}"),
265            )
266            .with_context(ErrorContext {
267                record_index: Some(self.record_count + 1),
268                field_path: None,
269                byte_offset: None,
270                line_number: None,
271                details: None,
272            })
273        })?;
274
275        if data_len < lrecl {
276            let padding = vec![0u8; lrecl - data_len];
277            self.output.write_all(&padding).map_err(|e| {
278                Error::new(
279                    ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
280                    format!("I/O error writing padding: {e}"),
281                )
282                .with_context(ErrorContext {
283                    record_index: Some(self.record_count + 1),
284                    field_path: None,
285                    byte_offset: Some(u64::try_from(data_len).unwrap_or(u64::MAX)),
286                    line_number: None,
287                    details: Some("Error writing record padding".to_string()),
288                })
289            })?;
290        }
291
292        self.record_count += 1;
293        debug!(
294            "Wrote fixed record {} of {} bytes (data: {}, padding: {})",
295            self.record_count,
296            lrecl,
297            data_len,
298            lrecl - data_len
299        );
300        Ok(())
301    }
302
303    /// Flush the output.
304    ///
305    /// # Errors
306    /// Returns an error if the flush operation fails.
307    #[inline]
308    #[must_use = "Handle the Result or propagate the error"]
309    pub fn flush(&mut self) -> Result<()> {
310        self.output.flush().map_err(|e| {
311            Error::new(
312                ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
313                format!("I/O error flushing output: {e}"),
314            )
315        })
316    }
317
318    /// Get the current record count.
319    #[must_use]
320    #[inline]
321    pub fn record_count(&self) -> u64 {
322        self.record_count
323    }
324
325    /// Get the configured LRECL.
326    #[must_use]
327    #[inline]
328    pub fn lrecl(&self) -> u32 {
329        self.lrecl
330    }
331
332    #[inline]
333    fn lrecl_usize(&self) -> Result<usize> {
334        usize::try_from(self.lrecl).map_err(|_| {
335            Error::new(
336                ErrorCode::CBKP001_SYNTAX,
337                "LRECL exceeds platform addressable size",
338            )
339        })
340    }
341}
342
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use proptest::collection::vec;
    use proptest::prelude::*;
    use std::io::Cursor;

    /// Two back-to-back records come out in order, followed by EOF.
    #[test]
    fn fixed_record_reader_basic() {
        let input = b"ABCD1234EFGH5678";
        let mut reader = FixedRecordReader::new(Cursor::new(input), Some(8)).unwrap();

        let expected_records: [&[u8]; 2] = [b"ABCD1234", b"EFGH5678"];
        for (expected, running_count) in expected_records.into_iter().zip(1u64..) {
            let record = reader.read_record().unwrap().unwrap();
            assert_eq!(record, expected);
            assert_eq!(reader.record_count(), running_count);
        }

        assert!(reader.read_record().unwrap().is_none());
    }

    /// Seven bytes cannot fill an eight-byte record.
    #[test]
    fn fixed_record_reader_partial_record_is_underflow() {
        let input = b"ABCD123";
        let mut reader = FixedRecordReader::new(Cursor::new(input), Some(8)).unwrap();

        let failure = reader.read_record().unwrap_err();
        assert_eq!(failure.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
    }

    #[test]
    fn fixed_record_reader_zero_lrecl_is_invalid_state() {
        let input = b"test";
        let outcome = FixedRecordReader::new(Cursor::new(input), Some(0));
        assert_eq!(outcome.unwrap_err().code, ErrorCode::CBKI001_INVALID_STATE);
    }

    #[test]
    fn fixed_record_reader_missing_lrecl_is_invalid_state() {
        let input = b"test";
        let outcome = FixedRecordReader::new(Cursor::new(input), None);
        assert_eq!(outcome.unwrap_err().code, ErrorCode::CBKI001_INVALID_STATE);
    }

    /// A full-length record is written verbatim; a short one is zero-padded.
    #[test]
    fn fixed_record_writer_basic() {
        let mut sink = Vec::<u8>::new();
        let mut writer = FixedRecordWriter::new(&mut sink, Some(8)).unwrap();

        let payloads: [&[u8]; 2] = [b"ABCD1234", b"XYZ"];
        for payload in payloads {
            writer.write_record(payload).unwrap();
        }
        writer.flush().unwrap();
        assert_eq!(writer.record_count(), 2);

        let mut expected = b"ABCD1234XYZ".to_vec();
        expected.resize(16, 0x00);
        assert_eq!(sink, expected);
    }

    #[test]
    fn fixed_record_writer_too_long_is_cbke501() {
        let mut sink = Vec::<u8>::new();
        let mut writer = FixedRecordWriter::new(&mut sink, Some(4)).unwrap();

        let failure = writer.write_record(b"ABCDEFGH").unwrap_err();
        assert_eq!(failure.code, ErrorCode::CBKE501_JSON_TYPE_MISMATCH);
    }

    #[test]
    fn fixed_record_writer_zero_lrecl_is_invalid_state() {
        let mut sink = Vec::<u8>::new();
        let outcome = FixedRecordWriter::new(&mut sink, Some(0));
        assert_eq!(outcome.unwrap_err().code, ErrorCode::CBKI001_INVALID_STATE);
    }

    #[test]
    fn fixed_record_writer_missing_lrecl_is_invalid_state() {
        let mut sink = Vec::<u8>::new();
        let outcome = FixedRecordWriter::new(&mut sink, None);
        assert_eq!(outcome.unwrap_err().code, ErrorCode::CBKI001_INVALID_STATE);
    }

    /// A record read by the reader passes the reader's own length check.
    #[test]
    fn validate_record_length_ok() {
        let input = b"ABCD1234";
        let mut reader = FixedRecordReader::new(Cursor::new(input), Some(8)).unwrap();
        let record = reader.read_record().unwrap().unwrap();

        let schema = Schema::new();
        reader.validate_record_length(&schema, &record).unwrap();
    }

    /// A three-byte slice is rejected against an eight-byte LRECL.
    #[test]
    fn validate_record_length_mismatch_is_underflow() {
        let input = b"ABCD1234";
        let mut reader = FixedRecordReader::new(Cursor::new(input), Some(8)).unwrap();
        let _ = reader.read_record().unwrap().unwrap();

        let schema = Schema::new();
        let failure = reader.validate_record_length(&schema, b"ABC").unwrap_err();
        assert_eq!(failure.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
    }

    proptest! {
        /// Whatever fits in LRECL is written as one padded record and read
        /// back as exactly that record.
        #[test]
        fn prop_fixed_writer_reader_roundtrip(
            lrecl in 1u16..=512u16,
            payload in vec(any::<u8>(), 0..=512),
        ) {
            let record_len = usize::from(lrecl);
            prop_assume!(payload.len() <= record_len);

            let mut framed = Vec::new();
            let mut writer = FixedRecordWriter::new(&mut framed, Some(u32::from(lrecl))).unwrap();
            writer.write_record(&payload).unwrap();
            writer.flush().unwrap();
            prop_assert_eq!(framed.len(), record_len);
            prop_assert_eq!(&framed[..payload.len()], payload.as_slice());

            let mut reader = FixedRecordReader::new(Cursor::new(&framed), Some(u32::from(lrecl))).unwrap();
            let roundtripped = reader.read_record().unwrap().unwrap();
            prop_assert_eq!(roundtripped, framed.as_slice());
            prop_assert_eq!(reader.read_record().unwrap(), None);
        }

        /// Any payload longer than LRECL is rejected.
        #[test]
        fn prop_fixed_writer_rejects_oversize_payload(
            lrecl in 1u16..=128u16,
            extra in 1usize..=64usize,
        ) {
            let mut sink = Vec::new();
            let mut writer = FixedRecordWriter::new(&mut sink, Some(u32::from(lrecl))).unwrap();
            let oversized = vec![0x41; usize::from(lrecl) + extra];
            let failure = writer.write_record(&oversized).unwrap_err();
            prop_assert_eq!(failure.code, ErrorCode::CBKE501_JSON_TYPE_MISMATCH);
        }
    }

    // ---- additional coverage for fixed-length framing ----

    #[test]
    fn fixed_reader_empty_file_returns_none() {
        let empty = Cursor::new(Vec::<u8>::new());
        let mut reader = FixedRecordReader::new(empty, Some(8)).unwrap();
        assert!(reader.read_record().unwrap().is_none());
        assert_eq!(reader.record_count(), 0);
    }

    /// With LRECL == 1 every input byte is its own record.
    #[test]
    fn fixed_reader_single_byte_lrecl() {
        let input = b"ABCDE";
        let mut reader = FixedRecordReader::new(Cursor::new(input.as_slice()), Some(1)).unwrap();
        for &byte in input {
            let record = reader.read_record().unwrap().unwrap();
            assert_eq!(record, [byte]);
        }
        assert!(reader.read_record().unwrap().is_none());
        assert_eq!(reader.record_count(), 5);
    }

    #[test]
    fn fixed_reader_lrecl_accessor() {
        let empty = Cursor::new(Vec::<u8>::new());
        let reader = FixedRecordReader::new(empty, Some(42)).unwrap();
        assert_eq!(reader.lrecl(), 42);
    }

    #[test]
    fn fixed_writer_lrecl_accessor() {
        let mut sink = Vec::<u8>::new();
        let writer = FixedRecordWriter::new(&mut sink, Some(42)).unwrap();
        assert_eq!(writer.lrecl(), 42);
    }

    #[test]
    fn fixed_writer_exact_lrecl_no_padding() {
        let mut sink = Vec::<u8>::new();
        let mut writer = FixedRecordWriter::new(&mut sink, Some(4)).unwrap();
        writer.write_record(b"ABCD").unwrap();
        writer.flush().unwrap();
        assert_eq!(sink, b"ABCD");
    }

    #[test]
    fn fixed_writer_empty_payload_full_padding() {
        let mut sink = Vec::<u8>::new();
        let mut writer = FixedRecordWriter::new(&mut sink, Some(4)).unwrap();
        writer.write_record(b"").unwrap();
        writer.flush().unwrap();
        assert_eq!(sink, [0u8; 4]);
    }

    /// Mixed full and short payloads survive a write/read cycle, with the
    /// short one zero-padded.
    #[test]
    fn fixed_multi_record_write_read_roundtrip() {
        let lrecl = 10u32;
        let payloads: Vec<&[u8]> = vec![b"AAAAAAAAAA", b"BB", b"CCCCCCCCCC"];

        let mut framed = Vec::new();
        {
            let mut writer = FixedRecordWriter::new(&mut framed, Some(lrecl)).unwrap();
            for payload in &payloads {
                writer.write_record(payload).unwrap();
            }
            writer.flush().unwrap();
            assert_eq!(writer.record_count(), 3);
        }
        assert_eq!(framed.len(), 30);

        let mut reader = FixedRecordReader::new(Cursor::new(&framed), Some(lrecl)).unwrap();
        for (i, expected) in payloads.iter().enumerate() {
            let record = reader.read_record().unwrap().unwrap();
            let (data_part, pad_part) = record.split_at(expected.len());
            assert_eq!(data_part, *expected, "record {i} data mismatch");
            // remaining bytes are zero-padded
            assert!(
                pad_part.iter().all(|&b| b == 0),
                "record {i} padding mismatch"
            );
        }
        assert!(reader.read_record().unwrap().is_none());
        assert_eq!(reader.record_count(), 3);
    }

    /// Five hundred identical records are written and then read back one by one.
    #[test]
    fn fixed_streaming_many_records() {
        let lrecl = 16u32;
        let expected_total = 500u64;
        let payload = b"STREAMING_FIXED_";
        assert_eq!(payload.len(), usize::try_from(lrecl).unwrap());

        let mut framed = Vec::new();
        {
            let mut writer = FixedRecordWriter::new(&mut framed, Some(lrecl)).unwrap();
            for _ in 0..expected_total {
                writer.write_record(payload).unwrap();
            }
            writer.flush().unwrap();
        }

        let mut reader = FixedRecordReader::new(Cursor::new(&framed), Some(lrecl)).unwrap();
        let mut seen = 0u64;
        for record in std::iter::from_fn(|| reader.read_record().unwrap()) {
            assert_eq!(record.as_slice(), payload.as_slice());
            seen += 1;
        }
        assert_eq!(seen, expected_total);
        assert_eq!(reader.record_count(), expected_total);
    }
}