1use std::io::{self, Cursor, Read, Write};
10
11use crate::dcx::{DcxHeader, FormatHint, Mode};
12use crate::entropy::arithmetic::{ArithmeticDecoder, ArithmeticEncoder};
13use crate::format::transform::TransformChain;
14use crate::format::{detect_format, preprocess, reverse_preprocess};
15use crate::mixer::MetaMixer;
16use crate::model::gru_model::GruModel;
17use crate::model::{CMConfig, CMEngine};
18
/// Choose the zstd compression level for Fast mode.
///
/// An explicit `level_override` always wins; otherwise the level is
/// scaled down as the input grows, trading ratio for speed:
/// <= 1 MiB -> 19, <= 10 MiB -> 13, larger -> 9.
fn adaptive_fast_level(data_size: usize, level_override: Option<i32>) -> i32 {
    level_override.unwrap_or_else(|| {
        if data_size <= 1_048_576 {
            19
        } else if data_size <= 10_485_760 {
            13
        } else {
            9
        }
    })
}
34
/// Minimum input size (bytes) before Fast mode even attempts
/// dictionary-based zstd compression (see `compress_with_options`).
const DICT_MIN_DATA_SIZE: usize = 8192;
40
/// Chunk size used to split the input for dictionary training and
/// per-chunk compression; larger inputs get larger chunks.
fn dict_chunk_size(data_len: usize) -> usize {
    match data_len {
        n if n > 4_194_304 => 131_072,
        n if n > 1_048_576 => 65_536,
        n if n > 262_144 => 32_768,
        _ => 16_384,
    }
}
57
/// Upper bound on the trained zstd dictionary size, scaled to the
/// input length.
fn dict_max_size(data_len: usize) -> usize {
    match data_len {
        n if n > 4_194_304 => 16_384,
        n if n > 1_048_576 => 8_192,
        _ => 4_096,
    }
}
70
71fn generate_training_samples(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
77 let col_chunks: Vec<&[u8]> = data.split(|&b| b == 0x00).collect();
79 if col_chunks.len() >= 5 {
80 let non_empty: Vec<&[u8]> = col_chunks.into_iter().filter(|c| !c.is_empty()).collect();
81 if !non_empty.is_empty() {
87 let avg_len = non_empty.iter().map(|c| c.len()).sum::<usize>() / non_empty.len();
88 if avg_len >= 8 {
89 return non_empty;
90 }
91 }
92 }
93
94 split_into_chunks(data, chunk_size)
96}
97
/// Split `data` into consecutive slices of at most `chunk_size` bytes
/// (the final slice may be shorter).
fn split_into_chunks(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
    let mut pieces = Vec::new();
    let mut rest = data;
    while !rest.is_empty() {
        let take = chunk_size.min(rest.len());
        let (head, tail) = rest.split_at(take);
        pieces.push(head);
        rest = tail;
    }
    pieces
}
110
111fn try_dict_compress(data: &[u8], level: i32, plain_size: usize) -> Option<Vec<u8>> {
120 let chunk_size = dict_chunk_size(data.len());
121
122 let training_samples = generate_training_samples(data, chunk_size);
124 if training_samples.len() < 5 {
125 return None;
126 }
127
128 let max_dict = dict_max_size(data.len());
129
130 let dict = zstd::dict::from_samples(&training_samples, max_dict).ok()?;
132 if dict.is_empty() {
133 return None;
134 }
135
136 let chunks = split_into_chunks(data, chunk_size);
138
139 let mut compressor = zstd::bulk::Compressor::with_dictionary(level, &dict).ok()?;
141 let mut compressed_chunks: Vec<Vec<u8>> = Vec::with_capacity(chunks.len());
142 for chunk in &chunks {
143 let cc = compressor.compress(chunk).ok()?;
144 compressed_chunks.push(cc);
145 }
146
147 let total_compressed: usize = compressed_chunks.iter().map(|c| 4 + c.len()).sum();
152 let payload_size = 4 + dict.len() + 4 + total_compressed;
153
154 if payload_size >= plain_size {
156 return None;
157 }
158
159 let mut payload = Vec::with_capacity(payload_size);
160 payload.extend_from_slice(&(dict.len() as u32).to_le_bytes());
161 payload.extend_from_slice(&dict);
162 payload.extend_from_slice(&(compressed_chunks.len() as u32).to_le_bytes());
163 for cc in &compressed_chunks {
164 payload.extend_from_slice(&(cc.len() as u32).to_le_bytes());
165 payload.extend_from_slice(cc);
166 }
167
168 Some(payload)
169}
170
/// Inverse of `try_dict_compress`: parse the framed payload
/// (`dict_len | dict | num_chunks | (chunk_len | chunk)*`, u32 LE) and
/// zstd-decompress each chunk with the embedded dictionary.
///
/// `capacity` is an upper bound on the total decompressed size; each
/// chunk may use whatever portion of it is still unconsumed.
///
/// # Errors
/// `InvalidData` when the payload is truncated at any field boundary or
/// a chunk fails to decompress.
fn decompress_with_dict(payload: &[u8], capacity: usize) -> std::io::Result<Vec<u8>> {
    if payload.len() < 4 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "dict payload too short for dict_size",
        ));
    }
    // Read cursor into `payload`; advanced after every field.
    let mut pos = 0;

    // The length check above guarantees the 4-byte slice exists here.
    let dict_size =
        u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
    pos += 4;
    if payload.len() < pos + dict_size {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "dict payload truncated: dictionary bytes",
        ));
    }
    let dict_bytes = &payload[pos..pos + dict_size];
    pos += dict_size;

    if payload.len() < pos + 4 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "dict payload truncated: num_chunks",
        ));
    }
    let num_chunks =
        u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
    pos += 4;

    let mut decompressor = zstd::bulk::Decompressor::with_dictionary(dict_bytes)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

    let mut output = Vec::with_capacity(capacity);

    for i in 0..num_chunks {
        if payload.len() < pos + 4 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("dict payload truncated: chunk {i} size"),
            ));
        }
        let chunk_size =
            u32::from_le_bytes(payload[pos..pos + 4].try_into().expect("4-byte slice")) as usize;
        pos += 4;
        if payload.len() < pos + chunk_size {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("dict payload truncated: chunk {i} data"),
            ));
        }
        let chunk_data = &payload[pos..pos + chunk_size];
        pos += chunk_size;

        // Give this chunk only the capacity not already used by earlier
        // chunks, so the overall bound is respected.
        let chunk_capacity = capacity.saturating_sub(output.len());
        let decompressed = decompressor
            .decompress(chunk_data, chunk_capacity)
            .map_err(|e| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("chunk {i} decompress failed: {e}"),
                )
            })?;
        output.extend_from_slice(&decompressed);
    }

    Ok(output)
}
252
253const BROTLI_MODE_GENERIC: u32 = 0;
259const BROTLI_MODE_TEXT: u32 = 1;
260
261fn brotli_compress(data: &[u8], quality: u32, mode: u32) -> io::Result<Vec<u8>> {
264 use brotli::enc::backward_references::BrotliEncoderMode;
265 let mut output = Vec::new();
266 let brotli_mode = match mode {
267 1 => BrotliEncoderMode::BROTLI_MODE_TEXT,
268 _ => BrotliEncoderMode::BROTLI_MODE_GENERIC,
269 };
270 let params = brotli::enc::BrotliEncoderParams {
271 quality: quality as i32,
272 mode: brotli_mode,
273 ..Default::default()
274 };
275 brotli::BrotliCompress(&mut io::Cursor::new(data), &mut output, ¶ms)?;
276 Ok(output)
277}
278
279fn brotli_decompress(data: &[u8]) -> io::Result<Vec<u8>> {
281 let mut output = Vec::new();
282 brotli::BrotliDecompress(&mut io::Cursor::new(data), &mut output)?;
283 Ok(output)
284}
285
286fn cm_compress(data: &[u8], config: CMConfig) -> Vec<u8> {
289 let mut engine = CMEngine::with_config(config);
290 let mut encoder = ArithmeticEncoder::new();
291
292 for &byte in data {
293 for bpos in 0..8 {
294 let bit = (byte >> (7 - bpos)) & 1;
295 let p = engine.predict();
296 encoder.encode(bit, p);
297 engine.update(bit);
298 }
299 }
300
301 encoder.finish()
302}
303
304fn cm_decompress(compressed: &[u8], original_size: usize, config: CMConfig) -> Vec<u8> {
307 let mut engine = CMEngine::with_config(config);
308 let mut decoder = ArithmeticDecoder::new(compressed);
309 let mut output = Vec::with_capacity(original_size);
310
311 for _ in 0..original_size {
312 let mut byte_val: u8 = 0;
313 for bpos in 0..8 {
314 let p = engine.predict();
315 let bit = decoder.decode(p);
316 engine.update(bit);
317 byte_val |= bit << (7 - bpos);
318 }
319 output.push(byte_val);
320 }
321
322 output
323}
324
/// Compress `data` bit-serially (MSB first), blending the CM engine's
/// prediction with a GRU byte model through a meta-mixer before
/// arithmetic coding. `gru_decompress` must mirror every predict/update
/// call so encoder and decoder model state stay in lock-step.
fn gru_compress(data: &[u8], config: CMConfig) -> Vec<u8> {
    let mut engine = CMEngine::with_config(config);
    let mut gru = GruModel::new();
    // Mixer sized 12 — must match the decoder side exactly.
    let mut meta_mixer = MetaMixer::new(12); let mut encoder = ArithmeticEncoder::new();

    // Progress reporting every ~5%, enabled only for inputs > 100 KB.
    let total_bytes = data.len();
    let report_interval = if total_bytes > 100_000 {
        total_bytes / 20
    } else {
        0
    };

    for (byte_idx, &byte) in data.iter().enumerate() {
        for bpos in 0..8u8 {
            let bit = (byte >> (7 - bpos)) & 1;

            let p_cm = engine.predict();

            // Partial-byte context: a leading 1 bit followed by the bits
            // of `byte` already coded (MSB first); just 1 when bpos == 0.
            let partial = if bpos == 0 {
                1u32
            } else {
                let mut p = 1u32;
                for prev_bpos in 0..bpos {
                    let prev_bit = (byte >> (7 - prev_bpos)) & 1;
                    p = (p << 1) | prev_bit as u32;
                }
                p
            };
            let p_gru = gru.predict_bit(bpos, partial);

            // Blend the two predictors, then code and update everything.
            let p_final = meta_mixer.blend(p_cm, p_gru);

            encoder.encode(bit, p_final);
            engine.update(bit);
            meta_mixer.update(bit);
        }

        // Byte-level GRU update after all 8 bits are coded; the decoder
        // performs the same calls with the reconstructed byte.
        gru.train(byte);
        gru.forward(byte);

        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
            let pct = (byte_idx + 1) * 100 / total_bytes;
            eprint!("\r[gru] compressing... {pct}%");
        }
    }

    if total_bytes > 100_000 {
        eprintln!("\r[gru] compressing... 100%");
    }

    encoder.finish()
}
389
/// Inverse of `gru_compress`: decode `original_size` bytes, driving the
/// CM engine, GRU model, and meta-mixer through the identical sequence
/// of predict/update calls the encoder performed.
fn gru_decompress(compressed: &[u8], original_size: usize, config: CMConfig) -> Vec<u8> {
    let mut engine = CMEngine::with_config(config);
    let mut gru = GruModel::new();
    // Mixer sized 12 — must match the encoder side exactly.
    let mut meta_mixer = MetaMixer::new(12); let mut decoder = ArithmeticDecoder::new(compressed);
    let mut output = Vec::with_capacity(original_size);

    // Progress reporting every ~5%, enabled only for outputs > 100 KB.
    let report_interval = if original_size > 100_000 {
        original_size / 20
    } else {
        0
    };

    for byte_idx in 0..original_size {
        let mut byte_val: u8 = 0;

        for bpos in 0..8u8 {
            let p_cm = engine.predict();

            // Partial-byte context rebuilt from the bits decoded so far,
            // matching the encoder's construction bit-for-bit.
            let partial = if bpos == 0 {
                1u32
            } else {
                let mut p = 1u32;
                for prev_bpos in 0..bpos {
                    let prev_bit = (byte_val >> (7 - prev_bpos)) & 1;
                    p = (p << 1) | prev_bit as u32;
                }
                p
            };
            let p_gru = gru.predict_bit(bpos, partial);

            let p_final = meta_mixer.blend(p_cm, p_gru);

            let bit = decoder.decode(p_final);
            engine.update(bit);
            meta_mixer.update(bit);
            byte_val |= bit << (7 - bpos);
        }

        output.push(byte_val);

        // Byte-level GRU update with the reconstructed byte, mirroring
        // the encoder.
        gru.train(byte_val);
        gru.forward(byte_val);

        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
            let pct = (byte_idx + 1) * 100 / original_size;
            eprint!("\r[gru] decompressing... {pct}%");
        }
    }

    if original_size > 100_000 {
        eprintln!("\r[gru] decompressing... 100%");
    }

    output
}
452
/// Compress `data` bit-serially, blending the CM engine's prediction
/// with an LLM predictor via `meta_mixer` before arithmetic coding.
/// `neural_decompress` must mirror every predict/update call so both
/// sides keep identical model state.
#[cfg(feature = "neural")]
fn neural_compress(
    data: &[u8],
    config: CMConfig,
    llm: &mut datacortex_neural::LlmPredictor,
    meta_mixer: &mut datacortex_neural::MetaMixer,
) -> Vec<u8> {
    let mut engine = CMEngine::with_config(config);
    let mut encoder = ArithmeticEncoder::new();

    // Progress reporting every ~5%; interval 0 (tiny inputs) disables it.
    let total_bytes = data.len();
    let mut bytes_processed = 0;
    let report_interval = total_bytes / 20; for (byte_idx, &byte) in data.iter().enumerate() {
        for bpos in 0..8u8 {
            let bit = (byte >> (7 - bpos)) & 1;

            let p_cm = engine.predict();

            // Partial-byte context: a leading 1 bit followed by the bits
            // of `byte` already coded (MSB first); just 1 when bpos == 0.
            let partial = if bpos == 0 {
                1u32
            } else {
                let mut p = 1u32;
                for prev_bpos in 0..bpos {
                    let prev_bit = (byte >> (7 - prev_bpos)) & 1;
                    p = (p << 1) | prev_bit as u32;
                }
                p
            };
            let p_llm = llm.predict_bit(bpos, partial);

            let p_final = meta_mixer.blend(p_cm, p_llm);

            encoder.encode(bit, p_final);
            engine.update(bit);
            meta_mixer.update(bit);
        }

        // Feed the finished byte to the LLM. Failures are best-effort:
        // only the first few are logged to avoid flooding stderr.
        if let Err(e) = llm.predict_byte_probs(byte) {
            if byte_idx < 5 {
                eprintln!("[neural] LLM predict error at byte {byte_idx}: {e}");
            }
        }

        bytes_processed += 1;
        if report_interval > 0 && bytes_processed % report_interval == 0 {
            let pct = bytes_processed * 100 / total_bytes;
            eprint!("\r[neural] compressing... {pct}%");
        }
    }

    if total_bytes > 1000 {
        eprintln!("\r[neural] compressing... 100%");
    }

    encoder.finish()
}
533
/// Inverse of `neural_compress`: decode `original_size` bytes, driving
/// the CM engine, LLM predictor, and meta-mixer through the identical
/// sequence of predict/update calls the encoder performed.
#[cfg(feature = "neural")]
fn neural_decompress(
    compressed: &[u8],
    original_size: usize,
    config: CMConfig,
    llm: &mut datacortex_neural::LlmPredictor,
    meta_mixer: &mut datacortex_neural::MetaMixer,
) -> Vec<u8> {
    let mut engine = CMEngine::with_config(config);
    let mut decoder = ArithmeticDecoder::new(compressed);
    let mut output = Vec::with_capacity(original_size);

    // Progress reporting every ~5% (the else branch is unreachable in
    // practice since a zero-sized loop never reports).
    let report_interval = if original_size > 0 {
        original_size / 20
    } else {
        1
    };

    for byte_idx in 0..original_size {
        let mut byte_val: u8 = 0;

        for bpos in 0..8u8 {
            let p_cm = engine.predict();

            // Partial-byte context rebuilt from the bits decoded so far,
            // matching the encoder's construction bit-for-bit.
            let partial = if bpos == 0 {
                1u32
            } else {
                let mut p = 1u32;
                for prev_bpos in 0..bpos {
                    let prev_bit = (byte_val >> (7 - prev_bpos)) & 1;
                    p = (p << 1) | prev_bit as u32;
                }
                p
            };
            let p_llm = llm.predict_bit(bpos, partial);

            let p_final = meta_mixer.blend(p_cm, p_llm);

            let bit = decoder.decode(p_final);
            engine.update(bit);
            meta_mixer.update(bit);
            byte_val |= bit << (7 - bpos);
        }

        output.push(byte_val);

        // Feed the reconstructed byte to the LLM, mirroring the encoder;
        // failures are best-effort and only logged for the first few.
        if let Err(e) = llm.predict_byte_probs(byte_val) {
            if byte_idx < 5 {
                eprintln!("[neural] LLM predict error at byte {byte_idx}: {e}");
            }
        }

        if report_interval > 0 && (byte_idx + 1) % report_interval == 0 {
            let pct = (byte_idx + 1) * 100 / original_size;
            eprint!("\r[neural] decompressing... {pct}%");
        }
    }

    if original_size > 1000 {
        eprintln!("\r[neural] decompressing... 100%");
    }

    output
}
605
606fn cm_config_for_mode(mode: Mode) -> CMConfig {
608 match mode {
609 Mode::Max => CMConfig::max(),
610 Mode::Balanced => CMConfig::balanced(),
611 Mode::Fast => CMConfig::balanced(), }
613}
614
615#[cfg(feature = "neural")]
/// Resolve the GGUF model path for neural mode.
///
/// Precedence: the explicit path, then the `DATACORTEX_MODEL`
/// environment variable (empty string means "disabled"), then the
/// default location under `~/.datacortex`. A chosen path that does not
/// exist is logged and yields `None`.
fn resolve_model_path(explicit: Option<&str>) -> Option<String> {
    use std::path::Path;

    if let Some(path) = explicit {
        if Path::new(path).exists() {
            return Some(path.to_string());
        }
        eprintln!("[neural] explicit model path not found: {path}");
        return None;
    }

    if let Ok(env_path) = std::env::var("DATACORTEX_MODEL") {
        if env_path.is_empty() {
            return None;
        }
        if Path::new(&env_path).exists() {
            return Some(env_path);
        }
        eprintln!("[neural] DATACORTEX_MODEL path not found: {env_path}");
        return None;
    }

    std::env::var_os("HOME").and_then(|home| {
        let default = format!(
            "{}/.datacortex/models/SmolLM2-135M-Instruct-Q8_0.gguf",
            home.to_string_lossy()
        );
        Path::new(&default).exists().then_some(default)
    })
}
654
/// Compress `data` into the DCX container format and write it to
/// `output`. Convenience wrapper around `compress_with_model` with no
/// explicit neural model path.
pub fn compress<W: Write>(
    data: &[u8],
    mode: Mode,
    format_override: Option<FormatHint>,
    output: &mut W,
) -> io::Result<()> {
    compress_with_model(data, mode, format_override, None, output)
}
664
/// Like `compress`, but accepts an explicit neural model path (used
/// only by Max mode under the `neural` feature). Delegates to
/// `compress_with_options` with no zstd level override.
pub fn compress_with_model<W: Write>(
    data: &[u8],
    mode: Mode,
    format_override: Option<FormatHint>,
    model_path: Option<&str>,
    output: &mut W,
) -> io::Result<()> {
    compress_with_options(data, mode, format_override, model_path, None, output)
}
675
/// Compress `data` into a DCX container written to `output`.
///
/// Pipeline: detect (or accept an override for) the input format, CRC
/// the raw bytes, run format-specific preprocessing transforms, then
/// encode according to `mode`:
/// - `Fast`: races several zstd/brotli candidates (preprocessed vs raw
///   input, optional training dictionary, metadata embedded in the
///   payload vs carried in the header) and keeps the smallest total.
/// - `Balanced`: CM engine blended with a GRU model (`gru_compress`).
/// - `Max`: CM engine, optionally blended with an LLM predictor when
///   the `neural` feature is enabled and a model path resolves.
///
/// `zstd_level_override` pins the zstd level used by Fast mode;
/// `model_path` is consulted only by Max mode under the neural feature.
pub fn compress_with_options<W: Write>(
    data: &[u8],
    mode: Mode,
    format_override: Option<FormatHint>,
    model_path: Option<&str>,
    zstd_level_override: Option<i32>,
    output: &mut W,
) -> io::Result<()> {
    let format_hint = format_override.unwrap_or_else(|| detect_format(data));
    // CRC of the ORIGINAL bytes; verified after transform reversal on
    // decompression.
    let crc = crc32fast::hash(data);

    // `chain` records which transforms were applied so decompression
    // can reverse them.
    let (preprocessed, chain) = preprocess(data, format_hint, mode);
    let transform_metadata = if chain.is_empty() {
        vec![]
    } else {
        chain.serialize()
    };

    // Flags describing which Fast-mode candidate won; they end up in
    // the header so the decompressor picks the matching decoder.
    let mut use_dict = false;
    let mut use_brotli = false;
    let mut use_raw_fallback = false;
    let mut use_meta_embedded = false;
    let compressed = match mode {
        Mode::Fast => {
            let level = adaptive_fast_level(preprocessed.len(), zstd_level_override);

            // Candidate A: zstd over the preprocessed stream, upgraded
            // to the dictionary framing when that comes out smaller.
            let plain_a = zstd::bulk::compress(&preprocessed, level).map_err(io::Error::other)?;

            let (compressed_a, dict_a) = if preprocessed.len() >= DICT_MIN_DATA_SIZE {
                if let Some(dict_payload) = try_dict_compress(&preprocessed, level, plain_a.len()) {
                    (dict_payload, true)
                } else {
                    (plain_a, false)
                }
            } else {
                (plain_a, false)
            };

            // Size the transform metadata the same way the header writer
            // will (zstd level 19 when > 64 bytes and it helps), so the
            // candidate comparison accounts for header overhead fairly.
            let meta_size_for_comparison = if transform_metadata.len() > 64 {
                let compressed_meta = zstd::bulk::compress(&transform_metadata, 19)
                    .unwrap_or_else(|_| transform_metadata.clone());
                if compressed_meta.len() < transform_metadata.len() {
                    compressed_meta.len()
                } else {
                    transform_metadata.len()
                }
            } else {
                transform_metadata.len()
            };

            // 32 = fixed header overhead assumed in all total-size
            // comparisons below.
            let total_a = 32 + meta_size_for_comparison + compressed_a.len();

            // Candidate B: zstd over the RAW input — no transforms, so
            // no metadata needs to travel at all.
            let raw_level = adaptive_fast_level(data.len(), zstd_level_override);
            let compressed_b = zstd::bulk::compress(data, raw_level).map_err(io::Error::other)?;

            let total_b = 32 + compressed_b.len();

            // Running best: (payload, total size, dict/raw/brotli/embedded flags).
            let (
                mut best_compressed,
                mut best_total,
                mut best_dict,
                mut best_raw,
                mut best_brotli,
                mut best_embedded,
            ) = if total_b < total_a {
                (compressed_b, total_b, false, true, false, false)
            } else {
                (compressed_a, total_a, dict_a, false, false, false)
            };

            // Candidate C: brotli (TEXT mode) over the RAW input.
            // Quality 11 only for inputs up to 1 MiB; larger inputs use
            // 9 for speed.
            let brotli_quality = if data.len() <= 1_048_576 { 11 } else { 9 };
            if let Ok(brotli_raw) = brotli_compress(data, brotli_quality, BROTLI_MODE_TEXT) {
                let brotli_raw_total = 32 + brotli_raw.len();
                if brotli_raw_total < best_total {
                    best_compressed = brotli_raw;
                    best_total = brotli_raw_total;
                    best_dict = false;
                    best_raw = true;
                    best_brotli = true;
                    best_embedded = false;
                }
            }

            // Candidate D: brotli over the preprocessed stream, metadata
            // carried in the header. At max quality, 10 is also tried.
            {
                let brotli_prep_max_q = if preprocessed.len() <= 1_048_576 {
                    11
                } else {
                    9
                };
                let qualities = if brotli_prep_max_q == 11 {
                    &[11u32, 10][..]
                } else {
                    &[brotli_prep_max_q as u32][..]
                };
                for &q in qualities {
                    if let Ok(brotli_prep) = brotli_compress(&preprocessed, q, BROTLI_MODE_GENERIC)
                    {
                        let brotli_prep_total = 32 + meta_size_for_comparison + brotli_prep.len();
                        if brotli_prep_total < best_total {
                            best_compressed = brotli_prep;
                            best_total = brotli_prep_total;
                            best_dict = false;
                            best_raw = false;
                            best_brotli = true;
                            best_embedded = false;
                        }
                    }
                }
            }

            // Candidates E/F compress metadata TOGETHER with the stream:
            // payload = meta_len (u32 LE) | metadata | preprocessed.
            let embedded_payload = if !transform_metadata.is_empty() {
                let mut ep = Vec::with_capacity(4 + transform_metadata.len() + preprocessed.len());
                ep.extend_from_slice(&(transform_metadata.len() as u32).to_le_bytes());
                ep.extend_from_slice(&transform_metadata);
                ep.extend_from_slice(&preprocessed);
                Some(ep)
            } else {
                None
            };

            // Candidate E: brotli over the embedded payload.
            if let Some(ref embedded_payload) = embedded_payload {
                let embed_max_q = if embedded_payload.len() <= 1_048_576 {
                    11
                } else {
                    9
                };
                let qualities = if embed_max_q == 11 {
                    &[11u32, 10][..]
                } else {
                    &[embed_max_q as u32][..]
                };
                for &q in qualities {
                    if let Ok(brotli_embedded) =
                        brotli_compress(embedded_payload, q, BROTLI_MODE_GENERIC)
                    {
                        let brotli_embedded_total = 32 + brotli_embedded.len();
                        if brotli_embedded_total < best_total {
                            best_compressed = brotli_embedded;
                            best_total = brotli_embedded_total;
                            best_dict = false;
                            best_raw = false;
                            best_brotli = true;
                            best_embedded = true;
                        }
                    }
                }
            }

            // Candidate F: zstd over the embedded payload.
            if let Some(ref embedded_payload) = embedded_payload {
                let embed_level = adaptive_fast_level(embedded_payload.len(), zstd_level_override);
                if let Ok(zstd_embedded) = zstd::bulk::compress(embedded_payload, embed_level) {
                    let zstd_embedded_total = 32 + zstd_embedded.len();
                    if zstd_embedded_total < best_total {
                        best_compressed = zstd_embedded;
                        best_total = zstd_embedded_total;
                        best_dict = false;
                        best_raw = false;
                        best_brotli = false;
                        best_embedded = true;
                    }
                }
            }

            // best_total was only needed for the comparisons above;
            // publish the winning flags to the outer scope.
            let _ = best_total; use_dict = best_dict;
            use_raw_fallback = best_raw;
            use_brotli = best_brotli;
            use_meta_embedded = best_embedded;
            best_compressed
        }
        Mode::Balanced => {
            // CM+GRU bitstream prefixed with the preprocessed length
            // (u64 LE) so the decoder knows how many bytes to emit.
            let config = cm_config_for_mode(mode);
            let cm_data = gru_compress(&preprocessed, config);
            let mut payload = Vec::with_capacity(8 + cm_data.len());
            payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
            payload.extend_from_slice(&cm_data);
            payload
        }
        Mode::Max => {
            let config = cm_config_for_mode(mode);

            #[cfg(feature = "neural")]
            {
                if let Some(mpath) = resolve_model_path(model_path) {
                    match datacortex_neural::LlmPredictor::new(&mpath) {
                        Ok(mut llm) => {
                            let mut meta_mixer = datacortex_neural::MetaMixer::new(5);
                            eprintln!(
                                "[neural] Max mode: dual-path CM+LLM ({} bytes mapped)",
                                llm.mapped_bytes()
                            );
                            let cm_data =
                                neural_compress(&preprocessed, config, &mut llm, &mut meta_mixer);
                            let mut payload = Vec::with_capacity(8 + cm_data.len());
                            // Bit 63 of the size prefix marks a neural
                            // bitstream so the decompressor knows an LLM
                            // is required to decode it.
                            let size_with_flag = preprocessed.len() as u64 | (1u64 << 63);
                            payload.extend_from_slice(&size_with_flag.to_le_bytes());
                            payload.extend_from_slice(&cm_data);
                            payload
                        }
                        Err(e) => {
                            // LLM unavailable: fall back to CM-only with a
                            // plain (flag-free) size prefix.
                            eprintln!("[neural] LLM init failed, falling back to CM-only: {e}");
                            let cm_data = cm_compress(&preprocessed, config);
                            let mut payload = Vec::with_capacity(8 + cm_data.len());
                            payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
                            payload.extend_from_slice(&cm_data);
                            payload
                        }
                    }
                } else {
                    eprintln!(
                        "[neural] no model found, Max mode using CM-only. \
                        Set DATACORTEX_MODEL or use --model-path."
                    );
                    let cm_data = cm_compress(&preprocessed, config);
                    let mut payload = Vec::with_capacity(8 + cm_data.len());
                    payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
                    payload.extend_from_slice(&cm_data);
                    payload
                }
            }

            #[cfg(not(feature = "neural"))]
            {
                let _ = model_path; let cm_data = cm_compress(&preprocessed, config);
                let mut payload = Vec::with_capacity(8 + cm_data.len());
                payload.extend_from_slice(&(preprocessed.len() as u64).to_le_bytes());
                payload.extend_from_slice(&cm_data);
                payload
            }
        }
    };

    // Raw-fallback and embedded winners carry no metadata in the header
    // (raw needs none; embedded already carries it inside the payload).
    let final_metadata = if use_raw_fallback || use_meta_embedded {
        vec![]
    } else {
        transform_metadata
    };

    // Compress header metadata when it is large enough to benefit; the
    // flag tells the decompressor which form it received.
    let (header_metadata, meta_compressed) = if final_metadata.len() > 64 {
        let compressed_meta =
            zstd::bulk::compress(&final_metadata, 19).unwrap_or_else(|_| final_metadata.clone());
        if compressed_meta.len() < final_metadata.len() {
            (compressed_meta, true)
        } else {
            (final_metadata, false)
        }
    } else {
        (final_metadata, false)
    };

    let header = DcxHeader {
        mode,
        format_hint,
        original_size: data.len() as u64,
        compressed_size: compressed.len() as u64,
        crc32: crc,
        transform_metadata: header_metadata,
        has_dict: use_dict,
        meta_compressed,
        use_brotli,
        meta_embedded: use_meta_embedded,
    };

    header.write_to(output)?;
    output.write_all(&compressed)?;

    Ok(())
}
1003
/// Decompress a DCX stream from `input` using default neural-model
/// resolution (env var / default location, if needed at all).
pub fn decompress<R: Read>(input: &mut R) -> io::Result<Vec<u8>> {
    decompress_with_model(input, None)
}
1008
/// Decompress a DCX stream read from `input`.
///
/// Reads the header, decodes the payload according to the recorded
/// mode/flags, peels off embedded or header-carried transform metadata,
/// reverses the preprocessing transforms, and finally verifies both the
/// CRC-32 and the original size stored in the header.
///
/// `model_path` is consulted only when the payload was written by the
/// neural Max-mode path (size-prefix bit 63 set).
pub fn decompress_with_model<R: Read>(
    input: &mut R,
    model_path: Option<&str>,
) -> io::Result<Vec<u8>> {
    let header = DcxHeader::read_from(input)?;

    let mut compressed = vec![0u8; header.compressed_size as usize];
    input.read_exact(&mut compressed)?;

    // Stage 1: undo the backend coding selected by the mode flags.
    let preprocessed = match header.mode {
        Mode::Fast => {
            if header.use_brotli {
                brotli_decompress(&compressed)?
            } else {
                // zstd capacity bound with headroom — presumably because
                // the preprocessed stream can exceed original_size.
                let capacity = header.original_size as usize * 2 + 65536;
                if header.has_dict {
                    decompress_with_dict(&compressed, capacity)?
                } else {
                    zstd::bulk::decompress(&compressed, capacity)
                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
                }
            }
        }
        Mode::Balanced => {
            // Payload = preprocessed length (u64 LE, bit 63 masked off)
            // followed by the CM+GRU bitstream.
            if compressed.len() < 8 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "CM mode compressed data too short",
                ));
            }
            let size_raw = u64::from_le_bytes(compressed[..8].try_into().expect("8-byte slice"));
            let preprocessed_size = (size_raw & !(1u64 << 63)) as usize;
            let config = cm_config_for_mode(header.mode);
            gru_decompress(&compressed[8..], preprocessed_size, config)
        }
        Mode::Max => {
            if compressed.len() < 8 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "CM mode compressed data too short",
                ));
            }
            let size_raw = u64::from_le_bytes(compressed[..8].try_into().expect("8-byte slice"));

            // Bit 63 set by the compressor means an LLM is required.
            let neural_flag = size_raw & (1u64 << 63) != 0;
            let preprocessed_size = (size_raw & !(1u64 << 63)) as usize;
            let config = cm_config_for_mode(header.mode);

            if neural_flag {
                #[cfg(feature = "neural")]
                {
                    if let Some(mpath) = resolve_model_path(model_path) {
                        match datacortex_neural::LlmPredictor::new(&mpath) {
                            Ok(mut llm) => {
                                // Mixer sized 5 — must match the encoder.
                                let mut meta_mixer = datacortex_neural::MetaMixer::new(5);
                                eprintln!(
                                    "[neural] decompressing with dual-path CM+LLM ({} bytes mapped)",
                                    llm.mapped_bytes()
                                );
                                neural_decompress(
                                    &compressed[8..],
                                    preprocessed_size,
                                    config,
                                    &mut llm,
                                    &mut meta_mixer,
                                )
                            }
                            Err(e) => {
                                return Err(io::Error::new(
                                    io::ErrorKind::Other,
                                    format!(
                                        "file was compressed with neural mode but LLM failed to load: {e}"
                                    ),
                                ));
                            }
                        }
                    } else {
                        return Err(io::Error::new(
                            io::ErrorKind::Other,
                            "file was compressed with neural mode but no model found. \
                             Set DATACORTEX_MODEL or use --model-path.",
                        ));
                    }
                }

                #[cfg(not(feature = "neural"))]
                {
                    let _ = model_path;
                    return Err(io::Error::other(
                        "file was compressed with neural mode but this build lacks the \
                         `neural` feature. Rebuild with --features neural.",
                    ));
                }
            } else {
                cm_decompress(&compressed[8..], preprocessed_size, config)
            }
        }
    };

    // Stage 2: recover the transform metadata, either embedded at the
    // front of the decoded stream (meta_len u32 LE | metadata | data)
    // or carried — possibly zstd-compressed — in the header.
    let (preprocessed, transform_metadata) = if header.meta_embedded {
        if preprocessed.len() < 4 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "embedded metadata: decompressed stream too short for meta_len",
            ));
        }
        let meta_len =
            u32::from_le_bytes(preprocessed[0..4].try_into().expect("4-byte slice")) as usize;
        if preprocessed.len() < 4 + meta_len {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!(
                    "embedded metadata: stream too short for metadata ({} bytes needed, {} available)",
                    4 + meta_len,
                    preprocessed.len()
                ),
            ));
        }
        let metadata = preprocessed[4..4 + meta_len].to_vec();
        let actual_preprocessed = preprocessed[4 + meta_len..].to_vec();
        (actual_preprocessed, metadata)
    } else {
        let tm = if header.meta_compressed && !header.transform_metadata.is_empty() {
            let mut decoder =
                zstd::Decoder::new(Cursor::new(&header.transform_metadata)).map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::InvalidData,
                        format!("failed to init metadata decompressor: {e}"),
                    )
                })?;
            let mut decompressed_meta = Vec::new();
            decoder.read_to_end(&mut decompressed_meta).map_err(|e| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("failed to decompress transform metadata: {e}"),
                )
            })?;
            decompressed_meta
        } else {
            header.transform_metadata.clone()
        };
        (preprocessed, tm)
    };

    // Stage 3: reverse the preprocessing transforms, if any were applied.
    let data = if transform_metadata.is_empty() {
        preprocessed
    } else {
        let chain = TransformChain::deserialize(&transform_metadata)?;
        reverse_preprocess(&preprocessed, &chain)
    };

    // Integrity checks against the header.
    let crc = crc32fast::hash(&data);
    if crc != header.crc32 {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!(
                "CRC-32 mismatch: expected {:#010X}, got {:#010X}",
                header.crc32, crc
            ),
        ));
    }

    if data.len() as u64 != header.original_size {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!(
                "size mismatch: header says {} bytes, got {}",
                header.original_size,
                data.len()
            ),
        ));
    }

    Ok(data)
}
1197
/// Convenience: compress into a freshly allocated `Vec<u8>` instead of
/// a writer.
pub fn compress_to_vec(
    data: &[u8],
    mode: Mode,
    format_override: Option<FormatHint>,
) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    compress(data, mode, format_override, &mut buf)?;
    Ok(buf)
}
1208
/// Convenience: `compress_with_model` into a freshly allocated
/// `Vec<u8>`.
pub fn compress_to_vec_with_model(
    data: &[u8],
    mode: Mode,
    format_override: Option<FormatHint>,
    model_path: Option<&str>,
) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    compress_with_model(data, mode, format_override, model_path, &mut buf)?;
    Ok(buf)
}
1220
/// Convenience: `compress_with_options` into a freshly allocated
/// `Vec<u8>`.
pub fn compress_to_vec_with_options(
    data: &[u8],
    mode: Mode,
    format_override: Option<FormatHint>,
    model_path: Option<&str>,
    zstd_level_override: Option<i32>,
) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    compress_with_options(
        data,
        mode,
        format_override,
        model_path,
        zstd_level_override,
        &mut buf,
    )?;
    Ok(buf)
}
1240
1241pub fn decompress_from_slice(dcx_data: &[u8]) -> io::Result<Vec<u8>> {
1243 let mut cursor = Cursor::new(dcx_data);
1244 decompress(&mut cursor)
1245}
1246
/// Read and parse only the DCX header, leaving `input` positioned at
/// the start of the compressed payload.
pub fn read_header<R: Read>(input: &mut R) -> io::Result<DcxHeader> {
    DcxHeader::read_from(input)
}
1251
/// Plain zstd compression with no DCX framing (escape hatch for callers
/// that want the raw codec).
pub fn raw_zstd_compress(data: &[u8], level: i32) -> io::Result<Vec<u8>> {
    zstd::bulk::compress(data, level).map_err(io::Error::other)
}
1256
1257#[cfg(test)]
1258mod tests {
1259 use super::*;
1260
1261 #[test]
1262 fn fast_mode_roundtrip() {
1263 let original = b"Hello, DataCortex! This is a test of Fast mode compression.";
1264 let compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1265 let decompressed = decompress_from_slice(&compressed).unwrap();
1266 assert_eq!(decompressed, original);
1267 }
1268
1269 #[test]
1270 fn fast_mode_json_roundtrip() {
1271 let data = br#"{"name":"Alice","age":30,"name":"Bob","age":25,"name":"Carol","age":35}"#;
1272 let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1273 let decompressed = decompress_from_slice(&compressed).unwrap();
1274 assert_eq!(decompressed, data.to_vec());
1275 }
1276
1277 #[test]
1278 fn balanced_mode_roundtrip() {
1279 let original = b"Balanced mode test data with some content.";
1280 let compressed = compress_to_vec(original, Mode::Balanced, None).unwrap();
1281 let decompressed = decompress_from_slice(&compressed).unwrap();
1282 assert_eq!(decompressed, original);
1283 }
1284
1285 #[test]
1286 fn balanced_mode_longer_text() {
1287 let original = b"The quick brown fox jumps over the lazy dog. This sentence contains every letter of the English alphabet at least once. We need enough data to properly exercise the arithmetic coder and order-0 model.";
1288 let compressed = compress_to_vec(original, Mode::Balanced, None).unwrap();
1289 let decompressed = decompress_from_slice(&compressed).unwrap();
1290 assert_eq!(decompressed, original);
1291 }
1292
1293 #[test]
1294 fn balanced_mode_repetitive_data() {
1295 let data = "hello world! ".repeat(100);
1296 let compressed = compress_to_vec(data.as_bytes(), Mode::Balanced, None).unwrap();
1297 let decompressed = decompress_from_slice(&compressed).unwrap();
1298 assert_eq!(decompressed, data.as_bytes());
1299 }
1300
1301 #[test]
1302 fn balanced_mode_all_byte_values() {
1303 let original: Vec<u8> = (0..=255).collect();
1304 let compressed = compress_to_vec(&original, Mode::Balanced, None).unwrap();
1305 let decompressed = decompress_from_slice(&compressed).unwrap();
1306 assert_eq!(decompressed, original);
1307 }
1308
1309 #[test]
1310 fn balanced_mode_single_byte() {
1311 let original = b"X";
1312 let compressed = compress_to_vec(original, Mode::Balanced, None).unwrap();
1313 let decompressed = decompress_from_slice(&compressed).unwrap();
1314 assert_eq!(decompressed, original);
1315 }
1316
1317 #[test]
1318 fn balanced_mode_json_roundtrip() {
1319 let data = br#"{"name":"Alice","age":30,"name":"Bob","age":25,"name":"Carol","age":35}"#;
1320 let compressed = compress_to_vec(data, Mode::Balanced, Some(FormatHint::Json)).unwrap();
1321 let decompressed = decompress_from_slice(&compressed).unwrap();
1322 assert_eq!(decompressed, data.to_vec());
1323 }
1324
1325 #[test]
1326 fn empty_data_roundtrip() {
1327 let original = b"";
1328 for mode in [Mode::Fast, Mode::Balanced, Mode::Max] {
1329 let compressed = compress_to_vec(original, mode, None).unwrap();
1330 let decompressed = decompress_from_slice(&compressed).unwrap();
1331 assert_eq!(decompressed, original, "failed for mode {mode}");
1332 }
1333 }
1334
1335 #[test]
1336 fn crc_mismatch_detected() {
1337 let original = b"test data for CRC check";
1338 let mut compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1339 let header_size = 32; if compressed.len() > header_size + 5 {
1342 compressed[header_size + 3] ^= 0xFF;
1343 }
1344 assert!(decompress_from_slice(&compressed).is_err());
1345 }
1346
1347 #[test]
1348 fn fast_mode_actually_compresses() {
1349 let data = "hello world. ".repeat(100);
1351 let compressed = compress_to_vec(data.as_bytes(), Mode::Fast, None).unwrap();
1352 assert!(
1353 compressed.len() < data.len(),
1354 "Fast mode should compress repetitive data: {} vs {}",
1355 compressed.len(),
1356 data.len()
1357 );
1358 }
1359
1360 #[test]
1361 fn json_preprocessing_improves_fast_mode() {
1362 let data = br#"[{"name":"Alice","score":95},{"name":"Bob","score":87},{"name":"Carol","score":92},{"name":"Dave","score":88},{"name":"Eve","score":91}]"#;
1363 let with_preprocess = compress_to_vec(data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1364 let without_preprocess =
1365 compress_to_vec(data, Mode::Fast, Some(FormatHint::Generic)).unwrap();
1366
1367 assert_eq!(
1369 decompress_from_slice(&with_preprocess).unwrap(),
1370 data.to_vec()
1371 );
1372 assert_eq!(
1373 decompress_from_slice(&without_preprocess).unwrap(),
1374 data.to_vec()
1375 );
1376 }
1377
1378 #[test]
1379 fn all_modes_roundtrip() {
1380 let data = b"test all modes with some more content to ensure decent compression";
1381 for mode in [Mode::Max, Mode::Balanced, Mode::Fast] {
1382 let compressed = compress_to_vec(data, mode, None).unwrap();
1383 let decompressed = decompress_from_slice(&compressed).unwrap();
1384 assert_eq!(decompressed, data, "failed for mode {mode}");
1385 }
1386 }
1387
1388 #[test]
1389 fn cm_compress_decompress_direct() {
1390 let data = b"Hello, World! This is a direct CM test.";
1391 let compressed = cm_compress(data, CMConfig::balanced());
1392 let decompressed = cm_decompress(&compressed, data.len(), CMConfig::balanced());
1393 assert_eq!(decompressed, data.to_vec());
1394 }
1395
1396 #[test]
1397 fn cm_empty() {
1398 let data: &[u8] = b"";
1399 let compressed = cm_compress(data, CMConfig::balanced());
1400 let decompressed = cm_decompress(&compressed, 0, CMConfig::balanced());
1401 assert!(decompressed.is_empty());
1402 }
1403
1404 #[test]
1405 fn cm_single_byte() {
1406 for byte in 0..=255u8 {
1407 let data = [byte];
1408 let compressed = cm_compress(&data, CMConfig::balanced());
1409 let decompressed = cm_decompress(&compressed, 1, CMConfig::balanced());
1410 assert_eq!(
1411 decompressed, data,
1412 "CM roundtrip failed for byte {byte:#04X}"
1413 );
1414 }
1415 }
1416
1417 #[test]
1418 fn cm_repetitive_compresses() {
1419 let data = vec![b'A'; 1000];
1420 let compressed = cm_compress(&data, CMConfig::balanced());
1421 assert!(
1423 compressed.len() < 200,
1424 "CM should compress 1000 identical bytes well: {} bytes",
1425 compressed.len()
1426 );
1427 let decompressed = cm_decompress(&compressed, data.len(), CMConfig::balanced());
1428 assert_eq!(decompressed, data);
1429 }
1430
1431 #[test]
1432 fn max_mode_roundtrip() {
1433 let original = b"Max mode test data with some content for compression.";
1434 let compressed = compress_to_vec(original, Mode::Max, None).unwrap();
1435 let decompressed = decompress_from_slice(&compressed).unwrap();
1436 assert_eq!(decompressed, original);
1437 }
1438
1439 #[test]
1440 fn max_mode_longer_text() {
1441 let original = b"The quick brown fox jumps over the lazy dog. Max mode uses 2x context maps for better predictions with fewer hash collisions. This should compress slightly better than balanced mode.";
1442 let compressed = compress_to_vec(original, Mode::Max, None).unwrap();
1443 let decompressed = decompress_from_slice(&compressed).unwrap();
1444 assert_eq!(decompressed, original);
1445 }
1446
1447 #[test]
1450 fn test_dict_compress_roundtrip() {
1451 let mut ndjson = String::new();
1454 for i in 0..500 {
1455 ndjson.push_str(&format!(
1456 r#"{{"id":{},"name":"user_{}","status":"active","score":{}}}"#,
1457 i,
1458 i,
1459 i * 17 % 100
1460 ));
1461 ndjson.push('\n');
1462 }
1463 let data = ndjson.as_bytes();
1464 assert!(
1465 data.len() > DICT_MIN_DATA_SIZE,
1466 "test data should exceed dict threshold: {} bytes",
1467 data.len()
1468 );
1469
1470 let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1471 let decompressed = decompress_from_slice(&compressed).unwrap();
1472 assert_eq!(
1473 decompressed, data,
1474 "dict compress roundtrip: byte-exact mismatch"
1475 );
1476 }
1477
1478 #[test]
1479 fn test_dict_falls_back_on_small() {
1480 let data = b"small data that won't trigger dictionary training";
1482 assert!(data.len() < DICT_MIN_DATA_SIZE);
1483
1484 let compressed = compress_to_vec(data, Mode::Fast, None).unwrap();
1485 let decompressed = decompress_from_slice(&compressed).unwrap();
1486 assert_eq!(decompressed, data.to_vec());
1487
1488 let mut cursor = Cursor::new(&compressed);
1490 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1491 assert!(!header.has_dict, "small data should not have dict flag set");
1492 }
1493
1494 #[test]
1495 fn test_dict_backward_compat() {
1496 let original = b"backward compatibility test data for decompression";
1499 let compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1500
1501 let mut cursor = Cursor::new(&compressed);
1503 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1504 assert!(!header.has_dict);
1505
1506 let decompressed = decompress_from_slice(&compressed).unwrap();
1508 assert_eq!(decompressed, original.to_vec());
1509 }
1510
1511 #[test]
1512 fn test_dict_ndjson_large_roundtrip() {
1513 let mut ndjson = String::new();
1515 for i in 0..2000 {
1516 ndjson.push_str(&format!(
1517 r#"{{"timestamp":"2025-01-{:02}T{:02}:{:02}:00Z","level":"info","message":"Request processed","request_id":"req_{}","duration_ms":{}}}"#,
1518 (i % 28) + 1,
1519 i % 24,
1520 i % 60,
1521 i,
1522 (i * 13) % 500
1523 ));
1524 ndjson.push('\n');
1525 }
1526 let data = ndjson.as_bytes();
1527
1528 let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1529 let decompressed = decompress_from_slice(&compressed).unwrap();
1530 assert_eq!(decompressed, data, "large NDJSON roundtrip mismatch");
1531 }
1532
1533 #[test]
1534 fn test_dict_generic_data_roundtrip() {
1535 let mut data = Vec::new();
1538 for i in 0..3000 {
1539 data.extend_from_slice(
1540 format!("line {i}: the quick brown fox jumps over the lazy dog\n").as_bytes(),
1541 );
1542 }
1543 assert!(data.len() > DICT_MIN_DATA_SIZE);
1544
1545 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Generic)).unwrap();
1546 let decompressed = decompress_from_slice(&compressed).unwrap();
1547 assert_eq!(decompressed, data, "generic data dict roundtrip mismatch");
1548 }
1549
1550 #[test]
1551 fn test_dict_does_not_affect_other_modes() {
1552 let mut ndjson = String::new();
1555 for i in 0..200 {
1556 ndjson.push_str(&format!(
1557 r#"{{"id":{},"name":"user_{}","status":"active"}}"#,
1558 i, i
1559 ));
1560 ndjson.push('\n');
1561 }
1562 let data = ndjson.as_bytes();
1563
1564 for mode in [Mode::Balanced, Mode::Max] {
1565 let compressed = compress_to_vec(data, mode, Some(FormatHint::Ndjson)).unwrap();
1566 let mut cursor = Cursor::new(&compressed);
1567 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1568 assert!(!header.has_dict, "mode {mode} should never have dict flag");
1569 let decompressed = decompress_from_slice(&compressed).unwrap();
1570 assert_eq!(decompressed, data, "roundtrip failed for mode {mode}");
1571 }
1572 }
1573
1574 #[test]
1577 fn test_compress_with_level() {
1578 let data = "hello world, compressing with custom zstd level. ".repeat(50);
1580 let compressed =
1581 compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, Some(19))
1582 .unwrap();
1583 let decompressed = decompress_from_slice(&compressed).unwrap();
1584 assert_eq!(decompressed, data.as_bytes(), "level 19 roundtrip failed");
1585 }
1586
1587 #[test]
1588 fn test_compress_with_level_default() {
1589 let data = "default level test data. ".repeat(50);
1591 let compressed =
1592 compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, None).unwrap();
1593 let decompressed = decompress_from_slice(&compressed).unwrap();
1594 assert_eq!(
1595 decompressed,
1596 data.as_bytes(),
1597 "default level roundtrip failed"
1598 );
1599 }
1600
1601 #[test]
1602 fn test_compress_with_level_higher_ratio() {
1603 let data = r#"{"name":"Alice","score":95}"#.repeat(200);
1605 let low =
1606 compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, Some(1)).unwrap();
1607 let high = compress_to_vec_with_options(data.as_bytes(), Mode::Fast, None, None, Some(19))
1608 .unwrap();
1609
1610 assert_eq!(decompress_from_slice(&low).unwrap(), data.as_bytes());
1612 assert_eq!(decompress_from_slice(&high).unwrap(), data.as_bytes());
1613
1614 assert!(
1616 high.len() <= low.len(),
1617 "level 19 ({}) should be <= level 1 ({})",
1618 high.len(),
1619 low.len()
1620 );
1621 }
1622
1623 #[test]
1626 fn test_auto_fallback_picks_smaller() {
1627 let data = std::fs::read(concat!(
1631 env!("CARGO_MANIFEST_DIR"),
1632 "/../../corpus/json-bench/citm_catalog.json"
1633 ))
1634 .unwrap();
1635
1636 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1637 let decompressed = decompress_from_slice(&compressed).unwrap();
1638 assert_eq!(decompressed, data, "citm_catalog roundtrip failed");
1639
1640 let ratio = data.len() as f64 / compressed.len() as f64;
1642 assert!(
1643 ratio > 50.0,
1644 "citm_catalog should achieve >50x, got {ratio:.1}x"
1645 );
1646 }
1647
1648 #[test]
1649 fn test_auto_fallback_preprocessed_wins_on_ndjson() {
1650 let data = std::fs::read(concat!(
1653 env!("CARGO_MANIFEST_DIR"),
1654 "/../../corpus/test-ndjson.ndjson"
1655 ))
1656 .unwrap();
1657
1658 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1659 let decompressed = decompress_from_slice(&compressed).unwrap();
1660 assert_eq!(decompressed, data, "test-ndjson roundtrip failed");
1661
1662 let mut cursor = Cursor::new(&compressed);
1665 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1666 assert!(
1667 !header.transform_metadata.is_empty() || header.meta_embedded,
1668 "test-ndjson should prefer preprocessed path (non-empty transform metadata or embedded)"
1669 );
1670 }
1671
1672 #[test]
1673 fn test_auto_fallback_roundtrip() {
1674 let citm = std::fs::read(concat!(
1677 env!("CARGO_MANIFEST_DIR"),
1678 "/../../corpus/json-bench/citm_catalog.json"
1679 ))
1680 .unwrap();
1681 let ndjson = std::fs::read(concat!(
1682 env!("CARGO_MANIFEST_DIR"),
1683 "/../../corpus/test-ndjson.ndjson"
1684 ))
1685 .unwrap();
1686
1687 let compressed_citm = compress_to_vec(&citm, Mode::Fast, Some(FormatHint::Json)).unwrap();
1689 let decompressed_citm = decompress_from_slice(&compressed_citm).unwrap();
1690 assert_eq!(
1691 decompressed_citm, citm,
1692 "citm_catalog roundtrip (raw path) failed"
1693 );
1694
1695 let compressed_ndjson =
1697 compress_to_vec(&ndjson, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1698 let decompressed_ndjson = decompress_from_slice(&compressed_ndjson).unwrap();
1699 assert_eq!(
1700 decompressed_ndjson, ndjson,
1701 "test-ndjson roundtrip (preprocessed path) failed"
1702 );
1703 }
1704
1705 #[test]
1708 fn test_adaptive_level_small_data() {
1709 assert_eq!(adaptive_fast_level(100_000, None), 19);
1711 assert_eq!(adaptive_fast_level(500_000, None), 19);
1712 assert_eq!(adaptive_fast_level(1_048_576, None), 19);
1713 assert_eq!(adaptive_fast_level(0, None), 19);
1714 }
1715
1716 #[test]
1717 fn test_adaptive_level_large_data() {
1718 assert_eq!(adaptive_fast_level(1_048_577, None), 13);
1720 assert_eq!(adaptive_fast_level(5_000_000, None), 13);
1721 assert_eq!(adaptive_fast_level(10_485_760, None), 13);
1722 assert_eq!(adaptive_fast_level(10_485_761, None), 9);
1723 assert_eq!(adaptive_fast_level(100_000_000, None), 9);
1724 }
1725
1726 #[test]
1727 fn test_adaptive_level_override() {
1728 assert_eq!(adaptive_fast_level(100, Some(3)), 3);
1730 assert_eq!(adaptive_fast_level(100_000_000, Some(22)), 22);
1731 assert_eq!(adaptive_fast_level(0, Some(1)), 1);
1732 }
1733
1734 #[test]
1737 fn test_compressed_metadata_roundtrip() {
1738 let mut ndjson = String::new();
1740 for i in 0..500 {
1741 ndjson.push_str(&format!(
1742 r#"{{"id":{},"name":"user_{}","status":"active","score":{}}}"#,
1743 i,
1744 i,
1745 i * 17 % 100
1746 ));
1747 ndjson.push('\n');
1748 }
1749 let data = ndjson.as_bytes();
1750
1751 let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1752 let decompressed = decompress_from_slice(&compressed).unwrap();
1753 assert_eq!(
1754 decompressed, data,
1755 "compressed metadata roundtrip: byte-exact mismatch"
1756 );
1757
1758 let mut cursor = Cursor::new(&compressed);
1760 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1761 if !header.transform_metadata.is_empty() && header.transform_metadata.len() > 10 {
1763 }
1767 }
1768
1769 #[test]
1770 fn test_compressed_metadata_backward_compat() {
1771 let original = b"backward compatibility test data for metadata decompression";
1774 let compressed = compress_to_vec(original, Mode::Fast, None).unwrap();
1775
1776 let decompressed = decompress_from_slice(&compressed).unwrap();
1778 assert_eq!(decompressed, original.to_vec());
1779
1780 let mut cursor = Cursor::new(&compressed);
1782 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1783 assert!(!header.meta_compressed || !header.transform_metadata.is_empty());
1785 }
1786
1787 #[test]
1788 fn test_compressed_metadata_small_skipped() {
1789 let data = br#"{"name":"Alice","age":30}"#;
1792 let compressed = compress_to_vec(data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1793 let decompressed = decompress_from_slice(&compressed).unwrap();
1794 assert_eq!(decompressed, data.to_vec());
1795
1796 let mut cursor = Cursor::new(&compressed);
1797 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1798 if header.transform_metadata.len() <= 64 {
1800 assert!(
1801 !header.meta_compressed,
1802 "metadata <= 64 bytes should not be compressed, but meta_compressed=true \
1803 for {} bytes of metadata",
1804 header.transform_metadata.len()
1805 );
1806 }
1807 }
1808
1809 #[test]
1810 fn test_twitter_json_brotli_wins() {
1811 let data = std::fs::read(concat!(
1814 env!("CARGO_MANIFEST_DIR"),
1815 "/../../corpus/json-bench/twitter.json"
1816 ))
1817 .unwrap();
1818
1819 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1820 let decompressed = decompress_from_slice(&compressed).unwrap();
1821 assert_eq!(decompressed, data, "twitter.json roundtrip failed");
1822
1823 let mut cursor = Cursor::new(&compressed);
1825 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1826 assert!(
1827 header.use_brotli,
1828 "twitter.json should use brotli (FLAG_BROTLI set in header)"
1829 );
1830 }
1831
1832 #[test]
1833 fn test_compressed_metadata_all_modes_roundtrip() {
1834 let mut ndjson = String::new();
1836 for i in 0..200 {
1837 ndjson.push_str(&format!(
1838 r#"{{"id":{},"name":"user_{}","status":"active"}}"#,
1839 i, i
1840 ));
1841 ndjson.push('\n');
1842 }
1843 let data = ndjson.as_bytes();
1844
1845 for mode in [Mode::Fast, Mode::Balanced, Mode::Max] {
1846 let compressed = compress_to_vec(data, mode, Some(FormatHint::Ndjson)).unwrap();
1847 let decompressed = decompress_from_slice(&compressed).unwrap();
1848 assert_eq!(
1849 decompressed, data,
1850 "compressed metadata roundtrip failed for mode {mode}"
1851 );
1852 }
1853 }
1854
1855 #[test]
1858 fn test_brotli_compress_roundtrip() {
1859 let data = b"Hello, brotli! This is a test of the brotli compression helpers.";
1861 let compressed = brotli_compress(data, 11, BROTLI_MODE_GENERIC).unwrap();
1862 let decompressed = brotli_decompress(&compressed).unwrap();
1863 assert_eq!(decompressed, data.to_vec());
1864 }
1865
1866 #[test]
1867 fn test_brotli_auto_fallback_twitter() {
1868 let data = std::fs::read(concat!(
1870 env!("CARGO_MANIFEST_DIR"),
1871 "/../../corpus/json-bench/twitter.json"
1872 ))
1873 .unwrap();
1874
1875 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1876 let decompressed = decompress_from_slice(&compressed).unwrap();
1877 assert_eq!(decompressed, data, "twitter.json brotli roundtrip failed");
1878
1879 let mut cursor = Cursor::new(&compressed);
1880 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
1881 assert!(
1882 header.use_brotli,
1883 "twitter.json should use brotli in auto-fallback"
1884 );
1885 }
1886
1887 #[test]
1888 fn test_brotli_ndjson_roundtrip() {
1889 let data = std::fs::read(concat!(
1892 env!("CARGO_MANIFEST_DIR"),
1893 "/../../corpus/test-ndjson.ndjson"
1894 ))
1895 .unwrap();
1896
1897 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
1898 let decompressed = decompress_from_slice(&compressed).unwrap();
1899 assert_eq!(decompressed, data, "ndjson roundtrip failed");
1900 }
1901
    /// Hand-build a pre-brotli archive (plain zstd payload, FLAG_BROTLI clear)
    /// and verify the current decoder still reads it.
    #[test]
    fn test_brotli_backward_compat() {
        let original = b"backward compatibility test: this data was compressed without brotli";
        let crc = crc32fast::hash(original);
        // Legacy archives carried a plain zstd-compressed payload.
        let zstd_compressed = zstd::bulk::compress(original, 19).unwrap();

        // Construct the header exactly as an old writer would: no dict, no
        // metadata, and use_brotli explicitly false.
        let header = crate::dcx::DcxHeader {
            mode: Mode::Fast,
            format_hint: crate::dcx::FormatHint::Generic,
            original_size: original.len() as u64,
            compressed_size: zstd_compressed.len() as u64,
            crc32: crc,
            transform_metadata: vec![],
            has_dict: false,
            meta_compressed: false,
            use_brotli: false,
            meta_embedded: false,
        };

        let mut buf = Vec::new();
        header.write_to(&mut buf).unwrap();
        buf.extend_from_slice(&zstd_compressed);

        // Byte 7 of the serialized header holds the flag bits; FLAG_BROTLI
        // must be clear in this legacy layout.
        assert_eq!(buf[7] & crate::dcx::FLAG_BROTLI, 0);

        let decompressed = decompress_from_slice(&buf).unwrap();
        assert_eq!(decompressed, original.to_vec());
    }
1935
1936 #[test]
1939 fn test_embedded_metadata_roundtrip() {
1940 let data = std::fs::read(concat!(
1943 env!("CARGO_MANIFEST_DIR"),
1944 "/../../corpus/test-api.json"
1945 ))
1946 .unwrap();
1947
1948 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
1949 let decompressed = decompress_from_slice(&compressed).unwrap();
1950 assert_eq!(
1951 decompressed, data,
1952 "test-api.json embedded metadata roundtrip: byte-exact mismatch"
1953 );
1954 }
1955
    /// Hand-build an archive in the pre-embedded-metadata layout
    /// (FLAG_META_EMBEDDED clear) and verify the current decoder reads it.
    #[test]
    fn test_embedded_metadata_backward_compat() {
        let original = b"backward compat: no embedded metadata in this old file format";
        let crc = crc32fast::hash(original);
        // Old layout: the payload is just the zstd-compressed data.
        let zstd_compressed = zstd::bulk::compress(original, 19).unwrap();

        // Header exactly as an old writer would emit it: every newer flag off.
        let header = crate::dcx::DcxHeader {
            mode: Mode::Fast,
            format_hint: crate::dcx::FormatHint::Generic,
            original_size: original.len() as u64,
            compressed_size: zstd_compressed.len() as u64,
            crc32: crc,
            transform_metadata: vec![],
            has_dict: false,
            meta_compressed: false,
            use_brotli: false,
            meta_embedded: false,
        };

        let mut buf = Vec::new();
        header.write_to(&mut buf).unwrap();
        buf.extend_from_slice(&zstd_compressed);

        // Flag byte (offset 7) must have the embedded-metadata bit clear.
        assert_eq!(buf[7] & crate::dcx::FLAG_META_EMBEDDED, 0);

        let decompressed = decompress_from_slice(&buf).unwrap();
        assert_eq!(decompressed, original.to_vec());
    }
1989
1990 #[test]
1991 fn test_embedded_metadata_small_file_improvement() {
1992 let data = std::fs::read(concat!(
1995 env!("CARGO_MANIFEST_DIR"),
1996 "/../../corpus/test-api.json"
1997 ))
1998 .unwrap();
1999
2000 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Json)).unwrap();
2001 let decompressed = decompress_from_slice(&compressed).unwrap();
2002 assert_eq!(decompressed, data, "roundtrip failed");
2003
2004 let ratio = data.len() as f64 / compressed.len() as f64;
2006 assert!(
2007 ratio > 5.0,
2008 "test-api.json should achieve >5x compression, got {ratio:.1}x"
2009 );
2010
2011 let mut cursor = Cursor::new(&compressed);
2013 let header = crate::dcx::DcxHeader::read_from(&mut cursor).unwrap();
2014
2015 if header.meta_embedded {
2017 assert!(
2018 header.transform_metadata.is_empty(),
2019 "meta_embedded header should have empty transform_metadata"
2020 );
2021 assert!(header.use_brotli, "meta_embedded should use brotli codec");
2022 }
2023 }
2024
2025 #[test]
2026 fn test_embedded_metadata_ndjson_roundtrip() {
2027 let data = std::fs::read(concat!(
2030 env!("CARGO_MANIFEST_DIR"),
2031 "/../../corpus/test-ndjson.ndjson"
2032 ))
2033 .unwrap();
2034
2035 let compressed = compress_to_vec(&data, Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2036 let decompressed = decompress_from_slice(&compressed).unwrap();
2037 assert_eq!(
2038 decompressed, data,
2039 "NDJSON embedded metadata roundtrip: byte-exact mismatch"
2040 );
2041 }
2042
    /// Hand-build an embedded-metadata archive (brotli codec) and check the
    /// decoder unwraps the length-prefixed metadata correctly.
    #[test]
    fn test_embedded_metadata_manual_roundtrip() {
        let original = b"Hello, embedded metadata world! This is a test.";
        let crc = crc32fast::hash(original);

        // An empty transform chain still serializes to a valid metadata blob.
        let empty_chain = TransformChain::new();
        let raw_metadata = empty_chain.serialize();

        // Embedded layout: u32 LE metadata length, then metadata, then body —
        // all brotli-compressed together as one payload.
        let mut embedded = Vec::new();
        embedded.extend_from_slice(&(raw_metadata.len() as u32).to_le_bytes());
        embedded.extend_from_slice(&raw_metadata);
        embedded.extend_from_slice(original);

        let brotli_data = brotli_compress(&embedded, 11, BROTLI_MODE_GENERIC).unwrap();

        // With meta_embedded set, transform_metadata stays empty — the
        // metadata lives inside the compressed payload instead.
        let header = crate::dcx::DcxHeader {
            mode: Mode::Fast,
            format_hint: crate::dcx::FormatHint::Generic,
            original_size: original.len() as u64,
            compressed_size: brotli_data.len() as u64,
            crc32: crc,
            transform_metadata: vec![], has_dict: false,
            meta_compressed: false,
            use_brotli: true,
            meta_embedded: true,
        };

        let mut buf = Vec::new();
        header.write_to(&mut buf).unwrap();
        buf.extend_from_slice(&brotli_data);

        // Flag byte (offset 7) must have both new-layout bits set.
        assert_ne!(buf[7] & crate::dcx::FLAG_META_EMBEDDED, 0);
        assert_ne!(buf[7] & crate::dcx::FLAG_BROTLI, 0);

        let decompressed = decompress_from_slice(&buf).unwrap();
        assert_eq!(decompressed, original.to_vec());
    }
2088
2089 #[test]
2092 fn test_brotli_text_mode_on_raw() {
2093 let data = br#"{"name":"Alice","age":30,"city":"New York","active":true}"#;
2095
2096 let compressed_text = brotli_compress(data, 11, BROTLI_MODE_TEXT).unwrap();
2098 let decompressed_text = brotli_decompress(&compressed_text).unwrap();
2099 assert_eq!(
2100 decompressed_text,
2101 data.to_vec(),
2102 "TEXT mode roundtrip failed"
2103 );
2104
2105 let compressed_generic = brotli_compress(data, 11, BROTLI_MODE_GENERIC).unwrap();
2107 let decompressed_generic = brotli_decompress(&compressed_generic).unwrap();
2108 assert_eq!(
2109 decompressed_generic,
2110 data.to_vec(),
2111 "GENERIC mode roundtrip failed"
2112 );
2113
2114 assert!(
2119 !compressed_text.is_empty(),
2120 "TEXT mode should produce non-empty output"
2121 );
2122 }
2123
    /// Hand-build an embedded-metadata archive on the zstd codec path
    /// (FLAG_META_EMBEDDED set, FLAG_BROTLI clear) and verify decoding.
    #[test]
    fn test_zstd_embedded_metadata_roundtrip() {
        let original = b"Hello, zstd embedded metadata! This is a test of the zstd path.";
        let crc = crc32fast::hash(original);

        // An empty transform chain still serializes to a valid metadata blob.
        let empty_chain = TransformChain::new();
        let raw_metadata = empty_chain.serialize();

        // Embedded layout: u32 LE metadata length, then metadata, then body.
        let mut embedded = Vec::new();
        embedded.extend_from_slice(&(raw_metadata.len() as u32).to_le_bytes());
        embedded.extend_from_slice(&raw_metadata);
        embedded.extend_from_slice(original);

        let zstd_data = zstd::bulk::compress(&embedded, 19).unwrap();

        // meta_embedded with use_brotli=false selects the zstd decode path.
        let header = crate::dcx::DcxHeader {
            mode: Mode::Fast,
            format_hint: crate::dcx::FormatHint::Generic,
            original_size: original.len() as u64,
            compressed_size: zstd_data.len() as u64,
            crc32: crc,
            transform_metadata: vec![], has_dict: false,
            meta_compressed: false,
            use_brotli: false, meta_embedded: true,
        };

        let mut buf = Vec::new();
        header.write_to(&mut buf).unwrap();
        buf.extend_from_slice(&zstd_data);

        // Flag byte (offset 7): embedded bit set, brotli bit clear.
        assert_ne!(buf[7] & crate::dcx::FLAG_META_EMBEDDED, 0);
        assert_eq!(buf[7] & crate::dcx::FLAG_BROTLI, 0);

        let decompressed = decompress_from_slice(&buf).unwrap();
        assert_eq!(decompressed, original.to_vec());
    }
2170
2171 #[test]
2174 fn test_multi_quality_brotli() {
2175 let data = br#"{"items":[1,2,3,4,5],"nested":{"a":"hello","b":"world"}}"#;
2178
2179 let q10 = brotli_compress(data, 10, BROTLI_MODE_GENERIC).unwrap();
2180 let q11 = brotli_compress(data, 11, BROTLI_MODE_GENERIC).unwrap();
2181
2182 let dec_q10 = brotli_decompress(&q10).unwrap();
2183 let dec_q11 = brotli_decompress(&q11).unwrap();
2184
2185 assert_eq!(dec_q10, data.to_vec(), "quality 10 roundtrip failed");
2186 assert_eq!(dec_q11, data.to_vec(), "quality 11 roundtrip failed");
2187
2188 assert!(!q10.is_empty());
2190 assert!(!q11.is_empty());
2191
2192 let corpus_files = [
2196 concat!(env!("CARGO_MANIFEST_DIR"), "/../../corpus/test-api.json"),
2197 concat!(
2198 env!("CARGO_MANIFEST_DIR"),
2199 "/../../corpus/json-bench/twitter.json"
2200 ),
2201 ];
2202 for path in corpus_files {
2203 let file_data = std::fs::read(path).unwrap();
2204 let compressed =
2205 compress_to_vec(&file_data, Mode::Fast, Some(FormatHint::Json)).unwrap();
2206 let decompressed = decompress_from_slice(&compressed).unwrap();
2207 assert_eq!(
2208 decompressed, file_data,
2209 "multi-quality roundtrip failed for {path}"
2210 );
2211 }
2212 }
2213
2214 #[test]
2217 fn test_singleton_arrays_fast_roundtrip() {
2218 let rows: Vec<String> = (0..500)
2221 .map(|i| format!("{{\"items\":[{{\"x\":{}}}],\"id\":{}}}", i, i))
2222 .collect();
2223 let data = rows.join("\n") + "\n";
2224 let compressed =
2225 compress_to_vec(data.as_bytes(), Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2226 let decompressed = decompress_from_slice(&compressed).unwrap();
2227 assert_eq!(
2228 decompressed,
2229 data.as_bytes(),
2230 "singleton_arrays fast mode roundtrip failed"
2231 );
2232 }
2233
2234 #[test]
2235 fn test_very_long_lines_fast_roundtrip() {
2236 let rows: Vec<String> = (0..50)
2239 .map(|i| format!("{{\"data\":\"{}\",\"id\":{}}}", "X".repeat(100_000), i))
2240 .collect();
2241 let data = rows.join("\n") + "\n";
2242 let compressed =
2243 compress_to_vec(data.as_bytes(), Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2244 let decompressed = decompress_from_slice(&compressed).unwrap();
2245 assert_eq!(
2246 decompressed,
2247 data.as_bytes(),
2248 "very_long_lines fast mode roundtrip failed"
2249 );
2250 }
2251
2252 #[test]
2253 fn test_very_long_lines_balanced_roundtrip() {
2254 let rows: Vec<String> = (0..10)
2257 .map(|i| format!("{{\"data\":\"{}\",\"id\":{}}}", "X".repeat(100_000), i))
2258 .collect();
2259 let data = rows.join("\n") + "\n";
2260 let compressed =
2261 compress_to_vec(data.as_bytes(), Mode::Balanced, Some(FormatHint::Ndjson)).unwrap();
2262 let decompressed = decompress_from_slice(&compressed).unwrap();
2263 assert_eq!(
2264 decompressed,
2265 data.as_bytes(),
2266 "very_long_lines balanced mode roundtrip failed"
2267 );
2268 }
2269
2270 #[test]
2271 fn test_all_same_value_fast_roundtrip() {
2272 let rows: Vec<String> = (0..10_000).map(|_| "{\"x\":1}".to_string()).collect();
2277 let data = rows.join("\n") + "\n";
2278 let compressed =
2279 compress_to_vec(data.as_bytes(), Mode::Fast, Some(FormatHint::Ndjson)).unwrap();
2280 let decompressed = decompress_from_slice(&compressed).unwrap();
2281 assert_eq!(
2282 decompressed,
2283 data.as_bytes(),
2284 "all_same_value fast mode roundtrip failed"
2285 );
2286 }
2287
2288 #[test]
2289 fn test_generate_training_samples_degenerate() {
2290 let mut data = vec![0x02u8]; data.extend_from_slice(&[0x00; 9999]); let samples = generate_training_samples(&data, 1024);
2295 let avg_len = samples.iter().map(|s| s.len()).sum::<usize>() / samples.len();
2297 assert!(
2298 avg_len >= 8,
2299 "training samples average size should be >= 8, got {avg_len}"
2300 );
2301 }
2302}