use super::transform::TransformResult;
/// Byte that separates adjacent columns, both in the input and in the encoded stream.
const COL_SEP: u8 = 0;
/// Byte that separates adjacent values inside a single column.
const VAL_SEP: u8 = 1;
/// Lowest byte used as a dictionary code: code `DICT_CODE_START + i` stands for dictionary
/// entry `i`, which keeps every code distinct from `COL_SEP` and `VAL_SEP`.
const DICT_CODE_START: u8 = 2;
/// Largest number of distinct values a column may contain and still be dictionary-encoded.
const MAX_DICT_ENTRIES: usize = 253;
/// Metadata byte recording that a column was stored verbatim, without a dictionary.
const RAW_COLUMN_MARKER: u8 = u8::MAX;
/// Format version written as the first byte of the metadata.
const VDICT_VERSION: u8 = 1;
pub fn preprocess(data: &[u8]) -> Option<TransformResult> {
if data.is_empty() {
return None;
}
let columns: Vec<&[u8]> = split_columns(data);
if columns.is_empty() {
return None;
}
let mut col_analyses: Vec<ColumnAnalysis> = Vec::with_capacity(columns.len());
let mut any_dictable = false;
for col_data in &columns {
let analysis = analyze_column(col_data);
if analysis.should_dict {
any_dictable = true;
}
col_analyses.push(analysis);
}
if !any_dictable {
return None; }
let mut output = Vec::with_capacity(data.len());
let mut metadata = Vec::new();
metadata.push(VDICT_VERSION);
metadata.extend_from_slice(&(columns.len() as u16).to_le_bytes());
for (ci, (col_data, analysis)) in columns.iter().zip(col_analyses.iter()).enumerate() {
if analysis.should_dict {
let dict = &analysis.dictionary;
metadata.push(dict.len() as u8);
for entry in dict {
metadata.extend_from_slice(&(entry.len() as u32).to_le_bytes());
metadata.extend_from_slice(entry);
}
let values = split_values(col_data);
for (vi, val) in values.iter().enumerate() {
if let Some(idx) = dict.iter().position(|d| d.as_slice() == *val) {
output.push(DICT_CODE_START + idx as u8);
} else {
output.push(DICT_CODE_START);
}
if vi < values.len() - 1 {
output.push(VAL_SEP);
}
}
} else {
metadata.push(RAW_COLUMN_MARKER);
output.extend_from_slice(col_data);
}
if ci < columns.len() - 1 {
output.push(COL_SEP);
}
}
if output.len() + metadata.len() >= data.len() {
return None;
}
Some(TransformResult {
data: output,
metadata,
})
}
/// Undoes the dictionary transform, rebuilding the original bytes from `data` and `metadata`.
///
/// The metadata is parsed as: one version byte (not inspected), a little-endian `u16` column
/// count, then per column either `RAW_COLUMN_MARKER` or an entry count followed by
/// length-prefixed (little-endian `u32`) dictionary entries.
///
/// If the metadata is empty or ends before everything it announces has been read, `data` is
/// returned unchanged. In a dictionary column, a single-byte value at or above
/// `DICT_CODE_START` is replaced by the entry it indexes (and contributes nothing when the
/// index is out of range); any other value is copied through as-is. Raw columns are copied
/// verbatim. Decoding stops at whichever runs out first: described columns or encoded columns.
pub fn reverse(data: &[u8], metadata: &[u8]) -> Vec<u8> {
    // Header is 3 bytes (version + column count); this also covers empty metadata.
    if metadata.len() < 3 {
        return data.to_vec();
    }
    let column_count = usize::from(u16::from_le_bytes([metadata[1], metadata[2]]));
    let mut cursor = 3;

    // One slot per column: `None` for raw columns, `Some(entries)` for dictionary columns.
    let mut tables: Vec<Option<Vec<Vec<u8>>>> = Vec::with_capacity(column_count);
    for _ in 0..column_count {
        let tag = match metadata.get(cursor) {
            Some(&byte) => byte,
            None => return data.to_vec(),
        };
        cursor += 1;
        if tag == RAW_COLUMN_MARKER {
            tables.push(None);
            continue;
        }

        let entry_count = usize::from(tag);
        let mut entries = Vec::with_capacity(entry_count);
        for _ in 0..entry_count {
            if cursor + 4 > metadata.len() {
                return data.to_vec();
            }
            let entry_len = u32::from_le_bytes([
                metadata[cursor],
                metadata[cursor + 1],
                metadata[cursor + 2],
                metadata[cursor + 3],
            ]) as usize;
            cursor += 4;
            if cursor + entry_len > metadata.len() {
                return data.to_vec();
            }
            entries.push(metadata[cursor..cursor + entry_len].to_vec());
            cursor += entry_len;
        }
        tables.push(Some(entries));
    }

    let encoded = split_columns(data);
    let mut restored = Vec::with_capacity(data.len() * 2);
    // `zip` ends at the shorter side, so surplus tables or surplus encoded columns are ignored.
    for (ci, (table, column)) in tables.iter().zip(encoded.iter()).enumerate() {
        match table {
            None => restored.extend_from_slice(column),
            Some(entries) => {
                for (vi, value) in split_values(column).into_iter().enumerate() {
                    if vi > 0 {
                        restored.push(VAL_SEP);
                    }
                    match value {
                        &[code] if code >= DICT_CODE_START => {
                            let index = usize::from(code - DICT_CODE_START);
                            if let Some(entry) = entries.get(index) {
                                restored.extend_from_slice(entry);
                            }
                        }
                        literal => restored.extend_from_slice(literal),
                    }
                }
            }
        }
        // A separator follows every described column except the last one.
        if ci + 1 < tables.len() {
            restored.push(COL_SEP);
        }
    }
    restored
}
/// Splits `data` into column slices at every `COL_SEP` byte.
///
/// The separators themselves are not part of any returned slice. The result always holds one
/// more slice than there are separators, so empty input yields a single empty column and a
/// trailing separator yields a trailing empty column.
fn split_columns(data: &[u8]) -> Vec<&[u8]> {
    data.split(|&byte| byte == COL_SEP).collect()
}
/// Splits one column into its value slices at every `VAL_SEP` byte.
///
/// The separators themselves are not part of any returned slice. The result always holds one
/// more slice than there are separators, so an empty column yields a single empty value.
fn split_values(col: &[u8]) -> Vec<&[u8]> {
    col.split(|&byte| byte == VAL_SEP).collect()
}
/// Result of inspecting a single column, as produced by `analyze_column`.
struct ColumnAnalysis {
/// `true` when dictionary-encoding the column was estimated to be worthwhile.
should_dict: bool,
/// Distinct values of the column, most frequent first; left empty when `should_dict` is
/// `false`. A value's position here is its dictionary index.
dictionary: Vec<Vec<u8>>,
}
/// Decides whether a column is worth dictionary-encoding and, if so, builds its dictionary.
///
/// The column is rejected (`should_dict == false`, empty dictionary) when it has more than
/// `MAX_DICT_ENTRIES` distinct values, or when the estimated encoded size (one code byte per
/// value, the value separators, and the serialized dictionary) would not beat the raw size by
/// more than 10%.
///
/// On acceptance the dictionary lists every distinct value, most frequent first. Values with
/// equal frequency are ordered lexicographically by their bytes, so the same input always
/// produces the same dictionary; without the tie-break the order would follow the `HashMap`
/// iteration order, which differs from run to run.
fn analyze_column(col_data: &[u8]) -> ColumnAnalysis {
    let rejected = || ColumnAnalysis {
        should_dict: false,
        dictionary: Vec::new(),
    };

    let values = split_values(col_data);
    if values.is_empty() {
        return rejected();
    }

    let mut freq: std::collections::HashMap<&[u8], usize> = std::collections::HashMap::new();
    for val in &values {
        *freq.entry(*val).or_insert(0) += 1;
    }
    if freq.len() > MAX_DICT_ENTRIES {
        return rejected();
    }

    let current_size = col_data.len();
    // Serialized dictionary: a 4-byte length prefix per entry plus the entry bytes, and one
    // byte for the entry count.
    let dict_overhead: usize = freq.keys().map(|k| 4 + k.len()).sum::<usize>() + 1;
    // Encoded column: one code byte per value plus the separators between values.
    let encoded_data_size = values.len() + values.len().saturating_sub(1);
    let dict_total = encoded_data_size + dict_overhead;
    let min_savings = current_size / 10;
    if current_size <= dict_total + min_savings {
        return rejected();
    }

    let mut sorted: Vec<(&[u8], usize)> = freq.into_iter().collect();
    // Keys are unique, so (count desc, bytes asc) is a total order and an unstable sort is safe.
    sorted.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(b.0)));
    ColumnAnalysis {
        should_dict: true,
        dictionary: sorted.into_iter().map(|(k, _)| k.to_vec()).collect(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Concatenates `parts`, inserting `sep` between neighbouring parts.
    fn join_with(parts: &[Vec<u8>], sep: u8) -> Vec<u8> {
        parts.join(&sep)
    }

    /// Three columns (two repetitive, one all-unique) must survive a preprocess/reverse cycle.
    #[test]
    fn roundtrip_simple() {
        let events: Vec<Vec<u8>> = (0..20)
            .map(|i| {
                if i % 3 == 0 {
                    b"page_view".to_vec()
                } else {
                    b"api_call".to_vec()
                }
            })
            .collect();
        let users: Vec<Vec<u8>> = (0..20)
            .map(|i| {
                if i % 2 == 0 {
                    b"alice".to_vec()
                } else {
                    b"bob".to_vec()
                }
            })
            .collect();
        let ids: Vec<Vec<u8>> = (0..20)
            .map(|i| format!("unique{i}").into_bytes())
            .collect();

        let input = join_with(
            &[
                join_with(&events, VAL_SEP),
                join_with(&users, VAL_SEP),
                join_with(&ids, VAL_SEP),
            ],
            COL_SEP,
        );

        let result = preprocess(&input);
        assert!(result.is_some(), "should apply dict transform");
        let result = result.unwrap();
        let recovered = reverse(&result.data, &result.metadata);
        assert_eq!(recovered, input, "roundtrip failed");
    }

    /// Columns made only of distinct values give the transform nothing to gain.
    #[test]
    fn roundtrip_all_unique() {
        let input = b"a\x01b\x01c\x00d\x01e\x01f";
        let result = preprocess(input);
        assert!(result.is_none(), "should not apply dict when all unique");
    }

    /// A single column repeating one value shrinks and decodes back to the input.
    #[test]
    fn roundtrip_single_column() {
        let input = b"hello\x01hello\x01hello\x01hello\x01hello\x01hello\x01hello\x01hello\x01hello\x01hello";
        let result = preprocess(input);
        assert!(result.is_some(), "should apply dict for repeated values");
        let result = result.unwrap();
        assert!(result.data.len() < input.len(), "should be smaller");
        let recovered = reverse(&result.data, &result.metadata);
        assert_eq!(recovered, input.to_vec(), "roundtrip failed");
    }

    /// Five values cycled over 100 rows must shrink the data by more than 70%.
    #[test]
    fn saves_space() {
        let kinds = [
            &b"page_view"[..],
            &b"api_call"[..],
            &b"click"[..],
            &b"scroll"[..],
            &b"form_submit"[..],
        ];
        let rows: Vec<Vec<u8>> = (0..100).map(|i| kinds[i % 5].to_vec()).collect();
        let col_data = join_with(&rows, VAL_SEP);

        let result = preprocess(&col_data);
        assert!(result.is_some());
        let result = result.unwrap();
        let saving_pct = (col_data.len() - result.data.len()) * 100 / col_data.len();
        assert!(
            saving_pct > 70,
            "should save >70% on repeated values, got {saving_pct}%"
        );
        let recovered = reverse(&result.data, &result.metadata);
        assert_eq!(recovered, col_data, "roundtrip failed");
    }
}