use super::adaptive_encode::{
CodecAnalyzer, CompressionCodec, DeltaEncoder, DictionaryEncoder, RLEEncoder,
};
use crate::error::TauqError;
use serde_json::Value;
/// Sampling and encoding state used to pick a compression codec adaptively.
///
/// Values are first handed to `add_sample`; once `sample_threshold` samples have
/// been collected the analyzer chooses a codec and the encoder field matching
/// that codec is populated. The other encoder fields stay `None`.
#[derive(Debug, Clone)]
pub struct CodecEncodingContext {
/// Analyzer that receives every sampled value and is asked to choose the codec.
pub analyzer: CodecAnalyzer,
/// Codec chosen by the analyzer; `None` while sampling is still in progress.
pub selected_codec: Option<CompressionCodec>,
/// Number of samples to collect before a codec is chosen.
pub sample_threshold: usize,
/// Number of samples handed to the analyzer so far.
pub items_collected: usize,
/// Delta encoder; populated only when the selected codec is `Delta`.
pub delta_encoder: Option<DeltaEncoder>,
/// Dictionary encoder; populated only when the selected codec is `Dictionary`.
pub dict_encoder: Option<DictionaryEncoder>,
/// Run-length encoder; populated only when the selected codec is `RunLength`.
pub rle_encoder: Option<RLEEncoder>,
}
impl CodecEncodingContext {
pub fn new(sample_threshold: usize) -> Self {
Self {
analyzer: CodecAnalyzer::new(sample_threshold),
selected_codec: None,
sample_threshold,
items_collected: 0,
delta_encoder: None,
dict_encoder: None,
rle_encoder: None,
}
}
pub fn add_sample(&mut self, value: Option<&Value>) {
if self.selected_codec.is_none() && self.items_collected < self.sample_threshold {
self.analyzer.add_sample(value.cloned());
self.items_collected += 1;
if self.items_collected >= self.sample_threshold {
self.selected_codec = Some(self.analyzer.choose_codec());
self.initialize_codec_encoder();
}
}
}
fn initialize_codec_encoder(&mut self) {
match self.selected_codec {
Some(CompressionCodec::Delta) => {
self.delta_encoder = Some(DeltaEncoder::new(0));
}
Some(CompressionCodec::Dictionary) => {
self.dict_encoder = Some(DictionaryEncoder::new());
}
Some(CompressionCodec::RunLength) => {
self.rle_encoder = Some(RLEEncoder::new());
}
_ => {
}
}
}
pub fn is_codec_selected(&self) -> bool {
self.selected_codec.is_some()
}
pub fn get_selected_codec(&self) -> Option<CompressionCodec> {
self.selected_codec
}
pub fn encode_value(&mut self, value: &Value) -> Result<(), TauqError> {
match self.selected_codec {
Some(CompressionCodec::Delta) => {
if let Some(num) = value.as_i64() {
if let Some(ref mut encoder) = self.delta_encoder {
encoder.encode(num);
Ok(())
} else {
Err(TauqError::Interpret(crate::error::InterpretError::new(
"Delta encoder not initialized",
)))
}
} else {
Ok(())
}
}
Some(CompressionCodec::Dictionary) => {
if let Some(ref mut encoder) = self.dict_encoder {
encoder.encode(value)
} else {
Err(TauqError::Interpret(crate::error::InterpretError::new(
"Dictionary encoder not initialized",
)))
}
}
Some(CompressionCodec::RunLength) => {
if let Some(ref mut encoder) = self.rle_encoder {
encoder.encode(value);
Ok(())
} else {
Err(TauqError::Interpret(crate::error::InterpretError::new(
"RLE encoder not initialized",
)))
}
}
Some(CompressionCodec::Raw) | None => {
Ok(())
}
}
}
pub fn get_codec_metadata(&self) -> CodecMetadata {
match self.selected_codec {
Some(CompressionCodec::Delta) => {
if let Some(_encoder) = &self.delta_encoder {
CodecMetadata::Delta {
initial_value: 0i64,
}
} else {
CodecMetadata::None
}
}
Some(CompressionCodec::Dictionary) => {
if let Some(encoder) = &self.dict_encoder {
CodecMetadata::Dictionary {
dictionary_size: encoder.dictionary().len() as u32,
}
} else {
CodecMetadata::None
}
}
Some(CompressionCodec::RunLength) => CodecMetadata::RLE,
Some(CompressionCodec::Raw) | None => CodecMetadata::None,
}
}
}
/// Codec-specific metadata that can be serialized with `CodecMetadata::encode`.
#[derive(Debug, Clone)]
pub enum CodecMetadata {
/// No metadata; encodes to an empty byte vector.
None,
/// Metadata for the delta codec.
Delta {
/// Starting value of the delta sequence; written as a signed varint.
initial_value: i64,
},
/// Metadata for the dictionary codec.
Dictionary {
/// Number of dictionary entries; written as an unsigned varint.
dictionary_size: u32,
},
/// Marker for the run-length codec; encodes to an empty byte vector.
RLE,
}
impl CodecMetadata {
    /// Serializes the metadata into a freshly allocated byte vector.
    ///
    /// `Delta` writes its `initial_value` as a signed varint and `Dictionary`
    /// writes its `dictionary_size` as an unsigned varint. `None` and `RLE`
    /// carry no payload and yield an empty vector.
    pub fn encode(&self) -> Vec<u8> {
        let mut bytes = Vec::new();
        match self {
            Self::Delta { initial_value } => {
                super::varint::encode_signed_varint(*initial_value, &mut bytes);
            }
            Self::Dictionary { dictionary_size } => {
                super::varint::encode_varint(u64::from(*dictionary_size), &mut bytes);
            }
            // Variants without a payload leave the buffer empty.
            Self::None | Self::RLE => {}
        }
        bytes
    }

    /// Returns the number of bytes `encode` produces.
    ///
    /// The length is obtained by performing a full encode and measuring it.
    pub fn size(&self) -> usize {
        let encoded = self.encode();
        encoded.len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A new context records the threshold and starts with nothing sampled or selected.
    #[test]
    fn test_codec_encoding_context_creation() {
        let ctx = CodecEncodingContext::new(100);
        assert_eq!(ctx.sample_threshold, 100);
        assert_eq!(ctx.items_collected, 0);
        assert!(ctx.selected_codec.is_none());
    }

    /// Reaching the threshold with sequential integers selects the delta codec.
    #[test]
    fn test_sampling_and_codec_selection() {
        let mut ctx = CodecEncodingContext::new(10);
        for i in 0..10 {
            ctx.add_sample(Some(&json!(i)));
        }
        assert!(ctx.is_codec_selected());
        assert_eq!(ctx.selected_codec, Some(CompressionCodec::Delta));
    }

    /// Integer values can be encoded after sampling sequential integers.
    #[test]
    fn test_delta_encoding() {
        let mut ctx = CodecEncodingContext::new(5);
        for i in 0..5 {
            ctx.add_sample(Some(&json!(i)));
        }
        assert!(ctx.encode_value(&json!(0)).is_ok());
        assert!(ctx.encode_value(&json!(2)).is_ok());
        assert!(ctx.encode_value(&json!(5)).is_ok());
    }

    /// Repeated strings select the dictionary codec and encode successfully.
    #[test]
    fn test_dictionary_encoding() {
        let mut ctx = CodecEncodingContext::new(20);
        for _ in 0..5 {
            ctx.add_sample(Some(&json!("alice")));
            ctx.add_sample(Some(&json!("bob")));
            ctx.add_sample(Some(&json!("carol")));
            ctx.add_sample(Some(&json!("alice")));
        }
        assert!(ctx.is_codec_selected());
        assert_eq!(ctx.selected_codec, Some(CompressionCodec::Dictionary));
        assert!(ctx.encode_value(&json!("alice")).is_ok());
        assert!(ctx.encode_value(&json!("bob")).is_ok());
        assert!(ctx.encode_value(&json!("alice")).is_ok());
    }

    /// Long runs of identical booleans select the run-length codec.
    #[test]
    fn test_rle_encoding() {
        let mut ctx = CodecEncodingContext::new(20);
        for _ in 0..6 {
            ctx.add_sample(Some(&json!(true)));
        }
        for _ in 0..8 {
            ctx.add_sample(Some(&json!(false)));
        }
        for _ in 0..6 {
            ctx.add_sample(Some(&json!(true)));
        }
        assert!(ctx.is_codec_selected());
        assert_eq!(ctx.selected_codec, Some(CompressionCodec::RunLength));
        assert!(ctx.encode_value(&json!(true)).is_ok());
        assert!(ctx.encode_value(&json!(true)).is_ok());
    }

    /// Delta and dictionary metadata both serialize to at least one byte.
    #[test]
    fn test_codec_metadata_encode() {
        let metadata = CodecMetadata::Delta {
            initial_value: 100i64,
        };
        let encoded = metadata.encode();
        assert!(!encoded.is_empty());

        let metadata2 = CodecMetadata::Dictionary {
            dictionary_size: 50,
        };
        let encoded2 = metadata2.encode();
        assert!(!encoded2.is_empty());
    }

    /// `size` agrees with the length of the encoded bytes.
    #[test]
    fn test_codec_metadata_size() {
        let metadata = CodecMetadata::Delta {
            initial_value: 42i64,
        };
        let size = metadata.size();
        let encoded_size = metadata.encode().len();
        assert_eq!(size, encoded_size);
    }

    /// Payload-free metadata variants encode to nothing.
    #[test]
    fn test_no_codec_metadata() {
        let metadata = CodecMetadata::None;
        assert_eq!(metadata.size(), 0);
        assert!(metadata.encode().is_empty());

        let rle_metadata = CodecMetadata::RLE;
        assert_eq!(rle_metadata.size(), 0);
    }

    /// A non-numeric value offered after integer sampling is accepted, not rejected.
    #[test]
    fn test_non_numeric_delta_fallback() {
        let mut ctx = CodecEncodingContext::new(5);
        for i in 0..5 {
            ctx.add_sample(Some(&json!(i)));
        }
        let result = ctx.encode_value(&json!("not a number"));
        // The previous `is_ok() || is_err()` check could never fail. The delta
        // path skips values that are not integers and reports `Ok(())`.
        assert!(result.is_ok());
    }

    /// Only the encoder for the selected codec is allocated.
    #[test]
    fn test_codec_encoder_initialization() {
        let mut ctx = CodecEncodingContext::new(10);
        for i in 0..10 {
            ctx.add_sample(Some(&json!(i)));
        }
        assert!(ctx.delta_encoder.is_some());
        assert!(ctx.dict_encoder.is_none());
        assert!(ctx.rle_encoder.is_none());
    }
}