Skip to main content

nodedb_columnar/reader/
segment_reader.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Segment reader: decode compressed columns from a segment into typed vectors,
4//! with column projection and block predicate pushdown.
5//!
6//! # Block Wire Format
7//!
8//! Each block in a column is stored as `[compressed_len: u32 LE][compressed_data]`.
9//!
10//! The compressed_data structure depends on the column type:
11//! - **Int64/Float64/Timestamp**: `[validity_bitmap][codec_compressed_values]`
12//! - **Bool/Decimal/Uuid/Vector**: `[validity_bitmap][codec_compressed_bytes]`
13//! - **String/Bytes/Geometry**: `[validity_bitmap][offset_len: u32][compressed_offsets][compressed_data]`
14
15use crate::delete_bitmap::DeleteBitmap;
16use crate::error::ColumnarError;
17use crate::format::{HEADER_SIZE, SegmentFooter, SegmentHeader};
18use crate::predicate::ScanPredicate;
19
20use super::block_decode::{
21    append_null_fill, decode_block, empty_decoded, infer_column_type, result_valid_len,
22    result_valid_slice_mut,
23};
24use super::types::DecodedColumn;
25
26/// Reads and decodes columns from a segment byte buffer.
27///
28/// Supports column projection (only decode requested columns) and block
29/// predicate pushdown (skip blocks whose stats prove no match).
30pub struct SegmentReader<'a> {
31    pub(super) data: &'a [u8],
32    pub(super) footer: SegmentFooter,
33}
34
35/// An owned segment reader that holds decrypted segment bytes.
36///
37/// Used when a segment was encrypted at rest: the decrypted plaintext is owned
38/// by this struct, and `reader()` borrows from it for column decoding.
39#[derive(Debug)]
40pub struct OwnedSegmentReader {
41    /// Decrypted plaintext segment bytes.
42    plaintext: Vec<u8>,
43    footer: SegmentFooter,
44}
45
46impl OwnedSegmentReader {
47    fn from_plaintext(plaintext: Vec<u8>) -> Result<Self, ColumnarError> {
48        SegmentHeader::from_bytes(&plaintext)?;
49        let footer = SegmentFooter::from_segment_tail(&plaintext)?;
50        Ok(Self { plaintext, footer })
51    }
52
53    /// Open a segment with optional at-rest decryption.
54    ///
55    /// - `kek = None` → requires a plaintext (`NDBS`) segment; returns
56    ///   `Err(MissingKek)` if the blob starts with `SEGC`.
57    /// - `kek = Some(key)` → requires an encrypted (`SEGC`) segment; decrypts
58    ///   the blob, then parses the inner plaintext. Returns `Err(KekRequired)`
59    ///   if the blob starts with `NDBS`.
60    pub fn open_with_kek(
61        blob: &[u8],
62        kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
63    ) -> Result<Self, ColumnarError> {
64        let is_encrypted = blob.len() >= 4 && blob[0..4] == crate::encrypt::SEGC_MAGIC;
65        if is_encrypted {
66            let key = kek.ok_or(ColumnarError::MissingKek)?;
67            let plaintext = crate::encrypt::decrypt_segment(key, blob)?;
68            Self::from_plaintext(plaintext)
69        } else if kek.is_some() {
70            Err(ColumnarError::KekRequired)
71        } else {
72            Self::from_plaintext(blob.to_vec())
73        }
74    }
75
76    /// Borrow a `SegmentReader` over the owned plaintext bytes.
77    pub fn reader(&self) -> SegmentReader<'_> {
78        SegmentReader {
79            data: &self.plaintext,
80            footer: self.footer.clone(),
81        }
82    }
83
84    /// Access the segment footer.
85    pub fn footer(&self) -> &SegmentFooter {
86        &self.footer
87    }
88
89    /// Total row count in the segment.
90    pub fn row_count(&self) -> u64 {
91        self.footer.row_count
92    }
93}
94
95impl<'a> SegmentReader<'a> {
96    /// Open a plaintext segment from a byte buffer.
97    ///
98    /// Validates the `NDBS` header and footer CRC. If `data` starts with `SEGC`
99    /// (an encrypted envelope) and `kek` is `None`, returns `Err(MissingKek)`.
100    /// If `data` starts with `NDBS` (plaintext) and `kek` is `Some`, returns
101    /// `Err(KekRequired)` — refusing to load unencrypted data when encryption
102    /// is configured.
103    ///
104    /// To read an encrypted segment, use [`OwnedSegmentReader::open_with_kek`].
105    pub fn open(data: &'a [u8]) -> Result<Self, ColumnarError> {
106        if data.len() >= 4 && data[0..4] == crate::encrypt::SEGC_MAGIC {
107            return Err(ColumnarError::MissingKek);
108        }
109        SegmentHeader::from_bytes(data)?;
110        let footer = SegmentFooter::from_segment_tail(data)?;
111        Ok(Self { data, footer })
112    }
113
114    /// Access the footer metadata.
115    pub fn footer(&self) -> &SegmentFooter {
116        &self.footer
117    }
118
119    /// Total row count in the segment.
120    pub fn row_count(&self) -> u64 {
121        self.footer.row_count
122    }
123
124    /// Number of columns in the segment.
125    pub fn column_count(&self) -> usize {
126        self.footer.column_count as usize
127    }
128
129    /// Read a single column, decoding all blocks.
130    ///
131    /// `col_idx` is the column index in the footer's column metadata.
132    pub fn read_column(&self, col_idx: usize) -> Result<DecodedColumn, ColumnarError> {
133        self.read_column_filtered(col_idx, &[])
134    }
135
136    /// Read a single column with predicate pushdown.
137    ///
138    /// Blocks whose stats satisfy the predicates are skipped. For skipped
139    /// blocks, null/zero-fill rows are emitted to preserve row alignment
140    /// across projected columns.
141    pub fn read_column_filtered(
142        &self,
143        col_idx: usize,
144        predicates: &[ScanPredicate],
145    ) -> Result<DecodedColumn, ColumnarError> {
146        self.read_column_impl(col_idx, predicates, &DeleteBitmap::new())
147    }
148
149    /// Read multiple columns with shared predicate pushdown.
150    ///
151    /// All columns share the same block skip decisions so row alignment
152    /// is maintained across the result set.
153    pub fn read_columns(
154        &self,
155        col_indices: &[usize],
156        predicates: &[ScanPredicate],
157    ) -> Result<Vec<DecodedColumn>, ColumnarError> {
158        col_indices
159            .iter()
160            .map(|&idx| self.read_column_filtered(idx, predicates))
161            .collect()
162    }
163
164    /// Read a column with both predicate pushdown and delete bitmap masking.
165    ///
166    /// Deleted rows have their validity set to false in the output.
167    /// Fully deleted blocks are skipped entirely (no decompression).
168    pub fn read_column_with_deletes(
169        &self,
170        col_idx: usize,
171        predicates: &[ScanPredicate],
172        deletes: &DeleteBitmap,
173    ) -> Result<DecodedColumn, ColumnarError> {
174        self.read_column_impl(col_idx, predicates, deletes)
175    }
176
177    /// Read multiple columns with predicate pushdown and delete bitmap.
178    pub fn read_columns_with_deletes(
179        &self,
180        col_indices: &[usize],
181        predicates: &[ScanPredicate],
182        deletes: &DeleteBitmap,
183    ) -> Result<Vec<DecodedColumn>, ColumnarError> {
184        col_indices
185            .iter()
186            .map(|&idx| self.read_column_with_deletes(idx, predicates, deletes))
187            .collect()
188    }
189
190    /// Shared implementation for column reading with predicate pushdown and
191    /// optional delete bitmap masking.
192    fn read_column_impl(
193        &self,
194        col_idx: usize,
195        predicates: &[ScanPredicate],
196        deletes: &DeleteBitmap,
197    ) -> Result<DecodedColumn, ColumnarError> {
198        if col_idx >= self.footer.columns.len() {
199            return Err(ColumnarError::ColumnOutOfRange {
200                index: col_idx,
201                count: self.footer.columns.len(),
202            });
203        }
204
205        let col_meta = &self.footer.columns[col_idx];
206        let my_preds: Vec<&ScanPredicate> =
207            predicates.iter().filter(|p| p.col_idx == col_idx).collect();
208
209        let col_start = HEADER_SIZE + col_meta.offset as usize;
210        let mut cursor = col_start;
211        let col_type = infer_column_type(col_meta);
212        let mut result = empty_decoded(&col_type);
213        let mut global_row: u32 = 0;
214
215        for block_stat in &col_meta.block_stats {
216            let block_row_count = block_stat.row_count;
217
218            if cursor + 4 > self.data.len() {
219                return Err(ColumnarError::TruncatedSegment {
220                    expected: cursor + 4,
221                    got: self.data.len(),
222                });
223            }
224            let block_len = u32::from_le_bytes([
225                self.data[cursor],
226                self.data[cursor + 1],
227                self.data[cursor + 2],
228                self.data[cursor + 3],
229            ]) as usize;
230            cursor += 4;
231            let block_data = &self.data[cursor..cursor + block_len];
232            cursor += block_len;
233
234            // Skip via predicate pushdown.
235            let pred_skip = my_preds.iter().any(|p| p.can_skip_block(block_stat));
236
237            // Skip if entire block is deleted.
238            let delete_skip =
239                !deletes.is_empty() && deletes.is_block_fully_deleted(global_row, block_row_count);
240
241            if pred_skip || delete_skip {
242                append_null_fill(&mut result, block_row_count as usize);
243                global_row += block_row_count;
244                continue;
245            }
246
247            // Decode the block.
248            let pre_len = result_valid_len(&result);
249            decode_block(
250                &mut result,
251                block_data,
252                &col_type,
253                col_meta.codec,
254                block_row_count as usize,
255                col_meta.dictionary.as_deref(),
256            )?;
257
258            // Apply delete bitmap to the newly decoded rows.
259            if !deletes.is_empty() {
260                let valid_slice = result_valid_slice_mut(&mut result, pre_len);
261                deletes.apply_to_validity(valid_slice, global_row);
262            }
263
264            global_row += block_row_count;
265        }
266
267        Ok(result)
268    }
269}