alopex_sql/executor/query/
columnar_scan.rs

1use alopex_core::columnar::encoding::Column;
2use alopex_core::columnar::encoding_v2::Bitmap;
3use alopex_core::columnar::kvs_bridge::key_layout;
4use alopex_core::columnar::segment_v2::{
5    ColumnSegmentV2, InMemorySegmentSource, RecordBatch, SegmentReaderV2,
6};
7use alopex_core::kv::{KVStore, KVTransaction};
8use alopex_core::storage::format::bincode_config;
9use bincode::config::Options;
10
11use crate::ast::expr::BinaryOp;
12use crate::catalog::{ColumnMetadata, RowIdMode, TableMetadata};
13use crate::columnar::statistics::RowGroupStatistics;
14use crate::executor::evaluator::{EvalContext, evaluate};
15use crate::executor::query::iterator::RowIterator;
16use crate::executor::{ExecutorError, Result, Row};
17use crate::planner::typed_expr::{Projection, TypedExpr, TypedExprKind};
18use crate::planner::types::ResolvedType;
19use crate::storage::{SqlTxn, SqlValue};
20use std::collections::BTreeSet;
21
22/// ColumnarScan オペレータ。
23#[derive(Debug, Clone)]
24pub struct ColumnarScan {
25    pub table_id: u32,
26    pub projected_columns: Vec<usize>,
27    pub pushed_filter: Option<PushdownFilter>,
28    pub residual_filter: Option<TypedExpr>,
29}
30
31/// プッシュダウン可能なフィルタ。
32#[derive(Debug, Clone, PartialEq)]
33pub enum PushdownFilter {
34    Eq {
35        column_idx: usize,
36        value: SqlValue,
37    },
38    Range {
39        column_idx: usize,
40        min: Option<SqlValue>,
41        max: Option<SqlValue>,
42    },
43    IsNull {
44        column_idx: usize,
45        is_null: bool,
46    },
47    And(Vec<PushdownFilter>),
48    Or(Vec<PushdownFilter>),
49}
50
51impl ColumnarScan {
52    pub fn new(
53        table_id: u32,
54        projected_columns: Vec<usize>,
55        pushed_filter: Option<PushdownFilter>,
56        residual_filter: Option<TypedExpr>,
57    ) -> Self {
58        Self {
59            table_id,
60            projected_columns,
61            pushed_filter,
62            residual_filter,
63        }
64    }
65
66    /// RowGroup をプルーニングするか判定する。
67    pub fn should_skip_row_group(&self, stats: &RowGroupStatistics) -> bool {
68        match &self.pushed_filter {
69            None => false,
70            Some(filter) => Self::evaluate_pushdown(filter, stats),
71        }
72    }
73
74    /// プッシュダウンフィルタを統計情報で評価する。
75    pub fn evaluate_pushdown(filter: &PushdownFilter, stats: &RowGroupStatistics) -> bool {
76        match filter {
77            PushdownFilter::Eq { column_idx, value } => match stats.columns.get(*column_idx) {
78                Some(col_stats) => {
79                    if col_stats.total_count == 0 {
80                        return true;
81                    }
82                    if matches!(
83                        value.partial_cmp(&col_stats.min),
84                        Some(std::cmp::Ordering::Less)
85                    ) {
86                        return true;
87                    }
88                    matches!(
89                        value.partial_cmp(&col_stats.max),
90                        Some(std::cmp::Ordering::Greater)
91                    )
92                }
93                None => false,
94            },
95
96            PushdownFilter::Range {
97                column_idx,
98                min,
99                max,
100            } => match stats.columns.get(*column_idx) {
101                Some(col_stats) => {
102                    if col_stats.total_count == 0 {
103                        return true;
104                    }
105                    if let Some(filter_min) = min
106                        && matches!(
107                            col_stats.max.partial_cmp(filter_min),
108                            Some(std::cmp::Ordering::Less)
109                        )
110                    {
111                        return true;
112                    }
113                    if let Some(filter_max) = max
114                        && matches!(
115                            col_stats.min.partial_cmp(filter_max),
116                            Some(std::cmp::Ordering::Greater)
117                        )
118                    {
119                        return true;
120                    }
121                    false
122                }
123                None => false,
124            },
125
126            PushdownFilter::IsNull {
127                column_idx,
128                is_null,
129            } => match stats.columns.get(*column_idx) {
130                Some(col_stats) => {
131                    if *is_null {
132                        col_stats.null_count == 0
133                    } else {
134                        col_stats.null_count == col_stats.total_count
135                    }
136                }
137                None => false,
138            },
139
140            PushdownFilter::And(filters) => {
141                if filters.is_empty() {
142                    return false;
143                }
144                filters.iter().any(|f| Self::evaluate_pushdown(f, stats))
145            }
146
147            PushdownFilter::Or(filters) => {
148                if filters.is_empty() {
149                    return false;
150                }
151                filters.iter().all(|f| Self::evaluate_pushdown(f, stats))
152            }
153        }
154    }
155}
156
157// ============================================================================
158// ColumnarScanIterator - FR-7 Streaming Iterator for Columnar Storage
159// ============================================================================
160
161/// Pre-loaded segment data for streaming iteration.
162struct LoadedSegment {
163    /// Segment reader for reading row groups.
164    reader: SegmentReaderV2,
165    /// Row group statistics for pruning (if available).
166    row_group_stats: Option<Vec<RowGroupStatistics>>,
167    /// Row IDs for RowIdMode::Direct.
168    row_ids: Vec<u64>,
169    /// Row group metadata for row ID slicing.
170    row_groups: Vec<alopex_core::columnar::segment_v2::RowGroupMeta>,
171}
172
173/// Streaming iterator for columnar storage (FR-7 compliant).
174///
175/// This iterator yields rows one at a time from columnar storage,
176/// avoiding the need to materialize all rows into a `Vec<Row>` upfront.
177/// Segments are pre-loaded during construction, but row conversion is
178/// performed lazily as rows are requested.
179pub struct ColumnarScanIterator {
180    /// Pre-loaded segments.
181    segments: Vec<LoadedSegment>,
182    /// Current segment index.
183    segment_idx: usize,
184    /// Current row group index within the segment.
185    row_group_idx: usize,
186    /// Current row index within the batch.
187    row_idx: usize,
188    /// Current loaded RecordBatch (lazy loaded per row group).
189    current_batch: Option<RecordBatch>,
190    /// Projected column indices.
191    projected: Vec<usize>,
192    /// Table metadata.
193    table_meta: TableMetadata,
194    /// Schema for RowIterator trait.
195    schema: Vec<ColumnMetadata>,
196    /// ColumnarScan operator for filter evaluation.
197    scan: ColumnarScan,
198    /// RowID column index for RowIdMode::Direct.
199    row_id_col_idx: Option<usize>,
200    /// Next synthetic row ID (for RowIdMode::None).
201    next_row_id: u64,
202}
203
204impl ColumnarScanIterator {
205    /// Advances to the next valid row, loading batches as needed.
206    ///
207    /// Returns `Some(Ok(row))` for valid rows, `Some(Err(_))` for errors,
208    /// or `None` when all rows have been consumed.
209    fn advance(&mut self) -> Option<Result<Row>> {
210        loop {
211            // Check if we need to load a new batch
212            if self.current_batch.is_none() && !self.load_next_batch() {
213                return None; // No more batches
214            }
215
216            // Get row count to check if exhausted
217            let row_count = match &self.current_batch {
218                Some(batch) => batch.num_rows(),
219                None => continue,
220            };
221
222            // Check if we've exhausted the current batch
223            if self.row_idx >= row_count {
224                self.current_batch = None;
225                self.row_idx = 0;
226                self.row_group_idx += 1;
227                continue;
228            }
229
230            // Convert current row (take batch temporarily to avoid borrow conflict)
231            let row_idx = self.row_idx;
232            match self.convert_current_row(row_idx) {
233                Ok(Some(row)) => {
234                    self.row_idx += 1;
235                    return Some(Ok(row));
236                }
237                Ok(None) => {
238                    // Row filtered out by residual filter
239                    self.row_idx += 1;
240                    continue;
241                }
242                Err(e) => {
243                    self.row_idx += 1;
244                    return Some(Err(e));
245                }
246            }
247        }
248    }
249
250    /// Loads the next row group batch, advancing segments as needed.
251    ///
252    /// Returns `true` if a batch was successfully loaded, `false` if no more data.
253    fn load_next_batch(&mut self) -> bool {
254        while self.segment_idx < self.segments.len() {
255            let segment = &self.segments[self.segment_idx];
256            let row_group_count = segment.row_groups.len();
257
258            while self.row_group_idx < row_group_count {
259                // Check if this row group should be skipped via pushdown
260                let should_skip = match segment.row_group_stats.as_ref() {
261                    Some(stats) if stats.len() == row_group_count => {
262                        self.scan.should_skip_row_group(&stats[self.row_group_idx])
263                    }
264                    _ => false,
265                };
266
267                if should_skip {
268                    self.row_group_idx += 1;
269                    continue;
270                }
271
272                // Load the batch
273                match segment
274                    .reader
275                    .read_row_group_by_index(&self.projected, self.row_group_idx)
276                {
277                    Ok(mut batch) => {
278                        // Attach row IDs if available
279                        if !segment.row_ids.is_empty()
280                            && let Some(meta) = segment.row_groups.get(self.row_group_idx)
281                        {
282                            let start = meta.row_start as usize;
283                            let end = start + meta.row_count as usize;
284                            if end <= segment.row_ids.len() {
285                                batch =
286                                    batch.with_row_ids(Some(segment.row_ids[start..end].to_vec()));
287                            }
288                        }
289                        self.current_batch = Some(batch);
290                        self.row_idx = 0;
291                        return true;
292                    }
293                    Err(_) => {
294                        // Skip this row group on error
295                        self.row_group_idx += 1;
296                        continue;
297                    }
298                }
299            }
300
301            // Move to next segment
302            self.segment_idx += 1;
303            self.row_group_idx = 0;
304        }
305
306        false
307    }
308
309    /// Converts a row from the current batch, applying residual filter.
310    ///
311    /// Returns `Ok(Some(row))` if row passes filter, `Ok(None)` if filtered out.
312    fn convert_current_row(&mut self, row_idx: usize) -> Result<Option<Row>> {
313        let batch = self
314            .current_batch
315            .as_ref()
316            .ok_or_else(|| ExecutorError::Columnar("no current batch".into()))?;
317
318        let column_count = self.table_meta.column_count();
319        let mut values = vec![SqlValue::Null; column_count];
320
321        for (pos, &table_col_idx) in self.projected.iter().enumerate() {
322            let column = batch
323                .columns
324                .get(pos)
325                .ok_or_else(|| ExecutorError::Columnar("missing projected column".into()))?;
326            let bitmap = batch.null_bitmaps.get(pos).and_then(|b| b.as_ref());
327            let col_meta = self
328                .table_meta
329                .columns
330                .get(table_col_idx)
331                .ok_or_else(|| ExecutorError::Columnar("column index out of bounds".into()))?;
332            let value = value_from_column(column, bitmap, row_idx, &col_meta.data_type)?;
333            values[table_col_idx] = value;
334        }
335
336        // Apply residual filter
337        if let Some(predicate) = self.scan.residual_filter.as_ref() {
338            let ctx = EvalContext::new(&values);
339            let keep = matches!(evaluate(predicate, &ctx)?, SqlValue::Boolean(true));
340            if !keep {
341                return Ok(None);
342            }
343        }
344
345        // Determine row ID - need to access batch again for row_ids
346        let batch = self
347            .current_batch
348            .as_ref()
349            .ok_or_else(|| ExecutorError::Columnar("no current batch".into()))?;
350
351        let row_id = match self.table_meta.storage_options.row_id_mode {
352            RowIdMode::Direct => {
353                if let Some(row_ids) = batch.row_ids.as_ref() {
354                    *row_ids.get(row_idx).ok_or_else(|| {
355                        ExecutorError::Columnar(
356                            "row_id missing for row in row_id_mode=direct".into(),
357                        )
358                    })?
359                } else if let Some(idx) = self.row_id_col_idx {
360                    let val = values.get(idx).ok_or_else(|| {
361                        ExecutorError::Columnar("row_id column missing in projected values".into())
362                    })?;
363                    match val {
364                        SqlValue::Integer(v) if *v >= 0 => *v as u64,
365                        SqlValue::BigInt(v) if *v >= 0 => *v as u64,
366                        other => {
367                            return Err(ExecutorError::Columnar(format!(
368                                "row_id column must be non-negative integer, got {}",
369                                other.type_name()
370                            )));
371                        }
372                    }
373                } else {
374                    let rid = self.next_row_id;
375                    self.next_row_id = self.next_row_id.saturating_add(1);
376                    rid
377                }
378            }
379            RowIdMode::None => {
380                let rid = self.next_row_id;
381                self.next_row_id = self.next_row_id.saturating_add(1);
382                rid
383            }
384        };
385
386        Ok(Some(Row::new(row_id, values)))
387    }
388}
389
390impl RowIterator for ColumnarScanIterator {
391    fn next_row(&mut self) -> Option<Result<Row>> {
392        self.advance()
393    }
394
395    fn schema(&self) -> &[ColumnMetadata] {
396        &self.schema
397    }
398}
399
400/// Create a streaming columnar scan iterator (FR-7 compliant).
401///
402/// This function pre-loads segment data during construction but yields rows
403/// one at a time during iteration, avoiding full materialization of all rows.
404///
405/// # Arguments
406///
407/// * `txn` - Transaction for loading segment data
408/// * `table_meta` - Table metadata
409/// * `scan` - ColumnarScan operator with projection and filters
410///
411/// # Returns
412///
413/// A `ColumnarScanIterator` that implements `RowIterator`.
414pub fn create_columnar_scan_iterator<'txn, S: KVStore + 'txn>(
415    txn: &mut impl SqlTxn<'txn, S>,
416    table_meta: &TableMetadata,
417    scan: &ColumnarScan,
418) -> Result<ColumnarScanIterator> {
419    debug_assert_eq!(scan.table_id, table_meta.table_id);
420
421    let projected: Vec<usize> = if scan.projected_columns.is_empty() {
422        (0..table_meta.columns.len()).collect()
423    } else {
424        scan.projected_columns.clone()
425    };
426
427    let segment_ids = load_segment_index(txn, table_meta.table_id)?;
428
429    let row_id_col_idx = if table_meta.storage_options.row_id_mode == RowIdMode::Direct {
430        table_meta
431            .columns
432            .iter()
433            .position(|c| c.name.eq_ignore_ascii_case("row_id"))
434    } else {
435        None
436    };
437
438    // Pre-load all segments
439    let mut segments = Vec::with_capacity(segment_ids.len());
440    for segment_id in segment_ids {
441        let segment = load_segment(txn, table_meta.table_id, segment_id)?;
442        let reader =
443            SegmentReaderV2::open(Box::new(InMemorySegmentSource::new(segment.data.clone())))
444                .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
445        let row_group_stats = load_row_group_stats(txn, table_meta.table_id, segment_id);
446
447        segments.push(LoadedSegment {
448            reader,
449            row_group_stats,
450            row_ids: segment.row_ids.clone(),
451            row_groups: segment.meta.row_groups.clone(),
452        });
453    }
454
455    Ok(ColumnarScanIterator {
456        segments,
457        segment_idx: 0,
458        row_group_idx: 0,
459        row_idx: 0,
460        current_batch: None,
461        projected,
462        schema: table_meta.columns.clone(),
463        table_meta: table_meta.clone(),
464        scan: scan.clone(),
465        row_id_col_idx,
466        next_row_id: 0,
467    })
468}
469
470/// ColumnarScan を実行する。
471pub fn execute_columnar_scan<'txn, S: KVStore + 'txn>(
472    txn: &mut impl SqlTxn<'txn, S>,
473    table_meta: &TableMetadata,
474    scan: &ColumnarScan,
475) -> Result<Vec<Row>> {
476    debug_assert_eq!(scan.table_id, table_meta.table_id);
477    let projected: Vec<usize> = if scan.projected_columns.is_empty() {
478        (0..table_meta.columns.len()).collect()
479    } else {
480        scan.projected_columns.clone()
481    };
482
483    let segment_ids = load_segment_index(txn, table_meta.table_id)?;
484    if segment_ids.is_empty() {
485        return Ok(Vec::new());
486    }
487
488    let row_id_col_idx = if table_meta.storage_options.row_id_mode == RowIdMode::Direct {
489        table_meta
490            .columns
491            .iter()
492            .position(|c| c.name.eq_ignore_ascii_case("row_id"))
493    } else {
494        None
495    };
496
497    let mut results = Vec::new();
498    let mut next_row_id = 0u64;
499    for segment_id in segment_ids {
500        let segment = load_segment(txn, table_meta.table_id, segment_id)?;
501        let reader =
502            SegmentReaderV2::open(Box::new(InMemorySegmentSource::new(segment.data.clone())))
503                .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
504
505        let row_group_stats = load_row_group_stats(txn, table_meta.table_id, segment_id);
506        let row_group_count = segment.meta.row_groups.len();
507        for rg_index in 0..row_group_count {
508            let should_skip = match row_group_stats.as_ref() {
509                Some(stats) if stats.len() == row_group_count => {
510                    scan.should_skip_row_group(&stats[rg_index])
511                }
512                _ => false,
513            };
514            if should_skip {
515                continue;
516            }
517
518            let batch = reader
519                .read_row_group_by_index(&projected, rg_index)
520                .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
521            let batch = if !segment.row_ids.is_empty() {
522                if let Some(meta) = segment.meta.row_groups.get(rg_index) {
523                    let start = meta.row_start as usize;
524                    let end = start + meta.row_count as usize;
525                    if end <= segment.row_ids.len() {
526                        batch.with_row_ids(Some(segment.row_ids[start..end].to_vec()))
527                    } else {
528                        batch
529                    }
530                } else {
531                    batch
532                }
533            } else {
534                batch
535            };
536            append_rows_from_batch(
537                &mut results,
538                &batch,
539                table_meta,
540                &projected,
541                scan.residual_filter.as_ref(),
542                table_meta.storage_options.row_id_mode,
543                row_id_col_idx,
544                &mut next_row_id,
545            )?;
546        }
547    }
548
549    Ok(results)
550}
551
552/// ColumnarScan を実行し、フィルタ後の RowID のみを返す。
553///
554/// RowIdMode::Direct で columnar ストレージの場合、行本体の読み込みを避けて
555/// RowID 再フェッチ用の候補セットを得る目的で使用する。
556pub fn execute_columnar_row_ids<'txn, S: KVStore + 'txn>(
557    txn: &mut impl SqlTxn<'txn, S>,
558    table_meta: &TableMetadata,
559    scan: &ColumnarScan,
560) -> Result<Vec<u64>> {
561    if table_meta.storage_options.storage_type != crate::catalog::StorageType::Columnar {
562        return Err(ExecutorError::Columnar(
563            "execute_columnar_row_ids requires columnar storage".into(),
564        ));
565    }
566
567    let mut needed: BTreeSet<usize> = scan.projected_columns.iter().copied().collect();
568    if let Some(pred) = &scan.residual_filter {
569        collect_column_indices(pred, &mut needed);
570    }
571    let projected: Vec<usize> = needed.into_iter().collect();
572
573    let segment_ids = load_segment_index(txn, table_meta.table_id)?;
574    if segment_ids.is_empty() {
575        return Ok(Vec::new());
576    }
577
578    let row_id_col_idx = if table_meta.storage_options.row_id_mode == RowIdMode::Direct {
579        table_meta
580            .columns
581            .iter()
582            .position(|c| c.name.eq_ignore_ascii_case("row_id"))
583    } else {
584        None
585    };
586
587    let mut results = Vec::new();
588    let mut next_row_id = 0u64;
589    for segment_id in segment_ids {
590        let segment = load_segment(txn, table_meta.table_id, segment_id)?;
591        let reader =
592            SegmentReaderV2::open(Box::new(InMemorySegmentSource::new(segment.data.clone())))
593                .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
594
595        let row_group_stats = load_row_group_stats(txn, table_meta.table_id, segment_id);
596        let row_group_count = segment.meta.row_groups.len();
597        for rg_index in 0..row_group_count {
598            let should_skip = match row_group_stats.as_ref() {
599                Some(stats) if stats.len() == row_group_count => {
600                    scan.should_skip_row_group(&stats[rg_index])
601                }
602                _ => false,
603            };
604            if should_skip {
605                continue;
606            }
607
608            let batch = reader
609                .read_row_group_by_index(&projected, rg_index)
610                .map_err(|e| ExecutorError::Columnar(e.to_string()))?;
611            let batch = if !segment.row_ids.is_empty() {
612                if let Some(meta) = segment.meta.row_groups.get(rg_index) {
613                    let start = meta.row_start as usize;
614                    let end = start + meta.row_count as usize;
615                    if end <= segment.row_ids.len() {
616                        batch.with_row_ids(Some(segment.row_ids[start..end].to_vec()))
617                    } else {
618                        batch
619                    }
620                } else {
621                    batch
622                }
623            } else {
624                batch
625            };
626
627            let row_count = batch.num_rows();
628            for row_idx in 0..row_count {
629                // 残余フィルタの評価に必要なカラムだけ値を復元する。
630                let mut values = vec![SqlValue::Null; table_meta.column_count()];
631                for (pos, &table_col_idx) in projected.iter().enumerate() {
632                    let column = batch.columns.get(pos).ok_or_else(|| {
633                        ExecutorError::Columnar("missing projected column".into())
634                    })?;
635                    let bitmap = batch.null_bitmaps.get(pos).and_then(|b| b.as_ref());
636                    let value = value_from_column(
637                        column,
638                        bitmap,
639                        row_idx,
640                        &table_meta
641                            .columns
642                            .get(table_col_idx)
643                            .ok_or_else(|| {
644                                ExecutorError::Columnar("column index out of bounds".into())
645                            })?
646                            .data_type,
647                    )?;
648                    values[table_col_idx] = value;
649                }
650
651                if let Some(predicate) = scan.residual_filter.as_ref() {
652                    let ctx = EvalContext::new(&values);
653                    let keep = matches!(evaluate(predicate, &ctx)?, SqlValue::Boolean(true));
654                    if !keep {
655                        continue;
656                    }
657                }
658
659                let row_id = match table_meta.storage_options.row_id_mode {
660                    RowIdMode::Direct => {
661                        if let Some(row_ids) = batch.row_ids.as_ref() {
662                            *row_ids.get(row_idx).ok_or_else(|| {
663                                ExecutorError::Columnar(
664                                    "row_id missing for row in row_id_mode=direct".into(),
665                                )
666                            })?
667                        } else if let Some(idx) = row_id_col_idx {
668                            let val = values.get(idx).ok_or_else(|| {
669                                ExecutorError::Columnar(
670                                    "row_id column missing in projected values".into(),
671                                )
672                            })?;
673                            match val {
674                                SqlValue::Integer(v) if *v >= 0 => *v as u64,
675                                SqlValue::BigInt(v) if *v >= 0 => *v as u64,
676                                other => {
677                                    return Err(ExecutorError::Columnar(format!(
678                                        "row_id column must be non-negative integer, got {}",
679                                        other.type_name()
680                                    )));
681                                }
682                            }
683                        } else {
684                            let rid = next_row_id;
685                            next_row_id = next_row_id.saturating_add(1);
686                            rid
687                        }
688                    }
689                    RowIdMode::None => {
690                        let rid = next_row_id;
691                        next_row_id = next_row_id.saturating_add(1);
692                        rid
693                    }
694                };
695                results.push(row_id);
696            }
697        }
698    }
699
700    Ok(results)
701}
702
703/// TypedExpr から PushdownFilter へ変換する(変換不可なら None)。
704pub fn expr_to_pushdown(expr: &TypedExpr) -> Option<PushdownFilter> {
705    match &expr.kind {
706        TypedExprKind::BinaryOp { left, op, right } => match op {
707            BinaryOp::And => {
708                let l = expr_to_pushdown(left)?;
709                let r = expr_to_pushdown(right)?;
710                Some(PushdownFilter::And(vec![l, r]))
711            }
712            BinaryOp::Or => {
713                let l = expr_to_pushdown(left)?;
714                let r = expr_to_pushdown(right)?;
715                Some(PushdownFilter::Or(vec![l, r]))
716            }
717            BinaryOp::Eq => extract_eq(left, right),
718            BinaryOp::Lt | BinaryOp::LtEq | BinaryOp::Gt | BinaryOp::GtEq => {
719                extract_range(op, left, right)
720            }
721            _ => None,
722        },
723        TypedExprKind::Between {
724            expr,
725            low,
726            high,
727            negated,
728        } => {
729            if *negated {
730                return None;
731            }
732            let (column_idx, value_min, value_max) = match expr.kind {
733                TypedExprKind::ColumnRef { column_index, .. } => {
734                    let low_v = literal_value(low)?;
735                    let high_v = literal_value(high)?;
736                    (column_index, low_v, high_v)
737                }
738                _ => return None,
739            };
740            Some(PushdownFilter::Range {
741                column_idx,
742                min: Some(value_min),
743                max: Some(value_max),
744            })
745        }
746        TypedExprKind::IsNull { expr, negated } => match expr.kind {
747            TypedExprKind::ColumnRef { column_index, .. } => Some(PushdownFilter::IsNull {
748                column_idx: column_index,
749                is_null: !negated,
750            }),
751            _ => None,
752        },
753        _ => None,
754    }
755}
756
757fn extract_eq(left: &TypedExpr, right: &TypedExpr) -> Option<PushdownFilter> {
758    if let Some((col_idx, value)) = extract_column_literal(left, right) {
759        return Some(PushdownFilter::Eq {
760            column_idx: col_idx,
761            value,
762        });
763    }
764    if let Some((col_idx, value)) = extract_column_literal(right, left) {
765        return Some(PushdownFilter::Eq {
766            column_idx: col_idx,
767            value,
768        });
769    }
770    None
771}
772
773fn extract_range(op: &BinaryOp, left: &TypedExpr, right: &TypedExpr) -> Option<PushdownFilter> {
774    match (
775        extract_column_literal(left, right),
776        extract_column_literal(right, left),
777    ) {
778        (Some((col_idx, value)), _) => match op {
779            BinaryOp::Lt | BinaryOp::LtEq => Some(PushdownFilter::Range {
780                column_idx: col_idx,
781                min: None,
782                max: Some(value),
783            }),
784            BinaryOp::Gt | BinaryOp::GtEq => Some(PushdownFilter::Range {
785                column_idx: col_idx,
786                min: Some(value),
787                max: None,
788            }),
789            _ => None,
790        },
791        (_, Some((col_idx, value))) => match op {
792            BinaryOp::Lt | BinaryOp::LtEq => Some(PushdownFilter::Range {
793                column_idx: col_idx,
794                min: Some(value),
795                max: None,
796            }),
797            BinaryOp::Gt | BinaryOp::GtEq => Some(PushdownFilter::Range {
798                column_idx: col_idx,
799                min: None,
800                max: Some(value),
801            }),
802            _ => None,
803        },
804        _ => None,
805    }
806}
807
808fn extract_column_literal(
809    column_expr: &TypedExpr,
810    literal_expr: &TypedExpr,
811) -> Option<(usize, SqlValue)> {
812    match column_expr.kind {
813        TypedExprKind::ColumnRef { column_index, .. } => {
814            let value = literal_value(literal_expr)?;
815            Some((column_index, value))
816        }
817        _ => None,
818    }
819}
820
821fn literal_value(expr: &TypedExpr) -> Option<SqlValue> {
822    match &expr.kind {
823        TypedExprKind::Literal(_) | TypedExprKind::VectorLiteral(_) => {
824            evaluate(expr, &EvalContext::new(&[])).ok()
825        }
826        _ => None,
827    }
828}
829
830/// projection 情報からカラムインデックスを推定する(現状は全カラム)。
831pub fn projection_to_columns(projection: &Projection, table_meta: &TableMetadata) -> Vec<usize> {
832    match projection {
833        Projection::All(names) => names
834            .iter()
835            .filter_map(|name| table_meta.columns.iter().position(|c| &c.name == name))
836            .collect(),
837        Projection::Columns(cols) => {
838            let mut indices = BTreeSet::new();
839            for col in cols {
840                collect_column_indices(&col.expr, &mut indices);
841            }
842            if indices.is_empty() {
843                return (0..table_meta.columns.len()).collect();
844            }
845            indices
846                .into_iter()
847                .filter(|idx| *idx < table_meta.columns.len())
848                .collect()
849        }
850    }
851}
852
853/// フィルタと Projection を ColumnarScan にまとめるユーティリティ。
854pub fn build_columnar_scan_for_filter(
855    table_meta: &TableMetadata,
856    projection: Projection,
857    predicate: &TypedExpr,
858) -> ColumnarScan {
859    let mut projected_columns = projection_to_columns(&projection, table_meta);
860    let mut predicate_indices = BTreeSet::new();
861    collect_column_indices(predicate, &mut predicate_indices);
862    for idx in predicate_indices {
863        if !projected_columns.contains(&idx) {
864            projected_columns.push(idx);
865        }
866    }
867    projected_columns.sort_unstable();
868    let pushed_filter = expr_to_pushdown(predicate);
869    ColumnarScan::new(
870        table_meta.table_id,
871        projected_columns,
872        pushed_filter,
873        Some(predicate.clone()),
874    )
875}
876
877/// Projection だけを指定して ColumnarScan を構築する。
878pub fn build_columnar_scan(table_meta: &TableMetadata, projection: &Projection) -> ColumnarScan {
879    let projected_columns = projection_to_columns(projection, table_meta);
880    ColumnarScan::new(table_meta.table_id, projected_columns, None, None)
881}
882
883/// 式中に現れるカラムインデックスを収集する。
884fn collect_column_indices(expr: &TypedExpr, acc: &mut BTreeSet<usize>) {
885    match &expr.kind {
886        TypedExprKind::ColumnRef { column_index, .. } => {
887            acc.insert(*column_index);
888        }
889        TypedExprKind::BinaryOp { left, right, .. } => {
890            collect_column_indices(left, acc);
891            collect_column_indices(right, acc);
892        }
893        TypedExprKind::UnaryOp { operand, .. } => collect_column_indices(operand, acc),
894        TypedExprKind::Between {
895            expr, low, high, ..
896        } => {
897            collect_column_indices(expr, acc);
898            collect_column_indices(low, acc);
899            collect_column_indices(high, acc);
900        }
901        TypedExprKind::InList { expr, list, .. } => {
902            collect_column_indices(expr, acc);
903            for item in list {
904                collect_column_indices(item, acc);
905            }
906        }
907        TypedExprKind::IsNull { expr, .. } => collect_column_indices(expr, acc),
908        TypedExprKind::FunctionCall { args, .. } => {
909            for arg in args {
910                collect_column_indices(arg, acc);
911            }
912        }
913        _ => {}
914    }
915}
916
917fn load_segment_index<'txn, S: KVStore + 'txn>(
918    txn: &mut impl SqlTxn<'txn, S>,
919    table_id: u32,
920) -> Result<Vec<u64>> {
921    let key = key_layout::segment_index_key(table_id);
922    let bytes = txn.inner_mut().get(&key)?;
923    if let Some(raw) = bytes {
924        bincode_config()
925            .deserialize(&raw)
926            .map_err(|e| ExecutorError::Columnar(e.to_string()))
927    } else {
928        Ok(Vec::new())
929    }
930}
931
932fn load_segment<'txn, S: KVStore + 'txn>(
933    txn: &mut impl SqlTxn<'txn, S>,
934    table_id: u32,
935    segment_id: u64,
936) -> Result<ColumnSegmentV2> {
937    let key = key_layout::column_segment_key(table_id, segment_id, 0);
938    let bytes = txn
939        .inner_mut()
940        .get(&key)?
941        .ok_or_else(|| ExecutorError::Columnar(format!("segment {segment_id} missing")))?;
942    bincode_config()
943        .deserialize(&bytes)
944        .map_err(|e| ExecutorError::Columnar(e.to_string()))
945}
946
947fn load_row_group_stats<'txn, S: KVStore + 'txn>(
948    txn: &mut impl SqlTxn<'txn, S>,
949    table_id: u32,
950    segment_id: u64,
951) -> Option<Vec<RowGroupStatistics>> {
952    let key = key_layout::row_group_stats_key(table_id, segment_id);
953    match txn.inner_mut().get(&key) {
954        Ok(Some(bytes)) => bincode_config().deserialize(&bytes).ok(),
955        Ok(None) => None,
956        Err(_) => None,
957    }
958}
959
960#[allow(clippy::too_many_arguments)]
961fn append_rows_from_batch(
962    out: &mut Vec<Row>,
963    batch: &alopex_core::columnar::segment_v2::RecordBatch,
964    table_meta: &TableMetadata,
965    projected: &[usize],
966    residual_filter: Option<&TypedExpr>,
967    row_id_mode: RowIdMode,
968    row_id_col_idx: Option<usize>,
969    next_row_id: &mut u64,
970) -> Result<()> {
971    if batch.columns.len() != projected.len() {
972        return Err(ExecutorError::Columnar(format!(
973            "projected column count mismatch: requested {}, got {}",
974            projected.len(),
975            batch.columns.len()
976        )));
977    }
978
979    let row_count = batch.num_rows();
980    for row_idx in 0..row_count {
981        let mut values = vec![SqlValue::Null; table_meta.column_count()];
982        for (pos, &table_col_idx) in projected.iter().enumerate() {
983            let column = batch
984                .columns
985                .get(pos)
986                .ok_or_else(|| ExecutorError::Columnar("missing projected column".into()))?;
987            let bitmap = batch.null_bitmaps.get(pos).and_then(|b| b.as_ref());
988            let value = value_from_column(
989                column,
990                bitmap,
991                row_idx,
992                &table_meta
993                    .columns
994                    .get(table_col_idx)
995                    .ok_or_else(|| ExecutorError::Columnar("column index out of bounds".into()))?
996                    .data_type,
997            )?;
998            values[table_col_idx] = value;
999        }
1000
1001        if let Some(predicate) = residual_filter {
1002            let ctx = EvalContext::new(&values);
1003            let keep = matches!(evaluate(predicate, &ctx)?, SqlValue::Boolean(true));
1004            if !keep {
1005                continue;
1006            }
1007        }
1008
1009        let row_id = match row_id_mode {
1010            RowIdMode::Direct => {
1011                if let Some(row_ids) = batch.row_ids.as_ref() {
1012                    *row_ids.get(row_idx).ok_or_else(|| {
1013                        ExecutorError::Columnar(
1014                            "row_id missing for row in row_id_mode=direct".into(),
1015                        )
1016                    })?
1017                } else if let Some(idx) = row_id_col_idx {
1018                    let val = values.get(idx).ok_or_else(|| {
1019                        ExecutorError::Columnar("row_id column missing in projected values".into())
1020                    })?;
1021                    match val {
1022                        SqlValue::Integer(v) if *v >= 0 => *v as u64,
1023                        SqlValue::BigInt(v) if *v >= 0 => *v as u64,
1024                        other => {
1025                            return Err(ExecutorError::Columnar(format!(
1026                                "row_id column must be non-negative integer, got {}",
1027                                other.type_name()
1028                            )));
1029                        }
1030                    }
1031                } else {
1032                    let rid = *next_row_id;
1033                    *next_row_id = next_row_id.saturating_add(1);
1034                    rid
1035                }
1036            }
1037            RowIdMode::None => {
1038                let rid = *next_row_id;
1039                *next_row_id = next_row_id.saturating_add(1);
1040                rid
1041            }
1042        };
1043        out.push(Row::new(row_id, values));
1044    }
1045
1046    Ok(())
1047}
1048
1049fn value_from_column(
1050    column: &Column,
1051    bitmap: Option<&Bitmap>,
1052    row_idx: usize,
1053    ty: &ResolvedType,
1054) -> Result<SqlValue> {
1055    if let Some(bm) = bitmap
1056        && !bm.get(row_idx)
1057    {
1058        return Ok(SqlValue::Null);
1059    }
1060
1061    match (ty, column) {
1062        (ResolvedType::Integer, Column::Int64(values)) => {
1063            let v = *values
1064                .get(row_idx)
1065                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1066            Ok(SqlValue::Integer(v as i32))
1067        }
1068        (ResolvedType::BigInt | ResolvedType::Timestamp, Column::Int64(values)) => {
1069            let v = *values
1070                .get(row_idx)
1071                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1072            if matches!(ty, ResolvedType::Timestamp) {
1073                Ok(SqlValue::Timestamp(v))
1074            } else {
1075                Ok(SqlValue::BigInt(v))
1076            }
1077        }
1078        (ResolvedType::Float, Column::Float32(values)) => {
1079            let v = *values
1080                .get(row_idx)
1081                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1082            Ok(SqlValue::Float(v))
1083        }
1084        (ResolvedType::Double, Column::Float64(values)) => {
1085            let v = *values
1086                .get(row_idx)
1087                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1088            Ok(SqlValue::Double(v))
1089        }
1090        (ResolvedType::Boolean, Column::Bool(values)) => {
1091            let v = *values
1092                .get(row_idx)
1093                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1094            Ok(SqlValue::Boolean(v))
1095        }
1096        (ResolvedType::Text, Column::Binary(values)) => {
1097            let raw = values
1098                .get(row_idx)
1099                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1100            String::from_utf8(raw.clone())
1101                .map(SqlValue::Text)
1102                .map_err(|e| ExecutorError::Columnar(e.to_string()))
1103        }
1104        (ResolvedType::Blob, Column::Binary(values)) => {
1105            let raw = values
1106                .get(row_idx)
1107                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1108            Ok(SqlValue::Blob(raw.clone()))
1109        }
1110        (ResolvedType::Vector { .. }, Column::Fixed { values, .. }) => {
1111            let raw = values
1112                .get(row_idx)
1113                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1114            if raw.len() % 4 != 0 {
1115                return Err(ExecutorError::Columnar(
1116                    "invalid vector byte length in columnar segment".into(),
1117                ));
1118            }
1119            let floats: Vec<f32> = raw
1120                .chunks_exact(4)
1121                .map(|bytes| f32::from_le_bytes(bytes.try_into().unwrap()))
1122                .collect();
1123            Ok(SqlValue::Vector(floats))
1124        }
1125        (_, Column::Binary(values)) => {
1126            let raw = values
1127                .get(row_idx)
1128                .ok_or_else(|| ExecutorError::Columnar("row index out of bounds".into()))?;
1129            Ok(SqlValue::Blob(raw.clone()))
1130        }
1131        _ => Err(ExecutorError::Columnar(
1132            "unsupported column type for columnar read".into(),
1133        )),
1134    }
1135}
/// Unit tests for statistics-based pruning, predicate-to-pushdown conversion,
/// and end-to-end columnar scans against an in-memory key-value store.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::expr::Literal;
    use crate::catalog::{ColumnMetadata, RowIdMode, TableMetadata};
    use crate::columnar::statistics::ColumnStatistics;
    use crate::planner::typed_expr::TypedExpr;
    use crate::planner::typed_expr::TypedExprKind;
    use crate::planner::types::ResolvedType;
    use crate::storage::TxnBridge;
    use alopex_core::kv::memory::MemoryKV;
    use bincode::config::Options;
    use std::sync::Arc;

    // In the pruning tests below, `evaluate_pushdown` returning `true` is
    // asserted for row groups that should be skipped and `false` for row groups
    // that must still be read.

    /// An equality against a value outside the [min, max] statistics is pruned.
    #[test]
    fn evaluate_pushdown_eq_prunes_out_of_range() {
        let stats = RowGroupStatistics {
            row_count: 3,
            columns: vec![ColumnStatistics {
                min: SqlValue::Integer(1),
                max: SqlValue::Integer(3),
                null_count: 0,
                total_count: 3,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::Eq {
            column_idx: 0,
            value: SqlValue::Integer(10),
        };
        assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    /// A range that overlaps the [min, max] statistics is not pruned.
    #[test]
    fn evaluate_pushdown_range_allows_overlap() {
        let stats = RowGroupStatistics {
            row_count: 3,
            columns: vec![ColumnStatistics {
                min: SqlValue::Integer(5),
                max: SqlValue::Integer(10),
                null_count: 0,
                total_count: 3,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::Range {
            column_idx: 0,
            min: Some(SqlValue::Integer(8)),
            max: Some(SqlValue::Integer(12)),
        };
        assert!(!ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    /// `IS NULL` is pruned when the statistics record no NULLs.
    #[test]
    fn evaluate_pushdown_is_null_skips_when_no_nulls() {
        let stats = RowGroupStatistics {
            row_count: 2,
            columns: vec![ColumnStatistics {
                min: SqlValue::Integer(1),
                max: SqlValue::Integer(2),
                null_count: 0,
                total_count: 2,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::IsNull {
            column_idx: 0,
            is_null: true,
        };
        assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    /// `IS NOT NULL` is pruned when every value in the row group is NULL.
    #[test]
    fn evaluate_pushdown_is_not_null_skips_when_all_null() {
        let stats = RowGroupStatistics {
            row_count: 2,
            columns: vec![ColumnStatistics {
                min: SqlValue::Null,
                max: SqlValue::Null,
                null_count: 2,
                total_count: 2,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::IsNull {
            column_idx: 0,
            is_null: false,
        };
        assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    /// A conjunction is pruned as soon as one branch is pruned.
    #[test]
    fn evaluate_pushdown_and_prunes_if_any_branch_skips() {
        let stats = RowGroupStatistics {
            row_count: 3,
            columns: vec![ColumnStatistics {
                min: SqlValue::Integer(1),
                max: SqlValue::Integer(3),
                null_count: 0,
                total_count: 3,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::And(vec![
            PushdownFilter::Eq {
                column_idx: 0,
                value: SqlValue::Integer(10),
            },
            PushdownFilter::Eq {
                column_idx: 0,
                value: SqlValue::Integer(2),
            },
        ]);
        assert!(ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    /// A disjunction is kept as long as one branch may still match.
    #[test]
    fn evaluate_pushdown_or_keeps_if_any_branch_may_match() {
        let stats = RowGroupStatistics {
            row_count: 3,
            columns: vec![ColumnStatistics {
                min: SqlValue::Integer(1),
                max: SqlValue::Integer(3),
                null_count: 0,
                total_count: 3,
                distinct_count: None,
            }],
            row_id_min: None,
            row_id_max: None,
        };
        let filter = PushdownFilter::Or(vec![
            PushdownFilter::Eq {
                column_idx: 0,
                value: SqlValue::Integer(10),
            },
            PushdownFilter::Eq {
                column_idx: 0,
                value: SqlValue::Integer(2),
            },
        ]);
        assert!(!ColumnarScan::evaluate_pushdown(&filter, &stats));
    }

    /// `column = literal` is converted into `PushdownFilter::Eq`.
    #[test]
    fn expr_to_pushdown_converts_eq() {
        let expr = TypedExpr {
            kind: TypedExprKind::BinaryOp {
                left: Box::new(TypedExpr::column_ref(
                    "t".into(),
                    "c".into(),
                    0,
                    ResolvedType::Integer,
                    crate::Span::default(),
                )),
                op: BinaryOp::Eq,
                right: Box::new(TypedExpr::literal(
                    Literal::Number("1".into()),
                    ResolvedType::Integer,
                    crate::Span::default(),
                )),
            },
            resolved_type: ResolvedType::Boolean,
            span: crate::Span::default(),
        };
        let filter = expr_to_pushdown(&expr).unwrap();
        assert_eq!(
            filter,
            PushdownFilter::Eq {
                column_idx: 0,
                value: SqlValue::Integer(1)
            }
        );
    }

    /// A scan with both a pushdown filter and a residual filter returns the
    /// matching row with its values decoded.
    #[test]
    fn execute_columnar_scan_applies_residual_filter() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut table = TableMetadata::new(
            "users",
            vec![
                ColumnMetadata::new("id", ResolvedType::Integer),
                ColumnMetadata::new("name", ResolvedType::Text),
            ],
        )
        .with_table_id(1);
        table.storage_options.storage_type = crate::catalog::StorageType::Columnar;

        // Write the columnar segment directly.
        let schema = alopex_core::columnar::segment_v2::Schema {
            columns: vec![
                alopex_core::columnar::segment_v2::ColumnSchema {
                    name: "id".into(),
                    logical_type: alopex_core::columnar::encoding::LogicalType::Int64,
                    nullable: false,
                    fixed_len: None,
                },
                alopex_core::columnar::segment_v2::ColumnSchema {
                    name: "name".into(),
                    logical_type: alopex_core::columnar::encoding::LogicalType::Binary,
                    nullable: false,
                    fixed_len: None,
                },
            ],
        };
        let batch = alopex_core::columnar::segment_v2::RecordBatch::new(
            schema,
            vec![
                alopex_core::columnar::encoding::Column::Int64(vec![1]),
                alopex_core::columnar::encoding::Column::Binary(vec![b"alice".to_vec()]),
            ],
            vec![None, None],
        );
        let mut writer =
            alopex_core::columnar::segment_v2::SegmentWriterV2::new(Default::default());
        writer.write_batch(batch).unwrap();
        let segment = writer.finish().unwrap();

        let stats = vec![crate::columnar::statistics::compute_row_group_statistics(
            &[vec![SqlValue::Integer(1), SqlValue::Text("alice".into())]],
        )];

        // Shared helper writes the segment, its metadata, the row-group
        // statistics and the segment index, then commits.
        persist_segment_for_test(&bridge, table.table_id, &segment, &stats);

        let scan = ColumnarScan::new(
            table.table_id,
            vec![0, 1],
            Some(PushdownFilter::Eq {
                column_idx: 0,
                value: SqlValue::Integer(1),
            }),
            Some(TypedExpr {
                kind: TypedExprKind::BinaryOp {
                    left: Box::new(TypedExpr::column_ref(
                        "users".into(),
                        "id".into(),
                        0,
                        ResolvedType::Integer,
                        crate::Span::default(),
                    )),
                    op: BinaryOp::Eq,
                    right: Box::new(TypedExpr::literal(
                        Literal::Number("1".into()),
                        ResolvedType::Integer,
                        crate::Span::default(),
                    )),
                },
                resolved_type: ResolvedType::Boolean,
                span: crate::Span::default(),
            }),
        );

        let mut read_txn = bridge.begin_read().unwrap();
        let rows = execute_columnar_scan(&mut read_txn, &table, &scan).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].values[1], SqlValue::Text("alice".into()));
    }

    /// With `RowIdMode::Direct` and a `row_id` column in the table, the scanned
    /// row carries that column's value as its row ID.
    #[test]
    fn rowid_mode_direct_prefers_rowid_column() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut table = TableMetadata::new(
            "items",
            vec![
                ColumnMetadata::new("row_id", ResolvedType::BigInt),
                ColumnMetadata::new("val", ResolvedType::Integer),
            ],
        )
        .with_table_id(20);
        table.storage_options.storage_type = crate::catalog::StorageType::Columnar;
        table.storage_options.row_id_mode = RowIdMode::Direct;

        let schema = alopex_core::columnar::segment_v2::Schema {
            columns: vec![
                alopex_core::columnar::segment_v2::ColumnSchema {
                    name: "row_id".into(),
                    logical_type: alopex_core::columnar::encoding::LogicalType::Int64,
                    nullable: false,
                    fixed_len: None,
                },
                alopex_core::columnar::segment_v2::ColumnSchema {
                    name: "val".into(),
                    logical_type: alopex_core::columnar::encoding::LogicalType::Int64,
                    nullable: false,
                    fixed_len: None,
                },
            ],
        };
        let batch = alopex_core::columnar::segment_v2::RecordBatch::new(
            schema,
            vec![
                alopex_core::columnar::encoding::Column::Int64(vec![999]),
                alopex_core::columnar::encoding::Column::Int64(vec![7]),
            ],
            vec![None, None],
        );
        let mut writer =
            alopex_core::columnar::segment_v2::SegmentWriterV2::new(Default::default());
        writer.write_batch(batch).unwrap();
        let segment = writer.finish().unwrap();
        let stats = vec![crate::columnar::statistics::compute_row_group_statistics(
            &[vec![SqlValue::BigInt(999), SqlValue::Integer(7)]],
        )];

        persist_segment_for_test(&bridge, table.table_id, &segment, &stats);

        let scan = ColumnarScan::new(table.table_id, vec![0, 1], None, None);
        let mut read_txn = bridge.begin_read().unwrap();
        let rows = execute_columnar_scan(&mut read_txn, &table, &scan).unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].row_id, 999);
        assert_eq!(rows[0].values[1], SqlValue::Integer(7));
    }

    /// Rows without any stored row-ID source get positional row IDs.
    ///
    /// NOTE(review): despite the name, the table is configured with
    /// `RowIdMode::Direct`, so this exercises the positional fallback of the
    /// direct mode rather than `RowIdMode::None` — confirm which was intended.
    #[test]
    fn rowid_mode_none_uses_position() {
        let bridge = TxnBridge::new(Arc::new(MemoryKV::new()));
        let mut table = TableMetadata::new(
            "items",
            vec![ColumnMetadata::new("val", ResolvedType::Integer)],
        )
        .with_table_id(21);
        table.storage_options.storage_type = crate::catalog::StorageType::Columnar;
        table.storage_options.row_id_mode = RowIdMode::Direct;

        let schema = alopex_core::columnar::segment_v2::Schema {
            columns: vec![alopex_core::columnar::segment_v2::ColumnSchema {
                name: "val".into(),
                logical_type: alopex_core::columnar::encoding::LogicalType::Int64,
                nullable: false,
                fixed_len: None,
            }],
        };
        let batch = alopex_core::columnar::segment_v2::RecordBatch::new(
            schema,
            vec![alopex_core::columnar::encoding::Column::Int64(vec![3, 4])],
            vec![None],
        );
        let mut writer =
            alopex_core::columnar::segment_v2::SegmentWriterV2::new(Default::default());
        writer.write_batch(batch).unwrap();
        let segment = writer.finish().unwrap();
        let stats = vec![crate::columnar::statistics::compute_row_group_statistics(
            &[vec![SqlValue::Integer(3)], vec![SqlValue::Integer(4)]],
        )];

        persist_segment_for_test(&bridge, table.table_id, &segment, &stats);

        let scan = ColumnarScan::new(table.table_id, vec![0], None, None);
        let mut read_txn = bridge.begin_read().unwrap();
        let rows = execute_columnar_scan(&mut read_txn, &table, &scan).unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0].row_id, 0);
        assert_eq!(rows[1].row_id, 1);
    }

    /// Persists `segment` as segment 0 of `table_id`: the encoded segment, its
    /// metadata, the row-group statistics and a segment index listing only
    /// segment 0, all written in one committed write transaction.
    fn persist_segment_for_test(
        bridge: &TxnBridge<MemoryKV>,
        table_id: u32,
        segment: &alopex_core::columnar::segment_v2::ColumnSegmentV2,
        row_group_stats: &[crate::columnar::statistics::RowGroupStatistics],
    ) {
        let mut txn = bridge.begin_write().unwrap();
        let segment_bytes = alopex_core::storage::format::bincode_config()
            .serialize(segment)
            .unwrap();
        let meta_bytes = alopex_core::storage::format::bincode_config()
            .serialize(&segment.meta)
            .unwrap();
        let stats_bytes = alopex_core::storage::format::bincode_config()
            .serialize(row_group_stats)
            .unwrap();
        txn.inner_mut()
            .put(
                alopex_core::columnar::kvs_bridge::key_layout::column_segment_key(table_id, 0, 0),
                segment_bytes,
            )
            .unwrap();
        txn.inner_mut()
            .put(
                alopex_core::columnar::kvs_bridge::key_layout::statistics_key(table_id, 0),
                meta_bytes,
            )
            .unwrap();
        txn.inner_mut()
            .put(
                alopex_core::columnar::kvs_bridge::key_layout::row_group_stats_key(table_id, 0),
                stats_bytes,
            )
            .unwrap();
        let index_bytes = alopex_core::storage::format::bincode_config()
            .serialize(&vec![0u64])
            .unwrap();
        txn.inner_mut()
            .put(
                alopex_core::columnar::kvs_bridge::key_layout::segment_index_key(table_id),
                index_bytes,
            )
            .unwrap();
        txn.commit().unwrap();
    }
}