alopex_core/columnar/
encoding.rs

1//! Columnar encoding/decoding utilities for fixed/variable-length data with optional compression.
2use std::convert::TryInto;
3
4use crc32fast::Hasher;
5use serde::{Deserialize, Serialize};
6
7use crate::columnar::error::{ColumnarError, Result};
8
/// Logical data type of a column.
///
/// The encoder checks this against the [`Column`] variant being written; the decoder uses it
/// to choose the value width and the [`Column`] variant to build.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum LogicalType {
    /// Signed 64-bit integer.
    Int64,
    /// 32-bit floating point.
    Float32,
    /// 64-bit floating point.
    Float64,
    /// Boolean value.
    Bool,
    /// Arbitrary binary (variable-length).
    Binary,
    /// Fixed-length binary; the payload is the byte length shared by every value.
    Fixed(u16),
}
25
/// Encoding strategy for a column.
///
/// Not every strategy accepts every column type; the encoders in this module return
/// `ColumnarError::CorruptedSegment` for unsupported combinations.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Encoding {
    /// Raw values; accepted for every column type.
    Plain,
    /// Dictionary encoding with indexes; accepted for `Binary` and `Fixed` columns.
    Dictionary,
    /// Run-length encoding; accepted for `Int64`, `Float32`, `Float64` and `Bool` columns.
    Rle,
    /// Bit-packed representation, one bit per value; accepted for `Bool` columns.
    Bitpack,
}
38
/// Compression applied after encoding.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Compression {
    /// No compression.
    None,
    /// LZ4 block compression, stored as a 4-byte little-endian uncompressed length followed
    /// by the compressed block.
    ///
    /// Only available with the `compression-lz4` feature; without it, encoding and decoding
    /// with this variant return an error.
    Lz4,
}
47
/// In-memory representation of a column.
///
/// Each variant owns its values and corresponds to one [`LogicalType`] variant.
#[derive(Debug, Clone, PartialEq)]
pub enum Column {
    /// Column of i64 values.
    Int64(Vec<i64>),
    /// Column of f32 values.
    Float32(Vec<f32>),
    /// Column of f64 values.
    Float64(Vec<f64>),
    /// Column of bool values.
    Bool(Vec<bool>),
    /// Column of variable-length binary values.
    Binary(Vec<Vec<u8>>),
    /// Column of fixed-length binary values.
    Fixed {
        /// Fixed byte length for each value.
        len: usize,
        /// Fixed-length binary values. Plain encoding rejects the column when any value's
        /// length differs from `len`.
        values: Vec<Vec<u8>>,
    },
}
69
70/// Encode a column with the given encoding/compression and optional checksum.
71///
72/// The optional checksum computes CRC32 over the stored bytes (post-compression).
73pub fn encode_column(
74    column: &Column,
75    encoding: Encoding,
76    compression: Compression,
77    checksum: bool,
78    logical_type: LogicalType,
79) -> Result<Vec<u8>> {
80    validate_logical(column, logical_type)?;
81    let mut payload = match encoding {
82        Encoding::Plain => encode_plain(column)?,
83        Encoding::Dictionary => encode_dictionary(column)?,
84        Encoding::Rle => encode_rle(column)?,
85        Encoding::Bitpack => encode_bitpack(column)?,
86    };
87
88    if let Compression::Lz4 = compression {
89        #[cfg(feature = "compression-lz4")]
90        {
91            let orig_len: u32 =
92                payload
93                    .len()
94                    .try_into()
95                    .map_err(|_| ColumnarError::CorruptedSegment {
96                        reason: "payload too large for lz4".into(),
97                    })?;
98            let compressed = lz4::block::compress(&payload, None, false).map_err(|e| {
99                ColumnarError::CorruptedSegment {
100                    reason: e.to_string(),
101                }
102            })?;
103            let mut buf = Vec::with_capacity(4 + compressed.len());
104            buf.extend_from_slice(&orig_len.to_le_bytes());
105            buf.extend_from_slice(&compressed);
106            payload = buf;
107        }
108        #[cfg(not(feature = "compression-lz4"))]
109        {
110            return Err(ColumnarError::CorruptedSegment {
111                reason: "lz4 compression is disabled (feature compression-lz4)".into(),
112            });
113        }
114    }
115
116    if checksum {
117        let mut hasher = Hasher::new();
118        hasher.update(&payload);
119        let crc = hasher.finalize();
120        payload.extend_from_slice(&crc.to_le_bytes());
121    }
122
123    Ok(payload)
124}
125
126/// Decode bytes into a column using the specified logical type, encoding, and compression.
127pub fn decode_column(
128    bytes: &[u8],
129    logical_type: LogicalType,
130    encoding: Encoding,
131    compression: Compression,
132    checksum: bool,
133) -> Result<Column> {
134    let data = if checksum {
135        if bytes.len() < 4 {
136            return Err(ColumnarError::CorruptedSegment {
137                reason: "checksum missing".into(),
138            });
139        }
140        let (content, crc_bytes) = bytes.split_at(bytes.len() - 4);
141        let expected = u32::from_le_bytes(crc_bytes.try_into().unwrap());
142        let mut hasher = Hasher::new();
143        hasher.update(content);
144        let computed = hasher.finalize();
145        if expected != computed {
146            return Err(ColumnarError::ChecksumMismatch);
147        }
148        content
149    } else {
150        bytes
151    };
152
153    let decompressed = match compression {
154        Compression::None => data.to_vec(),
155        Compression::Lz4 => {
156            #[cfg(feature = "compression-lz4")]
157            {
158                if data.len() < 4 {
159                    return Err(ColumnarError::CorruptedSegment {
160                        reason: "lz4 header too short".into(),
161                    });
162                }
163                let orig_len = u32::from_le_bytes(data[0..4].try_into().unwrap()) as i32;
164                lz4::block::decompress(&data[4..], Some(orig_len)).map_err(|e| {
165                    ColumnarError::CorruptedSegment {
166                        reason: e.to_string(),
167                    }
168                })?
169            }
170            #[cfg(not(feature = "compression-lz4"))]
171            {
172                return Err(ColumnarError::CorruptedSegment {
173                    reason: "lz4 compression is disabled (feature compression-lz4)".into(),
174                });
175            }
176        }
177    };
178
179    match encoding {
180        Encoding::Plain => decode_plain(&decompressed, logical_type),
181        Encoding::Dictionary => decode_dictionary(&decompressed, logical_type),
182        Encoding::Rle => decode_rle(&decompressed, logical_type),
183        Encoding::Bitpack => decode_bitpack(&decompressed, logical_type),
184    }
185}
186
187fn validate_logical(column: &Column, logical: LogicalType) -> Result<()> {
188    match (column, logical) {
189        (Column::Int64(_), LogicalType::Int64)
190        | (Column::Float32(_), LogicalType::Float32)
191        | (Column::Float64(_), LogicalType::Float64)
192        | (Column::Bool(_), LogicalType::Bool)
193        | (Column::Binary(_), LogicalType::Binary) => Ok(()),
194        (Column::Fixed { len, .. }, LogicalType::Fixed(flen)) if *len == flen as usize => Ok(()),
195        (_, LogicalType::Fixed(_)) => Err(ColumnarError::CorruptedSegment {
196            reason: "fixed length mismatch".into(),
197        }),
198        _ => Err(ColumnarError::CorruptedSegment {
199            reason: "logical type mismatch".into(),
200        }),
201    }
202}
203
204fn encode_plain(column: &Column) -> Result<Vec<u8>> {
205    match column {
206        Column::Int64(values) => {
207            let mut buf = Vec::with_capacity(4 + values.len() * 8);
208            buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
209            for v in values {
210                buf.extend_from_slice(&v.to_le_bytes());
211            }
212            Ok(buf)
213        }
214        Column::Float32(values) => {
215            let mut buf = Vec::with_capacity(4 + values.len() * 4);
216            buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
217            for v in values {
218                buf.extend_from_slice(&v.to_le_bytes());
219            }
220            Ok(buf)
221        }
222        Column::Float64(values) => {
223            let mut buf = Vec::with_capacity(4 + values.len() * 8);
224            buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
225            for v in values {
226                buf.extend_from_slice(&v.to_le_bytes());
227            }
228            Ok(buf)
229        }
230        Column::Bool(values) => {
231            let mut buf = Vec::with_capacity(4 + values.len());
232            buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
233            for v in values {
234                buf.push(*v as u8);
235            }
236            Ok(buf)
237        }
238        Column::Binary(values) => encode_varlen(values),
239        Column::Fixed { len, values } => {
240            for v in values {
241                if v.len() != *len {
242                    return Err(ColumnarError::CorruptedSegment {
243                        reason: "fixed value length mismatch".into(),
244                    });
245                }
246            }
247            let mut buf = Vec::with_capacity(6 + values.len() * *len);
248            buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
249            buf.extend_from_slice(&(*len as u16).to_le_bytes());
250            for v in values {
251                buf.extend_from_slice(v);
252            }
253            Ok(buf)
254        }
255    }
256}
257
258fn encode_varlen(values: &[Vec<u8>]) -> Result<Vec<u8>> {
259    let mut buf = Vec::new();
260    buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
261    for v in values {
262        let len: u32 = v
263            .len()
264            .try_into()
265            .map_err(|_| ColumnarError::CorruptedSegment {
266                reason: "value too long".into(),
267            })?;
268        buf.extend_from_slice(&len.to_le_bytes());
269        buf.extend_from_slice(v);
270    }
271    Ok(buf)
272}
273
274fn decode_plain(bytes: &[u8], logical: LogicalType) -> Result<Column> {
275    if bytes.len() < 4 {
276        return Err(ColumnarError::CorruptedSegment {
277            reason: "plain header too short".into(),
278        });
279    }
280    let count = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
281    let mut pos = 4;
282    match logical {
283        LogicalType::Int64 => {
284            if bytes.len() < pos + count * 8 {
285                return Err(ColumnarError::CorruptedSegment {
286                    reason: "plain int64 truncated".into(),
287                });
288            }
289            let mut out = Vec::with_capacity(count);
290            for _ in 0..count {
291                let v = i64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap());
292                out.push(v);
293                pos += 8;
294            }
295            Ok(Column::Int64(out))
296        }
297        LogicalType::Float32 => {
298            if bytes.len() < pos + count * 4 {
299                return Err(ColumnarError::CorruptedSegment {
300                    reason: "plain float32 truncated".into(),
301                });
302            }
303            let mut out = Vec::with_capacity(count);
304            for _ in 0..count {
305                let v = f32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap());
306                out.push(v);
307                pos += 4;
308            }
309            Ok(Column::Float32(out))
310        }
311        LogicalType::Float64 => {
312            if bytes.len() < pos + count * 8 {
313                return Err(ColumnarError::CorruptedSegment {
314                    reason: "plain float64 truncated".into(),
315                });
316            }
317            let mut out = Vec::with_capacity(count);
318            for _ in 0..count {
319                let v = f64::from_le_bytes(bytes[pos..pos + 8].try_into().unwrap());
320                out.push(v);
321                pos += 8;
322            }
323            Ok(Column::Float64(out))
324        }
325        LogicalType::Bool => {
326            if bytes.len() < pos + count {
327                return Err(ColumnarError::CorruptedSegment {
328                    reason: "plain bool truncated".into(),
329                });
330            }
331            let mut out = Vec::with_capacity(count);
332            for _ in 0..count {
333                out.push(bytes[pos] != 0);
334                pos += 1;
335            }
336            Ok(Column::Bool(out))
337        }
338        LogicalType::Binary => decode_varlen(&bytes[4..], count).map(Column::Binary),
339        LogicalType::Fixed(len) => {
340            if bytes.len() < pos + 2 {
341                return Err(ColumnarError::CorruptedSegment {
342                    reason: "fixed header truncated".into(),
343                });
344            }
345            let stored_len = u16::from_le_bytes(bytes[pos..pos + 2].try_into().unwrap()) as usize;
346            pos += 2;
347            if stored_len as u16 != len {
348                return Err(ColumnarError::CorruptedSegment {
349                    reason: "fixed length mismatch".into(),
350                });
351            }
352            let expected = pos + count * stored_len;
353            if bytes.len() < expected {
354                return Err(ColumnarError::CorruptedSegment {
355                    reason: "fixed values truncated".into(),
356                });
357            }
358            let mut values = Vec::with_capacity(count);
359            for _ in 0..count {
360                let end = pos + stored_len;
361                values.push(bytes[pos..end].to_vec());
362                pos = end;
363            }
364            Ok(Column::Fixed {
365                len: stored_len,
366                values,
367            })
368        }
369    }
370}
371
372fn decode_varlen(bytes: &[u8], count: usize) -> Result<Vec<Vec<u8>>> {
373    let mut pos = 0;
374    let mut values = Vec::with_capacity(count);
375    for _ in 0..count {
376        if pos + 4 > bytes.len() {
377            return Err(ColumnarError::CorruptedSegment {
378                reason: "varlen length truncated".into(),
379            });
380        }
381        let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
382        pos += 4;
383        if pos + len > bytes.len() {
384            return Err(ColumnarError::CorruptedSegment {
385                reason: "varlen value truncated".into(),
386            });
387        }
388        values.push(bytes[pos..pos + len].to_vec());
389        pos += len;
390    }
391    Ok(values)
392}
393
394fn encode_dictionary(column: &Column) -> Result<Vec<u8>> {
395    let values = match column {
396        Column::Binary(v) => v,
397        Column::Fixed { values, .. } => values,
398        _ => {
399            return Err(ColumnarError::CorruptedSegment {
400                reason: "dictionary encoding requires binary data".into(),
401            })
402        }
403    };
404
405    let mut dict: Vec<Vec<u8>> = Vec::new();
406    let mut indices = Vec::with_capacity(values.len());
407    for v in values {
408        if let Some((idx, _)) = dict.iter().enumerate().find(|(_, existing)| *existing == v) {
409            indices.push(idx as u32);
410        } else {
411            let idx = dict.len() as u32;
412            dict.push(v.clone());
413            indices.push(idx);
414        }
415    }
416
417    let mut buf = Vec::new();
418    buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
419    buf.extend_from_slice(&(dict.len() as u32).to_le_bytes());
420    for entry in &dict {
421        let len: u32 = entry
422            .len()
423            .try_into()
424            .map_err(|_| ColumnarError::CorruptedSegment {
425                reason: "dict entry too long".into(),
426            })?;
427        buf.extend_from_slice(&len.to_le_bytes());
428        buf.extend_from_slice(entry);
429    }
430    for idx in indices {
431        buf.extend_from_slice(&idx.to_le_bytes());
432    }
433    Ok(buf)
434}
435
436fn decode_dictionary(bytes: &[u8], logical: LogicalType) -> Result<Column> {
437    if bytes.len() < 8 {
438        return Err(ColumnarError::CorruptedSegment {
439            reason: "dictionary header too short".into(),
440        });
441    }
442    let count = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
443    let dict_count = u32::from_le_bytes(bytes[4..8].try_into().unwrap()) as usize;
444
445    let mut pos = 8;
446    let mut dict = Vec::with_capacity(dict_count);
447    for _ in 0..dict_count {
448        if pos + 4 > bytes.len() {
449            return Err(ColumnarError::CorruptedSegment {
450                reason: "dict length truncated".into(),
451            });
452        }
453        let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
454        pos += 4;
455        if pos + len > bytes.len() {
456            return Err(ColumnarError::CorruptedSegment {
457                reason: "dict entry truncated".into(),
458            });
459        }
460        dict.push(bytes[pos..pos + len].to_vec());
461        pos += len;
462    }
463
464    let expected_idx_bytes =
465        count
466            .checked_mul(4)
467            .ok_or_else(|| ColumnarError::CorruptedSegment {
468                reason: "index overflow".into(),
469            })?;
470    if pos + expected_idx_bytes > bytes.len() {
471        return Err(ColumnarError::CorruptedSegment {
472            reason: "dictionary indices truncated".into(),
473        });
474    }
475
476    let mut values = Vec::with_capacity(count);
477    for _ in 0..count {
478        let idx = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
479        pos += 4;
480        let entry = dict
481            .get(idx)
482            .ok_or_else(|| ColumnarError::CorruptedSegment {
483                reason: "dictionary index out of bounds".into(),
484            })?;
485        values.push(entry.clone());
486    }
487
488    match logical {
489        LogicalType::Binary => Ok(Column::Binary(values)),
490        LogicalType::Fixed(len) => {
491            for v in &values {
492                if v.len() != len as usize {
493                    return Err(ColumnarError::CorruptedSegment {
494                        reason: "fixed length mismatch".into(),
495                    });
496                }
497            }
498            Ok(Column::Fixed {
499                len: len as usize,
500                values,
501            })
502        }
503        _ => Err(ColumnarError::CorruptedSegment {
504            reason: "dictionary logical mismatch".into(),
505        }),
506    }
507}
508
509fn encode_rle(column: &Column) -> Result<Vec<u8>> {
510    match column {
511        Column::Int64(values) => {
512            encode_rle_nums(values.iter().map(|v| v.to_le_bytes().to_vec()), 8)
513        }
514        Column::Float32(values) => {
515            encode_rle_nums(values.iter().map(|v| v.to_le_bytes().to_vec()), 4)
516        }
517        Column::Float64(values) => {
518            encode_rle_nums(values.iter().map(|v| v.to_le_bytes().to_vec()), 8)
519        }
520        Column::Bool(values) => {
521            let mut runs = Vec::new();
522            let mut iter = values.iter().copied();
523            if let Some(mut current) = iter.next() {
524                let mut len = 1u32;
525                for v in iter {
526                    if v == current && len < u32::MAX {
527                        len += 1;
528                    } else {
529                        runs.push((current as u8, len));
530                        current = v;
531                        len = 1;
532                    }
533                }
534                runs.push((current as u8, len));
535            }
536            let mut buf = Vec::new();
537            buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
538            buf.extend_from_slice(&(runs.len() as u32).to_le_bytes());
539            for (val, len) in runs {
540                buf.push(val);
541                buf.extend_from_slice(&len.to_le_bytes());
542            }
543            Ok(buf)
544        }
545        _ => Err(ColumnarError::CorruptedSegment {
546            reason: "rle only supports numeric/bool".into(),
547        }),
548    }
549}
550
551fn encode_rle_nums<I>(iter: I, width: usize) -> Result<Vec<u8>>
552where
553    I: Iterator<Item = Vec<u8>>,
554{
555    let mut runs: Vec<(Vec<u8>, u32)> = Vec::new();
556    let mut it = iter.peekable();
557    if let Some(first) = it.next() {
558        let mut current = first;
559        let mut len = 1u32;
560        for v in it {
561            if v == current && len < u32::MAX {
562                len += 1;
563            } else {
564                runs.push((current, len));
565                current = v;
566                len = 1;
567            }
568        }
569        runs.push((current, len));
570    }
571
572    let mut buf = Vec::new();
573    let total: u32 = runs.iter().map(|(_, l)| *l).sum();
574    buf.extend_from_slice(&total.to_le_bytes());
575    buf.extend_from_slice(&(runs.len() as u32).to_le_bytes());
576    for (val, len) in runs {
577        if val.len() != width {
578            return Err(ColumnarError::CorruptedSegment {
579                reason: "rle width mismatch".into(),
580            });
581        }
582        buf.extend_from_slice(&val);
583        buf.extend_from_slice(&len.to_le_bytes());
584    }
585    Ok(buf)
586}
587
588fn decode_rle(bytes: &[u8], logical: LogicalType) -> Result<Column> {
589    if bytes.len() < 8 {
590        return Err(ColumnarError::CorruptedSegment {
591            reason: "rle header too short".into(),
592        });
593    }
594    let total = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
595    let run_count = u32::from_le_bytes(bytes[4..8].try_into().unwrap()) as usize;
596    let mut pos = 8;
597
598    match logical {
599        LogicalType::Int64 | LogicalType::Float64 | LogicalType::Float32 => {
600            let width = if matches!(logical, LogicalType::Float32) {
601                4
602            } else {
603                8
604            };
605            let mut out: Vec<Vec<u8>> = Vec::with_capacity(run_count);
606            let mut lengths = Vec::with_capacity(run_count);
607            for _ in 0..run_count {
608                if pos + width + 4 > bytes.len() {
609                    return Err(ColumnarError::CorruptedSegment {
610                        reason: "rle numeric truncated".into(),
611                    });
612                }
613                out.push(bytes[pos..pos + width].to_vec());
614                pos += width;
615                lengths.push(u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize);
616                pos += 4;
617            }
618            let mut values = Vec::with_capacity(total);
619            for (val_bytes, len) in out.into_iter().zip(lengths) {
620                for _ in 0..len {
621                    let v = match logical {
622                        LogicalType::Int64 => {
623                            let val_arr: [u8; 8] = val_bytes.as_slice().try_into().unwrap();
624                            ColumnValue::I64(i64::from_le_bytes(val_arr))
625                        }
626                        LogicalType::Float64 => {
627                            let val_arr: [u8; 8] = val_bytes.as_slice().try_into().unwrap();
628                            ColumnValue::F64(f64::from_le_bytes(val_arr))
629                        }
630                        LogicalType::Float32 => {
631                            let val_arr: [u8; 4] = val_bytes.as_slice().try_into().unwrap();
632                            ColumnValue::F32(f32::from_le_bytes(val_arr))
633                        }
634                        _ => unreachable!(),
635                    };
636                    values.push(v);
637                }
638            }
639            match logical {
640                LogicalType::Int64 => Ok(Column::Int64(
641                    values
642                        .into_iter()
643                        .map(|v| match v {
644                            ColumnValue::I64(x) => x,
645                            _ => unreachable!(),
646                        })
647                        .collect(),
648                )),
649                LogicalType::Float32 => Ok(Column::Float32(
650                    values
651                        .into_iter()
652                        .map(|v| match v {
653                            ColumnValue::F32(x) => x,
654                            _ => unreachable!(),
655                        })
656                        .collect(),
657                )),
658                LogicalType::Float64 => Ok(Column::Float64(
659                    values
660                        .into_iter()
661                        .map(|v| match v {
662                            ColumnValue::F64(x) => x,
663                            _ => unreachable!(),
664                        })
665                        .collect(),
666                )),
667                _ => unreachable!(),
668            }
669        }
670        LogicalType::Bool => {
671            let mut runs = Vec::with_capacity(run_count);
672            for _ in 0..run_count {
673                if pos + 5 > bytes.len() {
674                    return Err(ColumnarError::CorruptedSegment {
675                        reason: "rle bool truncated".into(),
676                    });
677                }
678                let val = bytes[pos] != 0;
679                pos += 1;
680                let len = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap()) as usize;
681                pos += 4;
682                runs.push((val, len));
683            }
684            let mut out = Vec::with_capacity(total);
685            for (val, len) in runs {
686                out.extend(std::iter::repeat_n(val, len));
687            }
688            Ok(Column::Bool(out))
689        }
690        _ => Err(ColumnarError::CorruptedSegment {
691            reason: "rle logical mismatch".into(),
692        }),
693    }
694}
695
696enum ColumnValue {
697    I64(i64),
698    F32(f32),
699    F64(f64),
700}
701
702fn encode_bitpack(column: &Column) -> Result<Vec<u8>> {
703    let values = match column {
704        Column::Bool(v) => v,
705        _ => {
706            return Err(ColumnarError::CorruptedSegment {
707                reason: "bitpack supports bool only".into(),
708            })
709        }
710    };
711    let count = values.len();
712    let mut buf = Vec::with_capacity(4 + count.div_ceil(8));
713    buf.extend_from_slice(&(count as u32).to_le_bytes());
714    let mut current = 0u8;
715    let mut bit = 0;
716    for v in values {
717        if *v {
718            current |= 1 << bit;
719        }
720        bit += 1;
721        if bit == 8 {
722            buf.push(current);
723            current = 0;
724            bit = 0;
725        }
726    }
727    if bit > 0 {
728        buf.push(current);
729    }
730    Ok(buf)
731}
732
733fn decode_bitpack(bytes: &[u8], logical: LogicalType) -> Result<Column> {
734    if logical != LogicalType::Bool {
735        return Err(ColumnarError::CorruptedSegment {
736            reason: "bitpack logical mismatch".into(),
737        });
738    }
739    if bytes.len() < 4 {
740        return Err(ColumnarError::CorruptedSegment {
741            reason: "bitpack header too short".into(),
742        });
743    }
744    let count = u32::from_le_bytes(bytes[0..4].try_into().unwrap()) as usize;
745    let needed = 4 + count.div_ceil(8);
746    if bytes.len() < needed {
747        return Err(ColumnarError::CorruptedSegment {
748            reason: "bitpack data truncated".into(),
749        });
750    }
751    let mut out = Vec::with_capacity(count);
752    for i in 0..count {
753        let byte = bytes[4 + (i / 8)];
754        let bit = i % 8;
755        out.push(byte & (1 << bit) != 0);
756    }
757    Ok(Column::Bool(out))
758}
759
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;

    /// Encodes `col` and decodes the result with the same settings, returning the decoded
    /// column. Panics if either step fails.
    fn roundtrip(
        col: &Column,
        logical: LogicalType,
        encoding: Encoding,
        compression: Compression,
        checksum: bool,
    ) -> Column {
        let encoded = encode_column(col, encoding, compression, checksum, logical).unwrap();
        decode_column(&encoded, logical, encoding, compression, checksum).unwrap()
    }

    #[test]
    fn plain_int64_roundtrip() {
        let col = Column::Int64(vec![1, -2, 3]);
        let decoded = roundtrip(
            &col,
            LogicalType::Int64,
            Encoding::Plain,
            Compression::None,
            true,
        );
        assert_eq!(col, decoded);
    }

    #[cfg(feature = "compression-lz4")]
    #[test]
    fn dictionary_binary_roundtrip_lz4() {
        let col = Column::Binary(vec![b"aa".to_vec(), b"bb".to_vec(), b"aa".to_vec()]);
        let decoded = roundtrip(
            &col,
            LogicalType::Binary,
            Encoding::Dictionary,
            Compression::Lz4,
            true,
        );
        assert_eq!(col, decoded);
    }

    #[test]
    fn rle_bool_roundtrip() {
        let col = Column::Bool(vec![true, true, true, false, false, true]);
        let decoded = roundtrip(
            &col,
            LogicalType::Bool,
            Encoding::Rle,
            Compression::None,
            false,
        );
        assert_eq!(col, decoded);
    }

    #[test]
    fn bitpack_bool_roundtrip() {
        // Nine values, so the packed form spills into a second, partially filled byte.
        let col = Column::Bool(vec![
            true, false, true, true, false, false, false, true, true,
        ]);
        let decoded = roundtrip(
            &col,
            LogicalType::Bool,
            Encoding::Bitpack,
            Compression::None,
            true,
        );
        assert_eq!(col, decoded);
    }

    #[test]
    fn checksum_mismatch_detected() {
        let col = Column::Int64(vec![42]);
        let mut encoded = encode_column(
            &col,
            Encoding::Plain,
            Compression::None,
            true,
            LogicalType::Int64,
        )
        .unwrap();
        encoded[5] ^= 0xFF; // flip a byte in payload
        let outcome = decode_column(
            &encoded,
            LogicalType::Int64,
            Encoding::Plain,
            Compression::None,
            true,
        );
        assert!(matches!(
            outcome.unwrap_err(),
            ColumnarError::ChecksumMismatch
        ));
    }
}