vibesql_executor/select/executor/execute.rs
1//! Main execution methods for SelectExecutor
2//!
3//! This module implements the unified execution dispatcher that routes queries
4//! to the appropriate execution pipeline based on the selected strategy.
5//!
6//! ## Execution Pipeline Architecture
7//!
8//! The dispatcher uses the `ExecutionPipeline` trait to provide a unified interface
9//! for query execution across different strategies:
10//!
11//! - **NativeColumnar**: Zero-copy SIMD execution from columnar storage
12//! - **StandardColumnar**: SIMD execution with row-to-batch conversion
13//! - **RowOriented**: Traditional row-by-row execution
14//! - **ExpressionOnly**: SELECT without FROM clause (special case)
15//!
16//! ```text
17//! Strategy Selection → Create Pipeline → Execute Pipeline Stages → Results
18//! ↓
19//! apply_filter → apply_projection → apply_aggregation → apply_limit_offset
20//! ```
21
22use std::collections::HashMap;
23
24use super::builder::SelectExecutor;
25use crate::{
26 errors::ExecutorError,
27 optimizer::adaptive::{choose_execution_strategy, ExecutionStrategy, StrategyContext},
28 pipeline::{
29 ColumnarPipeline, ExecutionContext, ExecutionPipeline, NativeColumnarPipeline,
30 PipelineInput,
31 },
32 select::{
33 cte::{execute_ctes, execute_ctes_with_memory_check, CteResult},
34 helpers::{apply_limit_offset, estimate_result_size},
35 join::FromResult,
36 set_operations::apply_set_operation,
37 SelectResult,
38 },
39};
40
41impl SelectExecutor<'_> {
42 /// Execute a SELECT statement
43 pub fn execute(
44 &self,
45 stmt: &vibesql_ast::SelectStmt,
46 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
47 #[cfg(feature = "profile-q6")]
48 let execute_start = std::time::Instant::now();
49
50 // Reset arena for fresh query execution (only at top level)
51 if self.subquery_depth == 0 {
52 self.reset_arena();
53 }
54
55 // Check timeout before starting execution
56 self.check_timeout()?;
57
58 // Check subquery depth limit to prevent stack overflow
59 if self.subquery_depth >= crate::limits::MAX_EXPRESSION_DEPTH {
60 return Err(ExecutorError::ExpressionDepthExceeded {
61 depth: self.subquery_depth,
62 max_depth: crate::limits::MAX_EXPRESSION_DEPTH,
63 });
64 }
65
66 // Fast path for simple point-lookup queries (TPC-C optimization)
67 // This bypasses expensive optimizer passes for queries like:
68 // SELECT col FROM table WHERE pk = value
69 if self.subquery_depth == 0
70 && self.outer_row.is_none()
71 && self.cte_context.is_none()
72 && super::fast_path::is_simple_point_query(stmt)
73 {
74 return self.execute_fast_path(stmt);
75 }
76
77 // Streaming aggregate fast path (#3815)
78 // For queries like: SELECT SUM(k) FROM sbtest1 WHERE id BETWEEN ? AND ?
79 // Accumulates aggregates inline during PK range scan without materializing rows
80 if self.subquery_depth == 0
81 && self.outer_row.is_none()
82 && self.cte_context.is_none()
83 && super::fast_path::is_streaming_aggregate_query(stmt)
84 {
85 if let Ok(result) = self.execute_streaming_aggregate(stmt) {
86 return Ok(result);
87 }
88 // Fall through to standard path if streaming aggregate fails
89 }
90
91 #[cfg(feature = "profile-q6")]
92 let _setup_time = execute_start.elapsed();
93
94 // Apply subquery rewriting optimizations (Phase 2 of IN subquery optimization)
95 // - Rewrites correlated IN → EXISTS with LIMIT 1 for early termination
96 // - Adds DISTINCT to uncorrelated IN subqueries to reduce duplicate processing
97 // This works in conjunction with Phase 1 (HashSet optimization, #2136)
98 #[cfg(feature = "profile-q6")]
99 let optimizer_start = std::time::Instant::now();
100
101 let optimized_stmt = crate::optimizer::rewrite_subquery_optimizations(stmt);
102
103 #[cfg(feature = "profile-q6")]
104 let _optimizer_time = optimizer_start.elapsed();
105
106 // Eliminate unused tables that create unnecessary cross joins (#3556)
107 // Must run BEFORE semi-join transformation to avoid complex interactions
108 // with derived tables from EXISTS/IN transformations
109 let optimized_stmt = crate::optimizer::eliminate_unused_tables(&optimized_stmt);
110
111 // Transform decorrelated IN/EXISTS subqueries to semi/anti-joins (#2424)
112 // This enables hash-based join execution instead of row-by-row subquery evaluation
113 // Converts WHERE clauses like "WHERE x IN (SELECT y FROM t)" to "SEMI JOIN t ON x = y"
114 let optimized_stmt = crate::optimizer::transform_subqueries_to_joins(&optimized_stmt);
115
116 // Execute CTEs if present and merge with outer query's CTE context
117 let mut cte_results = if let Some(with_clause) = &optimized_stmt.with_clause {
118 // This query has its own CTEs - execute them with memory tracking
119 execute_ctes_with_memory_check(
120 with_clause,
121 |query, cte_ctx| self.execute_with_ctes(query, cte_ctx),
122 |size| self.track_memory_allocation(size),
123 )?
124 } else {
125 HashMap::new()
126 };
127
128 // If we have access to outer query's CTEs (for subqueries), merge them in
129 // Local CTEs take precedence over outer CTEs if there are name conflicts
130 if let Some(outer_cte_ctx) = self.cte_context {
131 for (name, result) in outer_cte_ctx {
132 cte_results.entry(name.clone()).or_insert_with(|| result.clone());
133 }
134 }
135
136 #[cfg(feature = "profile-q6")]
137 let _pre_execute_time = execute_start.elapsed();
138
139 // Execute the main query with CTE context
140 let result = self.execute_with_ctes(&optimized_stmt, &cte_results)?;
141
142 #[cfg(feature = "profile-q6")]
143 {
144 let _total_execute = execute_start.elapsed();
145 }
146
147 Ok(result)
148 }
149
150 /// Execute a SELECT statement and return an iterator over results
151 ///
152 /// This enables early termination when the full result set is not needed,
153 /// such as for IN subqueries where we stop after finding the first match.
154 ///
155 /// # Phase 1 Implementation (Early Termination for IN subqueries)
156 ///
157 /// Current implementation materializes results then returns an iterator.
158 /// This still enables early termination in the consumer (e.g., eval_in_subquery)
159 /// by stopping iteration when a match is found.
160 ///
161 /// Future optimization: Leverage the existing RowIterator infrastructure
162 /// (crate::select::iterator) for truly lazy evaluation that stops execution
163 /// early, not just iteration.
164 pub fn execute_iter(
165 &self,
166 stmt: &vibesql_ast::SelectStmt,
167 ) -> Result<impl Iterator<Item = vibesql_storage::Row>, ExecutorError> {
168 // For Phase 1, materialize then return iterator
169 // This still enables early termination in the consumer
170 let rows = self.execute(stmt)?;
171 Ok(rows.into_iter())
172 }
173
174 /// Execute a SELECT statement using the fast path directly
175 ///
176 /// This method is used by prepared statements with cached SimpleFastPath plans.
177 /// It bypasses the `is_simple_point_query()` check because the eligibility was
178 /// already determined at prepare time.
179 ///
180 /// # Performance
181 ///
182 /// For repeated execution of prepared statements, this saves the cost of
183 /// re-checking fast path eligibility on every execution (~5-10µs per query).
184 pub fn execute_fast_path_with_columns(
185 &self,
186 stmt: &vibesql_ast::SelectStmt,
187 ) -> Result<SelectResult, ExecutorError> {
188 // Reset arena for fresh query execution
189 if self.subquery_depth == 0 {
190 self.reset_arena();
191 }
192
193 // Check timeout before starting execution
194 self.check_timeout()?;
195
196 // Execute via fast path directly (skip is_simple_point_query check)
197 let rows = self.execute_fast_path(stmt)?;
198
199 // Derive column names from the SELECT list
200 // For fast path queries, we don't have a FromResult, so pass None
201 // The column derivation will use the SELECT list expressions directly
202 let columns = self.derive_fast_path_column_names(stmt)?;
203
204 Ok(SelectResult { columns, rows })
205 }
206
207 /// Derive column names for fast path execution
208 ///
209 /// For fast path queries, we derive column names directly from the SELECT list
210 /// and table schema without going through the full FROM clause execution.
211 ///
212 /// # Performance Note (#3780)
213 ///
214 /// This method is called by `Session::execute_prepared()` to cache column names
215 /// in `SimpleFastPathPlan`. After the first execution, cached column names are
216 /// reused to avoid repeated table lookups and column name derivation.
217 pub fn derive_fast_path_column_names(
218 &self,
219 stmt: &vibesql_ast::SelectStmt,
220 ) -> Result<Vec<String>, ExecutorError> {
221 use vibesql_ast::{FromClause, SelectItem};
222
223 // Get table name and schema for column resolution
224 let (table_name, table_alias) = match &stmt.from {
225 Some(FromClause::Table { name, alias, .. }) => (name.as_str(), alias.as_deref()),
226 _ => {
227 return Err(ExecutorError::Other(
228 "Fast path requires simple table FROM clause".to_string(),
229 ))
230 }
231 };
232
233 let table = self.database.get_table(table_name).ok_or_else(|| {
234 ExecutorError::TableNotFound(table_name.to_string())
235 })?;
236
237 let mut columns = Vec::with_capacity(stmt.select_list.len());
238
239 for item in &stmt.select_list {
240 match item {
241 SelectItem::Wildcard { .. } => {
242 // Add all columns from the table
243 for col in &table.schema.columns {
244 columns.push(col.name.clone());
245 }
246 }
247 SelectItem::QualifiedWildcard { qualifier, .. } => {
248 // Check if qualifier matches table name or alias
249 let effective_name = table_alias.unwrap_or(table_name);
250 if qualifier.eq_ignore_ascii_case(effective_name)
251 || qualifier.eq_ignore_ascii_case(table_name)
252 {
253 for col in &table.schema.columns {
254 columns.push(col.name.clone());
255 }
256 }
257 }
258 SelectItem::Expression { expr, alias: col_alias } => {
259 // Use alias if provided, otherwise derive from expression
260 let col_name = if let Some(a) = col_alias {
261 a.clone()
262 } else {
263 self.derive_column_name_from_expr(expr)
264 };
265 columns.push(col_name);
266 }
267 }
268 }
269
270 Ok(columns)
271 }
272
273 /// Derive a column name from an expression
274 fn derive_column_name_from_expr(&self, expr: &vibesql_ast::Expression) -> String {
275 match expr {
276 vibesql_ast::Expression::ColumnRef { column, .. } => column.clone(),
277 vibesql_ast::Expression::Literal(val) => format!("{}", val),
278 _ => "?column?".to_string(),
279 }
280 }
281
282 /// Execute a SELECT statement and return both columns and rows
283 pub fn execute_with_columns(
284 &self,
285 stmt: &vibesql_ast::SelectStmt,
286 ) -> Result<SelectResult, ExecutorError> {
287 // First, get the FROM result to access the schema
288 let from_result = if let Some(from_clause) = &stmt.from {
289 let mut cte_results = if let Some(with_clause) = &stmt.with_clause {
290 execute_ctes(with_clause, |query, cte_ctx| self.execute_with_ctes(query, cte_ctx))?
291 } else {
292 HashMap::new()
293 };
294 // If we have access to outer query's CTEs (for subqueries/derived tables), merge them in
295 // Local CTEs take precedence over outer CTEs if there are name conflicts
296 // This is critical for queries like TPC-DS Q2 where CTEs are referenced from derived tables
297 if let Some(outer_cte_ctx) = self.cte_context {
298 for (name, result) in outer_cte_ctx {
299 cte_results.entry(name.clone()).or_insert_with(|| result.clone());
300 }
301 }
302 // Pass WHERE, ORDER BY, and LIMIT for optimizations
303 // This is critical for GROUP BY queries to avoid CROSS JOINs
304 // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
305 // Pass select_list for table elimination optimization (#3556)
306 Some(self.execute_from_with_where(
307 from_clause,
308 &cte_results,
309 stmt.where_clause.as_ref(),
310 stmt.order_by.as_deref(),
311 stmt.limit,
312 Some(&stmt.select_list),
313 )?)
314 } else {
315 None
316 };
317
318 // Derive column names from the SELECT list
319 let columns = self.derive_column_names(&stmt.select_list, from_result.as_ref())?;
320
321 // Execute the query to get rows
322 let rows = self.execute(stmt)?;
323
324 Ok(SelectResult { columns, rows })
325 }
326
327 /// Execute SELECT statement with CTE context
328 ///
329 /// Uses unified strategy selection to determine the optimal execution path:
330 /// - NativeColumnar: Zero-copy SIMD execution from columnar storage
331 /// - StandardColumnar: SIMD execution with row-to-batch conversion
332 /// - RowOriented: Traditional row-by-row execution
333 /// - ExpressionOnly: SELECT without FROM clause (special case)
334 ///
335 /// ## Pipeline-Based Execution (Phase 5)
336 ///
337 /// This method uses the `ExecutionPipeline` trait to provide a unified interface
338 /// for query execution. Each strategy creates an appropriate pipeline that
339 /// implements filter, projection, aggregation, and limit/offset operations.
340 ///
341 /// ```text
342 /// Strategy Selection → Create Pipeline → Execute via Trait Methods
343 /// ↓
344 /// NativeColumnar → NativeColumnarPipeline::apply_*()
345 /// StandardColumnar → ColumnarPipeline::apply_*()
346 /// RowOriented → RowOrientedPipeline::apply_*()
347 /// ExpressionOnly → Special case (no table scan)
348 /// ```
349 pub(super) fn execute_with_ctes(
350 &self,
351 stmt: &vibesql_ast::SelectStmt,
352 cte_results: &HashMap<String, CteResult>,
353 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
354 #[cfg(feature = "profile-q6")]
355 let _execute_ctes_start = std::time::Instant::now();
356
357 // Check if native columnar is enabled via feature flag or env var
358 let native_columnar_enabled =
359 cfg!(feature = "native-columnar") || std::env::var("VIBESQL_NATIVE_COLUMNAR").is_ok();
360
361 // Use unified strategy selection for the execution path
362 let strategy_ctx = StrategyContext::new(stmt, cte_results, native_columnar_enabled);
363 let strategy = choose_execution_strategy(&strategy_ctx);
364
365 log::debug!(
366 "Execution strategy selected: {} (reason: {})",
367 strategy.name(),
368 strategy.score().reason
369 );
370
371 #[cfg(feature = "profile-q6")]
372 eprintln!(
373 "[PROFILE-Q6] Execution strategy: {} ({})",
374 strategy.name(),
375 strategy.score().reason
376 );
377
378 // Dispatch based on selected strategy using ExecutionPipeline trait
379 // Pipeline execution returns Option<Vec<Row>> - None means fallback needed
380 let mut results = match strategy {
381 ExecutionStrategy::NativeColumnar { .. } => {
382 // First try the optimized zero-copy native columnar path
383 // This uses ColumnarBatch::from_storage_columnar() for zero-copy conversion
384 // and executes filter+aggregate in a single pass without row materialization
385 if let Some(result) = self.try_native_columnar_execution(stmt, cte_results)? {
386 #[cfg(feature = "profile-q6")]
387 eprintln!("[PROFILE-Q6] Native columnar: zero-copy path succeeded");
388 result
389 } else {
390 // Fall back to pipeline-based execution if zero-copy path is not applicable
391 // (e.g., complex predicates, multiple tables, unsupported aggregates)
392 log::debug!("Native columnar: zero-copy path not applicable, trying pipeline");
393 match self.execute_via_pipeline(
394 stmt,
395 cte_results,
396 NativeColumnarPipeline::new,
397 "NativeColumnar",
398 )? {
399 Some(result) => result,
400 None => {
401 // Fall back to row-oriented if pipeline also fails
402 log::debug!("Native columnar runtime fallback to row-oriented");
403 #[cfg(feature = "profile-q6")]
404 eprintln!("[PROFILE-Q6] Native columnar fallback to row-oriented");
405 self.execute_row_oriented(stmt, cte_results)?
406 }
407 }
408 }
409 }
410
411 ExecutionStrategy::StandardColumnar { .. } => {
412 // StandardColumnar uses the pipeline-based execution path
413 // Note: We don't use try_native_columnar_execution here because row tables
414 // go through the pipeline which correctly handles all data types including dates.
415 // The native columnar zero-copy path has known limitations with certain date comparisons.
416 match self.execute_via_pipeline(
417 stmt,
418 cte_results,
419 ColumnarPipeline::new,
420 "StandardColumnar",
421 )? {
422 Some(result) => result,
423 None => {
424 log::debug!("Standard columnar runtime fallback to row-oriented");
425 #[cfg(feature = "profile-q6")]
426 eprintln!("[PROFILE-Q6] Standard columnar fallback to row-oriented");
427 self.execute_row_oriented(stmt, cte_results)?
428 }
429 }
430 }
431
432 ExecutionStrategy::RowOriented { .. } => {
433 // Row-oriented uses the traditional path which has full feature support
434 // The RowOrientedPipeline is used for simpler queries, but complex
435 // queries (with JOINs, window functions, DISTINCT, etc.) need the
436 // full execute_row_oriented implementation
437
438 // Phase 4: Try columnar join execution for multi-table JOIN queries (#2943)
439 // This provides 3-5x speedup for TPC-H Q3 style queries
440 let has_joins = stmt
441 .from
442 .as_ref()
443 .is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
444 if has_joins {
445 if let Some(result) = self.try_columnar_join_execution(stmt, cte_results)? {
446 log::info!("Columnar join execution succeeded");
447 // Apply LIMIT/OFFSET to columnar join results (#3776)
448 // Skip if set_operation exists - it will be applied later
449 if stmt.set_operation.is_none() {
450 apply_limit_offset(result, stmt.limit, stmt.offset)
451 } else {
452 result
453 }
454 } else {
455 log::debug!(
456 "Columnar join execution not applicable, falling back to row-oriented"
457 );
458 self.execute_row_oriented(stmt, cte_results)?
459 }
460 } else {
461 self.execute_row_oriented(stmt, cte_results)?
462 }
463 }
464
465 ExecutionStrategy::ExpressionOnly { .. } => {
466 // SELECT without FROM - special case that doesn't use pipelines
467 // May still have aggregates (e.g., SELECT COUNT(*), SELECT MAX(1))
468 // Note: Do NOT use early return here - we need to fall through to set operations handling
469 self.execute_expression_only(stmt, cte_results)?
470 }
471 };
472
473 // Handle set operations (UNION, INTERSECT, EXCEPT)
474 // Process operations left-to-right to ensure correct associativity
475 if let Some(set_op) = &stmt.set_operation {
476 results = self.execute_set_operations(results, set_op, cte_results)?;
477
478 // Apply LIMIT/OFFSET to the final result (after all set operations)
479 // For queries WITHOUT set operations, LIMIT/OFFSET is already applied
480 // in execute_without_aggregation() or execute_with_aggregation()
481 results = apply_limit_offset(results, stmt.limit, stmt.offset);
482 }
483
484 Ok(results)
485 }
486
487 /// Execute SELECT without FROM clause (ExpressionOnly strategy)
488 ///
489 /// This is a special case that doesn't use the pipeline trait since there's
490 /// no table scan involved. Handles both simple expressions and aggregates.
491 fn execute_expression_only(
492 &self,
493 stmt: &vibesql_ast::SelectStmt,
494 cte_results: &HashMap<String, CteResult>,
495 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
496 let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
497
498 if has_aggregates {
499 // Aggregates without FROM need the aggregation path
500 self.execute_with_aggregation(stmt, cte_results)
501 } else {
502 // Simple expression evaluation (e.g., SELECT 1 + 1)
503 self.execute_select_without_from(stmt)
504 }
505 }
506
507 /// Execute a query using the specified execution pipeline
508 ///
509 /// This method provides a unified interface for pipeline-based execution.
510 /// It creates the pipeline, prepares input, and executes the pipeline stages.
511 ///
512 /// Returns `Ok(Some(results))` if the pipeline executed successfully,
513 /// `Ok(None)` if the pipeline cannot handle the query (fallback needed),
514 /// or `Err` if an error occurred.
515 ///
516 /// # Type Parameters
517 ///
518 /// * `P` - The pipeline type (must implement `ExecutionPipeline`)
519 /// * `F` - Factory function to create the pipeline
520 fn execute_via_pipeline<P, F>(
521 &self,
522 stmt: &vibesql_ast::SelectStmt,
523 cte_results: &HashMap<String, CteResult>,
524 create_pipeline: F,
525 strategy_name: &str,
526 ) -> Result<Option<Vec<vibesql_storage::Row>>, ExecutorError>
527 where
528 P: ExecutionPipeline,
529 F: FnOnce() -> P,
530 {
531 #[cfg(feature = "profile-q6")]
532 let start = std::time::Instant::now();
533
534 // Check query complexity - pipelines don't support all features
535 let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
536 let has_group_by = stmt.group_by.is_some();
537 let has_joins =
538 stmt.from.as_ref().is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
539 let has_order_by = stmt.order_by.is_some();
540 let has_distinct = stmt.distinct;
541 let has_set_ops = stmt.set_operation.is_some();
542 let has_window_funcs = self.has_window_functions(&stmt.select_list);
543 let has_distinct_aggregates = self.has_distinct_aggregates(&stmt.select_list);
544
545 // Create the pipeline
546 let pipeline = create_pipeline();
547
548 // Check if the pipeline supports this query pattern
549 if !pipeline.supports_query_pattern(has_aggregates, has_group_by, has_joins) {
550 log::debug!(
551 "{} pipeline doesn't support query pattern (agg={}, group_by={}, joins={})",
552 strategy_name,
553 has_aggregates,
554 has_group_by,
555 has_joins
556 );
557 return Ok(None);
558 }
559
560 // For complex queries (ORDER BY, DISTINCT, window functions, set ops, DISTINCT aggregates),
561 // fall back to full execution paths which have complete support
562 if has_order_by
563 || has_distinct
564 || has_window_funcs
565 || has_set_ops
566 || has_distinct_aggregates
567 {
568 log::debug!(
569 "{} pipeline doesn't support complex features (order_by={}, distinct={}, window={}, set_ops={}, distinct_agg={})",
570 strategy_name,
571 has_order_by,
572 has_distinct,
573 has_window_funcs,
574 has_set_ops,
575 has_distinct_aggregates
576 );
577 return Ok(None);
578 }
579
580 // Must have a FROM clause for pipeline execution
581 let from_clause = match &stmt.from {
582 Some(from) => from,
583 None => return Ok(None),
584 };
585
586 // Execute FROM clause to get input data
587 // Note: WHERE, ORDER BY, and LIMIT are handled by the pipeline, not here
588 // Note: Table elimination requires WHERE clause, so pass None for select_list too
589 let from_result = self.execute_from_with_where(
590 from_clause,
591 cte_results,
592 None, // Pipeline will apply WHERE filter
593 None, // ORDER BY handled separately
594 None, // LIMIT applied after pipeline
595 None, // No table elimination when WHERE is deferred
596 )?;
597
598 // Build execution context
599 let mut exec_ctx = ExecutionContext::new(&from_result.schema, self.database);
600 // Add outer context for correlated subqueries (#2998)
601 if let (Some(outer_row), Some(outer_schema)) = (self.outer_row, self.outer_schema) {
602 exec_ctx = exec_ctx.with_outer_context(outer_row, outer_schema);
603 }
604 // Add CTE context if available
605 if !cte_results.is_empty() {
606 exec_ctx = exec_ctx.with_cte_context(cte_results);
607 }
608
609 // Validate column references BEFORE processing
610 super::validation::validate_select_columns_with_context(
611 &stmt.select_list,
612 stmt.where_clause.as_ref(),
613 &from_result.schema,
614 self.procedural_context,
615 self.outer_schema,
616 )?;
617
618 // Prepare input from FROM result
619 let input = PipelineInput::from_rows_owned(from_result.data.into_rows());
620
621 // Execute pipeline stages with fallback on error
622 // If any pipeline stage fails with UnsupportedFeature, fall back to row-oriented
623
624 // Stage 1: Filter (WHERE clause)
625 let filtered = match pipeline.apply_filter(input, stmt.where_clause.as_ref(), &exec_ctx) {
626 Ok(result) => result,
627 Err(ExecutorError::UnsupportedFeature(_))
628 | Err(ExecutorError::UnsupportedExpression(_)) => {
629 log::debug!("{} pipeline filter failed, falling back", strategy_name);
630 return Ok(None);
631 }
632 Err(e) => return Err(e),
633 };
634
635 // Stage 2: Projection or Aggregation
636 let result = if has_aggregates || has_group_by {
637 // Execute aggregation (includes projection)
638 // Get GROUP BY expressions if present (as slice)
639 let group_by_slice: Option<&[vibesql_ast::Expression]> =
640 stmt.group_by.as_ref().and_then(|g| g.as_simple()).map(|v| v.as_slice());
641 match pipeline.apply_aggregation(
642 filtered.into_input(),
643 &stmt.select_list,
644 group_by_slice,
645 stmt.having.as_ref(),
646 &exec_ctx,
647 ) {
648 Ok(result) => result,
649 Err(ExecutorError::UnsupportedFeature(_))
650 | Err(ExecutorError::UnsupportedExpression(_)) => {
651 log::debug!("{} pipeline aggregation failed, falling back", strategy_name);
652 return Ok(None);
653 }
654 Err(e) => return Err(e),
655 }
656 } else {
657 // Execute projection only
658 match pipeline.apply_projection(filtered.into_input(), &stmt.select_list, &exec_ctx) {
659 Ok(result) => result,
660 Err(ExecutorError::UnsupportedFeature(_))
661 | Err(ExecutorError::UnsupportedExpression(_)) => {
662 log::debug!("{} pipeline projection failed, falling back", strategy_name);
663 return Ok(None);
664 }
665 Err(e) => return Err(e),
666 }
667 };
668
669 // Stage 3: Limit/Offset (convert usize to u64)
670 let limit_u64 = stmt.limit.map(|l| l as u64);
671 let offset_u64 = stmt.offset.map(|o| o as u64);
672 let final_result = pipeline.apply_limit_offset(result, limit_u64, offset_u64)?;
673
674 #[cfg(feature = "profile-q6")]
675 {
676 eprintln!("[PROFILE-Q6] ✓ {} pipeline execution: {:?}", strategy_name, start.elapsed());
677 }
678
679 log::debug!("✓ {} pipeline execution succeeded", strategy_name);
680 Ok(Some(final_result))
681 }
682
683 /// Check if the select list contains window functions
684 fn has_window_functions(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
685 select_list.iter().any(|item| {
686 if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
687 self.expr_has_window_function(expr)
688 } else {
689 false
690 }
691 })
692 }
693
694 /// Recursively check if an expression contains a window function
695 #[allow(clippy::only_used_in_recursion)]
696 fn expr_has_window_function(&self, expr: &vibesql_ast::Expression) -> bool {
697 match expr {
698 vibesql_ast::Expression::WindowFunction { .. } => true,
699 vibesql_ast::Expression::BinaryOp { left, right, .. } => {
700 self.expr_has_window_function(left) || self.expr_has_window_function(right)
701 }
702 vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_window_function(expr),
703 vibesql_ast::Expression::Function { args, .. } => {
704 args.iter().any(|arg| self.expr_has_window_function(arg))
705 }
706 vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
707 operand.as_ref().is_some_and(|e| self.expr_has_window_function(e))
708 || when_clauses.iter().any(|case_when| {
709 case_when.conditions.iter().any(|c| self.expr_has_window_function(c))
710 || self.expr_has_window_function(&case_when.result)
711 })
712 || else_result.as_ref().is_some_and(|e| self.expr_has_window_function(e))
713 }
714 _ => false,
715 }
716 }
717
718 /// Check if the select list contains any DISTINCT aggregates (e.g., COUNT(DISTINCT x))
719 fn has_distinct_aggregates(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
720 select_list.iter().any(|item| {
721 if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
722 self.expr_has_distinct_aggregate(expr)
723 } else {
724 false
725 }
726 })
727 }
728
729 /// Recursively check if an expression contains a DISTINCT aggregate
730 #[allow(clippy::only_used_in_recursion)]
731 fn expr_has_distinct_aggregate(&self, expr: &vibesql_ast::Expression) -> bool {
732 match expr {
733 vibesql_ast::Expression::AggregateFunction { distinct, .. } => *distinct,
734 vibesql_ast::Expression::BinaryOp { left, right, .. } => {
735 self.expr_has_distinct_aggregate(left) || self.expr_has_distinct_aggregate(right)
736 }
737 vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_distinct_aggregate(expr),
738 vibesql_ast::Expression::Function { args, .. } => {
739 args.iter().any(|arg| self.expr_has_distinct_aggregate(arg))
740 }
741 vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
742 operand.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
743 || when_clauses.iter().any(|case_when| {
744 case_when.conditions.iter().any(|c| self.expr_has_distinct_aggregate(c))
745 || self.expr_has_distinct_aggregate(&case_when.result)
746 })
747 || else_result.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
748 }
749 _ => false,
750 }
751 }
752
753 /// Execute using traditional row-oriented path
754 ///
755 /// This is the fallback path when columnar execution is not available or not beneficial.
756 fn execute_row_oriented(
757 &self,
758 stmt: &vibesql_ast::SelectStmt,
759 cte_results: &HashMap<String, CteResult>,
760 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
761 let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
762 let has_group_by = stmt.group_by.is_some();
763
764 if has_aggregates || has_group_by {
765 self.execute_with_aggregation(stmt, cte_results)
766 } else if let Some(from_clause) = &stmt.from {
767 // Re-enabled predicate pushdown for all queries (issue #1902)
768 //
769 // Previously, predicate pushdown was selectively disabled for multi-column IN clauses
770 // because index optimization happened in execute_without_aggregation() on row indices
771 // from the FROM result. When predicate pushdown filtered rows early, the indices no
772 // longer matched the original table, causing incorrect results.
773 //
774 // Now that all index optimization has been moved to the scan level (execute_index_scan),
775 // it happens BEFORE predicate pushdown, avoiding the row-index mismatch problem.
776 // This allows predicate pushdown to work correctly for all queries, improving performance.
777 //
778 // Fixes issues #1807, #1895, #1896, and #1902.
779
780 // Pass WHERE, ORDER BY, and LIMIT to execute_from for optimization
781 // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
782 // Pass select_list for table elimination optimization (#3556)
783 let from_result = self.execute_from_with_where(
784 from_clause,
785 cte_results,
786 stmt.where_clause.as_ref(),
787 stmt.order_by.as_deref(),
788 stmt.limit,
789 Some(&stmt.select_list),
790 )?;
791
792 // Validate column references BEFORE processing rows (issue #2654)
793 // This ensures column errors are caught even when tables are empty
794 // Pass procedural context to allow procedure variables in WHERE clause
795 // Pass outer_schema for correlated subqueries (#2694)
796 super::validation::validate_select_columns_with_context(
797 &stmt.select_list,
798 stmt.where_clause.as_ref(),
799 &from_result.schema,
800 self.procedural_context,
801 self.outer_schema,
802 )?;
803
804 self.execute_without_aggregation(stmt, from_result, cte_results)
805 } else {
806 // SELECT without FROM - evaluate expressions as a single row
807 self.execute_select_without_from(stmt)
808 }
809 }
810
811 /// Execute a chain of set operations left-to-right
812 ///
813 /// SQL set operations are left-associative, so:
814 /// A EXCEPT B EXCEPT C should evaluate as (A EXCEPT B) EXCEPT C
815 ///
816 /// The parser creates a right-recursive AST structure, but we need to execute left-to-right.
817 fn execute_set_operations(
818 &self,
819 mut left_results: Vec<vibesql_storage::Row>,
820 set_op: &vibesql_ast::SetOperation,
821 cte_results: &HashMap<String, CteResult>,
822 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
823 // Execute the immediate right query WITHOUT its set operations
824 // This prevents right-recursive evaluation
825 let right_stmt = &set_op.right;
826 let has_aggregates =
827 self.has_aggregates(&right_stmt.select_list) || right_stmt.having.is_some();
828 let has_group_by = right_stmt.group_by.is_some();
829
830 let right_results = if has_aggregates || has_group_by {
831 self.execute_with_aggregation(right_stmt, cte_results)?
832 } else if let Some(from_clause) = &right_stmt.from {
833 // Note: LIMIT is None for set operation sides - it's applied after the set operation
834 // Pass select_list for table elimination optimization (#3556)
835 let from_result = self.execute_from_with_where(
836 from_clause,
837 cte_results,
838 right_stmt.where_clause.as_ref(),
839 right_stmt.order_by.as_deref(),
840 None,
841 Some(&right_stmt.select_list),
842 )?;
843 self.execute_without_aggregation(right_stmt, from_result, cte_results)?
844 } else {
845 self.execute_select_without_from(right_stmt)?
846 };
847
848 // Track memory for right result before set operation
849 let right_size = estimate_result_size(&right_results);
850 self.track_memory_allocation(right_size)?;
851
852 // Apply the current operation
853 left_results = apply_set_operation(left_results, right_results, set_op)?;
854
855 // Track memory for combined result after set operation
856 let combined_size = estimate_result_size(&left_results);
857 self.track_memory_allocation(combined_size)?;
858
859 // If the right side has more set operations, continue processing them
860 // This creates the left-to-right evaluation: ((A op B) op C) op D
861 if let Some(next_set_op) = &right_stmt.set_operation {
862 left_results = self.execute_set_operations(left_results, next_set_op, cte_results)?;
863 }
864
865 Ok(left_results)
866 }
867
868 /// Execute a FROM clause with WHERE, ORDER BY, and LIMIT for optimization
869 ///
870 /// The LIMIT parameter enables early termination optimization (#3253):
871 /// - When ORDER BY is satisfied by an index and no post-filter is needed,
872 /// the index scan can stop after fetching LIMIT rows
873 ///
874 /// Note: Table elimination (#3556) is now handled at the optimizer level
875 /// via crate::optimizer::eliminate_unused_tables(), which runs before
876 /// semi-join transformation to avoid complex interactions.
877 pub(super) fn execute_from_with_where(
878 &self,
879 from: &vibesql_ast::FromClause,
880 cte_results: &HashMap<String, CteResult>,
881 where_clause: Option<&vibesql_ast::Expression>,
882 order_by: Option<&[vibesql_ast::OrderByItem]>,
883 limit: Option<usize>,
884 _select_list: Option<&[vibesql_ast::SelectItem]>, // No longer used - optimization moved to optimizer pass
885 ) -> Result<FromResult, ExecutorError> {
886 use crate::select::scan::execute_from_clause;
887
888 let from_result = execute_from_clause(
889 from,
890 cte_results,
891 self.database,
892 where_clause,
893 order_by,
894 limit,
895 self.outer_row,
896 self.outer_schema,
897 |query| {
898 // For derived table subqueries, create a child executor with CTE context
899 // This allows CTEs from the outer WITH clause to be referenced in subqueries
900 // Critical for queries like TPC-DS Q2 where CTEs are used in FROM subqueries
901 if !cte_results.is_empty() {
902 let child = SelectExecutor::new_with_cte_and_depth(
903 self.database,
904 cte_results,
905 self.subquery_depth,
906 );
907 child.execute_with_columns(query)
908 } else {
909 self.execute_with_columns(query)
910 }
911 },
912 )?;
913
914 // NOTE: We DON'T merge outer schema with from_result.schema here because:
915 // 1. from_result.rows only contain values from inner tables
916 // 2. Outer columns are resolved via the evaluator's outer_row/outer_schema
917 // 3. Merging would create schema/row mismatch (schema has outer cols, rows don't)
918
919 Ok(from_result)
920 }
921}