1#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::expect_used))]
2use copybook_core::Schema;
13use copybook_error::{Error, ErrorCode, ErrorContext, Result};
14use std::convert::TryFrom;
15use std::io::{ErrorKind, Read, Write};
16use tracing::{debug, warn};
17
18#[derive(Debug)]
20pub struct FixedRecordReader<R: Read> {
21 input: R,
22 lrecl: u32,
23 record_count: u64,
24}
25
26impl<R: Read> FixedRecordReader<R> {
27 #[inline]
32 #[must_use = "Handle the Result or propagate the error"]
33 pub fn new(input: R, lrecl: Option<u32>) -> Result<Self> {
34 let lrecl = lrecl.ok_or_else(|| {
35 Error::new(
36 ErrorCode::CBKI001_INVALID_STATE,
37 "Fixed format requires LRECL",
38 )
39 })?;
40
41 if lrecl == 0 {
42 return Err(Error::new(
43 ErrorCode::CBKI001_INVALID_STATE,
44 "LRECL must be greater than zero",
45 ));
46 }
47
48 Ok(Self {
49 input,
50 lrecl,
51 record_count: 0,
52 })
53 }
54
55 #[inline]
60 #[must_use = "Handle the Result or propagate the error"]
61 pub fn read_record(&mut self) -> Result<Option<Vec<u8>>> {
62 let mut first_byte = [0u8; 1];
64 match self.input.read_exact(&mut first_byte) {
65 Ok(()) => {
66 let lrecl_len = self.lrecl_usize()?;
67 let mut buffer = vec![0u8; lrecl_len];
68 buffer[0] = first_byte[0];
69
70 if lrecl_len > 1 {
71 match self.input.read_exact(&mut buffer[1..]) {
72 Ok(()) => {
73 self.record_count += 1;
74 debug!(
75 "Read fixed record {} of {} bytes",
76 self.record_count, self.lrecl
77 );
78 Ok(Some(buffer))
79 }
80 Err(e) if e.kind() == ErrorKind::UnexpectedEof => Err(Error::new(
81 ErrorCode::CBKF221_RDW_UNDERFLOW,
82 format!(
83 "Incomplete record at end of file: expected {} bytes",
84 self.lrecl
85 ),
86 )
87 .with_context(ErrorContext {
88 record_index: Some(self.record_count + 1),
89 field_path: None,
90 byte_offset: None,
91 line_number: None,
92 details: Some("File ends with partial record".to_string()),
93 })),
94 Err(e) => Err(Error::new(
95 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
96 format!("I/O error reading record: {e}"),
97 )
98 .with_context(ErrorContext {
99 record_index: Some(self.record_count + 1),
100 field_path: None,
101 byte_offset: None,
102 line_number: None,
103 details: None,
104 })),
105 }
106 } else {
107 self.record_count += 1;
108 debug!(
109 "Read fixed record {} of {} bytes",
110 self.record_count, self.lrecl
111 );
112 Ok(Some(buffer))
113 }
114 }
115 Err(e) if e.kind() == ErrorKind::UnexpectedEof => {
116 debug!("Reached EOF after {} records", self.record_count);
117 Ok(None)
118 }
119 Err(e) => Err(Error::new(
120 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
121 format!("I/O error reading record: {e}"),
122 )
123 .with_context(ErrorContext {
124 record_index: Some(self.record_count + 1),
125 field_path: None,
126 byte_offset: None,
127 line_number: None,
128 details: None,
129 })),
130 }
131 }
132
133 #[inline]
138 #[must_use = "Handle the Result or propagate the error"]
139 pub fn validate_record_length(&self, schema: &Schema, record_data: &[u8]) -> Result<()> {
140 let lrecl_len = self.lrecl_usize()?;
141 if record_data.len() != lrecl_len {
142 return Err(Error::new(
143 ErrorCode::CBKF221_RDW_UNDERFLOW,
144 format!(
145 "Record length mismatch: expected {}, got {}",
146 self.lrecl,
147 record_data.len()
148 ),
149 )
150 .with_context(ErrorContext {
151 record_index: Some(self.record_count),
152 field_path: None,
153 byte_offset: None,
154 line_number: None,
155 details: Some("Fixed record length validation failed".to_string()),
156 }));
157 }
158
159 if let Some(schema_lrecl) = schema.lrecl_fixed
160 && self.lrecl != schema_lrecl
161 {
162 warn!(
163 "LRECL mismatch: reader configured for {}, schema expects {}",
164 self.lrecl, schema_lrecl
165 );
166 }
167
168 if schema.tail_odo.is_some() {
169 debug!("Record has ODO tail, variable length within fixed LRECL is expected");
170 }
171
172 Ok(())
173 }
174
175 #[must_use]
177 #[inline]
178 pub fn record_count(&self) -> u64 {
179 self.record_count
180 }
181
182 #[must_use]
184 #[inline]
185 pub fn lrecl(&self) -> u32 {
186 self.lrecl
187 }
188
189 #[inline]
190 fn lrecl_usize(&self) -> Result<usize> {
191 usize::try_from(self.lrecl).map_err(|_| {
192 Error::new(
193 ErrorCode::CBKP001_SYNTAX,
194 "LRECL exceeds platform addressable size",
195 )
196 })
197 }
198}
199
200#[derive(Debug)]
202pub struct FixedRecordWriter<W: Write> {
203 output: W,
204 lrecl: u32,
205 record_count: u64,
206}
207
208impl<W: Write> FixedRecordWriter<W> {
209 #[inline]
214 #[must_use = "Handle the Result or propagate the error"]
215 pub fn new(output: W, lrecl: Option<u32>) -> Result<Self> {
216 let lrecl = lrecl.ok_or_else(|| {
217 Error::new(
218 ErrorCode::CBKI001_INVALID_STATE,
219 "Fixed format requires LRECL",
220 )
221 })?;
222
223 if lrecl == 0 {
224 return Err(Error::new(
225 ErrorCode::CBKI001_INVALID_STATE,
226 "LRECL must be greater than zero",
227 ));
228 }
229
230 Ok(Self {
231 output,
232 lrecl,
233 record_count: 0,
234 })
235 }
236
237 #[inline]
242 #[must_use = "Handle the Result or propagate the error"]
243 pub fn write_record(&mut self, data: &[u8]) -> Result<()> {
244 let data_len = data.len();
245 let lrecl = self.lrecl_usize()?;
246
247 if data_len > lrecl {
248 return Err(Error::new(
249 ErrorCode::CBKE501_JSON_TYPE_MISMATCH,
250 format!("Record too long: {data_len} bytes exceeds LRECL of {lrecl}"),
251 )
252 .with_context(ErrorContext {
253 record_index: Some(self.record_count + 1),
254 field_path: None,
255 byte_offset: None,
256 line_number: None,
257 details: Some("Record exceeds fixed length".to_string()),
258 }));
259 }
260
261 self.output.write_all(data).map_err(|e| {
262 Error::new(
263 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
264 format!("I/O error writing record: {e}"),
265 )
266 .with_context(ErrorContext {
267 record_index: Some(self.record_count + 1),
268 field_path: None,
269 byte_offset: None,
270 line_number: None,
271 details: None,
272 })
273 })?;
274
275 if data_len < lrecl {
276 let padding = vec![0u8; lrecl - data_len];
277 self.output.write_all(&padding).map_err(|e| {
278 Error::new(
279 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
280 format!("I/O error writing padding: {e}"),
281 )
282 .with_context(ErrorContext {
283 record_index: Some(self.record_count + 1),
284 field_path: None,
285 byte_offset: Some(u64::try_from(data_len).unwrap_or(u64::MAX)),
286 line_number: None,
287 details: Some("Error writing record padding".to_string()),
288 })
289 })?;
290 }
291
292 self.record_count += 1;
293 debug!(
294 "Wrote fixed record {} of {} bytes (data: {}, padding: {})",
295 self.record_count,
296 lrecl,
297 data_len,
298 lrecl - data_len
299 );
300 Ok(())
301 }
302
303 #[inline]
308 #[must_use = "Handle the Result or propagate the error"]
309 pub fn flush(&mut self) -> Result<()> {
310 self.output.flush().map_err(|e| {
311 Error::new(
312 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
313 format!("I/O error flushing output: {e}"),
314 )
315 })
316 }
317
318 #[must_use]
320 #[inline]
321 pub fn record_count(&self) -> u64 {
322 self.record_count
323 }
324
325 #[must_use]
327 #[inline]
328 pub fn lrecl(&self) -> u32 {
329 self.lrecl
330 }
331
332 #[inline]
333 fn lrecl_usize(&self) -> Result<usize> {
334 usize::try_from(self.lrecl).map_err(|_| {
335 Error::new(
336 ErrorCode::CBKP001_SYNTAX,
337 "LRECL exceeds platform addressable size",
338 )
339 })
340 }
341}
342
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use proptest::collection::vec;
    use proptest::prelude::*;
    use std::io::Cursor;

    /// Builds a reader over an in-memory slice; panics if construction fails.
    fn reader_over(bytes: &[u8], lrecl: u32) -> FixedRecordReader<Cursor<&[u8]>> {
        FixedRecordReader::new(Cursor::new(bytes), Some(lrecl)).unwrap()
    }

    /// Builds a writer appending to `sink`; panics if construction fails.
    fn writer_into(sink: &mut Vec<u8>, lrecl: u32) -> FixedRecordWriter<&mut Vec<u8>> {
        FixedRecordWriter::new(sink, Some(lrecl)).unwrap()
    }

    // ---- reader: basic behaviour and constructor validation ----

    #[test]
    fn fixed_record_reader_basic() {
        let mut reader = reader_over(b"ABCD1234EFGH5678", 8);

        let first = reader.read_record().unwrap().unwrap();
        assert_eq!(first, b"ABCD1234");
        assert_eq!(reader.record_count(), 1);

        let second = reader.read_record().unwrap().unwrap();
        assert_eq!(second, b"EFGH5678");
        assert_eq!(reader.record_count(), 2);

        let after_end = reader.read_record().unwrap();
        assert!(after_end.is_none());
    }

    #[test]
    fn fixed_record_reader_partial_record_is_underflow() {
        // Seven bytes cannot fill an eight-byte record.
        let mut reader = reader_over(b"ABCD123", 8);

        let failure = reader.read_record().unwrap_err();
        assert_eq!(failure.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
    }

    #[test]
    fn fixed_record_reader_zero_lrecl_is_invalid_state() {
        let source = Cursor::new(b"test");
        let failure = FixedRecordReader::new(source, Some(0)).unwrap_err();
        assert_eq!(failure.code, ErrorCode::CBKI001_INVALID_STATE);
    }

    #[test]
    fn fixed_record_reader_missing_lrecl_is_invalid_state() {
        let source = Cursor::new(b"test");
        let failure = FixedRecordReader::new(source, None).unwrap_err();
        assert_eq!(failure.code, ErrorCode::CBKI001_INVALID_STATE);
    }

    // ---- writer: basic behaviour and constructor validation ----

    #[test]
    fn fixed_record_writer_basic() {
        let mut sink = Vec::new();
        let mut writer = writer_into(&mut sink, 8);

        writer.write_record(b"ABCD1234").unwrap();
        writer.write_record(b"XYZ").unwrap();
        writer.flush().unwrap();

        assert_eq!(writer.record_count(), 2);
        assert_eq!(sink, b"ABCD1234XYZ\x00\x00\x00\x00\x00");
    }

    #[test]
    fn fixed_record_writer_too_long_is_cbke501() {
        let mut sink = Vec::new();
        let mut writer = writer_into(&mut sink, 4);

        let failure = writer.write_record(b"ABCDEFGH").unwrap_err();
        assert_eq!(failure.code, ErrorCode::CBKE501_JSON_TYPE_MISMATCH);
    }

    #[test]
    fn fixed_record_writer_zero_lrecl_is_invalid_state() {
        let mut sink = Vec::<u8>::new();
        let failure = FixedRecordWriter::new(&mut sink, Some(0)).unwrap_err();
        assert_eq!(failure.code, ErrorCode::CBKI001_INVALID_STATE);
    }

    #[test]
    fn fixed_record_writer_missing_lrecl_is_invalid_state() {
        let mut sink = Vec::<u8>::new();
        let failure = FixedRecordWriter::new(&mut sink, None).unwrap_err();
        assert_eq!(failure.code, ErrorCode::CBKI001_INVALID_STATE);
    }

    // ---- record length validation ----

    #[test]
    fn validate_record_length_ok() {
        let mut reader = reader_over(b"ABCD1234", 8);
        let record = reader.read_record().unwrap().unwrap();

        let schema = Schema::new();
        reader.validate_record_length(&schema, &record).unwrap();
    }

    #[test]
    fn validate_record_length_mismatch_is_underflow() {
        let mut reader = reader_over(b"ABCD1234", 8);
        let _ = reader.read_record().unwrap().unwrap();

        let schema = Schema::new();
        let failure = reader.validate_record_length(&schema, b"ABC").unwrap_err();
        assert_eq!(failure.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
    }

    // ---- property tests ----

    proptest! {
        // Anything that fits in LRECL is written as payload + padding and read back whole.
        #[test]
        fn prop_fixed_writer_reader_roundtrip(
            lrecl in 1u16..=512u16,
            payload in vec(any::<u8>(), 0..=512),
        ) {
            let record_len = usize::from(lrecl);
            prop_assume!(payload.len() <= record_len);

            let mut encoded = Vec::new();
            let mut writer = writer_into(&mut encoded, u32::from(lrecl));
            writer.write_record(&payload).unwrap();
            writer.flush().unwrap();
            prop_assert_eq!(encoded.len(), record_len);
            prop_assert_eq!(&encoded[..payload.len()], payload.as_slice());

            let mut reader = reader_over(&encoded, u32::from(lrecl));
            let decoded = reader.read_record().unwrap().unwrap();
            prop_assert_eq!(decoded, encoded.as_slice());
            prop_assert_eq!(reader.read_record().unwrap(), None);
        }

        // Any payload strictly longer than LRECL is rejected.
        #[test]
        fn prop_fixed_writer_rejects_oversize_payload(
            lrecl in 1u16..=128u16,
            extra in 1usize..=64usize,
        ) {
            let mut sink = Vec::new();
            let mut writer = writer_into(&mut sink, u32::from(lrecl));
            let oversized = vec![0x41; usize::from(lrecl) + extra];
            let failure = writer.write_record(&oversized).unwrap_err();
            prop_assert_eq!(failure.code, ErrorCode::CBKE501_JSON_TYPE_MISMATCH);
        }
    }

    // ---- edge cases ----

    #[test]
    fn fixed_reader_empty_file_returns_none() {
        let mut reader = reader_over(&[], 8);
        assert!(reader.read_record().unwrap().is_none());
        assert_eq!(reader.record_count(), 0);
    }

    #[test]
    fn fixed_reader_single_byte_lrecl() {
        let bytes = b"ABCDE";
        let mut reader = reader_over(bytes, 1);
        for &expected in bytes {
            let record = reader.read_record().unwrap().unwrap();
            assert_eq!(record, vec![expected]);
        }
        assert!(reader.read_record().unwrap().is_none());
        assert_eq!(reader.record_count(), 5);
    }

    #[test]
    fn fixed_reader_lrecl_accessor() {
        let reader = reader_over(&[], 42);
        assert_eq!(reader.lrecl(), 42);
    }

    #[test]
    fn fixed_writer_lrecl_accessor() {
        let mut sink = Vec::new();
        let writer = writer_into(&mut sink, 42);
        assert_eq!(writer.lrecl(), 42);
    }

    #[test]
    fn fixed_writer_exact_lrecl_no_padding() {
        let mut sink = Vec::new();
        let mut writer = writer_into(&mut sink, 4);
        writer.write_record(b"ABCD").unwrap();
        writer.flush().unwrap();
        assert_eq!(sink, b"ABCD");
    }

    #[test]
    fn fixed_writer_empty_payload_full_padding() {
        let mut sink = Vec::new();
        let mut writer = writer_into(&mut sink, 4);
        writer.write_record(b"").unwrap();
        writer.flush().unwrap();
        assert_eq!(sink, vec![0u8; 4]);
    }

    // ---- multi-record round trips ----

    #[test]
    fn fixed_multi_record_write_read_roundtrip() {
        let lrecl = 10u32;
        let payloads: [&[u8]; 3] = [b"AAAAAAAAAA", b"BB", b"CCCCCCCCCC"];

        let mut encoded = Vec::new();
        {
            let mut writer = writer_into(&mut encoded, lrecl);
            for payload in &payloads {
                writer.write_record(payload).unwrap();
            }
            writer.flush().unwrap();
            assert_eq!(writer.record_count(), 3);
        }
        assert_eq!(encoded.len(), 30);

        let mut reader = reader_over(&encoded, lrecl);
        for (i, expected) in payloads.iter().enumerate() {
            let record = reader.read_record().unwrap().unwrap();
            let (data, padding) = record.split_at(expected.len());
            assert_eq!(data, *expected, "record {i} data mismatch");
            assert!(
                padding.iter().all(|&b| b == 0),
                "record {i} padding mismatch"
            );
        }
        assert!(reader.read_record().unwrap().is_none());
        assert_eq!(reader.record_count(), 3);
    }

    #[test]
    fn fixed_streaming_many_records() {
        let lrecl = 16u32;
        let record_count = 500u64;
        let payload = b"STREAMING_FIXED_";
        assert_eq!(payload.len(), lrecl as usize);

        let mut encoded = Vec::new();
        {
            let mut writer = writer_into(&mut encoded, lrecl);
            for _ in 0..record_count {
                writer.write_record(payload).unwrap();
            }
            writer.flush().unwrap();
        }

        let mut reader = reader_over(&encoded, lrecl);
        let mut seen = 0u64;
        while let Some(record) = reader.read_record().unwrap() {
            assert_eq!(record.as_slice(), payload.as_slice());
            seen += 1;
        }
        assert_eq!(seen, record_count);
        assert_eq!(reader.record_count(), record_count);
    }
}