1use std::io::{self, Cursor, Read, Write};
10
11use crate::dcx::{DcxHeader, FormatHint, Mode};
12use crate::entropy::arithmetic::{ArithmeticDecoder, ArithmeticEncoder};
13use crate::format::transform::TransformChain;
14use crate::format::{detect_format, preprocess, reverse_preprocess};
15use crate::mixer::MetaMixer;
16use crate::model::gru_model::GruModel;
17use crate::model::{CMConfig, CMEngine};
18
/// Pick a zstd level for Fast mode based on input size.
///
/// An explicit `level_override` always wins; otherwise smaller inputs get a
/// higher (slower, stronger) level and larger inputs a faster one.
fn adaptive_fast_level(data_size: usize, level_override: Option<i32>) -> i32 {
    level_override.unwrap_or_else(|| {
        if data_size <= 1_048_576 {
            19
        } else if data_size <= 10_485_760 {
            13
        } else {
            9
        }
    })
}
34
/// Minimum input size (bytes) before dictionary-based zstd compression is attempted.
const DICT_MIN_DATA_SIZE: usize = 8192;
40
/// Chunk size used when splitting input for dictionary training/compression.
/// Larger inputs use larger chunks so per-chunk overhead stays proportionally small.
fn dict_chunk_size(data_len: usize) -> usize {
    match data_len {
        n if n > 4_194_304 => 131_072,
        n if n > 1_048_576 => 65_536,
        n if n > 262_144 => 32_768,
        _ => 16_384,
    }
}
57
/// Maximum trained-dictionary size for a given input length.
fn dict_max_size(data_len: usize) -> usize {
    match data_len {
        n if n > 4_194_304 => 16_384,
        n if n > 1_048_576 => 8_192,
        _ => 4_096,
    }
}
70
71fn generate_training_samples(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
77 let col_chunks: Vec<&[u8]> = data.split(|&b| b == 0x00).collect();
79 if col_chunks.len() >= 5 {
80 let non_empty: Vec<&[u8]> = col_chunks.into_iter().filter(|c| !c.is_empty()).collect();
81 if !non_empty.is_empty() {
87 let avg_len = non_empty.iter().map(|c| c.len()).sum::<usize>() / non_empty.len();
88 if avg_len >= 8 {
89 return non_empty;
90 }
91 }
92 }
93
94 split_into_chunks(data, chunk_size)
96}
97
/// Split `data` into consecutive chunks of at most `chunk_size` bytes; the
/// final chunk may be shorter.
///
/// `chunk_size` must be non-zero (all callers pass at least 16 KiB from
/// `dict_chunk_size`). Uses the standard `slice::chunks` iterator instead of
/// a hand-rolled offset loop.
fn split_into_chunks(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
    data.chunks(chunk_size).collect()
}
110
111fn try_dict_compress(data: &[u8], level: i32, plain_size: usize) -> Option<Vec<u8>> {
120 let chunk_size = dict_chunk_size(data.len());
121
122 let training_samples = generate_training_samples(data, chunk_size);
124 if training_samples.len() < 5 {
125 return None;
126 }
127
128 let max_dict = dict_max_size(data.len());
129
130 let dict = zstd::dict::from_samples(&training_samples, max_dict).ok()?;
132 if dict.is_empty() {
133 return None;
134 }
135
136 let chunks = split_into_chunks(data, chunk_size);
138
139 let mut compressor = zstd::bulk::Compressor::with_dictionary(level, &dict).ok()?;
141 let mut compressed_chunks: Vec<Vec<u8>> = Vec::with_capacity(chunks.len());
142 for chunk in &chunks {
143 let cc = compressor.compress(chunk).ok()?;
144 compressed_chunks.push(cc);
145 }
146
147 let total_compressed: usize = compressed_chunks.iter().map(|c| 4 + c.len()).sum();
152 let payload_size = 4 + dict.len() + 4 + total_compressed;
153
154 if payload_size >= plain_size {
156 return None;
157 }
158
159 let mut payload = Vec::with_capacity(payload_size);
160 payload.extend_from_slice(&(dict.len() as u32).to_le_bytes());
161 payload.extend_from_slice(&dict);
162 payload.extend_from_slice(&(compressed_chunks.len() as u32).to_le_bytes());
163 for cc in &compressed_chunks {
164 payload.extend_from_slice(&(cc.len() as u32).to_le_bytes());
165 payload.extend_from_slice(cc);
166 }
167
168 Some(payload)
169}
170
/// Decompress a dictionary-based payload produced by `try_dict_compress`.
///
/// Expected layout:
/// `dict_len: u32le | dict | num_chunks: u32le | [chunk_len: u32le | chunk]...`
///
/// `capacity` bounds the total decompressed output; each chunk is limited to
/// the capacity remaining after previous chunks. Returns `InvalidData` on any
/// truncation or malformed/failing chunk.
fn decompress_with_dict(payload: &[u8], capacity: usize) -> std::io::Result<Vec<u8>> {
    if payload.len() < 4 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "dict payload too short for dict_size",
        ));
    }
    let mut pos = 0;

    // Dictionary: 4-byte length prefix, then the dictionary bytes.
    let dict_size =
        u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
    pos += 4;
    if payload.len() < pos + dict_size {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "dict payload truncated: dictionary bytes",
        ));
    }
    let dict_bytes = &payload[pos..pos + dict_size];
    pos += dict_size;

    if payload.len() < pos + 4 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "dict payload truncated: num_chunks",
        ));
    }
    let num_chunks =
        u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
    pos += 4;

    let mut decompressor = zstd::bulk::Decompressor::with_dictionary(dict_bytes)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

    let mut output = Vec::with_capacity(capacity);

    // Walk the length-prefixed chunks, validating bounds before every read.
    for i in 0..num_chunks {
        if payload.len() < pos + 4 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("dict payload truncated: chunk {i} size"),
            ));
        }
        let chunk_size =
            u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
        pos += 4;
        if payload.len() < pos + chunk_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("dict payload truncated: chunk {i} data"),
            ));
        }
        let chunk_data = &payload[pos..pos + chunk_size];
        pos += chunk_size;

        // Cap each chunk by the output budget still available.
        let chunk_capacity = capacity.saturating_sub(output.len());
        let decompressed = decompressor
            .decompress(chunk_data, chunk_capacity)
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("chunk {i} decompress failed: {e}"),
                )
            })?;
        output.extend_from_slice(&decompressed);
    }

    Ok(output)
}
252
/// Encoder mode hint passed to `brotli_compress`: generic input.
const BROTLI_MODE_GENERIC: u32 = 0;
/// Encoder mode hint passed to `brotli_compress`: text input.
const BROTLI_MODE_TEXT: u32 = 1;
260
261fn brotli_compress(data: &[u8], quality: u32, mode: u32) -> io::Result<Vec<u8>> {
264 use brotli::enc::backward_references::BrotliEncoderMode;
265 let mut output = Vec::new();
266 let brotli_mode = match mode {
267 1 => BrotliEncoderMode::BROTLI_MODE_TEXT,
268 _ => BrotliEncoderMode::BROTLI_MODE_GENERIC,
269 };
270 let params = brotli::enc::BrotliEncoderParams {
271 quality: quality as i32,
272 mode: brotli_mode,
273 ..Default::default()
274 };
275 brotli::BrotliCompress(&mut io::Cursor::new(data), &mut output, ¶ms)?;
276 Ok(output)
277}
278
279fn brotli_decompress(data: &[u8]) -> io::Result<Vec<u8>> {
281 let mut output = Vec::new();
282 brotli::BrotliDecompress(&mut io::Cursor::new(data), &mut output)?;
283 Ok(output)
284}
285
286fn cm_compress(data: &[u8], config: CMConfig) -> Vec<u8> {
289 let mut engine = CMEngine::with_config(config);
290 let mut encoder = ArithmeticEncoder::new();
291
292 for &byte in data {
293 for bpos in 0..8 {
294 let bit = (byte >> (7 - bpos)) & 1;
295 let p = engine.predict();
296 encoder.encode(bit, p);
297 engine.update(bit);
298 }
299 }
300
301 encoder.finish()
302}
303
304fn cm_decompress(compressed: &[u8], original_size: usize, config: CMConfig) -> Vec<u8> {
307 let mut engine = CMEngine::with_config(config);
308 let mut decoder = ArithmeticDecoder::new(compressed);
309 let mut output = Vec::with_capacity(original_size);
310
311 for _ in 0..original_size {
312 let mut byte_val: u8 = 0;
313 for bpos in 0..8 {
314 let p = engine.predict();
315 let bit = decoder.decode(p);
316 engine.update(bit);
317 byte_val |= bit << (7 - bpos);
318 }
319 output.push(byte_val);
320 }
321
322 output
323}
324
/// Balanced-mode compression: a GRU byte model's bit predictions are blended
/// with the CM engine's via a meta-mixer, and each bit (MSB first) is
/// arithmetic-coded. `gru_decompress` must mirror this loop exactly — the
/// predict/encode/update ordering is part of the stream format.
fn gru_compress(data: &[u8], config: CMConfig) -> Vec<u8> {
    let mut engine = CMEngine::with_config(config);
    let mut gru = GruModel::new();
    let mut meta_mixer = MetaMixer::new(12); let mut encoder = ArithmeticEncoder::new();

    // Progress is reported in 5% steps, only for inputs larger than 100 KB.
    let total_bytes = data.len();
    let report_interval = if total_bytes > 100_000 {
        total_bytes / 20
    } else {
        0
    };

    for (byte_idx, &byte) in data.iter().enumerate() {
        for bpos in 0..8u8 {
            let bit = (byte >> (7 - bpos)) & 1;

            let p_cm = engine.predict();

            // Partial bit context: a leading 1 sentinel followed by the bits
            // of this byte already coded (MSB first). The bpos == 0 branch is
            // the same value the loop would produce with no prior bits.
            let partial = if bpos == 0 {
                1u32
            } else {
                let mut p = 1u32;
                for prev_bpos in 0..bpos {
                    let prev_bit = (byte >> (7 - prev_bpos)) & 1;
                    p = (p << 1) | prev_bit as u32;
                }
                p
            };
            let p_gru = gru.predict_bit(bpos, partial);

            let p_final = meta_mixer.blend(p_cm, p_gru);

            encoder.encode(bit, p_final);
            engine.update(bit);
            meta_mixer.update(bit);
        }

        // Train then advance the GRU on the completed byte — same ordering as
        // the decoder.
        gru.train(byte);
        gru.forward(byte);

        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
            let pct = (byte_idx + 1) * 100 / total_bytes;
            eprint!("\r[gru] compressing... {pct}%");
        }
    }

    if total_bytes > 100_000 {
        eprintln!("\r[gru] compressing... 100%");
    }

    encoder.finish()
}
389
/// Inverse of `gru_compress`: reconstructs `original_size` bytes, blending CM
/// and GRU predictions through the meta-mixer in exactly the same order as the
/// encoder so both sides see identical model state.
fn gru_decompress(compressed: &[u8], original_size: usize, config: CMConfig) -> Vec<u8> {
    let mut engine = CMEngine::with_config(config);
    let mut gru = GruModel::new();
    let mut meta_mixer = MetaMixer::new(12); let mut decoder = ArithmeticDecoder::new(compressed);
    let mut output = Vec::with_capacity(original_size);

    // Progress is reported in 5% steps, only for outputs larger than 100 KB.
    let report_interval = if original_size > 100_000 {
        original_size / 20
    } else {
        0
    };

    for byte_idx in 0..original_size {
        let mut byte_val: u8 = 0;

        for bpos in 0..8u8 {
            let p_cm = engine.predict();

            // Partial bit context mirrors the encoder, built from the bits of
            // the byte decoded so far (leading 1 sentinel, MSB first).
            let partial = if bpos == 0 {
                1u32
            } else {
                let mut p = 1u32;
                for prev_bpos in 0..bpos {
                    let prev_bit = (byte_val >> (7 - prev_bpos)) & 1;
                    p = (p << 1) | prev_bit as u32;
                }
                p
            };
            let p_gru = gru.predict_bit(bpos, partial);

            let p_final = meta_mixer.blend(p_cm, p_gru);

            let bit = decoder.decode(p_final);
            engine.update(bit);
            meta_mixer.update(bit);
            byte_val |= bit << (7 - bpos);
        }

        output.push(byte_val);

        // Train then advance the GRU on the completed byte — same ordering as
        // the encoder.
        gru.train(byte_val);
        gru.forward(byte_val);

        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
            let pct = (byte_idx + 1) * 100 / original_size;
            eprint!("\r[gru] decompressing... {pct}%");
        }
    }

    if original_size > 100_000 {
        eprintln!("\r[gru] decompressing... 100%");
    }

    output
}
452
/// Max-mode compression (neural builds): blends the CM engine's bit
/// predictions with an LLM predictor via `meta_mixer`, arithmetic-coding each
/// bit MSB first. `neural_decompress` must mirror this loop exactly.
#[cfg(feature = "neural")]
fn neural_compress(
    data: &[u8],
    config: CMConfig,
    llm: &mut datacortex_neural::LlmPredictor,
    meta_mixer: &mut datacortex_neural::MetaMixer,
) -> Vec<u8> {
    let mut engine = CMEngine::with_config(config);
    let mut encoder = ArithmeticEncoder::new();

    let total_bytes = data.len();
    let mut bytes_processed = 0;
    let report_interval = total_bytes / 20; for (byte_idx, &byte) in data.iter().enumerate() {
        for bpos in 0..8u8 {
            let bit = (byte >> (7 - bpos)) & 1;

            let p_cm = engine.predict();

            // Partial bit context: a leading 1 sentinel followed by the bits
            // of this byte already coded (MSB first).
            let partial = if bpos == 0 {
                1u32
            } else {
                let mut p = 1u32;
                for prev_bpos in 0..bpos {
                    let prev_bit = (byte >> (7 - prev_bpos)) & 1;
                    p = (p << 1) | prev_bit as u32;
                }
                p
            };
            let p_llm = llm.predict_bit(bpos, partial);

            let p_final = meta_mixer.blend(p_cm, p_llm);

            encoder.encode(bit, p_final);
            engine.update(bit);
            meta_mixer.update(bit);
        }

        // Advance the LLM context with the byte just coded; errors are only
        // logged for the first few bytes to avoid log spam.
        if let Err(e) = llm.predict_byte_probs(byte) {
            if byte_idx < 5 {
                eprintln!("[neural] LLM predict error at byte {byte_idx}: {e}");
            }
        }

        bytes_processed += 1;
        if report_interval > 0 && bytes_processed % report_interval == 0 {
            let pct = bytes_processed * 100 / total_bytes;
            eprint!("\r[neural] compressing... {pct}%");
        }
    }

    if total_bytes > 1000 {
        eprintln!("\r[neural] compressing... 100%");
    }

    encoder.finish()
}
533
/// Inverse of `neural_compress`: reconstructs `original_size` bytes, blending
/// CM and LLM predictions through `meta_mixer` in exactly the same order as
/// the encoder so both sides see identical model state.
#[cfg(feature = "neural")]
fn neural_decompress(
    compressed: &[u8],
    original_size: usize,
    config: CMConfig,
    llm: &mut datacortex_neural::LlmPredictor,
    meta_mixer: &mut datacortex_neural::MetaMixer,
) -> Vec<u8> {
    let mut engine = CMEngine::with_config(config);
    let mut decoder = ArithmeticDecoder::new(compressed);
    let mut output = Vec::with_capacity(original_size);

    let report_interval = if original_size > 0 {
        original_size / 20
    } else {
        1
    };

    for byte_idx in 0..original_size {
        let mut byte_val: u8 = 0;

        for bpos in 0..8u8 {
            let p_cm = engine.predict();

            // Partial bit context mirrors the encoder, built from the bits of
            // the byte decoded so far (leading 1 sentinel, MSB first).
            let partial = if bpos == 0 {
                1u32
            } else {
                let mut p = 1u32;
                for prev_bpos in 0..bpos {
                    let prev_bit = (byte_val >> (7 - prev_bpos)) & 1;
                    p = (p << 1) | prev_bit as u32;
                }
                p
            };
            let p_llm = llm.predict_bit(bpos, partial);

            let p_final = meta_mixer.blend(p_cm, p_llm);

            let bit = decoder.decode(p_final);
            engine.update(bit);
            meta_mixer.update(bit);
            byte_val |= bit << (7 - bpos);
        }

        output.push(byte_val);

        // Advance the LLM context with the byte just decoded; errors are only
        // logged for the first few bytes to avoid log spam.
        if let Err(e) = llm.predict_byte_probs(byte_val) {
            if byte_idx < 5 {
                eprintln!("[neural] LLM predict error at byte {byte_idx}: {e}");
            }
        }

        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
            let pct = (byte_idx + 1) * 100 / original_size;
            eprint!("\r[neural] decompressing... {pct}%");
        }
    }

    if original_size > 1000 {
        eprintln!("\r[neural] decompressing... 100%");
    }

    output
}
605
606fn cm_config_for_mode(mode: Mode) -> CMConfig {
608 match mode {
609 Mode::Max => CMConfig::max(),
610 Mode::Balanced => CMConfig::balanced(),
611 Mode::Fast => CMConfig::balanced(), }
613}
614
/// Resolve the LLM model path used by neural mode.
///
/// Precedence: explicit argument, then the `DATACORTEX_MODEL` environment
/// variable, then a default under `$HOME/.datacortex/models`. A specified
/// path that is missing on disk produces a stderr diagnostic and `None`;
/// an empty `DATACORTEX_MODEL` silently disables resolution.
#[cfg(feature = "neural")]
fn resolve_model_path(explicit: Option<&str>) -> Option<String> {
    if let Some(p) = explicit {
        return if std::path::Path::new(p).exists() {
            Some(p.to_string())
        } else {
            eprintln!("[neural] explicit model path not found: {p}");
            None
        };
    }

    match std::env::var("DATACORTEX_MODEL") {
        Ok(p) if p.is_empty() => return None,
        Ok(p) => {
            return if std::path::Path::new(&p).exists() {
                Some(p)
            } else {
                eprintln!("[neural] DATACORTEX_MODEL path not found: {p}");
                None
            };
        }
        Err(_) => {}
    }

    std::env::var_os("HOME").and_then(|home| {
        let candidate = format!(
            "{}/.datacortex/models/SmolLM2-135M-Instruct-Q8_0.gguf",
            home.to_string_lossy()
        );
        std::path::Path::new(&candidate)
            .exists()
            .then_some(candidate)
    })
}
654
/// Compress `data` into a DCX stream written to `output`.
///
/// Convenience wrapper over [`compress_with_model`] with no explicit model path.
pub fn compress<W: Write>(
    data: &[u8],
    mode: Mode,
    format_override: Option<FormatHint>,
    output: &mut W,
) -> io::Result<()> {
    compress_with_model(data, mode, format_override, None, output)
}
664
/// Compress `data` into a DCX stream, with an optional model path for neural
/// (Max) mode. Delegates to [`compress_with_options`] with no zstd level
/// override.
pub fn compress_with_model<W: Write>(
    data: &[u8],
    mode: Mode,
    format_override: Option<FormatHint>,
    model_path: Option<&str>,
    output: &mut W,
) -> io::Result<()> {
    compress_with_options(data, mode, format_override, model_path, None, output)
}
675
/// Core compression entry point: preprocess, compress per `mode`, and write a
/// DCX header followed by the compressed payload to `output`.
///
/// Fast mode races several candidate encodings in parallel (zstd on
/// preprocessed data with optional dictionary, zstd/brotli on raw data,
/// brotli on preprocessed data, and zstd/brotli on a payload with the
/// transform metadata embedded) and keeps the smallest estimated total.
/// Balanced uses the GRU-blended CM coder; Max uses the LLM-blended coder
/// when the `neural` feature and a model are available, else CM-only.
pub fn compress_with_options<W: Write>(
    data: &[u8],
    mode: Mode,
    format_override: Option<FormatHint>,
    model_path: Option<&str>,
    zstd_level_override: Option<i32>,
    output: &mut W,
) -> io::Result<()> {
    let format_hint = format_override.unwrap_or_else(|| detect_format(data));
    // CRC of the ORIGINAL data; verified after reversal on decompress.
    let crc = crc32fast::hash(data);

    let (preprocessed, chain) = preprocess(data, format_hint, mode);
    let transform_metadata = if chain.is_empty() {
        vec![]
    } else {
        chain.serialize()
    };

    // Flags describing which candidate won; recorded in the header.
    let mut use_dict = false;
    let mut use_brotli = false;
    let mut use_raw_fallback = false;
    let mut use_meta_embedded = false;
    let compressed = match mode {
        Mode::Fast => {
            use std::sync::Mutex;

            let level = adaptive_fast_level(preprocessed.len(), zstd_level_override);
            let raw_level = adaptive_fast_level(data.len(), zstd_level_override);

            // Estimate how much the metadata will add to the header so the
            // size comparison between candidates is fair.
            let meta_size_for_comparison = if transform_metadata.len() > 64 {
                let compressed_meta = zstd::bulk::compress(&transform_metadata, 19)
                    .unwrap_or_else(|_| transform_metadata.clone());
                compressed_meta.len().min(transform_metadata.len())
            } else {
                transform_metadata.len()
            };

            // Candidate stream with metadata embedded in the payload itself
            // (meta_len: u32le | metadata | preprocessed).
            let embedded_payload = if !transform_metadata.is_empty() {
                let mut ep = Vec::with_capacity(4 + transform_metadata.len() + preprocessed.len());
                ep.extend_from_slice(&(transform_metadata.len() as u32).to_le_bytes());
                ep.extend_from_slice(&transform_metadata);
                ep.extend_from_slice(&preprocessed);
                Some(ep)
            } else {
                None
            };

            // (payload, estimated total size, is_dict, raw_fallback, brotli, meta_embedded)
            type PathResult = (Vec<u8>, usize, bool, bool, bool, bool);
            let results = Mutex::new(Vec::<PathResult>::with_capacity(8));

            rayon::scope(|s| {
                // Path 1: zstd on preprocessed data, upgraded to a trained
                // dictionary payload when that beats plain zstd.
                s.spawn(|_| {
                    if let Ok(plain) = zstd::bulk::compress(&preprocessed, level) {
                        let (compressed, is_dict) = if preprocessed.len() >= DICT_MIN_DATA_SIZE {
                            if let Some(dict_payload) =
                                try_dict_compress(&preprocessed, level, plain.len())
                            {
                                (dict_payload, true)
                            } else {
                                (plain, false)
                            }
                        } else {
                            (plain, false)
                        };
                        let total = 32 + meta_size_for_comparison + compressed.len();
                        results
                            .lock()
                            .unwrap()
                            .push((compressed, total, is_dict, false, false, false));
                    }
                });

                // Path 2: zstd on the raw (unpreprocessed) data.
                s.spawn(|_| {
                    if let Ok(compressed) = zstd::bulk::compress(data, raw_level) {
                        let total = 32 + compressed.len();
                        results
                            .lock()
                            .unwrap()
                            .push((compressed, total, false, true, false, false));
                    }
                });

                // Path 3: brotli (text mode) on the raw data.
                s.spawn(|_| {
                    let q = if data.len() <= 1_048_576 { 11 } else { 9 };
                    if let Ok(compressed) = brotli_compress(data, q, BROTLI_MODE_TEXT) {
                        let total = 32 + compressed.len();
                        results
                            .lock()
                            .unwrap()
                            .push((compressed, total, false, true, true, false));
                    }
                });

                // Path 4: brotli (generic) on preprocessed data, trying the
                // top two qualities for small inputs.
                s.spawn(|_| {
                    let max_q = if preprocessed.len() <= 1_048_576 {
                        11
                    } else {
                        9
                    };
                    let qualities: &[u32] = if max_q == 11 {
                        &[11, 10]
                    } else {
                        &[max_q as u32]
                    };
                    let mut best: Option<PathResult> = None;
                    for &q in qualities {
                        if let Ok(compressed) =
                            brotli_compress(&preprocessed, q, BROTLI_MODE_GENERIC)
                        {
                            let total = 32 + meta_size_for_comparison + compressed.len();
                            if best.as_ref().is_none_or(|b| total < b.1) {
                                best = Some((compressed, total, false, false, true, false));
                            }
                        }
                    }
                    if let Some(r) = best {
                        results.lock().unwrap().push(r);
                    }
                });

                // Path 5: brotli on the metadata-embedded payload.
                if let Some(ref ep) = embedded_payload {
                    s.spawn(|_| {
                        let max_q = if ep.len() <= 1_048_576 { 11 } else { 9 };
                        let qualities: &[u32] = if max_q == 11 {
                            &[11, 10]
                        } else {
                            &[max_q as u32]
                        };
                        let mut best: Option<PathResult> = None;
                        for &q in qualities {
                            if let Ok(compressed) = brotli_compress(ep, q, BROTLI_MODE_GENERIC) {
                                let total = 32 + compressed.len();
                                if best.as_ref().is_none_or(|b| total < b.1) {
                                    best = Some((compressed, total, false, false, true, true));
                                }
                            }
                        }
                        if let Some(r) = best {
                            results.lock().unwrap().push(r);
                        }
                    });
                }

                // Path 6: zstd on the metadata-embedded payload.
                if let Some(ref ep) = embedded_payload {
                    s.spawn(|_| {
                        let embed_level = adaptive_fast_level(ep.len(), zstd_level_override);
                        if let Ok(compressed) = zstd::bulk::compress(ep, embed_level) {
                            let total = 32 + compressed.len();
                            results
                                .lock()
                                .unwrap()
                                .push((compressed, total, false, false, false, true));
                        }
                    });
                }
            });

            // Keep the candidate with the smallest estimated total size.
            let results = results.into_inner().unwrap();
            let best = results
                .into_iter()
                .min_by_key(|r| r.1)
                .ok_or_else(|| io::Error::other("all compression paths failed"))?;

            use_dict = best.2;
            use_raw_fallback = best.3;
            use_brotli = best.4;
            use_meta_embedded = best.5;
            best.0
        }
        Mode::Balanced => {
            // Payload: preprocessed size (u64le) followed by the coded bits.
            let config = cm_config_for_mode(mode);
            let cm_data = gru_compress(&preprocessed, config);
            let mut payload = Vec::with_capacity(8 + cm_data.len());
            payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
            payload.extend_from_slice(&cm_data);
            payload
        }
        Mode::Max => {
            let config = cm_config_for_mode(mode);

            #[cfg(feature = "neural")]
            {
                if let Some(mpath) = resolve_model_path(model_path) {
                    match datacortex_neural::LlmPredictor::new(&mpath) {
                        Ok(mut llm) => {
                            let mut meta_mixer = datacortex_neural::MetaMixer::new(5);
                            eprintln!(
                                "[neural] Max mode: dual-path CM+LLM ({} bytes mapped)",
                                llm.mapped_bytes()
                            );
                            let cm_data =
                                neural_compress(&preprocessed, config, &mut llm, &mut meta_mixer);
                            let mut payload = Vec::with_capacity(8 + cm_data.len());
                            // Bit 63 of the size field marks a neural stream.
                            let size_with_flag = preprocessed.len() as u64 | (1u64 << 63);
                            payload.extend_from_slice(&size_with_flag.to_le_bytes());
                            payload.extend_from_slice(&cm_data);
                            payload
                        }
                        Err(e) => {
                            eprintln!("[neural] LLM init failed, falling back to CM-only: {e}");
                            let cm_data = cm_compress(&preprocessed, config);
                            let mut payload = Vec::with_capacity(8 + cm_data.len());
                            payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
                            payload.extend_from_slice(&cm_data);
                            payload
                        }
                    }
                } else {
                    eprintln!(
                        "[neural] no model found, Max mode using CM-only. \
                         Set DATACORTEX_MODEL or use --model-path."
                    );
                    let cm_data = cm_compress(&preprocessed, config);
                    let mut payload = Vec::with_capacity(8 + cm_data.len());
                    payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
                    payload.extend_from_slice(&cm_data);
                    payload
                }
            }

            #[cfg(not(feature = "neural"))]
            {
                let _ = model_path; let cm_data = cm_compress(&preprocessed, config);
                let mut payload = Vec::with_capacity(8 + cm_data.len());
                payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
                payload.extend_from_slice(&cm_data);
                payload
            }
        }
    };

    // Raw-fallback payloads need no reversal metadata; embedded payloads
    // carry it inside the stream instead of the header.
    let final_metadata = if use_raw_fallback || use_meta_embedded {
        vec![]
    } else {
        transform_metadata
    };

    // Compress header metadata when that actually saves space.
    let (header_metadata, meta_compressed) = if final_metadata.len() > 64 {
        let compressed_meta =
            zstd::bulk::compress(&final_metadata, 19).unwrap_or_else(|_| final_metadata.clone());
        if compressed_meta.len() < final_metadata.len() {
            (compressed_meta, true)
        } else {
            (final_metadata, false)
        }
    } else {
        (final_metadata, false)
    };

    let header = DcxHeader {
        mode,
        format_hint,
        original_size: data.len() as u64,
        compressed_size: compressed.len() as u64,
        crc32: crc,
        transform_metadata: header_metadata,
        has_dict: use_dict,
        meta_compressed,
        use_brotli,
        meta_embedded: use_meta_embedded,
    };

    header.write_to(output)?;
    output.write_all(&compressed)?;

    Ok(())
}
987
/// Decompress a DCX stream read from `input`.
///
/// Convenience wrapper over [`decompress_with_model`] with no explicit model path.
pub fn decompress<R: Read>(input: &mut R) -> io::Result<Vec<u8>> {
    decompress_with_model(input, None)
}
992
/// Decompress a DCX stream, optionally supplying an explicit LLM model path
/// for files compressed in neural mode.
///
/// Reads the header, decodes the payload according to the stored mode and
/// flags, unwraps embedded or header-carried transform metadata, reverses any
/// preprocessing transforms, and finally verifies CRC-32 and original size.
pub fn decompress_with_model<R: Read>(
    input: &mut R,
    model_path: Option<&str>,
) -> io::Result<Vec<u8>> {
    let header = DcxHeader::read_from(input)?;

    let mut compressed = vec![0u8; header.compressed_size as usize];
    input.read_exact(&mut compressed)?;

    let preprocessed = match header.mode {
        Mode::Fast => {
            if header.use_brotli {
                brotli_decompress(&compressed)?
            } else {
                // Generous bound: preprocessing may have expanded the data.
                let capacity = header.original_size as usize * 2 + 65536;
                if header.has_dict {
                    decompress_with_dict(&compressed, capacity)?
                } else {
                    zstd::bulk::decompress(&compressed, capacity)
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
                }
            }
        }
        Mode::Balanced => {
            if compressed.len() < 8 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "CM mode compressed data too short",
                ));
            }
            // First 8 bytes: preprocessed size. Bit 63 is masked off for
            // parity with Max mode's neural flag (Balanced never sets it).
            let size_raw = u64::from_le_bytes(compressed[..8].try_into().expect("8-byte slice"));
            let preprocessed_size = (size_raw & !(1u64 << 63)) as usize;
            let config = cm_config_for_mode(header.mode);
            gru_decompress(&compressed[8..], preprocessed_size, config)
        }
        Mode::Max => {
            if compressed.len() < 8 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "CM mode compressed data too short",
                ));
            }
            let size_raw = u64::from_le_bytes(compressed[..8].try_into().expect("8-byte slice"));

            // Bit 63 of the size field marks a stream coded with the LLM path.
            let neural_flag = size_raw & (1u64 << 63) != 0;
            let preprocessed_size = (size_raw & !(1u64 << 63)) as usize;
            let config = cm_config_for_mode(header.mode);

            if neural_flag {
                #[cfg(feature = "neural")]
                {
                    if let Some(mpath) = resolve_model_path(model_path) {
                        match datacortex_neural::LlmPredictor::new(&mpath) {
                            Ok(mut llm) => {
                                let mut meta_mixer = datacortex_neural::MetaMixer::new(5);
                                eprintln!(
                                    "[neural] decompressing with dual-path CM+LLM ({} bytes mapped)",
                                    llm.mapped_bytes()
                                );
                                neural_decompress(
                                    &compressed[8..],
                                    preprocessed_size,
                                    config,
                                    &mut llm,
                                    &mut meta_mixer,
                                )
                            }
                            Err(e) => {
                                return Err(io::Error::new(
                                    io::ErrorKind::Other,
                                    format!(
                                        "file was compressed with neural mode but LLM failed to load: {e}"
                                    ),
                                ));
                            }
                        }
                    } else {
                        return Err(io::Error::new(
                            io::ErrorKind::Other,
                            "file was compressed with neural mode but no model found. \
                             Set DATACORTEX_MODEL or use --model-path.",
                        ));
                    }
                }

                #[cfg(not(feature = "neural"))]
                {
                    let _ = model_path;
                    return Err(io::Error::other(
                        "file was compressed with neural mode but this build lacks the \
                         `neural` feature. Rebuild with --features neural.",
                    ));
                }
            } else {
                cm_decompress(&compressed[8..], preprocessed_size, config)
            }
        }
    };

    // Recover the transform metadata: either embedded in the decompressed
    // stream (meta_len: u32le | metadata | data) or carried in the header,
    // possibly zstd-compressed.
    let (preprocessed, transform_metadata) = if header.meta_embedded {
        if preprocessed.len() < 4 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "embedded metadata: decompressed stream too short for meta_len",
            ));
        }
        let meta_len =
            u32::from_le_bytes(preprocessed[0..4].try_into().expect("4-byte slice")) as usize;
        if preprocessed.len() < 4 + meta_len {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!(
                    "embedded metadata: stream too short for metadata ({} bytes needed, {} available)",
                    4 + meta_len,
                    preprocessed.len()
                ),
            ));
        }
        let metadata = preprocessed[4..4 + meta_len].to_vec();
        let actual_preprocessed = preprocessed[4 + meta_len..].to_vec();
        (actual_preprocessed, metadata)
    } else {
        let tm = if header.meta_compressed && !header.transform_metadata.is_empty() {
            let mut decoder =
                zstd::Decoder::new(Cursor::new(&header.transform_metadata)).map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::InvalidData,
                        format!("failed to init metadata decompressor: {e}"),
                    )
                })?;
            let mut decompressed_meta = Vec::new();
            decoder.read_to_end(&mut decompressed_meta).map_err(|e| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("failed to decompress transform metadata: {e}"),
                )
            })?;
            decompressed_meta
        } else {
            header.transform_metadata.clone()
        };
        (preprocessed, tm)
    };

    // Undo any preprocessing transforms recorded at compression time.
    let data = if transform_metadata.is_empty() {
        preprocessed
    } else {
        let chain = TransformChain::deserialize(&transform_metadata)?;
        reverse_preprocess(&preprocessed, &chain)
    };

    // Integrity checks against the header's recorded CRC and size.
    let crc = crc32fast::hash(&data);
    if crc != header.crc32 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!(
                "CRC-32 mismatch: expected {:#010X}, got {:#010X}",
                header.crc32, crc
            ),
        ));
    }

    if data.len() as u64 != header.original_size {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!(
                "size mismatch: header says {} bytes, got {}",
                header.original_size,
                data.len()
            ),
        ));
    }

    Ok(data)
}
1181
/// Compress `data` and return the complete DCX stream as a `Vec<u8>`.
pub fn compress_to_vec(
    data: &[u8],
    mode: Mode,
    format_override: Option<FormatHint>,
) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    compress(data, mode, format_override, &mut buf)?;
    Ok(buf)
}
1192
/// Like [`compress_to_vec`], with an optional model path for neural (Max) mode.
pub fn compress_to_vec_with_model(
    data: &[u8],
    mode: Mode,
    format_override: Option<FormatHint>,
    model_path: Option<&str>,
) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    compress_with_model(data, mode, format_override, model_path, &mut buf)?;
    Ok(buf)
}
1204
/// Like [`compress_to_vec`], exposing both the model path and an explicit
/// zstd level override for Fast mode.
pub fn compress_to_vec_with_options(
    data: &[u8],
    mode: Mode,
    format_override: Option<FormatHint>,
    model_path: Option<&str>,
    zstd_level_override: Option<i32>,
) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    compress_with_options(
        data,
        mode,
        format_override,
        model_path,
        zstd_level_override,
        &mut buf,
    )?;
    Ok(buf)
}
1224
/// Decompress a complete in-memory DCX stream.
pub fn decompress_from_slice(dcx_data: &[u8]) -> io::Result<Vec<u8>> {
    let mut cursor = Cursor::new(dcx_data);
    decompress(&mut cursor)
}
1230
/// Read and parse a DCX header from `input` without decompressing the payload.
pub fn read_header<R: Read>(input: &mut R) -> io::Result<DcxHeader> {
    DcxHeader::read_from(input)
}
1235
/// Compress `data` with plain zstd at `level` — no DCX header, no transforms.
pub fn raw_zstd_compress(data: &[u8], level: i32) -> io::Result<Vec<u8>> {
    zstd::bulk::compress(data, level).map_err(io::Error::other)
}
1240
1241#[cfg(test)]
1242mod tests {
1243 use super::*;
1244
1245 #[test]
1246 fn fast_mode_roundtrip() {
1247 let original = b"Hello, DataCortex! This is a test of Fast mode compression.";
1248 let compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1249 let decompressed = decompress_from_slice(&compressed).unwrap();
1250 assert_eq!(decompressed, original);
1251 }
1252
1253 #[test]
1254 fn fast_mode_json_roundtrip() {
1255 let data = br#"{"name":"Alice","age":30,"name":"Bob","age":25,"name":"Carol","age":35}"#;
1256 let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1257 let decompressed = decompress_from_slice(&compressed).unwrap();
1258 assert_eq!(decompressed, data.to_vec());
1259 }
1260
1261 #[test]
1262 fn balanced_mode_roundtrip() {
1263 let original = b"Balanced mode test data with some content.";
1264 let compressed = compress_to_vec(original, Mode::Balanced, None).unwrap();
1265 let decompressed = decompress_from_slice(&compressed).unwrap();
1266 assert_eq!(decompressed, original);
1267 }
1268
1269 #[test]
1270 fn balanced_mode_longer_text() {
1271 let original = b"The quick brown fox jumps over the lazy dog. This sentence contains every letter of the English alphabet at least once. We need enough data to properly exercise the arithmetic coder and order-0 model.";
1272 let compressed = compress_to_vec(original, Mode::Balanced, None).unwrap();
1273 let decompressed = decompress_from_slice(&compressed).unwrap();
1274 assert_eq!(decompressed, original);
1275 }
1276
1277 #[test]
1278 fn balanced_mode_repetitive_data() {
1279 let data = "hello world! ".repeat(100);
1280 let compressed = compress_to_vec(data.as_bytes(), Mode::Balanced, None).unwrap();
1281 let decompressed = decompress_from_slice(&compressed).unwrap();
1282 assert_eq!(decompressed, data.as_bytes());
1283 }
1284
1285 #[test]
1286 fn balanced_mode_all_byte_values() {
1287 let original: Vec<u8> = (0..=255).collect();
1288 let compressed = compress_to_vec(&original, Mode::Balanced, None).unwrap();
1289 let decompressed = decompress_from_slice(&compressed).unwrap();
1290 assert_eq!(decompressed, original);
1291 }
1292
1293 #[test]
1294 fn balanced_mode_single_byte() {
1295 let original = b"X";
1296 let compressed = compress_to_vec(original, Mode::Balanced, None).unwrap();
1297 let decompressed = decompress_from_slice(&compressed).unwrap();
1298 assert_eq!(decompressed, original);
1299 }
1300
1301 #[test]
1302 fn balanced_mode_json_roundtrip() {
1303 let data = br#"{"name":"Alice","age":30,"name":"Bob","age":25,"name":"Carol","age":35}"#;
1304 let compressed = compress_to_vec(data, Mode::Balanced, Some(FormatHint::Json)).unwrap();
1305 let decompressed = decompress_from_slice(&compressed).unwrap();
1306 assert_eq!(decompressed, data.to_vec());
1307 }
1308
1309 #[test]
1310 fn empty_data_roundtrip() {
1311 let original = b"";
1312 for mode in [Mode::Fast, Mode::Balanced, Mode::Max] {
1313 let compressed = compress_to_vec(original, mode, None).unwrap();
1314 let decompressed = decompress_from_slice(&compressed).unwrap();
1315 assert_eq!(decompressed, original, "failed for mode {mode}");
1316 }
1317 }
1318
1319 #[test]
1320 fn crc_mismatch_detected() {
1321 let original = b"test data for CRC check";
1322 let mut compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1323 let header_size = 32; if compressed.len() > header_size + 5 {
1326 compressed[header_size + 3] ^= 0xFF;
1327 }
1328 assert!(decompress_from_slice(&compressed).is_err());
1329 }
1330
1331 #[test]
1332 fn fast_mode_actually_compresses() {
1333 let data = "hello world. ".repeat(100);
1335 let compressed = compress_to_vec(data.as_bytes(), Mode::Fast, None).unwrap();
1336 assert!(
1337 compressed.len() < data.len(),
1338 "Fast mode should compress repetitive data: {} vs {}",
1339 compressed.len(),
1340 data.len()
1341 );
1342 }
1343
1344 #[test]
1345 fn json_preprocessing_improves_fast_mode() {
1346 let data = br#"[{"name":"Alice","score":95},{"name":"Bob","score":87},{"name":"Carol","score":92},{"name":"Dave","score":88},{"name":"Eve","score":91}]"#;
1347 let with_preprocess = compress_to_vec(data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1348 let without_preprocess =
1349 compress_to_vec(data, Mode::Fast, Some(FormatHint::Generic)).unwrap();
1350
1351 assert_eq!(
1353 decompress_from_slice(&with_preprocess).unwrap(),
1354 data.to_vec()
1355 );
1356 assert_eq!(
1357 decompress_from_slice(&without_preprocess).unwrap(),
1358 data.to_vec()
1359 );
1360 }
1361
1362 #[test]
1363 fn all_modes_roundtrip() {
1364 let data = b"test all modes with some more content to ensure decent compression";
1365 for mode in [Mode::Max, Mode::Balanced, Mode::Fast] {
1366 let compressed = compress_to_vec(data, mode, None).unwrap();
1367 let decompressed = decompress_from_slice(&compressed).unwrap();
1368 assert_eq!(decompressed, data, "failed for mode {mode}");
1369 }
1370 }
1371
1372 #[test]
1373 fn cm_compress_decompress_direct() {
1374 let data = b"Hello, World! This is a direct CM test.";
1375 let compressed = cm_compress(data, CMConfig::balanced());
1376 let decompressed = cm_decompress(&compressed, data.len(), CMConfig::balanced());
1377 assert_eq!(decompressed, data.to_vec());
1378 }
1379
1380 #[test]
1381 fn cm_empty() {
1382 let data: &[u8] = b"";
1383 let compressed = cm_compress(data, CMConfig::balanced());
1384 let decompressed = cm_decompress(&compressed, 0, CMConfig::balanced());
1385 assert!(decompressed.is_empty());
1386 }
1387
1388 #[test]
1389 fn cm_single_byte() {
1390 for byte in 0..=255u8 {
1391 let data = [byte];
1392 let compressed = cm_compress(&data, CMConfig::balanced());
1393 let decompressed = cm_decompress(&compressed, 1, CMConfig::balanced());
1394 assert_eq!(
1395 decompressed, data,
1396 "CM roundtrip failed for byte {byte:#04X}"
1397 );
1398 }
1399 }
1400
1401 #[test]
1402 fn cm_repetitive_compresses() {
1403 let data = vec![b'A'; 1000];
1404 let compressed = cm_compress(&data, CMConfig::balanced());
1405 assert!(
1407 compressed.len() < 200,
1408 "CM should compress 1000 identical bytes well: {} bytes",
1409 compressed.len()
1410 );
1411 let decompressed = cm_decompress(&compressed, data.len(), CMConfig::balanced());
1412 assert_eq!(decompressed, data);
1413 }
1414
1415 #[test]
1416 fn max_mode_roundtrip() {
1417 let original = b"Max mode test data with some content for compression.";
1418 let compressed = compress_to_vec(original, Mode::Max, None).unwrap();
1419 let decompressed = decompress_from_slice(&compressed).unwrap();
1420 assert_eq!(decompressed, original);
1421 }
1422
1423 #[test]
1424 fn max_mode_longer_text() {
1425 let original = b"The quick brown fox jumps over the lazy dog. Max mode uses 2x context maps for better predictions with fewer hash collisions. This should compress slightly better than balanced mode.";
1426 let compressed = compress_to_vec(original, Mode::Max, None).unwrap();
1427 let decompressed = decompress_from_slice(&compressed).unwrap();
1428 assert_eq!(decompressed, original);
1429 }
1430
1431 #[test]
1434 fn test_dict_compress_roundtrip() {
1435 let mut ndjson = String::new();
1438 for i in 0..500 {
1439 ndjson.push_str(&format!(
1440 r#"{{"id":{},"name":"user_{}","status":"active","score":{}}}"#,
1441 i,
1442 i,
1443 i * 17 % 100
1444 ));
1445 ndjson.push('\n');
1446 }
1447 let data = ndjson.as_bytes();
1448 assert!(
1449 data.len() > DICT_MIN_DATA_SIZE,
1450 "test data should exceed dict threshold: {} bytes",
1451 data.len()
1452 );
1453
1454 let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1455 let decompressed = decompress_from_slice(&compressed).unwrap();
1456 assert_eq!(
1457 decompressed, data,
1458 "dict compress roundtrip: byte-exact mismatch"
1459 );
1460 }
1461
1462 #[test]
1463 fn test_dict_falls_back_on_small() {
1464 let data = b"small data that won't trigger dictionary training";
1466 assert!(data.len() < DICT_MIN_DATA_SIZE);
1467
1468 let compressed = compress_to_vec(data, Mode::Fast, None).unwrap();
1469 let decompressed = decompress_from_slice(&compressed).unwrap();
1470 assert_eq!(decompressed, data.to_vec());
1471
1472 let mut cursor = Cursor::new(&compressed);
1474 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1475 assert!(!header.has_dict, "small data should not have dict flag set");
1476 }
1477
1478 #[test]
1479 fn test_dict_backward_compat() {
1480 let original = b"backward compatibility test data for decompression";
1483 let compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1484
1485 let mut cursor = Cursor::new(&compressed);
1487 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1488 assert!(!header.has_dict);
1489
1490 let decompressed = decompress_from_slice(&compressed).unwrap();
1492 assert_eq!(decompressed, original.to_vec());
1493 }
1494
1495 #[test]
1496 fn test_dict_ndjson_large_roundtrip() {
1497 let mut ndjson = String::new();
1499 for i in 0..2000 {
1500 ndjson.push_str(&format!(
1501 r#"{{"timestamp":"2025-01-{:02}T{:02}:{:02}:00Z","level":"info","message":"Request processed","request_id":"req_{}","duration_ms":{}}}"#,
1502 (i % 28) + 1,
1503 i % 24,
1504 i % 60,
1505 i,
1506 (i * 13) % 500
1507 ));
1508 ndjson.push('\n');
1509 }
1510 let data = ndjson.as_bytes();
1511
1512 let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1513 let decompressed = decompress_from_slice(&compressed).unwrap();
1514 assert_eq!(decompressed, data, "large NDJSON roundtrip mismatch");
1515 }
1516
1517 #[test]
1518 fn test_dict_generic_data_roundtrip() {
1519 let mut data = Vec::new();
1522 for i in 0..3000 {
1523 data.extend_from_slice(
1524 format!("line {i}: the quick brown fox jumps over the lazy dog\n").as_bytes(),
1525 );
1526 }
1527 assert!(data.len() > DICT_MIN_DATA_SIZE);
1528
1529 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Generic)).unwrap();
1530 let decompressed = decompress_from_slice(&compressed).unwrap();
1531 assert_eq!(decompressed, data, "generic data dict roundtrip mismatch");
1532 }
1533
1534 #[test]
1535 fn test_dict_does_not_affect_other_modes() {
1536 let mut ndjson = String::new();
1539 for i in 0..200 {
1540 ndjson.push_str(&format!(
1541 r#"{{"id":{},"name":"user_{}","status":"active"}}"#,
1542 i, i
1543 ));
1544 ndjson.push('\n');
1545 }
1546 let data = ndjson.as_bytes();
1547
1548 for mode in [Mode::Balanced, Mode::Max] {
1549 let compressed = compress_to_vec(data, mode, Some(FormatHint::Ndjson)).unwrap();
1550 let mut cursor = Cursor::new(&compressed);
1551 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1552 assert!(!header.has_dict, "mode {mode} should never have dict flag");
1553 let decompressed = decompress_from_slice(&compressed).unwrap();
1554 assert_eq!(decompressed, data, "roundtrip failed for mode {mode}");
1555 }
1556 }
1557
1558 #[test]
1561 fn test_compress_with_level() {
1562 let data = "hello world, compressing with custom zstd level. ".repeat(50);
1564 let compressed =
1565 compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, Some(19))
1566 .unwrap();
1567 let decompressed = decompress_from_slice(&compressed).unwrap();
1568 assert_eq!(decompressed, data.as_bytes(), "level 19 roundtrip failed");
1569 }
1570
1571 #[test]
1572 fn test_compress_with_level_default() {
1573 let data = "default level test data. ".repeat(50);
1575 let compressed =
1576 compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, None).unwrap();
1577 let decompressed = decompress_from_slice(&compressed).unwrap();
1578 assert_eq!(
1579 decompressed,
1580 data.as_bytes(),
1581 "default level roundtrip failed"
1582 );
1583 }
1584
1585 #[test]
1586 fn test_compress_with_level_higher_ratio() {
1587 let data = r#"{"name":"Alice","score":95}"#.repeat(200);
1589 let low =
1590 compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, Some(1)).unwrap();
1591 let high = compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, Some(19))
1592 .unwrap();
1593
1594 assert_eq!(decompress_from_slice(&low).unwrap(), data.as_bytes());
1596 assert_eq!(decompress_from_slice(&high).unwrap(), data.as_bytes());
1597
1598 assert!(
1600 high.len() <= low.len(),
1601 "level 19 ({}) should be <= level 1 ({})",
1602 high.len(),
1603 low.len()
1604 );
1605 }
1606
1607 #[test]
1610 fn test_auto_fallback_picks_smaller() {
1611 let data = std::fs::read(concat!(
1615 env!("CARGO_MANIFEST_DIR"),
1616 "/../../corpus/json-bench/citm_catalog.json"
1617 ))
1618 .unwrap();
1619
1620 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1621 let decompressed = decompress_from_slice(&compressed).unwrap();
1622 assert_eq!(decompressed, data, "citm_catalog roundtrip failed");
1623
1624 let ratio = data.len() as f64 / compressed.len() as f64;
1626 assert!(
1627 ratio > 50.0,
1628 "citm_catalog should achieve >50x, got {ratio:.1}x"
1629 );
1630 }
1631
1632 #[test]
1633 fn test_auto_fallback_preprocessed_wins_on_ndjson() {
1634 let data = std::fs::read(concat!(
1637 env!("CARGO_MANIFEST_DIR"),
1638 "/../../corpus/test-ndjson.ndjson"
1639 ))
1640 .unwrap();
1641
1642 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1643 let decompressed = decompress_from_slice(&compressed).unwrap();
1644 assert_eq!(decompressed, data, "test-ndjson roundtrip failed");
1645
1646 let mut cursor = Cursor::new(&compressed);
1649 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1650 assert!(
1651 !header.transform_metadata.is_empty() || header.meta_embedded,
1652 "test-ndjson should prefer preprocessed path (non-empty transform metadata or embedded)"
1653 );
1654 }
1655
1656 #[test]
1657 fn test_auto_fallback_roundtrip() {
1658 let citm = std::fs::read(concat!(
1661 env!("CARGO_MANIFEST_DIR"),
1662 "/../../corpus/json-bench/citm_catalog.json"
1663 ))
1664 .unwrap();
1665 let ndjson = std::fs::read(concat!(
1666 env!("CARGO_MANIFEST_DIR"),
1667 "/../../corpus/test-ndjson.ndjson"
1668 ))
1669 .unwrap();
1670
1671 let compressed_citm = compress_to_vec(&citm, Mode::Fast, Some(FormatHint::Json)).unwrap();
1673 let decompressed_citm = decompress_from_slice(&compressed_citm).unwrap();
1674 assert_eq!(
1675 decompressed_citm, citm,
1676 "citm_catalog roundtrip (raw path) failed"
1677 );
1678
1679 let compressed_ndjson =
1681 compress_to_vec(&ndjson, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1682 let decompressed_ndjson = decompress_from_slice(&compressed_ndjson).unwrap();
1683 assert_eq!(
1684 decompressed_ndjson, ndjson,
1685 "test-ndjson roundtrip (preprocessed path) failed"
1686 );
1687 }
1688
1689 #[test]
1692 fn test_adaptive_level_small_data() {
1693 assert_eq!(adaptive_fast_level(100_000, None), 19);
1695 assert_eq!(adaptive_fast_level(500_000, None), 19);
1696 assert_eq!(adaptive_fast_level(1_048_576, None), 19);
1697 assert_eq!(adaptive_fast_level(0, None), 19);
1698 }
1699
1700 #[test]
1701 fn test_adaptive_level_large_data() {
1702 assert_eq!(adaptive_fast_level(1_048_577, None), 13);
1704 assert_eq!(adaptive_fast_level(5_000_000, None), 13);
1705 assert_eq!(adaptive_fast_level(10_485_760, None), 13);
1706 assert_eq!(adaptive_fast_level(10_485_761, None), 9);
1707 assert_eq!(adaptive_fast_level(100_000_000, None), 9);
1708 }
1709
1710 #[test]
1711 fn test_adaptive_level_override() {
1712 assert_eq!(adaptive_fast_level(100, Some(3)), 3);
1714 assert_eq!(adaptive_fast_level(100_000_000, Some(22)), 22);
1715 assert_eq!(adaptive_fast_level(0, Some(1)), 1);
1716 }
1717
1718 #[test]
1721 fn test_compressed_metadata_roundtrip() {
1722 let mut ndjson = String::new();
1724 for i in 0..500 {
1725 ndjson.push_str(&format!(
1726 r#"{{"id":{},"name":"user_{}","status":"active","score":{}}}"#,
1727 i,
1728 i,
1729 i * 17 % 100
1730 ));
1731 ndjson.push('\n');
1732 }
1733 let data = ndjson.as_bytes();
1734
1735 let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1736 let decompressed = decompress_from_slice(&compressed).unwrap();
1737 assert_eq!(
1738 decompressed, data,
1739 "compressed metadata roundtrip: byte-exact mismatch"
1740 );
1741
1742 let mut cursor = Cursor::new(&compressed);
1744 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1745 if !header.transform_metadata.is_empty() && header.transform_metadata.len() > 10 {
1747 }
1751 }
1752
1753 #[test]
1754 fn test_compressed_metadata_backward_compat() {
1755 let original = b"backward compatibility test data for metadata decompression";
1758 let compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1759
1760 let decompressed = decompress_from_slice(&compressed).unwrap();
1762 assert_eq!(decompressed, original.to_vec());
1763
1764 let mut cursor = Cursor::new(&compressed);
1766 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1767 assert!(!header.meta_compressed || !header.transform_metadata.is_empty());
1769 }
1770
1771 #[test]
1772 fn test_compressed_metadata_small_skipped() {
1773 let data = br#"{"name":"Alice","age":30}"#;
1776 let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1777 let decompressed = decompress_from_slice(&compressed).unwrap();
1778 assert_eq!(decompressed, data.to_vec());
1779
1780 let mut cursor = Cursor::new(&compressed);
1781 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1782 if header.transform_metadata.len() <= 64 {
1784 assert!(
1785 !header.meta_compressed,
1786 "metadata <= 64 bytes should not be compressed, but meta_compressed=true \
1787 for {} bytes of metadata",
1788 header.transform_metadata.len()
1789 );
1790 }
1791 }
1792
1793 #[test]
1794 fn test_twitter_json_brotli_wins() {
1795 let data = std::fs::read(concat!(
1798 env!("CARGO_MANIFEST_DIR"),
1799 "/../../corpus/json-bench/twitter.json"
1800 ))
1801 .unwrap();
1802
1803 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1804 let decompressed = decompress_from_slice(&compressed).unwrap();
1805 assert_eq!(decompressed, data, "twitter.json roundtrip failed");
1806
1807 let mut cursor = Cursor::new(&compressed);
1809 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1810 assert!(
1811 header.use_brotli,
1812 "twitter.json should use brotli (FLAG_BROTLI set in header)"
1813 );
1814 }
1815
1816 #[test]
1817 fn test_compressed_metadata_all_modes_roundtrip() {
1818 let mut ndjson = String::new();
1820 for i in 0..200 {
1821 ndjson.push_str(&format!(
1822 r#"{{"id":{},"name":"user_{}","status":"active"}}"#,
1823 i, i
1824 ));
1825 ndjson.push('\n');
1826 }
1827 let data = ndjson.as_bytes();
1828
1829 for mode in [Mode::Fast, Mode::Balanced, Mode::Max] {
1830 let compressed = compress_to_vec(data, mode, Some(FormatHint::Ndjson)).unwrap();
1831 let decompressed = decompress_from_slice(&compressed).unwrap();
1832 assert_eq!(
1833 decompressed, data,
1834 "compressed metadata roundtrip failed for mode {mode}"
1835 );
1836 }
1837 }
1838
1839 #[test]
1842 fn test_brotli_compress_roundtrip() {
1843 let data = b"Hello, brotli! This is a test of the brotli compression helpers.";
1845 let compressed = brotli_compress(data, 11, BROTLI_MODE_GENERIC).unwrap();
1846 let decompressed = brotli_decompress(&compressed).unwrap();
1847 assert_eq!(decompressed, data.to_vec());
1848 }
1849
1850 #[test]
1851 fn test_brotli_auto_fallback_twitter() {
1852 let data = std::fs::read(concat!(
1854 env!("CARGO_MANIFEST_DIR"),
1855 "/../../corpus/json-bench/twitter.json"
1856 ))
1857 .unwrap();
1858
1859 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1860 let decompressed = decompress_from_slice(&compressed).unwrap();
1861 assert_eq!(decompressed, data, "twitter.json brotli roundtrip failed");
1862
1863 let mut cursor = Cursor::new(&compressed);
1864 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1865 assert!(
1866 header.use_brotli,
1867 "twitter.json should use brotli in auto-fallback"
1868 );
1869 }
1870
1871 #[test]
1872 fn test_brotli_ndjson_roundtrip() {
1873 let data = std::fs::read(concat!(
1876 env!("CARGO_MANIFEST_DIR"),
1877 "/../../corpus/test-ndjson.ndjson"
1878 ))
1879 .unwrap();
1880
1881 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1882 let decompressed = decompress_from_slice(&compressed).unwrap();
1883 assert_eq!(decompressed, data, "ndjson roundtrip failed");
1884 }
1885
1886 #[test]
1887 fn test_brotli_backward_compat() {
1888 let original = b"backward compatibility test: this data was compressed without brotli";
1892 let crc = crc32fast::hash(original);
1893 let zstd_compressed = zstd::bulk::compress(original, 19).unwrap();
1894
1895 let header = crate::dcx::DcxHeader {
1896 mode: Mode::Fast,
1897 format_hint: crate::dcx::FormatHint::Generic,
1898 original_size: original.len() as u64,
1899 compressed_size: zstd_compressed.len() as u64,
1900 crc32: crc,
1901 transform_metadata: vec![],
1902 has_dict: false,
1903 meta_compressed: false,
1904 use_brotli: false,
1905 meta_embedded: false,
1906 };
1907
1908 let mut buf = Vec::new();
1909 header.write_to(&mut buf).unwrap();
1910 buf.extend_from_slice(&zstd_compressed);
1911
1912 assert_eq!(buf[7] & crate::dcx::FLAG_BROTLI, 0);
1914
1915 let decompressed = decompress_from_slice(&buf).unwrap();
1917 assert_eq!(decompressed, original.to_vec());
1918 }
1919
1920 #[test]
1923 fn test_embedded_metadata_roundtrip() {
1924 let data = std::fs::read(concat!(
1927 env!("CARGO_MANIFEST_DIR"),
1928 "/../../corpus/test-api.json"
1929 ))
1930 .unwrap();
1931
1932 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1933 let decompressed = decompress_from_slice(&compressed).unwrap();
1934 assert_eq!(
1935 decompressed, data,
1936 "test-api.json embedded metadata roundtrip: byte-exact mismatch"
1937 );
1938 }
1939
1940 #[test]
1941 fn test_embedded_metadata_backward_compat() {
1942 let original = b"backward compat: no embedded metadata in this old file format";
1946 let crc = crc32fast::hash(original);
1947 let zstd_compressed = zstd::bulk::compress(original, 19).unwrap();
1948
1949 let header = crate::dcx::DcxHeader {
1950 mode: Mode::Fast,
1951 format_hint: crate::dcx::FormatHint::Generic,
1952 original_size: original.len() as u64,
1953 compressed_size: zstd_compressed.len() as u64,
1954 crc32: crc,
1955 transform_metadata: vec![],
1956 has_dict: false,
1957 meta_compressed: false,
1958 use_brotli: false,
1959 meta_embedded: false,
1960 };
1961
1962 let mut buf = Vec::new();
1963 header.write_to(&mut buf).unwrap();
1964 buf.extend_from_slice(&zstd_compressed);
1965
1966 assert_eq!(buf[7] & crate::dcx::FLAG_META_EMBEDDED, 0);
1968
1969 let decompressed = decompress_from_slice(&buf).unwrap();
1971 assert_eq!(decompressed, original.to_vec());
1972 }
1973
1974 #[test]
1975 fn test_embedded_metadata_small_file_improvement() {
1976 let data = std::fs::read(concat!(
1979 env!("CARGO_MANIFEST_DIR"),
1980 "/../../corpus/test-api.json"
1981 ))
1982 .unwrap();
1983
1984 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1985 let decompressed = decompress_from_slice(&compressed).unwrap();
1986 assert_eq!(decompressed, data, "roundtrip failed");
1987
1988 let ratio = data.len() as f64 / compressed.len() as f64;
1990 assert!(
1991 ratio > 5.0,
1992 "test-api.json should achieve >5x compression, got {ratio:.1}x"
1993 );
1994
1995 let mut cursor = Cursor::new(&compressed);
1997 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1998
1999 if header.meta_embedded {
2001 assert!(
2002 header.transform_metadata.is_empty(),
2003 "meta_embedded header should have empty transform_metadata"
2004 );
2005 assert!(header.use_brotli, "meta_embedded should use brotli codec");
2006 }
2007 }
2008
2009 #[test]
2010 fn test_embedded_metadata_ndjson_roundtrip() {
2011 let data = std::fs::read(concat!(
2014 env!("CARGO_MANIFEST_DIR"),
2015 "/../../corpus/test-ndjson.ndjson"
2016 ))
2017 .unwrap();
2018
2019 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2020 let decompressed = decompress_from_slice(&compressed).unwrap();
2021 assert_eq!(
2022 decompressed, data,
2023 "NDJSON embedded metadata roundtrip: byte-exact mismatch"
2024 );
2025 }
2026
2027 #[test]
2028 fn test_embedded_metadata_manual_roundtrip() {
2029 let original = b"Hello, embedded metadata world! This is a test.";
2032 let crc = crc32fast::hash(original);
2033
2034 let empty_chain = TransformChain::new();
2037 let raw_metadata = empty_chain.serialize();
2038
2039 let mut embedded = Vec::new();
2041 embedded.extend_from_slice(&(raw_metadata.len() as u32).to_le_bytes());
2042 embedded.extend_from_slice(&raw_metadata);
2043 embedded.extend_from_slice(original);
2044
2045 let brotli_data = brotli_compress(&embedded, 11, BROTLI_MODE_GENERIC).unwrap();
2046
2047 let header = crate::dcx::DcxHeader {
2048 mode: Mode::Fast,
2049 format_hint: crate::dcx::FormatHint::Generic,
2050 original_size: original.len() as u64,
2051 compressed_size: brotli_data.len() as u64,
2052 crc32: crc,
2053 transform_metadata: vec![], has_dict: false,
2055 meta_compressed: false,
2056 use_brotli: true,
2057 meta_embedded: true,
2058 };
2059
2060 let mut buf = Vec::new();
2061 header.write_to(&mut buf).unwrap();
2062 buf.extend_from_slice(&brotli_data);
2063
2064 assert_ne!(buf[7] & crate::dcx::FLAG_META_EMBEDDED, 0);
2066 assert_ne!(buf[7] & crate::dcx::FLAG_BROTLI, 0);
2067
2068 let decompressed = decompress_from_slice(&buf).unwrap();
2070 assert_eq!(decompressed, original.to_vec());
2071 }
2072
2073 #[test]
2076 fn test_brotli_text_mode_on_raw() {
2077 let data = br#"{"name":"Alice","age":30,"city":"New York","active":true}"#;
2079
2080 let compressed_text = brotli_compress(data, 11, BROTLI_MODE_TEXT).unwrap();
2082 let decompressed_text = brotli_decompress(&compressed_text).unwrap();
2083 assert_eq!(
2084 decompressed_text,
2085 data.to_vec(),
2086 "TEXT mode roundtrip failed"
2087 );
2088
2089 let compressed_generic = brotli_compress(data, 11, BROTLI_MODE_GENERIC).unwrap();
2091 let decompressed_generic = brotli_decompress(&compressed_generic).unwrap();
2092 assert_eq!(
2093 decompressed_generic,
2094 data.to_vec(),
2095 "GENERIC mode roundtrip failed"
2096 );
2097
2098 assert!(
2103 !compressed_text.is_empty(),
2104 "TEXT mode should produce non-empty output"
2105 );
2106 }
2107
2108 #[test]
2111 fn test_zstd_embedded_metadata_roundtrip() {
2112 let original = b"Hello, zstd embedded metadata! This is a test of the zstd path.";
2115 let crc = crc32fast::hash(original);
2116
2117 let empty_chain = TransformChain::new();
2119 let raw_metadata = empty_chain.serialize();
2120
2121 let mut embedded = Vec::new();
2123 embedded.extend_from_slice(&(raw_metadata.len() as u32).to_le_bytes());
2124 embedded.extend_from_slice(&raw_metadata);
2125 embedded.extend_from_slice(original);
2126
2127 let zstd_data = zstd::bulk::compress(&embedded, 19).unwrap();
2128
2129 let header = crate::dcx::DcxHeader {
2130 mode: Mode::Fast,
2131 format_hint: crate::dcx::FormatHint::Generic,
2132 original_size: original.len() as u64,
2133 compressed_size: zstd_data.len() as u64,
2134 crc32: crc,
2135 transform_metadata: vec![], has_dict: false,
2137 meta_compressed: false,
2138 use_brotli: false, meta_embedded: true,
2140 };
2141
2142 let mut buf = Vec::new();
2143 header.write_to(&mut buf).unwrap();
2144 buf.extend_from_slice(&zstd_data);
2145
2146 assert_ne!(buf[7] & crate::dcx::FLAG_META_EMBEDDED, 0);
2148 assert_eq!(buf[7] & crate::dcx::FLAG_BROTLI, 0);
2149
2150 let decompressed = decompress_from_slice(&buf).unwrap();
2152 assert_eq!(decompressed, original.to_vec());
2153 }
2154
2155 #[test]
2158 fn test_multi_quality_brotli() {
2159 let data = br#"{"items":[1,2,3,4,5],"nested":{"a":"hello","b":"world"}}"#;
2162
2163 let q10 = brotli_compress(data, 10, BROTLI_MODE_GENERIC).unwrap();
2164 let q11 = brotli_compress(data, 11, BROTLI_MODE_GENERIC).unwrap();
2165
2166 let dec_q10 = brotli_decompress(&q10).unwrap();
2167 let dec_q11 = brotli_decompress(&q11).unwrap();
2168
2169 assert_eq!(dec_q10, data.to_vec(), "quality 10 roundtrip failed");
2170 assert_eq!(dec_q11, data.to_vec(), "quality 11 roundtrip failed");
2171
2172 assert!(!q10.is_empty());
2174 assert!(!q11.is_empty());
2175
2176 let corpus_files = [
2180 concat!(env!("CARGO_MANIFEST_DIR"), "/../../corpus/test-api.json"),
2181 concat!(
2182 env!("CARGO_MANIFEST_DIR"),
2183 "/../../corpus/json-bench/twitter.json"
2184 ),
2185 ];
2186 for path in corpus_files {
2187 let file_data = std::fs::read(path).unwrap();
2188 let compressed =
2189 compress_to_vec(&file_data, Mode::Fast, Some(FormatHint::Json)).unwrap();
2190 let decompressed = decompress_from_slice(&compressed).unwrap();
2191 assert_eq!(
2192 decompressed, file_data,
2193 "multi-quality roundtrip failed for {path}"
2194 );
2195 }
2196 }
2197
2198 #[test]
2201 fn test_singleton_arrays_fast_roundtrip() {
2202 let rows: Vec<String> = (0..500)
2205 .map(|i| format!("{{\"items\":[{{\"x\":{}}}],\"id\":{}}}", i, i))
2206 .collect();
2207 let data = rows.join("\n") + "\n";
2208 let compressed =
2209 compress_to_vec(data.as_bytes(), Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2210 let decompressed = decompress_from_slice(&compressed).unwrap();
2211 assert_eq!(
2212 decompressed,
2213 data.as_bytes(),
2214 "singleton_arrays fast mode roundtrip failed"
2215 );
2216 }
2217
2218 #[test]
2219 fn test_very_long_lines_fast_roundtrip() {
2220 let rows: Vec<String> = (0..50)
2223 .map(|i| format!("{{\"data\":\"{}\",\"id\":{}}}", "X".repeat(100_000), i))
2224 .collect();
2225 let data = rows.join("\n") + "\n";
2226 let compressed =
2227 compress_to_vec(data.as_bytes(), Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2228 let decompressed = decompress_from_slice(&compressed).unwrap();
2229 assert_eq!(
2230 decompressed,
2231 data.as_bytes(),
2232 "very_long_lines fast mode roundtrip failed"
2233 );
2234 }
2235
2236 #[test]
2237 fn test_very_long_lines_balanced_roundtrip() {
2238 let rows: Vec<String> = (0..10)
2241 .map(|i| format!("{{\"data\":\"{}\",\"id\":{}}}", "X".repeat(100_000), i))
2242 .collect();
2243 let data = rows.join("\n") + "\n";
2244 let compressed =
2245 compress_to_vec(data.as_bytes(), Mode::Balanced, Some(FormatHint::Ndjson)).unwrap();
2246 let decompressed = decompress_from_slice(&compressed).unwrap();
2247 assert_eq!(
2248 decompressed,
2249 data.as_bytes(),
2250 "very_long_lines balanced mode roundtrip failed"
2251 );
2252 }
2253
2254 #[test]
2255 fn test_all_same_value_fast_roundtrip() {
2256 let rows: Vec<String> = (0..10_000).map(|_| "{\"x\":1}".to_string()).collect();
2261 let data = rows.join("\n") + "\n";
2262 let compressed =
2263 compress_to_vec(data.as_bytes(), Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2264 let decompressed = decompress_from_slice(&compressed).unwrap();
2265 assert_eq!(
2266 decompressed,
2267 data.as_bytes(),
2268 "all_same_value fast mode roundtrip failed"
2269 );
2270 }
2271
2272 #[test]
2273 fn test_generate_training_samples_degenerate() {
2274 let mut data = vec![0x02u8]; data.extend_from_slice(&[0x00; 9999]); let samples = generate_training_samples(&data, 1024);
2279 let avg_len = samples.iter().map(|s| s.len()).sum::<usize>() / samples.len();
2281 assert!(
2282 avg_len >= 8,
2283 "training samples average size should be >= 8, got {avg_len}"
2284 );
2285 }
2286}