// featherdb_storage/compression.rs
//! Adaptive Page Compression for FeatherDB
//!
//! This module provides transparent compression for database pages,
//! supporting multiple compression algorithms:
//! - LZ4: Fast compression/decompression, moderate compression ratio
//! - ZSTD: Higher compression ratio, configurable levels
//!
//! Compression is adaptive and skips pages smaller than a configurable threshold.

10use featherdb_core::{Error, Result};
11use std::sync::atomic::{AtomicU64, Ordering};
12
/// Compression algorithm type
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionType {
    /// No compression
    None,
    /// LZ4 - fast compression with moderate ratio
    Lz4,
    /// ZSTD - high compression ratio with configurable level
    /// Level ranges from 1 (fastest) to 22 (best compression)
    /// Default level is 3 for a good balance
    Zstd { level: i32 },
}

impl Default for CompressionType {
    /// `None` is the default: pages are stored uncompressed unless configured otherwise.
    fn default() -> Self {
        CompressionType::None
    }
}

impl CompressionType {
    /// Create ZSTD compression with default level (3)
    pub fn zstd_default() -> Self {
        Self::Zstd { level: 3 }
    }

    /// Create ZSTD compression with high ratio (level 9)
    pub fn zstd_high() -> Self {
        Self::Zstd { level: 9 }
    }

    /// Create ZSTD compression with maximum ratio (level 19)
    pub fn zstd_max() -> Self {
        Self::Zstd { level: 19 }
    }

    /// Get the byte identifier for this compression type
    /// (0 = none, 1 = LZ4, 2 = ZSTD), as stored in the on-disk page header.
    pub fn to_byte(&self) -> u8 {
        match *self {
            Self::None => 0,
            Self::Lz4 => 1,
            Self::Zstd { .. } => 2,
        }
    }

    /// Create compression type from byte identifier.
    /// `level` is only meaningful for ZSTD (byte 2); unrecognized bytes
    /// fall back to `None` rather than erroring.
    pub fn from_byte(byte: u8, level: i32) -> Self {
        match byte {
            1 => Self::Lz4,
            2 => Self::Zstd { level },
            _ => Self::None,
        }
    }
}
62
/// Trait for compression implementations.
///
/// Implementors must be `Send + Sync` so one instance can be shared
/// behind a `Box<dyn Compressor>` (see `create_compressor`).
pub trait Compressor: Send + Sync {
    /// Compress `data`, returning the compressed bytes.
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>>;

    /// Decompress `data`. `expected_size` is the original uncompressed
    /// length; the LZ4/ZSTD implementations use it only for error reporting.
    fn decompress(&self, data: &[u8], expected_size: usize) -> Result<Vec<u8>>;

    /// Get the compression type this implementation provides.
    fn compression_type(&self) -> CompressionType;
}
74
75/// No-op compressor (passthrough)
76pub struct NoCompressor;
77
78impl Compressor for NoCompressor {
79    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
80        Ok(data.to_vec())
81    }
82
83    fn decompress(&self, data: &[u8], _expected_size: usize) -> Result<Vec<u8>> {
84        Ok(data.to_vec())
85    }
86
87    fn compression_type(&self) -> CompressionType {
88        CompressionType::None
89    }
90}
91
92/// LZ4 compressor - optimized for speed
93pub struct Lz4Compressor;
94
95impl Compressor for Lz4Compressor {
96    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
97        Ok(lz4_flex::compress_prepend_size(data))
98    }
99
100    fn decompress(&self, data: &[u8], expected_size: usize) -> Result<Vec<u8>> {
101        lz4_flex::decompress_size_prepended(data).map_err(|e| Error::DecompressionError {
102            message: format!("LZ4 decompression failed: {}", e),
103            expected_size,
104        })
105    }
106
107    fn compression_type(&self) -> CompressionType {
108        CompressionType::Lz4
109    }
110}
111
/// ZSTD compressor - optimized for compression ratio.
pub struct ZstdCompressor {
    // Compression level; always kept within ZSTD's valid range (1-22).
    level: i32,
}

impl ZstdCompressor {
    /// Create a ZSTD compressor; out-of-range levels are clamped to 1-22.
    pub fn new(level: i32) -> Self {
        Self {
            level: level.clamp(1, 22),
        }
    }
}
124
125impl Compressor for ZstdCompressor {
126    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
127        zstd::encode_all(data, self.level).map_err(|e| Error::CompressionError {
128            message: format!("ZSTD compression failed: {}", e),
129        })
130    }
131
132    fn decompress(&self, data: &[u8], expected_size: usize) -> Result<Vec<u8>> {
133        zstd::decode_all(data).map_err(|e| Error::DecompressionError {
134            message: format!("ZSTD decompression failed: {}", e),
135            expected_size,
136        })
137    }
138
139    fn compression_type(&self) -> CompressionType {
140        CompressionType::Zstd { level: self.level }
141    }
142}
143
144/// Factory to create compressors based on type
145pub fn create_compressor(compression_type: CompressionType) -> Box<dyn Compressor> {
146    match compression_type {
147        CompressionType::None => Box::new(NoCompressor),
148        CompressionType::Lz4 => Box::new(Lz4Compressor),
149        CompressionType::Zstd { level } => Box::new(ZstdCompressor::new(level)),
150    }
151}
152
/// Statistics for compression operations.
///
/// All counters are atomic so a shared compressor can record from
/// multiple threads without locking; reads/writes use `Relaxed` ordering
/// (counters only, no cross-field consistency is required).
#[derive(Debug, Default)]
pub struct CompressionStats {
    /// Total bytes before compression
    pub bytes_before_compression: AtomicU64,
    /// Total bytes after compression
    pub bytes_after_compression: AtomicU64,
    /// Number of pages compressed
    pub pages_compressed: AtomicU64,
    /// Number of pages skipped (below threshold or incompressible)
    pub pages_skipped: AtomicU64,
    /// Number of pages decompressed
    pub pages_decompressed: AtomicU64,
}

impl CompressionStats {
    /// Create new (all-zero) compression stats.
    pub fn new() -> Self {
        Self::default()
    }

    /// Get the overall compression ratio (0.0-1.0, lower is better).
    /// Returns 1.0 if no compression has occurred.
    pub fn compression_ratio(&self) -> f64 {
        self.snapshot().compression_ratio()
    }

    /// Get the space savings percentage (0-100%, higher is better).
    pub fn space_savings_percent(&self) -> f64 {
        self.snapshot().space_savings_percent()
    }

    /// Get a point-in-time copy of the current statistics.
    pub fn snapshot(&self) -> CompressionStatsSnapshot {
        let read = |c: &AtomicU64| c.load(Ordering::Relaxed);
        CompressionStatsSnapshot {
            bytes_before_compression: read(&self.bytes_before_compression),
            bytes_after_compression: read(&self.bytes_after_compression),
            pages_compressed: read(&self.pages_compressed),
            pages_skipped: read(&self.pages_skipped),
            pages_decompressed: read(&self.pages_decompressed),
        }
    }

    /// Record a successful compression of one page.
    pub fn record_compression(&self, original_size: usize, compressed_size: usize) {
        self.bytes_before_compression
            .fetch_add(original_size as u64, Ordering::Relaxed);
        self.bytes_after_compression
            .fetch_add(compressed_size as u64, Ordering::Relaxed);
        self.pages_compressed.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a page stored uncompressed; `size` counts toward both the
    /// before and after totals so the ratio stays honest.
    pub fn record_skipped(&self, size: usize) {
        let bytes = size as u64;
        self.bytes_before_compression
            .fetch_add(bytes, Ordering::Relaxed);
        self.bytes_after_compression
            .fetch_add(bytes, Ordering::Relaxed);
        self.pages_skipped.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a decompression operation.
    pub fn record_decompression(&self) {
        self.pages_decompressed.fetch_add(1, Ordering::Relaxed);
    }

    /// Reset all statistics to zero.
    pub fn reset(&self) {
        for counter in [
            &self.bytes_before_compression,
            &self.bytes_after_compression,
            &self.pages_compressed,
            &self.pages_skipped,
            &self.pages_decompressed,
        ] {
            counter.store(0, Ordering::Relaxed);
        }
    }
}

/// Immutable snapshot of compression statistics.
#[derive(Debug, Clone, Copy)]
pub struct CompressionStatsSnapshot {
    pub bytes_before_compression: u64,
    pub bytes_after_compression: u64,
    pub pages_compressed: u64,
    pub pages_skipped: u64,
    pub pages_decompressed: u64,
}

impl CompressionStatsSnapshot {
    /// Get the overall compression ratio (0.0-1.0, lower is better).
    /// Returns 1.0 if no compression has occurred.
    pub fn compression_ratio(&self) -> f64 {
        match self.bytes_before_compression {
            0 => 1.0,
            before => self.bytes_after_compression as f64 / before as f64,
        }
    }

    /// Get the space savings percentage (0-100%, higher is better).
    pub fn space_savings_percent(&self) -> f64 {
        (1.0 - self.compression_ratio()) * 100.0
    }
}
262
263impl Clone for CompressionStats {
264    fn clone(&self) -> Self {
265        CompressionStats {
266            bytes_before_compression: AtomicU64::new(
267                self.bytes_before_compression.load(Ordering::Relaxed),
268            ),
269            bytes_after_compression: AtomicU64::new(
270                self.bytes_after_compression.load(Ordering::Relaxed),
271            ),
272            pages_compressed: AtomicU64::new(self.pages_compressed.load(Ordering::Relaxed)),
273            pages_skipped: AtomicU64::new(self.pages_skipped.load(Ordering::Relaxed)),
274            pages_decompressed: AtomicU64::new(self.pages_decompressed.load(Ordering::Relaxed)),
275        }
276    }
277}
278
/// Compressed page header stored at the beginning of compressed pages.
/// This is stored within the page data on disk; the 12-byte layout is
/// serialized little-endian by `to_bytes`/`from_bytes`.
#[derive(Debug, Clone, Copy)]
pub struct CompressedPageHeader {
    /// Magic byte to identify compressed pages (0xC0 = "CO" for compressed)
    pub magic: u8,
    /// Compression type (0=none, 1=lz4, 2=zstd) — matches `CompressionType::to_byte`
    pub compression_type: u8,
    /// Original uncompressed size (u32 to support up to 4GB pages)
    pub original_size: u32,
    /// Compressed data size (excluding this header)
    pub compressed_size: u32,
    /// ZSTD compression level (only meaningful for ZSTD)
    pub compression_level: u8,
    /// Reserved for future use (always written as 0)
    pub reserved: u8,
}
296
297impl CompressedPageHeader {
298    /// Magic byte for compressed pages
299    pub const MAGIC: u8 = 0xC0;
300
301    /// Header size in bytes
302    pub const SIZE: usize = 12;
303
304    /// Create a new compressed page header
305    pub fn new(
306        compression_type: CompressionType,
307        original_size: usize,
308        compressed_size: usize,
309    ) -> Self {
310        let (comp_type, level) = match compression_type {
311            CompressionType::None => (0, 0),
312            CompressionType::Lz4 => (1, 0),
313            CompressionType::Zstd { level } => (2, level as u8),
314        };
315
316        CompressedPageHeader {
317            magic: Self::MAGIC,
318            compression_type: comp_type,
319            original_size: original_size as u32,
320            compressed_size: compressed_size as u32,
321            compression_level: level,
322            reserved: 0,
323        }
324    }
325
326    /// Serialize header to bytes
327    pub fn to_bytes(&self) -> [u8; Self::SIZE] {
328        let mut buf = [0u8; Self::SIZE];
329        buf[0] = self.magic;
330        buf[1] = self.compression_type;
331        buf[2..6].copy_from_slice(&self.original_size.to_le_bytes());
332        buf[6..10].copy_from_slice(&self.compressed_size.to_le_bytes());
333        buf[10] = self.compression_level;
334        buf[11] = self.reserved;
335        buf
336    }
337
338    /// Deserialize header from bytes
339    pub fn from_bytes(data: &[u8]) -> Option<Self> {
340        if data.len() < Self::SIZE {
341            return None;
342        }
343
344        if data[0] != Self::MAGIC {
345            return None;
346        }
347
348        Some(CompressedPageHeader {
349            magic: data[0],
350            compression_type: data[1],
351            original_size: u32::from_le_bytes(data[2..6].try_into().ok()?),
352            compressed_size: u32::from_le_bytes(data[6..10].try_into().ok()?),
353            compression_level: data[10],
354            reserved: data[11],
355        })
356    }
357
358    /// Get the compression type enum
359    pub fn get_compression_type(&self) -> CompressionType {
360        CompressionType::from_byte(self.compression_type, self.compression_level as i32)
361    }
362
363    /// Check if this is a valid compressed page header
364    pub fn is_valid(&self) -> bool {
365        self.magic == Self::MAGIC && self.compression_type <= 2
366    }
367}
368
/// Page compression manager.
/// Handles compression/decompression of pages with configurable settings.
pub struct PageCompressor {
    /// The compressor implementation (selected at construction time)
    compressor: Box<dyn Compressor>,
    /// Minimum page size to compress (pages smaller than this are stored uncompressed)
    threshold: usize,
    /// Compression statistics, updated by the compress/decompress methods
    pub stats: CompressionStats,
}
379
380impl PageCompressor {
381    /// Default compression threshold (512 bytes)
382    pub const DEFAULT_THRESHOLD: usize = 512;
383
384    /// Create a new page compressor
385    pub fn new(compression_type: CompressionType, threshold: usize) -> Self {
386        PageCompressor {
387            compressor: create_compressor(compression_type),
388            threshold,
389            stats: CompressionStats::new(),
390        }
391    }
392
393    /// Create a page compressor with default settings (LZ4, 512 byte threshold)
394    pub fn default_lz4() -> Self {
395        Self::new(CompressionType::Lz4, Self::DEFAULT_THRESHOLD)
396    }
397
398    /// Create a page compressor with ZSTD and default settings
399    pub fn default_zstd() -> Self {
400        Self::new(CompressionType::zstd_default(), Self::DEFAULT_THRESHOLD)
401    }
402
403    /// Compress page data for writing to disk
404    /// Returns the compressed data with header, or original data if compression is not beneficial
405    pub fn compress_page(&self, data: &[u8]) -> Result<Vec<u8>> {
406        let compression_type = self.compressor.compression_type();
407
408        // Skip compression for small pages or if compression is disabled
409        if data.len() < self.threshold || matches!(compression_type, CompressionType::None) {
410            self.stats.record_skipped(data.len());
411            return Ok(data.to_vec());
412        }
413
414        // Compress the data
415        let compressed = self.compressor.compress(data)?;
416
417        // Check if compression is beneficial (at least 10% smaller)
418        // Also ensure compressed + header is smaller than original
419        let total_compressed_size = CompressedPageHeader::SIZE + compressed.len();
420        if total_compressed_size >= data.len() || compressed.len() >= (data.len() * 9 / 10) {
421            // Compression not beneficial, store uncompressed
422            self.stats.record_skipped(data.len());
423            return Ok(data.to_vec());
424        }
425
426        // Create header and prepend to compressed data
427        let header = CompressedPageHeader::new(compression_type, data.len(), compressed.len());
428        let mut result = Vec::with_capacity(total_compressed_size);
429        result.extend_from_slice(&header.to_bytes());
430        result.extend_from_slice(&compressed);
431
432        self.stats.record_compression(data.len(), result.len());
433        Ok(result)
434    }
435
436    /// Decompress page data read from disk
437    /// Automatically detects if the page is compressed and decompresses if needed
438    pub fn decompress_page(&self, data: &[u8], page_size: usize) -> Result<Vec<u8>> {
439        // Check if this is compressed data by looking for the magic byte
440        if let Some(header) = CompressedPageHeader::from_bytes(data) {
441            if header.is_valid() {
442                // Extract compressed data (skip header)
443                let compressed_data = &data[CompressedPageHeader::SIZE..];
444                if compressed_data.len() < header.compressed_size as usize {
445                    return Err(Error::DecompressionError {
446                        message: "Compressed data truncated".into(),
447                        expected_size: header.compressed_size as usize,
448                    });
449                }
450
451                let compressed_data = &compressed_data[..header.compressed_size as usize];
452
453                // Create appropriate decompressor
454                let decompressor = create_compressor(header.get_compression_type());
455                let decompressed =
456                    decompressor.decompress(compressed_data, header.original_size as usize)?;
457
458                // Verify size
459                if decompressed.len() != header.original_size as usize {
460                    return Err(Error::DecompressionError {
461                        message: format!(
462                            "Decompressed size mismatch: expected {}, got {}",
463                            header.original_size,
464                            decompressed.len()
465                        ),
466                        expected_size: header.original_size as usize,
467                    });
468                }
469
470                self.stats.record_decompression();
471                return Ok(decompressed);
472            }
473        }
474
475        // Not compressed, return as-is (padded to page size if needed)
476        if data.len() < page_size {
477            let mut result = data.to_vec();
478            result.resize(page_size, 0);
479            Ok(result)
480        } else {
481            Ok(data[..page_size].to_vec())
482        }
483    }
484
485    /// Get the compression type
486    pub fn compression_type(&self) -> CompressionType {
487        self.compressor.compression_type()
488    }
489
490    /// Get the compression threshold
491    pub fn threshold(&self) -> usize {
492        self.threshold
493    }
494
495    /// Get compression statistics
496    pub fn stats(&self) -> &CompressionStats {
497        &self.stats
498    }
499}
500
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_compression_type_byte_conversion() {
        // Enum -> on-disk byte.
        assert_eq!(CompressionType::None.to_byte(), 0);
        assert_eq!(CompressionType::Lz4.to_byte(), 1);
        assert_eq!(CompressionType::Zstd { level: 5 }.to_byte(), 2);

        // On-disk byte -> enum; the level argument only matters for ZSTD.
        assert_eq!(CompressionType::from_byte(0, 0), CompressionType::None);
        assert_eq!(CompressionType::from_byte(1, 0), CompressionType::Lz4);
        assert_eq!(
            CompressionType::from_byte(2, 5),
            CompressionType::Zstd { level: 5 }
        );
    }

    #[test]
    fn test_lz4_compression() {
        let codec = Lz4Compressor;
        let payload = b"Hello, World! This is a test of LZ4 compression. ".repeat(100);

        // Repetitive input must shrink.
        let packed = codec.compress(&payload).unwrap();
        assert!(packed.len() < payload.len());

        // Round trip restores the exact bytes.
        let restored = codec.decompress(&packed, payload.len()).unwrap();
        assert_eq!(restored, payload);
    }

    #[test]
    fn test_zstd_compression() {
        let codec = ZstdCompressor::new(3);
        let payload = b"Hello, World! This is a test of ZSTD compression. ".repeat(100);

        let packed = codec.compress(&payload).unwrap();
        assert!(packed.len() < payload.len());

        let restored = codec.decompress(&packed, payload.len()).unwrap();
        assert_eq!(restored, payload);
    }

    #[test]
    fn test_zstd_levels() {
        let payload = b"Test data for compression level comparison. ".repeat(100);

        // Compress at both ends of the level range.
        let fast = ZstdCompressor::new(1).compress(&payload).unwrap();
        let tight = ZstdCompressor::new(19).compress(&payload).unwrap();

        // Both outputs must decompress back to the original bytes.
        let dec_fast = ZstdCompressor::new(1)
            .decompress(&fast, payload.len())
            .unwrap();
        let dec_tight = ZstdCompressor::new(19)
            .decompress(&tight, payload.len())
            .unwrap();

        assert_eq!(dec_fast, payload.to_vec());
        assert_eq!(dec_tight, payload.to_vec());
    }

    #[test]
    fn test_compressed_page_header() {
        let original = CompressedPageHeader::new(CompressionType::Lz4, 4096, 2048);

        // Serialize, then parse the wire form back.
        let wire = original.to_bytes();
        let parsed = CompressedPageHeader::from_bytes(&wire).unwrap();

        assert_eq!(parsed.magic, CompressedPageHeader::MAGIC);
        assert_eq!(parsed.compression_type, 1); // LZ4
        assert_eq!(parsed.original_size, 4096);
        assert_eq!(parsed.compressed_size, 2048);
        assert!(parsed.is_valid());
    }

    #[test]
    fn test_page_compressor_small_page() {
        let pc = PageCompressor::new(CompressionType::Lz4, 512);
        let tiny = vec![0u8; 256]; // below the 512-byte threshold

        let stored = pc.compress_page(&tiny).unwrap();
        assert_eq!(stored, tiny); // stored verbatim
        assert_eq!(pc.stats.pages_skipped.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_page_compressor_compressible_data() {
        let pc = PageCompressor::new(CompressionType::Lz4, 512);
        let page = vec![0xABu8; 4096]; // a single repeated byte compresses very well

        let packed = pc.compress_page(&page).unwrap();
        assert!(packed.len() < page.len());

        // The output must start with a valid compression header.
        let header = CompressedPageHeader::from_bytes(&packed).unwrap();
        assert!(header.is_valid());
        assert_eq!(header.original_size, 4096);

        // And decompress back to the original page.
        let restored = pc.decompress_page(&packed, 4096).unwrap();
        assert_eq!(restored, page);
    }

    #[test]
    fn test_page_compressor_incompressible_data() {
        let pc = PageCompressor::new(CompressionType::Lz4, 512);

        // Pseudo-random bytes that do not compress well.
        let page: Vec<u8> = (0..4096).map(|i| (i * 17 + i / 3) as u8).collect();

        let stored = pc.compress_page(&page).unwrap();

        // Whether or not compression was applied, the round trip must hold.
        let restored = pc.decompress_page(&stored, 4096).unwrap();
        assert_eq!(restored, page);
    }

    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats::new();

        stats.record_compression(4096, 2048);
        stats.record_compression(4096, 1024);

        assert_eq!(stats.bytes_before_compression.load(Ordering::Relaxed), 8192);
        assert_eq!(stats.bytes_after_compression.load(Ordering::Relaxed), 3072);
        assert_eq!(stats.pages_compressed.load(Ordering::Relaxed), 2);

        // Ratio: 3072 / 8192 = 0.375.
        assert!((stats.compression_ratio() - 0.375).abs() < 0.001);

        // Savings: (1 - 0.375) * 100 = 62.5%.
        assert!((stats.space_savings_percent() - 62.5).abs() < 0.1);
    }

    #[test]
    fn test_no_compression() {
        let pc = PageCompressor::new(CompressionType::None, 512);
        let page = vec![0xABu8; 4096];

        // CompressionType::None always stores pages verbatim.
        let stored = pc.compress_page(&page).unwrap();
        assert_eq!(stored, page);

        let restored = pc.decompress_page(&stored, 4096).unwrap();
        assert_eq!(restored, page);
    }
}