Skip to main content

datacortex_core/
codec.rs

1//! Codec orchestrator — compress and decompress through the DataCortex pipeline.
2//!
3//! Phase 1: Format preprocessing + zstd (Fast mode).
4//! Phase 3: Full CM engine with higher-order models + mixer + APM (Balanced mode, ~256MB).
5//! Phase 5: Full CM engine with 2x context maps (Max mode, ~512MB).
6//! Phase 6: Dual-path CM + LLM with MetaMixer (Max mode with `neural` feature).
7//! Phase 7: Dual-path CM + GRU byte-level predictor (Balanced mode).
8
9use std::io::{self, Cursor, Read, Write};
10
11use crate::dcx::{DcxHeader, FormatHint, Mode};
12use crate::entropy::arithmetic::{ArithmeticDecoder, ArithmeticEncoder};
13use crate::format::transform::TransformChain;
14use crate::format::{detect_format, preprocess, reverse_preprocess};
15use crate::mixer::MetaMixer;
16use crate::model::gru_model::GruModel;
17use crate::model::{CMConfig, CMEngine};
18
/// Choose the zstd level Fast mode uses for `data_size` bytes of preprocessed input.
///
/// An explicit `level_override` (the user passed `--level`) is returned as-is.
/// Otherwise the level steps down as the input grows:
///
/// * up to 16 MiB: 19
/// * above 16 MiB and up to 64 MiB: 16
/// * above 64 MiB: 9
fn adaptive_fast_level(data_size: usize, level_override: Option<i32>) -> i32 {
    const SMALL_LIMIT: usize = 16 * 1024 * 1024;
    const MEDIUM_LIMIT: usize = 64 * 1024 * 1024;

    // Why these levels:
    // - Empirically, zstd levels 9-15 give nearly identical ratios on structured
    //   JSON (btlazy2 strategy plateau); the meaningful jump is at 16+ (btultra).
    // - DataCortex benchmarks against zstd-19. Preprocessing adds ~3-5% on top,
    //   but internal zstd needs level 17+ to beat raw zstd-19 on diverse data
    //   such as GH Archive.
    // - Preprocessing (columnar reorg, schema inference) dominates encode time,
    //   so with rayon parallelism the level cost is marginal; decode speed does
    //   not depend on the level.
    level_override.unwrap_or_else(|| {
        if data_size <= SMALL_LIMIT {
            19 // best ratio, <3s encode on 10MB
        } else if data_size <= MEDIUM_LIMIT {
            16 // btultra breakpoint, good ratio
        } else {
            9 // skip the 10-15 plateau, use fast
        }
    })
}
46
47// ─── Zstd Dictionary Training (Fast mode) ─────────────────────────────────────
48
/// Smallest preprocessed input, in bytes, for which dictionary training is attempted.
/// For anything smaller the dictionary overhead exceeds any savings.
const DICT_MIN_DATA_SIZE: usize = 8 * 1024;
52
/// Chunk size used when splitting preprocessed data for per-chunk compression.
///
/// Every chunk is compressed on its own with the shared dictionary. Smaller
/// chunks gain more from dictionary priming, but each one pays framing overhead
/// (4 bytes size + zstd frame header ~10 bytes), so the chunk size grows with
/// the input to keep the chunk count in check.
fn dict_chunk_size(data_len: usize) -> usize {
    const KIB: usize = 1024;
    const MIB: usize = 1024 * KIB;

    match data_len {
        n if n > 4 * MIB => 128 * KIB,  // > 4 MB
        n if n > MIB => 64 * KIB,       // 1 - 4 MB
        n if n > 256 * KIB => 32 * KIB, // 256 KB - 1 MB
        _ => 16 * KIB,                  // smaller files
    }
}
69
/// Upper bound on the trained dictionary size for an input of `data_len` bytes.
///
/// The dictionary is kept small to limit overhead: it only primes each chunk's
/// compressor context, so a small dictionary already yields most of the benefit.
fn dict_max_size(data_len: usize) -> usize {
    const KIB: usize = 1024;
    const MIB: usize = 1024 * KIB;

    match data_len {
        n if n > 4 * MIB => 16 * KIB, // > 4 MB
        n if n > MIB => 8 * KIB,      // 1 - 4 MB
        _ => 4 * KIB,                 // smaller files
    }
}
82
83/// Generate training samples from the data for dictionary training.
84///
85/// Uses column boundaries (0x00 separators) if available, otherwise fixed blocks.
86/// These samples are only used for `zstd::dict::from_samples`, NOT for the
87/// actual chunked compression (which uses `split_into_chunks`).
88fn generate_training_samples(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
89    // Try column boundaries (0x00 separators from columnar transform).
90    let col_chunks: Vec<&[u8]> = data.split(|&b| b == 0x00).collect();
91    if col_chunks.len() >= 5 {
92        let non_empty: Vec<&[u8]> = col_chunks.into_iter().filter(|c| !c.is_empty()).collect();
93        // Validate that the split produced reasonable samples. If the data is
94        // typed-encoded binary (not columnar text), 0x00 bytes are varint
95        // zeros, not column separators. Splitting on them creates thousands
96        // of tiny fragments that crash zstd dictionary training. Require
97        // non-empty samples with a minimum average size of 8 bytes.
98        if !non_empty.is_empty() {
99            let avg_len = non_empty.iter().map(|c| c.len()).sum::<usize>() / non_empty.len();
100            if avg_len >= 8 {
101                return non_empty;
102            }
103        }
104    }
105
106    // Fall back to fixed-size blocks for training.
107    split_into_chunks(data, chunk_size)
108}
109
/// Split `data` into consecutive chunks of at most `chunk_size` bytes.
///
/// Every byte is preserved exactly and in order; only the last chunk may be
/// shorter. Empty input yields no chunks.
///
/// A `chunk_size` of 0 is treated as "do not split": the whole input comes
/// back as a single chunk. The hand-rolled loop this replaces never advanced
/// in that case and pushed empty slices forever.
fn split_into_chunks(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
    if chunk_size == 0 {
        // `slice::chunks` panics on a zero chunk size, so handle it up front.
        return if data.is_empty() { Vec::new() } else { vec![data] };
    }
    // `chunks` never computes `offset + chunk_size`, so a huge `chunk_size`
    // cannot overflow the way the manual offset arithmetic could.
    data.chunks(chunk_size).collect()
}
122
123/// Attempt chunk-based dictionary compression.
124///
125/// 1. Split data into chunks
126/// 2. Train a zstd dictionary on the chunks
127/// 3. Compress each chunk independently using the trained dictionary
128/// 4. Return the dict + all compressed chunks as a payload
129///
130/// Returns `Some(payload)` if the total is smaller than `plain_size`, else `None`.
131fn try_dict_compress(data: &[u8], level: i32, plain_size: usize) -> Option<Vec<u8>> {
132    let chunk_size = dict_chunk_size(data.len());
133
134    // Generate training samples (may use column boundaries for better diversity).
135    let training_samples = generate_training_samples(data, chunk_size);
136    if training_samples.len() < 5 {
137        return None;
138    }
139
140    let max_dict = dict_max_size(data.len());
141
142    // Train dictionary from the training samples.
143    let dict = zstd::dict::from_samples(&training_samples, max_dict).ok()?;
144    if dict.is_empty() {
145        return None;
146    }
147
148    // Split data into fixed-size chunks for per-chunk compression.
149    let chunks = split_into_chunks(data, chunk_size);
150
151    // Compress each chunk independently with the dictionary.
152    let mut compressor = zstd::bulk::Compressor::with_dictionary(level, &dict).ok()?;
153    let mut compressed_chunks: Vec<Vec<u8>> = Vec::with_capacity(chunks.len());
154    for chunk in &chunks {
155        let cc = compressor.compress(chunk).ok()?;
156        compressed_chunks.push(cc);
157    }
158
159    // Build payload:
160    //   [dict_size: u32 LE] [dict_bytes]
161    //   [num_chunks: u32 LE]
162    //   for each chunk: [chunk_compressed_size: u32 LE] [chunk_data]
163    let total_compressed: usize = compressed_chunks.iter().map(|c| 4 + c.len()).sum();
164    let payload_size = 4 + dict.len() + 4 + total_compressed;
165
166    // Only use dict if it beats plain compression.
167    if payload_size >= plain_size {
168        return None;
169    }
170
171    let mut payload = Vec::with_capacity(payload_size);
172    payload.extend_from_slice(&(dict.len() as u32).to_le_bytes());
173    payload.extend_from_slice(&dict);
174    payload.extend_from_slice(&(compressed_chunks.len() as u32).to_le_bytes());
175    for cc in &compressed_chunks {
176        payload.extend_from_slice(&(cc.len() as u32).to_le_bytes());
177        payload.extend_from_slice(cc);
178    }
179
180    Some(payload)
181}
182
183/// Decompress a chunk-based dictionary-compressed payload.
184///
185/// Payload format:
186///   [dict_size: u32 LE] [dict_bytes]
187///   [num_chunks: u32 LE]
188///   for each chunk: [chunk_compressed_size: u32 LE] [chunk_data]
189///
190/// Chunks are decompressed individually and concatenated.
191fn decompress_with_dict(payload: &[u8], capacity: usize) -> std::io::Result<Vec<u8>> {
192    if payload.len() < 4 {
193        return Err(io::Error::new(
194            io::ErrorKind::InvalidData,
195            "dict payload too short for dict_size",
196        ));
197    }
198    let mut pos = 0;
199
200    // Read dictionary.
201    let dict_size =
202        u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
203    pos += 4;
204    if payload.len() < pos + dict_size {
205        return Err(io::Error::new(
206            io::ErrorKind::InvalidData,
207            "dict payload truncated: dictionary bytes",
208        ));
209    }
210    let dict_bytes = &payload[pos..pos + dict_size];
211    pos += dict_size;
212
213    // Read number of chunks.
214    if payload.len() < pos + 4 {
215        return Err(io::Error::new(
216            io::ErrorKind::InvalidData,
217            "dict payload truncated: num_chunks",
218        ));
219    }
220    let num_chunks =
221        u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
222    pos += 4;
223
224    // Prepare decompressor with dictionary.
225    let mut decompressor = zstd::bulk::Decompressor::with_dictionary(dict_bytes)
226        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
227
228    let mut output = Vec::with_capacity(capacity);
229
230    for i in 0..num_chunks {
231        if payload.len() < pos + 4 {
232            return Err(io::Error::new(
233                io::ErrorKind::InvalidData,
234                format!("dict payload truncated: chunk {i} size"),
235            ));
236        }
237        let chunk_size =
238            u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
239        pos += 4;
240        if payload.len() < pos + chunk_size {
241            return Err(io::Error::new(
242                io::ErrorKind::InvalidData,
243                format!("dict payload truncated: chunk {i} data"),
244            ));
245        }
246        let chunk_data = &payload[pos..pos + chunk_size];
247        pos += chunk_size;
248
249        // Each chunk decompresses to at most chunk_size + some headroom.
250        let chunk_capacity = capacity.saturating_sub(output.len());
251        let decompressed = decompressor
252            .decompress(chunk_data, chunk_capacity)
253            .map_err(|e| {
254                io::Error::new(
255                    io::ErrorKind::InvalidData,
256                    format!("chunk {i} decompress failed: {e}"),
257                )
258            })?;
259        output.extend_from_slice(&decompressed);
260    }
261
262    Ok(output)
263}
264
265// ─── Brotli helpers (Fast mode auto-fallback) ─────────────────────────────────
266
267/// Brotli mode constants for `brotli_compress`.
268/// GENERIC (0): default, best for binary/preprocessed data.
269/// TEXT (1): optimized for UTF-8 text, better for raw JSON.
270const BROTLI_MODE_GENERIC: u32 = 0;
271const BROTLI_MODE_TEXT: u32 = 1;
272
273/// Compress `data` with brotli at the given quality (0-11) and mode.
274/// Use `BROTLI_MODE_TEXT` for raw UTF-8/JSON, `BROTLI_MODE_GENERIC` for preprocessed data.
275fn brotli_compress(data: &[u8], quality: u32, mode: u32) -> io::Result<Vec<u8>> {
276    use brotli::enc::backward_references::BrotliEncoderMode;
277    let mut output = Vec::new();
278    let brotli_mode = match mode {
279        1 => BrotliEncoderMode::BROTLI_MODE_TEXT,
280        _ => BrotliEncoderMode::BROTLI_MODE_GENERIC,
281    };
282    let params = brotli::enc::BrotliEncoderParams {
283        quality: quality as i32,
284        mode: brotli_mode,
285        ..Default::default()
286    };
287    brotli::BrotliCompress(&mut io::Cursor::new(data), &mut output, &params)?;
288    Ok(output)
289}
290
291/// Decompress a brotli stream. `max_size` is a capacity hint for the output buffer.
292fn brotli_decompress(data: &[u8]) -> io::Result<Vec<u8>> {
293    let mut output = Vec::new();
294    brotli::BrotliDecompress(&mut io::Cursor::new(data), &mut output)?;
295    Ok(output)
296}
297
298/// Compress data using the CM engine with the given configuration.
299/// Returns the compressed byte stream.
300fn cm_compress(data: &[u8], config: CMConfig) -> Vec<u8> {
301    let mut engine = CMEngine::with_config(config);
302    let mut encoder = ArithmeticEncoder::new();
303
304    for &byte in data {
305        for bpos in 0..8 {
306            let bit = (byte >> (7 - bpos)) & 1;
307            let p = engine.predict();
308            encoder.encode(bit, p);
309            engine.update(bit);
310        }
311    }
312
313    encoder.finish()
314}
315
316/// Decompress data using the CM engine with the given configuration.
317/// `compressed` is the arithmetic-coded stream, `original_size` is the expected output length.
318fn cm_decompress(compressed: &[u8], original_size: usize, config: CMConfig) -> Vec<u8> {
319    let mut engine = CMEngine::with_config(config);
320    let mut decoder = ArithmeticDecoder::new(compressed);
321    let mut output = Vec::with_capacity(original_size);
322
323    for _ in 0..original_size {
324        let mut byte_val: u8 = 0;
325        for bpos in 0..8 {
326            let p = engine.predict();
327            let bit = decoder.decode(p);
328            engine.update(bit);
329            byte_val |= bit << (7 - bpos);
330        }
331        output.push(byte_val);
332    }
333
334    output
335}
336
337// ─── GRU dual-path (CM + GRU byte predictor) ────────────────────────────────
338// The GRU provides a DIFFERENT signal from CM: byte-level cross-bit correlations.
339// It's blended AFTER the full CM pipeline via MetaMixer.
340// CRITICAL: encoder and decoder must produce IDENTICAL GRU + CM state.
341
342/// Compress using dual-path: CM engine + GRU byte predictor + MetaMixer.
343/// Used for Balanced mode.
344fn gru_compress(data: &[u8], config: CMConfig) -> Vec<u8> {
345    let mut engine = CMEngine::with_config(config);
346    let mut gru = GruModel::new();
347    let mut meta_mixer = MetaMixer::new(12); // 12% GRU weight
348    let mut encoder = ArithmeticEncoder::new();
349
350    let total_bytes = data.len();
351    let report_interval = if total_bytes > 100_000 {
352        total_bytes / 20
353    } else {
354        0
355    };
356
357    for (byte_idx, &byte) in data.iter().enumerate() {
358        for bpos in 0..8u8 {
359            let bit = (byte >> (7 - bpos)) & 1;
360
361            // CM prediction (full pipeline: 19 models + mixer + 7 APM).
362            let p_cm = engine.predict();
363
364            // GRU bit prediction from cached byte probs.
365            let partial = if bpos == 0 {
366                1u32
367            } else {
368                let mut p = 1u32;
369                for prev_bpos in 0..bpos {
370                    let prev_bit = (byte >> (7 - prev_bpos)) & 1;
371                    p = (p << 1) | prev_bit as u32;
372                }
373                p
374            };
375            let p_gru = gru.predict_bit(bpos, partial);
376
377            // MetaMixer blend.
378            let p_final = meta_mixer.blend(p_cm, p_gru);
379
380            encoder.encode(bit, p_final);
381            engine.update(bit);
382            meta_mixer.update(bit);
383        }
384
385        // Byte complete: train GRU on observed byte, then forward for next prediction.
386        gru.train(byte);
387        gru.forward(byte);
388
389        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
390            let pct = (byte_idx + 1) * 100 / total_bytes;
391            eprint!("\r[gru] compressing... {pct}%");
392        }
393    }
394
395    if total_bytes > 100_000 {
396        eprintln!("\r[gru] compressing... 100%");
397    }
398
399    encoder.finish()
400}
401
402/// Decompress using dual-path: CM engine + GRU byte predictor + MetaMixer.
403/// Must produce IDENTICAL GRU + CM state as the encoder.
404fn gru_decompress(compressed: &[u8], original_size: usize, config: CMConfig) -> Vec<u8> {
405    let mut engine = CMEngine::with_config(config);
406    let mut gru = GruModel::new();
407    let mut meta_mixer = MetaMixer::new(12); // same 12% as encoder
408    let mut decoder = ArithmeticDecoder::new(compressed);
409    let mut output = Vec::with_capacity(original_size);
410
411    let report_interval = if original_size > 100_000 {
412        original_size / 20
413    } else {
414        0
415    };
416
417    for byte_idx in 0..original_size {
418        let mut byte_val: u8 = 0;
419
420        for bpos in 0..8u8 {
421            // CM prediction.
422            let p_cm = engine.predict();
423
424            // GRU bit prediction (same partial byte state as encoder).
425            let partial = if bpos == 0 {
426                1u32
427            } else {
428                let mut p = 1u32;
429                for prev_bpos in 0..bpos {
430                    let prev_bit = (byte_val >> (7 - prev_bpos)) & 1;
431                    p = (p << 1) | prev_bit as u32;
432                }
433                p
434            };
435            let p_gru = gru.predict_bit(bpos, partial);
436
437            // MetaMixer blend.
438            let p_final = meta_mixer.blend(p_cm, p_gru);
439
440            let bit = decoder.decode(p_final);
441            engine.update(bit);
442            meta_mixer.update(bit);
443            byte_val |= bit << (7 - bpos);
444        }
445
446        output.push(byte_val);
447
448        // Byte complete: train GRU then forward (same as encoder).
449        gru.train(byte_val);
450        gru.forward(byte_val);
451
452        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
453            let pct = (byte_idx + 1) * 100 / original_size;
454            eprint!("\r[gru] decompressing... {pct}%");
455        }
456    }
457
458    if original_size > 100_000 {
459        eprintln!("\r[gru] decompressing... 100%");
460    }
461
462    output
463}
464
465// ─── Neural dual-path (CM + LLM) ─────────────────────────────────────────────
466// Feature-gated: only available when `neural` is enabled.
467// The LLM predictor runs alongside the CM engine. A MetaMixer blends them.
468// CRITICAL: encoder and decoder must produce IDENTICAL LLM + CM state.
469
/// Dual-path encoder: CM engine + LLM predictor, blended by a MetaMixer.
/// Only used for Max mode with the `neural` feature enabled.
///
/// Bytes are processed in order. While byte N is being coded, the LLM's
/// cached byte probabilities are the ones produced after it was fed byte N-1.
/// After byte N is coded it is fed to the LLM to obtain the prediction for
/// byte N+1. An LLM failure does not abort compression; only failures within
/// the first 5 bytes are logged.
///
/// For inputs above 1000 bytes, progress is written to stderr every 5%.
/// Smaller inputs print nothing, so the periodic reports and the final
/// newline-terminated report are always emitted together.
#[cfg(feature = "neural")]
fn neural_compress(
    data: &[u8],
    config: CMConfig,
    llm: &mut datacortex_neural::LlmPredictor,
    meta_mixer: &mut datacortex_neural::MetaMixer,
) -> Vec<u8> {
    let mut engine = CMEngine::with_config(config);
    let mut encoder = ArithmeticEncoder::new();

    let total_bytes = data.len();
    // One threshold gates both the periodic and the final report. Previously
    // inputs of 20..=1000 bytes printed `\r` progress without ever receiving
    // the closing newline.
    let show_progress = total_bytes > 1000;
    // Report every 5%; non-zero whenever `show_progress` holds.
    let report_interval = total_bytes / 20;

    for (byte_idx, &byte) in data.iter().enumerate() {
        for bpos in 0..8u8 {
            let bit = (byte >> (7 - bpos)) & 1;

            let p_cm = engine.predict();

            // Partial byte for the LLM: a leading 1 followed by the `bpos`
            // bits of this byte already coded (its top `bpos` bits).
            let partial = (1u32 << bpos) | (u32::from(byte) >> (8 - bpos));
            let p_llm = llm.predict_bit(bpos, partial);

            let p_final = meta_mixer.blend(p_cm, p_llm);

            encoder.encode(bit, p_final);
            engine.update(bit);
            meta_mixer.update(bit);
        }

        // Feed the completed byte to the LLM for next-byte prediction.
        if let Err(e) = llm.predict_byte_probs(byte) {
            // Deliberately best-effort: log the first few failures, never abort.
            if byte_idx < 5 {
                eprintln!("[neural] LLM predict error at byte {byte_idx}: {e}");
            }
        }

        let done = byte_idx + 1;
        if show_progress && done % report_interval == 0 {
            let pct = done * 100 / total_bytes;
            eprint!("\r[neural] compressing... {pct}%");
        }
    }

    if show_progress {
        eprintln!("\r[neural] compressing... 100%");
    }

    encoder.finish()
}
545
/// Dual-path decoder: CM engine + LLM predictor, blended by a MetaMixer.
///
/// Performs the same predict / blend / update sequence as `neural_compress`
/// and feeds every decoded byte to the LLM, so the LLM and CM state stay
/// identical to the encoder's. Decodes exactly `original_size` bytes. An LLM
/// failure does not abort decompression; only failures within the first 5
/// bytes are logged.
///
/// For outputs above 1000 bytes, progress is written to stderr every 5%.
/// Smaller outputs print nothing, so the periodic reports and the final
/// newline-terminated report are always emitted together.
#[cfg(feature = "neural")]
fn neural_decompress(
    compressed: &[u8],
    original_size: usize,
    config: CMConfig,
    llm: &mut datacortex_neural::LlmPredictor,
    meta_mixer: &mut datacortex_neural::MetaMixer,
) -> Vec<u8> {
    let mut engine = CMEngine::with_config(config);
    let mut decoder = ArithmeticDecoder::new(compressed);
    let mut output = Vec::with_capacity(original_size);

    // One threshold gates both the periodic and the final report. Previously
    // outputs of 20..=1000 bytes printed `\r` progress without ever receiving
    // the closing newline.
    let show_progress = original_size > 1000;
    // Report every 5%; non-zero whenever `show_progress` holds.
    let report_interval = original_size / 20;

    for byte_idx in 0..original_size {
        let mut byte_val: u8 = 0;

        for bpos in 0..8u8 {
            let p_cm = engine.predict();

            // Same partial byte as the encoder: a leading 1 followed by the
            // `bpos` bits decoded so far (the top `bpos` bits of `byte_val`).
            let partial = (1u32 << bpos) | (u32::from(byte_val) >> (8 - bpos));
            let p_llm = llm.predict_bit(bpos, partial);

            let p_final = meta_mixer.blend(p_cm, p_llm);

            let bit = decoder.decode(p_final);
            engine.update(bit);
            meta_mixer.update(bit);
            byte_val |= bit << (7 - bpos);
        }

        output.push(byte_val);

        // Feed the decoded byte to the LLM, as the encoder did.
        if let Err(e) = llm.predict_byte_probs(byte_val) {
            // Deliberately best-effort: log the first few failures, never abort.
            if byte_idx < 5 {
                eprintln!("[neural] LLM predict error at byte {byte_idx}: {e}");
            }
        }

        let done = byte_idx + 1;
        if show_progress && done % report_interval == 0 {
            let pct = done * 100 / original_size;
            eprint!("\r[neural] decompressing... {pct}%");
        }
    }

    if show_progress {
        eprintln!("\r[neural] decompressing... 100%");
    }

    output
}
617
618/// Get the CMConfig for a given mode.
619fn cm_config_for_mode(mode: Mode) -> CMConfig {
620    match mode {
621        Mode::Max => CMConfig::max(),
622        Mode::Balanced => CMConfig::balanced(),
623        Mode::Fast => CMConfig::balanced(), // not used for Fast, but keeps API clean
624    }
625}
626
/// Locate the model file, trying in order:
/// 1. The explicit path (`--model-path` CLI flag)
/// 2. The `DATACORTEX_MODEL` environment variable
/// 3. The default `~/.datacortex/models/SmolLM2-135M-Instruct-Q8_0.gguf`
///
/// The first source that is present decides the outcome. A missing file at an
/// explicit or environment-provided path yields `None` (with a message on
/// stderr) rather than falling through to the next source. An empty
/// `DATACORTEX_MODEL` yields `None` silently.
#[cfg(feature = "neural")]
fn resolve_model_path(explicit: Option<&str>) -> Option<String> {
    use std::path::Path;

    // 1. Explicit path.
    if let Some(p) = explicit {
        return if Path::new(p).exists() {
            Some(p.to_string())
        } else {
            eprintln!("[neural] explicit model path not found: {p}");
            None
        };
    }

    // 2. Environment variable. An unset or non-unicode value moves on to the default.
    match std::env::var("DATACORTEX_MODEL") {
        // Explicitly set to empty = disable neural.
        Ok(p) if p.is_empty() => return None,
        Ok(p) if Path::new(&p).exists() => return Some(p),
        Ok(p) => {
            eprintln!("[neural] DATACORTEX_MODEL path not found: {p}");
            return None; // Don't fall through to default.
        }
        Err(_) => {}
    }

    // 3. Default location under $HOME.
    let home = std::env::var_os("HOME")?;
    let default = format!(
        "{}/.datacortex/models/SmolLM2-135M-Instruct-Q8_0.gguf",
        home.to_string_lossy()
    );
    Path::new(&default).exists().then_some(default)
}
666
667/// Train a zstd dictionary from multiple sample files.
668///
669/// Each sample should be a complete JSON/NDJSON file's bytes. The function
670/// splits them into training fragments and calls `zstd::dict::from_samples`.
671///
672/// `max_dict_size` controls the max dictionary size in bytes (typical: 32768-131072).
673/// Returns the trained dictionary bytes.
674pub fn train_dict(samples: &[&[u8]], max_dict_size: usize) -> io::Result<Vec<u8>> {
675    if samples.is_empty() {
676        return Err(io::Error::other(
677            "no samples provided for dictionary training",
678        ));
679    }
680
681    // Collect training fragments: split each sample into reasonable chunks.
682    let mut fragments: Vec<&[u8]> = Vec::new();
683    for sample in samples {
684        if sample.is_empty() {
685            continue;
686        }
687        // For NDJSON: split by newlines (each line is a training fragment).
688        let lines: Vec<&[u8]> = sample
689            .split(|&b| b == b'\n')
690            .filter(|l| !l.is_empty())
691            .collect();
692        if lines.len() >= 5 {
693            fragments.extend(lines);
694        } else {
695            // For non-NDJSON: use fixed-size blocks.
696            let chunk_size = 4096.min(sample.len());
697            let mut offset = 0;
698            while offset < sample.len() {
699                let end = (offset + chunk_size).min(sample.len());
700                fragments.push(&sample[offset..end]);
701                offset = end;
702            }
703        }
704    }
705
706    if fragments.len() < 5 {
707        return Err(io::Error::other(
708            "not enough training data (need at least 5 fragments)",
709        ));
710    }
711
712    let dict = zstd::dict::from_samples(&fragments, max_dict_size)
713        .map_err(|e| io::Error::other(format!("dictionary training failed: {e}")))?;
714
715    if dict.is_empty() {
716        return Err(io::Error::other(
717            "dictionary training produced empty dictionary",
718        ));
719    }
720
721    Ok(dict)
722}
723
724/// Compress `data` into .dcx format, writing to `output`.
725pub fn compress<W: Write>(
726    data: &[u8],
727    mode: Mode,
728    format_override: Option<FormatHint>,
729    output: &mut W,
730) -> io::Result<()> {
731    compress_with_model(data, mode, format_override, None, output)
732}
733
734/// Compress with optional explicit model path (for neural Max mode).
735pub fn compress_with_model<W: Write>(
736    data: &[u8],
737    mode: Mode,
738    format_override: Option<FormatHint>,
739    model_path: Option<&str>,
740    output: &mut W,
741) -> io::Result<()> {
742    compress_with_options(data, mode, format_override, model_path, None, output)
743}
744
745/// Compress with optional explicit model path and zstd level override.
746pub fn compress_with_options<W: Write>(
747    data: &[u8],
748    mode: Mode,
749    format_override: Option<FormatHint>,
750    model_path: Option<&str>,
751    zstd_level_override: Option<i32>,
752    output: &mut W,
753) -> io::Result<()> {
754    compress_with_full_options(
755        data,
756        mode,
757        format_override,
758        model_path,
759        zstd_level_override,
760        None,
761        output,
762    )
763}
764
765/// Compress with all options including external dictionary.
766pub fn compress_with_full_options<W: Write>(
767    data: &[u8],
768    mode: Mode,
769    format_override: Option<FormatHint>,
770    model_path: Option<&str>,
771    zstd_level_override: Option<i32>,
772    external_dict: Option<&[u8]>,
773    output: &mut W,
774) -> io::Result<()> {
775    compress_with_all_options(
776        data,
777        mode,
778        format_override,
779        model_path,
780        zstd_level_override,
781        external_dict,
782        false,
783        output,
784    )
785}
786
787/// Compress with all options including turbo mode.
788///
789/// When `turbo` is true (Fast mode only):
790///   - Uses zstd level 3 (instead of adaptive 19/16/9)
791///   - Skips brotli paths entirely
792///   - Skips dictionary training
793///   - Runs only 2 paths (preprocessed+zstd, raw+zstd) instead of 6+
794///   - Result: 5-15x faster encode with ~10-15% ratio loss
795///
796/// The output .dcx format is identical -- decompression is unchanged.
797pub fn compress_with_all_options<W: Write>(
798    data: &[u8],
799    mode: Mode,
800    format_override: Option<FormatHint>,
801    model_path: Option<&str>,
802    zstd_level_override: Option<i32>,
803    external_dict: Option<&[u8]>,
804    turbo: bool,
805    output: &mut W,
806) -> io::Result<()> {
807    let format_hint = format_override.unwrap_or_else(|| detect_format(data));
808    let crc = crc32fast::hash(data);
809
810    // Step 1: Format-aware preprocessing.
811    let (preprocessed, chain) = preprocess(data, format_hint, mode);
812    let transform_metadata = if chain.is_empty() {
813        vec![]
814    } else {
815        chain.serialize()
816    };
817
818    // Step 2: Compress with engine.
819    let mut use_dict = false;
820    let mut use_brotli = false;
821    // Track whether raw fallback won (empty transform chain).
822    let mut use_raw_fallback = false;
823    // Track whether metadata is embedded in the compressed stream.
824    let mut use_meta_embedded = false;
825    let compressed = match mode {
826        // Fast mode: auto-fallback — try preprocessed+zstd, raw+zstd, raw+brotli,
827        // preprocessed+brotli, and embedded-metadata+brotli. Keep whichever produces
828        // the smallest output (including header and metadata overhead).
829        //
830        // Preprocessing (columnar + typed encoding) usually helps zstd by grouping
831        // similar values. But for some files (e.g. citm_catalog.json with extreme
832        // repetition), raw zstd without preprocessing gives MUCH better results
833        // because preprocessing removes the repetition patterns zstd's LZ77 exploits.
834        //
835        // Brotli at quality 11 can beat zstd on some JSON files (e.g. twitter.json)
836        // because its context modeling handles certain data patterns better.
837        //
838        // For small files with transforms, embedding metadata inside the brotli stream
839        // saves the separate metadata overhead (~150 bytes), because brotli compresses
840        // the 4-byte length prefix + raw metadata nearly for free.
841        Mode::Fast if turbo => {
842            // Turbo mode: speed-optimized Fast mode.
843            // Only 2 paths (preprocessed+zstd-3, raw+zstd-3), no brotli, no dict.
844            // 5-15x faster encode with ~10-15% ratio loss vs max-ratio Fast mode.
845            let level = zstd_level_override.unwrap_or(3);
846
847            let (comp_pre, comp_raw) = rayon::join(
848                || zstd::bulk::compress(&preprocessed, level),
849                || zstd::bulk::compress(data, level),
850            );
851
852            let meta_size = if transform_metadata.len() > 64 {
853                let cm = zstd::bulk::compress(&transform_metadata, 19)
854                    .unwrap_or_else(|_| transform_metadata.clone());
855                cm.len().min(transform_metadata.len())
856            } else {
857                transform_metadata.len()
858            };
859
860            match (comp_pre, comp_raw) {
861                (Ok(pre), Ok(raw)) => {
862                    let pre_total = 32 + meta_size + pre.len();
863                    let raw_total = 32 + raw.len();
864                    if raw_total < pre_total {
865                        use_raw_fallback = true;
866                        raw
867                    } else {
868                        pre
869                    }
870                }
871                (Ok(pre), Err(_)) => pre,
872                (Err(_), Ok(raw)) => {
873                    use_raw_fallback = true;
874                    raw
875                }
876                (Err(e), Err(_)) => {
877                    return Err(io::Error::other(format!(
878                        "turbo compression failed: {e}"
879                    )));
880                }
881            }
882        }
883        Mode::Fast => {
884            // Fast mode: auto-fallback with PARALLEL path evaluation.
885            // All 6+ compression paths run concurrently via rayon, then we
886            // keep whichever produces the smallest output.
887            use std::sync::Mutex;
888
889            let level = adaptive_fast_level(preprocessed.len(), zstd_level_override);
890            let raw_level = adaptive_fast_level(data.len(), zstd_level_override);
891
892            // Estimate compressed metadata size for fair comparison.
893            let meta_size_for_comparison = if transform_metadata.len() > 64 {
894                let compressed_meta = zstd::bulk::compress(&transform_metadata, 19)
895                    .unwrap_or_else(|_| transform_metadata.clone());
896                compressed_meta.len().min(transform_metadata.len())
897            } else {
898                transform_metadata.len()
899            };
900
901            // Build embedded metadata payload (shared read-only across threads).
902            let embedded_payload = if !transform_metadata.is_empty() {
903                let mut ep = Vec::with_capacity(4 + transform_metadata.len() + preprocessed.len());
904                ep.extend_from_slice(&(transform_metadata.len() as u32).to_le_bytes());
905                ep.extend_from_slice(&transform_metadata);
906                ep.extend_from_slice(&preprocessed);
907                Some(ep)
908            } else {
909                None
910            };
911
912            // Each path result: (compressed_bytes, total_size, use_dict, use_raw, use_brotli, use_embedded)
913            type PathResult = (Vec<u8>, usize, bool, bool, bool, bool);
914            let results = Mutex::new(Vec::<PathResult>::with_capacity(8));
915
916            rayon::scope(|s| {
917                // Path A: preprocessed + zstd (with optional dict).
918                s.spawn(|_| {
919                    if let Ok(plain) = zstd::bulk::compress(&preprocessed, level) {
920                        let (compressed, is_dict) = if let Some(ext_dict) = external_dict {
921                            // Use externally provided dictionary.
922                            let chunk_size = dict_chunk_size(preprocessed.len());
923                            let chunks = split_into_chunks(&preprocessed, chunk_size);
924                            if let Ok(mut compressor) =
925                                zstd::bulk::Compressor::with_dictionary(level, ext_dict)
926                            {
927                                let mut ok = true;
928                                let mut cc_list = Vec::with_capacity(chunks.len());
929                                for chunk in &chunks {
930                                    match compressor.compress(chunk) {
931                                        Ok(cc) => cc_list.push(cc),
932                                        Err(_) => {
933                                            ok = false;
934                                            break;
935                                        }
936                                    }
937                                }
938                                if ok {
939                                    let total_cc: usize = cc_list.iter().map(|c| 4 + c.len()).sum();
940                                    let payload_size = 4 + ext_dict.len() + 4 + total_cc;
941                                    if payload_size < plain.len() {
942                                        let mut payload = Vec::with_capacity(payload_size);
943                                        payload.extend_from_slice(
944                                            &(ext_dict.len() as u32).to_le_bytes(),
945                                        );
946                                        payload.extend_from_slice(ext_dict);
947                                        payload.extend_from_slice(
948                                            &(cc_list.len() as u32).to_le_bytes(),
949                                        );
950                                        for cc in &cc_list {
951                                            payload.extend_from_slice(
952                                                &(cc.len() as u32).to_le_bytes(),
953                                            );
954                                            payload.extend_from_slice(cc);
955                                        }
956                                        (payload, true)
957                                    } else {
958                                        (plain, false)
959                                    }
960                                } else {
961                                    (plain, false)
962                                }
963                            } else {
964                                (plain, false)
965                            }
966                        } else if preprocessed.len() >= DICT_MIN_DATA_SIZE {
967                            if let Some(dict_payload) =
968                                try_dict_compress(&preprocessed, level, plain.len())
969                            {
970                                (dict_payload, true)
971                            } else {
972                                (plain, false)
973                            }
974                        } else {
975                            (plain, false)
976                        };
977                        let total = 32 + meta_size_for_comparison + compressed.len();
978                        results
979                            .lock()
980                            .unwrap()
981                            .push((compressed, total, is_dict, false, false, false));
982                    }
983                });
984
985                // Path B: raw zstd (no preprocessing).
986                s.spawn(|_| {
987                    if let Ok(compressed) = zstd::bulk::compress(data, raw_level) {
988                        let total = 32 + compressed.len();
989                        results
990                            .lock()
991                            .unwrap()
992                            .push((compressed, total, false, true, false, false));
993                    }
994                });
995
996                // Path C: raw + brotli (TEXT mode).
997                // Quality tiers: q11 ≤1MB, q10 1-16MB, q9 >16MB.
998                // q10 is ~10x faster than q11 with only ~3% ratio loss, keeping
999                // wall time competitive with zstd-19.
1000                s.spawn(|_| {
1001                    let q = if data.len() <= 1_048_576 {
1002                        11
1003                    } else if data.len() <= 16_777_216 {
1004                        10
1005                    } else {
1006                        9
1007                    };
1008                    if let Ok(compressed) = brotli_compress(data, q, BROTLI_MODE_TEXT) {
1009                        let total = 32 + compressed.len();
1010                        results
1011                            .lock()
1012                            .unwrap()
1013                            .push((compressed, total, false, true, true, false));
1014                    }
1015                });
1016
1017                // Path D: preprocessed + brotli (GENERIC mode, dual quality).
1018                s.spawn(|_| {
1019                    let max_q = if preprocessed.len() <= 1_048_576 {
1020                        11
1021                    } else if preprocessed.len() <= 16_777_216 {
1022                        10
1023                    } else {
1024                        9
1025                    };
1026                    let qualities: &[u32] = if max_q == 11 {
1027                        &[11, 10]
1028                    } else {
1029                        &[max_q as u32]
1030                    };
1031                    let mut best: Option<PathResult> = None;
1032                    for &q in qualities {
1033                        if let Ok(compressed) =
1034                            brotli_compress(&preprocessed, q, BROTLI_MODE_GENERIC)
1035                        {
1036                            let total = 32 + meta_size_for_comparison + compressed.len();
1037                            if best.as_ref().is_none_or(|b| total < b.1) {
1038                                best = Some((compressed, total, false, false, true, false));
1039                            }
1040                        }
1041                    }
1042                    if let Some(r) = best {
1043                        results.lock().unwrap().push(r);
1044                    }
1045                });
1046
1047                // Path E: embedded metadata + brotli (GENERIC mode, dual quality).
1048                if let Some(ref ep) = embedded_payload {
1049                    s.spawn(|_| {
1050                        let max_q = if ep.len() <= 1_048_576 {
1051                            11
1052                        } else if ep.len() <= 16_777_216 {
1053                            10
1054                        } else {
1055                            9
1056                        };
1057                        let qualities: &[u32] = if max_q == 11 {
1058                            &[11, 10]
1059                        } else {
1060                            &[max_q as u32]
1061                        };
1062                        let mut best: Option<PathResult> = None;
1063                        for &q in qualities {
1064                            if let Ok(compressed) = brotli_compress(ep, q, BROTLI_MODE_GENERIC) {
1065                                let total = 32 + compressed.len();
1066                                if best.as_ref().is_none_or(|b| total < b.1) {
1067                                    best = Some((compressed, total, false, false, true, true));
1068                                }
1069                            }
1070                        }
1071                        if let Some(r) = best {
1072                            results.lock().unwrap().push(r);
1073                        }
1074                    });
1075                }
1076
1077                // Path F: embedded metadata + zstd.
1078                if let Some(ref ep) = embedded_payload {
1079                    s.spawn(|_| {
1080                        let embed_level = adaptive_fast_level(ep.len(), zstd_level_override);
1081                        if let Ok(compressed) = zstd::bulk::compress(ep, embed_level) {
1082                            let total = 32 + compressed.len();
1083                            results
1084                                .lock()
1085                                .unwrap()
1086                                .push((compressed, total, false, false, false, true));
1087                        }
1088                    });
1089                }
1090            });
1091
1092            // Pick the smallest result.
1093            let results = results.into_inner().unwrap();
1094            let best = results
1095                .into_iter()
1096                .min_by_key(|r| r.1)
1097                .ok_or_else(|| io::Error::other("all compression paths failed"))?;
1098
1099            use_dict = best.2;
1100            use_raw_fallback = best.3;
1101            use_brotli = best.4;
1102            use_meta_embedded = best.5;
1103            best.0
1104        }
1105        // Balanced mode: dual-path CM + GRU byte predictor.
1106        Mode::Balanced => {
1107            let config = cm_config_for_mode(mode);
1108            let cm_data = gru_compress(&preprocessed, config);
1109            let mut payload = Vec::with_capacity(8 + cm_data.len());
1110            payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
1111            payload.extend_from_slice(&cm_data);
1112            payload
1113        }
1114        // Max mode: try neural dual-path, fall back to CM-only.
1115        Mode::Max => {
1116            let config = cm_config_for_mode(mode);
1117
1118            #[cfg(feature = "neural")]
1119            {
1120                if let Some(mpath) = resolve_model_path(model_path) {
1121                    match datacortex_neural::LlmPredictor::new(&mpath) {
1122                        Ok(mut llm) => {
1123                            let mut meta_mixer = datacortex_neural::MetaMixer::new(5);
1124                            eprintln!(
1125                                "[neural] Max mode: dual-path CM+LLM ({} bytes mapped)",
1126                                llm.mapped_bytes()
1127                            );
1128                            let cm_data =
1129                                neural_compress(&preprocessed, config, &mut llm, &mut meta_mixer);
1130                            let mut payload = Vec::with_capacity(8 + cm_data.len());
1131                            // Byte 0 of the 8-byte size prefix: set bit 7 to flag neural mode.
1132                            // This lets the decompressor know to use neural path.
1133                            let size_with_flag = preprocessed.len() as u64 | (1u64 << 63);
1134                            payload.extend_from_slice(&size_with_flag.to_le_bytes());
1135                            payload.extend_from_slice(&cm_data);
1136                            payload
1137                        }
1138                        Err(e) => {
1139                            eprintln!("[neural] LLM init failed, falling back to CM-only: {e}");
1140                            let cm_data = cm_compress(&preprocessed, config);
1141                            let mut payload = Vec::with_capacity(8 + cm_data.len());
1142                            payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
1143                            payload.extend_from_slice(&cm_data);
1144                            payload
1145                        }
1146                    }
1147                } else {
1148                    eprintln!(
1149                        "[neural] no model found, Max mode using CM-only. \
1150                         Set DATACORTEX_MODEL or use --model-path."
1151                    );
1152                    let cm_data = cm_compress(&preprocessed, config);
1153                    let mut payload = Vec::with_capacity(8 + cm_data.len());
1154                    payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
1155                    payload.extend_from_slice(&cm_data);
1156                    payload
1157                }
1158            }
1159
1160            #[cfg(not(feature = "neural"))]
1161            {
1162                let _ = model_path; // suppress unused warning
1163                let cm_data = cm_compress(&preprocessed, config);
1164                let mut payload = Vec::with_capacity(8 + cm_data.len());
1165                payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
1166                payload.extend_from_slice(&cm_data);
1167                payload
1168            }
1169        }
1170    };
1171
1172    // When raw fallback or embedded metadata won, use empty header metadata.
1173    // - Raw fallback: decompressor handles empty chains (just decompresses, no reverse transforms).
1174    // - Embedded: metadata lives inside the compressed stream, not in the header.
1175    let final_metadata = if use_raw_fallback || use_meta_embedded {
1176        vec![]
1177    } else {
1178        transform_metadata
1179    };
1180
1181    // Compress metadata with zstd if it's large enough to benefit.
1182    // Small metadata (<= 64 bytes) stays raw to avoid zstd frame overhead.
1183    // Skipped when metadata is embedded (final_metadata is empty).
1184    let (header_metadata, meta_compressed) = if final_metadata.len() > 64 {
1185        let compressed_meta =
1186            zstd::bulk::compress(&final_metadata, 19).unwrap_or_else(|_| final_metadata.clone());
1187        if compressed_meta.len() < final_metadata.len() {
1188            (compressed_meta, true)
1189        } else {
1190            (final_metadata, false)
1191        }
1192    } else {
1193        (final_metadata, false)
1194    };
1195
1196    let header = DcxHeader {
1197        mode,
1198        format_hint,
1199        original_size: data.len() as u64,
1200        compressed_size: compressed.len() as u64,
1201        crc32: crc,
1202        transform_metadata: header_metadata,
1203        has_dict: use_dict,
1204        meta_compressed,
1205        use_brotli,
1206        meta_embedded: use_meta_embedded,
1207    };
1208
1209    header.write_to(output)?;
1210    output.write_all(&compressed)?;
1211
1212    Ok(())
1213}
1214
1215/// Decompress a .dcx file from `input`, returning the original data.
1216pub fn decompress<R: Read>(input: &mut R) -> io::Result<Vec<u8>> {
1217    decompress_with_model(input, None)
1218}
1219
1220/// Decompress with optional explicit model path (for neural Max mode).
1221pub fn decompress_with_model<R: Read>(
1222    input: &mut R,
1223    model_path: Option<&str>,
1224) -> io::Result<Vec<u8>> {
1225    let header = DcxHeader::read_from(input)?;
1226
1227    let mut compressed = vec![0u8; header.compressed_size as usize];
1228    input.read_exact(&mut compressed)?;
1229
1230    // Step 1: Decompress with engine.
1231    let preprocessed = match header.mode {
1232        Mode::Fast => {
1233            if header.use_brotli {
1234                brotli_decompress(&compressed)?
1235            } else {
1236                let capacity = header.original_size as usize * 2 + 65536;
1237                if header.has_dict {
1238                    decompress_with_dict(&compressed, capacity)?
1239                } else {
1240                    zstd::bulk::decompress(&compressed, capacity)
1241                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
1242                }
1243            }
1244        }
1245        Mode::Balanced => {
1246            // Balanced mode: dual-path CM + GRU byte predictor.
1247            if compressed.len() < 8 {
1248                return Err(io::Error::new(
1249                    io::ErrorKind::InvalidData,
1250                    "CM mode compressed data too short",
1251                ));
1252            }
1253            let size_raw = u64::from_le_bytes(compressed[..8].try_into().expect("8-byte slice"));
1254            let preprocessed_size = (size_raw & !(1u64 << 63)) as usize;
1255            let config = cm_config_for_mode(header.mode);
1256            gru_decompress(&compressed[8..], preprocessed_size, config)
1257        }
1258        Mode::Max => {
1259            // Max mode: may use neural (LLM) dual-path or CM-only.
1260            if compressed.len() < 8 {
1261                return Err(io::Error::new(
1262                    io::ErrorKind::InvalidData,
1263                    "CM mode compressed data too short",
1264                ));
1265            }
1266            let size_raw = u64::from_le_bytes(compressed[..8].try_into().expect("8-byte slice"));
1267
1268            // Check if bit 63 is set (neural flag).
1269            let neural_flag = size_raw & (1u64 << 63) != 0;
1270            let preprocessed_size = (size_raw & !(1u64 << 63)) as usize;
1271            let config = cm_config_for_mode(header.mode);
1272
1273            if neural_flag {
1274                #[cfg(feature = "neural")]
1275                {
1276                    if let Some(mpath) = resolve_model_path(model_path) {
1277                        match datacortex_neural::LlmPredictor::new(&mpath) {
1278                            Ok(mut llm) => {
1279                                let mut meta_mixer = datacortex_neural::MetaMixer::new(5);
1280                                eprintln!(
1281                                    "[neural] decompressing with dual-path CM+LLM ({} bytes mapped)",
1282                                    llm.mapped_bytes()
1283                                );
1284                                neural_decompress(
1285                                    &compressed[8..],
1286                                    preprocessed_size,
1287                                    config,
1288                                    &mut llm,
1289                                    &mut meta_mixer,
1290                                )
1291                            }
1292                            Err(e) => {
1293                                return Err(io::Error::new(
1294                                    io::ErrorKind::Other,
1295                                    format!(
1296                                        "file was compressed with neural mode but LLM failed to load: {e}"
1297                                    ),
1298                                ));
1299                            }
1300                        }
1301                    } else {
1302                        return Err(io::Error::new(
1303                            io::ErrorKind::Other,
1304                            "file was compressed with neural mode but no model found. \
1305                             Set DATACORTEX_MODEL or use --model-path.",
1306                        ));
1307                    }
1308                }
1309
1310                #[cfg(not(feature = "neural"))]
1311                {
1312                    let _ = model_path;
1313                    return Err(io::Error::other(
1314                        "file was compressed with neural mode but this build lacks the \
1315                         `neural` feature. Rebuild with --features neural.",
1316                    ));
1317                }
1318            } else {
1319                cm_decompress(&compressed[8..], preprocessed_size, config)
1320            }
1321        }
1322    };
1323
1324    // Step 1.5: Handle embedded metadata OR separate metadata.
1325    // When meta_embedded is set, the decompressed stream starts with:
1326    //   [meta_len: u32 LE][raw_metadata][preprocessed_data]
1327    // We extract the metadata and the actual preprocessed data from the stream.
1328    let (preprocessed, transform_metadata) = if header.meta_embedded {
1329        if preprocessed.len() < 4 {
1330            return Err(io::Error::new(
1331                io::ErrorKind::InvalidData,
1332                "embedded metadata: decompressed stream too short for meta_len",
1333            ));
1334        }
1335        let meta_len =
1336            u32::from_le_bytes(preprocessed[0..4].try_into().expect("4-byte slice")) as usize;
1337        if preprocessed.len() < 4 + meta_len {
1338            return Err(io::Error::new(
1339                io::ErrorKind::InvalidData,
1340                format!(
1341                    "embedded metadata: stream too short for metadata ({} bytes needed, {} available)",
1342                    4 + meta_len,
1343                    preprocessed.len()
1344                ),
1345            ));
1346        }
1347        let metadata = preprocessed[4..4 + meta_len].to_vec();
1348        let actual_preprocessed = preprocessed[4 + meta_len..].to_vec();
1349        (actual_preprocessed, metadata)
1350    } else {
1351        // Decompress metadata if it was zstd-compressed (separate metadata path).
1352        // Use streaming decoder to avoid guessing decompressed size.
1353        let tm = if header.meta_compressed && !header.transform_metadata.is_empty() {
1354            let mut decoder =
1355                zstd::Decoder::new(Cursor::new(&header.transform_metadata)).map_err(|e| {
1356                    io::Error::new(
1357                        io::ErrorKind::InvalidData,
1358                        format!("failed to init metadata decompressor: {e}"),
1359                    )
1360                })?;
1361            let mut decompressed_meta = Vec::new();
1362            decoder.read_to_end(&mut decompressed_meta).map_err(|e| {
1363                io::Error::new(
1364                    io::ErrorKind::InvalidData,
1365                    format!("failed to decompress transform metadata: {e}"),
1366                )
1367            })?;
1368            decompressed_meta
1369        } else {
1370            header.transform_metadata.clone()
1371        };
1372        (preprocessed, tm)
1373    };
1374
1375    // Step 2: Reverse preprocessing.
1376    let data = if transform_metadata.is_empty() {
1377        preprocessed
1378    } else {
1379        let chain = TransformChain::deserialize(&transform_metadata)?;
1380        reverse_preprocess(&preprocessed, &chain)
1381    };
1382
1383    // CRC-32 integrity check.
1384    let crc = crc32fast::hash(&data);
1385    if crc != header.crc32 {
1386        return Err(io::Error::new(
1387            io::ErrorKind::InvalidData,
1388            format!(
1389                "CRC-32 mismatch: expected {:#010X}, got {:#010X}",
1390                header.crc32, crc
1391            ),
1392        ));
1393    }
1394
1395    if data.len() as u64 != header.original_size {
1396        return Err(io::Error::new(
1397            io::ErrorKind::InvalidData,
1398            format!(
1399                "size mismatch: header says {} bytes, got {}",
1400                header.original_size,
1401                data.len()
1402            ),
1403        ));
1404    }
1405
1406    Ok(data)
1407}
1408
1409/// Compress to Vec (convenience).
1410pub fn compress_to_vec(
1411    data: &[u8],
1412    mode: Mode,
1413    format_override: Option<FormatHint>,
1414) -> io::Result<Vec<u8>> {
1415    let mut buf = Vec::new();
1416    compress(data, mode, format_override, &mut buf)?;
1417    Ok(buf)
1418}
1419
1420/// Compress in turbo mode (speed-optimized Fast mode).
1421/// Uses zstd-3, skips brotli, 2 paths only. 5-15x faster, ~10-15% ratio loss.
1422pub fn compress_turbo<W: Write>(
1423    data: &[u8],
1424    format_override: Option<FormatHint>,
1425    output: &mut W,
1426) -> io::Result<()> {
1427    compress_with_all_options(data, Mode::Fast, format_override, None, None, None, true, output)
1428}
1429
1430/// Compress to Vec in turbo mode (convenience).
1431pub fn compress_to_vec_turbo(
1432    data: &[u8],
1433    format_override: Option<FormatHint>,
1434) -> io::Result<Vec<u8>> {
1435    let mut buf = Vec::new();
1436    compress_turbo(data, format_override, &mut buf)?;
1437    Ok(buf)
1438}
1439
1440/// Compress to Vec with explicit model path.
1441pub fn compress_to_vec_with_model(
1442    data: &[u8],
1443    mode: Mode,
1444    format_override: Option<FormatHint>,
1445    model_path: Option<&str>,
1446) -> io::Result<Vec<u8>> {
1447    let mut buf = Vec::new();
1448    compress_with_model(data, mode, format_override, model_path, &mut buf)?;
1449    Ok(buf)
1450}
1451
1452/// Compress to Vec with explicit model path and zstd level override.
1453pub fn compress_to_vec_with_options(
1454    data: &[u8],
1455    mode: Mode,
1456    format_override: Option<FormatHint>,
1457    model_path: Option<&str>,
1458    zstd_level_override: Option<i32>,
1459) -> io::Result<Vec<u8>> {
1460    let mut buf = Vec::new();
1461    compress_with_options(
1462        data,
1463        mode,
1464        format_override,
1465        model_path,
1466        zstd_level_override,
1467        &mut buf,
1468    )?;
1469    Ok(buf)
1470}
1471
1472/// Decompress from slice (convenience).
1473pub fn decompress_from_slice(dcx_data: &[u8]) -> io::Result<Vec<u8>> {
1474    let mut cursor = Cursor::new(dcx_data);
1475    decompress(&mut cursor)
1476}
1477
1478/// Read header only (for `info` command).
1479pub fn read_header<R: Read>(input: &mut R) -> io::Result<DcxHeader> {
1480    DcxHeader::read_from(input)
1481}
1482
1483/// Compress raw data with zstd at a given level (for benchmark comparison).
1484pub fn raw_zstd_compress(data: &[u8], level: i32) -> io::Result<Vec<u8>> {
1485    zstd::bulk::compress(data, level).map_err(io::Error::other)
1486}
1487
1488#[cfg(test)]
1489mod tests {
1490    use super::*;
1491
1492    #[test]
1493    fn fast_mode_roundtrip() {
1494        let original = b"Hello, DataCortex! This is a test of Fast mode compression.";
1495        let compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1496        let decompressed = decompress_from_slice(&compressed).unwrap();
1497        assert_eq!(decompressed, original);
1498    }
1499
1500    #[test]
1501    fn turbo_mode_roundtrip() {
1502        let original = b"Hello, DataCortex! This is a test of turbo mode compression.";
1503        let compressed = compress_to_vec_turbo(original, None).unwrap();
1504        let decompressed = decompress_from_slice(&compressed).unwrap();
1505        assert_eq!(decompressed, original);
1506    }
1507
1508    #[test]
1509    fn turbo_mode_ndjson_roundtrip() {
1510        let data = b"{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n{\"id\":3,\"name\":\"Carol\"}\n";
1511        let compressed = compress_to_vec_turbo(data, Some(FormatHint::Ndjson)).unwrap();
1512        let decompressed = decompress_from_slice(&compressed).unwrap();
1513        assert_eq!(decompressed, data.to_vec());
1514    }
1515
1516    #[test]
1517    fn turbo_mode_beats_raw_zstd3() {
1518        // Turbo mode should produce smaller output than raw zstd-3 on structured data.
1519        let mut data = Vec::new();
1520        for i in 0..200 {
1521            data.extend_from_slice(
1522                format!("{{\"id\":{},\"type\":\"PushEvent\",\"name\":\"user{}\"}}\n", i, i % 20)
1523                    .as_bytes(),
1524            );
1525        }
1526        let turbo = compress_to_vec_turbo(&data, Some(FormatHint::Ndjson)).unwrap();
1527        let raw = raw_zstd_compress(&data, 3).unwrap();
1528        assert!(
1529            turbo.len() <= raw.len(),
1530            "turbo {} should be <= raw zstd-3 {} on structured NDJSON",
1531            turbo.len(),
1532            raw.len()
1533        );
1534    }
1535
#[test]
fn fast_mode_json_roundtrip() {
    // A JSON object with repeated keys must roundtrip exactly through Fast
    // mode when the JSON hint is supplied.
    let json = br#"{"name":"Alice","age":30,"name":"Bob","age":25,"name":"Carol","age":35}"#;
    let packed = compress_to_vec(json, Mode::Fast, Some(FormatHint::Json)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, json.to_vec());
}
1543
#[test]
fn balanced_mode_roundtrip() {
    // A short ASCII sentence must survive a Balanced-mode encode/decode cycle.
    let plain = b"Balanced mode test data with some content.";
    let packed = compress_to_vec(plain, Mode::Balanced, None).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, plain);
}
1551
#[test]
fn balanced_mode_longer_text() {
    // A couple hundred bytes of English prose: enough input to run the
    // Balanced-mode coder over more than a handful of symbols.
    let prose = b"The quick brown fox jumps over the lazy dog. This sentence contains every letter of the English alphabet at least once. We need enough data to properly exercise the arithmetic coder and order-0 model.";
    let packed = compress_to_vec(prose, Mode::Balanced, None).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, prose);
}
1559
#[test]
fn balanced_mode_repetitive_data() {
    // One phrase repeated 100 times must decode to the same bytes in
    // Balanced mode.
    let text = "hello world! ".repeat(100);
    let bytes = text.as_bytes();
    let packed = compress_to_vec(bytes, Mode::Balanced, None).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, bytes);
}
1567
#[test]
fn balanced_mode_all_byte_values() {
    // Every possible byte value, once each in ascending order, must roundtrip.
    let every_byte: Vec<u8> = (u8::MIN..=u8::MAX).collect();
    let packed = compress_to_vec(&every_byte, Mode::Balanced, None).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, every_byte);
}
1575
#[test]
fn balanced_mode_single_byte() {
    // Smallest non-empty input: a single byte must roundtrip in Balanced mode.
    let one = b"X";
    let packed = compress_to_vec(one, Mode::Balanced, None).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, one);
}
1583
#[test]
fn balanced_mode_json_roundtrip() {
    // Same repeated-key JSON object as the Fast-mode test, this time through
    // Balanced mode with the JSON hint.
    let json = br#"{"name":"Alice","age":30,"name":"Bob","age":25,"name":"Carol","age":35}"#;
    let packed = compress_to_vec(json, Mode::Balanced, Some(FormatHint::Json)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, json.to_vec());
}
1591
#[test]
fn empty_data_roundtrip() {
    // Zero-length input must compress and decompress cleanly in every mode.
    let nothing = b"";
    let modes = [Mode::Fast, Mode::Balanced, Mode::Max];
    for mode in modes {
        let packed = compress_to_vec(nothing, mode, None).unwrap();
        let restored = decompress_from_slice(&packed).unwrap();
        assert_eq!(restored, nothing, "failed for mode {mode}");
    }
}
1601
#[test]
fn crc_mismatch_detected() {
    // Flip every bit of one byte past the header and require that
    // decompression reports an error instead of returning data.
    let original = b"test data for CRC check";
    let mut compressed = compress_to_vec(original, Mode::Fast, None).unwrap();

    // Corrupt in the compressed data section (after header).
    let header_size = 32; // minimum header
    let victim = header_size + 3;

    // Fail loudly if the output is too short to corrupt. Skipping the
    // corruption silently would leave valid data and make the final
    // assertion fail for an unrelated, confusing reason.
    assert!(
        compressed.len() > header_size + 5,
        "compressed output ({} bytes) is too short to corrupt at offset {}",
        compressed.len(),
        victim
    );
    compressed[victim] ^= 0xFF;

    assert!(decompress_from_slice(&compressed).is_err());
}
1613
#[test]
fn fast_mode_actually_compresses() {
    // Highly repetitive text must end up smaller than the input, container
    // overhead included.
    let text = "hello world. ".repeat(100);
    let packed = compress_to_vec(text.as_bytes(), Mode::Fast, None).unwrap();

    let (packed_len, input_len) = (packed.len(), text.len());
    assert!(
        packed_len < input_len,
        "Fast mode should compress repetitive data: {} vs {}",
        packed_len,
        input_len
    );
}
1626
#[test]
fn json_preprocessing_improves_fast_mode() {
    // Compress the same JSON array twice in Fast mode: once with the JSON
    // hint and once with the Generic hint.
    // NOTE(review): despite the name, output sizes are never compared here;
    // only the correctness of both roundtrips is checked.
    let json = br#"[{"name":"Alice","score":95},{"name":"Bob","score":87},{"name":"Carol","score":92},{"name":"Dave","score":88},{"name":"Eve","score":91}]"#;

    let hinted_json = compress_to_vec(json, Mode::Fast, Some(FormatHint::Json)).unwrap();
    let hinted_generic = compress_to_vec(json, Mode::Fast, Some(FormatHint::Generic)).unwrap();

    // Each variant must decode back to the original bytes.
    for packed in [&hinted_json, &hinted_generic] {
        assert_eq!(decompress_from_slice(packed).unwrap(), json.to_vec());
    }
}
1644
#[test]
fn all_modes_roundtrip() {
    // One shared payload pushed through each mode in turn; every mode must
    // reproduce it exactly.
    let payload = b"test all modes with some more content to ensure decent compression";
    let modes = [Mode::Max, Mode::Balanced, Mode::Fast];
    for mode in modes {
        let packed = compress_to_vec(payload, mode, None).unwrap();
        let restored = decompress_from_slice(&packed).unwrap();
        assert_eq!(restored, payload, "failed for mode {mode}");
    }
}
1654
#[test]
fn cm_compress_decompress_direct() {
    // Call the CM encoder/decoder pair directly, bypassing the container.
    // The decoder is given the original length explicitly.
    let plain = b"Hello, World! This is a direct CM test.";
    let encoded = cm_compress(plain, CMConfig::balanced());
    let decoded = cm_decompress(&encoded, plain.len(), CMConfig::balanced());
    assert_eq!(decoded, plain.to_vec());
}
1662
#[test]
fn cm_empty() {
    // Encoding an empty slice and decoding zero bytes must yield no output.
    let empty: &[u8] = b"";
    let encoded = cm_compress(empty, CMConfig::balanced());
    let decoded = cm_decompress(&encoded, 0, CMConfig::balanced());
    assert!(decoded.is_empty());
}
1670
#[test]
fn cm_single_byte() {
    // Exhaustively roundtrip every one-byte input through the CM coder.
    for byte in u8::MIN..=u8::MAX {
        let input = [byte];
        let encoded = cm_compress(&input, CMConfig::balanced());
        let decoded = cm_decompress(&encoded, 1, CMConfig::balanced());
        assert_eq!(
            decoded, input,
            "CM roundtrip failed for byte {byte:#04X}"
        );
    }
}
1683
#[test]
fn cm_repetitive_compresses() {
    // 1000 copies of the same byte: the encoded form must stay under
    // 200 bytes and still decode to the original run.
    let run = vec![b'A'; 1000];
    let encoded = cm_compress(&run, CMConfig::balanced());

    let encoded_len = encoded.len();
    assert!(
        encoded_len < 200,
        "CM should compress 1000 identical bytes well: {} bytes",
        encoded_len
    );

    let decoded = cm_decompress(&encoded, run.len(), CMConfig::balanced());
    assert_eq!(decoded, run);
}
1697
#[test]
fn max_mode_roundtrip() {
    // A short sentence must roundtrip exactly through Max mode.
    let plain = b"Max mode test data with some content for compression.";
    let packed = compress_to_vec(plain, Mode::Max, None).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, plain);
}
1705
#[test]
fn max_mode_longer_text() {
    // A longer stretch of prose through Max mode; only exactness of the
    // roundtrip is checked, not the achieved size.
    let prose = b"The quick brown fox jumps over the lazy dog. Max mode uses 2x context maps for better predictions with fewer hash collisions. This should compress slightly better than balanced mode.";
    let packed = compress_to_vec(prose, Mode::Max, None).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, prose);
}
1713
1714    // ─── Dictionary compression tests ──────────────────────────────────────────
1715
#[test]
fn test_dict_compress_roundtrip() {
    // Build 500 NDJSON records with a fixed schema so the input is larger
    // than DICT_MIN_DATA_SIZE, then check an exact Fast-mode roundtrip.
    let ndjson: String = (0..500)
        .map(|i| {
            let mut line = format!(
                r#"{{"id":{},"name":"user_{}","status":"active","score":{}}}"#,
                i,
                i,
                i * 17 % 100
            );
            line.push('\n');
            line
        })
        .collect();
    let bytes = ndjson.as_bytes();

    // Guard the premise: the input really is above the dictionary threshold.
    assert!(
        bytes.len() > DICT_MIN_DATA_SIZE,
        "test data should exceed dict threshold: {} bytes",
        bytes.len()
    );

    let packed = compress_to_vec(bytes, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(
        restored, bytes,
        "dict compress roundtrip: byte-exact mismatch"
    );
}
1744
#[test]
fn test_dict_falls_back_on_small() {
    // Input below DICT_MIN_DATA_SIZE must roundtrip and must not carry the
    // dictionary flag in its header.
    let tiny = b"small data that won't trigger dictionary training";
    assert!(tiny.len() < DICT_MIN_DATA_SIZE);

    let packed = compress_to_vec(tiny, Mode::Fast, None).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, tiny.to_vec());

    // Parse the container header back out and inspect the flag.
    let mut reader = Cursor::new(&packed);
    let parsed = crate::dcx::DcxHeader::read_from(&mut reader).unwrap();
    assert!(!parsed.has_dict, "small data should not have dict flag set");
}
1760
#[test]
fn test_dict_backward_compat() {
    // Stand-in for a pre-dictionary file: small input compressed by the
    // current encoder is expected to have no dict flag. Such a stream must
    // still decode correctly.
    let plain = b"backward compatibility test data for decompression";
    let packed = compress_to_vec(plain, Mode::Fast, None).unwrap();

    // The header must not advertise a dictionary.
    let mut reader = Cursor::new(&packed);
    let parsed = crate::dcx::DcxHeader::read_from(&mut reader).unwrap();
    assert!(!parsed.has_dict);

    // Decoding the flag-less stream must reproduce the input.
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, plain.to_vec());
}
1777
#[test]
fn test_dict_ndjson_large_roundtrip() {
    // 2000 log-style NDJSON records with cycling timestamps and durations;
    // the Fast-mode roundtrip must be exact.
    let ndjson: String = (0..2000)
        .map(|i| {
            let mut line = format!(
                r#"{{"timestamp":"2025-01-{:02}T{:02}:{:02}:00Z","level":"info","message":"Request processed","request_id":"req_{}","duration_ms":{}}}"#,
                (i % 28) + 1,
                i % 24,
                i % 60,
                i,
                (i * 13) % 500
            );
            line.push('\n');
            line
        })
        .collect();
    let bytes = ndjson.as_bytes();

    let packed = compress_to_vec(bytes, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, bytes, "large NDJSON roundtrip mismatch");
}
1799
#[test]
fn test_dict_generic_data_roundtrip() {
    // Plain numbered text lines (not JSON), sized above DICT_MIN_DATA_SIZE,
    // compressed with the Generic hint; the roundtrip must be exact.
    let text: Vec<u8> = (0..3000)
        .flat_map(|i| {
            format!("line {i}: the quick brown fox jumps over the lazy dog\n").into_bytes()
        })
        .collect();
    assert!(text.len() > DICT_MIN_DATA_SIZE);

    let packed = compress_to_vec(&text, Mode::Fast, Some(FormatHint::Generic)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, text, "generic data dict roundtrip mismatch");
}
1816
#[test]
fn test_dict_does_not_affect_other_modes() {
    // Balanced and Max output must never set the dictionary flag, and both
    // must still roundtrip the NDJSON input exactly.
    let ndjson: String = (0..200)
        .map(|i| {
            let mut line = format!(
                r#"{{"id":{},"name":"user_{}","status":"active"}}"#,
                i, i
            );
            line.push('\n');
            line
        })
        .collect();
    let bytes = ndjson.as_bytes();

    let non_fast_modes = [Mode::Balanced, Mode::Max];
    for mode in non_fast_modes {
        let packed = compress_to_vec(bytes, mode, Some(FormatHint::Ndjson)).unwrap();

        let mut reader = Cursor::new(&packed);
        let parsed = crate::dcx::DcxHeader::read_from(&mut reader).unwrap();
        assert!(!parsed.has_dict, "mode {mode} should never have dict flag");

        let restored = decompress_from_slice(&packed).unwrap();
        assert_eq!(restored, bytes, "roundtrip failed for mode {mode}");
    }
}
1840
1841    // ─── Configurable zstd level tests ──────────────────────────────────────
1842
#[test]
fn test_compress_with_level() {
    // An explicit level override of 19 in Fast mode must still produce a
    // stream that decodes to the input.
    let text = "hello world, compressing with custom zstd level. ".repeat(50);
    let bytes = text.as_bytes();

    let packed = compress_to_vec_with_options(bytes, Mode::Fast, None, None, Some(19)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, bytes, "level 19 roundtrip failed");
}
1853
#[test]
fn test_compress_with_level_default() {
    // With no level override the codec picks the level itself; the stream
    // must still roundtrip.
    // NOTE(review): presumably the adaptive Fast-mode level applies here,
    // not a fixed level 9 — confirm against compress_to_vec_with_options.
    let text = "default level test data. ".repeat(50);
    let bytes = text.as_bytes();

    let packed = compress_to_vec_with_options(bytes, Mode::Fast, None, None, None).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(
        restored,
        bytes,
        "default level roundtrip failed"
    );
}
1867
#[test]
fn test_compress_with_level_higher_ratio() {
    // On a repeated JSON object, level 19 must not be larger than level 1.
    let text = r#"{"name":"Alice","score":95}"#.repeat(200);
    let bytes = text.as_bytes();

    let at_level_1 =
        compress_to_vec_with_options(bytes, Mode::Fast, None, None, Some(1)).unwrap();
    let at_level_19 =
        compress_to_vec_with_options(bytes, Mode::Fast, None, None, Some(19)).unwrap();

    // Correctness first: both streams must decode to the input.
    assert_eq!(decompress_from_slice(&at_level_1).unwrap(), bytes);
    assert_eq!(decompress_from_slice(&at_level_19).unwrap(), bytes);

    // Then the size relation (ties are allowed).
    let (high_len, low_len) = (at_level_19.len(), at_level_1.len());
    assert!(
        high_len <= low_len,
        "level 19 ({}) should be <= level 1 ({})",
        high_len,
        low_len
    );
}
1889
1890    // ─── Auto-fallback tests ──────────────────────────────────────────────────
1891
#[test]
fn test_auto_fallback_picks_smaller() {
    // citm_catalog.json is extremely repetitive. Whichever candidate the
    // codec chooses, the result must roundtrip and exceed a 50x ratio.
    const CITM_PATH: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../corpus/json-bench/citm_catalog.json"
    );
    let input = std::fs::read(CITM_PATH).unwrap();

    let packed = compress_to_vec(&input, Mode::Fast, Some(FormatHint::Json)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, input, "citm_catalog roundtrip failed");

    // Ratio check is independent of which path produced the output.
    let ratio = input.len() as f64 / packed.len() as f64;
    assert!(
        ratio > 50.0,
        "citm_catalog should achieve >50x, got {ratio:.1}x"
    );
}
1914
#[test]
fn test_auto_fallback_preprocessed_wins_on_ndjson() {
    // For uniform-schema NDJSON the preprocessed candidate is expected to be
    // selected. Evidence of that is transform metadata in the header or the
    // embedded-metadata flag.
    const NDJSON_PATH: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../corpus/test-ndjson.ndjson"
    );
    let input = std::fs::read(NDJSON_PATH).unwrap();

    let packed = compress_to_vec(&input, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, input, "test-ndjson roundtrip failed");

    let mut reader = Cursor::new(&packed);
    let parsed = crate::dcx::DcxHeader::read_from(&mut reader).unwrap();
    let used_preprocessing = parsed.meta_embedded || !parsed.transform_metadata.is_empty();
    assert!(
        used_preprocessing,
        "test-ndjson should prefer preprocessed path (non-empty transform metadata or embedded)"
    );
}
1938
#[test]
fn test_auto_fallback_roundtrip() {
    // Roundtrip two corpus files that are expected to exercise different
    // auto-fallback candidates.
    // NOTE(review): which candidate wins for each file is not asserted here;
    // the "raw"/"preprocessed" labels in the messages are assumptions.
    const CITM_PATH: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../corpus/json-bench/citm_catalog.json"
    );
    const NDJSON_PATH: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../corpus/test-ndjson.ndjson"
    );

    // Load both fixtures up front so a missing file fails before any work.
    let citm_input = std::fs::read(CITM_PATH).unwrap();
    let ndjson_input = std::fs::read(NDJSON_PATH).unwrap();

    // citm_catalog with the JSON hint.
    let citm_packed = compress_to_vec(&citm_input, Mode::Fast, Some(FormatHint::Json)).unwrap();
    let citm_restored = decompress_from_slice(&citm_packed).unwrap();
    assert_eq!(
        citm_restored, citm_input,
        "citm_catalog roundtrip (raw path) failed"
    );

    // test-ndjson with the NDJSON hint.
    let ndjson_packed =
        compress_to_vec(&ndjson_input, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
    let ndjson_restored = decompress_from_slice(&ndjson_packed).unwrap();
    assert_eq!(
        ndjson_restored, ndjson_input,
        "test-ndjson roundtrip (preprocessed path) failed"
    );
}
1971
1972    // ─── Adaptive level tests ─────────────────────────────────────────────────
1973
#[test]
fn test_adaptive_level_small_data() {
    // Sizes from 0 up to exactly 1 MiB, with no override, map to level 19.
    for size in [100_000, 500_000, 1_048_576, 0] {
        assert_eq!(adaptive_fast_level(size, None), 19);
    }
}
1982
#[test]
fn test_adaptive_level_medium_data() {
    // Just above 1 MiB through exactly 16 MiB still selects level 19; the
    // intermediate zstd levels are skipped rather than stepped through.
    for size in [1_048_577, 5_000_000, 10_485_760, 16_777_216] {
        assert_eq!(adaptive_fast_level(size, None), 19);
    }
}
1992
#[test]
fn test_adaptive_level_large_data() {
    // (16 MiB, 64 MiB] selects level 16; anything above 64 MiB drops to 9.
    let expectations = [
        (16_777_217, 16),
        (33_554_432, 16),
        (67_108_864, 16),
        (67_108_865, 9),
        (100_000_000, 9),
    ];
    for (size, level) in expectations {
        assert_eq!(adaptive_fast_level(size, None), level);
    }
}
2002
#[test]
fn test_adaptive_level_override() {
    // An explicit override is returned verbatim, whatever the data size.
    let cases = [(100, 3), (100_000_000, 22), (0, 1)];
    for (size, requested) in cases {
        assert_eq!(adaptive_fast_level(size, Some(requested)), requested);
    }
}
2010
2011    // ─── Compressed metadata tests ──────────────────────────────────────────────
2012
#[test]
fn test_compressed_metadata_roundtrip() {
    // Build 500 fixed-schema NDJSON records, intended to be large enough to
    // yield more than 64 bytes of transform metadata, and check an exact
    // Fast-mode roundtrip.
    let ndjson: String = (0..500)
        .map(|i| {
            let mut line = format!(
                r#"{{"id":{},"name":"user_{}","status":"active","score":{}}}"#,
                i,
                i,
                i * 17 % 100
            );
            line.push('\n');
            line
        })
        .collect();
    let data = ndjson.as_bytes();

    let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
    let decompressed = decompress_from_slice(&compressed).unwrap();
    assert_eq!(
        decompressed, data,
        "compressed metadata roundtrip: byte-exact mismatch"
    );

    // The header must parse on its own. The `meta_compressed` flag is not
    // asserted: it is an optimization detail, set only when compressing the
    // metadata actually saved space. The earlier empty `if` that merely
    // inspected the metadata length was removed as a no-op.
    let mut cursor = Cursor::new(&compressed);
    let _header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
}
2045
#[test]
fn test_compressed_metadata_backward_compat() {
    // Stand-in for files written before metadata compression existed: small
    // generic input is expected to produce little or no metadata. It must
    // decode correctly either way.
    let plain = b"backward compatibility test data for metadata decompression";
    let packed = compress_to_vec(plain, Mode::Fast, None).unwrap();

    // Decoding must reproduce the input.
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, plain.to_vec());

    // Consistency check on the header: the compressed-metadata flag must
    // never be set while the metadata itself is empty.
    let mut reader = Cursor::new(&packed);
    let parsed = crate::dcx::DcxHeader::read_from(&mut reader).unwrap();
    let flag_without_metadata = parsed.meta_compressed && parsed.transform_metadata.is_empty();
    assert!(!flag_without_metadata);
}
2063
#[test]
fn test_compressed_metadata_small_skipped() {
    // Metadata of 64 bytes or fewer must be stored uncompressed, since zstd
    // frame overhead would make it larger.
    let json = br#"{"name":"Alice","age":30}"#;
    let packed = compress_to_vec(json, Mode::Fast, Some(FormatHint::Json)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, json.to_vec());

    let mut reader = Cursor::new(&packed);
    let parsed = crate::dcx::DcxHeader::read_from(&mut reader).unwrap();

    // The flag is only constrained when the metadata is in the small range.
    let meta_len = parsed.transform_metadata.len();
    if meta_len <= 64 {
        assert!(
            !parsed.meta_compressed,
            "metadata <= 64 bytes should not be compressed, but meta_compressed=true \
             for {} bytes of metadata",
            meta_len
        );
    }
}
2085
#[test]
fn test_twitter_json_brotli_wins() {
    // twitter.json is expected to be smallest under brotli, so the header
    // must carry the brotli flag and the roundtrip must be exact.
    const TWITTER_PATH: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../corpus/json-bench/twitter.json"
    );
    let input = std::fs::read(TWITTER_PATH).unwrap();

    let packed = compress_to_vec(&input, Mode::Fast, Some(FormatHint::Json)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, input, "twitter.json roundtrip failed");

    // Confirm which entropy coder the header advertises.
    let mut reader = Cursor::new(&packed);
    let parsed = crate::dcx::DcxHeader::read_from(&mut reader).unwrap();
    assert!(
        parsed.use_brotli,
        "twitter.json should use brotli (FLAG_BROTLI set in header)"
    );
}
2108
#[test]
fn test_compressed_metadata_all_modes_roundtrip() {
    // Metadata compression is not specific to Fast mode, so the same NDJSON
    // input is roundtripped through every mode.
    let ndjson: String = (0..200)
        .map(|i| {
            let mut line = format!(
                r#"{{"id":{},"name":"user_{}","status":"active"}}"#,
                i, i
            );
            line.push('\n');
            line
        })
        .collect();
    let bytes = ndjson.as_bytes();

    let modes = [Mode::Fast, Mode::Balanced, Mode::Max];
    for mode in modes {
        let packed = compress_to_vec(bytes, mode, Some(FormatHint::Ndjson)).unwrap();
        let restored = decompress_from_slice(&packed).unwrap();
        assert_eq!(
            restored, bytes,
            "compressed metadata roundtrip failed for mode {mode}"
        );
    }
}
2131
2132    // ─── Brotli auto-fallback tests ──────────────────────────────────────────
2133
#[test]
fn test_brotli_compress_roundtrip() {
    // Exercise the brotli helper pair directly, without the container.
    let plain = b"Hello, brotli! This is a test of the brotli compression helpers.";
    let encoded = brotli_compress(plain, 11, BROTLI_MODE_GENERIC).unwrap();
    let decoded = brotli_decompress(&encoded).unwrap();
    assert_eq!(decoded, plain.to_vec());
}
2142
#[test]
fn test_brotli_auto_fallback_twitter() {
    // Auto-fallback on twitter.json must end up on brotli and still decode
    // to the exact input.
    const TWITTER_PATH: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../corpus/json-bench/twitter.json"
    );
    let input = std::fs::read(TWITTER_PATH).unwrap();

    let packed = compress_to_vec(&input, Mode::Fast, Some(FormatHint::Json)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, input, "twitter.json brotli roundtrip failed");

    let mut reader = Cursor::new(&packed);
    let parsed = crate::dcx::DcxHeader::read_from(&mut reader).unwrap();
    assert!(
        parsed.use_brotli,
        "twitter.json should use brotli in auto-fallback"
    );
}
2163
#[test]
fn test_brotli_ndjson_roundtrip() {
    // Uniform-schema NDJSON: no claim about which entropy coder is chosen,
    // only that decoding reproduces the input exactly.
    const NDJSON_PATH: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../corpus/test-ndjson.ndjson"
    );
    let input = std::fs::read(NDJSON_PATH).unwrap();

    let packed = compress_to_vec(&input, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, input, "ndjson roundtrip failed");
}
2178
#[test]
fn test_brotli_backward_compat() {
    // Hand-build a .dcx stream the way a pre-brotli writer would have: a
    // header with every optional flag off, followed by a bare zstd payload.
    // The current decoder must still accept it.
    let plain = b"backward compatibility test: this data was compressed without brotli";
    let checksum = crc32fast::hash(plain);
    let payload = zstd::bulk::compress(plain, 19).unwrap();

    let legacy_header = crate::dcx::DcxHeader {
        mode: Mode::Fast,
        format_hint: crate::dcx::FormatHint::Generic,
        original_size: plain.len() as u64,
        compressed_size: payload.len() as u64,
        crc32: checksum,
        transform_metadata: vec![],
        has_dict: false,
        meta_compressed: false,
        use_brotli: false,
        meta_embedded: false,
    };

    let mut stream = Vec::new();
    legacy_header.write_to(&mut stream).unwrap();
    stream.extend_from_slice(&payload);

    // Byte 7 of the serialized header must have the brotli bit clear.
    assert_eq!(stream[7] & crate::dcx::FLAG_BROTLI, 0);

    // The decoder must accept the stream and return the original bytes.
    let restored = decompress_from_slice(&stream).unwrap();
    assert_eq!(restored, plain.to_vec());
}
2212
2213    // ─── Embedded metadata tests ──────────────────────────────────────────────
2214
#[test]
fn test_embedded_metadata_roundtrip() {
    // Fast-mode roundtrip of test-api.json. Whether or not the encoder
    // embeds the metadata in the stream, decoding must be byte-exact.
    const API_JSON_PATH: &str = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../corpus/test-api.json"
    );
    let input = std::fs::read(API_JSON_PATH).unwrap();

    let packed = compress_to_vec(&input, Mode::Fast, Some(FormatHint::Json)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(
        restored, input,
        "test-api.json embedded metadata roundtrip: byte-exact mismatch"
    );
}
2232
#[test]
fn test_embedded_metadata_backward_compat() {
    // Hand-build a .dcx stream without the embedded-metadata flag: a header
    // with every optional flag off and empty transform metadata, followed
    // by a bare zstd payload. The decoder must still accept it.
    let plain = b"backward compat: no embedded metadata in this old file format";
    let checksum = crc32fast::hash(plain);
    let payload = zstd::bulk::compress(plain, 19).unwrap();

    let legacy_header = crate::dcx::DcxHeader {
        mode: Mode::Fast,
        format_hint: crate::dcx::FormatHint::Generic,
        original_size: plain.len() as u64,
        compressed_size: payload.len() as u64,
        crc32: checksum,
        transform_metadata: vec![],
        has_dict: false,
        meta_compressed: false,
        use_brotli: false,
        meta_embedded: false,
    };

    let mut stream = Vec::new();
    legacy_header.write_to(&mut stream).unwrap();
    stream.extend_from_slice(&payload);

    // Byte 7 of the serialized header must have the embedded-metadata bit clear.
    assert_eq!(stream[7] & crate::dcx::FLAG_META_EMBEDDED, 0);

    // The decoder must accept the stream and return the original bytes.
    let restored = decompress_from_slice(&stream).unwrap();
    assert_eq!(restored, plain.to_vec());
}
2266
#[test]
fn test_embedded_metadata_small_file_improvement() {
    // test-api.json is small (37KB), the case where embedding the metadata is
    // expected to cost less than storing it separately.
    let input = std::fs::read(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../corpus/test-api.json"
    ))
    .unwrap();

    let packed = compress_to_vec(&input, Mode::Fast, Some(FormatHint::Json)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();
    assert_eq!(restored, input, "roundtrip failed");

    // Sanity-check the achieved compression ratio.
    let ratio = input.len() as f64 / packed.len() as f64;
    assert!(
        ratio > 5.0,
        "test-api.json should achieve >5x compression, got {ratio:.1}x"
    );

    // Parse the header back to find out which layout the compressor picked.
    let header = crate::dcx::DcxHeader::read_from(&mut Cursor::new(&packed)).unwrap();
    if !header.meta_embedded {
        // Separate-metadata layout: nothing further to verify here.
        return;
    }

    // Embedded layout: the header itself must carry no transform metadata.
    assert!(
        header.transform_metadata.is_empty(),
        "meta_embedded header should have empty transform_metadata"
    );
    // NOTE(review): test_zstd_embedded_metadata_roundtrip shows the decoder also
    // accepts meta_embedded with use_brotli = false; this assertion presumes the
    // compressor only emits the brotli flavour for this input — confirm.
    assert!(header.use_brotli, "meta_embedded should use brotli codec");
}
2301
#[test]
fn test_embedded_metadata_ndjson_roundtrip() {
    // An NDJSON input that goes through transforms has to survive the roundtrip
    // no matter which metadata layout (embedded or separate) gets selected.
    let corpus_path = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../corpus/test-ndjson.ndjson"
    );
    let input = std::fs::read(corpus_path).unwrap();

    let packed = compress_to_vec(&input, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();

    assert_eq!(
        restored, input,
        "NDJSON embedded metadata roundtrip: byte-exact mismatch"
    );
}
2319
#[test]
fn test_embedded_metadata_manual_roundtrip() {
    use crate::dcx::{DcxHeader, FLAG_BROTLI, FLAG_META_EMBEDDED};

    // Hand-build a brotli file with embedded metadata so the decompress path is
    // exercised independently of whatever layout the compressor would choose.
    let original = b"Hello, embedded metadata world! This is a test.";

    // Serialize an empty transform chain: with no transforms recorded,
    // reverse_preprocess leaves the data untouched.
    let raw_metadata = TransformChain::new().serialize();

    // Embedded payload layout: [meta_len:u32 LE][raw_metadata][original_data]
    let meta_len = raw_metadata.len() as u32;
    let mut payload = meta_len.to_le_bytes().to_vec();
    payload.extend_from_slice(&raw_metadata);
    payload.extend_from_slice(original);

    let brotli_body = brotli_compress(&payload, 11, BROTLI_MODE_GENERIC).unwrap();

    let header = DcxHeader {
        mode: Mode::Fast,
        format_hint: FormatHint::Generic,
        original_size: original.len() as u64,
        compressed_size: brotli_body.len() as u64,
        crc32: crc32fast::hash(original),
        // Left empty on purpose: the metadata travels inside the payload.
        transform_metadata: Vec::new(),
        has_dict: false,
        meta_compressed: false,
        use_brotli: true,
        meta_embedded: true,
    };

    let mut file_bytes = Vec::new();
    header.write_to(&mut file_bytes).unwrap();
    file_bytes.extend_from_slice(&brotli_body);

    // Both flag bits must show up in byte 7 of the serialized file.
    assert_ne!(file_bytes[7] & FLAG_META_EMBEDDED, 0);
    assert_ne!(file_bytes[7] & FLAG_BROTLI, 0);

    // Decoding must hand back the original bytes.
    let restored = decompress_from_slice(&file_bytes).unwrap();
    assert_eq!(restored, original.to_vec());
}
2365
2366    // ─── Optimization: Brotli TEXT mode tests ───────────────────────────────
2367
#[test]
fn test_brotli_text_mode_on_raw() {
    // Brotli TEXT mode must emit a stream that decodes back to the input.
    let json = br#"{"name":"Alice","age":30,"city":"New York","active":true}"#;
    let expected = json.to_vec();

    // TEXT mode, intended for raw UTF-8 / JSON input.
    let text_packed = brotli_compress(json, 11, BROTLI_MODE_TEXT).unwrap();
    let text_restored = brotli_decompress(&text_packed).unwrap();
    assert_eq!(text_restored, expected, "TEXT mode roundtrip failed");

    // GENERIC mode, exercised alongside for comparison.
    let generic_packed = brotli_compress(json, 11, BROTLI_MODE_GENERIC).unwrap();
    let generic_restored = brotli_decompress(&generic_packed).unwrap();
    assert_eq!(generic_restored, expected, "GENERIC mode roundtrip failed");

    // No size comparison between the two modes is asserted: on an input this
    // small the difference is negligible. Only check that TEXT mode produced
    // some output.
    assert!(
        !text_packed.is_empty(),
        "TEXT mode should produce non-empty output"
    );
}
2400
2401    // ─── Optimization: Zstd embedded metadata tests ─────────────────────────
2402
#[test]
fn test_zstd_embedded_metadata_roundtrip() {
    use crate::dcx::{DcxHeader, FLAG_BROTLI, FLAG_META_EMBEDDED};

    // Hand-build a .dcx whose embedded-metadata payload is zstd-compressed
    // (meta_embedded = true, use_brotli = false) and check it decodes exactly.
    let original = b"Hello, zstd embedded metadata! This is a test of the zstd path.";

    // Metadata for an empty transform chain.
    let raw_metadata = TransformChain::new().serialize();

    // Payload layout: [meta_len:u32 LE][raw_metadata][original_data]
    let meta_len = raw_metadata.len() as u32;
    let mut payload = meta_len.to_le_bytes().to_vec();
    payload.extend_from_slice(&raw_metadata);
    payload.extend_from_slice(original);

    let zstd_body = zstd::bulk::compress(&payload, 19).unwrap();

    let header = DcxHeader {
        mode: Mode::Fast,
        format_hint: FormatHint::Generic,
        original_size: original.len() as u64,
        compressed_size: zstd_body.len() as u64,
        crc32: crc32fast::hash(original),
        // Left empty on purpose: the metadata travels inside the payload.
        transform_metadata: Vec::new(),
        has_dict: false,
        meta_compressed: false,
        // The payload codec is zstd, so the brotli flag stays off.
        use_brotli: false,
        meta_embedded: true,
    };

    let mut file_bytes = Vec::new();
    header.write_to(&mut file_bytes).unwrap();
    file_bytes.extend_from_slice(&zstd_body);

    // Flag byte: meta_embedded bit set, brotli bit clear.
    assert_ne!(file_bytes[7] & FLAG_META_EMBEDDED, 0);
    assert_eq!(file_bytes[7] & FLAG_BROTLI, 0);

    // Decoding must reproduce the original bytes exactly.
    let restored = decompress_from_slice(&file_bytes).unwrap();
    assert_eq!(restored, original.to_vec());
}
2447
2448    // ─── Optimization: Multi-quality brotli tests ───────────────────────────
2449
#[test]
fn test_multi_quality_brotli() {
    // Brotli quality 10 and quality 11 must each yield a decodable stream.
    // Which of the two is smaller depends on the data, so only validity is checked.
    let json = br#"{"items":[1,2,3,4,5],"nested":{"a":"hello","b":"world"}}"#;
    let expected = json.to_vec();

    let packed_q10 = brotli_compress(json, 10, BROTLI_MODE_GENERIC).unwrap();
    let packed_q11 = brotli_compress(json, 11, BROTLI_MODE_GENERIC).unwrap();

    let restored_q10 = brotli_decompress(&packed_q10).unwrap();
    let restored_q11 = brotli_decompress(&packed_q11).unwrap();

    assert_eq!(restored_q10, expected, "quality 10 roundtrip failed");
    assert_eq!(restored_q11, expected, "quality 11 roundtrip failed");

    // Neither quality level may produce an empty stream.
    assert!(!packed_q10.is_empty());
    assert!(!packed_q11.is_empty());

    // The auto-fallback is meant to keep the smaller result. The winner cannot
    // be asserted (it is data-dependent), so instead confirm that the full
    // Fast-mode pipeline roundtrips on real corpus files.
    let corpus_files = [
        concat!(env!("CARGO_MANIFEST_DIR"), "/../../corpus/test-api.json"),
        concat!(
            env!("CARGO_MANIFEST_DIR"),
            "/../../corpus/json-bench/twitter.json"
        ),
    ];
    for path in corpus_files {
        let input = std::fs::read(path).unwrap();
        let packed = compress_to_vec(&input, Mode::Fast, Some(FormatHint::Json)).unwrap();
        let restored = decompress_from_slice(&packed).unwrap();
        assert_eq!(
            restored, input,
            "multi-quality roundtrip failed for {path}"
        );
    }
}
2490
2491    // ─── Adversarial Regression Tests ────────────────────────────────────────
2492
#[test]
fn test_singleton_arrays_fast_roundtrip() {
    // Regression (bug 1): NDJSON rows holding singleton arrays such as
    // [{"x":0}] used to fail the CRC check in fast mode, because typed
    // encoding corrupted unquoted values.
    let mut ndjson = String::new();
    for i in 0..500 {
        ndjson.push_str(&format!("{{\"items\":[{{\"x\":{}}}],\"id\":{}}}", i, i));
        ndjson.push('\n');
    }
    let input = ndjson.as_bytes();

    let packed = compress_to_vec(input, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();

    assert_eq!(
        restored, input,
        "singleton_arrays fast mode roundtrip failed"
    );
}
2510
#[test]
fn test_very_long_lines_fast_roundtrip() {
    // Bug 2: NDJSON with 100KB string values caused CRC mismatch because
    // encode_string_column used u16 for per-value lengths (max 65535).
    //
    // The 100KB value is identical in every row, so build it once instead of
    // re-allocating it on each iteration.
    let long_value = "X".repeat(100_000);
    let rows: Vec<String> = (0..50)
        .map(|i| format!("{{\"data\":\"{long_value}\",\"id\":{i}}}"))
        .collect();
    // One JSON object per line, with a trailing newline after the last row.
    let data = rows.join("\n") + "\n";

    let compressed =
        compress_to_vec(data.as_bytes(), Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
    let decompressed = decompress_from_slice(&compressed).unwrap();
    assert_eq!(
        decompressed,
        data.as_bytes(),
        "very_long_lines fast mode roundtrip failed"
    );
}
2528
#[test]
fn test_very_long_lines_balanced_roundtrip() {
    // Bug 2 also affected balanced mode — the NDJSON columnar transform
    // itself is mode-independent, and long strings overflow u16 everywhere.
    //
    // The 100KB value is identical in every row, so build it once instead of
    // re-allocating it on each iteration.
    let long_value = "X".repeat(100_000);
    let rows: Vec<String> = (0..10)
        .map(|i| format!("{{\"data\":\"{long_value}\",\"id\":{i}}}"))
        .collect();
    // One JSON object per line, with a trailing newline after the last row.
    let data = rows.join("\n") + "\n";

    let compressed =
        compress_to_vec(data.as_bytes(), Mode::Balanced, Some(FormatHint::Ndjson)).unwrap();
    let decompressed = decompress_from_slice(&compressed).unwrap();
    assert_eq!(
        decompressed,
        data.as_bytes(),
        "very_long_lines balanced mode roundtrip failed"
    );
}
2546
#[test]
fn test_all_same_value_fast_roundtrip() {
    // Regression (bug 3): 10K identical {"x":1} rows crashed fast mode with
    // SIGBUS. Typed encoding left a delta-varint stream made of 0x00 bytes;
    // generate_training_samples split on 0x00 and produced thousands of tiny
    // fragments, which crashed zstd dictionary training.
    let mut ndjson = String::new();
    for _ in 0..10_000 {
        ndjson.push_str("{\"x\":1}");
        ndjson.push('\n');
    }
    let input = ndjson.as_bytes();

    let packed = compress_to_vec(input, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
    let restored = decompress_from_slice(&packed).unwrap();

    assert_eq!(
        restored, input,
        "all_same_value fast mode roundtrip failed"
    );
}
2564
#[test]
fn test_generate_training_samples_degenerate() {
    // Verify that generate_training_samples falls back to fixed-size chunks
    // when 0x00 splitting produces degenerate samples (avg < 8 bytes).
    let mut data = vec![0x02u8]; // one non-zero byte
    data.extend_from_slice(&[0x00; 9999]); // 9999 zero bytes
    let samples = generate_training_samples(&data, 1024);

    // Guard the division below: an empty result would otherwise abort the test
    // with an opaque "attempt to divide by zero" panic instead of a diagnosis.
    assert!(
        !samples.is_empty(),
        "generate_training_samples returned no samples for a 10000-byte input"
    );

    // Must fall back to fixed-size chunks, not degenerate 0x00-split.
    let total_len: usize = samples.iter().map(|s| s.len()).sum();
    let avg_len = total_len / samples.len();
    assert!(
        avg_len >= 8,
        "training samples average size should be >= 8, got {avg_len}"
    );
}
2579
#[test]
fn null_heavy_codec_roundtrip_fast() {
    // Regression: null-heavy NDJSON (30+ rows whose column is entirely null)
    // produced a CRC mismatch. The rows mimic Python's json.dumps output,
    // which puts a space after each colon: {"id": 0, "val": null}
    let input: Vec<u8> = (0..30)
        .flat_map(|i| format!("{{\"id\": {}, \"val\": null}}\n", i).into_bytes())
        .collect();

    let mut packed = Vec::new();
    compress(&input, Mode::Fast, None, &mut packed).unwrap();

    let mut reader = std::io::Cursor::new(&packed);
    let restored = decompress(&mut reader).unwrap();

    assert_eq!(
        restored, input,
        "null-heavy 30-row fast mode roundtrip failed"
    );
}
2596
#[test]
fn null_heavy_codec_roundtrip_balanced() {
    // Balanced-mode roundtrip over 30 NDJSON rows whose "val" column is always
    // null, with no format hint passed to the compressor.
    let input: Vec<u8> = (0..30)
        .flat_map(|i| format!("{{\"id\": {}, \"val\": null}}\n", i).into_bytes())
        .collect();

    let mut packed = Vec::new();
    compress(&input, Mode::Balanced, None, &mut packed).unwrap();

    let mut reader = std::io::Cursor::new(&packed);
    let restored = decompress(&mut reader).unwrap();

    assert_eq!(
        restored, input,
        "null-heavy 30-row balanced mode roundtrip failed"
    );
}
2611
#[test]
fn gharchive_selective_roundtrip() {
    // Verify GH Archive roundtrip with selective columnar transform.
    let path = concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../corpus/json-bench/gharchive-10mb.ndjson"
    );
    let data = match std::fs::read(path) {
        Ok(bytes) => bytes,
        Err(err) => {
            // The corpus file is optional, so an unreadable file skips the test
            // rather than failing it. Report the reason so that a skip is
            // visible (e.g. with `cargo test -- --nocapture`) instead of
            // looking like a genuine pass.
            eprintln!("skipping gharchive_selective_roundtrip: cannot read {path}: {err}");
            return;
        }
    };

    let mut compressed = Vec::new();
    compress(
        &data,
        Mode::Fast,
        Some(crate::dcx::FormatHint::Ndjson),
        &mut compressed,
    )
    .unwrap();
    let decompressed = decompress(&mut std::io::Cursor::new(&compressed)).unwrap();
    assert_eq!(
        decompressed, data,
        "GH Archive selective columnar roundtrip failed"
    );
}
2637}