1use super::transform::TransformResult;
24use std::collections::HashMap;
25
/// Byte written between two consecutive columns of a columnar stream.
const COL_SEP: u8 = 0x00;
/// Byte written between two consecutive row values inside one column.
const VAL_SEP: u8 = 0x01;
/// First metadata byte of the single-schema ("uniform") layout.
const METADATA_VERSION_UNIFORM: u8 = 1;
/// First metadata byte of the multi-schema ("grouped") layout.
const METADATA_VERSION_GROUPED: u8 = 2;

/// Minimum number of lines for the grouped path to be attempted, and minimum
/// number of rows a schema needs to be stored as its own columnar group.
const MIN_GROUP_ROWS: usize = 5;

/// One schema group: the template parts shared by its lines, followed by
/// `(line index, values)` for every line that uses that template.
type SchemaGroup = (Vec<Vec<u8>>, Vec<(usize, Vec<Vec<u8>>)>);
36
/// Extracts the JSON value that starts at `pos` (after optional whitespace).
///
/// Returns the raw bytes of the value and the index of the first byte after
/// it. Strings, objects and arrays are returned whole, delimiters included;
/// any other token runs until the next `,`, `}`, `]` or ASCII whitespace.
/// Returns `None` when only whitespace is left, when a string, object or
/// array is not closed before the end of `line`, or when the bare token
/// would be empty.
fn extract_value(line: &[u8], mut pos: usize) -> Option<(Vec<u8>, usize)> {
    // Index of the unescaped `"` closing a string whose content starts at
    // `from`, or `line.len()` when the string never closes.
    fn closing_quote(line: &[u8], from: usize) -> usize {
        let mut escaped = false;
        let mut idx = from;
        while idx < line.len() {
            match line[idx] {
                _ if escaped => escaped = false,
                b'\\' => escaped = true,
                b'"' => break,
                _ => {}
            }
            idx += 1;
        }
        idx
    }

    // Index just past the delimiter that balances the opener at `start`.
    // Delimiters inside string literals are ignored.
    fn balanced_end(line: &[u8], start: usize, open: u8, close: u8) -> Option<usize> {
        let mut depth = 1usize;
        let mut idx = start + 1;
        while idx < line.len() && depth > 0 {
            let byte = line[idx];
            if byte == b'"' {
                // Jump to the closing quote; the shared `+= 1` below steps over it.
                idx = closing_quote(line, idx + 1);
            } else if byte == open {
                depth += 1;
            } else if byte == close {
                depth -= 1;
            }
            idx += 1;
        }
        if depth == 0 && idx <= line.len() {
            Some(idx)
        } else {
            None
        }
    }

    while pos < line.len() && line[pos].is_ascii_whitespace() {
        pos += 1;
    }
    let first = *line.get(pos)?;
    let start = pos;

    let end = match first {
        b'"' => {
            let quote = closing_quote(line, start + 1);
            if quote >= line.len() {
                return None;
            }
            quote + 1
        }
        b'{' => balanced_end(line, start, b'{', b'}')?,
        b'[' => balanced_end(line, start, b'[', b']')?,
        _ => {
            let token_len = line[start..]
                .iter()
                .position(|&b| matches!(b, b',' | b'}' | b']') || b.is_ascii_whitespace())
                .unwrap_or(line.len() - start);
            if token_len == 0 {
                return None;
            }
            start + token_len
        }
    };

    Some((line[start..end].to_vec(), end))
}
150
151type ParsedLine = (Vec<Vec<u8>>, Vec<Vec<u8>>);
162
163fn parse_line(line: &[u8]) -> Option<ParsedLine> {
164 let mut pos = 0;
165
166 while pos < line.len() && line[pos].is_ascii_whitespace() {
168 pos += 1;
169 }
170 if pos >= line.len() || line[pos] != b'{' {
171 return None;
172 }
173
174 let mut parts: Vec<Vec<u8>> = Vec::new();
175 let mut values: Vec<Vec<u8>> = Vec::new();
176 let mut part_start = 0;
177
178 pos += 1; loop {
181 while pos < line.len() && line[pos].is_ascii_whitespace() {
183 pos += 1;
184 }
185 if pos >= line.len() {
186 return None;
187 }
188
189 if line[pos] == b'}' {
191 parts.push(line[part_start..].to_vec());
193 break;
194 }
195
196 if line[pos] != b'"' {
198 return None;
199 }
200 pos += 1;
202 let mut escaped = false;
203 while pos < line.len() {
204 if escaped {
205 escaped = false;
206 } else if line[pos] == b'\\' {
207 escaped = true;
208 } else if line[pos] == b'"' {
209 pos += 1;
210 break;
211 }
212 pos += 1;
213 }
214
215 while pos < line.len() && line[pos].is_ascii_whitespace() {
217 pos += 1;
218 }
219 if pos >= line.len() || line[pos] != b':' {
220 return None;
221 }
222 pos += 1; parts.push(line[part_start..pos].to_vec());
226
227 let (value, value_end) = extract_value(line, pos)?;
229 values.push(value);
230 pos = value_end;
231
232 part_start = pos;
234
235 while pos < line.len() && line[pos].is_ascii_whitespace() {
237 pos += 1;
238 }
239 if pos >= line.len() {
240 return None;
241 }
242
243 if line[pos] == b',' {
245 pos += 1;
246 } else if line[pos] == b'}' {
247 parts.push(line[part_start..].to_vec());
251 break;
252 } else {
253 return None; }
255 }
256
257 if values.is_empty() {
258 return None;
259 }
260
261 Some((parts, values))
262}
263
/// Splits `data` at every `\n` byte, without the newlines themselves.
///
/// Empty lines in the middle are kept as empty slices. A final segment is
/// only reported when it is non-empty, so data ending in `\n` (and empty
/// data) does not produce a trailing empty line.
fn split_lines(data: &[u8]) -> Vec<&[u8]> {
    let mut lines: Vec<&[u8]> = data.split(|&byte| byte == b'\n').collect();
    // `split` always yields the segment after the last newline; drop it when
    // there is nothing in it.
    if matches!(lines.last(), Some(tail) if tail.is_empty()) {
        lines.pop();
    }
    lines
}
279
280fn build_uniform_columnar(
283 template_parts: &[Vec<u8>],
284 columns: &[Vec<Vec<u8>>],
285 num_rows: usize,
286 has_trailing_newline: bool,
287) -> (Vec<u8>, Vec<u8>) {
288 let num_cols = columns.len();
289
290 let mut col_data = Vec::new();
292 for (ci, col) in columns.iter().enumerate() {
293 for (ri, val) in col.iter().enumerate() {
294 col_data.extend_from_slice(val);
295 if ri < num_rows - 1 {
296 col_data.push(VAL_SEP);
297 }
298 }
299 if ci < num_cols - 1 {
300 col_data.push(COL_SEP);
301 }
302 }
303
304 let mut metadata = Vec::new();
306 metadata.push(METADATA_VERSION_UNIFORM);
307 metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
308 metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
309 metadata.push(if has_trailing_newline { 1 } else { 0 });
310 metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
311 for part in template_parts {
312 metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
313 metadata.extend_from_slice(part);
314 }
315
316 (col_data, metadata)
317}
318
319fn preprocess_uniform(
322 non_empty: &[&[u8]],
323 has_trailing_newline: bool,
324) -> Option<(Vec<u8>, Vec<u8>)> {
325 if non_empty.len() < 2 {
326 return None;
327 }
328
329 let (template_parts, first_values) = parse_line(non_empty[0])?;
330 let num_cols = first_values.len();
331 if template_parts.len() != num_cols + 1 {
332 return None;
333 }
334
335 let mut columns: Vec<Vec<Vec<u8>>> = Vec::with_capacity(num_cols);
336 for v in &first_values {
337 columns.push(vec![v.clone()]);
338 }
339
340 for &line in &non_empty[1..] {
341 let (parts, values) = parse_line(line)?;
342 if values.len() != num_cols || parts.len() != template_parts.len() {
343 return None;
344 }
345 for (a, b) in parts.iter().zip(template_parts.iter()) {
346 if a != b {
347 return None;
348 }
349 }
350 for (col, val) in values.iter().enumerate() {
351 columns[col].push(val.clone());
352 }
353 }
354
355 Some(build_uniform_columnar(
356 &template_parts,
357 &columns,
358 non_empty.len(),
359 has_trailing_newline,
360 ))
361}
362
363fn preprocess_grouped(
384 non_empty: &[&[u8]],
385 has_trailing_newline: bool,
386) -> Option<(Vec<u8>, Vec<u8>)> {
387 if non_empty.len() < MIN_GROUP_ROWS {
388 return None;
389 }
390
391 let mut parsed: Vec<Option<ParsedLine>> = Vec::with_capacity(non_empty.len());
394 for &line in non_empty {
395 parsed.push(parse_line(line));
396 }
397
398 let mut group_map: HashMap<Vec<u8>, SchemaGroup> = HashMap::new();
401 let mut residual_indices: Vec<usize> = Vec::new();
402
403 for (idx, parsed_line) in parsed.into_iter().enumerate() {
404 if let Some((parts, values)) = parsed_line {
405 let mut key = Vec::new();
407 for part in &parts {
408 key.extend_from_slice(&(part.len() as u32).to_le_bytes());
409 key.extend_from_slice(part);
410 }
411 group_map
412 .entry(key)
413 .or_insert_with(|| (parts, Vec::new()))
414 .1
415 .push((idx, values));
416 } else {
417 residual_indices.push(idx);
419 }
420 }
421
422 let mut groups: Vec<SchemaGroup> = Vec::new();
424 for (_key, (template_parts, rows)) in group_map {
425 if rows.len() >= MIN_GROUP_ROWS {
426 groups.push((template_parts, rows));
427 } else {
428 for (idx, _) in &rows {
430 residual_indices.push(*idx);
431 }
432 }
433 }
434
435 if groups.is_empty() {
437 return None;
438 }
439
440 groups.sort_by_key(|(_, rows)| rows[0].0);
442 residual_indices.sort_unstable();
443
444 struct GroupOutput {
446 row_indices: Vec<u32>,
447 col_data: Vec<u8>,
448 group_metadata: Vec<u8>,
449 }
450
451 let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
452
453 for (template_parts, rows) in &groups {
454 let num_cols = template_parts.len() - 1;
455 let mut columns: Vec<Vec<Vec<u8>>> = (0..num_cols).map(|_| Vec::new()).collect();
456 let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
457
458 for (idx, values) in rows {
459 row_indices.push(*idx as u32);
460 for (col, val) in values.iter().enumerate() {
461 columns[col].push(val.clone());
462 }
463 }
464
465 let (col_data, group_metadata) =
467 build_uniform_columnar(template_parts, &columns, rows.len(), false);
468
469 group_outputs.push(GroupOutput {
470 row_indices,
471 col_data,
472 group_metadata,
473 });
474 }
475
476 let mut data_out = Vec::new();
478 for group in &group_outputs {
479 data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
480 data_out.extend_from_slice(&group.col_data);
481 }
482
483 let residual_start = data_out.len();
485 for (i, &idx) in residual_indices.iter().enumerate() {
486 data_out.extend_from_slice(non_empty[idx]);
487 if i < residual_indices.len() - 1 {
488 data_out.push(b'\n');
489 }
490 }
491 let _residual_len = data_out.len() - residual_start;
492
493 let mut metadata = Vec::new();
495 metadata.push(METADATA_VERSION_GROUPED);
496 metadata.push(if has_trailing_newline { 1 } else { 0 });
497 metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
498 metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
499
500 for group in &group_outputs {
501 metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
502 for &idx in &group.row_indices {
503 metadata.extend_from_slice(&idx.to_le_bytes());
504 }
505 metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
506 metadata.extend_from_slice(&group.group_metadata);
507 }
508
509 metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
510 for &idx in &residual_indices {
511 metadata.extend_from_slice(&(idx as u32).to_le_bytes());
512 }
513
514 Some((data_out, metadata))
515}
516
/// Describes one column whose object values were split into one sub-column
/// per key by `flatten_nested_columns`.
pub(crate) struct NestedGroupInfo {
    /// Index of the column in the column list before flattening.
    pub(crate) original_col_index: u16,
    /// Distinct keys seen in the column's objects, in first-seen order; one
    /// sub-column is emitted per key.
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Template parts of the first non-null object in the column, as returned
    /// by `parse_nested_object_with_template`.
    pub(crate) nested_template: Vec<Vec<u8>>,
    /// Bit `sub_key_index * num_rows + row` is set when that key was missing
    /// from that row's object. Empty when no key was missing anywhere.
    pub(crate) absence_bitmap: Vec<u8>,
}
533
534pub(crate) fn flatten_nested_columns(
540 col_data: &[u8],
541 num_rows: usize,
542) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
543 let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
545 if columns.is_empty() || num_rows == 0 {
546 return None;
547 }
548
549 let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
550 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
553
554 for (col_idx, &col_chunk) in columns.iter().enumerate() {
555 let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
556 if values.len() != num_rows {
557 return None;
558 }
559
560 let mut all_objects = true;
562 let mut has_non_null = false;
563 for val in &values {
564 if *val == b"null" {
565 continue;
566 }
567 has_non_null = true;
568 if !val.starts_with(b"{") {
569 all_objects = false;
570 break;
571 }
572 }
573
574 if !all_objects || !has_non_null {
575 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
577 output_columns.push(col_values);
578 continue;
579 }
580
581 let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
585 let mut nested_template: Vec<Vec<u8>> = Vec::new();
586 type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
587 let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);
588
589 for val in &values {
590 if *val == b"null" {
591 parsed_rows.push(None);
592 continue;
593 }
594 if nested_template.is_empty() {
595 match parse_nested_object_with_template(val) {
597 Some((template, kv_pairs)) => {
598 for (key, _) in &kv_pairs {
599 if !all_sub_keys.iter().any(|k| k == key) {
600 all_sub_keys.push(key.clone());
601 }
602 }
603 nested_template = template;
604 parsed_rows.push(Some(kv_pairs));
605 }
606 None => {
607 all_sub_keys.clear();
608 break;
609 }
610 }
611 } else {
612 match parse_nested_object_kv(val) {
614 Some(kv_pairs) => {
615 for (key, _) in &kv_pairs {
616 if !all_sub_keys.iter().any(|k| k == key) {
617 all_sub_keys.push(key.clone());
618 }
619 }
620 parsed_rows.push(Some(kv_pairs));
621 }
622 None => {
623 all_sub_keys.clear();
624 break;
625 }
626 }
627 }
628 }
629
630 if all_sub_keys.is_empty() {
631 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
633 output_columns.push(col_values);
634 continue;
635 }
636
637 let num_sub_keys = all_sub_keys.len();
641 let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
642 let total_bits = num_sub_keys * num_rows;
643 let bitmap_bytes = total_bits.div_ceil(8);
644 let mut absence_bitmap = vec![0u8; bitmap_bytes];
645 let mut has_any_absent = false;
646
647 for (row_idx, parsed) in parsed_rows.iter().enumerate() {
648 match parsed {
649 Some(kv_pairs) => {
650 for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
651 let found = kv_pairs.iter().find(|(k, _)| k == sk);
652 match found {
653 Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
654 None => {
655 sub_columns[sk_idx].push(b"null".to_vec());
656 let bit_idx = sk_idx * num_rows + row_idx;
658 absence_bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
659 has_any_absent = true;
660 }
661 }
662 }
663 }
664 None => {
665 for sc in sub_columns.iter_mut() {
669 sc.push(b"null".to_vec());
670 }
671 }
672 }
673 }
674
675 nested_groups.push(NestedGroupInfo {
676 original_col_index: col_idx as u16,
677 sub_keys: all_sub_keys,
678 nested_template,
679 absence_bitmap: if has_any_absent {
680 absence_bitmap
681 } else {
682 Vec::new()
683 },
684 });
685
686 for sc in sub_columns {
687 output_columns.push(sc);
688 }
689 }
690
691 if nested_groups.is_empty() {
692 return None;
693 }
694
695 let num_out_cols = output_columns.len();
697 let mut out = Vec::new();
698 for (ci, col) in output_columns.iter().enumerate() {
699 for (ri, val) in col.iter().enumerate() {
700 out.extend_from_slice(val);
701 if ri < num_rows - 1 {
702 out.push(VAL_SEP);
703 }
704 }
705 if ci < num_out_cols - 1 {
706 out.push(COL_SEP);
707 }
708 }
709
710 Some((out, nested_groups))
711}
712
713#[allow(clippy::type_complexity)]
718pub(crate) fn parse_nested_object_with_template(
719 obj: &[u8],
720) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
721 let mut pos = 0;
722
723 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
725 pos += 1;
726 }
727 if pos >= obj.len() || obj[pos] != b'{' {
728 return None;
729 }
730 pos += 1;
731
732 let mut parts: Vec<Vec<u8>> = Vec::new();
733 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
734 let mut part_start = 0;
735
736 loop {
737 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
739 pos += 1;
740 }
741 if pos >= obj.len() {
742 return None;
743 }
744 if obj[pos] == b'}' {
745 parts.push(obj[part_start..].to_vec());
746 break;
747 }
748
749 if obj[pos] != b'"' {
751 return None;
752 }
753 let key_str_start = pos + 1;
754 pos += 1;
755 let mut escaped = false;
756 while pos < obj.len() {
757 if escaped {
758 escaped = false;
759 } else if obj[pos] == b'\\' {
760 escaped = true;
761 } else if obj[pos] == b'"' {
762 break;
763 }
764 pos += 1;
765 }
766 if pos >= obj.len() {
767 return None;
768 }
769 let key = obj[key_str_start..pos].to_vec();
770 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
774 pos += 1;
775 }
776 if pos >= obj.len() || obj[pos] != b':' {
777 return None;
778 }
779 pos += 1;
780
781 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
783 pos += 1;
784 }
785
786 parts.push(obj[part_start..pos].to_vec());
788
789 let value_start = pos;
791 let (value, value_end) = extract_value(obj, value_start)?;
793 pos = value_end;
794 pairs.push((key, value));
795
796 part_start = pos;
797
798 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
800 pos += 1;
801 }
802 if pos >= obj.len() {
803 return None;
804 }
805 if obj[pos] == b',' {
806 pos += 1;
807 } else if obj[pos] == b'}' {
808 parts.push(obj[part_start..].to_vec());
809 break;
810 } else {
811 return None;
812 }
813 }
814
815 if pairs.is_empty() {
816 return None;
817 }
818 Some((parts, pairs))
819}
820
821pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
825 let mut pos = 0;
826
827 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
829 pos += 1;
830 }
831 if pos >= obj.len() || obj[pos] != b'{' {
832 return None;
833 }
834 pos += 1;
835
836 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
837
838 loop {
839 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
841 pos += 1;
842 }
843 if pos >= obj.len() {
844 return None;
845 }
846 if obj[pos] == b'}' {
847 break;
848 }
849
850 if obj[pos] != b'"' {
852 return None;
853 }
854 pos += 1;
855 let key_start = pos;
856 let mut escaped = false;
857 while pos < obj.len() {
858 if escaped {
859 escaped = false;
860 } else if obj[pos] == b'\\' {
861 escaped = true;
862 } else if obj[pos] == b'"' {
863 break;
864 }
865 pos += 1;
866 }
867 if pos >= obj.len() {
868 return None;
869 }
870 let key = obj[key_start..pos].to_vec();
871 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
875 pos += 1;
876 }
877 if pos >= obj.len() || obj[pos] != b':' {
878 return None;
879 }
880 pos += 1;
881
882 let (value, value_end) = extract_value(obj, pos)?;
884 pos = value_end;
885 pairs.push((key, value));
886
887 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
889 pos += 1;
890 }
891 if pos >= obj.len() {
892 return None;
893 }
894 if obj[pos] == b',' {
895 pos += 1;
896 } else if obj[pos] == b'}' {
897 break;
898 } else {
899 return None;
900 }
901 }
902
903 if pairs.is_empty() {
904 return None;
905 }
906 Some(pairs)
907}
908
909pub(crate) fn unflatten_nested_columns(
914 flat_data: &[u8],
915 nested_groups: &[NestedGroupInfo],
916 num_rows: usize,
917 total_flat_cols: usize,
918) -> Vec<u8> {
919 let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
920 if flat_columns.len() != total_flat_cols {
921 return flat_data.to_vec();
922 }
923
924 let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
926 for chunk in &flat_columns {
927 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
928 if vals.len() != num_rows {
929 return flat_data.to_vec();
930 }
931 flat_col_values.push(vals);
932 }
933
934 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
937
938 let original_num_cols = total_flat_cols
947 - nested_groups
948 .iter()
949 .map(|g| g.sub_keys.len())
950 .sum::<usize>()
951 + nested_groups.len();
952
953 let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
955 for (gi, group) in nested_groups.iter().enumerate() {
956 if (group.original_col_index as usize) < original_num_cols {
957 original_col_map[group.original_col_index as usize] = Some(gi);
958 }
959 }
960
961 let mut flat_idx = 0;
962 for entry in original_col_map.iter().take(original_num_cols) {
963 if let Some(gi) = entry {
964 let group = &nested_groups[*gi];
965 let num_sub = group.sub_keys.len();
966
967 let is_absent = |si: usize, row: usize| -> bool {
969 if group.absence_bitmap.is_empty() {
970 return false; }
972 let bit_idx = si * num_rows + row;
973 let byte_idx = bit_idx / 8;
974 if byte_idx >= group.absence_bitmap.len() {
975 return false;
976 }
977 (group.absence_bitmap[byte_idx] >> (bit_idx % 8)) & 1 == 1
978 };
979
980 let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
982 for row in 0..num_rows {
983 let all_null = (0..num_sub).all(|si| {
986 flat_idx + si < flat_col_values.len()
987 && flat_col_values[flat_idx + si][row] == b"null"
988 });
989 if all_null && !group.absence_bitmap.is_empty() {
990 let any_present_null = (0..num_sub).any(|si| {
993 flat_col_values[flat_idx + si][row] == b"null" && !is_absent(si, row)
994 });
995 if any_present_null {
996 } else {
999 merged_col.push(b"null".to_vec());
1001 continue;
1002 }
1003 } else if all_null {
1004 merged_col.push(b"null".to_vec());
1005 continue;
1006 }
1007
1008 let has_absent = (0..num_sub).any(|si| is_absent(si, row));
1010
1011 if !has_absent
1012 && !group.nested_template.is_empty()
1013 && group.nested_template.len() == num_sub + 1
1014 {
1015 let mut obj = Vec::new();
1018 obj.extend_from_slice(&group.nested_template[0]);
1019 if flat_idx < flat_col_values.len() {
1020 obj.extend_from_slice(flat_col_values[flat_idx][row]);
1021 }
1022 for si in 1..num_sub {
1023 obj.extend_from_slice(&group.nested_template[si]);
1024 if flat_idx + si < flat_col_values.len() {
1025 obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
1026 }
1027 }
1028 obj.extend_from_slice(&group.nested_template[num_sub]);
1029 merged_col.push(obj);
1030 } else {
1031 let mut obj = Vec::new();
1034 obj.push(b'{');
1035 let mut first = true;
1036 for si in 0..num_sub {
1037 if flat_idx + si >= flat_col_values.len() {
1038 break;
1039 }
1040 if is_absent(si, row) {
1041 continue; }
1043 let val = flat_col_values[flat_idx + si][row];
1044 if !first {
1045 obj.push(b',');
1046 }
1047 first = false;
1048 obj.push(b'"');
1049 obj.extend_from_slice(&group.sub_keys[si]);
1050 obj.push(b'"');
1051 obj.push(b':');
1052 obj.extend_from_slice(val);
1053 }
1054 obj.push(b'}');
1055 merged_col.push(obj);
1056 }
1057 }
1058 output_columns.push(merged_col);
1059 flat_idx += num_sub;
1060 } else {
1061 if flat_idx < flat_col_values.len() {
1063 let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
1064 .iter()
1065 .map(|v| v.to_vec())
1066 .collect();
1067 output_columns.push(col);
1068 }
1069 flat_idx += 1;
1070 }
1071 }
1072
1073 let num_out_cols = output_columns.len();
1075 let mut out = Vec::new();
1076 for (ci, col) in output_columns.iter().enumerate() {
1077 for (ri, val) in col.iter().enumerate() {
1078 out.extend_from_slice(val);
1079 if ri < num_rows - 1 {
1080 out.push(VAL_SEP);
1081 }
1082 }
1083 if ci < num_out_cols - 1 {
1084 out.push(COL_SEP);
1085 }
1086 }
1087
1088 out
1089}
1090
1091pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1096 let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1097 let has_absence = groups.iter().any(|g| !g.absence_bitmap.is_empty());
1098 let mut out = Vec::new();
1099 let version = if has_absence {
1100 3u8
1101 } else if has_template {
1102 2u8
1103 } else {
1104 1u8
1105 };
1106 out.push(version);
1107 out.push(groups.len() as u8);
1108 for group in groups {
1109 out.extend_from_slice(&group.original_col_index.to_le_bytes());
1110 out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1111 for key in &group.sub_keys {
1112 out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1113 out.extend_from_slice(key);
1114 }
1115 if has_template || version == 3 {
1116 out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1117 for part in &group.nested_template {
1118 out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1119 out.extend_from_slice(part);
1120 }
1121 }
1122 if version == 3 {
1123 let bm_len = group.absence_bitmap.len() as u32;
1124 out.extend_from_slice(&bm_len.to_le_bytes());
1125 out.extend_from_slice(&group.absence_bitmap);
1126 }
1127 }
1128 out
1129}
1130
1131pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1136 if data.is_empty() {
1137 return None;
1138 }
1139 let mut pos = 0;
1140 let version = data[pos];
1141 pos += 1;
1142 if version != 1 && version != 2 && version != 3 {
1143 return None;
1144 }
1145 let has_template = version == 2 || version == 3;
1146 let has_absence = version == 3;
1147 if pos >= data.len() {
1148 return None;
1149 }
1150 let num_groups = data[pos] as usize;
1151 pos += 1;
1152
1153 let mut groups = Vec::with_capacity(num_groups);
1154 for _ in 0..num_groups {
1155 if pos + 4 > data.len() {
1156 return None;
1157 }
1158 let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1159 pos += 2;
1160 let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1161 pos += 2;
1162
1163 let mut sub_keys = Vec::with_capacity(num_sub_cols);
1164 for _ in 0..num_sub_cols {
1165 if pos + 2 > data.len() {
1166 return None;
1167 }
1168 let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1169 pos += 2;
1170 if pos + key_len > data.len() {
1171 return None;
1172 }
1173 sub_keys.push(data[pos..pos + key_len].to_vec());
1174 pos += key_len;
1175 }
1176
1177 let nested_template = if has_template {
1178 if pos + 2 > data.len() {
1179 return None;
1180 }
1181 let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1182 pos += 2;
1183 let mut parts = Vec::with_capacity(num_parts);
1184 for _ in 0..num_parts {
1185 if pos + 2 > data.len() {
1186 return None;
1187 }
1188 let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1189 pos += 2;
1190 if pos + part_len > data.len() {
1191 return None;
1192 }
1193 parts.push(data[pos..pos + part_len].to_vec());
1194 pos += part_len;
1195 }
1196 parts
1197 } else {
1198 Vec::new()
1199 };
1200
1201 let absence_bitmap = if has_absence {
1202 if pos + 4 > data.len() {
1203 return None;
1204 }
1205 let bm_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1206 pos += 4;
1207 if pos + bm_len > data.len() {
1208 return None;
1209 }
1210 let bm = data[pos..pos + bm_len].to_vec();
1211 pos += bm_len;
1212 bm
1213 } else {
1214 Vec::new()
1215 };
1216
1217 groups.push(NestedGroupInfo {
1218 original_col_index,
1219 sub_keys,
1220 nested_template,
1221 absence_bitmap,
1222 });
1223 }
1224
1225 Some((groups, pos))
1226}
1227
1228pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
1233 if data.is_empty() {
1234 return None;
1235 }
1236
1237 let has_trailing_newline = data.last() == Some(&b'\n');
1238 let lines = split_lines(data);
1239 let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
1240
1241 if non_empty.len() < 2 {
1242 return None;
1243 }
1244
1245 if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
1247 if col_data.len() + metadata.len() < data.len() {
1248 let num_rows = non_empty.len();
1253 if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
1254 let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
1259 let unflattened = unflatten_nested_columns(
1260 &flat_data,
1261 &nested_groups,
1262 num_rows,
1263 total_flat_cols,
1264 );
1265 if unflattened == col_data {
1266 let nested_bytes = serialize_nested_info(&nested_groups);
1268 metadata.extend_from_slice(&nested_bytes);
1269 return Some(TransformResult {
1270 data: flat_data,
1271 metadata,
1272 });
1273 }
1274 }
1276 metadata.push(0u8); return Some(TransformResult {
1279 data: col_data,
1280 metadata,
1281 });
1282 }
1283 }
1284
1285 if let Some((grouped_data, grouped_metadata)) =
1287 preprocess_grouped(&non_empty, has_trailing_newline)
1288 {
1289 if grouped_data.len() + grouped_metadata.len() < data.len() {
1290 return Some(TransformResult {
1291 data: grouped_data,
1292 metadata: grouped_metadata,
1293 });
1294 }
1295 }
1296
1297 None
1298}
1299
1300pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1303 if metadata.is_empty() {
1304 return data.to_vec();
1305 }
1306 match metadata[0] {
1307 METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1308 METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1309 _ => data.to_vec(),
1310 }
1311}
1312
1313fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1315 if metadata.len() < 10 {
1316 return data.to_vec();
1317 }
1318 let mut pos = 0;
1319 let _version = metadata[pos];
1320 pos += 1;
1321 let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1322 pos += 4;
1323 let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1324 pos += 2;
1325 let has_trailing_newline = metadata[pos] != 0;
1326 pos += 1;
1327 let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1328 pos += 2;
1329
1330 let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
1331 for _ in 0..num_parts {
1332 if pos + 2 > metadata.len() {
1333 return data.to_vec();
1334 }
1335 let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1336 pos += 2;
1337 if pos + part_len > metadata.len() {
1338 return data.to_vec();
1339 }
1340 parts.push(metadata[pos..pos + part_len].to_vec());
1341 pos += part_len;
1342 }
1343
1344 if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
1345 return data.to_vec();
1346 }
1347
1348 let remaining_metadata = &metadata[pos..];
1350 if !remaining_metadata.is_empty()
1351 && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2 || remaining_metadata[0] == 3)
1352 {
1353 if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
1355 let total_flat_cols = data.split(|&b| b == COL_SEP).count();
1357 let unflattened =
1358 unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
1359 return reverse_uniform_from_parts(
1360 &unflattened,
1361 &parts,
1362 num_rows,
1363 num_cols,
1364 has_trailing_newline,
1365 );
1366 }
1367 }
1368
1369 reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
1370}
1371
1372fn reverse_uniform_from_parts(
1374 data: &[u8],
1375 parts: &[Vec<u8>],
1376 num_rows: usize,
1377 num_cols: usize,
1378 has_trailing_newline: bool,
1379) -> Vec<u8> {
1380 let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
1381 if col_chunks.len() != num_cols {
1382 return data.to_vec();
1383 }
1384
1385 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1386 for chunk in &col_chunks {
1387 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1388 if vals.len() != num_rows {
1389 return data.to_vec();
1390 }
1391 columns.push(vals);
1392 }
1393
1394 let mut output = Vec::with_capacity(data.len() * 2);
1395 #[allow(clippy::needless_range_loop)]
1396 for row in 0..num_rows {
1397 output.extend_from_slice(&parts[0]);
1398 output.extend_from_slice(columns[0][row]);
1399 for col in 1..num_cols {
1400 output.extend_from_slice(&parts[col]);
1401 output.extend_from_slice(columns[col][row]);
1402 }
1403 output.extend_from_slice(&parts[num_cols]);
1404
1405 if row < num_rows - 1 || has_trailing_newline {
1406 output.push(b'\n');
1407 }
1408 }
1409
1410 output
1411}
1412
/// Parses uniform-layout metadata into
/// `(parts, num_rows, num_cols, has_trailing_newline)`.
///
/// The ten-byte header is: version (ignored here), row count (u32 LE),
/// column count (u16 LE), trailing-newline flag, part count (u16 LE). Each
/// part follows as a u16 LE length plus bytes; bytes after the last part are
/// ignored.
///
/// Returns `None` when the metadata is shorter than the header, a part is
/// truncated, the part count is not the column count plus one, or the row or
/// column count is zero.
fn parse_uniform_metadata(metadata: &[u8]) -> Option<(Vec<Vec<u8>>, usize, usize, bool)> {
    const HEADER_LEN: usize = 10;

    if metadata.len() < HEADER_LEN {
        return None;
    }
    let (header, mut body) = metadata.split_at(HEADER_LEN);

    let num_rows = u32::from_le_bytes([header[1], header[2], header[3], header[4]]) as usize;
    let num_cols = usize::from(u16::from_le_bytes([header[5], header[6]]));
    let has_trailing_newline = header[7] != 0;
    let num_parts = usize::from(u16::from_le_bytes([header[8], header[9]]));

    let mut parts = Vec::with_capacity(num_parts);
    for _ in 0..num_parts {
        if body.len() < 2 {
            return None;
        }
        let (len_bytes, rest) = body.split_at(2);
        let part_len = usize::from(u16::from_le_bytes([len_bytes[0], len_bytes[1]]));
        if rest.len() < part_len {
            return None;
        }
        let (part, rest) = rest.split_at(part_len);
        parts.push(part.to_vec());
        body = rest;
    }

    if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
        return None;
    }

    Some((parts, num_rows, num_cols, has_trailing_newline))
}
1449
1450fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1452 if metadata.len() < 8 {
1453 return data.to_vec();
1454 }
1455
1456 let mut mpos = 1; let has_trailing_newline = metadata[mpos] != 0;
1458 mpos += 1;
1459 let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1460 mpos += 4;
1461 let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
1462 mpos += 2;
1463
1464 let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];
1466
1467 let mut dpos: usize = 0;
1469
1470 for _ in 0..num_groups {
1471 if mpos + 4 > metadata.len() {
1473 return data.to_vec();
1474 }
1475 let group_row_count =
1476 u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1477 mpos += 4;
1478
1479 let mut row_indices = Vec::with_capacity(group_row_count);
1480 for _ in 0..group_row_count {
1481 if mpos + 4 > metadata.len() {
1482 return data.to_vec();
1483 }
1484 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1485 mpos += 4;
1486 row_indices.push(idx);
1487 }
1488
1489 if mpos + 4 > metadata.len() {
1491 return data.to_vec();
1492 }
1493 let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1494 mpos += 4;
1495 if mpos + gm_len > metadata.len() {
1496 return data.to_vec();
1497 }
1498 let group_metadata = &metadata[mpos..mpos + gm_len];
1499 mpos += gm_len;
1500
1501 if dpos + 4 > data.len() {
1503 return data.to_vec();
1504 }
1505 let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
1506 dpos += 4;
1507 if dpos + gd_len > data.len() {
1508 return data.to_vec();
1509 }
1510 let group_data = &data[dpos..dpos + gd_len];
1511 dpos += gd_len;
1512
1513 let (parts, num_rows, num_cols, _trailing) = match parse_uniform_metadata(group_metadata) {
1515 Some(v) => v,
1516 None => return data.to_vec(),
1517 };
1518
1519 if num_rows != group_row_count {
1520 return data.to_vec();
1521 }
1522
1523 let col_chunks: Vec<&[u8]> = group_data.split(|&b| b == COL_SEP).collect();
1525 if col_chunks.len() != num_cols {
1526 return data.to_vec();
1527 }
1528
1529 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1530 for chunk in &col_chunks {
1531 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1532 if vals.len() != num_rows {
1533 return data.to_vec();
1534 }
1535 columns.push(vals);
1536 }
1537
1538 for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1540 let mut line = Vec::new();
1541 line.extend_from_slice(&parts[0]);
1542 line.extend_from_slice(columns[0][row_within_group]);
1543 for col in 1..num_cols {
1544 line.extend_from_slice(&parts[col]);
1545 line.extend_from_slice(columns[col][row_within_group]);
1546 }
1547 line.extend_from_slice(&parts[num_cols]);
1548
1549 if original_idx < total_rows {
1550 output_lines[original_idx] = Some(line);
1551 }
1552 }
1553 }
1554
1555 if mpos + 4 > metadata.len() {
1557 return data.to_vec();
1558 }
1559 let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1560 mpos += 4;
1561
1562 let mut residual_indices = Vec::with_capacity(residual_count);
1563 for _ in 0..residual_count {
1564 if mpos + 4 > metadata.len() {
1565 return data.to_vec();
1566 }
1567 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1568 mpos += 4;
1569 residual_indices.push(idx);
1570 }
1571
1572 let residual_data = &data[dpos..];
1574 if residual_count > 0 {
1575 let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
1576 vec![]
1577 } else {
1578 residual_data.split(|&b| b == b'\n').collect()
1579 };
1580 if residual_lines.len() != residual_count {
1582 return data.to_vec();
1583 }
1584 for (i, &idx) in residual_indices.iter().enumerate() {
1585 if idx < total_rows {
1586 output_lines[idx] = Some(residual_lines[i].to_vec());
1587 }
1588 }
1589 }
1590
1591 let mut output = Vec::with_capacity(data.len() * 2);
1593 for (i, slot) in output_lines.iter().enumerate() {
1594 match slot {
1595 Some(line) => output.extend_from_slice(line),
1596 None => {
1597 return data.to_vec();
1599 }
1600 }
1601 if i < total_rows - 1 || has_trailing_newline {
1602 output.push(b'\n');
1603 }
1604 }
1605
1606 output
1607}
1608
// Unit tests for value extraction, line parsing, and `preprocess` / `reverse`
// round-trips across the uniform, grouped, and nested-object code paths.
#[cfg(test)]
mod tests {
    use super::*;

    // A quoted string is returned including its quotes; `end` is the offset
    // just past the closing quote.
    #[test]
    fn extract_value_string() {
        let line = br#""hello","next""#;
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"\"hello\"");
        assert_eq!(end, 7);
    }

    // A bare number ends at the following comma.
    #[test]
    fn extract_value_number() {
        let line = b"42,next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"42");
        assert_eq!(end, 2);
    }

    // The literal `true` ends at the following comma.
    #[test]
    fn extract_value_bool() {
        let line = b"true,next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"true");
        assert_eq!(end, 4);
    }

    // The literal `null` ends at the following comma.
    #[test]
    fn extract_value_null() {
        let line = b"null,next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"null");
        assert_eq!(end, 4);
    }

    // An object is returned whole, braces included; commas inside it do not
    // terminate the value.
    #[test]
    fn extract_value_object() {
        let line = br#"{"a":1,"b":"x"},next"#;
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
        assert_eq!(end, 15);
    }

    // An array is returned whole, brackets included.
    #[test]
    fn extract_value_array() {
        let line = b"[1,2,3],next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"[1,2,3]");
        assert_eq!(end, 7);
    }

    // A backslash-escaped quote inside a string does not end the string.
    #[test]
    fn extract_value_string_with_escapes() {
        let line = br#""he\"llo",next"#;
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, br#""he\"llo""#.to_vec());
        assert_eq!(end, 9);
    }

    // `parse_line` splits a row into its values and the literal text around
    // them: two values yield three parts (prefix, separator, suffix).
    #[test]
    fn parse_line_simple() {
        let line = br#"{"a":1,"b":"x"}"#;
        let (parts, values) = parse_line(line).unwrap();
        assert_eq!(parts.len(), 3); assert_eq!(values.len(), 2);
        assert_eq!(values[0], b"1");
        assert_eq!(values[1], b"\"x\"");
        assert_eq!(parts[0], br#"{"a":"#.to_vec());
        assert_eq!(parts[1], br#","b":"#.to_vec());
        assert_eq!(parts[2], b"}");
    }

    // Three same-schema rows with a trailing newline survive
    // preprocess -> reverse byte-for-byte. The lossy-string comparison runs
    // first, presumably so a failure prints readable text.
    #[test]
    fn roundtrip_simple() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    // Same as above but the input does not end with a newline; none may be
    // added on restore.
    #[test]
    fn roundtrip_no_trailing_newline() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    // Rows whose second value is a nested object round-trip exactly.
    #[test]
    fn roundtrip_nested_values() {
        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
{"id":4,"meta":{"x":70,"y":80}}
{"id":5,"meta":{"x":90,"y":100}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    // One row shape mixing string, number, bool, null and array values.
    #[test]
    fn roundtrip_mixed_types() {
        let data = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    // Two rows with different key names: `preprocess` must decline.
    // NOTE(review): presumably because neither schema reaches MIN_GROUP_ROWS;
    // confirm against `preprocess`.
    #[test]
    fn schema_mismatch_too_few_returns_none() {
        let data = br#"{"a":1,"b":2}
{"a":1,"c":3}
"#;
        assert!(preprocess(data).is_none());
    }

    // Two rows with a different number of keys: `preprocess` must decline.
    #[test]
    fn different_num_keys_too_few_returns_none() {
        let data = br#"{"a":1,"b":2}
{"a":1}
"#;
        assert!(preprocess(data).is_none());
    }

    // A single row is not transformed.
    #[test]
    fn single_line_returns_none() {
        let data = br#"{"a":1,"b":2}
"#;
        assert!(preprocess(data).is_none());
    }

    // Empty input is not transformed.
    #[test]
    fn empty_returns_none() {
        assert!(preprocess(b"").is_none());
    }

    // Inspects the transformed payload directly: columns are separated by
    // COL_SEP, and within a column the per-row values are separated by VAL_SEP
    // and kept in row order.
    #[test]
    fn column_layout_groups_similar_values() {
        let data = br#"{"type":"page_view","user":"alice"}
{"type":"api_call","user":"alice"}
{"type":"click","user":"bob"}
"#;
        let result = preprocess(data).unwrap();

        let col_data = &result.data;
        let cols: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
        assert_eq!(cols.len(), 2);

        // First column: the "type" values.
        let type_vals: Vec<&[u8]> = cols[0].split(|&b| b == VAL_SEP).collect();
        assert_eq!(type_vals.len(), 3);
        assert_eq!(type_vals[0], br#""page_view""#);
        assert_eq!(type_vals[1], br#""api_call""#);
        assert_eq!(type_vals[2], br#""click""#);

        // Second column: the "user" values.
        let user_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
        assert_eq!(user_vals.len(), 3);
        assert_eq!(user_vals[0], br#""alice""#);
        assert_eq!(user_vals[1], br#""alice""#);
        assert_eq!(user_vals[2], br#""bob""#);
    }

    // The fixture is a raw string, so `\"`, `\n`, `\t` and `\\` below are
    // literal two-byte JSON escape sequences, not control characters.
    #[test]
    fn roundtrip_string_with_escaped_chars() {
        let data = br#"{"msg":"he said \"hi\"","val":1}
{"msg":"line\nbreak","val":2}
{"msg":"tab\there","val":3}
{"msg":"back\\slash","val":4}
{"msg":"normal text","val":5}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    // Negative numbers and decimals are preserved textually.
    #[test]
    fn roundtrip_negative_and_float_numbers() {
        let data = br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
{"x":0.001,"y":999}
{"x":-100,"y":-200}
{"x":42.0,"y":7}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    // First checks `parse_line` on a row with a negative decimal, then
    // round-trips a two-row fixture repeated 20 times (40 rows).
    #[test]
    fn reverse_roundtrip_small_data() {
        let (parts, vals) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
        assert_eq!(vals.len(), 2);
        assert_eq!(parts.len(), 3);

        let big_data = br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
"#
        .repeat(20);
        let result = preprocess(&big_data).expect("should produce transform with 40 rows");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, big_data);
    }

    // Ten rows of a three-key schema followed by ten rows of a four-key
    // schema: the grouped metadata version must be selected and the input
    // restored exactly.
    #[test]
    fn grouped_roundtrip_two_schemas() {
        let mut data = Vec::new();
        for i in 0..10 {
            data.extend_from_slice(
                format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
            );
            data.push(b'\n');
        }
        for i in 10..20 {
            data.extend_from_slice(
                format!(
                    r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
                    i, i, i
                )
                .as_bytes(),
            );
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    // Same two schemas as above but alternating row by row, so restoring must
    // put each row back at its original position.
    #[test]
    fn grouped_roundtrip_interleaved_schemas() {
        let mut data = Vec::new();
        for i in 0..20 {
            if i % 2 == 0 {
                data.extend_from_slice(
                    format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
                );
            } else {
                data.extend_from_slice(
                    format!(
                        r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
                        i, i, i
                    )
                    .as_bytes(),
                );
            }
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    // Eight rows of schema {a,b}, two one-off rows with unique schemas, then
    // six rows of schema {c,d,e}.
    // NOTE(review): the two one-off rows are presumably carried as residual
    // lines; confirm against `preprocess`.
    #[test]
    fn grouped_roundtrip_with_residuals() {
        let mut data = Vec::new();
        for i in 0..8 {
            data.extend_from_slice(format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
            data.push(b'\n');
        }
        data.extend_from_slice(br#"{"x":1,"y":2,"z":3}"#);
        data.push(b'\n');
        data.extend_from_slice(br#"{"p":"q"}"#);
        data.push(b'\n');
        for i in 0..6 {
            data.extend_from_slice(format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes());
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(&data),
        );
        assert_eq!(restored, data);
    }

    // Two schemas of six rows each; the very last row is written without a
    // newline, and none may be added on restore.
    #[test]
    fn grouped_roundtrip_no_trailing_newline() {
        let mut data = Vec::new();
        for i in 0..6 {
            data.extend_from_slice(format!(r#"{{"id":{},"type":"push"}}"#, i).as_bytes());
            data.push(b'\n');
        }
        for i in 0..6 {
            data.extend_from_slice(
                format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i).as_bytes(),
            );
            if i < 5 {
                data.push(b'\n');
            }
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    // When every row shares one schema the uniform metadata version is used,
    // not the grouped one.
    #[test]
    fn uniform_still_preferred_over_grouped() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
{"a":4,"b":"w"}
{"a":5,"b":"v"}
"#;
        let result = preprocess(data).expect("should produce transform");
        assert_eq!(
            result.metadata[0], METADATA_VERSION_UNIFORM,
            "uniform schema should use Strategy 1"
        );
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    // Fifty event-like rows where every fifth row carries an extra "org" key,
    // giving two schemas in a 1:4 ratio.
    #[test]
    fn grouped_gharchive_simulation() {
        let mut data = Vec::new();
        for i in 0..50 {
            if i % 5 == 0 {
                data.extend_from_slice(
                    format!(
                        r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
                        i, i, i, i
                    )
                    .as_bytes(),
                );
            } else {
                data.extend_from_slice(
                    format!(
                        r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
                        i, i, i
                    )
                    .as_bytes(),
                );
            }
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    // A nested object with a consistent shape is flattened into one column
    // per sub-key: `id`, `meta.x`, `meta.y` give three columns.
    #[test]
    fn test_nested_decomposition_basic() {
        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#;
        let result = preprocess(data).expect("should produce transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_UNIFORM);

        let cols: Vec<&[u8]> = result.data.split(|&b| b == COL_SEP).collect();
        assert_eq!(
            cols.len(),
            3,
            "should have 3 columns after flattening: got {}",
            cols.len()
        );

        // Column 1 holds the `x` sub-values, column 2 the `y` sub-values.
        let meta_x_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
        assert_eq!(meta_x_vals, vec![b"10".as_slice(), b"30", b"50"]);

        let meta_y_vals: Vec<&[u8]> = cols[2].split(|&b| b == VAL_SEP).collect();
        assert_eq!(meta_y_vals, vec![b"20".as_slice(), b"40", b"60"]);
    }

    // The flattened form of the fixture above restores exactly.
    #[test]
    fn test_nested_roundtrip() {
        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    // Top-level keys are identical but the nested `meta` object alternates
    // between two different sets of sub-keys.
    #[test]
    fn test_nested_mixed_schemas() {
        let data = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","meta":{"query":"pricing","results_count":25}}
{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
{"ts":"e","meta":{"query":"api docs","results_count":41}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    // Flat rows round-trip, and the final metadata byte is expected to be 0;
    // per the assertion message that byte is the `has_nested` flag.
    #[test]
    fn test_nested_no_nested_objects() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());

        let meta = &result.metadata;
        let last_byte = meta[meta.len() - 1];
        assert_eq!(last_byte, 0, "should have has_nested=0 for flat data");
    }

    // Five rows with identical top-level keys where every `meta` object has a
    // different set of sub-keys.
    #[test]
    fn test_nested_real_corpus() {
        let data = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    // The `meta` value alternates between an object and a bare `null`.
    #[test]
    fn test_nested_roundtrip_with_null_values() {
        let data = br#"{"id":1,"meta":{"x":10}}
{"id":2,"meta":null}
{"id":3,"meta":{"x":30}}
{"id":4,"meta":null}
{"id":5,"meta":{"x":50}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    // String sub-values of differing lengths inside a nested object are
    // restored byte-for-byte.
    #[test]
    fn test_nested_string_values_preserved_exact() {
        let data = br#"{"id":1,"meta":{"name":"Alice","score":100}}
{"id":2,"meta":{"name":"Bob","score":200}}
{"id":3,"meta":{"name":"Charlie","score":300}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    // `parse_nested_object_kv` yields (key, value) pairs in source order: keys
    // come back without their quotes, values as their raw JSON text.
    #[test]
    fn test_parse_nested_object_kv() {
        let obj = br#"{"query":"benchmark","results_count":14}"#;
        let pairs = parse_nested_object_kv(obj).unwrap();
        assert_eq!(pairs.len(), 2);
        assert_eq!(pairs[0].0, b"query");
        assert_eq!(pairs[0].1, br#""benchmark""#.to_vec());
        assert_eq!(pairs[1].0, b"results_count");
        assert_eq!(pairs[1].1, b"14");
    }

    // Fifty rows where even-numbered rows carry an additional `extra` sub-key
    // inside `meta` and odd-numbered rows do not.
    #[test]
    fn test_nested_varying_subkeys_roundtrip() {
        let mut lines = Vec::new();
        for i in 0..50 {
            let line = if i % 2 == 0 {
                format!("{{\"id\":{},\"meta\":{{\"x\":{},\"extra\":{}}}}}", i, i, i)
            } else {
                format!("{{\"id\":{},\"meta\":{{\"x\":{}}}}}", i, i)
            };
            lines.push(line);
        }
        let ndjson = lines.join("\n") + "\n";
        let data = ndjson.as_bytes();

        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            std::str::from_utf8(&restored).unwrap(),
            std::str::from_utf8(data).unwrap(),
            "varying sub-keys roundtrip must be byte-exact"
        );
    }

    // A sub-key whose value is an explicit `null` in every row must come back
    // as `null`, not be dropped.
    #[test]
    fn test_nested_explicit_null_preserved() {
        let data = b"{\"id\":1,\"meta\":{\"x\":1,\"y\":null}}\n\
            {\"id\":2,\"meta\":{\"x\":2,\"y\":null}}\n\
            {\"id\":3,\"meta\":{\"x\":3,\"y\":null}}\n";
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            std::str::from_utf8(&restored).unwrap(),
            std::str::from_utf8(data).unwrap(),
            "explicit null values must be preserved"
        );
    }
}