Skip to main content

datacortex_core/
codec.rs

1//! Codec orchestrator — compress and decompress through the DataCortex pipeline.
2//!
3//! Phase 1: Format preprocessing + zstd (Fast mode).
4//! Phase 3: Full CM engine with higher-order models + mixer + APM (Balanced mode, ~256MB).
5//! Phase 5: Full CM engine with 2x context maps (Max mode, ~512MB).
6//! Phase 6: Dual-path CM + LLM with MetaMixer (Max mode with `neural` feature).
7//! Phase 7: Dual-path CM + GRU byte-level predictor (Balanced mode).
8
9use std::io::{self, Cursor, Read, Write};
10
11use crate::dcx::{DcxHeader, FormatHint, Mode};
12use crate::entropy::arithmetic::{ArithmeticDecoder, ArithmeticEncoder};
13use crate::format::transform::TransformChain;
14use crate::format::{detect_format, preprocess, reverse_preprocess};
15use crate::mixer::MetaMixer;
16use crate::model::gru_model::GruModel;
17use crate::model::{CMConfig, CMEngine};
18
/// Choose the zstd level Fast mode uses for a payload of `data_size` bytes.
///
/// An explicit `level_override` (the user's `--level`) always wins. Otherwise
/// the level steps down as the payload grows, since high levels are cheap on
/// small inputs but expensive on large ones:
///
/// | payload size       | level |
/// |--------------------|-------|
/// | ≤ 16 MiB           | 19    |
/// | > 16 MiB, ≤ 64 MiB | 16    |
/// | > 64 MiB           | 9     |
fn adaptive_fast_level(data_size: usize, level_override: Option<i32>) -> i32 {
    // Inclusive upper bounds of the two high-level tiers.
    const BEST_RATIO_LIMIT: usize = 16 * 1024 * 1024;
    const BTULTRA_LIMIT: usize = 64 * 1024 * 1024;

    // Empirical rationale:
    // - Levels 9-15 give nearly identical ratios on structured JSON (the
    //   btlazy2 plateau); the real jump is at 16+ (btultra), so 10-15 only
    //   cost encode time.
    // - DataCortex benchmarks against zstd-19; preprocessing adds ~3-5%, but
    //   the internal zstd still needs level 17+ to beat raw zstd-19 on diverse
    //   data such as GH Archive.
    // - Encode time is dominated by preprocessing (with rayon parallelism the
    //   level cost is marginal), and decode is unaffected by the level.
    level_override.unwrap_or_else(|| match data_size {
        n if n <= BEST_RATIO_LIMIT => 19, // best ratio, <3s encode on 10MB
        n if n <= BTULTRA_LIMIT => 16,    // btultra breakpoint, good ratio
        _ => 9,                           // skip the 10-15 plateau, stay fast
    })
}
46
47// ─── Zstd Dictionary Training (Fast mode) ─────────────────────────────────────
48
/// Smallest preprocessed payload, in bytes, for which dictionary training is
/// attempted. Below 8 KiB the dictionary's own overhead outweighs any savings.
const DICT_MIN_DATA_SIZE: usize = 8 * 1024;
52
/// Chunk length used when splitting preprocessed data for per-chunk
/// dictionary compression.
///
/// Each chunk is compressed independently with the shared dictionary. Smaller
/// chunks benefit more from dictionary priming, but every chunk also pays
/// framing overhead (a 4-byte size prefix plus a zstd frame header of roughly
/// 10 bytes). The size therefore grows with the input to avoid too many chunks:
///
/// * more than 4 MiB   → 128 KiB
/// * more than 1 MiB   → 64 KiB
/// * more than 256 KiB → 32 KiB
/// * otherwise         → 16 KiB
fn dict_chunk_size(data_len: usize) -> usize {
    const KIB: usize = 1024;
    const MIB: usize = 1024 * KIB;

    match data_len {
        n if n > 4 * MIB => 128 * KIB,
        n if n > MIB => 64 * KIB,
        n if n > 256 * KIB => 32 * KIB,
        _ => 16 * KIB,
    }
}
69
/// Upper bound, in bytes, on the dictionary trained for `data_len` bytes of input.
///
/// Kept deliberately small: the dictionary only primes each chunk's
/// compressor context, and even a few KiB captures most of that benefit while
/// keeping per-file overhead low.
fn dict_max_size(data_len: usize) -> usize {
    // (exclusive lower threshold on data_len, dictionary size), largest first.
    const TIERS: [(usize, usize); 2] = [(4_194_304, 16_384), (1_048_576, 8_192)];
    const SMALL_INPUT_DICT: usize = 4_096;

    TIERS
        .iter()
        .find(|&&(threshold, _)| data_len > threshold)
        .map_or(SMALL_INPUT_DICT, |&(_, size)| size)
}
82
83/// Generate training samples from the data for dictionary training.
84///
85/// Uses column boundaries (0x00 separators) if available, otherwise fixed blocks.
86/// These samples are only used for `zstd::dict::from_samples`, NOT for the
87/// actual chunked compression (which uses `split_into_chunks`).
88fn generate_training_samples(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
89    // Try column boundaries (0x00 separators from columnar transform).
90    let col_chunks: Vec<&[u8]> = data.split(|&b| b == 0x00).collect();
91    if col_chunks.len() >= 5 {
92        let non_empty: Vec<&[u8]> = col_chunks.into_iter().filter(|c| !c.is_empty()).collect();
93        // Validate that the split produced reasonable samples. If the data is
94        // typed-encoded binary (not columnar text), 0x00 bytes are varint
95        // zeros, not column separators. Splitting on them creates thousands
96        // of tiny fragments that crash zstd dictionary training. Require
97        // non-empty samples with a minimum average size of 8 bytes.
98        if !non_empty.is_empty() {
99            let avg_len = non_empty.iter().map(|c| c.len()).sum::<usize>() / non_empty.len();
100            if avg_len >= 8 {
101                return non_empty;
102            }
103        }
104    }
105
106    // Fall back to fixed-size blocks for training.
107    split_into_chunks(data, chunk_size)
108}
109
/// Split `data` into consecutive chunks of `chunk_size` bytes for per-chunk
/// compression.
///
/// Every byte is preserved exactly and in order: concatenating the returned
/// slices reproduces `data`. All chunks are `chunk_size` long except possibly
/// the last, which holds the remainder. Empty `data` yields no chunks.
///
/// # Panics
///
/// Panics if `chunk_size` is 0. The previous hand-rolled loop never advanced
/// in that case and appended empty slices forever.
fn split_into_chunks(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
    assert!(chunk_size > 0, "split_into_chunks: chunk_size must be non-zero");
    data.chunks(chunk_size).collect()
}
122
123/// Attempt chunk-based dictionary compression.
124///
125/// 1. Split data into chunks
126/// 2. Train a zstd dictionary on the chunks
127/// 3. Compress each chunk independently using the trained dictionary
128/// 4. Return the dict + all compressed chunks as a payload
129///
130/// Returns `Some(payload)` if the total is smaller than `plain_size`, else `None`.
131fn try_dict_compress(data: &[u8], level: i32, plain_size: usize) -> Option<Vec<u8>> {
132    let chunk_size = dict_chunk_size(data.len());
133
134    // Generate training samples (may use column boundaries for better diversity).
135    let training_samples = generate_training_samples(data, chunk_size);
136    if training_samples.len() < 5 {
137        return None;
138    }
139
140    let max_dict = dict_max_size(data.len());
141
142    // Train dictionary from the training samples.
143    let dict = zstd::dict::from_samples(&training_samples, max_dict).ok()?;
144    if dict.is_empty() {
145        return None;
146    }
147
148    // Split data into fixed-size chunks for per-chunk compression.
149    let chunks = split_into_chunks(data, chunk_size);
150
151    // Compress each chunk independently with the dictionary.
152    let mut compressor = zstd::bulk::Compressor::with_dictionary(level, &dict).ok()?;
153    let mut compressed_chunks: Vec<Vec<u8>> = Vec::with_capacity(chunks.len());
154    for chunk in &chunks {
155        let cc = compressor.compress(chunk).ok()?;
156        compressed_chunks.push(cc);
157    }
158
159    // Build payload:
160    //   [dict_size: u32 LE] [dict_bytes]
161    //   [num_chunks: u32 LE]
162    //   for each chunk: [chunk_compressed_size: u32 LE] [chunk_data]
163    let total_compressed: usize = compressed_chunks.iter().map(|c| 4 + c.len()).sum();
164    let payload_size = 4 + dict.len() + 4 + total_compressed;
165
166    // Only use dict if it beats plain compression.
167    if payload_size >= plain_size {
168        return None;
169    }
170
171    let mut payload = Vec::with_capacity(payload_size);
172    payload.extend_from_slice(&(dict.len() as u32).to_le_bytes());
173    payload.extend_from_slice(&dict);
174    payload.extend_from_slice(&(compressed_chunks.len() as u32).to_le_bytes());
175    for cc in &compressed_chunks {
176        payload.extend_from_slice(&(cc.len() as u32).to_le_bytes());
177        payload.extend_from_slice(cc);
178    }
179
180    Some(payload)
181}
182
183/// Decompress a chunk-based dictionary-compressed payload.
184///
185/// Payload format:
186///   [dict_size: u32 LE] [dict_bytes]
187///   [num_chunks: u32 LE]
188///   for each chunk: [chunk_compressed_size: u32 LE] [chunk_data]
189///
190/// Chunks are decompressed individually and concatenated.
191fn decompress_with_dict(payload: &[u8], capacity: usize) -> std::io::Result<Vec<u8>> {
192    if payload.len() < 4 {
193        return Err(io::Error::new(
194            io::ErrorKind::InvalidData,
195            "dict payload too short for dict_size",
196        ));
197    }
198    let mut pos = 0;
199
200    // Read dictionary.
201    let dict_size =
202        u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
203    pos += 4;
204    if payload.len() < pos + dict_size {
205        return Err(io::Error::new(
206            io::ErrorKind::InvalidData,
207            "dict payload truncated: dictionary bytes",
208        ));
209    }
210    let dict_bytes = &payload[pos..pos + dict_size];
211    pos += dict_size;
212
213    // Read number of chunks.
214    if payload.len() < pos + 4 {
215        return Err(io::Error::new(
216            io::ErrorKind::InvalidData,
217            "dict payload truncated: num_chunks",
218        ));
219    }
220    let num_chunks =
221        u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
222    pos += 4;
223
224    // Prepare decompressor with dictionary.
225    let mut decompressor = zstd::bulk::Decompressor::with_dictionary(dict_bytes)
226        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
227
228    let mut output = Vec::with_capacity(capacity);
229
230    for i in 0..num_chunks {
231        if payload.len() < pos + 4 {
232            return Err(io::Error::new(
233                io::ErrorKind::InvalidData,
234                format!("dict payload truncated: chunk {i} size"),
235            ));
236        }
237        let chunk_size =
238            u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
239        pos += 4;
240        if payload.len() < pos + chunk_size {
241            return Err(io::Error::new(
242                io::ErrorKind::InvalidData,
243                format!("dict payload truncated: chunk {i} data"),
244            ));
245        }
246        let chunk_data = &payload[pos..pos + chunk_size];
247        pos += chunk_size;
248
249        // Each chunk decompresses to at most chunk_size + some headroom.
250        let chunk_capacity = capacity.saturating_sub(output.len());
251        let decompressed = decompressor
252            .decompress(chunk_data, chunk_capacity)
253            .map_err(|e| {
254                io::Error::new(
255                    io::ErrorKind::InvalidData,
256                    format!("chunk {i} decompress failed: {e}"),
257                )
258            })?;
259        output.extend_from_slice(&decompressed);
260    }
261
262    Ok(output)
263}
264
265// ─── Brotli helpers (Fast mode auto-fallback) ─────────────────────────────────
266
/// `brotli_compress` mode selector 0: generic mode. This is the default and
/// the best fit for binary or preprocessed data.
const BROTLI_MODE_GENERIC: u32 = 0;
/// `brotli_compress` mode selector 1: text mode, tuned for UTF-8 input such as
/// raw JSON.
const BROTLI_MODE_TEXT: u32 = 1;
272
273/// Compress `data` with brotli at the given quality (0-11) and mode.
274/// Use `BROTLI_MODE_TEXT` for raw UTF-8/JSON, `BROTLI_MODE_GENERIC` for preprocessed data.
275fn brotli_compress(data: &[u8], quality: u32, mode: u32) -> io::Result<Vec<u8>> {
276    use brotli::enc::backward_references::BrotliEncoderMode;
277    let mut output = Vec::new();
278    let brotli_mode = match mode {
279        1 => BrotliEncoderMode::BROTLI_MODE_TEXT,
280        _ => BrotliEncoderMode::BROTLI_MODE_GENERIC,
281    };
282    let params = brotli::enc::BrotliEncoderParams {
283        quality: quality as i32,
284        mode: brotli_mode,
285        ..Default::default()
286    };
287    brotli::BrotliCompress(&mut io::Cursor::new(data), &mut output, &params)?;
288    Ok(output)
289}
290
291/// Decompress a brotli stream. `max_size` is a capacity hint for the output buffer.
292fn brotli_decompress(data: &[u8]) -> io::Result<Vec<u8>> {
293    let mut output = Vec::new();
294    brotli::BrotliDecompress(&mut io::Cursor::new(data), &mut output)?;
295    Ok(output)
296}
297
298/// Compress data using the CM engine with the given configuration.
299/// Returns the compressed byte stream.
300fn cm_compress(data: &[u8], config: CMConfig) -> Vec<u8> {
301    let mut engine = CMEngine::with_config(config);
302    let mut encoder = ArithmeticEncoder::new();
303
304    for &byte in data {
305        for bpos in 0..8 {
306            let bit = (byte >> (7 - bpos)) & 1;
307            let p = engine.predict();
308            encoder.encode(bit, p);
309            engine.update(bit);
310        }
311    }
312
313    encoder.finish()
314}
315
316/// Decompress data using the CM engine with the given configuration.
317/// `compressed` is the arithmetic-coded stream, `original_size` is the expected output length.
318fn cm_decompress(compressed: &[u8], original_size: usize, config: CMConfig) -> Vec<u8> {
319    let mut engine = CMEngine::with_config(config);
320    let mut decoder = ArithmeticDecoder::new(compressed);
321    let mut output = Vec::with_capacity(original_size);
322
323    for _ in 0..original_size {
324        let mut byte_val: u8 = 0;
325        for bpos in 0..8 {
326            let p = engine.predict();
327            let bit = decoder.decode(p);
328            engine.update(bit);
329            byte_val |= bit << (7 - bpos);
330        }
331        output.push(byte_val);
332    }
333
334    output
335}
336
337// ─── GRU dual-path (CM + GRU byte predictor) ────────────────────────────────
338// The GRU provides a DIFFERENT signal from CM: byte-level cross-bit correlations.
339// It's blended AFTER the full CM pipeline via MetaMixer.
340// CRITICAL: encoder and decoder must produce IDENTICAL GRU + CM state.
341
342/// Compress using dual-path: CM engine + GRU byte predictor + MetaMixer.
343/// Used for Balanced mode.
344fn gru_compress(data: &[u8], config: CMConfig) -> Vec<u8> {
345    let mut engine = CMEngine::with_config(config);
346    let mut gru = GruModel::new();
347    let mut meta_mixer = MetaMixer::new(12); // 12% GRU weight
348    let mut encoder = ArithmeticEncoder::new();
349
350    let total_bytes = data.len();
351    let report_interval = if total_bytes > 100_000 {
352        total_bytes / 20
353    } else {
354        0
355    };
356
357    for (byte_idx, &byte) in data.iter().enumerate() {
358        for bpos in 0..8u8 {
359            let bit = (byte >> (7 - bpos)) & 1;
360
361            // CM prediction (full pipeline: 19 models + mixer + 7 APM).
362            let p_cm = engine.predict();
363
364            // GRU bit prediction from cached byte probs.
365            let partial = if bpos == 0 {
366                1u32
367            } else {
368                let mut p = 1u32;
369                for prev_bpos in 0..bpos {
370                    let prev_bit = (byte >> (7 - prev_bpos)) & 1;
371                    p = (p << 1) | prev_bit as u32;
372                }
373                p
374            };
375            let p_gru = gru.predict_bit(bpos, partial);
376
377            // MetaMixer blend.
378            let p_final = meta_mixer.blend(p_cm, p_gru);
379
380            encoder.encode(bit, p_final);
381            engine.update(bit);
382            meta_mixer.update(bit);
383        }
384
385        // Byte complete: train GRU on observed byte, then forward for next prediction.
386        gru.train(byte);
387        gru.forward(byte);
388
389        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
390            let pct = (byte_idx + 1) * 100 / total_bytes;
391            eprint!("\r[gru] compressing... {pct}%");
392        }
393    }
394
395    if total_bytes > 100_000 {
396        eprintln!("\r[gru] compressing... 100%");
397    }
398
399    encoder.finish()
400}
401
402/// Decompress using dual-path: CM engine + GRU byte predictor + MetaMixer.
403/// Must produce IDENTICAL GRU + CM state as the encoder.
404fn gru_decompress(compressed: &[u8], original_size: usize, config: CMConfig) -> Vec<u8> {
405    let mut engine = CMEngine::with_config(config);
406    let mut gru = GruModel::new();
407    let mut meta_mixer = MetaMixer::new(12); // same 12% as encoder
408    let mut decoder = ArithmeticDecoder::new(compressed);
409    let mut output = Vec::with_capacity(original_size);
410
411    let report_interval = if original_size > 100_000 {
412        original_size / 20
413    } else {
414        0
415    };
416
417    for byte_idx in 0..original_size {
418        let mut byte_val: u8 = 0;
419
420        for bpos in 0..8u8 {
421            // CM prediction.
422            let p_cm = engine.predict();
423
424            // GRU bit prediction (same partial byte state as encoder).
425            let partial = if bpos == 0 {
426                1u32
427            } else {
428                let mut p = 1u32;
429                for prev_bpos in 0..bpos {
430                    let prev_bit = (byte_val >> (7 - prev_bpos)) & 1;
431                    p = (p << 1) | prev_bit as u32;
432                }
433                p
434            };
435            let p_gru = gru.predict_bit(bpos, partial);
436
437            // MetaMixer blend.
438            let p_final = meta_mixer.blend(p_cm, p_gru);
439
440            let bit = decoder.decode(p_final);
441            engine.update(bit);
442            meta_mixer.update(bit);
443            byte_val |= bit << (7 - bpos);
444        }
445
446        output.push(byte_val);
447
448        // Byte complete: train GRU then forward (same as encoder).
449        gru.train(byte_val);
450        gru.forward(byte_val);
451
452        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
453            let pct = (byte_idx + 1) * 100 / original_size;
454            eprint!("\r[gru] decompressing... {pct}%");
455        }
456    }
457
458    if original_size > 100_000 {
459        eprintln!("\r[gru] decompressing... 100%");
460    }
461
462    output
463}
464
465// ─── Neural dual-path (CM + LLM) ─────────────────────────────────────────────
466// Feature-gated: only available when `neural` is enabled.
467// The LLM predictor runs alongside the CM engine. A MetaMixer blends them.
468// CRITICAL: encoder and decoder must produce IDENTICAL LLM + CM state.
469
/// Encode `data` with the Max-mode neural dual path: CM engine predictions
/// blended per bit with LLM predictions by `meta_mixer`.
///
/// Bits are coded MSB-first. After byte N is coded it is fed to the LLM via
/// `predict_byte_probs`, so the LLM's next bit predictions refer to byte N+1.
/// Nothing is fed before the first byte, so byte 0 is coded with whatever
/// `llm` predicts in the state it was passed in. `neural_decompress` must be
/// given an `llm` and `meta_mixer` in that same starting state.
///
/// LLM errors are not fatal: errors on the first 5 bytes are logged to stderr
/// and coding continues. Progress is printed to stderr for inputs larger than
/// 1000 bytes.
#[cfg(feature = "neural")]
fn neural_compress(
    data: &[u8],
    config: CMConfig,
    llm: &mut datacortex_neural::LlmPredictor,
    meta_mixer: &mut datacortex_neural::MetaMixer,
) -> Vec<u8> {
    const PROGRESS_MIN_BYTES: usize = 1000;

    let mut engine = CMEngine::with_config(config);
    let mut encoder = ArithmeticEncoder::new();

    let total_bytes = data.len();
    // Intermediate reports use the same size threshold as the final "100%"
    // line, so a started progress line is always terminated with a newline.
    let show_progress = total_bytes > PROGRESS_MIN_BYTES;
    let report_interval = if show_progress { total_bytes / 20 } else { 0 }; // every 5%

    for (byte_idx, &byte) in data.iter().enumerate() {
        // Partial byte (c0) for the LLM: starts at 1 and accumulates this
        // byte's already-coded bits.
        let mut partial = 1u32;

        for bpos in 0..8u8 {
            let bit = (byte >> (7 - bpos)) & 1;

            let p_cm = engine.predict();
            let p_llm = llm.predict_bit(bpos, partial);
            let p_final = meta_mixer.blend(p_cm, p_llm);

            encoder.encode(bit, p_final);
            engine.update(bit);
            meta_mixer.update(bit);

            partial = (partial << 1) | u32::from(bit);
        }

        // Feed the completed byte to the LLM for next-byte prediction.
        if let Err(e) = llm.predict_byte_probs(byte) {
            // Log but don't abort. NOTE(review): what the LLM predicts after a
            // failure is up to `LlmPredictor`; the decoder must hit the same
            // failure at the same byte for the streams to stay in sync.
            if byte_idx < 5 {
                eprintln!("[neural] LLM predict error at byte {byte_idx}: {e}");
            }
        }

        let done = byte_idx + 1;
        if report_interval > 0 && done % report_interval == 0 {
            let pct = done * 100 / total_bytes;
            eprint!("\r[neural] compressing... {pct}%");
        }
    }

    if show_progress {
        eprintln!("\r[neural] compressing... 100%");
    }

    encoder.finish()
}
545
/// Decode `original_size` bytes produced by `neural_compress`.
///
/// The CM engine, `llm` and `meta_mixer` follow exactly the encoder's
/// sequence of predictions and updates. Each decoded byte is fed to the LLM
/// after its eight bits, as the encoder did. `llm` and `meta_mixer` must start
/// in the same state as the encoder's.
///
/// LLM errors on the first 5 bytes are logged to stderr and decoding
/// continues. Progress is printed to stderr for outputs larger than 1000 bytes.
#[cfg(feature = "neural")]
fn neural_decompress(
    compressed: &[u8],
    original_size: usize,
    config: CMConfig,
    llm: &mut datacortex_neural::LlmPredictor,
    meta_mixer: &mut datacortex_neural::MetaMixer,
) -> Vec<u8> {
    const PROGRESS_MIN_BYTES: usize = 1000;

    let mut engine = CMEngine::with_config(config);
    let mut decoder = ArithmeticDecoder::new(compressed);
    let mut output = Vec::with_capacity(original_size);

    // Intermediate reports use the same size threshold as the final "100%"
    // line, so a started progress line is always terminated with a newline.
    let show_progress = original_size > PROGRESS_MIN_BYTES;
    let report_interval = if show_progress { original_size / 20 } else { 0 }; // every 5%

    for byte_idx in 0..original_size {
        let mut byte_val: u8 = 0;
        // Same partial-byte context as the encoder: 1 followed by the bits
        // decoded so far in this byte.
        let mut partial = 1u32;

        for bpos in 0..8u8 {
            let p_cm = engine.predict();
            let p_llm = llm.predict_bit(bpos, partial);
            let p_final = meta_mixer.blend(p_cm, p_llm);

            let bit = decoder.decode(p_final);
            engine.update(bit);
            meta_mixer.update(bit);

            byte_val |= bit << (7 - bpos);
            partial = (partial << 1) | u32::from(bit);
        }

        output.push(byte_val);

        // Feed decoded byte to LLM (same as encoder did).
        if let Err(e) = llm.predict_byte_probs(byte_val) {
            if byte_idx < 5 {
                eprintln!("[neural] LLM predict error at byte {byte_idx}: {e}");
            }
        }

        let done = byte_idx + 1;
        if report_interval > 0 && done % report_interval == 0 {
            let pct = done * 100 / original_size;
            eprint!("\r[neural] decompressing... {pct}%");
        }
    }

    if show_progress {
        eprintln!("\r[neural] decompressing... 100%");
    }

    output
}
617
618/// Get the CMConfig for a given mode.
619fn cm_config_for_mode(mode: Mode) -> CMConfig {
620    match mode {
621        Mode::Max => CMConfig::max(),
622        Mode::Balanced => CMConfig::balanced(),
623        Mode::Fast => CMConfig::balanced(), // not used for Fast, but keeps API clean
624    }
625}
626
/// Resolve the GGUF model path for the neural predictor.
///
/// Sources, in priority order:
/// 1. `explicit` (the `--model-path` CLI flag). It is used if the file
///    exists; otherwise a warning is printed and `None` is returned.
/// 2. The `DATACORTEX_MODEL` environment variable. An empty value disables
///    neural mode. A path that does not exist prints a warning and returns
///    `None`, without falling through to the default.
/// 3. The default `$HOME/.datacortex/models/SmolLM2-135M-Instruct-Q8_0.gguf`,
///    used only if HOME is set, non-empty, the file exists, and the path is
///    valid UTF-8.
///
/// Returns `None` when no usable model file is found.
#[cfg(feature = "neural")]
fn resolve_model_path(explicit: Option<&str>) -> Option<String> {
    use std::path::{Path, PathBuf};

    if let Some(p) = explicit {
        if Path::new(p).exists() {
            return Some(p.to_string());
        }
        eprintln!("[neural] explicit model path not found: {p}");
        return None;
    }

    if let Ok(p) = std::env::var("DATACORTEX_MODEL") {
        if p.is_empty() {
            // Explicitly set to empty = disable neural.
            return None;
        }
        if Path::new(&p).exists() {
            return Some(p);
        }
        eprintln!("[neural] DATACORTEX_MODEL path not found: {p}");
        return None; // Don't fall through to default.
    }

    // Default location. Join on the raw OsString instead of formatting a lossy
    // string, so a non-UTF-8 HOME is not silently mangled. An empty HOME is
    // ignored instead of resolving to the filesystem root.
    let home = std::env::var_os("HOME").filter(|h| !h.is_empty())?;
    let default: PathBuf = PathBuf::from(home)
        .join(".datacortex")
        .join("models")
        .join("SmolLM2-135M-Instruct-Q8_0.gguf");
    if !default.exists() {
        return None;
    }
    // The caller receives a String; a path that is not valid UTF-8 cannot be
    // represented faithfully, so it is treated as unavailable.
    default.into_os_string().into_string().ok()
}
666
667/// Train a zstd dictionary from multiple sample files.
668///
669/// Each sample should be a complete JSON/NDJSON file's bytes. The function
670/// splits them into training fragments and calls `zstd::dict::from_samples`.
671///
672/// `max_dict_size` controls the max dictionary size in bytes (typical: 32768-131072).
673/// Returns the trained dictionary bytes.
674pub fn train_dict(samples: &[&[u8]], max_dict_size: usize) -> io::Result<Vec<u8>> {
675    if samples.is_empty() {
676        return Err(io::Error::other(
677            "no samples provided for dictionary training",
678        ));
679    }
680
681    // Collect training fragments: split each sample into reasonable chunks.
682    let mut fragments: Vec<&[u8]> = Vec::new();
683    for sample in samples {
684        if sample.is_empty() {
685            continue;
686        }
687        // For NDJSON: split by newlines (each line is a training fragment).
688        let lines: Vec<&[u8]> = sample
689            .split(|&b| b == b'\n')
690            .filter(|l| !l.is_empty())
691            .collect();
692        if lines.len() >= 5 {
693            fragments.extend(lines);
694        } else {
695            // For non-NDJSON: use fixed-size blocks.
696            let chunk_size = 4096.min(sample.len());
697            let mut offset = 0;
698            while offset < sample.len() {
699                let end = (offset + chunk_size).min(sample.len());
700                fragments.push(&sample[offset..end]);
701                offset = end;
702            }
703        }
704    }
705
706    if fragments.len() < 5 {
707        return Err(io::Error::other(
708            "not enough training data (need at least 5 fragments)",
709        ));
710    }
711
712    let dict = zstd::dict::from_samples(&fragments, max_dict_size)
713        .map_err(|e| io::Error::other(format!("dictionary training failed: {e}")))?;
714
715    if dict.is_empty() {
716        return Err(io::Error::other(
717            "dictionary training produced empty dictionary",
718        ));
719    }
720
721    Ok(dict)
722}
723
724/// Compress `data` into .dcx format, writing to `output`.
725pub fn compress<W: Write>(
726    data: &[u8],
727    mode: Mode,
728    format_override: Option<FormatHint>,
729    output: &mut W,
730) -> io::Result<()> {
731    compress_with_model(data, mode, format_override, None, output)
732}
733
734/// Compress with optional explicit model path (for neural Max mode).
735pub fn compress_with_model<W: Write>(
736    data: &[u8],
737    mode: Mode,
738    format_override: Option<FormatHint>,
739    model_path: Option<&str>,
740    output: &mut W,
741) -> io::Result<()> {
742    compress_with_options(data, mode, format_override, model_path, None, output)
743}
744
745/// Compress with optional explicit model path and zstd level override.
746pub fn compress_with_options<W: Write>(
747    data: &[u8],
748    mode: Mode,
749    format_override: Option<FormatHint>,
750    model_path: Option<&str>,
751    zstd_level_override: Option<i32>,
752    output: &mut W,
753) -> io::Result<()> {
754    compress_with_full_options(
755        data,
756        mode,
757        format_override,
758        model_path,
759        zstd_level_override,
760        None,
761        output,
762    )
763}
764
765/// Compress with all options including external dictionary.
766pub fn compress_with_full_options<W: Write>(
767    data: &[u8],
768    mode: Mode,
769    format_override: Option<FormatHint>,
770    model_path: Option<&str>,
771    zstd_level_override: Option<i32>,
772    external_dict: Option<&[u8]>,
773    output: &mut W,
774) -> io::Result<()> {
775    let format_hint = format_override.unwrap_or_else(|| detect_format(data));
776    let crc = crc32fast::hash(data);
777
778    // Step 1: Format-aware preprocessing.
779    let (preprocessed, chain) = preprocess(data, format_hint, mode);
780    let transform_metadata = if chain.is_empty() {
781        vec![]
782    } else {
783        chain.serialize()
784    };
785
786    // Step 2: Compress with engine.
787    let mut use_dict = false;
788    let mut use_brotli = false;
789    // Track whether raw fallback won (empty transform chain).
790    let mut use_raw_fallback = false;
791    // Track whether metadata is embedded in the compressed stream.
792    let mut use_meta_embedded = false;
793    let compressed = match mode {
794        // Fast mode: auto-fallback — try preprocessed+zstd, raw+zstd, raw+brotli,
795        // preprocessed+brotli, and embedded-metadata+brotli. Keep whichever produces
796        // the smallest output (including header and metadata overhead).
797        //
798        // Preprocessing (columnar + typed encoding) usually helps zstd by grouping
799        // similar values. But for some files (e.g. citm_catalog.json with extreme
800        // repetition), raw zstd without preprocessing gives MUCH better results
801        // because preprocessing removes the repetition patterns zstd's LZ77 exploits.
802        //
803        // Brotli at quality 11 can beat zstd on some JSON files (e.g. twitter.json)
804        // because its context modeling handles certain data patterns better.
805        //
806        // For small files with transforms, embedding metadata inside the brotli stream
807        // saves the separate metadata overhead (~150 bytes), because brotli compresses
808        // the 4-byte length prefix + raw metadata nearly for free.
809        Mode::Fast => {
810            // Fast mode: auto-fallback with PARALLEL path evaluation.
811            // All 6+ compression paths run concurrently via rayon, then we
812            // keep whichever produces the smallest output.
813            use std::sync::Mutex;
814
815            let level = adaptive_fast_level(preprocessed.len(), zstd_level_override);
816            let raw_level = adaptive_fast_level(data.len(), zstd_level_override);
817
818            // Estimate compressed metadata size for fair comparison.
819            let meta_size_for_comparison = if transform_metadata.len() > 64 {
820                let compressed_meta = zstd::bulk::compress(&transform_metadata, 19)
821                    .unwrap_or_else(|_| transform_metadata.clone());
822                compressed_meta.len().min(transform_metadata.len())
823            } else {
824                transform_metadata.len()
825            };
826
827            // Build embedded metadata payload (shared read-only across threads).
828            let embedded_payload = if !transform_metadata.is_empty() {
829                let mut ep = Vec::with_capacity(4 + transform_metadata.len() + preprocessed.len());
830                ep.extend_from_slice(&(transform_metadata.len() as u32).to_le_bytes());
831                ep.extend_from_slice(&transform_metadata);
832                ep.extend_from_slice(&preprocessed);
833                Some(ep)
834            } else {
835                None
836            };
837
838            // Each path result: (compressed_bytes, total_size, use_dict, use_raw, use_brotli, use_embedded)
839            type PathResult = (Vec<u8>, usize, bool, bool, bool, bool);
840            let results = Mutex::new(Vec::<PathResult>::with_capacity(8));
841
842            rayon::scope(|s| {
843                // Path A: preprocessed + zstd (with optional dict).
844                s.spawn(|_| {
845                    if let Ok(plain) = zstd::bulk::compress(&preprocessed, level) {
846                        let (compressed, is_dict) = if let Some(ext_dict) = external_dict {
847                            // Use externally provided dictionary.
848                            let chunk_size = dict_chunk_size(preprocessed.len());
849                            let chunks = split_into_chunks(&preprocessed, chunk_size);
850                            if let Ok(mut compressor) =
851                                zstd::bulk::Compressor::with_dictionary(level, ext_dict)
852                            {
853                                let mut ok = true;
854                                let mut cc_list = Vec::with_capacity(chunks.len());
855                                for chunk in &chunks {
856                                    match compressor.compress(chunk) {
857                                        Ok(cc) => cc_list.push(cc),
858                                        Err(_) => {
859                                            ok = false;
860                                            break;
861                                        }
862                                    }
863                                }
864                                if ok {
865                                    let total_cc: usize = cc_list.iter().map(|c| 4 + c.len()).sum();
866                                    let payload_size = 4 + ext_dict.len() + 4 + total_cc;
867                                    if payload_size < plain.len() {
868                                        let mut payload = Vec::with_capacity(payload_size);
869                                        payload.extend_from_slice(
870                                            &(ext_dict.len() as u32).to_le_bytes(),
871                                        );
872                                        payload.extend_from_slice(ext_dict);
873                                        payload.extend_from_slice(
874                                            &(cc_list.len() as u32).to_le_bytes(),
875                                        );
876                                        for cc in &cc_list {
877                                            payload.extend_from_slice(
878                                                &(cc.len() as u32).to_le_bytes(),
879                                            );
880                                            payload.extend_from_slice(cc);
881                                        }
882                                        (payload, true)
883                                    } else {
884                                        (plain, false)
885                                    }
886                                } else {
887                                    (plain, false)
888                                }
889                            } else {
890                                (plain, false)
891                            }
892                        } else if preprocessed.len() >= DICT_MIN_DATA_SIZE {
893                            if let Some(dict_payload) =
894                                try_dict_compress(&preprocessed, level, plain.len())
895                            {
896                                (dict_payload, true)
897                            } else {
898                                (plain, false)
899                            }
900                        } else {
901                            (plain, false)
902                        };
903                        let total = 32 + meta_size_for_comparison + compressed.len();
904                        results
905                            .lock()
906                            .unwrap()
907                            .push((compressed, total, is_dict, false, false, false));
908                    }
909                });
910
911                // Path B: raw zstd (no preprocessing).
912                s.spawn(|_| {
913                    if let Ok(compressed) = zstd::bulk::compress(data, raw_level) {
914                        let total = 32 + compressed.len();
915                        results
916                            .lock()
917                            .unwrap()
918                            .push((compressed, total, false, true, false, false));
919                    }
920                });
921
922                // Path C: raw + brotli (TEXT mode).
923                s.spawn(|_| {
924                    let q = if data.len() <= 1_048_576 { 11 } else { 9 };
925                    if let Ok(compressed) = brotli_compress(data, q, BROTLI_MODE_TEXT) {
926                        let total = 32 + compressed.len();
927                        results
928                            .lock()
929                            .unwrap()
930                            .push((compressed, total, false, true, true, false));
931                    }
932                });
933
934                // Path D: preprocessed + brotli (GENERIC mode, dual quality).
935                s.spawn(|_| {
936                    let max_q = if preprocessed.len() <= 1_048_576 {
937                        11
938                    } else {
939                        9
940                    };
941                    let qualities: &[u32] = if max_q == 11 {
942                        &[11, 10]
943                    } else {
944                        &[max_q as u32]
945                    };
946                    let mut best: Option<PathResult> = None;
947                    for &q in qualities {
948                        if let Ok(compressed) =
949                            brotli_compress(&preprocessed, q, BROTLI_MODE_GENERIC)
950                        {
951                            let total = 32 + meta_size_for_comparison + compressed.len();
952                            if best.as_ref().is_none_or(|b| total < b.1) {
953                                best = Some((compressed, total, false, false, true, false));
954                            }
955                        }
956                    }
957                    if let Some(r) = best {
958                        results.lock().unwrap().push(r);
959                    }
960                });
961
962                // Path E: embedded metadata + brotli (GENERIC mode, dual quality).
963                if let Some(ref ep) = embedded_payload {
964                    s.spawn(|_| {
965                        let max_q = if ep.len() <= 1_048_576 { 11 } else { 9 };
966                        let qualities: &[u32] = if max_q == 11 {
967                            &[11, 10]
968                        } else {
969                            &[max_q as u32]
970                        };
971                        let mut best: Option<PathResult> = None;
972                        for &q in qualities {
973                            if let Ok(compressed) = brotli_compress(ep, q, BROTLI_MODE_GENERIC) {
974                                let total = 32 + compressed.len();
975                                if best.as_ref().is_none_or(|b| total < b.1) {
976                                    best = Some((compressed, total, false, false, true, true));
977                                }
978                            }
979                        }
980                        if let Some(r) = best {
981                            results.lock().unwrap().push(r);
982                        }
983                    });
984                }
985
986                // Path F: embedded metadata + zstd.
987                if let Some(ref ep) = embedded_payload {
988                    s.spawn(|_| {
989                        let embed_level = adaptive_fast_level(ep.len(), zstd_level_override);
990                        if let Ok(compressed) = zstd::bulk::compress(ep, embed_level) {
991                            let total = 32 + compressed.len();
992                            results
993                                .lock()
994                                .unwrap()
995                                .push((compressed, total, false, false, false, true));
996                        }
997                    });
998                }
999            });
1000
1001            // Pick the smallest result.
1002            let results = results.into_inner().unwrap();
1003            let best = results
1004                .into_iter()
1005                .min_by_key(|r| r.1)
1006                .ok_or_else(|| io::Error::other("all compression paths failed"))?;
1007
1008            use_dict = best.2;
1009            use_raw_fallback = best.3;
1010            use_brotli = best.4;
1011            use_meta_embedded = best.5;
1012            best.0
1013        }
1014        // Balanced mode: dual-path CM + GRU byte predictor.
1015        Mode::Balanced => {
1016            let config = cm_config_for_mode(mode);
1017            let cm_data = gru_compress(&preprocessed, config);
1018            let mut payload = Vec::with_capacity(8 + cm_data.len());
1019            payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
1020            payload.extend_from_slice(&cm_data);
1021            payload
1022        }
1023        // Max mode: try neural dual-path, fall back to CM-only.
1024        Mode::Max => {
1025            let config = cm_config_for_mode(mode);
1026
1027            #[cfg(feature = "neural")]
1028            {
1029                if let Some(mpath) = resolve_model_path(model_path) {
1030                    match datacortex_neural::LlmPredictor::new(&mpath) {
1031                        Ok(mut llm) => {
1032                            let mut meta_mixer = datacortex_neural::MetaMixer::new(5);
1033                            eprintln!(
1034                                "[neural] Max mode: dual-path CM+LLM ({} bytes mapped)",
1035                                llm.mapped_bytes()
1036                            );
1037                            let cm_data =
1038                                neural_compress(&preprocessed, config, &mut llm, &mut meta_mixer);
1039                            let mut payload = Vec::with_capacity(8 + cm_data.len());
1040                            // Byte 0 of the 8-byte size prefix: set bit 7 to flag neural mode.
1041                            // This lets the decompressor know to use neural path.
1042                            let size_with_flag = preprocessed.len() as u64 | (1u64 << 63);
1043                            payload.extend_from_slice(&size_with_flag.to_le_bytes());
1044                            payload.extend_from_slice(&cm_data);
1045                            payload
1046                        }
1047                        Err(e) => {
1048                            eprintln!("[neural] LLM init failed, falling back to CM-only: {e}");
1049                            let cm_data = cm_compress(&preprocessed, config);
1050                            let mut payload = Vec::with_capacity(8 + cm_data.len());
1051                            payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
1052                            payload.extend_from_slice(&cm_data);
1053                            payload
1054                        }
1055                    }
1056                } else {
1057                    eprintln!(
1058                        "[neural] no model found, Max mode using CM-only. \
1059                         Set DATACORTEX_MODEL or use --model-path."
1060                    );
1061                    let cm_data = cm_compress(&preprocessed, config);
1062                    let mut payload = Vec::with_capacity(8 + cm_data.len());
1063                    payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
1064                    payload.extend_from_slice(&cm_data);
1065                    payload
1066                }
1067            }
1068
1069            #[cfg(not(feature = "neural"))]
1070            {
1071                let _ = model_path; // suppress unused warning
1072                let cm_data = cm_compress(&preprocessed, config);
1073                let mut payload = Vec::with_capacity(8 + cm_data.len());
1074                payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
1075                payload.extend_from_slice(&cm_data);
1076                payload
1077            }
1078        }
1079    };
1080
1081    // When raw fallback or embedded metadata won, use empty header metadata.
1082    // - Raw fallback: decompressor handles empty chains (just decompresses, no reverse transforms).
1083    // - Embedded: metadata lives inside the compressed stream, not in the header.
1084    let final_metadata = if use_raw_fallback || use_meta_embedded {
1085        vec![]
1086    } else {
1087        transform_metadata
1088    };
1089
1090    // Compress metadata with zstd if it's large enough to benefit.
1091    // Small metadata (<= 64 bytes) stays raw to avoid zstd frame overhead.
1092    // Skipped when metadata is embedded (final_metadata is empty).
1093    let (header_metadata, meta_compressed) = if final_metadata.len() > 64 {
1094        let compressed_meta =
1095            zstd::bulk::compress(&final_metadata, 19).unwrap_or_else(|_| final_metadata.clone());
1096        if compressed_meta.len() < final_metadata.len() {
1097            (compressed_meta, true)
1098        } else {
1099            (final_metadata, false)
1100        }
1101    } else {
1102        (final_metadata, false)
1103    };
1104
1105    let header = DcxHeader {
1106        mode,
1107        format_hint,
1108        original_size: data.len() as u64,
1109        compressed_size: compressed.len() as u64,
1110        crc32: crc,
1111        transform_metadata: header_metadata,
1112        has_dict: use_dict,
1113        meta_compressed,
1114        use_brotli,
1115        meta_embedded: use_meta_embedded,
1116    };
1117
1118    header.write_to(output)?;
1119    output.write_all(&compressed)?;
1120
1121    Ok(())
1122}
1123
1124/// Decompress a .dcx file from `input`, returning the original data.
1125pub fn decompress<R: Read>(input: &mut R) -> io::Result<Vec<u8>> {
1126    decompress_with_model(input, None)
1127}
1128
1129/// Decompress with optional explicit model path (for neural Max mode).
1130pub fn decompress_with_model<R: Read>(
1131    input: &mut R,
1132    model_path: Option<&str>,
1133) -> io::Result<Vec<u8>> {
1134    let header = DcxHeader::read_from(input)?;
1135
1136    let mut compressed = vec![0u8; header.compressed_size as usize];
1137    input.read_exact(&mut compressed)?;
1138
1139    // Step 1: Decompress with engine.
1140    let preprocessed = match header.mode {
1141        Mode::Fast => {
1142            if header.use_brotli {
1143                brotli_decompress(&compressed)?
1144            } else {
1145                let capacity = header.original_size as usize * 2 + 65536;
1146                if header.has_dict {
1147                    decompress_with_dict(&compressed, capacity)?
1148                } else {
1149                    zstd::bulk::decompress(&compressed, capacity)
1150                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
1151                }
1152            }
1153        }
1154        Mode::Balanced => {
1155            // Balanced mode: dual-path CM + GRU byte predictor.
1156            if compressed.len() < 8 {
1157                return Err(io::Error::new(
1158                    io::ErrorKind::InvalidData,
1159                    "CM mode compressed data too short",
1160                ));
1161            }
1162            let size_raw = u64::from_le_bytes(compressed[..8].try_into().expect("8-byte slice"));
1163            let preprocessed_size = (size_raw & !(1u64 << 63)) as usize;
1164            let config = cm_config_for_mode(header.mode);
1165            gru_decompress(&compressed[8..], preprocessed_size, config)
1166        }
1167        Mode::Max => {
1168            // Max mode: may use neural (LLM) dual-path or CM-only.
1169            if compressed.len() < 8 {
1170                return Err(io::Error::new(
1171                    io::ErrorKind::InvalidData,
1172                    "CM mode compressed data too short",
1173                ));
1174            }
1175            let size_raw = u64::from_le_bytes(compressed[..8].try_into().expect("8-byte slice"));
1176
1177            // Check if bit 63 is set (neural flag).
1178            let neural_flag = size_raw & (1u64 << 63) != 0;
1179            let preprocessed_size = (size_raw & !(1u64 << 63)) as usize;
1180            let config = cm_config_for_mode(header.mode);
1181
1182            if neural_flag {
1183                #[cfg(feature = "neural")]
1184                {
1185                    if let Some(mpath) = resolve_model_path(model_path) {
1186                        match datacortex_neural::LlmPredictor::new(&mpath) {
1187                            Ok(mut llm) => {
1188                                let mut meta_mixer = datacortex_neural::MetaMixer::new(5);
1189                                eprintln!(
1190                                    "[neural] decompressing with dual-path CM+LLM ({} bytes mapped)",
1191                                    llm.mapped_bytes()
1192                                );
1193                                neural_decompress(
1194                                    &compressed[8..],
1195                                    preprocessed_size,
1196                                    config,
1197                                    &mut llm,
1198                                    &mut meta_mixer,
1199                                )
1200                            }
1201                            Err(e) => {
1202                                return Err(io::Error::new(
1203                                    io::ErrorKind::Other,
1204                                    format!(
1205                                        "file was compressed with neural mode but LLM failed to load: {e}"
1206                                    ),
1207                                ));
1208                            }
1209                        }
1210                    } else {
1211                        return Err(io::Error::new(
1212                            io::ErrorKind::Other,
1213                            "file was compressed with neural mode but no model found. \
1214                             Set DATACORTEX_MODEL or use --model-path.",
1215                        ));
1216                    }
1217                }
1218
1219                #[cfg(not(feature = "neural"))]
1220                {
1221                    let _ = model_path;
1222                    return Err(io::Error::other(
1223                        "file was compressed with neural mode but this build lacks the \
1224                         `neural` feature. Rebuild with --features neural.",
1225                    ));
1226                }
1227            } else {
1228                cm_decompress(&compressed[8..], preprocessed_size, config)
1229            }
1230        }
1231    };
1232
1233    // Step 1.5: Handle embedded metadata OR separate metadata.
1234    // When meta_embedded is set, the decompressed stream starts with:
1235    //   [meta_len: u32 LE][raw_metadata][preprocessed_data]
1236    // We extract the metadata and the actual preprocessed data from the stream.
1237    let (preprocessed, transform_metadata) = if header.meta_embedded {
1238        if preprocessed.len() < 4 {
1239            return Err(io::Error::new(
1240                io::ErrorKind::InvalidData,
1241                "embedded metadata: decompressed stream too short for meta_len",
1242            ));
1243        }
1244        let meta_len =
1245            u32::from_le_bytes(preprocessed[0..4].try_into().expect("4-byte slice")) as usize;
1246        if preprocessed.len() < 4 + meta_len {
1247            return Err(io::Error::new(
1248                io::ErrorKind::InvalidData,
1249                format!(
1250                    "embedded metadata: stream too short for metadata ({} bytes needed, {} available)",
1251                    4 + meta_len,
1252                    preprocessed.len()
1253                ),
1254            ));
1255        }
1256        let metadata = preprocessed[4..4 + meta_len].to_vec();
1257        let actual_preprocessed = preprocessed[4 + meta_len..].to_vec();
1258        (actual_preprocessed, metadata)
1259    } else {
1260        // Decompress metadata if it was zstd-compressed (separate metadata path).
1261        // Use streaming decoder to avoid guessing decompressed size.
1262        let tm = if header.meta_compressed && !header.transform_metadata.is_empty() {
1263            let mut decoder =
1264                zstd::Decoder::new(Cursor::new(&header.transform_metadata)).map_err(|e| {
1265                    io::Error::new(
1266                        io::ErrorKind::InvalidData,
1267                        format!("failed to init metadata decompressor: {e}"),
1268                    )
1269                })?;
1270            let mut decompressed_meta = Vec::new();
1271            decoder.read_to_end(&mut decompressed_meta).map_err(|e| {
1272                io::Error::new(
1273                    io::ErrorKind::InvalidData,
1274                    format!("failed to decompress transform metadata: {e}"),
1275                )
1276            })?;
1277            decompressed_meta
1278        } else {
1279            header.transform_metadata.clone()
1280        };
1281        (preprocessed, tm)
1282    };
1283
1284    // Step 2: Reverse preprocessing.
1285    let data = if transform_metadata.is_empty() {
1286        preprocessed
1287    } else {
1288        let chain = TransformChain::deserialize(&transform_metadata)?;
1289        reverse_preprocess(&preprocessed, &chain)
1290    };
1291
1292    // CRC-32 integrity check.
1293    let crc = crc32fast::hash(&data);
1294    if crc != header.crc32 {
1295        return Err(io::Error::new(
1296            io::ErrorKind::InvalidData,
1297            format!(
1298                "CRC-32 mismatch: expected {:#010X}, got {:#010X}",
1299                header.crc32, crc
1300            ),
1301        ));
1302    }
1303
1304    if data.len() as u64 != header.original_size {
1305        return Err(io::Error::new(
1306            io::ErrorKind::InvalidData,
1307            format!(
1308                "size mismatch: header says {} bytes, got {}",
1309                header.original_size,
1310                data.len()
1311            ),
1312        ));
1313    }
1314
1315    Ok(data)
1316}
1317
1318/// Compress to Vec (convenience).
1319pub fn compress_to_vec(
1320    data: &[u8],
1321    mode: Mode,
1322    format_override: Option<FormatHint>,
1323) -> io::Result<Vec<u8>> {
1324    let mut buf = Vec::new();
1325    compress(data, mode, format_override, &mut buf)?;
1326    Ok(buf)
1327}
1328
1329/// Compress to Vec with explicit model path.
1330pub fn compress_to_vec_with_model(
1331    data: &[u8],
1332    mode: Mode,
1333    format_override: Option<FormatHint>,
1334    model_path: Option<&str>,
1335) -> io::Result<Vec<u8>> {
1336    let mut buf = Vec::new();
1337    compress_with_model(data, mode, format_override, model_path, &mut buf)?;
1338    Ok(buf)
1339}
1340
1341/// Compress to Vec with explicit model path and zstd level override.
1342pub fn compress_to_vec_with_options(
1343    data: &[u8],
1344    mode: Mode,
1345    format_override: Option<FormatHint>,
1346    model_path: Option<&str>,
1347    zstd_level_override: Option<i32>,
1348) -> io::Result<Vec<u8>> {
1349    let mut buf = Vec::new();
1350    compress_with_options(
1351        data,
1352        mode,
1353        format_override,
1354        model_path,
1355        zstd_level_override,
1356        &mut buf,
1357    )?;
1358    Ok(buf)
1359}
1360
1361/// Decompress from slice (convenience).
1362pub fn decompress_from_slice(dcx_data: &[u8]) -> io::Result<Vec<u8>> {
1363    let mut cursor = Cursor::new(dcx_data);
1364    decompress(&mut cursor)
1365}
1366
1367/// Read header only (for `info` command).
1368pub fn read_header<R: Read>(input: &mut R) -> io::Result<DcxHeader> {
1369    DcxHeader::read_from(input)
1370}
1371
1372/// Compress raw data with zstd at a given level (for benchmark comparison).
1373pub fn raw_zstd_compress(data: &[u8], level: i32) -> io::Result<Vec<u8>> {
1374    zstd::bulk::compress(data, level).map_err(io::Error::other)
1375}
1376
1377#[cfg(test)]
1378mod tests {
1379    use super::*;
1380
1381    #[test]
1382    fn fast_mode_roundtrip() {
1383        let original = b"Hello, DataCortex! This is a test of Fast mode compression.";
1384        let compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1385        let decompressed = decompress_from_slice(&compressed).unwrap();
1386        assert_eq!(decompressed, original);
1387    }
1388
1389    #[test]
1390    fn fast_mode_json_roundtrip() {
1391        let data = br#"{"name":"Alice","age":30,"name":"Bob","age":25,"name":"Carol","age":35}"#;
1392        let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1393        let decompressed = decompress_from_slice(&compressed).unwrap();
1394        assert_eq!(decompressed, data.to_vec());
1395    }
1396
1397    #[test]
1398    fn balanced_mode_roundtrip() {
1399        let original = b"Balanced mode test data with some content.";
1400        let compressed = compress_to_vec(original, Mode::Balanced, None).unwrap();
1401        let decompressed = decompress_from_slice(&compressed).unwrap();
1402        assert_eq!(decompressed, original);
1403    }
1404
1405    #[test]
1406    fn balanced_mode_longer_text() {
1407        let original = b"The quick brown fox jumps over the lazy dog. This sentence contains every letter of the English alphabet at least once. We need enough data to properly exercise the arithmetic coder and order-0 model.";
1408        let compressed = compress_to_vec(original, Mode::Balanced, None).unwrap();
1409        let decompressed = decompress_from_slice(&compressed).unwrap();
1410        assert_eq!(decompressed, original);
1411    }
1412
1413    #[test]
1414    fn balanced_mode_repetitive_data() {
1415        let data = "hello world! ".repeat(100);
1416        let compressed = compress_to_vec(data.as_bytes(), Mode::Balanced, None).unwrap();
1417        let decompressed = decompress_from_slice(&compressed).unwrap();
1418        assert_eq!(decompressed, data.as_bytes());
1419    }
1420
1421    #[test]
1422    fn balanced_mode_all_byte_values() {
1423        let original: Vec<u8> = (0..=255).collect();
1424        let compressed = compress_to_vec(&original, Mode::Balanced, None).unwrap();
1425        let decompressed = decompress_from_slice(&compressed).unwrap();
1426        assert_eq!(decompressed, original);
1427    }
1428
1429    #[test]
1430    fn balanced_mode_single_byte() {
1431        let original = b"X";
1432        let compressed = compress_to_vec(original, Mode::Balanced, None).unwrap();
1433        let decompressed = decompress_from_slice(&compressed).unwrap();
1434        assert_eq!(decompressed, original);
1435    }
1436
1437    #[test]
1438    fn balanced_mode_json_roundtrip() {
1439        let data = br#"{"name":"Alice","age":30,"name":"Bob","age":25,"name":"Carol","age":35}"#;
1440        let compressed = compress_to_vec(data, Mode::Balanced, Some(FormatHint::Json)).unwrap();
1441        let decompressed = decompress_from_slice(&compressed).unwrap();
1442        assert_eq!(decompressed, data.to_vec());
1443    }
1444
1445    #[test]
1446    fn empty_data_roundtrip() {
1447        let original = b"";
1448        for mode in [Mode::Fast, Mode::Balanced, Mode::Max] {
1449            let compressed = compress_to_vec(original, mode, None).unwrap();
1450            let decompressed = decompress_from_slice(&compressed).unwrap();
1451            assert_eq!(decompressed, original, "failed for mode {mode}");
1452        }
1453    }
1454
1455    #[test]
1456    fn crc_mismatch_detected() {
1457        let original = b"test data for CRC check";
1458        let mut compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1459        // Corrupt in the compressed data section (after header).
1460        let header_size = 32; // minimum header
1461        if compressed.len() > header_size + 5 {
1462            compressed[header_size + 3] ^= 0xFF;
1463        }
1464        assert!(decompress_from_slice(&compressed).is_err());
1465    }
1466
1467    #[test]
1468    fn fast_mode_actually_compresses() {
1469        // Repetitive data should compress well with zstd.
1470        let data = "hello world. ".repeat(100);
1471        let compressed = compress_to_vec(data.as_bytes(), Mode::Fast, None).unwrap();
1472        assert!(
1473            compressed.len() < data.len(),
1474            "Fast mode should compress repetitive data: {} vs {}",
1475            compressed.len(),
1476            data.len()
1477        );
1478    }
1479
1480    #[test]
1481    fn json_preprocessing_improves_fast_mode() {
1482        let data = br#"[{"name":"Alice","score":95},{"name":"Bob","score":87},{"name":"Carol","score":92},{"name":"Dave","score":88},{"name":"Eve","score":91}]"#;
1483        let with_preprocess = compress_to_vec(data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1484        let without_preprocess =
1485            compress_to_vec(data, Mode::Fast, Some(FormatHint::Generic)).unwrap();
1486
1487        // Both should decompress correctly.
1488        assert_eq!(
1489            decompress_from_slice(&with_preprocess).unwrap(),
1490            data.to_vec()
1491        );
1492        assert_eq!(
1493            decompress_from_slice(&without_preprocess).unwrap(),
1494            data.to_vec()
1495        );
1496    }
1497
1498    #[test]
1499    fn all_modes_roundtrip() {
1500        let data = b"test all modes with some more content to ensure decent compression";
1501        for mode in [Mode::Max, Mode::Balanced, Mode::Fast] {
1502            let compressed = compress_to_vec(data, mode, None).unwrap();
1503            let decompressed = decompress_from_slice(&compressed).unwrap();
1504            assert_eq!(decompressed, data, "failed for mode {mode}");
1505        }
1506    }
1507
1508    #[test]
1509    fn cm_compress_decompress_direct() {
1510        let data = b"Hello, World! This is a direct CM test.";
1511        let compressed = cm_compress(data, CMConfig::balanced());
1512        let decompressed = cm_decompress(&compressed, data.len(), CMConfig::balanced());
1513        assert_eq!(decompressed, data.to_vec());
1514    }
1515
1516    #[test]
1517    fn cm_empty() {
1518        let data: &[u8] = b"";
1519        let compressed = cm_compress(data, CMConfig::balanced());
1520        let decompressed = cm_decompress(&compressed, 0, CMConfig::balanced());
1521        assert!(decompressed.is_empty());
1522    }
1523
1524    #[test]
1525    fn cm_single_byte() {
1526        for byte in 0..=255u8 {
1527            let data = [byte];
1528            let compressed = cm_compress(&data, CMConfig::balanced());
1529            let decompressed = cm_decompress(&compressed, 1, CMConfig::balanced());
1530            assert_eq!(
1531                decompressed, data,
1532                "CM roundtrip failed for byte {byte:#04X}"
1533            );
1534        }
1535    }
1536
1537    #[test]
1538    fn cm_repetitive_compresses() {
1539        let data = vec![b'A'; 1000];
1540        let compressed = cm_compress(&data, CMConfig::balanced());
1541        // 1000 identical bytes should compress well with adaptive model.
1542        assert!(
1543            compressed.len() < 200,
1544            "CM should compress 1000 identical bytes well: {} bytes",
1545            compressed.len()
1546        );
1547        let decompressed = cm_decompress(&compressed, data.len(), CMConfig::balanced());
1548        assert_eq!(decompressed, data);
1549    }
1550
1551    #[test]
1552    fn max_mode_roundtrip() {
1553        let original = b"Max mode test data with some content for compression.";
1554        let compressed = compress_to_vec(original, Mode::Max, None).unwrap();
1555        let decompressed = decompress_from_slice(&compressed).unwrap();
1556        assert_eq!(decompressed, original);
1557    }
1558
1559    #[test]
1560    fn max_mode_longer_text() {
1561        let original = b"The quick brown fox jumps over the lazy dog. Max mode uses 2x context maps for better predictions with fewer hash collisions. This should compress slightly better than balanced mode.";
1562        let compressed = compress_to_vec(original, Mode::Max, None).unwrap();
1563        let decompressed = decompress_from_slice(&compressed).unwrap();
1564        assert_eq!(decompressed, original);
1565    }
1566
1567    // ─── Dictionary compression tests ──────────────────────────────────────────
1568
1569    #[test]
1570    fn test_dict_compress_roundtrip() {
1571        // Generate NDJSON data large enough to trigger dictionary training.
1572        // Repetitive columnar data is ideal for dictionary learning.
1573        let mut ndjson = String::new();
1574        for i in 0..500 {
1575            ndjson.push_str(&format!(
1576                r#"{{"id":{},"name":"user_{}","status":"active","score":{}}}"#,
1577                i,
1578                i,
1579                i * 17 % 100
1580            ));
1581            ndjson.push('\n');
1582        }
1583        let data = ndjson.as_bytes();
1584        assert!(
1585            data.len() > DICT_MIN_DATA_SIZE,
1586            "test data should exceed dict threshold: {} bytes",
1587            data.len()
1588        );
1589
1590        let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1591        let decompressed = decompress_from_slice(&compressed).unwrap();
1592        assert_eq!(
1593            decompressed, data,
1594            "dict compress roundtrip: byte-exact mismatch"
1595        );
1596    }
1597
1598    #[test]
1599    fn test_dict_falls_back_on_small() {
1600        // Data smaller than DICT_MIN_DATA_SIZE should not use dictionary.
1601        let data = b"small data that won't trigger dictionary training";
1602        assert!(data.len() < DICT_MIN_DATA_SIZE);
1603
1604        let compressed = compress_to_vec(data, Mode::Fast, None).unwrap();
1605        let decompressed = decompress_from_slice(&compressed).unwrap();
1606        assert_eq!(decompressed, data.to_vec());
1607
1608        // Verify no dict flag in header.
1609        let mut cursor = Cursor::new(&compressed);
1610        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1611        assert!(!header.has_dict, "small data should not have dict flag set");
1612    }
1613
1614    #[test]
1615    fn test_dict_backward_compat() {
1616        // Compress with old behavior (no dict) and verify it still decompresses.
1617        // We simulate this by compressing small data (which skips dict).
1618        let original = b"backward compatibility test data for decompression";
1619        let compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1620
1621        // Verify the flag is NOT set.
1622        let mut cursor = Cursor::new(&compressed);
1623        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1624        assert!(!header.has_dict);
1625
1626        // Decompress should work fine.
1627        let decompressed = decompress_from_slice(&compressed).unwrap();
1628        assert_eq!(decompressed, original.to_vec());
1629    }
1630
1631    #[test]
1632    fn test_dict_ndjson_large_roundtrip() {
1633        // Larger NDJSON dataset — should benefit from dictionary.
1634        let mut ndjson = String::new();
1635        for i in 0..2000 {
1636            ndjson.push_str(&format!(
1637                r#"{{"timestamp":"2025-01-{:02}T{:02}:{:02}:00Z","level":"info","message":"Request processed","request_id":"req_{}","duration_ms":{}}}"#,
1638                (i % 28) + 1,
1639                i % 24,
1640                i % 60,
1641                i,
1642                (i * 13) % 500
1643            ));
1644            ndjson.push('\n');
1645        }
1646        let data = ndjson.as_bytes();
1647
1648        let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1649        let decompressed = decompress_from_slice(&compressed).unwrap();
1650        assert_eq!(decompressed, data, "large NDJSON roundtrip mismatch");
1651    }
1652
1653    #[test]
1654    fn test_dict_generic_data_roundtrip() {
1655        // Generic (non-JSON) data that's large enough for dict training.
1656        // Uses fixed-size block splitting instead of column boundaries.
1657        let mut data = Vec::new();
1658        for i in 0..3000 {
1659            data.extend_from_slice(
1660                format!("line {i}: the quick brown fox jumps over the lazy dog\n").as_bytes(),
1661            );
1662        }
1663        assert!(data.len() > DICT_MIN_DATA_SIZE);
1664
1665        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Generic)).unwrap();
1666        let decompressed = decompress_from_slice(&compressed).unwrap();
1667        assert_eq!(decompressed, data, "generic data dict roundtrip mismatch");
1668    }
1669
1670    #[test]
1671    fn test_dict_does_not_affect_other_modes() {
1672        // Dictionary training should only apply to Fast mode.
1673        // Balanced and Max modes should remain unchanged.
1674        let mut ndjson = String::new();
1675        for i in 0..200 {
1676            ndjson.push_str(&format!(
1677                r#"{{"id":{},"name":"user_{}","status":"active"}}"#,
1678                i, i
1679            ));
1680            ndjson.push('\n');
1681        }
1682        let data = ndjson.as_bytes();
1683
1684        for mode in [Mode::Balanced, Mode::Max] {
1685            let compressed = compress_to_vec(data, mode, Some(FormatHint::Ndjson)).unwrap();
1686            let mut cursor = Cursor::new(&compressed);
1687            let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1688            assert!(!header.has_dict, "mode {mode} should never have dict flag");
1689            let decompressed = decompress_from_slice(&compressed).unwrap();
1690            assert_eq!(decompressed, data, "roundtrip failed for mode {mode}");
1691        }
1692    }
1693
1694    // ─── Configurable zstd level tests ──────────────────────────────────────
1695
1696    #[test]
1697    fn test_compress_with_level() {
1698        // Compress with level 19 override in Fast mode, verify roundtrip.
1699        let data = "hello world, compressing with custom zstd level. ".repeat(50);
1700        let compressed =
1701            compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, Some(19))
1702                .unwrap();
1703        let decompressed = decompress_from_slice(&compressed).unwrap();
1704        assert_eq!(decompressed, data.as_bytes(), "level 19 roundtrip failed");
1705    }
1706
1707    #[test]
1708    fn test_compress_with_level_default() {
1709        // No level override — should use mode default (9 for Fast).
1710        let data = "default level test data. ".repeat(50);
1711        let compressed =
1712            compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, None).unwrap();
1713        let decompressed = decompress_from_slice(&compressed).unwrap();
1714        assert_eq!(
1715            decompressed,
1716            data.as_bytes(),
1717            "default level roundtrip failed"
1718        );
1719    }
1720
1721    #[test]
1722    fn test_compress_with_level_higher_ratio() {
1723        // Level 19 should compress better than level 1 on repetitive data.
1724        let data = r#"{"name":"Alice","score":95}"#.repeat(200);
1725        let low =
1726            compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, Some(1)).unwrap();
1727        let high = compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, Some(19))
1728            .unwrap();
1729
1730        // Both must roundtrip.
1731        assert_eq!(decompress_from_slice(&low).unwrap(), data.as_bytes());
1732        assert_eq!(decompress_from_slice(&high).unwrap(), data.as_bytes());
1733
1734        // Higher level should produce smaller output (or at least not larger).
1735        assert!(
1736            high.len() <= low.len(),
1737            "level 19 ({}) should be <= level 1 ({})",
1738            high.len(),
1739            low.len()
1740        );
1741    }
1742
1743    // ─── Auto-fallback tests ──────────────────────────────────────────────────
1744
1745    #[test]
1746    fn test_auto_fallback_picks_smaller() {
1747        // citm_catalog.json has extreme repetition. The auto-fallback picks
1748        // whichever path (raw or preprocessed) produces the smallest output.
1749        // With compressed metadata, the preprocessed path may now win.
1750        let data = std::fs::read(concat!(
1751            env!("CARGO_MANIFEST_DIR"),
1752            "/../../corpus/json-bench/citm_catalog.json"
1753        ))
1754        .unwrap();
1755
1756        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1757        let decompressed = decompress_from_slice(&compressed).unwrap();
1758        assert_eq!(decompressed, data, "citm_catalog roundtrip failed");
1759
1760        // Verify good compression ratio regardless of which path won.
1761        let ratio = data.len() as f64 / compressed.len() as f64;
1762        assert!(
1763            ratio > 50.0,
1764            "citm_catalog should achieve >50x, got {ratio:.1}x"
1765        );
1766    }
1767
1768    #[test]
1769    fn test_auto_fallback_preprocessed_wins_on_ndjson() {
1770        // NDJSON with uniform schema should still prefer preprocessed path
1771        // (columnar + typed encoding beats raw zstd for structured data).
1772        let data = std::fs::read(concat!(
1773            env!("CARGO_MANIFEST_DIR"),
1774            "/../../corpus/test-ndjson.ndjson"
1775        ))
1776        .unwrap();
1777
1778        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1779        let decompressed = decompress_from_slice(&compressed).unwrap();
1780        assert_eq!(decompressed, data, "test-ndjson roundtrip failed");
1781
1782        // Check that preprocessing was used: either non-empty transform metadata
1783        // in the header, or metadata embedded in the compressed stream.
1784        let mut cursor = Cursor::new(&compressed);
1785        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1786        assert!(
1787            !header.transform_metadata.is_empty() || header.meta_embedded,
1788            "test-ndjson should prefer preprocessed path (non-empty transform metadata or embedded)"
1789        );
1790    }
1791
1792    #[test]
1793    fn test_auto_fallback_roundtrip() {
1794        // Verify both raw and preprocessed paths produce correct roundtrips.
1795        // Use citm_catalog (raw wins) and test-ndjson (preprocessed wins).
1796        let citm = std::fs::read(concat!(
1797            env!("CARGO_MANIFEST_DIR"),
1798            "/../../corpus/json-bench/citm_catalog.json"
1799        ))
1800        .unwrap();
1801        let ndjson = std::fs::read(concat!(
1802            env!("CARGO_MANIFEST_DIR"),
1803            "/../../corpus/test-ndjson.ndjson"
1804        ))
1805        .unwrap();
1806
1807        // citm_catalog — raw path
1808        let compressed_citm = compress_to_vec(&citm, Mode::Fast, Some(FormatHint::Json)).unwrap();
1809        let decompressed_citm = decompress_from_slice(&compressed_citm).unwrap();
1810        assert_eq!(
1811            decompressed_citm, citm,
1812            "citm_catalog roundtrip (raw path) failed"
1813        );
1814
1815        // test-ndjson — preprocessed path
1816        let compressed_ndjson =
1817            compress_to_vec(&ndjson, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1818        let decompressed_ndjson = decompress_from_slice(&compressed_ndjson).unwrap();
1819        assert_eq!(
1820            decompressed_ndjson, ndjson,
1821            "test-ndjson roundtrip (preprocessed path) failed"
1822        );
1823    }
1824
1825    // ─── Adaptive level tests ─────────────────────────────────────────────────
1826
1827    #[test]
1828    fn test_adaptive_level_small_data() {
1829        // ≤16MB should use level 19 — best ratio, preprocessing dominates encode time.
1830        assert_eq!(adaptive_fast_level(100_000, None), 19);
1831        assert_eq!(adaptive_fast_level(500_000, None), 19);
1832        assert_eq!(adaptive_fast_level(1_048_576, None), 19);
1833        assert_eq!(adaptive_fast_level(0, None), 19);
1834    }
1835
1836    #[test]
1837    fn test_adaptive_level_medium_data() {
1838        // 1-16MB still gets level 19 — zstd levels 9-15 are a plateau
1839        // (identical ratio on structured JSON), so we skip to 19.
1840        assert_eq!(adaptive_fast_level(1_048_577, None), 19);
1841        assert_eq!(adaptive_fast_level(5_000_000, None), 19);
1842        assert_eq!(adaptive_fast_level(10_485_760, None), 19);
1843        assert_eq!(adaptive_fast_level(16_777_216, None), 19);
1844    }
1845
1846    #[test]
1847    fn test_adaptive_level_large_data() {
1848        // 16-64MB uses level 16 (btultra breakpoint), >64MB uses level 9.
1849        assert_eq!(adaptive_fast_level(16_777_217, None), 16);
1850        assert_eq!(adaptive_fast_level(33_554_432, None), 16);
1851        assert_eq!(adaptive_fast_level(67_108_864, None), 16);
1852        assert_eq!(adaptive_fast_level(67_108_865, None), 9);
1853        assert_eq!(adaptive_fast_level(100_000_000, None), 9);
1854    }
1855
1856    #[test]
1857    fn test_adaptive_level_override() {
1858        // --level flag should always override adaptive level.
1859        assert_eq!(adaptive_fast_level(100, Some(3)), 3);
1860        assert_eq!(adaptive_fast_level(100_000_000, Some(22)), 22);
1861        assert_eq!(adaptive_fast_level(0, Some(1)), 1);
1862    }
1863
1864    // ─── Compressed metadata tests ──────────────────────────────────────────────
1865
1866    #[test]
1867    fn test_compressed_metadata_roundtrip() {
1868        // Generate NDJSON data large enough to produce > 64 bytes of transform metadata.
1869        let mut ndjson = String::new();
1870        for i in 0..500 {
1871            ndjson.push_str(&format!(
1872                r#"{{"id":{},"name":"user_{}","status":"active","score":{}}}"#,
1873                i,
1874                i,
1875                i * 17 % 100
1876            ));
1877            ndjson.push('\n');
1878        }
1879        let data = ndjson.as_bytes();
1880
1881        let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1882        let decompressed = decompress_from_slice(&compressed).unwrap();
1883        assert_eq!(
1884            decompressed, data,
1885            "compressed metadata roundtrip: byte-exact mismatch"
1886        );
1887
1888        // Verify the header has meta_compressed set if metadata was large enough.
1889        let mut cursor = Cursor::new(&compressed);
1890        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1891        // The file should have used preprocessed path (non-empty metadata).
1892        if !header.transform_metadata.is_empty() && header.transform_metadata.len() > 10 {
1893            // Metadata was present — check that compressed flag makes sense.
1894            // (meta_compressed is true only if compression actually saved space)
1895            // Just verify roundtrip was correct — the flag is an optimization detail.
1896        }
1897    }
1898
1899    #[test]
1900    fn test_compressed_metadata_backward_compat() {
1901        // Simulate old files that have no compressed metadata (bit 2 = 0).
1902        // These should still decompress correctly.
1903        let original = b"backward compatibility test data for metadata decompression";
1904        let compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1905
1906        // Verify decompression works.
1907        let decompressed = decompress_from_slice(&compressed).unwrap();
1908        assert_eq!(decompressed, original.to_vec());
1909
1910        // For small data, metadata should be empty or very small — no compression.
1911        let mut cursor = Cursor::new(&compressed);
1912        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1913        // Small data may or may not have metadata, but it should roundtrip either way.
1914        assert!(!header.meta_compressed || !header.transform_metadata.is_empty());
1915    }
1916
1917    #[test]
1918    fn test_compressed_metadata_small_skipped() {
1919        // Small metadata (< 64 bytes) should NOT be compressed — zstd frame overhead
1920        // would make it larger.
1921        let data = br#"{"name":"Alice","age":30}"#;
1922        let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1923        let decompressed = decompress_from_slice(&compressed).unwrap();
1924        assert_eq!(decompressed, data.to_vec());
1925
1926        let mut cursor = Cursor::new(&compressed);
1927        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1928        // Small JSON has small metadata — should not be compressed.
1929        if header.transform_metadata.len() <= 64 {
1930            assert!(
1931                !header.meta_compressed,
1932                "metadata <= 64 bytes should not be compressed, but meta_compressed=true \
1933                 for {} bytes of metadata",
1934                header.transform_metadata.len()
1935            );
1936        }
1937    }
1938
1939    #[test]
1940    fn test_twitter_json_brotli_wins() {
1941        // twitter.json should use brotli — raw brotli-11 beats both preprocessed+zstd
1942        // and raw+zstd on this file.
1943        let data = std::fs::read(concat!(
1944            env!("CARGO_MANIFEST_DIR"),
1945            "/../../corpus/json-bench/twitter.json"
1946        ))
1947        .unwrap();
1948
1949        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1950        let decompressed = decompress_from_slice(&compressed).unwrap();
1951        assert_eq!(decompressed, data, "twitter.json roundtrip failed");
1952
1953        // Check that brotli was selected.
1954        let mut cursor = Cursor::new(&compressed);
1955        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1956        assert!(
1957            header.use_brotli,
1958            "twitter.json should use brotli (FLAG_BROTLI set in header)"
1959        );
1960    }
1961
1962    #[test]
1963    fn test_compressed_metadata_all_modes_roundtrip() {
1964        // Metadata compression applies to all modes, not just Fast.
1965        let mut ndjson = String::new();
1966        for i in 0..200 {
1967            ndjson.push_str(&format!(
1968                r#"{{"id":{},"name":"user_{}","status":"active"}}"#,
1969                i, i
1970            ));
1971            ndjson.push('\n');
1972        }
1973        let data = ndjson.as_bytes();
1974
1975        for mode in [Mode::Fast, Mode::Balanced, Mode::Max] {
1976            let compressed = compress_to_vec(data, mode, Some(FormatHint::Ndjson)).unwrap();
1977            let decompressed = decompress_from_slice(&compressed).unwrap();
1978            assert_eq!(
1979                decompressed, data,
1980                "compressed metadata roundtrip failed for mode {mode}"
1981            );
1982        }
1983    }
1984
1985    // ─── Brotli auto-fallback tests ──────────────────────────────────────────
1986
1987    #[test]
1988    fn test_brotli_compress_roundtrip() {
1989        // Direct brotli compress/decompress helper roundtrip.
1990        let data = b"Hello, brotli! This is a test of the brotli compression helpers.";
1991        let compressed = brotli_compress(data, 11, BROTLI_MODE_GENERIC).unwrap();
1992        let decompressed = brotli_decompress(&compressed).unwrap();
1993        assert_eq!(decompressed, data.to_vec());
1994    }
1995
1996    #[test]
1997    fn test_brotli_auto_fallback_twitter() {
1998        // twitter.json should select brotli and roundtrip correctly.
1999        let data = std::fs::read(concat!(
2000            env!("CARGO_MANIFEST_DIR"),
2001            "/../../corpus/json-bench/twitter.json"
2002        ))
2003        .unwrap();
2004
2005        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
2006        let decompressed = decompress_from_slice(&compressed).unwrap();
2007        assert_eq!(decompressed, data, "twitter.json brotli roundtrip failed");
2008
2009        let mut cursor = Cursor::new(&compressed);
2010        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
2011        assert!(
2012            header.use_brotli,
2013            "twitter.json should use brotli in auto-fallback"
2014        );
2015    }
2016
2017    #[test]
2018    fn test_brotli_ndjson_roundtrip() {
2019        // NDJSON with uniform schema — regardless of which entropy coder wins,
2020        // the roundtrip must be byte-exact.
2021        let data = std::fs::read(concat!(
2022            env!("CARGO_MANIFEST_DIR"),
2023            "/../../corpus/test-ndjson.ndjson"
2024        ))
2025        .unwrap();
2026
2027        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2028        let decompressed = decompress_from_slice(&compressed).unwrap();
2029        assert_eq!(decompressed, data, "ndjson roundtrip failed");
2030    }
2031
2032    #[test]
2033    fn test_brotli_backward_compat() {
2034        // Old .dcx files without the brotli flag (bit 3 = 0) must still decompress.
2035        // We simulate an old file by manually crafting a .dcx with FLAG_BROTLI unset.
2036        // Compress with zstd directly and build a minimal .dcx header.
2037        let original = b"backward compatibility test: this data was compressed without brotli";
2038        let crc = crc32fast::hash(original);
2039        let zstd_compressed = zstd::bulk::compress(original, 19).unwrap();
2040
2041        let header = crate::dcx::DcxHeader {
2042            mode: Mode::Fast,
2043            format_hint: crate::dcx::FormatHint::Generic,
2044            original_size: original.len() as u64,
2045            compressed_size: zstd_compressed.len() as u64,
2046            crc32: crc,
2047            transform_metadata: vec![],
2048            has_dict: false,
2049            meta_compressed: false,
2050            use_brotli: false,
2051            meta_embedded: false,
2052        };
2053
2054        let mut buf = Vec::new();
2055        header.write_to(&mut buf).unwrap();
2056        buf.extend_from_slice(&zstd_compressed);
2057
2058        // Verify the brotli flag is NOT set in the serialized header.
2059        assert_eq!(buf[7] & crate::dcx::FLAG_BROTLI, 0);
2060
2061        // Decompress — must work even though brotli path exists.
2062        let decompressed = decompress_from_slice(&buf).unwrap();
2063        assert_eq!(decompressed, original.to_vec());
2064    }
2065
2066    // ─── Embedded metadata tests ──────────────────────────────────────────────
2067
2068    #[test]
2069    fn test_embedded_metadata_roundtrip() {
2070        // Compress test-api.json with Fast mode — if embedded metadata is used,
2071        // the roundtrip must be byte-exact.
2072        let data = std::fs::read(concat!(
2073            env!("CARGO_MANIFEST_DIR"),
2074            "/../../corpus/test-api.json"
2075        ))
2076        .unwrap();
2077
2078        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
2079        let decompressed = decompress_from_slice(&compressed).unwrap();
2080        assert_eq!(
2081            decompressed, data,
2082            "test-api.json embedded metadata roundtrip: byte-exact mismatch"
2083        );
2084    }
2085
2086    #[test]
2087    fn test_embedded_metadata_backward_compat() {
2088        // Old .dcx files without the meta_embedded flag (bit 4 = 0) must still decompress.
2089        // We simulate an old file by manually crafting a .dcx with FLAG_META_EMBEDDED unset
2090        // and separate transform metadata.
2091        let original = b"backward compat: no embedded metadata in this old file format";
2092        let crc = crc32fast::hash(original);
2093        let zstd_compressed = zstd::bulk::compress(original, 19).unwrap();
2094
2095        let header = crate::dcx::DcxHeader {
2096            mode: Mode::Fast,
2097            format_hint: crate::dcx::FormatHint::Generic,
2098            original_size: original.len() as u64,
2099            compressed_size: zstd_compressed.len() as u64,
2100            crc32: crc,
2101            transform_metadata: vec![],
2102            has_dict: false,
2103            meta_compressed: false,
2104            use_brotli: false,
2105            meta_embedded: false,
2106        };
2107
2108        let mut buf = Vec::new();
2109        header.write_to(&mut buf).unwrap();
2110        buf.extend_from_slice(&zstd_compressed);
2111
2112        // Verify meta_embedded flag is NOT set.
2113        assert_eq!(buf[7] & crate::dcx::FLAG_META_EMBEDDED, 0);
2114
2115        // Decompress — must work without embedded metadata support.
2116        let decompressed = decompress_from_slice(&buf).unwrap();
2117        assert_eq!(decompressed, original.to_vec());
2118    }
2119
2120    #[test]
2121    fn test_embedded_metadata_small_file_improvement() {
2122        // test-api.json is a small file (37KB) where embedded metadata should
2123        // save overhead compared to separate metadata.
2124        let data = std::fs::read(concat!(
2125            env!("CARGO_MANIFEST_DIR"),
2126            "/../../corpus/test-api.json"
2127        ))
2128        .unwrap();
2129
2130        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
2131        let decompressed = decompress_from_slice(&compressed).unwrap();
2132        assert_eq!(decompressed, data, "roundtrip failed");
2133
2134        // Verify the file compresses to a reasonable size.
2135        let ratio = data.len() as f64 / compressed.len() as f64;
2136        assert!(
2137            ratio > 5.0,
2138            "test-api.json should achieve >5x compression, got {ratio:.1}x"
2139        );
2140
2141        // Check header to see which path was chosen.
2142        let mut cursor = Cursor::new(&compressed);
2143        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
2144
2145        // If embedded was chosen, verify the flag is set and header metadata is empty.
2146        if header.meta_embedded {
2147            assert!(
2148                header.transform_metadata.is_empty(),
2149                "meta_embedded header should have empty transform_metadata"
2150            );
2151            assert!(header.use_brotli, "meta_embedded should use brotli codec");
2152        }
2153    }
2154
2155    #[test]
2156    fn test_embedded_metadata_ndjson_roundtrip() {
2157        // NDJSON files with transforms must still roundtrip correctly
2158        // regardless of whether embedded or separate metadata is chosen.
2159        let data = std::fs::read(concat!(
2160            env!("CARGO_MANIFEST_DIR"),
2161            "/../../corpus/test-ndjson.ndjson"
2162        ))
2163        .unwrap();
2164
2165        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2166        let decompressed = decompress_from_slice(&compressed).unwrap();
2167        assert_eq!(
2168            decompressed, data,
2169            "NDJSON embedded metadata roundtrip: byte-exact mismatch"
2170        );
2171    }
2172
2173    #[test]
2174    fn test_embedded_metadata_manual_roundtrip() {
2175        // Manually construct an embedded-metadata .dcx to verify the decompress path
2176        // handles the format correctly, independent of what the compressor chooses.
2177        let original = b"Hello, embedded metadata world! This is a test.";
2178        let crc = crc32fast::hash(original);
2179
2180        // Build embedded payload with an empty transform chain so reverse_preprocess
2181        // is a no-op and the data passes through unchanged.
2182        let empty_chain = TransformChain::new();
2183        let raw_metadata = empty_chain.serialize();
2184
2185        // Build embedded payload: [meta_len:u32 LE][raw_metadata][original_data]
2186        let mut embedded = Vec::new();
2187        embedded.extend_from_slice(&(raw_metadata.len() as u32).to_le_bytes());
2188        embedded.extend_from_slice(&raw_metadata);
2189        embedded.extend_from_slice(original);
2190
2191        let brotli_data = brotli_compress(&embedded, 11, BROTLI_MODE_GENERIC).unwrap();
2192
2193        let header = crate::dcx::DcxHeader {
2194            mode: Mode::Fast,
2195            format_hint: crate::dcx::FormatHint::Generic,
2196            original_size: original.len() as u64,
2197            compressed_size: brotli_data.len() as u64,
2198            crc32: crc,
2199            transform_metadata: vec![], // empty — metadata is embedded
2200            has_dict: false,
2201            meta_compressed: false,
2202            use_brotli: true,
2203            meta_embedded: true,
2204        };
2205
2206        let mut buf = Vec::new();
2207        header.write_to(&mut buf).unwrap();
2208        buf.extend_from_slice(&brotli_data);
2209
2210        // Verify flags.
2211        assert_ne!(buf[7] & crate::dcx::FLAG_META_EMBEDDED, 0);
2212        assert_ne!(buf[7] & crate::dcx::FLAG_BROTLI, 0);
2213
2214        // Decompress and verify.
2215        let decompressed = decompress_from_slice(&buf).unwrap();
2216        assert_eq!(decompressed, original.to_vec());
2217    }
2218
2219    // ─── Optimization: Brotli TEXT mode tests ───────────────────────────────
2220
2221    #[test]
2222    fn test_brotli_text_mode_on_raw() {
2223        // Verify TEXT mode produces valid brotli that decompresses correctly.
2224        let data = br#"{"name":"Alice","age":30,"city":"New York","active":true}"#;
2225
2226        // TEXT mode (for raw UTF-8/JSON).
2227        let compressed_text = brotli_compress(data, 11, BROTLI_MODE_TEXT).unwrap();
2228        let decompressed_text = brotli_decompress(&compressed_text).unwrap();
2229        assert_eq!(
2230            decompressed_text,
2231            data.to_vec(),
2232            "TEXT mode roundtrip failed"
2233        );
2234
2235        // GENERIC mode (for comparison).
2236        let compressed_generic = brotli_compress(data, 11, BROTLI_MODE_GENERIC).unwrap();
2237        let decompressed_generic = brotli_decompress(&compressed_generic).unwrap();
2238        assert_eq!(
2239            decompressed_generic,
2240            data.to_vec(),
2241            "GENERIC mode roundtrip failed"
2242        );
2243
2244        // Both must produce valid output — TEXT mode should not be larger than
2245        // GENERIC on UTF-8 text (or at most within a few bytes).
2246        // We don't assert TEXT < GENERIC because on tiny data the difference is negligible,
2247        // but we verify the feature works.
2248        assert!(
2249            !compressed_text.is_empty(),
2250            "TEXT mode should produce non-empty output"
2251        );
2252    }
2253
2254    // ─── Optimization: Zstd embedded metadata tests ─────────────────────────
2255
2256    #[test]
2257    fn test_zstd_embedded_metadata_roundtrip() {
2258        // Manually construct a .dcx with zstd-compressed embedded metadata
2259        // (meta_embedded=true, use_brotli=false) and verify roundtrip.
2260        let original = b"Hello, zstd embedded metadata! This is a test of the zstd path.";
2261        let crc = crc32fast::hash(original);
2262
2263        // Build embedded payload with an empty transform chain.
2264        let empty_chain = TransformChain::new();
2265        let raw_metadata = empty_chain.serialize();
2266
2267        // [meta_len:u32 LE][raw_metadata][original_data]
2268        let mut embedded = Vec::new();
2269        embedded.extend_from_slice(&(raw_metadata.len() as u32).to_le_bytes());
2270        embedded.extend_from_slice(&raw_metadata);
2271        embedded.extend_from_slice(original);
2272
2273        let zstd_data = zstd::bulk::compress(&embedded, 19).unwrap();
2274
2275        let header = crate::dcx::DcxHeader {
2276            mode: Mode::Fast,
2277            format_hint: crate::dcx::FormatHint::Generic,
2278            original_size: original.len() as u64,
2279            compressed_size: zstd_data.len() as u64,
2280            crc32: crc,
2281            transform_metadata: vec![], // empty — metadata is embedded
2282            has_dict: false,
2283            meta_compressed: false,
2284            use_brotli: false, // zstd, not brotli
2285            meta_embedded: true,
2286        };
2287
2288        let mut buf = Vec::new();
2289        header.write_to(&mut buf).unwrap();
2290        buf.extend_from_slice(&zstd_data);
2291
2292        // Verify flags: meta_embedded set, brotli NOT set.
2293        assert_ne!(buf[7] & crate::dcx::FLAG_META_EMBEDDED, 0);
2294        assert_eq!(buf[7] & crate::dcx::FLAG_BROTLI, 0);
2295
2296        // Decompress and verify byte-exact roundtrip.
2297        let decompressed = decompress_from_slice(&buf).unwrap();
2298        assert_eq!(decompressed, original.to_vec());
2299    }
2300
2301    // ─── Optimization: Multi-quality brotli tests ───────────────────────────
2302
2303    #[test]
2304    fn test_multi_quality_brotli() {
2305        // Verify both quality 10 and 11 produce valid brotli that decompresses.
2306        // On some data q10 beats q11 — we just verify both work correctly.
2307        let data = br#"{"items":[1,2,3,4,5],"nested":{"a":"hello","b":"world"}}"#;
2308
2309        let q10 = brotli_compress(data, 10, BROTLI_MODE_GENERIC).unwrap();
2310        let q11 = brotli_compress(data, 11, BROTLI_MODE_GENERIC).unwrap();
2311
2312        let dec_q10 = brotli_decompress(&q10).unwrap();
2313        let dec_q11 = brotli_decompress(&q11).unwrap();
2314
2315        assert_eq!(dec_q10, data.to_vec(), "quality 10 roundtrip failed");
2316        assert_eq!(dec_q11, data.to_vec(), "quality 11 roundtrip failed");
2317
2318        // Both should produce non-empty compressed output.
2319        assert!(!q10.is_empty());
2320        assert!(!q11.is_empty());
2321
2322        // The auto-fallback should pick the smaller one.
2323        // We can't assert which is smaller (data-dependent), but verify the logic
2324        // by checking that auto-fallback roundtrips on real corpus files.
2325        let corpus_files = [
2326            concat!(env!("CARGO_MANIFEST_DIR"), "/../../corpus/test-api.json"),
2327            concat!(
2328                env!("CARGO_MANIFEST_DIR"),
2329                "/../../corpus/json-bench/twitter.json"
2330            ),
2331        ];
2332        for path in corpus_files {
2333            let file_data = std::fs::read(path).unwrap();
2334            let compressed =
2335                compress_to_vec(&file_data, Mode::Fast, Some(FormatHint::Json)).unwrap();
2336            let decompressed = decompress_from_slice(&compressed).unwrap();
2337            assert_eq!(
2338                decompressed, file_data,
2339                "multi-quality roundtrip failed for {path}"
2340            );
2341        }
2342    }
2343
2344    // ─── Adversarial Regression Tests ────────────────────────────────────────
2345
2346    #[test]
2347    fn test_singleton_arrays_fast_roundtrip() {
2348        // Bug 1: NDJSON with singleton array values like [{"x":0}] caused CRC
2349        // mismatch in fast mode because typed encoding corrupted unquoted values.
2350        let rows: Vec<String> = (0..500)
2351            .map(|i| format!("{{\"items\":[{{\"x\":{}}}],\"id\":{}}}", i, i))
2352            .collect();
2353        let data = rows.join("\n") + "\n";
2354        let compressed =
2355            compress_to_vec(data.as_bytes(), Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2356        let decompressed = decompress_from_slice(&compressed).unwrap();
2357        assert_eq!(
2358            decompressed,
2359            data.as_bytes(),
2360            "singleton_arrays fast mode roundtrip failed"
2361        );
2362    }
2363
2364    #[test]
2365    fn test_very_long_lines_fast_roundtrip() {
2366        // Bug 2: NDJSON with 100KB string values caused CRC mismatch because
2367        // encode_string_column used u16 for per-value lengths (max 65535).
2368        let rows: Vec<String> = (0..50)
2369            .map(|i| format!("{{\"data\":\"{}\",\"id\":{}}}", "X".repeat(100_000), i))
2370            .collect();
2371        let data = rows.join("\n") + "\n";
2372        let compressed =
2373            compress_to_vec(data.as_bytes(), Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2374        let decompressed = decompress_from_slice(&compressed).unwrap();
2375        assert_eq!(
2376            decompressed,
2377            data.as_bytes(),
2378            "very_long_lines fast mode roundtrip failed"
2379        );
2380    }
2381
2382    #[test]
2383    fn test_very_long_lines_balanced_roundtrip() {
2384        // Bug 2 also affected balanced mode — the NDJSON columnar transform
2385        // itself is mode-independent, and long strings overflow u16 everywhere.
2386        let rows: Vec<String> = (0..10)
2387            .map(|i| format!("{{\"data\":\"{}\",\"id\":{}}}", "X".repeat(100_000), i))
2388            .collect();
2389        let data = rows.join("\n") + "\n";
2390        let compressed =
2391            compress_to_vec(data.as_bytes(), Mode::Balanced, Some(FormatHint::Ndjson)).unwrap();
2392        let decompressed = decompress_from_slice(&compressed).unwrap();
2393        assert_eq!(
2394            decompressed,
2395            data.as_bytes(),
2396            "very_long_lines balanced mode roundtrip failed"
2397        );
2398    }
2399
2400    #[test]
2401    fn test_all_same_value_fast_roundtrip() {
2402        // Bug 3: 10K identical rows of {"x":1} caused SIGBUS crash in fast mode.
2403        // After typed encoding, the delta-varint stream was full of 0x00 bytes.
2404        // generate_training_samples split on 0x00, creating thousands of tiny
2405        // fragments that crashed zstd dictionary training.
2406        let rows: Vec<String> = (0..10_000).map(|_| "{\"x\":1}".to_string()).collect();
2407        let data = rows.join("\n") + "\n";
2408        let compressed =
2409            compress_to_vec(data.as_bytes(), Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2410        let decompressed = decompress_from_slice(&compressed).unwrap();
2411        assert_eq!(
2412            decompressed,
2413            data.as_bytes(),
2414            "all_same_value fast mode roundtrip failed"
2415        );
2416    }
2417
2418    #[test]
2419    fn test_generate_training_samples_degenerate() {
2420        // Verify that generate_training_samples falls back to fixed-size chunks
2421        // when 0x00 splitting produces degenerate samples (avg < 8 bytes).
2422        let mut data = vec![0x02u8]; // one non-zero byte
2423        data.extend_from_slice(&[0x00; 9999]); // 9999 zero bytes
2424        let samples = generate_training_samples(&data, 1024);
2425        // Must fall back to fixed-size chunks, not degenerate 0x00-split.
2426        let avg_len = samples.iter().map(|s| s.len()).sum::<usize>() / samples.len();
2427        assert!(
2428            avg_len >= 8,
2429            "training samples average size should be >= 8, got {avg_len}"
2430        );
2431    }
2432
2433    #[test]
2434    fn null_heavy_codec_roundtrip_fast() {
2435        // Regression: null-heavy NDJSON (30+ rows with all-null columns) caused CRC mismatch.
2436        // Python json.dumps produces spaces after colons: {"id": 0, "val": null}
2437        let mut data = Vec::new();
2438        for i in 0..30 {
2439            data.extend_from_slice(format!("{{\"id\": {}, \"val\": null}}\n", i).as_bytes());
2440        }
2441        let mut compressed = Vec::new();
2442        compress(&data, Mode::Fast, None, &mut compressed).unwrap();
2443        let decompressed = decompress(&mut std::io::Cursor::new(&compressed)).unwrap();
2444        assert_eq!(
2445            decompressed, data,
2446            "null-heavy 30-row fast mode roundtrip failed"
2447        );
2448    }
2449
2450    #[test]
2451    fn null_heavy_codec_roundtrip_balanced() {
2452        let mut data = Vec::new();
2453        for i in 0..30 {
2454            data.extend_from_slice(format!("{{\"id\": {}, \"val\": null}}\n", i).as_bytes());
2455        }
2456        let mut compressed = Vec::new();
2457        compress(&data, Mode::Balanced, None, &mut compressed).unwrap();
2458        let decompressed = decompress(&mut std::io::Cursor::new(&compressed)).unwrap();
2459        assert_eq!(
2460            decompressed, data,
2461            "null-heavy 30-row balanced mode roundtrip failed"
2462        );
2463    }
2464
2465    #[test]
2466    fn gharchive_selective_roundtrip() {
2467        // Verify GH Archive roundtrip with selective columnar transform.
2468        let path = concat!(
2469            env!("CARGO_MANIFEST_DIR"),
2470            "/../../corpus/json-bench/gharchive-10mb.ndjson"
2471        );
2472        let data = match std::fs::read(path) {
2473            Ok(d) => d,
2474            Err(_) => return, // Skip if corpus not available
2475        };
2476        let mut compressed = Vec::new();
2477        compress(
2478            &data,
2479            Mode::Fast,
2480            Some(crate::dcx::FormatHint::Ndjson),
2481            &mut compressed,
2482        )
2483        .unwrap();
2484        let decompressed = decompress(&mut std::io::Cursor::new(&compressed)).unwrap();
2485        assert_eq!(
2486            decompressed, data,
2487            "GH Archive selective columnar roundtrip failed"
2488        );
2489    }
2490}