1#![allow(clippy::missing_inline_in_public_items)]
11
12use crate::JSON_SCHEMA_VERSION;
13use crate::options::{DecodeOptions, EncodeOptions, RecordFormat, ZonedEncodingFormat};
14use crate::zoned_overpunch::ZeroSignPolicy;
15use base64::Engine;
16use copybook_core::{Error, ErrorCode, Result, Schema};
17use serde_json::Value;
18use std::cell::RefCell;
19use std::convert::TryFrom;
20use std::fmt;
21use std::io::{BufRead, BufReader, Read, Write};
22use tracing::info;
23
24fn flatten_fields_into(
27 source: &serde_json::Map<String, Value>,
28 target: &mut serde_json::Map<String, Value>,
29) {
30 for (key, value) in source {
31 if let Value::Object(nested) = value {
32 flatten_fields_into(nested, target);
34 } else {
35 target.insert(key.clone(), value.clone());
36 }
37 }
38}
39
40fn build_json_envelope(
45 fields: serde_json::Map<String, Value>,
46 schema: &Schema,
47 options: &DecodeOptions,
48 record_index: u64,
49 record_length: usize,
50 raw_b64: Option<String>,
51 encoding_metadata: Vec<(String, ZonedEncodingFormat)>,
52) -> Value {
53 let mut root = serde_json::Map::new();
54
55 root.insert(
56 String::from("schema"),
57 Value::String(JSON_SCHEMA_VERSION.into()),
58 );
59 root.insert(
60 String::from("record_index"),
61 Value::Number(serde_json::Number::from(record_index)),
62 );
63
64 let codepage = options.codepage.to_string();
65 root.insert(String::from("codepage"), Value::String(codepage));
66
67 flatten_fields_into(&fields, &mut root);
68 root.insert(String::from("fields"), Value::Object(fields));
69
70 if options.emit_meta {
71 if !schema.fingerprint.is_empty() {
72 root.insert(
73 String::from("schema_fingerprint"),
74 Value::String(schema.fingerprint.clone()),
75 );
76 root.insert(
77 String::from("__schema_id"),
78 Value::String(schema.fingerprint.clone()),
79 );
80 }
81 root.insert(
82 String::from("length"),
83 Value::Number(serde_json::Number::from(record_length)),
84 );
85 root.insert(
86 String::from("__record_index"),
87 Value::Number(serde_json::Number::from(record_index)),
88 );
89 root.insert(
90 String::from("__length"),
91 Value::Number(serde_json::Number::from(record_length)),
92 );
93 }
94
95 if let Some(raw) = raw_b64 {
96 root.insert(String::from("raw_b64"), Value::String(raw.clone()));
97 root.insert(String::from("__raw_b64"), Value::String(raw));
98 }
99
100 if options.preserve_zoned_encoding && !encoding_metadata.is_empty() {
101 let mut meta_map = serde_json::Map::new();
102 for (field_name, format) in encoding_metadata {
103 let format_text = format.to_string();
104 meta_map.insert(field_name, Value::String(format_text));
105 }
106 root.insert(String::from("_encoding_metadata"), Value::Object(meta_map));
107 }
108
109 Value::Object(root)
110}
111
// Per-thread tally of decode warnings (incremented via
// `increment_warning_counter` after ODO validation warnings — see the array
// processing paths). Thread-local so parallel decode workers count without
// synchronization; presumably drained by the caller into `RunSummary` —
// confirm at call sites outside this chunk.
thread_local! {
    static WARNING_COUNTER: RefCell<u64> = const { RefCell::new(0) };
}
115
#[cfg(feature = "metrics")]
/// Telemetry helpers emitted through the `metrics` facade.
///
/// Compiled only when the `metrics` feature is enabled; a sibling module of
/// no-op stand-ins replaces it otherwise, so call sites need no `cfg`.
/// Series are labeled by record format and codepage (reads additionally by
/// the zoned-zero policy) so dashboards can slice per workload.
mod telemetry {
    use crate::options::{Codepage, DecodeOptions, RecordFormat, ZonedEncodingFormat};
    use metrics::{counter, gauge, histogram};

    /// Counts one decoded record and its payload byte count.
    #[inline]
    pub fn record_read(bytes: usize, options: &DecodeOptions) {
        let format_label = format_label(options.format);
        let codepage_label = codepage_label(options.codepage);
        let zero_policy_label = zero_policy_label(options);

        counter!(
            "copybook_records_total",
            "format" => format_label,
            "codepage" => codepage_label,
            "zero_policy" => zero_policy_label
        )
        .increment(1);
        counter!(
            "copybook_bytes_total",
            "format" => format_label,
            "codepage" => codepage_label,
            "zero_policy" => zero_policy_label
        )
        .increment(bytes as u64);
    }

    /// Counts one decode error, bucketed by error family.
    #[inline]
    pub fn record_error(family: &'static str) {
        counter!("copybook_decode_errors_total", "family" => family).increment(1);
    }

    /// Records end-of-run duration and throughput. Non-finite (or negative
    /// duration) inputs are dropped rather than polluting the series.
    #[inline]
    pub fn record_completion(
        duration_seconds: f64,
        throughput_mibps: f64,
        options: &DecodeOptions,
    ) {
        let format_label = format_label(options.format);
        let codepage_label = codepage_label(options.codepage);

        if duration_seconds.is_finite() && duration_seconds >= 0.0 {
            histogram!(
                "copybook_decode_seconds",
                "format" => format_label,
                "codepage" => codepage_label
            )
            .record(duration_seconds);
        }

        if throughput_mibps.is_finite() {
            gauge!(
                "copybook_throughput_mibps",
                "format" => format_label,
                "codepage" => codepage_label
            )
            .set(throughput_mibps);
        }
    }

    /// Maps zoned-encoding options to a coarse label: "preserved" when
    /// detected encodings are passed through, "override" when a specific
    /// format is forced, "preferred" for auto-detection.
    #[inline]
    fn zero_policy_label(options: &DecodeOptions) -> &'static str {
        if options.preserve_zoned_encoding {
            "preserved"
        } else if options.preferred_zoned_encoding != ZonedEncodingFormat::Auto {
            "override"
        } else {
            "preferred"
        }
    }

    /// Static label for the record framing format.
    #[inline]
    fn format_label(format: RecordFormat) -> &'static str {
        match format {
            RecordFormat::Fixed => "fixed",
            RecordFormat::RDW => "rdw",
        }
    }

    /// Static label for the character codepage.
    #[inline]
    fn codepage_label(codepage: Codepage) -> &'static str {
        match codepage {
            Codepage::ASCII => "ascii",
            Codepage::CP037 => "cp037",
            Codepage::CP273 => "cp273",
            Codepage::CP500 => "cp500",
            Codepage::CP1047 => "cp1047",
            Codepage::CP1140 => "cp1140",
        }
    }
}
207
208#[cfg(not(feature = "metrics"))]
209mod telemetry {
210 use crate::options::DecodeOptions;
211
212 #[inline]
213 pub fn record_read(_bytes: usize, _options: &DecodeOptions) {}
214
215 #[inline]
216 pub fn record_error(_family: &'static str) {}
217
218 #[inline]
219 pub fn record_completion(
220 _duration_seconds: f64,
221 _throughput_mibps: f64,
222 _options: &DecodeOptions,
223 ) {
224 }
225}
226
/// Aggregate statistics for one processing run.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct RunSummary {
    /// Records processed successfully.
    pub records_processed: u64,
    /// Records that failed with errors.
    pub records_with_errors: u64,
    /// Non-fatal warnings emitted during the run.
    pub warnings: u64,
    /// Wall-clock processing time, in milliseconds.
    pub processing_time_ms: u64,
    /// Total input bytes consumed.
    pub bytes_processed: u64,
    /// Fingerprint of the schema in use (empty until set).
    pub schema_fingerprint: String,
    /// Throughput in MB/s; populated by `calculate_throughput`.
    pub throughput_mbps: f64,
    /// Peak memory usage in bytes, when measured.
    pub peak_memory_bytes: Option<u64>,
    /// Number of worker threads used.
    pub threads_used: usize,
}
252
253impl RunSummary {
254 #[must_use]
256 pub fn new() -> Self {
257 Self::default()
258 }
259
260 #[must_use]
262 pub fn with_threads(threads: usize) -> Self {
263 Self {
264 threads_used: threads,
265 ..Self::default()
266 }
267 }
268
269 #[allow(clippy::cast_precision_loss)]
271 pub fn calculate_throughput(&mut self) {
272 if self.processing_time_ms > 0 {
273 let seconds = self.processing_time_ms as f64 / 1000.0;
274 let megabytes = self.bytes_processed as f64 / (1024.0 * 1024.0);
275 self.throughput_mbps = megabytes / seconds;
276 }
277 }
278
279 #[must_use]
281 pub const fn has_errors(&self) -> bool {
282 self.records_with_errors > 0
283 }
284
285 #[must_use]
287 pub const fn has_warnings(&self) -> bool {
288 self.warnings > 0
289 }
290
291 #[must_use]
293 pub const fn is_successful(&self) -> bool {
294 !self.has_errors()
295 }
296
297 #[must_use]
299 pub const fn total_records(&self) -> u64 {
300 self.records_processed + self.records_with_errors
301 }
302
303 #[must_use]
305 #[allow(clippy::cast_precision_loss)]
306 pub fn success_rate(&self) -> f64 {
307 let total = self.total_records();
308 if total == 0 {
309 100.0
310 } else {
311 (self.records_processed as f64 / total as f64) * 100.0
312 }
313 }
314
315 #[must_use]
317 pub fn error_rate(&self) -> f64 {
318 100.0 - self.success_rate()
319 }
320
321 #[must_use]
323 #[allow(clippy::cast_precision_loss)]
324 pub fn processing_time_seconds(&self) -> f64 {
325 self.processing_time_ms as f64 / 1000.0
326 }
327
328 #[must_use]
330 #[allow(clippy::cast_precision_loss)]
331 pub fn bytes_processed_mb(&self) -> f64 {
332 self.bytes_processed as f64 / (1024.0 * 1024.0)
333 }
334
335 pub fn set_schema_fingerprint(&mut self, fingerprint: String) {
337 self.schema_fingerprint = fingerprint;
338 }
339
340 pub fn set_peak_memory_bytes(&mut self, bytes: u64) {
342 self.peak_memory_bytes = Some(bytes);
343 }
344}
345
346impl fmt::Display for RunSummary {
347 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
348 writeln!(f, "Processing Summary:")?;
349 writeln!(f, " Records processed: {}", self.records_processed)?;
350 writeln!(f, " Records with errors: {}", self.records_with_errors)?;
351 writeln!(f, " Warnings: {}", self.warnings)?;
352 writeln!(f, " Success rate: {:.1}%", self.success_rate())?;
353 writeln!(
354 f,
355 " Processing time: {:.2}s",
356 self.processing_time_seconds()
357 )?;
358 writeln!(f, " Bytes processed: {:.2} MB", self.bytes_processed_mb())?;
359 writeln!(f, " Throughput: {:.2} MB/s", self.throughput_mbps)?;
360 writeln!(f, " Threads used: {}", self.threads_used)?;
361 if let Some(peak_memory) = self.peak_memory_bytes {
362 #[allow(clippy::cast_precision_loss)]
363 let peak_mb = peak_memory as f64 / (1024.0 * 1024.0);
364 writeln!(f, " Peak memory: {peak_mb:.2} MB")?;
365 }
366 if !self.schema_fingerprint.is_empty() {
367 writeln!(f, " Schema fingerprint: {}", self.schema_fingerprint)?;
368 }
369 Ok(())
370 }
371}
372
/// Decodes a single record into a JSON envelope.
///
/// Convenience wrapper over [`decode_record_with_raw_data`] with no raw
/// header bytes and record index 0.
///
/// # Errors
/// Propagates any error from the underlying field processing.
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn decode_record(schema: &Schema, data: &[u8], options: &DecodeOptions) -> Result<Value> {
    decode_record_with_raw_data(schema, data, options, None, 0)
}
404
/// Decodes a single record using caller-provided scratch buffers.
///
/// Like [`decode_record`] but reuses `scratch` across calls so hot loops
/// avoid repeated allocations. No raw bytes are attached and the record
/// index defaults to 0.
///
/// # Errors
/// Propagates any error from the underlying field processing.
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn decode_record_with_scratch(
    schema: &Schema,
    data: &[u8],
    options: &DecodeOptions,
    scratch: &mut crate::memory::ScratchBuffers,
) -> Result<Value> {
    decode_record_with_scratch_and_raw(schema, data, options, None, 0, scratch)
}
450
/// Scratch-buffer core of record decoding: produces the JSON envelope for
/// one record, reusing `scratch` for intermediate buffers.
///
/// `raw_data` is base64-encoded into the envelope only when `emit_raw` is
/// `Record` or `RecordRDW`.
///
/// NOTE(review): asymmetries vs `decode_record_with_raw_data` — here
/// `RawMode::Field` yields no record-level raw payload, and a `None`
/// `raw_data` yields none either (no fallback to encoding `data`). Confirm
/// this is intentional.
fn decode_record_with_scratch_and_raw(
    schema: &Schema,
    data: &[u8],
    options: &DecodeOptions,
    raw_data: Option<Vec<u8>>,
    record_index: u64,
    scratch: &mut crate::memory::ScratchBuffers,
) -> Result<Value> {
    use serde_json::Map;

    let mut fields_map = Map::new();
    let mut record_raw = None;
    let mut encoding_acc = Vec::new();

    // Keep the raw record bytes only for the record-level raw modes.
    if let Some(raw_bytes) = raw_data.filter(|_| {
        matches!(
            options.emit_raw,
            crate::options::RawMode::Record | crate::options::RawMode::RecordRDW
        )
    }) {
        record_raw = Some(base64::engine::general_purpose::STANDARD.encode(raw_bytes));
    }

    process_fields_recursive_with_scratch(
        &schema.fields,
        data,
        &mut fields_map,
        options,
        scratch,
        record_index,
        &mut encoding_acc,
    )?;

    Ok(build_json_envelope(
        fields_map,
        schema,
        options,
        record_index,
        data.len(),
        record_raw,
        encoding_acc,
    ))
}
495
496#[inline]
501#[must_use = "Handle the Result or propagate the error"]
502pub fn decode_record_with_raw_data(
503 schema: &Schema,
504 data: &[u8],
505 options: &DecodeOptions,
506 raw_data_with_header: Option<&[u8]>,
507 record_index: u64,
508) -> Result<Value> {
509 use crate::options::RawMode;
510 use serde_json::Map;
511
512 let mut fields_map = Map::new();
513 let mut scratch_buffers: Option<crate::memory::ScratchBuffers> = None;
514 let mut encoding_acc = Vec::new();
515
516 process_fields_recursive(
517 &schema.fields,
518 data,
519 &mut fields_map,
520 options,
521 &mut scratch_buffers,
522 record_index,
523 &mut encoding_acc,
524 )?;
525
526 let mut record_raw = None;
527 match options.emit_raw {
528 RawMode::Off => {}
529 RawMode::Record | RawMode::Field => {
530 let raw_b64 = base64::engine::general_purpose::STANDARD.encode(data);
531 record_raw = Some(raw_b64);
532 }
533 RawMode::RecordRDW => {
534 if let Some(full_raw) = raw_data_with_header {
535 let raw_b64 = base64::engine::general_purpose::STANDARD.encode(full_raw);
536 record_raw = Some(raw_b64);
537 } else {
538 let raw_b64 = base64::engine::general_purpose::STANDARD.encode(data);
539 record_raw = Some(raw_b64);
540 }
541 }
542 }
543
544 Ok(build_json_envelope(
545 fields_map,
546 schema,
547 options,
548 record_index,
549 data.len(),
550 record_raw,
551 encoding_acc,
552 ))
553}
554
/// Walks `fields` depth-first, decoding each into `json_obj`.
///
/// Dispatch per field:
/// * any OCCURS clause (fixed or DEPENDING ON) → JSON array;
/// * group at level > 1 → nested JSON object;
/// * group at level 1 (record root) → children flattened into `json_obj`;
/// * everything else → scalar decode.
///
/// `field_index`/`total_fields` are forwarded so the scalar path can widen
/// the trailing alphanumeric field of an RDW record.
///
/// NOTE(review): unlike `process_fields_recursive_with_scratch`, FILLER
/// fields are NOT filtered here regardless of `options.emit_filler` —
/// confirm the asymmetry is intentional.
fn process_fields_recursive(
    fields: &[copybook_core::Field],
    data: &[u8],
    json_obj: &mut serde_json::Map<String, Value>,
    options: &DecodeOptions,
    scratch_buffers: &mut Option<crate::memory::ScratchBuffers>,
    record_index: u64,
    encoding_acc: &mut Vec<(String, ZonedEncodingFormat)>,
) -> Result<()> {
    use copybook_core::FieldKind;

    let total_fields = fields.len();

    for (field_index, field) in fields.iter().enumerate() {
        match (&field.kind, &field.occurs) {
            // OCCURS (fixed or ODO) becomes a JSON array.
            (_, Some(occurs)) => {
                process_array_field(
                    field,
                    occurs,
                    data,
                    json_obj,
                    options,
                    fields,
                    scratch_buffers,
                    record_index,
                    encoding_acc,
                )?;
            }
            // Nested group: children go into their own JSON object.
            (FieldKind::Group, None) if field.level > 1 => {
                let mut group_obj = serde_json::Map::new();
                process_fields_recursive(
                    &field.children,
                    data,
                    &mut group_obj,
                    options,
                    scratch_buffers,
                    record_index,
                    encoding_acc,
                )?;
                json_obj.insert(field.name.clone(), Value::Object(group_obj));
            }
            // Record-root group: flatten children into the caller's object.
            (FieldKind::Group, None) => {
                process_fields_recursive(
                    &field.children,
                    data,
                    json_obj,
                    options,
                    scratch_buffers,
                    record_index,
                    encoding_acc,
                )?;
            }
            // Scalar leaf.
            _ => {
                process_scalar_field_standard(
                    field,
                    field_index,
                    total_fields,
                    data,
                    json_obj,
                    options,
                    scratch_buffers,
                    encoding_acc,
                )?;
            }
        }
    }

    Ok(())
}
628
/// Scratch-buffer twin of `process_fields_recursive`: walks the schema
/// fields depth-first, decoding each into `json_obj` while reusing
/// `scratch` for intermediate allocations.
///
/// Differences from the non-scratch variant (confirm intended):
/// * FILLER fields are skipped unless `options.emit_filler` is set;
/// * no trailing-field widening for RDW records (no index bookkeeping).
fn process_fields_recursive_with_scratch(
    fields: &[copybook_core::Field],
    data: &[u8],
    json_obj: &mut serde_json::Map<String, Value>,
    options: &DecodeOptions,
    scratch: &mut crate::memory::ScratchBuffers,
    record_index: u64,
    encoding_acc: &mut Vec<(String, ZonedEncodingFormat)>,
) -> Result<()> {
    use copybook_core::FieldKind;

    for field in fields {
        // FILLER carries no business data; omit unless explicitly requested.
        if is_filler_field(field) && !options.emit_filler {
            continue;
        }

        match (&field.kind, &field.occurs) {
            // OCCURS (fixed or ODO) becomes a JSON array.
            (_, Some(occurs)) => {
                process_array_field_with_scratch(
                    field,
                    occurs,
                    data,
                    json_obj,
                    options,
                    fields,
                    scratch,
                    record_index,
                    encoding_acc,
                )?;
            }
            // Nested group: children go into their own JSON object.
            (FieldKind::Group, None) if field.level > 1 => {
                let mut group_obj = serde_json::Map::new();
                process_fields_recursive_with_scratch(
                    &field.children,
                    data,
                    &mut group_obj,
                    options,
                    scratch,
                    record_index,
                    encoding_acc,
                )?;
                json_obj.insert(field.name.clone(), Value::Object(group_obj));
            }
            // Record-root group: flatten children into the caller's object.
            (FieldKind::Group, None) => {
                process_fields_recursive_with_scratch(
                    &field.children,
                    data,
                    json_obj,
                    options,
                    scratch,
                    record_index,
                    encoding_acc,
                )?;
            }
            // Scalar leaf.
            _ => {
                process_scalar_field_with_scratch(
                    field,
                    data,
                    json_obj,
                    options,
                    scratch,
                    encoding_acc,
                )?;
            }
        }
    }

    Ok(())
}
700
701#[inline]
711#[allow(clippy::too_many_arguments)]
712fn process_scalar_field_standard(
713 field: ©book_core::Field,
714 field_index: usize,
715 total_fields: usize,
716 data: &[u8],
717 json_obj: &mut serde_json::Map<String, Value>,
718 options: &DecodeOptions,
719 scratch_buffers: &mut Option<crate::memory::ScratchBuffers>,
720 encoding_acc: &mut Vec<(String, ZonedEncodingFormat)>,
721) -> Result<()> {
722 if matches!(field.kind, copybook_core::FieldKind::Renames { .. }) {
724 let Some(resolved) = &field.resolved_renames else {
725 return Err(Error::new(
726 ErrorCode::CBKD101_INVALID_FIELD_TYPE,
727 format!(
728 "RENAMES field '{name}' has no resolved metadata",
729 name = field.name
730 ),
731 ));
732 };
733
734 let alias_start = resolved.offset as usize;
735 let alias_end = alias_start + resolved.length as usize;
736
737 if alias_end > data.len() {
738 return Err(Error::new(
739 ErrorCode::CBKD301_RECORD_TOO_SHORT,
740 format!(
741 "RENAMES field '{name}' at offset {offset} with length {length} exceeds data length {data_len}",
742 name = field.name,
743 offset = resolved.offset,
744 length = resolved.length,
745 data_len = data.len()
746 ),
747 ));
748 }
749
750 let alias_data = &data[alias_start..alias_end];
751 let text = crate::charset::ebcdic_to_utf8(
752 alias_data,
753 options.codepage,
754 options.on_decode_unmappable,
755 )?;
756 json_obj.insert(field.name.clone(), Value::String(text));
757 return Ok(());
758 }
759
760 let field_start = field.offset as usize;
761 let mut field_end = field_start + field.len as usize;
762
763 if options.format == RecordFormat::RDW
764 && field_index + 1 == total_fields
765 && matches!(field.kind, copybook_core::FieldKind::Alphanum { .. })
766 && data.len() > field_end
767 {
768 field_end = data.len();
769 }
770
771 if field_start > data.len() {
772 return Err(Error::new(
773 ErrorCode::CBKD301_RECORD_TOO_SHORT,
774 format!(
775 "Field '{name}' starts beyond record boundary",
776 name = field.name
777 ),
778 ));
779 }
780
781 field_end = field_end.min(data.len());
782
783 if field_start >= field_end {
784 return Ok(());
785 }
786
787 let field_data = &data[field_start..field_end];
788 let value = decode_scalar_field_value_standard(field, field_data, options, scratch_buffers)?;
789
790 if options.preserve_zoned_encoding {
792 collect_zoned_encoding_info(field, field_data, options, encoding_acc);
793 }
794
795 json_obj.insert(field.name.clone(), value);
796
797 if matches!(options.emit_raw, crate::options::RawMode::Field) {
799 let raw_key = format!("{}_raw_b64", field.name);
800 let raw_b64 = base64::engine::general_purpose::STANDARD.encode(field_data);
801 json_obj.insert(raw_key, Value::String(raw_b64));
802 }
803
804 Ok(())
805}
806
807#[inline]
811fn process_scalar_field_with_scratch(
812 field: ©book_core::Field,
813 data: &[u8],
814 json_obj: &mut serde_json::Map<String, Value>,
815 options: &DecodeOptions,
816 scratch: &mut crate::memory::ScratchBuffers,
817 encoding_acc: &mut Vec<(String, ZonedEncodingFormat)>,
818) -> Result<()> {
819 if matches!(field.kind, copybook_core::FieldKind::Renames { .. }) {
821 let Some(resolved) = &field.resolved_renames else {
822 return Err(Error::new(
823 ErrorCode::CBKD101_INVALID_FIELD_TYPE,
824 format!(
825 "RENAMES field '{name}' has no resolved metadata",
826 name = field.name
827 ),
828 ));
829 };
830
831 let alias_start = resolved.offset as usize;
832 let alias_end = alias_start + resolved.length as usize;
833
834 if alias_end > data.len() {
835 return Err(Error::new(
836 ErrorCode::CBKD301_RECORD_TOO_SHORT,
837 format!(
838 "RENAMES field '{name}' at offset {offset} with length {length} exceeds data length {data_len}",
839 name = field.name,
840 offset = resolved.offset,
841 length = resolved.length,
842 data_len = data.len()
843 ),
844 ));
845 }
846
847 let alias_data = &data[alias_start..alias_end];
848 let text = crate::charset::ebcdic_to_utf8(
849 alias_data,
850 options.codepage,
851 options.on_decode_unmappable,
852 )?;
853 json_obj.insert(field.name.clone(), Value::String(text));
854 return Ok(());
855 }
856
857 let field_start = field.offset as usize;
858 let mut field_end = field_start + field.len as usize;
859
860 if field_start > data.len() {
861 return Err(Error::new(
862 ErrorCode::CBKD301_RECORD_TOO_SHORT,
863 format!(
864 "Field '{name}' starts beyond record boundary",
865 name = field.name
866 ),
867 ));
868 }
869
870 if options.format == RecordFormat::RDW {
871 field_end = field_end.min(data.len());
872 }
873
874 if field_start >= field_end {
875 return Ok(());
876 }
877
878 if field_end > data.len() {
879 return Err(Error::new(
880 ErrorCode::CBKD301_RECORD_TOO_SHORT,
881 format!(
882 "Field '{name}' at offset {offset} with length {length} exceeds data length {data_len}",
883 name = field.name,
884 offset = field.offset,
885 length = field.len,
886 data_len = data.len()
887 ),
888 ));
889 }
890
891 let field_data = &data[field_start..field_end];
892 let value = decode_scalar_field_value_with_scratch(field, field_data, options, scratch)?;
893
894 if options.preserve_zoned_encoding {
896 collect_zoned_encoding_info(field, field_data, options, encoding_acc);
897 }
898
899 json_obj.insert(field.name.clone(), value);
900
901 if matches!(options.emit_raw, crate::options::RawMode::Field) {
903 let raw_key = format!("{}_raw_b64", field.name);
904 let raw_b64 = base64::engine::general_purpose::STANDARD.encode(field_data);
905 json_obj.insert(raw_key, Value::String(raw_b64));
906 }
907
908 Ok(())
909}
910
911#[allow(clippy::too_many_arguments)]
913fn process_array_field(
914 field: ©book_core::Field,
915 occurs: ©book_core::Occurs,
916 data: &[u8],
917 json_obj: &mut serde_json::Map<String, Value>,
918 options: &DecodeOptions,
919 all_fields: &[copybook_core::Field],
920 scratch_buffers: &mut Option<crate::memory::ScratchBuffers>,
921 record_index: u64,
922 encoding_acc: &mut Vec<(String, ZonedEncodingFormat)>,
923) -> Result<()> {
924 use copybook_core::{FieldKind, Occurs};
925
926 let count = match occurs {
927 Occurs::Fixed { count } => *count,
928 Occurs::ODO {
929 min,
930 max,
931 counter_path,
932 } => {
933 let scratch = scratch_buffers.get_or_insert_with(crate::memory::ScratchBuffers::new);
935 let counter_value =
936 find_and_read_counter_field(counter_path, all_fields, data, options, scratch)?;
937
938 let counter_field = find_field_by_path(all_fields, counter_path)?;
939 let validation_context = crate::odo_redefines::OdoValidationContext {
940 field_path: field.path.clone(),
941 counter_path: counter_path.clone(),
942 record_index,
943 byte_offset: u64::from(counter_field.offset),
944 };
945 let validation = crate::odo_redefines::validate_odo_decode(
946 counter_value,
947 *min,
948 *max,
949 &validation_context,
950 options,
951 )?;
952
953 if let Some(warning) = validation.warning {
954 tracing::warn!("{}", warning);
955 increment_warning_counter();
956 }
957
958 validation.actual_count
959 }
960 };
961
962 let element_size = field.len as usize;
963 let array_start = field.offset as usize;
964 let total_array_size = element_size * count as usize;
965 let array_end = array_start + total_array_size;
966
967 if array_end > data.len() {
969 return Err(Error::new(
970 ErrorCode::CBKD301_RECORD_TOO_SHORT,
971 format!(
972 "Array '{}' requires {} bytes but only {} bytes available",
973 field.name,
974 total_array_size,
975 data.len().saturating_sub(array_start)
976 ),
977 ));
978 }
979
980 let mut array_values = Vec::new();
982 for i in 0..count {
983 let element_start = array_start + (i as usize * element_size);
984 let element_end = element_start + element_size;
985
986 let element_value = match &field.kind {
987 FieldKind::Group => {
988 let mut element_obj = serde_json::Map::new();
990 let element_base_offset = u32::try_from(element_start).map_err(|_| {
991 Error::new(
992 ErrorCode::CBKD301_RECORD_TOO_SHORT,
993 format!("Array element offset {element_start} exceeds supported range"),
994 )
995 })?;
996 let adjusted_children = adjust_field_offsets(&field.children, element_base_offset);
997 process_fields_recursive(
998 &adjusted_children,
999 data,
1000 &mut element_obj,
1001 options,
1002 scratch_buffers,
1003 record_index,
1004 encoding_acc,
1005 )?;
1006 Value::Object(element_obj)
1007 }
1008 FieldKind::Condition { values } => condition_value(values, "CONDITION_ARRAY"),
1009 _ => {
1010 let element_data = &data[element_start..element_end];
1011 let val = decode_scalar_field_value_standard(
1012 field,
1013 element_data,
1014 options,
1015 scratch_buffers,
1016 )?;
1017 if options.preserve_zoned_encoding {
1018 collect_zoned_encoding_info(field, element_data, options, encoding_acc);
1019 }
1020 val
1021 }
1022 };
1023
1024 array_values.push(element_value);
1025 }
1026
1027 json_obj.insert(field.name.clone(), Value::Array(array_values));
1028 Ok(())
1029}
1030
1031#[allow(clippy::too_many_arguments)]
1033fn process_array_field_with_scratch(
1034 field: ©book_core::Field,
1035 occurs: ©book_core::Occurs,
1036 data: &[u8],
1037 json_obj: &mut serde_json::Map<String, Value>,
1038 options: &DecodeOptions,
1039 all_fields: &[copybook_core::Field],
1040 scratch: &mut crate::memory::ScratchBuffers,
1041 record_index: u64,
1042 encoding_acc: &mut Vec<(String, ZonedEncodingFormat)>,
1043) -> Result<()> {
1044 use copybook_core::{FieldKind, Occurs};
1045 use serde_json::Value;
1046
1047 let count = match occurs {
1048 Occurs::Fixed { count } => *count,
1049 Occurs::ODO {
1050 min,
1051 max,
1052 counter_path,
1053 } => {
1054 let counter_value =
1056 find_and_read_counter_field(counter_path, all_fields, data, options, scratch)?;
1057
1058 let counter_field = find_field_by_path(all_fields, counter_path)?;
1059 let validation_context = crate::odo_redefines::OdoValidationContext {
1060 field_path: field.path.clone(),
1061 counter_path: counter_path.clone(),
1062 record_index,
1063 byte_offset: u64::from(counter_field.offset),
1064 };
1065 let validation = crate::odo_redefines::validate_odo_decode(
1066 counter_value,
1067 *min,
1068 *max,
1069 &validation_context,
1070 options,
1071 )?;
1072
1073 if let Some(warning) = validation.warning {
1074 tracing::warn!("{}", warning);
1075 increment_warning_counter();
1076 }
1077
1078 validation.actual_count
1079 }
1080 };
1081
1082 let element_size = field.len as usize;
1083 let array_start = field.offset as usize;
1084 let total_array_size = element_size * count as usize;
1085 let array_end = array_start + total_array_size;
1086
1087 if array_end > data.len() {
1088 return Err(Error::new(
1089 ErrorCode::CBKD301_RECORD_TOO_SHORT,
1090 format!(
1091 "Array field '{}' with {} elements at offset {} requires {} bytes but record has {}",
1092 field.name,
1093 count,
1094 array_start,
1095 total_array_size,
1096 data.len() - array_start
1097 ),
1098 ));
1099 }
1100
1101 let mut array_values = Vec::new();
1102
1103 for i in 0..count {
1104 let element_offset = array_start + (i as usize * element_size);
1105 let element_data = &data[element_offset..element_offset + element_size];
1106
1107 let element_value = match &field.kind {
1108 FieldKind::Group => {
1109 let mut group_obj = serde_json::Map::new();
1111
1112 let element_offset_u32 = u32::try_from(element_offset).map_err(|_| {
1114 Error::new(
1115 ErrorCode::CBKD301_RECORD_TOO_SHORT,
1116 format!("Array element offset {element_offset} exceeds supported range"),
1117 )
1118 })?;
1119
1120 let mut element_field = field.clone();
1121 element_field.offset = element_offset_u32;
1122 element_field.occurs = None; process_fields_recursive_with_scratch(
1125 &element_field.children,
1126 data,
1127 &mut group_obj,
1128 options,
1129 scratch,
1130 record_index,
1131 encoding_acc,
1132 )?;
1133 Value::Object(group_obj)
1134 }
1135 FieldKind::Condition { values } => condition_value(values, "CONDITION_ARRAY"),
1136 _ => {
1137 let val =
1138 decode_scalar_field_value_with_scratch(field, element_data, options, scratch)?;
1139 if options.preserve_zoned_encoding {
1140 collect_zoned_encoding_info(field, element_data, options, encoding_acc);
1141 }
1142 val
1143 }
1144 };
1145
1146 array_values.push(element_value);
1147 }
1148
1149 json_obj.insert(field.name.clone(), Value::Array(array_values));
1150 Ok(())
1151}
1152
/// Locates the ODO counter field named by `counter_path` and reads its
/// value from `data` as a `u32` element count.
///
/// Supports zoned-decimal (with or without a separate sign), binary-integer,
/// and packed-decimal counters.
///
/// # Errors
/// * `CBKD301_RECORD_TOO_SHORT` — counter bytes extend past the record.
/// * `CBKS121_COUNTER_NOT_FOUND` — unknown path, unsupported counter type,
///   or a value that is negative, non-numeric, or out of `u32` range.
fn find_and_read_counter_field(
    counter_path: &str,
    all_fields: &[copybook_core::Field],
    data: &[u8],
    options: &DecodeOptions,
    scratch: &mut crate::memory::ScratchBuffers,
) -> Result<u32> {
    let counter_field = find_field_by_path(all_fields, counter_path)?;

    let field_start = counter_field.offset as usize;
    let field_end = field_start + counter_field.len as usize;

    if field_end > data.len() {
        return Err(Error::new(
            ErrorCode::CBKD301_RECORD_TOO_SHORT,
            format!("Counter field '{counter_path}' extends beyond record"),
        ));
    }

    let field_data = &data[field_start..field_end];

    match &counter_field.kind {
        copybook_core::FieldKind::ZonedDecimal {
            digits,
            scale,
            signed,
            sign_separate,
        } => {
            let count = if let Some(sign_sep) = sign_separate {
                // Separate-sign counters go through the dedicated decoder,
                // then a shared decimal→u32 conversion.
                let decimal = crate::numeric::decode_zoned_decimal_sign_separate(
                    field_data,
                    *digits,
                    *scale,
                    sign_sep,
                    options.codepage,
                )?;
                decimal_counter_to_u32(&decimal, counter_path)?
            } else {
                let decimal_str = crate::numeric::decode_zoned_decimal_to_string_with_scratch(
                    field_data,
                    *digits,
                    *scale,
                    *signed,
                    options.codepage,
                    counter_field.blank_when_zero,
                    scratch,
                )?;
                decimal_str.parse::<u32>().map_err(|_| {
                    Error::new(
                        ErrorCode::CBKS121_COUNTER_NOT_FOUND,
                        format!("ODO counter '{counter_path}' has invalid value: {decimal_str}"),
                    )
                })?
            };

            Ok(count)
        }
        copybook_core::FieldKind::BinaryInt { bits, signed } => {
            let int_value = crate::numeric::decode_binary_int(field_data, *bits, *signed)?;
            // A negative element count can never be valid.
            if int_value < 0 {
                return Err(Error::new(
                    ErrorCode::CBKS121_COUNTER_NOT_FOUND,
                    format!("ODO counter '{counter_path}' has negative value: {int_value}"),
                ));
            }
            Ok(u32::try_from(int_value).map_err(|_| {
                Error::new(
                    ErrorCode::CBKS121_COUNTER_NOT_FOUND,
                    format!("ODO counter '{counter_path}' exceeds supported range: {int_value}"),
                )
            })?)
        }
        copybook_core::FieldKind::PackedDecimal {
            digits,
            scale,
            signed,
        } => {
            let decimal_str = crate::numeric::decode_packed_decimal_to_string_with_scratch(
                field_data, *digits, *scale, *signed, scratch,
            )?;
            let count = decimal_str.parse::<u32>().map_err(|_| {
                Error::new(
                    ErrorCode::CBKS121_COUNTER_NOT_FOUND,
                    format!("ODO counter '{counter_path}' has invalid value: {decimal_str}"),
                )
            })?;
            Ok(count)
        }
        _ => Err(Error::new(
            ErrorCode::CBKS121_COUNTER_NOT_FOUND,
            format!("ODO counter '{counter_path}' has unsupported type"),
        )),
    }
}
1251
1252fn find_field_by_path<'a>(
1254 fields: &'a [copybook_core::Field],
1255 path: &str,
1256) -> Result<&'a copybook_core::Field> {
1257 for field in fields {
1258 if field.path == path || field.name == path {
1259 return Ok(field);
1260 }
1261 if let Ok(found) = find_field_by_path(&field.children, path) {
1263 return Ok(found);
1264 }
1265 }
1266
1267 Err(Error::new(
1268 ErrorCode::CBKS121_COUNTER_NOT_FOUND,
1269 format!("ODO counter field '{path}' not found"),
1270 ))
1271}
1272
/// Deep-clones `fields`, overwriting each field's `offset` — and,
/// recursively, every descendant's — with `base_offset`.
///
/// Used by `process_array_field` to place a group's children at an array
/// element's start position.
///
/// NOTE(review): every field at every depth receives the *same* absolute
/// offset; a field's own relative position is discarded. For a group with
/// more than one child this makes all siblings decode from one location —
/// confirm this is intentional rather than `base_offset + relative offset`.
fn adjust_field_offsets(
    fields: &[copybook_core::Field],
    base_offset: u32,
) -> Vec<copybook_core::Field> {
    fields
        .iter()
        .map(|field| {
            let mut adjusted_field = field.clone();
            adjusted_field.offset = base_offset;
            if !adjusted_field.children.is_empty() {
                adjusted_field.children =
                    adjust_field_offsets(&adjusted_field.children, base_offset);
            }
            adjusted_field
        })
        .collect()
}
1295
1296#[inline]
1298fn is_filler_field(field: ©book_core::Field) -> bool {
1299 field.name.eq_ignore_ascii_case("FILLER") || field.name.starts_with("_filler_")
1300}
1301
1302#[inline]
1307fn collect_zoned_encoding_info(
1308 field: ©book_core::Field,
1309 field_data: &[u8],
1310 options: &DecodeOptions,
1311 encoding_acc: &mut Vec<(String, ZonedEncodingFormat)>,
1312) {
1313 if let copybook_core::FieldKind::ZonedDecimal { digits, signed, .. } = &field.kind
1314 && let Ok((_, Some(info))) = crate::numeric::decode_zoned_decimal_with_encoding(
1315 field_data,
1316 *digits,
1317 0, *signed,
1319 options.codepage,
1320 field.blank_when_zero,
1321 true,
1322 )
1323 && !info.has_mixed_encoding
1324 {
1325 encoding_acc.push((field.name.clone(), info.detected_format));
1326 }
1327}
1328
1329fn numeric_string_to_value(s: String, options: &DecodeOptions) -> Value {
1335 use crate::options::JsonNumberMode;
1336 match options.json_number_mode {
1337 JsonNumberMode::Lossless => Value::String(s),
1338 JsonNumberMode::Native => {
1339 if !s.contains('.') && !s.contains('e') && !s.contains('E') {
1341 if let Ok(n) = s.parse::<i64>() {
1342 return Value::Number(serde_json::Number::from(n));
1343 }
1344 if let Ok(n) = s.parse::<u64>() {
1345 return Value::Number(serde_json::Number::from(n));
1346 }
1347 }
1348 if let Ok(f) = s.parse::<f64>()
1350 && let Some(n) = serde_json::Number::from_f64(f)
1351 {
1352 return Value::Number(n);
1353 }
1354 Value::String(s)
1356 }
1357 }
1358}
1359
1360#[allow(clippy::too_many_lines)]
1362fn decode_scalar_field_value_standard(
1363 field: ©book_core::Field,
1364 field_data: &[u8],
1365 options: &DecodeOptions,
1366 scratch_buffers: &mut Option<crate::memory::ScratchBuffers>,
1367) -> Result<Value> {
1368 use copybook_core::FieldKind;
1369
1370 match &field.kind {
1371 FieldKind::Alphanum { .. } => {
1372 let text = crate::charset::ebcdic_to_utf8(
1373 field_data,
1374 options.codepage,
1375 options.on_decode_unmappable,
1376 )?;
1377 Ok(Value::String(text))
1378 }
1379 FieldKind::ZonedDecimal {
1380 digits,
1381 scale,
1382 signed,
1383 sign_separate,
1384 } => {
1385 if let Some(sign_sep) = sign_separate {
1386 let decimal = crate::numeric::decode_zoned_decimal_sign_separate(
1387 field_data,
1388 *digits,
1389 *scale,
1390 sign_sep,
1391 options.codepage,
1392 )?;
1393 Ok(zoned_decimal_to_json_value(
1394 &decimal,
1395 *digits,
1396 *scale,
1397 field.blank_when_zero,
1398 options,
1399 ))
1400 } else if options.preserve_zoned_encoding {
1401 let (decimal, _encoding_info) = crate::numeric::decode_zoned_decimal_with_encoding(
1403 field_data,
1404 *digits,
1405 *scale,
1406 *signed,
1407 options.codepage,
1408 field.blank_when_zero,
1409 true, )?;
1411
1412 Ok(zoned_decimal_to_json_value(
1415 &decimal,
1416 *digits,
1417 *scale,
1418 field.blank_when_zero,
1419 options,
1420 ))
1421 } else {
1422 let decimal = crate::numeric::decode_zoned_decimal(
1424 field_data,
1425 *digits,
1426 *scale,
1427 *signed,
1428 options.codepage,
1429 field.blank_when_zero,
1430 )?;
1431 Ok(zoned_decimal_to_json_value(
1432 &decimal,
1433 *digits,
1434 *scale,
1435 field.blank_when_zero,
1436 options,
1437 ))
1438 }
1439 }
1440 FieldKind::BinaryInt { bits, signed } => {
1441 let int_value = crate::numeric::decode_binary_int(field_data, *bits, *signed)?;
1442 let scratch = scratch_buffers.get_or_insert_with(crate::memory::ScratchBuffers::new);
1443 let formatted =
1444 crate::numeric::format_binary_int_to_string_with_scratch(int_value, scratch);
1445 Ok(numeric_string_to_value(formatted, options))
1446 }
1447 FieldKind::PackedDecimal {
1448 digits,
1449 scale,
1450 signed,
1451 } => {
1452 let scratch = scratch_buffers.get_or_insert_with(crate::memory::ScratchBuffers::new);
1453 let decimal_str = crate::numeric::decode_packed_decimal_to_string_with_scratch(
1454 field_data, *digits, *scale, *signed, scratch,
1455 )?;
1456 Ok(numeric_string_to_value(decimal_str, options))
1457 }
1458 FieldKind::Group => {
1459 Err(Error::new(
1461 ErrorCode::CBKD101_INVALID_FIELD_TYPE,
1462 format!(
1463 "Cannot process group field '{name}' as scalar",
1464 name = field.name
1465 ),
1466 ))
1467 }
1468 FieldKind::Condition { values } => {
1469 Ok(condition_value(values, "CONDITION"))
1472 }
1473 FieldKind::Renames { .. } => {
1474 let Some(resolved) = &field.resolved_renames else {
1476 return Err(Error::new(
1477 ErrorCode::CBKD101_INVALID_FIELD_TYPE,
1478 format!(
1479 "RENAMES field '{name}' has no resolved metadata",
1480 name = field.name
1481 ),
1482 ));
1483 };
1484 let alias_start = resolved.offset as usize;
1486 let alias_end = alias_start + resolved.length as usize;
1487
1488 if alias_end > field_data.len() {
1489 return Err(Error::new(
1490 ErrorCode::CBKD301_RECORD_TOO_SHORT,
1491 format!(
1492 "RENAMES field '{name}' at offset {offset} with length {length} exceeds data length {data_len}",
1493 name = field.name,
1494 offset = resolved.offset,
1495 length = resolved.length,
1496 data_len = field_data.len()
1497 ),
1498 ));
1499 }
1500
1501 if resolved.members.len() == 1 {
1504 let alias_data = &field_data[alias_start..alias_end];
1506 let text = crate::charset::ebcdic_to_utf8(
1508 alias_data,
1509 options.codepage,
1510 options.on_decode_unmappable,
1511 )?;
1512 return Ok(Value::String(text));
1513 }
1514 let alias_data = &field_data[alias_start..alias_end];
1516 let text = crate::charset::ebcdic_to_utf8(
1517 alias_data,
1518 options.codepage,
1519 options.on_decode_unmappable,
1520 )?;
1521 Ok(Value::String(text))
1522 }
1523 FieldKind::EditedNumeric {
1524 pic_string, scale, ..
1525 } => {
1526 let raw_str = crate::charset::ebcdic_to_utf8(
1528 field_data,
1529 options.codepage,
1530 options.on_decode_unmappable,
1531 )?;
1532
1533 let pattern = crate::edited_pic::tokenize_edited_pic(pic_string)?;
1535
1536 let numeric_value = crate::edited_pic::decode_edited_numeric(
1538 &raw_str,
1539 &pattern,
1540 *scale,
1541 field.blank_when_zero,
1542 )?;
1543
1544 Ok(numeric_string_to_value(
1546 numeric_value.to_decimal_string(),
1547 options,
1548 ))
1549 }
1550 FieldKind::FloatSingle => {
1551 let value =
1552 crate::numeric::decode_float_single_with_format(field_data, options.float_format)?;
1553 if value.is_nan() || value.is_infinite() {
1554 Ok(Value::Null)
1555 } else {
1556 Ok(Value::Number(
1557 serde_json::Number::from_f64(f64::from(value))
1558 .unwrap_or_else(|| serde_json::Number::from(0)),
1559 ))
1560 }
1561 }
1562 FieldKind::FloatDouble => {
1563 let value =
1564 crate::numeric::decode_float_double_with_format(field_data, options.float_format)?;
1565 if value.is_nan() || value.is_infinite() {
1566 Ok(Value::Null)
1567 } else {
1568 Ok(Value::Number(
1569 serde_json::Number::from_f64(value)
1570 .unwrap_or_else(|| serde_json::Number::from(0)),
1571 ))
1572 }
1573 }
1574 }
1575}
1576
/// Scratch-buffer variant of `decode_scalar_field_value_standard`: callers
/// that already own a `ScratchBuffers` avoid the lazy `Option` wrapper.
/// Unlike the standard variant, zoned decimals never take the
/// encoding-detection path here — they always use the scratch string decoder.
///
/// # Errors
/// Same families as the standard variant: invalid field type for groups and
/// unresolved RENAMES, record-too-short for out-of-range aliases, plus any
/// charset / numeric decode error.
#[allow(clippy::too_many_lines)]
fn decode_scalar_field_value_with_scratch(
    field: &copybook_core::Field,
    field_data: &[u8],
    options: &DecodeOptions,
    scratch: &mut crate::memory::ScratchBuffers,
) -> Result<Value> {
    use copybook_core::FieldKind;

    match &field.kind {
        // PIC X(n): plain character conversion.
        FieldKind::Alphanum { .. } => {
            let text = crate::charset::ebcdic_to_utf8(
                field_data,
                options.codepage,
                options.on_decode_unmappable,
            )?;
            Ok(Value::String(text))
        }
        FieldKind::ZonedDecimal {
            digits,
            scale,
            signed,
            sign_separate,
        } => {
            if let Some(sign_sep) = sign_separate {
                // SIGN IS SEPARATE: the sign occupies its own byte.
                let decimal = crate::numeric::decode_zoned_decimal_sign_separate(
                    field_data,
                    *digits,
                    *scale,
                    sign_sep,
                    options.codepage,
                )?;
                Ok(zoned_decimal_to_json_value(
                    &decimal,
                    *digits,
                    *scale,
                    field.blank_when_zero,
                    options,
                ))
            } else {
                let decimal_str = crate::numeric::decode_zoned_decimal_to_string_with_scratch(
                    field_data,
                    *digits,
                    *scale,
                    *signed,
                    options.codepage,
                    field.blank_when_zero,
                    scratch,
                )?;
                Ok(numeric_string_to_value(decimal_str, options))
            }
        }
        // COMP binary integers.
        FieldKind::BinaryInt { bits, signed } => {
            let int_value = crate::numeric::decode_binary_int(field_data, *bits, *signed)?;
            let formatted =
                crate::numeric::format_binary_int_to_string_with_scratch(int_value, scratch);
            Ok(numeric_string_to_value(formatted, options))
        }
        // COMP-3 packed decimals.
        FieldKind::PackedDecimal {
            digits,
            scale,
            signed,
        } => {
            let decimal_str = crate::numeric::decode_packed_decimal_to_string_with_scratch(
                field_data, *digits, *scale, *signed, scratch,
            )?;
            Ok(numeric_string_to_value(decimal_str, options))
        }
        FieldKind::Group => Err(Error::new(
            ErrorCode::CBKD101_INVALID_FIELD_TYPE,
            format!(
                "Cannot process group field '{name}' as scalar",
                name = field.name
            ),
        )),
        // Level-88 conditions carry no bytes of their own.
        FieldKind::Condition { values } => Ok(condition_value(values, "CONDITION")),
        FieldKind::Renames { .. } => {
            let Some(resolved) = &field.resolved_renames else {
                return Err(Error::new(
                    ErrorCode::CBKD101_INVALID_FIELD_TYPE,
                    format!(
                        "RENAMES field '{name}' has no resolved metadata",
                        name = field.name
                    ),
                ));
            };
            // The alias indexes into the surrounding record region.
            let alias_start = resolved.offset as usize;
            let alias_end = alias_start + resolved.length as usize;

            if alias_end > field_data.len() {
                return Err(Error::new(
                    ErrorCode::CBKD301_RECORD_TOO_SHORT,
                    format!(
                        "RENAMES field '{name}' at offset {offset} with length {length} exceeds data length {data_len}",
                        name = field.name,
                        offset = resolved.offset,
                        length = resolved.length,
                        data_len = field_data.len()
                    ),
                ));
            }

            // RENAMES aliases are always surfaced as text.
            let alias_data = &field_data[alias_start..alias_end];
            let text = crate::charset::ebcdic_to_utf8(
                alias_data,
                options.codepage,
                options.on_decode_unmappable,
            )?;
            Ok(Value::String(text))
        }
        FieldKind::EditedNumeric {
            pic_string, scale, ..
        } => {
            // Edited PICs are stored as display text; decode then strip the
            // editing symbols back to a plain decimal string.
            let raw_str = crate::charset::ebcdic_to_utf8(
                field_data,
                options.codepage,
                options.on_decode_unmappable,
            )?;

            let pattern = crate::edited_pic::tokenize_edited_pic(pic_string)?;

            let numeric_value = crate::edited_pic::decode_edited_numeric(
                &raw_str,
                &pattern,
                *scale,
                field.blank_when_zero,
            )?;

            Ok(numeric_string_to_value(
                numeric_value.to_decimal_string(),
                options,
            ))
        }
        // COMP-1 / COMP-2 floats: NaN/Inf cannot be represented in JSON → null.
        FieldKind::FloatSingle => {
            let value =
                crate::numeric::decode_float_single_with_format(field_data, options.float_format)?;
            if value.is_nan() || value.is_infinite() {
                Ok(Value::Null)
            } else {
                Ok(Value::Number(
                    serde_json::Number::from_f64(f64::from(value))
                        .unwrap_or_else(|| serde_json::Number::from(0)),
                ))
            }
        }
        FieldKind::FloatDouble => {
            let value =
                crate::numeric::decode_float_double_with_format(field_data, options.float_format)?;
            if value.is_nan() || value.is_infinite() {
                Ok(Value::Null)
            } else {
                Ok(Value::Number(
                    serde_json::Number::from_f64(value)
                        .unwrap_or_else(|| serde_json::Number::from(0)),
                ))
            }
        }
    }
}
1744
1745#[inline]
1749fn condition_value(values: &[String], prefix: &str) -> Value {
1750 if values.is_empty() {
1751 Value::String(prefix.to_owned())
1752 } else {
1753 Value::String(format!("{prefix}({})", values.join("|")))
1754 }
1755}
1756
/// Encodes one JSON record (full decode envelope or bare field object) into
/// its binary representation.
///
/// When `options.use_raw` is set and the record carries `raw_b64`/`__raw_b64`,
/// the captured raw bytes are replayed: fixed records return them verbatim;
/// RDW records re-encode the fields and, only if they differ from the raw
/// payload, splice the new payload in with an updated length header.
///
/// # Errors
/// `CBKE501_JSON_TYPE_MISMATCH` for malformed envelopes or bad base64, plus
/// any error raised by REDEFINES/ODO validation or field encoding.
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn encode_record(schema: &Schema, json: &Value, options: &EncodeOptions) -> Result<Vec<u8>> {
    // The top level must be an object even when it is not a full envelope.
    let root_obj = json.as_object().ok_or_else(|| {
        Error::new(
            ErrorCode::CBKE501_JSON_TYPE_MISMATCH,
            "Expected JSON object for record envelope",
        )
    })?;
    // Prefer the `fields` wrapper when present; otherwise treat the whole
    // object as the field map.
    let fields_value = if let Some(fields_val) = root_obj.get("fields") {
        fields_val.as_object().ok_or_else(|| {
            Error::new(
                ErrorCode::CBKE501_JSON_TYPE_MISMATCH,
                "`fields` must be a JSON object",
            )
        })?;
        fields_val
    } else {
        json
    };

    // Raw passthrough: replay the captured bytes when available.
    if options.use_raw
        && let Some(raw_b64_value) = root_obj
            .get("raw_b64")
            .or_else(|| root_obj.get("__raw_b64"))
        && let Some(raw_str) = raw_b64_value.as_str()
    {
        let raw_data = base64::engine::general_purpose::STANDARD
            .decode(raw_str)
            .map_err(|e| {
                Error::new(
                    ErrorCode::CBKE501_JSON_TYPE_MISMATCH,
                    format!("Invalid base64 in raw_b64: {e}"),
                )
            })?;

        match options.format {
            RecordFormat::RDW => {
                // An RDW record needs at least the 4-byte descriptor word;
                // shorter raw data falls through to normal encoding below.
                if raw_data.len() >= 4 {
                    let mut rdw_record = raw_data.clone();

                    let payload = &rdw_record[4..];

                    let mut should_recompute = false;

                    // Re-encode the fields; if the JSON no longer matches the
                    // captured payload, the edited bytes win.
                    let field_payload = encode_fields_to_bytes(schema, fields_value, options)?;
                    if field_payload != payload {
                        should_recompute = true;
                    }

                    if should_recompute {
                        // Bytes 0-1 hold the payload length (big-endian,
                        // capped at u16::MAX); bytes 2-3 are left untouched.
                        // NOTE(review): payloads longer than u16::MAX are
                        // still spliced in full while the header caps at
                        // u16::MAX — confirm this mismatch is intended.
                        let capped_len = field_payload.len().min(u16::MAX as usize);
                        let new_length = u16::try_from(capped_len).unwrap_or(u16::MAX);
                        let length_bytes = new_length.to_be_bytes();
                        rdw_record[0] = length_bytes[0];
                        rdw_record[1] = length_bytes[1];
                        rdw_record.splice(4.., field_payload);
                    }

                    return Ok(rdw_record);
                }
            }
            RecordFormat::Fixed => {
                return Ok(raw_data);
            }
        }
    }

    // Structural validation before any bytes are produced.
    validate_lib_api_redefines_encoding(schema, fields_value, options)?;
    validate_lib_api_odo_encoding(schema, fields_value, options)?;

    match options.format {
        RecordFormat::Fixed => {
            let payload = encode_fields_to_bytes(schema, fields_value, options)?;
            Ok(payload)
        }
        RecordFormat::RDW => {
            let payload = encode_fields_to_bytes(schema, fields_value, options)?;

            // Prepend the RDW header built from the payload length.
            let rdw_record = crate::record::RDWRecord::try_new(payload)?;
            let mut result = Vec::new();
            result.extend_from_slice(&rdw_record.header);
            result.extend_from_slice(&rdw_record.payload);
            Ok(result)
        }
    }
}
1883
/// Validates REDEFINES clusters for the library encode API.
///
/// For each redefines cluster discovered in the JSON, picks a representative
/// field path (first non-null view, falling back to the cluster path) and the
/// first schema-resolvable view's byte offset, then delegates to the shared
/// redefines validator. Record index is 0: this API handles one record per
/// call.
fn validate_lib_api_redefines_encoding(
    schema: &Schema,
    json_value: &Value,
    options: &EncodeOptions,
) -> Result<()> {
    let redefines_context = crate::odo_redefines::build_redefines_context(schema, json_value);

    for (cluster_path, non_null_views) in &redefines_context.cluster_views {
        let field_path = non_null_views
            .first()
            .cloned()
            .unwrap_or_else(|| cluster_path.clone());

        // Best-effort byte offset for error reporting; 0 when nothing resolves.
        let byte_offset = non_null_views
            .iter()
            .find_map(|view| schema.find_field(view).map(|field| u64::from(field.offset)))
            .or_else(|| {
                schema
                    .find_field(cluster_path)
                    .map(|field| u64::from(field.offset))
            })
            .unwrap_or(0);

        crate::odo_redefines::validate_redefines_encoding(
            &redefines_context,
            cluster_path,
            &field_path,
            json_value,
            options.use_raw,
            0,
            byte_offset,
        )?;
    }

    Ok(())
}
1921
/// Validates a trailing OCCURS DEPENDING ON (tail ODO) array for the library
/// encode API: the array and its counter must exist in the schema, the
/// counter value must be present in the JSON, and the supplied element count
/// must fall within the schema's min/max bounds. Schemas without a tail ODO
/// pass trivially.
fn validate_lib_api_odo_encoding(
    schema: &Schema,
    json_value: &Value,
    options: &EncodeOptions,
) -> Result<()> {
    let Some(tail_odo) = &schema.tail_odo else {
        return Ok(());
    };

    // Accept either the envelope (`fields` wrapper) or a bare field object.
    let fields_value = if let Some(fields_value) = json_value.get("fields") {
        fields_value
    } else {
        json_value
    };

    let has_wrapper = json_value.get("fields").is_some();
    if !fields_value.is_object() {
        // A non-object `fields` wrapper is a hard error; a bare non-object
        // record simply has nothing to validate here.
        if has_wrapper {
            return Err(Error::new(
                ErrorCode::CBKE501_JSON_TYPE_MISMATCH,
                "`fields` must be a JSON object",
            ));
        }
        return Ok(());
    }

    if let Some(array) = json_lookup_array(fields_value, &tail_odo.array_path) {
        let array_field = schema.find_field(&tail_odo.array_path).ok_or_else(|| {
            Error::new(
                ErrorCode::CBKS121_COUNTER_NOT_FOUND,
                format!(
                    "ODO array field '{}' not found in schema",
                    tail_odo.array_path
                ),
            )
            .with_context(crate::odo_redefines::create_comprehensive_error_context(
                0,
                &tail_odo.array_path,
                0,
                None,
            ))
        })?;

        let counter_field = schema.find_field(&tail_odo.counter_path).ok_or_else(|| {
            crate::odo_redefines::handle_missing_counter_field(
                &tail_odo.counter_path,
                &tail_odo.array_path,
                schema,
                0,
                0,
            )
        })?;

        // The counter must also be present in the JSON itself.
        if json_lookup_value(fields_value, &tail_odo.counter_path).is_none() {
            return Err(crate::odo_redefines::handle_missing_counter_field(
                &tail_odo.counter_path,
                &tail_odo.array_path,
                schema,
                0,
                u64::from(counter_field.offset),
            ));
        }

        // Record index is 0: this API validates a single record at a time.
        let context = crate::odo_redefines::OdoValidationContext {
            field_path: tail_odo.array_path.clone(),
            counter_path: tail_odo.counter_path.clone(),
            record_index: 0,
            byte_offset: u64::from(array_field.offset),
        };

        crate::odo_redefines::validate_odo_encode(
            array.len(),
            tail_odo.min_count,
            tail_odo.max_count,
            &context,
            options,
        )?;
    }

    Ok(())
}
2004
2005fn json_lookup_value<'a>(value: &'a Value, field_path: &str) -> Option<&'a Value> {
2006 let mut current = value;
2007 for segment in field_path.split('.') {
2008 current = current.as_object()?.get(segment)?;
2009 }
2010 Some(current)
2011}
2012
2013fn json_lookup_array<'a>(value: &'a Value, field_path: &str) -> Option<&'a Vec<Value>> {
2014 let leaf = field_path.split('.').next_back().unwrap_or("");
2015 match json_lookup_value(value, field_path) {
2016 Some(Value::Array(array)) => Some(array),
2017 _ => {
2018 if let Value::Object(obj) = value {
2019 obj.get(leaf).and_then(|candidate| candidate.as_array())
2020 } else {
2021 None
2022 }
2023 }
2024 }
2025}
2026
2027fn encode_fields_to_bytes(
2029 schema: &Schema,
2030 json: &Value,
2031 options: &EncodeOptions,
2032) -> Result<Vec<u8>> {
2033 let record_length = schema.lrecl_fixed.unwrap_or_else(|| {
2034 schema.fields.iter().map(|f| f.len).sum::<u32>()
2036 }) as usize;
2037
2038 let mut buffer = vec![0u8; record_length];
2039
2040 if let Some(obj) = json.as_object() {
2041 let encoding_metadata = obj
2042 .get("_encoding_metadata")
2043 .and_then(|value| value.as_object());
2044 encode_fields_recursive(
2045 &schema.fields,
2046 obj,
2047 encoding_metadata,
2048 "",
2049 &mut buffer,
2050 0,
2051 options,
2052 )?;
2053 }
2054
2055 Ok(buffer)
2056}
2057
2058fn encode_fields_recursive(
2060 fields: &[copybook_core::Field],
2061 json_obj: &serde_json::Map<String, Value>,
2062 encoding_metadata: Option<&serde_json::Map<String, Value>>,
2063 path_prefix: &str,
2064 buffer: &mut [u8],
2065 offset: usize,
2066 options: &EncodeOptions,
2067) -> Result<usize> {
2068 let mut current_offset = offset;
2069
2070 for field in fields {
2071 let field_path = if path_prefix.is_empty() {
2072 field.name.clone()
2073 } else {
2074 format!("{path_prefix}.{}", field.name)
2075 };
2076
2077 current_offset = encode_single_field(
2078 field,
2079 &field_path,
2080 json_obj,
2081 encoding_metadata,
2082 buffer,
2083 current_offset,
2084 options,
2085 )?;
2086 }
2087
2088 Ok(current_offset)
2089}
2090
/// Dispatches encoding of one field (group or scalar) into `buffer` at
/// `current_offset`, returning the offset just past the field. Zero-width
/// kinds (level-88 conditions, RENAMES aliases) write nothing and do not
/// advance the offset.
#[inline]
#[allow(clippy::too_many_lines)]
fn encode_single_field(
    field: &copybook_core::Field,
    field_path: &str,
    json_obj: &serde_json::Map<String, Value>,
    encoding_metadata: Option<&serde_json::Map<String, Value>>,
    buffer: &mut [u8],
    current_offset: usize,
    options: &EncodeOptions,
) -> Result<usize> {
    use copybook_core::FieldKind;

    match &field.kind {
        FieldKind::Group => encode_group_field(
            field,
            field_path,
            json_obj,
            encoding_metadata,
            buffer,
            current_offset,
            options,
        ),
        FieldKind::Alphanum { .. } => {
            encode_alphanum_field(field, json_obj, buffer, current_offset, options)
        }
        FieldKind::ZonedDecimal {
            digits,
            scale,
            signed,
            sign_separate,
        } => {
            if let Some(sign_sep) = sign_separate {
                // SIGN IS SEPARATE zoned decimals use a dedicated encoder and
                // accept only string JSON values (no numeric coercion here).
                let field_len = field.len as usize;
                if let Some(text) = json_obj.get(&field.name).and_then(|v| v.as_str()) {
                    crate::numeric::encode_zoned_decimal_sign_separate(
                        text,
                        *digits,
                        *scale,
                        sign_sep,
                        options.codepage,
                        &mut buffer[current_offset..current_offset + field_len],
                    )?;
                }
                Ok(current_offset + field_len)
            } else {
                encode_zoned_decimal_field(
                    field,
                    field_path,
                    json_obj,
                    encoding_metadata,
                    buffer,
                    current_offset,
                    options,
                    DecimalSpec {
                        digits: *digits,
                        scale: *scale,
                        signed: *signed,
                    },
                )
            }
        }
        FieldKind::PackedDecimal {
            digits,
            scale,
            signed,
        } => encode_packed_decimal_field(
            field,
            field_path,
            json_obj,
            buffer,
            current_offset,
            options,
            DecimalSpec {
                digits: *digits,
                scale: *scale,
                signed: *signed,
            },
        ),
        FieldKind::BinaryInt { bits, signed } => encode_binary_int_field(
            field,
            field_path,
            json_obj,
            buffer,
            current_offset,
            options,
            BinarySpec {
                bits: *bits,
                signed: *signed,
            },
        ),
        // Level-88 conditions occupy no storage.
        FieldKind::Condition { .. } => Ok(current_offset),
        // RENAMES are aliases over bytes written by their member fields.
        FieldKind::Renames { .. } => {
            Ok(current_offset)
        }
        FieldKind::EditedNumeric {
            pic_string, scale, ..
        } => {
            if let Some(text) = json_obj
                .get(&field.name)
                .and_then(|v| coerce_to_str(v, options.coerce_numbers))
            {
                let pattern = crate::edited_pic::tokenize_edited_pic(pic_string)?;

                // Re-apply the editing symbols, then convert to the target
                // codepage and space-pad the remainder of the slot.
                let encoded = crate::edited_pic::encode_edited_numeric(
                    &text,
                    &pattern,
                    *scale,
                    field.blank_when_zero,
                )?;

                let bytes = crate::charset::utf8_to_ebcdic(&encoded, options.codepage)?;
                let field_len = field.len as usize;
                let copy_len = bytes.len().min(field_len);

                if current_offset + field_len <= buffer.len() {
                    buffer[current_offset..current_offset + copy_len]
                        .copy_from_slice(&bytes[..copy_len]);
                    let space = crate::charset::space_byte(options.codepage);
                    buffer[current_offset + copy_len..current_offset + field_len].fill(space);
                }
            }
            Ok(current_offset + field.len as usize)
        }
        FieldKind::FloatSingle => {
            let field_len = field.len as usize;
            if let Some(val) = json_obj.get(&field.name) {
                let f = match val {
                    Value::Number(n) => {
                        let f64_val = n.as_f64().unwrap_or(0.0);
                        // Finite values outside f32 range are a hard error;
                        // NaN/Inf pass through to the cast below.
                        if f64_val.is_finite()
                            && (f64_val > f64::from(f32::MAX) || f64_val < f64::from(f32::MIN))
                        {
                            return Err(Error::new(
                                ErrorCode::CBKE531_FLOAT_ENCODE_OVERFLOW,
                                format!("Value overflow for COMP-1 field '{}'", field.name),
                            ));
                        }
                        #[allow(clippy::cast_possible_truncation)]
                        {
                            f64_val as f32
                        }
                    }
                    Value::String(s) => s.parse::<f32>().map_err(|e| {
                        Error::new(
                            ErrorCode::CBKE501_JSON_TYPE_MISMATCH,
                            format!(
                                "Cannot parse '{}' as f32 for field '{}': {}",
                                s, field.name, e
                            ),
                        )
                    })?,
                    // JSON null round-trips the NaN/Inf emitted as null on
                    // decode.
                    Value::Null => f32::NAN,
                    _ => {
                        return Err(Error::new(
                            ErrorCode::CBKE501_JSON_TYPE_MISMATCH,
                            format!("Expected number for COMP-1 field '{}'", field.name),
                        ));
                    }
                };
                if current_offset + field_len <= buffer.len() {
                    crate::numeric::encode_float_single_with_format(
                        f,
                        &mut buffer[current_offset..current_offset + field_len],
                        options.float_format,
                    )?;
                }
            }
            Ok(current_offset + field_len)
        }
        FieldKind::FloatDouble => {
            let field_len = field.len as usize;
            if let Some(val) = json_obj.get(&field.name) {
                let f = match val {
                    Value::Number(n) => n.as_f64().unwrap_or(0.0),
                    Value::String(s) => s.parse::<f64>().map_err(|e| {
                        Error::new(
                            ErrorCode::CBKE501_JSON_TYPE_MISMATCH,
                            format!(
                                "Cannot parse '{}' as f64 for field '{}': {}",
                                s, field.name, e
                            ),
                        )
                    })?,
                    // JSON null round-trips the NaN/Inf emitted as null on
                    // decode.
                    Value::Null => f64::NAN,
                    _ => {
                        return Err(Error::new(
                            ErrorCode::CBKE501_JSON_TYPE_MISMATCH,
                            format!("Expected number for COMP-2 field '{}'", field.name),
                        ));
                    }
                };
                if current_offset + field_len <= buffer.len() {
                    crate::numeric::encode_float_double_with_format(
                        f,
                        &mut buffer[current_offset..current_offset + field_len],
                        options.float_format,
                    )?;
                }
            }
            Ok(current_offset + field_len)
        }
    }
}
2309
2310#[inline]
2312fn encode_group_field(
2313 field: ©book_core::Field,
2314 field_path: &str,
2315 json_obj: &serde_json::Map<String, Value>,
2316 encoding_metadata: Option<&serde_json::Map<String, Value>>,
2317 buffer: &mut [u8],
2318 current_offset: usize,
2319 options: &EncodeOptions,
2320) -> Result<usize> {
2321 if let Some(sub_obj) = json_obj.get(&field.name).and_then(|v| v.as_object()) {
2322 encode_fields_recursive(
2323 &field.children,
2324 sub_obj,
2325 encoding_metadata,
2326 field_path,
2327 buffer,
2328 current_offset,
2329 options,
2330 )
2331 } else {
2332 encode_fields_recursive(
2333 &field.children,
2334 json_obj,
2335 encoding_metadata,
2336 field_path,
2337 buffer,
2338 current_offset,
2339 options,
2340 )
2341 }
2342}
2343
/// Encodes a PIC X (alphanumeric) field: converts UTF-8 text to the target
/// codepage, copies it into the field slot, and space-pads the remainder.
/// Missing values leave the buffer untouched; the returned offset always
/// advances by the field length.
///
/// # Errors
/// `CBKE515_STRING_LENGTH_VIOLATION` when the text is longer than the field.
/// NOTE(review): the check compares `text.len()` (UTF-8 byte length) against
/// the field capacity *before* codepage conversion; for non-ASCII text the
/// converted byte count may differ — confirm the intended semantics.
#[inline]
fn encode_alphanum_field(
    field: &copybook_core::Field,
    json_obj: &serde_json::Map<String, Value>,
    buffer: &mut [u8],
    current_offset: usize,
    options: &EncodeOptions,
) -> Result<usize> {
    let field_len = field.len as usize;

    if let Some(text) = json_obj.get(&field.name).and_then(|value| value.as_str()) {
        if text.len() > field_len {
            return Err(Error::new(
                ErrorCode::CBKE515_STRING_LENGTH_VIOLATION,
                format!(
                    "String length {} exceeds field capacity {} for alphanumeric field {}",
                    text.len(),
                    field_len,
                    field.path
                ),
            )
            .with_field(field.path.clone()));
        }

        let bytes = crate::charset::utf8_to_ebcdic(text, options.codepage)?;
        let copy_len = bytes.len().min(field_len);

        if current_offset + field_len <= buffer.len() {
            buffer[current_offset..current_offset + copy_len].copy_from_slice(&bytes[..copy_len]);
            // Pad the tail with the codepage's space byte.
            let space = crate::charset::space_byte(options.codepage);
            buffer[current_offset + copy_len..current_offset + field_len].fill(space);
        }
    }

    Ok(current_offset + field_len)
}
2383
/// Shape of a decimal field: total digit count, implied decimal scale, and
/// whether the value carries a sign.
#[derive(Copy, Clone)]
struct DecimalSpec {
    digits: u16,
    scale: i16,
    signed: bool,
}
2390
2391fn resolve_preserved_zoned_format(
2392 metadata: &serde_json::Map<String, Value>,
2393 field_path: &str,
2394 field_name: &str,
2395) -> Option<ZonedEncodingFormat> {
2396 let candidates = [field_path, field_name];
2397 for key in candidates {
2398 if let Some(format) = metadata
2399 .get(key)
2400 .and_then(parse_zoned_encoding_metadata_value)
2401 {
2402 return Some(format);
2403 }
2404 }
2405 None
2406}
2407
2408fn parse_zoned_encoding_metadata_value(value: &Value) -> Option<ZonedEncodingFormat> {
2409 match value {
2410 Value::String(s) => parse_zoned_encoding_format_str(s),
2411 Value::Object(map) => map
2412 .get("zoned_encoding")
2413 .and_then(Value::as_str)
2414 .and_then(parse_zoned_encoding_format_str),
2415 _ => None,
2416 }
2417}
2418
2419fn parse_zoned_encoding_format_str(value: &str) -> Option<ZonedEncodingFormat> {
2420 match value.trim().to_ascii_lowercase().as_str() {
2421 "ascii" => Some(ZonedEncodingFormat::Ascii),
2422 "ebcdic" => Some(ZonedEncodingFormat::Ebcdic),
2423 "auto" => Some(ZonedEncodingFormat::Auto),
2424 _ => None,
2425 }
2426}
2427
2428fn coerce_to_str(value: &Value, coerce: bool) -> Option<String> {
2434 match value {
2435 Value::String(s) => Some(s.clone()),
2436 Value::Number(n) if coerce => Some(n.to_string()),
2437 _ => None,
2438 }
2439}
2440
/// Encodes a zoned-decimal (DISPLAY numeric) field without a separate sign.
///
/// Encoding format resolution, highest priority first:
/// 1. `options.zoned_encoding_override`
/// 2. preserved per-field metadata (`_encoding_metadata`, dotted path or bare
///    name)
/// 3. `options.preferred_zoned_encoding`
/// `Auto` then resolves by the codepage family. ASCII output forces a
/// positive zero-sign policy; EBCDIC keeps the preferred policy. BLANK WHEN
/// ZERO (when enabled for encode) takes a dedicated early path.
///
/// Missing/non-coercible values leave the buffer untouched; the returned
/// offset always advances by the field length.
#[inline]
#[allow(clippy::too_many_arguments)]
fn encode_zoned_decimal_field(
    field: &copybook_core::Field,
    field_path: &str,
    json_obj: &serde_json::Map<String, Value>,
    encoding_metadata: Option<&serde_json::Map<String, Value>>,
    buffer: &mut [u8],
    current_offset: usize,
    options: &EncodeOptions,
    spec: DecimalSpec,
) -> Result<usize> {
    let field_len = field.len as usize;

    if let Some(text) = json_obj
        .get(&field.name)
        .and_then(|v| coerce_to_str(v, options.coerce_numbers))
    {
        if field.blank_when_zero && options.bwz_encode {
            let encoded = crate::numeric::encode_zoned_decimal_with_bwz(
                &text,
                spec.digits,
                spec.scale,
                spec.signed,
                options.codepage,
                options.bwz_encode,
            )?;
            // Write only when the slot fits the buffer and the encoder
            // produced exactly the field width.
            if current_offset + field_len <= buffer.len() && encoded.len() == field_len {
                buffer[current_offset..current_offset + field_len].copy_from_slice(&encoded);
            }
            return Ok(current_offset + field_len);
        }

        let preserved_format = encoding_metadata
            .and_then(|meta| resolve_preserved_zoned_format(meta, field_path, &field.name));
        let resolved_format = options
            .zoned_encoding_override
            .or(preserved_format)
            .unwrap_or(options.preferred_zoned_encoding);
        let (effective_format, zero_policy) = match resolved_format {
            ZonedEncodingFormat::Ascii => (ZonedEncodingFormat::Ascii, ZeroSignPolicy::Positive),
            ZonedEncodingFormat::Ebcdic => (ZonedEncodingFormat::Ebcdic, ZeroSignPolicy::Preferred),
            ZonedEncodingFormat::Auto => {
                if options.codepage.is_ascii() {
                    (ZonedEncodingFormat::Ascii, ZeroSignPolicy::Positive)
                } else {
                    (ZonedEncodingFormat::Ebcdic, ZeroSignPolicy::Preferred)
                }
            }
        };

        let encoded = crate::numeric::encode_zoned_decimal_with_format_and_policy(
            &text,
            spec.digits,
            spec.scale,
            spec.signed,
            options.codepage,
            Some(effective_format),
            zero_policy,
        )?;

        if current_offset + field_len <= buffer.len() && encoded.len() == field_len {
            buffer[current_offset..current_offset + field_len].copy_from_slice(&encoded);
        }
    }

    Ok(current_offset + field_len)
}
2512
2513#[inline]
2514fn encode_packed_decimal_field(
2515 field: ©book_core::Field,
2516 _field_path: &str,
2517 json_obj: &serde_json::Map<String, Value>,
2518 buffer: &mut [u8],
2519 current_offset: usize,
2520 options: &EncodeOptions,
2521 spec: DecimalSpec,
2522) -> Result<usize> {
2523 let field_len = field.len as usize;
2524
2525 if let Some(text) = json_obj
2526 .get(&field.name)
2527 .and_then(|v| coerce_to_str(v, options.coerce_numbers))
2528 {
2529 let encoded =
2530 crate::numeric::encode_packed_decimal(&text, spec.digits, spec.scale, spec.signed)?;
2531 if current_offset + field_len <= buffer.len() && encoded.len() == field_len {
2532 buffer[current_offset..current_offset + field_len].copy_from_slice(&encoded);
2533 }
2534 }
2535
2536 Ok(current_offset + field_len)
2537}
2538
/// Shape of a binary (COMP) integer field: bit width and signedness.
#[derive(Copy, Clone)]
struct BinarySpec {
    bits: u16,
    signed: bool,
}
2544
2545#[inline]
2546fn encode_binary_int_field(
2547 field: ©book_core::Field,
2548 _field_path: &str,
2549 json_obj: &serde_json::Map<String, Value>,
2550 buffer: &mut [u8],
2551 current_offset: usize,
2552 options: &EncodeOptions,
2553 spec: BinarySpec,
2554) -> Result<usize> {
2555 let field_len = field.len as usize;
2556
2557 if let Some(num) = json_obj.get(&field.name).and_then(|v| {
2558 if let Some(n) = v.as_i64() {
2559 Some(n)
2561 } else if let Some(s) = coerce_to_str(v, options.coerce_numbers) {
2562 s.parse::<i64>().ok()
2563 } else {
2564 None
2565 }
2566 }) {
2567 let encoded = crate::numeric::encode_binary_int(num, spec.bits, spec.signed)?;
2568 if current_offset + field_len <= buffer.len() && encoded.len() == field_len {
2569 buffer[current_offset..current_offset + field_len].copy_from_slice(&encoded);
2570 }
2571 }
2572
2573 Ok(current_offset + field_len)
2574}
2575
/// Decodes every record in `input` and writes one JSON line per record to
/// `output`, returning a `RunSummary` with counts, timing, and throughput.
///
/// Resets the thread-local warning counter at the start so `summary.warnings`
/// reflects only this run. Dispatches on `options.format` (fixed-length vs
/// RDW-prefixed). In non-strict mode per-record errors are tallied rather
/// than aborting the run.
#[inline]
#[must_use = "Handle the Result or propagate the error"]
pub fn decode_file_to_jsonl(
    schema: &Schema,
    input: impl Read,
    mut output: impl Write,
    options: &DecodeOptions,
) -> Result<RunSummary> {
    let start_time = std::time::Instant::now();
    let mut summary = RunSummary::new();
    summary.set_schema_fingerprint(schema.fingerprint.clone());

    // Lower layers accumulate warnings in a thread-local; start from zero.
    WARNING_COUNTER.with(|counter| {
        *counter.borrow_mut() = 0;
    });

    match options.format {
        RecordFormat::Fixed => {
            process_fixed_records(schema, input, &mut output, options, &mut summary)?;
        }
        RecordFormat::RDW => {
            process_rdw_records(schema, input, &mut output, options, &mut summary)?;
        }
    }

    let elapsed_ms = start_time.elapsed().as_millis();
    // u128 → u64; saturate rather than fail on absurd durations.
    summary.processing_time_ms = u64::try_from(elapsed_ms).unwrap_or(u64::MAX);
    summary.calculate_throughput();
    summary.warnings = WARNING_COUNTER.with(|counter| *counter.borrow());
    telemetry::record_completion(
        summary.processing_time_seconds(),
        summary.throughput_mbps,
        options,
    );
    info!(
        target: "copybook::decode",
        records_processed = summary.records_processed,
        records_with_errors = summary.records_with_errors,
        warnings = summary.warnings,
        bytes_processed = summary.bytes_processed,
        elapsed_ms = summary.processing_time_ms,
        throughput_mibps = summary.throughput_mbps,
        schema_fingerprint = %summary.schema_fingerprint,
        codepage = %options.codepage,
        format = ?options.format,
        strict_mode = options.strict_mode,
        raw_mode = ?options.emit_raw,
    );

    Ok(summary)
}
2650
2651fn process_fixed_records<R: Read, W: Write>(
2652 schema: &Schema,
2653 reader: R,
2654 output: &mut W,
2655 options: &DecodeOptions,
2656 summary: &mut RunSummary,
2657) -> Result<()> {
2658 let mut reader = crate::record::FixedRecordReader::new(reader, schema.lrecl_fixed)?;
2659 let mut scratch = crate::memory::ScratchBuffers::new();
2660 let mut record_index = 0u64;
2661
2662 while let Some(record_data) = reader.read_record()? {
2663 record_index += 1;
2664 summary.bytes_processed += record_data.len() as u64;
2665 telemetry::record_read(record_data.len(), options);
2666
2667 let raw_data_for_decode = match options.emit_raw {
2668 crate::options::RawMode::Record => Some(record_data.clone()),
2669 _ => None,
2670 };
2671
2672 match decode_record_with_scratch_and_raw(
2673 schema,
2674 &record_data,
2675 options,
2676 raw_data_for_decode,
2677 record_index,
2678 &mut scratch,
2679 ) {
2680 Ok(json_value) => {
2681 write_json_record(output, &json_value)?;
2682 summary.records_processed += 1;
2683 }
2684 Err(error) => {
2685 summary.records_with_errors += 1;
2686 let family = error.family_prefix();
2687 telemetry::record_error(family);
2688 if options.strict_mode {
2689 return Err(error);
2690 }
2691 }
2692 }
2693 }
2694
2695 Ok(())
2696}
2697
2698fn process_rdw_records<R: Read, W: Write>(
2699 schema: &Schema,
2700 reader: R,
2701 output: &mut W,
2702 options: &DecodeOptions,
2703 summary: &mut RunSummary,
2704) -> Result<()> {
2705 let mut reader = crate::record::RDWRecordReader::new(reader, options.strict_mode);
2706 let mut scratch = crate::memory::ScratchBuffers::new();
2707 let mut record_index = 0u64;
2708
2709 while let Some(rdw_record) = reader.read_record()? {
2710 record_index += 1;
2711 summary.bytes_processed += rdw_record.payload.len() as u64;
2712 telemetry::record_read(rdw_record.payload.len(), options);
2713 if rdw_record.reserved() != 0 {
2714 increment_warning_counter();
2715 }
2716
2717 if let Some(schema_lrecl) = schema.lrecl_fixed
2718 && rdw_record.payload.len() < schema_lrecl as usize
2719 {
2720 let error = Error::new(
2721 ErrorCode::CBKF221_RDW_UNDERFLOW,
2722 format!(
2723 "RDW payload too short: {} bytes, schema requires {} bytes",
2724 rdw_record.payload.len(),
2725 schema_lrecl
2726 ),
2727 );
2728
2729 summary.records_with_errors += 1;
2730 let family = error.family_prefix();
2731 telemetry::record_error(family);
2732 if options.strict_mode {
2733 return Err(error);
2734 }
2735 continue;
2736 }
2737
2738 let full_raw_data = match options.emit_raw {
2739 crate::options::RawMode::RecordRDW => {
2740 let mut full_data =
2741 Vec::with_capacity(rdw_record.header.len() + rdw_record.payload.len());
2742 full_data.extend_from_slice(&rdw_record.header);
2743 full_data.extend_from_slice(&rdw_record.payload);
2744 Some(full_data)
2745 }
2746 crate::options::RawMode::Record => Some(rdw_record.payload.clone()),
2747 _ => None,
2748 };
2749
2750 match decode_record_with_scratch_and_raw(
2751 schema,
2752 &rdw_record.payload,
2753 options,
2754 full_raw_data,
2755 record_index,
2756 &mut scratch,
2757 ) {
2758 Ok(json_value) => {
2759 write_json_record(output, &json_value)?;
2760 summary.records_processed += 1;
2761 }
2762 Err(error) => {
2763 summary.records_with_errors += 1;
2764 let family = error.family_prefix();
2765 telemetry::record_error(family);
2766 if options.strict_mode {
2767 return Err(error);
2768 }
2769 }
2770 }
2771 }
2772
2773 Ok(())
2774}
2775
2776#[inline]
2777fn write_json_record<W: Write>(output: &mut W, value: &Value) -> Result<()> {
2778 if let Err(e) = serde_json::to_writer(&mut *output, value) {
2779 let error = Error::new(ErrorCode::CBKC201_JSON_WRITE_ERROR, e.to_string());
2780 telemetry::record_error(error.family_prefix());
2781 return Err(error);
2782 }
2783
2784 if let Err(e) = writeln!(output) {
2785 let error = Error::new(ErrorCode::CBKC201_JSON_WRITE_ERROR, e.to_string());
2786 telemetry::record_error(error.family_prefix());
2787 return Err(error);
2788 }
2789
2790 Ok(())
2791}
2792
2793pub fn increment_warning_counter() {
2798 WARNING_COUNTER.with(|counter| {
2799 *counter.borrow_mut() += 1;
2800 });
2801}
2802
2803#[inline]
2836#[must_use = "Handle the Result or propagate the error"]
2837pub fn encode_jsonl_to_file(
2838 schema: &Schema,
2839 input: impl Read,
2840 mut output: impl Write,
2841 options: &EncodeOptions,
2842) -> Result<RunSummary> {
2843 let start_time = std::time::Instant::now();
2844 let mut summary = RunSummary::new();
2845 summary.set_schema_fingerprint(schema.fingerprint.clone());
2846
2847 let reader = BufReader::new(input);
2848 let mut record_count = 0u64;
2849
2850 for line in reader.lines() {
2851 let line =
2852 line.map_err(|e| Error::new(ErrorCode::CBKC201_JSON_WRITE_ERROR, e.to_string()))?;
2853
2854 if line.trim().is_empty() {
2855 continue;
2856 }
2857
2858 record_count += 1;
2859
2860 let json_value: Value = serde_json::from_str(&line)
2862 .map_err(|e| Error::new(ErrorCode::CBKE501_JSON_TYPE_MISMATCH, e.to_string()))?;
2863
2864 if let Ok(binary_data) = encode_record(schema, &json_value, options) {
2866 output
2867 .write_all(&binary_data)
2868 .map_err(|e| Error::new(ErrorCode::CBKC201_JSON_WRITE_ERROR, e.to_string()))?;
2869 summary.bytes_processed += binary_data.len() as u64;
2870 } else {
2871 summary.records_with_errors += 1;
2872 if options.strict_mode {
2873 break;
2874 }
2875 }
2876 }
2877
2878 summary.records_processed = record_count;
2879 let elapsed_ms = start_time.elapsed().as_millis();
2880 summary.processing_time_ms = u64::try_from(elapsed_ms).unwrap_or(u64::MAX);
2881 summary.calculate_throughput();
2882
2883 Ok(summary)
2884}
2885
/// Render a zoned decimal as a digit string, zero-padded to `digits` chars
/// for integer values (non-positive scale); other cases fall back to the
/// decimal's natural `Display` form.
///
/// NOTE(review): `blank_when_zero == true` currently returns the natural
/// formatting rather than blanks — presumably the blank substitution happens
/// elsewhere in the pipeline; confirm against callers.
fn format_zoned_decimal_with_digits(
    decimal: &crate::numeric::SmallDecimal,
    digits: u16,
    blank_when_zero: bool,
) -> String {
    use std::fmt::Write;

    // BLANK WHEN ZERO fields skip the zero-padded formatting below.
    if blank_when_zero {
        return decimal.to_string();
    }

    // A zero whose natural rendering is exactly "0" keeps that single-digit
    // form; a zero rendered differently falls through to the general path.
    if decimal.value == 0 {
        let natural_format = decimal.to_string();
        if natural_format == "0" {
            return "0".to_string();
        }
    }

    let mut result = String::new();
    // Sign is carried by the separate `negative` flag; negative zero is
    // normalized to positive here.
    // NOTE(review): assumes `value` stores the magnitude (not a signed
    // value) — otherwise a negative `value` would double the '-' sign via
    // the write! below. TODO confirm against SmallDecimal's definition.
    let value = decimal.value;
    let negative = decimal.negative && value != 0;

    if negative {
        result.push('-');
    }

    if decimal.scale <= 0 {
        // Non-positive scale is an integer; a strictly negative scale means
        // implied trailing zeros that must be multiplied back in.
        // NOTE(review): `value * 10^exp` can overflow i64 for large values
        // with strongly negative scale — confirm upstream bounds on
        // digits/scale, or consider checked_mul/checked_pow.
        let scaled_value = if decimal.scale < 0 {
            let exponent = u32::from(decimal.scale.unsigned_abs());
            value * 10_i64.pow(exponent)
        } else {
            value
        };
        // Left-pad with zeros to the declared digit count. Writing into a
        // String cannot realistically fail; "0" is a defensive fallback.
        if write!(result, "{:0width$}", scaled_value, width = digits as usize).is_err() {
            result.push('0');
        }
    } else {
        // Positive scale (fractional digits): defer to the natural rendering.
        result.push_str(&decimal.to_string());
    }

    result
}
2936
2937#[inline]
2938fn small_decimal_to_string(decimal: &crate::numeric::SmallDecimal) -> String {
2939 decimal.to_string()
2940}
2941
2942fn zoned_decimal_to_json_value(
2943 decimal: &crate::numeric::SmallDecimal,
2944 digits: u16,
2945 scale: i16,
2946 blank_when_zero: bool,
2947 options: &DecodeOptions,
2948) -> Value {
2949 let formatted = if scale == 0 {
2950 format_zoned_decimal_with_digits(decimal, digits, blank_when_zero)
2951 } else {
2952 small_decimal_to_string(decimal)
2953 };
2954 numeric_string_to_value(formatted, options)
2955}
2956
2957#[inline]
2958fn decimal_counter_to_u32(
2959 decimal: &crate::numeric::SmallDecimal,
2960 counter_path: &str,
2961) -> Result<u32> {
2962 let text = small_decimal_to_string(decimal);
2963 text.parse::<u32>().map_err(|_| {
2964 Error::new(
2965 ErrorCode::CBKS121_COUNTER_NOT_FOUND,
2966 format!("ODO counter '{counter_path}' has invalid value: {text}"),
2967 )
2968 })
2969}
2970
#[cfg(test)]
#[allow(clippy::expect_used)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::Codepage;
    use crate::iterator::RecordIterator;
    use copybook_core::{Error, ErrorCode, Result, parse_copybook};
    use std::io::Cursor;

    /// Copybook shared by every test: a 3-digit ID followed by a
    /// 5-character name (8 bytes per record).
    const SAMPLE_COPYBOOK: &str = r"
        01 RECORD.
        05 ID PIC 9(3).
        05 NAME PIC X(5).
        ";

    /// Decode options targeting ASCII-encoded test data.
    fn ascii_options() -> DecodeOptions {
        DecodeOptions {
            codepage: Codepage::ASCII,
            ..DecodeOptions::default()
        }
    }

    #[test]
    fn test_decode_record() -> Result<()> {
        let schema = parse_copybook(SAMPLE_COPYBOOK)?;

        let decoded = decode_record(&schema, b"001ALICE", &ascii_options())?;

        assert!(decoded.is_object());
        let fields = decoded.as_object().ok_or_else(|| {
            Error::new(
                ErrorCode::CBKP001_SYNTAX,
                "decoded record should be an object".to_string(),
            )
        })?;
        assert!(fields.len() > 1);
        Ok(())
    }

    #[test]
    fn test_encode_record() -> Result<()> {
        let schema = parse_copybook(SAMPLE_COPYBOOK)?;
        let json = serde_json::json!({ "ID": "123", "NAME": "HELLO" });

        let encoded = encode_record(&schema, &json, &EncodeOptions::default())?;

        assert!(!encoded.is_empty());
        // 3 digits + 5 characters = 8 bytes.
        assert_eq!(encoded.len(), 8);
        Ok(())
    }

    #[test]
    fn test_record_iterator() -> Result<()> {
        let schema = parse_copybook(SAMPLE_COPYBOOK)?;
        let options = DecodeOptions::default();

        // Two zeroed 8-byte records.
        let cursor = Cursor::new(vec![0u8; 16]);

        let iterator = RecordIterator::new(cursor, &schema, &options)?;
        assert_eq!(iterator.current_record_index(), 0);
        assert!(!iterator.is_eof());
        Ok(())
    }

    #[test]
    fn test_decode_file_to_jsonl() -> Result<()> {
        let schema = parse_copybook(SAMPLE_COPYBOOK)?;

        // Two back-to-back 8-byte records.
        let input = Cursor::new(b"001ALICE002BOBBY".to_vec());
        let mut output = Vec::new();

        let summary = decode_file_to_jsonl(&schema, input, &mut output, &ascii_options())?;

        assert!(summary.records_processed > 0);
        assert!(!output.is_empty());
        Ok(())
    }

    #[test]
    fn test_encode_jsonl_to_file() -> Result<()> {
        let schema = parse_copybook(SAMPLE_COPYBOOK)?;

        let input = Cursor::new("{\"__status\":\"test\"}\n{\"__status\":\"test2\"}".as_bytes());
        let mut output = Vec::new();

        let summary = encode_jsonl_to_file(&schema, input, &mut output, &EncodeOptions::default())?;

        assert_eq!(summary.records_processed, 2);
        assert!(!output.is_empty());
        Ok(())
    }
}