1use crate::double_delta::{BitReader, BitWriter};
23use crate::error::CodecError;
24
/// Streaming Gorilla-style encoder for `(timestamp_ms, f64)` samples.
///
/// Timestamps are stored as delta-of-deltas and values as the XOR of their
/// bit pattern against the previous value's; `finish` prepends a 4-byte
/// little-endian sample count to the bitstream.
#[derive(Debug)]
pub struct GorillaEncoder {
    /// Output bitstream being built.
    buf: BitWriter,
    /// Timestamp of the most recently encoded sample.
    prev_ts: i64,
    /// Difference between the last two timestamps (0 until a second sample arrives).
    prev_delta: i64,
    /// Raw bit pattern (`f64::to_bits`) of the most recently encoded value.
    prev_value: u64,
    /// Leading-zero count of the current XOR window; `u8::MAX` means no
    /// window has been written yet.
    prev_leading: u8,
    /// Trailing-zero count of the current XOR window.
    prev_trailing: u8,
    /// Number of samples encoded so far.
    count: u64,
}
43
44impl GorillaEncoder {
45 pub fn new() -> Self {
46 Self {
47 buf: BitWriter::new(),
48 prev_ts: 0,
49 prev_delta: 0,
50 prev_value: 0,
51 prev_leading: u8::MAX,
52 prev_trailing: 0,
53 count: 0,
54 }
55 }
56
57 pub fn encode(&mut self, timestamp_ms: i64, value: f64) {
59 let value_bits = value.to_bits();
60
61 if self.count == 0 {
62 self.buf.write_bits(timestamp_ms as u64, 64);
63 self.buf.write_bits(value_bits, 64);
64 self.prev_ts = timestamp_ms;
65 self.prev_value = value_bits;
66 self.count = 1;
67 return;
68 }
69
70 let delta = timestamp_ms - self.prev_ts;
72 let dod = delta - self.prev_delta;
73 self.encode_timestamp_dod(dod);
74 self.prev_ts = timestamp_ms;
75 self.prev_delta = delta;
76
77 let xor = self.prev_value ^ value_bits;
79 self.encode_value_xor(xor);
80 self.prev_value = value_bits;
81
82 self.count += 1;
83 }
84
85 fn encode_timestamp_dod(&mut self, dod: i64) {
86 if dod == 0 {
87 self.buf.write_bit(false);
88 } else if (-64..=63).contains(&dod) {
89 self.buf.write_bits(0b10, 2);
90 self.buf.write_bits((dod as u64) & 0x7F, 7);
91 } else if (-256..=255).contains(&dod) {
92 self.buf.write_bits(0b110, 3);
93 self.buf.write_bits((dod as u64) & 0x1FF, 9);
94 } else if (-2048..=2047).contains(&dod) {
95 self.buf.write_bits(0b1110, 4);
96 self.buf.write_bits((dod as u64) & 0xFFF, 12);
97 } else {
98 self.buf.write_bits(0b1111, 4);
99 self.buf.write_bits(dod as u64, 64);
100 }
101 }
102
103 fn encode_value_xor(&mut self, xor: u64) {
104 if xor == 0 {
105 self.buf.write_bit(false);
106 return;
107 }
108
109 self.buf.write_bit(true);
110
111 let leading = xor.leading_zeros() as u8;
112 let trailing = xor.trailing_zeros() as u8;
113
114 if self.prev_leading != u8::MAX
115 && leading >= self.prev_leading
116 && trailing >= self.prev_trailing
117 {
118 self.buf.write_bit(false);
120 let meaningful_bits = 64 - self.prev_leading - self.prev_trailing;
121 self.buf
122 .write_bits(xor >> self.prev_trailing, meaningful_bits as usize);
123 } else {
124 self.buf.write_bit(true);
126 self.buf.write_bits(leading as u64, 6);
127 let meaningful_bits = 64 - leading - trailing;
128 self.buf.write_bits((meaningful_bits - 1) as u64, 6);
129 self.buf
130 .write_bits(xor >> trailing, meaningful_bits as usize);
131 self.prev_leading = leading;
132 self.prev_trailing = trailing;
133 }
134 }
135
136 pub fn finish(self) -> Vec<u8> {
140 let count_bytes = (self.count as u32).to_le_bytes();
141 let bitstream = self.buf.as_bytes();
142 let mut out = Vec::with_capacity(4 + bitstream.len());
143 out.extend_from_slice(&count_bytes);
144 out.extend_from_slice(bitstream);
145 out
146 }
147
148 pub fn count(&self) -> u64 {
149 self.count
150 }
151
152 pub fn compressed_size(&self) -> usize {
153 self.buf.bit_len().div_ceil(8)
154 }
155}
156
157impl Default for GorillaEncoder {
158 fn default() -> Self {
159 Self::new()
160 }
161}
162
/// Streaming decoder for the byte format produced by [`GorillaEncoder::finish`].
///
/// Tracks the same running state as the encoder so each sample can be
/// rebuilt from the previous one.
pub struct GorillaDecoder<'a> {
    /// Bit cursor over the bitstream that follows the 4-byte count header.
    reader: BitReader<'a>,
    /// Timestamp of the last decoded sample.
    prev_ts: i64,
    /// Last reconstructed timestamp delta.
    prev_delta: i64,
    /// Raw bit pattern of the last decoded value.
    prev_value: u64,
    /// Leading-zero count of the current XOR window.
    prev_leading: u8,
    /// Trailing-zero count of the current XOR window.
    prev_trailing: u8,
    /// Number of samples yielded so far.
    count: u64,
    /// Sample count read from the header; decoding stops once `count` reaches it.
    total: u64,
    /// `true` until the verbatim first sample has been read.
    first: bool,
}
179
180impl<'a> GorillaDecoder<'a> {
181 pub fn new(buf: &'a [u8]) -> Self {
185 if buf.len() < 4 {
186 return Self {
187 reader: BitReader::new(&[]),
188 prev_ts: 0,
189 prev_delta: 0,
190 prev_value: 0,
191 prev_leading: 0,
192 prev_trailing: 0,
193 count: 0,
194 total: 0,
195 first: true,
196 };
197 }
198 let total = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as u64;
199 Self {
200 reader: BitReader::new(&buf[4..]),
201 prev_ts: 0,
202 prev_delta: 0,
203 prev_value: 0,
204 prev_leading: 0,
205 prev_trailing: 0,
206 count: 0,
207 total,
208 first: true,
209 }
210 }
211
212 pub fn next_sample(&mut self) -> Option<(i64, f64)> {
214 if self.count >= self.total {
215 return None;
216 }
217
218 if self.first {
219 self.first = false;
220 let ts = self.reader.read_bits(64).ok()? as i64;
221 let val = self.reader.read_bits(64).ok()?;
222 self.prev_ts = ts;
223 self.prev_value = val;
224 self.count = 1;
225 return Some((ts, f64::from_bits(val)));
226 }
227
228 let ts = self.decode_timestamp().ok()?;
229 let val = self.decode_value().ok()?;
230 self.count += 1;
231 Some((ts, f64::from_bits(val)))
232 }
233
234 fn decode_timestamp(&mut self) -> Result<i64, CodecError> {
235 let bit = self.reader.read_bit()?;
236 let dod = if !bit {
237 0i64
238 } else {
239 let bit2 = self.reader.read_bit()?;
240 if !bit2 {
241 let raw = self.reader.read_bits(7)? as i64;
242 sign_extend(raw, 7)
243 } else {
244 let bit3 = self.reader.read_bit()?;
245 if !bit3 {
246 let raw = self.reader.read_bits(9)? as i64;
247 sign_extend(raw, 9)
248 } else {
249 let bit4 = self.reader.read_bit()?;
250 if !bit4 {
251 let raw = self.reader.read_bits(12)? as i64;
252 sign_extend(raw, 12)
253 } else {
254 self.reader.read_bits(64)? as i64
255 }
256 }
257 }
258 };
259
260 let delta = self.prev_delta + dod;
261 let ts = self.prev_ts + delta;
262 self.prev_ts = ts;
263 self.prev_delta = delta;
264 Ok(ts)
265 }
266
267 fn decode_value(&mut self) -> Result<u64, CodecError> {
268 let bit = self.reader.read_bit()?;
269 if !bit {
270 return Ok(self.prev_value);
271 }
272
273 let bit2 = self.reader.read_bit()?;
274 let xor = if !bit2 {
275 let meaningful_bits = 64 - self.prev_leading - self.prev_trailing;
276 let bits = self.reader.read_bits(meaningful_bits as usize)?;
277 bits << self.prev_trailing
278 } else {
279 let leading = self.reader.read_bits(6)? as u8;
280 let meaningful_bits = self.reader.read_bits(6)? as u8 + 1;
281 let trailing = 64 - leading - meaningful_bits;
282 let bits = self.reader.read_bits(meaningful_bits as usize)?;
283 self.prev_leading = leading;
284 self.prev_trailing = trailing;
285 bits << trailing
286 };
287
288 let val = self.prev_value ^ xor;
289 self.prev_value = val;
290 Ok(val)
291 }
292
293 pub fn decode_all(&mut self) -> Vec<(i64, f64)> {
295 let mut samples = Vec::new();
296 while let Some(s) = self.next_sample() {
297 samples.push(s);
298 }
299 samples
300 }
301}
302
/// Sign-extends the low `bits` bits of `value` to a full `i64`.
///
/// The field is shifted up so its top bit lands in the `i64` sign position.
/// An arithmetic right shift then copies that bit back down through the
/// vacated high bits. `bits` must be in `1..=64`.
fn sign_extend(value: i64, bits: u32) -> i64 {
    let pad = i64::BITS - bits;
    let aligned = value << pad;
    aligned >> pad
}
307
308pub fn encode_f64(values: &[f64]) -> Vec<u8> {
317 let mut enc = GorillaEncoder::new();
318 for (i, &v) in values.iter().enumerate() {
319 enc.encode(i as i64, v);
320 }
321 enc.finish()
322}
323
324pub fn decode_f64(data: &[u8]) -> Result<Vec<f64>, CodecError> {
326 let mut dec = GorillaDecoder::new(data);
327 let samples = dec.decode_all();
328 if samples.len() != dec.total as usize {
329 return Err(CodecError::Truncated {
330 expected: dec.total as usize,
331 actual: samples.len(),
332 });
333 }
334 Ok(samples.into_iter().map(|(_, v)| v).collect())
335}
336
337pub fn encode_timestamps(timestamps: &[i64]) -> Vec<u8> {
342 let mut enc = GorillaEncoder::new();
343 for &ts in timestamps {
344 enc.encode(ts, 0.0);
345 }
346 enc.finish()
347}
348
349pub fn decode_timestamps(data: &[u8]) -> Result<Vec<i64>, CodecError> {
351 let mut dec = GorillaDecoder::new(data);
352 let samples = dec.decode_all();
353 if samples.len() != dec.total as usize {
354 return Err(CodecError::Truncated {
355 expected: dec.total as usize,
356 actual: samples.len(),
357 });
358 }
359 Ok(samples.into_iter().map(|(ts, _)| ts).collect())
360}
361
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_encoder() {
        let enc = GorillaEncoder::new();
        assert_eq!(enc.count(), 0);
        let data = enc.finish();
        assert_eq!(data.len(), 4);
        assert_eq!(u32::from_le_bytes(data[0..4].try_into().unwrap()), 0);
    }

    #[test]
    fn single_sample_roundtrip() {
        let mut enc = GorillaEncoder::new();
        enc.encode(1000, 42.5);
        let data = enc.finish();

        let mut dec = GorillaDecoder::new(&data);
        let (ts, val) = dec.next_sample().unwrap();
        assert_eq!(ts, 1000);
        assert!((val - 42.5).abs() < f64::EPSILON);
        assert!(dec.next_sample().is_none());
    }

    #[test]
    fn monotonic_timestamps_compress_well() {
        let mut enc = GorillaEncoder::new();
        for i in 0..1000 {
            enc.encode(1_000_000 + i * 10_000, 100.0 + (i as f64) * 0.001);
        }
        let data = enc.finish();

        assert!(
            data.len() < 8000,
            "expected good compression, got {} bytes for 1000 samples",
            data.len()
        );

        let mut dec = GorillaDecoder::new(&data);
        let samples = dec.decode_all();
        assert_eq!(samples.len(), 1000);
        assert_eq!(samples[0].0, 1_000_000);
    }

    #[test]
    fn identical_values_compress_minimally() {
        let mut enc = GorillaEncoder::new();
        for i in 0..100 {
            enc.encode(1000 + i * 1000, 42.0);
        }
        let data = enc.finish();

        assert!(
            data.len() < 100,
            "identical values should compress well, got {} bytes",
            data.len()
        );

        let mut dec = GorillaDecoder::new(&data);
        let samples = dec.decode_all();
        assert_eq!(samples.len(), 100);
        for s in &samples {
            assert!((s.1 - 42.0).abs() < f64::EPSILON);
        }
    }

    #[test]
    fn f64_batch_roundtrip() {
        let values: Vec<f64> = (0..500).map(|i| 42.0 + i as f64 * 0.1).collect();
        let encoded = encode_f64(&values);
        let decoded = decode_f64(&encoded).unwrap();
        assert_eq!(values.len(), decoded.len());
        for (a, b) in values.iter().zip(decoded.iter()) {
            assert_eq!(a.to_bits(), b.to_bits());
        }
    }

    #[test]
    fn timestamp_batch_roundtrip() {
        let timestamps: Vec<i64> = (0..1000).map(|i| 1_700_000_000_000 + i * 10_000).collect();
        let encoded = encode_timestamps(&timestamps);
        let decoded = decode_timestamps(&encoded).unwrap();
        assert_eq!(timestamps, decoded);
    }

    #[test]
    fn varying_values_roundtrip() {
        let mut enc = GorillaEncoder::new();
        let test_values = [
            0.0,
            1.0,
            -1.0,
            f64::MAX,
            f64::MIN,
            std::f64::consts::PI,
            1e-300,
            1e300,
        ];
        for (i, &val) in test_values.iter().enumerate() {
            enc.encode(i as i64 * 1000, val);
        }
        let data = enc.finish();

        let mut dec = GorillaDecoder::new(&data);
        let samples = dec.decode_all();
        assert_eq!(samples.len(), test_values.len());
        for (i, &expected) in test_values.iter().enumerate() {
            assert_eq!(samples[i].1.to_bits(), expected.to_bits());
        }
    }

    #[test]
    fn compression_ratio() {
        let mut enc = GorillaEncoder::new();
        let mut rng_state: u64 = 12345;
        for i in 0..10_000 {
            rng_state = rng_state.wrapping_mul(6364136223846793005).wrapping_add(1);
            let jitter = ((rng_state >> 33) as f64) / (u32::MAX as f64) * 2.0 - 1.0;
            let value = 50.0 + jitter * 5.0;
            enc.encode(1_700_000_000_000 + i * 10_000, value);
        }
        let data = enc.finish();

        let raw_size = 10_000 * 16;
        let ratio = raw_size as f64 / data.len() as f64;
        assert!(
            ratio > 2.0,
            "compression ratio {ratio:.1}:1 too low (expected >2:1)"
        );
    }

    #[test]
    fn sign_extend_recovers_negative_fields() {
        assert_eq!(sign_extend(0x7F, 7), -1);
        assert_eq!(sign_extend(0x40, 7), -64);
        assert_eq!(sign_extend(0x3F, 7), 63);
        assert_eq!(sign_extend(0x100, 9), -256);
        assert_eq!(sign_extend(0x800, 12), -2048);
        assert_eq!(sign_extend(0x7FF, 12), 2047);
    }

    #[test]
    fn buffer_shorter_than_header_decodes_nothing() {
        let mut dec = GorillaDecoder::new(&[1, 2, 3]);
        assert!(dec.next_sample().is_none());
        assert!(dec.decode_all().is_empty());
    }
}