alopex_core/columnar/
encoding_v2.rs

1//! V2 Encoding algorithms for columnar storage.
2//!
3//! Extends the base encoding with advanced algorithms:
4//! - Delta encoding for sorted integers
5//! - Frame of Reference (FOR) for small-range integers
6//! - Patched FOR (PFOR) for integers with outliers
7//! - Byte Stream Split for floating point
8//! - Incremental String for sorted strings
9
10use std::convert::TryInto;
11
12use crate::columnar::error::{ColumnarError, Result};
13use serde::{Deserialize, Serialize};
14
15use super::encoding::{Column, LogicalType};
16
/// V2 encoding strategy for a column.
///
/// Each [`Encoder`] reports the variant it produces through
/// [`Encoder::encoding_type`]. The first four variants are documented as
/// V1 compatible.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum EncodingV2 {
    /// Raw values (V1 compatible).
    Plain,
    /// Dictionary encoding with indexes (V1 compatible).
    Dictionary,
    /// Run-length encoding (V1 compatible).
    Rle,
    /// Bit-packed representation for bools (V1 compatible).
    Bitpack,
    /// Delta encoding for sorted integers.
    Delta,
    /// Delta-length encoding for variable-length data.
    DeltaLength,
    /// Byte stream split for floating point values.
    ByteStreamSplit,
    /// Frame of Reference encoding for small-range integers.
    FOR,
    /// Patched Frame of Reference for integers with outliers.
    PFOR,
    /// Incremental string encoding for sorted strings.
    IncrementalString,
}
41
/// Null bitmap for nullable columns.
///
/// Validity is stored one bit per value, least-significant bit first within
/// each byte: value `i` lives in bit `i % 8` of byte `i / 8`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Bitmap {
    /// Bit vector where 1 = valid, 0 = null.
    ///
    /// The constructors of this type allocate `len.div_ceil(8)` bytes.
    bits: Vec<u8>,
    /// Number of values (bits in use), not the number of backing bytes.
    len: usize,
}
50
51impl Bitmap {
52    /// Create a new bitmap with all values valid.
53    pub fn all_valid(len: usize) -> Self {
54        let num_bytes = len.div_ceil(8);
55        Self {
56            bits: vec![0xFF; num_bytes],
57            len,
58        }
59    }
60
61    /// Create a new bitmap from a boolean slice.
62    pub fn from_bools(valid: &[bool]) -> Self {
63        let len = valid.len();
64        let num_bytes = len.div_ceil(8);
65        let mut bits = vec![0u8; num_bytes];
66        for (i, &v) in valid.iter().enumerate() {
67            if v {
68                bits[i / 8] |= 1 << (i % 8);
69            }
70        }
71        Self { bits, len }
72    }
73
74    /// Create a new bitmap with all values set to invalid (null).
75    pub fn new(len: usize) -> Self {
76        Self::new_zeroed(len)
77    }
78
79    /// Create a new bitmap with all bits cleared.
80    pub fn new_zeroed(len: usize) -> Self {
81        let num_bytes = len.div_ceil(8);
82        Self {
83            bits: vec![0u8; num_bytes],
84            len,
85        }
86    }
87
88    /// Check if value at index is valid (not null).
89    pub fn is_valid(&self, index: usize) -> bool {
90        if index >= self.len {
91            return false;
92        }
93        (self.bits[index / 8] & (1 << (index % 8))) != 0
94    }
95
96    /// Alias for is_valid - get the validity at index.
97    pub fn get(&self, index: usize) -> bool {
98        self.is_valid(index)
99    }
100
101    /// Set the validity at index.
102    pub fn set(&mut self, index: usize, valid: bool) {
103        if index >= self.len {
104            return;
105        }
106        if valid {
107            self.bits[index / 8] |= 1 << (index % 8);
108        } else {
109            self.bits[index / 8] &= !(1 << (index % 8));
110        }
111    }
112
113    /// Get the number of null values.
114    pub fn null_count(&self) -> usize {
115        self.len
116            - self
117                .bits
118                .iter()
119                .map(|b| b.count_ones() as usize)
120                .sum::<usize>()
121    }
122
123    /// Encode bitmap to bytes.
124    pub fn to_bytes(&self) -> Vec<u8> {
125        let mut buf = Vec::with_capacity(4 + self.bits.len());
126        buf.extend_from_slice(&(self.len as u32).to_le_bytes());
127        buf.extend_from_slice(&self.bits);
128        buf
129    }
130
131    /// Decode bitmap from bytes.
132    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
133        if bytes.len() < 4 {
134            return Err(ColumnarError::InvalidFormat(
135                "bitmap header too short".into(),
136            ));
137        }
138        let len = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
139        let num_bytes = len.div_ceil(8);
140        if bytes.len() < 4 + num_bytes {
141            return Err(ColumnarError::InvalidFormat("bitmap data truncated".into()));
142        }
143        Ok(Self {
144            bits: bytes[4..4 + num_bytes].to_vec(),
145            len,
146        })
147    }
148
149    /// Get the length.
150    pub fn len(&self) -> usize {
151        self.len
152    }
153
154    /// Check if empty.
155    pub fn is_empty(&self) -> bool {
156        self.len == 0
157    }
158}
159
/// Encoder trait for V2 encodings.
///
/// Implementors must be `Send + Sync`.
pub trait Encoder: Send + Sync {
    /// Encode column data with optional null bitmap.
    ///
    /// Returns the encoded bytes, or an error when the encoder cannot handle
    /// the given column.
    fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>>;
    /// Get the encoding type.
    fn encoding_type(&self) -> EncodingV2;
}
167
/// Decoder trait for V2 encodings.
///
/// Implementors must be `Send + Sync`.
pub trait Decoder: Send + Sync {
    /// Decode bytes to column data with optional null bitmap.
    ///
    /// * `data` - the encoded bytes produced by the matching encoder.
    /// * `num_values` - caller-supplied value count. NOTE(review): the Delta,
    ///   FOR, PFOR and ByteStreamSplit decoders in this file ignore it and read
    ///   the count from the encoded header instead.
    /// * `logical_type` - the column type the caller expects back.
    ///
    /// Returns the decoded column and the null bitmap if one was stored.
    fn decode(
        &self,
        data: &[u8],
        num_values: usize,
        logical_type: LogicalType,
    ) -> Result<(Column, Option<Bitmap>)>;
}
178
179// ============================================================================
180// Delta Encoding - for sorted integers
181// ============================================================================
182
183/// Delta encoder for sorted integer sequences.
184///
185/// Stores first value + deltas between consecutive values.
186/// Efficient for monotonically increasing sequences (timestamps, IDs).
187pub struct DeltaEncoder;
188
189impl Encoder for DeltaEncoder {
190    fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
191        let values = match data {
192            Column::Int64(v) => v,
193            _ => {
194                return Err(ColumnarError::InvalidFormat(
195                    "delta encoding requires Int64".into(),
196                ))
197            }
198        };
199
200        if values.is_empty() {
201            let mut buf = Vec::with_capacity(8);
202            buf.extend_from_slice(&0u32.to_le_bytes()); // count
203            buf.extend_from_slice(&0u8.to_le_bytes()); // has_bitmap
204            return Ok(buf);
205        }
206
207        let mut buf = Vec::new();
208        buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
209
210        // Write bitmap flag and data
211        if let Some(bitmap) = null_bitmap {
212            buf.push(1u8);
213            buf.extend_from_slice(&bitmap.to_bytes());
214        } else {
215            buf.push(0u8);
216        }
217
218        // First value
219        buf.extend_from_slice(&values[0].to_le_bytes());
220
221        // Deltas (variable-length zigzag encoded)
222        for window in values.windows(2) {
223            let delta = window[1].wrapping_sub(window[0]);
224            let zigzag = zigzag_encode(delta);
225            encode_varint(zigzag, &mut buf);
226        }
227
228        Ok(buf)
229    }
230
231    fn encoding_type(&self) -> EncodingV2 {
232        EncodingV2::Delta
233    }
234}
235
236/// Delta decoder.
237pub struct DeltaDecoder;
238
239impl Decoder for DeltaDecoder {
240    fn decode(
241        &self,
242        data: &[u8],
243        _num_values: usize,
244        logical_type: LogicalType,
245    ) -> Result<(Column, Option<Bitmap>)> {
246        if logical_type != LogicalType::Int64 {
247            return Err(ColumnarError::InvalidFormat(
248                "delta decoding requires Int64".into(),
249            ));
250        }
251
252        if data.len() < 5 {
253            return Err(ColumnarError::InvalidFormat(
254                "delta header too short".into(),
255            ));
256        }
257
258        let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
259        let has_bitmap = data[4] != 0;
260        let mut pos = 5;
261
262        let bitmap = if has_bitmap {
263            let bm = Bitmap::from_bytes(&data[pos..])?;
264            pos += 4 + bm.len().div_ceil(8);
265            Some(bm)
266        } else {
267            None
268        };
269
270        if count == 0 {
271            return Ok((Column::Int64(vec![]), bitmap));
272        }
273
274        if pos + 8 > data.len() {
275            return Err(ColumnarError::InvalidFormat(
276                "delta first value truncated".into(),
277            ));
278        }
279
280        let first = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
281        pos += 8;
282
283        let mut values = Vec::with_capacity(count);
284        values.push(first);
285
286        let mut current = first;
287        for _ in 1..count {
288            let (zigzag, bytes_read) = decode_varint(&data[pos..])?;
289            pos += bytes_read;
290            let delta = zigzag_decode(zigzag);
291            current = current.wrapping_add(delta);
292            values.push(current);
293        }
294
295        Ok((Column::Int64(values), bitmap))
296    }
297}
298
299// ============================================================================
300// FOR (Frame of Reference) Encoding - for small-range integers
301// ============================================================================
302
303/// Frame of Reference encoder.
304///
305/// Stores min value + bit-packed offsets from min.
306/// Efficient when all values fit in a small range.
307pub struct ForEncoder;
308
309impl Encoder for ForEncoder {
310    fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
311        let values = match data {
312            Column::Int64(v) => v,
313            _ => {
314                return Err(ColumnarError::InvalidFormat(
315                    "FOR encoding requires Int64".into(),
316                ))
317            }
318        };
319
320        if values.is_empty() {
321            let mut buf = Vec::with_capacity(5);
322            buf.extend_from_slice(&0u32.to_le_bytes());
323            buf.push(0u8); // has_bitmap
324            return Ok(buf);
325        }
326
327        let min_val = *values.iter().min().unwrap();
328        let max_val = *values.iter().max().unwrap();
329        let range = (max_val - min_val) as u64;
330
331        // Calculate bits needed
332        let bits_needed = if range == 0 {
333            1
334        } else {
335            64 - range.leading_zeros()
336        } as u8;
337
338        let mut buf = Vec::new();
339        buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
340
341        // Write bitmap flag and data
342        if let Some(bitmap) = null_bitmap {
343            buf.push(1u8);
344            buf.extend_from_slice(&bitmap.to_bytes());
345        } else {
346            buf.push(0u8);
347        }
348
349        // Reference value and bit width
350        buf.extend_from_slice(&min_val.to_le_bytes());
351        buf.push(bits_needed);
352
353        // Bit-pack the offsets
354        let mut bit_buffer = 0u64;
355        let mut bits_in_buffer = 0u8;
356
357        for &v in values {
358            let offset = (v - min_val) as u64;
359            bit_buffer |= offset << bits_in_buffer;
360            bits_in_buffer += bits_needed;
361
362            while bits_in_buffer >= 8 {
363                buf.push(bit_buffer as u8);
364                bit_buffer >>= 8;
365                bits_in_buffer -= 8;
366            }
367        }
368
369        // Flush remaining bits
370        if bits_in_buffer > 0 {
371            buf.push(bit_buffer as u8);
372        }
373
374        Ok(buf)
375    }
376
377    fn encoding_type(&self) -> EncodingV2 {
378        EncodingV2::FOR
379    }
380}
381
382/// Frame of Reference decoder.
383pub struct ForDecoder;
384
385impl Decoder for ForDecoder {
386    fn decode(
387        &self,
388        data: &[u8],
389        _num_values: usize,
390        logical_type: LogicalType,
391    ) -> Result<(Column, Option<Bitmap>)> {
392        if logical_type != LogicalType::Int64 {
393            return Err(ColumnarError::InvalidFormat(
394                "FOR decoding requires Int64".into(),
395            ));
396        }
397
398        if data.len() < 5 {
399            return Err(ColumnarError::InvalidFormat("FOR header too short".into()));
400        }
401
402        let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
403        let has_bitmap = data[4] != 0;
404        let mut pos = 5;
405
406        let bitmap = if has_bitmap {
407            let bm = Bitmap::from_bytes(&data[pos..])?;
408            pos += 4 + bm.len().div_ceil(8);
409            Some(bm)
410        } else {
411            None
412        };
413
414        if count == 0 {
415            return Ok((Column::Int64(vec![]), bitmap));
416        }
417
418        if pos + 9 > data.len() {
419            return Err(ColumnarError::InvalidFormat(
420                "FOR reference truncated".into(),
421            ));
422        }
423
424        let reference = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
425        pos += 8;
426        let bits_per_value = data[pos];
427        pos += 1;
428
429        let mut values = Vec::with_capacity(count);
430        let mask = if bits_per_value >= 64 {
431            u64::MAX
432        } else {
433            (1u64 << bits_per_value) - 1
434        };
435
436        let mut bit_buffer = 0u64;
437        let mut bits_in_buffer = 0u8;
438        let mut byte_pos = pos;
439
440        for _ in 0..count {
441            while bits_in_buffer < bits_per_value && byte_pos < data.len() {
442                bit_buffer |= (data[byte_pos] as u64) << bits_in_buffer;
443                bits_in_buffer += 8;
444                byte_pos += 1;
445            }
446
447            // Check for truncated data before subtraction to prevent underflow
448            if bits_in_buffer < bits_per_value {
449                return Err(ColumnarError::InvalidFormat("FOR data truncated".into()));
450            }
451
452            let offset = bit_buffer & mask;
453            bit_buffer >>= bits_per_value;
454            bits_in_buffer -= bits_per_value;
455
456            values.push(reference + offset as i64);
457        }
458
459        Ok((Column::Int64(values), bitmap))
460    }
461}
462
463// ============================================================================
464// PFOR (Patched Frame of Reference) - for integers with outliers
465// ============================================================================
466
467/// Patched FOR encoder.
468///
469/// Like FOR but handles outliers separately.
470/// Efficient when most values fit in small range with few exceptions.
471pub struct PforEncoder {
472    /// Percentile threshold for determining bit width (0.0-1.0).
473    pub percentile: f64,
474}
475
476impl Default for PforEncoder {
477    fn default() -> Self {
478        Self { percentile: 0.9 }
479    }
480}
481
482impl Encoder for PforEncoder {
483    fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
484        let values = match data {
485            Column::Int64(v) => v,
486            _ => {
487                return Err(ColumnarError::InvalidFormat(
488                    "PFOR encoding requires Int64".into(),
489                ))
490            }
491        };
492
493        if values.is_empty() {
494            let mut buf = Vec::with_capacity(5);
495            buf.extend_from_slice(&0u32.to_le_bytes());
496            buf.push(0u8); // has_bitmap
497            return Ok(buf);
498        }
499
500        let min_val = *values.iter().min().unwrap();
501
502        // Calculate offsets and find percentile-based max
503        let mut offsets: Vec<u64> = values.iter().map(|&v| (v - min_val) as u64).collect();
504        let mut sorted_offsets = offsets.clone();
505        sorted_offsets.sort_unstable();
506
507        let percentile_idx =
508            ((values.len() as f64 * self.percentile) as usize).min(values.len() - 1);
509        let percentile_max = sorted_offsets[percentile_idx];
510
511        let bits_needed = if percentile_max == 0 {
512            1
513        } else {
514            64 - percentile_max.leading_zeros()
515        } as u8;
516
517        let max_packed = if bits_needed >= 64 {
518            u64::MAX
519        } else {
520            (1u64 << bits_needed) - 1
521        };
522
523        // Find exceptions (values that don't fit)
524        let mut exceptions: Vec<(u32, u64)> = Vec::new();
525        for (i, offset) in offsets.iter_mut().enumerate() {
526            if *offset > max_packed {
527                exceptions.push((i as u32, *offset));
528                *offset = 0; // Will be patched
529            }
530        }
531
532        let mut buf = Vec::new();
533        buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
534
535        // Write bitmap flag and data
536        if let Some(bitmap) = null_bitmap {
537            buf.push(1u8);
538            buf.extend_from_slice(&bitmap.to_bytes());
539        } else {
540            buf.push(0u8);
541        }
542
543        // Reference, bit width, exception count
544        buf.extend_from_slice(&min_val.to_le_bytes());
545        buf.push(bits_needed);
546        buf.extend_from_slice(&(exceptions.len() as u32).to_le_bytes());
547
548        // Bit-pack main values
549        let mut bit_buffer = 0u64;
550        let mut bits_in_buffer = 0u8;
551
552        for &offset in &offsets {
553            bit_buffer |= (offset & max_packed) << bits_in_buffer;
554            bits_in_buffer += bits_needed;
555
556            while bits_in_buffer >= 8 {
557                buf.push(bit_buffer as u8);
558                bit_buffer >>= 8;
559                bits_in_buffer -= 8;
560            }
561        }
562
563        if bits_in_buffer > 0 {
564            buf.push(bit_buffer as u8);
565        }
566
567        // Write exceptions
568        for (idx, val) in exceptions {
569            buf.extend_from_slice(&idx.to_le_bytes());
570            buf.extend_from_slice(&val.to_le_bytes());
571        }
572
573        Ok(buf)
574    }
575
576    fn encoding_type(&self) -> EncodingV2 {
577        EncodingV2::PFOR
578    }
579}
580
581/// Patched FOR decoder.
582pub struct PforDecoder;
583
584impl Decoder for PforDecoder {
585    fn decode(
586        &self,
587        data: &[u8],
588        _num_values: usize,
589        logical_type: LogicalType,
590    ) -> Result<(Column, Option<Bitmap>)> {
591        if logical_type != LogicalType::Int64 {
592            return Err(ColumnarError::InvalidFormat(
593                "PFOR decoding requires Int64".into(),
594            ));
595        }
596
597        if data.len() < 5 {
598            return Err(ColumnarError::InvalidFormat("PFOR header too short".into()));
599        }
600
601        let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
602        let has_bitmap = data[4] != 0;
603        let mut pos = 5;
604
605        let bitmap = if has_bitmap {
606            let bm = Bitmap::from_bytes(&data[pos..])?;
607            pos += 4 + bm.len().div_ceil(8);
608            Some(bm)
609        } else {
610            None
611        };
612
613        if count == 0 {
614            return Ok((Column::Int64(vec![]), bitmap));
615        }
616
617        if pos + 13 > data.len() {
618            return Err(ColumnarError::InvalidFormat("PFOR header truncated".into()));
619        }
620
621        let reference = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
622        pos += 8;
623        let bits_per_value = data[pos];
624        pos += 1;
625        let exception_count = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
626        pos += 4;
627
628        // Decode bit-packed values
629        let mut values = Vec::with_capacity(count);
630        let mask = if bits_per_value >= 64 {
631            u64::MAX
632        } else {
633            (1u64 << bits_per_value) - 1
634        };
635
636        let mut bit_buffer = 0u64;
637        let mut bits_in_buffer = 0u8;
638
639        for _ in 0..count {
640            while bits_in_buffer < bits_per_value && pos < data.len() {
641                bit_buffer |= (data[pos] as u64) << bits_in_buffer;
642                bits_in_buffer += 8;
643                pos += 1;
644            }
645
646            // Check for truncated data before subtraction to prevent underflow
647            if bits_in_buffer < bits_per_value {
648                return Err(ColumnarError::InvalidFormat("PFOR data truncated".into()));
649            }
650
651            let offset = bit_buffer & mask;
652            bit_buffer >>= bits_per_value;
653            bits_in_buffer -= bits_per_value;
654
655            values.push(reference + offset as i64);
656        }
657
658        // Align to byte boundary for exceptions
659        if bits_in_buffer > 0 && bits_in_buffer < 8 {
660            // Skip partial byte already consumed
661        }
662
663        // Apply exceptions
664        for _ in 0..exception_count {
665            if pos + 12 > data.len() {
666                return Err(ColumnarError::InvalidFormat(
667                    "PFOR exception truncated".into(),
668                ));
669            }
670            let idx = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
671            pos += 4;
672            let val = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
673            pos += 8;
674
675            if idx < values.len() {
676                values[idx] = reference + val as i64;
677            }
678        }
679
680        Ok((Column::Int64(values), bitmap))
681    }
682}
683
684// ============================================================================
685// ByteStreamSplit - for floating point
686// ============================================================================
687
// Layout flag placed in the high bit of the bytes_per_value header.
// V1 introduces block-based MSB-first streams to improve LZ4 compression ratio
// while remaining backward-compatible with the legacy layout.
/// High bit of the header byte: set when the payload uses the V1 stream layout.
const BYTE_STREAM_SPLIT_V1_FLAG: u8 = 0x80;
/// Low seven bits of the header byte: the number of bytes per value.
const BYTE_STREAM_SPLIT_HEADER_MASK: u8 = 0x7F;
/// Number of values the encoder transposes per block.
const BYTE_STREAM_SPLIT_BLOCK_SIZE: usize = 256;
/// High bit of the stream-count byte: set when a sign bitmap follows it.
const BYTE_STREAM_SPLIT_SIGN_FLAG: u8 = 0x80;
/// Low seven bits of the stream-count byte: the number of byte streams.
const BYTE_STREAM_SPLIT_STREAM_COUNT_MASK: u8 = 0x7F;
/// Per-stream flag: the payload is stored uncompressed.
const BYTE_STREAM_SPLIT_FLAG_RAW: u8 = 0;
/// Per-stream flag: the payload is LZ4 compressed.
const BYTE_STREAM_SPLIT_FLAG_LZ4: u8 = 1;
/// Per-stream flag: the payload is Zstandard compressed.
const BYTE_STREAM_SPLIT_FLAG_ZSTD: u8 = 2;
699
700/// Byte stream split encoder.
701///
702/// Splits float bytes into separate streams for better compression.
703/// Each byte position forms its own stream.
704pub struct ByteStreamSplitEncoder;
705
706impl Encoder for ByteStreamSplitEncoder {
707    fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
708        let mut sign_bitmap: Option<Bitmap> = None;
709        let (bytes_per_value, raw_bytes): (usize, Vec<u8>) = match data {
710            Column::Float32(values) => {
711                let mut sign_bits = Bitmap::new(values.len());
712                let bytes: Vec<u8> = values
713                    .iter()
714                    .enumerate()
715                    .flat_map(|(idx, v)| {
716                        let mut bits = v.to_bits();
717                        if bits & 0x8000_0000 != 0 {
718                            sign_bits.set(idx, true);
719                            bits &= 0x7fff_ffff;
720                        }
721                        bits.to_le_bytes()
722                    })
723                    .collect();
724                sign_bitmap = Some(sign_bits);
725                (4, bytes)
726            }
727            Column::Float64(values) => {
728                let mut sign_bits = Bitmap::new(values.len());
729                let bytes: Vec<u8> = values
730                    .iter()
731                    .enumerate()
732                    .flat_map(|(idx, v)| {
733                        let mut bits = v.to_bits();
734                        if bits & 0x8000_0000_0000_0000 != 0 {
735                            sign_bits.set(idx, true);
736                            bits &= 0x7fff_ffff_ffff_ffff;
737                        }
738                        bits.to_le_bytes()
739                    })
740                    .collect();
741                sign_bitmap = Some(sign_bits);
742                (8, bytes)
743            }
744            Column::Int64(values) => {
745                let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
746                (8, bytes)
747            }
748            _ => {
749                return Err(ColumnarError::InvalidFormat(
750                    "ByteStreamSplit requires Float32, Float64, or Int64".into(),
751                ))
752            }
753        };
754
755        let num_values = raw_bytes.len() / bytes_per_value;
756
757        let mut buf = Vec::new();
758        buf.extend_from_slice(&(num_values as u32).to_le_bytes());
759        let header = (bytes_per_value as u8) | BYTE_STREAM_SPLIT_V1_FLAG;
760        buf.push(header);
761
762        // Write bitmap flag and data
763        if let Some(bitmap) = null_bitmap {
764            buf.push(1u8);
765            buf.extend_from_slice(&bitmap.to_bytes());
766        } else {
767            buf.push(0u8);
768        }
769
770        let _expected_size = num_values * bytes_per_value;
771        let mut streams: Vec<Vec<u8>> = Vec::with_capacity(bytes_per_value);
772
773        // Blocked MSB-first byte streams to cluster sign/exponent bytes, improving
774        // downstream compression while keeping simple layout.
775        let mut offset = 0;
776        for _ in 0..bytes_per_value {
777            streams.push(Vec::with_capacity(num_values));
778        }
779        while offset < num_values {
780            let block_len = (num_values - offset).min(BYTE_STREAM_SPLIT_BLOCK_SIZE);
781            for (stream_idx, byte_idx) in (0..bytes_per_value).rev().enumerate() {
782                let start = offset * bytes_per_value + byte_idx;
783                let stream = &mut streams[stream_idx];
784                for value_idx in 0..block_len {
785                    stream.push(raw_bytes[start + value_idx * bytes_per_value]);
786                }
787            }
788            offset += block_len;
789        }
790
791        // Write per-stream compression blocks
792        let mut stream_count_byte = bytes_per_value as u8;
793        if sign_bitmap.is_some() {
794            stream_count_byte |= BYTE_STREAM_SPLIT_SIGN_FLAG;
795        }
796        buf.push(stream_count_byte); // stream count (for future widths)
797
798        // Write sign bitmap when present (only for float types)
799        if let Some(sign) = sign_bitmap {
800            buf.extend_from_slice(&sign.to_bytes());
801        }
802        for stream in streams {
803            let original_len = stream.len() as u32;
804            buf.push(BYTE_STREAM_SPLIT_FLAG_RAW);
805            buf.extend_from_slice(&original_len.to_le_bytes());
806            buf.extend_from_slice(&original_len.to_le_bytes());
807            buf.extend_from_slice(&stream);
808        }
809        buf[4] = header;
810        Ok(buf)
811    }
812
813    fn encoding_type(&self) -> EncodingV2 {
814        EncodingV2::ByteStreamSplit
815    }
816}
817
/// Byte stream split decoder.
///
/// Stateless unit struct; the decoding logic lives in its [`Decoder`]
/// implementation.
pub struct ByteStreamSplitDecoder;
820
821impl Decoder for ByteStreamSplitDecoder {
822    fn decode(
823        &self,
824        data: &[u8],
825        _num_values: usize,
826        logical_type: LogicalType,
827    ) -> Result<(Column, Option<Bitmap>)> {
828        if data.len() < 6 {
829            return Err(ColumnarError::InvalidFormat(
830                "ByteStreamSplit header too short".into(),
831            ));
832        }
833
834        let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
835        let header = data[4];
836        let bytes_per_value = (header & BYTE_STREAM_SPLIT_HEADER_MASK) as usize;
837        let has_bitmap = data[5] != 0;
838        let use_v1_layout = (header & BYTE_STREAM_SPLIT_V1_FLAG) != 0;
839        let mut pos = 6;
840
841        if bytes_per_value == 0 {
842            return Err(ColumnarError::InvalidFormat(
843                "ByteStreamSplit bytes_per_value cannot be zero".into(),
844            ));
845        }
846
847        let bitmap = if has_bitmap {
848            let bm = Bitmap::from_bytes(&data[pos..])?;
849            pos += 4 + bm.len().div_ceil(8);
850            Some(bm)
851        } else {
852            None
853        };
854
855        if count == 0 {
856            return match logical_type {
857                LogicalType::Float32 => Ok((Column::Float32(vec![]), bitmap)),
858                LogicalType::Float64 => Ok((Column::Float64(vec![]), bitmap)),
859                LogicalType::Int64 => Ok((Column::Int64(vec![]), bitmap)),
860                _ => Err(ColumnarError::InvalidFormat(
861                    "ByteStreamSplit logical type mismatch".into(),
862                )),
863            };
864        }
865
866        let expected_size = count * bytes_per_value;
867        let mut raw_bytes = vec![0u8; expected_size];
868
869        if use_v1_layout {
870            if pos >= data.len() {
871                return Err(ColumnarError::InvalidFormat(
872                    "ByteStreamSplit stream header truncated".into(),
873                ));
874            }
875
876            let stream_count_byte = data[pos];
877            pos += 1;
878            let has_sign_bitmap = (stream_count_byte & BYTE_STREAM_SPLIT_SIGN_FLAG) != 0;
879            let stream_count = (stream_count_byte & BYTE_STREAM_SPLIT_STREAM_COUNT_MASK) as usize;
880            if stream_count != bytes_per_value {
881                return Err(ColumnarError::InvalidFormat(
882                    "ByteStreamSplit stream count mismatch".into(),
883                ));
884            }
885
886            let mut sign_bitmap = if has_sign_bitmap {
887                let bm = Bitmap::from_bytes(&data[pos..])?;
888                pos += 4 + bm.len().div_ceil(8);
889                Some(bm)
890            } else {
891                None
892            };
893            if !matches!(logical_type, LogicalType::Float32 | LogicalType::Float64) {
894                sign_bitmap = None;
895            }
896
897            let raw_ptr = raw_bytes.as_mut_ptr();
898            for stream_idx in 0..stream_count {
899                if pos + 9 > data.len() {
900                    return Err(ColumnarError::InvalidFormat(
901                        "ByteStreamSplit stream header truncated".into(),
902                    ));
903                }
904                let flag = data[pos];
905                let orig_len =
906                    u32::from_le_bytes(data[pos + 1..pos + 5].try_into().unwrap()) as usize;
907                let payload_len =
908                    u32::from_le_bytes(data[pos + 5..pos + 9].try_into().unwrap()) as usize;
909                pos += 9;
910
911                if orig_len != count {
912                    return Err(ColumnarError::InvalidFormat(
913                        "ByteStreamSplit stream length mismatch".into(),
914                    ));
915                }
916                if pos + payload_len > data.len() {
917                    return Err(ColumnarError::InvalidFormat(
918                        "ByteStreamSplit stream payload truncated".into(),
919                    ));
920                }
921
922                let payload = &data[pos..pos + payload_len];
923                pos += payload_len;
924
925                let stream_buf: Option<Vec<u8>> = match flag {
926                    BYTE_STREAM_SPLIT_FLAG_LZ4 => {
927                        #[cfg(feature = "compression-lz4")]
928                        {
929                            let orig_len_i32: i32 = orig_len.try_into().map_err(|_| {
930                                ColumnarError::InvalidFormat(
931                                    "ByteStreamSplit stream length too large".into(),
932                                )
933                            })?;
934                            Some(lz4::block::decompress(payload, Some(orig_len_i32)).map_err(
935                                |_| {
936                                    ColumnarError::InvalidFormat(
937                                        "ByteStreamSplit stream decompress failed".into(),
938                                    )
939                                },
940                            )?)
941                        }
942                        #[cfg(not(feature = "compression-lz4"))]
943                        {
944                            return Err(ColumnarError::InvalidFormat(
945                                "ByteStreamSplit compressed stream requires compression-lz4".into(),
946                            ));
947                        }
948                    }
949                    BYTE_STREAM_SPLIT_FLAG_ZSTD => {
950                        #[cfg(feature = "compression-zstd")]
951                        {
952                            Some(zstd::bulk::decompress(payload, orig_len).map_err(|_| {
953                                ColumnarError::InvalidFormat(
954                                    "ByteStreamSplit stream decompress failed".into(),
955                                )
956                            })?)
957                        }
958                        #[cfg(not(feature = "compression-zstd"))]
959                        {
960                            return Err(ColumnarError::InvalidFormat(
961                                "ByteStreamSplit zstd stream requires compression-zstd".into(),
962                            ));
963                        }
964                    }
965                    _ => {
966                        // Raw stream: reuse the payload slice without allocating.
967                        None
968                    }
969                };
970
971                let stream = match stream_buf.as_deref() {
972                    Some(buf) => buf,
973                    None => payload,
974                };
975
976                if stream.len() != count {
977                    return Err(ColumnarError::InvalidFormat(
978                        "ByteStreamSplit stream length invalid".into(),
979                    ));
980                }
981
982                let byte_idx = bytes_per_value - 1 - stream_idx;
983                if stream_idx == 0 {
984                    if let Some(sign_bitmap) = sign_bitmap.as_ref() {
985                        let mut dst_index = byte_idx;
986                        for (value_idx, &byte) in stream.iter().enumerate() {
987                            let mut out = byte;
988                            if sign_bitmap.get(value_idx) {
989                                out |= 0x80;
990                            }
991                            // Safety: dst_index is within raw_bytes (count * bytes_per_value).
992                            unsafe {
993                                *raw_ptr.add(dst_index) = out;
994                            }
995                            dst_index += bytes_per_value;
996                        }
997                        continue;
998                    }
999                }
1000
1001                let mut dst_index = byte_idx;
1002                for &byte in stream {
1003                    // Safety: dst_index is within raw_bytes (count * bytes_per_value).
1004                    unsafe {
1005                        *raw_ptr.add(dst_index) = byte;
1006                    }
1007                    dst_index += bytes_per_value;
1008                }
1009            }
1010        } else {
1011            if pos + expected_size > data.len() {
1012                return Err(ColumnarError::InvalidFormat(
1013                    "ByteStreamSplit data truncated".into(),
1014                ));
1015            }
1016
1017            let raw_ptr = raw_bytes.as_mut_ptr();
1018            let data_ptr = data.as_ptr();
1019            for byte_idx in 0..bytes_per_value {
1020                let dst_index = byte_idx;
1021                let src_index = pos + byte_idx * count;
1022                // Safety: src_index/ dst_index are bounded by data/raw_bytes sizes.
1023                unsafe {
1024                    let mut dst_ptr = raw_ptr.add(dst_index);
1025                    let mut src_ptr = data_ptr.add(src_index);
1026                    for _ in 0..count {
1027                        *dst_ptr = *src_ptr;
1028                        dst_ptr = dst_ptr.add(bytes_per_value);
1029                        src_ptr = src_ptr.add(1);
1030                    }
1031                }
1032            }
1033        }
1034
1035        match logical_type {
1036            LogicalType::Float32 => {
1037                if bytes_per_value != 4 {
1038                    return Err(ColumnarError::InvalidFormat(
1039                        "ByteStreamSplit Float32 length mismatch".into(),
1040                    ));
1041                }
1042                if cfg!(target_endian = "little") {
1043                    let mut values = vec![0f32; count];
1044                    // Safety: values is a contiguous little-endian buffer of the right size.
1045                    unsafe {
1046                        std::ptr::copy_nonoverlapping(
1047                            raw_bytes.as_ptr(),
1048                            values.as_mut_ptr() as *mut u8,
1049                            raw_bytes.len(),
1050                        );
1051                    }
1052                    Ok((Column::Float32(values), bitmap))
1053                } else {
1054                    let values: Vec<f32> = raw_bytes
1055                        .chunks_exact(4)
1056                        .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap()))
1057                        .collect();
1058                    Ok((Column::Float32(values), bitmap))
1059                }
1060            }
1061            LogicalType::Float64 => {
1062                if bytes_per_value != 8 {
1063                    return Err(ColumnarError::InvalidFormat(
1064                        "ByteStreamSplit Float64 length mismatch".into(),
1065                    ));
1066                }
1067                if cfg!(target_endian = "little") {
1068                    let mut values = vec![0f64; count];
1069                    // Safety: values is a contiguous little-endian buffer of the right size.
1070                    unsafe {
1071                        std::ptr::copy_nonoverlapping(
1072                            raw_bytes.as_ptr(),
1073                            values.as_mut_ptr() as *mut u8,
1074                            raw_bytes.len(),
1075                        );
1076                    }
1077                    Ok((Column::Float64(values), bitmap))
1078                } else {
1079                    let values: Vec<f64> = raw_bytes
1080                        .chunks_exact(8)
1081                        .map(|chunk| f64::from_le_bytes(chunk.try_into().unwrap()))
1082                        .collect();
1083                    Ok((Column::Float64(values), bitmap))
1084                }
1085            }
1086            LogicalType::Int64 => {
1087                if bytes_per_value != 8 {
1088                    return Err(ColumnarError::InvalidFormat(
1089                        "ByteStreamSplit Int64 length mismatch".into(),
1090                    ));
1091                }
1092                if cfg!(target_endian = "little") {
1093                    let mut values = vec![0i64; count];
1094                    // Safety: values is a contiguous little-endian buffer of the right size.
1095                    unsafe {
1096                        std::ptr::copy_nonoverlapping(
1097                            raw_bytes.as_ptr(),
1098                            values.as_mut_ptr() as *mut u8,
1099                            raw_bytes.len(),
1100                        );
1101                    }
1102                    Ok((Column::Int64(values), bitmap))
1103                } else {
1104                    let values: Vec<i64> = raw_bytes
1105                        .chunks_exact(8)
1106                        .map(|chunk| i64::from_le_bytes(chunk.try_into().unwrap()))
1107                        .collect();
1108                    Ok((Column::Int64(values), bitmap))
1109                }
1110            }
1111            _ => Err(ColumnarError::InvalidFormat(
1112                "ByteStreamSplit requires Float32, Float64, or Int64".into(),
1113            )),
1114        }
1115    }
1116}
1117
1118// ============================================================================
1119// IncrementalString - for sorted strings
1120// ============================================================================
1121
1122/// Incremental string encoder.
1123///
1124/// Stores common prefix length + suffix for sorted strings.
1125/// Efficient for lexicographically sorted string columns.
1126pub struct IncrementalStringEncoder;
1127
1128impl Encoder for IncrementalStringEncoder {
1129    fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1130        let values = match data {
1131            Column::Binary(v) => v,
1132            _ => {
1133                return Err(ColumnarError::InvalidFormat(
1134                    "IncrementalString encoding requires Binary".into(),
1135                ))
1136            }
1137        };
1138
1139        let mut buf = Vec::new();
1140        buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1141
1142        // Write bitmap flag and data
1143        if let Some(bitmap) = null_bitmap {
1144            buf.push(1u8);
1145            buf.extend_from_slice(&bitmap.to_bytes());
1146        } else {
1147            buf.push(0u8);
1148        }
1149
1150        if values.is_empty() {
1151            return Ok(buf);
1152        }
1153
1154        // First value: full length + data
1155        buf.extend_from_slice(&(values[0].len() as u32).to_le_bytes());
1156        buf.extend_from_slice(&values[0]);
1157
1158        // Subsequent values: prefix length + suffix
1159        for window in values.windows(2) {
1160            let prev = &window[0];
1161            let curr = &window[1];
1162
1163            let common_prefix = prev
1164                .iter()
1165                .zip(curr.iter())
1166                .take_while(|(a, b)| a == b)
1167                .count();
1168
1169            let suffix = &curr[common_prefix..];
1170
1171            buf.extend_from_slice(&(common_prefix as u16).to_le_bytes());
1172            buf.extend_from_slice(&(suffix.len() as u16).to_le_bytes());
1173            buf.extend_from_slice(suffix);
1174        }
1175
1176        Ok(buf)
1177    }
1178
1179    fn encoding_type(&self) -> EncodingV2 {
1180        EncodingV2::IncrementalString
1181    }
1182}
1183
1184/// Incremental string decoder.
1185pub struct IncrementalStringDecoder;
1186
1187impl Decoder for IncrementalStringDecoder {
1188    fn decode(
1189        &self,
1190        data: &[u8],
1191        _num_values: usize,
1192        logical_type: LogicalType,
1193    ) -> Result<(Column, Option<Bitmap>)> {
1194        if logical_type != LogicalType::Binary {
1195            return Err(ColumnarError::InvalidFormat(
1196                "IncrementalString decoding requires Binary".into(),
1197            ));
1198        }
1199
1200        if data.len() < 5 {
1201            return Err(ColumnarError::InvalidFormat(
1202                "IncrementalString header too short".into(),
1203            ));
1204        }
1205
1206        let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1207        let has_bitmap = data[4] != 0;
1208        let mut pos = 5;
1209
1210        let bitmap = if has_bitmap {
1211            let bm = Bitmap::from_bytes(&data[pos..])?;
1212            pos += 4 + bm.len().div_ceil(8);
1213            Some(bm)
1214        } else {
1215            None
1216        };
1217
1218        if count == 0 {
1219            return Ok((Column::Binary(vec![]), bitmap));
1220        }
1221
1222        let mut values = Vec::with_capacity(count);
1223
1224        // First value
1225        if pos + 4 > data.len() {
1226            return Err(ColumnarError::InvalidFormat(
1227                "IncrementalString first len truncated".into(),
1228            ));
1229        }
1230        let first_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1231        pos += 4;
1232
1233        if pos + first_len > data.len() {
1234            return Err(ColumnarError::InvalidFormat(
1235                "IncrementalString first value truncated".into(),
1236            ));
1237        }
1238        let mut current = data[pos..pos + first_len].to_vec();
1239        pos += first_len;
1240        values.push(current.clone());
1241
1242        // Subsequent values
1243        for _ in 1..count {
1244            if pos + 4 > data.len() {
1245                return Err(ColumnarError::InvalidFormat(
1246                    "IncrementalString header truncated".into(),
1247                ));
1248            }
1249            let prefix_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1250            pos += 2;
1251            let suffix_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1252            pos += 2;
1253
1254            if pos + suffix_len > data.len() {
1255                return Err(ColumnarError::InvalidFormat(
1256                    "IncrementalString suffix truncated".into(),
1257                ));
1258            }
1259
1260            current.truncate(prefix_len);
1261            current.extend_from_slice(&data[pos..pos + suffix_len]);
1262            pos += suffix_len;
1263
1264            values.push(current.clone());
1265        }
1266
1267        Ok((Column::Binary(values), bitmap))
1268    }
1269}
1270
1271// ============================================================================
1272// RLE (Run-Length Encoding) - for Bool/Int64
1273// ============================================================================
1274
1275/// Run-Length encoder for Bool and Int64.
1276///
1277/// Format: count(u32) + has_bitmap(u8) + bitmap? + run_count(u32) + {value, run_len}*
1278/// - For Bool: value is 1 byte (0 or 1)
1279/// - For Int64: value is 8 bytes (i64 LE)
1280/// - run_len is u32 LE
1281pub struct RleEncoder;
1282
1283impl Encoder for RleEncoder {
1284    fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1285        match data {
1286            Column::Bool(values) => encode_rle_bool(values, null_bitmap),
1287            Column::Int64(values) => encode_rle_int64(values, null_bitmap),
1288            _ => Err(ColumnarError::InvalidFormat(
1289                "RLE encoding requires Bool or Int64".into(),
1290            )),
1291        }
1292    }
1293
1294    fn encoding_type(&self) -> EncodingV2 {
1295        EncodingV2::Rle
1296    }
1297}
1298
1299fn encode_rle_bool(values: &[bool], null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1300    let mut buf = Vec::new();
1301    buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1302
1303    // Write bitmap flag and data
1304    if let Some(bitmap) = null_bitmap {
1305        buf.push(1u8);
1306        buf.extend_from_slice(&bitmap.to_bytes());
1307    } else {
1308        buf.push(0u8);
1309    }
1310
1311    if values.is_empty() {
1312        buf.extend_from_slice(&0u32.to_le_bytes()); // run_count = 0
1313        return Ok(buf);
1314    }
1315
1316    // Build runs
1317    let mut runs: Vec<(bool, u32)> = Vec::new();
1318    let mut current_value = values[0];
1319    let mut current_run_len = 1u32;
1320
1321    for &v in values.iter().skip(1) {
1322        if v == current_value {
1323            current_run_len += 1;
1324        } else {
1325            runs.push((current_value, current_run_len));
1326            current_value = v;
1327            current_run_len = 1;
1328        }
1329    }
1330    runs.push((current_value, current_run_len));
1331
1332    // Write run count
1333    buf.extend_from_slice(&(runs.len() as u32).to_le_bytes());
1334
1335    // Write runs: {value(1 byte), run_len(4 bytes)}*
1336    for (value, run_len) in runs {
1337        buf.push(value as u8);
1338        buf.extend_from_slice(&run_len.to_le_bytes());
1339    }
1340
1341    Ok(buf)
1342}
1343
1344fn encode_rle_int64(values: &[i64], null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1345    let mut buf = Vec::new();
1346    buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1347
1348    // Write bitmap flag and data
1349    if let Some(bitmap) = null_bitmap {
1350        buf.push(1u8);
1351        buf.extend_from_slice(&bitmap.to_bytes());
1352    } else {
1353        buf.push(0u8);
1354    }
1355
1356    if values.is_empty() {
1357        buf.extend_from_slice(&0u32.to_le_bytes()); // run_count = 0
1358        return Ok(buf);
1359    }
1360
1361    // Build runs
1362    let mut runs: Vec<(i64, u32)> = Vec::new();
1363    let mut current_value = values[0];
1364    let mut current_run_len = 1u32;
1365
1366    for &v in values.iter().skip(1) {
1367        if v == current_value {
1368            current_run_len += 1;
1369        } else {
1370            runs.push((current_value, current_run_len));
1371            current_value = v;
1372            current_run_len = 1;
1373        }
1374    }
1375    runs.push((current_value, current_run_len));
1376
1377    // Write run count
1378    buf.extend_from_slice(&(runs.len() as u32).to_le_bytes());
1379
1380    // Write runs: {value(8 bytes), run_len(4 bytes)}*
1381    for (value, run_len) in runs {
1382        buf.extend_from_slice(&value.to_le_bytes());
1383        buf.extend_from_slice(&run_len.to_le_bytes());
1384    }
1385
1386    Ok(buf)
1387}
1388
1389/// Run-Length decoder for Bool and Int64.
1390pub struct RleDecoder;
1391
1392impl Decoder for RleDecoder {
1393    fn decode(
1394        &self,
1395        data: &[u8],
1396        _num_values: usize,
1397        logical_type: LogicalType,
1398    ) -> Result<(Column, Option<Bitmap>)> {
1399        match logical_type {
1400            LogicalType::Bool => decode_rle_bool(data),
1401            LogicalType::Int64 => decode_rle_int64(data),
1402            _ => Err(ColumnarError::InvalidFormat(
1403                "RLE decoding requires Bool or Int64".into(),
1404            )),
1405        }
1406    }
1407}
1408
1409fn decode_rle_bool(data: &[u8]) -> Result<(Column, Option<Bitmap>)> {
1410    if data.len() < 5 {
1411        return Err(ColumnarError::InvalidFormat("RLE header too short".into()));
1412    }
1413
1414    let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1415    let has_bitmap = data[4] != 0;
1416    let mut pos = 5;
1417
1418    let bitmap = if has_bitmap {
1419        let bm = Bitmap::from_bytes(&data[pos..])?;
1420        pos += 4 + bm.len().div_ceil(8);
1421        Some(bm)
1422    } else {
1423        None
1424    };
1425
1426    if pos + 4 > data.len() {
1427        return Err(ColumnarError::InvalidFormat(
1428            "RLE run_count truncated".into(),
1429        ));
1430    }
1431
1432    let run_count = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1433    pos += 4;
1434
1435    if count == 0 {
1436        return Ok((Column::Bool(vec![]), bitmap));
1437    }
1438
1439    let mut values = Vec::with_capacity(count);
1440
1441    for _ in 0..run_count {
1442        if pos + 5 > data.len() {
1443            return Err(ColumnarError::InvalidFormat(
1444                "RLE bool run truncated".into(),
1445            ));
1446        }
1447        let value = data[pos] != 0;
1448        pos += 1;
1449        let run_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1450        pos += 4;
1451
1452        for _ in 0..run_len {
1453            values.push(value);
1454        }
1455    }
1456
1457    if values.len() != count {
1458        return Err(ColumnarError::InvalidFormat("RLE count mismatch".into()));
1459    }
1460
1461    Ok((Column::Bool(values), bitmap))
1462}
1463
1464fn decode_rle_int64(data: &[u8]) -> Result<(Column, Option<Bitmap>)> {
1465    if data.len() < 5 {
1466        return Err(ColumnarError::InvalidFormat("RLE header too short".into()));
1467    }
1468
1469    let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1470    let has_bitmap = data[4] != 0;
1471    let mut pos = 5;
1472
1473    let bitmap = if has_bitmap {
1474        let bm = Bitmap::from_bytes(&data[pos..])?;
1475        pos += 4 + bm.len().div_ceil(8);
1476        Some(bm)
1477    } else {
1478        None
1479    };
1480
1481    if pos + 4 > data.len() {
1482        return Err(ColumnarError::InvalidFormat(
1483            "RLE run_count truncated".into(),
1484        ));
1485    }
1486
1487    let run_count = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1488    pos += 4;
1489
1490    if count == 0 {
1491        return Ok((Column::Int64(vec![]), bitmap));
1492    }
1493
1494    let mut values = Vec::with_capacity(count);
1495
1496    for _ in 0..run_count {
1497        if pos + 12 > data.len() {
1498            return Err(ColumnarError::InvalidFormat(
1499                "RLE int64 run truncated".into(),
1500            ));
1501        }
1502        let value = i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
1503        pos += 8;
1504        let run_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1505        pos += 4;
1506
1507        for _ in 0..run_len {
1508            values.push(value);
1509        }
1510    }
1511
1512    if values.len() != count {
1513        return Err(ColumnarError::InvalidFormat("RLE count mismatch".into()));
1514    }
1515
1516    Ok((Column::Int64(values), bitmap))
1517}
1518
1519// ============================================================================
1520// Dictionary Encoding - for Binary/Fixed
1521// ============================================================================
1522
1523/// Dictionary encoder for Binary and Fixed columns.
1524///
1525/// Format: count(u32) + has_bitmap(u8) + bitmap? + dict_count(u32) + dict_entries + indices[u32]*
1526/// - dict_entries for Binary: {len(u32) + bytes}*
1527/// - dict_entries for Fixed: bytes* (fixed length known from type)
1528/// - indices: u32 LE index into dictionary
1529pub struct DictionaryEncoder;
1530
1531impl Encoder for DictionaryEncoder {
1532    fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1533        match data {
1534            Column::Binary(values) => encode_dict_binary(values, null_bitmap),
1535            Column::Fixed { len, values } => encode_dict_fixed(*len, values, null_bitmap),
1536            _ => Err(ColumnarError::InvalidFormat(
1537                "Dictionary encoding requires Binary or Fixed".into(),
1538            )),
1539        }
1540    }
1541
1542    fn encoding_type(&self) -> EncodingV2 {
1543        EncodingV2::Dictionary
1544    }
1545}
1546
1547fn encode_dict_binary(values: &[Vec<u8>], null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1548    use std::collections::HashMap;
1549
1550    let mut buf = Vec::new();
1551    buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1552
1553    // Write bitmap flag and data
1554    if let Some(bitmap) = null_bitmap {
1555        buf.push(1u8);
1556        buf.extend_from_slice(&bitmap.to_bytes());
1557    } else {
1558        buf.push(0u8);
1559    }
1560
1561    if values.is_empty() {
1562        buf.extend_from_slice(&0u32.to_le_bytes()); // dict_count = 0
1563        return Ok(buf);
1564    }
1565
1566    // Build dictionary
1567    let mut dict: Vec<&Vec<u8>> = Vec::new();
1568    let mut dict_map: HashMap<&Vec<u8>, u32> = HashMap::new();
1569    let mut indices: Vec<u32> = Vec::with_capacity(values.len());
1570
1571    for v in values {
1572        if let Some(&idx) = dict_map.get(v) {
1573            indices.push(idx);
1574        } else {
1575            let idx = dict.len() as u32;
1576            dict.push(v);
1577            dict_map.insert(v, idx);
1578            indices.push(idx);
1579        }
1580    }
1581
1582    // Write dict_count
1583    buf.extend_from_slice(&(dict.len() as u32).to_le_bytes());
1584
1585    // Write dictionary entries: {len(u32) + bytes}*
1586    for entry in &dict {
1587        buf.extend_from_slice(&(entry.len() as u32).to_le_bytes());
1588        buf.extend_from_slice(entry);
1589    }
1590
1591    // Write indices
1592    for idx in indices {
1593        buf.extend_from_slice(&idx.to_le_bytes());
1594    }
1595
1596    Ok(buf)
1597}
1598
1599fn encode_dict_fixed(
1600    fixed_len: usize,
1601    values: &[Vec<u8>],
1602    null_bitmap: Option<&Bitmap>,
1603) -> Result<Vec<u8>> {
1604    use std::collections::HashMap;
1605
1606    let mut buf = Vec::new();
1607    buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1608
1609    // Write bitmap flag and data
1610    if let Some(bitmap) = null_bitmap {
1611        buf.push(1u8);
1612        buf.extend_from_slice(&bitmap.to_bytes());
1613    } else {
1614        buf.push(0u8);
1615    }
1616
1617    // Write fixed length
1618    buf.extend_from_slice(&(fixed_len as u16).to_le_bytes());
1619
1620    if values.is_empty() {
1621        buf.extend_from_slice(&0u32.to_le_bytes()); // dict_count = 0
1622        return Ok(buf);
1623    }
1624
1625    // Build dictionary
1626    let mut dict: Vec<&Vec<u8>> = Vec::new();
1627    let mut dict_map: HashMap<&Vec<u8>, u32> = HashMap::new();
1628    let mut indices: Vec<u32> = Vec::with_capacity(values.len());
1629
1630    for v in values {
1631        if let Some(&idx) = dict_map.get(v) {
1632            indices.push(idx);
1633        } else {
1634            let idx = dict.len() as u32;
1635            dict.push(v);
1636            dict_map.insert(v, idx);
1637            indices.push(idx);
1638        }
1639    }
1640
1641    // Write dict_count
1642    buf.extend_from_slice(&(dict.len() as u32).to_le_bytes());
1643
1644    // Write dictionary entries: fixed-length bytes*
1645    for entry in &dict {
1646        buf.extend_from_slice(entry);
1647    }
1648
1649    // Write indices
1650    for idx in indices {
1651        buf.extend_from_slice(&idx.to_le_bytes());
1652    }
1653
1654    Ok(buf)
1655}
1656
1657/// Dictionary decoder for Binary and Fixed columns.
1658pub struct DictionaryDecoder;
1659
1660impl Decoder for DictionaryDecoder {
1661    fn decode(
1662        &self,
1663        data: &[u8],
1664        _num_values: usize,
1665        logical_type: LogicalType,
1666    ) -> Result<(Column, Option<Bitmap>)> {
1667        match logical_type {
1668            LogicalType::Binary => decode_dict_binary(data),
1669            LogicalType::Fixed(fixed_len) => decode_dict_fixed(data, fixed_len as usize),
1670            _ => Err(ColumnarError::InvalidFormat(
1671                "Dictionary decoding requires Binary or Fixed".into(),
1672            )),
1673        }
1674    }
1675}
1676
1677fn decode_dict_binary(data: &[u8]) -> Result<(Column, Option<Bitmap>)> {
1678    if data.len() < 5 {
1679        return Err(ColumnarError::InvalidFormat(
1680            "Dictionary header too short".into(),
1681        ));
1682    }
1683
1684    let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1685    let has_bitmap = data[4] != 0;
1686    let mut pos = 5;
1687
1688    let bitmap = if has_bitmap {
1689        let bm = Bitmap::from_bytes(&data[pos..])?;
1690        pos += 4 + bm.len().div_ceil(8);
1691        Some(bm)
1692    } else {
1693        None
1694    };
1695
1696    if pos + 4 > data.len() {
1697        return Err(ColumnarError::InvalidFormat(
1698            "Dictionary dict_count truncated".into(),
1699        ));
1700    }
1701
1702    let dict_count = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1703    pos += 4;
1704
1705    if count == 0 {
1706        return Ok((Column::Binary(vec![]), bitmap));
1707    }
1708
1709    // Read dictionary entries
1710    let mut dict: Vec<Vec<u8>> = Vec::with_capacity(dict_count);
1711    for _ in 0..dict_count {
1712        if pos + 4 > data.len() {
1713            return Err(ColumnarError::InvalidFormat(
1714                "Dictionary entry len truncated".into(),
1715            ));
1716        }
1717        let entry_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1718        pos += 4;
1719        if pos + entry_len > data.len() {
1720            return Err(ColumnarError::InvalidFormat(
1721                "Dictionary entry data truncated".into(),
1722            ));
1723        }
1724        dict.push(data[pos..pos + entry_len].to_vec());
1725        pos += entry_len;
1726    }
1727
1728    // Read indices and reconstruct values
1729    let mut values = Vec::with_capacity(count);
1730    for _ in 0..count {
1731        if pos + 4 > data.len() {
1732            return Err(ColumnarError::InvalidFormat(
1733                "Dictionary index truncated".into(),
1734            ));
1735        }
1736        let idx = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1737        pos += 4;
1738        if idx >= dict.len() {
1739            return Err(ColumnarError::InvalidFormat(
1740                "Dictionary index out of range".into(),
1741            ));
1742        }
1743        values.push(dict[idx].clone());
1744    }
1745
1746    Ok((Column::Binary(values), bitmap))
1747}
1748
1749fn decode_dict_fixed(data: &[u8], expected_len: usize) -> Result<(Column, Option<Bitmap>)> {
1750    if data.len() < 5 {
1751        return Err(ColumnarError::InvalidFormat(
1752            "Dictionary header too short".into(),
1753        ));
1754    }
1755
1756    let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1757    let has_bitmap = data[4] != 0;
1758    let mut pos = 5;
1759
1760    let bitmap = if has_bitmap {
1761        let bm = Bitmap::from_bytes(&data[pos..])?;
1762        pos += 4 + bm.len().div_ceil(8);
1763        Some(bm)
1764    } else {
1765        None
1766    };
1767
1768    if pos + 2 > data.len() {
1769        return Err(ColumnarError::InvalidFormat(
1770            "Dictionary fixed_len truncated".into(),
1771        ));
1772    }
1773    let fixed_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
1774    pos += 2;
1775
1776    if fixed_len != expected_len {
1777        return Err(ColumnarError::InvalidFormat(
1778            "Dictionary fixed length mismatch".into(),
1779        ));
1780    }
1781
1782    if pos + 4 > data.len() {
1783        return Err(ColumnarError::InvalidFormat(
1784            "Dictionary dict_count truncated".into(),
1785        ));
1786    }
1787
1788    let dict_count = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1789    pos += 4;
1790
1791    if count == 0 {
1792        return Ok((
1793            Column::Fixed {
1794                len: fixed_len,
1795                values: vec![],
1796            },
1797            bitmap,
1798        ));
1799    }
1800
1801    // Read dictionary entries (fixed length)
1802    let mut dict: Vec<Vec<u8>> = Vec::with_capacity(dict_count);
1803    for _ in 0..dict_count {
1804        if pos + fixed_len > data.len() {
1805            return Err(ColumnarError::InvalidFormat(
1806                "Dictionary fixed entry truncated".into(),
1807            ));
1808        }
1809        dict.push(data[pos..pos + fixed_len].to_vec());
1810        pos += fixed_len;
1811    }
1812
1813    // Read indices and reconstruct values
1814    let mut values = Vec::with_capacity(count);
1815    for _ in 0..count {
1816        if pos + 4 > data.len() {
1817            return Err(ColumnarError::InvalidFormat(
1818                "Dictionary index truncated".into(),
1819            ));
1820        }
1821        let idx = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
1822        pos += 4;
1823        if idx >= dict.len() {
1824            return Err(ColumnarError::InvalidFormat(
1825                "Dictionary index out of range".into(),
1826            ));
1827        }
1828        values.push(dict[idx].clone());
1829    }
1830
1831    Ok((
1832        Column::Fixed {
1833            len: fixed_len,
1834            values,
1835        },
1836        bitmap,
1837    ))
1838}
1839
1840// ============================================================================
1841// Bitpack Encoding - for Bool
1842// ============================================================================
1843
1844/// Bitpack encoder for Bool columns.
1845///
1846/// Format: count(u32) + has_bitmap(u8) + bitmap? + packed_bytes
1847/// - Each bit represents one bool value (1 = true, 0 = false)
1848/// - Bits are packed LSB first within each byte
1849pub struct BitpackEncoder;
1850
1851impl Encoder for BitpackEncoder {
1852    fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
1853        let values = match data {
1854            Column::Bool(v) => v,
1855            _ => {
1856                return Err(ColumnarError::InvalidFormat(
1857                    "Bitpack encoding requires Bool".into(),
1858                ))
1859            }
1860        };
1861
1862        let mut buf = Vec::new();
1863        buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1864
1865        // Write bitmap flag and data
1866        if let Some(bitmap) = null_bitmap {
1867            buf.push(1u8);
1868            buf.extend_from_slice(&bitmap.to_bytes());
1869        } else {
1870            buf.push(0u8);
1871        }
1872
1873        if values.is_empty() {
1874            return Ok(buf);
1875        }
1876
1877        // Pack bool values into bytes
1878        let num_bytes = values.len().div_ceil(8);
1879        let mut packed = vec![0u8; num_bytes];
1880
1881        for (i, &v) in values.iter().enumerate() {
1882            if v {
1883                packed[i / 8] |= 1 << (i % 8);
1884            }
1885        }
1886
1887        buf.extend_from_slice(&packed);
1888
1889        Ok(buf)
1890    }
1891
1892    fn encoding_type(&self) -> EncodingV2 {
1893        EncodingV2::Bitpack
1894    }
1895}
1896
1897/// Bitpack decoder for Bool columns.
1898pub struct BitpackDecoder;
1899
1900impl Decoder for BitpackDecoder {
1901    fn decode(
1902        &self,
1903        data: &[u8],
1904        _num_values: usize,
1905        logical_type: LogicalType,
1906    ) -> Result<(Column, Option<Bitmap>)> {
1907        if logical_type != LogicalType::Bool {
1908            return Err(ColumnarError::InvalidFormat(
1909                "Bitpack decoding requires Bool".into(),
1910            ));
1911        }
1912
1913        if data.len() < 5 {
1914            return Err(ColumnarError::InvalidFormat(
1915                "Bitpack header too short".into(),
1916            ));
1917        }
1918
1919        let count = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize;
1920        let has_bitmap = data[4] != 0;
1921        let mut pos = 5;
1922
1923        let bitmap = if has_bitmap {
1924            let bm = Bitmap::from_bytes(&data[pos..])?;
1925            pos += 4 + bm.len().div_ceil(8);
1926            Some(bm)
1927        } else {
1928            None
1929        };
1930
1931        if count == 0 {
1932            return Ok((Column::Bool(vec![]), bitmap));
1933        }
1934
1935        let num_bytes = count.div_ceil(8);
1936        if pos + num_bytes > data.len() {
1937            return Err(ColumnarError::InvalidFormat(
1938                "Bitpack data truncated".into(),
1939            ));
1940        }
1941
1942        let packed = &data[pos..pos + num_bytes];
1943
1944        // Unpack bool values
1945        let mut values = Vec::with_capacity(count);
1946        for i in 0..count {
1947            let value = (packed[i / 8] & (1 << (i % 8))) != 0;
1948            values.push(value);
1949        }
1950
1951        Ok((Column::Bool(values), bitmap))
1952    }
1953}
1954
1955// ============================================================================
1956// Encoding Selection
1957// ============================================================================
1958
/// Statistics used for encoding selection.
///
/// All fields default to "unknown" (`false`, `0`, `None`), in which case the
/// selectors in this module fall back to their default encodings.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct EncodingHints {
    /// Is data sorted?
    pub is_sorted: bool,
    /// Number of distinct values.
    pub distinct_count: usize,
    /// Total number of values.
    pub total_count: usize,
    /// For integers: value range (max - min).
    pub value_range: Option<u64>,
    /// Fraction (0.0 to 1.0, not a percentage) of values within the main range (for PFOR).
    pub in_range_ratio: Option<f64>,
}
1973
1974/// Select optimal encoding based on data type and hints.
1975///
1976/// Returns the best encoding for the given data characteristics.
1977/// Falls back to Plain if no better encoding is applicable.
1978pub fn select_encoding(logical_type: LogicalType, hints: &EncodingHints) -> EncodingV2 {
1979    match logical_type {
1980        LogicalType::Int64 => select_int_encoding(hints),
1981        LogicalType::Float32 => EncodingV2::ByteStreamSplit,
1982        LogicalType::Float64 => EncodingV2::ByteStreamSplit,
1983        LogicalType::Bool => select_bool_encoding(hints),
1984        LogicalType::Binary => select_binary_encoding(hints),
1985        LogicalType::Fixed(_) => select_binary_encoding(hints),
1986    }
1987}
1988
1989fn select_bool_encoding(hints: &EncodingHints) -> EncodingV2 {
1990    // Low cardinality with runs: use RLE
1991    if hints.total_count > 0 && hints.distinct_count <= 2 {
1992        // If there are likely many consecutive runs, RLE is efficient
1993        // Heuristic: if average run length > 4, use RLE
1994        let avg_run_len = hints.total_count / hints.distinct_count.max(1);
1995        if avg_run_len >= 4 {
1996            return EncodingV2::Rle;
1997        }
1998    }
1999
2000    // Default: Bitpack is always efficient for bool
2001    EncodingV2::Bitpack
2002}
2003
2004fn select_int_encoding(hints: &EncodingHints) -> EncodingV2 {
2005    // Sorted data: use Delta encoding
2006    if hints.is_sorted {
2007        return EncodingV2::Delta;
2008    }
2009
2010    // Small range: use FOR
2011    if let Some(range) = hints.value_range {
2012        let bits_needed = if range == 0 {
2013            1
2014        } else {
2015            64 - range.leading_zeros()
2016        };
2017        if bits_needed <= 16 {
2018            // Check if PFOR would be better (has outliers)
2019            if let Some(ratio) = hints.in_range_ratio {
2020                if (0.9..1.0).contains(&ratio) {
2021                    return EncodingV2::PFOR;
2022                }
2023            }
2024            return EncodingV2::FOR;
2025        }
2026    }
2027
2028    // Low cardinality with runs: use RLE
2029    if hints.total_count > 0 && hints.distinct_count > 0 {
2030        let avg_run_len = hints.total_count / hints.distinct_count;
2031        if avg_run_len >= 4 {
2032            return EncodingV2::Rle;
2033        }
2034    }
2035
2036    EncodingV2::Plain
2037}
2038
2039fn select_binary_encoding(hints: &EncodingHints) -> EncodingV2 {
2040    // ソート済みデータ: 先頭との差分(prefix)を活用できるため IncrementalString を優先する。
2041    if hints.is_sorted && hints.total_count > 0 {
2042        return EncodingV2::IncrementalString;
2043    }
2044
2045    // Low cardinality: use Dictionary
2046    if hints.total_count > 0 && hints.distinct_count > 0 {
2047        let cardinality_ratio = hints.distinct_count as f64 / hints.total_count as f64;
2048        if cardinality_ratio < 0.5 {
2049            return EncodingV2::Dictionary;
2050        }
2051    }
2052
2053    EncodingV2::Plain
2054}
2055
2056/// Create an encoder for the given encoding type.
2057pub fn create_encoder(encoding: EncodingV2) -> Box<dyn Encoder> {
2058    match encoding {
2059        EncodingV2::Plain => Box::new(PlainEncoder),
2060        EncodingV2::Delta => Box::new(DeltaEncoder),
2061        EncodingV2::FOR => Box::new(ForEncoder),
2062        EncodingV2::PFOR => Box::new(PforEncoder::default()),
2063        EncodingV2::ByteStreamSplit => Box::new(ByteStreamSplitEncoder),
2064        EncodingV2::IncrementalString => Box::new(IncrementalStringEncoder),
2065        EncodingV2::Rle => Box::new(RleEncoder),
2066        EncodingV2::Dictionary => Box::new(DictionaryEncoder),
2067        EncodingV2::Bitpack => Box::new(BitpackEncoder),
2068        // DeltaLength not yet implemented
2069        EncodingV2::DeltaLength => Box::new(PlainEncoder),
2070    }
2071}
2072
2073/// Create a decoder for the given encoding type.
2074pub fn create_decoder(encoding: EncodingV2) -> Box<dyn Decoder> {
2075    match encoding {
2076        EncodingV2::Plain => Box::new(PlainDecoder),
2077        EncodingV2::Delta => Box::new(DeltaDecoder),
2078        EncodingV2::FOR => Box::new(ForDecoder),
2079        EncodingV2::PFOR => Box::new(PforDecoder),
2080        EncodingV2::ByteStreamSplit => Box::new(ByteStreamSplitDecoder),
2081        EncodingV2::IncrementalString => Box::new(IncrementalStringDecoder),
2082        EncodingV2::Rle => Box::new(RleDecoder),
2083        EncodingV2::Dictionary => Box::new(DictionaryDecoder),
2084        EncodingV2::Bitpack => Box::new(BitpackDecoder),
2085        // DeltaLength not yet implemented
2086        EncodingV2::DeltaLength => Box::new(PlainDecoder),
2087    }
2088}
2089
2090// ============================================================================
2091// Plain Encoder/Decoder (fallback)
2092// ============================================================================
2093
2094/// Plain encoder (fallback for unsupported types).
2095pub struct PlainEncoder;
2096
2097impl Encoder for PlainEncoder {
2098    fn encode(&self, data: &Column, null_bitmap: Option<&Bitmap>) -> Result<Vec<u8>> {
2099        let mut buf = Vec::new();
2100
2101        // Write bitmap flag and data first
2102        if let Some(bitmap) = null_bitmap {
2103            buf.push(1u8);
2104            buf.extend_from_slice(&bitmap.to_bytes());
2105        } else {
2106            buf.push(0u8);
2107        }
2108
2109        match data {
2110            Column::Int64(values) => {
2111                buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
2112                for v in values {
2113                    buf.extend_from_slice(&v.to_le_bytes());
2114                }
2115            }
2116            Column::Float32(values) => {
2117                buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
2118                for v in values {
2119                    buf.extend_from_slice(&v.to_le_bytes());
2120                }
2121            }
2122            Column::Float64(values) => {
2123                buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
2124                for v in values {
2125                    buf.extend_from_slice(&v.to_le_bytes());
2126                }
2127            }
2128            Column::Bool(values) => {
2129                buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
2130                for v in values {
2131                    buf.push(*v as u8);
2132                }
2133            }
2134            Column::Binary(values) => {
2135                buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
2136                for v in values {
2137                    buf.extend_from_slice(&(v.len() as u32).to_le_bytes());
2138                    buf.extend_from_slice(v);
2139                }
2140            }
2141            Column::Fixed { len, values } => {
2142                buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
2143                buf.extend_from_slice(&(*len as u16).to_le_bytes());
2144                for v in values {
2145                    buf.extend_from_slice(v);
2146                }
2147            }
2148        }
2149
2150        Ok(buf)
2151    }
2152
2153    fn encoding_type(&self) -> EncodingV2 {
2154        EncodingV2::Plain
2155    }
2156}
2157
2158/// Plain decoder (fallback).
2159pub struct PlainDecoder;
2160
2161impl Decoder for PlainDecoder {
2162    fn decode(
2163        &self,
2164        data: &[u8],
2165        _num_values: usize,
2166        logical_type: LogicalType,
2167    ) -> Result<(Column, Option<Bitmap>)> {
2168        if data.is_empty() {
2169            return Err(ColumnarError::InvalidFormat("plain data empty".into()));
2170        }
2171
2172        let has_bitmap = data[0] != 0;
2173        let mut pos = 1;
2174
2175        let bitmap = if has_bitmap {
2176            let bm = Bitmap::from_bytes(&data[pos..])?;
2177            pos += 4 + bm.len().div_ceil(8);
2178            Some(bm)
2179        } else {
2180            None
2181        };
2182
2183        if pos + 4 > data.len() {
2184            return Err(ColumnarError::InvalidFormat("plain count truncated".into()));
2185        }
2186
2187        let count = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
2188        pos += 4;
2189
2190        let column = match logical_type {
2191            LogicalType::Int64 => {
2192                let mut values = Vec::with_capacity(count);
2193                for _ in 0..count {
2194                    if pos + 8 > data.len() {
2195                        return Err(ColumnarError::InvalidFormat("plain int64 truncated".into()));
2196                    }
2197                    values.push(i64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()));
2198                    pos += 8;
2199                }
2200                Column::Int64(values)
2201            }
2202            LogicalType::Float32 => {
2203                let mut values = Vec::with_capacity(count);
2204                for _ in 0..count {
2205                    if pos + 4 > data.len() {
2206                        return Err(ColumnarError::InvalidFormat(
2207                            "plain float32 truncated".into(),
2208                        ));
2209                    }
2210                    values.push(f32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()));
2211                    pos += 4;
2212                }
2213                Column::Float32(values)
2214            }
2215            LogicalType::Float64 => {
2216                let mut values = Vec::with_capacity(count);
2217                for _ in 0..count {
2218                    if pos + 8 > data.len() {
2219                        return Err(ColumnarError::InvalidFormat(
2220                            "plain float64 truncated".into(),
2221                        ));
2222                    }
2223                    values.push(f64::from_le_bytes(data[pos..pos + 8].try_into().unwrap()));
2224                    pos += 8;
2225                }
2226                Column::Float64(values)
2227            }
2228            LogicalType::Bool => {
2229                let mut values = Vec::with_capacity(count);
2230                for _ in 0..count {
2231                    if pos >= data.len() {
2232                        return Err(ColumnarError::InvalidFormat("plain bool truncated".into()));
2233                    }
2234                    values.push(data[pos] != 0);
2235                    pos += 1;
2236                }
2237                Column::Bool(values)
2238            }
2239            LogicalType::Binary => {
2240                let mut values = Vec::with_capacity(count);
2241                for _ in 0..count {
2242                    if pos + 4 > data.len() {
2243                        return Err(ColumnarError::InvalidFormat(
2244                            "plain binary len truncated".into(),
2245                        ));
2246                    }
2247                    let len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
2248                    pos += 4;
2249                    if pos + len > data.len() {
2250                        return Err(ColumnarError::InvalidFormat(
2251                            "plain binary data truncated".into(),
2252                        ));
2253                    }
2254                    values.push(data[pos..pos + len].to_vec());
2255                    pos += len;
2256                }
2257                Column::Binary(values)
2258            }
2259            LogicalType::Fixed(fixed_len) => {
2260                if pos + 2 > data.len() {
2261                    return Err(ColumnarError::InvalidFormat(
2262                        "plain fixed len truncated".into(),
2263                    ));
2264                }
2265                let stored_len =
2266                    u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
2267                pos += 2;
2268                if stored_len != fixed_len as usize {
2269                    return Err(ColumnarError::InvalidFormat(
2270                        "plain fixed length mismatch".into(),
2271                    ));
2272                }
2273                let mut values = Vec::with_capacity(count);
2274                for _ in 0..count {
2275                    if pos + stored_len > data.len() {
2276                        return Err(ColumnarError::InvalidFormat(
2277                            "plain fixed data truncated".into(),
2278                        ));
2279                    }
2280                    values.push(data[pos..pos + stored_len].to_vec());
2281                    pos += stored_len;
2282                }
2283                Column::Fixed {
2284                    len: stored_len,
2285                    values,
2286                }
2287            }
2288        };
2289
2290        Ok((column, bitmap))
2291    }
2292}
2293
2294// ============================================================================
2295// Helper functions
2296// ============================================================================
2297
/// Maps a signed integer onto an unsigned one so that values of small
/// magnitude stay small: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
#[inline]
fn zigzag_encode(n: i64) -> u64 {
    // All ones for negative inputs, all zeros otherwise (arithmetic shift).
    let sign_mask = (n >> 63) as u64;
    ((n as u64) << 1) ^ sign_mask
}
2303
/// Inverse of the zigzag mapping: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
#[inline]
fn zigzag_decode(n: u64) -> i64 {
    let half = (n >> 1) as i64;
    // The lowest bit carries the sign: odd inputs are the complement of `half`.
    if n & 1 == 0 {
        half
    } else {
        !half
    }
}
2309
/// Appends `n` to `buf` as a variable-length integer: seven payload bits per
/// byte, least-significant group first, with the high bit set on every byte
/// except the last.
fn encode_varint(mut n: u64, buf: &mut Vec<u8>) {
    loop {
        let low7 = (n & 0x7F) as u8;
        n >>= 7;
        if n == 0 {
            // Final group: continuation bit stays clear.
            buf.push(low7);
            return;
        }
        buf.push(low7 | 0x80);
    }
}
2318
2319/// Decode a variable-length integer, returning (value, bytes_read).
2320fn decode_varint(data: &[u8]) -> Result<(u64, usize)> {
2321    let mut result = 0u64;
2322    let mut shift = 0;
2323    for (i, &byte) in data.iter().enumerate() {
2324        result |= ((byte & 0x7F) as u64) << shift;
2325        if byte & 0x80 == 0 {
2326            return Ok((result, i + 1));
2327        }
2328        shift += 7;
2329        if shift >= 64 {
2330            return Err(ColumnarError::InvalidFormat("varint overflow".into()));
2331        }
2332    }
2333    Err(ColumnarError::InvalidFormat("varint truncated".into()))
2334}
2335
2336// ============================================================================
2337// Tests
2338// ============================================================================
2339
2340#[cfg(all(test, not(target_arch = "wasm32")))]
2341mod tests {
2342    use super::*;
2343
2344    #[test]
2345    fn test_delta_encoding_sorted_integers() {
2346        let values = vec![100i64, 105, 110, 115, 120, 125];
2347        let col = Column::Int64(values.clone());
2348
2349        let encoder = DeltaEncoder;
2350        let encoded = encoder.encode(&col, None).unwrap();
2351
2352        let decoder = DeltaDecoder;
2353        let (decoded, bitmap) = decoder
2354            .decode(&encoded, values.len(), LogicalType::Int64)
2355            .unwrap();
2356
2357        assert!(bitmap.is_none());
2358        if let Column::Int64(decoded_values) = decoded {
2359            assert_eq!(decoded_values, values);
2360        } else {
2361            panic!("Expected Int64 column");
2362        }
2363    }
2364
2365    #[test]
2366    fn test_for_encoding_small_range() {
2367        let values = vec![1000i64, 1005, 1002, 1008, 1001, 1007];
2368        let col = Column::Int64(values.clone());
2369
2370        let encoder = ForEncoder;
2371        let encoded = encoder.encode(&col, None).unwrap();
2372
2373        let decoder = ForDecoder;
2374        let (decoded, bitmap) = decoder
2375            .decode(&encoded, values.len(), LogicalType::Int64)
2376            .unwrap();
2377
2378        assert!(bitmap.is_none());
2379        if let Column::Int64(decoded_values) = decoded {
2380            assert_eq!(decoded_values, values);
2381        } else {
2382            panic!("Expected Int64 column");
2383        }
2384    }
2385
2386    #[test]
2387    fn test_pfor_encoding_with_outliers() {
2388        // Most values in small range, with some outliers
2389        let mut values = vec![10i64, 12, 11, 15, 13, 14, 10, 11];
2390        values.push(1000000); // outlier
2391        values.push(12);
2392        values.push(2000000); // outlier
2393        let col = Column::Int64(values.clone());
2394
2395        let encoder = PforEncoder::default();
2396        let encoded = encoder.encode(&col, None).unwrap();
2397
2398        let decoder = PforDecoder;
2399        let (decoded, bitmap) = decoder
2400            .decode(&encoded, values.len(), LogicalType::Int64)
2401            .unwrap();
2402
2403        assert!(bitmap.is_none());
2404        if let Column::Int64(decoded_values) = decoded {
2405            assert_eq!(decoded_values, values);
2406        } else {
2407            panic!("Expected Int64 column");
2408        }
2409    }
2410
2411    #[test]
2412    fn test_byte_stream_split_floats() {
2413        let values = vec![1.5f64, 2.7, std::f64::consts::PI, 4.0, 5.5];
2414        let col = Column::Float64(values.clone());
2415
2416        let encoder = ByteStreamSplitEncoder;
2417        let encoded = encoder.encode(&col, None).unwrap();
2418
2419        let decoder = ByteStreamSplitDecoder;
2420        let (decoded, bitmap) = decoder
2421            .decode(&encoded, values.len(), LogicalType::Float64)
2422            .unwrap();
2423
2424        assert!(bitmap.is_none());
2425        if let Column::Float64(decoded_values) = decoded {
2426            assert_eq!(decoded_values, values);
2427        } else {
2428            panic!("Expected Float64 column");
2429        }
2430    }
2431
2432    #[test]
2433    fn test_byte_stream_split_f32_roundtrip() {
2434        let values = vec![1.0f32, -0.5, 3.25, 0.0, std::f32::consts::PI];
2435        let col = Column::Float32(values.clone());
2436
2437        let encoder = ByteStreamSplitEncoder;
2438        let encoded = encoder.encode(&col, None).unwrap();
2439
2440        let decoder = ByteStreamSplitDecoder;
2441        let (decoded, bitmap) = decoder
2442            .decode(&encoded, values.len(), LogicalType::Float32)
2443            .unwrap();
2444
2445        assert!(bitmap.is_none());
2446        if let Column::Float32(decoded_values) = decoded {
2447            assert_eq!(decoded_values, values);
2448        } else {
2449            panic!("Expected Float32 column");
2450        }
2451    }
2452
2453    #[cfg(feature = "compression-zstd")]
2454    #[test]
2455    fn test_byte_stream_split_f32_compression_ratio() {
2456        use rand::{rngs::StdRng, Rng, SeedableRng};
2457        use std::time::Instant;
2458
2459        let mut rng = StdRng::seed_from_u64(42);
2460        let mut values = Vec::with_capacity(100_000);
2461        while values.len() < 100_000 {
2462            // Box-Muller で正規分布(平均0, 分散1)を生成
2463            let u1: f32 = rng.gen::<f32>().max(f32::MIN_POSITIVE);
2464            let u2: f32 = rng.gen::<f32>();
2465            let r = (-2.0 * u1.ln()).sqrt();
2466            let theta = 2.0 * std::f32::consts::PI * u2;
2467            values.push(r * theta.cos());
2468            if values.len() < 100_000 {
2469                values.push(r * theta.sin());
2470            }
2471        }
2472
2473        let mut msb_counts = std::collections::HashMap::new();
2474        for v in &values {
2475            *msb_counts
2476                .entry((v.to_bits() >> 24) as u8)
2477                .or_insert(0usize) += 1;
2478        }
2479        let mut top = msb_counts.into_iter().collect::<Vec<_>>();
2480        top.sort_by(|a, b| b.1.cmp(&a.1));
2481        println!("Top MSB bytes: {:?}", &top[..5.min(top.len())]);
2482
2483        let col = Column::Float32(values.clone());
2484        let encoder = ByteStreamSplitEncoder;
2485        let t_enc_start = Instant::now();
2486        let encoded = encoder.encode(&col, None).unwrap();
2487        let enc_ms = t_enc_start.elapsed().as_secs_f64() * 1e3;
2488
2489        let _header = encoded[4];
2490        let payload_offset = 6;
2491        println!(
2492            "ByteStreamSplit payload length: {}",
2493            encoded.len().saturating_sub(payload_offset)
2494        );
2495
2496        let compressed = zstd::stream::encode_all(std::io::Cursor::new(&encoded), 3)
2497            .expect("zstd compression should succeed");
2498        let t_dec_start = Instant::now();
2499        let decoder = ByteStreamSplitDecoder;
2500        let (decoded, _) = decoder
2501            .decode(&encoded, values.len(), LogicalType::Float32)
2502            .unwrap();
2503        let dec_ms = t_dec_start.elapsed().as_secs_f64() * 1e3;
2504
2505        let raw_len = (values.len() * std::mem::size_of::<f32>()) as f32;
2506        let ratio = compressed.len() as f32 / raw_len;
2507
2508        // 完全性チェック(ロスなし)
2509        assert_eq!(decoded, Column::Float32(values.clone()));
2510
2511        assert!(
2512            ratio < 0.86,
2513            "expected >=14% reduction, got ratio {:.3}",
2514            ratio
2515        );
2516        // パフォーマンス目安(広めに設定)
2517        if std::env::var("ALOPEX_SKIP_PERF_TESTS").is_err() {
2518            assert!(
2519                enc_ms < 220.0,
2520                "encode too slow: {:.2}ms (target <220ms)",
2521                enc_ms
2522            );
2523            assert!(
2524                dec_ms < 30.0,
2525                "decode too slow: {:.2}ms (target <30ms)",
2526                dec_ms
2527            );
2528        }
2529        println!("encode_ms={:.2} decode_ms={:.2}", enc_ms, dec_ms);
2530    }
2531
2532    #[cfg(any(feature = "compression-lz4", feature = "compression-zstd"))]
2533    #[test]
2534    fn bench_byte_stream_split_layout_variants() {
2535        use rand::{rngs::StdRng, Rng, SeedableRng};
2536
2537        #[derive(Clone, Copy)]
2538        struct LayoutConfig {
2539            name: &'static str,
2540            block: Option<usize>,
2541            block_bitshuffle: Option<usize>,
2542            msb_first: bool,
2543            xor_delta: bool,
2544            delta_xor: bool,
2545            sign_split: bool,
2546            per_stream_lz4: bool,
2547            per_stream_zstd: bool,
2548            outer_zstd: bool,
2549            bitshuffle: bool,
2550            exponent_split: bool,
2551            exponent_rle: bool,
2552            exponent_delta: bool,
2553            fpc_predict: bool,
2554        }
2555
2556        struct VariantEncoded {
2557            sign_bytes: Vec<u8>,
2558            streams: Vec<Vec<u8>>,
2559            payload_concat: Vec<u8>, // payload laid out as we would store (sign bytes + stream payloads)
2560        }
2561
2562        let mut rng = StdRng::seed_from_u64(42);
2563        let mut values = Vec::with_capacity(100_000);
2564        while values.len() < 100_000 {
2565            let u1: f32 = rng.gen::<f32>().max(f32::MIN_POSITIVE);
2566            let u2: f32 = rng.gen::<f32>();
2567            let r = (-2.0 * u1.ln()).sqrt();
2568            let theta = 2.0 * std::f32::consts::PI * u2;
2569            values.push(r * theta.cos());
2570            if values.len() < 100_000 {
2571                values.push(r * theta.sin());
2572            }
2573        }
2574
2575        let raw_len = (values.len() * std::mem::size_of::<f32>()) as f32;
2576
2577        let configs = [
2578            LayoutConfig {
2579                name: "legacy_lsb",
2580                block: None,
2581                block_bitshuffle: None,
2582                msb_first: false,
2583                xor_delta: false,
2584                delta_xor: false,
2585                sign_split: false,
2586                per_stream_lz4: false,
2587                per_stream_zstd: false,
2588                outer_zstd: false,
2589                bitshuffle: false,
2590                exponent_split: false,
2591                exponent_rle: false,
2592                exponent_delta: false,
2593                fpc_predict: false,
2594            },
2595            LayoutConfig {
2596                name: "msb_block_256",
2597                block: Some(256),
2598                block_bitshuffle: None,
2599                msb_first: true,
2600                xor_delta: false,
2601                delta_xor: false,
2602                sign_split: false,
2603                per_stream_lz4: false,
2604                per_stream_zstd: false,
2605                outer_zstd: false,
2606                bitshuffle: false,
2607                exponent_split: false,
2608                exponent_rle: false,
2609                exponent_delta: false,
2610                fpc_predict: false,
2611            },
2612            LayoutConfig {
2613                name: "msb_block_1024",
2614                block: Some(1024),
2615                block_bitshuffle: None,
2616                msb_first: true,
2617                xor_delta: false,
2618                delta_xor: false,
2619                sign_split: false,
2620                per_stream_lz4: false,
2621                per_stream_zstd: false,
2622                outer_zstd: false,
2623                bitshuffle: false,
2624                exponent_split: false,
2625                exponent_rle: false,
2626                exponent_delta: false,
2627                fpc_predict: false,
2628            },
2629            LayoutConfig {
2630                name: "msb_block_256_xor",
2631                block: Some(256),
2632                block_bitshuffle: None,
2633                msb_first: true,
2634                xor_delta: true,
2635                delta_xor: false,
2636                sign_split: false,
2637                per_stream_lz4: false,
2638                per_stream_zstd: false,
2639                outer_zstd: false,
2640                bitshuffle: false,
2641                exponent_split: false,
2642                exponent_rle: false,
2643                exponent_delta: false,
2644                fpc_predict: false,
2645            },
2646            LayoutConfig {
2647                name: "sign_split_msb_256_outer",
2648                block: Some(256),
2649                block_bitshuffle: None,
2650                msb_first: true,
2651                xor_delta: false,
2652                delta_xor: false,
2653                sign_split: true,
2654                per_stream_lz4: false,
2655                per_stream_zstd: false,
2656                outer_zstd: false,
2657                bitshuffle: false,
2658                exponent_split: false,
2659                exponent_rle: false,
2660                exponent_delta: false,
2661                fpc_predict: false,
2662            },
2663            LayoutConfig {
2664                name: "sign_split_msb_256_per_stream_lz4",
2665                block: Some(256),
2666                block_bitshuffle: None,
2667                msb_first: true,
2668                xor_delta: false,
2669                delta_xor: false,
2670                sign_split: true,
2671                per_stream_lz4: true,
2672                per_stream_zstd: false,
2673                outer_zstd: false,
2674                bitshuffle: false,
2675                exponent_split: false,
2676                exponent_rle: false,
2677                exponent_delta: false,
2678                fpc_predict: false,
2679            },
2680            LayoutConfig {
2681                name: "sign_split_msb_128_per_stream_lz4",
2682                block: Some(128),
2683                block_bitshuffle: None,
2684                msb_first: true,
2685                xor_delta: false,
2686                delta_xor: false,
2687                sign_split: true,
2688                per_stream_lz4: true,
2689                per_stream_zstd: false,
2690                outer_zstd: false,
2691                bitshuffle: false,
2692                exponent_split: false,
2693                exponent_rle: false,
2694                exponent_delta: false,
2695                fpc_predict: false,
2696            },
2697            LayoutConfig {
2698                name: "sign_split_msb_64_per_stream_lz4",
2699                block: Some(64),
2700                block_bitshuffle: None,
2701                msb_first: true,
2702                xor_delta: false,
2703                delta_xor: false,
2704                sign_split: true,
2705                per_stream_lz4: true,
2706                per_stream_zstd: false,
2707                outer_zstd: false,
2708                bitshuffle: false,
2709                exponent_split: false,
2710                exponent_rle: false,
2711                exponent_delta: false,
2712                fpc_predict: false,
2713            },
2714            LayoutConfig {
2715                name: "sign_split_msb_256_per_stream_lz4_delta",
2716                block: Some(256),
2717                block_bitshuffle: None,
2718                msb_first: true,
2719                xor_delta: false,
2720                delta_xor: true,
2721                sign_split: true,
2722                per_stream_lz4: true,
2723                per_stream_zstd: false,
2724                outer_zstd: false,
2725                bitshuffle: false,
2726                exponent_split: false,
2727                exponent_rle: false,
2728                exponent_delta: false,
2729                fpc_predict: false,
2730            },
2731            LayoutConfig {
2732                name: "sign_split_bitshuffle",
2733                block: None,
2734                block_bitshuffle: None,
2735                msb_first: false,
2736                xor_delta: false,
2737                delta_xor: false,
2738                sign_split: true,
2739                per_stream_lz4: true,
2740                per_stream_zstd: false,
2741                outer_zstd: false,
2742                bitshuffle: true,
2743                exponent_split: false,
2744                exponent_rle: false,
2745                exponent_delta: false,
2746                fpc_predict: false,
2747            },
2748            LayoutConfig {
2749                name: "sign_split_bitshuffle_per_stream_zstd",
2750                block: None,
2751                block_bitshuffle: None,
2752                msb_first: false,
2753                xor_delta: false,
2754                delta_xor: false,
2755                sign_split: true,
2756                per_stream_lz4: false,
2757                per_stream_zstd: true,
2758                outer_zstd: false,
2759                bitshuffle: true,
2760                exponent_split: false,
2761                exponent_rle: false,
2762                exponent_delta: false,
2763                fpc_predict: false,
2764            },
2765            LayoutConfig {
2766                name: "sign_split_bitshuffle_outer_only",
2767                block: None,
2768                block_bitshuffle: None,
2769                msb_first: false,
2770                xor_delta: false,
2771                delta_xor: false,
2772                sign_split: true,
2773                per_stream_lz4: false,
2774                per_stream_zstd: false,
2775                outer_zstd: false,
2776                bitshuffle: true,
2777                exponent_split: false,
2778                exponent_rle: false,
2779                exponent_delta: false,
2780                fpc_predict: false,
2781            },
2782            LayoutConfig {
2783                name: "sign_split_exp_split_msb_256",
2784                block: Some(256),
2785                block_bitshuffle: None,
2786                msb_first: true,
2787                xor_delta: false,
2788                delta_xor: false,
2789                sign_split: true,
2790                per_stream_lz4: true,
2791                per_stream_zstd: false,
2792                outer_zstd: false,
2793                bitshuffle: false,
2794                exponent_split: true,
2795                exponent_rle: false,
2796                exponent_delta: false,
2797                fpc_predict: false,
2798            },
2799            LayoutConfig {
2800                name: "sign_split_exp_split_msb_256_delta",
2801                block: Some(256),
2802                block_bitshuffle: None,
2803                msb_first: true,
2804                xor_delta: false,
2805                delta_xor: true,
2806                sign_split: true,
2807                per_stream_lz4: true,
2808                per_stream_zstd: false,
2809                outer_zstd: false,
2810                bitshuffle: false,
2811                exponent_split: true,
2812                exponent_rle: false,
2813                exponent_delta: true,
2814                fpc_predict: false,
2815            },
2816            LayoutConfig {
2817                name: "sign_split_exp_bitshuffle",
2818                block: None,
2819                block_bitshuffle: None,
2820                msb_first: false,
2821                xor_delta: false,
2822                delta_xor: false,
2823                sign_split: true,
2824                per_stream_lz4: true,
2825                per_stream_zstd: false,
2826                outer_zstd: false,
2827                bitshuffle: true,
2828                exponent_split: true,
2829                exponent_rle: false,
2830                exponent_delta: false,
2831                fpc_predict: false,
2832            },
2833            LayoutConfig {
2834                name: "sign_split_exp_bitshuffle_per_stream_zstd",
2835                block: None,
2836                block_bitshuffle: None,
2837                msb_first: false,
2838                xor_delta: false,
2839                delta_xor: false,
2840                sign_split: true,
2841                per_stream_lz4: false,
2842                per_stream_zstd: true,
2843                outer_zstd: false,
2844                bitshuffle: true,
2845                exponent_split: true,
2846                exponent_rle: false,
2847                exponent_delta: false,
2848                fpc_predict: false,
2849            },
2850            LayoutConfig {
2851                name: "sign_split_exp_bitshuffle_outer_zstd",
2852                block: None,
2853                block_bitshuffle: None,
2854                msb_first: false,
2855                xor_delta: false,
2856                delta_xor: false,
2857                sign_split: true,
2858                per_stream_lz4: false,
2859                per_stream_zstd: false,
2860                outer_zstd: true,
2861                bitshuffle: true,
2862                exponent_split: true,
2863                exponent_rle: false,
2864                exponent_delta: false,
2865                fpc_predict: false,
2866            },
2867            LayoutConfig {
2868                name: "sign_split_exp_rle_bitshuffle",
2869                block: None,
2870                block_bitshuffle: None,
2871                msb_first: false,
2872                xor_delta: false,
2873                delta_xor: false,
2874                sign_split: true,
2875                per_stream_lz4: true,
2876                per_stream_zstd: false,
2877                outer_zstd: false,
2878                bitshuffle: true,
2879                exponent_split: true,
2880                exponent_rle: true,
2881                exponent_delta: false,
2882                fpc_predict: false,
2883            },
2884            LayoutConfig {
2885                name: "sign_split_delta_msb_256",
2886                block: Some(256),
2887                block_bitshuffle: None,
2888                msb_first: true,
2889                xor_delta: false,
2890                delta_xor: true,
2891                sign_split: true,
2892                per_stream_lz4: true,
2893                per_stream_zstd: false,
2894                outer_zstd: false,
2895                bitshuffle: false,
2896                exponent_split: false,
2897                exponent_rle: false,
2898                exponent_delta: false,
2899                fpc_predict: false,
2900            },
2901            LayoutConfig {
2902                name: "sign_split_delta_bitshuffle",
2903                block: None,
2904                block_bitshuffle: None,
2905                msb_first: false,
2906                xor_delta: false,
2907                delta_xor: true,
2908                sign_split: true,
2909                per_stream_lz4: true,
2910                per_stream_zstd: false,
2911                outer_zstd: false,
2912                bitshuffle: true,
2913                exponent_split: false,
2914                exponent_rle: false,
2915                exponent_delta: false,
2916                fpc_predict: false,
2917            },
2918            LayoutConfig {
2919                name: "sign_split_delta_bitshuffle_per_stream_zstd",
2920                block: None,
2921                block_bitshuffle: None,
2922                msb_first: false,
2923                xor_delta: false,
2924                delta_xor: true,
2925                sign_split: true,
2926                per_stream_lz4: false,
2927                per_stream_zstd: true,
2928                outer_zstd: false,
2929                bitshuffle: true,
2930                exponent_split: false,
2931                exponent_rle: false,
2932                exponent_delta: false,
2933                fpc_predict: false,
2934            },
2935            LayoutConfig {
2936                name: "sign_split_exp_delta_bitshuffle",
2937                block: None,
2938                block_bitshuffle: None,
2939                msb_first: false,
2940                xor_delta: false,
2941                delta_xor: true,
2942                sign_split: true,
2943                per_stream_lz4: true,
2944                per_stream_zstd: false,
2945                outer_zstd: false,
2946                bitshuffle: true,
2947                exponent_split: true,
2948                exponent_rle: false,
2949                exponent_delta: true,
2950                fpc_predict: false,
2951            },
2952            LayoutConfig {
2953                name: "sign_split_exp_delta_bitshuffle_per_stream_zstd",
2954                block: None,
2955                block_bitshuffle: None,
2956                msb_first: false,
2957                xor_delta: false,
2958                delta_xor: true,
2959                sign_split: true,
2960                per_stream_lz4: false,
2961                per_stream_zstd: true,
2962                outer_zstd: false,
2963                bitshuffle: true,
2964                exponent_split: true,
2965                exponent_rle: false,
2966                exponent_delta: true,
2967                fpc_predict: false,
2968            },
2969            LayoutConfig {
2970                name: "sign_split_bitshuffle_block256",
2971                block: None,
2972                block_bitshuffle: Some(256),
2973                msb_first: false,
2974                xor_delta: false,
2975                delta_xor: false,
2976                sign_split: true,
2977                per_stream_lz4: true,
2978                per_stream_zstd: false,
2979                outer_zstd: false,
2980                bitshuffle: true,
2981                exponent_split: false,
2982                exponent_rle: false,
2983                exponent_delta: false,
2984                fpc_predict: false,
2985            },
2986            LayoutConfig {
2987                name: "sign_split_bitshuffle_block256_per_stream_zstd",
2988                block: None,
2989                block_bitshuffle: Some(256),
2990                msb_first: false,
2991                xor_delta: false,
2992                delta_xor: false,
2993                sign_split: true,
2994                per_stream_lz4: false,
2995                per_stream_zstd: true,
2996                outer_zstd: false,
2997                bitshuffle: true,
2998                exponent_split: false,
2999                exponent_rle: false,
3000                exponent_delta: false,
3001                fpc_predict: false,
3002            },
3003            LayoutConfig {
3004                name: "sign_split_bitshuffle_block1024",
3005                block: None,
3006                block_bitshuffle: Some(1024),
3007                msb_first: false,
3008                xor_delta: false,
3009                delta_xor: false,
3010                sign_split: true,
3011                per_stream_lz4: true,
3012                per_stream_zstd: false,
3013                outer_zstd: false,
3014                bitshuffle: true,
3015                exponent_split: false,
3016                exponent_rle: false,
3017                exponent_delta: false,
3018                fpc_predict: false,
3019            },
3020            LayoutConfig {
3021                name: "sign_split_exp_bitshuffle_block256",
3022                block: None,
3023                block_bitshuffle: Some(256),
3024                msb_first: false,
3025                xor_delta: false,
3026                delta_xor: false,
3027                sign_split: true,
3028                per_stream_lz4: true,
3029                per_stream_zstd: false,
3030                outer_zstd: false,
3031                bitshuffle: true,
3032                exponent_split: true,
3033                exponent_rle: false,
3034                exponent_delta: false,
3035                fpc_predict: false,
3036            },
3037            LayoutConfig {
3038                name: "sign_split_exp_bitshuffle_block256_per_stream_zstd",
3039                block: None,
3040                block_bitshuffle: Some(256),
3041                msb_first: false,
3042                xor_delta: false,
3043                delta_xor: false,
3044                sign_split: true,
3045                per_stream_lz4: false,
3046                per_stream_zstd: true,
3047                outer_zstd: false,
3048                bitshuffle: true,
3049                exponent_split: true,
3050                exponent_rle: false,
3051                exponent_delta: false,
3052                fpc_predict: false,
3053            },
3054            LayoutConfig {
3055                name: "sign_split_msb_256_per_stream_zstd",
3056                block: Some(256),
3057                block_bitshuffle: None,
3058                msb_first: true,
3059                xor_delta: false,
3060                delta_xor: false,
3061                sign_split: true,
3062                per_stream_lz4: false,
3063                per_stream_zstd: true,
3064                outer_zstd: false,
3065                bitshuffle: false,
3066                exponent_split: false,
3067                exponent_rle: false,
3068                exponent_delta: false,
3069                fpc_predict: false,
3070            },
3071            LayoutConfig {
3072                name: "sign_split_msb_256_outer_zstd",
3073                block: Some(256),
3074                block_bitshuffle: None,
3075                msb_first: true,
3076                xor_delta: false,
3077                delta_xor: false,
3078                sign_split: true,
3079                per_stream_lz4: false,
3080                per_stream_zstd: false,
3081                outer_zstd: true,
3082                bitshuffle: false,
3083                exponent_split: false,
3084                exponent_rle: false,
3085                exponent_delta: false,
3086                fpc_predict: false,
3087            },
3088            LayoutConfig {
3089                name: "sign_split_fpc_predict_per_stream_zstd",
3090                block: None,
3091                block_bitshuffle: None,
3092                msb_first: false,
3093                xor_delta: false,
3094                delta_xor: false,
3095                sign_split: true,
3096                per_stream_lz4: false,
3097                per_stream_zstd: true,
3098                outer_zstd: false,
3099                bitshuffle: false,
3100                exponent_split: false,
3101                exponent_rle: false,
3102                exponent_delta: false,
3103                fpc_predict: true,
3104            },
3105        ];
3106        #[derive(Default)]
3107        struct Timings {
3108            encode_ms: f64,
3109            decode_ms: f64,
3110        }
3111
3112        #[cfg(feature = "compression-lz4")]
3113        #[allow(dead_code)]
3114        fn decompress_lz4(payload: &[u8], orig_len: usize) -> Vec<u8> {
3115            let orig_len_i32: i32 = orig_len.try_into().unwrap();
3116            lz4::block::decompress(payload, Some(orig_len_i32)).unwrap()
3117        }
3118
3119        #[cfg(feature = "compression-zstd")]
3120        #[allow(dead_code)]
3121        fn decompress_zstd(payload: &[u8]) -> Vec<u8> {
3122            zstd::stream::decode_all(std::io::Cursor::new(payload)).unwrap()
3123        }
3124
3125        fn bitshuffle_u32(values: &[u32]) -> Vec<u8> {
3126            let count = values.len();
3127            let bytes_per_plane = count.div_ceil(8);
3128            let mut out = vec![0u8; bytes_per_plane * 32];
3129            for bit in 0..32 {
3130                let base = bit * bytes_per_plane;
3131                for (idx, &v) in values.iter().enumerate() {
3132                    if (v >> bit) & 1 != 0 {
3133                        out[base + idx / 8] |= 1 << (idx % 8);
3134                    }
3135                }
3136            }
3137            out
3138        }
3139
3140        fn bitshuffle_block_u32(values: &[u32], block: usize) -> Vec<u8> {
3141            let mut out = Vec::with_capacity(values.len().div_ceil(8) * 32);
3142            let mut offset = 0;
3143            while offset < values.len() {
3144                let len = (values.len() - offset).min(block);
3145                let bytes_per_plane = len.div_ceil(8);
3146                out.resize(out.len() + bytes_per_plane * 32, 0);
3147                let base_out = out.len() - bytes_per_plane * 32;
3148                for bit in 0..32 {
3149                    let plane_base = base_out + bit * bytes_per_plane;
3150                    for idx in 0..len {
3151                        if (values[offset + idx] >> bit) & 1 != 0 {
3152                            out[plane_base + idx / 8] |= 1 << (idx % 8);
3153                        }
3154                    }
3155                }
3156                offset += len;
3157            }
3158            out
3159        }
3160
3161        fn bitunshuffle_block_u32(data: &[u8], count: usize, block: usize) -> Vec<u32> {
3162            let mut values = vec![0u32; count];
3163            let mut offset = 0;
3164            let mut data_offset = 0;
3165            while offset < count {
3166                let len = (count - offset).min(block);
3167                let bytes_per_plane = len.div_ceil(8);
3168                for bit in 0..32 {
3169                    let plane_base = data_offset + bit * bytes_per_plane;
3170                    for idx in 0..len {
3171                        if (data[plane_base + idx / 8] >> (idx % 8)) & 1 != 0 {
3172                            values[offset + idx] |= 1u32 << bit;
3173                        }
3174                    }
3175                }
3176                data_offset += bytes_per_plane * 32;
3177                offset += len;
3178            }
3179            values
3180        }
3181
3182        fn rle_encode_u8(values: &[u8]) -> Vec<u8> {
3183            let mut out = Vec::new();
3184            if values.is_empty() {
3185                return out;
3186            }
3187            let mut cur = values[0];
3188            let mut run: u16 = 1;
3189            for &v in &values[1..] {
3190                if v == cur && run < u16::MAX {
3191                    run += 1;
3192                } else {
3193                    out.push(cur);
3194                    out.extend_from_slice(&run.to_le_bytes());
3195                    cur = v;
3196                    run = 1;
3197                }
3198            }
3199            out.push(cur);
3200            out.extend_from_slice(&run.to_le_bytes());
3201            out
3202        }
3203
3204        fn rle_decode_u8(bytes: &[u8], expected_len: usize) -> Vec<u8> {
3205            let mut out = Vec::with_capacity(expected_len);
3206            let mut i = 0;
3207            while i + 3 <= bytes.len() {
3208                let val = bytes[i];
3209                let run = u16::from_le_bytes([bytes[i + 1], bytes[i + 2]]) as usize;
3210                i += 3;
3211                for _ in 0..run {
3212                    out.push(val);
3213                }
3214                if out.len() >= expected_len {
3215                    break;
3216                }
3217            }
3218            out
3219        }
3220
3221        fn delta_xor_u32(values: &mut [u32]) {
3222            let mut prev = 0u32;
3223            for v in values.iter_mut() {
3224                let cur = *v;
3225                *v ^= prev;
3226                prev = cur;
3227            }
3228        }
3229
3230        fn inv_delta_xor_u32(values: &mut [u32]) {
3231            let mut prev = 0u32;
3232            for v in values.iter_mut() {
3233                let cur = *v ^ prev;
3234                prev = cur;
3235                *v = cur;
3236            }
3237        }
3238
3239        fn delta_xor_u8(values: &mut [u8]) {
3240            let mut prev = 0u8;
3241            for v in values.iter_mut() {
3242                let cur = *v;
3243                *v ^= prev;
3244                prev = cur;
3245            }
3246        }
3247
3248        fn inv_delta_xor_u8(values: &mut [u8]) {
3249            let mut prev = 0u8;
3250            for v in values.iter_mut() {
3251                let cur = *v ^ prev;
3252                prev = cur;
3253                *v = cur;
3254            }
3255        }
3256
3257        fn encode_variant(
3258            values: &[f32],
3259            cfg: LayoutConfig,
3260            timings: &mut Timings,
3261        ) -> VariantEncoded {
3262            let start = std::time::Instant::now();
3263            let mut sign_bytes = if cfg.sign_split {
3264                vec![0u8; values.len().div_ceil(8)]
3265            } else {
3266                Vec::new()
3267            };
3268
3269            let bytes_per_value = 4;
3270            let num_values = values.len();
3271            let mut streams: Vec<Vec<u8>> = Vec::new();
3272
3273            if cfg.fpc_predict {
3274                // FPC-like predictor: pred = 2*prev - prev2, store xor(diff) truncated by leading zero bytes
3275                let mut len_stream = Vec::with_capacity(num_values);
3276                let mut payload = Vec::with_capacity(num_values * 2); // heuristic
3277                let mut prev1: u32 = 0;
3278                let mut prev2: u32 = 0;
3279                for (idx, v) in values.iter().enumerate() {
3280                    let mut bits = v.to_bits();
3281                    if cfg.sign_split {
3282                        if bits & 0x8000_0000 != 0 {
3283                            sign_bytes[idx / 8] |= 1 << (idx % 8);
3284                        }
3285                        bits &= 0x7fff_ffff;
3286                    }
3287                    let pred = prev1.wrapping_add(prev1.wrapping_sub(prev2));
3288                    let diff = bits ^ pred;
3289                    let lz_bytes = (diff.leading_zeros() / 8) as u8;
3290                    let lz_clamped = lz_bytes.min(3); // 0..3 -> sig_len 1..4, diff==0 handled by lz=4 equivalent
3291                    let sig_len = if diff == 0 {
3292                        0
3293                    } else {
3294                        4 - lz_clamped as usize
3295                    };
3296                    len_stream.push(sig_len as u8);
3297                    if sig_len > 0 {
3298                        let be = diff.to_be_bytes();
3299                        payload.extend_from_slice(&be[4 - sig_len..]);
3300                    }
3301                    prev2 = prev1;
3302                    prev1 = bits;
3303                }
3304                streams.push(len_stream);
3305                streams.push(payload);
3306            } else if cfg.exponent_split {
3307                // Split exponent (1 byte) + mantissa (24 bits) with optional bitshuffle
3308                let mut exp_stream = Vec::with_capacity(num_values);
3309                let mut mant = Vec::with_capacity(num_values);
3310                for (idx, v) in values.iter().enumerate() {
3311                    let mut bits = v.to_bits();
3312                    if cfg.sign_split {
3313                        if bits & 0x8000_0000 != 0 {
3314                            sign_bytes[idx / 8] |= 1 << (idx % 8);
3315                        }
3316                        bits &= 0x7fff_ffff;
3317                    }
3318                    let exp = ((bits >> 23) & 0xFF) as u8;
3319                    let mantissa = bits & 0x7F_FFFF;
3320                    exp_stream.push(exp);
3321                    mant.push(mantissa);
3322                }
3323                if cfg.exponent_delta {
3324                    delta_xor_u8(&mut exp_stream);
3325                }
3326                if cfg.delta_xor {
3327                    delta_xor_u32(&mut mant);
3328                }
3329                if cfg.bitshuffle {
3330                    let mant_stream = if let Some(block) = cfg.block_bitshuffle {
3331                        bitshuffle_block_u32(&mant, block)
3332                    } else {
3333                        bitshuffle_u32(&mant)
3334                    };
3335                    if cfg.exponent_rle {
3336                        streams.push(rle_encode_u8(&exp_stream));
3337                    } else {
3338                        streams.push(exp_stream);
3339                    }
3340                    streams.push(mant_stream);
3341                } else {
3342                    if cfg.exponent_rle {
3343                        streams.push(rle_encode_u8(&exp_stream));
3344                    } else {
3345                        streams.push(exp_stream);
3346                    }
3347                    let mut mant_bytes = Vec::with_capacity(num_values * 3);
3348                    for m in mant {
3349                        let bytes = m.to_le_bytes();
3350                        mant_bytes.extend_from_slice(&bytes[0..3]);
3351                    }
3352                    streams.push(mant_bytes);
3353                }
3354            } else if cfg.bitshuffle {
3355                // Bitshuffle over mantissa+exponent (sign stripped if configured)
3356                let mut bits_vec = Vec::with_capacity(num_values);
3357                for (idx, v) in values.iter().enumerate() {
3358                    let mut bits = v.to_bits();
3359                    if cfg.sign_split {
3360                        if bits & 0x8000_0000 != 0 {
3361                            sign_bytes[idx / 8] |= 1 << (idx % 8);
3362                        }
3363                        bits &= 0x7fff_ffff;
3364                    }
3365                    bits_vec.push(bits);
3366                }
3367                if cfg.delta_xor {
3368                    delta_xor_u32(&mut bits_vec);
3369                }
3370                let out = if let Some(block) = cfg.block_bitshuffle {
3371                    bitshuffle_block_u32(&bits_vec, block)
3372                } else {
3373                    bitshuffle_u32(&bits_vec)
3374                };
3375                streams.push(out);
3376            } else {
3377                let mut vals_u32 = Vec::with_capacity(num_values);
3378                for (idx, v) in values.iter().enumerate() {
3379                    let mut bits = v.to_bits();
3380                    if cfg.sign_split {
3381                        if bits & 0x8000_0000 != 0 {
3382                            sign_bytes[idx / 8] |= 1 << (idx % 8);
3383                        }
3384                        bits &= 0x7fff_ffff;
3385                    }
3386                    vals_u32.push(bits);
3387                }
3388                if cfg.delta_xor {
3389                    delta_xor_u32(&mut vals_u32);
3390                }
3391                let mut raw_bytes = Vec::with_capacity(num_values * bytes_per_value);
3392                for bits in vals_u32 {
3393                    raw_bytes.extend_from_slice(&bits.to_le_bytes());
3394                }
3395                streams = (0..bytes_per_value)
3396                    .map(|_| Vec::with_capacity(num_values))
3397                    .collect();
3398                let order: Vec<usize> = if cfg.exponent_split && bytes_per_value == 4 {
3399                    vec![3, 2, 1, 0] // explicit MSB->LSB, exponent first
3400                } else if cfg.msb_first {
3401                    (0..bytes_per_value).rev().collect()
3402                } else {
3403                    (0..bytes_per_value).collect()
3404                };
3405
3406                let mut offset = 0;
3407                while offset < num_values {
3408                    let block_len = cfg.block.unwrap_or(num_values).min(num_values - offset);
3409                    for (stream_idx, byte_idx) in order.iter().enumerate() {
3410                        let stream = &mut streams[stream_idx];
3411                        let start = offset * bytes_per_value + byte_idx;
3412                        for value_idx in 0..block_len {
3413                            stream.push(raw_bytes[start + value_idx * bytes_per_value]);
3414                        }
3415                    }
3416                    offset += block_len;
3417                }
3418
3419                if cfg.xor_delta {
3420                    for stream in streams.iter_mut() {
3421                        let mut prev = 0u8;
3422                        for b in stream.iter_mut() {
3423                            let cur = *b;
3424                            *b ^= prev;
3425                            prev = cur;
3426                        }
3427                    }
3428                }
3429            }
3430
3431            let mut payload_concat =
3432                Vec::with_capacity(sign_bytes.len() + num_values * bytes_per_value);
3433            payload_concat.extend_from_slice(&sign_bytes);
3434
3435            for stream in &streams {
3436                if cfg.per_stream_zstd {
3437                    #[cfg(feature = "compression-zstd")]
3438                    {
3439                        let compressed = zstd::stream::encode_all(std::io::Cursor::new(stream), 9)
3440                            .unwrap_or_else(|_| stream.clone());
3441                        if compressed.len() < stream.len() {
3442                            payload_concat.extend_from_slice(&compressed);
3443                            continue;
3444                        }
3445                    }
3446                }
3447                if cfg.per_stream_lz4 {
3448                    #[cfg(feature = "compression-lz4")]
3449                    {
3450                        if let Ok(compressed) = lz4::block::compress(
3451                            stream,
3452                            Some(lz4::block::CompressionMode::HIGHCOMPRESSION(12)),
3453                            false,
3454                        ) {
3455                            if compressed.len() < stream.len() {
3456                                payload_concat.extend_from_slice(&compressed);
3457                                continue;
3458                            }
3459                        }
3460                    }
3461                }
3462                payload_concat.extend_from_slice(stream);
3463            }
3464
3465            timings.encode_ms += start.elapsed().as_secs_f64() * 1e3;
3466            VariantEncoded {
3467                sign_bytes,
3468                streams,
3469                payload_concat,
3470            }
3471        }
3472
3473        fn decode_variant(
3474            encoded: &VariantEncoded,
3475            cfg: LayoutConfig,
3476            value_count: usize,
3477            timings: &mut Timings,
3478        ) -> Vec<f32> {
3479            let start = std::time::Instant::now();
3480            let bytes_per_value = 4;
3481
3482            if cfg.fpc_predict {
3483                let len_stream = &encoded.streams[0];
3484                let payload = &encoded.streams[1];
3485                let mut values = Vec::with_capacity(value_count);
3486                let mut prev1: u32 = 0;
3487                let mut prev2: u32 = 0;
3488                let mut payload_pos = 0;
3489                for (idx, &len_byte) in len_stream.iter().take(value_count).enumerate() {
3490                    let sig_len = len_byte as usize;
3491                    let mut diff: u32 = 0;
3492                    if sig_len > 0 {
3493                        let mut buf = [0u8; 4];
3494                        for b in 0..sig_len {
3495                            buf[4 - sig_len + b] = payload[payload_pos + b];
3496                        }
3497                        payload_pos += sig_len;
3498                        diff = u32::from_be_bytes(buf);
3499                    }
3500                    let pred = prev1.wrapping_add(prev1.wrapping_sub(prev2));
3501                    let mut bits = diff ^ pred;
3502                    prev2 = prev1;
3503                    prev1 = bits;
3504                    if cfg.sign_split && (encoded.sign_bytes[idx / 8] >> (idx % 8)) & 1 != 0 {
3505                        bits |= 0x8000_0000;
3506                    }
3507                    values.push(f32::from_bits(bits));
3508                }
3509                timings.decode_ms += start.elapsed().as_secs_f64() * 1e3;
3510                return values;
3511            } else if cfg.exponent_split {
3512                fn bitunshuffle_u32(data: &[u8], count: usize) -> Vec<u32> {
3513                    let bytes_per_plane = count.div_ceil(8);
3514                    let expected = bytes_per_plane * 32;
3515                    assert_eq!(data.len(), expected);
3516                    let mut values = vec![0u32; count];
3517                    for bit in 0..32 {
3518                        let base = bit * bytes_per_plane;
3519                        for idx in 0..count {
3520                            if (data[base + idx / 8] >> (idx % 8)) & 1 != 0 {
3521                                values[idx] |= 1u32 << bit;
3522                            }
3523                        }
3524                    }
3525                    values
3526                }
3527
3528                let exp_stream_raw = &encoded.streams[0];
3529                let exp_stream = if cfg.exponent_rle {
3530                    rle_decode_u8(exp_stream_raw, value_count)
3531                } else {
3532                    exp_stream_raw.clone()
3533                };
3534                let mut exp_stream = exp_stream;
3535                if cfg.exponent_delta {
3536                    inv_delta_xor_u8(&mut exp_stream);
3537                }
3538                if cfg.bitshuffle {
3539                    let mant_stream = &encoded.streams[1];
3540                    assert_eq!(exp_stream.len(), value_count);
3541                    let mut mant_values = if let Some(block) = cfg.block_bitshuffle {
3542                        bitunshuffle_block_u32(mant_stream, value_count, block)
3543                    } else {
3544                        bitunshuffle_u32(mant_stream, value_count)
3545                    };
3546                    if cfg.delta_xor {
3547                        inv_delta_xor_u32(&mut mant_values);
3548                    }
3549
3550                    let mut values = Vec::with_capacity(value_count);
3551                    for idx in 0..value_count {
3552                        let mut bits = mant_values[idx] | ((exp_stream[idx] as u32) << 23);
3553                        if cfg.sign_split && (encoded.sign_bytes[idx / 8] >> (idx % 8)) & 1 != 0 {
3554                            bits |= 0x8000_0000;
3555                        }
3556                        values.push(f32::from_bits(bits));
3557                    }
3558                    timings.decode_ms += start.elapsed().as_secs_f64() * 1e3;
3559                    return values;
3560                } else {
3561                    let mant_stream = &encoded.streams[1];
3562                    assert_eq!(mant_stream.len(), value_count * 3);
3563                    let mut mant_values = Vec::with_capacity(value_count);
3564                    for idx in 0..value_count {
3565                        let start = idx * 3;
3566                        let mut buf = [0u8; 4];
3567                        buf[0..3].copy_from_slice(&mant_stream[start..start + 3]);
3568                        mant_values.push(u32::from_le_bytes(buf));
3569                    }
3570                    if cfg.delta_xor {
3571                        inv_delta_xor_u32(&mut mant_values);
3572                    }
3573
3574                    let mut values = Vec::with_capacity(value_count);
3575                    for idx in 0..value_count {
3576                        let mut bits = mant_values[idx] | ((exp_stream[idx] as u32) << 23);
3577                        if cfg.sign_split && (encoded.sign_bytes[idx / 8] >> (idx % 8)) & 1 != 0 {
3578                            bits |= 0x8000_0000;
3579                        }
3580                        values.push(f32::from_bits(bits));
3581                    }
3582                    timings.decode_ms += start.elapsed().as_secs_f64() * 1e3;
3583                    return values;
3584                }
3585            } else if cfg.bitshuffle {
3586                let data = &encoded.streams[0];
3587                let mut bits_vec = if let Some(block) = cfg.block_bitshuffle {
3588                    bitunshuffle_block_u32(data, value_count, block)
3589                } else {
3590                    let planes = value_count.div_ceil(8);
3591                    assert_eq!(data.len(), planes * 32);
3592                    let mut vals = vec![0u32; value_count];
3593                    for bit in 0..32 {
3594                        let base = bit * planes;
3595                        for idx in 0..value_count {
3596                            if (data[base + idx / 8] >> (idx % 8)) & 1 != 0 {
3597                                vals[idx] |= 1u32 << bit;
3598                            }
3599                        }
3600                    }
3601                    vals
3602                };
3603                if cfg.delta_xor {
3604                    inv_delta_xor_u32(&mut bits_vec);
3605                }
3606
3607                let mut values = Vec::with_capacity(value_count);
3608                for (idx, &bits_value) in bits_vec.iter().take(value_count).enumerate() {
3609                    let mut bits = bits_value;
3610                    if cfg.sign_split && (encoded.sign_bytes[idx / 8] >> (idx % 8)) & 1 != 0 {
3611                        bits |= 0x8000_0000;
3612                    }
3613                    values.push(f32::from_bits(bits));
3614                }
3615                timings.decode_ms += start.elapsed().as_secs_f64() * 1e3;
3616                return values;
3617            }
3618
3619            let mut streams = encoded.streams.clone();
3620
3621            if cfg.xor_delta {
3622                for stream in streams.iter_mut() {
3623                    let mut prev = 0u8;
3624                    for b in stream.iter_mut() {
3625                        let cur = *b ^ prev;
3626                        prev = cur;
3627                        *b = cur;
3628                    }
3629                }
3630            }
3631
3632            let order: Vec<usize> = if cfg.exponent_split && bytes_per_value == 4 {
3633                vec![3, 2, 1, 0]
3634            } else if cfg.msb_first {
3635                (0..bytes_per_value).rev().collect()
3636            } else {
3637                (0..bytes_per_value).collect()
3638            };
3639
3640            let mut raw_bytes = vec![0u8; value_count * bytes_per_value];
3641            let mut offset = 0;
3642            while offset < value_count {
3643                let block_len = cfg.block.unwrap_or(value_count).min(value_count - offset);
3644                for (stream_idx, byte_idx) in order.iter().enumerate() {
3645                    let stream = &streams[stream_idx];
3646                    let start = offset;
3647                    for value_idx in 0..block_len {
3648                        raw_bytes[(offset + value_idx) * bytes_per_value + byte_idx] =
3649                            stream[start + value_idx];
3650                    }
3651                }
3652                offset += block_len;
3653            }
3654
3655            let mut values_u32 = Vec::with_capacity(value_count);
3656            for idx in 0..value_count {
3657                let bits = u32::from_le_bytes(
3658                    raw_bytes[idx * bytes_per_value..(idx + 1) * bytes_per_value]
3659                        .try_into()
3660                        .unwrap(),
3661                );
3662                values_u32.push(bits);
3663            }
3664            if cfg.delta_xor {
3665                inv_delta_xor_u32(&mut values_u32);
3666            }
3667            let mut values = Vec::with_capacity(value_count);
3668            for (idx, mut bits) in values_u32.into_iter().enumerate() {
3669                if cfg.sign_split && (encoded.sign_bytes[idx / 8] >> (idx % 8)) & 1 != 0 {
3670                    bits |= 0x8000_0000;
3671                }
3672                values.push(f32::from_bits(bits));
3673            }
3674            timings.decode_ms += start.elapsed().as_secs_f64() * 1e3;
3675            values
3676        }
3677
3678        for cfg in configs {
3679            let mut timings = Timings::default();
3680            let encoded = encode_variant(&values, cfg, &mut timings);
3681            let decoded = decode_variant(&encoded, cfg, values.len(), &mut timings);
3682            assert_eq!(decoded, values, "roundtrip failed for {}", cfg.name);
3683
3684            let encoded_len = encoded.payload_concat.len() as f32;
3685            #[cfg(feature = "compression-lz4")]
3686            let compressed_len_outer = lz4::block::compress(&encoded.payload_concat, None, false)
3687                .expect("lz4 compress")
3688                .len() as f32;
3689            #[cfg(not(feature = "compression-lz4"))]
3690            let compressed_len_outer = 0f32;
3691            #[cfg(feature = "compression-zstd")]
3692            let compressed_len_outer_zstd =
3693                zstd::stream::encode_all(std::io::Cursor::new(&encoded.payload_concat), 3)
3694                    .expect("zstd compress")
3695                    .len() as f32;
3696            #[cfg(not(feature = "compression-zstd"))]
3697            let compressed_len_outer_zstd = 0f32;
3698            #[cfg(feature = "compression-lz4")]
3699            let compressed_len_per_stream: usize = if cfg.per_stream_lz4 {
3700                encoded.sign_bytes.len()
3701                    + encoded
3702                        .streams
3703                        .iter()
3704                        .map(|s| {
3705                            lz4::block::compress(
3706                                s,
3707                                Some(lz4::block::CompressionMode::HIGHCOMPRESSION(12)),
3708                                false,
3709                            )
3710                            .map(|c| c.len())
3711                            .unwrap_or_else(|_| s.len())
3712                        })
3713                        .sum::<usize>()
3714            } else {
3715                0
3716            };
3717            #[cfg(not(feature = "compression-lz4"))]
3718            let compressed_len_per_stream: usize = 0;
3719            #[cfg(feature = "compression-zstd")]
3720            let compressed_len_per_stream_zstd: usize = if cfg.per_stream_zstd {
3721                encoded.sign_bytes.len()
3722                    + encoded
3723                        .streams
3724                        .iter()
3725                        .map(|s| {
3726                            zstd::stream::encode_all(std::io::Cursor::new(s), 9)
3727                                .map(|c| c.len())
3728                                .unwrap_or_else(|_| s.len())
3729                        })
3730                        .sum::<usize>()
3731            } else {
3732                0
3733            };
3734            #[cfg(not(feature = "compression-zstd"))]
3735            let compressed_len_per_stream_zstd: usize = 0;
3736
3737            println!(
3738                "variant={} encoded_ratio={:.3} lz4_outer_ratio={:.3}{}{}{} encode_ms={:.3} decode_ms={:.3}",
3739                cfg.name,
3740                encoded_len / raw_len,
3741                compressed_len_outer / raw_len,
3742                if cfg.per_stream_lz4 {
3743                    format!(
3744                        " lz4_per_stream_ratio={:.3}",
3745                        compressed_len_per_stream as f32 / raw_len
3746                    )
3747                } else {
3748                    "".to_string()
3749                },
3750                if cfg.outer_zstd {
3751                    format!(
3752                        " zstd_outer_ratio={:.3}",
3753                        compressed_len_outer_zstd / raw_len
3754                    )
3755                } else {
3756                    "".to_string()
3757                },
3758                if cfg.per_stream_zstd {
3759                    format!(
3760                        " zstd_per_stream_ratio={:.3}",
3761                        compressed_len_per_stream_zstd as f32 / raw_len
3762                    )
3763                } else {
3764                    "".to_string()
3765                },
3766                timings.encode_ms,
3767                timings.decode_ms,
3768            );
3769        }
3770
3771        #[cfg(all(feature = "compression-lz4", feature = "compression-zstd"))]
3772        #[allow(dead_code)]
3773        fn reencode_with_flag(encoded: &[u8], flag: u8, value_count: usize) -> Vec<u8> {
3774            // Parse header
3775            let count = u32::from_le_bytes(encoded[0..4].try_into().unwrap()) as usize;
3776            assert_eq!(count, value_count);
3777            let header = encoded[4];
3778            assert!((header & BYTE_STREAM_SPLIT_V1_FLAG) != 0);
3779            let has_bitmap = encoded[5] != 0;
3780            let mut pos = 6;
3781            if has_bitmap {
3782                let bm = Bitmap::from_bytes(&encoded[pos..]).unwrap();
3783                pos += 4 + bm.len().div_ceil(8);
3784            }
3785            let stream_count_byte = encoded[pos];
3786            pos += 1;
3787            let has_sign = (stream_count_byte & BYTE_STREAM_SPLIT_SIGN_FLAG) != 0;
3788            let stream_count = (stream_count_byte & BYTE_STREAM_SPLIT_STREAM_COUNT_MASK) as usize;
3789            let mut sign_bytes = Vec::new();
3790            if has_sign {
3791                let bm = Bitmap::from_bytes(&encoded[pos..]).unwrap();
3792                pos += 4 + bm.len().div_ceil(8);
3793                sign_bytes = bm.to_bytes();
3794            }
3795
3796            let mut raw_streams: Vec<Vec<u8>> = Vec::with_capacity(stream_count);
3797            for _ in 0..stream_count {
3798                let orig_len =
3799                    u32::from_le_bytes(encoded[pos + 1..pos + 5].try_into().unwrap()) as usize;
3800                let payload_len =
3801                    u32::from_le_bytes(encoded[pos + 5..pos + 9].try_into().unwrap()) as usize;
3802                let flag_orig = encoded[pos];
3803                pos += 9;
3804                let payload = &encoded[pos..pos + payload_len];
3805                pos += payload_len;
3806                let stream = match flag_orig {
3807                    1 => decompress_lz4(payload, orig_len),
3808                    2 => decompress_zstd(payload),
3809                    _ => payload.to_vec(),
3810                };
3811                assert_eq!(stream.len(), orig_len);
3812                raw_streams.push(stream);
3813            }
3814
3815            // Rebuild with new flag
3816            let mut buf = Vec::new();
3817            buf.extend_from_slice(&(count as u32).to_le_bytes());
3818            buf.push(header);
3819            buf.push(if has_bitmap { 1 } else { 0 });
3820            if has_bitmap {
3821                // bitmap bytes were already included in header parse; reuse slice
3822                // For simplicity, copy from encoded since structure is unchanged.
3823                let bm = Bitmap::from_bytes(&encoded[6..]).unwrap();
3824                buf.extend_from_slice(&bm.to_bytes());
3825            }
3826            buf.push(stream_count_byte);
3827            if has_sign {
3828                buf.extend_from_slice(&sign_bytes);
3829            }
3830            for stream in raw_streams {
3831                let orig_len = stream.len() as u32;
3832                let (flag_set, payload) = match flag {
3833                    1 => {
3834                        let lz = lz4::block::compress(
3835                            &stream,
3836                            Some(lz4::block::CompressionMode::HIGHCOMPRESSION(12)),
3837                            false,
3838                        )
3839                        .unwrap();
3840                        if lz.len() < stream.len() {
3841                            (1u8, lz)
3842                        } else {
3843                            (0u8, stream.clone())
3844                        }
3845                    }
3846                    2 => {
3847                        let zs =
3848                            zstd::stream::encode_all(std::io::Cursor::new(&stream), 15).unwrap();
3849                        if zs.len() < stream.len() {
3850                            (2u8, zs)
3851                        } else {
3852                            (0u8, stream.clone())
3853                        }
3854                    }
3855                    _ => (0u8, stream.clone()),
3856                };
3857                buf.push(flag_set);
3858                buf.extend_from_slice(&orig_len.to_le_bytes());
3859                buf.extend_from_slice(&(payload.len() as u32).to_le_bytes());
3860                buf.extend_from_slice(&payload);
3861            }
3862            buf
3863        }
3864
3865        #[cfg(all(feature = "compression-lz4", feature = "compression-zstd"))]
3866        #[allow(dead_code)]
3867        fn _test_byte_stream_split_stream_flag_integrity() {
3868            let values: Vec<f32> = (0..1024).map(|i| (i as f32).sin()).collect();
3869            let col = Column::Float32(values.clone());
3870            let encoder = ByteStreamSplitEncoder;
3871            let encoded = encoder.encode(&col, None).unwrap();
3872
3873            // raw
3874            let raw_variant = reencode_with_flag(&encoded, 0, values.len());
3875            let decoder = ByteStreamSplitDecoder;
3876            let (decoded_raw, _) = decoder
3877                .decode(&raw_variant, values.len(), LogicalType::Float32)
3878                .unwrap();
3879            assert_eq!(decoded_raw, Column::Float32(values.clone()));
3880
3881            // lz4
3882            let lz4_variant = reencode_with_flag(&encoded, 1, values.len());
3883            let (decoded_lz4, _) = decoder
3884                .decode(&lz4_variant, values.len(), LogicalType::Float32)
3885                .unwrap();
3886            assert_eq!(decoded_lz4, Column::Float32(values.clone()));
3887
3888            // zstd
3889            let zstd_variant = reencode_with_flag(&encoded, 2, values.len());
3890            let (decoded_zstd, _) = decoder
3891                .decode(&zstd_variant, values.len(), LogicalType::Float32)
3892                .unwrap();
3893            assert_eq!(decoded_zstd, Column::Float32(values));
3894        }
3895    }
3896
3897    #[test]
3898    fn test_byte_stream_split_legacy_layout_decode() {
3899        let values = vec![1.0f32, -0.5, 3.25, 0.0, std::f32::consts::PI];
3900        let count = values.len();
3901
3902        // Build legacy layout (no V1 flag, LSB-first streams)
3903        let mut buf = Vec::new();
3904        buf.extend_from_slice(&(count as u32).to_le_bytes());
3905        buf.push(4u8); // bytes_per_value without layout flag
3906        buf.push(0u8); // no bitmap
3907
3908        let raw_bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
3909        for byte_idx in 0..4 {
3910            for value_idx in 0..count {
3911                buf.push(raw_bytes[value_idx * 4 + byte_idx]);
3912            }
3913        }
3914
3915        let decoder = ByteStreamSplitDecoder;
3916        let (decoded, bitmap) = decoder.decode(&buf, count, LogicalType::Float32).unwrap();
3917
3918        assert!(bitmap.is_none());
3919        if let Column::Float32(decoded_values) = decoded {
3920            assert_eq!(decoded_values, values);
3921        } else {
3922            panic!("Expected Float32 column");
3923        }
3924    }
3925
3926    #[test]
3927    fn test_incremental_string_sorted() {
3928        let values: Vec<Vec<u8>> = vec![
3929            b"apple".to_vec(),
3930            b"application".to_vec(),
3931            b"apply".to_vec(),
3932            b"banana".to_vec(),
3933            b"bandana".to_vec(),
3934        ];
3935        let col = Column::Binary(values.clone());
3936
3937        let encoder = IncrementalStringEncoder;
3938        let encoded = encoder.encode(&col, None).unwrap();
3939
3940        let decoder = IncrementalStringDecoder;
3941        let (decoded, bitmap) = decoder
3942            .decode(&encoded, values.len(), LogicalType::Binary)
3943            .unwrap();
3944
3945        assert!(bitmap.is_none());
3946        if let Column::Binary(decoded_values) = decoded {
3947            assert_eq!(decoded_values, values);
3948        } else {
3949            panic!("Expected Binary column");
3950        }
3951    }
3952
3953    #[test]
3954    fn test_encoding_fallback_to_plain() {
3955        // Test that select_encoding returns Plain for unsupported scenarios
3956        let hints = EncodingHints {
3957            is_sorted: false,
3958            distinct_count: 1000,
3959            total_count: 1000,
3960            value_range: Some(u64::MAX), // Very large range
3961            in_range_ratio: None,
3962        };
3963
3964        let encoding = select_encoding(LogicalType::Int64, &hints);
3965        assert_eq!(encoding, EncodingV2::Plain);
3966    }
3967
3968    #[test]
3969    fn test_bitmap_operations() {
3970        let bitmap = Bitmap::from_bools(&[true, false, true, true, false]);
3971        assert!(bitmap.is_valid(0));
3972        assert!(!bitmap.is_valid(1));
3973        assert!(bitmap.is_valid(2));
3974        assert!(bitmap.is_valid(3));
3975        assert!(!bitmap.is_valid(4));
3976        assert_eq!(bitmap.null_count(), 2);
3977        assert_eq!(bitmap.len(), 5);
3978
3979        // Roundtrip
3980        let bytes = bitmap.to_bytes();
3981        let restored = Bitmap::from_bytes(&bytes).unwrap();
3982        assert_eq!(restored.len(), bitmap.len());
3983        for i in 0..bitmap.len() {
3984            assert_eq!(restored.is_valid(i), bitmap.is_valid(i));
3985        }
3986    }
3987
3988    #[test]
3989    fn test_delta_with_bitmap() {
3990        let values = vec![100i64, 105, 110, 115, 120];
3991        let bitmap = Bitmap::from_bools(&[true, false, true, true, false]);
3992        let col = Column::Int64(values.clone());
3993
3994        let encoder = DeltaEncoder;
3995        let encoded = encoder.encode(&col, Some(&bitmap)).unwrap();
3996
3997        let decoder = DeltaDecoder;
3998        let (decoded, decoded_bitmap) = decoder
3999            .decode(&encoded, values.len(), LogicalType::Int64)
4000            .unwrap();
4001
4002        assert!(decoded_bitmap.is_some());
4003        let decoded_bitmap = decoded_bitmap.unwrap();
4004        assert_eq!(decoded_bitmap.null_count(), 2);
4005
4006        if let Column::Int64(decoded_values) = decoded {
4007            assert_eq!(decoded_values, values);
4008        } else {
4009            panic!("Expected Int64 column");
4010        }
4011    }
4012
4013    #[test]
4014    fn test_zigzag_encoding() {
4015        assert_eq!(zigzag_encode(0), 0);
4016        assert_eq!(zigzag_encode(-1), 1);
4017        assert_eq!(zigzag_encode(1), 2);
4018        assert_eq!(zigzag_encode(-2), 3);
4019        assert_eq!(zigzag_encode(2), 4);
4020
4021        for n in [-1000i64, -1, 0, 1, 1000, i64::MIN, i64::MAX] {
4022            assert_eq!(zigzag_decode(zigzag_encode(n)), n);
4023        }
4024    }
4025
4026    #[test]
4027    fn test_varint_encoding() {
4028        let mut buf = Vec::new();
4029        encode_varint(300, &mut buf);
4030        let (decoded, bytes_read) = decode_varint(&buf).unwrap();
4031        assert_eq!(decoded, 300);
4032        assert_eq!(bytes_read, 2);
4033
4034        buf.clear();
4035        encode_varint(0, &mut buf);
4036        let (decoded, bytes_read) = decode_varint(&buf).unwrap();
4037        assert_eq!(decoded, 0);
4038        assert_eq!(bytes_read, 1);
4039    }
4040
4041    #[test]
4042    fn test_select_encoding_sorted_int() {
4043        let hints = EncodingHints {
4044            is_sorted: true,
4045            distinct_count: 100,
4046            total_count: 100,
4047            value_range: Some(100),
4048            in_range_ratio: None,
4049        };
4050        assert_eq!(
4051            select_encoding(LogicalType::Int64, &hints),
4052            EncodingV2::Delta
4053        );
4054    }
4055
4056    #[test]
4057    fn test_select_encoding_small_range() {
4058        let hints = EncodingHints {
4059            is_sorted: false,
4060            distinct_count: 100,
4061            total_count: 100,
4062            value_range: Some(255), // Fits in 8 bits
4063            in_range_ratio: Some(1.0),
4064        };
4065        assert_eq!(select_encoding(LogicalType::Int64, &hints), EncodingV2::FOR);
4066    }
4067
4068    #[test]
4069    fn test_select_encoding_float() {
4070        let hints = EncodingHints::default();
4071        assert_eq!(
4072            select_encoding(LogicalType::Float64, &hints),
4073            EncodingV2::ByteStreamSplit
4074        );
4075        assert_eq!(
4076            select_encoding(LogicalType::Float32, &hints),
4077            EncodingV2::ByteStreamSplit
4078        );
4079    }
4080
4081    #[test]
4082    fn test_select_encoding_sorted_binary() {
4083        let hints = EncodingHints {
4084            is_sorted: true,
4085            distinct_count: 100,
4086            total_count: 100,
4087            value_range: None,
4088            in_range_ratio: None,
4089        };
4090        assert_eq!(
4091            select_encoding(LogicalType::Binary, &hints),
4092            EncodingV2::IncrementalString
4093        );
4094    }
4095
4096    // ========================================================================
4097    // RLE Tests
4098    // ========================================================================
4099
4100    #[test]
4101    fn test_rle_bool_roundtrip() {
4102        // Runs of consecutive values
4103        let values = vec![
4104            true, true, true, false, false, true, true, true, true, false,
4105        ];
4106        let col = Column::Bool(values.clone());
4107
4108        let encoder = RleEncoder;
4109        let encoded = encoder.encode(&col, None).unwrap();
4110
4111        let decoder = RleDecoder;
4112        let (decoded, bitmap) = decoder
4113            .decode(&encoded, values.len(), LogicalType::Bool)
4114            .unwrap();
4115
4116        assert!(bitmap.is_none());
4117        if let Column::Bool(decoded_values) = decoded {
4118            assert_eq!(decoded_values, values);
4119        } else {
4120            panic!("Expected Bool column");
4121        }
4122    }
4123
4124    #[test]
4125    fn test_rle_bool_roundtrip_with_bitmap() {
4126        let values = vec![true, true, false, false, true];
4127        let bitmap = Bitmap::from_bools(&[true, false, true, true, false]);
4128        let col = Column::Bool(values.clone());
4129
4130        let encoder = RleEncoder;
4131        let encoded = encoder.encode(&col, Some(&bitmap)).unwrap();
4132
4133        let decoder = RleDecoder;
4134        let (decoded, decoded_bitmap) = decoder
4135            .decode(&encoded, values.len(), LogicalType::Bool)
4136            .unwrap();
4137
4138        assert!(decoded_bitmap.is_some());
4139        let decoded_bitmap = decoded_bitmap.unwrap();
4140        assert_eq!(decoded_bitmap.null_count(), 2);
4141
4142        if let Column::Bool(decoded_values) = decoded {
4143            assert_eq!(decoded_values, values);
4144        } else {
4145            panic!("Expected Bool column");
4146        }
4147    }
4148
4149    #[test]
4150    fn test_rle_int64_roundtrip() {
4151        // Runs of consecutive values
4152        let values = vec![100i64, 100, 100, 200, 200, 100, 100, 100, 100, 300];
4153        let col = Column::Int64(values.clone());
4154
4155        let encoder = RleEncoder;
4156        let encoded = encoder.encode(&col, None).unwrap();
4157
4158        let decoder = RleDecoder;
4159        let (decoded, bitmap) = decoder
4160            .decode(&encoded, values.len(), LogicalType::Int64)
4161            .unwrap();
4162
4163        assert!(bitmap.is_none());
4164        if let Column::Int64(decoded_values) = decoded {
4165            assert_eq!(decoded_values, values);
4166        } else {
4167            panic!("Expected Int64 column");
4168        }
4169    }
4170
/// Round-trips an `Int64` column together with a validity bitmap through
/// `RleEncoder` / `RleDecoder`.
#[test]
fn test_rle_int64_roundtrip_with_bitmap() {
    let expected = vec![100i64, 100, 200, 200, 100];
    // Two of the five flags are `false`; the test expects them to count as nulls.
    let validity = Bitmap::from_bools(&[true, false, true, true, false]);
    let column = Column::Int64(expected.clone());

    let bytes = RleEncoder.encode(&column, Some(&validity)).unwrap();
    let (column_out, nulls) = RleDecoder
        .decode(&bytes, expected.len(), LogicalType::Int64)
        .unwrap();

    // The bitmap must come back and report both nulls.
    assert!(nulls.is_some());
    assert_eq!(nulls.unwrap().null_count(), 2);

    match column_out {
        Column::Int64(decoded_values) => assert_eq!(decoded_values, expected),
        _ => panic!("Expected Int64 column"),
    }
}
4195
/// RLE encoding of an empty `Bool` column must decode to an empty column with no bitmap.
#[test]
fn test_rle_empty() {
    let column = Column::Bool(vec![]);

    let bytes = RleEncoder.encode(&column, None).unwrap();
    let (column_out, nulls) = RleDecoder.decode(&bytes, 0, LogicalType::Bool).unwrap();

    assert!(nulls.is_none());
    match column_out {
        Column::Bool(decoded_values) => assert!(decoded_values.is_empty()),
        _ => panic!("Expected Bool column"),
    }
}
4213
4214    // ========================================================================
4215    // Dictionary Tests
4216    // ========================================================================
4217
/// Round-trips a low-cardinality `Binary` column through
/// `DictionaryEncoder` / `DictionaryDecoder` without a null bitmap.
#[test]
fn test_dictionary_binary_roundtrip() {
    // Only three distinct strings appear across the six entries.
    let words: [&[u8]; 6] = [
        b"apple", b"banana", b"apple", b"cherry", b"banana", b"apple",
    ];
    let expected: Vec<Vec<u8>> = words.iter().map(|word| word.to_vec()).collect();
    let column = Column::Binary(expected.clone());

    let bytes = DictionaryEncoder.encode(&column, None).unwrap();
    let (column_out, nulls) = DictionaryDecoder
        .decode(&bytes, expected.len(), LogicalType::Binary)
        .unwrap();

    assert!(nulls.is_none());
    match column_out {
        Column::Binary(decoded_values) => assert_eq!(decoded_values, expected),
        _ => panic!("Expected Binary column"),
    }
}
4246
/// Round-trips a `Binary` column together with a validity bitmap through
/// `DictionaryEncoder` / `DictionaryDecoder`.
#[test]
fn test_dictionary_binary_roundtrip_with_bitmap() {
    let words: [&[u8]; 5] = [b"apple", b"banana", b"apple", b"cherry", b"banana"];
    let expected: Vec<Vec<u8>> = words.iter().map(|word| word.to_vec()).collect();
    // Two of the five flags are `false`; the test expects them to count as nulls.
    let validity = Bitmap::from_bools(&[true, false, true, true, false]);
    let column = Column::Binary(expected.clone());

    let bytes = DictionaryEncoder.encode(&column, Some(&validity)).unwrap();
    let (column_out, nulls) = DictionaryDecoder
        .decode(&bytes, expected.len(), LogicalType::Binary)
        .unwrap();

    // The bitmap must come back and report both nulls.
    assert!(nulls.is_some());
    assert_eq!(nulls.unwrap().null_count(), 2);

    match column_out {
        Column::Binary(decoded_values) => assert_eq!(decoded_values, expected),
        _ => panic!("Expected Binary column"),
    }
}
4277
/// Round-trips a `Fixed` column of 4-byte values through
/// `DictionaryEncoder` / `DictionaryDecoder` and checks both the element
/// length and the values.
#[test]
fn test_dictionary_fixed_roundtrip() {
    // Fixed-length values (e.g., UUIDs); three distinct rows repeated across five entries.
    let rows: [[u8; 4]; 5] = [
        [1, 2, 3, 4],
        [5, 6, 7, 8],
        [1, 2, 3, 4],
        [9, 10, 11, 12],
        [5, 6, 7, 8],
    ];
    let expected: Vec<Vec<u8>> = rows.iter().map(|row| row.to_vec()).collect();
    let column = Column::Fixed {
        len: 4,
        values: expected.clone(),
    };

    let bytes = DictionaryEncoder.encode(&column, None).unwrap();
    let (column_out, nulls) = DictionaryDecoder
        .decode(&bytes, expected.len(), LogicalType::Fixed(4))
        .unwrap();

    assert!(nulls.is_none());
    match column_out {
        Column::Fixed {
            len,
            values: decoded_values,
        } => {
            assert_eq!(len, 4);
            assert_eq!(decoded_values, expected);
        }
        _ => panic!("Expected Fixed column"),
    }
}
4313
/// Dictionary encoding of an empty `Binary` column must decode to an empty
/// column with no bitmap.
#[test]
fn test_dictionary_empty() {
    let column = Column::Binary(vec![]);

    let bytes = DictionaryEncoder.encode(&column, None).unwrap();
    let (column_out, nulls) = DictionaryDecoder
        .decode(&bytes, 0, LogicalType::Binary)
        .unwrap();

    assert!(nulls.is_none());
    match column_out {
        Column::Binary(decoded_values) => assert!(decoded_values.is_empty()),
        _ => panic!("Expected Binary column"),
    }
}
4331
4332    // ========================================================================
4333    // Bitpack Tests
4334    // ========================================================================
4335
/// Round-trips ten booleans through `BitpackEncoder` / `BitpackDecoder` and
/// checks that the encoded form is smaller than one byte per value.
#[test]
fn test_bitpack_bool_roundtrip() {
    let expected = vec![
        true, false, true, true, false, true, false, false, true, true,
    ];
    let column = Column::Bool(expected.clone());

    let bytes = BitpackEncoder.encode(&column, None).unwrap();

    // Verify compression: 10 bools should take ~2 bytes (packed) vs 10 bytes (plain).
    // Header is 5 bytes (count + has_bitmap), so total should be ~7 bytes.
    assert!(bytes.len() < 10);

    let (column_out, nulls) = BitpackDecoder
        .decode(&bytes, expected.len(), LogicalType::Bool)
        .unwrap();

    assert!(nulls.is_none());
    match column_out {
        Column::Bool(decoded_values) => assert_eq!(decoded_values, expected),
        _ => panic!("Expected Bool column"),
    }
}
4362
/// Round-trips a `Bool` column together with a validity bitmap through
/// `BitpackEncoder` / `BitpackDecoder`.
#[test]
fn test_bitpack_bool_roundtrip_with_bitmap() {
    let expected = vec![true, false, true, true, false];
    // Two of the five flags are `false`; the test expects them to count as nulls.
    let validity = Bitmap::from_bools(&[true, false, true, true, false]);
    let column = Column::Bool(expected.clone());

    let bytes = BitpackEncoder.encode(&column, Some(&validity)).unwrap();
    let (column_out, nulls) = BitpackDecoder
        .decode(&bytes, expected.len(), LogicalType::Bool)
        .unwrap();

    // The bitmap must come back and report both nulls.
    assert!(nulls.is_some());
    assert_eq!(nulls.unwrap().null_count(), 2);

    match column_out {
        Column::Bool(decoded_values) => assert_eq!(decoded_values, expected),
        _ => panic!("Expected Bool column"),
    }
}
4387
/// Bitpack encoding of an empty `Bool` column must decode to an empty column
/// with no bitmap.
#[test]
fn test_bitpack_empty() {
    let column = Column::Bool(vec![]);

    let bytes = BitpackEncoder.encode(&column, None).unwrap();
    let (column_out, nulls) = BitpackDecoder
        .decode(&bytes, 0, LogicalType::Bool)
        .unwrap();

    assert!(nulls.is_none());
    match column_out {
        Column::Bool(decoded_values) => assert!(decoded_values.is_empty()),
        _ => panic!("Expected Bool column"),
    }
}
4405
/// Bitpacks 100 booleans to verify multi-byte packing: checks the exact
/// encoded size and that the values survive the round trip.
#[test]
fn test_bitpack_large() {
    // More than 8 values, so the packed payload spans several bytes.
    let expected = (0..100).map(|idx| idx % 3 == 0).collect::<Vec<bool>>();
    let column = Column::Bool(expected.clone());

    let bytes = BitpackEncoder.encode(&column, None).unwrap();

    // 100 bools should pack into 13 bytes (100/8 rounded up),
    // plus 5 bytes of header = 18 bytes total.
    let expected_len = 5 + 13;
    assert_eq!(bytes.len(), expected_len);

    let (column_out, _) = BitpackDecoder
        .decode(&bytes, expected.len(), LogicalType::Bool)
        .unwrap();

    match column_out {
        Column::Bool(decoded_values) => assert_eq!(decoded_values, expected),
        _ => panic!("Expected Bool column"),
    }
}
4430
4431    // ========================================================================
4432    // Select Encoding Tests for New Encodings
4433    // ========================================================================
4434
/// `select_encoding` should choose `Bitpack` for a `Bool` column with short runs.
#[test]
fn test_select_encoding_bool_bitpack() {
    // Bool with short runs should use Bitpack (avg run len < 4):
    // 6 values over 2 distinct values gives an average run length of 3.
    let hints = EncodingHints {
        is_sorted: false,
        distinct_count: 2,
        total_count: 6,
        value_range: None,
        in_range_ratio: None,
    };

    let chosen = select_encoding(LogicalType::Bool, &hints);
    assert_eq!(chosen, EncodingV2::Bitpack);
}
4450
/// `select_encoding` should choose `Rle` for a `Bool` column with long runs.
#[test]
fn test_select_encoding_bool_rle() {
    // Bool with long runs should use RLE:
    // 100 values over 2 distinct values gives an average run length of 50.
    let hints = EncodingHints {
        is_sorted: false,
        distinct_count: 2,
        total_count: 100,
        value_range: None,
        in_range_ratio: None,
    };

    let chosen = select_encoding(LogicalType::Bool, &hints);
    assert_eq!(chosen, EncodingV2::Rle);
}
4463
/// `select_encoding` should choose `Rle` for an `Int64` column with many
/// repeating values and a very large value range.
#[test]
fn test_select_encoding_int64_rle() {
    // 100 values over 5 distinct values gives an average run length of 20.
    // The range is set to `u64::MAX` so that FOR won't be selected.
    let hints = EncodingHints {
        is_sorted: false,
        distinct_count: 5,
        total_count: 100,
        value_range: Some(u64::MAX),
        in_range_ratio: None,
    };

    let chosen = select_encoding(LogicalType::Int64, &hints);
    assert_eq!(chosen, EncodingV2::Rle);
}
4476
/// `select_encoding` should choose `Dictionary` for low-cardinality `Binary` data.
#[test]
fn test_select_encoding_binary_dictionary() {
    // 10 distinct values out of 100 total, i.e. 10% cardinality.
    let hints = EncodingHints {
        is_sorted: false,
        distinct_count: 10,
        total_count: 100,
        value_range: None,
        in_range_ratio: None,
    };

    let chosen = select_encoding(LogicalType::Binary, &hints);
    assert_eq!(chosen, EncodingV2::Dictionary);
}
4492
/// `select_encoding` should fall back to `Plain` for high-cardinality `Binary` data.
#[test]
fn test_select_encoding_binary_plain() {
    // 80 distinct values out of 100 total, i.e. 80% cardinality.
    let hints = EncodingHints {
        is_sorted: false,
        distinct_count: 80,
        total_count: 100,
        value_range: None,
        in_range_ratio: None,
    };

    let chosen = select_encoding(LogicalType::Binary, &hints);
    assert_eq!(chosen, EncodingV2::Plain);
}
4508
4509    // ========================================================================
4510    // Create Encoder/Decoder Tests
4511    // ========================================================================
4512
/// The encoder built by `create_encoder` for `Rle` must report `Rle` as its encoding type.
#[test]
fn test_create_encoder_rle() {
    let requested = EncodingV2::Rle;
    assert_eq!(create_encoder(requested).encoding_type(), requested);
}
4518
/// The encoder built by `create_encoder` for `Dictionary` must report `Dictionary`
/// as its encoding type.
#[test]
fn test_create_encoder_dictionary() {
    let requested = EncodingV2::Dictionary;
    assert_eq!(create_encoder(requested).encoding_type(), requested);
}
4524
/// The encoder built by `create_encoder` for `Bitpack` must report `Bitpack`
/// as its encoding type.
#[test]
fn test_create_encoder_bitpack() {
    let requested = EncodingV2::Bitpack;
    assert_eq!(create_encoder(requested).encoding_type(), requested);
}
4530
/// Round-trips a `Bool` column using the encoder and decoder obtained from the
/// `create_encoder` / `create_decoder` factories for RLE.
#[test]
fn test_create_decoder_roundtrip_via_factory() {
    let expected = vec![true, true, true, false, false];
    let column = Column::Bool(expected.clone());

    // Both ends come from the factories rather than being constructed directly.
    let bytes = create_encoder(EncodingV2::Rle)
        .encode(&column, None)
        .unwrap();
    let (column_out, _) = create_decoder(EncodingV2::Rle)
        .decode(&bytes, expected.len(), LogicalType::Bool)
        .unwrap();

    match column_out {
        Column::Bool(decoded_values) => assert_eq!(decoded_values, expected),
        _ => panic!("Expected Bool column"),
    }
}
4551}