vibesql_executor/select/executor/execute.rs
1//! Main execution methods for SelectExecutor
2//!
3//! This module implements the unified execution dispatcher that routes queries
4//! to the appropriate execution pipeline based on the selected strategy.
5//!
6//! ## Execution Pipeline Architecture
7//!
8//! The dispatcher uses the `ExecutionPipeline` trait to provide a unified interface
9//! for query execution across different strategies:
10//!
11//! - **NativeColumnar**: Zero-copy SIMD execution from columnar storage
12//! - **StandardColumnar**: SIMD execution with row-to-batch conversion
13//! - **RowOriented**: Traditional row-by-row execution
14//! - **ExpressionOnly**: SELECT without FROM clause (special case)
15//!
16//! ```text
17//! Strategy Selection → Create Pipeline → Execute Pipeline Stages → Results
18//! ↓
19//! apply_filter → apply_projection → apply_aggregation → apply_limit_offset
20//! ```
21
22use std::collections::HashMap;
23
24use super::builder::SelectExecutor;
25use crate::{
26 errors::ExecutorError,
27 optimizer::adaptive::{
28 choose_execution_strategy, ExecutionStrategy, StrategyContext,
29 },
30 pipeline::{
31 ColumnarPipeline, ExecutionContext, ExecutionPipeline, NativeColumnarPipeline,
32 PipelineInput,
33 },
34 select::{
35 cte::{execute_ctes, execute_ctes_with_memory_check, CteResult},
36 helpers::{apply_limit_offset, estimate_result_size},
37 join::FromResult,
38 set_operations::apply_set_operation,
39 SelectResult,
40 },
41};
42
43impl SelectExecutor<'_> {
44 /// Execute a SELECT statement
45 pub fn execute(&self, stmt: &vibesql_ast::SelectStmt) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
46 #[cfg(feature = "profile-q6")]
47 let execute_start = std::time::Instant::now();
48
49 // Reset arena for fresh query execution (only at top level)
50 if self.subquery_depth == 0 {
51 self.reset_arena();
52 }
53
54 // Check timeout before starting execution
55 self.check_timeout()?;
56
57 // Check subquery depth limit to prevent stack overflow
58 if self.subquery_depth >= crate::limits::MAX_EXPRESSION_DEPTH {
59 return Err(ExecutorError::ExpressionDepthExceeded {
60 depth: self.subquery_depth,
61 max_depth: crate::limits::MAX_EXPRESSION_DEPTH,
62 });
63 }
64
65 // Fast path for simple point-lookup queries (TPC-C optimization)
66 // This bypasses expensive optimizer passes for queries like:
67 // SELECT col FROM table WHERE pk = value
68 if self.subquery_depth == 0
69 && self.outer_row.is_none()
70 && self.cte_context.is_none()
71 && super::fast_path::is_simple_point_query(stmt)
72 {
73 return self.execute_fast_path(stmt);
74 }
75
76 #[cfg(feature = "profile-q6")]
77 let _setup_time = execute_start.elapsed();
78
79 // Apply subquery rewriting optimizations (Phase 2 of IN subquery optimization)
80 // - Rewrites correlated IN → EXISTS with LIMIT 1 for early termination
81 // - Adds DISTINCT to uncorrelated IN subqueries to reduce duplicate processing
82 // This works in conjunction with Phase 1 (HashSet optimization, #2136)
83 #[cfg(feature = "profile-q6")]
84 let optimizer_start = std::time::Instant::now();
85
86 let optimized_stmt = crate::optimizer::rewrite_subquery_optimizations(stmt);
87
88 #[cfg(feature = "profile-q6")]
89 let _optimizer_time = optimizer_start.elapsed();
90
91 // Transform decorrelated IN/EXISTS subqueries to semi/anti-joins (#2424)
92 // This enables hash-based join execution instead of row-by-row subquery evaluation
93 // Converts WHERE clauses like "WHERE x IN (SELECT y FROM t)" to "SEMI JOIN t ON x = y"
94 let optimized_stmt = crate::optimizer::transform_subqueries_to_joins(&optimized_stmt);
95
96 // Execute CTEs if present and merge with outer query's CTE context
97 let mut cte_results = if let Some(with_clause) = &optimized_stmt.with_clause {
98 // This query has its own CTEs - execute them with memory tracking
99 execute_ctes_with_memory_check(
100 with_clause,
101 |query, cte_ctx| self.execute_with_ctes(query, cte_ctx),
102 |size| self.track_memory_allocation(size),
103 )?
104 } else {
105 HashMap::new()
106 };
107
108 // If we have access to outer query's CTEs (for subqueries), merge them in
109 // Local CTEs take precedence over outer CTEs if there are name conflicts
110 if let Some(outer_cte_ctx) = self.cte_context {
111 for (name, result) in outer_cte_ctx {
112 cte_results.entry(name.clone()).or_insert_with(|| result.clone());
113 }
114 }
115
116 #[cfg(feature = "profile-q6")]
117 let _pre_execute_time = execute_start.elapsed();
118
119 // Execute the main query with CTE context
120 let result = self.execute_with_ctes(&optimized_stmt, &cte_results)?;
121
122 #[cfg(feature = "profile-q6")]
123 {
124 let _total_execute = execute_start.elapsed();
125 }
126
127 Ok(result)
128 }
129
130 /// Execute a SELECT statement and return an iterator over results
131 ///
132 /// This enables early termination when the full result set is not needed,
133 /// such as for IN subqueries where we stop after finding the first match.
134 ///
135 /// # Phase 1 Implementation (Early Termination for IN subqueries)
136 ///
137 /// Current implementation materializes results then returns an iterator.
138 /// This still enables early termination in the consumer (e.g., eval_in_subquery)
139 /// by stopping iteration when a match is found.
140 ///
141 /// Future optimization: Leverage the existing RowIterator infrastructure
142 /// (crate::select::iterator) for truly lazy evaluation that stops execution
143 /// early, not just iteration.
144 pub fn execute_iter(
145 &self,
146 stmt: &vibesql_ast::SelectStmt,
147 ) -> Result<impl Iterator<Item = vibesql_storage::Row>, ExecutorError> {
148 // For Phase 1, materialize then return iterator
149 // This still enables early termination in the consumer
150 let rows = self.execute(stmt)?;
151 Ok(rows.into_iter())
152 }
153
154 /// Execute a SELECT statement and return both columns and rows
155 pub fn execute_with_columns(
156 &self,
157 stmt: &vibesql_ast::SelectStmt,
158 ) -> Result<SelectResult, ExecutorError> {
159 // First, get the FROM result to access the schema
160 let from_result = if let Some(from_clause) = &stmt.from {
161 let mut cte_results = if let Some(with_clause) = &stmt.with_clause {
162 execute_ctes(with_clause, |query, cte_ctx| self.execute_with_ctes(query, cte_ctx))?
163 } else {
164 HashMap::new()
165 };
166 // If we have access to outer query's CTEs (for subqueries/derived tables), merge them in
167 // Local CTEs take precedence over outer CTEs if there are name conflicts
168 // This is critical for queries like TPC-DS Q2 where CTEs are referenced from derived tables
169 if let Some(outer_cte_ctx) = self.cte_context {
170 for (name, result) in outer_cte_ctx {
171 cte_results.entry(name.clone()).or_insert_with(|| result.clone());
172 }
173 }
174 // Pass WHERE, ORDER BY, and LIMIT for optimizations
175 // This is critical for GROUP BY queries to avoid CROSS JOINs
176 // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
177 Some(self.execute_from_with_where(
178 from_clause,
179 &cte_results,
180 stmt.where_clause.as_ref(),
181 stmt.order_by.as_deref(),
182 stmt.limit,
183 )?)
184 } else {
185 None
186 };
187
188 // Derive column names from the SELECT list
189 let columns = self.derive_column_names(&stmt.select_list, from_result.as_ref())?;
190
191 // Execute the query to get rows
192 let rows = self.execute(stmt)?;
193
194 Ok(SelectResult { columns, rows })
195 }
196
197 /// Execute SELECT statement with CTE context
198 ///
199 /// Uses unified strategy selection to determine the optimal execution path:
200 /// - NativeColumnar: Zero-copy SIMD execution from columnar storage
201 /// - StandardColumnar: SIMD execution with row-to-batch conversion
202 /// - RowOriented: Traditional row-by-row execution
203 /// - ExpressionOnly: SELECT without FROM clause (special case)
204 ///
205 /// ## Pipeline-Based Execution (Phase 5)
206 ///
207 /// This method uses the `ExecutionPipeline` trait to provide a unified interface
208 /// for query execution. Each strategy creates an appropriate pipeline that
209 /// implements filter, projection, aggregation, and limit/offset operations.
210 ///
211 /// ```text
212 /// Strategy Selection → Create Pipeline → Execute via Trait Methods
213 /// ↓
214 /// NativeColumnar → NativeColumnarPipeline::apply_*()
215 /// StandardColumnar → ColumnarPipeline::apply_*()
216 /// RowOriented → RowOrientedPipeline::apply_*()
217 /// ExpressionOnly → Special case (no table scan)
218 /// ```
219 pub(super) fn execute_with_ctes(
220 &self,
221 stmt: &vibesql_ast::SelectStmt,
222 cte_results: &HashMap<String, CteResult>,
223 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
224 #[cfg(feature = "profile-q6")]
225 let _execute_ctes_start = std::time::Instant::now();
226
227 // Check if native columnar is enabled via feature flag or env var
228 let native_columnar_enabled =
229 cfg!(feature = "native-columnar") || std::env::var("VIBESQL_NATIVE_COLUMNAR").is_ok();
230
231 // Use unified strategy selection for the execution path
232 let strategy_ctx = StrategyContext::new(stmt, cte_results, native_columnar_enabled);
233 let strategy = choose_execution_strategy(&strategy_ctx);
234
235 log::debug!(
236 "Execution strategy selected: {} (reason: {})",
237 strategy.name(),
238 strategy.score().reason
239 );
240
241 #[cfg(feature = "profile-q6")]
242 eprintln!(
243 "[PROFILE-Q6] Execution strategy: {} ({})",
244 strategy.name(),
245 strategy.score().reason
246 );
247
248 // Dispatch based on selected strategy using ExecutionPipeline trait
249 // Pipeline execution returns Option<Vec<Row>> - None means fallback needed
250 let mut results = match strategy {
251 ExecutionStrategy::NativeColumnar { .. } => {
252 // First try the optimized zero-copy native columnar path
253 // This uses ColumnarBatch::from_storage_columnar() for zero-copy conversion
254 // and executes filter+aggregate in a single pass without row materialization
255 if let Some(result) = self.try_native_columnar_execution(stmt, cte_results)? {
256 #[cfg(feature = "profile-q6")]
257 eprintln!("[PROFILE-Q6] Native columnar: zero-copy path succeeded");
258 result
259 } else {
260 // Fall back to pipeline-based execution if zero-copy path is not applicable
261 // (e.g., complex predicates, multiple tables, unsupported aggregates)
262 log::debug!("Native columnar: zero-copy path not applicable, trying pipeline");
263 match self.execute_via_pipeline(
264 stmt,
265 cte_results,
266 NativeColumnarPipeline::new,
267 "NativeColumnar",
268 )? {
269 Some(result) => result,
270 None => {
271 // Fall back to row-oriented if pipeline also fails
272 log::debug!("Native columnar runtime fallback to row-oriented");
273 #[cfg(feature = "profile-q6")]
274 eprintln!("[PROFILE-Q6] Native columnar fallback to row-oriented");
275 self.execute_row_oriented(stmt, cte_results)?
276 }
277 }
278 }
279 }
280
281 ExecutionStrategy::StandardColumnar { .. } => {
282 // StandardColumnar uses the pipeline-based execution path
283 // Note: We don't use try_native_columnar_execution here because row tables
284 // go through the pipeline which correctly handles all data types including dates.
285 // The native columnar zero-copy path has known limitations with certain date comparisons.
286 match self.execute_via_pipeline(
287 stmt,
288 cte_results,
289 ColumnarPipeline::new,
290 "StandardColumnar",
291 )? {
292 Some(result) => result,
293 None => {
294 log::debug!("Standard columnar runtime fallback to row-oriented");
295 #[cfg(feature = "profile-q6")]
296 eprintln!("[PROFILE-Q6] Standard columnar fallback to row-oriented");
297 self.execute_row_oriented(stmt, cte_results)?
298 }
299 }
300 }
301
302 ExecutionStrategy::RowOriented { .. } => {
303 // Row-oriented uses the traditional path which has full feature support
304 // The RowOrientedPipeline is used for simpler queries, but complex
305 // queries (with JOINs, window functions, DISTINCT, etc.) need the
306 // full execute_row_oriented implementation
307
308 // Phase 4: Try columnar join execution for multi-table JOIN queries (#2943)
309 // This provides 3-5x speedup for TPC-H Q3 style queries
310 let has_joins = stmt.from.as_ref().is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
311 if has_joins {
312 if let Some(result) = self.try_columnar_join_execution(stmt, cte_results)? {
313 log::info!("Columnar join execution succeeded");
314 result
315 } else {
316 log::debug!("Columnar join execution not applicable, falling back to row-oriented");
317 self.execute_row_oriented(stmt, cte_results)?
318 }
319 } else {
320 self.execute_row_oriented(stmt, cte_results)?
321 }
322 }
323
324 ExecutionStrategy::ExpressionOnly { .. } => {
325 // SELECT without FROM - special case that doesn't use pipelines
326 // May still have aggregates (e.g., SELECT COUNT(*), SELECT MAX(1))
327 return self.execute_expression_only(stmt, cte_results);
328 }
329 };
330
331 // Handle set operations (UNION, INTERSECT, EXCEPT)
332 // Process operations left-to-right to ensure correct associativity
333 if let Some(set_op) = &stmt.set_operation {
334 results = self.execute_set_operations(results, set_op, cte_results)?;
335
336 // Apply LIMIT/OFFSET to the final result (after all set operations)
337 // For queries WITHOUT set operations, LIMIT/OFFSET is already applied
338 // in execute_without_aggregation() or execute_with_aggregation()
339 results = apply_limit_offset(results, stmt.limit, stmt.offset);
340 }
341
342 Ok(results)
343 }
344
345 /// Execute SELECT without FROM clause (ExpressionOnly strategy)
346 ///
347 /// This is a special case that doesn't use the pipeline trait since there's
348 /// no table scan involved. Handles both simple expressions and aggregates.
349 fn execute_expression_only(
350 &self,
351 stmt: &vibesql_ast::SelectStmt,
352 cte_results: &HashMap<String, CteResult>,
353 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
354 let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
355
356 if has_aggregates {
357 // Aggregates without FROM need the aggregation path
358 self.execute_with_aggregation(stmt, cte_results)
359 } else {
360 // Simple expression evaluation (e.g., SELECT 1 + 1)
361 self.execute_select_without_from(stmt)
362 }
363 }
364
365 /// Execute a query using the specified execution pipeline
366 ///
367 /// This method provides a unified interface for pipeline-based execution.
368 /// It creates the pipeline, prepares input, and executes the pipeline stages.
369 ///
370 /// Returns `Ok(Some(results))` if the pipeline executed successfully,
371 /// `Ok(None)` if the pipeline cannot handle the query (fallback needed),
372 /// or `Err` if an error occurred.
373 ///
374 /// # Type Parameters
375 ///
376 /// * `P` - The pipeline type (must implement `ExecutionPipeline`)
377 /// * `F` - Factory function to create the pipeline
378 fn execute_via_pipeline<P, F>(
379 &self,
380 stmt: &vibesql_ast::SelectStmt,
381 cte_results: &HashMap<String, CteResult>,
382 create_pipeline: F,
383 strategy_name: &str,
384 ) -> Result<Option<Vec<vibesql_storage::Row>>, ExecutorError>
385 where
386 P: ExecutionPipeline,
387 F: FnOnce() -> P,
388 {
389 #[cfg(feature = "profile-q6")]
390 let start = std::time::Instant::now();
391
392 // Check query complexity - pipelines don't support all features
393 let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
394 let has_group_by = stmt.group_by.is_some();
395 let has_joins = stmt.from.as_ref().is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
396 let has_order_by = stmt.order_by.is_some();
397 let has_distinct = stmt.distinct;
398 let has_set_ops = stmt.set_operation.is_some();
399 let has_window_funcs = self.has_window_functions(&stmt.select_list);
400 let has_distinct_aggregates = self.has_distinct_aggregates(&stmt.select_list);
401
402 // Create the pipeline
403 let pipeline = create_pipeline();
404
405 // Check if the pipeline supports this query pattern
406 if !pipeline.supports_query_pattern(has_aggregates, has_group_by, has_joins) {
407 log::debug!(
408 "{} pipeline doesn't support query pattern (agg={}, group_by={}, joins={})",
409 strategy_name,
410 has_aggregates,
411 has_group_by,
412 has_joins
413 );
414 return Ok(None);
415 }
416
417 // For complex queries (ORDER BY, DISTINCT, window functions, set ops, DISTINCT aggregates),
418 // fall back to full execution paths which have complete support
419 if has_order_by || has_distinct || has_window_funcs || has_set_ops || has_distinct_aggregates {
420 log::debug!(
421 "{} pipeline doesn't support complex features (order_by={}, distinct={}, window={}, set_ops={}, distinct_agg={})",
422 strategy_name,
423 has_order_by,
424 has_distinct,
425 has_window_funcs,
426 has_set_ops,
427 has_distinct_aggregates
428 );
429 return Ok(None);
430 }
431
432 // Must have a FROM clause for pipeline execution
433 let from_clause = match &stmt.from {
434 Some(from) => from,
435 None => return Ok(None),
436 };
437
438 // Execute FROM clause to get input data
439 // Note: WHERE, ORDER BY, and LIMIT are handled by the pipeline, not here
440 let from_result = self.execute_from_with_where(
441 from_clause,
442 cte_results,
443 None, // Pipeline will apply WHERE filter
444 None, // ORDER BY handled separately
445 None, // LIMIT applied after pipeline
446 )?;
447
448 // Build execution context
449 let mut exec_ctx = ExecutionContext::new(&from_result.schema, self.database);
450 // Add outer context for correlated subqueries (#2998)
451 if let (Some(outer_row), Some(outer_schema)) = (self.outer_row, self.outer_schema) {
452 exec_ctx = exec_ctx.with_outer_context(outer_row, outer_schema);
453 }
454 // Add CTE context if available
455 if !cte_results.is_empty() {
456 exec_ctx = exec_ctx.with_cte_context(cte_results);
457 }
458
459 // Validate column references BEFORE processing
460 super::validation::validate_select_columns_with_context(
461 &stmt.select_list,
462 stmt.where_clause.as_ref(),
463 &from_result.schema,
464 self.procedural_context,
465 self.outer_schema,
466 )?;
467
468 // Prepare input from FROM result
469 let input = PipelineInput::from_rows_owned(from_result.data.into_rows());
470
471 // Execute pipeline stages with fallback on error
472 // If any pipeline stage fails with UnsupportedFeature, fall back to row-oriented
473
474 // Stage 1: Filter (WHERE clause)
475 let filtered = match pipeline.apply_filter(input, stmt.where_clause.as_ref(), &exec_ctx) {
476 Ok(result) => result,
477 Err(ExecutorError::UnsupportedFeature(_)) | Err(ExecutorError::UnsupportedExpression(_)) => {
478 log::debug!("{} pipeline filter failed, falling back", strategy_name);
479 return Ok(None);
480 }
481 Err(e) => return Err(e),
482 };
483
484 // Stage 2: Projection or Aggregation
485 let result = if has_aggregates || has_group_by {
486 // Execute aggregation (includes projection)
487 // Get GROUP BY expressions if present (as slice)
488 let group_by_slice: Option<&[vibesql_ast::Expression]> =
489 stmt.group_by.as_ref().and_then(|g| g.as_simple()).map(|v| v.as_slice());
490 match pipeline.apply_aggregation(
491 filtered.into_input(),
492 &stmt.select_list,
493 group_by_slice,
494 stmt.having.as_ref(),
495 &exec_ctx,
496 ) {
497 Ok(result) => result,
498 Err(ExecutorError::UnsupportedFeature(_)) | Err(ExecutorError::UnsupportedExpression(_)) => {
499 log::debug!("{} pipeline aggregation failed, falling back", strategy_name);
500 return Ok(None);
501 }
502 Err(e) => return Err(e),
503 }
504 } else {
505 // Execute projection only
506 match pipeline.apply_projection(filtered.into_input(), &stmt.select_list, &exec_ctx) {
507 Ok(result) => result,
508 Err(ExecutorError::UnsupportedFeature(_)) | Err(ExecutorError::UnsupportedExpression(_)) => {
509 log::debug!("{} pipeline projection failed, falling back", strategy_name);
510 return Ok(None);
511 }
512 Err(e) => return Err(e),
513 }
514 };
515
516 // Stage 3: Limit/Offset (convert usize to u64)
517 let limit_u64 = stmt.limit.map(|l| l as u64);
518 let offset_u64 = stmt.offset.map(|o| o as u64);
519 let final_result = pipeline.apply_limit_offset(result, limit_u64, offset_u64)?;
520
521 #[cfg(feature = "profile-q6")]
522 {
523 eprintln!(
524 "[PROFILE-Q6] ✓ {} pipeline execution: {:?}",
525 strategy_name,
526 start.elapsed()
527 );
528 }
529
530 log::debug!("✓ {} pipeline execution succeeded", strategy_name);
531 Ok(Some(final_result))
532 }
533
534 /// Check if the select list contains window functions
535 fn has_window_functions(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
536 select_list.iter().any(|item| {
537 if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
538 self.expr_has_window_function(expr)
539 } else {
540 false
541 }
542 })
543 }
544
545 /// Recursively check if an expression contains a window function
546 #[allow(clippy::only_used_in_recursion)]
547 fn expr_has_window_function(&self, expr: &vibesql_ast::Expression) -> bool {
548 match expr {
549 vibesql_ast::Expression::WindowFunction { .. } => true,
550 vibesql_ast::Expression::BinaryOp { left, right, .. } => {
551 self.expr_has_window_function(left) || self.expr_has_window_function(right)
552 }
553 vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_window_function(expr),
554 vibesql_ast::Expression::Function { args, .. } => {
555 args.iter().any(|arg| self.expr_has_window_function(arg))
556 }
557 vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
558 operand.as_ref().is_some_and(|e| self.expr_has_window_function(e))
559 || when_clauses.iter().any(|case_when| {
560 case_when.conditions.iter().any(|c| self.expr_has_window_function(c))
561 || self.expr_has_window_function(&case_when.result)
562 })
563 || else_result.as_ref().is_some_and(|e| self.expr_has_window_function(e))
564 }
565 _ => false,
566 }
567 }
568
569 /// Check if the select list contains any DISTINCT aggregates (e.g., COUNT(DISTINCT x))
570 fn has_distinct_aggregates(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
571 select_list.iter().any(|item| {
572 if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
573 self.expr_has_distinct_aggregate(expr)
574 } else {
575 false
576 }
577 })
578 }
579
580 /// Recursively check if an expression contains a DISTINCT aggregate
581 #[allow(clippy::only_used_in_recursion)]
582 fn expr_has_distinct_aggregate(&self, expr: &vibesql_ast::Expression) -> bool {
583 match expr {
584 vibesql_ast::Expression::AggregateFunction { distinct, .. } => *distinct,
585 vibesql_ast::Expression::BinaryOp { left, right, .. } => {
586 self.expr_has_distinct_aggregate(left) || self.expr_has_distinct_aggregate(right)
587 }
588 vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_distinct_aggregate(expr),
589 vibesql_ast::Expression::Function { args, .. } => {
590 args.iter().any(|arg| self.expr_has_distinct_aggregate(arg))
591 }
592 vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
593 operand.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
594 || when_clauses.iter().any(|case_when| {
595 case_when.conditions.iter().any(|c| self.expr_has_distinct_aggregate(c))
596 || self.expr_has_distinct_aggregate(&case_when.result)
597 })
598 || else_result.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
599 }
600 _ => false,
601 }
602 }
603
604 /// Execute using traditional row-oriented path
605 ///
606 /// This is the fallback path when columnar execution is not available or not beneficial.
607 fn execute_row_oriented(
608 &self,
609 stmt: &vibesql_ast::SelectStmt,
610 cte_results: &HashMap<String, CteResult>,
611 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
612 let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
613 let has_group_by = stmt.group_by.is_some();
614
615 if has_aggregates || has_group_by {
616 self.execute_with_aggregation(stmt, cte_results)
617 } else if let Some(from_clause) = &stmt.from {
618 // Re-enabled predicate pushdown for all queries (issue #1902)
619 //
620 // Previously, predicate pushdown was selectively disabled for multi-column IN clauses
621 // because index optimization happened in execute_without_aggregation() on row indices
622 // from the FROM result. When predicate pushdown filtered rows early, the indices no
623 // longer matched the original table, causing incorrect results.
624 //
625 // Now that all index optimization has been moved to the scan level (execute_index_scan),
626 // it happens BEFORE predicate pushdown, avoiding the row-index mismatch problem.
627 // This allows predicate pushdown to work correctly for all queries, improving performance.
628 //
629 // Fixes issues #1807, #1895, #1896, and #1902.
630
631 // Pass WHERE, ORDER BY, and LIMIT to execute_from for optimization
632 // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
633 let from_result = self.execute_from_with_where(
634 from_clause,
635 cte_results,
636 stmt.where_clause.as_ref(),
637 stmt.order_by.as_deref(),
638 stmt.limit,
639 )?;
640
641 // Validate column references BEFORE processing rows (issue #2654)
642 // This ensures column errors are caught even when tables are empty
643 // Pass procedural context to allow procedure variables in WHERE clause
644 // Pass outer_schema for correlated subqueries (#2694)
645 super::validation::validate_select_columns_with_context(
646 &stmt.select_list,
647 stmt.where_clause.as_ref(),
648 &from_result.schema,
649 self.procedural_context,
650 self.outer_schema,
651 )?;
652
653 self.execute_without_aggregation(stmt, from_result, cte_results)
654 } else {
655 // SELECT without FROM - evaluate expressions as a single row
656 self.execute_select_without_from(stmt)
657 }
658 }
659
660 /// Execute a chain of set operations left-to-right
661 ///
662 /// SQL set operations are left-associative, so:
663 /// A EXCEPT B EXCEPT C should evaluate as (A EXCEPT B) EXCEPT C
664 ///
665 /// The parser creates a right-recursive AST structure, but we need to execute left-to-right.
666 fn execute_set_operations(
667 &self,
668 mut left_results: Vec<vibesql_storage::Row>,
669 set_op: &vibesql_ast::SetOperation,
670 cte_results: &HashMap<String, CteResult>,
671 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
672 // Execute the immediate right query WITHOUT its set operations
673 // This prevents right-recursive evaluation
674 let right_stmt = &set_op.right;
675 let has_aggregates = self.has_aggregates(&right_stmt.select_list) || right_stmt.having.is_some();
676 let has_group_by = right_stmt.group_by.is_some();
677
678 let right_results = if has_aggregates || has_group_by {
679 self.execute_with_aggregation(right_stmt, cte_results)?
680 } else if let Some(from_clause) = &right_stmt.from {
681 // Note: LIMIT is None for set operation sides - it's applied after the set operation
682 let from_result =
683 self.execute_from_with_where(from_clause, cte_results, right_stmt.where_clause.as_ref(), right_stmt.order_by.as_deref(), None)?;
684 self.execute_without_aggregation(right_stmt, from_result, cte_results)?
685 } else {
686 self.execute_select_without_from(right_stmt)?
687 };
688
689 // Track memory for right result before set operation
690 let right_size = estimate_result_size(&right_results);
691 self.track_memory_allocation(right_size)?;
692
693 // Apply the current operation
694 left_results = apply_set_operation(left_results, right_results, set_op)?;
695
696 // Track memory for combined result after set operation
697 let combined_size = estimate_result_size(&left_results);
698 self.track_memory_allocation(combined_size)?;
699
700 // If the right side has more set operations, continue processing them
701 // This creates the left-to-right evaluation: ((A op B) op C) op D
702 if let Some(next_set_op) = &right_stmt.set_operation {
703 left_results = self.execute_set_operations(left_results, next_set_op, cte_results)?;
704 }
705
706 Ok(left_results)
707 }
708
709 /// Execute a FROM clause with WHERE, ORDER BY, and LIMIT for optimization
710 ///
711 /// The LIMIT parameter enables early termination optimization (#3253):
712 /// - When ORDER BY is satisfied by an index and no post-filter is needed,
713 /// the index scan can stop after fetching LIMIT rows
714 pub(super) fn execute_from_with_where(
715 &self,
716 from: &vibesql_ast::FromClause,
717 cte_results: &HashMap<String, CteResult>,
718 where_clause: Option<&vibesql_ast::Expression>,
719 order_by: Option<&[vibesql_ast::OrderByItem]>,
720 limit: Option<usize>,
721 ) -> Result<FromResult, ExecutorError> {
722 use crate::select::scan::execute_from_clause;
723 let from_result = execute_from_clause(from, cte_results, self.database, where_clause, order_by, limit, self.outer_row, self.outer_schema, |query| {
724 // For derived table subqueries, create a child executor with CTE context
725 // This allows CTEs from the outer WITH clause to be referenced in subqueries
726 // Critical for queries like TPC-DS Q2 where CTEs are used in FROM subqueries
727 if !cte_results.is_empty() {
728 let child = SelectExecutor::new_with_cte_and_depth(
729 self.database,
730 cte_results,
731 self.subquery_depth,
732 );
733 child.execute_with_columns(query)
734 } else {
735 self.execute_with_columns(query)
736 }
737 })?;
738
739 // NOTE: We DON'T merge outer schema with from_result.schema here because:
740 // 1. from_result.rows only contain values from inner tables
741 // 2. Outer columns are resolved via the evaluator's outer_row/outer_schema
742 // 3. Merging would create schema/row mismatch (schema has outer cols, rows don't)
743
744 Ok(from_result)
745 }
746
747}