1use crate::error::{CacheError, Result};
9use bytes::Bytes;
10use std::collections::HashMap;
11
/// Compression codecs supported by the cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum CompressionCodec {
    /// No compression; payload is stored verbatim.
    None,
    /// LZ4 block format (see `compress_lz4` / `decompress_lz4`).
    Lz4,
    /// Zstandard; the only codec that honours `CompressionLevel`.
    Zstd,
    /// Snappy raw format.
    Snappy,
}
24
/// Compression effort level, mapped to codec-specific tuning parameters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionLevel {
    /// Prioritize speed over compression ratio.
    Fast,
    /// Balanced speed/ratio trade-off.
    Default,
    /// Prioritize compression ratio over speed.
    Best,
}

impl CompressionLevel {
    /// Maps this level to a zstd compression level
    /// (1 = fastest, 3 = zstd's default, 19 = near-maximum).
    pub fn to_zstd_level(&self) -> i32 {
        match self {
            CompressionLevel::Fast => 1,
            CompressionLevel::Default => 3,
            CompressionLevel::Best => 19,
        }
    }
}

impl Default for CompressionLevel {
    /// Mirrors the `CompressionLevel::Default` variant, matching the level
    /// used by `AdaptiveCompressor::new`.
    fn default() -> Self {
        CompressionLevel::Default
    }
}
46
/// Measurements taken from a single compression run.
#[derive(Debug, Clone)]
pub struct CompressionStats {
    /// Payload size before compression, in bytes.
    pub original_size: usize,
    /// Payload size after compression, in bytes.
    pub compressed_size: usize,
    /// Wall-clock compression time, in microseconds.
    pub compression_time_us: u64,
    /// Wall-clock decompression time, in microseconds (0 when not measured).
    pub decompression_time_us: u64,
    /// Codec that produced these measurements.
    pub codec: CompressionCodec,
}
61
62impl CompressionStats {
63 pub fn compression_ratio(&self) -> f64 {
65 if self.compressed_size > 0 {
66 self.original_size as f64 / self.compressed_size as f64
67 } else {
68 1.0
69 }
70 }
71
72 pub fn compression_throughput_mbps(&self) -> f64 {
74 if self.compression_time_us > 0 {
75 let mb = self.original_size as f64 / (1024.0 * 1024.0);
76 let seconds = self.compression_time_us as f64 / 1_000_000.0;
77 mb / seconds
78 } else {
79 0.0
80 }
81 }
82
83 pub fn decompression_throughput_mbps(&self) -> f64 {
85 if self.decompression_time_us > 0 {
86 let mb = self.compressed_size as f64 / (1024.0 * 1024.0);
87 let seconds = self.decompression_time_us as f64 / 1_000_000.0;
88 mb / seconds
89 } else {
90 0.0
91 }
92 }
93
94 pub fn efficiency_score(&self) -> f64 {
96 self.compression_ratio() * self.compression_throughput_mbps()
97 }
98}
99
/// Coarse classification of payload content, used to bucket performance
/// statistics and to drive the heuristic codec choice in `heuristic_codec`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataType {
    /// Arbitrary binary data.
    Binary,
    /// Human-readable text.
    Text,
    /// Image payloads.
    Image,
    /// Numeric/structured data.
    Numerical,
    /// Data that is already compressed; the heuristic selects `None` for it.
    Compressed,
}
114
/// Compressor that records per-(codec, data type) performance statistics and
/// uses them to pick the best-performing codec for future payloads.
pub struct AdaptiveCompressor {
    // Rolling samples per (codec, data type) pair, capped at `max_history`.
    performance_history: HashMap<(CompressionCodec, DataType), Vec<CompressionStats>>,
    // Level forwarded to zstd; LZ4/Snappy paths do not read it.
    default_level: CompressionLevel,
    // Payloads smaller than this many bytes are returned uncompressed.
    min_compress_size: usize,
    // Maximum number of samples retained per history bucket.
    max_history: usize,
}
126
127impl AdaptiveCompressor {
128 pub fn new() -> Self {
130 Self {
131 performance_history: HashMap::new(),
132 default_level: CompressionLevel::Default,
133 min_compress_size: 1024, max_history: 100,
135 }
136 }
137
138 pub fn with_level(mut self, level: CompressionLevel) -> Self {
140 self.default_level = level;
141 self
142 }
143
144 pub fn with_min_size(mut self, size: usize) -> Self {
146 self.min_compress_size = size;
147 self
148 }
149
150 pub fn compress(
152 &mut self,
153 data: &[u8],
154 codec: CompressionCodec,
155 data_type: DataType,
156 ) -> Result<Bytes> {
157 if data.len() < self.min_compress_size {
158 return Ok(Bytes::copy_from_slice(data));
159 }
160
161 let start = std::time::Instant::now();
162
163 let compressed = match codec {
164 CompressionCodec::None => Bytes::copy_from_slice(data),
165 CompressionCodec::Lz4 => self.compress_lz4(data)?,
166 CompressionCodec::Zstd => self.compress_zstd(data)?,
167 CompressionCodec::Snappy => self.compress_snappy(data)?,
168 };
169
170 let compression_time_us = start.elapsed().as_micros() as u64;
171
172 let stats = CompressionStats {
174 original_size: data.len(),
175 compressed_size: compressed.len(),
176 compression_time_us,
177 decompression_time_us: 0,
178 codec,
179 };
180
181 self.record_stats(data_type, stats);
182
183 Ok(compressed)
184 }
185
186 pub fn decompress(&mut self, data: &[u8], codec: CompressionCodec) -> Result<Bytes> {
188 let start = std::time::Instant::now();
189
190 let decompressed = match codec {
191 CompressionCodec::None => Bytes::copy_from_slice(data),
192 CompressionCodec::Lz4 => self.decompress_lz4(data)?,
193 CompressionCodec::Zstd => self.decompress_zstd(data)?,
194 CompressionCodec::Snappy => self.decompress_snappy(data)?,
195 };
196
197 let _decompression_time_us = start.elapsed().as_micros() as u64;
198
199 Ok(decompressed)
200 }
201
202 pub fn select_codec(&self, data_type: DataType) -> CompressionCodec {
204 let mut best_codec = CompressionCodec::Lz4; let mut best_score = 0.0;
207
208 for codec in &[
209 CompressionCodec::Lz4,
210 CompressionCodec::Zstd,
211 CompressionCodec::Snappy,
212 ] {
213 if let Some(stats_vec) = self.performance_history.get(&(*codec, data_type)) {
214 if !stats_vec.is_empty() {
215 let avg_score: f64 =
216 stats_vec.iter().map(|s| s.efficiency_score()).sum::<f64>()
217 / stats_vec.len() as f64;
218
219 if avg_score > best_score {
220 best_score = avg_score;
221 best_codec = *codec;
222 }
223 }
224 }
225 }
226
227 if best_score == 0.0 {
229 return self.heuristic_codec(data_type);
230 }
231
232 best_codec
233 }
234
235 fn heuristic_codec(&self, data_type: DataType) -> CompressionCodec {
237 match data_type {
238 DataType::Binary => CompressionCodec::Lz4,
239 DataType::Text => CompressionCodec::Zstd,
240 DataType::Image => CompressionCodec::Lz4,
241 DataType::Numerical => CompressionCodec::Zstd,
242 DataType::Compressed => CompressionCodec::None,
243 }
244 }
245
246 fn compress_lz4(&self, data: &[u8]) -> Result<Bytes> {
248 lz4::block::compress(data, None, true)
250 .map(Bytes::from)
251 .map_err(|e| CacheError::Compression(e.to_string()))
252 }
253
254 fn decompress_lz4(&self, data: &[u8]) -> Result<Bytes> {
256 lz4::block::decompress(data, None)
257 .map(Bytes::from)
258 .map_err(|e| CacheError::Decompression(e.to_string()))
259 }
260
261 fn compress_zstd(&self, data: &[u8]) -> Result<Bytes> {
263 let level = self.default_level.to_zstd_level();
264 oxiarc_zstd::encode_all(data, level)
265 .map(Bytes::from)
266 .map_err(|e| CacheError::Compression(e.to_string()))
267 }
268
269 fn decompress_zstd(&self, data: &[u8]) -> Result<Bytes> {
271 oxiarc_zstd::decode_all(data)
272 .map(Bytes::from)
273 .map_err(|e| CacheError::Decompression(e.to_string()))
274 }
275
276 fn compress_snappy(&self, data: &[u8]) -> Result<Bytes> {
278 let mut encoder = snap::raw::Encoder::new();
279 encoder
280 .compress_vec(data)
281 .map(Bytes::from)
282 .map_err(|e| CacheError::Compression(e.to_string()))
283 }
284
285 fn decompress_snappy(&self, data: &[u8]) -> Result<Bytes> {
287 let mut decoder = snap::raw::Decoder::new();
288 decoder
289 .decompress_vec(data)
290 .map(Bytes::from)
291 .map_err(|e| CacheError::Decompression(e.to_string()))
292 }
293
294 fn record_stats(&mut self, data_type: DataType, stats: CompressionStats) {
296 let key = (stats.codec, data_type);
297 let history = self.performance_history.entry(key).or_default();
298
299 history.push(stats);
300
301 if history.len() > self.max_history {
303 history.remove(0);
304 }
305 }
306
307 pub fn avg_compression_ratio(
309 &self,
310 codec: CompressionCodec,
311 data_type: DataType,
312 ) -> Option<f64> {
313 self.performance_history
314 .get(&(codec, data_type))
315 .and_then(|stats_vec| {
316 if stats_vec.is_empty() {
317 None
318 } else {
319 let avg = stats_vec.iter().map(|s| s.compression_ratio()).sum::<f64>()
320 / stats_vec.len() as f64;
321 Some(avg)
322 }
323 })
324 }
325
326 pub fn get_performance_stats(
328 &self,
329 ) -> HashMap<(CompressionCodec, DataType), PerformanceMetrics> {
330 let mut result = HashMap::new();
331
332 for (key, stats_vec) in &self.performance_history {
333 if stats_vec.is_empty() {
334 continue;
335 }
336
337 let avg_ratio = stats_vec.iter().map(|s| s.compression_ratio()).sum::<f64>()
338 / stats_vec.len() as f64;
339
340 let avg_comp_throughput = stats_vec
341 .iter()
342 .map(|s| s.compression_throughput_mbps())
343 .sum::<f64>()
344 / stats_vec.len() as f64;
345
346 let avg_decomp_throughput = stats_vec
347 .iter()
348 .filter(|s| s.decompression_time_us > 0)
349 .map(|s| s.decompression_throughput_mbps())
350 .sum::<f64>()
351 / stats_vec.len() as f64;
352
353 result.insert(
354 *key,
355 PerformanceMetrics {
356 avg_compression_ratio: avg_ratio,
357 avg_compression_throughput_mbps: avg_comp_throughput,
358 avg_decompression_throughput_mbps: avg_decomp_throughput,
359 sample_count: stats_vec.len(),
360 },
361 );
362 }
363
364 result
365 }
366
367 pub fn clear_history(&mut self) {
369 self.performance_history.clear();
370 }
371}
372
373impl Default for AdaptiveCompressor {
374 fn default() -> Self {
375 Self::new()
376 }
377}
378
/// Aggregated per-(codec, data type) statistics produced by
/// `AdaptiveCompressor::get_performance_stats`.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Mean original/compressed size ratio across the samples.
    pub avg_compression_ratio: f64,
    /// Mean compression throughput in MiB/s.
    pub avg_compression_throughput_mbps: f64,
    /// Mean decompression throughput in MiB/s (0.0 when no timing recorded).
    pub avg_decompression_throughput_mbps: f64,
    /// Number of samples the averages were computed from.
    pub sample_count: usize,
}
391
/// Serializable envelope pairing a compressed payload with the codec that
/// produced it and the original size, used for an integrity check on
/// decompression.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompressedData {
    /// Compressed bytes, (de)serialized via the local `serde_bytes` module.
    #[serde(with = "serde_bytes")]
    pub data: Vec<u8>,
    /// Codec that produced `data`.
    pub codec: CompressionCodec,
    /// Payload size before compression, in bytes.
    pub original_size: usize,
}
403
404impl CompressedData {
405 pub fn new(data: Vec<u8>, codec: CompressionCodec, original_size: usize) -> Self {
407 Self {
408 data,
409 codec,
410 original_size,
411 }
412 }
413
414 pub fn decompress(&self, compressor: &mut AdaptiveCompressor) -> Result<Bytes> {
416 let decompressed = compressor.decompress(&self.data, self.codec)?;
417
418 if decompressed.len() != self.original_size {
420 return Err(CacheError::Decompression(format!(
421 "Size mismatch: expected {}, got {}",
422 self.original_size,
423 decompressed.len()
424 )));
425 }
426
427 Ok(decompressed)
428 }
429
430 pub fn compressed_size(&self) -> usize {
432 self.data.len()
433 }
434
435 pub fn compression_ratio(&self) -> f64 {
437 if !self.data.is_empty() {
438 self.original_size as f64 / self.data.len() as f64
439 } else {
440 1.0
441 }
442 }
443}
444
445mod serde_bytes {
446 use serde::{Deserialize, Deserializer, Serialize, Serializer};
447
448 pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> std::result::Result<S::Ok, S::Error>
449 where
450 S: Serializer,
451 {
452 bytes.serialize(serializer)
453 }
454
455 pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
456 where
457 D: Deserializer<'de>,
458 {
459 Vec::<u8>::deserialize(deserializer)
460 }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// Compresses `payload` with `codec`, checks it shrank, then round-trips
    /// it back and checks the bytes are restored exactly.
    fn assert_round_trip(payload: &[u8], codec: CompressionCodec, data_type: DataType) {
        let mut compressor = AdaptiveCompressor::new();

        let packed = compressor
            .compress(payload, codec, data_type)
            .expect("compression failed");
        assert!(packed.len() < payload.len());

        let unpacked = compressor
            .decompress(&packed, codec)
            .expect("decompression failed");
        assert_eq!(unpacked.as_ref(), payload);
    }

    #[test]
    fn test_lz4_compression() {
        assert_round_trip(
            &b"Hello, World! ".repeat(100),
            CompressionCodec::Lz4,
            DataType::Text,
        );
    }

    #[test]
    fn test_zstd_compression() {
        assert_round_trip(
            &b"Test data for compression ".repeat(50),
            CompressionCodec::Zstd,
            DataType::Text,
        );
    }

    #[test]
    fn test_snappy_compression() {
        assert_round_trip(
            &b"Snappy compression test ".repeat(50),
            CompressionCodec::Snappy,
            DataType::Binary,
        );
    }

    #[test]
    fn test_codec_selection() {
        // With no recorded history, selection falls back to the heuristics.
        let fresh = AdaptiveCompressor::new();

        assert_eq!(fresh.select_codec(DataType::Text), CompressionCodec::Zstd);
        assert_eq!(fresh.select_codec(DataType::Binary), CompressionCodec::Lz4);
        assert_eq!(
            fresh.select_codec(DataType::Compressed),
            CompressionCodec::None
        );
    }

    #[test]
    fn test_min_compress_size() {
        let mut compressor = AdaptiveCompressor::new().with_min_size(1000);
        let tiny = b"small";

        let stored = compressor
            .compress(tiny, CompressionCodec::Lz4, DataType::Binary)
            .expect("compression failed");

        // Below the threshold, the payload is passed through unchanged.
        assert_eq!(stored.len(), tiny.len());
    }

    #[test]
    fn test_compression_stats() {
        let sample = CompressionStats {
            original_size: 1000,
            compressed_size: 500,
            compression_time_us: 1000,
            decompression_time_us: 500,
            codec: CompressionCodec::Lz4,
        };

        approx::assert_relative_eq!(sample.compression_ratio(), 2.0, epsilon = 0.01);
        assert!(sample.compression_throughput_mbps() > 0.0);
        assert!(sample.decompression_throughput_mbps() > 0.0);
    }

    #[test]
    fn test_compressed_data() {
        let mut compressor = AdaptiveCompressor::new();
        let source = b"Test data for compression ratio validation! ".repeat(100);

        let packed = compressor
            .compress(&source, CompressionCodec::Zstd, DataType::Binary)
            .expect("compression failed");

        let wrapped = CompressedData::new(packed.to_vec(), CompressionCodec::Zstd, source.len());
        assert!(wrapped.compression_ratio() > 1.0);

        let restored = wrapped
            .decompress(&mut compressor)
            .expect("decompression failed");
        assert_eq!(restored.as_ref(), &source[..]);
    }
}