1use std::io;
18
19use super::bitpack::{BitPackedInts, DeltaBitPacked};
20use super::bitvec::BitVector;
21use super::runlength::{RunLengthAnalyzer, RunLengthEncoding};
22
/// Identifies the encoding applied to a block of column data.
///
/// The bit-packed variants carry the bit width chosen when the codec was selected.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompressionCodec {
    /// Values are stored raw, with no encoding applied.
    None,

    /// Delta encoding.
    // NOTE(review): nothing in this file selects or produces this variant; confirm its
    // exact semantics against the delta module.
    Delta,

    /// Bit-packed integers, selected for unsorted values whose maximum needs few bits.
    BitPacked {
        /// Bits needed for the largest value, as computed by the selector.
        bits: u8,
    },

    /// Delta plus bit-packing, selected for non-decreasing integer sequences.
    DeltaBitPacked {
        /// Bits needed for the largest consecutive delta, as computed by the selector.
        bits: u8,
    },

    /// Dictionary encoding, selected for string columns with many repeated values.
    Dictionary,

    /// Bit vector, selected for boolean columns.
    BitVector,

    /// Run-length encoding, selected for integer columns with long runs of equal values.
    RunLength,
}

impl CompressionCodec {
    /// Returns the variant name as a static string, ignoring any payload.
    #[must_use]
    pub fn name(&self) -> &'static str {
        match *self {
            CompressionCodec::None => "None",
            CompressionCodec::Delta => "Delta",
            CompressionCodec::BitPacked { .. } => "BitPacked",
            CompressionCodec::DeltaBitPacked { .. } => "DeltaBitPacked",
            CompressionCodec::Dictionary => "Dictionary",
            CompressionCodec::BitVector => "BitVector",
            CompressionCodec::RunLength => "RunLength",
        }
    }

    /// Reports whether the codec preserves the input exactly.
    ///
    /// Every codec currently listed is lossless, so this always returns `true`. The match
    /// is spelled out so that adding a variant forces a decision here.
    #[must_use]
    pub fn is_lossless(&self) -> bool {
        match self {
            Self::None
            | Self::Delta
            | Self::BitPacked { .. }
            | Self::DeltaBitPacked { .. }
            | Self::Dictionary
            | Self::BitVector
            | Self::RunLength => true,
        }
    }
}
76
77#[derive(Debug, Clone)]
82pub struct CompressedData {
83 pub codec: CompressionCodec,
85 pub uncompressed_size: usize,
87 pub data: Vec<u8>,
89 pub metadata: CompressionMetadata,
91}
92
/// Codec-specific details stored next to a compressed payload.
#[derive(Debug, Clone)]
pub enum CompressionMetadata {
    /// No extra information is recorded.
    None,
    /// Details for delta-encoded data.
    // NOTE(review): no producer in this file; `base` is presumably the starting value.
    Delta { base: i64 },
    /// `count` is the number of encoded values (also used for boolean bit vectors).
    BitPacked { count: usize },
    /// `base` is the encoder-reported base cast to `i64`; `count` is the number of values.
    DeltaBitPacked { base: i64, count: usize },
    /// Details for dictionary-encoded data.
    // NOTE(review): no producer in this file; `dict_id` presumably names the dictionary.
    Dictionary { dict_id: u32 },
    /// `run_count` is the number of runs reported by the run-length encoder.
    RunLength { run_count: usize },
}
126
127impl CompressedData {
128 pub fn uncompressed(data: Vec<u8>) -> Self {
130 let size = data.len();
131 Self {
132 codec: CompressionCodec::None,
133 uncompressed_size: size,
134 data,
135 metadata: CompressionMetadata::None,
136 }
137 }
138
139 #[must_use]
141 pub fn compression_ratio(&self) -> f64 {
142 if self.data.is_empty() {
143 return 1.0;
144 }
145 self.uncompressed_size as f64 / self.data.len() as f64
146 }
147
148 #[must_use]
150 pub fn is_compressed(&self) -> bool {
151 !matches!(self.codec, CompressionCodec::None)
152 }
153}
154
155pub struct CodecSelector;
161
162impl CodecSelector {
163 #[must_use]
171 pub fn select_for_integers(values: &[u64]) -> CompressionCodec {
172 if values.is_empty() {
173 return CompressionCodec::None;
174 }
175
176 if values.len() < 8 {
177 return CompressionCodec::None;
179 }
180
181 let rle_ratio = RunLengthAnalyzer::estimate_ratio(values);
183 let avg_run_length = RunLengthAnalyzer::average_run_length(values);
184
185 if avg_run_length > 2.0 && rle_ratio > 1.5 {
187 return CompressionCodec::RunLength;
188 }
189
190 let is_sorted = values.windows(2).all(|w| w[0] <= w[1]);
192
193 if is_sorted {
194 let deltas: Vec<u64> = values.windows(2).map(|w| w[1] - w[0]).collect();
196 let max_delta = deltas.iter().copied().max().unwrap_or(0);
197 let bits_needed = BitPackedInts::bits_needed(max_delta);
198
199 let delta_ratio = 64.0 / bits_needed as f64;
201
202 if rle_ratio > delta_ratio && rle_ratio > 1.0 {
204 return CompressionCodec::RunLength;
205 }
206
207 return CompressionCodec::DeltaBitPacked { bits: bits_needed };
208 }
209
210 let max_value = values.iter().copied().max().unwrap_or(0);
212 let bits_needed = BitPackedInts::bits_needed(max_value);
213
214 let bitpack_ratio = if bits_needed > 0 {
216 64.0 / bits_needed as f64
217 } else {
218 1.0
219 };
220
221 if rle_ratio > bitpack_ratio && rle_ratio > 1.0 {
223 return CompressionCodec::RunLength;
224 }
225
226 if bits_needed < 32 {
227 CompressionCodec::BitPacked { bits: bits_needed }
228 } else {
229 CompressionCodec::None
230 }
231 }
232
233 #[must_use]
235 pub fn select_for_strings(values: &[&str]) -> CompressionCodec {
236 if values.is_empty() || values.len() < 4 {
237 return CompressionCodec::None;
238 }
239
240 let unique: std::collections::HashSet<_> = values.iter().collect();
242 let cardinality_ratio = unique.len() as f64 / values.len() as f64;
243
244 if cardinality_ratio < 0.5 {
246 CompressionCodec::Dictionary
247 } else {
248 CompressionCodec::None
249 }
250 }
251
252 #[must_use]
254 pub fn select_for_booleans(_values: &[bool]) -> CompressionCodec {
255 CompressionCodec::BitVector
257 }
258}
259
260pub struct TypeSpecificCompressor;
265
266impl TypeSpecificCompressor {
267 pub fn compress_integers(values: &[u64]) -> CompressedData {
269 let codec = CodecSelector::select_for_integers(values);
270
271 match codec {
272 CompressionCodec::None => {
273 let mut data = Vec::with_capacity(values.len() * 8);
274 for &v in values {
275 data.extend_from_slice(&v.to_le_bytes());
276 }
277 CompressedData {
278 codec,
279 uncompressed_size: values.len() * 8,
280 data,
281 metadata: CompressionMetadata::None,
282 }
283 }
284 CompressionCodec::DeltaBitPacked { bits } => {
285 let encoded = DeltaBitPacked::encode(values);
286 CompressedData {
287 codec: CompressionCodec::DeltaBitPacked { bits },
288 uncompressed_size: values.len() * 8,
289 data: encoded.to_bytes(),
290 metadata: CompressionMetadata::DeltaBitPacked {
291 base: encoded.base() as i64,
292 count: values.len(),
293 },
294 }
295 }
296 CompressionCodec::BitPacked { bits } => {
297 let packed = BitPackedInts::pack(values);
298 CompressedData {
299 codec: CompressionCodec::BitPacked { bits },
300 uncompressed_size: values.len() * 8,
301 data: packed.to_bytes(),
302 metadata: CompressionMetadata::BitPacked {
303 count: values.len(),
304 },
305 }
306 }
307 CompressionCodec::RunLength => {
308 let encoded = RunLengthEncoding::encode(values);
309 CompressedData {
310 codec: CompressionCodec::RunLength,
311 uncompressed_size: values.len() * 8,
312 data: encoded.to_bytes(),
313 metadata: CompressionMetadata::RunLength {
314 run_count: encoded.run_count(),
315 },
316 }
317 }
318 _ => unreachable!("Unexpected codec for integers"),
319 }
320 }
321
322 pub fn compress_signed_integers(values: &[i64]) -> CompressedData {
324 let zigzag: Vec<u64> = values
326 .iter()
327 .map(|&v| super::delta::zigzag_encode(v))
328 .collect();
329 Self::compress_integers(&zigzag)
330 }
331
332 pub fn compress_booleans(values: &[bool]) -> CompressedData {
334 let bitvec = BitVector::from_bools(values);
335 CompressedData {
336 codec: CompressionCodec::BitVector,
337 uncompressed_size: values.len(),
338 data: bitvec.to_bytes(),
339 metadata: CompressionMetadata::BitPacked {
340 count: values.len(),
341 },
342 }
343 }
344
345 pub fn decompress_integers(data: &CompressedData) -> io::Result<Vec<u64>> {
347 match data.codec {
348 CompressionCodec::None => {
349 let mut values = Vec::with_capacity(data.data.len() / 8);
350 for chunk in data.data.chunks_exact(8) {
351 values.push(u64::from_le_bytes(
352 chunk
353 .try_into()
354 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
355 ));
356 }
357 Ok(values)
358 }
359 CompressionCodec::DeltaBitPacked { .. } => {
360 let encoded = DeltaBitPacked::from_bytes(&data.data)?;
361 Ok(encoded.decode())
362 }
363 CompressionCodec::BitPacked { .. } => {
364 let packed = BitPackedInts::from_bytes(&data.data)?;
365 Ok(packed.unpack())
366 }
367 CompressionCodec::RunLength => {
368 let encoded = RunLengthEncoding::from_bytes(&data.data)?;
369 Ok(encoded.decode())
370 }
371 _ => Err(io::Error::new(
372 io::ErrorKind::InvalidData,
373 "Invalid codec for integer decompression",
374 )),
375 }
376 }
377
378 pub fn decompress_booleans(data: &CompressedData) -> io::Result<Vec<bool>> {
380 match data.codec {
381 CompressionCodec::BitVector => {
382 let bitvec = BitVector::from_bytes(&data.data)?;
383 Ok(bitvec.to_bools())
384 }
385 _ => Err(io::Error::new(
386 io::ErrorKind::InvalidData,
387 "Invalid codec for boolean decompression",
388 )),
389 }
390 }
391}
392
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a vector made of consecutive runs, one `(value, length)` pair per run.
    fn runs(spec: &[(u64, usize)]) -> Vec<u64> {
        spec.iter()
            .flat_map(|&(value, length)| std::iter::repeat(value).take(length))
            .collect()
    }

    /// Decodes `compressed` and checks that it reproduces `expected`.
    fn assert_integers_roundtrip(expected: &[u64], compressed: &CompressedData) {
        let decompressed = TypeSpecificCompressor::decompress_integers(compressed).unwrap();
        assert_eq!(expected, decompressed.as_slice());
    }

    // Strictly increasing input should be routed to delta + bit-packing.
    #[test]
    fn test_codec_selection_sorted_integers() {
        let ascending = (0..100u64).collect::<Vec<_>>();
        assert!(matches!(
            CodecSelector::select_for_integers(&ascending),
            CompressionCodec::DeltaBitPacked { .. }
        ));
    }

    // Small unsorted values should be bit-packed.
    #[test]
    fn test_codec_selection_small_integers() {
        let shuffled = [1u64, 5, 3, 7, 2, 4, 6, 8];
        assert!(matches!(
            CodecSelector::select_for_integers(&shuffled),
            CompressionCodec::BitPacked { .. }
        ));
    }

    // Low-cardinality strings get a dictionary; all-distinct strings stay raw.
    #[test]
    fn test_codec_selection_strings() {
        let repeated = ["a", "b", "a", "a", "b", "a", "c", "a"];
        assert_eq!(
            CodecSelector::select_for_strings(&repeated),
            CompressionCodec::Dictionary
        );

        let distinct = ["a", "b", "c", "d", "e", "f", "g", "h"];
        assert_eq!(
            CodecSelector::select_for_strings(&distinct),
            CompressionCodec::None
        );
    }

    #[test]
    fn test_codec_selection_booleans() {
        let flags = [true, false, true];
        assert_eq!(
            CodecSelector::select_for_booleans(&flags),
            CompressionCodec::BitVector
        );
    }

    #[test]
    fn test_compress_decompress_sorted_integers() {
        let values = (100..200u64).collect::<Vec<_>>();
        let compressed = TypeSpecificCompressor::compress_integers(&values);

        assert!(matches!(
            compressed.codec,
            CompressionCodec::DeltaBitPacked { .. }
        ));
        assert!(compressed.compression_ratio() > 1.0);

        assert_integers_roundtrip(&values, &compressed);
    }

    #[test]
    fn test_compress_decompress_small_integers() {
        let values = vec![5u64, 2, 7, 1, 9, 3, 8, 4, 6, 0];
        let compressed = TypeSpecificCompressor::compress_integers(&values);

        assert_integers_roundtrip(&values, &compressed);
    }

    #[test]
    fn test_compress_decompress_booleans() {
        let values = vec![true, false, true, true, false, false, true, false];
        let compressed = TypeSpecificCompressor::compress_booleans(&values);

        assert_eq!(compressed.codec, CompressionCodec::BitVector);

        let decompressed = TypeSpecificCompressor::decompress_booleans(&compressed).unwrap();
        assert_eq!(values, decompressed);
    }

    #[test]
    fn test_compression_ratio() {
        let values = (1000..1100u64).collect::<Vec<_>>();
        let ratio = TypeSpecificCompressor::compress_integers(&values).compression_ratio();

        assert!(ratio > 5.0, "Expected ratio > 5, got {}", ratio);
    }

    #[test]
    fn test_codec_names() {
        let expectations = [
            (CompressionCodec::None, "None"),
            (CompressionCodec::Delta, "Delta"),
            (CompressionCodec::BitPacked { bits: 4 }, "BitPacked"),
            (CompressionCodec::DeltaBitPacked { bits: 4 }, "DeltaBitPacked"),
            (CompressionCodec::Dictionary, "Dictionary"),
            (CompressionCodec::BitVector, "BitVector"),
            (CompressionCodec::RunLength, "RunLength"),
        ];
        for &(codec, expected) in expectations.iter() {
            assert_eq!(codec.name(), expected);
        }
    }

    // A single long run and a few long runs should both choose run-length encoding.
    #[test]
    fn test_codec_selection_repetitive_integers() {
        let repetitive = runs(&[(1, 100)]);
        assert_eq!(
            CodecSelector::select_for_integers(&repetitive),
            CompressionCodec::RunLength
        );

        let mixed = runs(&[(1, 30), (2, 30), (3, 30)]);
        assert_eq!(
            CodecSelector::select_for_integers(&mixed),
            CompressionCodec::RunLength
        );
    }

    #[test]
    fn test_compress_decompress_runlength() {
        let values = runs(&[(42, 1000)]);
        let compressed = TypeSpecificCompressor::compress_integers(&values);

        assert_eq!(compressed.codec, CompressionCodec::RunLength);
        assert!(
            compressed.compression_ratio() > 50.0,
            "Expected ratio > 50, got {}",
            compressed.compression_ratio()
        );

        assert_integers_roundtrip(&values, &compressed);
    }

    #[test]
    fn test_compress_decompress_mixed_runs() {
        let values = runs(&[(1, 100), (2, 100), (3, 100)]);
        let compressed = TypeSpecificCompressor::compress_integers(&values);

        assert_eq!(compressed.codec, CompressionCodec::RunLength);
        assert!(compressed.compression_ratio() > 10.0);

        assert_integers_roundtrip(&values, &compressed);
    }

    // Sequential data prefers delta packing; constant data prefers run-length.
    #[test]
    fn test_runlength_vs_delta_selection() {
        let sequential = (0..100u64).collect::<Vec<_>>();
        assert!(matches!(
            CodecSelector::select_for_integers(&sequential),
            CompressionCodec::DeltaBitPacked { .. }
        ));

        let constant = runs(&[(100, 100)]);
        assert_eq!(
            CodecSelector::select_for_integers(&constant),
            CompressionCodec::RunLength
        );
    }
}