// oxigdal_cache_advanced/compression.rs
//! Adaptive compression for cached data
//!
//! Automatically selects the best compression codec based on:
//! - Data type and characteristics
//! - Compression ratio vs speed tradeoff
//! - Historical performance metrics
8use crate::error::{CacheError, Result};
9use bytes::Bytes;
10use std::collections::HashMap;
11
/// Compression codec type.
///
/// Serializable so the codec choice can be persisted alongside the
/// compressed payload (see `CompressedData`), which is required to pick
/// the matching decompressor later.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum CompressionCodec {
    /// No compression — data is stored verbatim.
    None,
    /// LZ4 block format — very fast, moderate compression.
    Lz4,
    /// Zstandard — configurable speed/ratio tradeoff (level comes from `CompressionLevel`).
    Zstd,
    /// Snappy raw format — fast, moderate compression.
    Snappy,
}
24
/// Compression level for codecs that support it.
///
/// Currently only consulted by the Zstd path (via `to_zstd_level`);
/// LZ4 and Snappy ignore the level.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionLevel {
    /// Fastest compression, lowest ratio.
    Fast,
    /// Balanced speed/ratio (the constructor default).
    Default,
    /// Best compression ratio, slowest.
    Best,
}
35
36impl CompressionLevel {
37    /// Convert to zstd level
38    pub fn to_zstd_level(&self) -> i32 {
39        match self {
40            CompressionLevel::Fast => 1,
41            CompressionLevel::Default => 3,
42            CompressionLevel::Best => 19,
43        }
44    }
45}
46
/// Statistics for a single compression operation.
///
/// Entries recorded by `AdaptiveCompressor::compress` have
/// `decompression_time_us` set to 0, since decompression has not
/// happened yet at that point.
#[derive(Debug, Clone)]
pub struct CompressionStats {
    /// Original (uncompressed) size in bytes.
    pub original_size: usize,
    /// Compressed size in bytes.
    pub compressed_size: usize,
    /// Compression time in microseconds.
    pub compression_time_us: u64,
    /// Decompression time in microseconds (0 if not yet measured).
    pub decompression_time_us: u64,
    /// Codec used for this operation.
    pub codec: CompressionCodec,
}
61
62impl CompressionStats {
63    /// Calculate compression ratio
64    pub fn compression_ratio(&self) -> f64 {
65        if self.compressed_size > 0 {
66            self.original_size as f64 / self.compressed_size as f64
67        } else {
68            1.0
69        }
70    }
71
72    /// Calculate compression throughput in MB/s
73    pub fn compression_throughput_mbps(&self) -> f64 {
74        if self.compression_time_us > 0 {
75            let mb = self.original_size as f64 / (1024.0 * 1024.0);
76            let seconds = self.compression_time_us as f64 / 1_000_000.0;
77            mb / seconds
78        } else {
79            0.0
80        }
81    }
82
83    /// Calculate decompression throughput in MB/s
84    pub fn decompression_throughput_mbps(&self) -> f64 {
85        if self.decompression_time_us > 0 {
86            let mb = self.compressed_size as f64 / (1024.0 * 1024.0);
87            let seconds = self.decompression_time_us as f64 / 1_000_000.0;
88            mb / seconds
89        } else {
90            0.0
91        }
92    }
93
94    /// Calculate efficiency score (ratio * throughput)
95    pub fn efficiency_score(&self) -> f64 {
96        self.compression_ratio() * self.compression_throughput_mbps()
97    }
98}
99
/// Data type hints for compression selection.
///
/// Used as part of the key into the compressor's performance history
/// and by the heuristic fallback in `heuristic_codec`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataType {
    /// Generic binary data.
    Binary,
    /// Text / JSON / XML.
    Text,
    /// Image data.
    Image,
    /// Numerical / scientific data.
    Numerical,
    /// Already-compressed data (heuristic maps this to `CompressionCodec::None`).
    Compressed,
}
114
/// Adaptive compressor that selects the best codec per data type.
///
/// Learns from recorded `CompressionStats` and falls back to fixed
/// heuristics when no history exists for a (codec, data type) pair.
pub struct AdaptiveCompressor {
    /// Historical performance samples, keyed by (codec, data type).
    performance_history: HashMap<(CompressionCodec, DataType), Vec<CompressionStats>>,
    /// Compression level applied to codecs that support one (zstd only).
    default_level: CompressionLevel,
    /// Payloads smaller than this (bytes) are stored verbatim.
    min_compress_size: usize,
    /// Maximum retained history entries per (codec, data type) key.
    max_history: usize,
}
126
127impl AdaptiveCompressor {
128    /// Create new adaptive compressor
129    pub fn new() -> Self {
130        Self {
131            performance_history: HashMap::new(),
132            default_level: CompressionLevel::Default,
133            min_compress_size: 1024, // 1KB
134            max_history: 100,
135        }
136    }
137
138    /// Set compression level
139    pub fn with_level(mut self, level: CompressionLevel) -> Self {
140        self.default_level = level;
141        self
142    }
143
144    /// Set minimum compression size
145    pub fn with_min_size(mut self, size: usize) -> Self {
146        self.min_compress_size = size;
147        self
148    }
149
150    /// Compress data with specified codec
151    pub fn compress(
152        &mut self,
153        data: &[u8],
154        codec: CompressionCodec,
155        data_type: DataType,
156    ) -> Result<Bytes> {
157        if data.len() < self.min_compress_size {
158            return Ok(Bytes::copy_from_slice(data));
159        }
160
161        let start = std::time::Instant::now();
162
163        let compressed = match codec {
164            CompressionCodec::None => Bytes::copy_from_slice(data),
165            CompressionCodec::Lz4 => self.compress_lz4(data)?,
166            CompressionCodec::Zstd => self.compress_zstd(data)?,
167            CompressionCodec::Snappy => self.compress_snappy(data)?,
168        };
169
170        let compression_time_us = start.elapsed().as_micros() as u64;
171
172        // Record statistics
173        let stats = CompressionStats {
174            original_size: data.len(),
175            compressed_size: compressed.len(),
176            compression_time_us,
177            decompression_time_us: 0,
178            codec,
179        };
180
181        self.record_stats(data_type, stats);
182
183        Ok(compressed)
184    }
185
186    /// Decompress data with specified codec
187    pub fn decompress(&mut self, data: &[u8], codec: CompressionCodec) -> Result<Bytes> {
188        let start = std::time::Instant::now();
189
190        let decompressed = match codec {
191            CompressionCodec::None => Bytes::copy_from_slice(data),
192            CompressionCodec::Lz4 => self.decompress_lz4(data)?,
193            CompressionCodec::Zstd => self.decompress_zstd(data)?,
194            CompressionCodec::Snappy => self.decompress_snappy(data)?,
195        };
196
197        let _decompression_time_us = start.elapsed().as_micros() as u64;
198
199        Ok(decompressed)
200    }
201
202    /// Select best codec for data type based on historical performance
203    pub fn select_codec(&self, data_type: DataType) -> CompressionCodec {
204        // Find codec with best efficiency score for this data type
205        let mut best_codec = CompressionCodec::Lz4; // Default
206        let mut best_score = 0.0;
207
208        for codec in &[
209            CompressionCodec::Lz4,
210            CompressionCodec::Zstd,
211            CompressionCodec::Snappy,
212        ] {
213            if let Some(stats_vec) = self.performance_history.get(&(*codec, data_type)) {
214                if !stats_vec.is_empty() {
215                    let avg_score: f64 =
216                        stats_vec.iter().map(|s| s.efficiency_score()).sum::<f64>()
217                            / stats_vec.len() as f64;
218
219                    if avg_score > best_score {
220                        best_score = avg_score;
221                        best_codec = *codec;
222                    }
223                }
224            }
225        }
226
227        // If no history, use heuristics
228        if best_score == 0.0 {
229            return self.heuristic_codec(data_type);
230        }
231
232        best_codec
233    }
234
235    /// Heuristic codec selection based on data type
236    fn heuristic_codec(&self, data_type: DataType) -> CompressionCodec {
237        match data_type {
238            DataType::Binary => CompressionCodec::Lz4,
239            DataType::Text => CompressionCodec::Zstd,
240            DataType::Image => CompressionCodec::Lz4,
241            DataType::Numerical => CompressionCodec::Zstd,
242            DataType::Compressed => CompressionCodec::None,
243        }
244    }
245
246    /// Compress with LZ4
247    fn compress_lz4(&self, data: &[u8]) -> Result<Bytes> {
248        // prepend_size=true so that decompress can determine the original size
249        lz4::block::compress(data, None, true)
250            .map(Bytes::from)
251            .map_err(|e| CacheError::Compression(e.to_string()))
252    }
253
254    /// Decompress with LZ4
255    fn decompress_lz4(&self, data: &[u8]) -> Result<Bytes> {
256        lz4::block::decompress(data, None)
257            .map(Bytes::from)
258            .map_err(|e| CacheError::Decompression(e.to_string()))
259    }
260
261    /// Compress with Zstd
262    fn compress_zstd(&self, data: &[u8]) -> Result<Bytes> {
263        let level = self.default_level.to_zstd_level();
264        zstd::encode_all(data, level)
265            .map(Bytes::from)
266            .map_err(|e| CacheError::Compression(e.to_string()))
267    }
268
269    /// Decompress with Zstd
270    fn decompress_zstd(&self, data: &[u8]) -> Result<Bytes> {
271        zstd::decode_all(data)
272            .map(Bytes::from)
273            .map_err(|e| CacheError::Decompression(e.to_string()))
274    }
275
276    /// Compress with Snappy
277    fn compress_snappy(&self, data: &[u8]) -> Result<Bytes> {
278        let mut encoder = snap::raw::Encoder::new();
279        encoder
280            .compress_vec(data)
281            .map(Bytes::from)
282            .map_err(|e| CacheError::Compression(e.to_string()))
283    }
284
285    /// Decompress with Snappy
286    fn decompress_snappy(&self, data: &[u8]) -> Result<Bytes> {
287        let mut decoder = snap::raw::Decoder::new();
288        decoder
289            .decompress_vec(data)
290            .map(Bytes::from)
291            .map_err(|e| CacheError::Decompression(e.to_string()))
292    }
293
294    /// Record compression statistics
295    fn record_stats(&mut self, data_type: DataType, stats: CompressionStats) {
296        let key = (stats.codec, data_type);
297        let history = self.performance_history.entry(key).or_default();
298
299        history.push(stats);
300
301        // Limit history size
302        if history.len() > self.max_history {
303            history.remove(0);
304        }
305    }
306
307    /// Get average compression ratio for a codec and data type
308    pub fn avg_compression_ratio(
309        &self,
310        codec: CompressionCodec,
311        data_type: DataType,
312    ) -> Option<f64> {
313        self.performance_history
314            .get(&(codec, data_type))
315            .and_then(|stats_vec| {
316                if stats_vec.is_empty() {
317                    None
318                } else {
319                    let avg = stats_vec.iter().map(|s| s.compression_ratio()).sum::<f64>()
320                        / stats_vec.len() as f64;
321                    Some(avg)
322                }
323            })
324    }
325
326    /// Get performance statistics for all codecs
327    pub fn get_performance_stats(
328        &self,
329    ) -> HashMap<(CompressionCodec, DataType), PerformanceMetrics> {
330        let mut result = HashMap::new();
331
332        for (key, stats_vec) in &self.performance_history {
333            if stats_vec.is_empty() {
334                continue;
335            }
336
337            let avg_ratio = stats_vec.iter().map(|s| s.compression_ratio()).sum::<f64>()
338                / stats_vec.len() as f64;
339
340            let avg_comp_throughput = stats_vec
341                .iter()
342                .map(|s| s.compression_throughput_mbps())
343                .sum::<f64>()
344                / stats_vec.len() as f64;
345
346            let avg_decomp_throughput = stats_vec
347                .iter()
348                .filter(|s| s.decompression_time_us > 0)
349                .map(|s| s.decompression_throughput_mbps())
350                .sum::<f64>()
351                / stats_vec.len() as f64;
352
353            result.insert(
354                *key,
355                PerformanceMetrics {
356                    avg_compression_ratio: avg_ratio,
357                    avg_compression_throughput_mbps: avg_comp_throughput,
358                    avg_decompression_throughput_mbps: avg_decomp_throughput,
359                    sample_count: stats_vec.len(),
360                },
361            );
362        }
363
364        result
365    }
366
367    /// Clear all performance history
368    pub fn clear_history(&mut self) {
369        self.performance_history.clear();
370    }
371}
372
373impl Default for AdaptiveCompressor {
374    fn default() -> Self {
375        Self::new()
376    }
377}
378
/// Performance metrics summary for one (codec, data type) pair,
/// produced by `AdaptiveCompressor::get_performance_stats`.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Average compression ratio over all samples.
    pub avg_compression_ratio: f64,
    /// Average compression throughput in MB/s.
    pub avg_compression_throughput_mbps: f64,
    /// Average decompression throughput in MB/s.
    pub avg_decompression_throughput_mbps: f64,
    /// Number of samples the averages were computed from.
    pub sample_count: usize,
}
391
/// Self-describing compressed data container: the bytes plus the codec
/// needed to decode them and the original size for post-decode validation.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompressedData {
    /// Compressed bytes (serialized through the local `serde_bytes` shim).
    #[serde(with = "serde_bytes")]
    pub data: Vec<u8>,
    /// Codec used to produce `data`.
    pub codec: CompressionCodec,
    /// Original (uncompressed) size, checked after decompression.
    pub original_size: usize,
}
403
404impl CompressedData {
405    /// Create new compressed data
406    pub fn new(data: Vec<u8>, codec: CompressionCodec, original_size: usize) -> Self {
407        Self {
408            data,
409            codec,
410            original_size,
411        }
412    }
413
414    /// Decompress the data
415    pub fn decompress(&self, compressor: &mut AdaptiveCompressor) -> Result<Bytes> {
416        let decompressed = compressor.decompress(&self.data, self.codec)?;
417
418        // Validate size
419        if decompressed.len() != self.original_size {
420            return Err(CacheError::Decompression(format!(
421                "Size mismatch: expected {}, got {}",
422                self.original_size,
423                decompressed.len()
424            )));
425        }
426
427        Ok(decompressed)
428    }
429
430    /// Get compressed size
431    pub fn compressed_size(&self) -> usize {
432        self.data.len()
433    }
434
435    /// Calculate compression ratio
436    pub fn compression_ratio(&self) -> f64 {
437        if !self.data.is_empty() {
438            self.original_size as f64 / self.data.len() as f64
439        } else {
440            1.0
441        }
442    }
443}
444
445mod serde_bytes {
446    use serde::{Deserialize, Deserializer, Serialize, Serializer};
447
448    pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> std::result::Result<S::Ok, S::Error>
449    where
450        S: Serializer,
451    {
452        bytes.serialize(serializer)
453    }
454
455    pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
456    where
457        D: Deserializer<'de>,
458    {
459        Vec::<u8>::deserialize(deserializer)
460    }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// LZ4 round-trip: compressible data shrinks and restores exactly.
    #[test]
    fn test_lz4_compression() {
        let mut compressor = AdaptiveCompressor::new();
        // 1400 bytes of repeated text — well above min_compress_size.
        let data = b"Hello, World! ".repeat(100);

        let compressed = compressor
            .compress(&data, CompressionCodec::Lz4, DataType::Text)
            .expect("compression failed");

        assert!(compressed.len() < data.len());

        let decompressed = compressor
            .decompress(&compressed, CompressionCodec::Lz4)
            .expect("decompression failed");

        assert_eq!(decompressed.as_ref(), &data[..]);
    }

    /// Zstd round-trip: compressible data shrinks and restores exactly.
    #[test]
    fn test_zstd_compression() {
        let mut compressor = AdaptiveCompressor::new();
        let data = b"Test data for compression ".repeat(50);

        let compressed = compressor
            .compress(&data, CompressionCodec::Zstd, DataType::Text)
            .expect("compression failed");

        assert!(compressed.len() < data.len());

        let decompressed = compressor
            .decompress(&compressed, CompressionCodec::Zstd)
            .expect("decompression failed");

        assert_eq!(decompressed.as_ref(), &data[..]);
    }

    /// Snappy round-trip: compressible data shrinks and restores exactly.
    #[test]
    fn test_snappy_compression() {
        let mut compressor = AdaptiveCompressor::new();
        let data = b"Snappy compression test ".repeat(50);

        let compressed = compressor
            .compress(&data, CompressionCodec::Snappy, DataType::Binary)
            .expect("compression failed");

        assert!(compressed.len() < data.len());

        let decompressed = compressor
            .decompress(&compressed, CompressionCodec::Snappy)
            .expect("decompression failed");

        assert_eq!(decompressed.as_ref(), &data[..]);
    }

    /// With no history recorded, codec selection falls back to the
    /// fixed per-data-type heuristics.
    #[test]
    fn test_codec_selection() {
        let compressor = AdaptiveCompressor::new();

        // Initially uses heuristics
        assert_eq!(
            compressor.select_codec(DataType::Text),
            CompressionCodec::Zstd
        );
        assert_eq!(
            compressor.select_codec(DataType::Binary),
            CompressionCodec::Lz4
        );
        assert_eq!(
            compressor.select_codec(DataType::Compressed),
            CompressionCodec::None
        );
    }

    /// Payloads below the configured minimum are returned verbatim.
    #[test]
    fn test_min_compress_size() {
        let mut compressor = AdaptiveCompressor::new().with_min_size(1000);
        let small_data = b"small";

        let result = compressor
            .compress(small_data, CompressionCodec::Lz4, DataType::Binary)
            .expect("compression failed");

        // Should not compress small data
        assert_eq!(result.len(), small_data.len());
    }

    /// Ratio and throughput math on hand-built stats.
    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats {
            original_size: 1000,
            compressed_size: 500,
            compression_time_us: 1000,
            decompression_time_us: 500,
            codec: CompressionCodec::Lz4,
        };

        approx::assert_relative_eq!(stats.compression_ratio(), 2.0, epsilon = 0.01);
        assert!(stats.compression_throughput_mbps() > 0.0);
        assert!(stats.decompression_throughput_mbps() > 0.0);
    }

    /// CompressedData round-trip including the size-validation path.
    #[test]
    fn test_compressed_data() {
        let mut compressor = AdaptiveCompressor::new();
        // Use enough repeated data to exceed min_compress_size (1024 bytes)
        // and achieve a compression ratio > 1.0
        let original = b"Test data for compression ratio validation! ".repeat(100);

        let compressed_bytes = compressor
            .compress(&original, CompressionCodec::Zstd, DataType::Binary)
            .expect("compression failed");

        let compressed_data = CompressedData::new(
            compressed_bytes.to_vec(),
            CompressionCodec::Zstd,
            original.len(),
        );

        assert!(compressed_data.compression_ratio() > 1.0);

        let decompressed = compressed_data
            .decompress(&mut compressor)
            .expect("decompression failed");

        assert_eq!(decompressed.as_ref(), &original[..]);
    }
}