vibesql_executor/pipeline/
mod.rs

1//! Unified Execution Pipeline
2//!
3//! This module provides a trait-based abstraction for query execution, enabling
4//! multiple execution strategies (row-oriented, columnar, native columnar) to
5//! share common logic while preserving specialized hot paths.
6//!
7//! ## Architecture
8//!
9//! The `ExecutionPipeline` trait defines the core operations for query execution:
10//! - Filter (WHERE clause)
11//! - Projection (SELECT columns)
12//! - Aggregation (GROUP BY, aggregate functions)
13//! - Limit/Offset
14//!
15//! Each execution strategy implements this trait with its own optimized logic:
16//! - `RowOrientedPipeline`: Traditional row-by-row execution
17//! - `ColumnarPipeline`: SIMD-accelerated batch execution (with row-to-batch conversion)
18//! - `NativeColumnarPipeline`: Zero-copy SIMD execution directly from columnar storage
19//!
20//! ## Benefits
21//!
22//! 1. **Reduced Duplication**: Common logic (evaluator setup, limit/offset) is shared
23//! 2. **Extensibility**: New strategies (GPU, distributed) can implement the trait
24//! 3. **Maintainability**: Single interface for all execution paths
25//! 4. **Performance**: Trait methods can be specialized with `#[inline]` for hot paths
26//!
27//! ## Usage
28//!
29//! ```text
30//! use vibesql_executor::pipeline::{ExecutionPipeline, ExecutionContext};
31//!
32//! let ctx = ExecutionContext::new(schema, database, ...);
33//! let pipeline = RowOrientedPipeline::new(&ctx);
34//!
35//! let filtered = pipeline.apply_filter(input, predicate, &ctx)?;
36//! let projected = pipeline.apply_projection(filtered, select_items, &ctx)?;
37//! let result = pipeline.apply_limit_offset(projected, limit, offset)?;
38//! ```
39
40mod columnar;
41mod context;
42mod native_columnar;
43mod row_oriented;
44mod types;
45
46pub use columnar::ColumnarPipeline;
47pub use context::ExecutionContext;
48pub use native_columnar::NativeColumnarPipeline;
49pub use row_oriented::RowOrientedPipeline;
50pub use types::{PipelineInput, PipelineOutput};
51use vibesql_ast::{Expression, SelectItem};
52
53use crate::{errors::ExecutorError, evaluator::CombinedExpressionEvaluator};
54
55/// Unified execution pipeline trait for all query execution strategies.
56///
57/// Each method represents a stage in query execution. Implementations can
58/// override methods to provide optimized logic for their execution model.
59///
60/// # Type Parameters
61///
62/// The trait is designed to work with different input/output representations:
63/// - Row-oriented: Works with `Vec<Row>`
64/// - Columnar: Works with `ColumnarBatch`
65/// - Native columnar: Works directly with table storage
66pub trait ExecutionPipeline {
67    /// Create an evaluator with appropriate context for this pipeline.
68    ///
69    /// This factory method allows each pipeline implementation to create
70    /// evaluators configured for their specific execution model (CTE context,
71    /// outer row for correlation, procedural context, etc.).
72    ///
73    /// # Arguments
74    /// * `ctx` - Execution context containing schema, database, and optional contexts
75    ///
76    /// # Returns
77    /// A `CombinedExpressionEvaluator` configured for this pipeline's execution model
78    ///
79    /// # Default Implementation
80    ///
81    /// The default implementation delegates to `ctx.create_evaluator()`, which handles
82    /// all the context variants (CTE, outer row, procedural, windows).
83    #[inline]
84    fn create_evaluator<'a>(
85        &self,
86        ctx: &'a ExecutionContext<'a>,
87    ) -> CombinedExpressionEvaluator<'a> {
88        ctx.create_evaluator()
89    }
90
91    /// Apply WHERE clause filtering to the input data.
92    ///
93    /// # Arguments
94    /// * `input` - The input data (rows or batches)
95    /// * `predicate` - Optional WHERE clause expression
96    /// * `ctx` - Execution context with schema, database, and evaluator
97    ///
98    /// # Returns
99    /// Filtered output data
100    fn apply_filter(
101        &self,
102        input: PipelineInput<'_>,
103        predicate: Option<&Expression>,
104        ctx: &ExecutionContext<'_>,
105    ) -> Result<PipelineOutput, ExecutorError>;
106
107    /// Apply SELECT projection to transform columns.
108    ///
109    /// # Arguments
110    /// * `input` - The filtered data
111    /// * `select_items` - The SELECT list items
112    /// * `ctx` - Execution context
113    ///
114    /// # Returns
115    /// Projected output data
116    fn apply_projection(
117        &self,
118        input: PipelineInput<'_>,
119        select_items: &[SelectItem],
120        ctx: &ExecutionContext<'_>,
121    ) -> Result<PipelineOutput, ExecutorError>;
122
123    /// Execute aggregation with optional GROUP BY.
124    ///
125    /// # Arguments
126    /// * `input` - The filtered/projected data
127    /// * `select_items` - SELECT list (may contain aggregate functions)
128    /// * `group_by` - Optional GROUP BY expressions
129    /// * `having` - Optional HAVING clause
130    /// * `ctx` - Execution context
131    ///
132    /// # Returns
133    /// Aggregated output data
134    fn apply_aggregation(
135        &self,
136        input: PipelineInput<'_>,
137        select_items: &[SelectItem],
138        group_by: Option<&[Expression]>,
139        having: Option<&Expression>,
140        ctx: &ExecutionContext<'_>,
141    ) -> Result<PipelineOutput, ExecutorError>;
142
143    /// Apply LIMIT and OFFSET to the results.
144    ///
145    /// This has a default implementation that works for all strategies.
146    ///
147    /// # Arguments
148    /// * `input` - The aggregated/projected data
149    /// * `limit` - Optional maximum number of rows
150    /// * `offset` - Optional number of rows to skip
151    ///
152    /// # Returns
153    /// Final result rows
154    #[inline]
155    fn apply_limit_offset(
156        &self,
157        input: PipelineOutput,
158        limit: Option<u64>,
159        offset: Option<u64>,
160    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
161        let mut rows = input.into_rows();
162
163        // Apply offset
164        if let Some(off) = offset {
165            let off = off as usize;
166            if off >= rows.len() {
167                return Ok(Vec::new());
168            }
169            rows = rows.into_iter().skip(off).collect();
170        }
171
172        // Apply limit
173        if let Some(lim) = limit {
174            rows.truncate(lim as usize);
175        }
176
177        Ok(rows)
178    }
179
180    /// Check if the pipeline supports a specific query pattern.
181    ///
182    /// This allows strategies to indicate they cannot handle certain queries,
183    /// enabling fallback to a more general strategy.
184    ///
185    /// # Arguments
186    /// * `has_aggregation` - Whether the query has aggregate functions
187    /// * `has_group_by` - Whether the query has GROUP BY
188    /// * `has_joins` - Whether the query has JOINs
189    ///
190    /// # Returns
191    /// `true` if this pipeline can handle the query, `false` for fallback
192    fn supports_query_pattern(
193        &self,
194        has_aggregation: bool,
195        has_group_by: bool,
196        has_joins: bool,
197    ) -> bool;
198
199    /// Get the name of this pipeline for debugging/logging.
200    fn name(&self) -> &'static str;
201}