1use std::io;
18
19use super::bitpack::{BitPackedInts, DeltaBitPacked};
20use super::bitvec::BitVector;
21use super::runlength::{RunLengthAnalyzer, RunLengthEncoding};
22
/// Identifies which encoding was applied to a block of values.
///
/// The enum is `#[non_exhaustive]`, so matches written outside this crate
/// must carry a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CompressionCodec {
    /// No compression; the payload holds the raw values.
    None,

    /// Delta encoding.
    // NOTE(review): no code in this file selects or produces this codec.
    Delta,

    /// Fixed-width bit packing.
    BitPacked {
        /// Bit width chosen for the values (the selector fills this from
        /// `BitPackedInts::bits_needed` of the largest value).
        bits: u8,
    },

    /// Delta encoding followed by bit packing.
    DeltaBitPacked {
        /// Bit width chosen for the deltas (the selector fills this from
        /// `BitPackedInts::bits_needed` of the largest adjacent difference).
        bits: u8,
    },

    /// Dictionary encoding; selected for string columns with many repeats.
    Dictionary,

    /// Bit-vector encoding; used for boolean columns.
    BitVector,

    /// Run-length encoding; selected for integer columns with long runs.
    RunLength,
}

impl CompressionCodec {
    /// Returns the variant's name as a static string.
    ///
    /// Any `bits` payload is ignored: `BitPacked { bits: 4 }` and
    /// `BitPacked { bits: 9 }` both yield `"BitPacked"`.
    #[must_use]
    pub fn name(&self) -> &'static str {
        match *self {
            Self::BitPacked { .. } => "BitPacked",
            Self::DeltaBitPacked { .. } => "DeltaBitPacked",
            Self::None => "None",
            Self::Delta => "Delta",
            Self::Dictionary => "Dictionary",
            Self::BitVector => "BitVector",
            Self::RunLength => "RunLength",
        }
    }

    /// Reports whether decoding reproduces the input exactly.
    ///
    /// This returns `true` unconditionally, for every variant.
    #[must_use]
    pub fn is_lossless(&self) -> bool {
        true
    }
}
77
/// An encoded payload together with the information needed to interpret it.
#[derive(Debug, Clone)]
pub struct CompressedData {
    /// Codec that produced `data`.
    pub codec: CompressionCodec,
    /// Size recorded for the original input. The compressors in this file
    /// store `len * 8` for `u64` input, `len` for `bool` input, and the byte
    /// length for data wrapped without compression.
    pub uncompressed_size: usize,
    /// The encoded bytes.
    pub data: Vec<u8>,
    /// Codec-specific side information recorded at compression time.
    pub metadata: CompressionMetadata,
}
93
/// Codec-specific side information stored next to a compressed payload.
///
/// The enum is `#[non_exhaustive]`, so matches written outside this crate
/// must carry a wildcard arm.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum CompressionMetadata {
    /// No side information.
    None,
    /// Side information for delta encoding.
    Delta {
        // NOTE(review): presumably the starting value the deltas are applied
        // to; nothing in this file constructs this variant — confirm.
        base: i64,
    },
    /// Side information for bit-packed data. The boolean compressor in this
    /// file also uses this variant to carry the element count.
    BitPacked {
        /// Number of values that were encoded.
        count: usize,
    },
    /// Side information for delta + bit-packed data.
    DeltaBitPacked {
        /// The encoder's reported base, cast to `i64` by the integer compressor.
        base: i64,
        /// Number of values that were encoded.
        count: usize,
    },
    /// Side information for dictionary encoding.
    Dictionary {
        // NOTE(review): presumably identifies the dictionary to decode with;
        // nothing in this file constructs this variant — confirm.
        dict_id: u32,
    },
    /// Side information for run-length encoding.
    RunLength {
        /// Number of runs reported by the run-length encoder.
        run_count: usize,
    },
}
128
129impl CompressedData {
130 pub fn uncompressed(data: Vec<u8>) -> Self {
132 let size = data.len();
133 Self {
134 codec: CompressionCodec::None,
135 uncompressed_size: size,
136 data,
137 metadata: CompressionMetadata::None,
138 }
139 }
140
141 #[must_use]
143 pub fn compression_ratio(&self) -> f64 {
144 if self.data.is_empty() {
145 return 1.0;
146 }
147 self.uncompressed_size as f64 / self.data.len() as f64
148 }
149
150 #[must_use]
152 pub fn is_compressed(&self) -> bool {
153 !matches!(self.codec, CompressionCodec::None)
154 }
155}
156
157pub struct CodecSelector;
163
164impl CodecSelector {
165 #[must_use]
173 pub fn select_for_integers(values: &[u64]) -> CompressionCodec {
174 if values.is_empty() {
175 return CompressionCodec::None;
176 }
177
178 if values.len() < 8 {
179 return CompressionCodec::None;
181 }
182
183 let rle_ratio = RunLengthAnalyzer::estimate_ratio(values);
185 let avg_run_length = RunLengthAnalyzer::average_run_length(values);
186
187 if avg_run_length > 2.0 && rle_ratio > 1.5 {
189 return CompressionCodec::RunLength;
190 }
191
192 let is_sorted = values.windows(2).all(|w| w[0] <= w[1]);
194
195 if is_sorted {
196 let deltas: Vec<u64> = values.windows(2).map(|w| w[1] - w[0]).collect();
198 let max_delta = deltas.iter().copied().max().unwrap_or(0);
199 let bits_needed = BitPackedInts::bits_needed(max_delta);
200
201 let delta_ratio = 64.0 / bits_needed as f64;
203
204 if rle_ratio > delta_ratio && rle_ratio > 1.0 {
206 return CompressionCodec::RunLength;
207 }
208
209 return CompressionCodec::DeltaBitPacked { bits: bits_needed };
210 }
211
212 let max_value = values.iter().copied().max().unwrap_or(0);
214 let bits_needed = BitPackedInts::bits_needed(max_value);
215
216 let bitpack_ratio = if bits_needed > 0 {
218 64.0 / bits_needed as f64
219 } else {
220 1.0
221 };
222
223 if rle_ratio > bitpack_ratio && rle_ratio > 1.0 {
225 return CompressionCodec::RunLength;
226 }
227
228 if bits_needed < 32 {
229 CompressionCodec::BitPacked { bits: bits_needed }
230 } else {
231 CompressionCodec::None
232 }
233 }
234
235 #[must_use]
237 pub fn select_for_strings(values: &[&str]) -> CompressionCodec {
238 if values.is_empty() || values.len() < 4 {
239 return CompressionCodec::None;
240 }
241
242 let unique: std::collections::HashSet<_> = values.iter().collect();
244 let cardinality_ratio = unique.len() as f64 / values.len() as f64;
245
246 if cardinality_ratio < 0.5 {
248 CompressionCodec::Dictionary
249 } else {
250 CompressionCodec::None
251 }
252 }
253
254 #[must_use]
256 pub fn select_for_booleans(_values: &[bool]) -> CompressionCodec {
257 CompressionCodec::BitVector
259 }
260}
261
262pub struct TypeSpecificCompressor;
267
268impl TypeSpecificCompressor {
269 pub fn compress_integers(values: &[u64]) -> io::Result<CompressedData> {
276 let codec = CodecSelector::select_for_integers(values);
277
278 match codec {
279 CompressionCodec::None => {
280 let mut data = Vec::with_capacity(values.len() * 8);
281 for &v in values {
282 data.extend_from_slice(&v.to_le_bytes());
283 }
284 Ok(CompressedData {
285 codec,
286 uncompressed_size: values.len() * 8,
287 data,
288 metadata: CompressionMetadata::None,
289 })
290 }
291 CompressionCodec::DeltaBitPacked { bits } => {
292 let encoded = DeltaBitPacked::encode(values);
293 Ok(CompressedData {
294 codec: CompressionCodec::DeltaBitPacked { bits },
295 uncompressed_size: values.len() * 8,
296 data: encoded.to_bytes()?,
297 metadata: CompressionMetadata::DeltaBitPacked {
298 #[allow(clippy::cast_possible_wrap)]
300 base: encoded.base() as i64,
301 count: values.len(),
302 },
303 })
304 }
305 CompressionCodec::BitPacked { bits } => {
306 let packed = BitPackedInts::pack(values);
307 Ok(CompressedData {
308 codec: CompressionCodec::BitPacked { bits },
309 uncompressed_size: values.len() * 8,
310 data: packed.to_bytes()?,
311 metadata: CompressionMetadata::BitPacked {
312 count: values.len(),
313 },
314 })
315 }
316 CompressionCodec::RunLength => {
317 let encoded = RunLengthEncoding::encode(values);
318 Ok(CompressedData {
319 codec: CompressionCodec::RunLength,
320 uncompressed_size: values.len() * 8,
321 data: encoded.to_bytes(),
322 metadata: CompressionMetadata::RunLength {
323 run_count: encoded.run_count(),
324 },
325 })
326 }
327 _ => unreachable!("Unexpected codec for integers"),
328 }
329 }
330
331 pub fn compress_signed_integers(values: &[i64]) -> io::Result<CompressedData> {
338 let zigzag: Vec<u64> = values
340 .iter()
341 .map(|&v| super::delta::zigzag_encode(v))
342 .collect();
343 Self::compress_integers(&zigzag)
344 }
345
346 pub fn compress_booleans(values: &[bool]) -> io::Result<CompressedData> {
352 let bitvec = BitVector::from_bools(values);
353 Ok(CompressedData {
354 codec: CompressionCodec::BitVector,
355 uncompressed_size: values.len(),
356 data: bitvec.to_bytes()?,
357 metadata: CompressionMetadata::BitPacked {
358 count: values.len(),
359 },
360 })
361 }
362
363 pub fn decompress_integers(data: &CompressedData) -> io::Result<Vec<u64>> {
374 match data.codec {
375 CompressionCodec::None => {
376 let mut values = Vec::with_capacity(data.data.len() / 8);
377 for chunk in data.data.chunks_exact(8) {
378 values.push(u64::from_le_bytes(
379 chunk
380 .try_into()
381 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
382 ));
383 }
384 Ok(values)
385 }
386 CompressionCodec::DeltaBitPacked { .. } => {
387 let encoded = DeltaBitPacked::from_bytes(&data.data)?;
388 Ok(encoded.decode())
389 }
390 CompressionCodec::BitPacked { .. } => {
391 let packed = BitPackedInts::from_bytes(&data.data)?;
392 Ok(packed.unpack())
393 }
394 CompressionCodec::RunLength => {
395 let encoded = RunLengthEncoding::from_bytes(&data.data)?;
396 Ok(encoded.decode())
397 }
398 _ => Err(io::Error::new(
399 io::ErrorKind::InvalidData,
400 "Invalid codec for integer decompression",
401 )),
402 }
403 }
404
405 pub fn decompress_booleans(data: &CompressedData) -> io::Result<Vec<bool>> {
411 match data.codec {
412 CompressionCodec::BitVector => {
413 let bitvec = BitVector::from_bytes(&data.data)?;
414 Ok(bitvec.to_bools())
415 }
416 _ => Err(io::Error::new(
417 io::ErrorKind::InvalidData,
418 "Invalid codec for boolean decompression",
419 )),
420 }
421 }
422}
423
#[cfg(test)]
mod tests {
    use super::*;

    /// Decompresses `compressed` as integers and checks the result equals `expected`.
    fn assert_integers_decode_to(compressed: &CompressedData, expected: &[u64]) {
        let restored = TypeSpecificCompressor::decompress_integers(compressed).unwrap();
        assert_eq!(expected, restored.as_slice());
    }

    /// Returns `true` when the selector picks delta + bit-packing for `values`.
    fn selects_delta_bitpacked(values: &[u64]) -> bool {
        matches!(
            CodecSelector::select_for_integers(values),
            CompressionCodec::DeltaBitPacked { .. }
        )
    }

    // A strictly increasing sequence should be delta + bit-packed.
    #[test]
    fn test_codec_selection_sorted_integers() {
        let ascending: Vec<u64> = (0..100).collect();
        assert!(selects_delta_bitpacked(&ascending));
    }

    // Eight small unsorted values should be bit-packed.
    #[test]
    fn test_codec_selection_small_integers() {
        let shuffled = [1u64, 5, 3, 7, 2, 4, 6, 8];
        let chosen = CodecSelector::select_for_integers(&shuffled);
        assert!(matches!(chosen, CompressionCodec::BitPacked { .. }));
    }

    // Low-cardinality strings get a dictionary; all-distinct strings do not.
    #[test]
    fn test_codec_selection_strings() {
        let repeated = ["a", "b", "a", "a", "b", "a", "c", "a"];
        assert_eq!(
            CodecSelector::select_for_strings(&repeated),
            CompressionCodec::Dictionary
        );

        let distinct = ["a", "b", "c", "d", "e", "f", "g", "h"];
        assert_eq!(
            CodecSelector::select_for_strings(&distinct),
            CompressionCodec::None
        );
    }

    // Booleans always map to the bit-vector codec.
    #[test]
    fn test_codec_selection_booleans() {
        let flags = [true, false, true];
        assert_eq!(
            CodecSelector::select_for_booleans(&flags),
            CompressionCodec::BitVector
        );
    }

    // Sorted input round-trips through delta + bit-packing and shrinks.
    #[test]
    fn test_compress_decompress_sorted_integers() {
        let input: Vec<u64> = (100..200).collect();
        let packed = TypeSpecificCompressor::compress_integers(&input).unwrap();

        assert!(matches!(
            packed.codec,
            CompressionCodec::DeltaBitPacked { .. }
        ));
        assert!(packed.compression_ratio() > 1.0);

        assert_integers_decode_to(&packed, &input);
    }

    // Small unsorted input round-trips, whichever codec is chosen.
    #[test]
    fn test_compress_decompress_small_integers() {
        let input = [5u64, 2, 7, 1, 9, 3, 8, 4, 6, 0];
        let packed = TypeSpecificCompressor::compress_integers(&input).unwrap();

        assert_integers_decode_to(&packed, &input);
    }

    // Booleans round-trip through the bit-vector codec.
    #[test]
    fn test_compress_decompress_booleans() {
        let flags = vec![true, false, true, true, false, false, true, false];
        let packed = TypeSpecificCompressor::compress_booleans(&flags).unwrap();

        assert_eq!(packed.codec, CompressionCodec::BitVector);

        let restored = TypeSpecificCompressor::decompress_booleans(&packed).unwrap();
        assert_eq!(flags, restored);
    }

    // A consecutive range should compress by more than 5x.
    #[test]
    fn test_compression_ratio() {
        let input: Vec<u64> = (1000..1100).collect();
        let packed = TypeSpecificCompressor::compress_integers(&input).unwrap();

        let ratio = packed.compression_ratio();
        assert!(ratio > 5.0, "Expected ratio > 5, got {}", ratio);
    }

    // Every variant reports its own name, ignoring any `bits` payload.
    #[test]
    fn test_codec_names() {
        let expectations = [
            (CompressionCodec::None, "None"),
            (CompressionCodec::Delta, "Delta"),
            (CompressionCodec::BitPacked { bits: 4 }, "BitPacked"),
            (CompressionCodec::DeltaBitPacked { bits: 4 }, "DeltaBitPacked"),
            (CompressionCodec::Dictionary, "Dictionary"),
            (CompressionCodec::BitVector, "BitVector"),
            (CompressionCodec::RunLength, "RunLength"),
        ];
        for (codec, expected_name) in expectations {
            assert_eq!(codec.name(), expected_name);
        }
    }

    // Constant input and a few long runs should both pick run-length.
    #[test]
    fn test_codec_selection_repetitive_integers() {
        let constant = vec![1u64; 100];
        assert_eq!(
            CodecSelector::select_for_integers(&constant),
            CompressionCodec::RunLength
        );

        let three_runs: Vec<u64> = [1u64, 2, 3]
            .iter()
            .flat_map(|&v| std::iter::repeat(v).take(30))
            .collect();
        assert_eq!(
            CodecSelector::select_for_integers(&three_runs),
            CompressionCodec::RunLength
        );
    }

    // A long constant run round-trips through run-length with a large ratio.
    #[test]
    fn test_compress_decompress_runlength() {
        let input = vec![42u64; 1000];
        let packed = TypeSpecificCompressor::compress_integers(&input).unwrap();

        assert_eq!(packed.codec, CompressionCodec::RunLength);
        assert!(
            packed.compression_ratio() > 50.0,
            "Expected ratio > 50, got {}",
            packed.compression_ratio()
        );

        assert_integers_decode_to(&packed, &input);
    }

    // Three long runs round-trip through run-length with a ratio above 10.
    #[test]
    fn test_compress_decompress_mixed_runs() {
        let input: Vec<u64> = [1u64, 2, 3]
            .iter()
            .flat_map(|&v| std::iter::repeat(v).take(100))
            .collect();

        let packed = TypeSpecificCompressor::compress_integers(&input).unwrap();

        assert_eq!(packed.codec, CompressionCodec::RunLength);
        assert!(packed.compression_ratio() > 10.0);

        assert_integers_decode_to(&packed, &input);
    }

    // Sequential data prefers delta encoding; constant data prefers run-length.
    #[test]
    fn test_runlength_vs_delta_selection() {
        let sequential: Vec<u64> = (0..100).collect();
        assert!(selects_delta_bitpacked(&sequential));

        let constant = vec![100u64; 100];
        assert_eq!(
            CodecSelector::select_for_integers(&constant),
            CompressionCodec::RunLength
        );
    }
}