use crate::error::TauqError;
use serde_json::Value;
/// Identifies which compression codec applies to a column.
///
/// Every variant has an explicit discriminant; that number is the `u8` tag
/// understood by [`CompressionCodec::from_u8`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionCodec {
    /// Tag `0`.
    Raw = 0,
    /// Tag `1`.
    Delta = 1,
    /// Tag `2`.
    Dictionary = 2,
    /// Tag `3`.
    RunLength = 3,
}

impl CompressionCodec {
    /// Maps a numeric tag back to its codec.
    ///
    /// Returns `None` when `v` does not match the discriminant of any variant.
    pub fn from_u8(v: u8) -> Option<Self> {
        // Compare against each variant's own discriminant so the mapping
        // cannot drift from the enum definition above.
        [Self::Raw, Self::Delta, Self::Dictionary, Self::RunLength]
            .iter()
            .copied()
            .find(|&codec| codec as u8 == v)
    }
}
/// Collects a bounded sample of optional JSON values and, from that sample,
/// suggests a `CompressionCodec` (`choose_codec`) or reports simple
/// statistics (`analyze`).
#[derive(Debug, Clone)]
pub struct CodecAnalyzer {
// Samples retained so far; a `None` entry is counted as a null by `analyze`.
samples: Vec<Option<Value>>,
// Upper bound on `samples.len()`; `add_sample` ignores values beyond it.
sample_size: usize,
}
impl CodecAnalyzer {
    /// Creates an analyzer that retains at most `sample_size` samples.
    ///
    /// Storage for `sample_size` entries is reserved up front.
    pub fn new(sample_size: usize) -> Self {
        Self {
            samples: Vec::with_capacity(sample_size),
            sample_size,
        }
    }

    /// Records one sample (`None` stands for a null).
    ///
    /// Once `sample_size` samples have been stored, further calls are
    /// silently ignored.
    pub fn add_sample(&mut self, value: Option<Value>) {
        if self.samples.len() < self.sample_size {
            self.samples.push(value);
        }
    }

    /// Suggests a codec for the sampled column.
    ///
    /// Nulls are ignored. The heuristics are tried in a fixed priority order
    /// and the first match wins: run-length, then delta, then dictionary.
    /// Returns [`CompressionCodec::Raw`] when no heuristic matches or when
    /// there are no non-null samples at all.
    pub fn choose_codec(&self) -> CompressionCodec {
        let non_null: Vec<&Value> = self.samples.iter().flatten().collect();
        // Covers both "no samples" and "only nulls".
        if non_null.is_empty() {
            return CompressionCodec::Raw;
        }

        if self.check_rle(&non_null) {
            CompressionCodec::RunLength
        } else if self.check_delta(&non_null) {
            CompressionCodec::Delta
        } else if self.check_dictionary(&non_null) {
            CompressionCodec::Dictionary
        } else {
            CompressionCodec::Raw
        }
    }

    /// Run-length heuristic: with at least 10 values, returns `true` when
    /// more than 30% of them sit inside runs of three or more equal
    /// consecutive values.
    fn check_rle(&self, values: &[&Value]) -> bool {
        if values.len() < 10 {
            return false;
        }

        let mut in_long_runs = 0usize;
        let mut run = 1usize;
        for pair in values.windows(2) {
            if pair[0] == pair[1] {
                run += 1;
            } else {
                if run >= 3 {
                    in_long_runs += run;
                }
                run = 1;
            }
        }
        // The final run is never closed by a differing successor.
        if run >= 3 {
            in_long_runs += run;
        }

        in_long_runs as f64 / values.len() as f64 > 0.3
    }

    /// Delta heuristic: with at least 10 values, returns `true` when *every*
    /// value is a number representable as `f64` and the sequence is
    /// monotonic (non-decreasing or non-increasing).
    ///
    /// A single non-numeric value disqualifies the column: skipping such
    /// values would let a mixed-type column be reported as delta-encodable.
    fn check_delta(&self, values: &[&Value]) -> bool {
        if values.len() < 10 {
            return false;
        }

        // `collect` into `Option<Vec<_>>` short-circuits to `None` on the
        // first value that is not a representable number.
        let numbers: Option<Vec<f64>> = values
            .iter()
            .map(|v| match v {
                Value::Number(n) => n.as_f64(),
                _ => None,
            })
            .collect();
        let Some(numbers) = numbers else {
            return false;
        };

        let ascending = numbers.windows(2).all(|w| w[0] <= w[1]);
        let descending = numbers.windows(2).all(|w| w[0] >= w[1]);
        ascending || descending
    }

    /// Dictionary heuristic: with at least 20 values, returns `true` when
    /// the number of distinct values (compared by their `to_string()`
    /// rendering) is at most `max(len / 4, 10)`.
    fn check_dictionary(&self, values: &[&Value]) -> bool {
        if values.len() < 20 {
            return false;
        }

        let distinct: std::collections::HashSet<String> =
            values.iter().map(|v| v.to_string()).collect();
        let max_cardinality = (values.len() / 4).max(10);

        // With len >= 20 the limit is always strictly below len, so staying
        // within it already guarantees at least one repeated value; no
        // separate "has duplicates" check is needed.
        distinct.len() <= max_cardinality
    }

    /// Summarizes the current sample: total samples, how many are null, and
    /// how many distinct non-null values there are.
    pub fn analyze(&self) -> CodecAnalysis {
        CodecAnalysis {
            sample_count: self.samples.len(),
            null_count: self.samples.iter().filter(|v| v.is_none()).count(),
            unique_values: self.count_unique_values(),
        }
    }

    /// Counts distinct non-null samples, compared by their `to_string()`
    /// rendering.
    fn count_unique_values(&self) -> usize {
        self.samples
            .iter()
            .flatten()
            .map(|v| v.to_string())
            .collect::<std::collections::HashSet<_>>()
            .len()
    }
}
/// Summary statistics produced by `CodecAnalyzer::analyze`.
#[derive(Debug, Clone)]
pub struct CodecAnalysis {
/// Number of samples held by the analyzer, nulls included.
pub sample_count: usize,
/// Number of samples that were `None`.
pub null_count: usize,
/// Number of distinct non-null samples (compared by `to_string()` output).
pub unique_values: usize,
}
impl Default for CodecAnalyzer {
fn default() -> Self {
Self::new(100)
}
}
/// Incremental delta encoder for `i64` sequences.
///
/// Each encoded value is stored as its difference from the previously
/// encoded value (or from the base given to [`DeltaEncoder::new`] for the
/// first one). Differences are computed with wrapping arithmetic, so any
/// sequence of `i64` values round-trips exactly through
/// [`DeltaEncoder::decode`], including sequences whose true difference does
/// not fit in an `i64`.
#[derive(Debug, Clone)]
pub struct DeltaEncoder {
    /// Reference point for the next delta: the constructor's base at first,
    /// then the most recently encoded value.
    base: i64,
    /// Deltas in encoding order.
    deltas: Vec<i64>,
}

impl DeltaEncoder {
    /// Creates an encoder whose first delta is taken relative to `base`.
    pub fn new(base: i64) -> Self {
        Self {
            base,
            deltas: Vec::new(),
        }
    }

    /// Appends the delta between `value` and the previous reference point,
    /// then makes `value` the new reference point.
    ///
    /// Uses wrapping subtraction: a plain `-` would panic in debug builds
    /// (and wrap silently in release builds) for far-apart values such as
    /// `i64::MIN` followed by `i64::MAX`.
    pub fn encode(&mut self, value: i64) {
        self.deltas.push(value.wrapping_sub(self.base));
        self.base = value;
    }

    /// Returns the deltas recorded so far, in encoding order.
    pub fn deltas(&self) -> &[i64] {
        &self.deltas
    }

    /// Rebuilds the value sequence by accumulating the stored deltas onto
    /// `initial`.
    ///
    /// The result starts with `initial` itself, so it has
    /// `deltas().len() + 1` elements. The encoder does not remember its
    /// original base, so pass the same value that was given to `new` to get
    /// the encoded values back. Accumulation wraps, mirroring `encode`.
    pub fn decode(&self, initial: i64) -> Vec<i64> {
        let mut result = Vec::with_capacity(self.deltas.len() + 1);
        result.push(initial);

        let mut current = initial;
        for &delta in &self.deltas {
            current = current.wrapping_add(delta);
            result.push(current);
        }
        result
    }
}
/// Dictionary encoder: keeps each distinct value once and represents the
/// encoded sequence as indices into that list.
#[derive(Debug, Clone)]
pub struct DictionaryEncoder {
// Distinct values in order of first appearance.
dictionary: Vec<Value>,
// One entry per encoded value: its position in `dictionary`.
indices: Vec<u32>,
}
impl DictionaryEncoder {
    /// Creates an encoder with an empty dictionary and no encoded values.
    pub fn new() -> Self {
        Self {
            dictionary: Vec::new(),
            indices: Vec::new(),
        }
    }

    /// Encodes `value`, adding it to the dictionary if it has not been seen
    /// before, and records its dictionary index.
    ///
    /// The dictionary is searched linearly using `Value` equality.
    ///
    /// # Errors
    ///
    /// The current implementation never fails and always returns `Ok(())`.
    pub fn encode(&mut self, value: &Value) -> Result<(), TauqError> {
        let slot = match self.dictionary.iter().position(|known| known == value) {
            Some(existing) => existing,
            None => {
                self.dictionary.push(value.clone());
                self.dictionary.len() - 1
            }
        };
        // NOTE(review): narrowing cast; a dictionary with more than
        // `u32::MAX` entries would wrap here.
        self.indices.push(slot as u32);
        Ok(())
    }

    /// Returns the distinct values in order of first appearance.
    pub fn dictionary(&self) -> &[Value] {
        &self.dictionary
    }

    /// Returns the recorded dictionary indices, one per encoded value.
    pub fn indices(&self) -> &[u32] {
        &self.indices
    }

    /// Reconstructs the encoded sequence by looking each index up in the
    /// dictionary and cloning the value found there.
    pub fn decode(&self) -> Vec<Value> {
        let mut decoded = Vec::with_capacity(self.indices.len());
        for &slot in &self.indices {
            decoded.push(self.dictionary[slot as usize].clone());
        }
        decoded
    }
}
impl Default for DictionaryEncoder {
fn default() -> Self {
Self::new()
}
}
/// One run produced by `RLEEncoder`: a value together with the number of
/// consecutive times it occurred.
#[derive(Debug, Clone, PartialEq)]
pub struct RunLengthValue {
/// The repeated value.
pub value: Value,
/// How many consecutive occurrences this run stands for.
pub count: u32,
}
/// Run-length encoder: collapses consecutive equal values into
/// `RunLengthValue` runs.
#[derive(Debug, Clone)]
pub struct RLEEncoder {
// Runs in encoding order.
runs: Vec<RunLengthValue>,
}
impl RLEEncoder {
    /// Creates an encoder with no runs.
    pub fn new() -> Self {
        Self { runs: Vec::new() }
    }

    /// Appends `value` to the encoded sequence.
    ///
    /// If `value` equals the value of the most recent run, that run's count
    /// is incremented; otherwise a new run of length one is started. A run
    /// whose counter has reached `u32::MAX` is treated as closed and a new
    /// run is started, so the counter can never overflow and `decode` stays
    /// exact for arbitrarily long repeats.
    pub fn encode(&mut self, value: &Value) {
        match self.runs.last_mut() {
            Some(last) if last.value == *value && last.count < u32::MAX => {
                last.count += 1;
            }
            _ => self.runs.push(RunLengthValue {
                value: value.clone(),
                count: 1,
            }),
        }
    }

    /// Returns the runs recorded so far, in encoding order.
    pub fn runs(&self) -> &[RunLengthValue] {
        &self.runs
    }

    /// Expands every run back into `count` clones of its value, preserving
    /// order.
    pub fn decode(&self) -> Vec<Value> {
        self.runs
            .iter()
            .flat_map(|run| std::iter::repeat_n(&run.value, run.count as usize))
            .cloned()
            .collect()
    }
}
impl Default for RLEEncoder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds an analyzer with room for `capacity` samples and offers it
    /// every item of `samples`, in order.
    fn analyzer_with(
        capacity: usize,
        samples: impl IntoIterator<Item = Option<serde_json::Value>>,
    ) -> CodecAnalyzer {
        let mut analyzer = CodecAnalyzer::new(capacity);
        for sample in samples {
            analyzer.add_sample(sample);
        }
        analyzer
    }

    // A constant column is one long run.
    #[test]
    fn test_codec_analyzer_rle_detection() {
        let analyzer = analyzer_with(100, (0..20).map(|_| Some(json!(true))));
        assert_eq!(analyzer.choose_codec(), CompressionCodec::RunLength);
    }

    // Strictly increasing integers with no repeats.
    #[test]
    fn test_codec_analyzer_delta_detection() {
        let analyzer = analyzer_with(100, (0..50).map(|i| Some(json!(i * 10))));
        assert_eq!(analyzer.choose_codec(), CompressionCodec::Delta);
    }

    // Three distinct strings cycling without long runs.
    #[test]
    fn test_codec_analyzer_dictionary_detection() {
        let names = ["alice", "bob", "alice", "carol", "bob", "alice"];
        let column = names
            .iter()
            .cycle()
            .take(names.len() * 5)
            .map(|name| Some(json!(name)));
        let analyzer = analyzer_with(100, column);
        assert_eq!(analyzer.choose_codec(), CompressionCodec::Dictionary);
    }

    #[test]
    fn test_delta_encoder() {
        let mut delta = DeltaEncoder::new(100);
        for value in [102, 105, 107] {
            delta.encode(value);
        }
        assert_eq!(delta.deltas(), &[2, 3, 2]);
        assert_eq!(delta.decode(100), vec![100, 102, 105, 107]);
    }

    #[test]
    fn test_dictionary_encoder() {
        let mut dict = DictionaryEncoder::new();
        for name in ["alice", "bob", "alice", "carol"] {
            dict.encode(&json!(name)).unwrap();
        }
        assert_eq!(dict.dictionary().len(), 3);
        assert_eq!(dict.indices(), &[0, 1, 0, 2]);
        assert_eq!(
            dict.decode(),
            vec![json!("alice"), json!("bob"), json!("alice"), json!("carol")]
        );
    }

    #[test]
    fn test_rle_encoder() {
        let input = [true, true, true, false, false, true];
        let mut rle = RLEEncoder::new();
        for flag in input {
            rle.encode(&json!(flag));
        }

        let counts: Vec<u32> = rle.runs().iter().map(|run| run.count).collect();
        assert_eq!(counts, vec![3, 2, 1]);

        let expected: Vec<_> = input.iter().map(|&flag| json!(flag)).collect();
        assert_eq!(rle.decode(), expected);
    }

    #[test]
    fn test_codec_analysis() {
        let analyzer = analyzer_with(50, (0..30).map(|i| Some(json!(i))));
        let CodecAnalysis {
            sample_count,
            null_count,
            unique_values,
        } = analyzer.analyze();
        assert_eq!(sample_count, 30);
        assert_eq!(null_count, 0);
        assert_eq!(unique_values, 30);
    }

    // Every fifth sample (0, 5, ..., 25) is a null.
    #[test]
    fn test_codec_analysis_with_nulls() {
        let column = (0..30).map(|i| (i % 5 != 0).then(|| json!(i)));
        let analyzer = analyzer_with(50, column);
        assert_eq!(analyzer.analyze().null_count, 6);
    }

    #[test]
    fn test_raw_codec_default() {
        let empty = CodecAnalyzer::new(100);
        assert_eq!(empty.choose_codec(), CompressionCodec::Raw);
    }

    #[test]
    fn test_compression_codec_from_u8() {
        let expectations = [
            (0, Some(CompressionCodec::Raw)),
            (1, Some(CompressionCodec::Delta)),
            (2, Some(CompressionCodec::Dictionary)),
            (3, Some(CompressionCodec::RunLength)),
            (99, None),
        ];
        for (tag, expected) in expectations {
            assert_eq!(CompressionCodec::from_u8(tag), expected);
        }
    }

    #[test]
    fn test_delta_encoder_empty() {
        let delta = DeltaEncoder::new(100);
        assert!(delta.deltas().is_empty());
        assert_eq!(delta.decode(100), vec![100]);
    }

    #[test]
    fn test_dictionary_encoder_single_value() {
        let mut dict = DictionaryEncoder::new();
        for _ in 0..3 {
            dict.encode(&json!(42)).unwrap();
        }
        assert_eq!(dict.dictionary().len(), 1);
        assert_eq!(dict.indices(), &[0, 0, 0]);
    }
}