1use std::io;
18
19use super::bitpack::{BitPackedInts, DeltaBitPacked};
20use super::bitvec::BitVector;
21use super::runlength::{RunLengthAnalyzer, RunLengthEncoding};
22
/// Identifies the encoding applied to a block of values.
///
/// Variants that carry a `bits` field record the bit width that was selected
/// for the packed representation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompressionCodec {
    /// Values are stored without any encoding.
    None,

    /// Delta encoding.
    Delta,

    /// Fixed-width bit packing.
    BitPacked {
        /// Bit width chosen for the packed values.
        bits: u8,
    },

    /// Delta encoding combined with bit packing.
    DeltaBitPacked {
        /// Bit width chosen for the packed deltas.
        bits: u8,
    },

    /// Dictionary encoding.
    Dictionary,

    /// Bit-vector encoding.
    BitVector,

    /// Run-length encoding.
    RunLength,
}

impl CompressionCodec {
    /// Returns the variant name as a static string.
    ///
    /// Any `bits` payload is ignored, so `BitPacked { bits: 4 }` and
    /// `BitPacked { bits: 9 }` both yield `"BitPacked"`.
    #[must_use]
    pub fn name(&self) -> &'static str {
        match *self {
            CompressionCodec::None => "None",
            CompressionCodec::Delta => "Delta",
            CompressionCodec::BitPacked { .. } => "BitPacked",
            CompressionCodec::DeltaBitPacked { .. } => "DeltaBitPacked",
            CompressionCodec::Dictionary => "Dictionary",
            CompressionCodec::BitVector => "BitVector",
            CompressionCodec::RunLength => "RunLength",
        }
    }

    /// Reports whether the codec is lossless.
    ///
    /// Every variant currently answers `true`. The variants are listed
    /// explicitly so that a newly added codec has to state its answer here.
    #[must_use]
    pub fn is_lossless(&self) -> bool {
        match *self {
            CompressionCodec::None
            | CompressionCodec::Delta
            | CompressionCodec::BitPacked { .. }
            | CompressionCodec::DeltaBitPacked { .. }
            | CompressionCodec::Dictionary
            | CompressionCodec::BitVector
            | CompressionCodec::RunLength => true,
        }
    }
}
76
77#[derive(Debug, Clone)]
82pub struct CompressedData {
83 pub codec: CompressionCodec,
85 pub uncompressed_size: usize,
87 pub data: Vec<u8>,
89 pub metadata: CompressionMetadata,
91}
92
93#[derive(Debug, Clone)]
95pub enum CompressionMetadata {
96 None,
98 Delta {
100 base: i64,
102 },
103 BitPacked {
105 count: usize,
107 },
108 DeltaBitPacked {
110 base: i64,
112 count: usize,
114 },
115 Dictionary {
117 dict_id: u32,
119 },
120 RunLength {
122 run_count: usize,
124 },
125}
126
127impl CompressedData {
128 pub fn uncompressed(data: Vec<u8>) -> Self {
130 let size = data.len();
131 Self {
132 codec: CompressionCodec::None,
133 uncompressed_size: size,
134 data,
135 metadata: CompressionMetadata::None,
136 }
137 }
138
139 #[must_use]
141 pub fn compression_ratio(&self) -> f64 {
142 if self.data.is_empty() {
143 return 1.0;
144 }
145 self.uncompressed_size as f64 / self.data.len() as f64
146 }
147
148 #[must_use]
150 pub fn is_compressed(&self) -> bool {
151 !matches!(self.codec, CompressionCodec::None)
152 }
153}
154
155pub struct CodecSelector;
161
162impl CodecSelector {
163 #[must_use]
171 pub fn select_for_integers(values: &[u64]) -> CompressionCodec {
172 if values.is_empty() {
173 return CompressionCodec::None;
174 }
175
176 if values.len() < 8 {
177 return CompressionCodec::None;
179 }
180
181 let rle_ratio = RunLengthAnalyzer::estimate_ratio(values);
183 let avg_run_length = RunLengthAnalyzer::average_run_length(values);
184
185 if avg_run_length > 2.0 && rle_ratio > 1.5 {
187 return CompressionCodec::RunLength;
188 }
189
190 let is_sorted = values.windows(2).all(|w| w[0] <= w[1]);
192
193 if is_sorted {
194 let deltas: Vec<u64> = values.windows(2).map(|w| w[1] - w[0]).collect();
196 let max_delta = deltas.iter().copied().max().unwrap_or(0);
197 let bits_needed = BitPackedInts::bits_needed(max_delta);
198
199 let delta_ratio = 64.0 / bits_needed as f64;
201
202 if rle_ratio > delta_ratio && rle_ratio > 1.0 {
204 return CompressionCodec::RunLength;
205 }
206
207 return CompressionCodec::DeltaBitPacked { bits: bits_needed };
208 }
209
210 let max_value = values.iter().copied().max().unwrap_or(0);
212 let bits_needed = BitPackedInts::bits_needed(max_value);
213
214 let bitpack_ratio = if bits_needed > 0 {
216 64.0 / bits_needed as f64
217 } else {
218 1.0
219 };
220
221 if rle_ratio > bitpack_ratio && rle_ratio > 1.0 {
223 return CompressionCodec::RunLength;
224 }
225
226 if bits_needed < 32 {
227 CompressionCodec::BitPacked { bits: bits_needed }
228 } else {
229 CompressionCodec::None
230 }
231 }
232
233 #[must_use]
235 pub fn select_for_strings(values: &[&str]) -> CompressionCodec {
236 if values.is_empty() || values.len() < 4 {
237 return CompressionCodec::None;
238 }
239
240 let unique: std::collections::HashSet<_> = values.iter().collect();
242 let cardinality_ratio = unique.len() as f64 / values.len() as f64;
243
244 if cardinality_ratio < 0.5 {
246 CompressionCodec::Dictionary
247 } else {
248 CompressionCodec::None
249 }
250 }
251
252 #[must_use]
254 pub fn select_for_booleans(_values: &[bool]) -> CompressionCodec {
255 CompressionCodec::BitVector
257 }
258}
259
260pub struct TypeSpecificCompressor;
265
266impl TypeSpecificCompressor {
267 pub fn compress_integers(values: &[u64]) -> CompressedData {
269 let codec = CodecSelector::select_for_integers(values);
270
271 match codec {
272 CompressionCodec::None => {
273 let mut data = Vec::with_capacity(values.len() * 8);
274 for &v in values {
275 data.extend_from_slice(&v.to_le_bytes());
276 }
277 CompressedData {
278 codec,
279 uncompressed_size: values.len() * 8,
280 data,
281 metadata: CompressionMetadata::None,
282 }
283 }
284 CompressionCodec::DeltaBitPacked { bits } => {
285 let encoded = DeltaBitPacked::encode(values);
286 CompressedData {
287 codec: CompressionCodec::DeltaBitPacked { bits },
288 uncompressed_size: values.len() * 8,
289 data: encoded.to_bytes(),
290 metadata: CompressionMetadata::DeltaBitPacked {
291 base: encoded.base() as i64,
292 count: values.len(),
293 },
294 }
295 }
296 CompressionCodec::BitPacked { bits } => {
297 let packed = BitPackedInts::pack(values);
298 CompressedData {
299 codec: CompressionCodec::BitPacked { bits },
300 uncompressed_size: values.len() * 8,
301 data: packed.to_bytes(),
302 metadata: CompressionMetadata::BitPacked {
303 count: values.len(),
304 },
305 }
306 }
307 CompressionCodec::RunLength => {
308 let encoded = RunLengthEncoding::encode(values);
309 CompressedData {
310 codec: CompressionCodec::RunLength,
311 uncompressed_size: values.len() * 8,
312 data: encoded.to_bytes(),
313 metadata: CompressionMetadata::RunLength {
314 run_count: encoded.run_count(),
315 },
316 }
317 }
318 _ => unreachable!("Unexpected codec for integers"),
319 }
320 }
321
322 pub fn compress_signed_integers(values: &[i64]) -> CompressedData {
324 let zigzag: Vec<u64> = values
326 .iter()
327 .map(|&v| super::delta::zigzag_encode(v))
328 .collect();
329 Self::compress_integers(&zigzag)
330 }
331
332 pub fn compress_booleans(values: &[bool]) -> CompressedData {
334 let bitvec = BitVector::from_bools(values);
335 CompressedData {
336 codec: CompressionCodec::BitVector,
337 uncompressed_size: values.len(),
338 data: bitvec.to_bytes(),
339 metadata: CompressionMetadata::BitPacked {
340 count: values.len(),
341 },
342 }
343 }
344
345 pub fn decompress_integers(data: &CompressedData) -> io::Result<Vec<u64>> {
347 match data.codec {
348 CompressionCodec::None => {
349 let mut values = Vec::with_capacity(data.data.len() / 8);
350 for chunk in data.data.chunks_exact(8) {
351 values.push(u64::from_le_bytes(chunk.try_into().unwrap()));
352 }
353 Ok(values)
354 }
355 CompressionCodec::DeltaBitPacked { .. } => {
356 let encoded = DeltaBitPacked::from_bytes(&data.data)?;
357 Ok(encoded.decode())
358 }
359 CompressionCodec::BitPacked { .. } => {
360 let packed = BitPackedInts::from_bytes(&data.data)?;
361 Ok(packed.unpack())
362 }
363 CompressionCodec::RunLength => {
364 let encoded = RunLengthEncoding::from_bytes(&data.data)?;
365 Ok(encoded.decode())
366 }
367 _ => Err(io::Error::new(
368 io::ErrorKind::InvalidData,
369 "Invalid codec for integer decompression",
370 )),
371 }
372 }
373
374 pub fn decompress_booleans(data: &CompressedData) -> io::Result<Vec<bool>> {
376 match data.codec {
377 CompressionCodec::BitVector => {
378 let bitvec = BitVector::from_bytes(&data.data)?;
379 Ok(bitvec.to_bools())
380 }
381 _ => Err(io::Error::new(
382 io::ErrorKind::InvalidData,
383 "Invalid codec for boolean decompression",
384 )),
385 }
386 }
387}
388
#[cfg(test)]
mod tests {
    use super::*;

    /// Decompresses `compressed` as integers and checks that it reproduces
    /// `expected` exactly.
    fn assert_integer_roundtrip(expected: &[u64], compressed: &CompressedData) {
        let restored = TypeSpecificCompressor::decompress_integers(compressed).unwrap();
        assert_eq!(expected, restored.as_slice());
    }

    /// Builds `run_len` copies of 1, then of 2, then of 3.
    fn three_runs(run_len: usize) -> Vec<u64> {
        [1u64, 2, 3]
            .iter()
            .flat_map(|&v| std::iter::repeat(v).take(run_len))
            .collect()
    }

    // Strictly increasing input should be delta + bit-packed.
    #[test]
    fn test_codec_selection_sorted_integers() {
        let ascending: Vec<u64> = (0..100).collect();
        let chosen = CodecSelector::select_for_integers(&ascending);
        assert!(matches!(chosen, CompressionCodec::DeltaBitPacked { .. }));
    }

    // Small unsorted values should be bit-packed.
    #[test]
    fn test_codec_selection_small_integers() {
        let shuffled: Vec<u64> = vec![1, 5, 3, 7, 2, 4, 6, 8];
        let chosen = CodecSelector::select_for_integers(&shuffled);
        assert!(matches!(chosen, CompressionCodec::BitPacked { .. }));
    }

    // Low-cardinality strings get a dictionary; all-distinct strings do not.
    #[test]
    fn test_codec_selection_strings() {
        let low_cardinality = vec!["a", "b", "a", "a", "b", "a", "c", "a"];
        assert_eq!(
            CodecSelector::select_for_strings(&low_cardinality),
            CompressionCodec::Dictionary
        );

        let all_distinct = vec!["a", "b", "c", "d", "e", "f", "g", "h"];
        assert_eq!(
            CodecSelector::select_for_strings(&all_distinct),
            CompressionCodec::None
        );
    }

    #[test]
    fn test_codec_selection_booleans() {
        let flags = vec![true, false, true];
        assert_eq!(
            CodecSelector::select_for_booleans(&flags),
            CompressionCodec::BitVector
        );
    }

    #[test]
    fn test_compress_decompress_sorted_integers() {
        let input: Vec<u64> = (100..200).collect();
        let packed = TypeSpecificCompressor::compress_integers(&input);

        assert!(matches!(
            packed.codec,
            CompressionCodec::DeltaBitPacked { .. }
        ));
        assert!(packed.compression_ratio() > 1.0);

        assert_integer_roundtrip(&input, &packed);
    }

    #[test]
    fn test_compress_decompress_small_integers() {
        let input: Vec<u64> = vec![5, 2, 7, 1, 9, 3, 8, 4, 6, 0];
        let packed = TypeSpecificCompressor::compress_integers(&input);

        assert_integer_roundtrip(&input, &packed);
    }

    #[test]
    fn test_compress_decompress_booleans() {
        let input = vec![true, false, true, true, false, false, true, false];
        let packed = TypeSpecificCompressor::compress_booleans(&input);

        assert_eq!(packed.codec, CompressionCodec::BitVector);

        let restored = TypeSpecificCompressor::decompress_booleans(&packed).unwrap();
        assert_eq!(input, restored);
    }

    #[test]
    fn test_compression_ratio() {
        let input: Vec<u64> = (1000..1100).collect();
        let packed = TypeSpecificCompressor::compress_integers(&input);

        let ratio = packed.compression_ratio();
        assert!(ratio > 5.0, "Expected ratio > 5, got {}", ratio);
    }

    #[test]
    fn test_codec_names() {
        assert_eq!(CompressionCodec::None.name(), "None");
        assert_eq!(CompressionCodec::Delta.name(), "Delta");
        assert_eq!(CompressionCodec::BitPacked { bits: 4 }.name(), "BitPacked");
        assert_eq!(
            CompressionCodec::DeltaBitPacked { bits: 4 }.name(),
            "DeltaBitPacked"
        );
        assert_eq!(CompressionCodec::Dictionary.name(), "Dictionary");
        assert_eq!(CompressionCodec::BitVector.name(), "BitVector");
        assert_eq!(CompressionCodec::RunLength.name(), "RunLength");
    }

    // Constant input and long runs of a few values should both pick RLE.
    #[test]
    fn test_codec_selection_repetitive_integers() {
        let constant: Vec<u64> = vec![1; 100];
        assert_eq!(
            CodecSelector::select_for_integers(&constant),
            CompressionCodec::RunLength
        );

        let runs = three_runs(30);
        assert_eq!(
            CodecSelector::select_for_integers(&runs),
            CompressionCodec::RunLength
        );
    }

    #[test]
    fn test_compress_decompress_runlength() {
        let input: Vec<u64> = vec![42; 1000];
        let packed = TypeSpecificCompressor::compress_integers(&input);

        assert_eq!(packed.codec, CompressionCodec::RunLength);
        assert!(
            packed.compression_ratio() > 50.0,
            "Expected ratio > 50, got {}",
            packed.compression_ratio()
        );

        assert_integer_roundtrip(&input, &packed);
    }

    #[test]
    fn test_compress_decompress_mixed_runs() {
        let input = three_runs(100);
        let packed = TypeSpecificCompressor::compress_integers(&input);

        assert_eq!(packed.codec, CompressionCodec::RunLength);
        assert!(packed.compression_ratio() > 10.0);

        assert_integer_roundtrip(&input, &packed);
    }

    // A strictly increasing sequence prefers delta; a constant one prefers RLE.
    #[test]
    fn test_runlength_vs_delta_selection() {
        let ascending: Vec<u64> = (0..100).collect();
        assert!(matches!(
            CodecSelector::select_for_integers(&ascending),
            CompressionCodec::DeltaBitPacked { .. }
        ));

        let constant: Vec<u64> = vec![100; 100];
        assert_eq!(
            CodecSelector::select_for_integers(&constant),
            CompressionCodec::RunLength
        );
    }
}