vibesql_executor/select/executor/
execute.rs

1//! Main execution methods for SelectExecutor
2//!
3//! This module implements the unified execution dispatcher that routes queries
4//! to the appropriate execution pipeline based on the selected strategy.
5//!
6//! ## Execution Pipeline Architecture
7//!
8//! The dispatcher uses the `ExecutionPipeline` trait to provide a unified interface
9//! for query execution across different strategies:
10//!
11//! - **NativeColumnar**: Zero-copy SIMD execution from columnar storage
12//! - **StandardColumnar**: SIMD execution with row-to-batch conversion
13//! - **RowOriented**: Traditional row-by-row execution
14//! - **ExpressionOnly**: SELECT without FROM clause (special case)
15//!
16//! ```text
17//! Strategy Selection → Create Pipeline → Execute Pipeline Stages → Results
18//!                          ↓
19//!          apply_filter → apply_projection → apply_aggregation → apply_limit_offset
20//! ```
21
22use std::collections::HashMap;
23
24use super::builder::SelectExecutor;
25use crate::{
26    errors::ExecutorError,
27    optimizer::adaptive::{
28        choose_execution_strategy, ExecutionStrategy, StrategyContext,
29    },
30    pipeline::{
31        ColumnarPipeline, ExecutionContext, ExecutionPipeline, NativeColumnarPipeline,
32        PipelineInput,
33    },
34    select::{
35        cte::{execute_ctes, execute_ctes_with_memory_check, CteResult},
36        helpers::{apply_limit_offset, estimate_result_size},
37        join::FromResult,
38        set_operations::apply_set_operation,
39        SelectResult,
40    },
41};
42
43impl SelectExecutor<'_> {
44    /// Execute a SELECT statement
45    pub fn execute(&self, stmt: &vibesql_ast::SelectStmt) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
46        #[cfg(feature = "profile-q6")]
47        let execute_start = std::time::Instant::now();
48
49        // Reset arena for fresh query execution (only at top level)
50        if self.subquery_depth == 0 {
51            self.reset_arena();
52        }
53
54        // Check timeout before starting execution
55        self.check_timeout()?;
56
57        // Check subquery depth limit to prevent stack overflow
58        if self.subquery_depth >= crate::limits::MAX_EXPRESSION_DEPTH {
59            return Err(ExecutorError::ExpressionDepthExceeded {
60                depth: self.subquery_depth,
61                max_depth: crate::limits::MAX_EXPRESSION_DEPTH,
62            });
63        }
64
65        // Fast path for simple point-lookup queries (TPC-C optimization)
66        // This bypasses expensive optimizer passes for queries like:
67        // SELECT col FROM table WHERE pk = value
68        if self.subquery_depth == 0
69            && self.outer_row.is_none()
70            && self.cte_context.is_none()
71            && super::fast_path::is_simple_point_query(stmt)
72        {
73            return self.execute_fast_path(stmt);
74        }
75
76        #[cfg(feature = "profile-q6")]
77        let _setup_time = execute_start.elapsed();
78
79        // Apply subquery rewriting optimizations (Phase 2 of IN subquery optimization)
80        // - Rewrites correlated IN → EXISTS with LIMIT 1 for early termination
81        // - Adds DISTINCT to uncorrelated IN subqueries to reduce duplicate processing
82        // This works in conjunction with Phase 1 (HashSet optimization, #2136)
83        #[cfg(feature = "profile-q6")]
84        let optimizer_start = std::time::Instant::now();
85
86        let optimized_stmt = crate::optimizer::rewrite_subquery_optimizations(stmt);
87
88        #[cfg(feature = "profile-q6")]
89        let _optimizer_time = optimizer_start.elapsed();
90
91        // Transform decorrelated IN/EXISTS subqueries to semi/anti-joins (#2424)
92        // This enables hash-based join execution instead of row-by-row subquery evaluation
93        // Converts WHERE clauses like "WHERE x IN (SELECT y FROM t)" to "SEMI JOIN t ON x = y"
94        let optimized_stmt = crate::optimizer::transform_subqueries_to_joins(&optimized_stmt);
95
96        // Execute CTEs if present and merge with outer query's CTE context
97        let mut cte_results = if let Some(with_clause) = &optimized_stmt.with_clause {
98            // This query has its own CTEs - execute them with memory tracking
99            execute_ctes_with_memory_check(
100                with_clause,
101                |query, cte_ctx| self.execute_with_ctes(query, cte_ctx),
102                |size| self.track_memory_allocation(size),
103            )?
104        } else {
105            HashMap::new()
106        };
107
108        // If we have access to outer query's CTEs (for subqueries), merge them in
109        // Local CTEs take precedence over outer CTEs if there are name conflicts
110        if let Some(outer_cte_ctx) = self.cte_context {
111            for (name, result) in outer_cte_ctx {
112                cte_results.entry(name.clone()).or_insert_with(|| result.clone());
113            }
114        }
115
116        #[cfg(feature = "profile-q6")]
117        let _pre_execute_time = execute_start.elapsed();
118
119        // Execute the main query with CTE context
120        let result = self.execute_with_ctes(&optimized_stmt, &cte_results)?;
121
122        #[cfg(feature = "profile-q6")]
123        {
124            let _total_execute = execute_start.elapsed();
125        }
126
127        Ok(result)
128    }
129
130    /// Execute a SELECT statement and return an iterator over results
131    ///
132    /// This enables early termination when the full result set is not needed,
133    /// such as for IN subqueries where we stop after finding the first match.
134    ///
135    /// # Phase 1 Implementation (Early Termination for IN subqueries)
136    ///
137    /// Current implementation materializes results then returns an iterator.
138    /// This still enables early termination in the consumer (e.g., eval_in_subquery)
139    /// by stopping iteration when a match is found.
140    ///
141    /// Future optimization: Leverage the existing RowIterator infrastructure
142    /// (crate::select::iterator) for truly lazy evaluation that stops execution
143    /// early, not just iteration.
144    pub fn execute_iter(
145        &self,
146        stmt: &vibesql_ast::SelectStmt,
147    ) -> Result<impl Iterator<Item = vibesql_storage::Row>, ExecutorError> {
148        // For Phase 1, materialize then return iterator
149        // This still enables early termination in the consumer
150        let rows = self.execute(stmt)?;
151        Ok(rows.into_iter())
152    }
153
154    /// Execute a SELECT statement and return both columns and rows
155    pub fn execute_with_columns(
156        &self,
157        stmt: &vibesql_ast::SelectStmt,
158    ) -> Result<SelectResult, ExecutorError> {
159        // First, get the FROM result to access the schema
160        let from_result = if let Some(from_clause) = &stmt.from {
161            let mut cte_results = if let Some(with_clause) = &stmt.with_clause {
162                execute_ctes(with_clause, |query, cte_ctx| self.execute_with_ctes(query, cte_ctx))?
163            } else {
164                HashMap::new()
165            };
166            // If we have access to outer query's CTEs (for subqueries/derived tables), merge them in
167            // Local CTEs take precedence over outer CTEs if there are name conflicts
168            // This is critical for queries like TPC-DS Q2 where CTEs are referenced from derived tables
169            if let Some(outer_cte_ctx) = self.cte_context {
170                for (name, result) in outer_cte_ctx {
171                    cte_results.entry(name.clone()).or_insert_with(|| result.clone());
172                }
173            }
174            // Pass WHERE, ORDER BY, and LIMIT for optimizations
175            // This is critical for GROUP BY queries to avoid CROSS JOINs
176            // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
177            Some(self.execute_from_with_where(
178                from_clause,
179                &cte_results,
180                stmt.where_clause.as_ref(),
181                stmt.order_by.as_deref(),
182                stmt.limit,
183            )?)
184        } else {
185            None
186        };
187
188        // Derive column names from the SELECT list
189        let columns = self.derive_column_names(&stmt.select_list, from_result.as_ref())?;
190
191        // Execute the query to get rows
192        let rows = self.execute(stmt)?;
193
194        Ok(SelectResult { columns, rows })
195    }
196
197    /// Execute SELECT statement with CTE context
198    ///
199    /// Uses unified strategy selection to determine the optimal execution path:
200    /// - NativeColumnar: Zero-copy SIMD execution from columnar storage
201    /// - StandardColumnar: SIMD execution with row-to-batch conversion
202    /// - RowOriented: Traditional row-by-row execution
203    /// - ExpressionOnly: SELECT without FROM clause (special case)
204    ///
205    /// ## Pipeline-Based Execution (Phase 5)
206    ///
207    /// This method uses the `ExecutionPipeline` trait to provide a unified interface
208    /// for query execution. Each strategy creates an appropriate pipeline that
209    /// implements filter, projection, aggregation, and limit/offset operations.
210    ///
211    /// ```text
212    /// Strategy Selection → Create Pipeline → Execute via Trait Methods
213    ///                              ↓
214    ///   NativeColumnar  → NativeColumnarPipeline::apply_*()
215    ///   StandardColumnar → ColumnarPipeline::apply_*()
216    ///   RowOriented     → RowOrientedPipeline::apply_*()
217    ///   ExpressionOnly  → Special case (no table scan)
218    /// ```
219    pub(super) fn execute_with_ctes(
220        &self,
221        stmt: &vibesql_ast::SelectStmt,
222        cte_results: &HashMap<String, CteResult>,
223    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
224        #[cfg(feature = "profile-q6")]
225        let _execute_ctes_start = std::time::Instant::now();
226
227        // Check if native columnar is enabled via feature flag or env var
228        let native_columnar_enabled =
229            cfg!(feature = "native-columnar") || std::env::var("VIBESQL_NATIVE_COLUMNAR").is_ok();
230
231        // Use unified strategy selection for the execution path
232        let strategy_ctx = StrategyContext::new(stmt, cte_results, native_columnar_enabled);
233        let strategy = choose_execution_strategy(&strategy_ctx);
234
235        log::debug!(
236            "Execution strategy selected: {} (reason: {})",
237            strategy.name(),
238            strategy.score().reason
239        );
240
241        #[cfg(feature = "profile-q6")]
242        eprintln!(
243            "[PROFILE-Q6] Execution strategy: {} ({})",
244            strategy.name(),
245            strategy.score().reason
246        );
247
248        // Dispatch based on selected strategy using ExecutionPipeline trait
249        // Pipeline execution returns Option<Vec<Row>> - None means fallback needed
250        let mut results = match strategy {
251            ExecutionStrategy::NativeColumnar { .. } => {
252                // First try the optimized zero-copy native columnar path
253                // This uses ColumnarBatch::from_storage_columnar() for zero-copy conversion
254                // and executes filter+aggregate in a single pass without row materialization
255                if let Some(result) = self.try_native_columnar_execution(stmt, cte_results)? {
256                    #[cfg(feature = "profile-q6")]
257                    eprintln!("[PROFILE-Q6] Native columnar: zero-copy path succeeded");
258                    result
259                } else {
260                    // Fall back to pipeline-based execution if zero-copy path is not applicable
261                    // (e.g., complex predicates, multiple tables, unsupported aggregates)
262                    log::debug!("Native columnar: zero-copy path not applicable, trying pipeline");
263                    match self.execute_via_pipeline(
264                        stmt,
265                        cte_results,
266                        NativeColumnarPipeline::new,
267                        "NativeColumnar",
268                    )? {
269                        Some(result) => result,
270                        None => {
271                            // Fall back to row-oriented if pipeline also fails
272                            log::debug!("Native columnar runtime fallback to row-oriented");
273                            #[cfg(feature = "profile-q6")]
274                            eprintln!("[PROFILE-Q6] Native columnar fallback to row-oriented");
275                            self.execute_row_oriented(stmt, cte_results)?
276                        }
277                    }
278                }
279            }
280
281            ExecutionStrategy::StandardColumnar { .. } => {
282                // StandardColumnar uses the pipeline-based execution path
283                // Note: We don't use try_native_columnar_execution here because row tables
284                // go through the pipeline which correctly handles all data types including dates.
285                // The native columnar zero-copy path has known limitations with certain date comparisons.
286                match self.execute_via_pipeline(
287                    stmt,
288                    cte_results,
289                    ColumnarPipeline::new,
290                    "StandardColumnar",
291                )? {
292                    Some(result) => result,
293                    None => {
294                        log::debug!("Standard columnar runtime fallback to row-oriented");
295                        #[cfg(feature = "profile-q6")]
296                        eprintln!("[PROFILE-Q6] Standard columnar fallback to row-oriented");
297                        self.execute_row_oriented(stmt, cte_results)?
298                    }
299                }
300            }
301
302            ExecutionStrategy::RowOriented { .. } => {
303                // Row-oriented uses the traditional path which has full feature support
304                // The RowOrientedPipeline is used for simpler queries, but complex
305                // queries (with JOINs, window functions, DISTINCT, etc.) need the
306                // full execute_row_oriented implementation
307
308                // Phase 4: Try columnar join execution for multi-table JOIN queries (#2943)
309                // This provides 3-5x speedup for TPC-H Q3 style queries
310                let has_joins = stmt.from.as_ref().is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
311                if has_joins {
312                    if let Some(result) = self.try_columnar_join_execution(stmt, cte_results)? {
313                        log::info!("Columnar join execution succeeded");
314                        result
315                    } else {
316                        log::debug!("Columnar join execution not applicable, falling back to row-oriented");
317                        self.execute_row_oriented(stmt, cte_results)?
318                    }
319                } else {
320                    self.execute_row_oriented(stmt, cte_results)?
321                }
322            }
323
324            ExecutionStrategy::ExpressionOnly { .. } => {
325                // SELECT without FROM - special case that doesn't use pipelines
326                // May still have aggregates (e.g., SELECT COUNT(*), SELECT MAX(1))
327                return self.execute_expression_only(stmt, cte_results);
328            }
329        };
330
331        // Handle set operations (UNION, INTERSECT, EXCEPT)
332        // Process operations left-to-right to ensure correct associativity
333        if let Some(set_op) = &stmt.set_operation {
334            results = self.execute_set_operations(results, set_op, cte_results)?;
335
336            // Apply LIMIT/OFFSET to the final result (after all set operations)
337            // For queries WITHOUT set operations, LIMIT/OFFSET is already applied
338            // in execute_without_aggregation() or execute_with_aggregation()
339            results = apply_limit_offset(results, stmt.limit, stmt.offset);
340        }
341
342        Ok(results)
343    }
344
345    /// Execute SELECT without FROM clause (ExpressionOnly strategy)
346    ///
347    /// This is a special case that doesn't use the pipeline trait since there's
348    /// no table scan involved. Handles both simple expressions and aggregates.
349    fn execute_expression_only(
350        &self,
351        stmt: &vibesql_ast::SelectStmt,
352        cte_results: &HashMap<String, CteResult>,
353    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
354        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
355
356        if has_aggregates {
357            // Aggregates without FROM need the aggregation path
358            self.execute_with_aggregation(stmt, cte_results)
359        } else {
360            // Simple expression evaluation (e.g., SELECT 1 + 1)
361            self.execute_select_without_from(stmt)
362        }
363    }
364
365    /// Execute a query using the specified execution pipeline
366    ///
367    /// This method provides a unified interface for pipeline-based execution.
368    /// It creates the pipeline, prepares input, and executes the pipeline stages.
369    ///
370    /// Returns `Ok(Some(results))` if the pipeline executed successfully,
371    /// `Ok(None)` if the pipeline cannot handle the query (fallback needed),
372    /// or `Err` if an error occurred.
373    ///
374    /// # Type Parameters
375    ///
376    /// * `P` - The pipeline type (must implement `ExecutionPipeline`)
377    /// * `F` - Factory function to create the pipeline
378    fn execute_via_pipeline<P, F>(
379        &self,
380        stmt: &vibesql_ast::SelectStmt,
381        cte_results: &HashMap<String, CteResult>,
382        create_pipeline: F,
383        strategy_name: &str,
384    ) -> Result<Option<Vec<vibesql_storage::Row>>, ExecutorError>
385    where
386        P: ExecutionPipeline,
387        F: FnOnce() -> P,
388    {
389        #[cfg(feature = "profile-q6")]
390        let start = std::time::Instant::now();
391
392        // Check query complexity - pipelines don't support all features
393        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
394        let has_group_by = stmt.group_by.is_some();
395        let has_joins = stmt.from.as_ref().is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
396        let has_order_by = stmt.order_by.is_some();
397        let has_distinct = stmt.distinct;
398        let has_set_ops = stmt.set_operation.is_some();
399        let has_window_funcs = self.has_window_functions(&stmt.select_list);
400        let has_distinct_aggregates = self.has_distinct_aggregates(&stmt.select_list);
401
402        // Create the pipeline
403        let pipeline = create_pipeline();
404
405        // Check if the pipeline supports this query pattern
406        if !pipeline.supports_query_pattern(has_aggregates, has_group_by, has_joins) {
407            log::debug!(
408                "{} pipeline doesn't support query pattern (agg={}, group_by={}, joins={})",
409                strategy_name,
410                has_aggregates,
411                has_group_by,
412                has_joins
413            );
414            return Ok(None);
415        }
416
417        // For complex queries (ORDER BY, DISTINCT, window functions, set ops, DISTINCT aggregates),
418        // fall back to full execution paths which have complete support
419        if has_order_by || has_distinct || has_window_funcs || has_set_ops || has_distinct_aggregates {
420            log::debug!(
421                "{} pipeline doesn't support complex features (order_by={}, distinct={}, window={}, set_ops={}, distinct_agg={})",
422                strategy_name,
423                has_order_by,
424                has_distinct,
425                has_window_funcs,
426                has_set_ops,
427                has_distinct_aggregates
428            );
429            return Ok(None);
430        }
431
432        // Must have a FROM clause for pipeline execution
433        let from_clause = match &stmt.from {
434            Some(from) => from,
435            None => return Ok(None),
436        };
437
438        // Execute FROM clause to get input data
439        // Note: WHERE, ORDER BY, and LIMIT are handled by the pipeline, not here
440        let from_result = self.execute_from_with_where(
441            from_clause,
442            cte_results,
443            None, // Pipeline will apply WHERE filter
444            None, // ORDER BY handled separately
445            None, // LIMIT applied after pipeline
446        )?;
447
448        // Build execution context
449        let mut exec_ctx = ExecutionContext::new(&from_result.schema, self.database);
450        // Add outer context for correlated subqueries (#2998)
451        if let (Some(outer_row), Some(outer_schema)) = (self.outer_row, self.outer_schema) {
452            exec_ctx = exec_ctx.with_outer_context(outer_row, outer_schema);
453        }
454        // Add CTE context if available
455        if !cte_results.is_empty() {
456            exec_ctx = exec_ctx.with_cte_context(cte_results);
457        }
458
459        // Validate column references BEFORE processing
460        super::validation::validate_select_columns_with_context(
461            &stmt.select_list,
462            stmt.where_clause.as_ref(),
463            &from_result.schema,
464            self.procedural_context,
465            self.outer_schema,
466        )?;
467
468        // Prepare input from FROM result
469        let input = PipelineInput::from_rows_owned(from_result.data.into_rows());
470
471        // Execute pipeline stages with fallback on error
472        // If any pipeline stage fails with UnsupportedFeature, fall back to row-oriented
473
474        // Stage 1: Filter (WHERE clause)
475        let filtered = match pipeline.apply_filter(input, stmt.where_clause.as_ref(), &exec_ctx) {
476            Ok(result) => result,
477            Err(ExecutorError::UnsupportedFeature(_)) | Err(ExecutorError::UnsupportedExpression(_)) => {
478                log::debug!("{} pipeline filter failed, falling back", strategy_name);
479                return Ok(None);
480            }
481            Err(e) => return Err(e),
482        };
483
484        // Stage 2: Projection or Aggregation
485        let result = if has_aggregates || has_group_by {
486            // Execute aggregation (includes projection)
487            // Get GROUP BY expressions if present (as slice)
488            let group_by_slice: Option<&[vibesql_ast::Expression]> =
489                stmt.group_by.as_ref().and_then(|g| g.as_simple()).map(|v| v.as_slice());
490            match pipeline.apply_aggregation(
491                filtered.into_input(),
492                &stmt.select_list,
493                group_by_slice,
494                stmt.having.as_ref(),
495                &exec_ctx,
496            ) {
497                Ok(result) => result,
498                Err(ExecutorError::UnsupportedFeature(_)) | Err(ExecutorError::UnsupportedExpression(_)) => {
499                    log::debug!("{} pipeline aggregation failed, falling back", strategy_name);
500                    return Ok(None);
501                }
502                Err(e) => return Err(e),
503            }
504        } else {
505            // Execute projection only
506            match pipeline.apply_projection(filtered.into_input(), &stmt.select_list, &exec_ctx) {
507                Ok(result) => result,
508                Err(ExecutorError::UnsupportedFeature(_)) | Err(ExecutorError::UnsupportedExpression(_)) => {
509                    log::debug!("{} pipeline projection failed, falling back", strategy_name);
510                    return Ok(None);
511                }
512                Err(e) => return Err(e),
513            }
514        };
515
516        // Stage 3: Limit/Offset (convert usize to u64)
517        let limit_u64 = stmt.limit.map(|l| l as u64);
518        let offset_u64 = stmt.offset.map(|o| o as u64);
519        let final_result = pipeline.apply_limit_offset(result, limit_u64, offset_u64)?;
520
521        #[cfg(feature = "profile-q6")]
522        {
523            eprintln!(
524                "[PROFILE-Q6] ✓ {} pipeline execution: {:?}",
525                strategy_name,
526                start.elapsed()
527            );
528        }
529
530        log::debug!("✓ {} pipeline execution succeeded", strategy_name);
531        Ok(Some(final_result))
532    }
533
534    /// Check if the select list contains window functions
535    fn has_window_functions(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
536        select_list.iter().any(|item| {
537            if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
538                self.expr_has_window_function(expr)
539            } else {
540                false
541            }
542        })
543    }
544
545    /// Recursively check if an expression contains a window function
546    #[allow(clippy::only_used_in_recursion)]
547    fn expr_has_window_function(&self, expr: &vibesql_ast::Expression) -> bool {
548        match expr {
549            vibesql_ast::Expression::WindowFunction { .. } => true,
550            vibesql_ast::Expression::BinaryOp { left, right, .. } => {
551                self.expr_has_window_function(left) || self.expr_has_window_function(right)
552            }
553            vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_window_function(expr),
554            vibesql_ast::Expression::Function { args, .. } => {
555                args.iter().any(|arg| self.expr_has_window_function(arg))
556            }
557            vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
558                operand.as_ref().is_some_and(|e| self.expr_has_window_function(e))
559                    || when_clauses.iter().any(|case_when| {
560                        case_when.conditions.iter().any(|c| self.expr_has_window_function(c))
561                            || self.expr_has_window_function(&case_when.result)
562                    })
563                    || else_result.as_ref().is_some_and(|e| self.expr_has_window_function(e))
564            }
565            _ => false,
566        }
567    }
568
569    /// Check if the select list contains any DISTINCT aggregates (e.g., COUNT(DISTINCT x))
570    fn has_distinct_aggregates(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
571        select_list.iter().any(|item| {
572            if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
573                self.expr_has_distinct_aggregate(expr)
574            } else {
575                false
576            }
577        })
578    }
579
580    /// Recursively check if an expression contains a DISTINCT aggregate
581    #[allow(clippy::only_used_in_recursion)]
582    fn expr_has_distinct_aggregate(&self, expr: &vibesql_ast::Expression) -> bool {
583        match expr {
584            vibesql_ast::Expression::AggregateFunction { distinct, .. } => *distinct,
585            vibesql_ast::Expression::BinaryOp { left, right, .. } => {
586                self.expr_has_distinct_aggregate(left) || self.expr_has_distinct_aggregate(right)
587            }
588            vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_distinct_aggregate(expr),
589            vibesql_ast::Expression::Function { args, .. } => {
590                args.iter().any(|arg| self.expr_has_distinct_aggregate(arg))
591            }
592            vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
593                operand.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
594                    || when_clauses.iter().any(|case_when| {
595                        case_when.conditions.iter().any(|c| self.expr_has_distinct_aggregate(c))
596                            || self.expr_has_distinct_aggregate(&case_when.result)
597                    })
598                    || else_result.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
599            }
600            _ => false,
601        }
602    }
603
604    /// Execute using traditional row-oriented path
605    ///
606    /// This is the fallback path when columnar execution is not available or not beneficial.
607    fn execute_row_oriented(
608        &self,
609        stmt: &vibesql_ast::SelectStmt,
610        cte_results: &HashMap<String, CteResult>,
611    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
612        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
613        let has_group_by = stmt.group_by.is_some();
614
615        if has_aggregates || has_group_by {
616            self.execute_with_aggregation(stmt, cte_results)
617        } else if let Some(from_clause) = &stmt.from {
618            // Re-enabled predicate pushdown for all queries (issue #1902)
619            //
620            // Previously, predicate pushdown was selectively disabled for multi-column IN clauses
621            // because index optimization happened in execute_without_aggregation() on row indices
622            // from the FROM result. When predicate pushdown filtered rows early, the indices no
623            // longer matched the original table, causing incorrect results.
624            //
625            // Now that all index optimization has been moved to the scan level (execute_index_scan),
626            // it happens BEFORE predicate pushdown, avoiding the row-index mismatch problem.
627            // This allows predicate pushdown to work correctly for all queries, improving performance.
628            //
629            // Fixes issues #1807, #1895, #1896, and #1902.
630
631            // Pass WHERE, ORDER BY, and LIMIT to execute_from for optimization
632            // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
633            let from_result = self.execute_from_with_where(
634                from_clause,
635                cte_results,
636                stmt.where_clause.as_ref(),
637                stmt.order_by.as_deref(),
638                stmt.limit,
639            )?;
640
641            // Validate column references BEFORE processing rows (issue #2654)
642            // This ensures column errors are caught even when tables are empty
643            // Pass procedural context to allow procedure variables in WHERE clause
644            // Pass outer_schema for correlated subqueries (#2694)
645            super::validation::validate_select_columns_with_context(
646                &stmt.select_list,
647                stmt.where_clause.as_ref(),
648                &from_result.schema,
649                self.procedural_context,
650                self.outer_schema,
651            )?;
652
653            self.execute_without_aggregation(stmt, from_result, cte_results)
654        } else {
655            // SELECT without FROM - evaluate expressions as a single row
656            self.execute_select_without_from(stmt)
657        }
658    }
659
660    /// Execute a chain of set operations left-to-right
661    ///
662    /// SQL set operations are left-associative, so:
663    /// A EXCEPT B EXCEPT C should evaluate as (A EXCEPT B) EXCEPT C
664    ///
665    /// The parser creates a right-recursive AST structure, but we need to execute left-to-right.
666    fn execute_set_operations(
667        &self,
668        mut left_results: Vec<vibesql_storage::Row>,
669        set_op: &vibesql_ast::SetOperation,
670        cte_results: &HashMap<String, CteResult>,
671    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
672        // Execute the immediate right query WITHOUT its set operations
673        // This prevents right-recursive evaluation
674        let right_stmt = &set_op.right;
675        let has_aggregates = self.has_aggregates(&right_stmt.select_list) || right_stmt.having.is_some();
676        let has_group_by = right_stmt.group_by.is_some();
677
678        let right_results = if has_aggregates || has_group_by {
679            self.execute_with_aggregation(right_stmt, cte_results)?
680        } else if let Some(from_clause) = &right_stmt.from {
681            // Note: LIMIT is None for set operation sides - it's applied after the set operation
682            let from_result =
683                self.execute_from_with_where(from_clause, cte_results, right_stmt.where_clause.as_ref(), right_stmt.order_by.as_deref(), None)?;
684            self.execute_without_aggregation(right_stmt, from_result, cte_results)?
685        } else {
686            self.execute_select_without_from(right_stmt)?
687        };
688
689        // Track memory for right result before set operation
690        let right_size = estimate_result_size(&right_results);
691        self.track_memory_allocation(right_size)?;
692
693        // Apply the current operation
694        left_results = apply_set_operation(left_results, right_results, set_op)?;
695
696        // Track memory for combined result after set operation
697        let combined_size = estimate_result_size(&left_results);
698        self.track_memory_allocation(combined_size)?;
699
700        // If the right side has more set operations, continue processing them
701        // This creates the left-to-right evaluation: ((A op B) op C) op D
702        if let Some(next_set_op) = &right_stmt.set_operation {
703            left_results = self.execute_set_operations(left_results, next_set_op, cte_results)?;
704        }
705
706        Ok(left_results)
707    }
708
709    /// Execute a FROM clause with WHERE, ORDER BY, and LIMIT for optimization
710    ///
711    /// The LIMIT parameter enables early termination optimization (#3253):
712    /// - When ORDER BY is satisfied by an index and no post-filter is needed,
713    ///   the index scan can stop after fetching LIMIT rows
714    pub(super) fn execute_from_with_where(
715        &self,
716        from: &vibesql_ast::FromClause,
717        cte_results: &HashMap<String, CteResult>,
718        where_clause: Option<&vibesql_ast::Expression>,
719        order_by: Option<&[vibesql_ast::OrderByItem]>,
720        limit: Option<usize>,
721    ) -> Result<FromResult, ExecutorError> {
722        use crate::select::scan::execute_from_clause;
723        let from_result = execute_from_clause(from, cte_results, self.database, where_clause, order_by, limit, self.outer_row, self.outer_schema, |query| {
724            // For derived table subqueries, create a child executor with CTE context
725            // This allows CTEs from the outer WITH clause to be referenced in subqueries
726            // Critical for queries like TPC-DS Q2 where CTEs are used in FROM subqueries
727            if !cte_results.is_empty() {
728                let child = SelectExecutor::new_with_cte_and_depth(
729                    self.database,
730                    cte_results,
731                    self.subquery_depth,
732                );
733                child.execute_with_columns(query)
734            } else {
735                self.execute_with_columns(query)
736            }
737        })?;
738
739        // NOTE: We DON'T merge outer schema with from_result.schema here because:
740        // 1. from_result.rows only contain values from inner tables
741        // 2. Outer columns are resolved via the evaluator's outer_row/outer_schema
742        // 3. Merging would create schema/row mismatch (schema has outer cols, rows don't)
743
744        Ok(from_result)
745    }
746
747}