1use nodedb_codec::ColumnCodec;
14
15use crate::delete_bitmap::DeleteBitmap;
16use crate::error::ColumnarError;
17use crate::format::{ColumnMeta, HEADER_SIZE, SegmentFooter, SegmentHeader};
18use crate::predicate::ScanPredicate;
19
/// One column decoded from a segment.
///
/// Every variant carries a `valid` vector with one flag per row. A `false`
/// flag marks a row that is null in its block bitmap, belongs to a block that
/// was skipped (predicate pushdown or fully deleted), or was masked by a
/// delete bitmap; the value stored for such a row must not be interpreted.
#[derive(Debug)]
pub enum DecodedColumn {
    /// 64-bit signed integer values.
    Int64 {
        values: Vec<i64>,
        valid: Vec<bool>,
    },
    /// 64-bit floating-point values.
    Float64 {
        values: Vec<f64>,
        valid: Vec<bool>,
    },
    /// Timestamps stored as `i64` (unit not visible here — TODO confirm).
    /// `SegmentReader` in this module does not currently produce this variant.
    Timestamp {
        values: Vec<i64>,
        valid: Vec<bool>,
    },
    /// Boolean values.
    /// `SegmentReader` in this module does not currently produce this variant.
    Bool {
        values: Vec<bool>,
        valid: Vec<bool>,
    },
    /// Variable-length byte values; string columns decode to this variant.
    Binary {
        /// Concatenated bytes of all rows.
        data: Vec<u8>,
        /// Row boundaries: row `i` is read as
        /// `data[offsets[i] as usize..offsets[i + 1] as usize]`.
        offsets: Vec<u32>,
        /// Per-row validity flags.
        valid: Vec<bool>,
    },
}
48
/// Reader over an encoded columnar segment borrowed from memory.
///
/// The footer is parsed once when the reader is opened; column blocks are
/// decoded on demand by the `read_*` methods.
pub struct SegmentReader<'a> {
    /// Complete segment bytes: header, column blocks, and footer.
    data: &'a [u8],
    /// Footer parsed in `open`; holds per-column metadata and block statistics.
    footer: SegmentFooter,
}
57
58impl<'a> SegmentReader<'a> {
59 pub fn open(data: &'a [u8]) -> Result<Self, ColumnarError> {
61 SegmentHeader::from_bytes(data)?;
62 let footer = SegmentFooter::from_segment_tail(data)?;
63 Ok(Self { data, footer })
64 }
65
66 pub fn footer(&self) -> &SegmentFooter {
68 &self.footer
69 }
70
71 pub fn row_count(&self) -> u64 {
73 self.footer.row_count
74 }
75
76 pub fn column_count(&self) -> usize {
78 self.footer.column_count as usize
79 }
80
81 pub fn read_column(&self, col_idx: usize) -> Result<DecodedColumn, ColumnarError> {
85 self.read_column_filtered(col_idx, &[])
86 }
87
88 pub fn read_column_filtered(
94 &self,
95 col_idx: usize,
96 predicates: &[ScanPredicate],
97 ) -> Result<DecodedColumn, ColumnarError> {
98 self.read_column_impl(col_idx, predicates, &DeleteBitmap::new())
99 }
100
101 pub fn read_columns(
106 &self,
107 col_indices: &[usize],
108 predicates: &[ScanPredicate],
109 ) -> Result<Vec<DecodedColumn>, ColumnarError> {
110 col_indices
111 .iter()
112 .map(|&idx| self.read_column_filtered(idx, predicates))
113 .collect()
114 }
115
116 pub fn read_column_with_deletes(
121 &self,
122 col_idx: usize,
123 predicates: &[ScanPredicate],
124 deletes: &DeleteBitmap,
125 ) -> Result<DecodedColumn, ColumnarError> {
126 self.read_column_impl(col_idx, predicates, deletes)
127 }
128
129 fn read_column_impl(
132 &self,
133 col_idx: usize,
134 predicates: &[ScanPredicate],
135 deletes: &DeleteBitmap,
136 ) -> Result<DecodedColumn, ColumnarError> {
137 if col_idx >= self.footer.columns.len() {
138 return Err(ColumnarError::ColumnOutOfRange {
139 index: col_idx,
140 count: self.footer.columns.len(),
141 });
142 }
143
144 let col_meta = &self.footer.columns[col_idx];
145 let my_preds: Vec<&ScanPredicate> =
146 predicates.iter().filter(|p| p.col_idx == col_idx).collect();
147
148 let col_start = HEADER_SIZE + col_meta.offset as usize;
149 let mut cursor = col_start;
150 let col_type = infer_column_type(col_meta);
151 let mut result = empty_decoded(&col_type);
152 let mut global_row: u32 = 0;
153
154 for block_stat in &col_meta.block_stats {
155 let block_row_count = block_stat.row_count;
156
157 if cursor + 4 > self.data.len() {
158 return Err(ColumnarError::TruncatedSegment {
159 expected: cursor + 4,
160 got: self.data.len(),
161 });
162 }
163 let block_len = u32::from_le_bytes([
164 self.data[cursor],
165 self.data[cursor + 1],
166 self.data[cursor + 2],
167 self.data[cursor + 3],
168 ]) as usize;
169 cursor += 4;
170 let block_data = &self.data[cursor..cursor + block_len];
171 cursor += block_len;
172
173 let pred_skip = my_preds.iter().any(|p| p.can_skip_block(block_stat));
175
176 let delete_skip =
178 !deletes.is_empty() && deletes.is_block_fully_deleted(global_row, block_row_count);
179
180 if pred_skip || delete_skip {
181 append_null_fill(&mut result, block_row_count as usize);
182 global_row += block_row_count;
183 continue;
184 }
185
186 let pre_len = result_valid_len(&result);
188 decode_block(
189 &mut result,
190 block_data,
191 &col_type,
192 col_meta.codec,
193 block_row_count as usize,
194 0,
195 )?;
196
197 if !deletes.is_empty() {
199 let valid_slice = result_valid_slice_mut(&mut result, pre_len);
200 deletes.apply_to_validity(valid_slice, global_row);
201 }
202
203 global_row += block_row_count;
204 }
205
206 Ok(result)
207 }
208
209 pub fn read_columns_with_deletes(
211 &self,
212 col_indices: &[usize],
213 predicates: &[ScanPredicate],
214 deletes: &DeleteBitmap,
215 ) -> Result<Vec<DecodedColumn>, ColumnarError> {
216 col_indices
217 .iter()
218 .map(|&idx| self.read_column_with_deletes(idx, predicates, deletes))
219 .collect()
220 }
221}
222
223fn infer_column_type(meta: &ColumnMeta) -> ColumnKind {
228 match meta.codec {
229 ColumnCodec::DeltaFastLanesLz4
230 | ColumnCodec::DeltaFastLanesRans
231 | ColumnCodec::FastLanesLz4
232 | ColumnCodec::Delta
233 | ColumnCodec::DoubleDelta => ColumnKind::Int64,
234
235 ColumnCodec::AlpFastLanesLz4
236 | ColumnCodec::AlpFastLanesRans
237 | ColumnCodec::AlpRdLz4
238 | ColumnCodec::PcodecLz4
239 | ColumnCodec::Gorilla => ColumnKind::Float64,
240
241 ColumnCodec::FsstLz4 | ColumnCodec::FsstRans => ColumnKind::VarLen,
242
243 ColumnCodec::Lz4 | ColumnCodec::Raw | ColumnCodec::Zstd | ColumnCodec::Auto => {
246 if meta.block_stats.first().is_some_and(|s| !s.min.is_nan()) {
247 ColumnKind::Int64 } else {
249 ColumnKind::Binary
250 }
251 }
252 }
253}
254
/// Decoding strategy chosen for a column by `infer_column_type`.
#[derive(Debug, Clone, Copy)]
enum ColumnKind {
    /// Payload decoded with `nodedb_codec::decode_i64_pipeline`.
    Int64,
    /// Payload decoded with `nodedb_codec::decode_f64_pipeline`.
    Float64,
    /// Payload is a `u32` length, an encoded offset array of that length,
    /// then the encoded byte data; decoded into `DecodedColumn::Binary`.
    VarLen,
    /// Payload decoded as raw bytes and split evenly across the block's rows;
    /// decoded into `DecodedColumn::Binary`.
    Binary,
}
263
264fn empty_decoded(kind: &ColumnKind) -> DecodedColumn {
266 match kind {
267 ColumnKind::Int64 => DecodedColumn::Int64 {
268 values: Vec::new(),
269 valid: Vec::new(),
270 },
271 ColumnKind::Float64 => DecodedColumn::Float64 {
272 values: Vec::new(),
273 valid: Vec::new(),
274 },
275 ColumnKind::VarLen | ColumnKind::Binary => DecodedColumn::Binary {
276 data: Vec::new(),
277 offsets: Vec::new(),
278 valid: Vec::new(),
279 },
280 }
281}
282
283fn append_null_fill(result: &mut DecodedColumn, row_count: usize) {
285 match result {
286 DecodedColumn::Int64 { values, valid } => {
287 values.extend(std::iter::repeat_n(0i64, row_count));
288 valid.extend(std::iter::repeat_n(false, row_count));
289 }
290 DecodedColumn::Float64 { values, valid } => {
291 values.extend(std::iter::repeat_n(0.0f64, row_count));
292 valid.extend(std::iter::repeat_n(false, row_count));
293 }
294 DecodedColumn::Timestamp { values, valid } => {
295 values.extend(std::iter::repeat_n(0i64, row_count));
296 valid.extend(std::iter::repeat_n(false, row_count));
297 }
298 DecodedColumn::Bool { values, valid } => {
299 values.extend(std::iter::repeat_n(false, row_count));
300 valid.extend(std::iter::repeat_n(false, row_count));
301 }
302 DecodedColumn::Binary {
303 data: _,
304 offsets,
305 valid,
306 } => {
307 let last = *offsets.last().unwrap_or(&0);
308 if offsets.is_empty() {
311 offsets.push(last); }
313 offsets.extend(std::iter::repeat_n(last, row_count));
314 valid.extend(std::iter::repeat_n(false, row_count));
315 }
316 }
317}
318
319fn result_valid_len(result: &DecodedColumn) -> usize {
321 match result {
322 DecodedColumn::Int64 { valid, .. }
323 | DecodedColumn::Float64 { valid, .. }
324 | DecodedColumn::Timestamp { valid, .. }
325 | DecodedColumn::Bool { valid, .. }
326 | DecodedColumn::Binary { valid, .. } => valid.len(),
327 }
328}
329
330fn result_valid_slice_mut(result: &mut DecodedColumn, offset: usize) -> &mut [bool] {
332 match result {
333 DecodedColumn::Int64 { valid, .. }
334 | DecodedColumn::Float64 { valid, .. }
335 | DecodedColumn::Timestamp { valid, .. }
336 | DecodedColumn::Bool { valid, .. }
337 | DecodedColumn::Binary { valid, .. } => &mut valid[offset..],
338 }
339}
340
341fn decode_block(
343 result: &mut DecodedColumn,
344 block_data: &[u8],
345 kind: &ColumnKind,
346 codec: ColumnCodec,
347 row_count: usize,
348 _block_idx: usize,
349) -> Result<(), ColumnarError> {
350 let bitmap_size = row_count.div_ceil(8);
351
352 if block_data.len() < bitmap_size {
353 return Err(ColumnarError::TruncatedSegment {
354 expected: bitmap_size,
355 got: block_data.len(),
356 });
357 }
358
359 let bitmap = &block_data[..bitmap_size];
360 let payload = &block_data[bitmap_size..];
361
362 let valid: Vec<bool> = (0..row_count)
364 .map(|i| bitmap[i / 8] & (1 << (i % 8)) != 0)
365 .collect();
366
367 match kind {
368 ColumnKind::Int64 => {
369 let DecodedColumn::Int64 { values, valid: v } = result else {
370 append_null_fill(result, row_count);
371 return Ok(());
372 };
373 let decoded = nodedb_codec::decode_i64_pipeline(payload, codec)?;
374 values.extend_from_slice(&decoded[..row_count.min(decoded.len())]);
375 while values.len() < v.len() + row_count {
376 values.push(0);
377 }
378 v.extend_from_slice(&valid);
379 }
380 ColumnKind::Float64 => {
381 let DecodedColumn::Float64 { values, valid: v } = result else {
382 append_null_fill(result, row_count);
383 return Ok(());
384 };
385 let decoded = nodedb_codec::decode_f64_pipeline(payload, codec)?;
386 values.extend_from_slice(&decoded[..row_count.min(decoded.len())]);
387 while values.len() < v.len() + row_count {
388 values.push(0.0);
389 }
390 v.extend_from_slice(&valid);
391 }
392 ColumnKind::VarLen => {
393 let DecodedColumn::Binary {
394 data,
395 offsets,
396 valid: v,
397 } = result
398 else {
399 append_null_fill(result, row_count);
400 return Ok(());
401 };
402 if payload.len() < 4 {
404 return Err(ColumnarError::TruncatedSegment {
405 expected: bitmap_size + 4,
406 got: block_data.len(),
407 });
408 }
409 let offset_len =
410 u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]) as usize;
411 let offset_data = &payload[4..4 + offset_len];
412 let string_data = &payload[4 + offset_len..];
413
414 let decoded_offsets =
415 nodedb_codec::decode_i64_pipeline(offset_data, ColumnCodec::DeltaFastLanesLz4)?;
416 let decoded_bytes = nodedb_codec::decode_bytes_pipeline(string_data, codec)?;
417
418 let base = data.len() as u32;
421 let n_offsets = (row_count + 1).min(decoded_offsets.len());
422 for &off in &decoded_offsets[..n_offsets] {
423 offsets.push(base + off as u32);
424 }
425
426 data.extend_from_slice(&decoded_bytes);
427 v.extend_from_slice(&valid);
428 }
429 ColumnKind::Binary => {
430 let DecodedColumn::Binary {
431 data,
432 offsets,
433 valid: v,
434 } = result
435 else {
436 append_null_fill(result, row_count);
437 return Ok(());
438 };
439 let decoded_bytes = nodedb_codec::decode_bytes_pipeline(payload, codec)?;
440 let base = data.len() as u32;
441
442 if row_count > 0 && !decoded_bytes.is_empty() {
443 let chunk_size = decoded_bytes.len() / row_count;
444 for i in 0..row_count {
445 offsets.push(base + (i * chunk_size) as u32);
446 }
447 offsets.push(base + decoded_bytes.len() as u32);
448 } else {
449 let last = *offsets.last().unwrap_or(&0);
450 offsets.extend(std::iter::repeat_n(last, row_count + 1));
451 }
452
453 data.extend_from_slice(&decoded_bytes);
454 v.extend_from_slice(&valid);
455 }
456 }
457
458 Ok(())
459}
460
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;
    use crate::memtable::ColumnarMemtable;
    use crate::writer::SegmentWriter;

    /// Builds a segment of `rows` rows with columns `(id, name, score)`.
    ///
    /// `id` is the row index, `name` is `user_{i}`, and `score` is `i * 0.5`
    /// except on every fifth row (`i % 5 == 0`), where it is null.
    fn write_test_segment(rows: usize) -> Vec<u8> {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ])
        .expect("valid");

        let mut memtable = ColumnarMemtable::new(&schema);
        for i in 0..rows {
            let score = match i % 5 {
                0 => Value::Null,
                _ => Value::Float(i as f64 * 0.5),
            };
            let row = [
                Value::Integer(i as i64),
                Value::String(format!("user_{i}")),
                score,
            ];
            memtable.append_row(&row).expect("append");
        }

        let (schema, columns, row_count) = memtable.drain();
        SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count)
            .expect("write")
    }

    #[test]
    fn read_int64_column() {
        let segment = write_test_segment(100);
        let reader = SegmentReader::open(&segment).expect("open");

        assert_eq!(reader.row_count(), 100);
        assert_eq!(reader.column_count(), 3);

        let DecodedColumn::Int64 { values, valid } =
            reader.read_column(0).expect("read id column")
        else {
            panic!("expected Int64");
        };
        assert_eq!(values.len(), 100);
        assert_eq!(valid.len(), 100);
        assert_eq!(values[0], 0);
        assert_eq!(values[99], 99);
        // `id` is a required column, so no row may be null.
        assert!(valid.iter().all(|&is_valid| is_valid));
    }

    #[test]
    fn read_string_column() {
        let segment = write_test_segment(50);
        let reader = SegmentReader::open(&segment).expect("open");

        let DecodedColumn::Binary {
            data,
            offsets,
            valid,
        } = reader.read_column(1).expect("read name column")
        else {
            panic!("expected Binary (string)");
        };
        assert_eq!(valid.len(), 50);
        assert!(valid.iter().all(|&is_valid| is_valid));

        // Row `i` spans `offsets[i]..offsets[i + 1]` in `data`.
        let row_str = |i: usize| {
            let (start, end) = (offsets[i] as usize, offsets[i + 1] as usize);
            std::str::from_utf8(&data[start..end]).expect("utf8")
        };
        assert_eq!(row_str(0), "user_0");
        assert_eq!(row_str(49), "user_49");
    }

    #[test]
    fn read_float64_with_nulls() {
        let segment = write_test_segment(100);
        let reader = SegmentReader::open(&segment).expect("open");

        let col = reader.read_column(2).expect("read score column");
        let DecodedColumn::Float64 { values, valid } = &col else {
            panic!("expected Float64, got {col:?}");
        };

        assert_eq!(valid.len(), 100);
        // Every fifth row (i % 5 == 0) was written as null: 20 of 100.
        let null_count = valid.iter().filter(|is_valid| !**is_valid).count();
        assert_eq!(null_count, 20);

        assert!(valid[1]);
        assert!((values[1] - 0.5).abs() < 0.001);
    }

    #[test]
    fn predicate_pushdown_skips_blocks() {
        // Assuming 1024-row blocks, 2500 rows span [0, 1024), [1024, 2048)
        // and [2048, 2500).
        let segment = write_test_segment(2500);
        let reader = SegmentReader::open(&segment).expect("open");

        assert_eq!(reader.footer().columns[0].block_count, 3);

        // `id > 2100` can only match rows in the last block.
        let pred = ScanPredicate::gt(0, 2100.0);
        let DecodedColumn::Int64 { values, valid } = reader
            .read_column_filtered(0, &[pred])
            .expect("filtered read")
        else {
            panic!("expected Int64");
        };

        assert_eq!(values.len(), 2500);
        for skipped in [0, 1023, 1024, 2047] {
            assert!(!valid[skipped]);
        }
        for kept in [2048, 2499] {
            assert!(valid[kept]);
            assert_eq!(values[kept], kept as i64);
        }
    }

    #[test]
    fn read_multiple_columns() {
        let segment = write_test_segment(50);
        let reader = SegmentReader::open(&segment).expect("open");

        let cols = reader.read_columns(&[0, 2], &[]).expect("read multi");
        assert_eq!(cols.len(), 2);

        let DecodedColumn::Int64 { values, .. } = &cols[0] else {
            panic!("expected Int64 for id");
        };
        assert_eq!(values.len(), 50);
    }

    #[test]
    fn column_out_of_range() {
        let segment = write_test_segment(10);
        let reader = SegmentReader::open(&segment).expect("open");

        let outcome = reader.read_column(99);
        assert!(matches!(
            outcome,
            Err(ColumnarError::ColumnOutOfRange { index: 99, .. })
        ));
    }

    #[test]
    fn write_read_roundtrip_multi_block() {
        let segment = write_test_segment(3000);
        let reader = SegmentReader::open(&segment).expect("open");

        let DecodedColumn::Int64 { values, valid } = reader.read_column(0).expect("read id")
        else {
            panic!("expected Int64");
        };
        assert_eq!(values.len(), 3000);
        for (i, &value) in values.iter().enumerate() {
            assert!(valid[i], "row {i} should be valid");
            assert_eq!(value, i as i64, "row {i} value mismatch");
        }
    }
}