// nodedb_columnar/reader.rs
1//! Segment reader: decode compressed columns from a segment into typed vectors
2//! or Arrow arrays, with column projection and block predicate pushdown.
3//!
4//! # Block Wire Format
5//!
6//! Each block in a column is stored as `[compressed_len: u32 LE][compressed_data]`.
7//!
8//! The compressed_data structure depends on the column type:
9//! - **Int64/Float64/Timestamp**: `[validity_bitmap][codec_compressed_values]`
10//! - **Bool/Decimal/Uuid/Vector**: `[validity_bitmap][codec_compressed_bytes]`
11//! - **String/Bytes/Geometry**: `[validity_bitmap][offset_len: u32][compressed_offsets][compressed_data]`
12
13use nodedb_codec::ColumnCodec;
14
15use crate::delete_bitmap::DeleteBitmap;
16use crate::error::ColumnarError;
17use crate::format::{ColumnMeta, HEADER_SIZE, SegmentFooter, SegmentHeader};
18use crate::predicate::ScanPredicate;
19
/// Decoded column data from a segment scan.
///
/// Every variant carries a parallel `valid` vector (one entry per row);
/// `valid[i] == false` means row `i` is NULL (or was null-filled for a
/// skipped/deleted block) and its value slot holds a zero placeholder.
#[derive(Debug)]
pub enum DecodedColumn {
    /// 64-bit signed integers.
    Int64 {
        values: Vec<i64>,
        valid: Vec<bool>,
    },
    /// 64-bit floats.
    Float64 {
        values: Vec<f64>,
        valid: Vec<bool>,
    },
    /// Timestamps stored as i64 (epoch units defined by the writer — not
    /// visible here; confirm against the schema).
    Timestamp {
        values: Vec<i64>,
        valid: Vec<bool>,
    },
    /// Booleans.
    Bool {
        values: Vec<bool>,
        valid: Vec<bool>,
    },
    /// Variable-length or fixed-size binary (String, Bytes, Geometry, Decimal, Uuid, Vector).
    Binary {
        /// Raw decompressed bytes for the block.
        data: Vec<u8>,
        /// Per-row byte offsets into `data`. Length = row_count + 1.
        offsets: Vec<u32>,
        valid: Vec<bool>,
    },
    /// Dictionary-encoded string column.
    ///
    /// IDs index into `dictionary`. Use `dictionary[ids[i]]` to recover the string
    /// for row `i` when `valid[i]` is true.
    DictEncoded {
        /// Symbol IDs per row (index into `dictionary`).
        ids: Vec<u32>,
        /// Dictionary: ID → string value. Populated from `ColumnMeta.dictionary`.
        dictionary: Vec<String>,
        valid: Vec<bool>,
    },
}
59
/// Reads and decodes columns from a segment byte buffer.
///
/// Supports column projection (only decode requested columns) and block
/// predicate pushdown (skip blocks whose stats prove no match).
pub struct SegmentReader<'a> {
    /// The full segment bytes (header + column blocks + footer), borrowed.
    data: &'a [u8],
    /// Footer parsed at open time; holds row count and per-column metadata.
    footer: SegmentFooter,
}
68
69impl<'a> SegmentReader<'a> {
70    /// Open a segment from a byte buffer. Validates header and footer CRC.
71    pub fn open(data: &'a [u8]) -> Result<Self, ColumnarError> {
72        SegmentHeader::from_bytes(data)?;
73        let footer = SegmentFooter::from_segment_tail(data)?;
74        Ok(Self { data, footer })
75    }
76
77    /// Access the footer metadata.
78    pub fn footer(&self) -> &SegmentFooter {
79        &self.footer
80    }
81
82    /// Total row count in the segment.
83    pub fn row_count(&self) -> u64 {
84        self.footer.row_count
85    }
86
87    /// Number of columns in the segment.
88    pub fn column_count(&self) -> usize {
89        self.footer.column_count as usize
90    }
91
92    /// Read a single column, decoding all blocks.
93    ///
94    /// `col_idx` is the column index in the footer's column metadata.
95    pub fn read_column(&self, col_idx: usize) -> Result<DecodedColumn, ColumnarError> {
96        self.read_column_filtered(col_idx, &[])
97    }
98
99    /// Read a single column with predicate pushdown.
100    ///
101    /// Blocks whose stats satisfy the predicates are skipped. For skipped
102    /// blocks, null/zero-fill rows are emitted to preserve row alignment
103    /// across projected columns.
104    pub fn read_column_filtered(
105        &self,
106        col_idx: usize,
107        predicates: &[ScanPredicate],
108    ) -> Result<DecodedColumn, ColumnarError> {
109        self.read_column_impl(col_idx, predicates, &DeleteBitmap::new())
110    }
111
112    /// Read multiple columns with shared predicate pushdown.
113    ///
114    /// All columns share the same block skip decisions so row alignment
115    /// is maintained across the result set.
116    pub fn read_columns(
117        &self,
118        col_indices: &[usize],
119        predicates: &[ScanPredicate],
120    ) -> Result<Vec<DecodedColumn>, ColumnarError> {
121        col_indices
122            .iter()
123            .map(|&idx| self.read_column_filtered(idx, predicates))
124            .collect()
125    }
126
127    /// Read a column with both predicate pushdown and delete bitmap masking.
128    ///
129    /// Deleted rows have their validity set to false in the output.
130    /// Fully deleted blocks are skipped entirely (no decompression).
131    pub fn read_column_with_deletes(
132        &self,
133        col_idx: usize,
134        predicates: &[ScanPredicate],
135        deletes: &DeleteBitmap,
136    ) -> Result<DecodedColumn, ColumnarError> {
137        self.read_column_impl(col_idx, predicates, deletes)
138    }
139
140    /// Shared implementation for column reading with predicate pushdown and
141    /// optional delete bitmap masking.
142    fn read_column_impl(
143        &self,
144        col_idx: usize,
145        predicates: &[ScanPredicate],
146        deletes: &DeleteBitmap,
147    ) -> Result<DecodedColumn, ColumnarError> {
148        if col_idx >= self.footer.columns.len() {
149            return Err(ColumnarError::ColumnOutOfRange {
150                index: col_idx,
151                count: self.footer.columns.len(),
152            });
153        }
154
155        let col_meta = &self.footer.columns[col_idx];
156        let my_preds: Vec<&ScanPredicate> =
157            predicates.iter().filter(|p| p.col_idx == col_idx).collect();
158
159        let col_start = HEADER_SIZE + col_meta.offset as usize;
160        let mut cursor = col_start;
161        let col_type = infer_column_type(col_meta);
162        let mut result = empty_decoded(&col_type);
163        let mut global_row: u32 = 0;
164
165        for block_stat in &col_meta.block_stats {
166            let block_row_count = block_stat.row_count;
167
168            if cursor + 4 > self.data.len() {
169                return Err(ColumnarError::TruncatedSegment {
170                    expected: cursor + 4,
171                    got: self.data.len(),
172                });
173            }
174            let block_len = u32::from_le_bytes([
175                self.data[cursor],
176                self.data[cursor + 1],
177                self.data[cursor + 2],
178                self.data[cursor + 3],
179            ]) as usize;
180            cursor += 4;
181            let block_data = &self.data[cursor..cursor + block_len];
182            cursor += block_len;
183
184            // Skip via predicate pushdown.
185            let pred_skip = my_preds.iter().any(|p| p.can_skip_block(block_stat));
186
187            // Skip if entire block is deleted.
188            let delete_skip =
189                !deletes.is_empty() && deletes.is_block_fully_deleted(global_row, block_row_count);
190
191            if pred_skip || delete_skip {
192                append_null_fill(&mut result, block_row_count as usize);
193                global_row += block_row_count;
194                continue;
195            }
196
197            // Decode the block.
198            let pre_len = result_valid_len(&result);
199            decode_block(
200                &mut result,
201                block_data,
202                &col_type,
203                col_meta.codec,
204                block_row_count as usize,
205                col_meta.dictionary.as_deref(),
206            )?;
207
208            // Apply delete bitmap to the newly decoded rows.
209            if !deletes.is_empty() {
210                let valid_slice = result_valid_slice_mut(&mut result, pre_len);
211                deletes.apply_to_validity(valid_slice, global_row);
212            }
213
214            global_row += block_row_count;
215        }
216
217        Ok(result)
218    }
219
220    /// Read multiple columns with predicate pushdown and delete bitmap.
221    pub fn read_columns_with_deletes(
222        &self,
223        col_indices: &[usize],
224        predicates: &[ScanPredicate],
225        deletes: &DeleteBitmap,
226    ) -> Result<Vec<DecodedColumn>, ColumnarError> {
227        col_indices
228            .iter()
229            .map(|&idx| self.read_column_with_deletes(idx, predicates, deletes))
230            .collect()
231    }
232}
233
234/// Infer a simplified column type from ColumnMeta for decode dispatch.
235///
236/// We use the codec as a strong signal: DeltaFastLanesLz4 = numeric,
237/// FsstLz4 = string, etc. The name is a fallback heuristic.
238fn infer_column_type(meta: &ColumnMeta) -> ColumnKind {
239    // Dict-encoded columns store IDs as DeltaFastLanesLz4 but must be decoded
240    // differently — the presence of a dictionary distinguishes them.
241    if meta.dictionary.is_some() {
242        return ColumnKind::DictEncoded;
243    }
244
245    match meta.codec {
246        ColumnCodec::DeltaFastLanesLz4
247        | ColumnCodec::DeltaFastLanesRans
248        | ColumnCodec::FastLanesLz4
249        | ColumnCodec::Delta
250        | ColumnCodec::DoubleDelta => ColumnKind::Int64,
251
252        ColumnCodec::AlpFastLanesLz4
253        | ColumnCodec::AlpFastLanesRans
254        | ColumnCodec::AlpRdLz4
255        | ColumnCodec::PcodecLz4
256        | ColumnCodec::Gorilla => ColumnKind::Float64,
257
258        ColumnCodec::FsstLz4 | ColumnCodec::FsstRans => ColumnKind::VarLen,
259
260        // LZ4/Raw/Zstd could be bool, binary, decimal, uuid, vector — use
261        // block_stats to distinguish: if min/max are NaN → binary-like.
262        ColumnCodec::Lz4 | ColumnCodec::Raw | ColumnCodec::Zstd | ColumnCodec::Auto => {
263            if meta.block_stats.first().is_some_and(|s| !s.min.is_nan()) {
264                ColumnKind::Int64 // Numeric fallback.
265            } else {
266                ColumnKind::Binary
267            }
268        }
269    }
270}
271
/// Simplified column kind for decode dispatch.
#[derive(Debug, Clone, Copy)]
enum ColumnKind {
    /// Integer-family codecs (also the numeric fallback for generic codecs).
    Int64,
    /// Floating-point codecs (ALP/pcodec/Gorilla).
    Float64,
    /// Variable-length data with a per-block offsets section (FSST strings).
    VarLen,
    /// Opaque binary payload without an offsets section; rows are assumed
    /// fixed-size and the width is inferred at decode time.
    Binary,
    /// Dictionary-encoded strings (IDs + dictionary from ColumnMeta).
    DictEncoded,
}
281
282/// Create an empty DecodedColumn for the given kind.
283fn empty_decoded(kind: &ColumnKind) -> DecodedColumn {
284    match kind {
285        ColumnKind::Int64 => DecodedColumn::Int64 {
286            values: Vec::new(),
287            valid: Vec::new(),
288        },
289        ColumnKind::Float64 => DecodedColumn::Float64 {
290            values: Vec::new(),
291            valid: Vec::new(),
292        },
293        ColumnKind::VarLen | ColumnKind::Binary => DecodedColumn::Binary {
294            data: Vec::new(),
295            offsets: Vec::new(),
296            valid: Vec::new(),
297        },
298        ColumnKind::DictEncoded => DecodedColumn::DictEncoded {
299            ids: Vec::new(),
300            dictionary: Vec::new(), // Populated during decode_block.
301            valid: Vec::new(),
302        },
303    }
304}
305
306/// Append null-fill rows for a skipped block.
307fn append_null_fill(result: &mut DecodedColumn, row_count: usize) {
308    match result {
309        DecodedColumn::Int64 { values, valid } => {
310            values.extend(std::iter::repeat_n(0i64, row_count));
311            valid.extend(std::iter::repeat_n(false, row_count));
312        }
313        DecodedColumn::Float64 { values, valid } => {
314            values.extend(std::iter::repeat_n(0.0f64, row_count));
315            valid.extend(std::iter::repeat_n(false, row_count));
316        }
317        DecodedColumn::Timestamp { values, valid } => {
318            values.extend(std::iter::repeat_n(0i64, row_count));
319            valid.extend(std::iter::repeat_n(false, row_count));
320        }
321        DecodedColumn::Bool { values, valid } => {
322            values.extend(std::iter::repeat_n(false, row_count));
323            valid.extend(std::iter::repeat_n(false, row_count));
324        }
325        DecodedColumn::Binary {
326            data: _,
327            offsets,
328            valid,
329        } => {
330            let last = *offsets.last().unwrap_or(&0);
331            // For null-fill: each null row has zero-length data.
332            // Need row_count + 1 offsets if this is the first block, else row_count.
333            if offsets.is_empty() {
334                offsets.push(last); // Initial sentinel for first block.
335            }
336            offsets.extend(std::iter::repeat_n(last, row_count));
337            valid.extend(std::iter::repeat_n(false, row_count));
338        }
339        DecodedColumn::DictEncoded { ids, valid, .. } => {
340            ids.extend(std::iter::repeat_n(0u32, row_count));
341            valid.extend(std::iter::repeat_n(false, row_count));
342        }
343    }
344}
345
346/// Get the current length of the validity vector in a DecodedColumn.
347fn result_valid_len(result: &DecodedColumn) -> usize {
348    match result {
349        DecodedColumn::Int64 { valid, .. }
350        | DecodedColumn::Float64 { valid, .. }
351        | DecodedColumn::Timestamp { valid, .. }
352        | DecodedColumn::Bool { valid, .. }
353        | DecodedColumn::Binary { valid, .. }
354        | DecodedColumn::DictEncoded { valid, .. } => valid.len(),
355    }
356}
357
358/// Get a mutable slice of the validity vector starting from `offset`.
359fn result_valid_slice_mut(result: &mut DecodedColumn, offset: usize) -> &mut [bool] {
360    match result {
361        DecodedColumn::Int64 { valid, .. }
362        | DecodedColumn::Float64 { valid, .. }
363        | DecodedColumn::Timestamp { valid, .. }
364        | DecodedColumn::Bool { valid, .. }
365        | DecodedColumn::Binary { valid, .. }
366        | DecodedColumn::DictEncoded { valid, .. } => &mut valid[offset..],
367    }
368}
369
370/// Decode a single block and append results to the DecodedColumn.
371fn decode_block(
372    result: &mut DecodedColumn,
373    block_data: &[u8],
374    kind: &ColumnKind,
375    codec: ColumnCodec,
376    row_count: usize,
377    dictionary: Option<&[String]>,
378) -> Result<(), ColumnarError> {
379    let bitmap_size = row_count.div_ceil(8);
380
381    if block_data.len() < bitmap_size {
382        return Err(ColumnarError::TruncatedSegment {
383            expected: bitmap_size,
384            got: block_data.len(),
385        });
386    }
387
388    let bitmap = &block_data[..bitmap_size];
389    let payload = &block_data[bitmap_size..];
390
391    // Extract validity from bitmap.
392    let valid: Vec<bool> = (0..row_count)
393        .map(|i| bitmap[i / 8] & (1 << (i % 8)) != 0)
394        .collect();
395
396    match kind {
397        ColumnKind::Int64 => {
398            let DecodedColumn::Int64 { values, valid: v } = result else {
399                append_null_fill(result, row_count);
400                return Ok(());
401            };
402            let decoded = nodedb_codec::decode_i64_pipeline(payload, codec)?;
403            values.extend_from_slice(&decoded[..row_count.min(decoded.len())]);
404            while values.len() < v.len() + row_count {
405                values.push(0);
406            }
407            v.extend_from_slice(&valid);
408        }
409        ColumnKind::Float64 => {
410            let DecodedColumn::Float64 { values, valid: v } = result else {
411                append_null_fill(result, row_count);
412                return Ok(());
413            };
414            let decoded = nodedb_codec::decode_f64_pipeline(payload, codec)?;
415            values.extend_from_slice(&decoded[..row_count.min(decoded.len())]);
416            while values.len() < v.len() + row_count {
417                values.push(0.0);
418            }
419            v.extend_from_slice(&valid);
420        }
421        ColumnKind::VarLen => {
422            let DecodedColumn::Binary {
423                data,
424                offsets,
425                valid: v,
426            } = result
427            else {
428                append_null_fill(result, row_count);
429                return Ok(());
430            };
431            // Variable-length layout: [offset_len: u32][compressed_offsets][compressed_data].
432            if payload.len() < 4 {
433                return Err(ColumnarError::TruncatedSegment {
434                    expected: bitmap_size + 4,
435                    got: block_data.len(),
436                });
437            }
438            let offset_len =
439                u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]) as usize;
440            let offset_data = &payload[4..4 + offset_len];
441            let string_data = &payload[4 + offset_len..];
442
443            let decoded_offsets =
444                nodedb_codec::decode_i64_pipeline(offset_data, ColumnCodec::DeltaFastLanesLz4)?;
445            let decoded_bytes = nodedb_codec::decode_bytes_pipeline(string_data, codec)?;
446
447            // decoded_offsets has row_count + 1 entries (including sentinel).
448            // Map them to absolute positions in the output data buffer.
449            let base = data.len() as u32;
450            let n_offsets = (row_count + 1).min(decoded_offsets.len());
451            for &off in &decoded_offsets[..n_offsets] {
452                offsets.push(base + off as u32);
453            }
454
455            data.extend_from_slice(&decoded_bytes);
456            v.extend_from_slice(&valid);
457        }
458        ColumnKind::Binary => {
459            let DecodedColumn::Binary {
460                data,
461                offsets,
462                valid: v,
463            } = result
464            else {
465                append_null_fill(result, row_count);
466                return Ok(());
467            };
468            let decoded_bytes = nodedb_codec::decode_bytes_pipeline(payload, codec)?;
469            let base = data.len() as u32;
470
471            if row_count > 0 && !decoded_bytes.is_empty() {
472                let chunk_size = decoded_bytes.len() / row_count;
473                for i in 0..row_count {
474                    offsets.push(base + (i * chunk_size) as u32);
475                }
476                offsets.push(base + decoded_bytes.len() as u32);
477            } else {
478                let last = *offsets.last().unwrap_or(&0);
479                offsets.extend(std::iter::repeat_n(last, row_count + 1));
480            }
481
482            data.extend_from_slice(&decoded_bytes);
483            v.extend_from_slice(&valid);
484        }
485        ColumnKind::DictEncoded => {
486            let DecodedColumn::DictEncoded {
487                ids,
488                dictionary: col_dict,
489                valid: v,
490            } = result
491            else {
492                append_null_fill(result, row_count);
493                return Ok(());
494            };
495
496            // IDs are stored as i64 via DeltaFastLanesLz4.
497            let decoded =
498                nodedb_codec::decode_i64_pipeline(payload, ColumnCodec::DeltaFastLanesLz4)?;
499            let id_slice = &decoded[..row_count.min(decoded.len())];
500            ids.extend(id_slice.iter().map(|&id| id as u32));
501            // Pad to row_count if decoded is shorter.
502            while ids.len() < v.len() + row_count {
503                ids.push(0);
504            }
505            v.extend_from_slice(&valid);
506
507            // Populate the dictionary on the first block that provides it.
508            if col_dict.is_empty()
509                && let Some(dict) = dictionary
510            {
511                col_dict.extend_from_slice(dict);
512            }
513        }
514    }
515
516    Ok(())
517}
518
519#[cfg(test)]
520mod tests {
521    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
522    use nodedb_types::value::Value;
523
524    use super::*;
525    use crate::memtable::ColumnarMemtable;
526    use crate::writer::SegmentWriter;
527
    /// Build an in-memory segment with `rows` rows of (id, name, score),
    /// where every 5th score is NULL. Shared fixture for the tests below.
    fn write_test_segment(rows: usize) -> Vec<u8> {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ])
        .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        for i in 0..rows {
            mt.append_row(&[
                Value::Integer(i as i64),
                Value::String(format!("user_{i}")),
                if i % 5 == 0 {
                    Value::Null
                } else {
                    Value::Float(i as f64 * 0.5)
                },
            ])
            .expect("append");
        }

        let (schema, columns, row_count) = mt.drain();
        SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count)
            .expect("write")
    }
555
    /// Single-block segment: the id column must decode fully with no nulls.
    #[test]
    fn read_int64_column() {
        let segment = write_test_segment(100);
        let reader = SegmentReader::open(&segment).expect("open");

        assert_eq!(reader.row_count(), 100);
        assert_eq!(reader.column_count(), 3);

        let col = reader.read_column(0).expect("read id column");
        match col {
            DecodedColumn::Int64 { values, valid } => {
                assert_eq!(values.len(), 100);
                assert_eq!(valid.len(), 100);
                assert_eq!(values[0], 0);
                assert_eq!(values[99], 99);
                assert!(valid.iter().all(|&v| v)); // No nulls in id.
            }
            _ => panic!("expected Int64"),
        }
    }
576
    /// String column round-trip: offsets must delimit each row's bytes.
    #[test]
    fn read_string_column() {
        let segment = write_test_segment(50);
        let reader = SegmentReader::open(&segment).expect("open");

        let col = reader.read_column(1).expect("read name column");
        match col {
            DecodedColumn::Binary {
                data,
                offsets,
                valid,
            } => {
                assert_eq!(valid.len(), 50);
                assert!(valid.iter().all(|&v| v));
                // Check first row.
                let start = offsets[0] as usize;
                let end = offsets[1] as usize;
                let first = std::str::from_utf8(&data[start..end]).expect("utf8");
                assert_eq!(first, "user_0");
                // Check last row.
                let start = offsets[49] as usize;
                let end = offsets[50] as usize;
                let last = std::str::from_utf8(&data[start..end]).expect("utf8");
                assert_eq!(last, "user_49");
            }
            _ => panic!("expected Binary (string)"),
        }
    }
605
    /// Nullable float column: validity bitmap must mark every 5th row null.
    #[test]
    fn read_float64_with_nulls() {
        let segment = write_test_segment(100);
        let reader = SegmentReader::open(&segment).expect("open");

        let col = reader.read_column(2).expect("read score column");
        // Score column uses AlpFastLanesLz4 → decoded as Float64.
        let (values, valid) = match &col {
            DecodedColumn::Float64 { values, valid } => (values.as_slice(), valid.as_slice()),
            other => panic!("expected Float64, got {other:?}"),
        };

        // Float64 column: every 5th row is null (rows 0,5,10,...,95 = 20 nulls).
        assert_eq!(valid.len(), 100);
        let null_count = valid.iter().filter(|&&v| !v).count();
        assert_eq!(null_count, 20);

        // Row 1: score = 1 * 0.5 = 0.5
        assert!(valid[1]);
        assert!((values[1] - 0.5).abs() < 0.001);
    }
627
    /// Numeric zone-map pushdown: blocks whose max is below the predicate
    /// threshold are null-filled, the matching block is decoded.
    #[test]
    fn predicate_pushdown_skips_blocks() {
        // Create a segment with multiple blocks (> 1024 rows).
        let segment = write_test_segment(2500);
        let reader = SegmentReader::open(&segment).expect("open");

        // id column has 3 blocks: [0..1023], [1024..2047], [2048..2499].
        let footer = reader.footer();
        assert_eq!(footer.columns[0].block_count, 3);

        // Predicate: id > 2100 → should skip blocks 0 and 1.
        let pred = ScanPredicate::gt(0, 2100.0);
        let col = reader
            .read_column_filtered(0, &[pred])
            .expect("filtered read");

        match col {
            DecodedColumn::Int64 { values, valid } => {
                assert_eq!(values.len(), 2500);
                // Blocks 0 and 1 should be null-filled (skipped).
                assert!(!valid[0]); // Block 0 row 0: skipped.
                assert!(!valid[1023]); // Block 0 last row: skipped.
                assert!(!valid[1024]); // Block 1 first row: skipped.
                assert!(!valid[2047]); // Block 1 last row: skipped.
                // Block 2 should be present.
                assert!(valid[2048]); // Block 2 first row: present.
                assert_eq!(values[2048], 2048);
                assert!(valid[2499]);
                assert_eq!(values[2499], 2499);
            }
            _ => panic!("expected Int64"),
        }
    }
661
    /// Column projection: only the requested columns are decoded.
    #[test]
    fn read_multiple_columns() {
        let segment = write_test_segment(50);
        let reader = SegmentReader::open(&segment).expect("open");

        let cols = reader.read_columns(&[0, 2], &[]).expect("read multi");
        assert_eq!(cols.len(), 2);

        // Column 0 (id): Int64.
        match &cols[0] {
            DecodedColumn::Int64 { values, .. } => {
                assert_eq!(values.len(), 50);
            }
            _ => panic!("expected Int64 for id"),
        }
    }
678
    /// An out-of-range column index is an error, not a panic.
    #[test]
    fn column_out_of_range() {
        let segment = write_test_segment(10);
        let reader = SegmentReader::open(&segment).expect("open");
        assert!(matches!(
            reader.read_column(99),
            Err(ColumnarError::ColumnOutOfRange { index: 99, .. })
        ));
    }
688
    /// Multi-block round-trip: values and validity must survive block
    /// boundaries (3000 rows span three 1024-row blocks).
    #[test]
    fn write_read_roundtrip_multi_block() {
        let segment = write_test_segment(3000);
        let reader = SegmentReader::open(&segment).expect("open");

        let col = reader.read_column(0).expect("read id");
        match col {
            DecodedColumn::Int64 { values, valid } => {
                assert_eq!(values.len(), 3000);
                for i in 0..3000 {
                    assert!(valid[i], "row {i} should be valid");
                    assert_eq!(values[i], i as i64, "row {i} value mismatch");
                }
            }
            _ => panic!("expected Int64"),
        }
    }
706
    /// String zone-map pushdown: a lexicographic predicate skips the block
    /// whose str_max sorts below the bound, and the other block decodes.
    #[test]
    fn string_predicate_pushdown_skips_blocks() {
        use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
        use nodedb_types::value::Value;

        // Two-block segment:
        //   Block 0 (1024 rows): "aaaa_NNNN" → lexicographic range [aaaa_0000, aaaa_1023]
        //   Block 1 (476 rows):  "zzzz_NNNN" → range [zzzz_1024, zzzz_1499]
        let schema = ColumnarSchema::new(vec![ColumnDef::required("tag", ColumnType::String)])
            .expect("valid");

        let mut mt = crate::memtable::ColumnarMemtable::new(&schema);
        for i in 0..1024usize {
            mt.append_row(&[Value::String(format!("aaaa_{i:04}"))])
                .expect("append");
        }
        for i in 1024..1500usize {
            mt.append_row(&[Value::String(format!("zzzz_{i}"))])
                .expect("append");
        }

        let (schema, columns, row_count) = mt.drain();
        let segment = crate::writer::SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count)
            .expect("write");

        let reader = SegmentReader::open(&segment).expect("open");
        let footer = reader.footer();

        // Confirm zone maps are populated on both blocks.
        let b0 = &footer.columns[0].block_stats[0];
        let b1 = &footer.columns[0].block_stats[1];
        assert!(b0.str_min.is_some(), "block 0 str_min missing");
        assert!(b1.str_min.is_some(), "block 1 str_min missing");

        // Predicate: tag >= "zzzz_0" → block 0 max ≈ "aaaa_..." < "zzzz_0" → skip block 0.
        let pred = ScanPredicate::str_gte(0, "zzzz_0");
        assert!(pred.can_skip_block(b0), "block 0 should be skippable");
        assert!(!pred.can_skip_block(b1), "block 1 should not be skipped");

        // End-to-end read: block 0 should be null-filled, block 1 decoded.
        let col = reader
            .read_column_filtered(0, &[pred])
            .expect("filtered read");
        match col {
            DecodedColumn::Binary { valid, .. } => {
                assert_eq!(valid.len(), 1500);
                // Block 0 (rows 0..1024) null-filled.
                assert!(!valid[0], "row 0 should be null-filled (skipped block)");
                assert!(!valid[1023], "row 1023 should be null-filled");
                // Block 1 (rows 1024..1500) present.
                assert!(valid[1024], "row 1024 should be valid");
                assert!(valid[1499], "row 1499 should be valid");
            }
            _ => panic!("expected Binary for string column"),
        }
    }
764
    /// Write a segment from a memtable that has been dict-encoded, then read back
    /// and verify the dictionary and IDs match the original values.
    #[test]
    fn dict_encoded_roundtrip() {
        use crate::memtable::{ColumnData, ColumnarMemtable, DICT_ENCODE_MAX_CARDINALITY};
        use crate::writer::SegmentWriter;

        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("qtype", ColumnType::String),
        ])
        .expect("valid");

        // 8 distinct values cycled over 100 rows — low cardinality, so the
        // column qualifies for dictionary encoding.
        let qtypes = ["A", "AAAA", "MX", "NS", "SOA", "CNAME", "PTR", "TXT"];
        let mut mt = ColumnarMemtable::new(&schema);
        for (i, &q) in qtypes.iter().cycle().take(100).enumerate() {
            mt.append_row(&[Value::Integer(i as i64), Value::String(q.into())])
                .expect("append");
        }

        // Convert low-cardinality string column to dict-encoded.
        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        // Verify it converted.
        assert!(matches!(mt.columns()[1], ColumnData::DictEncoded { .. }));

        let (schema, columns, row_count) = mt.drain();
        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count)
            .expect("write segment");

        // Read back.
        let reader = SegmentReader::open(&segment).expect("open");
        assert_eq!(reader.row_count(), 100);

        // The footer should record the dictionary.
        let dict_in_meta = reader.footer().columns[1].dictionary.as_deref();
        assert!(dict_in_meta.is_some(), "dictionary should be in ColumnMeta");
        let meta_dict = dict_in_meta.expect("present");
        assert_eq!(meta_dict.len(), 8, "8 distinct qtypes");

        // Read the qtype column — should come back as DictEncoded.
        let col = reader.read_column(1).expect("read qtype column");
        match col {
            DecodedColumn::DictEncoded {
                ids,
                dictionary,
                valid,
            } => {
                assert_eq!(ids.len(), 100);
                assert_eq!(valid.len(), 100);
                assert!(valid.iter().all(|&v| v));
                assert_eq!(dictionary.len(), 8);

                // Verify round-trip: each row's ID resolves to the original qtype.
                for (i, &q) in qtypes.iter().cycle().take(100).enumerate() {
                    let resolved = &dictionary[ids[i] as usize];
                    assert_eq!(resolved, q, "row {i}: expected {q}, got {resolved}");
                }
            }
            _ => panic!("expected DictEncoded, got {col:?}"),
        }
    }
828
    /// Dict-encoded column with nulls must decode with valid=false for null rows.
    #[test]
    fn dict_encoded_roundtrip_with_nulls() {
        use crate::memtable::{ColumnarMemtable, DICT_ENCODE_MAX_CARDINALITY};
        use crate::writer::SegmentWriter;

        let schema = ColumnarSchema::new(vec![ColumnDef::nullable("rcode", ColumnType::String)])
            .expect("valid");

        // Interleave values and nulls: rows 1 and 3 are NULL.
        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&[Value::String("NOERROR".into())])
            .expect("append");
        mt.append_row(&[Value::Null]).expect("null");
        mt.append_row(&[Value::String("NXDOMAIN".into())])
            .expect("append");
        mt.append_row(&[Value::Null]).expect("null");
        mt.append_row(&[Value::String("SERVFAIL".into())])
            .expect("append");

        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (schema, columns, row_count) = mt.drain();
        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count)
            .expect("write");

        let reader = SegmentReader::open(&segment).expect("open");
        let col = reader.read_column(0).expect("read");

        match col {
            DecodedColumn::DictEncoded {
                ids,
                dictionary,
                valid,
            } => {
                assert_eq!(ids.len(), 5);
                assert!(valid[0]);
                assert!(!valid[1]); // Null.
                assert!(valid[2]);
                assert!(!valid[3]); // Null.
                assert!(valid[4]);
                assert_eq!(dictionary.len(), 3);

                assert_eq!(&dictionary[ids[0] as usize], "NOERROR");
                assert_eq!(&dictionary[ids[2] as usize], "NXDOMAIN");
                assert_eq!(&dictionary[ids[4] as usize], "SERVFAIL");
            }
            _ => panic!("expected DictEncoded"),
        }
    }
879}