// oxigdal_cache_advanced/compression.rs
//! Adaptive compression for cached data
//!
//! Automatically selects the best compression codec based on:
//! - Data type and characteristics
//! - Compression ratio vs speed tradeoff
//! - Historical performance metrics
8use crate::error::{CacheError, Result};
9use bytes::Bytes;
10use std::collections::HashMap;
11
/// Compression codec type.
///
/// Serializable so cached entries can persist which codec produced them
/// (see `CompressedData`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum CompressionCodec {
    /// No compression — data is stored verbatim
    None,
    /// LZ4 - very fast, moderate compression
    Lz4,
    /// Zstd - configurable speed/ratio tradeoff (level chosen via `CompressionLevel`)
    Zstd,
    /// Snappy - fast, moderate compression
    Snappy,
}
24
/// Compression effort setting for codecs that expose a tunable level.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionLevel {
    /// Prioritize speed over ratio
    Fast,
    /// Balanced speed/ratio tradeoff
    Default,
    /// Prioritize ratio over speed
    Best,
}

impl CompressionLevel {
    /// Map this level onto the zstd numeric scale:
    /// 1 (fastest), 3 (zstd's own default), or 19 (near-maximum ratio).
    pub fn to_zstd_level(&self) -> i32 {
        match *self {
            Self::Fast => 1,
            Self::Default => 3,
            Self::Best => 19,
        }
    }
}
46
/// Statistics for a single compression operation.
///
/// Entries recorded by `AdaptiveCompressor::compress` have
/// `decompression_time_us` set to 0, since no decompression has been
/// measured for them yet.
#[derive(Debug, Clone)]
pub struct CompressionStats {
    /// Original (uncompressed) size in bytes
    pub original_size: usize,
    /// Compressed size in bytes
    pub compressed_size: usize,
    /// Time spent compressing, in microseconds
    pub compression_time_us: u64,
    /// Time spent decompressing, in microseconds (0 when not measured)
    pub decompression_time_us: u64,
    /// Codec used for this operation
    pub codec: CompressionCodec,
}
61
62impl CompressionStats {
63    /// Calculate compression ratio
64    pub fn compression_ratio(&self) -> f64 {
65        if self.compressed_size > 0 {
66            self.original_size as f64 / self.compressed_size as f64
67        } else {
68            1.0
69        }
70    }
71
72    /// Calculate compression throughput in MB/s
73    pub fn compression_throughput_mbps(&self) -> f64 {
74        if self.compression_time_us > 0 {
75            let mb = self.original_size as f64 / (1024.0 * 1024.0);
76            let seconds = self.compression_time_us as f64 / 1_000_000.0;
77            mb / seconds
78        } else {
79            0.0
80        }
81    }
82
83    /// Calculate decompression throughput in MB/s
84    pub fn decompression_throughput_mbps(&self) -> f64 {
85        if self.decompression_time_us > 0 {
86            let mb = self.compressed_size as f64 / (1024.0 * 1024.0);
87            let seconds = self.decompression_time_us as f64 / 1_000_000.0;
88            mb / seconds
89        } else {
90            0.0
91        }
92    }
93
94    /// Calculate efficiency score (ratio * throughput)
95    pub fn efficiency_score(&self) -> f64 {
96        self.compression_ratio() * self.compression_throughput_mbps()
97    }
98}
99
/// Data type hints supplied by callers to guide codec selection
/// before any performance history exists.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataType {
    /// Generic binary data
    Binary,
    /// Text/JSON/XML
    Text,
    /// Image data
    Image,
    /// Numerical/scientific data
    Numerical,
    /// Already compressed data (heuristics select `CompressionCodec::None`)
    Compressed,
}
114
/// Adaptive compressor that selects the best codec per data type based on
/// observed compression performance.
///
/// `compress`/`decompress` take `&mut self` because compression updates the
/// in-memory performance history; wrap in a lock for shared use.
pub struct AdaptiveCompressor {
    /// Rolling window of per-operation stats keyed by (codec, data type);
    /// each entry is capped at `max_history` samples, oldest dropped first
    performance_history: HashMap<(CompressionCodec, DataType), Vec<CompressionStats>>,
    /// Level used by codecs with a tunable level (currently only zstd)
    default_level: CompressionLevel,
    /// Payloads smaller than this many bytes are returned uncompressed
    min_compress_size: usize,
    /// Maximum samples retained per (codec, data type) key
    max_history: usize,
}
126
127impl AdaptiveCompressor {
128    /// Create new adaptive compressor
129    pub fn new() -> Self {
130        Self {
131            performance_history: HashMap::new(),
132            default_level: CompressionLevel::Default,
133            min_compress_size: 1024, // 1KB
134            max_history: 100,
135        }
136    }
137
138    /// Set compression level
139    pub fn with_level(mut self, level: CompressionLevel) -> Self {
140        self.default_level = level;
141        self
142    }
143
144    /// Set minimum compression size
145    pub fn with_min_size(mut self, size: usize) -> Self {
146        self.min_compress_size = size;
147        self
148    }
149
150    /// Compress data with specified codec
151    pub fn compress(
152        &mut self,
153        data: &[u8],
154        codec: CompressionCodec,
155        data_type: DataType,
156    ) -> Result<Bytes> {
157        if data.len() < self.min_compress_size {
158            return Ok(Bytes::copy_from_slice(data));
159        }
160
161        let start = std::time::Instant::now();
162
163        let compressed = match codec {
164            CompressionCodec::None => Bytes::copy_from_slice(data),
165            CompressionCodec::Lz4 => self.compress_lz4(data)?,
166            CompressionCodec::Zstd => self.compress_zstd(data)?,
167            CompressionCodec::Snappy => self.compress_snappy(data)?,
168        };
169
170        let compression_time_us = start.elapsed().as_micros() as u64;
171
172        // Record statistics
173        let stats = CompressionStats {
174            original_size: data.len(),
175            compressed_size: compressed.len(),
176            compression_time_us,
177            decompression_time_us: 0,
178            codec,
179        };
180
181        self.record_stats(data_type, stats);
182
183        Ok(compressed)
184    }
185
186    /// Decompress data with specified codec
187    pub fn decompress(&mut self, data: &[u8], codec: CompressionCodec) -> Result<Bytes> {
188        let start = std::time::Instant::now();
189
190        let decompressed = match codec {
191            CompressionCodec::None => Bytes::copy_from_slice(data),
192            CompressionCodec::Lz4 => self.decompress_lz4(data)?,
193            CompressionCodec::Zstd => self.decompress_zstd(data)?,
194            CompressionCodec::Snappy => self.decompress_snappy(data)?,
195        };
196
197        let _decompression_time_us = start.elapsed().as_micros() as u64;
198
199        Ok(decompressed)
200    }
201
202    /// Select best codec for data type based on historical performance
203    pub fn select_codec(&self, data_type: DataType) -> CompressionCodec {
204        // Find codec with best efficiency score for this data type
205        let mut best_codec = CompressionCodec::Lz4; // Default
206        let mut best_score = 0.0;
207
208        for codec in &[
209            CompressionCodec::Lz4,
210            CompressionCodec::Zstd,
211            CompressionCodec::Snappy,
212        ] {
213            if let Some(stats_vec) = self.performance_history.get(&(*codec, data_type)) {
214                if !stats_vec.is_empty() {
215                    let avg_score: f64 =
216                        stats_vec.iter().map(|s| s.efficiency_score()).sum::<f64>()
217                            / stats_vec.len() as f64;
218
219                    if avg_score > best_score {
220                        best_score = avg_score;
221                        best_codec = *codec;
222                    }
223                }
224            }
225        }
226
227        // If no history, use heuristics
228        if best_score == 0.0 {
229            return self.heuristic_codec(data_type);
230        }
231
232        best_codec
233    }
234
235    /// Heuristic codec selection based on data type
236    fn heuristic_codec(&self, data_type: DataType) -> CompressionCodec {
237        match data_type {
238            DataType::Binary => CompressionCodec::Lz4,
239            DataType::Text => CompressionCodec::Zstd,
240            DataType::Image => CompressionCodec::Lz4,
241            DataType::Numerical => CompressionCodec::Zstd,
242            DataType::Compressed => CompressionCodec::None,
243        }
244    }
245
246    /// Compress with LZ4
247    fn compress_lz4(&self, data: &[u8]) -> Result<Bytes> {
248        // Compress with oxiarc-lz4 and prepend original size as 4-byte LE i32
249        let compressed =
250            oxiarc_lz4::compress_block(data).map_err(|e| CacheError::Compression(e.to_string()))?;
251        let orig_size = data.len() as i32;
252        let mut result = Vec::with_capacity(4 + compressed.len());
253        result.extend_from_slice(&orig_size.to_le_bytes());
254        result.extend_from_slice(&compressed);
255        Ok(Bytes::from(result))
256    }
257
258    /// Decompress with LZ4
259    fn decompress_lz4(&self, data: &[u8]) -> Result<Bytes> {
260        // Data has 4-byte LE i32 size prefix followed by compressed block
261        if data.len() < 4 {
262            return Err(CacheError::Decompression("LZ4 data too short".to_string()));
263        }
264        let orig_size = i32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
265        let decompressed = oxiarc_lz4::decompress_block(&data[4..], orig_size)
266            .map_err(|e| CacheError::Decompression(e.to_string()))?;
267        Ok(Bytes::from(decompressed))
268    }
269
270    /// Compress with Zstd
271    fn compress_zstd(&self, data: &[u8]) -> Result<Bytes> {
272        let level = self.default_level.to_zstd_level();
273        oxiarc_zstd::encode_all(data, level)
274            .map(Bytes::from)
275            .map_err(|e| CacheError::Compression(e.to_string()))
276    }
277
278    /// Decompress with Zstd
279    fn decompress_zstd(&self, data: &[u8]) -> Result<Bytes> {
280        oxiarc_zstd::decode_all(data)
281            .map(Bytes::from)
282            .map_err(|e| CacheError::Decompression(e.to_string()))
283    }
284
285    /// Compress with Snappy
286    fn compress_snappy(&self, data: &[u8]) -> Result<Bytes> {
287        Ok(Bytes::from(oxiarc_snappy::compress(data)))
288    }
289
290    /// Decompress with Snappy
291    fn decompress_snappy(&self, data: &[u8]) -> Result<Bytes> {
292        oxiarc_snappy::decompress(data)
293            .map(Bytes::from)
294            .map_err(|e| CacheError::Decompression(e.to_string()))
295    }
296
297    /// Record compression statistics
298    fn record_stats(&mut self, data_type: DataType, stats: CompressionStats) {
299        let key = (stats.codec, data_type);
300        let history = self.performance_history.entry(key).or_default();
301
302        history.push(stats);
303
304        // Limit history size
305        if history.len() > self.max_history {
306            history.remove(0);
307        }
308    }
309
310    /// Get average compression ratio for a codec and data type
311    pub fn avg_compression_ratio(
312        &self,
313        codec: CompressionCodec,
314        data_type: DataType,
315    ) -> Option<f64> {
316        self.performance_history
317            .get(&(codec, data_type))
318            .and_then(|stats_vec| {
319                if stats_vec.is_empty() {
320                    None
321                } else {
322                    let avg = stats_vec.iter().map(|s| s.compression_ratio()).sum::<f64>()
323                        / stats_vec.len() as f64;
324                    Some(avg)
325                }
326            })
327    }
328
329    /// Get performance statistics for all codecs
330    pub fn get_performance_stats(
331        &self,
332    ) -> HashMap<(CompressionCodec, DataType), PerformanceMetrics> {
333        let mut result = HashMap::new();
334
335        for (key, stats_vec) in &self.performance_history {
336            if stats_vec.is_empty() {
337                continue;
338            }
339
340            let avg_ratio = stats_vec.iter().map(|s| s.compression_ratio()).sum::<f64>()
341                / stats_vec.len() as f64;
342
343            let avg_comp_throughput = stats_vec
344                .iter()
345                .map(|s| s.compression_throughput_mbps())
346                .sum::<f64>()
347                / stats_vec.len() as f64;
348
349            let avg_decomp_throughput = stats_vec
350                .iter()
351                .filter(|s| s.decompression_time_us > 0)
352                .map(|s| s.decompression_throughput_mbps())
353                .sum::<f64>()
354                / stats_vec.len() as f64;
355
356            result.insert(
357                *key,
358                PerformanceMetrics {
359                    avg_compression_ratio: avg_ratio,
360                    avg_compression_throughput_mbps: avg_comp_throughput,
361                    avg_decompression_throughput_mbps: avg_decomp_throughput,
362                    sample_count: stats_vec.len(),
363                },
364            );
365        }
366
367        result
368    }
369
370    /// Clear all performance history
371    pub fn clear_history(&mut self) {
372        self.performance_history.clear();
373    }
374}
375
376impl Default for AdaptiveCompressor {
377    fn default() -> Self {
378        Self::new()
379    }
380}
381
/// Aggregated performance summary for one (codec, data type) pairing,
/// as produced by `AdaptiveCompressor::get_performance_stats`.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Average compression ratio across samples
    pub avg_compression_ratio: f64,
    /// Average compression throughput in MB/s
    pub avg_compression_throughput_mbps: f64,
    /// Average decompression throughput in MB/s
    pub avg_decompression_throughput_mbps: f64,
    /// Number of samples the averages were computed from
    pub sample_count: usize,
}
394
/// Self-describing compressed payload: the bytes plus the codec that
/// produced them and the original size (checked on decompression).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompressedData {
    /// Compressed bytes
    // NOTE(review): "serde_bytes" refers to the local shim module in this
    // file, not the external crate of the same name — it serializes via
    // Vec<u8>'s default (element-wise) impl.
    #[serde(with = "serde_bytes")]
    pub data: Vec<u8>,
    /// Codec used to produce `data`
    pub codec: CompressionCodec,
    /// Original (uncompressed) size in bytes, for round-trip validation
    pub original_size: usize,
}
406
407impl CompressedData {
408    /// Create new compressed data
409    pub fn new(data: Vec<u8>, codec: CompressionCodec, original_size: usize) -> Self {
410        Self {
411            data,
412            codec,
413            original_size,
414        }
415    }
416
417    /// Decompress the data
418    pub fn decompress(&self, compressor: &mut AdaptiveCompressor) -> Result<Bytes> {
419        let decompressed = compressor.decompress(&self.data, self.codec)?;
420
421        // Validate size
422        if decompressed.len() != self.original_size {
423            return Err(CacheError::Decompression(format!(
424                "Size mismatch: expected {}, got {}",
425                self.original_size,
426                decompressed.len()
427            )));
428        }
429
430        Ok(decompressed)
431    }
432
433    /// Get compressed size
434    pub fn compressed_size(&self) -> usize {
435        self.data.len()
436    }
437
438    /// Calculate compression ratio
439    pub fn compression_ratio(&self) -> f64 {
440        if !self.data.is_empty() {
441            self.original_size as f64 / self.data.len() as f64
442        } else {
443            1.0
444        }
445    }
446}
447
/// Serde helper referenced by `#[serde(with = "serde_bytes")]` on
/// `CompressedData::data`.
///
/// NOTE(review): despite the name, this is a local shim, not the external
/// `serde_bytes` crate — it just delegates to `Vec<u8>`'s default
/// (element-wise) serialization, so self-describing formats emit an array of
/// integers rather than a compact byte string. Swap in the real crate if
/// compact byte encoding is wanted; on-disk format would change.
mod serde_bytes {
    use serde::{Deserialize, Deserializer, Serialize, Serializer};

    /// Serialize via `Vec<u8>`'s default `Serialize` implementation.
    pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        bytes.serialize(serializer)
    }

    /// Deserialize via `Vec<u8>`'s default `Deserialize` implementation.
    pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
    where
        D: Deserializer<'de>,
    {
        Vec::<u8>::deserialize(deserializer)
    }
}
465
// Unit tests: round-trips for each codec, heuristic codec selection,
// the small-payload bypass, stats math, and the CompressedData wrapper.
#[cfg(test)]
mod tests {
    use super::*;

    // LZ4 round-trip on highly repetitive (compressible) text.
    #[test]
    fn test_lz4_compression() {
        let mut compressor = AdaptiveCompressor::new();
        let data = b"Hello, World! ".repeat(100);

        let compressed = compressor
            .compress(&data, CompressionCodec::Lz4, DataType::Text)
            .expect("compression failed");

        assert!(compressed.len() < data.len());

        let decompressed = compressor
            .decompress(&compressed, CompressionCodec::Lz4)
            .expect("decompression failed");

        assert_eq!(decompressed.as_ref(), &data[..]);
    }

    // Zstd round-trip on compressible text.
    #[test]
    fn test_zstd_compression() {
        let mut compressor = AdaptiveCompressor::new();
        let data = b"Test data for compression ".repeat(50);

        let compressed = compressor
            .compress(&data, CompressionCodec::Zstd, DataType::Text)
            .expect("compression failed");

        assert!(compressed.len() < data.len());

        let decompressed = compressor
            .decompress(&compressed, CompressionCodec::Zstd)
            .expect("decompression failed");

        assert_eq!(decompressed.as_ref(), &data[..]);
    }

    // Snappy round-trip on compressible binary data.
    #[test]
    fn test_snappy_compression() {
        let mut compressor = AdaptiveCompressor::new();
        let data = b"Snappy compression test ".repeat(50);

        let compressed = compressor
            .compress(&data, CompressionCodec::Snappy, DataType::Binary)
            .expect("compression failed");

        assert!(compressed.len() < data.len());

        let decompressed = compressor
            .decompress(&compressed, CompressionCodec::Snappy)
            .expect("decompression failed");

        assert_eq!(decompressed.as_ref(), &data[..]);
    }

    // With no performance history, select_codec falls back to the static
    // per-data-type heuristics.
    #[test]
    fn test_codec_selection() {
        let compressor = AdaptiveCompressor::new();

        // Initially uses heuristics
        assert_eq!(
            compressor.select_codec(DataType::Text),
            CompressionCodec::Zstd
        );
        assert_eq!(
            compressor.select_codec(DataType::Binary),
            CompressionCodec::Lz4
        );
        assert_eq!(
            compressor.select_codec(DataType::Compressed),
            CompressionCodec::None
        );
    }

    // Payloads below the configured threshold are returned verbatim.
    #[test]
    fn test_min_compress_size() {
        let mut compressor = AdaptiveCompressor::new().with_min_size(1000);
        let small_data = b"small";

        let result = compressor
            .compress(small_data, CompressionCodec::Lz4, DataType::Binary)
            .expect("compression failed");

        // Should not compress small data
        assert_eq!(result.len(), small_data.len());
    }

    // Sanity-check the derived metrics on a hand-built stats record.
    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats {
            original_size: 1000,
            compressed_size: 500,
            compression_time_us: 1000,
            decompression_time_us: 500,
            codec: CompressionCodec::Lz4,
        };

        approx::assert_relative_eq!(stats.compression_ratio(), 2.0, epsilon = 0.01);
        assert!(stats.compression_throughput_mbps() > 0.0);
        assert!(stats.decompression_throughput_mbps() > 0.0);
    }

    // Full round-trip through the CompressedData wrapper, including the
    // size-validation path.
    #[test]
    fn test_compressed_data() {
        let mut compressor = AdaptiveCompressor::new();
        // Use enough repeated data to exceed min_compress_size (1024 bytes)
        // and achieve a compression ratio > 1.0
        let original = b"Test data for compression ratio validation! ".repeat(100);

        let compressed_bytes = compressor
            .compress(&original, CompressionCodec::Zstd, DataType::Binary)
            .expect("compression failed");

        let compressed_data = CompressedData::new(
            compressed_bytes.to_vec(),
            CompressionCodec::Zstd,
            original.len(),
        );

        assert!(compressed_data.compression_ratio() > 1.0);

        let decompressed = compressed_data
            .decompress(&mut compressor)
            .expect("decompression failed");

        assert_eq!(decompressed.as_ref(), &original[..]);
    }
}