use std::collections::HashMap;
use std::sync::{Arc, Mutex};
/// Error type shared by all compression, decompression, and encoding
/// utilities in this module. Each variant carries a human-readable detail
/// message that is surfaced through the `Display` impl below.
#[derive(Debug)]
pub enum BinaryFormatError {
/// Failure while compressing data.
CompressionError(String),
/// Failure while decompressing or validating previously encoded bytes.
DecompressionError(String),
/// Failure while encoding rows or assigning codes.
EncodingError(String),
/// A configuration value was outside its allowed range.
InvalidConfig(String),
}
impl std::fmt::Display for BinaryFormatError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
BinaryFormatError::CompressionError(e) => write!(f, "Compression error: {}", e),
BinaryFormatError::DecompressionError(e) => write!(f, "Decompression error: {}", e),
BinaryFormatError::EncodingError(e) => write!(f, "Encoding error: {}", e),
BinaryFormatError::InvalidConfig(e) => write!(f, "Invalid config: {}", e),
}
}
}
impl std::error::Error for BinaryFormatError {}
/// Plain delta encoder: stores the first value verbatim, then each
/// successive value as the (wrapping) difference from its predecessor.
/// Every field is a full 8-byte little-endian i64.
#[derive(Debug)]
pub struct DeltaEncoder;

impl DeltaEncoder {
    /// Delta-encodes `values`. An empty input yields an empty buffer.
    pub fn encode(values: &[i64]) -> Result<Vec<u8>, BinaryFormatError> {
        let mut encoded = Vec::with_capacity(values.len() * 8);
        if let Some((&first, rest)) = values.split_first() {
            encoded.extend_from_slice(&first.to_le_bytes());
            let mut prev = first;
            for &value in rest {
                encoded.extend_from_slice(&value.wrapping_sub(prev).to_le_bytes());
                prev = value;
            }
        }
        Ok(encoded)
    }

    /// Decodes a buffer produced by [`Self::encode`]. Errors when the byte
    /// length is not a multiple of 8.
    pub fn decode(bytes: &[u8]) -> Result<Vec<i64>, BinaryFormatError> {
        if bytes.is_empty() {
            return Ok(Vec::new());
        }
        if bytes.len() % 8 != 0 {
            return Err(BinaryFormatError::DecompressionError(
                "Invalid delta encoded data: length not multiple of 8".to_string(),
            ));
        }
        let mut values = Vec::with_capacity(bytes.len() / 8);
        let mut prev = 0i64;
        for (i, chunk) in bytes.chunks_exact(8).enumerate() {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(chunk);
            let word = i64::from_le_bytes(buf);
            // First word is the absolute baseline; the rest are deltas.
            prev = if i == 0 { word } else { prev.wrapping_add(word) };
            values.push(prev);
        }
        Ok(values)
    }
}
/// Dictionary encoder for strings: each distinct string gets a u32 id in
/// first-occurrence order, and the value stream is emitted as little-endian
/// u32 ids.
#[derive(Debug)]
pub struct DictionaryCompressor;

impl DictionaryCompressor {
    /// Encodes `values` into an id stream plus the string -> id dictionary.
    pub fn compress_strings(
        values: &[&str],
    ) -> Result<(Vec<u8>, HashMap<String, u32>), BinaryFormatError> {
        let mut dictionary: HashMap<String, u32> = HashMap::new();
        let mut encoded = Vec::with_capacity(values.len() * 4);
        for &value in values {
            // Ids are dense and assigned in first-occurrence order, so the
            // next fresh id is exactly the current dictionary size.
            let candidate_id = dictionary.len() as u32;
            let id = *dictionary.entry(value.to_string()).or_insert(candidate_id);
            encoded.extend_from_slice(&id.to_le_bytes());
        }
        Ok((encoded, dictionary))
    }

    /// Decodes an id stream produced by [`Self::compress_strings`] using the
    /// accompanying dictionary. Errors on a bad length or an unknown id.
    pub fn decompress_strings(
        bytes: &[u8],
        dictionary: &HashMap<String, u32>,
    ) -> Result<Vec<String>, BinaryFormatError> {
        if bytes.len() % 4 != 0 {
            return Err(BinaryFormatError::DecompressionError(
                "Invalid dictionary encoded data: length not multiple of 4".to_string(),
            ));
        }
        // Invert the dictionary for id -> string lookup.
        let reverse_dict: HashMap<u32, &String> =
            dictionary.iter().map(|(key, &idx)| (idx, key)).collect();
        bytes
            .chunks(4)
            .map(|chunk| {
                let idx = u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
                reverse_dict
                    .get(&idx)
                    .map(|s| (*s).clone())
                    .ok_or_else(|| {
                        BinaryFormatError::DecompressionError(format!(
                            "Unknown dictionary index: {}",
                            idx
                        ))
                    })
            })
            .collect()
    }
}
/// Row encoder that only emits columns whose value changed since the
/// previously encoded row.
#[derive(Debug)]
pub struct IncrementalEncoder {
    /// Column types, in schema order. Currently kept only for the row/schema
    /// length check in `encode_row`.
    column_types: Vec<ColumnType>,
    /// Last emitted bytes per column; columns start as empty.
    last_values: Vec<Vec<u8>>,
}

/// Logical type of a column in an [`IncrementalEncoder`] schema.
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnType {
    Int64,
    Float64,
    String,
    /// Binary column; the `usize` is presumably a fixed byte width — it is
    /// never read in this file, so confirm with the format consumers.
    Binary(usize),
}

impl IncrementalEncoder {
    /// Creates an encoder for the given schema.
    ///
    /// Never fails today; the `Result` return type is kept for interface
    /// stability.
    pub fn new(schema: Vec<ColumnType>) -> Result<Self, BinaryFormatError> {
        // Take the length first so the schema can be moved instead of cloned
        // (the original cloned the whole Vec unnecessarily).
        let column_count = schema.len();
        Ok(IncrementalEncoder {
            column_types: schema,
            last_values: vec![Vec::new(); column_count],
        })
    }

    /// Encodes one row against the previous row. Per column the output is a
    /// change-flag byte (1 = changed, 0 = unchanged) followed, only when
    /// changed, by a u32 LE byte length and the raw column bytes.
    ///
    /// NOTE(review): a column whose very first value is empty compares equal
    /// to the initial empty state and is emitted as "unchanged"; a decoder
    /// cannot distinguish that from a repeat — confirm decoders seed each
    /// column with an empty value.
    pub fn encode_row(&mut self, row: &[&[u8]]) -> Result<Vec<u8>, BinaryFormatError> {
        if row.len() != self.column_types.len() {
            return Err(BinaryFormatError::EncodingError(format!(
                "Row size {} doesn't match schema size {}",
                row.len(),
                self.column_types.len()
            )));
        }
        let mut encoded = Vec::new();
        for (i, column_data) in row.iter().enumerate() {
            let changed = self.last_values[i].as_slice() != *column_data;
            encoded.push(changed as u8);
            if changed {
                let len = column_data.len() as u32;
                encoded.extend_from_slice(&len.to_le_bytes());
                encoded.extend_from_slice(column_data);
                self.last_values[i] = column_data.to_vec();
            }
        }
        Ok(encoded)
    }
}
/// Validated compression level in the inclusive range 1..=9.
#[derive(Debug, Clone, Copy)]
pub struct CompressionLevel(u8);

impl CompressionLevel {
    /// Wraps `level` after validating it is within 1..=9.
    pub fn new(level: u8) -> Result<Self, BinaryFormatError> {
        if (1..=9).contains(&level) {
            Ok(CompressionLevel(level))
        } else {
            Err(BinaryFormatError::InvalidConfig(format!(
                "Compression level must be 1-9, got {}",
                level
            )))
        }
    }

    /// Returns the raw level value.
    pub fn level(&self) -> u8 {
        self.0
    }
}
/// Per-column summary statistics, intended for inclusion in
/// [`FormatMetadata`].
#[derive(Debug, Clone)]
pub struct ColumnStats {
/// Column name.
pub name: String,
/// Total number of values in the column.
pub count: u64,
/// Number of null values.
pub null_count: u64,
/// Number of distinct values.
pub distinct_count: u64,
/// Minimum value in raw byte form, if tracked — the byte representation
/// is not defined anywhere in this file; confirm with the writers.
pub min_value: Option<Vec<u8>>,
/// Maximum value in raw byte form, if tracked (same caveat as above).
pub max_value: Option<Vec<u8>>,
}
/// Top-level metadata describing an encoded stream or file.
#[derive(Debug)]
pub struct FormatMetadata {
/// Format version string.
pub version: String,
/// Name of the compression scheme used.
pub compression: String,
/// Compression level; see [`CompressionLevel`] (valid range 1-9).
pub level: u8,
/// Number of encoded rows.
pub row_count: u64,
/// Number of columns.
pub column_count: u32,
/// Per-column statistics, one entry per column.
pub column_stats: Vec<ColumnStats>,
/// Integrity checksum — the algorithm is not defined in this file;
/// confirm with whoever computes/verifies it.
pub checksum: u32,
}
/// Frame-of-reference delta encoder with bit packing.
///
/// Layout: `[first value: i64 LE][bit width: u8][frame (min delta): i64 LE]
/// [delta count: u32 LE][packed normalized deltas]`. Each delta is shifted by
/// the minimum delta so it packs as an unsigned integer.
#[derive(Debug)]
pub struct BitPackedDelta;

impl BitPackedDelta {
    /// Smallest supported width (1, 2, 4, 8, 16, 32 or 64 bits) able to hold
    /// every normalized delta for the inclusive range `[min_val, max_val]`.
    fn min_bits_for_value(min_val: i64, max_val: i64) -> u8 {
        // (max - min) computed modulo 2^128 is exact: even the widest i64
        // range (i64::MIN..=i64::MAX) spans fewer than 2^64 values.
        let range = (max_val as u128).wrapping_sub(min_val as u128) as u64;
        if range == 0 {
            return 1;
        }
        let bits = 64 - range.leading_zeros() as u8;
        match bits {
            0..=1 => 1,
            2 => 2,
            3..=4 => 4,
            5..=8 => 8,
            9..=16 => 16,
            17..=32 => 32,
            // BUG FIX: ranges wider than 32 bits were previously mapped to
            // 32, silently truncating large deltas during packing.
            _ => 64,
        }
    }

    /// Encodes `values`; an empty input yields an empty buffer.
    pub fn encode(values: &[i64]) -> Result<Vec<u8>, BinaryFormatError> {
        if values.is_empty() {
            return Ok(Vec::new());
        }
        let mut deltas = Vec::with_capacity(values.len().saturating_sub(1));
        let mut prev = values[0];
        for &value in &values[1..] {
            deltas.push(value.wrapping_sub(prev));
            prev = value;
        }
        let min_delta = deltas.iter().copied().min().unwrap_or(0);
        let max_delta = deltas.iter().copied().max().unwrap_or(0);
        let bits_needed = Self::min_bits_for_value(min_delta, max_delta);
        // Frame of reference: deltas are stored relative to the minimum
        // delta, making every stored value non-negative.
        let frame_value = min_delta;
        let normalized: Vec<u64> = deltas
            .iter()
            .map(|d| d.wrapping_sub(frame_value) as u64)
            .collect();
        let mut encoded = Vec::new();
        encoded.extend_from_slice(&values[0].to_le_bytes());
        encoded.push(bits_needed);
        encoded.extend_from_slice(&frame_value.to_le_bytes());
        encoded.extend_from_slice(&(deltas.len() as u32).to_le_bytes());
        match bits_needed {
            1 => Self::pack_bits(&normalized, 1, &mut encoded),
            2 => Self::pack_bits(&normalized, 2, &mut encoded),
            4 => Self::pack_bits(&normalized, 4, &mut encoded),
            8 => Self::pack_bytes(&normalized, 1, &mut encoded),
            16 => Self::pack_bytes(&normalized, 2, &mut encoded),
            32 => Self::pack_bytes(&normalized, 4, &mut encoded),
            _ => Self::pack_bytes(&normalized, 8, &mut encoded),
        }
        Ok(encoded)
    }

    /// Packs sub-byte values (`bits` in {1, 2, 4}) LSB-first into `output`.
    fn pack_bits(values: &[u64], bits: usize, output: &mut Vec<u8>) {
        let mut byte_buffer = 0u8;
        let mut bit_pos = 0;
        let mask = (1u64 << bits) - 1;
        for &value in values {
            let val = (value & mask) as u8;
            if bit_pos + bits <= 8 {
                byte_buffer |= val << bit_pos;
                bit_pos += bits;
                if bit_pos == 8 {
                    output.push(byte_buffer);
                    byte_buffer = 0;
                    bit_pos = 0;
                }
            } else {
                // Value straddles a byte boundary. Unreachable for widths
                // that divide 8 (1/2/4); kept for generality.
                let bits_in_current = 8 - bit_pos;
                byte_buffer |= val << bit_pos;
                output.push(byte_buffer);
                byte_buffer = val >> bits_in_current;
                bit_pos = bits - bits_in_current;
            }
        }
        if bit_pos > 0 {
            output.push(byte_buffer);
        }
    }

    /// Packs each value into its lowest `bytes` little-endian bytes.
    fn pack_bytes(values: &[u64], bytes: usize, output: &mut Vec<u8>) {
        for &value in values {
            output.extend_from_slice(&value.to_le_bytes()[..bytes]);
        }
    }

    /// Reads 8 little-endian bytes at `offset`; caller guarantees bounds.
    fn read_u64_le(bytes: &[u8], offset: usize) -> u64 {
        let mut buf = [0u8; 8];
        buf.copy_from_slice(&bytes[offset..offset + 8]);
        u64::from_le_bytes(buf)
    }

    /// Decodes a buffer produced by [`Self::encode`].
    pub fn decode(bytes: &[u8]) -> Result<Vec<i64>, BinaryFormatError> {
        // encode() of an empty slice yields an empty buffer; mirror that so
        // the round trip is symmetric (previously this was an error).
        if bytes.is_empty() {
            return Ok(Vec::new());
        }
        if bytes.len() < 21 {
            return Err(BinaryFormatError::DecompressionError(
                "BitPacked data too short".to_string(),
            ));
        }
        let baseline = Self::read_u64_le(bytes, 0) as i64;
        let bits_needed = bytes[8];
        let frame_value = Self::read_u64_le(bytes, 9) as i64;
        let count = u32::from_le_bytes([bytes[17], bytes[18], bytes[19], bytes[20]]) as usize;
        let mut result = Vec::with_capacity(count.saturating_add(1));
        result.push(baseline);
        if count == 0 {
            return Ok(result);
        }
        let packed_data = &bytes[21..];
        let normalized: Vec<u64> = match bits_needed {
            1 | 2 | 4 => Self::unpack_bits(packed_data, bits_needed as usize, count)?,
            8 | 16 | 32 | 64 => {
                // BUG FIX: the old decoder had no 64-bit branch (everything
                // unknown fell through to 32 bits) and indexed the payload
                // without bounds checks, panicking on truncated input.
                let width = (bits_needed / 8) as usize;
                if packed_data.len() < count.saturating_mul(width) {
                    return Err(BinaryFormatError::DecompressionError(
                        "Not enough data to unpack".to_string(),
                    ));
                }
                (0..count)
                    .map(|i| {
                        let mut v = 0u64;
                        for (b, &byte) in
                            packed_data[i * width..(i + 1) * width].iter().enumerate()
                        {
                            v |= (byte as u64) << (8 * b);
                        }
                        v
                    })
                    .collect()
            }
            other => {
                return Err(BinaryFormatError::DecompressionError(format!(
                    "Unsupported bit width: {}",
                    other
                )));
            }
        };
        let mut prev = baseline;
        for norm in normalized {
            // Undo the frame shift, then the delta chain; wrapping ops
            // mirror encode() exactly.
            let delta = (norm as i64).wrapping_add(frame_value);
            prev = prev.wrapping_add(delta);
            result.push(prev);
        }
        Ok(result)
    }

    /// Unpacks `count` LSB-first values of `bits` bits each.
    fn unpack_bits(data: &[u8], bits: usize, count: usize) -> Result<Vec<u64>, BinaryFormatError> {
        let mut result = Vec::with_capacity(count);
        let mut byte_idx = 0;
        let mut bit_pos = 0;
        let mask = (1u64 << bits) - 1;
        for _ in 0..count {
            let mut value = 0u64;
            let mut bits_read = 0;
            while bits_read < bits && byte_idx < data.len() {
                let byte_val = data[byte_idx] as u64;
                let bits_available = 8 - bit_pos;
                let bits_to_read = (bits - bits_read).min(bits_available);
                let chunk = (byte_val >> bit_pos) & ((1u64 << bits_to_read) - 1);
                value |= chunk << bits_read;
                bits_read += bits_to_read;
                bit_pos += bits_to_read;
                if bit_pos >= 8 {
                    bit_pos = 0;
                    byte_idx += 1;
                }
            }
            // BUG FIX: the old code silently pushed a partial value when the
            // payload ran out mid-value; report truncation instead.
            if bits_read < bits {
                return Err(BinaryFormatError::DecompressionError(
                    "Not enough data to unpack".to_string(),
                ));
            }
            result.push(value & mask);
        }
        Ok(result)
    }
}
/// Zigzag mapping between signed and unsigned 64-bit integers: values of
/// small magnitude (positive or negative) map to small unsigned values
/// (0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...).
#[derive(Debug)]
pub struct ZigzagEncoding;

impl ZigzagEncoding {
    /// Maps a signed value to its zigzag unsigned form.
    pub fn encode(n: i64) -> u64 {
        let shifted = n << 1;
        // Arithmetic shift: all ones for negative n, all zeros otherwise.
        let sign_mask = n >> 63;
        (shifted ^ sign_mask) as u64
    }

    /// Inverse of [`Self::encode`].
    pub fn decode(n: u64) -> i64 {
        let magnitude = (n >> 1) as i64;
        let sign = -((n & 1) as i64);
        magnitude ^ sign
    }

    /// Encodes every element of `values`.
    pub fn encode_sequence(values: &[i64]) -> Vec<u64> {
        let mut out = Vec::with_capacity(values.len());
        for &v in values {
            out.push(Self::encode(v));
        }
        out
    }

    /// Decodes every element of `values`.
    pub fn decode_sequence(values: &[u64]) -> Vec<i64> {
        let mut out = Vec::with_capacity(values.len());
        for &v in values {
            out.push(Self::decode(v));
        }
        out
    }
}
/// Dictionary encoder combined with run-length encoding: ids are assigned in
/// first-occurrence order and the id stream is stored as
/// `(id: u32 LE, run length: u32 LE)` pairs.
#[derive(Debug)]
pub struct DictionaryRleEncoder;

impl DictionaryRleEncoder {
    /// Encodes `values` into RLE pairs plus the string -> id dictionary.
    ///
    /// BUG FIX: the previous revision contained mojibake (`¤t_id`) where
    /// `&current_id` was intended, which did not compile.
    pub fn compress_with_rle(
        values: &[&str],
    ) -> Result<(Vec<u8>, HashMap<String, u32>), BinaryFormatError> {
        if values.is_empty() {
            return Ok((Vec::new(), HashMap::new()));
        }
        // Assign dictionary ids in first-occurrence order.
        let mut dictionary: HashMap<String, u32> = HashMap::new();
        let mut next_id = 0u32;
        for &value in values {
            dictionary.entry(value.to_string()).or_insert_with(|| {
                let id = next_id;
                next_id += 1;
                id
            });
        }
        let mut encoded = Vec::new();
        let mut i = 0;
        while i < values.len() {
            let run_value = values[i];
            let current_id = *dictionary
                .get(run_value)
                .expect("every input value was inserted above");
            // Equal strings always share an id, so comparing strings
            // directly is equivalent to (and cheaper than) re-looking up ids.
            let mut count = 1u32;
            while i + (count as usize) < values.len() && values[i + count as usize] == run_value {
                count += 1;
            }
            encoded.extend_from_slice(&current_id.to_le_bytes());
            encoded.extend_from_slice(&count.to_le_bytes());
            i += count as usize;
        }
        Ok((encoded, dictionary))
    }

    /// Expands RLE pairs produced by [`Self::compress_with_rle`] back into
    /// the original string sequence.
    pub fn decompress_rle(
        bytes: &[u8],
        dictionary: &HashMap<String, u32>,
    ) -> Result<Vec<String>, BinaryFormatError> {
        if bytes.len() % 8 != 0 {
            return Err(BinaryFormatError::DecompressionError(
                "Invalid RLE data: length not multiple of 8".to_string(),
            ));
        }
        // Invert the dictionary for id -> string lookup.
        let mut reverse_dict: HashMap<u32, &String> = HashMap::new();
        for (key, &idx) in dictionary.iter() {
            reverse_dict.insert(idx, key);
        }
        let mut result = Vec::new();
        for chunk in bytes.chunks(8) {
            let id = u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
            let count = u32::from_le_bytes([chunk[4], chunk[5], chunk[6], chunk[7]]) as usize;
            let value = reverse_dict.get(&id).ok_or_else(|| {
                BinaryFormatError::DecompressionError(format!("Unknown RLE ID: {}", id))
            })?;
            result.reserve(count);
            for _ in 0..count {
                result.push((*value).clone());
            }
        }
        Ok(result)
    }
}
/// Dictionary of distinct strings stored as (prefix id, suffix) pairs, where
/// prefixes of length 1..=3 bytes shared by at least two values are factored
/// out. Note: only *distinct* values are stored; multiplicity is not
/// preserved across a compress/decompress round trip.
#[derive(Debug)]
pub struct PrefixCompressedDict {
    pub prefixes: Vec<String>,
    pub dictionary: HashMap<String, (u32, String)>,
}

impl PrefixCompressedDict {
    /// Sentinel prefix id meaning "no prefix applied". Serialized as the
    /// single byte 255, which decompress treats as out-of-range.
    const NO_PREFIX: u32 = u32::MAX;

    /// Collects byte prefixes of length 1..=3 shared by at least two values.
    fn find_common_prefixes(values: &[&str]) -> Vec<String> {
        if values.is_empty() {
            return vec![];
        }
        let mut prefixes = vec![];
        for prefix_len in 1..=3 {
            let mut prefix_map: HashMap<&str, usize> = HashMap::new();
            for value in values {
                // `get` returns None both for short strings and when the cut
                // would split a multi-byte UTF-8 character — the original
                // indexed directly and could panic on non-ASCII input.
                if let Some(prefix) = value.get(..prefix_len) {
                    *prefix_map.entry(prefix).or_insert(0) += 1;
                }
            }
            for (prefix, count) in prefix_map {
                if count >= 2 {
                    prefixes.push(prefix.to_string());
                }
            }
        }
        prefixes
    }

    /// Encodes the distinct values of `values` as:
    /// `[prefix count: u8]([prefix len: u8][prefix bytes])*`
    /// `[entry count: u8]([prefix id: u8][suffix len: u8][suffix bytes])*`.
    ///
    /// Errors when the format's single-byte counts would overflow (more than
    /// 255 distinct values, or a suffix longer than 255 bytes) — the
    /// original silently truncated these, corrupting the output.
    pub fn compress(values: &[&str]) -> Result<(Vec<u8>, Self), BinaryFormatError> {
        let mut prefixes = Self::find_common_prefixes(values);
        // The prefix count is stored in one byte; keep ids < 255 so the
        // NO_PREFIX sentinel (serialized as 255) stays unambiguous.
        prefixes.truncate(255);
        let mut dictionary: HashMap<String, (u32, String)> = HashMap::new();
        for value in values {
            // Pick the prefix that leaves the shortest suffix.
            let mut best_prefix_id = Self::NO_PREFIX;
            let mut best_suffix = value.to_string();
            for (prefix_id, prefix) in prefixes.iter().enumerate() {
                if !prefix.is_empty() && value.starts_with(prefix.as_str()) {
                    let suffix = &value[prefix.len()..];
                    if best_prefix_id == Self::NO_PREFIX || suffix.len() < best_suffix.len() {
                        best_prefix_id = prefix_id as u32;
                        best_suffix = suffix.to_string();
                    }
                }
            }
            dictionary.insert(value.to_string(), (best_prefix_id, best_suffix));
        }
        if dictionary.len() > 255 {
            return Err(BinaryFormatError::EncodingError(
                "Prefix dictionary format supports at most 255 distinct values".to_string(),
            ));
        }
        let mut encoded = Vec::new();
        encoded.push(prefixes.len() as u8);
        for prefix in &prefixes {
            encoded.push(prefix.len() as u8);
            encoded.extend_from_slice(prefix.as_bytes());
        }
        encoded.push(dictionary.len() as u8);
        for (prefix_id, suffix) in dictionary.values() {
            if suffix.len() > 255 {
                return Err(BinaryFormatError::EncodingError(
                    "Suffix longer than 255 bytes cannot be encoded".to_string(),
                ));
            }
            // NO_PREFIX (u32::MAX) intentionally truncates to 255 here.
            encoded.push(*prefix_id as u8);
            encoded.push(suffix.len() as u8);
            encoded.extend_from_slice(suffix.as_bytes());
        }
        let encoder = PrefixCompressedDict { prefixes, dictionary };
        Ok((encoded, encoder))
    }

    /// Decodes a buffer produced by [`Self::compress`], returning the
    /// distinct values and the reconstructed dictionary. All reads are
    /// bounds-checked (the original panicked on truncated input).
    pub fn decompress(bytes: &[u8]) -> Result<(Vec<String>, Self), BinaryFormatError> {
        if bytes.is_empty() {
            return Err(BinaryFormatError::DecompressionError(
                "Empty prefix-compressed data".to_string(),
            ));
        }
        let truncated = || {
            BinaryFormatError::DecompressionError(
                "Truncated prefix-compressed data".to_string(),
            )
        };
        let mut pos = 0usize;
        let prefix_count = *bytes.get(pos).ok_or_else(truncated)? as usize;
        pos += 1;
        let mut prefixes = Vec::with_capacity(prefix_count);
        for _ in 0..prefix_count {
            let prefix_len = *bytes.get(pos).ok_or_else(truncated)? as usize;
            pos += 1;
            let raw = bytes.get(pos..pos + prefix_len).ok_or_else(truncated)?;
            let prefix = String::from_utf8(raw.to_vec())
                .map_err(|_| BinaryFormatError::DecompressionError("Invalid UTF-8".to_string()))?;
            prefixes.push(prefix);
            pos += prefix_len;
        }
        let dict_count = *bytes.get(pos).ok_or_else(truncated)? as usize;
        pos += 1;
        let mut dictionary = HashMap::with_capacity(dict_count);
        let mut values = Vec::with_capacity(dict_count);
        for _ in 0..dict_count {
            let prefix_id = *bytes.get(pos).ok_or_else(truncated)? as u32;
            pos += 1;
            let suffix_len = *bytes.get(pos).ok_or_else(truncated)? as usize;
            pos += 1;
            let raw = bytes.get(pos..pos + suffix_len).ok_or_else(truncated)?;
            let suffix = String::from_utf8(raw.to_vec())
                .map_err(|_| BinaryFormatError::DecompressionError("Invalid UTF-8".to_string()))?;
            pos += suffix_len;
            let value = match prefixes.get(prefix_id as usize) {
                Some(prefix) => format!("{}{}", prefix, suffix),
                // An out-of-range id is the "no prefix" sentinel.
                None => suffix.clone(),
            };
            dictionary.insert(value.clone(), (prefix_id, suffix));
            values.push(value);
        }
        let encoder = PrefixCompressedDict { prefixes, dictionary };
        Ok((values, encoder))
    }
}
/// Simplified frequency-ranked code assignment (not true Huffman coding):
/// values are sorted by descending frequency and the rank itself is used as
/// the code, with a fixed width per rank band.
#[derive(Debug)]
pub struct HuffmanCoding {
    /// value -> (code, code length in bits).
    pub codes: HashMap<u32, (u32, u8)>,
}

impl HuffmanCoding {
    /// Builds the code table from `(value, frequency)` pairs. More frequent
    /// values receive shorter codes. Ties keep their input order (the sort
    /// is stable).
    pub fn build_from_frequencies(frequencies: &[(u32, usize)]) -> Result<Self, BinaryFormatError> {
        if frequencies.is_empty() {
            return Ok(HuffmanCoding { codes: HashMap::new() });
        }
        let mut sorted = frequencies.to_vec();
        sorted.sort_by_key(|&(_, freq)| std::cmp::Reverse(freq));
        let mut codes = HashMap::with_capacity(sorted.len());
        for (idx, &(value, _freq)) in sorted.iter().enumerate() {
            // The declared length must be able to represent the rank that is
            // used as the code value.
            let code_len: u8 = if idx < 16 {
                4
            } else if idx < 64 {
                6
            } else if idx < 256 {
                8
            } else {
                // BUG FIX: ranks >= 256 previously got an 8-bit length even
                // though the code value itself needs more than 8 bits.
                32
            };
            codes.insert(value, (idx as u32, code_len));
        }
        Ok(HuffmanCoding { codes })
    }

    /// Looks up the `(code, bit length)` pair for `value`; errors when the
    /// value was not present in the build frequencies.
    pub fn encode(&self, value: u32) -> Result<(u32, u8), BinaryFormatError> {
        self.codes.get(&value).copied().ok_or_else(|| {
            BinaryFormatError::EncodingError(format!("No Huffman code for value {}", value))
        })
    }
}
/// Heuristics for ranking columns by expected compressibility and picking an
/// encoding strategy per column.
#[derive(Debug)]
pub struct ColumnOrderingOptimizer;

impl ColumnOrderingOptimizer {
    /// Scores a column from 1 (least compressible) to 9 (most compressible).
    ///
    /// Mostly-numeric columns (> 80% parseable as i64, and in fact fully
    /// numeric) are scored by delta magnitude; everything else is scored by
    /// distinct-value cardinality.
    pub fn score_column(values: &[&str]) -> u32 {
        if values.is_empty() {
            return 1;
        }
        let mut numeric_values: Vec<i64> = Vec::new();
        let mut all_numeric = true;
        for value in values {
            if let Ok(num) = value.parse::<i64>() {
                numeric_values.push(num);
            } else {
                all_numeric = false;
            }
        }
        let numeric_ratio = numeric_values.len() as f64 / values.len() as f64;
        if numeric_ratio > 0.8 && all_numeric {
            let mut max_delta = 0i64;
            let mut delta_sum = 0i64;
            for pair in numeric_values.windows(2) {
                // Saturating arithmetic: the original plain subtraction and
                // `abs()` could overflow (panicking in debug builds) on
                // extreme value pairs such as i64::MIN vs i64::MAX.
                let delta = pair[1].saturating_sub(pair[0]).saturating_abs();
                max_delta = max_delta.max(delta);
                delta_sum = delta_sum.saturating_add(delta);
            }
            let avg_delta = delta_sum / (numeric_values.len() - 1).max(1) as i64;
            if max_delta < 256 && avg_delta < 32 {
                return 9;
            } else if max_delta < 65536 {
                return 8;
            } else {
                return 6;
            }
        }
        let unique: std::collections::HashSet<&str> = values.iter().copied().collect();
        let cardinality = unique.len() as f64 / values.len() as f64;
        if cardinality < 0.01 {
            8
        } else if cardinality < 0.1 {
            7
        } else if cardinality < 0.5 {
            5
        } else {
            3
        }
    }

    /// Returns column indices ordered best-compressible first. Stable sort
    /// keeps ties in original order.
    pub fn reorder_columns(
        columns: &[Vec<String>],
    ) -> Result<Vec<usize>, BinaryFormatError> {
        if columns.is_empty() {
            return Ok(Vec::new());
        }
        let mut scores: Vec<(usize, u32)> = columns
            .iter()
            .enumerate()
            .map(|(idx, col)| {
                let refs: Vec<&str> = col.iter().map(|s| s.as_str()).collect();
                (idx, Self::score_column(&refs))
            })
            .collect();
        scores.sort_by_key(|&(_, score)| std::cmp::Reverse(score));
        Ok(scores.into_iter().map(|(idx, _)| idx).collect())
    }

    /// Maps a column's score to a suggested encoder name.
    pub fn strategy_for_column(values: &[&str]) -> &'static str {
        match Self::score_column(values) {
            8..=9 => "BitPackedDelta + ZigzagEncoding",
            7 => "DictionaryRleEncoder",
            6 => "HuffmanCoding",
            _ => "Minimal compression",
        }
    }
}
/// Bookkeeping for one block produced by [`BlockCompressor`].
#[derive(Debug, Clone)]
pub struct BlockMetadata {
/// Index of the block within the stream (mirrors `Block::index`).
pub block_index: u32,
/// Byte offset of this block's data within the original input.
pub original_offset: usize,
/// Uncompressed size of the block in bytes.
pub original_size: usize,
/// Size of the serialized block; 0 until `compress_block` runs.
pub compressed_size: usize,
/// Optional per-block string dictionary — never populated in this file;
/// confirm intended use with callers.
pub local_dictionary: Option<HashMap<String, u32>>,
}
/// A contiguous slice of input data together with its metadata.
#[derive(Debug)]
pub struct Block {
/// Block index within the stream.
pub index: u32,
/// Raw (uncompressed) block contents.
pub data: Vec<u8>,
/// Bookkeeping for this block.
pub metadata: BlockMetadata,
}
/// Splits byte streams into fixed-size blocks and (de)serializes them.
///
/// `Debug` is derived for consistency with every other public type in this
/// module (the original omitted it).
#[derive(Debug)]
pub struct BlockCompressor {
    /// Maximum number of bytes per block.
    block_size: usize,
}
impl BlockCompressor {
    /// Creates a compressor with the default 64 KiB block size.
    pub fn new() -> Self {
        Self { block_size: 65536 }
    }

    /// Creates a compressor with a custom block size (1 byte ..= 1 MiB).
    pub fn with_block_size(size: usize) -> Result<Self, BinaryFormatError> {
        match size {
            1..=1048576 => Ok(Self { block_size: size }),
            _ => Err(BinaryFormatError::InvalidConfig(
                "Block size must be between 1 byte and 1MB".to_string(),
            )),
        }
    }

    /// Splits `data` into consecutive blocks of at most `block_size` bytes.
    /// The final block may be shorter; empty input yields no blocks.
    pub fn create_blocks(&self, data: &[u8]) -> Result<Vec<Block>, BinaryFormatError> {
        let mut blocks = Vec::new();
        for (i, chunk) in data.chunks(self.block_size).enumerate() {
            let index = i as u32;
            blocks.push(Block {
                index,
                data: chunk.to_vec(),
                metadata: BlockMetadata {
                    block_index: index,
                    original_offset: i * self.block_size,
                    original_size: chunk.len(),
                    compressed_size: 0, // filled in by compress_block
                    local_dictionary: None,
                },
            });
        }
        Ok(blocks)
    }

    /// Serializes a block as `[index: u32 LE][original_size: u32 LE][payload]`
    /// and records the resulting size in the block's metadata. An empty
    /// block serializes to an empty buffer.
    pub fn compress_block(
        &self,
        block: &mut Block,
    ) -> Result<Vec<u8>, BinaryFormatError> {
        if block.data.is_empty() {
            block.metadata.compressed_size = 0;
            return Ok(Vec::new());
        }
        let mut out = Vec::with_capacity(8 + block.data.len());
        out.extend_from_slice(&block.metadata.block_index.to_le_bytes());
        out.extend_from_slice(&(block.metadata.original_size as u32).to_le_bytes());
        out.extend_from_slice(&block.data);
        block.metadata.compressed_size = out.len();
        Ok(out)
    }

    /// Strips the 8-byte header and returns the payload.
    pub fn decompress_block(block_data: &[u8]) -> Result<Vec<u8>, BinaryFormatError> {
        match block_data.get(8..) {
            Some(payload) => Ok(payload.to_vec()),
            None => Err(BinaryFormatError::DecompressionError(
                "Block data too small for header".to_string(),
            )),
        }
    }

    /// Compresses every block. Despite the name this currently runs
    /// sequentially; results are returned sorted by block index.
    pub fn compress_blocks_parallel(
        &self,
        blocks: Vec<Block>,
    ) -> Result<Vec<(Vec<u8>, BlockMetadata)>, BinaryFormatError> {
        let mut results = Vec::with_capacity(blocks.len());
        for mut block in blocks {
            let payload = self.compress_block(&mut block)?;
            results.push((payload, block.metadata));
        }
        results.sort_by_key(|(_, meta)| meta.block_index);
        Ok(results)
    }

    /// Concatenates the payloads of all blocks back into one buffer.
    pub fn decompress_blocks(
        compressed_blocks: &[(Vec<u8>, BlockMetadata)],
    ) -> Result<Vec<u8>, BinaryFormatError> {
        let mut out = Vec::new();
        for (payload, _metadata) in compressed_blocks {
            out.extend_from_slice(&Self::decompress_block(payload)?);
        }
        Ok(out)
    }

    /// Returns `(total original size, total compressed size, ratio)`; the
    /// ratio defaults to 1.0 when there is no original data.
    pub fn get_compression_stats(
        results: &[(Vec<u8>, BlockMetadata)],
    ) -> (usize, usize, f64) {
        let (mut original, mut compressed) = (0usize, 0usize);
        for (_, meta) in results {
            original += meta.original_size;
            compressed += meta.compressed_size;
        }
        let ratio = if original == 0 {
            1.0
        } else {
            compressed as f64 / original as f64
        };
        (original, compressed, ratio)
    }

    /// Returns the configured block size.
    pub fn get_block_size(&self) -> usize {
        self.block_size
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_delta_encoding_integers() {
let values = vec![100, 105, 103, 108, 110];
let encoded = DeltaEncoder::encode(&values).unwrap();
let decoded = DeltaEncoder::decode(&encoded).unwrap();
assert_eq!(values, decoded);
}
#[test]
fn test_dictionary_compression() {
let values = vec!["red", "blue", "red", "green", "blue", "red"];
let (encoded, dict) = DictionaryCompressor::compress_strings(&values).unwrap();
let decoded = DictionaryCompressor::decompress_strings(&encoded, &dict).unwrap();
assert_eq!(
decoded,
vec!["red", "blue", "red", "green", "blue", "red"]
.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>()
);
}
#[test]
fn test_compression_level_validation() {
assert!(CompressionLevel::new(1).is_ok());
assert!(CompressionLevel::new(9).is_ok());
assert!(CompressionLevel::new(0).is_err());
assert!(CompressionLevel::new(10).is_err());
}
#[test]
fn test_incremental_encoder() {
let schema = vec![ColumnType::Int64, ColumnType::String];
let mut encoder = IncrementalEncoder::new(schema).unwrap();
let row1 = vec![b"100".as_ref(), b"hello".as_ref()];
let encoded1 = encoder.encode_row(&row1).unwrap();
assert!(!encoded1.is_empty());
let encoded2 = encoder.encode_row(&row1).unwrap();
assert!(encoded2.len() < encoded1.len());
}
#[test]
fn test_bitpacked_delta_simple() {
let values = vec![100, 105, 103, 108, 110];
let encoded = BitPackedDelta::encode(&values).unwrap();
let decoded = BitPackedDelta::decode(&encoded).unwrap();
assert_eq!(values, decoded);
}
#[test]
fn test_bitpacked_delta_small_deltas() {
let values = vec![1000, 1001, 1000, 1001, 1000, 1001];
let encoded = BitPackedDelta::encode(&values).unwrap();
let decoded = BitPackedDelta::decode(&encoded).unwrap();
assert_eq!(values, decoded);
assert!(encoded.len() < values.len() * 8);
}
#[test]
fn test_bitpacked_delta_large_range() {
let values = vec![0, 1000, 2000, 3000, 4000, 5000];
let encoded = BitPackedDelta::encode(&values).unwrap();
let decoded = BitPackedDelta::decode(&encoded).unwrap();
assert_eq!(values, decoded);
}
#[test]
fn test_bitpacked_delta_negative() {
let values = vec![-100, -95, -90, -85, -80];
let encoded = BitPackedDelta::encode(&values).unwrap();
let decoded = BitPackedDelta::decode(&encoded).unwrap();
assert_eq!(values, decoded);
}
#[test]
fn test_bitpacked_delta_monotonic() {
let values: Vec<i64> = (0..100).collect();
let encoded = BitPackedDelta::encode(&values).unwrap();
let decoded = BitPackedDelta::decode(&encoded).unwrap();
assert_eq!(values, decoded);
assert!(encoded.len() < values.len() * 2);
}
#[test]
fn test_bitpacked_delta_empty() {
let values: Vec<i64> = vec![];
let encoded = BitPackedDelta::encode(&values).unwrap();
assert_eq!(encoded.len(), 0);
}
#[test]
fn test_bitpacked_delta_single_value() {
let values = vec![42];
let encoded = BitPackedDelta::encode(&values).unwrap();
let decoded = BitPackedDelta::decode(&encoded).unwrap();
assert_eq!(values, decoded);
}
#[test]
fn test_bitpacked_delta_compression_ratio() {
let mut values = vec![];
let mut val = 1000000i64;
for _ in 0..1000 {
values.push(val);
val += 100; }
let std_delta = DeltaEncoder::encode(&values).unwrap();
let bitpacked = BitPackedDelta::encode(&values).unwrap();
let ratio = bitpacked.len() as f64 / std_delta.len() as f64;
assert!(ratio < 0.3, "Bitpacked should be <30% of standard delta, got {}", ratio);
}
#[test]
fn test_zigzag_positive() {
assert_eq!(ZigzagEncoding::encode(0), 0);
assert_eq!(ZigzagEncoding::encode(1), 2);
assert_eq!(ZigzagEncoding::encode(2), 4);
assert_eq!(ZigzagEncoding::encode(3), 6);
}
#[test]
fn test_zigzag_negative() {
assert_eq!(ZigzagEncoding::encode(-1), 1);
assert_eq!(ZigzagEncoding::encode(-2), 3);
assert_eq!(ZigzagEncoding::encode(-3), 5);
}
#[test]
fn test_zigzag_roundtrip() {
let values = vec![-100, -50, -1, 0, 1, 50, 100];
let encoded = ZigzagEncoding::encode_sequence(&values);
let decoded = ZigzagEncoding::decode_sequence(&encoded);
assert_eq!(values, decoded);
}
#[test]
fn test_zigzag_small_magnitude() {
let small_neg = ZigzagEncoding::encode(-1);
let small_pos = ZigzagEncoding::encode(1);
let large_neg = ZigzagEncoding::encode(-1000);
let large_pos = ZigzagEncoding::encode(1000);
assert!(small_neg < large_neg as u64);
assert!(small_pos < large_pos as u64);
}
#[test]
fn test_enhanced_delta_compression_ratio() {
let values = vec![100, 105, 103, 108, 110, 115, 113, 118, 120, 125];
let std_delta = DeltaEncoder::encode(&values).unwrap();
let bitpacked = BitPackedDelta::encode(&values).unwrap();
let std_decoded = DeltaEncoder::decode(&std_delta).unwrap();
let bp_decoded = BitPackedDelta::decode(&bitpacked).unwrap();
assert_eq!(std_decoded, values);
assert_eq!(bp_decoded, values);
assert!(bitpacked.len() < std_delta.len());
}
#[test]
fn test_dictionary_rle_simple() {
let values = vec!["NY", "NY", "NY", "TX", "TX", "CA"];
let (encoded, dict) = DictionaryRleEncoder::compress_with_rle(&values).unwrap();
let decoded = DictionaryRleEncoder::decompress_rle(&encoded, &dict).unwrap();
let expected: Vec<String> = values.iter().map(|s| s.to_string()).collect();
assert_eq!(decoded, expected);
}
#[test]
fn test_dictionary_rle_high_repetition() {
let mut values = vec![];
for _ in 0..100 {
values.push("A");
}
for _ in 0..50 {
values.push("B");
}
for _ in 0..25 {
values.push("C");
}
let (encoded, dict) = DictionaryRleEncoder::compress_with_rle(&values).unwrap();
let decoded = DictionaryRleEncoder::decompress_rle(&encoded, &dict).unwrap();
assert_eq!(decoded.len(), 175);
assert_eq!(decoded[0], "A");
assert_eq!(decoded[100], "B");
assert_eq!(decoded[150], "C");
assert!(encoded.len() < values.len() * 4);
}
#[test]
fn test_dictionary_rle_no_repetition() {
let values = vec!["A", "B", "C", "D", "E", "F"];
let (encoded, dict) = DictionaryRleEncoder::compress_with_rle(&values).unwrap();
let decoded = DictionaryRleEncoder::decompress_rle(&encoded, &dict).unwrap();
let expected: Vec<String> = values.iter().map(|s| s.to_string()).collect();
assert_eq!(decoded, expected);
}
#[test]
fn test_dictionary_rle_single_value() {
let values = vec!["X"];
let (encoded, dict) = DictionaryRleEncoder::compress_with_rle(&values).unwrap();
let decoded = DictionaryRleEncoder::decompress_rle(&encoded, &dict).unwrap();
assert_eq!(decoded, vec!["X".to_string()]);
}
#[test]
fn test_dictionary_rle_compression_ratio() {
let mut values = vec![];
for _ in 0..1000 {
values.extend(&["NY", "TX", "CA", "FL"]);
}
let (rle_encoded, dict) = DictionaryRleEncoder::compress_with_rle(&values).unwrap();
assert!(!rle_encoded.is_empty(), "RLE encoded data should not be empty");
let decoded = DictionaryRleEncoder::decompress_rle(&rle_encoded, &dict).unwrap();
assert_eq!(decoded.len(), values.len(), "Decompressed should match original length");
assert_eq!(decoded[0], "NY");
assert_eq!(decoded[1], "TX");
assert_eq!(decoded[2], "CA");
assert_eq!(decoded[3], "FL");
}
#[test]
fn test_prefix_compression_similar_strings() {
let values = vec!["Alabama", "Alaska", "Arizona"];
let (encoded, _) = PrefixCompressedDict::compress(
&values.iter().map(|s| s.as_ref()).collect::<Vec<_>>()
).unwrap();
let (decoded, _) = PrefixCompressedDict::decompress(&encoded).unwrap();
let mut decoded_set: std::collections::HashSet<_> = decoded.into_iter().collect();
for value in values {
assert!(decoded_set.contains(value));
}
}
#[test]
fn test_prefix_compression_different_strings() {
let values = vec!["apple", "banana", "cherry"];
let (encoded, _) = PrefixCompressedDict::compress(
&values.iter().map(|s| s.as_ref()).collect::<Vec<_>>()
).unwrap();
let (decoded, _) = PrefixCompressedDict::decompress(&encoded).unwrap();
let mut decoded_set: std::collections::HashSet<_> = decoded.into_iter().collect();
for value in values {
assert!(decoded_set.contains(value));
}
}
#[test]
fn test_prefix_compression_efficiency() {
let values = vec![
"USA/NewYork/NYC",
"USA/NewYork/Buffalo",
"USA/California/LA",
"USA/California/SF",
];
let (encoded, _) = PrefixCompressedDict::compress(
&values.iter().map(|s| s.as_ref()).collect::<Vec<_>>()
).unwrap();
assert!(encoded.len() > 0, "Encoded output should not be empty");
let (decoded, _) = PrefixCompressedDict::decompress(&encoded).unwrap();
let decoded_set: std::collections::HashSet<_> = decoded.into_iter().collect();
for value in values {
assert!(decoded_set.contains(value));
}
}
#[test]
fn test_huffman_code_assignment() {
let frequencies = vec![
(0, 100), (1, 50), (2, 10), (3, 5), ];
let huffman = HuffmanCoding::build_from_frequencies(&frequencies).unwrap();
let (code0, len0) = huffman.encode(0).unwrap();
let (code3, len3) = huffman.encode(3).unwrap();
assert!(len0 <= len3);
}
#[test]
fn test_huffman_all_values_encoded() {
let frequencies = vec![(0, 10), (1, 5), (2, 3)];
let huffman = HuffmanCoding::build_from_frequencies(&frequencies).unwrap();
for (value, _) in frequencies {
assert!(huffman.encode(value).is_ok());
}
}
#[test]
fn test_huffman_unknown_value() {
let frequencies = vec![(0, 10)];
let huffman = HuffmanCoding::build_from_frequencies(&frequencies).unwrap();
assert!(huffman.encode(99).is_err());
}
#[test]
fn test_column_score_high_delta_numeric() {
let values = vec!["100", "105", "103", "108", "110"];
let score = ColumnOrderingOptimizer::score_column(
&values.iter().map(|s| s.as_ref()).collect::<Vec<_>>()
);
assert!(score >= 8, "High-delta numeric should score 8-9, got {}", score);
}
#[test]
fn test_column_score_low_cardinality_categorical() {
let mut values = vec![];
for _ in 0..100 {
values.push("NY".to_string());
}
for _ in 0..50 {
values.push("TX".to_string());
}
for _ in 0..25 {
values.push("CA".to_string());
}
let str_values: Vec<&str> = values.iter().map(|s| s.as_ref()).collect();
let score = ColumnOrderingOptimizer::score_column(&str_values);
assert!(score >= 7, "Low-cardinality categorical should score 7-8, got {}", score);
}
#[test]
fn test_column_score_high_cardinality() {
let values = vec![
"Alice", "Bob", "Charlie", "David", "Eve",
"Frank", "Grace", "Henry", "Ivy", "Jack"
];
let score = ColumnOrderingOptimizer::score_column(
&values.iter().map(|s| s.as_ref()).collect::<Vec<_>>()
);
assert!(score <= 5, "High-cardinality should score low, got {}", score);
}
#[test]
fn test_column_reordering() {
let col1: Vec<String> = (0..100).map(|i| (i % 4).to_string()).collect(); let col2: Vec<String> = (0..100).map(|i| format!("unique_{}", i)).collect(); let col3: Vec<String> = (0..100).map(|i| (100 + i * 2).to_string()).collect();
let columns = vec![col2, col1, col3]; let reordered = ColumnOrderingOptimizer::reorder_columns(&columns).unwrap();
assert_eq!(reordered.len(), 3);
assert!(reordered != vec![0, 1, 2], "Columns should be reordered for optimization");
}
#[test]
fn test_column_strategy_selection() {
    // Small-delta numeric column: expect a delta / bit-packed strategy.
    let numeric: Vec<String> = (0..50).map(|i| (100 + i * 2).to_string()).collect();
    let numeric_refs: Vec<&str> = numeric.iter().map(String::as_str).collect();
    let strategy = ColumnOrderingOptimizer::strategy_for_column(&numeric_refs);
    assert!(
        strategy.contains("BitPacked") || strategy.contains("Delta"),
        "Small-delta numeric should use delta encoding, got: {}",
        strategy
    );
    // Three distinct values over 100 rows: expect dictionary / minimal encoding.
    let mut categorical: Vec<&str> = vec!["A"; 50];
    categorical.extend(vec!["B"; 30]);
    categorical.extend(vec!["C"; 20]);
    let strategy = ColumnOrderingOptimizer::strategy_for_column(&categorical);
    assert!(
        strategy.contains("Dictionary") || strategy.contains("Minimal"),
        "Low cardinality should recognize dictionary/RLE, got: {}",
        strategy
    );
    // All-unique column: any non-empty strategy is acceptable.
    let high_card = vec!["unique1", "unique2", "unique3", "unique4", "unique5"];
    let strategy = ColumnOrderingOptimizer::strategy_for_column(&high_card);
    assert!(!strategy.is_empty(), "Strategy should be selected");
}
#[test]
fn test_column_score_empty() {
    // An empty column falls back to the minimum score of 1.
    let empty: Vec<&str> = Vec::new();
    assert_eq!(ColumnOrderingOptimizer::score_column(&empty), 1);
}
#[test]
fn test_block_creation_single_block() {
    // Data smaller than the default block size yields exactly one block
    // holding the whole payload.
    let compressor = BlockCompressor::new();
    let payload = vec![1, 2, 3, 4, 5];
    let blocks = compressor.create_blocks(&payload).unwrap();
    assert_eq!(blocks.len(), 1);
    assert_eq!(blocks[0].index, 0);
    assert_eq!(blocks[0].data, payload);
}
#[test]
fn test_block_creation_multiple_blocks() {
    // 35 bytes at block size 10: three full blocks plus a 5-byte tail.
    let compressor = BlockCompressor::with_block_size(10).unwrap();
    let payload: Vec<u8> = (0..35).collect();
    let blocks = compressor.create_blocks(&payload).unwrap();
    assert_eq!(blocks.len(), 4);
    let expected_sizes = [10, 10, 10, 5];
    for (block, &want) in blocks.iter().zip(expected_sizes.iter()) {
        assert_eq!(block.metadata.original_size, want);
    }
}
#[test]
fn test_block_creation_empty_data() {
    // No input bytes means no blocks at all.
    let compressor = BlockCompressor::new();
    let empty: Vec<u8> = Vec::new();
    let blocks = compressor.create_blocks(&empty).unwrap();
    assert!(blocks.is_empty());
}
#[test]
fn test_block_compression_simple() {
    // Compression output carries at least an 8-byte header plus the payload,
    // and the block's metadata records the compressed length.
    let compressor = BlockCompressor::new();
    let payload = vec![1, 2, 3, 4, 5];
    let mut blocks = compressor.create_blocks(&payload).unwrap();
    let packed = compressor.compress_block(&mut blocks[0]).unwrap();
    assert!(packed.len() >= payload.len() + 8);
    assert_eq!(blocks[0].metadata.compressed_size, packed.len());
}
#[test]
fn test_block_decompression_accuracy() {
    // A compress/decompress round trip must reproduce the original bytes.
    let compressor = BlockCompressor::new();
    let payload = vec![10, 20, 30, 40, 50];
    let mut blocks = compressor.create_blocks(&payload).unwrap();
    let packed = compressor.compress_block(&mut blocks[0]).unwrap();
    let restored = BlockCompressor::decompress_block(&packed).unwrap();
    assert_eq!(restored, payload);
}
#[test]
fn test_block_size_validation() {
    // 1 KiB is acceptable; zero and 2 MiB are rejected by the constructor.
    for &(size, ok) in [(1024, true), (0, false), (2 * 1024 * 1024, false)].iter() {
        assert_eq!(
            BlockCompressor::with_block_size(size).is_ok(),
            ok,
            "unexpected validation result for block size {}",
            size
        );
    }
}
#[test]
fn test_block_metadata_tracking() {
    // Each block records its index and byte offset into the original data;
    // with block size 20 the offsets advance in 20-byte steps.
    let compressor = BlockCompressor::with_block_size(20).unwrap();
    let payload: Vec<u8> = (0..50).collect();
    let blocks = compressor.create_blocks(&payload).unwrap();
    for (i, block) in blocks.iter().enumerate().take(3) {
        assert_eq!(block.metadata.block_index as usize, i);
        assert_eq!(block.metadata.original_offset as usize, i * 20);
    }
}
#[test]
fn test_block_roundtrip() {
    // Every block of a multi-block payload must survive a compress/decompress cycle.
    let compressor = BlockCompressor::new();
    let payload: Vec<u8> = (0..100).map(|i| (i % 256) as u8).collect();
    let mut blocks = compressor.create_blocks(&payload).unwrap();
    for block in blocks.iter_mut() {
        let packed = compressor.compress_block(block).unwrap();
        let unpacked = BlockCompressor::decompress_block(&packed).unwrap();
        assert_eq!(unpacked, block.data);
    }
}
#[test]
fn test_parallel_compression_simple() {
    // 150 bytes at block size 50 -> three blocks, each with a non-zero
    // compressed size recorded in its metadata.
    let compressor = BlockCompressor::with_block_size(50).unwrap();
    let payload: Vec<u8> = (0..150).collect();
    let blocks = compressor.create_blocks(&payload).unwrap();
    let results = compressor.compress_blocks_parallel(blocks).unwrap();
    assert_eq!(results.len(), 3);
    assert!(results.iter().all(|(_bytes, meta)| meta.compressed_size > 0));
}
#[test]
fn test_parallel_decompression() {
    // Parallel compression followed by batch decompression restores the input.
    let compressor = BlockCompressor::with_block_size(30).unwrap();
    let payload: Vec<u8> = (0..100).map(|i| (i % 256) as u8).collect();
    let blocks = compressor.create_blocks(&payload).unwrap();
    let mut results = compressor.compress_blocks_parallel(blocks).unwrap();
    // Keep each metadata entry in sync with the actual compressed byte length
    // before handing the pairs to the decompressor.
    for (packed, meta) in results.iter_mut() {
        meta.compressed_size = packed.len();
    }
    let restored = BlockCompressor::decompress_blocks(&results).unwrap();
    assert_eq!(restored, payload);
}
#[test]
fn test_compression_statistics() {
    // After compressing every block, aggregate stats should report non-zero
    // original and compressed totals.
    let compressor = BlockCompressor::new();
    let data: Vec<u8> = (0..100).collect();
    let mut blocks = compressor.create_blocks(&data).unwrap();
    for block in &mut blocks {
        let _compressed = compressor.compress_block(block).unwrap();
    }
    // `into_iter` yields owned blocks, so the fields can be moved out
    // directly — the previous `.clone()` calls were redundant allocations.
    let results: Vec<(Vec<u8>, BlockMetadata)> = blocks
        .into_iter()
        .map(|b| (b.data, b.metadata))
        .collect();
    let (original, compressed, _ratio) = BlockCompressor::get_compression_stats(&results);
    assert!(original > 0);
    assert!(compressed > 0);
}
#[test]
fn test_block_ordering_preserved() {
    // Parallel compression must return results in original block order.
    let compressor = BlockCompressor::with_block_size(10).unwrap();
    let payload: Vec<u8> = (0..30).collect();
    let blocks = compressor.create_blocks(&payload).unwrap();
    let results = compressor.compress_blocks_parallel(blocks).unwrap();
    for (expected, (_bytes, meta)) in results.iter().enumerate() {
        assert_eq!(meta.block_index as usize, expected);
    }
}
#[test]
fn test_large_file_block_handling() {
    // 10 000 bytes at 1 KiB blocks -> roughly ten blocks, and no block's
    // original size may exceed the configured limit.
    let compressor = BlockCompressor::with_block_size(1024).unwrap();
    let payload: Vec<u8> = (0..10000).map(|i| (i % 256) as u8).collect();
    let blocks = compressor.create_blocks(&payload).unwrap();
    assert!((9..=11).contains(&blocks.len()));
    for block in blocks.iter() {
        assert!(block.metadata.original_size <= 1024);
    }
}
}