1use super::transform::TransformResult;
24use std::collections::HashMap;
25
/// Byte written between consecutive columns of columnar data.
const COL_SEP: u8 = 0x00;
/// Byte written between consecutive row values inside one column.
const VAL_SEP: u8 = 0x01;
/// First metadata byte of the uniform (single-schema) layout.
const METADATA_VERSION_UNIFORM: u8 = 1;
/// First metadata byte of the grouped (multi-schema) layout.
const METADATA_VERSION_GROUPED: u8 = 2;

/// Row threshold used by `preprocess_grouped`: inputs with fewer lines are
/// not grouped at all, and schema groups with fewer rows become residual lines.
const MIN_GROUP_ROWS: usize = 5;

/// One schema group: the template parts shared by its rows, plus for every
/// member row its index among the input lines and its extracted values.
type SchemaGroup = (Vec<Vec<u8>>, Vec<(usize, Vec<Vec<u8>>)>);
36
/// Extracts the raw bytes of one JSON value from `line`, starting at `pos`.
///
/// Leading ASCII whitespace is skipped. The value is then scanned according
/// to its first byte:
/// - `"`: a string, up to and including the matching unescaped quote;
/// - `{` / `[`: a container, up to and including the bracket that closes it
///   (brackets inside strings are ignored);
/// - anything else: a bare token ending before `,`, `}`, `]` or whitespace.
///
/// Returns the value bytes and the offset just past them, or `None` when the
/// input ends first, a string or container is unterminated, or the bare
/// token would be empty.
fn extract_value(line: &[u8], mut pos: usize) -> Option<(Vec<u8>, usize)> {
    /// Given the offset just past an opening quote, returns the offset of the
    /// matching unescaped closing quote, or `buf.len()` if there is none.
    fn closing_quote(buf: &[u8], mut at: usize) -> usize {
        let mut escaped = false;
        while at < buf.len() {
            if escaped {
                escaped = false;
            } else if buf[at] == b'\\' {
                escaped = true;
            } else if buf[at] == b'"' {
                break;
            }
            at += 1;
        }
        at
    }

    /// Scans a container whose opening bracket sits at `start` and returns
    /// the offset just past its closing bracket, or `None` if it never closes.
    fn container_end(buf: &[u8], start: usize, open: u8, close: u8) -> Option<usize> {
        let mut depth = 1i32;
        let mut at = start + 1;
        while at < buf.len() && depth > 0 {
            let byte = buf[at];
            if byte == b'"' {
                // Jump to the closing quote; the shared increment below steps past it.
                at = closing_quote(buf, at + 1);
            } else if byte == open {
                depth += 1;
            } else if byte == close {
                depth -= 1;
            }
            at += 1;
        }
        // An unterminated inner string leaves `at` one past the end of the buffer.
        if depth != 0 || at > buf.len() {
            None
        } else {
            Some(at)
        }
    }

    while pos < line.len() && line[pos].is_ascii_whitespace() {
        pos += 1;
    }
    let start = pos;

    let end = match *line.get(start)? {
        b'"' => {
            let quote = closing_quote(line, start + 1);
            if quote >= line.len() {
                return None;
            }
            quote + 1
        }
        b'{' => container_end(line, start, b'{', b'}')?,
        b'[' => container_end(line, start, b'[', b']')?,
        _ => {
            let token_len = line[start..]
                .iter()
                .position(|&b| matches!(b, b',' | b'}' | b']') || b.is_ascii_whitespace())
                .unwrap_or(line.len() - start);
            if token_len == 0 {
                return None;
            }
            start + token_len
        }
    };

    Some((line[start..end].to_vec(), end))
}
150
/// Output of `parse_line`: the template parts that surround the values,
/// followed by the raw bytes of each value in order of appearance.
type ParsedLine = (Vec<Vec<u8>>, Vec<Vec<u8>>);
162
163fn parse_line(line: &[u8]) -> Option<ParsedLine> {
164 let mut pos = 0;
165
166 while pos < line.len() && line[pos].is_ascii_whitespace() {
168 pos += 1;
169 }
170 if pos >= line.len() || line[pos] != b'{' {
171 return None;
172 }
173
174 let mut parts: Vec<Vec<u8>> = Vec::new();
175 let mut values: Vec<Vec<u8>> = Vec::new();
176 let mut part_start = 0;
177
178 pos += 1; loop {
181 while pos < line.len() && line[pos].is_ascii_whitespace() {
183 pos += 1;
184 }
185 if pos >= line.len() {
186 return None;
187 }
188
189 if line[pos] == b'}' {
191 parts.push(line[part_start..].to_vec());
193 break;
194 }
195
196 if line[pos] != b'"' {
198 return None;
199 }
200 pos += 1;
202 let mut escaped = false;
203 while pos < line.len() {
204 if escaped {
205 escaped = false;
206 } else if line[pos] == b'\\' {
207 escaped = true;
208 } else if line[pos] == b'"' {
209 pos += 1;
210 break;
211 }
212 pos += 1;
213 }
214
215 while pos < line.len() && line[pos].is_ascii_whitespace() {
217 pos += 1;
218 }
219 if pos >= line.len() || line[pos] != b':' {
220 return None;
221 }
222 pos += 1; parts.push(line[part_start..pos].to_vec());
226
227 let (value, value_end) = extract_value(line, pos)?;
229 values.push(value);
230 pos = value_end;
231
232 part_start = pos;
234
235 while pos < line.len() && line[pos].is_ascii_whitespace() {
237 pos += 1;
238 }
239 if pos >= line.len() {
240 return None;
241 }
242
243 if line[pos] == b',' {
245 pos += 1;
246 } else if line[pos] == b'}' {
247 parts.push(line[part_start..].to_vec());
251 break;
252 } else {
253 return None; }
255 }
256
257 if values.is_empty() {
258 return None;
259 }
260
261 Some((parts, values))
262}
263
/// Splits `data` at every `\n` byte, without including the newline bytes.
///
/// Text after the last newline becomes a final entry only when it is
/// non-empty, so a single trailing newline does not add an empty line.
/// Empty lines elsewhere are returned as empty slices.
fn split_lines(data: &[u8]) -> Vec<&[u8]> {
    let mut segments: Vec<&[u8]> = Vec::new();
    let mut rest = data;

    // Peel off one newline-terminated segment per iteration.
    while let Some(newline_at) = rest.iter().position(|&byte| byte == b'\n') {
        segments.push(&rest[..newline_at]);
        rest = &rest[newline_at + 1..];
    }

    if !rest.is_empty() {
        segments.push(rest);
    }
    segments
}
279
280fn build_uniform_columnar(
283 template_parts: &[Vec<u8>],
284 columns: &[Vec<Vec<u8>>],
285 num_rows: usize,
286 has_trailing_newline: bool,
287) -> (Vec<u8>, Vec<u8>) {
288 let num_cols = columns.len();
289
290 let mut col_data = Vec::new();
292 for (ci, col) in columns.iter().enumerate() {
293 for (ri, val) in col.iter().enumerate() {
294 col_data.extend_from_slice(val);
295 if ri < num_rows - 1 {
296 col_data.push(VAL_SEP);
297 }
298 }
299 if ci < num_cols - 1 {
300 col_data.push(COL_SEP);
301 }
302 }
303
304 let mut metadata = Vec::new();
306 metadata.push(METADATA_VERSION_UNIFORM);
307 metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
308 metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
309 metadata.push(if has_trailing_newline { 1 } else { 0 });
310 metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
311 for part in template_parts {
312 metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
313 metadata.extend_from_slice(part);
314 }
315
316 (col_data, metadata)
317}
318
319fn preprocess_uniform(
322 non_empty: &[&[u8]],
323 has_trailing_newline: bool,
324) -> Option<(Vec<u8>, Vec<u8>)> {
325 if non_empty.len() < 2 {
326 return None;
327 }
328
329 let (template_parts, first_values) = parse_line(non_empty[0])?;
330 let num_cols = first_values.len();
331 if template_parts.len() != num_cols + 1 {
332 return None;
333 }
334
335 let mut columns: Vec<Vec<Vec<u8>>> = Vec::with_capacity(num_cols);
336 for v in &first_values {
337 columns.push(vec![v.clone()]);
338 }
339
340 for &line in &non_empty[1..] {
341 let (parts, values) = parse_line(line)?;
342 if values.len() != num_cols || parts.len() != template_parts.len() {
343 return None;
344 }
345 for (a, b) in parts.iter().zip(template_parts.iter()) {
346 if a != b {
347 return None;
348 }
349 }
350 for (col, val) in values.iter().enumerate() {
351 columns[col].push(val.clone());
352 }
353 }
354
355 Some(build_uniform_columnar(
356 &template_parts,
357 &columns,
358 non_empty.len(),
359 has_trailing_newline,
360 ))
361}
362
363fn preprocess_grouped(
384 non_empty: &[&[u8]],
385 has_trailing_newline: bool,
386) -> Option<(Vec<u8>, Vec<u8>)> {
387 if non_empty.len() < MIN_GROUP_ROWS {
388 return None;
389 }
390
391 let mut parsed: Vec<Option<ParsedLine>> = Vec::with_capacity(non_empty.len());
394 for &line in non_empty {
395 parsed.push(parse_line(line));
396 }
397
398 let mut group_map: HashMap<Vec<u8>, SchemaGroup> = HashMap::new();
401 let mut residual_indices: Vec<usize> = Vec::new();
402
403 for (idx, parsed_line) in parsed.into_iter().enumerate() {
404 if let Some((parts, values)) = parsed_line {
405 let mut key = Vec::new();
407 for part in &parts {
408 key.extend_from_slice(&(part.len() as u32).to_le_bytes());
409 key.extend_from_slice(part);
410 }
411 group_map
412 .entry(key)
413 .or_insert_with(|| (parts, Vec::new()))
414 .1
415 .push((idx, values));
416 } else {
417 residual_indices.push(idx);
419 }
420 }
421
422 let mut groups: Vec<SchemaGroup> = Vec::new();
424 for (_key, (template_parts, rows)) in group_map {
425 if rows.len() >= MIN_GROUP_ROWS {
426 groups.push((template_parts, rows));
427 } else {
428 for (idx, _) in &rows {
430 residual_indices.push(*idx);
431 }
432 }
433 }
434
435 if groups.is_empty() {
437 return None;
438 }
439
440 groups.sort_by_key(|(_, rows)| rows[0].0);
442 residual_indices.sort_unstable();
443
444 struct GroupOutput {
446 row_indices: Vec<u32>,
447 col_data: Vec<u8>,
448 group_metadata: Vec<u8>,
449 }
450
451 let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
452
453 for (template_parts, rows) in &groups {
454 let num_cols = template_parts.len() - 1;
455 let mut columns: Vec<Vec<Vec<u8>>> = (0..num_cols).map(|_| Vec::new()).collect();
456 let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
457
458 for (idx, values) in rows {
459 row_indices.push(*idx as u32);
460 for (col, val) in values.iter().enumerate() {
461 columns[col].push(val.clone());
462 }
463 }
464
465 let (col_data, group_metadata) =
467 build_uniform_columnar(template_parts, &columns, rows.len(), false);
468
469 group_outputs.push(GroupOutput {
470 row_indices,
471 col_data,
472 group_metadata,
473 });
474 }
475
476 let mut data_out = Vec::new();
478 for group in &group_outputs {
479 data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
480 data_out.extend_from_slice(&group.col_data);
481 }
482
483 let residual_start = data_out.len();
485 for (i, &idx) in residual_indices.iter().enumerate() {
486 data_out.extend_from_slice(non_empty[idx]);
487 if i < residual_indices.len() - 1 {
488 data_out.push(b'\n');
489 }
490 }
491 let _residual_len = data_out.len() - residual_start;
492
493 let mut metadata = Vec::new();
495 metadata.push(METADATA_VERSION_GROUPED);
496 metadata.push(if has_trailing_newline { 1 } else { 0 });
497 metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
498 metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
499
500 for group in &group_outputs {
501 metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
502 for &idx in &group.row_indices {
503 metadata.extend_from_slice(&idx.to_le_bytes());
504 }
505 metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
506 metadata.extend_from_slice(&group.group_metadata);
507 }
508
509 metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
510 for &idx in &residual_indices {
511 metadata.extend_from_slice(&(idx as u32).to_le_bytes());
512 }
513
514 Some((data_out, metadata))
515}
516
/// Describes one column of nested objects that was split into one
/// sub-column per key.
pub(crate) struct NestedGroupInfo {
    /// Position of the column in the original (unflattened) column order.
    pub(crate) original_col_index: u16,
    /// Key of each sub-column, in the order the sub-columns are emitted.
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Template parts of the first non-null object in the column, used to
    /// rebuild objects; empty when the metadata carries no template.
    pub(crate) nested_template: Vec<Vec<u8>>,
}
527
528pub(crate) fn flatten_nested_columns(
534 col_data: &[u8],
535 num_rows: usize,
536) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
537 let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
539 if columns.is_empty() || num_rows == 0 {
540 return None;
541 }
542
543 let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
544 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
547
548 for (col_idx, &col_chunk) in columns.iter().enumerate() {
549 let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
550 if values.len() != num_rows {
551 return None;
552 }
553
554 let mut all_objects = true;
556 let mut has_non_null = false;
557 for val in &values {
558 if *val == b"null" {
559 continue;
560 }
561 has_non_null = true;
562 if !val.starts_with(b"{") {
563 all_objects = false;
564 break;
565 }
566 }
567
568 if !all_objects || !has_non_null {
569 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
571 output_columns.push(col_values);
572 continue;
573 }
574
575 let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
579 let mut nested_template: Vec<Vec<u8>> = Vec::new();
580 type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
581 let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);
582
583 for val in &values {
584 if *val == b"null" {
585 parsed_rows.push(None);
586 continue;
587 }
588 if nested_template.is_empty() {
589 match parse_nested_object_with_template(val) {
591 Some((template, kv_pairs)) => {
592 for (key, _) in &kv_pairs {
593 if !all_sub_keys.iter().any(|k| k == key) {
594 all_sub_keys.push(key.clone());
595 }
596 }
597 nested_template = template;
598 parsed_rows.push(Some(kv_pairs));
599 }
600 None => {
601 all_sub_keys.clear();
602 break;
603 }
604 }
605 } else {
606 match parse_nested_object_kv(val) {
608 Some(kv_pairs) => {
609 for (key, _) in &kv_pairs {
610 if !all_sub_keys.iter().any(|k| k == key) {
611 all_sub_keys.push(key.clone());
612 }
613 }
614 parsed_rows.push(Some(kv_pairs));
615 }
616 None => {
617 all_sub_keys.clear();
618 break;
619 }
620 }
621 }
622 }
623
624 if all_sub_keys.is_empty() {
625 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
627 output_columns.push(col_values);
628 continue;
629 }
630
631 let num_sub_keys = all_sub_keys.len();
633 let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
634
635 for parsed in &parsed_rows {
636 match parsed {
637 Some(kv_pairs) => {
638 for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
639 let found = kv_pairs.iter().find(|(k, _)| k == sk);
640 match found {
641 Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
642 None => sub_columns[sk_idx].push(b"null".to_vec()),
643 }
644 }
645 }
646 None => {
647 for sc in sub_columns.iter_mut() {
649 sc.push(b"null".to_vec());
650 }
651 }
652 }
653 }
654
655 nested_groups.push(NestedGroupInfo {
656 original_col_index: col_idx as u16,
657 sub_keys: all_sub_keys,
658 nested_template,
659 });
660
661 for sc in sub_columns {
662 output_columns.push(sc);
663 }
664 }
665
666 if nested_groups.is_empty() {
667 return None;
668 }
669
670 let num_out_cols = output_columns.len();
672 let mut out = Vec::new();
673 for (ci, col) in output_columns.iter().enumerate() {
674 for (ri, val) in col.iter().enumerate() {
675 out.extend_from_slice(val);
676 if ri < num_rows - 1 {
677 out.push(VAL_SEP);
678 }
679 }
680 if ci < num_out_cols - 1 {
681 out.push(COL_SEP);
682 }
683 }
684
685 Some((out, nested_groups))
686}
687
688#[allow(clippy::type_complexity)]
693pub(crate) fn parse_nested_object_with_template(
694 obj: &[u8],
695) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
696 let mut pos = 0;
697
698 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
700 pos += 1;
701 }
702 if pos >= obj.len() || obj[pos] != b'{' {
703 return None;
704 }
705 pos += 1;
706
707 let mut parts: Vec<Vec<u8>> = Vec::new();
708 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
709 let mut part_start = 0;
710
711 loop {
712 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
714 pos += 1;
715 }
716 if pos >= obj.len() {
717 return None;
718 }
719 if obj[pos] == b'}' {
720 parts.push(obj[part_start..].to_vec());
721 break;
722 }
723
724 if obj[pos] != b'"' {
726 return None;
727 }
728 let key_str_start = pos + 1;
729 pos += 1;
730 let mut escaped = false;
731 while pos < obj.len() {
732 if escaped {
733 escaped = false;
734 } else if obj[pos] == b'\\' {
735 escaped = true;
736 } else if obj[pos] == b'"' {
737 break;
738 }
739 pos += 1;
740 }
741 if pos >= obj.len() {
742 return None;
743 }
744 let key = obj[key_str_start..pos].to_vec();
745 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
749 pos += 1;
750 }
751 if pos >= obj.len() || obj[pos] != b':' {
752 return None;
753 }
754 pos += 1;
755
756 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
758 pos += 1;
759 }
760
761 parts.push(obj[part_start..pos].to_vec());
763
764 let value_start = pos;
766 let (value, value_end) = extract_value(obj, value_start)?;
768 pos = value_end;
769 pairs.push((key, value));
770
771 part_start = pos;
772
773 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
775 pos += 1;
776 }
777 if pos >= obj.len() {
778 return None;
779 }
780 if obj[pos] == b',' {
781 pos += 1;
782 } else if obj[pos] == b'}' {
783 parts.push(obj[part_start..].to_vec());
784 break;
785 } else {
786 return None;
787 }
788 }
789
790 if pairs.is_empty() {
791 return None;
792 }
793 Some((parts, pairs))
794}
795
796pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
800 let mut pos = 0;
801
802 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
804 pos += 1;
805 }
806 if pos >= obj.len() || obj[pos] != b'{' {
807 return None;
808 }
809 pos += 1;
810
811 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
812
813 loop {
814 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
816 pos += 1;
817 }
818 if pos >= obj.len() {
819 return None;
820 }
821 if obj[pos] == b'}' {
822 break;
823 }
824
825 if obj[pos] != b'"' {
827 return None;
828 }
829 pos += 1;
830 let key_start = pos;
831 let mut escaped = false;
832 while pos < obj.len() {
833 if escaped {
834 escaped = false;
835 } else if obj[pos] == b'\\' {
836 escaped = true;
837 } else if obj[pos] == b'"' {
838 break;
839 }
840 pos += 1;
841 }
842 if pos >= obj.len() {
843 return None;
844 }
845 let key = obj[key_start..pos].to_vec();
846 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
850 pos += 1;
851 }
852 if pos >= obj.len() || obj[pos] != b':' {
853 return None;
854 }
855 pos += 1;
856
857 let (value, value_end) = extract_value(obj, pos)?;
859 pos = value_end;
860 pairs.push((key, value));
861
862 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
864 pos += 1;
865 }
866 if pos >= obj.len() {
867 return None;
868 }
869 if obj[pos] == b',' {
870 pos += 1;
871 } else if obj[pos] == b'}' {
872 break;
873 } else {
874 return None;
875 }
876 }
877
878 if pairs.is_empty() {
879 return None;
880 }
881 Some(pairs)
882}
883
884pub(crate) fn unflatten_nested_columns(
889 flat_data: &[u8],
890 nested_groups: &[NestedGroupInfo],
891 num_rows: usize,
892 total_flat_cols: usize,
893) -> Vec<u8> {
894 let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
895 if flat_columns.len() != total_flat_cols {
896 return flat_data.to_vec();
897 }
898
899 let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
901 for chunk in &flat_columns {
902 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
903 if vals.len() != num_rows {
904 return flat_data.to_vec();
905 }
906 flat_col_values.push(vals);
907 }
908
909 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
912
913 let original_num_cols = total_flat_cols
922 - nested_groups
923 .iter()
924 .map(|g| g.sub_keys.len())
925 .sum::<usize>()
926 + nested_groups.len();
927
928 let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
930 for (gi, group) in nested_groups.iter().enumerate() {
931 if (group.original_col_index as usize) < original_num_cols {
932 original_col_map[group.original_col_index as usize] = Some(gi);
933 }
934 }
935
936 let mut flat_idx = 0;
937 for entry in original_col_map.iter().take(original_num_cols) {
938 if let Some(gi) = entry {
939 let group = &nested_groups[*gi];
940 let num_sub = group.sub_keys.len();
941
942 let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
944 for row in 0..num_rows {
945 let all_null = (0..num_sub).all(|si| {
947 flat_idx + si < flat_col_values.len()
948 && flat_col_values[flat_idx + si][row] == b"null"
949 });
950 if all_null {
951 merged_col.push(b"null".to_vec());
952 } else if !group.nested_template.is_empty()
953 && group.nested_template.len() == num_sub + 1
954 {
955 let mut obj = Vec::new();
957 obj.extend_from_slice(&group.nested_template[0]);
958 if flat_idx < flat_col_values.len() {
959 obj.extend_from_slice(flat_col_values[flat_idx][row]);
960 }
961 for si in 1..num_sub {
962 obj.extend_from_slice(&group.nested_template[si]);
963 if flat_idx + si < flat_col_values.len() {
964 obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
965 }
966 }
967 obj.extend_from_slice(&group.nested_template[num_sub]);
968 merged_col.push(obj);
969 } else {
970 let mut obj = Vec::new();
972 obj.push(b'{');
973 let mut first = true;
974 for si in 0..num_sub {
975 if flat_idx + si >= flat_col_values.len() {
976 break;
977 }
978 let val = flat_col_values[flat_idx + si][row];
979 if val == b"null" {
980 continue; }
982 if !first {
983 obj.push(b',');
984 }
985 first = false;
986 obj.push(b'"');
987 obj.extend_from_slice(&group.sub_keys[si]);
988 obj.push(b'"');
989 obj.push(b':');
990 obj.extend_from_slice(val);
991 }
992 obj.push(b'}');
993 merged_col.push(obj);
994 }
995 }
996 output_columns.push(merged_col);
997 flat_idx += num_sub;
998 } else {
999 if flat_idx < flat_col_values.len() {
1001 let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
1002 .iter()
1003 .map(|v| v.to_vec())
1004 .collect();
1005 output_columns.push(col);
1006 }
1007 flat_idx += 1;
1008 }
1009 }
1010
1011 let num_out_cols = output_columns.len();
1013 let mut out = Vec::new();
1014 for (ci, col) in output_columns.iter().enumerate() {
1015 for (ri, val) in col.iter().enumerate() {
1016 out.extend_from_slice(val);
1017 if ri < num_rows - 1 {
1018 out.push(VAL_SEP);
1019 }
1020 }
1021 if ci < num_out_cols - 1 {
1022 out.push(COL_SEP);
1023 }
1024 }
1025
1026 out
1027}
1028
1029pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1033 let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1034 let mut out = Vec::new();
1035 out.push(if has_template { 2u8 } else { 1u8 });
1036 out.push(groups.len() as u8);
1037 for group in groups {
1038 out.extend_from_slice(&group.original_col_index.to_le_bytes());
1039 out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1040 for key in &group.sub_keys {
1041 out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1042 out.extend_from_slice(key);
1043 }
1044 if has_template {
1045 out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1046 for part in &group.nested_template {
1047 out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1048 out.extend_from_slice(part);
1049 }
1050 }
1051 }
1052 out
1053}
1054
1055pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1059 if data.is_empty() {
1060 return None;
1061 }
1062 let mut pos = 0;
1063 let version = data[pos];
1064 pos += 1;
1065 if version != 1 && version != 2 {
1066 return None;
1067 }
1068 let has_template = version == 2;
1069 if pos >= data.len() {
1070 return None;
1071 }
1072 let num_groups = data[pos] as usize;
1073 pos += 1;
1074
1075 let mut groups = Vec::with_capacity(num_groups);
1076 for _ in 0..num_groups {
1077 if pos + 4 > data.len() {
1078 return None;
1079 }
1080 let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1081 pos += 2;
1082 let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1083 pos += 2;
1084
1085 let mut sub_keys = Vec::with_capacity(num_sub_cols);
1086 for _ in 0..num_sub_cols {
1087 if pos + 2 > data.len() {
1088 return None;
1089 }
1090 let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1091 pos += 2;
1092 if pos + key_len > data.len() {
1093 return None;
1094 }
1095 sub_keys.push(data[pos..pos + key_len].to_vec());
1096 pos += key_len;
1097 }
1098
1099 let nested_template = if has_template {
1100 if pos + 2 > data.len() {
1101 return None;
1102 }
1103 let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1104 pos += 2;
1105 let mut parts = Vec::with_capacity(num_parts);
1106 for _ in 0..num_parts {
1107 if pos + 2 > data.len() {
1108 return None;
1109 }
1110 let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1111 pos += 2;
1112 if pos + part_len > data.len() {
1113 return None;
1114 }
1115 parts.push(data[pos..pos + part_len].to_vec());
1116 pos += part_len;
1117 }
1118 parts
1119 } else {
1120 Vec::new()
1121 };
1122
1123 groups.push(NestedGroupInfo {
1124 original_col_index,
1125 sub_keys,
1126 nested_template,
1127 });
1128 }
1129
1130 Some((groups, pos))
1131}
1132
1133pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
1138 if data.is_empty() {
1139 return None;
1140 }
1141
1142 let has_trailing_newline = data.last() == Some(&b'\n');
1143 let lines = split_lines(data);
1144 let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
1145
1146 if non_empty.len() < 2 {
1147 return None;
1148 }
1149
1150 if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
1152 if col_data.len() + metadata.len() < data.len() {
1153 let num_rows = non_empty.len();
1158 if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
1159 let nested_bytes = serialize_nested_info(&nested_groups);
1161 metadata.extend_from_slice(&nested_bytes);
1162 return Some(TransformResult {
1163 data: flat_data,
1164 metadata,
1165 });
1166 }
1167 metadata.push(0u8); return Some(TransformResult {
1170 data: col_data,
1171 metadata,
1172 });
1173 }
1174 }
1175
1176 if let Some((grouped_data, grouped_metadata)) =
1178 preprocess_grouped(&non_empty, has_trailing_newline)
1179 {
1180 if grouped_data.len() + grouped_metadata.len() < data.len() {
1181 return Some(TransformResult {
1182 data: grouped_data,
1183 metadata: grouped_metadata,
1184 });
1185 }
1186 }
1187
1188 None
1189}
1190
1191pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1194 if metadata.is_empty() {
1195 return data.to_vec();
1196 }
1197 match metadata[0] {
1198 METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1199 METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1200 _ => data.to_vec(),
1201 }
1202}
1203
1204fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1206 if metadata.len() < 10 {
1207 return data.to_vec();
1208 }
1209 let mut pos = 0;
1210 let _version = metadata[pos];
1211 pos += 1;
1212 let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1213 pos += 4;
1214 let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1215 pos += 2;
1216 let has_trailing_newline = metadata[pos] != 0;
1217 pos += 1;
1218 let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1219 pos += 2;
1220
1221 let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
1222 for _ in 0..num_parts {
1223 if pos + 2 > metadata.len() {
1224 return data.to_vec();
1225 }
1226 let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1227 pos += 2;
1228 if pos + part_len > metadata.len() {
1229 return data.to_vec();
1230 }
1231 parts.push(metadata[pos..pos + part_len].to_vec());
1232 pos += part_len;
1233 }
1234
1235 if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
1236 return data.to_vec();
1237 }
1238
1239 let remaining_metadata = &metadata[pos..];
1241 if !remaining_metadata.is_empty() && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2)
1242 {
1243 if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
1245 let total_flat_cols = data.split(|&b| b == COL_SEP).count();
1247 let unflattened =
1248 unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
1249 return reverse_uniform_from_parts(
1250 &unflattened,
1251 &parts,
1252 num_rows,
1253 num_cols,
1254 has_trailing_newline,
1255 );
1256 }
1257 }
1258
1259 reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
1260}
1261
1262fn reverse_uniform_from_parts(
1264 data: &[u8],
1265 parts: &[Vec<u8>],
1266 num_rows: usize,
1267 num_cols: usize,
1268 has_trailing_newline: bool,
1269) -> Vec<u8> {
1270 let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
1271 if col_chunks.len() != num_cols {
1272 return data.to_vec();
1273 }
1274
1275 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1276 for chunk in &col_chunks {
1277 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1278 if vals.len() != num_rows {
1279 return data.to_vec();
1280 }
1281 columns.push(vals);
1282 }
1283
1284 let mut output = Vec::with_capacity(data.len() * 2);
1285 #[allow(clippy::needless_range_loop)]
1286 for row in 0..num_rows {
1287 output.extend_from_slice(&parts[0]);
1288 output.extend_from_slice(columns[0][row]);
1289 for col in 1..num_cols {
1290 output.extend_from_slice(&parts[col]);
1291 output.extend_from_slice(columns[col][row]);
1292 }
1293 output.extend_from_slice(&parts[num_cols]);
1294
1295 if row < num_rows - 1 || has_trailing_newline {
1296 output.push(b'\n');
1297 }
1298 }
1299
1300 output
1301}
1302
/// Parses uniform-layout metadata into
/// `(template parts, num_rows, num_cols, has_trailing_newline)`.
///
/// The version byte is not inspected. Returns `None` when the metadata is
/// shorter than the 10-byte header, ends inside a part, has zero rows or
/// columns, or does not hold exactly `num_cols + 1` parts.
fn parse_uniform_metadata(metadata: &[u8]) -> Option<(Vec<Vec<u8>>, usize, usize, bool)> {
    if metadata.len() < 10 {
        return None;
    }
    let (header, mut rest) = metadata.split_at(10);

    let num_rows = u32::from_le_bytes([header[1], header[2], header[3], header[4]]) as usize;
    let num_cols = u16::from_le_bytes([header[5], header[6]]) as usize;
    let has_trailing_newline = header[7] != 0;
    let num_parts = u16::from_le_bytes([header[8], header[9]]) as usize;

    let mut parts = Vec::with_capacity(num_parts);
    for _ in 0..num_parts {
        if rest.len() < 2 {
            return None;
        }
        let part_len = u16::from_le_bytes([rest[0], rest[1]]) as usize;
        rest = &rest[2..];
        if rest.len() < part_len {
            return None;
        }
        let (part, tail) = rest.split_at(part_len);
        parts.push(part.to_vec());
        rest = tail;
    }

    if num_rows == 0 || num_cols == 0 || parts.len() != num_cols + 1 {
        return None;
    }

    Some((parts, num_rows, num_cols, has_trailing_newline))
}
1339
1340fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1342 if metadata.len() < 8 {
1343 return data.to_vec();
1344 }
1345
1346 let mut mpos = 1; let has_trailing_newline = metadata[mpos] != 0;
1348 mpos += 1;
1349 let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1350 mpos += 4;
1351 let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
1352 mpos += 2;
1353
1354 let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];
1356
1357 let mut dpos: usize = 0;
1359
1360 for _ in 0..num_groups {
1361 if mpos + 4 > metadata.len() {
1363 return data.to_vec();
1364 }
1365 let group_row_count =
1366 u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1367 mpos += 4;
1368
1369 let mut row_indices = Vec::with_capacity(group_row_count);
1370 for _ in 0..group_row_count {
1371 if mpos + 4 > metadata.len() {
1372 return data.to_vec();
1373 }
1374 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1375 mpos += 4;
1376 row_indices.push(idx);
1377 }
1378
1379 if mpos + 4 > metadata.len() {
1381 return data.to_vec();
1382 }
1383 let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1384 mpos += 4;
1385 if mpos + gm_len > metadata.len() {
1386 return data.to_vec();
1387 }
1388 let group_metadata = &metadata[mpos..mpos + gm_len];
1389 mpos += gm_len;
1390
1391 if dpos + 4 > data.len() {
1393 return data.to_vec();
1394 }
1395 let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
1396 dpos += 4;
1397 if dpos + gd_len > data.len() {
1398 return data.to_vec();
1399 }
1400 let group_data = &data[dpos..dpos + gd_len];
1401 dpos += gd_len;
1402
1403 let (parts, num_rows, num_cols, _trailing) = match parse_uniform_metadata(group_metadata) {
1405 Some(v) => v,
1406 None => return data.to_vec(),
1407 };
1408
1409 if num_rows != group_row_count {
1410 return data.to_vec();
1411 }
1412
1413 let col_chunks: Vec<&[u8]> = group_data.split(|&b| b == COL_SEP).collect();
1415 if col_chunks.len() != num_cols {
1416 return data.to_vec();
1417 }
1418
1419 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1420 for chunk in &col_chunks {
1421 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1422 if vals.len() != num_rows {
1423 return data.to_vec();
1424 }
1425 columns.push(vals);
1426 }
1427
1428 for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1430 let mut line = Vec::new();
1431 line.extend_from_slice(&parts[0]);
1432 line.extend_from_slice(columns[0][row_within_group]);
1433 for col in 1..num_cols {
1434 line.extend_from_slice(&parts[col]);
1435 line.extend_from_slice(columns[col][row_within_group]);
1436 }
1437 line.extend_from_slice(&parts[num_cols]);
1438
1439 if original_idx < total_rows {
1440 output_lines[original_idx] = Some(line);
1441 }
1442 }
1443 }
1444
1445 if mpos + 4 > metadata.len() {
1447 return data.to_vec();
1448 }
1449 let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1450 mpos += 4;
1451
1452 let mut residual_indices = Vec::with_capacity(residual_count);
1453 for _ in 0..residual_count {
1454 if mpos + 4 > metadata.len() {
1455 return data.to_vec();
1456 }
1457 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1458 mpos += 4;
1459 residual_indices.push(idx);
1460 }
1461
1462 let residual_data = &data[dpos..];
1464 if residual_count > 0 {
1465 let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
1466 vec![]
1467 } else {
1468 residual_data.split(|&b| b == b'\n').collect()
1469 };
1470 if residual_lines.len() != residual_count {
1472 return data.to_vec();
1473 }
1474 for (i, &idx) in residual_indices.iter().enumerate() {
1475 if idx < total_rows {
1476 output_lines[idx] = Some(residual_lines[i].to_vec());
1477 }
1478 }
1479 }
1480
1481 let mut output = Vec::with_capacity(data.len() * 2);
1483 for (i, slot) in output_lines.iter().enumerate() {
1484 match slot {
1485 Some(line) => output.extend_from_slice(line),
1486 None => {
1487 return data.to_vec();
1489 }
1490 }
1491 if i < total_rows - 1 || has_trailing_newline {
1492 output.push(b'\n');
1493 }
1494 }
1495
1496 output
1497}
1498
#[cfg(test)]
mod tests {
    use super::*;

    // ---------------------------------------------------------------------
    // Shared helpers
    // ---------------------------------------------------------------------

    /// Calls `extract_value` at offset 0 of `line` and checks both the returned
    /// bytes and the returned end offset.
    #[track_caller]
    fn assert_extracts(line: &[u8], expected: &[u8], expected_end: usize) {
        let (value, end) = extract_value(line, 0).unwrap();
        assert_eq!(value, expected);
        assert_eq!(end, expected_end);
    }

    /// Feeds `input` through `preprocess` and `reverse` and requires the
    /// restored bytes to equal `input` exactly.
    #[track_caller]
    fn assert_roundtrip(input: &[u8]) {
        let transformed = preprocess(input).expect("should produce transform");
        let rebuilt = reverse(&transformed.data, &transformed.metadata);
        assert_eq!(rebuilt, input.to_vec());
    }

    /// Same contract as `assert_roundtrip`, but compares a lossy UTF-8 rendering
    /// first so that a mismatch is reported as text before the byte comparison.
    #[track_caller]
    fn assert_roundtrip_readable(input: &[u8]) {
        let transformed = preprocess(input).expect("should produce transform");
        let rebuilt = reverse(&transformed.data, &transformed.metadata);
        assert_eq!(
            String::from_utf8_lossy(&rebuilt),
            String::from_utf8_lossy(input),
        );
        assert_eq!(rebuilt, input.to_vec());
    }

    /// Splits `bytes` on every occurrence of `sep`, keeping empty pieces.
    fn split_bytes(bytes: &[u8], sep: u8) -> Vec<&[u8]> {
        bytes.split(|&b| b == sep).collect()
    }

    /// Concatenates `rows`, appending a `\n` after every row (including the last).
    fn lines_to_bytes<I>(rows: I) -> Vec<u8>
    where
        I: IntoIterator<Item = String>,
    {
        let mut bytes = Vec::new();
        for row in rows {
            bytes.extend_from_slice(row.as_bytes());
            bytes.push(b'\n');
        }
        bytes
    }

    // ---------------------------------------------------------------------
    // extract_value
    // ---------------------------------------------------------------------

    /// A quoted string comes back with its quotes; the end offset is just past
    /// the closing quote.
    #[test]
    fn extract_value_string() {
        assert_extracts(br#""hello","next""#, b"\"hello\"", 7);
    }

    /// A bare number stops at the following comma.
    #[test]
    fn extract_value_number() {
        assert_extracts(b"42,next", b"42", 2);
    }

    /// A boolean literal stops at the following comma.
    #[test]
    fn extract_value_bool() {
        assert_extracts(b"true,next", b"true", 4);
    }

    /// `null` stops at the following comma.
    #[test]
    fn extract_value_null() {
        assert_extracts(b"null,next", b"null", 4);
    }

    /// An object value is returned whole, braces included.
    #[test]
    fn extract_value_object() {
        assert_extracts(br#"{"a":1,"b":"x"},next"#, br#"{"a":1,"b":"x"}"#, 15);
    }

    /// An array value is returned whole, brackets included.
    #[test]
    fn extract_value_array() {
        assert_extracts(b"[1,2,3],next", b"[1,2,3]", 7);
    }

    /// An escaped quote inside a string does not terminate the value.
    #[test]
    fn extract_value_string_with_escapes() {
        assert_extracts(br#""he\"llo",next"#, br#""he\"llo""#, 9);
    }

    // ---------------------------------------------------------------------
    // parse_line
    // ---------------------------------------------------------------------

    /// A two-key object yields two values and the three literal fragments
    /// surrounding them.
    #[test]
    fn parse_line_simple() {
        let (fragments, cells) = parse_line(br#"{"a":1,"b":"x"}"#).unwrap();
        assert_eq!(fragments.len(), 3);
        assert_eq!(cells.len(), 2);

        assert_eq!(cells[0], b"1");
        assert_eq!(cells[1], b"\"x\"");

        assert_eq!(fragments[0], br#"{"a":"#.to_vec());
        assert_eq!(fragments[1], br#","b":"#.to_vec());
        assert_eq!(fragments[2], b"}");
    }

    // ---------------------------------------------------------------------
    // Uniform-schema round trips
    // ---------------------------------------------------------------------

    /// Three same-schema rows with a trailing newline survive the round trip.
    #[test]
    fn roundtrip_simple() {
        assert_roundtrip_readable(
            br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
"#,
        );
    }

    /// The absence of a final newline is preserved.
    #[test]
    fn roundtrip_no_trailing_newline() {
        assert_roundtrip(
            br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}"#,
        );
    }

    /// Rows that carry a nested object value survive the round trip.
    #[test]
    fn roundtrip_nested_values() {
        assert_roundtrip(
            br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
{"id":4,"meta":{"x":70,"y":80}}
{"id":5,"meta":{"x":90,"y":100}}
"#,
        );
    }

    /// Strings, numbers, booleans, nulls and arrays in one row all survive.
    #[test]
    fn roundtrip_mixed_types() {
        assert_roundtrip(
            br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
"#,
        );
    }

    // ---------------------------------------------------------------------
    // Inputs for which no transform is produced
    // ---------------------------------------------------------------------

    /// Two rows with differing keys yield no transform.
    #[test]
    fn schema_mismatch_too_few_returns_none() {
        let input = br#"{"a":1,"b":2}
{"a":1,"c":3}
"#;
        assert!(preprocess(input).is_none());
    }

    /// Two rows with a different number of keys yield no transform.
    #[test]
    fn different_num_keys_too_few_returns_none() {
        let input = br#"{"a":1,"b":2}
{"a":1}
"#;
        assert!(preprocess(input).is_none());
    }

    /// A single row yields no transform.
    #[test]
    fn single_line_returns_none() {
        let input = br#"{"a":1,"b":2}
"#;
        assert!(preprocess(input).is_none());
    }

    /// Empty input yields no transform.
    #[test]
    fn empty_returns_none() {
        assert!(preprocess(b"").is_none());
    }

    // ---------------------------------------------------------------------
    // Column layout
    // ---------------------------------------------------------------------

    /// The transformed payload holds one `COL_SEP`-delimited chunk per key, each
    /// containing that key's values in row order separated by `VAL_SEP`.
    #[test]
    fn column_layout_groups_similar_values() {
        let input = br#"{"type":"page_view","user":"alice"}
{"type":"api_call","user":"alice"}
{"type":"click","user":"bob"}
"#;
        let transformed = preprocess(input).unwrap();

        let columns = split_bytes(&transformed.data, COL_SEP);
        assert_eq!(columns.len(), 2);

        let expected_types: [&[u8]; 3] =
            [br#""page_view""#, br#""api_call""#, br#""click""#];
        assert_eq!(split_bytes(columns[0], VAL_SEP), expected_types);

        let expected_users: [&[u8]; 3] = [br#""alice""#, br#""alice""#, br#""bob""#];
        assert_eq!(split_bytes(columns[1], VAL_SEP), expected_users);
    }

    // ---------------------------------------------------------------------
    // Value edge cases
    // ---------------------------------------------------------------------

    /// Escape sequences inside string values are carried through untouched.
    #[test]
    fn roundtrip_string_with_escaped_chars() {
        assert_roundtrip(
            br#"{"msg":"he said \"hi\"","val":1}
{"msg":"line\nbreak","val":2}
{"msg":"tab\there","val":3}
{"msg":"back\\slash","val":4}
{"msg":"normal text","val":5}
"#,
        );
    }

    /// Negative and fractional numbers are carried through untouched.
    #[test]
    fn roundtrip_negative_and_float_numbers() {
        assert_roundtrip(
            br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
{"x":0.001,"y":999}
{"x":-100,"y":-200}
{"x":42.0,"y":7}
"#,
        );
    }

    /// Sanity-checks `parse_line` on a numeric row, then round-trips the same
    /// two-row pattern repeated 20 times (40 rows).
    #[test]
    fn reverse_roundtrip_small_data() {
        let (fragments, cells) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
        assert_eq!(cells.len(), 2);
        assert_eq!(fragments.len(), 3);

        let repeated = br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
"#
        .repeat(20);
        let transformed =
            preprocess(&repeated).expect("should produce transform with 40 rows");
        let rebuilt = reverse(&transformed.data, &transformed.metadata);
        assert_eq!(rebuilt, repeated);
    }

    // ---------------------------------------------------------------------
    // Grouped-schema round trips
    // ---------------------------------------------------------------------

    /// Ten rows of one schema followed by ten of another use the grouped
    /// metadata version and round-trip exactly.
    #[test]
    fn grouped_roundtrip_two_schemas() {
        let push_rows =
            (0..10).map(|i| format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i));
        let watch_rows = (10..20).map(|i| {
            format!(
                r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
                i, i, i
            )
        });
        let input = lines_to_bytes(push_rows.chain(watch_rows));

        let transformed = preprocess(&input).expect("should produce grouped transform");
        assert_eq!(transformed.metadata[0], METADATA_VERSION_GROUPED);
        let rebuilt = reverse(&transformed.data, &transformed.metadata);
        assert_eq!(rebuilt, input);
    }

    /// Two schemas alternating row by row still round-trip in original order.
    #[test]
    fn grouped_roundtrip_interleaved_schemas() {
        let input = lines_to_bytes((0..20).map(|i| {
            if i % 2 == 0 {
                format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i)
            } else {
                format!(
                    r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
                    i, i, i
                )
            }
        }));

        let transformed = preprocess(&input).expect("should produce grouped transform");
        assert_eq!(transformed.metadata[0], METADATA_VERSION_GROUPED);
        let rebuilt = reverse(&transformed.data, &transformed.metadata);
        assert_eq!(rebuilt, input);
    }

    /// Two larger schema runs separated by two one-off rows: the grouped
    /// metadata version is used and every row is restored.
    #[test]
    fn grouped_roundtrip_with_residuals() {
        let first_run = (0..8).map(|i| format!(r#"{{"a":{},"b":"val{}"}}"#, i, i));
        let one_offs = vec![
            String::from(r#"{"x":1,"y":2,"z":3}"#),
            String::from(r#"{"p":"q"}"#),
        ];
        let second_run =
            (0..6).map(|i| format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i));
        let input = lines_to_bytes(first_run.chain(one_offs).chain(second_run));

        let transformed = preprocess(&input).expect("should produce grouped transform");
        assert_eq!(transformed.metadata[0], METADATA_VERSION_GROUPED);
        let rebuilt = reverse(&transformed.data, &transformed.metadata);
        assert_eq!(
            String::from_utf8_lossy(&rebuilt),
            String::from_utf8_lossy(&input),
        );
        assert_eq!(rebuilt, input);
    }

    /// Two schemas of six rows each, with no newline after the final row.
    #[test]
    fn grouped_roundtrip_no_trailing_newline() {
        let rows: Vec<String> = (0..6)
            .map(|i| format!(r#"{{"id":{},"type":"push"}}"#, i))
            .chain((0..6).map(|i| format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i)))
            .collect();
        // Joining puts a newline between rows only, so the last row is unterminated.
        let input = rows.join("\n").into_bytes();

        let transformed = preprocess(&input).expect("should produce grouped transform");
        let rebuilt = reverse(&transformed.data, &transformed.metadata);
        assert_eq!(rebuilt, input);
    }

    /// Five rows sharing one schema are encoded with the uniform metadata
    /// version rather than the grouped one.
    #[test]
    fn uniform_still_preferred_over_grouped() {
        let input = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
{"a":4,"b":"w"}
{"a":5,"b":"v"}
"#;
        let transformed = preprocess(input).expect("should produce transform");
        assert_eq!(
            transformed.metadata[0], METADATA_VERSION_UNIFORM,
            "uniform schema should use Strategy 1"
        );
        let rebuilt = reverse(&transformed.data, &transformed.metadata);
        assert_eq!(rebuilt, input.to_vec());
    }

    /// Fifty event-like rows where every fifth row has an extra `org` key.
    #[test]
    fn grouped_gharchive_simulation() {
        let input = lines_to_bytes((0..50).map(|i| {
            if i % 5 == 0 {
                format!(
                    r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
                    i, i, i, i
                )
            } else {
                format!(
                    r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
                    i, i, i
                )
            }
        }));

        let transformed = preprocess(&input).expect("should produce grouped transform");
        assert_eq!(transformed.metadata[0], METADATA_VERSION_GROUPED);
        let rebuilt = reverse(&transformed.data, &transformed.metadata);
        assert_eq!(rebuilt, input);
    }

    // ---------------------------------------------------------------------
    // Nested-object handling
    // ---------------------------------------------------------------------

    /// A nested `meta` object with keys `x` and `y` produces three columns in
    /// total, the second and third holding the `x` and `y` values.
    #[test]
    fn test_nested_decomposition_basic() {
        let input = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#;
        let transformed = preprocess(input).expect("should produce transform");
        assert_eq!(transformed.metadata[0], METADATA_VERSION_UNIFORM);

        let columns = split_bytes(&transformed.data, COL_SEP);
        assert_eq!(
            columns.len(),
            3,
            "should have 3 columns after flattening: got {}",
            columns.len()
        );

        let expected_x: [&[u8]; 3] = [b"10", b"30", b"50"];
        assert_eq!(split_bytes(columns[1], VAL_SEP), expected_x);

        let expected_y: [&[u8]; 3] = [b"20", b"40", b"60"];
        assert_eq!(split_bytes(columns[2], VAL_SEP), expected_y);
    }

    /// The nested-object input above is restored byte for byte.
    #[test]
    fn test_nested_roundtrip() {
        assert_roundtrip_readable(
            br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#,
        );
    }

    /// Rows whose nested object has differing key sets still round-trip.
    #[test]
    fn test_nested_mixed_schemas() {
        assert_roundtrip_readable(
            br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","meta":{"query":"pricing","results_count":25}}
{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
{"ts":"e","meta":{"query":"api docs","results_count":41}}
"#,
        );
    }

    /// Flat rows round-trip, and the final metadata byte is zero.
    #[test]
    fn test_nested_no_nested_objects() {
        let input = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
"#;
        let transformed = preprocess(input).expect("should produce transform");
        let rebuilt = reverse(&transformed.data, &transformed.metadata);
        assert_eq!(rebuilt, input.to_vec());

        let trailer = transformed.metadata[transformed.metadata.len() - 1];
        assert_eq!(trailer, 0, "should have has_nested=0 for flat data");
    }

    /// Five rows, each with a differently shaped nested `meta` object.
    #[test]
    fn test_nested_real_corpus() {
        assert_roundtrip_readable(
            br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
"#,
        );
    }

    /// A column that alternates between an object and `null` round-trips.
    #[test]
    fn test_nested_roundtrip_with_null_values() {
        assert_roundtrip(
            br#"{"id":1,"meta":{"x":10}}
{"id":2,"meta":null}
{"id":3,"meta":{"x":30}}
{"id":4,"meta":null}
{"id":5,"meta":{"x":50}}
"#,
        );
    }

    /// String values inside a nested object are restored exactly.
    #[test]
    fn test_nested_string_values_preserved_exact() {
        assert_roundtrip(
            br#"{"id":1,"meta":{"name":"Alice","score":100}}
{"id":2,"meta":{"name":"Bob","score":200}}
{"id":3,"meta":{"name":"Charlie","score":300}}
"#,
        );
    }

    /// `parse_nested_object_kv` returns (key, value) pairs in source order; keys
    /// come back without quotes, values as their raw bytes.
    #[test]
    fn test_parse_nested_object_kv() {
        let entries =
            parse_nested_object_kv(br#"{"query":"benchmark","results_count":14}"#).unwrap();
        assert_eq!(entries.len(), 2);

        assert_eq!(entries[0].0, b"query");
        assert_eq!(entries[0].1, br#""benchmark""#.to_vec());

        assert_eq!(entries[1].0, b"results_count");
        assert_eq!(entries[1].1, b"14");
    }
}