1use std::convert::TryInto;
11
12use crate::columnar::error::{ColumnarError, Result};
13use serde::{Deserialize, Serialize};
14
15use super::encoding::{Column, LogicalType};
16
/// Tag naming a version-2 column encoding scheme.
///
/// Every [`Encoder`] reports one of these values from `encoding_type`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EncodingV2 {
    /// Plain encoding (its encoder is not defined in this part of the file).
    Plain,
    /// Dictionary encoding (its encoder is not defined in this part of the file).
    Dictionary,
    /// Run-length encoding; reported by `RleEncoder`.
    Rle,
    /// Bit-packing (its encoder is not defined in this part of the file).
    Bitpack,
    /// Delta encoding; reported by `DeltaEncoder`.
    Delta,
    /// Delta-length encoding (its encoder is not defined in this part of the file).
    DeltaLength,
    /// Byte-stream-split encoding; reported by `ByteStreamSplitEncoder`.
    ByteStreamSplit,
    /// Frame-of-reference encoding; reported by `ForEncoder`.
    FOR,
    /// Patched frame-of-reference encoding; reported by `PforEncoder`.
    PFOR,
    /// Incremental (prefix-sharing) string encoding; reported by `IncrementalStringEncoder`.
    IncrementalString,
}
41
/// Bit-packed mask holding one flag per value.
///
/// Bit `i` is stored in byte `i / 8` at bit position `i % 8` (least-significant bit first).
/// A set bit is reported as "valid" by `is_valid`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Bitmap {
    /// Packed flag bytes.
    bits: Vec<u8>,
    /// Number of logical flags tracked by the bitmap.
    len: usize,
}
50
51impl Bitmap {
52 pub fn all_valid(len: usize) -> Self {
54 let num_bytes = len.div_ceil(8);
55 Self {
56 bits: vec![0xFF; num_bytes],
57 len,
58 }
59 }
60
61 pub fn from_bools(valid: &[bool]) -> Self {
63 let len = valid.len();
64 let num_bytes = len.div_ceil(8);
65 let mut bits = vec![0u8; num_bytes];
66 for (i, &v) in valid.iter().enumerate() {
67 if v {
68 bits[i / 8] |= 1 << (i % 8);
69 }
70 }
71 Self { bits, len }
72 }
73
74 pub fn new(len: usize) -> Self {
76 Self::new_zeroed(len)
77 }
78
79 pub fn new_zeroed(len: usize) -> Self {
81 let num_bytes = len.div_ceil(8);
82 Self {
83 bits: vec![0u8; num_bytes],
84 len,
85 }
86 }
87
88 pub fn is_valid(&self, index: usize) -> bool {
90 if index >= self.len {
91 return false;
92 }
93 (self.bits[index / 8] & (1 << (index % 8))) != 0
94 }
95
96 pub fn get(&self, index: usize) -> bool {
98 self.is_valid(index)
99 }
100
101 pub fn set(&mut self, index: usize, valid: bool) {
103 if index >= self.len {
104 return;
105 }
106 if valid {
107 self.bits[index / 8] |= 1 << (index % 8);
108 } else {
109 self.bits[index / 8] &= !(1 << (index % 8));
110 }
111 }
112
113 pub fn null_count(&self) -> usize {
115 self.len
116 - self
117 .bits
118 .iter()
119 .map(|b| b.count_ones() as usize)
120 .sum::<usize>()
121 }
122
123 pub fn to_bytes(&self) -> Vec<u8> {
125 let mut buf = Vec::with_capacity(4 + self.bits.len());
126 buf.extend_from_slice(&(self.len as u32).to_le_bytes());
127 buf.extend_from_slice(&self.bits);
128 buf
129 }
130
131 pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
133 if bytes.len() < 4 {
134 return Err(ColumnarError::InvalidFormat(
135 "bitmap header too short".into(),
136 ));
137 }
138 let len = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
139 let num_bytes = len.div_ceil(8);
140 if bytes.len() < 4 + num_bytes {
141 return Err(ColumnarError::InvalidFormat("bitmap data truncated".into()));
142 }
143 Ok(Self {
144 bits: bytes[4..4 + num_bytes].to_vec(),
145 len,
146 })
147 }
148
149 pub fn len(&self) -> usize {
151 self.len
152 }
153
154 pub fn is_empty(&self) -> bool {
156 self.len == 0
157 }
158}
159
/// Turns a [`Column`] into an encoding-specific byte buffer.
///
/// Implementors must be `Send + Sync`.
pub trait Encoder: Send + Sync {
    /// Encodes `data` into bytes.
    ///
    /// The encoders defined in this file write `null_bitmap` into the output when one is
    /// supplied, and return `ColumnarError::InvalidFormat` for column types they do not support.
    fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>>;
    /// Returns the [`EncodingV2`] tag identifying the format this encoder produces.
    fn encoding_type(&self) -> EncodingV2;
}
167
/// Reconstructs a [`Column`] (and its optional bitmap) from encoded bytes.
///
/// Implementors must be `Send + Sync`.
pub trait Decoder: Send + Sync {
    /// Decodes `data` into a column of `logical_type`.
    ///
    /// Returns the decoded column together with the bitmap embedded in `data`, if any.
    /// The decoders defined in this file take the value count from the encoded header and
    /// do not read `num_values`.
    fn decode(
        &self,
        data: &[u8],
        num_values: usize,
        logical_type: LogicalType,
    ) -> Result<(Column, Option<Bitmap>)>;
}
178
179pub struct DeltaEncoder;
188
189impl Encoder for DeltaEncoder {
190 fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
191 let values = match data {
192 Column::Int64(v) => v,
193 _ => {
194 return Err(ColumnarError::InvalidFormat(
195 "delta encoding requires Int64".into(),
196 ))
197 }
198 };
199
200 if values.is_empty() {
201 let mut buf = Vec::with_capacity(8);
202 buf.extend_from_slice(&0u32.to_le_bytes()); buf.extend_from_slice(&0u8.to_le_bytes()); return Ok(buf);
205 }
206
207 let mut buf = Vec::new();
208 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
209
210 if let Some(bitmap) = null_bitmap {
212 buf.push(1u8);
213 buf.extend_from_slice(&bitmap.to_bytes());
214 } else {
215 buf.push(0u8);
216 }
217
218 buf.extend_from_slice(&values[0].to_le_bytes());
220
221 for window in values.windows(2) {
223 let delta = window[1].wrapping_sub(window[0]);
224 let zigzag = zigzag_encode(delta);
225 encode_varint(zigzag, &mut buf);
226 }
227
228 Ok(buf)
229 }
230
231 fn encoding_type(&self) -> EncodingV2 {
232 EncodingV2::Delta
233 }
234}
235
236pub struct DeltaDecoder;
238
239impl Decoder for DeltaDecoder {
240 fn decode(
241 &self,
242 data: &[u8],
243 _num_values: usize,
244 logical_type: LogicalType,
245 ) -> Result<(Column, Option<Bitmap>)> {
246 if logical_type != LogicalType::Int64 {
247 return Err(ColumnarError::InvalidFormat(
248 "delta decoding requires Int64".into(),
249 ));
250 }
251
252 if data.len() < 5 {
253 return Err(ColumnarError::InvalidFormat(
254 "delta header too short".into(),
255 ));
256 }
257
258 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
259 let has_bitmap = data[4] != 0;
260 let mut pos = 5;
261
262 let bitmap = if has_bitmap {
263 let bm = Bitmap::from_bytes(&data[pos..])?;
264 pos += 4 + bm.len().div_ceil(8);
265 Some(bm)
266 } else {
267 None
268 };
269
270 if count == 0 {
271 return Ok((Column::Int64(vec![]), bitmap));
272 }
273
274 if pos + 8 > data.len() {
275 return Err(ColumnarError::InvalidFormat(
276 "delta first value truncated".into(),
277 ));
278 }
279
280 let first = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
281 pos += 8;
282
283 let mut values = Vec::with_capacity(count);
284 values.push(first);
285
286 let mut current = first;
287 for _ in 1..count {
288 let (zigzag, bytes_read) = decode_varint(&data[pos..])?;
289 pos += bytes_read;
290 let delta = zigzag_decode(zigzag);
291 current = current.wrapping_add(delta);
292 values.push(current);
293 }
294
295 Ok((Column::Int64(values), bitmap))
296 }
297}
298
299pub struct ForEncoder;
308
309impl Encoder for ForEncoder {
310 fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
311 let values = match data {
312 Column::Int64(v) => v,
313 _ => {
314 return Err(ColumnarError::InvalidFormat(
315 "FOR encoding requires Int64".into(),
316 ))
317 }
318 };
319
320 if values.is_empty() {
321 let mut buf = Vec::with_capacity(5);
322 buf.extend_from_slice(&0u32.to_le_bytes());
323 buf.push(0u8); return Ok(buf);
325 }
326
327 let min_val = *values.iter().min().unwrap();
328 let max_val = *values.iter().max().unwrap();
329 let range = (max_val - min_val) as u64;
330
331 let bits_needed = if range == 0 {
333 1
334 } else {
335 64 - range.leading_zeros()
336 } as u8;
337
338 let mut buf = Vec::new();
339 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
340
341 if let Some(bitmap) = null_bitmap {
343 buf.push(1u8);
344 buf.extend_from_slice(&bitmap.to_bytes());
345 } else {
346 buf.push(0u8);
347 }
348
349 buf.extend_from_slice(&min_val.to_le_bytes());
351 buf.push(bits_needed);
352
353 let mut bit_buffer = 0u64;
355 let mut bits_in_buffer = 0u8;
356
357 for &v in values {
358 let offset = (v - min_val) as u64;
359 bit_buffer |= offset << bits_in_buffer;
360 bits_in_buffer += bits_needed;
361
362 while bits_in_buffer >= 8 {
363 buf.push(bit_buffer as u8);
364 bit_buffer >>= 8;
365 bits_in_buffer -= 8;
366 }
367 }
368
369 if bits_in_buffer > 0 {
371 buf.push(bit_buffer as u8);
372 }
373
374 Ok(buf)
375 }
376
377 fn encoding_type(&self) -> EncodingV2 {
378 EncodingV2::FOR
379 }
380}
381
382pub struct ForDecoder;
384
385impl Decoder for ForDecoder {
386 fn decode(
387 &self,
388 data: &[u8],
389 _num_values: usize,
390 logical_type: LogicalType,
391 ) -> Result<(Column, Option<Bitmap>)> {
392 if logical_type != LogicalType::Int64 {
393 return Err(ColumnarError::InvalidFormat(
394 "FOR decoding requires Int64".into(),
395 ));
396 }
397
398 if data.len() < 5 {
399 return Err(ColumnarError::InvalidFormat("FOR header too short".into()));
400 }
401
402 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
403 let has_bitmap = data[4] != 0;
404 let mut pos = 5;
405
406 let bitmap = if has_bitmap {
407 let bm = Bitmap::from_bytes(&data[pos..])?;
408 pos += 4 + bm.len().div_ceil(8);
409 Some(bm)
410 } else {
411 None
412 };
413
414 if count == 0 {
415 return Ok((Column::Int64(vec![]), bitmap));
416 }
417
418 if pos + 9 > data.len() {
419 return Err(ColumnarError::InvalidFormat(
420 "FOR reference truncated".into(),
421 ));
422 }
423
424 let reference = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
425 pos += 8;
426 let bits_per_value = data[pos];
427 pos += 1;
428
429 let mut values = Vec::with_capacity(count);
430 let mask = if bits_per_value >= 64 {
431 u64::MAX
432 } else {
433 (1u64 << bits_per_value) - 1
434 };
435
436 let mut bit_buffer = 0u64;
437 let mut bits_in_buffer = 0u8;
438 let mut byte_pos = pos;
439
440 for _ in 0..count {
441 while bits_in_buffer < bits_per_value && byte_pos < data.len() {
442 bit_buffer |= (data[byte_pos] as u64) << bits_in_buffer;
443 bits_in_buffer += 8;
444 byte_pos += 1;
445 }
446
447 if bits_in_buffer < bits_per_value {
449 return Err(ColumnarError::InvalidFormat("FOR data truncated".into()));
450 }
451
452 let offset = bit_buffer & mask;
453 bit_buffer >>= bits_per_value;
454 bits_in_buffer -= bits_per_value;
455
456 values.push(reference + offset as i64);
457 }
458
459 Ok((Column::Int64(values), bitmap))
460 }
461}
462
463pub struct PforEncoder {
472 pub percentile: f64,
474}
475
476impl Default for PforEncoder {
477 fn default() -> Self {
478 Self { percentile: 0.9 }
479 }
480}
481
482impl Encoder for PforEncoder {
483 fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
484 let values = match data {
485 Column::Int64(v) => v,
486 _ => {
487 return Err(ColumnarError::InvalidFormat(
488 "PFOR encoding requires Int64".into(),
489 ))
490 }
491 };
492
493 if values.is_empty() {
494 let mut buf = Vec::with_capacity(5);
495 buf.extend_from_slice(&0u32.to_le_bytes());
496 buf.push(0u8); return Ok(buf);
498 }
499
500 let min_val = *values.iter().min().unwrap();
501
502 let mut offsets: Vec<u64> = values.iter().map(|&v| (v - min_val) as u64).collect();
504 let mut sorted_offsets = offsets.clone();
505 sorted_offsets.sort_unstable();
506
507 let percentile_idx =
508 ((values.len() as f64 * self.percentile) as usize).min(values.len() - 1);
509 let percentile_max = sorted_offsets[percentile_idx];
510
511 let bits_needed = if percentile_max == 0 {
512 1
513 } else {
514 64 - percentile_max.leading_zeros()
515 } as u8;
516
517 let max_packed = if bits_needed >= 64 {
518 u64::MAX
519 } else {
520 (1u64 << bits_needed) - 1
521 };
522
523 let mut exceptions: Vec<(u32, u64)> = Vec::new();
525 for (i, offset) in offsets.iter_mut().enumerate() {
526 if *offset > max_packed {
527 exceptions.push((i as u32, *offset));
528 *offset = 0; }
530 }
531
532 let mut buf = Vec::new();
533 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
534
535 if let Some(bitmap) = null_bitmap {
537 buf.push(1u8);
538 buf.extend_from_slice(&bitmap.to_bytes());
539 } else {
540 buf.push(0u8);
541 }
542
543 buf.extend_from_slice(&min_val.to_le_bytes());
545 buf.push(bits_needed);
546 buf.extend_from_slice(&(exceptions.len() as u32).to_le_bytes());
547
548 let mut bit_buffer = 0u64;
550 let mut bits_in_buffer = 0u8;
551
552 for &offset in &offsets {
553 bit_buffer |= (offset & max_packed) << bits_in_buffer;
554 bits_in_buffer += bits_needed;
555
556 while bits_in_buffer >= 8 {
557 buf.push(bit_buffer as u8);
558 bit_buffer >>= 8;
559 bits_in_buffer -= 8;
560 }
561 }
562
563 if bits_in_buffer > 0 {
564 buf.push(bit_buffer as u8);
565 }
566
567 for (idx, val) in exceptions {
569 buf.extend_from_slice(&idx.to_le_bytes());
570 buf.extend_from_slice(&val.to_le_bytes());
571 }
572
573 Ok(buf)
574 }
575
576 fn encoding_type(&self) -> EncodingV2 {
577 EncodingV2::PFOR
578 }
579}
580
581pub struct PforDecoder;
583
584impl Decoder for PforDecoder {
585 fn decode(
586 &self,
587 data: &[u8],
588 _num_values: usize,
589 logical_type: LogicalType,
590 ) -> Result<(Column, Option<Bitmap>)> {
591 if logical_type != LogicalType::Int64 {
592 return Err(ColumnarError::InvalidFormat(
593 "PFOR decoding requires Int64".into(),
594 ));
595 }
596
597 if data.len() < 5 {
598 return Err(ColumnarError::InvalidFormat("PFOR header too short".into()));
599 }
600
601 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
602 let has_bitmap = data[4] != 0;
603 let mut pos = 5;
604
605 let bitmap = if has_bitmap {
606 let bm = Bitmap::from_bytes(&data[pos..])?;
607 pos += 4 + bm.len().div_ceil(8);
608 Some(bm)
609 } else {
610 None
611 };
612
613 if count == 0 {
614 return Ok((Column::Int64(vec![]), bitmap));
615 }
616
617 if pos + 13 > data.len() {
618 return Err(ColumnarError::InvalidFormat("PFOR header truncated".into()));
619 }
620
621 let reference = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
622 pos += 8;
623 let bits_per_value = data[pos];
624 pos += 1;
625 let exception_count = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
626 pos += 4;
627
628 let mut values = Vec::with_capacity(count);
630 let mask = if bits_per_value >= 64 {
631 u64::MAX
632 } else {
633 (1u64 << bits_per_value) - 1
634 };
635
636 let mut bit_buffer = 0u64;
637 let mut bits_in_buffer = 0u8;
638
639 for _ in 0..count {
640 while bits_in_buffer < bits_per_value && pos < data.len() {
641 bit_buffer |= (data[pos] as u64) << bits_in_buffer;
642 bits_in_buffer += 8;
643 pos += 1;
644 }
645
646 if bits_in_buffer < bits_per_value {
648 return Err(ColumnarError::InvalidFormat("PFOR data truncated".into()));
649 }
650
651 let offset = bit_buffer & mask;
652 bit_buffer >>= bits_per_value;
653 bits_in_buffer -= bits_per_value;
654
655 values.push(reference + offset as i64);
656 }
657
658 if bits_in_buffer > 0 && bits_in_buffer < 8 {
660 }
662
663 for _ in 0..exception_count {
665 if pos + 12 > data.len() {
666 return Err(ColumnarError::InvalidFormat(
667 "PFOR exception truncated".into(),
668 ));
669 }
670 let idx = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
671 pos += 4;
672 let val = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
673 pos += 8;
674
675 if idx < values.len() {
676 values[idx] = reference + val as i64;
677 }
678 }
679
680 Ok((Column::Int64(values), bitmap))
681 }
682}
683
/// High bit of the header byte; the encoder sets it and the decoder reads it to select
/// the "v1" per-stream layout.
const BYTE_STREAM_SPLIT_V1_FLAG: u8 = 0x80;
/// Mask extracting the bytes-per-value field from the header byte.
const BYTE_STREAM_SPLIT_HEADER_MASK: u8 = 0x7F;
/// Maximum number of values the encoder transposes per pass over the input.
const BYTE_STREAM_SPLIT_BLOCK_SIZE: usize = 256;
/// High bit of the stream-count byte; set when a sign bitmap follows that byte.
const BYTE_STREAM_SPLIT_SIGN_FLAG: u8 = 0x80;
/// Mask extracting the stream count from the stream-count byte.
const BYTE_STREAM_SPLIT_STREAM_COUNT_MASK: u8 = 0x7F;
/// Per-stream flag: payload is stored uncompressed.
const BYTE_STREAM_SPLIT_FLAG_RAW: u8 = 0;
/// Per-stream flag: payload is LZ4-compressed (decoding needs the `compression-lz4` feature).
const BYTE_STREAM_SPLIT_FLAG_LZ4: u8 = 1;
/// Per-stream flag: payload is zstd-compressed (decoding needs the `compression-zstd` feature).
const BYTE_STREAM_SPLIT_FLAG_ZSTD: u8 = 2;
699
700pub struct ByteStreamSplitEncoder;
705
706impl Encoder for ByteStreamSplitEncoder {
707 fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
708 let mut sign_bitmap: Option<Bitmap> = None;
709 let (bytes_per_value, raw_bytes): (usize, Vec<u8>) = match data {
710 Column::Float32(values) => {
711 let mut sign_bits = Bitmap::new(values.len());
712 let bytes: Vec<u8> = values
713 .iter()
714 .enumerate()
715 .flat_map(|(idx, v)| {
716 let mut bits = v.to_bits();
717 if bits & 0x8000_0000 != 0 {
718 sign_bits.set(idx, true);
719 bits &= 0x7fff_ffff;
720 }
721 bits.to_le_bytes()
722 })
723 .collect();
724 sign_bitmap = Some(sign_bits);
725 (4, bytes)
726 }
727 Column::Float64(values) => {
728 let mut sign_bits = Bitmap::new(values.len());
729 let bytes: Vec<u8> = values
730 .iter()
731 .enumerate()
732 .flat_map(|(idx, v)| {
733 let mut bits = v.to_bits();
734 if bits & 0x8000_0000_0000_0000 != 0 {
735 sign_bits.set(idx, true);
736 bits &= 0x7fff_ffff_ffff_ffff;
737 }
738 bits.to_le_bytes()
739 })
740 .collect();
741 sign_bitmap = Some(sign_bits);
742 (8, bytes)
743 }
744 Column::Int64(values) => {
745 let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
746 (8, bytes)
747 }
748 _ => {
749 return Err(ColumnarError::InvalidFormat(
750 "ByteStreamSplit requires Float32, Float64, or Int64".into(),
751 ))
752 }
753 };
754
755 let num_values = raw_bytes.len() / bytes_per_value;
756
757 let mut buf = Vec::new();
758 buf.extend_from_slice(&(num_values as u32).to_le_bytes());
759 let header = (bytes_per_value as u8) | BYTE_STREAM_SPLIT_V1_FLAG;
760 buf.push(header);
761
762 if let Some(bitmap) = null_bitmap {
764 buf.push(1u8);
765 buf.extend_from_slice(&bitmap.to_bytes());
766 } else {
767 buf.push(0u8);
768 }
769
770 let _expected_size = num_values * bytes_per_value;
771 let mut streams: Vec<Vec<u8>> = Vec::with_capacity(bytes_per_value);
772
773 let mut offset = 0;
776 for _ in 0..bytes_per_value {
777 streams.push(Vec::with_capacity(num_values));
778 }
779 while offset < num_values {
780 let block_len = (num_values - offset).min(BYTE_STREAM_SPLIT_BLOCK_SIZE);
781 for (stream_idx, byte_idx) in (0..bytes_per_value).rev().enumerate() {
782 let start = offset * bytes_per_value + byte_idx;
783 let stream = &mut streams[stream_idx];
784 for value_idx in 0..block_len {
785 stream.push(raw_bytes[start + value_idx * bytes_per_value]);
786 }
787 }
788 offset += block_len;
789 }
790
791 let mut stream_count_byte = bytes_per_value as u8;
793 if sign_bitmap.is_some() {
794 stream_count_byte |= BYTE_STREAM_SPLIT_SIGN_FLAG;
795 }
796 buf.push(stream_count_byte); if let Some(sign) = sign_bitmap {
800 buf.extend_from_slice(&sign.to_bytes());
801 }
802 for stream in streams {
803 let original_len = stream.len() as u32;
804 buf.push(BYTE_STREAM_SPLIT_FLAG_RAW);
805 buf.extend_from_slice(&original_len.to_le_bytes());
806 buf.extend_from_slice(&original_len.to_le_bytes());
807 buf.extend_from_slice(&stream);
808 }
809 buf[4] = header;
810 Ok(buf)
811 }
812
813 fn encoding_type(&self) -> EncodingV2 {
814 EncodingV2::ByteStreamSplit
815 }
816}
817
818pub struct ByteStreamSplitDecoder;
820
821impl Decoder for ByteStreamSplitDecoder {
822 fn decode(
823 &self,
824 data: &[u8],
825 _num_values: usize,
826 logical_type: LogicalType,
827 ) -> Result<(Column, Option<Bitmap>)> {
828 if data.len() < 6 {
829 return Err(ColumnarError::InvalidFormat(
830 "ByteStreamSplit header too short".into(),
831 ));
832 }
833
834 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
835 let header = data[4];
836 let bytes_per_value = (header & BYTE_STREAM_SPLIT_HEADER_MASK) as usize;
837 let has_bitmap = data[5] != 0;
838 let use_v1_layout = (header & BYTE_STREAM_SPLIT_V1_FLAG) != 0;
839 let mut pos = 6;
840
841 if bytes_per_value == 0 {
842 return Err(ColumnarError::InvalidFormat(
843 "ByteStreamSplit bytes_per_value cannot be zero".into(),
844 ));
845 }
846
847 let bitmap = if has_bitmap {
848 let bm = Bitmap::from_bytes(&data[pos..])?;
849 pos += 4 + bm.len().div_ceil(8);
850 Some(bm)
851 } else {
852 None
853 };
854
855 if count == 0 {
856 return match logical_type {
857 LogicalType::Float32 => Ok((Column::Float32(vec![]), bitmap)),
858 LogicalType::Float64 => Ok((Column::Float64(vec![]), bitmap)),
859 LogicalType::Int64 => Ok((Column::Int64(vec![]), bitmap)),
860 _ => Err(ColumnarError::InvalidFormat(
861 "ByteStreamSplit logical type mismatch".into(),
862 )),
863 };
864 }
865
866 let expected_size = count * bytes_per_value;
867 let mut raw_bytes = vec![0u8; expected_size];
868
869 if use_v1_layout {
870 if pos >= data.len() {
871 return Err(ColumnarError::InvalidFormat(
872 "ByteStreamSplit stream header truncated".into(),
873 ));
874 }
875
876 let stream_count_byte = data[pos];
877 pos += 1;
878 let has_sign_bitmap = (stream_count_byte & BYTE_STREAM_SPLIT_SIGN_FLAG) != 0;
879 let stream_count = (stream_count_byte & BYTE_STREAM_SPLIT_STREAM_COUNT_MASK) as usize;
880 if stream_count != bytes_per_value {
881 return Err(ColumnarError::InvalidFormat(
882 "ByteStreamSplit stream count mismatch".into(),
883 ));
884 }
885
886 let mut sign_bitmap = if has_sign_bitmap {
887 let bm = Bitmap::from_bytes(&data[pos..])?;
888 pos += 4 + bm.len().div_ceil(8);
889 Some(bm)
890 } else {
891 None
892 };
893 if !matches!(logical_type, LogicalType::Float32 | LogicalType::Float64) {
894 sign_bitmap = None;
895 }
896
897 let raw_ptr = raw_bytes.as_mut_ptr();
898 for stream_idx in 0..stream_count {
899 if pos + 9 > data.len() {
900 return Err(ColumnarError::InvalidFormat(
901 "ByteStreamSplit stream header truncated".into(),
902 ));
903 }
904 let flag = data[pos];
905 let orig_len =
906 u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
907 let payload_len =
908 u32::from_le_bytes(data[pos + 5..pos + 9].try_into().unwrap()) as usize;
909 pos += 9;
910
911 if orig_len != count {
912 return Err(ColumnarError::InvalidFormat(
913 "ByteStreamSplit stream length mismatch".into(),
914 ));
915 }
916 if pos + payload_len > data.len() {
917 return Err(ColumnarError::InvalidFormat(
918 "ByteStreamSplit stream payload truncated".into(),
919 ));
920 }
921
922 let payload = &data[pos..pos + payload_len];
923 pos += payload_len;
924
925 let stream_buf: Option<Vec<u8>> = match flag {
926 BYTE_STREAM_SPLIT_FLAG_LZ4 => {
927 #[cfg(feature = "compression-lz4")]
928 {
929 let orig_len_i32: i32 = orig_len.try_into().map_err(|_| {
930 ColumnarError::InvalidFormat(
931 "ByteStreamSplit stream length too large".into(),
932 )
933 })?;
934 Some(lz4::block::decompress(payload, Some(orig_len_i32)).map_err(
935 |_| {
936 ColumnarError::InvalidFormat(
937 "ByteStreamSplit stream decompress failed".into(),
938 )
939 },
940 )?)
941 }
942 #[cfg(not(feature = "compression-lz4"))]
943 {
944 return Err(ColumnarError::InvalidFormat(
945 "ByteStreamSplit compressed stream requires compression-lz4".into(),
946 ));
947 }
948 }
949 BYTE_STREAM_SPLIT_FLAG_ZSTD => {
950 #[cfg(feature = "compression-zstd")]
951 {
952 Some(zstd::bulk::decompress(payload, orig_len).map_err(|_| {
953 ColumnarError::InvalidFormat(
954 "ByteStreamSplit stream decompress failed".into(),
955 )
956 })?)
957 }
958 #[cfg(not(feature = "compression-zstd"))]
959 {
960 return Err(ColumnarError::InvalidFormat(
961 "ByteStreamSplit zstd stream requires compression-zstd".into(),
962 ));
963 }
964 }
965 _ => {
966 None
968 }
969 };
970
971 let stream = match stream_buf.as_deref() {
972 Some(buf) => buf,
973 None => payload,
974 };
975
976 if stream.len() != count {
977 return Err(ColumnarError::InvalidFormat(
978 "ByteStreamSplit stream length invalid".into(),
979 ));
980 }
981
982 let byte_idx = bytes_per_value - 1 - stream_idx;
983 if stream_idx == 0 {
984 if let Some(sign_bitmap) = sign_bitmap.as_ref() {
985 let mut dst_index = byte_idx;
986 for (value_idx, &byte) in stream.iter().enumerate() {
987 let mut out = byte;
988 if sign_bitmap.get(value_idx) {
989 out |= 0x80;
990 }
991 unsafe {
993 *raw_ptr.add(dst_index) = out;
994 }
995 dst_index += bytes_per_value;
996 }
997 continue;
998 }
999 }
1000
1001 let mut dst_index = byte_idx;
1002 for &byte in stream {
1003 unsafe {
1005 *raw_ptr.add(dst_index) = byte;
1006 }
1007 dst_index += bytes_per_value;
1008 }
1009 }
1010 } else {
1011 if pos + expected_size > data.len() {
1012 return Err(ColumnarError::InvalidFormat(
1013 "ByteStreamSplit data truncated".into(),
1014 ));
1015 }
1016
1017 let raw_ptr = raw_bytes.as_mut_ptr();
1018 let data_ptr = data.as_ptr();
1019 for byte_idx in 0..bytes_per_value {
1020 let dst_index = byte_idx;
1021 let src_index = pos + byte_idx * count;
1022 unsafe {
1024 let mut dst_ptr = raw_ptr.add(dst_index);
1025 let mut src_ptr = data_ptr.add(src_index);
1026 for _ in 0..count {
1027 *dst_ptr = *src_ptr;
1028 dst_ptr = dst_ptr.add(bytes_per_value);
1029 src_ptr = src_ptr.add(1);
1030 }
1031 }
1032 }
1033 }
1034
1035 match logical_type {
1036 LogicalType::Float32 => {
1037 if bytes_per_value != 4 {
1038 return Err(ColumnarError::InvalidFormat(
1039 "ByteStreamSplit Float32 length mismatch".into(),
1040 ));
1041 }
1042 if cfg!(target_endian = "little") {
1043 let mut values = vec![0f32; count];
1044 unsafe {
1046 std::ptr::copy_nonoverlapping(
1047 raw_bytes.as_ptr(),
1048 values.as_mut_ptr() as *mut u8,
1049 raw_bytes.len(),
1050 );
1051 }
1052 Ok((Column::Float32(values), bitmap))
1053 } else {
1054 let values: Vec<f32> = raw_bytes
1055 .chunks_exact(4)
1056 .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
1057 .collect();
1058 Ok((Column::Float32(values), bitmap))
1059 }
1060 }
1061 LogicalType::Float64 => {
1062 if bytes_per_value != 8 {
1063 return Err(ColumnarError::InvalidFormat(
1064 "ByteStreamSplit Float64 length mismatch".into(),
1065 ));
1066 }
1067 if cfg!(target_endian = "little") {
1068 let mut values = vec![0f64; count];
1069 unsafe {
1071 std::ptr::copy_nonoverlapping(
1072 raw_bytes.as_ptr(),
1073 values.as_mut_ptr() as *mut u8,
1074 raw_bytes.len(),
1075 );
1076 }
1077 Ok((Column::Float64(values), bitmap))
1078 } else {
1079 let values: Vec<f64> = raw_bytes
1080 .chunks_exact(8)
1081 .map(|chunk| f64::from_le_bytes(chunk.try_into().unwrap()))
1082 .collect();
1083 Ok((Column::Float64(values), bitmap))
1084 }
1085 }
1086 LogicalType::Int64 => {
1087 if bytes_per_value != 8 {
1088 return Err(ColumnarError::InvalidFormat(
1089 "ByteStreamSplit Int64 length mismatch".into(),
1090 ));
1091 }
1092 if cfg!(target_endian = "little") {
1093 let mut values = vec![0i64; count];
1094 unsafe {
1096 std::ptr::copy_nonoverlapping(
1097 raw_bytes.as_ptr(),
1098 values.as_mut_ptr() as *mut u8,
1099 raw_bytes.len(),
1100 );
1101 }
1102 Ok((Column::Int64(values), bitmap))
1103 } else {
1104 let values: Vec<i64> = raw_bytes
1105 .chunks_exact(8)
1106 .map(|chunk| i64::from_le_bytes(chunk.try_into().unwrap()))
1107 .collect();
1108 Ok((Column::Int64(values), bitmap))
1109 }
1110 }
1111 _ => Err(ColumnarError::InvalidFormat(
1112 "ByteStreamSplit requires Float32, Float64, or Int64".into(),
1113 )),
1114 }
1115 }
1116}
1117
1118pub struct IncrementalStringEncoder;
1127
1128impl Encoder for IncrementalStringEncoder {
1129 fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1130 let values = match data {
1131 Column::Binary(v) => v,
1132 _ => {
1133 return Err(ColumnarError::InvalidFormat(
1134 "IncrementalString encoding requires Binary".into(),
1135 ))
1136 }
1137 };
1138
1139 let mut buf = Vec::new();
1140 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1141
1142 if let Some(bitmap) = null_bitmap {
1144 buf.push(1u8);
1145 buf.extend_from_slice(&bitmap.to_bytes());
1146 } else {
1147 buf.push(0u8);
1148 }
1149
1150 if values.is_empty() {
1151 return Ok(buf);
1152 }
1153
1154 buf.extend_from_slice(&(values[0].len() as u32).to_le_bytes());
1156 buf.extend_from_slice(&values[0]);
1157
1158 for window in values.windows(2) {
1160 let prev = &window[0];
1161 let curr = &window[1];
1162
1163 let common_prefix = prev
1164 .iter()
1165 .zip(curr.iter())
1166 .take_while(|(a, b)| a == b)
1167 .count();
1168
1169 let suffix = &curr[common_prefix..];
1170
1171 buf.extend_from_slice(&(common_prefix as u16).to_le_bytes());
1172 buf.extend_from_slice(&(suffix.len() as u16).to_le_bytes());
1173 buf.extend_from_slice(suffix);
1174 }
1175
1176 Ok(buf)
1177 }
1178
1179 fn encoding_type(&self) -> EncodingV2 {
1180 EncodingV2::IncrementalString
1181 }
1182}
1183
1184pub struct IncrementalStringDecoder;
1186
1187impl Decoder for IncrementalStringDecoder {
1188 fn decode(
1189 &self,
1190 data: &[u8],
1191 _num_values: usize,
1192 logical_type: LogicalType,
1193 ) -> Result<(Column, Option<Bitmap>)> {
1194 if logical_type != LogicalType::Binary {
1195 return Err(ColumnarError::InvalidFormat(
1196 "IncrementalString decoding requires Binary".into(),
1197 ));
1198 }
1199
1200 if data.len() < 5 {
1201 return Err(ColumnarError::InvalidFormat(
1202 "IncrementalString header too short".into(),
1203 ));
1204 }
1205
1206 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1207 let has_bitmap = data[4] != 0;
1208 let mut pos = 5;
1209
1210 let bitmap = if has_bitmap {
1211 let bm = Bitmap::from_bytes(&data[pos..])?;
1212 pos += 4 + bm.len().div_ceil(8);
1213 Some(bm)
1214 } else {
1215 None
1216 };
1217
1218 if count == 0 {
1219 return Ok((Column::Binary(vec![]), bitmap));
1220 }
1221
1222 let mut values = Vec::with_capacity(count);
1223
1224 if pos + 4 > data.len() {
1226 return Err(ColumnarError::InvalidFormat(
1227 "IncrementalString first len truncated".into(),
1228 ));
1229 }
1230 let first_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1231 pos += 4;
1232
1233 if pos + first_len > data.len() {
1234 return Err(ColumnarError::InvalidFormat(
1235 "IncrementalString first value truncated".into(),
1236 ));
1237 }
1238 let mut current = data[pos..pos + first_len].to_vec();
1239 pos += first_len;
1240 values.push(current.clone());
1241
1242 for _ in 1..count {
1244 if pos + 4 > data.len() {
1245 return Err(ColumnarError::InvalidFormat(
1246 "IncrementalString header truncated".into(),
1247 ));
1248 }
1249 let prefix_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1250 pos += 2;
1251 let suffix_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1252 pos += 2;
1253
1254 if pos + suffix_len > data.len() {
1255 return Err(ColumnarError::InvalidFormat(
1256 "IncrementalString suffix truncated".into(),
1257 ));
1258 }
1259
1260 current.truncate(prefix_len);
1261 current.extend_from_slice(&data[pos..pos + suffix_len]);
1262 pos += suffix_len;
1263
1264 values.push(current.clone());
1265 }
1266
1267 Ok((Column::Binary(values), bitmap))
1268 }
1269}
1270
1271pub struct RleEncoder;
1282
1283impl Encoder for RleEncoder {
1284 fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1285 match data {
1286 Column::Bool(values) => encode_rle_bool(values, null_bitmap),
1287 Column::Int64(values) => encode_rle_int64(values, null_bitmap),
1288 _ => Err(ColumnarError::InvalidFormat(
1289 "RLE encoding requires Bool or Int64".into(),
1290 )),
1291 }
1292 }
1293
1294 fn encoding_type(&self) -> EncodingV2 {
1295 EncodingV2::Rle
1296 }
1297}
1298
1299fn encode_rle_bool(values: &[bool], null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1300 let mut buf = Vec::new();
1301 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1302
1303 if let Some(bitmap) = null_bitmap {
1305 buf.push(1u8);
1306 buf.extend_from_slice(&bitmap.to_bytes());
1307 } else {
1308 buf.push(0u8);
1309 }
1310
1311 if values.is_empty() {
1312 buf.extend_from_slice(&0u32.to_le_bytes()); return Ok(buf);
1314 }
1315
1316 let mut runs: Vec<(bool, u32)> = Vec::new();
1318 let mut current_value = values[0];
1319 let mut current_run_len = 1u32;
1320
1321 for &v in values.iter().skip(1) {
1322 if v == current_value {
1323 current_run_len += 1;
1324 } else {
1325 runs.push((current_value, current_run_len));
1326 current_value = v;
1327 current_run_len = 1;
1328 }
1329 }
1330 runs.push((current_value, current_run_len));
1331
1332 buf.extend_from_slice(&(runs.len() as u32).to_le_bytes());
1334
1335 for (value, run_len) in runs {
1337 buf.push(value as u8);
1338 buf.extend_from_slice(&run_len.to_le_bytes());
1339 }
1340
1341 Ok(buf)
1342}
1343
1344fn encode_rle_int64(values: &[i64], null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1345 let mut buf = Vec::new();
1346 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1347
1348 if let Some(bitmap) = null_bitmap {
1350 buf.push(1u8);
1351 buf.extend_from_slice(&bitmap.to_bytes());
1352 } else {
1353 buf.push(0u8);
1354 }
1355
1356 if values.is_empty() {
1357 buf.extend_from_slice(&0u32.to_le_bytes()); return Ok(buf);
1359 }
1360
1361 let mut runs: Vec<(i64, u32)> = Vec::new();
1363 let mut current_value = values[0];
1364 let mut current_run_len = 1u32;
1365
1366 for &v in values.iter().skip(1) {
1367 if v == current_value {
1368 current_run_len += 1;
1369 } else {
1370 runs.push((current_value, current_run_len));
1371 current_value = v;
1372 current_run_len = 1;
1373 }
1374 }
1375 runs.push((current_value, current_run_len));
1376
1377 buf.extend_from_slice(&(runs.len() as u32).to_le_bytes());
1379
1380 for (value, run_len) in runs {
1382 buf.extend_from_slice(&value.to_le_bytes());
1383 buf.extend_from_slice(&run_len.to_le_bytes());
1384 }
1385
1386 Ok(buf)
1387}
1388
1389pub struct RleDecoder;
1391
1392impl Decoder for RleDecoder {
1393 fn decode(
1394 &self,
1395 data: &[u8],
1396 _num_values: usize,
1397 logical_type: LogicalType,
1398 ) -> Result<(Column, Option<Bitmap>)> {
1399 match logical_type {
1400 LogicalType::Bool => decode_rle_bool(data),
1401 LogicalType::Int64 => decode_rle_int64(data),
1402 _ => Err(ColumnarError::InvalidFormat(
1403 "RLE decoding requires Bool or Int64".into(),
1404 )),
1405 }
1406 }
1407}
1408
1409fn decode_rle_bool(data: &[u8]) -> Result<(Column, Option<Bitmap>)> {
1410 if data.len() < 5 {
1411 return Err(ColumnarError::InvalidFormat("RLE header too short".into()));
1412 }
1413
1414 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1415 let has_bitmap = data[4] != 0;
1416 let mut pos = 5;
1417
1418 let bitmap = if has_bitmap {
1419 let bm = Bitmap::from_bytes(&data[pos..])?;
1420 pos += 4 + bm.len().div_ceil(8);
1421 Some(bm)
1422 } else {
1423 None
1424 };
1425
1426 if pos + 4 > data.len() {
1427 return Err(ColumnarError::InvalidFormat(
1428 "RLE run_count truncated".into(),
1429 ));
1430 }
1431
1432 let run_count = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1433 pos += 4;
1434
1435 if count == 0 {
1436 return Ok((Column::Bool(vec![]), bitmap));
1437 }
1438
1439 let mut values = Vec::with_capacity(count);
1440
1441 for _ in 0..run_count {
1442 if pos + 5 > data.len() {
1443 return Err(ColumnarError::InvalidFormat(
1444 "RLE bool run truncated".into(),
1445 ));
1446 }
1447 let value = data[pos] != 0;
1448 pos += 1;
1449 let run_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1450 pos += 4;
1451
1452 for _ in 0..run_len {
1453 values.push(value);
1454 }
1455 }
1456
1457 if values.len() != count {
1458 return Err(ColumnarError::InvalidFormat("RLE count mismatch".into()));
1459 }
1460
1461 Ok((Column::Bool(values), bitmap))
1462}
1463
1464fn decode_rle_int64(data: &[u8]) -> Result<(Column, Option<Bitmap>)> {
1465 if data.len() < 5 {
1466 return Err(ColumnarError::InvalidFormat("RLE header too short".into()));
1467 }
1468
1469 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1470 let has_bitmap = data[4] != 0;
1471 let mut pos = 5;
1472
1473 let bitmap = if has_bitmap {
1474 let bm = Bitmap::from_bytes(&data[pos..])?;
1475 pos += 4 + bm.len().div_ceil(8);
1476 Some(bm)
1477 } else {
1478 None
1479 };
1480
1481 if pos + 4 > data.len() {
1482 return Err(ColumnarError::InvalidFormat(
1483 "RLE run_count truncated".into(),
1484 ));
1485 }
1486
1487 let run_count = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1488 pos += 4;
1489
1490 if count == 0 {
1491 return Ok((Column::Int64(vec![]), bitmap));
1492 }
1493
1494 let mut values = Vec::with_capacity(count);
1495
1496 for _ in 0..run_count {
1497 if pos + 12 > data.len() {
1498 return Err(ColumnarError::InvalidFormat(
1499 "RLE int64 run truncated".into(),
1500 ));
1501 }
1502 let value = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
1503 pos += 8;
1504 let run_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1505 pos += 4;
1506
1507 for _ in 0..run_len {
1508 values.push(value);
1509 }
1510 }
1511
1512 if values.len() != count {
1513 return Err(ColumnarError::InvalidFormat("RLE count mismatch".into()));
1514 }
1515
1516 Ok((Column::Int64(values), bitmap))
1517}
1518
1519pub struct DictionaryEncoder;
1530
1531impl Encoder for DictionaryEncoder {
1532 fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1533 match data {
1534 Column::Binary(values) => encode_dict_binary(values, null_bitmap),
1535 Column::Fixed { len, values } => encode_dict_fixed(*len, values, null_bitmap),
1536 _ => Err(ColumnarError::InvalidFormat(
1537 "Dictionary encoding requires Binary or Fixed".into(),
1538 )),
1539 }
1540 }
1541
1542 fn encoding_type(&self) -> EncodingV2 {
1543 EncodingV2::Dictionary
1544 }
1545}
1546
1547fn encode_dict_binary(values: &[Vec<u8>], null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1548 use std::collections::HashMap;
1549
1550 let mut buf = Vec::new();
1551 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1552
1553 if let Some(bitmap) = null_bitmap {
1555 buf.push(1u8);
1556 buf.extend_from_slice(&bitmap.to_bytes());
1557 } else {
1558 buf.push(0u8);
1559 }
1560
1561 if values.is_empty() {
1562 buf.extend_from_slice(&0u32.to_le_bytes()); return Ok(buf);
1564 }
1565
1566 let mut dict: Vec<&Vec<u8>> = Vec::new();
1568 let mut dict_map: HashMap<&Vec<u8>, u32> = HashMap::new();
1569 let mut indices: Vec<u32> = Vec::with_capacity(values.len());
1570
1571 for v in values {
1572 if let Some(&idx) = dict_map.get(v) {
1573 indices.push(idx);
1574 } else {
1575 let idx = dict.len() as u32;
1576 dict.push(v);
1577 dict_map.insert(v, idx);
1578 indices.push(idx);
1579 }
1580 }
1581
1582 buf.extend_from_slice(&(dict.len() as u32).to_le_bytes());
1584
1585 for entry in &dict {
1587 buf.extend_from_slice(&(entry.len() as u32).to_le_bytes());
1588 buf.extend_from_slice(entry);
1589 }
1590
1591 for idx in indices {
1593 buf.extend_from_slice(&idx.to_le_bytes());
1594 }
1595
1596 Ok(buf)
1597}
1598
1599fn encode_dict_fixed(
1600 fixed_len: usize,
1601 values: &[Vec<u8>],
1602 null_bitmap: Option<&Bitmap>,
1603) -> Result<Vec<u8>> {
1604 use std::collections::HashMap;
1605
1606 let mut buf = Vec::new();
1607 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1608
1609 if let Some(bitmap) = null_bitmap {
1611 buf.push(1u8);
1612 buf.extend_from_slice(&bitmap.to_bytes());
1613 } else {
1614 buf.push(0u8);
1615 }
1616
1617 buf.extend_from_slice(&(fixed_len as u16).to_le_bytes());
1619
1620 if values.is_empty() {
1621 buf.extend_from_slice(&0u32.to_le_bytes()); return Ok(buf);
1623 }
1624
1625 let mut dict: Vec<&Vec<u8>> = Vec::new();
1627 let mut dict_map: HashMap<&Vec<u8>, u32> = HashMap::new();
1628 let mut indices: Vec<u32> = Vec::with_capacity(values.len());
1629
1630 for v in values {
1631 if let Some(&idx) = dict_map.get(v) {
1632 indices.push(idx);
1633 } else {
1634 let idx = dict.len() as u32;
1635 dict.push(v);
1636 dict_map.insert(v, idx);
1637 indices.push(idx);
1638 }
1639 }
1640
1641 buf.extend_from_slice(&(dict.len() as u32).to_le_bytes());
1643
1644 for entry in &dict {
1646 buf.extend_from_slice(entry);
1647 }
1648
1649 for idx in indices {
1651 buf.extend_from_slice(&idx.to_le_bytes());
1652 }
1653
1654 Ok(buf)
1655}
1656
1657pub struct DictionaryDecoder;
1659
1660impl Decoder for DictionaryDecoder {
1661 fn decode(
1662 &self,
1663 data: &[u8],
1664 _num_values: usize,
1665 logical_type: LogicalType,
1666 ) -> Result<(Column, Option<Bitmap>)> {
1667 match logical_type {
1668 LogicalType::Binary => decode_dict_binary(data),
1669 LogicalType::Fixed(fixed_len) => decode_dict_fixed(data, fixed_len as usize),
1670 _ => Err(ColumnarError::InvalidFormat(
1671 "Dictionary decoding requires Binary or Fixed".into(),
1672 )),
1673 }
1674 }
1675}
1676
1677fn decode_dict_binary(data: &[u8]) -> Result<(Column, Option<Bitmap>)> {
1678 if data.len() < 5 {
1679 return Err(ColumnarError::InvalidFormat(
1680 "Dictionary header too short".into(),
1681 ));
1682 }
1683
1684 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1685 let has_bitmap = data[4] != 0;
1686 let mut pos = 5;
1687
1688 let bitmap = if has_bitmap {
1689 let bm = Bitmap::from_bytes(&data[pos..])?;
1690 pos += 4 + bm.len().div_ceil(8);
1691 Some(bm)
1692 } else {
1693 None
1694 };
1695
1696 if pos + 4 > data.len() {
1697 return Err(ColumnarError::InvalidFormat(
1698 "Dictionary dict_count truncated".into(),
1699 ));
1700 }
1701
1702 let dict_count = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1703 pos += 4;
1704
1705 if count == 0 {
1706 return Ok((Column::Binary(vec![]), bitmap));
1707 }
1708
1709 let mut dict: Vec<Vec<u8>> = Vec::with_capacity(dict_count);
1711 for _ in 0..dict_count {
1712 if pos + 4 > data.len() {
1713 return Err(ColumnarError::InvalidFormat(
1714 "Dictionary entry len truncated".into(),
1715 ));
1716 }
1717 let entry_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1718 pos += 4;
1719 if pos + entry_len > data.len() {
1720 return Err(ColumnarError::InvalidFormat(
1721 "Dictionary entry data truncated".into(),
1722 ));
1723 }
1724 dict.push(data[pos..pos + entry_len].to_vec());
1725 pos += entry_len;
1726 }
1727
1728 let mut values = Vec::with_capacity(count);
1730 for _ in 0..count {
1731 if pos + 4 > data.len() {
1732 return Err(ColumnarError::InvalidFormat(
1733 "Dictionary index truncated".into(),
1734 ));
1735 }
1736 let idx = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1737 pos += 4;
1738 if idx >= dict.len() {
1739 return Err(ColumnarError::InvalidFormat(
1740 "Dictionary index out of range".into(),
1741 ));
1742 }
1743 values.push(dict[idx].clone());
1744 }
1745
1746 Ok((Column::Binary(values), bitmap))
1747}
1748
1749fn decode_dict_fixed(data: &[u8], expected_len: usize) -> Result<(Column, Option<Bitmap>)> {
1750 if data.len() < 5 {
1751 return Err(ColumnarError::InvalidFormat(
1752 "Dictionary header too short".into(),
1753 ));
1754 }
1755
1756 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1757 let has_bitmap = data[4] != 0;
1758 let mut pos = 5;
1759
1760 let bitmap = if has_bitmap {
1761 let bm = Bitmap::from_bytes(&data[pos..])?;
1762 pos += 4 + bm.len().div_ceil(8);
1763 Some(bm)
1764 } else {
1765 None
1766 };
1767
1768 if pos + 2 > data.len() {
1769 return Err(ColumnarError::InvalidFormat(
1770 "Dictionary fixed_len truncated".into(),
1771 ));
1772 }
1773 let fixed_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1774 pos += 2;
1775
1776 if fixed_len != expected_len {
1777 return Err(ColumnarError::InvalidFormat(
1778 "Dictionary fixed length mismatch".into(),
1779 ));
1780 }
1781
1782 if pos + 4 > data.len() {
1783 return Err(ColumnarError::InvalidFormat(
1784 "Dictionary dict_count truncated".into(),
1785 ));
1786 }
1787
1788 let dict_count = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1789 pos += 4;
1790
1791 if count == 0 {
1792 return Ok((
1793 Column::Fixed {
1794 len: fixed_len,
1795 values: vec![],
1796 },
1797 bitmap,
1798 ));
1799 }
1800
1801 let mut dict: Vec<Vec<u8>> = Vec::with_capacity(dict_count);
1803 for _ in 0..dict_count {
1804 if pos + fixed_len > data.len() {
1805 return Err(ColumnarError::InvalidFormat(
1806 "Dictionary fixed entry truncated".into(),
1807 ));
1808 }
1809 dict.push(data[pos..pos + fixed_len].to_vec());
1810 pos += fixed_len;
1811 }
1812
1813 let mut values = Vec::with_capacity(count);
1815 for _ in 0..count {
1816 if pos + 4 > data.len() {
1817 return Err(ColumnarError::InvalidFormat(
1818 "Dictionary index truncated".into(),
1819 ));
1820 }
1821 let idx = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1822 pos += 4;
1823 if idx >= dict.len() {
1824 return Err(ColumnarError::InvalidFormat(
1825 "Dictionary index out of range".into(),
1826 ));
1827 }
1828 values.push(dict[idx].clone());
1829 }
1830
1831 Ok((
1832 Column::Fixed {
1833 len: fixed_len,
1834 values,
1835 },
1836 bitmap,
1837 ))
1838}
1839
1840pub struct BitpackEncoder;
1850
1851impl Encoder for BitpackEncoder {
1852 fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1853 let values = match data {
1854 Column::Bool(v) => v,
1855 _ => {
1856 return Err(ColumnarError::InvalidFormat(
1857 "Bitpack encoding requires Bool".into(),
1858 ))
1859 }
1860 };
1861
1862 let mut buf = Vec::new();
1863 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1864
1865 if let Some(bitmap) = null_bitmap {
1867 buf.push(1u8);
1868 buf.extend_from_slice(&bitmap.to_bytes());
1869 } else {
1870 buf.push(0u8);
1871 }
1872
1873 if values.is_empty() {
1874 return Ok(buf);
1875 }
1876
1877 let num_bytes = values.len().div_ceil(8);
1879 let mut packed = vec![0u8; num_bytes];
1880
1881 for (i, &v) in values.iter().enumerate() {
1882 if v {
1883 packed[i / 8] |= 1 << (i % 8);
1884 }
1885 }
1886
1887 buf.extend_from_slice(&packed);
1888
1889 Ok(buf)
1890 }
1891
1892 fn encoding_type(&self) -> EncodingV2 {
1893 EncodingV2::Bitpack
1894 }
1895}
1896
1897pub struct BitpackDecoder;
1899
1900impl Decoder for BitpackDecoder {
1901 fn decode(
1902 &self,
1903 data: &[u8],
1904 _num_values: usize,
1905 logical_type: LogicalType,
1906 ) -> Result<(Column, Option<Bitmap>)> {
1907 if logical_type != LogicalType::Bool {
1908 return Err(ColumnarError::InvalidFormat(
1909 "Bitpack decoding requires Bool".into(),
1910 ));
1911 }
1912
1913 if data.len() < 5 {
1914 return Err(ColumnarError::InvalidFormat(
1915 "Bitpack header too short".into(),
1916 ));
1917 }
1918
1919 let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1920 let has_bitmap = data[4] != 0;
1921 let mut pos = 5;
1922
1923 let bitmap = if has_bitmap {
1924 let bm = Bitmap::from_bytes(&data[pos..])?;
1925 pos += 4 + bm.len().div_ceil(8);
1926 Some(bm)
1927 } else {
1928 None
1929 };
1930
1931 if count == 0 {
1932 return Ok((Column::Bool(vec![]), bitmap));
1933 }
1934
1935 let num_bytes = count.div_ceil(8);
1936 if pos + num_bytes > data.len() {
1937 return Err(ColumnarError::InvalidFormat(
1938 "Bitpack data truncated".into(),
1939 ));
1940 }
1941
1942 let packed = &data[pos..pos + num_bytes];
1943
1944 let mut values = Vec::with_capacity(count);
1946 for i in 0..count {
1947 let value = (packed[i / 8] & (1 << (i % 8))) != 0;
1948 values.push(value);
1949 }
1950
1951 Ok((Column::Bool(values), bitmap))
1952 }
1953}
1954
/// Column statistics consulted by [`select_encoding`] when choosing an encoding.
///
/// The default value (all zero / `false` / `None`) carries no information and makes
/// the selectors fall back to `Plain` (or `Bitpack` for `Bool`, `ByteStreamSplit`
/// for floats).
#[derive(Default)]
pub struct EncodingHints {
    /// Whether the column is sorted. Sorted `Int64` columns select `Delta`; sorted,
    /// non-empty `Binary`/`Fixed` columns select `IncrementalString`.
    pub is_sorted: bool,
    /// Number of distinct values in the column. Used with `total_count` to estimate
    /// run length and dictionary cardinality.
    pub distinct_count: usize,
    /// Total number of values in the column; `0` disables the count-based heuristics.
    pub total_count: usize,
    /// Spread of the integer values, when known (presumably `max - min`; confirm
    /// against the code that fills these hints). Ranges that fit in 16 bits select
    /// `FOR` or `PFOR` for `Int64` columns.
    pub value_range: Option<u64>,
    /// Ratio used to choose between `FOR` and `PFOR`: a value in `[0.9, 1.0)` selects
    /// `PFOR`. Presumably the fraction of values inside the main range; confirm
    /// against the code that fills these hints.
    pub in_range_ratio: Option<f64>,
}
1973
1974pub fn select_encoding(logical_type: LogicalType, hints: &EncodingHints) -> EncodingV2 {
1979 match logical_type {
1980 LogicalType::Int64 => select_int_encoding(hints),
1981 LogicalType::Float32 => EncodingV2::ByteStreamSplit,
1982 LogicalType::Float64 => EncodingV2::ByteStreamSplit,
1983 LogicalType::Bool => select_bool_encoding(hints),
1984 LogicalType::Binary => select_binary_encoding(hints),
1985 LogicalType::Fixed(_) => select_binary_encoding(hints),
1986 }
1987}
1988
1989fn select_bool_encoding(hints: &EncodingHints) -> EncodingV2 {
1990 if hints.total_count > 0 && hints.distinct_count <= 2 {
1992 let avg_run_len = hints.total_count / hints.distinct_count.max(1);
1995 if avg_run_len >= 4 {
1996 return EncodingV2::Rle;
1997 }
1998 }
1999
2000 EncodingV2::Bitpack
2002}
2003
2004fn select_int_encoding(hints: &EncodingHints) -> EncodingV2 {
2005 if hints.is_sorted {
2007 return EncodingV2::Delta;
2008 }
2009
2010 if let Some(range) = hints.value_range {
2012 let bits_needed = if range == 0 {
2013 1
2014 } else {
2015 64 - range.leading_zeros()
2016 };
2017 if bits_needed <= 16 {
2018 if let Some(ratio) = hints.in_range_ratio {
2020 if (0.9..1.0).contains(&ratio) {
2021 return EncodingV2::PFOR;
2022 }
2023 }
2024 return EncodingV2::FOR;
2025 }
2026 }
2027
2028 if hints.total_count > 0 && hints.distinct_count > 0 {
2030 let avg_run_len = hints.total_count / hints.distinct_count;
2031 if avg_run_len >= 4 {
2032 return EncodingV2::Rle;
2033 }
2034 }
2035
2036 EncodingV2::Plain
2037}
2038
2039fn select_binary_encoding(hints: &EncodingHints) -> EncodingV2 {
2040 if hints.is_sorted && hints.total_count > 0 {
2042 return EncodingV2::IncrementalString;
2043 }
2044
2045 if hints.total_count > 0 && hints.distinct_count > 0 {
2047 let cardinality_ratio = hints.distinct_count as f64 / hints.total_count as f64;
2048 if cardinality_ratio < 0.5 {
2049 return EncodingV2::Dictionary;
2050 }
2051 }
2052
2053 EncodingV2::Plain
2054}
2055
2056pub fn create_encoder(encoding: EncodingV2) -> Box<dyn Encoder> {
2058 match encoding {
2059 EncodingV2::Plain => Box::new(PlainEncoder),
2060 EncodingV2::Delta => Box::new(DeltaEncoder),
2061 EncodingV2::FOR => Box::new(ForEncoder),
2062 EncodingV2::PFOR => Box::new(PforEncoder::default()),
2063 EncodingV2::ByteStreamSplit => Box::new(ByteStreamSplitEncoder),
2064 EncodingV2::IncrementalString => Box::new(IncrementalStringEncoder),
2065 EncodingV2::Rle => Box::new(RleEncoder),
2066 EncodingV2::Dictionary => Box::new(DictionaryEncoder),
2067 EncodingV2::Bitpack => Box::new(BitpackEncoder),
2068 EncodingV2::DeltaLength => Box::new(PlainEncoder),
2070 }
2071}
2072
2073pub fn create_decoder(encoding: EncodingV2) -> Box<dyn Decoder> {
2075 match encoding {
2076 EncodingV2::Plain => Box::new(PlainDecoder),
2077 EncodingV2::Delta => Box::new(DeltaDecoder),
2078 EncodingV2::FOR => Box::new(ForDecoder),
2079 EncodingV2::PFOR => Box::new(PforDecoder),
2080 EncodingV2::ByteStreamSplit => Box::new(ByteStreamSplitDecoder),
2081 EncodingV2::IncrementalString => Box::new(IncrementalStringDecoder),
2082 EncodingV2::Rle => Box::new(RleDecoder),
2083 EncodingV2::Dictionary => Box::new(DictionaryDecoder),
2084 EncodingV2::Bitpack => Box::new(BitpackDecoder),
2085 EncodingV2::DeltaLength => Box::new(PlainDecoder),
2087 }
2088}
2089
2090pub struct PlainEncoder;
2096
2097impl Encoder for PlainEncoder {
2098 fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
2099 let mut buf = Vec::new();
2100
2101 if let Some(bitmap) = null_bitmap {
2103 buf.push(1u8);
2104 buf.extend_from_slice(&bitmap.to_bytes());
2105 } else {
2106 buf.push(0u8);
2107 }
2108
2109 match data {
2110 Column::Int64(values) => {
2111 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
2112 for v in values {
2113 buf.extend_from_slice(&v.to_le_bytes());
2114 }
2115 }
2116 Column::Float32(values) => {
2117 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
2118 for v in values {
2119 buf.extend_from_slice(&v.to_le_bytes());
2120 }
2121 }
2122 Column::Float64(values) => {
2123 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
2124 for v in values {
2125 buf.extend_from_slice(&v.to_le_bytes());
2126 }
2127 }
2128 Column::Bool(values) => {
2129 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
2130 for v in values {
2131 buf.push(*v as u8);
2132 }
2133 }
2134 Column::Binary(values) => {
2135 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
2136 for v in values {
2137 buf.extend_from_slice(&(v.len() as u32).to_le_bytes());
2138 buf.extend_from_slice(v);
2139 }
2140 }
2141 Column::Fixed { len, values } => {
2142 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
2143 buf.extend_from_slice(&(*len as u16).to_le_bytes());
2144 for v in values {
2145 buf.extend_from_slice(v);
2146 }
2147 }
2148 }
2149
2150 Ok(buf)
2151 }
2152
2153 fn encoding_type(&self) -> EncodingV2 {
2154 EncodingV2::Plain
2155 }
2156}
2157
2158pub struct PlainDecoder;
2160
2161impl Decoder for PlainDecoder {
2162 fn decode(
2163 &self,
2164 data: &[u8],
2165 _num_values: usize,
2166 logical_type: LogicalType,
2167 ) -> Result<(Column, Option<Bitmap>)> {
2168 if data.is_empty() {
2169 return Err(ColumnarError::InvalidFormat("plain data empty".into()));
2170 }
2171
2172 let has_bitmap = data[0] != 0;
2173 let mut pos = 1;
2174
2175 let bitmap = if has_bitmap {
2176 let bm = Bitmap::from_bytes(&data[pos..])?;
2177 pos += 4 + bm.len().div_ceil(8);
2178 Some(bm)
2179 } else {
2180 None
2181 };
2182
2183 if pos + 4 > data.len() {
2184 return Err(ColumnarError::InvalidFormat("plain count truncated".into()));
2185 }
2186
2187 let count = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
2188 pos += 4;
2189
2190 let column = match logical_type {
2191 LogicalType::Int64 => {
2192 let mut values = Vec::with_capacity(count);
2193 for _ in 0..count {
2194 if pos + 8 > data.len() {
2195 return Err(ColumnarError::InvalidFormat("plain int64 truncated".into()));
2196 }
2197 values.push(i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()));
2198 pos += 8;
2199 }
2200 Column::Int64(values)
2201 }
2202 LogicalType::Float32 => {
2203 let mut values = Vec::with_capacity(count);
2204 for _ in 0..count {
2205 if pos + 4 > data.len() {
2206 return Err(ColumnarError::InvalidFormat(
2207 "plain float32 truncated".into(),
2208 ));
2209 }
2210 values.push(f32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()));
2211 pos += 4;
2212 }
2213 Column::Float32(values)
2214 }
2215 LogicalType::Float64 => {
2216 let mut values = Vec::with_capacity(count);
2217 for _ in 0..count {
2218 if pos + 8 > data.len() {
2219 return Err(ColumnarError::InvalidFormat(
2220 "plain float64 truncated".into(),
2221 ));
2222 }
2223 values.push(f64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()));
2224 pos += 8;
2225 }
2226 Column::Float64(values)
2227 }
2228 LogicalType::Bool => {
2229 let mut values = Vec::with_capacity(count);
2230 for _ in 0..count {
2231 if pos >= data.len() {
2232 return Err(ColumnarError::InvalidFormat("plain bool truncated".into()));
2233 }
2234 values.push(data[pos] != 0);
2235 pos += 1;
2236 }
2237 Column::Bool(values)
2238 }
2239 LogicalType::Binary => {
2240 let mut values = Vec::with_capacity(count);
2241 for _ in 0..count {
2242 if pos + 4 > data.len() {
2243 return Err(ColumnarError::InvalidFormat(
2244 "plain binary len truncated".into(),
2245 ));
2246 }
2247 let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
2248 pos += 4;
2249 if pos + len > data.len() {
2250 return Err(ColumnarError::InvalidFormat(
2251 "plain binary data truncated".into(),
2252 ));
2253 }
2254 values.push(data[pos..pos + len].to_vec());
2255 pos += len;
2256 }
2257 Column::Binary(values)
2258 }
2259 LogicalType::Fixed(fixed_len) => {
2260 if pos + 2 > data.len() {
2261 return Err(ColumnarError::InvalidFormat(
2262 "plain fixed len truncated".into(),
2263 ));
2264 }
2265 let stored_len =
2266 u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
2267 pos += 2;
2268 if stored_len != fixed_len as usize {
2269 return Err(ColumnarError::InvalidFormat(
2270 "plain fixed length mismatch".into(),
2271 ));
2272 }
2273 let mut values = Vec::with_capacity(count);
2274 for _ in 0..count {
2275 if pos + stored_len > data.len() {
2276 return Err(ColumnarError::InvalidFormat(
2277 "plain fixed data truncated".into(),
2278 ));
2279 }
2280 values.push(data[pos..pos + stored_len].to_vec());
2281 pos += stored_len;
2282 }
2283 Column::Fixed {
2284 len: stored_len,
2285 values,
2286 }
2287 }
2288 };
2289
2290 Ok((column, bitmap))
2291 }
2292}
2293
/// Maps a signed integer to an unsigned one so that values of small magnitude get
/// small codes: 0 → 0, -1 → 1, 1 → 2, -2 → 3, ...
#[inline]
fn zigzag_encode(n: i64) -> u64 {
    // Arithmetic shift replicates the sign bit: all ones for negatives, zero otherwise.
    let sign_mask = (n >> 63) as u64;
    let doubled = (n as u64) << 1;
    doubled ^ sign_mask
}
2303
/// Inverse of the zigzag mapping: even codes decode to non-negative values, odd codes
/// to negative ones (0 → 0, 1 → -1, 2 → 1, 3 → -2, ...).
#[inline]
fn zigzag_decode(n: u64) -> i64 {
    let magnitude = (n >> 1) as i64;
    if n & 1 == 0 {
        magnitude
    } else {
        // XOR with all ones (i.e. -1) is a bitwise NOT.
        !magnitude
    }
}
2309
/// Appends `n` to `buf` as a variable-length integer: seven payload bits per byte,
/// least-significant group first, with the high bit set on every byte but the last.
fn encode_varint(mut n: u64, buf: &mut Vec<u8>) {
    loop {
        let low_bits = (n & 0x7F) as u8;
        n >>= 7;
        if n == 0 {
            // Final group: continuation bit stays clear.
            buf.push(low_bits);
            return;
        }
        buf.push(low_bits | 0x80);
    }
}
2318
2319fn decode_varint(data: &[u8]) -> Result<(u64, usize)> {
2321 let mut result = 0u64;
2322 let mut shift = 0;
2323 for (i, &byte) in data.iter().enumerate() {
2324 result |= ((byte & 0x7F) as u64) << shift;
2325 if byte & 0x80 == 0 {
2326 return Ok((result, i + 1));
2327 }
2328 shift += 7;
2329 if shift >= 64 {
2330 return Err(ColumnarError::InvalidFormat("varint overflow".into()));
2331 }
2332 }
2333 Err(ColumnarError::InvalidFormat("varint truncated".into()))
2334}
2335
2336#[cfg(all(test, not(target_arch = "wasm32")))]
2341mod tests {
2342 use super::*;
2343
2344 #[test]
2345 fn test_delta_encoding_sorted_integers() {
2346 let values = vec![100i64, 105, 110, 115, 120, 125];
2347 let col = Column::Int64(values.clone());
2348
2349 let encoder = DeltaEncoder;
2350 let encoded = encoder.encode(&col, None).unwrap();
2351
2352 let decoder = DeltaDecoder;
2353 let (decoded, bitmap) = decoder
2354 .decode(&encoded, values.len(), LogicalType::Int64)
2355 .unwrap();
2356
2357 assert!(bitmap.is_none());
2358 if let Column::Int64(decoded_values) = decoded {
2359 assert_eq!(decoded_values, values);
2360 } else {
2361 panic!("Expected Int64 column");
2362 }
2363 }
2364
2365 #[test]
2366 fn test_for_encoding_small_range() {
2367 let values = vec![1000i64, 1005, 1002, 1008, 1001, 1007];
2368 let col = Column::Int64(values.clone());
2369
2370 let encoder = ForEncoder;
2371 let encoded = encoder.encode(&col, None).unwrap();
2372
2373 let decoder = ForDecoder;
2374 let (decoded, bitmap) = decoder
2375 .decode(&encoded, values.len(), LogicalType::Int64)
2376 .unwrap();
2377
2378 assert!(bitmap.is_none());
2379 if let Column::Int64(decoded_values) = decoded {
2380 assert_eq!(decoded_values, values);
2381 } else {
2382 panic!("Expected Int64 column");
2383 }
2384 }
2385
2386 #[test]
2387 fn test_pfor_encoding_with_outliers() {
2388 let mut values = vec![10i64, 12, 11, 15, 13, 14, 10, 11];
2390 values.push(1000000); values.push(12);
2392 values.push(2000000); let col = Column::Int64(values.clone());
2394
2395 let encoder = PforEncoder::default();
2396 let encoded = encoder.encode(&col, None).unwrap();
2397
2398 let decoder = PforDecoder;
2399 let (decoded, bitmap) = decoder
2400 .decode(&encoded, values.len(), LogicalType::Int64)
2401 .unwrap();
2402
2403 assert!(bitmap.is_none());
2404 if let Column::Int64(decoded_values) = decoded {
2405 assert_eq!(decoded_values, values);
2406 } else {
2407 panic!("Expected Int64 column");
2408 }
2409 }
2410
2411 #[test]
2412 fn test_byte_stream_split_floats() {
2413 let values = vec![1.5f64, 2.7, std::f64::consts::PI, 4.0, 5.5];
2414 let col = Column::Float64(values.clone());
2415
2416 let encoder = ByteStreamSplitEncoder;
2417 let encoded = encoder.encode(&col, None).unwrap();
2418
2419 let decoder = ByteStreamSplitDecoder;
2420 let (decoded, bitmap) = decoder
2421 .decode(&encoded, values.len(), LogicalType::Float64)
2422 .unwrap();
2423
2424 assert!(bitmap.is_none());
2425 if let Column::Float64(decoded_values) = decoded {
2426 assert_eq!(decoded_values, values);
2427 } else {
2428 panic!("Expected Float64 column");
2429 }
2430 }
2431
2432 #[test]
2433 fn test_byte_stream_split_f32_roundtrip() {
2434 let values = vec![1.0f32, -0.5, 3.25, 0.0, std::f32::consts::PI];
2435 let col = Column::Float32(values.clone());
2436
2437 let encoder = ByteStreamSplitEncoder;
2438 let encoded = encoder.encode(&col, None).unwrap();
2439
2440 let decoder = ByteStreamSplitDecoder;
2441 let (decoded, bitmap) = decoder
2442 .decode(&encoded, values.len(), LogicalType::Float32)
2443 .unwrap();
2444
2445 assert!(bitmap.is_none());
2446 if let Column::Float32(decoded_values) = decoded {
2447 assert_eq!(decoded_values, values);
2448 } else {
2449 panic!("Expected Float32 column");
2450 }
2451 }
2452
2453 #[cfg(feature = "compression-zstd")]
2454 #[test]
2455 fn test_byte_stream_split_f32_compression_ratio() {
2456 use rand::{rngs::StdRng, Rng, SeedableRng};
2457 use std::time::Instant;
2458
2459 let mut rng = StdRng::seed_from_u64(42);
2460 let mut values = Vec::with_capacity(100_000);
2461 while values.len() < 100_000 {
2462 let u1: f32 = rng.gen::<f32>().max(f32::MIN_POSITIVE);
2464 let u2: f32 = rng.gen::<f32>();
2465 let r = (-2.0 * u1.ln()).sqrt();
2466 let theta = 2.0 * std::f32::consts::PI * u2;
2467 values.push(r * theta.cos());
2468 if values.len() < 100_000 {
2469 values.push(r * theta.sin());
2470 }
2471 }
2472
2473 let mut msb_counts = std::collections::HashMap::new();
2474 for v in &values {
2475 *msb_counts
2476 .entry((v.to_bits() >> 24) as u8)
2477 .or_insert(0usize) += 1;
2478 }
2479 let mut top = msb_counts.into_iter().collect::<Vec<_>>();
2480 top.sort_by(|a, b| b.1.cmp(&a.1));
2481 println!("Top MSB bytes: {:?}", &top[..5.min(top.len())]);
2482
2483 let col = Column::Float32(values.clone());
2484 let encoder = ByteStreamSplitEncoder;
2485 let t_enc_start = Instant::now();
2486 let encoded = encoder.encode(&col, None).unwrap();
2487 let enc_ms = t_enc_start.elapsed().as_secs_f64() * 1e3;
2488
2489 let _header = encoded[4];
2490 let payload_offset = 6;
2491 println!(
2492 "ByteStreamSplit payload length: {}",
2493 encoded.len().saturating_sub(payload_offset)
2494 );
2495
2496 let compressed = zstd::stream::encode_all(std::io::Cursor::new(&encoded), 3)
2497 .expect("zstd compression should succeed");
2498 let t_dec_start = Instant::now();
2499 let decoder = ByteStreamSplitDecoder;
2500 let (decoded, _) = decoder
2501 .decode(&encoded, values.len(), LogicalType::Float32)
2502 .unwrap();
2503 let dec_ms = t_dec_start.elapsed().as_secs_f64() * 1e3;
2504
2505 let raw_len = (values.len() * std::mem::size_of::<f32>()) as f32;
2506 let ratio = compressed.len() as f32 / raw_len;
2507
2508 assert_eq!(decoded, Column::Float32(values.clone()));
2510
2511 assert!(
2512 ratio < 0.86,
2513 "expected >=14% reduction, got ratio {:.3}",
2514 ratio
2515 );
2516 if std::env::var("ALOPEX_SKIP_PERF_TESTS").is_err() {
2518 assert!(
2519 enc_ms < 220.0,
2520 "encode too slow: {:.2}ms (target <220ms)",
2521 enc_ms
2522 );
2523 assert!(
2524 dec_ms < 30.0,
2525 "decode too slow: {:.2}ms (target <30ms)",
2526 dec_ms
2527 );
2528 }
2529 println!("encode_ms={:.2} decode_ms={:.2}", enc_ms, dec_ms);
2530 }
2531
2532 #[cfg(any(feature = "compression-lz4", feature = "compression-zstd"))]
2533 #[test]
2534 fn bench_byte_stream_split_layout_variants() {
2535 use rand::{rngs::StdRng, Rng, SeedableRng};
2536
2537 #[derive(Clone, Copy)]
2538 struct LayoutConfig {
2539 name: &'static str,
2540 block: Option<usize>,
2541 block_bitshuffle: Option<usize>,
2542 msb_first: bool,
2543 xor_delta: bool,
2544 delta_xor: bool,
2545 sign_split: bool,
2546 per_stream_lz4: bool,
2547 per_stream_zstd: bool,
2548 outer_zstd: bool,
2549 bitshuffle: bool,
2550 exponent_split: bool,
2551 exponent_rle: bool,
2552 exponent_delta: bool,
2553 fpc_predict: bool,
2554 }
2555
2556 struct VariantEncoded {
2557 sign_bytes: Vec<u8>,
2558 streams: Vec<Vec<u8>>,
2559 payload_concat: Vec<u8>, }
2561
2562 let mut rng = StdRng::seed_from_u64(42);
2563 let mut values = Vec::with_capacity(100_000);
2564 while values.len() < 100_000 {
2565 let u1: f32 = rng.gen::<f32>().max(f32::MIN_POSITIVE);
2566 let u2: f32 = rng.gen::<f32>();
2567 let r = (-2.0 * u1.ln()).sqrt();
2568 let theta = 2.0 * std::f32::consts::PI * u2;
2569 values.push(r * theta.cos());
2570 if values.len() < 100_000 {
2571 values.push(r * theta.sin());
2572 }
2573 }
2574
2575 let raw_len = (values.len() * std::mem::size_of::<f32>()) as f32;
2576
2577 let configs = [
2578 LayoutConfig {
2579 name: "legacy_lsb",
2580 block: None,
2581 block_bitshuffle: None,
2582 msb_first: false,
2583 xor_delta: false,
2584 delta_xor: false,
2585 sign_split: false,
2586 per_stream_lz4: false,
2587 per_stream_zstd: false,
2588 outer_zstd: false,
2589 bitshuffle: false,
2590 exponent_split: false,
2591 exponent_rle: false,
2592 exponent_delta: false,
2593 fpc_predict: false,
2594 },
2595 LayoutConfig {
2596 name: "msb_block_256",
2597 block: Some(256),
2598 block_bitshuffle: None,
2599 msb_first: true,
2600 xor_delta: false,
2601 delta_xor: false,
2602 sign_split: false,
2603 per_stream_lz4: false,
2604 per_stream_zstd: false,
2605 outer_zstd: false,
2606 bitshuffle: false,
2607 exponent_split: false,
2608 exponent_rle: false,
2609 exponent_delta: false,
2610 fpc_predict: false,
2611 },
2612 LayoutConfig {
2613 name: "msb_block_1024",
2614 block: Some(1024),
2615 block_bitshuffle: None,
2616 msb_first: true,
2617 xor_delta: false,
2618 delta_xor: false,
2619 sign_split: false,
2620 per_stream_lz4: false,
2621 per_stream_zstd: false,
2622 outer_zstd: false,
2623 bitshuffle: false,
2624 exponent_split: false,
2625 exponent_rle: false,
2626 exponent_delta: false,
2627 fpc_predict: false,
2628 },
2629 LayoutConfig {
2630 name: "msb_block_256_xor",
2631 block: Some(256),
2632 block_bitshuffle: None,
2633 msb_first: true,
2634 xor_delta: true,
2635 delta_xor: false,
2636 sign_split: false,
2637 per_stream_lz4: false,
2638 per_stream_zstd: false,
2639 outer_zstd: false,
2640 bitshuffle: false,
2641 exponent_split: false,
2642 exponent_rle: false,
2643 exponent_delta: false,
2644 fpc_predict: false,
2645 },
2646 LayoutConfig {
2647 name: "sign_split_msb_256_outer",
2648 block: Some(256),
2649 block_bitshuffle: None,
2650 msb_first: true,
2651 xor_delta: false,
2652 delta_xor: false,
2653 sign_split: true,
2654 per_stream_lz4: false,
2655 per_stream_zstd: false,
2656 outer_zstd: false,
2657 bitshuffle: false,
2658 exponent_split: false,
2659 exponent_rle: false,
2660 exponent_delta: false,
2661 fpc_predict: false,
2662 },
2663 LayoutConfig {
2664 name: "sign_split_msb_256_per_stream_lz4",
2665 block: Some(256),
2666 block_bitshuffle: None,
2667 msb_first: true,
2668 xor_delta: false,
2669 delta_xor: false,
2670 sign_split: true,
2671 per_stream_lz4: true,
2672 per_stream_zstd: false,
2673 outer_zstd: false,
2674 bitshuffle: false,
2675 exponent_split: false,
2676 exponent_rle: false,
2677 exponent_delta: false,
2678 fpc_predict: false,
2679 },
2680 LayoutConfig {
2681 name: "sign_split_msb_128_per_stream_lz4",
2682 block: Some(128),
2683 block_bitshuffle: None,
2684 msb_first: true,
2685 xor_delta: false,
2686 delta_xor: false,
2687 sign_split: true,
2688 per_stream_lz4: true,
2689 per_stream_zstd: false,
2690 outer_zstd: false,
2691 bitshuffle: false,
2692 exponent_split: false,
2693 exponent_rle: false,
2694 exponent_delta: false,
2695 fpc_predict: false,
2696 },
2697 LayoutConfig {
2698 name: "sign_split_msb_64_per_stream_lz4",
2699 block: Some(64),
2700 block_bitshuffle: None,
2701 msb_first: true,
2702 xor_delta: false,
2703 delta_xor: false,
2704 sign_split: true,
2705 per_stream_lz4: true,
2706 per_stream_zstd: false,
2707 outer_zstd: false,
2708 bitshuffle: false,
2709 exponent_split: false,
2710 exponent_rle: false,
2711 exponent_delta: false,
2712 fpc_predict: false,
2713 },
2714 LayoutConfig {
2715 name: "sign_split_msb_256_per_stream_lz4_delta",
2716 block: Some(256),
2717 block_bitshuffle: None,
2718 msb_first: true,
2719 xor_delta: false,
2720 delta_xor: true,
2721 sign_split: true,
2722 per_stream_lz4: true,
2723 per_stream_zstd: false,
2724 outer_zstd: false,
2725 bitshuffle: false,
2726 exponent_split: false,
2727 exponent_rle: false,
2728 exponent_delta: false,
2729 fpc_predict: false,
2730 },
2731 LayoutConfig {
2732 name: "sign_split_bitshuffle",
2733 block: None,
2734 block_bitshuffle: None,
2735 msb_first: false,
2736 xor_delta: false,
2737 delta_xor: false,
2738 sign_split: true,
2739 per_stream_lz4: true,
2740 per_stream_zstd: false,
2741 outer_zstd: false,
2742 bitshuffle: true,
2743 exponent_split: false,
2744 exponent_rle: false,
2745 exponent_delta: false,
2746 fpc_predict: false,
2747 },
2748 LayoutConfig {
2749 name: "sign_split_bitshuffle_per_stream_zstd",
2750 block: None,
2751 block_bitshuffle: None,
2752 msb_first: false,
2753 xor_delta: false,
2754 delta_xor: false,
2755 sign_split: true,
2756 per_stream_lz4: false,
2757 per_stream_zstd: true,
2758 outer_zstd: false,
2759 bitshuffle: true,
2760 exponent_split: false,
2761 exponent_rle: false,
2762 exponent_delta: false,
2763 fpc_predict: false,
2764 },
2765 LayoutConfig {
2766 name: "sign_split_bitshuffle_outer_only",
2767 block: None,
2768 block_bitshuffle: None,
2769 msb_first: false,
2770 xor_delta: false,
2771 delta_xor: false,
2772 sign_split: true,
2773 per_stream_lz4: false,
2774 per_stream_zstd: false,
2775 outer_zstd: false,
2776 bitshuffle: true,
2777 exponent_split: false,
2778 exponent_rle: false,
2779 exponent_delta: false,
2780 fpc_predict: false,
2781 },
2782 LayoutConfig {
2783 name: "sign_split_exp_split_msb_256",
2784 block: Some(256),
2785 block_bitshuffle: None,
2786 msb_first: true,
2787 xor_delta: false,
2788 delta_xor: false,
2789 sign_split: true,
2790 per_stream_lz4: true,
2791 per_stream_zstd: false,
2792 outer_zstd: false,
2793 bitshuffle: false,
2794 exponent_split: true,
2795 exponent_rle: false,
2796 exponent_delta: false,
2797 fpc_predict: false,
2798 },
2799 LayoutConfig {
2800 name: "sign_split_exp_split_msb_256_delta",
2801 block: Some(256),
2802 block_bitshuffle: None,
2803 msb_first: true,
2804 xor_delta: false,
2805 delta_xor: true,
2806 sign_split: true,
2807 per_stream_lz4: true,
2808 per_stream_zstd: false,
2809 outer_zstd: false,
2810 bitshuffle: false,
2811 exponent_split: true,
2812 exponent_rle: false,
2813 exponent_delta: true,
2814 fpc_predict: false,
2815 },
2816 LayoutConfig {
2817 name: "sign_split_exp_bitshuffle",
2818 block: None,
2819 block_bitshuffle: None,
2820 msb_first: false,
2821 xor_delta: false,
2822 delta_xor: false,
2823 sign_split: true,
2824 per_stream_lz4: true,
2825 per_stream_zstd: false,
2826 outer_zstd: false,
2827 bitshuffle: true,
2828 exponent_split: true,
2829 exponent_rle: false,
2830 exponent_delta: false,
2831 fpc_predict: false,
2832 },
2833 LayoutConfig {
2834 name: "sign_split_exp_bitshuffle_per_stream_zstd",
2835 block: None,
2836 block_bitshuffle: None,
2837 msb_first: false,
2838 xor_delta: false,
2839 delta_xor: false,
2840 sign_split: true,
2841 per_stream_lz4: false,
2842 per_stream_zstd: true,
2843 outer_zstd: false,
2844 bitshuffle: true,
2845 exponent_split: true,
2846 exponent_rle: false,
2847 exponent_delta: false,
2848 fpc_predict: false,
2849 },
2850 LayoutConfig {
2851 name: "sign_split_exp_bitshuffle_outer_zstd",
2852 block: None,
2853 block_bitshuffle: None,
2854 msb_first: false,
2855 xor_delta: false,
2856 delta_xor: false,
2857 sign_split: true,
2858 per_stream_lz4: false,
2859 per_stream_zstd: false,
2860 outer_zstd: true,
2861 bitshuffle: true,
2862 exponent_split: true,
2863 exponent_rle: false,
2864 exponent_delta: false,
2865 fpc_predict: false,
2866 },
2867 LayoutConfig {
2868 name: "sign_split_exp_rle_bitshuffle",
2869 block: None,
2870 block_bitshuffle: None,
2871 msb_first: false,
2872 xor_delta: false,
2873 delta_xor: false,
2874 sign_split: true,
2875 per_stream_lz4: true,
2876 per_stream_zstd: false,
2877 outer_zstd: false,
2878 bitshuffle: true,
2879 exponent_split: true,
2880 exponent_rle: true,
2881 exponent_delta: false,
2882 fpc_predict: false,
2883 },
2884 LayoutConfig {
2885 name: "sign_split_delta_msb_256",
2886 block: Some(256),
2887 block_bitshuffle: None,
2888 msb_first: true,
2889 xor_delta: false,
2890 delta_xor: true,
2891 sign_split: true,
2892 per_stream_lz4: true,
2893 per_stream_zstd: false,
2894 outer_zstd: false,
2895 bitshuffle: false,
2896 exponent_split: false,
2897 exponent_rle: false,
2898 exponent_delta: false,
2899 fpc_predict: false,
2900 },
2901 LayoutConfig {
2902 name: "sign_split_delta_bitshuffle",
2903 block: None,
2904 block_bitshuffle: None,
2905 msb_first: false,
2906 xor_delta: false,
2907 delta_xor: true,
2908 sign_split: true,
2909 per_stream_lz4: true,
2910 per_stream_zstd: false,
2911 outer_zstd: false,
2912 bitshuffle: true,
2913 exponent_split: false,
2914 exponent_rle: false,
2915 exponent_delta: false,
2916 fpc_predict: false,
2917 },
2918 LayoutConfig {
2919 name: "sign_split_delta_bitshuffle_per_stream_zstd",
2920 block: None,
2921 block_bitshuffle: None,
2922 msb_first: false,
2923 xor_delta: false,
2924 delta_xor: true,
2925 sign_split: true,
2926 per_stream_lz4: false,
2927 per_stream_zstd: true,
2928 outer_zstd: false,
2929 bitshuffle: true,
2930 exponent_split: false,
2931 exponent_rle: false,
2932 exponent_delta: false,
2933 fpc_predict: false,
2934 },
2935 LayoutConfig {
2936 name: "sign_split_exp_delta_bitshuffle",
2937 block: None,
2938 block_bitshuffle: None,
2939 msb_first: false,
2940 xor_delta: false,
2941 delta_xor: true,
2942 sign_split: true,
2943 per_stream_lz4: true,
2944 per_stream_zstd: false,
2945 outer_zstd: false,
2946 bitshuffle: true,
2947 exponent_split: true,
2948 exponent_rle: false,
2949 exponent_delta: true,
2950 fpc_predict: false,
2951 },
2952 LayoutConfig {
2953 name: "sign_split_exp_delta_bitshuffle_per_stream_zstd",
2954 block: None,
2955 block_bitshuffle: None,
2956 msb_first: false,
2957 xor_delta: false,
2958 delta_xor: true,
2959 sign_split: true,
2960 per_stream_lz4: false,
2961 per_stream_zstd: true,
2962 outer_zstd: false,
2963 bitshuffle: true,
2964 exponent_split: true,
2965 exponent_rle: false,
2966 exponent_delta: true,
2967 fpc_predict: false,
2968 },
2969 LayoutConfig {
2970 name: "sign_split_bitshuffle_block256",
2971 block: None,
2972 block_bitshuffle: Some(256),
2973 msb_first: false,
2974 xor_delta: false,
2975 delta_xor: false,
2976 sign_split: true,
2977 per_stream_lz4: true,
2978 per_stream_zstd: false,
2979 outer_zstd: false,
2980 bitshuffle: true,
2981 exponent_split: false,
2982 exponent_rle: false,
2983 exponent_delta: false,
2984 fpc_predict: false,
2985 },
2986 LayoutConfig {
2987 name: "sign_split_bitshuffle_block256_per_stream_zstd",
2988 block: None,
2989 block_bitshuffle: Some(256),
2990 msb_first: false,
2991 xor_delta: false,
2992 delta_xor: false,
2993 sign_split: true,
2994 per_stream_lz4: false,
2995 per_stream_zstd: true,
2996 outer_zstd: false,
2997 bitshuffle: true,
2998 exponent_split: false,
2999 exponent_rle: false,
3000 exponent_delta: false,
3001 fpc_predict: false,
3002 },
3003 LayoutConfig {
3004 name: "sign_split_bitshuffle_block1024",
3005 block: None,
3006 block_bitshuffle: Some(1024),
3007 msb_first: false,
3008 xor_delta: false,
3009 delta_xor: false,
3010 sign_split: true,
3011 per_stream_lz4: true,
3012 per_stream_zstd: false,
3013 outer_zstd: false,
3014 bitshuffle: true,
3015 exponent_split: false,
3016 exponent_rle: false,
3017 exponent_delta: false,
3018 fpc_predict: false,
3019 },
3020 LayoutConfig {
3021 name: "sign_split_exp_bitshuffle_block256",
3022 block: None,
3023 block_bitshuffle: Some(256),
3024 msb_first: false,
3025 xor_delta: false,
3026 delta_xor: false,
3027 sign_split: true,
3028 per_stream_lz4: true,
3029 per_stream_zstd: false,
3030 outer_zstd: false,
3031 bitshuffle: true,
3032 exponent_split: true,
3033 exponent_rle: false,
3034 exponent_delta: false,
3035 fpc_predict: false,
3036 },
3037 LayoutConfig {
3038 name: "sign_split_exp_bitshuffle_block256_per_stream_zstd",
3039 block: None,
3040 block_bitshuffle: Some(256),
3041 msb_first: false,
3042 xor_delta: false,
3043 delta_xor: false,
3044 sign_split: true,
3045 per_stream_lz4: false,
3046 per_stream_zstd: true,
3047 outer_zstd: false,
3048 bitshuffle: true,
3049 exponent_split: true,
3050 exponent_rle: false,
3051 exponent_delta: false,
3052 fpc_predict: false,
3053 },
3054 LayoutConfig {
3055 name: "sign_split_msb_256_per_stream_zstd",
3056 block: Some(256),
3057 block_bitshuffle: None,
3058 msb_first: true,
3059 xor_delta: false,
3060 delta_xor: false,
3061 sign_split: true,
3062 per_stream_lz4: false,
3063 per_stream_zstd: true,
3064 outer_zstd: false,
3065 bitshuffle: false,
3066 exponent_split: false,
3067 exponent_rle: false,
3068 exponent_delta: false,
3069 fpc_predict: false,
3070 },
3071 LayoutConfig {
3072 name: "sign_split_msb_256_outer_zstd",
3073 block: Some(256),
3074 block_bitshuffle: None,
3075 msb_first: true,
3076 xor_delta: false,
3077 delta_xor: false,
3078 sign_split: true,
3079 per_stream_lz4: false,
3080 per_stream_zstd: false,
3081 outer_zstd: true,
3082 bitshuffle: false,
3083 exponent_split: false,
3084 exponent_rle: false,
3085 exponent_delta: false,
3086 fpc_predict: false,
3087 },
3088 LayoutConfig {
3089 name: "sign_split_fpc_predict_per_stream_zstd",
3090 block: None,
3091 block_bitshuffle: None,
3092 msb_first: false,
3093 xor_delta: false,
3094 delta_xor: false,
3095 sign_split: true,
3096 per_stream_lz4: false,
3097 per_stream_zstd: true,
3098 outer_zstd: false,
3099 bitshuffle: false,
3100 exponent_split: false,
3101 exponent_rle: false,
3102 exponent_delta: false,
3103 fpc_predict: true,
3104 },
3105 ];
/// Wall-clock time accumulated while benchmarking a single layout variant.
///
/// Both counters start at zero (via `Default`) and are only ever added to.
#[derive(Default)]
struct Timings {
    /// Milliseconds spent encoding.
    encode_ms: f64,
    /// Milliseconds spent decoding.
    decode_ms: f64,
}
3111
/// Inflates an LZ4 block `payload` whose uncompressed size is `orig_len` bytes.
///
/// # Panics
///
/// Panics when `orig_len` does not fit in an `i32` or when the block cannot be
/// decompressed.
#[cfg(feature = "compression-lz4")]
#[allow(dead_code)]
fn decompress_lz4(payload: &[u8], orig_len: usize) -> Vec<u8> {
    // The lz4 block API takes the uncompressed size as an optional `i32`.
    let size_hint: Option<i32> = Some(orig_len.try_into().unwrap());
    let inflated = lz4::block::decompress(payload, size_hint);
    inflated.unwrap()
}
3118
/// Inflates a complete zstd frame held in `payload`.
///
/// # Panics
///
/// Panics when the payload cannot be decoded.
#[cfg(feature = "compression-zstd")]
#[allow(dead_code)]
fn decompress_zstd(payload: &[u8]) -> Vec<u8> {
    let reader = std::io::Cursor::new(payload);
    let inflated = zstd::stream::decode_all(reader);
    inflated.unwrap()
}
3124
/// Transposes `values` into 32 consecutive bit planes.
///
/// Plane `b` holds bit `b` of every value, packed LSB-first (value `i` lands in
/// bit `i % 8` of byte `i / 8` of the plane). Each plane is
/// `values.len().div_ceil(8)` bytes long, so the result is 32 times that size.
fn bitshuffle_u32(values: &[u32]) -> Vec<u8> {
    let plane_len = values.len().div_ceil(8);
    let mut planes = vec![0u8; plane_len * 32];
    if plane_len == 0 {
        // No values: nothing to transpose (and chunking by zero is not allowed).
        return planes;
    }
    for (bit, plane) in planes.chunks_exact_mut(plane_len).enumerate() {
        for (idx, &value) in values.iter().enumerate() {
            let is_set = (value >> bit) & 1 != 0;
            if is_set {
                plane[idx / 8] |= 1 << (idx % 8);
            }
        }
    }
    planes
}
3139
/// Bit-plane shuffles `values` independently for each run of `block` values.
///
/// Every block of up to `block` values is emitted as 32 bit planes (plane `b`
/// holds bit `b` of each value in the block, packed LSB-first), and the blocks
/// are concatenated in order. The final block may be shorter than `block`.
///
/// # Panics
///
/// Panics if `block` is zero while `values` is non-empty; a zero block size
/// could never make progress.
fn bitshuffle_block_u32(values: &[u32], block: usize) -> Vec<u8> {
    let mut out = Vec::with_capacity(values.len().div_ceil(8) * 32);
    if values.is_empty() {
        return out;
    }
    assert!(block > 0, "bitshuffle block size must be non-zero");

    for chunk in values.chunks(block) {
        // `chunk` is never empty, so each plane is at least one byte long.
        let bytes_per_plane = chunk.len().div_ceil(8);
        let base = out.len();
        out.resize(base + bytes_per_plane * 32, 0);
        for (bit, plane) in out[base..].chunks_exact_mut(bytes_per_plane).enumerate() {
            for (idx, &value) in chunk.iter().enumerate() {
                if (value >> bit) & 1 != 0 {
                    plane[idx / 8] |= 1 << (idx % 8);
                }
            }
        }
    }
    out
}
3160
/// Rebuilds `count` values from per-block bit planes.
///
/// `data` is read as consecutive blocks covering up to `block` values each;
/// every block consists of 32 bit planes packed LSB-first (plane `b` carries
/// bit `b` of each value in that block). Bytes beyond the last block are
/// ignored.
///
/// # Panics
///
/// Panics if `block` is zero while `count` is non-zero, or if `data` is too
/// short to hold all planes for `count` values.
fn bitunshuffle_block_u32(data: &[u8], count: usize, block: usize) -> Vec<u32> {
    let mut values = vec![0u32; count];
    if count == 0 {
        return values;
    }
    assert!(block > 0, "bitshuffle block size must be non-zero");

    let mut data_offset = 0;
    for chunk in values.chunks_mut(block) {
        // `chunk` is never empty, so each plane is at least one byte long.
        let bytes_per_plane = chunk.len().div_ceil(8);
        let block_bytes = bytes_per_plane * 32;
        assert!(
            data.len() >= data_offset + block_bytes,
            "bitshuffled data too short: need {} bytes, have {}",
            data_offset + block_bytes,
            data.len()
        );
        let planes = &data[data_offset..data_offset + block_bytes];
        for (bit, plane) in planes.chunks_exact(bytes_per_plane).enumerate() {
            for (idx, slot) in chunk.iter_mut().enumerate() {
                if (plane[idx / 8] >> (idx % 8)) & 1 != 0 {
                    *slot |= 1u32 << bit;
                }
            }
        }
        data_offset += block_bytes;
    }
    values
}
3181
/// Run-length encodes `values` as a sequence of 3-byte records.
///
/// Each record is the byte value followed by its run length as a little-endian
/// `u16`. Runs longer than `u16::MAX` are split into several records. An empty
/// input produces an empty output.
fn rle_encode_u8(values: &[u8]) -> Vec<u8> {
    fn emit(encoded: &mut Vec<u8>, value: u8, run_len: u16) {
        encoded.push(value);
        encoded.extend_from_slice(&run_len.to_le_bytes());
    }

    let mut encoded = Vec::new();
    let (mut current, rest) = match values.split_first() {
        Some((&first, rest)) => (first, rest),
        None => return encoded,
    };

    let mut run_len: u16 = 1;
    for &byte in rest {
        let extends_run = byte == current && run_len < u16::MAX;
        if extends_run {
            run_len += 1;
            continue;
        }
        emit(&mut encoded, current, run_len);
        current = byte;
        run_len = 1;
    }
    emit(&mut encoded, current, run_len);
    encoded
}
3203
/// Expands 3-byte run records (value, little-endian `u16` run length) back
/// into raw bytes.
///
/// Records are consumed until the input is exhausted or the output holds at
/// least `expected_len` bytes; the record that crosses that threshold is still
/// expanded in full, so the result may be longer than `expected_len`. Trailing
/// bytes that do not form a complete record are ignored.
fn rle_decode_u8(bytes: &[u8], expected_len: usize) -> Vec<u8> {
    let mut decoded = Vec::with_capacity(expected_len);
    for record in bytes.chunks_exact(3) {
        let value = record[0];
        let run_len = usize::from(u16::from_le_bytes([record[1], record[2]]));
        decoded.extend(std::iter::repeat(value).take(run_len));
        if decoded.len() >= expected_len {
            break;
        }
    }
    decoded
}
3220
/// Replaces every element with the XOR of itself and its predecessor, in place.
///
/// The first element is XORed with zero and therefore left unchanged.
fn delta_xor_u32(values: &mut [u32]) {
    // Walk backwards so each element still sees its predecessor's original value.
    for idx in (1..values.len()).rev() {
        values[idx] ^= values[idx - 1];
    }
}
3229
/// Undoes an XOR delta in place: every element becomes the XOR of all encoded
/// elements up to and including itself.
fn inv_delta_xor_u32(values: &mut [u32]) {
    // Walk forwards so each element is combined with its already-restored predecessor.
    for idx in 1..values.len() {
        values[idx] ^= values[idx - 1];
    }
}
3238
/// Replaces every byte with the XOR of itself and its predecessor, in place.
///
/// The first byte is XORed with zero and therefore left unchanged.
fn delta_xor_u8(values: &mut [u8]) {
    // Walk backwards so each byte still sees its predecessor's original value.
    for idx in (1..values.len()).rev() {
        values[idx] ^= values[idx - 1];
    }
}
3247
/// Undoes a byte-wise XOR delta in place: every byte becomes the XOR of all
/// encoded bytes up to and including itself.
fn inv_delta_xor_u8(values: &mut [u8]) {
    // Walk forwards so each byte is combined with its already-restored predecessor.
    for idx in 1..values.len() {
        values[idx] ^= values[idx - 1];
    }
}
3256
3257 fn encode_variant(
3258 values: &[f32],
3259 cfg: LayoutConfig,
3260 timings: &mut Timings,
3261 ) -> VariantEncoded {
3262 let start = std::time::Instant::now();
3263 let mut sign_bytes = if cfg.sign_split {
3264 vec![0u8; values.len().div_ceil(8)]
3265 } else {
3266 Vec::new()
3267 };
3268
3269 let bytes_per_value = 4;
3270 let num_values = values.len();
3271 let mut streams: Vec<Vec<u8>> = Vec::new();
3272
3273 if cfg.fpc_predict {
3274 let mut len_stream = Vec::with_capacity(num_values);
3276 let mut payload = Vec::with_capacity(num_values * 2); let mut prev1: u32 = 0;
3278 let mut prev2: u32 = 0;
3279 for (idx, v) in values.iter().enumerate() {
3280 let mut bits = v.to_bits();
3281 if cfg.sign_split {
3282 if bits & 0x8000_0000 != 0 {
3283 sign_bytes[idx / 8] |= 1 << (idx % 8);
3284 }
3285 bits &= 0x7fff_ffff;
3286 }
3287 let pred = prev1.wrapping_add(prev1.wrapping_sub(prev2));
3288 let diff = bits ^ pred;
3289 let lz_bytes = (diff.leading_zeros() / 8) as u8;
3290 let lz_clamped = lz_bytes.min(3); let sig_len = if diff == 0 {
3292 0
3293 } else {
3294 4 - lz_clamped as usize
3295 };
3296 len_stream.push(sig_len as u8);
3297 if sig_len > 0 {
3298 let be = diff.to_be_bytes();
3299 payload.extend_from_slice(&be[4 - sig_len..]);
3300 }
3301 prev2 = prev1;
3302 prev1 = bits;
3303 }
3304 streams.push(len_stream);
3305 streams.push(payload);
3306 } else if cfg.exponent_split {
3307 let mut exp_stream = Vec::with_capacity(num_values);
3309 let mut mant = Vec::with_capacity(num_values);
3310 for (idx, v) in values.iter().enumerate() {
3311 let mut bits = v.to_bits();
3312 if cfg.sign_split {
3313 if bits & 0x8000_0000 != 0 {
3314 sign_bytes[idx / 8] |= 1 << (idx % 8);
3315 }
3316 bits &= 0x7fff_ffff;
3317 }
3318 let exp = ((bits >> 23) & 0xFF) as u8;
3319 let mantissa = bits & 0x7F_FFFF;
3320 exp_stream.push(exp);
3321 mant.push(mantissa);
3322 }
3323 if cfg.exponent_delta {
3324 delta_xor_u8(&mut exp_stream);
3325 }
3326 if cfg.delta_xor {
3327 delta_xor_u32(&mut mant);
3328 }
3329 if cfg.bitshuffle {
3330 let mant_stream = if let Some(block) = cfg.block_bitshuffle {
3331 bitshuffle_block_u32(&mant, block)
3332 } else {
3333 bitshuffle_u32(&mant)
3334 };
3335 if cfg.exponent_rle {
3336 streams.push(rle_encode_u8(&exp_stream));
3337 } else {
3338 streams.push(exp_stream);
3339 }
3340 streams.push(mant_stream);
3341 } else {
3342 if cfg.exponent_rle {
3343 streams.push(rle_encode_u8(&exp_stream));
3344 } else {
3345 streams.push(exp_stream);
3346 }
3347 let mut mant_bytes = Vec::with_capacity(num_values * 3);
3348 for m in mant {
3349 let bytes = m.to_le_bytes();
3350 mant_bytes.extend_from_slice(&bytes[0..3]);
3351 }
3352 streams.push(mant_bytes);
3353 }
3354 } else if cfg.bitshuffle {
3355 let mut bits_vec = Vec::with_capacity(num_values);
3357 for (idx, v) in values.iter().enumerate() {
3358 let mut bits = v.to_bits();
3359 if cfg.sign_split {
3360 if bits & 0x8000_0000 != 0 {
3361 sign_bytes[idx / 8] |= 1 << (idx % 8);
3362 }
3363 bits &= 0x7fff_ffff;
3364 }
3365 bits_vec.push(bits);
3366 }
3367 if cfg.delta_xor {
3368 delta_xor_u32(&mut bits_vec);
3369 }
3370 let out = if let Some(block) = cfg.block_bitshuffle {
3371 bitshuffle_block_u32(&bits_vec, block)
3372 } else {
3373 bitshuffle_u32(&bits_vec)
3374 };
3375 streams.push(out);
3376 } else {
3377 let mut vals_u32 = Vec::with_capacity(num_values);
3378 for (idx, v) in values.iter().enumerate() {
3379 let mut bits = v.to_bits();
3380 if cfg.sign_split {
3381 if bits & 0x8000_0000 != 0 {
3382 sign_bytes[idx / 8] |= 1 << (idx % 8);
3383 }
3384 bits &= 0x7fff_ffff;
3385 }
3386 vals_u32.push(bits);
3387 }
3388 if cfg.delta_xor {
3389 delta_xor_u32(&mut vals_u32);
3390 }
3391 let mut raw_bytes = Vec::with_capacity(num_values * bytes_per_value);
3392 for bits in vals_u32 {
3393 raw_bytes.extend_from_slice(&bits.to_le_bytes());
3394 }
3395 streams = (0..bytes_per_value)
3396 .map(|_| Vec::with_capacity(num_values))
3397 .collect();
3398 let order: Vec<usize> = if cfg.exponent_split && bytes_per_value == 4 {
3399 vec![3, 2, 1, 0] } else if cfg.msb_first {
3401 (0..bytes_per_value).rev().collect()
3402 } else {
3403 (0..bytes_per_value).collect()
3404 };
3405
3406 let mut offset = 0;
3407 while offset < num_values {
3408 let block_len = cfg.block.unwrap_or(num_values).min(num_values - offset);
3409 for (stream_idx, byte_idx) in order.iter().enumerate() {
3410 let stream = &mut streams[stream_idx];
3411 let start = offset * bytes_per_value + byte_idx;
3412 for value_idx in 0..block_len {
3413 stream.push(raw_bytes[start + value_idx * bytes_per_value]);
3414 }
3415 }
3416 offset += block_len;
3417 }
3418
3419 if cfg.xor_delta {
3420 for stream in streams.iter_mut() {
3421 let mut prev = 0u8;
3422 for b in stream.iter_mut() {
3423 let cur = *b;
3424 *b ^= prev;
3425 prev = cur;
3426 }
3427 }
3428 }
3429 }
3430
3431 let mut payload_concat =
3432 Vec::with_capacity(sign_bytes.len() + num_values * bytes_per_value);
3433 payload_concat.extend_from_slice(&sign_bytes);
3434
3435 for stream in &streams {
3436 if cfg.per_stream_zstd {
3437 #[cfg(feature = "compression-zstd")]
3438 {
3439 let compressed = zstd::stream::encode_all(std::io::Cursor::new(stream), 9)
3440 .unwrap_or_else(|_| stream.clone());
3441 if compressed.len() < stream.len() {
3442 payload_concat.extend_from_slice(&compressed);
3443 continue;
3444 }
3445 }
3446 }
3447 if cfg.per_stream_lz4 {
3448 #[cfg(feature = "compression-lz4")]
3449 {
3450 if let Ok(compressed) = lz4::block::compress(
3451 stream,
3452 Some(lz4::block::CompressionMode::HIGHCOMPRESSION(12)),
3453 false,
3454 ) {
3455 if compressed.len() < stream.len() {
3456 payload_concat.extend_from_slice(&compressed);
3457 continue;
3458 }
3459 }
3460 }
3461 }
3462 payload_concat.extend_from_slice(stream);
3463 }
3464
3465 timings.encode_ms += start.elapsed().as_secs_f64() * 1e3;
3466 VariantEncoded {
3467 sign_bytes,
3468 streams,
3469 payload_concat,
3470 }
3471 }
3472
3473 fn decode_variant(
3474 encoded: &VariantEncoded,
3475 cfg: LayoutConfig,
3476 value_count: usize,
3477 timings: &mut Timings,
3478 ) -> Vec<f32> {
3479 let start = std::time::Instant::now();
3480 let bytes_per_value = 4;
3481
3482 if cfg.fpc_predict {
3483 let len_stream = &encoded.streams[0];
3484 let payload = &encoded.streams[1];
3485 let mut values = Vec::with_capacity(value_count);
3486 let mut prev1: u32 = 0;
3487 let mut prev2: u32 = 0;
3488 let mut payload_pos = 0;
3489 for (idx, &len_byte) in len_stream.iter().take(value_count).enumerate() {
3490 let sig_len = len_byte as usize;
3491 let mut diff: u32 = 0;
3492 if sig_len > 0 {
3493 let mut buf = [0u8; 4];
3494 for b in 0..sig_len {
3495 buf[4 - sig_len + b] = payload[payload_pos + b];
3496 }
3497 payload_pos += sig_len;
3498 diff = u32::from_be_bytes(buf);
3499 }
3500 let pred = prev1.wrapping_add(prev1.wrapping_sub(prev2));
3501 let mut bits = diff ^ pred;
3502 prev2 = prev1;
3503 prev1 = bits;
3504 if cfg.sign_split && (encoded.sign_bytes[idx / 8] >> (idx % 8)) & 1 != 0 {
3505 bits |= 0x8000_0000;
3506 }
3507 values.push(f32::from_bits(bits));
3508 }
3509 timings.decode_ms += start.elapsed().as_secs_f64() * 1e3;
3510 return values;
3511 } else if cfg.exponent_split {
3512 fn bitunshuffle_u32(data: &[u8], count: usize) -> Vec<u32> {
3513 let bytes_per_plane = count.div_ceil(8);
3514 let expected = bytes_per_plane * 32;
3515 assert_eq!(data.len(), expected);
3516 let mut values = vec![0u32; count];
3517 for bit in 0..32 {
3518 let base = bit * bytes_per_plane;
3519 for idx in 0..count {
3520 if (data[base + idx / 8] >> (idx % 8)) & 1 != 0 {
3521 values[idx] |= 1u32 << bit;
3522 }
3523 }
3524 }
3525 values
3526 }
3527
3528 let exp_stream_raw = &encoded.streams[0];
3529 let exp_stream = if cfg.exponent_rle {
3530 rle_decode_u8(exp_stream_raw, value_count)
3531 } else {
3532 exp_stream_raw.clone()
3533 };
3534 let mut exp_stream = exp_stream;
3535 if cfg.exponent_delta {
3536 inv_delta_xor_u8(&mut exp_stream);
3537 }
3538 if cfg.bitshuffle {
3539 let mant_stream = &encoded.streams[1];
3540 assert_eq!(exp_stream.len(), value_count);
3541 let mut mant_values = if let Some(block) = cfg.block_bitshuffle {
3542 bitunshuffle_block_u32(mant_stream, value_count, block)
3543 } else {
3544 bitunshuffle_u32(mant_stream, value_count)
3545 };
3546 if cfg.delta_xor {
3547 inv_delta_xor_u32(&mut mant_values);
3548 }
3549
3550 let mut values = Vec::with_capacity(value_count);
3551 for idx in 0..value_count {
3552 let mut bits = mant_values[idx] | ((exp_stream[idx] as u32) << 23);
3553 if cfg.sign_split && (encoded.sign_bytes[idx / 8] >> (idx % 8)) & 1 != 0 {
3554 bits |= 0x8000_0000;
3555 }
3556 values.push(f32::from_bits(bits));
3557 }
3558 timings.decode_ms += start.elapsed().as_secs_f64() * 1e3;
3559 return values;
3560 } else {
3561 let mant_stream = &encoded.streams[1];
3562 assert_eq!(mant_stream.len(), value_count * 3);
3563 let mut mant_values = Vec::with_capacity(value_count);
3564 for idx in 0..value_count {
3565 let start = idx * 3;
3566 let mut buf = [0u8; 4];
3567 buf[0..3].copy_from_slice(&mant_stream[start..start + 3]);
3568 mant_values.push(u32::from_le_bytes(buf));
3569 }
3570 if cfg.delta_xor {
3571 inv_delta_xor_u32(&mut mant_values);
3572 }
3573
3574 let mut values = Vec::with_capacity(value_count);
3575 for idx in 0..value_count {
3576 let mut bits = mant_values[idx] | ((exp_stream[idx] as u32) << 23);
3577 if cfg.sign_split && (encoded.sign_bytes[idx / 8] >> (idx % 8)) & 1 != 0 {
3578 bits |= 0x8000_0000;
3579 }
3580 values.push(f32::from_bits(bits));
3581 }
3582 timings.decode_ms += start.elapsed().as_secs_f64() * 1e3;
3583 return values;
3584 }
3585 } else if cfg.bitshuffle {
3586 let data = &encoded.streams[0];
3587 let mut bits_vec = if let Some(block) = cfg.block_bitshuffle {
3588 bitunshuffle_block_u32(data, value_count, block)
3589 } else {
3590 let planes = value_count.div_ceil(8);
3591 assert_eq!(data.len(), planes * 32);
3592 let mut vals = vec![0u32; value_count];
3593 for bit in 0..32 {
3594 let base = bit * planes;
3595 for idx in 0..value_count {
3596 if (data[base + idx / 8] >> (idx % 8)) & 1 != 0 {
3597 vals[idx] |= 1u32 << bit;
3598 }
3599 }
3600 }
3601 vals
3602 };
3603 if cfg.delta_xor {
3604 inv_delta_xor_u32(&mut bits_vec);
3605 }
3606
3607 let mut values = Vec::with_capacity(value_count);
3608 for (idx, &bits_value) in bits_vec.iter().take(value_count).enumerate() {
3609 let mut bits = bits_value;
3610 if cfg.sign_split && (encoded.sign_bytes[idx / 8] >> (idx % 8)) & 1 != 0 {
3611 bits |= 0x8000_0000;
3612 }
3613 values.push(f32::from_bits(bits));
3614 }
3615 timings.decode_ms += start.elapsed().as_secs_f64() * 1e3;
3616 return values;
3617 }
3618
3619 let mut streams = encoded.streams.clone();
3620
3621 if cfg.xor_delta {
3622 for stream in streams.iter_mut() {
3623 let mut prev = 0u8;
3624 for b in stream.iter_mut() {
3625 let cur = *b ^ prev;
3626 prev = cur;
3627 *b = cur;
3628 }
3629 }
3630 }
3631
3632 let order: Vec<usize> = if cfg.exponent_split && bytes_per_value == 4 {
3633 vec![3, 2, 1, 0]
3634 } else if cfg.msb_first {
3635 (0..bytes_per_value).rev().collect()
3636 } else {
3637 (0..bytes_per_value).collect()
3638 };
3639
3640 let mut raw_bytes = vec![0u8; value_count * bytes_per_value];
3641 let mut offset = 0;
3642 while offset < value_count {
3643 let block_len = cfg.block.unwrap_or(value_count).min(value_count - offset);
3644 for (stream_idx, byte_idx) in order.iter().enumerate() {
3645 let stream = &streams[stream_idx];
3646 let start = offset;
3647 for value_idx in 0..block_len {
3648 raw_bytes[(offset + value_idx) * bytes_per_value + byte_idx] =
3649 stream[start + value_idx];
3650 }
3651 }
3652 offset += block_len;
3653 }
3654
3655 let mut values_u32 = Vec::with_capacity(value_count);
3656 for idx in 0..value_count {
3657 let bits = u32::from_le_bytes(
3658 raw_bytes[idx * bytes_per_value..(idx + 1) * bytes_per_value]
3659 .try_into()
3660 .unwrap(),
3661 );
3662 values_u32.push(bits);
3663 }
3664 if cfg.delta_xor {
3665 inv_delta_xor_u32(&mut values_u32);
3666 }
3667 let mut values = Vec::with_capacity(value_count);
3668 for (idx, mut bits) in values_u32.into_iter().enumerate() {
3669 if cfg.sign_split && (encoded.sign_bytes[idx / 8] >> (idx % 8)) & 1 != 0 {
3670 bits |= 0x8000_0000;
3671 }
3672 values.push(f32::from_bits(bits));
3673 }
3674 timings.decode_ms += start.elapsed().as_secs_f64() * 1e3;
3675 values
3676 }
3677
3678 for cfg in configs {
3679 let mut timings = Timings::default();
3680 let encoded = encode_variant(&values, cfg, &mut timings);
3681 let decoded = decode_variant(&encoded, cfg, values.len(), &mut timings);
3682 assert_eq!(decoded, values, "roundtrip failed for {}", cfg.name);
3683
3684 let encoded_len = encoded.payload_concat.len() as f32;
3685 #[cfg(feature = "compression-lz4")]
3686 let compressed_len_outer = lz4::block::compress(&encoded.payload_concat, None, false)
3687 .expect("lz4 compress")
3688 .len() as f32;
3689 #[cfg(not(feature = "compression-lz4"))]
3690 let compressed_len_outer = 0f32;
3691 #[cfg(feature = "compression-zstd")]
3692 let compressed_len_outer_zstd =
3693 zstd::stream::encode_all(std::io::Cursor::new(&encoded.payload_concat), 3)
3694 .expect("zstd compress")
3695 .len() as f32;
3696 #[cfg(not(feature = "compression-zstd"))]
3697 let compressed_len_outer_zstd = 0f32;
3698 #[cfg(feature = "compression-lz4")]
3699 let compressed_len_per_stream: usize = if cfg.per_stream_lz4 {
3700 encoded.sign_bytes.len()
3701 + encoded
3702 .streams
3703 .iter()
3704 .map(|s| {
3705 lz4::block::compress(
3706 s,
3707 Some(lz4::block::CompressionMode::HIGHCOMPRESSION(12)),
3708 false,
3709 )
3710 .map(|c| c.len())
3711 .unwrap_or_else(|_| s.len())
3712 })
3713 .sum::<usize>()
3714 } else {
3715 0
3716 };
3717 #[cfg(not(feature = "compression-lz4"))]
3718 let compressed_len_per_stream: usize = 0;
3719 #[cfg(feature = "compression-zstd")]
3720 let compressed_len_per_stream_zstd: usize = if cfg.per_stream_zstd {
3721 encoded.sign_bytes.len()
3722 + encoded
3723 .streams
3724 .iter()
3725 .map(|s| {
3726 zstd::stream::encode_all(std::io::Cursor::new(s), 9)
3727 .map(|c| c.len())
3728 .unwrap_or_else(|_| s.len())
3729 })
3730 .sum::<usize>()
3731 } else {
3732 0
3733 };
3734 #[cfg(not(feature = "compression-zstd"))]
3735 let compressed_len_per_stream_zstd: usize = 0;
3736
3737 println!(
3738 "variant={} encoded_ratio={:.3} lz4_outer_ratio={:.3}{}{}{} encode_ms={:.3} decode_ms={:.3}",
3739 cfg.name,
3740 encoded_len / raw_len,
3741 compressed_len_outer / raw_len,
3742 if cfg.per_stream_lz4 {
3743 format!(
3744 " lz4_per_stream_ratio={:.3}",
3745 compressed_len_per_stream as f32 / raw_len
3746 )
3747 } else {
3748 "".to_string()
3749 },
3750 if cfg.outer_zstd {
3751 format!(
3752 " zstd_outer_ratio={:.3}",
3753 compressed_len_outer_zstd / raw_len
3754 )
3755 } else {
3756 "".to_string()
3757 },
3758 if cfg.per_stream_zstd {
3759 format!(
3760 " zstd_per_stream_ratio={:.3}",
3761 compressed_len_per_stream_zstd as f32 / raw_len
3762 )
3763 } else {
3764 "".to_string()
3765 },
3766 timings.encode_ms,
3767 timings.decode_ms,
3768 );
3769 }
3770
/// Parses a byte-stream-split buffer, inflates every stream, and re-serializes
/// it with each stream compressed according to `flag`.
///
/// Layout read and written, in order:
/// value count (`u32` LE), header byte, has-bitmap byte, optional validity
/// bitmap, stream-count byte (sign flag plus stream count), optional sign
/// bitmap, then for every stream: flag byte, original length (`u32` LE),
/// payload length (`u32` LE), payload.
///
/// Stream flags: `1` is LZ4, `2` is zstd, anything else is stored raw. When
/// re-encoding with `flag` 1 or 2, a stream is kept raw (flag `0`) unless the
/// compressed form is strictly smaller.
///
/// # Panics
///
/// Panics when the stored count differs from `value_count`, when the header
/// lacks `BYTE_STREAM_SPLIT_V1_FLAG`, when the buffer is truncated, or when any
/// bitmap or stream fails to parse, decompress or compress.
#[cfg(all(feature = "compression-lz4", feature = "compression-zstd"))]
#[allow(dead_code)]
fn reencode_with_flag(encoded: &[u8], flag: u8, value_count: usize) -> Vec<u8> {
    let read_u32 = |at: usize| -> usize {
        u32::from_le_bytes(encoded[at..at + 4].try_into().unwrap()) as usize
    };

    let count = read_u32(0);
    assert_eq!(count, value_count);
    let header = encoded[4];
    assert!((header & BYTE_STREAM_SPLIT_V1_FLAG) != 0);
    let has_bitmap = encoded[5] != 0;
    let mut pos = 6;

    // Parse the validity bitmap once and keep its serialized form for output.
    let validity_bytes = if has_bitmap {
        let bm = Bitmap::from_bytes(&encoded[pos..]).unwrap();
        pos += 4 + bm.len().div_ceil(8);
        Some(bm.to_bytes())
    } else {
        None
    };

    let stream_count_byte = encoded[pos];
    pos += 1;
    let has_sign = (stream_count_byte & BYTE_STREAM_SPLIT_SIGN_FLAG) != 0;
    let stream_count = (stream_count_byte & BYTE_STREAM_SPLIT_STREAM_COUNT_MASK) as usize;
    let sign_bytes = if has_sign {
        let bm = Bitmap::from_bytes(&encoded[pos..]).unwrap();
        pos += 4 + bm.len().div_ceil(8);
        bm.to_bytes()
    } else {
        Vec::new()
    };

    let mut raw_streams: Vec<Vec<u8>> = Vec::with_capacity(stream_count);
    for _ in 0..stream_count {
        let flag_orig = encoded[pos];
        let orig_len = read_u32(pos + 1);
        let payload_len = read_u32(pos + 5);
        pos += 9;
        let payload = &encoded[pos..pos + payload_len];
        pos += payload_len;
        let stream = match flag_orig {
            1 => decompress_lz4(payload, orig_len),
            2 => decompress_zstd(payload),
            _ => payload.to_vec(),
        };
        assert_eq!(stream.len(), orig_len);
        raw_streams.push(stream);
    }

    let mut buf = Vec::new();
    buf.extend_from_slice(&(count as u32).to_le_bytes());
    buf.push(header);
    buf.push(if has_bitmap { 1 } else { 0 });
    if let Some(bytes) = &validity_bytes {
        buf.extend_from_slice(bytes);
    }
    buf.push(stream_count_byte);
    if has_sign {
        buf.extend_from_slice(&sign_bytes);
    }

    for stream in raw_streams {
        let orig_len = stream.len() as u32;
        let compressed = match flag {
            1 => Some((
                1u8,
                lz4::block::compress(
                    &stream,
                    Some(lz4::block::CompressionMode::HIGHCOMPRESSION(12)),
                    false,
                )
                .unwrap(),
            )),
            2 => Some((
                2u8,
                zstd::stream::encode_all(std::io::Cursor::new(&stream), 15).unwrap(),
            )),
            _ => None,
        };
        // Keep the raw bytes unless compression actually shrank the stream.
        let (flag_set, payload) = match compressed {
            Some((tag, bytes)) if bytes.len() < stream.len() => (tag, bytes),
            _ => (0u8, stream),
        };
        buf.push(flag_set);
        buf.extend_from_slice(&orig_len.to_le_bytes());
        buf.extend_from_slice(&(payload.len() as u32).to_le_bytes());
        buf.extend_from_slice(&payload);
    }
    buf
}
3864
/// Checks that a ByteStreamSplit payload decodes to the same `Float32` column no matter
/// which per-stream storage flag (0, 1 or 2) it is re-framed with.
///
/// This function carries no `#[test]` attribute, so the harness does not run it.
#[cfg(all(feature = "compression-lz4", feature = "compression-zstd"))]
#[allow(dead_code)]
fn _test_byte_stream_split_stream_flag_integrity() {
    let values: Vec<f32> = (0..1024).map(|i| (i as f32).sin()).collect();
    let expected = Column::Float32(values.clone());
    let encoded = ByteStreamSplitEncoder
        .encode(&Column::Float32(values.clone()), None)
        .unwrap();

    let decoder = ByteStreamSplitDecoder;
    // Same order as before: raw (0) first, then flag 1, then flag 2.
    for flag in [0u8, 1, 2] {
        let variant = reencode_with_flag(&encoded, flag, values.len());
        let (decoded, _) = decoder
            .decode(&variant, values.len(), LogicalType::Float32)
            .unwrap();
        assert_eq!(decoded, expected);
    }
}
3895 }
3896
/// Decodes a hand-built ByteStreamSplit frame that bypasses the encoder.
///
/// The frame is a little-endian `u32` count, two header bytes (`4`, `0`), and then the
/// values' bytes grouped by byte position (all byte 0s, then all byte 1s, ...).
#[test]
fn test_byte_stream_split_legacy_layout_decode() {
    let values = vec![1.0f32, -0.5, 3.25, 0.0, std::f32::consts::PI];
    let count = values.len();
    let le_words: Vec<[u8; 4]> = values.iter().map(|v| v.to_le_bytes()).collect();

    let mut frame = (count as u32).to_le_bytes().to_vec();
    // NOTE(review): presumably the stream count (4) and a "no bitmap" marker (0) — confirm
    // against the decoder's legacy branch.
    frame.extend([4u8, 0u8]);
    for lane in 0..4 {
        frame.extend(le_words.iter().map(|word| word[lane]));
    }

    let (decoded, bitmap) = ByteStreamSplitDecoder
        .decode(&frame, count, LogicalType::Float32)
        .unwrap();

    assert!(bitmap.is_none());
    match decoded {
        Column::Float32(decoded_values) => assert_eq!(decoded_values, values),
        _ => panic!("Expected Float32 column"),
    }
}
3925
/// Round-trips an ascending list of byte strings with shared prefixes through the
/// incremental-string encoder and decoder, with no validity bitmap.
#[test]
fn test_incremental_string_sorted() {
    let values: Vec<Vec<u8>> = ["apple", "application", "apply", "banana", "bandana"]
        .iter()
        .map(|word| word.as_bytes().to_vec())
        .collect();

    let encoded = IncrementalStringEncoder
        .encode(&Column::Binary(values.clone()), None)
        .unwrap();
    let (decoded, bitmap) = IncrementalStringDecoder
        .decode(&encoded, values.len(), LogicalType::Binary)
        .unwrap();

    assert!(bitmap.is_none());
    match decoded {
        Column::Binary(decoded_values) => assert_eq!(decoded_values, values),
        _ => panic!("Expected Binary column"),
    }
}
3952
/// Unsorted, all-distinct Int64 data spanning the full `u64` range must select `Plain`.
#[test]
fn test_encoding_fallback_to_plain() {
    let row_count = 1000;
    let hints = EncodingHints {
        total_count: row_count,
        distinct_count: row_count,
        is_sorted: false,
        value_range: Some(u64::MAX),
        in_range_ratio: None,
    };

    assert_eq!(
        select_encoding(LogicalType::Int64, &hints),
        EncodingV2::Plain
    );
}
3967
/// Exercises `Bitmap` construction from bools, per-index validity, null counting, and a
/// `to_bytes` / `from_bytes` round trip.
#[test]
fn test_bitmap_operations() {
    let flags = [true, false, true, true, false];
    let bitmap = Bitmap::from_bools(&flags);

    for (index, &expected) in flags.iter().enumerate() {
        assert_eq!(bitmap.is_valid(index), expected);
    }
    assert_eq!(bitmap.null_count(), 2);
    assert_eq!(bitmap.len(), 5);

    // The serialized form must restore to a bitmap with identical length and bits.
    let restored = Bitmap::from_bytes(&bitmap.to_bytes()).unwrap();
    assert_eq!(restored.len(), bitmap.len());
    (0..bitmap.len()).for_each(|index| {
        assert_eq!(restored.is_valid(index), bitmap.is_valid(index));
    });
}
3987
/// Round-trips an Int64 column through the delta codec together with a validity bitmap
/// that marks two of the five slots as null.
#[test]
fn test_delta_with_bitmap() {
    let values = vec![100i64, 105, 110, 115, 120];
    let validity = Bitmap::from_bools(&[true, false, true, true, false]);

    let encoded = DeltaEncoder
        .encode(&Column::Int64(values.clone()), Some(&validity))
        .unwrap();
    let (decoded, decoded_bitmap) = DeltaDecoder
        .decode(&encoded, values.len(), LogicalType::Int64)
        .unwrap();

    assert!(decoded_bitmap.is_some());
    assert_eq!(decoded_bitmap.unwrap().null_count(), 2);

    let Column::Int64(decoded_values) = decoded else {
        panic!("Expected Int64 column");
    };
    assert_eq!(decoded_values, values);
}
4012
/// Pins the first few zigzag code points and checks that decode inverts encode,
/// including at both `i64` extremes.
#[test]
fn test_zigzag_encoding() {
    // Signed inputs 0, -1, 1, -2, 2 map to 0, 1, 2, 3, 4.
    for (signed, expected) in [(0, 0), (-1, 1), (1, 2), (-2, 3), (2, 4)] {
        assert_eq!(zigzag_encode(signed), expected);
    }

    let samples = [-1000i64, -1, 0, 1, 1000, i64::MIN, i64::MAX];
    samples
        .iter()
        .for_each(|&n| assert_eq!(zigzag_decode(zigzag_encode(n)), n));
}
4025
/// Checks that varints round-trip and occupy the expected number of bytes:
/// 300 takes two bytes, 0 takes one.
#[test]
fn test_varint_encoding() {
    let mut buf = Vec::new();
    for (value, expected_width) in [(300, 2), (0, 1)] {
        // Reuse one buffer; it is emptied before each value is written.
        buf.clear();
        encode_varint(value, &mut buf);
        let (decoded, bytes_read) = decode_varint(&buf).unwrap();
        assert_eq!(decoded, value);
        assert_eq!(bytes_read, expected_width);
    }
}
4040
/// Sorted Int64 hints must select the `Delta` encoding.
#[test]
fn test_select_encoding_sorted_int() {
    let hints = EncodingHints {
        total_count: 100,
        distinct_count: 100,
        value_range: Some(100),
        in_range_ratio: None,
        is_sorted: true,
    };

    let chosen = select_encoding(LogicalType::Int64, &hints);
    assert_eq!(chosen, EncodingV2::Delta);
}
4055
/// Unsorted Int64 hints with a value range of 255 and every value in range must select `FOR`.
#[test]
fn test_select_encoding_small_range() {
    let hints = EncodingHints {
        value_range: Some(255),
        in_range_ratio: Some(1.0),
        total_count: 100,
        distinct_count: 100,
        is_sorted: false,
    };

    let chosen = select_encoding(LogicalType::Int64, &hints);
    assert_eq!(chosen, EncodingV2::FOR);
}
4067
/// With default hints, both floating-point logical types must select `ByteStreamSplit`.
#[test]
fn test_select_encoding_float() {
    let hints = EncodingHints::default();
    for logical_type in [LogicalType::Float64, LogicalType::Float32] {
        assert_eq!(
            select_encoding(logical_type, &hints),
            EncodingV2::ByteStreamSplit
        );
    }
}
4080
/// Sorted Binary hints must select the `IncrementalString` encoding.
#[test]
fn test_select_encoding_sorted_binary() {
    let hints = EncodingHints {
        total_count: 100,
        distinct_count: 100,
        value_range: None,
        in_range_ratio: None,
        is_sorted: true,
    };

    let chosen = select_encoding(LogicalType::Binary, &hints);
    assert_eq!(chosen, EncodingV2::IncrementalString);
}
4095
/// Round-trips a Bool column made of four runs (3 true, 2 false, 4 true, 1 false)
/// through the RLE codec without a validity bitmap.
#[test]
fn test_rle_bool_roundtrip() {
    let values: Vec<bool> = [(true, 3usize), (false, 2), (true, 4), (false, 1)]
        .iter()
        .flat_map(|&(bit, run)| std::iter::repeat(bit).take(run))
        .collect();

    let encoded = RleEncoder
        .encode(&Column::Bool(values.clone()), None)
        .unwrap();
    let (decoded, bitmap) = RleDecoder
        .decode(&encoded, values.len(), LogicalType::Bool)
        .unwrap();

    assert!(bitmap.is_none());
    let Column::Bool(decoded_values) = decoded else {
        panic!("Expected Bool column");
    };
    assert_eq!(decoded_values, values);
}
4123
/// Round-trips a Bool column through the RLE codec with a validity bitmap that marks
/// two of the five slots as null.
#[test]
fn test_rle_bool_roundtrip_with_bitmap() {
    let values = vec![true, true, false, false, true];
    let validity = Bitmap::from_bools(&[true, false, true, true, false]);

    let encoded = RleEncoder
        .encode(&Column::Bool(values.clone()), Some(&validity))
        .unwrap();
    let (decoded, decoded_bitmap) = RleDecoder
        .decode(&encoded, values.len(), LogicalType::Bool)
        .unwrap();

    assert!(decoded_bitmap.is_some());
    assert_eq!(decoded_bitmap.unwrap().null_count(), 2);

    match decoded {
        Column::Bool(decoded_values) => assert_eq!(decoded_values, values),
        _ => panic!("Expected Bool column"),
    }
}
4148
/// Round-trips an Int64 column made of four runs (3x100, 2x200, 4x100, 1x300)
/// through the RLE codec without a validity bitmap.
#[test]
fn test_rle_int64_roundtrip() {
    let values: Vec<i64> = [(100i64, 3usize), (200, 2), (100, 4), (300, 1)]
        .iter()
        .flat_map(|&(value, run)| std::iter::repeat(value).take(run))
        .collect();

    let encoded = RleEncoder
        .encode(&Column::Int64(values.clone()), None)
        .unwrap();
    let (decoded, bitmap) = RleDecoder
        .decode(&encoded, values.len(), LogicalType::Int64)
        .unwrap();

    assert!(bitmap.is_none());
    match decoded {
        Column::Int64(decoded_values) => assert_eq!(decoded_values, values),
        _ => panic!("Expected Int64 column"),
    }
}
4170
/// Round-trips an Int64 column through the RLE codec with a validity bitmap that marks
/// two of the five slots as null.
#[test]
fn test_rle_int64_roundtrip_with_bitmap() {
    let values = vec![100i64, 100, 200, 200, 100];
    let validity = Bitmap::from_bools(&[true, false, true, true, false]);

    let encoded = RleEncoder
        .encode(&Column::Int64(values.clone()), Some(&validity))
        .unwrap();
    let (decoded, decoded_bitmap) = RleDecoder
        .decode(&encoded, values.len(), LogicalType::Int64)
        .unwrap();

    assert!(decoded_bitmap.is_some());
    assert_eq!(decoded_bitmap.unwrap().null_count(), 2);

    let Column::Int64(decoded_values) = decoded else {
        panic!("Expected Int64 column");
    };
    assert_eq!(decoded_values, values);
}
4195
/// An empty Bool column must survive an RLE round trip as an empty Bool column
/// with no validity bitmap.
#[test]
fn test_rle_empty() {
    let encoded = RleEncoder.encode(&Column::Bool(vec![]), None).unwrap();
    let (decoded, bitmap) = RleDecoder
        .decode(&encoded, 0, LogicalType::Bool)
        .unwrap();

    assert!(bitmap.is_none());
    match decoded {
        Column::Bool(decoded_values) => assert!(decoded_values.is_empty()),
        _ => panic!("Expected Bool column"),
    }
}
4213
/// Round-trips a Binary column with repeated entries through the dictionary codec,
/// without a validity bitmap.
#[test]
fn test_dictionary_binary_roundtrip() {
    let values: Vec<Vec<u8>> = ["apple", "banana", "apple", "cherry", "banana", "apple"]
        .iter()
        .map(|word| word.as_bytes().to_vec())
        .collect();

    let encoded = DictionaryEncoder
        .encode(&Column::Binary(values.clone()), None)
        .unwrap();
    let (decoded, bitmap) = DictionaryDecoder
        .decode(&encoded, values.len(), LogicalType::Binary)
        .unwrap();

    assert!(bitmap.is_none());
    let Column::Binary(decoded_values) = decoded else {
        panic!("Expected Binary column");
    };
    assert_eq!(decoded_values, values);
}
4246
/// Round-trips a Binary column through the dictionary codec with a validity bitmap
/// that marks two of the five slots as null.
#[test]
fn test_dictionary_binary_roundtrip_with_bitmap() {
    let values: Vec<Vec<u8>> = ["apple", "banana", "apple", "cherry", "banana"]
        .iter()
        .map(|word| word.as_bytes().to_vec())
        .collect();
    let validity = Bitmap::from_bools(&[true, false, true, true, false]);

    let encoded = DictionaryEncoder
        .encode(&Column::Binary(values.clone()), Some(&validity))
        .unwrap();
    let (decoded, decoded_bitmap) = DictionaryDecoder
        .decode(&encoded, values.len(), LogicalType::Binary)
        .unwrap();

    assert!(decoded_bitmap.is_some());
    assert_eq!(decoded_bitmap.unwrap().null_count(), 2);

    match decoded {
        Column::Binary(decoded_values) => assert_eq!(decoded_values, values),
        _ => panic!("Expected Binary column"),
    }
}
4277
/// Round-trips a fixed-width (4-byte) column with repeated entries through the
/// dictionary codec and checks both the element width and the values.
#[test]
fn test_dictionary_fixed_roundtrip() {
    let values: Vec<Vec<u8>> = [
        [1u8, 2, 3, 4],
        [5, 6, 7, 8],
        [1, 2, 3, 4],
        [9, 10, 11, 12],
        [5, 6, 7, 8],
    ]
    .iter()
    .map(|chunk| chunk.to_vec())
    .collect();
    let col = Column::Fixed {
        len: 4,
        values: values.clone(),
    };

    let encoded = DictionaryEncoder.encode(&col, None).unwrap();
    let (decoded, bitmap) = DictionaryDecoder
        .decode(&encoded, values.len(), LogicalType::Fixed(4))
        .unwrap();

    assert!(bitmap.is_none());
    let Column::Fixed {
        len,
        values: decoded_values,
    } = decoded
    else {
        panic!("Expected Fixed column");
    };
    assert_eq!(len, 4);
    assert_eq!(decoded_values, values);
}
4313
/// An empty Binary column must survive a dictionary round trip as an empty Binary
/// column with no validity bitmap.
#[test]
fn test_dictionary_empty() {
    let encoded = DictionaryEncoder
        .encode(&Column::Binary(vec![]), None)
        .unwrap();
    let (decoded, bitmap) = DictionaryDecoder
        .decode(&encoded, 0, LogicalType::Binary)
        .unwrap();

    assert!(bitmap.is_none());
    let Column::Binary(decoded_values) = decoded else {
        panic!("Expected Binary column");
    };
    assert!(decoded_values.is_empty());
}
4331
/// Round-trips ten booleans through the bitpack codec and checks that the encoded
/// form is shorter than ten bytes.
#[test]
fn test_bitpack_bool_roundtrip() {
    let values = vec![
        true, false, true, true, false, true, false, false, true, true,
    ];

    let encoded = BitpackEncoder
        .encode(&Column::Bool(values.clone()), None)
        .unwrap();

    // Ten input values: the packed output must beat one byte per value.
    assert!(encoded.len() < 10);

    let (decoded, bitmap) = BitpackDecoder
        .decode(&encoded, values.len(), LogicalType::Bool)
        .unwrap();

    assert!(bitmap.is_none());
    match decoded {
        Column::Bool(decoded_values) => assert_eq!(decoded_values, values),
        _ => panic!("Expected Bool column"),
    }
}
4362
/// Round-trips a Bool column through the bitpack codec with a validity bitmap that
/// marks two of the five slots as null.
#[test]
fn test_bitpack_bool_roundtrip_with_bitmap() {
    let values = vec![true, false, true, true, false];
    let validity = Bitmap::from_bools(&[true, false, true, true, false]);

    let encoded = BitpackEncoder
        .encode(&Column::Bool(values.clone()), Some(&validity))
        .unwrap();
    let (decoded, decoded_bitmap) = BitpackDecoder
        .decode(&encoded, values.len(), LogicalType::Bool)
        .unwrap();

    assert!(decoded_bitmap.is_some());
    assert_eq!(decoded_bitmap.unwrap().null_count(), 2);

    let Column::Bool(decoded_values) = decoded else {
        panic!("Expected Bool column");
    };
    assert_eq!(decoded_values, values);
}
4387
/// An empty Bool column must survive a bitpack round trip as an empty Bool column
/// with no validity bitmap.
#[test]
fn test_bitpack_empty() {
    let encoded = BitpackEncoder
        .encode(&Column::Bool(vec![]), None)
        .unwrap();
    let (decoded, bitmap) = BitpackDecoder
        .decode(&encoded, 0, LogicalType::Bool)
        .unwrap();

    assert!(bitmap.is_none());
    match decoded {
        Column::Bool(decoded_values) => assert!(decoded_values.is_empty()),
        _ => panic!("Expected Bool column"),
    }
}
4405
/// Bitpacks 100 booleans (true at every index divisible by 3), pins the encoded size,
/// and checks the round trip.
#[test]
fn test_bitpack_large() {
    // true, false, false repeating is the same sequence as `i % 3 == 0` for i in 0..100.
    let values: Vec<bool> = [true, false, false]
        .iter()
        .copied()
        .cycle()
        .take(100)
        .collect();

    let encoded = BitpackEncoder
        .encode(&Column::Bool(values.clone()), None)
        .unwrap();

    // 13 is ceil(100 / 8); NOTE(review): the leading 5 is presumably header bytes — confirm
    // against the encoder.
    assert_eq!(encoded.len(), 5 + 13);

    let (decoded, _) = BitpackDecoder
        .decode(&encoded, values.len(), LogicalType::Bool)
        .unwrap();

    let Column::Bool(decoded_values) = decoded else {
        panic!("Expected Bool column");
    };
    assert_eq!(decoded_values, values);
}
4430
/// Unsorted Bool hints with 2 distinct values over 6 rows must select `Bitpack`.
#[test]
fn test_select_encoding_bool_bitpack() {
    let hints = EncodingHints {
        total_count: 6,
        distinct_count: 2,
        is_sorted: false,
        value_range: None,
        in_range_ratio: None,
    };

    let chosen = select_encoding(LogicalType::Bool, &hints);
    assert_eq!(chosen, EncodingV2::Bitpack);
}
4450
/// Unsorted Bool hints with 2 distinct values over 100 rows must select `Rle`.
#[test]
fn test_select_encoding_bool_rle() {
    let hints = EncodingHints {
        total_count: 100,
        distinct_count: 2,
        is_sorted: false,
        value_range: None,
        in_range_ratio: None,
    };

    let chosen = select_encoding(LogicalType::Bool, &hints);
    assert_eq!(chosen, EncodingV2::Rle);
}
4463
/// Unsorted Int64 hints with 5 distinct values over 100 rows and a full-width value
/// range must select `Rle`.
#[test]
fn test_select_encoding_int64_rle() {
    let hints = EncodingHints {
        total_count: 100,
        distinct_count: 5,
        is_sorted: false,
        value_range: Some(u64::MAX),
        in_range_ratio: None,
    };

    let chosen = select_encoding(LogicalType::Int64, &hints);
    assert_eq!(chosen, EncodingV2::Rle);
}
4476
/// Unsorted Binary hints with 10 distinct values over 100 rows must select `Dictionary`.
#[test]
fn test_select_encoding_binary_dictionary() {
    let hints = EncodingHints {
        total_count: 100,
        distinct_count: 10,
        is_sorted: false,
        value_range: None,
        in_range_ratio: None,
    };

    let chosen = select_encoding(LogicalType::Binary, &hints);
    assert_eq!(chosen, EncodingV2::Dictionary);
}
4492
/// Unsorted Binary hints with 80 distinct values over 100 rows must select `Plain`.
#[test]
fn test_select_encoding_binary_plain() {
    let hints = EncodingHints {
        total_count: 100,
        distinct_count: 80,
        is_sorted: false,
        value_range: None,
        in_range_ratio: None,
    };

    let chosen = select_encoding(LogicalType::Binary, &hints);
    assert_eq!(chosen, EncodingV2::Plain);
}
4508
/// The encoder factory must return an encoder that reports `Rle` for `EncodingV2::Rle`.
#[test]
fn test_create_encoder_rle() {
    let requested = EncodingV2::Rle;
    assert_eq!(create_encoder(requested).encoding_type(), EncodingV2::Rle);
}
4518
/// The encoder factory must return an encoder that reports `Dictionary` for
/// `EncodingV2::Dictionary`.
#[test]
fn test_create_encoder_dictionary() {
    let requested = EncodingV2::Dictionary;
    assert_eq!(
        create_encoder(requested).encoding_type(),
        EncodingV2::Dictionary
    );
}
4524
/// The encoder factory must return an encoder that reports `Bitpack` for
/// `EncodingV2::Bitpack`.
#[test]
fn test_create_encoder_bitpack() {
    let requested = EncodingV2::Bitpack;
    assert_eq!(
        create_encoder(requested).encoding_type(),
        EncodingV2::Bitpack
    );
}
4530
/// Round-trips a Bool column using the encoder and decoder obtained from the
/// `create_encoder` / `create_decoder` factories for `EncodingV2::Rle`.
#[test]
fn test_create_decoder_roundtrip_via_factory() {
    let values = vec![true, true, true, false, false];

    let encoded = create_encoder(EncodingV2::Rle)
        .encode(&Column::Bool(values.clone()), None)
        .unwrap();
    let (decoded, _) = create_decoder(EncodingV2::Rle)
        .decode(&encoded, values.len(), LogicalType::Bool)
        .unwrap();

    match decoded {
        Column::Bool(decoded_values) => assert_eq!(decoded_values, values),
        _ => panic!("Expected Bool column"),
    }
}
4551}