1use crate::error::CodecError;
20
/// Maps a signed integer onto an unsigned one so that values of small
/// magnitude, positive or negative, receive small codes.
///
/// The two signs are interleaved: `0 → 0`, `-1 → 1`, `1 → 2`, `-2 → 3`, …,
/// `i64::MAX → u64::MAX - 1`, `i64::MIN → u64::MAX`.
#[inline]
fn zigzag_encode(v: i64) -> u64 {
    // Doubling makes room for the sign in bit 0; the bit shifted out of the
    // top is intentionally discarded.
    let doubled = v << 1;
    // Arithmetic shift: all ones when `v` is negative, zero otherwise.
    let sign_mask = v >> 63;
    (doubled ^ sign_mask) as u64
}
33
/// Inverse of the zigzag mapping: even codes are the non-negative values
/// `v / 2`, odd codes are the negative values `-(v / 2) - 1`.
#[inline]
fn zigzag_decode(v: u64) -> i64 {
    // `v >> 1` always fits in an `i64` because the top bit is cleared.
    let half = (v >> 1) as i64;
    if v & 1 == 0 {
        half
    } else {
        // Bitwise NOT is `-half - 1` in two's complement, which covers
        // `i64::MIN` without overflow.
        !half
    }
}
39
/// Appends `value` to `buf` as a base-128 varint.
///
/// Each output byte carries seven payload bits, least-significant group
/// first. The high bit is set on every byte except the last. Zero is written
/// as a single `0x00` byte; `u64::MAX` takes ten bytes.
fn write_varint(buf: &mut Vec<u8>, mut value: u64) {
    // Anything that does not fit in seven bits needs a continuation byte.
    while value >= 0x80 {
        buf.push((value as u8 & 0x7F) | 0x80);
        value >>= 7;
    }
    // Final group: high bit clear marks the end of the varint.
    buf.push(value as u8);
}
54
55fn read_varint(data: &[u8]) -> Result<(u64, usize), CodecError> {
59 let mut value: u64 = 0;
60 let mut shift: u32 = 0;
61
62 for (i, &byte) in data.iter().enumerate() {
63 if shift >= 70 {
64 return Err(CodecError::Corrupt {
65 detail: "varint too long (>10 bytes)".into(),
66 });
67 }
68
69 value |= ((byte & 0x7F) as u64) << shift;
70 shift += 7;
71
72 if byte & 0x80 == 0 {
73 return Ok((value, i + 1));
74 }
75 }
76
77 Err(CodecError::Truncated {
78 expected: data.len() + 1,
79 actual: data.len(),
80 })
81}
82
83pub fn encode(values: &[i64]) -> Vec<u8> {
89 let count = values.len() as u32;
90 let mut out = Vec::with_capacity(12 + values.len() * 2);
92
93 out.extend_from_slice(&count.to_le_bytes());
94
95 if values.is_empty() {
96 return out;
97 }
98
99 out.extend_from_slice(&values[0].to_le_bytes());
100
101 for i in 1..values.len() {
102 let delta = values[i].wrapping_sub(values[i - 1]);
103 write_varint(&mut out, zigzag_encode(delta));
104 }
105
106 out
107}
108
109pub fn decode(data: &[u8]) -> Result<Vec<i64>, CodecError> {
111 if data.len() < 4 {
112 return Err(CodecError::Truncated {
113 expected: 4,
114 actual: data.len(),
115 });
116 }
117
118 let count = u32::from_le_bytes(data[0..4].try_into().map_err(|_| CodecError::Corrupt {
119 detail: "invalid header".into(),
120 })?) as usize;
121
122 if count == 0 {
123 return Ok(Vec::new());
124 }
125
126 if data.len() < 12 {
127 return Err(CodecError::Truncated {
128 expected: 12,
129 actual: data.len(),
130 });
131 }
132
133 let first_value =
134 i64::from_le_bytes(data[4..12].try_into().map_err(|_| CodecError::Corrupt {
135 detail: "invalid first value".into(),
136 })?);
137
138 let mut values = Vec::with_capacity(count);
139 values.push(first_value);
140
141 let mut offset = 12;
142 for _ in 1..count {
143 if offset >= data.len() {
144 return Err(CodecError::Truncated {
145 expected: offset + 1,
146 actual: data.len(),
147 });
148 }
149 let (encoded_delta, consumed) = read_varint(&data[offset..])?;
150 let delta = zigzag_decode(encoded_delta);
151 let value = values[values.len() - 1].wrapping_add(delta);
152 values.push(value);
153 offset += consumed;
154 }
155
156 Ok(values)
157}
158
/// Incremental front end for the batch encoder.
///
/// Values are only buffered here; no encoding work happens until the
/// encoder is consumed.
#[derive(Debug, Clone)]
pub struct DeltaEncoder {
    /// Every value pushed so far, in insertion order.
    values: Vec<i64>,
}
168
169impl DeltaEncoder {
170 pub fn new() -> Self {
171 Self {
172 values: Vec::with_capacity(4096),
173 }
174 }
175
176 pub fn push(&mut self, value: i64) {
177 self.values.push(value);
178 }
179
180 pub fn push_batch(&mut self, values: &[i64]) {
181 self.values.extend_from_slice(values);
182 }
183
184 pub fn count(&self) -> usize {
185 self.values.len()
186 }
187
188 pub fn finish(self) -> Vec<u8> {
189 encode(&self.values)
190 }
191}
192
193impl Default for DeltaEncoder {
194 fn default() -> Self {
195 Self::new()
196 }
197}
198
/// Cursor over an already-decoded series.
///
/// The whole input is decoded when the decoder is constructed; reading
/// values afterwards only advances `pos`.
#[derive(Debug, Clone)]
pub struct DeltaDecoder {
    /// All decoded values, in stream order.
    values: Vec<i64>,
    /// Index of the next value to hand out; never exceeds `values.len()`.
    pos: usize,
}
204
205impl DeltaDecoder {
206 pub fn new(data: &[u8]) -> Result<Self, CodecError> {
207 let values = decode(data)?;
208 Ok(Self { values, pos: 0 })
209 }
210
211 pub fn decode_all(data: &[u8]) -> Result<Vec<i64>, CodecError> {
212 decode(data)
213 }
214
215 pub fn next_value(&mut self) -> Option<i64> {
216 if self.pos < self.values.len() {
217 let v = self.values[self.pos];
218 self.pos += 1;
219 Some(v)
220 } else {
221 None
222 }
223 }
224
225 pub fn remaining(&self) -> usize {
226 self.values.len() - self.pos
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn zigzag_roundtrip() {
        for v in [0i64, 1, -1, 2, -2, 63, -63, 127, -128, i64::MAX, i64::MIN] {
            assert_eq!(zigzag_decode(zigzag_encode(v)), v, "zigzag failed for {v}");
        }
    }

    #[test]
    fn varint_roundtrip() {
        for v in [0u64, 1, 127, 128, 255, 16383, 16384, u64::MAX / 2, u64::MAX] {
            let mut buf = Vec::new();
            write_varint(&mut buf, v);
            let (decoded, consumed) = read_varint(&buf).unwrap();
            assert_eq!(decoded, v, "varint failed for {v}");
            assert_eq!(consumed, buf.len());
        }
    }

    #[test]
    fn overlong_varint_is_rejected() {
        // Eleven continuation bytes: longer than any u64 can need.
        assert!(read_varint(&[0x80u8; 11]).is_err());
        // A lone continuation byte ends before the varint does.
        assert!(read_varint(&[0x80u8]).is_err());
    }

    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]);
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn single_value() {
        let encoded = encode(&[42i64]);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, vec![42i64]);
        // 4-byte count + 8-byte first value, no deltas.
        assert_eq!(encoded.len(), 12);
    }

    #[test]
    fn monotonic_counter() {
        let values: Vec<i64> = (0..10_000).map(|i| i * 1000).collect();
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);

        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 3.0,
            "monotonic counter should compress to <3 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn counter_with_small_increments() {
        let values: Vec<i64> = (0..10_000).collect();
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);

        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 2.0,
            "unit-increment counter should compress to <2 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn counter_reset() {
        let mut values: Vec<i64> = (0..500).map(|i| i * 100).collect();
        // Drop back to zero, then climb again: one large negative delta.
        values.push(0);
        values.extend((1..500).map(|i| i * 100));

        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn non_monotonic_gauge() {
        let mut values = Vec::with_capacity(10_000);
        let mut val = 50i64;
        // Fixed-seed linear congruential generator keeps the test deterministic.
        let mut rng: u64 = 12345;
        for _ in 0..10_000 {
            values.push(val);
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            // Step in the range -5..=5.
            let delta = ((rng >> 33) as i64 % 11) - 5;
            val += delta;
        }

        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);

        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 3.0,
            "small-delta gauge should compress to <3 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn negative_values() {
        let values: Vec<i64> = vec![-1000, -999, -998, -997, -996];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn large_values() {
        let values: Vec<i64> = vec![i64::MAX, i64::MAX - 1, i64::MAX - 2];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn boundary_values() {
        // Both deltas overflow i64 and rely on wrapping arithmetic.
        let values: Vec<i64> = vec![i64::MIN, 0, i64::MAX];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn streaming_encoder_matches_batch() {
        let values: Vec<i64> = (0..1000).map(|i| i * 7).collect();
        let batch = encode(&values);

        let mut enc = DeltaEncoder::new();
        for &v in &values {
            enc.push(v);
        }
        assert_eq!(enc.finish(), batch);
    }

    #[test]
    fn push_batch_matches_batch() {
        let values: Vec<i64> = (0..1000).map(|i| i * 7).collect();

        let mut enc = DeltaEncoder::new();
        enc.push_batch(&values[..400]);
        enc.push_batch(&values[400..]);
        assert_eq!(enc.count(), values.len());
        assert_eq!(enc.finish(), encode(&values));
    }

    #[test]
    fn default_encoder_is_empty() {
        let enc = DeltaEncoder::default();
        assert_eq!(enc.count(), 0);
        assert_eq!(enc.finish(), encode(&[]));
    }

    #[test]
    fn streaming_decoder() {
        let values: Vec<i64> = (0..100).map(|i| i * 10).collect();
        let encoded = encode(&values);
        let mut dec = DeltaDecoder::new(&encoded).unwrap();

        for &expected in &values {
            assert_eq!(dec.next_value(), Some(expected));
        }
        assert_eq!(dec.next_value(), None);
    }

    #[test]
    fn decoder_remaining_tracks_position() {
        let encoded = encode(&[1, 2, 3]);
        let mut dec = DeltaDecoder::new(&encoded).unwrap();

        assert_eq!(dec.remaining(), 3);
        assert_eq!(dec.next_value(), Some(1));
        assert_eq!(dec.remaining(), 2);
        assert_eq!(dec.next_value(), Some(2));
        assert_eq!(dec.next_value(), Some(3));
        assert_eq!(dec.remaining(), 0);
        // Reading past the end must not move the cursor.
        assert_eq!(dec.next_value(), None);
        assert_eq!(dec.remaining(), 0);
    }

    #[test]
    fn decode_all_matches_decode() {
        let values: Vec<i64> = vec![5, -5, 500, -500];
        let encoded = encode(&values);
        assert_eq!(DeltaDecoder::decode_all(&encoded).unwrap(), values);
    }

    #[test]
    fn truncated_input_errors() {
        assert!(decode(&[]).is_err());
        // Count says one value, but the 8-byte first value is missing.
        assert!(decode(&[1, 0, 0, 0]).is_err());
    }

    #[test]
    fn truncated_delta_stream_errors() {
        let mut encoded = encode(&[0, 1000, 2000]);
        // Cut the last delta in half, then remove it entirely.
        encoded.pop();
        assert!(decode(&encoded).is_err());
        encoded.pop();
        assert!(decode(&encoded).is_err());
    }

    #[test]
    fn compression_vs_raw() {
        let values: Vec<i64> = (0..100_000).map(|i| i * 1000).collect();
        let encoded = encode(&values);
        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 3.0,
            "expected >3x compression for monotonic counter, got {ratio:.1}x"
        );
    }
}