// nodedb_columnar/reader.rs
//! Segment reader: decode compressed columns from a segment into typed vectors
//! or Arrow arrays, with column projection and block predicate pushdown.
//!
//! # Block Wire Format
//!
//! Each block in a column is stored as `[compressed_len: u32 LE][compressed_data]`.
//!
//! The compressed_data structure depends on the column type:
//! - **Int64/Float64/Timestamp**: `[validity_bitmap][codec_compressed_values]`
//! - **Bool/Decimal/Uuid/Vector**: `[validity_bitmap][codec_compressed_bytes]`
//! - **String/Bytes/Geometry**: `[validity_bitmap][offset_len: u32][compressed_offsets][compressed_data]`
use nodedb_codec::ColumnCodec;

use crate::delete_bitmap::DeleteBitmap;
use crate::error::ColumnarError;
use crate::format::{ColumnMeta, HEADER_SIZE, SegmentFooter, SegmentHeader};
use crate::predicate::ScanPredicate;
/// Decoded column data from a segment scan.
///
/// Every variant carries a parallel `valid` vector: `valid[i]` is `false`
/// when row `i` is null, or when the row belongs to a block that was skipped
/// (predicate pushdown) or fully deleted and therefore null-filled.
#[derive(Debug)]
pub enum DecodedColumn {
    /// 64-bit signed integers.
    Int64 {
        values: Vec<i64>,
        valid: Vec<bool>,
    },
    /// 64-bit IEEE-754 floats.
    Float64 {
        values: Vec<f64>,
        valid: Vec<bool>,
    },
    /// Timestamps as raw i64 values (epoch/unit defined by the writer — not
    /// visible from this file).
    Timestamp {
        values: Vec<i64>,
        valid: Vec<bool>,
    },
    /// Booleans.
    Bool {
        values: Vec<bool>,
        valid: Vec<bool>,
    },
    /// Variable-length or fixed-size binary (String, Bytes, Geometry, Decimal, Uuid, Vector).
    Binary {
        /// Raw decompressed bytes for the block.
        data: Vec<u8>,
        /// Per-row byte offsets into `data`. Length = row_count + 1.
        /// Row `i` spans `data[offsets[i]..offsets[i + 1]]`.
        offsets: Vec<u32>,
        valid: Vec<bool>,
    },
}
48
/// Reads and decodes columns from a segment byte buffer.
///
/// Supports column projection (only decode requested columns) and block
/// predicate pushdown (skip blocks whose stats prove no match).
pub struct SegmentReader<'a> {
    /// Full segment bytes: header, column data, footer.
    data: &'a [u8],
    /// Parsed footer: row count, per-column metadata and block stats.
    footer: SegmentFooter,
}
57
58impl<'a> SegmentReader<'a> {
59    /// Open a segment from a byte buffer. Validates header and footer CRC.
60    pub fn open(data: &'a [u8]) -> Result<Self, ColumnarError> {
61        SegmentHeader::from_bytes(data)?;
62        let footer = SegmentFooter::from_segment_tail(data)?;
63        Ok(Self { data, footer })
64    }
65
    /// Access the footer metadata.
    pub fn footer(&self) -> &SegmentFooter {
        &self.footer
    }

    /// Total row count in the segment, as recorded in the footer.
    pub fn row_count(&self) -> u64 {
        self.footer.row_count
    }

    /// Number of columns in the segment, as recorded in the footer.
    pub fn column_count(&self) -> usize {
        self.footer.column_count as usize
    }
80
    /// Read a single column, decoding all blocks.
    ///
    /// `col_idx` is the column index in the footer's column metadata.
    /// Equivalent to [`Self::read_column_filtered`] with no predicates.
    pub fn read_column(&self, col_idx: usize) -> Result<DecodedColumn, ColumnarError> {
        self.read_column_filtered(col_idx, &[])
    }
87
    /// Read a single column with predicate pushdown.
    ///
    /// Blocks whose stats satisfy the predicates are skipped. For skipped
    /// blocks, null/zero-fill rows are emitted to preserve row alignment
    /// across projected columns.
    ///
    /// Delegates to the shared implementation with an empty delete bitmap.
    pub fn read_column_filtered(
        &self,
        col_idx: usize,
        predicates: &[ScanPredicate],
    ) -> Result<DecodedColumn, ColumnarError> {
        self.read_column_impl(col_idx, predicates, &DeleteBitmap::new())
    }
100
101    /// Read multiple columns with shared predicate pushdown.
102    ///
103    /// All columns share the same block skip decisions so row alignment
104    /// is maintained across the result set.
105    pub fn read_columns(
106        &self,
107        col_indices: &[usize],
108        predicates: &[ScanPredicate],
109    ) -> Result<Vec<DecodedColumn>, ColumnarError> {
110        col_indices
111            .iter()
112            .map(|&idx| self.read_column_filtered(idx, predicates))
113            .collect()
114    }
115
    /// Read a column with both predicate pushdown and delete bitmap masking.
    ///
    /// Deleted rows have their validity set to false in the output.
    /// Fully deleted blocks are skipped entirely (no decompression).
    pub fn read_column_with_deletes(
        &self,
        col_idx: usize,
        predicates: &[ScanPredicate],
        deletes: &DeleteBitmap,
    ) -> Result<DecodedColumn, ColumnarError> {
        self.read_column_impl(col_idx, predicates, deletes)
    }
128
129    /// Shared implementation for column reading with predicate pushdown and
130    /// optional delete bitmap masking.
131    fn read_column_impl(
132        &self,
133        col_idx: usize,
134        predicates: &[ScanPredicate],
135        deletes: &DeleteBitmap,
136    ) -> Result<DecodedColumn, ColumnarError> {
137        if col_idx >= self.footer.columns.len() {
138            return Err(ColumnarError::ColumnOutOfRange {
139                index: col_idx,
140                count: self.footer.columns.len(),
141            });
142        }
143
144        let col_meta = &self.footer.columns[col_idx];
145        let my_preds: Vec<&ScanPredicate> =
146            predicates.iter().filter(|p| p.col_idx == col_idx).collect();
147
148        let col_start = HEADER_SIZE + col_meta.offset as usize;
149        let mut cursor = col_start;
150        let col_type = infer_column_type(col_meta);
151        let mut result = empty_decoded(&col_type);
152        let mut global_row: u32 = 0;
153
154        for block_stat in &col_meta.block_stats {
155            let block_row_count = block_stat.row_count;
156
157            if cursor + 4 > self.data.len() {
158                return Err(ColumnarError::TruncatedSegment {
159                    expected: cursor + 4,
160                    got: self.data.len(),
161                });
162            }
163            let block_len = u32::from_le_bytes([
164                self.data[cursor],
165                self.data[cursor + 1],
166                self.data[cursor + 2],
167                self.data[cursor + 3],
168            ]) as usize;
169            cursor += 4;
170            let block_data = &self.data[cursor..cursor + block_len];
171            cursor += block_len;
172
173            // Skip via predicate pushdown.
174            let pred_skip = my_preds.iter().any(|p| p.can_skip_block(block_stat));
175
176            // Skip if entire block is deleted.
177            let delete_skip =
178                !deletes.is_empty() && deletes.is_block_fully_deleted(global_row, block_row_count);
179
180            if pred_skip || delete_skip {
181                append_null_fill(&mut result, block_row_count as usize);
182                global_row += block_row_count;
183                continue;
184            }
185
186            // Decode the block.
187            let pre_len = result_valid_len(&result);
188            decode_block(
189                &mut result,
190                block_data,
191                &col_type,
192                col_meta.codec,
193                block_row_count as usize,
194                0,
195            )?;
196
197            // Apply delete bitmap to the newly decoded rows.
198            if !deletes.is_empty() {
199                let valid_slice = result_valid_slice_mut(&mut result, pre_len);
200                deletes.apply_to_validity(valid_slice, global_row);
201            }
202
203            global_row += block_row_count;
204        }
205
206        Ok(result)
207    }
208
209    /// Read multiple columns with predicate pushdown and delete bitmap.
210    pub fn read_columns_with_deletes(
211        &self,
212        col_indices: &[usize],
213        predicates: &[ScanPredicate],
214        deletes: &DeleteBitmap,
215    ) -> Result<Vec<DecodedColumn>, ColumnarError> {
216        col_indices
217            .iter()
218            .map(|&idx| self.read_column_with_deletes(idx, predicates, deletes))
219            .collect()
220    }
221}
222
223/// Infer a simplified column type from ColumnMeta for decode dispatch.
224///
225/// We use the codec as a strong signal: DeltaFastLanesLz4 = numeric,
226/// FsstLz4 = string, etc. The name is a fallback heuristic.
227fn infer_column_type(meta: &ColumnMeta) -> ColumnKind {
228    match meta.codec {
229        ColumnCodec::DeltaFastLanesLz4
230        | ColumnCodec::DeltaFastLanesRans
231        | ColumnCodec::FastLanesLz4
232        | ColumnCodec::Delta
233        | ColumnCodec::DoubleDelta => ColumnKind::Int64,
234
235        ColumnCodec::AlpFastLanesLz4
236        | ColumnCodec::AlpFastLanesRans
237        | ColumnCodec::AlpRdLz4
238        | ColumnCodec::PcodecLz4
239        | ColumnCodec::Gorilla => ColumnKind::Float64,
240
241        ColumnCodec::FsstLz4 | ColumnCodec::FsstRans => ColumnKind::VarLen,
242
243        // LZ4/Raw/Zstd could be bool, binary, decimal, uuid, vector — use
244        // block_stats to distinguish: if min/max are NaN → binary-like.
245        ColumnCodec::Lz4 | ColumnCodec::Raw | ColumnCodec::Zstd | ColumnCodec::Auto => {
246            if meta.block_stats.first().is_some_and(|s| !s.min.is_nan()) {
247                ColumnKind::Int64 // Numeric fallback.
248            } else {
249                ColumnKind::Binary
250            }
251        }
252    }
253}
254
/// Simplified column kind for decode dispatch.
#[derive(Debug, Clone, Copy)]
enum ColumnKind {
    /// Integer-like (also used for timestamps decoded via the i64 pipeline).
    Int64,
    /// Float pipeline.
    Float64,
    /// Variable-length data with an embedded offsets section (e.g. strings).
    VarLen,
    /// Fixed-size or opaque binary with no embedded offsets.
    Binary,
}
263
264/// Create an empty DecodedColumn for the given kind.
265fn empty_decoded(kind: &ColumnKind) -> DecodedColumn {
266    match kind {
267        ColumnKind::Int64 => DecodedColumn::Int64 {
268            values: Vec::new(),
269            valid: Vec::new(),
270        },
271        ColumnKind::Float64 => DecodedColumn::Float64 {
272            values: Vec::new(),
273            valid: Vec::new(),
274        },
275        ColumnKind::VarLen | ColumnKind::Binary => DecodedColumn::Binary {
276            data: Vec::new(),
277            offsets: Vec::new(),
278            valid: Vec::new(),
279        },
280    }
281}
282
283/// Append null-fill rows for a skipped block.
284fn append_null_fill(result: &mut DecodedColumn, row_count: usize) {
285    match result {
286        DecodedColumn::Int64 { values, valid } => {
287            values.extend(std::iter::repeat_n(0i64, row_count));
288            valid.extend(std::iter::repeat_n(false, row_count));
289        }
290        DecodedColumn::Float64 { values, valid } => {
291            values.extend(std::iter::repeat_n(0.0f64, row_count));
292            valid.extend(std::iter::repeat_n(false, row_count));
293        }
294        DecodedColumn::Timestamp { values, valid } => {
295            values.extend(std::iter::repeat_n(0i64, row_count));
296            valid.extend(std::iter::repeat_n(false, row_count));
297        }
298        DecodedColumn::Bool { values, valid } => {
299            values.extend(std::iter::repeat_n(false, row_count));
300            valid.extend(std::iter::repeat_n(false, row_count));
301        }
302        DecodedColumn::Binary {
303            data: _,
304            offsets,
305            valid,
306        } => {
307            let last = *offsets.last().unwrap_or(&0);
308            // For null-fill: each null row has zero-length data.
309            // Need row_count + 1 offsets if this is the first block, else row_count.
310            if offsets.is_empty() {
311                offsets.push(last); // Initial sentinel for first block.
312            }
313            offsets.extend(std::iter::repeat_n(last, row_count));
314            valid.extend(std::iter::repeat_n(false, row_count));
315        }
316    }
317}
318
319/// Get the current length of the validity vector in a DecodedColumn.
320fn result_valid_len(result: &DecodedColumn) -> usize {
321    match result {
322        DecodedColumn::Int64 { valid, .. }
323        | DecodedColumn::Float64 { valid, .. }
324        | DecodedColumn::Timestamp { valid, .. }
325        | DecodedColumn::Bool { valid, .. }
326        | DecodedColumn::Binary { valid, .. } => valid.len(),
327    }
328}
329
330/// Get a mutable slice of the validity vector starting from `offset`.
331fn result_valid_slice_mut(result: &mut DecodedColumn, offset: usize) -> &mut [bool] {
332    match result {
333        DecodedColumn::Int64 { valid, .. }
334        | DecodedColumn::Float64 { valid, .. }
335        | DecodedColumn::Timestamp { valid, .. }
336        | DecodedColumn::Bool { valid, .. }
337        | DecodedColumn::Binary { valid, .. } => &mut valid[offset..],
338    }
339}
340
341/// Decode a single block and append results to the DecodedColumn.
342fn decode_block(
343    result: &mut DecodedColumn,
344    block_data: &[u8],
345    kind: &ColumnKind,
346    codec: ColumnCodec,
347    row_count: usize,
348    _block_idx: usize,
349) -> Result<(), ColumnarError> {
350    let bitmap_size = row_count.div_ceil(8);
351
352    if block_data.len() < bitmap_size {
353        return Err(ColumnarError::TruncatedSegment {
354            expected: bitmap_size,
355            got: block_data.len(),
356        });
357    }
358
359    let bitmap = &block_data[..bitmap_size];
360    let payload = &block_data[bitmap_size..];
361
362    // Extract validity from bitmap.
363    let valid: Vec<bool> = (0..row_count)
364        .map(|i| bitmap[i / 8] & (1 << (i % 8)) != 0)
365        .collect();
366
367    match kind {
368        ColumnKind::Int64 => {
369            let DecodedColumn::Int64 { values, valid: v } = result else {
370                append_null_fill(result, row_count);
371                return Ok(());
372            };
373            let decoded = nodedb_codec::decode_i64_pipeline(payload, codec)?;
374            values.extend_from_slice(&decoded[..row_count.min(decoded.len())]);
375            while values.len() < v.len() + row_count {
376                values.push(0);
377            }
378            v.extend_from_slice(&valid);
379        }
380        ColumnKind::Float64 => {
381            let DecodedColumn::Float64 { values, valid: v } = result else {
382                append_null_fill(result, row_count);
383                return Ok(());
384            };
385            let decoded = nodedb_codec::decode_f64_pipeline(payload, codec)?;
386            values.extend_from_slice(&decoded[..row_count.min(decoded.len())]);
387            while values.len() < v.len() + row_count {
388                values.push(0.0);
389            }
390            v.extend_from_slice(&valid);
391        }
392        ColumnKind::VarLen => {
393            let DecodedColumn::Binary {
394                data,
395                offsets,
396                valid: v,
397            } = result
398            else {
399                append_null_fill(result, row_count);
400                return Ok(());
401            };
402            // Variable-length layout: [offset_len: u32][compressed_offsets][compressed_data].
403            if payload.len() < 4 {
404                return Err(ColumnarError::TruncatedSegment {
405                    expected: bitmap_size + 4,
406                    got: block_data.len(),
407                });
408            }
409            let offset_len =
410                u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]) as usize;
411            let offset_data = &payload[4..4 + offset_len];
412            let string_data = &payload[4 + offset_len..];
413
414            let decoded_offsets =
415                nodedb_codec::decode_i64_pipeline(offset_data, ColumnCodec::DeltaFastLanesLz4)?;
416            let decoded_bytes = nodedb_codec::decode_bytes_pipeline(string_data, codec)?;
417
418            // decoded_offsets has row_count + 1 entries (including sentinel).
419            // Map them to absolute positions in the output data buffer.
420            let base = data.len() as u32;
421            let n_offsets = (row_count + 1).min(decoded_offsets.len());
422            for &off in &decoded_offsets[..n_offsets] {
423                offsets.push(base + off as u32);
424            }
425
426            data.extend_from_slice(&decoded_bytes);
427            v.extend_from_slice(&valid);
428        }
429        ColumnKind::Binary => {
430            let DecodedColumn::Binary {
431                data,
432                offsets,
433                valid: v,
434            } = result
435            else {
436                append_null_fill(result, row_count);
437                return Ok(());
438            };
439            let decoded_bytes = nodedb_codec::decode_bytes_pipeline(payload, codec)?;
440            let base = data.len() as u32;
441
442            if row_count > 0 && !decoded_bytes.is_empty() {
443                let chunk_size = decoded_bytes.len() / row_count;
444                for i in 0..row_count {
445                    offsets.push(base + (i * chunk_size) as u32);
446                }
447                offsets.push(base + decoded_bytes.len() as u32);
448            } else {
449                let last = *offsets.last().unwrap_or(&0);
450                offsets.extend(std::iter::repeat_n(last, row_count + 1));
451            }
452
453            data.extend_from_slice(&decoded_bytes);
454            v.extend_from_slice(&valid);
455        }
456    }
457
458    Ok(())
459}
460
#[cfg(test)]
mod tests {
    //! Round-trip tests: build a segment via the writer, read it back via
    //! `SegmentReader`, and check values, nulls, offsets, and block skipping.

    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;
    use crate::memtable::ColumnarMemtable;
    use crate::writer::SegmentWriter;

    /// Build a 3-column segment (`id: i64 pk`, `name: string`,
    /// `score: nullable f64` with every 5th row null) of `rows` rows.
    fn write_test_segment(rows: usize) -> Vec<u8> {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ])
        .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        for i in 0..rows {
            mt.append_row(&[
                Value::Integer(i as i64),
                Value::String(format!("user_{i}")),
                // Every 5th score is null to exercise validity decoding.
                if i % 5 == 0 {
                    Value::Null
                } else {
                    Value::Float(i as f64 * 0.5)
                },
            ])
            .expect("append");
        }

        let (schema, columns, row_count) = mt.drain();
        SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count)
            .expect("write")
    }

    #[test]
    fn read_int64_column() {
        let segment = write_test_segment(100);
        let reader = SegmentReader::open(&segment).expect("open");

        assert_eq!(reader.row_count(), 100);
        assert_eq!(reader.column_count(), 3);

        let col = reader.read_column(0).expect("read id column");
        match col {
            DecodedColumn::Int64 { values, valid } => {
                assert_eq!(values.len(), 100);
                assert_eq!(valid.len(), 100);
                assert_eq!(values[0], 0);
                assert_eq!(values[99], 99);
                assert!(valid.iter().all(|&v| v)); // No nulls in id.
            }
            _ => panic!("expected Int64"),
        }
    }

    #[test]
    fn read_string_column() {
        // 50 rows fits in a single block; multi-block strings are not
        // exercised here.
        let segment = write_test_segment(50);
        let reader = SegmentReader::open(&segment).expect("open");

        let col = reader.read_column(1).expect("read name column");
        match col {
            DecodedColumn::Binary {
                data,
                offsets,
                valid,
            } => {
                assert_eq!(valid.len(), 50);
                assert!(valid.iter().all(|&v| v));
                // Check first row.
                let start = offsets[0] as usize;
                let end = offsets[1] as usize;
                let first = std::str::from_utf8(&data[start..end]).expect("utf8");
                assert_eq!(first, "user_0");
                // Check last row.
                let start = offsets[49] as usize;
                let end = offsets[50] as usize;
                let last = std::str::from_utf8(&data[start..end]).expect("utf8");
                assert_eq!(last, "user_49");
            }
            _ => panic!("expected Binary (string)"),
        }
    }

    #[test]
    fn read_float64_with_nulls() {
        let segment = write_test_segment(100);
        let reader = SegmentReader::open(&segment).expect("open");

        let col = reader.read_column(2).expect("read score column");
        // Score column uses AlpFastLanesLz4 → decoded as Float64.
        let (values, valid) = match &col {
            DecodedColumn::Float64 { values, valid } => (values.as_slice(), valid.as_slice()),
            other => panic!("expected Float64, got {other:?}"),
        };

        // Float64 column: every 5th row is null (rows 0,5,10,...,95 = 20 nulls).
        assert_eq!(valid.len(), 100);
        let null_count = valid.iter().filter(|&&v| !v).count();
        assert_eq!(null_count, 20);

        // Row 1: score = 1 * 0.5 = 0.5
        assert!(valid[1]);
        assert!((values[1] - 0.5).abs() < 0.001);
    }

    #[test]
    fn predicate_pushdown_skips_blocks() {
        // Create a segment with multiple blocks (> 1024 rows).
        let segment = write_test_segment(2500);
        let reader = SegmentReader::open(&segment).expect("open");

        // id column has 3 blocks: [0..1023], [1024..2047], [2048..2499].
        let footer = reader.footer();
        assert_eq!(footer.columns[0].block_count, 3);

        // Predicate: id > 2100 → should skip blocks 0 and 1.
        let pred = ScanPredicate::gt(0, 2100.0);
        let col = reader
            .read_column_filtered(0, &[pred])
            .expect("filtered read");

        match col {
            DecodedColumn::Int64 { values, valid } => {
                assert_eq!(values.len(), 2500);
                // Blocks 0 and 1 should be null-filled (skipped).
                assert!(!valid[0]); // Block 0 row 0: skipped.
                assert!(!valid[1023]); // Block 0 last row: skipped.
                assert!(!valid[1024]); // Block 1 first row: skipped.
                assert!(!valid[2047]); // Block 1 last row: skipped.
                // Block 2 should be present.
                assert!(valid[2048]); // Block 2 first row: present.
                assert_eq!(values[2048], 2048);
                assert!(valid[2499]);
                assert_eq!(values[2499], 2499);
            }
            _ => panic!("expected Int64"),
        }
    }

    #[test]
    fn read_multiple_columns() {
        let segment = write_test_segment(50);
        let reader = SegmentReader::open(&segment).expect("open");

        let cols = reader.read_columns(&[0, 2], &[]).expect("read multi");
        assert_eq!(cols.len(), 2);

        // Column 0 (id): Int64.
        match &cols[0] {
            DecodedColumn::Int64 { values, .. } => {
                assert_eq!(values.len(), 50);
            }
            _ => panic!("expected Int64 for id"),
        }
    }

    #[test]
    fn column_out_of_range() {
        let segment = write_test_segment(10);
        let reader = SegmentReader::open(&segment).expect("open");
        assert!(matches!(
            reader.read_column(99),
            Err(ColumnarError::ColumnOutOfRange { index: 99, .. })
        ));
    }

    #[test]
    fn write_read_roundtrip_multi_block() {
        // 3000 rows spans multiple blocks; verifies block concatenation for
        // the integer path.
        let segment = write_test_segment(3000);
        let reader = SegmentReader::open(&segment).expect("open");

        let col = reader.read_column(0).expect("read id");
        match col {
            DecodedColumn::Int64 { values, valid } => {
                assert_eq!(values.len(), 3000);
                for i in 0..3000 {
                    assert!(valid[i], "row {i} should be valid");
                    assert_eq!(values[i], i as i64, "row {i} value mismatch");
                }
            }
            _ => panic!("expected Int64"),
        }
    }
}