1use std::io::{self, Write};
13
14use byteorder::{LittleEndian, WriteBytesExt};
15
16use super::{bitpack_read, bitpack_write, bits_needed_u64};
17
/// One-byte tag identifying the codec that produced a serialized payload.
///
/// The discriminant is the value each serializer in this file writes as the
/// first byte of its output, and the value the readers dispatch on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum CodecType {
    /// A single value repeated for every index.
    Constant = 0,
    /// Fixed-width offsets from the minimum value.
    Bitpacked = 1,
    /// Residuals against one straight line from the first to the last value.
    Linear = 2,
    /// Residuals against a separate straight line per fixed-size block.
    BlockwiseLinear = 3,
}

impl CodecType {
    /// Converts a raw tag byte back into a codec.
    ///
    /// Returns `None` when `v` does not match any variant's discriminant.
    pub fn from_u8(v: u8) -> Option<Self> {
        [
            Self::Constant,
            Self::Bitpacked,
            Self::Linear,
            Self::BlockwiseLinear,
        ]
        .iter()
        .copied()
        .find(|codec| *codec as u8 == v)
    }
}
41
/// Number of values per block in the blockwise-linear codec.
///
/// The writer splits its input into runs of this many values (the final run
/// may be shorter) and the reader uses the same constant to locate the block
/// that holds a given index.
pub const BLOCKWISE_LINEAR_BLOCK_SIZE: usize = 512;
44
/// Interface shared by the per-codec estimators.
///
/// `serialize_auto` drives an estimator in this order: `collect` once per
/// value, `finalize` once, `estimate`, and finally `serialize` on the
/// estimator with the smallest estimate.
pub trait CodecEstimator {
    /// Feeds one input value to the estimator.
    fn collect(&mut self, value: u64);
    /// Hook invoked after the last `collect`; does nothing by default.
    fn finalize(&mut self) {}
    /// Returns the estimated serialized size in bytes, or `None` when the
    /// codec does not apply to the collected values.
    fn estimate(&self) -> Option<u64>;
    /// Writes `values` to `writer` in this codec's format and returns the
    /// number of bytes written.
    fn serialize(&self, values: &[u64], writer: &mut dyn Write) -> io::Result<u64>;
}
57
58#[derive(Default)]
62pub struct ConstantEstimator {
63 first: Option<u64>,
64 all_same: bool,
65}
66
67impl CodecEstimator for ConstantEstimator {
68 fn collect(&mut self, value: u64) {
69 match self.first {
70 None => {
71 self.first = Some(value);
72 self.all_same = true;
73 }
74 Some(f) => {
75 if value != f {
76 self.all_same = false;
77 }
78 }
79 }
80 }
81
82 fn estimate(&self) -> Option<u64> {
83 if self.all_same {
84 Some(9)
86 } else {
87 None
88 }
89 }
90
91 fn serialize(&self, values: &[u64], writer: &mut dyn Write) -> io::Result<u64> {
92 let val = if values.is_empty() { 0 } else { values[0] };
93 writer.write_u8(CodecType::Constant as u8)?;
94 writer.write_u64::<LittleEndian>(val)?;
95 Ok(9)
96 }
97}
98
99#[derive(Default)]
103pub struct BitpackedEstimator {
104 min: u64,
105 max: u64,
106 count: usize,
107 initialized: bool,
108}
109
110impl CodecEstimator for BitpackedEstimator {
111 fn collect(&mut self, value: u64) {
112 if !self.initialized {
113 self.min = value;
114 self.max = value;
115 self.initialized = true;
116 } else {
117 self.min = self.min.min(value);
118 self.max = self.max.max(value);
119 }
120 self.count += 1;
121 }
122
123 fn estimate(&self) -> Option<u64> {
124 if self.count == 0 {
125 return Some(0);
126 }
127 let range = self.max - self.min;
128 let bpv = bits_needed_u64(range) as u64;
129 let data_bits = self.count as u64 * bpv;
131 let data_bytes = data_bits.div_ceil(8);
132 Some(1 + 8 + 1 + data_bytes)
133 }
134
135 fn serialize(&self, values: &[u64], writer: &mut dyn Write) -> io::Result<u64> {
136 let (min_value, bpv) = if values.is_empty() {
137 (0u64, 0u8)
138 } else {
139 let min_val = values.iter().copied().min().unwrap();
140 let max_val = values.iter().copied().max().unwrap();
141 (min_val, bits_needed_u64(max_val - min_val))
142 };
143
144 writer.write_u8(CodecType::Bitpacked as u8)?;
145 writer.write_u64::<LittleEndian>(min_value)?;
146 writer.write_u8(bpv)?;
147 let mut bytes_written = 10u64; if bpv > 0 && !values.is_empty() {
150 let shifted: Vec<u64> = values.iter().map(|&v| v - min_value).collect();
151 let mut packed = Vec::new();
152 bitpack_write(&shifted, bpv, &mut packed);
153 writer.write_all(&packed)?;
154 bytes_written += packed.len() as u64;
155 }
156 Ok(bytes_written)
157 }
158}
159
160#[inline]
164pub fn bitpacked_read(data: &[u8], index: usize) -> u64 {
165 let min_value = u64::from_le_bytes(data[0..8].try_into().unwrap());
166 let bpv = data[8];
167 if bpv == 0 {
168 return min_value;
169 }
170 let packed = &data[9..];
171 bitpack_read(packed, bpv, index) + min_value
172}
173
174#[derive(Default)]
183pub struct LinearEstimator {
184 count: usize,
185 first: u64,
186 last: u64,
187 min_val: u64,
188 max_val: u64,
189 min_residual: i64,
190 max_residual: i64,
191 values_collected: bool,
192}
193
194impl CodecEstimator for LinearEstimator {
195 fn collect(&mut self, value: u64) {
196 if !self.values_collected {
197 self.first = value;
198 self.min_val = value;
199 self.max_val = value;
200 self.values_collected = true;
201 } else {
202 self.min_val = self.min_val.min(value);
203 self.max_val = self.max_val.max(value);
204 }
205 self.last = value;
206 self.count += 1;
207 }
208
209 fn finalize(&mut self) {
210 if self.count < 2 {
211 return;
212 }
213 let pred_min = self.first.min(self.last) as i128;
221 let pred_max = self.first.max(self.last) as i128;
222 let min_res = self.min_val as i128 - pred_max;
223 let max_res = self.max_val as i128 - pred_min;
224 self.min_residual = min_res.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
225 self.max_residual = max_res.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
226 }
227
228 fn estimate(&self) -> Option<u64> {
229 if self.count < 2 {
230 return None;
231 }
232 let range = (self.max_residual as i128 - self.min_residual as i128) as u64;
234 let bpv = bits_needed_u64(range) as u64;
235 let data_bits = self.count as u64 * bpv;
236 let data_bytes = data_bits.div_ceil(8);
237 Some(1 + 8 + 8 + 4 + 8 + 1 + data_bytes)
239 }
240
241 fn serialize(&self, values: &[u64], writer: &mut dyn Write) -> io::Result<u64> {
242 let n = values.len();
243 if n < 2 {
244 return Err(io::Error::new(
245 io::ErrorKind::InvalidInput,
246 "linear needs ≥ 2 values",
247 ));
248 }
249 let first = values[0];
250 let last = values[n - 1];
251
252 let mut min_residual = i128::MAX;
254 for (i, &val) in values.iter().enumerate() {
255 let predicted = interpolate(first, last, n, i);
256 let residual = val as i128 - predicted as i128;
257 min_residual = min_residual.min(residual);
258 }
259
260 let shifted: Vec<u64> = values
262 .iter()
263 .enumerate()
264 .map(|(i, &val)| {
265 let predicted = interpolate(first, last, n, i);
266 let residual = val as i128 - predicted as i128;
267 (residual - min_residual) as u64
268 })
269 .collect();
270 let max_shifted = shifted.iter().copied().max().unwrap_or(0);
271 let bpv = bits_needed_u64(max_shifted);
272
273 let min_residual_i64 = min_residual.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
274 writer.write_u8(CodecType::Linear as u8)?;
275 writer.write_u64::<LittleEndian>(first)?;
276 writer.write_u64::<LittleEndian>(last)?;
277 writer.write_u32::<LittleEndian>(n as u32)?;
278 writer.write_i64::<LittleEndian>(min_residual_i64)?;
279 writer.write_u8(bpv)?;
280 let mut bytes_written = 30u64; if bpv > 0 {
283 let mut packed = Vec::new();
284 bitpack_write(&shifted, bpv, &mut packed);
285 writer.write_all(&packed)?;
286 bytes_written += packed.len() as u64;
287 }
288
289 Ok(bytes_written)
290 }
291}
292
/// Predicts the value at position `i` on the straight line that runs from
/// `first` (position 0) to `last` (position `n - 1`).
///
/// Returns `first` when `n <= 1`. The slope term is computed in `i128` and
/// divided with truncation toward zero; the result is cast back to `u64`.
#[inline]
fn interpolate(first: u64, last: u64, n: usize, i: usize) -> u64 {
    if n <= 1 {
        return first;
    }
    let (start, end) = (first as i128, last as i128);
    let steps = (n - 1) as i128;
    let numerator = (end - start) * (i as i128);
    (start + numerator / steps) as u64
}
307
308#[inline]
312pub fn linear_read(data: &[u8], index: usize) -> u64 {
313 let first = u64::from_le_bytes(data[0..8].try_into().unwrap());
314 let last = u64::from_le_bytes(data[8..16].try_into().unwrap());
315 let n = u32::from_le_bytes(data[16..20].try_into().unwrap()) as usize;
316 let offset = i64::from_le_bytes(data[20..28].try_into().unwrap());
317 let bpv = data[28];
318 let predicted = interpolate(first, last, n, index);
319 let residual = if bpv == 0 {
320 0u64
321 } else {
322 bitpack_read(&data[29..], bpv, index)
323 };
324 (predicted as i128 + offset as i128 + residual as i128) as u64
326}
327
328#[derive(Default)]
335pub struct BlockwiseLinearEstimator {
336 values: Vec<u64>,
337}
338
339impl CodecEstimator for BlockwiseLinearEstimator {
340 fn collect(&mut self, value: u64) {
341 self.values.push(value);
342 }
343
344 fn estimate(&self) -> Option<u64> {
345 let n = self.values.len();
346 if n < 2 * BLOCKWISE_LINEAR_BLOCK_SIZE {
347 return None;
349 }
350
351 let num_blocks = n.div_ceil(BLOCKWISE_LINEAR_BLOCK_SIZE);
352 let mut total = 9u64; for b in 0..num_blocks {
356 let start = b * BLOCKWISE_LINEAR_BLOCK_SIZE;
357 let end = (start + BLOCKWISE_LINEAR_BLOCK_SIZE).min(n);
358 let block = &self.values[start..end];
359 let block_len = block.len();
360
361 if block_len < 2 {
362 total += 29; continue;
365 }
366
367 let first = block[0];
368 let last = block[block_len - 1];
369 let mut min_res = i128::MAX;
370 let mut max_res = i128::MIN;
371 for (i, &val) in block.iter().enumerate() {
372 let pred = interpolate(first, last, block_len, i);
373 let res = val as i128 - pred as i128;
374 min_res = min_res.min(res);
375 max_res = max_res.max(res);
376 }
377 let range = (max_res - min_res) as u64;
378 let bpv = bits_needed_u64(range) as u64;
379 let data_bits = block_len as u64 * bpv;
380 let data_bytes = data_bits.div_ceil(8);
381 total += 29 + data_bytes;
382 }
383
384 Some(total)
385 }
386
387 fn serialize(&self, values: &[u64], writer: &mut dyn Write) -> io::Result<u64> {
388 let n = values.len();
389 let num_blocks = n.div_ceil(BLOCKWISE_LINEAR_BLOCK_SIZE);
390
391 writer.write_u8(CodecType::BlockwiseLinear as u8)?;
392 writer.write_u32::<LittleEndian>(n as u32)?;
393 writer.write_u32::<LittleEndian>(num_blocks as u32)?;
394 let mut bytes_written = 9u64;
395
396 for b in 0..num_blocks {
397 let start = b * BLOCKWISE_LINEAR_BLOCK_SIZE;
398 let end = (start + BLOCKWISE_LINEAR_BLOCK_SIZE).min(n);
399 let block = &values[start..end];
400 let block_len = block.len();
401
402 let first = block[0];
403 let last = if block_len > 1 {
404 block[block_len - 1]
405 } else {
406 first
407 };
408
409 let mut min_residual = i128::MAX;
411 if block_len >= 2 {
412 for (i, &val) in block.iter().enumerate() {
413 let pred = interpolate(first, last, block_len, i);
414 let res = val as i128 - pred as i128;
415 min_residual = min_residual.min(res);
416 }
417 } else {
418 min_residual = 0;
419 }
420
421 let shifted: Vec<u64> = block
422 .iter()
423 .enumerate()
424 .map(|(i, &val)| {
425 if block_len < 2 {
426 return 0;
427 }
428 let pred = interpolate(first, last, block_len, i);
429 let res = val as i128 - pred as i128;
430 (res - min_residual) as u64
431 })
432 .collect();
433 let max_shifted = shifted.iter().copied().max().unwrap_or(0);
434 let bpv = bits_needed_u64(max_shifted);
435
436 let min_res_i64 = min_residual.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
437 writer.write_u64::<LittleEndian>(first)?;
438 writer.write_u64::<LittleEndian>(last)?;
439 writer.write_i64::<LittleEndian>(min_res_i64)?;
440 writer.write_u8(bpv)?;
441
442 let mut packed = Vec::new();
443 if bpv > 0 {
444 bitpack_write(&shifted, bpv, &mut packed);
445 }
446 writer.write_u32::<LittleEndian>(packed.len() as u32)?;
447 writer.write_all(&packed)?;
448 bytes_written += 29 + packed.len() as u64;
449 }
450
451 Ok(bytes_written)
452 }
453}
454
455pub fn blockwise_linear_read(data: &[u8], index: usize) -> u64 {
459 let _num_values = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
460 let num_blocks = u32::from_le_bytes(data[4..8].try_into().unwrap()) as usize;
461
462 let target_block = index / BLOCKWISE_LINEAR_BLOCK_SIZE;
463 let index_in_block = index % BLOCKWISE_LINEAR_BLOCK_SIZE;
464
465 let mut pos = 8usize;
467 for b in 0..num_blocks {
468 let first = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
469 let last = u64::from_le_bytes(data[pos + 8..pos + 16].try_into().unwrap());
470 let offset = i64::from_le_bytes(data[pos + 16..pos + 24].try_into().unwrap());
471 let bpv = data[pos + 24];
472 let packed_len = u32::from_le_bytes(data[pos + 25..pos + 29].try_into().unwrap()) as usize;
473
474 if b == target_block {
475 let block_start = b * BLOCKWISE_LINEAR_BLOCK_SIZE;
476 let block_end = ((b + 1) * BLOCKWISE_LINEAR_BLOCK_SIZE).min(_num_values);
477 let block_len = block_end - block_start;
478
479 let predicted = interpolate(first, last, block_len, index_in_block);
480 let residual = if bpv == 0 {
481 0u64
482 } else {
483 bitpack_read(&data[pos + 29..], bpv, index_in_block)
484 };
485 return (predicted as i128 + offset as i128 + residual as i128) as u64;
486 }
487
488 pos += 29 + packed_len;
489 }
490
491 0 }
493
494pub fn serialize_auto(values: &[u64], writer: &mut dyn Write) -> io::Result<u64> {
500 let mut constant = ConstantEstimator::default();
501 let mut bitpacked = BitpackedEstimator::default();
502 let mut linear = LinearEstimator::default();
503 let mut blockwise = BlockwiseLinearEstimator::default();
504
505 for &v in values {
507 constant.collect(v);
508 bitpacked.collect(v);
509 linear.collect(v);
510 blockwise.collect(v);
511 }
512
513 constant.finalize();
515 bitpacked.finalize();
516 linear.finalize();
517 blockwise.finalize();
518
519 let candidates: Vec<(&dyn CodecEstimator, &str)> = vec![
521 (&constant, "constant"),
522 (&bitpacked, "bitpacked"),
523 (&linear, "linear"),
524 (&blockwise, "blockwise_linear"),
525 ];
526
527 let (best, _name) = candidates
528 .into_iter()
529 .filter_map(|(est, name)| est.estimate().map(|size| (est, name, size)))
530 .min_by_key(|&(_, _, size)| size)
531 .map(|(est, name, _)| (est, name))
532 .unwrap_or((&bitpacked as &dyn CodecEstimator, "bitpacked"));
533
534 best.serialize(values, writer)
535}
536
537pub fn bitpacked_read_batch(data: &[u8], start_index: usize, out: &mut [u64]) {
544 let min_value = u64::from_le_bytes(data[0..8].try_into().unwrap());
545 let bpv = data[8];
546
547 if bpv == 0 {
548 out.iter_mut().for_each(|v| *v = min_value);
549 return;
550 }
551
552 let packed = &data[9..];
553
554 match bpv {
555 8 => {
557 for (i, v) in out.iter_mut().enumerate() {
558 let idx = start_index + i;
559 *v = packed[idx] as u64 + min_value;
560 }
561 }
562 16 => {
563 for (i, v) in out.iter_mut().enumerate() {
564 let idx = start_index + i;
565 let byte_off = idx * 2;
566 let raw = u16::from_le_bytes([packed[byte_off], packed[byte_off + 1]]);
567 *v = raw as u64 + min_value;
568 }
569 }
570 32 => {
571 for (i, v) in out.iter_mut().enumerate() {
572 let idx = start_index + i;
573 let byte_off = idx * 4;
574 let raw = u32::from_le_bytes(packed[byte_off..byte_off + 4].try_into().unwrap());
575 *v = raw as u64 + min_value;
576 }
577 }
578 64 => {
579 for (i, v) in out.iter_mut().enumerate() {
580 let idx = start_index + i;
581 let byte_off = idx * 8;
582 let raw = u64::from_le_bytes(packed[byte_off..byte_off + 8].try_into().unwrap());
583 *v = raw.wrapping_add(min_value);
584 }
585 }
586 _ => {
588 for (i, v) in out.iter_mut().enumerate() {
589 *v = super::bitpack_read(packed, bpv, start_index + i) + min_value;
590 }
591 }
592 }
593}
594
595pub fn auto_read_batch(data: &[u8], start_index: usize, out: &mut [u64]) {
600 if data.is_empty() || out.is_empty() {
601 out.iter_mut().for_each(|v| *v = 0);
602 return;
603 }
604 let codec_id = data[0];
605 let rest = &data[1..];
606 match CodecType::from_u8(codec_id) {
607 Some(CodecType::Constant) => {
608 let val = u64::from_le_bytes(rest[0..8].try_into().unwrap());
609 out.iter_mut().for_each(|v| *v = val);
610 }
611 Some(CodecType::Bitpacked) => bitpacked_read_batch(rest, start_index, out),
612 Some(CodecType::Linear) => {
613 for (i, v) in out.iter_mut().enumerate() {
614 *v = linear_read(rest, start_index + i);
615 }
616 }
617 Some(CodecType::BlockwiseLinear) => {
618 for (i, v) in out.iter_mut().enumerate() {
619 *v = blockwise_linear_read(rest, start_index + i);
620 }
621 }
622 None => out.iter_mut().for_each(|v| *v = 0),
623 }
624}
625
626#[inline]
630pub fn auto_read(data: &[u8], index: usize) -> u64 {
631 if data.is_empty() {
632 return 0;
633 }
634 let codec_id = data[0];
635 let rest = &data[1..];
636 match CodecType::from_u8(codec_id) {
637 Some(CodecType::Constant) => {
638 u64::from_le_bytes(rest[0..8].try_into().unwrap())
640 }
641 Some(CodecType::Bitpacked) => bitpacked_read(rest, index),
642 Some(CodecType::Linear) => linear_read(rest, index),
643 Some(CodecType::BlockwiseLinear) => blockwise_linear_read(rest, index),
644 None => 0,
645 }
646}
647
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes with `serialize_auto`, then reads every index back one at
    /// a time with `auto_read`.
    fn roundtrip(values: &[u64]) -> Vec<u64> {
        let mut encoded = Vec::new();
        serialize_auto(values, &mut encoded).unwrap();
        let mut decoded = Vec::with_capacity(values.len());
        for i in 0..values.len() {
            decoded.push(auto_read(&encoded, i));
        }
        decoded
    }

    /// 1500 values made of two straight segments with different slopes.
    fn two_slope_values() -> Vec<u64> {
        (0..1500u64)
            .map(|i| {
                if i < 750 {
                    100 + i * 2
                } else {
                    5000 + (i - 750) * 5
                }
            })
            .collect()
    }

    #[test]
    fn test_constant_codec() {
        let values: Vec<u64> = vec![42; 100];
        let mut encoded = Vec::new();
        serialize_auto(&values, &mut encoded).unwrap();
        assert_eq!(encoded[0], CodecType::Constant as u8);
        assert_eq!(encoded.len(), 9);
        assert_eq!(roundtrip(&values), values);
    }

    #[test]
    fn test_bitpacked_codec() {
        let values: Vec<u64> = (0..50).map(|i| 1000 + (i % 7) * 13).collect();
        assert_eq!(roundtrip(&values), values);
    }

    #[test]
    fn test_linear_codec_sequential() {
        let values: Vec<u64> = (0..1000).map(|i| 100 + i * 3).collect();
        let mut encoded = Vec::new();
        serialize_auto(&values, &mut encoded).unwrap();
        assert_eq!(roundtrip(&values), values);
    }

    #[test]
    fn test_blockwise_linear_codec() {
        let values = two_slope_values();
        assert_eq!(roundtrip(&values), values);
    }

    #[test]
    fn test_empty() {
        let values: Vec<u64> = Vec::new();
        let mut encoded = Vec::new();
        serialize_auto(&values, &mut encoded).unwrap();
        assert!(encoded.len() <= 10);
    }

    #[test]
    fn test_single_value() {
        let values = vec![999u64];
        assert_eq!(roundtrip(&values), values);
    }

    #[test]
    fn test_two_values() {
        let values = vec![10u64, 20];
        assert_eq!(roundtrip(&values), values);
    }

    #[test]
    fn test_large_range() {
        let values = vec![0u64, u64::MAX / 2, u64::MAX];
        assert_eq!(roundtrip(&values), values);
    }

    #[test]
    fn test_timestamps_pick_linear_or_blockwise() {
        // Monotonic timestamps with a small, data-dependent jitter per step.
        let mut values: Vec<u64> = Vec::with_capacity(2000);
        let mut ts = 1_700_000_000u64;
        while values.len() < 2000 {
            values.push(ts);
            ts += 1000 + (ts % 7);
        }
        assert_eq!(roundtrip(&values), values);
    }

    /// Serializes `values`, then checks `auto_read_batch` both over the whole
    /// range and (for inputs of at least 10 values) over a window that skips
    /// three values at each end.
    fn roundtrip_batch(values: &[u64]) {
        let mut encoded = Vec::new();
        serialize_auto(values, &mut encoded).unwrap();

        let mut whole = vec![0u64; values.len()];
        auto_read_batch(&encoded, 0, &mut whole);
        assert_eq!(whole, values, "batch read mismatch");

        if values.len() < 10 {
            return;
        }
        let start = 3;
        let window = &values[start..values.len() - 3];
        let mut sub = vec![0u64; window.len()];
        auto_read_batch(&encoded, start, &mut sub);
        assert_eq!(sub, window, "sub-range batch mismatch");
    }

    #[test]
    fn test_batch_read_constant() {
        roundtrip_batch(&[42u64; 100]);
    }

    #[test]
    fn test_batch_read_bitpacked_8bit() {
        let values: Vec<u64> = (0..200).map(|i| 1000 + (i % 200)).collect();
        roundtrip_batch(&values);
    }

    #[test]
    fn test_batch_read_bitpacked_16bit() {
        let values: Vec<u64> = (0..200).map(|i| 50000 + i * 100).collect();
        roundtrip_batch(&values);
    }

    #[test]
    fn test_batch_read_bitpacked_arbitrary() {
        let values: Vec<u64> = (0..100).map(|i| 999 + (i * 37) % 8000).collect();
        roundtrip_batch(&values);
    }

    #[test]
    fn test_batch_read_linear() {
        let values: Vec<u64> = (0..500).map(|i| 100 + i * 3).collect();
        roundtrip_batch(&values);
    }

    #[test]
    fn test_batch_read_blockwise() {
        roundtrip_batch(&two_slope_values());
    }
}