use crate::options::{DecodeOptions, RecordFormat};
use copybook_core::{Error, ErrorCode, Result, Schema};
use copybook_rdw::RdwHeader;
use serde_json::Value;
use std::io::{BufReader, Read};
/// Error message reported when fixed-format reading is requested but the schema
/// has no fixed record length (`schema.lrecl_fixed` is `None`).
///
/// The suggested alternative names `RecordFormat::RDW`, the only non-fixed
/// format this module handles.
const FIXED_FORMAT_LRECL_MISSING: &str = "Fixed format requires a fixed record length (LRECL). \
    Set `schema.lrecl_fixed` or use `RecordFormat::RDW`.";
/// Streaming reader that pulls records from `R` one at a time, either as raw
/// bytes (`read_raw_record`) or decoded to JSON through its `Iterator` impl.
pub struct RecordIterator<R: Read> {
/// Buffered wrapper around the caller-supplied reader.
reader: BufReader<R>,
/// Schema used for decoding; `lrecl_fixed` supplies the record length in fixed format.
schema: Schema,
/// Decode options; `format` selects fixed-length or RDW framing.
options: DecodeOptions,
/// Count of records read successfully so far.
record_index: u64,
/// Set once reading has finished; afterwards no further records are produced.
eof_reached: bool,
/// Scratch buffer that receives each record's bytes before they are copied out.
buffer: Vec<u8>,
}
impl<R: Read> RecordIterator<R> {
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn new(reader: R, schema: &Schema, options: &DecodeOptions) -> Result<Self> {
Ok(Self {
reader: BufReader::new(reader),
schema: schema.clone(),
options: options.clone(),
record_index: 0,
eof_reached: false,
buffer: Vec::new(),
})
}
#[inline]
#[must_use]
pub fn current_record_index(&self) -> u64 {
self.record_index
}
#[inline]
#[must_use]
pub fn is_eof(&self) -> bool {
self.eof_reached
}
#[inline]
#[must_use]
pub fn schema(&self) -> &Schema {
&self.schema
}
#[inline]
#[must_use]
pub fn options(&self) -> &DecodeOptions {
&self.options
}
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn read_raw_record(&mut self) -> Result<Option<Vec<u8>>> {
if self.eof_reached {
return Ok(None);
}
self.buffer.clear();
let record_data = match self.options.format {
RecordFormat::Fixed => {
let lrecl = self.schema.lrecl_fixed.ok_or_else(|| {
Error::new(ErrorCode::CBKI001_INVALID_STATE, FIXED_FORMAT_LRECL_MISSING)
})? as usize;
self.buffer.resize(lrecl, 0);
match self.reader.read_exact(&mut self.buffer) {
Ok(()) => {
self.record_index += 1;
Some(self.buffer.clone())
}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
self.eof_reached = true;
return Ok(None);
}
Err(e) => {
return Err(Error::new(
ErrorCode::CBKD301_RECORD_TOO_SHORT,
format!("Failed to read fixed record: {e}"),
));
}
}
}
RecordFormat::RDW => {
let mut rdw_header = [0u8; 4];
match self.reader.read_exact(&mut rdw_header) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
self.eof_reached = true;
return Ok(None);
}
Err(e) => {
return Err(Error::new(
ErrorCode::CBKF221_RDW_UNDERFLOW,
format!("Failed to read RDW header: {e}"),
));
}
}
let length = usize::from(RdwHeader::from_bytes(rdw_header).length());
self.buffer.resize(length, 0);
match self.reader.read_exact(&mut self.buffer) {
Ok(()) => {
self.record_index += 1;
Some(self.buffer.clone())
}
Err(e) => {
return Err(Error::new(
ErrorCode::CBKF221_RDW_UNDERFLOW,
format!("Failed to read RDW payload: {e}"),
));
}
}
}
};
Ok(record_data)
}
#[inline]
fn decode_next_record(&mut self) -> Result<Option<Value>> {
match self.read_raw_record()? {
Some(record_bytes) => {
let json_value = crate::decode_record(&self.schema, &record_bytes, &self.options)?;
Ok(Some(json_value))
}
None => Ok(None),
}
}
}
/// Yields one decoded JSON value per record until the input is exhausted.
///
/// An `Err` item is passed through to the caller unchanged; this method does
/// not itself stop the iteration when an error is produced.
impl<R: Read> Iterator for RecordIterator<R> {
    type Item = Result<Value>;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.eof_reached {
            return None;
        }

        // `Ok(Some(v))` -> `Some(Ok(v))`, `Ok(None)` -> `None`, `Err(e)` -> `Some(Err(e))`.
        let item = self.decode_next_record().transpose();
        if item.is_none() {
            // No record came back: remember that the stream is finished.
            self.eof_reached = true;
        }
        item
    }
}
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn iter_records_from_file<P: AsRef<std::path::Path>>(
file_path: P,
schema: &Schema,
options: &DecodeOptions,
) -> Result<RecordIterator<std::fs::File>> {
let file = std::fs::File::open(file_path)
.map_err(|e| Error::new(ErrorCode::CBKF104_RDW_SUSPECT_ASCII, e.to_string()))?;
RecordIterator::new(file, schema, options)
}
/// Builds a [`RecordIterator`] over any `Read` implementation.
///
/// This is a thin convenience wrapper that forwards its arguments to
/// [`RecordIterator::new`] and returns that call's result unchanged.
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn iter_records<R: Read>(
    reader: R,
    schema: &Schema,
    options: &DecodeOptions,
) -> Result<RecordIterator<R>> {
    RecordIterator::<R>::new(reader, schema, options)
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::Codepage;
    use copybook_core::parse_copybook;
    use std::io::Cursor;

    /// Copybook shared by most tests: a 3-digit `ID` followed by a 5-character
    /// `NAME`, i.e. 8 bytes per record.
    const RECORD_COPYBOOK: &str = "\n01 RECORD.\n05 ID PIC 9(3).\n05 NAME PIC X(5).\n";

    /// Two complete 8-byte fixed records; `BOB` is space-padded to the full
    /// `X(5)` width so every record is exactly 8 bytes.
    const TWO_FIXED_RECORDS: &[u8] = b"001ALICE002BOB  ";

    /// Three complete 8-byte fixed records (24 bytes in total).
    const THREE_FIXED_RECORDS: &[u8] = b"001ALICE002BOB  003CAROL";

    /// Parses the shared copybook, leaving `lrecl_fixed` as the parser set it.
    fn record_schema() -> Schema {
        parse_copybook(RECORD_COPYBOOK).unwrap()
    }

    /// Parses the shared copybook and pins the fixed record length to 8 bytes.
    fn record_schema_with_lrecl() -> Schema {
        let mut schema = record_schema();
        schema.lrecl_fixed = Some(8);
        schema
    }

    /// Default decode options with only the record format overridden.
    fn default_options(format: RecordFormat) -> DecodeOptions {
        DecodeOptions {
            format,
            ..DecodeOptions::default()
        }
    }

    /// Decode options for ASCII data in the given record format.
    fn ascii_options(format: RecordFormat) -> DecodeOptions {
        DecodeOptions {
            format,
            codepage: Codepage::ASCII,
            ..DecodeOptions::default()
        }
    }

    #[test]
    fn test_record_iterator_basic() {
        let schema = record_schema();
        let cursor = Cursor::new(TWO_FIXED_RECORDS);
        let options = default_options(RecordFormat::Fixed);
        let iterator = RecordIterator::new(cursor, &schema, &options).unwrap();
        assert_eq!(iterator.current_record_index(), 0);
        assert!(!iterator.is_eof());
    }

    #[test]
    fn test_record_iterator_rdw() {
        let schema = record_schema();
        // Each record: 2-byte length, 2 reserved bytes, then the payload.
        let test_data = vec![
            0x00, 0x08, 0x00, 0x00, b'0', b'0', b'1', b'A', b'L', b'I', b'C', b'E', //
            0x00, 0x06, 0x00, 0x00, b'0', b'0', b'2', b'B', b'O', b'B',
        ];
        let cursor = Cursor::new(test_data);
        let options = default_options(RecordFormat::RDW);
        let iterator = RecordIterator::new(cursor, &schema, &options).unwrap();
        assert_eq!(iterator.current_record_index(), 0);
        assert!(!iterator.is_eof());
    }

    #[test]
    fn test_raw_record_reading() {
        let schema = record_schema();
        let test_data = b"001ALICE";
        let cursor = Cursor::new(test_data);
        let options = default_options(RecordFormat::Fixed);
        let mut iterator = RecordIterator::new(cursor, &schema, &options).unwrap();
        let raw_record = iterator.read_raw_record().unwrap().unwrap();
        assert_eq!(raw_record, b"001ALICE");
        assert_eq!(iterator.current_record_index(), 1);
        assert!(iterator.read_raw_record().unwrap().is_none());
    }

    #[test]
    fn test_iterator_error_handling() {
        let schema = record_schema();
        // Only 4 of the 8 bytes of a record are present: the truncated tail is
        // treated as end of input, not as an error.
        let test_data = b"001A";
        let cursor = Cursor::new(test_data);
        let options = default_options(RecordFormat::Fixed);
        let mut iterator = RecordIterator::new(cursor, &schema, &options).unwrap();
        assert!(iterator.next().is_none());
    }

    #[test]
    fn test_iterator_fixed_format_missing_lrecl_errors_on_next() {
        let copybook_text = "01 SOME-GROUP. 05 SOME-FIELD PIC X(1).";
        let mut schema = parse_copybook(copybook_text).unwrap();
        schema.lrecl_fixed = None;
        let test_data = b"";
        let cursor = Cursor::new(test_data);
        let options = default_options(RecordFormat::Fixed);
        let mut iterator = RecordIterator::new(cursor, &schema, &options).unwrap();
        let first = iterator.next().unwrap();
        assert!(first.is_err());
        if let Err(e) = first {
            assert_eq!(e.code, ErrorCode::CBKI001_INVALID_STATE);
            assert_eq!(e.message, FIXED_FORMAT_LRECL_MISSING);
        }
    }

    #[test]
    fn test_iterator_schema_and_options_accessors() {
        let schema = record_schema_with_lrecl();
        let test_data = b"001ALICE";
        let cursor = Cursor::new(test_data);
        let options = ascii_options(RecordFormat::Fixed);
        let iterator = RecordIterator::new(cursor, &schema, &options).unwrap();
        assert_eq!(iterator.schema().fields[0].name, "RECORD");
        assert_eq!(iterator.options().format, RecordFormat::Fixed);
    }

    #[test]
    fn test_iterator_multiple_fixed_records() {
        let schema = record_schema_with_lrecl();
        let cursor = Cursor::new(THREE_FIXED_RECORDS);
        let options = ascii_options(RecordFormat::Fixed);
        let mut iterator = RecordIterator::new(cursor, &schema, &options).unwrap();
        let mut count = 0;
        for result in iterator.by_ref() {
            assert!(result.is_ok(), "Record {count} should decode successfully");
            count += 1;
        }
        assert_eq!(count, 3);
        assert_eq!(iterator.current_record_index(), 3);
        assert!(iterator.is_eof());
    }

    #[test]
    fn test_iterator_rdw_multiple_records() {
        let schema = record_schema();
        let test_data = vec![
            0x00, 0x08, 0x00, 0x00, b'0', b'0', b'1', b'A', b'L', b'I', b'C', b'E', //
            0x00, 0x06, 0x00, 0x00, b'0', b'0', b'2', b'B', b'O', b'B', //
            0x00, 0x08, 0x00, 0x00, b'0', b'0', b'3', b'C', b'A', b'R', b'O', b'L',
        ];
        let cursor = Cursor::new(test_data);
        let options = ascii_options(RecordFormat::RDW);
        let mut iterator = RecordIterator::new(cursor, &schema, &options).unwrap();
        let mut count = 0;
        for result in iterator.by_ref() {
            assert!(result.is_ok(), "Record {count} should decode successfully");
            count += 1;
        }
        assert_eq!(count, 3);
        assert_eq!(iterator.current_record_index(), 3);
        assert!(iterator.is_eof());
    }

    #[test]
    fn test_iter_records_convenience() {
        let schema = record_schema();
        let cursor = Cursor::new(TWO_FIXED_RECORDS);
        let options = default_options(RecordFormat::Fixed);
        let iterator = iter_records(cursor, &schema, &options).unwrap();
        assert_eq!(iterator.current_record_index(), 0);
        assert!(!iterator.is_eof());
    }

    #[test]
    fn test_iterator_with_empty_data() {
        let schema = record_schema_with_lrecl();
        let test_data = b"";
        let cursor = Cursor::new(test_data);
        let options = default_options(RecordFormat::Fixed);
        let mut iterator = RecordIterator::new(cursor, &schema, &options).unwrap();
        assert!(iterator.next().is_none());
        assert!(iterator.is_eof());
        assert_eq!(iterator.current_record_index(), 0);
    }

    #[test]
    fn test_iterator_raw_record_eof() {
        let schema = record_schema();
        let test_data = b"001ALICE";
        let cursor = Cursor::new(test_data);
        let options = default_options(RecordFormat::Fixed);
        let mut iterator = RecordIterator::new(cursor, &schema, &options).unwrap();
        assert!(iterator.read_raw_record().unwrap().is_some());
        assert_eq!(iterator.current_record_index(), 1);
        assert!(iterator.read_raw_record().unwrap().is_none());
        assert!(iterator.is_eof());
    }

    #[test]
    fn test_iterator_collect_results() {
        let schema = record_schema_with_lrecl();
        let cursor = Cursor::new(THREE_FIXED_RECORDS);
        let options = ascii_options(RecordFormat::Fixed);
        let iterator = RecordIterator::new(cursor, &schema, &options).unwrap();
        let results: Vec<Result<Value>> = iterator.collect();
        assert_eq!(results.len(), 3);
        for result in results {
            assert!(result.is_ok());
        }
    }

    #[test]
    fn test_iterator_with_decode_error() {
        let schema = record_schema_with_lrecl();
        // The single record here is well-formed, so this exercises the
        // success-then-end-of-input path rather than an actual decode failure.
        let test_data = b"001ALICE";
        let cursor = Cursor::new(test_data);
        let options = ascii_options(RecordFormat::Fixed);
        let mut iterator = RecordIterator::new(cursor, &schema, &options).unwrap();
        let first = iterator.next();
        assert!(first.is_some());
        assert!(first.unwrap().is_ok());
        assert!(iterator.next().is_none());
    }
}