1use super::transform::TransformResult;
24use std::collections::HashMap;
25
/// Byte placed between columns in the columnar payload.
const COL_SEP: u8 = b'\0';
/// Byte placed between the values of one column.
const VAL_SEP: u8 = b'\x01';
/// First metadata byte of the uniform (single shared template) layout.
const METADATA_VERSION_UNIFORM: u8 = 1;
/// First metadata byte of the grouped (one template per group) layout.
const METADATA_VERSION_GROUPED: u8 = 2;

/// Minimum number of lines: both for the grouped layout to be attempted at all and
/// for a template bucket to become a group (smaller buckets stay residual lines).
const MIN_GROUP_ROWS: usize = 5;

/// A template's fragments plus the `(line index, values)` of every line using it.
type SchemaGroup = (Vec<Vec<u8>>, Vec<(usize, Vec<Vec<u8>>)>);
36
/// Reads one JSON value from `line`, starting at `pos` after skipping ASCII whitespace.
///
/// Returns the value's raw bytes together with the index just past it.
/// - Strings keep their quotes; backslash escapes are honoured when looking for the
///   closing quote.
/// - Objects (`{...}`) and arrays (`[...]`) keep their brackets. Only the bracket kind
///   that opened the value is counted, and quoted strings inside are skipped.
/// - Any other token runs until `,`, `}`, `]` or whitespace and is not validated.
///
/// Returns `None` when only whitespace remains (or `pos` is past the end), when a
/// string or bracket group is never closed, or when the bare token would be empty.
fn extract_value(line: &[u8], mut pos: usize) -> Option<(Vec<u8>, usize)> {
    /// Index of the unescaped `"` that closes a string whose body starts at `from`,
    /// or `line.len()` when the string never closes.
    fn closing_quote(line: &[u8], from: usize) -> usize {
        let mut i = from;
        let mut escaped = false;
        while i < line.len() {
            match (escaped, line[i]) {
                (true, _) => escaped = false,
                (false, b'\\') => escaped = true,
                (false, b'"') => return i,
                _ => {}
            }
            i += 1;
        }
        i
    }

    /// Exclusive end index of the bracket group opened at `start`, or `None` when
    /// the `open`/`close` brackets never balance.
    fn group_end(line: &[u8], start: usize, open: u8, close: u8) -> Option<usize> {
        let mut depth = 1usize;
        let mut i = start + 1;
        while i < line.len() && depth > 0 {
            let byte = line[i];
            if byte == b'"' {
                // Jump to the closing quote so brackets inside strings are ignored.
                i = closing_quote(line, i + 1);
            } else if byte == open {
                depth += 1;
            } else if byte == close {
                depth -= 1;
            }
            i += 1;
        }
        (depth == 0 && i <= line.len()).then_some(i)
    }

    while pos < line.len() && line[pos].is_ascii_whitespace() {
        pos += 1;
    }
    let start = pos;

    let end = match *line.get(start)? {
        b'"' => {
            let close = closing_quote(line, start + 1);
            if close >= line.len() {
                return None;
            }
            close + 1
        }
        b'{' => group_end(line, start, b'{', b'}')?,
        b'[' => group_end(line, start, b'[', b']')?,
        _ => {
            let token_len = line[start..]
                .iter()
                .take_while(|&&b| !matches!(b, b',' | b'}' | b']') && !b.is_ascii_whitespace())
                .count();
            if token_len == 0 {
                return None;
            }
            start + token_len
        }
    };

    Some((line[start..end].to_vec(), end))
}
150
151type ParsedLine = (Vec<Vec<u8>>, Vec<Vec<u8>>);
162
163fn parse_line(line: &[u8]) -> Option<ParsedLine> {
164 let mut pos = 0;
165
166 while pos < line.len() && line[pos].is_ascii_whitespace() {
168 pos += 1;
169 }
170 if pos >= line.len() || line[pos] != b'{' {
171 return None;
172 }
173
174 let mut parts: Vec<Vec<u8>> = Vec::new();
175 let mut values: Vec<Vec<u8>> = Vec::new();
176 let mut part_start = 0;
177
178 pos += 1; loop {
181 while pos < line.len() && line[pos].is_ascii_whitespace() {
183 pos += 1;
184 }
185 if pos >= line.len() {
186 return None;
187 }
188
189 if line[pos] == b'}' {
191 parts.push(line[part_start..].to_vec());
193 break;
194 }
195
196 if line[pos] != b'"' {
198 return None;
199 }
200 pos += 1;
202 let mut escaped = false;
203 while pos < line.len() {
204 if escaped {
205 escaped = false;
206 } else if line[pos] == b'\\' {
207 escaped = true;
208 } else if line[pos] == b'"' {
209 pos += 1;
210 break;
211 }
212 pos += 1;
213 }
214
215 while pos < line.len() && line[pos].is_ascii_whitespace() {
217 pos += 1;
218 }
219 if pos >= line.len() || line[pos] != b':' {
220 return None;
221 }
222 pos += 1; while pos < line.len() && line[pos].is_ascii_whitespace() {
228 pos += 1;
229 }
230
231 parts.push(line[part_start..pos].to_vec());
233
234 let (value, value_end) = extract_value(line, pos)?;
236 values.push(value);
237 pos = value_end;
238
239 part_start = pos;
241
242 while pos < line.len() && line[pos].is_ascii_whitespace() {
244 pos += 1;
245 }
246 if pos >= line.len() {
247 return None;
248 }
249
250 if line[pos] == b',' {
252 pos += 1;
253 } else if line[pos] == b'}' {
254 parts.push(line[part_start..].to_vec());
258 break;
259 } else {
260 return None; }
262 }
263
264 if values.is_empty() {
265 return None;
266 }
267
268 Some((parts, values))
269}
270
/// Splits `data` on `\n` without keeping the separators.
///
/// A final `\n` does not start an extra line, so `b"a\nb\n"` yields `["a", "b"]` and
/// empty input yields no lines. Empty lines in the middle (and a trailing extra
/// empty line for `"...\n\n"`) are kept as empty slices.
fn split_lines(data: &[u8]) -> Vec<&[u8]> {
    let mut pieces: Vec<&[u8]> = data.split(|&byte| byte == b'\n').collect();
    // `split` always yields one piece after the last separator (and one piece for
    // empty input); when that piece is empty it is not a real line.
    if matches!(pieces.last(), Some(piece) if piece.is_empty()) {
        pieces.pop();
    }
    pieces
}
286
287fn build_uniform_columnar(
290 template_parts: &[Vec<u8>],
291 columns: &[Vec<Vec<u8>>],
292 num_rows: usize,
293 has_trailing_newline: bool,
294) -> (Vec<u8>, Vec<u8>) {
295 let num_cols = columns.len();
296
297 let mut col_data = Vec::new();
299 for (ci, col) in columns.iter().enumerate() {
300 for (ri, val) in col.iter().enumerate() {
301 col_data.extend_from_slice(val);
302 if ri < num_rows - 1 {
303 col_data.push(VAL_SEP);
304 }
305 }
306 if ci < num_cols - 1 {
307 col_data.push(COL_SEP);
308 }
309 }
310
311 let mut metadata = Vec::new();
313 metadata.push(METADATA_VERSION_UNIFORM);
314 metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
315 metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
316 metadata.push(if has_trailing_newline { 1 } else { 0 });
317 metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
318 for part in template_parts {
319 metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
320 metadata.extend_from_slice(part);
321 }
322
323 (col_data, metadata)
324}
325
326fn preprocess_uniform(
329 non_empty: &[&[u8]],
330 has_trailing_newline: bool,
331) -> Option<(Vec<u8>, Vec<u8>)> {
332 if non_empty.len() < 2 {
333 return None;
334 }
335
336 let (template_parts, first_values) = parse_line(non_empty[0])?;
337 let num_cols = first_values.len();
338 if template_parts.len() != num_cols + 1 {
339 return None;
340 }
341
342 let mut columns: Vec<Vec<Vec<u8>>> = Vec::with_capacity(num_cols);
343 for v in &first_values {
344 columns.push(vec![v.clone()]);
345 }
346
347 for &line in &non_empty[1..] {
348 let (parts, values) = parse_line(line)?;
349 if values.len() != num_cols || parts.len() != template_parts.len() {
350 return None;
351 }
352 for (a, b) in parts.iter().zip(template_parts.iter()) {
353 if a != b {
354 return None;
355 }
356 }
357 for (col, val) in values.iter().enumerate() {
358 columns[col].push(val.clone());
359 }
360 }
361
362 Some(build_uniform_columnar(
363 &template_parts,
364 &columns,
365 non_empty.len(),
366 has_trailing_newline,
367 ))
368}
369
370fn preprocess_grouped(
391 non_empty: &[&[u8]],
392 has_trailing_newline: bool,
393) -> Option<(Vec<u8>, Vec<u8>)> {
394 if non_empty.len() < MIN_GROUP_ROWS {
395 return None;
396 }
397
398 let mut parsed: Vec<Option<ParsedLine>> = Vec::with_capacity(non_empty.len());
401 for &line in non_empty {
402 parsed.push(parse_line(line));
403 }
404
405 let mut group_map: HashMap<Vec<u8>, SchemaGroup> = HashMap::new();
408 let mut residual_indices: Vec<usize> = Vec::new();
409
410 for (idx, parsed_line) in parsed.into_iter().enumerate() {
411 if let Some((parts, values)) = parsed_line {
412 let mut key = Vec::new();
414 for part in &parts {
415 key.extend_from_slice(&(part.len() as u32).to_le_bytes());
416 key.extend_from_slice(part);
417 }
418 group_map
419 .entry(key)
420 .or_insert_with(|| (parts, Vec::new()))
421 .1
422 .push((idx, values));
423 } else {
424 residual_indices.push(idx);
426 }
427 }
428
429 let mut groups: Vec<SchemaGroup> = Vec::new();
431 for (_key, (template_parts, rows)) in group_map {
432 if rows.len() >= MIN_GROUP_ROWS {
433 groups.push((template_parts, rows));
434 } else {
435 for (idx, _) in &rows {
437 residual_indices.push(*idx);
438 }
439 }
440 }
441
442 if groups.is_empty() {
444 return None;
445 }
446
447 groups.sort_by_key(|(_, rows)| rows[0].0);
449 residual_indices.sort_unstable();
450
451 struct GroupOutput {
453 row_indices: Vec<u32>,
454 col_data: Vec<u8>,
455 group_metadata: Vec<u8>,
456 }
457
458 let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
459
460 for (template_parts, rows) in &groups {
461 let num_cols = template_parts.len() - 1;
462 let mut columns: Vec<Vec<Vec<u8>>> = (0..num_cols).map(|_| Vec::new()).collect();
463 let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
464
465 for (idx, values) in rows {
466 row_indices.push(*idx as u32);
467 for (col, val) in values.iter().enumerate() {
468 columns[col].push(val.clone());
469 }
470 }
471
472 let (col_data, group_metadata) =
474 build_uniform_columnar(template_parts, &columns, rows.len(), false);
475
476 group_outputs.push(GroupOutput {
477 row_indices,
478 col_data,
479 group_metadata,
480 });
481 }
482
483 let mut data_out = Vec::new();
485 for group in &group_outputs {
486 data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
487 data_out.extend_from_slice(&group.col_data);
488 }
489
490 let residual_start = data_out.len();
492 for (i, &idx) in residual_indices.iter().enumerate() {
493 data_out.extend_from_slice(non_empty[idx]);
494 if i < residual_indices.len() - 1 {
495 data_out.push(b'\n');
496 }
497 }
498 let _residual_len = data_out.len() - residual_start;
499
500 let mut metadata = Vec::new();
502 metadata.push(METADATA_VERSION_GROUPED);
503 metadata.push(if has_trailing_newline { 1 } else { 0 });
504 metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
505 metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
506
507 for group in &group_outputs {
508 metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
509 for &idx in &group.row_indices {
510 metadata.extend_from_slice(&idx.to_le_bytes());
511 }
512 metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
513 metadata.extend_from_slice(&group.group_metadata);
514 }
515
516 metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
517 for &idx in &residual_indices {
518 metadata.extend_from_slice(&(idx as u32).to_le_bytes());
519 }
520
521 Some((data_out, metadata))
522}
523
/// Describes one original column that `flatten_nested_columns` split into one
/// column per sub-key, with what `unflatten_nested_columns` needs to rebuild it.
pub(crate) struct NestedGroupInfo {
    /// Position of the column in the un-flattened column list.
    pub(crate) original_col_index: u16,
    /// Sub-keys in the order of their flat columns; raw bytes from between the
    /// quotes, with escapes left undecoded.
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Layout fragments of the column's first non-`null` object (see
    /// `parse_nested_object_with_template`). Rows are rebuilt from it byte-for-byte
    /// only when it has `sub_keys.len() + 1` fragments and no sub-key is absent.
    pub(crate) nested_template: Vec<Vec<u8>>,
    /// Bit `sub_key_index * num_rows + row` (LSB-first within each byte) set means
    /// the sub-key was absent in that row. Empty when no bitmap was emitted.
    pub(crate) absence_bitmap: Vec<u8>,
}
540
541pub(crate) fn flatten_nested_columns(
547 col_data: &[u8],
548 num_rows: usize,
549) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
550 let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
552 if columns.is_empty() || num_rows == 0 {
553 return None;
554 }
555
556 let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
557 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
560
561 for (col_idx, &col_chunk) in columns.iter().enumerate() {
562 let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
563 if values.len() != num_rows {
564 return None;
565 }
566
567 let mut all_objects = true;
569 let mut has_non_null = false;
570 for val in &values {
571 if *val == b"null" {
572 continue;
573 }
574 has_non_null = true;
575 if !val.starts_with(b"{") {
576 all_objects = false;
577 break;
578 }
579 }
580
581 if !all_objects || !has_non_null {
582 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
584 output_columns.push(col_values);
585 continue;
586 }
587
588 let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
592 let mut nested_template: Vec<Vec<u8>> = Vec::new();
593 type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
594 let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);
595
596 for val in &values {
597 if *val == b"null" {
598 parsed_rows.push(None);
599 continue;
600 }
601 if nested_template.is_empty() {
602 match parse_nested_object_with_template(val) {
604 Some((template, kv_pairs)) => {
605 for (key, _) in &kv_pairs {
606 if !all_sub_keys.iter().any(|k| k == key) {
607 all_sub_keys.push(key.clone());
608 }
609 }
610 nested_template = template;
611 parsed_rows.push(Some(kv_pairs));
612 }
613 None => {
614 all_sub_keys.clear();
615 break;
616 }
617 }
618 } else {
619 match parse_nested_object_kv(val) {
621 Some(kv_pairs) => {
622 for (key, _) in &kv_pairs {
623 if !all_sub_keys.iter().any(|k| k == key) {
624 all_sub_keys.push(key.clone());
625 }
626 }
627 parsed_rows.push(Some(kv_pairs));
628 }
629 None => {
630 all_sub_keys.clear();
631 break;
632 }
633 }
634 }
635 }
636
637 if all_sub_keys.is_empty() {
638 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
640 output_columns.push(col_values);
641 continue;
642 }
643
644 let num_sub_keys = all_sub_keys.len();
648 let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
649 let total_bits = num_sub_keys * num_rows;
650 let bitmap_bytes = total_bits.div_ceil(8);
651 let mut absence_bitmap = vec![0u8; bitmap_bytes];
652 let mut has_any_absent = false;
653
654 for (row_idx, parsed) in parsed_rows.iter().enumerate() {
655 match parsed {
656 Some(kv_pairs) => {
657 for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
658 let found = kv_pairs.iter().find(|(k, _)| k == sk);
659 match found {
660 Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
661 None => {
662 sub_columns[sk_idx].push(b"null".to_vec());
663 let bit_idx = sk_idx * num_rows + row_idx;
665 absence_bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
666 has_any_absent = true;
667 }
668 }
669 }
670 }
671 None => {
672 for sc in sub_columns.iter_mut() {
676 sc.push(b"null".to_vec());
677 }
678 }
679 }
680 }
681
682 nested_groups.push(NestedGroupInfo {
683 original_col_index: col_idx as u16,
684 sub_keys: all_sub_keys,
685 nested_template,
686 absence_bitmap: if has_any_absent {
687 absence_bitmap
688 } else {
689 Vec::new()
690 },
691 });
692
693 for sc in sub_columns {
694 output_columns.push(sc);
695 }
696 }
697
698 if nested_groups.is_empty() {
699 return None;
700 }
701
702 let num_out_cols = output_columns.len();
704 let mut out = Vec::new();
705 for (ci, col) in output_columns.iter().enumerate() {
706 for (ri, val) in col.iter().enumerate() {
707 out.extend_from_slice(val);
708 if ri < num_rows - 1 {
709 out.push(VAL_SEP);
710 }
711 }
712 if ci < num_out_cols - 1 {
713 out.push(COL_SEP);
714 }
715 }
716
717 Some((out, nested_groups))
718}
719
720#[allow(clippy::type_complexity)]
725pub(crate) fn parse_nested_object_with_template(
726 obj: &[u8],
727) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
728 let mut pos = 0;
729
730 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
732 pos += 1;
733 }
734 if pos >= obj.len() || obj[pos] != b'{' {
735 return None;
736 }
737 pos += 1;
738
739 let mut parts: Vec<Vec<u8>> = Vec::new();
740 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
741 let mut part_start = 0;
742
743 loop {
744 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
746 pos += 1;
747 }
748 if pos >= obj.len() {
749 return None;
750 }
751 if obj[pos] == b'}' {
752 parts.push(obj[part_start..].to_vec());
753 break;
754 }
755
756 if obj[pos] != b'"' {
758 return None;
759 }
760 let key_str_start = pos + 1;
761 pos += 1;
762 let mut escaped = false;
763 while pos < obj.len() {
764 if escaped {
765 escaped = false;
766 } else if obj[pos] == b'\\' {
767 escaped = true;
768 } else if obj[pos] == b'"' {
769 break;
770 }
771 pos += 1;
772 }
773 if pos >= obj.len() {
774 return None;
775 }
776 let key = obj[key_str_start..pos].to_vec();
777 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
781 pos += 1;
782 }
783 if pos >= obj.len() || obj[pos] != b':' {
784 return None;
785 }
786 pos += 1;
787
788 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
790 pos += 1;
791 }
792
793 parts.push(obj[part_start..pos].to_vec());
795
796 let value_start = pos;
798 let (value, value_end) = extract_value(obj, value_start)?;
800 pos = value_end;
801 pairs.push((key, value));
802
803 part_start = pos;
804
805 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
807 pos += 1;
808 }
809 if pos >= obj.len() {
810 return None;
811 }
812 if obj[pos] == b',' {
813 pos += 1;
814 } else if obj[pos] == b'}' {
815 parts.push(obj[part_start..].to_vec());
816 break;
817 } else {
818 return None;
819 }
820 }
821
822 if pairs.is_empty() {
823 return None;
824 }
825 Some((parts, pairs))
826}
827
828pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
832 let mut pos = 0;
833
834 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
836 pos += 1;
837 }
838 if pos >= obj.len() || obj[pos] != b'{' {
839 return None;
840 }
841 pos += 1;
842
843 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
844
845 loop {
846 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
848 pos += 1;
849 }
850 if pos >= obj.len() {
851 return None;
852 }
853 if obj[pos] == b'}' {
854 break;
855 }
856
857 if obj[pos] != b'"' {
859 return None;
860 }
861 pos += 1;
862 let key_start = pos;
863 let mut escaped = false;
864 while pos < obj.len() {
865 if escaped {
866 escaped = false;
867 } else if obj[pos] == b'\\' {
868 escaped = true;
869 } else if obj[pos] == b'"' {
870 break;
871 }
872 pos += 1;
873 }
874 if pos >= obj.len() {
875 return None;
876 }
877 let key = obj[key_start..pos].to_vec();
878 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
882 pos += 1;
883 }
884 if pos >= obj.len() || obj[pos] != b':' {
885 return None;
886 }
887 pos += 1;
888
889 let (value, value_end) = extract_value(obj, pos)?;
891 pos = value_end;
892 pairs.push((key, value));
893
894 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
896 pos += 1;
897 }
898 if pos >= obj.len() {
899 return None;
900 }
901 if obj[pos] == b',' {
902 pos += 1;
903 } else if obj[pos] == b'}' {
904 break;
905 } else {
906 return None;
907 }
908 }
909
910 if pairs.is_empty() {
911 return None;
912 }
913 Some(pairs)
914}
915
916pub(crate) fn unflatten_nested_columns(
921 flat_data: &[u8],
922 nested_groups: &[NestedGroupInfo],
923 num_rows: usize,
924 total_flat_cols: usize,
925) -> Vec<u8> {
926 let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
927 if flat_columns.len() != total_flat_cols {
928 return flat_data.to_vec();
929 }
930
931 let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
933 for chunk in &flat_columns {
934 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
935 if vals.len() != num_rows {
936 return flat_data.to_vec();
937 }
938 flat_col_values.push(vals);
939 }
940
941 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
944
945 let original_num_cols = total_flat_cols
954 - nested_groups
955 .iter()
956 .map(|g| g.sub_keys.len())
957 .sum::<usize>()
958 + nested_groups.len();
959
960 let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
962 for (gi, group) in nested_groups.iter().enumerate() {
963 if (group.original_col_index as usize) < original_num_cols {
964 original_col_map[group.original_col_index as usize] = Some(gi);
965 }
966 }
967
968 let mut flat_idx = 0;
969 for entry in original_col_map.iter().take(original_num_cols) {
970 if let Some(gi) = entry {
971 let group = &nested_groups[*gi];
972 let num_sub = group.sub_keys.len();
973
974 let is_absent = |si: usize, row: usize| -> bool {
976 if group.absence_bitmap.is_empty() {
977 return false; }
979 let bit_idx = si * num_rows + row;
980 let byte_idx = bit_idx / 8;
981 if byte_idx >= group.absence_bitmap.len() {
982 return false;
983 }
984 (group.absence_bitmap[byte_idx] >> (bit_idx % 8)) & 1 == 1
985 };
986
987 let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
989 for row in 0..num_rows {
990 let all_null = (0..num_sub).all(|si| {
993 flat_idx + si < flat_col_values.len()
994 && flat_col_values[flat_idx + si][row] == b"null"
995 });
996 if all_null && !group.absence_bitmap.is_empty() {
997 let any_present_null = (0..num_sub).any(|si| {
1000 flat_col_values[flat_idx + si][row] == b"null" && !is_absent(si, row)
1001 });
1002 if any_present_null {
1003 } else {
1006 merged_col.push(b"null".to_vec());
1008 continue;
1009 }
1010 } else if all_null {
1011 merged_col.push(b"null".to_vec());
1012 continue;
1013 }
1014
1015 let has_absent = (0..num_sub).any(|si| is_absent(si, row));
1017
1018 if !has_absent
1019 && !group.nested_template.is_empty()
1020 && group.nested_template.len() == num_sub + 1
1021 {
1022 let mut obj = Vec::new();
1025 obj.extend_from_slice(&group.nested_template[0]);
1026 if flat_idx < flat_col_values.len() {
1027 obj.extend_from_slice(flat_col_values[flat_idx][row]);
1028 }
1029 for si in 1..num_sub {
1030 obj.extend_from_slice(&group.nested_template[si]);
1031 if flat_idx + si < flat_col_values.len() {
1032 obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
1033 }
1034 }
1035 obj.extend_from_slice(&group.nested_template[num_sub]);
1036 merged_col.push(obj);
1037 } else {
1038 let mut obj = Vec::new();
1041 obj.push(b'{');
1042 let mut first = true;
1043 for si in 0..num_sub {
1044 if flat_idx + si >= flat_col_values.len() {
1045 break;
1046 }
1047 if is_absent(si, row) {
1048 continue; }
1050 let val = flat_col_values[flat_idx + si][row];
1051 if !first {
1052 obj.push(b',');
1053 }
1054 first = false;
1055 obj.push(b'"');
1056 obj.extend_from_slice(&group.sub_keys[si]);
1057 obj.push(b'"');
1058 obj.push(b':');
1059 obj.extend_from_slice(val);
1060 }
1061 obj.push(b'}');
1062 merged_col.push(obj);
1063 }
1064 }
1065 output_columns.push(merged_col);
1066 flat_idx += num_sub;
1067 } else {
1068 if flat_idx < flat_col_values.len() {
1070 let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
1071 .iter()
1072 .map(|v| v.to_vec())
1073 .collect();
1074 output_columns.push(col);
1075 }
1076 flat_idx += 1;
1077 }
1078 }
1079
1080 let num_out_cols = output_columns.len();
1082 let mut out = Vec::new();
1083 for (ci, col) in output_columns.iter().enumerate() {
1084 for (ri, val) in col.iter().enumerate() {
1085 out.extend_from_slice(val);
1086 if ri < num_rows - 1 {
1087 out.push(VAL_SEP);
1088 }
1089 }
1090 if ci < num_out_cols - 1 {
1091 out.push(COL_SEP);
1092 }
1093 }
1094
1095 out
1096}
1097
1098pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1103 let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1104 let has_absence = groups.iter().any(|g| !g.absence_bitmap.is_empty());
1105 let mut out = Vec::new();
1106 let version = if has_absence {
1107 3u8
1108 } else if has_template {
1109 2u8
1110 } else {
1111 1u8
1112 };
1113 out.push(version);
1114 out.push(groups.len() as u8);
1115 for group in groups {
1116 out.extend_from_slice(&group.original_col_index.to_le_bytes());
1117 out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1118 for key in &group.sub_keys {
1119 out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1120 out.extend_from_slice(key);
1121 }
1122 if has_template || version == 3 {
1123 out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1124 for part in &group.nested_template {
1125 out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1126 out.extend_from_slice(part);
1127 }
1128 }
1129 if version == 3 {
1130 let bm_len = group.absence_bitmap.len() as u32;
1131 out.extend_from_slice(&bm_len.to_le_bytes());
1132 out.extend_from_slice(&group.absence_bitmap);
1133 }
1134 }
1135 out
1136}
1137
1138pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1143 if data.is_empty() {
1144 return None;
1145 }
1146 let mut pos = 0;
1147 let version = data[pos];
1148 pos += 1;
1149 if version != 1 && version != 2 && version != 3 {
1150 return None;
1151 }
1152 let has_template = version == 2 || version == 3;
1153 let has_absence = version == 3;
1154 if pos >= data.len() {
1155 return None;
1156 }
1157 let num_groups = data[pos] as usize;
1158 pos += 1;
1159
1160 let mut groups = Vec::with_capacity(num_groups);
1161 for _ in 0..num_groups {
1162 if pos + 4 > data.len() {
1163 return None;
1164 }
1165 let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1166 pos += 2;
1167 let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1168 pos += 2;
1169
1170 let mut sub_keys = Vec::with_capacity(num_sub_cols);
1171 for _ in 0..num_sub_cols {
1172 if pos + 2 > data.len() {
1173 return None;
1174 }
1175 let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1176 pos += 2;
1177 if pos + key_len > data.len() {
1178 return None;
1179 }
1180 sub_keys.push(data[pos..pos + key_len].to_vec());
1181 pos += key_len;
1182 }
1183
1184 let nested_template = if has_template {
1185 if pos + 2 > data.len() {
1186 return None;
1187 }
1188 let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1189 pos += 2;
1190 let mut parts = Vec::with_capacity(num_parts);
1191 for _ in 0..num_parts {
1192 if pos + 2 > data.len() {
1193 return None;
1194 }
1195 let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1196 pos += 2;
1197 if pos + part_len > data.len() {
1198 return None;
1199 }
1200 parts.push(data[pos..pos + part_len].to_vec());
1201 pos += part_len;
1202 }
1203 parts
1204 } else {
1205 Vec::new()
1206 };
1207
1208 let absence_bitmap = if has_absence {
1209 if pos + 4 > data.len() {
1210 return None;
1211 }
1212 let bm_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1213 pos += 4;
1214 if pos + bm_len > data.len() {
1215 return None;
1216 }
1217 let bm = data[pos..pos + bm_len].to_vec();
1218 pos += bm_len;
1219 bm
1220 } else {
1221 Vec::new()
1222 };
1223
1224 groups.push(NestedGroupInfo {
1225 original_col_index,
1226 sub_keys,
1227 nested_template,
1228 absence_bitmap,
1229 });
1230 }
1231
1232 Some((groups, pos))
1233}
1234
1235pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
1240 if data.is_empty() {
1241 return None;
1242 }
1243
1244 let has_trailing_newline = data.last() == Some(&b'\n');
1245 let lines = split_lines(data);
1246 let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
1247
1248 if non_empty.len() < 2 {
1249 return None;
1250 }
1251
1252 if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
1254 if col_data.len() + metadata.len() < data.len() {
1255 let num_rows = non_empty.len();
1260 if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
1261 let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
1266 let unflattened =
1267 unflatten_nested_columns(&flat_data, &nested_groups, num_rows, total_flat_cols);
1268 if unflattened == col_data {
1269 let nested_bytes = serialize_nested_info(&nested_groups);
1271 metadata.extend_from_slice(&nested_bytes);
1272 return Some(TransformResult {
1273 data: flat_data,
1274 metadata,
1275 });
1276 }
1277 }
1279 metadata.push(0u8); return Some(TransformResult {
1282 data: col_data,
1283 metadata,
1284 });
1285 }
1286 }
1287
1288 if let Some((grouped_data, grouped_metadata)) =
1290 preprocess_grouped(&non_empty, has_trailing_newline)
1291 {
1292 if grouped_data.len() + grouped_metadata.len() < data.len() {
1293 return Some(TransformResult {
1294 data: grouped_data,
1295 metadata: grouped_metadata,
1296 });
1297 }
1298 }
1299
1300 None
1301}
1302
1303pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1306 if metadata.is_empty() {
1307 return data.to_vec();
1308 }
1309 match metadata[0] {
1310 METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1311 METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1312 _ => data.to_vec(),
1313 }
1314}
1315
1316fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1318 if metadata.len() < 10 {
1319 return data.to_vec();
1320 }
1321 let mut pos = 0;
1322 let _version = metadata[pos];
1323 pos += 1;
1324 let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1325 pos += 4;
1326 let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1327 pos += 2;
1328 let has_trailing_newline = metadata[pos] != 0;
1329 pos += 1;
1330 let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1331 pos += 2;
1332
1333 let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
1334 for _ in 0..num_parts {
1335 if pos + 2 > metadata.len() {
1336 return data.to_vec();
1337 }
1338 let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1339 pos += 2;
1340 if pos + part_len > metadata.len() {
1341 return data.to_vec();
1342 }
1343 parts.push(metadata[pos..pos + part_len].to_vec());
1344 pos += part_len;
1345 }
1346
1347 if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
1348 return data.to_vec();
1349 }
1350
1351 let remaining_metadata = &metadata[pos..];
1353 if !remaining_metadata.is_empty()
1354 && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2 || remaining_metadata[0] == 3)
1355 {
1356 if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
1358 let total_flat_cols = data.split(|&b| b == COL_SEP).count();
1360 let unflattened =
1361 unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
1362 return reverse_uniform_from_parts(
1363 &unflattened,
1364 &parts,
1365 num_rows,
1366 num_cols,
1367 has_trailing_newline,
1368 );
1369 }
1370 }
1371
1372 reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
1373}
1374
1375fn reverse_uniform_from_parts(
1377 data: &[u8],
1378 parts: &[Vec<u8>],
1379 num_rows: usize,
1380 num_cols: usize,
1381 has_trailing_newline: bool,
1382) -> Vec<u8> {
1383 let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
1384 if col_chunks.len() != num_cols {
1385 return data.to_vec();
1386 }
1387
1388 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1389 for chunk in &col_chunks {
1390 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1391 if vals.len() != num_rows {
1392 return data.to_vec();
1393 }
1394 columns.push(vals);
1395 }
1396
1397 let mut output = Vec::with_capacity(data.len() * 2);
1398 #[allow(clippy::needless_range_loop)]
1399 for row in 0..num_rows {
1400 output.extend_from_slice(&parts[0]);
1401 output.extend_from_slice(columns[0][row]);
1402 for col in 1..num_cols {
1403 output.extend_from_slice(&parts[col]);
1404 output.extend_from_slice(columns[col][row]);
1405 }
1406 output.extend_from_slice(&parts[num_cols]);
1407
1408 if row < num_rows - 1 || has_trailing_newline {
1409 output.push(b'\n');
1410 }
1411 }
1412
1413 output
1414}
1415
/// Decodes the uniform-layout header written by `build_uniform_columnar`.
///
/// The 10-byte fixed header is:
/// - version (1 byte, not checked here);
/// - row count (u32 LE);
/// - column count (u16 LE);
/// - trailing-newline flag (1 byte, non-zero means set);
/// - template part count (u16 LE).
///
/// Each part follows as a u16 LE length and its bytes; bytes after the last part are
/// ignored. Returns `(parts, num_rows, num_cols, has_trailing_newline)`, or `None`
/// when the input is truncated, the part count is not `num_cols + 1`, or either count
/// is zero.
fn parse_uniform_metadata(metadata: &[u8]) -> Option<(Vec<Vec<u8>>, usize, usize, bool)> {
    let header = metadata.get(..10)?;
    let num_rows = u32::from_le_bytes([header[1], header[2], header[3], header[4]]) as usize;
    let num_cols = usize::from(u16::from_le_bytes([header[5], header[6]]));
    let has_trailing_newline = header[7] != 0;
    let num_parts = usize::from(u16::from_le_bytes([header[8], header[9]]));

    let mut rest = &metadata[10..];
    let mut parts = Vec::with_capacity(num_parts);
    for _ in 0..num_parts {
        if rest.len() < 2 {
            return None;
        }
        let (len_bytes, tail) = rest.split_at(2);
        let part_len = usize::from(u16::from_le_bytes([len_bytes[0], len_bytes[1]]));
        if tail.len() < part_len {
            return None;
        }
        let (part, tail) = tail.split_at(part_len);
        parts.push(part.to_vec());
        rest = tail;
    }

    let well_formed = parts.len() == num_cols + 1 && num_rows > 0 && num_cols > 0;
    well_formed.then_some((parts, num_rows, num_cols, has_trailing_newline))
}
1452
1453fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1455 if metadata.len() < 8 {
1456 return data.to_vec();
1457 }
1458
1459 let mut mpos = 1; let has_trailing_newline = metadata[mpos] != 0;
1461 mpos += 1;
1462 let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1463 mpos += 4;
1464 let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
1465 mpos += 2;
1466
1467 let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];
1469
1470 let mut dpos: usize = 0;
1472
1473 for _ in 0..num_groups {
1474 if mpos + 4 > metadata.len() {
1476 return data.to_vec();
1477 }
1478 let group_row_count =
1479 u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1480 mpos += 4;
1481
1482 let mut row_indices = Vec::with_capacity(group_row_count);
1483 for _ in 0..group_row_count {
1484 if mpos + 4 > metadata.len() {
1485 return data.to_vec();
1486 }
1487 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1488 mpos += 4;
1489 row_indices.push(idx);
1490 }
1491
1492 if mpos + 4 > metadata.len() {
1494 return data.to_vec();
1495 }
1496 let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1497 mpos += 4;
1498 if mpos + gm_len > metadata.len() {
1499 return data.to_vec();
1500 }
1501 let group_metadata = &metadata[mpos..mpos + gm_len];
1502 mpos += gm_len;
1503
1504 if dpos + 4 > data.len() {
1506 return data.to_vec();
1507 }
1508 let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
1509 dpos += 4;
1510 if dpos + gd_len > data.len() {
1511 return data.to_vec();
1512 }
1513 let group_data = &data[dpos..dpos + gd_len];
1514 dpos += gd_len;
1515
1516 let (parts, num_rows, num_cols, _trailing) = match parse_uniform_metadata(group_metadata) {
1518 Some(v) => v,
1519 None => return data.to_vec(),
1520 };
1521
1522 if num_rows != group_row_count {
1523 return data.to_vec();
1524 }
1525
1526 let col_chunks: Vec<&[u8]> = group_data.split(|&b| b == COL_SEP).collect();
1528 if col_chunks.len() != num_cols {
1529 return data.to_vec();
1530 }
1531
1532 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1533 for chunk in &col_chunks {
1534 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1535 if vals.len() != num_rows {
1536 return data.to_vec();
1537 }
1538 columns.push(vals);
1539 }
1540
1541 for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1543 let mut line = Vec::new();
1544 line.extend_from_slice(&parts[0]);
1545 line.extend_from_slice(columns[0][row_within_group]);
1546 for col in 1..num_cols {
1547 line.extend_from_slice(&parts[col]);
1548 line.extend_from_slice(columns[col][row_within_group]);
1549 }
1550 line.extend_from_slice(&parts[num_cols]);
1551
1552 if original_idx < total_rows {
1553 output_lines[original_idx] = Some(line);
1554 }
1555 }
1556 }
1557
1558 if mpos + 4 > metadata.len() {
1560 return data.to_vec();
1561 }
1562 let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1563 mpos += 4;
1564
1565 let mut residual_indices = Vec::with_capacity(residual_count);
1566 for _ in 0..residual_count {
1567 if mpos + 4 > metadata.len() {
1568 return data.to_vec();
1569 }
1570 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1571 mpos += 4;
1572 residual_indices.push(idx);
1573 }
1574
1575 let residual_data = &data[dpos..];
1577 if residual_count > 0 {
1578 let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
1579 vec![]
1580 } else {
1581 residual_data.split(|&b| b == b'\n').collect()
1582 };
1583 if residual_lines.len() != residual_count {
1585 return data.to_vec();
1586 }
1587 for (i, &idx) in residual_indices.iter().enumerate() {
1588 if idx < total_rows {
1589 output_lines[idx] = Some(residual_lines[i].to_vec());
1590 }
1591 }
1592 }
1593
1594 let mut output = Vec::with_capacity(data.len() * 2);
1596 for (i, slot) in output_lines.iter().enumerate() {
1597 match slot {
1598 Some(line) => output.extend_from_slice(line),
1599 None => {
1600 return data.to_vec();
1602 }
1603 }
1604 if i < total_rows - 1 || has_trailing_newline {
1605 output.push(b'\n');
1606 }
1607 }
1608
1609 output
1610}
1611
// Unit tests for value extraction, line parsing, and preprocess/reverse
// round-trips across the uniform, grouped, and nested-object layouts.
#[cfg(test)]
mod tests {
    use super::*;

    // extract_value: returns the raw bytes of one JSON value starting at the
    // given position, plus the position just past it.

    #[test]
    fn extract_value_string() {
        let line = br#""hello","next""#;
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"\"hello\"");
        assert_eq!(end, 7);
    }

    #[test]
    fn extract_value_number() {
        let line = b"42,next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"42");
        assert_eq!(end, 2);
    }

    #[test]
    fn extract_value_bool() {
        let line = b"true,next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"true");
        assert_eq!(end, 4);
    }

    #[test]
    fn extract_value_null() {
        let line = b"null,next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"null");
        assert_eq!(end, 4);
    }

    #[test]
    fn extract_value_object() {
        let line = br#"{"a":1,"b":"x"},next"#;
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
        assert_eq!(end, 15);
    }

    #[test]
    fn extract_value_array() {
        let line = b"[1,2,3],next";
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, b"[1,2,3]");
        assert_eq!(end, 7);
    }

    #[test]
    fn extract_value_string_with_escapes() {
        // The escaped quote must not terminate the string early.
        let line = br#""he\"llo",next"#;
        let (val, end) = extract_value(line, 0).unwrap();
        assert_eq!(val, br#""he\"llo""#.to_vec());
        assert_eq!(end, 9);
    }

    #[test]
    fn parse_line_simple() {
        // A line with N values yields N + 1 literal template parts around them.
        let line = br#"{"a":1,"b":"x"}"#;
        let (parts, values) = parse_line(line).unwrap();
        assert_eq!(parts.len(), 3);
        assert_eq!(values.len(), 2);
        assert_eq!(values[0], b"1");
        assert_eq!(values[1], b"\"x\"");
        assert_eq!(parts[0], br#"{"a":"#.to_vec());
        assert_eq!(parts[1], br#","b":"#.to_vec());
        assert_eq!(parts[2], b"}");
    }

    // Round-trip tests: reverse(preprocess(x)) must reproduce x byte-for-byte.

    #[test]
    fn roundtrip_simple() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        // Lossy-string comparison first for a readable diff on failure.
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn roundtrip_no_trailing_newline() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn roundtrip_nested_values() {
        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
{"id":4,"meta":{"x":70,"y":80}}
{"id":5,"meta":{"x":90,"y":100}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn roundtrip_mixed_types() {
        let data = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    // Inputs that preprocess is expected to decline (returns None).

    #[test]
    fn schema_mismatch_too_few_returns_none() {
        let data = br#"{"a":1,"b":2}
{"a":1,"c":3}
"#;
        assert!(preprocess(data).is_none());
    }

    #[test]
    fn different_num_keys_too_few_returns_none() {
        let data = br#"{"a":1,"b":2}
{"a":1}
"#;
        assert!(preprocess(data).is_none());
    }

    #[test]
    fn single_line_returns_none() {
        let data = br#"{"a":1,"b":2}
"#;
        assert!(preprocess(data).is_none());
    }

    #[test]
    fn empty_returns_none() {
        assert!(preprocess(b"").is_none());
    }

    #[test]
    fn column_layout_groups_similar_values() {
        let data = br#"{"type":"page_view","user":"alice"}
{"type":"api_call","user":"alice"}
{"type":"click","user":"bob"}
"#;
        let result = preprocess(data).unwrap();

        // Transformed data: columns separated by COL_SEP, values within a
        // column separated by VAL_SEP, in row order.
        let col_data = &result.data;
        let cols: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
        assert_eq!(cols.len(), 2);

        let type_vals: Vec<&[u8]> = cols[0].split(|&b| b == VAL_SEP).collect();
        assert_eq!(type_vals.len(), 3);
        assert_eq!(type_vals[0], br#""page_view""#);
        assert_eq!(type_vals[1], br#""api_call""#);
        assert_eq!(type_vals[2], br#""click""#);

        let user_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
        assert_eq!(user_vals.len(), 3);
        assert_eq!(user_vals[0], br#""alice""#);
        assert_eq!(user_vals[1], br#""alice""#);
        assert_eq!(user_vals[2], br#""bob""#);
    }

    #[test]
    fn roundtrip_string_with_escaped_chars() {
        let data = br#"{"msg":"he said \"hi\"","val":1}
{"msg":"line\nbreak","val":2}
{"msg":"tab\there","val":3}
{"msg":"back\\slash","val":4}
{"msg":"normal text","val":5}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn roundtrip_negative_and_float_numbers() {
        let data = br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
{"x":0.001,"y":999}
{"x":-100,"y":-200}
{"x":42.0,"y":7}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn reverse_roundtrip_small_data() {
        let (parts, vals) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
        assert_eq!(vals.len(), 2);
        assert_eq!(parts.len(), 3);

        // Two template lines repeated 20 times gives a 40-row input.
        let big_data = br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
"#
        .repeat(20);
        let result = preprocess(&big_data).expect("should produce transform with 40 rows");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, big_data);
    }

    // Grouped (multi-schema) transform: metadata[0] == METADATA_VERSION_GROUPED.

    #[test]
    fn grouped_roundtrip_two_schemas() {
        let mut data = Vec::new();
        for i in 0..10 {
            data.extend_from_slice(
                format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
            );
            data.push(b'\n');
        }
        for i in 10..20 {
            data.extend_from_slice(
                format!(
                    r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
                    i, i, i
                )
                .as_bytes(),
            );
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    #[test]
    fn grouped_roundtrip_interleaved_schemas() {
        // Alternating schemas check that rows return to their original order.
        let mut data = Vec::new();
        for i in 0..20 {
            if i % 2 == 0 {
                data.extend_from_slice(
                    format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
                );
            } else {
                data.extend_from_slice(
                    format!(
                        r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
                        i, i, i
                    )
                    .as_bytes(),
                );
            }
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    #[test]
    fn grouped_roundtrip_with_residuals() {
        let mut data = Vec::new();
        for i in 0..8 {
            data.extend_from_slice(format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
            data.push(b'\n');
        }
        // Two one-off schemas, each appearing once.
        data.extend_from_slice(br#"{"x":1,"y":2,"z":3}"#);
        data.push(b'\n');
        data.extend_from_slice(br#"{"p":"q"}"#);
        data.push(b'\n');
        for i in 0..6 {
            data.extend_from_slice(format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes());
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(&data),
        );
        assert_eq!(restored, data);
    }

    #[test]
    fn grouped_roundtrip_no_trailing_newline() {
        let mut data = Vec::new();
        for i in 0..6 {
            data.extend_from_slice(format!(r#"{{"id":{},"type":"push"}}"#, i).as_bytes());
            data.push(b'\n');
        }
        for i in 0..6 {
            data.extend_from_slice(
                format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i).as_bytes(),
            );
            // Omit the newline after the final row.
            if i < 5 {
                data.push(b'\n');
            }
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    #[test]
    fn uniform_still_preferred_over_grouped() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
{"a":4,"b":"w"}
{"a":5,"b":"v"}
"#;
        let result = preprocess(data).expect("should produce transform");
        assert_eq!(
            result.metadata[0], METADATA_VERSION_UNIFORM,
            "uniform schema should use Strategy 1"
        );
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn grouped_gharchive_simulation() {
        // Every fifth row uses a WatchEvent shape with an extra "org" key;
        // the rest use a PushEvent shape.
        let mut data = Vec::new();
        for i in 0..50 {
            if i % 5 == 0 {
                data.extend_from_slice(
                    format!(
                        r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
                        i, i, i, i
                    )
                    .as_bytes(),
                );
            } else {
                data.extend_from_slice(
                    format!(
                        r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
                        i, i, i
                    )
                    .as_bytes(),
                );
            }
            data.push(b'\n');
        }
        let result = preprocess(&data).expect("should produce grouped transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data);
    }

    // Nested-object decomposition: values of nested objects become their
    // own columns.

    #[test]
    fn test_nested_decomposition_basic() {
        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#;
        let result = preprocess(data).expect("should produce transform");
        assert_eq!(result.metadata[0], METADATA_VERSION_UNIFORM);

        // Expect columns id, meta.x, meta.y rather than id + whole meta object.
        let cols: Vec<&[u8]> = result.data.split(|&b| b == COL_SEP).collect();
        assert_eq!(
            cols.len(),
            3,
            "should have 3 columns after flattening: got {}",
            cols.len()
        );

        let meta_x_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
        assert_eq!(meta_x_vals, vec![b"10".as_slice(), b"30", b"50"]);

        let meta_y_vals: Vec<&[u8]> = cols[2].split(|&b| b == VAL_SEP).collect();
        assert_eq!(meta_y_vals, vec![b"20".as_slice(), b"40", b"60"]);
    }

    #[test]
    fn test_nested_roundtrip() {
        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn test_nested_mixed_schemas() {
        let data = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","meta":{"query":"pricing","results_count":25}}
{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
{"ts":"e","meta":{"query":"api docs","results_count":41}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn test_nested_no_nested_objects() {
        let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());

        // The final metadata byte is the has_nested flag; flat input sets it to 0.
        let meta = &result.metadata;
        let last_byte = meta[meta.len() - 1];
        assert_eq!(last_byte, 0, "should have has_nested=0 for flat data");
    }

    #[test]
    fn test_nested_real_corpus() {
        let data = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn test_nested_roundtrip_with_null_values() {
        // "meta" alternates between an object and a literal null.
        let data = br#"{"id":1,"meta":{"x":10}}
{"id":2,"meta":null}
{"id":3,"meta":{"x":30}}
{"id":4,"meta":null}
{"id":5,"meta":{"x":50}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn test_nested_string_values_preserved_exact() {
        let data = br#"{"id":1,"meta":{"name":"Alice","score":100}}
{"id":2,"meta":{"name":"Bob","score":200}}
{"id":3,"meta":{"name":"Charlie","score":300}}
"#;
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn test_parse_nested_object_kv() {
        // Keys come back unquoted; values keep their raw JSON bytes.
        let obj = br#"{"query":"benchmark","results_count":14}"#;
        let pairs = parse_nested_object_kv(obj).unwrap();
        assert_eq!(pairs.len(), 2);
        assert_eq!(pairs[0].0, b"query");
        assert_eq!(pairs[0].1, br#""benchmark""#.to_vec());
        assert_eq!(pairs[1].0, b"results_count");
        assert_eq!(pairs[1].1, b"14");
    }

    #[test]
    fn test_nested_varying_subkeys_roundtrip() {
        // Even rows carry an extra nested key that odd rows lack.
        let mut lines = Vec::new();
        for i in 0..50 {
            let line = if i % 2 == 0 {
                format!("{{\"id\":{},\"meta\":{{\"x\":{},\"extra\":{}}}}}", i, i, i)
            } else {
                format!("{{\"id\":{},\"meta\":{{\"x\":{}}}}}", i, i)
            };
            lines.push(line);
        }
        let ndjson = lines.join("\n") + "\n";
        let data = ndjson.as_bytes();

        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            std::str::from_utf8(&restored).unwrap(),
            std::str::from_utf8(data).unwrap(),
            "varying sub-keys roundtrip must be byte-exact"
        );
    }

    #[test]
    fn test_nested_explicit_null_preserved() {
        let data = b"{\"id\":1,\"meta\":{\"x\":1,\"y\":null}}\n\
                     {\"id\":2,\"meta\":{\"x\":2,\"y\":null}}\n\
                     {\"id\":3,\"meta\":{\"x\":3,\"y\":null}}\n";
        let result = preprocess(data).expect("should produce transform");
        let restored = reverse(&result.data, &result.metadata);
        assert_eq!(
            std::str::from_utf8(&restored).unwrap(),
            std::str::from_utf8(data).unwrap(),
            "explicit null values must be preserved"
        );
    }

    // Null-heavy inputs: preprocess may decline (None). The round-trip is
    // checked only when it does produce a transform.

    #[test]
    fn null_heavy_30_rows_roundtrip() {
        let mut data = Vec::new();
        for i in 0..30 {
            data.extend_from_slice(format!("{{\"id\":{},\"val\":null}}\n", i).as_bytes());
        }
        let result = preprocess(&data);
        if let Some(result) = result {
            let restored = reverse(&result.data, &result.metadata);
            assert_eq!(
                restored, data,
                "null-heavy 30-row roundtrip failed.\nOriginal len={}, Restored len={}\nOrig first 200: {:?}\nRest first 200: {:?}",
                data.len(), restored.len(),
                String::from_utf8_lossy(&data[..data.len().min(200)]),
                String::from_utf8_lossy(&restored[..restored.len().min(200)])
            );
        }
    }

    #[test]
    fn null_heavy_60_rows_roundtrip() {
        let mut data = Vec::new();
        for i in 0..60 {
            // Only every tenth row has a non-null name.
            let name = if i % 10 == 0 {
                format!("\"user_{}\"", i)
            } else {
                "null".to_string()
            };
            data.extend_from_slice(
                format!("{{\"id\":{},\"name\":{},\"email\":null,\"score\":null,\"active\":null,\"tags\":null}}\n", i, name).as_bytes(),
            );
        }
        let result = preprocess(&data);
        if let Some(result) = result {
            let restored = reverse(&result.data, &result.metadata);
            assert_eq!(restored, data, "null-heavy 60-row ndjson roundtrip failed");
        }
    }
}