#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::expect_used))]
use copybook_core::Schema;
use copybook_error::{Error, ErrorCode, ErrorContext, Result};
use std::convert::TryFrom;
use std::io::{ErrorKind, Read, Write};
use tracing::{debug, warn};
/// Reader that pulls fixed-length records of `lrecl` bytes from an input stream.
#[derive(Debug)]
pub struct FixedRecordReader<R: Read> {
/// Byte source the records are read from.
input: R,
/// Record length in bytes; `new` rejects a missing or zero value.
lrecl: u32,
/// Number of complete records returned by `read_record` so far.
record_count: u64,
}
impl<R: Read> FixedRecordReader<R> {
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn new(input: R, lrecl: Option<u32>) -> Result<Self> {
let lrecl = lrecl.ok_or_else(|| {
Error::new(
ErrorCode::CBKI001_INVALID_STATE,
"Fixed format requires LRECL",
)
})?;
if lrecl == 0 {
return Err(Error::new(
ErrorCode::CBKI001_INVALID_STATE,
"LRECL must be greater than zero",
));
}
Ok(Self {
input,
lrecl,
record_count: 0,
})
}
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn read_record(&mut self) -> Result<Option<Vec<u8>>> {
let mut first_byte = [0u8; 1];
match self.input.read_exact(&mut first_byte) {
Ok(()) => {
let lrecl_len = self.lrecl_usize()?;
let mut buffer = vec![0u8; lrecl_len];
buffer[0] = first_byte[0];
if lrecl_len > 1 {
match self.input.read_exact(&mut buffer[1..]) {
Ok(()) => {
self.record_count += 1;
debug!(
"Read fixed record {} of {} bytes",
self.record_count, self.lrecl
);
Ok(Some(buffer))
}
Err(e) if e.kind() == ErrorKind::UnexpectedEof => Err(Error::new(
ErrorCode::CBKF221_RDW_UNDERFLOW,
format!(
"Incomplete record at end of file: expected {} bytes",
self.lrecl
),
)
.with_context(ErrorContext {
record_index: Some(self.record_count + 1),
field_path: None,
byte_offset: None,
line_number: None,
details: Some("File ends with partial record".to_string()),
})),
Err(e) => Err(Error::new(
ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
format!("I/O error reading record: {e}"),
)
.with_context(ErrorContext {
record_index: Some(self.record_count + 1),
field_path: None,
byte_offset: None,
line_number: None,
details: None,
})),
}
} else {
self.record_count += 1;
debug!(
"Read fixed record {} of {} bytes",
self.record_count, self.lrecl
);
Ok(Some(buffer))
}
}
Err(e) if e.kind() == ErrorKind::UnexpectedEof => {
debug!("Reached EOF after {} records", self.record_count);
Ok(None)
}
Err(e) => Err(Error::new(
ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
format!("I/O error reading record: {e}"),
)
.with_context(ErrorContext {
record_index: Some(self.record_count + 1),
field_path: None,
byte_offset: None,
line_number: None,
details: None,
})),
}
}
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn validate_record_length(&self, schema: &Schema, record_data: &[u8]) -> Result<()> {
let lrecl_len = self.lrecl_usize()?;
if record_data.len() != lrecl_len {
return Err(Error::new(
ErrorCode::CBKF221_RDW_UNDERFLOW,
format!(
"Record length mismatch: expected {}, got {}",
self.lrecl,
record_data.len()
),
)
.with_context(ErrorContext {
record_index: Some(self.record_count),
field_path: None,
byte_offset: None,
line_number: None,
details: Some("Fixed record length validation failed".to_string()),
}));
}
if let Some(schema_lrecl) = schema.lrecl_fixed
&& self.lrecl != schema_lrecl
{
warn!(
"LRECL mismatch: reader configured for {}, schema expects {}",
self.lrecl, schema_lrecl
);
}
if schema.tail_odo.is_some() {
debug!("Record has ODO tail, variable length within fixed LRECL is expected");
}
Ok(())
}
/// Returns how many complete records `read_record` has returned so far.
#[must_use]
#[inline]
pub fn record_count(&self) -> u64 {
self.record_count
}
/// Returns the record length (LRECL) this reader was constructed with.
#[must_use]
#[inline]
pub fn lrecl(&self) -> u32 {
self.lrecl
}
#[inline]
fn lrecl_usize(&self) -> Result<usize> {
usize::try_from(self.lrecl).map_err(|_| {
Error::new(
ErrorCode::CBKP001_SYNTAX,
"LRECL exceeds platform addressable size",
)
})
}
}
/// Writer that emits fixed-length records of `lrecl` bytes to an output stream.
#[derive(Debug)]
pub struct FixedRecordWriter<W: Write> {
/// Byte sink the records are written to.
output: W,
/// Record length in bytes; `new` rejects a missing or zero value.
lrecl: u32,
/// Number of records fully written by `write_record` so far.
record_count: u64,
}
impl<W: Write> FixedRecordWriter<W> {
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn new(output: W, lrecl: Option<u32>) -> Result<Self> {
let lrecl = lrecl.ok_or_else(|| {
Error::new(
ErrorCode::CBKI001_INVALID_STATE,
"Fixed format requires LRECL",
)
})?;
if lrecl == 0 {
return Err(Error::new(
ErrorCode::CBKI001_INVALID_STATE,
"LRECL must be greater than zero",
));
}
Ok(Self {
output,
lrecl,
record_count: 0,
})
}
/// Writes one record, zero-padding it up to LRECL.
///
/// `data` may be shorter than LRECL; the rest of the record is filled with `0x00`
/// bytes. The record counter advances only after the payload and all padding have
/// been handed to the underlying writer.
///
/// Padding is streamed from a fixed zero block rather than from a freshly allocated
/// buffer, so memory use does not scale with LRECL and no allocation happens per
/// record.
///
/// # Errors
///
/// * `CBKE501_JSON_TYPE_MISMATCH` when `data` is longer than LRECL; nothing is
///   written in that case.
/// * `CBKF104_RDW_SUSPECT_ASCII` when the underlying writer fails while writing the
///   payload or the padding; the output may then hold a partial record.
///   NOTE(review): the code name suggests an RDW/ASCII heuristic; confirm it is the
///   intended code for generic I/O errors.
/// * The error from `lrecl_usize` when LRECL does not fit in `usize`.
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn write_record(&mut self, data: &[u8]) -> Result<()> {
    // Source of padding bytes. Slicing this block avoids a heap allocation sized by
    // LRECL (a `u32`, so potentially gigabytes) whose failure would abort the process.
    static ZERO_PAD: [u8; 4096] = [0u8; 4096];

    let data_len = data.len();
    let lrecl = self.lrecl_usize()?;
    // 1-based index of the record being written, used in every error context below.
    let record_index = self.record_count + 1;

    if data_len > lrecl {
        return Err(Error::new(
            ErrorCode::CBKE501_JSON_TYPE_MISMATCH,
            format!("Record too long: {data_len} bytes exceeds LRECL of {lrecl}"),
        )
        .with_context(ErrorContext {
            record_index: Some(record_index),
            field_path: None,
            byte_offset: None,
            line_number: None,
            details: Some("Record exceeds fixed length".to_string()),
        }));
    }

    self.output.write_all(data).map_err(|e| {
        Error::new(
            ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
            format!("I/O error writing record: {e}"),
        )
        .with_context(ErrorContext {
            record_index: Some(record_index),
            field_path: None,
            byte_offset: None,
            line_number: None,
            details: None,
        })
    })?;

    // Emit the padding in bounded slices; a pad of up to `ZERO_PAD.len()` bytes is
    // still a single `write_all` call.
    let padding_len = lrecl - data_len;
    let mut remaining = padding_len;
    while remaining > 0 {
        let step = remaining.min(ZERO_PAD.len());
        self.output.write_all(&ZERO_PAD[..step]).map_err(|e| {
            Error::new(
                ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
                format!("I/O error writing padding: {e}"),
            )
            .with_context(ErrorContext {
                record_index: Some(record_index),
                field_path: None,
                // Offset within the record at which the padding begins.
                byte_offset: Some(u64::try_from(data_len).unwrap_or(u64::MAX)),
                line_number: None,
                details: Some("Error writing record padding".to_string()),
            })
        })?;
        remaining -= step;
    }

    self.record_count = record_index;
    debug!(
        "Wrote fixed record {} of {} bytes (data: {}, padding: {})",
        self.record_count,
        lrecl,
        data_len,
        padding_len
    );
    Ok(())
}
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn flush(&mut self) -> Result<()> {
self.output.flush().map_err(|e| {
Error::new(
ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
format!("I/O error flushing output: {e}"),
)
})
}
/// Returns how many records `write_record` has completed so far.
#[must_use]
#[inline]
pub fn record_count(&self) -> u64 {
self.record_count
}
/// Returns the record length (LRECL) this writer was constructed with.
#[must_use]
#[inline]
pub fn lrecl(&self) -> u32 {
self.lrecl
}
#[inline]
fn lrecl_usize(&self) -> Result<usize> {
usize::try_from(self.lrecl).map_err(|_| {
Error::new(
ErrorCode::CBKP001_SYNTAX,
"LRECL exceeds platform addressable size",
)
})
}
}
// Unit and property tests for the fixed-length record reader and writer.
// `unwrap`/`expect`/`panic` are allowed here because a panic is how a test fails.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
use proptest::collection::vec;
use proptest::prelude::*;
use std::io::Cursor;
// Two back-to-back 8-byte records are returned in order, then `None` at EOF.
#[test]
fn fixed_record_reader_basic() {
let data = b"ABCD1234EFGH5678";
let mut reader = FixedRecordReader::new(Cursor::new(data), Some(8)).unwrap();
let record1 = reader.read_record().unwrap().unwrap();
assert_eq!(record1, b"ABCD1234");
assert_eq!(reader.record_count(), 1);
let record2 = reader.read_record().unwrap().unwrap();
assert_eq!(record2, b"EFGH5678");
assert_eq!(reader.record_count(), 2);
let record3 = reader.read_record().unwrap();
assert!(record3.is_none());
}
// Seven bytes of input with LRECL 8: the truncated record is an underflow error.
#[test]
fn fixed_record_reader_partial_record_is_underflow() {
let data = b"ABCD123";
let mut reader = FixedRecordReader::new(Cursor::new(data), Some(8)).unwrap();
let error = reader.read_record().unwrap_err();
assert_eq!(error.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
}
// The reader constructor rejects LRECL 0.
#[test]
fn fixed_record_reader_zero_lrecl_is_invalid_state() {
let data = b"test";
let error = FixedRecordReader::new(Cursor::new(data), Some(0)).unwrap_err();
assert_eq!(error.code, ErrorCode::CBKI001_INVALID_STATE);
}
// The reader constructor rejects a missing LRECL.
#[test]
fn fixed_record_reader_missing_lrecl_is_invalid_state() {
let data = b"test";
let error = FixedRecordReader::new(Cursor::new(data), None).unwrap_err();
assert_eq!(error.code, ErrorCode::CBKI001_INVALID_STATE);
}
// A full-length record is written verbatim; a short one is padded with 0x00 to LRECL.
#[test]
fn fixed_record_writer_basic() {
let mut output = Vec::new();
let mut writer = FixedRecordWriter::new(&mut output, Some(8)).unwrap();
writer.write_record(b"ABCD1234").unwrap();
writer.write_record(b"XYZ").unwrap();
writer.flush().unwrap();
assert_eq!(writer.record_count(), 2);
assert_eq!(output, b"ABCD1234XYZ\x00\x00\x00\x00\x00");
}
// A payload longer than LRECL is rejected with CBKE501.
#[test]
fn fixed_record_writer_too_long_is_cbke501() {
let mut output = Vec::new();
let mut writer = FixedRecordWriter::new(&mut output, Some(4)).unwrap();
let error = writer.write_record(b"ABCDEFGH").unwrap_err();
assert_eq!(error.code, ErrorCode::CBKE501_JSON_TYPE_MISMATCH);
}
// The writer constructor rejects LRECL 0.
#[test]
fn fixed_record_writer_zero_lrecl_is_invalid_state() {
let mut output = Vec::new();
let error = FixedRecordWriter::new(&mut output, Some(0)).unwrap_err();
assert_eq!(error.code, ErrorCode::CBKI001_INVALID_STATE);
}
// The writer constructor rejects a missing LRECL.
#[test]
fn fixed_record_writer_missing_lrecl_is_invalid_state() {
let mut output = Vec::new();
let error = FixedRecordWriter::new(&mut output, None).unwrap_err();
assert_eq!(error.code, ErrorCode::CBKI001_INVALID_STATE);
}
// A record just read by the reader passes length validation against a default schema.
#[test]
fn validate_record_length_ok() {
let data = b"ABCD1234";
let mut reader = FixedRecordReader::new(Cursor::new(data), Some(8)).unwrap();
let record = reader.read_record().unwrap().unwrap();
let schema = Schema::new();
reader.validate_record_length(&schema, &record).unwrap();
}
// A 3-byte slice checked against LRECL 8 fails validation with the underflow code.
#[test]
fn validate_record_length_mismatch_is_underflow() {
let data = b"ABCD1234";
let mut reader = FixedRecordReader::new(Cursor::new(data), Some(8)).unwrap();
let _ = reader.read_record().unwrap().unwrap();
let schema = Schema::new();
let error = reader.validate_record_length(&schema, b"ABC").unwrap_err();
assert_eq!(error.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
}
proptest! {
// Round trip: any payload no longer than LRECL is written as exactly LRECL bytes
// starting with the payload, and the reader returns that same encoded record.
#[test]
fn prop_fixed_writer_reader_roundtrip(
lrecl in 1u16..=512u16,
payload in vec(any::<u8>(), 0..=512),
) {
prop_assume!(payload.len() <= usize::from(lrecl));
let mut encoded = Vec::new();
let mut writer = FixedRecordWriter::new(&mut encoded, Some(u32::from(lrecl))).unwrap();
writer.write_record(&payload).unwrap();
writer.flush().unwrap();
prop_assert_eq!(encoded.len(), usize::from(lrecl));
prop_assert_eq!(&encoded[..payload.len()], payload.as_slice());
let mut reader = FixedRecordReader::new(Cursor::new(&encoded), Some(u32::from(lrecl))).unwrap();
let decoded = reader.read_record().unwrap().unwrap();
prop_assert_eq!(decoded, encoded.as_slice());
prop_assert_eq!(reader.read_record().unwrap(), None);
}
// Any payload at least one byte longer than LRECL is rejected with CBKE501.
#[test]
fn prop_fixed_writer_rejects_oversize_payload(
lrecl in 1u16..=128u16,
extra in 1usize..=64usize,
) {
let mut output = Vec::new();
let mut writer = FixedRecordWriter::new(&mut output, Some(u32::from(lrecl))).unwrap();
let payload = vec![0x41; usize::from(lrecl) + extra];
let error = writer.write_record(&payload).unwrap_err();
prop_assert_eq!(error.code, ErrorCode::CBKE501_JSON_TYPE_MISMATCH);
}
}
// Empty input yields `None` immediately and leaves the counter at zero.
#[test]
fn fixed_reader_empty_file_returns_none() {
let mut reader = FixedRecordReader::new(Cursor::new(Vec::<u8>::new()), Some(8)).unwrap();
assert!(reader.read_record().unwrap().is_none());
assert_eq!(reader.record_count(), 0);
}
// LRECL 1 exercises the path where the first byte read is the whole record.
#[test]
fn fixed_reader_single_byte_lrecl() {
let data = b"ABCDE";
let mut reader = FixedRecordReader::new(Cursor::new(data.as_slice()), Some(1)).unwrap();
for expected in b"ABCDE" {
let record = reader.read_record().unwrap().unwrap();
assert_eq!(record, vec![*expected]);
}
assert!(reader.read_record().unwrap().is_none());
assert_eq!(reader.record_count(), 5);
}
// The reader's `lrecl()` accessor reports the configured value.
#[test]
fn fixed_reader_lrecl_accessor() {
let reader = FixedRecordReader::new(Cursor::new(Vec::<u8>::new()), Some(42)).unwrap();
assert_eq!(reader.lrecl(), 42);
}
// The writer's `lrecl()` accessor reports the configured value.
#[test]
fn fixed_writer_lrecl_accessor() {
let mut output = Vec::new();
let writer = FixedRecordWriter::new(&mut output, Some(42)).unwrap();
assert_eq!(writer.lrecl(), 42);
}
// A payload of exactly LRECL bytes is written with no padding.
#[test]
fn fixed_writer_exact_lrecl_no_padding() {
let mut output = Vec::new();
let mut writer = FixedRecordWriter::new(&mut output, Some(4)).unwrap();
writer.write_record(b"ABCD").unwrap();
writer.flush().unwrap();
assert_eq!(output, b"ABCD");
}
// An empty payload produces a record consisting entirely of zero padding.
#[test]
fn fixed_writer_empty_payload_full_padding() {
let mut output = Vec::new();
let mut writer = FixedRecordWriter::new(&mut output, Some(4)).unwrap();
writer.write_record(b"").unwrap();
writer.flush().unwrap();
assert_eq!(output, vec![0u8; 4]);
}
// Three records (full, short, full) are written and read back; each record must
// start with its payload and be zero-filled after it.
#[test]
fn fixed_multi_record_write_read_roundtrip() {
let lrecl = 10u32;
let payloads: Vec<&[u8]> = vec![b"AAAAAAAAAA", b"BB", b"CCCCCCCCCC"];
let mut encoded = Vec::new();
{
let mut writer = FixedRecordWriter::new(&mut encoded, Some(lrecl)).unwrap();
for p in &payloads {
writer.write_record(p).unwrap();
}
writer.flush().unwrap();
assert_eq!(writer.record_count(), 3);
}
assert_eq!(encoded.len(), 30);
let mut reader = FixedRecordReader::new(Cursor::new(&encoded), Some(lrecl)).unwrap();
for (i, expected) in payloads.iter().enumerate() {
let record = reader.read_record().unwrap().unwrap();
assert_eq!(
&record[..expected.len()],
*expected,
"record {i} data mismatch"
);
assert!(
record[expected.len()..].iter().all(|&b| b == 0),
"record {i} padding mismatch"
);
}
assert!(reader.read_record().unwrap().is_none());
assert_eq!(reader.record_count(), 3);
}
// 500 identical 16-byte records are written, then read back one at a time until EOF.
#[test]
fn fixed_streaming_many_records() {
let lrecl = 16u32;
let record_count = 500u64;
let payload = b"STREAMING_FIXED_";
assert_eq!(payload.len(), lrecl as usize);
let mut encoded = Vec::new();
{
let mut writer = FixedRecordWriter::new(&mut encoded, Some(lrecl)).unwrap();
for _ in 0..record_count {
writer.write_record(payload).unwrap();
}
writer.flush().unwrap();
}
let mut reader = FixedRecordReader::new(Cursor::new(&encoded), Some(lrecl)).unwrap();
let mut count = 0u64;
while let Some(record) = reader.read_record().unwrap() {
assert_eq!(record.as_slice(), payload.as_slice());
count += 1;
}
assert_eq!(count, record_count);
assert_eq!(reader.record_count(), record_count);
}
}