nodedb_columnar/reader/
segment_reader.rs1use crate::delete_bitmap::DeleteBitmap;
16use crate::error::ColumnarError;
17use crate::format::{HEADER_SIZE, SegmentFooter, SegmentHeader};
18use crate::predicate::ScanPredicate;
19
20use super::block_decode::{
21 append_null_fill, decode_block, empty_decoded, infer_column_type, result_valid_len,
22 result_valid_slice_mut,
23};
24use super::types::DecodedColumn;
25
/// Reader over a borrowed, plaintext columnar segment.
///
/// Created either from raw plaintext bytes via [`SegmentReader::open`] or from an
/// [`OwnedSegmentReader`] via [`OwnedSegmentReader::reader`]. Column data is only
/// decoded on demand by the `read_column*` methods.
pub struct SegmentReader<'a> {
    /// Whole segment buffer, header included. Column byte offsets from the footer are
    /// resolved as `HEADER_SIZE + offset` into this slice.
    pub(super) data: &'a [u8],
    /// Footer parsed from the tail of `data` (row count, per-column metadata and
    /// per-block statistics).
    pub(super) footer: SegmentFooter,
}
34
35#[derive(Debug)]
40pub struct OwnedSegmentReader {
41 plaintext: Vec<u8>,
43 footer: SegmentFooter,
44}
45
46impl OwnedSegmentReader {
47 fn from_plaintext(plaintext: Vec<u8>) -> Result<Self, ColumnarError> {
48 SegmentHeader::from_bytes(&plaintext)?;
49 let footer = SegmentFooter::from_segment_tail(&plaintext)?;
50 Ok(Self { plaintext, footer })
51 }
52
53 pub fn open_with_kek(
61 blob: &[u8],
62 kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
63 ) -> Result<Self, ColumnarError> {
64 let is_encrypted = blob.len() >= 4 && blob[0..4] == crate::encrypt::SEGC_MAGIC;
65 if is_encrypted {
66 let key = kek.ok_or(ColumnarError::MissingKek)?;
67 let plaintext = crate::encrypt::decrypt_segment(key, blob)?;
68 Self::from_plaintext(plaintext)
69 } else if kek.is_some() {
70 Err(ColumnarError::KekRequired)
71 } else {
72 Self::from_plaintext(blob.to_vec())
73 }
74 }
75
76 pub fn reader(&self) -> SegmentReader<'_> {
78 SegmentReader {
79 data: &self.plaintext,
80 footer: self.footer.clone(),
81 }
82 }
83
84 pub fn footer(&self) -> &SegmentFooter {
86 &self.footer
87 }
88
89 pub fn row_count(&self) -> u64 {
91 self.footer.row_count
92 }
93}
94
95impl<'a> SegmentReader<'a> {
96 pub fn open(data: &'a [u8]) -> Result<Self, ColumnarError> {
106 if data.len() >= 4 && data[0..4] == crate::encrypt::SEGC_MAGIC {
107 return Err(ColumnarError::MissingKek);
108 }
109 SegmentHeader::from_bytes(data)?;
110 let footer = SegmentFooter::from_segment_tail(data)?;
111 Ok(Self { data, footer })
112 }
113
114 pub fn footer(&self) -> &SegmentFooter {
116 &self.footer
117 }
118
119 pub fn row_count(&self) -> u64 {
121 self.footer.row_count
122 }
123
124 pub fn column_count(&self) -> usize {
126 self.footer.column_count as usize
127 }
128
129 pub fn read_column(&self, col_idx: usize) -> Result<DecodedColumn, ColumnarError> {
133 self.read_column_filtered(col_idx, &[])
134 }
135
136 pub fn read_column_filtered(
142 &self,
143 col_idx: usize,
144 predicates: &[ScanPredicate],
145 ) -> Result<DecodedColumn, ColumnarError> {
146 self.read_column_impl(col_idx, predicates, &DeleteBitmap::new())
147 }
148
149 pub fn read_columns(
154 &self,
155 col_indices: &[usize],
156 predicates: &[ScanPredicate],
157 ) -> Result<Vec<DecodedColumn>, ColumnarError> {
158 col_indices
159 .iter()
160 .map(|&idx| self.read_column_filtered(idx, predicates))
161 .collect()
162 }
163
164 pub fn read_column_with_deletes(
169 &self,
170 col_idx: usize,
171 predicates: &[ScanPredicate],
172 deletes: &DeleteBitmap,
173 ) -> Result<DecodedColumn, ColumnarError> {
174 self.read_column_impl(col_idx, predicates, deletes)
175 }
176
177 pub fn read_columns_with_deletes(
179 &self,
180 col_indices: &[usize],
181 predicates: &[ScanPredicate],
182 deletes: &DeleteBitmap,
183 ) -> Result<Vec<DecodedColumn>, ColumnarError> {
184 col_indices
185 .iter()
186 .map(|&idx| self.read_column_with_deletes(idx, predicates, deletes))
187 .collect()
188 }
189
190 fn read_column_impl(
193 &self,
194 col_idx: usize,
195 predicates: &[ScanPredicate],
196 deletes: &DeleteBitmap,
197 ) -> Result<DecodedColumn, ColumnarError> {
198 if col_idx >= self.footer.columns.len() {
199 return Err(ColumnarError::ColumnOutOfRange {
200 index: col_idx,
201 count: self.footer.columns.len(),
202 });
203 }
204
205 let col_meta = &self.footer.columns[col_idx];
206 let my_preds: Vec<&ScanPredicate> =
207 predicates.iter().filter(|p| p.col_idx == col_idx).collect();
208
209 let col_start = HEADER_SIZE + col_meta.offset as usize;
210 let mut cursor = col_start;
211 let col_type = infer_column_type(col_meta);
212 let mut result = empty_decoded(&col_type);
213 let mut global_row: u32 = 0;
214
215 for block_stat in &col_meta.block_stats {
216 let block_row_count = block_stat.row_count;
217
218 if cursor + 4 > self.data.len() {
219 return Err(ColumnarError::TruncatedSegment {
220 expected: cursor + 4,
221 got: self.data.len(),
222 });
223 }
224 let block_len = u32::from_le_bytes([
225 self.data[cursor],
226 self.data[cursor + 1],
227 self.data[cursor + 2],
228 self.data[cursor + 3],
229 ]) as usize;
230 cursor += 4;
231 let block_data = &self.data[cursor..cursor + block_len];
232 cursor += block_len;
233
234 let pred_skip = my_preds.iter().any(|p| p.can_skip_block(block_stat));
236
237 let delete_skip =
239 !deletes.is_empty() && deletes.is_block_fully_deleted(global_row, block_row_count);
240
241 if pred_skip || delete_skip {
242 append_null_fill(&mut result, block_row_count as usize);
243 global_row += block_row_count;
244 continue;
245 }
246
247 let pre_len = result_valid_len(&result);
249 decode_block(
250 &mut result,
251 block_data,
252 &col_type,
253 col_meta.codec,
254 block_row_count as usize,
255 col_meta.dictionary.as_deref(),
256 )?;
257
258 if !deletes.is_empty() {
260 let valid_slice = result_valid_slice_mut(&mut result, pre_len);
261 deletes.apply_to_validity(valid_slice, global_row);
262 }
263
264 global_row += block_row_count;
265 }
266
267 Ok(result)
268 }
269}