Skip to main content

datacortex_core/format/
ndjson.rs

1//! NDJSON columnar reorg — lossless transform that reorders row-oriented
2//! NDJSON data into column-oriented layout.
3//!
4//! Two strategies:
5//!
6//! **Strategy 1 (uniform):** All rows share the same schema (keys in same order).
7//!   Row-oriented (before):
8//!     {"ts":"2026-03-15T10:30:00.001Z","type":"page_view","user":"usr_a1b2c3d4"}
9//!     {"ts":"2026-03-15T10:30:00.234Z","type":"api_call","user":"usr_a1b2c3d4"}
10//!   Column-oriented (after):
11//!     [ts values] "2026-03-15T10:30:00.001Z" \x01 "2026-03-15T10:30:00.234Z" \x00
12//!     [type values] "page_view" \x01 "api_call" \x00
13//!     [user values] "usr_a1b2c3d4" \x01 "usr_a1b2c3d4"
14//!
15//! **Strategy 2 (grouped):** Rows have diverse schemas (e.g., GitHub Archive events).
16//!   Groups rows by schema, applies Strategy 1 per group, stores residual rows raw.
17//!   Metadata version byte distinguishes: 1 = uniform, 2 = grouped.
18//!
19//! Separators:
20//!   \x00 = column separator (cannot appear in valid JSON text)
21//!   \x01 = value separator within a column (cannot appear in valid JSON)
22
23use super::transform::TransformResult;
24use std::collections::HashMap;
25
/// Separator between columns in the columnar blob (0x00 cannot appear in valid JSON text).
const COL_SEP: u8 = 0x00;
/// Separator between values within one column (0x01 cannot appear in valid JSON text).
const VAL_SEP: u8 = 0x01;
/// Metadata version byte for Strategy 1 (uniform schema) output.
const METADATA_VERSION_UNIFORM: u8 = 1;
/// Metadata version byte for Strategy 2 (grouped-by-schema) output.
const METADATA_VERSION_GROUPED: u8 = 2;

/// Minimum rows in a schema group for it to be columnarized (not residual).
const MIN_GROUP_ROWS: usize = 5;

/// A schema group: template parts + list of (row_index, parsed_values).
type SchemaGroup = (Vec<Vec<u8>>, Vec<(usize, Vec<Vec<u8>>)>);
36
/// Extract the raw value bytes from a JSON line at a given position.
/// `pos` should point to the first byte of the value (after `:`).
/// Returns (value_bytes, end_position).
///
/// Handles: strings, numbers, booleans, null, nested objects, nested arrays.
/// Returns None on malformed input (unterminated string/object/array, or no
/// value bytes at all).
fn extract_value(line: &[u8], mut pos: usize) -> Option<(Vec<u8>, usize)> {
    // Advance past a string body, given `pos` just after the opening quote.
    // Returns the index of the closing (unescaped) quote, or None if the
    // string is unterminated. Escape handling: a byte following `\` is
    // never treated as a delimiter.
    fn skip_string_body(line: &[u8], mut pos: usize) -> Option<usize> {
        let mut escaped = false;
        while pos < line.len() {
            if escaped {
                escaped = false;
            } else if line[pos] == b'\\' {
                escaped = true;
            } else if line[pos] == b'"' {
                return Some(pos);
            }
            pos += 1;
        }
        None
    }

    // Scan a balanced `open`/`close` delimiter run (nested object or array),
    // skipping over string contents so braces/brackets inside strings don't
    // affect the depth count. `pos` points just after the opening delimiter.
    // Returns the position one past the matching closer, or None if the run
    // (or a string inside it) is unterminated.
    fn skip_balanced(line: &[u8], mut pos: usize, open: u8, close: u8) -> Option<usize> {
        let mut depth = 1usize;
        while pos < line.len() && depth > 0 {
            let b = line[pos];
            if b == b'"' {
                pos = skip_string_body(line, pos + 1)?;
            } else if b == open {
                depth += 1;
            } else if b == close {
                depth -= 1;
            }
            pos += 1;
        }
        if depth == 0 {
            Some(pos)
        } else {
            None
        }
    }

    // Skip whitespace after the colon.
    while pos < line.len() && line[pos].is_ascii_whitespace() {
        pos += 1;
    }
    if pos >= line.len() {
        return None;
    }

    let start = pos;
    match line[pos] {
        b'"' => {
            // String value — scan to the closing unescaped quote.
            let close = skip_string_body(line, pos + 1)?;
            let end = close + 1;
            Some((line[start..end].to_vec(), end))
        }
        b'{' => {
            // Nested object — captured as an opaque blob, braces matched
            // while respecting strings.
            let end = skip_balanced(line, pos + 1, b'{', b'}')?;
            Some((line[start..end].to_vec(), end))
        }
        b'[' => {
            // Nested array — same treatment as nested objects.
            let end = skip_balanced(line, pos + 1, b'[', b']')?;
            Some((line[start..end].to_vec(), end))
        }
        _ => {
            // Number, boolean, null — scan until , or } or ] or whitespace.
            while pos < line.len()
                && !matches!(line[pos], b',' | b'}' | b']')
                && !line[pos].is_ascii_whitespace()
            {
                pos += 1;
            }
            if pos == start {
                None
            } else {
                Some((line[start..pos].to_vec(), pos))
            }
        }
    }
}
150
151/// Parse a single JSON line into (template_parts, values).
152///
153/// Template parts are the structural bytes between values:
154///   Part 0 = everything from { up to and including the : before value 0
155///   Part 1 = everything from after value 0 up to and including : before value 1
156///   ...
157///   Part N = everything from after the last value to end of line (including } and \n)
158///
159/// Returns None if the line is not a flat-ish JSON object (we handle nested values
160/// as opaque blobs, but the top-level structure must be key:value pairs).
161type ParsedLine = (Vec<Vec<u8>>, Vec<Vec<u8>>);
162
163fn parse_line(line: &[u8]) -> Option<ParsedLine> {
164    let mut pos = 0;
165
166    // Skip leading whitespace.
167    while pos < line.len() && line[pos].is_ascii_whitespace() {
168        pos += 1;
169    }
170    if pos >= line.len() || line[pos] != b'{' {
171        return None;
172    }
173
174    let mut parts: Vec<Vec<u8>> = Vec::new();
175    let mut values: Vec<Vec<u8>> = Vec::new();
176    let mut part_start = 0;
177
178    pos += 1; // Skip opening {.
179
180    loop {
181        // Skip whitespace.
182        while pos < line.len() && line[pos].is_ascii_whitespace() {
183            pos += 1;
184        }
185        if pos >= line.len() {
186            return None;
187        }
188
189        // Check for closing brace (end of object).
190        if line[pos] == b'}' {
191            // Capture the final part: everything from part_start to end of line.
192            parts.push(line[part_start..].to_vec());
193            break;
194        }
195
196        // Expect a key string.
197        if line[pos] != b'"' {
198            return None;
199        }
200        // Skip over the key string.
201        pos += 1;
202        let mut escaped = false;
203        while pos < line.len() {
204            if escaped {
205                escaped = false;
206            } else if line[pos] == b'\\' {
207                escaped = true;
208            } else if line[pos] == b'"' {
209                pos += 1;
210                break;
211            }
212            pos += 1;
213        }
214
215        // Skip whitespace, expect colon.
216        while pos < line.len() && line[pos].is_ascii_whitespace() {
217            pos += 1;
218        }
219        if pos >= line.len() || line[pos] != b':' {
220            return None;
221        }
222        pos += 1; // Skip colon.
223
224        // Skip whitespace after colon so it becomes part of the template.
225        // Without this, spaces in "key": value would be lost during reverse
226        // because extract_value also skips leading whitespace.
227        while pos < line.len() && line[pos].is_ascii_whitespace() {
228            pos += 1;
229        }
230
231        // Everything from part_start up to here is a "template part".
232        parts.push(line[part_start..pos].to_vec());
233
234        // Extract the value.
235        let (value, value_end) = extract_value(line, pos)?;
236        values.push(value);
237        pos = value_end;
238
239        // Mark the start of the next part.
240        part_start = pos;
241
242        // Skip whitespace.
243        while pos < line.len() && line[pos].is_ascii_whitespace() {
244            pos += 1;
245        }
246        if pos >= line.len() {
247            return None;
248        }
249
250        // Expect comma or closing brace.
251        if line[pos] == b',' {
252            pos += 1;
253        } else if line[pos] == b'}' {
254            // Will be caught at the top of the loop next iteration — but we
255            // need to NOT advance pos, so the } check above catches it.
256            // Actually, let's just handle it here.
257            parts.push(line[part_start..].to_vec());
258            break;
259        } else {
260            return None; // Unexpected character.
261        }
262    }
263
264    if values.is_empty() {
265        return None;
266    }
267
268    Some((parts, values))
269}
270
/// Split NDJSON data into lines (without newline characters).
///
/// A trailing newline does not produce a trailing empty line; interior
/// empty lines (blank rows) are kept.
fn split_lines(data: &[u8]) -> Vec<&[u8]> {
    let mut lines: Vec<&[u8]> = data.split(|&b| b == b'\n').collect();
    // `split` yields a final empty slice when the input ends with a newline
    // (or is empty); the line list excludes it.
    if lines.last().map_or(false, |last| last.is_empty()) {
        lines.pop();
    }
    lines
}
286
287/// Build columnar data from parsed lines that share the same template.
288/// Returns (col_data, metadata) for a uniform group.
289fn build_uniform_columnar(
290    template_parts: &[Vec<u8>],
291    columns: &[Vec<Vec<u8>>],
292    num_rows: usize,
293    has_trailing_newline: bool,
294) -> (Vec<u8>, Vec<u8>) {
295    let num_cols = columns.len();
296
297    // Build column data: values separated by \x01, columns separated by \x00.
298    let mut col_data = Vec::new();
299    for (ci, col) in columns.iter().enumerate() {
300        for (ri, val) in col.iter().enumerate() {
301            col_data.extend_from_slice(val);
302            if ri < num_rows - 1 {
303                col_data.push(VAL_SEP);
304            }
305        }
306        if ci < num_cols - 1 {
307            col_data.push(COL_SEP);
308        }
309    }
310
311    // Build metadata: version + num_rows + num_cols + trailing_newline + template parts.
312    let mut metadata = Vec::new();
313    metadata.push(METADATA_VERSION_UNIFORM);
314    metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
315    metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
316    metadata.push(if has_trailing_newline { 1 } else { 0 });
317    metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
318    for part in template_parts {
319        metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
320        metadata.extend_from_slice(part);
321    }
322
323    (col_data, metadata)
324}
325
326/// Strategy 1: Uniform schema — all rows must have the same template.
327/// Returns None if schemas differ.
328fn preprocess_uniform(
329    non_empty: &[&[u8]],
330    has_trailing_newline: bool,
331) -> Option<(Vec<u8>, Vec<u8>)> {
332    if non_empty.len() < 2 {
333        return None;
334    }
335
336    let (template_parts, first_values) = parse_line(non_empty[0])?;
337    let num_cols = first_values.len();
338    if template_parts.len() != num_cols + 1 {
339        return None;
340    }
341
342    let mut columns: Vec<Vec<Vec<u8>>> = Vec::with_capacity(num_cols);
343    for v in &first_values {
344        columns.push(vec![v.clone()]);
345    }
346
347    for &line in &non_empty[1..] {
348        let (parts, values) = parse_line(line)?;
349        if values.len() != num_cols || parts.len() != template_parts.len() {
350            return None;
351        }
352        for (a, b) in parts.iter().zip(template_parts.iter()) {
353            if a != b {
354                return None;
355            }
356        }
357        for (col, val) in values.iter().enumerate() {
358            columns[col].push(val.clone());
359        }
360    }
361
362    Some(build_uniform_columnar(
363        &template_parts,
364        &columns,
365        non_empty.len(),
366        has_trailing_newline,
367    ))
368}
369
370/// Strategy 2: Group-by-schema — group rows by template, columnarize each group.
371///
372/// Metadata format (version=2):
373///   version: u8 = 2
374///   has_trailing_newline: u8
375///   total_rows: u32 LE
376///   num_groups: u16 LE
377///   for each group:
378///     num_rows: u32 LE
379///     row_indices: [u32 LE * num_rows]
380///     group_metadata_len: u32 LE
381///     group_metadata: [bytes]  (Strategy 1 metadata for this group)
382///   residual_count: u32 LE
383///   residual_indices: [u32 LE * residual_count]
384///
385/// Data format:
386///   for each group:
387///     data_len: u32 LE
388///     data: [bytes]  (columnar data for this group)
389///   residual_data: [bytes]  (raw lines joined by \n)
390fn preprocess_grouped(
391    non_empty: &[&[u8]],
392    has_trailing_newline: bool,
393) -> Option<(Vec<u8>, Vec<u8>)> {
394    if non_empty.len() < MIN_GROUP_ROWS {
395        return None;
396    }
397
398    // Parse all lines and group by template (the template parts identify the schema).
399    // We use the template parts as the group key.
400    let mut parsed: Vec<Option<ParsedLine>> = Vec::with_capacity(non_empty.len());
401    for &line in non_empty {
402        parsed.push(parse_line(line));
403    }
404
405    // Group rows by template. Key = template parts (as bytes for hashing).
406    // We store groups as: template_key -> (template_parts, vec of (row_index, values)).
407    let mut group_map: HashMap<Vec<u8>, SchemaGroup> = HashMap::new();
408    let mut residual_indices: Vec<usize> = Vec::new();
409
410    for (idx, parsed_line) in parsed.into_iter().enumerate() {
411        if let Some((parts, values)) = parsed_line {
412            // Build a hashable key from the template parts.
413            let mut key = Vec::new();
414            for part in &parts {
415                key.extend_from_slice(&(part.len() as u32).to_le_bytes());
416                key.extend_from_slice(part);
417            }
418            group_map
419                .entry(key)
420                .or_insert_with(|| (parts, Vec::new()))
421                .1
422                .push((idx, values));
423        } else {
424            // Unparseable line goes to residual.
425            residual_indices.push(idx);
426        }
427    }
428
429    // Separate groups into qualifying (>= MIN_GROUP_ROWS) and residual.
430    let mut groups: Vec<SchemaGroup> = Vec::new();
431    for (_key, (template_parts, rows)) in group_map {
432        if rows.len() >= MIN_GROUP_ROWS {
433            groups.push((template_parts, rows));
434        } else {
435            // Too few rows — send to residual.
436            for (idx, _) in &rows {
437                residual_indices.push(*idx);
438            }
439        }
440    }
441
442    // Need at least 1 qualifying group for this to be useful.
443    if groups.is_empty() {
444        return None;
445    }
446
447    // Sort groups by their first row index for deterministic output.
448    groups.sort_by_key(|(_, rows)| rows[0].0);
449    residual_indices.sort_unstable();
450
451    // Build per-group columnar data and metadata.
452    struct GroupOutput {
453        row_indices: Vec<u32>,
454        col_data: Vec<u8>,
455        group_metadata: Vec<u8>,
456    }
457
458    let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
459
460    for (template_parts, rows) in &groups {
461        let num_cols = template_parts.len() - 1;
462        let mut columns: Vec<Vec<Vec<u8>>> = (0..num_cols).map(|_| Vec::new()).collect();
463        let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
464
465        for (idx, values) in rows {
466            row_indices.push(*idx as u32);
467            for (col, val) in values.iter().enumerate() {
468                columns[col].push(val.clone());
469            }
470        }
471
472        // Build columnar data for this group (trailing_newline=false for sub-groups).
473        let (col_data, group_metadata) =
474            build_uniform_columnar(template_parts, &columns, rows.len(), false);
475
476        group_outputs.push(GroupOutput {
477            row_indices,
478            col_data,
479            group_metadata,
480        });
481    }
482
483    // Build the combined data blob.
484    let mut data_out = Vec::new();
485    for group in &group_outputs {
486        data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
487        data_out.extend_from_slice(&group.col_data);
488    }
489
490    // Append residual lines (raw, separated by \n).
491    let residual_start = data_out.len();
492    for (i, &idx) in residual_indices.iter().enumerate() {
493        data_out.extend_from_slice(non_empty[idx]);
494        if i < residual_indices.len() - 1 {
495            data_out.push(b'\n');
496        }
497    }
498    let _residual_len = data_out.len() - residual_start;
499
500    // Build the combined metadata.
501    let mut metadata = Vec::new();
502    metadata.push(METADATA_VERSION_GROUPED);
503    metadata.push(if has_trailing_newline { 1 } else { 0 });
504    metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
505    metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
506
507    for group in &group_outputs {
508        metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
509        for &idx in &group.row_indices {
510            metadata.extend_from_slice(&idx.to_le_bytes());
511        }
512        metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
513        metadata.extend_from_slice(&group.group_metadata);
514    }
515
516    metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
517    for &idx in &residual_indices {
518        metadata.extend_from_slice(&(idx as u32).to_le_bytes());
519    }
520
521    Some((data_out, metadata))
522}
523
/// Metadata describing which columns were flattened from nested objects.
///
/// Produced by `flatten_nested_columns` (one entry per expanded column) and
/// consumed by `unflatten_nested_columns` to reverse the flattening.
pub(crate) struct NestedGroupInfo {
    /// Index of the original column that was expanded.
    pub(crate) original_col_index: u16,
    /// Sub-key names for the expanded sub-columns.
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Template parts for reconstructing nested objects (preserves original formatting).
    /// If empty, compact format `{"key":val,...}` is used (NDJSON compatibility).
    pub(crate) nested_template: Vec<Vec<u8>>,
    /// Absence bitmap: one bit per (sub_key, row). Bit=1 means the key was ABSENT
    /// in the original object (not present at all), vs bit=0 means key was present
    /// (possibly with explicit `null`). Packed LSB-first.
    /// Length = ceil(num_sub_keys * num_rows / 8).
    /// Empty if all rows had all keys (no absences).
    pub(crate) absence_bitmap: Vec<u8>,
}
540
/// Attempt to flatten nested JSON objects in columnar data (depth-1).
///
/// Takes columnar data (\x00/\x01 separated) and returns expanded columnar data
/// with nested objects decomposed into sub-columns.
/// Returns None if no nested objects found.
///
/// `num_rows` must match the row count the columnar data was built with;
/// every column is required to split into exactly that many values, or the
/// whole attempt is aborted.
pub(crate) fn flatten_nested_columns(
    col_data: &[u8],
    num_rows: usize,
) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
    // Split into columns.
    let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
    if columns.is_empty() || num_rows == 0 {
        return None;
    }

    let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
    // Build the output columns: for non-nested cols, keep as-is.
    // For nested cols, replace with sub-columns.
    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();

    for (col_idx, &col_chunk) in columns.iter().enumerate() {
        let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
        // Wrong value count means the columnar data is malformed — bail out.
        if values.len() != num_rows {
            return None;
        }

        // Check if ALL non-null values start with '{' (nested object).
        let mut all_objects = true;
        let mut has_non_null = false;
        for val in &values {
            if *val == b"null" {
                continue;
            }
            has_non_null = true;
            if !val.starts_with(b"{") {
                all_objects = false;
                break;
            }
        }

        if !all_objects || !has_non_null {
            // Not a nested-object column — keep as-is.
            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
            output_columns.push(col_values);
            continue;
        }

        // This column contains nested objects — decompose depth-1.
        // Parse all values and collect all unique sub-keys (preserving discovery order).
        // Also capture the template from the first non-null object to preserve formatting.
        // NOTE(review): only the FIRST non-null object's formatting is kept;
        // later rows are reconstructed with that template.
        let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
        let mut nested_template: Vec<Vec<u8>> = Vec::new();
        type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
        let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);

        for val in &values {
            if *val == b"null" {
                parsed_rows.push(None);
                continue;
            }
            if nested_template.is_empty() {
                // First non-null: use template-preserving parser.
                match parse_nested_object_with_template(val) {
                    Some((template, kv_pairs)) => {
                        for (key, _) in &kv_pairs {
                            if !all_sub_keys.iter().any(|k| k == key) {
                                all_sub_keys.push(key.clone());
                            }
                        }
                        nested_template = template;
                        parsed_rows.push(Some(kv_pairs));
                    }
                    None => {
                        // Unparseable — clearing the key list signals the
                        // keep-as-is fallback below.
                        all_sub_keys.clear();
                        break;
                    }
                }
            } else {
                // Subsequent rows: use simpler kv parser (template already captured).
                match parse_nested_object_kv(val) {
                    Some(kv_pairs) => {
                        for (key, _) in &kv_pairs {
                            if !all_sub_keys.iter().any(|k| k == key) {
                                all_sub_keys.push(key.clone());
                            }
                        }
                        parsed_rows.push(Some(kv_pairs));
                    }
                    None => {
                        all_sub_keys.clear();
                        break;
                    }
                }
            }
        }

        if all_sub_keys.is_empty() {
            // Could not parse — keep column as-is.
            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
            output_columns.push(col_values);
            continue;
        }

        // Build sub-columns: for each sub-key, extract values from parsed rows.
        // Also build an absence bitmap: bit=1 where a key was absent from the
        // original row (as opposed to being present with explicit `null`).
        let num_sub_keys = all_sub_keys.len();
        let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
        let total_bits = num_sub_keys * num_rows;
        let bitmap_bytes = total_bits.div_ceil(8);
        let mut absence_bitmap = vec![0u8; bitmap_bytes];
        let mut has_any_absent = false;

        for (row_idx, parsed) in parsed_rows.iter().enumerate() {
            match parsed {
                Some(kv_pairs) => {
                    for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
                        let found = kv_pairs.iter().find(|(k, _)| k == sk);
                        match found {
                            Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
                            None => {
                                // Absent key becomes a null placeholder so every
                                // sub-column stays num_rows long.
                                sub_columns[sk_idx].push(b"null".to_vec());
                                // Mark this (sub_key, row) as absent.
                                let bit_idx = sk_idx * num_rows + row_idx;
                                absence_bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
                                has_any_absent = true;
                            }
                        }
                    }
                }
                None => {
                    // null row — all sub-columns get null.
                    // These are NOT marked as absent (the whole column was null,
                    // not individual keys missing).
                    for sc in sub_columns.iter_mut() {
                        sc.push(b"null".to_vec());
                    }
                }
            }
        }

        nested_groups.push(NestedGroupInfo {
            original_col_index: col_idx as u16,
            sub_keys: all_sub_keys,
            nested_template,
            // An all-zero bitmap carries no information; store empty instead.
            absence_bitmap: if has_any_absent {
                absence_bitmap
            } else {
                Vec::new()
            },
        });

        for sc in sub_columns {
            output_columns.push(sc);
        }
    }

    if nested_groups.is_empty() {
        return None;
    }

    // Build the flattened columnar data (same \x00/\x01 layout as the input).
    let num_out_cols = output_columns.len();
    let mut out = Vec::new();
    for (ci, col) in output_columns.iter().enumerate() {
        for (ri, val) in col.iter().enumerate() {
            out.extend_from_slice(val);
            if ri < num_rows - 1 {
                out.push(VAL_SEP);
            }
        }
        if ci < num_out_cols - 1 {
            out.push(COL_SEP);
        }
    }

    Some((out, nested_groups))
}
719
720/// Parse a nested JSON object into (template_parts, kv_pairs).
721/// Template parts include all structural bytes (braces, keys, colons, whitespace) —
722/// preserving the original formatting so the object can be reconstructed exactly.
723/// Keys are returned WITHOUT quotes. Values are the exact bytes from the JSON.
724#[allow(clippy::type_complexity)]
725pub(crate) fn parse_nested_object_with_template(
726    obj: &[u8],
727) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
728    let mut pos = 0;
729
730    // Skip whitespace.
731    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
732        pos += 1;
733    }
734    if pos >= obj.len() || obj[pos] != b'{' {
735        return None;
736    }
737    pos += 1;
738
739    let mut parts: Vec<Vec<u8>> = Vec::new();
740    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
741    let mut part_start = 0;
742
743    loop {
744        // Skip whitespace.
745        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
746            pos += 1;
747        }
748        if pos >= obj.len() {
749            return None;
750        }
751        if obj[pos] == b'}' {
752            parts.push(obj[part_start..].to_vec());
753            break;
754        }
755
756        // Expect a key string.
757        if obj[pos] != b'"' {
758            return None;
759        }
760        let key_str_start = pos + 1;
761        pos += 1;
762        let mut escaped = false;
763        while pos < obj.len() {
764            if escaped {
765                escaped = false;
766            } else if obj[pos] == b'\\' {
767                escaped = true;
768            } else if obj[pos] == b'"' {
769                break;
770            }
771            pos += 1;
772        }
773        if pos >= obj.len() {
774            return None;
775        }
776        let key = obj[key_str_start..pos].to_vec();
777        pos += 1; // skip closing quote
778
779        // Skip whitespace, expect colon.
780        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
781            pos += 1;
782        }
783        if pos >= obj.len() || obj[pos] != b':' {
784            return None;
785        }
786        pos += 1;
787
788        // Skip whitespace between colon and value — include in template.
789        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
790            pos += 1;
791        }
792
793        // Template part: everything from part_start to here (includes key, colon, post-colon ws).
794        parts.push(obj[part_start..pos].to_vec());
795
796        // Extract the value (no whitespace skipping — already consumed above).
797        let value_start = pos;
798        // Use extract_value but we've already consumed whitespace.
799        let (value, value_end) = extract_value(obj, value_start)?;
800        pos = value_end;
801        pairs.push((key, value));
802
803        part_start = pos;
804
805        // Skip whitespace.
806        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
807            pos += 1;
808        }
809        if pos >= obj.len() {
810            return None;
811        }
812        if obj[pos] == b',' {
813            pos += 1;
814        } else if obj[pos] == b'}' {
815            parts.push(obj[part_start..].to_vec());
816            break;
817        } else {
818            return None;
819        }
820    }
821
822    if pairs.is_empty() {
823        return None;
824    }
825    Some((parts, pairs))
826}
827
828/// Parse a nested JSON object into its key-value pairs (depth-1 only).
829/// Returns the exact bytes for each key and value.
830/// Keys are returned WITHOUT quotes. Values are the exact bytes from the JSON.
831pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
832    let mut pos = 0;
833
834    // Skip whitespace.
835    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
836        pos += 1;
837    }
838    if pos >= obj.len() || obj[pos] != b'{' {
839        return None;
840    }
841    pos += 1;
842
843    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
844
845    loop {
846        // Skip whitespace.
847        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
848            pos += 1;
849        }
850        if pos >= obj.len() {
851            return None;
852        }
853        if obj[pos] == b'}' {
854            break;
855        }
856
857        // Expect a key string.
858        if obj[pos] != b'"' {
859            return None;
860        }
861        pos += 1;
862        let key_start = pos;
863        let mut escaped = false;
864        while pos < obj.len() {
865            if escaped {
866                escaped = false;
867            } else if obj[pos] == b'\\' {
868                escaped = true;
869            } else if obj[pos] == b'"' {
870                break;
871            }
872            pos += 1;
873        }
874        if pos >= obj.len() {
875            return None;
876        }
877        let key = obj[key_start..pos].to_vec();
878        pos += 1; // skip closing quote
879
880        // Skip whitespace, expect colon.
881        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
882            pos += 1;
883        }
884        if pos >= obj.len() || obj[pos] != b':' {
885            return None;
886        }
887        pos += 1;
888
889        // Extract the value.
890        let (value, value_end) = extract_value(obj, pos)?;
891        pos = value_end;
892        pairs.push((key, value));
893
894        // Skip whitespace.
895        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
896            pos += 1;
897        }
898        if pos >= obj.len() {
899            return None;
900        }
901        if obj[pos] == b',' {
902            pos += 1;
903        } else if obj[pos] == b'}' {
904            break;
905        } else {
906            return None;
907        }
908    }
909
910    if pairs.is_empty() {
911        return None;
912    }
913    Some(pairs)
914}
915
/// Unflatten nested sub-columns back into JSON objects.
///
/// Takes flattened columnar data and nested group info, merges sub-columns
/// back into the original nested object columns.
///
/// Layout of `flat_data`: columns separated by `COL_SEP`, values within a
/// column separated by `VAL_SEP`. Non-nested columns are copied through
/// unchanged; each nested group's run of sub-columns is merged back into a
/// single object-valued column.
///
/// On any shape mismatch (wrong column count, wrong per-column value count)
/// the input bytes are returned unchanged; callers detect that via their
/// roundtrip equality check (see `preprocess`).
pub(crate) fn unflatten_nested_columns(
    flat_data: &[u8],
    nested_groups: &[NestedGroupInfo],
    num_rows: usize,
    total_flat_cols: usize,
) -> Vec<u8> {
    let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
    if flat_columns.len() != total_flat_cols {
        return flat_data.to_vec();
    }

    // Parse all flat column values.
    let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
    for chunk in &flat_columns {
        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
        if vals.len() != num_rows {
            return flat_data.to_vec();
        }
        flat_col_values.push(vals);
    }

    // Reconstruct original columns from flat columns.
    // Walk through flat columns, merging sub-columns back where needed.
    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();

    // Build a set of original_col_index -> group for quick lookup.
    // We need to know which flat columns map to which nested group.
    // The flat columns are in order: non-nested cols keep their position,
    // nested cols are replaced by their sub-columns at that position.
    //
    // To figure out which flat_idx corresponds to what, we replay the
    // forward mapping.
    // We need to know the ORIGINAL number of columns:
    // original = flat − (total sub-columns) + (one column per nested group).
    let original_num_cols = total_flat_cols
        - nested_groups
            .iter()
            .map(|g| g.sub_keys.len())
            .sum::<usize>()
        + nested_groups.len();

    // Build mapping: for each original col, is it nested or not?
    let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
    for (gi, group) in nested_groups.iter().enumerate() {
        if (group.original_col_index as usize) < original_num_cols {
            original_col_map[group.original_col_index as usize] = Some(gi);
        }
    }

    let mut flat_idx = 0;
    for entry in original_col_map.iter().take(original_num_cols) {
        if let Some(gi) = entry {
            let group = &nested_groups[*gi];
            let num_sub = group.sub_keys.len();

            // Helper: check if sub-key `si` at `row` is absent using bitmap.
            // Bit layout is sub-key-major (bit index = si * num_rows + row),
            // LSB-first within each byte; out-of-range bits read as present.
            let is_absent = |si: usize, row: usize| -> bool {
                if group.absence_bitmap.is_empty() {
                    return false; // no absences in this group
                }
                let bit_idx = si * num_rows + row;
                let byte_idx = bit_idx / 8;
                if byte_idx >= group.absence_bitmap.len() {
                    return false;
                }
                (group.absence_bitmap[byte_idx] >> (bit_idx % 8)) & 1 == 1
            };

            // Merge sub-columns back into nested objects.
            let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
            for row in 0..num_rows {
                // Check if all sub-columns are null or absent for this row
                // (meaning the whole nested column was null).
                let all_null = (0..num_sub).all(|si| {
                    flat_idx + si < flat_col_values.len()
                        && flat_col_values[flat_idx + si][row] == b"null"
                });
                if all_null && !group.absence_bitmap.is_empty() {
                    // If all values are null but some are "absent" and some are
                    // "explicit null", we need to reconstruct, not collapse.
                    let any_present_null = (0..num_sub).any(|si| {
                        flat_col_values[flat_idx + si][row] == b"null" && !is_absent(si, row)
                    });
                    if any_present_null {
                        // At least one key has an explicit null — don't collapse.
                        // Fall through to reconstruction below.
                    } else {
                        // All nulls are from absent keys — whole column was null.
                        merged_col.push(b"null".to_vec());
                        continue;
                    }
                } else if all_null {
                    // No bitmap: every sub-value null means the nested value
                    // itself was null.
                    merged_col.push(b"null".to_vec());
                    continue;
                }

                // Check whether any sub-key is absent in this row.
                let has_absent = (0..num_sub).any(|si| is_absent(si, row));

                if !has_absent
                    && !group.nested_template.is_empty()
                    && group.nested_template.len() == num_sub + 1
                {
                    // Template-based reconstruction: all keys present,
                    // preserves original formatting exactly. Parts interleave
                    // with values: parts[0] v0 parts[1] v1 … parts[num_sub].
                    let mut obj = Vec::new();
                    obj.extend_from_slice(&group.nested_template[0]);
                    if flat_idx < flat_col_values.len() {
                        obj.extend_from_slice(flat_col_values[flat_idx][row]);
                    }
                    for si in 1..num_sub {
                        obj.extend_from_slice(&group.nested_template[si]);
                        if flat_idx + si < flat_col_values.len() {
                            obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
                        }
                    }
                    obj.extend_from_slice(&group.nested_template[num_sub]);
                    merged_col.push(obj);
                } else {
                    // Compact reconstruction: some keys absent, or no template.
                    // Skip sub-keys that were absent in the original.
                    // NOTE: emits {"k":v,...} with no whitespace; callers rely
                    // on the byte-exact roundtrip check to reject this path
                    // when the original formatting differed.
                    let mut obj = Vec::new();
                    obj.push(b'{');
                    let mut first = true;
                    for si in 0..num_sub {
                        if flat_idx + si >= flat_col_values.len() {
                            break;
                        }
                        if is_absent(si, row) {
                            continue; // key was absent — omit entirely
                        }
                        let val = flat_col_values[flat_idx + si][row];
                        if !first {
                            obj.push(b',');
                        }
                        first = false;
                        obj.push(b'"');
                        obj.extend_from_slice(&group.sub_keys[si]);
                        obj.push(b'"');
                        obj.push(b':');
                        obj.extend_from_slice(val);
                    }
                    obj.push(b'}');
                    merged_col.push(obj);
                }
            }
            output_columns.push(merged_col);
            flat_idx += num_sub;
        } else {
            // Non-nested column — copy as-is.
            if flat_idx < flat_col_values.len() {
                let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
                    .iter()
                    .map(|v| v.to_vec())
                    .collect();
                output_columns.push(col);
            }
            flat_idx += 1;
        }
    }

    // Rebuild columnar data (same COL_SEP / VAL_SEP layout as the input).
    let num_out_cols = output_columns.len();
    let mut out = Vec::new();
    for (ci, col) in output_columns.iter().enumerate() {
        for (ri, val) in col.iter().enumerate() {
            out.extend_from_slice(val);
            if ri < num_rows - 1 {
                out.push(VAL_SEP);
            }
        }
        if ci < num_out_cols - 1 {
            out.push(COL_SEP);
        }
    }

    out
}
1097
1098/// Serialize nested group info into bytes for storage in metadata.
1099/// Version 1 (has_nested=1): sub_keys only (backward compat, NDJSON path).
1100/// Version 2 (has_nested=2): sub_keys + nested_template (preserves formatting).
1101/// Version 3 (has_nested=3): sub_keys + nested_template + absence_bitmap.
1102pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1103    let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1104    let has_absence = groups.iter().any(|g| !g.absence_bitmap.is_empty());
1105    let mut out = Vec::new();
1106    let version = if has_absence {
1107        3u8
1108    } else if has_template {
1109        2u8
1110    } else {
1111        1u8
1112    };
1113    out.push(version);
1114    out.push(groups.len() as u8);
1115    for group in groups {
1116        out.extend_from_slice(&group.original_col_index.to_le_bytes());
1117        out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1118        for key in &group.sub_keys {
1119            out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1120            out.extend_from_slice(key);
1121        }
1122        if has_template || version == 3 {
1123            out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1124            for part in &group.nested_template {
1125                out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1126                out.extend_from_slice(part);
1127            }
1128        }
1129        if version == 3 {
1130            let bm_len = group.absence_bitmap.len() as u32;
1131            out.extend_from_slice(&bm_len.to_le_bytes());
1132            out.extend_from_slice(&group.absence_bitmap);
1133        }
1134    }
1135    out
1136}
1137
1138/// Deserialize nested group info from metadata bytes.
1139/// Returns (nested_groups, bytes_consumed).
1140/// Handles version 1 (no template), version 2 (with template),
1141/// and version 3 (template + absence bitmap).
1142pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1143    if data.is_empty() {
1144        return None;
1145    }
1146    let mut pos = 0;
1147    let version = data[pos];
1148    pos += 1;
1149    if version != 1 && version != 2 && version != 3 {
1150        return None;
1151    }
1152    let has_template = version == 2 || version == 3;
1153    let has_absence = version == 3;
1154    if pos >= data.len() {
1155        return None;
1156    }
1157    let num_groups = data[pos] as usize;
1158    pos += 1;
1159
1160    let mut groups = Vec::with_capacity(num_groups);
1161    for _ in 0..num_groups {
1162        if pos + 4 > data.len() {
1163            return None;
1164        }
1165        let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1166        pos += 2;
1167        let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1168        pos += 2;
1169
1170        let mut sub_keys = Vec::with_capacity(num_sub_cols);
1171        for _ in 0..num_sub_cols {
1172            if pos + 2 > data.len() {
1173                return None;
1174            }
1175            let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1176            pos += 2;
1177            if pos + key_len > data.len() {
1178                return None;
1179            }
1180            sub_keys.push(data[pos..pos + key_len].to_vec());
1181            pos += key_len;
1182        }
1183
1184        let nested_template = if has_template {
1185            if pos + 2 > data.len() {
1186                return None;
1187            }
1188            let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1189            pos += 2;
1190            let mut parts = Vec::with_capacity(num_parts);
1191            for _ in 0..num_parts {
1192                if pos + 2 > data.len() {
1193                    return None;
1194                }
1195                let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1196                pos += 2;
1197                if pos + part_len > data.len() {
1198                    return None;
1199                }
1200                parts.push(data[pos..pos + part_len].to_vec());
1201                pos += part_len;
1202            }
1203            parts
1204        } else {
1205            Vec::new()
1206        };
1207
1208        let absence_bitmap = if has_absence {
1209            if pos + 4 > data.len() {
1210                return None;
1211            }
1212            let bm_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1213            pos += 4;
1214            if pos + bm_len > data.len() {
1215                return None;
1216            }
1217            let bm = data[pos..pos + bm_len].to_vec();
1218            pos += bm_len;
1219            bm
1220        } else {
1221            Vec::new()
1222        };
1223
1224        groups.push(NestedGroupInfo {
1225            original_col_index,
1226            sub_keys,
1227            nested_template,
1228            absence_bitmap,
1229        });
1230    }
1231
1232    Some((groups, pos))
1233}
1234
1235/// Forward transform: NDJSON columnar reorg.
1236///
1237/// Tries Strategy 1 (uniform) first, then Strategy 2 (grouped) if schemas differ.
1238/// Returns None if data is not suitable for columnar transform.
1239pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
1240    if data.is_empty() {
1241        return None;
1242    }
1243
1244    let has_trailing_newline = data.last() == Some(&b'\n');
1245    let lines = split_lines(data);
1246    let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
1247
1248    if non_empty.len() < 2 {
1249        return None;
1250    }
1251
1252    // Strategy 1: try uniform schema first.
1253    if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
1254        if col_data.len() + metadata.len() < data.len() {
1255            // Try depth-1 nested decomposition on the columnar output.
1256            // Even if the flattened data is slightly larger raw, the downstream
1257            // typed encoding + compression benefits are significant: null bitmaps
1258            // are compact and type-homogeneous columns compress much better.
1259            let num_rows = non_empty.len();
1260            if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
1261                // Verify roundtrip: unflatten must produce the exact original columnar
1262                // data. Nested objects with varying sub-key sets or key ordering can
1263                // cause the compact reconstruction to reorder keys, breaking byte-exact
1264                // roundtrip. Only apply if the unflatten is provably lossless.
1265                let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
1266                let unflattened =
1267                    unflatten_nested_columns(&flat_data, &nested_groups, num_rows, total_flat_cols);
1268                if unflattened == col_data {
1269                    // Append nested info to metadata.
1270                    let nested_bytes = serialize_nested_info(&nested_groups);
1271                    metadata.extend_from_slice(&nested_bytes);
1272                    return Some(TransformResult {
1273                        data: flat_data,
1274                        metadata,
1275                    });
1276                }
1277                // else: roundtrip not exact — skip nested flatten.
1278            }
1279            // No nested objects found — append has_nested=0.
1280            metadata.push(0u8); // has_nested = 0
1281            return Some(TransformResult {
1282                data: col_data,
1283                metadata,
1284            });
1285        }
1286    }
1287
1288    // Strategy 2: group by schema.
1289    if let Some((grouped_data, grouped_metadata)) =
1290        preprocess_grouped(&non_empty, has_trailing_newline)
1291    {
1292        if grouped_data.len() + grouped_metadata.len() < data.len() {
1293            return Some(TransformResult {
1294                data: grouped_data,
1295                metadata: grouped_metadata,
1296            });
1297        }
1298    }
1299
1300    None
1301}
1302
1303/// Reverse transform: reconstruct NDJSON from columnar layout + metadata.
1304/// Dispatches to the appropriate decoder based on metadata version byte.
1305pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1306    if metadata.is_empty() {
1307        return data.to_vec();
1308    }
1309    match metadata[0] {
1310        METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1311        METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1312        _ => data.to_vec(),
1313    }
1314}
1315
/// Reverse Strategy 1: uniform schema.
///
/// Metadata layout (little-endian):
///   [0]      version byte (already matched by `reverse`)
///   [1..5]   num_rows  (u32)
///   [5..7]   num_cols  (u16)
///   [7]      has_trailing_newline flag
///   [8..10]  num_parts (u16)
///   then `num_parts` u16 length-prefixed template parts,
///   then an optional nested-info section (see `deserialize_nested_info`;
///   a single 0 byte means "no nested columns").
///
/// Any malformed metadata returns `data` unchanged.
///
/// NOTE(review): the header parsing duplicates `parse_uniform_metadata`
/// because this function also needs the byte offset where the parts end,
/// to locate the trailing nested-info section.
fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
    // Fixed header is 10 bytes; anything shorter is malformed.
    if metadata.len() < 10 {
        return data.to_vec();
    }
    let mut pos = 0;
    let _version = metadata[pos];
    pos += 1;
    let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
    pos += 4;
    let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;
    let has_trailing_newline = metadata[pos] != 0;
    pos += 1;
    let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;

    // u16 length-prefixed template parts follow the fixed header.
    let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
    for _ in 0..num_parts {
        if pos + 2 > metadata.len() {
            return data.to_vec();
        }
        let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
        pos += 2;
        if pos + part_len > metadata.len() {
            return data.to_vec();
        }
        parts.push(metadata[pos..pos + part_len].to_vec());
        pos += part_len;
    }

    // Exactly one template part per column boundary; degenerate shapes bail.
    if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
        return data.to_vec();
    }

    // Check for nested metadata after template parts.
    let remaining_metadata = &metadata[pos..];
    if !remaining_metadata.is_empty()
        && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2 || remaining_metadata[0] == 3)
    {
        // has_nested == 1, 2, or 3: unflatten before reconstructing rows.
        if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
            // Calculate total number of flat columns.
            let total_flat_cols = data.split(|&b| b == COL_SEP).count();
            let unflattened =
                unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
            return reverse_uniform_from_parts(
                &unflattened,
                &parts,
                num_rows,
                num_cols,
                has_trailing_newline,
            );
        }
    }

    // has_nested == 0 (or absent): data is already plain columnar.
    reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
}
1374
1375/// Core uniform reverse: given parsed parts, reconstruct lines from columnar data.
1376fn reverse_uniform_from_parts(
1377    data: &[u8],
1378    parts: &[Vec<u8>],
1379    num_rows: usize,
1380    num_cols: usize,
1381    has_trailing_newline: bool,
1382) -> Vec<u8> {
1383    let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
1384    if col_chunks.len() != num_cols {
1385        return data.to_vec();
1386    }
1387
1388    let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1389    for chunk in &col_chunks {
1390        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1391        if vals.len() != num_rows {
1392            return data.to_vec();
1393        }
1394        columns.push(vals);
1395    }
1396
1397    let mut output = Vec::with_capacity(data.len() * 2);
1398    #[allow(clippy::needless_range_loop)]
1399    for row in 0..num_rows {
1400        output.extend_from_slice(&parts[0]);
1401        output.extend_from_slice(columns[0][row]);
1402        for col in 1..num_cols {
1403            output.extend_from_slice(&parts[col]);
1404            output.extend_from_slice(columns[col][row]);
1405        }
1406        output.extend_from_slice(&parts[num_cols]);
1407
1408        if row < num_rows - 1 || has_trailing_newline {
1409            output.push(b'\n');
1410        }
1411    }
1412
1413    output
1414}
1415
/// Parse Strategy 1 metadata and return (parts, num_rows, num_cols, has_trailing_newline).
/// Used by reverse_grouped to decode per-group metadata.
fn parse_uniform_metadata(metadata: &[u8]) -> Option<(Vec<Vec<u8>>, usize, usize, bool)> {
    // Fixed header: version(1) + num_rows(4) + num_cols(2) + trailing(1) + num_parts(2).
    if metadata.len() < 10 {
        return None;
    }
    let num_rows = u32::from_le_bytes(metadata[1..5].try_into().unwrap()) as usize;
    let num_cols = u16::from_le_bytes(metadata[5..7].try_into().unwrap()) as usize;
    let has_trailing_newline = metadata[7] != 0;
    let num_parts = u16::from_le_bytes(metadata[8..10].try_into().unwrap()) as usize;

    // u16 length-prefixed template parts follow the header.
    let mut cursor = 10usize;
    let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
    for _ in 0..num_parts {
        let len_bytes = metadata.get(cursor..cursor + 2)?;
        let part_len = u16::from_le_bytes(len_bytes.try_into().unwrap()) as usize;
        cursor += 2;
        let part = metadata.get(cursor..cursor + part_len)?;
        parts.push(part.to_vec());
        cursor += part_len;
    }

    // Sanity: one template part per column boundary, non-degenerate shape.
    if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
        return None;
    }

    Some((parts, num_rows, num_cols, has_trailing_newline))
}
1452
/// Reverse Strategy 2: grouped schema.
///
/// Metadata layout (little-endian), as consumed below:
///   [0]     version byte (already matched by `reverse`)
///   [1]     has_trailing_newline flag
///   [2..6]  total_rows (u32)
///   [6..8]  num_groups (u16)
///   per group: row_count (u32), then `row_count` original row indices
///   (u32 each), then a u32 length-prefixed Strategy-1 metadata blob.
///   After all groups: residual_count (u32) + that many row indices (u32).
///
/// Data layout: per group a u32 length-prefixed columnar blob, then the
/// residual lines joined by '\n' (the decoder requires exactly
/// residual_count '\n'-separated segments, i.e. no trailing newline).
///
/// Any inconsistency between metadata and data returns `data` unchanged.
fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
    // Fixed header is 8 bytes; anything shorter is malformed.
    if metadata.len() < 8 {
        return data.to_vec();
    }

    let mut mpos = 1; // Skip version byte.
    let has_trailing_newline = metadata[mpos] != 0;
    mpos += 1;
    let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
    mpos += 4;
    let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
    mpos += 2;

    // Allocate output slots; every row must be filled by exactly one group
    // or the residual section, otherwise the final assembly bails out.
    let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];

    // Data cursor.
    let mut dpos: usize = 0;

    for _ in 0..num_groups {
        // Read row indices for this group.
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let group_row_count =
            u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;

        let mut row_indices = Vec::with_capacity(group_row_count);
        for _ in 0..group_row_count {
            if mpos + 4 > metadata.len() {
                return data.to_vec();
            }
            let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
            mpos += 4;
            row_indices.push(idx);
        }

        // Read group metadata (u32 length-prefixed Strategy-1 blob).
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;
        if mpos + gm_len > metadata.len() {
            return data.to_vec();
        }
        let group_metadata = &metadata[mpos..mpos + gm_len];
        mpos += gm_len;

        // Read group data from the data blob (u32 length-prefixed).
        if dpos + 4 > data.len() {
            return data.to_vec();
        }
        let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
        dpos += 4;
        if dpos + gd_len > data.len() {
            return data.to_vec();
        }
        let group_data = &data[dpos..dpos + gd_len];
        dpos += gd_len;

        // Decode this group using Strategy 1 reverse.
        let (parts, num_rows, num_cols, _trailing) = match parse_uniform_metadata(group_metadata) {
            Some(v) => v,
            None => return data.to_vec(),
        };

        // The group's own metadata must agree with the index list.
        if num_rows != group_row_count {
            return data.to_vec();
        }

        // Split columnar data into columns and values.
        let col_chunks: Vec<&[u8]> = group_data.split(|&b| b == COL_SEP).collect();
        if col_chunks.len() != num_cols {
            return data.to_vec();
        }

        let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
        for chunk in &col_chunks {
            let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
            if vals.len() != num_rows {
                return data.to_vec();
            }
            columns.push(vals);
        }

        // Reconstruct each line for this group and place it at its original
        // position in the output.
        for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
            let mut line = Vec::new();
            line.extend_from_slice(&parts[0]);
            line.extend_from_slice(columns[0][row_within_group]);
            for col in 1..num_cols {
                line.extend_from_slice(&parts[col]);
                line.extend_from_slice(columns[col][row_within_group]);
            }
            line.extend_from_slice(&parts[num_cols]);

            // Out-of-range indices are dropped here; the missing-row check
            // during final assembly then falls back to returning `data`.
            if original_idx < total_rows {
                output_lines[original_idx] = Some(line);
            }
        }
    }

    // Read residual indices from metadata.
    if mpos + 4 > metadata.len() {
        return data.to_vec();
    }
    let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
    mpos += 4;

    let mut residual_indices = Vec::with_capacity(residual_count);
    for _ in 0..residual_count {
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;
        residual_indices.push(idx);
    }

    // Remaining data is residual lines (rows that were not columnarized).
    let residual_data = &data[dpos..];
    if residual_count > 0 {
        let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
            vec![]
        } else {
            residual_data.split(|&b| b == b'\n').collect()
        };
        // There should be exactly residual_count lines.
        if residual_lines.len() != residual_count {
            return data.to_vec();
        }
        for (i, &idx) in residual_indices.iter().enumerate() {
            if idx < total_rows {
                output_lines[idx] = Some(residual_lines[i].to_vec());
            }
        }
    }

    // Assemble final output.
    let mut output = Vec::with_capacity(data.len() * 2);
    for (i, slot) in output_lines.iter().enumerate() {
        match slot {
            Some(line) => output.extend_from_slice(line),
            None => {
                // Should not happen — missing row. Return data as-is.
                return data.to_vec();
            }
        }
        if i < total_rows - 1 || has_trailing_newline {
            output.push(b'\n');
        }
    }

    output
}
1611
1612#[cfg(test)]
1613mod tests {
1614    use super::*;
1615
1616    #[test]
1617    fn extract_value_string() {
1618        let line = br#""hello","next""#;
1619        let (val, end) = extract_value(line, 0).unwrap();
1620        assert_eq!(val, b"\"hello\"");
1621        assert_eq!(end, 7);
1622    }
1623
1624    #[test]
1625    fn extract_value_number() {
1626        let line = b"42,next";
1627        let (val, end) = extract_value(line, 0).unwrap();
1628        assert_eq!(val, b"42");
1629        assert_eq!(end, 2);
1630    }
1631
1632    #[test]
1633    fn extract_value_bool() {
1634        let line = b"true,next";
1635        let (val, end) = extract_value(line, 0).unwrap();
1636        assert_eq!(val, b"true");
1637        assert_eq!(end, 4);
1638    }
1639
1640    #[test]
1641    fn extract_value_null() {
1642        let line = b"null,next";
1643        let (val, end) = extract_value(line, 0).unwrap();
1644        assert_eq!(val, b"null");
1645        assert_eq!(end, 4);
1646    }
1647
1648    #[test]
1649    fn extract_value_object() {
1650        let line = br#"{"a":1,"b":"x"},next"#;
1651        let (val, end) = extract_value(line, 0).unwrap();
1652        assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
1653        assert_eq!(end, 15);
1654    }
1655
1656    #[test]
1657    fn extract_value_array() {
1658        let line = b"[1,2,3],next";
1659        let (val, end) = extract_value(line, 0).unwrap();
1660        assert_eq!(val, b"[1,2,3]");
1661        assert_eq!(end, 7);
1662    }
1663
1664    #[test]
1665    fn extract_value_string_with_escapes() {
1666        let line = br#""he\"llo",next"#;
1667        let (val, end) = extract_value(line, 0).unwrap();
1668        assert_eq!(val, br#""he\"llo""#.to_vec());
1669        assert_eq!(end, 9);
1670    }
1671
1672    #[test]
1673    fn parse_line_simple() {
1674        let line = br#"{"a":1,"b":"x"}"#;
1675        let (parts, values) = parse_line(line).unwrap();
1676        assert_eq!(parts.len(), 3); // {"a": , ,"b": , }
1677        assert_eq!(values.len(), 2);
1678        assert_eq!(values[0], b"1");
1679        assert_eq!(values[1], b"\"x\"");
1680        assert_eq!(parts[0], br#"{"a":"#.to_vec());
1681        assert_eq!(parts[1], br#","b":"#.to_vec());
1682        assert_eq!(parts[2], b"}");
1683    }
1684
1685    #[test]
1686    fn roundtrip_simple() {
1687        let data = br#"{"a":1,"b":"x"}
1688{"a":2,"b":"y"}
1689{"a":3,"b":"z"}
1690"#;
1691        let result = preprocess(data).expect("should produce transform");
1692        let restored = reverse(&result.data, &result.metadata);
1693        assert_eq!(
1694            String::from_utf8_lossy(&restored),
1695            String::from_utf8_lossy(data),
1696        );
1697        assert_eq!(restored, data.to_vec());
1698    }
1699
1700    #[test]
1701    fn roundtrip_no_trailing_newline() {
1702        let data = br#"{"a":1,"b":"x"}
1703{"a":2,"b":"y"}
1704{"a":3,"b":"z"}"#;
1705        let result = preprocess(data).expect("should produce transform");
1706        let restored = reverse(&result.data, &result.metadata);
1707        assert_eq!(restored, data.to_vec());
1708    }
1709
1710    #[test]
1711    fn roundtrip_nested_values() {
1712        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
1713{"id":2,"meta":{"x":30,"y":40}}
1714{"id":3,"meta":{"x":50,"y":60}}
1715{"id":4,"meta":{"x":70,"y":80}}
1716{"id":5,"meta":{"x":90,"y":100}}
1717"#;
1718        let result = preprocess(data).expect("should produce transform");
1719        let restored = reverse(&result.data, &result.metadata);
1720        assert_eq!(restored, data.to_vec());
1721    }
1722
1723    #[test]
1724    fn roundtrip_mixed_types() {
1725        let data = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
1726{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
1727{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
1728{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
1729{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
1730"#;
1731        let result = preprocess(data).expect("should produce transform");
1732        let restored = reverse(&result.data, &result.metadata);
1733        assert_eq!(restored, data.to_vec());
1734    }
1735
1736    #[test]
1737    fn schema_mismatch_too_few_returns_none() {
1738        // Different keys on different lines, but each group has < MIN_GROUP_ROWS.
1739        let data = br#"{"a":1,"b":2}
1740{"a":1,"c":3}
1741"#;
1742        assert!(preprocess(data).is_none());
1743    }
1744
1745    #[test]
1746    fn different_num_keys_too_few_returns_none() {
1747        let data = br#"{"a":1,"b":2}
1748{"a":1}
1749"#;
1750        assert!(preprocess(data).is_none());
1751    }
1752
1753    #[test]
1754    fn single_line_returns_none() {
1755        let data = br#"{"a":1,"b":2}
1756"#;
1757        assert!(preprocess(data).is_none());
1758    }
1759
1760    #[test]
1761    fn empty_returns_none() {
1762        assert!(preprocess(b"").is_none());
1763    }
1764
1765    #[test]
1766    fn column_layout_groups_similar_values() {
1767        let data = br#"{"type":"page_view","user":"alice"}
1768{"type":"api_call","user":"alice"}
1769{"type":"click","user":"bob"}
1770"#;
1771        let result = preprocess(data).unwrap();
1772
1773        // The columnar data should have type values grouped, then user values grouped.
1774        let col_data = &result.data;
1775        let cols: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
1776        assert_eq!(cols.len(), 2);
1777
1778        // Column 0 = type values.
1779        let type_vals: Vec<&[u8]> = cols[0].split(|&b| b == VAL_SEP).collect();
1780        assert_eq!(type_vals.len(), 3);
1781        assert_eq!(type_vals[0], br#""page_view""#);
1782        assert_eq!(type_vals[1], br#""api_call""#);
1783        assert_eq!(type_vals[2], br#""click""#);
1784
1785        // Column 1 = user values.
1786        let user_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
1787        assert_eq!(user_vals.len(), 3);
1788        assert_eq!(user_vals[0], br#""alice""#);
1789        assert_eq!(user_vals[1], br#""alice""#);
1790        assert_eq!(user_vals[2], br#""bob""#);
1791    }
1792
1793    #[test]
1794    fn roundtrip_string_with_escaped_chars() {
1795        let data = br#"{"msg":"he said \"hi\"","val":1}
1796{"msg":"line\nbreak","val":2}
1797{"msg":"tab\there","val":3}
1798{"msg":"back\\slash","val":4}
1799{"msg":"normal text","val":5}
1800"#;
1801        let result = preprocess(data).expect("should produce transform");
1802        let restored = reverse(&result.data, &result.metadata);
1803        assert_eq!(restored, data.to_vec());
1804    }
1805
1806    #[test]
1807    fn roundtrip_negative_and_float_numbers() {
1808        let data = br#"{"x":-3.14,"y":0}
1809{"x":2.718,"y":-1}
1810{"x":0.001,"y":999}
1811{"x":-100,"y":-200}
1812{"x":42.0,"y":7}
1813"#;
1814        let result = preprocess(data).expect("should produce transform");
1815        let restored = reverse(&result.data, &result.metadata);
1816        assert_eq!(restored, data.to_vec());
1817    }
1818
1819    /// Test that the transform+reverse is lossless even for tiny inputs
1820    /// by repeating them to pass the size check threshold.
1821    #[test]
1822    fn reverse_roundtrip_small_data() {
1823        // Verify parse_line works on small lines.
1824        let (parts, vals) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
1825        assert_eq!(vals.len(), 2);
1826        assert_eq!(parts.len(), 3);
1827
1828        // 2-row data might fail the size check, so repeat to get enough rows.
1829        let big_data = br#"{"x":-3.14,"y":0}
1830{"x":2.718,"y":-1}
1831"#
1832        .repeat(20);
1833        let result = preprocess(&big_data).expect("should produce transform with 40 rows");
1834        let restored = reverse(&result.data, &result.metadata);
1835        assert_eq!(restored, big_data);
1836    }
1837
1838    // --- Strategy 2 (grouped) tests ---
1839
1840    #[test]
1841    fn grouped_roundtrip_two_schemas() {
1842        // Two different schemas, each with >= MIN_GROUP_ROWS rows.
1843        let mut data = Vec::new();
1844        for i in 0..10 {
1845            data.extend_from_slice(
1846                format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
1847            );
1848            data.push(b'\n');
1849        }
1850        for i in 10..20 {
1851            data.extend_from_slice(
1852                format!(
1853                    r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
1854                    i, i, i
1855                )
1856                .as_bytes(),
1857            );
1858            data.push(b'\n');
1859        }
1860        let result = preprocess(&data).expect("should produce grouped transform");
1861        // Should be version 2 (grouped).
1862        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1863        let restored = reverse(&result.data, &result.metadata);
1864        assert_eq!(restored, data);
1865    }
1866
1867    #[test]
1868    fn grouped_roundtrip_interleaved_schemas() {
1869        // Interleaved schemas: alternating between two different key sets.
1870        let mut data = Vec::new();
1871        for i in 0..20 {
1872            if i % 2 == 0 {
1873                data.extend_from_slice(
1874                    format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
1875                );
1876            } else {
1877                data.extend_from_slice(
1878                    format!(
1879                        r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
1880                        i, i, i
1881                    )
1882                    .as_bytes(),
1883                );
1884            }
1885            data.push(b'\n');
1886        }
1887        let result = preprocess(&data).expect("should produce grouped transform");
1888        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1889        let restored = reverse(&result.data, &result.metadata);
1890        assert_eq!(restored, data);
1891    }
1892
1893    #[test]
1894    fn grouped_roundtrip_with_residuals() {
1895        // Two large groups + a few unique-schema rows (residuals).
1896        let mut data = Vec::new();
1897        // Group A: 8 rows.
1898        for i in 0..8 {
1899            data.extend_from_slice(format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
1900            data.push(b'\n');
1901        }
1902        // 2 unique rows (will be residual).
1903        data.extend_from_slice(br#"{"x":1,"y":2,"z":3}"#);
1904        data.push(b'\n');
1905        data.extend_from_slice(br#"{"p":"q"}"#);
1906        data.push(b'\n');
1907        // Group B: 6 rows.
1908        for i in 0..6 {
1909            data.extend_from_slice(format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes());
1910            data.push(b'\n');
1911        }
1912        let result = preprocess(&data).expect("should produce grouped transform");
1913        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1914        let restored = reverse(&result.data, &result.metadata);
1915        assert_eq!(
1916            String::from_utf8_lossy(&restored),
1917            String::from_utf8_lossy(&data),
1918        );
1919        assert_eq!(restored, data);
1920    }
1921
1922    #[test]
1923    fn grouped_roundtrip_no_trailing_newline() {
1924        let mut data = Vec::new();
1925        for i in 0..6 {
1926            data.extend_from_slice(format!(r#"{{"id":{},"type":"push"}}"#, i).as_bytes());
1927            data.push(b'\n');
1928        }
1929        for i in 0..6 {
1930            data.extend_from_slice(
1931                format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i).as_bytes(),
1932            );
1933            if i < 5 {
1934                data.push(b'\n');
1935            }
1936            // Last line: no trailing newline.
1937        }
1938        let result = preprocess(&data).expect("should produce grouped transform");
1939        let restored = reverse(&result.data, &result.metadata);
1940        assert_eq!(restored, data);
1941    }
1942
1943    #[test]
1944    fn uniform_still_preferred_over_grouped() {
1945        // All rows same schema — should use Strategy 1 (version 1), not Strategy 2.
1946        let data = br#"{"a":1,"b":"x"}
1947{"a":2,"b":"y"}
1948{"a":3,"b":"z"}
1949{"a":4,"b":"w"}
1950{"a":5,"b":"v"}
1951"#;
1952        let result = preprocess(data).expect("should produce transform");
1953        assert_eq!(
1954            result.metadata[0], METADATA_VERSION_UNIFORM,
1955            "uniform schema should use Strategy 1"
1956        );
1957        let restored = reverse(&result.data, &result.metadata);
1958        assert_eq!(restored, data.to_vec());
1959    }
1960
1961    #[test]
1962    fn grouped_gharchive_simulation() {
1963        // Simulates GitHub Archive: most rows have 7 keys, some have 8.
1964        let mut data = Vec::new();
1965        for i in 0..50 {
1966            if i % 5 == 0 {
1967                // 8-key rows (with org).
1968                data.extend_from_slice(
1969                    format!(
1970                        r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
1971                        i, i, i, i
1972                    )
1973                    .as_bytes(),
1974                );
1975            } else {
1976                // 7-key rows (no org).
1977                data.extend_from_slice(
1978                    format!(
1979                        r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
1980                        i, i, i
1981                    )
1982                    .as_bytes(),
1983                );
1984            }
1985            data.push(b'\n');
1986        }
1987        let result = preprocess(&data).expect("should produce grouped transform");
1988        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1989        let restored = reverse(&result.data, &result.metadata);
1990        assert_eq!(restored, data);
1991    }
1992
1993    // --- Nested decomposition tests ---
1994
1995    #[test]
1996    fn test_nested_decomposition_basic() {
1997        // Simple nested object decomposed correctly.
1998        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
1999{"id":2,"meta":{"x":30,"y":40}}
2000{"id":3,"meta":{"x":50,"y":60}}
2001"#;
2002        let result = preprocess(data).expect("should produce transform");
2003        assert_eq!(result.metadata[0], METADATA_VERSION_UNIFORM);
2004
2005        // The columnar data should have expanded columns.
2006        let cols: Vec<&[u8]> = result.data.split(|&b| b == COL_SEP).collect();
2007        // Original: 2 cols (id, meta). After flattening: 3 cols (id, meta.x, meta.y).
2008        assert_eq!(
2009            cols.len(),
2010            3,
2011            "should have 3 columns after flattening: got {}",
2012            cols.len()
2013        );
2014
2015        // Verify sub-columns contain the extracted values.
2016        let meta_x_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
2017        assert_eq!(meta_x_vals, vec![b"10".as_slice(), b"30", b"50"]);
2018
2019        let meta_y_vals: Vec<&[u8]> = cols[2].split(|&b| b == VAL_SEP).collect();
2020        assert_eq!(meta_y_vals, vec![b"20".as_slice(), b"40", b"60"]);
2021    }
2022
2023    #[test]
2024    fn test_nested_roundtrip() {
2025        // Flatten -> unflatten produces byte-exact original.
2026        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2027{"id":2,"meta":{"x":30,"y":40}}
2028{"id":3,"meta":{"x":50,"y":60}}
2029"#;
2030        let result = preprocess(data).expect("should produce transform");
2031        let restored = reverse(&result.data, &result.metadata);
2032        assert_eq!(
2033            String::from_utf8_lossy(&restored),
2034            String::from_utf8_lossy(data),
2035        );
2036        assert_eq!(restored, data.to_vec());
2037    }
2038
2039    #[test]
2040    fn test_nested_mixed_schemas() {
2041        // Different nested objects per row (some keys missing -> null).
2042        let data = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
2043{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
2044{"ts":"c","meta":{"query":"pricing","results_count":25}}
2045{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
2046{"ts":"e","meta":{"query":"api docs","results_count":41}}
2047"#;
2048        let result = preprocess(data).expect("should produce transform");
2049        let restored = reverse(&result.data, &result.metadata);
2050        assert_eq!(
2051            String::from_utf8_lossy(&restored),
2052            String::from_utf8_lossy(data),
2053        );
2054        assert_eq!(restored, data.to_vec());
2055    }
2056
2057    #[test]
2058    fn test_nested_no_nested_objects() {
2059        // Returns None when no nested objects — flat data should still work.
2060        let data = br#"{"a":1,"b":"x"}
2061{"a":2,"b":"y"}
2062{"a":3,"b":"z"}
2063"#;
2064        let result = preprocess(data).expect("should produce transform");
2065        let restored = reverse(&result.data, &result.metadata);
2066        assert_eq!(restored, data.to_vec());
2067
2068        // Verify the metadata has has_nested=0 since no nested objects.
2069        // The nested flag is appended after template parts.
2070        // For uniform, metadata starts with version(1) + num_rows(4) + num_cols(2) +
2071        // trailing_newline(1) + num_parts(2) + parts.
2072        // After those parts, there should be a 0 byte (has_nested=0).
2073        let meta = &result.metadata;
2074        let last_byte = meta[meta.len() - 1];
2075        assert_eq!(last_byte, 0, "should have has_nested=0 for flat data");
2076    }
2077
2078    #[test]
2079    fn test_nested_real_corpus() {
2080        // Test with data shaped like the test-ndjson.ndjson corpus.
2081        let data = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
2082{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
2083{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
2084{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
2085{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
2086"#;
2087        let result = preprocess(data).expect("should produce transform");
2088        let restored = reverse(&result.data, &result.metadata);
2089        assert_eq!(
2090            String::from_utf8_lossy(&restored),
2091            String::from_utf8_lossy(data),
2092        );
2093        assert_eq!(restored, data.to_vec());
2094    }
2095
2096    #[test]
2097    fn test_nested_roundtrip_with_null_values() {
2098        // Some rows have null for the nested field.
2099        let data = br#"{"id":1,"meta":{"x":10}}
2100{"id":2,"meta":null}
2101{"id":3,"meta":{"x":30}}
2102{"id":4,"meta":null}
2103{"id":5,"meta":{"x":50}}
2104"#;
2105        let result = preprocess(data).expect("should produce transform");
2106        let restored = reverse(&result.data, &result.metadata);
2107        assert_eq!(restored, data.to_vec());
2108    }
2109
2110    #[test]
2111    fn test_nested_string_values_preserved_exact() {
2112        // Verify that string values in nested objects preserve exact bytes (with quotes).
2113        let data = br#"{"id":1,"meta":{"name":"Alice","score":100}}
2114{"id":2,"meta":{"name":"Bob","score":200}}
2115{"id":3,"meta":{"name":"Charlie","score":300}}
2116"#;
2117        let result = preprocess(data).expect("should produce transform");
2118        let restored = reverse(&result.data, &result.metadata);
2119        assert_eq!(restored, data.to_vec());
2120    }
2121
2122    #[test]
2123    fn test_parse_nested_object_kv() {
2124        let obj = br#"{"query":"benchmark","results_count":14}"#;
2125        let pairs = parse_nested_object_kv(obj).unwrap();
2126        assert_eq!(pairs.len(), 2);
2127        assert_eq!(pairs[0].0, b"query");
2128        assert_eq!(pairs[0].1, br#""benchmark""#.to_vec());
2129        assert_eq!(pairs[1].0, b"results_count");
2130        assert_eq!(pairs[1].1, b"14");
2131    }
2132
2133    #[test]
2134    fn test_nested_varying_subkeys_roundtrip() {
2135        // Regression: rows with varying sub-keys in nested objects must
2136        // round-trip byte-exact. Even rows have `extra`, odd rows don't.
2137        let mut lines = Vec::new();
2138        for i in 0..50 {
2139            let line = if i % 2 == 0 {
2140                format!("{{\"id\":{},\"meta\":{{\"x\":{},\"extra\":{}}}}}", i, i, i)
2141            } else {
2142                format!("{{\"id\":{},\"meta\":{{\"x\":{}}}}}", i, i)
2143            };
2144            lines.push(line);
2145        }
2146        let ndjson = lines.join("\n") + "\n";
2147        let data = ndjson.as_bytes();
2148
2149        let result = preprocess(data).expect("should produce transform");
2150        let restored = reverse(&result.data, &result.metadata);
2151        assert_eq!(
2152            std::str::from_utf8(&restored).unwrap(),
2153            std::str::from_utf8(data).unwrap(),
2154            "varying sub-keys roundtrip must be byte-exact"
2155        );
2156    }
2157
2158    #[test]
2159    fn test_nested_explicit_null_preserved() {
2160        // Explicit null values in nested objects must survive roundtrip.
2161        // `{"x":1,"y":null}` must NOT be collapsed to `{"x":1}`.
2162        let data = b"{\"id\":1,\"meta\":{\"x\":1,\"y\":null}}\n\
2163                     {\"id\":2,\"meta\":{\"x\":2,\"y\":null}}\n\
2164                     {\"id\":3,\"meta\":{\"x\":3,\"y\":null}}\n";
2165        let result = preprocess(data).expect("should produce transform");
2166        let restored = reverse(&result.data, &result.metadata);
2167        assert_eq!(
2168            std::str::from_utf8(&restored).unwrap(),
2169            std::str::from_utf8(data).unwrap(),
2170            "explicit null values must be preserved"
2171        );
2172    }
2173
2174    #[test]
2175    fn null_heavy_30_rows_roundtrip() {
2176        // Regression test: 30 rows with all-null column caused CRC mismatch.
2177        let mut data = Vec::new();
2178        for i in 0..30 {
2179            data.extend_from_slice(format!("{{\"id\":{},\"val\":null}}\n", i).as_bytes());
2180        }
2181        let result = preprocess(&data);
2182        if let Some(result) = result {
2183            let restored = reverse(&result.data, &result.metadata);
2184            assert_eq!(
2185                restored, data,
2186                "null-heavy 30-row roundtrip failed.\nOriginal len={}, Restored len={}\nOrig first 200: {:?}\nRest first 200: {:?}",
2187                data.len(), restored.len(),
2188                String::from_utf8_lossy(&data[..data.len().min(200)]),
2189                String::from_utf8_lossy(&restored[..restored.len().min(200)])
2190            );
2191        }
2192    }
2193
2194    #[test]
2195    fn null_heavy_60_rows_roundtrip() {
2196        // Regression test: 60 rows with multiple all-null columns.
2197        let mut data = Vec::new();
2198        for i in 0..60 {
2199            let name = if i % 10 == 0 {
2200                format!("\"user_{}\"", i)
2201            } else {
2202                "null".to_string()
2203            };
2204            data.extend_from_slice(
2205                format!("{{\"id\":{},\"name\":{},\"email\":null,\"score\":null,\"active\":null,\"tags\":null}}\n", i, name).as_bytes(),
2206            );
2207        }
2208        let result = preprocess(&data);
2209        if let Some(result) = result {
2210            let restored = reverse(&result.data, &result.metadata);
2211            assert_eq!(restored, data, "null-heavy 60-row ndjson roundtrip failed");
2212        }
2213    }
2214}