pub fn preprocess(
data: &[u8],
format: FormatHint,
mode: Mode,
) -> (Vec<u8>, TransformChain)
Apply format-aware preprocessing transforms. Returns (preprocessed_data, transform_chain).
Which transforms run depends on the format and the mode:
- NDJSON columnar: applied in ALL modes (grouping similar values helps both zstd and CM).
- Key interning: applied in Balanced/Max only (it hurts Fast mode due to zstd redundancy).
For NDJSON, the columnar transform is applied FIRST — if it succeeds, key interning is skipped, because the keys have already been removed from the data stream by the columnar transform.
Examples found in repository?
examples/test_pipeline.rs (line 10)
4fn main() {
5 let data = std::fs::read("corpus/test-ndjson.ndjson").unwrap();
6 let format = detect_format(&data);
7 eprintln!("Format: {:?}", format);
8 eprintln!("Original: {} bytes", data.len());
9
10 let (preprocessed, chain) = preprocess(&data, format, Mode::Balanced);
11 eprintln!("Preprocessed: {} bytes", preprocessed.len());
12 eprintln!("Transforms: {}", chain.records.len());
13 for (i, rec) in chain.records.iter().enumerate() {
14 eprintln!(
15 " [{}] ID={} metadata={} bytes",
16 i,
17 rec.id,
18 rec.metadata.len()
19 );
20 }
21
22 let restored = reverse_preprocess(&preprocessed, &chain);
23 if restored == data {
24 eprintln!("Roundtrip: OK");
25 } else {
26 eprintln!("Roundtrip: FAILED ({} vs {})", restored.len(), data.len());
27 }
28}
More examples
examples/dump_transform.rs (line 31)
14fn main() {
15 let args: Vec<String> = env::args().collect();
16 if args.len() != 3 {
17 eprintln!("Usage: {} <input> <output>", args[0]);
18 process::exit(1);
19 }
20
21 let input_path = &args[1];
22 let output_path = &args[2];
23
24 let data = fs::read(input_path).expect("Failed to read input file");
25
26 // Detect format (try extension first, then content).
27 let format = datacortex_core::format::detect_from_extension(input_path)
28 .unwrap_or_else(|| detect_format(&data));
29
30 // Run preprocessing.
31 let (transformed, chain) = preprocess(&data, format, Mode::Balanced);
32
33 // Calculate metadata size.
34 let metadata_bytes = chain.serialize();
35 let metadata_size = metadata_bytes.len();
36
37 // Write transformed data to output.
38 fs::write(output_path, &transformed).expect("Failed to write output file");
39
40 // Report stats.
41 let name = std::path::Path::new(input_path)
42 .file_name()
43 .unwrap()
44 .to_str()
45 .unwrap();
46
47 eprintln!("=== {} ===", name);
48 eprintln!("Format: {:?}", format);
49 eprintln!("Original: {} bytes", data.len());
50 eprintln!("Transformed: {} bytes", transformed.len());
51 eprintln!("Metadata: {} bytes", metadata_size);
52 eprintln!(
53 "Total (transform + meta): {} bytes",
54 transformed.len() + metadata_size
55 );
56 eprintln!(
57 "Transform ratio: {:.1}%",
58 (transformed.len() + metadata_size) as f64 / data.len() as f64 * 100.0
59 );
60
61 if transformed.len() == data.len() {
62 eprintln!("NOTE: No transform applied (output == input)");
63 }
64}
examples/trace_spacex.rs (line 11)
4fn main() {
5 let data = std::fs::read("/tmp/spacex.json").expect("read spacex.json");
6 let format = detect_format(&data);
7 eprintln!("Format: {:?}", format);
8 eprintln!("Original: {} bytes", data.len());
9
10 // Test Fast mode (where the bug is)
11 let (preprocessed, chain) = preprocess(&data, format, Mode::Fast);
12 eprintln!("Preprocessed: {} bytes", preprocessed.len());
13 eprintln!("Transforms: {}", chain.records.len());
14 for (i, rec) in chain.records.iter().enumerate() {
15 let name = match rec.id {
16 7 => "JSON_ARRAY_COLUMNAR",
17 15 => "NESTED_FLATTEN",
18 14 => "TYPED_ENCODING",
19 13 => "VALUE_DICT",
20 _ => "?",
21 };
22 eprintln!(
23 " [{}] ID={} ({}) metadata={} bytes",
24 i,
25 rec.id,
26 name,
27 rec.metadata.len()
28 );
29 }
30
31 let restored = reverse_preprocess(&preprocessed, &chain);
32 if restored == data {
33 eprintln!("Full pipeline roundtrip: OK");
34 } else {
35 eprintln!(
36 "Full pipeline roundtrip: FAILED ({} vs {} bytes)",
37 restored.len(),
38 data.len()
39 );
40 // Find first difference
41 for (i, (a, b)) in restored.iter().zip(data.iter()).enumerate() {
42 if a != b {
43 let start = i.saturating_sub(20);
44 let end = (i + 20).min(restored.len()).min(data.len());
45 eprintln!(" First diff at byte {}", i);
46 eprintln!(
47 " Restored: {:?}",
48 String::from_utf8_lossy(&restored[start..end])
49 );
50 eprintln!(
51 " Original: {:?}",
52 String::from_utf8_lossy(&data[start..end])
53 );
54 break;
55 }
56 }
57 if restored.len() != data.len() {
58 eprintln!(" Length diff: {} vs {}", restored.len(), data.len());
59 }
60 }
61
62 // Test Balanced mode (should pass)
63 let (preprocessed_bal, chain_bal) = preprocess(&data, format, Mode::Balanced);
64 let restored_bal = reverse_preprocess(&preprocessed_bal, &chain_bal);
65 if restored_bal == data {
66 eprintln!("Balanced pipeline roundtrip: OK");
67 } else {
68 eprintln!("Balanced pipeline roundtrip: FAILED");
69 }
70}