1use super::transform::TransformResult;
24use std::collections::HashMap;
25
/// Byte written between consecutive columns of a columnar payload.
const COL_SEP: u8 = b'\0';
/// Byte written between consecutive values inside one column.
const VAL_SEP: u8 = b'\x01';
/// First metadata byte of the single-template ("uniform") encoding.
const METADATA_VERSION_UNIFORM: u8 = 1;
/// First metadata byte of the per-template ("grouped") encoding.
const METADATA_VERSION_GROUPED: u8 = 2;

/// Minimum number of rows a template needs before the grouped encoding gives it its own group.
const MIN_GROUP_ROWS: usize = 5;

/// A line template (its literal parts) paired with the `(row index, values)` of every row using it.
type SchemaGroup = (Vec<Vec<u8>>, Vec<(usize, Vec<Vec<u8>>)>);
36
/// Extracts one JSON value starting at `pos`, after skipping leading ASCII whitespace.
///
/// The value is returned verbatim, without the skipped whitespace, together with the index
/// just past it:
/// * a string, including both quotes (backslash escapes are honoured);
/// * an object or array, up to its matching closing bracket. Strings inside are skipped, so
///   brackets within them do not count;
/// * anything else (numbers, `true`, `null`, ...), up to the next `,`, `}`, `]` or whitespace.
///
/// Returns `None` when there is no value at `pos`, or a string/object/array is unterminated.
fn extract_value(line: &[u8], pos: usize) -> Option<(Vec<u8>, usize)> {
    /// Index just past the closing quote of the string whose opening quote is at `open`.
    fn string_end(line: &[u8], open: usize) -> Option<usize> {
        let mut escaped = false;
        for (i, &byte) in line.iter().enumerate().skip(open + 1) {
            if escaped {
                escaped = false;
            } else if byte == b'\\' {
                escaped = true;
            } else if byte == b'"' {
                return Some(i + 1);
            }
        }
        None
    }

    /// Index just past the `close` byte that balances the `open` byte at `open_at`.
    fn balanced_end(line: &[u8], open_at: usize, open: u8, close: u8) -> Option<usize> {
        let mut depth = 0usize;
        let mut i = open_at;
        while i < line.len() {
            let byte = line[i];
            if byte == b'"' {
                // Jump over the whole string so brackets inside it are ignored.
                i = string_end(line, i)?;
                continue;
            }
            if byte == open {
                depth += 1;
            } else if byte == close {
                depth -= 1;
                if depth == 0 {
                    return Some(i + 1);
                }
            }
            i += 1;
        }
        None
    }

    let leading_ws = line.get(pos..).map_or(0, |rest| {
        rest.iter().take_while(|b| b.is_ascii_whitespace()).count()
    });
    let start = pos + leading_ws;

    let end = match *line.get(start)? {
        b'"' => string_end(line, start)?,
        b'{' => balanced_end(line, start, b'{', b'}')?,
        b'[' => balanced_end(line, start, b'[', b']')?,
        _ => {
            let len = line[start..]
                .iter()
                .position(|&b| matches!(b, b',' | b'}' | b']') || b.is_ascii_whitespace())
                .unwrap_or(line.len() - start);
            if len == 0 {
                return None;
            }
            start + len
        }
    };
    Some((line[start..end].to_vec(), end))
}
150
151type ParsedLine = (Vec<Vec<u8>>, Vec<Vec<u8>>);
162
163fn parse_line(line: &[u8]) -> Option<ParsedLine> {
164 let mut pos = 0;
165
166 while pos < line.len() && line[pos].is_ascii_whitespace() {
168 pos += 1;
169 }
170 if pos >= line.len() || line[pos] != b'{' {
171 return None;
172 }
173
174 let mut parts: Vec<Vec<u8>> = Vec::new();
175 let mut values: Vec<Vec<u8>> = Vec::new();
176 let mut part_start = 0;
177
178 pos += 1; loop {
181 while pos < line.len() && line[pos].is_ascii_whitespace() {
183 pos += 1;
184 }
185 if pos >= line.len() {
186 return None;
187 }
188
189 if line[pos] == b'}' {
191 parts.push(line[part_start..].to_vec());
193 break;
194 }
195
196 if line[pos] != b'"' {
198 return None;
199 }
200 pos += 1;
202 let mut escaped = false;
203 while pos < line.len() {
204 if escaped {
205 escaped = false;
206 } else if line[pos] == b'\\' {
207 escaped = true;
208 } else if line[pos] == b'"' {
209 pos += 1;
210 break;
211 }
212 pos += 1;
213 }
214
215 while pos < line.len() && line[pos].is_ascii_whitespace() {
217 pos += 1;
218 }
219 if pos >= line.len() || line[pos] != b':' {
220 return None;
221 }
222 pos += 1; parts.push(line[part_start..pos].to_vec());
226
227 let (value, value_end) = extract_value(line, pos)?;
229 values.push(value);
230 pos = value_end;
231
232 part_start = pos;
234
235 while pos < line.len() && line[pos].is_ascii_whitespace() {
237 pos += 1;
238 }
239 if pos >= line.len() {
240 return None;
241 }
242
243 if line[pos] == b',' {
245 pos += 1;
246 } else if line[pos] == b'}' {
247 parts.push(line[part_start..].to_vec());
251 break;
252 } else {
253 return None; }
255 }
256
257 if values.is_empty() {
258 return None;
259 }
260
261 Some((parts, values))
262}
263
/// Splits `data` on `\n`, keeping empty lines between newlines.
///
/// A trailing newline does not produce a final empty line, and empty input yields no lines.
fn split_lines(data: &[u8]) -> Vec<&[u8]> {
    let mut lines: Vec<&[u8]> = data.split(|&byte| byte == b'\n').collect();
    // `split` yields a trailing empty piece for empty input or input ending in '\n';
    // that piece is not a line of its own.
    if matches!(lines.last(), Some(last) if last.is_empty()) {
        lines.pop();
    }
    lines
}
279
280fn build_uniform_columnar(
283 template_parts: &[Vec<u8>],
284 columns: &[Vec<Vec<u8>>],
285 num_rows: usize,
286 has_trailing_newline: bool,
287) -> (Vec<u8>, Vec<u8>) {
288 let num_cols = columns.len();
289
290 let mut col_data = Vec::new();
292 for (ci, col) in columns.iter().enumerate() {
293 for (ri, val) in col.iter().enumerate() {
294 col_data.extend_from_slice(val);
295 if ri < num_rows - 1 {
296 col_data.push(VAL_SEP);
297 }
298 }
299 if ci < num_cols - 1 {
300 col_data.push(COL_SEP);
301 }
302 }
303
304 let mut metadata = Vec::new();
306 metadata.push(METADATA_VERSION_UNIFORM);
307 metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
308 metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
309 metadata.push(if has_trailing_newline { 1 } else { 0 });
310 metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
311 for part in template_parts {
312 metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
313 metadata.extend_from_slice(part);
314 }
315
316 (col_data, metadata)
317}
318
319fn preprocess_uniform(
322 non_empty: &[&[u8]],
323 has_trailing_newline: bool,
324) -> Option<(Vec<u8>, Vec<u8>)> {
325 if non_empty.len() < 2 {
326 return None;
327 }
328
329 let (template_parts, first_values) = parse_line(non_empty[0])?;
330 let num_cols = first_values.len();
331 if template_parts.len() != num_cols + 1 {
332 return None;
333 }
334
335 let mut columns: Vec<Vec<Vec<u8>>> = Vec::with_capacity(num_cols);
336 for v in &first_values {
337 columns.push(vec![v.clone()]);
338 }
339
340 for &line in &non_empty[1..] {
341 let (parts, values) = parse_line(line)?;
342 if values.len() != num_cols || parts.len() != template_parts.len() {
343 return None;
344 }
345 for (a, b) in parts.iter().zip(template_parts.iter()) {
346 if a != b {
347 return None;
348 }
349 }
350 for (col, val) in values.iter().enumerate() {
351 columns[col].push(val.clone());
352 }
353 }
354
355 Some(build_uniform_columnar(
356 &template_parts,
357 &columns,
358 non_empty.len(),
359 has_trailing_newline,
360 ))
361}
362
363fn preprocess_grouped(
384 non_empty: &[&[u8]],
385 has_trailing_newline: bool,
386) -> Option<(Vec<u8>, Vec<u8>)> {
387 if non_empty.len() < MIN_GROUP_ROWS {
388 return None;
389 }
390
391 let mut parsed: Vec<Option<ParsedLine>> = Vec::with_capacity(non_empty.len());
394 for &line in non_empty {
395 parsed.push(parse_line(line));
396 }
397
398 let mut group_map: HashMap<Vec<u8>, SchemaGroup> = HashMap::new();
401 let mut residual_indices: Vec<usize> = Vec::new();
402
403 for (idx, parsed_line) in parsed.into_iter().enumerate() {
404 if let Some((parts, values)) = parsed_line {
405 let mut key = Vec::new();
407 for part in &parts {
408 key.extend_from_slice(&(part.len() as u32).to_le_bytes());
409 key.extend_from_slice(part);
410 }
411 group_map
412 .entry(key)
413 .or_insert_with(|| (parts, Vec::new()))
414 .1
415 .push((idx, values));
416 } else {
417 residual_indices.push(idx);
419 }
420 }
421
422 let mut groups: Vec<SchemaGroup> = Vec::new();
424 for (_key, (template_parts, rows)) in group_map {
425 if rows.len() >= MIN_GROUP_ROWS {
426 groups.push((template_parts, rows));
427 } else {
428 for (idx, _) in &rows {
430 residual_indices.push(*idx);
431 }
432 }
433 }
434
435 if groups.is_empty() {
437 return None;
438 }
439
440 groups.sort_by_key(|(_, rows)| rows[0].0);
442 residual_indices.sort_unstable();
443
444 struct GroupOutput {
446 row_indices: Vec<u32>,
447 col_data: Vec<u8>,
448 group_metadata: Vec<u8>,
449 }
450
451 let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
452
453 for (template_parts, rows) in &groups {
454 let num_cols = template_parts.len() - 1;
455 let mut columns: Vec<Vec<Vec<u8>>> = (0..num_cols).map(|_| Vec::new()).collect();
456 let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
457
458 for (idx, values) in rows {
459 row_indices.push(*idx as u32);
460 for (col, val) in values.iter().enumerate() {
461 columns[col].push(val.clone());
462 }
463 }
464
465 let (col_data, group_metadata) =
467 build_uniform_columnar(template_parts, &columns, rows.len(), false);
468
469 group_outputs.push(GroupOutput {
470 row_indices,
471 col_data,
472 group_metadata,
473 });
474 }
475
476 let mut data_out = Vec::new();
478 for group in &group_outputs {
479 data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
480 data_out.extend_from_slice(&group.col_data);
481 }
482
483 let residual_start = data_out.len();
485 for (i, &idx) in residual_indices.iter().enumerate() {
486 data_out.extend_from_slice(non_empty[idx]);
487 if i < residual_indices.len() - 1 {
488 data_out.push(b'\n');
489 }
490 }
491 let _residual_len = data_out.len() - residual_start;
492
493 let mut metadata = Vec::new();
495 metadata.push(METADATA_VERSION_GROUPED);
496 metadata.push(if has_trailing_newline { 1 } else { 0 });
497 metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
498 metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
499
500 for group in &group_outputs {
501 metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
502 for &idx in &group.row_indices {
503 metadata.extend_from_slice(&idx.to_le_bytes());
504 }
505 metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
506 metadata.extend_from_slice(&group.group_metadata);
507 }
508
509 metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
510 for &idx in &residual_indices {
511 metadata.extend_from_slice(&(idx as u32).to_le_bytes());
512 }
513
514 Some((data_out, metadata))
515}
516
/// Describes one original object-valued column that was split into per-key flat sub-columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NestedGroupInfo {
    /// Index of this column in the original (un-flattened) column list.
    pub(crate) original_col_index: u16,
    /// Distinct object keys: the raw bytes between the quotes, with escapes left untouched.
    /// Each key becomes one flat sub-column, in this order.
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Literal text surrounding the values of the column's first non-null object. It is used
    /// to rebuild rows whose keys are all present.
    pub(crate) nested_template: Vec<Vec<u8>>,
    /// A set bit at `sub_key_index * num_rows + row` marks that sub-key as absent for that
    /// row. Empty when no bitmap is stored.
    pub(crate) absence_bitmap: Vec<u8>,
}
533
534pub(crate) fn flatten_nested_columns(
540 col_data: &[u8],
541 num_rows: usize,
542) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
543 let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
545 if columns.is_empty() || num_rows == 0 {
546 return None;
547 }
548
549 let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
550 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
553
554 for (col_idx, &col_chunk) in columns.iter().enumerate() {
555 let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
556 if values.len() != num_rows {
557 return None;
558 }
559
560 let mut all_objects = true;
562 let mut has_non_null = false;
563 for val in &values {
564 if *val == b"null" {
565 continue;
566 }
567 has_non_null = true;
568 if !val.starts_with(b"{") {
569 all_objects = false;
570 break;
571 }
572 }
573
574 if !all_objects || !has_non_null {
575 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
577 output_columns.push(col_values);
578 continue;
579 }
580
581 let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
585 let mut nested_template: Vec<Vec<u8>> = Vec::new();
586 type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
587 let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);
588
589 for val in &values {
590 if *val == b"null" {
591 parsed_rows.push(None);
592 continue;
593 }
594 if nested_template.is_empty() {
595 match parse_nested_object_with_template(val) {
597 Some((template, kv_pairs)) => {
598 for (key, _) in &kv_pairs {
599 if !all_sub_keys.iter().any(|k| k == key) {
600 all_sub_keys.push(key.clone());
601 }
602 }
603 nested_template = template;
604 parsed_rows.push(Some(kv_pairs));
605 }
606 None => {
607 all_sub_keys.clear();
608 break;
609 }
610 }
611 } else {
612 match parse_nested_object_kv(val) {
614 Some(kv_pairs) => {
615 for (key, _) in &kv_pairs {
616 if !all_sub_keys.iter().any(|k| k == key) {
617 all_sub_keys.push(key.clone());
618 }
619 }
620 parsed_rows.push(Some(kv_pairs));
621 }
622 None => {
623 all_sub_keys.clear();
624 break;
625 }
626 }
627 }
628 }
629
630 if all_sub_keys.is_empty() {
631 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
633 output_columns.push(col_values);
634 continue;
635 }
636
637 let num_sub_keys = all_sub_keys.len();
641 let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
642 let total_bits = num_sub_keys * num_rows;
643 let bitmap_bytes = total_bits.div_ceil(8);
644 let mut absence_bitmap = vec![0u8; bitmap_bytes];
645 let mut has_any_absent = false;
646
647 for (row_idx, parsed) in parsed_rows.iter().enumerate() {
648 match parsed {
649 Some(kv_pairs) => {
650 for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
651 let found = kv_pairs.iter().find(|(k, _)| k == sk);
652 match found {
653 Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
654 None => {
655 sub_columns[sk_idx].push(b"null".to_vec());
656 let bit_idx = sk_idx * num_rows + row_idx;
658 absence_bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
659 has_any_absent = true;
660 }
661 }
662 }
663 }
664 None => {
665 for sc in sub_columns.iter_mut() {
669 sc.push(b"null".to_vec());
670 }
671 }
672 }
673 }
674
675 nested_groups.push(NestedGroupInfo {
676 original_col_index: col_idx as u16,
677 sub_keys: all_sub_keys,
678 nested_template,
679 absence_bitmap: if has_any_absent {
680 absence_bitmap
681 } else {
682 Vec::new()
683 },
684 });
685
686 for sc in sub_columns {
687 output_columns.push(sc);
688 }
689 }
690
691 if nested_groups.is_empty() {
692 return None;
693 }
694
695 let num_out_cols = output_columns.len();
697 let mut out = Vec::new();
698 for (ci, col) in output_columns.iter().enumerate() {
699 for (ri, val) in col.iter().enumerate() {
700 out.extend_from_slice(val);
701 if ri < num_rows - 1 {
702 out.push(VAL_SEP);
703 }
704 }
705 if ci < num_out_cols - 1 {
706 out.push(COL_SEP);
707 }
708 }
709
710 Some((out, nested_groups))
711}
712
713#[allow(clippy::type_complexity)]
718pub(crate) fn parse_nested_object_with_template(
719 obj: &[u8],
720) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
721 let mut pos = 0;
722
723 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
725 pos += 1;
726 }
727 if pos >= obj.len() || obj[pos] != b'{' {
728 return None;
729 }
730 pos += 1;
731
732 let mut parts: Vec<Vec<u8>> = Vec::new();
733 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
734 let mut part_start = 0;
735
736 loop {
737 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
739 pos += 1;
740 }
741 if pos >= obj.len() {
742 return None;
743 }
744 if obj[pos] == b'}' {
745 parts.push(obj[part_start..].to_vec());
746 break;
747 }
748
749 if obj[pos] != b'"' {
751 return None;
752 }
753 let key_str_start = pos + 1;
754 pos += 1;
755 let mut escaped = false;
756 while pos < obj.len() {
757 if escaped {
758 escaped = false;
759 } else if obj[pos] == b'\\' {
760 escaped = true;
761 } else if obj[pos] == b'"' {
762 break;
763 }
764 pos += 1;
765 }
766 if pos >= obj.len() {
767 return None;
768 }
769 let key = obj[key_str_start..pos].to_vec();
770 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
774 pos += 1;
775 }
776 if pos >= obj.len() || obj[pos] != b':' {
777 return None;
778 }
779 pos += 1;
780
781 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
783 pos += 1;
784 }
785
786 parts.push(obj[part_start..pos].to_vec());
788
789 let value_start = pos;
791 let (value, value_end) = extract_value(obj, value_start)?;
793 pos = value_end;
794 pairs.push((key, value));
795
796 part_start = pos;
797
798 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
800 pos += 1;
801 }
802 if pos >= obj.len() {
803 return None;
804 }
805 if obj[pos] == b',' {
806 pos += 1;
807 } else if obj[pos] == b'}' {
808 parts.push(obj[part_start..].to_vec());
809 break;
810 } else {
811 return None;
812 }
813 }
814
815 if pairs.is_empty() {
816 return None;
817 }
818 Some((parts, pairs))
819}
820
821pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
825 let mut pos = 0;
826
827 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
829 pos += 1;
830 }
831 if pos >= obj.len() || obj[pos] != b'{' {
832 return None;
833 }
834 pos += 1;
835
836 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
837
838 loop {
839 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
841 pos += 1;
842 }
843 if pos >= obj.len() {
844 return None;
845 }
846 if obj[pos] == b'}' {
847 break;
848 }
849
850 if obj[pos] != b'"' {
852 return None;
853 }
854 pos += 1;
855 let key_start = pos;
856 let mut escaped = false;
857 while pos < obj.len() {
858 if escaped {
859 escaped = false;
860 } else if obj[pos] == b'\\' {
861 escaped = true;
862 } else if obj[pos] == b'"' {
863 break;
864 }
865 pos += 1;
866 }
867 if pos >= obj.len() {
868 return None;
869 }
870 let key = obj[key_start..pos].to_vec();
871 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
875 pos += 1;
876 }
877 if pos >= obj.len() || obj[pos] != b':' {
878 return None;
879 }
880 pos += 1;
881
882 let (value, value_end) = extract_value(obj, pos)?;
884 pos = value_end;
885 pairs.push((key, value));
886
887 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
889 pos += 1;
890 }
891 if pos >= obj.len() {
892 return None;
893 }
894 if obj[pos] == b',' {
895 pos += 1;
896 } else if obj[pos] == b'}' {
897 break;
898 } else {
899 return None;
900 }
901 }
902
903 if pairs.is_empty() {
904 return None;
905 }
906 Some(pairs)
907}
908
909pub(crate) fn unflatten_nested_columns(
914 flat_data: &[u8],
915 nested_groups: &[NestedGroupInfo],
916 num_rows: usize,
917 total_flat_cols: usize,
918) -> Vec<u8> {
919 let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
920 if flat_columns.len() != total_flat_cols {
921 return flat_data.to_vec();
922 }
923
924 let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
926 for chunk in &flat_columns {
927 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
928 if vals.len() != num_rows {
929 return flat_data.to_vec();
930 }
931 flat_col_values.push(vals);
932 }
933
934 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
937
938 let original_num_cols = total_flat_cols
947 - nested_groups
948 .iter()
949 .map(|g| g.sub_keys.len())
950 .sum::<usize>()
951 + nested_groups.len();
952
953 let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
955 for (gi, group) in nested_groups.iter().enumerate() {
956 if (group.original_col_index as usize) < original_num_cols {
957 original_col_map[group.original_col_index as usize] = Some(gi);
958 }
959 }
960
961 let mut flat_idx = 0;
962 for entry in original_col_map.iter().take(original_num_cols) {
963 if let Some(gi) = entry {
964 let group = &nested_groups[*gi];
965 let num_sub = group.sub_keys.len();
966
967 let is_absent = |si: usize, row: usize| -> bool {
969 if group.absence_bitmap.is_empty() {
970 return false; }
972 let bit_idx = si * num_rows + row;
973 let byte_idx = bit_idx / 8;
974 if byte_idx >= group.absence_bitmap.len() {
975 return false;
976 }
977 (group.absence_bitmap[byte_idx] >> (bit_idx % 8)) & 1 == 1
978 };
979
980 let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
982 for row in 0..num_rows {
983 let all_null = (0..num_sub).all(|si| {
986 flat_idx + si < flat_col_values.len()
987 && flat_col_values[flat_idx + si][row] == b"null"
988 });
989 if all_null && !group.absence_bitmap.is_empty() {
990 let any_present_null = (0..num_sub).any(|si| {
993 flat_col_values[flat_idx + si][row] == b"null" && !is_absent(si, row)
994 });
995 if any_present_null {
996 } else {
999 merged_col.push(b"null".to_vec());
1001 continue;
1002 }
1003 } else if all_null {
1004 merged_col.push(b"null".to_vec());
1005 continue;
1006 }
1007
1008 let has_absent = (0..num_sub).any(|si| is_absent(si, row));
1010
1011 if !has_absent
1012 && !group.nested_template.is_empty()
1013 && group.nested_template.len() == num_sub + 1
1014 {
1015 let mut obj = Vec::new();
1018 obj.extend_from_slice(&group.nested_template[0]);
1019 if flat_idx < flat_col_values.len() {
1020 obj.extend_from_slice(flat_col_values[flat_idx][row]);
1021 }
1022 for si in 1..num_sub {
1023 obj.extend_from_slice(&group.nested_template[si]);
1024 if flat_idx + si < flat_col_values.len() {
1025 obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
1026 }
1027 }
1028 obj.extend_from_slice(&group.nested_template[num_sub]);
1029 merged_col.push(obj);
1030 } else {
1031 let mut obj = Vec::new();
1034 obj.push(b'{');
1035 let mut first = true;
1036 for si in 0..num_sub {
1037 if flat_idx + si >= flat_col_values.len() {
1038 break;
1039 }
1040 if is_absent(si, row) {
1041 continue; }
1043 let val = flat_col_values[flat_idx + si][row];
1044 if !first {
1045 obj.push(b',');
1046 }
1047 first = false;
1048 obj.push(b'"');
1049 obj.extend_from_slice(&group.sub_keys[si]);
1050 obj.push(b'"');
1051 obj.push(b':');
1052 obj.extend_from_slice(val);
1053 }
1054 obj.push(b'}');
1055 merged_col.push(obj);
1056 }
1057 }
1058 output_columns.push(merged_col);
1059 flat_idx += num_sub;
1060 } else {
1061 if flat_idx < flat_col_values.len() {
1063 let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
1064 .iter()
1065 .map(|v| v.to_vec())
1066 .collect();
1067 output_columns.push(col);
1068 }
1069 flat_idx += 1;
1070 }
1071 }
1072
1073 let num_out_cols = output_columns.len();
1075 let mut out = Vec::new();
1076 for (ci, col) in output_columns.iter().enumerate() {
1077 for (ri, val) in col.iter().enumerate() {
1078 out.extend_from_slice(val);
1079 if ri < num_rows - 1 {
1080 out.push(VAL_SEP);
1081 }
1082 }
1083 if ci < num_out_cols - 1 {
1084 out.push(COL_SEP);
1085 }
1086 }
1087
1088 out
1089}
1090
1091pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1096 let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1097 let has_absence = groups.iter().any(|g| !g.absence_bitmap.is_empty());
1098 let mut out = Vec::new();
1099 let version = if has_absence {
1100 3u8
1101 } else if has_template {
1102 2u8
1103 } else {
1104 1u8
1105 };
1106 out.push(version);
1107 out.push(groups.len() as u8);
1108 for group in groups {
1109 out.extend_from_slice(&group.original_col_index.to_le_bytes());
1110 out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1111 for key in &group.sub_keys {
1112 out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1113 out.extend_from_slice(key);
1114 }
1115 if has_template || version == 3 {
1116 out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1117 for part in &group.nested_template {
1118 out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1119 out.extend_from_slice(part);
1120 }
1121 }
1122 if version == 3 {
1123 let bm_len = group.absence_bitmap.len() as u32;
1124 out.extend_from_slice(&bm_len.to_le_bytes());
1125 out.extend_from_slice(&group.absence_bitmap);
1126 }
1127 }
1128 out
1129}
1130
1131pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1136 if data.is_empty() {
1137 return None;
1138 }
1139 let mut pos = 0;
1140 let version = data[pos];
1141 pos += 1;
1142 if version != 1 && version != 2 && version != 3 {
1143 return None;
1144 }
1145 let has_template = version == 2 || version == 3;
1146 let has_absence = version == 3;
1147 if pos >= data.len() {
1148 return None;
1149 }
1150 let num_groups = data[pos] as usize;
1151 pos += 1;
1152
1153 let mut groups = Vec::with_capacity(num_groups);
1154 for _ in 0..num_groups {
1155 if pos + 4 > data.len() {
1156 return None;
1157 }
1158 let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1159 pos += 2;
1160 let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1161 pos += 2;
1162
1163 let mut sub_keys = Vec::with_capacity(num_sub_cols);
1164 for _ in 0..num_sub_cols {
1165 if pos + 2 > data.len() {
1166 return None;
1167 }
1168 let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1169 pos += 2;
1170 if pos + key_len > data.len() {
1171 return None;
1172 }
1173 sub_keys.push(data[pos..pos + key_len].to_vec());
1174 pos += key_len;
1175 }
1176
1177 let nested_template = if has_template {
1178 if pos + 2 > data.len() {
1179 return None;
1180 }
1181 let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1182 pos += 2;
1183 let mut parts = Vec::with_capacity(num_parts);
1184 for _ in 0..num_parts {
1185 if pos + 2 > data.len() {
1186 return None;
1187 }
1188 let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1189 pos += 2;
1190 if pos + part_len > data.len() {
1191 return None;
1192 }
1193 parts.push(data[pos..pos + part_len].to_vec());
1194 pos += part_len;
1195 }
1196 parts
1197 } else {
1198 Vec::new()
1199 };
1200
1201 let absence_bitmap = if has_absence {
1202 if pos + 4 > data.len() {
1203 return None;
1204 }
1205 let bm_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1206 pos += 4;
1207 if pos + bm_len > data.len() {
1208 return None;
1209 }
1210 let bm = data[pos..pos + bm_len].to_vec();
1211 pos += bm_len;
1212 bm
1213 } else {
1214 Vec::new()
1215 };
1216
1217 groups.push(NestedGroupInfo {
1218 original_col_index,
1219 sub_keys,
1220 nested_template,
1221 absence_bitmap,
1222 });
1223 }
1224
1225 Some((groups, pos))
1226}
1227
1228pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
1233 if data.is_empty() {
1234 return None;
1235 }
1236
1237 let has_trailing_newline = data.last() == Some(&b'\n');
1238 let lines = split_lines(data);
1239 let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
1240
1241 if non_empty.len() < 2 {
1242 return None;
1243 }
1244
1245 if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
1247 if col_data.len() + metadata.len() < data.len() {
1248 let num_rows = non_empty.len();
1253 if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
1254 let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
1259 let unflattened =
1260 unflatten_nested_columns(&flat_data, &nested_groups, num_rows, total_flat_cols);
1261 if unflattened == col_data {
1262 let nested_bytes = serialize_nested_info(&nested_groups);
1264 metadata.extend_from_slice(&nested_bytes);
1265 return Some(TransformResult {
1266 data: flat_data,
1267 metadata,
1268 });
1269 }
1270 }
1272 metadata.push(0u8); return Some(TransformResult {
1275 data: col_data,
1276 metadata,
1277 });
1278 }
1279 }
1280
1281 if let Some((grouped_data, grouped_metadata)) =
1283 preprocess_grouped(&non_empty, has_trailing_newline)
1284 {
1285 if grouped_data.len() + grouped_metadata.len() < data.len() {
1286 return Some(TransformResult {
1287 data: grouped_data,
1288 metadata: grouped_metadata,
1289 });
1290 }
1291 }
1292
1293 None
1294}
1295
1296pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1299 if metadata.is_empty() {
1300 return data.to_vec();
1301 }
1302 match metadata[0] {
1303 METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1304 METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1305 _ => data.to_vec(),
1306 }
1307}
1308
1309fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1311 if metadata.len() < 10 {
1312 return data.to_vec();
1313 }
1314 let mut pos = 0;
1315 let _version = metadata[pos];
1316 pos += 1;
1317 let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1318 pos += 4;
1319 let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1320 pos += 2;
1321 let has_trailing_newline = metadata[pos] != 0;
1322 pos += 1;
1323 let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1324 pos += 2;
1325
1326 let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
1327 for _ in 0..num_parts {
1328 if pos + 2 > metadata.len() {
1329 return data.to_vec();
1330 }
1331 let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1332 pos += 2;
1333 if pos + part_len > metadata.len() {
1334 return data.to_vec();
1335 }
1336 parts.push(metadata[pos..pos + part_len].to_vec());
1337 pos += part_len;
1338 }
1339
1340 if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
1341 return data.to_vec();
1342 }
1343
1344 let remaining_metadata = &metadata[pos..];
1346 if !remaining_metadata.is_empty()
1347 && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2 || remaining_metadata[0] == 3)
1348 {
1349 if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
1351 let total_flat_cols = data.split(|&b| b == COL_SEP).count();
1353 let unflattened =
1354 unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
1355 return reverse_uniform_from_parts(
1356 &unflattened,
1357 &parts,
1358 num_rows,
1359 num_cols,
1360 has_trailing_newline,
1361 );
1362 }
1363 }
1364
1365 reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
1366}
1367
1368fn reverse_uniform_from_parts(
1370 data: &[u8],
1371 parts: &[Vec<u8>],
1372 num_rows: usize,
1373 num_cols: usize,
1374 has_trailing_newline: bool,
1375) -> Vec<u8> {
1376 let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
1377 if col_chunks.len() != num_cols {
1378 return data.to_vec();
1379 }
1380
1381 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1382 for chunk in &col_chunks {
1383 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1384 if vals.len() != num_rows {
1385 return data.to_vec();
1386 }
1387 columns.push(vals);
1388 }
1389
1390 let mut output = Vec::with_capacity(data.len() * 2);
1391 #[allow(clippy::needless_range_loop)]
1392 for row in 0..num_rows {
1393 output.extend_from_slice(&parts[0]);
1394 output.extend_from_slice(columns[0][row]);
1395 for col in 1..num_cols {
1396 output.extend_from_slice(&parts[col]);
1397 output.extend_from_slice(columns[col][row]);
1398 }
1399 output.extend_from_slice(&parts[num_cols]);
1400
1401 if row < num_rows - 1 || has_trailing_newline {
1402 output.push(b'\n');
1403 }
1404 }
1405
1406 output
1407}
1408
/// Reads the header and template written by `build_uniform_columnar`.
///
/// Layout (little-endian):
/// 1. version byte (not checked here);
/// 2. row count (`u32`);
/// 3. column count (`u16`);
/// 4. trailing-newline flag byte;
/// 5. part count (`u16`);
/// 6. each part as a `u16` length followed by its bytes.
///
/// Returns `(parts, num_rows, num_cols, has_trailing_newline)`. Bytes after the last part are
/// ignored. Returns `None` in any of these cases:
/// * the input is truncated;
/// * the row or column count is zero;
/// * there is not exactly one more part than there are columns.
fn parse_uniform_metadata(metadata: &[u8]) -> Option<(Vec<Vec<u8>>, usize, usize, bool)> {
    let header = metadata.get(..10)?;
    let num_rows = u32::from_le_bytes([header[1], header[2], header[3], header[4]]) as usize;
    let num_cols = usize::from(u16::from_le_bytes([header[5], header[6]]));
    let has_trailing_newline = header[7] != 0;
    let num_parts = usize::from(u16::from_le_bytes([header[8], header[9]]));

    let mut remaining = &metadata[10..];
    let mut parts = Vec::with_capacity(num_parts);
    for _ in 0..num_parts {
        if remaining.len() < 2 {
            return None;
        }
        let (len_field, rest) = remaining.split_at(2);
        let part_len = usize::from(u16::from_le_bytes([len_field[0], len_field[1]]));
        if rest.len() < part_len {
            return None;
        }
        let (part, rest) = rest.split_at(part_len);
        parts.push(part.to_vec());
        remaining = rest;
    }

    let valid = num_rows > 0 && num_cols > 0 && parts.len() == num_cols + 1;
    valid.then_some((parts, num_rows, num_cols, has_trailing_newline))
}
1445
1446fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1448 if metadata.len() < 8 {
1449 return data.to_vec();
1450 }
1451
1452 let mut mpos = 1; let has_trailing_newline = metadata[mpos] != 0;
1454 mpos += 1;
1455 let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1456 mpos += 4;
1457 let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
1458 mpos += 2;
1459
1460 let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];
1462
1463 let mut dpos: usize = 0;
1465
1466 for _ in 0..num_groups {
1467 if mpos + 4 > metadata.len() {
1469 return data.to_vec();
1470 }
1471 let group_row_count =
1472 u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1473 mpos += 4;
1474
1475 let mut row_indices = Vec::with_capacity(group_row_count);
1476 for _ in 0..group_row_count {
1477 if mpos + 4 > metadata.len() {
1478 return data.to_vec();
1479 }
1480 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1481 mpos += 4;
1482 row_indices.push(idx);
1483 }
1484
1485 if mpos + 4 > metadata.len() {
1487 return data.to_vec();
1488 }
1489 let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1490 mpos += 4;
1491 if mpos + gm_len > metadata.len() {
1492 return data.to_vec();
1493 }
1494 let group_metadata = &metadata[mpos..mpos + gm_len];
1495 mpos += gm_len;
1496
1497 if dpos + 4 > data.len() {
1499 return data.to_vec();
1500 }
1501 let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
1502 dpos += 4;
1503 if dpos + gd_len > data.len() {
1504 return data.to_vec();
1505 }
1506 let group_data = &data[dpos..dpos + gd_len];
1507 dpos += gd_len;
1508
1509 let (parts, num_rows, num_cols, _trailing) = match parse_uniform_metadata(group_metadata) {
1511 Some(v) => v,
1512 None => return data.to_vec(),
1513 };
1514
1515 if num_rows != group_row_count {
1516 return data.to_vec();
1517 }
1518
1519 let col_chunks: Vec<&[u8]> = group_data.split(|&b| b == COL_SEP).collect();
1521 if col_chunks.len() != num_cols {
1522 return data.to_vec();
1523 }
1524
1525 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1526 for chunk in &col_chunks {
1527 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1528 if vals.len() != num_rows {
1529 return data.to_vec();
1530 }
1531 columns.push(vals);
1532 }
1533
1534 for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1536 let mut line = Vec::new();
1537 line.extend_from_slice(&parts[0]);
1538 line.extend_from_slice(columns[0][row_within_group]);
1539 for col in 1..num_cols {
1540 line.extend_from_slice(&parts[col]);
1541 line.extend_from_slice(columns[col][row_within_group]);
1542 }
1543 line.extend_from_slice(&parts[num_cols]);
1544
1545 if original_idx < total_rows {
1546 output_lines[original_idx] = Some(line);
1547 }
1548 }
1549 }
1550
1551 if mpos + 4 > metadata.len() {
1553 return data.to_vec();
1554 }
1555 let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1556 mpos += 4;
1557
1558 let mut residual_indices = Vec::with_capacity(residual_count);
1559 for _ in 0..residual_count {
1560 if mpos + 4 > metadata.len() {
1561 return data.to_vec();
1562 }
1563 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1564 mpos += 4;
1565 residual_indices.push(idx);
1566 }
1567
1568 let residual_data = &data[dpos..];
1570 if residual_count > 0 {
1571 let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
1572 vec![]
1573 } else {
1574 residual_data.split(|&b| b == b'\n').collect()
1575 };
1576 if residual_lines.len() != residual_count {
1578 return data.to_vec();
1579 }
1580 for (i, &idx) in residual_indices.iter().enumerate() {
1581 if idx < total_rows {
1582 output_lines[idx] = Some(residual_lines[i].to_vec());
1583 }
1584 }
1585 }
1586
1587 let mut output = Vec::with_capacity(data.len() * 2);
1589 for (i, slot) in output_lines.iter().enumerate() {
1590 match slot {
1591 Some(line) => output.extend_from_slice(line),
1592 None => {
1593 return data.to_vec();
1595 }
1596 }
1597 if i < total_rows - 1 || has_trailing_newline {
1598 output.push(b'\n');
1599 }
1600 }
1601
1602 output
1603}
1604
// Unit tests: value extraction, line parsing, and byte-exact round-trips through
// `preprocess` / `reverse` for uniform, grouped, residual, and nested inputs.
#[cfg(test)]
mod tests {
    use super::*;

    /// A quoted string is returned with its quotes; `end` points just past the closing quote.
    #[test]
    fn extract_value_string() {
        let line = br#""hello","next""#;
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"\"hello\"");
        assert_eq!(end, 7);
    }

    /// A bare number stops at the following comma.
    #[test]
    fn extract_value_number() {
        let line = b"42,next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"42");
        assert_eq!(end, 2);
    }

    /// A bare `true` literal stops at the following comma.
    #[test]
    fn extract_value_bool() {
        let line = b"true,next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"true");
        assert_eq!(end, 4);
    }

    /// A bare `null` literal stops at the following comma.
    #[test]
    fn extract_value_null() {
        let line = b"null,next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"null");
        assert_eq!(end, 4);
    }

    /// An object value is returned whole, including nested quoted strings.
    #[test]
    fn extract_value_object() {
        let line = br#"{"a":1,"b":"x"},next"#;
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
        assert_eq!(end, 15);
    }

    /// An array value is returned whole up to its matching `]`.
    #[test]
    fn extract_value_array() {
        let line = b"[1,2,3],next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"[1,2,3]");
        assert_eq!(end, 7);
    }

    /// An escaped quote inside a string does not terminate the string.
    #[test]
    fn extract_value_string_with_escapes() {
        let line = br#""he\"llo",next"#;
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, br#""he\"llo""#.to_vec());
        assert_eq!(end, 9);
    }

    /// A two-key object splits into three literal parts around two values.
    #[test]
    fn parse_line_simple() {
        let line = br#"{"a":1,"b":"x"}"#;
        let (parts, values) = parse_line(line).unwrap();
        assert_eq!(parts.len(), 3);
        assert_eq!(values.len(), 2);
        assert_eq!(values[0], b"1");
        assert_eq!(values[1], b"\"x\"");
        assert_eq!(parts[0], br#"{"a":"#.to_vec());
        assert_eq!(parts[1], br#","b":"#.to_vec());
        assert_eq!(parts[2], b"}");
    }

    /// Three same-schema rows with a trailing newline round-trip byte-exactly.
    #[test]
    fn roundtrip_simple() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        // Lossy-string comparison first so a failure prints readable text.
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    /// Input without a final `\n` must not gain one on reverse.
    #[test]
    fn roundtrip_no_trailing_newline() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    /// Rows containing a nested object value round-trip byte-exactly.
    #[test]
    fn roundtrip_nested_values() {
        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
{"id":4,"meta":{"x":70,"y":80}}
{"id":5,"meta":{"x":90,"y":100}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    /// Strings, numbers, booleans, nulls, and arrays in one schema round-trip.
    #[test]
    fn roundtrip_mixed_types() {
        let data = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    /// Two rows with different key sets (one row each) produce no transform.
    #[test]
    fn schema_mismatch_too_few_returns_none() {
        let data = br#"{"a":1,"b":2}
{"a":1,"c":3}
"#;
        assert!(preprocess(data).is_none());
    }

    /// Two rows with different key counts (one row each) produce no transform.
    #[test]
    fn different_num_keys_too_few_returns_none() {
        let data = br#"{"a":1,"b":2}
{"a":1}
"#;
        assert!(preprocess(data).is_none());
    }

    /// A single row is not worth transforming.
    #[test]
    fn single_line_returns_none() {
        let data = br#"{"a":1,"b":2}
"#;
        assert!(preprocess(data).is_none());
    }

    /// Empty input produces no transform.
    #[test]
    fn empty_returns_none() {
        assert!(preprocess(b"").is_none());
    }

    /// The transformed data is laid out column-major: `COL_SEP` between columns,
    /// `VAL_SEP` between the per-row values of each column.
    #[test]
    fn column_layout_groups_similar_values() {
        let data = br#"{"type":"page_view","user":"alice"}
{"type":"api_call","user":"alice"}
{"type":"click","user":"bob"}
"#;
        let result = preprocess(data).unwrap();

        let col_data = &result.data;
        let cols: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
        assert_eq!(cols.len(), 2);

        let type_vals: Vec<&[u8]> = cols[0].split(|&b| b == VAL_SEP).collect();
        assert_eq!(type_vals.len(), 3);
        assert_eq!(type_vals[0], br#""page_view""#);
        assert_eq!(type_vals[1], br#""api_call""#);
        assert_eq!(type_vals[2], br#""click""#);

        let user_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
        assert_eq!(user_vals.len(), 3);
        assert_eq!(user_vals[0], br#""alice""#);
        assert_eq!(user_vals[1], br#""alice""#);
        assert_eq!(user_vals[2], br#""bob""#);
    }

    /// JSON escape sequences inside string values survive the round-trip verbatim.
    #[test]
    fn roundtrip_string_with_escaped_chars() {
        let data = br#"{"msg":"he said \"hi\"","val":1}
{"msg":"line\nbreak","val":2}
{"msg":"tab\there","val":3}
{"msg":"back\\slash","val":4}
{"msg":"normal text","val":5}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    /// Negative and fractional number literals keep their exact textual form.
    #[test]
    fn roundtrip_negative_and_float_numbers() {
        let data = br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
{"x":0.001,"y":999}
{"x":-100,"y":-200}
{"x":42.0,"y":7}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    /// Despite the name, the round-trip runs on 40 rows (a 2-row snippet repeated
    /// 20 times). The leading `parse_line` check only confirms the per-row shape.
    #[test]
    fn reverse_roundtrip_small_data() {
        let (parts, vals) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
        assert_eq!(vals.len(), 2);
        assert_eq!(parts.len(), 3);

        let big_data = br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
"#
        .repeat(20);
        let result = preprocess(&big_data).expect("should produce transform with 40 rows");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, big_data);
    }

    /// Two contiguous blocks of distinct schemas select the grouped encoding.
    #[test]
    fn grouped_roundtrip_two_schemas() {
        let mut data = Vec::new();
        for i in 0..10 {
            data.extend_from_slice(
                format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
            );
            data.push(b'\n');
        }
        for i in 10..20 {
            data.extend_from_slice(
                format!(
                    r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
                    i, i, i
                )
                .as_bytes(),
            );
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    /// Alternating schemas must be restored to their original interleaved order.
    #[test]
    fn grouped_roundtrip_interleaved_schemas() {
        let mut data = Vec::new();
        for i in 0..20 {
            if i % 2 == 0 {
                data.extend_from_slice(
                    format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
                );
            } else {
                data.extend_from_slice(
                    format!(
                        r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
                        i, i, i
                    )
                    .as_bytes(),
                );
            }
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    /// Two one-off rows between two schema groups are carried as residual lines.
    #[test]
    fn grouped_roundtrip_with_residuals() {
        let mut data = Vec::new();
        for i in 0..8 {
            data.extend_from_slice(format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
            data.push(b'\n');
        }
        data.extend_from_slice(br#"{"x":1,"y":2,"z":3}"#);
        data.push(b'\n');
        data.extend_from_slice(br#"{"p":"q"}"#);
        data.push(b'\n');
        for i in 0..6 {
            data.extend_from_slice(format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes());
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(&data),
        );
        assert_eq!(restored, data);
    }

    /// Grouped input whose last row has no trailing `\n` restores without one.
    #[test]
    fn grouped_roundtrip_no_trailing_newline() {
        let mut data = Vec::new();
        for i in 0..6 {
            data.extend_from_slice(format!(r#"{{"id":{},"type":"push"}}"#, i).as_bytes());
            data.push(b'\n');
        }
        for i in 0..6 {
            data.extend_from_slice(
                format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i).as_bytes(),
            );
            if i < 5 {
                data.push(b'\n');
            }
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    /// A single shared schema selects the uniform encoding rather than grouped.
    #[test]
    fn uniform_still_preferred_over_grouped() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
{"a":4,"b":"w"}
{"a":5,"b":"v"}
"#;
        let result = preprocess(data).expect("should produce transform");
        assert_eq!(
            result.metadata[0], METADATA_VERSION_UNIFORM,
            "uniform schema should use Strategy 1"
        );
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    /// Event-log-shaped rows: every fifth row carries an extra `org` key.
    #[test]
    fn grouped_gharchive_simulation() {
        let mut data = Vec::new();
        for i in 0..50 {
            if i % 5 == 0 {
                data.extend_from_slice(
                    format!(
                        r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
                        i, i, i, i
                    )
                    .as_bytes(),
                );
            } else {
                data.extend_from_slice(
                    format!(
                        r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
                        i, i, i
                    )
                    .as_bytes(),
                );
            }
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    /// A nested object with stable sub-keys is flattened: `id`, `meta.x`, and
    /// `meta.y` become three separate columns.
    #[test]
    fn test_nested_decomposition_basic() {
        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#;
        let result = preprocess(data).expect("should produce transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_UNIFORM);

        let cols: Vec<&[u8]> = result.data.split(|&b| b == COL_SEP).collect();
        assert_eq!(
            cols.len(),
            3,
            "should have 3 columns after flattening: got {}",
            cols.len()
        );

        let meta_x_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
        assert_eq!(meta_x_vals, vec![b"10".as_slice(), b"30", b"50"]);

        let meta_y_vals: Vec<&[u8]> = cols[2].split(|&b| b == VAL_SEP).collect();
        assert_eq!(meta_y_vals, vec![b"20".as_slice(), b"40", b"60"]);
    }

    /// The flattened nested encoding round-trips byte-exactly.
    #[test]
    fn test_nested_roundtrip() {
        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    /// Nested objects whose sub-keys differ between rows still round-trip.
    #[test]
    fn test_nested_mixed_schemas() {
        let data = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","meta":{"query":"pricing","results_count":25}}
{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
{"ts":"e","meta":{"query":"api docs","results_count":41}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    /// For flat rows, the final metadata byte is expected to be a zero
    /// "has nested" flag (per the assertion message below).
    #[test]
    fn test_nested_no_nested_objects() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());

        let meta = &result.metadata;
        let last_byte = meta[meta.len() - 1];
        assert_eq!(last_byte, 0, "should have has_nested=0 for flat data");
    }

    /// Heterogeneous `meta` objects (different keys per event type) round-trip.
    #[test]
    fn test_nested_real_corpus() {
        let data = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    /// A nested field that is sometimes an object and sometimes `null` round-trips.
    #[test]
    fn test_nested_roundtrip_with_null_values() {
        let data = br#"{"id":1,"meta":{"x":10}}
{"id":2,"meta":null}
{"id":3,"meta":{"x":30}}
{"id":4,"meta":null}
{"id":5,"meta":{"x":50}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    /// String values inside nested objects are restored byte-exactly.
    #[test]
    fn test_nested_string_values_preserved_exact() {
        let data = br#"{"id":1,"meta":{"name":"Alice","score":100}}
{"id":2,"meta":{"name":"Bob","score":200}}
{"id":3,"meta":{"name":"Charlie","score":300}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    /// Keys come back unquoted; values keep their raw JSON text (strings quoted).
    #[test]
    fn test_parse_nested_object_kv() {
        let obj = br#"{"query":"benchmark","results_count":14}"#;
        let pairs = parse_nested_object_kv(obj).unwrap();
        assert_eq!(pairs.len(), 2);
        assert_eq!(pairs[0].0, b"query");
        assert_eq!(pairs[0].1, br#""benchmark""#.to_vec());
        assert_eq!(pairs[1].0, b"results_count");
        assert_eq!(pairs[1].1, b"14");
    }

    /// Alternating rows add or omit an `extra` sub-key; output must be byte-exact.
    #[test]
    fn test_nested_varying_subkeys_roundtrip() {
        let mut lines = Vec::new();
        for i in 0..50 {
            let line = if i % 2 == 0 {
                format!("{{\"id\":{},\"meta\":{{\"x\":{},\"extra\":{}}}}}", i, i, i)
            } else {
                format!("{{\"id\":{},\"meta\":{{\"x\":{}}}}}", i, i)
            };
            lines.push(line);
        }
        let ndjson = lines.join("\n") + "\n";
        let data = ndjson.as_bytes();

        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            std::str::from_utf8(&restored).unwrap(),
            std::str::from_utf8(data).unwrap(),
            "varying sub-keys roundtrip must be byte-exact"
        );
    }

    /// An explicit `null` sub-value must be kept, not dropped as "missing".
    #[test]
    fn test_nested_explicit_null_preserved() {
        // The trailing `\` continues the byte string; leading whitespace on the
        // next line is skipped, so the rows are exactly as written.
        let data = b"{\"id\":1,\"meta\":{\"x\":1,\"y\":null}}\n\
                     {\"id\":2,\"meta\":{\"x\":2,\"y\":null}}\n\
                     {\"id\":3,\"meta\":{\"x\":3,\"y\":null}}\n";
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            std::str::from_utf8(&restored).unwrap(),
            std::str::from_utf8(data).unwrap(),
            "explicit null values must be preserved"
        );
    }
}
2166}