// phantom_protocol/transport/compression.rs

/// Compression algorithm identifier, carried as a single tag byte next to a payload.
///
/// The explicit discriminants are the on-wire tag values; convert with
/// [`CompressionAlgo::from_byte`] / [`CompressionAlgo::to_byte`] or the
/// `TryFrom<u8>` / `From<CompressionAlgo> for u8` impls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum CompressionAlgo {
    /// Payload is stored uncompressed (tag `0`).
    None = 0,
    /// LZ4 block compression with a little-endian `u32` size prefix (tag `1`).
    Lz4 = 1,
    /// Zstandard compression (tag `2`); the compression level is not part of the tag.
    Zstd1 = 2,
}

impl CompressionAlgo {
    /// Parses an on-wire tag byte, returning `None` for unrecognised values.
    pub const fn from_byte(b: u8) -> Option<Self> {
        match b {
            0 => Some(Self::None),
            1 => Some(Self::Lz4),
            2 => Some(Self::Zstd1),
            _ => None,
        }
    }

    /// Returns the on-wire tag byte for this algorithm.
    pub const fn to_byte(self) -> u8 {
        self as u8
    }
}

impl std::convert::TryFrom<u8> for CompressionAlgo {
    /// The unrecognised tag byte is handed back as the error value.
    type Error = u8;

    fn try_from(b: u8) -> Result<Self, Self::Error> {
        Self::from_byte(b).ok_or(b)
    }
}

impl From<CompressionAlgo> for u8 {
    fn from(algo: CompressionAlgo) -> Self {
        algo.to_byte()
    }
}

/// Payloads shorter than this many bytes are always sent uncompressed.
const MIN_COMPRESS_SIZE: usize = 64;

/// Default cap, in bytes, on decompressed output (16 MiB), applied by
/// `AdaptiveCompressor::decompress`.
pub const MAX_DECOMPRESSED_LEN: usize = 16 << 20;

/// Running byte and sample totals for payloads that were passed through a codec.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Sum of input lengths over all recorded samples.
    pub total_input: u64,
    /// Sum of compressed output lengths over all recorded samples.
    pub total_output: u64,
    /// Number of recorded samples.
    pub samples: u32,
}

impl CompressionStats {
    /// Returns a fresh set of statistics with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Aggregate compression ratio, `total_input / total_output`.
    ///
    /// Values above `1.0` mean the codec shrank the data. When nothing has been
    /// recorded (`total_output == 0`) this reports `1.0` instead of dividing by zero.
    pub fn ratio(&self) -> f64 {
        match self.total_output {
            0 => 1.0,
            produced => self.total_input as f64 / produced as f64,
        }
    }
}

85pub struct AdaptiveCompressor {
90 algo: CompressionAlgo,
91 stats: CompressionStats,
92 probe_threshold: f64,
94 probe_samples: u32,
95 disabled_by_probe: bool,
97 zstd_level: i32,
99}
100
101impl AdaptiveCompressor {
102 pub fn new(algo: CompressionAlgo) -> Self {
104 Self {
105 algo,
106 stats: CompressionStats::new(),
107 probe_threshold: 1.05, probe_samples: 32,
109 disabled_by_probe: false,
110 zstd_level: 1,
111 }
112 }
113
114 pub fn none() -> Self {
116 Self::new(CompressionAlgo::None)
117 }
118
119 pub fn lz4() -> Self {
121 Self::new(CompressionAlgo::Lz4)
122 }
123
124 pub fn zstd(level: i32) -> Self {
126 let mut c = Self::new(CompressionAlgo::Zstd1);
127 c.zstd_level = level.clamp(1, 22);
128 c
129 }
130
131 pub fn algorithm(&self) -> CompressionAlgo {
133 if self.disabled_by_probe {
134 CompressionAlgo::None
135 } else {
136 self.algo
137 }
138 }
139
140 pub fn is_active(&self) -> bool {
142 self.algorithm() != CompressionAlgo::None
143 }
144
145 pub fn compress(&mut self, data: &[u8]) -> (u8, Vec<u8>) {
150 let active_algo = self.algorithm();
151
152 if active_algo == CompressionAlgo::None || data.len() < MIN_COMPRESS_SIZE {
154 return (CompressionAlgo::None.to_byte(), data.to_vec());
155 }
156
157 let compressed = match active_algo {
158 CompressionAlgo::Lz4 => Self::compress_lz4(data),
159 #[cfg(feature = "compression-zstd")]
160 CompressionAlgo::Zstd1 => Self::compress_zstd(data, self.zstd_level),
161 #[cfg(not(feature = "compression-zstd"))]
163 CompressionAlgo::Zstd1 => Self::compress_lz4(data),
164 CompressionAlgo::None => return (CompressionAlgo::None.to_byte(), data.to_vec()),
169 };
170
171 self.stats.total_input += data.len() as u64;
173 self.stats.total_output += compressed.len() as u64;
174 self.stats.samples += 1;
175
176 if self.stats.samples == self.probe_samples && self.stats.ratio() < self.probe_threshold {
178 self.disabled_by_probe = true;
179 return (CompressionAlgo::None.to_byte(), data.to_vec());
180 }
181
182 if compressed.len() < data.len() {
184 (active_algo.to_byte(), compressed)
185 } else {
186 (CompressionAlgo::None.to_byte(), data.to_vec())
187 }
188 }
189
190 pub fn decompress(algo_byte: u8, data: &[u8]) -> Result<Vec<u8>, CompressionError> {
194 Self::decompress_with_limit(algo_byte, data, MAX_DECOMPRESSED_LEN)
195 }
196
197 pub fn decompress_with_limit(
201 algo_byte: u8,
202 data: &[u8],
203 max_output: usize,
204 ) -> Result<Vec<u8>, CompressionError> {
205 let algo = CompressionAlgo::from_byte(algo_byte)
206 .ok_or(CompressionError::UnknownAlgorithm(algo_byte))?;
207 match algo {
208 CompressionAlgo::None => {
209 if data.len() > max_output {
210 return Err(CompressionError::OutputTooLarge { limit: max_output });
211 }
212 Ok(data.to_vec())
213 }
214 CompressionAlgo::Lz4 => Self::decompress_lz4(data, max_output),
215 #[cfg(feature = "compression-zstd")]
216 CompressionAlgo::Zstd1 => Self::decompress_zstd(data, max_output),
217 #[cfg(not(feature = "compression-zstd"))]
221 CompressionAlgo::Zstd1 => Err(CompressionError::DecompressFailed(
222 "Zstd disabled in this build (compression-zstd feature off)".into(),
223 )),
224 }
225 }
226
227 pub fn stats(&self) -> &CompressionStats {
229 &self.stats
230 }
231
232 pub fn reset_probe(&mut self) {
234 self.stats = CompressionStats::new();
235 self.disabled_by_probe = false;
236 }
237
238 fn compress_lz4(data: &[u8]) -> Vec<u8> {
241 lz4_flex::compress_prepend_size(data)
242 }
243
244 fn decompress_lz4(data: &[u8], max_output: usize) -> Result<Vec<u8>, CompressionError> {
245 if data.len() >= 4 {
251 let declared = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
252 if declared > max_output {
253 return Err(CompressionError::OutputTooLarge { limit: max_output });
254 }
255 }
256 lz4_flex::decompress_size_prepended(data)
257 .map_err(|e| CompressionError::DecompressFailed(format!("LZ4: {}", e)))
258 }
259
260 #[cfg(feature = "compression-zstd")]
263 fn compress_zstd(data: &[u8], level: i32) -> Vec<u8> {
264 zstd::encode_all(data, level).unwrap_or_else(|_| data.to_vec())
265 }
266
267 #[cfg(feature = "compression-zstd")]
268 fn decompress_zstd(data: &[u8], max_output: usize) -> Result<Vec<u8>, CompressionError> {
269 use std::io::Read;
270 let mut decoder = zstd::stream::read::Decoder::new(data)
276 .map_err(|e| CompressionError::DecompressFailed(format!("Zstd: {}", e)))?;
277 let mut out = Vec::new();
278 let cap_plus_one = (max_output as u64).saturating_add(1);
279 decoder
280 .by_ref()
281 .take(cap_plus_one)
282 .read_to_end(&mut out)
283 .map_err(|e| CompressionError::DecompressFailed(format!("Zstd: {}", e)))?;
284 if out.len() > max_output {
285 return Err(CompressionError::OutputTooLarge { limit: max_output });
286 }
287 Ok(out)
288 }
289}
290
/// Errors produced while decoding a compressed payload.
#[derive(Debug)]
pub enum CompressionError {
    /// The tag byte does not name any known compression algorithm.
    UnknownAlgorithm(u8),
    /// The codec rejected the payload, or the codec is not compiled in; the
    /// string carries the reason.
    DecompressFailed(String),
    /// The decompressed output would exceed the allowed size.
    OutputTooLarge {
        /// The cap, in bytes, that was exceeded.
        limit: usize,
    },
}

impl std::fmt::Display for CompressionError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            CompressionError::UnknownAlgorithm(byte) => {
                write!(f, "Unknown compression algorithm: 0x{:02x}", byte)
            }
            CompressionError::DecompressFailed(ref reason) => {
                write!(f, "Decompression failed: {}", reason)
            }
            CompressionError::OutputTooLarge { limit } => {
                write!(f, "Decompressed output exceeds the {}-byte cap", limit)
            }
        }
    }
}

impl std::error::Error for CompressionError {}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn no_compression_passthrough() {
        let mut c = AdaptiveCompressor::none();
        let data = b"Hello, world!";
        let (algo, result) = c.compress(data);
        assert_eq!(algo, 0);
        assert_eq!(result, data);
    }

    #[test]
    fn lz4_round_trip() {
        let mut c = AdaptiveCompressor::lz4();
        let data = vec![0u8; 4096];
        let (algo, compressed) = c.compress(&data);
        assert_eq!(algo, CompressionAlgo::Lz4.to_byte());
        assert!(compressed.len() < data.len(), "LZ4 should compress zeros");
        let decompressed = AdaptiveCompressor::decompress(algo, &compressed).unwrap();
        assert_eq!(decompressed, data);
        eprintln!(
            "LZ4: {} → {} bytes (ratio {:.2}x)",
            data.len(),
            compressed.len(),
            data.len() as f64 / compressed.len() as f64
        );
    }

    #[cfg(feature = "compression-zstd")]
    #[test]
    fn zstd_round_trip() {
        let mut c = AdaptiveCompressor::zstd(1);
        let data: Vec<u8> = (0..2048)
            .map(|i| b"The quick brown fox jumps over the lazy dog. "[i % 45])
            .collect();
        let (algo, compressed) = c.compress(&data);
        assert_eq!(algo, CompressionAlgo::Zstd1.to_byte());
        assert!(compressed.len() < data.len(), "Zstd should compress text");
        let decompressed = AdaptiveCompressor::decompress(algo, &compressed).unwrap();
        assert_eq!(decompressed, data);
        eprintln!(
            "Zstd: {} → {} bytes (ratio {:.2}x)",
            data.len(),
            compressed.len(),
            data.len() as f64 / compressed.len() as f64
        );
    }

    #[test]
    fn skip_tiny_data() {
        let mut c = AdaptiveCompressor::lz4();
        let data = b"tiny";
        let (algo, result) = c.compress(data);
        assert_eq!(algo, 0);
        assert_eq!(result, data);
    }

    #[test]
    fn auto_probe_disable_on_random() {
        let mut c = AdaptiveCompressor::lz4();
        c.probe_samples = 8;
        c.probe_threshold = 1.5;
        // Each payload is a 256-byte ramp containing every byte value exactly
        // once, so LZ4 has no repeated sequences to exploit.
        for i in 0u32..10 {
            let data: Vec<u8> = (0..256)
                .map(|j| ((i.wrapping_mul(2654435761).wrapping_add(j)) & 0xFF) as u8)
                .collect();
            let _ = c.compress(&data);
        }
        assert!(c.disabled_by_probe);
        assert_eq!(c.algorithm(), CompressionAlgo::None);
    }

    #[test]
    fn reset_probe_reenables_compression() {
        let mut c = AdaptiveCompressor::lz4();
        c.disabled_by_probe = true;
        c.stats.samples = 5;
        assert!(!c.is_active());
        assert_eq!(c.algorithm(), CompressionAlgo::None);
        c.reset_probe();
        assert!(c.is_active());
        assert_eq!(c.algorithm(), CompressionAlgo::Lz4);
        assert_eq!(c.stats().samples, 0);
    }

    #[test]
    fn zstd_level_is_clamped() {
        assert_eq!(AdaptiveCompressor::zstd(0).zstd_level, 1);
        assert_eq!(AdaptiveCompressor::zstd(-7).zstd_level, 1);
        assert_eq!(AdaptiveCompressor::zstd(3).zstd_level, 3);
        assert_eq!(AdaptiveCompressor::zstd(99).zstd_level, 22);
    }

    #[test]
    fn lz4_decompress_rejects_oversized_declared_size() {
        // A 3 GiB declared length followed by junk: the cap must reject it
        // before lz4_flex is asked to decode.
        let mut bomb = Vec::new();
        bomb.extend_from_slice(&u32::to_le_bytes(0xC000_0000));
        bomb.extend_from_slice(&[0u8; 16]);
        let err = AdaptiveCompressor::decompress(CompressionAlgo::Lz4.to_byte(), &bomb)
            .expect_err("oversized declared size must be rejected");
        assert!(
            matches!(err, CompressionError::OutputTooLarge { .. }),
            "expected OutputTooLarge, got {err:?}"
        );
    }

    #[test]
    fn lz4_decompress_with_limit_rejects_overlimit_output() {
        let mut c = AdaptiveCompressor::lz4();
        let data = vec![7u8; 4096];
        let (algo, compressed) = c.compress(&data);
        assert!(AdaptiveCompressor::decompress(algo, &compressed).is_ok());
        let err = AdaptiveCompressor::decompress_with_limit(algo, &compressed, 100)
            .expect_err("4 KiB output must exceed a 100-byte cap");
        assert!(matches!(
            err,
            CompressionError::OutputTooLarge { limit: 100 }
        ));
    }

    #[cfg(feature = "compression-zstd")]
    #[test]
    fn zstd_decompress_with_limit_rejects_overlimit_output() {
        let mut c = AdaptiveCompressor::zstd(1);
        let data = vec![9u8; 4096];
        let (algo, compressed) = c.compress(&data);
        assert_eq!(algo, CompressionAlgo::Zstd1.to_byte());
        assert!(AdaptiveCompressor::decompress(algo, &compressed).is_ok());
        let err = AdaptiveCompressor::decompress_with_limit(algo, &compressed, 100)
            .expect_err("4 KiB output must exceed a 100-byte cap");
        assert!(matches!(
            err,
            CompressionError::OutputTooLarge { limit: 100 }
        ));
    }

    // Benchmarks: thousands of 64 KiB iterations are far too slow for a default
    // (debug) `cargo test` run, so they are opt-in.
    #[test]
    #[ignore = "benchmark; run with `cargo test --release -- --ignored`"]
    fn lz4_throughput() {
        use std::time::Instant;

        let data = vec![42u8; 64 * 1024];
        let iters = 10_000;

        let start = Instant::now();
        for _ in 0..iters {
            let c = lz4_flex::compress_prepend_size(&data);
            std::hint::black_box(c);
        }
        let elapsed = start.elapsed();
        let tput = (data.len() * iters) as f64 / 1_048_576.0 / elapsed.as_secs_f64();
        eprintln!("LZ4 compress: {:.0} MiB/s (64KB payload)", tput);

        let compressed = lz4_flex::compress_prepend_size(&data);
        let start = Instant::now();
        for _ in 0..iters {
            let d = lz4_flex::decompress_size_prepended(&compressed).unwrap();
            std::hint::black_box(d);
        }
        let elapsed = start.elapsed();
        let tput = (data.len() * iters) as f64 / 1_048_576.0 / elapsed.as_secs_f64();
        eprintln!("LZ4 decompress: {:.0} MiB/s (64KB payload)", tput);
    }

    #[cfg(feature = "compression-zstd")]
    #[test]
    #[ignore = "benchmark; run with `cargo test --release -- --ignored`"]
    fn zstd_throughput() {
        use std::time::Instant;

        let data = vec![42u8; 64 * 1024];
        let iters = 5_000;

        let start = Instant::now();
        for _ in 0..iters {
            let c = zstd::encode_all(&data[..], 1).unwrap();
            std::hint::black_box(c);
        }
        let elapsed = start.elapsed();
        let tput = (data.len() * iters) as f64 / 1_048_576.0 / elapsed.as_secs_f64();
        eprintln!("Zstd-1 compress: {:.0} MiB/s (64KB payload)", tput);

        let compressed = zstd::encode_all(&data[..], 1).unwrap();
        let start = Instant::now();
        for _ in 0..iters {
            let d = zstd::decode_all(&compressed[..]).unwrap();
            std::hint::black_box(d);
        }
        let elapsed = start.elapsed();
        let tput = (data.len() * iters) as f64 / 1_048_576.0 / elapsed.as_secs_f64();
        eprintln!("Zstd-1 decompress: {:.0} MiB/s (64KB payload)", tput);
    }

    #[test]
    fn decompress_unknown_algo_fails() {
        let result = AdaptiveCompressor::decompress(0xFF, &[1, 2, 3]);
        assert!(result.is_err());
    }
}