1use super::transform::TransformResult;
24use std::collections::HashMap;
25
/// Separator byte written between columns in the columnar payload.
const COL_SEP: u8 = 0x00;
/// Separator byte written between the values of a single column.
const VAL_SEP: u8 = 0x01;
/// First metadata byte of the uniform (single shared template) layout.
const METADATA_VERSION_UNIFORM: u8 = 1;
/// First metadata byte of the grouped (one template per group) layout.
const METADATA_VERSION_GROUPED: u8 = 2;

/// Minimum number of input lines for the grouped layout, and the minimum number
/// of lines a template needs before it forms its own group.
const MIN_GROUP_ROWS: usize = 5;

/// A template's parts, plus `(original line index, values)` for each line using it.
type SchemaGroup = (Vec<Vec<u8>>, Vec<(usize, Vec<Vec<u8>>)>);
36
/// Extracts the raw bytes of one JSON value that starts at `pos`, after
/// skipping leading ASCII whitespace.
///
/// Strings run to the matching unescaped quote. Objects and arrays run to the
/// bracket that closes the opening one; brackets inside strings are ignored.
/// Only the opening bracket kind is counted, so the input is assumed to be
/// well-nested (NOTE(review): confirm no caller needs stricter validation).
/// Anything else is a scalar that ends at the first `,`, `}`, `]` or ASCII
/// whitespace.
///
/// Returns the value bytes (without the skipped whitespace) and the index just
/// past the value. Returns `None` in three cases: `pos` is at or beyond the
/// end, a string or bracket is unterminated, or a scalar would be empty.
fn extract_value(line: &[u8], pos: usize) -> Option<(Vec<u8>, usize)> {
    /// Index of the closing quote of a string whose body starts at `from`
    /// (just after the opening quote), honouring backslash escapes.
    /// Returns `None` if the string is unterminated.
    fn closing_quote(line: &[u8], from: usize) -> Option<usize> {
        let mut escaped = false;
        for (offset, &byte) in line.get(from..)?.iter().enumerate() {
            if escaped {
                escaped = false;
            } else if byte == b'\\' {
                escaped = true;
            } else if byte == b'"' {
                return Some(from + offset);
            }
        }
        None
    }

    /// Index just past the `close` byte that balances the `open` byte at
    /// `open_at`, skipping over string contents.
    /// Returns `None` if unbalanced or if a nested string is unterminated.
    fn balanced_end(line: &[u8], open_at: usize, open: u8, close: u8) -> Option<usize> {
        let mut depth = 1usize;
        let mut pos = open_at + 1;
        while depth > 0 {
            let byte = *line.get(pos)?;
            if byte == b'"' {
                pos = closing_quote(line, pos + 1)?;
            } else if byte == open {
                depth += 1;
            } else if byte == close {
                depth -= 1;
            }
            pos += 1;
        }
        Some(pos)
    }

    let start = pos
        + line
            .get(pos..)?
            .iter()
            .take_while(|b| b.is_ascii_whitespace())
            .count();

    let end = match *line.get(start)? {
        b'"' => closing_quote(line, start + 1)? + 1,
        b'{' => balanced_end(line, start, b'{', b'}')?,
        b'[' => balanced_end(line, start, b'[', b']')?,
        _ => {
            let len = line[start..]
                .iter()
                .position(|&b| matches!(b, b',' | b'}' | b']') || b.is_ascii_whitespace())
                .unwrap_or(line.len() - start);
            if len == 0 {
                return None;
            }
            start + len
        }
    };
    Some((line[start..end].to_vec(), end))
}

/// `(template parts, values)` produced by [`parse_line`].
type ParsedLine = (Vec<Vec<u8>>, Vec<Vec<u8>>);

/// Splits one line holding a flat JSON object into template parts and values.
///
/// For `n` values the template has `n + 1` parts. Concatenating
/// `parts[0], values[0], parts[1], ..., values[n-1], parts[n]` reproduces
/// `line` byte-for-byte. The parts hold all of the following:
/// - leading whitespace
/// - keys and colons
/// - whitespace on either side of each value
/// - commas
/// - everything from the closing brace to the end of the line
///
/// Returns `None` if:
/// - the line does not start (after whitespace) with `{`
/// - a key is not a terminated quoted string
/// - a `:` or a value is missing
/// - anything other than `,`/`}` follows a value
/// - the object has no members
fn parse_line(line: &[u8]) -> Option<ParsedLine> {
    let skip_ws = |mut i: usize| {
        while i < line.len() && line[i].is_ascii_whitespace() {
            i += 1;
        }
        i
    };

    let mut pos = skip_ws(0);
    if line.get(pos) != Some(&b'{') {
        return None;
    }
    pos += 1;

    let mut parts: Vec<Vec<u8>> = Vec::new();
    let mut values: Vec<Vec<u8>> = Vec::new();
    let mut part_start = 0;

    loop {
        pos = skip_ws(pos);
        match *line.get(pos)? {
            // Empty object or trailing comma: the rest of the line is the final part.
            b'}' => {
                parts.push(line[part_start..].to_vec());
                break;
            }
            b'"' => {}
            _ => return None,
        }

        // Skip the key, leaving `pos` just past its closing quote.
        pos += 1;
        let mut escaped = false;
        loop {
            let byte = *line.get(pos)?;
            pos += 1;
            if escaped {
                escaped = false;
            } else if byte == b'\\' {
                escaped = true;
            } else if byte == b'"' {
                break;
            }
        }

        pos = skip_ws(pos);
        if line.get(pos) != Some(&b':') {
            return None;
        }
        // Whitespace between ':' and the value belongs to the template.
        // `extract_value` would skip it without returning it, losing bytes.
        pos = skip_ws(pos + 1);
        parts.push(line[part_start..pos].to_vec());

        let (value, value_end) = extract_value(line, pos)?;
        values.push(value);
        // Whitespace after the value is carried into the next part.
        part_start = value_end;

        pos = skip_ws(value_end);
        match *line.get(pos)? {
            b',' => pos += 1,
            b'}' => {
                parts.push(line[part_start..].to_vec());
                break;
            }
            _ => return None,
        }
    }

    if values.is_empty() {
        return None;
    }
    Some((parts, values))
}
263
/// Splits `data` on `\n` without the newline bytes.
///
/// The single empty piece after a trailing newline is dropped, as is the
/// piece an empty input would produce. Interior empty lines are kept.
fn split_lines(data: &[u8]) -> Vec<&[u8]> {
    let mut pieces: Vec<&[u8]> = data.split(|&byte| byte == b'\n').collect();
    // `split` always yields at least one piece; only a trailing empty one is dropped.
    if matches!(pieces.last(), Some(last) if last.is_empty()) {
        pieces.pop();
    }
    pieces
}
279
280fn build_uniform_columnar(
283 template_parts: &[Vec<u8>],
284 columns: &[Vec<Vec<u8>>],
285 num_rows: usize,
286 has_trailing_newline: bool,
287) -> (Vec<u8>, Vec<u8>) {
288 let num_cols = columns.len();
289
290 let mut col_data = Vec::new();
292 for (ci, col) in columns.iter().enumerate() {
293 for (ri, val) in col.iter().enumerate() {
294 col_data.extend_from_slice(val);
295 if ri < num_rows - 1 {
296 col_data.push(VAL_SEP);
297 }
298 }
299 if ci < num_cols - 1 {
300 col_data.push(COL_SEP);
301 }
302 }
303
304 let mut metadata = Vec::new();
306 metadata.push(METADATA_VERSION_UNIFORM);
307 metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
308 metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
309 metadata.push(if has_trailing_newline { 1 } else { 0 });
310 metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
311 for part in template_parts {
312 metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
313 metadata.extend_from_slice(part);
314 }
315
316 (col_data, metadata)
317}
318
319fn preprocess_uniform(
322 non_empty: &[&[u8]],
323 has_trailing_newline: bool,
324) -> Option<(Vec<u8>, Vec<u8>)> {
325 if non_empty.len() < 2 {
326 return None;
327 }
328
329 let (template_parts, first_values) = parse_line(non_empty[0])?;
330 let num_cols = first_values.len();
331 if template_parts.len() != num_cols + 1 {
332 return None;
333 }
334
335 let mut columns: Vec<Vec<Vec<u8>>> = Vec::with_capacity(num_cols);
336 for v in &first_values {
337 columns.push(vec![v.clone()]);
338 }
339
340 for &line in &non_empty[1..] {
341 let (parts, values) = parse_line(line)?;
342 if values.len() != num_cols || parts.len() != template_parts.len() {
343 return None;
344 }
345 for (a, b) in parts.iter().zip(template_parts.iter()) {
346 if a != b {
347 return None;
348 }
349 }
350 for (col, val) in values.iter().enumerate() {
351 columns[col].push(val.clone());
352 }
353 }
354
355 Some(build_uniform_columnar(
356 &template_parts,
357 &columns,
358 non_empty.len(),
359 has_trailing_newline,
360 ))
361}
362
363fn preprocess_grouped(
384 non_empty: &[&[u8]],
385 has_trailing_newline: bool,
386) -> Option<(Vec<u8>, Vec<u8>)> {
387 if non_empty.len() < MIN_GROUP_ROWS {
388 return None;
389 }
390
391 let mut parsed: Vec<Option<ParsedLine>> = Vec::with_capacity(non_empty.len());
394 for &line in non_empty {
395 parsed.push(parse_line(line));
396 }
397
398 let mut group_map: HashMap<Vec<u8>, SchemaGroup> = HashMap::new();
401 let mut residual_indices: Vec<usize> = Vec::new();
402
403 for (idx, parsed_line) in parsed.into_iter().enumerate() {
404 if let Some((parts, values)) = parsed_line {
405 let mut key = Vec::new();
407 for part in &parts {
408 key.extend_from_slice(&(part.len() as u32).to_le_bytes());
409 key.extend_from_slice(part);
410 }
411 group_map
412 .entry(key)
413 .or_insert_with(|| (parts, Vec::new()))
414 .1
415 .push((idx, values));
416 } else {
417 residual_indices.push(idx);
419 }
420 }
421
422 let mut groups: Vec<SchemaGroup> = Vec::new();
424 for (_key, (template_parts, rows)) in group_map {
425 if rows.len() >= MIN_GROUP_ROWS {
426 groups.push((template_parts, rows));
427 } else {
428 for (idx, _) in &rows {
430 residual_indices.push(*idx);
431 }
432 }
433 }
434
435 if groups.is_empty() {
437 return None;
438 }
439
440 groups.sort_by_key(|(_, rows)| rows[0].0);
442 residual_indices.sort_unstable();
443
444 struct GroupOutput {
446 row_indices: Vec<u32>,
447 col_data: Vec<u8>,
448 group_metadata: Vec<u8>,
449 }
450
451 let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
452
453 for (template_parts, rows) in &groups {
454 let num_cols = template_parts.len() - 1;
455 let mut columns: Vec<Vec<Vec<u8>>> = (0..num_cols).map(|_| Vec::new()).collect();
456 let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
457
458 for (idx, values) in rows {
459 row_indices.push(*idx as u32);
460 for (col, val) in values.iter().enumerate() {
461 columns[col].push(val.clone());
462 }
463 }
464
465 let (col_data, group_metadata) =
467 build_uniform_columnar(template_parts, &columns, rows.len(), false);
468
469 group_outputs.push(GroupOutput {
470 row_indices,
471 col_data,
472 group_metadata,
473 });
474 }
475
476 let mut data_out = Vec::new();
478 for group in &group_outputs {
479 data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
480 data_out.extend_from_slice(&group.col_data);
481 }
482
483 let residual_start = data_out.len();
485 for (i, &idx) in residual_indices.iter().enumerate() {
486 data_out.extend_from_slice(non_empty[idx]);
487 if i < residual_indices.len() - 1 {
488 data_out.push(b'\n');
489 }
490 }
491 let _residual_len = data_out.len() - residual_start;
492
493 let mut metadata = Vec::new();
495 metadata.push(METADATA_VERSION_GROUPED);
496 metadata.push(if has_trailing_newline { 1 } else { 0 });
497 metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
498 metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
499
500 for group in &group_outputs {
501 metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
502 for &idx in &group.row_indices {
503 metadata.extend_from_slice(&idx.to_le_bytes());
504 }
505 metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
506 metadata.extend_from_slice(&group.group_metadata);
507 }
508
509 metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
510 for &idx in &residual_indices {
511 metadata.extend_from_slice(&(idx as u32).to_le_bytes());
512 }
513
514 Some((data_out, metadata))
515}
516
/// Describes one original column whose object values were split into
/// per-key sub-columns.
pub(crate) struct NestedGroupInfo {
    /// Position of the column among the original (un-flattened) columns.
    pub(crate) original_col_index: u16,
    /// Raw key bytes of each sub-column, in sub-column order. These are the
    /// bytes between the quotes, with escape sequences kept as written.
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Template parts of the first non-null object in the column.
    ///
    /// During decoding, the sub-values of a row with no absent key are spliced
    /// into this template, provided it has exactly `sub_keys.len() + 1` parts.
    pub(crate) nested_template: Vec<Vec<u8>>,
    /// Bit `sub_key_index * num_rows + row` (LSB-first within each byte) marks
    /// that key as absent in that row. An empty bitmap means no key is marked
    /// absent.
    pub(crate) absence_bitmap: Vec<u8>,
}
533
534pub(crate) fn flatten_nested_columns(
540 col_data: &[u8],
541 num_rows: usize,
542) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
543 let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
545 if columns.is_empty() || num_rows == 0 {
546 return None;
547 }
548
549 let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
550 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
553
554 for (col_idx, &col_chunk) in columns.iter().enumerate() {
555 let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
556 if values.len() != num_rows {
557 return None;
558 }
559
560 let mut all_objects = true;
562 let mut has_non_null = false;
563 for val in &values {
564 if *val == b"null" {
565 continue;
566 }
567 has_non_null = true;
568 if !val.starts_with(b"{") {
569 all_objects = false;
570 break;
571 }
572 }
573
574 if !all_objects || !has_non_null {
575 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
577 output_columns.push(col_values);
578 continue;
579 }
580
581 let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
585 let mut nested_template: Vec<Vec<u8>> = Vec::new();
586 type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
587 let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);
588
589 for val in &values {
590 if *val == b"null" {
591 parsed_rows.push(None);
592 continue;
593 }
594 if nested_template.is_empty() {
595 match parse_nested_object_with_template(val) {
597 Some((template, kv_pairs)) => {
598 for (key, _) in &kv_pairs {
599 if !all_sub_keys.iter().any(|k| k == key) {
600 all_sub_keys.push(key.clone());
601 }
602 }
603 nested_template = template;
604 parsed_rows.push(Some(kv_pairs));
605 }
606 None => {
607 all_sub_keys.clear();
608 break;
609 }
610 }
611 } else {
612 match parse_nested_object_kv(val) {
614 Some(kv_pairs) => {
615 for (key, _) in &kv_pairs {
616 if !all_sub_keys.iter().any(|k| k == key) {
617 all_sub_keys.push(key.clone());
618 }
619 }
620 parsed_rows.push(Some(kv_pairs));
621 }
622 None => {
623 all_sub_keys.clear();
624 break;
625 }
626 }
627 }
628 }
629
630 if all_sub_keys.is_empty() {
631 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
633 output_columns.push(col_values);
634 continue;
635 }
636
637 let num_sub_keys = all_sub_keys.len();
641 let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
642 let total_bits = num_sub_keys * num_rows;
643 let bitmap_bytes = (total_bits + 7) / 8;
644 let mut absence_bitmap = vec![0u8; bitmap_bytes];
645 let mut has_any_absent = false;
646
647 for (row_idx, parsed) in parsed_rows.iter().enumerate() {
648 match parsed {
649 Some(kv_pairs) => {
650 for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
651 let found = kv_pairs.iter().find(|(k, _)| k == sk);
652 match found {
653 Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
654 None => {
655 sub_columns[sk_idx].push(b"null".to_vec());
656 let bit_idx = sk_idx * num_rows + row_idx;
658 absence_bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
659 has_any_absent = true;
660 }
661 }
662 }
663 }
664 None => {
665 for sc in sub_columns.iter_mut() {
669 sc.push(b"null".to_vec());
670 }
671 }
672 }
673 }
674
675 nested_groups.push(NestedGroupInfo {
676 original_col_index: col_idx as u16,
677 sub_keys: all_sub_keys,
678 nested_template,
679 absence_bitmap: if has_any_absent {
680 absence_bitmap
681 } else {
682 Vec::new()
683 },
684 });
685
686 for sc in sub_columns {
687 output_columns.push(sc);
688 }
689 }
690
691 if nested_groups.is_empty() {
692 return None;
693 }
694
695 let num_out_cols = output_columns.len();
697 let mut out = Vec::new();
698 for (ci, col) in output_columns.iter().enumerate() {
699 for (ri, val) in col.iter().enumerate() {
700 out.extend_from_slice(val);
701 if ri < num_rows - 1 {
702 out.push(VAL_SEP);
703 }
704 }
705 if ci < num_out_cols - 1 {
706 out.push(COL_SEP);
707 }
708 }
709
710 Some((out, nested_groups))
711}
712
713#[allow(clippy::type_complexity)]
718pub(crate) fn parse_nested_object_with_template(
719 obj: &[u8],
720) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
721 let mut pos = 0;
722
723 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
725 pos += 1;
726 }
727 if pos >= obj.len() || obj[pos] != b'{' {
728 return None;
729 }
730 pos += 1;
731
732 let mut parts: Vec<Vec<u8>> = Vec::new();
733 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
734 let mut part_start = 0;
735
736 loop {
737 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
739 pos += 1;
740 }
741 if pos >= obj.len() {
742 return None;
743 }
744 if obj[pos] == b'}' {
745 parts.push(obj[part_start..].to_vec());
746 break;
747 }
748
749 if obj[pos] != b'"' {
751 return None;
752 }
753 let key_str_start = pos + 1;
754 pos += 1;
755 let mut escaped = false;
756 while pos < obj.len() {
757 if escaped {
758 escaped = false;
759 } else if obj[pos] == b'\\' {
760 escaped = true;
761 } else if obj[pos] == b'"' {
762 break;
763 }
764 pos += 1;
765 }
766 if pos >= obj.len() {
767 return None;
768 }
769 let key = obj[key_str_start..pos].to_vec();
770 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
774 pos += 1;
775 }
776 if pos >= obj.len() || obj[pos] != b':' {
777 return None;
778 }
779 pos += 1;
780
781 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
783 pos += 1;
784 }
785
786 parts.push(obj[part_start..pos].to_vec());
788
789 let value_start = pos;
791 let (value, value_end) = extract_value(obj, value_start)?;
793 pos = value_end;
794 pairs.push((key, value));
795
796 part_start = pos;
797
798 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
800 pos += 1;
801 }
802 if pos >= obj.len() {
803 return None;
804 }
805 if obj[pos] == b',' {
806 pos += 1;
807 } else if obj[pos] == b'}' {
808 parts.push(obj[part_start..].to_vec());
809 break;
810 } else {
811 return None;
812 }
813 }
814
815 if pairs.is_empty() {
816 return None;
817 }
818 Some((parts, pairs))
819}
820
821pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
825 let mut pos = 0;
826
827 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
829 pos += 1;
830 }
831 if pos >= obj.len() || obj[pos] != b'{' {
832 return None;
833 }
834 pos += 1;
835
836 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
837
838 loop {
839 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
841 pos += 1;
842 }
843 if pos >= obj.len() {
844 return None;
845 }
846 if obj[pos] == b'}' {
847 break;
848 }
849
850 if obj[pos] != b'"' {
852 return None;
853 }
854 pos += 1;
855 let key_start = pos;
856 let mut escaped = false;
857 while pos < obj.len() {
858 if escaped {
859 escaped = false;
860 } else if obj[pos] == b'\\' {
861 escaped = true;
862 } else if obj[pos] == b'"' {
863 break;
864 }
865 pos += 1;
866 }
867 if pos >= obj.len() {
868 return None;
869 }
870 let key = obj[key_start..pos].to_vec();
871 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
875 pos += 1;
876 }
877 if pos >= obj.len() || obj[pos] != b':' {
878 return None;
879 }
880 pos += 1;
881
882 let (value, value_end) = extract_value(obj, pos)?;
884 pos = value_end;
885 pairs.push((key, value));
886
887 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
889 pos += 1;
890 }
891 if pos >= obj.len() {
892 return None;
893 }
894 if obj[pos] == b',' {
895 pos += 1;
896 } else if obj[pos] == b'}' {
897 break;
898 } else {
899 return None;
900 }
901 }
902
903 if pairs.is_empty() {
904 return None;
905 }
906 Some(pairs)
907}
908
909pub(crate) fn unflatten_nested_columns(
914 flat_data: &[u8],
915 nested_groups: &[NestedGroupInfo],
916 num_rows: usize,
917 total_flat_cols: usize,
918) -> Vec<u8> {
919 let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
920 if flat_columns.len() != total_flat_cols {
921 return flat_data.to_vec();
922 }
923
924 let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
926 for chunk in &flat_columns {
927 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
928 if vals.len() != num_rows {
929 return flat_data.to_vec();
930 }
931 flat_col_values.push(vals);
932 }
933
934 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
937
938 let original_num_cols = total_flat_cols
947 - nested_groups
948 .iter()
949 .map(|g| g.sub_keys.len())
950 .sum::<usize>()
951 + nested_groups.len();
952
953 let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
955 for (gi, group) in nested_groups.iter().enumerate() {
956 if (group.original_col_index as usize) < original_num_cols {
957 original_col_map[group.original_col_index as usize] = Some(gi);
958 }
959 }
960
961 let mut flat_idx = 0;
962 for entry in original_col_map.iter().take(original_num_cols) {
963 if let Some(gi) = entry {
964 let group = &nested_groups[*gi];
965 let num_sub = group.sub_keys.len();
966
967 let is_absent = |si: usize, row: usize| -> bool {
969 if group.absence_bitmap.is_empty() {
970 return false; }
972 let bit_idx = si * num_rows + row;
973 let byte_idx = bit_idx / 8;
974 if byte_idx >= group.absence_bitmap.len() {
975 return false;
976 }
977 (group.absence_bitmap[byte_idx] >> (bit_idx % 8)) & 1 == 1
978 };
979
980 let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
982 for row in 0..num_rows {
983 let all_null = (0..num_sub).all(|si| {
986 flat_idx + si < flat_col_values.len()
987 && flat_col_values[flat_idx + si][row] == b"null"
988 });
989 if all_null && !group.absence_bitmap.is_empty() {
990 let any_present_null = (0..num_sub).any(|si| {
993 flat_col_values[flat_idx + si][row] == b"null" && !is_absent(si, row)
994 });
995 if any_present_null {
996 } else {
999 merged_col.push(b"null".to_vec());
1001 continue;
1002 }
1003 } else if all_null {
1004 merged_col.push(b"null".to_vec());
1005 continue;
1006 }
1007
1008 let has_absent = (0..num_sub).any(|si| is_absent(si, row));
1010
1011 if !has_absent
1012 && !group.nested_template.is_empty()
1013 && group.nested_template.len() == num_sub + 1
1014 {
1015 let mut obj = Vec::new();
1018 obj.extend_from_slice(&group.nested_template[0]);
1019 if flat_idx < flat_col_values.len() {
1020 obj.extend_from_slice(flat_col_values[flat_idx][row]);
1021 }
1022 for si in 1..num_sub {
1023 obj.extend_from_slice(&group.nested_template[si]);
1024 if flat_idx + si < flat_col_values.len() {
1025 obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
1026 }
1027 }
1028 obj.extend_from_slice(&group.nested_template[num_sub]);
1029 merged_col.push(obj);
1030 } else {
1031 let mut obj = Vec::new();
1034 obj.push(b'{');
1035 let mut first = true;
1036 for si in 0..num_sub {
1037 if flat_idx + si >= flat_col_values.len() {
1038 break;
1039 }
1040 if is_absent(si, row) {
1041 continue; }
1043 let val = flat_col_values[flat_idx + si][row];
1044 if !first {
1045 obj.push(b',');
1046 }
1047 first = false;
1048 obj.push(b'"');
1049 obj.extend_from_slice(&group.sub_keys[si]);
1050 obj.push(b'"');
1051 obj.push(b':');
1052 obj.extend_from_slice(val);
1053 }
1054 obj.push(b'}');
1055 merged_col.push(obj);
1056 }
1057 }
1058 output_columns.push(merged_col);
1059 flat_idx += num_sub;
1060 } else {
1061 if flat_idx < flat_col_values.len() {
1063 let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
1064 .iter()
1065 .map(|v| v.to_vec())
1066 .collect();
1067 output_columns.push(col);
1068 }
1069 flat_idx += 1;
1070 }
1071 }
1072
1073 let num_out_cols = output_columns.len();
1075 let mut out = Vec::new();
1076 for (ci, col) in output_columns.iter().enumerate() {
1077 for (ri, val) in col.iter().enumerate() {
1078 out.extend_from_slice(val);
1079 if ri < num_rows - 1 {
1080 out.push(VAL_SEP);
1081 }
1082 }
1083 if ci < num_out_cols - 1 {
1084 out.push(COL_SEP);
1085 }
1086 }
1087
1088 out
1089}
1090
1091pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1096 let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1097 let has_absence = groups.iter().any(|g| !g.absence_bitmap.is_empty());
1098 let mut out = Vec::new();
1099 let version = if has_absence {
1100 3u8
1101 } else if has_template {
1102 2u8
1103 } else {
1104 1u8
1105 };
1106 out.push(version);
1107 out.push(groups.len() as u8);
1108 for group in groups {
1109 out.extend_from_slice(&group.original_col_index.to_le_bytes());
1110 out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1111 for key in &group.sub_keys {
1112 out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1113 out.extend_from_slice(key);
1114 }
1115 if has_template || version == 3 {
1116 out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1117 for part in &group.nested_template {
1118 out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1119 out.extend_from_slice(part);
1120 }
1121 }
1122 if version == 3 {
1123 let bm_len = group.absence_bitmap.len() as u32;
1124 out.extend_from_slice(&bm_len.to_le_bytes());
1125 out.extend_from_slice(&group.absence_bitmap);
1126 }
1127 }
1128 out
1129}
1130
1131pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1136 if data.is_empty() {
1137 return None;
1138 }
1139 let mut pos = 0;
1140 let version = data[pos];
1141 pos += 1;
1142 if version != 1 && version != 2 && version != 3 {
1143 return None;
1144 }
1145 let has_template = version == 2 || version == 3;
1146 let has_absence = version == 3;
1147 if pos >= data.len() {
1148 return None;
1149 }
1150 let num_groups = data[pos] as usize;
1151 pos += 1;
1152
1153 let mut groups = Vec::with_capacity(num_groups);
1154 for _ in 0..num_groups {
1155 if pos + 4 > data.len() {
1156 return None;
1157 }
1158 let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1159 pos += 2;
1160 let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1161 pos += 2;
1162
1163 let mut sub_keys = Vec::with_capacity(num_sub_cols);
1164 for _ in 0..num_sub_cols {
1165 if pos + 2 > data.len() {
1166 return None;
1167 }
1168 let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1169 pos += 2;
1170 if pos + key_len > data.len() {
1171 return None;
1172 }
1173 sub_keys.push(data[pos..pos + key_len].to_vec());
1174 pos += key_len;
1175 }
1176
1177 let nested_template = if has_template {
1178 if pos + 2 > data.len() {
1179 return None;
1180 }
1181 let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1182 pos += 2;
1183 let mut parts = Vec::with_capacity(num_parts);
1184 for _ in 0..num_parts {
1185 if pos + 2 > data.len() {
1186 return None;
1187 }
1188 let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1189 pos += 2;
1190 if pos + part_len > data.len() {
1191 return None;
1192 }
1193 parts.push(data[pos..pos + part_len].to_vec());
1194 pos += part_len;
1195 }
1196 parts
1197 } else {
1198 Vec::new()
1199 };
1200
1201 let absence_bitmap = if has_absence {
1202 if pos + 4 > data.len() {
1203 return None;
1204 }
1205 let bm_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1206 pos += 4;
1207 if pos + bm_len > data.len() {
1208 return None;
1209 }
1210 let bm = data[pos..pos + bm_len].to_vec();
1211 pos += bm_len;
1212 bm
1213 } else {
1214 Vec::new()
1215 };
1216
1217 groups.push(NestedGroupInfo {
1218 original_col_index,
1219 sub_keys,
1220 nested_template,
1221 absence_bitmap,
1222 });
1223 }
1224
1225 Some((groups, pos))
1226}
1227
1228pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
1233 if data.is_empty() {
1234 return None;
1235 }
1236
1237 let has_trailing_newline = data.last() == Some(&b'\n');
1238 let lines = split_lines(data);
1239 let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
1240
1241 if non_empty.len() < 2 {
1242 return None;
1243 }
1244
1245 if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
1247 if col_data.len() + metadata.len() < data.len() {
1248 let num_rows = non_empty.len();
1253 if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
1254 let nested_bytes = serialize_nested_info(&nested_groups);
1256 metadata.extend_from_slice(&nested_bytes);
1257 return Some(TransformResult {
1258 data: flat_data,
1259 metadata,
1260 });
1261 }
1262 metadata.push(0u8); return Some(TransformResult {
1265 data: col_data,
1266 metadata,
1267 });
1268 }
1269 }
1270
1271 if let Some((grouped_data, grouped_metadata)) =
1273 preprocess_grouped(&non_empty, has_trailing_newline)
1274 {
1275 if grouped_data.len() + grouped_metadata.len() < data.len() {
1276 return Some(TransformResult {
1277 data: grouped_data,
1278 metadata: grouped_metadata,
1279 });
1280 }
1281 }
1282
1283 None
1284}
1285
1286pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1289 if metadata.is_empty() {
1290 return data.to_vec();
1291 }
1292 match metadata[0] {
1293 METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1294 METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1295 _ => data.to_vec(),
1296 }
1297}
1298
1299fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1301 if metadata.len() < 10 {
1302 return data.to_vec();
1303 }
1304 let mut pos = 0;
1305 let _version = metadata[pos];
1306 pos += 1;
1307 let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1308 pos += 4;
1309 let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1310 pos += 2;
1311 let has_trailing_newline = metadata[pos] != 0;
1312 pos += 1;
1313 let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1314 pos += 2;
1315
1316 let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
1317 for _ in 0..num_parts {
1318 if pos + 2 > metadata.len() {
1319 return data.to_vec();
1320 }
1321 let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1322 pos += 2;
1323 if pos + part_len > metadata.len() {
1324 return data.to_vec();
1325 }
1326 parts.push(metadata[pos..pos + part_len].to_vec());
1327 pos += part_len;
1328 }
1329
1330 if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
1331 return data.to_vec();
1332 }
1333
1334 let remaining_metadata = &metadata[pos..];
1336 if !remaining_metadata.is_empty()
1337 && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2 || remaining_metadata[0] == 3)
1338 {
1339 if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
1341 let total_flat_cols = data.split(|&b| b == COL_SEP).count();
1343 let unflattened =
1344 unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
1345 return reverse_uniform_from_parts(
1346 &unflattened,
1347 &parts,
1348 num_rows,
1349 num_cols,
1350 has_trailing_newline,
1351 );
1352 }
1353 }
1354
1355 reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
1356}
1357
1358fn reverse_uniform_from_parts(
1360 data: &[u8],
1361 parts: &[Vec<u8>],
1362 num_rows: usize,
1363 num_cols: usize,
1364 has_trailing_newline: bool,
1365) -> Vec<u8> {
1366 let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
1367 if col_chunks.len() != num_cols {
1368 return data.to_vec();
1369 }
1370
1371 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1372 for chunk in &col_chunks {
1373 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1374 if vals.len() != num_rows {
1375 return data.to_vec();
1376 }
1377 columns.push(vals);
1378 }
1379
1380 let mut output = Vec::with_capacity(data.len() * 2);
1381 #[allow(clippy::needless_range_loop)]
1382 for row in 0..num_rows {
1383 output.extend_from_slice(&parts[0]);
1384 output.extend_from_slice(columns[0][row]);
1385 for col in 1..num_cols {
1386 output.extend_from_slice(&parts[col]);
1387 output.extend_from_slice(columns[col][row]);
1388 }
1389 output.extend_from_slice(&parts[num_cols]);
1390
1391 if row < num_rows - 1 || has_trailing_newline {
1392 output.push(b'\n');
1393 }
1394 }
1395
1396 output
1397}
1398
/// Parses a uniform-layout metadata header as written by `build_uniform_columnar`.
///
/// Byte 0 (the layout tag) is not checked. Bytes after the last template part
/// are ignored.
///
/// Returns `(parts, num_rows, num_cols, has_trailing_newline)`, or `None` if:
/// - the header is shorter than 10 bytes
/// - a template part is truncated
/// - the part count is not `num_cols + 1`
/// - either count is zero
fn parse_uniform_metadata(metadata: &[u8]) -> Option<(Vec<Vec<u8>>, usize, usize, bool)> {
    const HEADER_LEN: usize = 10;
    if metadata.len() < HEADER_LEN {
        return None;
    }

    let le_u16 = |at: usize| usize::from(u16::from_le_bytes([metadata[at], metadata[at + 1]]));
    let num_rows =
        u32::from_le_bytes([metadata[1], metadata[2], metadata[3], metadata[4]]) as usize;
    let num_cols = le_u16(5);
    let has_trailing_newline = metadata[7] != 0;
    let num_parts = le_u16(8);

    let mut parts = Vec::with_capacity(num_parts);
    let mut cursor = HEADER_LEN;
    for _ in 0..num_parts {
        let len_bytes = metadata.get(cursor..cursor + 2)?;
        let part_len = usize::from(u16::from_le_bytes([len_bytes[0], len_bytes[1]]));
        cursor += 2;
        parts.push(metadata.get(cursor..cursor + part_len)?.to_vec());
        cursor += part_len;
    }

    let consistent = num_rows != 0 && num_cols != 0 && parts.len() == num_cols + 1;
    if consistent {
        Some((parts, num_rows, num_cols, has_trailing_newline))
    } else {
        None
    }
}
1435
1436fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1438 if metadata.len() < 8 {
1439 return data.to_vec();
1440 }
1441
1442 let mut mpos = 1; let has_trailing_newline = metadata[mpos] != 0;
1444 mpos += 1;
1445 let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1446 mpos += 4;
1447 let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
1448 mpos += 2;
1449
1450 let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];
1452
1453 let mut dpos: usize = 0;
1455
1456 for _ in 0..num_groups {
1457 if mpos + 4 > metadata.len() {
1459 return data.to_vec();
1460 }
1461 let group_row_count =
1462 u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1463 mpos += 4;
1464
1465 let mut row_indices = Vec::with_capacity(group_row_count);
1466 for _ in 0..group_row_count {
1467 if mpos + 4 > metadata.len() {
1468 return data.to_vec();
1469 }
1470 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1471 mpos += 4;
1472 row_indices.push(idx);
1473 }
1474
1475 if mpos + 4 > metadata.len() {
1477 return data.to_vec();
1478 }
1479 let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1480 mpos += 4;
1481 if mpos + gm_len > metadata.len() {
1482 return data.to_vec();
1483 }
1484 let group_metadata = &metadata[mpos..mpos + gm_len];
1485 mpos += gm_len;
1486
1487 if dpos + 4 > data.len() {
1489 return data.to_vec();
1490 }
1491 let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
1492 dpos += 4;
1493 if dpos + gd_len > data.len() {
1494 return data.to_vec();
1495 }
1496 let group_data = &data[dpos..dpos + gd_len];
1497 dpos += gd_len;
1498
1499 let (parts, num_rows, num_cols, _trailing) = match parse_uniform_metadata(group_metadata) {
1501 Some(v) => v,
1502 None => return data.to_vec(),
1503 };
1504
1505 if num_rows != group_row_count {
1506 return data.to_vec();
1507 }
1508
1509 let col_chunks: Vec<&[u8]> = group_data.split(|&b| b == COL_SEP).collect();
1511 if col_chunks.len() != num_cols {
1512 return data.to_vec();
1513 }
1514
1515 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1516 for chunk in &col_chunks {
1517 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1518 if vals.len() != num_rows {
1519 return data.to_vec();
1520 }
1521 columns.push(vals);
1522 }
1523
1524 for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1526 let mut line = Vec::new();
1527 line.extend_from_slice(&parts[0]);
1528 line.extend_from_slice(columns[0][row_within_group]);
1529 for col in 1..num_cols {
1530 line.extend_from_slice(&parts[col]);
1531 line.extend_from_slice(columns[col][row_within_group]);
1532 }
1533 line.extend_from_slice(&parts[num_cols]);
1534
1535 if original_idx < total_rows {
1536 output_lines[original_idx] = Some(line);
1537 }
1538 }
1539 }
1540
1541 if mpos + 4 > metadata.len() {
1543 return data.to_vec();
1544 }
1545 let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1546 mpos += 4;
1547
1548 let mut residual_indices = Vec::with_capacity(residual_count);
1549 for _ in 0..residual_count {
1550 if mpos + 4 > metadata.len() {
1551 return data.to_vec();
1552 }
1553 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1554 mpos += 4;
1555 residual_indices.push(idx);
1556 }
1557
1558 let residual_data = &data[dpos..];
1560 if residual_count > 0 {
1561 let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
1562 vec![]
1563 } else {
1564 residual_data.split(|&b| b == b'\n').collect()
1565 };
1566 if residual_lines.len() != residual_count {
1568 return data.to_vec();
1569 }
1570 for (i, &idx) in residual_indices.iter().enumerate() {
1571 if idx < total_rows {
1572 output_lines[idx] = Some(residual_lines[i].to_vec());
1573 }
1574 }
1575 }
1576
1577 let mut output = Vec::with_capacity(data.len() * 2);
1579 for (i, slot) in output_lines.iter().enumerate() {
1580 match slot {
1581 Some(line) => output.extend_from_slice(line),
1582 None => {
1583 return data.to_vec();
1585 }
1586 }
1587 if i < total_rows - 1 || has_trailing_newline {
1588 output.push(b'\n');
1589 }
1590 }
1591
1592 output
1593}
1594
#[cfg(test)]
mod tests {
    // Unit tests for the value/line parsers plus byte-exact round-trip tests
    // (`reverse(preprocess(x)) == x`) for both the uniform and grouped strategies.
    use super::*;

    // --- extract_value: returns the raw value bytes and the index just past it ---

    #[test]
    fn extract_value_string() {
        // Quoted strings are returned including their quotes.
        let line = br#""hello","next""#;
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"\"hello\"");
        assert_eq!(end, 7);
    }

    #[test]
    fn extract_value_number() {
        // Bare scalars stop at the following ',' delimiter.
        let line = b"42,next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"42");
        assert_eq!(end, 2);
    }

    #[test]
    fn extract_value_bool() {
        let line = b"true,next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"true");
        assert_eq!(end, 4);
    }

    #[test]
    fn extract_value_null() {
        let line = b"null,next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"null");
        assert_eq!(end, 4);
    }

    #[test]
    fn extract_value_object() {
        // Objects are captured through the matching closing brace, commas included.
        let line = br#"{"a":1,"b":"x"},next"#;
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
        assert_eq!(end, 15);
    }

    #[test]
    fn extract_value_array() {
        let line = b"[1,2,3],next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"[1,2,3]");
        assert_eq!(end, 7);
    }

    #[test]
    fn extract_value_string_with_escapes() {
        // An escaped quote must not terminate the string.
        let line = br#""he\"llo",next"#;
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, br#""he\"llo""#.to_vec());
        assert_eq!(end, 9);
    }

    // --- parse_line: splits a row into literal template parts and values ---

    #[test]
    fn parse_line_simple() {
        // Two values produce three literal parts surrounding them.
        let line = br#"{"a":1,"b":"x"}"#;
        let (parts, values) = parse_line(line).unwrap();
        assert_eq!(parts.len(), 3);
        assert_eq!(values.len(), 2);
        assert_eq!(values[0], b"1");
        assert_eq!(values[1], b"\"x\"");
        assert_eq!(parts[0], br#"{"a":"#.to_vec());
        assert_eq!(parts[1], br#","b":"#.to_vec());
        assert_eq!(parts[2], b"}");
    }

    // --- Uniform-schema round trips ---

    #[test]
    fn roundtrip_simple() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        // Lossy-string comparison first for a readable diff on failure.
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn roundtrip_no_trailing_newline() {
        // The final row has no '\n'; the output must not gain one.
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn roundtrip_nested_values() {
        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
{"id":4,"meta":{"x":70,"y":80}}
{"id":5,"meta":{"x":90,"y":100}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn roundtrip_mixed_types() {
        // Strings, numbers, booleans, nulls and arrays in the same schema.
        let data = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    // --- Inputs that preprocess is expected to decline ---

    #[test]
    fn schema_mismatch_too_few_returns_none() {
        // Two rows with different key sets: too few rows to form any group.
        let data = br#"{"a":1,"b":2}
{"a":1,"c":3}
"#;
        assert!(preprocess(data).is_none());
    }

    #[test]
    fn different_num_keys_too_few_returns_none() {
        let data = br#"{"a":1,"b":2}
{"a":1}
"#;
        assert!(preprocess(data).is_none());
    }

    #[test]
    fn single_line_returns_none() {
        let data = br#"{"a":1,"b":2}
"#;
        assert!(preprocess(data).is_none());
    }

    #[test]
    fn empty_returns_none() {
        assert!(preprocess(b"").is_none());
    }

    #[test]
    fn column_layout_groups_similar_values() {
        let data = br#"{"type":"page_view","user":"alice"}
{"type":"api_call","user":"alice"}
{"type":"click","user":"bob"}
"#;
        let result = preprocess(data).unwrap();

        // Transformed data is column-major: columns separated by COL_SEP...
        let col_data = &result.data;
        let cols: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
        assert_eq!(cols.len(), 2);

        // ...and values within a column separated by VAL_SEP, in row order.
        let type_vals: Vec<&[u8]> = cols[0].split(|&b| b == VAL_SEP).collect();
        assert_eq!(type_vals.len(), 3);
        assert_eq!(type_vals[0], br#""page_view""#);
        assert_eq!(type_vals[1], br#""api_call""#);
        assert_eq!(type_vals[2], br#""click""#);

        let user_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
        assert_eq!(user_vals.len(), 3);
        assert_eq!(user_vals[0], br#""alice""#);
        assert_eq!(user_vals[1], br#""alice""#);
        assert_eq!(user_vals[2], br#""bob""#);
    }

    #[test]
    fn roundtrip_string_with_escaped_chars() {
        // Raw string: the backslash escapes are literal bytes in the JSON text.
        let data = br#"{"msg":"he said \"hi\"","val":1}
{"msg":"line\nbreak","val":2}
{"msg":"tab\there","val":3}
{"msg":"back\\slash","val":4}
{"msg":"normal text","val":5}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn roundtrip_negative_and_float_numbers() {
        let data = br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
{"x":0.001,"y":999}
{"x":-100,"y":-200}
{"x":42.0,"y":7}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn reverse_roundtrip_small_data() {
        // Sanity-check the line parser on a negative float first.
        let (parts, vals) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
        assert_eq!(vals.len(), 2);
        assert_eq!(parts.len(), 3);

        // 20 copies of a two-row block = 40 rows.
        let big_data = br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
"#
        .repeat(20);
        let result = preprocess(&big_data).expect("should produce transform with 40 rows");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, big_data);
    }

    // --- Grouped (multi-schema) round trips ---

    #[test]
    fn grouped_roundtrip_two_schemas() {
        // Two contiguous blocks of 10 rows, each with a different key set.
        let mut data = Vec::new();
        for i in 0..10 {
            data.extend_from_slice(
                format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
            );
            data.push(b'\n');
        }
        for i in 10..20 {
            data.extend_from_slice(
                format!(
                    r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
                    i, i, i
                )
                .as_bytes(),
            );
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    #[test]
    fn grouped_roundtrip_interleaved_schemas() {
        // Schemas alternate row by row, so original row order must be restored
        // from the stored indices.
        let mut data = Vec::new();
        for i in 0..20 {
            if i % 2 == 0 {
                data.extend_from_slice(
                    format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
                );
            } else {
                data.extend_from_slice(
                    format!(
                        r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
                        i, i, i
                    )
                    .as_bytes(),
                );
            }
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    #[test]
    fn grouped_roundtrip_with_residuals() {
        let mut data = Vec::new();
        // 8 rows of one schema.
        for i in 0..8 {
            data.extend_from_slice(format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
            data.push(b'\n');
        }
        // Two one-off rows whose schemas appear only once (stored as residuals).
        data.extend_from_slice(br#"{"x":1,"y":2,"z":3}"#);
        data.push(b'\n');
        data.extend_from_slice(br#"{"p":"q"}"#);
        data.push(b'\n');
        // 6 rows of a second schema.
        for i in 0..6 {
            data.extend_from_slice(format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes());
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(&data),
        );
        assert_eq!(restored, data);
    }

    #[test]
    fn grouped_roundtrip_no_trailing_newline() {
        let mut data = Vec::new();
        for i in 0..6 {
            data.extend_from_slice(format!(r#"{{"id":{},"type":"push"}}"#, i).as_bytes());
            data.push(b'\n');
        }
        for i in 0..6 {
            data.extend_from_slice(
                format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i).as_bytes(),
            );
            // Omit the newline after the very last row.
            if i < 5 {
                data.push(b'\n');
            }
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    #[test]
    fn uniform_still_preferred_over_grouped() {
        // A single schema should use the uniform format even with enough rows to group.
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
{"a":4,"b":"w"}
{"a":5,"b":"v"}
"#;
        let result = preprocess(data).expect("should produce transform");
        assert_eq!(
            result.metadata[0], METADATA_VERSION_UNIFORM,
            "uniform schema should use Strategy 1"
        );
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn grouped_gharchive_simulation() {
        // GH Archive–like events: every 5th row has an extra "org" field.
        let mut data = Vec::new();
        for i in 0..50 {
            if i % 5 == 0 {
                data.extend_from_slice(
                    format!(
                        r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
                        i, i, i, i
                    )
                    .as_bytes(),
                );
            } else {
                data.extend_from_slice(
                    format!(
                        r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
                        i, i, i
                    )
                    .as_bytes(),
                );
            }
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    // --- Nested-object flattening ---

    #[test]
    fn test_nested_decomposition_basic() {
        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#;
        let result = preprocess(data).expect("should produce transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_UNIFORM);

        // The nested object's fields become their own columns: id, x, y.
        let cols: Vec<&[u8]> = result.data.split(|&b| b == COL_SEP).collect();
        assert_eq!(
            cols.len(),
            3,
            "should have 3 columns after flattening: got {}",
            cols.len()
        );

        let meta_x_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
        assert_eq!(meta_x_vals, vec![b"10".as_slice(), b"30", b"50"]);

        let meta_y_vals: Vec<&[u8]> = cols[2].split(|&b| b == VAL_SEP).collect();
        assert_eq!(meta_y_vals, vec![b"20".as_slice(), b"40", b"60"]);
    }

    #[test]
    fn test_nested_roundtrip() {
        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn test_nested_mixed_schemas() {
        // Same top-level keys, but the nested object's keys differ between rows.
        let data = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","meta":{"query":"pricing","results_count":25}}
{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
{"ts":"e","meta":{"query":"api docs","results_count":41}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn test_nested_no_nested_objects() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());

        // The final metadata byte is the has_nested flag (per the assertion message).
        let meta = &result.metadata;
        let last_byte = meta[meta.len() - 1];
        assert_eq!(last_byte, 0, "should have has_nested=0 for flat data");
    }

    #[test]
    fn test_nested_real_corpus() {
        // Every row has a different nested-object shape.
        let data = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn test_nested_roundtrip_with_null_values() {
        // The "meta" field alternates between an object and null.
        let data = br#"{"id":1,"meta":{"x":10}}
{"id":2,"meta":null}
{"id":3,"meta":{"x":30}}
{"id":4,"meta":null}
{"id":5,"meta":{"x":50}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn test_nested_string_values_preserved_exact() {
        let data = br#"{"id":1,"meta":{"name":"Alice","score":100}}
{"id":2,"meta":{"name":"Bob","score":200}}
{"id":3,"meta":{"name":"Charlie","score":300}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn test_parse_nested_object_kv() {
        // Keys come back without quotes; values keep their raw JSON form.
        let obj = br#"{"query":"benchmark","results_count":14}"#;
        let pairs = parse_nested_object_kv(obj).unwrap();
        assert_eq!(pairs.len(), 2);
        assert_eq!(pairs[0].0, b"query");
        assert_eq!(pairs[0].1, br#""benchmark""#.to_vec());
        assert_eq!(pairs[1].0, b"results_count");
        assert_eq!(pairs[1].1, b"14");
    }

    #[test]
    fn test_nested_varying_subkeys_roundtrip() {
        // Even rows carry an extra nested key that odd rows lack.
        let mut lines = Vec::new();
        for i in 0..50 {
            let line = if i % 2 == 0 {
                format!("{{\"id\":{},\"meta\":{{\"x\":{},\"extra\":{}}}}}", i, i, i)
            } else {
                format!("{{\"id\":{},\"meta\":{{\"x\":{}}}}}", i, i)
            };
            lines.push(line);
        }
        let ndjson = lines.join("\n") + "\n";
        let data = ndjson.as_bytes();

        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            std::str::from_utf8(&restored).unwrap(),
            std::str::from_utf8(data).unwrap(),
            "varying sub-keys roundtrip must be byte-exact"
        );
    }

    #[test]
    fn test_nested_explicit_null_preserved() {
        // `\` line continuations: leading whitespace on the next line is not part
        // of the byte string.
        let data = b"{\"id\":1,\"meta\":{\"x\":1,\"y\":null}}\n\
                     {\"id\":2,\"meta\":{\"x\":2,\"y\":null}}\n\
                     {\"id\":3,\"meta\":{\"x\":3,\"y\":null}}\n";
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            std::str::from_utf8(&restored).unwrap(),
            std::str::from_utf8(data).unwrap(),
            "explicit null values must be preserved"
        );
    }
}