1use crate::error::{CacheError, Result};
9use bytes::Bytes;
10use std::collections::HashMap;
11
/// Compression codec applied to a cached payload.
///
/// Serializable so the codec choice can be persisted alongside the data
/// (see `CompressedData`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum CompressionCodec {
    /// Store the payload verbatim, no compression.
    None,
    /// LZ4 block compression (speed-oriented).
    Lz4,
    /// Zstandard compression (level-tunable, ratio-oriented).
    Zstd,
    /// Snappy compression (speed-oriented).
    Snappy,
}
24
/// Abstract effort level for level-aware codecs (currently only zstd).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionLevel {
    /// Favor speed over ratio.
    Fast,
    /// Balanced speed/ratio trade-off.
    Default,
    /// Favor ratio over speed.
    Best,
}

impl CompressionLevel {
    /// Maps this abstract level onto a concrete zstd level (1 / 3 / 19).
    pub fn to_zstd_level(&self) -> i32 {
        match self {
            Self::Fast => 1,
            Self::Default => 3,
            Self::Best => 19,
        }
    }
}
46
47#[derive(Debug, Clone)]
49pub struct CompressionStats {
50 pub original_size: usize,
52 pub compressed_size: usize,
54 pub compression_time_us: u64,
56 pub decompression_time_us: u64,
58 pub codec: CompressionCodec,
60}
61
62impl CompressionStats {
63 pub fn compression_ratio(&self) -> f64 {
65 if self.compressed_size > 0 {
66 self.original_size as f64 / self.compressed_size as f64
67 } else {
68 1.0
69 }
70 }
71
72 pub fn compression_throughput_mbps(&self) -> f64 {
74 if self.compression_time_us > 0 {
75 let mb = self.original_size as f64 / (1024.0 * 1024.0);
76 let seconds = self.compression_time_us as f64 / 1_000_000.0;
77 mb / seconds
78 } else {
79 0.0
80 }
81 }
82
83 pub fn decompression_throughput_mbps(&self) -> f64 {
85 if self.decompression_time_us > 0 {
86 let mb = self.compressed_size as f64 / (1024.0 * 1024.0);
87 let seconds = self.decompression_time_us as f64 / 1_000_000.0;
88 mb / seconds
89 } else {
90 0.0
91 }
92 }
93
94 pub fn efficiency_score(&self) -> f64 {
96 self.compression_ratio() * self.compression_throughput_mbps()
97 }
98}
99
/// Coarse classification of payload content. Used as half of the key for
/// per-codec performance history and for the heuristic codec fallback.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataType {
    /// Arbitrary binary data.
    Binary,
    /// Textual data.
    Text,
    /// Image data.
    Image,
    /// Numerical/structured data.
    Numerical,
    /// Data that is already compressed (codec selection skips recompression).
    Compressed,
}
114
/// Compressor that records per-`(codec, data_type)` performance statistics
/// and uses them to pick the best codec for future payloads.
pub struct AdaptiveCompressor {
    /// Rolling window of stats per (codec, data type); capped at `max_history`.
    performance_history: HashMap<(CompressionCodec, DataType), Vec<CompressionStats>>,
    /// Level applied by level-aware codecs (currently only zstd).
    default_level: CompressionLevel,
    /// Payloads smaller than this many bytes are returned uncompressed.
    min_compress_size: usize,
    /// Maximum number of samples kept per history entry.
    max_history: usize,
}
126
127impl AdaptiveCompressor {
128 pub fn new() -> Self {
130 Self {
131 performance_history: HashMap::new(),
132 default_level: CompressionLevel::Default,
133 min_compress_size: 1024, max_history: 100,
135 }
136 }
137
138 pub fn with_level(mut self, level: CompressionLevel) -> Self {
140 self.default_level = level;
141 self
142 }
143
144 pub fn with_min_size(mut self, size: usize) -> Self {
146 self.min_compress_size = size;
147 self
148 }
149
150 pub fn compress(
152 &mut self,
153 data: &[u8],
154 codec: CompressionCodec,
155 data_type: DataType,
156 ) -> Result<Bytes> {
157 if data.len() < self.min_compress_size {
158 return Ok(Bytes::copy_from_slice(data));
159 }
160
161 let start = std::time::Instant::now();
162
163 let compressed = match codec {
164 CompressionCodec::None => Bytes::copy_from_slice(data),
165 CompressionCodec::Lz4 => self.compress_lz4(data)?,
166 CompressionCodec::Zstd => self.compress_zstd(data)?,
167 CompressionCodec::Snappy => self.compress_snappy(data)?,
168 };
169
170 let compression_time_us = start.elapsed().as_micros() as u64;
171
172 let stats = CompressionStats {
174 original_size: data.len(),
175 compressed_size: compressed.len(),
176 compression_time_us,
177 decompression_time_us: 0,
178 codec,
179 };
180
181 self.record_stats(data_type, stats);
182
183 Ok(compressed)
184 }
185
186 pub fn decompress(&mut self, data: &[u8], codec: CompressionCodec) -> Result<Bytes> {
188 let start = std::time::Instant::now();
189
190 let decompressed = match codec {
191 CompressionCodec::None => Bytes::copy_from_slice(data),
192 CompressionCodec::Lz4 => self.decompress_lz4(data)?,
193 CompressionCodec::Zstd => self.decompress_zstd(data)?,
194 CompressionCodec::Snappy => self.decompress_snappy(data)?,
195 };
196
197 let _decompression_time_us = start.elapsed().as_micros() as u64;
198
199 Ok(decompressed)
200 }
201
202 pub fn select_codec(&self, data_type: DataType) -> CompressionCodec {
204 let mut best_codec = CompressionCodec::Lz4; let mut best_score = 0.0;
207
208 for codec in &[
209 CompressionCodec::Lz4,
210 CompressionCodec::Zstd,
211 CompressionCodec::Snappy,
212 ] {
213 if let Some(stats_vec) = self.performance_history.get(&(*codec, data_type)) {
214 if !stats_vec.is_empty() {
215 let avg_score: f64 =
216 stats_vec.iter().map(|s| s.efficiency_score()).sum::<f64>()
217 / stats_vec.len() as f64;
218
219 if avg_score > best_score {
220 best_score = avg_score;
221 best_codec = *codec;
222 }
223 }
224 }
225 }
226
227 if best_score == 0.0 {
229 return self.heuristic_codec(data_type);
230 }
231
232 best_codec
233 }
234
235 fn heuristic_codec(&self, data_type: DataType) -> CompressionCodec {
237 match data_type {
238 DataType::Binary => CompressionCodec::Lz4,
239 DataType::Text => CompressionCodec::Zstd,
240 DataType::Image => CompressionCodec::Lz4,
241 DataType::Numerical => CompressionCodec::Zstd,
242 DataType::Compressed => CompressionCodec::None,
243 }
244 }
245
246 fn compress_lz4(&self, data: &[u8]) -> Result<Bytes> {
248 let compressed =
250 oxiarc_lz4::compress_block(data).map_err(|e| CacheError::Compression(e.to_string()))?;
251 let orig_size = data.len() as i32;
252 let mut result = Vec::with_capacity(4 + compressed.len());
253 result.extend_from_slice(&orig_size.to_le_bytes());
254 result.extend_from_slice(&compressed);
255 Ok(Bytes::from(result))
256 }
257
258 fn decompress_lz4(&self, data: &[u8]) -> Result<Bytes> {
260 if data.len() < 4 {
262 return Err(CacheError::Decompression("LZ4 data too short".to_string()));
263 }
264 let orig_size = i32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
265 let decompressed = oxiarc_lz4::decompress_block(&data[4..], orig_size)
266 .map_err(|e| CacheError::Decompression(e.to_string()))?;
267 Ok(Bytes::from(decompressed))
268 }
269
270 fn compress_zstd(&self, data: &[u8]) -> Result<Bytes> {
272 let level = self.default_level.to_zstd_level();
273 oxiarc_zstd::encode_all(data, level)
274 .map(Bytes::from)
275 .map_err(|e| CacheError::Compression(e.to_string()))
276 }
277
278 fn decompress_zstd(&self, data: &[u8]) -> Result<Bytes> {
280 oxiarc_zstd::decode_all(data)
281 .map(Bytes::from)
282 .map_err(|e| CacheError::Decompression(e.to_string()))
283 }
284
285 fn compress_snappy(&self, data: &[u8]) -> Result<Bytes> {
287 Ok(Bytes::from(oxiarc_snappy::compress(data)))
288 }
289
290 fn decompress_snappy(&self, data: &[u8]) -> Result<Bytes> {
292 oxiarc_snappy::decompress(data)
293 .map(Bytes::from)
294 .map_err(|e| CacheError::Decompression(e.to_string()))
295 }
296
297 fn record_stats(&mut self, data_type: DataType, stats: CompressionStats) {
299 let key = (stats.codec, data_type);
300 let history = self.performance_history.entry(key).or_default();
301
302 history.push(stats);
303
304 if history.len() > self.max_history {
306 history.remove(0);
307 }
308 }
309
310 pub fn avg_compression_ratio(
312 &self,
313 codec: CompressionCodec,
314 data_type: DataType,
315 ) -> Option<f64> {
316 self.performance_history
317 .get(&(codec, data_type))
318 .and_then(|stats_vec| {
319 if stats_vec.is_empty() {
320 None
321 } else {
322 let avg = stats_vec.iter().map(|s| s.compression_ratio()).sum::<f64>()
323 / stats_vec.len() as f64;
324 Some(avg)
325 }
326 })
327 }
328
329 pub fn get_performance_stats(
331 &self,
332 ) -> HashMap<(CompressionCodec, DataType), PerformanceMetrics> {
333 let mut result = HashMap::new();
334
335 for (key, stats_vec) in &self.performance_history {
336 if stats_vec.is_empty() {
337 continue;
338 }
339
340 let avg_ratio = stats_vec.iter().map(|s| s.compression_ratio()).sum::<f64>()
341 / stats_vec.len() as f64;
342
343 let avg_comp_throughput = stats_vec
344 .iter()
345 .map(|s| s.compression_throughput_mbps())
346 .sum::<f64>()
347 / stats_vec.len() as f64;
348
349 let avg_decomp_throughput = stats_vec
350 .iter()
351 .filter(|s| s.decompression_time_us > 0)
352 .map(|s| s.decompression_throughput_mbps())
353 .sum::<f64>()
354 / stats_vec.len() as f64;
355
356 result.insert(
357 *key,
358 PerformanceMetrics {
359 avg_compression_ratio: avg_ratio,
360 avg_compression_throughput_mbps: avg_comp_throughput,
361 avg_decompression_throughput_mbps: avg_decomp_throughput,
362 sample_count: stats_vec.len(),
363 },
364 );
365 }
366
367 result
368 }
369
370 pub fn clear_history(&mut self) {
372 self.performance_history.clear();
373 }
374}
375
376impl Default for AdaptiveCompressor {
377 fn default() -> Self {
378 Self::new()
379 }
380}
381
/// Aggregated averages produced by `AdaptiveCompressor::get_performance_stats`
/// for one `(codec, data_type)` pair.
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Mean `original_size / compressed_size` across the sampled operations.
    pub avg_compression_ratio: f64,
    /// Mean compression throughput in MB/s.
    pub avg_compression_throughput_mbps: f64,
    /// Mean decompression throughput in MB/s (0.0 when never measured).
    pub avg_decompression_throughput_mbps: f64,
    /// Number of history samples the averages were computed from.
    pub sample_count: usize,
}
394
/// A compressed payload bundled with the metadata required to restore it.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompressedData {
    /// Compressed bytes (serialized via the local `serde_bytes` shim below).
    #[serde(with = "serde_bytes")]
    pub data: Vec<u8>,
    /// Codec that produced `data`.
    pub codec: CompressionCodec,
    /// Payload size in bytes before compression; verified on decompress.
    pub original_size: usize,
}
406
407impl CompressedData {
408 pub fn new(data: Vec<u8>, codec: CompressionCodec, original_size: usize) -> Self {
410 Self {
411 data,
412 codec,
413 original_size,
414 }
415 }
416
417 pub fn decompress(&self, compressor: &mut AdaptiveCompressor) -> Result<Bytes> {
419 let decompressed = compressor.decompress(&self.data, self.codec)?;
420
421 if decompressed.len() != self.original_size {
423 return Err(CacheError::Decompression(format!(
424 "Size mismatch: expected {}, got {}",
425 self.original_size,
426 decompressed.len()
427 )));
428 }
429
430 Ok(decompressed)
431 }
432
433 pub fn compressed_size(&self) -> usize {
435 self.data.len()
436 }
437
438 pub fn compression_ratio(&self) -> f64 {
440 if !self.data.is_empty() {
441 self.original_size as f64 / self.data.len() as f64
442 } else {
443 1.0
444 }
445 }
446}
447
/// Serialization shim referenced by `#[serde(with = "serde_bytes")]` on
/// `CompressedData::data`.
///
/// NOTE(review): these functions simply delegate to `Vec<u8>`'s own
/// `Serialize`/`Deserialize` impls, so the attribute currently changes
/// nothing versus plain derive. Presumably this is a placeholder for the
/// external `serde_bytes` crate's compact byte encoding — confirm intent
/// before relying on the wire format.
mod serde_bytes {
    use serde::{Deserialize, Deserializer, Serialize, Serializer};

    // Pass-through serialization of the byte vector.
    pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> std::result::Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        bytes.serialize(serializer)
    }

    // Pass-through deserialization of the byte vector.
    pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result<Vec<u8>, D::Error>
    where
        D: Deserializer<'de>,
    {
        Vec::<u8>::deserialize(deserializer)
    }
}
465
#[cfg(test)]
mod tests {
    use super::*;

    /// LZ4 round-trip: repetitive input must shrink and restore exactly.
    #[test]
    fn test_lz4_compression() {
        let mut compressor = AdaptiveCompressor::new();
        let data = b"Hello, World! ".repeat(100);

        let compressed = compressor
            .compress(&data, CompressionCodec::Lz4, DataType::Text)
            .expect("compression failed");

        assert!(compressed.len() < data.len());

        let decompressed = compressor
            .decompress(&compressed, CompressionCodec::Lz4)
            .expect("decompression failed");

        assert_eq!(decompressed.as_ref(), &data[..]);
    }

    /// Zstd round-trip: repetitive input must shrink and restore exactly.
    #[test]
    fn test_zstd_compression() {
        let mut compressor = AdaptiveCompressor::new();
        let data = b"Test data for compression ".repeat(50);

        let compressed = compressor
            .compress(&data, CompressionCodec::Zstd, DataType::Text)
            .expect("compression failed");

        assert!(compressed.len() < data.len());

        let decompressed = compressor
            .decompress(&compressed, CompressionCodec::Zstd)
            .expect("decompression failed");

        assert_eq!(decompressed.as_ref(), &data[..]);
    }

    /// Snappy round-trip: repetitive input must shrink and restore exactly.
    #[test]
    fn test_snappy_compression() {
        let mut compressor = AdaptiveCompressor::new();
        let data = b"Snappy compression test ".repeat(50);

        let compressed = compressor
            .compress(&data, CompressionCodec::Snappy, DataType::Binary)
            .expect("compression failed");

        assert!(compressed.len() < data.len());

        let decompressed = compressor
            .decompress(&compressed, CompressionCodec::Snappy)
            .expect("decompression failed");

        assert_eq!(decompressed.as_ref(), &data[..]);
    }

    /// With no history, selection must fall back to the static heuristics.
    #[test]
    fn test_codec_selection() {
        let compressor = AdaptiveCompressor::new();

        assert_eq!(
            compressor.select_codec(DataType::Text),
            CompressionCodec::Zstd
        );
        assert_eq!(
            compressor.select_codec(DataType::Binary),
            CompressionCodec::Lz4
        );
        assert_eq!(
            compressor.select_codec(DataType::Compressed),
            CompressionCodec::None
        );
    }

    /// Payloads below the minimum size must pass through unchanged.
    #[test]
    fn test_min_compress_size() {
        let mut compressor = AdaptiveCompressor::new().with_min_size(1000);
        let small_data = b"small";

        let result = compressor
            .compress(small_data, CompressionCodec::Lz4, DataType::Binary)
            .expect("compression failed");

        // Stronger than a length check: the bytes must be identical.
        assert_eq!(result.as_ref(), small_data);
    }

    /// Stats arithmetic: exact ratio and positive throughputs.
    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats {
            original_size: 1000,
            compressed_size: 500,
            compression_time_us: 1000,
            decompression_time_us: 500,
            codec: CompressionCodec::Lz4,
        };

        // 1000 / 500 == 2.0 exactly in f64; a tolerance assert avoids the
        // `approx` dev-dependency the previous version pulled in.
        assert!((stats.compression_ratio() - 2.0).abs() < 1e-9);
        assert!(stats.compression_throughput_mbps() > 0.0);
        assert!(stats.decompression_throughput_mbps() > 0.0);
    }

    /// CompressedData round-trip with ratio and size verification.
    #[test]
    fn test_compressed_data() {
        let mut compressor = AdaptiveCompressor::new();
        let original = b"Test data for compression ratio validation! ".repeat(100);

        let compressed_bytes = compressor
            .compress(&original, CompressionCodec::Zstd, DataType::Binary)
            .expect("compression failed");

        let compressed_data = CompressedData::new(
            compressed_bytes.to_vec(),
            CompressionCodec::Zstd,
            original.len(),
        );

        assert!(compressed_data.compression_ratio() > 1.0);

        let decompressed = compressed_data
            .decompress(&mut compressor)
            .expect("decompression failed");

        assert_eq!(decompressed.as_ref(), &original[..]);
    }
}