1use nodedb_codec::ColumnCodec;
14
15use crate::delete_bitmap::DeleteBitmap;
16use crate::error::ColumnarError;
17use crate::format::{ColumnMeta, HEADER_SIZE, SegmentFooter, SegmentHeader};
18use crate::predicate::ScanPredicate;
19
/// A fully materialized column read out of a segment.
///
/// Every variant carries a parallel `valid` vector: `valid[i] == false`
/// means row `i` is null and its value slot holds a filler (0 / 0.0 /
/// empty bytes). Skipped blocks are null-filled so row positions stay
/// aligned with the segment's global row numbering.
#[derive(Debug)]
pub enum DecodedColumn {
    /// Signed 64-bit integer column.
    Int64 {
        values: Vec<i64>,
        valid: Vec<bool>,
    },
    /// 64-bit floating point column.
    Float64 {
        values: Vec<f64>,
        valid: Vec<bool>,
    },
    /// Timestamp column stored as i64 (unit/epoch defined by the writer —
    /// not visible here).
    Timestamp {
        values: Vec<i64>,
        valid: Vec<bool>,
    },
    /// Boolean column.
    Bool {
        values: Vec<bool>,
        valid: Vec<bool>,
    },
    /// Variable-length binary / string column.
    Binary {
        /// Concatenated value bytes for all rows.
        data: Vec<u8>,
        /// Row boundaries into `data`: row `i` spans
        /// `data[offsets[i] as usize..offsets[i + 1] as usize]`
        /// (`rows + 1` entries total).
        offsets: Vec<u32>,
        valid: Vec<bool>,
    },
    /// Dictionary-encoded string column.
    DictEncoded {
        /// Per-row index into `dictionary`.
        ids: Vec<u32>,
        /// Distinct values, shared across all blocks of the column.
        dictionary: Vec<String>,
        valid: Vec<bool>,
    },
}
59
/// Zero-copy reader over a complete segment byte buffer.
///
/// Borrows the raw segment bytes and keeps only the parsed footer; column
/// data is decoded on demand, one read call at a time.
pub struct SegmentReader<'a> {
    /// The entire segment: header, length-prefixed column blocks, footer.
    data: &'a [u8],
    /// Parsed footer: row/column counts plus per-column metadata and
    /// per-block statistics.
    footer: SegmentFooter,
}
68
69impl<'a> SegmentReader<'a> {
70 pub fn open(data: &'a [u8]) -> Result<Self, ColumnarError> {
72 SegmentHeader::from_bytes(data)?;
73 let footer = SegmentFooter::from_segment_tail(data)?;
74 Ok(Self { data, footer })
75 }
76
77 pub fn footer(&self) -> &SegmentFooter {
79 &self.footer
80 }
81
82 pub fn row_count(&self) -> u64 {
84 self.footer.row_count
85 }
86
87 pub fn column_count(&self) -> usize {
89 self.footer.column_count as usize
90 }
91
92 pub fn read_column(&self, col_idx: usize) -> Result<DecodedColumn, ColumnarError> {
96 self.read_column_filtered(col_idx, &[])
97 }
98
99 pub fn read_column_filtered(
105 &self,
106 col_idx: usize,
107 predicates: &[ScanPredicate],
108 ) -> Result<DecodedColumn, ColumnarError> {
109 self.read_column_impl(col_idx, predicates, &DeleteBitmap::new())
110 }
111
112 pub fn read_columns(
117 &self,
118 col_indices: &[usize],
119 predicates: &[ScanPredicate],
120 ) -> Result<Vec<DecodedColumn>, ColumnarError> {
121 col_indices
122 .iter()
123 .map(|&idx| self.read_column_filtered(idx, predicates))
124 .collect()
125 }
126
127 pub fn read_column_with_deletes(
132 &self,
133 col_idx: usize,
134 predicates: &[ScanPredicate],
135 deletes: &DeleteBitmap,
136 ) -> Result<DecodedColumn, ColumnarError> {
137 self.read_column_impl(col_idx, predicates, deletes)
138 }
139
140 fn read_column_impl(
143 &self,
144 col_idx: usize,
145 predicates: &[ScanPredicate],
146 deletes: &DeleteBitmap,
147 ) -> Result<DecodedColumn, ColumnarError> {
148 if col_idx >= self.footer.columns.len() {
149 return Err(ColumnarError::ColumnOutOfRange {
150 index: col_idx,
151 count: self.footer.columns.len(),
152 });
153 }
154
155 let col_meta = &self.footer.columns[col_idx];
156 let my_preds: Vec<&ScanPredicate> =
157 predicates.iter().filter(|p| p.col_idx == col_idx).collect();
158
159 let col_start = HEADER_SIZE + col_meta.offset as usize;
160 let mut cursor = col_start;
161 let col_type = infer_column_type(col_meta);
162 let mut result = empty_decoded(&col_type);
163 let mut global_row: u32 = 0;
164
165 for block_stat in &col_meta.block_stats {
166 let block_row_count = block_stat.row_count;
167
168 if cursor + 4 > self.data.len() {
169 return Err(ColumnarError::TruncatedSegment {
170 expected: cursor + 4,
171 got: self.data.len(),
172 });
173 }
174 let block_len = u32::from_le_bytes([
175 self.data[cursor],
176 self.data[cursor + 1],
177 self.data[cursor + 2],
178 self.data[cursor + 3],
179 ]) as usize;
180 cursor += 4;
181 let block_data = &self.data[cursor..cursor + block_len];
182 cursor += block_len;
183
184 let pred_skip = my_preds.iter().any(|p| p.can_skip_block(block_stat));
186
187 let delete_skip =
189 !deletes.is_empty() && deletes.is_block_fully_deleted(global_row, block_row_count);
190
191 if pred_skip || delete_skip {
192 append_null_fill(&mut result, block_row_count as usize);
193 global_row += block_row_count;
194 continue;
195 }
196
197 let pre_len = result_valid_len(&result);
199 decode_block(
200 &mut result,
201 block_data,
202 &col_type,
203 col_meta.codec,
204 block_row_count as usize,
205 col_meta.dictionary.as_deref(),
206 )?;
207
208 if !deletes.is_empty() {
210 let valid_slice = result_valid_slice_mut(&mut result, pre_len);
211 deletes.apply_to_validity(valid_slice, global_row);
212 }
213
214 global_row += block_row_count;
215 }
216
217 Ok(result)
218 }
219
220 pub fn read_columns_with_deletes(
222 &self,
223 col_indices: &[usize],
224 predicates: &[ScanPredicate],
225 deletes: &DeleteBitmap,
226 ) -> Result<Vec<DecodedColumn>, ColumnarError> {
227 col_indices
228 .iter()
229 .map(|&idx| self.read_column_with_deletes(idx, predicates, deletes))
230 .collect()
231 }
232}
233
234fn infer_column_type(meta: &ColumnMeta) -> ColumnKind {
239 if meta.dictionary.is_some() {
242 return ColumnKind::DictEncoded;
243 }
244
245 match meta.codec {
246 ColumnCodec::DeltaFastLanesLz4
247 | ColumnCodec::DeltaFastLanesRans
248 | ColumnCodec::FastLanesLz4
249 | ColumnCodec::Delta
250 | ColumnCodec::DoubleDelta => ColumnKind::Int64,
251
252 ColumnCodec::AlpFastLanesLz4
253 | ColumnCodec::AlpFastLanesRans
254 | ColumnCodec::AlpRdLz4
255 | ColumnCodec::PcodecLz4
256 | ColumnCodec::Gorilla => ColumnKind::Float64,
257
258 ColumnCodec::FsstLz4 | ColumnCodec::FsstRans => ColumnKind::VarLen,
259
260 ColumnCodec::Lz4 | ColumnCodec::Raw | ColumnCodec::Zstd | ColumnCodec::Auto => {
263 if meta.block_stats.first().is_some_and(|s| !s.min.is_nan()) {
264 ColumnKind::Int64 } else {
266 ColumnKind::Binary
267 }
268 }
269 }
270}
271
/// Internal physical-type classification that drives block decoding.
#[derive(Debug, Clone, Copy)]
enum ColumnKind {
    /// i64 payloads (delta / fastlanes codec family).
    Int64,
    /// f64 payloads (ALP / pcodec / Gorilla codecs).
    Float64,
    /// Strings with an explicit per-block offsets section (FSST codecs).
    VarLen,
    /// Raw bytes with no stored offsets (generic codecs).
    Binary,
    /// Dictionary-encoded strings (dictionary carried in `ColumnMeta`).
    DictEncoded,
}
281
282fn empty_decoded(kind: &ColumnKind) -> DecodedColumn {
284 match kind {
285 ColumnKind::Int64 => DecodedColumn::Int64 {
286 values: Vec::new(),
287 valid: Vec::new(),
288 },
289 ColumnKind::Float64 => DecodedColumn::Float64 {
290 values: Vec::new(),
291 valid: Vec::new(),
292 },
293 ColumnKind::VarLen | ColumnKind::Binary => DecodedColumn::Binary {
294 data: Vec::new(),
295 offsets: Vec::new(),
296 valid: Vec::new(),
297 },
298 ColumnKind::DictEncoded => DecodedColumn::DictEncoded {
299 ids: Vec::new(),
300 dictionary: Vec::new(), valid: Vec::new(),
302 },
303 }
304}
305
306fn append_null_fill(result: &mut DecodedColumn, row_count: usize) {
308 match result {
309 DecodedColumn::Int64 { values, valid } => {
310 values.extend(std::iter::repeat_n(0i64, row_count));
311 valid.extend(std::iter::repeat_n(false, row_count));
312 }
313 DecodedColumn::Float64 { values, valid } => {
314 values.extend(std::iter::repeat_n(0.0f64, row_count));
315 valid.extend(std::iter::repeat_n(false, row_count));
316 }
317 DecodedColumn::Timestamp { values, valid } => {
318 values.extend(std::iter::repeat_n(0i64, row_count));
319 valid.extend(std::iter::repeat_n(false, row_count));
320 }
321 DecodedColumn::Bool { values, valid } => {
322 values.extend(std::iter::repeat_n(false, row_count));
323 valid.extend(std::iter::repeat_n(false, row_count));
324 }
325 DecodedColumn::Binary {
326 data: _,
327 offsets,
328 valid,
329 } => {
330 let last = *offsets.last().unwrap_or(&0);
331 if offsets.is_empty() {
334 offsets.push(last); }
336 offsets.extend(std::iter::repeat_n(last, row_count));
337 valid.extend(std::iter::repeat_n(false, row_count));
338 }
339 DecodedColumn::DictEncoded { ids, valid, .. } => {
340 ids.extend(std::iter::repeat_n(0u32, row_count));
341 valid.extend(std::iter::repeat_n(false, row_count));
342 }
343 }
344}
345
346fn result_valid_len(result: &DecodedColumn) -> usize {
348 match result {
349 DecodedColumn::Int64 { valid, .. }
350 | DecodedColumn::Float64 { valid, .. }
351 | DecodedColumn::Timestamp { valid, .. }
352 | DecodedColumn::Bool { valid, .. }
353 | DecodedColumn::Binary { valid, .. }
354 | DecodedColumn::DictEncoded { valid, .. } => valid.len(),
355 }
356}
357
358fn result_valid_slice_mut(result: &mut DecodedColumn, offset: usize) -> &mut [bool] {
360 match result {
361 DecodedColumn::Int64 { valid, .. }
362 | DecodedColumn::Float64 { valid, .. }
363 | DecodedColumn::Timestamp { valid, .. }
364 | DecodedColumn::Bool { valid, .. }
365 | DecodedColumn::Binary { valid, .. }
366 | DecodedColumn::DictEncoded { valid, .. } => &mut valid[offset..],
367 }
368}
369
370fn decode_block(
372 result: &mut DecodedColumn,
373 block_data: &[u8],
374 kind: &ColumnKind,
375 codec: ColumnCodec,
376 row_count: usize,
377 dictionary: Option<&[String]>,
378) -> Result<(), ColumnarError> {
379 let bitmap_size = row_count.div_ceil(8);
380
381 if block_data.len() < bitmap_size {
382 return Err(ColumnarError::TruncatedSegment {
383 expected: bitmap_size,
384 got: block_data.len(),
385 });
386 }
387
388 let bitmap = &block_data[..bitmap_size];
389 let payload = &block_data[bitmap_size..];
390
391 let valid: Vec<bool> = (0..row_count)
393 .map(|i| bitmap[i / 8] & (1 << (i % 8)) != 0)
394 .collect();
395
396 match kind {
397 ColumnKind::Int64 => {
398 let DecodedColumn::Int64 { values, valid: v } = result else {
399 append_null_fill(result, row_count);
400 return Ok(());
401 };
402 let decoded = nodedb_codec::decode_i64_pipeline(payload, codec)?;
403 values.extend_from_slice(&decoded[..row_count.min(decoded.len())]);
404 while values.len() < v.len() + row_count {
405 values.push(0);
406 }
407 v.extend_from_slice(&valid);
408 }
409 ColumnKind::Float64 => {
410 let DecodedColumn::Float64 { values, valid: v } = result else {
411 append_null_fill(result, row_count);
412 return Ok(());
413 };
414 let decoded = nodedb_codec::decode_f64_pipeline(payload, codec)?;
415 values.extend_from_slice(&decoded[..row_count.min(decoded.len())]);
416 while values.len() < v.len() + row_count {
417 values.push(0.0);
418 }
419 v.extend_from_slice(&valid);
420 }
421 ColumnKind::VarLen => {
422 let DecodedColumn::Binary {
423 data,
424 offsets,
425 valid: v,
426 } = result
427 else {
428 append_null_fill(result, row_count);
429 return Ok(());
430 };
431 if payload.len() < 4 {
433 return Err(ColumnarError::TruncatedSegment {
434 expected: bitmap_size + 4,
435 got: block_data.len(),
436 });
437 }
438 let offset_len =
439 u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]) as usize;
440 let offset_data = &payload[4..4 + offset_len];
441 let string_data = &payload[4 + offset_len..];
442
443 let decoded_offsets =
444 nodedb_codec::decode_i64_pipeline(offset_data, ColumnCodec::DeltaFastLanesLz4)?;
445 let decoded_bytes = nodedb_codec::decode_bytes_pipeline(string_data, codec)?;
446
447 let base = data.len() as u32;
450 let n_offsets = (row_count + 1).min(decoded_offsets.len());
451 for &off in &decoded_offsets[..n_offsets] {
452 offsets.push(base + off as u32);
453 }
454
455 data.extend_from_slice(&decoded_bytes);
456 v.extend_from_slice(&valid);
457 }
458 ColumnKind::Binary => {
459 let DecodedColumn::Binary {
460 data,
461 offsets,
462 valid: v,
463 } = result
464 else {
465 append_null_fill(result, row_count);
466 return Ok(());
467 };
468 let decoded_bytes = nodedb_codec::decode_bytes_pipeline(payload, codec)?;
469 let base = data.len() as u32;
470
471 if row_count > 0 && !decoded_bytes.is_empty() {
472 let chunk_size = decoded_bytes.len() / row_count;
473 for i in 0..row_count {
474 offsets.push(base + (i * chunk_size) as u32);
475 }
476 offsets.push(base + decoded_bytes.len() as u32);
477 } else {
478 let last = *offsets.last().unwrap_or(&0);
479 offsets.extend(std::iter::repeat_n(last, row_count + 1));
480 }
481
482 data.extend_from_slice(&decoded_bytes);
483 v.extend_from_slice(&valid);
484 }
485 ColumnKind::DictEncoded => {
486 let DecodedColumn::DictEncoded {
487 ids,
488 dictionary: col_dict,
489 valid: v,
490 } = result
491 else {
492 append_null_fill(result, row_count);
493 return Ok(());
494 };
495
496 let decoded =
498 nodedb_codec::decode_i64_pipeline(payload, ColumnCodec::DeltaFastLanesLz4)?;
499 let id_slice = &decoded[..row_count.min(decoded.len())];
500 ids.extend(id_slice.iter().map(|&id| id as u32));
501 while ids.len() < v.len() + row_count {
503 ids.push(0);
504 }
505 v.extend_from_slice(&valid);
506
507 if col_dict.is_empty()
509 && let Some(dict) = dictionary
510 {
511 col_dict.extend_from_slice(dict);
512 }
513 }
514 }
515
516 Ok(())
517}
518
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;
    use crate::memtable::ColumnarMemtable;
    use crate::writer::SegmentWriter;

    /// Builds a three-column segment (`id: i64 pk`, `name: string`,
    /// `score: nullable f64`) with `rows` rows; every 5th `score`
    /// (rows 0, 5, 10, ...) is null.
    fn write_test_segment(rows: usize) -> Vec<u8> {
        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ])
        .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        for i in 0..rows {
            mt.append_row(&[
                Value::Integer(i as i64),
                Value::String(format!("user_{i}")),
                if i % 5 == 0 {
                    Value::Null
                } else {
                    Value::Float(i as f64 * 0.5)
                },
            ])
            .expect("append");
        }

        let (schema, columns, row_count) = mt.drain();
        SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count)
            .expect("write")
    }

    #[test]
    fn read_int64_column() {
        let segment = write_test_segment(100);
        let reader = SegmentReader::open(&segment).expect("open");

        assert_eq!(reader.row_count(), 100);
        assert_eq!(reader.column_count(), 3);

        let col = reader.read_column(0).expect("read id column");
        match col {
            DecodedColumn::Int64 { values, valid } => {
                assert_eq!(values.len(), 100);
                assert_eq!(valid.len(), 100);
                assert_eq!(values[0], 0);
                assert_eq!(values[99], 99);
                assert!(valid.iter().all(|&v| v));
            }
            _ => panic!("expected Int64"),
        }
    }

    #[test]
    fn read_string_column() {
        let segment = write_test_segment(50);
        let reader = SegmentReader::open(&segment).expect("open");

        let col = reader.read_column(1).expect("read name column");
        match col {
            DecodedColumn::Binary {
                data,
                offsets,
                valid,
            } => {
                assert_eq!(valid.len(), 50);
                assert!(valid.iter().all(|&v| v));
                // First row: offsets pair [0] / [1] delimits "user_0".
                let start = offsets[0] as usize;
                let end = offsets[1] as usize;
                let first = std::str::from_utf8(&data[start..end]).expect("utf8");
                assert_eq!(first, "user_0");
                // Last row: offsets follow the `rows + 1` convention.
                let start = offsets[49] as usize;
                let end = offsets[50] as usize;
                let last = std::str::from_utf8(&data[start..end]).expect("utf8");
                assert_eq!(last, "user_49");
            }
            _ => panic!("expected Binary (string)"),
        }
    }

    #[test]
    fn read_float64_with_nulls() {
        let segment = write_test_segment(100);
        let reader = SegmentReader::open(&segment).expect("open");

        let col = reader.read_column(2).expect("read score column");
        let (values, valid) = match &col {
            DecodedColumn::Float64 { values, valid } => (values.as_slice(), valid.as_slice()),
            other => panic!("expected Float64, got {other:?}"),
        };

        assert_eq!(valid.len(), 100);
        // Every 5th row is null: 100 / 5 = 20 nulls.
        let null_count = valid.iter().filter(|&&v| !v).count();
        assert_eq!(null_count, 20);

        assert!(valid[1]);
        assert!((values[1] - 0.5).abs() < 0.001);
    }

    #[test]
    fn predicate_pushdown_skips_blocks() {
        // 2500 rows -> 3 blocks (1024 + 1024 + 452).
        let segment = write_test_segment(2500);
        let reader = SegmentReader::open(&segment).expect("open");

        let footer = reader.footer();
        assert_eq!(footer.columns[0].block_count, 3);

        // id > 2100 excludes the first two blocks entirely.
        let pred = ScanPredicate::gt(0, 2100.0);
        let col = reader
            .read_column_filtered(0, &[pred])
            .expect("filtered read");

        match col {
            DecodedColumn::Int64 { values, valid } => {
                // Row positions are preserved; skipped blocks are null-filled.
                assert_eq!(values.len(), 2500);
                assert!(!valid[0]);
                assert!(!valid[1023]);
                assert!(!valid[1024]);
                assert!(!valid[2047]);
                assert!(valid[2048]);
                assert_eq!(values[2048], 2048);
                assert!(valid[2499]);
                assert_eq!(values[2499], 2499);
            }
            _ => panic!("expected Int64"),
        }
    }

    #[test]
    fn read_multiple_columns() {
        let segment = write_test_segment(50);
        let reader = SegmentReader::open(&segment).expect("open");

        let cols = reader.read_columns(&[0, 2], &[]).expect("read multi");
        assert_eq!(cols.len(), 2);

        match &cols[0] {
            DecodedColumn::Int64 { values, .. } => {
                assert_eq!(values.len(), 50);
            }
            _ => panic!("expected Int64 for id"),
        }
    }

    #[test]
    fn column_out_of_range() {
        let segment = write_test_segment(10);
        let reader = SegmentReader::open(&segment).expect("open");
        assert!(matches!(
            reader.read_column(99),
            Err(ColumnarError::ColumnOutOfRange { index: 99, .. })
        ));
    }

    #[test]
    fn write_read_roundtrip_multi_block() {
        let segment = write_test_segment(3000);
        let reader = SegmentReader::open(&segment).expect("open");

        let col = reader.read_column(0).expect("read id");
        match col {
            DecodedColumn::Int64 { values, valid } => {
                assert_eq!(values.len(), 3000);
                for i in 0..3000 {
                    assert!(valid[i], "row {i} should be valid");
                    assert_eq!(values[i], i as i64, "row {i} value mismatch");
                }
            }
            _ => panic!("expected Int64"),
        }
    }

    #[test]
    fn string_predicate_pushdown_skips_blocks() {
        use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
        use nodedb_types::value::Value;

        // Block 0 holds "aaaa_*" values, block 1 holds "zzzz_*" values, so a
        // string predicate can exclude block 0 via its stats.
        let schema = ColumnarSchema::new(vec![ColumnDef::required("tag", ColumnType::String)])
            .expect("valid");

        let mut mt = crate::memtable::ColumnarMemtable::new(&schema);
        for i in 0..1024usize {
            mt.append_row(&[Value::String(format!("aaaa_{i:04}"))])
                .expect("append");
        }
        for i in 1024..1500usize {
            mt.append_row(&[Value::String(format!("zzzz_{i}"))])
                .expect("append");
        }

        let (schema, columns, row_count) = mt.drain();
        let segment = crate::writer::SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count)
            .expect("write");

        let reader = SegmentReader::open(&segment).expect("open");
        let footer = reader.footer();

        let b0 = &footer.columns[0].block_stats[0];
        let b1 = &footer.columns[0].block_stats[1];
        assert!(b0.str_min.is_some(), "block 0 str_min missing");
        assert!(b1.str_min.is_some(), "block 1 str_min missing");

        let pred = ScanPredicate::str_gte(0, "zzzz_0");
        assert!(pred.can_skip_block(b0), "block 0 should be skippable");
        assert!(!pred.can_skip_block(b1), "block 1 should not be skipped");

        let col = reader
            .read_column_filtered(0, &[pred])
            .expect("filtered read");
        match col {
            DecodedColumn::Binary { valid, .. } => {
                assert_eq!(valid.len(), 1500);
                assert!(!valid[0], "row 0 should be null-filled (skipped block)");
                assert!(!valid[1023], "row 1023 should be null-filled");
                assert!(valid[1024], "row 1024 should be valid");
                assert!(valid[1499], "row 1499 should be valid");
            }
            _ => panic!("expected Binary for string column"),
        }
    }

    #[test]
    fn dict_encoded_roundtrip() {
        use crate::memtable::{ColumnData, ColumnarMemtable, DICT_ENCODE_MAX_CARDINALITY};
        use crate::writer::SegmentWriter;

        let schema = ColumnarSchema::new(vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("qtype", ColumnType::String),
        ])
        .expect("valid");

        // Low-cardinality string column (8 distinct values over 100 rows).
        let qtypes = ["A", "AAAA", "MX", "NS", "SOA", "CNAME", "PTR", "TXT"];
        let mut mt = ColumnarMemtable::new(&schema);
        for (i, &q) in qtypes.iter().cycle().take(100).enumerate() {
            mt.append_row(&[Value::Integer(i as i64), Value::String(q.into())])
                .expect("append");
        }

        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        assert!(matches!(mt.columns()[1], ColumnData::DictEncoded { .. }));

        let (schema, columns, row_count) = mt.drain();
        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count)
            .expect("write segment");

        let reader = SegmentReader::open(&segment).expect("open");
        assert_eq!(reader.row_count(), 100);

        let dict_in_meta = reader.footer().columns[1].dictionary.as_deref();
        assert!(dict_in_meta.is_some(), "dictionary should be in ColumnMeta");
        let meta_dict = dict_in_meta.expect("present");
        assert_eq!(meta_dict.len(), 8, "8 distinct qtypes");

        let col = reader.read_column(1).expect("read qtype column");
        match col {
            DecodedColumn::DictEncoded {
                ids,
                dictionary,
                valid,
            } => {
                assert_eq!(ids.len(), 100);
                assert_eq!(valid.len(), 100);
                assert!(valid.iter().all(|&v| v));
                assert_eq!(dictionary.len(), 8);

                // Every id must resolve back to the original string.
                for (i, &q) in qtypes.iter().cycle().take(100).enumerate() {
                    let resolved = &dictionary[ids[i] as usize];
                    assert_eq!(resolved, q, "row {i}: expected {q}, got {resolved}");
                }
            }
            _ => panic!("expected DictEncoded, got {col:?}"),
        }
    }

    #[test]
    fn dict_encoded_roundtrip_with_nulls() {
        use crate::memtable::{ColumnarMemtable, DICT_ENCODE_MAX_CARDINALITY};
        use crate::writer::SegmentWriter;

        let schema = ColumnarSchema::new(vec![ColumnDef::nullable("rcode", ColumnType::String)])
            .expect("valid");

        let mut mt = ColumnarMemtable::new(&schema);
        mt.append_row(&[Value::String("NOERROR".into())])
            .expect("append");
        mt.append_row(&[Value::Null]).expect("null");
        mt.append_row(&[Value::String("NXDOMAIN".into())])
            .expect("append");
        mt.append_row(&[Value::Null]).expect("null");
        mt.append_row(&[Value::String("SERVFAIL".into())])
            .expect("append");

        mt.try_dict_encode_columns(DICT_ENCODE_MAX_CARDINALITY);

        let (schema, columns, row_count) = mt.drain();
        let segment = SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count)
            .expect("write");

        let reader = SegmentReader::open(&segment).expect("open");
        let col = reader.read_column(0).expect("read");

        match col {
            DecodedColumn::DictEncoded {
                ids,
                dictionary,
                valid,
            } => {
                assert_eq!(ids.len(), 5);
                assert!(valid[0]);
                assert!(!valid[1]);
                assert!(valid[2]);
                assert!(!valid[3]);
                assert!(valid[4]);
                // Nulls do not enter the dictionary.
                assert_eq!(dictionary.len(), 3);

                assert_eq!(&dictionary[ids[0] as usize], "NOERROR");
                assert_eq!(&dictionary[ids[2] as usize], "NXDOMAIN");
                assert_eq!(&dictionary[ids[4] as usize], "SERVFAIL");
            }
            _ => panic!("expected DictEncoded"),
        }
    }
}