Skip to main content

phantom_protocol/transport/
compression.rs

//! Production Adaptive Compression
//!
//! Real compression using LZ4 (lz4_flex) and Zstd:
//! - Performance tier: LZ4 block compression with a prepended size
//!   (~4 GB/s decompress, ~2 GB/s compress)
//! - Standard tier: Zstd level 1 (~500 MB/s compress, ratio ~2.5x)
//! - Constrained tier: off (CPU > bandwidth)
//!
//! Auto-probe: if the compression ratio is below the threshold over the first
//! N packets, compression is disabled.
//! The minimum size worth compressing is 64 bytes (below that the overhead is
//! not justified).
10
/// Identifies the codec applied to a payload.
///
/// The enum is `#[repr(u8)]` with explicit discriminants, so every variant
/// corresponds to exactly one tag byte (see [`CompressionAlgo::to_byte`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum CompressionAlgo {
    /// Payload is carried uncompressed (Constrained tier, or incompressible
    /// data).
    None = 0,
    /// LZ4 block compression through `lz4_flex` (Performance tier).
    Lz4 = 1,
    /// Zstd at level 1 (Standard tier).
    Zstd1 = 2,
}

impl CompressionAlgo {
    /// Resolves a tag byte to its algorithm.
    ///
    /// Returns `Option::None` when `b` does not match any variant's
    /// discriminant.
    pub fn from_byte(b: u8) -> Option<Self> {
        [Self::None, Self::Lz4, Self::Zstd1]
            .iter()
            .copied()
            .find(|algo| *algo as u8 == b)
    }

    /// Returns the tag byte (the variant's discriminant) for this algorithm.
    pub fn to_byte(self) -> u8 {
        self as u8
    }
}
37
/// Payloads shorter than this many bytes are passed through uncompressed.
const MIN_COMPRESS_SIZE: usize = 64;

/// Default ceiling on decompressed output: 16 MiB.
///
/// The value is documented as matching the established-session frame cap in
/// `TcpSessionTransport`. Decompression is asymmetric — a few KiB of crafted
/// LZ4/Zstd input can expand to gigabytes (a "decompression bomb") — so
/// [`AdaptiveCompressor::decompress`] always applies this ceiling. Callers
/// that need a different bound use
/// [`AdaptiveCompressor::decompress_with_limit`].
pub const MAX_DECOMPRESSED_LEN: usize = 16 << 20;
48
/// Running totals the auto-probe uses to judge whether compression pays off.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    /// Sum of the uncompressed sizes of all sampled payloads, in bytes.
    pub total_input: u64,
    /// Sum of the compressed sizes of all sampled payloads, in bytes.
    pub total_output: u64,
    /// How many payloads have been sampled.
    pub samples: u32,
}

impl CompressionStats {
    /// Creates statistics with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Cumulative compression ratio, `total_input / total_output`.
    ///
    /// Higher is better and `1.0` means no benefit. While nothing has been
    /// produced yet (`total_output == 0`) the ratio is reported as `1.0`.
    pub fn ratio(&self) -> f64 {
        match self.total_output {
            0 => 1.0,
            produced => self.total_input as f64 / produced as f64,
        }
    }
}
84
85/// Adaptive compressor with auto-probe.
86///
87/// Frame format for compressed data: `[algo:1][data:N]`
88/// If compression is skipped, raw data is returned with algo=0.
89pub struct AdaptiveCompressor {
90    algo: CompressionAlgo,
91    stats: CompressionStats,
92    /// Auto-disable if ratio < threshold after probe_samples
93    probe_threshold: f64,
94    probe_samples: u32,
95    /// Whether auto-probe disabled compression
96    disabled_by_probe: bool,
97    /// Zstd compression level (1-22, default 1 for speed)
98    zstd_level: i32,
99}
100
101impl AdaptiveCompressor {
102    /// Create with specified algorithm
103    pub fn new(algo: CompressionAlgo) -> Self {
104        Self {
105            algo,
106            stats: CompressionStats::new(),
107            probe_threshold: 1.05, // 5% savings minimum to keep compressing
108            probe_samples: 32,
109            disabled_by_probe: false,
110            zstd_level: 1,
111        }
112    }
113
114    /// No compression
115    pub fn none() -> Self {
116        Self::new(CompressionAlgo::None)
117    }
118
119    /// LZ4 for Performance tier
120    pub fn lz4() -> Self {
121        Self::new(CompressionAlgo::Lz4)
122    }
123
124    /// Zstd level 1 for Standard tier
125    pub fn zstd(level: i32) -> Self {
126        let mut c = Self::new(CompressionAlgo::Zstd1);
127        c.zstd_level = level.clamp(1, 22);
128        c
129    }
130
131    /// Currently active algorithm (may be None if auto-probe disabled it)
132    pub fn algorithm(&self) -> CompressionAlgo {
133        if self.disabled_by_probe {
134            CompressionAlgo::None
135        } else {
136            self.algo
137        }
138    }
139
140    /// Whether compression is effectively active
141    pub fn is_active(&self) -> bool {
142        self.algorithm() != CompressionAlgo::None
143    }
144
145    /// Compress data. Returns (algo_byte, compressed_data).
146    ///
147    /// If compression is off, skipped for small data, or output >= input:
148    /// returns (0, original_data_clone).
149    pub fn compress(&mut self, data: &[u8]) -> (u8, Vec<u8>) {
150        let active_algo = self.algorithm();
151
152        // Skip for None or small payloads
153        if active_algo == CompressionAlgo::None || data.len() < MIN_COMPRESS_SIZE {
154            return (CompressionAlgo::None.to_byte(), data.to_vec());
155        }
156
157        let compressed = match active_algo {
158            CompressionAlgo::Lz4 => Self::compress_lz4(data),
159            #[cfg(feature = "compression-zstd")]
160            CompressionAlgo::Zstd1 => Self::compress_zstd(data, self.zstd_level),
161            // Without `compression-zstd`, fall back to LZ4 transparently.
162            #[cfg(not(feature = "compression-zstd"))]
163            CompressionAlgo::Zstd1 => Self::compress_lz4(data),
164            // The `None` arm is already eliminated by the early-return at the
165            // top of this function, but matching defensively (rather than
166            // calling `unreachable!()`) means malformed enum extensions
167            // cannot become a panic source.
168            CompressionAlgo::None => return (CompressionAlgo::None.to_byte(), data.to_vec()),
169        };
170
171        // Update stats
172        self.stats.total_input += data.len() as u64;
173        self.stats.total_output += compressed.len() as u64;
174        self.stats.samples += 1;
175
176        // Auto-probe check
177        if self.stats.samples == self.probe_samples && self.stats.ratio() < self.probe_threshold {
178            self.disabled_by_probe = true;
179            return (CompressionAlgo::None.to_byte(), data.to_vec());
180        }
181
182        // Only return compressed if it actually saves space
183        if compressed.len() < data.len() {
184            (active_algo.to_byte(), compressed)
185        } else {
186            (CompressionAlgo::None.to_byte(), data.to_vec())
187        }
188    }
189
190    /// Decompress data given the algorithm byte, bounding the output to
191    /// [`MAX_DECOMPRESSED_LEN`]. An input that would expand beyond the cap is
192    /// rejected with [`CompressionError::OutputTooLarge`] rather than allocated.
193    pub fn decompress(algo_byte: u8, data: &[u8]) -> Result<Vec<u8>, CompressionError> {
194        Self::decompress_with_limit(algo_byte, data, MAX_DECOMPRESSED_LEN)
195    }
196
197    /// Decompress with a caller-chosen output cap. `max_output` is the largest
198    /// plaintext the caller is willing to materialise; the decoders refuse to
199    /// allocate or emit beyond it, so a decompression bomb cannot exhaust memory.
200    pub fn decompress_with_limit(
201        algo_byte: u8,
202        data: &[u8],
203        max_output: usize,
204    ) -> Result<Vec<u8>, CompressionError> {
205        let algo = CompressionAlgo::from_byte(algo_byte)
206            .ok_or(CompressionError::UnknownAlgorithm(algo_byte))?;
207        match algo {
208            CompressionAlgo::None => {
209                if data.len() > max_output {
210                    return Err(CompressionError::OutputTooLarge { limit: max_output });
211                }
212                Ok(data.to_vec())
213            }
214            CompressionAlgo::Lz4 => Self::decompress_lz4(data, max_output),
215            #[cfg(feature = "compression-zstd")]
216            CompressionAlgo::Zstd1 => Self::decompress_zstd(data, max_output),
217            // Without `compression-zstd`, we cannot decode Zstd payloads.
218            // Surface the failure as a typed error rather than fabricating
219            // garbage plaintext.
220            #[cfg(not(feature = "compression-zstd"))]
221            CompressionAlgo::Zstd1 => Err(CompressionError::DecompressFailed(
222                "Zstd disabled in this build (compression-zstd feature off)".into(),
223            )),
224        }
225    }
226
227    /// Compression stats
228    pub fn stats(&self) -> &CompressionStats {
229        &self.stats
230    }
231
232    /// Reset auto-probe state
233    pub fn reset_probe(&mut self) {
234        self.stats = CompressionStats::new();
235        self.disabled_by_probe = false;
236    }
237
238    // ── LZ4 (lz4_flex block mode) ────────────────────────────────────────
239
240    fn compress_lz4(data: &[u8]) -> Vec<u8> {
241        lz4_flex::compress_prepend_size(data)
242    }
243
244    fn decompress_lz4(data: &[u8], max_output: usize) -> Result<Vec<u8>, CompressionError> {
245        // `compress_prepend_size` writes the uncompressed length as a
246        // little-endian u32 prefix and `decompress_size_prepended` trusts it to
247        // pre-allocate the whole output. Reject an oversized declared length
248        // BEFORE that allocation happens — otherwise 8 bytes on the wire could
249        // force a multi-GiB `Vec` reservation (decompression-bomb guard).
250        if data.len() >= 4 {
251            let declared = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
252            if declared > max_output {
253                return Err(CompressionError::OutputTooLarge { limit: max_output });
254            }
255        }
256        lz4_flex::decompress_size_prepended(data)
257            .map_err(|e| CompressionError::DecompressFailed(format!("LZ4: {}", e)))
258    }
259
260    // ── Zstd ─────────────────────────────────────────────────────────────
261
262    #[cfg(feature = "compression-zstd")]
263    fn compress_zstd(data: &[u8], level: i32) -> Vec<u8> {
264        zstd::encode_all(data, level).unwrap_or_else(|_| data.to_vec())
265    }
266
267    #[cfg(feature = "compression-zstd")]
268    fn decompress_zstd(data: &[u8], max_output: usize) -> Result<Vec<u8>, CompressionError> {
269        use std::io::Read;
270        // Zstd frames may declare no content size, so we cannot pre-check a
271        // header the way LZ4 lets us. Instead, stream-decode through a reader
272        // capped at `max_output + 1`: if the decoder produces that many bytes
273        // the frame exceeds the cap and we fail closed before the `Vec` can
274        // grow without bound.
275        let mut decoder = zstd::stream::read::Decoder::new(data)
276            .map_err(|e| CompressionError::DecompressFailed(format!("Zstd: {}", e)))?;
277        let mut out = Vec::new();
278        let cap_plus_one = (max_output as u64).saturating_add(1);
279        decoder
280            .by_ref()
281            .take(cap_plus_one)
282            .read_to_end(&mut out)
283            .map_err(|e| CompressionError::DecompressFailed(format!("Zstd: {}", e)))?;
284        if out.len() > max_output {
285            return Err(CompressionError::OutputTooLarge { limit: max_output });
286        }
287        Ok(out)
288    }
289}
290
/// Errors reported by the decompression entry points.
#[derive(Debug)]
pub enum CompressionError {
    /// The algorithm tag byte does not correspond to any known algorithm.
    UnknownAlgorithm(u8),
    /// The codec rejected the input; the string carries the codec's message.
    DecompressFailed(String),
    /// The decompressed output would exceed the configured cap. Guards against
    /// a decompression bomb (a tiny input that expands to gigabytes).
    OutputTooLarge {
        /// The cap, in bytes, that was in force.
        limit: usize,
    },
}

impl std::fmt::Display for CompressionError {
    /// Writes a one-line, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::UnknownAlgorithm(tag) => {
                write!(f, "Unknown compression algorithm: 0x{:02x}", tag)
            }
            Self::DecompressFailed(ref reason) => write!(f, "Decompression failed: {}", reason),
            Self::OutputTooLarge { limit } => {
                write!(f, "Decompressed output exceeds the {}-byte cap", limit)
            }
        }
    }
}

impl std::error::Error for CompressionError {}
316
317#[cfg(test)]
318mod tests {
319    use super::*;
320
321    #[test]
322    fn no_compression_passthrough() {
323        let mut c = AdaptiveCompressor::none();
324        let data = b"Hello, world!";
325        let (algo, result) = c.compress(data);
326        assert_eq!(algo, 0);
327        assert_eq!(result, data);
328    }
329
330    #[test]
331    fn lz4_round_trip() {
332        let mut c = AdaptiveCompressor::lz4();
333        // Highly compressible data (repeating pattern)
334        let data = vec![0u8; 4096];
335        let (algo, compressed) = c.compress(&data);
336        assert_eq!(algo, CompressionAlgo::Lz4.to_byte());
337        assert!(compressed.len() < data.len(), "LZ4 should compress zeros");
338        let decompressed = AdaptiveCompressor::decompress(algo, &compressed).unwrap();
339        assert_eq!(decompressed, data);
340        eprintln!(
341            "LZ4: {} → {} bytes (ratio {:.2}x)",
342            data.len(),
343            compressed.len(),
344            data.len() as f64 / compressed.len() as f64
345        );
346    }
347
348    #[cfg(feature = "compression-zstd")]
349    #[test]
350    fn zstd_round_trip() {
351        let mut c = AdaptiveCompressor::zstd(1);
352        // Compressible text-like data
353        let data: Vec<u8> = (0..2048)
354            .map(|i| b"The quick brown fox jumps over the lazy dog. "[i % 45])
355            .collect();
356        let (algo, compressed) = c.compress(&data);
357        assert_eq!(algo, CompressionAlgo::Zstd1.to_byte());
358        assert!(compressed.len() < data.len(), "Zstd should compress text");
359        let decompressed = AdaptiveCompressor::decompress(algo, &compressed).unwrap();
360        assert_eq!(decompressed, data);
361        eprintln!(
362            "Zstd: {} → {} bytes (ratio {:.2}x)",
363            data.len(),
364            compressed.len(),
365            data.len() as f64 / compressed.len() as f64
366        );
367    }
368
369    #[test]
370    fn skip_tiny_data() {
371        let mut c = AdaptiveCompressor::lz4();
372        let data = b"tiny"; // < 64 bytes → skip
373        let (algo, result) = c.compress(data);
374        assert_eq!(algo, 0);
375        assert_eq!(result, data);
376    }
377
378    #[test]
379    fn auto_probe_disable_on_random() {
380        let mut c = AdaptiveCompressor::lz4();
381        c.probe_samples = 8;
382        c.probe_threshold = 1.5; // Very high threshold — random data won't hit this
383
384        // Compress pseudo-random (incompressible) data
385        for i in 0u32..10 {
386            let data: Vec<u8> = (0..256)
387                .map(|j| ((i.wrapping_mul(2654435761).wrapping_add(j)) & 0xFF) as u8)
388                .collect();
389            let _ = c.compress(&data);
390        }
391        // After probe_samples, should be disabled
392        assert!(c.disabled_by_probe);
393        assert_eq!(c.algorithm(), CompressionAlgo::None);
394    }
395
396    #[test]
397    fn lz4_decompress_rejects_oversized_declared_size() {
398        // Craft an LZ4 size-prepended frame whose declared output length is
399        // absurd (~3 GiB). The guard must reject it from the LE size prefix
400        // alone, before `decompress_size_prepended` tries to allocate it.
401        let mut bomb = Vec::new();
402        bomb.extend_from_slice(&u32::to_le_bytes(0xC000_0000)); // declared ≈ 3 GiB
403        bomb.extend_from_slice(&[0u8; 16]); // arbitrary (never reached)
404        let err = AdaptiveCompressor::decompress(CompressionAlgo::Lz4.to_byte(), &bomb)
405            .expect_err("oversized declared size must be rejected");
406        assert!(
407            matches!(err, CompressionError::OutputTooLarge { .. }),
408            "expected OutputTooLarge, got {err:?}"
409        );
410    }
411
412    #[test]
413    fn lz4_decompress_with_limit_rejects_overlimit_output() {
414        // A genuinely compressible 4 KiB payload decodes fine under a generous
415        // cap, but a tight 100-byte cap rejects it via the declared-size guard.
416        let mut c = AdaptiveCompressor::lz4();
417        let data = vec![7u8; 4096];
418        let (algo, compressed) = c.compress(&data);
419        assert!(AdaptiveCompressor::decompress(algo, &compressed).is_ok());
420        let err = AdaptiveCompressor::decompress_with_limit(algo, &compressed, 100)
421            .expect_err("4 KiB output must exceed a 100-byte cap");
422        assert!(matches!(
423            err,
424            CompressionError::OutputTooLarge { limit: 100 }
425        ));
426    }
427
428    #[cfg(feature = "compression-zstd")]
429    #[test]
430    fn zstd_decompress_with_limit_rejects_overlimit_output() {
431        // Zstd carries no peekable content-size guarantee, so the cap is
432        // enforced by the bounded streaming read. A 4 KiB payload under a
433        // 100-byte cap must fail closed mid-stream.
434        let mut c = AdaptiveCompressor::zstd(1);
435        let data = vec![9u8; 4096];
436        let (algo, compressed) = c.compress(&data);
437        assert_eq!(algo, CompressionAlgo::Zstd1.to_byte());
438        assert!(AdaptiveCompressor::decompress(algo, &compressed).is_ok());
439        let err = AdaptiveCompressor::decompress_with_limit(algo, &compressed, 100)
440            .expect_err("4 KiB output must exceed a 100-byte cap");
441        assert!(matches!(
442            err,
443            CompressionError::OutputTooLarge { limit: 100 }
444        ));
445    }
446
447    #[test]
448    fn lz4_throughput() {
449        use std::time::Instant;
450
451        let data = vec![42u8; 64 * 1024]; // 64KB of compressible data
452        let iters = 10_000;
453
454        // Compress throughput
455        let start = Instant::now();
456        for _ in 0..iters {
457            let c = lz4_flex::compress_prepend_size(&data);
458            std::hint::black_box(c);
459        }
460        let elapsed = start.elapsed();
461        let tput = (data.len() * iters) as f64 / 1_048_576.0 / elapsed.as_secs_f64();
462        eprintln!("LZ4 compress:   {:.0} MiB/s (64KB payload)", tput);
463
464        // Decompress throughput
465        let compressed = lz4_flex::compress_prepend_size(&data);
466        let start = Instant::now();
467        for _ in 0..iters {
468            let d = lz4_flex::decompress_size_prepended(&compressed).unwrap();
469            std::hint::black_box(d);
470        }
471        let elapsed = start.elapsed();
472        let tput = (data.len() * iters) as f64 / 1_048_576.0 / elapsed.as_secs_f64();
473        eprintln!("LZ4 decompress: {:.0} MiB/s (64KB payload)", tput);
474    }
475
476    #[cfg(feature = "compression-zstd")]
477    #[test]
478    fn zstd_throughput() {
479        use std::time::Instant;
480
481        let data = vec![42u8; 64 * 1024]; // 64KB
482        let iters = 5_000;
483
484        let start = Instant::now();
485        for _ in 0..iters {
486            let c = zstd::encode_all(&data[..], 1).unwrap();
487            std::hint::black_box(c);
488        }
489        let elapsed = start.elapsed();
490        let tput = (data.len() * iters) as f64 / 1_048_576.0 / elapsed.as_secs_f64();
491        eprintln!("Zstd-1 compress:   {:.0} MiB/s (64KB payload)", tput);
492
493        let compressed = zstd::encode_all(&data[..], 1).unwrap();
494        let start = Instant::now();
495        for _ in 0..iters {
496            let d = zstd::decode_all(&compressed[..]).unwrap();
497            std::hint::black_box(d);
498        }
499        let elapsed = start.elapsed();
500        let tput = (data.len() * iters) as f64 / 1_048_576.0 / elapsed.as_secs_f64();
501        eprintln!("Zstd-1 decompress: {:.0} MiB/s (64KB payload)", tput);
502    }
503
504    #[test]
505    fn decompress_unknown_algo_fails() {
506        let result = AdaptiveCompressor::decompress(0xFF, &[1, 2, 3]);
507        assert!(result.is_err());
508    }
509}