alopex_sql/executor/query/columnar_scan.rs

1use alopex_core::columnar::encoding::Column;
2use alopex_core::columnar::encoding_v2::Bitmap;
3use alopex_core::columnar::kvs_bridge::key_layout;
4use alopex_core::columnar::segment_v2::{ColumnSegmentV2, InMemorySegmentSource, SegmentReaderV2};
5use alopex_core::kv::{KVStore, KVTransaction};
6use alopex_core::storage::format::bincode_config;
7use bincode::config::Options;
8
9use crate::ast::expr::BinaryOp;
10use crate::catalog::{RowIdMode, TableMetadata};
11use crate::columnar::statistics::RowGroupStatistics;
12use crate::executor::evaluator::{EvalContext, evaluate};
13use crate::executor::{ExecutorError, Result, Row};
14use crate::planner::typed_expr::{Projection, TypedExpr, TypedExprKind};
15use crate::planner::types::ResolvedType;
16use crate::storage::{SqlTxn, SqlValue};
17use std::collections::BTreeSet;
18
/// ColumnarScan operator: scans a table stored in columnar segments.
///
/// Row groups are first checked against `pushed_filter` using row-group statistics and
/// skipped when they cannot match; rows of the remaining groups are decoded and filtered with
/// `residual_filter`.
#[derive(Debug, Clone)]
pub struct ColumnarScan {
    /// ID of the table being scanned.
    pub table_id: u32,
    /// Table column indices to decode. `execute_columnar_scan` treats an empty list as
    /// "all columns"; `execute_columnar_row_ids` does not expand it.
    pub projected_columns: Vec<usize>,
    /// Filter checked against row-group statistics to skip whole row groups. It never removes
    /// individual rows, so it may be weaker than the real predicate.
    pub pushed_filter: Option<PushdownFilter>,
    /// Predicate evaluated on every decoded row; only rows for which it evaluates to
    /// `Boolean(true)` are kept.
    pub residual_filter: Option<TypedExpr>,
}
27
/// Filter that can be pushed down to row-group statistics (min/max/null counts).
///
/// It is only used for pruning: `ColumnarScan::evaluate_pushdown` returns `true` when a row
/// group provably contains no row that satisfies the filter.
#[derive(Debug, Clone, PartialEq)]
pub enum PushdownFilter {
    /// `column = value`.
    Eq {
        column_idx: usize,
        value: SqlValue,
    },
    /// Bounded comparison on a column; `None` means unbounded on that side. Bounds are
    /// treated as inclusive when pruning (strict `<`/`>` are widened, which only under-prunes).
    Range {
        column_idx: usize,
        min: Option<SqlValue>,
        max: Option<SqlValue>,
    },
    /// `column IS NULL` when `is_null` is `true`, `column IS NOT NULL` otherwise.
    IsNull {
        column_idx: usize,
        is_null: bool,
    },
    /// Conjunction: every child must hold.
    And(Vec<PushdownFilter>),
    /// Disjunction: at least one child must hold.
    Or(Vec<PushdownFilter>),
}
47
48impl ColumnarScan {
49    pub fn new(
50        table_id: u32,
51        projected_columns: Vec<usize>,
52        pushed_filter: Option<PushdownFilter>,
53        residual_filter: Option<TypedExpr>,
54    ) -> Self {
55        Self {
56            table_id,
57            projected_columns,
58            pushed_filter,
59            residual_filter,
60        }
61    }
62
63    /// RowGroup をプルーニングするか判定する。
64    pub fn should_skip_row_group(&self, stats: &RowGroupStatistics) -> bool {
65        match &self.pushed_filter {
66            None => false,
67            Some(filter) => Self::evaluate_pushdown(filter, stats),
68        }
69    }
70
71    /// プッシュダウンフィルタを統計情報で評価する。
72    pub fn evaluate_pushdown(filter: &PushdownFilter, stats: &RowGroupStatistics) -> bool {
73        match filter {
74            PushdownFilter::Eq { column_idx, value } => match stats.columns.get(*column_idx) {
75                Some(col_stats) => {
76                    if col_stats.total_count == 0 {
77                        return true;
78                    }
79                    if matches!(
80                        value.partial_cmp(&col_stats.min),
81                        Some(std::cmp::Ordering::Less)
82                    ) {
83                        return true;
84                    }
85                    matches!(
86                        value.partial_cmp(&col_stats.max),
87                        Some(std::cmp::Ordering::Greater)
88                    )
89                }
90                None => false,
91            },
92
93            PushdownFilter::Range {
94                column_idx,
95                min,
96                max,
97            } => match stats.columns.get(*column_idx) {
98                Some(col_stats) => {
99                    if col_stats.total_count == 0 {
100                        return true;
101                    }
102                    if let Some(filter_min) = min
103                        && matches!(
104                            col_stats.max.partial_cmp(filter_min),
105                            Some(std::cmp::Ordering::Less)
106                        )
107                    {
108                        return true;
109                    }
110                    if let Some(filter_max) = max
111                        && matches!(
112                            col_stats.min.partial_cmp(filter_max),
113                            Some(std::cmp::Ordering::Greater)
114                        )
115                    {
116                        return true;
117                    }
118                    false
119                }
120                None => false,
121            },
122
123            PushdownFilter::IsNull {
124                column_idx,
125                is_null,
126            } => match stats.columns.get(*column_idx) {
127                Some(col_stats) => {
128                    if *is_null {
129                        col_stats.null_count == 0
130                    } else {
131                        col_stats.null_count == col_stats.total_count
132                    }
133                }
134                None => false,
135            },
136
137            PushdownFilter::And(filters) => {
138                if filters.is_empty() {
139                    return false;
140                }
141                filters.iter().any(|f| Self::evaluate_pushdown(f, stats))
142            }
143
144            PushdownFilter::Or(filters) => {
145                if filters.is_empty() {
146                    return false;
147                }
148                filters.iter().all(|f| Self::evaluate_pushdown(f, stats))
149            }
150        }
151    }
152}
153
154/// ColumnarScan を実行する。
155pub fn execute_columnar_scan<'txn, S: KVStore + 'txn>(
156    txn: &mut impl SqlTxn<'txn, S>,
157    table_meta: &TableMetadata,
158    scan: &ColumnarScan,
159) -> Result<Vec<Row>> {
160    debug_assert_eq!(scan.table_id, table_meta.table_id);
161    let projected: Vec<usize> = if scan.projected_columns.is_empty() {
162        (0..table_meta.columns.len()).collect()
163    } else {
164        scan.projected_columns.clone()
165    };
166
167    let segment_ids = load_segment_index(txn, table_meta.table_id)?;
168    if segment_ids.is_empty() {
169        return Ok(Vec::new());
170    }
171
172    let row_id_col_idx = if table_meta.storage_options.row_id_mode == RowIdMode::Direct {
173        table_meta
174            .columns
175            .iter()
176            .position(|c| c.name.eq_ignore_ascii_case("row_id"))
177    } else {
178        None
179    };
180
181    let mut results = Vec::new();
182    let mut next_row_id = 0u64;
183    for segment_id in segment_ids {
184        let segment = load_segment(txn, table_meta.table_id, segment_id)?;
185        let reader =
186            SegmentReaderV2::open(Box::new(InMemorySegmentSource::new(segment.data.clone())))
187                .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
188
189        let row_group_stats = load_row_group_stats(txn, table_meta.table_id, segment_id);
190        let row_group_count = segment.meta.row_groups.len();
191        for rg_index in 0..row_group_count {
192            let should_skip = match row_group_stats.as_ref() {
193                Some(stats) if stats.len() == row_group_count => {
194                    scan.should_skip_row_group(&stats[rg_index])
195                }
196                _ => false,
197            };
198            if should_skip {
199                continue;
200            }
201
202            let batch = reader
203                .read_row_group_by_index(&projected, rg_index)
204                .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
205            let batch = if !segment.row_ids.is_empty() {
206                if let Some(meta) = segment.meta.row_groups.get(rg_index) {
207                    let start = meta.row_start as usize;
208                    let end = start + meta.row_count as usize;
209                    if end <= segment.row_ids.len() {
210                        batch.with_row_ids(Some(segment.row_ids[start..end].to_vec()))
211                    } else {
212                        batch
213                    }
214                } else {
215                    batch
216                }
217            } else {
218                batch
219            };
220            append_rows_from_batch(
221                &mut results,
222                &batch,
223                table_meta,
224                &projected,
225                scan.residual_filter.as_ref(),
226                table_meta.storage_options.row_id_mode,
227                row_id_col_idx,
228                &mut next_row_id,
229            )?;
230        }
231    }
232
233    Ok(results)
234}
235
236/// ColumnarScan を実行し、フィルタ後の RowID のみを返す。
237///
238/// RowIdMode::Direct で columnar ストレージの場合、行本体の読み込みを避けて
239/// RowID 再フェッチ用の候補セットを得る目的で使用する。
240pub fn execute_columnar_row_ids<'txn, S: KVStore + 'txn>(
241    txn: &mut impl SqlTxn<'txn, S>,
242    table_meta: &TableMetadata,
243    scan: &ColumnarScan,
244) -> Result<Vec<u64>> {
245    if table_meta.storage_options.storage_type != crate::catalog::StorageType::Columnar {
246        return Err(ExecutorError::Columnar(
247            "execute_columnar_row_ids requires columnar storage".into(),
248        ));
249    }
250
251    let mut needed: BTreeSet<usize> = scan.projected_columns.iter().copied().collect();
252    if let Some(pred) = &scan.residual_filter {
253        collect_column_indices(pred, &mut needed);
254    }
255    let projected: Vec<usize> = needed.into_iter().collect();
256
257    let segment_ids = load_segment_index(txn, table_meta.table_id)?;
258    if segment_ids.is_empty() {
259        return Ok(Vec::new());
260    }
261
262    let row_id_col_idx = if table_meta.storage_options.row_id_mode == RowIdMode::Direct {
263        table_meta
264            .columns
265            .iter()
266            .position(|c| c.name.eq_ignore_ascii_case("row_id"))
267    } else {
268        None
269    };
270
271    let mut results = Vec::new();
272    let mut next_row_id = 0u64;
273    for segment_id in segment_ids {
274        let segment = load_segment(txn, table_meta.table_id, segment_id)?;
275        let reader =
276            SegmentReaderV2::open(Box::new(InMemorySegmentSource::new(segment.data.clone())))
277                .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
278
279        let row_group_stats = load_row_group_stats(txn, table_meta.table_id, segment_id);
280        let row_group_count = segment.meta.row_groups.len();
281        for rg_index in 0..row_group_count {
282            let should_skip = match row_group_stats.as_ref() {
283                Some(stats) if stats.len() == row_group_count => {
284                    scan.should_skip_row_group(&stats[rg_index])
285                }
286                _ => false,
287            };
288            if should_skip {
289                continue;
290            }
291
292            let batch = reader
293                .read_row_group_by_index(&projected, rg_index)
294                .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
295            let batch = if !segment.row_ids.is_empty() {
296                if let Some(meta) = segment.meta.row_groups.get(rg_index) {
297                    let start = meta.row_start as usize;
298                    let end = start + meta.row_count as usize;
299                    if end <= segment.row_ids.len() {
300                        batch.with_row_ids(Some(segment.row_ids[start..end].to_vec()))
301                    } else {
302                        batch
303                    }
304                } else {
305                    batch
306                }
307            } else {
308                batch
309            };
310
311            let row_count = batch.num_rows();
312            for row_idx in 0..row_count {
313                // 残余フィルタの評価に必要なカラムだけ値を復元する。
314                let mut values = vec![SqlValue::Null; table_meta.column_count()];
315                for (pos, &table_col_idx) in projected.iter().enumerate() {
316                    let column = batch.columns.get(pos).ok_or_else(|| {
317                        ExecutorError::Columnar("missing projected column".into())
318                    })?;
319                    let bitmap = batch.null_bitmaps.get(pos).and_then(|b| b.as_ref());
320                    let value = value_from_column(
321                        column,
322                        bitmap,
323                        row_idx,
324                        &table_meta
325                            .columns
326                            .get(table_col_idx)
327                            .ok_or_else(|| {
328                                ExecutorError::Columnar("column index out of bounds".into())
329                            })?
330                            .data_type,
331                    )?;
332                    values[table_col_idx] = value;
333                }
334
335                if let Some(predicate) = scan.residual_filter.as_ref() {
336                    let ctx = EvalContext::new(&values);
337                    let keep = matches!(evaluate(predicate, &ctx)?, SqlValue::Boolean(true));
338                    if !keep {
339                        continue;
340                    }
341                }
342
343                let row_id = match table_meta.storage_options.row_id_mode {
344                    RowIdMode::Direct => {
345                        if let Some(row_ids) = batch.row_ids.as_ref() {
346                            *row_ids.get(row_idx).ok_or_else(|| {
347                                ExecutorError::Columnar(
348                                    "row_id missing for row in row_id_mode=direct".into(),
349                                )
350                            })?
351                        } else if let Some(idx) = row_id_col_idx {
352                            let val = values.get(idx).ok_or_else(|| {
353                                ExecutorError::Columnar(
354                                    "row_id column missing in projected values".into(),
355                                )
356                            })?;
357                            match val {
358                                SqlValue::Integer(v) if *v >= 0 => *v as u64,
359                                SqlValue::BigInt(v) if *v >= 0 => *v as u64,
360                                other => {
361                                    return Err(ExecutorError::Columnar(format!(
362                                        "row_id column must be non-negative integer, got {}",
363                                        other.type_name()
364                                    )));
365                                }
366                            }
367                        } else {
368                            let rid = next_row_id;
369                            next_row_id = next_row_id.saturating_add(1);
370                            rid
371                        }
372                    }
373                    RowIdMode::None => {
374                        let rid = next_row_id;
375                        next_row_id = next_row_id.saturating_add(1);
376                        rid
377                    }
378                };
379                results.push(row_id);
380            }
381        }
382    }
383
384    Ok(results)
385}
386
387/// TypedExpr から PushdownFilter へ変換する(変換不可なら None)。
388pub fn expr_to_pushdown(expr: &TypedExpr) -> Option<PushdownFilter> {
389    match &expr.kind {
390        TypedExprKind::BinaryOp { left, op, right } => match op {
391            BinaryOp::And => {
392                let l = expr_to_pushdown(left)?;
393                let r = expr_to_pushdown(right)?;
394                Some(PushdownFilter::And(vec![l, r]))
395            }
396            BinaryOp::Or => {
397                let l = expr_to_pushdown(left)?;
398                let r = expr_to_pushdown(right)?;
399                Some(PushdownFilter::Or(vec![l, r]))
400            }
401            BinaryOp::Eq => extract_eq(left, right),
402            BinaryOp::Lt | BinaryOp::LtEq | BinaryOp::Gt | BinaryOp::GtEq => {
403                extract_range(op, left, right)
404            }
405            _ => None,
406        },
407        TypedExprKind::Between {
408            expr,
409            low,
410            high,
411            negated,
412        } => {
413            if *negated {
414                return None;
415            }
416            let (column_idx, value_min, value_max) = match expr.kind {
417                TypedExprKind::ColumnRef { column_index, .. } => {
418                    let low_v = literal_value(low)?;
419                    let high_v = literal_value(high)?;
420                    (column_index, low_v, high_v)
421                }
422                _ => return None,
423            };
424            Some(PushdownFilter::Range {
425                column_idx,
426                min: Some(value_min),
427                max: Some(value_max),
428            })
429        }
430        TypedExprKind::IsNull { expr, negated } => match expr.kind {
431            TypedExprKind::ColumnRef { column_index, .. } => Some(PushdownFilter::IsNull {
432                column_idx: column_index,
433                is_null: !negated,
434            }),
435            _ => None,
436        },
437        _ => None,
438    }
439}
440
441fn extract_eq(left: &TypedExpr, right: &TypedExpr) -> Option<PushdownFilter> {
442    if let Some((col_idx, value)) = extract_column_literal(left, right) {
443        return Some(PushdownFilter::Eq {
444            column_idx: col_idx,
445            value,
446        });
447    }
448    if let Some((col_idx, value)) = extract_column_literal(right, left) {
449        return Some(PushdownFilter::Eq {
450            column_idx: col_idx,
451            value,
452        });
453    }
454    None
455}
456
457fn extract_range(op: &BinaryOp, left: &TypedExpr, right: &TypedExpr) -> Option<PushdownFilter> {
458    match (
459        extract_column_literal(left, right),
460        extract_column_literal(right, left),
461    ) {
462        (Some((col_idx, value)), _) => match op {
463            BinaryOp::Lt | BinaryOp::LtEq => Some(PushdownFilter::Range {
464                column_idx: col_idx,
465                min: None,
466                max: Some(value),
467            }),
468            BinaryOp::Gt | BinaryOp::GtEq => Some(PushdownFilter::Range {
469                column_idx: col_idx,
470                min: Some(value),
471                max: None,
472            }),
473            _ => None,
474        },
475        (_, Some((col_idx, value))) => match op {
476            BinaryOp::Lt | BinaryOp::LtEq => Some(PushdownFilter::Range {
477                column_idx: col_idx,
478                min: Some(value),
479                max: None,
480            }),
481            BinaryOp::Gt | BinaryOp::GtEq => Some(PushdownFilter::Range {
482                column_idx: col_idx,
483                min: None,
484                max: Some(value),
485            }),
486            _ => None,
487        },
488        _ => None,
489    }
490}
491
492fn extract_column_literal(
493    column_expr: &TypedExpr,
494    literal_expr: &TypedExpr,
495) -> Option<(usize, SqlValue)> {
496    match column_expr.kind {
497        TypedExprKind::ColumnRef { column_index, .. } => {
498            let value = literal_value(literal_expr)?;
499            Some((column_index, value))
500        }
501        _ => None,
502    }
503}
504
505fn literal_value(expr: &TypedExpr) -> Option<SqlValue> {
506    match &expr.kind {
507        TypedExprKind::Literal(_) | TypedExprKind::VectorLiteral(_) => {
508            evaluate(expr, &EvalContext::new(&[])).ok()
509        }
510        _ => None,
511    }
512}
513
514/// projection 情報からカラムインデックスを推定する(現状は全カラム)。
515pub fn projection_to_columns(projection: &Projection, table_meta: &TableMetadata) -> Vec<usize> {
516    match projection {
517        Projection::All(names) => names
518            .iter()
519            .filter_map(|name| table_meta.columns.iter().position(|c| &c.name == name))
520            .collect(),
521        Projection::Columns(cols) => {
522            let mut indices = BTreeSet::new();
523            for col in cols {
524                collect_column_indices(&col.expr, &mut indices);
525            }
526            if indices.is_empty() {
527                return (0..table_meta.columns.len()).collect();
528            }
529            indices
530                .into_iter()
531                .filter(|idx| *idx < table_meta.columns.len())
532                .collect()
533        }
534    }
535}
536
537/// フィルタと Projection を ColumnarScan にまとめるユーティリティ。
538pub fn build_columnar_scan_for_filter(
539    table_meta: &TableMetadata,
540    projection: Projection,
541    predicate: &TypedExpr,
542) -> ColumnarScan {
543    let mut projected_columns = projection_to_columns(&projection, table_meta);
544    let mut predicate_indices = BTreeSet::new();
545    collect_column_indices(predicate, &mut predicate_indices);
546    for idx in predicate_indices {
547        if !projected_columns.contains(&idx) {
548            projected_columns.push(idx);
549        }
550    }
551    projected_columns.sort_unstable();
552    let pushed_filter = expr_to_pushdown(predicate);
553    ColumnarScan::new(
554        table_meta.table_id,
555        projected_columns,
556        pushed_filter,
557        Some(predicate.clone()),
558    )
559}
560
561/// Projection だけを指定して ColumnarScan を構築する。
562pub fn build_columnar_scan(table_meta: &TableMetadata, projection: &Projection) -> ColumnarScan {
563    let projected_columns = projection_to_columns(projection, table_meta);
564    ColumnarScan::new(table_meta.table_id, projected_columns, None, None)
565}
566
567/// 式中に現れるカラムインデックスを収集する。
568fn collect_column_indices(expr: &TypedExpr, acc: &mut BTreeSet<usize>) {
569    match &expr.kind {
570        TypedExprKind::ColumnRef { column_index, .. } => {
571            acc.insert(*column_index);
572        }
573        TypedExprKind::BinaryOp { left, right, .. } => {
574            collect_column_indices(left, acc);
575            collect_column_indices(right, acc);
576        }
577        TypedExprKind::UnaryOp { operand, .. } => collect_column_indices(operand, acc),
578        TypedExprKind::Between {
579            expr, low, high, ..
580        } => {
581            collect_column_indices(expr, acc);
582            collect_column_indices(low, acc);
583            collect_column_indices(high, acc);
584        }
585        TypedExprKind::InList { expr, list, .. } => {
586            collect_column_indices(expr, acc);
587            for item in list {
588                collect_column_indices(item, acc);
589            }
590        }
591        TypedExprKind::IsNull { expr, .. } => collect_column_indices(expr, acc),
592        TypedExprKind::FunctionCall { args, .. } => {
593            for arg in args {
594                collect_column_indices(arg, acc);
595            }
596        }
597        _ => {}
598    }
599}
600
601fn load_segment_index<'txn, S: KVStore + 'txn>(
602    txn: &mut impl SqlTxn<'txn, S>,
603    table_id: u32,
604) -> Result<Vec<u64>> {
605    let key = key_layout::segment_index_key(table_id);
606    let bytes = txn.inner_mut().get(&key)?;
607    if let Some(raw) = bytes {
608        bincode_config()
609            .deserialize(&raw)
610            .map_err(|e| ExecutorError::Columnar(e.to_string()))
611    } else {
612        Ok(Vec::new())
613    }
614}
615
616fn load_segment<'txn, S: KVStore + 'txn>(
617    txn: &mut impl SqlTxn<'txn, S>,
618    table_id: u32,
619    segment_id: u64,
620) -> Result<ColumnSegmentV2> {
621    let key = key_layout::column_segment_key(table_id, segment_id, 0);
622    let bytes = txn
623        .inner_mut()
624        .get(&key)?
625        .ok_or_else(|| ExecutorError::Columnar(format!("segment {segment_id} missing")))?;
626    bincode_config()
627        .deserialize(&bytes)
628        .map_err(|e| ExecutorError::Columnar(e.to_string()))
629}
630
631fn load_row_group_stats<'txn, S: KVStore + 'txn>(
632    txn: &mut impl SqlTxn<'txn, S>,
633    table_id: u32,
634    segment_id: u64,
635) -> Option<Vec<RowGroupStatistics>> {
636    let key = key_layout::row_group_stats_key(table_id, segment_id);
637    match txn.inner_mut().get(&key) {
638        Ok(Some(bytes)) => bincode_config().deserialize(&bytes).ok(),
639        Ok(None) => None,
640        Err(_) => None,
641    }
642}
643
644#[allow(clippy::too_many_arguments)]
645fn append_rows_from_batch(
646    out: &mut Vec<Row>,
647    batch: &alopex_core::columnar::segment_v2::RecordBatch,
648    table_meta: &TableMetadata,
649    projected: &[usize],
650    residual_filter: Option<&TypedExpr>,
651    row_id_mode: RowIdMode,
652    row_id_col_idx: Option<usize>,
653    next_row_id: &mut u64,
654) -> Result<()> {
655    if batch.columns.len() != projected.len() {
656        return Err(ExecutorError::Columnar(format!(
657            "projected column count mismatch: requested {}, got {}",
658            projected.len(),
659            batch.columns.len()
660        )));
661    }
662
663    let row_count = batch.num_rows();
664    for row_idx in 0..row_count {
665        let mut values = vec![SqlValue::Null; table_meta.column_count()];
666        for (pos, &table_col_idx) in projected.iter().enumerate() {
667            let column = batch
668                .columns
669                .get(pos)
670                .ok_or_else(|| ExecutorError::Columnar("missing projected column".into()))?;
671            let bitmap = batch.null_bitmaps.get(pos).and_then(|b| b.as_ref());
672            let value = value_from_column(
673                column,
674                bitmap,
675                row_idx,
676                &table_meta
677                    .columns
678                    .get(table_col_idx)
679                    .ok_or_else(|| ExecutorError::Columnar("column index out of bounds".into()))?
680                    .data_type,
681            )?;
682            values[table_col_idx] = value;
683        }
684
685        if let Some(predicate) = residual_filter {
686            let ctx = EvalContext::new(&values);
687            let keep = matches!(evaluate(predicate, &ctx)?, SqlValue::Boolean(true));
688            if !keep {
689                continue;
690            }
691        }
692
693        let row_id = match row_id_mode {
694            RowIdMode::Direct => {
695                if let Some(row_ids) = batch.row_ids.as_ref() {
696                    *row_ids.get(row_idx).ok_or_else(|| {
697                        ExecutorError::Columnar(
698                            "row_id missing for row in row_id_mode=direct".into(),
699                        )
700                    })?
701                } else if let Some(idx) = row_id_col_idx {
702                    let val = values.get(idx).ok_or_else(|| {
703                        ExecutorError::Columnar("row_id column missing in projected values".into())
704                    })?;
705                    match val {
706                        SqlValue::Integer(v) if *v >= 0 => *v as u64,
707                        SqlValue::BigInt(v) if *v >= 0 => *v as u64,
708                        other => {
709                            return Err(ExecutorError::Columnar(format!(
710                                "row_id column must be non-negative integer, got {}",
711                                other.type_name()
712                            )));
713                        }
714                    }
715                } else {
716                    let rid = *next_row_id;
717                    *next_row_id = next_row_id.saturating_add(1);
718                    rid
719                }
720            }
721            RowIdMode::None => {
722                let rid = *next_row_id;
723                *next_row_id = next_row_id.saturating_add(1);
724                rid
725            }
726        };
727        out.push(Row::new(row_id, values));
728    }
729
730    Ok(())
731}
732
733fn value_from_column(
734    column: &Column,
735    bitmap: Option<&Bitmap>,
736    row_idx: usize,
737    ty: &ResolvedType,
738) -> Result<SqlValue> {
739    if let Some(bm) = bitmap
740        && !bm.get(row_idx)
741    {
742        return Ok(SqlValue::Null);
743    }
744
745    match (ty, column) {
746        (ResolvedType::Integer, Column::Int64(values)) => {
747            let v = *values
748                .get(row_idx)
749                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
750            Ok(SqlValue::Integer(v as i32))
751        }
752        (ResolvedType::BigInt | ResolvedType::Timestamp, Column::Int64(values)) => {
753            let v = *values
754                .get(row_idx)
755                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
756            if matches!(ty, ResolvedType::Timestamp) {
757                Ok(SqlValue::Timestamp(v))
758            } else {
759                Ok(SqlValue::BigInt(v))
760            }
761        }
762        (ResolvedType::Float, Column::Float32(values)) => {
763            let v = *values
764                .get(row_idx)
765                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
766            Ok(SqlValue::Float(v))
767        }
768        (ResolvedType::Double, Column::Float64(values)) => {
769            let v = *values
770                .get(row_idx)
771                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
772            Ok(SqlValue::Double(v))
773        }
774        (ResolvedType::Boolean, Column::Bool(values)) => {
775            let v = *values
776                .get(row_idx)
777                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
778            Ok(SqlValue::Boolean(v))
779        }
780        (ResolvedType::Text, Column::Binary(values)) => {
781            let raw = values
782                .get(row_idx)
783                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
784            String::from_utf8(raw.clone())
785                .map(SqlValue::Text)
786                .map_err(|e| ExecutorError::Columnar(e.to_string()))
787        }
788        (ResolvedType::Blob, Column::Binary(values)) => {
789            let raw = values
790                .get(row_idx)
791                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
792            Ok(SqlValue::Blob(raw.clone()))
793        }
794        (ResolvedType::Vector { .. }, Column::Fixed { values, .. }) => {
795            let raw = values
796                .get(row_idx)
797                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
798            if raw.len() % 4 != 0 {
799                return Err(ExecutorError::Columnar(
800                    "invalid vector byte length in columnar segment".into(),
801                ));
802            }
803            let floats: Vec<f32> = raw
804                .chunks_exact(4)
805                .map(|bytes| f32::from_le_bytes(bytes.try_into().unwrap()))
806                .collect();
807            Ok(SqlValue::Vector(floats))
808        }
809        (_, Column::Binary(values)) => {
810            let raw = values
811                .get(row_idx)
812                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
813            Ok(SqlValue::Blob(raw.clone()))
814        }
815        _ => Err(ExecutorError::Columnar(
816            "unsupported column type for columnar read".into(),
817        )),
818    }
819}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::expr::Literal;
    use crate::catalog::{ColumnMetadata, RowIdMode, TableMetadata};
    use crate::columnar::statistics::ColumnStatistics;
    use crate::planner::typed_expr::TypedExpr;
    use crate::planner::typed_expr::TypedExprKind;
    use crate::planner::types::ResolvedType;
    use crate::storage::TxnBridge;
    use alopex_core::columnar::encoding::LogicalType;
    use alopex_core::columnar::segment_v2::{ColumnSchema, RecordBatch, Schema, SegmentWriterV2};
    use alopex_core::kv::memory::MemoryKV;
    use bincode::config::Options;
    use std::sync::Arc;

    #[test]
    fn evaluate_pushdown_eq_prunes_out_of_range() {
        let stats = RowGroupStatistics {
            row_count: 3,
            columns: vec![ColumnStatistics {
                min: SqlValue::Integer(1),
                max: SqlValue::Integer(3),
                null_count: 0,
                total_count: 3,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::Eq {
            column_idx: 0,
            value: SqlValue::Integer(10),
        };
        assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    #[test]
    fn evaluate_pushdown_range_allows_overlap() {
        let stats = RowGroupStatistics {
            row_count: 3,
            columns: vec![ColumnStatistics {
                min: SqlValue::Integer(5),
                max: SqlValue::Integer(10),
                null_count: 0,
                total_count: 3,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::Range {
            column_idx: 0,
            min: Some(SqlValue::Integer(8)),
            max: Some(SqlValue::Integer(12)),
        };
        assert!(!ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    #[test]
    fn evaluate_pushdown_is_null_skips_when_no_nulls() {
        let stats = RowGroupStatistics {
            row_count: 2,
            columns: vec![ColumnStatistics {
                min: SqlValue::Integer(1),
                max: SqlValue::Integer(2),
                null_count: 0,
                total_count: 2,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::IsNull {
            column_idx: 0,
            is_null: true,
        };
        assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    #[test]
    fn evaluate_pushdown_is_not_null_skips_when_all_null() {
        let stats = RowGroupStatistics {
            row_count: 2,
            columns: vec![ColumnStatistics {
                min: SqlValue::Null,
                max: SqlValue::Null,
                null_count: 2,
                total_count: 2,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::IsNull {
            column_idx: 0,
            is_null: false,
        };
        assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    #[test]
    fn evaluate_pushdown_and_prunes_if_any_branch_skips() {
        let stats = RowGroupStatistics {
            row_count: 3,
            columns: vec![ColumnStatistics {
                min: SqlValue::Integer(1),
                max: SqlValue::Integer(3),
                null_count: 0,
                total_count: 3,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::And(vec![
            PushdownFilter::Eq {
                column_idx: 0,
                value: SqlValue::Integer(10),
            },
            PushdownFilter::Eq {
                column_idx: 0,
                value: SqlValue::Integer(2),
            },
        ]);
        assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    #[test]
    fn evaluate_pushdown_or_keeps_if_any_branch_may_match() {
        let stats = RowGroupStatistics {
            row_count: 3,
            columns: vec![ColumnStatistics {
                min: SqlValue::Integer(1),
                max: SqlValue::Integer(3),
                null_count: 0,
                total_count: 3,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::Or(vec![
            PushdownFilter::Eq {
                column_idx: 0,
                value: SqlValue::Integer(10),
            },
            PushdownFilter::Eq {
                column_idx: 0,
                value: SqlValue::Integer(2),
            },
        ]);
        assert!(!ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    #[test]
    fn expr_to_pushdown_converts_eq() {
        let expr = TypedExpr {
            kind: TypedExprKind::BinaryOp {
                left: Box::new(TypedExpr::column_ref(
                    "t".into(),
                    "c".into(),
                    0,
                    ResolvedType::Integer,
                    crate::Span::default(),
                )),
                op: BinaryOp::Eq,
                right: Box::new(TypedExpr::literal(
                    Literal::Number("1".into()),
                    ResolvedType::Integer,
                    crate::Span::default(),
                )),
            },
            resolved_type: ResolvedType::Boolean,
            span: crate::Span::default(),
        };
        let filter = expr_to_pushdown(&expr).unwrap();
        assert_eq!(
            filter,
            PushdownFilter::Eq {
                column_idx: 0,
                value: SqlValue::Integer(1)
            }
        );
    }

    /// The residual predicate `id = 1` must drop non-matching rows by itself.
    ///
    /// The segment holds two rows and no pushed filter is supplied. If the
    /// residual filter were ignored, the extra `bob` row would appear in the
    /// result and the length assertion would fail.
    #[test]
    fn execute_columnar_scan_applies_residual_filter() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut table = TableMetadata::new(
            "users",
            vec![
                ColumnMetadata::new("id", ResolvedType::Integer),
                ColumnMetadata::new("name", ResolvedType::Text),
            ],
        )
        .with_table_id(1);
        table.storage_options.storage_type = crate::catalog::StorageType::Columnar;

        // Write the columnar segment directly.
        let segment = build_segment(
            vec![("id", LogicalType::Int64), ("name", LogicalType::Binary)],
            vec![
                Column::Int64(vec![1, 2]),
                Column::Binary(vec![b"alice".to_vec(), b"bob".to_vec()]),
            ],
        );
        let stats = vec![crate::columnar::statistics::compute_row_group_statistics(&[
            vec![SqlValue::Integer(1), SqlValue::Text("alice".into())],
            vec![SqlValue::Integer(2), SqlValue::Text("bob".into())],
        ])];
        persist_segment_for_test(&bridge, table.table_id, &segment, &stats);

        let scan = ColumnarScan::new(
            table.table_id,
            vec![0, 1],
            // No pushed filter, so only the residual predicate can filter rows.
            None,
            Some(TypedExpr {
                kind: TypedExprKind::BinaryOp {
                    left: Box::new(TypedExpr::column_ref(
                        "users".into(),
                        "id".into(),
                        0,
                        ResolvedType::Integer,
                        crate::Span::default(),
                    )),
                    op: BinaryOp::Eq,
                    right: Box::new(TypedExpr::literal(
                        Literal::Number("1".into()),
                        ResolvedType::Integer,
                        crate::Span::default(),
                    )),
                },
                resolved_type: ResolvedType::Boolean,
                span: crate::Span::default(),
            }),
        );

        let mut read_txn = bridge.begin_read().unwrap();
        let rows = execute_columnar_scan(&mut read_txn, &table, &scan).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].values[0], SqlValue::Integer(1));
        assert_eq!(rows[0].values[1], SqlValue::Text("alice".into()));
    }

    /// With `RowIdMode::Direct` and a `row_id` column present, the row id is
    /// taken from that column's value (999), not from the row's position.
    #[test]
    fn rowid_mode_direct_prefers_rowid_column() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut table = TableMetadata::new(
            "items",
            vec![
                ColumnMetadata::new("row_id", ResolvedType::BigInt),
                ColumnMetadata::new("val", ResolvedType::Integer),
            ],
        )
        .with_table_id(20);
        table.storage_options.storage_type = crate::catalog::StorageType::Columnar;
        table.storage_options.row_id_mode = RowIdMode::Direct;

        let segment = build_segment(
            vec![("row_id", LogicalType::Int64), ("val", LogicalType::Int64)],
            vec![Column::Int64(vec![999]), Column::Int64(vec![7])],
        );
        let stats = vec![crate::columnar::statistics::compute_row_group_statistics(
            &[vec![SqlValue::BigInt(999), SqlValue::Integer(7)]],
        )];

        persist_segment_for_test(&bridge, table.table_id, &segment, &stats);

        let scan = ColumnarScan::new(table.table_id, vec![0, 1], None, None);
        let mut read_txn = bridge.begin_read().unwrap();
        let rows = execute_columnar_scan(&mut read_txn, &table, &scan).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].row_id, 999);
        assert_eq!(rows[0].values[1], SqlValue::Integer(7));
    }

    /// With `RowIdMode::Direct` but no `row_id` column, this test expects the
    /// row ids to be the rows' positions within the segment (0, 1).
    ///
    /// NOTE(review): this test configures `RowIdMode::Direct`, not a "none"
    /// mode. If other row-id modes exist and should use positional ids too,
    /// they still need their own test.
    #[test]
    fn rowid_mode_direct_without_rowid_column_uses_position() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut table = TableMetadata::new(
            "items",
            vec![ColumnMetadata::new("val", ResolvedType::Integer)],
        )
        .with_table_id(21);
        table.storage_options.storage_type = crate::catalog::StorageType::Columnar;
        table.storage_options.row_id_mode = RowIdMode::Direct;

        let segment = build_segment(
            vec![("val", LogicalType::Int64)],
            vec![Column::Int64(vec![3, 4])],
        );
        let stats = vec![crate::columnar::statistics::compute_row_group_statistics(
            &[vec![SqlValue::Integer(3)], vec![SqlValue::Integer(4)]],
        )];

        persist_segment_for_test(&bridge, table.table_id, &segment, &stats);

        let scan = ColumnarScan::new(table.table_id, vec![0], None, None);
        let mut read_txn = bridge.begin_read().unwrap();
        let rows = execute_columnar_scan(&mut read_txn, &table, &scan).unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].row_id, 0);
        assert_eq!(rows[1].row_id, 1);
    }

    /// Builds an in-memory segment from a single `RecordBatch` of non-nullable columns.
    ///
    /// `columns` gives each column's name and logical type. `data` must contain
    /// exactly one `Column` per schema entry, in the same order.
    fn build_segment(columns: Vec<(&str, LogicalType)>, data: Vec<Column>) -> ColumnSegmentV2 {
        let schema = Schema {
            columns: columns
                .into_iter()
                .map(|(name, logical_type)| ColumnSchema {
                    name: name.into(),
                    logical_type,
                    nullable: false,
                    fixed_len: None,
                })
                .collect(),
        };
        // One `None` null bitmap per column: every fixture column is non-nullable.
        let null_bitmaps = (0..data.len()).map(|_| None).collect();
        let batch = RecordBatch::new(schema, data, null_bitmaps);
        let mut writer = SegmentWriterV2::new(Default::default());
        writer.write_batch(batch).unwrap();
        writer.finish().unwrap()
    }

    /// Writes `segment` into the KV store as segment 0 of table `table_id` and commits.
    ///
    /// The single write transaction stores:
    /// - the bincode-serialized segment at `column_segment_key(table_id, 0, 0)`,
    /// - the segment's `meta` at `statistics_key(table_id, 0)`,
    /// - `row_group_stats` at `row_group_stats_key(table_id, 0)`,
    /// - a segment index containing only `0` at `segment_index_key(table_id)`.
    fn persist_segment_for_test(
        bridge: &TxnBridge<MemoryKV>,
        table_id: u32,
        segment: &ColumnSegmentV2,
        row_group_stats: &[RowGroupStatistics],
    ) {
        let mut txn = bridge.begin_write().unwrap();
        let segment_bytes = bincode_config().serialize(segment).unwrap();
        let meta_bytes = bincode_config().serialize(&segment.meta).unwrap();
        let stats_bytes = bincode_config().serialize(row_group_stats).unwrap();
        txn.inner_mut()
            .put(key_layout::column_segment_key(table_id, 0, 0), segment_bytes)
            .unwrap();
        txn.inner_mut()
            .put(key_layout::statistics_key(table_id, 0), meta_bytes)
            .unwrap();
        txn.inner_mut()
            .put(key_layout::row_group_stats_key(table_id, 0), stats_bytes)
            .unwrap();
        let index_bytes = bincode_config().serialize(&vec![0u64]).unwrap();
        txn.inner_mut()
            .put(key_layout::segment_index_key(table_id), index_bytes)
            .unwrap();
        txn.commit().unwrap();
    }
}