Skip to main content

datacortex_core/format/
ndjson.rs

1//! NDJSON columnar reorg — lossless transform that reorders row-oriented
2//! NDJSON data into column-oriented layout.
3//!
4//! Two strategies:
5//!
6//! **Strategy 1 (uniform):** All rows share the same schema (keys in same order).
7//!   Row-oriented (before):
8//!     {"ts":"2026-03-15T10:30:00.001Z","type":"page_view","user":"usr_a1b2c3d4"}
9//!     {"ts":"2026-03-15T10:30:00.234Z","type":"api_call","user":"usr_a1b2c3d4"}
10//!   Column-oriented (after):
11//!     [ts values] "2026-03-15T10:30:00.001Z" \x01 "2026-03-15T10:30:00.234Z" \x00
12//!     [type values] "page_view" \x01 "api_call" \x00
13//!     [user values] "usr_a1b2c3d4" \x01 "usr_a1b2c3d4"
14//!
15//! **Strategy 2 (grouped):** Rows have diverse schemas (e.g., GitHub Archive events).
16//!   Groups rows by schema, applies Strategy 1 per group, stores residual rows raw.
17//!   Metadata version byte distinguishes: 1 = uniform, 2 = grouped.
18//!
19//! Separators:
20//!   \x00 = column separator (cannot appear in valid JSON text)
21//!   \x01 = value separator within a column (cannot appear in valid JSON)
22
23use std::collections::HashMap;
24
25use super::transform::TransformResult;
26
27const COL_SEP: u8 = 0x00;
28const VAL_SEP: u8 = 0x01;
29const METADATA_VERSION_UNIFORM: u8 = 1;
30const METADATA_VERSION_GROUPED: u8 = 2;
31const METADATA_VERSION_SELECTIVE: u8 = 3;
32/// Grouped sub-group version: uniform columnar with typed encoding applied.
33/// Reserved for future use — typed encoding in grouped path doesn't improve
34/// post-zstd ratio because zstd compresses raw text equally well.
35#[allow(dead_code)]
36const METADATA_VERSION_TYPED: u8 = 4;
37
38/// Minimum rows in a schema group for it to be columnarized (not residual).
39const MIN_GROUP_ROWS: usize = 5;
40
41/// Minimum average value length for a column to be kept inline (row-major).
42/// Only large unique values (like nested JSON payloads of 100+ bytes) benefit from
43/// inline storage. Moderate-length values (metadata objects, URLs) compress well
44/// in column-major order even at high cardinality because zstd can exploit shared
45/// structural patterns between adjacent values.
46const SELECTIVE_MIN_AVG_LEN: usize = 128;
47
48/// Maximum cardinality ratio (unique/total) for a column to be extracted into columnar format.
49/// Columns above this threshold AND above `SELECTIVE_MIN_AVG_LEN` are kept inline (row-major)
50/// to preserve inter-row locality that benefits zstd/brotli compression.
51const SELECTIVE_MAX_CARDINALITY: f64 = 0.7;
52
53/// A schema group: template parts + list of (row_index, parsed_values).
54/// Uses borrowed slices to avoid allocations during parsing.
55type SchemaGroup<'a> = (Vec<&'a [u8]>, Vec<(usize, Vec<&'a [u8]>)>);
56
57/// Extract the raw value bytes from a JSON line at a given position.
58/// `pos` should point to the first byte of the value (after `:`).
59/// Returns (value_bytes, end_position).
60///
61/// Handles: strings, numbers, booleans, null, nested objects, nested arrays.
62fn extract_value(line: &[u8], mut pos: usize) -> Option<(&[u8], usize)> {
63    // Skip whitespace after colon.
64    while pos < line.len() && line[pos].is_ascii_whitespace() {
65        pos += 1;
66    }
67    if pos >= line.len() {
68        return None;
69    }
70
71    let start = pos;
72    match line[pos] {
73        b'"' => {
74            // String value — scan to closing unescaped quote.
75            pos += 1;
76            let mut escaped = false;
77            while pos < line.len() {
78                if escaped {
79                    escaped = false;
80                } else if line[pos] == b'\\' {
81                    escaped = true;
82                } else if line[pos] == b'"' {
83                    pos += 1;
84                    return Some((&line[start..pos], pos));
85                }
86                pos += 1;
87            }
88            None // Unterminated string.
89        }
90        b'{' => {
91            // Nested object — match braces, respecting strings.
92            let mut depth = 1;
93            pos += 1;
94            while pos < line.len() && depth > 0 {
95                match line[pos] {
96                    b'"' => {
97                        // Skip over string contents.
98                        pos += 1;
99                        let mut escaped = false;
100                        while pos < line.len() {
101                            if escaped {
102                                escaped = false;
103                            } else if line[pos] == b'\\' {
104                                escaped = true;
105                            } else if line[pos] == b'"' {
106                                break;
107                            }
108                            pos += 1;
109                        }
110                    }
111                    b'{' => depth += 1,
112                    b'}' => depth -= 1,
113                    _ => {}
114                }
115                pos += 1;
116            }
117            if depth != 0 || pos > line.len() {
118                return None; // Unterminated object.
119            }
120            Some((&line[start..pos], pos))
121        }
122        b'[' => {
123            // Nested array — match brackets, respecting strings.
124            let mut depth = 1;
125            pos += 1;
126            while pos < line.len() && depth > 0 {
127                match line[pos] {
128                    b'"' => {
129                        pos += 1;
130                        let mut escaped = false;
131                        while pos < line.len() {
132                            if escaped {
133                                escaped = false;
134                            } else if line[pos] == b'\\' {
135                                escaped = true;
136                            } else if line[pos] == b'"' {
137                                break;
138                            }
139                            pos += 1;
140                        }
141                    }
142                    b'[' => depth += 1,
143                    b']' => depth -= 1,
144                    _ => {}
145                }
146                pos += 1;
147            }
148            if depth != 0 || pos > line.len() {
149                return None; // Unterminated array.
150            }
151            Some((&line[start..pos], pos))
152        }
153        _ => {
154            // Number, boolean, null — scan until , or } or ] or whitespace.
155            while pos < line.len() {
156                match line[pos] {
157                    b',' | b'}' | b']' => break,
158                    _ if line[pos].is_ascii_whitespace() => break,
159                    _ => pos += 1,
160                }
161            }
162            if pos == start {
163                None
164            } else {
165                Some((&line[start..pos], pos))
166            }
167        }
168    }
169}
170
171/// Parse a single JSON line into (template_parts, values).
172///
173/// Template parts are the structural bytes between values:
174///   Part 0 = everything from { up to and including the : before value 0
175///   Part 1 = everything from after value 0 up to and including : before value 1
176///   ...
177///   Part N = everything from after the last value to end of line (including } and \n)
178///
179/// Returns None if the line is not a flat-ish JSON object (we handle nested values
180/// as opaque blobs, but the top-level structure must be key:value pairs).
181type ParsedLine<'a> = (Vec<&'a [u8]>, Vec<&'a [u8]>);
182
183fn parse_line(line: &[u8]) -> Option<ParsedLine<'_>> {
184    let mut pos = 0;
185
186    // Skip leading whitespace.
187    while pos < line.len() && line[pos].is_ascii_whitespace() {
188        pos += 1;
189    }
190    if pos >= line.len() || line[pos] != b'{' {
191        return None;
192    }
193
194    let mut parts: Vec<&[u8]> = Vec::new();
195    let mut values: Vec<&[u8]> = Vec::new();
196    let mut part_start = 0;
197
198    pos += 1; // Skip opening {.
199
200    loop {
201        // Skip whitespace.
202        while pos < line.len() && line[pos].is_ascii_whitespace() {
203            pos += 1;
204        }
205        if pos >= line.len() {
206            return None;
207        }
208
209        // Check for closing brace (end of object).
210        if line[pos] == b'}' {
211            // Capture the final part: everything from part_start to end of line.
212            parts.push(&line[part_start..]);
213            break;
214        }
215
216        // Expect a key string.
217        if line[pos] != b'"' {
218            return None;
219        }
220        // Skip over the key string.
221        pos += 1;
222        let mut escaped = false;
223        while pos < line.len() {
224            if escaped {
225                escaped = false;
226            } else if line[pos] == b'\\' {
227                escaped = true;
228            } else if line[pos] == b'"' {
229                pos += 1;
230                break;
231            }
232            pos += 1;
233        }
234
235        // Skip whitespace, expect colon.
236        while pos < line.len() && line[pos].is_ascii_whitespace() {
237            pos += 1;
238        }
239        if pos >= line.len() || line[pos] != b':' {
240            return None;
241        }
242        pos += 1; // Skip colon.
243
244        // Skip whitespace after colon so it becomes part of the template.
245        // Without this, spaces in "key": value would be lost during reverse
246        // because extract_value also skips leading whitespace.
247        while pos < line.len() && line[pos].is_ascii_whitespace() {
248            pos += 1;
249        }
250
251        // Everything from part_start up to here is a "template part".
252        parts.push(&line[part_start..pos]);
253
254        // Extract the value.
255        let (value, value_end) = extract_value(line, pos)?;
256        values.push(value);
257        pos = value_end;
258
259        // Mark the start of the next part.
260        part_start = pos;
261
262        // Skip whitespace.
263        while pos < line.len() && line[pos].is_ascii_whitespace() {
264            pos += 1;
265        }
266        if pos >= line.len() {
267            return None;
268        }
269
270        // Expect comma or closing brace.
271        if line[pos] == b',' {
272            pos += 1;
273        } else if line[pos] == b'}' {
274            // Will be caught at the top of the loop next iteration — but we
275            // need to NOT advance pos, so the } check above catches it.
276            // Actually, let's just handle it here.
277            parts.push(&line[part_start..]);
278            break;
279        } else {
280            return None; // Unexpected character.
281        }
282    }
283
284    if values.is_empty() {
285        return None;
286    }
287
288    Some((parts, values))
289}
290
291/// Split NDJSON data into lines (without newline characters).
292fn split_lines(data: &[u8]) -> Vec<&[u8]> {
293    let mut lines: Vec<&[u8]> = Vec::new();
294    let mut start = 0;
295    for pos in memchr::memchr_iter(b'\n', data) {
296        lines.push(&data[start..pos]);
297        start = pos + 1;
298    }
299    if start < data.len() {
300        lines.push(&data[start..]);
301    }
302    lines
303}
304
305/// Build columnar data from parsed lines that share the same template.
306/// Returns (col_data, metadata) for a uniform group.
307fn build_uniform_columnar(
308    template_parts: &[&[u8]],
309    columns: &[Vec<&[u8]>],
310    num_rows: usize,
311    has_trailing_newline: bool,
312) -> (Vec<u8>, Vec<u8>) {
313    let num_cols = columns.len();
314
315    // Build column data: values separated by \x01, columns separated by \x00.
316    let mut col_data = Vec::new();
317    for (ci, col) in columns.iter().enumerate() {
318        for (ri, val) in col.iter().enumerate() {
319            col_data.extend_from_slice(val);
320            if ri < num_rows - 1 {
321                col_data.push(VAL_SEP);
322            }
323        }
324        if ci < num_cols - 1 {
325            col_data.push(COL_SEP);
326        }
327    }
328
329    // Build metadata: version + num_rows + num_cols + trailing_newline + template parts.
330    let mut metadata = Vec::new();
331    metadata.push(METADATA_VERSION_UNIFORM);
332    metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
333    metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
334    metadata.push(u8::from(has_trailing_newline));
335    metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
336    for part in template_parts {
337        metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
338        metadata.extend_from_slice(part);
339    }
340
341    (col_data, metadata)
342}
343
344/// Analyze columns and classify as extract (columnar) or inline (row-major).
345/// Returns a bitmask: `true` = extract (column-major), `false` = inline (row-major).
346/// High-cardinality columns with long average values are kept inline to preserve
347/// inter-row locality that benefits zstd/brotli compression.
348fn classify_columns(columns: &[Vec<&[u8]>], num_rows: usize) -> Vec<bool> {
349    use std::collections::HashSet;
350    columns
351        .iter()
352        .map(|col_values| {
353            if num_rows < 10 {
354                return true; // Too few rows to measure cardinality meaningfully
355            }
356            let unique: HashSet<&[u8]> = col_values.iter().copied().collect();
357            let cardinality_ratio = unique.len() as f64 / num_rows as f64;
358            let avg_len = col_values.iter().map(|v| v.len()).sum::<usize>() / num_rows;
359            // Inline only if BOTH high-cardinality AND long values
360            !(cardinality_ratio > SELECTIVE_MAX_CARDINALITY && avg_len >= SELECTIVE_MIN_AVG_LEN)
361        })
362        .collect()
363}
364
365/// Build selective columnar data: low-cardinality columns in column-major order,
366/// high-cardinality columns in row-major order.
367///
368/// Data layout:
369///   [extracted_data_len: u32 LE]
370///   [extracted columnar: col vals separated by \x01, columns by \x00]
371///   [inline row data: per-row inline vals separated by \x01, rows by \x00]
372///
373/// Metadata (version=3):
374///   version(3) + num_rows + num_total_cols + trailing_newline +
375///   num_extracted + extracted_col_indices + template_parts
376fn build_selective_columnar(
377    template_parts: &[&[u8]],
378    columns: &[Vec<&[u8]>],
379    extract_mask: &[bool],
380    num_rows: usize,
381    has_trailing_newline: bool,
382) -> (Vec<u8>, Vec<u8>) {
383    let num_total_cols = columns.len();
384
385    let extracted_indices: Vec<u16> = (0..num_total_cols)
386        .filter(|&i| extract_mask[i])
387        .map(|i| i as u16)
388        .collect();
389    let inline_indices: Vec<u16> = (0..num_total_cols)
390        .filter(|&i| !extract_mask[i])
391        .map(|i| i as u16)
392        .collect();
393
394    // Build extracted columnar data (column-major).
395    let mut extracted_data = Vec::new();
396    for (ei, &col_idx) in extracted_indices.iter().enumerate() {
397        let col = &columns[col_idx as usize];
398        for (ri, val) in col.iter().enumerate() {
399            extracted_data.extend_from_slice(val);
400            if ri < num_rows - 1 {
401                extracted_data.push(VAL_SEP);
402            }
403        }
404        if ei < extracted_indices.len() - 1 {
405            extracted_data.push(COL_SEP);
406        }
407    }
408
409    // Build inline row data (row-major).
410    let mut inline_data = Vec::new();
411    if !inline_indices.is_empty() {
412        #[allow(clippy::needless_range_loop)]
413        for row in 0..num_rows {
414            for (ii, &col_idx) in inline_indices.iter().enumerate() {
415                inline_data.extend_from_slice(columns[col_idx as usize][row]);
416                if ii < inline_indices.len() - 1 {
417                    inline_data.push(VAL_SEP);
418                }
419            }
420            if row < num_rows - 1 {
421                inline_data.push(COL_SEP);
422            }
423        }
424    }
425
426    // Combined data blob.
427    let mut data = Vec::with_capacity(4 + extracted_data.len() + inline_data.len());
428    data.extend_from_slice(&(extracted_data.len() as u32).to_le_bytes());
429    data.extend_from_slice(&extracted_data);
430    data.extend_from_slice(&inline_data);
431
432    // Build metadata (version=3).
433    let mut metadata = Vec::new();
434    metadata.push(METADATA_VERSION_SELECTIVE);
435    metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
436    metadata.extend_from_slice(&(num_total_cols as u16).to_le_bytes());
437    metadata.push(u8::from(has_trailing_newline));
438    metadata.extend_from_slice(&(extracted_indices.len() as u16).to_le_bytes());
439    for &idx in &extracted_indices {
440        metadata.extend_from_slice(&idx.to_le_bytes());
441    }
442    metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
443    for part in template_parts {
444        metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
445        metadata.extend_from_slice(part);
446    }
447
448    (data, metadata)
449}
450
451/// Strategy 1: Uniform schema — all rows must have the same template.
452/// Returns None if schemas differ.
453fn preprocess_uniform(
454    non_empty: &[&[u8]],
455    has_trailing_newline: bool,
456) -> Option<(Vec<u8>, Vec<u8>)> {
457    if non_empty.len() < 2 {
458        return None;
459    }
460
461    let (template_parts, first_values) = parse_line(non_empty[0])?;
462    let num_cols = first_values.len();
463    if template_parts.len() != num_cols + 1 {
464        return None;
465    }
466
467    let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
468    for v in &first_values {
469        columns.push(vec![*v]);
470    }
471
472    for &line in &non_empty[1..] {
473        let (parts, values) = parse_line(line)?;
474        if values.len() != num_cols || parts.len() != template_parts.len() {
475            return None;
476        }
477        for (a, b) in parts.iter().zip(template_parts.iter()) {
478            if a != b {
479                return None;
480            }
481        }
482        for (col, val) in values.iter().enumerate() {
483            columns[col].push(*val);
484        }
485    }
486
487    // Check if selective columnar should be used (some columns high-cardinality).
488    // Selective columnar keeps high-cardinality columns in row-major order,
489    // preserving inter-row locality that benefits downstream compression (zstd/brotli).
490    // The raw size may be slightly larger due to metadata overhead, but the
491    // compression benefit typically outweighs this.
492    let extract_mask = classify_columns(&columns, non_empty.len());
493    let all_extracted = extract_mask.iter().all(|&e| e);
494
495    if all_extracted {
496        Some(build_uniform_columnar(
497            &template_parts,
498            &columns,
499            non_empty.len(),
500            has_trailing_newline,
501        ))
502    } else {
503        Some(build_selective_columnar(
504            &template_parts,
505            &columns,
506            &extract_mask,
507            non_empty.len(),
508            has_trailing_newline,
509        ))
510    }
511}
512
513/// Find the best discriminator column for schema clustering.
514/// A discriminator is a low-cardinality, short-valued column whose value predicts
515/// the schema of other columns (e.g., "type" in GitHub Archive events).
516/// Returns the column index, or None if no good discriminator exists.
517fn find_discriminator<'a>(parsed: &[Option<ParsedLine<'a>>]) -> Option<usize> {
518    use std::collections::HashSet;
519
520    // Sample up to 200 valid parsed lines.
521    let mut samples: Vec<&ParsedLine<'a>> = Vec::new();
522    for line in parsed.iter().flatten() {
523        samples.push(line);
524        if samples.len() >= 200 {
525            break;
526        }
527    }
528
529    if samples.len() < 10 {
530        return None;
531    }
532
533    // Use the minimum column count across all samples.
534    let num_cols = samples.iter().map(|s| s.1.len()).min().unwrap_or(0);
535    if num_cols == 0 {
536        return None;
537    }
538
539    let mut best_col = None;
540    let mut best_cardinality = usize::MAX;
541
542    for col_idx in 0..num_cols {
543        // Compute average value length — skip long values (nested objects).
544        let total_len: usize = samples.iter().map(|s| s.1[col_idx].len()).sum();
545        let avg_len = total_len / samples.len();
546        if avg_len > 30 {
547            continue;
548        }
549
550        let unique: HashSet<&[u8]> = samples.iter().map(|s| s.1[col_idx]).collect();
551        let cardinality = unique.len();
552
553        // Good discriminator: > 1 distinct value, < 1/3 of rows distinct.
554        if cardinality > 1 && cardinality < samples.len() / 3 && cardinality < best_cardinality {
555            best_col = Some(col_idx);
556            best_cardinality = cardinality;
557        }
558    }
559
560    best_col
561}
562
563/// Strategy 2: Group-by-schema — group rows by template, columnarize each group.
564///
565/// Metadata format (version=2):
566///   version: u8 = 2
567///   has_trailing_newline: u8
568///   total_rows: u32 LE
569///   num_groups: u16 LE
570///   for each group:
571///     num_rows: u32 LE
572///     row_indices: [u32 LE * num_rows]
573///     group_metadata_len: u32 LE
574///     group_metadata: [bytes]  (Strategy 1 metadata for this group)
575///   residual_count: u32 LE
576///   residual_indices: [u32 LE * residual_count]
577///
578/// Data format:
579///   for each group:
580///     data_len: u32 LE
581///     data: [bytes]  (columnar data for this group)
582///   residual_data: [bytes]  (raw lines joined by \n)
583fn preprocess_grouped<'a>(
584    non_empty: &[&'a [u8]],
585    has_trailing_newline: bool,
586) -> Option<(Vec<u8>, Vec<u8>)> {
587    if non_empty.len() < MIN_GROUP_ROWS {
588        return None;
589    }
590
591    // Detect discriminator column for potential sub-grouping.
592    let parsed: Vec<Option<ParsedLine<'a>>> = non_empty.iter().map(|&l| parse_line(l)).collect();
593    let disc_col = find_discriminator(&parsed);
594    drop(parsed);
595
596    // Try schema-only grouping first (always).
597    let result_no_disc = preprocess_grouped_core(non_empty, has_trailing_newline, None);
598
599    // If a discriminator exists, also try sub-grouped encoding and pick the winner.
600    if let Some(dc) = disc_col {
601        let result_disc = preprocess_grouped_core(non_empty, has_trailing_newline, Some(dc));
602        match (&result_no_disc, &result_disc) {
603            (Some((d1, m1)), Some((d2, m2))) => {
604                if d2.len() + m2.len() < d1.len() + m1.len() {
605                    return result_disc;
606                }
607                return result_no_disc;
608            }
609            (None, Some(_)) => return result_disc,
610            _ => return result_no_disc,
611        }
612    }
613
614    result_no_disc
615}
616
617/// Core grouped encoding: group by schema, optionally sub-group by discriminator,
618/// build columnar data per group with optional nested flatten.
619fn preprocess_grouped_core<'a>(
620    non_empty: &[&'a [u8]],
621    has_trailing_newline: bool,
622    disc_col: Option<usize>,
623) -> Option<(Vec<u8>, Vec<u8>)> {
624    // Parse all lines and group by template.
625    let mut parsed: Vec<Option<ParsedLine<'a>>> = Vec::with_capacity(non_empty.len());
626    for &line in non_empty {
627        parsed.push(parse_line(line));
628    }
629
630    let mut group_map: HashMap<Vec<u8>, SchemaGroup<'a>> = HashMap::new();
631    let mut residual_indices: Vec<usize> = Vec::new();
632
633    for (idx, parsed_line) in parsed.into_iter().enumerate() {
634        if let Some((parts, values)) = parsed_line {
635            let mut key = Vec::new();
636            for part in &parts {
637                key.extend_from_slice(&(part.len() as u32).to_le_bytes());
638                key.extend_from_slice(part);
639            }
640            // If discriminator column specified, include its value for sub-grouping.
641            if let Some(dc) = disc_col {
642                if dc < values.len() {
643                    key.push(0xFF);
644                    key.extend_from_slice(values[dc]);
645                }
646            }
647            group_map
648                .entry(key)
649                .or_insert_with(|| (parts, Vec::new()))
650                .1
651                .push((idx, values));
652        } else {
653            residual_indices.push(idx);
654        }
655    }
656
657    // Separate groups into qualifying (>= MIN_GROUP_ROWS) and residual.
658    let mut groups: Vec<SchemaGroup<'a>> = Vec::new();
659    for (_key, (template_parts, rows)) in group_map {
660        if rows.len() >= MIN_GROUP_ROWS {
661            groups.push((template_parts, rows));
662        } else {
663            // Too few rows — send to residual.
664            for (idx, _) in &rows {
665                residual_indices.push(*idx);
666            }
667        }
668    }
669
670    // Need at least 1 qualifying group for this to be useful.
671    if groups.is_empty() {
672        return None;
673    }
674
675    // Sort groups by their first row index for deterministic output.
676    groups.sort_by_key(|(_, rows)| rows[0].0);
677    residual_indices.sort_unstable();
678
679    // Build per-group columnar data and metadata.
680    struct GroupOutput {
681        row_indices: Vec<u32>,
682        col_data: Vec<u8>,
683        group_metadata: Vec<u8>,
684    }
685
686    let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
687
688    for (template_parts, rows) in &groups {
689        let num_cols = template_parts.len() - 1;
690        let mut columns: Vec<Vec<&[u8]>> = (0..num_cols).map(|_| Vec::new()).collect();
691        let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
692
693        for (idx, values) in rows {
694            row_indices.push(*idx as u32);
695            for (col, val) in values.iter().enumerate() {
696                columns[col].push(*val);
697            }
698        }
699
700        // Decide: uniform (version=1) or selective (version=3) per group.
701        let extract_mask = classify_columns(&columns, rows.len());
702        let all_extracted = extract_mask.iter().all(|&e| e);
703
704        let (mut col_data, mut group_metadata) = if all_extracted {
705            build_uniform_columnar(template_parts, &columns, rows.len(), false)
706        } else {
707            build_selective_columnar(template_parts, &columns, &extract_mask, rows.len(), false)
708        };
709
710        // Try nested object flatten for uniform groups.
711        // Decomposes nested JSON objects (e.g., payload, actor) into sub-columns
712        // for better compression when rows are schema-clustered.
713        if all_extracted {
714            if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, rows.len())
715            {
716                let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
717                let unflattened = unflatten_nested_columns(
718                    &flat_data,
719                    &nested_groups,
720                    rows.len(),
721                    total_flat_cols,
722                );
723                if unflattened == col_data {
724                    let nested_bytes = serialize_nested_info(&nested_groups);
725                    group_metadata.extend_from_slice(&nested_bytes);
726                    col_data = flat_data;
727                } else {
728                    group_metadata.push(0u8); // has_nested = 0
729                }
730            } else {
731                group_metadata.push(0u8); // has_nested = 0
732            }
733        }
734
735        group_outputs.push(GroupOutput {
736            row_indices,
737            col_data,
738            group_metadata,
739        });
740    }
741
742    // Build the combined data blob.
743    let mut data_out = Vec::new();
744    for group in &group_outputs {
745        data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
746        data_out.extend_from_slice(&group.col_data);
747    }
748
749    // Append residual lines (raw, separated by \n).
750    let residual_start = data_out.len();
751    for (i, &idx) in residual_indices.iter().enumerate() {
752        data_out.extend_from_slice(non_empty[idx]);
753        if i < residual_indices.len() - 1 {
754            data_out.push(b'\n');
755        }
756    }
757    let _residual_len = data_out.len() - residual_start;
758
759    // Build the combined metadata.
760    let mut metadata = Vec::new();
761    metadata.push(METADATA_VERSION_GROUPED);
762    metadata.push(u8::from(has_trailing_newline));
763    metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
764    metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
765
766    for group in &group_outputs {
767        metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
768        for &idx in &group.row_indices {
769            metadata.extend_from_slice(&idx.to_le_bytes());
770        }
771        metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
772        metadata.extend_from_slice(&group.group_metadata);
773    }
774
775    metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
776    for &idx in &residual_indices {
777        metadata.extend_from_slice(&(idx as u32).to_le_bytes());
778    }
779
780    Some((data_out, metadata))
781}
782
783/// Metadata describing which columns were flattened from nested objects.
784pub(crate) struct NestedGroupInfo {
785    /// Index of the original column that was expanded.
786    pub(crate) original_col_index: u16,
787    /// Sub-key names for the expanded sub-columns.
788    pub(crate) sub_keys: Vec<Vec<u8>>,
789    /// Template parts for reconstructing nested objects (preserves original formatting).
790    /// If empty, compact format `{"key":val,...}` is used (NDJSON compatibility).
791    pub(crate) nested_template: Vec<Vec<u8>>,
792    /// Absence bitmap: one bit per (sub_key, row). Bit=1 means the key was ABSENT
793    /// in the original object (not present at all), vs bit=0 means key was present
794    /// (possibly with explicit `null`). Packed LSB-first.
795    /// Length = ceil(num_sub_keys * num_rows / 8).
796    /// Empty if all rows had all keys (no absences).
797    pub(crate) absence_bitmap: Vec<u8>,
798}
799
800/// Attempt to flatten nested JSON objects in columnar data (depth-1).
801///
802/// Takes columnar data (\x00/\x01 separated) and returns expanded columnar data
803/// with nested objects decomposed into sub-columns.
804/// Returns None if no nested objects found.
805pub(crate) fn flatten_nested_columns(
806    col_data: &[u8],
807    num_rows: usize,
808) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
809    // Split into columns.
810    let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
811    if columns.is_empty() || num_rows == 0 {
812        return None;
813    }
814
815    let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
816    // Build the output columns: for non-nested cols, keep as-is.
817    // For nested cols, replace with sub-columns.
818    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
819
820    for (col_idx, &col_chunk) in columns.iter().enumerate() {
821        let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
822        if values.len() != num_rows {
823            return None;
824        }
825
826        // Check if ALL non-null values start with '{' (nested object).
827        let mut all_objects = true;
828        let mut has_non_null = false;
829        for val in &values {
830            if *val == b"null" {
831                continue;
832            }
833            has_non_null = true;
834            if !val.starts_with(b"{") {
835                all_objects = false;
836                break;
837            }
838        }
839
840        if !all_objects || !has_non_null {
841            // Not a nested-object column — keep as-is.
842            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
843            output_columns.push(col_values);
844            continue;
845        }
846
847        // This column contains nested objects — decompose depth-1.
848        // Parse all values and collect all unique sub-keys (preserving discovery order).
849        // Also capture the template from the first non-null object to preserve formatting.
850        let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
851        let mut nested_template: Vec<Vec<u8>> = Vec::new();
852        type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
853        let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);
854
855        for val in &values {
856            if *val == b"null" {
857                parsed_rows.push(None);
858                continue;
859            }
860            if nested_template.is_empty() {
861                // First non-null: use template-preserving parser.
862                match parse_nested_object_with_template(val) {
863                    Some((template, kv_pairs)) => {
864                        for (key, _) in &kv_pairs {
865                            if !all_sub_keys.iter().any(|k| k == key) {
866                                all_sub_keys.push(key.clone());
867                            }
868                        }
869                        nested_template = template;
870                        parsed_rows.push(Some(kv_pairs));
871                    }
872                    None => {
873                        all_sub_keys.clear();
874                        break;
875                    }
876                }
877            } else {
878                // Subsequent rows: use simpler kv parser (template already captured).
879                match parse_nested_object_kv(val) {
880                    Some(kv_pairs) => {
881                        for (key, _) in &kv_pairs {
882                            if !all_sub_keys.iter().any(|k| k == key) {
883                                all_sub_keys.push(key.clone());
884                            }
885                        }
886                        parsed_rows.push(Some(kv_pairs));
887                    }
888                    None => {
889                        all_sub_keys.clear();
890                        break;
891                    }
892                }
893            }
894        }
895
896        if all_sub_keys.is_empty() {
897            // Could not parse — keep column as-is.
898            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
899            output_columns.push(col_values);
900            continue;
901        }
902
903        // Build sub-columns: for each sub-key, extract values from parsed rows.
904        // Also build an absence bitmap: bit=1 where a key was absent from the
905        // original row (as opposed to being present with explicit `null`).
906        let num_sub_keys = all_sub_keys.len();
907        let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
908        let total_bits = num_sub_keys * num_rows;
909        let bitmap_bytes = total_bits.div_ceil(8);
910        let mut absence_bitmap = vec![0u8; bitmap_bytes];
911        let mut has_any_absent = false;
912
913        for (row_idx, parsed) in parsed_rows.iter().enumerate() {
914            match parsed {
915                Some(kv_pairs) => {
916                    for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
917                        let found = kv_pairs.iter().find(|(k, _)| k == sk);
918                        match found {
919                            Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
920                            None => {
921                                sub_columns[sk_idx].push(b"null".to_vec());
922                                // Mark this (sub_key, row) as absent.
923                                let bit_idx = sk_idx * num_rows + row_idx;
924                                absence_bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
925                                has_any_absent = true;
926                            }
927                        }
928                    }
929                }
930                None => {
931                    // null row — all sub-columns get null.
932                    // These are NOT marked as absent (the whole column was null,
933                    // not individual keys missing).
934                    for sc in &mut sub_columns {
935                        sc.push(b"null".to_vec());
936                    }
937                }
938            }
939        }
940
941        nested_groups.push(NestedGroupInfo {
942            original_col_index: col_idx as u16,
943            sub_keys: all_sub_keys,
944            nested_template,
945            absence_bitmap: if has_any_absent {
946                absence_bitmap
947            } else {
948                Vec::new()
949            },
950        });
951
952        for sc in sub_columns {
953            output_columns.push(sc);
954        }
955    }
956
957    if nested_groups.is_empty() {
958        return None;
959    }
960
961    // Build the flattened columnar data.
962    let num_out_cols = output_columns.len();
963    let mut out = Vec::new();
964    for (ci, col) in output_columns.iter().enumerate() {
965        for (ri, val) in col.iter().enumerate() {
966            out.extend_from_slice(val);
967            if ri < num_rows - 1 {
968                out.push(VAL_SEP);
969            }
970        }
971        if ci < num_out_cols - 1 {
972            out.push(COL_SEP);
973        }
974    }
975
976    Some((out, nested_groups))
977}
978
979/// Parse a nested JSON object into (template_parts, kv_pairs).
980/// Template parts include all structural bytes (braces, keys, colons, whitespace) —
981/// preserving the original formatting so the object can be reconstructed exactly.
982/// Keys are returned WITHOUT quotes. Values are the exact bytes from the JSON.
983#[allow(clippy::type_complexity)]
984pub(crate) fn parse_nested_object_with_template(
985    obj: &[u8],
986) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
987    let mut pos = 0;
988
989    // Skip whitespace.
990    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
991        pos += 1;
992    }
993    if pos >= obj.len() || obj[pos] != b'{' {
994        return None;
995    }
996    pos += 1;
997
998    let mut parts: Vec<Vec<u8>> = Vec::new();
999    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1000    let mut part_start = 0;
1001
1002    loop {
1003        // Skip whitespace.
1004        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1005            pos += 1;
1006        }
1007        if pos >= obj.len() {
1008            return None;
1009        }
1010        if obj[pos] == b'}' {
1011            parts.push(obj[part_start..].to_vec());
1012            break;
1013        }
1014
1015        // Expect a key string.
1016        if obj[pos] != b'"' {
1017            return None;
1018        }
1019        let key_str_start = pos + 1;
1020        pos += 1;
1021        let mut escaped = false;
1022        while pos < obj.len() {
1023            if escaped {
1024                escaped = false;
1025            } else if obj[pos] == b'\\' {
1026                escaped = true;
1027            } else if obj[pos] == b'"' {
1028                break;
1029            }
1030            pos += 1;
1031        }
1032        if pos >= obj.len() {
1033            return None;
1034        }
1035        let key = obj[key_str_start..pos].to_vec();
1036        pos += 1; // skip closing quote
1037
1038        // Skip whitespace, expect colon.
1039        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1040            pos += 1;
1041        }
1042        if pos >= obj.len() || obj[pos] != b':' {
1043            return None;
1044        }
1045        pos += 1;
1046
1047        // Skip whitespace between colon and value — include in template.
1048        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1049            pos += 1;
1050        }
1051
1052        // Template part: everything from part_start to here (includes key, colon, post-colon ws).
1053        parts.push(obj[part_start..pos].to_vec());
1054
1055        // Extract the value (no whitespace skipping — already consumed above).
1056        let value_start = pos;
1057        // Use extract_value but we've already consumed whitespace.
1058        let (value, value_end) = extract_value(obj, value_start)?;
1059        pos = value_end;
1060        pairs.push((key, value.to_vec()));
1061
1062        part_start = pos;
1063
1064        // Skip whitespace.
1065        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1066            pos += 1;
1067        }
1068        if pos >= obj.len() {
1069            return None;
1070        }
1071        if obj[pos] == b',' {
1072            pos += 1;
1073        } else if obj[pos] == b'}' {
1074            parts.push(obj[part_start..].to_vec());
1075            break;
1076        } else {
1077            return None;
1078        }
1079    }
1080
1081    if pairs.is_empty() {
1082        return None;
1083    }
1084    Some((parts, pairs))
1085}
1086
1087/// Parse a nested JSON object into its key-value pairs (depth-1 only).
1088/// Returns the exact bytes for each key and value.
1089/// Keys are returned WITHOUT quotes. Values are the exact bytes from the JSON.
1090pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
1091    let mut pos = 0;
1092
1093    // Skip whitespace.
1094    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1095        pos += 1;
1096    }
1097    if pos >= obj.len() || obj[pos] != b'{' {
1098        return None;
1099    }
1100    pos += 1;
1101
1102    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1103
1104    loop {
1105        // Skip whitespace.
1106        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1107            pos += 1;
1108        }
1109        if pos >= obj.len() {
1110            return None;
1111        }
1112        if obj[pos] == b'}' {
1113            break;
1114        }
1115
1116        // Expect a key string.
1117        if obj[pos] != b'"' {
1118            return None;
1119        }
1120        pos += 1;
1121        let key_start = pos;
1122        let mut escaped = false;
1123        while pos < obj.len() {
1124            if escaped {
1125                escaped = false;
1126            } else if obj[pos] == b'\\' {
1127                escaped = true;
1128            } else if obj[pos] == b'"' {
1129                break;
1130            }
1131            pos += 1;
1132        }
1133        if pos >= obj.len() {
1134            return None;
1135        }
1136        let key = obj[key_start..pos].to_vec();
1137        pos += 1; // skip closing quote
1138
1139        // Skip whitespace, expect colon.
1140        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1141            pos += 1;
1142        }
1143        if pos >= obj.len() || obj[pos] != b':' {
1144            return None;
1145        }
1146        pos += 1;
1147
1148        // Extract the value.
1149        let (value, value_end) = extract_value(obj, pos)?;
1150        pos = value_end;
1151        pairs.push((key, value.to_vec()));
1152
1153        // Skip whitespace.
1154        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1155            pos += 1;
1156        }
1157        if pos >= obj.len() {
1158            return None;
1159        }
1160        if obj[pos] == b',' {
1161            pos += 1;
1162        } else if obj[pos] == b'}' {
1163            break;
1164        } else {
1165            return None;
1166        }
1167    }
1168
1169    if pairs.is_empty() {
1170        return None;
1171    }
1172    Some(pairs)
1173}
1174
1175/// Unflatten nested sub-columns back into JSON objects.
1176///
1177/// Takes flattened columnar data and nested group info, merges sub-columns
1178/// back into the original nested object columns.
1179pub(crate) fn unflatten_nested_columns(
1180    flat_data: &[u8],
1181    nested_groups: &[NestedGroupInfo],
1182    num_rows: usize,
1183    total_flat_cols: usize,
1184) -> Vec<u8> {
1185    let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
1186    if flat_columns.len() != total_flat_cols {
1187        return flat_data.to_vec();
1188    }
1189
1190    // Parse all flat column values.
1191    let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
1192    for chunk in &flat_columns {
1193        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1194        if vals.len() != num_rows {
1195            return flat_data.to_vec();
1196        }
1197        flat_col_values.push(vals);
1198    }
1199
1200    // Reconstruct original columns from flat columns.
1201    // Walk through flat columns, merging sub-columns back where needed.
1202    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
1203
1204    // Build a set of original_col_index -> group for quick lookup.
1205    // We need to know which flat columns map to which nested group.
1206    // The flat columns are in order: non-nested cols keep their position,
1207    // nested cols are replaced by their sub-columns at that position.
1208    //
1209    // To figure out which flat_idx corresponds to what, we replay the
1210    // forward mapping.
1211    // We need to know the ORIGINAL number of columns.
1212    let original_num_cols = total_flat_cols
1213        - nested_groups
1214            .iter()
1215            .map(|g| g.sub_keys.len())
1216            .sum::<usize>()
1217        + nested_groups.len();
1218
1219    // Build mapping: for each original col, is it nested or not?
1220    let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
1221    for (gi, group) in nested_groups.iter().enumerate() {
1222        if (group.original_col_index as usize) < original_num_cols {
1223            original_col_map[group.original_col_index as usize] = Some(gi);
1224        }
1225    }
1226
1227    let mut flat_idx = 0;
1228    for entry in original_col_map.iter().take(original_num_cols) {
1229        if let Some(gi) = entry {
1230            let group = &nested_groups[*gi];
1231            let num_sub = group.sub_keys.len();
1232
1233            // Helper: check if sub-key `si` at `row` is absent using bitmap.
1234            let is_absent = |si: usize, row: usize| -> bool {
1235                if group.absence_bitmap.is_empty() {
1236                    return false; // no absences in this group
1237                }
1238                let bit_idx = si * num_rows + row;
1239                let byte_idx = bit_idx / 8;
1240                if byte_idx >= group.absence_bitmap.len() {
1241                    return false;
1242                }
1243                (group.absence_bitmap[byte_idx] >> (bit_idx % 8)) & 1 == 1
1244            };
1245
1246            // Merge sub-columns back into nested objects.
1247            let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
1248            for row in 0..num_rows {
1249                // Check if all sub-columns are null or absent for this row
1250                // (meaning the whole nested column was null).
1251                let all_null = (0..num_sub).all(|si| {
1252                    flat_idx + si < flat_col_values.len()
1253                        && flat_col_values[flat_idx + si][row] == b"null"
1254                });
1255                if all_null && !group.absence_bitmap.is_empty() {
1256                    // If all values are null but some are "absent" and some are
1257                    // "explicit null", we need to reconstruct, not collapse.
1258                    let any_present_null = (0..num_sub).any(|si| {
1259                        flat_col_values[flat_idx + si][row] == b"null" && !is_absent(si, row)
1260                    });
1261                    if any_present_null {
1262                        // At least one key has an explicit null — don't collapse.
1263                        // Fall through to reconstruction below.
1264                    } else {
1265                        // All nulls are from absent keys — whole column was null.
1266                        merged_col.push(b"null".to_vec());
1267                        continue;
1268                    }
1269                } else if all_null {
1270                    merged_col.push(b"null".to_vec());
1271                    continue;
1272                }
1273
1274                // Check whether any sub-key is absent in this row.
1275                let has_absent = (0..num_sub).any(|si| is_absent(si, row));
1276
1277                if !has_absent
1278                    && !group.nested_template.is_empty()
1279                    && group.nested_template.len() == num_sub + 1
1280                {
1281                    // Template-based reconstruction: all keys present,
1282                    // preserves original formatting exactly.
1283                    let mut obj = Vec::new();
1284                    obj.extend_from_slice(&group.nested_template[0]);
1285                    if flat_idx < flat_col_values.len() {
1286                        obj.extend_from_slice(flat_col_values[flat_idx][row]);
1287                    }
1288                    for si in 1..num_sub {
1289                        obj.extend_from_slice(&group.nested_template[si]);
1290                        if flat_idx + si < flat_col_values.len() {
1291                            obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
1292                        }
1293                    }
1294                    obj.extend_from_slice(&group.nested_template[num_sub]);
1295                    merged_col.push(obj);
1296                } else {
1297                    // Compact reconstruction: some keys absent, or no template.
1298                    // Skip sub-keys that were absent in the original.
1299                    let mut obj = Vec::new();
1300                    obj.push(b'{');
1301                    let mut first = true;
1302                    for si in 0..num_sub {
1303                        if flat_idx + si >= flat_col_values.len() {
1304                            break;
1305                        }
1306                        if is_absent(si, row) {
1307                            continue; // key was absent — omit entirely
1308                        }
1309                        let val = flat_col_values[flat_idx + si][row];
1310                        if !first {
1311                            obj.push(b',');
1312                        }
1313                        first = false;
1314                        obj.push(b'"');
1315                        obj.extend_from_slice(&group.sub_keys[si]);
1316                        obj.push(b'"');
1317                        obj.push(b':');
1318                        obj.extend_from_slice(val);
1319                    }
1320                    obj.push(b'}');
1321                    merged_col.push(obj);
1322                }
1323            }
1324            output_columns.push(merged_col);
1325            flat_idx += num_sub;
1326        } else {
1327            // Non-nested column — copy as-is.
1328            if flat_idx < flat_col_values.len() {
1329                let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
1330                    .iter()
1331                    .map(|v| v.to_vec())
1332                    .collect();
1333                output_columns.push(col);
1334            }
1335            flat_idx += 1;
1336        }
1337    }
1338
1339    // Rebuild columnar data.
1340    let num_out_cols = output_columns.len();
1341    let mut out = Vec::new();
1342    for (ci, col) in output_columns.iter().enumerate() {
1343        for (ri, val) in col.iter().enumerate() {
1344            out.extend_from_slice(val);
1345            if ri < num_rows - 1 {
1346                out.push(VAL_SEP);
1347            }
1348        }
1349        if ci < num_out_cols - 1 {
1350            out.push(COL_SEP);
1351        }
1352    }
1353
1354    out
1355}
1356
1357/// Serialize nested group info into bytes for storage in metadata.
1358/// Version 1 (has_nested=1): sub_keys only (backward compat, NDJSON path).
1359/// Version 2 (has_nested=2): sub_keys + nested_template (preserves formatting).
1360/// Version 3 (has_nested=3): sub_keys + nested_template + absence_bitmap.
1361pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1362    let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1363    let has_absence = groups.iter().any(|g| !g.absence_bitmap.is_empty());
1364    let mut out = Vec::new();
1365    let version = if has_absence {
1366        3u8
1367    } else if has_template {
1368        2u8
1369    } else {
1370        1u8
1371    };
1372    out.push(version);
1373    out.push(groups.len() as u8);
1374    for group in groups {
1375        out.extend_from_slice(&group.original_col_index.to_le_bytes());
1376        out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1377        for key in &group.sub_keys {
1378            out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1379            out.extend_from_slice(key);
1380        }
1381        if has_template || version == 3 {
1382            out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1383            for part in &group.nested_template {
1384                out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1385                out.extend_from_slice(part);
1386            }
1387        }
1388        if version == 3 {
1389            let bm_len = group.absence_bitmap.len() as u32;
1390            out.extend_from_slice(&bm_len.to_le_bytes());
1391            out.extend_from_slice(&group.absence_bitmap);
1392        }
1393    }
1394    out
1395}
1396
1397/// Deserialize nested group info from metadata bytes.
1398/// Returns (nested_groups, bytes_consumed).
1399/// Handles version 1 (no template), version 2 (with template),
1400/// and version 3 (template + absence bitmap).
1401pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1402    if data.is_empty() {
1403        return None;
1404    }
1405    let mut pos = 0;
1406    let version = data[pos];
1407    pos += 1;
1408    if version != 1 && version != 2 && version != 3 {
1409        return None;
1410    }
1411    let has_template = version == 2 || version == 3;
1412    let has_absence = version == 3;
1413    if pos >= data.len() {
1414        return None;
1415    }
1416    let num_groups = data[pos] as usize;
1417    pos += 1;
1418
1419    let mut groups = Vec::with_capacity(num_groups);
1420    for _ in 0..num_groups {
1421        if pos + 4 > data.len() {
1422            return None;
1423        }
1424        let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1425        pos += 2;
1426        let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1427        pos += 2;
1428
1429        let mut sub_keys = Vec::with_capacity(num_sub_cols);
1430        for _ in 0..num_sub_cols {
1431            if pos + 2 > data.len() {
1432                return None;
1433            }
1434            let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1435            pos += 2;
1436            if pos + key_len > data.len() {
1437                return None;
1438            }
1439            sub_keys.push(data[pos..pos + key_len].to_vec());
1440            pos += key_len;
1441        }
1442
1443        let nested_template = if has_template {
1444            if pos + 2 > data.len() {
1445                return None;
1446            }
1447            let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1448            pos += 2;
1449            let mut parts = Vec::with_capacity(num_parts);
1450            for _ in 0..num_parts {
1451                if pos + 2 > data.len() {
1452                    return None;
1453                }
1454                let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1455                pos += 2;
1456                if pos + part_len > data.len() {
1457                    return None;
1458                }
1459                parts.push(data[pos..pos + part_len].to_vec());
1460                pos += part_len;
1461            }
1462            parts
1463        } else {
1464            Vec::new()
1465        };
1466
1467        let absence_bitmap = if has_absence {
1468            if pos + 4 > data.len() {
1469                return None;
1470            }
1471            let bm_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1472            pos += 4;
1473            if pos + bm_len > data.len() {
1474                return None;
1475            }
1476            let bm = data[pos..pos + bm_len].to_vec();
1477            pos += bm_len;
1478            bm
1479        } else {
1480            Vec::new()
1481        };
1482
1483        groups.push(NestedGroupInfo {
1484            original_col_index,
1485            sub_keys,
1486            nested_template,
1487            absence_bitmap,
1488        });
1489    }
1490
1491    Some((groups, pos))
1492}
1493
1494/// Forward transform: NDJSON columnar reorg.
1495///
1496/// Tries Strategy 1 (uniform) first, then Strategy 2 (grouped) if schemas differ.
1497/// Returns None if data is not suitable for columnar transform.
1498pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
1499    if data.is_empty() {
1500        return None;
1501    }
1502
1503    let has_trailing_newline = data.last() == Some(&b'\n');
1504    let lines = split_lines(data);
1505    let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
1506
1507    if non_empty.len() < 2 {
1508        return None;
1509    }
1510
1511    // Strategy 1: try uniform schema first.
1512    if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
1513        let is_selective = !metadata.is_empty() && metadata[0] == METADATA_VERSION_SELECTIVE;
1514        // Selective columnar may be slightly larger raw (metadata overhead) but the
1515        // row-major inline layout benefits downstream compression significantly.
1516        // Allow up to 5% overhead for selective; strict for uniform.
1517        let size_ok = if is_selective {
1518            (col_data.len() + metadata.len()) * 100 <= data.len() * 105
1519        } else {
1520            col_data.len() + metadata.len() < data.len()
1521        };
1522        if size_ok {
1523            // Selective columnar (version=3) has a different data layout — skip nested flatten.
1524            if is_selective {
1525                return Some(TransformResult {
1526                    data: col_data,
1527                    metadata,
1528                });
1529            }
1530            // Try depth-1 nested decomposition on the columnar output.
1531            // Even if the flattened data is slightly larger raw, the downstream
1532            // typed encoding + compression benefits are significant: null bitmaps
1533            // are compact and type-homogeneous columns compress much better.
1534            let num_rows = non_empty.len();
1535            if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
1536                // Verify roundtrip: unflatten must produce the exact original columnar
1537                // data. Nested objects with varying sub-key sets or key ordering can
1538                // cause the compact reconstruction to reorder keys, breaking byte-exact
1539                // roundtrip. Only apply if the unflatten is provably lossless.
1540                let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
1541                let unflattened =
1542                    unflatten_nested_columns(&flat_data, &nested_groups, num_rows, total_flat_cols);
1543                if unflattened == col_data {
1544                    // Append nested info to metadata.
1545                    let nested_bytes = serialize_nested_info(&nested_groups);
1546                    metadata.extend_from_slice(&nested_bytes);
1547                    return Some(TransformResult {
1548                        data: flat_data,
1549                        metadata,
1550                    });
1551                }
1552                // else: roundtrip not exact — skip nested flatten.
1553            }
1554            // No nested objects found — append has_nested=0.
1555            metadata.push(0u8); // has_nested = 0
1556            return Some(TransformResult {
1557                data: col_data,
1558                metadata,
1559            });
1560        }
1561    }
1562
1563    // Strategy 2: group by schema.
1564    if let Some((grouped_data, grouped_metadata)) =
1565        preprocess_grouped(&non_empty, has_trailing_newline)
1566    {
1567        if grouped_data.len() + grouped_metadata.len() < data.len() {
1568            return Some(TransformResult {
1569                data: grouped_data,
1570                metadata: grouped_metadata,
1571            });
1572        }
1573    }
1574
1575    None
1576}
1577
1578/// Reverse transform: reconstruct NDJSON from columnar layout + metadata.
1579/// Dispatches to the appropriate decoder based on metadata version byte.
1580pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1581    if metadata.is_empty() {
1582        return data.to_vec();
1583    }
1584    match metadata[0] {
1585        METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1586        METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1587        METADATA_VERSION_SELECTIVE => reverse_selective(data, metadata),
1588        _ => data.to_vec(),
1589    }
1590}
1591
1592/// Reverse Strategy 1: uniform schema.
1593fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1594    if metadata.len() < 10 {
1595        return data.to_vec();
1596    }
1597    let mut pos = 0;
1598    // Skip version byte (reserved for format changes).
1599    pos += 1;
1600    let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1601    pos += 4;
1602    let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1603    pos += 2;
1604    let has_trailing_newline = metadata[pos] != 0;
1605    pos += 1;
1606    let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1607    pos += 2;
1608
1609    let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
1610    for _ in 0..num_parts {
1611        if pos + 2 > metadata.len() {
1612            return data.to_vec();
1613        }
1614        let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1615        pos += 2;
1616        if pos + part_len > metadata.len() {
1617            return data.to_vec();
1618        }
1619        parts.push(metadata[pos..pos + part_len].to_vec());
1620        pos += part_len;
1621    }
1622
1623    if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
1624        return data.to_vec();
1625    }
1626
1627    // Check for nested metadata after template parts.
1628    let remaining_metadata = &metadata[pos..];
1629    if !remaining_metadata.is_empty()
1630        && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2 || remaining_metadata[0] == 3)
1631    {
1632        // has_nested == 1, 2, or 3: unflatten before reconstructing rows.
1633        if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
1634            // Calculate total number of flat columns.
1635            let total_flat_cols = data.split(|&b| b == COL_SEP).count();
1636            let unflattened =
1637                unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
1638            return reverse_uniform_from_parts(
1639                &unflattened,
1640                &parts,
1641                num_rows,
1642                num_cols,
1643                has_trailing_newline,
1644            );
1645        }
1646    }
1647
1648    reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
1649}
1650
1651/// Core uniform reverse: given parsed parts, reconstruct lines from columnar data.
1652fn reverse_uniform_from_parts(
1653    data: &[u8],
1654    parts: &[Vec<u8>],
1655    num_rows: usize,
1656    num_cols: usize,
1657    has_trailing_newline: bool,
1658) -> Vec<u8> {
1659    let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
1660    if col_chunks.len() != num_cols {
1661        return data.to_vec();
1662    }
1663
1664    let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1665    for chunk in &col_chunks {
1666        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1667        if vals.len() != num_rows {
1668            return data.to_vec();
1669        }
1670        columns.push(vals);
1671    }
1672
1673    // Pre-calculate exact output size to allocate once (avoid incremental realloc).
1674    let template_size_per_row: usize = parts.iter().map(std::vec::Vec::len).sum();
1675    let values_total: usize = columns
1676        .iter()
1677        .map(|col| col.iter().map(|v| v.len()).sum::<usize>())
1678        .sum();
1679    let newline_count = if has_trailing_newline {
1680        num_rows
1681    } else {
1682        num_rows - 1
1683    };
1684    let total_size = template_size_per_row * num_rows + values_total + newline_count;
1685
1686    let mut output = Vec::with_capacity(total_size);
1687    #[allow(clippy::needless_range_loop)]
1688    for row in 0..num_rows {
1689        output.extend_from_slice(&parts[0]);
1690        output.extend_from_slice(columns[0][row]);
1691        for col in 1..num_cols {
1692            output.extend_from_slice(&parts[col]);
1693            output.extend_from_slice(columns[col][row]);
1694        }
1695        output.extend_from_slice(&parts[num_cols]);
1696
1697        if row < num_rows - 1 || has_trailing_newline {
1698            output.push(b'\n');
1699        }
1700    }
1701
1702    output
1703}
1704
1705/// Parsed selective metadata.
1706struct SelectiveMetadata {
1707    parts: Vec<Vec<u8>>,
1708    num_rows: usize,
1709    num_total_cols: usize,
1710    has_trailing_newline: bool,
1711    extracted_col_indices: Vec<u16>,
1712}
1713
1714/// Parse version=3 (selective) metadata.
1715fn parse_selective_metadata(metadata: &[u8]) -> Option<SelectiveMetadata> {
1716    if metadata.len() < 12 {
1717        return None;
1718    }
1719    let mut pos = 1; // Skip version byte.
1720    let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1721    pos += 4;
1722    let num_total_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1723    pos += 2;
1724    let has_trailing_newline = metadata[pos] != 0;
1725    pos += 1;
1726    let num_extracted = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1727    pos += 2;
1728
1729    let mut extracted_col_indices = Vec::with_capacity(num_extracted);
1730    for _ in 0..num_extracted {
1731        if pos + 2 > metadata.len() {
1732            return None;
1733        }
1734        let idx = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap());
1735        pos += 2;
1736        extracted_col_indices.push(idx);
1737    }
1738
1739    if pos + 2 > metadata.len() {
1740        return None;
1741    }
1742    let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1743    pos += 2;
1744
1745    let mut parts = Vec::with_capacity(num_parts);
1746    for _ in 0..num_parts {
1747        if pos + 2 > metadata.len() {
1748            return None;
1749        }
1750        let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1751        pos += 2;
1752        if pos + part_len > metadata.len() {
1753            return None;
1754        }
1755        parts.push(metadata[pos..pos + part_len].to_vec());
1756        pos += part_len;
1757    }
1758
1759    if parts.len() != num_total_cols + 1 || num_rows == 0 || num_total_cols == 0 {
1760        return None;
1761    }
1762
1763    Some(SelectiveMetadata {
1764        parts,
1765        num_rows,
1766        num_total_cols,
1767        has_trailing_newline,
1768        extracted_col_indices,
1769    })
1770}
1771
1772/// Reverse selective columnar: reconstruct rows from split extracted/inline sections.
1773fn reverse_selective(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1774    let sm = match parse_selective_metadata(metadata) {
1775        Some(v) => v,
1776        None => return data.to_vec(),
1777    };
1778    reverse_selective_from_data(data, &sm)
1779}
1780
1781/// Core selective reverse: reconstruct rows from selective columnar data.
1782fn reverse_selective_from_data(data: &[u8], sm: &SelectiveMetadata) -> Vec<u8> {
1783    if data.len() < 4 {
1784        return data.to_vec();
1785    }
1786
1787    // Read extracted_data_len from first 4 bytes.
1788    let extracted_data_len = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1789    if 4 + extracted_data_len > data.len() {
1790        return data.to_vec();
1791    }
1792    let extracted_section = &data[4..4 + extracted_data_len];
1793    let inline_section = &data[4 + extracted_data_len..];
1794
1795    let num_extracted = sm.extracted_col_indices.len();
1796    let num_inline = sm.num_total_cols - num_extracted;
1797
1798    // Parse extracted columns (column-major, split by \x00 then \x01).
1799    let extracted_columns: Vec<Vec<&[u8]>> = if num_extracted > 0 && !extracted_section.is_empty() {
1800        let col_chunks: Vec<&[u8]> = extracted_section.split(|&b| b == COL_SEP).collect();
1801        if col_chunks.len() != num_extracted {
1802            return data.to_vec();
1803        }
1804        let mut cols = Vec::with_capacity(num_extracted);
1805        for chunk in &col_chunks {
1806            let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1807            if vals.len() != sm.num_rows {
1808                return data.to_vec();
1809            }
1810            cols.push(vals);
1811        }
1812        cols
1813    } else if num_extracted > 0 {
1814        // extracted section is empty but we expect columns — error
1815        return data.to_vec();
1816    } else {
1817        Vec::new()
1818    };
1819
1820    // Parse inline rows (row-major, split by \x00 then \x01).
1821    let inline_rows: Vec<Vec<&[u8]>> = if num_inline > 0 && !inline_section.is_empty() {
1822        let row_chunks: Vec<&[u8]> = inline_section.split(|&b| b == COL_SEP).collect();
1823        if row_chunks.len() != sm.num_rows {
1824            return data.to_vec();
1825        }
1826        let mut rows = Vec::with_capacity(sm.num_rows);
1827        for chunk in &row_chunks {
1828            let vals: Vec<&[u8]> = if num_inline > 1 {
1829                chunk.split(|&b| b == VAL_SEP).collect()
1830            } else {
1831                vec![*chunk]
1832            };
1833            if vals.len() != num_inline {
1834                return data.to_vec();
1835            }
1836            rows.push(vals);
1837        }
1838        rows
1839    } else if num_inline > 0 {
1840        // inline section is empty but we expect inline columns — error
1841        return data.to_vec();
1842    } else {
1843        Vec::new()
1844    };
1845
1846    // Build a lookup: for each total column index, is it extracted or inline?
1847    // extracted_positions[col] = Some(index into extracted_columns)
1848    // inline_positions[col] = Some(index into inline row values)
1849    let mut extracted_positions = vec![None; sm.num_total_cols];
1850    let mut inline_positions = vec![None; sm.num_total_cols];
1851    for (ei, &col_idx) in sm.extracted_col_indices.iter().enumerate() {
1852        if (col_idx as usize) < sm.num_total_cols {
1853            extracted_positions[col_idx as usize] = Some(ei);
1854        }
1855    }
1856    let mut ii = 0;
1857    for col in 0..sm.num_total_cols {
1858        if extracted_positions[col].is_none() {
1859            inline_positions[col] = Some(ii);
1860            ii += 1;
1861        }
1862    }
1863
1864    // Reconstruct rows.
1865    let mut output = Vec::with_capacity(data.len() * 2);
1866    for row in 0..sm.num_rows {
1867        // Interleave template parts and column values.
1868        output.extend_from_slice(&sm.parts[0]);
1869        for col in 0..sm.num_total_cols {
1870            if let Some(ei) = extracted_positions[col] {
1871                output.extend_from_slice(extracted_columns[ei][row]);
1872            } else if let Some(ii_idx) = inline_positions[col] {
1873                output.extend_from_slice(inline_rows[row][ii_idx]);
1874            }
1875            output.extend_from_slice(&sm.parts[col + 1]);
1876        }
1877
1878        if row < sm.num_rows - 1 || sm.has_trailing_newline {
1879            output.push(b'\n');
1880        }
1881    }
1882
1883    output
1884}
1885
1886/// Reverse Strategy 2: grouped schema.
1887fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1888    if metadata.len() < 8 {
1889        return data.to_vec();
1890    }
1891
1892    let mut mpos = 1; // Skip version byte.
1893    let has_trailing_newline = metadata[mpos] != 0;
1894    mpos += 1;
1895    let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1896    mpos += 4;
1897    let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
1898    mpos += 2;
1899
1900    // Allocate output slots.
1901    let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];
1902
1903    // Data cursor.
1904    let mut dpos: usize = 0;
1905
1906    for _ in 0..num_groups {
1907        // Read row indices for this group.
1908        if mpos + 4 > metadata.len() {
1909            return data.to_vec();
1910        }
1911        let group_row_count =
1912            u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1913        mpos += 4;
1914
1915        let mut row_indices = Vec::with_capacity(group_row_count);
1916        for _ in 0..group_row_count {
1917            if mpos + 4 > metadata.len() {
1918                return data.to_vec();
1919            }
1920            let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1921            mpos += 4;
1922            row_indices.push(idx);
1923        }
1924
1925        // Read group metadata.
1926        if mpos + 4 > metadata.len() {
1927            return data.to_vec();
1928        }
1929        let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1930        mpos += 4;
1931        if mpos + gm_len > metadata.len() {
1932            return data.to_vec();
1933        }
1934        let group_metadata = &metadata[mpos..mpos + gm_len];
1935        mpos += gm_len;
1936
1937        // Read group data from the data blob.
1938        if dpos + 4 > data.len() {
1939            return data.to_vec();
1940        }
1941        let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
1942        dpos += 4;
1943        if dpos + gd_len > data.len() {
1944            return data.to_vec();
1945        }
1946        let group_data = &data[dpos..dpos + gd_len];
1947        dpos += gd_len;
1948
1949        // Decode this group — dispatch on per-group metadata version byte.
1950        let group_version = if group_metadata.is_empty() {
1951            0
1952        } else {
1953            group_metadata[0]
1954        };
1955
1956        if group_version == METADATA_VERSION_SELECTIVE {
1957            // Selective columnar group (version=3).
1958            let sm = match parse_selective_metadata(group_metadata) {
1959                Some(v) => v,
1960                None => return data.to_vec(),
1961            };
1962            if sm.num_rows != group_row_count {
1963                return data.to_vec();
1964            }
1965            let reconstructed = reverse_selective_from_data(group_data, &sm);
1966            // Split reconstructed into individual lines.
1967            let lines: Vec<&[u8]> = reconstructed.split(|&b| b == b'\n').collect();
1968            for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1969                if row_within_group < lines.len() && original_idx < total_rows {
1970                    output_lines[original_idx] = Some(lines[row_within_group].to_vec());
1971                }
1972            }
1973        } else {
1974            // Standard uniform group — delegate to reverse_uniform which
1975            // handles nested unflatten automatically.
1976            let reconstructed = reverse_uniform(group_data, group_metadata);
1977            let lines: Vec<&[u8]> = reconstructed.split(|&b| b == b'\n').collect();
1978            for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1979                if row_within_group < lines.len() && original_idx < total_rows {
1980                    output_lines[original_idx] = Some(lines[row_within_group].to_vec());
1981                }
1982            }
1983        }
1984    }
1985
1986    // Read residual indices from metadata.
1987    if mpos + 4 > metadata.len() {
1988        return data.to_vec();
1989    }
1990    let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1991    mpos += 4;
1992
1993    let mut residual_indices = Vec::with_capacity(residual_count);
1994    for _ in 0..residual_count {
1995        if mpos + 4 > metadata.len() {
1996            return data.to_vec();
1997        }
1998        let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1999        mpos += 4;
2000        residual_indices.push(idx);
2001    }
2002
2003    // Remaining data is residual lines.
2004    let residual_data = &data[dpos..];
2005    if residual_count > 0 {
2006        let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
2007            vec![]
2008        } else {
2009            residual_data.split(|&b| b == b'\n').collect()
2010        };
2011        // There should be exactly residual_count lines.
2012        if residual_lines.len() != residual_count {
2013            return data.to_vec();
2014        }
2015        for (i, &idx) in residual_indices.iter().enumerate() {
2016            if idx < total_rows {
2017                output_lines[idx] = Some(residual_lines[i].to_vec());
2018            }
2019        }
2020    }
2021
2022    // Assemble final output.
2023    let mut output = Vec::with_capacity(data.len() * 2);
2024    for (i, slot) in output_lines.iter().enumerate() {
2025        match slot {
2026            Some(line) => output.extend_from_slice(line),
2027            None => {
2028                // Should not happen — missing row. Return data as-is.
2029                return data.to_vec();
2030            }
2031        }
2032        if i < total_rows - 1 || has_trailing_newline {
2033            output.push(b'\n');
2034        }
2035    }
2036
2037    output
2038}
2039
2040#[cfg(test)]
2041mod tests {
2042    use super::*;
2043
2044    #[test]
2045    fn extract_value_string() {
2046        let line = br#""hello","next""#;
2047        let (val, end) = extract_value(line, 0).unwrap();
2048        assert_eq!(val, b"\"hello\"");
2049        assert_eq!(end, 7);
2050    }
2051
2052    #[test]
2053    fn extract_value_number() {
2054        let line = b"42,next";
2055        let (val, end) = extract_value(line, 0).unwrap();
2056        assert_eq!(val, b"42");
2057        assert_eq!(end, 2);
2058    }
2059
2060    #[test]
2061    fn extract_value_bool() {
2062        let line = b"true,next";
2063        let (val, end) = extract_value(line, 0).unwrap();
2064        assert_eq!(val, b"true");
2065        assert_eq!(end, 4);
2066    }
2067
2068    #[test]
2069    fn extract_value_null() {
2070        let line = b"null,next";
2071        let (val, end) = extract_value(line, 0).unwrap();
2072        assert_eq!(val, b"null");
2073        assert_eq!(end, 4);
2074    }
2075
2076    #[test]
2077    fn extract_value_object() {
2078        let line = br#"{"a":1,"b":"x"},next"#;
2079        let (val, end) = extract_value(line, 0).unwrap();
2080        assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
2081        assert_eq!(end, 15);
2082    }
2083
2084    #[test]
2085    fn extract_value_array() {
2086        let line = b"[1,2,3],next";
2087        let (val, end) = extract_value(line, 0).unwrap();
2088        assert_eq!(val, b"[1,2,3]");
2089        assert_eq!(end, 7);
2090    }
2091
2092    #[test]
2093    fn extract_value_string_with_escapes() {
2094        let line = br#""he\"llo",next"#;
2095        let (val, end) = extract_value(line, 0).unwrap();
2096        assert_eq!(val, br#""he\"llo""#.to_vec());
2097        assert_eq!(end, 9);
2098    }
2099
2100    #[test]
2101    fn parse_line_simple() {
2102        let line = br#"{"a":1,"b":"x"}"#;
2103        let (parts, values) = parse_line(line).unwrap();
2104        assert_eq!(parts.len(), 3); // {"a": , ,"b": , }
2105        assert_eq!(values.len(), 2);
2106        assert_eq!(values[0], b"1");
2107        assert_eq!(values[1], b"\"x\"");
2108        assert_eq!(parts[0], br#"{"a":"#.to_vec());
2109        assert_eq!(parts[1], br#","b":"#.to_vec());
2110        assert_eq!(parts[2], b"}");
2111    }
2112
2113    #[test]
2114    fn roundtrip_simple() {
2115        let data = br#"{"a":1,"b":"x"}
2116{"a":2,"b":"y"}
2117{"a":3,"b":"z"}
2118"#;
2119        let result = preprocess(data).expect("should produce transform");
2120        let restored = reverse(&result.data, &result.metadata);
2121        assert_eq!(
2122            String::from_utf8_lossy(&restored),
2123            String::from_utf8_lossy(data),
2124        );
2125        assert_eq!(restored, data.to_vec());
2126    }
2127
2128    #[test]
2129    fn roundtrip_no_trailing_newline() {
2130        let data = br#"{"a":1,"b":"x"}
2131{"a":2,"b":"y"}
2132{"a":3,"b":"z"}"#;
2133        let result = preprocess(data).expect("should produce transform");
2134        let restored = reverse(&result.data, &result.metadata);
2135        assert_eq!(restored, data.to_vec());
2136    }
2137
2138    #[test]
2139    fn roundtrip_nested_values() {
2140        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2141{"id":2,"meta":{"x":30,"y":40}}
2142{"id":3,"meta":{"x":50,"y":60}}
2143{"id":4,"meta":{"x":70,"y":80}}
2144{"id":5,"meta":{"x":90,"y":100}}
2145"#;
2146        let result = preprocess(data).expect("should produce transform");
2147        let restored = reverse(&result.data, &result.metadata);
2148        assert_eq!(restored, data.to_vec());
2149    }
2150
2151    #[test]
2152    fn roundtrip_mixed_types() {
2153        let data = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
2154{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
2155{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
2156{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
2157{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
2158"#;
2159        let result = preprocess(data).expect("should produce transform");
2160        let restored = reverse(&result.data, &result.metadata);
2161        assert_eq!(restored, data.to_vec());
2162    }
2163
2164    #[test]
2165    fn schema_mismatch_too_few_returns_none() {
2166        // Different keys on different lines, but each group has < MIN_GROUP_ROWS.
2167        let data = br#"{"a":1,"b":2}
2168{"a":1,"c":3}
2169"#;
2170        assert!(preprocess(data).is_none());
2171    }
2172
2173    #[test]
2174    fn different_num_keys_too_few_returns_none() {
2175        let data = br#"{"a":1,"b":2}
2176{"a":1}
2177"#;
2178        assert!(preprocess(data).is_none());
2179    }
2180
2181    #[test]
2182    fn single_line_returns_none() {
2183        let data = br#"{"a":1,"b":2}
2184"#;
2185        assert!(preprocess(data).is_none());
2186    }
2187
2188    #[test]
2189    fn empty_returns_none() {
2190        assert!(preprocess(b"").is_none());
2191    }
2192
2193    #[test]
2194    fn column_layout_groups_similar_values() {
2195        let data = br#"{"type":"page_view","user":"alice"}
2196{"type":"api_call","user":"alice"}
2197{"type":"click","user":"bob"}
2198"#;
2199        let result = preprocess(data).unwrap();
2200
2201        // The columnar data should have type values grouped, then user values grouped.
2202        let col_data = &result.data;
2203        let cols: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
2204        assert_eq!(cols.len(), 2);
2205
2206        // Column 0 = type values.
2207        let type_vals: Vec<&[u8]> = cols[0].split(|&b| b == VAL_SEP).collect();
2208        assert_eq!(type_vals.len(), 3);
2209        assert_eq!(type_vals[0], br#""page_view""#);
2210        assert_eq!(type_vals[1], br#""api_call""#);
2211        assert_eq!(type_vals[2], br#""click""#);
2212
2213        // Column 1 = user values.
2214        let user_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
2215        assert_eq!(user_vals.len(), 3);
2216        assert_eq!(user_vals[0], br#""alice""#);
2217        assert_eq!(user_vals[1], br#""alice""#);
2218        assert_eq!(user_vals[2], br#""bob""#);
2219    }
2220
2221    #[test]
2222    fn roundtrip_string_with_escaped_chars() {
2223        let data = br#"{"msg":"he said \"hi\"","val":1}
2224{"msg":"line\nbreak","val":2}
2225{"msg":"tab\there","val":3}
2226{"msg":"back\\slash","val":4}
2227{"msg":"normal text","val":5}
2228"#;
2229        let result = preprocess(data).expect("should produce transform");
2230        let restored = reverse(&result.data, &result.metadata);
2231        assert_eq!(restored, data.to_vec());
2232    }
2233
2234    #[test]
2235    fn roundtrip_negative_and_float_numbers() {
2236        let data = br#"{"x":-3.14,"y":0}
2237{"x":2.718,"y":-1}
2238{"x":0.001,"y":999}
2239{"x":-100,"y":-200}
2240{"x":42.0,"y":7}
2241"#;
2242        let result = preprocess(data).expect("should produce transform");
2243        let restored = reverse(&result.data, &result.metadata);
2244        assert_eq!(restored, data.to_vec());
2245    }
2246
2247    /// Test that the transform+reverse is lossless even for tiny inputs
2248    /// by repeating them to pass the size check threshold.
2249    #[test]
2250    fn reverse_roundtrip_small_data() {
2251        // Verify parse_line works on small lines.
2252        let (parts, vals) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
2253        assert_eq!(vals.len(), 2);
2254        assert_eq!(parts.len(), 3);
2255
2256        // 2-row data might fail the size check, so repeat to get enough rows.
2257        let big_data = br#"{"x":-3.14,"y":0}
2258{"x":2.718,"y":-1}
2259"#
2260        .repeat(20);
2261        let result = preprocess(&big_data).expect("should produce transform with 40 rows");
2262        let restored = reverse(&result.data, &result.metadata);
2263        assert_eq!(restored, big_data);
2264    }
2265
2266    // --- Strategy 2 (grouped) tests ---
2267
2268    #[test]
2269    fn grouped_roundtrip_two_schemas() {
2270        // Two different schemas, each with >= MIN_GROUP_ROWS rows.
2271        let mut data = Vec::new();
2272        for i in 0..10 {
2273            data.extend_from_slice(
2274                format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
2275            );
2276            data.push(b'\n');
2277        }
2278        for i in 10..20 {
2279            data.extend_from_slice(
2280                format!(
2281                    r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
2282                    i, i, i
2283                )
2284                .as_bytes(),
2285            );
2286            data.push(b'\n');
2287        }
2288        let result = preprocess(&data).expect("should produce grouped transform");
2289        // Should be version 2 (grouped).
2290        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2291        let restored = reverse(&result.data, &result.metadata);
2292        assert_eq!(restored, data);
2293    }
2294
2295    #[test]
2296    fn grouped_roundtrip_interleaved_schemas() {
2297        // Interleaved schemas: alternating between two different key sets.
2298        let mut data = Vec::new();
2299        for i in 0..20 {
2300            if i % 2 == 0 {
2301                data.extend_from_slice(
2302                    format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
2303                );
2304            } else {
2305                data.extend_from_slice(
2306                    format!(
2307                        r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
2308                        i, i, i
2309                    )
2310                    .as_bytes(),
2311                );
2312            }
2313            data.push(b'\n');
2314        }
2315        let result = preprocess(&data).expect("should produce grouped transform");
2316        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2317        let restored = reverse(&result.data, &result.metadata);
2318        assert_eq!(restored, data);
2319    }
2320
2321    #[test]
2322    fn grouped_roundtrip_with_residuals() {
2323        // Two large groups + a few unique-schema rows (residuals).
2324        let mut data = Vec::new();
2325        // Group A: 8 rows.
2326        for i in 0..8 {
2327            data.extend_from_slice(format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
2328            data.push(b'\n');
2329        }
2330        // 2 unique rows (will be residual).
2331        data.extend_from_slice(br#"{"x":1,"y":2,"z":3}"#);
2332        data.push(b'\n');
2333        data.extend_from_slice(br#"{"p":"q"}"#);
2334        data.push(b'\n');
2335        // Group B: 6 rows.
2336        for i in 0..6 {
2337            data.extend_from_slice(format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes());
2338            data.push(b'\n');
2339        }
2340        let result = preprocess(&data).expect("should produce grouped transform");
2341        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2342        let restored = reverse(&result.data, &result.metadata);
2343        assert_eq!(
2344            String::from_utf8_lossy(&restored),
2345            String::from_utf8_lossy(&data),
2346        );
2347        assert_eq!(restored, data);
2348    }
2349
2350    #[test]
2351    fn grouped_roundtrip_no_trailing_newline() {
2352        let mut data = Vec::new();
2353        for i in 0..6 {
2354            data.extend_from_slice(format!(r#"{{"id":{},"type":"push"}}"#, i).as_bytes());
2355            data.push(b'\n');
2356        }
2357        for i in 0..6 {
2358            data.extend_from_slice(
2359                format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i).as_bytes(),
2360            );
2361            if i < 5 {
2362                data.push(b'\n');
2363            }
2364            // Last line: no trailing newline.
2365        }
2366        let result = preprocess(&data).expect("should produce grouped transform");
2367        let restored = reverse(&result.data, &result.metadata);
2368        assert_eq!(restored, data);
2369    }
2370
2371    #[test]
2372    fn uniform_still_preferred_over_grouped() {
2373        // All rows same schema — should use Strategy 1 (version 1), not Strategy 2.
2374        let data = br#"{"a":1,"b":"x"}
2375{"a":2,"b":"y"}
2376{"a":3,"b":"z"}
2377{"a":4,"b":"w"}
2378{"a":5,"b":"v"}
2379"#;
2380        let result = preprocess(data).expect("should produce transform");
2381        assert_eq!(
2382            result.metadata[0], METADATA_VERSION_UNIFORM,
2383            "uniform schema should use Strategy 1"
2384        );
2385        let restored = reverse(&result.data, &result.metadata);
2386        assert_eq!(restored, data.to_vec());
2387    }
2388
2389    #[test]
2390    fn grouped_gharchive_simulation() {
2391        // Simulates GitHub Archive: most rows have 7 keys, some have 8.
2392        let mut data = Vec::new();
2393        for i in 0..50 {
2394            if i % 5 == 0 {
2395                // 8-key rows (with org).
2396                data.extend_from_slice(
2397                    format!(
2398                        r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
2399                        i, i, i, i
2400                    )
2401                    .as_bytes(),
2402                );
2403            } else {
2404                // 7-key rows (no org).
2405                data.extend_from_slice(
2406                    format!(
2407                        r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
2408                        i, i, i
2409                    )
2410                    .as_bytes(),
2411                );
2412            }
2413            data.push(b'\n');
2414        }
2415        let result = preprocess(&data).expect("should produce grouped transform");
2416        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2417        let restored = reverse(&result.data, &result.metadata);
2418        assert_eq!(restored, data);
2419    }
2420
2421    #[test]
2422    fn grouped_nested_flatten_per_group() {
2423        // Groups with nested objects should get per-group nested flatten.
2424        // Two schemas (with/without extra field), nested objects in both.
2425        let mut data = Vec::new();
2426        for i in 0..30 {
2427            if i % 3 == 0 {
2428                // Schema B: 4 keys including nested + extra.
2429                data.extend_from_slice(
2430                    format!(
2431                        r#"{{"id":{},"info":{{"a":{},"b":{}}},"tag":"b","extra":"yes"}}"#,
2432                        i,
2433                        i * 10,
2434                        i * 20
2435                    )
2436                    .as_bytes(),
2437                );
2438            } else {
2439                // Schema A: 3 keys including nested.
2440                data.extend_from_slice(
2441                    format!(
2442                        r#"{{"id":{},"info":{{"a":{},"b":{}}},"tag":"a"}}"#,
2443                        i,
2444                        i * 10,
2445                        i * 20
2446                    )
2447                    .as_bytes(),
2448                );
2449            }
2450            data.push(b'\n');
2451        }
2452        let result = preprocess(&data).expect("should produce grouped transform");
2453        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2454        let restored = reverse(&result.data, &result.metadata);
2455        assert_eq!(restored, data, "grouped nested flatten roundtrip failed");
2456    }
2457
2458    #[test]
2459    fn grouped_discriminator_two_pass() {
2460        // Discriminator sub-grouping should be tried and the smaller result picked.
2461        // Build data with a discriminator column ("type") and varying nested payloads.
2462        let mut data = Vec::new();
2463        for i in 0..60 {
2464            let etype = if i % 2 == 0 { "push" } else { "create" };
2465            data.extend_from_slice(
2466                format!(
2467                    r#"{{"id":{},"type":"{}","payload":{{"ref":"r{}","size":{}}}}}"#,
2468                    i,
2469                    etype,
2470                    i,
2471                    i * 10
2472                )
2473                .as_bytes(),
2474            );
2475            data.push(b'\n');
2476        }
2477        let result = preprocess(&data).expect("should produce transform");
2478        let restored = reverse(&result.data, &result.metadata);
2479        assert_eq!(restored, data, "discriminator two-pass roundtrip failed");
2480    }
2481
2482    // --- Nested decomposition tests ---
2483
2484    #[test]
2485    fn test_nested_decomposition_basic() {
2486        // Simple nested object decomposed correctly.
2487        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2488{"id":2,"meta":{"x":30,"y":40}}
2489{"id":3,"meta":{"x":50,"y":60}}
2490"#;
2491        let result = preprocess(data).expect("should produce transform");
2492        assert_eq!(result.metadata[0], METADATA_VERSION_UNIFORM);
2493
2494        // The columnar data should have expanded columns.
2495        let cols: Vec<&[u8]> = result.data.split(|&b| b == COL_SEP).collect();
2496        // Original: 2 cols (id, meta). After flattening: 3 cols (id, meta.x, meta.y).
2497        assert_eq!(
2498            cols.len(),
2499            3,
2500            "should have 3 columns after flattening: got {}",
2501            cols.len()
2502        );
2503
2504        // Verify sub-columns contain the extracted values.
2505        let meta_x_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
2506        assert_eq!(meta_x_vals, vec![b"10".as_slice(), b"30", b"50"]);
2507
2508        let meta_y_vals: Vec<&[u8]> = cols[2].split(|&b| b == VAL_SEP).collect();
2509        assert_eq!(meta_y_vals, vec![b"20".as_slice(), b"40", b"60"]);
2510    }
2511
2512    #[test]
2513    fn test_nested_roundtrip() {
2514        // Flatten -> unflatten produces byte-exact original.
2515        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2516{"id":2,"meta":{"x":30,"y":40}}
2517{"id":3,"meta":{"x":50,"y":60}}
2518"#;
2519        let result = preprocess(data).expect("should produce transform");
2520        let restored = reverse(&result.data, &result.metadata);
2521        assert_eq!(
2522            String::from_utf8_lossy(&restored),
2523            String::from_utf8_lossy(data),
2524        );
2525        assert_eq!(restored, data.to_vec());
2526    }
2527
2528    #[test]
2529    fn test_nested_mixed_schemas() {
2530        // Different nested objects per row (some keys missing -> null).
2531        let data = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
2532{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
2533{"ts":"c","meta":{"query":"pricing","results_count":25}}
2534{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
2535{"ts":"e","meta":{"query":"api docs","results_count":41}}
2536"#;
2537        let result = preprocess(data).expect("should produce transform");
2538        let restored = reverse(&result.data, &result.metadata);
2539        assert_eq!(
2540            String::from_utf8_lossy(&restored),
2541            String::from_utf8_lossy(data),
2542        );
2543        assert_eq!(restored, data.to_vec());
2544    }
2545
2546    #[test]
2547    fn test_nested_no_nested_objects() {
2548        // Returns None when no nested objects — flat data should still work.
2549        let data = br#"{"a":1,"b":"x"}
2550{"a":2,"b":"y"}
2551{"a":3,"b":"z"}
2552"#;
2553        let result = preprocess(data).expect("should produce transform");
2554        let restored = reverse(&result.data, &result.metadata);
2555        assert_eq!(restored, data.to_vec());
2556
2557        // Verify the metadata has has_nested=0 since no nested objects.
2558        // The nested flag is appended after template parts.
2559        // For uniform, metadata starts with version(1) + num_rows(4) + num_cols(2) +
2560        // trailing_newline(1) + num_parts(2) + parts.
2561        // After those parts, there should be a 0 byte (has_nested=0).
2562        let meta = &result.metadata;
2563        let last_byte = meta[meta.len() - 1];
2564        assert_eq!(last_byte, 0, "should have has_nested=0 for flat data");
2565    }
2566
2567    #[test]
2568    fn test_nested_real_corpus() {
2569        // Test with data shaped like the test-ndjson.ndjson corpus.
2570        let data = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
2571{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
2572{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
2573{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
2574{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
2575"#;
2576        let result = preprocess(data).expect("should produce transform");
2577        let restored = reverse(&result.data, &result.metadata);
2578        assert_eq!(
2579            String::from_utf8_lossy(&restored),
2580            String::from_utf8_lossy(data),
2581        );
2582        assert_eq!(restored, data.to_vec());
2583    }
2584
2585    #[test]
2586    fn test_nested_roundtrip_with_null_values() {
2587        // Some rows have null for the nested field.
2588        let data = br#"{"id":1,"meta":{"x":10}}
2589{"id":2,"meta":null}
2590{"id":3,"meta":{"x":30}}
2591{"id":4,"meta":null}
2592{"id":5,"meta":{"x":50}}
2593"#;
2594        let result = preprocess(data).expect("should produce transform");
2595        let restored = reverse(&result.data, &result.metadata);
2596        assert_eq!(restored, data.to_vec());
2597    }
2598
2599    #[test]
2600    fn test_nested_string_values_preserved_exact() {
2601        // Verify that string values in nested objects preserve exact bytes (with quotes).
2602        let data = br#"{"id":1,"meta":{"name":"Alice","score":100}}
2603{"id":2,"meta":{"name":"Bob","score":200}}
2604{"id":3,"meta":{"name":"Charlie","score":300}}
2605"#;
2606        let result = preprocess(data).expect("should produce transform");
2607        let restored = reverse(&result.data, &result.metadata);
2608        assert_eq!(restored, data.to_vec());
2609    }
2610
2611    #[test]
2612    fn test_parse_nested_object_kv() {
2613        let obj = br#"{"query":"benchmark","results_count":14}"#;
2614        let pairs = parse_nested_object_kv(obj).unwrap();
2615        assert_eq!(pairs.len(), 2);
2616        assert_eq!(pairs[0].0, b"query");
2617        assert_eq!(pairs[0].1, br#""benchmark""#.to_vec());
2618        assert_eq!(pairs[1].0, b"results_count");
2619        assert_eq!(pairs[1].1, b"14");
2620    }
2621
2622    #[test]
2623    fn test_nested_varying_subkeys_roundtrip() {
2624        // Regression: rows with varying sub-keys in nested objects must
2625        // round-trip byte-exact. Even rows have `extra`, odd rows don't.
2626        let mut lines = Vec::new();
2627        for i in 0..50 {
2628            let line = if i % 2 == 0 {
2629                format!("{{\"id\":{},\"meta\":{{\"x\":{},\"extra\":{}}}}}", i, i, i)
2630            } else {
2631                format!("{{\"id\":{},\"meta\":{{\"x\":{}}}}}", i, i)
2632            };
2633            lines.push(line);
2634        }
2635        let ndjson = lines.join("\n") + "\n";
2636        let data = ndjson.as_bytes();
2637
2638        let result = preprocess(data).expect("should produce transform");
2639        let restored = reverse(&result.data, &result.metadata);
2640        assert_eq!(
2641            std::str::from_utf8(&restored).unwrap(),
2642            std::str::from_utf8(data).unwrap(),
2643            "varying sub-keys roundtrip must be byte-exact"
2644        );
2645    }
2646
2647    #[test]
2648    fn test_nested_explicit_null_preserved() {
2649        // Explicit null values in nested objects must survive roundtrip.
2650        // `{"x":1,"y":null}` must NOT be collapsed to `{"x":1}`.
2651        let data = b"{\"id\":1,\"meta\":{\"x\":1,\"y\":null}}\n\
2652                     {\"id\":2,\"meta\":{\"x\":2,\"y\":null}}\n\
2653                     {\"id\":3,\"meta\":{\"x\":3,\"y\":null}}\n";
2654        let result = preprocess(data).expect("should produce transform");
2655        let restored = reverse(&result.data, &result.metadata);
2656        assert_eq!(
2657            std::str::from_utf8(&restored).unwrap(),
2658            std::str::from_utf8(data).unwrap(),
2659            "explicit null values must be preserved"
2660        );
2661    }
2662
2663    #[test]
2664    fn null_heavy_30_rows_roundtrip() {
2665        // Regression test: 30 rows with all-null column caused CRC mismatch.
2666        let mut data = Vec::new();
2667        for i in 0..30 {
2668            data.extend_from_slice(format!("{{\"id\":{},\"val\":null}}\n", i).as_bytes());
2669        }
2670        let result = preprocess(&data);
2671        if let Some(result) = result {
2672            let restored = reverse(&result.data, &result.metadata);
2673            assert_eq!(
2674                restored,
2675                data,
2676                "null-heavy 30-row roundtrip failed.\nOriginal len={}, Restored len={}\nOrig first 200: {:?}\nRest first 200: {:?}",
2677                data.len(),
2678                restored.len(),
2679                String::from_utf8_lossy(&data[..data.len().min(200)]),
2680                String::from_utf8_lossy(&restored[..restored.len().min(200)])
2681            );
2682        }
2683    }
2684
2685    #[test]
2686    fn null_heavy_60_rows_roundtrip() {
2687        // Regression test: 60 rows with multiple all-null columns.
2688        let mut data = Vec::new();
2689        for i in 0..60 {
2690            let name = if i % 10 == 0 {
2691                format!("\"user_{}\"", i)
2692            } else {
2693                "null".to_string()
2694            };
2695            data.extend_from_slice(
2696                format!("{{\"id\":{},\"name\":{},\"email\":null,\"score\":null,\"active\":null,\"tags\":null}}\n", i, name).as_bytes(),
2697            );
2698        }
2699        let result = preprocess(&data);
2700        if let Some(result) = result {
2701            let restored = reverse(&result.data, &result.metadata);
2702            assert_eq!(restored, data, "null-heavy 60-row ndjson roundtrip failed");
2703        }
2704    }
2705
2706    #[test]
2707    fn selective_columnar_roundtrip() {
2708        // Data with one low-cardinality column (type) and one high-cardinality + long column (payload).
2709        let mut data = Vec::new();
2710        for i in 0..50 {
2711            let event_type = match i % 3 {
2712                0 => "push",
2713                1 => "pull_request",
2714                _ => "create",
2715            };
2716            // Payload is unique per row and > 128 bytes avg to trigger selective.
2717            let payload = format!(
2718                "{{\"commits\":[{{\"sha\":\"abc{:04}def\",\"message\":\"commit message number {} with extra text to make it long enough for selective columnar threshold of 128 bytes average value length\"}}]}}",
2719                i, i
2720            );
2721            data.extend_from_slice(
2722                format!(
2723                    "{{\"id\":{},\"type\":\"{}\",\"payload\":{}}}\n",
2724                    i, event_type, payload
2725                )
2726                .as_bytes(),
2727            );
2728        }
2729        let result = preprocess(&data).expect("should preprocess");
2730        let restored = reverse(&result.data, &result.metadata);
2731        assert_eq!(restored, data, "selective columnar roundtrip failed");
2732    }
2733
2734    #[test]
2735    fn selective_columnar_uses_version3() {
2736        // Verify that high-cardinality + long columns trigger version=3.
2737        // Payload must exceed SELECTIVE_MIN_AVG_LEN (128) AND cardinality > 0.7.
2738        let mut data = Vec::new();
2739        for i in 0..50 {
2740            let payload = format!(
2741                "{{\"data\":\"unique_payload_{:04}\",\"extra\":\"padding_text_to_make_this_value_long_enough_to_exceed_the_128_byte_threshold_for_selective_columnar_detection_{:04}\"}}",
2742                i,
2743                i * 7
2744            );
2745            data.extend_from_slice(
2746                format!("{{\"type\":\"event\",\"payload\":{}}}\n", payload).as_bytes(),
2747            );
2748        }
2749        let result = preprocess(&data).expect("should preprocess");
2750        // Version byte should be 3 (selective).
2751        assert_eq!(
2752            result.metadata[0], METADATA_VERSION_SELECTIVE,
2753            "expected selective columnar (version=3), got version={}",
2754            result.metadata[0]
2755        );
2756        let restored = reverse(&result.data, &result.metadata);
2757        assert_eq!(restored, data, "selective columnar v3 roundtrip failed");
2758    }
2759
2760    #[test]
2761    fn selective_grouped_roundtrip() {
2762        // Two schema groups, each with a high-cardinality column (>128 bytes avg).
2763        let mut data = Vec::new();
2764        for i in 0..30 {
2765            let payload = format!(
2766                "{{\"sha\":\"hash_{:04}\",\"msg\":\"unique commit message number {} that is quite long and needs to exceed the 128 byte threshold for selective columnar to activate on this column\"}}",
2767                i, i
2768            );
2769            data.extend_from_slice(
2770                format!(
2771                    "{{\"id\":{},\"type\":\"push\",\"payload\":{}}}\n",
2772                    i, payload
2773                )
2774                .as_bytes(),
2775            );
2776        }
2777        for i in 0..20 {
2778            let body = format!(
2779                "{{\"title\":\"PR title {}\",\"body\":\"This is a long unique pull request body number {} with details and extra text to exceed the 128 byte threshold for the selective columnar transform\"}}",
2780                i, i
2781            );
2782            data.extend_from_slice(
2783                format!(
2784                    "{{\"id\":{},\"type\":\"pr\",\"payload\":{},\"org\":\"myorg\"}}\n",
2785                    100 + i,
2786                    body
2787                )
2788                .as_bytes(),
2789            );
2790        }
2791        let result = preprocess(&data).expect("should preprocess");
2792        // Should use grouped (version=2 at top level).
2793        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2794        let restored = reverse(&result.data, &result.metadata);
2795        assert_eq!(restored, data, "selective grouped roundtrip failed");
2796    }
2797
2798    #[test]
2799    fn selective_all_low_cardinality_stays_uniform() {
2800        // When all columns are low-cardinality, version=1 should still be used.
2801        let mut data = Vec::new();
2802        for i in 0..50 {
2803            let status = match i % 2 {
2804                0 => "active",
2805                _ => "inactive",
2806            };
2807            data.extend_from_slice(
2808                format!("{{\"type\":\"event\",\"status\":\"{}\"}}\n", status).as_bytes(),
2809            );
2810        }
2811        let result = preprocess(&data).expect("should preprocess");
2812        // Version should be 1 (uniform), not 3 (selective).
2813        assert_eq!(
2814            result.metadata[0], METADATA_VERSION_UNIFORM,
2815            "low-cardinality data should use uniform (version=1)"
2816        );
2817    }
2818
2819    #[test]
2820    fn selective_columnar_single_inline_column() {
2821        // Only one column is inline (the rest extracted).
2822        let mut data = Vec::new();
2823        for i in 0..30 {
2824            let unique_msg = format!(
2825                "A unique message for row {} with enough text to exceed the 128 byte threshold for selective columnar detection, adding padding here to be safe: extra_{:04}",
2826                i,
2827                i * 13
2828            );
2829            data.extend_from_slice(
2830                format!(
2831                    "{{\"type\":\"log\",\"level\":\"info\",\"msg\":\"{}\"}}\n",
2832                    unique_msg
2833                )
2834                .as_bytes(),
2835            );
2836        }
2837        let result = preprocess(&data).expect("should preprocess");
2838        let restored = reverse(&result.data, &result.metadata);
2839        assert_eq!(restored, data, "single-inline-column roundtrip failed");
2840    }
2841}