use std::io;
use super::bitpack::{BitPackedInts, DeltaBitPacked};
use super::bitvec::BitVector;
use super::runlength::{RunLengthAnalyzer, RunLengthEncoding};
/// Identifies which encoding was applied to a block of values.
///
/// The enum is `Copy` and comparable so it can be stored alongside the
/// encoded bytes and matched on when decoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CompressionCodec {
    /// Values are stored without any encoding.
    None,
    /// Delta encoding.
    Delta,
    /// Bit packing of the values themselves.
    BitPacked {
        /// Bit width carried with the codec tag.
        bits: u8,
    },
    /// Bit packing applied to deltas between consecutive values.
    DeltaBitPacked {
        /// Bit width carried with the codec tag.
        bits: u8,
    },
    /// Dictionary encoding.
    Dictionary,
    /// Bit-vector encoding (used for booleans in this file).
    BitVector,
    /// Run-length encoding.
    RunLength,
}

impl CompressionCodec {
    /// Returns the variant's name as a static string.
    ///
    /// Payload fields (such as `bits`) do not influence the result.
    #[must_use]
    pub fn name(&self) -> &'static str {
        match *self {
            Self::RunLength => "RunLength",
            Self::BitVector => "BitVector",
            Self::Dictionary => "Dictionary",
            Self::DeltaBitPacked { bits: _ } => "DeltaBitPacked",
            Self::BitPacked { bits: _ } => "BitPacked",
            Self::Delta => "Delta",
            Self::None => "None",
        }
    }

    /// Reports whether the codec is lossless.
    ///
    /// Every codec is reported as lossless, so this always returns `true`.
    #[must_use]
    pub fn is_lossless(&self) -> bool {
        true
    }
}
/// An encoded byte buffer together with the codec tag and side information
/// recorded when it was produced.
#[derive(Debug, Clone)]
pub struct CompressedData {
/// Codec that `data` is encoded with.
pub codec: CompressionCodec,
/// Size recorded for the input before encoding; `compression_ratio`
/// divides this by `data.len()`.
pub uncompressed_size: usize,
/// The encoded payload bytes.
pub data: Vec<u8>,
/// Codec-specific side information recorded at compression time.
pub metadata: CompressionMetadata,
}
/// Side information stored next to a compressed payload.
///
/// Which variant is used depends on the code that produced the payload.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum CompressionMetadata {
/// No side information.
None,
/// Metadata for delta encoding. Not produced by the compressors in this
/// file.
Delta {
// NOTE(review): presumably the starting value the deltas are relative to — confirm.
base: i64,
},
/// Metadata for bit-packed payloads. The compressors in this file also use
/// it for bit-vector (boolean) payloads.
BitPacked {
// Number of input values that were encoded.
count: usize,
},
/// Metadata for delta + bit-packed payloads.
DeltaBitPacked {
// The encoder's base value, stored after an `as i64` cast.
base: i64,
// Number of input values that were encoded.
count: usize,
},
/// Metadata for dictionary encoding. Not produced by the compressors in
/// this file.
Dictionary {
// NOTE(review): presumably identifies the dictionary to decode with — confirm.
dict_id: u32,
},
/// Metadata for run-length encoded payloads.
RunLength {
// Run count as reported by the run-length encoder.
run_count: usize,
},
}
impl CompressedData {
    /// Wraps raw bytes without applying any codec.
    ///
    /// The recorded uncompressed size equals `data.len()`; codec and metadata
    /// are both set to their `None` variants.
    pub fn uncompressed(data: Vec<u8>) -> Self {
        Self {
            // Read the length before `data` is moved into the struct below.
            uncompressed_size: data.len(),
            codec: CompressionCodec::None,
            metadata: CompressionMetadata::None,
            data,
        }
    }

    /// Returns `uncompressed_size / data.len()` as a float.
    ///
    /// An empty payload yields `1.0` rather than dividing by zero.
    #[must_use]
    pub fn compression_ratio(&self) -> f64 {
        match self.data.len() {
            0 => 1.0,
            stored => self.uncompressed_size as f64 / stored as f64,
        }
    }

    /// Returns `true` for every codec other than `CompressionCodec::None`.
    #[must_use]
    pub fn is_compressed(&self) -> bool {
        self.codec != CompressionCodec::None
    }
}
/// Stateless heuristics that pick a [`CompressionCodec`] for a slice of values.
pub struct CodecSelector;

impl CodecSelector {
    /// Chooses a codec for unsigned integers.
    ///
    /// Decision order:
    /// 1. Fewer than 8 values (including none): `None`.
    /// 2. Average run length above 2.0 and estimated RLE ratio above 1.5:
    ///    `RunLength`.
    /// 3. Non-decreasing input: `RunLength` if its estimated ratio beats the
    ///    delta bit-packing ratio (and exceeds 1.0), otherwise
    ///    `DeltaBitPacked` with the width of the largest delta.
    /// 4. Otherwise: `RunLength` if its estimated ratio beats plain bit
    ///    packing (and exceeds 1.0); else `BitPacked` when fewer than 32 bits
    ///    are needed; else `None`.
    #[must_use]
    pub fn select_for_integers(values: &[u64]) -> CompressionCodec {
        // The length check also covers the empty slice.
        if values.len() < 8 {
            return CompressionCodec::None;
        }

        let rle_ratio = RunLengthAnalyzer::estimate_ratio(values);
        let avg_run_length = RunLengthAnalyzer::average_run_length(values);
        if avg_run_length > 2.0 && rle_ratio > 1.5 {
            return CompressionCodec::RunLength;
        }

        let is_sorted = values.windows(2).all(|w| w[0] <= w[1]);
        if is_sorted {
            // Sorted input guarantees w[1] >= w[0], so the subtraction cannot
            // underflow. The maximum is folded directly from the iterator;
            // no temporary vector of deltas is needed.
            let max_delta = values
                .windows(2)
                .map(|w| w[1] - w[0])
                .max()
                .unwrap_or(0);
            let bits_needed = BitPackedInts::bits_needed(max_delta);
            let delta_ratio = 64.0 / f64::from(bits_needed);
            if rle_ratio > delta_ratio && rle_ratio > 1.0 {
                return CompressionCodec::RunLength;
            }
            return CompressionCodec::DeltaBitPacked { bits: bits_needed };
        }

        let max_value = values.iter().copied().max().unwrap_or(0);
        let bits_needed = BitPackedInts::bits_needed(max_value);
        // A zero width would divide by zero; treat it as "no gain".
        let bitpack_ratio = if bits_needed > 0 {
            64.0 / f64::from(bits_needed)
        } else {
            1.0
        };
        if rle_ratio > bitpack_ratio && rle_ratio > 1.0 {
            return CompressionCodec::RunLength;
        }

        if bits_needed < 32 {
            CompressionCodec::BitPacked { bits: bits_needed }
        } else {
            CompressionCodec::None
        }
    }

    /// Chooses a codec for strings.
    ///
    /// Returns `Dictionary` when there are at least 4 values and fewer than
    /// half of them are distinct; otherwise `None`.
    #[must_use]
    pub fn select_for_strings(values: &[&str]) -> CompressionCodec {
        // The length check also covers the empty slice.
        if values.len() < 4 {
            return CompressionCodec::None;
        }

        let unique: std::collections::HashSet<&str> = values.iter().copied().collect();
        let cardinality_ratio = unique.len() as f64 / values.len() as f64;
        if cardinality_ratio < 0.5 {
            CompressionCodec::Dictionary
        } else {
            CompressionCodec::None
        }
    }

    /// Chooses a codec for booleans; always `BitVector`, regardless of input.
    #[must_use]
    pub fn select_for_booleans(_values: &[bool]) -> CompressionCodec {
        CompressionCodec::BitVector
    }
}
/// Stateless entry points that compress and decompress typed value slices.
pub struct TypeSpecificCompressor;

impl TypeSpecificCompressor {
    /// Compresses unsigned integers with the codec chosen by
    /// `CodecSelector::select_for_integers`.
    ///
    /// The recorded `uncompressed_size` is 8 bytes per input value. With the
    /// `None` codec the payload is the values' little-endian bytes.
    ///
    /// # Errors
    /// Propagates errors from serialising the bit-packed encodings.
    ///
    /// # Panics
    /// Panics if the selector returns a codec this function does not handle.
    pub fn compress_integers(values: &[u64]) -> io::Result<CompressedData> {
        let uncompressed_size = values.len() * std::mem::size_of::<u64>();
        let codec = CodecSelector::select_for_integers(values);

        let (data, metadata) = match codec {
            CompressionCodec::None => {
                let mut raw = Vec::with_capacity(uncompressed_size);
                for &v in values {
                    raw.extend_from_slice(&v.to_le_bytes());
                }
                (raw, CompressionMetadata::None)
            }
            CompressionCodec::DeltaBitPacked { .. } => {
                let encoded = DeltaBitPacked::encode(values);
                let bytes = encoded.to_bytes()?;
                // The base is stored through a plain `as` cast, so a value that
                // does not fit in `i64` wraps rather than erroring.
                #[allow(clippy::cast_possible_wrap)]
                let base = encoded.base() as i64;
                (
                    bytes,
                    CompressionMetadata::DeltaBitPacked {
                        base,
                        count: values.len(),
                    },
                )
            }
            CompressionCodec::BitPacked { .. } => {
                let packed = BitPackedInts::pack(values);
                (
                    packed.to_bytes()?,
                    CompressionMetadata::BitPacked {
                        count: values.len(),
                    },
                )
            }
            CompressionCodec::RunLength => {
                let encoded = RunLengthEncoding::encode(values);
                let bytes = encoded.to_bytes();
                (
                    bytes,
                    CompressionMetadata::RunLength {
                        run_count: encoded.run_count(),
                    },
                )
            }
            // The integer selector only returns the four codecs handled above.
            _ => unreachable!("Unexpected codec for integers"),
        };

        Ok(CompressedData {
            codec,
            uncompressed_size,
            data,
            metadata,
        })
    }

    /// Compresses signed integers by zigzag-encoding each value and then
    /// delegating to [`Self::compress_integers`].
    ///
    /// `decompress_integers` returns the zigzag-encoded `u64` values; undoing
    /// the zigzag mapping is left to the caller.
    ///
    /// # Errors
    /// Same as [`Self::compress_integers`].
    pub fn compress_signed_integers(values: &[i64]) -> io::Result<CompressedData> {
        let zigzag: Vec<u64> = values
            .iter()
            .map(|&v| super::delta::zigzag_encode(v))
            .collect();
        Self::compress_integers(&zigzag)
    }

    /// Compresses booleans into a bit vector.
    ///
    /// The recorded `uncompressed_size` is `values.len()`, and the value
    /// count is stored in `CompressionMetadata::BitPacked`.
    ///
    /// # Errors
    /// Propagates errors from serialising the bit vector.
    pub fn compress_booleans(values: &[bool]) -> io::Result<CompressedData> {
        let bitvec = BitVector::from_bools(values);
        Ok(CompressedData {
            codec: CompressionCodec::BitVector,
            uncompressed_size: values.len(),
            data: bitvec.to_bytes()?,
            metadata: CompressionMetadata::BitPacked {
                count: values.len(),
            },
        })
    }

    /// Decodes integers according to `data.codec`.
    ///
    /// # Errors
    /// Returns `InvalidData` when:
    /// - the codec is not one of `None`, `DeltaBitPacked`, `BitPacked` or
    ///   `RunLength`;
    /// - the codec is `None` and the payload length is not a multiple of
    ///   8 bytes (previously the trailing bytes were silently dropped).
    ///
    /// Errors from the individual decoders' `from_bytes` are propagated.
    pub fn decompress_integers(data: &CompressedData) -> io::Result<Vec<u64>> {
        match data.codec {
            CompressionCodec::None => {
                const WIDTH: usize = std::mem::size_of::<u64>();
                let chunks = data.data.chunks_exact(WIDTH);
                // A leftover tail means the buffer was truncated or corrupted.
                if !chunks.remainder().is_empty() {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        "Uncompressed integer data length is not a multiple of 8 bytes",
                    ));
                }
                Ok(chunks
                    .map(|chunk| {
                        // `chunks_exact` guarantees every chunk is WIDTH bytes.
                        let mut bytes = [0u8; WIDTH];
                        bytes.copy_from_slice(chunk);
                        u64::from_le_bytes(bytes)
                    })
                    .collect())
            }
            CompressionCodec::DeltaBitPacked { .. } => {
                let encoded = DeltaBitPacked::from_bytes(&data.data)?;
                Ok(encoded.decode())
            }
            CompressionCodec::BitPacked { .. } => {
                let packed = BitPackedInts::from_bytes(&data.data)?;
                Ok(packed.unpack())
            }
            CompressionCodec::RunLength => {
                let encoded = RunLengthEncoding::from_bytes(&data.data)?;
                Ok(encoded.decode())
            }
            _ => Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid codec for integer decompression",
            )),
        }
    }

    /// Decodes booleans from a `BitVector` payload.
    ///
    /// # Errors
    /// Returns `InvalidData` for any other codec, and propagates errors from
    /// parsing the bit vector.
    pub fn decompress_booleans(data: &CompressedData) -> io::Result<Vec<bool>> {
        match data.codec {
            CompressionCodec::BitVector => {
                let bitvec = BitVector::from_bytes(&data.data)?;
                Ok(bitvec.to_bools())
            }
            _ => Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid codec for boolean decompression",
            )),
        }
    }
}
// Unit tests covering codec selection and compress/decompress round trips.
#[cfg(test)]
mod tests {
use super::*;
// A strictly increasing range of 100 values is expected to select
// delta bit-packing.
#[test]
fn test_codec_selection_sorted_integers() {
let sorted: Vec<u64> = (0..100).collect();
let codec = CodecSelector::select_for_integers(&sorted);
assert!(matches!(codec, CompressionCodec::DeltaBitPacked { .. }));
}
// Eight unsorted small values (the minimum length the selector considers)
// are expected to select plain bit packing.
#[test]
fn test_codec_selection_small_integers() {
let small: Vec<u64> = vec![1, 5, 3, 7, 2, 4, 6, 8];
let codec = CodecSelector::select_for_integers(&small);
assert!(matches!(codec, CompressionCodec::BitPacked { .. }));
}
// 3 distinct strings out of 8 is below the 0.5 cardinality threshold and
// selects Dictionary; 8 distinct out of 8 selects None.
#[test]
fn test_codec_selection_strings() {
let repeated = vec!["a", "b", "a", "a", "b", "a", "c", "a"];
let codec = CodecSelector::select_for_strings(&repeated);
assert_eq!(codec, CompressionCodec::Dictionary);
let unique = vec!["a", "b", "c", "d", "e", "f", "g", "h"];
let codec = CodecSelector::select_for_strings(&unique);
assert_eq!(codec, CompressionCodec::None);
}
// Booleans always select the bit-vector codec.
#[test]
fn test_codec_selection_booleans() {
let bools = vec![true, false, true];
let codec = CodecSelector::select_for_booleans(&bools);
assert_eq!(codec, CompressionCodec::BitVector);
}
// Round trip through the delta bit-packed path; also checks that the
// payload is smaller than the input.
#[test]
fn test_compress_decompress_sorted_integers() {
let values: Vec<u64> = (100..200).collect();
let compressed = TypeSpecificCompressor::compress_integers(&values).unwrap();
assert!(matches!(
compressed.codec,
CompressionCodec::DeltaBitPacked { .. }
));
assert!(compressed.compression_ratio() > 1.0);
let decompressed = TypeSpecificCompressor::decompress_integers(&compressed).unwrap();
assert_eq!(values, decompressed);
}
// Round trip for ten unsorted small values; the codec is not asserted,
// only that decoding restores the input.
#[test]
fn test_compress_decompress_small_integers() {
let values: Vec<u64> = vec![5, 2, 7, 1, 9, 3, 8, 4, 6, 0];
let compressed = TypeSpecificCompressor::compress_integers(&values).unwrap();
let decompressed = TypeSpecificCompressor::decompress_integers(&compressed).unwrap();
assert_eq!(values, decompressed);
}
// Round trip for booleans through the bit-vector codec.
#[test]
fn test_compress_decompress_booleans() {
let values = vec![true, false, true, true, false, false, true, false];
let compressed = TypeSpecificCompressor::compress_booleans(&values).unwrap();
assert_eq!(compressed.codec, CompressionCodec::BitVector);
let decompressed = TypeSpecificCompressor::decompress_booleans(&compressed).unwrap();
assert_eq!(values, decompressed);
}
// A consecutive range of 100 values must compress by more than 5x.
#[test]
fn test_compression_ratio() {
let values: Vec<u64> = (1000..1100).collect();
let compressed = TypeSpecificCompressor::compress_integers(&values).unwrap();
let ratio = compressed.compression_ratio();
assert!(ratio > 5.0, "Expected ratio > 5, got {}", ratio);
}
// `name()` returns the variant name and ignores the `bits` payload.
#[test]
fn test_codec_names() {
assert_eq!(CompressionCodec::None.name(), "None");
assert_eq!(CompressionCodec::Delta.name(), "Delta");
assert_eq!(CompressionCodec::BitPacked { bits: 4 }.name(), "BitPacked");
assert_eq!(
CompressionCodec::DeltaBitPacked { bits: 4 }.name(),
"DeltaBitPacked"
);
assert_eq!(CompressionCodec::Dictionary.name(), "Dictionary");
assert_eq!(CompressionCodec::BitVector.name(), "BitVector");
assert_eq!(CompressionCodec::RunLength.name(), "RunLength");
}
// A constant column and a column made of three long runs are both
// expected to select run-length encoding.
#[test]
fn test_codec_selection_repetitive_integers() {
let repetitive: Vec<u64> = vec![1; 100];
let codec = CodecSelector::select_for_integers(&repetitive);
assert_eq!(codec, CompressionCodec::RunLength);
let mut mixed = vec![1u64; 30];
mixed.extend(vec![2u64; 30]);
mixed.extend(vec![3u64; 30]);
let codec = CodecSelector::select_for_integers(&mixed);
assert_eq!(codec, CompressionCodec::RunLength);
}
// Round trip for a single run of 1000 values; the ratio must exceed 50x.
#[test]
fn test_compress_decompress_runlength() {
let values: Vec<u64> = vec![42; 1000];
let compressed = TypeSpecificCompressor::compress_integers(&values).unwrap();
assert_eq!(compressed.codec, CompressionCodec::RunLength);
assert!(
compressed.compression_ratio() > 50.0,
"Expected ratio > 50, got {}",
compressed.compression_ratio()
);
let decompressed = TypeSpecificCompressor::decompress_integers(&compressed).unwrap();
assert_eq!(values, decompressed);
}
// Round trip for three runs of 100 values each; the ratio must exceed 10x.
#[test]
fn test_compress_decompress_mixed_runs() {
let mut values = vec![1u64; 100];
values.extend(vec![2u64; 100]);
values.extend(vec![3u64; 100]);
let compressed = TypeSpecificCompressor::compress_integers(&values).unwrap();
assert_eq!(compressed.codec, CompressionCodec::RunLength);
assert!(compressed.compression_ratio() > 10.0);
let decompressed = TypeSpecificCompressor::decompress_integers(&compressed).unwrap();
assert_eq!(values, decompressed);
}
// Contrasts the two sorted cases: distinct sequential values select
// delta bit-packing, a constant column selects run-length encoding.
#[test]
fn test_runlength_vs_delta_selection() {
let sequential: Vec<u64> = (0..100).collect();
let codec = CodecSelector::select_for_integers(&sequential);
assert!(matches!(codec, CompressionCodec::DeltaBitPacked { .. }));
let constant: Vec<u64> = vec![100; 100];
let codec = CodecSelector::select_for_integers(&constant);
assert_eq!(codec, CompressionCodec::RunLength);
}
}