vibesql_executor/pipeline/mod.rs
1//! Unified Execution Pipeline
2//!
3//! This module provides a trait-based abstraction for query execution, enabling
4//! multiple execution strategies (row-oriented, columnar, native columnar) to
5//! share common logic while preserving specialized hot paths.
6//!
7//! ## Architecture
8//!
9//! The `ExecutionPipeline` trait defines the core operations for query execution:
10//! - Filter (WHERE clause)
11//! - Projection (SELECT columns)
12//! - Aggregation (GROUP BY, aggregate functions)
13//! - Limit/Offset
14//!
15//! Each execution strategy implements this trait with its own optimized logic:
16//! - `RowOrientedPipeline`: Traditional row-by-row execution
17//! - `ColumnarPipeline`: SIMD-accelerated batch execution (with row-to-batch conversion)
18//! - `NativeColumnarPipeline`: Zero-copy SIMD execution directly from columnar storage
19//!
20//! ## Benefits
21//!
22//! 1. **Reduced Duplication**: Common logic (evaluator setup, limit/offset) is shared
23//! 2. **Extensibility**: New strategies (GPU, distributed) can implement the trait
24//! 3. **Maintainability**: Single interface for all execution paths
25//! 4. **Performance**: Trait methods can be specialized with `#[inline]` for hot paths
26//!
27//! ## Usage
28//!
29//! ```text
//! use vibesql_executor::pipeline::{ExecutionContext, ExecutionPipeline, RowOrientedPipeline};
31//!
32//! let ctx = ExecutionContext::new(schema, database, ...);
33//! let pipeline = RowOrientedPipeline::new(&ctx);
34//!
35//! let filtered = pipeline.apply_filter(input, predicate, &ctx)?;
36//! let projected = pipeline.apply_projection(filtered, select_items, &ctx)?;
37//! let result = pipeline.apply_limit_offset(projected, limit, offset)?;
38//! ```
39
40mod columnar;
41mod context;
42mod native_columnar;
43mod row_oriented;
44mod types;
45
46pub use columnar::ColumnarPipeline;
47pub use context::ExecutionContext;
48pub use native_columnar::NativeColumnarPipeline;
49pub use row_oriented::RowOrientedPipeline;
50pub use types::{PipelineInput, PipelineOutput};
51use vibesql_ast::{Expression, SelectItem};
52
53use crate::{errors::ExecutorError, evaluator::CombinedExpressionEvaluator};
54
55/// Unified execution pipeline trait for all query execution strategies.
56///
57/// Each method represents a stage in query execution. Implementations can
58/// override methods to provide optimized logic for their execution model.
59///
60/// # Type Parameters
61///
62/// The trait is designed to work with different input/output representations:
63/// - Row-oriented: Works with `Vec<Row>`
64/// - Columnar: Works with `ColumnarBatch`
65/// - Native columnar: Works directly with table storage
66pub trait ExecutionPipeline {
67 /// Create an evaluator with appropriate context for this pipeline.
68 ///
69 /// This factory method allows each pipeline implementation to create
70 /// evaluators configured for their specific execution model (CTE context,
71 /// outer row for correlation, procedural context, etc.).
72 ///
73 /// # Arguments
74 /// * `ctx` - Execution context containing schema, database, and optional contexts
75 ///
76 /// # Returns
77 /// A `CombinedExpressionEvaluator` configured for this pipeline's execution model
78 ///
79 /// # Default Implementation
80 ///
81 /// The default implementation delegates to `ctx.create_evaluator()`, which handles
82 /// all the context variants (CTE, outer row, procedural, windows).
83 #[inline]
84 fn create_evaluator<'a>(
85 &self,
86 ctx: &'a ExecutionContext<'a>,
87 ) -> CombinedExpressionEvaluator<'a> {
88 ctx.create_evaluator()
89 }
90
91 /// Apply WHERE clause filtering to the input data.
92 ///
93 /// # Arguments
94 /// * `input` - The input data (rows or batches)
95 /// * `predicate` - Optional WHERE clause expression
96 /// * `ctx` - Execution context with schema, database, and evaluator
97 ///
98 /// # Returns
99 /// Filtered output data
100 fn apply_filter(
101 &self,
102 input: PipelineInput<'_>,
103 predicate: Option<&Expression>,
104 ctx: &ExecutionContext<'_>,
105 ) -> Result<PipelineOutput, ExecutorError>;
106
107 /// Apply SELECT projection to transform columns.
108 ///
109 /// # Arguments
110 /// * `input` - The filtered data
111 /// * `select_items` - The SELECT list items
112 /// * `ctx` - Execution context
113 ///
114 /// # Returns
115 /// Projected output data
116 fn apply_projection(
117 &self,
118 input: PipelineInput<'_>,
119 select_items: &[SelectItem],
120 ctx: &ExecutionContext<'_>,
121 ) -> Result<PipelineOutput, ExecutorError>;
122
123 /// Execute aggregation with optional GROUP BY.
124 ///
125 /// # Arguments
126 /// * `input` - The filtered/projected data
127 /// * `select_items` - SELECT list (may contain aggregate functions)
128 /// * `group_by` - Optional GROUP BY expressions
129 /// * `having` - Optional HAVING clause
130 /// * `ctx` - Execution context
131 ///
132 /// # Returns
133 /// Aggregated output data
134 fn apply_aggregation(
135 &self,
136 input: PipelineInput<'_>,
137 select_items: &[SelectItem],
138 group_by: Option<&[Expression]>,
139 having: Option<&Expression>,
140 ctx: &ExecutionContext<'_>,
141 ) -> Result<PipelineOutput, ExecutorError>;
142
143 /// Apply LIMIT and OFFSET to the results.
144 ///
145 /// This has a default implementation that works for all strategies.
146 ///
147 /// # Arguments
148 /// * `input` - The aggregated/projected data
149 /// * `limit` - Optional maximum number of rows
150 /// * `offset` - Optional number of rows to skip
151 ///
152 /// # Returns
153 /// Final result rows
154 #[inline]
155 fn apply_limit_offset(
156 &self,
157 input: PipelineOutput,
158 limit: Option<u64>,
159 offset: Option<u64>,
160 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
161 let mut rows = input.into_rows();
162
163 // Apply offset
164 if let Some(off) = offset {
165 let off = off as usize;
166 if off >= rows.len() {
167 return Ok(Vec::new());
168 }
169 rows = rows.into_iter().skip(off).collect();
170 }
171
172 // Apply limit
173 if let Some(lim) = limit {
174 rows.truncate(lim as usize);
175 }
176
177 Ok(rows)
178 }
179
180 /// Check if the pipeline supports a specific query pattern.
181 ///
182 /// This allows strategies to indicate they cannot handle certain queries,
183 /// enabling fallback to a more general strategy.
184 ///
185 /// # Arguments
186 /// * `has_aggregation` - Whether the query has aggregate functions
187 /// * `has_group_by` - Whether the query has GROUP BY
188 /// * `has_joins` - Whether the query has JOINs
189 ///
190 /// # Returns
191 /// `true` if this pipeline can handle the query, `false` for fallback
192 fn supports_query_pattern(
193 &self,
194 has_aggregation: bool,
195 has_group_by: bool,
196 has_joins: bool,
197 ) -> bool;
198
199 /// Get the name of this pipeline for debugging/logging.
200 fn name(&self) -> &'static str;
201}