vibesql_executor/pipeline/native_columnar.rs

//! Native Columnar Execution Pipeline
//!
//! Implements the `ExecutionPipeline` trait using zero-copy columnar execution
//! directly from storage. This pipeline keeps data in columnar format throughout
//! execution to minimize materialization overhead.

use vibesql_ast::{Expression, SelectItem};
use vibesql_storage::Row;

use super::{ExecutionContext, ExecutionPipeline, PipelineInput, PipelineOutput};
use crate::{
    errors::ExecutorError,
    select::columnar::{
        extract_aggregates, extract_column_predicates, simd_filter_batch, AggregateOp,
        AggregateSource, AggregateSpec, ColumnarBatch,
    },
};

/// Native columnar execution pipeline.
///
/// This pipeline operates on data in columnar format throughout execution,
/// only converting to rows at the final output stage. This provides maximum
/// performance for large-scale analytical queries.
///
/// # Performance
///
/// Uses LLVM auto-vectorization for SIMD-accelerated operations.
///
/// - **Filter**: SIMD-accelerated predicate evaluation (4-8x speedup)
/// - **Projection**: Zero-copy column selection
/// - **Aggregation**: SIMD reduction operations (10x speedup)
/// - **GROUP BY**: Hash-based grouping with columnar aggregation
///
/// # When to Use
///
/// - Large datasets (>100k rows)
/// - TPC-H style analytical queries
/// - Simple GROUP BY with column references
/// - Single-table scans without complex JOINs
///
/// # Limitations
///
/// - No JOIN support (single table only)
/// - GROUP BY limited to simple column references
/// - No ROLLUP/CUBE/GROUPING SETS support
/// - Requires columnar storage format
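///
/// # Example
///
/// A minimal usage sketch (illustrative only; it mirrors the behavior exercised
/// by the tests at the bottom of this file):
///
/// ```ignore
/// let pipeline = NativeColumnarPipeline::new();
/// // Aggregation without JOINs is supported ...
/// assert!(pipeline.supports_query_pattern(true, true, false));
/// // ... but any JOIN forces a different pipeline.
/// assert!(!pipeline.supports_query_pattern(true, false, true));
/// ```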
pub struct NativeColumnarPipeline {
    /// Whether the pipeline has access to columnar storage
    #[allow(dead_code)]
    has_columnar_storage: bool,
}

impl NativeColumnarPipeline {
    /// Create a new native columnar pipeline.
    #[inline]
    pub fn new() -> Self {
        Self { has_columnar_storage: true }
    }

    /// Create a native columnar pipeline with explicit columnar storage availability.
    #[inline]
    #[allow(dead_code)]
    pub fn with_storage(has_columnar_storage: bool) -> Self {
        Self { has_columnar_storage }
    }
}

impl Default for NativeColumnarPipeline {
    fn default() -> Self {
        Self::new()
    }
}

impl ExecutionPipeline for NativeColumnarPipeline {
    /// Create an evaluator with context for native columnar execution.
    #[inline]
    fn create_evaluator<'a>(
        &self,
        ctx: &'a ExecutionContext<'a>,
    ) -> crate::evaluator::CombinedExpressionEvaluator<'a> {
        ctx.create_evaluator()
    }

    /// Apply WHERE clause filtering using SIMD-accelerated columnar operations.
    ///
    /// This is the most optimized path:
    /// 1. Extracts simple predicates from the WHERE clause
    /// 2. Applies SIMD filtering directly on column arrays
    /// 3. Returns the filtered columnar batch directly (rows are only materialized at output)
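    ///
    /// For example, a conjunction of simple comparisons such as `a > 10 AND b < 5`
    /// can typically be split into per-column predicates and filtered via SIMD,
    /// while a predicate that `extract_column_predicates` cannot decompose falls
    /// back to row-oriented evaluation. (Illustrative; the exact supported set is
    /// determined by `extract_column_predicates`.)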
    fn apply_filter(
        &self,
        input: PipelineInput<'_>,
        predicate: Option<&Expression>,
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        // Handle native columnar input specially
        if let PipelineInput::NativeColumnar { table_name, column_indices: _ } = &input {
            // Get columnar data directly from storage
            let columnar_table = match ctx.database.get_columnar(table_name) {
                Ok(Some(ct)) => ct,
                Ok(None) | Err(_) => {
                    // Fall back to row-based if columnar not available
                    return self.fallback_filter(input.into_rows(), predicate, ctx);
                }
            };

            // Skip empty tables
            if columnar_table.row_count() == 0 {
                return Ok(PipelineOutput::Empty);
            }

            // Convert to batch
            let batch = ColumnarBatch::from_storage_columnar(&columnar_table)?;

            // No predicate = return all rows
            if predicate.is_none() {
                let rows = batch.to_rows()?;
                return Ok(PipelineOutput::from_rows(rows));
            }

            let predicate = predicate.unwrap();

            // Extract simple predicates for SIMD filtering
            let predicates = match extract_column_predicates(predicate, ctx.schema) {
                Some(preds) => preds,
                None => {
                    // Complex predicate - fall back to row filtering
                    let rows = batch.to_rows()?;
                    return self.fallback_filter(rows, Some(predicate), ctx);
                }
            };

            // Apply SIMD-accelerated filtering
            let filtered_batch =
                if predicates.is_empty() { batch } else { simd_filter_batch(&batch, &predicates)? };

            // Return batch directly - avoid row conversion overhead
            // The batch will stay in columnar format through the pipeline
            return Ok(PipelineOutput::from_batch(filtered_batch));
        }

        // For row-based input, convert to columnar and filter
        let rows = input.into_rows();

        if predicate.is_none() {
            return Ok(PipelineOutput::from_rows(rows));
        }

        let predicate = predicate.unwrap();

        // Try to extract simple predicates
        let predicates = match extract_column_predicates(predicate, ctx.schema) {
            Some(preds) => preds,
            None => {
                return self.fallback_filter(rows, Some(predicate), ctx);
            }
        };

        if rows.is_empty() {
            return Ok(PipelineOutput::Empty);
        }

        // Convert to columnar batch
        let batch = ColumnarBatch::from_rows(&rows)?;

        // Apply SIMD filtering
        let filtered_batch =
            if predicates.is_empty() { batch } else { simd_filter_batch(&batch, &predicates)? };

        // Return batch directly - keep data in columnar format
        Ok(PipelineOutput::from_batch(filtered_batch))
    }

    /// Apply SELECT projection using columnar operations.
    ///
    /// In native columnar execution, projection is typically implicit: only the
    /// required columns are computed. Explicit projection currently falls back to
    /// row-oriented processing (see the TODO below).
    fn apply_projection(
        &self,
        input: PipelineInput<'_>,
        select_items: &[SelectItem],
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        // For batch input, convert to rows and project row-by-row.
        // This is less efficient but ensures correctness for complex projections.
        // TODO: Implement native columnar projection for simple column selections
        let rows = match input {
            PipelineInput::Batch(batch) => {
                if batch.row_count() == 0 {
                    return Ok(PipelineOutput::Empty);
                }
                batch.to_rows()?
            }
            PipelineInput::Rows(rows) => {
                if rows.is_empty() {
                    return Ok(PipelineOutput::Empty);
                }
                rows.to_vec()
            }
            PipelineInput::RowsOwned(rows) => {
                if rows.is_empty() {
                    return Ok(PipelineOutput::Empty);
                }
                rows
            }
            PipelineInput::Empty => return Ok(PipelineOutput::Empty),
            PipelineInput::NativeColumnar { .. } => {
                // Should not reach here - native columnar goes through filter first
                return Ok(PipelineOutput::Empty);
            }
        };

        // Use row-oriented projection for now
        // Native columnar projection would require computing column indices
        // and extracting only those columns from the batch
        let evaluator = ctx.create_evaluator();
        let buffer_pool = vibesql_storage::QueryBufferPool::new();

        let mut projected_rows = Vec::with_capacity(rows.len());
        for row in rows {
            let projected = crate::select::projection::project_row_combined(
                &row,
                select_items,
                &evaluator,
                ctx.schema,
                &None,
                &buffer_pool,
            )?;
            projected_rows.push(projected);
        }

        Ok(PipelineOutput::from_rows(projected_rows))
    }

    /// Execute aggregation using SIMD-accelerated columnar operations.
    ///
    /// This is the most optimized aggregation path:
    /// 1. Extracts aggregate specifications from the SELECT list
    /// 2. For simple aggregates without GROUP BY: uses SIMD reductions
    /// 3. For GROUP BY: uses hash-based grouping with columnar aggregation
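    ///
    /// For example (illustrative of the shapes handled here, not an exhaustive list):
    /// `SELECT SUM(x), COUNT(*) FROM t` takes the SIMD-reduction path, while
    /// `SELECT col, SUM(x) FROM t GROUP BY col` takes the hash-based GROUP BY path.
    /// Queries with HAVING, aggregates that `extract_aggregates` cannot handle, or a
    /// GROUP BY whose keys are not mirrored in the SELECT list are rejected with
    /// `UnsupportedFeature` so the caller can fall back to row-oriented execution.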
    fn apply_aggregation(
        &self,
        input: PipelineInput<'_>,
        select_items: &[SelectItem],
        group_by: Option<&[Expression]>,
        having: Option<&Expression>,
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        // HAVING not supported in native columnar pipeline - fall back to row-oriented
        if having.is_some() {
            return Err(ExecutorError::UnsupportedFeature(
                "HAVING not supported in native columnar pipeline".to_string(),
            ));
        }

        // Extract aggregate expressions from select items
        let agg_exprs: Vec<Expression> = select_items
            .iter()
            .filter_map(|item| {
                if let SelectItem::Expression { expr, .. } = item {
                    Some(expr.clone())
                } else {
                    None
                }
            })
            .collect();

        // Try to extract aggregate specs
        let agg_specs = match extract_aggregates(&agg_exprs, ctx.schema) {
            Some(specs) => specs,
            None => {
                return Err(ExecutorError::UnsupportedFeature(
                    "Complex aggregates not supported in native columnar pipeline".to_string(),
                ));
            }
        };

        // Check if there's a GROUP BY with non-empty expressions
        let has_group_by = group_by.is_some_and(|exprs| !exprs.is_empty());
        let group_by_count = group_by.map_or(0, |exprs| exprs.len());

        // Count non-aggregate expressions in SELECT list (should match GROUP BY columns)
        let select_non_agg_count = select_items
            .iter()
            .filter(|item| {
                matches!(item, SelectItem::Expression { expr, .. }
                    if !matches!(expr, Expression::AggregateFunction { .. }))
            })
            .count();

        // Issue #4233: columnar_group_by_batch returns [group_keys..., aggregates...] format.
        // If the SELECT list doesn't include all GROUP BY columns (e.g., SELECT AVG(col) FROM t
        // GROUP BY col), the result format won't match the SELECT list. Fall back to
        // row-oriented execution.
        if has_group_by && select_non_agg_count != group_by_count {
            return Err(ExecutorError::UnsupportedFeature(format!(
                "GROUP BY with SELECT list that doesn't include all group keys not supported in native columnar (SELECT has {} non-aggs, GROUP BY has {} keys)",
                select_non_agg_count, group_by_count
            )));
        }

        // Issue #4510: columnar_group_by_batch returns [group_keys..., aggregates...] format.
        // If the SELECT list has aggregates before GROUP BY columns (e.g., SELECT count(*), col
        // FROM t GROUP BY col), the result order won't match. Fall back to row-oriented.
        // The SELECT list must have all non-aggregates first, then all aggregates.
        if has_group_by {
            let mut first_agg_pos = None;
            let mut last_non_agg_pos = None;
            for (i, item) in select_items.iter().enumerate() {
                if let SelectItem::Expression { expr, .. } = item {
                    if matches!(expr, Expression::AggregateFunction { .. }) {
                        if first_agg_pos.is_none() {
                            first_agg_pos = Some(i);
                        }
                    } else {
                        last_non_agg_pos = Some(i);
                    }
                }
            }
            // If first aggregate comes before last non-aggregate, order doesn't match columnar
            if let (Some(first_agg), Some(last_non_agg)) = (first_agg_pos, last_non_agg_pos) {
                if first_agg < last_non_agg {
                    log::debug!(
                        "Native columnar fallback: aggregate at {} before non-aggregate at {}",
                        first_agg,
                        last_non_agg
                    );
                    return Err(ExecutorError::UnsupportedFeature(
                        "SELECT list with aggregates before GROUP BY columns not supported in native columnar".to_string(),
                    ));
                }
            }
        }

        // Helper to return empty result for GROUP BY with empty input
        // SQL semantics: GROUP BY on empty input returns 0 rows (no groups)
        // Without GROUP BY: aggregates on empty input return 1 row (COUNT=0, others=NULL)
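        // e.g. `SELECT COUNT(*), SUM(x) FROM t` over an empty `t` yields one row: (0, NULL)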
        let return_empty_result = |has_gb: bool, specs: &[AggregateSpec]| -> PipelineOutput {
            if has_gb {
                // GROUP BY with empty input: return 0 rows
                PipelineOutput::from_rows(vec![])
            } else {
                // No GROUP BY: return 1 row with COUNT=0, others=NULL
                let values: Vec<vibesql_types::SqlValue> = specs
                    .iter()
                    .map(|spec| match spec.op {
                        AggregateOp::Count => vibesql_types::SqlValue::Integer(0),
                        _ => vibesql_types::SqlValue::Null,
                    })
                    .collect();
                PipelineOutput::from_rows(vec![Row::new(values)])
            }
        };

        // Get batch directly if input is already columnar, otherwise convert
        // This is the key optimization: avoid row conversion if data is already in batch format
        let batch = match input {
            PipelineInput::Batch(batch) => batch,
            PipelineInput::Rows(rows) => {
                if rows.is_empty() {
                    return Ok(return_empty_result(has_group_by, &agg_specs));
                }
                ColumnarBatch::from_rows(rows)?
            }
            PipelineInput::RowsOwned(rows) => {
                if rows.is_empty() {
                    return Ok(return_empty_result(has_group_by, &agg_specs));
                }
                ColumnarBatch::from_rows(&rows)?
            }
            PipelineInput::Empty => {
                return Ok(return_empty_result(has_group_by, &agg_specs));
            }
            PipelineInput::NativeColumnar { table_name, .. } => {
                // Get columnar data directly from storage
                let columnar_table = match ctx.database.get_columnar(&table_name) {
                    Ok(Some(ct)) => ct,
                    Ok(None) | Err(_) => {
                        return Err(ExecutorError::Other(format!(
                            "Table '{}' not found for columnar aggregation",
                            table_name
                        )));
                    }
                };
                ColumnarBatch::from_storage_columnar(&columnar_table)?
            }
        };

        // Handle empty batch
        if batch.row_count() == 0 {
            return Ok(return_empty_result(has_group_by, &agg_specs));
        }

        // Check if we have GROUP BY
        if let Some(group_exprs) = group_by {
            if !group_exprs.is_empty() {
                return self.execute_group_by(&batch, group_exprs, &agg_specs, ctx);
            }
        }

        // Simple aggregation without GROUP BY
        let needs_schema =
            agg_specs.iter().any(|spec| matches!(spec.source, AggregateSource::Expression(_)));
        let schema_ref = if needs_schema { Some(ctx.schema) } else { None };

        let results =
            crate::select::columnar::compute_aggregates_from_batch(&batch, &agg_specs, schema_ref)?;

        Ok(PipelineOutput::from_rows(vec![Row::new(results)]))
    }

    /// Native columnar supports simple aggregates with optional GROUP BY.
    fn supports_query_pattern(
        &self,
        has_aggregation: bool,
        _has_group_by: bool,
        has_joins: bool,
    ) -> bool {
        // Native columnar supports:
        // - Aggregates with or without GROUP BY (single table only)
        // - No JOINs (single table requirement)
        // GROUP BY is supported for simple column references
        has_aggregation && !has_joins
    }

    #[inline]
    fn name(&self) -> &'static str {
        "NativeColumnarPipeline (SIMD)"
    }
}

impl NativeColumnarPipeline {
    /// Fallback to row-oriented filtering for complex predicates.
    fn fallback_filter(
        &self,
        rows: Vec<Row>,
        predicate: Option<&Expression>,
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        if predicate.is_none() {
            return Ok(PipelineOutput::from_rows(rows));
        }

        let predicate = predicate.unwrap();
        let evaluator = ctx.create_evaluator();

        let mut filtered = Vec::with_capacity(rows.len());
        for row in rows {
            let value = evaluator.eval(predicate, &row)?;
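            // SQL-style truthiness: FALSE and NULL exclude the row; for integers,
            // zero is falsy and any non-zero value is truthy.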
            let include = match value {
                vibesql_types::SqlValue::Boolean(true) => true,
                vibesql_types::SqlValue::Boolean(false) | vibesql_types::SqlValue::Null => false,
                vibesql_types::SqlValue::Integer(0) => false,
                vibesql_types::SqlValue::Integer(_) => true,
                _ => false,
            };
            if include {
                filtered.push(row);
            }
        }

        Ok(PipelineOutput::from_rows(filtered))
    }

    /// Execute GROUP BY using hash-based columnar aggregation.
    fn execute_group_by(
        &self,
        batch: &ColumnarBatch,
        group_exprs: &[Expression],
        agg_specs: &[AggregateSpec],
        ctx: &ExecutionContext<'_>,
    ) -> Result<PipelineOutput, ExecutorError> {
        // Extract group column indices (only simple column refs supported)
        let group_cols: Vec<usize> = group_exprs
            .iter()
            .filter_map(|expr| match expr {
                Expression::ColumnRef(col_id) => {
                    ctx.schema.get_column_index(col_id.table_canonical(), col_id.column_canonical())
                }
                _ => None,
            })
            .collect();

        if group_cols.len() != group_exprs.len() {
            return Err(ExecutorError::UnsupportedFeature(
                "GROUP BY with non-column expressions not supported in native columnar".to_string(),
            ));
        }

        // Convert aggregates to (column_idx, op) format
        let agg_cols: Vec<(usize, AggregateOp)> = agg_specs
            .iter()
            .filter_map(|spec| match &spec.source {
                AggregateSource::Column(idx) => Some((*idx, spec.op)),
                AggregateSource::CountStar => Some((0, AggregateOp::Count)),
                AggregateSource::Expression(_) => None,
            })
            .collect();

        if agg_cols.len() != agg_specs.len() {
            return Err(ExecutorError::UnsupportedFeature(
                "Expression aggregates not supported in native columnar GROUP BY".to_string(),
            ));
        }

        // Execute SIMD-accelerated hash-based GROUP BY directly on batch
        let result =
            crate::select::columnar::columnar_group_by_batch(batch, &group_cols, &agg_cols)?;

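        // Result rows are laid out as [group key columns..., aggregate columns...],
        // matching the SELECT-list ordering enforced in `apply_aggregation` above.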
        Ok(PipelineOutput::from_rows(result))
    }
}

#[cfg(test)]
mod tests {
    use vibesql_catalog::TableSchema;
    use vibesql_types::SqlValue;

    use super::*;
    use crate::schema::CombinedSchema;

    fn create_test_setup() -> (vibesql_storage::Database, CombinedSchema) {
        let database = vibesql_storage::Database::new();
        let table_schema = TableSchema::new("test".to_string(), vec![]);
        let schema = CombinedSchema::from_table("test".to_string(), table_schema);
        (database, schema)
    }

    fn make_test_row(values: Vec<i64>) -> Row {
        Row::new(values.into_iter().map(SqlValue::Integer).collect::<Vec<_>>())
    }

    #[test]
    fn test_native_columnar_pipeline_name() {
        let pipeline = NativeColumnarPipeline::new();
        let name = pipeline.name();
        assert!(name.starts_with("NativeColumnarPipeline"));
    }

    #[test]
    fn test_native_columnar_pipeline_supports_aggregates() {
        let pipeline = NativeColumnarPipeline::new();

        // Supports aggregates without joins (SIMD always enabled via auto-vectorization)
        let supports_simple = pipeline.supports_query_pattern(true, false, false);
        assert!(supports_simple);

        // Does not support joins
        assert!(!pipeline.supports_query_pattern(true, false, true));
    }

    #[test]
    fn test_native_columnar_pipeline_filter_no_predicate() {
        let (database, schema) = create_test_setup();
        let pipeline = NativeColumnarPipeline::new();

        let rows = vec![make_test_row(vec![1, 2]), make_test_row(vec![3, 4])];
        let input = PipelineInput::from_rows_owned(rows);
        let ctx = ExecutionContext::new(&schema, &database);

        let result = pipeline.apply_filter(input, None, &ctx).unwrap();
        assert_eq!(result.row_count(), 2);
    }

    #[test]
    fn test_native_columnar_pipeline_default() {
        let pipeline = NativeColumnarPipeline::default();
        assert!(pipeline.name().starts_with("NativeColumnarPipeline"));
    }

    #[test]
    fn test_native_columnar_pipeline_limit_offset() {
        let pipeline = NativeColumnarPipeline::new();

        let rows = vec![
            make_test_row(vec![1]),
            make_test_row(vec![2]),
            make_test_row(vec![3]),
            make_test_row(vec![4]),
            make_test_row(vec![5]),
        ];
        let output = PipelineOutput::from_rows(rows);

        // Test limit
        let result = pipeline.apply_limit_offset(output, Some(3), None).unwrap();
        assert_eq!(result.len(), 3);
    }
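
    #[test]
    fn test_native_columnar_pipeline_with_storage_flag() {
        // Sketch: `with_storage` only records the availability flag; the pipeline's
        // observable behavior (e.g. its name) is otherwise the same as `new()`.
        let pipeline = NativeColumnarPipeline::with_storage(false);
        assert!(pipeline.name().starts_with("NativeColumnarPipeline"));
    }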
}