// datacortex_core/src/codec.rs
//! Codec orchestrator — compress and decompress through the DataCortex pipeline.
//!
//! Phase 1: Format preprocessing + zstd (Fast mode).
//! Phase 3: Full CM engine with higher-order models + mixer + APM (Balanced mode, ~256MB).
//! Phase 5: Full CM engine with 2x context maps (Max mode, ~512MB).
//! Phase 6: Dual-path CM + LLM with MetaMixer (Max mode with `neural` feature).
//! Phase 7: Dual-path CM + GRU byte-level predictor (Balanced mode).

use std::io::{self, Cursor, Read, Write};

use crate::dcx::{DcxHeader, FormatHint, Mode};
use crate::entropy::arithmetic::{ArithmeticDecoder, ArithmeticEncoder};
use crate::format::transform::TransformChain;
use crate::format::{detect_format, preprocess, reverse_preprocess};
use crate::mixer::MetaMixer;
use crate::model::gru_model::GruModel;
use crate::model::{CMConfig, CMEngine};

/// Pick the zstd level for Fast mode from the preprocessed data size.
///
/// Small inputs finish quickly even at aggressive levels, so the chosen
/// level steps down as the input grows. An explicit `level_override`
/// (the user's --level flag) always wins.
fn adaptive_fast_level(data_size: usize, level_override: Option<i32>) -> i32 {
    if let Some(level) = level_override {
        return level; // Explicit user choice — never second-guess it.
    }
    const MIB: usize = 1_048_576;
    if data_size <= MIB {
        19 // <= 1 MB: zstd-19 runs in <50ms, take the best ratio.
    } else if data_size <= 10 * MIB {
        13 // 1 MB - 10 MB: good ratio/speed balance.
    } else {
        9 // > 10 MB: favor throughput.
    }
}
34
// ─── Zstd Dictionary Training (Fast mode) ─────────────────────────────────────

/// Smallest preprocessed input for which dictionary training is attempted;
/// below this the stored dictionary costs more bytes than it saves.
const DICT_MIN_DATA_SIZE: usize = 8192;
40
/// Chunk size used when splitting preprocessed data for per-chunk compression.
///
/// Every chunk is compressed independently against the shared dictionary.
/// Small chunks benefit most from dictionary priming, but each one pays
/// framing overhead (4-byte size prefix + ~10-byte zstd frame header), so
/// the size scales with the input to keep the chunk count reasonable.
fn dict_chunk_size(data_len: usize) -> usize {
    match data_len {
        n if n > 4_194_304 => 131_072, // > 4 MB        -> 128 KB chunks
        n if n > 1_048_576 => 65_536,  // 1 - 4 MB      -> 64 KB chunks
        n if n > 262_144 => 32_768,    // 256 KB - 1 MB -> 32 KB chunks
        _ => 16_384,                   // smaller       -> 16 KB chunks
    }
}
57
/// Cap on the trained dictionary size, scaled with the input size.
///
/// The dictionary only primes each chunk's compressor context, so a small
/// dictionary captures most of the win while keeping storage overhead low.
fn dict_max_size(data_len: usize) -> usize {
    match data_len {
        n if n > 4_194_304 => 16_384, // > 4 MB  -> 16 KB dict
        n if n > 1_048_576 => 8_192,  // 1 - 4 MB -> 8 KB dict
        _ => 4_096,                   // smaller -> 4 KB dict
    }
}
70
71/// Generate training samples from the data for dictionary training.
72///
73/// Uses column boundaries (0x00 separators) if available, otherwise fixed blocks.
74/// These samples are only used for `zstd::dict::from_samples`, NOT for the
75/// actual chunked compression (which uses `split_into_chunks`).
76fn generate_training_samples(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
77    // Try column boundaries (0x00 separators from columnar transform).
78    let col_chunks: Vec<&[u8]> = data.split(|&b| b == 0x00).collect();
79    if col_chunks.len() >= 5 {
80        let non_empty: Vec<&[u8]> = col_chunks.into_iter().filter(|c| !c.is_empty()).collect();
81        // Validate that the split produced reasonable samples. If the data is
82        // typed-encoded binary (not columnar text), 0x00 bytes are varint
83        // zeros, not column separators. Splitting on them creates thousands
84        // of tiny fragments that crash zstd dictionary training. Require
85        // non-empty samples with a minimum average size of 8 bytes.
86        if !non_empty.is_empty() {
87            let avg_len = non_empty.iter().map(|c| c.len()).sum::<usize>() / non_empty.len();
88            if avg_len >= 8 {
89                return non_empty;
90            }
91        }
92    }
93
94    // Fall back to fixed-size blocks for training.
95    split_into_chunks(data, chunk_size)
96}
97
/// Split `data` into consecutive chunks of at most `chunk_size` bytes.
///
/// Every byte is preserved exactly — no bytes are lost at boundaries; the
/// final chunk may be shorter than `chunk_size`.
///
/// # Panics
/// Panics if `chunk_size` is 0. (The previous hand-rolled loop pushed empty
/// slices forever in that case; `slice::chunks` turns the pathological
/// input into a loud, immediate error instead.)
fn split_into_chunks(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
    data.chunks(chunk_size).collect()
}
110
111/// Attempt chunk-based dictionary compression.
112///
113/// 1. Split data into chunks
114/// 2. Train a zstd dictionary on the chunks
115/// 3. Compress each chunk independently using the trained dictionary
116/// 4. Return the dict + all compressed chunks as a payload
117///
118/// Returns `Some(payload)` if the total is smaller than `plain_size`, else `None`.
119fn try_dict_compress(data: &[u8], level: i32, plain_size: usize) -> Option<Vec<u8>> {
120    let chunk_size = dict_chunk_size(data.len());
121
122    // Generate training samples (may use column boundaries for better diversity).
123    let training_samples = generate_training_samples(data, chunk_size);
124    if training_samples.len() < 5 {
125        return None;
126    }
127
128    let max_dict = dict_max_size(data.len());
129
130    // Train dictionary from the training samples.
131    let dict = zstd::dict::from_samples(&training_samples, max_dict).ok()?;
132    if dict.is_empty() {
133        return None;
134    }
135
136    // Split data into fixed-size chunks for per-chunk compression.
137    let chunks = split_into_chunks(data, chunk_size);
138
139    // Compress each chunk independently with the dictionary.
140    let mut compressor = zstd::bulk::Compressor::with_dictionary(level, &dict).ok()?;
141    let mut compressed_chunks: Vec<Vec<u8>> = Vec::with_capacity(chunks.len());
142    for chunk in &chunks {
143        let cc = compressor.compress(chunk).ok()?;
144        compressed_chunks.push(cc);
145    }
146
147    // Build payload:
148    //   [dict_size: u32 LE] [dict_bytes]
149    //   [num_chunks: u32 LE]
150    //   for each chunk: [chunk_compressed_size: u32 LE] [chunk_data]
151    let total_compressed: usize = compressed_chunks.iter().map(|c| 4 + c.len()).sum();
152    let payload_size = 4 + dict.len() + 4 + total_compressed;
153
154    // Only use dict if it beats plain compression.
155    if payload_size >= plain_size {
156        return None;
157    }
158
159    let mut payload = Vec::with_capacity(payload_size);
160    payload.extend_from_slice(&(dict.len() as u32).to_le_bytes());
161    payload.extend_from_slice(&dict);
162    payload.extend_from_slice(&(compressed_chunks.len() as u32).to_le_bytes());
163    for cc in &compressed_chunks {
164        payload.extend_from_slice(&(cc.len() as u32).to_le_bytes());
165        payload.extend_from_slice(cc);
166    }
167
168    Some(payload)
169}
170
171/// Decompress a chunk-based dictionary-compressed payload.
172///
173/// Payload format:
174///   [dict_size: u32 LE] [dict_bytes]
175///   [num_chunks: u32 LE]
176///   for each chunk: [chunk_compressed_size: u32 LE] [chunk_data]
177///
178/// Chunks are decompressed individually and concatenated.
179fn decompress_with_dict(payload: &[u8], capacity: usize) -> std::io::Result<Vec<u8>> {
180    if payload.len() < 4 {
181        return Err(io::Error::new(
182            io::ErrorKind::InvalidData,
183            "dict payload too short for dict_size",
184        ));
185    }
186    let mut pos = 0;
187
188    // Read dictionary.
189    let dict_size =
190        u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
191    pos += 4;
192    if payload.len() < pos + dict_size {
193        return Err(io::Error::new(
194            io::ErrorKind::InvalidData,
195            "dict payload truncated: dictionary bytes",
196        ));
197    }
198    let dict_bytes = &payload[pos..pos + dict_size];
199    pos += dict_size;
200
201    // Read number of chunks.
202    if payload.len() < pos + 4 {
203        return Err(io::Error::new(
204            io::ErrorKind::InvalidData,
205            "dict payload truncated: num_chunks",
206        ));
207    }
208    let num_chunks =
209        u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
210    pos += 4;
211
212    // Prepare decompressor with dictionary.
213    let mut decompressor = zstd::bulk::Decompressor::with_dictionary(dict_bytes)
214        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
215
216    let mut output = Vec::with_capacity(capacity);
217
218    for i in 0..num_chunks {
219        if payload.len() < pos + 4 {
220            return Err(io::Error::new(
221                io::ErrorKind::InvalidData,
222                format!("dict payload truncated: chunk {i} size"),
223            ));
224        }
225        let chunk_size =
226            u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
227        pos += 4;
228        if payload.len() < pos + chunk_size {
229            return Err(io::Error::new(
230                io::ErrorKind::InvalidData,
231                format!("dict payload truncated: chunk {i} data"),
232            ));
233        }
234        let chunk_data = &payload[pos..pos + chunk_size];
235        pos += chunk_size;
236
237        // Each chunk decompresses to at most chunk_size + some headroom.
238        let chunk_capacity = capacity.saturating_sub(output.len());
239        let decompressed = decompressor
240            .decompress(chunk_data, chunk_capacity)
241            .map_err(|e| {
242                io::Error::new(
243                    io::ErrorKind::InvalidData,
244                    format!("chunk {i} decompress failed: {e}"),
245                )
246            })?;
247        output.extend_from_slice(&decompressed);
248    }
249
250    Ok(output)
251}
252
// ─── Brotli helpers (Fast mode auto-fallback) ─────────────────────────────────

/// Mode selector values understood by `brotli_compress`.
/// GENERIC (0): the default, best for binary/preprocessed data.
/// TEXT (1): tuned for UTF-8 text such as raw JSON.
const BROTLI_MODE_GENERIC: u32 = 0;
const BROTLI_MODE_TEXT: u32 = 1;
260
261/// Compress `data` with brotli at the given quality (0-11) and mode.
262/// Use `BROTLI_MODE_TEXT` for raw UTF-8/JSON, `BROTLI_MODE_GENERIC` for preprocessed data.
263fn brotli_compress(data: &[u8], quality: u32, mode: u32) -> io::Result<Vec<u8>> {
264    use brotli::enc::backward_references::BrotliEncoderMode;
265    let mut output = Vec::new();
266    let brotli_mode = match mode {
267        1 => BrotliEncoderMode::BROTLI_MODE_TEXT,
268        _ => BrotliEncoderMode::BROTLI_MODE_GENERIC,
269    };
270    let params = brotli::enc::BrotliEncoderParams {
271        quality: quality as i32,
272        mode: brotli_mode,
273        ..Default::default()
274    };
275    brotli::BrotliCompress(&mut io::Cursor::new(data), &mut output, &params)?;
276    Ok(output)
277}
278
279/// Decompress a brotli stream. `max_size` is a capacity hint for the output buffer.
280fn brotli_decompress(data: &[u8]) -> io::Result<Vec<u8>> {
281    let mut output = Vec::new();
282    brotli::BrotliDecompress(&mut io::Cursor::new(data), &mut output)?;
283    Ok(output)
284}
285
286/// Compress data using the CM engine with the given configuration.
287/// Returns the compressed byte stream.
288fn cm_compress(data: &[u8], config: CMConfig) -> Vec<u8> {
289    let mut engine = CMEngine::with_config(config);
290    let mut encoder = ArithmeticEncoder::new();
291
292    for &byte in data {
293        for bpos in 0..8 {
294            let bit = (byte >> (7 - bpos)) & 1;
295            let p = engine.predict();
296            encoder.encode(bit, p);
297            engine.update(bit);
298        }
299    }
300
301    encoder.finish()
302}
303
304/// Decompress data using the CM engine with the given configuration.
305/// `compressed` is the arithmetic-coded stream, `original_size` is the expected output length.
306fn cm_decompress(compressed: &[u8], original_size: usize, config: CMConfig) -> Vec<u8> {
307    let mut engine = CMEngine::with_config(config);
308    let mut decoder = ArithmeticDecoder::new(compressed);
309    let mut output = Vec::with_capacity(original_size);
310
311    for _ in 0..original_size {
312        let mut byte_val: u8 = 0;
313        for bpos in 0..8 {
314            let p = engine.predict();
315            let bit = decoder.decode(p);
316            engine.update(bit);
317            byte_val |= bit << (7 - bpos);
318        }
319        output.push(byte_val);
320    }
321
322    output
323}
324
325// ─── GRU dual-path (CM + GRU byte predictor) ────────────────────────────────
326// The GRU provides a DIFFERENT signal from CM: byte-level cross-bit correlations.
327// It's blended AFTER the full CM pipeline via MetaMixer.
328// CRITICAL: encoder and decoder must produce IDENTICAL GRU + CM state.
329
330/// Compress using dual-path: CM engine + GRU byte predictor + MetaMixer.
331/// Used for Balanced mode.
332fn gru_compress(data: &[u8], config: CMConfig) -> Vec<u8> {
333    let mut engine = CMEngine::with_config(config);
334    let mut gru = GruModel::new();
335    let mut meta_mixer = MetaMixer::new(12); // 12% GRU weight
336    let mut encoder = ArithmeticEncoder::new();
337
338    let total_bytes = data.len();
339    let report_interval = if total_bytes > 100_000 {
340        total_bytes / 20
341    } else {
342        0
343    };
344
345    for (byte_idx, &byte) in data.iter().enumerate() {
346        for bpos in 0..8u8 {
347            let bit = (byte >> (7 - bpos)) & 1;
348
349            // CM prediction (full pipeline: 19 models + mixer + 7 APM).
350            let p_cm = engine.predict();
351
352            // GRU bit prediction from cached byte probs.
353            let partial = if bpos == 0 {
354                1u32
355            } else {
356                let mut p = 1u32;
357                for prev_bpos in 0..bpos {
358                    let prev_bit = (byte >> (7 - prev_bpos)) & 1;
359                    p = (p << 1) | prev_bit as u32;
360                }
361                p
362            };
363            let p_gru = gru.predict_bit(bpos, partial);
364
365            // MetaMixer blend.
366            let p_final = meta_mixer.blend(p_cm, p_gru);
367
368            encoder.encode(bit, p_final);
369            engine.update(bit);
370            meta_mixer.update(bit);
371        }
372
373        // Byte complete: train GRU on observed byte, then forward for next prediction.
374        gru.train(byte);
375        gru.forward(byte);
376
377        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
378            let pct = (byte_idx + 1) * 100 / total_bytes;
379            eprint!("\r[gru] compressing... {pct}%");
380        }
381    }
382
383    if total_bytes > 100_000 {
384        eprintln!("\r[gru] compressing... 100%");
385    }
386
387    encoder.finish()
388}
389
390/// Decompress using dual-path: CM engine + GRU byte predictor + MetaMixer.
391/// Must produce IDENTICAL GRU + CM state as the encoder.
392fn gru_decompress(compressed: &[u8], original_size: usize, config: CMConfig) -> Vec<u8> {
393    let mut engine = CMEngine::with_config(config);
394    let mut gru = GruModel::new();
395    let mut meta_mixer = MetaMixer::new(12); // same 12% as encoder
396    let mut decoder = ArithmeticDecoder::new(compressed);
397    let mut output = Vec::with_capacity(original_size);
398
399    let report_interval = if original_size > 100_000 {
400        original_size / 20
401    } else {
402        0
403    };
404
405    for byte_idx in 0..original_size {
406        let mut byte_val: u8 = 0;
407
408        for bpos in 0..8u8 {
409            // CM prediction.
410            let p_cm = engine.predict();
411
412            // GRU bit prediction (same partial byte state as encoder).
413            let partial = if bpos == 0 {
414                1u32
415            } else {
416                let mut p = 1u32;
417                for prev_bpos in 0..bpos {
418                    let prev_bit = (byte_val >> (7 - prev_bpos)) & 1;
419                    p = (p << 1) | prev_bit as u32;
420                }
421                p
422            };
423            let p_gru = gru.predict_bit(bpos, partial);
424
425            // MetaMixer blend.
426            let p_final = meta_mixer.blend(p_cm, p_gru);
427
428            let bit = decoder.decode(p_final);
429            engine.update(bit);
430            meta_mixer.update(bit);
431            byte_val |= bit << (7 - bpos);
432        }
433
434        output.push(byte_val);
435
436        // Byte complete: train GRU then forward (same as encoder).
437        gru.train(byte_val);
438        gru.forward(byte_val);
439
440        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
441            let pct = (byte_idx + 1) * 100 / original_size;
442            eprint!("\r[gru] decompressing... {pct}%");
443        }
444    }
445
446    if original_size > 100_000 {
447        eprintln!("\r[gru] decompressing... 100%");
448    }
449
450    output
451}
452
453// ─── Neural dual-path (CM + LLM) ─────────────────────────────────────────────
454// Feature-gated: only available when `neural` is enabled.
455// The LLM predictor runs alongside the CM engine. A MetaMixer blends them.
456// CRITICAL: encoder and decoder must produce IDENTICAL LLM + CM state.
457
/// Compress using dual-path: CM engine + LLM predictor + MetaMixer.
/// Only used for Max mode with the `neural` feature enabled. The decoder
/// must replay identical LLM + CM state transitions.
#[cfg(feature = "neural")]
fn neural_compress(
    data: &[u8],
    config: CMConfig,
    llm: &mut datacortex_neural::LlmPredictor,
    meta_mixer: &mut datacortex_neural::MetaMixer,
) -> Vec<u8> {
    let mut engine = CMEngine::with_config(config);
    let mut encoder = ArithmeticEncoder::new();

    // The LLM is fed each byte only AFTER that byte has been encoded, so at
    // the start of iteration `byte_idx` its cached byte probabilities
    // already predict byte `byte_idx` from bytes 0..byte_idx.
    let total_bytes = data.len();
    let report_interval = total_bytes / 20; // progress every ~5%

    for (byte_idx, &byte) in data.iter().enumerate() {
        for bpos in 0..8u8 {
            let bit = (byte >> (7 - bpos)) & 1;

            // CM prediction (full pipeline).
            let p_cm = engine.predict();

            // LLM prediction, conditioned on the partial byte: a leading 1
            // followed by the bits already encoded for this byte.
            let partial = (1u32 << bpos) | (u32::from(byte) >> (8 - bpos));
            let p_llm = llm.predict_bit(bpos, partial);

            // Blend, code the bit, then adapt both components.
            let p_final = meta_mixer.blend(p_cm, p_llm);
            encoder.encode(bit, p_final);
            engine.update(bit);
            meta_mixer.update(bit);
        }

        // Feed the completed byte to the LLM for next-byte prediction.
        // LLM failure degrades to uniform predictions; log early errors only.
        if let Err(e) = llm.predict_byte_probs(byte) {
            if byte_idx < 5 {
                eprintln!("[neural] LLM predict error at byte {byte_idx}: {e}");
            }
        }

        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
            let pct = (byte_idx + 1) * 100 / total_bytes;
            eprint!("\r[neural] compressing... {pct}%");
        }
    }

    if total_bytes > 1000 {
        eprintln!("\r[neural] compressing... 100%");
    }

    encoder.finish()
}
533
/// Decompress using dual-path: CM engine + LLM predictor + MetaMixer.
/// Replays exactly the LLM + CM state transitions `neural_compress` made.
#[cfg(feature = "neural")]
fn neural_decompress(
    compressed: &[u8],
    original_size: usize,
    config: CMConfig,
    llm: &mut datacortex_neural::LlmPredictor,
    meta_mixer: &mut datacortex_neural::MetaMixer,
) -> Vec<u8> {
    let mut engine = CMEngine::with_config(config);
    let mut decoder = ArithmeticDecoder::new(compressed);
    let mut output = Vec::with_capacity(original_size);

    // Progress every ~5%. (Kept as-is: the zero-size branch sets 1, but the
    // loop body never runs then, so it only avoids a division by zero.)
    let report_interval = if original_size > 0 {
        original_size / 20
    } else {
        1
    };

    for byte_idx in 0..original_size {
        let mut byte_val: u8 = 0;

        for bpos in 0..8u8 {
            // CM prediction.
            let p_cm = engine.predict();

            // LLM prediction on the partial byte decoded so far: a leading
            // 1 followed by the bits already recovered for this byte.
            let partial = (1u32 << bpos) | (u32::from(byte_val) >> (8 - bpos));
            let p_llm = llm.predict_bit(bpos, partial);

            let p_final = meta_mixer.blend(p_cm, p_llm);
            let bit = decoder.decode(p_final);
            engine.update(bit);
            meta_mixer.update(bit);
            byte_val |= bit << (7 - bpos);
        }

        output.push(byte_val);

        // Feed the decoded byte to the LLM, exactly as the encoder did.
        if let Err(e) = llm.predict_byte_probs(byte_val) {
            if byte_idx < 5 {
                eprintln!("[neural] LLM predict error at byte {byte_idx}: {e}");
            }
        }

        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
            let pct = (byte_idx + 1) * 100 / original_size;
            eprint!("\r[neural] decompressing... {pct}%");
        }
    }

    if original_size > 1000 {
        eprintln!("\r[neural] decompressing... 100%");
    }

    output
}
605
606/// Get the CMConfig for a given mode.
607fn cm_config_for_mode(mode: Mode) -> CMConfig {
608    match mode {
609        Mode::Max => CMConfig::max(),
610        Mode::Balanced => CMConfig::balanced(),
611        Mode::Fast => CMConfig::balanced(), // not used for Fast, but keeps API clean
612    }
613}
614
/// Resolve the LLM model path, in priority order:
/// 1. Explicit path (--model-path CLI flag)
/// 2. DATACORTEX_MODEL environment variable
/// 3. Default: ~/.datacortex/models/SmolLM2-135M-Instruct-Q8_0.gguf
#[cfg(feature = "neural")]
fn resolve_model_path(explicit: Option<&str>) -> Option<String> {
    use std::path::Path;

    // 1. CLI flag wins; a bad explicit path is terminal, not a fallthrough.
    if let Some(p) = explicit {
        return if Path::new(p).exists() {
            Some(p.to_string())
        } else {
            eprintln!("[neural] explicit model path not found: {p}");
            None
        };
    }

    // 2. Environment variable. An empty value explicitly disables neural;
    //    a set-but-missing path is also terminal (no silent default).
    match std::env::var("DATACORTEX_MODEL") {
        Ok(p) if p.is_empty() => return None,
        Ok(p) if Path::new(&p).exists() => return Some(p),
        Ok(p) => {
            eprintln!("[neural] DATACORTEX_MODEL path not found: {p}");
            return None;
        }
        Err(_) => {} // Variable unset: continue to the default location.
    }

    // 3. Default location under $HOME (None when HOME is unset).
    let home = std::env::var_os("HOME")?;
    let default = format!(
        "{}/.datacortex/models/SmolLM2-135M-Instruct-Q8_0.gguf",
        home.to_string_lossy()
    );
    Path::new(&default).exists().then_some(default)
}
654
655/// Train a zstd dictionary from multiple sample files.
656///
657/// Each sample should be a complete JSON/NDJSON file's bytes. The function
658/// splits them into training fragments and calls `zstd::dict::from_samples`.
659///
660/// `max_dict_size` controls the max dictionary size in bytes (typical: 32768-131072).
661/// Returns the trained dictionary bytes.
662pub fn train_dict(samples: &[&[u8]], max_dict_size: usize) -> io::Result<Vec<u8>> {
663    if samples.is_empty() {
664        return Err(io::Error::other(
665            "no samples provided for dictionary training",
666        ));
667    }
668
669    // Collect training fragments: split each sample into reasonable chunks.
670    let mut fragments: Vec<&[u8]> = Vec::new();
671    for sample in samples {
672        if sample.is_empty() {
673            continue;
674        }
675        // For NDJSON: split by newlines (each line is a training fragment).
676        let lines: Vec<&[u8]> = sample
677            .split(|&b| b == b'\n')
678            .filter(|l| !l.is_empty())
679            .collect();
680        if lines.len() >= 5 {
681            fragments.extend(lines);
682        } else {
683            // For non-NDJSON: use fixed-size blocks.
684            let chunk_size = 4096.min(sample.len());
685            let mut offset = 0;
686            while offset < sample.len() {
687                let end = (offset + chunk_size).min(sample.len());
688                fragments.push(&sample[offset..end]);
689                offset = end;
690            }
691        }
692    }
693
694    if fragments.len() < 5 {
695        return Err(io::Error::other(
696            "not enough training data (need at least 5 fragments)",
697        ));
698    }
699
700    let dict = zstd::dict::from_samples(&fragments, max_dict_size)
701        .map_err(|e| io::Error::other(format!("dictionary training failed: {e}")))?;
702
703    if dict.is_empty() {
704        return Err(io::Error::other(
705            "dictionary training produced empty dictionary",
706        ));
707    }
708
709    Ok(dict)
710}
711
712/// Compress `data` into .dcx format, writing to `output`.
713pub fn compress<W: Write>(
714    data: &[u8],
715    mode: Mode,
716    format_override: Option<FormatHint>,
717    output: &mut W,
718) -> io::Result<()> {
719    compress_with_model(data, mode, format_override, None, output)
720}
721
722/// Compress with optional explicit model path (for neural Max mode).
723pub fn compress_with_model<W: Write>(
724    data: &[u8],
725    mode: Mode,
726    format_override: Option<FormatHint>,
727    model_path: Option<&str>,
728    output: &mut W,
729) -> io::Result<()> {
730    compress_with_options(data, mode, format_override, model_path, None, output)
731}
732
733/// Compress with optional explicit model path and zstd level override.
734pub fn compress_with_options<W: Write>(
735    data: &[u8],
736    mode: Mode,
737    format_override: Option<FormatHint>,
738    model_path: Option<&str>,
739    zstd_level_override: Option<i32>,
740    output: &mut W,
741) -> io::Result<()> {
742    compress_with_full_options(
743        data,
744        mode,
745        format_override,
746        model_path,
747        zstd_level_override,
748        None,
749        output,
750    )
751}
752
753/// Compress with all options including external dictionary.
754pub fn compress_with_full_options<W: Write>(
755    data: &[u8],
756    mode: Mode,
757    format_override: Option<FormatHint>,
758    model_path: Option<&str>,
759    zstd_level_override: Option<i32>,
760    external_dict: Option<&[u8]>,
761    output: &mut W,
762) -> io::Result<()> {
763    let format_hint = format_override.unwrap_or_else(|| detect_format(data));
764    let crc = crc32fast::hash(data);
765
766    // Step 1: Format-aware preprocessing.
767    let (preprocessed, chain) = preprocess(data, format_hint, mode);
768    let transform_metadata = if chain.is_empty() {
769        vec![]
770    } else {
771        chain.serialize()
772    };
773
774    // Step 2: Compress with engine.
775    let mut use_dict = false;
776    let mut use_brotli = false;
777    // Track whether raw fallback won (empty transform chain).
778    let mut use_raw_fallback = false;
779    // Track whether metadata is embedded in the compressed stream.
780    let mut use_meta_embedded = false;
781    let compressed = match mode {
782        // Fast mode: auto-fallback — try preprocessed+zstd, raw+zstd, raw+brotli,
783        // preprocessed+brotli, and embedded-metadata+brotli. Keep whichever produces
784        // the smallest output (including header and metadata overhead).
785        //
786        // Preprocessing (columnar + typed encoding) usually helps zstd by grouping
787        // similar values. But for some files (e.g. citm_catalog.json with extreme
788        // repetition), raw zstd without preprocessing gives MUCH better results
789        // because preprocessing removes the repetition patterns zstd's LZ77 exploits.
790        //
791        // Brotli at quality 11 can beat zstd on some JSON files (e.g. twitter.json)
792        // because its context modeling handles certain data patterns better.
793        //
794        // For small files with transforms, embedding metadata inside the brotli stream
795        // saves the separate metadata overhead (~150 bytes), because brotli compresses
796        // the 4-byte length prefix + raw metadata nearly for free.
797        Mode::Fast => {
798            // Fast mode: auto-fallback with PARALLEL path evaluation.
799            // All 6+ compression paths run concurrently via rayon, then we
800            // keep whichever produces the smallest output.
801            use std::sync::Mutex;
802
803            let level = adaptive_fast_level(preprocessed.len(), zstd_level_override);
804            let raw_level = adaptive_fast_level(data.len(), zstd_level_override);
805
806            // Estimate compressed metadata size for fair comparison.
807            let meta_size_for_comparison = if transform_metadata.len() > 64 {
808                let compressed_meta = zstd::bulk::compress(&transform_metadata, 19)
809                    .unwrap_or_else(|_| transform_metadata.clone());
810                compressed_meta.len().min(transform_metadata.len())
811            } else {
812                transform_metadata.len()
813            };
814
815            // Build embedded metadata payload (shared read-only across threads).
816            let embedded_payload = if !transform_metadata.is_empty() {
817                let mut ep = Vec::with_capacity(4 + transform_metadata.len() + preprocessed.len());
818                ep.extend_from_slice(&(transform_metadata.len() as u32).to_le_bytes());
819                ep.extend_from_slice(&transform_metadata);
820                ep.extend_from_slice(&preprocessed);
821                Some(ep)
822            } else {
823                None
824            };
825
826            // Each path result: (compressed_bytes, total_size, use_dict, use_raw, use_brotli, use_embedded)
827            type PathResult = (Vec<u8>, usize, bool, bool, bool, bool);
828            let results = Mutex::new(Vec::<PathResult>::with_capacity(8));
829
830            rayon::scope(|s| {
831                // Path A: preprocessed + zstd (with optional dict).
832                s.spawn(|_| {
833                    if let Ok(plain) = zstd::bulk::compress(&preprocessed, level) {
834                        let (compressed, is_dict) = if let Some(ext_dict) = external_dict {
835                            // Use externally provided dictionary.
836                            let chunk_size = dict_chunk_size(preprocessed.len());
837                            let chunks = split_into_chunks(&preprocessed, chunk_size);
838                            if let Ok(mut compressor) =
839                                zstd::bulk::Compressor::with_dictionary(level, ext_dict)
840                            {
841                                let mut ok = true;
842                                let mut cc_list = Vec::with_capacity(chunks.len());
843                                for chunk in &chunks {
844                                    match compressor.compress(chunk) {
845                                        Ok(cc) => cc_list.push(cc),
846                                        Err(_) => {
847                                            ok = false;
848                                            break;
849                                        }
850                                    }
851                                }
852                                if ok {
853                                    let total_cc: usize = cc_list.iter().map(|c| 4 + c.len()).sum();
854                                    let payload_size = 4 + ext_dict.len() + 4 + total_cc;
855                                    if payload_size < plain.len() {
856                                        let mut payload = Vec::with_capacity(payload_size);
857                                        payload.extend_from_slice(
858                                            &(ext_dict.len() as u32).to_le_bytes(),
859                                        );
860                                        payload.extend_from_slice(ext_dict);
861                                        payload.extend_from_slice(
862                                            &(cc_list.len() as u32).to_le_bytes(),
863                                        );
864                                        for cc in &cc_list {
865                                            payload.extend_from_slice(
866                                                &(cc.len() as u32).to_le_bytes(),
867                                            );
868                                            payload.extend_from_slice(cc);
869                                        }
870                                        (payload, true)
871                                    } else {
872                                        (plain, false)
873                                    }
874                                } else {
875                                    (plain, false)
876                                }
877                            } else {
878                                (plain, false)
879                            }
880                        } else if preprocessed.len() >= DICT_MIN_DATA_SIZE {
881                            if let Some(dict_payload) =
882                                try_dict_compress(&preprocessed, level, plain.len())
883                            {
884                                (dict_payload, true)
885                            } else {
886                                (plain, false)
887                            }
888                        } else {
889                            (plain, false)
890                        };
891                        let total = 32 + meta_size_for_comparison + compressed.len();
892                        results
893                            .lock()
894                            .unwrap()
895                            .push((compressed, total, is_dict, false, false, false));
896                    }
897                });
898
899                // Path B: raw zstd (no preprocessing).
900                s.spawn(|_| {
901                    if let Ok(compressed) = zstd::bulk::compress(data, raw_level) {
902                        let total = 32 + compressed.len();
903                        results
904                            .lock()
905                            .unwrap()
906                            .push((compressed, total, false, true, false, false));
907                    }
908                });
909
910                // Path C: raw + brotli (TEXT mode).
911                s.spawn(|_| {
912                    let q = if data.len() <= 1_048_576 { 11 } else { 9 };
913                    if let Ok(compressed) = brotli_compress(data, q, BROTLI_MODE_TEXT) {
914                        let total = 32 + compressed.len();
915                        results
916                            .lock()
917                            .unwrap()
918                            .push((compressed, total, false, true, true, false));
919                    }
920                });
921
922                // Path D: preprocessed + brotli (GENERIC mode, dual quality).
923                s.spawn(|_| {
924                    let max_q = if preprocessed.len() <= 1_048_576 {
925                        11
926                    } else {
927                        9
928                    };
929                    let qualities: &[u32] = if max_q == 11 {
930                        &[11, 10]
931                    } else {
932                        &[max_q as u32]
933                    };
934                    let mut best: Option<PathResult> = None;
935                    for &q in qualities {
936                        if let Ok(compressed) =
937                            brotli_compress(&preprocessed, q, BROTLI_MODE_GENERIC)
938                        {
939                            let total = 32 + meta_size_for_comparison + compressed.len();
940                            if best.as_ref().is_none_or(|b| total < b.1) {
941                                best = Some((compressed, total, false, false, true, false));
942                            }
943                        }
944                    }
945                    if let Some(r) = best {
946                        results.lock().unwrap().push(r);
947                    }
948                });
949
950                // Path E: embedded metadata + brotli (GENERIC mode, dual quality).
951                if let Some(ref ep) = embedded_payload {
952                    s.spawn(|_| {
953                        let max_q = if ep.len() <= 1_048_576 { 11 } else { 9 };
954                        let qualities: &[u32] = if max_q == 11 {
955                            &[11, 10]
956                        } else {
957                            &[max_q as u32]
958                        };
959                        let mut best: Option<PathResult> = None;
960                        for &q in qualities {
961                            if let Ok(compressed) = brotli_compress(ep, q, BROTLI_MODE_GENERIC) {
962                                let total = 32 + compressed.len();
963                                if best.as_ref().is_none_or(|b| total < b.1) {
964                                    best = Some((compressed, total, false, false, true, true));
965                                }
966                            }
967                        }
968                        if let Some(r) = best {
969                            results.lock().unwrap().push(r);
970                        }
971                    });
972                }
973
974                // Path F: embedded metadata + zstd.
975                if let Some(ref ep) = embedded_payload {
976                    s.spawn(|_| {
977                        let embed_level = adaptive_fast_level(ep.len(), zstd_level_override);
978                        if let Ok(compressed) = zstd::bulk::compress(ep, embed_level) {
979                            let total = 32 + compressed.len();
980                            results
981                                .lock()
982                                .unwrap()
983                                .push((compressed, total, false, false, false, true));
984                        }
985                    });
986                }
987            });
988
989            // Pick the smallest result.
990            let results = results.into_inner().unwrap();
991            let best = results
992                .into_iter()
993                .min_by_key(|r| r.1)
994                .ok_or_else(|| io::Error::other("all compression paths failed"))?;
995
996            use_dict = best.2;
997            use_raw_fallback = best.3;
998            use_brotli = best.4;
999            use_meta_embedded = best.5;
1000            best.0
1001        }
1002        // Balanced mode: dual-path CM + GRU byte predictor.
1003        Mode::Balanced => {
1004            let config = cm_config_for_mode(mode);
1005            let cm_data = gru_compress(&preprocessed, config);
1006            let mut payload = Vec::with_capacity(8 + cm_data.len());
1007            payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
1008            payload.extend_from_slice(&cm_data);
1009            payload
1010        }
1011        // Max mode: try neural dual-path, fall back to CM-only.
1012        Mode::Max => {
1013            let config = cm_config_for_mode(mode);
1014
1015            #[cfg(feature = "neural")]
1016            {
1017                if let Some(mpath) = resolve_model_path(model_path) {
1018                    match datacortex_neural::LlmPredictor::new(&mpath) {
1019                        Ok(mut llm) => {
1020                            let mut meta_mixer = datacortex_neural::MetaMixer::new(5);
1021                            eprintln!(
1022                                "[neural] Max mode: dual-path CM+LLM ({} bytes mapped)",
1023                                llm.mapped_bytes()
1024                            );
1025                            let cm_data =
1026                                neural_compress(&preprocessed, config, &mut llm, &mut meta_mixer);
1027                            let mut payload = Vec::with_capacity(8 + cm_data.len());
1028                            // Byte 0 of the 8-byte size prefix: set bit 7 to flag neural mode.
1029                            // This lets the decompressor know to use neural path.
1030                            let size_with_flag = preprocessed.len() as u64 | (1u64 << 63);
1031                            payload.extend_from_slice(&size_with_flag.to_le_bytes());
1032                            payload.extend_from_slice(&cm_data);
1033                            payload
1034                        }
1035                        Err(e) => {
1036                            eprintln!("[neural] LLM init failed, falling back to CM-only: {e}");
1037                            let cm_data = cm_compress(&preprocessed, config);
1038                            let mut payload = Vec::with_capacity(8 + cm_data.len());
1039                            payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
1040                            payload.extend_from_slice(&cm_data);
1041                            payload
1042                        }
1043                    }
1044                } else {
1045                    eprintln!(
1046                        "[neural] no model found, Max mode using CM-only. \
1047                         Set DATACORTEX_MODEL or use --model-path."
1048                    );
1049                    let cm_data = cm_compress(&preprocessed, config);
1050                    let mut payload = Vec::with_capacity(8 + cm_data.len());
1051                    payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
1052                    payload.extend_from_slice(&cm_data);
1053                    payload
1054                }
1055            }
1056
1057            #[cfg(not(feature = "neural"))]
1058            {
1059                let _ = model_path; // suppress unused warning
1060                let cm_data = cm_compress(&preprocessed, config);
1061                let mut payload = Vec::with_capacity(8 + cm_data.len());
1062                payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
1063                payload.extend_from_slice(&cm_data);
1064                payload
1065            }
1066        }
1067    };
1068
1069    // When raw fallback or embedded metadata won, use empty header metadata.
1070    // - Raw fallback: decompressor handles empty chains (just decompresses, no reverse transforms).
1071    // - Embedded: metadata lives inside the compressed stream, not in the header.
1072    let final_metadata = if use_raw_fallback || use_meta_embedded {
1073        vec![]
1074    } else {
1075        transform_metadata
1076    };
1077
1078    // Compress metadata with zstd if it's large enough to benefit.
1079    // Small metadata (<= 64 bytes) stays raw to avoid zstd frame overhead.
1080    // Skipped when metadata is embedded (final_metadata is empty).
1081    let (header_metadata, meta_compressed) = if final_metadata.len() > 64 {
1082        let compressed_meta =
1083            zstd::bulk::compress(&final_metadata, 19).unwrap_or_else(|_| final_metadata.clone());
1084        if compressed_meta.len() < final_metadata.len() {
1085            (compressed_meta, true)
1086        } else {
1087            (final_metadata, false)
1088        }
1089    } else {
1090        (final_metadata, false)
1091    };
1092
1093    let header = DcxHeader {
1094        mode,
1095        format_hint,
1096        original_size: data.len() as u64,
1097        compressed_size: compressed.len() as u64,
1098        crc32: crc,
1099        transform_metadata: header_metadata,
1100        has_dict: use_dict,
1101        meta_compressed,
1102        use_brotli,
1103        meta_embedded: use_meta_embedded,
1104    };
1105
1106    header.write_to(output)?;
1107    output.write_all(&compressed)?;
1108
1109    Ok(())
1110}
1111
1112/// Decompress a .dcx file from `input`, returning the original data.
1113pub fn decompress<R: Read>(input: &mut R) -> io::Result<Vec<u8>> {
1114    decompress_with_model(input, None)
1115}
1116
1117/// Decompress with optional explicit model path (for neural Max mode).
1118pub fn decompress_with_model<R: Read>(
1119    input: &mut R,
1120    model_path: Option<&str>,
1121) -> io::Result<Vec<u8>> {
1122    let header = DcxHeader::read_from(input)?;
1123
1124    let mut compressed = vec![0u8; header.compressed_size as usize];
1125    input.read_exact(&mut compressed)?;
1126
1127    // Step 1: Decompress with engine.
1128    let preprocessed = match header.mode {
1129        Mode::Fast => {
1130            if header.use_brotli {
1131                brotli_decompress(&compressed)?
1132            } else {
1133                let capacity = header.original_size as usize * 2 + 65536;
1134                if header.has_dict {
1135                    decompress_with_dict(&compressed, capacity)?
1136                } else {
1137                    zstd::bulk::decompress(&compressed, capacity)
1138                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
1139                }
1140            }
1141        }
1142        Mode::Balanced => {
1143            // Balanced mode: dual-path CM + GRU byte predictor.
1144            if compressed.len() < 8 {
1145                return Err(io::Error::new(
1146                    io::ErrorKind::InvalidData,
1147                    "CM mode compressed data too short",
1148                ));
1149            }
1150            let size_raw = u64::from_le_bytes(compressed[..8].try_into().expect("8-byte slice"));
1151            let preprocessed_size = (size_raw & !(1u64 << 63)) as usize;
1152            let config = cm_config_for_mode(header.mode);
1153            gru_decompress(&compressed[8..], preprocessed_size, config)
1154        }
1155        Mode::Max => {
1156            // Max mode: may use neural (LLM) dual-path or CM-only.
1157            if compressed.len() < 8 {
1158                return Err(io::Error::new(
1159                    io::ErrorKind::InvalidData,
1160                    "CM mode compressed data too short",
1161                ));
1162            }
1163            let size_raw = u64::from_le_bytes(compressed[..8].try_into().expect("8-byte slice"));
1164
1165            // Check if bit 63 is set (neural flag).
1166            let neural_flag = size_raw & (1u64 << 63) != 0;
1167            let preprocessed_size = (size_raw & !(1u64 << 63)) as usize;
1168            let config = cm_config_for_mode(header.mode);
1169
1170            if neural_flag {
1171                #[cfg(feature = "neural")]
1172                {
1173                    if let Some(mpath) = resolve_model_path(model_path) {
1174                        match datacortex_neural::LlmPredictor::new(&mpath) {
1175                            Ok(mut llm) => {
1176                                let mut meta_mixer = datacortex_neural::MetaMixer::new(5);
1177                                eprintln!(
1178                                    "[neural] decompressing with dual-path CM+LLM ({} bytes mapped)",
1179                                    llm.mapped_bytes()
1180                                );
1181                                neural_decompress(
1182                                    &compressed[8..],
1183                                    preprocessed_size,
1184                                    config,
1185                                    &mut llm,
1186                                    &mut meta_mixer,
1187                                )
1188                            }
1189                            Err(e) => {
1190                                return Err(io::Error::new(
1191                                    io::ErrorKind::Other,
1192                                    format!(
1193                                        "file was compressed with neural mode but LLM failed to load: {e}"
1194                                    ),
1195                                ));
1196                            }
1197                        }
1198                    } else {
1199                        return Err(io::Error::new(
1200                            io::ErrorKind::Other,
1201                            "file was compressed with neural mode but no model found. \
1202                             Set DATACORTEX_MODEL or use --model-path.",
1203                        ));
1204                    }
1205                }
1206
1207                #[cfg(not(feature = "neural"))]
1208                {
1209                    let _ = model_path;
1210                    return Err(io::Error::other(
1211                        "file was compressed with neural mode but this build lacks the \
1212                         `neural` feature. Rebuild with --features neural.",
1213                    ));
1214                }
1215            } else {
1216                cm_decompress(&compressed[8..], preprocessed_size, config)
1217            }
1218        }
1219    };
1220
1221    // Step 1.5: Handle embedded metadata OR separate metadata.
1222    // When meta_embedded is set, the decompressed stream starts with:
1223    //   [meta_len: u32 LE][raw_metadata][preprocessed_data]
1224    // We extract the metadata and the actual preprocessed data from the stream.
1225    let (preprocessed, transform_metadata) = if header.meta_embedded {
1226        if preprocessed.len() < 4 {
1227            return Err(io::Error::new(
1228                io::ErrorKind::InvalidData,
1229                "embedded metadata: decompressed stream too short for meta_len",
1230            ));
1231        }
1232        let meta_len =
1233            u32::from_le_bytes(preprocessed[0..4].try_into().expect("4-byte slice")) as usize;
1234        if preprocessed.len() < 4 + meta_len {
1235            return Err(io::Error::new(
1236                io::ErrorKind::InvalidData,
1237                format!(
1238                    "embedded metadata: stream too short for metadata ({} bytes needed, {} available)",
1239                    4 + meta_len,
1240                    preprocessed.len()
1241                ),
1242            ));
1243        }
1244        let metadata = preprocessed[4..4 + meta_len].to_vec();
1245        let actual_preprocessed = preprocessed[4 + meta_len..].to_vec();
1246        (actual_preprocessed, metadata)
1247    } else {
1248        // Decompress metadata if it was zstd-compressed (separate metadata path).
1249        // Use streaming decoder to avoid guessing decompressed size.
1250        let tm = if header.meta_compressed && !header.transform_metadata.is_empty() {
1251            let mut decoder =
1252                zstd::Decoder::new(Cursor::new(&header.transform_metadata)).map_err(|e| {
1253                    io::Error::new(
1254                        io::ErrorKind::InvalidData,
1255                        format!("failed to init metadata decompressor: {e}"),
1256                    )
1257                })?;
1258            let mut decompressed_meta = Vec::new();
1259            decoder.read_to_end(&mut decompressed_meta).map_err(|e| {
1260                io::Error::new(
1261                    io::ErrorKind::InvalidData,
1262                    format!("failed to decompress transform metadata: {e}"),
1263                )
1264            })?;
1265            decompressed_meta
1266        } else {
1267            header.transform_metadata.clone()
1268        };
1269        (preprocessed, tm)
1270    };
1271
1272    // Step 2: Reverse preprocessing.
1273    let data = if transform_metadata.is_empty() {
1274        preprocessed
1275    } else {
1276        let chain = TransformChain::deserialize(&transform_metadata)?;
1277        reverse_preprocess(&preprocessed, &chain)
1278    };
1279
1280    // CRC-32 integrity check.
1281    let crc = crc32fast::hash(&data);
1282    if crc != header.crc32 {
1283        return Err(io::Error::new(
1284            io::ErrorKind::InvalidData,
1285            format!(
1286                "CRC-32 mismatch: expected {:#010X}, got {:#010X}",
1287                header.crc32, crc
1288            ),
1289        ));
1290    }
1291
1292    if data.len() as u64 != header.original_size {
1293        return Err(io::Error::new(
1294            io::ErrorKind::InvalidData,
1295            format!(
1296                "size mismatch: header says {} bytes, got {}",
1297                header.original_size,
1298                data.len()
1299            ),
1300        ));
1301    }
1302
1303    Ok(data)
1304}
1305
1306/// Compress to Vec (convenience).
1307pub fn compress_to_vec(
1308    data: &[u8],
1309    mode: Mode,
1310    format_override: Option<FormatHint>,
1311) -> io::Result<Vec<u8>> {
1312    let mut buf = Vec::new();
1313    compress(data, mode, format_override, &mut buf)?;
1314    Ok(buf)
1315}
1316
1317/// Compress to Vec with explicit model path.
1318pub fn compress_to_vec_with_model(
1319    data: &[u8],
1320    mode: Mode,
1321    format_override: Option<FormatHint>,
1322    model_path: Option<&str>,
1323) -> io::Result<Vec<u8>> {
1324    let mut buf = Vec::new();
1325    compress_with_model(data, mode, format_override, model_path, &mut buf)?;
1326    Ok(buf)
1327}
1328
1329/// Compress to Vec with explicit model path and zstd level override.
1330pub fn compress_to_vec_with_options(
1331    data: &[u8],
1332    mode: Mode,
1333    format_override: Option<FormatHint>,
1334    model_path: Option<&str>,
1335    zstd_level_override: Option<i32>,
1336) -> io::Result<Vec<u8>> {
1337    let mut buf = Vec::new();
1338    compress_with_options(
1339        data,
1340        mode,
1341        format_override,
1342        model_path,
1343        zstd_level_override,
1344        &mut buf,
1345    )?;
1346    Ok(buf)
1347}
1348
1349/// Decompress from slice (convenience).
1350pub fn decompress_from_slice(dcx_data: &[u8]) -> io::Result<Vec<u8>> {
1351    let mut cursor = Cursor::new(dcx_data);
1352    decompress(&mut cursor)
1353}
1354
1355/// Read header only (for `info` command).
1356pub fn read_header<R: Read>(input: &mut R) -> io::Result<DcxHeader> {
1357    DcxHeader::read_from(input)
1358}
1359
1360/// Compress raw data with zstd at a given level (for benchmark comparison).
1361pub fn raw_zstd_compress(data: &[u8], level: i32) -> io::Result<Vec<u8>> {
1362    zstd::bulk::compress(data, level).map_err(io::Error::other)
1363}
1364
1365#[cfg(test)]
1366mod tests {
1367    use super::*;
1368
1369    #[test]
1370    fn fast_mode_roundtrip() {
1371        let original = b"Hello, DataCortex! This is a test of Fast mode compression.";
1372        let compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1373        let decompressed = decompress_from_slice(&compressed).unwrap();
1374        assert_eq!(decompressed, original);
1375    }
1376
1377    #[test]
1378    fn fast_mode_json_roundtrip() {
1379        let data = br#"{"name":"Alice","age":30,"name":"Bob","age":25,"name":"Carol","age":35}"#;
1380        let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1381        let decompressed = decompress_from_slice(&compressed).unwrap();
1382        assert_eq!(decompressed, data.to_vec());
1383    }
1384
1385    #[test]
1386    fn balanced_mode_roundtrip() {
1387        let original = b"Balanced mode test data with some content.";
1388        let compressed = compress_to_vec(original, Mode::Balanced, None).unwrap();
1389        let decompressed = decompress_from_slice(&compressed).unwrap();
1390        assert_eq!(decompressed, original);
1391    }
1392
1393    #[test]
1394    fn balanced_mode_longer_text() {
1395        let original = b"The quick brown fox jumps over the lazy dog. This sentence contains every letter of the English alphabet at least once. We need enough data to properly exercise the arithmetic coder and order-0 model.";
1396        let compressed = compress_to_vec(original, Mode::Balanced, None).unwrap();
1397        let decompressed = decompress_from_slice(&compressed).unwrap();
1398        assert_eq!(decompressed, original);
1399    }
1400
1401    #[test]
1402    fn balanced_mode_repetitive_data() {
1403        let data = "hello world! ".repeat(100);
1404        let compressed = compress_to_vec(data.as_bytes(), Mode::Balanced, None).unwrap();
1405        let decompressed = decompress_from_slice(&compressed).unwrap();
1406        assert_eq!(decompressed, data.as_bytes());
1407    }
1408
1409    #[test]
1410    fn balanced_mode_all_byte_values() {
1411        let original: Vec<u8> = (0..=255).collect();
1412        let compressed = compress_to_vec(&original, Mode::Balanced, None).unwrap();
1413        let decompressed = decompress_from_slice(&compressed).unwrap();
1414        assert_eq!(decompressed, original);
1415    }
1416
1417    #[test]
1418    fn balanced_mode_single_byte() {
1419        let original = b"X";
1420        let compressed = compress_to_vec(original, Mode::Balanced, None).unwrap();
1421        let decompressed = decompress_from_slice(&compressed).unwrap();
1422        assert_eq!(decompressed, original);
1423    }
1424
1425    #[test]
1426    fn balanced_mode_json_roundtrip() {
1427        let data = br#"{"name":"Alice","age":30,"name":"Bob","age":25,"name":"Carol","age":35}"#;
1428        let compressed = compress_to_vec(data, Mode::Balanced, Some(FormatHint::Json)).unwrap();
1429        let decompressed = decompress_from_slice(&compressed).unwrap();
1430        assert_eq!(decompressed, data.to_vec());
1431    }
1432
1433    #[test]
1434    fn empty_data_roundtrip() {
1435        let original = b"";
1436        for mode in [Mode::Fast, Mode::Balanced, Mode::Max] {
1437            let compressed = compress_to_vec(original, mode, None).unwrap();
1438            let decompressed = decompress_from_slice(&compressed).unwrap();
1439            assert_eq!(decompressed, original, "failed for mode {mode}");
1440        }
1441    }
1442
1443    #[test]
1444    fn crc_mismatch_detected() {
1445        let original = b"test data for CRC check";
1446        let mut compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1447        // Corrupt in the compressed data section (after header).
1448        let header_size = 32; // minimum header
1449        if compressed.len() > header_size + 5 {
1450            compressed[header_size + 3] ^= 0xFF;
1451        }
1452        assert!(decompress_from_slice(&compressed).is_err());
1453    }
1454
1455    #[test]
1456    fn fast_mode_actually_compresses() {
1457        // Repetitive data should compress well with zstd.
1458        let data = "hello world. ".repeat(100);
1459        let compressed = compress_to_vec(data.as_bytes(), Mode::Fast, None).unwrap();
1460        assert!(
1461            compressed.len() < data.len(),
1462            "Fast mode should compress repetitive data: {} vs {}",
1463            compressed.len(),
1464            data.len()
1465        );
1466    }
1467
1468    #[test]
1469    fn json_preprocessing_improves_fast_mode() {
1470        let data = br#"[{"name":"Alice","score":95},{"name":"Bob","score":87},{"name":"Carol","score":92},{"name":"Dave","score":88},{"name":"Eve","score":91}]"#;
1471        let with_preprocess = compress_to_vec(data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1472        let without_preprocess =
1473            compress_to_vec(data, Mode::Fast, Some(FormatHint::Generic)).unwrap();
1474
1475        // Both should decompress correctly.
1476        assert_eq!(
1477            decompress_from_slice(&with_preprocess).unwrap(),
1478            data.to_vec()
1479        );
1480        assert_eq!(
1481            decompress_from_slice(&without_preprocess).unwrap(),
1482            data.to_vec()
1483        );
1484    }
1485
1486    #[test]
1487    fn all_modes_roundtrip() {
1488        let data = b"test all modes with some more content to ensure decent compression";
1489        for mode in [Mode::Max, Mode::Balanced, Mode::Fast] {
1490            let compressed = compress_to_vec(data, mode, None).unwrap();
1491            let decompressed = decompress_from_slice(&compressed).unwrap();
1492            assert_eq!(decompressed, data, "failed for mode {mode}");
1493        }
1494    }
1495
1496    #[test]
1497    fn cm_compress_decompress_direct() {
1498        let data = b"Hello, World! This is a direct CM test.";
1499        let compressed = cm_compress(data, CMConfig::balanced());
1500        let decompressed = cm_decompress(&compressed, data.len(), CMConfig::balanced());
1501        assert_eq!(decompressed, data.to_vec());
1502    }
1503
1504    #[test]
1505    fn cm_empty() {
1506        let data: &[u8] = b"";
1507        let compressed = cm_compress(data, CMConfig::balanced());
1508        let decompressed = cm_decompress(&compressed, 0, CMConfig::balanced());
1509        assert!(decompressed.is_empty());
1510    }
1511
1512    #[test]
1513    fn cm_single_byte() {
1514        for byte in 0..=255u8 {
1515            let data = [byte];
1516            let compressed = cm_compress(&data, CMConfig::balanced());
1517            let decompressed = cm_decompress(&compressed, 1, CMConfig::balanced());
1518            assert_eq!(
1519                decompressed, data,
1520                "CM roundtrip failed for byte {byte:#04X}"
1521            );
1522        }
1523    }
1524
1525    #[test]
1526    fn cm_repetitive_compresses() {
1527        let data = vec![b'A'; 1000];
1528        let compressed = cm_compress(&data, CMConfig::balanced());
1529        // 1000 identical bytes should compress well with adaptive model.
1530        assert!(
1531            compressed.len() < 200,
1532            "CM should compress 1000 identical bytes well: {} bytes",
1533            compressed.len()
1534        );
1535        let decompressed = cm_decompress(&compressed, data.len(), CMConfig::balanced());
1536        assert_eq!(decompressed, data);
1537    }
1538
1539    #[test]
1540    fn max_mode_roundtrip() {
1541        let original = b"Max mode test data with some content for compression.";
1542        let compressed = compress_to_vec(original, Mode::Max, None).unwrap();
1543        let decompressed = decompress_from_slice(&compressed).unwrap();
1544        assert_eq!(decompressed, original);
1545    }
1546
1547    #[test]
1548    fn max_mode_longer_text() {
1549        let original = b"The quick brown fox jumps over the lazy dog. Max mode uses 2x context maps for better predictions with fewer hash collisions. This should compress slightly better than balanced mode.";
1550        let compressed = compress_to_vec(original, Mode::Max, None).unwrap();
1551        let decompressed = decompress_from_slice(&compressed).unwrap();
1552        assert_eq!(decompressed, original);
1553    }
1554
1555    // ─── Dictionary compression tests ──────────────────────────────────────────
1556
1557    #[test]
1558    fn test_dict_compress_roundtrip() {
1559        // Generate NDJSON data large enough to trigger dictionary training.
1560        // Repetitive columnar data is ideal for dictionary learning.
1561        let mut ndjson = String::new();
1562        for i in 0..500 {
1563            ndjson.push_str(&format!(
1564                r#"{{"id":{},"name":"user_{}","status":"active","score":{}}}"#,
1565                i,
1566                i,
1567                i * 17 % 100
1568            ));
1569            ndjson.push('\n');
1570        }
1571        let data = ndjson.as_bytes();
1572        assert!(
1573            data.len() > DICT_MIN_DATA_SIZE,
1574            "test data should exceed dict threshold: {} bytes",
1575            data.len()
1576        );
1577
1578        let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1579        let decompressed = decompress_from_slice(&compressed).unwrap();
1580        assert_eq!(
1581            decompressed, data,
1582            "dict compress roundtrip: byte-exact mismatch"
1583        );
1584    }
1585
1586    #[test]
1587    fn test_dict_falls_back_on_small() {
1588        // Data smaller than DICT_MIN_DATA_SIZE should not use dictionary.
1589        let data = b"small data that won't trigger dictionary training";
1590        assert!(data.len() < DICT_MIN_DATA_SIZE);
1591
1592        let compressed = compress_to_vec(data, Mode::Fast, None).unwrap();
1593        let decompressed = decompress_from_slice(&compressed).unwrap();
1594        assert_eq!(decompressed, data.to_vec());
1595
1596        // Verify no dict flag in header.
1597        let mut cursor = Cursor::new(&compressed);
1598        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1599        assert!(!header.has_dict, "small data should not have dict flag set");
1600    }
1601
1602    #[test]
1603    fn test_dict_backward_compat() {
1604        // Compress with old behavior (no dict) and verify it still decompresses.
1605        // We simulate this by compressing small data (which skips dict).
1606        let original = b"backward compatibility test data for decompression";
1607        let compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1608
1609        // Verify the flag is NOT set.
1610        let mut cursor = Cursor::new(&compressed);
1611        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1612        assert!(!header.has_dict);
1613
1614        // Decompress should work fine.
1615        let decompressed = decompress_from_slice(&compressed).unwrap();
1616        assert_eq!(decompressed, original.to_vec());
1617    }
1618
1619    #[test]
1620    fn test_dict_ndjson_large_roundtrip() {
1621        // Larger NDJSON dataset — should benefit from dictionary.
1622        let mut ndjson = String::new();
1623        for i in 0..2000 {
1624            ndjson.push_str(&format!(
1625                r#"{{"timestamp":"2025-01-{:02}T{:02}:{:02}:00Z","level":"info","message":"Request processed","request_id":"req_{}","duration_ms":{}}}"#,
1626                (i % 28) + 1,
1627                i % 24,
1628                i % 60,
1629                i,
1630                (i * 13) % 500
1631            ));
1632            ndjson.push('\n');
1633        }
1634        let data = ndjson.as_bytes();
1635
1636        let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1637        let decompressed = decompress_from_slice(&compressed).unwrap();
1638        assert_eq!(decompressed, data, "large NDJSON roundtrip mismatch");
1639    }
1640
1641    #[test]
1642    fn test_dict_generic_data_roundtrip() {
1643        // Generic (non-JSON) data that's large enough for dict training.
1644        // Uses fixed-size block splitting instead of column boundaries.
1645        let mut data = Vec::new();
1646        for i in 0..3000 {
1647            data.extend_from_slice(
1648                format!("line {i}: the quick brown fox jumps over the lazy dog\n").as_bytes(),
1649            );
1650        }
1651        assert!(data.len() > DICT_MIN_DATA_SIZE);
1652
1653        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Generic)).unwrap();
1654        let decompressed = decompress_from_slice(&compressed).unwrap();
1655        assert_eq!(decompressed, data, "generic data dict roundtrip mismatch");
1656    }
1657
1658    #[test]
1659    fn test_dict_does_not_affect_other_modes() {
1660        // Dictionary training should only apply to Fast mode.
1661        // Balanced and Max modes should remain unchanged.
1662        let mut ndjson = String::new();
1663        for i in 0..200 {
1664            ndjson.push_str(&format!(
1665                r#"{{"id":{},"name":"user_{}","status":"active"}}"#,
1666                i, i
1667            ));
1668            ndjson.push('\n');
1669        }
1670        let data = ndjson.as_bytes();
1671
1672        for mode in [Mode::Balanced, Mode::Max] {
1673            let compressed = compress_to_vec(data, mode, Some(FormatHint::Ndjson)).unwrap();
1674            let mut cursor = Cursor::new(&compressed);
1675            let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1676            assert!(!header.has_dict, "mode {mode} should never have dict flag");
1677            let decompressed = decompress_from_slice(&compressed).unwrap();
1678            assert_eq!(decompressed, data, "roundtrip failed for mode {mode}");
1679        }
1680    }
1681
1682    // ─── Configurable zstd level tests ──────────────────────────────────────
1683
1684    #[test]
1685    fn test_compress_with_level() {
1686        // Compress with level 19 override in Fast mode, verify roundtrip.
1687        let data = "hello world, compressing with custom zstd level. ".repeat(50);
1688        let compressed =
1689            compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, Some(19))
1690                .unwrap();
1691        let decompressed = decompress_from_slice(&compressed).unwrap();
1692        assert_eq!(decompressed, data.as_bytes(), "level 19 roundtrip failed");
1693    }
1694
1695    #[test]
1696    fn test_compress_with_level_default() {
1697        // No level override — should use mode default (9 for Fast).
1698        let data = "default level test data. ".repeat(50);
1699        let compressed =
1700            compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, None).unwrap();
1701        let decompressed = decompress_from_slice(&compressed).unwrap();
1702        assert_eq!(
1703            decompressed,
1704            data.as_bytes(),
1705            "default level roundtrip failed"
1706        );
1707    }
1708
1709    #[test]
1710    fn test_compress_with_level_higher_ratio() {
1711        // Level 19 should compress better than level 1 on repetitive data.
1712        let data = r#"{"name":"Alice","score":95}"#.repeat(200);
1713        let low =
1714            compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, Some(1)).unwrap();
1715        let high = compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, Some(19))
1716            .unwrap();
1717
1718        // Both must roundtrip.
1719        assert_eq!(decompress_from_slice(&low).unwrap(), data.as_bytes());
1720        assert_eq!(decompress_from_slice(&high).unwrap(), data.as_bytes());
1721
1722        // Higher level should produce smaller output (or at least not larger).
1723        assert!(
1724            high.len() <= low.len(),
1725            "level 19 ({}) should be <= level 1 ({})",
1726            high.len(),
1727            low.len()
1728        );
1729    }
1730
1731    // ─── Auto-fallback tests ──────────────────────────────────────────────────
1732
1733    #[test]
1734    fn test_auto_fallback_picks_smaller() {
1735        // citm_catalog.json has extreme repetition. The auto-fallback picks
1736        // whichever path (raw or preprocessed) produces the smallest output.
1737        // With compressed metadata, the preprocessed path may now win.
1738        let data = std::fs::read(concat!(
1739            env!("CARGO_MANIFEST_DIR"),
1740            "/../../corpus/json-bench/citm_catalog.json"
1741        ))
1742        .unwrap();
1743
1744        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1745        let decompressed = decompress_from_slice(&compressed).unwrap();
1746        assert_eq!(decompressed, data, "citm_catalog roundtrip failed");
1747
1748        // Verify good compression ratio regardless of which path won.
1749        let ratio = data.len() as f64 / compressed.len() as f64;
1750        assert!(
1751            ratio > 50.0,
1752            "citm_catalog should achieve >50x, got {ratio:.1}x"
1753        );
1754    }
1755
1756    #[test]
1757    fn test_auto_fallback_preprocessed_wins_on_ndjson() {
1758        // NDJSON with uniform schema should still prefer preprocessed path
1759        // (columnar + typed encoding beats raw zstd for structured data).
1760        let data = std::fs::read(concat!(
1761            env!("CARGO_MANIFEST_DIR"),
1762            "/../../corpus/test-ndjson.ndjson"
1763        ))
1764        .unwrap();
1765
1766        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1767        let decompressed = decompress_from_slice(&compressed).unwrap();
1768        assert_eq!(decompressed, data, "test-ndjson roundtrip failed");
1769
1770        // Check that preprocessing was used: either non-empty transform metadata
1771        // in the header, or metadata embedded in the compressed stream.
1772        let mut cursor = Cursor::new(&compressed);
1773        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1774        assert!(
1775            !header.transform_metadata.is_empty() || header.meta_embedded,
1776            "test-ndjson should prefer preprocessed path (non-empty transform metadata or embedded)"
1777        );
1778    }
1779
1780    #[test]
1781    fn test_auto_fallback_roundtrip() {
1782        // Verify both raw and preprocessed paths produce correct roundtrips.
1783        // Use citm_catalog (raw wins) and test-ndjson (preprocessed wins).
1784        let citm = std::fs::read(concat!(
1785            env!("CARGO_MANIFEST_DIR"),
1786            "/../../corpus/json-bench/citm_catalog.json"
1787        ))
1788        .unwrap();
1789        let ndjson = std::fs::read(concat!(
1790            env!("CARGO_MANIFEST_DIR"),
1791            "/../../corpus/test-ndjson.ndjson"
1792        ))
1793        .unwrap();
1794
1795        // citm_catalog — raw path
1796        let compressed_citm = compress_to_vec(&citm, Mode::Fast, Some(FormatHint::Json)).unwrap();
1797        let decompressed_citm = decompress_from_slice(&compressed_citm).unwrap();
1798        assert_eq!(
1799            decompressed_citm, citm,
1800            "citm_catalog roundtrip (raw path) failed"
1801        );
1802
1803        // test-ndjson — preprocessed path
1804        let compressed_ndjson =
1805            compress_to_vec(&ndjson, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1806        let decompressed_ndjson = decompress_from_slice(&compressed_ndjson).unwrap();
1807        assert_eq!(
1808            decompressed_ndjson, ndjson,
1809            "test-ndjson roundtrip (preprocessed path) failed"
1810        );
1811    }
1812
1813    // ─── Adaptive level tests ─────────────────────────────────────────────────
1814
1815    #[test]
1816    fn test_adaptive_level_small_data() {
1817        // <1MB should use level 19 (zstd-19 is <50ms on small data).
1818        assert_eq!(adaptive_fast_level(100_000, None), 19);
1819        assert_eq!(adaptive_fast_level(500_000, None), 19);
1820        assert_eq!(adaptive_fast_level(1_048_576, None), 19);
1821        assert_eq!(adaptive_fast_level(0, None), 19);
1822    }
1823
1824    #[test]
1825    fn test_adaptive_level_large_data() {
1826        // 1MB-10MB should use level 13, >10MB should use level 9.
1827        assert_eq!(adaptive_fast_level(1_048_577, None), 13);
1828        assert_eq!(adaptive_fast_level(5_000_000, None), 13);
1829        assert_eq!(adaptive_fast_level(10_485_760, None), 13);
1830        assert_eq!(adaptive_fast_level(10_485_761, None), 9);
1831        assert_eq!(adaptive_fast_level(100_000_000, None), 9);
1832    }
1833
1834    #[test]
1835    fn test_adaptive_level_override() {
1836        // --level flag should always override adaptive level.
1837        assert_eq!(adaptive_fast_level(100, Some(3)), 3);
1838        assert_eq!(adaptive_fast_level(100_000_000, Some(22)), 22);
1839        assert_eq!(adaptive_fast_level(0, Some(1)), 1);
1840    }
1841
1842    // ─── Compressed metadata tests ──────────────────────────────────────────────
1843
1844    #[test]
1845    fn test_compressed_metadata_roundtrip() {
1846        // Generate NDJSON data large enough to produce > 64 bytes of transform metadata.
1847        let mut ndjson = String::new();
1848        for i in 0..500 {
1849            ndjson.push_str(&format!(
1850                r#"{{"id":{},"name":"user_{}","status":"active","score":{}}}"#,
1851                i,
1852                i,
1853                i * 17 % 100
1854            ));
1855            ndjson.push('\n');
1856        }
1857        let data = ndjson.as_bytes();
1858
1859        let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1860        let decompressed = decompress_from_slice(&compressed).unwrap();
1861        assert_eq!(
1862            decompressed, data,
1863            "compressed metadata roundtrip: byte-exact mismatch"
1864        );
1865
1866        // Verify the header has meta_compressed set if metadata was large enough.
1867        let mut cursor = Cursor::new(&compressed);
1868        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1869        // The file should have used preprocessed path (non-empty metadata).
1870        if !header.transform_metadata.is_empty() && header.transform_metadata.len() > 10 {
1871            // Metadata was present — check that compressed flag makes sense.
1872            // (meta_compressed is true only if compression actually saved space)
1873            // Just verify roundtrip was correct — the flag is an optimization detail.
1874        }
1875    }
1876
1877    #[test]
1878    fn test_compressed_metadata_backward_compat() {
1879        // Simulate old files that have no compressed metadata (bit 2 = 0).
1880        // These should still decompress correctly.
1881        let original = b"backward compatibility test data for metadata decompression";
1882        let compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1883
1884        // Verify decompression works.
1885        let decompressed = decompress_from_slice(&compressed).unwrap();
1886        assert_eq!(decompressed, original.to_vec());
1887
1888        // For small data, metadata should be empty or very small — no compression.
1889        let mut cursor = Cursor::new(&compressed);
1890        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1891        // Small data may or may not have metadata, but it should roundtrip either way.
1892        assert!(!header.meta_compressed || !header.transform_metadata.is_empty());
1893    }
1894
1895    #[test]
1896    fn test_compressed_metadata_small_skipped() {
1897        // Small metadata (< 64 bytes) should NOT be compressed — zstd frame overhead
1898        // would make it larger.
1899        let data = br#"{"name":"Alice","age":30}"#;
1900        let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1901        let decompressed = decompress_from_slice(&compressed).unwrap();
1902        assert_eq!(decompressed, data.to_vec());
1903
1904        let mut cursor = Cursor::new(&compressed);
1905        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1906        // Small JSON has small metadata — should not be compressed.
1907        if header.transform_metadata.len() <= 64 {
1908            assert!(
1909                !header.meta_compressed,
1910                "metadata <= 64 bytes should not be compressed, but meta_compressed=true \
1911                 for {} bytes of metadata",
1912                header.transform_metadata.len()
1913            );
1914        }
1915    }
1916
1917    #[test]
1918    fn test_twitter_json_brotli_wins() {
1919        // twitter.json should use brotli — raw brotli-11 beats both preprocessed+zstd
1920        // and raw+zstd on this file.
1921        let data = std::fs::read(concat!(
1922            env!("CARGO_MANIFEST_DIR"),
1923            "/../../corpus/json-bench/twitter.json"
1924        ))
1925        .unwrap();
1926
1927        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1928        let decompressed = decompress_from_slice(&compressed).unwrap();
1929        assert_eq!(decompressed, data, "twitter.json roundtrip failed");
1930
1931        // Check that brotli was selected.
1932        let mut cursor = Cursor::new(&compressed);
1933        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1934        assert!(
1935            header.use_brotli,
1936            "twitter.json should use brotli (FLAG_BROTLI set in header)"
1937        );
1938    }
1939
1940    #[test]
1941    fn test_compressed_metadata_all_modes_roundtrip() {
1942        // Metadata compression applies to all modes, not just Fast.
1943        let mut ndjson = String::new();
1944        for i in 0..200 {
1945            ndjson.push_str(&format!(
1946                r#"{{"id":{},"name":"user_{}","status":"active"}}"#,
1947                i, i
1948            ));
1949            ndjson.push('\n');
1950        }
1951        let data = ndjson.as_bytes();
1952
1953        for mode in [Mode::Fast, Mode::Balanced, Mode::Max] {
1954            let compressed = compress_to_vec(data, mode, Some(FormatHint::Ndjson)).unwrap();
1955            let decompressed = decompress_from_slice(&compressed).unwrap();
1956            assert_eq!(
1957                decompressed, data,
1958                "compressed metadata roundtrip failed for mode {mode}"
1959            );
1960        }
1961    }
1962
1963    // ─── Brotli auto-fallback tests ──────────────────────────────────────────
1964
1965    #[test]
1966    fn test_brotli_compress_roundtrip() {
1967        // Direct brotli compress/decompress helper roundtrip.
1968        let data = b"Hello, brotli! This is a test of the brotli compression helpers.";
1969        let compressed = brotli_compress(data, 11, BROTLI_MODE_GENERIC).unwrap();
1970        let decompressed = brotli_decompress(&compressed).unwrap();
1971        assert_eq!(decompressed, data.to_vec());
1972    }
1973
1974    #[test]
1975    fn test_brotli_auto_fallback_twitter() {
1976        // twitter.json should select brotli and roundtrip correctly.
1977        let data = std::fs::read(concat!(
1978            env!("CARGO_MANIFEST_DIR"),
1979            "/../../corpus/json-bench/twitter.json"
1980        ))
1981        .unwrap();
1982
1983        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1984        let decompressed = decompress_from_slice(&compressed).unwrap();
1985        assert_eq!(decompressed, data, "twitter.json brotli roundtrip failed");
1986
1987        let mut cursor = Cursor::new(&compressed);
1988        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1989        assert!(
1990            header.use_brotli,
1991            "twitter.json should use brotli in auto-fallback"
1992        );
1993    }
1994
1995    #[test]
1996    fn test_brotli_ndjson_roundtrip() {
1997        // NDJSON with uniform schema — regardless of which entropy coder wins,
1998        // the roundtrip must be byte-exact.
1999        let data = std::fs::read(concat!(
2000            env!("CARGO_MANIFEST_DIR"),
2001            "/../../corpus/test-ndjson.ndjson"
2002        ))
2003        .unwrap();
2004
2005        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2006        let decompressed = decompress_from_slice(&compressed).unwrap();
2007        assert_eq!(decompressed, data, "ndjson roundtrip failed");
2008    }
2009
2010    #[test]
2011    fn test_brotli_backward_compat() {
2012        // Old .dcx files without the brotli flag (bit 3 = 0) must still decompress.
2013        // We simulate an old file by manually crafting a .dcx with FLAG_BROTLI unset.
2014        // Compress with zstd directly and build a minimal .dcx header.
2015        let original = b"backward compatibility test: this data was compressed without brotli";
2016        let crc = crc32fast::hash(original);
2017        let zstd_compressed = zstd::bulk::compress(original, 19).unwrap();
2018
2019        let header = crate::dcx::DcxHeader {
2020            mode: Mode::Fast,
2021            format_hint: crate::dcx::FormatHint::Generic,
2022            original_size: original.len() as u64,
2023            compressed_size: zstd_compressed.len() as u64,
2024            crc32: crc,
2025            transform_metadata: vec![],
2026            has_dict: false,
2027            meta_compressed: false,
2028            use_brotli: false,
2029            meta_embedded: false,
2030        };
2031
2032        let mut buf = Vec::new();
2033        header.write_to(&mut buf).unwrap();
2034        buf.extend_from_slice(&zstd_compressed);
2035
2036        // Verify the brotli flag is NOT set in the serialized header.
2037        assert_eq!(buf[7] & crate::dcx::FLAG_BROTLI, 0);
2038
2039        // Decompress — must work even though brotli path exists.
2040        let decompressed = decompress_from_slice(&buf).unwrap();
2041        assert_eq!(decompressed, original.to_vec());
2042    }
2043
2044    // ─── Embedded metadata tests ──────────────────────────────────────────────
2045
2046    #[test]
2047    fn test_embedded_metadata_roundtrip() {
2048        // Compress test-api.json with Fast mode — if embedded metadata is used,
2049        // the roundtrip must be byte-exact.
2050        let data = std::fs::read(concat!(
2051            env!("CARGO_MANIFEST_DIR"),
2052            "/../../corpus/test-api.json"
2053        ))
2054        .unwrap();
2055
2056        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
2057        let decompressed = decompress_from_slice(&compressed).unwrap();
2058        assert_eq!(
2059            decompressed, data,
2060            "test-api.json embedded metadata roundtrip: byte-exact mismatch"
2061        );
2062    }
2063
2064    #[test]
2065    fn test_embedded_metadata_backward_compat() {
2066        // Old .dcx files without the meta_embedded flag (bit 4 = 0) must still decompress.
2067        // We simulate an old file by manually crafting a .dcx with FLAG_META_EMBEDDED unset
2068        // and separate transform metadata.
2069        let original = b"backward compat: no embedded metadata in this old file format";
2070        let crc = crc32fast::hash(original);
2071        let zstd_compressed = zstd::bulk::compress(original, 19).unwrap();
2072
2073        let header = crate::dcx::DcxHeader {
2074            mode: Mode::Fast,
2075            format_hint: crate::dcx::FormatHint::Generic,
2076            original_size: original.len() as u64,
2077            compressed_size: zstd_compressed.len() as u64,
2078            crc32: crc,
2079            transform_metadata: vec![],
2080            has_dict: false,
2081            meta_compressed: false,
2082            use_brotli: false,
2083            meta_embedded: false,
2084        };
2085
2086        let mut buf = Vec::new();
2087        header.write_to(&mut buf).unwrap();
2088        buf.extend_from_slice(&zstd_compressed);
2089
2090        // Verify meta_embedded flag is NOT set.
2091        assert_eq!(buf[7] & crate::dcx::FLAG_META_EMBEDDED, 0);
2092
2093        // Decompress — must work without embedded metadata support.
2094        let decompressed = decompress_from_slice(&buf).unwrap();
2095        assert_eq!(decompressed, original.to_vec());
2096    }
2097
2098    #[test]
2099    fn test_embedded_metadata_small_file_improvement() {
2100        // test-api.json is a small file (37KB) where embedded metadata should
2101        // save overhead compared to separate metadata.
2102        let data = std::fs::read(concat!(
2103            env!("CARGO_MANIFEST_DIR"),
2104            "/../../corpus/test-api.json"
2105        ))
2106        .unwrap();
2107
2108        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
2109        let decompressed = decompress_from_slice(&compressed).unwrap();
2110        assert_eq!(decompressed, data, "roundtrip failed");
2111
2112        // Verify the file compresses to a reasonable size.
2113        let ratio = data.len() as f64 / compressed.len() as f64;
2114        assert!(
2115            ratio > 5.0,
2116            "test-api.json should achieve >5x compression, got {ratio:.1}x"
2117        );
2118
2119        // Check header to see which path was chosen.
2120        let mut cursor = Cursor::new(&compressed);
2121        let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
2122
2123        // If embedded was chosen, verify the flag is set and header metadata is empty.
2124        if header.meta_embedded {
2125            assert!(
2126                header.transform_metadata.is_empty(),
2127                "meta_embedded header should have empty transform_metadata"
2128            );
2129            assert!(header.use_brotli, "meta_embedded should use brotli codec");
2130        }
2131    }
2132
2133    #[test]
2134    fn test_embedded_metadata_ndjson_roundtrip() {
2135        // NDJSON files with transforms must still roundtrip correctly
2136        // regardless of whether embedded or separate metadata is chosen.
2137        let data = std::fs::read(concat!(
2138            env!("CARGO_MANIFEST_DIR"),
2139            "/../../corpus/test-ndjson.ndjson"
2140        ))
2141        .unwrap();
2142
2143        let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2144        let decompressed = decompress_from_slice(&compressed).unwrap();
2145        assert_eq!(
2146            decompressed, data,
2147            "NDJSON embedded metadata roundtrip: byte-exact mismatch"
2148        );
2149    }
2150
2151    #[test]
2152    fn test_embedded_metadata_manual_roundtrip() {
2153        // Manually construct an embedded-metadata .dcx to verify the decompress path
2154        // handles the format correctly, independent of what the compressor chooses.
2155        let original = b"Hello, embedded metadata world! This is a test.";
2156        let crc = crc32fast::hash(original);
2157
2158        // Build embedded payload with an empty transform chain so reverse_preprocess
2159        // is a no-op and the data passes through unchanged.
2160        let empty_chain = TransformChain::new();
2161        let raw_metadata = empty_chain.serialize();
2162
2163        // Build embedded payload: [meta_len:u32 LE][raw_metadata][original_data]
2164        let mut embedded = Vec::new();
2165        embedded.extend_from_slice(&(raw_metadata.len() as u32).to_le_bytes());
2166        embedded.extend_from_slice(&raw_metadata);
2167        embedded.extend_from_slice(original);
2168
2169        let brotli_data = brotli_compress(&embedded, 11, BROTLI_MODE_GENERIC).unwrap();
2170
2171        let header = crate::dcx::DcxHeader {
2172            mode: Mode::Fast,
2173            format_hint: crate::dcx::FormatHint::Generic,
2174            original_size: original.len() as u64,
2175            compressed_size: brotli_data.len() as u64,
2176            crc32: crc,
2177            transform_metadata: vec![], // empty — metadata is embedded
2178            has_dict: false,
2179            meta_compressed: false,
2180            use_brotli: true,
2181            meta_embedded: true,
2182        };
2183
2184        let mut buf = Vec::new();
2185        header.write_to(&mut buf).unwrap();
2186        buf.extend_from_slice(&brotli_data);
2187
2188        // Verify flags.
2189        assert_ne!(buf[7] & crate::dcx::FLAG_META_EMBEDDED, 0);
2190        assert_ne!(buf[7] & crate::dcx::FLAG_BROTLI, 0);
2191
2192        // Decompress and verify.
2193        let decompressed = decompress_from_slice(&buf).unwrap();
2194        assert_eq!(decompressed, original.to_vec());
2195    }
2196
2197    // ─── Optimization: Brotli TEXT mode tests ───────────────────────────────
2198
2199    #[test]
2200    fn test_brotli_text_mode_on_raw() {
2201        // Verify TEXT mode produces valid brotli that decompresses correctly.
2202        let data = br#"{"name":"Alice","age":30,"city":"New York","active":true}"#;
2203
2204        // TEXT mode (for raw UTF-8/JSON).
2205        let compressed_text = brotli_compress(data, 11, BROTLI_MODE_TEXT).unwrap();
2206        let decompressed_text = brotli_decompress(&compressed_text).unwrap();
2207        assert_eq!(
2208            decompressed_text,
2209            data.to_vec(),
2210            "TEXT mode roundtrip failed"
2211        );
2212
2213        // GENERIC mode (for comparison).
2214        let compressed_generic = brotli_compress(data, 11, BROTLI_MODE_GENERIC).unwrap();
2215        let decompressed_generic = brotli_decompress(&compressed_generic).unwrap();
2216        assert_eq!(
2217            decompressed_generic,
2218            data.to_vec(),
2219            "GENERIC mode roundtrip failed"
2220        );
2221
2222        // Both must produce valid output — TEXT mode should not be larger than
2223        // GENERIC on UTF-8 text (or at most within a few bytes).
2224        // We don't assert TEXT < GENERIC because on tiny data the difference is negligible,
2225        // but we verify the feature works.
2226        assert!(
2227            !compressed_text.is_empty(),
2228            "TEXT mode should produce non-empty output"
2229        );
2230    }
2231
2232    // ─── Optimization: Zstd embedded metadata tests ─────────────────────────
2233
2234    #[test]
2235    fn test_zstd_embedded_metadata_roundtrip() {
2236        // Manually construct a .dcx with zstd-compressed embedded metadata
2237        // (meta_embedded=true, use_brotli=false) and verify roundtrip.
2238        let original = b"Hello, zstd embedded metadata! This is a test of the zstd path.";
2239        let crc = crc32fast::hash(original);
2240
2241        // Build embedded payload with an empty transform chain.
2242        let empty_chain = TransformChain::new();
2243        let raw_metadata = empty_chain.serialize();
2244
2245        // [meta_len:u32 LE][raw_metadata][original_data]
2246        let mut embedded = Vec::new();
2247        embedded.extend_from_slice(&(raw_metadata.len() as u32).to_le_bytes());
2248        embedded.extend_from_slice(&raw_metadata);
2249        embedded.extend_from_slice(original);
2250
2251        let zstd_data = zstd::bulk::compress(&embedded, 19).unwrap();
2252
2253        let header = crate::dcx::DcxHeader {
2254            mode: Mode::Fast,
2255            format_hint: crate::dcx::FormatHint::Generic,
2256            original_size: original.len() as u64,
2257            compressed_size: zstd_data.len() as u64,
2258            crc32: crc,
2259            transform_metadata: vec![], // empty — metadata is embedded
2260            has_dict: false,
2261            meta_compressed: false,
2262            use_brotli: false, // zstd, not brotli
2263            meta_embedded: true,
2264        };
2265
2266        let mut buf = Vec::new();
2267        header.write_to(&mut buf).unwrap();
2268        buf.extend_from_slice(&zstd_data);
2269
2270        // Verify flags: meta_embedded set, brotli NOT set.
2271        assert_ne!(buf[7] & crate::dcx::FLAG_META_EMBEDDED, 0);
2272        assert_eq!(buf[7] & crate::dcx::FLAG_BROTLI, 0);
2273
2274        // Decompress and verify byte-exact roundtrip.
2275        let decompressed = decompress_from_slice(&buf).unwrap();
2276        assert_eq!(decompressed, original.to_vec());
2277    }
2278
2279    // ─── Optimization: Multi-quality brotli tests ───────────────────────────
2280
2281    #[test]
2282    fn test_multi_quality_brotli() {
2283        // Verify both quality 10 and 11 produce valid brotli that decompresses.
2284        // On some data q10 beats q11 — we just verify both work correctly.
2285        let data = br#"{"items":[1,2,3,4,5],"nested":{"a":"hello","b":"world"}}"#;
2286
2287        let q10 = brotli_compress(data, 10, BROTLI_MODE_GENERIC).unwrap();
2288        let q11 = brotli_compress(data, 11, BROTLI_MODE_GENERIC).unwrap();
2289
2290        let dec_q10 = brotli_decompress(&q10).unwrap();
2291        let dec_q11 = brotli_decompress(&q11).unwrap();
2292
2293        assert_eq!(dec_q10, data.to_vec(), "quality 10 roundtrip failed");
2294        assert_eq!(dec_q11, data.to_vec(), "quality 11 roundtrip failed");
2295
2296        // Both should produce non-empty compressed output.
2297        assert!(!q10.is_empty());
2298        assert!(!q11.is_empty());
2299
2300        // The auto-fallback should pick the smaller one.
2301        // We can't assert which is smaller (data-dependent), but verify the logic
2302        // by checking that auto-fallback roundtrips on real corpus files.
2303        let corpus_files = [
2304            concat!(env!("CARGO_MANIFEST_DIR"), "/../../corpus/test-api.json"),
2305            concat!(
2306                env!("CARGO_MANIFEST_DIR"),
2307                "/../../corpus/json-bench/twitter.json"
2308            ),
2309        ];
2310        for path in corpus_files {
2311            let file_data = std::fs::read(path).unwrap();
2312            let compressed =
2313                compress_to_vec(&file_data, Mode::Fast, Some(FormatHint::Json)).unwrap();
2314            let decompressed = decompress_from_slice(&compressed).unwrap();
2315            assert_eq!(
2316                decompressed, file_data,
2317                "multi-quality roundtrip failed for {path}"
2318            );
2319        }
2320    }
2321
2322    // ─── Adversarial Regression Tests ────────────────────────────────────────
2323
2324    #[test]
2325    fn test_singleton_arrays_fast_roundtrip() {
2326        // Bug 1: NDJSON with singleton array values like [{"x":0}] caused CRC
2327        // mismatch in fast mode because typed encoding corrupted unquoted values.
2328        let rows: Vec<String> = (0..500)
2329            .map(|i| format!("{{\"items\":[{{\"x\":{}}}],\"id\":{}}}", i, i))
2330            .collect();
2331        let data = rows.join("\n") + "\n";
2332        let compressed =
2333            compress_to_vec(data.as_bytes(), Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2334        let decompressed = decompress_from_slice(&compressed).unwrap();
2335        assert_eq!(
2336            decompressed,
2337            data.as_bytes(),
2338            "singleton_arrays fast mode roundtrip failed"
2339        );
2340    }
2341
2342    #[test]
2343    fn test_very_long_lines_fast_roundtrip() {
2344        // Bug 2: NDJSON with 100KB string values caused CRC mismatch because
2345        // encode_string_column used u16 for per-value lengths (max 65535).
2346        let rows: Vec<String> = (0..50)
2347            .map(|i| format!("{{\"data\":\"{}\",\"id\":{}}}", "X".repeat(100_000), i))
2348            .collect();
2349        let data = rows.join("\n") + "\n";
2350        let compressed =
2351            compress_to_vec(data.as_bytes(), Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2352        let decompressed = decompress_from_slice(&compressed).unwrap();
2353        assert_eq!(
2354            decompressed,
2355            data.as_bytes(),
2356            "very_long_lines fast mode roundtrip failed"
2357        );
2358    }
2359
2360    #[test]
2361    fn test_very_long_lines_balanced_roundtrip() {
2362        // Bug 2 also affected balanced mode — the NDJSON columnar transform
2363        // itself is mode-independent, and long strings overflow u16 everywhere.
2364        let rows: Vec<String> = (0..10)
2365            .map(|i| format!("{{\"data\":\"{}\",\"id\":{}}}", "X".repeat(100_000), i))
2366            .collect();
2367        let data = rows.join("\n") + "\n";
2368        let compressed =
2369            compress_to_vec(data.as_bytes(), Mode::Balanced, Some(FormatHint::Ndjson)).unwrap();
2370        let decompressed = decompress_from_slice(&compressed).unwrap();
2371        assert_eq!(
2372            decompressed,
2373            data.as_bytes(),
2374            "very_long_lines balanced mode roundtrip failed"
2375        );
2376    }
2377
2378    #[test]
2379    fn test_all_same_value_fast_roundtrip() {
2380        // Bug 3: 10K identical rows of {"x":1} caused SIGBUS crash in fast mode.
2381        // After typed encoding, the delta-varint stream was full of 0x00 bytes.
2382        // generate_training_samples split on 0x00, creating thousands of tiny
2383        // fragments that crashed zstd dictionary training.
2384        let rows: Vec<String> = (0..10_000).map(|_| "{\"x\":1}".to_string()).collect();
2385        let data = rows.join("\n") + "\n";
2386        let compressed =
2387            compress_to_vec(data.as_bytes(), Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2388        let decompressed = decompress_from_slice(&compressed).unwrap();
2389        assert_eq!(
2390            decompressed,
2391            data.as_bytes(),
2392            "all_same_value fast mode roundtrip failed"
2393        );
2394    }
2395
2396    #[test]
2397    fn test_generate_training_samples_degenerate() {
2398        // Verify that generate_training_samples falls back to fixed-size chunks
2399        // when 0x00 splitting produces degenerate samples (avg < 8 bytes).
2400        let mut data = vec![0x02u8]; // one non-zero byte
2401        data.extend_from_slice(&[0x00; 9999]); // 9999 zero bytes
2402        let samples = generate_training_samples(&data, 1024);
2403        // Must fall back to fixed-size chunks, not degenerate 0x00-split.
2404        let avg_len = samples.iter().map(|s| s.len()).sum::<usize>() / samples.len();
2405        assert!(
2406            avg_len >= 8,
2407            "training samples average size should be >= 8, got {avg_len}"
2408        );
2409    }
2410
2411    #[test]
2412    fn null_heavy_codec_roundtrip_fast() {
2413        // Regression: null-heavy NDJSON (30+ rows with all-null columns) caused CRC mismatch.
2414        // Python json.dumps produces spaces after colons: {"id": 0, "val": null}
2415        let mut data = Vec::new();
2416        for i in 0..30 {
2417            data.extend_from_slice(format!("{{\"id\": {}, \"val\": null}}\n", i).as_bytes());
2418        }
2419        let mut compressed = Vec::new();
2420        compress(&data, Mode::Fast, None, &mut compressed).unwrap();
2421        let decompressed = decompress(&mut std::io::Cursor::new(&compressed)).unwrap();
2422        assert_eq!(
2423            decompressed, data,
2424            "null-heavy 30-row fast mode roundtrip failed"
2425        );
2426    }
2427
2428    #[test]
2429    fn null_heavy_codec_roundtrip_balanced() {
2430        let mut data = Vec::new();
2431        for i in 0..30 {
2432            data.extend_from_slice(format!("{{\"id\": {}, \"val\": null}}\n", i).as_bytes());
2433        }
2434        let mut compressed = Vec::new();
2435        compress(&data, Mode::Balanced, None, &mut compressed).unwrap();
2436        let decompressed = decompress(&mut std::io::Cursor::new(&compressed)).unwrap();
2437        assert_eq!(
2438            decompressed, data,
2439            "null-heavy 30-row balanced mode roundtrip failed"
2440        );
2441    }
2442}