1#![cfg_attr(not(test), deny(clippy::unwrap_used, clippy::expect_used))]
2use copybook_core::Schema;
14use copybook_error::{Error, ErrorCode, ErrorContext, Result};
15use std::io::{BufRead, BufReader, Read, Write};
16use tracing::{debug, warn};
17
/// Size of an RDW header in bytes: a 2-byte big-endian length followed by
/// 2 reserved bytes.
pub const RDW_HEADER_LEN: usize = 4;

/// Largest payload length that can be stored in the header's 16-bit length field.
pub const RDW_MAX_PAYLOAD_LEN: usize = u16::MAX as usize;

/// Capacity of the reader's internal buffer: one maximum-size payload plus
/// one header.
const RDW_READER_BUF_CAPACITY: usize = (u16::MAX as usize) + RDW_HEADER_LEN;
25
/// The raw four bytes of an RDW header.
///
/// Bytes 0-1 hold the length and bytes 2-3 hold the reserved field, both
/// big-endian. The value is a plain `Copy` wrapper around the byte array; no
/// validation is performed when it is built from raw bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RdwHeader {
    /// Header bytes exactly as they appear on the wire.
    bytes: [u8; RDW_HEADER_LEN],
}
44
45impl RdwHeader {
46 #[must_use]
48 #[inline]
49 pub const fn from_bytes(bytes: [u8; RDW_HEADER_LEN]) -> Self {
50 Self { bytes }
51 }
52
53 #[inline]
58 #[must_use = "Handle the Result or propagate the error"]
59 pub fn from_payload_len(payload_len: usize, reserved: u16) -> Result<Self> {
60 let len = rdw_payload_len_to_u16(payload_len)?;
61 let len_bytes = len.to_be_bytes();
62 let reserved_bytes = reserved.to_be_bytes();
63 Ok(Self {
64 bytes: [
65 len_bytes[0],
66 len_bytes[1],
67 reserved_bytes[0],
68 reserved_bytes[1],
69 ],
70 })
71 }
72
73 #[must_use]
75 #[inline]
76 pub const fn bytes(self) -> [u8; RDW_HEADER_LEN] {
77 self.bytes
78 }
79
80 #[must_use]
82 #[inline]
83 pub const fn length(self) -> u16 {
84 u16::from_be_bytes([self.bytes[0], self.bytes[1]])
85 }
86
87 #[must_use]
89 #[inline]
90 pub const fn reserved(self) -> u16 {
91 u16::from_be_bytes([self.bytes[2], self.bytes[3]])
92 }
93
94 #[must_use]
98 #[inline]
99 pub const fn looks_ascii_corrupt(self) -> bool {
100 rdw_is_suspect_ascii_corruption(self.bytes)
101 }
102}
103
104#[inline]
109#[must_use = "Handle the Result or propagate the error"]
110pub fn rdw_payload_len_to_u16(len: usize) -> Result<u16> {
111 u16::try_from(len).map_err(|_| {
112 Error::new(
113 ErrorCode::CBKF102_RECORD_LENGTH_INVALID,
114 format!(
115 "RDW payload too large: {} bytes exceeds maximum of {}",
116 len,
117 u16::MAX
118 ),
119 )
120 })
121}
122
/// Heuristic check for an RDW header that looks like ASCII text rather than
/// binary data.
///
/// This is a thin `const` wrapper that forwards to
/// `copybook_rdw_predicates::rdw_is_suspect_ascii_corruption`.
///
/// NOTE(review): the property test in this file expects the result to be
/// `true` exactly when the first two bytes are ASCII digits; the predicate
/// itself lives in another crate, so confirm there.
#[must_use]
#[inline]
pub const fn rdw_is_suspect_ascii_corruption(rdw_header: [u8; RDW_HEADER_LEN]) -> bool {
    copybook_rdw_predicates::rdw_is_suspect_ascii_corruption(rdw_header)
}
131
132#[inline]
139#[must_use = "Handle the Result or propagate the error"]
140pub fn rdw_read_len<R: BufRead>(reader: &mut R) -> Result<u16> {
141 let buf = reader.fill_buf().map_err(|e| {
142 Error::new(
143 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
144 format!("I/O error peeking RDW length: {e}"),
145 )
146 })?;
147 if buf.len() < 2 {
148 return Err(Error::new(
149 ErrorCode::CBKF102_RECORD_LENGTH_INVALID,
150 format!(
151 "Incomplete RDW header: expected 2 bytes for length (have {})",
152 buf.len()
153 ),
154 ));
155 }
156
157 let hi = buf[0];
158 let lo = buf[1];
159 reader.consume(2);
160 Ok(u16::from_be_bytes([hi, lo]))
161}
162
163#[inline]
170#[must_use = "Handle the Result or propagate the error"]
171pub fn rdw_slice_body<R: BufRead>(reader: &mut R, len: u16) -> Result<&[u8]> {
172 let need = usize::from(len);
173 if need == 0 {
174 return Ok(&[]);
175 }
176
177 let buf = reader.fill_buf().map_err(|e| {
178 Error::new(
179 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
180 format!("I/O error reading RDW payload: {e}"),
181 )
182 })?;
183
184 if buf.len() < need {
185 return Err(Error::new(
186 ErrorCode::CBKF102_RECORD_LENGTH_INVALID,
187 format!(
188 "Incomplete RDW record payload: expected {} bytes (have {})",
189 need,
190 buf.len()
191 ),
192 ));
193 }
194
195 Ok(&buf[..need])
196}
197
/// Final step applied to a payload slice before it is handed to the caller.
///
/// Currently this is the identity function: the slice is returned unchanged
/// and no validation is performed.
#[inline]
#[must_use]
pub const fn rdw_validate_and_finish(body: &[u8]) -> &[u8] {
    body
}
204
205#[inline]
213#[must_use = "Handle the Result or propagate the error"]
214pub fn rdw_try_peek_len<R: BufRead>(reader: &mut R) -> Result<Option<()>> {
215 let buf = reader.fill_buf().map_err(|e| {
216 Error::new(
217 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
218 format!("I/O error peeking RDW header: {e}"),
219 )
220 })?;
221 if buf.len() <= 1 {
222 return Ok(None);
223 }
224 Ok(Some(()))
225}
226
/// One RDW-framed record: the 4-byte header plus its payload.
///
/// Both fields are public and nothing keeps them in sync automatically; after
/// changing `payload`, call `try_recompute_length` to refresh the length
/// stored in `header`.
#[derive(Debug, Clone)]
pub struct RDWRecord {
    /// Raw header bytes (big-endian length, then big-endian reserved value).
    pub header: [u8; RDW_HEADER_LEN],
    /// Record payload, not including the header.
    pub payload: Vec<u8>,
}
235
236impl RDWRecord {
237 #[inline]
254 #[must_use = "Handle the Result or propagate the error"]
255 pub fn try_new(payload: Vec<u8>) -> Result<Self> {
256 let header = RdwHeader::from_payload_len(payload.len(), 0)?.bytes();
257 Ok(Self { header, payload })
258 }
259
260 #[deprecated(
265 since = "0.4.3",
266 note = "use try_new() instead for fallible construction"
267 )]
268 #[allow(clippy::expect_used)] #[inline]
270 #[must_use]
271 pub fn new(payload: Vec<u8>) -> Self {
272 Self::try_new(payload).expect("RDW payload exceeds maximum size (65535 bytes)")
273 }
274
275 #[inline]
280 #[must_use = "Handle the Result or propagate the error"]
281 pub fn try_with_reserved(payload: Vec<u8>, reserved: u16) -> Result<Self> {
282 let header = RdwHeader::from_payload_len(payload.len(), reserved)?.bytes();
283 Ok(Self { header, payload })
284 }
285
286 #[deprecated(
291 since = "0.4.3",
292 note = "use try_with_reserved() instead for fallible construction"
293 )]
294 #[allow(clippy::expect_used)] #[inline]
296 #[must_use]
297 pub fn with_reserved(payload: Vec<u8>, reserved: u16) -> Self {
298 Self::try_with_reserved(payload, reserved)
299 .expect("RDW payload exceeds maximum size (65535 bytes)")
300 }
301
302 #[inline]
304 #[must_use]
305 pub fn length(&self) -> u16 {
306 RdwHeader::from_bytes(self.header).length()
307 }
308
309 #[inline]
311 #[must_use]
312 pub fn reserved(&self) -> u16 {
313 RdwHeader::from_bytes(self.header).reserved()
314 }
315
316 #[inline]
321 #[must_use = "Handle the Result or propagate the error"]
322 pub fn try_recompute_length(&mut self) -> Result<()> {
323 self.header = RdwHeader::from_payload_len(self.payload.len(), self.reserved())?.bytes();
324 Ok(())
325 }
326
327 #[deprecated(
332 since = "0.4.3",
333 note = "use try_recompute_length() instead for fallible operation"
334 )]
335 #[allow(clippy::expect_used)] #[inline]
337 pub fn recompute_length(&mut self) {
338 self.try_recompute_length()
339 .expect("RDW payload exceeds maximum size (65535 bytes)");
340 }
341
342 #[inline]
344 #[must_use]
345 pub fn as_bytes(&self) -> Vec<u8> {
346 let mut result = Vec::with_capacity(RDW_HEADER_LEN + self.payload.len());
347 result.extend_from_slice(&self.header);
348 result.extend_from_slice(&self.payload);
349 result
350 }
351}
352
/// Sequential reader that pulls RDW-framed records out of a byte stream.
#[derive(Debug)]
pub struct RDWRecordReader<R: Read> {
    /// Buffered wrapper around the caller's input.
    input: BufReader<R>,
    /// Number of record headers accepted so far (used as the 1-based record
    /// index in error contexts).
    record_count: u64,
    /// When `true`, truncated headers and non-zero reserved bytes are errors
    /// instead of being tolerated.
    strict_mode: bool,
}
360
361impl<R: Read> RDWRecordReader<R> {
362 #[inline]
364 #[must_use]
365 pub fn new(input: R, strict_mode: bool) -> Self {
366 Self {
367 input: BufReader::with_capacity(RDW_READER_BUF_CAPACITY, input),
368 record_count: 0,
369 strict_mode,
370 }
371 }
372
373 #[inline]
374 fn peek_header(&mut self) -> Result<Option<[u8; RDW_HEADER_LEN]>> {
375 let peek = rdw_try_peek_len(&mut self.input).map_err(|error| {
376 error.with_context(ErrorContext {
377 record_index: Some(self.record_count + 1),
378 field_path: None,
379 byte_offset: Some(0),
380 line_number: None,
381 details: Some("Unable to peek RDW header".to_string()),
382 })
383 })?;
384
385 if peek.is_none() {
386 let buf = self.input.fill_buf().map_err(|e| {
387 Error::new(
388 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
389 format!("I/O error reading RDW header: {e}"),
390 )
391 .with_context(ErrorContext {
392 record_index: Some(self.record_count + 1),
393 field_path: None,
394 byte_offset: Some(0),
395 line_number: None,
396 details: Some("Unable to read RDW header".to_string()),
397 })
398 })?;
399
400 if buf.is_empty() {
401 debug!("Reached EOF after {} RDW records", self.record_count);
402 return Ok(None);
403 }
404
405 if self.strict_mode {
406 return Err(Error::new(
407 ErrorCode::CBKF221_RDW_UNDERFLOW,
408 "Incomplete RDW header: expected 4 bytes".to_string(),
409 )
410 .with_context(ErrorContext {
411 record_index: Some(self.record_count + 1),
412 field_path: None,
413 byte_offset: Some(0),
414 line_number: None,
415 details: Some("File ends with incomplete RDW header".to_string()),
416 }));
417 }
418
419 debug!(
420 "Reached EOF after {} RDW records (truncated header ignored)",
421 self.record_count
422 );
423 let remaining = buf.len();
424 self.input.consume(remaining);
425 return Ok(None);
426 }
427
428 let buf = self.input.fill_buf().map_err(|e| {
429 Error::new(
430 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
431 format!("I/O error reading RDW header: {e}"),
432 )
433 .with_context(ErrorContext {
434 record_index: Some(self.record_count + 1),
435 field_path: None,
436 byte_offset: Some(0),
437 line_number: None,
438 details: Some("Unable to read RDW header".to_string()),
439 })
440 })?;
441
442 if buf.len() < RDW_HEADER_LEN {
443 if self.strict_mode {
444 return Err(Error::new(
445 ErrorCode::CBKF221_RDW_UNDERFLOW,
446 "Incomplete RDW header: expected 4 bytes".to_string(),
447 )
448 .with_context(ErrorContext {
449 record_index: Some(self.record_count + 1),
450 field_path: None,
451 byte_offset: Some(0),
452 line_number: None,
453 details: Some("File ends with incomplete RDW header".to_string()),
454 }));
455 }
456
457 debug!(
458 "Reached EOF after {} RDW records (truncated header ignored)",
459 self.record_count
460 );
461 let remaining = buf.len();
462 self.input.consume(remaining);
463 return Ok(None);
464 }
465
466 Ok(Some([buf[0], buf[1], buf[2], buf[3]]))
467 }
468
469 #[inline]
475 #[must_use = "Handle the Result or propagate the error"]
476 pub fn read_record(&mut self) -> Result<Option<RDWRecord>> {
477 let Some(header) = self.peek_header()? else {
478 return Ok(None);
479 };
480
481 let length = match rdw_read_len(&mut self.input) {
482 Ok(len) => len,
483 Err(error) => {
484 return Err(error.with_context(ErrorContext {
485 record_index: Some(self.record_count + 1),
486 field_path: None,
487 byte_offset: Some(0),
488 line_number: None,
489 details: Some("Unable to read RDW body length".to_string()),
490 }));
491 }
492 };
493
494 self.input.consume(2);
496 let reserved = u16::from_be_bytes([header[2], header[3]]);
497
498 self.record_count += 1;
499 debug!(
500 "Read RDW header for record {}: length={}, reserved={:04X}",
501 self.record_count,
502 u32::from(length),
503 reserved
504 );
505
506 if reserved != 0 {
507 let error = Error::new(
508 ErrorCode::CBKR211_RDW_RESERVED_NONZERO,
509 format!("RDW reserved bytes are non-zero: {reserved:04X}"),
510 )
511 .with_context(ErrorContext {
512 record_index: Some(self.record_count),
513 field_path: None,
514 byte_offset: Some(2),
515 line_number: None,
516 details: Some(format!("Expected 0000, got {reserved:04X}")),
517 });
518
519 if self.strict_mode {
520 return Err(error);
521 }
522
523 warn!(
524 "RDW reserved bytes non-zero (record {}): {:04X}",
525 self.record_count, reserved
526 );
527 }
528
529 if Self::is_suspect_ascii_corruption(header) {
530 warn!(
531 "RDW appears to be ASCII-corrupted (record {}): {:02X} {:02X} {:02X} {:02X}",
532 self.record_count, header[0], header[1], header[2], header[3]
533 );
534
535 return Err(Error::new(
536 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
537 format!(
538 "RDW appears to be ASCII-corrupted: {:02X} {:02X} {:02X} {:02X}",
539 header[0], header[1], header[2], header[3]
540 ),
541 )
542 .with_context(ErrorContext {
543 record_index: Some(self.record_count),
544 field_path: None,
545 byte_offset: Some(0),
546 line_number: None,
547 details: Some("Suspected ASCII transfer corruption".to_string()),
548 }));
549 }
550
551 if length == 0 {
552 debug!("Zero-length RDW record {}", self.record_count);
553 return Ok(Some(RDWRecord {
554 header,
555 payload: Vec::new(),
556 }));
557 }
558
559 let payload_len = usize::from(length);
560 let body_slice = match rdw_slice_body(&mut self.input, length) {
561 Ok(slice) => slice,
562 Err(error) => {
563 return Err(error.with_context(ErrorContext {
564 record_index: Some(self.record_count),
565 field_path: None,
566 byte_offset: Some(4),
567 line_number: None,
568 details: Some("File ends with incomplete RDW payload".to_string()),
569 }));
570 }
571 };
572
573 let payload = rdw_validate_and_finish(body_slice).to_vec();
574 self.input.consume(payload_len);
575
576 debug!(
577 "Read RDW record {} payload: {} bytes",
578 self.record_count, length
579 );
580 Ok(Some(RDWRecord { header, payload }))
581 }
582
583 #[inline]
588 #[must_use = "Handle the Result or propagate the error"]
589 pub fn validate_zero_length_record(&self, schema: &Schema) -> Result<()> {
590 let min_size = Self::calculate_schema_fixed_prefix(schema);
591
592 if min_size > 0 {
593 return Err(Error::new(
594 ErrorCode::CBKF221_RDW_UNDERFLOW,
595 format!("Zero-length RDW record invalid: schema requires minimum {min_size} bytes"),
596 )
597 .with_context(ErrorContext {
598 record_index: Some(self.record_count),
599 field_path: None,
600 byte_offset: None,
601 line_number: None,
602 details: Some("Zero-length record with non-zero schema prefix".to_string()),
603 }));
604 }
605
606 Ok(())
607 }
608
609 #[inline]
611 #[must_use]
612 pub fn record_count(&self) -> u64 {
613 self.record_count
614 }
615
616 #[inline]
617 fn calculate_schema_fixed_prefix(schema: &Schema) -> u32 {
618 use copybook_core::{Field, Occurs};
619
620 fn find_first_odo_offset(fields: &[Field], current: &mut Option<u32>) {
621 for field in fields {
622 if let Some(Occurs::ODO { .. }) = &field.occurs {
623 let offset = field.offset;
624 match current {
625 Some(existing) => {
626 if offset < *existing {
627 *current = Some(offset);
628 }
629 }
630 None => *current = Some(offset),
631 }
632 }
633 if !field.children.is_empty() {
634 find_first_odo_offset(&field.children, current);
635 }
636 }
637 }
638
639 let mut first_odo_offset: Option<u32> = None;
640 find_first_odo_offset(&schema.fields, &mut first_odo_offset);
641
642 if let Some(offset) = first_odo_offset {
643 offset
644 } else if let Some(lrecl) = schema.lrecl_fixed {
645 lrecl
646 } else {
647 fn find_record_end(fields: &[Field], max_end: &mut u32) {
648 for field in fields {
649 let end = field.offset + field.len;
650 if end > *max_end {
651 *max_end = end;
652 }
653 if !field.children.is_empty() {
654 find_record_end(&field.children, max_end);
655 }
656 }
657 }
658
659 let mut max_end = 0;
660 find_record_end(&schema.fields, &mut max_end);
661 max_end
662 }
663 }
664
665 #[inline]
666 fn is_suspect_ascii_corruption(rdw_header: [u8; RDW_HEADER_LEN]) -> bool {
667 rdw_is_suspect_ascii_corruption(rdw_header)
668 }
669}
670
/// Sequential writer that emits RDW-framed records to a byte sink.
#[derive(Debug)]
pub struct RDWRecordWriter<W: Write> {
    /// Destination for the encoded records; written to directly, without
    /// additional buffering by this type.
    output: W,
    /// Number of records written successfully so far.
    record_count: u64,
}
677
678impl<W: Write> RDWRecordWriter<W> {
679 #[inline]
681 #[must_use]
682 pub fn new(output: W) -> Self {
683 Self {
684 output,
685 record_count: 0,
686 }
687 }
688
689 #[inline]
694 #[must_use = "Handle the Result or propagate the error"]
695 pub fn write_record(&mut self, record: &RDWRecord) -> Result<()> {
696 self.output.write_all(&record.header).map_err(|e| {
697 Error::new(
698 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
699 format!("I/O error writing RDW header: {e}"),
700 )
701 .with_context(ErrorContext {
702 record_index: Some(self.record_count + 1),
703 field_path: None,
704 byte_offset: None,
705 line_number: None,
706 details: None,
707 })
708 })?;
709
710 self.output.write_all(&record.payload).map_err(|e| {
711 Error::new(
712 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
713 format!("I/O error writing RDW payload: {e}"),
714 )
715 .with_context(ErrorContext {
716 record_index: Some(self.record_count + 1),
717 field_path: None,
718 byte_offset: Some(4),
719 line_number: None,
720 details: None,
721 })
722 })?;
723
724 self.record_count += 1;
725 debug!(
726 "Wrote RDW record {} with {} byte payload",
727 self.record_count,
728 record.payload.len()
729 );
730 Ok(())
731 }
732
733 #[inline]
738 #[must_use = "Handle the Result or propagate the error"]
739 pub fn write_record_from_payload(
740 &mut self,
741 payload: &[u8],
742 preserve_reserved: Option<u16>,
743 ) -> Result<()> {
744 let length = payload.len();
745 let header =
746 RdwHeader::from_payload_len(length, preserve_reserved.unwrap_or(0)).map_err(|_| {
747 Error::new(
748 ErrorCode::CBKE501_JSON_TYPE_MISMATCH,
749 format!(
750 "RDW payload too large: {length} bytes exceeds maximum of {}",
751 u16::MAX
752 ),
753 )
754 .with_context(ErrorContext {
755 record_index: Some(self.record_count + 1),
756 field_path: None,
757 byte_offset: None,
758 line_number: None,
759 details: Some("RDW length field is 16-bit".to_string()),
760 })
761 })?;
762
763 let record = RDWRecord {
764 header: header.bytes(),
765 payload: payload.to_vec(),
766 };
767 self.write_record(&record)
768 }
769
770 #[inline]
775 #[must_use = "Handle the Result or propagate the error"]
776 pub fn flush(&mut self) -> Result<()> {
777 self.output.flush().map_err(|e| {
778 Error::new(
779 ErrorCode::CBKF104_RDW_SUSPECT_ASCII,
780 format!("I/O error flushing output: {e}"),
781 )
782 })
783 }
784
785 #[inline]
787 #[must_use]
788 pub fn record_count(&self) -> u64 {
789 self.record_count
790 }
791}
792
793#[cfg(test)]
794#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
795mod tests {
796 use super::*;
797 use proptest::collection::vec;
798 use proptest::prelude::*;
799 use std::io::{BufRead, Cursor};
800
801 #[test]
802 fn header_from_payload_len_roundtrips() {
803 let header = RdwHeader::from_payload_len(10, 0x1234).unwrap();
804 assert_eq!(header.length(), 10);
805 assert_eq!(header.reserved(), 0x1234);
806 assert_eq!(header.bytes(), [0x00, 0x0A, 0x12, 0x34]);
807 }
808
809 #[test]
810 fn header_from_payload_len_oversize_fails() {
811 let err = RdwHeader::from_payload_len(RDW_MAX_PAYLOAD_LEN + 1, 0).unwrap_err();
812 assert_eq!(err.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
813 }
814
815 #[test]
816 fn ascii_corruption_heuristic_matches_digits_only() {
817 assert!(rdw_is_suspect_ascii_corruption([b'1', b'2', 0, 0]));
818 assert!(!rdw_is_suspect_ascii_corruption([0, 12, 0, 0]));
819 }
820
821 #[test]
822 fn rdw_peek_len_none_on_short_buffer() {
823 let mut cur = Cursor::new(Vec::<u8>::new());
824 assert!(rdw_try_peek_len(&mut cur).unwrap().is_none());
825
826 let mut cur = Cursor::new(vec![0x00]);
827 assert!(rdw_try_peek_len(&mut cur).unwrap().is_none());
828 }
829
830 #[test]
831 fn rdw_read_len_consumes_two_bytes() {
832 let mut cur = Cursor::new(vec![0x00, 0x03, 0xAA, 0xBB, b'A', b'B', b'C']);
833 let len = rdw_read_len(&mut cur).unwrap();
834 assert_eq!(len, 3);
835
836 cur.consume(2);
837 let body = rdw_slice_body(&mut cur, len).unwrap();
838 assert_eq!(rdw_validate_and_finish(body), b"ABC");
839 }
840
841 #[test]
842 fn rdw_slice_body_short_is_cbkf102() {
843 let mut cur = Cursor::new(vec![0x00, 0x10, 0xAA, 0xBB, 0xCC, 0xDD]);
844 let len = rdw_read_len(&mut cur).unwrap();
845 cur.consume(2);
846 let err = rdw_slice_body(&mut cur, len).unwrap_err();
847 assert_eq!(err.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
848 }
849
850 #[test]
851 fn rdw_record_try_new_roundtrip() {
852 let record = RDWRecord::try_new(b"hello".to_vec()).unwrap();
853 assert_eq!(record.length(), 5);
854 assert_eq!(record.reserved(), 0);
855 assert_eq!(record.payload, b"hello");
856 }
857
858 #[test]
859 fn rdw_record_try_with_reserved_roundtrip() {
860 let record = RDWRecord::try_with_reserved(b"test".to_vec(), 0x1234).unwrap();
861 assert_eq!(record.length(), 4);
862 assert_eq!(record.reserved(), 0x1234);
863 assert_eq!(record.payload, b"test");
864 }
865
866 #[test]
867 fn rdw_record_try_recompute_updates_length() {
868 let mut record = RDWRecord::try_new(b"test".to_vec()).unwrap();
869 record.payload = b"longer_payload".to_vec();
870 record.try_recompute_length().unwrap();
871 assert_eq!(record.length(), 14);
872 }
873
874 #[test]
875 fn rdw_record_as_bytes_prepends_header() {
876 let record = RDWRecord::try_new(b"hi".to_vec()).unwrap();
877 assert_eq!(record.as_bytes(), vec![0, 2, 0, 0, b'h', b'i']);
878 }
879
880 #[test]
881 fn rdw_writer_writes_record() {
882 let mut output = Vec::new();
883 let mut writer = RDWRecordWriter::new(&mut output);
884 let record = RDWRecord::try_new(b"test".to_vec()).unwrap();
885 writer.write_record(&record).unwrap();
886 assert_eq!(writer.record_count(), 1);
887 assert_eq!(output, vec![0, 4, 0, 0, b't', b'e', b's', b't']);
888 }
889
890 #[test]
891 fn rdw_writer_writes_record_from_payload_with_reserved() {
892 let mut output = Vec::new();
893 let mut writer = RDWRecordWriter::new(&mut output);
894 writer
895 .write_record_from_payload(b"test", Some(0x1234))
896 .unwrap();
897 assert_eq!(output, vec![0, 4, 0x12, 0x34, b't', b'e', b's', b't']);
898 }
899
900 #[test]
901 fn rdw_reader_reads_single_record() {
902 let data = vec![0, 5, 0, 0, b'h', b'e', b'l', b'l', b'o'];
903 let mut reader = RDWRecordReader::new(Cursor::new(data), false);
904
905 let record = reader.read_record().unwrap().unwrap();
906 assert_eq!(record.length(), 5);
907 assert_eq!(record.reserved(), 0);
908 assert_eq!(record.payload, b"hello");
909 assert_eq!(reader.record_count(), 1);
910 }
911
912 #[test]
913 fn rdw_reader_reads_multiple_records() {
914 let data = vec![
915 0, 2, 0, 0, b'h', b'i', 0, 3, 0, 0, b'b', b'y', b'e',
917 ];
918 let mut reader = RDWRecordReader::new(Cursor::new(data), false);
919
920 let first = reader.read_record().unwrap().unwrap();
921 assert_eq!(first.payload, b"hi");
922 assert_eq!(reader.record_count(), 1);
923
924 let second = reader.read_record().unwrap().unwrap();
925 assert_eq!(second.payload, b"bye");
926 assert_eq!(reader.record_count(), 2);
927
928 assert!(reader.read_record().unwrap().is_none());
929 }
930
931 #[test]
932 fn rdw_reader_reserved_nonzero_is_warning_in_lenient_mode() {
933 let data = vec![0, 4, 0x12, 0x34, b't', b'e', b's', b't'];
934 let mut reader = RDWRecordReader::new(Cursor::new(data), false);
935
936 let record = reader.read_record().unwrap().unwrap();
937 assert_eq!(record.reserved(), 0x1234);
938 assert_eq!(record.payload, b"test");
939 }
940
941 #[test]
942 fn rdw_reader_reserved_nonzero_is_error_in_strict_mode() {
943 let data = vec![0, 4, 0x12, 0x34, b't', b'e', b's', b't'];
944 let mut reader = RDWRecordReader::new(Cursor::new(data), true);
945
946 let error = reader.read_record().unwrap_err();
947 assert_eq!(error.code, ErrorCode::CBKR211_RDW_RESERVED_NONZERO);
948 }
949
950 #[test]
951 fn rdw_reader_incomplete_header_lenient_is_eof() {
952 let data = vec![0, 4];
953 let mut reader = RDWRecordReader::new(Cursor::new(data), false);
954 let result = reader.read_record().unwrap();
955 assert!(result.is_none());
956 }
957
958 #[test]
959 fn rdw_reader_incomplete_header_strict_is_underflow() {
960 let data = vec![0, 4];
961 let mut reader = RDWRecordReader::new(Cursor::new(data), true);
962 let error = reader.read_record().unwrap_err();
963 assert_eq!(error.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
964 }
965
966 #[test]
967 fn rdw_reader_incomplete_payload_is_cbkf102() {
968 let data = vec![0, 5, 0, 0, b'h', b'i'];
969 let mut reader = RDWRecordReader::new(Cursor::new(data), false);
970
971 let error = reader.read_record().unwrap_err();
972 assert_eq!(error.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
973 }
974
975 #[test]
976 fn rdw_reader_ascii_corruption_is_detected() {
977 let data = vec![b'1', b'2', 0, 0, b'H', b'E', b'L', b'L', b'O'];
978 let mut reader = RDWRecordReader::new(Cursor::new(data), false);
979
980 let error = reader.read_record().unwrap_err();
981 assert_eq!(error.code, ErrorCode::CBKF104_RDW_SUSPECT_ASCII);
982 }
983
984 #[test]
985 fn rdw_reader_zero_length_validation_obeys_schema_prefix() {
986 use copybook_core::{Field, FieldKind, Occurs, Schema, TailODO};
987
988 let mut counter = Field::with_kind(
989 5,
990 "CTR".to_string(),
991 FieldKind::BinaryInt {
992 bits: 16,
993 signed: false,
994 },
995 );
996 counter.offset = 0;
997 counter.len = 2;
998
999 let mut array = Field::with_kind(5, "ARR".to_string(), FieldKind::Alphanum { len: 1 });
1000 array.offset = 2;
1001 array.len = 1;
1002 array.occurs = Some(Occurs::ODO {
1003 min: 0,
1004 max: 5,
1005 counter_path: "CTR".to_string(),
1006 });
1007
1008 let schema = Schema {
1009 fields: vec![counter, array],
1010 lrecl_fixed: None,
1011 tail_odo: Some(TailODO {
1012 counter_path: "CTR".to_string(),
1013 min_count: 0,
1014 max_count: 5,
1015 array_path: "ARR".to_string(),
1016 }),
1017 fingerprint: String::new(),
1018 };
1019
1020 let reader = RDWRecordReader::new(Cursor::new(Vec::<u8>::new()), false);
1021 let error = reader.validate_zero_length_record(&schema).unwrap_err();
1022 assert_eq!(error.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
1023
1024 let empty_schema = Schema::new();
1025 reader.validate_zero_length_record(&empty_schema).unwrap();
1026 }
1027
1028 #[test]
1029 fn rdw_writer_payload_too_large_is_cbke501() {
1030 let mut output = Vec::new();
1031 let mut writer = RDWRecordWriter::new(&mut output);
1032 let large_payload = vec![0u8; usize::from(u16::MAX) + 1];
1033 let err = writer
1034 .write_record_from_payload(&large_payload, None)
1035 .unwrap_err();
1036 assert_eq!(err.code, ErrorCode::CBKE501_JSON_TYPE_MISMATCH);
1037 }
1038
1039 #[test]
1040 fn rdw_record_oversize_try_new_is_cbkf102() {
1041 let large_payload = vec![0u8; usize::from(u16::MAX) + 1];
1042 let err = RDWRecord::try_new(large_payload).unwrap_err();
1043 assert_eq!(err.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
1044 assert!(err.message.contains("RDW payload too large"));
1045 }
1046
1047 #[test]
1048 fn rdw_record_oversize_try_with_reserved_is_cbkf102() {
1049 let large_payload = vec![0u8; usize::from(u16::MAX) + 1];
1050 let err = RDWRecord::try_with_reserved(large_payload, 0x1234).unwrap_err();
1051 assert_eq!(err.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
1052 assert!(err.message.contains("RDW payload too large"));
1053 }
1054
1055 #[test]
1056 fn rdw_record_oversize_try_recompute_is_cbkf102() {
1057 let mut record = RDWRecord::try_new(b"test".to_vec()).unwrap();
1058 record.payload = vec![0u8; usize::from(u16::MAX) + 1];
1059 let err = record.try_recompute_length().unwrap_err();
1060 assert_eq!(err.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
1061 assert!(err.message.contains("RDW payload too large"));
1062 }
1063
1064 #[test]
1065 #[should_panic(expected = "RDW payload exceeds maximum size")]
1066 #[allow(deprecated)]
1067 fn rdw_record_new_panics_on_oversize_payload() {
1068 let payload = vec![0u8; usize::from(u16::MAX) + 1];
1069 let _ = RDWRecord::new(payload);
1070 }
1071
1072 proptest! {
1073 #[test]
1074 fn prop_header_payload_len_roundtrip(payload_len in 0u16..=u16::MAX, reserved in any::<u16>()) {
1075 let header = RdwHeader::from_payload_len(payload_len as usize, reserved).unwrap();
1076 prop_assert_eq!(header.length(), payload_len);
1077 prop_assert_eq!(header.reserved(), reserved);
1078 prop_assert_eq!(RdwHeader::from_bytes(header.bytes()).length(), payload_len);
1079 }
1080
1081 #[test]
1082 fn prop_ascii_corruption_heuristic_matches_manual(b0 in any::<u8>(), b1 in any::<u8>(), b2 in any::<u8>(), b3 in any::<u8>()) {
1083 let header = [b0, b1, b2, b3];
1084 let expected = b0.is_ascii_digit() && b1.is_ascii_digit();
1085 prop_assert_eq!(rdw_is_suspect_ascii_corruption(header), expected);
1086 prop_assert_eq!(RdwHeader::from_bytes(header).looks_ascii_corrupt(), expected);
1087 }
1088
1089 #[test]
1090 fn prop_rdw_record_length_matches_payload(payload in vec(any::<u8>(), 0..=1024), reserved in any::<u16>()) {
1091 let record = RDWRecord::try_with_reserved(payload.clone(), reserved).unwrap();
1092 prop_assert_eq!(usize::from(record.length()), payload.len());
1093 prop_assert_eq!(record.reserved(), reserved);
1094 let bytes = record.as_bytes();
1095 prop_assert_eq!(bytes.len(), RDW_HEADER_LEN + payload.len());
1096 prop_assert_eq!(&bytes[RDW_HEADER_LEN..], payload.as_slice());
1097 }
1098
1099 #[test]
1100 fn prop_rdw_writer_from_payload_encodes_header(payload in vec(any::<u8>(), 0..=512), reserved in any::<u16>()) {
1101 let mut output = Vec::new();
1102 let mut writer = RDWRecordWriter::new(&mut output);
1103 writer.write_record_from_payload(&payload, Some(reserved)).unwrap();
1104 prop_assert_eq!(writer.record_count(), 1);
1105 let header = RdwHeader::from_bytes(output[0..RDW_HEADER_LEN].try_into().unwrap());
1106 prop_assert_eq!(usize::from(header.length()), payload.len());
1107 prop_assert_eq!(header.reserved(), reserved);
1108 prop_assert_eq!(&output[RDW_HEADER_LEN..], payload.as_slice());
1109 }
1110
1111 #[test]
1112 fn prop_rdw_writer_reader_roundtrip(
1113 payload in vec(any::<u8>(), 0..=1024),
1114 reserved in any::<u16>(),
1115 ) {
1116 let mut encoded = Vec::new();
1117 let mut writer = RDWRecordWriter::new(&mut encoded);
1118 writer.write_record_from_payload(&payload, Some(reserved)).unwrap();
1119
1120 let mut reader = RDWRecordReader::new(Cursor::new(encoded), false);
1121 let decoded = reader.read_record().unwrap().unwrap();
1122 prop_assert_eq!(decoded.payload.as_slice(), payload.as_slice());
1123 prop_assert_eq!(decoded.reserved(), reserved);
1124 prop_assert!(reader.read_record().unwrap().is_none());
1125 }
1126 }
1127
1128 #[test]
1131 fn rdw_header_big_endian_length_parsing() {
1132 let header = RdwHeader::from_bytes([0x01, 0x00, 0x00, 0x00]);
1134 assert_eq!(header.length(), 256);
1135 assert_eq!(header.reserved(), 0);
1136
1137 let header = RdwHeader::from_bytes([0xFF, 0xFF, 0x00, 0x00]);
1139 assert_eq!(header.length(), u16::MAX);
1140 }
1141
1142 #[test]
1143 fn rdw_header_reserved_bytes_preserved() {
1144 let header = RdwHeader::from_bytes([0x00, 0x0A, 0xDE, 0xAD]);
1145 assert_eq!(header.length(), 10);
1146 assert_eq!(header.reserved(), 0xDEAD);
1147 }
1148
1149 #[test]
1150 fn rdw_reader_empty_file_returns_none() {
1151 let mut reader = RDWRecordReader::new(Cursor::new(Vec::<u8>::new()), false);
1152 assert!(reader.read_record().unwrap().is_none());
1153 assert_eq!(reader.record_count(), 0);
1154 }
1155
1156 #[test]
1157 fn rdw_reader_empty_file_strict_returns_none() {
1158 let mut reader = RDWRecordReader::new(Cursor::new(Vec::<u8>::new()), true);
1159 assert!(reader.read_record().unwrap().is_none());
1160 assert_eq!(reader.record_count(), 0);
1161 }
1162
1163 #[test]
1164 fn rdw_reader_zero_length_record() {
1165 let data = vec![0x00, 0x00, 0x00, 0x00];
1166 let mut reader = RDWRecordReader::new(Cursor::new(data), false);
1167 let record = reader.read_record().unwrap().unwrap();
1168 assert_eq!(record.length(), 0);
1169 assert!(record.payload.is_empty());
1170 assert_eq!(reader.record_count(), 1);
1171 assert!(reader.read_record().unwrap().is_none());
1172 }
1173
1174 #[test]
1175 fn rdw_reader_max_record_size() {
1176 let payload = vec![0xABu8; u16::MAX as usize];
1177 let mut data = Vec::with_capacity(RDW_HEADER_LEN + payload.len());
1178 data.extend_from_slice(&[0xFF, 0xFF, 0x00, 0x00]);
1179 data.extend_from_slice(&payload);
1180
1181 let mut reader = RDWRecordReader::new(Cursor::new(data), false);
1182 let record = reader.read_record().unwrap().unwrap();
1183 assert_eq!(record.length(), u16::MAX);
1184 assert_eq!(record.payload.len(), u16::MAX as usize);
1185 assert!(record.payload.iter().all(|&b| b == 0xAB));
1186 }
1187
1188 #[test]
1189 fn rdw_multi_record_write_read_roundtrip() {
1190 let payloads: Vec<&[u8]> = vec![b"alpha", b"", b"gamma delta", b"x"];
1191 let mut encoded = Vec::new();
1192 {
1193 let mut writer = RDWRecordWriter::new(&mut encoded);
1194 for p in &payloads {
1195 writer.write_record_from_payload(p, None).unwrap();
1196 }
1197 writer.flush().unwrap();
1198 assert_eq!(writer.record_count(), 4);
1199 }
1200
1201 let mut reader = RDWRecordReader::new(Cursor::new(&encoded), false);
1202 for expected in &payloads {
1203 let record = reader.read_record().unwrap().unwrap();
1204 assert_eq!(record.payload.as_slice(), *expected);
1205 }
1206 assert!(reader.read_record().unwrap().is_none());
1207 assert_eq!(reader.record_count(), 4);
1208 }
1209
/// Writes 500 identical records, then drains the reader and checks that every
/// payload matches and that both the local tally and the reader's own counter
/// equal the number written.
#[test]
fn rdw_streaming_many_records() {
    const TOTAL: u64 = 500;
    let body = b"STREAMING_TEST";

    let mut wire = Vec::new();
    {
        let mut writer = RDWRecordWriter::new(&mut wire);
        let mut written = 0;
        while written < TOTAL {
            writer.write_record_from_payload(body, None).unwrap();
            written += 1;
        }
        writer.flush().unwrap();
    }

    let mut reader = RDWRecordReader::new(Cursor::new(&wire), false);
    let mut seen = 0u64;
    loop {
        match reader.read_record().unwrap() {
            Some(record) => {
                assert_eq!(record.payload, body);
                seen += 1;
            }
            // `None` marks the end of the stream.
            None => break,
        }
    }

    assert_eq!(seen, TOTAL);
    assert_eq!(reader.record_count(), TOTAL);
}
1232
/// With the reader's flag set to `false`, an input holding only one byte of
/// header is reported as "no more records" rather than as an error.
#[test]
fn rdw_reader_single_byte_header_lenient_is_eof() {
    let truncated = Cursor::new(vec![0x00u8]);
    let mut reader = RDWRecordReader::new(truncated, false);

    let outcome = reader.read_record().unwrap();
    assert!(outcome.is_none());
}
1239
/// With the reader's flag set to `true`, an input holding only one byte of
/// header must fail with `CBKF221_RDW_UNDERFLOW`.
#[test]
fn rdw_reader_single_byte_header_strict_is_underflow() {
    let truncated = Cursor::new(vec![0x00u8]);
    let mut reader = RDWRecordReader::new(truncated, true);

    let failure = reader.read_record().unwrap_err();
    assert_eq!(failure.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
}
1247
/// A header built from payload length 0 and reserved 0 is four zero bytes and
/// decodes back to zero in both fields.
#[test]
fn rdw_header_zero_length_zero_reserved() {
    let zeroed = RdwHeader::from_payload_len(0, 0).unwrap();

    let decoded_len = zeroed.length();
    let decoded_reserved = zeroed.reserved();
    assert_eq!(decoded_len, 0);
    assert_eq!(decoded_reserved, 0);
    assert_eq!(zeroed.bytes(), [0u8; RDW_HEADER_LEN]);
}
1257
/// The largest permitted payload length must encode successfully.
///
/// Besides the decoded length, this pins the reserved field and the raw
/// big-endian byte layout at the upper bound, matching what the sibling
/// zero-length and length-one header tests check.
#[test]
fn rdw_header_max_payload_len() {
    let header = RdwHeader::from_payload_len(RDW_MAX_PAYLOAD_LEN, 0).unwrap();
    assert_eq!(header.length(), u16::MAX);
    assert_eq!(header.reserved(), 0);
    // 0xFFFF big-endian in the first two bytes, reserved 0x0000 in the last two.
    assert_eq!(header.bytes(), [0xFF, 0xFF, 0x00, 0x00]);
}
1263
/// One byte past the maximum payload length must be rejected with
/// `CBKF102_RECORD_LENGTH_INVALID`.
#[test]
fn rdw_header_max_payload_len_plus_one_fails() {
    let oversized = RDW_MAX_PAYLOAD_LEN + 1;
    let failure = RdwHeader::from_payload_len(oversized, 0).unwrap_err();
    assert_eq!(failure.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
}
1269
/// Payload length 1 lands in the second header byte (big-endian length),
/// leaving every other byte zero.
#[test]
fn rdw_header_length_one() {
    let one = RdwHeader::from_payload_len(1, 0).unwrap();

    let decoded_len = one.length();
    assert_eq!(decoded_len, 1);

    let raw = one.bytes();
    assert_eq!(raw, [0, 1, 0, 0]);
}
1276
/// A header whose length bytes are `0x00 0x0A` must not be flagged as
/// ASCII-corrupted.
#[test]
fn rdw_header_looks_ascii_corrupt_false_for_binary() {
    let raw = [0x00, 0x0A, 0x00, 0x00];
    let flagged = RdwHeader::from_bytes(raw).looks_ascii_corrupt();
    assert!(!flagged);
}
1282
/// A header whose length bytes are the ASCII digits `'0'` and `'5'` must be
/// flagged as ASCII-corrupted.
#[test]
fn rdw_header_looks_ascii_corrupt_true_for_digits() {
    let raw = [b'0', b'5', 0x00, 0x00];
    let flagged = RdwHeader::from_bytes(raw).looks_ascii_corrupt();
    assert!(flagged);
}
1288
/// The lower bound: a payload length of zero converts to `0u16`.
#[test]
fn rdw_payload_len_to_u16_zero() {
    let converted = rdw_payload_len_to_u16(0).unwrap();
    assert_eq!(converted, 0);
}
1293
/// The upper bound: a payload length equal to `u16::MAX` converts without
/// error.
#[test]
fn rdw_payload_len_to_u16_max() {
    let limit = usize::from(u16::MAX);
    let converted = rdw_payload_len_to_u16(limit).unwrap();
    assert_eq!(converted, u16::MAX);
}
1301
/// One past `u16::MAX` must fail with `CBKF102_RECORD_LENGTH_INVALID` and a
/// message mentioning that the RDW payload is too large.
#[test]
fn rdw_payload_len_to_u16_too_large() {
    let over_limit = usize::from(u16::MAX) + 1;
    let failure = rdw_payload_len_to_u16(over_limit).unwrap_err();

    assert_eq!(failure.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
    let mentions_size = failure.message.contains("RDW payload too large");
    assert!(mentions_size);
}
1308
/// Requesting a zero-length body succeeds and yields an empty result even
/// though the underlying cursor still has bytes available.
#[test]
fn rdw_slice_body_zero_length_returns_empty() {
    let mut source = Cursor::new(vec![0xAA, 0xBB]);
    let sliced = rdw_slice_body(&mut source, 0).unwrap();
    assert!(sliced.is_empty());
}
1315
/// `rdw_validate_and_finish` must hand back bytes equal to its input.
#[test]
fn rdw_validate_and_finish_identity() {
    let input = b"test_data";
    let output = rdw_validate_and_finish(input);
    assert_eq!(output, input);
}
1322
/// Cloning a record yields a copy whose payload and header both compare equal
/// to the original's.
#[test]
fn rdw_record_clone() {
    let body = b"clone_me".to_vec();
    let original = RDWRecord::try_new(body).unwrap();

    let copy = original.clone();
    assert_eq!(copy.payload, original.payload);
    assert_eq!(copy.header, original.header);
}
1330
/// The `Debug` rendering of a record includes the type name `RDWRecord`.
#[test]
fn rdw_record_debug_format() {
    let body = b"dbg".to_vec();
    let record = RDWRecord::try_new(body).unwrap();

    let rendered = format!("{:?}", record);
    assert!(rendered.contains("RDWRecord"));
}
1337
/// A record built from an empty payload reports length 0, and its serialized
/// form is exactly `RDW_HEADER_LEN` bytes long.
#[test]
fn rdw_record_empty_payload() {
    let empty = RDWRecord::try_new(vec![]).unwrap();

    assert_eq!(empty.length(), 0);
    assert!(empty.payload.is_empty());

    let serialized_len = empty.as_bytes().len();
    assert_eq!(serialized_len, RDW_HEADER_LEN);
}
1345
/// With the reader's flag set to `false`, a header that stops after three of
/// its four bytes is reported as "no more records" rather than as an error.
#[test]
fn rdw_reader_three_byte_header_lenient_is_eof() {
    let truncated = Cursor::new(vec![0x00u8, 0x05, 0x00]);
    let mut reader = RDWRecordReader::new(truncated, false);

    let outcome = reader.read_record().unwrap();
    assert!(outcome.is_none());
}
1352
/// With the reader's flag set to `true`, a header that stops after three of
/// its four bytes must fail with `CBKF221_RDW_UNDERFLOW`.
#[test]
fn rdw_reader_three_byte_header_strict_is_underflow() {
    let truncated = Cursor::new(vec![0x00u8, 0x05, 0x00]);
    let mut reader = RDWRecordReader::new(truncated, true);

    let failure = reader.read_record().unwrap_err();
    assert_eq!(failure.code, ErrorCode::CBKF221_RDW_UNDERFLOW);
}
1360
/// Flushing a writer that has written nothing succeeds and leaves its record
/// count at zero.
#[test]
fn rdw_writer_flush_succeeds() {
    let mut sink = Vec::new();
    let mut writer = RDWRecordWriter::new(&mut sink);

    let flushed = writer.flush();
    flushed.unwrap();
    assert_eq!(writer.record_count(), 0);
}
1368
/// Writing five one-byte payloads leaves the writer's record count at five.
#[test]
fn rdw_writer_multiple_records_count() {
    let mut sink = Vec::new();
    let mut writer = RDWRecordWriter::new(&mut sink);

    // Payloads are the single bytes 0, 1, 2, 3 and 4.
    (0u8..5).for_each(|byte| {
        writer.write_record_from_payload(&[byte], None).unwrap();
    });

    assert_eq!(writer.record_count(), 5);
}
1378
/// With two bytes available in the cursor, `rdw_try_peek_len` succeeds and
/// returns `Some`.
#[test]
fn rdw_try_peek_len_two_bytes_returns_some() {
    let mut source = Cursor::new(vec![0x00, 0x05]);
    let peeked = rdw_try_peek_len(&mut source).unwrap();
    assert!(peeked.is_some());
}
1384
/// With only one byte available, `rdw_read_len` must fail with
/// `CBKF102_RECORD_LENGTH_INVALID`.
#[test]
fn rdw_read_len_incomplete_is_error() {
    let mut source = Cursor::new(vec![0x00]);
    let failure = rdw_read_len(&mut source).unwrap_err();
    assert_eq!(failure.code, ErrorCode::CBKF102_RECORD_LENGTH_INVALID);
}
1391}