// datacortex_core/format/ndjson.rs
1//! NDJSON columnar reorg — lossless transform that reorders row-oriented
2//! NDJSON data into column-oriented layout.
3//!
4//! Two strategies:
5//!
6//! **Strategy 1 (uniform):** All rows share the same schema (keys in same order).
7//!   Row-oriented (before):
8//!     {"ts":"2026-03-15T10:30:00.001Z","type":"page_view","user":"usr_a1b2c3d4"}
9//!     {"ts":"2026-03-15T10:30:00.234Z","type":"api_call","user":"usr_a1b2c3d4"}
10//!   Column-oriented (after):
11//!     [ts values] "2026-03-15T10:30:00.001Z" \x01 "2026-03-15T10:30:00.234Z" \x00
12//!     [type values] "page_view" \x01 "api_call" \x00
13//!     [user values] "usr_a1b2c3d4" \x01 "usr_a1b2c3d4"
14//!
15//! **Strategy 2 (grouped):** Rows have diverse schemas (e.g., GitHub Archive events).
16//!   Groups rows by schema, applies Strategy 1 per group, stores residual rows raw.
17//!   Metadata version byte distinguishes: 1 = uniform, 2 = grouped.
18//!
19//! Separators:
20//!   \x00 = column separator (cannot appear in valid JSON text)
21//!   \x01 = value separator within a column (cannot appear in valid JSON)
22
23use super::transform::TransformResult;
24use std::collections::HashMap;
25
/// Column separator in columnar output: 0x00 cannot appear in valid JSON text.
const COL_SEP: u8 = 0x00;
/// Value separator within a column: 0x01 also cannot appear in valid JSON text.
const VAL_SEP: u8 = 0x01;
/// Metadata version byte for Strategy 1 (all rows share one schema).
const METADATA_VERSION_UNIFORM: u8 = 1;
/// Metadata version byte for Strategy 2 (rows grouped by schema).
const METADATA_VERSION_GROUPED: u8 = 2;

/// Minimum rows in a schema group for it to be columnarized (not residual).
const MIN_GROUP_ROWS: usize = 5;

/// A schema group: template parts + list of (row_index, parsed_values).
type SchemaGroup = (Vec<Vec<u8>>, Vec<(usize, Vec<Vec<u8>>)>);
36
/// Extract the raw value bytes from a JSON line at a given position.
/// `pos` should point to the first byte of the value (after `:`), or to
/// whitespace preceding it (leading whitespace is skipped).
/// Returns (value_bytes, end_position).
///
/// Handles: strings, numbers, booleans, null, nested objects, nested arrays.
/// Nested objects/arrays are captured as opaque blobs (not parsed further).
/// Returns None on truncated/unterminated input or an empty token.
fn extract_value(line: &[u8], mut pos: usize) -> Option<(Vec<u8>, usize)> {
    // Skip whitespace after the colon.
    while pos < line.len() && line[pos].is_ascii_whitespace() {
        pos += 1;
    }
    if pos >= line.len() {
        return None;
    }

    let start = pos;
    match line[pos] {
        b'"' => {
            // String value — scan to the closing unescaped quote.
            let close = skip_string_interior(line, pos + 1);
            if close >= line.len() {
                return None; // Unterminated string.
            }
            let end = close + 1; // Include the closing quote.
            Some((line[start..end].to_vec(), end))
        }
        b'{' => {
            // Nested object — captured verbatim as an opaque blob.
            let end = scan_balanced(line, pos, b'{', b'}')?;
            Some((line[start..end].to_vec(), end))
        }
        b'[' => {
            // Nested array — captured verbatim as an opaque blob.
            let end = scan_balanced(line, pos, b'[', b']')?;
            Some((line[start..end].to_vec(), end))
        }
        _ => {
            // Number, boolean, null — scan until , or } or ] or whitespace.
            while pos < line.len() {
                match line[pos] {
                    b',' | b'}' | b']' => break,
                    _ if line[pos].is_ascii_whitespace() => break,
                    _ => pos += 1,
                }
            }
            if pos == start {
                None // Empty token (terminator immediately after the colon).
            } else {
                Some((line[start..pos].to_vec(), pos))
            }
        }
    }
}

/// Given `pos` just past an opening `"`, return the index of the closing
/// unescaped quote, or `line.len()` if the string is unterminated.
/// Backslash escapes are honored (`\"` does not terminate the string).
fn skip_string_interior(line: &[u8], mut pos: usize) -> usize {
    let mut escaped = false;
    while pos < line.len() {
        if escaped {
            escaped = false;
        } else if line[pos] == b'\\' {
            escaped = true;
        } else if line[pos] == b'"' {
            return pos;
        }
        pos += 1;
    }
    pos
}

/// Scan a balanced `open`..`close` region whose `open` byte sits at `start`.
/// String contents are skipped, so delimiter bytes inside strings are ignored.
/// Returns the position just past the matching `close`, or None if the region
/// is unterminated.
fn scan_balanced(line: &[u8], start: usize, open: u8, close: u8) -> Option<usize> {
    let mut depth = 1usize;
    let mut pos = start + 1;
    while pos < line.len() && depth > 0 {
        let b = line[pos];
        if b == b'"' {
            // Jump to the closing quote (or end of input if unterminated);
            // the `pos += 1` below then steps past it.
            pos = skip_string_interior(line, pos + 1);
        } else if b == open {
            depth += 1;
        } else if b == close {
            depth -= 1;
        }
        pos += 1;
    }
    if depth == 0 {
        Some(pos)
    } else {
        None // Unterminated object/array.
    }
}
150
/// Result of [`parse_line`]: `(template_parts, values)` for one NDJSON row.
///
/// Template parts are the structural bytes between values:
///   Part 0 = everything from { up to and including the : before value 0
///   Part 1 = everything from after value 0 up to and including : before value 1
///   ...
///   Part N = everything from after the last value to end of line (including } and \n)
///
/// A successful parse always yields exactly one more template part than values
/// (each key/value pair contributes one part, plus the closing tail part).
type ParsedLine = (Vec<Vec<u8>>, Vec<Vec<u8>>);
162
163fn parse_line(line: &[u8]) -> Option<ParsedLine> {
164    let mut pos = 0;
165
166    // Skip leading whitespace.
167    while pos < line.len() && line[pos].is_ascii_whitespace() {
168        pos += 1;
169    }
170    if pos >= line.len() || line[pos] != b'{' {
171        return None;
172    }
173
174    let mut parts: Vec<Vec<u8>> = Vec::new();
175    let mut values: Vec<Vec<u8>> = Vec::new();
176    let mut part_start = 0;
177
178    pos += 1; // Skip opening {.
179
180    loop {
181        // Skip whitespace.
182        while pos < line.len() && line[pos].is_ascii_whitespace() {
183            pos += 1;
184        }
185        if pos >= line.len() {
186            return None;
187        }
188
189        // Check for closing brace (end of object).
190        if line[pos] == b'}' {
191            // Capture the final part: everything from part_start to end of line.
192            parts.push(line[part_start..].to_vec());
193            break;
194        }
195
196        // Expect a key string.
197        if line[pos] != b'"' {
198            return None;
199        }
200        // Skip over the key string.
201        pos += 1;
202        let mut escaped = false;
203        while pos < line.len() {
204            if escaped {
205                escaped = false;
206            } else if line[pos] == b'\\' {
207                escaped = true;
208            } else if line[pos] == b'"' {
209                pos += 1;
210                break;
211            }
212            pos += 1;
213        }
214
215        // Skip whitespace, expect colon.
216        while pos < line.len() && line[pos].is_ascii_whitespace() {
217            pos += 1;
218        }
219        if pos >= line.len() || line[pos] != b':' {
220            return None;
221        }
222        pos += 1; // Skip colon.
223
224        // Everything from part_start up to here is a "template part".
225        parts.push(line[part_start..pos].to_vec());
226
227        // Extract the value.
228        let (value, value_end) = extract_value(line, pos)?;
229        values.push(value);
230        pos = value_end;
231
232        // Mark the start of the next part.
233        part_start = pos;
234
235        // Skip whitespace.
236        while pos < line.len() && line[pos].is_ascii_whitespace() {
237            pos += 1;
238        }
239        if pos >= line.len() {
240            return None;
241        }
242
243        // Expect comma or closing brace.
244        if line[pos] == b',' {
245            pos += 1;
246        } else if line[pos] == b'}' {
247            // Will be caught at the top of the loop next iteration — but we
248            // need to NOT advance pos, so the } check above catches it.
249            // Actually, let's just handle it here.
250            parts.push(line[part_start..].to_vec());
251            break;
252        } else {
253            return None; // Unexpected character.
254        }
255    }
256
257    if values.is_empty() {
258        return None;
259    }
260
261    Some((parts, values))
262}
263
/// Split NDJSON data into lines (newline bytes excluded).
///
/// A trailing newline does not produce a final empty line, and empty input
/// yields no lines at all; interior blank lines are preserved.
fn split_lines(data: &[u8]) -> Vec<&[u8]> {
    let mut lines: Vec<&[u8]> = data.split(|&b| b == b'\n').collect();
    // `slice::split` emits one extra empty slice after a trailing newline
    // (and a single empty slice for empty input); drop it so the result
    // matches a manual scan that only pushes completed segments.
    if lines.last().is_some_and(|l| l.is_empty()) {
        lines.pop();
    }
    lines
}
279
280/// Build columnar data from parsed lines that share the same template.
281/// Returns (col_data, metadata) for a uniform group.
282fn build_uniform_columnar(
283    template_parts: &[Vec<u8>],
284    columns: &[Vec<Vec<u8>>],
285    num_rows: usize,
286    has_trailing_newline: bool,
287) -> (Vec<u8>, Vec<u8>) {
288    let num_cols = columns.len();
289
290    // Build column data: values separated by \x01, columns separated by \x00.
291    let mut col_data = Vec::new();
292    for (ci, col) in columns.iter().enumerate() {
293        for (ri, val) in col.iter().enumerate() {
294            col_data.extend_from_slice(val);
295            if ri < num_rows - 1 {
296                col_data.push(VAL_SEP);
297            }
298        }
299        if ci < num_cols - 1 {
300            col_data.push(COL_SEP);
301        }
302    }
303
304    // Build metadata: version + num_rows + num_cols + trailing_newline + template parts.
305    let mut metadata = Vec::new();
306    metadata.push(METADATA_VERSION_UNIFORM);
307    metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
308    metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
309    metadata.push(if has_trailing_newline { 1 } else { 0 });
310    metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
311    for part in template_parts {
312        metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
313        metadata.extend_from_slice(part);
314    }
315
316    (col_data, metadata)
317}
318
319/// Strategy 1: Uniform schema — all rows must have the same template.
320/// Returns None if schemas differ.
321fn preprocess_uniform(
322    non_empty: &[&[u8]],
323    has_trailing_newline: bool,
324) -> Option<(Vec<u8>, Vec<u8>)> {
325    if non_empty.len() < 2 {
326        return None;
327    }
328
329    let (template_parts, first_values) = parse_line(non_empty[0])?;
330    let num_cols = first_values.len();
331    if template_parts.len() != num_cols + 1 {
332        return None;
333    }
334
335    let mut columns: Vec<Vec<Vec<u8>>> = Vec::with_capacity(num_cols);
336    for v in &first_values {
337        columns.push(vec![v.clone()]);
338    }
339
340    for &line in &non_empty[1..] {
341        let (parts, values) = parse_line(line)?;
342        if values.len() != num_cols || parts.len() != template_parts.len() {
343            return None;
344        }
345        for (a, b) in parts.iter().zip(template_parts.iter()) {
346            if a != b {
347                return None;
348            }
349        }
350        for (col, val) in values.iter().enumerate() {
351            columns[col].push(val.clone());
352        }
353    }
354
355    Some(build_uniform_columnar(
356        &template_parts,
357        &columns,
358        non_empty.len(),
359        has_trailing_newline,
360    ))
361}
362
363/// Strategy 2: Group-by-schema — group rows by template, columnarize each group.
364///
365/// Metadata format (version=2):
366///   version: u8 = 2
367///   has_trailing_newline: u8
368///   total_rows: u32 LE
369///   num_groups: u16 LE
370///   for each group:
371///     num_rows: u32 LE
372///     row_indices: [u32 LE * num_rows]
373///     group_metadata_len: u32 LE
374///     group_metadata: [bytes]  (Strategy 1 metadata for this group)
375///   residual_count: u32 LE
376///   residual_indices: [u32 LE * residual_count]
377///
378/// Data format:
379///   for each group:
380///     data_len: u32 LE
381///     data: [bytes]  (columnar data for this group)
382///   residual_data: [bytes]  (raw lines joined by \n)
383fn preprocess_grouped(
384    non_empty: &[&[u8]],
385    has_trailing_newline: bool,
386) -> Option<(Vec<u8>, Vec<u8>)> {
387    if non_empty.len() < MIN_GROUP_ROWS {
388        return None;
389    }
390
391    // Parse all lines and group by template (the template parts identify the schema).
392    // We use the template parts as the group key.
393    let mut parsed: Vec<Option<ParsedLine>> = Vec::with_capacity(non_empty.len());
394    for &line in non_empty {
395        parsed.push(parse_line(line));
396    }
397
398    // Group rows by template. Key = template parts (as bytes for hashing).
399    // We store groups as: template_key -> (template_parts, vec of (row_index, values)).
400    let mut group_map: HashMap<Vec<u8>, SchemaGroup> = HashMap::new();
401    let mut residual_indices: Vec<usize> = Vec::new();
402
403    for (idx, parsed_line) in parsed.into_iter().enumerate() {
404        if let Some((parts, values)) = parsed_line {
405            // Build a hashable key from the template parts.
406            let mut key = Vec::new();
407            for part in &parts {
408                key.extend_from_slice(&(part.len() as u32).to_le_bytes());
409                key.extend_from_slice(part);
410            }
411            group_map
412                .entry(key)
413                .or_insert_with(|| (parts, Vec::new()))
414                .1
415                .push((idx, values));
416        } else {
417            // Unparseable line goes to residual.
418            residual_indices.push(idx);
419        }
420    }
421
422    // Separate groups into qualifying (>= MIN_GROUP_ROWS) and residual.
423    let mut groups: Vec<SchemaGroup> = Vec::new();
424    for (_key, (template_parts, rows)) in group_map {
425        if rows.len() >= MIN_GROUP_ROWS {
426            groups.push((template_parts, rows));
427        } else {
428            // Too few rows — send to residual.
429            for (idx, _) in &rows {
430                residual_indices.push(*idx);
431            }
432        }
433    }
434
435    // Need at least 1 qualifying group for this to be useful.
436    if groups.is_empty() {
437        return None;
438    }
439
440    // Sort groups by their first row index for deterministic output.
441    groups.sort_by_key(|(_, rows)| rows[0].0);
442    residual_indices.sort_unstable();
443
444    // Build per-group columnar data and metadata.
445    struct GroupOutput {
446        row_indices: Vec<u32>,
447        col_data: Vec<u8>,
448        group_metadata: Vec<u8>,
449    }
450
451    let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
452
453    for (template_parts, rows) in &groups {
454        let num_cols = template_parts.len() - 1;
455        let mut columns: Vec<Vec<Vec<u8>>> = (0..num_cols).map(|_| Vec::new()).collect();
456        let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
457
458        for (idx, values) in rows {
459            row_indices.push(*idx as u32);
460            for (col, val) in values.iter().enumerate() {
461                columns[col].push(val.clone());
462            }
463        }
464
465        // Build columnar data for this group (trailing_newline=false for sub-groups).
466        let (col_data, group_metadata) =
467            build_uniform_columnar(template_parts, &columns, rows.len(), false);
468
469        group_outputs.push(GroupOutput {
470            row_indices,
471            col_data,
472            group_metadata,
473        });
474    }
475
476    // Build the combined data blob.
477    let mut data_out = Vec::new();
478    for group in &group_outputs {
479        data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
480        data_out.extend_from_slice(&group.col_data);
481    }
482
483    // Append residual lines (raw, separated by \n).
484    let residual_start = data_out.len();
485    for (i, &idx) in residual_indices.iter().enumerate() {
486        data_out.extend_from_slice(non_empty[idx]);
487        if i < residual_indices.len() - 1 {
488            data_out.push(b'\n');
489        }
490    }
491    let _residual_len = data_out.len() - residual_start;
492
493    // Build the combined metadata.
494    let mut metadata = Vec::new();
495    metadata.push(METADATA_VERSION_GROUPED);
496    metadata.push(if has_trailing_newline { 1 } else { 0 });
497    metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
498    metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
499
500    for group in &group_outputs {
501        metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
502        for &idx in &group.row_indices {
503            metadata.extend_from_slice(&idx.to_le_bytes());
504        }
505        metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
506        metadata.extend_from_slice(&group.group_metadata);
507    }
508
509    metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
510    for &idx in &residual_indices {
511        metadata.extend_from_slice(&(idx as u32).to_le_bytes());
512    }
513
514    Some((data_out, metadata))
515}
516
/// Metadata describing which columns were flattened from nested objects.
/// One entry per original column that was expanded into sub-columns.
pub(crate) struct NestedGroupInfo {
    /// Index of the original column that was expanded.
    pub(crate) original_col_index: u16,
    /// Sub-key names for the expanded sub-columns, without quotes,
    /// in the order they were first discovered across the rows.
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Template parts for reconstructing nested objects (preserves original formatting).
    /// If empty, compact format `{"key":val,...}` is used (NDJSON compatibility).
    pub(crate) nested_template: Vec<Vec<u8>>,
    /// Absence bitmap: one bit per (sub_key, row). Bit=1 means the key was ABSENT
    /// in the original object (not present at all), vs bit=0 means key was present
    /// (possibly with explicit `null`). Bit index = sub_key_index * num_rows + row_index,
    /// packed LSB-first.
    /// Length = ceil(num_sub_keys * num_rows / 8).
    /// Empty if all rows had all keys (no absences).
    pub(crate) absence_bitmap: Vec<u8>,
}
533
/// Attempt to flatten nested JSON objects in columnar data (depth-1).
///
/// Takes columnar data (\x00/\x01 separated) and returns expanded columnar data
/// with nested objects decomposed into sub-columns.
/// Returns None if no nested objects were found, if `num_rows` is 0, or if any
/// column does not contain exactly `num_rows` values.
pub(crate) fn flatten_nested_columns(
    col_data: &[u8],
    num_rows: usize,
) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
    // Split into columns. (COL_SEP cannot occur inside valid JSON text, so
    // this cannot cut a value in half.)
    let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
    if columns.is_empty() || num_rows == 0 {
        return None;
    }

    let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
    // Build the output columns: for non-nested cols, keep as-is.
    // For nested cols, replace with sub-columns.
    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();

    for (col_idx, &col_chunk) in columns.iter().enumerate() {
        let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
        if values.len() != num_rows {
            return None;
        }

        // Check if ALL non-null values start with '{' (nested object).
        let mut all_objects = true;
        let mut has_non_null = false;
        for val in &values {
            if *val == b"null" {
                continue;
            }
            has_non_null = true;
            if !val.starts_with(b"{") {
                all_objects = false;
                break;
            }
        }

        if !all_objects || !has_non_null {
            // Not a nested-object column — keep as-is.
            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
            output_columns.push(col_values);
            continue;
        }

        // This column contains nested objects — decompose depth-1.
        // Parse all values and collect all unique sub-keys (preserving discovery order).
        // Also capture the template from the first non-null object to preserve formatting.
        let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
        let mut nested_template: Vec<Vec<u8>> = Vec::new();
        type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
        let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);

        for val in &values {
            if *val == b"null" {
                parsed_rows.push(None);
                continue;
            }
            if nested_template.is_empty() {
                // First non-null: use template-preserving parser.
                match parse_nested_object_with_template(val) {
                    Some((template, kv_pairs)) => {
                        for (key, _) in &kv_pairs {
                            if !all_sub_keys.iter().any(|k| k == key) {
                                all_sub_keys.push(key.clone());
                            }
                        }
                        nested_template = template;
                        parsed_rows.push(Some(kv_pairs));
                    }
                    None => {
                        // Unparseable — clearing the keys abandons decomposition
                        // for this column (handled by the is_empty check below).
                        all_sub_keys.clear();
                        break;
                    }
                }
            } else {
                // Subsequent rows: use simpler kv parser (template already captured).
                match parse_nested_object_kv(val) {
                    Some(kv_pairs) => {
                        for (key, _) in &kv_pairs {
                            if !all_sub_keys.iter().any(|k| k == key) {
                                all_sub_keys.push(key.clone());
                            }
                        }
                        parsed_rows.push(Some(kv_pairs));
                    }
                    None => {
                        all_sub_keys.clear();
                        break;
                    }
                }
            }
        }

        if all_sub_keys.is_empty() {
            // Could not parse — keep column as-is.
            // (parsed_rows may be partially filled here; it is simply dropped.)
            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
            output_columns.push(col_values);
            continue;
        }

        // Build sub-columns: for each sub-key, extract values from parsed rows.
        // Also build an absence bitmap: bit=1 where a key was absent from the
        // original row (as opposed to being present with explicit `null`).
        let num_sub_keys = all_sub_keys.len();
        let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
        let total_bits = num_sub_keys * num_rows;
        let bitmap_bytes = total_bits.div_ceil(8);
        let mut absence_bitmap = vec![0u8; bitmap_bytes];
        let mut has_any_absent = false;

        for (row_idx, parsed) in parsed_rows.iter().enumerate() {
            match parsed {
                Some(kv_pairs) => {
                    for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
                        let found = kv_pairs.iter().find(|(k, _)| k == sk);
                        match found {
                            Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
                            None => {
                                // Key missing from this row: substitute "null"
                                // and mark this (sub_key, row) as absent.
                                sub_columns[sk_idx].push(b"null".to_vec());
                                let bit_idx = sk_idx * num_rows + row_idx;
                                absence_bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
                                has_any_absent = true;
                            }
                        }
                    }
                }
                None => {
                    // null row — all sub-columns get null.
                    // These are NOT marked as absent (the whole column was null,
                    // not individual keys missing).
                    for sc in sub_columns.iter_mut() {
                        sc.push(b"null".to_vec());
                    }
                }
            }
        }

        nested_groups.push(NestedGroupInfo {
            original_col_index: col_idx as u16,
            sub_keys: all_sub_keys,
            nested_template,
            // An empty bitmap signals "no absences anywhere" compactly.
            absence_bitmap: if has_any_absent {
                absence_bitmap
            } else {
                Vec::new()
            },
        });

        for sc in sub_columns {
            output_columns.push(sc);
        }
    }

    if nested_groups.is_empty() {
        return None;
    }

    // Build the flattened columnar data (same \x00/\x01 layout as the input).
    let num_out_cols = output_columns.len();
    let mut out = Vec::new();
    for (ci, col) in output_columns.iter().enumerate() {
        for (ri, val) in col.iter().enumerate() {
            out.extend_from_slice(val);
            if ri < num_rows - 1 {
                out.push(VAL_SEP);
            }
        }
        if ci < num_out_cols - 1 {
            out.push(COL_SEP);
        }
    }

    Some((out, nested_groups))
}
712
/// Parse a nested JSON object into (template_parts, kv_pairs).
/// Template parts include all structural bytes (braces, keys, colons, whitespace) —
/// preserving the original formatting so the object can be reconstructed exactly.
/// Keys are returned WITHOUT quotes. Values are the exact bytes from the JSON.
///
/// Unlike the top-level line parser, whitespace between ':' and the value is
/// folded into the preceding template part, so values carry no leading
/// whitespace. Returns None on malformed input or an empty object.
#[allow(clippy::type_complexity)]
pub(crate) fn parse_nested_object_with_template(
    obj: &[u8],
) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
    let mut pos = 0;

    // Skip whitespace.
    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
        pos += 1;
    }
    if pos >= obj.len() || obj[pos] != b'{' {
        return None;
    }
    pos += 1;

    let mut parts: Vec<Vec<u8>> = Vec::new();
    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
    // Part 0 starts at byte 0 so leading whitespace and '{' are preserved.
    let mut part_start = 0;

    loop {
        // Skip whitespace.
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() {
            return None;
        }
        if obj[pos] == b'}' {
            // Final template part: '}' plus any bytes after it.
            parts.push(obj[part_start..].to_vec());
            break;
        }

        // Expect a key string.
        if obj[pos] != b'"' {
            return None;
        }
        let key_str_start = pos + 1;
        pos += 1;
        let mut escaped = false;
        while pos < obj.len() {
            if escaped {
                escaped = false;
            } else if obj[pos] == b'\\' {
                escaped = true;
            } else if obj[pos] == b'"' {
                break;
            }
            pos += 1;
        }
        if pos >= obj.len() {
            return None; // Unterminated key string.
        }
        let key = obj[key_str_start..pos].to_vec(); // Key without its quotes.
        pos += 1; // skip closing quote

        // Skip whitespace, expect colon.
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() || obj[pos] != b':' {
            return None;
        }
        pos += 1;

        // Skip whitespace between colon and value — include in template.
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }

        // Template part: everything from part_start to here (includes key, colon, post-colon ws).
        parts.push(obj[part_start..pos].to_vec());

        // Extract the value (no whitespace skipping — already consumed above).
        let value_start = pos;
        // Use extract_value but we've already consumed whitespace.
        let (value, value_end) = extract_value(obj, value_start)?;
        pos = value_end;
        pairs.push((key, value));

        part_start = pos;

        // Skip whitespace.
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() {
            return None;
        }
        if obj[pos] == b',' {
            pos += 1;
        } else if obj[pos] == b'}' {
            parts.push(obj[part_start..].to_vec());
            break;
        } else {
            return None;
        }
    }

    if pairs.is_empty() {
        return None;
    }
    Some((parts, pairs))
}
820
821/// Parse a nested JSON object into its key-value pairs (depth-1 only).
822/// Returns the exact bytes for each key and value.
823/// Keys are returned WITHOUT quotes. Values are the exact bytes from the JSON.
824pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
825    let mut pos = 0;
826
827    // Skip whitespace.
828    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
829        pos += 1;
830    }
831    if pos >= obj.len() || obj[pos] != b'{' {
832        return None;
833    }
834    pos += 1;
835
836    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
837
838    loop {
839        // Skip whitespace.
840        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
841            pos += 1;
842        }
843        if pos >= obj.len() {
844            return None;
845        }
846        if obj[pos] == b'}' {
847            break;
848        }
849
850        // Expect a key string.
851        if obj[pos] != b'"' {
852            return None;
853        }
854        pos += 1;
855        let key_start = pos;
856        let mut escaped = false;
857        while pos < obj.len() {
858            if escaped {
859                escaped = false;
860            } else if obj[pos] == b'\\' {
861                escaped = true;
862            } else if obj[pos] == b'"' {
863                break;
864            }
865            pos += 1;
866        }
867        if pos >= obj.len() {
868            return None;
869        }
870        let key = obj[key_start..pos].to_vec();
871        pos += 1; // skip closing quote
872
873        // Skip whitespace, expect colon.
874        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
875            pos += 1;
876        }
877        if pos >= obj.len() || obj[pos] != b':' {
878            return None;
879        }
880        pos += 1;
881
882        // Extract the value.
883        let (value, value_end) = extract_value(obj, pos)?;
884        pos = value_end;
885        pairs.push((key, value));
886
887        // Skip whitespace.
888        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
889            pos += 1;
890        }
891        if pos >= obj.len() {
892            return None;
893        }
894        if obj[pos] == b',' {
895            pos += 1;
896        } else if obj[pos] == b'}' {
897            break;
898        } else {
899            return None;
900        }
901    }
902
903    if pairs.is_empty() {
904        return None;
905    }
906    Some(pairs)
907}
908
/// Unflatten nested sub-columns back into JSON objects.
///
/// Takes flattened columnar data and nested group info, merges sub-columns
/// back into the original nested object columns.
///
/// Layout (as consumed below):
/// - `flat_data`: `total_flat_cols` columns separated by COL_SEP, each column
///   holding `num_rows` values separated by VAL_SEP.
/// - `NestedGroupInfo.absence_bitmap`: bit index `si * num_rows + row`
///   (sub-key-major), LSB-first within each byte; an empty bitmap means
///   "no absent keys anywhere in this group".
///
/// On any shape mismatch, `flat_data` is returned unchanged. The forward
/// path verifies the roundtrip byte-for-byte before committing to the
/// flattened form, so this silent fallback is safe there.
pub(crate) fn unflatten_nested_columns(
    flat_data: &[u8],
    nested_groups: &[NestedGroupInfo],
    num_rows: usize,
    total_flat_cols: usize,
) -> Vec<u8> {
    let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
    if flat_columns.len() != total_flat_cols {
        return flat_data.to_vec();
    }

    // Parse all flat column values.
    let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
    for chunk in &flat_columns {
        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
        if vals.len() != num_rows {
            return flat_data.to_vec();
        }
        flat_col_values.push(vals);
    }

    // Reconstruct original columns from flat columns.
    // Walk through flat columns, merging sub-columns back where needed.
    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();

    // Build a set of original_col_index -> group for quick lookup.
    // We need to know which flat columns map to which nested group.
    // The flat columns are in order: non-nested cols keep their position,
    // nested cols are replaced by their sub-columns at that position.
    //
    // To figure out which flat_idx corresponds to what, we replay the
    // forward mapping.
    // We need to know the ORIGINAL number of columns: each nested column
    // was expanded into `sub_keys.len()` flat columns, so invert that.
    let original_num_cols = total_flat_cols
        - nested_groups
            .iter()
            .map(|g| g.sub_keys.len())
            .sum::<usize>()
        + nested_groups.len();

    // Build mapping: for each original col, is it nested or not?
    let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
    for (gi, group) in nested_groups.iter().enumerate() {
        if (group.original_col_index as usize) < original_num_cols {
            original_col_map[group.original_col_index as usize] = Some(gi);
        }
    }

    let mut flat_idx = 0;
    for entry in original_col_map.iter().take(original_num_cols) {
        if let Some(gi) = entry {
            let group = &nested_groups[*gi];
            let num_sub = group.sub_keys.len();

            // Helper: check if sub-key `si` at `row` is absent using bitmap.
            let is_absent = |si: usize, row: usize| -> bool {
                if group.absence_bitmap.is_empty() {
                    return false; // no absences in this group
                }
                let bit_idx = si * num_rows + row;
                let byte_idx = bit_idx / 8;
                if byte_idx >= group.absence_bitmap.len() {
                    return false;
                }
                (group.absence_bitmap[byte_idx] >> (bit_idx % 8)) & 1 == 1
            };

            // Merge sub-columns back into nested objects.
            let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
            for row in 0..num_rows {
                // Check if all sub-columns are null or absent for this row
                // (meaning the whole nested column was null).
                // Note: `all_null` also requires every sub-index to be in
                // range, so the unguarded indexing inside the branch below
                // is safe.
                let all_null = (0..num_sub).all(|si| {
                    flat_idx + si < flat_col_values.len()
                        && flat_col_values[flat_idx + si][row] == b"null"
                });
                if all_null && !group.absence_bitmap.is_empty() {
                    // If all values are null but some are "absent" and some are
                    // "explicit null", we need to reconstruct, not collapse.
                    let any_present_null = (0..num_sub).any(|si| {
                        flat_col_values[flat_idx + si][row] == b"null" && !is_absent(si, row)
                    });
                    if any_present_null {
                        // At least one key has an explicit null — don't collapse.
                        // Fall through to reconstruction below.
                    } else {
                        // All nulls are from absent keys — whole column was null.
                        merged_col.push(b"null".to_vec());
                        continue;
                    }
                } else if all_null {
                    // No absence bitmap at all: a fully-null row means the
                    // original nested value was the literal `null`.
                    merged_col.push(b"null".to_vec());
                    continue;
                }

                // Check whether any sub-key is absent in this row.
                let has_absent = (0..num_sub).any(|si| is_absent(si, row));

                if !has_absent
                    && !group.nested_template.is_empty()
                    && group.nested_template.len() == num_sub + 1
                {
                    // Template-based reconstruction: all keys present,
                    // preserves original formatting exactly.
                    // Interleaves template[0] v0 template[1] v1 … template[num_sub].
                    let mut obj = Vec::new();
                    obj.extend_from_slice(&group.nested_template[0]);
                    if flat_idx < flat_col_values.len() {
                        obj.extend_from_slice(flat_col_values[flat_idx][row]);
                    }
                    for si in 1..num_sub {
                        obj.extend_from_slice(&group.nested_template[si]);
                        if flat_idx + si < flat_col_values.len() {
                            obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
                        }
                    }
                    obj.extend_from_slice(&group.nested_template[num_sub]);
                    merged_col.push(obj);
                } else {
                    // Compact reconstruction: some keys absent, or no template.
                    // Skip sub-keys that were absent in the original.
                    // NOTE(review): this emits keys in sub_keys order with no
                    // inter-token whitespace — byte-exact only when the source
                    // object used the same order/formatting; the forward path's
                    // roundtrip check is what enforces that.
                    let mut obj = Vec::new();
                    obj.push(b'{');
                    let mut first = true;
                    for si in 0..num_sub {
                        if flat_idx + si >= flat_col_values.len() {
                            break;
                        }
                        if is_absent(si, row) {
                            continue; // key was absent — omit entirely
                        }
                        let val = flat_col_values[flat_idx + si][row];
                        if !first {
                            obj.push(b',');
                        }
                        first = false;
                        obj.push(b'"');
                        obj.extend_from_slice(&group.sub_keys[si]);
                        obj.push(b'"');
                        obj.push(b':');
                        obj.extend_from_slice(val);
                    }
                    obj.push(b'}');
                    merged_col.push(obj);
                }
            }
            output_columns.push(merged_col);
            flat_idx += num_sub;
        } else {
            // Non-nested column — copy as-is.
            if flat_idx < flat_col_values.len() {
                let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
                    .iter()
                    .map(|v| v.to_vec())
                    .collect();
                output_columns.push(col);
            }
            flat_idx += 1;
        }
    }

    // Rebuild columnar data with the module's standard separators.
    let num_out_cols = output_columns.len();
    let mut out = Vec::new();
    for (ci, col) in output_columns.iter().enumerate() {
        for (ri, val) in col.iter().enumerate() {
            out.extend_from_slice(val);
            if ri < num_rows - 1 {
                out.push(VAL_SEP);
            }
        }
        if ci < num_out_cols - 1 {
            out.push(COL_SEP);
        }
    }

    out
}
1090
1091/// Serialize nested group info into bytes for storage in metadata.
1092/// Version 1 (has_nested=1): sub_keys only (backward compat, NDJSON path).
1093/// Version 2 (has_nested=2): sub_keys + nested_template (preserves formatting).
1094/// Version 3 (has_nested=3): sub_keys + nested_template + absence_bitmap.
1095pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1096    let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1097    let has_absence = groups.iter().any(|g| !g.absence_bitmap.is_empty());
1098    let mut out = Vec::new();
1099    let version = if has_absence {
1100        3u8
1101    } else if has_template {
1102        2u8
1103    } else {
1104        1u8
1105    };
1106    out.push(version);
1107    out.push(groups.len() as u8);
1108    for group in groups {
1109        out.extend_from_slice(&group.original_col_index.to_le_bytes());
1110        out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1111        for key in &group.sub_keys {
1112            out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1113            out.extend_from_slice(key);
1114        }
1115        if has_template || version == 3 {
1116            out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1117            for part in &group.nested_template {
1118                out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1119                out.extend_from_slice(part);
1120            }
1121        }
1122        if version == 3 {
1123            let bm_len = group.absence_bitmap.len() as u32;
1124            out.extend_from_slice(&bm_len.to_le_bytes());
1125            out.extend_from_slice(&group.absence_bitmap);
1126        }
1127    }
1128    out
1129}
1130
1131/// Deserialize nested group info from metadata bytes.
1132/// Returns (nested_groups, bytes_consumed).
1133/// Handles version 1 (no template), version 2 (with template),
1134/// and version 3 (template + absence bitmap).
1135pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1136    if data.is_empty() {
1137        return None;
1138    }
1139    let mut pos = 0;
1140    let version = data[pos];
1141    pos += 1;
1142    if version != 1 && version != 2 && version != 3 {
1143        return None;
1144    }
1145    let has_template = version == 2 || version == 3;
1146    let has_absence = version == 3;
1147    if pos >= data.len() {
1148        return None;
1149    }
1150    let num_groups = data[pos] as usize;
1151    pos += 1;
1152
1153    let mut groups = Vec::with_capacity(num_groups);
1154    for _ in 0..num_groups {
1155        if pos + 4 > data.len() {
1156            return None;
1157        }
1158        let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1159        pos += 2;
1160        let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1161        pos += 2;
1162
1163        let mut sub_keys = Vec::with_capacity(num_sub_cols);
1164        for _ in 0..num_sub_cols {
1165            if pos + 2 > data.len() {
1166                return None;
1167            }
1168            let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1169            pos += 2;
1170            if pos + key_len > data.len() {
1171                return None;
1172            }
1173            sub_keys.push(data[pos..pos + key_len].to_vec());
1174            pos += key_len;
1175        }
1176
1177        let nested_template = if has_template {
1178            if pos + 2 > data.len() {
1179                return None;
1180            }
1181            let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1182            pos += 2;
1183            let mut parts = Vec::with_capacity(num_parts);
1184            for _ in 0..num_parts {
1185                if pos + 2 > data.len() {
1186                    return None;
1187                }
1188                let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1189                pos += 2;
1190                if pos + part_len > data.len() {
1191                    return None;
1192                }
1193                parts.push(data[pos..pos + part_len].to_vec());
1194                pos += part_len;
1195            }
1196            parts
1197        } else {
1198            Vec::new()
1199        };
1200
1201        let absence_bitmap = if has_absence {
1202            if pos + 4 > data.len() {
1203                return None;
1204            }
1205            let bm_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1206            pos += 4;
1207            if pos + bm_len > data.len() {
1208                return None;
1209            }
1210            let bm = data[pos..pos + bm_len].to_vec();
1211            pos += bm_len;
1212            bm
1213        } else {
1214            Vec::new()
1215        };
1216
1217        groups.push(NestedGroupInfo {
1218            original_col_index,
1219            sub_keys,
1220            nested_template,
1221            absence_bitmap,
1222        });
1223    }
1224
1225    Some((groups, pos))
1226}
1227
/// Forward transform: NDJSON columnar reorg.
///
/// Tries Strategy 1 (uniform) first, then Strategy 2 (grouped) if schemas differ.
/// Returns None if data is not suitable for columnar transform — including
/// when the transformed output (data + metadata) would not be smaller than
/// the input.
pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
    if data.is_empty() {
        return None;
    }

    // Remember whether the input ended with '\n' so reverse() can restore it.
    let has_trailing_newline = data.last() == Some(&b'\n');
    let lines = split_lines(data);
    // NOTE(review): dropping empty lines assumes blank lines occur only at
    // EOF — an interior blank line would be lost on reconstruction.
    // Presumably split_lines / the size guard below prevents that; confirm.
    let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();

    // A single row gains nothing from columnarization.
    if non_empty.len() < 2 {
        return None;
    }

    // Strategy 1: try uniform schema first.
    if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
        // Only worthwhile when output (data + metadata) is strictly smaller.
        if col_data.len() + metadata.len() < data.len() {
            // Try depth-1 nested decomposition on the columnar output.
            // Even if the flattened data is slightly larger raw, the downstream
            // typed encoding + compression benefits are significant: null bitmaps
            // are compact and type-homogeneous columns compress much better.
            let num_rows = non_empty.len();
            if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
                // Verify roundtrip: unflatten must produce the exact original columnar
                // data. Nested objects with varying sub-key sets or key ordering can
                // cause the compact reconstruction to reorder keys, breaking byte-exact
                // roundtrip. Only apply if the unflatten is provably lossless.
                let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
                let unflattened = unflatten_nested_columns(
                    &flat_data,
                    &nested_groups,
                    num_rows,
                    total_flat_cols,
                );
                if unflattened == col_data {
                    // Append nested info to metadata; its leading version byte
                    // (1/2/3) doubles as the has_nested flag read by reverse().
                    let nested_bytes = serialize_nested_info(&nested_groups);
                    metadata.extend_from_slice(&nested_bytes);
                    return Some(TransformResult {
                        data: flat_data,
                        metadata,
                    });
                }
                // else: roundtrip not exact — skip nested flatten.
            }
            // Nested flatten not applied (no nested columns found, or the
            // roundtrip check failed) — record has_nested = 0.
            metadata.push(0u8); // has_nested = 0
            return Some(TransformResult {
                data: col_data,
                metadata,
            });
        }
    }

    // Strategy 2: group by schema (for heterogeneous row shapes).
    if let Some((grouped_data, grouped_metadata)) =
        preprocess_grouped(&non_empty, has_trailing_newline)
    {
        // Same size guard as Strategy 1.
        if grouped_data.len() + grouped_metadata.len() < data.len() {
            return Some(TransformResult {
                data: grouped_data,
                metadata: grouped_metadata,
            });
        }
    }

    None
}
1299
1300/// Reverse transform: reconstruct NDJSON from columnar layout + metadata.
1301/// Dispatches to the appropriate decoder based on metadata version byte.
1302pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1303    if metadata.is_empty() {
1304        return data.to_vec();
1305    }
1306    match metadata[0] {
1307        METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1308        METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1309        _ => data.to_vec(),
1310    }
1311}
1312
/// Reverse Strategy 1: uniform schema.
///
/// Metadata layout: version(1) | num_rows(u32 LE) | num_cols(u16 LE) |
/// trailing-newline flag(1) | num_parts(u16 LE) | parts (each: u16 LE
/// length + bytes) | trailing nested-info blob (leading byte 0 = none,
/// 1/2/3 = a `deserialize_nested_info` version).
/// Any malformed metadata falls back to returning `data` unchanged.
fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
    // 10 bytes is the minimum fixed header size (1+4+2+1+2).
    if metadata.len() < 10 {
        return data.to_vec();
    }
    let mut pos = 0;
    let _version = metadata[pos];
    pos += 1;
    let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
    pos += 4;
    let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;
    let has_trailing_newline = metadata[pos] != 0;
    pos += 1;
    let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;

    // Read the length-prefixed template parts, bounds-checking each read.
    let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
    for _ in 0..num_parts {
        if pos + 2 > metadata.len() {
            return data.to_vec();
        }
        let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
        pos += 2;
        if pos + part_len > metadata.len() {
            return data.to_vec();
        }
        parts.push(metadata[pos..pos + part_len].to_vec());
        pos += part_len;
    }

    // Sanity: a uniform template is one part per column plus a suffix.
    if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
        return data.to_vec();
    }

    // Check for nested metadata after template parts.
    let remaining_metadata = &metadata[pos..];
    if !remaining_metadata.is_empty()
        && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2 || remaining_metadata[0] == 3)
    {
        // has_nested == 1, 2, or 3: unflatten before reconstructing rows.
        if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
            // Calculate total number of flat columns from the data itself.
            let total_flat_cols = data.split(|&b| b == COL_SEP).count();
            let unflattened =
                unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
            return reverse_uniform_from_parts(
                &unflattened,
                &parts,
                num_rows,
                num_cols,
                has_trailing_newline,
            );
        }
    }

    // No nested info (or it failed to parse) — decode the flat layout directly.
    reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
}
1371
1372/// Core uniform reverse: given parsed parts, reconstruct lines from columnar data.
1373fn reverse_uniform_from_parts(
1374    data: &[u8],
1375    parts: &[Vec<u8>],
1376    num_rows: usize,
1377    num_cols: usize,
1378    has_trailing_newline: bool,
1379) -> Vec<u8> {
1380    let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
1381    if col_chunks.len() != num_cols {
1382        return data.to_vec();
1383    }
1384
1385    let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1386    for chunk in &col_chunks {
1387        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1388        if vals.len() != num_rows {
1389            return data.to_vec();
1390        }
1391        columns.push(vals);
1392    }
1393
1394    let mut output = Vec::with_capacity(data.len() * 2);
1395    #[allow(clippy::needless_range_loop)]
1396    for row in 0..num_rows {
1397        output.extend_from_slice(&parts[0]);
1398        output.extend_from_slice(columns[0][row]);
1399        for col in 1..num_cols {
1400            output.extend_from_slice(&parts[col]);
1401            output.extend_from_slice(columns[col][row]);
1402        }
1403        output.extend_from_slice(&parts[num_cols]);
1404
1405        if row < num_rows - 1 || has_trailing_newline {
1406            output.push(b'\n');
1407        }
1408    }
1409
1410    output
1411}
1412
/// Decode Strategy-1 metadata into `(parts, num_rows, num_cols,
/// has_trailing_newline)`.
///
/// Shared by `reverse_grouped`, which stores one uniform-metadata blob per
/// schema group. Returns `None` for truncated blobs or inconsistent shapes.
fn parse_uniform_metadata(metadata: &[u8]) -> Option<(Vec<Vec<u8>>, usize, usize, bool)> {
    // Fixed header: version(1) rows(u32) cols(u16) trailing-flag(1) part-count(u16).
    if metadata.len() < 10 {
        return None;
    }
    let num_rows = u32::from_le_bytes(metadata[1..5].try_into().unwrap()) as usize;
    let num_cols = u16::from_le_bytes(metadata[5..7].try_into().unwrap()) as usize;
    let has_trailing_newline = metadata[7] != 0;
    let num_parts = u16::from_le_bytes(metadata[8..10].try_into().unwrap()) as usize;

    // Length-prefixed template parts follow the header.
    let mut pos = 10;
    let mut parts = Vec::with_capacity(num_parts);
    for _ in 0..num_parts {
        let len_bytes = metadata.get(pos..pos + 2)?;
        let part_len = u16::from_le_bytes(len_bytes.try_into().unwrap()) as usize;
        pos += 2;
        parts.push(metadata.get(pos..pos + part_len)?.to_vec());
        pos += part_len;
    }

    // A valid uniform template is one part per column plus a row suffix.
    if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
        return None;
    }

    Some((parts, num_rows, num_cols, has_trailing_newline))
}
1449
/// Reverse Strategy 2: grouped schema.
///
/// Metadata layout: version(1) | trailing-newline flag(1) | total_rows
/// (u32 LE) | num_groups (u16 LE) | per group: row count (u32 LE), that
/// many original row indices (u32 LE each), group-metadata length
/// (u32 LE) + a Strategy-1 metadata blob | residual count (u32 LE) +
/// residual row indices (u32 LE each).
///
/// The data blob holds, in the same group order, a u32 LE length followed
/// by that group's columnar bytes; everything after the last group is the
/// residual lines. Any inconsistency falls back to returning `data` as-is.
fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
    // 8 bytes is the minimum fixed header size (1+1+4+2).
    if metadata.len() < 8 {
        return data.to_vec();
    }

    let mut mpos = 1; // Skip version byte.
    let has_trailing_newline = metadata[mpos] != 0;
    mpos += 1;
    let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
    mpos += 4;
    let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
    mpos += 2;

    // Allocate output slots; each row is filled by exactly one group or
    // the residual set, restoring the original interleaving.
    let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];

    // Data cursor (walks the group-data blob independently of `mpos`).
    let mut dpos: usize = 0;

    for _ in 0..num_groups {
        // Read row indices for this group.
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let group_row_count =
            u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;

        let mut row_indices = Vec::with_capacity(group_row_count);
        for _ in 0..group_row_count {
            if mpos + 4 > metadata.len() {
                return data.to_vec();
            }
            let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
            mpos += 4;
            row_indices.push(idx);
        }

        // Read group metadata (a length-prefixed Strategy-1 blob).
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;
        if mpos + gm_len > metadata.len() {
            return data.to_vec();
        }
        let group_metadata = &metadata[mpos..mpos + gm_len];
        mpos += gm_len;

        // Read group data from the data blob (also length-prefixed).
        if dpos + 4 > data.len() {
            return data.to_vec();
        }
        let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
        dpos += 4;
        if dpos + gd_len > data.len() {
            return data.to_vec();
        }
        let group_data = &data[dpos..dpos + gd_len];
        dpos += gd_len;

        // Decode this group using Strategy 1 reverse.
        let (parts, num_rows, num_cols, _trailing) = match parse_uniform_metadata(group_metadata) {
            Some(v) => v,
            None => return data.to_vec(),
        };

        // The embedded row count must agree with the index list we read.
        if num_rows != group_row_count {
            return data.to_vec();
        }

        // Split columnar data into columns and values.
        let col_chunks: Vec<&[u8]> = group_data.split(|&b| b == COL_SEP).collect();
        if col_chunks.len() != num_cols {
            return data.to_vec();
        }

        let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
        for chunk in &col_chunks {
            let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
            if vals.len() != num_rows {
                return data.to_vec();
            }
            columns.push(vals);
        }

        // Reconstruct each line for this group, placing it at its original
        // row position (parts/values interleaving as in Strategy 1).
        for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
            let mut line = Vec::new();
            line.extend_from_slice(&parts[0]);
            line.extend_from_slice(columns[0][row_within_group]);
            for col in 1..num_cols {
                line.extend_from_slice(&parts[col]);
                line.extend_from_slice(columns[col][row_within_group]);
            }
            line.extend_from_slice(&parts[num_cols]);

            if original_idx < total_rows {
                output_lines[original_idx] = Some(line);
            }
        }
    }

    // Read residual indices from metadata.
    if mpos + 4 > metadata.len() {
        return data.to_vec();
    }
    let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
    mpos += 4;

    let mut residual_indices = Vec::with_capacity(residual_count);
    for _ in 0..residual_count {
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;
        residual_indices.push(idx);
    }

    // Remaining data is residual lines, stored raw.
    // NOTE(review): splitting on '\n' yields residual_count lines only if the
    // encoder joins residuals without a trailing newline — presumably it does;
    // the count check below rejects any other framing.
    let residual_data = &data[dpos..];
    if residual_count > 0 {
        let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
            vec![]
        } else {
            residual_data.split(|&b| b == b'\n').collect()
        };
        // There should be exactly residual_count lines.
        if residual_lines.len() != residual_count {
            return data.to_vec();
        }
        for (i, &idx) in residual_indices.iter().enumerate() {
            if idx < total_rows {
                output_lines[idx] = Some(residual_lines[i].to_vec());
            }
        }
    }

    // Assemble final output; every slot must have been filled.
    let mut output = Vec::with_capacity(data.len() * 2);
    for (i, slot) in output_lines.iter().enumerate() {
        match slot {
            Some(line) => output.extend_from_slice(line),
            None => {
                // Should not happen — missing row. Return data as-is.
                return data.to_vec();
            }
        }
        if i < total_rows - 1 || has_trailing_newline {
            output.push(b'\n');
        }
    }

    output
}
1608
1609#[cfg(test)]
1610mod tests {
1611    use super::*;
1612
1613    #[test]
1614    fn extract_value_string() {
1615        let line = br#""hello","next""#;
1616        let (val, end) = extract_value(line, 0).unwrap();
1617        assert_eq!(val, b"\"hello\"");
1618        assert_eq!(end, 7);
1619    }
1620
1621    #[test]
1622    fn extract_value_number() {
1623        let line = b"42,next";
1624        let (val, end) = extract_value(line, 0).unwrap();
1625        assert_eq!(val, b"42");
1626        assert_eq!(end, 2);
1627    }
1628
1629    #[test]
1630    fn extract_value_bool() {
1631        let line = b"true,next";
1632        let (val, end) = extract_value(line, 0).unwrap();
1633        assert_eq!(val, b"true");
1634        assert_eq!(end, 4);
1635    }
1636
1637    #[test]
1638    fn extract_value_null() {
1639        let line = b"null,next";
1640        let (val, end) = extract_value(line, 0).unwrap();
1641        assert_eq!(val, b"null");
1642        assert_eq!(end, 4);
1643    }
1644
1645    #[test]
1646    fn extract_value_object() {
1647        let line = br#"{"a":1,"b":"x"},next"#;
1648        let (val, end) = extract_value(line, 0).unwrap();
1649        assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
1650        assert_eq!(end, 15);
1651    }
1652
1653    #[test]
1654    fn extract_value_array() {
1655        let line = b"[1,2,3],next";
1656        let (val, end) = extract_value(line, 0).unwrap();
1657        assert_eq!(val, b"[1,2,3]");
1658        assert_eq!(end, 7);
1659    }
1660
1661    #[test]
1662    fn extract_value_string_with_escapes() {
1663        let line = br#""he\"llo",next"#;
1664        let (val, end) = extract_value(line, 0).unwrap();
1665        assert_eq!(val, br#""he\"llo""#.to_vec());
1666        assert_eq!(end, 9);
1667    }
1668
1669    #[test]
1670    fn parse_line_simple() {
1671        let line = br#"{"a":1,"b":"x"}"#;
1672        let (parts, values) = parse_line(line).unwrap();
1673        assert_eq!(parts.len(), 3); // {"a": , ,"b": , }
1674        assert_eq!(values.len(), 2);
1675        assert_eq!(values[0], b"1");
1676        assert_eq!(values[1], b"\"x\"");
1677        assert_eq!(parts[0], br#"{"a":"#.to_vec());
1678        assert_eq!(parts[1], br#","b":"#.to_vec());
1679        assert_eq!(parts[2], b"}");
1680    }
1681
1682    #[test]
1683    fn roundtrip_simple() {
1684        let data = br#"{"a":1,"b":"x"}
1685{"a":2,"b":"y"}
1686{"a":3,"b":"z"}
1687"#;
1688        let result = preprocess(data).expect("should produce transform");
1689        let restored = reverse(&result.data, &result.metadata);
1690        assert_eq!(
1691            String::from_utf8_lossy(&restored),
1692            String::from_utf8_lossy(data),
1693        );
1694        assert_eq!(restored, data.to_vec());
1695    }
1696
1697    #[test]
1698    fn roundtrip_no_trailing_newline() {
1699        let data = br#"{"a":1,"b":"x"}
1700{"a":2,"b":"y"}
1701{"a":3,"b":"z"}"#;
1702        let result = preprocess(data).expect("should produce transform");
1703        let restored = reverse(&result.data, &result.metadata);
1704        assert_eq!(restored, data.to_vec());
1705    }
1706
1707    #[test]
1708    fn roundtrip_nested_values() {
1709        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
1710{"id":2,"meta":{"x":30,"y":40}}
1711{"id":3,"meta":{"x":50,"y":60}}
1712{"id":4,"meta":{"x":70,"y":80}}
1713{"id":5,"meta":{"x":90,"y":100}}
1714"#;
1715        let result = preprocess(data).expect("should produce transform");
1716        let restored = reverse(&result.data, &result.metadata);
1717        assert_eq!(restored, data.to_vec());
1718    }
1719
1720    #[test]
1721    fn roundtrip_mixed_types() {
1722        let data = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
1723{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
1724{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
1725{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
1726{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
1727"#;
1728        let result = preprocess(data).expect("should produce transform");
1729        let restored = reverse(&result.data, &result.metadata);
1730        assert_eq!(restored, data.to_vec());
1731    }
1732
1733    #[test]
1734    fn schema_mismatch_too_few_returns_none() {
1735        // Different keys on different lines, but each group has < MIN_GROUP_ROWS.
1736        let data = br#"{"a":1,"b":2}
1737{"a":1,"c":3}
1738"#;
1739        assert!(preprocess(data).is_none());
1740    }
1741
1742    #[test]
1743    fn different_num_keys_too_few_returns_none() {
1744        let data = br#"{"a":1,"b":2}
1745{"a":1}
1746"#;
1747        assert!(preprocess(data).is_none());
1748    }
1749
1750    #[test]
1751    fn single_line_returns_none() {
1752        let data = br#"{"a":1,"b":2}
1753"#;
1754        assert!(preprocess(data).is_none());
1755    }
1756
1757    #[test]
1758    fn empty_returns_none() {
1759        assert!(preprocess(b"").is_none());
1760    }
1761
1762    #[test]
1763    fn column_layout_groups_similar_values() {
1764        let data = br#"{"type":"page_view","user":"alice"}
1765{"type":"api_call","user":"alice"}
1766{"type":"click","user":"bob"}
1767"#;
1768        let result = preprocess(data).unwrap();
1769
1770        // The columnar data should have type values grouped, then user values grouped.
1771        let col_data = &result.data;
1772        let cols: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
1773        assert_eq!(cols.len(), 2);
1774
1775        // Column 0 = type values.
1776        let type_vals: Vec<&[u8]> = cols[0].split(|&b| b == VAL_SEP).collect();
1777        assert_eq!(type_vals.len(), 3);
1778        assert_eq!(type_vals[0], br#""page_view""#);
1779        assert_eq!(type_vals[1], br#""api_call""#);
1780        assert_eq!(type_vals[2], br#""click""#);
1781
1782        // Column 1 = user values.
1783        let user_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
1784        assert_eq!(user_vals.len(), 3);
1785        assert_eq!(user_vals[0], br#""alice""#);
1786        assert_eq!(user_vals[1], br#""alice""#);
1787        assert_eq!(user_vals[2], br#""bob""#);
1788    }
1789
1790    #[test]
1791    fn roundtrip_string_with_escaped_chars() {
1792        let data = br#"{"msg":"he said \"hi\"","val":1}
1793{"msg":"line\nbreak","val":2}
1794{"msg":"tab\there","val":3}
1795{"msg":"back\\slash","val":4}
1796{"msg":"normal text","val":5}
1797"#;
1798        let result = preprocess(data).expect("should produce transform");
1799        let restored = reverse(&result.data, &result.metadata);
1800        assert_eq!(restored, data.to_vec());
1801    }
1802
1803    #[test]
1804    fn roundtrip_negative_and_float_numbers() {
1805        let data = br#"{"x":-3.14,"y":0}
1806{"x":2.718,"y":-1}
1807{"x":0.001,"y":999}
1808{"x":-100,"y":-200}
1809{"x":42.0,"y":7}
1810"#;
1811        let result = preprocess(data).expect("should produce transform");
1812        let restored = reverse(&result.data, &result.metadata);
1813        assert_eq!(restored, data.to_vec());
1814    }
1815
1816    /// Test that the transform+reverse is lossless even for tiny inputs
1817    /// by repeating them to pass the size check threshold.
1818    #[test]
1819    fn reverse_roundtrip_small_data() {
1820        // Verify parse_line works on small lines.
1821        let (parts, vals) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
1822        assert_eq!(vals.len(), 2);
1823        assert_eq!(parts.len(), 3);
1824
1825        // 2-row data might fail the size check, so repeat to get enough rows.
1826        let big_data = br#"{"x":-3.14,"y":0}
1827{"x":2.718,"y":-1}
1828"#
1829        .repeat(20);
1830        let result = preprocess(&big_data).expect("should produce transform with 40 rows");
1831        let restored = reverse(&result.data, &result.metadata);
1832        assert_eq!(restored, big_data);
1833    }
1834
1835    // --- Strategy 2 (grouped) tests ---
1836
1837    #[test]
1838    fn grouped_roundtrip_two_schemas() {
1839        // Two different schemas, each with >= MIN_GROUP_ROWS rows.
1840        let mut data = Vec::new();
1841        for i in 0..10 {
1842            data.extend_from_slice(
1843                format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
1844            );
1845            data.push(b'\n');
1846        }
1847        for i in 10..20 {
1848            data.extend_from_slice(
1849                format!(
1850                    r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
1851                    i, i, i
1852                )
1853                .as_bytes(),
1854            );
1855            data.push(b'\n');
1856        }
1857        let result = preprocess(&data).expect("should produce grouped transform");
1858        // Should be version 2 (grouped).
1859        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1860        let restored = reverse(&result.data, &result.metadata);
1861        assert_eq!(restored, data);
1862    }
1863
1864    #[test]
1865    fn grouped_roundtrip_interleaved_schemas() {
1866        // Interleaved schemas: alternating between two different key sets.
1867        let mut data = Vec::new();
1868        for i in 0..20 {
1869            if i % 2 == 0 {
1870                data.extend_from_slice(
1871                    format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
1872                );
1873            } else {
1874                data.extend_from_slice(
1875                    format!(
1876                        r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
1877                        i, i, i
1878                    )
1879                    .as_bytes(),
1880                );
1881            }
1882            data.push(b'\n');
1883        }
1884        let result = preprocess(&data).expect("should produce grouped transform");
1885        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1886        let restored = reverse(&result.data, &result.metadata);
1887        assert_eq!(restored, data);
1888    }
1889
1890    #[test]
1891    fn grouped_roundtrip_with_residuals() {
1892        // Two large groups + a few unique-schema rows (residuals).
1893        let mut data = Vec::new();
1894        // Group A: 8 rows.
1895        for i in 0..8 {
1896            data.extend_from_slice(format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
1897            data.push(b'\n');
1898        }
1899        // 2 unique rows (will be residual).
1900        data.extend_from_slice(br#"{"x":1,"y":2,"z":3}"#);
1901        data.push(b'\n');
1902        data.extend_from_slice(br#"{"p":"q"}"#);
1903        data.push(b'\n');
1904        // Group B: 6 rows.
1905        for i in 0..6 {
1906            data.extend_from_slice(format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes());
1907            data.push(b'\n');
1908        }
1909        let result = preprocess(&data).expect("should produce grouped transform");
1910        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1911        let restored = reverse(&result.data, &result.metadata);
1912        assert_eq!(
1913            String::from_utf8_lossy(&restored),
1914            String::from_utf8_lossy(&data),
1915        );
1916        assert_eq!(restored, data);
1917    }
1918
1919    #[test]
1920    fn grouped_roundtrip_no_trailing_newline() {
1921        let mut data = Vec::new();
1922        for i in 0..6 {
1923            data.extend_from_slice(format!(r#"{{"id":{},"type":"push"}}"#, i).as_bytes());
1924            data.push(b'\n');
1925        }
1926        for i in 0..6 {
1927            data.extend_from_slice(
1928                format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i).as_bytes(),
1929            );
1930            if i < 5 {
1931                data.push(b'\n');
1932            }
1933            // Last line: no trailing newline.
1934        }
1935        let result = preprocess(&data).expect("should produce grouped transform");
1936        let restored = reverse(&result.data, &result.metadata);
1937        assert_eq!(restored, data);
1938    }
1939
1940    #[test]
1941    fn uniform_still_preferred_over_grouped() {
1942        // All rows same schema — should use Strategy 1 (version 1), not Strategy 2.
1943        let data = br#"{"a":1,"b":"x"}
1944{"a":2,"b":"y"}
1945{"a":3,"b":"z"}
1946{"a":4,"b":"w"}
1947{"a":5,"b":"v"}
1948"#;
1949        let result = preprocess(data).expect("should produce transform");
1950        assert_eq!(
1951            result.metadata[0], METADATA_VERSION_UNIFORM,
1952            "uniform schema should use Strategy 1"
1953        );
1954        let restored = reverse(&result.data, &result.metadata);
1955        assert_eq!(restored, data.to_vec());
1956    }
1957
1958    #[test]
1959    fn grouped_gharchive_simulation() {
1960        // Simulates GitHub Archive: most rows have 7 keys, some have 8.
1961        let mut data = Vec::new();
1962        for i in 0..50 {
1963            if i % 5 == 0 {
1964                // 8-key rows (with org).
1965                data.extend_from_slice(
1966                    format!(
1967                        r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
1968                        i, i, i, i
1969                    )
1970                    .as_bytes(),
1971                );
1972            } else {
1973                // 7-key rows (no org).
1974                data.extend_from_slice(
1975                    format!(
1976                        r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
1977                        i, i, i
1978                    )
1979                    .as_bytes(),
1980                );
1981            }
1982            data.push(b'\n');
1983        }
1984        let result = preprocess(&data).expect("should produce grouped transform");
1985        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1986        let restored = reverse(&result.data, &result.metadata);
1987        assert_eq!(restored, data);
1988    }
1989
1990    // --- Nested decomposition tests ---
1991
1992    #[test]
1993    fn test_nested_decomposition_basic() {
1994        // Simple nested object decomposed correctly.
1995        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
1996{"id":2,"meta":{"x":30,"y":40}}
1997{"id":3,"meta":{"x":50,"y":60}}
1998"#;
1999        let result = preprocess(data).expect("should produce transform");
2000        assert_eq!(result.metadata[0], METADATA_VERSION_UNIFORM);
2001
2002        // The columnar data should have expanded columns.
2003        let cols: Vec<&[u8]> = result.data.split(|&b| b == COL_SEP).collect();
2004        // Original: 2 cols (id, meta). After flattening: 3 cols (id, meta.x, meta.y).
2005        assert_eq!(
2006            cols.len(),
2007            3,
2008            "should have 3 columns after flattening: got {}",
2009            cols.len()
2010        );
2011
2012        // Verify sub-columns contain the extracted values.
2013        let meta_x_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
2014        assert_eq!(meta_x_vals, vec![b"10".as_slice(), b"30", b"50"]);
2015
2016        let meta_y_vals: Vec<&[u8]> = cols[2].split(|&b| b == VAL_SEP).collect();
2017        assert_eq!(meta_y_vals, vec![b"20".as_slice(), b"40", b"60"]);
2018    }
2019
2020    #[test]
2021    fn test_nested_roundtrip() {
2022        // Flatten -> unflatten produces byte-exact original.
2023        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2024{"id":2,"meta":{"x":30,"y":40}}
2025{"id":3,"meta":{"x":50,"y":60}}
2026"#;
2027        let result = preprocess(data).expect("should produce transform");
2028        let restored = reverse(&result.data, &result.metadata);
2029        assert_eq!(
2030            String::from_utf8_lossy(&restored),
2031            String::from_utf8_lossy(data),
2032        );
2033        assert_eq!(restored, data.to_vec());
2034    }
2035
2036    #[test]
2037    fn test_nested_mixed_schemas() {
2038        // Different nested objects per row (some keys missing -> null).
2039        let data = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
2040{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
2041{"ts":"c","meta":{"query":"pricing","results_count":25}}
2042{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
2043{"ts":"e","meta":{"query":"api docs","results_count":41}}
2044"#;
2045        let result = preprocess(data).expect("should produce transform");
2046        let restored = reverse(&result.data, &result.metadata);
2047        assert_eq!(
2048            String::from_utf8_lossy(&restored),
2049            String::from_utf8_lossy(data),
2050        );
2051        assert_eq!(restored, data.to_vec());
2052    }
2053
2054    #[test]
2055    fn test_nested_no_nested_objects() {
2056        // Returns None when no nested objects — flat data should still work.
2057        let data = br#"{"a":1,"b":"x"}
2058{"a":2,"b":"y"}
2059{"a":3,"b":"z"}
2060"#;
2061        let result = preprocess(data).expect("should produce transform");
2062        let restored = reverse(&result.data, &result.metadata);
2063        assert_eq!(restored, data.to_vec());
2064
2065        // Verify the metadata has has_nested=0 since no nested objects.
2066        // The nested flag is appended after template parts.
2067        // For uniform, metadata starts with version(1) + num_rows(4) + num_cols(2) +
2068        // trailing_newline(1) + num_parts(2) + parts.
2069        // After those parts, there should be a 0 byte (has_nested=0).
2070        let meta = &result.metadata;
2071        let last_byte = meta[meta.len() - 1];
2072        assert_eq!(last_byte, 0, "should have has_nested=0 for flat data");
2073    }
2074
2075    #[test]
2076    fn test_nested_real_corpus() {
2077        // Test with data shaped like the test-ndjson.ndjson corpus.
2078        let data = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
2079{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
2080{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
2081{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
2082{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
2083"#;
2084        let result = preprocess(data).expect("should produce transform");
2085        let restored = reverse(&result.data, &result.metadata);
2086        assert_eq!(
2087            String::from_utf8_lossy(&restored),
2088            String::from_utf8_lossy(data),
2089        );
2090        assert_eq!(restored, data.to_vec());
2091    }
2092
2093    #[test]
2094    fn test_nested_roundtrip_with_null_values() {
2095        // Some rows have null for the nested field.
2096        let data = br#"{"id":1,"meta":{"x":10}}
2097{"id":2,"meta":null}
2098{"id":3,"meta":{"x":30}}
2099{"id":4,"meta":null}
2100{"id":5,"meta":{"x":50}}
2101"#;
2102        let result = preprocess(data).expect("should produce transform");
2103        let restored = reverse(&result.data, &result.metadata);
2104        assert_eq!(restored, data.to_vec());
2105    }
2106
2107    #[test]
2108    fn test_nested_string_values_preserved_exact() {
2109        // Verify that string values in nested objects preserve exact bytes (with quotes).
2110        let data = br#"{"id":1,"meta":{"name":"Alice","score":100}}
2111{"id":2,"meta":{"name":"Bob","score":200}}
2112{"id":3,"meta":{"name":"Charlie","score":300}}
2113"#;
2114        let result = preprocess(data).expect("should produce transform");
2115        let restored = reverse(&result.data, &result.metadata);
2116        assert_eq!(restored, data.to_vec());
2117    }
2118
2119    #[test]
2120    fn test_parse_nested_object_kv() {
2121        let obj = br#"{"query":"benchmark","results_count":14}"#;
2122        let pairs = parse_nested_object_kv(obj).unwrap();
2123        assert_eq!(pairs.len(), 2);
2124        assert_eq!(pairs[0].0, b"query");
2125        assert_eq!(pairs[0].1, br#""benchmark""#.to_vec());
2126        assert_eq!(pairs[1].0, b"results_count");
2127        assert_eq!(pairs[1].1, b"14");
2128    }
2129
2130    #[test]
2131    fn test_nested_varying_subkeys_roundtrip() {
2132        // Regression: rows with varying sub-keys in nested objects must
2133        // round-trip byte-exact. Even rows have `extra`, odd rows don't.
2134        let mut lines = Vec::new();
2135        for i in 0..50 {
2136            let line = if i % 2 == 0 {
2137                format!("{{\"id\":{},\"meta\":{{\"x\":{},\"extra\":{}}}}}", i, i, i)
2138            } else {
2139                format!("{{\"id\":{},\"meta\":{{\"x\":{}}}}}", i, i)
2140            };
2141            lines.push(line);
2142        }
2143        let ndjson = lines.join("\n") + "\n";
2144        let data = ndjson.as_bytes();
2145
2146        let result = preprocess(data).expect("should produce transform");
2147        let restored = reverse(&result.data, &result.metadata);
2148        assert_eq!(
2149            std::str::from_utf8(&restored).unwrap(),
2150            std::str::from_utf8(data).unwrap(),
2151            "varying sub-keys roundtrip must be byte-exact"
2152        );
2153    }
2154
2155    #[test]
2156    fn test_nested_explicit_null_preserved() {
2157        // Explicit null values in nested objects must survive roundtrip.
2158        // `{"x":1,"y":null}` must NOT be collapsed to `{"x":1}`.
2159        let data = b"{\"id\":1,\"meta\":{\"x\":1,\"y\":null}}\n\
2160                     {\"id\":2,\"meta\":{\"x\":2,\"y\":null}}\n\
2161                     {\"id\":3,\"meta\":{\"x\":3,\"y\":null}}\n";
2162        let result = preprocess(data).expect("should produce transform");
2163        let restored = reverse(&result.data, &result.metadata);
2164        assert_eq!(
2165            std::str::from_utf8(&restored).unwrap(),
2166            std::str::from_utf8(data).unwrap(),
2167            "explicit null values must be preserved"
2168        );
2169    }
2170}