vibesql_executor/pipeline/mod.rs
1//! Unified Execution Pipeline
2//!
3//! This module provides a trait-based abstraction for query execution, enabling
4//! multiple execution strategies (row-oriented, columnar, native columnar) to
5//! share common logic while preserving specialized hot paths.
6//!
7//! ## Architecture
8//!
9//! The `ExecutionPipeline` trait defines the core operations for query execution:
10//! - Filter (WHERE clause)
11//! - Projection (SELECT columns)
12//! - Aggregation (GROUP BY, aggregate functions)
13//! - Limit/Offset
14//!
15//! Each execution strategy implements this trait with its own optimized logic:
16//! - `RowOrientedPipeline`: Traditional row-by-row execution
17//! - `ColumnarPipeline`: SIMD-accelerated batch execution (with row-to-batch conversion)
18//! - `NativeColumnarPipeline`: Zero-copy SIMD execution directly from columnar storage
19//!
20//! ## Benefits
21//!
22//! 1. **Reduced Duplication**: Common logic (evaluator setup, limit/offset) is shared
23//! 2. **Extensibility**: New strategies (GPU, distributed) can implement the trait
24//! 3. **Maintainability**: Single interface for all execution paths
25//! 4. **Performance**: Trait methods can be specialized with `#[inline]` for hot paths
26//!
27//! ## Usage
28//!
29//! ```text
//! use vibesql_executor::pipeline::{ExecutionContext, ExecutionPipeline, RowOrientedPipeline};
31//!
32//! let ctx = ExecutionContext::new(schema, database, ...);
33//! let pipeline = RowOrientedPipeline::new(&ctx);
34//!
35//! let filtered = pipeline.apply_filter(input, predicate, &ctx)?;
36//! let projected = pipeline.apply_projection(filtered, select_items, &ctx)?;
37//! let result = pipeline.apply_limit_offset(projected, limit, offset)?;
38//! ```
39
40mod columnar;
41mod context;
42mod native_columnar;
43mod row_oriented;
44mod types;
45
46pub use columnar::ColumnarPipeline;
47pub use context::ExecutionContext;
48pub use native_columnar::NativeColumnarPipeline;
49pub use row_oriented::RowOrientedPipeline;
50pub use types::{PipelineInput, PipelineOutput};
51
52use crate::errors::ExecutorError;
53use crate::evaluator::CombinedExpressionEvaluator;
54use vibesql_ast::{Expression, SelectItem};
55
56/// Unified execution pipeline trait for all query execution strategies.
57///
58/// Each method represents a stage in query execution. Implementations can
59/// override methods to provide optimized logic for their execution model.
60///
61/// # Type Parameters
62///
63/// The trait is designed to work with different input/output representations:
64/// - Row-oriented: Works with `Vec<Row>`
65/// - Columnar: Works with `ColumnarBatch`
66/// - Native columnar: Works directly with table storage
67pub trait ExecutionPipeline {
68 /// Create an evaluator with appropriate context for this pipeline.
69 ///
70 /// This factory method allows each pipeline implementation to create
71 /// evaluators configured for their specific execution model (CTE context,
72 /// outer row for correlation, procedural context, etc.).
73 ///
74 /// # Arguments
75 /// * `ctx` - Execution context containing schema, database, and optional contexts
76 ///
77 /// # Returns
78 /// A `CombinedExpressionEvaluator` configured for this pipeline's execution model
79 ///
80 /// # Default Implementation
81 ///
82 /// The default implementation delegates to `ctx.create_evaluator()`, which handles
83 /// all the context variants (CTE, outer row, procedural, windows).
84 #[inline]
85 fn create_evaluator<'a>(
86 &self,
87 ctx: &'a ExecutionContext<'a>,
88 ) -> CombinedExpressionEvaluator<'a> {
89 ctx.create_evaluator()
90 }
91
92 /// Apply WHERE clause filtering to the input data.
93 ///
94 /// # Arguments
95 /// * `input` - The input data (rows or batches)
96 /// * `predicate` - Optional WHERE clause expression
97 /// * `ctx` - Execution context with schema, database, and evaluator
98 ///
99 /// # Returns
100 /// Filtered output data
101 fn apply_filter(
102 &self,
103 input: PipelineInput<'_>,
104 predicate: Option<&Expression>,
105 ctx: &ExecutionContext<'_>,
106 ) -> Result<PipelineOutput, ExecutorError>;
107
108 /// Apply SELECT projection to transform columns.
109 ///
110 /// # Arguments
111 /// * `input` - The filtered data
112 /// * `select_items` - The SELECT list items
113 /// * `ctx` - Execution context
114 ///
115 /// # Returns
116 /// Projected output data
117 fn apply_projection(
118 &self,
119 input: PipelineInput<'_>,
120 select_items: &[SelectItem],
121 ctx: &ExecutionContext<'_>,
122 ) -> Result<PipelineOutput, ExecutorError>;
123
124 /// Execute aggregation with optional GROUP BY.
125 ///
126 /// # Arguments
127 /// * `input` - The filtered/projected data
128 /// * `select_items` - SELECT list (may contain aggregate functions)
129 /// * `group_by` - Optional GROUP BY expressions
130 /// * `having` - Optional HAVING clause
131 /// * `ctx` - Execution context
132 ///
133 /// # Returns
134 /// Aggregated output data
135 fn apply_aggregation(
136 &self,
137 input: PipelineInput<'_>,
138 select_items: &[SelectItem],
139 group_by: Option<&[Expression]>,
140 having: Option<&Expression>,
141 ctx: &ExecutionContext<'_>,
142 ) -> Result<PipelineOutput, ExecutorError>;
143
144 /// Apply LIMIT and OFFSET to the results.
145 ///
146 /// This has a default implementation that works for all strategies.
147 ///
148 /// # Arguments
149 /// * `input` - The aggregated/projected data
150 /// * `limit` - Optional maximum number of rows
151 /// * `offset` - Optional number of rows to skip
152 ///
153 /// # Returns
154 /// Final result rows
155 #[inline]
156 fn apply_limit_offset(
157 &self,
158 input: PipelineOutput,
159 limit: Option<u64>,
160 offset: Option<u64>,
161 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
162 let mut rows = input.into_rows();
163
164 // Apply offset
165 if let Some(off) = offset {
166 let off = off as usize;
167 if off >= rows.len() {
168 return Ok(Vec::new());
169 }
170 rows = rows.into_iter().skip(off).collect();
171 }
172
173 // Apply limit
174 if let Some(lim) = limit {
175 rows.truncate(lim as usize);
176 }
177
178 Ok(rows)
179 }
180
181 /// Check if the pipeline supports a specific query pattern.
182 ///
183 /// This allows strategies to indicate they cannot handle certain queries,
184 /// enabling fallback to a more general strategy.
185 ///
186 /// # Arguments
187 /// * `has_aggregation` - Whether the query has aggregate functions
188 /// * `has_group_by` - Whether the query has GROUP BY
189 /// * `has_joins` - Whether the query has JOINs
190 ///
191 /// # Returns
192 /// `true` if this pipeline can handle the query, `false` for fallback
193 fn supports_query_pattern(
194 &self,
195 has_aggregation: bool,
196 has_group_by: bool,
197 has_joins: bool,
198 ) -> bool;
199
200 /// Get the name of this pipeline for debugging/logging.
201 fn name(&self) -> &'static str;
202}