1use std::io;
18
19use super::bitpack::{BitPackedInts, DeltaBitPacked};
20use super::bitvec::BitVector;
21use super::runlength::{RunLengthAnalyzer, RunLengthEncoding};
22
/// Identifies which encoding was applied to a block of values.
///
/// Variants that carry a `bits` field record the bit width chosen for the
/// packed representation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CompressionCodec {
    /// No encoding; values are stored as-is.
    None,

    /// Delta encoding.
    Delta,

    /// Fixed-width bit packing.
    BitPacked {
        /// Bit width used for each packed value.
        bits: u8,
    },

    /// Delta encoding combined with fixed-width bit packing.
    DeltaBitPacked {
        /// Bit width used for each packed delta.
        bits: u8,
    },

    /// Dictionary encoding.
    Dictionary,

    /// Bit-vector encoding.
    BitVector,

    /// Run-length encoding.
    RunLength,
}

impl CompressionCodec {
    /// Returns the variant name as a static string.
    ///
    /// Any payload (such as `bits`) is ignored, so `BitPacked { bits: 4 }`
    /// and `BitPacked { bits: 9 }` both yield `"BitPacked"`.
    #[must_use]
    pub fn name(&self) -> &'static str {
        match *self {
            CompressionCodec::RunLength => "RunLength",
            CompressionCodec::BitVector => "BitVector",
            CompressionCodec::Dictionary => "Dictionary",
            CompressionCodec::DeltaBitPacked { bits: _ } => "DeltaBitPacked",
            CompressionCodec::BitPacked { bits: _ } => "BitPacked",
            CompressionCodec::Delta => "Delta",
            CompressionCodec::None => "None",
        }
    }

    /// Reports whether the codec is lossless.
    ///
    /// Currently returns `true` for every variant.
    #[must_use]
    pub fn is_lossless(&self) -> bool {
        true
    }
}
77
/// An encoded payload together with the codec and side information that
/// describe it.
#[derive(Debug, Clone)]
pub struct CompressedData {
    /// Codec that produced `data`.
    pub codec: CompressionCodec,
    /// Size recorded for the input before encoding. The compressors in this
    /// file store `values.len() * 8` for integers and `values.len()` for
    /// booleans; `CompressedData::uncompressed` stores `data.len()`.
    pub uncompressed_size: usize,
    /// The encoded bytes.
    pub data: Vec<u8>,
    /// Codec-specific side information recorded at compression time.
    pub metadata: CompressionMetadata,
}
93
/// Side information stored next to an encoded payload.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum CompressionMetadata {
    /// No side information.
    None,
    /// Metadata for delta encoding.
    Delta {
        // NOTE(review): presumably the starting value the deltas are applied
        // to; nothing in this file constructs this variant — confirm.
        base: i64,
    },
    /// Metadata for bit-packed data. `TypeSpecificCompressor::compress_booleans`
    /// also uses this variant to carry the element count of a bit vector.
    BitPacked {
        /// Number of input values that were encoded.
        count: usize,
    },
    /// Metadata for delta + bit-packed data.
    DeltaBitPacked {
        /// Base reported by the delta encoder, cast to `i64`.
        base: i64,
        /// Number of input values that were encoded.
        count: usize,
    },
    /// Metadata for dictionary encoding.
    Dictionary {
        // NOTE(review): looks like an identifier of the dictionary used;
        // nothing in this file constructs this variant — confirm.
        dict_id: u32,
    },
    /// Metadata for run-length encoding.
    RunLength {
        /// Number of runs reported by the run-length encoder.
        run_count: usize,
    },
}
128
129impl CompressedData {
130 pub fn uncompressed(data: Vec<u8>) -> Self {
132 let size = data.len();
133 Self {
134 codec: CompressionCodec::None,
135 uncompressed_size: size,
136 data,
137 metadata: CompressionMetadata::None,
138 }
139 }
140
141 #[must_use]
143 pub fn compression_ratio(&self) -> f64 {
144 if self.data.is_empty() {
145 return 1.0;
146 }
147 self.uncompressed_size as f64 / self.data.len() as f64
148 }
149
150 #[must_use]
152 pub fn is_compressed(&self) -> bool {
153 !matches!(self.codec, CompressionCodec::None)
154 }
155}
156
157pub struct CodecSelector;
163
164impl CodecSelector {
165 #[must_use]
173 pub fn select_for_integers(values: &[u64]) -> CompressionCodec {
174 if values.is_empty() {
175 return CompressionCodec::None;
176 }
177
178 if values.len() < 8 {
179 return CompressionCodec::None;
181 }
182
183 let rle_ratio = RunLengthAnalyzer::estimate_ratio(values);
185 let avg_run_length = RunLengthAnalyzer::average_run_length(values);
186
187 if avg_run_length > 2.0 && rle_ratio > 1.5 {
189 return CompressionCodec::RunLength;
190 }
191
192 let is_sorted = values.windows(2).all(|w| w[0] <= w[1]);
194
195 if is_sorted {
196 let deltas: Vec<u64> = values.windows(2).map(|w| w[1] - w[0]).collect();
198 let max_delta = deltas.iter().copied().max().unwrap_or(0);
199 let bits_needed = BitPackedInts::bits_needed(max_delta);
200
201 let delta_ratio = 64.0 / bits_needed as f64;
203
204 if rle_ratio > delta_ratio && rle_ratio > 1.0 {
206 return CompressionCodec::RunLength;
207 }
208
209 return CompressionCodec::DeltaBitPacked { bits: bits_needed };
210 }
211
212 let max_value = values.iter().copied().max().unwrap_or(0);
214 let bits_needed = BitPackedInts::bits_needed(max_value);
215
216 let bitpack_ratio = if bits_needed > 0 {
218 64.0 / bits_needed as f64
219 } else {
220 1.0
221 };
222
223 if rle_ratio > bitpack_ratio && rle_ratio > 1.0 {
225 return CompressionCodec::RunLength;
226 }
227
228 if bits_needed < 32 {
229 CompressionCodec::BitPacked { bits: bits_needed }
230 } else {
231 CompressionCodec::None
232 }
233 }
234
235 #[must_use]
237 pub fn select_for_strings(values: &[&str]) -> CompressionCodec {
238 if values.is_empty() || values.len() < 4 {
239 return CompressionCodec::None;
240 }
241
242 let unique: std::collections::HashSet<_> = values.iter().collect();
244 let cardinality_ratio = unique.len() as f64 / values.len() as f64;
245
246 if cardinality_ratio < 0.5 {
248 CompressionCodec::Dictionary
249 } else {
250 CompressionCodec::None
251 }
252 }
253
254 #[must_use]
256 pub fn select_for_booleans(_values: &[bool]) -> CompressionCodec {
257 CompressionCodec::BitVector
259 }
260}
261
262pub struct TypeSpecificCompressor;
267
268impl TypeSpecificCompressor {
269 pub fn compress_integers(values: &[u64]) -> CompressedData {
271 let codec = CodecSelector::select_for_integers(values);
272
273 match codec {
274 CompressionCodec::None => {
275 let mut data = Vec::with_capacity(values.len() * 8);
276 for &v in values {
277 data.extend_from_slice(&v.to_le_bytes());
278 }
279 CompressedData {
280 codec,
281 uncompressed_size: values.len() * 8,
282 data,
283 metadata: CompressionMetadata::None,
284 }
285 }
286 CompressionCodec::DeltaBitPacked { bits } => {
287 let encoded = DeltaBitPacked::encode(values);
288 CompressedData {
289 codec: CompressionCodec::DeltaBitPacked { bits },
290 uncompressed_size: values.len() * 8,
291 data: encoded.to_bytes(),
292 metadata: CompressionMetadata::DeltaBitPacked {
293 base: encoded.base() as i64,
294 count: values.len(),
295 },
296 }
297 }
298 CompressionCodec::BitPacked { bits } => {
299 let packed = BitPackedInts::pack(values);
300 CompressedData {
301 codec: CompressionCodec::BitPacked { bits },
302 uncompressed_size: values.len() * 8,
303 data: packed.to_bytes(),
304 metadata: CompressionMetadata::BitPacked {
305 count: values.len(),
306 },
307 }
308 }
309 CompressionCodec::RunLength => {
310 let encoded = RunLengthEncoding::encode(values);
311 CompressedData {
312 codec: CompressionCodec::RunLength,
313 uncompressed_size: values.len() * 8,
314 data: encoded.to_bytes(),
315 metadata: CompressionMetadata::RunLength {
316 run_count: encoded.run_count(),
317 },
318 }
319 }
320 _ => unreachable!("Unexpected codec for integers"),
321 }
322 }
323
324 pub fn compress_signed_integers(values: &[i64]) -> CompressedData {
326 let zigzag: Vec<u64> = values
328 .iter()
329 .map(|&v| super::delta::zigzag_encode(v))
330 .collect();
331 Self::compress_integers(&zigzag)
332 }
333
334 pub fn compress_booleans(values: &[bool]) -> CompressedData {
336 let bitvec = BitVector::from_bools(values);
337 CompressedData {
338 codec: CompressionCodec::BitVector,
339 uncompressed_size: values.len(),
340 data: bitvec.to_bytes(),
341 metadata: CompressionMetadata::BitPacked {
342 count: values.len(),
343 },
344 }
345 }
346
347 pub fn decompress_integers(data: &CompressedData) -> io::Result<Vec<u64>> {
358 match data.codec {
359 CompressionCodec::None => {
360 let mut values = Vec::with_capacity(data.data.len() / 8);
361 for chunk in data.data.chunks_exact(8) {
362 values.push(u64::from_le_bytes(
363 chunk
364 .try_into()
365 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
366 ));
367 }
368 Ok(values)
369 }
370 CompressionCodec::DeltaBitPacked { .. } => {
371 let encoded = DeltaBitPacked::from_bytes(&data.data)?;
372 Ok(encoded.decode())
373 }
374 CompressionCodec::BitPacked { .. } => {
375 let packed = BitPackedInts::from_bytes(&data.data)?;
376 Ok(packed.unpack())
377 }
378 CompressionCodec::RunLength => {
379 let encoded = RunLengthEncoding::from_bytes(&data.data)?;
380 Ok(encoded.decode())
381 }
382 _ => Err(io::Error::new(
383 io::ErrorKind::InvalidData,
384 "Invalid codec for integer decompression",
385 )),
386 }
387 }
388
389 pub fn decompress_booleans(data: &CompressedData) -> io::Result<Vec<bool>> {
395 match data.codec {
396 CompressionCodec::BitVector => {
397 let bitvec = BitVector::from_bytes(&data.data)?;
398 Ok(bitvec.to_bools())
399 }
400 _ => Err(io::Error::new(
401 io::ErrorKind::InvalidData,
402 "Invalid codec for boolean decompression",
403 )),
404 }
405 }
406}
407
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a vector made of `len` copies of each value in `levels`, in order.
    fn runs_of(levels: &[u64], len: usize) -> Vec<u64> {
        levels
            .iter()
            .flat_map(|&level| std::iter::repeat(level).take(len))
            .collect()
    }

    // Strictly increasing input should go to delta + bit packing.
    #[test]
    fn test_codec_selection_sorted_integers() {
        let ascending = (0..100).collect::<Vec<u64>>();
        let chosen = CodecSelector::select_for_integers(&ascending);
        assert!(matches!(chosen, CompressionCodec::DeltaBitPacked { .. }));
    }

    // Unsorted small values should go to plain bit packing.
    #[test]
    fn test_codec_selection_small_integers() {
        let shuffled: [u64; 8] = [1, 5, 3, 7, 2, 4, 6, 8];
        let chosen = CodecSelector::select_for_integers(&shuffled);
        assert!(matches!(chosen, CompressionCodec::BitPacked { .. }));
    }

    // Low-cardinality strings get a dictionary; all-distinct strings do not.
    #[test]
    fn test_codec_selection_strings() {
        let low_cardinality = ["a", "b", "a", "a", "b", "a", "c", "a"];
        assert_eq!(
            CodecSelector::select_for_strings(&low_cardinality),
            CompressionCodec::Dictionary
        );

        let all_distinct = ["a", "b", "c", "d", "e", "f", "g", "h"];
        assert_eq!(
            CodecSelector::select_for_strings(&all_distinct),
            CompressionCodec::None
        );
    }

    #[test]
    fn test_codec_selection_booleans() {
        let flags = [true, false, true];
        assert_eq!(
            CodecSelector::select_for_booleans(&flags),
            CompressionCodec::BitVector
        );
    }

    #[test]
    fn test_compress_decompress_sorted_integers() {
        let input = (100..200).collect::<Vec<u64>>();
        let packed = TypeSpecificCompressor::compress_integers(&input);

        assert!(matches!(
            packed.codec,
            CompressionCodec::DeltaBitPacked { .. }
        ));
        assert!(packed.compression_ratio() > 1.0);

        let restored = TypeSpecificCompressor::decompress_integers(&packed).unwrap();
        assert_eq!(input, restored);
    }

    #[test]
    fn test_compress_decompress_small_integers() {
        let input: Vec<u64> = vec![5, 2, 7, 1, 9, 3, 8, 4, 6, 0];
        let packed = TypeSpecificCompressor::compress_integers(&input);

        let restored = TypeSpecificCompressor::decompress_integers(&packed).unwrap();
        assert_eq!(input, restored);
    }

    #[test]
    fn test_compress_decompress_booleans() {
        let input = vec![true, false, true, true, false, false, true, false];
        let packed = TypeSpecificCompressor::compress_booleans(&input);

        assert_eq!(packed.codec, CompressionCodec::BitVector);

        let restored = TypeSpecificCompressor::decompress_booleans(&packed).unwrap();
        assert_eq!(input, restored);
    }

    #[test]
    fn test_compression_ratio() {
        let input = (1000..1100).collect::<Vec<u64>>();
        let ratio = TypeSpecificCompressor::compress_integers(&input).compression_ratio();
        assert!(ratio > 5.0, "Expected ratio > 5, got {}", ratio);
    }

    // Table-driven check of every variant's name.
    #[test]
    fn test_codec_names() {
        let expected = [
            (CompressionCodec::None, "None"),
            (CompressionCodec::Delta, "Delta"),
            (CompressionCodec::BitPacked { bits: 4 }, "BitPacked"),
            (CompressionCodec::DeltaBitPacked { bits: 4 }, "DeltaBitPacked"),
            (CompressionCodec::Dictionary, "Dictionary"),
            (CompressionCodec::BitVector, "BitVector"),
            (CompressionCodec::RunLength, "RunLength"),
        ];
        for (codec, name) in expected.iter() {
            assert_eq!(codec.name(), *name);
        }
    }

    #[test]
    fn test_codec_selection_repetitive_integers() {
        let constant = runs_of(&[1], 100);
        assert_eq!(
            CodecSelector::select_for_integers(&constant),
            CompressionCodec::RunLength
        );

        let three_levels = runs_of(&[1, 2, 3], 30);
        assert_eq!(
            CodecSelector::select_for_integers(&three_levels),
            CompressionCodec::RunLength
        );
    }

    #[test]
    fn test_compress_decompress_runlength() {
        let input = runs_of(&[42], 1000);
        let packed = TypeSpecificCompressor::compress_integers(&input);

        assert_eq!(packed.codec, CompressionCodec::RunLength);
        assert!(
            packed.compression_ratio() > 50.0,
            "Expected ratio > 50, got {}",
            packed.compression_ratio()
        );

        let restored = TypeSpecificCompressor::decompress_integers(&packed).unwrap();
        assert_eq!(input, restored);
    }

    #[test]
    fn test_compress_decompress_mixed_runs() {
        let input = runs_of(&[1, 2, 3], 100);
        let packed = TypeSpecificCompressor::compress_integers(&input);

        assert_eq!(packed.codec, CompressionCodec::RunLength);
        assert!(packed.compression_ratio() > 10.0);

        let restored = TypeSpecificCompressor::decompress_integers(&packed).unwrap();
        assert_eq!(input, restored);
    }

    // A strictly increasing sequence prefers delta; a constant one prefers RLE.
    #[test]
    fn test_runlength_vs_delta_selection() {
        let sequential = (0..100).collect::<Vec<u64>>();
        assert!(matches!(
            CodecSelector::select_for_integers(&sequential),
            CompressionCodec::DeltaBitPacked { .. }
        ));

        let constant = runs_of(&[100], 100);
        assert_eq!(
            CodecSelector::select_for_integers(&constant),
            CompressionCodec::RunLength
        );
    }
}