// datacortex_core/format/ndjson.rs
1//! NDJSON columnar reorg — lossless transform that reorders row-oriented
2//! NDJSON data into column-oriented layout.
3//!
4//! Two strategies:
5//!
6//! **Strategy 1 (uniform):** All rows share the same schema (keys in same order).
7//!   Row-oriented (before):
8//!     {"ts":"2026-03-15T10:30:00.001Z","type":"page_view","user":"usr_a1b2c3d4"}
9//!     {"ts":"2026-03-15T10:30:00.234Z","type":"api_call","user":"usr_a1b2c3d4"}
10//!   Column-oriented (after):
11//!     [ts values] "2026-03-15T10:30:00.001Z" \x01 "2026-03-15T10:30:00.234Z" \x00
12//!     [type values] "page_view" \x01 "api_call" \x00
13//!     [user values] "usr_a1b2c3d4" \x01 "usr_a1b2c3d4"
14//!
15//! **Strategy 2 (grouped):** Rows have diverse schemas (e.g., GitHub Archive events).
16//!   Groups rows by schema, applies Strategy 1 per group, stores residual rows raw.
17//!   Metadata version byte distinguishes: 1 = uniform, 2 = grouped.
18//!
19//! Separators:
20//!   \x00 = column separator (cannot appear in valid JSON text)
21//!   \x01 = value separator within a column (cannot appear in valid JSON)
22
23use super::transform::TransformResult;
24use std::collections::HashMap;
25
/// Separates one column's value stream from the next in the columnar payload.
/// 0x00 cannot appear anywhere in valid JSON text (see module docs).
const COL_SEP: u8 = 0x00;
/// Separates consecutive values within a single column.
/// 0x01 likewise cannot appear in valid JSON text.
const VAL_SEP: u8 = 0x01;
/// Metadata version byte for Strategy 1 (all rows share one schema).
const METADATA_VERSION_UNIFORM: u8 = 1;
/// Metadata version byte for Strategy 2 (rows grouped by schema, plus residuals).
const METADATA_VERSION_GROUPED: u8 = 2;

/// Minimum rows in a schema group for it to be columnarized (not residual).
const MIN_GROUP_ROWS: usize = 5;

/// A schema group: template parts + list of (row_index, parsed_values).
type SchemaGroup = (Vec<Vec<u8>>, Vec<(usize, Vec<Vec<u8>>)>);
36
/// Skip the body of a JSON string, starting just AFTER the opening quote.
/// Returns the index of the closing (unescaped) quote, or None if the string
/// never terminates within `line`.
fn skip_string_body(line: &[u8], mut pos: usize) -> Option<usize> {
    let mut escaped = false;
    while pos < line.len() {
        if escaped {
            escaped = false;
        } else if line[pos] == b'\\' {
            escaped = true;
        } else if line[pos] == b'"' {
            return Some(pos);
        }
        pos += 1;
    }
    None
}

/// Scan a balanced `open`…`close` region (nested object or array) whose
/// opening delimiter sits at `start`. Strings are skipped so that delimiter
/// bytes inside them do not affect the depth count.
/// Returns (region_bytes, end_position), or None if unterminated.
fn scan_delimited(line: &[u8], start: usize, open: u8, close: u8) -> Option<(Vec<u8>, usize)> {
    let mut depth: usize = 1;
    let mut pos = start + 1;
    while pos < line.len() && depth > 0 {
        if line[pos] == b'"' {
            // Jump to the closing quote; the `pos += 1` below steps past it.
            pos = skip_string_body(line, pos + 1)?;
        } else if line[pos] == open {
            depth += 1;
        } else if line[pos] == close {
            depth -= 1;
        }
        pos += 1;
    }
    if depth == 0 {
        Some((line[start..pos].to_vec(), pos))
    } else {
        None // Unterminated object/array.
    }
}

/// Extract the raw value bytes from a JSON line at a given position.
/// `pos` should point to the first byte of the value (after `:`), possibly
/// preceded by whitespace.
/// Returns (value_bytes, end_position), or None on malformed input.
///
/// Handles: strings, numbers, booleans, null, nested objects, nested arrays.
/// Nested values are captured as opaque balanced blobs, not validated JSON.
fn extract_value(line: &[u8], mut pos: usize) -> Option<(Vec<u8>, usize)> {
    // Skip whitespace after the colon.
    while pos < line.len() && line[pos].is_ascii_whitespace() {
        pos += 1;
    }
    if pos >= line.len() {
        return None;
    }

    let start = pos;
    match line[pos] {
        b'"' => {
            // String value — ends at the closing unescaped quote.
            let close = skip_string_body(line, pos + 1)?;
            Some((line[start..=close].to_vec(), close + 1))
        }
        b'{' => scan_delimited(line, start, b'{', b'}'),
        b'[' => scan_delimited(line, start, b'[', b']'),
        _ => {
            // Number, boolean, null — scan until , or } or ] or whitespace.
            while pos < line.len() {
                match line[pos] {
                    b',' | b'}' | b']' => break,
                    _ if line[pos].is_ascii_whitespace() => break,
                    _ => pos += 1,
                }
            }
            if pos == start {
                // Immediately hit a terminator: no value bytes at all.
                None
            } else {
                Some((line[start..pos].to_vec(), pos))
            }
        }
    }
}
150
151/// Parse a single JSON line into (template_parts, values).
152///
153/// Template parts are the structural bytes between values:
154///   Part 0 = everything from { up to and including the : before value 0
155///   Part 1 = everything from after value 0 up to and including : before value 1
156///   ...
157///   Part N = everything from after the last value to end of line (including } and \n)
158///
159/// Returns None if the line is not a flat-ish JSON object (we handle nested values
160/// as opaque blobs, but the top-level structure must be key:value pairs).
161type ParsedLine = (Vec<Vec<u8>>, Vec<Vec<u8>>);
162
163fn parse_line(line: &[u8]) -> Option<ParsedLine> {
164    let mut pos = 0;
165
166    // Skip leading whitespace.
167    while pos < line.len() && line[pos].is_ascii_whitespace() {
168        pos += 1;
169    }
170    if pos >= line.len() || line[pos] != b'{' {
171        return None;
172    }
173
174    let mut parts: Vec<Vec<u8>> = Vec::new();
175    let mut values: Vec<Vec<u8>> = Vec::new();
176    let mut part_start = 0;
177
178    pos += 1; // Skip opening {.
179
180    loop {
181        // Skip whitespace.
182        while pos < line.len() && line[pos].is_ascii_whitespace() {
183            pos += 1;
184        }
185        if pos >= line.len() {
186            return None;
187        }
188
189        // Check for closing brace (end of object).
190        if line[pos] == b'}' {
191            // Capture the final part: everything from part_start to end of line.
192            parts.push(line[part_start..].to_vec());
193            break;
194        }
195
196        // Expect a key string.
197        if line[pos] != b'"' {
198            return None;
199        }
200        // Skip over the key string.
201        pos += 1;
202        let mut escaped = false;
203        while pos < line.len() {
204            if escaped {
205                escaped = false;
206            } else if line[pos] == b'\\' {
207                escaped = true;
208            } else if line[pos] == b'"' {
209                pos += 1;
210                break;
211            }
212            pos += 1;
213        }
214
215        // Skip whitespace, expect colon.
216        while pos < line.len() && line[pos].is_ascii_whitespace() {
217            pos += 1;
218        }
219        if pos >= line.len() || line[pos] != b':' {
220            return None;
221        }
222        pos += 1; // Skip colon.
223
224        // Everything from part_start up to here is a "template part".
225        parts.push(line[part_start..pos].to_vec());
226
227        // Extract the value.
228        let (value, value_end) = extract_value(line, pos)?;
229        values.push(value);
230        pos = value_end;
231
232        // Mark the start of the next part.
233        part_start = pos;
234
235        // Skip whitespace.
236        while pos < line.len() && line[pos].is_ascii_whitespace() {
237            pos += 1;
238        }
239        if pos >= line.len() {
240            return None;
241        }
242
243        // Expect comma or closing brace.
244        if line[pos] == b',' {
245            pos += 1;
246        } else if line[pos] == b'}' {
247            // Will be caught at the top of the loop next iteration — but we
248            // need to NOT advance pos, so the } check above catches it.
249            // Actually, let's just handle it here.
250            parts.push(line[part_start..].to_vec());
251            break;
252        } else {
253            return None; // Unexpected character.
254        }
255    }
256
257    if values.is_empty() {
258        return None;
259    }
260
261    Some((parts, values))
262}
263
/// Split NDJSON data into lines (newline bytes excluded).
///
/// A trailing '\n' does not produce a phantom empty line at the end, matching
/// a manual indexing scan; interior empty lines ("\n\n") are preserved.
fn split_lines(data: &[u8]) -> Vec<&[u8]> {
    let mut lines: Vec<&[u8]> = data.split(|&b| b == b'\n').collect();
    // `slice::split` always yields a final segment — empty when the data ends
    // with '\n' (or is empty entirely). Drop that phantom segment.
    if data.is_empty() || data.last() == Some(&b'\n') {
        lines.pop();
    }
    lines
}
279
280/// Build columnar data from parsed lines that share the same template.
281/// Returns (col_data, metadata) for a uniform group.
282fn build_uniform_columnar(
283    template_parts: &[Vec<u8>],
284    columns: &[Vec<Vec<u8>>],
285    num_rows: usize,
286    has_trailing_newline: bool,
287) -> (Vec<u8>, Vec<u8>) {
288    let num_cols = columns.len();
289
290    // Build column data: values separated by \x01, columns separated by \x00.
291    let mut col_data = Vec::new();
292    for (ci, col) in columns.iter().enumerate() {
293        for (ri, val) in col.iter().enumerate() {
294            col_data.extend_from_slice(val);
295            if ri < num_rows - 1 {
296                col_data.push(VAL_SEP);
297            }
298        }
299        if ci < num_cols - 1 {
300            col_data.push(COL_SEP);
301        }
302    }
303
304    // Build metadata: version + num_rows + num_cols + trailing_newline + template parts.
305    let mut metadata = Vec::new();
306    metadata.push(METADATA_VERSION_UNIFORM);
307    metadata.extend_from_slice(&(num_rows as u32).to_le_bytes());
308    metadata.extend_from_slice(&(num_cols as u16).to_le_bytes());
309    metadata.push(if has_trailing_newline { 1 } else { 0 });
310    metadata.extend_from_slice(&(template_parts.len() as u16).to_le_bytes());
311    for part in template_parts {
312        metadata.extend_from_slice(&(part.len() as u16).to_le_bytes());
313        metadata.extend_from_slice(part);
314    }
315
316    (col_data, metadata)
317}
318
319/// Strategy 1: Uniform schema — all rows must have the same template.
320/// Returns None if schemas differ.
321fn preprocess_uniform(
322    non_empty: &[&[u8]],
323    has_trailing_newline: bool,
324) -> Option<(Vec<u8>, Vec<u8>)> {
325    if non_empty.len() < 2 {
326        return None;
327    }
328
329    let (template_parts, first_values) = parse_line(non_empty[0])?;
330    let num_cols = first_values.len();
331    if template_parts.len() != num_cols + 1 {
332        return None;
333    }
334
335    let mut columns: Vec<Vec<Vec<u8>>> = Vec::with_capacity(num_cols);
336    for v in &first_values {
337        columns.push(vec![v.clone()]);
338    }
339
340    for &line in &non_empty[1..] {
341        let (parts, values) = parse_line(line)?;
342        if values.len() != num_cols || parts.len() != template_parts.len() {
343            return None;
344        }
345        for (a, b) in parts.iter().zip(template_parts.iter()) {
346            if a != b {
347                return None;
348            }
349        }
350        for (col, val) in values.iter().enumerate() {
351            columns[col].push(val.clone());
352        }
353    }
354
355    Some(build_uniform_columnar(
356        &template_parts,
357        &columns,
358        non_empty.len(),
359        has_trailing_newline,
360    ))
361}
362
363/// Strategy 2: Group-by-schema — group rows by template, columnarize each group.
364///
365/// Metadata format (version=2):
366///   version: u8 = 2
367///   has_trailing_newline: u8
368///   total_rows: u32 LE
369///   num_groups: u16 LE
370///   for each group:
371///     num_rows: u32 LE
372///     row_indices: [u32 LE * num_rows]
373///     group_metadata_len: u32 LE
374///     group_metadata: [bytes]  (Strategy 1 metadata for this group)
375///   residual_count: u32 LE
376///   residual_indices: [u32 LE * residual_count]
377///
378/// Data format:
379///   for each group:
380///     data_len: u32 LE
381///     data: [bytes]  (columnar data for this group)
382///   residual_data: [bytes]  (raw lines joined by \n)
383fn preprocess_grouped(
384    non_empty: &[&[u8]],
385    has_trailing_newline: bool,
386) -> Option<(Vec<u8>, Vec<u8>)> {
387    if non_empty.len() < MIN_GROUP_ROWS {
388        return None;
389    }
390
391    // Parse all lines and group by template (the template parts identify the schema).
392    // We use the template parts as the group key.
393    let mut parsed: Vec<Option<ParsedLine>> = Vec::with_capacity(non_empty.len());
394    for &line in non_empty {
395        parsed.push(parse_line(line));
396    }
397
398    // Group rows by template. Key = template parts (as bytes for hashing).
399    // We store groups as: template_key -> (template_parts, vec of (row_index, values)).
400    let mut group_map: HashMap<Vec<u8>, SchemaGroup> = HashMap::new();
401    let mut residual_indices: Vec<usize> = Vec::new();
402
403    for (idx, parsed_line) in parsed.into_iter().enumerate() {
404        if let Some((parts, values)) = parsed_line {
405            // Build a hashable key from the template parts.
406            let mut key = Vec::new();
407            for part in &parts {
408                key.extend_from_slice(&(part.len() as u32).to_le_bytes());
409                key.extend_from_slice(part);
410            }
411            group_map
412                .entry(key)
413                .or_insert_with(|| (parts, Vec::new()))
414                .1
415                .push((idx, values));
416        } else {
417            // Unparseable line goes to residual.
418            residual_indices.push(idx);
419        }
420    }
421
422    // Separate groups into qualifying (>= MIN_GROUP_ROWS) and residual.
423    let mut groups: Vec<SchemaGroup> = Vec::new();
424    for (_key, (template_parts, rows)) in group_map {
425        if rows.len() >= MIN_GROUP_ROWS {
426            groups.push((template_parts, rows));
427        } else {
428            // Too few rows — send to residual.
429            for (idx, _) in &rows {
430                residual_indices.push(*idx);
431            }
432        }
433    }
434
435    // Need at least 1 qualifying group for this to be useful.
436    if groups.is_empty() {
437        return None;
438    }
439
440    // Sort groups by their first row index for deterministic output.
441    groups.sort_by_key(|(_, rows)| rows[0].0);
442    residual_indices.sort_unstable();
443
444    // Build per-group columnar data and metadata.
445    struct GroupOutput {
446        row_indices: Vec<u32>,
447        col_data: Vec<u8>,
448        group_metadata: Vec<u8>,
449    }
450
451    let mut group_outputs: Vec<GroupOutput> = Vec::with_capacity(groups.len());
452
453    for (template_parts, rows) in &groups {
454        let num_cols = template_parts.len() - 1;
455        let mut columns: Vec<Vec<Vec<u8>>> = (0..num_cols).map(|_| Vec::new()).collect();
456        let mut row_indices: Vec<u32> = Vec::with_capacity(rows.len());
457
458        for (idx, values) in rows {
459            row_indices.push(*idx as u32);
460            for (col, val) in values.iter().enumerate() {
461                columns[col].push(val.clone());
462            }
463        }
464
465        // Build columnar data for this group (trailing_newline=false for sub-groups).
466        let (col_data, group_metadata) =
467            build_uniform_columnar(template_parts, &columns, rows.len(), false);
468
469        group_outputs.push(GroupOutput {
470            row_indices,
471            col_data,
472            group_metadata,
473        });
474    }
475
476    // Build the combined data blob.
477    let mut data_out = Vec::new();
478    for group in &group_outputs {
479        data_out.extend_from_slice(&(group.col_data.len() as u32).to_le_bytes());
480        data_out.extend_from_slice(&group.col_data);
481    }
482
483    // Append residual lines (raw, separated by \n).
484    let residual_start = data_out.len();
485    for (i, &idx) in residual_indices.iter().enumerate() {
486        data_out.extend_from_slice(non_empty[idx]);
487        if i < residual_indices.len() - 1 {
488            data_out.push(b'\n');
489        }
490    }
491    let _residual_len = data_out.len() - residual_start;
492
493    // Build the combined metadata.
494    let mut metadata = Vec::new();
495    metadata.push(METADATA_VERSION_GROUPED);
496    metadata.push(if has_trailing_newline { 1 } else { 0 });
497    metadata.extend_from_slice(&(non_empty.len() as u32).to_le_bytes());
498    metadata.extend_from_slice(&(group_outputs.len() as u16).to_le_bytes());
499
500    for group in &group_outputs {
501        metadata.extend_from_slice(&(group.row_indices.len() as u32).to_le_bytes());
502        for &idx in &group.row_indices {
503            metadata.extend_from_slice(&idx.to_le_bytes());
504        }
505        metadata.extend_from_slice(&(group.group_metadata.len() as u32).to_le_bytes());
506        metadata.extend_from_slice(&group.group_metadata);
507    }
508
509    metadata.extend_from_slice(&(residual_indices.len() as u32).to_le_bytes());
510    for &idx in &residual_indices {
511        metadata.extend_from_slice(&(idx as u32).to_le_bytes());
512    }
513
514    Some((data_out, metadata))
515}
516
/// Metadata describing which columns were flattened from nested objects.
pub(crate) struct NestedGroupInfo {
    /// Index of the original (pre-flattening) column that was expanded.
    pub(crate) original_col_index: u16,
    /// Sub-key names (unquoted key bytes, discovery order) for the expanded
    /// sub-columns.
    pub(crate) sub_keys: Vec<Vec<u8>>,
    /// Template parts for reconstructing nested objects (preserves original
    /// formatting, captured from the first non-null object in the column).
    /// If empty, compact format `{"key":val,...}` is used (NDJSON compatibility).
    pub(crate) nested_template: Vec<Vec<u8>>,
}
527
/// Attempt to flatten nested JSON objects in columnar data (depth-1).
///
/// `col_data` is columnar data (columns separated by \x00, values within a
/// column separated by \x01); `num_rows` is the value count every column must
/// have.
///
/// Returns the expanded columnar data (same \x00/\x01 framing) plus one
/// `NestedGroupInfo` per decomposed column. Returns None when no column
/// consists entirely of `{…}` objects (or `null`), when `num_rows` is 0, or
/// when any column's value count does not match `num_rows`.
pub(crate) fn flatten_nested_columns(
    col_data: &[u8],
    num_rows: usize,
) -> Option<(Vec<u8>, Vec<NestedGroupInfo>)> {
    // Split into columns.
    let columns: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
    if columns.is_empty() || num_rows == 0 {
        return None;
    }

    let mut nested_groups: Vec<NestedGroupInfo> = Vec::new();
    // Build the output columns: for non-nested cols, keep as-is.
    // For nested cols, replace with sub-columns at the same position.
    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();

    for (col_idx, &col_chunk) in columns.iter().enumerate() {
        let values: Vec<&[u8]> = col_chunk.split(|&b| b == VAL_SEP).collect();
        if values.len() != num_rows {
            return None;
        }

        // Check if ALL non-null values start with '{' (nested object).
        let mut all_objects = true;
        let mut has_non_null = false;
        for val in &values {
            if *val == b"null" {
                continue;
            }
            has_non_null = true;
            if !val.starts_with(b"{") {
                all_objects = false;
                break;
            }
        }

        if !all_objects || !has_non_null {
            // Not a nested-object column — keep as-is.
            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
            output_columns.push(col_values);
            continue;
        }

        // This column contains nested objects — decompose depth-1.
        // Parse all values and collect all unique sub-keys (preserving discovery order).
        // Also capture the template from the first non-null object to preserve formatting.
        let mut all_sub_keys: Vec<Vec<u8>> = Vec::new();
        let mut nested_template: Vec<Vec<u8>> = Vec::new();
        type KvPairs = Vec<(Vec<u8>, Vec<u8>)>;
        let mut parsed_rows: Vec<Option<KvPairs>> = Vec::with_capacity(num_rows);

        // `nested_template.is_empty()` doubles as the "no non-null row parsed
        // yet" flag: parse_nested_object_with_template never returns an empty
        // template on success (it always pushes at least a head and tail part).
        for val in &values {
            if *val == b"null" {
                parsed_rows.push(None);
                continue;
            }
            if nested_template.is_empty() {
                // First non-null: use template-preserving parser.
                match parse_nested_object_with_template(val) {
                    Some((template, kv_pairs)) => {
                        for (key, _) in &kv_pairs {
                            if !all_sub_keys.iter().any(|k| k == key) {
                                all_sub_keys.push(key.clone());
                            }
                        }
                        nested_template = template;
                        parsed_rows.push(Some(kv_pairs));
                    }
                    None => {
                        // Clearing the key list signals "keep column raw" below.
                        all_sub_keys.clear();
                        break;
                    }
                }
            } else {
                // Subsequent rows: use simpler kv parser (template already captured).
                match parse_nested_object_kv(val) {
                    Some(kv_pairs) => {
                        for (key, _) in &kv_pairs {
                            if !all_sub_keys.iter().any(|k| k == key) {
                                all_sub_keys.push(key.clone());
                            }
                        }
                        parsed_rows.push(Some(kv_pairs));
                    }
                    None => {
                        all_sub_keys.clear();
                        break;
                    }
                }
            }
        }

        if all_sub_keys.is_empty() {
            // Could not parse — keep column as-is.
            let col_values: Vec<Vec<u8>> = values.iter().map(|v| v.to_vec()).collect();
            output_columns.push(col_values);
            continue;
        }

        // Build sub-columns: for each sub-key, extract values from parsed rows.
        // Rows missing a key (or entirely null) contribute a literal `null`.
        let num_sub_keys = all_sub_keys.len();
        let mut sub_columns: Vec<Vec<Vec<u8>>> = vec![Vec::with_capacity(num_rows); num_sub_keys];

        for parsed in &parsed_rows {
            match parsed {
                Some(kv_pairs) => {
                    for (sk_idx, sk) in all_sub_keys.iter().enumerate() {
                        let found = kv_pairs.iter().find(|(k, _)| k == sk);
                        match found {
                            Some((_, v)) => sub_columns[sk_idx].push(v.clone()),
                            None => sub_columns[sk_idx].push(b"null".to_vec()),
                        }
                    }
                }
                None => {
                    // null row — all sub-columns get null.
                    for sc in sub_columns.iter_mut() {
                        sc.push(b"null".to_vec());
                    }
                }
            }
        }

        nested_groups.push(NestedGroupInfo {
            original_col_index: col_idx as u16,
            sub_keys: all_sub_keys,
            nested_template,
        });

        for sc in sub_columns {
            output_columns.push(sc);
        }
    }

    if nested_groups.is_empty() {
        return None;
    }

    // Build the flattened columnar data (same \x00/\x01 framing as the input).
    let num_out_cols = output_columns.len();
    let mut out = Vec::new();
    for (ci, col) in output_columns.iter().enumerate() {
        for (ri, val) in col.iter().enumerate() {
            out.extend_from_slice(val);
            if ri < num_rows - 1 {
                out.push(VAL_SEP);
            }
        }
        if ci < num_out_cols - 1 {
            out.push(COL_SEP);
        }
    }

    Some((out, nested_groups))
}
687
/// Parse a nested JSON object into (template_parts, kv_pairs).
///
/// Template parts include all structural bytes (braces, keys, colons, whitespace) —
/// preserving the original formatting so the object can be reconstructed exactly:
/// part[0] + value[0] + part[1] + value[1] + ... + part[last] == obj.
///
/// Unlike the top-level line parser, whitespace BETWEEN a colon and its value
/// is consumed into the template part, so every value starts at its first
/// significant byte.
///
/// Keys are returned WITHOUT quotes (escape sequences left as-is). Values are
/// the exact bytes from the JSON. Returns None for anything that is not an
/// object with at least one key/value pair.
#[allow(clippy::type_complexity)]
pub(crate) fn parse_nested_object_with_template(
    obj: &[u8],
) -> Option<(Vec<Vec<u8>>, Vec<(Vec<u8>, Vec<u8>)>)> {
    let mut pos = 0;

    // Skip whitespace; the first significant byte must be '{'.
    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
        pos += 1;
    }
    if pos >= obj.len() || obj[pos] != b'{' {
        return None;
    }
    pos += 1;

    let mut parts: Vec<Vec<u8>> = Vec::new();
    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
    // Part 0 begins at offset 0 so leading whitespace and '{' are preserved.
    let mut part_start = 0;

    loop {
        // Skip whitespace.
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() {
            return None;
        }
        if obj[pos] == b'}' {
            // Final template part: everything after the last value, including
            // the closing brace and any trailing bytes.
            parts.push(obj[part_start..].to_vec());
            break;
        }

        // Expect a key string.
        if obj[pos] != b'"' {
            return None;
        }
        let key_str_start = pos + 1;
        pos += 1;
        // Escape-aware scan to the key's closing quote.
        let mut escaped = false;
        while pos < obj.len() {
            if escaped {
                escaped = false;
            } else if obj[pos] == b'\\' {
                escaped = true;
            } else if obj[pos] == b'"' {
                break;
            }
            pos += 1;
        }
        if pos >= obj.len() {
            return None; // Unterminated key string.
        }
        let key = obj[key_str_start..pos].to_vec();
        pos += 1; // skip closing quote

        // Skip whitespace, expect colon.
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() || obj[pos] != b':' {
            return None;
        }
        pos += 1;

        // Skip whitespace between colon and value — include in template.
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }

        // Template part: everything from part_start to here (includes key, colon, post-colon ws).
        parts.push(obj[part_start..pos].to_vec());

        // Extract the value (no whitespace skipping — already consumed above).
        let value_start = pos;
        // Use extract_value but we've already consumed whitespace.
        let (value, value_end) = extract_value(obj, value_start)?;
        pos = value_end;
        pairs.push((key, value));

        // Next template part starts right after the value, so the comma and
        // surrounding whitespace are captured there.
        part_start = pos;

        // Skip whitespace.
        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
            pos += 1;
        }
        if pos >= obj.len() {
            return None;
        }
        if obj[pos] == b',' {
            pos += 1;
        } else if obj[pos] == b'}' {
            // '}' directly after a value: close out the final template part.
            parts.push(obj[part_start..].to_vec());
            break;
        } else {
            return None;
        }
    }

    // An object with no pairs has nothing to decompose.
    if pairs.is_empty() {
        return None;
    }
    Some((parts, pairs))
}
795
796/// Parse a nested JSON object into its key-value pairs (depth-1 only).
797/// Returns the exact bytes for each key and value.
798/// Keys are returned WITHOUT quotes. Values are the exact bytes from the JSON.
799pub(crate) fn parse_nested_object_kv(obj: &[u8]) -> Option<Vec<(Vec<u8>, Vec<u8>)>> {
800    let mut pos = 0;
801
802    // Skip whitespace.
803    while pos < obj.len() && obj[pos].is_ascii_whitespace() {
804        pos += 1;
805    }
806    if pos >= obj.len() || obj[pos] != b'{' {
807        return None;
808    }
809    pos += 1;
810
811    let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
812
813    loop {
814        // Skip whitespace.
815        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
816            pos += 1;
817        }
818        if pos >= obj.len() {
819            return None;
820        }
821        if obj[pos] == b'}' {
822            break;
823        }
824
825        // Expect a key string.
826        if obj[pos] != b'"' {
827            return None;
828        }
829        pos += 1;
830        let key_start = pos;
831        let mut escaped = false;
832        while pos < obj.len() {
833            if escaped {
834                escaped = false;
835            } else if obj[pos] == b'\\' {
836                escaped = true;
837            } else if obj[pos] == b'"' {
838                break;
839            }
840            pos += 1;
841        }
842        if pos >= obj.len() {
843            return None;
844        }
845        let key = obj[key_start..pos].to_vec();
846        pos += 1; // skip closing quote
847
848        // Skip whitespace, expect colon.
849        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
850            pos += 1;
851        }
852        if pos >= obj.len() || obj[pos] != b':' {
853            return None;
854        }
855        pos += 1;
856
857        // Extract the value.
858        let (value, value_end) = extract_value(obj, pos)?;
859        pos = value_end;
860        pairs.push((key, value));
861
862        // Skip whitespace.
863        while pos < obj.len() && obj[pos].is_ascii_whitespace() {
864            pos += 1;
865        }
866        if pos >= obj.len() {
867            return None;
868        }
869        if obj[pos] == b',' {
870            pos += 1;
871        } else if obj[pos] == b'}' {
872            break;
873        } else {
874            return None;
875        }
876    }
877
878    if pairs.is_empty() {
879        return None;
880    }
881    Some(pairs)
882}
883
/// Unflatten nested sub-columns back into JSON objects.
///
/// Inverse of `flatten_nested_columns`: takes the flattened columnar blob
/// (`flat_data`, COL_SEP-separated columns of VAL_SEP-separated values) and
/// the recorded `nested_groups`, and merges each group's sub-columns back
/// into a single column of nested-object values.
///
/// Returns `flat_data` unchanged whenever the blob's shape does not match
/// `total_flat_cols` / `num_rows` — callers treat that as a no-op.
pub(crate) fn unflatten_nested_columns(
    flat_data: &[u8],
    nested_groups: &[NestedGroupInfo],
    num_rows: usize,
    total_flat_cols: usize,
) -> Vec<u8> {
    // Split the blob into columns; bail out unchanged on shape mismatch.
    let flat_columns: Vec<&[u8]> = flat_data.split(|&b| b == COL_SEP).collect();
    if flat_columns.len() != total_flat_cols {
        return flat_data.to_vec();
    }

    // Parse all flat column values.
    // Every column must contain exactly `num_rows` VAL_SEP-separated values.
    let mut flat_col_values: Vec<Vec<&[u8]>> = Vec::with_capacity(total_flat_cols);
    for chunk in &flat_columns {
        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
        if vals.len() != num_rows {
            return flat_data.to_vec();
        }
        flat_col_values.push(vals);
    }

    // Reconstruct original columns from flat columns.
    // Walk through flat columns, merging sub-columns back where needed.
    let mut output_columns: Vec<Vec<Vec<u8>>> = Vec::new();

    // Build a set of original_col_index -> group for quick lookup.
    // We need to know which flat columns map to which nested group.
    // The flat columns are in order: non-nested cols keep their position,
    // nested cols are replaced by their sub-columns at that position.
    //
    // To figure out which flat_idx corresponds to what, we replay the
    // forward mapping.
    // We need to know the ORIGINAL number of columns.
    // Each nested group contributed `sub_keys.len()` flat columns in place of
    // one original column, so: original = flat - sum(sub_keys) + num_groups.
    let original_num_cols = total_flat_cols
        - nested_groups
            .iter()
            .map(|g| g.sub_keys.len())
            .sum::<usize>()
        + nested_groups.len();

    // Build mapping: for each original col, is it nested or not?
    // original_col_map[i] = Some(group index) when original column i was a
    // flattened nested object, None when it passed through untouched.
    let mut original_col_map: Vec<Option<usize>> = vec![None; original_num_cols];
    for (gi, group) in nested_groups.iter().enumerate() {
        if (group.original_col_index as usize) < original_num_cols {
            original_col_map[group.original_col_index as usize] = Some(gi);
        }
    }

    let mut flat_idx = 0;
    for entry in original_col_map.iter().take(original_num_cols) {
        if let Some(gi) = entry {
            let group = &nested_groups[*gi];
            let num_sub = group.sub_keys.len();

            // Merge sub-columns back into nested objects.
            let mut merged_col: Vec<Vec<u8>> = Vec::with_capacity(num_rows);
            for row in 0..num_rows {
                // Check if all sub-columns are null for this row.
                // NOTE(review): if a group had zero sub-keys this is vacuously
                // true and the whole column would decode as "null" — confirm
                // the flatten side never emits empty groups.
                let all_null = (0..num_sub).all(|si| {
                    flat_idx + si < flat_col_values.len()
                        && flat_col_values[flat_idx + si][row] == b"null"
                });
                if all_null {
                    // The entire nested object was null in this row.
                    merged_col.push(b"null".to_vec());
                } else if !group.nested_template.is_empty()
                    && group.nested_template.len() == num_sub + 1
                {
                    // Template-based reconstruction: preserves original formatting.
                    // Interleaves: template[0] v0 template[1] v1 … template[num_sub].
                    let mut obj = Vec::new();
                    obj.extend_from_slice(&group.nested_template[0]);
                    if flat_idx < flat_col_values.len() {
                        obj.extend_from_slice(flat_col_values[flat_idx][row]);
                    }
                    for si in 1..num_sub {
                        obj.extend_from_slice(&group.nested_template[si]);
                        if flat_idx + si < flat_col_values.len() {
                            obj.extend_from_slice(flat_col_values[flat_idx + si][row]);
                        }
                    }
                    obj.extend_from_slice(&group.nested_template[num_sub]);
                    merged_col.push(obj);
                } else {
                    // Compact reconstruction (NDJSON compatibility / no template).
                    // Emits {"k1":v1,"k2":v2,…} with null sub-keys omitted.
                    let mut obj = Vec::new();
                    obj.push(b'{');
                    let mut first = true;
                    for si in 0..num_sub {
                        if flat_idx + si >= flat_col_values.len() {
                            break;
                        }
                        let val = flat_col_values[flat_idx + si][row];
                        if val == b"null" {
                            continue; // skip null sub-keys in reconstruction
                        }
                        if !first {
                            obj.push(b',');
                        }
                        first = false;
                        obj.push(b'"');
                        obj.extend_from_slice(&group.sub_keys[si]);
                        obj.push(b'"');
                        obj.push(b':');
                        obj.extend_from_slice(val);
                    }
                    obj.push(b'}');
                    merged_col.push(obj);
                }
            }
            output_columns.push(merged_col);
            // This group consumed `num_sub` flat columns.
            flat_idx += num_sub;
        } else {
            // Non-nested column — copy as-is.
            if flat_idx < flat_col_values.len() {
                let col: Vec<Vec<u8>> = flat_col_values[flat_idx]
                    .iter()
                    .map(|v| v.to_vec())
                    .collect();
                output_columns.push(col);
            }
            flat_idx += 1;
        }
    }

    // Rebuild columnar data.
    // Values within a column joined by VAL_SEP; columns joined by COL_SEP.
    let num_out_cols = output_columns.len();
    let mut out = Vec::new();
    for (ci, col) in output_columns.iter().enumerate() {
        for (ri, val) in col.iter().enumerate() {
            out.extend_from_slice(val);
            if ri < num_rows - 1 {
                out.push(VAL_SEP);
            }
        }
        if ci < num_out_cols - 1 {
            out.push(COL_SEP);
        }
    }

    out
}
1028
1029/// Serialize nested group info into bytes for storage in metadata.
1030/// Version 1 (has_nested=1): sub_keys only (backward compat, NDJSON path).
1031/// Version 2 (has_nested=2): sub_keys + nested_template (preserves formatting).
1032pub(crate) fn serialize_nested_info(groups: &[NestedGroupInfo]) -> Vec<u8> {
1033    let has_template = groups.iter().any(|g| !g.nested_template.is_empty());
1034    let mut out = Vec::new();
1035    out.push(if has_template { 2u8 } else { 1u8 });
1036    out.push(groups.len() as u8);
1037    for group in groups {
1038        out.extend_from_slice(&group.original_col_index.to_le_bytes());
1039        out.extend_from_slice(&(group.sub_keys.len() as u16).to_le_bytes());
1040        for key in &group.sub_keys {
1041            out.extend_from_slice(&(key.len() as u16).to_le_bytes());
1042            out.extend_from_slice(key);
1043        }
1044        if has_template {
1045            out.extend_from_slice(&(group.nested_template.len() as u16).to_le_bytes());
1046            for part in &group.nested_template {
1047                out.extend_from_slice(&(part.len() as u16).to_le_bytes());
1048                out.extend_from_slice(part);
1049            }
1050        }
1051    }
1052    out
1053}
1054
1055/// Deserialize nested group info from metadata bytes.
1056/// Returns (nested_groups, bytes_consumed).
1057/// Handles version 1 (no template) and version 2 (with template).
1058pub(crate) fn deserialize_nested_info(data: &[u8]) -> Option<(Vec<NestedGroupInfo>, usize)> {
1059    if data.is_empty() {
1060        return None;
1061    }
1062    let mut pos = 0;
1063    let version = data[pos];
1064    pos += 1;
1065    if version != 1 && version != 2 {
1066        return None;
1067    }
1068    let has_template = version == 2;
1069    if pos >= data.len() {
1070        return None;
1071    }
1072    let num_groups = data[pos] as usize;
1073    pos += 1;
1074
1075    let mut groups = Vec::with_capacity(num_groups);
1076    for _ in 0..num_groups {
1077        if pos + 4 > data.len() {
1078            return None;
1079        }
1080        let original_col_index = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
1081        pos += 2;
1082        let num_sub_cols = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1083        pos += 2;
1084
1085        let mut sub_keys = Vec::with_capacity(num_sub_cols);
1086        for _ in 0..num_sub_cols {
1087            if pos + 2 > data.len() {
1088                return None;
1089            }
1090            let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1091            pos += 2;
1092            if pos + key_len > data.len() {
1093                return None;
1094            }
1095            sub_keys.push(data[pos..pos + key_len].to_vec());
1096            pos += key_len;
1097        }
1098
1099        let nested_template = if has_template {
1100            if pos + 2 > data.len() {
1101                return None;
1102            }
1103            let num_parts = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1104            pos += 2;
1105            let mut parts = Vec::with_capacity(num_parts);
1106            for _ in 0..num_parts {
1107                if pos + 2 > data.len() {
1108                    return None;
1109                }
1110                let part_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1111                pos += 2;
1112                if pos + part_len > data.len() {
1113                    return None;
1114                }
1115                parts.push(data[pos..pos + part_len].to_vec());
1116                pos += part_len;
1117            }
1118            parts
1119        } else {
1120            Vec::new()
1121        };
1122
1123        groups.push(NestedGroupInfo {
1124            original_col_index,
1125            sub_keys,
1126            nested_template,
1127        });
1128    }
1129
1130    Some((groups, pos))
1131}
1132
1133/// Forward transform: NDJSON columnar reorg.
1134///
1135/// Tries Strategy 1 (uniform) first, then Strategy 2 (grouped) if schemas differ.
1136/// Returns None if data is not suitable for columnar transform.
1137pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
1138    if data.is_empty() {
1139        return None;
1140    }
1141
1142    let has_trailing_newline = data.last() == Some(&b'\n');
1143    let lines = split_lines(data);
1144    let non_empty: Vec<&[u8]> = lines.into_iter().filter(|l| !l.is_empty()).collect();
1145
1146    if non_empty.len() < 2 {
1147        return None;
1148    }
1149
1150    // Strategy 1: try uniform schema first.
1151    if let Some((col_data, mut metadata)) = preprocess_uniform(&non_empty, has_trailing_newline) {
1152        if col_data.len() + metadata.len() < data.len() {
1153            // Try depth-1 nested decomposition on the columnar output.
1154            // Even if the flattened data is slightly larger raw, the downstream
1155            // typed encoding + compression benefits are significant: null bitmaps
1156            // are compact and type-homogeneous columns compress much better.
1157            let num_rows = non_empty.len();
1158            if let Some((flat_data, nested_groups)) = flatten_nested_columns(&col_data, num_rows) {
1159                // Append nested info to metadata.
1160                let nested_bytes = serialize_nested_info(&nested_groups);
1161                metadata.extend_from_slice(&nested_bytes);
1162                return Some(TransformResult {
1163                    data: flat_data,
1164                    metadata,
1165                });
1166            }
1167            // No nested objects found — append has_nested=0.
1168            metadata.push(0u8); // has_nested = 0
1169            return Some(TransformResult {
1170                data: col_data,
1171                metadata,
1172            });
1173        }
1174    }
1175
1176    // Strategy 2: group by schema.
1177    if let Some((grouped_data, grouped_metadata)) =
1178        preprocess_grouped(&non_empty, has_trailing_newline)
1179    {
1180        if grouped_data.len() + grouped_metadata.len() < data.len() {
1181            return Some(TransformResult {
1182                data: grouped_data,
1183                metadata: grouped_metadata,
1184            });
1185        }
1186    }
1187
1188    None
1189}
1190
1191/// Reverse transform: reconstruct NDJSON from columnar layout + metadata.
1192/// Dispatches to the appropriate decoder based on metadata version byte.
1193pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1194    if metadata.is_empty() {
1195        return data.to_vec();
1196    }
1197    match metadata[0] {
1198        METADATA_VERSION_UNIFORM => reverse_uniform(data, metadata),
1199        METADATA_VERSION_GROUPED => reverse_grouped(data, metadata),
1200        _ => data.to_vec(),
1201    }
1202}
1203
1204/// Reverse Strategy 1: uniform schema.
1205fn reverse_uniform(data: &[u8], metadata: &[u8]) -> Vec<u8> {
1206    if metadata.len() < 10 {
1207        return data.to_vec();
1208    }
1209    let mut pos = 0;
1210    let _version = metadata[pos];
1211    pos += 1;
1212    let num_rows = u32::from_le_bytes(metadata[pos..pos + 4].try_into().unwrap()) as usize;
1213    pos += 4;
1214    let num_cols = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1215    pos += 2;
1216    let has_trailing_newline = metadata[pos] != 0;
1217    pos += 1;
1218    let num_parts = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1219    pos += 2;
1220
1221    let mut parts: Vec<Vec<u8>> = Vec::with_capacity(num_parts);
1222    for _ in 0..num_parts {
1223        if pos + 2 > metadata.len() {
1224            return data.to_vec();
1225        }
1226        let part_len = u16::from_le_bytes(metadata[pos..pos + 2].try_into().unwrap()) as usize;
1227        pos += 2;
1228        if pos + part_len > metadata.len() {
1229            return data.to_vec();
1230        }
1231        parts.push(metadata[pos..pos + part_len].to_vec());
1232        pos += part_len;
1233    }
1234
1235    if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
1236        return data.to_vec();
1237    }
1238
1239    // Check for nested metadata after template parts.
1240    let remaining_metadata = &metadata[pos..];
1241    if !remaining_metadata.is_empty() && (remaining_metadata[0] == 1 || remaining_metadata[0] == 2)
1242    {
1243        // has_nested == 1 or 2: unflatten before reconstructing rows.
1244        if let Some((nested_groups, _)) = deserialize_nested_info(remaining_metadata) {
1245            // Calculate total number of flat columns.
1246            let total_flat_cols = data.split(|&b| b == COL_SEP).count();
1247            let unflattened =
1248                unflatten_nested_columns(data, &nested_groups, num_rows, total_flat_cols);
1249            return reverse_uniform_from_parts(
1250                &unflattened,
1251                &parts,
1252                num_rows,
1253                num_cols,
1254                has_trailing_newline,
1255            );
1256        }
1257    }
1258
1259    reverse_uniform_from_parts(data, &parts, num_rows, num_cols, has_trailing_newline)
1260}
1261
/// Core uniform reverse: given parsed template parts, reconstruct the
/// original lines from the columnar blob.
///
/// Each output line is parts[0] v0 parts[1] v1 … parts[n-1] v(n-1) parts[n].
/// Returns `data` unchanged on any column/row shape mismatch.
fn reverse_uniform_from_parts(
    data: &[u8],
    parts: &[Vec<u8>],
    num_rows: usize,
    num_cols: usize,
    has_trailing_newline: bool,
) -> Vec<u8> {
    // Separator bytes (mirror the module-level constants; local so the
    // routine stands alone).
    const COL_SEP: u8 = 0x00;
    const VAL_SEP: u8 = 0x01;

    // Slice the blob into columns, then each column into per-row values.
    let col_chunks: Vec<&[u8]> = data.split(|&b| b == COL_SEP).collect();
    if col_chunks.len() != num_cols {
        return data.to_vec();
    }
    let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
    for chunk in &col_chunks {
        let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
        if vals.len() != num_rows {
            return data.to_vec();
        }
        columns.push(vals);
    }

    // Interleave template parts with the row's values, column by column.
    let mut output = Vec::with_capacity(data.len() * 2);
    for row in 0..num_rows {
        for (col, part) in parts.iter().take(num_cols).enumerate() {
            output.extend_from_slice(part);
            output.extend_from_slice(columns[col][row]);
        }
        output.extend_from_slice(&parts[num_cols]);

        // Newline after every row except a final row without trailing '\n'.
        if row + 1 < num_rows || has_trailing_newline {
            output.push(b'\n');
        }
    }

    output
}
1302
/// Parse Strategy 1 metadata and return (parts, num_rows, num_cols, has_trailing_newline).
/// Used by reverse_grouped to decode per-group metadata.
///
/// Returns `None` for truncated buffers, a part count that does not equal
/// `num_cols + 1`, or degenerate row/column counts of zero.
fn parse_uniform_metadata(metadata: &[u8]) -> Option<(Vec<Vec<u8>>, usize, usize, bool)> {
    // Fixed header: version(1) rows(4) cols(2) trailing(1) parts(2) = 10 bytes.
    if metadata.len() < 10 {
        return None;
    }
    let num_rows = u32::from_le_bytes(metadata[1..5].try_into().unwrap()) as usize;
    let num_cols = u16::from_le_bytes(metadata[5..7].try_into().unwrap()) as usize;
    let has_trailing_newline = metadata[7] != 0;
    let num_parts = u16::from_le_bytes(metadata[8..10].try_into().unwrap()) as usize;

    // Variable section: `num_parts` length-prefixed template parts.
    let mut cursor = 10;
    let mut parts = Vec::with_capacity(num_parts);
    for _ in 0..num_parts {
        let len_bytes = metadata.get(cursor..cursor + 2)?;
        let part_len = u16::from_le_bytes(len_bytes.try_into().unwrap()) as usize;
        cursor += 2;
        let part = metadata.get(cursor..cursor + part_len)?;
        parts.push(part.to_vec());
        cursor += part_len;
    }

    // Sanity: exactly one more template part than columns, nothing empty.
    if parts.len() != num_cols + 1 || num_rows == 0 || num_cols == 0 {
        return None;
    }

    Some((parts, num_rows, num_cols, has_trailing_newline))
}
1339
/// Reverse Strategy 2: grouped schema.
///
/// Metadata layout (little-endian):
///   [version: u8][has_trailing_newline: u8][total_rows: u32][num_groups: u16]
///   per group: [row_count: u32][row_indices: u32 × row_count]
///              [group_metadata_len: u32][group metadata in Strategy 1 format]
///   then: [residual_count: u32][residual_indices: u32 × residual_count]
/// Data layout: per group [len: u32][columnar blob], then the residual lines
/// joined with '\n'.
///
/// Any inconsistency between metadata and data returns `data` unchanged.
fn reverse_grouped(data: &[u8], metadata: &[u8]) -> Vec<u8> {
    // Fixed header is 8 bytes: version + trailing flag + total_rows + num_groups.
    if metadata.len() < 8 {
        return data.to_vec();
    }

    let mut mpos = 1; // Skip version byte.
    let has_trailing_newline = metadata[mpos] != 0;
    mpos += 1;
    let total_rows = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
    mpos += 4;
    let num_groups = u16::from_le_bytes(metadata[mpos..mpos + 2].try_into().unwrap()) as usize;
    mpos += 2;

    // Allocate output slots.
    // Each decoded line lands at its original row index; a slot still None at
    // the end means the input was inconsistent and we bail out.
    let mut output_lines: Vec<Option<Vec<u8>>> = vec![None; total_rows];

    // Data cursor.
    // `mpos` walks the metadata stream while `dpos` walks the data blob in
    // lockstep, one group at a time.
    let mut dpos: usize = 0;

    for _ in 0..num_groups {
        // Read row indices for this group.
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let group_row_count =
            u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;

        let mut row_indices = Vec::with_capacity(group_row_count);
        for _ in 0..group_row_count {
            if mpos + 4 > metadata.len() {
                return data.to_vec();
            }
            let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
            mpos += 4;
            row_indices.push(idx);
        }

        // Read group metadata.
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let gm_len = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;
        if mpos + gm_len > metadata.len() {
            return data.to_vec();
        }
        let group_metadata = &metadata[mpos..mpos + gm_len];
        mpos += gm_len;

        // Read group data from the data blob.
        if dpos + 4 > data.len() {
            return data.to_vec();
        }
        let gd_len = u32::from_le_bytes(data[dpos..dpos + 4].try_into().unwrap()) as usize;
        dpos += 4;
        if dpos + gd_len > data.len() {
            return data.to_vec();
        }
        let group_data = &data[dpos..dpos + gd_len];
        dpos += gd_len;

        // Decode this group using Strategy 1 reverse.
        // Note: the group's own trailing-newline flag is ignored; newline
        // placement is decided globally from the header flag below.
        let (parts, num_rows, num_cols, _trailing) = match parse_uniform_metadata(group_metadata) {
            Some(v) => v,
            None => return data.to_vec(),
        };

        if num_rows != group_row_count {
            return data.to_vec();
        }

        // Split columnar data into columns and values.
        let col_chunks: Vec<&[u8]> = group_data.split(|&b| b == COL_SEP).collect();
        if col_chunks.len() != num_cols {
            return data.to_vec();
        }

        let mut columns: Vec<Vec<&[u8]>> = Vec::with_capacity(num_cols);
        for chunk in &col_chunks {
            let vals: Vec<&[u8]> = chunk.split(|&b| b == VAL_SEP).collect();
            if vals.len() != num_rows {
                return data.to_vec();
            }
            columns.push(vals);
        }

        // Reconstruct each line for this group.
        // Lines interleave template parts and values, then are scattered back
        // to their original positions via `row_indices`.
        for (row_within_group, &original_idx) in row_indices.iter().enumerate() {
            let mut line = Vec::new();
            line.extend_from_slice(&parts[0]);
            line.extend_from_slice(columns[0][row_within_group]);
            for col in 1..num_cols {
                line.extend_from_slice(&parts[col]);
                line.extend_from_slice(columns[col][row_within_group]);
            }
            line.extend_from_slice(&parts[num_cols]);

            if original_idx < total_rows {
                output_lines[original_idx] = Some(line);
            }
        }
    }

    // Read residual indices from metadata.
    // Residual rows were stored raw because their schema group was too small.
    if mpos + 4 > metadata.len() {
        return data.to_vec();
    }
    let residual_count = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
    mpos += 4;

    let mut residual_indices = Vec::with_capacity(residual_count);
    for _ in 0..residual_count {
        if mpos + 4 > metadata.len() {
            return data.to_vec();
        }
        let idx = u32::from_le_bytes(metadata[mpos..mpos + 4].try_into().unwrap()) as usize;
        mpos += 4;
        residual_indices.push(idx);
    }

    // Remaining data is residual lines.
    let residual_data = &data[dpos..];
    if residual_count > 0 {
        let residual_lines: Vec<&[u8]> = if residual_data.is_empty() {
            vec![]
        } else {
            residual_data.split(|&b| b == b'\n').collect()
        };
        // There should be exactly residual_count lines.
        if residual_lines.len() != residual_count {
            return data.to_vec();
        }
        for (i, &idx) in residual_indices.iter().enumerate() {
            if idx < total_rows {
                output_lines[idx] = Some(residual_lines[i].to_vec());
            }
        }
    }

    // Assemble final output.
    // Newline after every line except a final line without trailing '\n'.
    let mut output = Vec::with_capacity(data.len() * 2);
    for (i, slot) in output_lines.iter().enumerate() {
        match slot {
            Some(line) => output.extend_from_slice(line),
            None => {
                // Should not happen — missing row. Return data as-is.
                return data.to_vec();
            }
        }
        if i < total_rows - 1 || has_trailing_newline {
            output.push(b'\n');
        }
    }

    output
}
1498
1499#[cfg(test)]
1500mod tests {
1501    use super::*;
1502
1503    #[test]
1504    fn extract_value_string() {
1505        let line = br#""hello","next""#;
1506        let (val, end) = extract_value(line, 0).unwrap();
1507        assert_eq!(val, b"\"hello\"");
1508        assert_eq!(end, 7);
1509    }
1510
1511    #[test]
1512    fn extract_value_number() {
1513        let line = b"42,next";
1514        let (val, end) = extract_value(line, 0).unwrap();
1515        assert_eq!(val, b"42");
1516        assert_eq!(end, 2);
1517    }
1518
1519    #[test]
1520    fn extract_value_bool() {
1521        let line = b"true,next";
1522        let (val, end) = extract_value(line, 0).unwrap();
1523        assert_eq!(val, b"true");
1524        assert_eq!(end, 4);
1525    }
1526
1527    #[test]
1528    fn extract_value_null() {
1529        let line = b"null,next";
1530        let (val, end) = extract_value(line, 0).unwrap();
1531        assert_eq!(val, b"null");
1532        assert_eq!(end, 4);
1533    }
1534
1535    #[test]
1536    fn extract_value_object() {
1537        let line = br#"{"a":1,"b":"x"},next"#;
1538        let (val, end) = extract_value(line, 0).unwrap();
1539        assert_eq!(val, br#"{"a":1,"b":"x"}"#.to_vec());
1540        assert_eq!(end, 15);
1541    }
1542
1543    #[test]
1544    fn extract_value_array() {
1545        let line = b"[1,2,3],next";
1546        let (val, end) = extract_value(line, 0).unwrap();
1547        assert_eq!(val, b"[1,2,3]");
1548        assert_eq!(end, 7);
1549    }
1550
1551    #[test]
1552    fn extract_value_string_with_escapes() {
1553        let line = br#""he\"llo",next"#;
1554        let (val, end) = extract_value(line, 0).unwrap();
1555        assert_eq!(val, br#""he\"llo""#.to_vec());
1556        assert_eq!(end, 9);
1557    }
1558
1559    #[test]
1560    fn parse_line_simple() {
1561        let line = br#"{"a":1,"b":"x"}"#;
1562        let (parts, values) = parse_line(line).unwrap();
1563        assert_eq!(parts.len(), 3); // {"a": , ,"b": , }
1564        assert_eq!(values.len(), 2);
1565        assert_eq!(values[0], b"1");
1566        assert_eq!(values[1], b"\"x\"");
1567        assert_eq!(parts[0], br#"{"a":"#.to_vec());
1568        assert_eq!(parts[1], br#","b":"#.to_vec());
1569        assert_eq!(parts[2], b"}");
1570    }
1571
1572    #[test]
1573    fn roundtrip_simple() {
1574        let data = br#"{"a":1,"b":"x"}
1575{"a":2,"b":"y"}
1576{"a":3,"b":"z"}
1577"#;
1578        let result = preprocess(data).expect("should produce transform");
1579        let restored = reverse(&result.data, &result.metadata);
1580        assert_eq!(
1581            String::from_utf8_lossy(&restored),
1582            String::from_utf8_lossy(data),
1583        );
1584        assert_eq!(restored, data.to_vec());
1585    }
1586
1587    #[test]
1588    fn roundtrip_no_trailing_newline() {
1589        let data = br#"{"a":1,"b":"x"}
1590{"a":2,"b":"y"}
1591{"a":3,"b":"z"}"#;
1592        let result = preprocess(data).expect("should produce transform");
1593        let restored = reverse(&result.data, &result.metadata);
1594        assert_eq!(restored, data.to_vec());
1595    }
1596
1597    #[test]
1598    fn roundtrip_nested_values() {
1599        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
1600{"id":2,"meta":{"x":30,"y":40}}
1601{"id":3,"meta":{"x":50,"y":60}}
1602{"id":4,"meta":{"x":70,"y":80}}
1603{"id":5,"meta":{"x":90,"y":100}}
1604"#;
1605        let result = preprocess(data).expect("should produce transform");
1606        let restored = reverse(&result.data, &result.metadata);
1607        assert_eq!(restored, data.to_vec());
1608    }
1609
1610    #[test]
1611    fn roundtrip_mixed_types() {
1612        let data = br#"{"s":"hello","n":42,"b":true,"x":null,"a":[1,2]}
1613{"s":"world","n":99,"b":false,"x":null,"a":[3,4]}
1614{"s":"foo","n":7,"b":true,"x":null,"a":[5,6]}
1615{"s":"bar","n":13,"b":false,"x":null,"a":[7,8]}
1616{"s":"baz","n":21,"b":true,"x":null,"a":[9,0]}
1617"#;
1618        let result = preprocess(data).expect("should produce transform");
1619        let restored = reverse(&result.data, &result.metadata);
1620        assert_eq!(restored, data.to_vec());
1621    }
1622
1623    #[test]
1624    fn schema_mismatch_too_few_returns_none() {
1625        // Different keys on different lines, but each group has < MIN_GROUP_ROWS.
1626        let data = br#"{"a":1,"b":2}
1627{"a":1,"c":3}
1628"#;
1629        assert!(preprocess(data).is_none());
1630    }
1631
1632    #[test]
1633    fn different_num_keys_too_few_returns_none() {
1634        let data = br#"{"a":1,"b":2}
1635{"a":1}
1636"#;
1637        assert!(preprocess(data).is_none());
1638    }
1639
1640    #[test]
1641    fn single_line_returns_none() {
1642        let data = br#"{"a":1,"b":2}
1643"#;
1644        assert!(preprocess(data).is_none());
1645    }
1646
1647    #[test]
1648    fn empty_returns_none() {
1649        assert!(preprocess(b"").is_none());
1650    }
1651
1652    #[test]
1653    fn column_layout_groups_similar_values() {
1654        let data = br#"{"type":"page_view","user":"alice"}
1655{"type":"api_call","user":"alice"}
1656{"type":"click","user":"bob"}
1657"#;
1658        let result = preprocess(data).unwrap();
1659
1660        // The columnar data should have type values grouped, then user values grouped.
1661        let col_data = &result.data;
1662        let cols: Vec<&[u8]> = col_data.split(|&b| b == COL_SEP).collect();
1663        assert_eq!(cols.len(), 2);
1664
1665        // Column 0 = type values.
1666        let type_vals: Vec<&[u8]> = cols[0].split(|&b| b == VAL_SEP).collect();
1667        assert_eq!(type_vals.len(), 3);
1668        assert_eq!(type_vals[0], br#""page_view""#);
1669        assert_eq!(type_vals[1], br#""api_call""#);
1670        assert_eq!(type_vals[2], br#""click""#);
1671
1672        // Column 1 = user values.
1673        let user_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
1674        assert_eq!(user_vals.len(), 3);
1675        assert_eq!(user_vals[0], br#""alice""#);
1676        assert_eq!(user_vals[1], br#""alice""#);
1677        assert_eq!(user_vals[2], br#""bob""#);
1678    }
1679
1680    #[test]
1681    fn roundtrip_string_with_escaped_chars() {
1682        let data = br#"{"msg":"he said \"hi\"","val":1}
1683{"msg":"line\nbreak","val":2}
1684{"msg":"tab\there","val":3}
1685{"msg":"back\\slash","val":4}
1686{"msg":"normal text","val":5}
1687"#;
1688        let result = preprocess(data).expect("should produce transform");
1689        let restored = reverse(&result.data, &result.metadata);
1690        assert_eq!(restored, data.to_vec());
1691    }
1692
1693    #[test]
1694    fn roundtrip_negative_and_float_numbers() {
1695        let data = br#"{"x":-3.14,"y":0}
1696{"x":2.718,"y":-1}
1697{"x":0.001,"y":999}
1698{"x":-100,"y":-200}
1699{"x":42.0,"y":7}
1700"#;
1701        let result = preprocess(data).expect("should produce transform");
1702        let restored = reverse(&result.data, &result.metadata);
1703        assert_eq!(restored, data.to_vec());
1704    }
1705
1706    /// Test that the transform+reverse is lossless even for tiny inputs
1707    /// by repeating them to pass the size check threshold.
1708    #[test]
1709    fn reverse_roundtrip_small_data() {
1710        // Verify parse_line works on small lines.
1711        let (parts, vals) = parse_line(br#"{"x":-3.14,"y":0}"#).unwrap();
1712        assert_eq!(vals.len(), 2);
1713        assert_eq!(parts.len(), 3);
1714
1715        // 2-row data might fail the size check, so repeat to get enough rows.
1716        let big_data = br#"{"x":-3.14,"y":0}
1717{"x":2.718,"y":-1}
1718"#
1719        .repeat(20);
1720        let result = preprocess(&big_data).expect("should produce transform with 40 rows");
1721        let restored = reverse(&result.data, &result.metadata);
1722        assert_eq!(restored, big_data);
1723    }
1724
1725    // --- Strategy 2 (grouped) tests ---
1726
1727    #[test]
1728    fn grouped_roundtrip_two_schemas() {
1729        // Two different schemas, each with >= MIN_GROUP_ROWS rows.
1730        let mut data = Vec::new();
1731        for i in 0..10 {
1732            data.extend_from_slice(
1733                format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
1734            );
1735            data.push(b'\n');
1736        }
1737        for i in 10..20 {
1738            data.extend_from_slice(
1739                format!(
1740                    r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
1741                    i, i, i
1742                )
1743                .as_bytes(),
1744            );
1745            data.push(b'\n');
1746        }
1747        let result = preprocess(&data).expect("should produce grouped transform");
1748        // Should be version 2 (grouped).
1749        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1750        let restored = reverse(&result.data, &result.metadata);
1751        assert_eq!(restored, data);
1752    }
1753
1754    #[test]
1755    fn grouped_roundtrip_interleaved_schemas() {
1756        // Interleaved schemas: alternating between two different key sets.
1757        let mut data = Vec::new();
1758        for i in 0..20 {
1759            if i % 2 == 0 {
1760                data.extend_from_slice(
1761                    format!(r#"{{"id":{},"type":"push","repo":"r{}"}}"#, i, i).as_bytes(),
1762                );
1763            } else {
1764                data.extend_from_slice(
1765                    format!(
1766                        r#"{{"id":{},"type":"watch","repo":"r{}","org":"o{}"}}"#,
1767                        i, i, i
1768                    )
1769                    .as_bytes(),
1770                );
1771            }
1772            data.push(b'\n');
1773        }
1774        let result = preprocess(&data).expect("should produce grouped transform");
1775        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1776        let restored = reverse(&result.data, &result.metadata);
1777        assert_eq!(restored, data);
1778    }
1779
1780    #[test]
1781    fn grouped_roundtrip_with_residuals() {
1782        // Two large groups + a few unique-schema rows (residuals).
1783        let mut data = Vec::new();
1784        // Group A: 8 rows.
1785        for i in 0..8 {
1786            data.extend_from_slice(format!(r#"{{"a":{},"b":"val{}"}}"#, i, i).as_bytes());
1787            data.push(b'\n');
1788        }
1789        // 2 unique rows (will be residual).
1790        data.extend_from_slice(br#"{"x":1,"y":2,"z":3}"#);
1791        data.push(b'\n');
1792        data.extend_from_slice(br#"{"p":"q"}"#);
1793        data.push(b'\n');
1794        // Group B: 6 rows.
1795        for i in 0..6 {
1796            data.extend_from_slice(format!(r#"{{"c":{},"d":"val{}","e":true}}"#, i, i).as_bytes());
1797            data.push(b'\n');
1798        }
1799        let result = preprocess(&data).expect("should produce grouped transform");
1800        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1801        let restored = reverse(&result.data, &result.metadata);
1802        assert_eq!(
1803            String::from_utf8_lossy(&restored),
1804            String::from_utf8_lossy(&data),
1805        );
1806        assert_eq!(restored, data);
1807    }
1808
1809    #[test]
1810    fn grouped_roundtrip_no_trailing_newline() {
1811        let mut data = Vec::new();
1812        for i in 0..6 {
1813            data.extend_from_slice(format!(r#"{{"id":{},"type":"push"}}"#, i).as_bytes());
1814            data.push(b'\n');
1815        }
1816        for i in 0..6 {
1817            data.extend_from_slice(
1818                format!(r#"{{"id":{},"type":"watch","org":"o{}"}}"#, i, i).as_bytes(),
1819            );
1820            if i < 5 {
1821                data.push(b'\n');
1822            }
1823            // Last line: no trailing newline.
1824        }
1825        let result = preprocess(&data).expect("should produce grouped transform");
1826        let restored = reverse(&result.data, &result.metadata);
1827        assert_eq!(restored, data);
1828    }
1829
1830    #[test]
1831    fn uniform_still_preferred_over_grouped() {
1832        // All rows same schema — should use Strategy 1 (version 1), not Strategy 2.
1833        let data = br#"{"a":1,"b":"x"}
1834{"a":2,"b":"y"}
1835{"a":3,"b":"z"}
1836{"a":4,"b":"w"}
1837{"a":5,"b":"v"}
1838"#;
1839        let result = preprocess(data).expect("should produce transform");
1840        assert_eq!(
1841            result.metadata[0], METADATA_VERSION_UNIFORM,
1842            "uniform schema should use Strategy 1"
1843        );
1844        let restored = reverse(&result.data, &result.metadata);
1845        assert_eq!(restored, data.to_vec());
1846    }
1847
1848    #[test]
1849    fn grouped_gharchive_simulation() {
1850        // Simulates GitHub Archive: most rows have 7 keys, some have 8.
1851        let mut data = Vec::new();
1852        for i in 0..50 {
1853            if i % 5 == 0 {
1854                // 8-key rows (with org).
1855                data.extend_from_slice(
1856                    format!(
1857                        r#"{{"id":"{}","type":"WatchEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z","org":{{"id":{}}}}}"#,
1858                        i, i, i, i
1859                    )
1860                    .as_bytes(),
1861                );
1862            } else {
1863                // 7-key rows (no org).
1864                data.extend_from_slice(
1865                    format!(
1866                        r#"{{"id":"{}","type":"PushEvent","actor":{{"id":{}}},"repo":{{"id":{}}},"payload":{{}},"public":true,"created_at":"2026-03-20T12:00:00Z"}}"#,
1867                        i, i, i
1868                    )
1869                    .as_bytes(),
1870                );
1871            }
1872            data.push(b'\n');
1873        }
1874        let result = preprocess(&data).expect("should produce grouped transform");
1875        assert_eq!(result.metadata[0], METADATA_VERSION_GROUPED);
1876        let restored = reverse(&result.data, &result.metadata);
1877        assert_eq!(restored, data);
1878    }
1879
1880    // --- Nested decomposition tests ---
1881
1882    #[test]
1883    fn test_nested_decomposition_basic() {
1884        // Simple nested object decomposed correctly.
1885        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
1886{"id":2,"meta":{"x":30,"y":40}}
1887{"id":3,"meta":{"x":50,"y":60}}
1888"#;
1889        let result = preprocess(data).expect("should produce transform");
1890        assert_eq!(result.metadata[0], METADATA_VERSION_UNIFORM);
1891
1892        // The columnar data should have expanded columns.
1893        let cols: Vec<&[u8]> = result.data.split(|&b| b == COL_SEP).collect();
1894        // Original: 2 cols (id, meta). After flattening: 3 cols (id, meta.x, meta.y).
1895        assert_eq!(
1896            cols.len(),
1897            3,
1898            "should have 3 columns after flattening: got {}",
1899            cols.len()
1900        );
1901
1902        // Verify sub-columns contain the extracted values.
1903        let meta_x_vals: Vec<&[u8]> = cols[1].split(|&b| b == VAL_SEP).collect();
1904        assert_eq!(meta_x_vals, vec![b"10".as_slice(), b"30", b"50"]);
1905
1906        let meta_y_vals: Vec<&[u8]> = cols[2].split(|&b| b == VAL_SEP).collect();
1907        assert_eq!(meta_y_vals, vec![b"20".as_slice(), b"40", b"60"]);
1908    }
1909
1910    #[test]
1911    fn test_nested_roundtrip() {
1912        // Flatten -> unflatten produces byte-exact original.
1913        let data = br#"{"id":1,"meta":{"x":10,"y":20}}
1914{"id":2,"meta":{"x":30,"y":40}}
1915{"id":3,"meta":{"x":50,"y":60}}
1916"#;
1917        let result = preprocess(data).expect("should produce transform");
1918        let restored = reverse(&result.data, &result.metadata);
1919        assert_eq!(
1920            String::from_utf8_lossy(&restored),
1921            String::from_utf8_lossy(data),
1922        );
1923        assert_eq!(restored, data.to_vec());
1924    }
1925
1926    #[test]
1927    fn test_nested_mixed_schemas() {
1928        // Different nested objects per row (some keys missing -> null).
1929        let data = br#"{"ts":"a","meta":{"query":"benchmark","results_count":14}}
1930{"ts":"b","meta":{"element_id":"btn_5","x":450,"y":230}}
1931{"ts":"c","meta":{"query":"pricing","results_count":25}}
1932{"ts":"d","meta":{"element_id":"btn_2","x":100,"y":200}}
1933{"ts":"e","meta":{"query":"api docs","results_count":41}}
1934"#;
1935        let result = preprocess(data).expect("should produce transform");
1936        let restored = reverse(&result.data, &result.metadata);
1937        assert_eq!(
1938            String::from_utf8_lossy(&restored),
1939            String::from_utf8_lossy(data),
1940        );
1941        assert_eq!(restored, data.to_vec());
1942    }
1943
1944    #[test]
1945    fn test_nested_no_nested_objects() {
1946        // Returns None when no nested objects — flat data should still work.
1947        let data = br#"{"a":1,"b":"x"}
1948{"a":2,"b":"y"}
1949{"a":3,"b":"z"}
1950"#;
1951        let result = preprocess(data).expect("should produce transform");
1952        let restored = reverse(&result.data, &result.metadata);
1953        assert_eq!(restored, data.to_vec());
1954
1955        // Verify the metadata has has_nested=0 since no nested objects.
1956        // The nested flag is appended after template parts.
1957        // For uniform, metadata starts with version(1) + num_rows(4) + num_cols(2) +
1958        // trailing_newline(1) + num_parts(2) + parts.
1959        // After those parts, there should be a 0 byte (has_nested=0).
1960        let meta = &result.metadata;
1961        let last_byte = meta[meta.len() - 1];
1962        assert_eq!(last_byte, 0, "should have has_nested=0 for flat data");
1963    }
1964
1965    #[test]
1966    fn test_nested_real_corpus() {
1967        // Test with data shaped like the test-ndjson.ndjson corpus.
1968        let data = br#"{"ts":"a","type":"search","meta":{"query":"benchmark","results_count":14}}
1969{"ts":"b","type":"click","meta":{"element_id":"btn_5","x":450,"y":230}}
1970{"ts":"c","type":"scroll","meta":{"scroll_depth":0.27,"scroll_direction":"down","max_scroll":0.27}}
1971{"ts":"d","type":"api_call","meta":{"endpoint":"/api/v1/docs","method":"GET","status_code":200,"response_bytes":20460}}
1972{"ts":"e","type":"page_view","meta":{"viewport_width":1920,"viewport_height":1080,"color_depth":30,"timezone":"Asia/Tokyo","language":"ja-JP"}}
1973"#;
1974        let result = preprocess(data).expect("should produce transform");
1975        let restored = reverse(&result.data, &result.metadata);
1976        assert_eq!(
1977            String::from_utf8_lossy(&restored),
1978            String::from_utf8_lossy(data),
1979        );
1980        assert_eq!(restored, data.to_vec());
1981    }
1982
1983    #[test]
1984    fn test_nested_roundtrip_with_null_values() {
1985        // Some rows have null for the nested field.
1986        let data = br#"{"id":1,"meta":{"x":10}}
1987{"id":2,"meta":null}
1988{"id":3,"meta":{"x":30}}
1989{"id":4,"meta":null}
1990{"id":5,"meta":{"x":50}}
1991"#;
1992        let result = preprocess(data).expect("should produce transform");
1993        let restored = reverse(&result.data, &result.metadata);
1994        assert_eq!(restored, data.to_vec());
1995    }
1996
1997    #[test]
1998    fn test_nested_string_values_preserved_exact() {
1999        // Verify that string values in nested objects preserve exact bytes (with quotes).
2000        let data = br#"{"id":1,"meta":{"name":"Alice","score":100}}
2001{"id":2,"meta":{"name":"Bob","score":200}}
2002{"id":3,"meta":{"name":"Charlie","score":300}}
2003"#;
2004        let result = preprocess(data).expect("should produce transform");
2005        let restored = reverse(&result.data, &result.metadata);
2006        assert_eq!(restored, data.to_vec());
2007    }
2008
2009    #[test]
2010    fn test_parse_nested_object_kv() {
2011        let obj = br#"{"query":"benchmark","results_count":14}"#;
2012        let pairs = parse_nested_object_kv(obj).unwrap();
2013        assert_eq!(pairs.len(), 2);
2014        assert_eq!(pairs[0].0, b"query");
2015        assert_eq!(pairs[0].1, br#""benchmark""#.to_vec());
2016        assert_eq!(pairs[1].0, b"results_count");
2017        assert_eq!(pairs[1].1, b"14");
2018    }
2019}