use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::{self, Read};
/// Encoding applied to a column payload.
///
/// The discriminant doubles as the tag byte written at the front of every
/// [`ColumnEncoder::encode`] output, so existing values must never be renumbered.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[repr(u8)]
pub enum EncodingType {
    /// Length-prefixed values stored as-is. This is the default.
    #[default]
    Raw = 0,
    /// Values replaced by indices into a table of distinct values.
    Dictionary = 1,
    /// Runs of consecutive equal values stored once with a repeat count.
    Rle = 2,
    /// 8-byte values stored as a base `i64` plus successive differences.
    Delta = 3,
    Lz4 = 4,
    Zstd = 5,
    DictionaryLz4 = 6,
    DeltaLz4 = 7,
}

impl EncodingType {
    /// Every variant, positioned at the index equal to its tag byte.
    const BY_TAG: [Self; 8] = [
        Self::Raw,
        Self::Dictionary,
        Self::Rle,
        Self::Delta,
        Self::Lz4,
        Self::Zstd,
        Self::DictionaryLz4,
        Self::DeltaLz4,
    ];

    /// Converts a tag byte back into its [`EncodingType`].
    ///
    /// Returns `None` when `b` does not name a known encoding.
    pub fn from_byte(b: u8) -> Option<Self> {
        Self::BY_TAG.get(usize::from(b)).copied()
    }
}
/// Summary of how a column was (or would be) encoded.
///
/// [`ColumnEncoder::analyze`] fills `original_size`, `encoding`, `cardinality`,
/// `row_count` and `is_sorted`, leaving `compressed_size` at 0 and `ratio` at 0.0
/// (an empty column yields `EncodingStats::default()`).
/// [`ColumnEncoder::encode`] then sets `compressed_size` and `ratio` from its output.
#[derive(Debug, Clone, Default)]
pub struct EncodingStats {
    /// Sum of the byte lengths of all input values (no framing included).
    pub original_size: usize,
    /// Length in bytes of the encoded output, including its leading tag byte.
    pub compressed_size: usize,
    /// Encoding selected for the column.
    pub encoding: EncodingType,
    /// Number of distinct values in the column.
    pub cardinality: usize,
    /// Number of values (rows) in the column.
    pub row_count: usize,
    /// Whether the values are in non-decreasing byte-wise (lexicographic) order.
    pub is_sorted: bool,
    /// `compressed_size / original_size` as set by `encode` (lower is better);
    /// `encode` uses 1.0 when `original_size` is 0.
    pub ratio: f64,
}
/// Bidirectional dictionary mapping each distinct byte-string value to a dense
/// `u32` index, assigned in first-insertion order starting at 0.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DictionaryEncoder {
    /// Value -> index lookup.
    value_to_idx: HashMap<Vec<u8>, u32>,
    /// Index -> value lookup; entry `i` is the value assigned index `i`.
    idx_to_value: Vec<Vec<u8>>,
}

impl DictionaryEncoder {
    /// Creates an empty dictionary.
    pub fn new() -> Self {
        Self {
            value_to_idx: HashMap::new(),
            idx_to_value: Vec::new(),
        }
    }

    /// Builds a dictionary containing every distinct value of `values`.
    pub fn build(values: &[Vec<u8>]) -> Self {
        let mut encoder = Self::new();
        for value in values {
            encoder.add_value(value);
        }
        encoder
    }

    /// Returns the index of `value`, inserting it with the next free index if absent.
    pub fn add_value(&mut self, value: &[u8]) -> u32 {
        if let Some(&idx) = self.value_to_idx.get(value) {
            return idx;
        }
        // NOTE(review): `as u32` wraps once there are more than u32::MAX distinct values.
        let idx = self.idx_to_value.len() as u32;
        self.value_to_idx.insert(value.to_vec(), idx);
        self.idx_to_value.push(value.to_vec());
        idx
    }

    /// Looks up the index of `value` without inserting it.
    pub fn encode(&self, value: &[u8]) -> Option<u32> {
        self.value_to_idx.get(value).copied()
    }

    /// Returns the value stored at `idx`, if any.
    pub fn decode(&self, idx: u32) -> Option<&[u8]> {
        self.idx_to_value.get(idx as usize).map(Vec::as_slice)
    }

    /// Number of distinct values in the dictionary.
    pub fn size(&self) -> usize {
        self.idx_to_value.len()
    }

    /// Serializes the dictionary followed by the index of each value in `values`.
    ///
    /// Layout (little-endian): `dict_size: u32`, then per entry `len: u32` and its
    /// bytes, then `count: u64` followed by `count` `u32` indices.
    ///
    /// Values absent from the dictionary are skipped. `count` is the number of
    /// indices actually written, so the output always decodes cleanly.
    pub fn encode_column(&self, values: &[Vec<u8>]) -> Vec<u8> {
        // Resolve indices first. Writing `values.len()` as the count while skipping
        // unknown values would make the header promise more indices than follow.
        let indices: Vec<u32> = values.iter().filter_map(|v| self.encode(v)).collect();
        let dict_bytes: usize = self.idx_to_value.iter().map(|v| 4 + v.len()).sum();
        let mut encoded = Vec::with_capacity(4 + dict_bytes + 8 + indices.len() * 4);
        // Writing into a Vec<u8> cannot fail, so these unwraps never trigger.
        encoded
            .write_u32::<LittleEndian>(self.idx_to_value.len() as u32)
            .unwrap();
        for value in &self.idx_to_value {
            encoded
                .write_u32::<LittleEndian>(value.len() as u32)
                .unwrap();
            encoded.extend_from_slice(value);
        }
        encoded
            .write_u64::<LittleEndian>(indices.len() as u64)
            .unwrap();
        for idx in indices {
            encoded.write_u32::<LittleEndian>(idx).unwrap();
        }
        encoded
    }

    /// Decodes data produced by [`DictionaryEncoder::encode_column`].
    ///
    /// # Errors
    /// `UnexpectedEof` if the input is truncated; `InvalidData` if an index
    /// points past the end of the dictionary.
    pub fn decode_column(data: &[u8]) -> io::Result<Vec<Vec<u8>>> {
        let mut cursor = std::io::Cursor::new(data);
        let dict_size = cursor.read_u32::<LittleEndian>()? as usize;
        // Headers are untrusted. Every entry and index occupies at least 4 bytes,
        // so bound the up-front reservations by the input size.
        let mut dictionary = Vec::with_capacity(dict_size.min(data.len() / 4));
        for _ in 0..dict_size {
            let len = cursor.read_u32::<LittleEndian>()? as usize;
            dictionary.push(Self::read_bytes(&mut cursor, len)?);
        }
        let count = cursor.read_u64::<LittleEndian>()? as usize;
        let mut values = Vec::with_capacity(count.min(data.len() / 4));
        for _ in 0..count {
            let idx = cursor.read_u32::<LittleEndian>()? as usize;
            let value = dictionary.get(idx).ok_or_else(|| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!(
                        "dictionary index {} out of range (size {})",
                        idx,
                        dictionary.len()
                    ),
                )
            })?;
            values.push(value.clone());
        }
        Ok(values)
    }

    /// Reads exactly `len` bytes. Fails with `UnexpectedEof` before allocating
    /// if the cursor does not hold that many.
    fn read_bytes(cursor: &mut std::io::Cursor<&[u8]>, len: usize) -> io::Result<Vec<u8>> {
        let remaining = cursor
            .get_ref()
            .len()
            .saturating_sub(cursor.position() as usize);
        if len > remaining {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "dictionary entry extends past end of data",
            ));
        }
        let mut value = vec![0u8; len];
        cursor.read_exact(&mut value)?;
        Ok(value)
    }
}

impl Default for DictionaryEncoder {
    fn default() -> Self {
        Self::new()
    }
}
/// Run-length encoder for columns of opaque byte-string values.
///
/// Wire format (all integers little-endian): a `u64` total row count, then one
/// record per run of consecutive equal values: `run_length: u64`,
/// `value_len: u32`, and `value_len` bytes of the value itself.
#[derive(Debug, Clone)]
pub struct RleEncoder;

impl RleEncoder {
    /// Encodes `values` as runs of consecutive equal entries.
    ///
    /// An empty input encodes to just the 8-byte row-count header.
    pub fn encode(values: &[Vec<u8>]) -> Vec<u8> {
        let mut encoded = Vec::new();
        encoded.extend_from_slice(&(values.len() as u64).to_le_bytes());
        let mut rest = values;
        while let Some(head) = rest.first() {
            // `head` always matches itself, so every run is at least one long.
            let run_length = rest.iter().take_while(|v| *v == head).count();
            encoded.extend_from_slice(&(run_length as u64).to_le_bytes());
            // NOTE(review): `as u32` truncates the length of values over 4 GiB.
            encoded.extend_from_slice(&(head.len() as u32).to_le_bytes());
            encoded.extend_from_slice(head);
            rest = &rest[run_length..];
        }
        encoded
    }

    /// Decodes data produced by [`RleEncoder::encode`].
    ///
    /// # Errors
    /// `UnexpectedEof` if the input is truncated; `InvalidData` if the runs add
    /// up to more rows than the header declares.
    pub fn decode(data: &[u8]) -> io::Result<Vec<Vec<u8>>> {
        let mut cursor = io::Cursor::new(data);
        let total_count = Self::read_u64_le(&mut cursor)? as usize;
        // The header is untrusted. Reserve at most `data.len()` slots up front
        // (the Vec still grows as needed) so a corrupt count cannot force a
        // huge or overflowing allocation.
        let mut values = Vec::with_capacity(total_count.min(data.len()));
        while values.len() < total_count {
            let run_length = Self::read_u64_le(&mut cursor)?;
            let value_len = Self::read_u32_le(&mut cursor)? as usize;
            let value = Self::read_bytes(&mut cursor, value_len)?;
            let remaining = total_count - values.len();
            if run_length > remaining as u64 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "RLE run exceeds declared row count",
                ));
            }
            // Safe cast: bounded by `remaining`, which is a usize.
            values.resize(values.len() + run_length as usize, value);
        }
        Ok(values)
    }

    /// Reads a little-endian `u64`.
    fn read_u64_le(cursor: &mut io::Cursor<&[u8]>) -> io::Result<u64> {
        let mut buf = [0u8; 8];
        cursor.read_exact(&mut buf)?;
        Ok(u64::from_le_bytes(buf))
    }

    /// Reads a little-endian `u32`.
    fn read_u32_le(cursor: &mut io::Cursor<&[u8]>) -> io::Result<u32> {
        let mut buf = [0u8; 4];
        cursor.read_exact(&mut buf)?;
        Ok(u32::from_le_bytes(buf))
    }

    /// Reads exactly `len` bytes. Fails with `UnexpectedEof` before allocating
    /// if the cursor does not hold that many.
    fn read_bytes(cursor: &mut io::Cursor<&[u8]>, len: usize) -> io::Result<Vec<u8>> {
        let remaining = cursor
            .get_ref()
            .len()
            .saturating_sub(cursor.position() as usize);
        if len > remaining {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "RLE value extends past end of data",
            ));
        }
        let mut value = vec![0u8; len];
        cursor.read_exact(&mut value)?;
        Ok(value)
    }
}
/// Delta encoder for `i64` columns using zigzag LEB128 varints.
///
/// Wire format: a little-endian `u64` value count. If the count is non-zero, it
/// is followed by the first value as a little-endian `i64`, then one zigzag
/// varint per remaining value holding its difference from the previous value.
#[derive(Debug, Clone)]
pub struct DeltaEncoder;

impl DeltaEncoder {
    /// Encodes `values` as a base value plus successive differences.
    ///
    /// Differences use wrapping arithmetic, so any pair of neighbours (e.g.
    /// `i64::MAX` followed by `i64::MIN`) round-trips instead of overflowing.
    pub fn encode_i64(values: &[i64]) -> Vec<u8> {
        // Header + base take 16 bytes; each delta takes at least one more.
        let mut encoded = Vec::with_capacity(16 + values.len());
        encoded.extend_from_slice(&(values.len() as u64).to_le_bytes());
        if let Some(&base) = values.first() {
            encoded.extend_from_slice(&base.to_le_bytes());
            for pair in values.windows(2) {
                Self::write_varint(&mut encoded, pair[1].wrapping_sub(pair[0]));
            }
        }
        encoded
    }

    /// Decodes data produced by [`DeltaEncoder::encode_i64`].
    ///
    /// # Errors
    /// `UnexpectedEof` if the input is truncated; `InvalidData` if a varint is
    /// malformed.
    pub fn decode_i64(data: &[u8]) -> io::Result<Vec<i64>> {
        let mut reader = io::Cursor::new(data);
        let mut word = [0u8; 8];
        reader.read_exact(&mut word)?;
        let count = u64::from_le_bytes(word) as usize;
        if count == 0 {
            return Ok(Vec::new());
        }
        reader.read_exact(&mut word)?;
        let mut current = i64::from_le_bytes(word);
        // The count is untrusted. Each value after the base takes at least one
        // byte, so the input length bounds the real count.
        let mut values = Vec::with_capacity(count.min(data.len()));
        values.push(current);
        for _ in 1..count {
            // Mirrors the wrapping_sub in `encode_i64`.
            current = current.wrapping_add(Self::read_varint(&mut reader)?);
            values.push(current);
        }
        Ok(values)
    }

    /// Appends `value` zigzag-mapped to unsigned and LEB128-encoded
    /// (7 bits per byte, low group first, high bit = continuation).
    fn write_varint(buf: &mut Vec<u8>, value: i64) {
        let mut v = ((value << 1) ^ (value >> 63)) as u64;
        while v >= 0x80 {
            buf.push((v as u8) | 0x80);
            v >>= 7;
        }
        buf.push(v as u8);
    }

    /// Reads one zigzag LEB128 varint written by `write_varint`.
    fn read_varint<R: Read>(reader: &mut R) -> io::Result<i64> {
        let mut result: u64 = 0;
        let mut shift: u32 = 0;
        loop {
            let mut byte = [0u8; 1];
            reader.read_exact(&mut byte)?;
            let payload = u64::from(byte[0] & 0x7F);
            // The 10th byte (shift 63) may only supply bit 63; anything more
            // would be silently discarded by the shift.
            if shift == 63 && payload > 1 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Varint too long",
                ));
            }
            result |= payload << shift;
            if byte[0] < 0x80 {
                break;
            }
            shift += 7;
            if shift > 63 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    "Varint too long",
                ));
            }
        }
        Ok(((result >> 1) as i64) ^ -((result & 1) as i64))
    }
}
/// Picks an encoding for a column of byte-string values and frames the encoded
/// payload with a one-byte [`EncodingType`] tag.
#[derive(Debug)]
pub struct ColumnEncoder;

impl ColumnEncoder {
    /// RLE is chosen when the column has fewer runs than this fraction of its rows.
    const RLE_MAX_RUN_RATIO: f64 = 0.1;
    /// Dictionary encoding is chosen when fewer than this fraction of rows are distinct.
    const DICTIONARY_MAX_CARDINALITY_RATIO: f64 = 0.1;

    /// Inspects `values` and picks an encoding, checked in this order:
    ///
    /// 1. `Rle` when runs of equal adjacent values are long (fewer than 10% as
    ///    many runs as rows). RLE cost scales with runs, not distinct values.
    /// 2. `Dictionary` when fewer than 10% of rows are distinct, so each row
    ///    costs a 4-byte index instead of its full bytes.
    /// 3. `Delta` when every value is exactly 8 bytes and, read as
    ///    little-endian `i64`, the values are non-decreasing.
    /// 4. `Raw` otherwise, and for empty input.
    ///
    /// The returned stats carry `original_size`, `encoding`, `cardinality`,
    /// `row_count` and `is_sorted` (byte-wise). `compressed_size` and `ratio`
    /// stay 0 because nothing has been encoded yet.
    pub fn analyze(values: &[Vec<u8>]) -> (EncodingType, EncodingStats) {
        if values.is_empty() {
            return (EncodingType::Raw, EncodingStats::default());
        }
        let row_count = values.len();
        let original_size: usize = values.iter().map(Vec::len).sum();
        let cardinality = values
            .iter()
            .map(Vec::as_slice)
            .collect::<std::collections::HashSet<&[u8]>>()
            .len();
        // A run is a maximal stretch of equal adjacent values.
        let runs = 1 + values.windows(2).filter(|w| w[0] != w[1]).count();
        let is_sorted = values.windows(2).all(|w| w[0] <= w[1]);
        let rows = row_count as f64;
        let encoding = if (runs as f64) / rows < Self::RLE_MAX_RUN_RATIO {
            EncodingType::Rle
        } else if (cardinality as f64) / rows < Self::DICTIONARY_MAX_CARDINALITY_RATIO {
            EncodingType::Dictionary
        } else if Self::as_i64_column(values)
            .map_or(false, |ints| ints.windows(2).all(|w| w[0] <= w[1]))
        {
            // Numeric order matters for small deltas. Byte-wise order of
            // little-endian integers does not reflect it.
            EncodingType::Delta
        } else {
            EncodingType::Raw
        };
        let stats = EncodingStats {
            original_size,
            compressed_size: 0,
            encoding,
            cardinality,
            row_count,
            is_sorted,
            ratio: 0.0,
        };
        (encoding, stats)
    }

    /// Encodes `values` with the encoding chosen by [`ColumnEncoder::analyze`].
    ///
    /// Output is one tag byte (the encoding actually used) followed by that
    /// encoding's payload. The stats' `compressed_size` counts the tag byte, and
    /// `ratio` is `compressed_size / original_size`, or 1.0 when `original_size` is 0.
    pub fn encode(values: &[Vec<u8>]) -> (Vec<u8>, EncodingStats) {
        let (chosen, mut stats) = Self::analyze(values);
        let (encoding, payload) = match chosen {
            EncodingType::Dictionary => (
                EncodingType::Dictionary,
                DictionaryEncoder::build(values).encode_column(values),
            ),
            EncodingType::Rle => (EncodingType::Rle, RleEncoder::encode(values)),
            EncodingType::Delta => match Self::as_i64_column(values) {
                Some(ints) => (EncodingType::Delta, DeltaEncoder::encode_i64(&ints)),
                // `decode` dispatches on the tag, so a raw fallback must be tagged Raw.
                None => (EncodingType::Raw, Self::encode_raw(values)),
            },
            EncodingType::Raw
            | EncodingType::Lz4
            | EncodingType::Zstd
            | EncodingType::DictionaryLz4
            | EncodingType::DeltaLz4 => (EncodingType::Raw, Self::encode_raw(values)),
        };
        let mut result = Vec::with_capacity(1 + payload.len());
        result.push(encoding as u8);
        result.extend_from_slice(&payload);
        stats.encoding = encoding;
        stats.compressed_size = result.len();
        stats.ratio = if stats.original_size > 0 {
            stats.compressed_size as f64 / stats.original_size as f64
        } else {
            1.0
        };
        (result, stats)
    }

    /// Reinterprets each value as a little-endian `i64`. Returns `None` unless
    /// every value is exactly 8 bytes long.
    fn as_i64_column(values: &[Vec<u8>]) -> Option<Vec<i64>> {
        values
            .iter()
            .map(|v| v.as_slice().try_into().ok().map(i64::from_le_bytes))
            .collect()
    }

    /// Serializes `values` as `count: u64`, then per value `len: u32` and its
    /// bytes (little-endian).
    fn encode_raw(values: &[Vec<u8>]) -> Vec<u8> {
        let payload_bytes: usize = values.iter().map(|v| 4 + v.len()).sum();
        let mut encoded = Vec::with_capacity(8 + payload_bytes);
        // Writing into a Vec<u8> cannot fail, so these unwraps never trigger.
        encoded
            .write_u64::<LittleEndian>(values.len() as u64)
            .unwrap();
        for value in values {
            encoded
                .write_u32::<LittleEndian>(value.len() as u32)
                .unwrap();
            encoded.extend_from_slice(value);
        }
        encoded
    }

    /// Decodes output of [`ColumnEncoder::encode`]. Empty input yields no values.
    ///
    /// # Errors
    /// `InvalidData` for an unknown tag byte; otherwise whatever the selected
    /// payload decoder reports.
    pub fn decode(data: &[u8]) -> io::Result<Vec<Vec<u8>>> {
        let (&tag, payload) = match data.split_first() {
            Some(split) => split,
            None => return Ok(Vec::new()),
        };
        let encoding = EncodingType::from_byte(tag)
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Invalid encoding type"))?;
        match encoding {
            EncodingType::Dictionary | EncodingType::DictionaryLz4 => {
                DictionaryEncoder::decode_column(payload)
            }
            EncodingType::Rle => RleEncoder::decode(payload),
            EncodingType::Delta | EncodingType::DeltaLz4 => {
                let int_values = DeltaEncoder::decode_i64(payload)?;
                Ok(int_values
                    .into_iter()
                    .map(|v| v.to_le_bytes().to_vec())
                    .collect())
            }
            // NOTE(review): `encode` never emits Lz4/Zstd or the *Lz4 variants,
            // and no arm here decompresses anything. Confirm no other writer
            // stores compressed payloads under these tags.
            EncodingType::Raw | EncodingType::Lz4 | EncodingType::Zstd => {
                Self::decode_raw(payload)
            }
        }
    }

    /// Decodes the layout written by `encode_raw`.
    fn decode_raw(data: &[u8]) -> io::Result<Vec<Vec<u8>>> {
        let mut cursor = std::io::Cursor::new(data);
        let count = cursor.read_u64::<LittleEndian>()? as usize;
        // The count is untrusted. Each entry needs at least a 4-byte length prefix.
        let mut values = Vec::with_capacity(count.min(data.len() / 4));
        for _ in 0..count {
            let len = cursor.read_u32::<LittleEndian>()? as usize;
            values.push(Self::read_bytes(&mut cursor, len)?);
        }
        Ok(values)
    }

    /// Reads exactly `len` bytes. Fails with `UnexpectedEof` before allocating
    /// if the cursor does not hold that many.
    fn read_bytes(cursor: &mut std::io::Cursor<&[u8]>, len: usize) -> io::Result<Vec<u8>> {
        let remaining = cursor
            .get_ref()
            .len()
            .saturating_sub(cursor.position() as usize);
        if len > remaining {
            return Err(io::Error::new(
                io::ErrorKind::UnexpectedEof,
                "raw value extends past end of data",
            ));
        }
        let mut value = vec![0u8; len];
        cursor.read_exact(&mut value)?;
        Ok(value)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_dictionary_encoding() {
        let pattern: [&[u8]; 7] = [
            b"gpt-4", b"gpt-4", b"claude", b"gpt-4", b"claude", b"gemini", b"gpt-4",
        ];
        // Repeat the pattern so the per-row saving (a 4-byte index instead of
        // the full value) outweighs the one-off cost of storing the dictionary.
        // With only 7 rows the encoded form (69 bytes) exceeds the input (38).
        let values: Vec<Vec<u8>> = pattern
            .iter()
            .cycle()
            .take(pattern.len() * 100)
            .map(|v| v.to_vec())
            .collect();
        let encoder = DictionaryEncoder::build(&values);
        assert_eq!(encoder.size(), 3);
        let encoded = encoder.encode_column(&values);
        let decoded = DictionaryEncoder::decode_column(&encoded).unwrap();
        assert_eq!(decoded, values);
        let original_size: usize = values.iter().map(|v| v.len()).sum();
        assert!(encoded.len() < original_size);
    }

    #[test]
    fn test_rle_encoding() {
        let values: Vec<Vec<u8>> = vec![
            b"active".to_vec(),
            b"active".to_vec(),
            b"active".to_vec(),
            b"pending".to_vec(),
            b"pending".to_vec(),
            b"completed".to_vec(),
        ];
        let encoded = RleEncoder::encode(&values);
        let decoded = RleEncoder::decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn test_delta_encoding() {
        let values: Vec<i64> = vec![
            1000000, 1000001, 1000002, 1000003, 1000010, 1000011, 1000012,
        ];
        let encoded = DeltaEncoder::encode_i64(&values);
        let decoded = DeltaEncoder::decode_i64(&encoded).unwrap();
        assert_eq!(decoded, values);
        let original_size = values.len() * 8;
        assert!(encoded.len() < original_size);
    }

    #[test]
    fn test_column_encoder_auto_select() {
        let low_cardinality: Vec<Vec<u8>> = (0..1000)
            .map(|i| format!("model_{}", i % 5).into_bytes())
            .collect();
        let (encoding, stats) = ColumnEncoder::analyze(&low_cardinality);
        assert_eq!(encoding, EncodingType::Dictionary);
        assert_eq!(stats.cardinality, 5);
        let (encoded, _) = ColumnEncoder::encode(&low_cardinality);
        let decoded = ColumnEncoder::decode(&encoded).unwrap();
        assert_eq!(decoded, low_cardinality);
    }

    #[test]
    fn test_column_encoder_high_cardinality() {
        let high_cardinality: Vec<Vec<u8>> = (0..100)
            .map(|i| format!("unique_value_{}", i).into_bytes())
            .collect();
        let (encoding, _) = ColumnEncoder::analyze(&high_cardinality);
        assert_eq!(encoding, EncodingType::Raw);
    }

    #[test]
    fn test_encoding_roundtrip() {
        let test_cases: Vec<Vec<Vec<u8>>> = vec![
            vec![],
            vec![b"test".to_vec()],
            (0..100)
                .map(|i| format!("v{}", i % 3).into_bytes())
                .collect(),
            (0..50)
                .map(|i| format!("unique{}", i).into_bytes())
                .collect(),
        ];
        for values in test_cases {
            let (encoded, _) = ColumnEncoder::encode(&values);
            let decoded = ColumnEncoder::decode(&encoded).unwrap();
            assert_eq!(decoded, values, "Roundtrip failed");
        }
    }

    #[test]
    // Ignored: passes only if `ColumnEncoder::analyze` picks RLE for a
    // single-value column. Dictionary encoding spends a 4-byte index per row,
    // about 0.29 of the 14-byte values here.
    #[ignore]
    fn test_compression_ratios() {
        let repeated: Vec<Vec<u8>> = (0..10000).map(|_| b"repeated_value".to_vec()).collect();
        let (_encoded, stats) = ColumnEncoder::encode(&repeated);
        println!("Original: {} bytes", stats.original_size);
        println!("Compressed: {} bytes", stats.compressed_size);
        println!("Ratio: {:.2}", stats.ratio);
        assert!(
            stats.ratio < 0.1,
            "Expected >10x compression for repeated values"
        );
    }
}