use crate::storage::timeseries::compression::{
delta_decode_timestamps, delta_encode_timestamps, t64_decode, t64_encode, xor_decode_values,
xor_encode_values, zstd_compress, zstd_decompress,
};
/// A single stage in a column encoding chain.
///
/// `encode_bytes` applies the stages of a chain in slice order and
/// `decode_bytes` undoes them in reverse. Every variant has a one-byte wire
/// tag (see `ColumnCodec::tag`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnCodec {
    /// Pass-through stage: the bytes are copied unchanged.
    None,
    /// LZ4 block compression; the encoded form carries a `u32` LE length prefix.
    Lz4,
    /// Zstandard compression.
    Zstd {
        /// Compression level; written into the chain header right after the tag.
        level: i32,
    },
    /// Delta coding over a stream of 8-byte little-endian integers.
    Delta,
    /// Shares the `Delta` encode/decode path in this file; only the wire tag differs.
    DoubleDelta,
    /// Dictionary coding of a length-prefixed entry stream
    /// (`u32` count, then `u16` length + bytes per entry).
    Dict,
    /// XOR coding over a stream of 8-byte little-endian `f64` values.
    Xor,
}
impl ColumnCodec {
pub fn tag(&self) -> u8 {
match self {
ColumnCodec::None => 0,
ColumnCodec::Lz4 => 1,
ColumnCodec::Zstd { .. } => 2,
ColumnCodec::Delta => 3,
ColumnCodec::DoubleDelta => 4,
ColumnCodec::Dict => 5,
ColumnCodec::Xor => 6,
}
}
pub fn from_tag(tag: u8) -> Option<ColumnCodec> {
match tag {
0 => Some(ColumnCodec::None),
1 => Some(ColumnCodec::Lz4),
2 => Some(ColumnCodec::Zstd { level: 3 }),
3 => Some(ColumnCodec::Delta),
4 => Some(ColumnCodec::DoubleDelta),
5 => Some(ColumnCodec::Dict),
6 => Some(ColumnCodec::Xor),
_ => None,
}
}
}
/// What a column's values mean, used by `select_codecs` to pick a codec chain.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnSemantics {
    /// Selected chain: `DoubleDelta` followed by Zstd.
    Timestamp,
    /// Selected chain: `Xor` followed by Zstd.
    Gauge,
    /// Selected chain: `Delta` followed by Zstd.
    Counter,
    /// Selected chain: `Dict` followed by Zstd.
    LowCardinality,
    /// Selected chain: Zstd only.
    Generic,
}
/// Picks the codec chain for a column from its semantics.
///
/// Every chain ends with `Zstd { level: 3 }`; all semantics except `Generic`
/// put one value-aware transform in front of it. `logical_type` is accepted
/// but does not currently influence the result.
pub fn select_codecs(logical_type: u8, semantics: ColumnSemantics) -> Vec<ColumnCodec> {
    let _ = logical_type;

    // The optional value-aware stage that precedes the general-purpose compressor.
    let transform = match semantics {
        ColumnSemantics::Timestamp => Some(ColumnCodec::DoubleDelta),
        ColumnSemantics::Gauge => Some(ColumnCodec::Xor),
        ColumnSemantics::Counter => Some(ColumnCodec::Delta),
        ColumnSemantics::LowCardinality => Some(ColumnCodec::Dict),
        ColumnSemantics::Generic => None,
    };

    let mut chain = Vec::with_capacity(2);
    chain.extend(transform);
    chain.push(ColumnCodec::Zstd { level: 3 });
    chain
}
/// Failure modes of the column codec encode/decode paths.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CodecError {
    /// The buffer ended before a header, length prefix or declared payload was complete.
    Truncated,
    /// The chain header named a tag that `ColumnCodec::from_tag` does not recognise.
    UnknownCodec(u8),
    /// The payload was structurally wrong for the codec; the string says why.
    InvalidPayload(&'static str),
    /// A compression backend reported an error; the string is its message.
    Backend(String),
}
impl std::fmt::Display for CodecError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CodecError::Truncated => write!(f, "codec buffer truncated"),
CodecError::UnknownCodec(t) => write!(f, "unknown column codec tag: {t}"),
CodecError::InvalidPayload(why) => write!(f, "invalid codec payload: {why}"),
CodecError::Backend(msg) => write!(f, "codec backend error: {msg}"),
}
}
}
// Empty impl: `CodecError` takes the default `std::error::Error` methods, so it
// reports no underlying `source()`.
impl std::error::Error for CodecError {}
/// Result alias used by every fallible function in this file.
pub type CodecResult<T> = Result<T, CodecError>;
pub fn encode_bytes(codecs: &[ColumnCodec], raw: &[u8]) -> CodecResult<Vec<u8>> {
let mut buf = raw.to_vec();
for codec in codecs {
buf = apply_encode(codec, &buf)?;
}
let mut out = Vec::with_capacity(buf.len() + codecs.len() * 2 + 2);
out.extend_from_slice(&(codecs.len() as u16).to_le_bytes());
for codec in codecs {
out.push(codec.tag());
if let ColumnCodec::Zstd { level } = codec {
out.extend_from_slice(&level.to_le_bytes());
}
}
out.extend_from_slice(&buf);
Ok(out)
}
/// Reverses `encode_bytes`: parses the chain header, then undoes every stage
/// from last to first.
///
/// Header layout read here: `u16` little-endian stage count, then per stage a
/// tag byte (tag `2` is followed by an `i32` little-endian Zstd level); the
/// remaining bytes are the encoded payload.
///
/// # Errors
/// - `CodecError::Truncated` when the header ends early.
/// - `CodecError::UnknownCodec` for a tag `ColumnCodec::from_tag` rejects.
/// - Any error returned by an individual decode stage.
pub fn decode_bytes(buf: &[u8]) -> CodecResult<Vec<u8>> {
    // `rest` always points at the not-yet-consumed tail of the buffer.
    let (stage_count, mut rest) = match buf {
        [lo, hi, tail @ ..] => (usize::from(u16::from_le_bytes([*lo, *hi])), tail),
        _ => return Err(CodecError::Truncated),
    };

    let mut chain: Vec<ColumnCodec> = Vec::with_capacity(stage_count);
    for _ in 0..stage_count {
        let tag = match rest.split_first() {
            Some((&tag, tail)) => {
                rest = tail;
                tag
            }
            None => return Err(CodecError::Truncated),
        };

        let stage = if tag == 2 {
            // Zstd is the only stage with header-resident parameters.
            if rest.len() < 4 {
                return Err(CodecError::Truncated);
            }
            let (level_bytes, tail) = rest.split_at(4);
            rest = tail;
            ColumnCodec::Zstd {
                level: i32::from_le_bytes([
                    level_bytes[0],
                    level_bytes[1],
                    level_bytes[2],
                    level_bytes[3],
                ]),
            }
        } else {
            match ColumnCodec::from_tag(tag) {
                Some(codec) => codec,
                None => return Err(CodecError::UnknownCodec(tag)),
            }
        };
        chain.push(stage);
    }

    // Stages were applied first-to-last on encode, so peel them last-to-first.
    chain
        .iter()
        .rev()
        .try_fold(rest.to_vec(), |payload, stage| apply_decode(stage, &payload))
}
/// Runs a single encode stage over `data`.
///
/// Per-codec output:
/// - `None`: a copy of `data`.
/// - `Lz4`: `u32` little-endian uncompressed length, then the LZ4 block.
/// - `Zstd`: output of `zstd_compress_at_inner` at the requested level.
/// - `Delta` / `DoubleDelta`: `u32` little-endian element count, then every
///   value produced by `delta_encode_timestamps` as little-endian bytes. Both
///   variants take this same path.
/// - `Xor`: same framing, with values produced by `xor_encode_values`.
/// - `Dict`: see `encode_dict`.
///
/// # Errors
/// - `CodecError::InvalidPayload` when a delta/xor input is not a multiple of
///   8 bytes, or when a length/count does not fit the `u32` prefix (previously
///   such lengths were truncated silently, producing undecodable output).
/// - Any error from `encode_dict`.
fn apply_encode(codec: &ColumnCodec, data: &[u8]) -> CodecResult<Vec<u8>> {
    /// Builds the 4-byte little-endian prefix for `len`, refusing values that
    /// would not survive the narrowing to `u32`.
    fn u32_prefix(len: usize) -> CodecResult<[u8; 4]> {
        u32::try_from(len)
            .map(u32::to_le_bytes)
            .map_err(|_| CodecError::InvalidPayload("stream too large for u32 length prefix"))
    }

    match codec {
        ColumnCodec::None => Ok(data.to_vec()),
        ColumnCodec::Lz4 => {
            let mut out = u32_prefix(data.len())?.to_vec();
            out.extend(lz4_flex::compress(data));
            Ok(out)
        }
        ColumnCodec::Zstd { level } => Ok(zstd_compress_at_inner(data, *level)),
        ColumnCodec::Delta | ColumnCodec::DoubleDelta => {
            if !data.len().is_multiple_of(8) {
                return Err(CodecError::InvalidPayload("delta expects i64 stream"));
            }
            let values: Vec<u64> = data
                .chunks_exact(8)
                .map(|c| u64::from_le_bytes(c.try_into().expect("chunks_exact(8) yields 8 bytes")))
                .collect();
            let encoded = delta_encode_timestamps(&values);
            let prefix = u32_prefix(encoded.len())?;
            let mut out = Vec::with_capacity(4 + encoded.len() * 8);
            out.extend_from_slice(&prefix);
            for v in encoded {
                out.extend_from_slice(&v.to_le_bytes());
            }
            Ok(out)
        }
        ColumnCodec::Xor => {
            if !data.len().is_multiple_of(8) {
                return Err(CodecError::InvalidPayload("xor expects f64 stream"));
            }
            let values: Vec<f64> = data
                .chunks_exact(8)
                .map(|c| f64::from_le_bytes(c.try_into().expect("chunks_exact(8) yields 8 bytes")))
                .collect();
            let encoded = xor_encode_values(&values);
            let prefix = u32_prefix(encoded.len())?;
            let mut out = Vec::with_capacity(4 + encoded.len() * 8);
            out.extend_from_slice(&prefix);
            for v in encoded {
                out.extend_from_slice(&v.to_le_bytes());
            }
            Ok(out)
        }
        ColumnCodec::Dict => encode_dict(data),
    }
}
/// Undoes a single encode stage.
///
/// Expected input per codec mirrors `apply_encode`:
/// - `None`: returned as a copy.
/// - `Lz4`: `u32` little-endian uncompressed length, then the LZ4 block.
/// - `Zstd`: handed to `zstd_decompress`; the stored level is not needed.
/// - `Delta` / `DoubleDelta` / `Xor`: `u32` little-endian element count, then
///   that many 8-byte little-endian words. Bytes beyond the declared count are
///   ignored.
/// - `Dict`: see `decode_dict`.
///
/// # Errors
/// - `CodecError::Truncated` when a prefix or the declared payload is missing.
/// - `CodecError::Backend` with the LZ4 error message when LZ4 fails.
/// - `CodecError::InvalidPayload` when Zstd decompression fails.
/// - Any error from `decode_dict`.
fn apply_decode(codec: &ColumnCodec, data: &[u8]) -> CodecResult<Vec<u8>> {
    /// Parses the `u32` element count and returns exactly `count * 8` payload
    /// bytes. The multiplication is checked because `count * 8` can overflow
    /// `usize` on 32-bit targets, which would let a short payload slip past
    /// the length check.
    fn counted_words(data: &[u8]) -> CodecResult<&[u8]> {
        if data.len() < 4 {
            return Err(CodecError::Truncated);
        }
        let (prefix, payload) = data.split_at(4);
        let count = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;
        let byte_len = count.checked_mul(8).ok_or(CodecError::Truncated)?;
        payload.get(..byte_len).ok_or(CodecError::Truncated)
    }

    match codec {
        ColumnCodec::None => Ok(data.to_vec()),
        ColumnCodec::Lz4 => {
            if data.len() < 4 {
                return Err(CodecError::Truncated);
            }
            let (prefix, block) = data.split_at(4);
            let raw_len = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;
            lz4_flex::decompress(block, raw_len).map_err(|e| CodecError::Backend(e.to_string()))
        }
        ColumnCodec::Zstd { .. } => {
            zstd_decompress(data).ok_or(CodecError::InvalidPayload("zstd payload malformed"))
        }
        ColumnCodec::Delta | ColumnCodec::DoubleDelta => {
            let encoded: Vec<i64> = counted_words(data)?
                .chunks_exact(8)
                .map(|c| i64::from_le_bytes(c.try_into().expect("chunks_exact(8) yields 8 bytes")))
                .collect();
            let decoded = delta_decode_timestamps(&encoded);
            let mut out = Vec::with_capacity(decoded.len() * 8);
            for v in decoded {
                out.extend_from_slice(&v.to_le_bytes());
            }
            Ok(out)
        }
        ColumnCodec::Xor => {
            let encoded: Vec<u64> = counted_words(data)?
                .chunks_exact(8)
                .map(|c| u64::from_le_bytes(c.try_into().expect("chunks_exact(8) yields 8 bytes")))
                .collect();
            let decoded = xor_decode_values(&encoded);
            let mut out = Vec::with_capacity(decoded.len() * 8);
            for v in decoded {
                out.extend_from_slice(&v.to_le_bytes());
            }
            Ok(out)
        }
        ColumnCodec::Dict => decode_dict(data),
    }
}
/// Compresses `data` with the project's `zstd_compress_at` helper at `level`.
///
/// Exists so the `Zstd` encode stage can pass an explicit level; the helper is
/// referenced by full path instead of being imported at file level.
fn zstd_compress_at_inner(data: &[u8], level: i32) -> Vec<u8> {
    crate::storage::timeseries::compression::zstd_compress_at(data, level)
}
/// Dictionary-encodes a length-prefixed entry stream.
///
/// Input layout: `u32` little-endian entry count, then per entry a `u16`
/// little-endian byte length followed by that many bytes.
///
/// Output layout: `u32` dictionary size, each distinct entry (`u16` length +
/// bytes) in order of first appearance, `u32` index count, then one `u32`
/// little-endian dictionary index per input entry.
///
/// # Errors
/// - `CodecError::Truncated` when the input ends before the declared entries.
/// - `CodecError::InvalidPayload` when bytes remain after the last declared
///   entry. Those bytes could not be represented in the output, so accepting
///   them would silently lose data.
fn encode_dict(data: &[u8]) -> CodecResult<Vec<u8>> {
    if data.len() < 4 {
        return Err(CodecError::Truncated);
    }
    let (head, mut rest) = data.split_at(4);
    let count = u32::from_le_bytes([head[0], head[1], head[2], head[3]]) as usize;

    // Every entry occupies at least its 2-byte length, so the header count is
    // only trusted up to what the input could actually hold. This keeps a
    // bogus count from triggering a multi-gigabyte allocation.
    let mut indexes: Vec<u32> = Vec::with_capacity(count.min(rest.len() / 2));
    // Distinct entries in first-appearance order; borrowed, not copied.
    let mut dict: Vec<&[u8]> = Vec::new();
    // Entry -> dictionary index, replacing a linear scan per entry.
    let mut seen: std::collections::HashMap<&[u8], u32> = std::collections::HashMap::new();

    for _ in 0..count {
        if rest.len() < 2 {
            return Err(CodecError::Truncated);
        }
        let (len_bytes, tail) = rest.split_at(2);
        let len = usize::from(u16::from_le_bytes([len_bytes[0], len_bytes[1]]));
        if tail.len() < len {
            return Err(CodecError::Truncated);
        }
        let (bytes, tail) = tail.split_at(len);
        rest = tail;

        let idx = *seen.entry(bytes).or_insert_with(|| {
            dict.push(bytes);
            // dict.len() <= count <= u32::MAX, so the cast is lossless.
            (dict.len() - 1) as u32
        });
        indexes.push(idx);
    }

    if !rest.is_empty() {
        return Err(CodecError::InvalidPayload("dict stream has trailing bytes"));
    }

    let dict_bytes: usize = dict.iter().map(|e| 2 + e.len()).sum();
    let mut out = Vec::with_capacity(4 + dict_bytes + 4 + indexes.len() * 4);
    out.extend_from_slice(&(dict.len() as u32).to_le_bytes());
    for entry in &dict {
        // entry.len() was read from a u16, so it fits.
        out.extend_from_slice(&(entry.len() as u16).to_le_bytes());
        out.extend_from_slice(entry);
    }
    out.extend_from_slice(&(indexes.len() as u32).to_le_bytes());
    for idx in &indexes {
        out.extend_from_slice(&idx.to_le_bytes());
    }
    Ok(out)
}
/// Expands a dictionary-encoded buffer back into a length-prefixed entry stream.
///
/// Input layout (as written by `encode_dict`): `u32` dictionary size, each
/// dictionary entry as `u16` length + bytes, `u32` index count, then one `u32`
/// little-endian index per output entry. Bytes after the declared indexes are
/// ignored.
///
/// Output layout: `u32` entry count, then per entry `u16` length + bytes.
///
/// # Errors
/// - `CodecError::Truncated` when the buffer ends before any declared section.
/// - `CodecError::InvalidPayload` when an index points outside the dictionary.
fn decode_dict(data: &[u8]) -> CodecResult<Vec<u8>> {
    /// Splits `n` bytes off the front of `rest`, or reports truncation.
    fn take<'a>(rest: &mut &'a [u8], n: usize) -> CodecResult<&'a [u8]> {
        let whole: &'a [u8] = *rest;
        if whole.len() < n {
            return Err(CodecError::Truncated);
        }
        let (head, tail) = whole.split_at(n);
        *rest = tail;
        Ok(head)
    }

    /// Reads a little-endian `u32` from the front of `rest`.
    fn take_u32(rest: &mut &[u8]) -> CodecResult<u32> {
        let b = take(rest, 4)?;
        Ok(u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
    }

    let mut rest = data;

    let dict_count = take_u32(&mut rest)? as usize;
    // Each entry needs at least 2 bytes, so never reserve for more entries
    // than the buffer could hold. A corrupt count would otherwise request an
    // enormous allocation before any validation ran.
    let mut dict: Vec<&[u8]> = Vec::with_capacity(dict_count.min(rest.len() / 2));
    for _ in 0..dict_count {
        let len_bytes = take(&mut rest, 2)?;
        let len = usize::from(u16::from_le_bytes([len_bytes[0], len_bytes[1]]));
        dict.push(take(&mut rest, len)?);
    }

    let idx_count = take_u32(&mut rest)?;
    // Checked: `idx_count * 4` can overflow `usize` on 32-bit targets.
    let idx_len = (idx_count as usize)
        .checked_mul(4)
        .ok_or(CodecError::Truncated)?;
    let idx_bytes = take(&mut rest, idx_len)?;

    // Lower bound on the output: the count plus a 2-byte length per entry.
    let mut out = Vec::with_capacity(4 + idx_bytes.len() / 2);
    out.extend_from_slice(&idx_count.to_le_bytes());
    for chunk in idx_bytes.chunks_exact(4) {
        let idx = u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]) as usize;
        let entry = dict
            .get(idx)
            .ok_or(CodecError::InvalidPayload("dict index out of range"))?;
        // entry.len() was read from a u16, so it fits.
        out.extend_from_slice(&(entry.len() as u16).to_le_bytes());
        out.extend_from_slice(entry);
    }
    Ok(out)
}
/// Packs `values` with `t64_encode` and compresses the packed bytes with
/// `zstd_compress`.
///
/// Returns the compressed bytes together with the length reported by
/// `t64_encode`, which `decode_delta_t64_zstd` needs to reverse the packing.
///
/// NOTE(review): no delta step is performed in this function despite the
/// name; if one exists it lives inside `t64_encode` — confirm.
pub fn encode_delta_t64_zstd(values: &[i64]) -> (Vec<u8>, usize) {
    let (packed, value_count) = t64_encode(values);
    (zstd_compress(&packed), value_count)
}
/// Reverses `encode_delta_t64_zstd`: decompresses `bytes` with
/// `zstd_decompress`, then unpacks `len` values with `t64_decode`.
///
/// # Errors
/// `CodecError::InvalidPayload` when either the Zstd envelope or the t64 body
/// is rejected by its decoder.
pub fn decode_delta_t64_zstd(bytes: &[u8], len: usize) -> CodecResult<Vec<i64>> {
    let Some(raw) = zstd_decompress(bytes) else {
        return Err(CodecError::InvalidPayload(
            "delta+t64 zstd envelope malformed",
        ));
    };
    match t64_decode(&raw, len) {
        Some(values) => Ok(values),
        None => Err(CodecError::InvalidPayload("t64 body malformed")),
    }
}
// Unit tests for the codec chain: per-codec round trips, header error
// handling, codec selection and loose compression-ratio bounds.
#[cfg(test)]
mod tests {
use super::*;
// Builds the byte layout the `Dict` codec parses: a `u32` LE item count, then
// for every item a `u16` LE byte length followed by the item's bytes.
fn str_stream(items: &[&str]) -> Vec<u8> {
let mut out = Vec::new();
out.extend_from_slice(&(items.len() as u32).to_le_bytes());
for s in items {
let bytes = s.as_bytes();
out.extend_from_slice(&(bytes.len() as u16).to_le_bytes());
out.extend_from_slice(bytes);
}
out
}
// `None` must hand the input back unchanged after a full encode/decode.
#[test]
fn none_codec_is_pure_passthrough() {
let raw = b"hello world".to_vec();
let encoded = encode_bytes(&[ColumnCodec::None], &raw).unwrap();
let decoded = decode_bytes(&encoded).unwrap();
assert_eq!(decoded, raw);
}
// A short repeating pattern should both shrink under LZ4 and round-trip.
#[test]
fn lz4_round_trips() {
let raw: Vec<u8> = (0..4096).map(|i| (i % 19) as u8).collect();
let encoded = encode_bytes(&[ColumnCodec::Lz4], &raw).unwrap();
assert!(encoded.len() < raw.len());
let decoded = decode_bytes(&encoded).unwrap();
assert_eq!(decoded, raw);
}
// Uses a level other than the `from_tag` default (3) to exercise the level
// bytes stored in the header.
#[test]
fn zstd_round_trips_with_explicit_level() {
let raw: Vec<u8> = (0..4096).map(|i| (i % 7) as u8).collect();
let encoded = encode_bytes(&[ColumnCodec::Zstd { level: 6 }], &raw).unwrap();
let decoded = decode_bytes(&encoded).unwrap();
assert_eq!(decoded, raw);
}
// Two-stage chain: decode has to undo the stages in reverse order.
#[test]
fn lz4_then_zstd_chains_both_codecs() {
let raw: Vec<u8> = (0..4096).map(|i| (i as u8).wrapping_mul(17)).collect();
let encoded =
encode_bytes(&[ColumnCodec::Lz4, ColumnCodec::Zstd { level: 3 }], &raw).unwrap();
let decoded = decode_bytes(&encoded).unwrap();
assert_eq!(decoded, raw);
}
// Evenly spaced u64 values serialized as little-endian bytes.
#[test]
fn delta_codec_round_trips_timestamps_as_bytes() {
let values: Vec<u64> = (0..1000).map(|i| 1_700_000_000 + i * 1000).collect();
let raw: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
let encoded = encode_bytes(&[ColumnCodec::Delta], &raw).unwrap();
let decoded = decode_bytes(&encoded).unwrap();
assert_eq!(decoded, raw);
}
// Three distinct labels across six entries: the dictionary form must be
// smaller than the raw stream and still decode to it exactly.
#[test]
fn dict_codec_compresses_repeated_strings() {
let raw = str_stream(&[
"us-east-1",
"us-east-1",
"eu-west-1",
"us-east-1",
"apac-south-1",
"eu-west-1",
]);
let encoded = encode_bytes(&[ColumnCodec::Dict], &raw).unwrap();
assert!(encoded.len() < raw.len());
let decoded = decode_bytes(&encoded).unwrap();
assert_eq!(decoded, raw);
}
// Hand-built header declaring one stage with tag 99, which is not assigned;
// the trailing 0 byte stands in for a payload.
#[test]
fn decode_rejects_unknown_codec_tag() {
let mut buf = 1u16.to_le_bytes().to_vec();
buf.push(99); buf.push(0); let err = decode_bytes(&buf).unwrap_err();
assert!(matches!(err, CodecError::UnknownCodec(99)));
}
// Fewer than the two bytes of the stage count must be an error, not a panic.
#[test]
fn decode_rejects_truncated_header() {
assert!(decode_bytes(&[]).is_err());
assert!(decode_bytes(&[0u8]).is_err());
}
// Typed helper pair: a consecutive i64 run must compress to under half of its
// raw size (8 bytes per value) and decode back exactly.
#[test]
fn typed_delta_t64_zstd_round_trips() {
let values: Vec<i64> = (0..10_000).map(|i| 42 + i).collect();
let (encoded, len) = encode_delta_t64_zstd(&values);
assert_eq!(len, values.len());
assert!(encoded.len() < values.len() * 8 / 2);
let decoded = decode_delta_t64_zstd(&encoded, len).unwrap();
assert_eq!(decoded, values);
}
// Delta works on 8-byte words, so a 3-byte input must be rejected.
#[test]
fn delta_codec_rejects_non_multiple_of_eight() {
let encoded = encode_bytes(&[ColumnCodec::Delta], &[1u8, 2, 3]);
assert!(encoded.is_err());
}
// Serializes f64 values as consecutive little-endian 8-byte words.
fn f64_stream(values: &[f64]) -> Vec<u8> {
values.iter().flat_map(|v| v.to_le_bytes()).collect()
}
// Serializes u64 values as consecutive little-endian 8-byte words.
fn u64_stream(values: &[u64]) -> Vec<u8> {
values.iter().flat_map(|v| v.to_le_bytes()).collect()
}
// Gauge-like floats cycling through 13 values.
#[test]
fn xor_codec_round_trips_gauge_floats() {
let values: Vec<f64> = (0..2000).map(|i| 95.0 + (i % 13) as f64 * 0.125).collect();
let raw = f64_stream(&values);
let encoded = encode_bytes(&[ColumnCodec::Xor], &raw).unwrap();
let decoded = decode_bytes(&encoded).unwrap();
assert_eq!(decoded, raw);
}
// Compares raw bytes rather than f64 values so NaN and the sign of zero are
// checked bit-for-bit.
#[test]
fn xor_codec_is_lossless_for_special_floats() {
let values = vec![
f64::NAN,
f64::INFINITY,
f64::NEG_INFINITY,
0.0,
-0.0,
1.5,
-1.5,
];
let raw = f64_stream(&values);
let encoded = encode_bytes(&[ColumnCodec::Xor], &raw).unwrap();
let decoded = decode_bytes(&encoded).unwrap();
assert_eq!(decoded, raw);
}
// Xor works on 8-byte words, so a 3-byte input must be rejected.
#[test]
fn xor_codec_rejects_non_multiple_of_eight() {
assert!(encode_bytes(&[ColumnCodec::Xor], &[1u8, 2, 3]).is_err());
}
// `tag` and `from_tag` must agree for the Xor variant.
#[test]
fn xor_tag_round_trips() {
assert_eq!(
ColumnCodec::from_tag(ColumnCodec::Xor.tag()),
Some(ColumnCodec::Xor)
);
}
// Pins the exact chain chosen for every `ColumnSemantics` value.
#[test]
fn select_codecs_maps_semantics_to_expected_chains() {
let z = ColumnCodec::Zstd { level: 3 };
assert_eq!(
select_codecs(0, ColumnSemantics::Timestamp),
vec![ColumnCodec::DoubleDelta, z.clone()]
);
assert_eq!(
select_codecs(0, ColumnSemantics::Gauge),
vec![ColumnCodec::Xor, z.clone()]
);
assert_eq!(
select_codecs(0, ColumnSemantics::Counter),
vec![ColumnCodec::Delta, z.clone()]
);
assert_eq!(
select_codecs(0, ColumnSemantics::LowCardinality),
vec![ColumnCodec::Dict, z.clone()]
);
assert_eq!(select_codecs(0, ColumnSemantics::Generic), vec![z]);
}
// Runs every selected chain end to end on data shaped for its semantics.
#[test]
fn selected_codecs_round_trip_losslessly() {
let ts = u64_stream(
&(0..1000)
.map(|i| 1_700_000_000_000 + i * 1_000_000)
.collect::<Vec<_>>(),
);
let gauge = f64_stream(
&(0..1000)
.map(|i| 50.0 + (i % 9) as f64 * 0.5)
.collect::<Vec<_>>(),
);
let counter = u64_stream(&(0..1000).map(|i| (i * 7) as u64).collect::<Vec<_>>());
let strings = str_stream(&["a", "a", "b", "c", "a", "b", "b", "a", "c", "a"]);
// (semantics, logical type passed to `select_codecs`, raw input bytes)
let cases = [
(ColumnSemantics::Timestamp, 2u8, &ts),
(ColumnSemantics::Gauge, 3u8, &gauge),
(ColumnSemantics::Counter, 2u8, &counter),
(ColumnSemantics::LowCardinality, 4u8, &strings),
(ColumnSemantics::Generic, 4u8, &strings),
];
for (sem, ty, raw) in cases {
let codecs = select_codecs(ty, sem);
let encoded = encode_bytes(&codecs, raw).unwrap();
let decoded = decode_bytes(&encoded).unwrap();
assert_eq!(decoded, *raw, "lossless round-trip failed for {sem:?}");
}
}
// Guards against compression regressions with deliberately loose bounds:
// timestamps and counters under 1/4 of raw size, gauges and low-cardinality
// strings under 1/2.
#[test]
fn selected_codecs_meet_loose_ratio_bounds() {
let ts = u64_stream(
&(0..4000)
.map(|i| 1_700_000_000_000 + i * 1_000_000)
.collect::<Vec<_>>(),
);
let enc = encode_bytes(&select_codecs(2, ColumnSemantics::Timestamp), &ts).unwrap();
assert!(
enc.len() < ts.len() / 4,
"timestamp codec ratio too weak: {} -> {}",
ts.len(),
enc.len()
);
let gauge = f64_stream(
&(0..4000)
.map(|i| 95.0 + (i % 5) as f64 * 0.1)
.collect::<Vec<_>>(),
);
let enc = encode_bytes(&select_codecs(3, ColumnSemantics::Gauge), &gauge).unwrap();
assert!(
enc.len() < gauge.len() / 2,
"gauge codec ratio too weak: {} -> {}",
gauge.len(),
enc.len()
);
let counter = u64_stream(&(0..4000).map(|i| (i * 3) as u64).collect::<Vec<_>>());
let enc = encode_bytes(&select_codecs(2, ColumnSemantics::Counter), &counter).unwrap();
assert!(
enc.len() < counter.len() / 4,
"counter codec ratio too weak: {} -> {}",
counter.len(),
enc.len()
);
let labels: Vec<&str> = (0..4000)
.map(|i| ["us-east-1", "eu-west-1", "apac-south-1"][i % 3])
.collect();
let strings = str_stream(&labels);
let enc =
encode_bytes(&select_codecs(4, ColumnSemantics::LowCardinality), &strings).unwrap();
assert!(
enc.len() < strings.len() / 2,
"low-cardinality codec ratio too weak: {} -> {}",
strings.len(),
enc.len()
);
}
}