1use crate::error::CodecError;
33
/// Append-only bit sink that packs bits MSB-first into a growable byte buffer.
///
/// Each new byte is pushed as `0` before any of its bits are set, so the
/// unused low bits of the final byte are always zero and
/// `as_bytes().len() == ceil(bit_len() / 8)`.
#[derive(Debug)]
pub(crate) struct BitWriter {
    /// Packed output bytes written so far.
    buf: Vec<u8>,
    /// Total number of bits written; also the index of the next bit slot.
    bit_pos: usize,
}

impl BitWriter {
    /// Creates an empty writer with 1024 bytes of storage reserved up front.
    pub(crate) fn new() -> Self {
        let buf = Vec::with_capacity(1024);
        BitWriter { buf, bit_pos: 0 }
    }

    /// Appends a single bit at the current position.
    pub(crate) fn write_bit(&mut self, bit: bool) {
        let byte = self.bit_pos / 8;
        let offset = self.bit_pos % 8;
        // Crossing into a new byte: start it zeroed so unset bits read back as 0.
        if self.buf.len() <= byte {
            self.buf.push(0);
        }
        if bit {
            // Offset 0 is the most significant bit of the byte.
            self.buf[byte] |= 0x80u8 >> offset;
        }
        self.bit_pos += 1;
    }

    /// Appends the low `num_bits` bits of `value`, most significant bit first.
    ///
    /// `num_bits` must not exceed 64: shifting a `u64` by 64 or more overflows
    /// (a panic in debug builds).
    pub(crate) fn write_bits(&mut self, value: u64, num_bits: usize) {
        (0..num_bits)
            .rev()
            .for_each(|shift| self.write_bit((value >> shift) & 1 != 0));
    }

    /// Returns the packed bytes written so far, including a zero-padded tail byte.
    pub(crate) fn as_bytes(&self) -> &[u8] {
        self.buf.as_slice()
    }

    /// Returns the exact number of bits written (excluding padding).
    pub(crate) fn bit_len(&self) -> usize {
        self.bit_pos
    }
}
78
79pub(crate) struct BitReader<'a> {
80 buf: &'a [u8],
81 bit_pos: usize,
82}
83
84impl<'a> BitReader<'a> {
85 pub(crate) fn new(buf: &'a [u8]) -> Self {
86 Self { buf, bit_pos: 0 }
87 }
88
89 pub(crate) fn read_bit(&mut self) -> Result<bool, CodecError> {
90 let byte_idx = self.bit_pos / 8;
91 if byte_idx >= self.buf.len() {
92 return Err(CodecError::Truncated {
93 expected: byte_idx + 1,
94 actual: self.buf.len(),
95 });
96 }
97 let bit_idx = 7 - (self.bit_pos % 8);
98 let bit = (self.buf[byte_idx] >> bit_idx) & 1 == 1;
99 self.bit_pos += 1;
100 Ok(bit)
101 }
102
103 pub(crate) fn read_bits(&mut self, num_bits: usize) -> Result<u64, CodecError> {
104 let mut value = 0u64;
105 for _ in 0..num_bits {
106 value = (value << 1) | u64::from(self.read_bit()?);
107 }
108 Ok(value)
109 }
110}
111
112pub fn encode(values: &[i64]) -> Vec<u8> {
118 let count = values.len() as u32;
119 let mut out = Vec::with_capacity(20 + values.len() / 4);
120
121 out.extend_from_slice(&count.to_le_bytes());
122
123 if values.is_empty() {
124 return out;
125 }
126
127 out.extend_from_slice(&values[0].to_le_bytes());
128
129 if values.len() == 1 {
130 return out;
131 }
132
133 let first_delta = values[1].wrapping_sub(values[0]);
134 out.extend_from_slice(&first_delta.to_le_bytes());
135
136 if values.len() == 2 {
137 return out;
138 }
139
140 let mut bs = BitWriter::new();
141 let mut prev_delta = first_delta;
142
143 for i in 2..values.len() {
144 let delta = values[i].wrapping_sub(values[i - 1]);
145 let dod = delta.wrapping_sub(prev_delta);
146 encode_dod(&mut bs, dod);
147 prev_delta = delta;
148 }
149
150 out.extend_from_slice(bs.as_bytes());
151 out
152}
153
154pub fn decode(data: &[u8]) -> Result<Vec<i64>, CodecError> {
156 if data.len() < 4 {
157 return Err(CodecError::Truncated {
158 expected: 4,
159 actual: data.len(),
160 });
161 }
162
163 let count = u32::from_le_bytes(data[0..4].try_into().map_err(|_| CodecError::Corrupt {
164 detail: "invalid header".into(),
165 })?) as usize;
166
167 if count == 0 {
168 return Ok(Vec::new());
169 }
170
171 if data.len() < 12 {
172 return Err(CodecError::Truncated {
173 expected: 12,
174 actual: data.len(),
175 });
176 }
177
178 let first_value =
179 i64::from_le_bytes(data[4..12].try_into().map_err(|_| CodecError::Corrupt {
180 detail: "invalid first value".into(),
181 })?);
182
183 let mut values = Vec::with_capacity(count);
184 values.push(first_value);
185
186 if count == 1 {
187 return Ok(values);
188 }
189
190 if data.len() < 20 {
191 return Err(CodecError::Truncated {
192 expected: 20,
193 actual: data.len(),
194 });
195 }
196
197 let first_delta =
198 i64::from_le_bytes(data[12..20].try_into().map_err(|_| CodecError::Corrupt {
199 detail: "invalid first delta".into(),
200 })?);
201 values.push(first_value.wrapping_add(first_delta));
202
203 if count == 2 {
204 return Ok(values);
205 }
206
207 let mut reader = BitReader::new(&data[20..]);
208 let mut prev_delta = first_delta;
209
210 for _ in 2..count {
211 let dod = decode_dod(&mut reader)?;
212 let delta = prev_delta.wrapping_add(dod);
213 let value = values[values.len() - 1].wrapping_add(delta);
214 values.push(value);
215 prev_delta = delta;
216 }
217
218 Ok(values)
219}
220
221fn encode_dod(bs: &mut BitWriter, dod: i64) {
226 if dod == 0 {
227 bs.write_bit(false);
228 } else if (-64..=63).contains(&dod) {
229 bs.write_bits(0b10, 2);
230 bs.write_bits((dod as u64) & 0x7F, 7);
231 } else if (-256..=255).contains(&dod) {
232 bs.write_bits(0b110, 3);
233 bs.write_bits((dod as u64) & 0x1FF, 9);
234 } else if (-2048..=2047).contains(&dod) {
235 bs.write_bits(0b1110, 4);
236 bs.write_bits((dod as u64) & 0xFFF, 12);
237 } else {
238 bs.write_bits(0b1111, 4);
239 bs.write_bits(dod as u64, 64);
240 }
241}
242
243fn decode_dod(reader: &mut BitReader<'_>) -> Result<i64, CodecError> {
244 let bit = reader.read_bit()?;
245 if !bit {
246 return Ok(0);
247 }
248
249 let bit2 = reader.read_bit()?;
250 if !bit2 {
251 let raw = reader.read_bits(7)? as i64;
252 return Ok(sign_extend(raw, 7));
253 }
254
255 let bit3 = reader.read_bit()?;
256 if !bit3 {
257 let raw = reader.read_bits(9)? as i64;
258 return Ok(sign_extend(raw, 9));
259 }
260
261 let bit4 = reader.read_bit()?;
262 if !bit4 {
263 let raw = reader.read_bits(12)? as i64;
264 return Ok(sign_extend(raw, 12));
265 }
266
267 let raw = reader.read_bits(64)?;
268 Ok(raw as i64)
269}
270
/// Interprets the low `bits` bits of `value` as a two's-complement integer.
///
/// The field's top bit is moved into the `i64` sign position, then an
/// arithmetic right shift copies it back down across the upper bits.
/// `bits` must be in `1..=64`; other values overflow the shift or subtraction.
fn sign_extend(value: i64, bits: u32) -> i64 {
    let pad = i64::BITS - bits;
    let aligned = value << pad;
    aligned >> pad
}
275
276pub struct DoubleDeltaEncoder {
283 values: Vec<i64>,
284}
285
286impl DoubleDeltaEncoder {
287 pub fn new() -> Self {
288 Self {
289 values: Vec::with_capacity(4096),
290 }
291 }
292
293 pub fn push(&mut self, value: i64) {
295 self.values.push(value);
296 }
297
298 pub fn push_batch(&mut self, values: &[i64]) {
300 self.values.extend_from_slice(values);
301 }
302
303 pub fn count(&self) -> usize {
305 self.values.len()
306 }
307
308 pub fn finish(self) -> Vec<u8> {
310 encode(&self.values)
311 }
312}
313
314impl Default for DoubleDeltaEncoder {
315 fn default() -> Self {
316 Self::new()
317 }
318}
319
320pub struct DoubleDeltaDecoder {
322 values: Vec<i64>,
323 pos: usize,
324}
325
326impl DoubleDeltaDecoder {
327 pub fn new(data: &[u8]) -> Result<Self, CodecError> {
329 let values = decode(data)?;
330 Ok(Self { values, pos: 0 })
331 }
332
333 pub fn decode_all(data: &[u8]) -> Result<Vec<i64>, CodecError> {
335 decode(data)
336 }
337
338 pub fn next_value(&mut self) -> Option<i64> {
340 if self.pos < self.values.len() {
341 let v = self.values[self.pos];
342 self.pos += 1;
343 Some(v)
344 } else {
345 None
346 }
347 }
348
349 pub fn remaining(&self) -> usize {
351 self.values.len() - self.pos
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_roundtrip() {
        let encoded = encode(&[]);
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn single_value() {
        let encoded = encode(&[1_700_000_000_000i64]);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, vec![1_700_000_000_000i64]);
        assert_eq!(encoded.len(), 12);
    }

    #[test]
    fn two_values() {
        let values = vec![1000i64, 2000];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
        assert_eq!(encoded.len(), 20);
    }

    #[test]
    fn constant_rate_timestamps() {
        let values: Vec<i64> = (0..10_000)
            .map(|i| 1_700_000_000_000 + i * 10_000)
            .collect();
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);

        let bits_per_sample = (encoded.len() as f64 * 8.0) / values.len() as f64;
        assert!(
            bits_per_sample < 2.0,
            "constant-rate should compress to ~1 bit/sample, got {bits_per_sample:.1}"
        );
    }

    #[test]
    fn monotonic_with_jitter() {
        let mut values = Vec::with_capacity(10_000);
        let mut ts = 1_700_000_000_000i64;
        let mut rng: u64 = 42;
        for _ in 0..10_000 {
            values.push(ts);
            rng = rng.wrapping_mul(6364136223846793005).wrapping_add(1);
            let jitter = ((rng >> 33) as i64 % 101) - 50;
            ts += 10_000 + jitter;
        }
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);

        let bytes_per_sample = encoded.len() as f64 / values.len() as f64;
        assert!(
            bytes_per_sample < 2.0,
            "jittered timestamps should compress to <2 bytes/sample, got {bytes_per_sample:.2}"
        );
    }

    #[test]
    fn non_monotonic_values() {
        let values: Vec<i64> = vec![100, 50, 200, 10, 300, 5, 1000, -500, 0, 42];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn negative_values() {
        let values: Vec<i64> = vec![-1000, -999, -998, -997, -996];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn large_deltas() {
        let values: Vec<i64> = vec![0, i64::MAX / 2, i64::MIN / 2, i64::MAX / 4, 0];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn boundary_values() {
        let values: Vec<i64> = vec![i64::MIN, 0, i64::MAX, 0, i64::MIN];
        let encoded = encode(&values);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded, values);
    }

    #[test]
    fn dod_bucket_boundaries_roundtrip() {
        // Both edges of every bucket, plus the first value that spills into the next one.
        let dods = [
            0,
            1,
            -1,
            63,
            -64,
            64,
            -65,
            255,
            -256,
            256,
            -257,
            2047,
            -2048,
            2048,
            -2049,
            i64::MAX,
            i64::MIN,
        ];
        for &dod in &dods {
            // With [0, 0, dod] the first delta is 0, so the only packed
            // delta-of-delta equals `dod` itself.
            let values = vec![0i64, 0, dod];
            let decoded = decode(&encode(&values)).unwrap();
            assert_eq!(decoded, values, "round-trip failed for dod = {dod}");
        }
    }

    #[test]
    fn dod_bucket_sizes_and_bit_layout() {
        let payload = |dod: i64| encode(&[0, 0, dod])[20..].to_vec();
        // '0'
        assert_eq!(payload(0), vec![0b0000_0000]);
        // '10' + 1111111, MSB-first, zero-padded.
        assert_eq!(payload(-1), vec![0b1011_1111, 0b1000_0000]);
        // '110' + 9 bits = 12 bits.
        assert_eq!(payload(255).len(), 2);
        // '1110' + 12 bits = 16 bits.
        assert_eq!(payload(-2048).len(), 2);
        // '1111' + 64 bits = 68 bits.
        assert_eq!(payload(2048).len(), 9);
    }

    #[test]
    fn compression_better_than_raw_for_constant_rate() {
        let values: Vec<i64> = (0..100_000)
            .map(|i| 1_700_000_000_000 + i * 10_000)
            .collect();
        let encoded = encode(&values);
        let raw_size = values.len() * 8;
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 5.0,
            "expected >5x compression for constant-rate, got {ratio:.1}x"
        );
    }

    #[test]
    fn streaming_encoder_matches_batch() {
        let values: Vec<i64> = (0..1000).map(|i| 1_000_000 + i * 100).collect();
        let batch_encoded = encode(&values);

        let mut enc = DoubleDeltaEncoder::new();
        for &v in &values {
            enc.push(v);
        }
        let stream_encoded = enc.finish();
        assert_eq!(batch_encoded, stream_encoded);

        let mut bulk = DoubleDeltaEncoder::default();
        bulk.push_batch(&values);
        assert_eq!(bulk.count(), values.len());
        assert_eq!(bulk.finish(), batch_encoded);
    }

    #[test]
    fn streaming_decoder() {
        let values: Vec<i64> = (0..100).map(|i| 5000 + i * 10).collect();
        let encoded = encode(&values);
        let mut dec = DoubleDeltaDecoder::new(&encoded).unwrap();
        assert_eq!(dec.remaining(), values.len());

        for &expected in &values {
            assert_eq!(dec.next_value(), Some(expected));
        }
        assert_eq!(dec.next_value(), None);
        assert_eq!(dec.remaining(), 0);

        assert_eq!(DoubleDeltaDecoder::decode_all(&encoded).unwrap(), values);
    }

    #[test]
    fn truncated_input_errors() {
        assert!(decode(&[]).is_err());
        assert!(decode(&[1, 0, 0, 0]).is_err());
        assert!(decode(&[2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]).is_err());
    }

    #[test]
    fn truncated_bitstream_errors() {
        // The header promises three values but no bit-packed section follows.
        let mut header_only = Vec::new();
        header_only.extend_from_slice(&3u32.to_le_bytes());
        header_only.extend_from_slice(&0i64.to_le_bytes());
        header_only.extend_from_slice(&0i64.to_le_bytes());
        assert!(decode(&header_only).is_err());

        // A 64-bit delta-of-delta cut off partway through its payload.
        let encoded = encode(&[0, 0, i64::MAX]);
        assert_eq!(encoded.len(), 29);
        assert!(decode(&encoded[..25]).is_err());
    }
}