1use featherdb_core::{Error, Result};
11use std::sync::atomic::{AtomicU64, Ordering};
12
/// Compression algorithm applied to a page payload.
///
/// Only the algorithm is captured by [`CompressionType::to_byte`]; the zstd
/// level has to be carried separately.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionType {
    /// No compression; this is the `Default` variant.
    #[default]
    None,
    /// LZ4 compression.
    Lz4,
    /// Zstandard compression at `level`.
    Zstd { level: i32 },
}

impl CompressionType {
    /// Zstd at level 3.
    pub fn zstd_default() -> Self {
        Self::Zstd { level: 3 }
    }

    /// Zstd at level 9.
    pub fn zstd_high() -> Self {
        Self::Zstd { level: 9 }
    }

    /// Zstd at level 19.
    pub fn zstd_max() -> Self {
        Self::Zstd { level: 19 }
    }

    /// Encodes the algorithm as a tag byte: 0 = none, 1 = LZ4, 2 = zstd.
    ///
    /// The zstd level is not part of the tag.
    pub fn to_byte(&self) -> u8 {
        match *self {
            Self::None => 0,
            Self::Lz4 => 1,
            Self::Zstd { level: _ } => 2,
        }
    }

    /// Decodes a tag byte produced by [`to_byte`](Self::to_byte).
    ///
    /// `level` is only used for tag 2. Any unrecognised tag decodes as
    /// [`CompressionType::None`].
    pub fn from_byte(byte: u8, level: i32) -> Self {
        if byte == 1 {
            Self::Lz4
        } else if byte == 2 {
            Self::Zstd { level }
        } else {
            Self::None
        }
    }
}
62
63pub trait Compressor: Send + Sync {
65 fn compress(&self, data: &[u8]) -> Result<Vec<u8>>;
67
68 fn decompress(&self, data: &[u8], expected_size: usize) -> Result<Vec<u8>>;
70
71 fn compression_type(&self) -> CompressionType;
73}
74
75pub struct NoCompressor;
77
78impl Compressor for NoCompressor {
79 fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
80 Ok(data.to_vec())
81 }
82
83 fn decompress(&self, data: &[u8], _expected_size: usize) -> Result<Vec<u8>> {
84 Ok(data.to_vec())
85 }
86
87 fn compression_type(&self) -> CompressionType {
88 CompressionType::None
89 }
90}
91
92pub struct Lz4Compressor;
94
95impl Compressor for Lz4Compressor {
96 fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
97 Ok(lz4_flex::compress_prepend_size(data))
98 }
99
100 fn decompress(&self, data: &[u8], expected_size: usize) -> Result<Vec<u8>> {
101 lz4_flex::decompress_size_prepended(data).map_err(|e| Error::DecompressionError {
102 message: format!("LZ4 decompression failed: {}", e),
103 expected_size,
104 })
105 }
106
107 fn compression_type(&self) -> CompressionType {
108 CompressionType::Lz4
109 }
110}
111
112pub struct ZstdCompressor {
114 level: i32,
115}
116
117impl ZstdCompressor {
118 pub fn new(level: i32) -> Self {
119 let level = level.clamp(1, 22);
121 ZstdCompressor { level }
122 }
123}
124
125impl Compressor for ZstdCompressor {
126 fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
127 zstd::encode_all(data, self.level).map_err(|e| Error::CompressionError {
128 message: format!("ZSTD compression failed: {}", e),
129 })
130 }
131
132 fn decompress(&self, data: &[u8], expected_size: usize) -> Result<Vec<u8>> {
133 zstd::decode_all(data).map_err(|e| Error::DecompressionError {
134 message: format!("ZSTD decompression failed: {}", e),
135 expected_size,
136 })
137 }
138
139 fn compression_type(&self) -> CompressionType {
140 CompressionType::Zstd { level: self.level }
141 }
142}
143
144pub fn create_compressor(compression_type: CompressionType) -> Box<dyn Compressor> {
146 match compression_type {
147 CompressionType::None => Box::new(NoCompressor),
148 CompressionType::Lz4 => Box::new(Lz4Compressor),
149 CompressionType::Zstd { level } => Box::new(ZstdCompressor::new(level)),
150 }
151}
152
153#[derive(Debug, Default)]
155pub struct CompressionStats {
156 pub bytes_before_compression: AtomicU64,
158 pub bytes_after_compression: AtomicU64,
160 pub pages_compressed: AtomicU64,
162 pub pages_skipped: AtomicU64,
164 pub pages_decompressed: AtomicU64,
166}
167
168impl CompressionStats {
169 pub fn new() -> Self {
171 Self::default()
172 }
173
174 pub fn compression_ratio(&self) -> f64 {
177 let before = self.bytes_before_compression.load(Ordering::Relaxed);
178 let after = self.bytes_after_compression.load(Ordering::Relaxed);
179
180 if before == 0 {
181 1.0
182 } else {
183 after as f64 / before as f64
184 }
185 }
186
187 pub fn space_savings_percent(&self) -> f64 {
189 (1.0 - self.compression_ratio()) * 100.0
190 }
191
192 pub fn snapshot(&self) -> CompressionStatsSnapshot {
194 CompressionStatsSnapshot {
195 bytes_before_compression: self.bytes_before_compression.load(Ordering::Relaxed),
196 bytes_after_compression: self.bytes_after_compression.load(Ordering::Relaxed),
197 pages_compressed: self.pages_compressed.load(Ordering::Relaxed),
198 pages_skipped: self.pages_skipped.load(Ordering::Relaxed),
199 pages_decompressed: self.pages_decompressed.load(Ordering::Relaxed),
200 }
201 }
202
203 pub fn record_compression(&self, original_size: usize, compressed_size: usize) {
205 self.bytes_before_compression
206 .fetch_add(original_size as u64, Ordering::Relaxed);
207 self.bytes_after_compression
208 .fetch_add(compressed_size as u64, Ordering::Relaxed);
209 self.pages_compressed.fetch_add(1, Ordering::Relaxed);
210 }
211
212 pub fn record_skipped(&self, size: usize) {
214 self.bytes_before_compression
215 .fetch_add(size as u64, Ordering::Relaxed);
216 self.bytes_after_compression
217 .fetch_add(size as u64, Ordering::Relaxed);
218 self.pages_skipped.fetch_add(1, Ordering::Relaxed);
219 }
220
221 pub fn record_decompression(&self) {
223 self.pages_decompressed.fetch_add(1, Ordering::Relaxed);
224 }
225
226 pub fn reset(&self) {
228 self.bytes_before_compression.store(0, Ordering::Relaxed);
229 self.bytes_after_compression.store(0, Ordering::Relaxed);
230 self.pages_compressed.store(0, Ordering::Relaxed);
231 self.pages_skipped.store(0, Ordering::Relaxed);
232 self.pages_decompressed.store(0, Ordering::Relaxed);
233 }
234}
235
/// Plain-value copy of the compression counters.
///
/// Produced by `CompressionStats::snapshot`, which reads the counters one by
/// one, so the fields are not guaranteed to come from a single instant.
#[derive(Debug, Clone, Copy)]
pub struct CompressionStatsSnapshot {
    pub bytes_before_compression: u64,
    pub bytes_after_compression: u64,
    pub pages_compressed: u64,
    pub pages_skipped: u64,
    pub pages_decompressed: u64,
}

impl CompressionStatsSnapshot {
    /// Bytes-after divided by bytes-before; 1.0 when bytes-before is zero.
    pub fn compression_ratio(&self) -> f64 {
        match self.bytes_before_compression {
            0 => 1.0,
            before => self.bytes_after_compression as f64 / before as f64,
        }
    }

    /// Percentage of bytes saved, `(1 - ratio) * 100`.
    pub fn space_savings_percent(&self) -> f64 {
        let ratio = self.compression_ratio();
        (1.0 - ratio) * 100.0
    }
}
262
263impl Clone for CompressionStats {
264 fn clone(&self) -> Self {
265 CompressionStats {
266 bytes_before_compression: AtomicU64::new(
267 self.bytes_before_compression.load(Ordering::Relaxed),
268 ),
269 bytes_after_compression: AtomicU64::new(
270 self.bytes_after_compression.load(Ordering::Relaxed),
271 ),
272 pages_compressed: AtomicU64::new(self.pages_compressed.load(Ordering::Relaxed)),
273 pages_skipped: AtomicU64::new(self.pages_skipped.load(Ordering::Relaxed)),
274 pages_decompressed: AtomicU64::new(self.pages_decompressed.load(Ordering::Relaxed)),
275 }
276 }
277}
278
279#[derive(Debug, Clone, Copy)]
282pub struct CompressedPageHeader {
283 pub magic: u8,
285 pub compression_type: u8,
287 pub original_size: u32,
289 pub compressed_size: u32,
291 pub compression_level: u8,
293 pub reserved: u8,
295}
296
297impl CompressedPageHeader {
298 pub const MAGIC: u8 = 0xC0;
300
301 pub const SIZE: usize = 12;
303
304 pub fn new(
306 compression_type: CompressionType,
307 original_size: usize,
308 compressed_size: usize,
309 ) -> Self {
310 let (comp_type, level) = match compression_type {
311 CompressionType::None => (0, 0),
312 CompressionType::Lz4 => (1, 0),
313 CompressionType::Zstd { level } => (2, level as u8),
314 };
315
316 CompressedPageHeader {
317 magic: Self::MAGIC,
318 compression_type: comp_type,
319 original_size: original_size as u32,
320 compressed_size: compressed_size as u32,
321 compression_level: level,
322 reserved: 0,
323 }
324 }
325
326 pub fn to_bytes(&self) -> [u8; Self::SIZE] {
328 let mut buf = [0u8; Self::SIZE];
329 buf[0] = self.magic;
330 buf[1] = self.compression_type;
331 buf[2..6].copy_from_slice(&self.original_size.to_le_bytes());
332 buf[6..10].copy_from_slice(&self.compressed_size.to_le_bytes());
333 buf[10] = self.compression_level;
334 buf[11] = self.reserved;
335 buf
336 }
337
338 pub fn from_bytes(data: &[u8]) -> Option<Self> {
340 if data.len() < Self::SIZE {
341 return None;
342 }
343
344 if data[0] != Self::MAGIC {
345 return None;
346 }
347
348 Some(CompressedPageHeader {
349 magic: data[0],
350 compression_type: data[1],
351 original_size: u32::from_le_bytes(data[2..6].try_into().ok()?),
352 compressed_size: u32::from_le_bytes(data[6..10].try_into().ok()?),
353 compression_level: data[10],
354 reserved: data[11],
355 })
356 }
357
358 pub fn get_compression_type(&self) -> CompressionType {
360 CompressionType::from_byte(self.compression_type, self.compression_level as i32)
361 }
362
363 pub fn is_valid(&self) -> bool {
365 self.magic == Self::MAGIC && self.compression_type <= 2
366 }
367}
368
369pub struct PageCompressor {
372 compressor: Box<dyn Compressor>,
374 threshold: usize,
376 pub stats: CompressionStats,
378}
379
380impl PageCompressor {
381 pub const DEFAULT_THRESHOLD: usize = 512;
383
384 pub fn new(compression_type: CompressionType, threshold: usize) -> Self {
386 PageCompressor {
387 compressor: create_compressor(compression_type),
388 threshold,
389 stats: CompressionStats::new(),
390 }
391 }
392
393 pub fn default_lz4() -> Self {
395 Self::new(CompressionType::Lz4, Self::DEFAULT_THRESHOLD)
396 }
397
398 pub fn default_zstd() -> Self {
400 Self::new(CompressionType::zstd_default(), Self::DEFAULT_THRESHOLD)
401 }
402
403 pub fn compress_page(&self, data: &[u8]) -> Result<Vec<u8>> {
406 let compression_type = self.compressor.compression_type();
407
408 if data.len() < self.threshold || matches!(compression_type, CompressionType::None) {
410 self.stats.record_skipped(data.len());
411 return Ok(data.to_vec());
412 }
413
414 let compressed = self.compressor.compress(data)?;
416
417 let total_compressed_size = CompressedPageHeader::SIZE + compressed.len();
420 if total_compressed_size >= data.len() || compressed.len() >= (data.len() * 9 / 10) {
421 self.stats.record_skipped(data.len());
423 return Ok(data.to_vec());
424 }
425
426 let header = CompressedPageHeader::new(compression_type, data.len(), compressed.len());
428 let mut result = Vec::with_capacity(total_compressed_size);
429 result.extend_from_slice(&header.to_bytes());
430 result.extend_from_slice(&compressed);
431
432 self.stats.record_compression(data.len(), result.len());
433 Ok(result)
434 }
435
436 pub fn decompress_page(&self, data: &[u8], page_size: usize) -> Result<Vec<u8>> {
439 if let Some(header) = CompressedPageHeader::from_bytes(data) {
441 if header.is_valid() {
442 let compressed_data = &data[CompressedPageHeader::SIZE..];
444 if compressed_data.len() < header.compressed_size as usize {
445 return Err(Error::DecompressionError {
446 message: "Compressed data truncated".into(),
447 expected_size: header.compressed_size as usize,
448 });
449 }
450
451 let compressed_data = &compressed_data[..header.compressed_size as usize];
452
453 let decompressor = create_compressor(header.get_compression_type());
455 let decompressed =
456 decompressor.decompress(compressed_data, header.original_size as usize)?;
457
458 if decompressed.len() != header.original_size as usize {
460 return Err(Error::DecompressionError {
461 message: format!(
462 "Decompressed size mismatch: expected {}, got {}",
463 header.original_size,
464 decompressed.len()
465 ),
466 expected_size: header.original_size as usize,
467 });
468 }
469
470 self.stats.record_decompression();
471 return Ok(decompressed);
472 }
473 }
474
475 if data.len() < page_size {
477 let mut result = data.to_vec();
478 result.resize(page_size, 0);
479 Ok(result)
480 } else {
481 Ok(data[..page_size].to_vec())
482 }
483 }
484
485 pub fn compression_type(&self) -> CompressionType {
487 self.compressor.compression_type()
488 }
489
490 pub fn threshold(&self) -> usize {
492 self.threshold
493 }
494
495 pub fn stats(&self) -> &CompressionStats {
497 &self.stats
498 }
499}
500
#[cfg(test)]
mod tests {
    use super::*;

    /// Compresses `input` with `codec` and checks that the output got smaller.
    /// Then checks that decompressing reproduces `input` exactly.
    fn assert_codec_roundtrip(codec: &dyn Compressor, input: &[u8]) {
        let packed = codec.compress(input).unwrap();
        assert!(packed.len() < input.len());

        let unpacked = codec.decompress(&packed, input.len()).unwrap();
        assert_eq!(unpacked, input);
    }

    #[test]
    fn test_compression_type_byte_conversion() {
        // (variant, expected tag byte, level passed to from_byte)
        let cases = [
            (CompressionType::None, 0u8, 0),
            (CompressionType::Lz4, 1u8, 0),
            (CompressionType::Zstd { level: 5 }, 2u8, 5),
        ];

        for (kind, tag, level) in cases {
            assert_eq!(kind.to_byte(), tag);
            assert_eq!(CompressionType::from_byte(tag, level), kind);
        }
    }

    #[test]
    fn test_lz4_compression() {
        let input = b"Hello, World! This is a test of LZ4 compression. ".repeat(100);
        assert_codec_roundtrip(&Lz4Compressor, &input);
    }

    #[test]
    fn test_zstd_compression() {
        let input = b"Hello, World! This is a test of ZSTD compression. ".repeat(100);
        assert_codec_roundtrip(&ZstdCompressor::new(3), &input);
    }

    #[test]
    fn test_zstd_levels() {
        let input = b"Test data for compression level comparison. ".repeat(100);

        for level in [1, 19] {
            let packed = ZstdCompressor::new(level).compress(&input).unwrap();
            let unpacked = ZstdCompressor::new(level)
                .decompress(&packed, input.len())
                .unwrap();
            assert_eq!(unpacked, input.to_vec());
        }
    }

    #[test]
    fn test_compressed_page_header() {
        let original = CompressedPageHeader::new(CompressionType::Lz4, 4096, 2048);
        let parsed = CompressedPageHeader::from_bytes(&original.to_bytes()).unwrap();

        assert_eq!(parsed.magic, CompressedPageHeader::MAGIC);
        assert_eq!(parsed.compression_type, 1);
        assert_eq!(parsed.original_size, 4096);
        assert_eq!(parsed.compressed_size, 2048);
        assert!(parsed.is_valid());
    }

    #[test]
    fn test_page_compressor_small_page() {
        let pc = PageCompressor::new(CompressionType::Lz4, 512);
        let tiny = vec![0u8; 256];

        let stored = pc.compress_page(&tiny).unwrap();

        assert_eq!(stored, tiny);
        assert_eq!(pc.stats.pages_skipped.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_page_compressor_compressible_data() {
        let pc = PageCompressor::new(CompressionType::Lz4, 512);
        let page = vec![0xABu8; 4096];

        let stored = pc.compress_page(&page).unwrap();
        assert!(stored.len() < page.len());

        let header = CompressedPageHeader::from_bytes(&stored).unwrap();
        assert!(header.is_valid());
        assert_eq!(header.original_size, 4096);

        let restored = pc.decompress_page(&stored, 4096).unwrap();
        assert_eq!(restored, page);
    }

    #[test]
    fn test_page_compressor_incompressible_data() {
        let pc = PageCompressor::new(CompressionType::Lz4, 512);
        let page: Vec<u8> = (0..4096).map(|i| (i * 17 + i / 3) as u8).collect();

        let stored = pc.compress_page(&page).unwrap();
        let restored = pc.decompress_page(&stored, 4096).unwrap();

        assert_eq!(restored, page);
    }

    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats::new();
        stats.record_compression(4096, 2048);
        stats.record_compression(4096, 1024);

        let load = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        assert_eq!(load(&stats.bytes_before_compression), 8192);
        assert_eq!(load(&stats.bytes_after_compression), 3072);
        assert_eq!(load(&stats.pages_compressed), 2);

        assert!((stats.compression_ratio() - 0.375).abs() < 0.001);
        assert!((stats.space_savings_percent() - 62.5).abs() < 0.1);
    }

    #[test]
    fn test_no_compression() {
        let pc = PageCompressor::new(CompressionType::None, 512);
        let page = vec![0xABu8; 4096];

        let stored = pc.compress_page(&page).unwrap();
        assert_eq!(stored, page);

        let restored = pc.decompress_page(&stored, 4096).unwrap();
        assert_eq!(restored, page);
    }
}