Skip to main content

preprocess

Function preprocess 

Source
pub fn preprocess(
    data: &[u8],
    format: FormatHint,
    mode: Mode,
) -> (Vec<u8>, TransformChain)
Expand description

Apply format-aware preprocessing transforms. Returns `(preprocessed_data, transform_chain)`; pass both to `reverse_preprocess` to restore the original bytes.

NDJSON columnar transform: applied in ALL modes (grouping similar values helps both zstd and CM). Key interning: applied in Balanced/Max only (in Fast mode it hurts, because it is redundant with what zstd already removes). For NDJSON, columnar is applied FIRST — if it succeeds, key interning is skipped, since the columnar transform has already removed the keys from the data stream.

Examples found in repository?
examples/test_pipeline.rs (line 10)
4fn main() {
5    let data = std::fs::read("corpus/test-ndjson.ndjson").unwrap();
6    let format = detect_format(&data);
7    eprintln!("Format: {:?}", format);
8    eprintln!("Original: {} bytes", data.len());
9
10    let (preprocessed, chain) = preprocess(&data, format, Mode::Balanced);
11    eprintln!("Preprocessed: {} bytes", preprocessed.len());
12    eprintln!("Transforms: {}", chain.records.len());
13    for (i, rec) in chain.records.iter().enumerate() {
14        eprintln!(
15            "  [{}] ID={} metadata={} bytes",
16            i,
17            rec.id,
18            rec.metadata.len()
19        );
20    }
21
22    let restored = reverse_preprocess(&preprocessed, &chain);
23    if restored == data {
24        eprintln!("Roundtrip: OK");
25    } else {
26        eprintln!("Roundtrip: FAILED ({} vs {})", restored.len(), data.len());
27    }
28}
More examples
Hide additional examples
examples/dump_transform.rs (line 31)
14fn main() {
15    let args: Vec<String> = env::args().collect();
16    if args.len() != 3 {
17        eprintln!("Usage: {} <input> <output>", args[0]);
18        process::exit(1);
19    }
20
21    let input_path = &args[1];
22    let output_path = &args[2];
23
24    let data = fs::read(input_path).expect("Failed to read input file");
25
26    // Detect format (try extension first, then content).
27    let format = datacortex_core::format::detect_from_extension(input_path)
28        .unwrap_or_else(|| detect_format(&data));
29
30    // Run preprocessing.
31    let (transformed, chain) = preprocess(&data, format, Mode::Balanced);
32
33    // Calculate metadata size.
34    let metadata_bytes = chain.serialize();
35    let metadata_size = metadata_bytes.len();
36
37    // Write transformed data to output.
38    fs::write(output_path, &transformed).expect("Failed to write output file");
39
40    // Report stats.
41    let name = std::path::Path::new(input_path)
42        .file_name()
43        .unwrap()
44        .to_str()
45        .unwrap();
46
47    eprintln!("=== {} ===", name);
48    eprintln!("Format:      {:?}", format);
49    eprintln!("Original:    {} bytes", data.len());
50    eprintln!("Transformed: {} bytes", transformed.len());
51    eprintln!("Metadata:    {} bytes", metadata_size);
52    eprintln!(
53        "Total (transform + meta): {} bytes",
54        transformed.len() + metadata_size
55    );
56    eprintln!(
57        "Transform ratio: {:.1}%",
58        (transformed.len() + metadata_size) as f64 / data.len() as f64 * 100.0
59    );
60
61    if transformed.len() == data.len() {
62        eprintln!("NOTE: No transform applied (output == input)");
63    }
64}
examples/trace_spacex.rs (line 11)
4fn main() {
5    let data = std::fs::read("/tmp/spacex.json").expect("read spacex.json");
6    let format = detect_format(&data);
7    eprintln!("Format: {:?}", format);
8    eprintln!("Original: {} bytes", data.len());
9
10    // Test Fast mode (where the bug is)
11    let (preprocessed, chain) = preprocess(&data, format, Mode::Fast);
12    eprintln!("Preprocessed: {} bytes", preprocessed.len());
13    eprintln!("Transforms: {}", chain.records.len());
14    for (i, rec) in chain.records.iter().enumerate() {
15        let name = match rec.id {
16            7 => "JSON_ARRAY_COLUMNAR",
17            15 => "NESTED_FLATTEN",
18            14 => "TYPED_ENCODING",
19            13 => "VALUE_DICT",
20            _ => "?",
21        };
22        eprintln!(
23            "  [{}] ID={} ({}) metadata={} bytes",
24            i,
25            rec.id,
26            name,
27            rec.metadata.len()
28        );
29    }
30
31    let restored = reverse_preprocess(&preprocessed, &chain);
32    if restored == data {
33        eprintln!("Full pipeline roundtrip: OK");
34    } else {
35        eprintln!(
36            "Full pipeline roundtrip: FAILED ({} vs {} bytes)",
37            restored.len(),
38            data.len()
39        );
40        // Find first difference
41        for (i, (a, b)) in restored.iter().zip(data.iter()).enumerate() {
42            if a != b {
43                let start = i.saturating_sub(20);
44                let end = (i + 20).min(restored.len()).min(data.len());
45                eprintln!("  First diff at byte {}", i);
46                eprintln!(
47                    "  Restored: {:?}",
48                    String::from_utf8_lossy(&restored[start..end])
49                );
50                eprintln!(
51                    "  Original: {:?}",
52                    String::from_utf8_lossy(&data[start..end])
53                );
54                break;
55            }
56        }
57        if restored.len() != data.len() {
58            eprintln!("  Length diff: {} vs {}", restored.len(), data.len());
59        }
60    }
61
62    // Test Balanced mode (should pass)
63    let (preprocessed_bal, chain_bal) = preprocess(&data, format, Mode::Balanced);
64    let restored_bal = reverse_preprocess(&preprocessed_bal, &chain_bal);
65    if restored_bal == data {
66        eprintln!("Balanced pipeline roundtrip: OK");
67    } else {
68        eprintln!("Balanced pipeline roundtrip: FAILED");
69    }
70}