1use super::transform::TransformResult;
24use std::collections::HashMap;
25
/// Separator byte written between columns of a columnar stream. In the inline
/// section emitted by `build_selective_columnar` it separates rows instead.
const COL_SEP: u8 = 0x00;
/// Separator byte written between the row values of one column. In the inline
/// section emitted by `build_selective_columnar` it separates the values of one row.
const VAL_SEP: u8 = 0x01;
/// First metadata byte written by `build_uniform_columnar`.
const METADATA_VERSION_UNIFORM: u8 = 1;
/// First metadata byte written by `preprocess_grouped_core`.
const METADATA_VERSION_GROUPED: u8 = 2;
/// First metadata byte written by `build_selective_columnar`.
const METADATA_VERSION_SELECTIVE: u8 = 3;
// NOTE(review): not referenced by the code visible in this part of the file;
// presumably reserved for a layout handled elsewhere — confirm before removing.
#[allow(dead_code)]
const METADATA_VERSION_TYPED: u8 = 4;

/// Minimum number of lines (overall, and per schema group) required before the
/// grouped path encodes them as columns; smaller groups become residual lines.
const MIN_GROUP_ROWS: usize = 5;

/// Average value length in bytes at or above which `classify_columns` may leave a
/// column inline (only together with the cardinality condition below).
const SELECTIVE_MIN_AVG_LEN: usize = 128;

/// Ratio of distinct values to rows above which `classify_columns` may leave a
/// column inline (only together with the length condition above).
const SELECTIVE_MAX_CARDINALITY: f64 = 0.7;

/// One schema group: the shared template parts, followed by the group's rows as
/// `(index of the line in the input, extracted values)` pairs.
type SchemaGroup<'a> = (Vec<&'a [u8]>, Vec<(usize, Vec<&'a [u8]>)>);
55
/// Extracts one JSON-like value from `line`, starting at `pos` after skipping
/// ASCII whitespace.
///
/// Returns the raw bytes of the value together with the index just past it:
/// * a string runs up to and including its closing unescaped `"`;
/// * an object or array runs to its matching closing bracket (only brackets of
///   the same kind are counted, and brackets inside strings are ignored);
/// * anything else runs until `,`, `}`, `]` or ASCII whitespace.
///
/// Returns `None` when nothing but whitespace remains, when a string or
/// container is unterminated, or when a bare value would be empty.
fn extract_value(line: &[u8], mut pos: usize) -> Option<(&[u8], usize)> {
    /// Index of the unescaped `"` that closes a string whose body starts at
    /// `from`, or `buf.len()` when the string never closes.
    fn closing_quote(buf: &[u8], from: usize) -> usize {
        let mut after_backslash = false;
        let mut i = from;
        while i < buf.len() {
            match buf[i] {
                _ if after_backslash => after_backslash = false,
                b'\\' => after_backslash = true,
                b'"' => return i,
                _ => {}
            }
            i += 1;
        }
        i
    }

    /// Index just past the bracket that balances the `open` byte at `start`.
    fn container_end(buf: &[u8], start: usize, open: u8, close: u8) -> Option<usize> {
        let mut depth = 1usize;
        let mut i = start + 1;
        while i < buf.len() && depth > 0 {
            let byte = buf[i];
            if byte == b'"' {
                // Jump to the closing quote; the shared `i += 1` steps over it.
                i = closing_quote(buf, i + 1);
            } else if byte == open {
                depth += 1;
            } else if byte == close {
                depth -= 1;
            }
            i += 1;
        }
        // An unterminated inner string leaves `i` one past the end.
        (depth == 0 && i <= buf.len()).then_some(i)
    }

    while line.get(pos).is_some_and(|b| b.is_ascii_whitespace()) {
        pos += 1;
    }

    let start = pos;
    let end = match *line.get(start)? {
        b'"' => {
            let quote = closing_quote(line, start + 1);
            if quote >= line.len() {
                return None;
            }
            quote + 1
        }
        b'{' => container_end(line, start, b'{', b'}')?,
        b'[' => container_end(line, start, b'[', b']')?,
        _ => {
            let stop = line[start..]
                .iter()
                .position(|&b| matches!(b, b',' | b'}' | b']') || b.is_ascii_whitespace())
                .map_or(line.len(), |offset| start + offset);
            if stop == start {
                return None;
            }
            stop
        }
    };

    Some((&line[start..end], end))
}
169
170type ParsedLine<'a> = (Vec<&'a [u8]>, Vec<&'a [u8]>);
181
182fn parse_line(line: &[u8]) -> Option<ParsedLine<'_>> {
183 let mut pos = 0;
184
185 while pos < line.len() && line[pos].is_ascii_whitespace() {
187 pos += 1;
188 }
189 if pos >= line.len() || line[pos] != b'{' {
190 return None;
191 }
192
193 let mut parts: Vec<&[u8]> = Vec::new();
194 let mut values: Vec<&[u8]> = Vec::new();
195 let mut part_start = 0;
196
197 pos += 1; loop {
200 while pos < line.len() && line[pos].is_ascii_whitespace() {
202 pos += 1;
203 }
204 if pos >= line.len() {
205 return None;
206 }
207
208 if line[pos] == b'}' {
210 parts.push(&line[part_start..]);
212 break;
213 }
214
215 if line[pos] != b'"' {
217 return None;
218 }
219 pos += 1;
221 let mut escaped = false;
222 while pos < line.len() {
223 if escaped {
224 escaped = false;
225 } else if line[pos] == b'\\' {
226 escaped = true;
227 } else if line[pos] == b'"' {
228 pos += 1;
229 break;
230 }
231 pos += 1;
232 }
233
234 while pos < line.len() && line[pos].is_ascii_whitespace() {
236 pos += 1;
237 }
238 if pos >= line.len() || line[pos] != b':' {
239 return None;
240 }
241 pos += 1; while pos < line.len() && line[pos].is_ascii_whitespace() {
247 pos += 1;
248 }
249
250 parts.push(&line[part_start..pos]);
252
253 let (value, value_end) = extract_value(line, pos)?;
255 values.push(value);
256 pos = value_end;
257
258 part_start = pos;
260
261 while pos < line.len() && line[pos].is_ascii_whitespace() {
263 pos += 1;
264 }
265 if pos >= line.len() {
266 return None;
267 }
268
269 if line[pos] == b',' {
271 pos += 1;
272 } else if line[pos] == b'}' {
273 parts.push(&line[part_start..]);
277 break;
278 } else {
279 return None; }
281 }
282
283 if values.is_empty() {
284 return None;
285 }
286
287 Some((parts, values))
288}
289
290fn split_lines(data: &[u8]) -> Vec<&[u8]> {
292 let mut lines: Vec<&[u8]> = Vec::new();
293 let mut start = 0;
294 for pos in memchr::memchr_iter(b'\n', data) {
295 lines.push(&data[start..pos]);
296 start = pos + 1;
297 }
298 if start < data.len() {
299 lines.push(&data[start..]);
300 }
301 lines
302}
303
304fn build_uniform_columnar(
307 template_parts: &[&[u8]],
308 columns: &[Vec<&[u8]>],
309 num_rows: usize,
310 has_trailing_newline: bool,
311) -> (Vec<u8>, Vec<u8>) {
312 let num_cols = columns.len();
313
314 let mut col_data = Vec::new();
316 for (ci, col) in columns.iter().enumerate() {
317 for (ri, val) in col.iter().enumerate() {
318 col_data.extend_from_slice(val);
319 if ri < num_rows - 1 {
320 col_data.push(VAL_SEP);
321 }
322 }
323 if ci < num_cols - 1 {
324 col_data.push(COL_SEP);
325 }
326 }
327
328 let mut metadata = Vec::new();
330 metadata.push(METADATA_VERSION_UNIFORM);
331 metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
332 metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
333 metadata.push(if has_trailing_newline { 1 } else { 0 });
334 metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
335 for part in template_parts {
336 metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
337 metadata.extend_from_slice(part);
338 }
339
340 (col_data, metadata)
341}
342
343fn classify_columns(columns: &[Vec<&[u8]>], num_rows: usize) -> Vec<bool> {
348 use std::collections::HashSet;
349 columns
350 .iter()
351 .map(|col_values| {
352 if num_rows < 10 {
353 return true; }
355 let unique: HashSet<&[u8]> = col_values.iter().copied().collect();
356 let cardinality_ratio = unique.len() as f64 / num_rows as f64;
357 let avg_len = col_values.iter().map(|v| v.len()).sum::<usize>() / num_rows;
358 !(cardinality_ratio > SELECTIVE_MAX_CARDINALITY && avg_len >= SELECTIVE_MIN_AVG_LEN)
360 })
361 .collect()
362}
363
364fn build_selective_columnar(
376 template_parts: &[&[u8]],
377 columns: &[Vec<&[u8]>],
378 extract_mask: &[bool],
379 num_rows: usize,
380 has_trailing_newline: bool,
381) -> (Vec<u8>, Vec<u8>) {
382 let num_total_cols = columns.len();
383
384 let extracted_indices: Vec<u16> = (0..num_total_cols)
385 .filter(|&i| extract_mask[i])
386 .map(|i| i as u16)
387 .collect();
388 let inline_indices: Vec<u16> = (0..num_total_cols)
389 .filter(|&i| !extract_mask[i])
390 .map(|i| i as u16)
391 .collect();
392
393 let mut extracted_data = Vec::new();
395 for (ei, &col_idx) in extracted_indices.iter().enumerate() {
396 let col = &columns[col_idx as usize];
397 for (ri, val) in col.iter().enumerate() {
398 extracted_data.extend_from_slice(val);
399 if ri < num_rows - 1 {
400 extracted_data.push(VAL_SEP);
401 }
402 }
403 if ei < extracted_indices.len() - 1 {
404 extracted_data.push(COL_SEP);
405 }
406 }
407
408 let mut inline_data = Vec::new();
410 if !inline_indices.is_empty() {
411 #[allow(clippy::needless_range_loop)]
412 for row in 0..num_rows {
413 for (ii, &col_idx) in inline_indices.iter().enumerate() {
414 inline_data.extend_from_slice(columns[col_idx as usize][row]);
415 if ii < inline_indices.len() - 1 {
416 inline_data.push(VAL_SEP);
417 }
418 }
419 if row < num_rows - 1 {
420 inline_data.push(COL_SEP);
421 }
422 }
423 }
424
425 let mut data = Vec::with_capacity(4 + extracted_data.len() + inline_data.len());
427 data.extend_from_slice(&(extracted_data.len() as u32).to_le_bytes());
428 data.extend_from_slice(&extracted_data);
429 data.extend_from_slice(&inline_data);
430
431 let mut metadata = Vec::new();
433 metadata.push(METADATA_VERSION_SELECTIVE);
434 metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
435 metadata.extend_from_slice(&(num_total_cols as u16).to_le_bytes());
436 metadata.push(if has_trailing_newline { 1 } else { 0 });
437 metadata.extend_from_slice(&(extracted_indices.len() as u16).to_le_bytes());
438 for &idx in &extracted_indices {
439 metadata.extend_from_slice(&idx.to_le_bytes());
440 }
441 metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
442 for part in template_parts {
443 metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
444 metadata.extend_from_slice(part);
445 }
446
447 (data, metadata)
448}
449
450fn preprocess_uniform(
453 non_empty: &[&[u8]],
454 has_trailing_newline: bool,
455) -> Option<(Vec<u8>, Vec<u8>)> {
456 if non_empty.len() < 2 {
457 return None;
458 }
459
460 let (template_parts, first_values) = parse_line(non_empty[0])?;
461 let num_cols = first_values.len();
462 if template_parts.len() != num_cols + 1 {
463 return None;
464 }
465
466 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
467 for v in &first_values {
468 columns.push(vec![*v]);
469 }
470
471 for &line in &non_empty[1..] {
472 let (parts, values) = parse_line(line)?;
473 if values.len() != num_cols || parts.len() != template_parts.len() {
474 return None;
475 }
476 for (a, b) in parts.iter().zip(template_parts.iter()) {
477 if a != b {
478 return None;
479 }
480 }
481 for (col, val) in values.iter().enumerate() {
482 columns[col].push(*val);
483 }
484 }
485
486 let extract_mask = classify_columns(&columns, non_empty.len());
492 let all_extracted = extract_mask.iter().all(|&e| e);
493
494 if all_extracted {
495 Some(build_uniform_columnar(
496 &template_parts,
497 &columns,
498 non_empty.len(),
499 has_trailing_newline,
500 ))
501 } else {
502 Some(build_selective_columnar(
503 &template_parts,
504 &columns,
505 &extract_mask,
506 non_empty.len(),
507 has_trailing_newline,
508 ))
509 }
510}
511
512fn find_discriminator<'a>(parsed: &[Option<ParsedLine<'a>>]) -> Option<usize> {
517 use std::collections::HashSet;
518
519 let mut samples: Vec<&ParsedLine<'a>> = Vec::new();
521 for line in parsed.iter().flatten() {
522 samples.push(line);
523 if samples.len() >= 200 {
524 break;
525 }
526 }
527
528 if samples.len() < 10 {
529 return None;
530 }
531
532 let num_cols = samples.iter().map(|s| s.1.len()).min().unwrap_or(0);
534 if num_cols == 0 {
535 return None;
536 }
537
538 let mut best_col = None;
539 let mut best_cardinality = usize::MAX;
540
541 for col_idx in 0..num_cols {
542 let total_len: usize = samples.iter().map(|s| s.1[col_idx].len()).sum();
544 let avg_len = total_len / samples.len();
545 if avg_len > 30 {
546 continue;
547 }
548
549 let unique: HashSet<&[u8]> = samples.iter().map(|s| s.1[col_idx]).collect();
550 let cardinality = unique.len();
551
552 if cardinality > 1 && cardinality < samples.len() / 3 && cardinality < best_cardinality {
554 best_col = Some(col_idx);
555 best_cardinality = cardinality;
556 }
557 }
558
559 best_col
560}
561
562fn preprocess_grouped<'a>(
583 non_empty: &[&'a [u8]],
584 has_trailing_newline: bool,
585) -> Option<(Vec<u8>, Vec<u8>)> {
586 if non_empty.len() < MIN_GROUP_ROWS {
587 return None;
588 }
589
590 let parsed: Vec<Option<ParsedLine<'a>>> =
592 non_empty.iter().map(|&l| parse_line(l)).collect();
593 let disc_col = find_discriminator(&parsed);
594 drop(parsed);
595
596 let result_no_disc = preprocess_grouped_core(non_empty, has_trailing_newline, None);
598
599 if let Some(dc) = disc_col {
601 let result_disc = preprocess_grouped_core(non_empty, has_trailing_newline, Some(dc));
602 match (&result_no_disc, &result_disc) {
603 (Some((d1, m1)), Some((d2, m2))) => {
604 if d2.len() + m2.len() < d1.len() + m1.len() {
605 return result_disc;
606 }
607 return result_no_disc;
608 }
609 (None, Some(_)) => return result_disc,
610 _ => return result_no_disc,
611 }
612 }
613
614 result_no_disc
615}
616
617fn preprocess_grouped_core<'a>(
620 non_empty: &[&'a [u8]],
621 has_trailing_newline: bool,
622 disc_col: Option<usize>,
623) -> Option<(Vec<u8>, Vec<u8>)> {
624 let mut parsed: Vec<Option<ParsedLine<'a>>> = Vec::with_capacity(non_empty.len());
626 for &line in non_empty {
627 parsed.push(parse_line(line));
628 }
629
630 let mut group_map: HashMap<Vec<u8>, SchemaGroup<'a>> = HashMap::new();
631 let mut residual_indices: Vec<usize> = Vec::new();
632
633 for (idx, parsed_line) in parsed.into_iter().enumerate() {
634 if let Some((parts, values)) = parsed_line {
635 let mut key = Vec::new();
636 for part in &parts {
637 key.extend_from_slice(&(part.len() as u32).to_le_bytes());
638 key.extend_from_slice(part);
639 }
640 if let Some(dc) = disc_col {
642 if dc < values.len() {
643 key.push(0xFF);
644 key.extend_from_slice(values[dc]);
645 }
646 }
647 group_map
648 .entry(key)
649 .or_insert_with(|| (parts, Vec::new()))
650 .1
651 .push((idx, values));
652 } else {
653 residual_indices.push(idx);
654 }
655 }
656
657 let mut groups: Vec<SchemaGroup<'a>> = Vec::new();
659 for (_key, (template_parts, rows)) in group_map {
660 if rows.len() >= MIN_GROUP_ROWS {
661 groups.push((template_parts, rows));
662 } else {
663 for (idx, _) in &rows {
665 residual_indices.push(*idx);
666 }
667 }
668 }
669
670 if groups.is_empty() {
672 return None;
673 }
674
675 groups.sort_by_key(|(_, rows)| rows[0].0);
677 residual_indices.sort_unstable();
678
679 struct GroupOutput {
681 row_indices: Vec<u32>,
682 col_data: Vec<u8>,
683 group_metadata: Vec<u8>,
684 }
685
686 let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
687
688 for (template_parts, rows) in &groups {
689 let num_cols = template_parts.len() - 1;
690 let mut columns: Vec<Vec<&[u8]>> = (0..num_cols).map(|_| Vec::new()).collect();
691 let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
692
693 for (idx, values) in rows {
694 row_indices.push(*idx as u32);
695 for (col, val) in values.iter().enumerate() {
696 columns[col].push(*val);
697 }
698 }
699
700 let extract_mask = classify_columns(&columns, rows.len());
702 let all_extracted = extract_mask.iter().all(|&e| e);
703
704 let (mut col_data, mut group_metadata) = if all_extracted {
705 build_uniform_columnar(template_parts, &columns, rows.len(), false)
706 } else {
707 build_selective_columnar(template_parts, &columns, &extract_mask, rows.len(), false)
708 };
709
710 if all_extracted {
714 if let Some((flat_data, nested_groups)) =
715 flatten_nested_columns(&col_data, rows.len())
716 {
717 let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
718 let unflattened = unflatten_nested_columns(
719 &flat_data,
720 &nested_groups,
721 rows.len(),
722 total_flat_cols,
723 );
724 if unflattened == col_data {
725 let nested_bytes = serialize_nested_info(&nested_groups);
726 group_metadata.extend_from_slice(&nested_bytes);
727 col_data = flat_data;
728 } else {
729 group_metadata.push(0u8); }
731 } else {
732 group_metadata.push(0u8); }
734
735 }
736
737 group_outputs.push(GroupOutput {
738 row_indices,
739 col_data,
740 group_metadata,
741 });
742 }
743
744 let mut data_out = Vec::new();
746 for group in &group_outputs {
747 data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
748 data_out.extend_from_slice(&group.col_data);
749 }
750
751 let residual_start = data_out.len();
753 for (i, &idx) in residual_indices.iter().enumerate() {
754 data_out.extend_from_slice(non_empty[idx]);
755 if i < residual_indices.len() - 1 {
756 data_out.push(b'\n');
757 }
758 }
759 let _residual_len = data_out.len() - residual_start;
760
761 let mut metadata = Vec::new();
763 metadata.push(METADATA_VERSION_GROUPED);
764 metadata.push(if has_trailing_newline { 1 } else { 0 });
765 metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
766 metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
767
768 for group in &group_outputs {
769 metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
770 for &idx in &group.row_indices {
771 metadata.extend_from_slice(&idx.to_le_bytes());
772 }
773 metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
774 metadata.extend_from_slice(&group.group_metadata);
775 }
776
777 metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
778 for &idx in &residual_indices {
779 metadata.extend_from_slice(&(idx as u32).to_le_bytes());
780 }
781
782 Some((data_out, metadata))
783}
784
/// Describes one column of nested objects that `flatten_nested_columns` expanded
/// into several flat sub-columns, with what `unflatten_nested_columns` needs to
/// rebuild it.
pub(crate) struct NestedGroupInfo {
    /// Position of the nested column among the columns before flattening.
    pub(crate) original_col_index: u16,
    /// Keys found in the nested objects, in first-seen order; one flat
    /// sub-column exists per key. Stored without the surrounding quotes.
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Template fragments of the first non-null object of the column, as
    /// returned by `parse_nested_object_with_template`.
    pub(crate) nested_template: Vec<Vec<u8>>,
    /// Bit `sub_key_index * num_rows + row` (least-significant bit first within
    /// each byte) is set when that key is missing from that row's object.
    /// Empty when no key is missing anywhere.
    pub(crate) absence_bitmap: Vec<u8>,
}
801
802pub(crate) fn flatten_nested_columns(
808 col_data: &[u8],
809 num_rows: usize,
810) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
811 let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
813 if columns.is_empty() || num_rows == 0 {
814 return None;
815 }
816
817 let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
818 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
821
822 for (col_idx, &col_chunk) in columns.iter().enumerate() {
823 let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
824 if values.len() != num_rows {
825 return None;
826 }
827
828 let mut all_objects = true;
830 let mut has_non_null = false;
831 for val in &values {
832 if *val == b"null" {
833 continue;
834 }
835 has_non_null = true;
836 if !val.starts_with(b"{") {
837 all_objects = false;
838 break;
839 }
840 }
841
842 if !all_objects || !has_non_null {
843 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
845 output_columns.push(col_values);
846 continue;
847 }
848
849 let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
853 let mut nested_template: Vec<Vec<u8>> = Vec::new();
854 type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
855 let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);
856
857 for val in &values {
858 if *val == b"null" {
859 parsed_rows.push(None);
860 continue;
861 }
862 if nested_template.is_empty() {
863 match parse_nested_object_with_template(val) {
865 Some((template, kv_pairs)) => {
866 for (key, _) in &kv_pairs {
867 if !all_sub_keys.iter().any(|k| k == key) {
868 all_sub_keys.push(key.clone());
869 }
870 }
871 nested_template = template;
872 parsed_rows.push(Some(kv_pairs));
873 }
874 None => {
875 all_sub_keys.clear();
876 break;
877 }
878 }
879 } else {
880 match parse_nested_object_kv(val) {
882 Some(kv_pairs) => {
883 for (key, _) in &kv_pairs {
884 if !all_sub_keys.iter().any(|k| k == key) {
885 all_sub_keys.push(key.clone());
886 }
887 }
888 parsed_rows.push(Some(kv_pairs));
889 }
890 None => {
891 all_sub_keys.clear();
892 break;
893 }
894 }
895 }
896 }
897
898 if all_sub_keys.is_empty() {
899 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
901 output_columns.push(col_values);
902 continue;
903 }
904
905 let num_sub_keys = all_sub_keys.len();
909 let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
910 let total_bits = num_sub_keys * num_rows;
911 let bitmap_bytes = total_bits.div_ceil(8);
912 let mut absence_bitmap = vec![0u8; bitmap_bytes];
913 let mut has_any_absent = false;
914
915 for (row_idx, parsed) in parsed_rows.iter().enumerate() {
916 match parsed {
917 Some(kv_pairs) => {
918 for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
919 let found = kv_pairs.iter().find(|(k, _)| k == sk);
920 match found {
921 Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
922 None => {
923 sub_columns[sk_idx].push(b"null".to_vec());
924 let bit_idx = sk_idx * num_rows + row_idx;
926 absence_bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
927 has_any_absent = true;
928 }
929 }
930 }
931 }
932 None => {
933 for sc in sub_columns.iter_mut() {
937 sc.push(b"null".to_vec());
938 }
939 }
940 }
941 }
942
943 nested_groups.push(NestedGroupInfo {
944 original_col_index: col_idx as u16,
945 sub_keys: all_sub_keys,
946 nested_template,
947 absence_bitmap: if has_any_absent {
948 absence_bitmap
949 } else {
950 Vec::new()
951 },
952 });
953
954 for sc in sub_columns {
955 output_columns.push(sc);
956 }
957 }
958
959 if nested_groups.is_empty() {
960 return None;
961 }
962
963 let num_out_cols = output_columns.len();
965 let mut out = Vec::new();
966 for (ci, col) in output_columns.iter().enumerate() {
967 for (ri, val) in col.iter().enumerate() {
968 out.extend_from_slice(val);
969 if ri < num_rows - 1 {
970 out.push(VAL_SEP);
971 }
972 }
973 if ci < num_out_cols - 1 {
974 out.push(COL_SEP);
975 }
976 }
977
978 Some((out, nested_groups))
979}
980
981#[allow(clippy::type_complexity)]
986pub(crate) fn parse_nested_object_with_template(
987 obj: &[u8],
988) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
989 let mut pos = 0;
990
991 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
993 pos += 1;
994 }
995 if pos >= obj.len() || obj[pos] != b'{' {
996 return None;
997 }
998 pos += 1;
999
1000 let mut parts: Vec<Vec<u8>> = Vec::new();
1001 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1002 let mut part_start = 0;
1003
1004 loop {
1005 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1007 pos += 1;
1008 }
1009 if pos >= obj.len() {
1010 return None;
1011 }
1012 if obj[pos] == b'}' {
1013 parts.push(obj[part_start..].to_vec());
1014 break;
1015 }
1016
1017 if obj[pos] != b'"' {
1019 return None;
1020 }
1021 let key_str_start = pos + 1;
1022 pos += 1;
1023 let mut escaped = false;
1024 while pos < obj.len() {
1025 if escaped {
1026 escaped = false;
1027 } else if obj[pos] == b'\\' {
1028 escaped = true;
1029 } else if obj[pos] == b'"' {
1030 break;
1031 }
1032 pos += 1;
1033 }
1034 if pos >= obj.len() {
1035 return None;
1036 }
1037 let key = obj[key_str_start..pos].to_vec();
1038 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1042 pos += 1;
1043 }
1044 if pos >= obj.len() || obj[pos] != b':' {
1045 return None;
1046 }
1047 pos += 1;
1048
1049 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1051 pos += 1;
1052 }
1053
1054 parts.push(obj[part_start..pos].to_vec());
1056
1057 let value_start = pos;
1059 let (value, value_end) = extract_value(obj, value_start)?;
1061 pos = value_end;
1062 pairs.push((key, value.to_vec()));
1063
1064 part_start = pos;
1065
1066 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1068 pos += 1;
1069 }
1070 if pos >= obj.len() {
1071 return None;
1072 }
1073 if obj[pos] == b',' {
1074 pos += 1;
1075 } else if obj[pos] == b'}' {
1076 parts.push(obj[part_start..].to_vec());
1077 break;
1078 } else {
1079 return None;
1080 }
1081 }
1082
1083 if pairs.is_empty() {
1084 return None;
1085 }
1086 Some((parts, pairs))
1087}
1088
1089pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
1093 let mut pos = 0;
1094
1095 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1097 pos += 1;
1098 }
1099 if pos >= obj.len() || obj[pos] != b'{' {
1100 return None;
1101 }
1102 pos += 1;
1103
1104 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1105
1106 loop {
1107 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1109 pos += 1;
1110 }
1111 if pos >= obj.len() {
1112 return None;
1113 }
1114 if obj[pos] == b'}' {
1115 break;
1116 }
1117
1118 if obj[pos] != b'"' {
1120 return None;
1121 }
1122 pos += 1;
1123 let key_start = pos;
1124 let mut escaped = false;
1125 while pos < obj.len() {
1126 if escaped {
1127 escaped = false;
1128 } else if obj[pos] == b'\\' {
1129 escaped = true;
1130 } else if obj[pos] == b'"' {
1131 break;
1132 }
1133 pos += 1;
1134 }
1135 if pos >= obj.len() {
1136 return None;
1137 }
1138 let key = obj[key_start..pos].to_vec();
1139 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1143 pos += 1;
1144 }
1145 if pos >= obj.len() || obj[pos] != b':' {
1146 return None;
1147 }
1148 pos += 1;
1149
1150 let (value, value_end) = extract_value(obj, pos)?;
1152 pos = value_end;
1153 pairs.push((key, value.to_vec()));
1154
1155 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1157 pos += 1;
1158 }
1159 if pos >= obj.len() {
1160 return None;
1161 }
1162 if obj[pos] == b',' {
1163 pos += 1;
1164 } else if obj[pos] == b'}' {
1165 break;
1166 } else {
1167 return None;
1168 }
1169 }
1170
1171 if pairs.is_empty() {
1172 return None;
1173 }
1174 Some(pairs)
1175}
1176
1177pub(crate) fn unflatten_nested_columns(
1182 flat_data: &[u8],
1183 nested_groups: &[NestedGroupInfo],
1184 num_rows: usize,
1185 total_flat_cols: usize,
1186) -> Vec<u8> {
1187 let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
1188 if flat_columns.len() != total_flat_cols {
1189 return flat_data.to_vec();
1190 }
1191
1192 let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
1194 for chunk in &flat_columns {
1195 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1196 if vals.len() != num_rows {
1197 return flat_data.to_vec();
1198 }
1199 flat_col_values.push(vals);
1200 }
1201
1202 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
1205
1206 let original_num_cols = total_flat_cols
1215 - nested_groups
1216 .iter()
1217 .map(|g| g.sub_keys.len())
1218 .sum::<usize>()
1219 + nested_groups.len();
1220
1221 let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
1223 for (gi, group) in nested_groups.iter().enumerate() {
1224 if (group.original_col_index as usize) < original_num_cols {
1225 original_col_map[group.original_col_index as usize] = Some(gi);
1226 }
1227 }
1228
1229 let mut flat_idx = 0;
1230 for entry in original_col_map.iter().take(original_num_cols) {
1231 if let Some(gi) = entry {
1232 let group = &nested_groups[*gi];
1233 let num_sub = group.sub_keys.len();
1234
1235 let is_absent = |si: usize, row: usize| -> bool {
1237 if group.absence_bitmap.is_empty() {
1238 return false; }
1240 let bit_idx = si * num_rows + row;
1241 let byte_idx = bit_idx / 8;
1242 if byte_idx >= group.absence_bitmap.len() {
1243 return false;
1244 }
1245 (group.absence_bitmap[byte_idx] >> (bit_idx % 8)) & 1 == 1
1246 };
1247
1248 let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
1250 for row in 0..num_rows {
1251 let all_null = (0..num_sub).all(|si| {
1254 flat_idx + si < flat_col_values.len()
1255 && flat_col_values[flat_idx + si][row] == b"null"
1256 });
1257 if all_null && !group.absence_bitmap.is_empty() {
1258 let any_present_null = (0..num_sub).any(|si| {
1261 flat_col_values[flat_idx + si][row] == b"null" && !is_absent(si, row)
1262 });
1263 if any_present_null {
1264 } else {
1267 merged_col.push(b"null".to_vec());
1269 continue;
1270 }
1271 } else if all_null {
1272 merged_col.push(b"null".to_vec());
1273 continue;
1274 }
1275
1276 let has_absent = (0..num_sub).any(|si| is_absent(si, row));
1278
1279 if !has_absent
1280 && !group.nested_template.is_empty()
1281 && group.nested_template.len() == num_sub + 1
1282 {
1283 let mut obj = Vec::new();
1286 obj.extend_from_slice(&group.nested_template[0]);
1287 if flat_idx < flat_col_values.len() {
1288 obj.extend_from_slice(flat_col_values[flat_idx][row]);
1289 }
1290 for si in 1..num_sub {
1291 obj.extend_from_slice(&group.nested_template[si]);
1292 if flat_idx + si < flat_col_values.len() {
1293 obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
1294 }
1295 }
1296 obj.extend_from_slice(&group.nested_template[num_sub]);
1297 merged_col.push(obj);
1298 } else {
1299 let mut obj = Vec::new();
1302 obj.push(b'{');
1303 let mut first = true;
1304 for si in 0..num_sub {
1305 if flat_idx + si >= flat_col_values.len() {
1306 break;
1307 }
1308 if is_absent(si, row) {
1309 continue; }
1311 let val = flat_col_values[flat_idx + si][row];
1312 if !first {
1313 obj.push(b',');
1314 }
1315 first = false;
1316 obj.push(b'"');
1317 obj.extend_from_slice(&group.sub_keys[si]);
1318 obj.push(b'"');
1319 obj.push(b':');
1320 obj.extend_from_slice(val);
1321 }
1322 obj.push(b'}');
1323 merged_col.push(obj);
1324 }
1325 }
1326 output_columns.push(merged_col);
1327 flat_idx += num_sub;
1328 } else {
1329 if flat_idx < flat_col_values.len() {
1331 let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
1332 .iter()
1333 .map(|v| v.to_vec())
1334 .collect();
1335 output_columns.push(col);
1336 }
1337 flat_idx += 1;
1338 }
1339 }
1340
1341 let num_out_cols = output_columns.len();
1343 let mut out = Vec::new();
1344 for (ci, col) in output_columns.iter().enumerate() {
1345 for (ri, val) in col.iter().enumerate() {
1346 out.extend_from_slice(val);
1347 if ri < num_rows - 1 {
1348 out.push(VAL_SEP);
1349 }
1350 }
1351 if ci < num_out_cols - 1 {
1352 out.push(COL_SEP);
1353 }
1354 }
1355
1356 out
1357}
1358
1359pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1364 let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1365 let has_absence = groups.iter().any(|g| !g.absence_bitmap.is_empty());
1366 let mut out = Vec::new();
1367 let version = if has_absence {
1368 3u8
1369 } else if has_template {
1370 2u8
1371 } else {
1372 1u8
1373 };
1374 out.push(version);
1375 out.push(groups.len() as u8);
1376 for group in groups {
1377 out.extend_from_slice(&group.original_col_index.to_le_bytes());
1378 out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1379 for key in &group.sub_keys {
1380 out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1381 out.extend_from_slice(key);
1382 }
1383 if has_template || version == 3 {
1384 out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1385 for part in &group.nested_template {
1386 out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1387 out.extend_from_slice(part);
1388 }
1389 }
1390 if version == 3 {
1391 let bm_len = group.absence_bitmap.len() as u32;
1392 out.extend_from_slice(&bm_len.to_le_bytes());
1393 out.extend_from_slice(&group.absence_bitmap);
1394 }
1395 }
1396 out
1397}
1398
1399pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1404 if data.is_empty() {
1405 return None;
1406 }
1407 let mut pos = 0;
1408 let version = data[pos];
1409 pos += 1;
1410 if version != 1 && version != 2 && version != 3 {
1411 return None;
1412 }
1413 let has_template = version == 2 || version == 3;
1414 let has_absence = version == 3;
1415 if pos >= data.len() {
1416 return None;
1417 }
1418 let num_groups = data[pos] as usize;
1419 pos += 1;
1420
1421 let mut groups = Vec::with_capacity(num_groups);
1422 for _ in 0..num_groups {
1423 if pos + 4 > data.len() {
1424 return None;
1425 }
1426 let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1427 pos += 2;
1428 let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1429 pos += 2;
1430
1431 let mut sub_keys = Vec::with_capacity(num_sub_cols);
1432 for _ in 0..num_sub_cols {
1433 if pos + 2 > data.len() {
1434 return None;
1435 }
1436 let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1437 pos += 2;
1438 if pos + key_len > data.len() {
1439 return None;
1440 }
1441 sub_keys.push(data[pos..pos + key_len].to_vec());
1442 pos += key_len;
1443 }
1444
1445 let nested_template = if has_template {
1446 if pos + 2 > data.len() {
1447 return None;
1448 }
1449 let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1450 pos += 2;
1451 let mut parts = Vec::with_capacity(num_parts);
1452 for _ in 0..num_parts {
1453 if pos + 2 > data.len() {
1454 return None;
1455 }
1456 let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1457 pos += 2;
1458 if pos + part_len > data.len() {
1459 return None;
1460 }
1461 parts.push(data[pos..pos + part_len].to_vec());
1462 pos += part_len;
1463 }
1464 parts
1465 } else {
1466 Vec::new()
1467 };
1468
1469 let absence_bitmap = if has_absence {
1470 if pos + 4 > data.len() {
1471 return None;
1472 }
1473 let bm_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1474 pos += 4;
1475 if pos + bm_len > data.len() {
1476 return None;
1477 }
1478 let bm = data[pos..pos + bm_len].to_vec();
1479 pos += bm_len;
1480 bm
1481 } else {
1482 Vec::new()
1483 };
1484
1485 groups.push(NestedGroupInfo {
1486 original_col_index,
1487 sub_keys,
1488 nested_template,
1489 absence_bitmap,
1490 });
1491 }
1492
1493 Some((groups, pos))
1494}
1495
1496pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
1501 if data.is_empty() {
1502 return None;
1503 }
1504
1505 let has_trailing_newline = data.last() == Some(&b'\n');
1506 let lines = split_lines(data);
1507 let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
1508
1509 if non_empty.len() < 2 {
1510 return None;
1511 }
1512
1513 if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
1515 let is_selective = !metadata.is_empty() && metadata[0] == METADATA_VERSION_SELECTIVE;
1516 let size_ok = if is_selective {
1520 (col_data.len() + metadata.len()) * 100 <= data.len() * 105
1521 } else {
1522 col_data.len() + metadata.len() < data.len()
1523 };
1524 if size_ok {
1525 if is_selective {
1527 return Some(TransformResult {
1528 data: col_data,
1529 metadata,
1530 });
1531 }
1532 let num_rows = non_empty.len();
1537 if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
1538 let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
1543 let unflattened =
1544 unflatten_nested_columns(&flat_data, &nested_groups, num_rows, total_flat_cols);
1545 if unflattened == col_data {
1546 let nested_bytes = serialize_nested_info(&nested_groups);
1548 metadata.extend_from_slice(&nested_bytes);
1549 return Some(TransformResult {
1550 data: flat_data,
1551 metadata,
1552 });
1553 }
1554 }
1556 metadata.push(0u8); return Some(TransformResult {
1559 data: col_data,
1560 metadata,
1561 });
1562 }
1563 }
1564
1565 if let Some((grouped_data, grouped_metadata)) =
1567 preprocess_grouped(&non_empty, has_trailing_newline)
1568 {
1569 if grouped_data.len() + grouped_metadata.len() < data.len() {
1570 return Some(TransformResult {
1571 data: grouped_data,
1572 metadata: grouped_metadata,
1573 });
1574 }
1575 }
1576
1577 None
1578}
1579
1580pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1583 if metadata.is_empty() {
1584 return data.to_vec();
1585 }
1586 match metadata[0] {
1587 METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1588 METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1589 METADATA_VERSION_SELECTIVE => reverse_selective(data, metadata),
1590 _ => data.to_vec(),
1591 }
1592}
1593
1594fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1596 if metadata.len() < 10 {
1597 return data.to_vec();
1598 }
1599 let mut pos = 0;
1600 let _version = metadata[pos];
1601 pos += 1;
1602 let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1603 pos += 4;
1604 let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1605 pos += 2;
1606 let has_trailing_newline = metadata[pos] != 0;
1607 pos += 1;
1608 let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1609 pos += 2;
1610
1611 let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
1612 for _ in 0..num_parts {
1613 if pos + 2 > metadata.len() {
1614 return data.to_vec();
1615 }
1616 let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1617 pos += 2;
1618 if pos + part_len > metadata.len() {
1619 return data.to_vec();
1620 }
1621 parts.push(metadata[pos..pos + part_len].to_vec());
1622 pos += part_len;
1623 }
1624
1625 if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
1626 return data.to_vec();
1627 }
1628
1629 let remaining_metadata = &metadata[pos..];
1631 if !remaining_metadata.is_empty()
1632 && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2 || remaining_metadata[0] == 3)
1633 {
1634 if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
1636 let total_flat_cols = data.split(|&b| b == COL_SEP).count();
1638 let unflattened =
1639 unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
1640 return reverse_uniform_from_parts(
1641 &unflattened,
1642 &parts,
1643 num_rows,
1644 num_cols,
1645 has_trailing_newline,
1646 );
1647 }
1648 }
1649
1650 reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
1651}
1652
1653fn reverse_uniform_from_parts(
1655 data: &[u8],
1656 parts: &[Vec<u8>],
1657 num_rows: usize,
1658 num_cols: usize,
1659 has_trailing_newline: bool,
1660) -> Vec<u8> {
1661 let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
1662 if col_chunks.len() != num_cols {
1663 return data.to_vec();
1664 }
1665
1666 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1667 for chunk in &col_chunks {
1668 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1669 if vals.len() != num_rows {
1670 return data.to_vec();
1671 }
1672 columns.push(vals);
1673 }
1674
1675 let template_size_per_row: usize = parts.iter().map(|p| p.len()).sum();
1677 let values_total: usize = columns
1678 .iter()
1679 .map(|col| col.iter().map(|v| v.len()).sum::<usize>())
1680 .sum();
1681 let newline_count = if has_trailing_newline {
1682 num_rows
1683 } else {
1684 num_rows - 1
1685 };
1686 let total_size = template_size_per_row * num_rows + values_total + newline_count;
1687
1688 let mut output = Vec::with_capacity(total_size);
1689 #[allow(clippy::needless_range_loop)]
1690 for row in 0..num_rows {
1691 output.extend_from_slice(&parts[0]);
1692 output.extend_from_slice(columns[0][row]);
1693 for col in 1..num_cols {
1694 output.extend_from_slice(&parts[col]);
1695 output.extend_from_slice(columns[col][row]);
1696 }
1697 output.extend_from_slice(&parts[num_cols]);
1698
1699 if row < num_rows - 1 || has_trailing_newline {
1700 output.push(b'\n');
1701 }
1702 }
1703
1704 output
1705}
1706
/// Decoded header of the selective strategy, as produced by
/// `parse_selective_metadata` and consumed by `reverse_selective_from_data`.
struct SelectiveMetadata {
    /// Row template: `parts[0]` opens each row and `parts[col + 1]` follows the
    /// value of column `col`. The parser only accepts `num_total_cols + 1` parts.
    parts: Vec<Vec<u8>>,
    /// Number of rows to rebuild (the parser rejects zero).
    num_rows: usize,
    /// Total number of value columns per row, extracted and inline together
    /// (the parser rejects zero).
    num_total_cols: usize,
    /// Whether a `\n` is written after the last rebuilt row.
    has_trailing_newline: bool,
    /// Indices of the columns whose values are stored column-wise in the
    /// extracted section of the data; all remaining columns are stored inline.
    extracted_col_indices: Vec<u16>,
}
1715
1716fn parse_selective_metadata(metadata: &[u8]) -> Option<SelectiveMetadata> {
1718 if metadata.len() < 12 {
1719 return None;
1720 }
1721 let mut pos = 1; let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1723 pos += 4;
1724 let num_total_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1725 pos += 2;
1726 let has_trailing_newline = metadata[pos] != 0;
1727 pos += 1;
1728 let num_extracted = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1729 pos += 2;
1730
1731 let mut extracted_col_indices = Vec::with_capacity(num_extracted);
1732 for _ in 0..num_extracted {
1733 if pos + 2 > metadata.len() {
1734 return None;
1735 }
1736 let idx = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap());
1737 pos += 2;
1738 extracted_col_indices.push(idx);
1739 }
1740
1741 if pos + 2 > metadata.len() {
1742 return None;
1743 }
1744 let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1745 pos += 2;
1746
1747 let mut parts = Vec::with_capacity(num_parts);
1748 for _ in 0..num_parts {
1749 if pos + 2 > metadata.len() {
1750 return None;
1751 }
1752 let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1753 pos += 2;
1754 if pos + part_len > metadata.len() {
1755 return None;
1756 }
1757 parts.push(metadata[pos..pos + part_len].to_vec());
1758 pos += part_len;
1759 }
1760
1761 if parts.len() != num_total_cols + 1 || num_rows == 0 || num_total_cols == 0 {
1762 return None;
1763 }
1764
1765 Some(SelectiveMetadata {
1766 parts,
1767 num_rows,
1768 num_total_cols,
1769 has_trailing_newline,
1770 extracted_col_indices,
1771 })
1772}
1773
1774fn reverse_selective(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1776 let sm = match parse_selective_metadata(metadata) {
1777 Some(v) => v,
1778 None => return data.to_vec(),
1779 };
1780 reverse_selective_from_data(data, &sm)
1781}
1782
1783fn reverse_selective_from_data(data: &[u8], sm: &SelectiveMetadata) -> Vec<u8> {
1785 if data.len() < 4 {
1786 return data.to_vec();
1787 }
1788
1789 let extracted_data_len = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1791 if 4 + extracted_data_len > data.len() {
1792 return data.to_vec();
1793 }
1794 let extracted_section = &data[4..4 + extracted_data_len];
1795 let inline_section = &data[4 + extracted_data_len..];
1796
1797 let num_extracted = sm.extracted_col_indices.len();
1798 let num_inline = sm.num_total_cols - num_extracted;
1799
1800 let extracted_columns: Vec<Vec<&[u8]>> = if num_extracted > 0 && !extracted_section.is_empty() {
1802 let col_chunks: Vec<&[u8]> = extracted_section.split(|&b| b == COL_SEP).collect();
1803 if col_chunks.len() != num_extracted {
1804 return data.to_vec();
1805 }
1806 let mut cols = Vec::with_capacity(num_extracted);
1807 for chunk in &col_chunks {
1808 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1809 if vals.len() != sm.num_rows {
1810 return data.to_vec();
1811 }
1812 cols.push(vals);
1813 }
1814 cols
1815 } else if num_extracted > 0 {
1816 return data.to_vec();
1818 } else {
1819 Vec::new()
1820 };
1821
1822 let inline_rows: Vec<Vec<&[u8]>> = if num_inline > 0 && !inline_section.is_empty() {
1824 let row_chunks: Vec<&[u8]> = inline_section.split(|&b| b == COL_SEP).collect();
1825 if row_chunks.len() != sm.num_rows {
1826 return data.to_vec();
1827 }
1828 let mut rows = Vec::with_capacity(sm.num_rows);
1829 for chunk in &row_chunks {
1830 let vals: Vec<&[u8]> = if num_inline > 1 {
1831 chunk.split(|&b| b == VAL_SEP).collect()
1832 } else {
1833 vec![*chunk]
1834 };
1835 if vals.len() != num_inline {
1836 return data.to_vec();
1837 }
1838 rows.push(vals);
1839 }
1840 rows
1841 } else if num_inline > 0 {
1842 return data.to_vec();
1844 } else {
1845 Vec::new()
1846 };
1847
1848 let mut extracted_positions = vec![None; sm.num_total_cols];
1852 let mut inline_positions = vec![None; sm.num_total_cols];
1853 for (ei, &col_idx) in sm.extracted_col_indices.iter().enumerate() {
1854 if (col_idx as usize) < sm.num_total_cols {
1855 extracted_positions[col_idx as usize] = Some(ei);
1856 }
1857 }
1858 let mut ii = 0;
1859 for col in 0..sm.num_total_cols {
1860 if extracted_positions[col].is_none() {
1861 inline_positions[col] = Some(ii);
1862 ii += 1;
1863 }
1864 }
1865
1866 let mut output = Vec::with_capacity(data.len() * 2);
1868 for row in 0..sm.num_rows {
1869 output.extend_from_slice(&sm.parts[0]);
1871 for col in 0..sm.num_total_cols {
1872 if let Some(ei) = extracted_positions[col] {
1873 output.extend_from_slice(extracted_columns[ei][row]);
1874 } else if let Some(ii_idx) = inline_positions[col] {
1875 output.extend_from_slice(inline_rows[row][ii_idx]);
1876 }
1877 output.extend_from_slice(&sm.parts[col + 1]);
1878 }
1879
1880 if row < sm.num_rows - 1 || sm.has_trailing_newline {
1881 output.push(b'\n');
1882 }
1883 }
1884
1885 output
1886}
1887
1888
1889fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1891 if metadata.len() < 8 {
1892 return data.to_vec();
1893 }
1894
1895 let mut mpos = 1; let has_trailing_newline = metadata[mpos] != 0;
1897 mpos += 1;
1898 let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1899 mpos += 4;
1900 let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
1901 mpos += 2;
1902
1903 let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];
1905
1906 let mut dpos: usize = 0;
1908
1909 for _ in 0..num_groups {
1910 if mpos + 4 > metadata.len() {
1912 return data.to_vec();
1913 }
1914 let group_row_count =
1915 u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1916 mpos += 4;
1917
1918 let mut row_indices = Vec::with_capacity(group_row_count);
1919 for _ in 0..group_row_count {
1920 if mpos + 4 > metadata.len() {
1921 return data.to_vec();
1922 }
1923 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1924 mpos += 4;
1925 row_indices.push(idx);
1926 }
1927
1928 if mpos + 4 > metadata.len() {
1930 return data.to_vec();
1931 }
1932 let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1933 mpos += 4;
1934 if mpos + gm_len > metadata.len() {
1935 return data.to_vec();
1936 }
1937 let group_metadata = &metadata[mpos..mpos + gm_len];
1938 mpos += gm_len;
1939
1940 if dpos + 4 > data.len() {
1942 return data.to_vec();
1943 }
1944 let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
1945 dpos += 4;
1946 if dpos + gd_len > data.len() {
1947 return data.to_vec();
1948 }
1949 let group_data = &data[dpos..dpos + gd_len];
1950 dpos += gd_len;
1951
1952 let group_version = if group_metadata.is_empty() {
1954 0
1955 } else {
1956 group_metadata[0]
1957 };
1958
1959 if group_version == METADATA_VERSION_SELECTIVE {
1960 let sm = match parse_selective_metadata(group_metadata) {
1962 Some(v) => v,
1963 None => return data.to_vec(),
1964 };
1965 if sm.num_rows != group_row_count {
1966 return data.to_vec();
1967 }
1968 let reconstructed = reverse_selective_from_data(group_data, &sm);
1969 let lines: Vec<&[u8]> = reconstructed.split(|&b| b == b'\n').collect();
1971 for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1972 if row_within_group < lines.len() && original_idx < total_rows {
1973 output_lines[original_idx] = Some(lines[row_within_group].to_vec());
1974 }
1975 }
1976 } else {
1977 let reconstructed = reverse_uniform(group_data, group_metadata);
1980 let lines: Vec<&[u8]> = reconstructed.split(|&b| b == b'\n').collect();
1981 for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1982 if row_within_group < lines.len() && original_idx < total_rows {
1983 output_lines[original_idx] = Some(lines[row_within_group].to_vec());
1984 }
1985 }
1986 }
1987 }
1988
1989 if mpos + 4 > metadata.len() {
1991 return data.to_vec();
1992 }
1993 let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1994 mpos += 4;
1995
1996 let mut residual_indices = Vec::with_capacity(residual_count);
1997 for _ in 0..residual_count {
1998 if mpos + 4 > metadata.len() {
1999 return data.to_vec();
2000 }
2001 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
2002 mpos += 4;
2003 residual_indices.push(idx);
2004 }
2005
2006 let residual_data = &data[dpos..];
2008 if residual_count > 0 {
2009 let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
2010 vec![]
2011 } else {
2012 residual_data.split(|&b| b == b'\n').collect()
2013 };
2014 if residual_lines.len() != residual_count {
2016 return data.to_vec();
2017 }
2018 for (i, &idx) in residual_indices.iter().enumerate() {
2019 if idx < total_rows {
2020 output_lines[idx] = Some(residual_lines[i].to_vec());
2021 }
2022 }
2023 }
2024
2025 let mut output = Vec::with_capacity(data.len() * 2);
2027 for (i, slot) in output_lines.iter().enumerate() {
2028 match slot {
2029 Some(line) => output.extend_from_slice(line),
2030 None => {
2031 return data.to_vec();
2033 }
2034 }
2035 if i < total_rows - 1 || has_trailing_newline {
2036 output.push(b'\n');
2037 }
2038 }
2039
2040 output
2041}
2042
2043#[cfg(test)]
2044mod tests {
2045 use super::*;
2046
2047 #[test]
2048 fn extract_value_string() {
2049 let line = br#""hello","next""#;
2050 let (val, end) = extract_value(line, 0).unwrap();
2051 assert_eq!(val, b"\"hello\"");
2052 assert_eq!(end, 7);
2053 }
2054
2055 #[test]
2056 fn extract_value_number() {
2057 let line = b"42,next";
2058 let (val, end) = extract_value(line, 0).unwrap();
2059 assert_eq!(val, b"42");
2060 assert_eq!(end, 2);
2061 }
2062
2063 #[test]
2064 fn extract_value_bool() {
2065 let line = b"true,next";
2066 let (val, end) = extract_value(line, 0).unwrap();
2067 assert_eq!(val, b"true");
2068 assert_eq!(end, 4);
2069 }
2070
2071 #[test]
2072 fn extract_value_null() {
2073 let line = b"null,next";
2074 let (val, end) = extract_value(line, 0).unwrap();
2075 assert_eq!(val, b"null");
2076 assert_eq!(end, 4);
2077 }
2078
2079 #[test]
2080 fn extract_value_object() {
2081 let line = br#"{"a":1,"b":"x"},next"#;
2082 let (val, end) = extract_value(line, 0).unwrap();
2083 assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
2084 assert_eq!(end, 15);
2085 }
2086
2087 #[test]
2088 fn extract_value_array() {
2089 let line = b"[1,2,3],next";
2090 let (val, end) = extract_value(line, 0).unwrap();
2091 assert_eq!(val, b"[1,2,3]");
2092 assert_eq!(end, 7);
2093 }
2094
2095 #[test]
2096 fn extract_value_string_with_escapes() {
2097 let line = br#""he\"llo",next"#;
2098 let (val, end) = extract_value(line, 0).unwrap();
2099 assert_eq!(val, br#""he\"llo""#.to_vec());
2100 assert_eq!(end, 9);
2101 }
2102
2103 #[test]
2104 fn parse_line_simple() {
2105 let line = br#"{"a":1,"b":"x"}"#;
2106 let (parts, values) = parse_line(line).unwrap();
2107 assert_eq!(parts.len(), 3); assert_eq!(values.len(), 2);
2109 assert_eq!(values[0], b"1");
2110 assert_eq!(values[1], b"\"x\"");
2111 assert_eq!(parts[0], br#"{"a":"#.to_vec());
2112 assert_eq!(parts[1], br#","b":"#.to_vec());
2113 assert_eq!(parts[2], b"}");
2114 }
2115
2116 #[test]
2117 fn roundtrip_simple() {
2118 let data = br#"{"a":1,"b":"x"}
2119{"a":2,"b":"y"}
2120{"a":3,"b":"z"}
2121"#;
2122 let result = preprocess(data).expect("should produce transform");
2123 let restored = reverse(&result.data, &result.metadata);
2124 assert_eq!(
2125 String::from_utf8_lossy(&restored),
2126 String::from_utf8_lossy(data),
2127 );
2128 assert_eq!(restored, data.to_vec());
2129 }
2130
2131 #[test]
2132 fn roundtrip_no_trailing_newline() {
2133 let data = br#"{"a":1,"b":"x"}
2134{"a":2,"b":"y"}
2135{"a":3,"b":"z"}"#;
2136 let result = preprocess(data).expect("should produce transform");
2137 let restored = reverse(&result.data, &result.metadata);
2138 assert_eq!(restored, data.to_vec());
2139 }
2140
2141 #[test]
2142 fn roundtrip_nested_values() {
2143 let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2144{"id":2,"meta":{"x":30,"y":40}}
2145{"id":3,"meta":{"x":50,"y":60}}
2146{"id":4,"meta":{"x":70,"y":80}}
2147{"id":5,"meta":{"x":90,"y":100}}
2148"#;
2149 let result = preprocess(data).expect("should produce transform");
2150 let restored = reverse(&result.data, &result.metadata);
2151 assert_eq!(restored, data.to_vec());
2152 }
2153
2154 #[test]
2155 fn roundtrip_mixed_types() {
2156 let data = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
2157{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
2158{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
2159{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
2160{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
2161"#;
2162 let result = preprocess(data).expect("should produce transform");
2163 let restored = reverse(&result.data, &result.metadata);
2164 assert_eq!(restored, data.to_vec());
2165 }
2166
2167 #[test]
2168 fn schema_mismatch_too_few_returns_none() {
2169 let data = br#"{"a":1,"b":2}
2171{"a":1,"c":3}
2172"#;
2173 assert!(preprocess(data).is_none());
2174 }
2175
2176 #[test]
2177 fn different_num_keys_too_few_returns_none() {
2178 let data = br#"{"a":1,"b":2}
2179{"a":1}
2180"#;
2181 assert!(preprocess(data).is_none());
2182 }
2183
2184 #[test]
2185 fn single_line_returns_none() {
2186 let data = br#"{"a":1,"b":2}
2187"#;
2188 assert!(preprocess(data).is_none());
2189 }
2190
2191 #[test]
2192 fn empty_returns_none() {
2193 assert!(preprocess(b"").is_none());
2194 }
2195
2196 #[test]
2197 fn column_layout_groups_similar_values() {
2198 let data = br#"{"type":"page_view","user":"alice"}
2199{"type":"api_call","user":"alice"}
2200{"type":"click","user":"bob"}
2201"#;
2202 let result = preprocess(data).unwrap();
2203
2204 let col_data = &result.data;
2206 let cols: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
2207 assert_eq!(cols.len(), 2);
2208
2209 let type_vals: Vec<&[u8]> = cols[0].split(|&b| b == VAL_SEP).collect();
2211 assert_eq!(type_vals.len(), 3);
2212 assert_eq!(type_vals[0], br#""page_view""#);
2213 assert_eq!(type_vals[1], br#""api_call""#);
2214 assert_eq!(type_vals[2], br#""click""#);
2215
2216 let user_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
2218 assert_eq!(user_vals.len(), 3);
2219 assert_eq!(user_vals[0], br#""alice""#);
2220 assert_eq!(user_vals[1], br#""alice""#);
2221 assert_eq!(user_vals[2], br#""bob""#);
2222 }
2223
2224 #[test]
2225 fn roundtrip_string_with_escaped_chars() {
2226 let data = br#"{"msg":"he said \"hi\"","val":1}
2227{"msg":"line\nbreak","val":2}
2228{"msg":"tab\there","val":3}
2229{"msg":"back\\slash","val":4}
2230{"msg":"normal text","val":5}
2231"#;
2232 let result = preprocess(data).expect("should produce transform");
2233 let restored = reverse(&result.data, &result.metadata);
2234 assert_eq!(restored, data.to_vec());
2235 }
2236
2237 #[test]
2238 fn roundtrip_negative_and_float_numbers() {
2239 let data = br#"{"x":-3.14,"y":0}
2240{"x":2.718,"y":-1}
2241{"x":0.001,"y":999}
2242{"x":-100,"y":-200}
2243{"x":42.0,"y":7}
2244"#;
2245 let result = preprocess(data).expect("should produce transform");
2246 let restored = reverse(&result.data, &result.metadata);
2247 assert_eq!(restored, data.to_vec());
2248 }
2249
2250 #[test]
2253 fn reverse_roundtrip_small_data() {
2254 let (parts, vals) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
2256 assert_eq!(vals.len(), 2);
2257 assert_eq!(parts.len(), 3);
2258
2259 let big_data = br#"{"x":-3.14,"y":0}
2261{"x":2.718,"y":-1}
2262"#
2263 .repeat(20);
2264 let result = preprocess(&big_data).expect("should produce transform with 40 rows");
2265 let restored = reverse(&result.data, &result.metadata);
2266 assert_eq!(restored, big_data);
2267 }
2268
2269 #[test]
2272 fn grouped_roundtrip_two_schemas() {
2273 let mut data = Vec::new();
2275 for i in 0..10 {
2276 data.extend_from_slice(
2277 format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
2278 );
2279 data.push(b'\n');
2280 }
2281 for i in 10..20 {
2282 data.extend_from_slice(
2283 format!(
2284 r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
2285 i, i, i
2286 )
2287 .as_bytes(),
2288 );
2289 data.push(b'\n');
2290 }
2291 let result = preprocess(&data).expect("should produce grouped transform");
2292 assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2294 let restored = reverse(&result.data, &result.metadata);
2295 assert_eq!(restored, data);
2296 }
2297
2298 #[test]
2299 fn grouped_roundtrip_interleaved_schemas() {
2300 let mut data = Vec::new();
2302 for i in 0..20 {
2303 if i % 2 == 0 {
2304 data.extend_from_slice(
2305 format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
2306 );
2307 } else {
2308 data.extend_from_slice(
2309 format!(
2310 r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
2311 i, i, i
2312 )
2313 .as_bytes(),
2314 );
2315 }
2316 data.push(b'\n');
2317 }
2318 let result = preprocess(&data).expect("should produce grouped transform");
2319 assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2320 let restored = reverse(&result.data, &result.metadata);
2321 assert_eq!(restored, data);
2322 }
2323
2324 #[test]
2325 fn grouped_roundtrip_with_residuals() {
2326 let mut data = Vec::new();
2328 for i in 0..8 {
2330 data.extend_from_slice(format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
2331 data.push(b'\n');
2332 }
2333 data.extend_from_slice(br#"{"x":1,"y":2,"z":3}"#);
2335 data.push(b'\n');
2336 data.extend_from_slice(br#"{"p":"q"}"#);
2337 data.push(b'\n');
2338 for i in 0..6 {
2340 data.extend_from_slice(format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes());
2341 data.push(b'\n');
2342 }
2343 let result = preprocess(&data).expect("should produce grouped transform");
2344 assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2345 let restored = reverse(&result.data, &result.metadata);
2346 assert_eq!(
2347 String::from_utf8_lossy(&restored),
2348 String::from_utf8_lossy(&data),
2349 );
2350 assert_eq!(restored, data);
2351 }
2352
2353 #[test]
2354 fn grouped_roundtrip_no_trailing_newline() {
2355 let mut data = Vec::new();
2356 for i in 0..6 {
2357 data.extend_from_slice(format!(r#"{{"id":{},"type":"push"}}"#, i).as_bytes());
2358 data.push(b'\n');
2359 }
2360 for i in 0..6 {
2361 data.extend_from_slice(
2362 format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i).as_bytes(),
2363 );
2364 if i < 5 {
2365 data.push(b'\n');
2366 }
2367 }
2369 let result = preprocess(&data).expect("should produce grouped transform");
2370 let restored = reverse(&result.data, &result.metadata);
2371 assert_eq!(restored, data);
2372 }
2373
2374 #[test]
2375 fn uniform_still_preferred_over_grouped() {
2376 let data = br#"{"a":1,"b":"x"}
2378{"a":2,"b":"y"}
2379{"a":3,"b":"z"}
2380{"a":4,"b":"w"}
2381{"a":5,"b":"v"}
2382"#;
2383 let result = preprocess(data).expect("should produce transform");
2384 assert_eq!(
2385 result.metadata[0], METADATA_VERSION_UNIFORM,
2386 "uniform schema should use Strategy 1"
2387 );
2388 let restored = reverse(&result.data, &result.metadata);
2389 assert_eq!(restored, data.to_vec());
2390 }
2391
2392 #[test]
2393 fn grouped_gharchive_simulation() {
2394 let mut data = Vec::new();
2396 for i in 0..50 {
2397 if i % 5 == 0 {
2398 data.extend_from_slice(
2400 format!(
2401 r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
2402 i, i, i, i
2403 )
2404 .as_bytes(),
2405 );
2406 } else {
2407 data.extend_from_slice(
2409 format!(
2410 r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
2411 i, i, i
2412 )
2413 .as_bytes(),
2414 );
2415 }
2416 data.push(b'\n');
2417 }
2418 let result = preprocess(&data).expect("should produce grouped transform");
2419 assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2420 let restored = reverse(&result.data, &result.metadata);
2421 assert_eq!(restored, data);
2422 }
2423
2424 #[test]
2425 fn grouped_nested_flatten_per_group() {
2426 let mut data = Vec::new();
2429 for i in 0..30 {
2430 if i % 3 == 0 {
2431 data.extend_from_slice(
2433 format!(
2434 r#"{{"id":{},"info":{{"a":{},"b":{}}},"tag":"b","extra":"yes"}}"#,
2435 i,
2436 i * 10,
2437 i * 20
2438 )
2439 .as_bytes(),
2440 );
2441 } else {
2442 data.extend_from_slice(
2444 format!(
2445 r#"{{"id":{},"info":{{"a":{},"b":{}}},"tag":"a"}}"#,
2446 i,
2447 i * 10,
2448 i * 20
2449 )
2450 .as_bytes(),
2451 );
2452 }
2453 data.push(b'\n');
2454 }
2455 let result = preprocess(&data).expect("should produce grouped transform");
2456 assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2457 let restored = reverse(&result.data, &result.metadata);
2458 assert_eq!(restored, data, "grouped nested flatten roundtrip failed");
2459 }
2460
2461 #[test]
2462 fn grouped_discriminator_two_pass() {
2463 let mut data = Vec::new();
2466 for i in 0..60 {
2467 let etype = if i % 2 == 0 { "push" } else { "create" };
2468 data.extend_from_slice(
2469 format!(
2470 r#"{{"id":{},"type":"{}","payload":{{"ref":"r{}","size":{}}}}}"#,
2471 i, etype, i, i * 10
2472 )
2473 .as_bytes(),
2474 );
2475 data.push(b'\n');
2476 }
2477 let result = preprocess(&data).expect("should produce transform");
2478 let restored = reverse(&result.data, &result.metadata);
2479 assert_eq!(
2480 restored, data,
2481 "discriminator two-pass roundtrip failed"
2482 );
2483 }
2484
2485 #[test]
2488 fn test_nested_decomposition_basic() {
2489 let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2491{"id":2,"meta":{"x":30,"y":40}}
2492{"id":3,"meta":{"x":50,"y":60}}
2493"#;
2494 let result = preprocess(data).expect("should produce transform");
2495 assert_eq!(result.metadata[0], METADATA_VERSION_UNIFORM);
2496
2497 let cols: Vec<&[u8]> = result.data.split(|&b| b == COL_SEP).collect();
2499 assert_eq!(
2501 cols.len(),
2502 3,
2503 "should have 3 columns after flattening: got {}",
2504 cols.len()
2505 );
2506
2507 let meta_x_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
2509 assert_eq!(meta_x_vals, vec![b"10".as_slice(), b"30", b"50"]);
2510
2511 let meta_y_vals: Vec<&[u8]> = cols[2].split(|&b| b == VAL_SEP).collect();
2512 assert_eq!(meta_y_vals, vec![b"20".as_slice(), b"40", b"60"]);
2513 }
2514
2515 #[test]
2516 fn test_nested_roundtrip() {
2517 let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2519{"id":2,"meta":{"x":30,"y":40}}
2520{"id":3,"meta":{"x":50,"y":60}}
2521"#;
2522 let result = preprocess(data).expect("should produce transform");
2523 let restored = reverse(&result.data, &result.metadata);
2524 assert_eq!(
2525 String::from_utf8_lossy(&restored),
2526 String::from_utf8_lossy(data),
2527 );
2528 assert_eq!(restored, data.to_vec());
2529 }
2530
2531 #[test]
2532 fn test_nested_mixed_schemas() {
2533 let data = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
2535{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
2536{"ts":"c","meta":{"query":"pricing","results_count":25}}
2537{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
2538{"ts":"e","meta":{"query":"api docs","results_count":41}}
2539"#;
2540 let result = preprocess(data).expect("should produce transform");
2541 let restored = reverse(&result.data, &result.metadata);
2542 assert_eq!(
2543 String::from_utf8_lossy(&restored),
2544 String::from_utf8_lossy(data),
2545 );
2546 assert_eq!(restored, data.to_vec());
2547 }
2548
2549 #[test]
2550 fn test_nested_no_nested_objects() {
2551 let data = br#"{"a":1,"b":"x"}
2553{"a":2,"b":"y"}
2554{"a":3,"b":"z"}
2555"#;
2556 let result = preprocess(data).expect("should produce transform");
2557 let restored = reverse(&result.data, &result.metadata);
2558 assert_eq!(restored, data.to_vec());
2559
2560 let meta = &result.metadata;
2566 let last_byte = meta[meta.len() - 1];
2567 assert_eq!(last_byte, 0, "should have has_nested=0 for flat data");
2568 }
2569
2570 #[test]
2571 fn test_nested_real_corpus() {
2572 let data = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
2574{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
2575{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
2576{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
2577{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
2578"#;
2579 let result = preprocess(data).expect("should produce transform");
2580 let restored = reverse(&result.data, &result.metadata);
2581 assert_eq!(
2582 String::from_utf8_lossy(&restored),
2583 String::from_utf8_lossy(data),
2584 );
2585 assert_eq!(restored, data.to_vec());
2586 }
2587
2588 #[test]
2589 fn test_nested_roundtrip_with_null_values() {
2590 let data = br#"{"id":1,"meta":{"x":10}}
2592{"id":2,"meta":null}
2593{"id":3,"meta":{"x":30}}
2594{"id":4,"meta":null}
2595{"id":5,"meta":{"x":50}}
2596"#;
2597 let result = preprocess(data).expect("should produce transform");
2598 let restored = reverse(&result.data, &result.metadata);
2599 assert_eq!(restored, data.to_vec());
2600 }
2601
2602 #[test]
2603 fn test_nested_string_values_preserved_exact() {
2604 let data = br#"{"id":1,"meta":{"name":"Alice","score":100}}
2606{"id":2,"meta":{"name":"Bob","score":200}}
2607{"id":3,"meta":{"name":"Charlie","score":300}}
2608"#;
2609 let result = preprocess(data).expect("should produce transform");
2610 let restored = reverse(&result.data, &result.metadata);
2611 assert_eq!(restored, data.to_vec());
2612 }
2613
2614 #[test]
2615 fn test_parse_nested_object_kv() {
2616 let obj = br#"{"query":"benchmark","results_count":14}"#;
2617 let pairs = parse_nested_object_kv(obj).unwrap();
2618 assert_eq!(pairs.len(), 2);
2619 assert_eq!(pairs[0].0, b"query");
2620 assert_eq!(pairs[0].1, br#""benchmark""#.to_vec());
2621 assert_eq!(pairs[1].0, b"results_count");
2622 assert_eq!(pairs[1].1, b"14");
2623 }
2624
#[test]
fn test_nested_varying_subkeys_roundtrip() {
    // Even rows carry an additional `extra` sub-key inside `meta`; odd rows
    // only have `x`. Every row, including the last, is newline-terminated.
    let mut ndjson = String::new();
    for i in 0..50 {
        let row = match i % 2 {
            0 => format!("{{\"id\":{},\"meta\":{{\"x\":{},\"extra\":{}}}}}", i, i, i),
            _ => format!("{{\"id\":{},\"meta\":{{\"x\":{}}}}}", i, i),
        };
        ndjson.push_str(&row);
        ndjson.push('\n');
    }
    let input = ndjson.as_bytes();

    let transformed = preprocess(input).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    // Compare as text so a mismatch is printed in readable form.
    let actual = std::str::from_utf8(&roundtripped).unwrap();
    let expected = std::str::from_utf8(input).unwrap();
    assert_eq!(
        actual, expected,
        "varying sub-keys roundtrip must be byte-exact"
    );
}
2649
#[test]
fn test_nested_explicit_null_preserved() {
    // Every row spells out `"y":null` inside the nested `meta` object; that
    // literal must still be present after the roundtrip.
    let input = b"{\"id\":1,\"meta\":{\"x\":1,\"y\":null}}\n\
                  {\"id\":2,\"meta\":{\"x\":2,\"y\":null}}\n\
                  {\"id\":3,\"meta\":{\"x\":3,\"y\":null}}\n";
    let transformed = preprocess(input).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    // Compare as text so a mismatch is printed in readable form.
    let actual = std::str::from_utf8(&roundtripped).unwrap();
    let expected = std::str::from_utf8(input).unwrap();
    assert_eq!(actual, expected, "explicit null values must be preserved");
}
2665
#[test]
fn null_heavy_30_rows_roundtrip() {
    // 30 rows whose `val` field is always `null`.
    let data: Vec<u8> = (0..30)
        .map(|i| format!("{{\"id\":{},\"val\":null}}\n", i))
        .collect::<String>()
        .into_bytes();

    // The roundtrip is only verified when `preprocess` yields a transform;
    // a `None` result is accepted as-is.
    match preprocess(&data) {
        Some(transformed) => {
            let restored = reverse(&transformed.data, &transformed.metadata);
            assert_eq!(
                restored,
                data,
                "null-heavy 30-row roundtrip failed.\nOriginal len={}, Restored len={}\nOrig first 200: {:?}\nRest first 200: {:?}",
                data.len(),
                restored.len(),
                String::from_utf8_lossy(&data[..data.len().min(200)]),
                String::from_utf8_lossy(&restored[..restored.len().min(200)])
            );
        }
        None => {}
    }
}
2687
#[test]
fn null_heavy_60_rows_roundtrip() {
    let mut data: Vec<u8> = Vec::new();
    for i in 0..60 {
        // Every tenth row gets a real quoted name; all other rows use `null`.
        let name = match i % 10 {
            0 => format!("\"user_{}\"", i),
            _ => String::from("null"),
        };
        let row = format!(
            "{{\"id\":{},\"name\":{},\"email\":null,\"score\":null,\"active\":null,\"tags\":null}}\n",
            i, name
        );
        data.extend_from_slice(row.as_bytes());
    }

    // Only checked when `preprocess` produces a transform; `None` is accepted.
    match preprocess(&data) {
        Some(transformed) => {
            let restored = reverse(&transformed.data, &transformed.metadata);
            assert_eq!(restored, data, "null-heavy 60-row ndjson roundtrip failed");
        }
        None => {}
    }
}
2708
#[test]
fn selective_columnar_roundtrip() {
    // `type` cycles through three values while `payload` is a long nested
    // object that differs on every row.
    const EVENT_TYPES: [&str; 3] = ["push", "pull_request", "create"];

    let mut ndjson = String::new();
    for i in 0..50usize {
        let event_type = EVENT_TYPES[i % 3];
        let payload = format!(
            "{{\"commits\":[{{\"sha\":\"abc{:04}def\",\"message\":\"commit message number {} with extra text to make it long enough for selective columnar threshold of 128 bytes average value length\"}}]}}",
            i, i
        );
        ndjson.push_str(&format!(
            "{{\"id\":{},\"type\":\"{}\",\"payload\":{}}}\n",
            i, event_type, payload
        ));
    }
    let data = ndjson.into_bytes();

    let transformed = preprocess(&data).expect("should preprocess");
    let restored = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(restored, data, "selective columnar roundtrip failed");
}
2736
#[test]
fn selective_columnar_uses_version3() {
    // Constant `type` column next to a long, per-row-unique `payload` object.
    let data: Vec<u8> = (0..50)
        .map(|i| {
            let payload = format!(
                "{{\"data\":\"unique_payload_{:04}\",\"extra\":\"padding_text_to_make_this_value_long_enough_to_exceed_the_128_byte_threshold_for_selective_columnar_detection_{:04}\"}}",
                i,
                i * 7
            );
            format!("{{\"type\":\"event\",\"payload\":{}}}\n", payload)
        })
        .collect::<String>()
        .into_bytes();

    let transformed = preprocess(&data).expect("should preprocess");

    // The first metadata byte must carry the selective version tag.
    assert_eq!(
        transformed.metadata[0], METADATA_VERSION_SELECTIVE,
        "expected selective columnar (version=3), got version={}",
        transformed.metadata[0]
    );

    let restored = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(restored, data, "selective columnar v3 roundtrip failed");
}
2762
#[test]
fn selective_grouped_roundtrip() {
    let mut ndjson = String::new();

    // First schema: 30 `push` rows with the keys id / type / payload.
    for i in 0..30 {
        let payload = format!(
            "{{\"sha\":\"hash_{:04}\",\"msg\":\"unique commit message number {} that is quite long and needs to exceed the 128 byte threshold for selective columnar to activate on this column\"}}",
            i, i
        );
        let row = format!(
            "{{\"id\":{},\"type\":\"push\",\"payload\":{}}}\n",
            i, payload
        );
        ndjson.push_str(&row);
    }

    // Second schema: 20 `pr` rows that add a trailing `org` key; ids start at 100.
    for i in 0..20 {
        let body = format!(
            "{{\"title\":\"PR title {}\",\"body\":\"This is a long unique pull request body number {} with details and extra text to exceed the 128 byte threshold for the selective columnar transform\"}}",
            i, i
        );
        let row = format!(
            "{{\"id\":{},\"type\":\"pr\",\"payload\":{},\"org\":\"myorg\"}}\n",
            100 + i,
            body
        );
        ndjson.push_str(&row);
    }

    let data = ndjson.into_bytes();
    let transformed = preprocess(&data).expect("should preprocess");

    // The first metadata byte must carry the grouped version tag.
    assert_eq!(transformed.metadata[0], METADATA_VERSION_GROUPED);

    let restored = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(restored, data, "selective grouped roundtrip failed");
}
2800
#[test]
fn selective_all_low_cardinality_stays_uniform() {
    // Two short columns: `type` is constant and `status` alternates between
    // two values, so there is no long per-row-unique column in the input.
    let data: Vec<u8> = (0..50)
        .map(|i| if i % 2 == 0 { "active" } else { "inactive" })
        .map(|status| format!("{{\"type\":\"event\",\"status\":\"{}\"}}\n", status))
        .collect::<String>()
        .into_bytes();

    let transformed = preprocess(&data).expect("should preprocess");

    // The first metadata byte must carry the uniform version tag.
    assert_eq!(
        transformed.metadata[0], METADATA_VERSION_UNIFORM,
        "low-cardinality data should use uniform (version=1)"
    );
}
2821
#[test]
fn selective_columnar_single_inline_column() {
    // `type` and `level` are constant; `msg` is the only column whose value
    // is long and different on every row.
    let data: Vec<u8> = (0..30)
        .map(|i| {
            let unique_msg = format!(
                "A unique message for row {} with enough text to exceed the 128 byte threshold for selective columnar detection, adding padding here to be safe: extra_{:04}",
                i,
                i * 13
            );
            format!(
                "{{\"type\":\"log\",\"level\":\"info\",\"msg\":\"{}\"}}\n",
                unique_msg
            )
        })
        .collect::<String>()
        .into_bytes();

    let transformed = preprocess(&data).expect("should preprocess");
    let restored = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(restored, data, "single-inline-column roundtrip failed");
}
2844}