// datacortex_core/format/ndjson.rs
1//! NDJSON columnar reorg — lossless transform that reorders row-oriented
2//! NDJSON data into column-oriented layout.
3//!
4//! Two strategies:
5//!
6//! **Strategy 1 (uniform):** All rows share the same schema (keys in same order).
7//!   Row-oriented (before):
8//!     {"ts":"2026-03-15T10:30:00.001Z","type":"page_view","user":"usr_a1b2c3d4"}
9//!     {"ts":"2026-03-15T10:30:00.234Z","type":"api_call","user":"usr_a1b2c3d4"}
10//!   Column-oriented (after):
11//!     [ts values] "2026-03-15T10:30:00.001Z" \x01 "2026-03-15T10:30:00.234Z" \x00
12//!     [type values] "page_view" \x01 "api_call" \x00
13//!     [user values] "usr_a1b2c3d4" \x01 "usr_a1b2c3d4"
14//!
15//! **Strategy 2 (grouped):** Rows have diverse schemas (e.g., GitHub Archive events).
16//!   Groups rows by schema, applies Strategy 1 per group, stores residual rows raw.
17//!   Metadata version byte distinguishes: 1 = uniform, 2 = grouped.
18//!
19//! Separators:
20//!   \x00 = column separator (cannot appear in valid JSON text)
21//!   \x01 = value separator within a column (cannot appear in valid JSON)
22
23use super::transform::TransformResult;
24use std::collections::HashMap;
25
/// Column separator byte (0x00 cannot appear anywhere in valid JSON text).
const COL_SEP: u8 = 0x00;
/// Value separator within a column (0x01 cannot appear in valid JSON text).
const VAL_SEP: u8 = 0x01;
/// Metadata version byte for Strategy 1 (uniform schema).
const METADATA_VERSION_UNIFORM: u8 = 1;
/// Metadata version byte for Strategy 2 (grouped schemas).
const METADATA_VERSION_GROUPED: u8 = 2;

/// Minimum rows in a schema group for it to be columnarized (not residual).
const MIN_GROUP_ROWS: usize = 5;

/// A schema group: template parts + list of (row_index, parsed_values).
type SchemaGroup = (Vec<Vec<u8>>, Vec<(usize, Vec<Vec<u8>>)>);
36
/// Extract the raw value bytes from a JSON line at a given position.
/// `pos` should point to the first byte of the value (after `:`); leading
/// ASCII whitespace is tolerated and skipped (it is NOT part of the value).
/// Returns `(value_bytes, end_position)`.
///
/// Handles: strings, numbers, booleans, null, nested objects, nested arrays.
/// Nested containers are captured as opaque byte blobs; strings inside them
/// are skipped escape-aware so braces/brackets inside strings don't confuse
/// the depth counter.
fn extract_value(line: &[u8], mut pos: usize) -> Option<(Vec<u8>, usize)> {
    // Advance through a string body (entry point is just past the opening
    // quote). Returns the index of the unescaped closing quote, or
    // `line.len()` if the string never terminates.
    fn skip_string_body(line: &[u8], mut i: usize) -> usize {
        let mut escaped = false;
        while i < line.len() {
            match line[i] {
                _ if escaped => escaped = false,
                b'\\' => escaped = true,
                b'"' => return i,
                _ => {}
            }
            i += 1;
        }
        i
    }

    // Scan a balanced `{...}` / `[...]` region; `i` starts just past the
    // opener. Returns the position one past the matching closer, or None if
    // the container is unterminated. Strings inside are skipped escape-aware.
    fn scan_balanced(line: &[u8], mut i: usize, open: u8, close: u8) -> Option<usize> {
        let mut depth = 1usize;
        while i < line.len() && depth > 0 {
            let b = line[i];
            if b == b'"' {
                // If the string is unterminated this lands at line.len();
                // the increment below then exits the loop with depth > 0.
                i = skip_string_body(line, i + 1);
            } else if b == open {
                depth += 1;
            } else if b == close {
                depth -= 1;
            }
            i += 1;
        }
        if depth == 0 {
            Some(i)
        } else {
            None
        }
    }

    // Skip whitespace after the colon.
    while pos < line.len() && line[pos].is_ascii_whitespace() {
        pos += 1;
    }
    let first = *line.get(pos)?;
    let start = pos;

    match first {
        b'"' => {
            // String value — runs to the first unescaped closing quote.
            let close = skip_string_body(line, pos + 1);
            if close < line.len() {
                Some((line[start..=close].to_vec(), close + 1))
            } else {
                None // Unterminated string.
            }
        }
        b'{' => {
            // Nested object — captured verbatim, braces matched.
            let end = scan_balanced(line, pos + 1, b'{', b'}')?;
            Some((line[start..end].to_vec(), end))
        }
        b'[' => {
            // Nested array — captured verbatim, brackets matched.
            let end = scan_balanced(line, pos + 1, b'[', b']')?;
            Some((line[start..end].to_vec(), end))
        }
        _ => {
            // Number, boolean, null — scan until , } ] or whitespace.
            let mut end = pos;
            while end < line.len()
                && !matches!(line[end], b',' | b'}' | b']')
                && !line[end].is_ascii_whitespace()
            {
                end += 1;
            }
            // An empty scan (delimiter immediately) is not a value.
            (end > start).then(|| (line[start..end].to_vec(), end))
        }
    }
}
150
151/// Parse a single JSON line into (template_parts, values).
152///
153/// Template parts are the structural bytes between values:
154///   Part 0 = everything from { up to and including the : before value 0
155///   Part 1 = everything from after value 0 up to and including : before value 1
156///   ...
157///   Part N = everything from after the last value to end of line (including } and \n)
158///
159/// Returns None if the line is not a flat-ish JSON object (we handle nested values
160/// as opaque blobs, but the top-level structure must be key:value pairs).
161type ParsedLine = (Vec<Vec<u8>>, Vec<Vec<u8>>);
162
163fn parse_line(line: &[u8]) -> Option<ParsedLine> {
164    let mut pos = 0;
165
166    // Skip leading whitespace.
167    while pos < line.len() && line[pos].is_ascii_whitespace() {
168        pos += 1;
169    }
170    if pos >= line.len() || line[pos] != b'{' {
171        return None;
172    }
173
174    let mut parts: Vec<Vec<u8>> = Vec::new();
175    let mut values: Vec<Vec<u8>> = Vec::new();
176    let mut part_start = 0;
177
178    pos += 1; // Skip opening {.
179
180    loop {
181        // Skip whitespace.
182        while pos < line.len() && line[pos].is_ascii_whitespace() {
183            pos += 1;
184        }
185        if pos >= line.len() {
186            return None;
187        }
188
189        // Check for closing brace (end of object).
190        if line[pos] == b'}' {
191            // Capture the final part: everything from part_start to end of line.
192            parts.push(line[part_start..].to_vec());
193            break;
194        }
195
196        // Expect a key string.
197        if line[pos] != b'"' {
198            return None;
199        }
200        // Skip over the key string.
201        pos += 1;
202        let mut escaped = false;
203        while pos < line.len() {
204            if escaped {
205                escaped = false;
206            } else if line[pos] == b'\\' {
207                escaped = true;
208            } else if line[pos] == b'"' {
209                pos += 1;
210                break;
211            }
212            pos += 1;
213        }
214
215        // Skip whitespace, expect colon.
216        while pos < line.len() && line[pos].is_ascii_whitespace() {
217            pos += 1;
218        }
219        if pos >= line.len() || line[pos] != b':' {
220            return None;
221        }
222        pos += 1; // Skip colon.
223
224        // Everything from part_start up to here is a "template part".
225        parts.push(line[part_start..pos].to_vec());
226
227        // Extract the value.
228        let (value, value_end) = extract_value(line, pos)?;
229        values.push(value);
230        pos = value_end;
231
232        // Mark the start of the next part.
233        part_start = pos;
234
235        // Skip whitespace.
236        while pos < line.len() && line[pos].is_ascii_whitespace() {
237            pos += 1;
238        }
239        if pos >= line.len() {
240            return None;
241        }
242
243        // Expect comma or closing brace.
244        if line[pos] == b',' {
245            pos += 1;
246        } else if line[pos] == b'}' {
247            // Will be caught at the top of the loop next iteration — but we
248            // need to NOT advance pos, so the } check above catches it.
249            // Actually, let's just handle it here.
250            parts.push(line[part_start..].to_vec());
251            break;
252        } else {
253            return None; // Unexpected character.
254        }
255    }
256
257    if values.is_empty() {
258        return None;
259    }
260
261    Some((parts, values))
262}
263
/// Split NDJSON data into lines (without newline characters).
///
/// A trailing `\n` does not produce a final empty line, and empty input
/// yields an empty vector; interior empty lines are preserved.
fn split_lines(data: &[u8]) -> Vec<&[u8]> {
    let mut lines: Vec<&[u8]> = data.split(|&b| b == b'\n').collect();
    // `split` always emits a segment after the final separator; that segment
    // is empty exactly when the data is empty or ends with '\n' — cases the
    // manual scanner never reported as a line.
    if lines.last().is_some_and(|last| last.is_empty()) {
        lines.pop();
    }
    lines
}
279
280/// Build columnar data from parsed lines that share the same template.
281/// Returns (col_data, metadata) for a uniform group.
282fn build_uniform_columnar(
283    template_parts: &[Vec<u8>],
284    columns: &[Vec<Vec<u8>>],
285    num_rows: usize,
286    has_trailing_newline: bool,
287) -> (Vec<u8>, Vec<u8>) {
288    let num_cols = columns.len();
289
290    // Build column data: values separated by \x01, columns separated by \x00.
291    let mut col_data = Vec::new();
292    for (ci, col) in columns.iter().enumerate() {
293        for (ri, val) in col.iter().enumerate() {
294            col_data.extend_from_slice(val);
295            if ri < num_rows - 1 {
296                col_data.push(VAL_SEP);
297            }
298        }
299        if ci < num_cols - 1 {
300            col_data.push(COL_SEP);
301        }
302    }
303
304    // Build metadata: version + num_rows + num_cols + trailing_newline + template parts.
305    let mut metadata = Vec::new();
306    metadata.push(METADATA_VERSION_UNIFORM);
307    metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
308    metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
309    metadata.push(if has_trailing_newline { 1 } else { 0 });
310    metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
311    for part in template_parts {
312        metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
313        metadata.extend_from_slice(part);
314    }
315
316    (col_data, metadata)
317}
318
319/// Strategy 1: Uniform schema — all rows must have the same template.
320/// Returns None if schemas differ.
321fn preprocess_uniform(
322    non_empty: &[&[u8]],
323    has_trailing_newline: bool,
324) -> Option<(Vec<u8>, Vec<u8>)> {
325    if non_empty.len() < 2 {
326        return None;
327    }
328
329    let (template_parts, first_values) = parse_line(non_empty[0])?;
330    let num_cols = first_values.len();
331    if template_parts.len() != num_cols + 1 {
332        return None;
333    }
334
335    let mut columns: Vec<Vec<Vec<u8>>> = Vec::with_capacity(num_cols);
336    for v in &first_values {
337        columns.push(vec![v.clone()]);
338    }
339
340    for &line in &non_empty[1..] {
341        let (parts, values) = parse_line(line)?;
342        if values.len() != num_cols || parts.len() != template_parts.len() {
343            return None;
344        }
345        for (a, b) in parts.iter().zip(template_parts.iter()) {
346            if a != b {
347                return None;
348            }
349        }
350        for (col, val) in values.iter().enumerate() {
351            columns[col].push(val.clone());
352        }
353    }
354
355    Some(build_uniform_columnar(
356        &template_parts,
357        &columns,
358        non_empty.len(),
359        has_trailing_newline,
360    ))
361}
362
363/// Strategy 2: Group-by-schema — group rows by template, columnarize each group.
364///
365/// Metadata format (version=2):
366///   version: u8 = 2
367///   has_trailing_newline: u8
368///   total_rows: u32 LE
369///   num_groups: u16 LE
370///   for each group:
371///     num_rows: u32 LE
372///     row_indices: [u32 LE * num_rows]
373///     group_metadata_len: u32 LE
374///     group_metadata: [bytes]  (Strategy 1 metadata for this group)
375///   residual_count: u32 LE
376///   residual_indices: [u32 LE * residual_count]
377///
378/// Data format:
379///   for each group:
380///     data_len: u32 LE
381///     data: [bytes]  (columnar data for this group)
382///   residual_data: [bytes]  (raw lines joined by \n)
383fn preprocess_grouped(
384    non_empty: &[&[u8]],
385    has_trailing_newline: bool,
386) -> Option<(Vec<u8>, Vec<u8>)> {
387    if non_empty.len() < MIN_GROUP_ROWS {
388        return None;
389    }
390
391    // Parse all lines and group by template (the template parts identify the schema).
392    // We use the template parts as the group key.
393    let mut parsed: Vec<Option<ParsedLine>> = Vec::with_capacity(non_empty.len());
394    for &line in non_empty {
395        parsed.push(parse_line(line));
396    }
397
398    // Group rows by template. Key = template parts (as bytes for hashing).
399    // We store groups as: template_key -> (template_parts, vec of (row_index, values)).
400    let mut group_map: HashMap<Vec<u8>, SchemaGroup> = HashMap::new();
401    let mut residual_indices: Vec<usize> = Vec::new();
402
403    for (idx, parsed_line) in parsed.into_iter().enumerate() {
404        if let Some((parts, values)) = parsed_line {
405            // Build a hashable key from the template parts.
406            let mut key = Vec::new();
407            for part in &parts {
408                key.extend_from_slice(&(part.len() as u32).to_le_bytes());
409                key.extend_from_slice(part);
410            }
411            group_map
412                .entry(key)
413                .or_insert_with(|| (parts, Vec::new()))
414                .1
415                .push((idx, values));
416        } else {
417            // Unparseable line goes to residual.
418            residual_indices.push(idx);
419        }
420    }
421
422    // Separate groups into qualifying (>= MIN_GROUP_ROWS) and residual.
423    let mut groups: Vec<SchemaGroup> = Vec::new();
424    for (_key, (template_parts, rows)) in group_map {
425        if rows.len() >= MIN_GROUP_ROWS {
426            groups.push((template_parts, rows));
427        } else {
428            // Too few rows — send to residual.
429            for (idx, _) in &rows {
430                residual_indices.push(*idx);
431            }
432        }
433    }
434
435    // Need at least 1 qualifying group for this to be useful.
436    if groups.is_empty() {
437        return None;
438    }
439
440    // Sort groups by their first row index for deterministic output.
441    groups.sort_by_key(|(_, rows)| rows[0].0);
442    residual_indices.sort_unstable();
443
444    // Build per-group columnar data and metadata.
445    struct GroupOutput {
446        row_indices: Vec<u32>,
447        col_data: Vec<u8>,
448        group_metadata: Vec<u8>,
449    }
450
451    let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
452
453    for (template_parts, rows) in &groups {
454        let num_cols = template_parts.len() - 1;
455        let mut columns: Vec<Vec<Vec<u8>>> = (0..num_cols).map(|_| Vec::new()).collect();
456        let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
457
458        for (idx, values) in rows {
459            row_indices.push(*idx as u32);
460            for (col, val) in values.iter().enumerate() {
461                columns[col].push(val.clone());
462            }
463        }
464
465        // Build columnar data for this group (trailing_newline=false for sub-groups).
466        let (col_data, group_metadata) =
467            build_uniform_columnar(template_parts, &columns, rows.len(), false);
468
469        group_outputs.push(GroupOutput {
470            row_indices,
471            col_data,
472            group_metadata,
473        });
474    }
475
476    // Build the combined data blob.
477    let mut data_out = Vec::new();
478    for group in &group_outputs {
479        data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
480        data_out.extend_from_slice(&group.col_data);
481    }
482
483    // Append residual lines (raw, separated by \n).
484    let residual_start = data_out.len();
485    for (i, &idx) in residual_indices.iter().enumerate() {
486        data_out.extend_from_slice(non_empty[idx]);
487        if i < residual_indices.len() - 1 {
488            data_out.push(b'\n');
489        }
490    }
491    let _residual_len = data_out.len() - residual_start;
492
493    // Build the combined metadata.
494    let mut metadata = Vec::new();
495    metadata.push(METADATA_VERSION_GROUPED);
496    metadata.push(if has_trailing_newline { 1 } else { 0 });
497    metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
498    metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
499
500    for group in &group_outputs {
501        metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
502        for &idx in &group.row_indices {
503            metadata.extend_from_slice(&idx.to_le_bytes());
504        }
505        metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
506        metadata.extend_from_slice(&group.group_metadata);
507    }
508
509    metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
510    for &idx in &residual_indices {
511        metadata.extend_from_slice(&(idx as u32).to_le_bytes());
512    }
513
514    Some((data_out, metadata))
515}
516
/// Metadata describing which columns were flattened from nested objects.
///
/// One instance per original column that `flatten_nested_columns` expanded;
/// `unflatten_nested_columns` uses it to merge the sub-columns back.
pub(crate) struct NestedGroupInfo {
    /// Index of the original column that was expanded.
    pub(crate) original_col_index: u16,
    /// Sub-key names for the expanded sub-columns (unquoted key bytes, in
    /// first-seen order across all rows).
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Template parts for reconstructing nested objects (preserves original formatting).
    /// If empty, compact format `{"key":val,...}` is used (NDJSON compatibility).
    pub(crate) nested_template: Vec<Vec<u8>>,
    /// Absence bitmap: one bit per (sub_key, row). Bit=1 means the key was ABSENT
    /// in the original object (not present at all), vs bit=0 means key was present
    /// (possibly with explicit `null`). Packed LSB-first, bit index =
    /// sub_key_index * num_rows + row_index.
    /// Length = ceil(num_sub_keys * num_rows / 8).
    /// Empty if all rows had all keys (no absences).
    pub(crate) absence_bitmap: Vec<u8>,
}
533
/// Attempt to flatten nested JSON objects in columnar data (depth-1).
///
/// Takes columnar data (\x00/\x01 separated) and returns expanded columnar data
/// with nested objects decomposed into sub-columns.
/// Returns None if no nested objects found (or the layout doesn't match
/// `num_rows`).
///
/// A column qualifies only when every non-`null` value begins with `{`.
/// Sub-keys are collected across all rows in first-seen order; a row missing a
/// sub-key gets a `null` placeholder plus a bit in the absence bitmap so the
/// decoder can tell "key absent" apart from "key present with explicit null".
///
/// NOTE(review): the nested template is captured from the FIRST non-null row
/// only — reconstruction presumably assumes every row shares that formatting;
/// verify against the decoder if per-row formatting can differ.
pub(crate) fn flatten_nested_columns(
    col_data: &[u8],
    num_rows: usize,
) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
    // Split into columns.
    let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
    if columns.is_empty() || num_rows == 0 {
        return None;
    }

    let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
    // Build the output columns: for non-nested cols, keep as-is.
    // For nested cols, replace with sub-columns.
    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();

    for (col_idx, &col_chunk) in columns.iter().enumerate() {
        // Every column must split into exactly num_rows values, or the
        // layout doesn't match and we bail out entirely.
        let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
        if values.len() != num_rows {
            return None;
        }

        // Check if ALL non-null values start with '{' (nested object).
        // (A JSON string value starts with '"', so '{' is unambiguous.)
        let mut all_objects = true;
        let mut has_non_null = false;
        for val in &values {
            if *val == b"null" {
                continue;
            }
            has_non_null = true;
            if !val.starts_with(b"{") {
                all_objects = false;
                break;
            }
        }

        if !all_objects || !has_non_null {
            // Not a nested-object column — keep as-is.
            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
            output_columns.push(col_values);
            continue;
        }

        // This column contains nested objects — decompose depth-1.
        // Parse all values and collect all unique sub-keys (preserving discovery order).
        // Also capture the template from the first non-null object to preserve formatting.
        let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
        let mut nested_template: Vec<Vec<u8>> = Vec::new();
        type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
        let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);

        for val in &values {
            if *val == b"null" {
                parsed_rows.push(None);
                continue;
            }
            if nested_template.is_empty() {
                // First non-null: use template-preserving parser.
                match parse_nested_object_with_template(val) {
                    Some((template, kv_pairs)) => {
                        for (key, _) in &kv_pairs {
                            if !all_sub_keys.iter().any(|k| k == key) {
                                all_sub_keys.push(key.clone());
                            }
                        }
                        nested_template = template;
                        parsed_rows.push(Some(kv_pairs));
                    }
                    None => {
                        // Clearing the keys routes us to the keep-as-is
                        // fallback below.
                        all_sub_keys.clear();
                        break;
                    }
                }
            } else {
                // Subsequent rows: use simpler kv parser (template already captured).
                match parse_nested_object_kv(val) {
                    Some(kv_pairs) => {
                        for (key, _) in &kv_pairs {
                            if !all_sub_keys.iter().any(|k| k == key) {
                                all_sub_keys.push(key.clone());
                            }
                        }
                        parsed_rows.push(Some(kv_pairs));
                    }
                    None => {
                        all_sub_keys.clear();
                        break;
                    }
                }
            }
        }

        if all_sub_keys.is_empty() {
            // Could not parse — keep column as-is.
            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
            output_columns.push(col_values);
            continue;
        }

        // Build sub-columns: for each sub-key, extract values from parsed rows.
        // Also build an absence bitmap: bit=1 where a key was absent from the
        // original row (as opposed to being present with explicit `null`).
        let num_sub_keys = all_sub_keys.len();
        let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
        let total_bits = num_sub_keys * num_rows;
        let bitmap_bytes = total_bits.div_ceil(8);
        let mut absence_bitmap = vec![0u8; bitmap_bytes];
        let mut has_any_absent = false;

        for (row_idx, parsed) in parsed_rows.iter().enumerate() {
            match parsed {
                Some(kv_pairs) => {
                    for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
                        let found = kv_pairs.iter().find(|(k, _)| k == sk);
                        match found {
                            Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
                            None => {
                                sub_columns[sk_idx].push(b"null".to_vec());
                                // Mark this (sub_key, row) as absent.
                                let bit_idx = sk_idx * num_rows + row_idx;
                                absence_bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
                                has_any_absent = true;
                            }
                        }
                    }
                }
                None => {
                    // null row — all sub-columns get null.
                    // These are NOT marked as absent (the whole column was null,
                    // not individual keys missing).
                    for sc in sub_columns.iter_mut() {
                        sc.push(b"null".to_vec());
                    }
                }
            }
        }

        nested_groups.push(NestedGroupInfo {
            original_col_index: col_idx as u16,
            sub_keys: all_sub_keys,
            nested_template,
            // An all-empty bitmap is stored as an empty Vec to save space.
            absence_bitmap: if has_any_absent {
                absence_bitmap
            } else {
                Vec::new()
            },
        });

        for sc in sub_columns {
            output_columns.push(sc);
        }
    }

    if nested_groups.is_empty() {
        return None;
    }

    // Build the flattened columnar data. num_rows >= 1 (checked above), so
    // the `num_rows - 1` below cannot underflow.
    let num_out_cols = output_columns.len();
    let mut out = Vec::new();
    for (ci, col) in output_columns.iter().enumerate() {
        for (ri, val) in col.iter().enumerate() {
            out.extend_from_slice(val);
            if ri < num_rows - 1 {
                out.push(VAL_SEP);
            }
        }
        if ci < num_out_cols - 1 {
            out.push(COL_SEP);
        }
    }

    Some((out, nested_groups))
}
712
/// Parse a nested JSON object into (template_parts, kv_pairs).
/// Template parts include all structural bytes (braces, keys, colons, whitespace) —
/// preserving the original formatting so the object can be reconstructed exactly.
/// Keys are returned WITHOUT quotes. Values are the exact bytes from the JSON.
///
/// Part layout mirrors `parse_line`: part i ends just before value i (and
/// includes whitespace after the colon); the final part runs from after the
/// last value to the end of the object. Returns None on any structural
/// irregularity or if the object has no pairs.
#[allow(clippy::type_complexity)]
pub(crate) fn parse_nested_object_with_template(
    obj: &[u8],
) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
    let mut pos = 0;

    // Skip whitespace (still preserved: part 0 starts at byte 0).
    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
        pos += 1;
    }
    if pos >= obj.len() || obj[pos] != b'{' {
        return None;
    }
    pos += 1;

    let mut parts: Vec<Vec<u8>> = Vec::new();
    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
    let mut part_start = 0;

    loop {
        // Skip whitespace.
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() {
            return None;
        }
        // Closing brace: final template part runs to the end of the object.
        if obj[pos] == b'}' {
            parts.push(obj[part_start..].to_vec());
            break;
        }

        // Expect a key string.
        if obj[pos] != b'"' {
            return None;
        }
        let key_str_start = pos + 1;
        pos += 1;
        // Escape-aware scan to the closing quote of the key.
        let mut escaped = false;
        while pos < obj.len() {
            if escaped {
                escaped = false;
            } else if obj[pos] == b'\\' {
                escaped = true;
            } else if obj[pos] == b'"' {
                break;
            }
            pos += 1;
        }
        if pos >= obj.len() {
            return None; // Unterminated key string.
        }
        let key = obj[key_str_start..pos].to_vec();
        pos += 1; // skip closing quote

        // Skip whitespace, expect colon.
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() || obj[pos] != b':' {
            return None;
        }
        pos += 1;

        // Skip whitespace between colon and value — include in template so
        // reconstruction is byte-exact.
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }

        // Template part: everything from part_start to here (includes key, colon, post-colon ws).
        parts.push(obj[part_start..pos].to_vec());

        // Extract the value (extract_value's own whitespace skip is a no-op
        // here — we already consumed it above).
        let value_start = pos;
        let (value, value_end) = extract_value(obj, value_start)?;
        pos = value_end;
        pairs.push((key, value));

        // The next template part starts right after the value.
        part_start = pos;

        // Skip whitespace.
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() {
            return None;
        }
        if obj[pos] == b',' {
            pos += 1;
        } else if obj[pos] == b'}' {
            // Object ends right after a value: close the final part here.
            parts.push(obj[part_start..].to_vec());
            break;
        } else {
            return None;
        }
    }

    // An empty object is not worth templating.
    if pairs.is_empty() {
        return None;
    }
    Some((parts, pairs))
}
820
821/// Parse a nested JSON object into its key-value pairs (depth-1 only).
822/// Returns the exact bytes for each key and value.
823/// Keys are returned WITHOUT quotes. Values are the exact bytes from the JSON.
824pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
825    let mut pos = 0;
826
827    // Skip whitespace.
828    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
829        pos += 1;
830    }
831    if pos >= obj.len() || obj[pos] != b'{' {
832        return None;
833    }
834    pos += 1;
835
836    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
837
838    loop {
839        // Skip whitespace.
840        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
841            pos += 1;
842        }
843        if pos >= obj.len() {
844            return None;
845        }
846        if obj[pos] == b'}' {
847            break;
848        }
849
850        // Expect a key string.
851        if obj[pos] != b'"' {
852            return None;
853        }
854        pos += 1;
855        let key_start = pos;
856        let mut escaped = false;
857        while pos < obj.len() {
858            if escaped {
859                escaped = false;
860            } else if obj[pos] == b'\\' {
861                escaped = true;
862            } else if obj[pos] == b'"' {
863                break;
864            }
865            pos += 1;
866        }
867        if pos >= obj.len() {
868            return None;
869        }
870        let key = obj[key_start..pos].to_vec();
871        pos += 1; // skip closing quote
872
873        // Skip whitespace, expect colon.
874        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
875            pos += 1;
876        }
877        if pos >= obj.len() || obj[pos] != b':' {
878            return None;
879        }
880        pos += 1;
881
882        // Extract the value.
883        let (value, value_end) = extract_value(obj, pos)?;
884        pos = value_end;
885        pairs.push((key, value));
886
887        // Skip whitespace.
888        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
889            pos += 1;
890        }
891        if pos >= obj.len() {
892            return None;
893        }
894        if obj[pos] == b',' {
895            pos += 1;
896        } else if obj[pos] == b'}' {
897            break;
898        } else {
899            return None;
900        }
901    }
902
903    if pairs.is_empty() {
904        return None;
905    }
906    Some(pairs)
907}
908
/// Unflatten nested sub-columns back into JSON objects.
///
/// Takes flattened columnar data and nested group info, merges sub-columns
/// back into the original nested object columns.
///
/// Inputs:
/// - `flat_data`: COL_SEP-separated columns, each a VAL_SEP-separated
///   list of per-row values.
/// - `nested_groups`: which original columns were decomposed, their
///   sub-keys, an optional formatting template, and an optional
///   key-absence bitmap.
/// - `num_rows`: the value count every column must have.
/// - `total_flat_cols`: expected number of COL_SEP chunks in `flat_data`.
///
/// On any shape mismatch the input is returned unchanged; callers
/// compare the result against expectations and fall back gracefully.
pub(crate) fn unflatten_nested_columns(
    flat_data: &[u8],
    nested_groups: &[NestedGroupInfo],
    num_rows: usize,
    total_flat_cols: usize,
) -> Vec<u8> {
    let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
    if flat_columns.len() != total_flat_cols {
        return flat_data.to_vec();
    }

    // Parse all flat column values.
    let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
    for chunk in &flat_columns {
        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
        if vals.len() != num_rows {
            return flat_data.to_vec();
        }
        flat_col_values.push(vals);
    }

    // Reconstruct original columns from flat columns.
    // Walk through flat columns, merging sub-columns back where needed.
    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();

    // Build a set of original_col_index -> group for quick lookup.
    // We need to know which flat columns map to which nested group.
    // The flat columns are in order: non-nested cols keep their position,
    // nested cols are replaced by their sub-columns at that position.
    //
    // To figure out which flat_idx corresponds to what, we replay the
    // forward mapping.
    // We need to know the ORIGINAL number of columns.
    // Each nested group contributed sub_keys.len() flat columns in place
    // of exactly 1 original column, hence:
    //   original = flat - Σ(sub_keys per group) + number_of_groups.
    let original_num_cols = total_flat_cols
        - nested_groups
            .iter()
            .map(|g| g.sub_keys.len())
            .sum::<usize>()
        + nested_groups.len();

    // Build mapping: for each original col, is it nested or not?
    let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
    for (gi, group) in nested_groups.iter().enumerate() {
        if (group.original_col_index as usize) < original_num_cols {
            original_col_map[group.original_col_index as usize] = Some(gi);
        }
    }

    let mut flat_idx = 0;
    for entry in original_col_map.iter().take(original_num_cols) {
        if let Some(gi) = entry {
            let group = &nested_groups[*gi];
            let num_sub = group.sub_keys.len();

            // Helper: check if sub-key `si` at `row` is absent using bitmap.
            // Bitmap layout: bit index `si * num_rows + row`, LSB-first
            // within each byte. Out-of-range reads count as "present".
            let is_absent = |si: usize, row: usize| -> bool {
                if group.absence_bitmap.is_empty() {
                    return false; // no absences in this group
                }
                let bit_idx = si * num_rows + row;
                let byte_idx = bit_idx / 8;
                if byte_idx >= group.absence_bitmap.len() {
                    return false;
                }
                (group.absence_bitmap[byte_idx] >> (bit_idx % 8)) & 1 == 1
            };

            // Merge sub-columns back into nested objects.
            let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
            for row in 0..num_rows {
                // Check if all sub-columns are null or absent for this row
                // (meaning the whole nested column was null).
                // NOTE(review): if num_sub were 0 this `all` is vacuously
                // true and every row collapses to `null`; assumes the
                // forward flatten never emits an empty group — verify.
                let all_null = (0..num_sub).all(|si| {
                    flat_idx + si < flat_col_values.len()
                        && flat_col_values[flat_idx + si][row] == b"null"
                });
                if all_null && !group.absence_bitmap.is_empty() {
                    // If all values are null but some are "absent" and some are
                    // "explicit null", we need to reconstruct, not collapse.
                    let any_present_null = (0..num_sub).any(|si| {
                        flat_col_values[flat_idx + si][row] == b"null" && !is_absent(si, row)
                    });
                    if any_present_null {
                        // At least one key has an explicit null — don't collapse.
                        // Fall through to reconstruction below.
                    } else {
                        // All nulls are from absent keys — whole column was null.
                        merged_col.push(b"null".to_vec());
                        continue;
                    }
                } else if all_null {
                    merged_col.push(b"null".to_vec());
                    continue;
                }

                // Check whether any sub-key is absent in this row.
                let has_absent = (0..num_sub).any(|si| is_absent(si, row));

                if !has_absent
                    && !group.nested_template.is_empty()
                    && group.nested_template.len() == num_sub + 1
                {
                    // Template-based reconstruction: all keys present,
                    // preserves original formatting exactly.
                    // Layout: template[0] v0 template[1] v1 … template[num_sub].
                    let mut obj = Vec::new();
                    obj.extend_from_slice(&group.nested_template[0]);
                    if flat_idx < flat_col_values.len() {
                        obj.extend_from_slice(flat_col_values[flat_idx][row]);
                    }
                    for si in 1..num_sub {
                        obj.extend_from_slice(&group.nested_template[si]);
                        if flat_idx + si < flat_col_values.len() {
                            obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
                        }
                    }
                    obj.extend_from_slice(&group.nested_template[num_sub]);
                    merged_col.push(obj);
                } else {
                    // Compact reconstruction: some keys absent, or no template.
                    // Skip sub-keys that were absent in the original.
                    // Emits minimal `{"k":v,...}` formatting; callers'
                    // byte-exact roundtrip check rejects the transform if
                    // this differs from the original bytes.
                    let mut obj = Vec::new();
                    obj.push(b'{');
                    let mut first = true;
                    for si in 0..num_sub {
                        if flat_idx + si >= flat_col_values.len() {
                            break;
                        }
                        if is_absent(si, row) {
                            continue; // key was absent — omit entirely
                        }
                        let val = flat_col_values[flat_idx + si][row];
                        if !first {
                            obj.push(b',');
                        }
                        first = false;
                        obj.push(b'"');
                        obj.extend_from_slice(&group.sub_keys[si]);
                        obj.push(b'"');
                        obj.push(b':');
                        obj.extend_from_slice(val);
                    }
                    obj.push(b'}');
                    merged_col.push(obj);
                }
            }
            output_columns.push(merged_col);
            flat_idx += num_sub;
        } else {
            // Non-nested column — copy as-is.
            if flat_idx < flat_col_values.len() {
                let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
                    .iter()
                    .map(|v| v.to_vec())
                    .collect();
                output_columns.push(col);
            }
            flat_idx += 1;
        }
    }

    // Rebuild columnar data: VAL_SEP between values, COL_SEP between
    // columns, no trailing separator after the last of either.
    let num_out_cols = output_columns.len();
    let mut out = Vec::new();
    for (ci, col) in output_columns.iter().enumerate() {
        for (ri, val) in col.iter().enumerate() {
            out.extend_from_slice(val);
            if ri < num_rows - 1 {
                out.push(VAL_SEP);
            }
        }
        if ci < num_out_cols - 1 {
            out.push(COL_SEP);
        }
    }

    out
}
1090
1091/// Serialize nested group info into bytes for storage in metadata.
1092/// Version 1 (has_nested=1): sub_keys only (backward compat, NDJSON path).
1093/// Version 2 (has_nested=2): sub_keys + nested_template (preserves formatting).
1094/// Version 3 (has_nested=3): sub_keys + nested_template + absence_bitmap.
1095pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1096    let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1097    let has_absence = groups.iter().any(|g| !g.absence_bitmap.is_empty());
1098    let mut out = Vec::new();
1099    let version = if has_absence {
1100        3u8
1101    } else if has_template {
1102        2u8
1103    } else {
1104        1u8
1105    };
1106    out.push(version);
1107    out.push(groups.len() as u8);
1108    for group in groups {
1109        out.extend_from_slice(&group.original_col_index.to_le_bytes());
1110        out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1111        for key in &group.sub_keys {
1112            out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1113            out.extend_from_slice(key);
1114        }
1115        if has_template || version == 3 {
1116            out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1117            for part in &group.nested_template {
1118                out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1119                out.extend_from_slice(part);
1120            }
1121        }
1122        if version == 3 {
1123            let bm_len = group.absence_bitmap.len() as u32;
1124            out.extend_from_slice(&bm_len.to_le_bytes());
1125            out.extend_from_slice(&group.absence_bitmap);
1126        }
1127    }
1128    out
1129}
1130
1131/// Deserialize nested group info from metadata bytes.
1132/// Returns (nested_groups, bytes_consumed).
1133/// Handles version 1 (no template), version 2 (with template),
1134/// and version 3 (template + absence bitmap).
1135pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1136    if data.is_empty() {
1137        return None;
1138    }
1139    let mut pos = 0;
1140    let version = data[pos];
1141    pos += 1;
1142    if version != 1 && version != 2 && version != 3 {
1143        return None;
1144    }
1145    let has_template = version == 2 || version == 3;
1146    let has_absence = version == 3;
1147    if pos >= data.len() {
1148        return None;
1149    }
1150    let num_groups = data[pos] as usize;
1151    pos += 1;
1152
1153    let mut groups = Vec::with_capacity(num_groups);
1154    for _ in 0..num_groups {
1155        if pos + 4 > data.len() {
1156            return None;
1157        }
1158        let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1159        pos += 2;
1160        let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1161        pos += 2;
1162
1163        let mut sub_keys = Vec::with_capacity(num_sub_cols);
1164        for _ in 0..num_sub_cols {
1165            if pos + 2 > data.len() {
1166                return None;
1167            }
1168            let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1169            pos += 2;
1170            if pos + key_len > data.len() {
1171                return None;
1172            }
1173            sub_keys.push(data[pos..pos + key_len].to_vec());
1174            pos += key_len;
1175        }
1176
1177        let nested_template = if has_template {
1178            if pos + 2 > data.len() {
1179                return None;
1180            }
1181            let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1182            pos += 2;
1183            let mut parts = Vec::with_capacity(num_parts);
1184            for _ in 0..num_parts {
1185                if pos + 2 > data.len() {
1186                    return None;
1187                }
1188                let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1189                pos += 2;
1190                if pos + part_len > data.len() {
1191                    return None;
1192                }
1193                parts.push(data[pos..pos + part_len].to_vec());
1194                pos += part_len;
1195            }
1196            parts
1197        } else {
1198            Vec::new()
1199        };
1200
1201        let absence_bitmap = if has_absence {
1202            if pos + 4 > data.len() {
1203                return None;
1204            }
1205            let bm_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1206            pos += 4;
1207            if pos + bm_len > data.len() {
1208                return None;
1209            }
1210            let bm = data[pos..pos + bm_len].to_vec();
1211            pos += bm_len;
1212            bm
1213        } else {
1214            Vec::new()
1215        };
1216
1217        groups.push(NestedGroupInfo {
1218            original_col_index,
1219            sub_keys,
1220            nested_template,
1221            absence_bitmap,
1222        });
1223    }
1224
1225    Some((groups, pos))
1226}
1227
1228/// Forward transform: NDJSON columnar reorg.
1229///
1230/// Tries Strategy 1 (uniform) first, then Strategy 2 (grouped) if schemas differ.
1231/// Returns None if data is not suitable for columnar transform.
1232pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
1233    if data.is_empty() {
1234        return None;
1235    }
1236
1237    let has_trailing_newline = data.last() == Some(&b'\n');
1238    let lines = split_lines(data);
1239    let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
1240
1241    if non_empty.len() < 2 {
1242        return None;
1243    }
1244
1245    // Strategy 1: try uniform schema first.
1246    if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
1247        if col_data.len() + metadata.len() < data.len() {
1248            // Try depth-1 nested decomposition on the columnar output.
1249            // Even if the flattened data is slightly larger raw, the downstream
1250            // typed encoding + compression benefits are significant: null bitmaps
1251            // are compact and type-homogeneous columns compress much better.
1252            let num_rows = non_empty.len();
1253            if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
1254                // Verify roundtrip: unflatten must produce the exact original columnar
1255                // data. Nested objects with varying sub-key sets or key ordering can
1256                // cause the compact reconstruction to reorder keys, breaking byte-exact
1257                // roundtrip. Only apply if the unflatten is provably lossless.
1258                let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
1259                let unflattened =
1260                    unflatten_nested_columns(&flat_data, &nested_groups, num_rows, total_flat_cols);
1261                if unflattened == col_data {
1262                    // Append nested info to metadata.
1263                    let nested_bytes = serialize_nested_info(&nested_groups);
1264                    metadata.extend_from_slice(&nested_bytes);
1265                    return Some(TransformResult {
1266                        data: flat_data,
1267                        metadata,
1268                    });
1269                }
1270                // else: roundtrip not exact — skip nested flatten.
1271            }
1272            // No nested objects found — append has_nested=0.
1273            metadata.push(0u8); // has_nested = 0
1274            return Some(TransformResult {
1275                data: col_data,
1276                metadata,
1277            });
1278        }
1279    }
1280
1281    // Strategy 2: group by schema.
1282    if let Some((grouped_data, grouped_metadata)) =
1283        preprocess_grouped(&non_empty, has_trailing_newline)
1284    {
1285        if grouped_data.len() + grouped_metadata.len() < data.len() {
1286            return Some(TransformResult {
1287                data: grouped_data,
1288                metadata: grouped_metadata,
1289            });
1290        }
1291    }
1292
1293    None
1294}
1295
1296/// Reverse transform: reconstruct NDJSON from columnar layout + metadata.
1297/// Dispatches to the appropriate decoder based on metadata version byte.
1298pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1299    if metadata.is_empty() {
1300        return data.to_vec();
1301    }
1302    match metadata[0] {
1303        METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1304        METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1305        _ => data.to_vec(),
1306    }
1307}
1308
1309/// Reverse Strategy 1: uniform schema.
1310fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1311    if metadata.len() < 10 {
1312        return data.to_vec();
1313    }
1314    let mut pos = 0;
1315    let _version = metadata[pos];
1316    pos += 1;
1317    let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1318    pos += 4;
1319    let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1320    pos += 2;
1321    let has_trailing_newline = metadata[pos] != 0;
1322    pos += 1;
1323    let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1324    pos += 2;
1325
1326    let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
1327    for _ in 0..num_parts {
1328        if pos + 2 > metadata.len() {
1329            return data.to_vec();
1330        }
1331        let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1332        pos += 2;
1333        if pos + part_len > metadata.len() {
1334            return data.to_vec();
1335        }
1336        parts.push(metadata[pos..pos + part_len].to_vec());
1337        pos += part_len;
1338    }
1339
1340    if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
1341        return data.to_vec();
1342    }
1343
1344    // Check for nested metadata after template parts.
1345    let remaining_metadata = &metadata[pos..];
1346    if !remaining_metadata.is_empty()
1347        && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2 || remaining_metadata[0] == 3)
1348    {
1349        // has_nested == 1, 2, or 3: unflatten before reconstructing rows.
1350        if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
1351            // Calculate total number of flat columns.
1352            let total_flat_cols = data.split(|&b| b == COL_SEP).count();
1353            let unflattened =
1354                unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
1355            return reverse_uniform_from_parts(
1356                &unflattened,
1357                &parts,
1358                num_rows,
1359                num_cols,
1360                has_trailing_newline,
1361            );
1362        }
1363    }
1364
1365    reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
1366}
1367
1368/// Core uniform reverse: given parsed parts, reconstruct lines from columnar data.
1369fn reverse_uniform_from_parts(
1370    data: &[u8],
1371    parts: &[Vec<u8>],
1372    num_rows: usize,
1373    num_cols: usize,
1374    has_trailing_newline: bool,
1375) -> Vec<u8> {
1376    let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
1377    if col_chunks.len() != num_cols {
1378        return data.to_vec();
1379    }
1380
1381    let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1382    for chunk in &col_chunks {
1383        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1384        if vals.len() != num_rows {
1385            return data.to_vec();
1386        }
1387        columns.push(vals);
1388    }
1389
1390    let mut output = Vec::with_capacity(data.len() * 2);
1391    #[allow(clippy::needless_range_loop)]
1392    for row in 0..num_rows {
1393        output.extend_from_slice(&parts[0]);
1394        output.extend_from_slice(columns[0][row]);
1395        for col in 1..num_cols {
1396            output.extend_from_slice(&parts[col]);
1397            output.extend_from_slice(columns[col][row]);
1398        }
1399        output.extend_from_slice(&parts[num_cols]);
1400
1401        if row < num_rows - 1 || has_trailing_newline {
1402            output.push(b'\n');
1403        }
1404    }
1405
1406    output
1407}
1408
1409/// Parse Strategy 1 metadata and return (parts, num_rows, num_cols, has_trailing_newline).
1410/// Used by reverse_grouped to decode per-group metadata.
1411fn parse_uniform_metadata(metadata: &[u8]) -> Option<(Vec<Vec<u8>>, usize, usize, bool)> {
1412    if metadata.len() < 10 {
1413        return None;
1414    }
1415    let mut pos = 1; // Skip version byte.
1416    let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1417    pos += 4;
1418    let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1419    pos += 2;
1420    let has_trailing_newline = metadata[pos] != 0;
1421    pos += 1;
1422    let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1423    pos += 2;
1424
1425    let mut parts = Vec::with_capacity(num_parts);
1426    for _ in 0..num_parts {
1427        if pos + 2 > metadata.len() {
1428            return None;
1429        }
1430        let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1431        pos += 2;
1432        if pos + part_len > metadata.len() {
1433            return None;
1434        }
1435        parts.push(metadata[pos..pos + part_len].to_vec());
1436        pos += part_len;
1437    }
1438
1439    if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
1440        return None;
1441    }
1442
1443    Some((parts, num_rows, num_cols, has_trailing_newline))
1444}
1445
/// Reverse Strategy 2: grouped schema.
///
/// Metadata layout (after the version byte, all integers little-endian):
///   [has_trailing_newline u8][total_rows u32][num_groups u16]
///   per group: [row_count u32][row_count × u32 original row indices]
///              [group_metadata_len u32][group metadata, Strategy 1 format]
///   then:      [residual_count u32][residual_count × u32 original indices]
///
/// Data layout: per group [len u32][columnar group bytes], then all
/// residual rows stored raw, joined by '\n'.
///
/// Two cursors advance in lockstep: `mpos` over metadata, `dpos` over
/// data. Any structural inconsistency returns `data` unchanged.
fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
    // 8 bytes = version + trailing flag + total_rows + num_groups.
    if metadata.len() < 8 {
        return data.to_vec();
    }

    let mut mpos = 1; // Skip version byte.
    let has_trailing_newline = metadata[mpos] != 0;
    mpos += 1;
    let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
    mpos += 4;
    let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
    mpos += 2;

    // Allocate output slots; each reconstructed row is placed back at its
    // original index so interleaved groups restore the original order.
    let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];

    // Data cursor.
    let mut dpos: usize = 0;

    for _ in 0..num_groups {
        // Read row indices for this group.
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let group_row_count =
            u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;

        let mut row_indices = Vec::with_capacity(group_row_count);
        for _ in 0..group_row_count {
            if mpos + 4 > metadata.len() {
                return data.to_vec();
            }
            let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
            mpos += 4;
            row_indices.push(idx);
        }

        // Read group metadata.
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;
        if mpos + gm_len > metadata.len() {
            return data.to_vec();
        }
        let group_metadata = &metadata[mpos..mpos + gm_len];
        mpos += gm_len;

        // Read group data from the data blob.
        if dpos + 4 > data.len() {
            return data.to_vec();
        }
        let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
        dpos += 4;
        if dpos + gd_len > data.len() {
            return data.to_vec();
        }
        let group_data = &data[dpos..dpos + gd_len];
        dpos += gd_len;

        // Decode this group using Strategy 1 reverse.
        // The per-group trailing flag is ignored: newline placement is
        // governed solely by the global has_trailing_newline below.
        let (parts, num_rows, num_cols, _trailing) = match parse_uniform_metadata(group_metadata) {
            Some(v) => v,
            None => return data.to_vec(),
        };

        if num_rows != group_row_count {
            return data.to_vec();
        }

        // Split columnar data into columns and values.
        let col_chunks: Vec<&[u8]> = group_data.split(|&b| b == COL_SEP).collect();
        if col_chunks.len() != num_cols {
            return data.to_vec();
        }

        let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
        for chunk in &col_chunks {
            let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
            if vals.len() != num_rows {
                return data.to_vec();
            }
            columns.push(vals);
        }

        // Reconstruct each line for this group:
        // part[0] v0 part[1] v1 … part[num_cols].
        for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
            let mut line = Vec::new();
            line.extend_from_slice(&parts[0]);
            line.extend_from_slice(columns[0][row_within_group]);
            for col in 1..num_cols {
                line.extend_from_slice(&parts[col]);
                line.extend_from_slice(columns[col][row_within_group]);
            }
            line.extend_from_slice(&parts[num_cols]);

            // Out-of-range indices are dropped here; the missing-slot
            // check during assembly then rejects the whole decode.
            if original_idx < total_rows {
                output_lines[original_idx] = Some(line);
            }
        }
    }

    // Read residual indices from metadata.
    if mpos + 4 > metadata.len() {
        return data.to_vec();
    }
    let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
    mpos += 4;

    let mut residual_indices = Vec::with_capacity(residual_count);
    for _ in 0..residual_count {
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;
        residual_indices.push(idx);
    }

    // Remaining data is residual lines, stored raw and '\n'-joined.
    let residual_data = &data[dpos..];
    if residual_count > 0 {
        let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
            vec![]
        } else {
            residual_data.split(|&b| b == b'\n').collect()
        };
        // There should be exactly residual_count lines.
        if residual_lines.len() != residual_count {
            return data.to_vec();
        }
        for (i, &idx) in residual_indices.iter().enumerate() {
            if idx < total_rows {
                output_lines[idx] = Some(residual_lines[i].to_vec());
            }
        }
    }

    // Assemble final output in original row order.
    let mut output = Vec::with_capacity(data.len() * 2);
    for (i, slot) in output_lines.iter().enumerate() {
        match slot {
            Some(line) => output.extend_from_slice(line),
            None => {
                // Should not happen — missing row. Return data as-is.
                return data.to_vec();
            }
        }
        if i < total_rows - 1 || has_trailing_newline {
            output.push(b'\n');
        }
    }

    output
}
1604
1605#[cfg(test)]
1606mod tests {
1607    use super::*;
1608
1609    #[test]
1610    fn extract_value_string() {
1611        let line = br#""hello","next""#;
1612        let (val, end) = extract_value(line, 0).unwrap();
1613        assert_eq!(val, b"\"hello\"");
1614        assert_eq!(end, 7);
1615    }
1616
1617    #[test]
1618    fn extract_value_number() {
1619        let line = b"42,next";
1620        let (val, end) = extract_value(line, 0).unwrap();
1621        assert_eq!(val, b"42");
1622        assert_eq!(end, 2);
1623    }
1624
1625    #[test]
1626    fn extract_value_bool() {
1627        let line = b"true,next";
1628        let (val, end) = extract_value(line, 0).unwrap();
1629        assert_eq!(val, b"true");
1630        assert_eq!(end, 4);
1631    }
1632
1633    #[test]
1634    fn extract_value_null() {
1635        let line = b"null,next";
1636        let (val, end) = extract_value(line, 0).unwrap();
1637        assert_eq!(val, b"null");
1638        assert_eq!(end, 4);
1639    }
1640
1641    #[test]
1642    fn extract_value_object() {
1643        let line = br#"{"a":1,"b":"x"},next"#;
1644        let (val, end) = extract_value(line, 0).unwrap();
1645        assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
1646        assert_eq!(end, 15);
1647    }
1648
1649    #[test]
1650    fn extract_value_array() {
1651        let line = b"[1,2,3],next";
1652        let (val, end) = extract_value(line, 0).unwrap();
1653        assert_eq!(val, b"[1,2,3]");
1654        assert_eq!(end, 7);
1655    }
1656
1657    #[test]
1658    fn extract_value_string_with_escapes() {
1659        let line = br#""he\"llo",next"#;
1660        let (val, end) = extract_value(line, 0).unwrap();
1661        assert_eq!(val, br#""he\"llo""#.to_vec());
1662        assert_eq!(end, 9);
1663    }
1664
1665    #[test]
1666    fn parse_line_simple() {
1667        let line = br#"{"a":1,"b":"x"}"#;
1668        let (parts, values) = parse_line(line).unwrap();
1669        assert_eq!(parts.len(), 3); // {"a": , ,"b": , }
1670        assert_eq!(values.len(), 2);
1671        assert_eq!(values[0], b"1");
1672        assert_eq!(values[1], b"\"x\"");
1673        assert_eq!(parts[0], br#"{"a":"#.to_vec());
1674        assert_eq!(parts[1], br#","b":"#.to_vec());
1675        assert_eq!(parts[2], b"}");
1676    }
1677
1678    #[test]
1679    fn roundtrip_simple() {
1680        let data = br#"{"a":1,"b":"x"}
1681{"a":2,"b":"y"}
1682{"a":3,"b":"z"}
1683"#;
1684        let result = preprocess(data).expect("should produce transform");
1685        let restored = reverse(&result.data, &result.metadata);
1686        assert_eq!(
1687            String::from_utf8_lossy(&restored),
1688            String::from_utf8_lossy(data),
1689        );
1690        assert_eq!(restored, data.to_vec());
1691    }
1692
1693    #[test]
1694    fn roundtrip_no_trailing_newline() {
1695        let data = br#"{"a":1,"b":"x"}
1696{"a":2,"b":"y"}
1697{"a":3,"b":"z"}"#;
1698        let result = preprocess(data).expect("should produce transform");
1699        let restored = reverse(&result.data, &result.metadata);
1700        assert_eq!(restored, data.to_vec());
1701    }
1702
1703    #[test]
1704    fn roundtrip_nested_values() {
1705        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
1706{"id":2,"meta":{"x":30,"y":40}}
1707{"id":3,"meta":{"x":50,"y":60}}
1708{"id":4,"meta":{"x":70,"y":80}}
1709{"id":5,"meta":{"x":90,"y":100}}
1710"#;
1711        let result = preprocess(data).expect("should produce transform");
1712        let restored = reverse(&result.data, &result.metadata);
1713        assert_eq!(restored, data.to_vec());
1714    }
1715
1716    #[test]
1717    fn roundtrip_mixed_types() {
1718        let data = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
1719{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
1720{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
1721{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
1722{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
1723"#;
1724        let result = preprocess(data).expect("should produce transform");
1725        let restored = reverse(&result.data, &result.metadata);
1726        assert_eq!(restored, data.to_vec());
1727    }
1728
1729    #[test]
1730    fn schema_mismatch_too_few_returns_none() {
1731        // Different keys on different lines, but each group has < MIN_GROUP_ROWS.
1732        let data = br#"{"a":1,"b":2}
1733{"a":1,"c":3}
1734"#;
1735        assert!(preprocess(data).is_none());
1736    }
1737
1738    #[test]
1739    fn different_num_keys_too_few_returns_none() {
1740        let data = br#"{"a":1,"b":2}
1741{"a":1}
1742"#;
1743        assert!(preprocess(data).is_none());
1744    }
1745
1746    #[test]
1747    fn single_line_returns_none() {
1748        let data = br#"{"a":1,"b":2}
1749"#;
1750        assert!(preprocess(data).is_none());
1751    }
1752
1753    #[test]
1754    fn empty_returns_none() {
1755        assert!(preprocess(b"").is_none());
1756    }
1757
1758    #[test]
1759    fn column_layout_groups_similar_values() {
1760        let data = br#"{"type":"page_view","user":"alice"}
1761{"type":"api_call","user":"alice"}
1762{"type":"click","user":"bob"}
1763"#;
1764        let result = preprocess(data).unwrap();
1765
1766        // The columnar data should have type values grouped, then user values grouped.
1767        let col_data = &result.data;
1768        let cols: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
1769        assert_eq!(cols.len(), 2);
1770
1771        // Column 0 = type values.
1772        let type_vals: Vec<&[u8]> = cols[0].split(|&b| b == VAL_SEP).collect();
1773        assert_eq!(type_vals.len(), 3);
1774        assert_eq!(type_vals[0], br#""page_view""#);
1775        assert_eq!(type_vals[1], br#""api_call""#);
1776        assert_eq!(type_vals[2], br#""click""#);
1777
1778        // Column 1 = user values.
1779        let user_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
1780        assert_eq!(user_vals.len(), 3);
1781        assert_eq!(user_vals[0], br#""alice""#);
1782        assert_eq!(user_vals[1], br#""alice""#);
1783        assert_eq!(user_vals[2], br#""bob""#);
1784    }
1785
1786    #[test]
1787    fn roundtrip_string_with_escaped_chars() {
1788        let data = br#"{"msg":"he said \"hi\"","val":1}
1789{"msg":"line\nbreak","val":2}
1790{"msg":"tab\there","val":3}
1791{"msg":"back\\slash","val":4}
1792{"msg":"normal text","val":5}
1793"#;
1794        let result = preprocess(data).expect("should produce transform");
1795        let restored = reverse(&result.data, &result.metadata);
1796        assert_eq!(restored, data.to_vec());
1797    }
1798
1799    #[test]
1800    fn roundtrip_negative_and_float_numbers() {
1801        let data = br#"{"x":-3.14,"y":0}
1802{"x":2.718,"y":-1}
1803{"x":0.001,"y":999}
1804{"x":-100,"y":-200}
1805{"x":42.0,"y":7}
1806"#;
1807        let result = preprocess(data).expect("should produce transform");
1808        let restored = reverse(&result.data, &result.metadata);
1809        assert_eq!(restored, data.to_vec());
1810    }
1811
1812    /// Test that the transform+reverse is lossless even for tiny inputs
1813    /// by repeating them to pass the size check threshold.
1814    #[test]
1815    fn reverse_roundtrip_small_data() {
1816        // Verify parse_line works on small lines.
1817        let (parts, vals) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
1818        assert_eq!(vals.len(), 2);
1819        assert_eq!(parts.len(), 3);
1820
1821        // 2-row data might fail the size check, so repeat to get enough rows.
1822        let big_data = br#"{"x":-3.14,"y":0}
1823{"x":2.718,"y":-1}
1824"#
1825        .repeat(20);
1826        let result = preprocess(&big_data).expect("should produce transform with 40 rows");
1827        let restored = reverse(&result.data, &result.metadata);
1828        assert_eq!(restored, big_data);
1829    }
1830
1831    // --- Strategy 2 (grouped) tests ---
1832
1833    #[test]
1834    fn grouped_roundtrip_two_schemas() {
1835        // Two different schemas, each with >= MIN_GROUP_ROWS rows.
1836        let mut data = Vec::new();
1837        for i in 0..10 {
1838            data.extend_from_slice(
1839                format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
1840            );
1841            data.push(b'\n');
1842        }
1843        for i in 10..20 {
1844            data.extend_from_slice(
1845                format!(
1846                    r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
1847                    i, i, i
1848                )
1849                .as_bytes(),
1850            );
1851            data.push(b'\n');
1852        }
1853        let result = preprocess(&data).expect("should produce grouped transform");
1854        // Should be version 2 (grouped).
1855        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1856        let restored = reverse(&result.data, &result.metadata);
1857        assert_eq!(restored, data);
1858    }
1859
1860    #[test]
1861    fn grouped_roundtrip_interleaved_schemas() {
1862        // Interleaved schemas: alternating between two different key sets.
1863        let mut data = Vec::new();
1864        for i in 0..20 {
1865            if i % 2 == 0 {
1866                data.extend_from_slice(
1867                    format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
1868                );
1869            } else {
1870                data.extend_from_slice(
1871                    format!(
1872                        r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
1873                        i, i, i
1874                    )
1875                    .as_bytes(),
1876                );
1877            }
1878            data.push(b'\n');
1879        }
1880        let result = preprocess(&data).expect("should produce grouped transform");
1881        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1882        let restored = reverse(&result.data, &result.metadata);
1883        assert_eq!(restored, data);
1884    }
1885
1886    #[test]
1887    fn grouped_roundtrip_with_residuals() {
1888        // Two large groups + a few unique-schema rows (residuals).
1889        let mut data = Vec::new();
1890        // Group A: 8 rows.
1891        for i in 0..8 {
1892            data.extend_from_slice(format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
1893            data.push(b'\n');
1894        }
1895        // 2 unique rows (will be residual).
1896        data.extend_from_slice(br#"{"x":1,"y":2,"z":3}"#);
1897        data.push(b'\n');
1898        data.extend_from_slice(br#"{"p":"q"}"#);
1899        data.push(b'\n');
1900        // Group B: 6 rows.
1901        for i in 0..6 {
1902            data.extend_from_slice(format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes());
1903            data.push(b'\n');
1904        }
1905        let result = preprocess(&data).expect("should produce grouped transform");
1906        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1907        let restored = reverse(&result.data, &result.metadata);
1908        assert_eq!(
1909            String::from_utf8_lossy(&restored),
1910            String::from_utf8_lossy(&data),
1911        );
1912        assert_eq!(restored, data);
1913    }
1914
1915    #[test]
1916    fn grouped_roundtrip_no_trailing_newline() {
1917        let mut data = Vec::new();
1918        for i in 0..6 {
1919            data.extend_from_slice(format!(r#"{{"id":{},"type":"push"}}"#, i).as_bytes());
1920            data.push(b'\n');
1921        }
1922        for i in 0..6 {
1923            data.extend_from_slice(
1924                format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i).as_bytes(),
1925            );
1926            if i < 5 {
1927                data.push(b'\n');
1928            }
1929            // Last line: no trailing newline.
1930        }
1931        let result = preprocess(&data).expect("should produce grouped transform");
1932        let restored = reverse(&result.data, &result.metadata);
1933        assert_eq!(restored, data);
1934    }
1935
1936    #[test]
1937    fn uniform_still_preferred_over_grouped() {
1938        // All rows same schema — should use Strategy 1 (version 1), not Strategy 2.
1939        let data = br#"{"a":1,"b":"x"}
1940{"a":2,"b":"y"}
1941{"a":3,"b":"z"}
1942{"a":4,"b":"w"}
1943{"a":5,"b":"v"}
1944"#;
1945        let result = preprocess(data).expect("should produce transform");
1946        assert_eq!(
1947            result.metadata[0], METADATA_VERSION_UNIFORM,
1948            "uniform schema should use Strategy 1"
1949        );
1950        let restored = reverse(&result.data, &result.metadata);
1951        assert_eq!(restored, data.to_vec());
1952    }
1953
1954    #[test]
1955    fn grouped_gharchive_simulation() {
1956        // Simulates GitHub Archive: most rows have 7 keys, some have 8.
1957        let mut data = Vec::new();
1958        for i in 0..50 {
1959            if i % 5 == 0 {
1960                // 8-key rows (with org).
1961                data.extend_from_slice(
1962                    format!(
1963                        r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
1964                        i, i, i, i
1965                    )
1966                    .as_bytes(),
1967                );
1968            } else {
1969                // 7-key rows (no org).
1970                data.extend_from_slice(
1971                    format!(
1972                        r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
1973                        i, i, i
1974                    )
1975                    .as_bytes(),
1976                );
1977            }
1978            data.push(b'\n');
1979        }
1980        let result = preprocess(&data).expect("should produce grouped transform");
1981        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1982        let restored = reverse(&result.data, &result.metadata);
1983        assert_eq!(restored, data);
1984    }
1985
1986    // --- Nested decomposition tests ---
1987
1988    #[test]
1989    fn test_nested_decomposition_basic() {
1990        // Simple nested object decomposed correctly.
1991        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
1992{"id":2,"meta":{"x":30,"y":40}}
1993{"id":3,"meta":{"x":50,"y":60}}
1994"#;
1995        let result = preprocess(data).expect("should produce transform");
1996        assert_eq!(result.metadata[0], METADATA_VERSION_UNIFORM);
1997
1998        // The columnar data should have expanded columns.
1999        let cols: Vec<&[u8]> = result.data.split(|&b| b == COL_SEP).collect();
2000        // Original: 2 cols (id, meta). After flattening: 3 cols (id, meta.x, meta.y).
2001        assert_eq!(
2002            cols.len(),
2003            3,
2004            "should have 3 columns after flattening: got {}",
2005            cols.len()
2006        );
2007
2008        // Verify sub-columns contain the extracted values.
2009        let meta_x_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
2010        assert_eq!(meta_x_vals, vec![b"10".as_slice(), b"30", b"50"]);
2011
2012        let meta_y_vals: Vec<&[u8]> = cols[2].split(|&b| b == VAL_SEP).collect();
2013        assert_eq!(meta_y_vals, vec![b"20".as_slice(), b"40", b"60"]);
2014    }
2015
2016    #[test]
2017    fn test_nested_roundtrip() {
2018        // Flatten -> unflatten produces byte-exact original.
2019        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2020{"id":2,"meta":{"x":30,"y":40}}
2021{"id":3,"meta":{"x":50,"y":60}}
2022"#;
2023        let result = preprocess(data).expect("should produce transform");
2024        let restored = reverse(&result.data, &result.metadata);
2025        assert_eq!(
2026            String::from_utf8_lossy(&restored),
2027            String::from_utf8_lossy(data),
2028        );
2029        assert_eq!(restored, data.to_vec());
2030    }
2031
2032    #[test]
2033    fn test_nested_mixed_schemas() {
2034        // Different nested objects per row (some keys missing -> null).
2035        let data = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
2036{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
2037{"ts":"c","meta":{"query":"pricing","results_count":25}}
2038{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
2039{"ts":"e","meta":{"query":"api docs","results_count":41}}
2040"#;
2041        let result = preprocess(data).expect("should produce transform");
2042        let restored = reverse(&result.data, &result.metadata);
2043        assert_eq!(
2044            String::from_utf8_lossy(&restored),
2045            String::from_utf8_lossy(data),
2046        );
2047        assert_eq!(restored, data.to_vec());
2048    }
2049
2050    #[test]
2051    fn test_nested_no_nested_objects() {
2052        // Returns None when no nested objects — flat data should still work.
2053        let data = br#"{"a":1,"b":"x"}
2054{"a":2,"b":"y"}
2055{"a":3,"b":"z"}
2056"#;
2057        let result = preprocess(data).expect("should produce transform");
2058        let restored = reverse(&result.data, &result.metadata);
2059        assert_eq!(restored, data.to_vec());
2060
2061        // Verify the metadata has has_nested=0 since no nested objects.
2062        // The nested flag is appended after template parts.
2063        // For uniform, metadata starts with version(1) + num_rows(4) + num_cols(2) +
2064        // trailing_newline(1) + num_parts(2) + parts.
2065        // After those parts, there should be a 0 byte (has_nested=0).
2066        let meta = &result.metadata;
2067        let last_byte = meta[meta.len() - 1];
2068        assert_eq!(last_byte, 0, "should have has_nested=0 for flat data");
2069    }
2070
2071    #[test]
2072    fn test_nested_real_corpus() {
2073        // Test with data shaped like the test-ndjson.ndjson corpus.
2074        let data = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
2075{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
2076{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
2077{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
2078{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
2079"#;
2080        let result = preprocess(data).expect("should produce transform");
2081        let restored = reverse(&result.data, &result.metadata);
2082        assert_eq!(
2083            String::from_utf8_lossy(&restored),
2084            String::from_utf8_lossy(data),
2085        );
2086        assert_eq!(restored, data.to_vec());
2087    }
2088
2089    #[test]
2090    fn test_nested_roundtrip_with_null_values() {
2091        // Some rows have null for the nested field.
2092        let data = br#"{"id":1,"meta":{"x":10}}
2093{"id":2,"meta":null}
2094{"id":3,"meta":{"x":30}}
2095{"id":4,"meta":null}
2096{"id":5,"meta":{"x":50}}
2097"#;
2098        let result = preprocess(data).expect("should produce transform");
2099        let restored = reverse(&result.data, &result.metadata);
2100        assert_eq!(restored, data.to_vec());
2101    }
2102
2103    #[test]
2104    fn test_nested_string_values_preserved_exact() {
2105        // Verify that string values in nested objects preserve exact bytes (with quotes).
2106        let data = br#"{"id":1,"meta":{"name":"Alice","score":100}}
2107{"id":2,"meta":{"name":"Bob","score":200}}
2108{"id":3,"meta":{"name":"Charlie","score":300}}
2109"#;
2110        let result = preprocess(data).expect("should produce transform");
2111        let restored = reverse(&result.data, &result.metadata);
2112        assert_eq!(restored, data.to_vec());
2113    }
2114
2115    #[test]
2116    fn test_parse_nested_object_kv() {
2117        let obj = br#"{"query":"benchmark","results_count":14}"#;
2118        let pairs = parse_nested_object_kv(obj).unwrap();
2119        assert_eq!(pairs.len(), 2);
2120        assert_eq!(pairs[0].0, b"query");
2121        assert_eq!(pairs[0].1, br#""benchmark""#.to_vec());
2122        assert_eq!(pairs[1].0, b"results_count");
2123        assert_eq!(pairs[1].1, b"14");
2124    }
2125
2126    #[test]
2127    fn test_nested_varying_subkeys_roundtrip() {
2128        // Regression: rows with varying sub-keys in nested objects must
2129        // round-trip byte-exact. Even rows have `extra`, odd rows don't.
2130        let mut lines = Vec::new();
2131        for i in 0..50 {
2132            let line = if i % 2 == 0 {
2133                format!("{{\"id\":{},\"meta\":{{\"x\":{},\"extra\":{}}}}}", i, i, i)
2134            } else {
2135                format!("{{\"id\":{},\"meta\":{{\"x\":{}}}}}", i, i)
2136            };
2137            lines.push(line);
2138        }
2139        let ndjson = lines.join("\n") + "\n";
2140        let data = ndjson.as_bytes();
2141
2142        let result = preprocess(data).expect("should produce transform");
2143        let restored = reverse(&result.data, &result.metadata);
2144        assert_eq!(
2145            std::str::from_utf8(&restored).unwrap(),
2146            std::str::from_utf8(data).unwrap(),
2147            "varying sub-keys roundtrip must be byte-exact"
2148        );
2149    }
2150
2151    #[test]
2152    fn test_nested_explicit_null_preserved() {
2153        // Explicit null values in nested objects must survive roundtrip.
2154        // `{"x":1,"y":null}` must NOT be collapsed to `{"x":1}`.
2155        let data = b"{\"id\":1,\"meta\":{\"x\":1,\"y\":null}}\n\
2156                     {\"id\":2,\"meta\":{\"x\":2,\"y\":null}}\n\
2157                     {\"id\":3,\"meta\":{\"x\":3,\"y\":null}}\n";
2158        let result = preprocess(data).expect("should produce transform");
2159        let restored = reverse(&result.data, &result.metadata);
2160        assert_eq!(
2161            std::str::from_utf8(&restored).unwrap(),
2162            std::str::from_utf8(data).unwrap(),
2163            "explicit null values must be preserved"
2164        );
2165    }
2166}