vibesql_executor/pipeline/
mod.rs

1//! Unified Execution Pipeline
2//!
3//! This module provides a trait-based abstraction for query execution, enabling
4//! multiple execution strategies (row-oriented, columnar, native columnar) to
5//! share common logic while preserving specialized hot paths.
6//!
7//! ## Architecture
8//!
9//! The `ExecutionPipeline` trait defines the core operations for query execution:
10//! - Filter (WHERE clause)
11//! - Projection (SELECT columns)
12//! - Aggregation (GROUP BY, aggregate functions)
13//! - Limit/Offset
14//!
15//! Each execution strategy implements this trait with its own optimized logic:
16//! - `RowOrientedPipeline`: Traditional row-by-row execution
17//! - `ColumnarPipeline`: SIMD-accelerated batch execution (with row-to-batch conversion)
18//! - `NativeColumnarPipeline`: Zero-copy SIMD execution directly from columnar storage
19//!
20//! ## Benefits
21//!
22//! 1. **Reduced Duplication**: Common logic (evaluator setup, limit/offset) is shared
23//! 2. **Extensibility**: New strategies (GPU, distributed) can implement the trait
24//! 3. **Maintainability**: Single interface for all execution paths
25//! 4. **Performance**: Trait methods can be specialized with `#[inline]` for hot paths
26//!
27//! ## Usage
28//!
29//! ```text
30//! use vibesql_executor::pipeline::{ExecutionPipeline, ExecutionContext};
31//!
32//! let ctx = ExecutionContext::new(schema, database, ...);
33//! let pipeline = RowOrientedPipeline::new(&ctx);
34//!
35//! let filtered = pipeline.apply_filter(input, predicate, &ctx)?;
36//! let projected = pipeline.apply_projection(filtered, select_items, &ctx)?;
37//! let result = pipeline.apply_limit_offset(projected, limit, offset)?;
38//! ```
39
40mod columnar;
41mod context;
42mod native_columnar;
43mod row_oriented;
44mod types;
45
46pub use columnar::ColumnarPipeline;
47pub use context::ExecutionContext;
48pub use native_columnar::NativeColumnarPipeline;
49pub use row_oriented::RowOrientedPipeline;
50pub use types::{PipelineInput, PipelineOutput};
51
52use crate::errors::ExecutorError;
53use crate::evaluator::CombinedExpressionEvaluator;
54use vibesql_ast::{Expression, SelectItem};
55
56/// Unified execution pipeline trait for all query execution strategies.
57///
58/// Each method represents a stage in query execution. Implementations can
59/// override methods to provide optimized logic for their execution model.
60///
61/// # Type Parameters
62///
63/// The trait is designed to work with different input/output representations:
64/// - Row-oriented: Works with `Vec<Row>`
65/// - Columnar: Works with `ColumnarBatch`
66/// - Native columnar: Works directly with table storage
67pub trait ExecutionPipeline {
68    /// Create an evaluator with appropriate context for this pipeline.
69    ///
70    /// This factory method allows each pipeline implementation to create
71    /// evaluators configured for their specific execution model (CTE context,
72    /// outer row for correlation, procedural context, etc.).
73    ///
74    /// # Arguments
75    /// * `ctx` - Execution context containing schema, database, and optional contexts
76    ///
77    /// # Returns
78    /// A `CombinedExpressionEvaluator` configured for this pipeline's execution model
79    ///
80    /// # Default Implementation
81    ///
82    /// The default implementation delegates to `ctx.create_evaluator()`, which handles
83    /// all the context variants (CTE, outer row, procedural, windows).
84    #[inline]
85    fn create_evaluator<'a>(
86        &self,
87        ctx: &'a ExecutionContext<'a>,
88    ) -> CombinedExpressionEvaluator<'a> {
89        ctx.create_evaluator()
90    }
91
92    /// Apply WHERE clause filtering to the input data.
93    ///
94    /// # Arguments
95    /// * `input` - The input data (rows or batches)
96    /// * `predicate` - Optional WHERE clause expression
97    /// * `ctx` - Execution context with schema, database, and evaluator
98    ///
99    /// # Returns
100    /// Filtered output data
101    fn apply_filter(
102        &self,
103        input: PipelineInput<'_>,
104        predicate: Option<&Expression>,
105        ctx: &ExecutionContext<'_>,
106    ) -> Result<PipelineOutput, ExecutorError>;
107
108    /// Apply SELECT projection to transform columns.
109    ///
110    /// # Arguments
111    /// * `input` - The filtered data
112    /// * `select_items` - The SELECT list items
113    /// * `ctx` - Execution context
114    ///
115    /// # Returns
116    /// Projected output data
117    fn apply_projection(
118        &self,
119        input: PipelineInput<'_>,
120        select_items: &[SelectItem],
121        ctx: &ExecutionContext<'_>,
122    ) -> Result<PipelineOutput, ExecutorError>;
123
124    /// Execute aggregation with optional GROUP BY.
125    ///
126    /// # Arguments
127    /// * `input` - The filtered/projected data
128    /// * `select_items` - SELECT list (may contain aggregate functions)
129    /// * `group_by` - Optional GROUP BY expressions
130    /// * `having` - Optional HAVING clause
131    /// * `ctx` - Execution context
132    ///
133    /// # Returns
134    /// Aggregated output data
135    fn apply_aggregation(
136        &self,
137        input: PipelineInput<'_>,
138        select_items: &[SelectItem],
139        group_by: Option<&[Expression]>,
140        having: Option<&Expression>,
141        ctx: &ExecutionContext<'_>,
142    ) -> Result<PipelineOutput, ExecutorError>;
143
144    /// Apply LIMIT and OFFSET to the results.
145    ///
146    /// This has a default implementation that works for all strategies.
147    ///
148    /// # Arguments
149    /// * `input` - The aggregated/projected data
150    /// * `limit` - Optional maximum number of rows
151    /// * `offset` - Optional number of rows to skip
152    ///
153    /// # Returns
154    /// Final result rows
155    #[inline]
156    fn apply_limit_offset(
157        &self,
158        input: PipelineOutput,
159        limit: Option<u64>,
160        offset: Option<u64>,
161    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
162        let mut rows = input.into_rows();
163
164        // Apply offset
165        if let Some(off) = offset {
166            let off = off as usize;
167            if off >= rows.len() {
168                return Ok(Vec::new());
169            }
170            rows = rows.into_iter().skip(off).collect();
171        }
172
173        // Apply limit
174        if let Some(lim) = limit {
175            rows.truncate(lim as usize);
176        }
177
178        Ok(rows)
179    }
180
181    /// Check if the pipeline supports a specific query pattern.
182    ///
183    /// This allows strategies to indicate they cannot handle certain queries,
184    /// enabling fallback to a more general strategy.
185    ///
186    /// # Arguments
187    /// * `has_aggregation` - Whether the query has aggregate functions
188    /// * `has_group_by` - Whether the query has GROUP BY
189    /// * `has_joins` - Whether the query has JOINs
190    ///
191    /// # Returns
192    /// `true` if this pipeline can handle the query, `false` for fallback
193    fn supports_query_pattern(
194        &self,
195        has_aggregation: bool,
196        has_group_by: bool,
197        has_joins: bool,
198    ) -> bool;
199
200    /// Get the name of this pipeline for debugging/logging.
201    fn name(&self) -> &'static str;
202}