vibesql_executor/pipeline/
native_columnar.rs

1//! Native Columnar Execution Pipeline
2//!
3//! Implements the `ExecutionPipeline` trait using zero-copy columnar execution
4//! directly from storage. This pipeline keeps data in columnar format throughout
5//! the execution to minimize materialization overhead.
6
7use vibesql_ast::{Expression, SelectItem};
8use vibesql_storage::Row;
9
10use crate::errors::ExecutorError;
11
12use super::{ExecutionContext, ExecutionPipeline, PipelineInput, PipelineOutput};
13
14use crate::select::columnar::{
15    extract_aggregates, extract_column_predicates, simd_filter_batch, AggregateOp, AggregateSource,
16    AggregateSpec, ColumnarBatch,
17};
18
19/// Native columnar execution pipeline.
20///
21/// This pipeline operates on data in columnar format throughout the execution,
22/// only converting to rows at the final output stage. This provides maximum
23/// performance for large-scale analytical queries.
24///
25/// # Performance
26///
27/// Uses LLVM auto-vectorization for SIMD-accelerated operations.
28///
29/// # Performance Characteristics
30///
31/// - **Filter**: SIMD-accelerated predicate evaluation (4-8x speedup)
32/// - **Projection**: Zero-copy column selection
33/// - **Aggregation**: SIMD reduction operations (10x speedup)
34/// - **GROUP BY**: Hash-based grouping with columnar aggregation
35///
36/// # When to Use
37///
38/// - Large datasets (>100k rows)
39/// - TPC-H style analytical queries
40/// - Simple GROUP BY with column references
41/// - Single table scans without complex JOINs
42///
43/// # Limitations
44///
45/// - No JOIN support (single table only)
46/// - GROUP BY limited to simple column references
47/// - No ROLLUP/CUBE/GROUPING SETS support
48/// - Requires columnar storage format
49pub struct NativeColumnarPipeline {
50    /// Whether the pipeline has access to columnar storage
51    #[allow(dead_code)]
52    has_columnar_storage: bool,
53}
54
55impl NativeColumnarPipeline {
56    /// Create a new native columnar pipeline.
57    #[inline]
58    pub fn new() -> Self {
59        Self { has_columnar_storage: true }
60    }
61
62    /// Create a native columnar pipeline with explicit columnar storage availability.
63    #[inline]
64    #[allow(dead_code)]
65    pub fn with_storage(has_columnar_storage: bool) -> Self {
66        Self { has_columnar_storage }
67    }
68}
69
70impl Default for NativeColumnarPipeline {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
76impl ExecutionPipeline for NativeColumnarPipeline {
77    /// Create an evaluator with context for native columnar execution.
78    #[inline]
79    fn create_evaluator<'a>(
80        &self,
81        ctx: &'a ExecutionContext<'a>,
82    ) -> crate::evaluator::CombinedExpressionEvaluator<'a> {
83        ctx.create_evaluator()
84    }
85
86    /// Apply WHERE clause filtering using SIMD-accelerated columnar operations.
87    ///
88    /// This is the most optimized path:
89    /// 1. Extracts simple predicates from WHERE clause
90    /// 2. Applies SIMD filtering directly on column arrays
91    /// 3. Returns filtered columnar batch (converted to rows for output)
92    fn apply_filter(
93        &self,
94        input: PipelineInput<'_>,
95        predicate: Option<&Expression>,
96        ctx: &ExecutionContext<'_>,
97    ) -> Result<PipelineOutput, ExecutorError> {
98        // Handle native columnar input specially
99        if let PipelineInput::NativeColumnar { table_name, column_indices: _ } = &input {
100            // Get columnar data directly from storage
101            let columnar_table = match ctx.database.get_columnar(table_name) {
102                Ok(Some(ct)) => ct,
103                Ok(None) | Err(_) => {
104                    // Fall back to row-based if columnar not available
105                    return self.fallback_filter(input.into_rows(), predicate, ctx);
106                }
107            };
108
109            // Skip empty tables
110            if columnar_table.row_count() == 0 {
111                return Ok(PipelineOutput::Empty);
112            }
113
114            // Convert to batch
115            let batch = ColumnarBatch::from_storage_columnar(&columnar_table)?;
116
117            // No predicate = return all rows
118            if predicate.is_none() {
119                let rows = batch.to_rows()?;
120                return Ok(PipelineOutput::from_rows(rows));
121            }
122
123            let predicate = predicate.unwrap();
124
125            // Extract simple predicates for SIMD filtering
126            let predicates = match extract_column_predicates(predicate, ctx.schema) {
127                Some(preds) => preds,
128                None => {
129                    // Complex predicate - fall back to row filtering
130                    let rows = batch.to_rows()?;
131                    return self.fallback_filter(rows, Some(predicate), ctx);
132                }
133            };
134
135            // Apply SIMD-accelerated filtering
136            let filtered_batch =
137                if predicates.is_empty() { batch } else { simd_filter_batch(&batch, &predicates)? };
138
139            // Return batch directly - avoid row conversion overhead
140            // The batch will stay in columnar format through the pipeline
141            return Ok(PipelineOutput::from_batch(filtered_batch));
142        }
143
144        // For row-based input, convert to columnar and filter
145        let rows = input.into_rows();
146
147        if predicate.is_none() {
148            return Ok(PipelineOutput::from_rows(rows));
149        }
150
151        let predicate = predicate.unwrap();
152
153        // Try to extract simple predicates
154        let predicates = match extract_column_predicates(predicate, ctx.schema) {
155            Some(preds) => preds,
156            None => {
157                return self.fallback_filter(rows, Some(predicate), ctx);
158            }
159        };
160
161        if rows.is_empty() {
162            return Ok(PipelineOutput::Empty);
163        }
164
165        // Convert to columnar batch
166        let batch = ColumnarBatch::from_rows(&rows)?;
167
168        // Apply SIMD filtering
169        let filtered_batch =
170            if predicates.is_empty() { batch } else { simd_filter_batch(&batch, &predicates)? };
171
172        // Return batch directly - keep data in columnar format
173        Ok(PipelineOutput::from_batch(filtered_batch))
174    }
175
176    /// Apply SELECT projection using columnar operations.
177    ///
178    /// In native columnar execution, projection is typically implicit -
179    /// we only compute the required columns. For explicit projection,
180    /// we fall back to row-oriented processing only when necessary.
181    fn apply_projection(
182        &self,
183        input: PipelineInput<'_>,
184        select_items: &[SelectItem],
185        ctx: &ExecutionContext<'_>,
186    ) -> Result<PipelineOutput, ExecutorError> {
187        // For batch input, convert to rows for projection then convert back
188        // This is less efficient but ensures correctness for complex projections
189        // TODO: Implement native columnar projection for simple column selections
190        let rows = match input {
191            PipelineInput::Batch(batch) => {
192                if batch.row_count() == 0 {
193                    return Ok(PipelineOutput::Empty);
194                }
195                batch.to_rows()?
196            }
197            PipelineInput::Rows(rows) => {
198                if rows.is_empty() {
199                    return Ok(PipelineOutput::Empty);
200                }
201                rows.to_vec()
202            }
203            PipelineInput::RowsOwned(rows) => {
204                if rows.is_empty() {
205                    return Ok(PipelineOutput::Empty);
206                }
207                rows
208            }
209            PipelineInput::Empty => return Ok(PipelineOutput::Empty),
210            PipelineInput::NativeColumnar { .. } => {
211                // Should not reach here - native columnar goes through filter first
212                return Ok(PipelineOutput::Empty);
213            }
214        };
215
216        // Use row-oriented projection for now
217        // Native columnar projection would require computing column indices
218        // and extracting only those columns from the batch
219        let evaluator = ctx.create_evaluator();
220        let buffer_pool = vibesql_storage::QueryBufferPool::new();
221
222        let mut projected_rows = Vec::with_capacity(rows.len());
223        for row in rows {
224            let projected = crate::select::projection::project_row_combined(
225                &row,
226                select_items,
227                &evaluator,
228                ctx.schema,
229                &None,
230                &buffer_pool,
231            )?;
232            projected_rows.push(projected);
233        }
234
235        Ok(PipelineOutput::from_rows(projected_rows))
236    }
237
238    /// Execute aggregation using SIMD-accelerated columnar operations.
239    ///
240    /// This is the most optimized aggregation path:
241    /// 1. Extracts aggregate specifications from SELECT list
242    /// 2. For simple aggregates without GROUP BY: Uses SIMD reductions
243    /// 3. For GROUP BY: Uses hash-based grouping with columnar aggregation
244    fn apply_aggregation(
245        &self,
246        input: PipelineInput<'_>,
247        select_items: &[SelectItem],
248        group_by: Option<&[Expression]>,
249        _having: Option<&Expression>,
250        ctx: &ExecutionContext<'_>,
251    ) -> Result<PipelineOutput, ExecutorError> {
252        // Extract aggregate expressions from select items
253        let agg_exprs: Vec<Expression> = select_items
254            .iter()
255            .filter_map(|item| {
256                if let SelectItem::Expression { expr, .. } = item {
257                    Some(expr.clone())
258                } else {
259                    None
260                }
261            })
262            .collect();
263
264        // Try to extract aggregate specs
265        let agg_specs = match extract_aggregates(&agg_exprs, ctx.schema) {
266            Some(specs) => specs,
267            None => {
268                return Err(ExecutorError::UnsupportedFeature(
269                    "Complex aggregates not supported in native columnar pipeline".to_string(),
270                ));
271            }
272        };
273
274        // Get batch directly if input is already columnar, otherwise convert
275        // This is the key optimization: avoid row conversion if data is already in batch format
276        let batch = match input {
277            PipelineInput::Batch(batch) => batch,
278            PipelineInput::Rows(rows) => {
279                if rows.is_empty() {
280                    // Handle empty input per SQL standard
281                    let values: Vec<vibesql_types::SqlValue> = agg_specs
282                        .iter()
283                        .map(|spec| match spec.op {
284                            AggregateOp::Count => vibesql_types::SqlValue::Integer(0),
285                            _ => vibesql_types::SqlValue::Null,
286                        })
287                        .collect();
288                    return Ok(PipelineOutput::from_rows(vec![Row::new(values)]));
289                }
290                ColumnarBatch::from_rows(rows)?
291            }
292            PipelineInput::RowsOwned(rows) => {
293                if rows.is_empty() {
294                    // Handle empty input per SQL standard
295                    let values: Vec<vibesql_types::SqlValue> = agg_specs
296                        .iter()
297                        .map(|spec| match spec.op {
298                            AggregateOp::Count => vibesql_types::SqlValue::Integer(0),
299                            _ => vibesql_types::SqlValue::Null,
300                        })
301                        .collect();
302                    return Ok(PipelineOutput::from_rows(vec![Row::new(values)]));
303                }
304                ColumnarBatch::from_rows(&rows)?
305            }
306            PipelineInput::Empty => {
307                // Handle empty input per SQL standard
308                let values: Vec<vibesql_types::SqlValue> = agg_specs
309                    .iter()
310                    .map(|spec| match spec.op {
311                        AggregateOp::Count => vibesql_types::SqlValue::Integer(0),
312                        _ => vibesql_types::SqlValue::Null,
313                    })
314                    .collect();
315                return Ok(PipelineOutput::from_rows(vec![Row::new(values)]));
316            }
317            PipelineInput::NativeColumnar { table_name, .. } => {
318                // Get columnar data directly from storage
319                let columnar_table = match ctx.database.get_columnar(&table_name) {
320                    Ok(Some(ct)) => ct,
321                    Ok(None) | Err(_) => {
322                        return Err(ExecutorError::Other(format!(
323                            "Table '{}' not found for columnar aggregation",
324                            table_name
325                        )));
326                    }
327                };
328                ColumnarBatch::from_storage_columnar(&columnar_table)?
329            }
330        };
331
332        // Handle empty batch
333        if batch.row_count() == 0 {
334            let values: Vec<vibesql_types::SqlValue> = agg_specs
335                .iter()
336                .map(|spec| match spec.op {
337                    AggregateOp::Count => vibesql_types::SqlValue::Integer(0),
338                    _ => vibesql_types::SqlValue::Null,
339                })
340                .collect();
341            return Ok(PipelineOutput::from_rows(vec![Row::new(values)]));
342        }
343
344        // Check if we have GROUP BY
345        if let Some(group_exprs) = group_by {
346            if !group_exprs.is_empty() {
347                return self.execute_group_by(&batch, group_exprs, &agg_specs, ctx);
348            }
349        }
350
351        // Simple aggregation without GROUP BY
352        let needs_schema =
353            agg_specs.iter().any(|spec| matches!(spec.source, AggregateSource::Expression(_)));
354        let schema_ref = if needs_schema { Some(ctx.schema) } else { None };
355
356        let results =
357            crate::select::columnar::compute_aggregates_from_batch(&batch, &agg_specs, schema_ref)?;
358
359        Ok(PipelineOutput::from_rows(vec![Row::new(results)]))
360    }
361
362    /// Native columnar supports simple aggregates with optional GROUP BY.
363    fn supports_query_pattern(
364        &self,
365        has_aggregation: bool,
366        _has_group_by: bool,
367        has_joins: bool,
368    ) -> bool {
369        // Native columnar supports:
370        // - Aggregates with or without GROUP BY (single table only)
371        // - No JOINs (single table requirement)
372        // GROUP BY is supported for simple column references
373        has_aggregation && !has_joins
374    }
375
376    #[inline]
377    fn name(&self) -> &'static str {
378        "NativeColumnarPipeline (SIMD)"
379    }
380}
381
382impl NativeColumnarPipeline {
383    /// Fallback to row-oriented filtering for complex predicates.
384    fn fallback_filter(
385        &self,
386        rows: Vec<Row>,
387        predicate: Option<&Expression>,
388        ctx: &ExecutionContext<'_>,
389    ) -> Result<PipelineOutput, ExecutorError> {
390        if predicate.is_none() {
391            return Ok(PipelineOutput::from_rows(rows));
392        }
393
394        let predicate = predicate.unwrap();
395        let evaluator = ctx.create_evaluator();
396
397        let mut filtered = Vec::with_capacity(rows.len());
398        for row in rows {
399            let value = evaluator.eval(predicate, &row)?;
400            let include = match value {
401                vibesql_types::SqlValue::Boolean(true) => true,
402                vibesql_types::SqlValue::Boolean(false) | vibesql_types::SqlValue::Null => false,
403                vibesql_types::SqlValue::Integer(0) => false,
404                vibesql_types::SqlValue::Integer(_) => true,
405                _ => false,
406            };
407            if include {
408                filtered.push(row);
409            }
410        }
411
412        Ok(PipelineOutput::from_rows(filtered))
413    }
414
415    /// Execute GROUP BY using hash-based columnar aggregation.
416    fn execute_group_by(
417        &self,
418        batch: &ColumnarBatch,
419        group_exprs: &[Expression],
420        agg_specs: &[AggregateSpec],
421        ctx: &ExecutionContext<'_>,
422    ) -> Result<PipelineOutput, ExecutorError> {
423        // Extract group column indices (only simple column refs supported)
424        let group_cols: Vec<usize> = group_exprs
425            .iter()
426            .filter_map(|expr| match expr {
427                Expression::ColumnRef { table, column } => {
428                    ctx.schema.get_column_index(table.as_deref(), column.as_str())
429                }
430                _ => None,
431            })
432            .collect();
433
434        if group_cols.len() != group_exprs.len() {
435            return Err(ExecutorError::UnsupportedFeature(
436                "GROUP BY with non-column expressions not supported in native columnar".to_string(),
437            ));
438        }
439
440        // Convert aggregates to (column_idx, op) format
441        let agg_cols: Vec<(usize, AggregateOp)> = agg_specs
442            .iter()
443            .filter_map(|spec| match &spec.source {
444                AggregateSource::Column(idx) => Some((*idx, spec.op)),
445                AggregateSource::CountStar => Some((0, AggregateOp::Count)),
446                AggregateSource::Expression(_) => None,
447            })
448            .collect();
449
450        if agg_cols.len() != agg_specs.len() {
451            return Err(ExecutorError::UnsupportedFeature(
452                "Expression aggregates not supported in native columnar GROUP BY".to_string(),
453            ));
454        }
455
456        // Execute SIMD-accelerated hash-based GROUP BY directly on batch
457        let result =
458            crate::select::columnar::columnar_group_by_batch(batch, &group_cols, &agg_cols)?;
459
460        Ok(PipelineOutput::from_rows(result))
461    }
462}
463
464#[cfg(test)]
465mod tests {
466    use super::*;
467    use crate::schema::CombinedSchema;
468    use vibesql_catalog::TableSchema;
469    use vibesql_types::SqlValue;
470
471    fn create_test_setup() -> (vibesql_storage::Database, CombinedSchema) {
472        let database = vibesql_storage::Database::new();
473        let table_schema = TableSchema::new("test".to_string(), vec![]);
474        let schema = CombinedSchema::from_table("test".to_string(), table_schema);
475        (database, schema)
476    }
477
478    fn make_test_row(values: Vec<i64>) -> Row {
479        Row::new(values.into_iter().map(SqlValue::Integer).collect())
480    }
481
482    #[test]
483    fn test_native_columnar_pipeline_name() {
484        let pipeline = NativeColumnarPipeline::new();
485        let name = pipeline.name();
486        assert!(name.starts_with("NativeColumnarPipeline"));
487    }
488
489    #[test]
490    fn test_native_columnar_pipeline_supports_aggregates() {
491        let pipeline = NativeColumnarPipeline::new();
492
493        // Supports aggregates without joins (SIMD always enabled via auto-vectorization)
494        let supports_simple = pipeline.supports_query_pattern(true, false, false);
495        assert!(supports_simple);
496
497        // Does not support joins
498        assert!(!pipeline.supports_query_pattern(true, false, true));
499    }
500
501    #[test]
502    fn test_native_columnar_pipeline_filter_no_predicate() {
503        let (database, schema) = create_test_setup();
504        let pipeline = NativeColumnarPipeline::new();
505
506        let rows = vec![make_test_row(vec![1, 2]), make_test_row(vec![3, 4])];
507        let input = PipelineInput::from_rows_owned(rows);
508        let ctx = ExecutionContext::new(&schema, &database);
509
510        let result = pipeline.apply_filter(input, None, &ctx).unwrap();
511        assert_eq!(result.row_count(), 2);
512    }
513
514    #[test]
515    fn test_native_columnar_pipeline_default() {
516        let pipeline = NativeColumnarPipeline::default();
517        assert!(pipeline.name().starts_with("NativeColumnarPipeline"));
518    }
519
520    #[test]
521    fn test_native_columnar_pipeline_limit_offset() {
522        let pipeline = NativeColumnarPipeline::new();
523
524        let rows = vec![
525            make_test_row(vec![1]),
526            make_test_row(vec![2]),
527            make_test_row(vec![3]),
528            make_test_row(vec![4]),
529            make_test_row(vec![5]),
530        ];
531        let output = PipelineOutput::from_rows(rows);
532
533        // Test limit
534        let result = pipeline.apply_limit_offset(output, Some(3), None).unwrap();
535        assert_eq!(result.len(), 3);
536    }
537}