Skip to main content

datacortex_core/format/
mod.rs

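//! Format detection and preprocessing pipeline.
//!
//! Phase 0: heuristic format detection (JSON / NDJSON / generic).
//! Phase 1: format-aware, reversible preprocessing — NDJSON and JSON-array columnar
//! reorganization, nested-object flattening, typed encoding (Fast mode), value
//! dictionary coding, and JSON key interning (Balanced/Max fallback).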
5
6pub mod json;
7pub mod json_array;
8pub mod ndjson;
9pub mod schema;
10pub mod transform;
11pub mod typed_encoding;
12pub mod value_dict;
13
14use crate::dcx::{FormatHint, Mode};
15use transform::{
16    TRANSFORM_JSON_ARRAY_COLUMNAR, TRANSFORM_JSON_KEY_INTERN, TRANSFORM_NDJSON_COLUMNAR,
17    TRANSFORM_NESTED_FLATTEN, TRANSFORM_TYPED_ENCODING, TRANSFORM_VALUE_DICT, TransformChain,
18};
19
20/// Detect file format from content bytes.
21pub fn detect_format(data: &[u8]) -> FormatHint {
22    if data.is_empty() {
23        return FormatHint::Generic;
24    }
25
26    let trimmed = trim_leading_whitespace(data);
27
28    if starts_with_byte(trimmed, b'{') || starts_with_byte(trimmed, b'[') {
29        if is_ndjson(data) {
30            return FormatHint::Ndjson;
31        }
32        return FormatHint::Json;
33    }
34
35    FormatHint::Generic
36}
37
38/// Detect format from file extension (fallback).
39pub fn detect_from_extension(path: &str) -> Option<FormatHint> {
40    let ext = path.rsplit('.').next()?.to_lowercase();
41    match ext.as_str() {
42        "json" => Some(FormatHint::Json),
43        "ndjson" | "jsonl" => Some(FormatHint::Ndjson),
44        _ => None,
45    }
46}
47
48/// Apply format-aware preprocessing transforms.
49/// Returns (preprocessed_data, transform_chain).
50///
51/// NDJSON columnar: ALL modes (grouping similar values helps both zstd and CM).
52/// Key interning: Balanced/Max only (hurts Fast mode due to zstd redundancy).
53/// For NDJSON, columnar is applied FIRST — if it succeeds, key interning is skipped
54/// (keys are already removed from the data stream by the columnar transform).
55pub fn preprocess(data: &[u8], format: FormatHint, mode: Mode) -> (Vec<u8>, TransformChain) {
56    let mut chain = TransformChain::new();
57    let mut current = data.to_vec();
58
59    // Track whether a uniform columnar transform was applied (for value dict chaining).
60    // Uniform columnar = data is \x00/\x01-separated, downstream transforms are compatible.
61    let mut columnar_applied = false;
62    // Track whether ANY ndjson transform was applied (uniform or grouped).
63    let mut ndjson_transform_applied = false;
64
65    // NDJSON columnar reorg: ALL modes (dramatic improvement for uniform NDJSON).
66    // Strategy 1 (uniform, version=1) produces \x00/\x01 separated columnar data.
67    // Strategy 2 (grouped, version=2) produces a different format with per-group data.
68    // Only Strategy 1 output is compatible with downstream typed_encoding/value_dict.
69    if format == FormatHint::Ndjson {
70        if let Some(result) = ndjson::preprocess(&current) {
71            let is_uniform_columnar = !result.metadata.is_empty() && result.metadata[0] == 1;
72            chain.push(TRANSFORM_NDJSON_COLUMNAR, result.metadata);
73            current = result.data;
74            ndjson_transform_applied = true;
75            columnar_applied = is_uniform_columnar;
76        }
77    }
78
79    // JSON array columnar reorg: ALL modes.
80    // Strategy 1 (uniform, version=1) produces \x00/\x01 separated columnar data.
81    // Strategy 2 (grouped, version=2) produces a different format with per-group data.
82    // Only Strategy 1 output is compatible with downstream typed_encoding/value_dict/nested_flatten.
83    let mut json_array_applied = false;
84    if !columnar_applied && !ndjson_transform_applied && format == FormatHint::Json {
85        if let Some(result) = json_array::preprocess(&current) {
86            let is_uniform = !result.metadata.is_empty() && result.metadata[0] == 1;
87            chain.push(TRANSFORM_JSON_ARRAY_COLUMNAR, result.metadata);
88            current = result.data;
89            json_array_applied = true;
90            columnar_applied = is_uniform;
91        }
92    }
93
94    // Nested flatten: decompose nested objects into sub-columns.
95    // Works on any \x00/\x01 columnar data. Only for non-NDJSON paths because
96    // the NDJSON uniform path already handles its own nested flatten internally.
97    if columnar_applied && !ndjson_transform_applied {
98        // Extract num_rows from the json_array metadata (offset 1, u32 LE).
99        let ja_meta = &chain.records.last().unwrap().metadata;
100        if ja_meta.len() >= 5 {
101            let num_rows = u32::from_le_bytes(ja_meta[1..5].try_into().unwrap()) as usize;
102            if let Some((flat_data, nested_groups)) =
103                ndjson::flatten_nested_columns(&current, num_rows)
104            {
105                // Build metadata: num_rows + total_flat_cols + serialized nested info.
106                let total_flat_cols = flat_data.split(|&b| b == 0x00).count() as u16;
107                let mut nested_meta = Vec::new();
108                nested_meta.extend_from_slice(&(num_rows as u32).to_le_bytes());
109                nested_meta.extend_from_slice(&total_flat_cols.to_le_bytes());
110                nested_meta.extend_from_slice(&ndjson::serialize_nested_info(&nested_groups));
111                chain.push(TRANSFORM_NESTED_FLATTEN, nested_meta);
112                current = flat_data;
113            }
114        }
115    }
116
117    // Typed encoding: Fast mode ONLY. CM mode doesn't benefit (gotcha #35 confirmed).
118    // Binary encoding disrupts CM's learned text patterns. But zstd benefits from
119    // smaller raw data (delta varints, boolean bitmaps).
120    if columnar_applied && mode == Mode::Fast {
121        if let Some(result) = typed_encoding::preprocess(&current) {
122            chain.push(TRANSFORM_TYPED_ENCODING, result.metadata);
123            current = result.data;
124        }
125    }
126
127    // Value dictionary: chain AFTER any columnar transform.
128    // Replaces repeated multi-byte values with single-byte codes.
129    // Only applies to columnar data (uses \x00/\x01 separators).
130    // NOTE: value dict only operates on \x00/\x01-separated data.
131    // If typed encoding was applied, the data is now binary (no separators),
132    // so value dict will naturally not apply (it won't find separators to split on,
133    // or its size check will fail).
134    if columnar_applied {
135        if let Some(result) = value_dict::preprocess(&current) {
136            chain.push(TRANSFORM_VALUE_DICT, result.metadata);
137            current = result.data;
138        }
139    }
140
141    if columnar_applied || ndjson_transform_applied || json_array_applied {
142        return (current, chain);
143    }
144
145    // JSON key interning: Balanced/Max only (hurts Fast mode due to zstd redundancy).
146    if matches!(mode, Mode::Balanced | Mode::Max)
147        && matches!(format, FormatHint::Json | FormatHint::Ndjson)
148        && let Some(result) = json::preprocess(&current)
149    {
150        chain.push(TRANSFORM_JSON_KEY_INTERN, result.metadata);
151        current = result.data;
152    }
153
154    (current, chain)
155}
156
157/// Reverse preprocessing transforms (applied in reverse order).
158pub fn reverse_preprocess(data: &[u8], chain: &TransformChain) -> Vec<u8> {
159    let mut current = data.to_vec();
160
161    // Apply in reverse order.
162    for record in chain.records.iter().rev() {
163        match record.id {
164            TRANSFORM_JSON_KEY_INTERN => {
165                current = json::reverse(&current, &record.metadata);
166            }
167            TRANSFORM_NDJSON_COLUMNAR => {
168                current = ndjson::reverse(&current, &record.metadata);
169            }
170            TRANSFORM_JSON_ARRAY_COLUMNAR => {
171                current = json_array::reverse(&current, &record.metadata);
172            }
173            TRANSFORM_VALUE_DICT => {
174                current = value_dict::reverse(&current, &record.metadata);
175            }
176            TRANSFORM_TYPED_ENCODING => {
177                current = typed_encoding::reverse(&current, &record.metadata);
178            }
179            TRANSFORM_NESTED_FLATTEN => {
180                // Metadata: num_rows (u32 LE) + total_flat_cols (u16 LE) + nested_info.
181                if record.metadata.len() >= 6 {
182                    let num_rows =
183                        u32::from_le_bytes(record.metadata[0..4].try_into().unwrap()) as usize;
184                    let total_flat_cols =
185                        u16::from_le_bytes(record.metadata[4..6].try_into().unwrap()) as usize;
186                    if let Some((nested_groups, _)) =
187                        ndjson::deserialize_nested_info(&record.metadata[6..])
188                    {
189                        current = ndjson::unflatten_nested_columns(
190                            &current,
191                            &nested_groups,
192                            num_rows,
193                            total_flat_cols,
194                        );
195                    }
196                }
197            }
198            _ => {} // Unknown/legacy transform — skip.
199        }
200    }
201
202    current
203}
204
205// --- Detection helpers (unchanged from Phase 0) ---
206
/// Return `data` without its leading ASCII whitespace (as defined by
/// `u8::is_ascii_whitespace`); an all-whitespace or empty input yields an empty slice.
fn trim_leading_whitespace(data: &[u8]) -> &[u8] {
    data.trim_ascii_start()
}
214
/// True when the first byte of `data` equals `byte`; always false for empty input.
fn starts_with_byte(data: &[u8], byte: u8) -> bool {
    data.starts_with(&[byte])
}
218
219fn is_ndjson(data: &[u8]) -> bool {
220    let mut json_lines = 0;
221    let mut total_lines = 0;
222
223    for line in data.split(|&b| b == b'\n') {
224        let trimmed = trim_leading_whitespace(line);
225        if trimmed.is_empty() {
226            continue;
227        }
228        total_lines += 1;
229        if starts_with_byte(trimmed, b'{') {
230            json_lines += 1;
231        }
232    }
233
234    total_lines >= 2 && json_lines as f64 / total_lines as f64 > 0.8
235}
236
#[cfg(test)]
mod tests {
    // Tests for format detection and for byte-exact preprocess -> reverse roundtrips.
    use super::*;

    #[test]
    fn detect_json() {
        assert_eq!(detect_format(b"  {\"key\": \"value\"}"), FormatHint::Json);
        assert_eq!(detect_format(b"[1, 2, 3]"), FormatHint::Json);
    }

    #[test]
    fn detect_ndjson() {
        let data = b"{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n";
        assert_eq!(detect_format(data), FormatHint::Ndjson);
    }

    #[test]
    fn detect_generic_fallback() {
        assert_eq!(detect_format(b""), FormatHint::Generic);
        assert_eq!(detect_format(b"just some random text"), FormatHint::Generic);
    }

    #[test]
    fn extension_detection() {
        assert_eq!(detect_from_extension("test.json"), Some(FormatHint::Json));
        assert_eq!(
            detect_from_extension("data.ndjson"),
            Some(FormatHint::Ndjson)
        );
        assert_eq!(detect_from_extension("file.txt"), None);
    }

    #[test]
    fn preprocess_json_key_interning() {
        // Single JSON object with repeated keys: no array to columnarize, so Balanced
        // mode is expected to fall through to key interning.
        let data = br#"{"name":"Alice","age":30,"name":"Bob","age":25}"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Balanced);
        assert!(!chain.is_empty(), "should have applied key interning");
        assert!(
            preprocessed.len() < data.len(),
            "preprocessed should be smaller"
        );

        // Reverse and verify.
        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn preprocess_ndjson_columnar() {
        let data = br#"{"ts":"a","val":1}
{"ts":"b","val":2}
{"ts":"c","val":3}
"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Balanced);
        assert!(!chain.is_empty());
        // Should use the NDJSON columnar transform, not key interning.
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_NDJSON_COLUMNAR,
            "NDJSON should use columnar transform"
        );

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn preprocess_ndjson_columnar_fast_mode() {
        // Columnar should apply for ALL modes, including Fast.
        let data = br#"{"ts":"a","val":1}
{"ts":"b","val":2}
{"ts":"c","val":3}
"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(chain.records[0].id, transform::TRANSFORM_NDJSON_COLUMNAR);

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());

        // Verify columnar data groups values.
        let cols: Vec<&[u8]> = preprocessed.split(|&b| b == 0x00).collect();
        assert_eq!(cols.len(), 2, "should have 2 columns");
    }

    #[test]
    fn preprocess_json_array_columnar() {
        let data = br#"{"data": [{"id": 1, "type": "a"}, {"id": 2, "type": "b"}, {"id": 3, "type": "c"}, {"id": 4, "type": "d"}, {"id": 5, "type": "e"}], "meta": {"count": 5}}"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Balanced);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
            "JSON with array should use array columnar transform"
        );

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn preprocess_json_array_too_few_falls_through() {
        // Only 3 elements — below MIN_ROWS, should fall through to key interning.
        let data = br#"{"data": [{"id": 1, "type": "a"}, {"id": 2, "type": "a"}, {"id": 3, "type": "a"}], "meta": {"count": 3}, "data2": [{"id": 1, "type": "a"}, {"id": 2, "type": "a"}, {"id": 3, "type": "a"}]}"#;
        let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Balanced);
        // Should fall through to key interning (not array columnar).
        if !chain.is_empty() {
            assert_ne!(
                chain.records[0].id,
                transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
                "3 elements should NOT trigger array columnar"
            );
        }

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn preprocess_non_json_passthrough() {
        let data = b"just some plain text with no JSON keys";
        let (preprocessed, chain) = preprocess(data, FormatHint::Generic, Mode::Fast);
        assert!(chain.is_empty());
        assert_eq!(preprocessed, data.to_vec());
    }

    #[test]
    fn test_json_array_nested_flatten_roundtrip() {
        // JSON array with nested objects — should apply json_array columnar + nested flatten.
        let mut json = String::from(r#"{"data": ["#);
        for i in 0..10 {
            if i > 0 {
                json.push_str(", ");
            }
            json.push_str(&format!(
                r#"{{"id": {}, "name": "item_{}", "meta": {{"score": {}, "active": {}, "tag": "t{}"}}}}"#,
                i, i, i * 10, if i % 2 == 0 { "true" } else { "false" }, i
            ));
        }
        json.push_str(r#"], "total": 10}"#);

        let data = json.as_bytes();
        let (preprocessed, chain) = preprocess(data, FormatHint::Json, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_JSON_ARRAY_COLUMNAR,
            "should apply json_array columnar first"
        );

        // Check that nested flatten was applied.
        let has_nested_flatten = chain
            .records
            .iter()
            .any(|r| r.id == transform::TRANSFORM_NESTED_FLATTEN);
        assert!(
            has_nested_flatten,
            "should apply nested flatten for objects with nested fields"
        );

        // Verify byte-exact roundtrip (lossy-UTF-8 comparison first for a readable diff).
        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(
            String::from_utf8_lossy(&restored),
            String::from_utf8_lossy(data),
        );
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn test_json_array_nested_flatten_improves_ratio() {
        // Build a dataset where nested flatten demonstrably helps:
        // many rows with a nested object having repeated/similar values.
        // Note: no compression ratio is measured here; the column count is the proxy.
        let mut json = String::from(r#"{"items": ["#);
        for i in 0..50 {
            if i > 0 {
                json.push_str(", ");
            }
            json.push_str(&format!(
                r#"{{"id": {}, "user": {{"name": "user_{}", "role": "admin", "level": {}, "verified": true, "email": "user_{}@test.com"}}}}"#,
                i, i, i % 5, i
            ));
        }
        json.push_str(r#"]}"#);

        let data = json.as_bytes();

        // Preprocess WITH nested flatten (current code).
        let (preprocessed_with, chain_with) = preprocess(data, FormatHint::Json, Mode::Fast);
        assert!(
            chain_with
                .records
                .iter()
                .any(|r| r.id == transform::TRANSFORM_NESTED_FLATTEN),
            "nested flatten should activate"
        );

        // Verify roundtrip.
        let restored = reverse_preprocess(&preprocessed_with, &chain_with);
        assert_eq!(restored, data.to_vec());

        // The preprocessed data should have more columns (sub-columns from nested objects).
        let num_cols_with = preprocessed_with.split(|&b| b == 0x00).count();
        // Without nested flatten, json_array produces 2 columns (id, user).
        // With nested flatten, user is decomposed into 5 sub-columns, so total = 1 + 5 = 6.
        // Only `> 2` is asserted, since the exact count depends on the flatten layout.
        assert!(
            num_cols_with > 2,
            "nested flatten should produce more columns: got {}",
            num_cols_with
        );
    }

    #[test]
    fn test_ndjson_unaffected() {
        // NDJSON with nested objects — should use NDJSON path, NOT the standalone nested flatten.
        let mut ndjson = String::new();
        for i in 0..10 {
            ndjson.push_str(&format!(
                r#"{{"id":{},"user":{{"name":"u{}","level":{}}}}}"#,
                i,
                i,
                i % 3
            ));
            ndjson.push('\n');
        }

        let data = ndjson.as_bytes();
        let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Fast);
        assert!(!chain.is_empty());
        assert_eq!(
            chain.records[0].id,
            transform::TRANSFORM_NDJSON_COLUMNAR,
            "NDJSON should use its own columnar transform"
        );

        // Should NOT have standalone TRANSFORM_NESTED_FLATTEN in chain.
        let has_standalone_nested = chain
            .records
            .iter()
            .any(|r| r.id == transform::TRANSFORM_NESTED_FLATTEN);
        assert!(
            !has_standalone_nested,
            "NDJSON path should NOT use standalone nested flatten (it handles nesting internally)"
        );

        // Verify roundtrip.
        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec());
    }

    #[test]
    fn test_ndjson_large_delta_integer_roundtrip() {
        // Regression: NDJSON with integers spanning the epoch-timestamp range
        // (e.g. 2147483647 = i32::MAX) caused schema misclassification and
        // CRC-32 mismatch on decompression in Fast mode.
        let edges: &[i64] = &[
            0, -1, 1, -2147483648, 2147483647, -9007199254740991, 9007199254740991,
        ];
        let mut ndjson = String::new();
        for i in 0..203 {
            ndjson.push_str(&format!(
                "{{\"val\":{},\"idx\":{}}}\n",
                edges[i % 7],
                i
            ));
        }

        let data = ndjson.as_bytes();

        // Full pipeline roundtrip (ndjson columnar + typed encoding in Fast mode).
        let (preprocessed, chain) = preprocess(data, FormatHint::Ndjson, Mode::Fast);

        // Verify typed encoding was applied.
        assert!(
            chain
                .records
                .iter()
                .any(|r| r.id == transform::TRANSFORM_TYPED_ENCODING),
            "typed encoding should be applied in Fast mode"
        );

        let restored = reverse_preprocess(&preprocessed, &chain);
        assert_eq!(restored, data.to_vec(), "byte-exact roundtrip failed");
    }
}