#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::expect_used))]
use copybook_core::Schema;
use copybook_error::{Error, ErrorCode, ErrorContext, Result};
use std::io::{BufRead, BufReader, Read, Write};
use tracing::{debug, warn};
/// Size in bytes of an RDW header: a 2-byte big-endian payload length followed by 2 reserved bytes.
pub const RDW_HEADER_LEN: usize = 4;
/// Largest payload length an RDW header can describe (the length field is a `u16`).
pub const RDW_MAX_PAYLOAD_LEN: usize = u16::MAX as usize;
// Capacity of the reader's internal buffer: one maximum-size payload plus one header.
const RDW_READER_BUF_CAPACITY: usize = (u16::MAX as usize) + RDW_HEADER_LEN;
/// A raw 4-byte RDW header.
///
/// The first two bytes hold the payload length and the last two the reserved field, both
/// big-endian. The bytes are stored as-is; no validation happens on construction from bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RdwHeader {
// Raw header bytes in wire order: `[len_hi, len_lo, reserved_hi, reserved_lo]`.
bytes: [u8; RDW_HEADER_LEN],
}
impl RdwHeader {
    /// Wraps four raw header bytes without inspecting them.
    #[inline]
    #[must_use]
    pub const fn from_bytes(bytes: [u8; RDW_HEADER_LEN]) -> Self {
        Self { bytes }
    }

    /// Encodes a header for a payload of `payload_len` bytes with the given `reserved` value.
    ///
    /// Both fields are written big-endian: length first, reserved second.
    ///
    /// # Errors
    /// Propagates the error from [`rdw_payload_len_to_u16`] when `payload_len` does not fit
    /// in a `u16`.
    #[inline]
    #[must_use = "Handle the Result or propagate the error"]
    pub fn from_payload_len(payload_len: usize, reserved: u16) -> Result<Self> {
        let [len_hi, len_lo] = rdw_payload_len_to_u16(payload_len)?.to_be_bytes();
        let [reserved_hi, reserved_lo] = reserved.to_be_bytes();
        Ok(Self::from_bytes([len_hi, len_lo, reserved_hi, reserved_lo]))
    }

    /// Returns the four raw header bytes in wire order.
    #[inline]
    #[must_use]
    pub const fn bytes(self) -> [u8; RDW_HEADER_LEN] {
        self.bytes
    }

    /// Decodes the big-endian payload length stored in the first two bytes.
    #[inline]
    #[must_use]
    pub const fn length(self) -> u16 {
        let [len_hi, len_lo, _, _] = self.bytes;
        u16::from_be_bytes([len_hi, len_lo])
    }

    /// Decodes the big-endian reserved field stored in the last two bytes.
    #[inline]
    #[must_use]
    pub const fn reserved(self) -> u16 {
        let [_, _, reserved_hi, reserved_lo] = self.bytes;
        u16::from_be_bytes([reserved_hi, reserved_lo])
    }

    /// Applies the ASCII-corruption heuristic ([`rdw_is_suspect_ascii_corruption`]) to this header.
    #[inline]
    #[must_use]
    pub const fn looks_ascii_corrupt(self) -> bool {
        rdw_is_suspect_ascii_corruption(self.bytes)
    }
}
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn rdw_payload_len_to_u16(len: usize) -> Result<u16> {
u16::try_from(len).map_err(|_| {
Error::new(
ErrorCode::CBKF102_RECORD_LENGTH_INVALID,
format!(
"RDW payload too large: {} bytes exceeds maximum of {}",
len,
u16::MAX
),
)
})
}
/// Reports whether a raw RDW header looks like it was mangled by an ASCII-mode transfer.
///
/// This is a thin re-export: the decision is made entirely by
/// `copybook_rdw_predicates::rdw_is_suspect_ascii_corruption`. The property test in this
/// file pins the result to "both length bytes are ASCII digits".
#[must_use]
#[inline]
pub const fn rdw_is_suspect_ascii_corruption(rdw_header: [u8; RDW_HEADER_LEN]) -> bool {
copybook_rdw_predicates::rdw_is_suspect_ascii_corruption(rdw_header)
}
/// Reads and consumes the 2-byte big-endian RDW length field from `reader`.
///
/// The two bytes are gathered across as many `fill_buf` calls as needed, so a length field
/// that straddles a buffer refill (for example one byte left in a `BufReader` with more data
/// still available upstream) is read correctly instead of being reported as truncated.
///
/// On success exactly two bytes have been consumed. On a truncated stream any byte that was
/// already gathered has been consumed as well.
///
/// # Errors
/// - `CBKF104_RDW_SUSPECT_ASCII` when the underlying reader reports an I/O error.
/// - `CBKF102_RECORD_LENGTH_INVALID` when the stream ends before two bytes are available.
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn rdw_read_len<R: BufRead>(reader: &mut R) -> Result<u16> {
    let mut len_bytes = [0u8; 2];
    let mut filled = 0usize;
    while filled < len_bytes.len() {
        let available = reader.fill_buf().map_err(|e| {
            Error::new(
                ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
                format!("I/O error peeking RDW length: {e}"),
            )
        })?;
        // An empty slice from `fill_buf` means end of stream, not "try again".
        if available.is_empty() {
            return Err(Error::new(
                ErrorCode::CBKF102_RECORD_LENGTH_INVALID,
                format!("Incomplete RDW header: expected 2 bytes for length (have {filled})"),
            ));
        }
        let take = available.len().min(len_bytes.len() - filled);
        len_bytes[filled..filled + take].copy_from_slice(&available[..take]);
        // Consuming is what lets the next `fill_buf` pull fresh data from the source.
        reader.consume(take);
        filled += take;
    }
    Ok(u16::from_be_bytes(len_bytes))
}
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn rdw_slice_body<R: BufRead>(reader: &mut R, len: u16) -> Result<&[u8]> {
let need = usize::from(len);
if need == 0 {
return Ok(&[]);
}
let buf = reader.fill_buf().map_err(|e| {
Error::new(
ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
format!("I/O error reading RDW payload: {e}"),
)
})?;
if buf.len() < need {
return Err(Error::new(
ErrorCode::CBKF102_RECORD_LENGTH_INVALID,
format!(
"Incomplete RDW record payload: expected {} bytes (have {})",
need,
buf.len()
),
));
}
Ok(&buf[..need])
}
/// Final step of reading a payload slice; currently returns `body` unchanged.
///
/// No validation is performed here: the function is an identity on its argument.
#[inline]
#[must_use]
pub const fn rdw_validate_and_finish(body: &[u8]) -> &[u8] {
body
}
/// Checks, without consuming anything, whether at least two bytes are currently buffered.
///
/// Returns `Ok(Some(()))` when the slice returned by `fill_buf` holds two or more bytes and
/// `Ok(None)` when it holds zero or one. Only a single `fill_buf` call is made.
///
/// # Errors
/// Returns `CBKF104_RDW_SUSPECT_ASCII` when `fill_buf` reports an I/O error.
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn rdw_try_peek_len<R: BufRead>(reader: &mut R) -> Result<Option<()>> {
    let buffered = reader
        .fill_buf()
        .map_err(|e| {
            Error::new(
                ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
                format!("I/O error peeking RDW header: {e}"),
            )
        })?
        .len();
    Ok((buffered > 1).then_some(()))
}
/// An RDW record: the raw 4-byte header plus the payload bytes that follow it.
///
/// Both fields are public, so the length stored in `header` can drift from `payload.len()`
/// after manual edits; `try_recompute_length` brings them back in sync.
#[derive(Debug, Clone)]
pub struct RDWRecord {
/// Raw header bytes: big-endian payload length, then the two reserved bytes.
pub header: [u8; RDW_HEADER_LEN],
/// Payload bytes (header not included).
pub payload: Vec<u8>,
}
impl RDWRecord {
    /// Builds a record around `payload` with the reserved field set to zero.
    ///
    /// # Errors
    /// Fails when the payload is too long for [`RdwHeader::from_payload_len`].
    #[inline]
    #[must_use = "Handle the Result or propagate the error"]
    pub fn try_new(payload: Vec<u8>) -> Result<Self> {
        Self::try_with_reserved(payload, 0)
    }

    /// Panicking variant of [`RDWRecord::try_new`].
    ///
    /// # Panics
    /// Panics when the payload is too long to be described by an RDW header.
    #[deprecated(
        since = "0.4.3",
        note = "use try_new() instead for fallible construction"
    )]
    #[allow(clippy::expect_used)]
    #[inline]
    #[must_use]
    pub fn new(payload: Vec<u8>) -> Self {
        Self::try_new(payload).expect("RDW payload exceeds maximum size (65535 bytes)")
    }

    /// Builds a record around `payload` with an explicit `reserved` field.
    ///
    /// # Errors
    /// Fails when the payload is too long for [`RdwHeader::from_payload_len`].
    #[inline]
    #[must_use = "Handle the Result or propagate the error"]
    pub fn try_with_reserved(payload: Vec<u8>, reserved: u16) -> Result<Self> {
        let encoded = RdwHeader::from_payload_len(payload.len(), reserved)?;
        Ok(Self {
            header: encoded.bytes(),
            payload,
        })
    }

    /// Panicking variant of [`RDWRecord::try_with_reserved`].
    ///
    /// # Panics
    /// Panics when the payload is too long to be described by an RDW header.
    #[deprecated(
        since = "0.4.3",
        note = "use try_with_reserved() instead for fallible construction"
    )]
    #[allow(clippy::expect_used)]
    #[inline]
    #[must_use]
    pub fn with_reserved(payload: Vec<u8>, reserved: u16) -> Self {
        Self::try_with_reserved(payload, reserved)
            .expect("RDW payload exceeds maximum size (65535 bytes)")
    }

    /// Payload length as recorded in the header (not necessarily `payload.len()`).
    #[inline]
    #[must_use]
    pub fn length(&self) -> u16 {
        RdwHeader::from_bytes(self.header).length()
    }

    /// Reserved field as recorded in the header.
    #[inline]
    #[must_use]
    pub fn reserved(&self) -> u16 {
        RdwHeader::from_bytes(self.header).reserved()
    }

    /// Rewrites the header so its length matches the current payload, keeping the reserved field.
    ///
    /// The header is left untouched when an error is returned.
    ///
    /// # Errors
    /// Fails when the payload is too long for [`RdwHeader::from_payload_len`].
    #[inline]
    #[must_use = "Handle the Result or propagate the error"]
    pub fn try_recompute_length(&mut self) -> Result<()> {
        let reserved = self.reserved();
        let refreshed = RdwHeader::from_payload_len(self.payload.len(), reserved)?;
        self.header = refreshed.bytes();
        Ok(())
    }

    /// Panicking variant of [`RDWRecord::try_recompute_length`].
    ///
    /// # Panics
    /// Panics when the payload is too long to be described by an RDW header.
    #[deprecated(
        since = "0.4.3",
        note = "use try_recompute_length() instead for fallible operation"
    )]
    #[allow(clippy::expect_used)]
    #[inline]
    pub fn recompute_length(&mut self) {
        self.try_recompute_length()
            .expect("RDW payload exceeds maximum size (65535 bytes)");
    }

    /// Serializes the record as a freshly allocated buffer: header bytes followed by the payload.
    #[inline]
    #[must_use]
    pub fn as_bytes(&self) -> Vec<u8> {
        [self.header.as_slice(), self.payload.as_slice()].concat()
    }
}
/// Sequential reader that decodes RDW-framed records from any `Read` source.
#[derive(Debug)]
pub struct RDWRecordReader<R: Read> {
// Buffered view of the source; created with `RDW_READER_BUF_CAPACITY` bytes of capacity.
input: BufReader<R>,
// Number of record headers read so far; incremented before the header is validated.
record_count: u64,
// When true, truncated headers and non-zero reserved bytes are errors instead of warnings.
strict_mode: bool,
}
impl<R: Read> RDWRecordReader<R> {
#[inline]
#[must_use]
pub fn new(input: R, strict_mode: bool) -> Self {
Self {
input: BufReader::with_capacity(RDW_READER_BUF_CAPACITY, input),
record_count: 0,
strict_mode,
}
}
#[inline]
fn peek_header(&mut self) -> Result<Option<[u8; RDW_HEADER_LEN]>> {
let peek = rdw_try_peek_len(&mut self.input).map_err(|error| {
error.with_context(ErrorContext {
record_index: Some(self.record_count + 1),
field_path: None,
byte_offset: Some(0),
line_number: None,
details: Some("Unable to peek RDW header".to_string()),
})
})?;
if peek.is_none() {
let buf = self.input.fill_buf().map_err(|e| {
Error::new(
ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
format!("I/O error reading RDW header: {e}"),
)
.with_context(ErrorContext {
record_index: Some(self.record_count + 1),
field_path: None,
byte_offset: Some(0),
line_number: None,
details: Some("Unable to read RDW header".to_string()),
})
})?;
if buf.is_empty() {
debug!("Reached EOF after {} RDW records", self.record_count);
return Ok(None);
}
if self.strict_mode {
return Err(Error::new(
ErrorCode::CBKF221_RDW_UNDERFLOW,
"Incomplete RDW header: expected 4 bytes".to_string(),
)
.with_context(ErrorContext {
record_index: Some(self.record_count + 1),
field_path: None,
byte_offset: Some(0),
line_number: None,
details: Some("File ends with incomplete RDW header".to_string()),
}));
}
debug!(
"Reached EOF after {} RDW records (truncated header ignored)",
self.record_count
);
let remaining = buf.len();
self.input.consume(remaining);
return Ok(None);
}
let buf = self.input.fill_buf().map_err(|e| {
Error::new(
ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
format!("I/O error reading RDW header: {e}"),
)
.with_context(ErrorContext {
record_index: Some(self.record_count + 1),
field_path: None,
byte_offset: Some(0),
line_number: None,
details: Some("Unable to read RDW header".to_string()),
})
})?;
if buf.len() < RDW_HEADER_LEN {
if self.strict_mode {
return Err(Error::new(
ErrorCode::CBKF221_RDW_UNDERFLOW,
"Incomplete RDW header: expected 4 bytes".to_string(),
)
.with_context(ErrorContext {
record_index: Some(self.record_count + 1),
field_path: None,
byte_offset: Some(0),
line_number: None,
details: Some("File ends with incomplete RDW header".to_string()),
}));
}
debug!(
"Reached EOF after {} RDW records (truncated header ignored)",
self.record_count
);
let remaining = buf.len();
self.input.consume(remaining);
return Ok(None);
}
Ok(Some([buf[0], buf[1], buf[2], buf[3]]))
}
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn read_record(&mut self) -> Result<Option<RDWRecord>> {
let Some(header) = self.peek_header()? else {
return Ok(None);
};
let length = match rdw_read_len(&mut self.input) {
Ok(len) => len,
Err(error) => {
return Err(error.with_context(ErrorContext {
record_index: Some(self.record_count + 1),
field_path: None,
byte_offset: Some(0),
line_number: None,
details: Some("Unable to read RDW body length".to_string()),
}));
}
};
self.input.consume(2);
let reserved = u16::from_be_bytes([header[2], header[3]]);
self.record_count += 1;
debug!(
"Read RDW header for record {}: length={}, reserved={:04X}",
self.record_count,
u32::from(length),
reserved
);
if reserved != 0 {
let error = Error::new(
ErrorCode::CBKR211_RDW_RESERVED_NONZERO,
format!("RDW reserved bytes are non-zero: {reserved:04X}"),
)
.with_context(ErrorContext {
record_index: Some(self.record_count),
field_path: None,
byte_offset: Some(2),
line_number: None,
details: Some(format!("Expected 0000, got {reserved:04X}")),
});
if self.strict_mode {
return Err(error);
}
warn!(
"RDW reserved bytes non-zero (record {}): {:04X}",
self.record_count, reserved
);
}
if Self::is_suspect_ascii_corruption(header) {
warn!(
"RDW appears to be ASCII-corrupted (record {}): {:02X} {:02X} {:02X} {:02X}",
self.record_count, header[0], header[1], header[2], header[3]
);
return Err(Error::new(
ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
format!(
"RDW appears to be ASCII-corrupted: {:02X} {:02X} {:02X} {:02X}",
header[0], header[1], header[2], header[3]
),
)
.with_context(ErrorContext {
record_index: Some(self.record_count),
field_path: None,
byte_offset: Some(0),
line_number: None,
details: Some("Suspected ASCII transfer corruption".to_string()),
}));
}
if length == 0 {
debug!("Zero-length RDW record {}", self.record_count);
return Ok(Some(RDWRecord {
header,
payload: Vec::new(),
}));
}
let payload_len = usize::from(length);
let body_slice = match rdw_slice_body(&mut self.input, length) {
Ok(slice) => slice,
Err(error) => {
return Err(error.with_context(ErrorContext {
record_index: Some(self.record_count),
field_path: None,
byte_offset: Some(4),
line_number: None,
details: Some("File ends with incomplete RDW payload".to_string()),
}));
}
};
let payload = rdw_validate_and_finish(body_slice).to_vec();
self.input.consume(payload_len);
debug!(
"Read RDW record {} payload: {} bytes",
self.record_count, length
);
Ok(Some(RDWRecord { header, payload }))
}
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn validate_zero_length_record(&self, schema: &Schema) -> Result<()> {
let min_size = Self::calculate_schema_fixed_prefix(schema);
if min_size > 0 {
return Err(Error::new(
ErrorCode::CBKF221_RDW_UNDERFLOW,
format!("Zero-length RDW record invalid: schema requires minimum {min_size} bytes"),
)
.with_context(ErrorContext {
record_index: Some(self.record_count),
field_path: None,
byte_offset: None,
line_number: None,
details: Some("Zero-length record with non-zero schema prefix".to_string()),
}));
}
Ok(())
}
#[inline]
#[must_use]
pub fn record_count(&self) -> u64 {
self.record_count
}
#[inline]
fn calculate_schema_fixed_prefix(schema: &Schema) -> u32 {
use copybook_core::{Field, Occurs};
fn find_first_odo_offset(fields: &[Field], current: &mut Option<u32>) {
for field in fields {
if let Some(Occurs::ODO { .. }) = &field.occurs {
let offset = field.offset;
match current {
Some(existing) => {
if offset < *existing {
*current = Some(offset);
}
}
None => *current = Some(offset),
}
}
if !field.children.is_empty() {
find_first_odo_offset(&field.children, current);
}
}
}
let mut first_odo_offset: Option<u32> = None;
find_first_odo_offset(&schema.fields, &mut first_odo_offset);
if let Some(offset) = first_odo_offset {
offset
} else if let Some(lrecl) = schema.lrecl_fixed {
lrecl
} else {
fn find_record_end(fields: &[Field], max_end: &mut u32) {
for field in fields {
let end = field.offset + field.len;
if end > *max_end {
*max_end = end;
}
if !field.children.is_empty() {
find_record_end(&field.children, max_end);
}
}
}
let mut max_end = 0;
find_record_end(&schema.fields, &mut max_end);
max_end
}
}
#[inline]
fn is_suspect_ascii_corruption(rdw_header: [u8; RDW_HEADER_LEN]) -> bool {
rdw_is_suspect_ascii_corruption(rdw_header)
}
}
/// Writer that emits RDW-framed records (header followed by payload) to any `Write` sink.
#[derive(Debug)]
pub struct RDWRecordWriter<W: Write> {
// Destination sink; written to directly, with no buffering added by this type.
output: W,
// Number of records written successfully so far.
record_count: u64,
}
impl<W: Write> RDWRecordWriter<W> {
#[inline]
#[must_use]
pub fn new(output: W) -> Self {
Self {
output,
record_count: 0,
}
}
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn write_record(&mut self, record: &RDWRecord) -> Result<()> {
self.output.write_all(&record.header).map_err(|e| {
Error::new(
ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
format!("I/O error writing RDW header: {e}"),
)
.with_context(ErrorContext {
record_index: Some(self.record_count + 1),
field_path: None,
byte_offset: None,
line_number: None,
details: None,
})
})?;
self.output.write_all(&record.payload).map_err(|e| {
Error::new(
ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
format!("I/O error writing RDW payload: {e}"),
)
.with_context(ErrorContext {
record_index: Some(self.record_count + 1),
field_path: None,
byte_offset: Some(4),
line_number: None,
details: None,
})
})?;
self.record_count += 1;
debug!(
"Wrote RDW record {} with {} byte payload",
self.record_count,
record.payload.len()
);
Ok(())
}
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn write_record_from_payload(
&mut self,
payload: &[u8],
preserve_reserved: Option<u16>,
) -> Result<()> {
let length = payload.len();
let header =
RdwHeader::from_payload_len(length, preserve_reserved.unwrap_or(0)).map_err(|_| {
Error::new(
ErrorCode::CBKE501_JSON_TYPE_MISMATCH,
format!(
"RDW payload too large: {length} bytes exceeds maximum of {}",
u16::MAX
),
)
.with_context(ErrorContext {
record_index: Some(self.record_count + 1),
field_path: None,
byte_offset: None,
line_number: None,
details: Some("RDW length field is 16-bit".to_string()),
})
})?;
let record = RDWRecord {
header: header.bytes(),
payload: payload.to_vec(),
};
self.write_record(&record)
}
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn flush(&mut self) -> Result<()> {
self.output.flush().map_err(|e| {
Error::new(
ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
format!("I/O error flushing output: {e}"),
)
})
}
#[inline]
#[must_use]
pub fn record_count(&self) -> u64 {
self.record_count
}
}
// Unit and property tests for the RDW header, record, reader and writer.
// Panicking helpers (`unwrap`/`expect`) are allowed here even though the crate denies them elsewhere.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
use proptest::collection::vec;
use proptest::prelude::*;
use std::io::{BufRead, Cursor};
// --- RdwHeader encoding basics ---
#[test]
fn header_from_payload_len_roundtrips() {
let header = RdwHeader::from_payload_len(10, 0x1234).unwrap();
assert_eq!(header.length(), 10);
assert_eq!(header.reserved(), 0x1234);
assert_eq!(header.bytes(), [0x00, 0x0A, 0x12, 0x34]);
}
#[test]
fn header_from_payload_len_oversize_fails() {
let err = RdwHeader::from_payload_len(RDW_MAX_PAYLOAD_LEN + 1, 0).unwrap_err();
assert_eq!(err.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
}
// The heuristic fires on ASCII digit bytes, not on the binary value 12.
#[test]
fn ascii_corruption_heuristic_matches_digits_only() {
assert!(rdw_is_suspect_ascii_corruption([b'1', b'2', 0, 0]));
assert!(!rdw_is_suspect_ascii_corruption([0, 12, 0, 0]));
}
// --- Free-function primitives over BufRead ---
// Zero or one buffered byte is reported as "no header available".
#[test]
fn rdw_peek_len_none_on_short_buffer() {
let mut cur = Cursor::new(Vec::<u8>::new());
assert!(rdw_try_peek_len(&mut cur).unwrap().is_none());
let mut cur = Cursor::new(vec![0x00]);
assert!(rdw_try_peek_len(&mut cur).unwrap().is_none());
}
// `rdw_read_len` consumes only the length bytes; the test skips the two reserved bytes itself.
#[test]
fn rdw_read_len_consumes_two_bytes() {
let mut cur = Cursor::new(vec![0x00, 0x03, 0xAA, 0xBB, b'A', b'B', b'C']);
let len = rdw_read_len(&mut cur).unwrap();
assert_eq!(len, 3);
cur.consume(2);
let body = rdw_slice_body(&mut cur, len).unwrap();
assert_eq!(rdw_validate_and_finish(body), b"ABC");
}
// Header announces 16 payload bytes but only 2 remain after the reserved bytes.
#[test]
fn rdw_slice_body_short_is_cbkf102() {
let mut cur = Cursor::new(vec![0x00, 0x10, 0xAA, 0xBB, 0xCC, 0xDD]);
let len = rdw_read_len(&mut cur).unwrap();
cur.consume(2);
let err = rdw_slice_body(&mut cur, len).unwrap_err();
assert_eq!(err.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
}
// --- RDWRecord construction and serialization ---
#[test]
fn rdw_record_try_new_roundtrip() {
let record = RDWRecord::try_new(b"hello".to_vec()).unwrap();
assert_eq!(record.length(), 5);
assert_eq!(record.reserved(), 0);
assert_eq!(record.payload, b"hello");
}
#[test]
fn rdw_record_try_with_reserved_roundtrip() {
let record = RDWRecord::try_with_reserved(b"test".to_vec(), 0x1234).unwrap();
assert_eq!(record.length(), 4);
assert_eq!(record.reserved(), 0x1234);
assert_eq!(record.payload, b"test");
}
// Mutating the public payload leaves the header stale until it is recomputed.
#[test]
fn rdw_record_try_recompute_updates_length() {
let mut record = RDWRecord::try_new(b"test".to_vec()).unwrap();
record.payload = b"longer_payload".to_vec();
record.try_recompute_length().unwrap();
assert_eq!(record.length(), 14);
}
#[test]
fn rdw_record_as_bytes_prepends_header() {
let record = RDWRecord::try_new(b"hi".to_vec()).unwrap();
assert_eq!(record.as_bytes(), vec![0, 2, 0, 0, b'h', b'i']);
}
// --- RDWRecordWriter ---
#[test]
fn rdw_writer_writes_record() {
let mut output = Vec::new();
let mut writer = RDWRecordWriter::new(&mut output);
let record = RDWRecord::try_new(b"test".to_vec()).unwrap();
writer.write_record(&record).unwrap();
assert_eq!(writer.record_count(), 1);
assert_eq!(output, vec![0, 4, 0, 0, b't', b'e', b's', b't']);
}
#[test]
fn rdw_writer_writes_record_from_payload_with_reserved() {
let mut output = Vec::new();
let mut writer = RDWRecordWriter::new(&mut output);
writer
.write_record_from_payload(b"test", Some(0x1234))
.unwrap();
assert_eq!(output, vec![0, 4, 0x12, 0x34, b't', b'e', b's', b't']);
}
// --- RDWRecordReader: happy paths ---
#[test]
fn rdw_reader_reads_single_record() {
let data = vec![0, 5, 0, 0, b'h', b'e', b'l', b'l', b'o'];
let mut reader = RDWRecordReader::new(Cursor::new(data), false);
let record = reader.read_record().unwrap().unwrap();
assert_eq!(record.length(), 5);
assert_eq!(record.reserved(), 0);
assert_eq!(record.payload, b"hello");
assert_eq!(reader.record_count(), 1);
}
#[test]
fn rdw_reader_reads_multiple_records() {
let data = vec![
0, 2, 0, 0, b'h', b'i', 0, 3, 0, 0, b'b', b'y', b'e',
];
let mut reader = RDWRecordReader::new(Cursor::new(data), false);
let first = reader.read_record().unwrap().unwrap();
assert_eq!(first.payload, b"hi");
assert_eq!(reader.record_count(), 1);
let second = reader.read_record().unwrap().unwrap();
assert_eq!(second.payload, b"bye");
assert_eq!(reader.record_count(), 2);
assert!(reader.read_record().unwrap().is_none());
}
// --- RDWRecordReader: strict vs lenient handling ---
#[test]
fn rdw_reader_reserved_nonzero_is_warning_in_lenient_mode() {
let data = vec![0, 4, 0x12, 0x34, b't', b'e', b's', b't'];
let mut reader = RDWRecordReader::new(Cursor::new(data), false);
let record = reader.read_record().unwrap().unwrap();
assert_eq!(record.reserved(), 0x1234);
assert_eq!(record.payload, b"test");
}
#[test]
fn rdw_reader_reserved_nonzero_is_error_in_strict_mode() {
let data = vec![0, 4, 0x12, 0x34, b't', b'e', b's', b't'];
let mut reader = RDWRecordReader::new(Cursor::new(data), true);
let error = reader.read_record().unwrap_err();
assert_eq!(error.code, ErrorCode::CBKR211_RDW_RESERVED_NONZERO);
}
// A 2-byte trailing fragment is treated as end of input when lenient.
#[test]
fn rdw_reader_incomplete_header_lenient_is_eof() {
let data = vec![0, 4];
let mut reader = RDWRecordReader::new(Cursor::new(data), false);
let result = reader.read_record().unwrap();
assert!(result.is_none());
}
#[test]
fn rdw_reader_incomplete_header_strict_is_underflow() {
let data = vec![0, 4];
let mut reader = RDWRecordReader::new(Cursor::new(data), true);
let error = reader.read_record().unwrap_err();
assert_eq!(error.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
}
// A truncated payload is an error even in lenient mode.
#[test]
fn rdw_reader_incomplete_payload_is_cbkf102() {
let data = vec![0, 5, 0, 0, b'h', b'i'];
let mut reader = RDWRecordReader::new(Cursor::new(data), false);
let error = reader.read_record().unwrap_err();
assert_eq!(error.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
}
// ASCII-looking length bytes are rejected even in lenient mode.
#[test]
fn rdw_reader_ascii_corruption_is_detected() {
let data = vec![b'1', b'2', 0, 0, b'H', b'E', b'L', b'L', b'O'];
let mut reader = RDWRecordReader::new(Cursor::new(data), false);
let error = reader.read_record().unwrap_err();
assert_eq!(error.code, ErrorCode::CBKF104_RDW_SUSPECT_ASCII);
}
// Schema with a 2-byte counter followed by an ODO array at offset 2: the fixed prefix is
// 2 bytes, so a zero-length record is rejected; an empty schema accepts it.
#[test]
fn rdw_reader_zero_length_validation_obeys_schema_prefix() {
use copybook_core::{Field, FieldKind, Occurs, Schema, TailODO};
let mut counter = Field::with_kind(
5,
"CTR".to_string(),
FieldKind::BinaryInt {
bits: 16,
signed: false,
},
);
counter.offset = 0;
counter.len = 2;
let mut array = Field::with_kind(5, "ARR".to_string(), FieldKind::Alphanum { len: 1 });
array.offset = 2;
array.len = 1;
array.occurs = Some(Occurs::ODO {
min: 0,
max: 5,
counter_path: "CTR".to_string(),
});
let schema = Schema {
fields: vec![counter, array],
lrecl_fixed: None,
tail_odo: Some(TailODO {
counter_path: "CTR".to_string(),
min_count: 0,
max_count: 5,
array_path: "ARR".to_string(),
}),
fingerprint: String::new(),
};
let reader = RDWRecordReader::new(Cursor::new(Vec::<u8>::new()), false);
let error = reader.validate_zero_length_record(&schema).unwrap_err();
assert_eq!(error.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
let empty_schema = Schema::new();
reader.validate_zero_length_record(&empty_schema).unwrap();
}
// --- Oversize payload handling ---
// The writer reports oversize payloads with CBKE501, unlike the record constructors (CBKF102).
#[test]
fn rdw_writer_payload_too_large_is_cbke501() {
let mut output = Vec::new();
let mut writer = RDWRecordWriter::new(&mut output);
let large_payload = vec![0u8; usize::from(u16::MAX) + 1];
let err = writer
.write_record_from_payload(&large_payload, None)
.unwrap_err();
assert_eq!(err.code, ErrorCode::CBKE501_JSON_TYPE_MISMATCH);
}
#[test]
fn rdw_record_oversize_try_new_is_cbkf102() {
let large_payload = vec![0u8; usize::from(u16::MAX) + 1];
let err = RDWRecord::try_new(large_payload).unwrap_err();
assert_eq!(err.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
assert!(err.message.contains("RDW payload too large"));
}
#[test]
fn rdw_record_oversize_try_with_reserved_is_cbkf102() {
let large_payload = vec![0u8; usize::from(u16::MAX) + 1];
let err = RDWRecord::try_with_reserved(large_payload, 0x1234).unwrap_err();
assert_eq!(err.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
assert!(err.message.contains("RDW payload too large"));
}
#[test]
fn rdw_record_oversize_try_recompute_is_cbkf102() {
let mut record = RDWRecord::try_new(b"test".to_vec()).unwrap();
record.payload = vec![0u8; usize::from(u16::MAX) + 1];
let err = record.try_recompute_length().unwrap_err();
assert_eq!(err.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
assert!(err.message.contains("RDW payload too large"));
}
// The deprecated infallible constructor panics instead of returning an error.
#[test]
#[should_panic(expected = "RDW payload exceeds maximum size")]
#[allow(deprecated)]
fn rdw_record_new_panics_on_oversize_payload() {
let payload = vec![0u8; usize::from(u16::MAX) + 1];
let _ = RDWRecord::new(payload);
}
// --- Property tests ---
proptest! {
// Every u16 length and reserved value survives an encode/decode round trip.
#[test]
fn prop_header_payload_len_roundtrip(payload_len in 0u16..=u16::MAX, reserved in any::<u16>()) {
let header = RdwHeader::from_payload_len(payload_len as usize, reserved).unwrap();
prop_assert_eq!(header.length(), payload_len);
prop_assert_eq!(header.reserved(), reserved);
prop_assert_eq!(RdwHeader::from_bytes(header.bytes()).length(), payload_len);
}
// Pins the heuristic to "both length bytes are ASCII digits"; reserved bytes are ignored.
#[test]
fn prop_ascii_corruption_heuristic_matches_manual(b0 in any::<u8>(), b1 in any::<u8>(), b2 in any::<u8>(), b3 in any::<u8>()) {
let header = [b0, b1, b2, b3];
let expected = b0.is_ascii_digit() && b1.is_ascii_digit();
prop_assert_eq!(rdw_is_suspect_ascii_corruption(header), expected);
prop_assert_eq!(RdwHeader::from_bytes(header).looks_ascii_corrupt(), expected);
}
#[test]
fn prop_rdw_record_length_matches_payload(payload in vec(any::<u8>(), 0..=1024), reserved in any::<u16>()) {
let record = RDWRecord::try_with_reserved(payload.clone(), reserved).unwrap();
prop_assert_eq!(usize::from(record.length()), payload.len());
prop_assert_eq!(record.reserved(), reserved);
let bytes = record.as_bytes();
prop_assert_eq!(bytes.len(), RDW_HEADER_LEN + payload.len());
prop_assert_eq!(&bytes[RDW_HEADER_LEN..], payload.as_slice());
}
#[test]
fn prop_rdw_writer_from_payload_encodes_header(payload in vec(any::<u8>(), 0..=512), reserved in any::<u16>()) {
let mut output = Vec::new();
let mut writer = RDWRecordWriter::new(&mut output);
writer.write_record_from_payload(&payload, Some(reserved)).unwrap();
prop_assert_eq!(writer.record_count(), 1);
let header = RdwHeader::from_bytes(output[0..RDW_HEADER_LEN].try_into().unwrap());
prop_assert_eq!(usize::from(header.length()), payload.len());
prop_assert_eq!(header.reserved(), reserved);
prop_assert_eq!(&output[RDW_HEADER_LEN..], payload.as_slice());
}
// Lenient reader is used so arbitrary non-zero reserved values are accepted.
#[test]
fn prop_rdw_writer_reader_roundtrip(
payload in vec(any::<u8>(), 0..=1024),
reserved in any::<u16>(),
) {
let mut encoded = Vec::new();
let mut writer = RDWRecordWriter::new(&mut encoded);
writer.write_record_from_payload(&payload, Some(reserved)).unwrap();
let mut reader = RDWRecordReader::new(Cursor::new(encoded), false);
let decoded = reader.read_record().unwrap().unwrap();
prop_assert_eq!(decoded.payload.as_slice(), payload.as_slice());
prop_assert_eq!(decoded.reserved(), reserved);
prop_assert!(reader.read_record().unwrap().is_none());
}
}
// --- Header byte-order and boundary cases ---
#[test]
fn rdw_header_big_endian_length_parsing() {
let header = RdwHeader::from_bytes([0x01, 0x00, 0x00, 0x00]);
assert_eq!(header.length(), 256);
assert_eq!(header.reserved(), 0);
let header = RdwHeader::from_bytes([0xFF, 0xFF, 0x00, 0x00]);
assert_eq!(header.length(), u16::MAX);
}
#[test]
fn rdw_header_reserved_bytes_preserved() {
let header = RdwHeader::from_bytes([0x00, 0x0A, 0xDE, 0xAD]);
assert_eq!(header.length(), 10);
assert_eq!(header.reserved(), 0xDEAD);
}
// --- Reader edge cases ---
// Empty input is a clean EOF in both modes, not an underflow.
#[test]
fn rdw_reader_empty_file_returns_none() {
let mut reader = RDWRecordReader::new(Cursor::new(Vec::<u8>::new()), false);
assert!(reader.read_record().unwrap().is_none());
assert_eq!(reader.record_count(), 0);
}
#[test]
fn rdw_reader_empty_file_strict_returns_none() {
let mut reader = RDWRecordReader::new(Cursor::new(Vec::<u8>::new()), true);
assert!(reader.read_record().unwrap().is_none());
assert_eq!(reader.record_count(), 0);
}
// A zero-length record is returned as a record (and counted), not treated as EOF.
#[test]
fn rdw_reader_zero_length_record() {
let data = vec![0x00, 0x00, 0x00, 0x00];
let mut reader = RDWRecordReader::new(Cursor::new(data), false);
let record = reader.read_record().unwrap().unwrap();
assert_eq!(record.length(), 0);
assert!(record.payload.is_empty());
assert_eq!(reader.record_count(), 1);
assert!(reader.read_record().unwrap().is_none());
}
// Largest representable record: 65,535 payload bytes plus the header.
#[test]
fn rdw_reader_max_record_size() {
let payload = vec![0xABu8; u16::MAX as usize];
let mut data = Vec::with_capacity(RDW_HEADER_LEN + payload.len());
data.extend_from_slice(&[0xFF, 0xFF, 0x00, 0x00]);
data.extend_from_slice(&payload);
let mut reader = RDWRecordReader::new(Cursor::new(data), false);
let record = reader.read_record().unwrap().unwrap();
assert_eq!(record.length(), u16::MAX);
assert_eq!(record.payload.len(), u16::MAX as usize);
assert!(record.payload.iter().all(|&b| b == 0xAB));
}
// --- Writer/reader round trips ---
// Includes an empty payload in the middle of the stream.
#[test]
fn rdw_multi_record_write_read_roundtrip() {
let payloads: Vec<&[u8]> = vec![b"alpha", b"", b"gamma delta", b"x"];
let mut encoded = Vec::new();
{
let mut writer = RDWRecordWriter::new(&mut encoded);
for p in &payloads {
writer.write_record_from_payload(p, None).unwrap();
}
writer.flush().unwrap();
assert_eq!(writer.record_count(), 4);
}
let mut reader = RDWRecordReader::new(Cursor::new(&encoded), false);
for expected in &payloads {
let record = reader.read_record().unwrap().unwrap();
assert_eq!(record.payload.as_slice(), *expected);
}
assert!(reader.read_record().unwrap().is_none());
assert_eq!(reader.record_count(), 4);
}
// 500 records of 18 bytes each (9,000 bytes total).
// NOTE(review): this stays below the reader's 65,539-byte buffer capacity, so reads that
// cross a buffer refill are not exercised here.
#[test]
fn rdw_streaming_many_records() {
let record_count = 500;
let payload = b"STREAMING_TEST";
let mut encoded = Vec::new();
{
let mut writer = RDWRecordWriter::new(&mut encoded);
for _ in 0..record_count {
writer.write_record_from_payload(payload, None).unwrap();
}
writer.flush().unwrap();
}
let mut reader = RDWRecordReader::new(Cursor::new(&encoded), false);
let mut count = 0u64;
while let Some(record) = reader.read_record().unwrap() {
assert_eq!(record.payload, payload);
count += 1;
}
assert_eq!(count, record_count);
assert_eq!(reader.record_count(), record_count);
}
// --- Truncated header: 1 byte ---
#[test]
fn rdw_reader_single_byte_header_lenient_is_eof() {
let data = vec![0x00];
let mut reader = RDWRecordReader::new(Cursor::new(data), false);
assert!(reader.read_record().unwrap().is_none());
}
#[test]
fn rdw_reader_single_byte_header_strict_is_underflow() {
let data = vec![0x00];
let mut reader = RDWRecordReader::new(Cursor::new(data), true);
let err = reader.read_record().unwrap_err();
assert_eq!(err.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
}
// --- Header length boundaries ---
#[test]
fn rdw_header_zero_length_zero_reserved() {
let header = RdwHeader::from_payload_len(0, 0).unwrap();
assert_eq!(header.length(), 0);
assert_eq!(header.reserved(), 0);
assert_eq!(header.bytes(), [0, 0, 0, 0]);
}
#[test]
fn rdw_header_max_payload_len() {
let header = RdwHeader::from_payload_len(RDW_MAX_PAYLOAD_LEN, 0).unwrap();
assert_eq!(header.length(), u16::MAX);
}
#[test]
fn rdw_header_max_payload_len_plus_one_fails() {
let err = RdwHeader::from_payload_len(RDW_MAX_PAYLOAD_LEN + 1, 0).unwrap_err();
assert_eq!(err.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
}
#[test]
fn rdw_header_length_one() {
let header = RdwHeader::from_payload_len(1, 0).unwrap();
assert_eq!(header.length(), 1);
assert_eq!(header.bytes(), [0, 1, 0, 0]);
}
#[test]
fn rdw_header_looks_ascii_corrupt_false_for_binary() {
let header = RdwHeader::from_bytes([0x00, 0x0A, 0x00, 0x00]);
assert!(!header.looks_ascii_corrupt());
}
#[test]
fn rdw_header_looks_ascii_corrupt_true_for_digits() {
let header = RdwHeader::from_bytes([b'0', b'5', 0x00, 0x00]);
assert!(header.looks_ascii_corrupt());
}
// --- rdw_payload_len_to_u16 boundaries ---
#[test]
fn rdw_payload_len_to_u16_zero() {
assert_eq!(rdw_payload_len_to_u16(0).unwrap(), 0);
}
#[test]
fn rdw_payload_len_to_u16_max() {
assert_eq!(
rdw_payload_len_to_u16(usize::from(u16::MAX)).unwrap(),
u16::MAX
);
}
#[test]
fn rdw_payload_len_to_u16_too_large() {
let err = rdw_payload_len_to_u16(usize::from(u16::MAX) + 1).unwrap_err();
assert_eq!(err.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
assert!(err.message.contains("RDW payload too large"));
}
// Zero length short-circuits to an empty slice regardless of buffered data.
#[test]
fn rdw_slice_body_zero_length_returns_empty() {
let mut cur = Cursor::new(vec![0xAA, 0xBB]);
let body = rdw_slice_body(&mut cur, 0).unwrap();
assert!(body.is_empty());
}
#[test]
fn rdw_validate_and_finish_identity() {
let data = b"test_data";
let result = rdw_validate_and_finish(data);
assert_eq!(result, data);
}
// --- Derived trait smoke tests for RDWRecord ---
#[test]
fn rdw_record_clone() {
let record = RDWRecord::try_new(b"clone_me".to_vec()).unwrap();
let cloned = record.clone();
assert_eq!(cloned.payload, record.payload);
assert_eq!(cloned.header, record.header);
}
#[test]
fn rdw_record_debug_format() {
let record = RDWRecord::try_new(b"dbg".to_vec()).unwrap();
let debug = format!("{record:?}");
assert!(debug.contains("RDWRecord"));
}
#[test]
fn rdw_record_empty_payload() {
let record = RDWRecord::try_new(Vec::new()).unwrap();
assert_eq!(record.length(), 0);
assert!(record.payload.is_empty());
assert_eq!(record.as_bytes().len(), RDW_HEADER_LEN);
}
// --- Truncated header: 3 bytes ---
#[test]
fn rdw_reader_three_byte_header_lenient_is_eof() {
let data = vec![0x00, 0x05, 0x00];
let mut reader = RDWRecordReader::new(Cursor::new(data), false);
assert!(reader.read_record().unwrap().is_none());
}
#[test]
fn rdw_reader_three_byte_header_strict_is_underflow() {
let data = vec![0x00, 0x05, 0x00];
let mut reader = RDWRecordReader::new(Cursor::new(data), true);
let err = reader.read_record().unwrap_err();
assert_eq!(err.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
}
// --- Writer bookkeeping ---
#[test]
fn rdw_writer_flush_succeeds() {
let mut output = Vec::new();
let mut writer = RDWRecordWriter::new(&mut output);
writer.flush().unwrap();
assert_eq!(writer.record_count(), 0);
}
#[test]
fn rdw_writer_multiple_records_count() {
let mut output = Vec::new();
let mut writer = RDWRecordWriter::new(&mut output);
for i in 0..5 {
writer.write_record_from_payload(&[i], None).unwrap();
}
assert_eq!(writer.record_count(), 5);
}
// --- Remaining primitive edge cases ---
#[test]
fn rdw_try_peek_len_two_bytes_returns_some() {
let mut cur = Cursor::new(vec![0x00, 0x05]);
assert!(rdw_try_peek_len(&mut cur).unwrap().is_some());
}
#[test]
fn rdw_read_len_incomplete_is_error() {
let mut cur = Cursor::new(vec![0x00]);
let err = rdw_read_len(&mut cur).unwrap_err();
assert_eq!(err.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
}
}