vibesql_executor/select/executor/execute.rs
//! Main execution methods for SelectExecutor
//!
//! This module implements the unified execution dispatcher that routes queries
//! to the appropriate execution pipeline based on the selected strategy.
//!
//! ## Execution Pipeline Architecture
//!
//! The dispatcher uses the `ExecutionPipeline` trait to provide a unified interface
//! for query execution across different strategies:
//!
//! - **NativeColumnar**: Zero-copy SIMD execution from columnar storage
//! - **StandardColumnar**: SIMD execution with row-to-batch conversion
//! - **RowOriented**: Traditional row-by-row execution
//! - **ExpressionOnly**: SELECT without FROM clause (special case)
//!
//! ```text
//! Strategy Selection → Create Pipeline → Execute Pipeline Stages → Results
//!                                                    ↓
//!         apply_filter → apply_projection → apply_aggregation → apply_limit_offset
//! ```
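//!
//! For example (illustrative only — the actual heuristics live in
//! `optimizer::adaptive::choose_execution_strategy`):
//!
//! ```text
//! SELECT SUM(price) FROM orders WHERE qty > 10   → columnar strategy (SIMD filter + aggregate)
//! SELECT * FROM a JOIN b ON a.id = b.id          → RowOriented strategy (full JOIN support)
//! SELECT 1 + 1                                   → ExpressionOnly (no table scan)
//! ```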

use std::collections::HashMap;

use super::builder::SelectExecutor;
use crate::{
    errors::ExecutorError,
    optimizer::adaptive::{choose_execution_strategy, ExecutionStrategy, StrategyContext},
    pipeline::{
        ColumnarPipeline, ExecutionContext, ExecutionPipeline, NativeColumnarPipeline,
        PipelineInput,
    },
    select::{
        cte::{execute_ctes, execute_ctes_with_memory_check, CteResult},
        helpers::{apply_limit_offset, estimate_result_size},
        join::FromResult,
        set_operations::apply_set_operation,
        SelectResult,
    },
};

impl SelectExecutor<'_> {
    /// Execute a SELECT statement
    pub fn execute(
        &self,
        stmt: &vibesql_ast::SelectStmt,
    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
        #[cfg(feature = "profile-q6")]
        let execute_start = std::time::Instant::now();

        // Reset arena for fresh query execution (only at top level)
        if self.subquery_depth == 0 {
            self.reset_arena();
        }

        // Check timeout before starting execution
        self.check_timeout()?;

        // Check subquery depth limit to prevent stack overflow
        if self.subquery_depth >= crate::limits::MAX_EXPRESSION_DEPTH {
            return Err(ExecutorError::ExpressionDepthExceeded {
                depth: self.subquery_depth,
                max_depth: crate::limits::MAX_EXPRESSION_DEPTH,
            });
        }

        // Fast path for simple point-lookup queries (TPC-C optimization)
        // This bypasses expensive optimizer passes for queries like:
        //   SELECT col FROM table WHERE pk = value
        if self.subquery_depth == 0
            && self.outer_row.is_none()
            && self.cte_context.is_none()
            && super::fast_path::is_simple_point_query(stmt)
        {
            return self.execute_fast_path(stmt);
        }

        #[cfg(feature = "profile-q6")]
        let _setup_time = execute_start.elapsed();

        // Apply subquery rewriting optimizations (Phase 2 of IN subquery optimization)
        // - Rewrites correlated IN → EXISTS with LIMIT 1 for early termination
        // - Adds DISTINCT to uncorrelated IN subqueries to reduce duplicate processing
        // This works in conjunction with Phase 1 (HashSet optimization, #2136)
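        //
        // Illustrative sketch of the correlated-IN rewrite described above (the table and
        // column names here are hypothetical, not taken from a real workload):
        //   WHERE o.id IN (SELECT l.order_id FROM lineitem l WHERE l.qty > o.qty)
        //   → WHERE EXISTS (SELECT 1 FROM lineitem l
        //                   WHERE l.qty > o.qty AND l.order_id = o.id LIMIT 1)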
        #[cfg(feature = "profile-q6")]
        let optimizer_start = std::time::Instant::now();

        let optimized_stmt = crate::optimizer::rewrite_subquery_optimizations(stmt);

        #[cfg(feature = "profile-q6")]
        let _optimizer_time = optimizer_start.elapsed();

        // Eliminate unused tables that create unnecessary cross joins (#3556)
        // Must run BEFORE semi-join transformation to avoid complex interactions
        // with derived tables from EXISTS/IN transformations
        let optimized_stmt = crate::optimizer::eliminate_unused_tables(&optimized_stmt);

        // Transform decorrelated IN/EXISTS subqueries to semi/anti-joins (#2424)
        // This enables hash-based join execution instead of row-by-row subquery evaluation
        // Converts WHERE clauses like "WHERE x IN (SELECT y FROM t)" to "SEMI JOIN t ON x = y"
        let optimized_stmt = crate::optimizer::transform_subqueries_to_joins(&optimized_stmt);

        // Execute CTEs if present and merge with outer query's CTE context
        let mut cte_results = if let Some(with_clause) = &optimized_stmt.with_clause {
            // This query has its own CTEs - execute them with memory tracking
            execute_ctes_with_memory_check(
                with_clause,
                |query, cte_ctx| self.execute_with_ctes(query, cte_ctx),
                |size| self.track_memory_allocation(size),
            )?
        } else {
            HashMap::new()
        };

        // If we have access to outer query's CTEs (for subqueries), merge them in
        // Local CTEs take precedence over outer CTEs if there are name conflicts
        if let Some(outer_cte_ctx) = self.cte_context {
            for (name, result) in outer_cte_ctx {
                cte_results.entry(name.clone()).or_insert_with(|| result.clone());
            }
        }

        #[cfg(feature = "profile-q6")]
        let _pre_execute_time = execute_start.elapsed();

        // Execute the main query with CTE context
        let result = self.execute_with_ctes(&optimized_stmt, &cte_results)?;

        #[cfg(feature = "profile-q6")]
        {
            let _total_execute = execute_start.elapsed();
        }

        Ok(result)
    }

    /// Execute a SELECT statement and return an iterator over results
    ///
    /// This enables early termination when the full result set is not needed,
    /// such as for IN subqueries where we stop after finding the first match.
    ///
    /// # Phase 1 Implementation (Early Termination for IN subqueries)
    ///
    /// Current implementation materializes results then returns an iterator.
    /// This still enables early termination in the consumer (e.g., eval_in_subquery)
    /// by stopping iteration when a match is found.
    ///
    /// Future optimization: Leverage the existing RowIterator infrastructure
    /// (crate::select::iterator) for truly lazy evaluation that stops execution
    /// early, not just iteration.
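    ///
    /// A minimal usage sketch (hypothetical caller code, not taken from the crate):
    ///
    /// ```text
    /// // Stop as soon as any row is produced, e.g. for an EXISTS-style check.
    /// let mut rows = executor.execute_iter(&stmt)?;
    /// let found = rows.next().is_some();
    /// ```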
    pub fn execute_iter(
        &self,
        stmt: &vibesql_ast::SelectStmt,
    ) -> Result<impl Iterator<Item = vibesql_storage::Row>, ExecutorError> {
        // For Phase 1, materialize then return iterator
        // This still enables early termination in the consumer
        let rows = self.execute(stmt)?;
        Ok(rows.into_iter())
    }

    /// Execute a SELECT statement and return both columns and rows
    pub fn execute_with_columns(
        &self,
        stmt: &vibesql_ast::SelectStmt,
    ) -> Result<SelectResult, ExecutorError> {
        // First, get the FROM result to access the schema
        let from_result = if let Some(from_clause) = &stmt.from {
            let mut cte_results = if let Some(with_clause) = &stmt.with_clause {
                execute_ctes(with_clause, |query, cte_ctx| self.execute_with_ctes(query, cte_ctx))?
            } else {
                HashMap::new()
            };
            // If we have access to outer query's CTEs (for subqueries/derived tables), merge them in
            // Local CTEs take precedence over outer CTEs if there are name conflicts
            // This is critical for queries like TPC-DS Q2 where CTEs are referenced from derived tables
            if let Some(outer_cte_ctx) = self.cte_context {
                for (name, result) in outer_cte_ctx {
                    cte_results.entry(name.clone()).or_insert_with(|| result.clone());
                }
            }
            // Pass WHERE, ORDER BY, and LIMIT for optimizations
            // This is critical for GROUP BY queries to avoid CROSS JOINs
            // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
            // Pass select_list for table elimination optimization (#3556)
            Some(self.execute_from_with_where(
                from_clause,
                &cte_results,
                stmt.where_clause.as_ref(),
                stmt.order_by.as_deref(),
                stmt.limit,
                Some(&stmt.select_list),
            )?)
        } else {
            None
        };

        // Derive column names from the SELECT list
        let columns = self.derive_column_names(&stmt.select_list, from_result.as_ref())?;

        // Execute the query to get rows
        let rows = self.execute(stmt)?;

        Ok(SelectResult { columns, rows })
    }

    /// Execute SELECT statement with CTE context
    ///
    /// Uses unified strategy selection to determine the optimal execution path:
    /// - NativeColumnar: Zero-copy SIMD execution from columnar storage
    /// - StandardColumnar: SIMD execution with row-to-batch conversion
    /// - RowOriented: Traditional row-by-row execution
    /// - ExpressionOnly: SELECT without FROM clause (special case)
    ///
    /// ## Pipeline-Based Execution (Phase 5)
    ///
    /// This method uses the `ExecutionPipeline` trait to provide a unified interface
    /// for query execution. Each strategy creates an appropriate pipeline that
    /// implements filter, projection, aggregation, and limit/offset operations.
    ///
    /// ```text
    /// Strategy Selection → Create Pipeline → Execute via Trait Methods
    ///                           ↓
    /// NativeColumnar   → NativeColumnarPipeline::apply_*()
    /// StandardColumnar → ColumnarPipeline::apply_*()
    /// RowOriented      → RowOrientedPipeline::apply_*()
    /// ExpressionOnly   → Special case (no table scan)
    /// ```
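    ///
    /// Columnar strategies fall back at runtime when a query uses features their
    /// pipeline cannot handle. A rough sketch of the fallback order (the match below
    /// is the authoritative logic):
    ///
    /// ```text
    /// NativeColumnar:   zero-copy path → NativeColumnarPipeline → row-oriented
    /// StandardColumnar: ColumnarPipeline → row-oriented
    /// ```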
    pub(super) fn execute_with_ctes(
        &self,
        stmt: &vibesql_ast::SelectStmt,
        cte_results: &HashMap<String, CteResult>,
    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
        #[cfg(feature = "profile-q6")]
        let _execute_ctes_start = std::time::Instant::now();

        // Check if native columnar is enabled via feature flag or env var
        let native_columnar_enabled =
            cfg!(feature = "native-columnar") || std::env::var("VIBESQL_NATIVE_COLUMNAR").is_ok();

        // Use unified strategy selection for the execution path
        let strategy_ctx = StrategyContext::new(stmt, cte_results, native_columnar_enabled);
        let strategy = choose_execution_strategy(&strategy_ctx);

        log::debug!(
            "Execution strategy selected: {} (reason: {})",
            strategy.name(),
            strategy.score().reason
        );

        #[cfg(feature = "profile-q6")]
        eprintln!(
            "[PROFILE-Q6] Execution strategy: {} ({})",
            strategy.name(),
            strategy.score().reason
        );

        // Dispatch based on selected strategy using ExecutionPipeline trait
        // Pipeline execution returns Option<Vec<Row>> - None means fallback needed
        let mut results = match strategy {
            ExecutionStrategy::NativeColumnar { .. } => {
                // First try the optimized zero-copy native columnar path
                // This uses ColumnarBatch::from_storage_columnar() for zero-copy conversion
                // and executes filter+aggregate in a single pass without row materialization
                if let Some(result) = self.try_native_columnar_execution(stmt, cte_results)? {
                    #[cfg(feature = "profile-q6")]
                    eprintln!("[PROFILE-Q6] Native columnar: zero-copy path succeeded");
                    result
                } else {
                    // Fall back to pipeline-based execution if zero-copy path is not applicable
                    // (e.g., complex predicates, multiple tables, unsupported aggregates)
                    log::debug!("Native columnar: zero-copy path not applicable, trying pipeline");
                    match self.execute_via_pipeline(
                        stmt,
                        cte_results,
                        NativeColumnarPipeline::new,
                        "NativeColumnar",
                    )? {
                        Some(result) => result,
                        None => {
                            // Fall back to row-oriented if pipeline also fails
                            log::debug!("Native columnar runtime fallback to row-oriented");
                            #[cfg(feature = "profile-q6")]
                            eprintln!("[PROFILE-Q6] Native columnar fallback to row-oriented");
                            self.execute_row_oriented(stmt, cte_results)?
                        }
                    }
                }
            }

            ExecutionStrategy::StandardColumnar { .. } => {
                // StandardColumnar uses the pipeline-based execution path
                // Note: We don't use try_native_columnar_execution here because row tables
                // go through the pipeline which correctly handles all data types including dates.
                // The native columnar zero-copy path has known limitations with certain date comparisons.
                match self.execute_via_pipeline(
                    stmt,
                    cte_results,
                    ColumnarPipeline::new,
                    "StandardColumnar",
                )? {
                    Some(result) => result,
                    None => {
                        log::debug!("Standard columnar runtime fallback to row-oriented");
                        #[cfg(feature = "profile-q6")]
                        eprintln!("[PROFILE-Q6] Standard columnar fallback to row-oriented");
                        self.execute_row_oriented(stmt, cte_results)?
                    }
                }
            }

            ExecutionStrategy::RowOriented { .. } => {
                // Row-oriented uses the traditional path which has full feature support
                // The RowOrientedPipeline is used for simpler queries, but complex
                // queries (with JOINs, window functions, DISTINCT, etc.) need the
                // full execute_row_oriented implementation

                // Phase 4: Try columnar join execution for multi-table JOIN queries (#2943)
                // This provides 3-5x speedup for TPC-H Q3 style queries
                let has_joins = stmt
                    .from
                    .as_ref()
                    .is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
                if has_joins {
                    if let Some(result) = self.try_columnar_join_execution(stmt, cte_results)? {
                        log::info!("Columnar join execution succeeded");
                        result
                    } else {
                        log::debug!(
                            "Columnar join execution not applicable, falling back to row-oriented"
                        );
                        self.execute_row_oriented(stmt, cte_results)?
                    }
                } else {
                    self.execute_row_oriented(stmt, cte_results)?
                }
            }

            ExecutionStrategy::ExpressionOnly { .. } => {
                // SELECT without FROM - special case that doesn't use pipelines
                // May still have aggregates (e.g., SELECT COUNT(*), SELECT MAX(1))
                // Note: Do NOT use early return here - we need to fall through to set operations handling
                self.execute_expression_only(stmt, cte_results)?
            }
        };

        // Handle set operations (UNION, INTERSECT, EXCEPT)
        // Process operations left-to-right to ensure correct associativity
        if let Some(set_op) = &stmt.set_operation {
            results = self.execute_set_operations(results, set_op, cte_results)?;

            // Apply LIMIT/OFFSET to the final result (after all set operations)
            // For queries WITHOUT set operations, LIMIT/OFFSET is already applied
            // in execute_without_aggregation() or execute_with_aggregation()
            results = apply_limit_offset(results, stmt.limit, stmt.offset);
        }

        Ok(results)
    }

    /// Execute SELECT without FROM clause (ExpressionOnly strategy)
    ///
    /// This is a special case that doesn't use the pipeline trait since there's
    /// no table scan involved. Handles both simple expressions and aggregates.
    fn execute_expression_only(
        &self,
        stmt: &vibesql_ast::SelectStmt,
        cte_results: &HashMap<String, CteResult>,
    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();

        if has_aggregates {
            // Aggregates without FROM need the aggregation path
            self.execute_with_aggregation(stmt, cte_results)
        } else {
            // Simple expression evaluation (e.g., SELECT 1 + 1)
            self.execute_select_without_from(stmt)
        }
    }

    /// Execute a query using the specified execution pipeline
    ///
    /// This method provides a unified interface for pipeline-based execution.
    /// It creates the pipeline, prepares input, and executes the pipeline stages.
    ///
    /// Returns `Ok(Some(results))` if the pipeline executed successfully,
    /// `Ok(None)` if the pipeline cannot handle the query (fallback needed),
    /// or `Err` if an error occurred.
    ///
    /// # Type Parameters
    ///
    /// * `P` - The pipeline type (must implement `ExecutionPipeline`)
    /// * `F` - Factory function to create the pipeline
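    ///
    /// A minimal sketch of how the dispatcher above uses this method (it mirrors the
    /// StandardColumnar arm; illustrative only):
    ///
    /// ```text
    /// match self.execute_via_pipeline(stmt, cte_results, ColumnarPipeline::new, "StandardColumnar")? {
    ///     Some(rows) => rows,                                    // pipeline handled the query
    ///     None => self.execute_row_oriented(stmt, cte_results)?, // fallback path
    /// }
    /// ```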
    fn execute_via_pipeline<P, F>(
        &self,
        stmt: &vibesql_ast::SelectStmt,
        cte_results: &HashMap<String, CteResult>,
        create_pipeline: F,
        strategy_name: &str,
    ) -> Result<Option<Vec<vibesql_storage::Row>>, ExecutorError>
    where
        P: ExecutionPipeline,
        F: FnOnce() -> P,
    {
        #[cfg(feature = "profile-q6")]
        let start = std::time::Instant::now();

        // Check query complexity - pipelines don't support all features
        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
        let has_group_by = stmt.group_by.is_some();
        let has_joins =
            stmt.from.as_ref().is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
        let has_order_by = stmt.order_by.is_some();
        let has_distinct = stmt.distinct;
        let has_set_ops = stmt.set_operation.is_some();
        let has_window_funcs = self.has_window_functions(&stmt.select_list);
        let has_distinct_aggregates = self.has_distinct_aggregates(&stmt.select_list);

        // Create the pipeline
        let pipeline = create_pipeline();

        // Check if the pipeline supports this query pattern
        if !pipeline.supports_query_pattern(has_aggregates, has_group_by, has_joins) {
            log::debug!(
                "{} pipeline doesn't support query pattern (agg={}, group_by={}, joins={})",
                strategy_name,
                has_aggregates,
                has_group_by,
                has_joins
            );
            return Ok(None);
        }

        // For complex queries (ORDER BY, DISTINCT, window functions, set ops, DISTINCT aggregates),
        // fall back to full execution paths which have complete support
        if has_order_by
            || has_distinct
            || has_window_funcs
            || has_set_ops
            || has_distinct_aggregates
        {
            log::debug!(
                "{} pipeline doesn't support complex features (order_by={}, distinct={}, window={}, set_ops={}, distinct_agg={})",
                strategy_name,
                has_order_by,
                has_distinct,
                has_window_funcs,
                has_set_ops,
                has_distinct_aggregates
            );
            return Ok(None);
        }

        // Must have a FROM clause for pipeline execution
        let from_clause = match &stmt.from {
            Some(from) => from,
            None => return Ok(None),
        };

        // Execute FROM clause to get input data
        // Note: WHERE, ORDER BY, and LIMIT are handled by the pipeline, not here
        // Note: Table elimination requires WHERE clause, so pass None for select_list too
        let from_result = self.execute_from_with_where(
            from_clause,
            cte_results,
            None, // Pipeline will apply WHERE filter
            None, // ORDER BY handled separately
            None, // LIMIT applied after pipeline
            None, // No table elimination when WHERE is deferred
        )?;

        // Build execution context
        let mut exec_ctx = ExecutionContext::new(&from_result.schema, self.database);
        // Add outer context for correlated subqueries (#2998)
        if let (Some(outer_row), Some(outer_schema)) = (self.outer_row, self.outer_schema) {
            exec_ctx = exec_ctx.with_outer_context(outer_row, outer_schema);
        }
        // Add CTE context if available
        if !cte_results.is_empty() {
            exec_ctx = exec_ctx.with_cte_context(cte_results);
        }

        // Validate column references BEFORE processing
        super::validation::validate_select_columns_with_context(
            &stmt.select_list,
            stmt.where_clause.as_ref(),
            &from_result.schema,
            self.procedural_context,
            self.outer_schema,
        )?;

        // Prepare input from FROM result
        let input = PipelineInput::from_rows_owned(from_result.data.into_rows());

        // Execute pipeline stages with fallback on error
        // If any pipeline stage fails with UnsupportedFeature, fall back to row-oriented

        // Stage 1: Filter (WHERE clause)
        let filtered = match pipeline.apply_filter(input, stmt.where_clause.as_ref(), &exec_ctx) {
            Ok(result) => result,
            Err(ExecutorError::UnsupportedFeature(_))
            | Err(ExecutorError::UnsupportedExpression(_)) => {
                log::debug!("{} pipeline filter failed, falling back", strategy_name);
                return Ok(None);
            }
            Err(e) => return Err(e),
        };

        // Stage 2: Projection or Aggregation
        let result = if has_aggregates || has_group_by {
            // Execute aggregation (includes projection)
            // Get GROUP BY expressions if present (as slice)
            let group_by_slice: Option<&[vibesql_ast::Expression]> =
                stmt.group_by.as_ref().and_then(|g| g.as_simple()).map(|v| v.as_slice());
            match pipeline.apply_aggregation(
                filtered.into_input(),
                &stmt.select_list,
                group_by_slice,
                stmt.having.as_ref(),
                &exec_ctx,
            ) {
                Ok(result) => result,
                Err(ExecutorError::UnsupportedFeature(_))
                | Err(ExecutorError::UnsupportedExpression(_)) => {
                    log::debug!("{} pipeline aggregation failed, falling back", strategy_name);
                    return Ok(None);
                }
                Err(e) => return Err(e),
            }
        } else {
            // Execute projection only
            match pipeline.apply_projection(filtered.into_input(), &stmt.select_list, &exec_ctx) {
                Ok(result) => result,
                Err(ExecutorError::UnsupportedFeature(_))
                | Err(ExecutorError::UnsupportedExpression(_)) => {
                    log::debug!("{} pipeline projection failed, falling back", strategy_name);
                    return Ok(None);
                }
                Err(e) => return Err(e),
            }
        };

        // Stage 3: Limit/Offset (convert usize to u64)
        let limit_u64 = stmt.limit.map(|l| l as u64);
        let offset_u64 = stmt.offset.map(|o| o as u64);
        let final_result = pipeline.apply_limit_offset(result, limit_u64, offset_u64)?;

        #[cfg(feature = "profile-q6")]
        {
            eprintln!("[PROFILE-Q6] ✓ {} pipeline execution: {:?}", strategy_name, start.elapsed());
        }

        log::debug!("✓ {} pipeline execution succeeded", strategy_name);
        Ok(Some(final_result))
    }

    /// Check if the select list contains window functions
    fn has_window_functions(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
        select_list.iter().any(|item| {
            if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
                self.expr_has_window_function(expr)
            } else {
                false
            }
        })
    }

    /// Recursively check if an expression contains a window function
    #[allow(clippy::only_used_in_recursion)]
    fn expr_has_window_function(&self, expr: &vibesql_ast::Expression) -> bool {
        match expr {
            vibesql_ast::Expression::WindowFunction { .. } => true,
            vibesql_ast::Expression::BinaryOp { left, right, .. } => {
                self.expr_has_window_function(left) || self.expr_has_window_function(right)
            }
            vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_window_function(expr),
            vibesql_ast::Expression::Function { args, .. } => {
                args.iter().any(|arg| self.expr_has_window_function(arg))
            }
            vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
                operand.as_ref().is_some_and(|e| self.expr_has_window_function(e))
                    || when_clauses.iter().any(|case_when| {
                        case_when.conditions.iter().any(|c| self.expr_has_window_function(c))
                            || self.expr_has_window_function(&case_when.result)
                    })
                    || else_result.as_ref().is_some_and(|e| self.expr_has_window_function(e))
            }
            _ => false,
        }
    }

    /// Check if the select list contains any DISTINCT aggregates (e.g., COUNT(DISTINCT x))
    fn has_distinct_aggregates(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
        select_list.iter().any(|item| {
            if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
                self.expr_has_distinct_aggregate(expr)
            } else {
                false
            }
        })
    }

    /// Recursively check if an expression contains a DISTINCT aggregate
    #[allow(clippy::only_used_in_recursion)]
    fn expr_has_distinct_aggregate(&self, expr: &vibesql_ast::Expression) -> bool {
        match expr {
            vibesql_ast::Expression::AggregateFunction { distinct, .. } => *distinct,
            vibesql_ast::Expression::BinaryOp { left, right, .. } => {
                self.expr_has_distinct_aggregate(left) || self.expr_has_distinct_aggregate(right)
            }
            vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_distinct_aggregate(expr),
            vibesql_ast::Expression::Function { args, .. } => {
                args.iter().any(|arg| self.expr_has_distinct_aggregate(arg))
            }
            vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
                operand.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
                    || when_clauses.iter().any(|case_when| {
                        case_when.conditions.iter().any(|c| self.expr_has_distinct_aggregate(c))
                            || self.expr_has_distinct_aggregate(&case_when.result)
                    })
                    || else_result.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
            }
            _ => false,
        }
    }

    /// Execute using traditional row-oriented path
    ///
    /// This is the fallback path when columnar execution is not available or not beneficial.
    fn execute_row_oriented(
        &self,
        stmt: &vibesql_ast::SelectStmt,
        cte_results: &HashMap<String, CteResult>,
    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
        let has_group_by = stmt.group_by.is_some();

        if has_aggregates || has_group_by {
            self.execute_with_aggregation(stmt, cte_results)
        } else if let Some(from_clause) = &stmt.from {
            // Re-enabled predicate pushdown for all queries (issue #1902)
            //
            // Previously, predicate pushdown was selectively disabled for multi-column IN clauses
            // because index optimization happened in execute_without_aggregation() on row indices
            // from the FROM result. When predicate pushdown filtered rows early, the indices no
            // longer matched the original table, causing incorrect results.
            //
            // Now that all index optimization has been moved to the scan level (execute_index_scan),
            // it happens BEFORE predicate pushdown, avoiding the row-index mismatch problem.
            // This allows predicate pushdown to work correctly for all queries, improving performance.
            //
            // Fixes issues #1807, #1895, #1896, and #1902.

            // Pass WHERE, ORDER BY, and LIMIT to execute_from for optimization
            // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
            // Pass select_list for table elimination optimization (#3556)
            let from_result = self.execute_from_with_where(
                from_clause,
                cte_results,
                stmt.where_clause.as_ref(),
                stmt.order_by.as_deref(),
                stmt.limit,
                Some(&stmt.select_list),
            )?;

            // Validate column references BEFORE processing rows (issue #2654)
            // This ensures column errors are caught even when tables are empty
            // Pass procedural context to allow procedure variables in WHERE clause
            // Pass outer_schema for correlated subqueries (#2694)
            super::validation::validate_select_columns_with_context(
                &stmt.select_list,
                stmt.where_clause.as_ref(),
                &from_result.schema,
                self.procedural_context,
                self.outer_schema,
            )?;

            self.execute_without_aggregation(stmt, from_result, cte_results)
        } else {
            // SELECT without FROM - evaluate expressions as a single row
            self.execute_select_without_from(stmt)
        }
    }

    /// Execute a chain of set operations left-to-right
    ///
    /// SQL set operations are left-associative, so:
    /// A EXCEPT B EXCEPT C should evaluate as (A EXCEPT B) EXCEPT C
    ///
    /// The parser creates a right-recursive AST structure, but we need to execute left-to-right.
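    ///
    /// Worked example (illustrative): with A = {1, 2, 3}, B = {2}, C = {3},
    /// left-to-right evaluation gives (A EXCEPT B) = {1, 3}, then {1, 3} EXCEPT C = {1}.
    /// Right-associative evaluation, A EXCEPT (B EXCEPT C) = A EXCEPT {2} = {1, 3}, would be wrong.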
    fn execute_set_operations(
        &self,
        mut left_results: Vec<vibesql_storage::Row>,
        set_op: &vibesql_ast::SetOperation,
        cte_results: &HashMap<String, CteResult>,
    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
        // Execute the immediate right query WITHOUT its set operations
        // This prevents right-recursive evaluation
        let right_stmt = &set_op.right;
        let has_aggregates =
            self.has_aggregates(&right_stmt.select_list) || right_stmt.having.is_some();
        let has_group_by = right_stmt.group_by.is_some();

        let right_results = if has_aggregates || has_group_by {
            self.execute_with_aggregation(right_stmt, cte_results)?
        } else if let Some(from_clause) = &right_stmt.from {
            // Note: LIMIT is None for set operation sides - it's applied after the set operation
            // Pass select_list for table elimination optimization (#3556)
            let from_result = self.execute_from_with_where(
                from_clause,
                cte_results,
                right_stmt.where_clause.as_ref(),
                right_stmt.order_by.as_deref(),
                None,
                Some(&right_stmt.select_list),
            )?;
            self.execute_without_aggregation(right_stmt, from_result, cte_results)?
        } else {
            self.execute_select_without_from(right_stmt)?
        };

        // Track memory for right result before set operation
        let right_size = estimate_result_size(&right_results);
        self.track_memory_allocation(right_size)?;

        // Apply the current operation
        left_results = apply_set_operation(left_results, right_results, set_op)?;

        // Track memory for combined result after set operation
        let combined_size = estimate_result_size(&left_results);
        self.track_memory_allocation(combined_size)?;

        // If the right side has more set operations, continue processing them
        // This creates the left-to-right evaluation: ((A op B) op C) op D
        if let Some(next_set_op) = &right_stmt.set_operation {
            left_results = self.execute_set_operations(left_results, next_set_op, cte_results)?;
        }

        Ok(left_results)
    }

    /// Execute a FROM clause with WHERE, ORDER BY, and LIMIT for optimization
    ///
    /// The LIMIT parameter enables early termination optimization (#3253):
    /// - When ORDER BY is satisfied by an index and no post-filter is needed,
    ///   the index scan can stop after fetching LIMIT rows
    ///
    /// Note: Table elimination (#3556) is now handled at the optimizer level
    /// via crate::optimizer::eliminate_unused_tables(), which runs before
    /// semi-join transformation to avoid complex interactions.
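    ///
    /// For example (illustrative, with a hypothetical table and index):
    /// `SELECT * FROM t ORDER BY indexed_col LIMIT 10` can return after the index scan
    /// yields 10 rows instead of scanning the whole table.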
    pub(super) fn execute_from_with_where(
        &self,
        from: &vibesql_ast::FromClause,
        cte_results: &HashMap<String, CteResult>,
        where_clause: Option<&vibesql_ast::Expression>,
        order_by: Option<&[vibesql_ast::OrderByItem]>,
        limit: Option<usize>,
        _select_list: Option<&[vibesql_ast::SelectItem]>, // No longer used - optimization moved to optimizer pass
    ) -> Result<FromResult, ExecutorError> {
        use crate::select::scan::execute_from_clause;

        let from_result = execute_from_clause(
            from,
            cte_results,
            self.database,
            where_clause,
            order_by,
            limit,
            self.outer_row,
            self.outer_schema,
            |query| {
                // For derived table subqueries, create a child executor with CTE context
                // This allows CTEs from the outer WITH clause to be referenced in subqueries
                // Critical for queries like TPC-DS Q2 where CTEs are used in FROM subqueries
                if !cte_results.is_empty() {
                    let child = SelectExecutor::new_with_cte_and_depth(
                        self.database,
                        cte_results,
                        self.subquery_depth,
                    );
                    child.execute_with_columns(query)
                } else {
                    self.execute_with_columns(query)
                }
            },
        )?;

        // NOTE: We DON'T merge outer schema with from_result.schema here because:
        // 1. from_result.rows only contain values from inner tables
        // 2. Outer columns are resolved via the evaluator's outer_row/outer_schema
        // 3. Merging would create schema/row mismatch (schema has outer cols, rows don't)

        Ok(from_result)
    }
}
793}