Skip to main content

datacortex_core/format/
ndjson.rs

1//! NDJSON columnar reorg — lossless transform that reorders row-oriented
2//! NDJSON data into column-oriented layout.
3//!
4//! Two strategies:
5//!
6//! **Strategy 1 (uniform):** All rows share the same schema (keys in same order).
7//!   Row-oriented (before):
8//!     {"ts":"2026-03-15T10:30:00.001Z","type":"page_view","user":"usr_a1b2c3d4"}
9//!     {"ts":"2026-03-15T10:30:00.234Z","type":"api_call","user":"usr_a1b2c3d4"}
10//!   Column-oriented (after):
11//!     [ts values] "2026-03-15T10:30:00.001Z" \x01 "2026-03-15T10:30:00.234Z" \x00
12//!     [type values] "page_view" \x01 "api_call" \x00
13//!     [user values] "usr_a1b2c3d4" \x01 "usr_a1b2c3d4"
14//!
15//! **Strategy 2 (grouped):** Rows have diverse schemas (e.g., GitHub Archive events).
16//!   Groups rows by schema, applies Strategy 1 per group, stores residual rows raw.
//!   Metadata version byte distinguishes: 1 = uniform, 2 = grouped,
//!   3 = selective (uniform schema whose long, high-cardinality columns stay row-major).
18//!
19//! Separators:
20//!   \x00 = column separator (cannot appear in valid JSON text)
21//!   \x01 = value separator within a column (cannot appear in valid JSON)
22
23use super::transform::TransformResult;
24use std::collections::HashMap;
25
/// Byte written between columns in column-major data. The selective layout also
/// uses it between rows of its inline (row-major) section.
const COL_SEP: u8 = 0x00;
/// Byte written between consecutive values of one column. The selective layout
/// also uses it between the inline values of a single row.
const VAL_SEP: u8 = 0x01;
/// First metadata byte emitted by `build_uniform_columnar`.
const METADATA_VERSION_UNIFORM: u8 = 1;
/// First metadata byte emitted by `preprocess_grouped_core`.
const METADATA_VERSION_GROUPED: u8 = 2;
/// First metadata byte emitted by `build_selective_columnar`.
const METADATA_VERSION_SELECTIVE: u8 = 3;
/// Grouped sub-group version: uniform columnar with typed encoding applied.
/// Reserved for future use — typed encoding in grouped path doesn't improve
/// post-zstd ratio because zstd compresses raw text equally well.
#[allow(dead_code)]
const METADATA_VERSION_TYPED: u8 = 4;

/// Minimum rows in a schema group for it to be columnarized (not residual).
const MIN_GROUP_ROWS: usize = 5;

/// Minimum average value length for a column to be kept inline (row-major).
/// Only large unique values (like nested JSON payloads of 100+ bytes) benefit from
/// inline storage. Moderate-length values (metadata objects, URLs) compress well
/// in column-major order even at high cardinality because zstd can exploit shared
/// structural patterns between adjacent values.
const SELECTIVE_MIN_AVG_LEN: usize = 128;

/// Maximum cardinality ratio (unique/total) for a column to be extracted into columnar format.
/// Columns above this threshold AND above `SELECTIVE_MIN_AVG_LEN` are kept inline (row-major)
/// to preserve inter-row locality that benefits zstd/brotli compression.
const SELECTIVE_MAX_CARDINALITY: f64 = 0.7;

/// A schema group: template parts + list of (row_index, parsed_values).
/// Uses borrowed slices to avoid allocations during parsing.
/// The row index is the line's position in the `non_empty` slice handed to the
/// grouping code.
type SchemaGroup<'a> = (Vec<&'a [u8]>, Vec<(usize, Vec<&'a [u8]>)>);
55
/// Slice one raw JSON value out of `line`, starting the search at `pos`.
///
/// `pos` is expected to sit just after the `:` of a key. ASCII whitespace in front
/// of the value is skipped and is not part of the returned slice.
///
/// Returns `(value_bytes, end_position)`, where `end_position` is the index of the
/// first byte after the value. Returns `None` when no value can be delimited:
/// nothing but whitespace remains, the value would be empty, or a string, object
/// or array is never closed.
///
/// Values are opaque bytes. Strings, objects and arrays end at their matching
/// closer (escapes and embedded strings respected); anything else (number,
/// boolean, null) runs up to the next `,`, `}`, `]` or whitespace.
fn extract_value(line: &[u8], pos: usize) -> Option<(&[u8], usize)> {
    // Index of the quote that closes the string opened at `open`, honoring `\` escapes.
    fn closing_quote(line: &[u8], open: usize) -> Option<usize> {
        let mut after_backslash = false;
        for (offset, &byte) in line[open + 1..].iter().enumerate() {
            match byte {
                _ if after_backslash => after_backslash = false,
                b'\\' => after_backslash = true,
                b'"' => return Some(open + 1 + offset),
                _ => {}
            }
        }
        None
    }

    // One-past-the-end index of the container opened at `start`. Only `open` and
    // `close` bytes found outside of strings change the nesting depth.
    fn container_end(line: &[u8], start: usize, open: u8, close: u8) -> Option<usize> {
        let mut depth = 1usize;
        let mut cursor = start + 1;
        while cursor < line.len() {
            let byte = line[cursor];
            if byte == b'"' {
                // Jump to the closing quote; an unterminated string ends the scan.
                cursor = closing_quote(line, cursor)?;
            } else if byte == open {
                depth += 1;
            } else if byte == close {
                depth -= 1;
                if depth == 0 {
                    return Some(cursor + 1);
                }
            }
            cursor += 1;
        }
        None
    }

    // First non-whitespace byte at or after `pos`; `get` keeps an out-of-range
    // `pos` from panicking.
    let start = pos
        + line
            .get(pos..)?
            .iter()
            .position(|byte| !byte.is_ascii_whitespace())?;

    let end = match line[start] {
        b'"' => closing_quote(line, start)? + 1,
        b'{' => container_end(line, start, b'{', b'}')?,
        b'[' => container_end(line, start, b'[', b']')?,
        _ => {
            let scalar_len = line[start..]
                .iter()
                .position(|&byte| {
                    matches!(byte, b',' | b'}' | b']') || byte.is_ascii_whitespace()
                })
                .unwrap_or(line.len() - start);
            if scalar_len == 0 {
                return None;
            }
            start + scalar_len
        }
    };

    Some((&line[start..end], end))
}
169
170/// Parse a single JSON line into (template_parts, values).
171///
172/// Template parts are the structural bytes between values:
173///   Part 0 = everything from { up to and including the : before value 0
174///   Part 1 = everything from after value 0 up to and including : before value 1
175///   ...
176///   Part N = everything from after the last value to end of line (including } and \n)
177///
178/// Returns None if the line is not a flat-ish JSON object (we handle nested values
179/// as opaque blobs, but the top-level structure must be key:value pairs).
180type ParsedLine<'a> = (Vec<&'a [u8]>, Vec<&'a [u8]>);
181
182fn parse_line(line: &[u8]) -> Option<ParsedLine<'_>> {
183    let mut pos = 0;
184
185    // Skip leading whitespace.
186    while pos < line.len() && line[pos].is_ascii_whitespace() {
187        pos += 1;
188    }
189    if pos >= line.len() || line[pos] != b'{' {
190        return None;
191    }
192
193    let mut parts: Vec<&[u8]> = Vec::new();
194    let mut values: Vec<&[u8]> = Vec::new();
195    let mut part_start = 0;
196
197    pos += 1; // Skip opening {.
198
199    loop {
200        // Skip whitespace.
201        while pos < line.len() && line[pos].is_ascii_whitespace() {
202            pos += 1;
203        }
204        if pos >= line.len() {
205            return None;
206        }
207
208        // Check for closing brace (end of object).
209        if line[pos] == b'}' {
210            // Capture the final part: everything from part_start to end of line.
211            parts.push(&line[part_start..]);
212            break;
213        }
214
215        // Expect a key string.
216        if line[pos] != b'"' {
217            return None;
218        }
219        // Skip over the key string.
220        pos += 1;
221        let mut escaped = false;
222        while pos < line.len() {
223            if escaped {
224                escaped = false;
225            } else if line[pos] == b'\\' {
226                escaped = true;
227            } else if line[pos] == b'"' {
228                pos += 1;
229                break;
230            }
231            pos += 1;
232        }
233
234        // Skip whitespace, expect colon.
235        while pos < line.len() && line[pos].is_ascii_whitespace() {
236            pos += 1;
237        }
238        if pos >= line.len() || line[pos] != b':' {
239            return None;
240        }
241        pos += 1; // Skip colon.
242
243        // Skip whitespace after colon so it becomes part of the template.
244        // Without this, spaces in "key": value would be lost during reverse
245        // because extract_value also skips leading whitespace.
246        while pos < line.len() && line[pos].is_ascii_whitespace() {
247            pos += 1;
248        }
249
250        // Everything from part_start up to here is a "template part".
251        parts.push(&line[part_start..pos]);
252
253        // Extract the value.
254        let (value, value_end) = extract_value(line, pos)?;
255        values.push(value);
256        pos = value_end;
257
258        // Mark the start of the next part.
259        part_start = pos;
260
261        // Skip whitespace.
262        while pos < line.len() && line[pos].is_ascii_whitespace() {
263            pos += 1;
264        }
265        if pos >= line.len() {
266            return None;
267        }
268
269        // Expect comma or closing brace.
270        if line[pos] == b',' {
271            pos += 1;
272        } else if line[pos] == b'}' {
273            // Will be caught at the top of the loop next iteration — but we
274            // need to NOT advance pos, so the } check above catches it.
275            // Actually, let's just handle it here.
276            parts.push(&line[part_start..]);
277            break;
278        } else {
279            return None; // Unexpected character.
280        }
281    }
282
283    if values.is_empty() {
284        return None;
285    }
286
287    Some((parts, values))
288}
289
290/// Split NDJSON data into lines (without newline characters).
291fn split_lines(data: &[u8]) -> Vec<&[u8]> {
292    let mut lines: Vec<&[u8]> = Vec::new();
293    let mut start = 0;
294    for pos in memchr::memchr_iter(b'\n', data) {
295        lines.push(&data[start..pos]);
296        start = pos + 1;
297    }
298    if start < data.len() {
299        lines.push(&data[start..]);
300    }
301    lines
302}
303
304/// Build columnar data from parsed lines that share the same template.
305/// Returns (col_data, metadata) for a uniform group.
306fn build_uniform_columnar(
307    template_parts: &[&[u8]],
308    columns: &[Vec<&[u8]>],
309    num_rows: usize,
310    has_trailing_newline: bool,
311) -> (Vec<u8>, Vec<u8>) {
312    let num_cols = columns.len();
313
314    // Build column data: values separated by \x01, columns separated by \x00.
315    let mut col_data = Vec::new();
316    for (ci, col) in columns.iter().enumerate() {
317        for (ri, val) in col.iter().enumerate() {
318            col_data.extend_from_slice(val);
319            if ri < num_rows - 1 {
320                col_data.push(VAL_SEP);
321            }
322        }
323        if ci < num_cols - 1 {
324            col_data.push(COL_SEP);
325        }
326    }
327
328    // Build metadata: version + num_rows + num_cols + trailing_newline + template parts.
329    let mut metadata = Vec::new();
330    metadata.push(METADATA_VERSION_UNIFORM);
331    metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
332    metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
333    metadata.push(if has_trailing_newline { 1 } else { 0 });
334    metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
335    for part in template_parts {
336        metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
337        metadata.extend_from_slice(part);
338    }
339
340    (col_data, metadata)
341}
342
343/// Analyze columns and classify as extract (columnar) or inline (row-major).
344/// Returns a bitmask: `true` = extract (column-major), `false` = inline (row-major).
345/// High-cardinality columns with long average values are kept inline to preserve
346/// inter-row locality that benefits zstd/brotli compression.
347fn classify_columns(columns: &[Vec<&[u8]>], num_rows: usize) -> Vec<bool> {
348    use std::collections::HashSet;
349    columns
350        .iter()
351        .map(|col_values| {
352            if num_rows < 10 {
353                return true; // Too few rows to measure cardinality meaningfully
354            }
355            let unique: HashSet<&[u8]> = col_values.iter().copied().collect();
356            let cardinality_ratio = unique.len() as f64 / num_rows as f64;
357            let avg_len = col_values.iter().map(|v| v.len()).sum::<usize>() / num_rows;
358            // Inline only if BOTH high-cardinality AND long values
359            !(cardinality_ratio > SELECTIVE_MAX_CARDINALITY && avg_len >= SELECTIVE_MIN_AVG_LEN)
360        })
361        .collect()
362}
363
364/// Build selective columnar data: low-cardinality columns in column-major order,
365/// high-cardinality columns in row-major order.
366///
367/// Data layout:
368///   [extracted_data_len: u32 LE]
369///   [extracted columnar: col vals separated by \x01, columns by \x00]
370///   [inline row data: per-row inline vals separated by \x01, rows by \x00]
371///
372/// Metadata (version=3):
373///   version(3) + num_rows + num_total_cols + trailing_newline +
374///   num_extracted + extracted_col_indices + template_parts
375fn build_selective_columnar(
376    template_parts: &[&[u8]],
377    columns: &[Vec<&[u8]>],
378    extract_mask: &[bool],
379    num_rows: usize,
380    has_trailing_newline: bool,
381) -> (Vec<u8>, Vec<u8>) {
382    let num_total_cols = columns.len();
383
384    let extracted_indices: Vec<u16> = (0..num_total_cols)
385        .filter(|&i| extract_mask[i])
386        .map(|i| i as u16)
387        .collect();
388    let inline_indices: Vec<u16> = (0..num_total_cols)
389        .filter(|&i| !extract_mask[i])
390        .map(|i| i as u16)
391        .collect();
392
393    // Build extracted columnar data (column-major).
394    let mut extracted_data = Vec::new();
395    for (ei, &col_idx) in extracted_indices.iter().enumerate() {
396        let col = &columns[col_idx as usize];
397        for (ri, val) in col.iter().enumerate() {
398            extracted_data.extend_from_slice(val);
399            if ri < num_rows - 1 {
400                extracted_data.push(VAL_SEP);
401            }
402        }
403        if ei < extracted_indices.len() - 1 {
404            extracted_data.push(COL_SEP);
405        }
406    }
407
408    // Build inline row data (row-major).
409    let mut inline_data = Vec::new();
410    if !inline_indices.is_empty() {
411        #[allow(clippy::needless_range_loop)]
412        for row in 0..num_rows {
413            for (ii, &col_idx) in inline_indices.iter().enumerate() {
414                inline_data.extend_from_slice(columns[col_idx as usize][row]);
415                if ii < inline_indices.len() - 1 {
416                    inline_data.push(VAL_SEP);
417                }
418            }
419            if row < num_rows - 1 {
420                inline_data.push(COL_SEP);
421            }
422        }
423    }
424
425    // Combined data blob.
426    let mut data = Vec::with_capacity(4 + extracted_data.len() + inline_data.len());
427    data.extend_from_slice(&(extracted_data.len() as u32).to_le_bytes());
428    data.extend_from_slice(&extracted_data);
429    data.extend_from_slice(&inline_data);
430
431    // Build metadata (version=3).
432    let mut metadata = Vec::new();
433    metadata.push(METADATA_VERSION_SELECTIVE);
434    metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
435    metadata.extend_from_slice(&(num_total_cols as u16).to_le_bytes());
436    metadata.push(if has_trailing_newline { 1 } else { 0 });
437    metadata.extend_from_slice(&(extracted_indices.len() as u16).to_le_bytes());
438    for &idx in &extracted_indices {
439        metadata.extend_from_slice(&idx.to_le_bytes());
440    }
441    metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
442    for part in template_parts {
443        metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
444        metadata.extend_from_slice(part);
445    }
446
447    (data, metadata)
448}
449
450/// Strategy 1: Uniform schema — all rows must have the same template.
451/// Returns None if schemas differ.
452fn preprocess_uniform(
453    non_empty: &[&[u8]],
454    has_trailing_newline: bool,
455) -> Option<(Vec<u8>, Vec<u8>)> {
456    if non_empty.len() < 2 {
457        return None;
458    }
459
460    let (template_parts, first_values) = parse_line(non_empty[0])?;
461    let num_cols = first_values.len();
462    if template_parts.len() != num_cols + 1 {
463        return None;
464    }
465
466    let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
467    for v in &first_values {
468        columns.push(vec![*v]);
469    }
470
471    for &line in &non_empty[1..] {
472        let (parts, values) = parse_line(line)?;
473        if values.len() != num_cols || parts.len() != template_parts.len() {
474            return None;
475        }
476        for (a, b) in parts.iter().zip(template_parts.iter()) {
477            if a != b {
478                return None;
479            }
480        }
481        for (col, val) in values.iter().enumerate() {
482            columns[col].push(*val);
483        }
484    }
485
486    // Check if selective columnar should be used (some columns high-cardinality).
487    // Selective columnar keeps high-cardinality columns in row-major order,
488    // preserving inter-row locality that benefits downstream compression (zstd/brotli).
489    // The raw size may be slightly larger due to metadata overhead, but the
490    // compression benefit typically outweighs this.
491    let extract_mask = classify_columns(&columns, non_empty.len());
492    let all_extracted = extract_mask.iter().all(|&e| e);
493
494    if all_extracted {
495        Some(build_uniform_columnar(
496            &template_parts,
497            &columns,
498            non_empty.len(),
499            has_trailing_newline,
500        ))
501    } else {
502        Some(build_selective_columnar(
503            &template_parts,
504            &columns,
505            &extract_mask,
506            non_empty.len(),
507            has_trailing_newline,
508        ))
509    }
510}
511
512/// Find the best discriminator column for schema clustering.
513/// A discriminator is a low-cardinality, short-valued column whose value predicts
514/// the schema of other columns (e.g., "type" in GitHub Archive events).
515/// Returns the column index, or None if no good discriminator exists.
516fn find_discriminator<'a>(parsed: &[Option<ParsedLine<'a>>]) -> Option<usize> {
517    use std::collections::HashSet;
518
519    // Sample up to 200 valid parsed lines.
520    let mut samples: Vec<&ParsedLine<'a>> = Vec::new();
521    for line in parsed.iter().flatten() {
522        samples.push(line);
523        if samples.len() >= 200 {
524            break;
525        }
526    }
527
528    if samples.len() < 10 {
529        return None;
530    }
531
532    // Use the minimum column count across all samples.
533    let num_cols = samples.iter().map(|s| s.1.len()).min().unwrap_or(0);
534    if num_cols == 0 {
535        return None;
536    }
537
538    let mut best_col = None;
539    let mut best_cardinality = usize::MAX;
540
541    for col_idx in 0..num_cols {
542        // Compute average value length — skip long values (nested objects).
543        let total_len: usize = samples.iter().map(|s| s.1[col_idx].len()).sum();
544        let avg_len = total_len / samples.len();
545        if avg_len > 30 {
546            continue;
547        }
548
549        let unique: HashSet<&[u8]> = samples.iter().map(|s| s.1[col_idx]).collect();
550        let cardinality = unique.len();
551
552        // Good discriminator: > 1 distinct value, < 1/3 of rows distinct.
553        if cardinality > 1 && cardinality < samples.len() / 3 && cardinality < best_cardinality {
554            best_col = Some(col_idx);
555            best_cardinality = cardinality;
556        }
557    }
558
559    best_col
560}
561
562/// Strategy 2: Group-by-schema — group rows by template, columnarize each group.
563///
564/// Metadata format (version=2):
565///   version: u8 = 2
566///   has_trailing_newline: u8
567///   total_rows: u32 LE
568///   num_groups: u16 LE
569///   for each group:
570///     num_rows: u32 LE
571///     row_indices: [u32 LE * num_rows]
572///     group_metadata_len: u32 LE
573///     group_metadata: [bytes]  (Strategy 1 metadata for this group)
574///   residual_count: u32 LE
575///   residual_indices: [u32 LE * residual_count]
576///
577/// Data format:
578///   for each group:
579///     data_len: u32 LE
580///     data: [bytes]  (columnar data for this group)
581///   residual_data: [bytes]  (raw lines joined by \n)
582fn preprocess_grouped<'a>(
583    non_empty: &[&'a [u8]],
584    has_trailing_newline: bool,
585) -> Option<(Vec<u8>, Vec<u8>)> {
586    if non_empty.len() < MIN_GROUP_ROWS {
587        return None;
588    }
589
590    // Detect discriminator column for potential sub-grouping.
591    let parsed: Vec<Option<ParsedLine<'a>>> =
592        non_empty.iter().map(|&l| parse_line(l)).collect();
593    let disc_col = find_discriminator(&parsed);
594    drop(parsed);
595
596    // Try schema-only grouping first (always).
597    let result_no_disc = preprocess_grouped_core(non_empty, has_trailing_newline, None);
598
599    // If a discriminator exists, also try sub-grouped encoding and pick the winner.
600    if let Some(dc) = disc_col {
601        let result_disc = preprocess_grouped_core(non_empty, has_trailing_newline, Some(dc));
602        match (&result_no_disc, &result_disc) {
603            (Some((d1, m1)), Some((d2, m2))) => {
604                if d2.len() + m2.len() < d1.len() + m1.len() {
605                    return result_disc;
606                }
607                return result_no_disc;
608            }
609            (None, Some(_)) => return result_disc,
610            _ => return result_no_disc,
611        }
612    }
613
614    result_no_disc
615}
616
617/// Core grouped encoding: group by schema, optionally sub-group by discriminator,
618/// build columnar data per group with optional nested flatten.
619fn preprocess_grouped_core<'a>(
620    non_empty: &[&'a [u8]],
621    has_trailing_newline: bool,
622    disc_col: Option<usize>,
623) -> Option<(Vec<u8>, Vec<u8>)> {
624    // Parse all lines and group by template.
625    let mut parsed: Vec<Option<ParsedLine<'a>>> = Vec::with_capacity(non_empty.len());
626    for &line in non_empty {
627        parsed.push(parse_line(line));
628    }
629
630    let mut group_map: HashMap<Vec<u8>, SchemaGroup<'a>> = HashMap::new();
631    let mut residual_indices: Vec<usize> = Vec::new();
632
633    for (idx, parsed_line) in parsed.into_iter().enumerate() {
634        if let Some((parts, values)) = parsed_line {
635            let mut key = Vec::new();
636            for part in &parts {
637                key.extend_from_slice(&(part.len() as u32).to_le_bytes());
638                key.extend_from_slice(part);
639            }
640            // If discriminator column specified, include its value for sub-grouping.
641            if let Some(dc) = disc_col {
642                if dc < values.len() {
643                    key.push(0xFF);
644                    key.extend_from_slice(values[dc]);
645                }
646            }
647            group_map
648                .entry(key)
649                .or_insert_with(|| (parts, Vec::new()))
650                .1
651                .push((idx, values));
652        } else {
653            residual_indices.push(idx);
654        }
655    }
656
657    // Separate groups into qualifying (>= MIN_GROUP_ROWS) and residual.
658    let mut groups: Vec<SchemaGroup<'a>> = Vec::new();
659    for (_key, (template_parts, rows)) in group_map {
660        if rows.len() >= MIN_GROUP_ROWS {
661            groups.push((template_parts, rows));
662        } else {
663            // Too few rows — send to residual.
664            for (idx, _) in &rows {
665                residual_indices.push(*idx);
666            }
667        }
668    }
669
670    // Need at least 1 qualifying group for this to be useful.
671    if groups.is_empty() {
672        return None;
673    }
674
675    // Sort groups by their first row index for deterministic output.
676    groups.sort_by_key(|(_, rows)| rows[0].0);
677    residual_indices.sort_unstable();
678
679    // Build per-group columnar data and metadata.
680    struct GroupOutput {
681        row_indices: Vec<u32>,
682        col_data: Vec<u8>,
683        group_metadata: Vec<u8>,
684    }
685
686    let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
687
688    for (template_parts, rows) in &groups {
689        let num_cols = template_parts.len() - 1;
690        let mut columns: Vec<Vec<&[u8]>> = (0..num_cols).map(|_| Vec::new()).collect();
691        let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
692
693        for (idx, values) in rows {
694            row_indices.push(*idx as u32);
695            for (col, val) in values.iter().enumerate() {
696                columns[col].push(*val);
697            }
698        }
699
700        // Decide: uniform (version=1) or selective (version=3) per group.
701        let extract_mask = classify_columns(&columns, rows.len());
702        let all_extracted = extract_mask.iter().all(|&e| e);
703
704        let (mut col_data, mut group_metadata) = if all_extracted {
705            build_uniform_columnar(template_parts, &columns, rows.len(), false)
706        } else {
707            build_selective_columnar(template_parts, &columns, &extract_mask, rows.len(), false)
708        };
709
710        // Try nested object flatten for uniform groups.
711        // Decomposes nested JSON objects (e.g., payload, actor) into sub-columns
712        // for better compression when rows are schema-clustered.
713        if all_extracted {
714            if let Some((flat_data, nested_groups)) =
715                flatten_nested_columns(&col_data, rows.len())
716            {
717                let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
718                let unflattened = unflatten_nested_columns(
719                    &flat_data,
720                    &nested_groups,
721                    rows.len(),
722                    total_flat_cols,
723                );
724                if unflattened == col_data {
725                    let nested_bytes = serialize_nested_info(&nested_groups);
726                    group_metadata.extend_from_slice(&nested_bytes);
727                    col_data = flat_data;
728                } else {
729                    group_metadata.push(0u8); // has_nested = 0
730                }
731            } else {
732                group_metadata.push(0u8); // has_nested = 0
733            }
734
735        }
736
737        group_outputs.push(GroupOutput {
738            row_indices,
739            col_data,
740            group_metadata,
741        });
742    }
743
744    // Build the combined data blob.
745    let mut data_out = Vec::new();
746    for group in &group_outputs {
747        data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
748        data_out.extend_from_slice(&group.col_data);
749    }
750
751    // Append residual lines (raw, separated by \n).
752    let residual_start = data_out.len();
753    for (i, &idx) in residual_indices.iter().enumerate() {
754        data_out.extend_from_slice(non_empty[idx]);
755        if i < residual_indices.len() - 1 {
756            data_out.push(b'\n');
757        }
758    }
759    let _residual_len = data_out.len() - residual_start;
760
761    // Build the combined metadata.
762    let mut metadata = Vec::new();
763    metadata.push(METADATA_VERSION_GROUPED);
764    metadata.push(if has_trailing_newline { 1 } else { 0 });
765    metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
766    metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
767
768    for group in &group_outputs {
769        metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
770        for &idx in &group.row_indices {
771            metadata.extend_from_slice(&idx.to_le_bytes());
772        }
773        metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
774        metadata.extend_from_slice(&group.group_metadata);
775    }
776
777    metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
778    for &idx in &residual_indices {
779        metadata.extend_from_slice(&(idx as u32).to_le_bytes());
780    }
781
782    Some((data_out, metadata))
783}
784
/// Metadata describing which columns were flattened from nested objects.
///
/// `flatten_nested_columns` returns a list of these alongside the flattened
/// column data; the grouped encoder serializes that list into the group metadata
/// with `serialize_nested_info` and uses it with `unflatten_nested_columns` to
/// check that the flatten round-trips.
/// NOTE(review): presumably one entry per expanded column — confirm against
/// `flatten_nested_columns`.
pub(crate) struct NestedGroupInfo {
    /// Index of the original column that was expanded.
    pub(crate) original_col_index: u16,
    /// Sub-key names for the expanded sub-columns.
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Template parts for reconstructing nested objects (preserves original formatting).
    /// If empty, compact format `{"key":val,...}` is used (NDJSON compatibility).
    pub(crate) nested_template: Vec<Vec<u8>>,
    /// Absence bitmap: one bit per (sub_key, row). Bit=1 means the key was ABSENT
    /// in the original object (not present at all), vs bit=0 means key was present
    /// (possibly with explicit `null`). Packed LSB-first.
    /// Length = ceil(num_sub_keys * num_rows / 8).
    /// Empty if all rows had all keys (no absences).
    pub(crate) absence_bitmap: Vec<u8>,
}
801
802/// Attempt to flatten nested JSON objects in columnar data (depth-1).
803///
804/// Takes columnar data (\x00/\x01 separated) and returns expanded columnar data
805/// with nested objects decomposed into sub-columns.
806/// Returns None if no nested objects found.
807pub(crate) fn flatten_nested_columns(
808    col_data: &[u8],
809    num_rows: usize,
810) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
811    // Split into columns.
812    let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
813    if columns.is_empty() || num_rows == 0 {
814        return None;
815    }
816
817    let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
818    // Build the output columns: for non-nested cols, keep as-is.
819    // For nested cols, replace with sub-columns.
820    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
821
822    for (col_idx, &col_chunk) in columns.iter().enumerate() {
823        let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
824        if values.len() != num_rows {
825            return None;
826        }
827
828        // Check if ALL non-null values start with '{' (nested object).
829        let mut all_objects = true;
830        let mut has_non_null = false;
831        for val in &values {
832            if *val == b"null" {
833                continue;
834            }
835            has_non_null = true;
836            if !val.starts_with(b"{") {
837                all_objects = false;
838                break;
839            }
840        }
841
842        if !all_objects || !has_non_null {
843            // Not a nested-object column — keep as-is.
844            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
845            output_columns.push(col_values);
846            continue;
847        }
848
849        // This column contains nested objects — decompose depth-1.
850        // Parse all values and collect all unique sub-keys (preserving discovery order).
851        // Also capture the template from the first non-null object to preserve formatting.
852        let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
853        let mut nested_template: Vec<Vec<u8>> = Vec::new();
854        type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
855        let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);
856
857        for val in &values {
858            if *val == b"null" {
859                parsed_rows.push(None);
860                continue;
861            }
862            if nested_template.is_empty() {
863                // First non-null: use template-preserving parser.
864                match parse_nested_object_with_template(val) {
865                    Some((template, kv_pairs)) => {
866                        for (key, _) in &kv_pairs {
867                            if !all_sub_keys.iter().any(|k| k == key) {
868                                all_sub_keys.push(key.clone());
869                            }
870                        }
871                        nested_template = template;
872                        parsed_rows.push(Some(kv_pairs));
873                    }
874                    None => {
875                        all_sub_keys.clear();
876                        break;
877                    }
878                }
879            } else {
880                // Subsequent rows: use simpler kv parser (template already captured).
881                match parse_nested_object_kv(val) {
882                    Some(kv_pairs) => {
883                        for (key, _) in &kv_pairs {
884                            if !all_sub_keys.iter().any(|k| k == key) {
885                                all_sub_keys.push(key.clone());
886                            }
887                        }
888                        parsed_rows.push(Some(kv_pairs));
889                    }
890                    None => {
891                        all_sub_keys.clear();
892                        break;
893                    }
894                }
895            }
896        }
897
898        if all_sub_keys.is_empty() {
899            // Could not parse — keep column as-is.
900            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
901            output_columns.push(col_values);
902            continue;
903        }
904
905        // Build sub-columns: for each sub-key, extract values from parsed rows.
906        // Also build an absence bitmap: bit=1 where a key was absent from the
907        // original row (as opposed to being present with explicit `null`).
908        let num_sub_keys = all_sub_keys.len();
909        let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];
910        let total_bits = num_sub_keys * num_rows;
911        let bitmap_bytes = total_bits.div_ceil(8);
912        let mut absence_bitmap = vec![0u8; bitmap_bytes];
913        let mut has_any_absent = false;
914
915        for (row_idx, parsed) in parsed_rows.iter().enumerate() {
916            match parsed {
917                Some(kv_pairs) => {
918                    for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
919                        let found = kv_pairs.iter().find(|(k, _)| k == sk);
920                        match found {
921                            Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
922                            None => {
923                                sub_columns[sk_idx].push(b"null".to_vec());
924                                // Mark this (sub_key, row) as absent.
925                                let bit_idx = sk_idx * num_rows + row_idx;
926                                absence_bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
927                                has_any_absent = true;
928                            }
929                        }
930                    }
931                }
932                None => {
933                    // null row — all sub-columns get null.
934                    // These are NOT marked as absent (the whole column was null,
935                    // not individual keys missing).
936                    for sc in sub_columns.iter_mut() {
937                        sc.push(b"null".to_vec());
938                    }
939                }
940            }
941        }
942
943        nested_groups.push(NestedGroupInfo {
944            original_col_index: col_idx as u16,
945            sub_keys: all_sub_keys,
946            nested_template,
947            absence_bitmap: if has_any_absent {
948                absence_bitmap
949            } else {
950                Vec::new()
951            },
952        });
953
954        for sc in sub_columns {
955            output_columns.push(sc);
956        }
957    }
958
959    if nested_groups.is_empty() {
960        return None;
961    }
962
963    // Build the flattened columnar data.
964    let num_out_cols = output_columns.len();
965    let mut out = Vec::new();
966    for (ci, col) in output_columns.iter().enumerate() {
967        for (ri, val) in col.iter().enumerate() {
968            out.extend_from_slice(val);
969            if ri < num_rows - 1 {
970                out.push(VAL_SEP);
971            }
972        }
973        if ci < num_out_cols - 1 {
974            out.push(COL_SEP);
975        }
976    }
977
978    Some((out, nested_groups))
979}
980
981/// Parse a nested JSON object into (template_parts, kv_pairs).
982/// Template parts include all structural bytes (braces, keys, colons, whitespace) —
983/// preserving the original formatting so the object can be reconstructed exactly.
984/// Keys are returned WITHOUT quotes. Values are the exact bytes from the JSON.
985#[allow(clippy::type_complexity)]
986pub(crate) fn parse_nested_object_with_template(
987    obj: &[u8],
988) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
989    let mut pos = 0;
990
991    // Skip whitespace.
992    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
993        pos += 1;
994    }
995    if pos >= obj.len() || obj[pos] != b'{' {
996        return None;
997    }
998    pos += 1;
999
1000    let mut parts: Vec<Vec<u8>> = Vec::new();
1001    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1002    let mut part_start = 0;
1003
1004    loop {
1005        // Skip whitespace.
1006        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1007            pos += 1;
1008        }
1009        if pos >= obj.len() {
1010            return None;
1011        }
1012        if obj[pos] == b'}' {
1013            parts.push(obj[part_start..].to_vec());
1014            break;
1015        }
1016
1017        // Expect a key string.
1018        if obj[pos] != b'"' {
1019            return None;
1020        }
1021        let key_str_start = pos + 1;
1022        pos += 1;
1023        let mut escaped = false;
1024        while pos < obj.len() {
1025            if escaped {
1026                escaped = false;
1027            } else if obj[pos] == b'\\' {
1028                escaped = true;
1029            } else if obj[pos] == b'"' {
1030                break;
1031            }
1032            pos += 1;
1033        }
1034        if pos >= obj.len() {
1035            return None;
1036        }
1037        let key = obj[key_str_start..pos].to_vec();
1038        pos += 1; // skip closing quote
1039
1040        // Skip whitespace, expect colon.
1041        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1042            pos += 1;
1043        }
1044        if pos >= obj.len() || obj[pos] != b':' {
1045            return None;
1046        }
1047        pos += 1;
1048
1049        // Skip whitespace between colon and value — include in template.
1050        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1051            pos += 1;
1052        }
1053
1054        // Template part: everything from part_start to here (includes key, colon, post-colon ws).
1055        parts.push(obj[part_start..pos].to_vec());
1056
1057        // Extract the value (no whitespace skipping — already consumed above).
1058        let value_start = pos;
1059        // Use extract_value but we've already consumed whitespace.
1060        let (value, value_end) = extract_value(obj, value_start)?;
1061        pos = value_end;
1062        pairs.push((key, value.to_vec()));
1063
1064        part_start = pos;
1065
1066        // Skip whitespace.
1067        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1068            pos += 1;
1069        }
1070        if pos >= obj.len() {
1071            return None;
1072        }
1073        if obj[pos] == b',' {
1074            pos += 1;
1075        } else if obj[pos] == b'}' {
1076            parts.push(obj[part_start..].to_vec());
1077            break;
1078        } else {
1079            return None;
1080        }
1081    }
1082
1083    if pairs.is_empty() {
1084        return None;
1085    }
1086    Some((parts, pairs))
1087}
1088
1089/// Parse a nested JSON object into its key-value pairs (depth-1 only).
1090/// Returns the exact bytes for each key and value.
1091/// Keys are returned WITHOUT quotes. Values are the exact bytes from the JSON.
1092pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
1093    let mut pos = 0;
1094
1095    // Skip whitespace.
1096    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1097        pos += 1;
1098    }
1099    if pos >= obj.len() || obj[pos] != b'{' {
1100        return None;
1101    }
1102    pos += 1;
1103
1104    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
1105
1106    loop {
1107        // Skip whitespace.
1108        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1109            pos += 1;
1110        }
1111        if pos >= obj.len() {
1112            return None;
1113        }
1114        if obj[pos] == b'}' {
1115            break;
1116        }
1117
1118        // Expect a key string.
1119        if obj[pos] != b'"' {
1120            return None;
1121        }
1122        pos += 1;
1123        let key_start = pos;
1124        let mut escaped = false;
1125        while pos < obj.len() {
1126            if escaped {
1127                escaped = false;
1128            } else if obj[pos] == b'\\' {
1129                escaped = true;
1130            } else if obj[pos] == b'"' {
1131                break;
1132            }
1133            pos += 1;
1134        }
1135        if pos >= obj.len() {
1136            return None;
1137        }
1138        let key = obj[key_start..pos].to_vec();
1139        pos += 1; // skip closing quote
1140
1141        // Skip whitespace, expect colon.
1142        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1143            pos += 1;
1144        }
1145        if pos >= obj.len() || obj[pos] != b':' {
1146            return None;
1147        }
1148        pos += 1;
1149
1150        // Extract the value.
1151        let (value, value_end) = extract_value(obj, pos)?;
1152        pos = value_end;
1153        pairs.push((key, value.to_vec()));
1154
1155        // Skip whitespace.
1156        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
1157            pos += 1;
1158        }
1159        if pos >= obj.len() {
1160            return None;
1161        }
1162        if obj[pos] == b',' {
1163            pos += 1;
1164        } else if obj[pos] == b'}' {
1165            break;
1166        } else {
1167            return None;
1168        }
1169    }
1170
1171    if pairs.is_empty() {
1172        return None;
1173    }
1174    Some(pairs)
1175}
1176
1177/// Unflatten nested sub-columns back into JSON objects.
1178///
1179/// Takes flattened columnar data and nested group info, merges sub-columns
1180/// back into the original nested object columns.
1181pub(crate) fn unflatten_nested_columns(
1182    flat_data: &[u8],
1183    nested_groups: &[NestedGroupInfo],
1184    num_rows: usize,
1185    total_flat_cols: usize,
1186) -> Vec<u8> {
1187    let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
1188    if flat_columns.len() != total_flat_cols {
1189        return flat_data.to_vec();
1190    }
1191
1192    // Parse all flat column values.
1193    let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
1194    for chunk in &flat_columns {
1195        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1196        if vals.len() != num_rows {
1197            return flat_data.to_vec();
1198        }
1199        flat_col_values.push(vals);
1200    }
1201
1202    // Reconstruct original columns from flat columns.
1203    // Walk through flat columns, merging sub-columns back where needed.
1204    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();
1205
1206    // Build a set of original_col_index -> group for quick lookup.
1207    // We need to know which flat columns map to which nested group.
1208    // The flat columns are in order: non-nested cols keep their position,
1209    // nested cols are replaced by their sub-columns at that position.
1210    //
1211    // To figure out which flat_idx corresponds to what, we replay the
1212    // forward mapping.
1213    // We need to know the ORIGINAL number of columns.
1214    let original_num_cols = total_flat_cols
1215        - nested_groups
1216            .iter()
1217            .map(|g| g.sub_keys.len())
1218            .sum::<usize>()
1219        + nested_groups.len();
1220
1221    // Build mapping: for each original col, is it nested or not?
1222    let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
1223    for (gi, group) in nested_groups.iter().enumerate() {
1224        if (group.original_col_index as usize) < original_num_cols {
1225            original_col_map[group.original_col_index as usize] = Some(gi);
1226        }
1227    }
1228
1229    let mut flat_idx = 0;
1230    for entry in original_col_map.iter().take(original_num_cols) {
1231        if let Some(gi) = entry {
1232            let group = &nested_groups[*gi];
1233            let num_sub = group.sub_keys.len();
1234
1235            // Helper: check if sub-key `si` at `row` is absent using bitmap.
1236            let is_absent = |si: usize, row: usize| -> bool {
1237                if group.absence_bitmap.is_empty() {
1238                    return false; // no absences in this group
1239                }
1240                let bit_idx = si * num_rows + row;
1241                let byte_idx = bit_idx / 8;
1242                if byte_idx >= group.absence_bitmap.len() {
1243                    return false;
1244                }
1245                (group.absence_bitmap[byte_idx] >> (bit_idx % 8)) & 1 == 1
1246            };
1247
1248            // Merge sub-columns back into nested objects.
1249            let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
1250            for row in 0..num_rows {
1251                // Check if all sub-columns are null or absent for this row
1252                // (meaning the whole nested column was null).
1253                let all_null = (0..num_sub).all(|si| {
1254                    flat_idx + si < flat_col_values.len()
1255                        && flat_col_values[flat_idx + si][row] == b"null"
1256                });
1257                if all_null && !group.absence_bitmap.is_empty() {
1258                    // If all values are null but some are "absent" and some are
1259                    // "explicit null", we need to reconstruct, not collapse.
1260                    let any_present_null = (0..num_sub).any(|si| {
1261                        flat_col_values[flat_idx + si][row] == b"null" && !is_absent(si, row)
1262                    });
1263                    if any_present_null {
1264                        // At least one key has an explicit null — don't collapse.
1265                        // Fall through to reconstruction below.
1266                    } else {
1267                        // All nulls are from absent keys — whole column was null.
1268                        merged_col.push(b"null".to_vec());
1269                        continue;
1270                    }
1271                } else if all_null {
1272                    merged_col.push(b"null".to_vec());
1273                    continue;
1274                }
1275
1276                // Check whether any sub-key is absent in this row.
1277                let has_absent = (0..num_sub).any(|si| is_absent(si, row));
1278
1279                if !has_absent
1280                    && !group.nested_template.is_empty()
1281                    && group.nested_template.len() == num_sub + 1
1282                {
1283                    // Template-based reconstruction: all keys present,
1284                    // preserves original formatting exactly.
1285                    let mut obj = Vec::new();
1286                    obj.extend_from_slice(&group.nested_template[0]);
1287                    if flat_idx < flat_col_values.len() {
1288                        obj.extend_from_slice(flat_col_values[flat_idx][row]);
1289                    }
1290                    for si in 1..num_sub {
1291                        obj.extend_from_slice(&group.nested_template[si]);
1292                        if flat_idx + si < flat_col_values.len() {
1293                            obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
1294                        }
1295                    }
1296                    obj.extend_from_slice(&group.nested_template[num_sub]);
1297                    merged_col.push(obj);
1298                } else {
1299                    // Compact reconstruction: some keys absent, or no template.
1300                    // Skip sub-keys that were absent in the original.
1301                    let mut obj = Vec::new();
1302                    obj.push(b'{');
1303                    let mut first = true;
1304                    for si in 0..num_sub {
1305                        if flat_idx + si >= flat_col_values.len() {
1306                            break;
1307                        }
1308                        if is_absent(si, row) {
1309                            continue; // key was absent — omit entirely
1310                        }
1311                        let val = flat_col_values[flat_idx + si][row];
1312                        if !first {
1313                            obj.push(b',');
1314                        }
1315                        first = false;
1316                        obj.push(b'"');
1317                        obj.extend_from_slice(&group.sub_keys[si]);
1318                        obj.push(b'"');
1319                        obj.push(b':');
1320                        obj.extend_from_slice(val);
1321                    }
1322                    obj.push(b'}');
1323                    merged_col.push(obj);
1324                }
1325            }
1326            output_columns.push(merged_col);
1327            flat_idx += num_sub;
1328        } else {
1329            // Non-nested column — copy as-is.
1330            if flat_idx < flat_col_values.len() {
1331                let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
1332                    .iter()
1333                    .map(|v| v.to_vec())
1334                    .collect();
1335                output_columns.push(col);
1336            }
1337            flat_idx += 1;
1338        }
1339    }
1340
1341    // Rebuild columnar data.
1342    let num_out_cols = output_columns.len();
1343    let mut out = Vec::new();
1344    for (ci, col) in output_columns.iter().enumerate() {
1345        for (ri, val) in col.iter().enumerate() {
1346            out.extend_from_slice(val);
1347            if ri < num_rows - 1 {
1348                out.push(VAL_SEP);
1349            }
1350        }
1351        if ci < num_out_cols - 1 {
1352            out.push(COL_SEP);
1353        }
1354    }
1355
1356    out
1357}
1358
1359/// Serialize nested group info into bytes for storage in metadata.
1360/// Version 1 (has_nested=1): sub_keys only (backward compat, NDJSON path).
1361/// Version 2 (has_nested=2): sub_keys + nested_template (preserves formatting).
1362/// Version 3 (has_nested=3): sub_keys + nested_template + absence_bitmap.
1363pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1364    let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1365    let has_absence = groups.iter().any(|g| !g.absence_bitmap.is_empty());
1366    let mut out = Vec::new();
1367    let version = if has_absence {
1368        3u8
1369    } else if has_template {
1370        2u8
1371    } else {
1372        1u8
1373    };
1374    out.push(version);
1375    out.push(groups.len() as u8);
1376    for group in groups {
1377        out.extend_from_slice(&group.original_col_index.to_le_bytes());
1378        out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1379        for key in &group.sub_keys {
1380            out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1381            out.extend_from_slice(key);
1382        }
1383        if has_template || version == 3 {
1384            out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1385            for part in &group.nested_template {
1386                out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1387                out.extend_from_slice(part);
1388            }
1389        }
1390        if version == 3 {
1391            let bm_len = group.absence_bitmap.len() as u32;
1392            out.extend_from_slice(&bm_len.to_le_bytes());
1393            out.extend_from_slice(&group.absence_bitmap);
1394        }
1395    }
1396    out
1397}
1398
1399/// Deserialize nested group info from metadata bytes.
1400/// Returns (nested_groups, bytes_consumed).
1401/// Handles version 1 (no template), version 2 (with template),
1402/// and version 3 (template + absence bitmap).
1403pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1404    if data.is_empty() {
1405        return None;
1406    }
1407    let mut pos = 0;
1408    let version = data[pos];
1409    pos += 1;
1410    if version != 1 && version != 2 && version != 3 {
1411        return None;
1412    }
1413    let has_template = version == 2 || version == 3;
1414    let has_absence = version == 3;
1415    if pos >= data.len() {
1416        return None;
1417    }
1418    let num_groups = data[pos] as usize;
1419    pos += 1;
1420
1421    let mut groups = Vec::with_capacity(num_groups);
1422    for _ in 0..num_groups {
1423        if pos + 4 > data.len() {
1424            return None;
1425        }
1426        let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1427        pos += 2;
1428        let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1429        pos += 2;
1430
1431        let mut sub_keys = Vec::with_capacity(num_sub_cols);
1432        for _ in 0..num_sub_cols {
1433            if pos + 2 > data.len() {
1434                return None;
1435            }
1436            let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1437            pos += 2;
1438            if pos + key_len > data.len() {
1439                return None;
1440            }
1441            sub_keys.push(data[pos..pos + key_len].to_vec());
1442            pos += key_len;
1443        }
1444
1445        let nested_template = if has_template {
1446            if pos + 2 > data.len() {
1447                return None;
1448            }
1449            let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1450            pos += 2;
1451            let mut parts = Vec::with_capacity(num_parts);
1452            for _ in 0..num_parts {
1453                if pos + 2 > data.len() {
1454                    return None;
1455                }
1456                let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1457                pos += 2;
1458                if pos + part_len > data.len() {
1459                    return None;
1460                }
1461                parts.push(data[pos..pos + part_len].to_vec());
1462                pos += part_len;
1463            }
1464            parts
1465        } else {
1466            Vec::new()
1467        };
1468
1469        let absence_bitmap = if has_absence {
1470            if pos + 4 > data.len() {
1471                return None;
1472            }
1473            let bm_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1474            pos += 4;
1475            if pos + bm_len > data.len() {
1476                return None;
1477            }
1478            let bm = data[pos..pos + bm_len].to_vec();
1479            pos += bm_len;
1480            bm
1481        } else {
1482            Vec::new()
1483        };
1484
1485        groups.push(NestedGroupInfo {
1486            original_col_index,
1487            sub_keys,
1488            nested_template,
1489            absence_bitmap,
1490        });
1491    }
1492
1493    Some((groups, pos))
1494}
1495
1496/// Forward transform: NDJSON columnar reorg.
1497///
1498/// Tries Strategy 1 (uniform) first, then Strategy 2 (grouped) if schemas differ.
1499/// Returns None if data is not suitable for columnar transform.
1500pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
1501    if data.is_empty() {
1502        return None;
1503    }
1504
1505    let has_trailing_newline = data.last() == Some(&b'\n');
1506    let lines = split_lines(data);
1507    let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
1508
1509    if non_empty.len() < 2 {
1510        return None;
1511    }
1512
1513    // Strategy 1: try uniform schema first.
1514    if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
1515        let is_selective = !metadata.is_empty() && metadata[0] == METADATA_VERSION_SELECTIVE;
1516        // Selective columnar may be slightly larger raw (metadata overhead) but the
1517        // row-major inline layout benefits downstream compression significantly.
1518        // Allow up to 5% overhead for selective; strict for uniform.
1519        let size_ok = if is_selective {
1520            (col_data.len() + metadata.len()) * 100 <= data.len() * 105
1521        } else {
1522            col_data.len() + metadata.len() < data.len()
1523        };
1524        if size_ok {
1525            // Selective columnar (version=3) has a different data layout — skip nested flatten.
1526            if is_selective {
1527                return Some(TransformResult {
1528                    data: col_data,
1529                    metadata,
1530                });
1531            }
1532            // Try depth-1 nested decomposition on the columnar output.
1533            // Even if the flattened data is slightly larger raw, the downstream
1534            // typed encoding + compression benefits are significant: null bitmaps
1535            // are compact and type-homogeneous columns compress much better.
1536            let num_rows = non_empty.len();
1537            if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
1538                // Verify roundtrip: unflatten must produce the exact original columnar
1539                // data. Nested objects with varying sub-key sets or key ordering can
1540                // cause the compact reconstruction to reorder keys, breaking byte-exact
1541                // roundtrip. Only apply if the unflatten is provably lossless.
1542                let total_flat_cols = flat_data.split(|&b| b == COL_SEP).count();
1543                let unflattened =
1544                    unflatten_nested_columns(&flat_data, &nested_groups, num_rows, total_flat_cols);
1545                if unflattened == col_data {
1546                    // Append nested info to metadata.
1547                    let nested_bytes = serialize_nested_info(&nested_groups);
1548                    metadata.extend_from_slice(&nested_bytes);
1549                    return Some(TransformResult {
1550                        data: flat_data,
1551                        metadata,
1552                    });
1553                }
1554                // else: roundtrip not exact — skip nested flatten.
1555            }
1556            // No nested objects found — append has_nested=0.
1557            metadata.push(0u8); // has_nested = 0
1558            return Some(TransformResult {
1559                data: col_data,
1560                metadata,
1561            });
1562        }
1563    }
1564
1565    // Strategy 2: group by schema.
1566    if let Some((grouped_data, grouped_metadata)) =
1567        preprocess_grouped(&non_empty, has_trailing_newline)
1568    {
1569        if grouped_data.len() + grouped_metadata.len() < data.len() {
1570            return Some(TransformResult {
1571                data: grouped_data,
1572                metadata: grouped_metadata,
1573            });
1574        }
1575    }
1576
1577    None
1578}
1579
1580/// Reverse transform: reconstruct NDJSON from columnar layout + metadata.
1581/// Dispatches to the appropriate decoder based on metadata version byte.
1582pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1583    if metadata.is_empty() {
1584        return data.to_vec();
1585    }
1586    match metadata[0] {
1587        METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1588        METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1589        METADATA_VERSION_SELECTIVE => reverse_selective(data, metadata),
1590        _ => data.to_vec(),
1591    }
1592}
1593
1594/// Reverse Strategy 1: uniform schema.
1595fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1596    if metadata.len() < 10 {
1597        return data.to_vec();
1598    }
1599    let mut pos = 0;
1600    let _version = metadata[pos];
1601    pos += 1;
1602    let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1603    pos += 4;
1604    let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1605    pos += 2;
1606    let has_trailing_newline = metadata[pos] != 0;
1607    pos += 1;
1608    let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1609    pos += 2;
1610
1611    let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
1612    for _ in 0..num_parts {
1613        if pos + 2 > metadata.len() {
1614            return data.to_vec();
1615        }
1616        let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1617        pos += 2;
1618        if pos + part_len > metadata.len() {
1619            return data.to_vec();
1620        }
1621        parts.push(metadata[pos..pos + part_len].to_vec());
1622        pos += part_len;
1623    }
1624
1625    if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
1626        return data.to_vec();
1627    }
1628
1629    // Check for nested metadata after template parts.
1630    let remaining_metadata = &metadata[pos..];
1631    if !remaining_metadata.is_empty()
1632        && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2 || remaining_metadata[0] == 3)
1633    {
1634        // has_nested == 1, 2, or 3: unflatten before reconstructing rows.
1635        if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
1636            // Calculate total number of flat columns.
1637            let total_flat_cols = data.split(|&b| b == COL_SEP).count();
1638            let unflattened =
1639                unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
1640            return reverse_uniform_from_parts(
1641                &unflattened,
1642                &parts,
1643                num_rows,
1644                num_cols,
1645                has_trailing_newline,
1646            );
1647        }
1648    }
1649
1650    reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
1651}
1652
1653/// Core uniform reverse: given parsed parts, reconstruct lines from columnar data.
1654fn reverse_uniform_from_parts(
1655    data: &[u8],
1656    parts: &[Vec<u8>],
1657    num_rows: usize,
1658    num_cols: usize,
1659    has_trailing_newline: bool,
1660) -> Vec<u8> {
1661    let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
1662    if col_chunks.len() != num_cols {
1663        return data.to_vec();
1664    }
1665
1666    let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
1667    for chunk in &col_chunks {
1668        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1669        if vals.len() != num_rows {
1670            return data.to_vec();
1671        }
1672        columns.push(vals);
1673    }
1674
1675    // Pre-calculate exact output size to allocate once (avoid incremental realloc).
1676    let template_size_per_row: usize = parts.iter().map(|p| p.len()).sum();
1677    let values_total: usize = columns
1678        .iter()
1679        .map(|col| col.iter().map(|v| v.len()).sum::<usize>())
1680        .sum();
1681    let newline_count = if has_trailing_newline {
1682        num_rows
1683    } else {
1684        num_rows - 1
1685    };
1686    let total_size = template_size_per_row * num_rows + values_total + newline_count;
1687
1688    let mut output = Vec::with_capacity(total_size);
1689    #[allow(clippy::needless_range_loop)]
1690    for row in 0..num_rows {
1691        output.extend_from_slice(&parts[0]);
1692        output.extend_from_slice(columns[0][row]);
1693        for col in 1..num_cols {
1694            output.extend_from_slice(&parts[col]);
1695            output.extend_from_slice(columns[col][row]);
1696        }
1697        output.extend_from_slice(&parts[num_cols]);
1698
1699        if row < num_rows - 1 || has_trailing_newline {
1700            output.push(b'\n');
1701        }
1702    }
1703
1704    output
1705}
1706
/// In-memory form of a version-3 (selective) metadata block.
struct SelectiveMetadata {
    /// Row template fragments: `parts[0]` opens the row and `parts[c + 1]`
    /// follows the value of column `c`, so there is one more part than columns.
    parts: Vec<Vec<u8>>,
    /// Number of rows encoded in the data blob.
    num_rows: usize,
    /// Total number of columns per row (extracted + inline).
    num_total_cols: usize,
    /// Whether a `\n` is emitted after the last reconstructed row.
    has_trailing_newline: bool,
    /// Indices (in row order) of the columns stored column-major in the
    /// extracted section; every other column is stored inline, row-major.
    extracted_col_indices: Vec<u16>,
}
1715
1716/// Parse version=3 (selective) metadata.
1717fn parse_selective_metadata(metadata: &[u8]) -> Option<SelectiveMetadata> {
1718    if metadata.len() < 12 {
1719        return None;
1720    }
1721    let mut pos = 1; // Skip version byte.
1722    let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1723    pos += 4;
1724    let num_total_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1725    pos += 2;
1726    let has_trailing_newline = metadata[pos] != 0;
1727    pos += 1;
1728    let num_extracted = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1729    pos += 2;
1730
1731    let mut extracted_col_indices = Vec::with_capacity(num_extracted);
1732    for _ in 0..num_extracted {
1733        if pos + 2 > metadata.len() {
1734            return None;
1735        }
1736        let idx = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap());
1737        pos += 2;
1738        extracted_col_indices.push(idx);
1739    }
1740
1741    if pos + 2 > metadata.len() {
1742        return None;
1743    }
1744    let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1745    pos += 2;
1746
1747    let mut parts = Vec::with_capacity(num_parts);
1748    for _ in 0..num_parts {
1749        if pos + 2 > metadata.len() {
1750            return None;
1751        }
1752        let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1753        pos += 2;
1754        if pos + part_len > metadata.len() {
1755            return None;
1756        }
1757        parts.push(metadata[pos..pos + part_len].to_vec());
1758        pos += part_len;
1759    }
1760
1761    if parts.len() != num_total_cols + 1 || num_rows == 0 || num_total_cols == 0 {
1762        return None;
1763    }
1764
1765    Some(SelectiveMetadata {
1766        parts,
1767        num_rows,
1768        num_total_cols,
1769        has_trailing_newline,
1770        extracted_col_indices,
1771    })
1772}
1773
1774/// Reverse selective columnar: reconstruct rows from split extracted/inline sections.
1775fn reverse_selective(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1776    let sm = match parse_selective_metadata(metadata) {
1777        Some(v) => v,
1778        None => return data.to_vec(),
1779    };
1780    reverse_selective_from_data(data, &sm)
1781}
1782
1783/// Core selective reverse: reconstruct rows from selective columnar data.
1784fn reverse_selective_from_data(data: &[u8], sm: &SelectiveMetadata) -> Vec<u8> {
1785    if data.len() < 4 {
1786        return data.to_vec();
1787    }
1788
1789    // Read extracted_data_len from first 4 bytes.
1790    let extracted_data_len = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1791    if 4 + extracted_data_len > data.len() {
1792        return data.to_vec();
1793    }
1794    let extracted_section = &data[4..4 + extracted_data_len];
1795    let inline_section = &data[4 + extracted_data_len..];
1796
1797    let num_extracted = sm.extracted_col_indices.len();
1798    let num_inline = sm.num_total_cols - num_extracted;
1799
1800    // Parse extracted columns (column-major, split by \x00 then \x01).
1801    let extracted_columns: Vec<Vec<&[u8]>> = if num_extracted > 0 && !extracted_section.is_empty() {
1802        let col_chunks: Vec<&[u8]> = extracted_section.split(|&b| b == COL_SEP).collect();
1803        if col_chunks.len() != num_extracted {
1804            return data.to_vec();
1805        }
1806        let mut cols = Vec::with_capacity(num_extracted);
1807        for chunk in &col_chunks {
1808            let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
1809            if vals.len() != sm.num_rows {
1810                return data.to_vec();
1811            }
1812            cols.push(vals);
1813        }
1814        cols
1815    } else if num_extracted > 0 {
1816        // extracted section is empty but we expect columns — error
1817        return data.to_vec();
1818    } else {
1819        Vec::new()
1820    };
1821
1822    // Parse inline rows (row-major, split by \x00 then \x01).
1823    let inline_rows: Vec<Vec<&[u8]>> = if num_inline > 0 && !inline_section.is_empty() {
1824        let row_chunks: Vec<&[u8]> = inline_section.split(|&b| b == COL_SEP).collect();
1825        if row_chunks.len() != sm.num_rows {
1826            return data.to_vec();
1827        }
1828        let mut rows = Vec::with_capacity(sm.num_rows);
1829        for chunk in &row_chunks {
1830            let vals: Vec<&[u8]> = if num_inline > 1 {
1831                chunk.split(|&b| b == VAL_SEP).collect()
1832            } else {
1833                vec![*chunk]
1834            };
1835            if vals.len() != num_inline {
1836                return data.to_vec();
1837            }
1838            rows.push(vals);
1839        }
1840        rows
1841    } else if num_inline > 0 {
1842        // inline section is empty but we expect inline columns — error
1843        return data.to_vec();
1844    } else {
1845        Vec::new()
1846    };
1847
1848    // Build a lookup: for each total column index, is it extracted or inline?
1849    // extracted_positions[col] = Some(index into extracted_columns)
1850    // inline_positions[col] = Some(index into inline row values)
1851    let mut extracted_positions = vec![None; sm.num_total_cols];
1852    let mut inline_positions = vec![None; sm.num_total_cols];
1853    for (ei, &col_idx) in sm.extracted_col_indices.iter().enumerate() {
1854        if (col_idx as usize) < sm.num_total_cols {
1855            extracted_positions[col_idx as usize] = Some(ei);
1856        }
1857    }
1858    let mut ii = 0;
1859    for col in 0..sm.num_total_cols {
1860        if extracted_positions[col].is_none() {
1861            inline_positions[col] = Some(ii);
1862            ii += 1;
1863        }
1864    }
1865
1866    // Reconstruct rows.
1867    let mut output = Vec::with_capacity(data.len() * 2);
1868    for row in 0..sm.num_rows {
1869        // Interleave template parts and column values.
1870        output.extend_from_slice(&sm.parts[0]);
1871        for col in 0..sm.num_total_cols {
1872            if let Some(ei) = extracted_positions[col] {
1873                output.extend_from_slice(extracted_columns[ei][row]);
1874            } else if let Some(ii_idx) = inline_positions[col] {
1875                output.extend_from_slice(inline_rows[row][ii_idx]);
1876            }
1877            output.extend_from_slice(&sm.parts[col + 1]);
1878        }
1879
1880        if row < sm.num_rows - 1 || sm.has_trailing_newline {
1881            output.push(b'\n');
1882        }
1883    }
1884
1885    output
1886}
1887
1888
1889/// Reverse Strategy 2: grouped schema.
1890fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1891    if metadata.len() < 8 {
1892        return data.to_vec();
1893    }
1894
1895    let mut mpos = 1; // Skip version byte.
1896    let has_trailing_newline = metadata[mpos] != 0;
1897    mpos += 1;
1898    let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1899    mpos += 4;
1900    let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
1901    mpos += 2;
1902
1903    // Allocate output slots.
1904    let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];
1905
1906    // Data cursor.
1907    let mut dpos: usize = 0;
1908
1909    for _ in 0..num_groups {
1910        // Read row indices for this group.
1911        if mpos + 4 > metadata.len() {
1912            return data.to_vec();
1913        }
1914        let group_row_count =
1915            u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1916        mpos += 4;
1917
1918        let mut row_indices = Vec::with_capacity(group_row_count);
1919        for _ in 0..group_row_count {
1920            if mpos + 4 > metadata.len() {
1921                return data.to_vec();
1922            }
1923            let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1924            mpos += 4;
1925            row_indices.push(idx);
1926        }
1927
1928        // Read group metadata.
1929        if mpos + 4 > metadata.len() {
1930            return data.to_vec();
1931        }
1932        let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1933        mpos += 4;
1934        if mpos + gm_len > metadata.len() {
1935            return data.to_vec();
1936        }
1937        let group_metadata = &metadata[mpos..mpos + gm_len];
1938        mpos += gm_len;
1939
1940        // Read group data from the data blob.
1941        if dpos + 4 > data.len() {
1942            return data.to_vec();
1943        }
1944        let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
1945        dpos += 4;
1946        if dpos + gd_len > data.len() {
1947            return data.to_vec();
1948        }
1949        let group_data = &data[dpos..dpos + gd_len];
1950        dpos += gd_len;
1951
1952        // Decode this group — dispatch on per-group metadata version byte.
1953        let group_version = if group_metadata.is_empty() {
1954            0
1955        } else {
1956            group_metadata[0]
1957        };
1958
1959        if group_version == METADATA_VERSION_SELECTIVE {
1960            // Selective columnar group (version=3).
1961            let sm = match parse_selective_metadata(group_metadata) {
1962                Some(v) => v,
1963                None => return data.to_vec(),
1964            };
1965            if sm.num_rows != group_row_count {
1966                return data.to_vec();
1967            }
1968            let reconstructed = reverse_selective_from_data(group_data, &sm);
1969            // Split reconstructed into individual lines.
1970            let lines: Vec<&[u8]> = reconstructed.split(|&b| b == b'\n').collect();
1971            for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1972                if row_within_group < lines.len() && original_idx < total_rows {
1973                    output_lines[original_idx] = Some(lines[row_within_group].to_vec());
1974                }
1975            }
1976        } else {
1977            // Standard uniform group — delegate to reverse_uniform which
1978            // handles nested unflatten automatically.
1979            let reconstructed = reverse_uniform(group_data, group_metadata);
1980            let lines: Vec<&[u8]> = reconstructed.split(|&b| b == b'\n').collect();
1981            for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
1982                if row_within_group < lines.len() && original_idx < total_rows {
1983                    output_lines[original_idx] = Some(lines[row_within_group].to_vec());
1984                }
1985            }
1986        }
1987    }
1988
1989    // Read residual indices from metadata.
1990    if mpos + 4 > metadata.len() {
1991        return data.to_vec();
1992    }
1993    let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
1994    mpos += 4;
1995
1996    let mut residual_indices = Vec::with_capacity(residual_count);
1997    for _ in 0..residual_count {
1998        if mpos + 4 > metadata.len() {
1999            return data.to_vec();
2000        }
2001        let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
2002        mpos += 4;
2003        residual_indices.push(idx);
2004    }
2005
2006    // Remaining data is residual lines.
2007    let residual_data = &data[dpos..];
2008    if residual_count > 0 {
2009        let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
2010            vec![]
2011        } else {
2012            residual_data.split(|&b| b == b'\n').collect()
2013        };
2014        // There should be exactly residual_count lines.
2015        if residual_lines.len() != residual_count {
2016            return data.to_vec();
2017        }
2018        for (i, &idx) in residual_indices.iter().enumerate() {
2019            if idx < total_rows {
2020                output_lines[idx] = Some(residual_lines[i].to_vec());
2021            }
2022        }
2023    }
2024
2025    // Assemble final output.
2026    let mut output = Vec::with_capacity(data.len() * 2);
2027    for (i, slot) in output_lines.iter().enumerate() {
2028        match slot {
2029            Some(line) => output.extend_from_slice(line),
2030            None => {
2031                // Should not happen — missing row. Return data as-is.
2032                return data.to_vec();
2033            }
2034        }
2035        if i < total_rows - 1 || has_trailing_newline {
2036            output.push(b'\n');
2037        }
2038    }
2039
2040    output
2041}
2042
2043#[cfg(test)]
2044mod tests {
2045    use super::*;
2046
2047    #[test]
2048    fn extract_value_string() {
2049        let line = br#""hello","next""#;
2050        let (val, end) = extract_value(line, 0).unwrap();
2051        assert_eq!(val, b"\"hello\"");
2052        assert_eq!(end, 7);
2053    }
2054
2055    #[test]
2056    fn extract_value_number() {
2057        let line = b"42,next";
2058        let (val, end) = extract_value(line, 0).unwrap();
2059        assert_eq!(val, b"42");
2060        assert_eq!(end, 2);
2061    }
2062
2063    #[test]
2064    fn extract_value_bool() {
2065        let line = b"true,next";
2066        let (val, end) = extract_value(line, 0).unwrap();
2067        assert_eq!(val, b"true");
2068        assert_eq!(end, 4);
2069    }
2070
2071    #[test]
2072    fn extract_value_null() {
2073        let line = b"null,next";
2074        let (val, end) = extract_value(line, 0).unwrap();
2075        assert_eq!(val, b"null");
2076        assert_eq!(end, 4);
2077    }
2078
2079    #[test]
2080    fn extract_value_object() {
2081        let line = br#"{"a":1,"b":"x"},next"#;
2082        let (val, end) = extract_value(line, 0).unwrap();
2083        assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
2084        assert_eq!(end, 15);
2085    }
2086
2087    #[test]
2088    fn extract_value_array() {
2089        let line = b"[1,2,3],next";
2090        let (val, end) = extract_value(line, 0).unwrap();
2091        assert_eq!(val, b"[1,2,3]");
2092        assert_eq!(end, 7);
2093    }
2094
2095    #[test]
2096    fn extract_value_string_with_escapes() {
2097        let line = br#""he\"llo",next"#;
2098        let (val, end) = extract_value(line, 0).unwrap();
2099        assert_eq!(val, br#""he\"llo""#.to_vec());
2100        assert_eq!(end, 9);
2101    }
2102
2103    #[test]
2104    fn parse_line_simple() {
2105        let line = br#"{"a":1,"b":"x"}"#;
2106        let (parts, values) = parse_line(line).unwrap();
2107        assert_eq!(parts.len(), 3); // {"a": , ,"b": , }
2108        assert_eq!(values.len(), 2);
2109        assert_eq!(values[0], b"1");
2110        assert_eq!(values[1], b"\"x\"");
2111        assert_eq!(parts[0], br#"{"a":"#.to_vec());
2112        assert_eq!(parts[1], br#","b":"#.to_vec());
2113        assert_eq!(parts[2], b"}");
2114    }
2115
2116    #[test]
2117    fn roundtrip_simple() {
2118        let data = br#"{"a":1,"b":"x"}
2119{"a":2,"b":"y"}
2120{"a":3,"b":"z"}
2121"#;
2122        let result = preprocess(data).expect("should produce transform");
2123        let restored = reverse(&result.data, &result.metadata);
2124        assert_eq!(
2125            String::from_utf8_lossy(&restored),
2126            String::from_utf8_lossy(data),
2127        );
2128        assert_eq!(restored, data.to_vec());
2129    }
2130
2131    #[test]
2132    fn roundtrip_no_trailing_newline() {
2133        let data = br#"{"a":1,"b":"x"}
2134{"a":2,"b":"y"}
2135{"a":3,"b":"z"}"#;
2136        let result = preprocess(data).expect("should produce transform");
2137        let restored = reverse(&result.data, &result.metadata);
2138        assert_eq!(restored, data.to_vec());
2139    }
2140
2141    #[test]
2142    fn roundtrip_nested_values() {
2143        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2144{"id":2,"meta":{"x":30,"y":40}}
2145{"id":3,"meta":{"x":50,"y":60}}
2146{"id":4,"meta":{"x":70,"y":80}}
2147{"id":5,"meta":{"x":90,"y":100}}
2148"#;
2149        let result = preprocess(data).expect("should produce transform");
2150        let restored = reverse(&result.data, &result.metadata);
2151        assert_eq!(restored, data.to_vec());
2152    }
2153
2154    #[test]
2155    fn roundtrip_mixed_types() {
2156        let data = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
2157{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
2158{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
2159{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
2160{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
2161"#;
2162        let result = preprocess(data).expect("should produce transform");
2163        let restored = reverse(&result.data, &result.metadata);
2164        assert_eq!(restored, data.to_vec());
2165    }
2166
2167    #[test]
2168    fn schema_mismatch_too_few_returns_none() {
2169        // Different keys on different lines, but each group has < MIN_GROUP_ROWS.
2170        let data = br#"{"a":1,"b":2}
2171{"a":1,"c":3}
2172"#;
2173        assert!(preprocess(data).is_none());
2174    }
2175
2176    #[test]
2177    fn different_num_keys_too_few_returns_none() {
2178        let data = br#"{"a":1,"b":2}
2179{"a":1}
2180"#;
2181        assert!(preprocess(data).is_none());
2182    }
2183
2184    #[test]
2185    fn single_line_returns_none() {
2186        let data = br#"{"a":1,"b":2}
2187"#;
2188        assert!(preprocess(data).is_none());
2189    }
2190
2191    #[test]
2192    fn empty_returns_none() {
2193        assert!(preprocess(b"").is_none());
2194    }
2195
2196    #[test]
2197    fn column_layout_groups_similar_values() {
2198        let data = br#"{"type":"page_view","user":"alice"}
2199{"type":"api_call","user":"alice"}
2200{"type":"click","user":"bob"}
2201"#;
2202        let result = preprocess(data).unwrap();
2203
2204        // The columnar data should have type values grouped, then user values grouped.
2205        let col_data = &result.data;
2206        let cols: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
2207        assert_eq!(cols.len(), 2);
2208
2209        // Column 0 = type values.
2210        let type_vals: Vec<&[u8]> = cols[0].split(|&b| b == VAL_SEP).collect();
2211        assert_eq!(type_vals.len(), 3);
2212        assert_eq!(type_vals[0], br#""page_view""#);
2213        assert_eq!(type_vals[1], br#""api_call""#);
2214        assert_eq!(type_vals[2], br#""click""#);
2215
2216        // Column 1 = user values.
2217        let user_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
2218        assert_eq!(user_vals.len(), 3);
2219        assert_eq!(user_vals[0], br#""alice""#);
2220        assert_eq!(user_vals[1], br#""alice""#);
2221        assert_eq!(user_vals[2], br#""bob""#);
2222    }
2223
2224    #[test]
2225    fn roundtrip_string_with_escaped_chars() {
2226        let data = br#"{"msg":"he said \"hi\"","val":1}
2227{"msg":"line\nbreak","val":2}
2228{"msg":"tab\there","val":3}
2229{"msg":"back\\slash","val":4}
2230{"msg":"normal text","val":5}
2231"#;
2232        let result = preprocess(data).expect("should produce transform");
2233        let restored = reverse(&result.data, &result.metadata);
2234        assert_eq!(restored, data.to_vec());
2235    }
2236
2237    #[test]
2238    fn roundtrip_negative_and_float_numbers() {
2239        let data = br#"{"x":-3.14,"y":0}
2240{"x":2.718,"y":-1}
2241{"x":0.001,"y":999}
2242{"x":-100,"y":-200}
2243{"x":42.0,"y":7}
2244"#;
2245        let result = preprocess(data).expect("should produce transform");
2246        let restored = reverse(&result.data, &result.metadata);
2247        assert_eq!(restored, data.to_vec());
2248    }
2249
2250    /// Test that the transform+reverse is lossless even for tiny inputs
2251    /// by repeating them to pass the size check threshold.
2252    #[test]
2253    fn reverse_roundtrip_small_data() {
2254        // Verify parse_line works on small lines.
2255        let (parts, vals) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
2256        assert_eq!(vals.len(), 2);
2257        assert_eq!(parts.len(), 3);
2258
2259        // 2-row data might fail the size check, so repeat to get enough rows.
2260        let big_data = br#"{"x":-3.14,"y":0}
2261{"x":2.718,"y":-1}
2262"#
2263        .repeat(20);
2264        let result = preprocess(&big_data).expect("should produce transform with 40 rows");
2265        let restored = reverse(&result.data, &result.metadata);
2266        assert_eq!(restored, big_data);
2267    }
2268
2269    // --- Strategy 2 (grouped) tests ---
2270
2271    #[test]
2272    fn grouped_roundtrip_two_schemas() {
2273        // Two different schemas, each with >= MIN_GROUP_ROWS rows.
2274        let mut data = Vec::new();
2275        for i in 0..10 {
2276            data.extend_from_slice(
2277                format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
2278            );
2279            data.push(b'\n');
2280        }
2281        for i in 10..20 {
2282            data.extend_from_slice(
2283                format!(
2284                    r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
2285                    i, i, i
2286                )
2287                .as_bytes(),
2288            );
2289            data.push(b'\n');
2290        }
2291        let result = preprocess(&data).expect("should produce grouped transform");
2292        // Should be version 2 (grouped).
2293        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2294        let restored = reverse(&result.data, &result.metadata);
2295        assert_eq!(restored, data);
2296    }
2297
2298    #[test]
2299    fn grouped_roundtrip_interleaved_schemas() {
2300        // Interleaved schemas: alternating between two different key sets.
2301        let mut data = Vec::new();
2302        for i in 0..20 {
2303            if i % 2 == 0 {
2304                data.extend_from_slice(
2305                    format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
2306                );
2307            } else {
2308                data.extend_from_slice(
2309                    format!(
2310                        r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
2311                        i, i, i
2312                    )
2313                    .as_bytes(),
2314                );
2315            }
2316            data.push(b'\n');
2317        }
2318        let result = preprocess(&data).expect("should produce grouped transform");
2319        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2320        let restored = reverse(&result.data, &result.metadata);
2321        assert_eq!(restored, data);
2322    }
2323
2324    #[test]
2325    fn grouped_roundtrip_with_residuals() {
2326        // Two large groups + a few unique-schema rows (residuals).
2327        let mut data = Vec::new();
2328        // Group A: 8 rows.
2329        for i in 0..8 {
2330            data.extend_from_slice(format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
2331            data.push(b'\n');
2332        }
2333        // 2 unique rows (will be residual).
2334        data.extend_from_slice(br#"{"x":1,"y":2,"z":3}"#);
2335        data.push(b'\n');
2336        data.extend_from_slice(br#"{"p":"q"}"#);
2337        data.push(b'\n');
2338        // Group B: 6 rows.
2339        for i in 0..6 {
2340            data.extend_from_slice(format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes());
2341            data.push(b'\n');
2342        }
2343        let result = preprocess(&data).expect("should produce grouped transform");
2344        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2345        let restored = reverse(&result.data, &result.metadata);
2346        assert_eq!(
2347            String::from_utf8_lossy(&restored),
2348            String::from_utf8_lossy(&data),
2349        );
2350        assert_eq!(restored, data);
2351    }
2352
2353    #[test]
2354    fn grouped_roundtrip_no_trailing_newline() {
2355        let mut data = Vec::new();
2356        for i in 0..6 {
2357            data.extend_from_slice(format!(r#"{{"id":{},"type":"push"}}"#, i).as_bytes());
2358            data.push(b'\n');
2359        }
2360        for i in 0..6 {
2361            data.extend_from_slice(
2362                format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i).as_bytes(),
2363            );
2364            if i < 5 {
2365                data.push(b'\n');
2366            }
2367            // Last line: no trailing newline.
2368        }
2369        let result = preprocess(&data).expect("should produce grouped transform");
2370        let restored = reverse(&result.data, &result.metadata);
2371        assert_eq!(restored, data);
2372    }
2373
2374    #[test]
2375    fn uniform_still_preferred_over_grouped() {
2376        // All rows same schema — should use Strategy 1 (version 1), not Strategy 2.
2377        let data = br#"{"a":1,"b":"x"}
2378{"a":2,"b":"y"}
2379{"a":3,"b":"z"}
2380{"a":4,"b":"w"}
2381{"a":5,"b":"v"}
2382"#;
2383        let result = preprocess(data).expect("should produce transform");
2384        assert_eq!(
2385            result.metadata[0], METADATA_VERSION_UNIFORM,
2386            "uniform schema should use Strategy 1"
2387        );
2388        let restored = reverse(&result.data, &result.metadata);
2389        assert_eq!(restored, data.to_vec());
2390    }
2391
2392    #[test]
2393    fn grouped_gharchive_simulation() {
2394        // Simulates GitHub Archive: most rows have 7 keys, some have 8.
2395        let mut data = Vec::new();
2396        for i in 0..50 {
2397            if i % 5 == 0 {
2398                // 8-key rows (with org).
2399                data.extend_from_slice(
2400                    format!(
2401                        r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
2402                        i, i, i, i
2403                    )
2404                    .as_bytes(),
2405                );
2406            } else {
2407                // 7-key rows (no org).
2408                data.extend_from_slice(
2409                    format!(
2410                        r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
2411                        i, i, i
2412                    )
2413                    .as_bytes(),
2414                );
2415            }
2416            data.push(b'\n');
2417        }
2418        let result = preprocess(&data).expect("should produce grouped transform");
2419        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2420        let restored = reverse(&result.data, &result.metadata);
2421        assert_eq!(restored, data);
2422    }
2423
2424    #[test]
2425    fn grouped_nested_flatten_per_group() {
2426        // Groups with nested objects should get per-group nested flatten.
2427        // Two schemas (with/without extra field), nested objects in both.
2428        let mut data = Vec::new();
2429        for i in 0..30 {
2430            if i % 3 == 0 {
2431                // Schema B: 4 keys including nested + extra.
2432                data.extend_from_slice(
2433                    format!(
2434                        r#"{{"id":{},"info":{{"a":{},"b":{}}},"tag":"b","extra":"yes"}}"#,
2435                        i,
2436                        i * 10,
2437                        i * 20
2438                    )
2439                    .as_bytes(),
2440                );
2441            } else {
2442                // Schema A: 3 keys including nested.
2443                data.extend_from_slice(
2444                    format!(
2445                        r#"{{"id":{},"info":{{"a":{},"b":{}}},"tag":"a"}}"#,
2446                        i,
2447                        i * 10,
2448                        i * 20
2449                    )
2450                    .as_bytes(),
2451                );
2452            }
2453            data.push(b'\n');
2454        }
2455        let result = preprocess(&data).expect("should produce grouped transform");
2456        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2457        let restored = reverse(&result.data, &result.metadata);
2458        assert_eq!(restored, data, "grouped nested flatten roundtrip failed");
2459    }
2460
2461    #[test]
2462    fn grouped_discriminator_two_pass() {
2463        // Discriminator sub-grouping should be tried and the smaller result picked.
2464        // Build data with a discriminator column ("type") and varying nested payloads.
2465        let mut data = Vec::new();
2466        for i in 0..60 {
2467            let etype = if i % 2 == 0 { "push" } else { "create" };
2468            data.extend_from_slice(
2469                format!(
2470                    r#"{{"id":{},"type":"{}","payload":{{"ref":"r{}","size":{}}}}}"#,
2471                    i, etype, i, i * 10
2472                )
2473                .as_bytes(),
2474            );
2475            data.push(b'\n');
2476        }
2477        let result = preprocess(&data).expect("should produce transform");
2478        let restored = reverse(&result.data, &result.metadata);
2479        assert_eq!(
2480            restored, data,
2481            "discriminator two-pass roundtrip failed"
2482        );
2483    }
2484
2485    // --- Nested decomposition tests ---
2486
2487    #[test]
2488    fn test_nested_decomposition_basic() {
2489        // Simple nested object decomposed correctly.
2490        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2491{"id":2,"meta":{"x":30,"y":40}}
2492{"id":3,"meta":{"x":50,"y":60}}
2493"#;
2494        let result = preprocess(data).expect("should produce transform");
2495        assert_eq!(result.metadata[0], METADATA_VERSION_UNIFORM);
2496
2497        // The columnar data should have expanded columns.
2498        let cols: Vec<&[u8]> = result.data.split(|&b| b == COL_SEP).collect();
2499        // Original: 2 cols (id, meta). After flattening: 3 cols (id, meta.x, meta.y).
2500        assert_eq!(
2501            cols.len(),
2502            3,
2503            "should have 3 columns after flattening: got {}",
2504            cols.len()
2505        );
2506
2507        // Verify sub-columns contain the extracted values.
2508        let meta_x_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
2509        assert_eq!(meta_x_vals, vec![b"10".as_slice(), b"30", b"50"]);
2510
2511        let meta_y_vals: Vec<&[u8]> = cols[2].split(|&b| b == VAL_SEP).collect();
2512        assert_eq!(meta_y_vals, vec![b"20".as_slice(), b"40", b"60"]);
2513    }
2514
2515    #[test]
2516    fn test_nested_roundtrip() {
2517        // Flatten -> unflatten produces byte-exact original.
2518        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
2519{"id":2,"meta":{"x":30,"y":40}}
2520{"id":3,"meta":{"x":50,"y":60}}
2521"#;
2522        let result = preprocess(data).expect("should produce transform");
2523        let restored = reverse(&result.data, &result.metadata);
2524        assert_eq!(
2525            String::from_utf8_lossy(&restored),
2526            String::from_utf8_lossy(data),
2527        );
2528        assert_eq!(restored, data.to_vec());
2529    }
2530
2531    #[test]
2532    fn test_nested_mixed_schemas() {
2533        // Different nested objects per row (some keys missing -> null).
2534        let data = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
2535{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
2536{"ts":"c","meta":{"query":"pricing","results_count":25}}
2537{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
2538{"ts":"e","meta":{"query":"api docs","results_count":41}}
2539"#;
2540        let result = preprocess(data).expect("should produce transform");
2541        let restored = reverse(&result.data, &result.metadata);
2542        assert_eq!(
2543            String::from_utf8_lossy(&restored),
2544            String::from_utf8_lossy(data),
2545        );
2546        assert_eq!(restored, data.to_vec());
2547    }
2548
2549    #[test]
2550    fn test_nested_no_nested_objects() {
2551        // Returns None when no nested objects — flat data should still work.
2552        let data = br#"{"a":1,"b":"x"}
2553{"a":2,"b":"y"}
2554{"a":3,"b":"z"}
2555"#;
2556        let result = preprocess(data).expect("should produce transform");
2557        let restored = reverse(&result.data, &result.metadata);
2558        assert_eq!(restored, data.to_vec());
2559
2560        // Verify the metadata has has_nested=0 since no nested objects.
2561        // The nested flag is appended after template parts.
2562        // For uniform, metadata starts with version(1) + num_rows(4) + num_cols(2) +
2563        // trailing_newline(1) + num_parts(2) + parts.
2564        // After those parts, there should be a 0 byte (has_nested=0).
2565        let meta = &result.metadata;
2566        let last_byte = meta[meta.len() - 1];
2567        assert_eq!(last_byte, 0, "should have has_nested=0 for flat data");
2568    }
2569
2570    #[test]
2571    fn test_nested_real_corpus() {
2572        // Test with data shaped like the test-ndjson.ndjson corpus.
2573        let data = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
2574{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
2575{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
2576{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
2577{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
2578"#;
2579        let result = preprocess(data).expect("should produce transform");
2580        let restored = reverse(&result.data, &result.metadata);
2581        assert_eq!(
2582            String::from_utf8_lossy(&restored),
2583            String::from_utf8_lossy(data),
2584        );
2585        assert_eq!(restored, data.to_vec());
2586    }
2587
2588    #[test]
2589    fn test_nested_roundtrip_with_null_values() {
2590        // Some rows have null for the nested field.
2591        let data = br#"{"id":1,"meta":{"x":10}}
2592{"id":2,"meta":null}
2593{"id":3,"meta":{"x":30}}
2594{"id":4,"meta":null}
2595{"id":5,"meta":{"x":50}}
2596"#;
2597        let result = preprocess(data).expect("should produce transform");
2598        let restored = reverse(&result.data, &result.metadata);
2599        assert_eq!(restored, data.to_vec());
2600    }
2601
2602    #[test]
2603    fn test_nested_string_values_preserved_exact() {
2604        // Verify that string values in nested objects preserve exact bytes (with quotes).
2605        let data = br#"{"id":1,"meta":{"name":"Alice","score":100}}
2606{"id":2,"meta":{"name":"Bob","score":200}}
2607{"id":3,"meta":{"name":"Charlie","score":300}}
2608"#;
2609        let result = preprocess(data).expect("should produce transform");
2610        let restored = reverse(&result.data, &result.metadata);
2611        assert_eq!(restored, data.to_vec());
2612    }
2613
2614    #[test]
2615    fn test_parse_nested_object_kv() {
2616        let obj = br#"{"query":"benchmark","results_count":14}"#;
2617        let pairs = parse_nested_object_kv(obj).unwrap();
2618        assert_eq!(pairs.len(), 2);
2619        assert_eq!(pairs[0].0, b"query");
2620        assert_eq!(pairs[0].1, br#""benchmark""#.to_vec());
2621        assert_eq!(pairs[1].0, b"results_count");
2622        assert_eq!(pairs[1].1, b"14");
2623    }
2624
2625    #[test]
2626    fn test_nested_varying_subkeys_roundtrip() {
2627        // Regression: rows with varying sub-keys in nested objects must
2628        // round-trip byte-exact. Even rows have `extra`, odd rows don't.
2629        let mut lines = Vec::new();
2630        for i in 0..50 {
2631            let line = if i % 2 == 0 {
2632                format!("{{\"id\":{},\"meta\":{{\"x\":{},\"extra\":{}}}}}", i, i, i)
2633            } else {
2634                format!("{{\"id\":{},\"meta\":{{\"x\":{}}}}}", i, i)
2635            };
2636            lines.push(line);
2637        }
2638        let ndjson = lines.join("\n") + "\n";
2639        let data = ndjson.as_bytes();
2640
2641        let result = preprocess(data).expect("should produce transform");
2642        let restored = reverse(&result.data, &result.metadata);
2643        assert_eq!(
2644            std::str::from_utf8(&restored).unwrap(),
2645            std::str::from_utf8(data).unwrap(),
2646            "varying sub-keys roundtrip must be byte-exact"
2647        );
2648    }
2649
2650    #[test]
2651    fn test_nested_explicit_null_preserved() {
2652        // Explicit null values in nested objects must survive roundtrip.
2653        // `{"x":1,"y":null}` must NOT be collapsed to `{"x":1}`.
2654        let data = b"{\"id\":1,\"meta\":{\"x\":1,\"y\":null}}\n\
2655                     {\"id\":2,\"meta\":{\"x\":2,\"y\":null}}\n\
2656                     {\"id\":3,\"meta\":{\"x\":3,\"y\":null}}\n";
2657        let result = preprocess(data).expect("should produce transform");
2658        let restored = reverse(&result.data, &result.metadata);
2659        assert_eq!(
2660            std::str::from_utf8(&restored).unwrap(),
2661            std::str::from_utf8(data).unwrap(),
2662            "explicit null values must be preserved"
2663        );
2664    }
2665
2666    #[test]
2667    fn null_heavy_30_rows_roundtrip() {
2668        // Regression test: 30 rows with all-null column caused CRC mismatch.
2669        let mut data = Vec::new();
2670        for i in 0..30 {
2671            data.extend_from_slice(format!("{{\"id\":{},\"val\":null}}\n", i).as_bytes());
2672        }
2673        let result = preprocess(&data);
2674        if let Some(result) = result {
2675            let restored = reverse(&result.data, &result.metadata);
2676            assert_eq!(
2677                restored,
2678                data,
2679                "null-heavy 30-row roundtrip failed.\nOriginal len={}, Restored len={}\nOrig first 200: {:?}\nRest first 200: {:?}",
2680                data.len(),
2681                restored.len(),
2682                String::from_utf8_lossy(&data[..data.len().min(200)]),
2683                String::from_utf8_lossy(&restored[..restored.len().min(200)])
2684            );
2685        }
2686    }
2687
2688    #[test]
2689    fn null_heavy_60_rows_roundtrip() {
2690        // Regression test: 60 rows with multiple all-null columns.
2691        let mut data = Vec::new();
2692        for i in 0..60 {
2693            let name = if i % 10 == 0 {
2694                format!("\"user_{}\"", i)
2695            } else {
2696                "null".to_string()
2697            };
2698            data.extend_from_slice(
2699                format!("{{\"id\":{},\"name\":{},\"email\":null,\"score\":null,\"active\":null,\"tags\":null}}\n", i, name).as_bytes(),
2700            );
2701        }
2702        let result = preprocess(&data);
2703        if let Some(result) = result {
2704            let restored = reverse(&result.data, &result.metadata);
2705            assert_eq!(restored, data, "null-heavy 60-row ndjson roundtrip failed");
2706        }
2707    }
2708
2709    #[test]
2710    fn selective_columnar_roundtrip() {
2711        // Data with one low-cardinality column (type) and one high-cardinality + long column (payload).
2712        let mut data = Vec::new();
2713        for i in 0..50 {
2714            let event_type = match i % 3 {
2715                0 => "push",
2716                1 => "pull_request",
2717                _ => "create",
2718            };
2719            // Payload is unique per row and > 128 bytes avg to trigger selective.
2720            let payload = format!(
2721                "{{\"commits\":[{{\"sha\":\"abc{:04}def\",\"message\":\"commit message number {} with extra text to make it long enough for selective columnar threshold of 128 bytes average value length\"}}]}}",
2722                i, i
2723            );
2724            data.extend_from_slice(
2725                format!(
2726                    "{{\"id\":{},\"type\":\"{}\",\"payload\":{}}}\n",
2727                    i, event_type, payload
2728                )
2729                .as_bytes(),
2730            );
2731        }
2732        let result = preprocess(&data).expect("should preprocess");
2733        let restored = reverse(&result.data, &result.metadata);
2734        assert_eq!(restored, data, "selective columnar roundtrip failed");
2735    }
2736
2737    #[test]
2738    fn selective_columnar_uses_version3() {
2739        // Verify that high-cardinality + long columns trigger version=3.
2740        // Payload must exceed SELECTIVE_MIN_AVG_LEN (128) AND cardinality > 0.7.
2741        let mut data = Vec::new();
2742        for i in 0..50 {
2743            let payload = format!(
2744                "{{\"data\":\"unique_payload_{:04}\",\"extra\":\"padding_text_to_make_this_value_long_enough_to_exceed_the_128_byte_threshold_for_selective_columnar_detection_{:04}\"}}",
2745                i,
2746                i * 7
2747            );
2748            data.extend_from_slice(
2749                format!("{{\"type\":\"event\",\"payload\":{}}}\n", payload).as_bytes(),
2750            );
2751        }
2752        let result = preprocess(&data).expect("should preprocess");
2753        // Version byte should be 3 (selective).
2754        assert_eq!(
2755            result.metadata[0], METADATA_VERSION_SELECTIVE,
2756            "expected selective columnar (version=3), got version={}",
2757            result.metadata[0]
2758        );
2759        let restored = reverse(&result.data, &result.metadata);
2760        assert_eq!(restored, data, "selective columnar v3 roundtrip failed");
2761    }
2762
2763    #[test]
2764    fn selective_grouped_roundtrip() {
2765        // Two schema groups, each with a high-cardinality column (>128 bytes avg).
2766        let mut data = Vec::new();
2767        for i in 0..30 {
2768            let payload = format!(
2769                "{{\"sha\":\"hash_{:04}\",\"msg\":\"unique commit message number {} that is quite long and needs to exceed the 128 byte threshold for selective columnar to activate on this column\"}}",
2770                i, i
2771            );
2772            data.extend_from_slice(
2773                format!(
2774                    "{{\"id\":{},\"type\":\"push\",\"payload\":{}}}\n",
2775                    i, payload
2776                )
2777                .as_bytes(),
2778            );
2779        }
2780        for i in 0..20 {
2781            let body = format!(
2782                "{{\"title\":\"PR title {}\",\"body\":\"This is a long unique pull request body number {} with details and extra text to exceed the 128 byte threshold for the selective columnar transform\"}}",
2783                i, i
2784            );
2785            data.extend_from_slice(
2786                format!(
2787                    "{{\"id\":{},\"type\":\"pr\",\"payload\":{},\"org\":\"myorg\"}}\n",
2788                    100 + i,
2789                    body
2790                )
2791                .as_bytes(),
2792            );
2793        }
2794        let result = preprocess(&data).expect("should preprocess");
2795        // Should use grouped (version=2 at top level).
2796        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
2797        let restored = reverse(&result.data, &result.metadata);
2798        assert_eq!(restored, data, "selective grouped roundtrip failed");
2799    }
2800
2801    #[test]
2802    fn selective_all_low_cardinality_stays_uniform() {
2803        // When all columns are low-cardinality, version=1 should still be used.
2804        let mut data = Vec::new();
2805        for i in 0..50 {
2806            let status = match i % 2 {
2807                0 => "active",
2808                _ => "inactive",
2809            };
2810            data.extend_from_slice(
2811                format!("{{\"type\":\"event\",\"status\":\"{}\"}}\n", status).as_bytes(),
2812            );
2813        }
2814        let result = preprocess(&data).expect("should preprocess");
2815        // Version should be 1 (uniform), not 3 (selective).
2816        assert_eq!(
2817            result.metadata[0], METADATA_VERSION_UNIFORM,
2818            "low-cardinality data should use uniform (version=1)"
2819        );
2820    }
2821
2822    #[test]
2823    fn selective_columnar_single_inline_column() {
2824        // Only one column is inline (the rest extracted).
2825        let mut data = Vec::new();
2826        for i in 0..30 {
2827            let unique_msg = format!(
2828                "A unique message for row {} with enough text to exceed the 128 byte threshold for selective columnar detection, adding padding here to be safe: extra_{:04}",
2829                i,
2830                i * 13
2831            );
2832            data.extend_from_slice(
2833                format!(
2834                    "{{\"type\":\"log\",\"level\":\"info\",\"msg\":\"{}\"}}\n",
2835                    unique_msg
2836                )
2837                .as_bytes(),
2838            );
2839        }
2840        let result = preprocess(&data).expect("should preprocess");
2841        let restored = reverse(&result.data, &result.metadata);
2842        assert_eq!(restored, data, "single-inline-column roundtrip failed");
2843    }
2844}