// Source: datacortex_core/format/mod.rs

//! Format detection and preprocessing pipeline.
//!
//! Phase 0: heuristic detection.
//! Phase 1: format-aware preprocessing (JSON key interning) + detection.
5
6pub mod json;
7pub mod json_array;
8pub mod ndjson;
9pub mod schema;
10pub mod transform;
11pub mod typed_encoding;
12pub mod value_dict;
13
14use crate::dcx::{FormatHint, Mode};
15use transform::{
16    TRANSFORM_JSON_ARRAY_COLUMNAR, TRANSFORM_JSON_KEY_INTERN, TRANSFORM_NDJSON_COLUMNAR,
17    TRANSFORM_NESTED_FLATTEN, TRANSFORM_TYPED_ENCODING, TRANSFORM_VALUE_DICT, TransformChain,
18};
19
20/// Detect file format from content bytes.
21pub fn detect_format(data: &[u8]) -> FormatHint {
22    if data.is_empty() {
23        return FormatHint::Generic;
24    }
25
26    let trimmed = trim_leading_whitespace(data);
27
28    if starts_with_byte(trimmed, b'{') || starts_with_byte(trimmed, b'[') {
29        if is_ndjson(data) {
30            return FormatHint::Ndjson;
31        }
32        return FormatHint::Json;
33    }
34
35    FormatHint::Generic
36}
37
38/// Detect format from file extension (fallback).
39pub fn detect_from_extension(path: &str) -> Option<FormatHint> {
40    let ext = path.rsplit('.').next()?.to_lowercase();
41    match ext.as_str() {
42        "json" => Some(FormatHint::Json),
43        "ndjson" | "jsonl" => Some(FormatHint::Ndjson),
44        _ => None,
45    }
46}
47
48/// Apply format-aware preprocessing transforms.
49/// Returns (preprocessed_data, transform_chain).
50///
51/// NDJSON columnar: ALL modes (grouping similar values helps both zstd and CM).
52/// Key interning: Balanced/Max only (hurts Fast mode due to zstd redundancy).
53/// For NDJSON, columnar is applied FIRST — if it succeeds, key interning is skipped
54/// (keys are already removed from the data stream by the columnar transform).
55pub fn preprocess(data: &[u8], format: FormatHint, mode: Mode) -> (Vec<u8>, TransformChain) {
56    let mut chain = TransformChain::new();
57    let mut current = data.to_vec();
58
59    // Track whether a uniform columnar transform was applied (for value dict chaining).
60    // Uniform columnar = data is \x00/\x01-separated, downstream transforms are compatible.
61    let mut columnar_applied = false;
62    // Track whether ANY ndjson transform was applied (uniform or grouped).
63    let mut ndjson_transform_applied = false;
64
65    // NDJSON columnar reorg: ALL modes (dramatic improvement for uniform NDJSON).
66    // Strategy 1 (uniform, version=1) produces \x00/\x01 separated columnar data.
67    // Strategy 2 (grouped, version=2) produces a different format with per-group data.
68    // Only Strategy 1 output is compatible with downstream typed_encoding/value_dict.
69    if format == FormatHint::Ndjson {
70        if let Some(result) = ndjson::preprocess(&current) {
71            let is_uniform_columnar = !result.metadata.is_empty() && result.metadata[0] == 1;
72            chain.push(TRANSFORM_NDJSON_COLUMNAR, result.metadata);
73            current = result.data;
74            ndjson_transform_applied = true;
75            columnar_applied = is_uniform_columnar;
76        }
77    }
78
79    // JSON array columnar reorg: ALL modes.
80    // Strategy 1 (uniform, version=1) produces \x00/\x01 separated columnar data.
81    // Strategy 2 (grouped, version=2) produces a different format with per-group data.
82    // Only Strategy 1 output is compatible with downstream typed_encoding/value_dict/nested_flatten.
83    let mut json_array_applied = false;
84    if !columnar_applied && !ndjson_transform_applied && format == FormatHint::Json {
85        if let Some(result) = json_array::preprocess(&current) {
86            let is_uniform = !result.metadata.is_empty() && result.metadata[0] == 1;
87            chain.push(TRANSFORM_JSON_ARRAY_COLUMNAR, result.metadata);
88            current = result.data;
89            json_array_applied = true;
90            columnar_applied = is_uniform;
91        }
92    }
93
94    // Nested flatten: decompose nested objects into sub-columns.
95    // Works on any \x00/\x01 columnar data. Only for non-NDJSON paths because
96    // the NDJSON uniform path already handles its own nested flatten internally.
97    if columnar_applied && !ndjson_transform_applied {
98        // Extract num_rows from the json_array metadata (offset 1, u32 LE).
99        let ja_meta = &chain.records.last().unwrap().metadata;
100        if ja_meta.len() >= 5 {
101            let num_rows = u32::from_le_bytes(ja_meta[1..5].try_into().unwrap()) as usize;
102            if let Some((flat_data, nested_groups)) =
103                ndjson::flatten_nested_columns(&current, num_rows)
104            {
105                // Build metadata: num_rows + total_flat_cols + serialized nested info.
106                let total_flat_cols = flat_data.split(|&b| b == 0x00).count() as u16;
107
108                // Verify roundtrip: unflatten must produce the exact original columnar
109                // data. Nested objects with varying sub-key sets or key ordering can
110                // cause the compact reconstruction to reorder keys, breaking byte-exact
111                // roundtrip. Only apply if the unflatten is provably lossless.
112                let unflattened = ndjson::unflatten_nested_columns(
113                    &flat_data,
114                    &nested_groups,
115                    num_rows,
116                    total_flat_cols as usize,
117                );
118                if unflattened == current {
119                    let mut nested_meta = Vec::new();
120                    nested_meta.extend_from_slice(&(num_rows as u32).to_le_bytes());
121                    nested_meta.extend_from_slice(&total_flat_cols.to_le_bytes());
122                    nested_meta
123                        .extend_from_slice(&ndjson::serialize_nested_info(&nested_groups));
124                    chain.push(TRANSFORM_NESTED_FLATTEN, nested_meta);
125                    current = flat_data;
126                }
127                // else: roundtrip not exact — skip nested flatten (data stays columnar
128                // without sub-column decomposition, still benefits from typed encoding
129                // and value dict on the outer columns).
130            }
131        }
132    }
133
134    // Typed encoding: Fast mode ONLY. CM mode doesn't benefit (gotcha #35 confirmed).
135    // Binary encoding disrupts CM's learned text patterns. But zstd benefits from
136    // smaller raw data (delta varints, boolean bitmaps).
137    if columnar_applied && mode == Mode::Fast {
138        if let Some(result) = typed_encoding::preprocess(&current) {
139            chain.push(TRANSFORM_TYPED_ENCODING, result.metadata);
140            current = result.data;
141        }
142    }
143
144    // Value dictionary: chain AFTER any columnar transform.
145    // Replaces repeated multi-byte values with single-byte codes.
146    // Only applies to columnar data (uses \x00/\x01 separators).
147    // NOTE: value dict only operates on \x00/\x01-separated data.
148    // If typed encoding was applied, the data is now binary (no separators),
149    // so value dict will naturally not apply (it won't find separators to split on,
150    // or its size check will fail).
151    if columnar_applied {
152        if let Some(result) = value_dict::preprocess(&current) {
153            chain.push(TRANSFORM_VALUE_DICT, result.metadata);
154            current = result.data;
155        }
156    }
157
158    if columnar_applied || ndjson_transform_applied || json_array_applied {
159        return (current, chain);
160    }
161
162    // JSON key interning: Balanced/Max only (hurts Fast mode due to zstd redundancy).
163    if matches!(mode, Mode::Balanced | Mode::Max)
164        && matches!(format, FormatHint::Json | FormatHint::Ndjson)
165        && let Some(result) = json::preprocess(&current)
166    {
167        chain.push(TRANSFORM_JSON_KEY_INTERN, result.metadata);
168        current = result.data;
169    }
170
171    (current, chain)
172}
173
174/// Reverse preprocessing transforms (applied in reverse order).
175pub fn reverse_preprocess(data: &[u8], chain: &TransformChain) -> Vec<u8> {
176    let mut current = data.to_vec();
177
178    // Apply in reverse order.
179    for record in chain.records.iter().rev() {
180        match record.id {
181            TRANSFORM_JSON_KEY_INTERN => {
182                current = json::reverse(&current, &record.metadata);
183            }
184            TRANSFORM_NDJSON_COLUMNAR => {
185                current = ndjson::reverse(&current, &record.metadata);
186            }
187            TRANSFORM_JSON_ARRAY_COLUMNAR => {
188                current = json_array::reverse(&current, &record.metadata);
189            }
190            TRANSFORM_VALUE_DICT => {
191                current = value_dict::reverse(&current, &record.metadata);
192            }
193            TRANSFORM_TYPED_ENCODING => {
194                current = typed_encoding::reverse(&current, &record.metadata);
195            }
196            TRANSFORM_NESTED_FLATTEN => {
197                // Metadata: num_rows (u32 LE) + total_flat_cols (u16 LE) + nested_info.
198                if record.metadata.len() >= 6 {
199                    let num_rows =
200                        u32::from_le_bytes(record.metadata[0..4].try_into().unwrap()) as usize;
201                    let total_flat_cols =
202                        u16::from_le_bytes(record.metadata[4..6].try_into().unwrap()) as usize;
203                    if let Some((nested_groups, _)) =
204                        ndjson::deserialize_nested_info(&record.metadata[6..])
205                    {
206                        current = ndjson::unflatten_nested_columns(
207                            &current,
208                            &nested_groups,
209                            num_rows,
210                            total_flat_cols,
211                        );
212                    }
213                }
214            }
215            _ => {} // Unknown/legacy transform — skip.
216        }
217    }
218
219    current
220}
221
// --- Detection helpers (unchanged from Phase 0) ---
223
/// Return `data` with its leading run of ASCII whitespace removed.
///
/// Yields an empty slice when `data` is empty or consists only of whitespace.
fn trim_leading_whitespace(data: &[u8]) -> &[u8] {
    let skipped = data
        .iter()
        .take_while(|byte| byte.is_ascii_whitespace())
        .count();
    &data[skipped..]
}
231
/// Report whether the first byte of `data` equals `byte`.
///
/// Always `false` for an empty slice.
fn starts_with_byte(data: &[u8], byte: u8) -> bool {
    data.starts_with(&[byte])
}
235
236fn is_ndjson(data: &[u8]) -> bool {
237    let mut json_lines = 0;
238    let mut total_lines = 0;
239
240    for line in data.split(|&b| b == b'\n') {
241        let trimmed = trim_leading_whitespace(line);
242        if trimmed.is_empty() {
243            continue;
244        }
245        total_lines += 1;
246        if starts_with_byte(trimmed, b'{') {
247            json_lines += 1;
248        }
249    }
250
251    total_lines >= 2 && json_lines as f64 / total_lines as f64 > 0.8
252}
253
#[cfg(test)]
mod tests {
    use super::*;

    /// Three uniform `{"ts":…,"val":…}` rows, newline-terminated.
    const TS_VAL_NDJSON: &[u8] =
        b"{\"ts\":\"a\",\"val\":1}\n{\"ts\":\"b\",\"val\":2}\n{\"ts\":\"c\",\"val\":3}\n";

    /// Content starting with `{` or `[` (after optional whitespace) is JSON.
    #[test]
    fn detect_json() {
        for sample in [&b"  {\"key\": \"value\"}"[..], &b"[1, 2, 3]"[..]] {
            assert_eq!(detect_format(sample), FormatHint::Json);
        }
    }

    /// Several object lines are classified as NDJSON.
    #[test]
    fn detect_ndjson() {
        let input = b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n";
        assert_eq!(detect_format(input), FormatHint::Ndjson);
    }

    /// Empty input and plain text fall back to the generic format.
    #[test]
    fn detect_generic_fallback() {
        for sample in [&b""[..], &b"just some random text"[..]] {
            assert_eq!(detect_format(sample), FormatHint::Generic);
        }
    }

    /// Known extensions map to their format; unknown ones map to `None`.
    #[test]
    fn extension_detection() {
        let cases = [
            ("test.json", Some(FormatHint::Json)),
            ("data.ndjson", Some(FormatHint::Ndjson)),
            ("file.txt", None),
        ];
        for (path, expected) in cases {
            assert_eq!(detect_from_extension(path), expected);
        }
    }

    /// A small JSON object in Balanced mode is key-interned and round-trips.
    #[test]
    fn preprocess_json_key_interning() {
        let input = br#"{"name":"Alice","age":30,"name":"Bob","age":25}"#;
        let (packed, chain) = preprocess(input, FormatHint::Json, Mode::Balanced);
        assert!(!chain.is_empty(), "should have applied key interning");
        assert!(packed.len() < input.len(), "preprocessed should be smaller");

        // Reverse and verify.
        let unpacked = reverse_preprocess(&packed, &chain);
        assert_eq!(unpacked, input);
    }

    /// Uniform NDJSON in Balanced mode takes the columnar path, not key interning.
    #[test]
    fn preprocess_ndjson_columnar() {
        let (packed, chain) = preprocess(TS_VAL_NDJSON, FormatHint::Ndjson, Mode::Balanced);
        assert!(!chain.is_empty());
        // Should use columnar transform (ID 2), not key interning.
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_NDJSON_COLUMNAR,
            "NDJSON should use columnar transform"
        );

        let unpacked = reverse_preprocess(&packed, &chain);
        assert_eq!(unpacked, TS_VAL_NDJSON);
    }

    /// Columnar should apply for ALL modes, including Fast.
    #[test]
    fn preprocess_ndjson_columnar_fast_mode() {
        let (packed, chain) = preprocess(TS_VAL_NDJSON, FormatHint::Ndjson, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(chain.records[0].id, transform::TRANSFORM_NDJSON_COLUMNAR);

        let unpacked = reverse_preprocess(&packed, &chain);
        assert_eq!(unpacked, TS_VAL_NDJSON);

        // Verify columnar data groups values.
        let column_count = packed.split(|&b| b == 0x00).count();
        assert_eq!(column_count, 2, "should have 2 columns");
    }

    /// A JSON document containing a five-element object array goes columnar.
    #[test]
    fn preprocess_json_array_columnar() {
        let input = br#"{"data": [{"id": 1, "type": "a"}, {"id": 2, "type": "b"}, {"id": 3, "type": "c"}, {"id": 4, "type": "d"}, {"id": 5, "type": "e"}], "meta": {"count": 5}}"#;
        let (packed, chain) = preprocess(input, FormatHint::Json, Mode::Balanced);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
            "JSON with array should use array columnar transform"
        );

        let unpacked = reverse_preprocess(&packed, &chain);
        assert_eq!(unpacked, input);
    }

    /// Only 3 elements — below MIN_ROWS, should fall through to key interning.
    #[test]
    fn preprocess_json_array_too_few_falls_through() {
        let input = br#"{"data": [{"id": 1, "type": "a"}, {"id": 2, "type": "a"}, {"id": 3, "type": "a"}], "meta": {"count": 3}, "data2": [{"id": 1, "type": "a"}, {"id": 2, "type": "a"}, {"id": 3, "type": "a"}]}"#;
        let (packed, chain) = preprocess(input, FormatHint::Json, Mode::Balanced);
        // Whatever was applied first, it must not be the array columnar transform.
        if !chain.is_empty() {
            assert_ne!(
                chain.records[0].id,
                transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
                "3 elements should NOT trigger array columnar"
            );
        }

        let unpacked = reverse_preprocess(&packed, &chain);
        assert_eq!(unpacked, input);
    }

    /// Generic content passes through untouched with an empty chain.
    #[test]
    fn preprocess_non_json_passthrough() {
        let input = b"just some plain text with no JSON keys";
        let (packed, chain) = preprocess(input, FormatHint::Generic, Mode::Fast);
        assert!(chain.is_empty());
        assert_eq!(packed, input);
    }

    /// JSON array with nested objects — should apply json_array columnar + nested flatten.
    #[test]
    fn test_json_array_nested_flatten_roundtrip() {
        let rows: Vec<String> = (0..10)
            .map(|i| {
                format!(
                    r#"{{"id": {}, "name": "item_{}", "meta": {{"score": {}, "active": {}, "tag": "t{}"}}}}"#,
                    i,
                    i,
                    i * 10,
                    if i % 2 == 0 { "true" } else { "false" },
                    i
                )
            })
            .collect();
        let json = format!(r#"{{"data": [{}], "total": 10}}"#, rows.join(", "));

        let input = json.as_bytes();
        let (packed, chain) = preprocess(input, FormatHint::Json, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
            "should apply json_array columnar first"
        );

        // Check that nested flatten was applied.
        let flattened = chain
            .records
            .iter()
            .any(|record| record.id == transform::TRANSFORM_NESTED_FLATTEN);
        assert!(
            flattened,
            "should apply nested flatten for objects with nested fields"
        );

        // Verify byte-exact roundtrip (string form first for readable failures).
        let unpacked = reverse_preprocess(&packed, &chain);
        assert_eq!(
            String::from_utf8_lossy(&unpacked),
            String::from_utf8_lossy(input),
        );
        assert_eq!(unpacked, input);
    }

    /// Many rows with a nested object: nested flatten must activate and split
    /// the nested object into extra sub-columns.
    #[test]
    fn test_json_array_nested_flatten_improves_ratio() {
        let rows: Vec<String> = (0..50)
            .map(|i| {
                format!(
                    r#"{{"id": {}, "user": {{"name": "user_{}", "role": "admin", "level": {}, "verified": true, "email": "user_{}@test.com"}}}}"#,
                    i,
                    i,
                    i % 5,
                    i
                )
            })
            .collect();
        let json = format!(r#"{{"items": [{}]}}"#, rows.join(", "));

        let input = json.as_bytes();

        // Preprocess WITH nested flatten (current code).
        let (packed, chain) = preprocess(input, FormatHint::Json, Mode::Fast);
        let flattened = chain
            .records
            .iter()
            .any(|record| record.id == transform::TRANSFORM_NESTED_FLATTEN);
        assert!(flattened, "nested flatten should activate");

        // Verify roundtrip.
        let unpacked = reverse_preprocess(&packed, &chain);
        assert_eq!(unpacked, input);

        // Without nested flatten, json_array produces 2 columns (id, user).
        // With nested flatten, user is decomposed into 5 sub-columns, so total = 1 + 5 = 6.
        let num_cols_with = packed.split(|&b| b == 0x00).count();
        assert!(
            num_cols_with > 2,
            "nested flatten should produce more columns: got {}",
            num_cols_with
        );
    }

    /// NDJSON with nested objects — should use NDJSON path, NOT the standalone nested flatten.
    #[test]
    fn test_ndjson_unaffected() {
        let ndjson: String = (0..10)
            .map(|i| {
                format!(
                    r#"{{"id":{},"user":{{"name":"u{}","level":{}}}}}"#,
                    i,
                    i,
                    i % 3
                ) + "\n"
            })
            .collect();

        let input = ndjson.as_bytes();
        let (packed, chain) = preprocess(input, FormatHint::Ndjson, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_NDJSON_COLUMNAR,
            "NDJSON should use its own columnar transform"
        );

        // Should NOT have standalone TRANSFORM_NESTED_FLATTEN in chain.
        let standalone_flatten = chain
            .records
            .iter()
            .any(|record| record.id == transform::TRANSFORM_NESTED_FLATTEN);
        assert!(
            !standalone_flatten,
            "NDJSON path should NOT use standalone nested flatten (it handles nesting internally)"
        );

        // Verify roundtrip.
        let unpacked = reverse_preprocess(&packed, &chain);
        assert_eq!(unpacked, input);
    }

    /// Regression: NDJSON with integers spanning the epoch-timestamp range
    /// (e.g. 2147483647 = i32::MAX) caused schema misclassification and
    /// CRC-32 mismatch on decompression in Fast mode.
    #[test]
    fn test_ndjson_large_delta_integer_roundtrip() {
        let edges: [i64; 7] = [
            0,
            -1,
            1,
            -2147483648,
            2147483647,
            -9007199254740991,
            9007199254740991,
        ];
        // 203 rows cycling through the edge values; `idx` is the row number.
        let ndjson: String = edges
            .iter()
            .cycle()
            .take(203)
            .enumerate()
            .map(|(idx, val)| format!("{{\"val\":{},\"idx\":{}}}\n", val, idx))
            .collect();

        let input = ndjson.as_bytes();

        // Full pipeline roundtrip (ndjson columnar + typed encoding in Fast mode).
        let (packed, chain) = preprocess(input, FormatHint::Ndjson, Mode::Fast);

        // Verify typed encoding was applied.
        let typed = chain
            .records
            .iter()
            .any(|record| record.id == transform::TRANSFORM_TYPED_ENCODING);
        assert!(typed, "typed encoding should be applied in Fast mode");

        let unpacked = reverse_preprocess(&packed, &chain);
        assert_eq!(unpacked, input, "byte-exact roundtrip failed");
    }

    /// Regression test for npm_search.json roundtrip bug:
    /// JSON array of uniform objects where nested dicts have VARYING sub-keys
    /// across rows (e.g., some rows have "license", some don't; "links" has
    /// 5 different schemas). The nested flatten must verify its roundtrip is
    /// byte-exact before applying, because compact reconstruction reorders
    /// keys to discovery order instead of preserving the original order.
    #[test]
    fn test_nested_flatten_varying_subkeys_roundtrip() {
        let mut rows: Vec<String> = Vec::with_capacity(250);
        for i in 0..250 {
            // Nested dict with optional key (missing for first 6 rows)
            let license = if i < 6 { "" } else { r#","license":"MIT""# };
            // Nested dict with varying key sets across rows
            let links = match i % 5 {
                0 => format!(r#"{{"homepage":"h{i}","repository":"r{i}","bugs":"b{i}","npm":"n{i}"}}"#),
                1 => format!(r#"{{"homepage":"h{i}","npm":"n{i}","repository":"r{i}"}}"#),
                2 => format!(r#"{{"npm":"n{i}"}}"#),
                3 => format!(r#"{{"bugs":"b{i}","homepage":"h{i}","npm":"n{i}"}}"#),
                _ => format!(r#"{{"npm":"n{i}","repository":"r{i}"}}"#),
            };
            let publisher = if i % 3 == 0 {
                format!(r#"{{"email":"u{i}@t.com","username":"u{i}","actor":"a{i}"}}"#)
            } else {
                format!(r#"{{"email":"u{i}@t.com","username":"u{i}"}}"#)
            };
            rows.push(format!(
                r#"{{"dl":{{"m":{},"w":{}}},"dep":"{}","sc":{},"pkg":{{"name":"p{i}","kw":["j","t"],"ver":"{i}.0","pub":{publisher},"mnt":[{{"u":"u{i}"}}]{license},"links":{links}}},"score":{{"f":0.5,"d":{{"q":0.8}}}},"flags":{{"x":0}}}}"#,
                1000 * (i + 1),
                250 * (i + 1),
                i * 5,
                1697.0894 + i as f64 * 0.1,
            ));
        }
        let json = format!(r#"{{"objects":[{}],"total":250}}"#, rows.join(","));

        let input = json.as_bytes();

        for mode in [Mode::Fast, Mode::Balanced] {
            let (packed, chain) = preprocess(input, FormatHint::Json, mode);
            assert!(!chain.is_empty(), "should apply transforms in {mode} mode");
            let unpacked = reverse_preprocess(&packed, &chain);
            assert_eq!(
                unpacked.len(),
                input.len(),
                "length mismatch in {mode} mode",
            );
            assert_eq!(unpacked, input, "roundtrip failed in {mode} mode");
        }
    }
}