vibesql_executor/select/executor/
execute.rs

1//! Main execution methods for SelectExecutor
2//!
3//! This module implements the unified execution dispatcher that routes queries
4//! to the appropriate execution pipeline based on the selected strategy.
5//!
6//! ## Execution Pipeline Architecture
7//!
8//! The dispatcher uses the `ExecutionPipeline` trait to provide a unified interface
9//! for query execution across different strategies:
10//!
11//! - **NativeColumnar**: Zero-copy SIMD execution from columnar storage
12//! - **StandardColumnar**: SIMD execution with row-to-batch conversion
13//! - **RowOriented**: Traditional row-by-row execution
14//! - **ExpressionOnly**: SELECT without FROM clause (special case)
15//!
16//! ```text
17//! Strategy Selection → Create Pipeline → Execute Pipeline Stages → Results
18//!                          ↓
19//!          apply_filter → apply_projection → apply_aggregation → apply_limit_offset
20//! ```
21
22use std::collections::HashMap;
23
24use super::builder::SelectExecutor;
25use crate::{
26    errors::ExecutorError,
27    optimizer::adaptive::{choose_execution_strategy, ExecutionStrategy, StrategyContext},
28    pipeline::{
29        ColumnarPipeline, ExecutionContext, ExecutionPipeline, NativeColumnarPipeline,
30        PipelineInput,
31    },
32    select::{
33        cte::{execute_ctes, execute_ctes_with_memory_check, CteResult},
34        helpers::{apply_limit_offset, estimate_result_size},
35        join::FromResult,
36        set_operations::apply_set_operation,
37        SelectResult,
38    },
39};
40
41impl SelectExecutor<'_> {
42    /// Execute a SELECT statement
43    pub fn execute(
44        &self,
45        stmt: &vibesql_ast::SelectStmt,
46    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
47        #[cfg(feature = "profile-q6")]
48        let execute_start = std::time::Instant::now();
49
50        // Reset arena for fresh query execution (only at top level)
51        if self.subquery_depth == 0 {
52            self.reset_arena();
53        }
54
55        // Check timeout before starting execution
56        self.check_timeout()?;
57
58        // Check subquery depth limit to prevent stack overflow
59        if self.subquery_depth >= crate::limits::MAX_EXPRESSION_DEPTH {
60            return Err(ExecutorError::ExpressionDepthExceeded {
61                depth: self.subquery_depth,
62                max_depth: crate::limits::MAX_EXPRESSION_DEPTH,
63            });
64        }
65
66        // Fast path for simple point-lookup queries (TPC-C optimization)
67        // This bypasses expensive optimizer passes for queries like:
68        // SELECT col FROM table WHERE pk = value
69        if self.subquery_depth == 0
70            && self.outer_row.is_none()
71            && self.cte_context.is_none()
72            && super::fast_path::is_simple_point_query(stmt)
73        {
74            return self.execute_fast_path(stmt);
75        }
76
77        #[cfg(feature = "profile-q6")]
78        let _setup_time = execute_start.elapsed();
79
80        // Apply subquery rewriting optimizations (Phase 2 of IN subquery optimization)
81        // - Rewrites correlated IN → EXISTS with LIMIT 1 for early termination
82        // - Adds DISTINCT to uncorrelated IN subqueries to reduce duplicate processing
83        // This works in conjunction with Phase 1 (HashSet optimization, #2136)
84        #[cfg(feature = "profile-q6")]
85        let optimizer_start = std::time::Instant::now();
86
87        let optimized_stmt = crate::optimizer::rewrite_subquery_optimizations(stmt);
88
89        #[cfg(feature = "profile-q6")]
90        let _optimizer_time = optimizer_start.elapsed();
91
92        // Eliminate unused tables that create unnecessary cross joins (#3556)
93        // Must run BEFORE semi-join transformation to avoid complex interactions
94        // with derived tables from EXISTS/IN transformations
95        let optimized_stmt = crate::optimizer::eliminate_unused_tables(&optimized_stmt);
96
97        // Transform decorrelated IN/EXISTS subqueries to semi/anti-joins (#2424)
98        // This enables hash-based join execution instead of row-by-row subquery evaluation
99        // Converts WHERE clauses like "WHERE x IN (SELECT y FROM t)" to "SEMI JOIN t ON x = y"
100        let optimized_stmt = crate::optimizer::transform_subqueries_to_joins(&optimized_stmt);
101
102        // Execute CTEs if present and merge with outer query's CTE context
103        let mut cte_results = if let Some(with_clause) = &optimized_stmt.with_clause {
104            // This query has its own CTEs - execute them with memory tracking
105            execute_ctes_with_memory_check(
106                with_clause,
107                |query, cte_ctx| self.execute_with_ctes(query, cte_ctx),
108                |size| self.track_memory_allocation(size),
109            )?
110        } else {
111            HashMap::new()
112        };
113
114        // If we have access to outer query's CTEs (for subqueries), merge them in
115        // Local CTEs take precedence over outer CTEs if there are name conflicts
116        if let Some(outer_cte_ctx) = self.cte_context {
117            for (name, result) in outer_cte_ctx {
118                cte_results.entry(name.clone()).or_insert_with(|| result.clone());
119            }
120        }
121
122        #[cfg(feature = "profile-q6")]
123        let _pre_execute_time = execute_start.elapsed();
124
125        // Execute the main query with CTE context
126        let result = self.execute_with_ctes(&optimized_stmt, &cte_results)?;
127
128        #[cfg(feature = "profile-q6")]
129        {
130            let _total_execute = execute_start.elapsed();
131        }
132
133        Ok(result)
134    }
135
136    /// Execute a SELECT statement and return an iterator over results
137    ///
138    /// This enables early termination when the full result set is not needed,
139    /// such as for IN subqueries where we stop after finding the first match.
140    ///
141    /// # Phase 1 Implementation (Early Termination for IN subqueries)
142    ///
143    /// Current implementation materializes results then returns an iterator.
144    /// This still enables early termination in the consumer (e.g., eval_in_subquery)
145    /// by stopping iteration when a match is found.
146    ///
147    /// Future optimization: Leverage the existing RowIterator infrastructure
148    /// (crate::select::iterator) for truly lazy evaluation that stops execution
149    /// early, not just iteration.
150    pub fn execute_iter(
151        &self,
152        stmt: &vibesql_ast::SelectStmt,
153    ) -> Result<impl Iterator<Item = vibesql_storage::Row>, ExecutorError> {
154        // For Phase 1, materialize then return iterator
155        // This still enables early termination in the consumer
156        let rows = self.execute(stmt)?;
157        Ok(rows.into_iter())
158    }
159
160    /// Execute a SELECT statement and return both columns and rows
161    pub fn execute_with_columns(
162        &self,
163        stmt: &vibesql_ast::SelectStmt,
164    ) -> Result<SelectResult, ExecutorError> {
165        // First, get the FROM result to access the schema
166        let from_result = if let Some(from_clause) = &stmt.from {
167            let mut cte_results = if let Some(with_clause) = &stmt.with_clause {
168                execute_ctes(with_clause, |query, cte_ctx| self.execute_with_ctes(query, cte_ctx))?
169            } else {
170                HashMap::new()
171            };
172            // If we have access to outer query's CTEs (for subqueries/derived tables), merge them in
173            // Local CTEs take precedence over outer CTEs if there are name conflicts
174            // This is critical for queries like TPC-DS Q2 where CTEs are referenced from derived tables
175            if let Some(outer_cte_ctx) = self.cte_context {
176                for (name, result) in outer_cte_ctx {
177                    cte_results.entry(name.clone()).or_insert_with(|| result.clone());
178                }
179            }
180            // Pass WHERE, ORDER BY, and LIMIT for optimizations
181            // This is critical for GROUP BY queries to avoid CROSS JOINs
182            // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
183            // Pass select_list for table elimination optimization (#3556)
184            Some(self.execute_from_with_where(
185                from_clause,
186                &cte_results,
187                stmt.where_clause.as_ref(),
188                stmt.order_by.as_deref(),
189                stmt.limit,
190                Some(&stmt.select_list),
191            )?)
192        } else {
193            None
194        };
195
196        // Derive column names from the SELECT list
197        let columns = self.derive_column_names(&stmt.select_list, from_result.as_ref())?;
198
199        // Execute the query to get rows
200        let rows = self.execute(stmt)?;
201
202        Ok(SelectResult { columns, rows })
203    }
204
205    /// Execute SELECT statement with CTE context
206    ///
207    /// Uses unified strategy selection to determine the optimal execution path:
208    /// - NativeColumnar: Zero-copy SIMD execution from columnar storage
209    /// - StandardColumnar: SIMD execution with row-to-batch conversion
210    /// - RowOriented: Traditional row-by-row execution
211    /// - ExpressionOnly: SELECT without FROM clause (special case)
212    ///
213    /// ## Pipeline-Based Execution (Phase 5)
214    ///
215    /// This method uses the `ExecutionPipeline` trait to provide a unified interface
216    /// for query execution. Each strategy creates an appropriate pipeline that
217    /// implements filter, projection, aggregation, and limit/offset operations.
218    ///
219    /// ```text
220    /// Strategy Selection → Create Pipeline → Execute via Trait Methods
221    ///                              ↓
222    ///   NativeColumnar  → NativeColumnarPipeline::apply_*()
223    ///   StandardColumnar → ColumnarPipeline::apply_*()
224    ///   RowOriented     → RowOrientedPipeline::apply_*()
225    ///   ExpressionOnly  → Special case (no table scan)
226    /// ```
227    pub(super) fn execute_with_ctes(
228        &self,
229        stmt: &vibesql_ast::SelectStmt,
230        cte_results: &HashMap<String, CteResult>,
231    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
232        #[cfg(feature = "profile-q6")]
233        let _execute_ctes_start = std::time::Instant::now();
234
235        // Check if native columnar is enabled via feature flag or env var
236        let native_columnar_enabled =
237            cfg!(feature = "native-columnar") || std::env::var("VIBESQL_NATIVE_COLUMNAR").is_ok();
238
239        // Use unified strategy selection for the execution path
240        let strategy_ctx = StrategyContext::new(stmt, cte_results, native_columnar_enabled);
241        let strategy = choose_execution_strategy(&strategy_ctx);
242
243        log::debug!(
244            "Execution strategy selected: {} (reason: {})",
245            strategy.name(),
246            strategy.score().reason
247        );
248
249        #[cfg(feature = "profile-q6")]
250        eprintln!(
251            "[PROFILE-Q6] Execution strategy: {} ({})",
252            strategy.name(),
253            strategy.score().reason
254        );
255
256        // Dispatch based on selected strategy using ExecutionPipeline trait
257        // Pipeline execution returns Option<Vec<Row>> - None means fallback needed
258        let mut results = match strategy {
259            ExecutionStrategy::NativeColumnar { .. } => {
260                // First try the optimized zero-copy native columnar path
261                // This uses ColumnarBatch::from_storage_columnar() for zero-copy conversion
262                // and executes filter+aggregate in a single pass without row materialization
263                if let Some(result) = self.try_native_columnar_execution(stmt, cte_results)? {
264                    #[cfg(feature = "profile-q6")]
265                    eprintln!("[PROFILE-Q6] Native columnar: zero-copy path succeeded");
266                    result
267                } else {
268                    // Fall back to pipeline-based execution if zero-copy path is not applicable
269                    // (e.g., complex predicates, multiple tables, unsupported aggregates)
270                    log::debug!("Native columnar: zero-copy path not applicable, trying pipeline");
271                    match self.execute_via_pipeline(
272                        stmt,
273                        cte_results,
274                        NativeColumnarPipeline::new,
275                        "NativeColumnar",
276                    )? {
277                        Some(result) => result,
278                        None => {
279                            // Fall back to row-oriented if pipeline also fails
280                            log::debug!("Native columnar runtime fallback to row-oriented");
281                            #[cfg(feature = "profile-q6")]
282                            eprintln!("[PROFILE-Q6] Native columnar fallback to row-oriented");
283                            self.execute_row_oriented(stmt, cte_results)?
284                        }
285                    }
286                }
287            }
288
289            ExecutionStrategy::StandardColumnar { .. } => {
290                // StandardColumnar uses the pipeline-based execution path
291                // Note: We don't use try_native_columnar_execution here because row tables
292                // go through the pipeline which correctly handles all data types including dates.
293                // The native columnar zero-copy path has known limitations with certain date comparisons.
294                match self.execute_via_pipeline(
295                    stmt,
296                    cte_results,
297                    ColumnarPipeline::new,
298                    "StandardColumnar",
299                )? {
300                    Some(result) => result,
301                    None => {
302                        log::debug!("Standard columnar runtime fallback to row-oriented");
303                        #[cfg(feature = "profile-q6")]
304                        eprintln!("[PROFILE-Q6] Standard columnar fallback to row-oriented");
305                        self.execute_row_oriented(stmt, cte_results)?
306                    }
307                }
308            }
309
310            ExecutionStrategy::RowOriented { .. } => {
311                // Row-oriented uses the traditional path which has full feature support
312                // The RowOrientedPipeline is used for simpler queries, but complex
313                // queries (with JOINs, window functions, DISTINCT, etc.) need the
314                // full execute_row_oriented implementation
315
316                // Phase 4: Try columnar join execution for multi-table JOIN queries (#2943)
317                // This provides 3-5x speedup for TPC-H Q3 style queries
318                let has_joins = stmt
319                    .from
320                    .as_ref()
321                    .is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
322                if has_joins {
323                    if let Some(result) = self.try_columnar_join_execution(stmt, cte_results)? {
324                        log::info!("Columnar join execution succeeded");
325                        result
326                    } else {
327                        log::debug!(
328                            "Columnar join execution not applicable, falling back to row-oriented"
329                        );
330                        self.execute_row_oriented(stmt, cte_results)?
331                    }
332                } else {
333                    self.execute_row_oriented(stmt, cte_results)?
334                }
335            }
336
337            ExecutionStrategy::ExpressionOnly { .. } => {
338                // SELECT without FROM - special case that doesn't use pipelines
339                // May still have aggregates (e.g., SELECT COUNT(*), SELECT MAX(1))
340                // Note: Do NOT use early return here - we need to fall through to set operations handling
341                self.execute_expression_only(stmt, cte_results)?
342            }
343        };
344
345        // Handle set operations (UNION, INTERSECT, EXCEPT)
346        // Process operations left-to-right to ensure correct associativity
347        if let Some(set_op) = &stmt.set_operation {
348            results = self.execute_set_operations(results, set_op, cte_results)?;
349
350            // Apply LIMIT/OFFSET to the final result (after all set operations)
351            // For queries WITHOUT set operations, LIMIT/OFFSET is already applied
352            // in execute_without_aggregation() or execute_with_aggregation()
353            results = apply_limit_offset(results, stmt.limit, stmt.offset);
354        }
355
356        Ok(results)
357    }
358
359    /// Execute SELECT without FROM clause (ExpressionOnly strategy)
360    ///
361    /// This is a special case that doesn't use the pipeline trait since there's
362    /// no table scan involved. Handles both simple expressions and aggregates.
363    fn execute_expression_only(
364        &self,
365        stmt: &vibesql_ast::SelectStmt,
366        cte_results: &HashMap<String, CteResult>,
367    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
368        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
369
370        if has_aggregates {
371            // Aggregates without FROM need the aggregation path
372            self.execute_with_aggregation(stmt, cte_results)
373        } else {
374            // Simple expression evaluation (e.g., SELECT 1 + 1)
375            self.execute_select_without_from(stmt)
376        }
377    }
378
379    /// Execute a query using the specified execution pipeline
380    ///
381    /// This method provides a unified interface for pipeline-based execution.
382    /// It creates the pipeline, prepares input, and executes the pipeline stages.
383    ///
384    /// Returns `Ok(Some(results))` if the pipeline executed successfully,
385    /// `Ok(None)` if the pipeline cannot handle the query (fallback needed),
386    /// or `Err` if an error occurred.
387    ///
388    /// # Type Parameters
389    ///
390    /// * `P` - The pipeline type (must implement `ExecutionPipeline`)
391    /// * `F` - Factory function to create the pipeline
392    fn execute_via_pipeline<P, F>(
393        &self,
394        stmt: &vibesql_ast::SelectStmt,
395        cte_results: &HashMap<String, CteResult>,
396        create_pipeline: F,
397        strategy_name: &str,
398    ) -> Result<Option<Vec<vibesql_storage::Row>>, ExecutorError>
399    where
400        P: ExecutionPipeline,
401        F: FnOnce() -> P,
402    {
403        #[cfg(feature = "profile-q6")]
404        let start = std::time::Instant::now();
405
406        // Check query complexity - pipelines don't support all features
407        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
408        let has_group_by = stmt.group_by.is_some();
409        let has_joins =
410            stmt.from.as_ref().is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
411        let has_order_by = stmt.order_by.is_some();
412        let has_distinct = stmt.distinct;
413        let has_set_ops = stmt.set_operation.is_some();
414        let has_window_funcs = self.has_window_functions(&stmt.select_list);
415        let has_distinct_aggregates = self.has_distinct_aggregates(&stmt.select_list);
416
417        // Create the pipeline
418        let pipeline = create_pipeline();
419
420        // Check if the pipeline supports this query pattern
421        if !pipeline.supports_query_pattern(has_aggregates, has_group_by, has_joins) {
422            log::debug!(
423                "{} pipeline doesn't support query pattern (agg={}, group_by={}, joins={})",
424                strategy_name,
425                has_aggregates,
426                has_group_by,
427                has_joins
428            );
429            return Ok(None);
430        }
431
432        // For complex queries (ORDER BY, DISTINCT, window functions, set ops, DISTINCT aggregates),
433        // fall back to full execution paths which have complete support
434        if has_order_by
435            || has_distinct
436            || has_window_funcs
437            || has_set_ops
438            || has_distinct_aggregates
439        {
440            log::debug!(
441                "{} pipeline doesn't support complex features (order_by={}, distinct={}, window={}, set_ops={}, distinct_agg={})",
442                strategy_name,
443                has_order_by,
444                has_distinct,
445                has_window_funcs,
446                has_set_ops,
447                has_distinct_aggregates
448            );
449            return Ok(None);
450        }
451
452        // Must have a FROM clause for pipeline execution
453        let from_clause = match &stmt.from {
454            Some(from) => from,
455            None => return Ok(None),
456        };
457
458        // Execute FROM clause to get input data
459        // Note: WHERE, ORDER BY, and LIMIT are handled by the pipeline, not here
460        // Note: Table elimination requires WHERE clause, so pass None for select_list too
461        let from_result = self.execute_from_with_where(
462            from_clause,
463            cte_results,
464            None, // Pipeline will apply WHERE filter
465            None, // ORDER BY handled separately
466            None, // LIMIT applied after pipeline
467            None, // No table elimination when WHERE is deferred
468        )?;
469
470        // Build execution context
471        let mut exec_ctx = ExecutionContext::new(&from_result.schema, self.database);
472        // Add outer context for correlated subqueries (#2998)
473        if let (Some(outer_row), Some(outer_schema)) = (self.outer_row, self.outer_schema) {
474            exec_ctx = exec_ctx.with_outer_context(outer_row, outer_schema);
475        }
476        // Add CTE context if available
477        if !cte_results.is_empty() {
478            exec_ctx = exec_ctx.with_cte_context(cte_results);
479        }
480
481        // Validate column references BEFORE processing
482        super::validation::validate_select_columns_with_context(
483            &stmt.select_list,
484            stmt.where_clause.as_ref(),
485            &from_result.schema,
486            self.procedural_context,
487            self.outer_schema,
488        )?;
489
490        // Prepare input from FROM result
491        let input = PipelineInput::from_rows_owned(from_result.data.into_rows());
492
493        // Execute pipeline stages with fallback on error
494        // If any pipeline stage fails with UnsupportedFeature, fall back to row-oriented
495
496        // Stage 1: Filter (WHERE clause)
497        let filtered = match pipeline.apply_filter(input, stmt.where_clause.as_ref(), &exec_ctx) {
498            Ok(result) => result,
499            Err(ExecutorError::UnsupportedFeature(_))
500            | Err(ExecutorError::UnsupportedExpression(_)) => {
501                log::debug!("{} pipeline filter failed, falling back", strategy_name);
502                return Ok(None);
503            }
504            Err(e) => return Err(e),
505        };
506
507        // Stage 2: Projection or Aggregation
508        let result = if has_aggregates || has_group_by {
509            // Execute aggregation (includes projection)
510            // Get GROUP BY expressions if present (as slice)
511            let group_by_slice: Option<&[vibesql_ast::Expression]> =
512                stmt.group_by.as_ref().and_then(|g| g.as_simple()).map(|v| v.as_slice());
513            match pipeline.apply_aggregation(
514                filtered.into_input(),
515                &stmt.select_list,
516                group_by_slice,
517                stmt.having.as_ref(),
518                &exec_ctx,
519            ) {
520                Ok(result) => result,
521                Err(ExecutorError::UnsupportedFeature(_))
522                | Err(ExecutorError::UnsupportedExpression(_)) => {
523                    log::debug!("{} pipeline aggregation failed, falling back", strategy_name);
524                    return Ok(None);
525                }
526                Err(e) => return Err(e),
527            }
528        } else {
529            // Execute projection only
530            match pipeline.apply_projection(filtered.into_input(), &stmt.select_list, &exec_ctx) {
531                Ok(result) => result,
532                Err(ExecutorError::UnsupportedFeature(_))
533                | Err(ExecutorError::UnsupportedExpression(_)) => {
534                    log::debug!("{} pipeline projection failed, falling back", strategy_name);
535                    return Ok(None);
536                }
537                Err(e) => return Err(e),
538            }
539        };
540
541        // Stage 3: Limit/Offset (convert usize to u64)
542        let limit_u64 = stmt.limit.map(|l| l as u64);
543        let offset_u64 = stmt.offset.map(|o| o as u64);
544        let final_result = pipeline.apply_limit_offset(result, limit_u64, offset_u64)?;
545
546        #[cfg(feature = "profile-q6")]
547        {
548            eprintln!("[PROFILE-Q6] ✓ {} pipeline execution: {:?}", strategy_name, start.elapsed());
549        }
550
551        log::debug!("✓ {} pipeline execution succeeded", strategy_name);
552        Ok(Some(final_result))
553    }
554
555    /// Check if the select list contains window functions
556    fn has_window_functions(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
557        select_list.iter().any(|item| {
558            if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
559                self.expr_has_window_function(expr)
560            } else {
561                false
562            }
563        })
564    }
565
566    /// Recursively check if an expression contains a window function
567    #[allow(clippy::only_used_in_recursion)]
568    fn expr_has_window_function(&self, expr: &vibesql_ast::Expression) -> bool {
569        match expr {
570            vibesql_ast::Expression::WindowFunction { .. } => true,
571            vibesql_ast::Expression::BinaryOp { left, right, .. } => {
572                self.expr_has_window_function(left) || self.expr_has_window_function(right)
573            }
574            vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_window_function(expr),
575            vibesql_ast::Expression::Function { args, .. } => {
576                args.iter().any(|arg| self.expr_has_window_function(arg))
577            }
578            vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
579                operand.as_ref().is_some_and(|e| self.expr_has_window_function(e))
580                    || when_clauses.iter().any(|case_when| {
581                        case_when.conditions.iter().any(|c| self.expr_has_window_function(c))
582                            || self.expr_has_window_function(&case_when.result)
583                    })
584                    || else_result.as_ref().is_some_and(|e| self.expr_has_window_function(e))
585            }
586            _ => false,
587        }
588    }
589
590    /// Check if the select list contains any DISTINCT aggregates (e.g., COUNT(DISTINCT x))
591    fn has_distinct_aggregates(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
592        select_list.iter().any(|item| {
593            if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
594                self.expr_has_distinct_aggregate(expr)
595            } else {
596                false
597            }
598        })
599    }
600
601    /// Recursively check if an expression contains a DISTINCT aggregate
602    #[allow(clippy::only_used_in_recursion)]
603    fn expr_has_distinct_aggregate(&self, expr: &vibesql_ast::Expression) -> bool {
604        match expr {
605            vibesql_ast::Expression::AggregateFunction { distinct, .. } => *distinct,
606            vibesql_ast::Expression::BinaryOp { left, right, .. } => {
607                self.expr_has_distinct_aggregate(left) || self.expr_has_distinct_aggregate(right)
608            }
609            vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_distinct_aggregate(expr),
610            vibesql_ast::Expression::Function { args, .. } => {
611                args.iter().any(|arg| self.expr_has_distinct_aggregate(arg))
612            }
613            vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
614                operand.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
615                    || when_clauses.iter().any(|case_when| {
616                        case_when.conditions.iter().any(|c| self.expr_has_distinct_aggregate(c))
617                            || self.expr_has_distinct_aggregate(&case_when.result)
618                    })
619                    || else_result.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
620            }
621            _ => false,
622        }
623    }
624
625    /// Execute using traditional row-oriented path
626    ///
627    /// This is the fallback path when columnar execution is not available or not beneficial.
628    fn execute_row_oriented(
629        &self,
630        stmt: &vibesql_ast::SelectStmt,
631        cte_results: &HashMap<String, CteResult>,
632    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
633        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
634        let has_group_by = stmt.group_by.is_some();
635
636        if has_aggregates || has_group_by {
637            self.execute_with_aggregation(stmt, cte_results)
638        } else if let Some(from_clause) = &stmt.from {
639            // Re-enabled predicate pushdown for all queries (issue #1902)
640            //
641            // Previously, predicate pushdown was selectively disabled for multi-column IN clauses
642            // because index optimization happened in execute_without_aggregation() on row indices
643            // from the FROM result. When predicate pushdown filtered rows early, the indices no
644            // longer matched the original table, causing incorrect results.
645            //
646            // Now that all index optimization has been moved to the scan level (execute_index_scan),
647            // it happens BEFORE predicate pushdown, avoiding the row-index mismatch problem.
648            // This allows predicate pushdown to work correctly for all queries, improving performance.
649            //
650            // Fixes issues #1807, #1895, #1896, and #1902.
651
652            // Pass WHERE, ORDER BY, and LIMIT to execute_from for optimization
653            // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
654            // Pass select_list for table elimination optimization (#3556)
655            let from_result = self.execute_from_with_where(
656                from_clause,
657                cte_results,
658                stmt.where_clause.as_ref(),
659                stmt.order_by.as_deref(),
660                stmt.limit,
661                Some(&stmt.select_list),
662            )?;
663
664            // Validate column references BEFORE processing rows (issue #2654)
665            // This ensures column errors are caught even when tables are empty
666            // Pass procedural context to allow procedure variables in WHERE clause
667            // Pass outer_schema for correlated subqueries (#2694)
668            super::validation::validate_select_columns_with_context(
669                &stmt.select_list,
670                stmt.where_clause.as_ref(),
671                &from_result.schema,
672                self.procedural_context,
673                self.outer_schema,
674            )?;
675
676            self.execute_without_aggregation(stmt, from_result, cte_results)
677        } else {
678            // SELECT without FROM - evaluate expressions as a single row
679            self.execute_select_without_from(stmt)
680        }
681    }
682
683    /// Execute a chain of set operations left-to-right
684    ///
685    /// SQL set operations are left-associative, so:
686    /// A EXCEPT B EXCEPT C should evaluate as (A EXCEPT B) EXCEPT C
687    ///
688    /// The parser creates a right-recursive AST structure, but we need to execute left-to-right.
689    fn execute_set_operations(
690        &self,
691        mut left_results: Vec<vibesql_storage::Row>,
692        set_op: &vibesql_ast::SetOperation,
693        cte_results: &HashMap<String, CteResult>,
694    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
695        // Execute the immediate right query WITHOUT its set operations
696        // This prevents right-recursive evaluation
697        let right_stmt = &set_op.right;
698        let has_aggregates =
699            self.has_aggregates(&right_stmt.select_list) || right_stmt.having.is_some();
700        let has_group_by = right_stmt.group_by.is_some();
701
702        let right_results = if has_aggregates || has_group_by {
703            self.execute_with_aggregation(right_stmt, cte_results)?
704        } else if let Some(from_clause) = &right_stmt.from {
705            // Note: LIMIT is None for set operation sides - it's applied after the set operation
706            // Pass select_list for table elimination optimization (#3556)
707            let from_result = self.execute_from_with_where(
708                from_clause,
709                cte_results,
710                right_stmt.where_clause.as_ref(),
711                right_stmt.order_by.as_deref(),
712                None,
713                Some(&right_stmt.select_list),
714            )?;
715            self.execute_without_aggregation(right_stmt, from_result, cte_results)?
716        } else {
717            self.execute_select_without_from(right_stmt)?
718        };
719
720        // Track memory for right result before set operation
721        let right_size = estimate_result_size(&right_results);
722        self.track_memory_allocation(right_size)?;
723
724        // Apply the current operation
725        left_results = apply_set_operation(left_results, right_results, set_op)?;
726
727        // Track memory for combined result after set operation
728        let combined_size = estimate_result_size(&left_results);
729        self.track_memory_allocation(combined_size)?;
730
731        // If the right side has more set operations, continue processing them
732        // This creates the left-to-right evaluation: ((A op B) op C) op D
733        if let Some(next_set_op) = &right_stmt.set_operation {
734            left_results = self.execute_set_operations(left_results, next_set_op, cte_results)?;
735        }
736
737        Ok(left_results)
738    }
739
740    /// Execute a FROM clause with WHERE, ORDER BY, and LIMIT for optimization
741    ///
742    /// The LIMIT parameter enables early termination optimization (#3253):
743    /// - When ORDER BY is satisfied by an index and no post-filter is needed,
744    ///   the index scan can stop after fetching LIMIT rows
745    ///
746    /// Note: Table elimination (#3556) is now handled at the optimizer level
747    /// via crate::optimizer::eliminate_unused_tables(), which runs before
748    /// semi-join transformation to avoid complex interactions.
749    pub(super) fn execute_from_with_where(
750        &self,
751        from: &vibesql_ast::FromClause,
752        cte_results: &HashMap<String, CteResult>,
753        where_clause: Option<&vibesql_ast::Expression>,
754        order_by: Option<&[vibesql_ast::OrderByItem]>,
755        limit: Option<usize>,
756        _select_list: Option<&[vibesql_ast::SelectItem]>, // No longer used - optimization moved to optimizer pass
757    ) -> Result<FromResult, ExecutorError> {
758        use crate::select::scan::execute_from_clause;
759
760        let from_result = execute_from_clause(
761            from,
762            cte_results,
763            self.database,
764            where_clause,
765            order_by,
766            limit,
767            self.outer_row,
768            self.outer_schema,
769            |query| {
770                // For derived table subqueries, create a child executor with CTE context
771                // This allows CTEs from the outer WITH clause to be referenced in subqueries
772                // Critical for queries like TPC-DS Q2 where CTEs are used in FROM subqueries
773                if !cte_results.is_empty() {
774                    let child = SelectExecutor::new_with_cte_and_depth(
775                        self.database,
776                        cte_results,
777                        self.subquery_depth,
778                    );
779                    child.execute_with_columns(query)
780                } else {
781                    self.execute_with_columns(query)
782                }
783            },
784        )?;
785
786        // NOTE: We DON'T merge outer schema with from_result.schema here because:
787        // 1. from_result.rows only contain values from inner tables
788        // 2. Outer columns are resolved via the evaluator's outer_row/outer_schema
789        // 3. Merging would create schema/row mismatch (schema has outer cols, rows don't)
790
791        Ok(from_result)
792    }
793}