1use std::collections::HashMap;
24
25use super::transform::TransformResult;
26
/// Byte written between consecutive columns of a columnar payload; the selective
/// layout also writes it between rows of its row-major inline section.
const COL_SEP: u8 = 0x00;
/// Byte written between consecutive values of one column; the selective layout also
/// writes it between the inline values of a single row.
const VAL_SEP: u8 = 0x01;
/// First metadata byte emitted by `build_uniform_columnar`.
const METADATA_VERSION_UNIFORM: u8 = 1;
/// First metadata byte emitted by `preprocess_grouped_core`.
const METADATA_VERSION_GROUPED: u8 = 2;
/// First metadata byte emitted by `build_selective_columnar`.
const METADATA_VERSION_SELECTIVE: u8 = 3;
// NOTE(review): no encoder in the visible part of this file emits this version;
// presumably reserved for a typed layout handled elsewhere — confirm.
#[allow(dead_code)]
const METADATA_VERSION_TYPED: u8 = 4;

/// Row threshold for grouping: `preprocess_grouped` gives up below this many input
/// lines, and `preprocess_grouped_core` demotes schema groups with fewer rows to
/// residual lines.
const MIN_GROUP_ROWS: usize = 5;

/// Average value length (in bytes) at or above which `classify_columns` may keep a
/// column inline instead of extracting it (only together with high cardinality).
const SELECTIVE_MIN_AVG_LEN: usize = 128;

/// Unique-values / rows ratio above which `classify_columns` may keep a column inline
/// instead of extracting it (only together with a large average length).
const SELECTIVE_MAX_CARDINALITY: f64 = 0.7;

/// One schema group: the shared template parts, plus `(original line index, values)`
/// for every row that belongs to the group.
type SchemaGroup<'a> = (Vec<&'a [u8]>, Vec<(usize, Vec<&'a [u8]>)>);
56
/// Extracts one JSON-like value from `line`, starting at the first non-whitespace
/// byte at or after `pos`.
///
/// Returns the raw value bytes together with the index just past the value, or `None`
/// when no value starts there or the value is unterminated.
///
/// * `"` starts a string that runs to the next unescaped `"`.
/// * `{` / `[` start a container that runs to the matching closer; only the same kind
///   of bracket is counted and bracket bytes inside strings are ignored.
/// * Anything else is a bare token ending at `,`, `}`, `]`, ASCII whitespace or the
///   end of input; an empty token yields `None`.
fn extract_value(line: &[u8], mut pos: usize) -> Option<(&[u8], usize)> {
    // Scans a string body beginning at `at` (just past the opening quote) and returns
    // the index of the closing quote, or `line.len()` if the string never closes.
    fn closing_quote(line: &[u8], mut at: usize) -> usize {
        let mut escaped = false;
        while at < line.len() {
            match line[at] {
                _ if escaped => escaped = false,
                b'\\' => escaped = true,
                b'"' => break,
                _ => {}
            }
            at += 1;
        }
        at
    }

    // Returns the index just past the `close` byte that balances the `open` byte at
    // `open_at`, or `None` if the container is not balanced within `line`.
    fn container_end(line: &[u8], open_at: usize, open: u8, close: u8) -> Option<usize> {
        let mut depth = 1usize;
        let mut at = open_at + 1;
        while at < line.len() && depth > 0 {
            let byte = line[at];
            if byte == b'"' {
                // Jump to the closing quote; the shared increment below steps past it.
                at = closing_quote(line, at + 1);
            } else if byte == open {
                depth += 1;
            } else if byte == close {
                depth -= 1;
            }
            at += 1;
        }
        // An unterminated inner string leaves `at` one past the end with depth > 0.
        (depth == 0 && at <= line.len()).then_some(at)
    }

    while pos < line.len() && line[pos].is_ascii_whitespace() {
        pos += 1;
    }
    let start = pos;

    let end = match *line.get(start)? {
        b'"' => {
            let quote = closing_quote(line, start + 1);
            if quote >= line.len() {
                return None;
            }
            quote + 1
        }
        b'{' => container_end(line, start, b'{', b'}')?,
        b'[' => container_end(line, start, b'[', b']')?,
        _ => {
            let token_len = line[start..]
                .iter()
                .take_while(|&&b| {
                    !(matches!(b, b',' | b'}' | b']') || b.is_ascii_whitespace())
                })
                .count();
            if token_len == 0 {
                return None;
            }
            start + token_len
        }
    };

    Some((&line[start..end], end))
}
170
/// Result of `parse_line`: `(template parts, values)`. Both vectors hold slices that
/// borrow from the parsed line; the parts are the bytes of the line that surround the
/// values, so there is always exactly one more part than there are values.
type ParsedLine<'a> = (Vec<&'a [u8]>, Vec<&'a [u8]>);
182
183fn parse_line(line: &[u8]) -> Option<ParsedLine<'_>> {
184 let mut pos = 0;
185
186 while pos < line.len() && line[pos].is_ascii_whitespace() {
188 pos += 1;
189 }
190 if pos >= line.len() || line[pos] != b'{' {
191 return None;
192 }
193
194 let mut parts: Vec<&[u8]> = Vec::new();
195 let mut values: Vec<&[u8]> = Vec::new();
196 let mut part_start = 0;
197
198 pos += 1; loop {
201 while pos < line.len() && line[pos].is_ascii_whitespace() {
203 pos += 1;
204 }
205 if pos >= line.len() {
206 return None;
207 }
208
209 if line[pos] == b'}' {
211 parts.push(&line[part_start..]);
213 break;
214 }
215
216 if line[pos] != b'"' {
218 return None;
219 }
220 pos += 1;
222 let mut escaped = false;
223 while pos < line.len() {
224 if escaped {
225 escaped = false;
226 } else if line[pos] == b'\\' {
227 escaped = true;
228 } else if line[pos] == b'"' {
229 pos += 1;
230 break;
231 }
232 pos += 1;
233 }
234
235 while pos < line.len() && line[pos].is_ascii_whitespace() {
237 pos += 1;
238 }
239 if pos >= line.len() || line[pos] != b':' {
240 return None;
241 }
242 pos += 1; while pos < line.len() && line[pos].is_ascii_whitespace() {
248 pos += 1;
249 }
250
251 parts.push(&line[part_start..pos]);
253
254 let (value, value_end) = extract_value(line, pos)?;
256 values.push(value);
257 pos = value_end;
258
259 part_start = pos;
261
262 while pos < line.len() && line[pos].is_ascii_whitespace() {
264 pos += 1;
265 }
266 if pos >= line.len() {
267 return None;
268 }
269
270 if line[pos] == b',' {
272 pos += 1;
273 } else if line[pos] == b'}' {
274 parts.push(&line[part_start..]);
278 break;
279 } else {
280 return None; }
282 }
283
284 if values.is_empty() {
285 return None;
286 }
287
288 Some((parts, values))
289}
290
291fn split_lines(data: &[u8]) -> Vec<&[u8]> {
293 let mut lines: Vec<&[u8]> = Vec::new();
294 let mut start = 0;
295 for pos in memchr::memchr_iter(b'\n', data) {
296 lines.push(&data[start..pos]);
297 start = pos + 1;
298 }
299 if start < data.len() {
300 lines.push(&data[start..]);
301 }
302 lines
303}
304
305fn build_uniform_columnar(
308 template_parts: &[&[u8]],
309 columns: &[Vec<&[u8]>],
310 num_rows: usize,
311 has_trailing_newline: bool,
312) -> (Vec<u8>, Vec<u8>) {
313 let num_cols = columns.len();
314
315 let mut col_data = Vec::new();
317 for (ci, col) in columns.iter().enumerate() {
318 for (ri, val) in col.iter().enumerate() {
319 col_data.extend_from_slice(val);
320 if ri < num_rows - 1 {
321 col_data.push(VAL_SEP);
322 }
323 }
324 if ci < num_cols - 1 {
325 col_data.push(COL_SEP);
326 }
327 }
328
329 let mut metadata = Vec::new();
331 metadata.push(METADATA_VERSION_UNIFORM);
332 metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
333 metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
334 metadata.push(u8::from(has_trailing_newline));
335 metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
336 for part in template_parts {
337 metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
338 metadata.extend_from_slice(part);
339 }
340
341 (col_data, metadata)
342}
343
344fn classify_columns(columns: &[Vec<&[u8]>], num_rows: usize) -> Vec<bool> {
349 use std::collections::HashSet;
350 columns
351 .iter()
352 .map(|col_values| {
353 if num_rows < 10 {
354 return true; }
356 let unique: HashSet<&[u8]> = col_values.iter().copied().collect();
357 let cardinality_ratio = unique.len() as f64 / num_rows as f64;
358 let avg_len = col_values.iter().map(|v| v.len()).sum::<usize>() / num_rows;
359 !(cardinality_ratio > SELECTIVE_MAX_CARDINALITY && avg_len >= SELECTIVE_MIN_AVG_LEN)
361 })
362 .collect()
363}
364
365fn build_selective_columnar(
377 template_parts: &[&[u8]],
378 columns: &[Vec<&[u8]>],
379 extract_mask: &[bool],
380 num_rows: usize,
381 has_trailing_newline: bool,
382) -> (Vec<u8>, Vec<u8>) {
383 let num_total_cols = columns.len();
384
385 let extracted_indices: Vec<u16> = (0..num_total_cols)
386 .filter(|&i| extract_mask[i])
387 .map(|i| i as u16)
388 .collect();
389 let inline_indices: Vec<u16> = (0..num_total_cols)
390 .filter(|&i| !extract_mask[i])
391 .map(|i| i as u16)
392 .collect();
393
394 let mut extracted_data = Vec::new();
396 for (ei, &col_idx) in extracted_indices.iter().enumerate() {
397 let col = &columns[col_idx as usize];
398 for (ri, val) in col.iter().enumerate() {
399 extracted_data.extend_from_slice(val);
400 if ri < num_rows - 1 {
401 extracted_data.push(VAL_SEP);
402 }
403 }
404 if ei < extracted_indices.len() - 1 {
405 extracted_data.push(COL_SEP);
406 }
407 }
408
409 let mut inline_data = Vec::new();
411 if !inline_indices.is_empty() {
412 #[allow(clippy::needless_range_loop)]
413 for row in 0..num_rows {
414 for (ii, &col_idx) in inline_indices.iter().enumerate() {
415 inline_data.extend_from_slice(columns[col_idx as usize][row]);
416 if ii < inline_indices.len() - 1 {
417 inline_data.push(VAL_SEP);
418 }
419 }
420 if row < num_rows - 1 {
421 inline_data.push(COL_SEP);
422 }
423 }
424 }
425
426 let mut data = Vec::with_capacity(4 + extracted_data.len() + inline_data.len());
428 data.extend_from_slice(&(extracted_data.len() as u32).to_le_bytes());
429 data.extend_from_slice(&extracted_data);
430 data.extend_from_slice(&inline_data);
431
432 let mut metadata = Vec::new();
434 metadata.push(METADATA_VERSION_SELECTIVE);
435 metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
436 metadata.extend_from_slice(&(num_total_cols as u16).to_le_bytes());
437 metadata.push(u8::from(has_trailing_newline));
438 metadata.extend_from_slice(&(extracted_indices.len() as u16).to_le_bytes());
439 for &idx in &extracted_indices {
440 metadata.extend_from_slice(&idx.to_le_bytes());
441 }
442 metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
443 for part in template_parts {
444 metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
445 metadata.extend_from_slice(part);
446 }
447
448 (data, metadata)
449}
450
451fn preprocess_uniform(
454 non_empty: &[&[u8]],
455 has_trailing_newline: bool,
456) -> Option<(Vec<u8>, Vec<u8>)> {
457 if non_empty.len() < 2 {
458 return None;
459 }
460
461 let (template_parts, first_values) = parse_line(non_empty[0])?;
462 let num_cols = first_values.len();
463 if template_parts.len() != num_cols + 1 {
464 return None;
465 }
466
467 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
468 for v in &first_values {
469 columns.push(vec![*v]);
470 }
471
472 for &line in &non_empty[1..] {
473 let (parts, values) = parse_line(line)?;
474 if values.len() != num_cols || parts.len() != template_parts.len() {
475 return None;
476 }
477 for (a, b) in parts.iter().zip(template_parts.iter()) {
478 if a != b {
479 return None;
480 }
481 }
482 for (col, val) in values.iter().enumerate() {
483 columns[col].push(*val);
484 }
485 }
486
487 let extract_mask = classify_columns(&columns, non_empty.len());
493 let all_extracted = extract_mask.iter().all(|&e| e);
494
495 if all_extracted {
496 Some(build_uniform_columnar(
497 &template_parts,
498 &columns,
499 non_empty.len(),
500 has_trailing_newline,
501 ))
502 } else {
503 Some(build_selective_columnar(
504 &template_parts,
505 &columns,
506 &extract_mask,
507 non_empty.len(),
508 has_trailing_newline,
509 ))
510 }
511}
512
513fn find_discriminator<'a>(parsed: &[Option<ParsedLine<'a>>]) -> Option<usize> {
518 use std::collections::HashSet;
519
520 let mut samples: Vec<&ParsedLine<'a>> = Vec::new();
522 for line in parsed.iter().flatten() {
523 samples.push(line);
524 if samples.len() >= 200 {
525 break;
526 }
527 }
528
529 if samples.len() < 10 {
530 return None;
531 }
532
533 let num_cols = samples.iter().map(|s| s.1.len()).min().unwrap_or(0);
535 if num_cols == 0 {
536 return None;
537 }
538
539 let mut best_col = None;
540 let mut best_cardinality = usize::MAX;
541
542 for col_idx in 0..num_cols {
543 let total_len: usize = samples.iter().map(|s| s.1[col_idx].len()).sum();
545 let avg_len = total_len / samples.len();
546 if avg_len > 30 {
547 continue;
548 }
549
550 let unique: HashSet<&[u8]> = samples.iter().map(|s| s.1[col_idx]).collect();
551 let cardinality = unique.len();
552
553 if cardinality > 1 && cardinality < samples.len() / 3 && cardinality < best_cardinality {
555 best_col = Some(col_idx);
556 best_cardinality = cardinality;
557 }
558 }
559
560 best_col
561}
562
563fn preprocess_grouped<'a>(
584 non_empty: &[&'a [u8]],
585 has_trailing_newline: bool,
586) -> Option<(Vec<u8>, Vec<u8>)> {
587 if non_empty.len() < MIN_GROUP_ROWS {
588 return None;
589 }
590
591 let parsed: Vec<Option<ParsedLine<'a>>> = non_empty.iter().map(|&l| parse_line(l)).collect();
593 let disc_col = find_discriminator(&parsed);
594 drop(parsed);
595
596 let result_no_disc = preprocess_grouped_core(non_empty, has_trailing_newline, None);
598
599 if let Some(dc) = disc_col {
601 let result_disc = preprocess_grouped_core(non_empty, has_trailing_newline, Some(dc));
602 match (&result_no_disc, &result_disc) {
603 (Some((d1, m1)), Some((d2, m2))) => {
604 if d2.len() + m2.len() < d1.len() + m1.len() {
605 return result_disc;
606 }
607 return result_no_disc;
608 }
609 (None, Some(_)) => return result_disc,
610 _ => return result_no_disc,
611 }
612 }
613
614 result_no_disc
615}
616
617fn preprocess_grouped_core<'a>(
620 non_empty: &[&'a [u8]],
621 has_trailing_newline: bool,
622 disc_col: Option<usize>,
623) -> Option<(Vec<u8>, Vec<u8>)> {
624 let mut parsed: Vec<Option<ParsedLine<'a>>> = Vec::with_capacity(non_empty.len());
626 for &line in non_empty {
627 parsed.push(parse_line(line));
628 }
629
630 let mut group_map: HashMap<Vec<u8>, SchemaGroup<'a>> = HashMap::new();
631 let mut residual_indices: Vec<usize> = Vec::new();
632
633 for (idx, parsed_line) in parsed.into_iter().enumerate() {
634 if let Some((parts, values)) = parsed_line {
635 let mut key = Vec::new();
636 for part in &parts {
637 key.extend_from_slice(&(part.len() as u32).to_le_bytes());
638 key.extend_from_slice(part);
639 }
640 if let Some(dc) = disc_col {
642 if dc < values.len() {
643 key.push(0xFF);
644 key.extend_from_slice(values[dc]);
645 }
646 }
647 group_map
648 .entry(key)
649 .or_insert_with(|| (parts, Vec::new()))
650 .1
651 .push((idx, values));
652 } else {
653 residual_indices.push(idx);
654 }
655 }
656
657 let mut groups: Vec<SchemaGroup<'a>> = Vec::new();
659 for (_key, (template_parts, rows)) in group_map {
660 if rows.len() >= MIN_GROUP_ROWS {
661 groups.push((template_parts, rows));
662 } else {
663 for (idx, _) in &rows {
665 residual_indices.push(*idx);
666 }
667 }
668 }
669
670 if groups.is_empty() {
672 return None;
673 }
674
675 groups.sort_by_key(|(_, rows)| rows[0].0);
677 residual_indices.sort_unstable();
678
679 struct GroupOutput {
681 row_indices: Vec<u32>,
682 col_data: Vec<u8>,
683 group_metadata: Vec<u8>,
684 }
685
686 let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
687
688 for (template_parts, rows) in &groups {
689 let num_cols = template_parts.len() - 1;
690 let mut columns: Vec<Vec<&[u8]>> = (0..num_cols).map(|_| Vec::new()).collect();
691 let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
692
693 for (idx, values) in rows {
694 row_indices.push(*idx as u32);
695 for (col, val) in values.iter().enumerate() {
696 columns[col].push(*val);
697 }
698 }
699
700 let extract_mask = classify_columns(&columns, rows.len());
702 let all_extracted = extract_mask.iter().all(|&e| e);
703
704 let (mut col_data, mut group_metadata) = if all_extracted {
705 build_uniform_columnar(template_parts, &columns, rows.len(), false)
706 } else {
707 build_selective_columnar(template_parts, &columns, &extract_mask, rows.len(), false)
708 };
709
710 if all_extracted {
714 if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, rows.len())
715 {
716 let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
717 let unflattened = unflatten_nested_columns(
718 &flat_data,
719 &nested_groups,
720 rows.len(),
721 total_flat_cols,
722 );
723 if unflattened == col_data {
724 let nested_bytes = serialize_nested_info(&nested_groups);
725 group_metadata.extend_from_slice(&nested_bytes);
726 col_data = flat_data;
727 } else {
728 group_metadata.push(0u8); }
730 } else {
731 group_metadata.push(0u8); }
733 }
734
735 group_outputs.push(GroupOutput {
736 row_indices,
737 col_data,
738 group_metadata,
739 });
740 }
741
742 let mut data_out = Vec::new();
744 for group in &group_outputs {
745 data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
746 data_out.extend_from_slice(&group.col_data);
747 }
748
749 let residual_start = data_out.len();
751 for (i, &idx) in residual_indices.iter().enumerate() {
752 data_out.extend_from_slice(non_empty[idx]);
753 if i < residual_indices.len() - 1 {
754 data_out.push(b'\n');
755 }
756 }
757 let _residual_len = data_out.len() - residual_start;
758
759 let mut metadata = Vec::new();
761 metadata.push(METADATA_VERSION_GROUPED);
762 metadata.push(u8::from(has_trailing_newline));
763 metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
764 metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
765
766 for group in &group_outputs {
767 metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
768 for &idx in &group.row_indices {
769 metadata.extend_from_slice(&idx.to_le_bytes());
770 }
771 metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
772 metadata.extend_from_slice(&group.group_metadata);
773 }
774
775 metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
776 for &idx in &residual_indices {
777 metadata.extend_from_slice(&(idx as u32).to_le_bytes());
778 }
779
780 Some((data_out, metadata))
781}
782
/// Describes one column of nested objects that `flatten_nested_columns` replaced by
/// one flat column per sub-key, so that `unflatten_nested_columns` can rebuild it.
pub(crate) struct NestedGroupInfo {
    /// Index of the nested column among the columns before flattening.
    pub(crate) original_col_index: u16,
    /// Sub-keys found across the column's objects, in first-seen order. Each key is
    /// the raw bytes between the quotes.
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Template parts of the first non-null object in the column, as produced by
    /// `parse_nested_object_with_template`. Empty when deserialized from a version-1
    /// record, which carries no template.
    pub(crate) nested_template: Vec<Vec<u8>>,
    /// Bitmap marking sub-keys that are missing from a row's object: bit
    /// `sub_key_index * num_rows + row`, least-significant bit first within each
    /// byte. Empty when no row is missing any key.
    pub(crate) absence_bitmap: Vec<u8>,
}
799
800pub(crate) fn flatten_nested_columns(
806 col_data: &[u8],
807 num_rows: usize,
808) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
809 let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
811 if columns.is_empty() || num_rows == 0 {
812 return None;
813 }
814
815 let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
816 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
819
820 for (col_idx, &col_chunk) in columns.iter().enumerate() {
821 let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
822 if values.len() != num_rows {
823 return None;
824 }
825
826 let mut all_objects = true;
828 let mut has_non_null = false;
829 for val in &values {
830 if *val == b"null" {
831 continue;
832 }
833 has_non_null = true;
834 if !val.starts_with(b"{") {
835 all_objects = false;
836 break;
837 }
838 }
839
840 if !all_objects || !has_non_null {
841 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
843 output_columns.push(col_values);
844 continue;
845 }
846
847 let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
851 let mut nested_template: Vec<Vec<u8>> = Vec::new();
852 type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
853 let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);
854
855 for val in &values {
856 if *val == b"null" {
857 parsed_rows.push(None);
858 continue;
859 }
860 if nested_template.is_empty() {
861 match parse_nested_object_with_template(val) {
863 Some((template, kv_pairs)) => {
864 for (key, _) in &kv_pairs {
865 if !all_sub_keys.iter().any(|k| k == key) {
866 all_sub_keys.push(key.clone());
867 }
868 }
869 nested_template = template;
870 parsed_rows.push(Some(kv_pairs));
871 }
872 None => {
873 all_sub_keys.clear();
874 break;
875 }
876 }
877 } else {
878 match parse_nested_object_kv(val) {
880 Some(kv_pairs) => {
881 for (key, _) in &kv_pairs {
882 if !all_sub_keys.iter().any(|k| k == key) {
883 all_sub_keys.push(key.clone());
884 }
885 }
886 parsed_rows.push(Some(kv_pairs));
887 }
888 None => {
889 all_sub_keys.clear();
890 break;
891 }
892 }
893 }
894 }
895
896 if all_sub_keys.is_empty() {
897 let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
899 output_columns.push(col_values);
900 continue;
901 }
902
903 let num_sub_keys = all_sub_keys.len();
907 let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
908 let total_bits = num_sub_keys * num_rows;
909 let bitmap_bytes = total_bits.div_ceil(8);
910 let mut absence_bitmap = vec![0u8; bitmap_bytes];
911 let mut has_any_absent = false;
912
913 for (row_idx, parsed) in parsed_rows.iter().enumerate() {
914 match parsed {
915 Some(kv_pairs) => {
916 for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
917 let found = kv_pairs.iter().find(|(k, _)| k == sk);
918 match found {
919 Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
920 None => {
921 sub_columns[sk_idx].push(b"null".to_vec());
922 let bit_idx = sk_idx * num_rows + row_idx;
924 absence_bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
925 has_any_absent = true;
926 }
927 }
928 }
929 }
930 None => {
931 for sc in &mut sub_columns {
935 sc.push(b"null".to_vec());
936 }
937 }
938 }
939 }
940
941 nested_groups.push(NestedGroupInfo {
942 original_col_index: col_idx as u16,
943 sub_keys: all_sub_keys,
944 nested_template,
945 absence_bitmap: if has_any_absent {
946 absence_bitmap
947 } else {
948 Vec::new()
949 },
950 });
951
952 for sc in sub_columns {
953 output_columns.push(sc);
954 }
955 }
956
957 if nested_groups.is_empty() {
958 return None;
959 }
960
961 let num_out_cols = output_columns.len();
963 let mut out = Vec::new();
964 for (ci, col) in output_columns.iter().enumerate() {
965 for (ri, val) in col.iter().enumerate() {
966 out.extend_from_slice(val);
967 if ri < num_rows - 1 {
968 out.push(VAL_SEP);
969 }
970 }
971 if ci < num_out_cols - 1 {
972 out.push(COL_SEP);
973 }
974 }
975
976 Some((out, nested_groups))
977}
978
979#[allow(clippy::type_complexity)]
984pub(crate) fn parse_nested_object_with_template(
985 obj: &[u8],
986) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
987 let mut pos = 0;
988
989 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
991 pos += 1;
992 }
993 if pos >= obj.len() || obj[pos] != b'{' {
994 return None;
995 }
996 pos += 1;
997
998 let mut parts: Vec<Vec<u8>> = Vec::new();
999 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1000 let mut part_start = 0;
1001
1002 loop {
1003 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1005 pos += 1;
1006 }
1007 if pos >= obj.len() {
1008 return None;
1009 }
1010 if obj[pos] == b'}' {
1011 parts.push(obj[part_start..].to_vec());
1012 break;
1013 }
1014
1015 if obj[pos] != b'"' {
1017 return None;
1018 }
1019 let key_str_start = pos + 1;
1020 pos += 1;
1021 let mut escaped = false;
1022 while pos < obj.len() {
1023 if escaped {
1024 escaped = false;
1025 } else if obj[pos] == b'\\' {
1026 escaped = true;
1027 } else if obj[pos] == b'"' {
1028 break;
1029 }
1030 pos += 1;
1031 }
1032 if pos >= obj.len() {
1033 return None;
1034 }
1035 let key = obj[key_str_start..pos].to_vec();
1036 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1040 pos += 1;
1041 }
1042 if pos >= obj.len() || obj[pos] != b':' {
1043 return None;
1044 }
1045 pos += 1;
1046
1047 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1049 pos += 1;
1050 }
1051
1052 parts.push(obj[part_start..pos].to_vec());
1054
1055 let value_start = pos;
1057 let (value, value_end) = extract_value(obj, value_start)?;
1059 pos = value_end;
1060 pairs.push((key, value.to_vec()));
1061
1062 part_start = pos;
1063
1064 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1066 pos += 1;
1067 }
1068 if pos >= obj.len() {
1069 return None;
1070 }
1071 if obj[pos] == b',' {
1072 pos += 1;
1073 } else if obj[pos] == b'}' {
1074 parts.push(obj[part_start..].to_vec());
1075 break;
1076 } else {
1077 return None;
1078 }
1079 }
1080
1081 if pairs.is_empty() {
1082 return None;
1083 }
1084 Some((parts, pairs))
1085}
1086
1087pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
1091 let mut pos = 0;
1092
1093 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1095 pos += 1;
1096 }
1097 if pos >= obj.len() || obj[pos] != b'{' {
1098 return None;
1099 }
1100 pos += 1;
1101
1102 let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1103
1104 loop {
1105 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1107 pos += 1;
1108 }
1109 if pos >= obj.len() {
1110 return None;
1111 }
1112 if obj[pos] == b'}' {
1113 break;
1114 }
1115
1116 if obj[pos] != b'"' {
1118 return None;
1119 }
1120 pos += 1;
1121 let key_start = pos;
1122 let mut escaped = false;
1123 while pos < obj.len() {
1124 if escaped {
1125 escaped = false;
1126 } else if obj[pos] == b'\\' {
1127 escaped = true;
1128 } else if obj[pos] == b'"' {
1129 break;
1130 }
1131 pos += 1;
1132 }
1133 if pos >= obj.len() {
1134 return None;
1135 }
1136 let key = obj[key_start..pos].to_vec();
1137 pos += 1; while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1141 pos += 1;
1142 }
1143 if pos >= obj.len() || obj[pos] != b':' {
1144 return None;
1145 }
1146 pos += 1;
1147
1148 let (value, value_end) = extract_value(obj, pos)?;
1150 pos = value_end;
1151 pairs.push((key, value.to_vec()));
1152
1153 while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1155 pos += 1;
1156 }
1157 if pos >= obj.len() {
1158 return None;
1159 }
1160 if obj[pos] == b',' {
1161 pos += 1;
1162 } else if obj[pos] == b'}' {
1163 break;
1164 } else {
1165 return None;
1166 }
1167 }
1168
1169 if pairs.is_empty() {
1170 return None;
1171 }
1172 Some(pairs)
1173}
1174
1175pub(crate) fn unflatten_nested_columns(
1180 flat_data: &[u8],
1181 nested_groups: &[NestedGroupInfo],
1182 num_rows: usize,
1183 total_flat_cols: usize,
1184) -> Vec<u8> {
1185 let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
1186 if flat_columns.len() != total_flat_cols {
1187 return flat_data.to_vec();
1188 }
1189
1190 let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
1192 for chunk in &flat_columns {
1193 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1194 if vals.len() != num_rows {
1195 return flat_data.to_vec();
1196 }
1197 flat_col_values.push(vals);
1198 }
1199
1200 let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
1203
1204 let original_num_cols = total_flat_cols
1213 - nested_groups
1214 .iter()
1215 .map(|g| g.sub_keys.len())
1216 .sum::<usize>()
1217 + nested_groups.len();
1218
1219 let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
1221 for (gi, group) in nested_groups.iter().enumerate() {
1222 if (group.original_col_index as usize) < original_num_cols {
1223 original_col_map[group.original_col_index as usize] = Some(gi);
1224 }
1225 }
1226
1227 let mut flat_idx = 0;
1228 for entry in original_col_map.iter().take(original_num_cols) {
1229 if let Some(gi) = entry {
1230 let group = &nested_groups[*gi];
1231 let num_sub = group.sub_keys.len();
1232
1233 let is_absent = |si: usize, row: usize| -> bool {
1235 if group.absence_bitmap.is_empty() {
1236 return false; }
1238 let bit_idx = si * num_rows + row;
1239 let byte_idx = bit_idx / 8;
1240 if byte_idx >= group.absence_bitmap.len() {
1241 return false;
1242 }
1243 (group.absence_bitmap[byte_idx] >> (bit_idx % 8)) & 1 == 1
1244 };
1245
1246 let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
1248 for row in 0..num_rows {
1249 let all_null = (0..num_sub).all(|si| {
1252 flat_idx + si < flat_col_values.len()
1253 && flat_col_values[flat_idx + si][row] == b"null"
1254 });
1255 if all_null && !group.absence_bitmap.is_empty() {
1256 let any_present_null = (0..num_sub).any(|si| {
1259 flat_col_values[flat_idx + si][row] == b"null" && !is_absent(si, row)
1260 });
1261 if any_present_null {
1262 } else {
1265 merged_col.push(b"null".to_vec());
1267 continue;
1268 }
1269 } else if all_null {
1270 merged_col.push(b"null".to_vec());
1271 continue;
1272 }
1273
1274 let has_absent = (0..num_sub).any(|si| is_absent(si, row));
1276
1277 if !has_absent
1278 && !group.nested_template.is_empty()
1279 && group.nested_template.len() == num_sub + 1
1280 {
1281 let mut obj = Vec::new();
1284 obj.extend_from_slice(&group.nested_template[0]);
1285 if flat_idx < flat_col_values.len() {
1286 obj.extend_from_slice(flat_col_values[flat_idx][row]);
1287 }
1288 for si in 1..num_sub {
1289 obj.extend_from_slice(&group.nested_template[si]);
1290 if flat_idx + si < flat_col_values.len() {
1291 obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
1292 }
1293 }
1294 obj.extend_from_slice(&group.nested_template[num_sub]);
1295 merged_col.push(obj);
1296 } else {
1297 let mut obj = Vec::new();
1300 obj.push(b'{');
1301 let mut first = true;
1302 for si in 0..num_sub {
1303 if flat_idx + si >= flat_col_values.len() {
1304 break;
1305 }
1306 if is_absent(si, row) {
1307 continue; }
1309 let val = flat_col_values[flat_idx + si][row];
1310 if !first {
1311 obj.push(b',');
1312 }
1313 first = false;
1314 obj.push(b'"');
1315 obj.extend_from_slice(&group.sub_keys[si]);
1316 obj.push(b'"');
1317 obj.push(b':');
1318 obj.extend_from_slice(val);
1319 }
1320 obj.push(b'}');
1321 merged_col.push(obj);
1322 }
1323 }
1324 output_columns.push(merged_col);
1325 flat_idx += num_sub;
1326 } else {
1327 if flat_idx < flat_col_values.len() {
1329 let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
1330 .iter()
1331 .map(|v| v.to_vec())
1332 .collect();
1333 output_columns.push(col);
1334 }
1335 flat_idx += 1;
1336 }
1337 }
1338
1339 let num_out_cols = output_columns.len();
1341 let mut out = Vec::new();
1342 for (ci, col) in output_columns.iter().enumerate() {
1343 for (ri, val) in col.iter().enumerate() {
1344 out.extend_from_slice(val);
1345 if ri < num_rows - 1 {
1346 out.push(VAL_SEP);
1347 }
1348 }
1349 if ci < num_out_cols - 1 {
1350 out.push(COL_SEP);
1351 }
1352 }
1353
1354 out
1355}
1356
1357pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1362 let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1363 let has_absence = groups.iter().any(|g| !g.absence_bitmap.is_empty());
1364 let mut out = Vec::new();
1365 let version = if has_absence {
1366 3u8
1367 } else if has_template {
1368 2u8
1369 } else {
1370 1u8
1371 };
1372 out.push(version);
1373 out.push(groups.len() as u8);
1374 for group in groups {
1375 out.extend_from_slice(&group.original_col_index.to_le_bytes());
1376 out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1377 for key in &group.sub_keys {
1378 out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1379 out.extend_from_slice(key);
1380 }
1381 if has_template || version == 3 {
1382 out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1383 for part in &group.nested_template {
1384 out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1385 out.extend_from_slice(part);
1386 }
1387 }
1388 if version == 3 {
1389 let bm_len = group.absence_bitmap.len() as u32;
1390 out.extend_from_slice(&bm_len.to_le_bytes());
1391 out.extend_from_slice(&group.absence_bitmap);
1392 }
1393 }
1394 out
1395}
1396
1397pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1402 if data.is_empty() {
1403 return None;
1404 }
1405 let mut pos = 0;
1406 let version = data[pos];
1407 pos += 1;
1408 if version != 1 && version != 2 && version != 3 {
1409 return None;
1410 }
1411 let has_template = version == 2 || version == 3;
1412 let has_absence = version == 3;
1413 if pos >= data.len() {
1414 return None;
1415 }
1416 let num_groups = data[pos] as usize;
1417 pos += 1;
1418
1419 let mut groups = Vec::with_capacity(num_groups);
1420 for _ in 0..num_groups {
1421 if pos + 4 > data.len() {
1422 return None;
1423 }
1424 let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1425 pos += 2;
1426 let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1427 pos += 2;
1428
1429 let mut sub_keys = Vec::with_capacity(num_sub_cols);
1430 for _ in 0..num_sub_cols {
1431 if pos + 2 > data.len() {
1432 return None;
1433 }
1434 let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1435 pos += 2;
1436 if pos + key_len > data.len() {
1437 return None;
1438 }
1439 sub_keys.push(data[pos..pos + key_len].to_vec());
1440 pos += key_len;
1441 }
1442
1443 let nested_template = if has_template {
1444 if pos + 2 > data.len() {
1445 return None;
1446 }
1447 let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1448 pos += 2;
1449 let mut parts = Vec::with_capacity(num_parts);
1450 for _ in 0..num_parts {
1451 if pos + 2 > data.len() {
1452 return None;
1453 }
1454 let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1455 pos += 2;
1456 if pos + part_len > data.len() {
1457 return None;
1458 }
1459 parts.push(data[pos..pos + part_len].to_vec());
1460 pos += part_len;
1461 }
1462 parts
1463 } else {
1464 Vec::new()
1465 };
1466
1467 let absence_bitmap = if has_absence {
1468 if pos + 4 > data.len() {
1469 return None;
1470 }
1471 let bm_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1472 pos += 4;
1473 if pos + bm_len > data.len() {
1474 return None;
1475 }
1476 let bm = data[pos..pos + bm_len].to_vec();
1477 pos += bm_len;
1478 bm
1479 } else {
1480 Vec::new()
1481 };
1482
1483 groups.push(NestedGroupInfo {
1484 original_col_index,
1485 sub_keys,
1486 nested_template,
1487 absence_bitmap,
1488 });
1489 }
1490
1491 Some((groups, pos))
1492}
1493
1494pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
1499 if data.is_empty() {
1500 return None;
1501 }
1502
1503 let has_trailing_newline = data.last() == Some(&b'\n');
1504 let lines = split_lines(data);
1505 let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
1506
1507 if non_empty.len() < 2 {
1508 return None;
1509 }
1510
1511 if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
1513 let is_selective = !metadata.is_empty() && metadata[0] == METADATA_VERSION_SELECTIVE;
1514 let size_ok = if is_selective {
1518 (col_data.len() + metadata.len()) * 100 <= data.len() * 105
1519 } else {
1520 col_data.len() + metadata.len() < data.len()
1521 };
1522 if size_ok {
1523 if is_selective {
1525 return Some(TransformResult {
1526 data: col_data,
1527 metadata,
1528 });
1529 }
1530 let num_rows = non_empty.len();
1535 if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
1536 let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
1541 let unflattened =
1542 unflatten_nested_columns(&flat_data, &nested_groups, num_rows, total_flat_cols);
1543 if unflattened == col_data {
1544 let nested_bytes = serialize_nested_info(&nested_groups);
1546 metadata.extend_from_slice(&nested_bytes);
1547 return Some(TransformResult {
1548 data: flat_data,
1549 metadata,
1550 });
1551 }
1552 }
1554 metadata.push(0u8); return Some(TransformResult {
1557 data: col_data,
1558 metadata,
1559 });
1560 }
1561 }
1562
1563 if let Some((grouped_data, grouped_metadata)) =
1565 preprocess_grouped(&non_empty, has_trailing_newline)
1566 {
1567 if grouped_data.len() + grouped_metadata.len() < data.len() {
1568 return Some(TransformResult {
1569 data: grouped_data,
1570 metadata: grouped_metadata,
1571 });
1572 }
1573 }
1574
1575 None
1576}
1577
1578pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1581 if metadata.is_empty() {
1582 return data.to_vec();
1583 }
1584 match metadata[0] {
1585 METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1586 METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1587 METADATA_VERSION_SELECTIVE => reverse_selective(data, metadata),
1588 _ => data.to_vec(),
1589 }
1590}
1591
1592fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1594 if metadata.len() < 10 {
1595 return data.to_vec();
1596 }
1597 let mut pos = 0;
1598 pos += 1;
1600 let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1601 pos += 4;
1602 let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1603 pos += 2;
1604 let has_trailing_newline = metadata[pos] != 0;
1605 pos += 1;
1606 let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1607 pos += 2;
1608
1609 let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
1610 for _ in 0..num_parts {
1611 if pos + 2 > metadata.len() {
1612 return data.to_vec();
1613 }
1614 let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1615 pos += 2;
1616 if pos + part_len > metadata.len() {
1617 return data.to_vec();
1618 }
1619 parts.push(metadata[pos..pos + part_len].to_vec());
1620 pos += part_len;
1621 }
1622
1623 if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
1624 return data.to_vec();
1625 }
1626
1627 let remaining_metadata = &metadata[pos..];
1629 if !remaining_metadata.is_empty()
1630 && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2 || remaining_metadata[0] == 3)
1631 {
1632 if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
1634 let total_flat_cols = data.split(|&b| b == COL_SEP).count();
1636 let unflattened =
1637 unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
1638 return reverse_uniform_from_parts(
1639 &unflattened,
1640 &parts,
1641 num_rows,
1642 num_cols,
1643 has_trailing_newline,
1644 );
1645 }
1646 }
1647
1648 reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
1649}
1650
1651fn reverse_uniform_from_parts(
1653 data: &[u8],
1654 parts: &[Vec<u8>],
1655 num_rows: usize,
1656 num_cols: usize,
1657 has_trailing_newline: bool,
1658) -> Vec<u8> {
1659 let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
1660 if col_chunks.len() != num_cols {
1661 return data.to_vec();
1662 }
1663
1664 let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1665 for chunk in &col_chunks {
1666 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1667 if vals.len() != num_rows {
1668 return data.to_vec();
1669 }
1670 columns.push(vals);
1671 }
1672
1673 let template_size_per_row: usize = parts.iter().map(std::vec::Vec::len).sum();
1675 let values_total: usize = columns
1676 .iter()
1677 .map(|col| col.iter().map(|v| v.len()).sum::<usize>())
1678 .sum();
1679 let newline_count = if has_trailing_newline {
1680 num_rows
1681 } else {
1682 num_rows - 1
1683 };
1684 let total_size = template_size_per_row * num_rows + values_total + newline_count;
1685
1686 let mut output = Vec::with_capacity(total_size);
1687 #[allow(clippy::needless_range_loop)]
1688 for row in 0..num_rows {
1689 output.extend_from_slice(&parts[0]);
1690 output.extend_from_slice(columns[0][row]);
1691 for col in 1..num_cols {
1692 output.extend_from_slice(&parts[col]);
1693 output.extend_from_slice(columns[col][row]);
1694 }
1695 output.extend_from_slice(&parts[num_cols]);
1696
1697 if row < num_rows - 1 || has_trailing_newline {
1698 output.push(b'\n');
1699 }
1700 }
1701
1702 output
1703}
1704
/// Parsed form of the selective-layout metadata, as produced by
/// `parse_selective_metadata` and consumed by `reverse_selective_from_data`.
struct SelectiveMetadata {
    /// Line template fragments: `parts[0]` is written before the first column value and
    /// `parts[c + 1]` after the value of column `c`. Parsing requires exactly
    /// `num_total_cols + 1` entries.
    parts: Vec<Vec<u8>>,
    /// Number of rows to reconstruct (parsing rejects zero).
    num_rows: usize,
    /// Total number of columns per row, extracted plus inline (parsing rejects zero).
    num_total_cols: usize,
    /// Whether a `\n` is written after the last reconstructed row.
    has_trailing_newline: bool,
    /// Column indices whose values live in the column-wise "extracted" section of the
    /// data; all remaining columns are read from the per-row "inline" section.
    extracted_col_indices: Vec<u16>,
}
1713
1714fn parse_selective_metadata(metadata: &[u8]) -> Option<SelectiveMetadata> {
1716 if metadata.len() < 12 {
1717 return None;
1718 }
1719 let mut pos = 1; let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1721 pos += 4;
1722 let num_total_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1723 pos += 2;
1724 let has_trailing_newline = metadata[pos] != 0;
1725 pos += 1;
1726 let num_extracted = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1727 pos += 2;
1728
1729 let mut extracted_col_indices = Vec::with_capacity(num_extracted);
1730 for _ in 0..num_extracted {
1731 if pos + 2 > metadata.len() {
1732 return None;
1733 }
1734 let idx = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap());
1735 pos += 2;
1736 extracted_col_indices.push(idx);
1737 }
1738
1739 if pos + 2 > metadata.len() {
1740 return None;
1741 }
1742 let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1743 pos += 2;
1744
1745 let mut parts = Vec::with_capacity(num_parts);
1746 for _ in 0..num_parts {
1747 if pos + 2 > metadata.len() {
1748 return None;
1749 }
1750 let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1751 pos += 2;
1752 if pos + part_len > metadata.len() {
1753 return None;
1754 }
1755 parts.push(metadata[pos..pos + part_len].to_vec());
1756 pos += part_len;
1757 }
1758
1759 if parts.len() != num_total_cols + 1 || num_rows == 0 || num_total_cols == 0 {
1760 return None;
1761 }
1762
1763 Some(SelectiveMetadata {
1764 parts,
1765 num_rows,
1766 num_total_cols,
1767 has_trailing_newline,
1768 extracted_col_indices,
1769 })
1770}
1771
1772fn reverse_selective(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1774 let sm = match parse_selective_metadata(metadata) {
1775 Some(v) => v,
1776 None => return data.to_vec(),
1777 };
1778 reverse_selective_from_data(data, &sm)
1779}
1780
1781fn reverse_selective_from_data(data: &[u8], sm: &SelectiveMetadata) -> Vec<u8> {
1783 if data.len() < 4 {
1784 return data.to_vec();
1785 }
1786
1787 let extracted_data_len = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1789 if 4 + extracted_data_len > data.len() {
1790 return data.to_vec();
1791 }
1792 let extracted_section = &data[4..4 + extracted_data_len];
1793 let inline_section = &data[4 + extracted_data_len..];
1794
1795 let num_extracted = sm.extracted_col_indices.len();
1796 let num_inline = sm.num_total_cols - num_extracted;
1797
1798 let extracted_columns: Vec<Vec<&[u8]>> = if num_extracted > 0 && !extracted_section.is_empty() {
1800 let col_chunks: Vec<&[u8]> = extracted_section.split(|&b| b == COL_SEP).collect();
1801 if col_chunks.len() != num_extracted {
1802 return data.to_vec();
1803 }
1804 let mut cols = Vec::with_capacity(num_extracted);
1805 for chunk in &col_chunks {
1806 let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1807 if vals.len() != sm.num_rows {
1808 return data.to_vec();
1809 }
1810 cols.push(vals);
1811 }
1812 cols
1813 } else if num_extracted > 0 {
1814 return data.to_vec();
1816 } else {
1817 Vec::new()
1818 };
1819
1820 let inline_rows: Vec<Vec<&[u8]>> = if num_inline > 0 && !inline_section.is_empty() {
1822 let row_chunks: Vec<&[u8]> = inline_section.split(|&b| b == COL_SEP).collect();
1823 if row_chunks.len() != sm.num_rows {
1824 return data.to_vec();
1825 }
1826 let mut rows = Vec::with_capacity(sm.num_rows);
1827 for chunk in &row_chunks {
1828 let vals: Vec<&[u8]> = if num_inline > 1 {
1829 chunk.split(|&b| b == VAL_SEP).collect()
1830 } else {
1831 vec![*chunk]
1832 };
1833 if vals.len() != num_inline {
1834 return data.to_vec();
1835 }
1836 rows.push(vals);
1837 }
1838 rows
1839 } else if num_inline > 0 {
1840 return data.to_vec();
1842 } else {
1843 Vec::new()
1844 };
1845
1846 let mut extracted_positions = vec![None; sm.num_total_cols];
1850 let mut inline_positions = vec![None; sm.num_total_cols];
1851 for (ei, &col_idx) in sm.extracted_col_indices.iter().enumerate() {
1852 if (col_idx as usize) < sm.num_total_cols {
1853 extracted_positions[col_idx as usize] = Some(ei);
1854 }
1855 }
1856 let mut ii = 0;
1857 for col in 0..sm.num_total_cols {
1858 if extracted_positions[col].is_none() {
1859 inline_positions[col] = Some(ii);
1860 ii += 1;
1861 }
1862 }
1863
1864 let mut output = Vec::with_capacity(data.len() * 2);
1866 for row in 0..sm.num_rows {
1867 output.extend_from_slice(&sm.parts[0]);
1869 for col in 0..sm.num_total_cols {
1870 if let Some(ei) = extracted_positions[col] {
1871 output.extend_from_slice(extracted_columns[ei][row]);
1872 } else if let Some(ii_idx) = inline_positions[col] {
1873 output.extend_from_slice(inline_rows[row][ii_idx]);
1874 }
1875 output.extend_from_slice(&sm.parts[col + 1]);
1876 }
1877
1878 if row < sm.num_rows - 1 || sm.has_trailing_newline {
1879 output.push(b'\n');
1880 }
1881 }
1882
1883 output
1884}
1885
1886fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1888 if metadata.len() < 8 {
1889 return data.to_vec();
1890 }
1891
1892 let mut mpos = 1; let has_trailing_newline = metadata[mpos] != 0;
1894 mpos += 1;
1895 let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1896 mpos += 4;
1897 let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
1898 mpos += 2;
1899
1900 let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];
1902
1903 let mut dpos: usize = 0;
1905
1906 for _ in 0..num_groups {
1907 if mpos + 4 > metadata.len() {
1909 return data.to_vec();
1910 }
1911 let group_row_count =
1912 u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1913 mpos += 4;
1914
1915 let mut row_indices = Vec::with_capacity(group_row_count);
1916 for _ in 0..group_row_count {
1917 if mpos + 4 > metadata.len() {
1918 return data.to_vec();
1919 }
1920 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1921 mpos += 4;
1922 row_indices.push(idx);
1923 }
1924
1925 if mpos + 4 > metadata.len() {
1927 return data.to_vec();
1928 }
1929 let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1930 mpos += 4;
1931 if mpos + gm_len > metadata.len() {
1932 return data.to_vec();
1933 }
1934 let group_metadata = &metadata[mpos..mpos + gm_len];
1935 mpos += gm_len;
1936
1937 if dpos + 4 > data.len() {
1939 return data.to_vec();
1940 }
1941 let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
1942 dpos += 4;
1943 if dpos + gd_len > data.len() {
1944 return data.to_vec();
1945 }
1946 let group_data = &data[dpos..dpos + gd_len];
1947 dpos += gd_len;
1948
1949 let group_version = if group_metadata.is_empty() {
1951 0
1952 } else {
1953 group_metadata[0]
1954 };
1955
1956 if group_version == METADATA_VERSION_SELECTIVE {
1957 let sm = match parse_selective_metadata(group_metadata) {
1959 Some(v) => v,
1960 None => return data.to_vec(),
1961 };
1962 if sm.num_rows != group_row_count {
1963 return data.to_vec();
1964 }
1965 let reconstructed = reverse_selective_from_data(group_data, &sm);
1966 let lines: Vec<&[u8]> = reconstructed.split(|&b| b == b'\n').collect();
1968 for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1969 if row_within_group < lines.len() && original_idx < total_rows {
1970 output_lines[original_idx] = Some(lines[row_within_group].to_vec());
1971 }
1972 }
1973 } else {
1974 let reconstructed = reverse_uniform(group_data, group_metadata);
1977 let lines: Vec<&[u8]> = reconstructed.split(|&b| b == b'\n').collect();
1978 for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1979 if row_within_group < lines.len() && original_idx < total_rows {
1980 output_lines[original_idx] = Some(lines[row_within_group].to_vec());
1981 }
1982 }
1983 }
1984 }
1985
1986 if mpos + 4 > metadata.len() {
1988 return data.to_vec();
1989 }
1990 let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1991 mpos += 4;
1992
1993 let mut residual_indices = Vec::with_capacity(residual_count);
1994 for _ in 0..residual_count {
1995 if mpos + 4 > metadata.len() {
1996 return data.to_vec();
1997 }
1998 let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1999 mpos += 4;
2000 residual_indices.push(idx);
2001 }
2002
2003 let residual_data = &data[dpos..];
2005 if residual_count > 0 {
2006 let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
2007 vec![]
2008 } else {
2009 residual_data.split(|&b| b == b'\n').collect()
2010 };
2011 if residual_lines.len() != residual_count {
2013 return data.to_vec();
2014 }
2015 for (i, &idx) in residual_indices.iter().enumerate() {
2016 if idx < total_rows {
2017 output_lines[idx] = Some(residual_lines[i].to_vec());
2018 }
2019 }
2020 }
2021
2022 let mut output = Vec::with_capacity(data.len() * 2);
2024 for (i, slot) in output_lines.iter().enumerate() {
2025 match slot {
2026 Some(line) => output.extend_from_slice(line),
2027 None => {
2028 return data.to_vec();
2030 }
2031 }
2032 if i < total_rows - 1 || has_trailing_newline {
2033 output.push(b'\n');
2034 }
2035 }
2036
2037 output
2038}
2039
2040#[cfg(test)]
2041mod tests {
2042 use super::*;
2043
2044 #[test]
2045 fn extract_value_string() {
2046 let line = br#""hello","next""#;
2047 let (val, end) = extract_value(line, 0).unwrap();
2048 assert_eq!(val, b"\"hello\"");
2049 assert_eq!(end, 7);
2050 }
2051
2052 #[test]
2053 fn extract_value_number() {
2054 let line = b"42,next";
2055 let (val, end) = extract_value(line, 0).unwrap();
2056 assert_eq!(val, b"42");
2057 assert_eq!(end, 2);
2058 }
2059
2060 #[test]
2061 fn extract_value_bool() {
2062 let line = b"true,next";
2063 let (val, end) = extract_value(line, 0).unwrap();
2064 assert_eq!(val, b"true");
2065 assert_eq!(end, 4);
2066 }
2067
2068 #[test]
2069 fn extract_value_null() {
2070 let line = b"null,next";
2071 let (val, end) = extract_value(line, 0).unwrap();
2072 assert_eq!(val, b"null");
2073 assert_eq!(end, 4);
2074 }
2075
2076 #[test]
2077 fn extract_value_object() {
2078 let line = br#"{"a":1,"b":"x"},next"#;
2079 let (val, end) = extract_value(line, 0).unwrap();
2080 assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
2081 assert_eq!(end, 15);
2082 }
2083
2084 #[test]
2085 fn extract_value_array() {
2086 let line = b"[1,2,3],next";
2087 let (val, end) = extract_value(line, 0).unwrap();
2088 assert_eq!(val, b"[1,2,3]");
2089 assert_eq!(end, 7);
2090 }
2091
2092 #[test]
2093 fn extract_value_string_with_escapes() {
2094 let line = br#""he\"llo",next"#;
2095 let (val, end) = extract_value(line, 0).unwrap();
2096 assert_eq!(val, br#""he\"llo""#.to_vec());
2097 assert_eq!(end, 9);
2098 }
2099
2100 #[test]
2101 fn parse_line_simple() {
2102 let line = br#"{"a":1,"b":"x"}"#;
2103 let (parts, values) = parse_line(line).unwrap();
2104 assert_eq!(parts.len(), 3); assert_eq!(values.len(), 2);
2106 assert_eq!(values[0], b"1");
2107 assert_eq!(values[1], b"\"x\"");
2108 assert_eq!(parts[0], br#"{"a":"#.to_vec());
2109 assert_eq!(parts[1], br#","b":"#.to_vec());
2110 assert_eq!(parts[2], b"}");
2111 }
2112
2113 #[test]
2114 fn roundtrip_simple() {
2115 let data = br#"{"a":1,"b":"x"}
2116{"a":2,"b":"y"}
2117{"a":3,"b":"z"}
2118"#;
2119 let result = preprocess(data).expect("should produce transform");
2120 let restored = reverse(&result.data, &result.metadata);
2121 assert_eq!(
2122 String::from_utf8_lossy(&restored),
2123 String::from_utf8_lossy(data),
2124 );
2125 assert_eq!(restored, data.to_vec());
2126 }
2127
2128 #[test]
2129 fn roundtrip_no_trailing_newline() {
2130 let data = br#"{"a":1,"b":"x"}
2131{"a":2,"b":"y"}
2132{"a":3,"b":"z"}"#;
2133 let result = preprocess(data).expect("should produce transform");
2134 let restored = reverse(&result.data, &result.metadata);
2135 assert_eq!(restored, data.to_vec());
2136 }
2137
2138 #[test]
2139 fn roundtrip_nested_values() {
2140 let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2141{"id":2,"meta":{"x":30,"y":40}}
2142{"id":3,"meta":{"x":50,"y":60}}
2143{"id":4,"meta":{"x":70,"y":80}}
2144{"id":5,"meta":{"x":90,"y":100}}
2145"#;
2146 let result = preprocess(data).expect("should produce transform");
2147 let restored = reverse(&result.data, &result.metadata);
2148 assert_eq!(restored, data.to_vec());
2149 }
2150
2151 #[test]
2152 fn roundtrip_mixed_types() {
2153 let data = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
2154{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
2155{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
2156{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
2157{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
2158"#;
2159 let result = preprocess(data).expect("should produce transform");
2160 let restored = reverse(&result.data, &result.metadata);
2161 assert_eq!(restored, data.to_vec());
2162 }
2163
2164 #[test]
2165 fn schema_mismatch_too_few_returns_none() {
2166 let data = br#"{"a":1,"b":2}
2168{"a":1,"c":3}
2169"#;
2170 assert!(preprocess(data).is_none());
2171 }
2172
2173 #[test]
2174 fn different_num_keys_too_few_returns_none() {
2175 let data = br#"{"a":1,"b":2}
2176{"a":1}
2177"#;
2178 assert!(preprocess(data).is_none());
2179 }
2180
2181 #[test]
2182 fn single_line_returns_none() {
2183 let data = br#"{"a":1,"b":2}
2184"#;
2185 assert!(preprocess(data).is_none());
2186 }
2187
2188 #[test]
2189 fn empty_returns_none() {
2190 assert!(preprocess(b"").is_none());
2191 }
2192
2193 #[test]
2194 fn column_layout_groups_similar_values() {
2195 let data = br#"{"type":"page_view","user":"alice"}
2196{"type":"api_call","user":"alice"}
2197{"type":"click","user":"bob"}
2198"#;
2199 let result = preprocess(data).unwrap();
2200
2201 let col_data = &result.data;
2203 let cols: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
2204 assert_eq!(cols.len(), 2);
2205
2206 let type_vals: Vec<&[u8]> = cols[0].split(|&b| b == VAL_SEP).collect();
2208 assert_eq!(type_vals.len(), 3);
2209 assert_eq!(type_vals[0], br#""page_view""#);
2210 assert_eq!(type_vals[1], br#""api_call""#);
2211 assert_eq!(type_vals[2], br#""click""#);
2212
2213 let user_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
2215 assert_eq!(user_vals.len(), 3);
2216 assert_eq!(user_vals[0], br#""alice""#);
2217 assert_eq!(user_vals[1], br#""alice""#);
2218 assert_eq!(user_vals[2], br#""bob""#);
2219 }
2220
2221 #[test]
2222 fn roundtrip_string_with_escaped_chars() {
2223 let data = br#"{"msg":"he said \"hi\"","val":1}
2224{"msg":"line\nbreak","val":2}
2225{"msg":"tab\there","val":3}
2226{"msg":"back\\slash","val":4}
2227{"msg":"normal text","val":5}
2228"#;
2229 let result = preprocess(data).expect("should produce transform");
2230 let restored = reverse(&result.data, &result.metadata);
2231 assert_eq!(restored, data.to_vec());
2232 }
2233
2234 #[test]
2235 fn roundtrip_negative_and_float_numbers() {
2236 let data = br#"{"x":-3.14,"y":0}
2237{"x":2.718,"y":-1}
2238{"x":0.001,"y":999}
2239{"x":-100,"y":-200}
2240{"x":42.0,"y":7}
2241"#;
2242 let result = preprocess(data).expect("should produce transform");
2243 let restored = reverse(&result.data, &result.metadata);
2244 assert_eq!(restored, data.to_vec());
2245 }
2246
2247 #[test]
2250 fn reverse_roundtrip_small_data() {
2251 let (parts, vals) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
2253 assert_eq!(vals.len(), 2);
2254 assert_eq!(parts.len(), 3);
2255
2256 let big_data = br#"{"x":-3.14,"y":0}
2258{"x":2.718,"y":-1}
2259"#
2260 .repeat(20);
2261 let result = preprocess(&big_data).expect("should produce transform with 40 rows");
2262 let restored = reverse(&result.data, &result.metadata);
2263 assert_eq!(restored, big_data);
2264 }
2265
2266 #[test]
2269 fn grouped_roundtrip_two_schemas() {
2270 let mut data = Vec::new();
2272 for i in 0..10 {
2273 data.extend_from_slice(
2274 format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
2275 );
2276 data.push(b'\n');
2277 }
2278 for i in 10..20 {
2279 data.extend_from_slice(
2280 format!(
2281 r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
2282 i, i, i
2283 )
2284 .as_bytes(),
2285 );
2286 data.push(b'\n');
2287 }
2288 let result = preprocess(&data).expect("should produce grouped transform");
2289 assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2291 let restored = reverse(&result.data, &result.metadata);
2292 assert_eq!(restored, data);
2293 }
2294
2295 #[test]
2296 fn grouped_roundtrip_interleaved_schemas() {
2297 let mut data = Vec::new();
2299 for i in 0..20 {
2300 if i % 2 == 0 {
2301 data.extend_from_slice(
2302 format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
2303 );
2304 } else {
2305 data.extend_from_slice(
2306 format!(
2307 r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
2308 i, i, i
2309 )
2310 .as_bytes(),
2311 );
2312 }
2313 data.push(b'\n');
2314 }
2315 let result = preprocess(&data).expect("should produce grouped transform");
2316 assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2317 let restored = reverse(&result.data, &result.metadata);
2318 assert_eq!(restored, data);
2319 }
2320
2321 #[test]
2322 fn grouped_roundtrip_with_residuals() {
2323 let mut data = Vec::new();
2325 for i in 0..8 {
2327 data.extend_from_slice(format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
2328 data.push(b'\n');
2329 }
2330 data.extend_from_slice(br#"{"x":1,"y":2,"z":3}"#);
2332 data.push(b'\n');
2333 data.extend_from_slice(br#"{"p":"q"}"#);
2334 data.push(b'\n');
2335 for i in 0..6 {
2337 data.extend_from_slice(format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes());
2338 data.push(b'\n');
2339 }
2340 let result = preprocess(&data).expect("should produce grouped transform");
2341 assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2342 let restored = reverse(&result.data, &result.metadata);
2343 assert_eq!(
2344 String::from_utf8_lossy(&restored),
2345 String::from_utf8_lossy(&data),
2346 );
2347 assert_eq!(restored, data);
2348 }
2349
2350 #[test]
2351 fn grouped_roundtrip_no_trailing_newline() {
2352 let mut data = Vec::new();
2353 for i in 0..6 {
2354 data.extend_from_slice(format!(r#"{{"id":{},"type":"push"}}"#, i).as_bytes());
2355 data.push(b'\n');
2356 }
2357 for i in 0..6 {
2358 data.extend_from_slice(
2359 format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i).as_bytes(),
2360 );
2361 if i < 5 {
2362 data.push(b'\n');
2363 }
2364 }
2366 let result = preprocess(&data).expect("should produce grouped transform");
2367 let restored = reverse(&result.data, &result.metadata);
2368 assert_eq!(restored, data);
2369 }
2370
2371 #[test]
2372 fn uniform_still_preferred_over_grouped() {
2373 let data = br#"{"a":1,"b":"x"}
2375{"a":2,"b":"y"}
2376{"a":3,"b":"z"}
2377{"a":4,"b":"w"}
2378{"a":5,"b":"v"}
2379"#;
2380 let result = preprocess(data).expect("should produce transform");
2381 assert_eq!(
2382 result.metadata[0], METADATA_VERSION_UNIFORM,
2383 "uniform schema should use Strategy 1"
2384 );
2385 let restored = reverse(&result.data, &result.metadata);
2386 assert_eq!(restored, data.to_vec());
2387 }
2388
2389 #[test]
2390 fn grouped_gharchive_simulation() {
2391 let mut data = Vec::new();
2393 for i in 0..50 {
2394 if i % 5 == 0 {
2395 data.extend_from_slice(
2397 format!(
2398 r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
2399 i, i, i, i
2400 )
2401 .as_bytes(),
2402 );
2403 } else {
2404 data.extend_from_slice(
2406 format!(
2407 r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
2408 i, i, i
2409 )
2410 .as_bytes(),
2411 );
2412 }
2413 data.push(b'\n');
2414 }
2415 let result = preprocess(&data).expect("should produce grouped transform");
2416 assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2417 let restored = reverse(&result.data, &result.metadata);
2418 assert_eq!(restored, data);
2419 }
2420
2421 #[test]
2422 fn grouped_nested_flatten_per_group() {
2423 let mut data = Vec::new();
2426 for i in 0..30 {
2427 if i % 3 == 0 {
2428 data.extend_from_slice(
2430 format!(
2431 r#"{{"id":{},"info":{{"a":{},"b":{}}},"tag":"b","extra":"yes"}}"#,
2432 i,
2433 i * 10,
2434 i * 20
2435 )
2436 .as_bytes(),
2437 );
2438 } else {
2439 data.extend_from_slice(
2441 format!(
2442 r#"{{"id":{},"info":{{"a":{},"b":{}}},"tag":"a"}}"#,
2443 i,
2444 i * 10,
2445 i * 20
2446 )
2447 .as_bytes(),
2448 );
2449 }
2450 data.push(b'\n');
2451 }
2452 let result = preprocess(&data).expect("should produce grouped transform");
2453 assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2454 let restored = reverse(&result.data, &result.metadata);
2455 assert_eq!(restored, data, "grouped nested flatten roundtrip failed");
2456 }
2457
2458 #[test]
2459 fn grouped_discriminator_two_pass() {
2460 let mut data = Vec::new();
2463 for i in 0..60 {
2464 let etype = if i % 2 == 0 { "push" } else { "create" };
2465 data.extend_from_slice(
2466 format!(
2467 r#"{{"id":{},"type":"{}","payload":{{"ref":"r{}","size":{}}}}}"#,
2468 i,
2469 etype,
2470 i,
2471 i * 10
2472 )
2473 .as_bytes(),
2474 );
2475 data.push(b'\n');
2476 }
2477 let result = preprocess(&data).expect("should produce transform");
2478 let restored = reverse(&result.data, &result.metadata);
2479 assert_eq!(restored, data, "discriminator two-pass roundtrip failed");
2480 }
2481
2482 #[test]
2485 fn test_nested_decomposition_basic() {
2486 let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2488{"id":2,"meta":{"x":30,"y":40}}
2489{"id":3,"meta":{"x":50,"y":60}}
2490"#;
2491 let result = preprocess(data).expect("should produce transform");
2492 assert_eq!(result.metadata[0], METADATA_VERSION_UNIFORM);
2493
2494 let cols: Vec<&[u8]> = result.data.split(|&b| b == COL_SEP).collect();
2496 assert_eq!(
2498 cols.len(),
2499 3,
2500 "should have 3 columns after flattening: got {}",
2501 cols.len()
2502 );
2503
2504 let meta_x_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
2506 assert_eq!(meta_x_vals, vec![b"10".as_slice(), b"30", b"50"]);
2507
2508 let meta_y_vals: Vec<&[u8]> = cols[2].split(|&b| b == VAL_SEP).collect();
2509 assert_eq!(meta_y_vals, vec![b"20".as_slice(), b"40", b"60"]);
2510 }
2511
2512 #[test]
2513 fn test_nested_roundtrip() {
2514 let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2516{"id":2,"meta":{"x":30,"y":40}}
2517{"id":3,"meta":{"x":50,"y":60}}
2518"#;
2519 let result = preprocess(data).expect("should produce transform");
2520 let restored = reverse(&result.data, &result.metadata);
2521 assert_eq!(
2522 String::from_utf8_lossy(&restored),
2523 String::from_utf8_lossy(data),
2524 );
2525 assert_eq!(restored, data.to_vec());
2526 }
2527
2528 #[test]
2529 fn test_nested_mixed_schemas() {
2530 let data = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
2532{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
2533{"ts":"c","meta":{"query":"pricing","results_count":25}}
2534{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
2535{"ts":"e","meta":{"query":"api docs","results_count":41}}
2536"#;
2537 let result = preprocess(data).expect("should produce transform");
2538 let restored = reverse(&result.data, &result.metadata);
2539 assert_eq!(
2540 String::from_utf8_lossy(&restored),
2541 String::from_utf8_lossy(data),
2542 );
2543 assert_eq!(restored, data.to_vec());
2544 }
2545
2546 #[test]
2547 fn test_nested_no_nested_objects() {
2548 let data = br#"{"a":1,"b":"x"}
2550{"a":2,"b":"y"}
2551{"a":3,"b":"z"}
2552"#;
2553 let result = preprocess(data).expect("should produce transform");
2554 let restored = reverse(&result.data, &result.metadata);
2555 assert_eq!(restored, data.to_vec());
2556
2557 let meta = &result.metadata;
2563 let last_byte = meta[meta.len() - 1];
2564 assert_eq!(last_byte, 0, "should have has_nested=0 for flat data");
2565 }
2566
2567 #[test]
2568 fn test_nested_real_corpus() {
2569 let data = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
2571{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
2572{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
2573{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
2574{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
2575"#;
2576 let result = preprocess(data).expect("should produce transform");
2577 let restored = reverse(&result.data, &result.metadata);
2578 assert_eq!(
2579 String::from_utf8_lossy(&restored),
2580 String::from_utf8_lossy(data),
2581 );
2582 assert_eq!(restored, data.to_vec());
2583 }
2584
2585 #[test]
2586 fn test_nested_roundtrip_with_null_values() {
2587 let data = br#"{"id":1,"meta":{"x":10}}
2589{"id":2,"meta":null}
2590{"id":3,"meta":{"x":30}}
2591{"id":4,"meta":null}
2592{"id":5,"meta":{"x":50}}
2593"#;
2594 let result = preprocess(data).expect("should produce transform");
2595 let restored = reverse(&result.data, &result.metadata);
2596 assert_eq!(restored, data.to_vec());
2597 }
2598
2599 #[test]
2600 fn test_nested_string_values_preserved_exact() {
2601 let data = br#"{"id":1,"meta":{"name":"Alice","score":100}}
2603{"id":2,"meta":{"name":"Bob","score":200}}
2604{"id":3,"meta":{"name":"Charlie","score":300}}
2605"#;
2606 let result = preprocess(data).expect("should produce transform");
2607 let restored = reverse(&result.data, &result.metadata);
2608 assert_eq!(restored, data.to_vec());
2609 }
2610
2611 #[test]
2612 fn test_parse_nested_object_kv() {
2613 let obj = br#"{"query":"benchmark","results_count":14}"#;
2614 let pairs = parse_nested_object_kv(obj).unwrap();
2615 assert_eq!(pairs.len(), 2);
2616 assert_eq!(pairs[0].0, b"query");
2617 assert_eq!(pairs[0].1, br#""benchmark""#.to_vec());
2618 assert_eq!(pairs[1].0, b"results_count");
2619 assert_eq!(pairs[1].1, b"14");
2620 }
2621
/// Round-trips 50 rows in which the nested `meta` object alternates between
/// two key sets: even rows carry `x` and `extra`, odd rows carry only `x`.
#[test]
fn test_nested_varying_subkeys_roundtrip() {
    let rows: Vec<String> = (0..50)
        .map(|n| match n % 2 {
            0 => format!("{{\"id\":{},\"meta\":{{\"x\":{},\"extra\":{}}}}}", n, n, n),
            _ => format!("{{\"id\":{},\"meta\":{{\"x\":{}}}}}", n, n),
        })
        .collect();

    // Newline-separated rows plus a trailing newline after the last one.
    let mut ndjson = rows.join("\n");
    ndjson.push('\n');
    let data = ndjson.as_bytes();

    let transformed = preprocess(data).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    // Compare as text so a failure prints readable JSON rather than byte values.
    let restored_text = std::str::from_utf8(&roundtripped).unwrap();
    let original_text = std::str::from_utf8(data).unwrap();
    assert_eq!(
        restored_text, original_text,
        "varying sub-keys roundtrip must be byte-exact"
    );
}
2646
/// Round-trips three rows whose nested `meta` object contains an explicit
/// `"y":null` entry in every row; the restored output must still contain it.
#[test]
fn test_nested_explicit_null_preserved() {
    // Trailing `\` continuations skip the following indentation, so the input
    // is exactly three newline-terminated rows.
    let data = b"{\"id\":1,\"meta\":{\"x\":1,\"y\":null}}\n\
                 {\"id\":2,\"meta\":{\"x\":2,\"y\":null}}\n\
                 {\"id\":3,\"meta\":{\"x\":3,\"y\":null}}\n";

    let transformed = preprocess(data).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    // Compare as text so a failure prints readable JSON rather than byte values.
    let restored_text = std::str::from_utf8(&roundtripped).unwrap();
    let original_text = std::str::from_utf8(data).unwrap();
    assert_eq!(
        restored_text, original_text,
        "explicit null values must be preserved"
    );
}
2662
/// Builds 30 rows of `{"id":N,"val":null}` and, if `preprocess` accepts the
/// input, requires `reverse` to reproduce it byte-for-byte.
#[test]
fn null_heavy_30_rows_roundtrip() {
    let data: Vec<u8> = (0..30)
        .flat_map(|row| format!("{{\"id\":{},\"val\":null}}\n", row).into_bytes())
        .collect();

    // A `None` result is tolerated here: only a produced transform is checked.
    let transformed = match preprocess(&data) {
        Some(t) => t,
        None => return,
    };

    let restored = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(
        restored,
        data,
        "null-heavy 30-row roundtrip failed.\nOriginal len={}, Restored len={}\nOrig first 200: {:?}\nRest first 200: {:?}",
        data.len(),
        restored.len(),
        String::from_utf8_lossy(&data[..data.len().min(200)]),
        String::from_utf8_lossy(&restored[..restored.len().min(200)])
    );
}
2684
/// Builds 60 six-column rows in which every column except `id` is `null`,
/// apart from `name`, which holds a quoted `user_N` string on every tenth row.
/// If `preprocess` accepts the input, the roundtrip must be byte-exact.
#[test]
fn null_heavy_60_rows_roundtrip() {
    let data: Vec<u8> = (0..60)
        .flat_map(|row| {
            let name = match row % 10 {
                0 => format!("\"user_{}\"", row),
                _ => "null".to_string(),
            };
            format!("{{\"id\":{},\"name\":{},\"email\":null,\"score\":null,\"active\":null,\"tags\":null}}\n", row, name)
                .into_bytes()
        })
        .collect();

    // A `None` result is tolerated here: only a produced transform is checked.
    let transformed = match preprocess(&data) {
        Some(t) => t,
        None => return,
    };

    let restored = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(restored, data, "null-heavy 60-row ndjson roundtrip failed");
}
2705
/// Round-trips 50 rows that pair a `type` column cycling through three values
/// with a long nested `payload` value that differs on every row.
#[test]
fn selective_columnar_roundtrip() {
    // Indexed by `row % 3`, giving the same cycle as push / pull_request / create.
    let event_types = ["push", "pull_request", "create"];

    let data: Vec<u8> = (0..50usize)
        .flat_map(|row| {
            let payload = format!(
                "{{\"commits\":[{{\"sha\":\"abc{:04}def\",\"message\":\"commit message number {} with extra text to make it long enough for selective columnar threshold of 128 bytes average value length\"}}]}}",
                row, row
            );
            format!(
                "{{\"id\":{},\"type\":\"{}\",\"payload\":{}}}\n",
                row,
                event_types[row % 3],
                payload
            )
            .into_bytes()
        })
        .collect();

    let transformed = preprocess(&data).expect("should preprocess");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(roundtripped, data, "selective columnar roundtrip failed");
}
2733
/// Feeds 50 rows with a constant `type` and a long `payload` that differs on
/// every row, then asserts that the first metadata byte is
/// `METADATA_VERSION_SELECTIVE` and that the roundtrip is byte-exact.
#[test]
fn selective_columnar_uses_version3() {
    let data: Vec<u8> = (0..50)
        .flat_map(|row| {
            let payload = format!(
                "{{\"data\":\"unique_payload_{:04}\",\"extra\":\"padding_text_to_make_this_value_long_enough_to_exceed_the_128_byte_threshold_for_selective_columnar_detection_{:04}\"}}",
                row,
                row * 7
            );
            format!("{{\"type\":\"event\",\"payload\":{}}}\n", payload).into_bytes()
        })
        .collect();

    let transformed = preprocess(&data).expect("should preprocess");

    // Byte 0 of the metadata carries the format version tag.
    assert_eq!(
        transformed.metadata[0], METADATA_VERSION_SELECTIVE,
        "expected selective columnar (version=3), got version={}",
        transformed.metadata[0]
    );

    let roundtripped = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(roundtripped, data, "selective columnar v3 roundtrip failed");
}
2759
/// Mixes two row shapes: 30 `push` rows with keys `id`/`type`/`payload`,
/// followed by 20 `pr` rows that add an `org` key. Asserts that the first
/// metadata byte is `METADATA_VERSION_GROUPED` and that the roundtrip is
/// byte-exact.
#[test]
fn selective_grouped_roundtrip() {
    // First shape: three keys, long per-row payload.
    let push_rows = (0..30).flat_map(|row| {
        let payload = format!(
            "{{\"sha\":\"hash_{:04}\",\"msg\":\"unique commit message number {} that is quite long and needs to exceed the 128 byte threshold for selective columnar to activate on this column\"}}",
            row, row
        );
        format!(
            "{{\"id\":{},\"type\":\"push\",\"payload\":{}}}\n",
            row, payload
        )
        .into_bytes()
    });

    // Second shape: four keys (extra `org`), ids offset by 100.
    let pr_rows = (0..20).flat_map(|row| {
        let body = format!(
            "{{\"title\":\"PR title {}\",\"body\":\"This is a long unique pull request body number {} with details and extra text to exceed the 128 byte threshold for the selective columnar transform\"}}",
            row, row
        );
        format!(
            "{{\"id\":{},\"type\":\"pr\",\"payload\":{},\"org\":\"myorg\"}}\n",
            100 + row,
            body
        )
        .into_bytes()
    });

    let data: Vec<u8> = push_rows.chain(pr_rows).collect();

    let transformed = preprocess(&data).expect("should preprocess");
    assert_eq!(transformed.metadata[0], METADATA_VERSION_GROUPED);

    let roundtripped = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(roundtripped, data, "selective grouped roundtrip failed");
}
2797
/// Feeds 50 short rows with a constant `type` and a two-valued `status`, and
/// asserts that the first metadata byte is `METADATA_VERSION_UNIFORM`.
#[test]
fn selective_all_low_cardinality_stays_uniform() {
    let data: Vec<u8> = (0..50)
        .flat_map(|row| {
            let status = if row % 2 == 0 { "active" } else { "inactive" };
            format!("{{\"type\":\"event\",\"status\":\"{}\"}}\n", status).into_bytes()
        })
        .collect();

    let transformed = preprocess(&data).expect("should preprocess");

    // Byte 0 of the metadata carries the format version tag.
    assert_eq!(
        transformed.metadata[0], METADATA_VERSION_UNIFORM,
        "low-cardinality data should use uniform (version=1)"
    );
}
2818
/// Round-trips 30 rows in which `type` and `level` are constant and `msg` is
/// a long string that differs on every row.
#[test]
fn selective_columnar_single_inline_column() {
    let data: Vec<u8> = (0..30)
        .flat_map(|row| {
            let unique_msg = format!(
                "A unique message for row {} with enough text to exceed the 128 byte threshold for selective columnar detection, adding padding here to be safe: extra_{:04}",
                row,
                row * 13
            );
            format!(
                "{{\"type\":\"log\",\"level\":\"info\",\"msg\":\"{}\"}}\n",
                unique_msg
            )
            .into_bytes()
        })
        .collect();

    let transformed = preprocess(&data).expect("should preprocess");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(roundtripped, data, "single-inline-column roundtrip failed");
}
2841}