1use alopex_core::columnar::encoding::Column;
2use alopex_core::columnar::encoding_v2::Bitmap;
3use alopex_core::columnar::kvs_bridge::key_layout;
4use alopex_core::columnar::segment_v2::{
5 ColumnSegmentV2, InMemorySegmentSource, RecordBatch, SegmentReaderV2,
6};
7use alopex_core::kv::{KVStore, KVTransaction};
8use alopex_core::storage::format::bincode_config;
9use bincode::config::Options;
10
11use crate::ast::expr::BinaryOp;
12use crate::catalog::{ColumnMetadata, RowIdMode, TableMetadata};
13use crate::columnar::statistics::RowGroupStatistics;
14use crate::executor::evaluator::{EvalContext, evaluate};
15use crate::executor::query::iterator::RowIterator;
16use crate::executor::{ExecutorError, Result, Row};
17use crate::planner::typed_expr::{Projection, TypedExpr, TypedExprKind};
18use crate::planner::types::ResolvedType;
19use crate::storage::{SqlTxn, SqlValue};
20use std::collections::BTreeSet;
21
/// Physical plan node describing a scan over a columnar table.
#[derive(Debug, Clone)]
pub struct ColumnarScan {
    /// Identifier of the scanned table; the execution entry points
    /// `debug_assert` that it matches the supplied `TableMetadata`.
    pub table_id: u32,
    /// Table column indices to decode. `create_columnar_scan_iterator` and
    /// `execute_columnar_scan` treat an empty list as "all columns".
    pub projected_columns: Vec<usize>,
    /// Filter checked against row-group statistics to skip whole row groups.
    pub pushed_filter: Option<PushdownFilter>,
    /// Predicate evaluated per row; only rows for which it yields `TRUE` are kept.
    pub residual_filter: Option<TypedExpr>,
}
30
/// Predicate shape that can be checked against row-group min/max/null statistics.
///
/// `ColumnarScan::evaluate_pushdown` returns `true` when a row group can be skipped.
#[derive(Debug, Clone, PartialEq)]
pub enum PushdownFilter {
    /// `column = value`.
    Eq {
        column_idx: usize,
        value: SqlValue,
    },
    /// `min <= column <= max`; either bound may be absent. Bounds are compared
    /// inclusively, so strict comparisons (`<`, `>`) are pruned conservatively.
    Range {
        column_idx: usize,
        min: Option<SqlValue>,
        max: Option<SqlValue>,
    },
    /// `column IS NULL` when `is_null` is true, `column IS NOT NULL` otherwise.
    IsNull {
        column_idx: usize,
        is_null: bool,
    },
    /// Conjunction of child filters.
    And(Vec<PushdownFilter>),
    /// Disjunction of child filters.
    Or(Vec<PushdownFilter>),
}
50
51impl ColumnarScan {
52 pub fn new(
53 table_id: u32,
54 projected_columns: Vec<usize>,
55 pushed_filter: Option<PushdownFilter>,
56 residual_filter: Option<TypedExpr>,
57 ) -> Self {
58 Self {
59 table_id,
60 projected_columns,
61 pushed_filter,
62 residual_filter,
63 }
64 }
65
66 pub fn should_skip_row_group(&self, stats: &RowGroupStatistics) -> bool {
68 match &self.pushed_filter {
69 None => false,
70 Some(filter) => Self::evaluate_pushdown(filter, stats),
71 }
72 }
73
74 pub fn evaluate_pushdown(filter: &PushdownFilter, stats: &RowGroupStatistics) -> bool {
76 match filter {
77 PushdownFilter::Eq { column_idx, value } => match stats.columns.get(*column_idx) {
78 Some(col_stats) => {
79 if col_stats.total_count == 0 {
80 return true;
81 }
82 if matches!(
83 value.partial_cmp(&col_stats.min),
84 Some(std::cmp::Ordering::Less)
85 ) {
86 return true;
87 }
88 matches!(
89 value.partial_cmp(&col_stats.max),
90 Some(std::cmp::Ordering::Greater)
91 )
92 }
93 None => false,
94 },
95
96 PushdownFilter::Range {
97 column_idx,
98 min,
99 max,
100 } => match stats.columns.get(*column_idx) {
101 Some(col_stats) => {
102 if col_stats.total_count == 0 {
103 return true;
104 }
105 if let Some(filter_min) = min
106 && matches!(
107 col_stats.max.partial_cmp(filter_min),
108 Some(std::cmp::Ordering::Less)
109 )
110 {
111 return true;
112 }
113 if let Some(filter_max) = max
114 && matches!(
115 col_stats.min.partial_cmp(filter_max),
116 Some(std::cmp::Ordering::Greater)
117 )
118 {
119 return true;
120 }
121 false
122 }
123 None => false,
124 },
125
126 PushdownFilter::IsNull {
127 column_idx,
128 is_null,
129 } => match stats.columns.get(*column_idx) {
130 Some(col_stats) => {
131 if *is_null {
132 col_stats.null_count == 0
133 } else {
134 col_stats.null_count == col_stats.total_count
135 }
136 }
137 None => false,
138 },
139
140 PushdownFilter::And(filters) => {
141 if filters.is_empty() {
142 return false;
143 }
144 filters.iter().any(|f| Self::evaluate_pushdown(f, stats))
145 }
146
147 PushdownFilter::Or(filters) => {
148 if filters.is_empty() {
149 return false;
150 }
151 filters.iter().all(|f| Self::evaluate_pushdown(f, stats))
152 }
153 }
154 }
155}
156
/// A segment opened for `ColumnarScanIterator`, together with the metadata
/// needed to prune its row groups and attach row ids.
struct LoadedSegment {
    /// Reader over the segment bytes (backed by an `InMemorySegmentSource`).
    reader: SegmentReaderV2,
    /// Per-row-group statistics, if stored and decodable. Only used for
    /// pruning when its length equals `row_groups.len()`.
    row_group_stats: Option<Vec<RowGroupStatistics>>,
    /// Row ids for the whole segment; may be empty. Sliced per row group using
    /// each group's `row_start` / `row_count`.
    row_ids: Vec<u64>,
    /// Row-group metadata copied from the segment's `meta`.
    row_groups: Vec<alopex_core::columnar::segment_v2::RowGroupMeta>,
}
172
/// Streaming iterator over the rows of a columnar table.
///
/// Segments are opened eagerly by `create_columnar_scan_iterator`; row groups
/// are decoded lazily, one at a time.
pub struct ColumnarScanIterator {
    /// All segments of the table, in the order listed by the segment index.
    segments: Vec<LoadedSegment>,
    /// Index into `segments` of the segment currently being read.
    segment_idx: usize,
    /// Index of the current row group within the current segment.
    row_group_idx: usize,
    /// Index of the next row to emit from `current_batch`.
    row_idx: usize,
    /// Decoded row group currently being consumed, if any.
    current_batch: Option<RecordBatch>,
    /// Table column indices decoded from each row group; position `i` matches
    /// the batch's column `i`.
    projected: Vec<usize>,
    /// Metadata of the scanned table.
    table_meta: TableMetadata,
    /// Schema reported through `RowIterator::schema` (all table columns).
    schema: Vec<ColumnMetadata>,
    /// Scan description (pushed and residual filters).
    scan: ColumnarScan,
    /// Position of a column named `row_id` (case-insensitive), looked up only
    /// when the table's row-id mode is `Direct`.
    row_id_col_idx: Option<usize>,
    /// Counter used to synthesize row ids when none are stored.
    next_row_id: u64,
}
203
204impl ColumnarScanIterator {
205 fn advance(&mut self) -> Option<Result<Row>> {
210 loop {
211 if self.current_batch.is_none() && !self.load_next_batch() {
213 return None; }
215
216 let row_count = match &self.current_batch {
218 Some(batch) => batch.num_rows(),
219 None => continue,
220 };
221
222 if self.row_idx >= row_count {
224 self.current_batch = None;
225 self.row_idx = 0;
226 self.row_group_idx += 1;
227 continue;
228 }
229
230 let row_idx = self.row_idx;
232 match self.convert_current_row(row_idx) {
233 Ok(Some(row)) => {
234 self.row_idx += 1;
235 return Some(Ok(row));
236 }
237 Ok(None) => {
238 self.row_idx += 1;
240 continue;
241 }
242 Err(e) => {
243 self.row_idx += 1;
244 return Some(Err(e));
245 }
246 }
247 }
248 }
249
250 fn load_next_batch(&mut self) -> bool {
254 while self.segment_idx < self.segments.len() {
255 let segment = &self.segments[self.segment_idx];
256 let row_group_count = segment.row_groups.len();
257
258 while self.row_group_idx < row_group_count {
259 let should_skip = match segment.row_group_stats.as_ref() {
261 Some(stats) if stats.len() == row_group_count => {
262 self.scan.should_skip_row_group(&stats[self.row_group_idx])
263 }
264 _ => false,
265 };
266
267 if should_skip {
268 self.row_group_idx += 1;
269 continue;
270 }
271
272 match segment
274 .reader
275 .read_row_group_by_index(&self.projected, self.row_group_idx)
276 {
277 Ok(mut batch) => {
278 if !segment.row_ids.is_empty()
280 && let Some(meta) = segment.row_groups.get(self.row_group_idx)
281 {
282 let start = meta.row_start as usize;
283 let end = start + meta.row_count as usize;
284 if end <= segment.row_ids.len() {
285 batch =
286 batch.with_row_ids(Some(segment.row_ids[start..end].to_vec()));
287 }
288 }
289 self.current_batch = Some(batch);
290 self.row_idx = 0;
291 return true;
292 }
293 Err(_) => {
294 self.row_group_idx += 1;
296 continue;
297 }
298 }
299 }
300
301 self.segment_idx += 1;
303 self.row_group_idx = 0;
304 }
305
306 false
307 }
308
309 fn convert_current_row(&mut self, row_idx: usize) -> Result<Option<Row>> {
313 let batch = self
314 .current_batch
315 .as_ref()
316 .ok_or_else(|| ExecutorError::Columnar("no current batch".into()))?;
317
318 let column_count = self.table_meta.column_count();
319 let mut values = vec![SqlValue::Null; column_count];
320
321 for (pos, &table_col_idx) in self.projected.iter().enumerate() {
322 let column = batch
323 .columns
324 .get(pos)
325 .ok_or_else(|| ExecutorError::Columnar("missing projected column".into()))?;
326 let bitmap = batch.null_bitmaps.get(pos).and_then(|b| b.as_ref());
327 let col_meta = self
328 .table_meta
329 .columns
330 .get(table_col_idx)
331 .ok_or_else(|| ExecutorError::Columnar("column index out of bounds".into()))?;
332 let value = value_from_column(column, bitmap, row_idx, &col_meta.data_type)?;
333 values[table_col_idx] = value;
334 }
335
336 if let Some(predicate) = self.scan.residual_filter.as_ref() {
338 let ctx = EvalContext::new(&values);
339 let keep = matches!(evaluate(predicate, &ctx)?, SqlValue::Boolean(true));
340 if !keep {
341 return Ok(None);
342 }
343 }
344
345 let batch = self
347 .current_batch
348 .as_ref()
349 .ok_or_else(|| ExecutorError::Columnar("no current batch".into()))?;
350
351 let row_id = match self.table_meta.storage_options.row_id_mode {
352 RowIdMode::Direct => {
353 if let Some(row_ids) = batch.row_ids.as_ref() {
354 *row_ids.get(row_idx).ok_or_else(|| {
355 ExecutorError::Columnar(
356 "row_id missing for row in row_id_mode=direct".into(),
357 )
358 })?
359 } else if let Some(idx) = self.row_id_col_idx {
360 let val = values.get(idx).ok_or_else(|| {
361 ExecutorError::Columnar("row_id column missing in projected values".into())
362 })?;
363 match val {
364 SqlValue::Integer(v) if *v >= 0 => *v as u64,
365 SqlValue::BigInt(v) if *v >= 0 => *v as u64,
366 other => {
367 return Err(ExecutorError::Columnar(format!(
368 "row_id column must be non-negative integer, got {}",
369 other.type_name()
370 )));
371 }
372 }
373 } else {
374 let rid = self.next_row_id;
375 self.next_row_id = self.next_row_id.saturating_add(1);
376 rid
377 }
378 }
379 RowIdMode::None => {
380 let rid = self.next_row_id;
381 self.next_row_id = self.next_row_id.saturating_add(1);
382 rid
383 }
384 };
385
386 Ok(Some(Row::new(row_id, values)))
387 }
388}
389
390impl RowIterator for ColumnarScanIterator {
391 fn next_row(&mut self) -> Option<Result<Row>> {
392 self.advance()
393 }
394
395 fn schema(&self) -> &[ColumnMetadata] {
396 &self.schema
397 }
398}
399
400pub fn create_columnar_scan_iterator<'txn, S: KVStore + 'txn>(
415 txn: &mut impl SqlTxn<'txn, S>,
416 table_meta: &TableMetadata,
417 scan: &ColumnarScan,
418) -> Result<ColumnarScanIterator> {
419 debug_assert_eq!(scan.table_id, table_meta.table_id);
420
421 let projected: Vec<usize> = if scan.projected_columns.is_empty() {
422 (0..table_meta.columns.len()).collect()
423 } else {
424 scan.projected_columns.clone()
425 };
426
427 let segment_ids = load_segment_index(txn, table_meta.table_id)?;
428
429 let row_id_col_idx = if table_meta.storage_options.row_id_mode == RowIdMode::Direct {
430 table_meta
431 .columns
432 .iter()
433 .position(|c| c.name.eq_ignore_ascii_case("row_id"))
434 } else {
435 None
436 };
437
438 let mut segments = Vec::with_capacity(segment_ids.len());
440 for segment_id in segment_ids {
441 let segment = load_segment(txn, table_meta.table_id, segment_id)?;
442 let reader =
443 SegmentReaderV2::open(Box::new(InMemorySegmentSource::new(segment.data.clone())))
444 .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
445 let row_group_stats = load_row_group_stats(txn, table_meta.table_id, segment_id);
446
447 segments.push(LoadedSegment {
448 reader,
449 row_group_stats,
450 row_ids: segment.row_ids.clone(),
451 row_groups: segment.meta.row_groups.clone(),
452 });
453 }
454
455 Ok(ColumnarScanIterator {
456 segments,
457 segment_idx: 0,
458 row_group_idx: 0,
459 row_idx: 0,
460 current_batch: None,
461 projected,
462 schema: table_meta.columns.clone(),
463 table_meta: table_meta.clone(),
464 scan: scan.clone(),
465 row_id_col_idx,
466 next_row_id: 0,
467 })
468}
469
470pub fn execute_columnar_scan<'txn, S: KVStore + 'txn>(
472 txn: &mut impl SqlTxn<'txn, S>,
473 table_meta: &TableMetadata,
474 scan: &ColumnarScan,
475) -> Result<Vec<Row>> {
476 debug_assert_eq!(scan.table_id, table_meta.table_id);
477 let projected: Vec<usize> = if scan.projected_columns.is_empty() {
478 (0..table_meta.columns.len()).collect()
479 } else {
480 scan.projected_columns.clone()
481 };
482
483 let segment_ids = load_segment_index(txn, table_meta.table_id)?;
484 if segment_ids.is_empty() {
485 return Ok(Vec::new());
486 }
487
488 let row_id_col_idx = if table_meta.storage_options.row_id_mode == RowIdMode::Direct {
489 table_meta
490 .columns
491 .iter()
492 .position(|c| c.name.eq_ignore_ascii_case("row_id"))
493 } else {
494 None
495 };
496
497 let mut results = Vec::new();
498 let mut next_row_id = 0u64;
499 for segment_id in segment_ids {
500 let segment = load_segment(txn, table_meta.table_id, segment_id)?;
501 let reader =
502 SegmentReaderV2::open(Box::new(InMemorySegmentSource::new(segment.data.clone())))
503 .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
504
505 let row_group_stats = load_row_group_stats(txn, table_meta.table_id, segment_id);
506 let row_group_count = segment.meta.row_groups.len();
507 for rg_index in 0..row_group_count {
508 let should_skip = match row_group_stats.as_ref() {
509 Some(stats) if stats.len() == row_group_count => {
510 scan.should_skip_row_group(&stats[rg_index])
511 }
512 _ => false,
513 };
514 if should_skip {
515 continue;
516 }
517
518 let batch = reader
519 .read_row_group_by_index(&projected, rg_index)
520 .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
521 let batch = if !segment.row_ids.is_empty() {
522 if let Some(meta) = segment.meta.row_groups.get(rg_index) {
523 let start = meta.row_start as usize;
524 let end = start + meta.row_count as usize;
525 if end <= segment.row_ids.len() {
526 batch.with_row_ids(Some(segment.row_ids[start..end].to_vec()))
527 } else {
528 batch
529 }
530 } else {
531 batch
532 }
533 } else {
534 batch
535 };
536 append_rows_from_batch(
537 &mut results,
538 &batch,
539 table_meta,
540 &projected,
541 scan.residual_filter.as_ref(),
542 table_meta.storage_options.row_id_mode,
543 row_id_col_idx,
544 &mut next_row_id,
545 )?;
546 }
547 }
548
549 Ok(results)
550}
551
552pub fn execute_columnar_row_ids<'txn, S: KVStore + 'txn>(
557 txn: &mut impl SqlTxn<'txn, S>,
558 table_meta: &TableMetadata,
559 scan: &ColumnarScan,
560) -> Result<Vec<u64>> {
561 if table_meta.storage_options.storage_type != crate::catalog::StorageType::Columnar {
562 return Err(ExecutorError::Columnar(
563 "execute_columnar_row_ids requires columnar storage".into(),
564 ));
565 }
566
567 let mut needed: BTreeSet<usize> = scan.projected_columns.iter().copied().collect();
568 if let Some(pred) = &scan.residual_filter {
569 collect_column_indices(pred, &mut needed);
570 }
571 let projected: Vec<usize> = needed.into_iter().collect();
572
573 let segment_ids = load_segment_index(txn, table_meta.table_id)?;
574 if segment_ids.is_empty() {
575 return Ok(Vec::new());
576 }
577
578 let row_id_col_idx = if table_meta.storage_options.row_id_mode == RowIdMode::Direct {
579 table_meta
580 .columns
581 .iter()
582 .position(|c| c.name.eq_ignore_ascii_case("row_id"))
583 } else {
584 None
585 };
586
587 let mut results = Vec::new();
588 let mut next_row_id = 0u64;
589 for segment_id in segment_ids {
590 let segment = load_segment(txn, table_meta.table_id, segment_id)?;
591 let reader =
592 SegmentReaderV2::open(Box::new(InMemorySegmentSource::new(segment.data.clone())))
593 .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
594
595 let row_group_stats = load_row_group_stats(txn, table_meta.table_id, segment_id);
596 let row_group_count = segment.meta.row_groups.len();
597 for rg_index in 0..row_group_count {
598 let should_skip = match row_group_stats.as_ref() {
599 Some(stats) if stats.len() == row_group_count => {
600 scan.should_skip_row_group(&stats[rg_index])
601 }
602 _ => false,
603 };
604 if should_skip {
605 continue;
606 }
607
608 let batch = reader
609 .read_row_group_by_index(&projected, rg_index)
610 .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
611 let batch = if !segment.row_ids.is_empty() {
612 if let Some(meta) = segment.meta.row_groups.get(rg_index) {
613 let start = meta.row_start as usize;
614 let end = start + meta.row_count as usize;
615 if end <= segment.row_ids.len() {
616 batch.with_row_ids(Some(segment.row_ids[start..end].to_vec()))
617 } else {
618 batch
619 }
620 } else {
621 batch
622 }
623 } else {
624 batch
625 };
626
627 let row_count = batch.num_rows();
628 for row_idx in 0..row_count {
629 let mut values = vec![SqlValue::Null; table_meta.column_count()];
631 for (pos, &table_col_idx) in projected.iter().enumerate() {
632 let column = batch.columns.get(pos).ok_or_else(|| {
633 ExecutorError::Columnar("missing projected column".into())
634 })?;
635 let bitmap = batch.null_bitmaps.get(pos).and_then(|b| b.as_ref());
636 let value = value_from_column(
637 column,
638 bitmap,
639 row_idx,
640 &table_meta
641 .columns
642 .get(table_col_idx)
643 .ok_or_else(|| {
644 ExecutorError::Columnar("column index out of bounds".into())
645 })?
646 .data_type,
647 )?;
648 values[table_col_idx] = value;
649 }
650
651 if let Some(predicate) = scan.residual_filter.as_ref() {
652 let ctx = EvalContext::new(&values);
653 let keep = matches!(evaluate(predicate, &ctx)?, SqlValue::Boolean(true));
654 if !keep {
655 continue;
656 }
657 }
658
659 let row_id = match table_meta.storage_options.row_id_mode {
660 RowIdMode::Direct => {
661 if let Some(row_ids) = batch.row_ids.as_ref() {
662 *row_ids.get(row_idx).ok_or_else(|| {
663 ExecutorError::Columnar(
664 "row_id missing for row in row_id_mode=direct".into(),
665 )
666 })?
667 } else if let Some(idx) = row_id_col_idx {
668 let val = values.get(idx).ok_or_else(|| {
669 ExecutorError::Columnar(
670 "row_id column missing in projected values".into(),
671 )
672 })?;
673 match val {
674 SqlValue::Integer(v) if *v >= 0 => *v as u64,
675 SqlValue::BigInt(v) if *v >= 0 => *v as u64,
676 other => {
677 return Err(ExecutorError::Columnar(format!(
678 "row_id column must be non-negative integer, got {}",
679 other.type_name()
680 )));
681 }
682 }
683 } else {
684 let rid = next_row_id;
685 next_row_id = next_row_id.saturating_add(1);
686 rid
687 }
688 }
689 RowIdMode::None => {
690 let rid = next_row_id;
691 next_row_id = next_row_id.saturating_add(1);
692 rid
693 }
694 };
695 results.push(row_id);
696 }
697 }
698 }
699
700 Ok(results)
701}
702
703pub fn expr_to_pushdown(expr: &TypedExpr) -> Option<PushdownFilter> {
705 match &expr.kind {
706 TypedExprKind::BinaryOp { left, op, right } => match op {
707 BinaryOp::And => {
708 let l = expr_to_pushdown(left)?;
709 let r = expr_to_pushdown(right)?;
710 Some(PushdownFilter::And(vec![l, r]))
711 }
712 BinaryOp::Or => {
713 let l = expr_to_pushdown(left)?;
714 let r = expr_to_pushdown(right)?;
715 Some(PushdownFilter::Or(vec![l, r]))
716 }
717 BinaryOp::Eq => extract_eq(left, right),
718 BinaryOp::Lt | BinaryOp::LtEq | BinaryOp::Gt | BinaryOp::GtEq => {
719 extract_range(op, left, right)
720 }
721 _ => None,
722 },
723 TypedExprKind::Between {
724 expr,
725 low,
726 high,
727 negated,
728 } => {
729 if *negated {
730 return None;
731 }
732 let (column_idx, value_min, value_max) = match expr.kind {
733 TypedExprKind::ColumnRef { column_index, .. } => {
734 let low_v = literal_value(low)?;
735 let high_v = literal_value(high)?;
736 (column_index, low_v, high_v)
737 }
738 _ => return None,
739 };
740 Some(PushdownFilter::Range {
741 column_idx,
742 min: Some(value_min),
743 max: Some(value_max),
744 })
745 }
746 TypedExprKind::IsNull { expr, negated } => match expr.kind {
747 TypedExprKind::ColumnRef { column_index, .. } => Some(PushdownFilter::IsNull {
748 column_idx: column_index,
749 is_null: !negated,
750 }),
751 _ => None,
752 },
753 _ => None,
754 }
755}
756
757fn extract_eq(left: &TypedExpr, right: &TypedExpr) -> Option<PushdownFilter> {
758 if let Some((col_idx, value)) = extract_column_literal(left, right) {
759 return Some(PushdownFilter::Eq {
760 column_idx: col_idx,
761 value,
762 });
763 }
764 if let Some((col_idx, value)) = extract_column_literal(right, left) {
765 return Some(PushdownFilter::Eq {
766 column_idx: col_idx,
767 value,
768 });
769 }
770 None
771}
772
773fn extract_range(op: &BinaryOp, left: &TypedExpr, right: &TypedExpr) -> Option<PushdownFilter> {
774 match (
775 extract_column_literal(left, right),
776 extract_column_literal(right, left),
777 ) {
778 (Some((col_idx, value)), _) => match op {
779 BinaryOp::Lt | BinaryOp::LtEq => Some(PushdownFilter::Range {
780 column_idx: col_idx,
781 min: None,
782 max: Some(value),
783 }),
784 BinaryOp::Gt | BinaryOp::GtEq => Some(PushdownFilter::Range {
785 column_idx: col_idx,
786 min: Some(value),
787 max: None,
788 }),
789 _ => None,
790 },
791 (_, Some((col_idx, value))) => match op {
792 BinaryOp::Lt | BinaryOp::LtEq => Some(PushdownFilter::Range {
793 column_idx: col_idx,
794 min: Some(value),
795 max: None,
796 }),
797 BinaryOp::Gt | BinaryOp::GtEq => Some(PushdownFilter::Range {
798 column_idx: col_idx,
799 min: None,
800 max: Some(value),
801 }),
802 _ => None,
803 },
804 _ => None,
805 }
806}
807
808fn extract_column_literal(
809 column_expr: &TypedExpr,
810 literal_expr: &TypedExpr,
811) -> Option<(usize, SqlValue)> {
812 match column_expr.kind {
813 TypedExprKind::ColumnRef { column_index, .. } => {
814 let value = literal_value(literal_expr)?;
815 Some((column_index, value))
816 }
817 _ => None,
818 }
819}
820
821fn literal_value(expr: &TypedExpr) -> Option<SqlValue> {
822 match &expr.kind {
823 TypedExprKind::Literal(_) | TypedExprKind::VectorLiteral(_) => {
824 evaluate(expr, &EvalContext::new(&[])).ok()
825 }
826 _ => None,
827 }
828}
829
830pub fn projection_to_columns(projection: &Projection, table_meta: &TableMetadata) -> Vec<usize> {
832 match projection {
833 Projection::All(names) => names
834 .iter()
835 .filter_map(|name| table_meta.columns.iter().position(|c| &c.name == name))
836 .collect(),
837 Projection::Columns(cols) => {
838 let mut indices = BTreeSet::new();
839 for col in cols {
840 collect_column_indices(&col.expr, &mut indices);
841 }
842 if indices.is_empty() {
843 return (0..table_meta.columns.len()).collect();
844 }
845 indices
846 .into_iter()
847 .filter(|idx| *idx < table_meta.columns.len())
848 .collect()
849 }
850 }
851}
852
853pub fn build_columnar_scan_for_filter(
855 table_meta: &TableMetadata,
856 projection: Projection,
857 predicate: &TypedExpr,
858) -> ColumnarScan {
859 let mut projected_columns = projection_to_columns(&projection, table_meta);
860 let mut predicate_indices = BTreeSet::new();
861 collect_column_indices(predicate, &mut predicate_indices);
862 for idx in predicate_indices {
863 if !projected_columns.contains(&idx) {
864 projected_columns.push(idx);
865 }
866 }
867 projected_columns.sort_unstable();
868 let pushed_filter = expr_to_pushdown(predicate);
869 ColumnarScan::new(
870 table_meta.table_id,
871 projected_columns,
872 pushed_filter,
873 Some(predicate.clone()),
874 )
875}
876
877pub fn build_columnar_scan(table_meta: &TableMetadata, projection: &Projection) -> ColumnarScan {
879 let projected_columns = projection_to_columns(projection, table_meta);
880 ColumnarScan::new(table_meta.table_id, projected_columns, None, None)
881}
882
883fn collect_column_indices(expr: &TypedExpr, acc: &mut BTreeSet<usize>) {
885 match &expr.kind {
886 TypedExprKind::ColumnRef { column_index, .. } => {
887 acc.insert(*column_index);
888 }
889 TypedExprKind::BinaryOp { left, right, .. } => {
890 collect_column_indices(left, acc);
891 collect_column_indices(right, acc);
892 }
893 TypedExprKind::UnaryOp { operand, .. } => collect_column_indices(operand, acc),
894 TypedExprKind::Between {
895 expr, low, high, ..
896 } => {
897 collect_column_indices(expr, acc);
898 collect_column_indices(low, acc);
899 collect_column_indices(high, acc);
900 }
901 TypedExprKind::InList { expr, list, .. } => {
902 collect_column_indices(expr, acc);
903 for item in list {
904 collect_column_indices(item, acc);
905 }
906 }
907 TypedExprKind::IsNull { expr, .. } => collect_column_indices(expr, acc),
908 TypedExprKind::FunctionCall { args, .. } => {
909 for arg in args {
910 collect_column_indices(arg, acc);
911 }
912 }
913 _ => {}
914 }
915}
916
917fn load_segment_index<'txn, S: KVStore + 'txn>(
918 txn: &mut impl SqlTxn<'txn, S>,
919 table_id: u32,
920) -> Result<Vec<u64>> {
921 let key = key_layout::segment_index_key(table_id);
922 let bytes = txn.inner_mut().get(&key)?;
923 if let Some(raw) = bytes {
924 bincode_config()
925 .deserialize(&raw)
926 .map_err(|e| ExecutorError::Columnar(e.to_string()))
927 } else {
928 Ok(Vec::new())
929 }
930}
931
932fn load_segment<'txn, S: KVStore + 'txn>(
933 txn: &mut impl SqlTxn<'txn, S>,
934 table_id: u32,
935 segment_id: u64,
936) -> Result<ColumnSegmentV2> {
937 let key = key_layout::column_segment_key(table_id, segment_id, 0);
938 let bytes = txn
939 .inner_mut()
940 .get(&key)?
941 .ok_or_else(|| ExecutorError::Columnar(format!("segment {segment_id} missing")))?;
942 bincode_config()
943 .deserialize(&bytes)
944 .map_err(|e| ExecutorError::Columnar(e.to_string()))
945}
946
947fn load_row_group_stats<'txn, S: KVStore + 'txn>(
948 txn: &mut impl SqlTxn<'txn, S>,
949 table_id: u32,
950 segment_id: u64,
951) -> Option<Vec<RowGroupStatistics>> {
952 let key = key_layout::row_group_stats_key(table_id, segment_id);
953 match txn.inner_mut().get(&key) {
954 Ok(Some(bytes)) => bincode_config().deserialize(&bytes).ok(),
955 Ok(None) => None,
956 Err(_) => None,
957 }
958}
959
960#[allow(clippy::too_many_arguments)]
961fn append_rows_from_batch(
962 out: &mut Vec<Row>,
963 batch: &alopex_core::columnar::segment_v2::RecordBatch,
964 table_meta: &TableMetadata,
965 projected: &[usize],
966 residual_filter: Option<&TypedExpr>,
967 row_id_mode: RowIdMode,
968 row_id_col_idx: Option<usize>,
969 next_row_id: &mut u64,
970) -> Result<()> {
971 if batch.columns.len() != projected.len() {
972 return Err(ExecutorError::Columnar(format!(
973 "projected column count mismatch: requested {}, got {}",
974 projected.len(),
975 batch.columns.len()
976 )));
977 }
978
979 let row_count = batch.num_rows();
980 for row_idx in 0..row_count {
981 let mut values = vec![SqlValue::Null; table_meta.column_count()];
982 for (pos, &table_col_idx) in projected.iter().enumerate() {
983 let column = batch
984 .columns
985 .get(pos)
986 .ok_or_else(|| ExecutorError::Columnar("missing projected column".into()))?;
987 let bitmap = batch.null_bitmaps.get(pos).and_then(|b| b.as_ref());
988 let value = value_from_column(
989 column,
990 bitmap,
991 row_idx,
992 &table_meta
993 .columns
994 .get(table_col_idx)
995 .ok_or_else(|| ExecutorError::Columnar("column index out of bounds".into()))?
996 .data_type,
997 )?;
998 values[table_col_idx] = value;
999 }
1000
1001 if let Some(predicate) = residual_filter {
1002 let ctx = EvalContext::new(&values);
1003 let keep = matches!(evaluate(predicate, &ctx)?, SqlValue::Boolean(true));
1004 if !keep {
1005 continue;
1006 }
1007 }
1008
1009 let row_id = match row_id_mode {
1010 RowIdMode::Direct => {
1011 if let Some(row_ids) = batch.row_ids.as_ref() {
1012 *row_ids.get(row_idx).ok_or_else(|| {
1013 ExecutorError::Columnar(
1014 "row_id missing for row in row_id_mode=direct".into(),
1015 )
1016 })?
1017 } else if let Some(idx) = row_id_col_idx {
1018 let val = values.get(idx).ok_or_else(|| {
1019 ExecutorError::Columnar("row_id column missing in projected values".into())
1020 })?;
1021 match val {
1022 SqlValue::Integer(v) if *v >= 0 => *v as u64,
1023 SqlValue::BigInt(v) if *v >= 0 => *v as u64,
1024 other => {
1025 return Err(ExecutorError::Columnar(format!(
1026 "row_id column must be non-negative integer, got {}",
1027 other.type_name()
1028 )));
1029 }
1030 }
1031 } else {
1032 let rid = *next_row_id;
1033 *next_row_id = next_row_id.saturating_add(1);
1034 rid
1035 }
1036 }
1037 RowIdMode::None => {
1038 let rid = *next_row_id;
1039 *next_row_id = next_row_id.saturating_add(1);
1040 rid
1041 }
1042 };
1043 out.push(Row::new(row_id, values));
1044 }
1045
1046 Ok(())
1047}
1048
1049fn value_from_column(
1050 column: &Column,
1051 bitmap: Option<&Bitmap>,
1052 row_idx: usize,
1053 ty: &ResolvedType,
1054) -> Result<SqlValue> {
1055 if let Some(bm) = bitmap
1056 && !bm.get(row_idx)
1057 {
1058 return Ok(SqlValue::Null);
1059 }
1060
1061 match (ty, column) {
1062 (ResolvedType::Integer, Column::Int64(values)) => {
1063 let v = *values
1064 .get(row_idx)
1065 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1066 Ok(SqlValue::Integer(v as i32))
1067 }
1068 (ResolvedType::BigInt | ResolvedType::Timestamp, Column::Int64(values)) => {
1069 let v = *values
1070 .get(row_idx)
1071 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1072 if matches!(ty, ResolvedType::Timestamp) {
1073 Ok(SqlValue::Timestamp(v))
1074 } else {
1075 Ok(SqlValue::BigInt(v))
1076 }
1077 }
1078 (ResolvedType::Float, Column::Float32(values)) => {
1079 let v = *values
1080 .get(row_idx)
1081 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1082 Ok(SqlValue::Float(v))
1083 }
1084 (ResolvedType::Double, Column::Float64(values)) => {
1085 let v = *values
1086 .get(row_idx)
1087 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1088 Ok(SqlValue::Double(v))
1089 }
1090 (ResolvedType::Boolean, Column::Bool(values)) => {
1091 let v = *values
1092 .get(row_idx)
1093 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1094 Ok(SqlValue::Boolean(v))
1095 }
1096 (ResolvedType::Text, Column::Binary(values)) => {
1097 let raw = values
1098 .get(row_idx)
1099 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1100 String::from_utf8(raw.clone())
1101 .map(SqlValue::Text)
1102 .map_err(|e| ExecutorError::Columnar(e.to_string()))
1103 }
1104 (ResolvedType::Blob, Column::Binary(values)) => {
1105 let raw = values
1106 .get(row_idx)
1107 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1108 Ok(SqlValue::Blob(raw.clone()))
1109 }
1110 (ResolvedType::Vector { .. }, Column::Fixed { values, .. }) => {
1111 let raw = values
1112 .get(row_idx)
1113 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1114 if raw.len() % 4 != 0 {
1115 return Err(ExecutorError::Columnar(
1116 "invalid vector byte length in columnar segment".into(),
1117 ));
1118 }
1119 let floats: Vec<f32> = raw
1120 .chunks_exact(4)
1121 .map(|bytes| f32::from_le_bytes(bytes.try_into().unwrap()))
1122 .collect();
1123 Ok(SqlValue::Vector(floats))
1124 }
1125 (_, Column::Binary(values)) => {
1126 let raw = values
1127 .get(row_idx)
1128 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1129 Ok(SqlValue::Blob(raw.clone()))
1130 }
1131 _ => Err(ExecutorError::Columnar(
1132 "unsupported column type for columnar read".into(),
1133 )),
1134 }
1135}
1136#[cfg(test)]
1137mod tests {
1138 use super::*;
1139 use crate::ast::expr::Literal;
1140 use crate::catalog::{ColumnMetadata, RowIdMode, TableMetadata};
1141 use crate::columnar::statistics::ColumnStatistics;
1142 use crate::planner::typed_expr::TypedExpr;
1143 use crate::planner::typed_expr::TypedExprKind;
1144 use crate::planner::types::ResolvedType;
1145 use crate::storage::TxnBridge;
1146 use alopex_core::kv::memory::MemoryKV;
1147 use bincode::config::Options;
1148 use std::sync::Arc;
1149
1150 #[test]
1151 fn evaluate_pushdown_eq_prunes_out_of_range() {
1152 let stats = RowGroupStatistics {
1153 row_count: 3,
1154 columns: vec![ColumnStatistics {
1155 min: SqlValue::Integer(1),
1156 max: SqlValue::Integer(3),
1157 null_count: 0,
1158 total_count: 3,
1159 distinct_count: None,
1160 }],
1161 row_id_min: None,
1162 row_id_max: None,
1163 };
1164 let filter = PushdownFilter::Eq {
1165 column_idx: 0,
1166 value: SqlValue::Integer(10),
1167 };
1168 assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
1169 }
1170
1171 #[test]
1172 fn evaluate_pushdown_range_allows_overlap() {
1173 let stats = RowGroupStatistics {
1174 row_count: 3,
1175 columns: vec![ColumnStatistics {
1176 min: SqlValue::Integer(5),
1177 max: SqlValue::Integer(10),
1178 null_count: 0,
1179 total_count: 3,
1180 distinct_count: None,
1181 }],
1182 row_id_min: None,
1183 row_id_max: None,
1184 };
1185 let filter = PushdownFilter::Range {
1186 column_idx: 0,
1187 min: Some(SqlValue::Integer(8)),
1188 max: Some(SqlValue::Integer(12)),
1189 };
1190 assert!(!ColumnarScan::evaluate_pushdown(&filter, &stats));
1191 }
1192
1193 #[test]
1194 fn evaluate_pushdown_is_null_skips_when_no_nulls() {
1195 let stats = RowGroupStatistics {
1196 row_count: 2,
1197 columns: vec![ColumnStatistics {
1198 min: SqlValue::Integer(1),
1199 max: SqlValue::Integer(2),
1200 null_count: 0,
1201 total_count: 2,
1202 distinct_count: None,
1203 }],
1204 row_id_min: None,
1205 row_id_max: None,
1206 };
1207 let filter = PushdownFilter::IsNull {
1208 column_idx: 0,
1209 is_null: true,
1210 };
1211 assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
1212 }
1213
1214 #[test]
1215 fn evaluate_pushdown_is_not_null_skips_when_all_null() {
1216 let stats = RowGroupStatistics {
1217 row_count: 2,
1218 columns: vec![ColumnStatistics {
1219 min: SqlValue::Null,
1220 max: SqlValue::Null,
1221 null_count: 2,
1222 total_count: 2,
1223 distinct_count: None,
1224 }],
1225 row_id_min: None,
1226 row_id_max: None,
1227 };
1228 let filter = PushdownFilter::IsNull {
1229 column_idx: 0,
1230 is_null: false,
1231 };
1232 assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
1233 }
1234
1235 #[test]
1236 fn evaluate_pushdown_and_prunes_if_any_branch_skips() {
1237 let stats = RowGroupStatistics {
1238 row_count: 3,
1239 columns: vec![ColumnStatistics {
1240 min: SqlValue::Integer(1),
1241 max: SqlValue::Integer(3),
1242 null_count: 0,
1243 total_count: 3,
1244 distinct_count: None,
1245 }],
1246 row_id_min: None,
1247 row_id_max: None,
1248 };
1249 let filter = PushdownFilter::And(vec![
1250 PushdownFilter::Eq {
1251 column_idx: 0,
1252 value: SqlValue::Integer(10),
1253 },
1254 PushdownFilter::Eq {
1255 column_idx: 0,
1256 value: SqlValue::Integer(2),
1257 },
1258 ]);
1259 assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
1260 }
1261
1262 #[test]
1263 fn evaluate_pushdown_or_keeps_if_any_branch_may_match() {
1264 let stats = RowGroupStatistics {
1265 row_count: 3,
1266 columns: vec![ColumnStatistics {
1267 min: SqlValue::Integer(1),
1268 max: SqlValue::Integer(3),
1269 null_count: 0,
1270 total_count: 3,
1271 distinct_count: None,
1272 }],
1273 row_id_min: None,
1274 row_id_max: None,
1275 };
1276 let filter = PushdownFilter::Or(vec![
1277 PushdownFilter::Eq {
1278 column_idx: 0,
1279 value: SqlValue::Integer(10),
1280 },
1281 PushdownFilter::Eq {
1282 column_idx: 0,
1283 value: SqlValue::Integer(2),
1284 },
1285 ]);
1286 assert!(!ColumnarScan::evaluate_pushdown(&filter, &stats));
1287 }
1288
1289 #[test]
1290 fn expr_to_pushdown_converts_eq() {
1291 let expr = TypedExpr {
1292 kind: TypedExprKind::BinaryOp {
1293 left: Box::new(TypedExpr::column_ref(
1294 "t".into(),
1295 "c".into(),
1296 0,
1297 ResolvedType::Integer,
1298 crate::Span::default(),
1299 )),
1300 op: BinaryOp::Eq,
1301 right: Box::new(TypedExpr::literal(
1302 Literal::Number("1".into()),
1303 ResolvedType::Integer,
1304 crate::Span::default(),
1305 )),
1306 },
1307 resolved_type: ResolvedType::Boolean,
1308 span: crate::Span::default(),
1309 };
1310 let filter = expr_to_pushdown(&expr).unwrap();
1311 assert_eq!(
1312 filter,
1313 PushdownFilter::Eq {
1314 column_idx: 0,
1315 value: SqlValue::Integer(1)
1316 }
1317 );
1318 }
1319
1320 #[test]
1321 fn execute_columnar_scan_applies_residual_filter() {
1322 let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
1323 let mut table = TableMetadata::new(
1324 "users",
1325 vec![
1326 ColumnMetadata::new("id", ResolvedType::Integer),
1327 ColumnMetadata::new("name", ResolvedType::Text),
1328 ],
1329 )
1330 .with_table_id(1);
1331 table.storage_options.storage_type = crate::catalog::StorageType::Columnar;
1332
1333 let schema = alopex_core::columnar::segment_v2::Schema {
1335 columns: vec![
1336 alopex_core::columnar::segment_v2::ColumnSchema {
1337 name: "id".into(),
1338 logical_type: alopex_core::columnar::encoding::LogicalType::Int64,
1339 nullable: false,
1340 fixed_len: None,
1341 },
1342 alopex_core::columnar::segment_v2::ColumnSchema {
1343 name: "name".into(),
1344 logical_type: alopex_core::columnar::encoding::LogicalType::Binary,
1345 nullable: false,
1346 fixed_len: None,
1347 },
1348 ],
1349 };
1350 let batch = alopex_core::columnar::segment_v2::RecordBatch::new(
1351 schema.clone(),
1352 vec![
1353 alopex_core::columnar::encoding::Column::Int64(vec![1]),
1354 alopex_core::columnar::encoding::Column::Binary(vec![b"alice".to_vec()]),
1355 ],
1356 vec![None, None],
1357 );
1358 let mut writer =
1359 alopex_core::columnar::segment_v2::SegmentWriterV2::new(Default::default());
1360 writer.write_batch(batch).unwrap();
1361 let segment = writer.finish().unwrap();
1362
1363 let stats = vec![crate::columnar::statistics::compute_row_group_statistics(
1364 &[vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]],
1365 )];
1366
1367 let mut txn = bridge.begin_write().unwrap();
1368 let segment_bytes = alopex_core::storage::format::bincode_config()
1369 .serialize(&segment)
1370 .unwrap();
1371 let meta_bytes = alopex_core::storage::format::bincode_config()
1372 .serialize(&segment.meta)
1373 .unwrap();
1374 let stats_bytes = alopex_core::storage::format::bincode_config()
1375 .serialize(&stats)
1376 .unwrap();
1377 txn.inner_mut()
1378 .put(
1379 alopex_core::columnar::kvs_bridge::key_layout::column_segment_key(1, 0, 0),
1380 segment_bytes,
1381 )
1382 .unwrap();
1383 txn.inner_mut()
1384 .put(
1385 alopex_core::columnar::kvs_bridge::key_layout::statistics_key(1, 0),
1386 meta_bytes,
1387 )
1388 .unwrap();
1389 txn.inner_mut()
1390 .put(
1391 alopex_core::columnar::kvs_bridge::key_layout::row_group_stats_key(1, 0),
1392 stats_bytes,
1393 )
1394 .unwrap();
1395 let index_bytes = alopex_core::storage::format::bincode_config()
1396 .serialize(&vec![0u64])
1397 .unwrap();
1398 txn.inner_mut()
1399 .put(
1400 alopex_core::columnar::kvs_bridge::key_layout::segment_index_key(1),
1401 index_bytes,
1402 )
1403 .unwrap();
1404 txn.commit().unwrap();
1405
1406 let scan = ColumnarScan::new(
1407 table.table_id,
1408 vec![0, 1],
1409 Some(PushdownFilter::Eq {
1410 column_idx: 0,
1411 value: SqlValue::Integer(1),
1412 }),
1413 Some(TypedExpr {
1414 kind: TypedExprKind::BinaryOp {
1415 left: Box::new(TypedExpr::column_ref(
1416 "users".into(),
1417 "id".into(),
1418 0,
1419 ResolvedType::Integer,
1420 crate::Span::default(),
1421 )),
1422 op: BinaryOp::Eq,
1423 right: Box::new(TypedExpr::literal(
1424 Literal::Number("1".into()),
1425 ResolvedType::Integer,
1426 crate::Span::default(),
1427 )),
1428 },
1429 resolved_type: ResolvedType::Boolean,
1430 span: crate::Span::default(),
1431 }),
1432 );
1433
1434 let mut read_txn = bridge.begin_read().unwrap();
1435 let rows = execute_columnar_scan(&mut read_txn, &table, &scan).unwrap();
1436 assert_eq!(rows.len(), 1);
1437 assert_eq!(rows[0].values[1], SqlValue::Text("alice".into()));
1438 }
1439
1440 #[test]
1441 fn rowid_mode_direct_prefers_rowid_column() {
1442 let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
1443 let mut table = TableMetadata::new(
1444 "items",
1445 vec![
1446 ColumnMetadata::new("row_id", ResolvedType::BigInt),
1447 ColumnMetadata::new("val", ResolvedType::Integer),
1448 ],
1449 )
1450 .with_table_id(20);
1451 table.storage_options.storage_type = crate::catalog::StorageType::Columnar;
1452 table.storage_options.row_id_mode = RowIdMode::Direct;
1453
1454 let schema = alopex_core::columnar::segment_v2::Schema {
1455 columns: vec![
1456 alopex_core::columnar::segment_v2::ColumnSchema {
1457 name: "row_id".into(),
1458 logical_type: alopex_core::columnar::encoding::LogicalType::Int64,
1459 nullable: false,
1460 fixed_len: None,
1461 },
1462 alopex_core::columnar::segment_v2::ColumnSchema {
1463 name: "val".into(),
1464 logical_type: alopex_core::columnar::encoding::LogicalType::Int64,
1465 nullable: false,
1466 fixed_len: None,
1467 },
1468 ],
1469 };
1470 let batch = alopex_core::columnar::segment_v2::RecordBatch::new(
1471 schema.clone(),
1472 vec![
1473 alopex_core::columnar::encoding::Column::Int64(vec![999]),
1474 alopex_core::columnar::encoding::Column::Int64(vec![7]),
1475 ],
1476 vec![None, None],
1477 );
1478 let mut writer =
1479 alopex_core::columnar::segment_v2::SegmentWriterV2::new(Default::default());
1480 writer.write_batch(batch).unwrap();
1481 let segment = writer.finish().unwrap();
1482 let stats = vec![crate::columnar::statistics::compute_row_group_statistics(
1483 &[vec![SqlValue::BigInt(999), SqlValue::Integer(7)]],
1484 )];
1485
1486 persist_segment_for_test(&bridge, table.table_id, &segment, &stats);
1487
1488 let scan = ColumnarScan::new(table.table_id, vec![0, 1], None, None);
1489 let mut read_txn = bridge.begin_read().unwrap();
1490 let rows = execute_columnar_scan(&mut read_txn, &table, &scan).unwrap();
1491 assert_eq!(rows.len(), 1);
1492 assert_eq!(rows[0].row_id, 999);
1493 assert_eq!(rows[0].values[1], SqlValue::Integer(7));
1494 }
1495
1496 #[test]
1497 fn rowid_mode_none_uses_position() {
1498 let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
1499 let mut table = TableMetadata::new(
1500 "items",
1501 vec![ColumnMetadata::new("val", ResolvedType::Integer)],
1502 )
1503 .with_table_id(21);
1504 table.storage_options.storage_type = crate::catalog::StorageType::Columnar;
1505 table.storage_options.row_id_mode = RowIdMode::Direct;
1506
1507 let schema = alopex_core::columnar::segment_v2::Schema {
1508 columns: vec![alopex_core::columnar::segment_v2::ColumnSchema {
1509 name: "val".into(),
1510 logical_type: alopex_core::columnar::encoding::LogicalType::Int64,
1511 nullable: false,
1512 fixed_len: None,
1513 }],
1514 };
1515 let batch = alopex_core::columnar::segment_v2::RecordBatch::new(
1516 schema.clone(),
1517 vec![alopex_core::columnar::encoding::Column::Int64(vec![3, 4])],
1518 vec![None],
1519 );
1520 let mut writer =
1521 alopex_core::columnar::segment_v2::SegmentWriterV2::new(Default::default());
1522 writer.write_batch(batch).unwrap();
1523 let segment = writer.finish().unwrap();
1524 let stats = vec![crate::columnar::statistics::compute_row_group_statistics(
1525 &[vec![SqlValue::Integer(3)], vec![SqlValue::Integer(4)]],
1526 )];
1527
1528 persist_segment_for_test(&bridge, table.table_id, &segment, &stats);
1529
1530 let scan = ColumnarScan::new(table.table_id, vec![0], None, None);
1531 let mut read_txn = bridge.begin_read().unwrap();
1532 let rows = execute_columnar_scan(&mut read_txn, &table, &scan).unwrap();
1533 assert_eq!(rows.len(), 2);
1534 assert_eq!(rows[0].row_id, 0);
1535 assert_eq!(rows[1].row_id, 1);
1536 }
1537
1538 fn persist_segment_for_test(
1539 bridge: &TxnBridge<MemoryKV>,
1540 table_id: u32,
1541 segment: &alopex_core::columnar::segment_v2::ColumnSegmentV2,
1542 row_group_stats: &[crate::columnar::statistics::RowGroupStatistics],
1543 ) {
1544 let mut txn = bridge.begin_write().unwrap();
1545 let segment_bytes = alopex_core::storage::format::bincode_config()
1546 .serialize(segment)
1547 .unwrap();
1548 let meta_bytes = alopex_core::storage::format::bincode_config()
1549 .serialize(&segment.meta)
1550 .unwrap();
1551 let stats_bytes = alopex_core::storage::format::bincode_config()
1552 .serialize(row_group_stats)
1553 .unwrap();
1554 txn.inner_mut()
1555 .put(
1556 alopex_core::columnar::kvs_bridge::key_layout::column_segment_key(table_id, 0, 0),
1557 segment_bytes,
1558 )
1559 .unwrap();
1560 txn.inner_mut()
1561 .put(
1562 alopex_core::columnar::kvs_bridge::key_layout::statistics_key(table_id, 0),
1563 meta_bytes,
1564 )
1565 .unwrap();
1566 txn.inner_mut()
1567 .put(
1568 alopex_core::columnar::kvs_bridge::key_layout::row_group_stats_key(table_id, 0),
1569 stats_bytes,
1570 )
1571 .unwrap();
1572 let index_bytes = alopex_core::storage::format::bincode_config()
1573 .serialize(&vec![0u64])
1574 .unwrap();
1575 txn.inner_mut()
1576 .put(
1577 alopex_core::columnar::kvs_bridge::key_layout::segment_index_key(table_id),
1578 index_bytes,
1579 )
1580 .unwrap();
1581 txn.commit().unwrap();
1582 }
1583}