1use crate::error::CodecError;
22
/// Maps a signed integer onto an unsigned one so that values close to zero,
/// whether positive or negative, become small unsigned numbers.
///
/// The mapping interleaves the two signs: `0 -> 0`, `-1 -> 1`, `1 -> 2`,
/// `-2 -> 3`, and so on up to `i64::MIN -> u64::MAX`.
#[inline]
fn zigzag_encode(v: i64) -> u64 {
    // The arithmetic right shift smears the sign bit over the whole word:
    // all ones for a negative input, all zeros otherwise.
    let sign_mask = (v >> 63) as u64;
    // Doubling frees bit 0; the top bit that falls off is the sign bit,
    // which the mask above has already captured.
    let doubled = (v as u64) << 1;
    doubled ^ sign_mask
}
35
/// Inverse of the zigzag mapping: turns an interleaved unsigned value back
/// into the signed integer it was produced from.
///
/// Even inputs decode to non-negative numbers (`2 -> 1`), odd inputs to
/// negative ones (`1 -> -1`, `u64::MAX -> i64::MIN`).
#[inline]
fn zigzag_decode(v: u64) -> i64 {
    // Dropping bit 0 leaves the magnitude part, which always fits in 63 bits.
    let half = (v >> 1) as i64;
    if v & 1 == 0 {
        half
    } else {
        // An odd input marks a negative value; flipping every bit is the
        // same as XOR-ing with -1.
        !half
    }
}
41
/// Appends `value` to `buf` as a base-128 varint.
///
/// Seven payload bits are written per byte, least-significant group first.
/// The high bit is set on every byte except the last one. Zero is written
/// as a single `0x00` byte; `u64::MAX` takes ten bytes.
fn write_varint(buf: &mut Vec<u8>, mut value: u64) {
    // Every group that still has higher bits after it gets the
    // continuation flag.
    while value >= 0x80 {
        buf.push((value & 0x7F) as u8 | 0x80);
        value >>= 7;
    }
    // What is left fits in seven bits and terminates the sequence.
    buf.push(value as u8);
}
56
57fn read_varint(data: &[u8]) -> Result<(u64, usize), CodecError> {
61 let mut value: u64 = 0;
62 let mut shift: u32 = 0;
63
64 for (i, &byte) in data.iter().enumerate() {
65 if shift >= 70 {
66 return Err(CodecError::Corrupt {
67 detail: "varint too long (>10 bytes)".into(),
68 });
69 }
70
71 value |= ((byte & 0x7F) as u64) << shift;
72 shift += 7;
73
74 if byte & 0x80 == 0 {
75 return Ok((value, i + 1));
76 }
77 }
78
79 Err(CodecError::Truncated {
80 expected: data.len() + 1,
81 actual: data.len(),
82 })
83}
84
85pub fn encode(values: &[i64]) -> Vec<u8> {
91 let count = values.len() as u32;
92 let mut out = Vec::with_capacity(12 + values.len() * 2);
94
95 out.extend_from_slice(&count.to_le_bytes());
96
97 if values.is_empty() {
98 return out;
99 }
100
101 out.extend_from_slice(&values[0].to_le_bytes());
102
103 for i in 1..values.len() {
104 let delta = values[i].wrapping_sub(values[i - 1]);
105 write_varint(&mut out, zigzag_encode(delta));
106 }
107
108 out
109}
110
111pub fn decode(data: &[u8]) -> Result<Vec<i64>, CodecError> {
113 if data.len() < 4 {
114 return Err(CodecError::Truncated {
115 expected: 4,
116 actual: data.len(),
117 });
118 }
119
120 let count = u32::from_le_bytes(data[0..4].try_into().map_err(|_| CodecError::Corrupt {
121 detail: "invalid header".into(),
122 })?) as usize;
123
124 if count == 0 {
125 return Ok(Vec::new());
126 }
127
128 if data.len() < 12 {
129 return Err(CodecError::Truncated {
130 expected: 12,
131 actual: data.len(),
132 });
133 }
134
135 let first_value =
136 i64::from_le_bytes(data[4..12].try_into().map_err(|_| CodecError::Corrupt {
137 detail: "invalid first value".into(),
138 })?);
139
140 let mut values = Vec::with_capacity(count);
141 values.push(first_value);
142
143 let mut offset = 12;
144 for _ in 1..count {
145 if offset >= data.len() {
146 return Err(CodecError::Truncated {
147 expected: offset + 1,
148 actual: data.len(),
149 });
150 }
151 let (encoded_delta, consumed) = read_varint(&data[offset..])?;
152 let delta = zigzag_decode(encoded_delta);
153 let value = values[values.len() - 1].wrapping_add(delta);
154 values.push(value);
155 offset += consumed;
156 }
157
158 Ok(values)
159}
160
161pub struct DeltaEncoder {
168 values: Vec<i64>,
169}
170
171impl DeltaEncoder {
172 pub fn new() -> Self {
173 Self {
174 values: Vec::with_capacity(4096),
175 }
176 }
177
178 pub fn push(&mut self, value: i64) {
179 self.values.push(value);
180 }
181
182 pub fn push_batch(&mut self, values: &[i64]) {
183 self.values.extend_from_slice(values);
184 }
185
186 pub fn count(&self) -> usize {
187 self.values.len()
188 }
189
190 pub fn finish(self) -> Vec<u8> {
191 encode(&self.values)
192 }
193}
194
195impl Default for DeltaEncoder {
196 fn default() -> Self {
197 Self::new()
198 }
199}
200
201pub struct DeltaDecoder {
203 values: Vec<i64>,
204 pos: usize,
205}
206
207impl DeltaDecoder {
208 pub fn new(data: &[u8]) -> Result<Self, CodecError> {
209 let values = decode(data)?;
210 Ok(Self { values, pos: 0 })
211 }
212
213 pub fn decode_all(data: &[u8]) -> Result<Vec<i64>, CodecError> {
214 decode(data)
215 }
216
217 pub fn next_value(&mut self) -> Option<i64> {
218 if self.pos < self.values.len() {
219 let v = self.values[self.pos];
220 self.pos += 1;
221 Some(v)
222 } else {
223 None
224 }
225 }
226
227 pub fn remaining(&self) -> usize {
228 self.values.len() - self.pos
229 }
230}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `values`, checks that decoding gives the same samples back,
    /// and returns the encoded bytes so callers can inspect their size.
    fn assert_roundtrip(values: &[i64]) -> Vec<u8> {
        let encoded = encode(values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
        encoded
    }

    /// Average number of encoded bytes spent per sample.
    fn density(encoded: &[u8], samples: usize) -> f64 {
        encoded.len() as f64 / samples as f64
    }

    #[test]
    fn zigzag_roundtrip() {
        const SAMPLES: [i64; 11] = [0, 1, -1, 2, -2, 63, -63, 127, -128, i64::MAX, i64::MIN];
        for &v in SAMPLES.iter() {
            assert_eq!(zigzag_decode(zigzag_encode(v)), v, "zigzag failed for {v}");
        }
    }

    #[test]
    fn varint_roundtrip() {
        const SAMPLES: [u64; 9] = [0, 1, 127, 128, 255, 16383, 16384, u64::MAX / 2, u64::MAX];
        for &v in SAMPLES.iter() {
            let mut buf = Vec::new();
            write_varint(&mut buf, v);
            let (decoded, consumed) = read_varint(&buf).unwrap();
            assert_eq!(decoded, v, "varint failed for {v}");
            assert_eq!(consumed, buf.len());
        }
    }

    #[test]
    fn empty_roundtrip() {
        let decoded = decode(&encode(&[])).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn single_value() {
        // One sample is just the header: 4 count bytes + 8 value bytes.
        let encoded = assert_roundtrip(&[42i64]);
        assert_eq!(encoded.len(), 12);
    }

    #[test]
    fn monotonic_counter() {
        let values: Vec<i64> = (0..10_000).map(|i| i * 1000).collect();
        let encoded = assert_roundtrip(&values);

        let bytes_per_sample = density(&encoded, values.len());
        assert!(
            bytes_per_sample < 3.0,
            "monotonic counter should compress to <3 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn counter_with_small_increments() {
        let values: Vec<i64> = (0..10_000).collect();
        let encoded = assert_roundtrip(&values);

        let bytes_per_sample = density(&encoded, values.len());
        assert!(
            bytes_per_sample < 2.0,
            "unit-increment counter should compress to <2 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn counter_reset() {
        // A rising counter that drops back to zero and starts climbing again.
        let values: Vec<i64> = (0..500)
            .map(|i| i * 100)
            .chain(std::iter::once(0))
            .chain((1..500).map(|i| i * 100))
            .collect();

        assert_roundtrip(&values);
    }

    #[test]
    fn non_monotonic_gauge() {
        // Deterministic pseudo-random walk with steps in -5..=5.
        let mut rng: u64 = 12345;
        let mut current = 50i64;
        let values: Vec<i64> = (0..10_000)
            .map(|_| {
                let sample = current;
                rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
                current += ((rng >> 33) as i64 % 11) - 5;
                sample
            })
            .collect();

        let encoded = assert_roundtrip(&values);

        let bytes_per_sample = density(&encoded, values.len());
        assert!(
            bytes_per_sample < 3.0,
            "small-delta gauge should compress to <3 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn negative_values() {
        assert_roundtrip(&[-1000, -999, -998, -997, -996]);
    }

    #[test]
    fn large_values() {
        assert_roundtrip(&[i64::MAX, i64::MAX - 1, i64::MAX - 2]);
    }

    #[test]
    fn boundary_values() {
        assert_roundtrip(&[i64::MIN, 0, i64::MAX]);
    }

    #[test]
    fn streaming_encoder_matches_batch() {
        let values: Vec<i64> = (0..1000).map(|i| i * 7).collect();
        let batch = encode(&values);

        let mut enc = DeltaEncoder::new();
        values.iter().for_each(|&v| enc.push(v));

        assert_eq!(enc.finish(), batch);
    }

    #[test]
    fn streaming_decoder() {
        let values: Vec<i64> = (0..100).map(|i| i * 10).collect();
        let mut dec = DeltaDecoder::new(&encode(&values)).unwrap();

        for expected in values.iter().copied() {
            assert_eq!(dec.next_value(), Some(expected));
        }
        assert_eq!(dec.next_value(), None);
    }

    #[test]
    fn truncated_input_errors() {
        // No header at all.
        assert!(decode(&[]).is_err());
        // Count says one sample, but the first value is missing.
        assert!(decode(&[1, 0, 0, 0]).is_err());
    }

    #[test]
    fn compression_vs_raw() {
        let values: Vec<i64> = (0..100_000).map(|i| i * 1000).collect();
        let encoded = encode(&values);

        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 3.0,
            "expected >3x compression for monotonic counter, got {ratio:.1}x"
        );
    }
}