vibesql_executor/select/executor/
execute.rs

1//! Main execution methods for SelectExecutor
2//!
3//! This module implements the unified execution dispatcher that routes queries
4//! to the appropriate execution pipeline based on the selected strategy.
5//!
6//! ## Execution Pipeline Architecture
7//!
8//! The dispatcher uses the `ExecutionPipeline` trait to provide a unified interface
9//! for query execution across different strategies:
10//!
11//! - **NativeColumnar**: Zero-copy SIMD execution from columnar storage
12//! - **StandardColumnar**: SIMD execution with row-to-batch conversion
13//! - **RowOriented**: Traditional row-by-row execution
14//! - **ExpressionOnly**: SELECT without FROM clause (special case)
15//!
16//! ```text
17//! Strategy Selection → Create Pipeline → Execute Pipeline Stages → Results
18//!                          ↓
19//!          apply_filter → apply_projection → apply_aggregation → apply_limit_offset
20//! ```
21
22use std::collections::HashMap;
23
24use super::builder::SelectExecutor;
25use crate::{
26    errors::ExecutorError,
27    optimizer::adaptive::{choose_execution_strategy, ExecutionStrategy, StrategyContext},
28    pipeline::{
29        ColumnarPipeline, ExecutionContext, ExecutionPipeline, NativeColumnarPipeline,
30        PipelineInput,
31    },
32    select::{
33        cte::{execute_ctes, execute_ctes_with_memory_check, CteResult},
34        helpers::apply_limit_offset,
35        join::FromResult,
36        SelectResult,
37    },
38};
39
40impl SelectExecutor<'_> {
41    /// Execute a SELECT statement
42    pub fn execute(
43        &self,
44        stmt: &vibesql_ast::SelectStmt,
45    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
46        // Validate aggregate function argument counts FIRST (issue #4367)
47        // This catches errors like max(), min(*), sum(*) before any execution
48        // Must happen before any fast paths or strategy selection
49        super::validation::validate_aggregate_arguments(&stmt.select_list)?;
50
51        // Validate no nested aggregates (issue #4439)
52        // This catches errors like SUM(MIN(x)) before any execution
53        // Uses "misuse of aggregate function X()" format to match SQLite's resolve.c
54        super::validation::validate_no_nested_aggregates(&stmt.select_list)?;
55
56        // Validate join table limit (issue #4711)
57        // SQLite enforces a limit of 64 tables in a single join
58        // This catches queries like SELECT * FROM t, t, t, ... (65+ times)
59        super::validation::validate_join_table_limit(stmt)?;
60
61        // Validate aggregates with outer column references in subqueries (issue #4853)
62        // This catches the specific case where a scalar subquery appears as an argument
63        // to an outer aggregate function, and the subquery's aggregate references an
64        // outer column. Example: SELECT max((SELECT count(x) FROM t35b)) FROM t35a;
65        // Note: Standalone correlated subqueries are valid and allowed, e.g.,
66        // SELECT (SELECT count(x) FROM t35b) FROM t35a; is valid in SQLite.
67        super::validation::validate_aggregate_subquery_outer_refs(stmt, self.database)?;
68
69        #[cfg(feature = "profile-q6")]
70        let execute_start = std::time::Instant::now();
71
72        // Reset arena and clear pointer-based caches for fresh query execution (only at top level)
73        // The subquery hash cache uses pointer-based keys which become invalid when ASTs are
74        // dropped and memory is reused, so we must clear it between queries.
75        // Note: The IN subquery result cache and correlation cache use content hashes as keys,
76        // not pointers, so they can safely persist across queries.
77        if self.subquery_depth == 0 {
78            self.reset_arena();
79            crate::evaluator::caching::clear_subquery_hash_cache();
80        }
81
82        // Check timeout before starting execution
83        self.check_timeout()?;
84
85        // Check subquery depth limit to prevent stack overflow
86        if self.subquery_depth >= crate::limits::MAX_EXPRESSION_DEPTH {
87            return Err(ExecutorError::ExpressionDepthExceeded {
88                depth: self.subquery_depth,
89                max_depth: crate::limits::MAX_EXPRESSION_DEPTH,
90            });
91        }
92
93        // Fast path for simple point-lookup queries (TPC-C optimization)
94        // This bypasses expensive optimizer passes for queries like:
95        // SELECT col FROM table WHERE pk = value
96        // Skip fast path if reverse_unordered_selects is ON and no ORDER BY (needs reversal)
97        let skip_fast_path_for_reversal =
98            self.database.reverse_unordered_selects() && stmt.order_by.is_none();
99        if self.subquery_depth == 0
100            && self.outer_row.is_none()
101            && self.cte_context.is_none()
102            && !skip_fast_path_for_reversal
103            && super::fast_path::is_simple_point_query(stmt)
104        {
105            return self.execute_fast_path(stmt);
106        }
107
108        // Streaming aggregate fast path (#3815)
109        // For queries like: SELECT SUM(k) FROM sbtest1 WHERE id BETWEEN ? AND ?
110        // Accumulates aggregates inline during PK range scan without materializing rows
111        if self.subquery_depth == 0
112            && self.outer_row.is_none()
113            && self.cte_context.is_none()
114            && !skip_fast_path_for_reversal
115            && super::fast_path::is_streaming_aggregate_query(stmt)
116        {
117            if let Ok(result) = self.execute_streaming_aggregate(stmt) {
118                return Ok(result);
119            }
120            // Fall through to standard path if streaming aggregate fails
121        }
122
123        #[cfg(feature = "profile-q6")]
124        let _setup_time = execute_start.elapsed();
125
126        // Apply subquery rewriting optimizations (Phase 2 of IN subquery optimization)
127        // - Rewrites correlated IN → EXISTS with LIMIT 1 for early termination
128        // - Adds DISTINCT to uncorrelated IN subqueries to reduce duplicate processing
129        // This works in conjunction with Phase 1 (HashSet optimization, #2136)
130        #[cfg(feature = "profile-q6")]
131        let optimizer_start = std::time::Instant::now();
132
133        let optimized_stmt = crate::optimizer::rewrite_subquery_optimizations(stmt);
134
135        // Apply scalar subquery decorrelation (#4760)
136        // Transforms correlated scalar subqueries with aggregates (e.g., AVG, SUM, MIN)
137        // into CTE + JOIN patterns for O(n) instead of O(n²) execution.
138        // Example: WHERE x > 1.2 * (SELECT AVG(y) FROM t WHERE t.c = outer.c)
139        // Becomes: WITH _cte AS (SELECT c, AVG(y) FROM t GROUP BY c)
140        //          ... JOIN _cte ON outer.c = _cte.c WHERE x > 1.2 * _cte._avg
141        let optimized_stmt = crate::optimizer::apply_scalar_decorrelation(&optimized_stmt);
142
143        #[cfg(feature = "profile-q6")]
144        let _optimizer_time = optimizer_start.elapsed();
145
146        // Eliminate unused tables that create unnecessary cross joins (#3556)
147        // Must run BEFORE semi-join transformation to avoid complex interactions
148        // with derived tables from EXISTS/IN transformations
149        let optimized_stmt = crate::optimizer::eliminate_unused_tables(&optimized_stmt);
150
151        // Transform decorrelated IN/EXISTS subqueries to semi/anti-joins (#2424)
152        // This enables hash-based join execution instead of row-by-row subquery evaluation
153        // Converts WHERE clauses like "WHERE x IN (SELECT y FROM t)" to "SEMI JOIN t ON x = y"
154        let optimized_stmt = crate::optimizer::transform_subqueries_to_joins(&optimized_stmt);
155
156        // Execute CTEs if present and merge with outer query's CTE context
157        let mut cte_results = if let Some(with_clause) = &optimized_stmt.with_clause {
158            // This query has its own CTEs - execute them with memory tracking
159            execute_ctes_with_memory_check(
160                with_clause,
161                |query, cte_ctx| self.execute_with_ctes(query, cte_ctx),
162                |size| self.track_memory_allocation(size),
163            )?
164        } else {
165            HashMap::new()
166        };
167
168        // If we have access to outer query's CTEs (for subqueries), merge them in
169        // Local CTEs take precedence over outer CTEs if there are name conflicts
170        if let Some(outer_cte_ctx) = self.cte_context {
171            for (name, result) in outer_cte_ctx {
172                cte_results.entry(name.clone()).or_insert_with(|| result.clone());
173            }
174        }
175
176        #[cfg(feature = "profile-q6")]
177        let _pre_execute_time = execute_start.elapsed();
178
179        // Execute the main query with CTE context
180        let mut result = self.execute_with_ctes(&optimized_stmt, &cte_results)?;
181
182        #[cfg(feature = "profile-q6")]
183        {
184            let _total_execute = execute_start.elapsed();
185        }
186
187        // Apply PRAGMA reverse_unordered_selects if enabled
188        // Only reverse if there's no ORDER BY clause in the original statement
189        if stmt.order_by.is_none() && self.database.reverse_unordered_selects() {
190            result.reverse();
191        }
192
193        Ok(result)
194    }
195
196    /// Execute a SELECT statement and return an iterator over results
197    ///
198    /// This enables early termination when the full result set is not needed,
199    /// such as for IN subqueries where we stop after finding the first match.
200    ///
201    /// # Phase 1 Implementation (Early Termination for IN subqueries)
202    ///
203    /// Current implementation materializes results then returns an iterator.
204    /// This still enables early termination in the consumer (e.g., eval_in_subquery)
205    /// by stopping iteration when a match is found.
206    ///
207    /// Future optimization: Leverage the existing RowIterator infrastructure
208    /// (crate::select::iterator) for truly lazy evaluation that stops execution
209    /// early, not just iteration.
210    pub fn execute_iter(
211        &self,
212        stmt: &vibesql_ast::SelectStmt,
213    ) -> Result<impl Iterator<Item = vibesql_storage::Row>, ExecutorError> {
214        // For Phase 1, materialize then return iterator
215        // This still enables early termination in the consumer
216        let rows = self.execute(stmt)?;
217        Ok(rows.into_iter())
218    }
219
220    /// Execute a SELECT statement using the fast path directly
221    ///
222    /// This method is used by prepared statements with cached SimpleFastPath plans.
223    /// It bypasses the `is_simple_point_query()` check because the eligibility was
224    /// already determined at prepare time.
225    ///
226    /// # Performance
227    ///
228    /// For repeated execution of prepared statements, this saves the cost of
229    /// re-checking fast path eligibility on every execution (~5-10µs per query).
230    pub fn execute_fast_path_with_columns(
231        &self,
232        stmt: &vibesql_ast::SelectStmt,
233    ) -> Result<SelectResult, ExecutorError> {
234        // Reset arena for fresh query execution
235        if self.subquery_depth == 0 {
236            self.reset_arena();
237        }
238
239        // Check timeout before starting execution
240        self.check_timeout()?;
241
242        // Execute via fast path directly (skip is_simple_point_query check)
243        let rows = self.execute_fast_path(stmt)?;
244
245        // Derive column names from the SELECT list
246        // For fast path queries, we don't have a FromResult, so pass None
247        // The column derivation will use the SELECT list expressions directly
248        let columns = self.derive_fast_path_column_names(stmt)?;
249
250        Ok(SelectResult { columns, rows })
251    }
252
253    /// Derive column names for fast path execution
254    ///
255    /// For fast path queries, we derive column names directly from the SELECT list
256    /// and table schema without going through the full FROM clause execution.
257    ///
258    /// # Performance Note (#3780)
259    ///
260    /// This method is called by `Session::execute_prepared()` to cache column names
261    /// in `SimpleFastPathPlan`. After the first execution, cached column names are
262    /// reused to avoid repeated table lookups and column name derivation.
263    pub fn derive_fast_path_column_names(
264        &self,
265        stmt: &vibesql_ast::SelectStmt,
266    ) -> Result<Vec<String>, ExecutorError> {
267        use vibesql_ast::{FromClause, SelectItem};
268
269        // Get table name and schema for column resolution
270        let (table_name, table_alias) = match &stmt.from {
271            Some(FromClause::Table { name, alias, .. }) => (name.as_str(), alias.as_deref()),
272            _ => {
273                return Err(ExecutorError::Other(
274                    "Fast path requires simple table FROM clause".to_string(),
275                ))
276            }
277        };
278
279        let table = self
280            .database
281            .get_table(table_name)
282            .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
283
284        let mut columns = Vec::with_capacity(stmt.select_list.len());
285
286        for item in &stmt.select_list {
287            match item {
288                SelectItem::Wildcard { .. } => {
289                    // Add all columns from the table
290                    for col in &table.schema.columns {
291                        columns.push(col.name.clone());
292                    }
293                }
294                SelectItem::QualifiedWildcard { qualifier, .. } => {
295                    // Check if qualifier matches table name or alias
296                    let effective_name = table_alias.unwrap_or(table_name);
297                    if qualifier.eq_ignore_ascii_case(effective_name)
298                        || qualifier.eq_ignore_ascii_case(table_name)
299                    {
300                        for col in &table.schema.columns {
301                            columns.push(col.name.clone());
302                        }
303                    }
304                }
305                SelectItem::Expression { expr, alias: col_alias, .. } => {
306                    // Use alias if provided, otherwise derive from expression
307                    let col_name = if let Some(a) = col_alias {
308                        a.clone()
309                    } else {
310                        self.derive_column_name_from_expr(expr)
311                    };
312                    columns.push(col_name);
313                }
314            }
315        }
316
317        Ok(columns)
318    }
319
320    /// Derive a column name from an expression
321    fn derive_column_name_from_expr(&self, expr: &vibesql_ast::Expression) -> String {
322        match expr {
323            vibesql_ast::Expression::ColumnRef(col_id) => col_id.column_canonical().to_string(),
324            vibesql_ast::Expression::Literal(val) => format!("{}", val),
325            vibesql_ast::Expression::Wildcard => "*".to_string(),
326            _ => "?column?".to_string(),
327        }
328    }
329
330    /// Execute a SELECT statement and return both columns and rows
331    pub fn execute_with_columns(
332        &self,
333        stmt: &vibesql_ast::SelectStmt,
334    ) -> Result<SelectResult, ExecutorError> {
335        // Resolve SELECT aliases in WHERE clause BEFORE predicate pushdown (SQLite extension)
336        // This allows queries like: SELECT f1-22 AS x FROM t1 WHERE x > 0
337        // IMPORTANT: Use schema-aware resolution to avoid incorrectly substituting
338        // table column names with aggregate aliases (SQLite behavior)
339        // Example: SELECT COUNT(*) AS col1 FROM tab0 WHERE col1 > 0
340        // Here 'col1' in WHERE refers to the TABLE COLUMN, not the COUNT(*) alias
341        // PERFORMANCE: Skip alias resolution if no aliases exist (common case in OLTP)
342        let resolved_where = if stmt.where_clause.is_some()
343            && crate::select::order::select_list_has_aliases(&stmt.select_list)
344        {
345            stmt.where_clause.as_ref().map(|where_expr| {
346                // Try to build early schema from FROM clause
347                if let Some(from_clause) = &stmt.from {
348                    if let Some(early_schema) =
349                        super::aggregation::build_early_schema(from_clause, self.database)
350                    {
351                        // Use schema-aware resolution
352                        return crate::select::order::resolve_where_aliases_with_schema(
353                            where_expr,
354                            &stmt.select_list,
355                            &early_schema,
356                        );
357                    }
358                }
359                // Fall back to non-schema-aware resolution for complex FROM clauses
360                crate::select::order::resolve_where_aliases(where_expr, &stmt.select_list)
361            })
362        } else {
363            stmt.where_clause.clone()
364        };
365
366        // First, get the FROM result to access the schema
367        let from_result = if let Some(from_clause) = &stmt.from {
368            let mut cte_results = if let Some(with_clause) = &stmt.with_clause {
369                execute_ctes(with_clause, |query, cte_ctx| self.execute_with_ctes(query, cte_ctx))?
370            } else {
371                HashMap::new()
372            };
373            // If we have access to outer query's CTEs (for subqueries/derived tables), merge them
374            // in Local CTEs take precedence over outer CTEs if there are name conflicts
375            // This is critical for queries like TPC-DS Q2 where CTEs are referenced from derived
376            // tables
377            if let Some(outer_cte_ctx) = self.cte_context {
378                for (name, result) in outer_cte_ctx {
379                    cte_results.entry(name.clone()).or_insert_with(|| result.clone());
380                }
381            }
382            // Pass WHERE, ORDER BY, and LIMIT for optimizations
383            // This is critical for GROUP BY queries to avoid CROSS JOINs
384            // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
385            // Pass select_list for table elimination optimization (#3556)
386            let limit_val = stmt
387                .limit
388                .as_ref()
389                .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
390                .transpose()?;
391            Some(self.execute_from_with_where(
392                from_clause,
393                &cte_results,
394                resolved_where.as_ref(),
395                stmt.order_by.as_deref(),
396                limit_val,
397                Some(&stmt.select_list),
398            )?)
399        } else {
400            None
401        };
402
403        // Derive column names from the SELECT list (with table prefix for display)
404        // Issue #4696: For VALUES statements, select_list is empty - derive from VALUES rows
405        let columns = if stmt.select_list.is_empty() {
406            if let Some(values_rows) = &stmt.values {
407                // Generate column names: column1, column2, etc.
408                let num_cols = values_rows.first().map(|r| r.len()).unwrap_or(0);
409                (1..=num_cols).map(|i| format!("column{}", i)).collect()
410            } else {
411                // Empty select_list and no VALUES - return empty columns
412                Vec::new()
413            }
414        } else {
415            self.derive_column_names(&stmt.select_list, from_result.as_ref())?
416        };
417
418        // Execute the query to get rows
419        let rows = self.execute(stmt)?;
420
421        Ok(SelectResult { columns, rows })
422    }
423
424    /// Execute SELECT statement and return results with simple column names
425    ///
426    /// This is similar to `execute_with_columns` but returns column names without
427    /// table prefixes. This is used for internal purposes like view creation
428    /// where the full table.column format would cause column lookup issues.
429    pub fn execute_with_simple_columns(
430        &self,
431        stmt: &vibesql_ast::SelectStmt,
432    ) -> Result<SelectResult, ExecutorError> {
433        // Resolve SELECT aliases in WHERE clause BEFORE predicate pushdown (SQLite extension)
434        // This allows queries like: SELECT f1-22 AS x FROM t1 WHERE x > 0
435        // IMPORTANT: Use schema-aware resolution to avoid incorrectly substituting
436        // table column names with aggregate aliases (SQLite behavior)
437        // PERFORMANCE: Skip alias resolution if no aliases exist (common case in OLTP)
438        let resolved_where = if stmt.where_clause.is_some()
439            && crate::select::order::select_list_has_aliases(&stmt.select_list)
440        {
441            stmt.where_clause.as_ref().map(|where_expr| {
442                // Try to build early schema from FROM clause
443                if let Some(from_clause) = &stmt.from {
444                    if let Some(early_schema) =
445                        super::aggregation::build_early_schema(from_clause, self.database)
446                    {
447                        // Use schema-aware resolution
448                        return crate::select::order::resolve_where_aliases_with_schema(
449                            where_expr,
450                            &stmt.select_list,
451                            &early_schema,
452                        );
453                    }
454                }
455                // Fall back to non-schema-aware resolution for complex FROM clauses
456                crate::select::order::resolve_where_aliases(where_expr, &stmt.select_list)
457            })
458        } else {
459            stmt.where_clause.clone()
460        };
461
462        // Execute the FROM clause to get combined schema
463        let from_result = if let Some(from_clause) = &stmt.from {
464            let mut cte_results = if let Some(with_clause) = &stmt.with_clause {
465                execute_ctes(with_clause, |query, cte_ctx| self.execute_with_ctes(query, cte_ctx))?
466            } else {
467                HashMap::new()
468            };
469            // If we have access to outer query's CTEs (for subqueries/derived tables), merge them
470            if let Some(outer_cte_ctx) = self.cte_context {
471                for (name, result) in outer_cte_ctx {
472                    cte_results.entry(name.clone()).or_insert_with(|| result.clone());
473                }
474            }
475            let limit_val = stmt
476                .limit
477                .as_ref()
478                .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
479                .transpose()?;
480            Some(self.execute_from_with_where(
481                from_clause,
482                &cte_results,
483                resolved_where.as_ref(),
484                stmt.order_by.as_deref(),
485                limit_val,
486                Some(&stmt.select_list),
487            )?)
488        } else {
489            None
490        };
491
492        // Derive column names from the SELECT list (without table prefix)
493        // Issue #4696: For VALUES statements, select_list is empty - derive from VALUES rows
494        let columns = if stmt.select_list.is_empty() {
495            if let Some(values_rows) = &stmt.values {
496                // Generate column names: column1, column2, etc.
497                let num_cols = values_rows.first().map(|r| r.len()).unwrap_or(0);
498                (1..=num_cols).map(|i| format!("column{}", i)).collect()
499            } else {
500                // Empty select_list and no VALUES - return empty columns
501                Vec::new()
502            }
503        } else {
504            self.derive_simple_column_names(&stmt.select_list, from_result.as_ref())?
505        };
506
507        // Execute the query to get rows
508        let rows = self.execute(stmt)?;
509
510        Ok(SelectResult { columns, rows })
511    }
512
513    /// Execute SELECT statement with CTE context
514    ///
515    /// Uses unified strategy selection to determine the optimal execution path:
516    /// - NativeColumnar: Zero-copy SIMD execution from columnar storage
517    /// - StandardColumnar: SIMD execution with row-to-batch conversion
518    /// - RowOriented: Traditional row-by-row execution
519    /// - ExpressionOnly: SELECT without FROM clause (special case)
520    ///
521    /// ## Pipeline-Based Execution (Phase 5)
522    ///
523    /// This method uses the `ExecutionPipeline` trait to provide a unified interface
524    /// for query execution. Each strategy creates an appropriate pipeline that
525    /// implements filter, projection, aggregation, and limit/offset operations.
526    ///
527    /// ```text
528    /// Strategy Selection → Create Pipeline → Execute via Trait Methods
529    ///                              ↓
530    ///   NativeColumnar  → NativeColumnarPipeline::apply_*()
531    ///   StandardColumnar → ColumnarPipeline::apply_*()
532    ///   RowOriented     → RowOrientedPipeline::apply_*()
533    ///   ExpressionOnly  → Special case (no table scan)
534    /// ```
535    pub(super) fn execute_with_ctes(
536        &self,
537        stmt: &vibesql_ast::SelectStmt,
538        cte_results: &HashMap<String, CteResult>,
539    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
540        // Note: Aggregate argument validation is done in execute() at the entry point.
541        // See issue #4367.
542
543        #[cfg(feature = "profile-q6")]
544        let _execute_ctes_start = std::time::Instant::now();
545
546        // Check if native columnar is enabled via feature flag or env var
547        let native_columnar_enabled =
548            cfg!(feature = "native-columnar") || std::env::var("VIBESQL_NATIVE_COLUMNAR").is_ok();
549
550        // Use unified strategy selection for the execution path
551        let strategy_ctx = StrategyContext::new(stmt, cte_results, native_columnar_enabled);
552        let strategy = choose_execution_strategy(&strategy_ctx);
553
554        log::debug!(
555            "Execution strategy selected: {} (reason: {})",
556            strategy.name(),
557            strategy.score().reason
558        );
559
560        #[cfg(feature = "profile-q6")]
561        eprintln!(
562            "[PROFILE-Q6] Execution strategy: {} ({})",
563            strategy.name(),
564            strategy.score().reason
565        );
566
567        // Dispatch based on selected strategy using ExecutionPipeline trait
568        // Pipeline execution returns Option<Vec<Row>> - None means fallback needed
569        let mut results = match strategy {
570            ExecutionStrategy::NativeColumnar { .. } => {
571                // First try the optimized zero-copy native columnar path
572                // This uses ColumnarBatch::from_storage_columnar() for zero-copy conversion
573                // and executes filter+aggregate in a single pass without row materialization
574                if let Some(result) = self.try_native_columnar_execution(stmt, cte_results)? {
575                    #[cfg(feature = "profile-q6")]
576                    eprintln!("[PROFILE-Q6] Native columnar: zero-copy path succeeded");
577                    result
578                } else {
579                    // Fall back to pipeline-based execution if zero-copy path is not applicable
580                    // (e.g., complex predicates, multiple tables, unsupported aggregates)
581                    log::debug!("Native columnar: zero-copy path not applicable, trying pipeline");
582                    match self.execute_via_pipeline(
583                        stmt,
584                        cte_results,
585                        NativeColumnarPipeline::new,
586                        "NativeColumnar",
587                    )? {
588                        Some(result) => result,
589                        None => {
590                            // Fall back to row-oriented if pipeline also fails
591                            log::debug!("Native columnar runtime fallback to row-oriented");
592                            #[cfg(feature = "profile-q6")]
593                            eprintln!("[PROFILE-Q6] Native columnar fallback to row-oriented");
594                            self.execute_row_oriented(stmt, cte_results)?
595                        }
596                    }
597                }
598            }
599
600            ExecutionStrategy::StandardColumnar { .. } => {
601                // StandardColumnar uses the pipeline-based execution path
602                // Note: We don't use try_native_columnar_execution here because row tables
603                // go through the pipeline which correctly handles all data types including dates.
604                // The native columnar zero-copy path has known limitations with certain date
605                // comparisons.
606                match self.execute_via_pipeline(
607                    stmt,
608                    cte_results,
609                    ColumnarPipeline::new,
610                    "StandardColumnar",
611                )? {
612                    Some(result) => result,
613                    None => {
614                        log::debug!("Standard columnar runtime fallback to row-oriented");
615                        #[cfg(feature = "profile-q6")]
616                        eprintln!("[PROFILE-Q6] Standard columnar fallback to row-oriented");
617                        self.execute_row_oriented(stmt, cte_results)?
618                    }
619                }
620            }
621
622            ExecutionStrategy::RowOriented { .. } => {
623                // Row-oriented uses the traditional path which has full feature support
624                // The RowOrientedPipeline is used for simpler queries, but complex
625                // queries (with JOINs, window functions, DISTINCT, etc.) need the
626                // full execute_row_oriented implementation
627
628                // Phase 4: Try columnar join execution for multi-table JOIN queries (#2943)
629                // This provides 3-5x speedup for TPC-H Q3 style queries
630                let has_joins = stmt
631                    .from
632                    .as_ref()
633                    .is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
634                if has_joins {
635                    if let Some(result) = self.try_columnar_join_execution(stmt, cte_results)? {
636                        log::info!("Columnar join execution succeeded");
637                        // Apply LIMIT/OFFSET to columnar join results (#3776)
638                        // Skip if set_operation exists - it will be applied later
639                        if stmt.set_operation.is_none() {
640                            let limit_val = stmt
641                                .limit
642                                .as_ref()
643                                .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
644                                .transpose()?;
645                            let offset_val = stmt
646                                .offset
647                                .as_ref()
648                                .map(|expr| self.eval_limit_offset_expr(expr, "OFFSET"))
649                                .transpose()?;
650                            apply_limit_offset(result, limit_val, offset_val)
651                        } else {
652                            result
653                        }
654                    } else {
655                        log::debug!(
656                            "Columnar join execution not applicable, falling back to row-oriented"
657                        );
658                        self.execute_row_oriented(stmt, cte_results)?
659                    }
660                } else {
661                    self.execute_row_oriented(stmt, cte_results)?
662                }
663            }
664
665            ExecutionStrategy::ExpressionOnly { .. } => {
666                // SELECT without FROM - special case that doesn't use pipelines
667                // May still have aggregates (e.g., SELECT COUNT(*), SELECT MAX(1))
668                // Note: Do NOT use early return here - we need to fall through to set operations
669                // handling
670                self.execute_expression_only(stmt, cte_results)?
671            }
672        };
673
674        // Handle set operations (UNION, INTERSECT, EXCEPT)
675        // Process operations left-to-right to ensure correct associativity
676        if let Some(set_op) = &stmt.set_operation {
677            // Extract collations from the leftmost SELECT list for set operation comparisons
678            // Use schema-aware lookup to get column collations from CREATE TABLE definitions
679            let collations = Self::extract_collations_from_select_list_with_schema(
680                &stmt.select_list,
681                Some(self.database),
682                stmt.from.as_ref(),
683            );
684            // Issue #4602: Compute left column count from AST for schema-level validation
685            // This is needed when the left result set is empty (table has no rows)
686            // Issue #4922: Must propagate errors (not use .ok()) to catch column count mismatches
687            // in set operations like UNION/INTERSECT/EXCEPT
688            let left_col_count = super::nonagg::compute_select_list_column_count(
689                stmt,
690                self.database,
691                Some(cte_results),
692            )?;
693            let left_col_count = Some(left_col_count);
694            results = self.execute_set_operations(
695                results,
696                set_op,
697                cte_results,
698                &collations,
699                left_col_count,
700            )?;
701
702            // Apply ORDER BY after set operations (if specified)
703            // The UNION's default sort is overridden by explicit ORDER BY
704            if let Some(order_by) = &stmt.order_by {
705                // Collect aliases from all UNION branches for ORDER BY resolution
706                // Pass database to enable wildcard expansion using table schemas
707                let all_aliases = super::set_operations::collect_union_aliases(self.database, stmt);
708                // Pass column collations from the first SELECT for collation-aware sorting
709                results = self.sort_set_operation_results(
710                    results,
711                    order_by,
712                    &stmt.select_list,
713                    &all_aliases,
714                    &collations,
715                )?;
716            }
717
718            // Apply LIMIT/OFFSET to the final result (after all set operations and ORDER BY)
719            // For queries WITHOUT set operations, LIMIT/OFFSET is already applied
720            // in execute_without_aggregation() or execute_with_aggregation()
721            let limit_val = stmt
722                .limit
723                .as_ref()
724                .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
725                .transpose()?;
726            let offset_val = stmt
727                .offset
728                .as_ref()
729                .map(|expr| self.eval_limit_offset_expr(expr, "OFFSET"))
730                .transpose()?;
731            results = apply_limit_offset(results, limit_val, offset_val);
732        }
733
734        Ok(results)
735    }
736
737    /// Execute SELECT without FROM clause (ExpressionOnly strategy)
738    ///
739    /// This is a special case that doesn't use the pipeline trait since there's
740    /// no table scan involved. Handles both simple expressions, aggregates,
741    /// and standalone VALUES statements.
742    fn execute_expression_only(
743        &self,
744        stmt: &vibesql_ast::SelectStmt,
745        cte_results: &HashMap<String, CteResult>,
746    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
747        // Handle standalone VALUES statements (Issue #4546)
748        // VALUES(1,2), (3,4) returns rows directly without a SELECT list
749        if let Some(values_rows) = &stmt.values {
750            let from_result = crate::select::scan::values::execute_values(
751                values_rows,
752                "_values_",
753                None,
754                Some(self.database),
755            )?;
756            let mut results = from_result.into_rows();
757
758            // Issue #4696: If there's a set_operation, don't apply ORDER BY or LIMIT/OFFSET here
759            // Let the caller handle them after set operations are processed
760            if stmt.set_operation.is_some() {
761                return Ok(results);
762            }
763
764            // Apply ORDER BY if specified
765            if let Some(order_by) = &stmt.order_by {
766                // For VALUES, column names are column1, column2, etc.
767                // We need to map ORDER BY expressions to column indices
768                results = self.apply_values_order_by(results, order_by, values_rows)?;
769            }
770
771            // Apply LIMIT/OFFSET
772            let limit_val = stmt
773                .limit
774                .as_ref()
775                .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
776                .transpose()?;
777            let offset_val = stmt
778                .offset
779                .as_ref()
780                .map(|expr| self.eval_limit_offset_expr(expr, "OFFSET"))
781                .transpose()?;
782            return Ok(apply_limit_offset(results, limit_val, offset_val));
783        }
784
785        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
786
787        if has_aggregates {
788            // Aggregates without FROM need the aggregation path
789            self.execute_with_aggregation(stmt, cte_results)
790        } else {
791            // Simple expression evaluation (e.g., SELECT 1 + 1)
792            self.execute_select_without_from(stmt)
793        }
794    }
795
796    /// Apply ORDER BY to VALUES statement results
797    fn apply_values_order_by(
798        &self,
799        mut rows: Vec<vibesql_storage::Row>,
800        order_by: &[vibesql_ast::OrderByItem],
801        values_rows: &[Vec<vibesql_ast::Expression>],
802    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
803        // For VALUES, we sort by column position (1-indexed)
804        // or by column name (column1, column2, etc.)
805        let num_cols = values_rows.first().map(|r| r.len()).unwrap_or(0);
806
807        rows.sort_by(|a, b| {
808            for item in order_by {
809                // Get the column index from the ORDER BY expression
810                let col_idx = match &item.expr {
811                    vibesql_ast::Expression::Literal(vibesql_types::SqlValue::Integer(n)) => {
812                        // 1-indexed column position
813                        (*n as usize).saturating_sub(1)
814                    }
815                    vibesql_ast::Expression::ColumnRef(col_id) => {
816                        // column1, column2, etc.
817                        let col_name = col_id.column_canonical();
818                        if let Some(stripped) = col_name.strip_prefix("column") {
819                            stripped.parse::<usize>().unwrap_or(0).saturating_sub(1)
820                        } else {
821                            0
822                        }
823                    }
824                    _ => 0, // Default to first column for complex expressions
825                };
826
827                if col_idx < num_cols {
828                    let cmp = a.values[col_idx].partial_cmp(&b.values[col_idx]);
829                    if let Some(ord) = cmp {
830                        let ord = match item.direction {
831                            vibesql_ast::OrderDirection::Asc => ord,
832                            vibesql_ast::OrderDirection::Desc => ord.reverse(),
833                        };
834                        if ord != std::cmp::Ordering::Equal {
835                            return ord;
836                        }
837                    }
838                }
839            }
840            std::cmp::Ordering::Equal
841        });
842
843        Ok(rows)
844    }
845
846    /// Execute a query using the specified execution pipeline
847    ///
848    /// This method provides a unified interface for pipeline-based execution.
849    /// It creates the pipeline, prepares input, and executes the pipeline stages.
850    ///
851    /// Returns `Ok(Some(results))` if the pipeline executed successfully,
852    /// `Ok(None)` if the pipeline cannot handle the query (fallback needed),
853    /// or `Err` if an error occurred.
854    ///
855    /// # Type Parameters
856    ///
857    /// * `P` - The pipeline type (must implement `ExecutionPipeline`)
858    /// * `F` - Factory function to create the pipeline
859    fn execute_via_pipeline<P, F>(
860        &self,
861        stmt: &vibesql_ast::SelectStmt,
862        cte_results: &HashMap<String, CteResult>,
863        create_pipeline: F,
864        strategy_name: &str,
865    ) -> Result<Option<Vec<vibesql_storage::Row>>, ExecutorError>
866    where
867        P: ExecutionPipeline,
868        F: FnOnce() -> P,
869    {
870        #[cfg(feature = "profile-q6")]
871        let start = std::time::Instant::now();
872
873        // Check query complexity - pipelines don't support all features
874        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
875        let has_group_by = stmt.group_by.is_some();
876        let has_joins =
877            stmt.from.as_ref().is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
878        let has_order_by = stmt.order_by.is_some();
879        let has_distinct = stmt.distinct;
880        let has_set_ops = stmt.set_operation.is_some();
881        let has_window_funcs = self.has_window_functions(&stmt.select_list);
882        let has_distinct_aggregates = self.has_distinct_aggregates(&stmt.select_list);
883
884        // Create the pipeline
885        let pipeline = create_pipeline();
886
887        // Check if the pipeline supports this query pattern
888        if !pipeline.supports_query_pattern(has_aggregates, has_group_by, has_joins) {
889            log::debug!(
890                "{} pipeline doesn't support query pattern (agg={}, group_by={}, joins={})",
891                strategy_name,
892                has_aggregates,
893                has_group_by,
894                has_joins
895            );
896            return Ok(None);
897        }
898
899        // For complex queries (ORDER BY, DISTINCT, window functions, set ops, DISTINCT aggregates),
900        // fall back to full execution paths which have complete support
901        if has_order_by
902            || has_distinct
903            || has_window_funcs
904            || has_set_ops
905            || has_distinct_aggregates
906        {
907            log::debug!(
908                "{} pipeline doesn't support complex features (order_by={}, distinct={}, window={}, set_ops={}, distinct_agg={})",
909                strategy_name,
910                has_order_by,
911                has_distinct,
912                has_window_funcs,
913                has_set_ops,
914                has_distinct_aggregates
915            );
916            return Ok(None);
917        }
918
919        // Must have a FROM clause for pipeline execution
920        let from_clause = match &stmt.from {
921            Some(from) => from,
922            None => return Ok(None),
923        };
924
925        // Execute FROM clause to get input data
926        // Note: WHERE, ORDER BY, and LIMIT are handled by the pipeline, not here
927        // Note: Table elimination requires WHERE clause, so pass None for select_list too
928        let from_result = self.execute_from_with_where(
929            from_clause,
930            cte_results,
931            None, // Pipeline will apply WHERE filter
932            None, // ORDER BY handled separately
933            None, // LIMIT applied after pipeline
934            None, // No table elimination when WHERE is deferred
935        )?;
936
937        // Build execution context
938        let mut exec_ctx = ExecutionContext::new(&from_result.schema, self.database);
939        // Add outer context for correlated subqueries (#2998)
940        if let (Some(outer_row), Some(outer_schema)) = (self.outer_row, self.outer_schema) {
941            exec_ctx = exec_ctx.with_outer_context(outer_row, outer_schema);
942        }
943        // Add CTE context if available
944        if !cte_results.is_empty() {
945            exec_ctx = exec_ctx.with_cte_context(cte_results);
946        }
947
948        // Validate column references BEFORE processing
949        super::validation::validate_select_columns_with_context(
950            &stmt.select_list,
951            stmt.where_clause.as_ref(),
952            &from_result.schema,
953            self.procedural_context,
954            self.outer_schema,
955        )?;
956
957        // Prepare input from FROM result
958        let input = PipelineInput::from_rows_owned(from_result.data.into_rows());
959
960        // Execute pipeline stages with fallback on error
961        // If any pipeline stage fails with UnsupportedFeature, fall back to row-oriented
962
963        // Resolve SELECT aliases in WHERE clause (SQLite extension)
964        // This allows queries like: SELECT f1-22 AS x FROM t1 WHERE x > 0
965        // IMPORTANT: Use schema-aware resolution to avoid incorrectly substituting
966        // table column names with aggregate aliases (issue #4XXX)
967        // Example: SELECT COUNT(*) AS col1 FROM tab0 WHERE col1 > 0
968        // Here 'col1' in WHERE refers to the TABLE COLUMN, not the COUNT(*) alias
969        // PERFORMANCE: Skip alias resolution if no aliases exist (common case in OLTP)
970        let resolved_where = if crate::select::order::select_list_has_aliases(&stmt.select_list) {
971            stmt.where_clause.as_ref().map(|where_expr| {
972                crate::select::order::resolve_where_aliases_with_schema(
973                    where_expr,
974                    &stmt.select_list,
975                    &from_result.schema,
976                )
977            })
978        } else {
979            stmt.where_clause.clone()
980        };
981
982        // Stage 1: Filter (WHERE clause)
983        let filtered = match pipeline.apply_filter(input, resolved_where.as_ref(), &exec_ctx) {
984            Ok(result) => result,
985            Err(ExecutorError::UnsupportedFeature(_))
986            | Err(ExecutorError::UnsupportedExpression(_)) => {
987                log::debug!("{} pipeline filter failed, falling back", strategy_name);
988                return Ok(None);
989            }
990            Err(e) => return Err(e),
991        };
992
993        // Stage 2: Projection or Aggregation
994        let result = if has_aggregates || has_group_by {
995            // Execute aggregation (includes projection)
996            // Get GROUP BY expressions if present (as slice)
997            let group_by_slice: Option<&[vibesql_ast::Expression]> =
998                stmt.group_by.as_ref().and_then(|g| g.as_simple()).map(|v| v.as_slice());
999            match pipeline.apply_aggregation(
1000                filtered.into_input(),
1001                &stmt.select_list,
1002                group_by_slice,
1003                stmt.having.as_ref(),
1004                &exec_ctx,
1005            ) {
1006                Ok(result) => result,
1007                Err(ExecutorError::UnsupportedFeature(_))
1008                | Err(ExecutorError::UnsupportedExpression(_)) => {
1009                    log::debug!("{} pipeline aggregation failed, falling back", strategy_name);
1010                    return Ok(None);
1011                }
1012                Err(e) => return Err(e),
1013            }
1014        } else {
1015            // Execute projection only
1016            match pipeline.apply_projection(filtered.into_input(), &stmt.select_list, &exec_ctx) {
1017                Ok(result) => result,
1018                Err(ExecutorError::UnsupportedFeature(_))
1019                | Err(ExecutorError::UnsupportedExpression(_)) => {
1020                    log::debug!("{} pipeline projection failed, falling back", strategy_name);
1021                    return Ok(None);
1022                }
1023                Err(e) => return Err(e),
1024            }
1025        };
1026
1027        // Stage 3: Limit/Offset (evaluate expressions and convert to u64)
1028        let limit_usize = stmt
1029            .limit
1030            .as_ref()
1031            .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
1032            .transpose()?;
1033        let offset_usize = stmt
1034            .offset
1035            .as_ref()
1036            .map(|expr| self.eval_limit_offset_expr(expr, "OFFSET"))
1037            .transpose()?;
1038        let limit_u64 = limit_usize.map(|l| l as u64);
1039        let offset_u64 = offset_usize.map(|o| o as u64);
1040        let final_result = pipeline.apply_limit_offset(result, limit_u64, offset_u64)?;
1041
1042        #[cfg(feature = "profile-q6")]
1043        {
1044            eprintln!("[PROFILE-Q6] ✓ {} pipeline execution: {:?}", strategy_name, start.elapsed());
1045        }
1046
1047        log::debug!("✓ {} pipeline execution succeeded", strategy_name);
1048        Ok(Some(final_result))
1049    }
1050
1051    /// Check if the select list contains window functions
1052    fn has_window_functions(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
1053        select_list.iter().any(|item| {
1054            if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
1055                self.expr_has_window_function(expr)
1056            } else {
1057                false
1058            }
1059        })
1060    }
1061
1062    /// Recursively check if an expression contains a window function
1063    #[allow(clippy::only_used_in_recursion)]
1064    fn expr_has_window_function(&self, expr: &vibesql_ast::Expression) -> bool {
1065        match expr {
1066            vibesql_ast::Expression::WindowFunction { .. } => true,
1067            vibesql_ast::Expression::BinaryOp { left, right, .. } => {
1068                self.expr_has_window_function(left) || self.expr_has_window_function(right)
1069            }
1070            vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_window_function(expr),
1071            vibesql_ast::Expression::Function { args, .. } => {
1072                args.iter().any(|arg| self.expr_has_window_function(arg))
1073            }
1074            vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
1075                operand.as_ref().is_some_and(|e| self.expr_has_window_function(e))
1076                    || when_clauses.iter().any(|case_when| {
1077                        case_when.conditions.iter().any(|c| self.expr_has_window_function(c))
1078                            || self.expr_has_window_function(&case_when.result)
1079                    })
1080                    || else_result.as_ref().is_some_and(|e| self.expr_has_window_function(e))
1081            }
1082            _ => false,
1083        }
1084    }
1085
1086    /// Check if the select list contains any DISTINCT aggregates (e.g., COUNT(DISTINCT x))
1087    fn has_distinct_aggregates(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
1088        select_list.iter().any(|item| {
1089            if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
1090                self.expr_has_distinct_aggregate(expr)
1091            } else {
1092                false
1093            }
1094        })
1095    }
1096
1097    /// Recursively check if an expression contains a DISTINCT aggregate
1098    #[allow(clippy::only_used_in_recursion)]
1099    fn expr_has_distinct_aggregate(&self, expr: &vibesql_ast::Expression) -> bool {
1100        match expr {
1101            vibesql_ast::Expression::AggregateFunction { distinct, .. } => *distinct,
1102            vibesql_ast::Expression::BinaryOp { left, right, .. } => {
1103                self.expr_has_distinct_aggregate(left) || self.expr_has_distinct_aggregate(right)
1104            }
1105            vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_distinct_aggregate(expr),
1106            vibesql_ast::Expression::Function { args, .. } => {
1107                args.iter().any(|arg| self.expr_has_distinct_aggregate(arg))
1108            }
1109            vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
1110                operand.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
1111                    || when_clauses.iter().any(|case_when| {
1112                        case_when.conditions.iter().any(|c| self.expr_has_distinct_aggregate(c))
1113                            || self.expr_has_distinct_aggregate(&case_when.result)
1114                    })
1115                    || else_result.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
1116            }
1117            _ => false,
1118        }
1119    }
1120
1121    /// Execute using traditional row-oriented path
1122    ///
1123    /// This is the fallback path when columnar execution is not available or not beneficial.
1124    fn execute_row_oriented(
1125        &self,
1126        stmt: &vibesql_ast::SelectStmt,
1127        cte_results: &HashMap<String, CteResult>,
1128    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
1129        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
1130        let has_group_by = stmt.group_by.is_some();
1131
1132        if has_aggregates || has_group_by {
1133            self.execute_with_aggregation(stmt, cte_results)
1134        } else if let Some(from_clause) = &stmt.from {
1135            // Re-enabled predicate pushdown for all queries (issue #1902)
1136            //
1137            // Previously, predicate pushdown was selectively disabled for multi-column IN clauses
1138            // because index optimization happened in execute_without_aggregation() on row indices
1139            // from the FROM result. When predicate pushdown filtered rows early, the indices no
1140            // longer matched the original table, causing incorrect results.
1141            //
1142            // Now that all index optimization has been moved to the scan level
1143            // (execute_index_scan), it happens BEFORE predicate pushdown, avoiding the
1144            // row-index mismatch problem. This allows predicate pushdown to work
1145            // correctly for all queries, improving performance.
1146            //
1147            // Fixes issues #1807, #1895, #1896, and #1902.
1148
1149            // Resolve SELECT aliases in WHERE clause BEFORE predicate pushdown (SQLite extension)
1150            // This allows queries like: SELECT f1-22 AS x FROM t1 WHERE x > 0
1151            // The alias 'x' is resolved to 'f1-22' so predicate pushdown can work correctly
1152            // IMPORTANT: Use schema-aware resolution to avoid incorrectly substituting
1153            // table column names with aggregate aliases (SQLite behavior)
1154            // PERFORMANCE: Skip alias resolution if no aliases exist (common case in OLTP)
1155            let resolved_where = if stmt.where_clause.is_some()
1156                && crate::select::order::select_list_has_aliases(&stmt.select_list)
1157            {
1158                stmt.where_clause.as_ref().map(|where_expr| {
1159                    // Try to build early schema from FROM clause
1160                    if let Some(early_schema) =
1161                        super::aggregation::build_early_schema(from_clause, self.database)
1162                    {
1163                        // Use schema-aware resolution
1164                        return crate::select::order::resolve_where_aliases_with_schema(
1165                            where_expr,
1166                            &stmt.select_list,
1167                            &early_schema,
1168                        );
1169                    }
1170                    // Fall back to non-schema-aware resolution for complex FROM clauses
1171                    crate::select::order::resolve_where_aliases(where_expr, &stmt.select_list)
1172                })
1173            } else {
1174                stmt.where_clause.clone()
1175            };
1176
1177            // Pass WHERE, ORDER BY, and LIMIT to execute_from for optimization
1178            // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
1179            // Pass select_list for table elimination optimization (#3556)
1180            //
1181            // Don't pass ORDER BY if there's a set operation - it will be handled at the set
1182            // operation level
1183            let order_by_hint =
1184                if stmt.set_operation.is_some() { None } else { stmt.order_by.as_deref() };
1185            // Don't pass LIMIT hint for set operations - limit must be applied after combining
1186            // results This prevents early termination from incorrectly limiting the
1187            // left side of UNION queries
1188            let limit_val = if stmt.set_operation.is_some() {
1189                None
1190            } else {
1191                stmt.limit
1192                    .as_ref()
1193                    .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
1194                    .transpose()?
1195            };
1196            let from_result = self.execute_from_with_where(
1197                from_clause,
1198                cte_results,
1199                resolved_where.as_ref(),
1200                order_by_hint,
1201                limit_val,
1202                Some(&stmt.select_list),
1203            )?;
1204
1205            // Validate column references BEFORE processing rows (issue #2654)
1206            // This ensures column errors are caught even when tables are empty
1207            // Pass procedural context to allow procedure variables in WHERE clause
1208            // Pass outer_schema for correlated subqueries (#2694)
1209            // Note: We validate with the resolved_where since that's what gets executed
1210            super::validation::validate_select_columns_with_context(
1211                &stmt.select_list,
1212                resolved_where.as_ref(),
1213                &from_result.schema,
1214                self.procedural_context,
1215                self.outer_schema,
1216            )?;
1217
1218            self.execute_without_aggregation(stmt, from_result, cte_results)
1219        } else {
1220            // SELECT without FROM - evaluate expressions as a single row
1221            self.execute_select_without_from(stmt)
1222        }
1223    }
1224
1225    /// Execute a FROM clause with WHERE, ORDER BY, and LIMIT for optimization
1226    ///
1227    /// The LIMIT parameter enables early termination optimization (#3253):
1228    /// - When ORDER BY is satisfied by an index and no post-filter is needed, the index scan can
1229    ///   stop after fetching LIMIT rows
1230    ///
1231    /// Note: Table elimination (#3556) is now handled at the optimizer level
1232    /// via crate::optimizer::eliminate_unused_tables(), which runs before
1233    /// semi-join transformation to avoid complex interactions.
1234    pub(super) fn execute_from_with_where(
1235        &self,
1236        from: &vibesql_ast::FromClause,
1237        cte_results: &HashMap<String, CteResult>,
1238        where_clause: Option<&vibesql_ast::Expression>,
1239        order_by: Option<&[vibesql_ast::OrderByItem]>,
1240        limit: Option<usize>,
1241        _select_list: Option<&[vibesql_ast::SelectItem]>, /* No longer used - optimization moved
1242                                                           * to optimizer pass */
1243    ) -> Result<FromResult, ExecutorError> {
1244        use crate::select::scan::execute_from_clause;
1245
1246        let from_result = execute_from_clause(
1247            from,
1248            cte_results,
1249            self.database,
1250            where_clause,
1251            order_by,
1252            limit,
1253            self.outer_row,
1254            self.outer_schema,
1255            |query| {
1256                // For derived table subqueries, create a child executor with:
1257                // 1. CTE context (allows CTEs from outer WITH clause to be referenced)
1258                // 2. Outer context (allows correlated columns from outer queries to be resolved)
1259                // Critical for:
1260                // - TPC-DS Q2: CTEs in FROM subqueries
1261                // - select1-18.x: Nested correlated subqueries referencing outer columns
1262                if !cte_results.is_empty() {
1263                    // FIX for select1-18.x: When both CTE and outer context exist,
1264                    // we need to use new_with_outer_and_cte_and_depth to pass both
1265                    if let (Some(outer_row), Some(outer_schema)) =
1266                        (self.outer_row, self.outer_schema)
1267                    {
1268                        let child = SelectExecutor::new_with_outer_and_cte_and_depth(
1269                            self.database,
1270                            outer_row,
1271                            outer_schema,
1272                            cte_results,
1273                            self.subquery_depth,
1274                        );
1275                        child.execute_with_columns(query)
1276                    } else {
1277                        let child = SelectExecutor::new_with_cte_and_depth(
1278                            self.database,
1279                            cte_results,
1280                            self.subquery_depth,
1281                        );
1282                        child.execute_with_columns(query)
1283                    }
1284                } else if let (Some(outer_row), Some(outer_schema)) =
1285                    (self.outer_row, self.outer_schema)
1286                {
1287                    // FIX for select1-18.x: Pass outer context for derived table subqueries
1288                    // This enables column resolution from outer scopes in deeply nested queries
1289                    let child = SelectExecutor::new_with_outer_context_and_depth(
1290                        self.database,
1291                        outer_row,
1292                        outer_schema,
1293                        self.subquery_depth,
1294                    );
1295                    child.execute_with_columns(query)
1296                } else {
1297                    self.execute_with_columns(query)
1298                }
1299            },
1300        )?;
1301
1302        // NOTE: We DON'T merge outer schema with from_result.schema here because:
1303        // 1. from_result.rows only contain values from inner tables
1304        // 2. Outer columns are resolved via the evaluator's outer_row/outer_schema
1305        // 3. Merging would create schema/row mismatch (schema has outer cols, rows don't)
1306
1307        Ok(from_result)
1308    }
1309}