use crate::error::CodecError;
/// Maps a signed integer onto an unsigned one so that values of small
/// magnitude, positive or negative, stay small: 0, -1, 1, -2, 2, ... become
/// 0, 1, 2, 3, 4, ...
#[inline]
fn zigzag_encode(v: i64) -> u64 {
    // Arithmetic shift smears the sign bit: all ones for negatives, zero otherwise.
    let sign_mask = (v >> 63) as u64;
    ((v as u64) << 1) ^ sign_mask
}
/// Inverse of `zigzag_encode`: 0, 1, 2, 3, 4, ... map back to
/// 0, -1, 1, -2, 2, ...
#[inline]
fn zigzag_decode(v: u64) -> i64 {
    // The low bit carries the sign; turn it into an all-ones (negative) or
    // all-zeros (non-negative) mask and fold it into the remaining magnitude.
    let sign_mask = (v & 1).wrapping_neg();
    ((v >> 1) ^ sign_mask) as i64
}
/// Appends `value` to `buf` as an unsigned LEB128-style varint.
///
/// Each byte carries 7 payload bits, least-significant group first. The high
/// bit (`0x80`) is set on every byte except the last, so a `u64` takes
/// between 1 and 10 bytes.
fn write_varint(buf: &mut Vec<u8>, mut value: u64) {
    // While more than 7 bits remain, emit the low group with the
    // continuation flag set.
    while value >= 0x80 {
        buf.push((value as u8) | 0x80);
        value >>= 7;
    }
    // Final group (< 0x80): continuation flag clear.
    buf.push(value as u8);
}
/// Decodes one unsigned LEB128-style varint from the start of `data`.
///
/// Returns the decoded value and the number of bytes it occupied. Bytes after
/// the varint are not inspected.
///
/// # Errors
///
/// * [`CodecError::Corrupt`] if the encoding needs more than the 10 bytes any
///   `u64` fits in, or if its 10th byte carries bits beyond bit 63 (which
///   would otherwise be silently dropped).
/// * [`CodecError::Truncated`] if `data` ends before a byte without the
///   continuation bit (`0x80`) is found.
fn read_varint(data: &[u8]) -> Result<(u64, usize), CodecError> {
    // ceil(64 / 7) = 10 bytes are enough for every u64.
    const MAX_LEN: usize = 10;

    let mut value: u64 = 0;
    let mut shift: u32 = 0;
    for (i, &byte) in data.iter().take(MAX_LEN).enumerate() {
        if i == MAX_LEN - 1 {
            // shift == 63 here, so only bit 0 of this byte fits in a u64.
            if byte & 0x80 != 0 {
                return Err(CodecError::Corrupt {
                    detail: "varint too long (>10 bytes)".into(),
                });
            }
            if byte > 1 {
                return Err(CodecError::Corrupt {
                    detail: "varint overflows u64".into(),
                });
            }
        }
        value |= u64::from(byte & 0x7F) << shift;
        shift += 7;
        if byte & 0x80 == 0 {
            return Ok((value, i + 1));
        }
    }
    // Only reachable when `data` has fewer than MAX_LEN bytes, all with the
    // continuation bit set.
    Err(CodecError::Truncated {
        expected: data.len() + 1,
        actual: data.len(),
    })
}
pub fn encode(values: &[i64]) -> Vec<u8> {
let count = values.len() as u32;
let mut out = Vec::with_capacity(12 + values.len() * 2);
out.extend_from_slice(&count.to_le_bytes());
if values.is_empty() {
return out;
}
out.extend_from_slice(&values[0].to_le_bytes());
for i in 1..values.len() {
let delta = values[i].wrapping_sub(values[i - 1]);
write_varint(&mut out, zigzag_encode(delta));
}
out
}
/// Decodes a buffer produced by [`encode`] back into the original values.
///
/// Bytes after the last declared delta are ignored.
///
/// # Errors
///
/// * [`CodecError::Truncated`] if `data` is too short for the 4-byte count,
///   the 8-byte first value, or the number of deltas the count declares.
/// * [`CodecError::Corrupt`] if a delta varint is malformed.
pub fn decode(data: &[u8]) -> Result<Vec<i64>, CodecError> {
    if data.len() < 4 {
        return Err(CodecError::Truncated {
            expected: 4,
            actual: data.len(),
        });
    }
    let count = u32::from_le_bytes(data[0..4].try_into().map_err(|_| CodecError::Corrupt {
        detail: "invalid header".into(),
    })?) as usize;
    if count == 0 {
        return Ok(Vec::new());
    }
    if data.len() < 12 {
        return Err(CodecError::Truncated {
            expected: 12,
            actual: data.len(),
        });
    }
    // Each of the `count - 1` deltas occupies at least one byte. Reject counts
    // the buffer cannot possibly hold *before* allocating: otherwise a corrupt
    // or hostile header (up to u32::MAX) would request ~32 GiB for `values`.
    let deltas = count - 1;
    if deltas > data.len() - 12 {
        return Err(CodecError::Truncated {
            expected: 12usize.saturating_add(deltas),
            actual: data.len(),
        });
    }
    let first_value =
        i64::from_le_bytes(data[4..12].try_into().map_err(|_| CodecError::Corrupt {
            detail: "invalid first value".into(),
        })?);
    let mut values = Vec::with_capacity(count);
    values.push(first_value);
    let mut prev = first_value;
    let mut offset = 12;
    for _ in 0..deltas {
        if offset >= data.len() {
            return Err(CodecError::Truncated {
                expected: offset + 1,
                actual: data.len(),
            });
        }
        let (encoded_delta, consumed) = read_varint(&data[offset..])?;
        // Mirrors the wrapping_sub in `encode`.
        prev = prev.wrapping_add(zigzag_decode(encoded_delta));
        values.push(prev);
        offset += consumed;
    }
    Ok(values)
}
/// Collects values in memory and encodes them all at once with [`encode`]
/// when `finish` is called.
#[derive(Debug, Clone)]
pub struct DeltaEncoder {
    values: Vec<i64>,
}
impl DeltaEncoder {
    /// Number of values [`new`](Self::new) reserves room for up front.
    const DEFAULT_CAPACITY: usize = 4096;

    /// Creates an empty encoder with room for 4096 values before reallocating.
    pub fn new() -> Self {
        Self::with_capacity(Self::DEFAULT_CAPACITY)
    }

    /// Creates an empty encoder with room for `capacity` values.
    ///
    /// Useful when the batch size is known, or to skip the default 32 KiB
    /// preallocation that [`new`](Self::new) makes for small batches.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            values: Vec::with_capacity(capacity),
        }
    }

    /// Appends a single value.
    pub fn push(&mut self, value: i64) {
        self.values.push(value);
    }

    /// Appends every value in `values`, in order.
    pub fn push_batch(&mut self, values: &[i64]) {
        self.values.extend_from_slice(values);
    }

    /// Number of values pushed so far.
    pub fn count(&self) -> usize {
        self.values.len()
    }

    /// Returns `true` if no values have been pushed yet.
    pub fn is_empty(&self) -> bool {
        self.values.is_empty()
    }

    /// Consumes the encoder and returns the bytes produced by [`encode`] for
    /// all pushed values.
    pub fn finish(self) -> Vec<u8> {
        encode(&self.values)
    }
}
impl Default for DeltaEncoder {
fn default() -> Self {
Self::new()
}
}
/// Cursor over values that were fully decoded up front.
#[derive(Debug, Clone)]
pub struct DeltaDecoder {
    // All decoded values.
    values: Vec<i64>,
    // Index of the next value to hand out; never exceeds `values.len()`.
    pos: usize,
}
impl DeltaDecoder {
    /// Decodes all of `data` eagerly with [`decode`] and positions the cursor
    /// at the first value.
    ///
    /// # Errors
    ///
    /// Propagates any [`CodecError`] returned by [`decode`].
    pub fn new(data: &[u8]) -> Result<Self, CodecError> {
        let values = decode(data)?;
        Ok(Self { values, pos: 0 })
    }

    /// One-shot decoding; identical to calling [`decode`] directly.
    ///
    /// # Errors
    ///
    /// Propagates any [`CodecError`] returned by [`decode`].
    pub fn decode_all(data: &[u8]) -> Result<Vec<i64>, CodecError> {
        decode(data)
    }

    /// Returns the next value, or `None` once every value has been returned.
    pub fn next_value(&mut self) -> Option<i64> {
        let v = self.values.get(self.pos).copied()?;
        self.pos += 1;
        Some(v)
    }

    /// Number of values not yet returned by [`next_value`](Self::next_value).
    pub fn remaining(&self) -> usize {
        self.values.len() - self.pos
    }
}

/// Allows `for` loops and iterator adaptors over a decoder; yields the same
/// sequence as repeated calls to [`DeltaDecoder::next_value`].
impl Iterator for DeltaDecoder {
    type Item = i64;

    fn next(&mut self) -> Option<i64> {
        self.next_value()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let n = self.remaining();
        (n, Some(n))
    }
}

// `size_hint` above is exact, so the length is known up front.
impl ExactSizeIterator for DeltaDecoder {}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn zigzag_roundtrip() {
        for v in [0i64, 1, -1, 2, -2, 63, -63, 127, -128, i64::MAX, i64::MIN] {
            assert_eq!(zigzag_decode(zigzag_encode(v)), v, "zigzag failed for {v}");
        }
    }

    #[test]
    fn varint_roundtrip() {
        for v in [0u64, 1, 127, 128, 255, 16383, 16384, u64::MAX / 2, u64::MAX] {
            let mut buf = Vec::new();
            write_varint(&mut buf, v);
            let (decoded, consumed) = read_varint(&buf).unwrap();
            assert_eq!(decoded, v, "varint failed for {v}");
            assert_eq!(consumed, buf.len());
        }
    }

    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]);
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn single_value() {
        let encoded = encode(&[42i64]);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, vec![42i64]);
        // 4-byte count + 8-byte first value; no deltas follow.
        assert_eq!(encoded.len(), 12);
    }

    #[test]
    fn wire_layout() {
        // count = 2 (LE u32), first value = 1 (LE i64),
        // delta +2 -> zigzag 4 -> single varint byte 0x04.
        let encoded = encode(&[1i64, 3]);
        assert_eq!(encoded, vec![2, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0x04]);
    }

    #[test]
    fn monotonic_counter() {
        let values: Vec<i64> = (0..10_000).map(|i| i * 1000).collect();
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 3.0,
            "monotonic counter should compress to <3 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn counter_with_small_increments() {
        let values: Vec<i64> = (0..10_000).collect();
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 2.0,
            "unit-increment counter should compress to <2 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn counter_reset() {
        let mut values: Vec<i64> = (0..500).map(|i| i * 100).collect();
        // Counter resets to zero, then keeps climbing.
        values.push(0);
        values.extend((1..500).map(|i| i * 100));
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn non_monotonic_gauge() {
        let mut values = Vec::with_capacity(10_000);
        let mut val = 50i64;
        let mut rng: u64 = 12345;
        for _ in 0..10_000 {
            values.push(val);
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            // (rng >> 33) is non-negative, so the delta lies in [-5, 5].
            let delta = ((rng >> 33) as i64 % 11) - 5;
            val += delta;
        }
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 3.0,
            "small-delta gauge should compress to <3 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn negative_values() {
        let values: Vec<i64> = vec![-1000, -999, -998, -997, -996];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn large_values() {
        let values: Vec<i64> = vec![i64::MAX, i64::MAX - 1, i64::MAX - 2];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn boundary_values() {
        let values: Vec<i64> = vec![i64::MIN, 0, i64::MAX];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn streaming_encoder_matches_batch() {
        let values: Vec<i64> = (0..1000).map(|i| i * 7).collect();
        let batch = encode(&values);
        let mut enc = DeltaEncoder::new();
        for &v in &values {
            enc.push(v);
        }
        assert_eq!(enc.finish(), batch);
    }

    #[test]
    fn streaming_encoder_push_batch() {
        let values: Vec<i64> = (0..100).map(|i| i * 3 - 50).collect();
        let mut enc = DeltaEncoder::default();
        enc.push_batch(&values[..40]);
        enc.push_batch(&values[40..]);
        assert_eq!(enc.count(), values.len());
        assert_eq!(enc.finish(), encode(&values));
    }

    #[test]
    fn streaming_decoder() {
        let values: Vec<i64> = (0..100).map(|i| i * 10).collect();
        let encoded = encode(&values);
        let mut dec = DeltaDecoder::new(&encoded).unwrap();
        for &expected in &values {
            assert_eq!(dec.next_value(), Some(expected));
        }
        assert_eq!(dec.next_value(), None);
    }

    #[test]
    fn streaming_decoder_remaining() {
        let encoded = encode(&[10i64, 20, 30]);
        let mut dec = DeltaDecoder::new(&encoded).unwrap();
        assert_eq!(dec.remaining(), 3);
        assert_eq!(dec.next_value(), Some(10));
        assert_eq!(dec.remaining(), 2);
        assert_eq!(dec.next_value(), Some(20));
        assert_eq!(dec.next_value(), Some(30));
        assert_eq!(dec.remaining(), 0);
        assert_eq!(dec.next_value(), None);
        assert_eq!(dec.remaining(), 0);
        assert_eq!(DeltaDecoder::decode_all(&encoded).unwrap(), vec![10, 20, 30]);
    }

    #[test]
    fn truncated_input_errors() {
        assert!(decode(&[]).is_err());
        // count = 1 but the 8-byte first value is missing.
        assert!(decode(&[1, 0, 0, 0]).is_err());
    }

    #[test]
    fn truncated_delta_errors() {
        // Delta 0 -> 1000 zigzags to 2000, a two-byte varint; drop its last byte.
        let mut encoded = encode(&[0i64, 1000]);
        assert_eq!(encoded.len(), 14);
        encoded.pop();
        assert!(matches!(decode(&encoded), Err(CodecError::Truncated { .. })));
    }

    #[test]
    fn compression_vs_raw() {
        let values: Vec<i64> = (0..100_000).map(|i| i * 1000).collect();
        let encoded = encode(&values);
        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 3.0,
            "expected >3x compression for monotonic counter, got {ratio:.1}x"
        );
    }
}