// oxigdal_edge/compression.rs

1//! Edge-optimized compression for bandwidth-limited environments
2//!
3//! Provides compression strategies optimized for edge devices with
4//! limited CPU and memory resources.
5
6use crate::error::{EdgeError, Result};
7use bytes::Bytes;
8use serde::{Deserialize, Serialize};
9
10/// Compression level
/// Compression effort level, trading speed against ratio.
///
/// Variants map to backend-specific numeric levels via `lz4_level()`
/// and `deflate_level_u8()`; the Snappy path does not consume a level.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompressionLevel {
    /// Fastest compression, lower ratio
    Fast,
    /// Balanced compression and speed
    Balanced,
    /// Best compression ratio, slower
    Best,
}
20
21impl CompressionLevel {
22    /// Get LZ4 compression level
23    pub fn lz4_level(&self) -> i32 {
24        match self {
25            Self::Fast => 1,
26            Self::Balanced => 4,
27            Self::Best => 9,
28        }
29    }
30
31    /// Get deflate level as u8
32    pub fn deflate_level_u8(&self) -> u8 {
33        match self {
34            Self::Fast => 1,
35            Self::Balanced => 6,
36            Self::Best => 9,
37        }
38    }
39}
40
41/// Compression strategy
/// Compression algorithm selection.
///
/// `auto_select` picks a variant from payload size and an entropy
/// estimate; `EdgeCompressor` dispatches on the chosen variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CompressionStrategy {
    /// LZ4 compression (fast, good for real-time)
    Lz4,
    /// Snappy compression (fast, good for throughput)
    Snappy,
    /// Deflate/GZIP (balanced)
    Deflate,
    /// No compression
    None,
}
53
54impl CompressionStrategy {
55    /// Select best strategy based on data characteristics
56    pub fn auto_select(data: &[u8]) -> Self {
57        // Simple heuristic based on data size and entropy
58        if data.len() < 1024 {
59            // Small data: no compression overhead
60            Self::None
61        } else if Self::estimate_entropy(data) > 0.9 {
62            // High entropy (likely already compressed): skip compression
63            Self::None
64        } else if data.len() < 10 * 1024 {
65            // Small to medium: use fast compression
66            Self::Snappy
67        } else {
68            // Larger data: use balanced compression
69            Self::Lz4
70        }
71    }
72
73    /// Estimate Shannon entropy of data
74    fn estimate_entropy(data: &[u8]) -> f64 {
75        if data.is_empty() {
76            return 0.0;
77        }
78
79        let mut counts = [0u32; 256];
80        for &byte in data.iter().take(1024.min(data.len())) {
81            counts[byte as usize] = counts[byte as usize].saturating_add(1);
82        }
83
84        let len = data.len().min(1024) as f64;
85        let mut entropy = 0.0;
86
87        for &count in &counts {
88            if count > 0 {
89                let p = count as f64 / len;
90                entropy -= p * p.log2();
91            }
92        }
93
94        entropy / 8.0 // Normalize to 0-1 range
95    }
96}
97
98/// Edge compressor
/// Edge compressor applying a fixed strategy/level pair.
pub struct EdgeCompressor {
    // Algorithm dispatched on by `compress`/`decompress`.
    strategy: CompressionStrategy,
    // Effort level forwarded to the LZ4/deflate backends
    // (the Snappy path does not use it).
    level: CompressionLevel,
}
103
104impl EdgeCompressor {
105    /// Create new compressor with strategy and level
106    pub fn new(strategy: CompressionStrategy, level: CompressionLevel) -> Self {
107        Self { strategy, level }
108    }
109
110    /// Create compressor with auto-selected strategy
111    pub fn auto() -> Self {
112        Self {
113            strategy: CompressionStrategy::Lz4,
114            level: CompressionLevel::Balanced,
115        }
116    }
117
118    /// Create fast compressor for real-time use
119    pub fn fast() -> Self {
120        Self {
121            strategy: CompressionStrategy::Snappy,
122            level: CompressionLevel::Fast,
123        }
124    }
125
126    /// Create best compression for storage
127    pub fn best() -> Self {
128        Self {
129            strategy: CompressionStrategy::Deflate,
130            level: CompressionLevel::Best,
131        }
132    }
133
134    /// Compress data
135    pub fn compress(&self, data: &[u8]) -> Result<Bytes> {
136        match self.strategy {
137            CompressionStrategy::None => Ok(Bytes::copy_from_slice(data)),
138            CompressionStrategy::Lz4 => self.compress_lz4(data),
139            CompressionStrategy::Snappy => self.compress_snappy(data),
140            CompressionStrategy::Deflate => self.compress_deflate(data),
141        }
142    }
143
144    /// Decompress data
145    pub fn decompress(&self, data: &[u8]) -> Result<Bytes> {
146        match self.strategy {
147            CompressionStrategy::None => Ok(Bytes::copy_from_slice(data)),
148            CompressionStrategy::Lz4 => self.decompress_lz4(data),
149            CompressionStrategy::Snappy => self.decompress_snappy(data),
150            CompressionStrategy::Deflate => self.decompress_deflate(data),
151        }
152    }
153
154    /// Compress with LZ4
155    fn compress_lz4(&self, data: &[u8]) -> Result<Bytes> {
156        // Compress with oxiarc-lz4 and prepend original size as 4-byte LE i32
157        let compressed = oxiarc_lz4::compress_block_with_accel(data, self.level.lz4_level())
158            .map_err(|e| EdgeError::compression(e.to_string()))?;
159        let orig_size = data.len() as i32;
160        let mut result = Vec::with_capacity(4 + compressed.len());
161        result.extend_from_slice(&orig_size.to_le_bytes());
162        result.extend_from_slice(&compressed);
163        Ok(Bytes::from(result))
164    }
165
166    /// Decompress with LZ4
167    fn decompress_lz4(&self, data: &[u8]) -> Result<Bytes> {
168        // Data has 4-byte LE i32 size prefix followed by compressed block
169        if data.len() < 4 {
170            return Err(EdgeError::decompression("LZ4 data too short".to_string()));
171        }
172        let orig_size = i32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
173        let decompressed = oxiarc_lz4::decompress_block(&data[4..], orig_size)
174            .map_err(|e| EdgeError::decompression(e.to_string()))?;
175        Ok(Bytes::from(decompressed))
176    }
177
178    /// Compress with Snappy
179    fn compress_snappy(&self, data: &[u8]) -> Result<Bytes> {
180        Ok(Bytes::from(oxiarc_snappy::compress(data)))
181    }
182
183    /// Decompress with Snappy
184    fn decompress_snappy(&self, data: &[u8]) -> Result<Bytes> {
185        oxiarc_snappy::decompress(data)
186            .map(Bytes::from)
187            .map_err(|e| EdgeError::decompression(e.to_string()))
188    }
189
190    /// Compress with Deflate
191    fn compress_deflate(&self, data: &[u8]) -> Result<Bytes> {
192        oxiarc_deflate::deflate(data, self.level.deflate_level_u8())
193            .map(Bytes::from)
194            .map_err(|e| EdgeError::compression(e.to_string()))
195    }
196
197    /// Decompress with Deflate
198    fn decompress_deflate(&self, data: &[u8]) -> Result<Bytes> {
199        oxiarc_deflate::inflate(data)
200            .map(Bytes::from)
201            .map_err(|e| EdgeError::decompression(e.to_string()))
202    }
203
204    /// Get compression ratio for data
205    pub fn compression_ratio(&self, original_size: usize, compressed_size: usize) -> f64 {
206        if original_size == 0 {
207            return 0.0;
208        }
209        compressed_size as f64 / original_size as f64
210    }
211
212    /// Estimate compressed size without actually compressing
213    pub fn estimate_compressed_size(&self, data: &[u8]) -> usize {
214        match self.strategy {
215            CompressionStrategy::None => data.len(),
216            CompressionStrategy::Snappy => {
217                // Snappy worst case: ~1.5x original size
218                (data.len() as f64 * 1.5) as usize
219            }
220            CompressionStrategy::Lz4 => {
221                // LZ4 worst case: original size + overhead
222                data.len() + (data.len() / 255) + 16
223            }
224            CompressionStrategy::Deflate => {
225                // Deflate worst case: ~1.1x original size
226                (data.len() as f64 * 1.1) as usize
227            }
228        }
229    }
230}
231
232/// Adaptive compressor that selects strategy based on data
/// Adaptive compressor that selects a strategy per payload via
/// `CompressionStrategy::auto_select`.
pub struct AdaptiveCompressor {
    // Effort level applied to whichever strategy is chosen.
    level: CompressionLevel,
}
236
237impl AdaptiveCompressor {
238    /// Create new adaptive compressor
239    pub fn new(level: CompressionLevel) -> Self {
240        Self { level }
241    }
242
243    /// Compress data with auto-selected strategy
244    pub fn compress(&self, data: &[u8]) -> Result<(Bytes, CompressionStrategy)> {
245        let strategy = CompressionStrategy::auto_select(data);
246        let compressor = EdgeCompressor::new(strategy, self.level);
247        let compressed = compressor.compress(data)?;
248        Ok((compressed, strategy))
249    }
250
251    /// Decompress data with specified strategy
252    pub fn decompress(&self, data: &[u8], strategy: CompressionStrategy) -> Result<Bytes> {
253        let compressor = EdgeCompressor::new(strategy, self.level);
254        compressor.decompress(data)
255    }
256}
257
258/// Compressed data with metadata
/// Compressed payload bundled with bookkeeping metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressedData {
    /// Compression strategy used
    pub strategy: CompressionStrategy,
    /// Original size
    pub original_size: usize,
    /// Compressed size
    pub compressed_size: usize,
    /// Compressed data
    pub data: Vec<u8>,
}
270
271impl CompressedData {
272    /// Create new compressed data
273    pub fn new(strategy: CompressionStrategy, original_size: usize, data: Bytes) -> Self {
274        let compressed_size = data.len();
275        Self {
276            strategy,
277            original_size,
278            compressed_size,
279            data: data.to_vec(),
280        }
281    }
282
283    /// Get compression ratio
284    pub fn ratio(&self) -> f64 {
285        if self.original_size == 0 {
286            return 0.0;
287        }
288        self.compressed_size as f64 / self.original_size as f64
289    }
290
291    /// Get space saved in bytes
292    pub fn space_saved(&self) -> usize {
293        self.original_size.saturating_sub(self.compressed_size)
294    }
295
296    /// Get space saved as percentage
297    pub fn space_saved_percent(&self) -> f64 {
298        if self.original_size == 0 {
299            return 0.0;
300        }
301        (self.space_saved() as f64 / self.original_size as f64) * 100.0
302    }
303}
304
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trip `payload` through `compressor` and assert it survives.
    fn assert_roundtrip(compressor: &EdgeCompressor, payload: &[u8]) -> Result<()> {
        let packed = compressor.compress(payload)?;
        let unpacked = compressor.decompress(&packed)?;
        assert_eq!(&unpacked[..], payload);
        Ok(())
    }

    #[test]
    fn test_compression_lz4() -> Result<()> {
        // Larger, repetitive data so compression has something to work with;
        // tiny inputs may grow due to framing overhead, so we only check
        // that the round trip is lossless.
        let payload: &[u8] = b"Hello, World! This is a test message for compression. Repeat this several times to make it worth compressing. Hello, World! This is a test message for compression.";
        let compressor = EdgeCompressor::new(CompressionStrategy::Lz4, CompressionLevel::Balanced);
        assert_roundtrip(&compressor, payload)
    }

    #[test]
    fn test_compression_snappy() -> Result<()> {
        let compressor = EdgeCompressor::new(CompressionStrategy::Snappy, CompressionLevel::Fast);
        assert_roundtrip(&compressor, b"Hello, World! This is a test message for compression.")
    }

    #[test]
    fn test_compression_deflate() -> Result<()> {
        let compressor = EdgeCompressor::new(CompressionStrategy::Deflate, CompressionLevel::Best);
        assert_roundtrip(&compressor, b"Hello, World! This is a test message for compression.")
    }

    #[test]
    fn test_compression_none() -> Result<()> {
        let compressor = EdgeCompressor::new(CompressionStrategy::None, CompressionLevel::Fast);
        let payload = b"Hello, World!";

        // The None strategy must pass bytes through untouched.
        let passthrough = compressor.compress(payload)?;
        assert_eq!(&passthrough[..], &payload[..]);

        Ok(())
    }

    #[test]
    fn test_adaptive_compression() -> Result<()> {
        let adaptive = AdaptiveCompressor::new(CompressionLevel::Balanced);
        let payload = b"Hello, World! This is a test message for adaptive compression.";

        let (packed, strategy) = adaptive.compress(payload)?;
        let unpacked = adaptive.decompress(&packed, strategy)?;

        assert_eq!(&unpacked[..], &payload[..]);

        Ok(())
    }

    #[test]
    fn test_auto_select_strategy() {
        // Tiny payloads skip compression entirely.
        assert_eq!(
            CompressionStrategy::auto_select(b"Hi"),
            CompressionStrategy::None
        );

        // Low-entropy medium payloads get a real compressor.
        let zeros = vec![0u8; 5000];
        assert!(matches!(
            CompressionStrategy::auto_select(&zeros),
            CompressionStrategy::Snappy | CompressionStrategy::Lz4
        ));
    }

    #[test]
    fn test_compression_ratio() {
        assert_eq!(EdgeCompressor::fast().compression_ratio(1000, 500), 0.5);
    }

    #[test]
    fn test_compressed_data_metadata() -> Result<()> {
        // Repetitive data so the compressor actually has work to do.
        let original: &[u8] = b"Test data for compression. This message repeats. Test data for compression. This message repeats. Test data for compression. This message repeats.";
        let compressor = EdgeCompressor::fast();
        let packed = compressor.compress(original)?;

        let meta = CompressedData::new(CompressionStrategy::Snappy, original.len(), packed);

        assert_eq!(meta.original_size, original.len());
        // Ratio can exceed 1.0 when overhead inflates tiny inputs, but it
        // must always be positive.
        assert!(meta.ratio() > 0.0);
        // The percentage stays within sane bounds either way.
        assert!(meta.space_saved_percent() >= -100.0 && meta.space_saved_percent() <= 100.0);

        Ok(())
    }
}
419}