1use alopex_core::columnar::encoding::Column;
2use alopex_core::columnar::encoding_v2::Bitmap;
3use alopex_core::columnar::kvs_bridge::key_layout;
4use alopex_core::columnar::segment_v2::{ColumnSegmentV2, InMemorySegmentSource, SegmentReaderV2};
5use alopex_core::kv::{KVStore, KVTransaction};
6use alopex_core::storage::format::bincode_config;
7use bincode::config::Options;
8
9use crate::ast::expr::BinaryOp;
10use crate::catalog::{RowIdMode, TableMetadata};
11use crate::columnar::statistics::RowGroupStatistics;
12use crate::executor::evaluator::{EvalContext, evaluate};
13use crate::executor::{ExecutorError, Result, Row};
14use crate::planner::typed_expr::{Projection, TypedExpr, TypedExprKind};
15use crate::planner::types::ResolvedType;
16use crate::storage::{SqlTxn, SqlValue};
17use std::collections::BTreeSet;
18
/// Description of a scan over a columnar table.
#[derive(Debug, Clone)]
pub struct ColumnarScan {
    /// Identifier of the table being scanned.
    pub table_id: u32,
    /// Table column indices to read. `execute_columnar_scan` treats an empty
    /// list as "every column of the table".
    pub projected_columns: Vec<usize>,
    /// Filter checked against row-group statistics so that whole row groups
    /// can be skipped (see `should_skip_row_group`).
    pub pushed_filter: Option<PushdownFilter>,
    /// Predicate evaluated against each materialised row; only rows for which
    /// it yields `SqlValue::Boolean(true)` are kept.
    pub residual_filter: Option<TypedExpr>,
}
27
/// Simplified predicate that can be checked against row-group statistics.
///
/// Produced from a typed expression by `expr_to_pushdown` and consumed by
/// `ColumnarScan::evaluate_pushdown`.
#[derive(Debug, Clone, PartialEq)]
pub enum PushdownFilter {
    /// `column = value`.
    Eq {
        /// Index of the column in the table.
        column_idx: usize,
        /// Literal the column is compared with.
        value: SqlValue,
    },
    /// Bounded comparison on a column; either bound may be absent.
    /// Strict and non-strict comparisons are both mapped to this variant.
    Range {
        /// Index of the column in the table.
        column_idx: usize,
        /// Lower bound, if any.
        min: Option<SqlValue>,
        /// Upper bound, if any.
        max: Option<SqlValue>,
    },
    /// `column IS NULL` (`is_null == true`) or `column IS NOT NULL`.
    IsNull {
        /// Index of the column in the table.
        column_idx: usize,
        /// `true` for `IS NULL`, `false` for `IS NOT NULL`.
        is_null: bool,
    },
    /// Conjunction of filters.
    And(Vec<PushdownFilter>),
    /// Disjunction of filters.
    Or(Vec<PushdownFilter>),
}
47
48impl ColumnarScan {
49 pub fn new(
50 table_id: u32,
51 projected_columns: Vec<usize>,
52 pushed_filter: Option<PushdownFilter>,
53 residual_filter: Option<TypedExpr>,
54 ) -> Self {
55 Self {
56 table_id,
57 projected_columns,
58 pushed_filter,
59 residual_filter,
60 }
61 }
62
63 pub fn should_skip_row_group(&self, stats: &RowGroupStatistics) -> bool {
65 match &self.pushed_filter {
66 None => false,
67 Some(filter) => Self::evaluate_pushdown(filter, stats),
68 }
69 }
70
71 pub fn evaluate_pushdown(filter: &PushdownFilter, stats: &RowGroupStatistics) -> bool {
73 match filter {
74 PushdownFilter::Eq { column_idx, value } => match stats.columns.get(*column_idx) {
75 Some(col_stats) => {
76 if col_stats.total_count == 0 {
77 return true;
78 }
79 if matches!(
80 value.partial_cmp(&col_stats.min),
81 Some(std::cmp::Ordering::Less)
82 ) {
83 return true;
84 }
85 matches!(
86 value.partial_cmp(&col_stats.max),
87 Some(std::cmp::Ordering::Greater)
88 )
89 }
90 None => false,
91 },
92
93 PushdownFilter::Range {
94 column_idx,
95 min,
96 max,
97 } => match stats.columns.get(*column_idx) {
98 Some(col_stats) => {
99 if col_stats.total_count == 0 {
100 return true;
101 }
102 if let Some(filter_min) = min
103 && matches!(
104 col_stats.max.partial_cmp(filter_min),
105 Some(std::cmp::Ordering::Less)
106 )
107 {
108 return true;
109 }
110 if let Some(filter_max) = max
111 && matches!(
112 col_stats.min.partial_cmp(filter_max),
113 Some(std::cmp::Ordering::Greater)
114 )
115 {
116 return true;
117 }
118 false
119 }
120 None => false,
121 },
122
123 PushdownFilter::IsNull {
124 column_idx,
125 is_null,
126 } => match stats.columns.get(*column_idx) {
127 Some(col_stats) => {
128 if *is_null {
129 col_stats.null_count == 0
130 } else {
131 col_stats.null_count == col_stats.total_count
132 }
133 }
134 None => false,
135 },
136
137 PushdownFilter::And(filters) => {
138 if filters.is_empty() {
139 return false;
140 }
141 filters.iter().any(|f| Self::evaluate_pushdown(f, stats))
142 }
143
144 PushdownFilter::Or(filters) => {
145 if filters.is_empty() {
146 return false;
147 }
148 filters.iter().all(|f| Self::evaluate_pushdown(f, stats))
149 }
150 }
151 }
152}
153
154pub fn execute_columnar_scan<'txn, S: KVStore + 'txn>(
156 txn: &mut impl SqlTxn<'txn, S>,
157 table_meta: &TableMetadata,
158 scan: &ColumnarScan,
159) -> Result<Vec<Row>> {
160 debug_assert_eq!(scan.table_id, table_meta.table_id);
161 let projected: Vec<usize> = if scan.projected_columns.is_empty() {
162 (0..table_meta.columns.len()).collect()
163 } else {
164 scan.projected_columns.clone()
165 };
166
167 let segment_ids = load_segment_index(txn, table_meta.table_id)?;
168 if segment_ids.is_empty() {
169 return Ok(Vec::new());
170 }
171
172 let row_id_col_idx = if table_meta.storage_options.row_id_mode == RowIdMode::Direct {
173 table_meta
174 .columns
175 .iter()
176 .position(|c| c.name.eq_ignore_ascii_case("row_id"))
177 } else {
178 None
179 };
180
181 let mut results = Vec::new();
182 let mut next_row_id = 0u64;
183 for segment_id in segment_ids {
184 let segment = load_segment(txn, table_meta.table_id, segment_id)?;
185 let reader =
186 SegmentReaderV2::open(Box::new(InMemorySegmentSource::new(segment.data.clone())))
187 .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
188
189 let row_group_stats = load_row_group_stats(txn, table_meta.table_id, segment_id);
190 let row_group_count = segment.meta.row_groups.len();
191 for rg_index in 0..row_group_count {
192 let should_skip = match row_group_stats.as_ref() {
193 Some(stats) if stats.len() == row_group_count => {
194 scan.should_skip_row_group(&stats[rg_index])
195 }
196 _ => false,
197 };
198 if should_skip {
199 continue;
200 }
201
202 let batch = reader
203 .read_row_group_by_index(&projected, rg_index)
204 .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
205 let batch = if !segment.row_ids.is_empty() {
206 if let Some(meta) = segment.meta.row_groups.get(rg_index) {
207 let start = meta.row_start as usize;
208 let end = start + meta.row_count as usize;
209 if end <= segment.row_ids.len() {
210 batch.with_row_ids(Some(segment.row_ids[start..end].to_vec()))
211 } else {
212 batch
213 }
214 } else {
215 batch
216 }
217 } else {
218 batch
219 };
220 append_rows_from_batch(
221 &mut results,
222 &batch,
223 table_meta,
224 &projected,
225 scan.residual_filter.as_ref(),
226 table_meta.storage_options.row_id_mode,
227 row_id_col_idx,
228 &mut next_row_id,
229 )?;
230 }
231 }
232
233 Ok(results)
234}
235
236pub fn execute_columnar_row_ids<'txn, S: KVStore + 'txn>(
241 txn: &mut impl SqlTxn<'txn, S>,
242 table_meta: &TableMetadata,
243 scan: &ColumnarScan,
244) -> Result<Vec<u64>> {
245 if table_meta.storage_options.storage_type != crate::catalog::StorageType::Columnar {
246 return Err(ExecutorError::Columnar(
247 "execute_columnar_row_ids requires columnar storage".into(),
248 ));
249 }
250
251 let mut needed: BTreeSet<usize> = scan.projected_columns.iter().copied().collect();
252 if let Some(pred) = &scan.residual_filter {
253 collect_column_indices(pred, &mut needed);
254 }
255 let projected: Vec<usize> = needed.into_iter().collect();
256
257 let segment_ids = load_segment_index(txn, table_meta.table_id)?;
258 if segment_ids.is_empty() {
259 return Ok(Vec::new());
260 }
261
262 let row_id_col_idx = if table_meta.storage_options.row_id_mode == RowIdMode::Direct {
263 table_meta
264 .columns
265 .iter()
266 .position(|c| c.name.eq_ignore_ascii_case("row_id"))
267 } else {
268 None
269 };
270
271 let mut results = Vec::new();
272 let mut next_row_id = 0u64;
273 for segment_id in segment_ids {
274 let segment = load_segment(txn, table_meta.table_id, segment_id)?;
275 let reader =
276 SegmentReaderV2::open(Box::new(InMemorySegmentSource::new(segment.data.clone())))
277 .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
278
279 let row_group_stats = load_row_group_stats(txn, table_meta.table_id, segment_id);
280 let row_group_count = segment.meta.row_groups.len();
281 for rg_index in 0..row_group_count {
282 let should_skip = match row_group_stats.as_ref() {
283 Some(stats) if stats.len() == row_group_count => {
284 scan.should_skip_row_group(&stats[rg_index])
285 }
286 _ => false,
287 };
288 if should_skip {
289 continue;
290 }
291
292 let batch = reader
293 .read_row_group_by_index(&projected, rg_index)
294 .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
295 let batch = if !segment.row_ids.is_empty() {
296 if let Some(meta) = segment.meta.row_groups.get(rg_index) {
297 let start = meta.row_start as usize;
298 let end = start + meta.row_count as usize;
299 if end <= segment.row_ids.len() {
300 batch.with_row_ids(Some(segment.row_ids[start..end].to_vec()))
301 } else {
302 batch
303 }
304 } else {
305 batch
306 }
307 } else {
308 batch
309 };
310
311 let row_count = batch.num_rows();
312 for row_idx in 0..row_count {
313 let mut values = vec![SqlValue::Null; table_meta.column_count()];
315 for (pos, &table_col_idx) in projected.iter().enumerate() {
316 let column = batch.columns.get(pos).ok_or_else(|| {
317 ExecutorError::Columnar("missing projected column".into())
318 })?;
319 let bitmap = batch.null_bitmaps.get(pos).and_then(|b| b.as_ref());
320 let value = value_from_column(
321 column,
322 bitmap,
323 row_idx,
324 &table_meta
325 .columns
326 .get(table_col_idx)
327 .ok_or_else(|| {
328 ExecutorError::Columnar("column index out of bounds".into())
329 })?
330 .data_type,
331 )?;
332 values[table_col_idx] = value;
333 }
334
335 if let Some(predicate) = scan.residual_filter.as_ref() {
336 let ctx = EvalContext::new(&values);
337 let keep = matches!(evaluate(predicate, &ctx)?, SqlValue::Boolean(true));
338 if !keep {
339 continue;
340 }
341 }
342
343 let row_id = match table_meta.storage_options.row_id_mode {
344 RowIdMode::Direct => {
345 if let Some(row_ids) = batch.row_ids.as_ref() {
346 *row_ids.get(row_idx).ok_or_else(|| {
347 ExecutorError::Columnar(
348 "row_id missing for row in row_id_mode=direct".into(),
349 )
350 })?
351 } else if let Some(idx) = row_id_col_idx {
352 let val = values.get(idx).ok_or_else(|| {
353 ExecutorError::Columnar(
354 "row_id column missing in projected values".into(),
355 )
356 })?;
357 match val {
358 SqlValue::Integer(v) if *v >= 0 => *v as u64,
359 SqlValue::BigInt(v) if *v >= 0 => *v as u64,
360 other => {
361 return Err(ExecutorError::Columnar(format!(
362 "row_id column must be non-negative integer, got {}",
363 other.type_name()
364 )));
365 }
366 }
367 } else {
368 let rid = next_row_id;
369 next_row_id = next_row_id.saturating_add(1);
370 rid
371 }
372 }
373 RowIdMode::None => {
374 let rid = next_row_id;
375 next_row_id = next_row_id.saturating_add(1);
376 rid
377 }
378 };
379 results.push(row_id);
380 }
381 }
382 }
383
384 Ok(results)
385}
386
387pub fn expr_to_pushdown(expr: &TypedExpr) -> Option<PushdownFilter> {
389 match &expr.kind {
390 TypedExprKind::BinaryOp { left, op, right } => match op {
391 BinaryOp::And => {
392 let l = expr_to_pushdown(left)?;
393 let r = expr_to_pushdown(right)?;
394 Some(PushdownFilter::And(vec![l, r]))
395 }
396 BinaryOp::Or => {
397 let l = expr_to_pushdown(left)?;
398 let r = expr_to_pushdown(right)?;
399 Some(PushdownFilter::Or(vec![l, r]))
400 }
401 BinaryOp::Eq => extract_eq(left, right),
402 BinaryOp::Lt | BinaryOp::LtEq | BinaryOp::Gt | BinaryOp::GtEq => {
403 extract_range(op, left, right)
404 }
405 _ => None,
406 },
407 TypedExprKind::Between {
408 expr,
409 low,
410 high,
411 negated,
412 } => {
413 if *negated {
414 return None;
415 }
416 let (column_idx, value_min, value_max) = match expr.kind {
417 TypedExprKind::ColumnRef { column_index, .. } => {
418 let low_v = literal_value(low)?;
419 let high_v = literal_value(high)?;
420 (column_index, low_v, high_v)
421 }
422 _ => return None,
423 };
424 Some(PushdownFilter::Range {
425 column_idx,
426 min: Some(value_min),
427 max: Some(value_max),
428 })
429 }
430 TypedExprKind::IsNull { expr, negated } => match expr.kind {
431 TypedExprKind::ColumnRef { column_index, .. } => Some(PushdownFilter::IsNull {
432 column_idx: column_index,
433 is_null: !negated,
434 }),
435 _ => None,
436 },
437 _ => None,
438 }
439}
440
441fn extract_eq(left: &TypedExpr, right: &TypedExpr) -> Option<PushdownFilter> {
442 if let Some((col_idx, value)) = extract_column_literal(left, right) {
443 return Some(PushdownFilter::Eq {
444 column_idx: col_idx,
445 value,
446 });
447 }
448 if let Some((col_idx, value)) = extract_column_literal(right, left) {
449 return Some(PushdownFilter::Eq {
450 column_idx: col_idx,
451 value,
452 });
453 }
454 None
455}
456
457fn extract_range(op: &BinaryOp, left: &TypedExpr, right: &TypedExpr) -> Option<PushdownFilter> {
458 match (
459 extract_column_literal(left, right),
460 extract_column_literal(right, left),
461 ) {
462 (Some((col_idx, value)), _) => match op {
463 BinaryOp::Lt | BinaryOp::LtEq => Some(PushdownFilter::Range {
464 column_idx: col_idx,
465 min: None,
466 max: Some(value),
467 }),
468 BinaryOp::Gt | BinaryOp::GtEq => Some(PushdownFilter::Range {
469 column_idx: col_idx,
470 min: Some(value),
471 max: None,
472 }),
473 _ => None,
474 },
475 (_, Some((col_idx, value))) => match op {
476 BinaryOp::Lt | BinaryOp::LtEq => Some(PushdownFilter::Range {
477 column_idx: col_idx,
478 min: Some(value),
479 max: None,
480 }),
481 BinaryOp::Gt | BinaryOp::GtEq => Some(PushdownFilter::Range {
482 column_idx: col_idx,
483 min: None,
484 max: Some(value),
485 }),
486 _ => None,
487 },
488 _ => None,
489 }
490}
491
492fn extract_column_literal(
493 column_expr: &TypedExpr,
494 literal_expr: &TypedExpr,
495) -> Option<(usize, SqlValue)> {
496 match column_expr.kind {
497 TypedExprKind::ColumnRef { column_index, .. } => {
498 let value = literal_value(literal_expr)?;
499 Some((column_index, value))
500 }
501 _ => None,
502 }
503}
504
505fn literal_value(expr: &TypedExpr) -> Option<SqlValue> {
506 match &expr.kind {
507 TypedExprKind::Literal(_) | TypedExprKind::VectorLiteral(_) => {
508 evaluate(expr, &EvalContext::new(&[])).ok()
509 }
510 _ => None,
511 }
512}
513
514pub fn projection_to_columns(projection: &Projection, table_meta: &TableMetadata) -> Vec<usize> {
516 match projection {
517 Projection::All(names) => names
518 .iter()
519 .filter_map(|name| table_meta.columns.iter().position(|c| &c.name == name))
520 .collect(),
521 Projection::Columns(cols) => {
522 let mut indices = BTreeSet::new();
523 for col in cols {
524 collect_column_indices(&col.expr, &mut indices);
525 }
526 if indices.is_empty() {
527 return (0..table_meta.columns.len()).collect();
528 }
529 indices
530 .into_iter()
531 .filter(|idx| *idx < table_meta.columns.len())
532 .collect()
533 }
534 }
535}
536
537pub fn build_columnar_scan_for_filter(
539 table_meta: &TableMetadata,
540 projection: Projection,
541 predicate: &TypedExpr,
542) -> ColumnarScan {
543 let mut projected_columns = projection_to_columns(&projection, table_meta);
544 let mut predicate_indices = BTreeSet::new();
545 collect_column_indices(predicate, &mut predicate_indices);
546 for idx in predicate_indices {
547 if !projected_columns.contains(&idx) {
548 projected_columns.push(idx);
549 }
550 }
551 projected_columns.sort_unstable();
552 let pushed_filter = expr_to_pushdown(predicate);
553 ColumnarScan::new(
554 table_meta.table_id,
555 projected_columns,
556 pushed_filter,
557 Some(predicate.clone()),
558 )
559}
560
561pub fn build_columnar_scan(table_meta: &TableMetadata, projection: &Projection) -> ColumnarScan {
563 let projected_columns = projection_to_columns(projection, table_meta);
564 ColumnarScan::new(table_meta.table_id, projected_columns, None, None)
565}
566
567fn collect_column_indices(expr: &TypedExpr, acc: &mut BTreeSet<usize>) {
569 match &expr.kind {
570 TypedExprKind::ColumnRef { column_index, .. } => {
571 acc.insert(*column_index);
572 }
573 TypedExprKind::BinaryOp { left, right, .. } => {
574 collect_column_indices(left, acc);
575 collect_column_indices(right, acc);
576 }
577 TypedExprKind::UnaryOp { operand, .. } => collect_column_indices(operand, acc),
578 TypedExprKind::Between {
579 expr, low, high, ..
580 } => {
581 collect_column_indices(expr, acc);
582 collect_column_indices(low, acc);
583 collect_column_indices(high, acc);
584 }
585 TypedExprKind::InList { expr, list, .. } => {
586 collect_column_indices(expr, acc);
587 for item in list {
588 collect_column_indices(item, acc);
589 }
590 }
591 TypedExprKind::IsNull { expr, .. } => collect_column_indices(expr, acc),
592 TypedExprKind::FunctionCall { args, .. } => {
593 for arg in args {
594 collect_column_indices(arg, acc);
595 }
596 }
597 _ => {}
598 }
599}
600
601fn load_segment_index<'txn, S: KVStore + 'txn>(
602 txn: &mut impl SqlTxn<'txn, S>,
603 table_id: u32,
604) -> Result<Vec<u64>> {
605 let key = key_layout::segment_index_key(table_id);
606 let bytes = txn.inner_mut().get(&key)?;
607 if let Some(raw) = bytes {
608 bincode_config()
609 .deserialize(&raw)
610 .map_err(|e| ExecutorError::Columnar(e.to_string()))
611 } else {
612 Ok(Vec::new())
613 }
614}
615
616fn load_segment<'txn, S: KVStore + 'txn>(
617 txn: &mut impl SqlTxn<'txn, S>,
618 table_id: u32,
619 segment_id: u64,
620) -> Result<ColumnSegmentV2> {
621 let key = key_layout::column_segment_key(table_id, segment_id, 0);
622 let bytes = txn
623 .inner_mut()
624 .get(&key)?
625 .ok_or_else(|| ExecutorError::Columnar(format!("segment {segment_id} missing")))?;
626 bincode_config()
627 .deserialize(&bytes)
628 .map_err(|e| ExecutorError::Columnar(e.to_string()))
629}
630
631fn load_row_group_stats<'txn, S: KVStore + 'txn>(
632 txn: &mut impl SqlTxn<'txn, S>,
633 table_id: u32,
634 segment_id: u64,
635) -> Option<Vec<RowGroupStatistics>> {
636 let key = key_layout::row_group_stats_key(table_id, segment_id);
637 match txn.inner_mut().get(&key) {
638 Ok(Some(bytes)) => bincode_config().deserialize(&bytes).ok(),
639 Ok(None) => None,
640 Err(_) => None,
641 }
642}
643
644#[allow(clippy::too_many_arguments)]
645fn append_rows_from_batch(
646 out: &mut Vec<Row>,
647 batch: &alopex_core::columnar::segment_v2::RecordBatch,
648 table_meta: &TableMetadata,
649 projected: &[usize],
650 residual_filter: Option<&TypedExpr>,
651 row_id_mode: RowIdMode,
652 row_id_col_idx: Option<usize>,
653 next_row_id: &mut u64,
654) -> Result<()> {
655 if batch.columns.len() != projected.len() {
656 return Err(ExecutorError::Columnar(format!(
657 "projected column count mismatch: requested {}, got {}",
658 projected.len(),
659 batch.columns.len()
660 )));
661 }
662
663 let row_count = batch.num_rows();
664 for row_idx in 0..row_count {
665 let mut values = vec![SqlValue::Null; table_meta.column_count()];
666 for (pos, &table_col_idx) in projected.iter().enumerate() {
667 let column = batch
668 .columns
669 .get(pos)
670 .ok_or_else(|| ExecutorError::Columnar("missing projected column".into()))?;
671 let bitmap = batch.null_bitmaps.get(pos).and_then(|b| b.as_ref());
672 let value = value_from_column(
673 column,
674 bitmap,
675 row_idx,
676 &table_meta
677 .columns
678 .get(table_col_idx)
679 .ok_or_else(|| ExecutorError::Columnar("column index out of bounds".into()))?
680 .data_type,
681 )?;
682 values[table_col_idx] = value;
683 }
684
685 if let Some(predicate) = residual_filter {
686 let ctx = EvalContext::new(&values);
687 let keep = matches!(evaluate(predicate, &ctx)?, SqlValue::Boolean(true));
688 if !keep {
689 continue;
690 }
691 }
692
693 let row_id = match row_id_mode {
694 RowIdMode::Direct => {
695 if let Some(row_ids) = batch.row_ids.as_ref() {
696 *row_ids.get(row_idx).ok_or_else(|| {
697 ExecutorError::Columnar(
698 "row_id missing for row in row_id_mode=direct".into(),
699 )
700 })?
701 } else if let Some(idx) = row_id_col_idx {
702 let val = values.get(idx).ok_or_else(|| {
703 ExecutorError::Columnar("row_id column missing in projected values".into())
704 })?;
705 match val {
706 SqlValue::Integer(v) if *v >= 0 => *v as u64,
707 SqlValue::BigInt(v) if *v >= 0 => *v as u64,
708 other => {
709 return Err(ExecutorError::Columnar(format!(
710 "row_id column must be non-negative integer, got {}",
711 other.type_name()
712 )));
713 }
714 }
715 } else {
716 let rid = *next_row_id;
717 *next_row_id = next_row_id.saturating_add(1);
718 rid
719 }
720 }
721 RowIdMode::None => {
722 let rid = *next_row_id;
723 *next_row_id = next_row_id.saturating_add(1);
724 rid
725 }
726 };
727 out.push(Row::new(row_id, values));
728 }
729
730 Ok(())
731}
732
733fn value_from_column(
734 column: &Column,
735 bitmap: Option<&Bitmap>,
736 row_idx: usize,
737 ty: &ResolvedType,
738) -> Result<SqlValue> {
739 if let Some(bm) = bitmap
740 && !bm.get(row_idx)
741 {
742 return Ok(SqlValue::Null);
743 }
744
745 match (ty, column) {
746 (ResolvedType::Integer, Column::Int64(values)) => {
747 let v = *values
748 .get(row_idx)
749 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
750 Ok(SqlValue::Integer(v as i32))
751 }
752 (ResolvedType::BigInt | ResolvedType::Timestamp, Column::Int64(values)) => {
753 let v = *values
754 .get(row_idx)
755 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
756 if matches!(ty, ResolvedType::Timestamp) {
757 Ok(SqlValue::Timestamp(v))
758 } else {
759 Ok(SqlValue::BigInt(v))
760 }
761 }
762 (ResolvedType::Float, Column::Float32(values)) => {
763 let v = *values
764 .get(row_idx)
765 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
766 Ok(SqlValue::Float(v))
767 }
768 (ResolvedType::Double, Column::Float64(values)) => {
769 let v = *values
770 .get(row_idx)
771 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
772 Ok(SqlValue::Double(v))
773 }
774 (ResolvedType::Boolean, Column::Bool(values)) => {
775 let v = *values
776 .get(row_idx)
777 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
778 Ok(SqlValue::Boolean(v))
779 }
780 (ResolvedType::Text, Column::Binary(values)) => {
781 let raw = values
782 .get(row_idx)
783 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
784 String::from_utf8(raw.clone())
785 .map(SqlValue::Text)
786 .map_err(|e| ExecutorError::Columnar(e.to_string()))
787 }
788 (ResolvedType::Blob, Column::Binary(values)) => {
789 let raw = values
790 .get(row_idx)
791 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
792 Ok(SqlValue::Blob(raw.clone()))
793 }
794 (ResolvedType::Vector { .. }, Column::Fixed { values, .. }) => {
795 let raw = values
796 .get(row_idx)
797 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
798 if raw.len() % 4 != 0 {
799 return Err(ExecutorError::Columnar(
800 "invalid vector byte length in columnar segment".into(),
801 ));
802 }
803 let floats: Vec<f32> = raw
804 .chunks_exact(4)
805 .map(|bytes| f32::from_le_bytes(bytes.try_into().unwrap()))
806 .collect();
807 Ok(SqlValue::Vector(floats))
808 }
809 (_, Column::Binary(values)) => {
810 let raw = values
811 .get(row_idx)
812 .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
813 Ok(SqlValue::Blob(raw.clone()))
814 }
815 _ => Err(ExecutorError::Columnar(
816 "unsupported column type for columnar read".into(),
817 )),
818 }
819}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::expr::Literal;
    use crate::catalog::{ColumnMetadata, RowIdMode, TableMetadata};
    use crate::columnar::statistics::{ColumnStatistics, compute_row_group_statistics};
    use crate::planner::typed_expr::TypedExpr;
    use crate::planner::typed_expr::TypedExprKind;
    use crate::planner::types::ResolvedType;
    use crate::storage::TxnBridge;
    use alopex_core::columnar::encoding::LogicalType;
    use alopex_core::columnar::segment_v2::{ColumnSchema, RecordBatch, Schema, SegmentWriterV2};
    use alopex_core::kv::memory::MemoryKV;
    use bincode::config::Options;
    use std::sync::Arc;

    // ---- fixtures ---------------------------------------------------------

    /// Statistics for a three-row group whose single integer column spans
    /// 1..=3 and contains no nulls.
    fn stats_one_to_three() -> RowGroupStatistics {
        RowGroupStatistics {
            row_count: 3,
            columns: vec![ColumnStatistics {
                min: SqlValue::Integer(1),
                max: SqlValue::Integer(3),
                null_count: 0,
                total_count: 3,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        }
    }

    /// `column 0 = value` as a pushdown filter.
    fn eq_filter(value: i32) -> PushdownFilter {
        PushdownFilter::Eq {
            column_idx: 0,
            value: SqlValue::Integer(value),
        }
    }

    /// Typed expression `<table>.<column> = 1` on integer column index 0.
    fn column_equals_one(table: &'static str, column: &'static str) -> TypedExpr {
        TypedExpr {
            kind: TypedExprKind::BinaryOp {
                left: Box::new(TypedExpr::column_ref(
                    table.into(),
                    column.into(),
                    0,
                    ResolvedType::Integer,
                    crate::Span::default(),
                )),
                op: BinaryOp::Eq,
                right: Box::new(TypedExpr::literal(
                    Literal::Number("1".into()),
                    ResolvedType::Integer,
                    crate::Span::default(),
                )),
            },
            resolved_type: ResolvedType::Boolean,
            span: crate::Span::default(),
        }
    }

    /// Non-nullable, variable-length column schema entry.
    fn column_schema(name: &'static str, logical_type: LogicalType) -> ColumnSchema {
        ColumnSchema {
            name: name.into(),
            logical_type,
            nullable: false,
            fixed_len: None,
        }
    }

    /// Table metadata with columnar storage and the given id.
    fn columnar_table(
        name: &'static str,
        table_id: u32,
        columns: Vec<ColumnMetadata>,
    ) -> TableMetadata {
        let mut table = TableMetadata::new(name, columns).with_table_id(table_id);
        table.storage_options.storage_type = crate::catalog::StorageType::Columnar;
        table
    }

    /// Writes `columns` (without null bitmaps) as a single batch and returns
    /// the finished segment.
    fn write_segment(schema: Schema, columns: Vec<Column>) -> ColumnSegmentV2 {
        let no_nulls: Vec<Option<Bitmap>> = columns.iter().map(|_| None).collect();
        let batch = RecordBatch::new(schema, columns, no_nulls);
        let mut writer = SegmentWriterV2::new(Default::default());
        writer.write_batch(batch).unwrap();
        writer.finish().unwrap()
    }

    /// Stores `segment` as segment 0 of `table_id`, together with its
    /// metadata, row-group statistics and a segment index listing only it.
    fn persist_segment_for_test(
        bridge: &TxnBridge<MemoryKV>,
        table_id: u32,
        segment: &alopex_core::columnar::segment_v2::ColumnSegmentV2,
        row_group_stats: &[crate::columnar::statistics::RowGroupStatistics],
    ) {
        let segment_bytes = bincode_config().serialize(segment).unwrap();
        let meta_bytes = bincode_config().serialize(&segment.meta).unwrap();
        let stats_bytes = bincode_config().serialize(row_group_stats).unwrap();
        let index_bytes = bincode_config().serialize(&vec![0u64]).unwrap();

        let mut txn = bridge.begin_write().unwrap();
        txn.inner_mut()
            .put(
                key_layout::column_segment_key(table_id, 0, 0),
                segment_bytes,
            )
            .unwrap();
        txn.inner_mut()
            .put(key_layout::statistics_key(table_id, 0), meta_bytes)
            .unwrap();
        txn.inner_mut()
            .put(key_layout::row_group_stats_key(table_id, 0), stats_bytes)
            .unwrap();
        txn.inner_mut()
            .put(key_layout::segment_index_key(table_id), index_bytes)
            .unwrap();
        txn.commit().unwrap();
    }

    // ---- statistics-based pruning ----------------------------------------

    #[test]
    fn evaluate_pushdown_eq_prunes_out_of_range() {
        // 10 lies above the column maximum of 3.
        assert!(ColumnarScan::evaluate_pushdown(
            &eq_filter(10),
            &stats_one_to_three()
        ));
    }

    #[test]
    fn evaluate_pushdown_range_allows_overlap() {
        let stats = RowGroupStatistics {
            row_count: 3,
            columns: vec![ColumnStatistics {
                min: SqlValue::Integer(5),
                max: SqlValue::Integer(10),
                null_count: 0,
                total_count: 3,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        // [8, 12] overlaps [5, 10], so the group must be read.
        let filter = PushdownFilter::Range {
            column_idx: 0,
            min: Some(SqlValue::Integer(8)),
            max: Some(SqlValue::Integer(12)),
        };
        assert!(!ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    #[test]
    fn evaluate_pushdown_is_null_skips_when_no_nulls() {
        let stats = RowGroupStatistics {
            row_count: 2,
            columns: vec![ColumnStatistics {
                min: SqlValue::Integer(1),
                max: SqlValue::Integer(2),
                null_count: 0,
                total_count: 2,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::IsNull {
            column_idx: 0,
            is_null: true,
        };
        assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    #[test]
    fn evaluate_pushdown_is_not_null_skips_when_all_null() {
        let stats = RowGroupStatistics {
            row_count: 2,
            columns: vec![ColumnStatistics {
                min: SqlValue::Null,
                max: SqlValue::Null,
                null_count: 2,
                total_count: 2,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::IsNull {
            column_idx: 0,
            is_null: false,
        };
        assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    #[test]
    fn evaluate_pushdown_and_prunes_if_any_branch_skips() {
        let filter = PushdownFilter::And(vec![eq_filter(10), eq_filter(2)]);
        assert!(ColumnarScan::evaluate_pushdown(
            &filter,
            &stats_one_to_three()
        ));
    }

    #[test]
    fn evaluate_pushdown_or_keeps_if_any_branch_may_match() {
        let filter = PushdownFilter::Or(vec![eq_filter(10), eq_filter(2)]);
        assert!(!ColumnarScan::evaluate_pushdown(
            &filter,
            &stats_one_to_three()
        ));
    }

    // ---- expression conversion -------------------------------------------

    #[test]
    fn expr_to_pushdown_converts_eq() {
        let expr = column_equals_one("t", "c");
        let filter = expr_to_pushdown(&expr).unwrap();
        assert_eq!(
            filter,
            PushdownFilter::Eq {
                column_idx: 0,
                value: SqlValue::Integer(1)
            }
        );
    }

    // ---- end-to-end scans -------------------------------------------------

    #[test]
    fn execute_columnar_scan_applies_residual_filter() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let table = columnar_table(
            "users",
            1,
            vec![
                ColumnMetadata::new("id", ResolvedType::Integer),
                ColumnMetadata::new("name", ResolvedType::Text),
            ],
        );

        let schema = Schema {
            columns: vec![
                column_schema("id", LogicalType::Int64),
                column_schema("name", LogicalType::Binary),
            ],
        };
        let segment = write_segment(
            schema,
            vec![
                Column::Int64(vec![1]),
                Column::Binary(vec![b"alice".to_vec()]),
            ],
        );
        let stats = vec![compute_row_group_statistics(&[vec![
            SqlValue::Integer(1),
            SqlValue::Text("alice".into()),
        ]])];

        persist_segment_for_test(&bridge, table.table_id, &segment, &stats);

        let scan = ColumnarScan::new(
            table.table_id,
            vec![0, 1],
            Some(eq_filter(1)),
            Some(column_equals_one("users", "id")),
        );

        let mut read_txn = bridge.begin_read().unwrap();
        let rows = execute_columnar_scan(&mut read_txn, &table, &scan).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].values[1], SqlValue::Text("alice".into()));
    }

    #[test]
    fn rowid_mode_direct_prefers_rowid_column() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut table = columnar_table(
            "items",
            20,
            vec![
                ColumnMetadata::new("row_id", ResolvedType::BigInt),
                ColumnMetadata::new("val", ResolvedType::Integer),
            ],
        );
        table.storage_options.row_id_mode = RowIdMode::Direct;

        let schema = Schema {
            columns: vec![
                column_schema("row_id", LogicalType::Int64),
                column_schema("val", LogicalType::Int64),
            ],
        };
        let segment = write_segment(
            schema,
            vec![Column::Int64(vec![999]), Column::Int64(vec![7])],
        );
        let stats = vec![compute_row_group_statistics(&[vec![
            SqlValue::BigInt(999),
            SqlValue::Integer(7),
        ]])];

        persist_segment_for_test(&bridge, table.table_id, &segment, &stats);

        let scan = ColumnarScan::new(table.table_id, vec![0, 1], None, None);
        let mut read_txn = bridge.begin_read().unwrap();
        let rows = execute_columnar_scan(&mut read_txn, &table, &scan).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].row_id, 999);
        assert_eq!(rows[0].values[1], SqlValue::Integer(7));
    }

    #[test]
    fn rowid_mode_none_uses_position() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut table = columnar_table(
            "items",
            21,
            vec![ColumnMetadata::new("val", ResolvedType::Integer)],
        );
        // NOTE(review): the test name says "none" but the mode is set to
        // `Direct`; with no `row_id` column this exercises the positional
        // fallback of direct mode. Confirm which mode was intended.
        table.storage_options.row_id_mode = RowIdMode::Direct;

        let schema = Schema {
            columns: vec![column_schema("val", LogicalType::Int64)],
        };
        let segment = write_segment(schema, vec![Column::Int64(vec![3, 4])]);
        let stats = vec![compute_row_group_statistics(&[
            vec![SqlValue::Integer(3)],
            vec![SqlValue::Integer(4)],
        ])];

        persist_segment_for_test(&bridge, table.table_id, &segment, &stats);

        let scan = ColumnarScan::new(table.table_id, vec![0], None, None);
        let mut read_txn = bridge.begin_read().unwrap();
        let rows = execute_columnar_scan(&mut read_txn, &table, &scan).unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].row_id, 0);
        assert_eq!(rows[1].row_id, 1);
    }
}