Skip to main content

datacortex_core/format/
ndjson.rs

1//! NDJSON columnar reorg — lossless transform that reorders row-oriented
2//! NDJSON data into column-oriented layout.
3//!
4//! Two strategies:
5//!
6//! **Strategy 1 (uniform):** All rows share the same schema (keys in same order).
7//!   Row-oriented (before):
8//!     {"ts":"2026-03-15T10:30:00.001Z","type":"page_view","user":"usr_a1b2c3d4"}
9//!     {"ts":"2026-03-15T10:30:00.234Z","type":"api_call","user":"usr_a1b2c3d4"}
10//!   Column-oriented (after):
11//!     [ts values] "2026-03-15T10:30:00.001Z" \x01 "2026-03-15T10:30:00.234Z" \x00
12//!     [type values] "page_view" \x01 "api_call" \x00
13//!     [user values] "usr_a1b2c3d4" \x01 "usr_a1b2c3d4"
14//!
15//! **Strategy 2 (grouped):** Rows have diverse schemas (e.g., GitHub Archive events).
16//!   Groups rows by schema, applies Strategy 1 per group, stores residual rows raw.
17//!   Metadata version byte distinguishes: 1 = uniform, 2 = grouped.
18//!
19//! Separators:
20//!   \x00 = column separator (cannot appear in valid JSON text)
21//!   \x01 = value separator within a column (cannot appear in valid JSON)
22
23use super::transform::TransformResult;
24use std::collections::HashMap;
25
/// Byte written between consecutive columns of the columnar stream.
/// A raw 0x00 cannot appear in valid JSON text, so it is unambiguous there.
const COL_SEP: u8 = 0x00;
/// Byte written between consecutive values inside one column.
/// A raw 0x01 cannot appear in valid JSON text, so it is unambiguous there.
const VAL_SEP: u8 = 0x01;
/// Metadata version byte for the uniform (single-schema) layout.
const METADATA_VERSION_UNIFORM: u8 = 1;
/// Metadata version byte for the grouped (multi-schema) layout.
const METADATA_VERSION_GROUPED: u8 = 2;

/// Minimum rows in a schema group for it to be columnarized (not residual).
const MIN_GROUP_ROWS: usize = 5;

/// A schema group: template parts + list of (row_index, parsed_values).
type SchemaGroup = (Vec<Vec<u8>>, Vec<(usize, Vec<Vec<u8>>)>);
36
/// Extract the raw bytes of one JSON value from `line`.
///
/// `pos` is the index at which scanning begins (normally the byte right after
/// the `:`); ASCII whitespace at that position is skipped before the value
/// starts. On success returns `(value_bytes, end)` where `end` is the index
/// of the first byte after the value.
///
/// Recognised shapes:
/// * strings — up to the closing unescaped `"`;
/// * objects / arrays — up to the matching `}` / `]`, ignoring delimiters
///   that occur inside string literals;
/// * anything else (numbers, booleans, `null`) — up to the next `,`, `}`,
///   `]` or ASCII whitespace.
///
/// Returns `None` when nothing but whitespace remains, when a string, object
/// or array is unterminated, or when the scalar would be empty.
fn extract_value(line: &[u8], mut pos: usize) -> Option<(Vec<u8>, usize)> {
    /// Index of the unescaped `"` that closes a string whose body starts at `at`.
    fn closing_quote(bytes: &[u8], mut at: usize) -> Option<usize> {
        let mut escaped = false;
        while let Some(&byte) = bytes.get(at) {
            match byte {
                _ if escaped => escaped = false,
                b'\\' => escaped = true,
                b'"' => return Some(at),
                _ => {}
            }
            at += 1;
        }
        None
    }

    /// Index just past the `close` byte that balances the `open` byte at `open_at`.
    /// Only `open`/`close` are counted; string literals are skipped wholesale.
    fn balanced_end(bytes: &[u8], open_at: usize, open: u8, close: u8) -> Option<usize> {
        let mut depth = 1usize;
        let mut at = open_at + 1;
        while depth > 0 {
            match *bytes.get(at)? {
                b'"' => at = closing_quote(bytes, at + 1)?,
                byte if byte == open => depth += 1,
                byte if byte == close => depth -= 1,
                _ => {}
            }
            at += 1;
        }
        Some(at)
    }

    // Leading whitespace is not part of the value.
    while matches!(line.get(pos), Some(byte) if byte.is_ascii_whitespace()) {
        pos += 1;
    }
    let start = pos;

    let end = match *line.get(start)? {
        b'"' => closing_quote(line, start + 1)? + 1,
        b'{' => balanced_end(line, start, b'{', b'}')?,
        b'[' => balanced_end(line, start, b'[', b']')?,
        _ => {
            // Bare scalar: runs until a structural delimiter or whitespace.
            let length = line[start..]
                .iter()
                .take_while(|&&byte| {
                    !matches!(byte, b',' | b'}' | b']') && !byte.is_ascii_whitespace()
                })
                .count();
            if length == 0 {
                return None;
            }
            start + length
        }
    };

    Some((line[start..end].to_vec(), end))
}
150
151/// Parse a single JSON line into (template_parts, values).
152///
153/// Template parts are the structural bytes between values:
154///   Part 0 = everything from { up to and including the : before value 0
155///   Part 1 = everything from after value 0 up to and including : before value 1
156///   ...
157///   Part N = everything from after the last value to end of line (including } and \n)
158///
159/// Returns None if the line is not a flat-ish JSON object (we handle nested values
160/// as opaque blobs, but the top-level structure must be key:value pairs).
161type ParsedLine = (Vec<Vec<u8>>, Vec<Vec<u8>>);
162
163fn parse_line(line: &[u8]) -> Option<ParsedLine> {
164    let mut pos = 0;
165
166    // Skip leading whitespace.
167    while pos < line.len() && line[pos].is_ascii_whitespace() {
168        pos += 1;
169    }
170    if pos >= line.len() || line[pos] != b'{' {
171        return None;
172    }
173
174    let mut parts: Vec<Vec<u8>> = Vec::new();
175    let mut values: Vec<Vec<u8>> = Vec::new();
176    let mut part_start = 0;
177
178    pos += 1; // Skip opening {.
179
180    loop {
181        // Skip whitespace.
182        while pos < line.len() && line[pos].is_ascii_whitespace() {
183            pos += 1;
184        }
185        if pos >= line.len() {
186            return None;
187        }
188
189        // Check for closing brace (end of object).
190        if line[pos] == b'}' {
191            // Capture the final part: everything from part_start to end of line.
192            parts.push(line[part_start..].to_vec());
193            break;
194        }
195
196        // Expect a key string.
197        if line[pos] != b'"' {
198            return None;
199        }
200        // Skip over the key string.
201        pos += 1;
202        let mut escaped = false;
203        while pos < line.len() {
204            if escaped {
205                escaped = false;
206            } else if line[pos] == b'\\' {
207                escaped = true;
208            } else if line[pos] == b'"' {
209                pos += 1;
210                break;
211            }
212            pos += 1;
213        }
214
215        // Skip whitespace, expect colon.
216        while pos < line.len() && line[pos].is_ascii_whitespace() {
217            pos += 1;
218        }
219        if pos >= line.len() || line[pos] != b':' {
220            return None;
221        }
222        pos += 1; // Skip colon.
223
224        // Everything from part_start up to here is a "template part".
225        parts.push(line[part_start..pos].to_vec());
226
227        // Extract the value.
228        let (value, value_end) = extract_value(line, pos)?;
229        values.push(value);
230        pos = value_end;
231
232        // Mark the start of the next part.
233        part_start = pos;
234
235        // Skip whitespace.
236        while pos < line.len() && line[pos].is_ascii_whitespace() {
237            pos += 1;
238        }
239        if pos >= line.len() {
240            return None;
241        }
242
243        // Expect comma or closing brace.
244        if line[pos] == b',' {
245            pos += 1;
246        } else if line[pos] == b'}' {
247            // Will be caught at the top of the loop next iteration — but we
248            // need to NOT advance pos, so the } check above catches it.
249            // Actually, let's just handle it here.
250            parts.push(line[part_start..].to_vec());
251            break;
252        } else {
253            return None; // Unexpected character.
254        }
255    }
256
257    if values.is_empty() {
258        return None;
259    }
260
261    Some((parts, values))
262}
263
/// Split NDJSON data on `\n` into borrowed line slices (newline bytes excluded).
///
/// A final segment that follows the last newline is returned only when it is
/// non-empty, so `"a\nb\n"` and `"a\nb"` both produce two lines. Empty lines
/// between newlines are kept as empty slices. Any `\r` is left in place.
fn split_lines(data: &[u8]) -> Vec<&[u8]> {
    let mut lines: Vec<&[u8]> = data.split(|&byte| byte == b'\n').collect();
    // `split` always yields the segment after the last newline, even when it is
    // empty (including for empty input); that segment is not a line.
    if matches!(lines.last(), Some(tail) if tail.is_empty()) {
        lines.pop();
    }
    lines
}
279
280/// Build columnar data from parsed lines that share the same template.
281/// Returns (col_data, metadata) for a uniform group.
282fn build_uniform_columnar(
283    template_parts: &[Vec<u8>],
284    columns: &[Vec<Vec<u8>>],
285    num_rows: usize,
286    has_trailing_newline: bool,
287) -> (Vec<u8>, Vec<u8>) {
288    let num_cols = columns.len();
289
290    // Build column data: values separated by \x01, columns separated by \x00.
291    let mut col_data = Vec::new();
292    for (ci, col) in columns.iter().enumerate() {
293        for (ri, val) in col.iter().enumerate() {
294            col_data.extend_from_slice(val);
295            if ri < num_rows - 1 {
296                col_data.push(VAL_SEP);
297            }
298        }
299        if ci < num_cols - 1 {
300            col_data.push(COL_SEP);
301        }
302    }
303
304    // Build metadata: version + num_rows + num_cols + trailing_newline + template parts.
305    let mut metadata = Vec::new();
306    metadata.push(METADATA_VERSION_UNIFORM);
307    metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
308    metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
309    metadata.push(if has_trailing_newline { 1 } else { 0 });
310    metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
311    for part in template_parts {
312        metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
313        metadata.extend_from_slice(part);
314    }
315
316    (col_data, metadata)
317}
318
319/// Strategy 1: Uniform schema — all rows must have the same template.
320/// Returns None if schemas differ.
321fn preprocess_uniform(
322    non_empty: &[&[u8]],
323    has_trailing_newline: bool,
324) -> Option<(Vec<u8>, Vec<u8>)> {
325    if non_empty.len() < 2 {
326        return None;
327    }
328
329    let (template_parts, first_values) = parse_line(non_empty[0])?;
330    let num_cols = first_values.len();
331    if template_parts.len() != num_cols + 1 {
332        return None;
333    }
334
335    let mut columns: Vec<Vec<Vec<u8>>> = Vec::with_capacity(num_cols);
336    for v in &first_values {
337        columns.push(vec![v.clone()]);
338    }
339
340    for &line in &non_empty[1..] {
341        let (parts, values) = parse_line(line)?;
342        if values.len() != num_cols || parts.len() != template_parts.len() {
343            return None;
344        }
345        for (a, b) in parts.iter().zip(template_parts.iter()) {
346            if a != b {
347                return None;
348            }
349        }
350        for (col, val) in values.iter().enumerate() {
351            columns[col].push(val.clone());
352        }
353    }
354
355    Some(build_uniform_columnar(
356        &template_parts,
357        &columns,
358        non_empty.len(),
359        has_trailing_newline,
360    ))
361}
362
363/// Strategy 2: Group-by-schema — group rows by template, columnarize each group.
364///
365/// Metadata format (version=2):
366///   version: u8 = 2
367///   has_trailing_newline: u8
368///   total_rows: u32 LE
369///   num_groups: u16 LE
370///   for each group:
371///     num_rows: u32 LE
372///     row_indices: [u32 LE * num_rows]
373///     group_metadata_len: u32 LE
374///     group_metadata: [bytes]  (Strategy 1 metadata for this group)
375///   residual_count: u32 LE
376///   residual_indices: [u32 LE * residual_count]
377///
378/// Data format:
379///   for each group:
380///     data_len: u32 LE
381///     data: [bytes]  (columnar data for this group)
382///   residual_data: [bytes]  (raw lines joined by \n)
383fn preprocess_grouped(
384    non_empty: &[&[u8]],
385    has_trailing_newline: bool,
386) -> Option<(Vec<u8>, Vec<u8>)> {
387    if non_empty.len() < MIN_GROUP_ROWS {
388        return None;
389    }
390
391    // Parse all lines and group by template (the template parts identify the schema).
392    // We use the template parts as the group key.
393    let mut parsed: Vec<Option<ParsedLine>> = Vec::with_capacity(non_empty.len());
394    for &line in non_empty {
395        parsed.push(parse_line(line));
396    }
397
398    // Group rows by template. Key = template parts (as bytes for hashing).
399    // We store groups as: template_key -> (template_parts, vec of (row_index, values)).
400    let mut group_map: HashMap<Vec<u8>, SchemaGroup> = HashMap::new();
401    let mut residual_indices: Vec<usize> = Vec::new();
402
403    for (idx, parsed_line) in parsed.into_iter().enumerate() {
404        if let Some((parts, values)) = parsed_line {
405            // Build a hashable key from the template parts.
406            let mut key = Vec::new();
407            for part in &parts {
408                key.extend_from_slice(&(part.len() as u32).to_le_bytes());
409                key.extend_from_slice(part);
410            }
411            group_map
412                .entry(key)
413                .or_insert_with(|| (parts, Vec::new()))
414                .1
415                .push((idx, values));
416        } else {
417            // Unparseable line goes to residual.
418            residual_indices.push(idx);
419        }
420    }
421
422    // Separate groups into qualifying (>= MIN_GROUP_ROWS) and residual.
423    let mut groups: Vec<SchemaGroup> = Vec::new();
424    for (_key, (template_parts, rows)) in group_map {
425        if rows.len() >= MIN_GROUP_ROWS {
426            groups.push((template_parts, rows));
427        } else {
428            // Too few rows — send to residual.
429            for (idx, _) in &rows {
430                residual_indices.push(*idx);
431            }
432        }
433    }
434
435    // Need at least 1 qualifying group for this to be useful.
436    if groups.is_empty() {
437        return None;
438    }
439
440    // Sort groups by their first row index for deterministic output.
441    groups.sort_by_key(|(_, rows)| rows[0].0);
442    residual_indices.sort_unstable();
443
444    // Build per-group columnar data and metadata.
445    struct GroupOutput {
446        row_indices: Vec<u32>,
447        col_data: Vec<u8>,
448        group_metadata: Vec<u8>,
449    }
450
451    let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
452
453    for (template_parts, rows) in &groups {
454        let num_cols = template_parts.len() - 1;
455        let mut columns: Vec<Vec<Vec<u8>>> = (0..num_cols).map(|_| Vec::new()).collect();
456        let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
457
458        for (idx, values) in rows {
459            row_indices.push(*idx as u32);
460            for (col, val) in values.iter().enumerate() {
461                columns[col].push(val.clone());
462            }
463        }
464
465        // Build columnar data for this group (trailing_newline=false for sub-groups).
466        let (col_data, group_metadata) =
467            build_uniform_columnar(template_parts, &columns, rows.len(), false);
468
469        group_outputs.push(GroupOutput {
470            row_indices,
471            col_data,
472            group_metadata,
473        });
474    }
475
476    // Build the combined data blob.
477    let mut data_out = Vec::new();
478    for group in &group_outputs {
479        data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
480        data_out.extend_from_slice(&group.col_data);
481    }
482
483    // Append residual lines (raw, separated by \n).
484    let residual_start = data_out.len();
485    for (i, &idx) in residual_indices.iter().enumerate() {
486        data_out.extend_from_slice(non_empty[idx]);
487        if i < residual_indices.len() - 1 {
488            data_out.push(b'\n');
489        }
490    }
491    let _residual_len = data_out.len() - residual_start;
492
493    // Build the combined metadata.
494    let mut metadata = Vec::new();
495    metadata.push(METADATA_VERSION_GROUPED);
496    metadata.push(if has_trailing_newline { 1 } else { 0 });
497    metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
498    metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
499
500    for group in &group_outputs {
501        metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
502        for &idx in &group.row_indices {
503            metadata.extend_from_slice(&idx.to_le_bytes());
504        }
505        metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
506        metadata.extend_from_slice(&group.group_metadata);
507    }
508
509    metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
510    for &idx in &residual_indices {
511        metadata.extend_from_slice(&(idx as u32).to_le_bytes());
512    }
513
514    Some((data_out, metadata))
515}
516
/// Metadata describing which columns were flattened from nested objects.
///
/// One instance is produced per original column that was expanded into
/// sub-columns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct NestedGroupInfo {
    /// Index of the original column that was expanded.
    pub(crate) original_col_index: u16,
    /// Sub-key names for the expanded sub-columns (raw key bytes, without the
    /// surrounding quotes), in the order the sub-columns were emitted.
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Template parts for reconstructing nested objects (preserves original formatting).
    /// If empty, compact format `{"key":val,...}` is used (NDJSON compatibility).
    pub(crate) nested_template: Vec<Vec<u8>>,
    /// Absence bitmap: one bit per (sub_key, row). Bit=1 means the key was ABSENT
    /// in the original object (not present at all), vs bit=0 means key was present
    /// (possibly with explicit `null`). Packed LSB-first.
    /// The bit for sub-key `k` and row `r` has index `k * num_rows + r`.
    /// Length = ceil(num_sub_keys * num_rows / 8).
    /// Empty if all rows had all keys (no absences).
    pub(crate) absence_bitmap: Vec<u8>,
}
533
534/// Attempt to flatten nested JSON objects in columnar data (depth-1).
535///
536/// Takes columnar data (\x00/\x01 separated) and returns expanded columnar data
537/// with nested objects decomposed into sub-columns.
538/// Returns None if no nested objects found.
539pub(crate) fn flatten_nested_columns(
540    col_data: &[u8],
541    num_rows: usize,
542) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
543    // Split into columns.
544    let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
545    if columns.is_empty() || num_rows == 0 {
546        return None;
547    }
548
549    let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
550    // Build the output columns: for non-nested cols, keep as-is.
551    // For nested cols, replace with sub-columns.
552    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
553
554    for (col_idx, &col_chunk) in columns.iter().enumerate() {
555        let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
556        if values.len() != num_rows {
557            return None;
558        }
559
560        // Check if ALL non-null values start with '{' (nested object).
561        let mut all_objects = true;
562        let mut has_non_null = false;
563        for val in &values {
564            if *val == b"null" {
565                continue;
566            }
567            has_non_null = true;
568            if !val.starts_with(b"{") {
569                all_objects = false;
570                break;
571            }
572        }
573
574        if !all_objects || !has_non_null {
575            // Not a nested-object column — keep as-is.
576            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
577            output_columns.push(col_values);
578            continue;
579        }
580
581        // This column contains nested objects — decompose depth-1.
582        // Parse all values and collect all unique sub-keys (preserving discovery order).
583        // Also capture the template from the first non-null object to preserve formatting.
584        let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
585        let mut nested_template: Vec<Vec<u8>> = Vec::new();
586        type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
587        let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);
588
589        for val in &values {
590            if *val == b"null" {
591                parsed_rows.push(None);
592                continue;
593            }
594            if nested_template.is_empty() {
595                // First non-null: use template-preserving parser.
596                match parse_nested_object_with_template(val) {
597                    Some((template, kv_pairs)) => {
598                        for (key, _) in &kv_pairs {
599                            if !all_sub_keys.iter().any(|k| k == key) {
600                                all_sub_keys.push(key.clone());
601                            }
602                        }
603                        nested_template = template;
604                        parsed_rows.push(Some(kv_pairs));
605                    }
606                    None => {
607                        all_sub_keys.clear();
608                        break;
609                    }
610                }
611            } else {
612                // Subsequent rows: use simpler kv parser (template already captured).
613                match parse_nested_object_kv(val) {
614                    Some(kv_pairs) => {
615                        for (key, _) in &kv_pairs {
616                            if !all_sub_keys.iter().any(|k| k == key) {
617                                all_sub_keys.push(key.clone());
618                            }
619                        }
620                        parsed_rows.push(Some(kv_pairs));
621                    }
622                    None => {
623                        all_sub_keys.clear();
624                        break;
625                    }
626                }
627            }
628        }
629
630        if all_sub_keys.is_empty() {
631            // Could not parse — keep column as-is.
632            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
633            output_columns.push(col_values);
634            continue;
635        }
636
637        // Build sub-columns: for each sub-key, extract values from parsed rows.
638        // Also build an absence bitmap: bit=1 where a key was absent from the
639        // original row (as opposed to being present with explicit `null`).
640        let num_sub_keys = all_sub_keys.len();
641        let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
642        let total_bits = num_sub_keys * num_rows;
643        let bitmap_bytes = total_bits.div_ceil(8);
644        let mut absence_bitmap = vec![0u8; bitmap_bytes];
645        let mut has_any_absent = false;
646
647        for (row_idx, parsed) in parsed_rows.iter().enumerate() {
648            match parsed {
649                Some(kv_pairs) => {
650                    for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
651                        let found = kv_pairs.iter().find(|(k, _)| k == sk);
652                        match found {
653                            Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
654                            None => {
655                                sub_columns[sk_idx].push(b"null".to_vec());
656                                // Mark this (sub_key, row) as absent.
657                                let bit_idx = sk_idx * num_rows + row_idx;
658                                absence_bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
659                                has_any_absent = true;
660                            }
661                        }
662                    }
663                }
664                None => {
665                    // null row — all sub-columns get null.
666                    // These are NOT marked as absent (the whole column was null,
667                    // not individual keys missing).
668                    for sc in sub_columns.iter_mut() {
669                        sc.push(b"null".to_vec());
670                    }
671                }
672            }
673        }
674
675        nested_groups.push(NestedGroupInfo {
676            original_col_index: col_idx as u16,
677            sub_keys: all_sub_keys,
678            nested_template,
679            absence_bitmap: if has_any_absent {
680                absence_bitmap
681            } else {
682                Vec::new()
683            },
684        });
685
686        for sc in sub_columns {
687            output_columns.push(sc);
688        }
689    }
690
691    if nested_groups.is_empty() {
692        return None;
693    }
694
695    // Build the flattened columnar data.
696    let num_out_cols = output_columns.len();
697    let mut out = Vec::new();
698    for (ci, col) in output_columns.iter().enumerate() {
699        for (ri, val) in col.iter().enumerate() {
700            out.extend_from_slice(val);
701            if ri < num_rows - 1 {
702                out.push(VAL_SEP);
703            }
704        }
705        if ci < num_out_cols - 1 {
706            out.push(COL_SEP);
707        }
708    }
709
710    Some((out, nested_groups))
711}
712
713/// Parse a nested JSON object into (template_parts, kv_pairs).
714/// Template parts include all structural bytes (braces, keys, colons, whitespace) —
715/// preserving the original formatting so the object can be reconstructed exactly.
716/// Keys are returned WITHOUT quotes. Values are the exact bytes from the JSON.
717#[allow(clippy::type_complexity)]
718pub(crate) fn parse_nested_object_with_template(
719    obj: &[u8],
720) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
721    let mut pos = 0;
722
723    // Skip whitespace.
724    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
725        pos += 1;
726    }
727    if pos >= obj.len() || obj[pos] != b'{' {
728        return None;
729    }
730    pos += 1;
731
732    let mut parts: Vec<Vec<u8>> = Vec::new();
733    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
734    let mut part_start = 0;
735
736    loop {
737        // Skip whitespace.
738        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
739            pos += 1;
740        }
741        if pos >= obj.len() {
742            return None;
743        }
744        if obj[pos] == b'}' {
745            parts.push(obj[part_start..].to_vec());
746            break;
747        }
748
749        // Expect a key string.
750        if obj[pos] != b'"' {
751            return None;
752        }
753        let key_str_start = pos + 1;
754        pos += 1;
755        let mut escaped = false;
756        while pos < obj.len() {
757            if escaped {
758                escaped = false;
759            } else if obj[pos] == b'\\' {
760                escaped = true;
761            } else if obj[pos] == b'"' {
762                break;
763            }
764            pos += 1;
765        }
766        if pos >= obj.len() {
767            return None;
768        }
769        let key = obj[key_str_start..pos].to_vec();
770        pos += 1; // skip closing quote
771
772        // Skip whitespace, expect colon.
773        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
774            pos += 1;
775        }
776        if pos >= obj.len() || obj[pos] != b':' {
777            return None;
778        }
779        pos += 1;
780
781        // Skip whitespace between colon and value — include in template.
782        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
783            pos += 1;
784        }
785
786        // Template part: everything from part_start to here (includes key, colon, post-colon ws).
787        parts.push(obj[part_start..pos].to_vec());
788
789        // Extract the value (no whitespace skipping — already consumed above).
790        let value_start = pos;
791        // Use extract_value but we've already consumed whitespace.
792        let (value, value_end) = extract_value(obj, value_start)?;
793        pos = value_end;
794        pairs.push((key, value));
795
796        part_start = pos;
797
798        // Skip whitespace.
799        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
800            pos += 1;
801        }
802        if pos >= obj.len() {
803            return None;
804        }
805        if obj[pos] == b',' {
806            pos += 1;
807        } else if obj[pos] == b'}' {
808            parts.push(obj[part_start..].to_vec());
809            break;
810        } else {
811            return None;
812        }
813    }
814
815    if pairs.is_empty() {
816        return None;
817    }
818    Some((parts, pairs))
819}
820
821/// Parse a nested JSON object into its key-value pairs (depth-1 only).
822/// Returns the exact bytes for each key and value.
823/// Keys are returned WITHOUT quotes. Values are the exact bytes from the JSON.
824pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
825    let mut pos = 0;
826
827    // Skip whitespace.
828    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
829        pos += 1;
830    }
831    if pos >= obj.len() || obj[pos] != b'{' {
832        return None;
833    }
834    pos += 1;
835
836    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
837
838    loop {
839        // Skip whitespace.
840        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
841            pos += 1;
842        }
843        if pos >= obj.len() {
844            return None;
845        }
846        if obj[pos] == b'}' {
847            break;
848        }
849
850        // Expect a key string.
851        if obj[pos] != b'"' {
852            return None;
853        }
854        pos += 1;
855        let key_start = pos;
856        let mut escaped = false;
857        while pos < obj.len() {
858            if escaped {
859                escaped = false;
860            } else if obj[pos] == b'\\' {
861                escaped = true;
862            } else if obj[pos] == b'"' {
863                break;
864            }
865            pos += 1;
866        }
867        if pos >= obj.len() {
868            return None;
869        }
870        let key = obj[key_start..pos].to_vec();
871        pos += 1; // skip closing quote
872
873        // Skip whitespace, expect colon.
874        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
875            pos += 1;
876        }
877        if pos >= obj.len() || obj[pos] != b':' {
878            return None;
879        }
880        pos += 1;
881
882        // Extract the value.
883        let (value, value_end) = extract_value(obj, pos)?;
884        pos = value_end;
885        pairs.push((key, value));
886
887        // Skip whitespace.
888        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
889            pos += 1;
890        }
891        if pos >= obj.len() {
892            return None;
893        }
894        if obj[pos] == b',' {
895            pos += 1;
896        } else if obj[pos] == b'}' {
897            break;
898        } else {
899            return None;
900        }
901    }
902
903    if pairs.is_empty() {
904        return None;
905    }
906    Some(pairs)
907}
908
909/// Unflatten nested sub-columns back into JSON objects.
910///
911/// Takes flattened columnar data and nested group info, merges sub-columns
912/// back into the original nested object columns.
913pub(crate) fn unflatten_nested_columns(
914    flat_data: &[u8],
915    nested_groups: &[NestedGroupInfo],
916    num_rows: usize,
917    total_flat_cols: usize,
918) -> Vec<u8> {
919    let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
920    if flat_columns.len() != total_flat_cols {
921        return flat_data.to_vec();
922    }
923
924    // Parse all flat column values.
925    let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
926    for chunk in &flat_columns {
927        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
928        if vals.len() != num_rows {
929            return flat_data.to_vec();
930        }
931        flat_col_values.push(vals);
932    }
933
934    // Reconstruct original columns from flat columns.
935    // Walk through flat columns, merging sub-columns back where needed.
936    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
937
938    // Build a set of original_col_index -> group for quick lookup.
939    // We need to know which flat columns map to which nested group.
940    // The flat columns are in order: non-nested cols keep their position,
941    // nested cols are replaced by their sub-columns at that position.
942    //
943    // To figure out which flat_idx corresponds to what, we replay the
944    // forward mapping.
945    // We need to know the ORIGINAL number of columns.
946    let original_num_cols = total_flat_cols
947        - nested_groups
948            .iter()
949            .map(|g| g.sub_keys.len())
950            .sum::<usize>()
951        + nested_groups.len();
952
953    // Build mapping: for each original col, is it nested or not?
954    let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
955    for (gi, group) in nested_groups.iter().enumerate() {
956        if (group.original_col_index as usize) < original_num_cols {
957            original_col_map[group.original_col_index as usize] = Some(gi);
958        }
959    }
960
961    let mut flat_idx = 0;
962    for entry in original_col_map.iter().take(original_num_cols) {
963        if let Some(gi) = entry {
964            let group = &nested_groups[*gi];
965            let num_sub = group.sub_keys.len();
966
967            // Helper: check if sub-key `si` at `row` is absent using bitmap.
968            let is_absent = |si: usize, row: usize| -> bool {
969                if group.absence_bitmap.is_empty() {
970                    return false; // no absences in this group
971                }
972                let bit_idx = si * num_rows + row;
973                let byte_idx = bit_idx / 8;
974                if byte_idx >= group.absence_bitmap.len() {
975                    return false;
976                }
977                (group.absence_bitmap[byte_idx] >> (bit_idx % 8)) & 1 == 1
978            };
979
980            // Merge sub-columns back into nested objects.
981            let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
982            for row in 0..num_rows {
983                // Check if all sub-columns are null or absent for this row
984                // (meaning the whole nested column was null).
985                let all_null = (0..num_sub).all(|si| {
986                    flat_idx + si < flat_col_values.len()
987                        && flat_col_values[flat_idx + si][row] == b"null"
988                });
989                if all_null && !group.absence_bitmap.is_empty() {
990                    // If all values are null but some are "absent" and some are
991                    // "explicit null", we need to reconstruct, not collapse.
992                    let any_present_null = (0..num_sub).any(|si| {
993                        flat_col_values[flat_idx + si][row] == b"null" && !is_absent(si, row)
994                    });
995                    if any_present_null {
996                        // At least one key has an explicit null — don't collapse.
997                        // Fall through to reconstruction below.
998                    } else {
999                        // All nulls are from absent keys — whole column was null.
1000                        merged_col.push(b"null".to_vec());
1001                        continue;
1002                    }
1003                } else if all_null {
1004                    merged_col.push(b"null".to_vec());
1005                    continue;
1006                }
1007
1008                // Check whether any sub-key is absent in this row.
1009                let has_absent = (0..num_sub).any(|si| is_absent(si, row));
1010
1011                if !has_absent
1012                    && !group.nested_template.is_empty()
1013                    && group.nested_template.len() == num_sub + 1
1014                {
1015                    // Template-based reconstruction: all keys present,
1016                    // preserves original formatting exactly.
1017                    let mut obj = Vec::new();
1018                    obj.extend_from_slice(&group.nested_template[0]);
1019                    if flat_idx < flat_col_values.len() {
1020                        obj.extend_from_slice(flat_col_values[flat_idx][row]);
1021                    }
1022                    for si in 1..num_sub {
1023                        obj.extend_from_slice(&group.nested_template[si]);
1024                        if flat_idx + si < flat_col_values.len() {
1025                            obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
1026                        }
1027                    }
1028                    obj.extend_from_slice(&group.nested_template[num_sub]);
1029                    merged_col.push(obj);
1030                } else {
1031                    // Compact reconstruction: some keys absent, or no template.
1032                    // Skip sub-keys that were absent in the original.
1033                    let mut obj = Vec::new();
1034                    obj.push(b'{');
1035                    let mut first = true;
1036                    for si in 0..num_sub {
1037                        if flat_idx + si >= flat_col_values.len() {
1038                            break;
1039                        }
1040                        if is_absent(si, row) {
1041                            continue; // key was absent — omit entirely
1042                        }
1043                        let val = flat_col_values[flat_idx + si][row];
1044                        if !first {
1045                            obj.push(b',');
1046                        }
1047                        first = false;
1048                        obj.push(b'"');
1049                        obj.extend_from_slice(&group.sub_keys[si]);
1050                        obj.push(b'"');
1051                        obj.push(b':');
1052                        obj.extend_from_slice(val);
1053                    }
1054                    obj.push(b'}');
1055                    merged_col.push(obj);
1056                }
1057            }
1058            output_columns.push(merged_col);
1059            flat_idx += num_sub;
1060        } else {
1061            // Non-nested column — copy as-is.
1062            if flat_idx < flat_col_values.len() {
1063                let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
1064                    .iter()
1065                    .map(|v| v.to_vec())
1066                    .collect();
1067                output_columns.push(col);
1068            }
1069            flat_idx += 1;
1070        }
1071    }
1072
1073    // Rebuild columnar data.
1074    let num_out_cols = output_columns.len();
1075    let mut out = Vec::new();
1076    for (ci, col) in output_columns.iter().enumerate() {
1077        for (ri, val) in col.iter().enumerate() {
1078            out.extend_from_slice(val);
1079            if ri < num_rows - 1 {
1080                out.push(VAL_SEP);
1081            }
1082        }
1083        if ci < num_out_cols - 1 {
1084            out.push(COL_SEP);
1085        }
1086    }
1087
1088    out
1089}
1090
1091/// Serialize nested group info into bytes for storage in metadata.
1092/// Version 1 (has_nested=1): sub_keys only (backward compat, NDJSON path).
1093/// Version 2 (has_nested=2): sub_keys + nested_template (preserves formatting).
1094/// Version 3 (has_nested=3): sub_keys + nested_template + absence_bitmap.
1095pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1096    let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1097    let has_absence = groups.iter().any(|g| !g.absence_bitmap.is_empty());
1098    let mut out = Vec::new();
1099    let version = if has_absence {
1100        3u8
1101    } else if has_template {
1102        2u8
1103    } else {
1104        1u8
1105    };
1106    out.push(version);
1107    out.push(groups.len() as u8);
1108    for group in groups {
1109        out.extend_from_slice(&group.original_col_index.to_le_bytes());
1110        out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1111        for key in &group.sub_keys {
1112            out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1113            out.extend_from_slice(key);
1114        }
1115        if has_template || version == 3 {
1116            out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1117            for part in &group.nested_template {
1118                out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1119                out.extend_from_slice(part);
1120            }
1121        }
1122        if version == 3 {
1123            let bm_len = group.absence_bitmap.len() as u32;
1124            out.extend_from_slice(&bm_len.to_le_bytes());
1125            out.extend_from_slice(&group.absence_bitmap);
1126        }
1127    }
1128    out
1129}
1130
1131/// Deserialize nested group info from metadata bytes.
1132/// Returns (nested_groups, bytes_consumed).
1133/// Handles version 1 (no template), version 2 (with template),
1134/// and version 3 (template + absence bitmap).
1135pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1136    if data.is_empty() {
1137        return None;
1138    }
1139    let mut pos = 0;
1140    let version = data[pos];
1141    pos += 1;
1142    if version != 1 && version != 2 && version != 3 {
1143        return None;
1144    }
1145    let has_template = version == 2 || version == 3;
1146    let has_absence = version == 3;
1147    if pos >= data.len() {
1148        return None;
1149    }
1150    let num_groups = data[pos] as usize;
1151    pos += 1;
1152
1153    let mut groups = Vec::with_capacity(num_groups);
1154    for _ in 0..num_groups {
1155        if pos + 4 > data.len() {
1156            return None;
1157        }
1158        let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1159        pos += 2;
1160        let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1161        pos += 2;
1162
1163        let mut sub_keys = Vec::with_capacity(num_sub_cols);
1164        for _ in 0..num_sub_cols {
1165            if pos + 2 > data.len() {
1166                return None;
1167            }
1168            let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1169            pos += 2;
1170            if pos + key_len > data.len() {
1171                return None;
1172            }
1173            sub_keys.push(data[pos..pos + key_len].to_vec());
1174            pos += key_len;
1175        }
1176
1177        let nested_template = if has_template {
1178            if pos + 2 > data.len() {
1179                return None;
1180            }
1181            let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1182            pos += 2;
1183            let mut parts = Vec::with_capacity(num_parts);
1184            for _ in 0..num_parts {
1185                if pos + 2 > data.len() {
1186                    return None;
1187                }
1188                let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1189                pos += 2;
1190                if pos + part_len > data.len() {
1191                    return None;
1192                }
1193                parts.push(data[pos..pos + part_len].to_vec());
1194                pos += part_len;
1195            }
1196            parts
1197        } else {
1198            Vec::new()
1199        };
1200
1201        let absence_bitmap = if has_absence {
1202            if pos + 4 > data.len() {
1203                return None;
1204            }
1205            let bm_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1206            pos += 4;
1207            if pos + bm_len > data.len() {
1208                return None;
1209            }
1210            let bm = data[pos..pos + bm_len].to_vec();
1211            pos += bm_len;
1212            bm
1213        } else {
1214            Vec::new()
1215        };
1216
1217        groups.push(NestedGroupInfo {
1218            original_col_index,
1219            sub_keys,
1220            nested_template,
1221            absence_bitmap,
1222        });
1223    }
1224
1225    Some((groups, pos))
1226}
1227
1228/// Forward transform: NDJSON columnar reorg.
1229///
1230/// Tries Strategy 1 (uniform) first, then Strategy 2 (grouped) if schemas differ.
1231/// Returns None if data is not suitable for columnar transform.
1232pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
1233    if data.is_empty() {
1234        return None;
1235    }
1236
1237    let has_trailing_newline = data.last() == Some(&b'\n');
1238    let lines = split_lines(data);
1239    let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
1240
1241    if non_empty.len() < 2 {
1242        return None;
1243    }
1244
1245    // Strategy 1: try uniform schema first.
1246    if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
1247        if col_data.len() + metadata.len() < data.len() {
1248            // Try depth-1 nested decomposition on the columnar output.
1249            // Even if the flattened data is slightly larger raw, the downstream
1250            // typed encoding + compression benefits are significant: null bitmaps
1251            // are compact and type-homogeneous columns compress much better.
1252            let num_rows = non_empty.len();
1253            if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
1254                // Append nested info to metadata.
1255                let nested_bytes = serialize_nested_info(&nested_groups);
1256                metadata.extend_from_slice(&nested_bytes);
1257                return Some(TransformResult {
1258                    data: flat_data,
1259                    metadata,
1260                });
1261            }
1262            // No nested objects found — append has_nested=0.
1263            metadata.push(0u8); // has_nested = 0
1264            return Some(TransformResult {
1265                data: col_data,
1266                metadata,
1267            });
1268        }
1269    }
1270
1271    // Strategy 2: group by schema.
1272    if let Some((grouped_data, grouped_metadata)) =
1273        preprocess_grouped(&non_empty, has_trailing_newline)
1274    {
1275        if grouped_data.len() + grouped_metadata.len() < data.len() {
1276            return Some(TransformResult {
1277                data: grouped_data,
1278                metadata: grouped_metadata,
1279            });
1280        }
1281    }
1282
1283    None
1284}
1285
1286/// Reverse transform: reconstruct NDJSON from columnar layout + metadata.
1287/// Dispatches to the appropriate decoder based on metadata version byte.
1288pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1289    if metadata.is_empty() {
1290        return data.to_vec();
1291    }
1292    match metadata[0] {
1293        METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1294        METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1295        _ => data.to_vec(),
1296    }
1297}
1298
1299/// Reverse Strategy 1: uniform schema.
1300fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1301    if metadata.len() < 10 {
1302        return data.to_vec();
1303    }
1304    let mut pos = 0;
1305    let _version = metadata[pos];
1306    pos += 1;
1307    let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1308    pos += 4;
1309    let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1310    pos += 2;
1311    let has_trailing_newline = metadata[pos] != 0;
1312    pos += 1;
1313    let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1314    pos += 2;
1315
1316    let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
1317    for _ in 0..num_parts {
1318        if pos + 2 > metadata.len() {
1319            return data.to_vec();
1320        }
1321        let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1322        pos += 2;
1323        if pos + part_len > metadata.len() {
1324            return data.to_vec();
1325        }
1326        parts.push(metadata[pos..pos + part_len].to_vec());
1327        pos += part_len;
1328    }
1329
1330    if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
1331        return data.to_vec();
1332    }
1333
1334    // Check for nested metadata after template parts.
1335    let remaining_metadata = &metadata[pos..];
1336    if !remaining_metadata.is_empty()
1337        && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2 || remaining_metadata[0] == 3)
1338    {
1339        // has_nested == 1, 2, or 3: unflatten before reconstructing rows.
1340        if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
1341            // Calculate total number of flat columns.
1342            let total_flat_cols = data.split(|&b| b == COL_SEP).count();
1343            let unflattened =
1344                unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
1345            return reverse_uniform_from_parts(
1346                &unflattened,
1347                &parts,
1348                num_rows,
1349                num_cols,
1350                has_trailing_newline,
1351            );
1352        }
1353    }
1354
1355    reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
1356}
1357
1358/// Core uniform reverse: given parsed parts, reconstruct lines from columnar data.
1359fn reverse_uniform_from_parts(
1360    data: &[u8],
1361    parts: &[Vec<u8>],
1362    num_rows: usize,
1363    num_cols: usize,
1364    has_trailing_newline: bool,
1365) -> Vec<u8> {
1366    let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
1367    if col_chunks.len() != num_cols {
1368        return data.to_vec();
1369    }
1370
1371    let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1372    for chunk in &col_chunks {
1373        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1374        if vals.len() != num_rows {
1375            return data.to_vec();
1376        }
1377        columns.push(vals);
1378    }
1379
1380    let mut output = Vec::with_capacity(data.len() * 2);
1381    #[allow(clippy::needless_range_loop)]
1382    for row in 0..num_rows {
1383        output.extend_from_slice(&parts[0]);
1384        output.extend_from_slice(columns[0][row]);
1385        for col in 1..num_cols {
1386            output.extend_from_slice(&parts[col]);
1387            output.extend_from_slice(columns[col][row]);
1388        }
1389        output.extend_from_slice(&parts[num_cols]);
1390
1391        if row < num_rows - 1 || has_trailing_newline {
1392            output.push(b'\n');
1393        }
1394    }
1395
1396    output
1397}
1398
/// Decode a Strategy 1 metadata header into
/// `(parts, num_rows, num_cols, has_trailing_newline)`.
///
/// Layout (little-endian): `[version u8][num_rows u32][num_cols u16]
/// [has_trailing_newline u8][num_parts u16]`, then `num_parts` template
/// parts encoded as `[len u16][bytes]`. The version byte is skipped without
/// being inspected, and bytes after the last part are ignored.
///
/// Returns `None` if the buffer is shorter than the 10-byte header, a part
/// is truncated, the part count is not `num_cols + 1`, or either `num_rows`
/// or `num_cols` is zero. Used by `reverse_grouped` for per-group metadata.
fn parse_uniform_metadata(metadata: &[u8]) -> Option<(Vec<Vec<u8>>, usize, usize, bool)> {
    if metadata.len() < 10 {
        return None;
    }
    let le16 = |at: usize| usize::from(u16::from_le_bytes([metadata[at], metadata[at + 1]]));

    // Fixed header fields sit at known offsets; byte 0 is the version.
    let num_rows =
        u32::from_le_bytes([metadata[1], metadata[2], metadata[3], metadata[4]]) as usize;
    let num_cols = le16(5);
    let has_trailing_newline = metadata[7] != 0;
    let declared_parts = le16(8);

    let mut cursor = 10;
    let mut parts = Vec::with_capacity(declared_parts);
    for _ in 0..declared_parts {
        if cursor + 2 > metadata.len() {
            return None;
        }
        let part_len = le16(cursor);
        cursor += 2;
        parts.push(metadata.get(cursor..cursor + part_len)?.to_vec());
        cursor += part_len;
    }

    // A row template needs one part before each column plus a closing part.
    let consistent = parts.len() == num_cols + 1 && num_rows != 0 && num_cols != 0;
    if consistent {
        Some((parts, num_rows, num_cols, has_trailing_newline))
    } else {
        None
    }
}
1435
1436/// Reverse Strategy 2: grouped schema.
1437fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1438    if metadata.len() < 8 {
1439        return data.to_vec();
1440    }
1441
1442    let mut mpos = 1; // Skip version byte.
1443    let has_trailing_newline = metadata[mpos] != 0;
1444    mpos += 1;
1445    let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1446    mpos += 4;
1447    let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
1448    mpos += 2;
1449
1450    // Allocate output slots.
1451    let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];
1452
1453    // Data cursor.
1454    let mut dpos: usize = 0;
1455
1456    for _ in 0..num_groups {
1457        // Read row indices for this group.
1458        if mpos + 4 > metadata.len() {
1459            return data.to_vec();
1460        }
1461        let group_row_count =
1462            u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1463        mpos += 4;
1464
1465        let mut row_indices = Vec::with_capacity(group_row_count);
1466        for _ in 0..group_row_count {
1467            if mpos + 4 > metadata.len() {
1468                return data.to_vec();
1469            }
1470            let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1471            mpos += 4;
1472            row_indices.push(idx);
1473        }
1474
1475        // Read group metadata.
1476        if mpos + 4 > metadata.len() {
1477            return data.to_vec();
1478        }
1479        let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1480        mpos += 4;
1481        if mpos + gm_len > metadata.len() {
1482            return data.to_vec();
1483        }
1484        let group_metadata = &metadata[mpos..mpos + gm_len];
1485        mpos += gm_len;
1486
1487        // Read group data from the data blob.
1488        if dpos + 4 > data.len() {
1489            return data.to_vec();
1490        }
1491        let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
1492        dpos += 4;
1493        if dpos + gd_len > data.len() {
1494            return data.to_vec();
1495        }
1496        let group_data = &data[dpos..dpos + gd_len];
1497        dpos += gd_len;
1498
1499        // Decode this group using Strategy 1 reverse.
1500        let (parts, num_rows, num_cols, _trailing) = match parse_uniform_metadata(group_metadata) {
1501            Some(v) => v,
1502            None => return data.to_vec(),
1503        };
1504
1505        if num_rows != group_row_count {
1506            return data.to_vec();
1507        }
1508
1509        // Split columnar data into columns and values.
1510        let col_chunks: Vec<&[u8]> = group_data.split(|&b| b == COL_SEP).collect();
1511        if col_chunks.len() != num_cols {
1512            return data.to_vec();
1513        }
1514
1515        let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1516        for chunk in &col_chunks {
1517            let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1518            if vals.len() != num_rows {
1519                return data.to_vec();
1520            }
1521            columns.push(vals);
1522        }
1523
1524        // Reconstruct each line for this group.
1525        for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1526            let mut line = Vec::new();
1527            line.extend_from_slice(&parts[0]);
1528            line.extend_from_slice(columns[0][row_within_group]);
1529            for col in 1..num_cols {
1530                line.extend_from_slice(&parts[col]);
1531                line.extend_from_slice(columns[col][row_within_group]);
1532            }
1533            line.extend_from_slice(&parts[num_cols]);
1534
1535            if original_idx < total_rows {
1536                output_lines[original_idx] = Some(line);
1537            }
1538        }
1539    }
1540
1541    // Read residual indices from metadata.
1542    if mpos + 4 > metadata.len() {
1543        return data.to_vec();
1544    }
1545    let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1546    mpos += 4;
1547
1548    let mut residual_indices = Vec::with_capacity(residual_count);
1549    for _ in 0..residual_count {
1550        if mpos + 4 > metadata.len() {
1551            return data.to_vec();
1552        }
1553        let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1554        mpos += 4;
1555        residual_indices.push(idx);
1556    }
1557
1558    // Remaining data is residual lines.
1559    let residual_data = &data[dpos..];
1560    if residual_count > 0 {
1561        let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
1562            vec![]
1563        } else {
1564            residual_data.split(|&b| b == b'\n').collect()
1565        };
1566        // There should be exactly residual_count lines.
1567        if residual_lines.len() != residual_count {
1568            return data.to_vec();
1569        }
1570        for (i, &idx) in residual_indices.iter().enumerate() {
1571            if idx < total_rows {
1572                output_lines[idx] = Some(residual_lines[i].to_vec());
1573            }
1574        }
1575    }
1576
1577    // Assemble final output.
1578    let mut output = Vec::with_capacity(data.len() * 2);
1579    for (i, slot) in output_lines.iter().enumerate() {
1580        match slot {
1581            Some(line) => output.extend_from_slice(line),
1582            None => {
1583                // Should not happen — missing row. Return data as-is.
1584                return data.to_vec();
1585            }
1586        }
1587        if i < total_rows - 1 || has_trailing_newline {
1588            output.push(b'\n');
1589        }
1590    }
1591
1592    output
1593}
1594
1595#[cfg(test)]
1596mod tests {
1597    use super::*;
1598
1599    #[test]
1600    fn extract_value_string() {
1601        let line = br#""hello","next""#;
1602        let (val, end) = extract_value(line, 0).unwrap();
1603        assert_eq!(val, b"\"hello\"");
1604        assert_eq!(end, 7);
1605    }
1606
1607    #[test]
1608    fn extract_value_number() {
1609        let line = b"42,next";
1610        let (val, end) = extract_value(line, 0).unwrap();
1611        assert_eq!(val, b"42");
1612        assert_eq!(end, 2);
1613    }
1614
1615    #[test]
1616    fn extract_value_bool() {
1617        let line = b"true,next";
1618        let (val, end) = extract_value(line, 0).unwrap();
1619        assert_eq!(val, b"true");
1620        assert_eq!(end, 4);
1621    }
1622
1623    #[test]
1624    fn extract_value_null() {
1625        let line = b"null,next";
1626        let (val, end) = extract_value(line, 0).unwrap();
1627        assert_eq!(val, b"null");
1628        assert_eq!(end, 4);
1629    }
1630
1631    #[test]
1632    fn extract_value_object() {
1633        let line = br#"{"a":1,"b":"x"},next"#;
1634        let (val, end) = extract_value(line, 0).unwrap();
1635        assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
1636        assert_eq!(end, 15);
1637    }
1638
1639    #[test]
1640    fn extract_value_array() {
1641        let line = b"[1,2,3],next";
1642        let (val, end) = extract_value(line, 0).unwrap();
1643        assert_eq!(val, b"[1,2,3]");
1644        assert_eq!(end, 7);
1645    }
1646
1647    #[test]
1648    fn extract_value_string_with_escapes() {
1649        let line = br#""he\"llo",next"#;
1650        let (val, end) = extract_value(line, 0).unwrap();
1651        assert_eq!(val, br#""he\"llo""#.to_vec());
1652        assert_eq!(end, 9);
1653    }
1654
1655    #[test]
1656    fn parse_line_simple() {
1657        let line = br#"{"a":1,"b":"x"}"#;
1658        let (parts, values) = parse_line(line).unwrap();
1659        assert_eq!(parts.len(), 3); // {"a": , ,"b": , }
1660        assert_eq!(values.len(), 2);
1661        assert_eq!(values[0], b"1");
1662        assert_eq!(values[1], b"\"x\"");
1663        assert_eq!(parts[0], br#"{"a":"#.to_vec());
1664        assert_eq!(parts[1], br#","b":"#.to_vec());
1665        assert_eq!(parts[2], b"}");
1666    }
1667
1668    #[test]
1669    fn roundtrip_simple() {
1670        let data = br#"{"a":1,"b":"x"}
1671{"a":2,"b":"y"}
1672{"a":3,"b":"z"}
1673"#;
1674        let result = preprocess(data).expect("should produce transform");
1675        let restored = reverse(&result.data, &result.metadata);
1676        assert_eq!(
1677            String::from_utf8_lossy(&restored),
1678            String::from_utf8_lossy(data),
1679        );
1680        assert_eq!(restored, data.to_vec());
1681    }
1682
1683    #[test]
1684    fn roundtrip_no_trailing_newline() {
1685        let data = br#"{"a":1,"b":"x"}
1686{"a":2,"b":"y"}
1687{"a":3,"b":"z"}"#;
1688        let result = preprocess(data).expect("should produce transform");
1689        let restored = reverse(&result.data, &result.metadata);
1690        assert_eq!(restored, data.to_vec());
1691    }
1692
/// Rows whose second field is a nested object round-trip byte-exact.
#[test]
fn roundtrip_nested_values() {
    let input = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
{"id":4,"meta":{"x":70,"y":80}}
{"id":5,"meta":{"x":90,"y":100}}
"#;
    let transformed = preprocess(input).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    let expected = input.to_vec();
    assert_eq!(roundtripped, expected);
}
1705
/// Rows mixing string, number, boolean, null and array values round-trip
/// byte-exact.
#[test]
fn roundtrip_mixed_types() {
    let input = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
"#;
    let transformed = preprocess(input).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    let expected = input.to_vec();
    assert_eq!(roundtripped, expected);
}
1718
/// Two rows with different key sets: each schema group is smaller than
/// MIN_GROUP_ROWS, so no transform is produced.
#[test]
fn schema_mismatch_too_few_returns_none() {
    let input = br#"{"a":1,"b":2}
{"a":1,"c":3}
"#;
    let outcome = preprocess(input);
    assert!(outcome.is_none());
}
1727
/// Two rows with a different number of keys yield no transform.
#[test]
fn different_num_keys_too_few_returns_none() {
    let input = br#"{"a":1,"b":2}
{"a":1}
"#;
    let outcome = preprocess(input);
    assert!(outcome.is_none());
}
1735
/// A single row yields no transform.
#[test]
fn single_line_returns_none() {
    let input = br#"{"a":1,"b":2}
"#;
    let outcome = preprocess(input);
    assert!(outcome.is_none());
}
1742
/// Empty input yields no transform.
#[test]
fn empty_returns_none() {
    let outcome = preprocess(b"");
    assert!(outcome.is_none());
}
1747
/// The transformed bytes are laid out column by column: all `type` values
/// first, then all `user` values, each in row order.
#[test]
fn column_layout_groups_similar_values() {
    /// Splits `bytes` on every occurrence of `sep`.
    fn split_on(bytes: &[u8], sep: u8) -> Vec<&[u8]> {
        bytes.split(|&b| b == sep).collect()
    }

    let input = br#"{"type":"page_view","user":"alice"}
{"type":"api_call","user":"alice"}
{"type":"click","user":"bob"}
"#;
    let transformed = preprocess(input).unwrap();

    // Two keys per row -> two columns separated by COL_SEP.
    let columns = split_on(&transformed.data, COL_SEP);
    assert_eq!(columns.len(), 2);

    // Column 0 holds the `type` values, one per row, split by VAL_SEP.
    let expected_types: [&[u8]; 3] = [br#""page_view""#, br#""api_call""#, br#""click""#];
    let type_vals = split_on(columns[0], VAL_SEP);
    assert_eq!(type_vals.len(), 3);
    for (got, want) in type_vals.iter().zip(expected_types.iter()) {
        assert_eq!(got, want);
    }

    // Column 1 holds the `user` values.
    let expected_users: [&[u8]; 3] = [br#""alice""#, br#""alice""#, br#""bob""#];
    let user_vals = split_on(columns[1], VAL_SEP);
    assert_eq!(user_vals.len(), 3);
    for (got, want) in user_vals.iter().zip(expected_users.iter()) {
        assert_eq!(got, want);
    }
}
1775
/// String values containing JSON escape sequences (`\"`, `\n`, `\t`, `\\`)
/// round-trip byte-exact.
#[test]
fn roundtrip_string_with_escaped_chars() {
    let input = br#"{"msg":"he said \"hi\"","val":1}
{"msg":"line\nbreak","val":2}
{"msg":"tab\there","val":3}
{"msg":"back\\slash","val":4}
{"msg":"normal text","val":5}
"#;
    let transformed = preprocess(input).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    let expected = input.to_vec();
    assert_eq!(roundtripped, expected);
}
1788
/// Negative, fractional and zero numeric values round-trip byte-exact.
#[test]
fn roundtrip_negative_and_float_numbers() {
    let input = br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
{"x":0.001,"y":999}
{"x":-100,"y":-200}
{"x":42.0,"y":7}
"#;
    let transformed = preprocess(input).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    let expected = input.to_vec();
    assert_eq!(roundtripped, expected);
}
1801
/// Losslessness check for short rows: a two-row fixture is repeated twenty
/// times so the input is large enough to pass the size check, then
/// transformed and reversed.
#[test]
fn reverse_roundtrip_small_data() {
    // Sanity check: a short line parses into 2 values and 3 template parts.
    let (template, values) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
    assert_eq!(values.len(), 2);
    assert_eq!(template.len(), 3);

    // Two rows alone might fail the size check; 20 repetitions give 40 rows.
    let two_rows: &[u8] = br#"{"x":-3.14,"y":0}
{"x":2.718,"y":-1}
"#;
    let repeated = two_rows.repeat(20);

    let transformed = preprocess(&repeated).expect("should produce transform with 40 rows");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(roundtripped, repeated);
}
1820
1821    // --- Strategy 2 (grouped) tests ---
1822
/// Ten rows of one schema followed by ten rows of another (each group has
/// at least MIN_GROUP_ROWS rows) use the grouped strategy and round-trip.
#[test]
fn grouped_roundtrip_two_schemas() {
    let push_rows = (0..10).map(|i| format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i));
    let watch_rows = (10..20).map(|i| {
        format!(
            r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
            i, i, i
        )
    });

    // Every row, including the last, is newline-terminated.
    let mut input: Vec<u8> = Vec::new();
    for row in push_rows.chain(watch_rows) {
        input.extend_from_slice(row.as_bytes());
        input.push(b'\n');
    }

    let transformed = preprocess(&input).expect("should produce grouped transform");
    // First metadata byte is the version: 2 = grouped.
    assert_eq!(transformed.metadata[0], METADATA_VERSION_GROUPED);

    let roundtripped = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(roundtripped, input);
}
1849
/// Two schemas alternating row by row (even rows: push, odd rows: watch)
/// use the grouped strategy and round-trip in their original order.
#[test]
fn grouped_roundtrip_interleaved_schemas() {
    let mut input: Vec<u8> = Vec::new();
    for i in 0..20 {
        let row = match i % 2 {
            0 => format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i),
            _ => format!(
                r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
                i, i, i
            ),
        };
        input.extend_from_slice(row.as_bytes());
        input.push(b'\n');
    }

    let transformed = preprocess(&input).expect("should produce grouped transform");
    assert_eq!(transformed.metadata[0], METADATA_VERSION_GROUPED);

    let roundtripped = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(roundtripped, input);
}
1875
/// Two large schema groups with two one-off rows between them (expected to
/// end up as residuals) use the grouped strategy and round-trip byte-exact.
#[test]
fn grouped_roundtrip_with_residuals() {
    /// Appends `row` plus a terminating newline to `buf`.
    fn push_line(buf: &mut Vec<u8>, row: &[u8]) {
        buf.extend_from_slice(row);
        buf.push(b'\n');
    }

    let mut input: Vec<u8> = Vec::new();

    // Group A: 8 rows.
    for i in 0..8 {
        push_line(&mut input, format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
    }

    // 2 unique rows (will be residual).
    push_line(&mut input, br#"{"x":1,"y":2,"z":3}"#);
    push_line(&mut input, br#"{"p":"q"}"#);

    // Group B: 6 rows.
    for i in 0..6 {
        push_line(
            &mut input,
            format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes(),
        );
    }

    let transformed = preprocess(&input).expect("should produce grouped transform");
    assert_eq!(transformed.metadata[0], METADATA_VERSION_GROUPED);

    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    // Readable comparison first, then the byte-exact one.
    let got_text = String::from_utf8_lossy(&roundtripped);
    let want_text = String::from_utf8_lossy(&input);
    assert_eq!(got_text, want_text);
    assert_eq!(roundtripped, input);
}
1904
/// Two schemas of six rows each, with no newline after the final row,
/// must be restored without a trailing newline being added.
#[test]
fn grouped_roundtrip_no_trailing_newline() {
    let push_rows = (0..6).map(|i| format!(r#"{{"id":{},"type":"push"}}"#, i));
    let watch_rows =
        (0..6).map(|i| format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i));
    let rows: Vec<String> = push_rows.chain(watch_rows).collect();

    // Joining puts a newline between rows only, so the last line is left
    // unterminated.
    let input: Vec<u8> = rows.join("\n").into_bytes();

    let transformed = preprocess(&input).expect("should produce grouped transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(roundtripped, input);
}
1925
/// When every row shares one schema the metadata must carry the uniform
/// version byte (Strategy 1), not the grouped one, and the data round-trips.
#[test]
fn uniform_still_preferred_over_grouped() {
    let input = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
{"a":4,"b":"w"}
{"a":5,"b":"v"}
"#;
    let transformed = preprocess(input).expect("should produce transform");

    let version = transformed.metadata[0];
    assert_eq!(
        version, METADATA_VERSION_UNIFORM,
        "uniform schema should use Strategy 1"
    );

    let roundtripped = reverse(&transformed.data, &transformed.metadata);
    let expected = input.to_vec();
    assert_eq!(roundtripped, expected);
}
1943
/// GitHub-Archive-shaped input: every fifth row carries an extra `org` key
/// (8 keys), the rest have 7. Expects the grouped strategy and a byte-exact
/// round-trip.
#[test]
fn grouped_gharchive_simulation() {
    let mut input: Vec<u8> = Vec::new();
    for i in 0..50 {
        let has_org = i % 5 == 0;
        let row = if has_org {
            // 8-key rows (with org).
            format!(
                r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
                i, i, i, i
            )
        } else {
            // 7-key rows (no org).
            format!(
                r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
                i, i, i
            )
        };
        input.extend_from_slice(row.as_bytes());
        input.push(b'\n');
    }

    let transformed = preprocess(&input).expect("should produce grouped transform");
    assert_eq!(transformed.metadata[0], METADATA_VERSION_GROUPED);

    let roundtripped = reverse(&transformed.data, &transformed.metadata);
    assert_eq!(roundtripped, input);
}
1975
1976    // --- Nested decomposition tests ---
1977
/// A nested object with the same sub-keys in every row is flattened into
/// one column per sub-key.
#[test]
fn test_nested_decomposition_basic() {
    /// Splits `bytes` on every occurrence of `sep`.
    fn split_on(bytes: &[u8], sep: u8) -> Vec<&[u8]> {
        bytes.split(|&b| b == sep).collect()
    }

    let input = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#;
    let transformed = preprocess(input).expect("should produce transform");
    assert_eq!(transformed.metadata[0], METADATA_VERSION_UNIFORM);

    // Original: 2 cols (id, meta). After flattening: 3 cols (id, meta.x, meta.y).
    let columns = split_on(&transformed.data, COL_SEP);
    assert_eq!(
        columns.len(),
        3,
        "should have 3 columns after flattening: got {}",
        columns.len()
    );

    // Each sub-column carries that sub-key's value for every row, in row order.
    let xs = split_on(columns[1], VAL_SEP);
    assert_eq!(xs, vec![b"10".as_slice(), b"30", b"50"]);

    let ys = split_on(columns[2], VAL_SEP);
    assert_eq!(ys, vec![b"20".as_slice(), b"40", b"60"]);
}
2005
/// Rows with a nested object go through preprocess + reverse and must come
/// back byte-exact.
#[test]
fn test_nested_roundtrip() {
    let input = br#"{"id":1,"meta":{"x":10,"y":20}}
{"id":2,"meta":{"x":30,"y":40}}
{"id":3,"meta":{"x":50,"y":60}}
"#;
    let transformed = preprocess(input).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    // Readable comparison first, then the byte-exact one.
    let got_text = String::from_utf8_lossy(&roundtripped);
    let want_text = String::from_utf8_lossy(input);
    assert_eq!(got_text, want_text);

    let expected = input.to_vec();
    assert_eq!(roundtripped, expected);
}
2021
/// The nested `meta` object has different sub-keys from row to row; the
/// input must still round-trip byte-exact.
#[test]
fn test_nested_mixed_schemas() {
    let input = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","meta":{"query":"pricing","results_count":25}}
{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
{"ts":"e","meta":{"query":"api docs","results_count":41}}
"#;
    let transformed = preprocess(input).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    // Readable comparison first, then the byte-exact one.
    let got_text = String::from_utf8_lossy(&roundtripped);
    let want_text = String::from_utf8_lossy(input);
    assert_eq!(got_text, want_text);

    let expected = input.to_vec();
    assert_eq!(roundtripped, expected);
}
2039
/// Flat rows (no nested objects anywhere) must still transform and
/// round-trip byte-exact, and the metadata must end with `has_nested = 0`.
#[test]
fn test_nested_no_nested_objects() {
    let data = br#"{"a":1,"b":"x"}
{"a":2,"b":"y"}
{"a":3,"b":"z"}
"#;
    let result = preprocess(data).expect("should produce transform");
    let restored = reverse(&result.data, &result.metadata);
    assert_eq!(restored, data.to_vec());

    // Verify the metadata has has_nested=0 since there are no nested objects.
    // The nested flag is appended after the template parts: for uniform,
    // metadata is version(1) + num_rows(4) + num_cols(2) + trailing_newline(1)
    // + num_parts(2) + parts, followed by the flag byte.
    //
    // `last()` is used instead of `meta[meta.len() - 1]` so that unexpectedly
    // empty metadata fails with the assertion message below rather than an
    // arithmetic-underflow / out-of-bounds panic.
    assert_eq!(
        result.metadata.last().copied(),
        Some(0),
        "should have has_nested=0 for flat data"
    );
}
2060
/// Rows shaped like the test-ndjson.ndjson corpus — a `meta` object whose
/// sub-keys depend on the event type — must round-trip byte-exact.
#[test]
fn test_nested_real_corpus() {
    let input = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
"#;
    let transformed = preprocess(input).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    // Readable comparison first, then the byte-exact one.
    let got_text = String::from_utf8_lossy(&roundtripped);
    let want_text = String::from_utf8_lossy(input);
    assert_eq!(got_text, want_text);

    let expected = input.to_vec();
    assert_eq!(roundtripped, expected);
}
2078
/// The nested field alternates between an object and a bare `null`; the
/// input must round-trip byte-exact.
#[test]
fn test_nested_roundtrip_with_null_values() {
    let input = br#"{"id":1,"meta":{"x":10}}
{"id":2,"meta":null}
{"id":3,"meta":{"x":30}}
{"id":4,"meta":null}
{"id":5,"meta":{"x":50}}
"#;
    let transformed = preprocess(input).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    let expected = input.to_vec();
    assert_eq!(roundtripped, expected);
}
2092
/// String values inside nested objects keep their exact bytes, quotes
/// included, across the round-trip.
#[test]
fn test_nested_string_values_preserved_exact() {
    let input = br#"{"id":1,"meta":{"name":"Alice","score":100}}
{"id":2,"meta":{"name":"Bob","score":200}}
{"id":3,"meta":{"name":"Charlie","score":300}}
"#;
    let transformed = preprocess(input).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    let expected = input.to_vec();
    assert_eq!(roundtripped, expected);
}
2104
/// `parse_nested_object_kv` returns the object's key/value pairs in source
/// order: keys without quotes, values as raw JSON bytes.
#[test]
fn test_parse_nested_object_kv() {
    let nested = br#"{"query":"benchmark","results_count":14}"#;
    let kv = parse_nested_object_kv(nested).unwrap();
    assert_eq!(kv.len(), 2);

    // First pair: string value keeps its surrounding quotes.
    assert_eq!(kv[0].0, b"query");
    assert_eq!(kv[0].1, br#""benchmark""#.to_vec());

    // Second pair: numeric value as written.
    assert_eq!(kv[1].0, b"results_count");
    assert_eq!(kv[1].1, b"14");
}
2115
/// Regression test: nested objects whose sub-keys vary between rows (even
/// rows carry `extra`, odd rows do not) must round-trip byte-exact.
#[test]
fn test_nested_varying_subkeys_roundtrip() {
    let rows: Vec<String> = (0..50)
        .map(|i| {
            if i % 2 == 0 {
                format!("{{\"id\":{},\"meta\":{{\"x\":{},\"extra\":{}}}}}", i, i, i)
            } else {
                format!("{{\"id\":{},\"meta\":{{\"x\":{}}}}}", i, i)
            }
        })
        .collect();

    // Newline between rows plus one terminating the last row.
    let mut text = rows.join("\n");
    text.push('\n');
    let input = text.as_bytes();

    let transformed = preprocess(input).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    let got = std::str::from_utf8(&roundtripped).unwrap();
    let want = std::str::from_utf8(input).unwrap();
    assert_eq!(got, want, "varying sub-keys roundtrip must be byte-exact");
}
2140
/// An explicit `null` inside a nested object must survive the round-trip:
/// `{"x":1,"y":null}` must NOT come back as `{"x":1}`.
#[test]
fn test_nested_explicit_null_preserved() {
    let input = b"{\"id\":1,\"meta\":{\"x\":1,\"y\":null}}\n\
                  {\"id\":2,\"meta\":{\"x\":2,\"y\":null}}\n\
                  {\"id\":3,\"meta\":{\"x\":3,\"y\":null}}\n";

    let transformed = preprocess(input).expect("should produce transform");
    let roundtripped = reverse(&transformed.data, &transformed.metadata);

    let got = std::str::from_utf8(&roundtripped).unwrap();
    let want = std::str::from_utf8(input).unwrap();
    assert_eq!(got, want, "explicit null values must be preserved");
}
2156}