vibesql_executor/select/executor/execute.rs
1//! Main execution methods for SelectExecutor
2//!
3//! This module implements the unified execution dispatcher that routes queries
4//! to the appropriate execution pipeline based on the selected strategy.
5//!
6//! ## Execution Pipeline Architecture
7//!
8//! The dispatcher uses the `ExecutionPipeline` trait to provide a unified interface
9//! for query execution across different strategies:
10//!
11//! - **NativeColumnar**: Zero-copy SIMD execution from columnar storage
12//! - **StandardColumnar**: SIMD execution with row-to-batch conversion
13//! - **RowOriented**: Traditional row-by-row execution
14//! - **ExpressionOnly**: SELECT without FROM clause (special case)
15//!
16//! ```text
17//! Strategy Selection → Create Pipeline → Execute Pipeline Stages → Results
18//! ↓
19//! apply_filter → apply_projection → apply_aggregation → apply_limit_offset
20//! ```
21
22use std::collections::HashMap;
23
24use super::builder::SelectExecutor;
25use crate::{
26 errors::ExecutorError,
27 optimizer::adaptive::{choose_execution_strategy, ExecutionStrategy, StrategyContext},
28 pipeline::{
29 ColumnarPipeline, ExecutionContext, ExecutionPipeline, NativeColumnarPipeline,
30 PipelineInput,
31 },
32 select::{
33 cte::{execute_ctes, execute_ctes_with_memory_check, CteResult},
34 helpers::apply_limit_offset,
35 join::FromResult,
36 SelectResult,
37 },
38};
39
40impl SelectExecutor<'_> {
41 /// Execute a SELECT statement
42 pub fn execute(
43 &self,
44 stmt: &vibesql_ast::SelectStmt,
45 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
46 // Validate aggregate function argument counts FIRST (issue #4367)
47 // This catches errors like max(), min(*), sum(*) before any execution
48 // Must happen before any fast paths or strategy selection
49 super::validation::validate_aggregate_arguments(&stmt.select_list)?;
50
51 // Validate no nested aggregates (issue #4439)
52 // This catches errors like SUM(MIN(x)) before any execution
53 // Uses "misuse of aggregate function X()" format to match SQLite's resolve.c
54 super::validation::validate_no_nested_aggregates(&stmt.select_list)?;
55
56 // Validate join table limit (issue #4711)
57 // SQLite enforces a limit of 64 tables in a single join
58 // This catches queries like SELECT * FROM t, t, t, ... (65+ times)
59 super::validation::validate_join_table_limit(stmt)?;
60
61 // Validate aggregates with outer column references in subqueries (issue #4853)
62 // This catches the specific case where a scalar subquery appears as an argument
63 // to an outer aggregate function, and the subquery's aggregate references an
64 // outer column. Example: SELECT max((SELECT count(x) FROM t35b)) FROM t35a;
65 // Note: Standalone correlated subqueries are valid and allowed, e.g.,
66 // SELECT (SELECT count(x) FROM t35b) FROM t35a; is valid in SQLite.
67 super::validation::validate_aggregate_subquery_outer_refs(stmt, self.database)?;
68
69 #[cfg(feature = "profile-q6")]
70 let execute_start = std::time::Instant::now();
71
72 // Reset arena and clear pointer-based caches for fresh query execution (only at top level)
73 // The subquery hash cache uses pointer-based keys which become invalid when ASTs are
74 // dropped and memory is reused, so we must clear it between queries.
75 // Note: The IN subquery result cache and correlation cache use content hashes as keys,
76 // not pointers, so they can safely persist across queries.
77 if self.subquery_depth == 0 {
78 self.reset_arena();
79 crate::evaluator::caching::clear_subquery_hash_cache();
80 }
81
82 // Check timeout before starting execution
83 self.check_timeout()?;
84
85 // Check subquery depth limit to prevent stack overflow
86 if self.subquery_depth >= crate::limits::MAX_EXPRESSION_DEPTH {
87 return Err(ExecutorError::ExpressionDepthExceeded {
88 depth: self.subquery_depth,
89 max_depth: crate::limits::MAX_EXPRESSION_DEPTH,
90 });
91 }
92
93 // Fast path for simple point-lookup queries (TPC-C optimization)
94 // This bypasses expensive optimizer passes for queries like:
95 // SELECT col FROM table WHERE pk = value
96 // Skip fast path if reverse_unordered_selects is ON and no ORDER BY (needs reversal)
97 let skip_fast_path_for_reversal =
98 self.database.reverse_unordered_selects() && stmt.order_by.is_none();
99 if self.subquery_depth == 0
100 && self.outer_row.is_none()
101 && self.cte_context.is_none()
102 && !skip_fast_path_for_reversal
103 && super::fast_path::is_simple_point_query(stmt)
104 {
105 return self.execute_fast_path(stmt);
106 }
107
108 // Streaming aggregate fast path (#3815)
109 // For queries like: SELECT SUM(k) FROM sbtest1 WHERE id BETWEEN ? AND ?
110 // Accumulates aggregates inline during PK range scan without materializing rows
111 if self.subquery_depth == 0
112 && self.outer_row.is_none()
113 && self.cte_context.is_none()
114 && !skip_fast_path_for_reversal
115 && super::fast_path::is_streaming_aggregate_query(stmt)
116 {
117 if let Ok(result) = self.execute_streaming_aggregate(stmt) {
118 return Ok(result);
119 }
120 // Fall through to standard path if streaming aggregate fails
121 }
122
123 #[cfg(feature = "profile-q6")]
124 let _setup_time = execute_start.elapsed();
125
126 // Apply subquery rewriting optimizations (Phase 2 of IN subquery optimization)
127 // - Rewrites correlated IN → EXISTS with LIMIT 1 for early termination
128 // - Adds DISTINCT to uncorrelated IN subqueries to reduce duplicate processing
129 // This works in conjunction with Phase 1 (HashSet optimization, #2136)
130 #[cfg(feature = "profile-q6")]
131 let optimizer_start = std::time::Instant::now();
132
133 let optimized_stmt = crate::optimizer::rewrite_subquery_optimizations(stmt);
134
135 // Apply scalar subquery decorrelation (#4760)
136 // Transforms correlated scalar subqueries with aggregates (e.g., AVG, SUM, MIN)
137 // into CTE + JOIN patterns for O(n) instead of O(n²) execution.
138 // Example: WHERE x > 1.2 * (SELECT AVG(y) FROM t WHERE t.c = outer.c)
139 // Becomes: WITH _cte AS (SELECT c, AVG(y) FROM t GROUP BY c)
140 // ... JOIN _cte ON outer.c = _cte.c WHERE x > 1.2 * _cte._avg
141 let optimized_stmt = crate::optimizer::apply_scalar_decorrelation(&optimized_stmt);
142
143 #[cfg(feature = "profile-q6")]
144 let _optimizer_time = optimizer_start.elapsed();
145
146 // Eliminate unused tables that create unnecessary cross joins (#3556)
147 // Must run BEFORE semi-join transformation to avoid complex interactions
148 // with derived tables from EXISTS/IN transformations
149 let optimized_stmt = crate::optimizer::eliminate_unused_tables(&optimized_stmt);
150
151 // Transform decorrelated IN/EXISTS subqueries to semi/anti-joins (#2424)
152 // This enables hash-based join execution instead of row-by-row subquery evaluation
153 // Converts WHERE clauses like "WHERE x IN (SELECT y FROM t)" to "SEMI JOIN t ON x = y"
154 let optimized_stmt = crate::optimizer::transform_subqueries_to_joins(&optimized_stmt);
155
156 // Execute CTEs if present and merge with outer query's CTE context
157 let mut cte_results = if let Some(with_clause) = &optimized_stmt.with_clause {
158 // This query has its own CTEs - execute them with memory tracking
159 execute_ctes_with_memory_check(
160 with_clause,
161 |query, cte_ctx| self.execute_with_ctes(query, cte_ctx),
162 |size| self.track_memory_allocation(size),
163 )?
164 } else {
165 HashMap::new()
166 };
167
168 // If we have access to outer query's CTEs (for subqueries), merge them in
169 // Local CTEs take precedence over outer CTEs if there are name conflicts
170 if let Some(outer_cte_ctx) = self.cte_context {
171 for (name, result) in outer_cte_ctx {
172 cte_results.entry(name.clone()).or_insert_with(|| result.clone());
173 }
174 }
175
176 #[cfg(feature = "profile-q6")]
177 let _pre_execute_time = execute_start.elapsed();
178
179 // Execute the main query with CTE context
180 let mut result = self.execute_with_ctes(&optimized_stmt, &cte_results)?;
181
182 #[cfg(feature = "profile-q6")]
183 {
184 let _total_execute = execute_start.elapsed();
185 }
186
187 // Apply PRAGMA reverse_unordered_selects if enabled
188 // Only reverse if there's no ORDER BY clause in the original statement
189 if stmt.order_by.is_none() && self.database.reverse_unordered_selects() {
190 result.reverse();
191 }
192
193 Ok(result)
194 }
195
196 /// Execute a SELECT statement and return an iterator over results
197 ///
198 /// This enables early termination when the full result set is not needed,
199 /// such as for IN subqueries where we stop after finding the first match.
200 ///
201 /// # Phase 1 Implementation (Early Termination for IN subqueries)
202 ///
203 /// Current implementation materializes results then returns an iterator.
204 /// This still enables early termination in the consumer (e.g., eval_in_subquery)
205 /// by stopping iteration when a match is found.
206 ///
207 /// Future optimization: Leverage the existing RowIterator infrastructure
208 /// (crate::select::iterator) for truly lazy evaluation that stops execution
209 /// early, not just iteration.
210 pub fn execute_iter(
211 &self,
212 stmt: &vibesql_ast::SelectStmt,
213 ) -> Result<impl Iterator<Item = vibesql_storage::Row>, ExecutorError> {
214 // For Phase 1, materialize then return iterator
215 // This still enables early termination in the consumer
216 let rows = self.execute(stmt)?;
217 Ok(rows.into_iter())
218 }
219
220 /// Execute a SELECT statement using the fast path directly
221 ///
222 /// This method is used by prepared statements with cached SimpleFastPath plans.
223 /// It bypasses the `is_simple_point_query()` check because the eligibility was
224 /// already determined at prepare time.
225 ///
226 /// # Performance
227 ///
228 /// For repeated execution of prepared statements, this saves the cost of
229 /// re-checking fast path eligibility on every execution (~5-10µs per query).
230 pub fn execute_fast_path_with_columns(
231 &self,
232 stmt: &vibesql_ast::SelectStmt,
233 ) -> Result<SelectResult, ExecutorError> {
234 // Reset arena for fresh query execution
235 if self.subquery_depth == 0 {
236 self.reset_arena();
237 }
238
239 // Check timeout before starting execution
240 self.check_timeout()?;
241
242 // Execute via fast path directly (skip is_simple_point_query check)
243 let rows = self.execute_fast_path(stmt)?;
244
245 // Derive column names from the SELECT list
246 // For fast path queries, we don't have a FromResult, so pass None
247 // The column derivation will use the SELECT list expressions directly
248 let columns = self.derive_fast_path_column_names(stmt)?;
249
250 Ok(SelectResult { columns, rows })
251 }
252
253 /// Derive column names for fast path execution
254 ///
255 /// For fast path queries, we derive column names directly from the SELECT list
256 /// and table schema without going through the full FROM clause execution.
257 ///
258 /// # Performance Note (#3780)
259 ///
260 /// This method is called by `Session::execute_prepared()` to cache column names
261 /// in `SimpleFastPathPlan`. After the first execution, cached column names are
262 /// reused to avoid repeated table lookups and column name derivation.
263 pub fn derive_fast_path_column_names(
264 &self,
265 stmt: &vibesql_ast::SelectStmt,
266 ) -> Result<Vec<String>, ExecutorError> {
267 use vibesql_ast::{FromClause, SelectItem};
268
269 // Get table name and schema for column resolution
270 let (table_name, table_alias) = match &stmt.from {
271 Some(FromClause::Table { name, alias, .. }) => (name.as_str(), alias.as_deref()),
272 _ => {
273 return Err(ExecutorError::Other(
274 "Fast path requires simple table FROM clause".to_string(),
275 ))
276 }
277 };
278
279 let table = self
280 .database
281 .get_table(table_name)
282 .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
283
284 let mut columns = Vec::with_capacity(stmt.select_list.len());
285
286 for item in &stmt.select_list {
287 match item {
288 SelectItem::Wildcard { .. } => {
289 // Add all columns from the table
290 for col in &table.schema.columns {
291 columns.push(col.name.clone());
292 }
293 }
294 SelectItem::QualifiedWildcard { qualifier, .. } => {
295 // Check if qualifier matches table name or alias
296 let effective_name = table_alias.unwrap_or(table_name);
297 if qualifier.eq_ignore_ascii_case(effective_name)
298 || qualifier.eq_ignore_ascii_case(table_name)
299 {
300 for col in &table.schema.columns {
301 columns.push(col.name.clone());
302 }
303 }
304 }
305 SelectItem::Expression { expr, alias: col_alias, .. } => {
306 // Use alias if provided, otherwise derive from expression
307 let col_name = if let Some(a) = col_alias {
308 a.clone()
309 } else {
310 self.derive_column_name_from_expr(expr)
311 };
312 columns.push(col_name);
313 }
314 }
315 }
316
317 Ok(columns)
318 }
319
320 /// Derive a column name from an expression
321 fn derive_column_name_from_expr(&self, expr: &vibesql_ast::Expression) -> String {
322 match expr {
323 vibesql_ast::Expression::ColumnRef(col_id) => col_id.column_canonical().to_string(),
324 vibesql_ast::Expression::Literal(val) => format!("{}", val),
325 vibesql_ast::Expression::Wildcard => "*".to_string(),
326 _ => "?column?".to_string(),
327 }
328 }
329
330 /// Execute a SELECT statement and return both columns and rows
331 pub fn execute_with_columns(
332 &self,
333 stmt: &vibesql_ast::SelectStmt,
334 ) -> Result<SelectResult, ExecutorError> {
335 // Resolve SELECT aliases in WHERE clause BEFORE predicate pushdown (SQLite extension)
336 // This allows queries like: SELECT f1-22 AS x FROM t1 WHERE x > 0
337 // IMPORTANT: Use schema-aware resolution to avoid incorrectly substituting
338 // table column names with aggregate aliases (SQLite behavior)
339 // Example: SELECT COUNT(*) AS col1 FROM tab0 WHERE col1 > 0
340 // Here 'col1' in WHERE refers to the TABLE COLUMN, not the COUNT(*) alias
341 // PERFORMANCE: Skip alias resolution if no aliases exist (common case in OLTP)
342 let resolved_where = if stmt.where_clause.is_some()
343 && crate::select::order::select_list_has_aliases(&stmt.select_list)
344 {
345 stmt.where_clause.as_ref().map(|where_expr| {
346 // Try to build early schema from FROM clause
347 if let Some(from_clause) = &stmt.from {
348 if let Some(early_schema) =
349 super::aggregation::build_early_schema(from_clause, self.database)
350 {
351 // Use schema-aware resolution
352 return crate::select::order::resolve_where_aliases_with_schema(
353 where_expr,
354 &stmt.select_list,
355 &early_schema,
356 );
357 }
358 }
359 // Fall back to non-schema-aware resolution for complex FROM clauses
360 crate::select::order::resolve_where_aliases(where_expr, &stmt.select_list)
361 })
362 } else {
363 stmt.where_clause.clone()
364 };
365
366 // First, get the FROM result to access the schema
367 let from_result = if let Some(from_clause) = &stmt.from {
368 let mut cte_results = if let Some(with_clause) = &stmt.with_clause {
369 execute_ctes(with_clause, |query, cte_ctx| self.execute_with_ctes(query, cte_ctx))?
370 } else {
371 HashMap::new()
372 };
373 // If we have access to outer query's CTEs (for subqueries/derived tables), merge them
374 // in Local CTEs take precedence over outer CTEs if there are name conflicts
375 // This is critical for queries like TPC-DS Q2 where CTEs are referenced from derived
376 // tables
377 if let Some(outer_cte_ctx) = self.cte_context {
378 for (name, result) in outer_cte_ctx {
379 cte_results.entry(name.clone()).or_insert_with(|| result.clone());
380 }
381 }
382 // Pass WHERE, ORDER BY, and LIMIT for optimizations
383 // This is critical for GROUP BY queries to avoid CROSS JOINs
384 // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
385 // Pass select_list for table elimination optimization (#3556)
386 let limit_val = stmt
387 .limit
388 .as_ref()
389 .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
390 .transpose()?;
391 Some(self.execute_from_with_where(
392 from_clause,
393 &cte_results,
394 resolved_where.as_ref(),
395 stmt.order_by.as_deref(),
396 limit_val,
397 Some(&stmt.select_list),
398 )?)
399 } else {
400 None
401 };
402
403 // Derive column names from the SELECT list (with table prefix for display)
404 // Issue #4696: For VALUES statements, select_list is empty - derive from VALUES rows
405 let columns = if stmt.select_list.is_empty() {
406 if let Some(values_rows) = &stmt.values {
407 // Generate column names: column1, column2, etc.
408 let num_cols = values_rows.first().map(|r| r.len()).unwrap_or(0);
409 (1..=num_cols).map(|i| format!("column{}", i)).collect()
410 } else {
411 // Empty select_list and no VALUES - return empty columns
412 Vec::new()
413 }
414 } else {
415 self.derive_column_names(&stmt.select_list, from_result.as_ref())?
416 };
417
418 // Execute the query to get rows
419 let rows = self.execute(stmt)?;
420
421 Ok(SelectResult { columns, rows })
422 }
423
424 /// Execute SELECT statement and return results with simple column names
425 ///
426 /// This is similar to `execute_with_columns` but returns column names without
427 /// table prefixes. This is used for internal purposes like view creation
428 /// where the full table.column format would cause column lookup issues.
429 pub fn execute_with_simple_columns(
430 &self,
431 stmt: &vibesql_ast::SelectStmt,
432 ) -> Result<SelectResult, ExecutorError> {
433 // Resolve SELECT aliases in WHERE clause BEFORE predicate pushdown (SQLite extension)
434 // This allows queries like: SELECT f1-22 AS x FROM t1 WHERE x > 0
435 // IMPORTANT: Use schema-aware resolution to avoid incorrectly substituting
436 // table column names with aggregate aliases (SQLite behavior)
437 // PERFORMANCE: Skip alias resolution if no aliases exist (common case in OLTP)
438 let resolved_where = if stmt.where_clause.is_some()
439 && crate::select::order::select_list_has_aliases(&stmt.select_list)
440 {
441 stmt.where_clause.as_ref().map(|where_expr| {
442 // Try to build early schema from FROM clause
443 if let Some(from_clause) = &stmt.from {
444 if let Some(early_schema) =
445 super::aggregation::build_early_schema(from_clause, self.database)
446 {
447 // Use schema-aware resolution
448 return crate::select::order::resolve_where_aliases_with_schema(
449 where_expr,
450 &stmt.select_list,
451 &early_schema,
452 );
453 }
454 }
455 // Fall back to non-schema-aware resolution for complex FROM clauses
456 crate::select::order::resolve_where_aliases(where_expr, &stmt.select_list)
457 })
458 } else {
459 stmt.where_clause.clone()
460 };
461
462 // Execute the FROM clause to get combined schema
463 let from_result = if let Some(from_clause) = &stmt.from {
464 let mut cte_results = if let Some(with_clause) = &stmt.with_clause {
465 execute_ctes(with_clause, |query, cte_ctx| self.execute_with_ctes(query, cte_ctx))?
466 } else {
467 HashMap::new()
468 };
469 // If we have access to outer query's CTEs (for subqueries/derived tables), merge them
470 if let Some(outer_cte_ctx) = self.cte_context {
471 for (name, result) in outer_cte_ctx {
472 cte_results.entry(name.clone()).or_insert_with(|| result.clone());
473 }
474 }
475 let limit_val = stmt
476 .limit
477 .as_ref()
478 .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
479 .transpose()?;
480 Some(self.execute_from_with_where(
481 from_clause,
482 &cte_results,
483 resolved_where.as_ref(),
484 stmt.order_by.as_deref(),
485 limit_val,
486 Some(&stmt.select_list),
487 )?)
488 } else {
489 None
490 };
491
492 // Derive column names from the SELECT list (without table prefix)
493 // Issue #4696: For VALUES statements, select_list is empty - derive from VALUES rows
494 let columns = if stmt.select_list.is_empty() {
495 if let Some(values_rows) = &stmt.values {
496 // Generate column names: column1, column2, etc.
497 let num_cols = values_rows.first().map(|r| r.len()).unwrap_or(0);
498 (1..=num_cols).map(|i| format!("column{}", i)).collect()
499 } else {
500 // Empty select_list and no VALUES - return empty columns
501 Vec::new()
502 }
503 } else {
504 self.derive_simple_column_names(&stmt.select_list, from_result.as_ref())?
505 };
506
507 // Execute the query to get rows
508 let rows = self.execute(stmt)?;
509
510 Ok(SelectResult { columns, rows })
511 }
512
513 /// Execute SELECT statement with CTE context
514 ///
515 /// Uses unified strategy selection to determine the optimal execution path:
516 /// - NativeColumnar: Zero-copy SIMD execution from columnar storage
517 /// - StandardColumnar: SIMD execution with row-to-batch conversion
518 /// - RowOriented: Traditional row-by-row execution
519 /// - ExpressionOnly: SELECT without FROM clause (special case)
520 ///
521 /// ## Pipeline-Based Execution (Phase 5)
522 ///
523 /// This method uses the `ExecutionPipeline` trait to provide a unified interface
524 /// for query execution. Each strategy creates an appropriate pipeline that
525 /// implements filter, projection, aggregation, and limit/offset operations.
526 ///
527 /// ```text
528 /// Strategy Selection → Create Pipeline → Execute via Trait Methods
529 /// ↓
530 /// NativeColumnar → NativeColumnarPipeline::apply_*()
531 /// StandardColumnar → ColumnarPipeline::apply_*()
532 /// RowOriented → RowOrientedPipeline::apply_*()
533 /// ExpressionOnly → Special case (no table scan)
534 /// ```
535 pub(super) fn execute_with_ctes(
536 &self,
537 stmt: &vibesql_ast::SelectStmt,
538 cte_results: &HashMap<String, CteResult>,
539 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
540 // Note: Aggregate argument validation is done in execute() at the entry point.
541 // See issue #4367.
542
543 #[cfg(feature = "profile-q6")]
544 let _execute_ctes_start = std::time::Instant::now();
545
546 // Check if native columnar is enabled via feature flag or env var
547 let native_columnar_enabled =
548 cfg!(feature = "native-columnar") || std::env::var("VIBESQL_NATIVE_COLUMNAR").is_ok();
549
550 // Use unified strategy selection for the execution path
551 let strategy_ctx = StrategyContext::new(stmt, cte_results, native_columnar_enabled);
552 let strategy = choose_execution_strategy(&strategy_ctx);
553
554 log::debug!(
555 "Execution strategy selected: {} (reason: {})",
556 strategy.name(),
557 strategy.score().reason
558 );
559
560 #[cfg(feature = "profile-q6")]
561 eprintln!(
562 "[PROFILE-Q6] Execution strategy: {} ({})",
563 strategy.name(),
564 strategy.score().reason
565 );
566
567 // Dispatch based on selected strategy using ExecutionPipeline trait
568 // Pipeline execution returns Option<Vec<Row>> - None means fallback needed
569 let mut results = match strategy {
570 ExecutionStrategy::NativeColumnar { .. } => {
571 // First try the optimized zero-copy native columnar path
572 // This uses ColumnarBatch::from_storage_columnar() for zero-copy conversion
573 // and executes filter+aggregate in a single pass without row materialization
574 if let Some(result) = self.try_native_columnar_execution(stmt, cte_results)? {
575 #[cfg(feature = "profile-q6")]
576 eprintln!("[PROFILE-Q6] Native columnar: zero-copy path succeeded");
577 result
578 } else {
579 // Fall back to pipeline-based execution if zero-copy path is not applicable
580 // (e.g., complex predicates, multiple tables, unsupported aggregates)
581 log::debug!("Native columnar: zero-copy path not applicable, trying pipeline");
582 match self.execute_via_pipeline(
583 stmt,
584 cte_results,
585 NativeColumnarPipeline::new,
586 "NativeColumnar",
587 )? {
588 Some(result) => result,
589 None => {
590 // Fall back to row-oriented if pipeline also fails
591 log::debug!("Native columnar runtime fallback to row-oriented");
592 #[cfg(feature = "profile-q6")]
593 eprintln!("[PROFILE-Q6] Native columnar fallback to row-oriented");
594 self.execute_row_oriented(stmt, cte_results)?
595 }
596 }
597 }
598 }
599
600 ExecutionStrategy::StandardColumnar { .. } => {
601 // StandardColumnar uses the pipeline-based execution path
602 // Note: We don't use try_native_columnar_execution here because row tables
603 // go through the pipeline which correctly handles all data types including dates.
604 // The native columnar zero-copy path has known limitations with certain date
605 // comparisons.
606 match self.execute_via_pipeline(
607 stmt,
608 cte_results,
609 ColumnarPipeline::new,
610 "StandardColumnar",
611 )? {
612 Some(result) => result,
613 None => {
614 log::debug!("Standard columnar runtime fallback to row-oriented");
615 #[cfg(feature = "profile-q6")]
616 eprintln!("[PROFILE-Q6] Standard columnar fallback to row-oriented");
617 self.execute_row_oriented(stmt, cte_results)?
618 }
619 }
620 }
621
622 ExecutionStrategy::RowOriented { .. } => {
623 // Row-oriented uses the traditional path which has full feature support
624 // The RowOrientedPipeline is used for simpler queries, but complex
625 // queries (with JOINs, window functions, DISTINCT, etc.) need the
626 // full execute_row_oriented implementation
627
628 // Phase 4: Try columnar join execution for multi-table JOIN queries (#2943)
629 // This provides 3-5x speedup for TPC-H Q3 style queries
630 let has_joins = stmt
631 .from
632 .as_ref()
633 .is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
634 if has_joins {
635 if let Some(result) = self.try_columnar_join_execution(stmt, cte_results)? {
636 log::info!("Columnar join execution succeeded");
637 // Apply LIMIT/OFFSET to columnar join results (#3776)
638 // Skip if set_operation exists - it will be applied later
639 if stmt.set_operation.is_none() {
640 let limit_val = stmt
641 .limit
642 .as_ref()
643 .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
644 .transpose()?;
645 let offset_val = stmt
646 .offset
647 .as_ref()
648 .map(|expr| self.eval_limit_offset_expr(expr, "OFFSET"))
649 .transpose()?;
650 apply_limit_offset(result, limit_val, offset_val)
651 } else {
652 result
653 }
654 } else {
655 log::debug!(
656 "Columnar join execution not applicable, falling back to row-oriented"
657 );
658 self.execute_row_oriented(stmt, cte_results)?
659 }
660 } else {
661 self.execute_row_oriented(stmt, cte_results)?
662 }
663 }
664
665 ExecutionStrategy::ExpressionOnly { .. } => {
666 // SELECT without FROM - special case that doesn't use pipelines
667 // May still have aggregates (e.g., SELECT COUNT(*), SELECT MAX(1))
668 // Note: Do NOT use early return here - we need to fall through to set operations
669 // handling
670 self.execute_expression_only(stmt, cte_results)?
671 }
672 };
673
674 // Handle set operations (UNION, INTERSECT, EXCEPT)
675 // Process operations left-to-right to ensure correct associativity
676 if let Some(set_op) = &stmt.set_operation {
677 // Extract collations from the leftmost SELECT list for set operation comparisons
678 // Use schema-aware lookup to get column collations from CREATE TABLE definitions
679 let collations = Self::extract_collations_from_select_list_with_schema(
680 &stmt.select_list,
681 Some(self.database),
682 stmt.from.as_ref(),
683 );
684 // Issue #4602: Compute left column count from AST for schema-level validation
685 // This is needed when the left result set is empty (table has no rows)
686 // Issue #4922: Must propagate errors (not use .ok()) to catch column count mismatches
687 // in set operations like UNION/INTERSECT/EXCEPT
688 let left_col_count = super::nonagg::compute_select_list_column_count(
689 stmt,
690 self.database,
691 Some(cte_results),
692 )?;
693 let left_col_count = Some(left_col_count);
694 results = self.execute_set_operations(
695 results,
696 set_op,
697 cte_results,
698 &collations,
699 left_col_count,
700 )?;
701
702 // Apply ORDER BY after set operations (if specified)
703 // The UNION's default sort is overridden by explicit ORDER BY
704 if let Some(order_by) = &stmt.order_by {
705 // Collect aliases from all UNION branches for ORDER BY resolution
706 // Pass database to enable wildcard expansion using table schemas
707 let all_aliases = super::set_operations::collect_union_aliases(self.database, stmt);
708 // Pass column collations from the first SELECT for collation-aware sorting
709 results = self.sort_set_operation_results(
710 results,
711 order_by,
712 &stmt.select_list,
713 &all_aliases,
714 &collations,
715 )?;
716 }
717
718 // Apply LIMIT/OFFSET to the final result (after all set operations and ORDER BY)
719 // For queries WITHOUT set operations, LIMIT/OFFSET is already applied
720 // in execute_without_aggregation() or execute_with_aggregation()
721 let limit_val = stmt
722 .limit
723 .as_ref()
724 .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
725 .transpose()?;
726 let offset_val = stmt
727 .offset
728 .as_ref()
729 .map(|expr| self.eval_limit_offset_expr(expr, "OFFSET"))
730 .transpose()?;
731 results = apply_limit_offset(results, limit_val, offset_val);
732 }
733
734 Ok(results)
735 }
736
737 /// Execute SELECT without FROM clause (ExpressionOnly strategy)
738 ///
739 /// This is a special case that doesn't use the pipeline trait since there's
740 /// no table scan involved. Handles both simple expressions, aggregates,
741 /// and standalone VALUES statements.
742 fn execute_expression_only(
743 &self,
744 stmt: &vibesql_ast::SelectStmt,
745 cte_results: &HashMap<String, CteResult>,
746 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
747 // Handle standalone VALUES statements (Issue #4546)
748 // VALUES(1,2), (3,4) returns rows directly without a SELECT list
749 if let Some(values_rows) = &stmt.values {
750 let from_result = crate::select::scan::values::execute_values(
751 values_rows,
752 "_values_",
753 None,
754 Some(self.database),
755 )?;
756 let mut results = from_result.into_rows();
757
758 // Issue #4696: If there's a set_operation, don't apply ORDER BY or LIMIT/OFFSET here
759 // Let the caller handle them after set operations are processed
760 if stmt.set_operation.is_some() {
761 return Ok(results);
762 }
763
764 // Apply ORDER BY if specified
765 if let Some(order_by) = &stmt.order_by {
766 // For VALUES, column names are column1, column2, etc.
767 // We need to map ORDER BY expressions to column indices
768 results = self.apply_values_order_by(results, order_by, values_rows)?;
769 }
770
771 // Apply LIMIT/OFFSET
772 let limit_val = stmt
773 .limit
774 .as_ref()
775 .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
776 .transpose()?;
777 let offset_val = stmt
778 .offset
779 .as_ref()
780 .map(|expr| self.eval_limit_offset_expr(expr, "OFFSET"))
781 .transpose()?;
782 return Ok(apply_limit_offset(results, limit_val, offset_val));
783 }
784
785 let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
786
787 if has_aggregates {
788 // Aggregates without FROM need the aggregation path
789 self.execute_with_aggregation(stmt, cte_results)
790 } else {
791 // Simple expression evaluation (e.g., SELECT 1 + 1)
792 self.execute_select_without_from(stmt)
793 }
794 }
795
796 /// Apply ORDER BY to VALUES statement results
797 fn apply_values_order_by(
798 &self,
799 mut rows: Vec<vibesql_storage::Row>,
800 order_by: &[vibesql_ast::OrderByItem],
801 values_rows: &[Vec<vibesql_ast::Expression>],
802 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
803 // For VALUES, we sort by column position (1-indexed)
804 // or by column name (column1, column2, etc.)
805 let num_cols = values_rows.first().map(|r| r.len()).unwrap_or(0);
806
807 rows.sort_by(|a, b| {
808 for item in order_by {
809 // Get the column index from the ORDER BY expression
810 let col_idx = match &item.expr {
811 vibesql_ast::Expression::Literal(vibesql_types::SqlValue::Integer(n)) => {
812 // 1-indexed column position
813 (*n as usize).saturating_sub(1)
814 }
815 vibesql_ast::Expression::ColumnRef(col_id) => {
816 // column1, column2, etc.
817 let col_name = col_id.column_canonical();
818 if let Some(stripped) = col_name.strip_prefix("column") {
819 stripped.parse::<usize>().unwrap_or(0).saturating_sub(1)
820 } else {
821 0
822 }
823 }
824 _ => 0, // Default to first column for complex expressions
825 };
826
827 if col_idx < num_cols {
828 let cmp = a.values[col_idx].partial_cmp(&b.values[col_idx]);
829 if let Some(ord) = cmp {
830 let ord = match item.direction {
831 vibesql_ast::OrderDirection::Asc => ord,
832 vibesql_ast::OrderDirection::Desc => ord.reverse(),
833 };
834 if ord != std::cmp::Ordering::Equal {
835 return ord;
836 }
837 }
838 }
839 }
840 std::cmp::Ordering::Equal
841 });
842
843 Ok(rows)
844 }
845
846 /// Execute a query using the specified execution pipeline
847 ///
848 /// This method provides a unified interface for pipeline-based execution.
849 /// It creates the pipeline, prepares input, and executes the pipeline stages.
850 ///
851 /// Returns `Ok(Some(results))` if the pipeline executed successfully,
852 /// `Ok(None)` if the pipeline cannot handle the query (fallback needed),
853 /// or `Err` if an error occurred.
854 ///
855 /// # Type Parameters
856 ///
857 /// * `P` - The pipeline type (must implement `ExecutionPipeline`)
858 /// * `F` - Factory function to create the pipeline
859 fn execute_via_pipeline<P, F>(
860 &self,
861 stmt: &vibesql_ast::SelectStmt,
862 cte_results: &HashMap<String, CteResult>,
863 create_pipeline: F,
864 strategy_name: &str,
865 ) -> Result<Option<Vec<vibesql_storage::Row>>, ExecutorError>
866 where
867 P: ExecutionPipeline,
868 F: FnOnce() -> P,
869 {
870 #[cfg(feature = "profile-q6")]
871 let start = std::time::Instant::now();
872
873 // Check query complexity - pipelines don't support all features
874 let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
875 let has_group_by = stmt.group_by.is_some();
876 let has_joins =
877 stmt.from.as_ref().is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
878 let has_order_by = stmt.order_by.is_some();
879 let has_distinct = stmt.distinct;
880 let has_set_ops = stmt.set_operation.is_some();
881 let has_window_funcs = self.has_window_functions(&stmt.select_list);
882 let has_distinct_aggregates = self.has_distinct_aggregates(&stmt.select_list);
883
884 // Create the pipeline
885 let pipeline = create_pipeline();
886
887 // Check if the pipeline supports this query pattern
888 if !pipeline.supports_query_pattern(has_aggregates, has_group_by, has_joins) {
889 log::debug!(
890 "{} pipeline doesn't support query pattern (agg={}, group_by={}, joins={})",
891 strategy_name,
892 has_aggregates,
893 has_group_by,
894 has_joins
895 );
896 return Ok(None);
897 }
898
899 // For complex queries (ORDER BY, DISTINCT, window functions, set ops, DISTINCT aggregates),
900 // fall back to full execution paths which have complete support
901 if has_order_by
902 || has_distinct
903 || has_window_funcs
904 || has_set_ops
905 || has_distinct_aggregates
906 {
907 log::debug!(
908 "{} pipeline doesn't support complex features (order_by={}, distinct={}, window={}, set_ops={}, distinct_agg={})",
909 strategy_name,
910 has_order_by,
911 has_distinct,
912 has_window_funcs,
913 has_set_ops,
914 has_distinct_aggregates
915 );
916 return Ok(None);
917 }
918
919 // Must have a FROM clause for pipeline execution
920 let from_clause = match &stmt.from {
921 Some(from) => from,
922 None => return Ok(None),
923 };
924
925 // Execute FROM clause to get input data
926 // Note: WHERE, ORDER BY, and LIMIT are handled by the pipeline, not here
927 // Note: Table elimination requires WHERE clause, so pass None for select_list too
928 let from_result = self.execute_from_with_where(
929 from_clause,
930 cte_results,
931 None, // Pipeline will apply WHERE filter
932 None, // ORDER BY handled separately
933 None, // LIMIT applied after pipeline
934 None, // No table elimination when WHERE is deferred
935 )?;
936
937 // Build execution context
938 let mut exec_ctx = ExecutionContext::new(&from_result.schema, self.database);
939 // Add outer context for correlated subqueries (#2998)
940 if let (Some(outer_row), Some(outer_schema)) = (self.outer_row, self.outer_schema) {
941 exec_ctx = exec_ctx.with_outer_context(outer_row, outer_schema);
942 }
943 // Add CTE context if available
944 if !cte_results.is_empty() {
945 exec_ctx = exec_ctx.with_cte_context(cte_results);
946 }
947
948 // Validate column references BEFORE processing
949 super::validation::validate_select_columns_with_context(
950 &stmt.select_list,
951 stmt.where_clause.as_ref(),
952 &from_result.schema,
953 self.procedural_context,
954 self.outer_schema,
955 )?;
956
957 // Prepare input from FROM result
958 let input = PipelineInput::from_rows_owned(from_result.data.into_rows());
959
960 // Execute pipeline stages with fallback on error
961 // If any pipeline stage fails with UnsupportedFeature, fall back to row-oriented
962
963 // Resolve SELECT aliases in WHERE clause (SQLite extension)
964 // This allows queries like: SELECT f1-22 AS x FROM t1 WHERE x > 0
965 // IMPORTANT: Use schema-aware resolution to avoid incorrectly substituting
966 // table column names with aggregate aliases (issue #4XXX)
967 // Example: SELECT COUNT(*) AS col1 FROM tab0 WHERE col1 > 0
968 // Here 'col1' in WHERE refers to the TABLE COLUMN, not the COUNT(*) alias
969 // PERFORMANCE: Skip alias resolution if no aliases exist (common case in OLTP)
970 let resolved_where = if crate::select::order::select_list_has_aliases(&stmt.select_list) {
971 stmt.where_clause.as_ref().map(|where_expr| {
972 crate::select::order::resolve_where_aliases_with_schema(
973 where_expr,
974 &stmt.select_list,
975 &from_result.schema,
976 )
977 })
978 } else {
979 stmt.where_clause.clone()
980 };
981
982 // Stage 1: Filter (WHERE clause)
983 let filtered = match pipeline.apply_filter(input, resolved_where.as_ref(), &exec_ctx) {
984 Ok(result) => result,
985 Err(ExecutorError::UnsupportedFeature(_))
986 | Err(ExecutorError::UnsupportedExpression(_)) => {
987 log::debug!("{} pipeline filter failed, falling back", strategy_name);
988 return Ok(None);
989 }
990 Err(e) => return Err(e),
991 };
992
993 // Stage 2: Projection or Aggregation
994 let result = if has_aggregates || has_group_by {
995 // Execute aggregation (includes projection)
996 // Get GROUP BY expressions if present (as slice)
997 let group_by_slice: Option<&[vibesql_ast::Expression]> =
998 stmt.group_by.as_ref().and_then(|g| g.as_simple()).map(|v| v.as_slice());
999 match pipeline.apply_aggregation(
1000 filtered.into_input(),
1001 &stmt.select_list,
1002 group_by_slice,
1003 stmt.having.as_ref(),
1004 &exec_ctx,
1005 ) {
1006 Ok(result) => result,
1007 Err(ExecutorError::UnsupportedFeature(_))
1008 | Err(ExecutorError::UnsupportedExpression(_)) => {
1009 log::debug!("{} pipeline aggregation failed, falling back", strategy_name);
1010 return Ok(None);
1011 }
1012 Err(e) => return Err(e),
1013 }
1014 } else {
1015 // Execute projection only
1016 match pipeline.apply_projection(filtered.into_input(), &stmt.select_list, &exec_ctx) {
1017 Ok(result) => result,
1018 Err(ExecutorError::UnsupportedFeature(_))
1019 | Err(ExecutorError::UnsupportedExpression(_)) => {
1020 log::debug!("{} pipeline projection failed, falling back", strategy_name);
1021 return Ok(None);
1022 }
1023 Err(e) => return Err(e),
1024 }
1025 };
1026
1027 // Stage 3: Limit/Offset (evaluate expressions and convert to u64)
1028 let limit_usize = stmt
1029 .limit
1030 .as_ref()
1031 .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
1032 .transpose()?;
1033 let offset_usize = stmt
1034 .offset
1035 .as_ref()
1036 .map(|expr| self.eval_limit_offset_expr(expr, "OFFSET"))
1037 .transpose()?;
1038 let limit_u64 = limit_usize.map(|l| l as u64);
1039 let offset_u64 = offset_usize.map(|o| o as u64);
1040 let final_result = pipeline.apply_limit_offset(result, limit_u64, offset_u64)?;
1041
1042 #[cfg(feature = "profile-q6")]
1043 {
1044 eprintln!("[PROFILE-Q6] ✓ {} pipeline execution: {:?}", strategy_name, start.elapsed());
1045 }
1046
1047 log::debug!("✓ {} pipeline execution succeeded", strategy_name);
1048 Ok(Some(final_result))
1049 }
1050
1051 /// Check if the select list contains window functions
1052 fn has_window_functions(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
1053 select_list.iter().any(|item| {
1054 if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
1055 self.expr_has_window_function(expr)
1056 } else {
1057 false
1058 }
1059 })
1060 }
1061
1062 /// Recursively check if an expression contains a window function
1063 #[allow(clippy::only_used_in_recursion)]
1064 fn expr_has_window_function(&self, expr: &vibesql_ast::Expression) -> bool {
1065 match expr {
1066 vibesql_ast::Expression::WindowFunction { .. } => true,
1067 vibesql_ast::Expression::BinaryOp { left, right, .. } => {
1068 self.expr_has_window_function(left) || self.expr_has_window_function(right)
1069 }
1070 vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_window_function(expr),
1071 vibesql_ast::Expression::Function { args, .. } => {
1072 args.iter().any(|arg| self.expr_has_window_function(arg))
1073 }
1074 vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
1075 operand.as_ref().is_some_and(|e| self.expr_has_window_function(e))
1076 || when_clauses.iter().any(|case_when| {
1077 case_when.conditions.iter().any(|c| self.expr_has_window_function(c))
1078 || self.expr_has_window_function(&case_when.result)
1079 })
1080 || else_result.as_ref().is_some_and(|e| self.expr_has_window_function(e))
1081 }
1082 _ => false,
1083 }
1084 }
1085
1086 /// Check if the select list contains any DISTINCT aggregates (e.g., COUNT(DISTINCT x))
1087 fn has_distinct_aggregates(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
1088 select_list.iter().any(|item| {
1089 if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
1090 self.expr_has_distinct_aggregate(expr)
1091 } else {
1092 false
1093 }
1094 })
1095 }
1096
1097 /// Recursively check if an expression contains a DISTINCT aggregate
1098 #[allow(clippy::only_used_in_recursion)]
1099 fn expr_has_distinct_aggregate(&self, expr: &vibesql_ast::Expression) -> bool {
1100 match expr {
1101 vibesql_ast::Expression::AggregateFunction { distinct, .. } => *distinct,
1102 vibesql_ast::Expression::BinaryOp { left, right, .. } => {
1103 self.expr_has_distinct_aggregate(left) || self.expr_has_distinct_aggregate(right)
1104 }
1105 vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_distinct_aggregate(expr),
1106 vibesql_ast::Expression::Function { args, .. } => {
1107 args.iter().any(|arg| self.expr_has_distinct_aggregate(arg))
1108 }
1109 vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
1110 operand.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
1111 || when_clauses.iter().any(|case_when| {
1112 case_when.conditions.iter().any(|c| self.expr_has_distinct_aggregate(c))
1113 || self.expr_has_distinct_aggregate(&case_when.result)
1114 })
1115 || else_result.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
1116 }
1117 _ => false,
1118 }
1119 }
1120
1121 /// Execute using traditional row-oriented path
1122 ///
1123 /// This is the fallback path when columnar execution is not available or not beneficial.
1124 fn execute_row_oriented(
1125 &self,
1126 stmt: &vibesql_ast::SelectStmt,
1127 cte_results: &HashMap<String, CteResult>,
1128 ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
1129 let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
1130 let has_group_by = stmt.group_by.is_some();
1131
1132 if has_aggregates || has_group_by {
1133 self.execute_with_aggregation(stmt, cte_results)
1134 } else if let Some(from_clause) = &stmt.from {
1135 // Re-enabled predicate pushdown for all queries (issue #1902)
1136 //
1137 // Previously, predicate pushdown was selectively disabled for multi-column IN clauses
1138 // because index optimization happened in execute_without_aggregation() on row indices
1139 // from the FROM result. When predicate pushdown filtered rows early, the indices no
1140 // longer matched the original table, causing incorrect results.
1141 //
1142 // Now that all index optimization has been moved to the scan level
1143 // (execute_index_scan), it happens BEFORE predicate pushdown, avoiding the
1144 // row-index mismatch problem. This allows predicate pushdown to work
1145 // correctly for all queries, improving performance.
1146 //
1147 // Fixes issues #1807, #1895, #1896, and #1902.
1148
1149 // Resolve SELECT aliases in WHERE clause BEFORE predicate pushdown (SQLite extension)
1150 // This allows queries like: SELECT f1-22 AS x FROM t1 WHERE x > 0
1151 // The alias 'x' is resolved to 'f1-22' so predicate pushdown can work correctly
1152 // IMPORTANT: Use schema-aware resolution to avoid incorrectly substituting
1153 // table column names with aggregate aliases (SQLite behavior)
1154 // PERFORMANCE: Skip alias resolution if no aliases exist (common case in OLTP)
1155 let resolved_where = if stmt.where_clause.is_some()
1156 && crate::select::order::select_list_has_aliases(&stmt.select_list)
1157 {
1158 stmt.where_clause.as_ref().map(|where_expr| {
1159 // Try to build early schema from FROM clause
1160 if let Some(early_schema) =
1161 super::aggregation::build_early_schema(from_clause, self.database)
1162 {
1163 // Use schema-aware resolution
1164 return crate::select::order::resolve_where_aliases_with_schema(
1165 where_expr,
1166 &stmt.select_list,
1167 &early_schema,
1168 );
1169 }
1170 // Fall back to non-schema-aware resolution for complex FROM clauses
1171 crate::select::order::resolve_where_aliases(where_expr, &stmt.select_list)
1172 })
1173 } else {
1174 stmt.where_clause.clone()
1175 };
1176
1177 // Pass WHERE, ORDER BY, and LIMIT to execute_from for optimization
1178 // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
1179 // Pass select_list for table elimination optimization (#3556)
1180 //
1181 // Don't pass ORDER BY if there's a set operation - it will be handled at the set
1182 // operation level
1183 let order_by_hint =
1184 if stmt.set_operation.is_some() { None } else { stmt.order_by.as_deref() };
1185 // Don't pass LIMIT hint for set operations - limit must be applied after combining
1186 // results This prevents early termination from incorrectly limiting the
1187 // left side of UNION queries
1188 let limit_val = if stmt.set_operation.is_some() {
1189 None
1190 } else {
1191 stmt.limit
1192 .as_ref()
1193 .map(|expr| self.eval_limit_offset_expr(expr, "LIMIT"))
1194 .transpose()?
1195 };
1196 let from_result = self.execute_from_with_where(
1197 from_clause,
1198 cte_results,
1199 resolved_where.as_ref(),
1200 order_by_hint,
1201 limit_val,
1202 Some(&stmt.select_list),
1203 )?;
1204
1205 // Validate column references BEFORE processing rows (issue #2654)
1206 // This ensures column errors are caught even when tables are empty
1207 // Pass procedural context to allow procedure variables in WHERE clause
1208 // Pass outer_schema for correlated subqueries (#2694)
1209 // Note: We validate with the resolved_where since that's what gets executed
1210 super::validation::validate_select_columns_with_context(
1211 &stmt.select_list,
1212 resolved_where.as_ref(),
1213 &from_result.schema,
1214 self.procedural_context,
1215 self.outer_schema,
1216 )?;
1217
1218 self.execute_without_aggregation(stmt, from_result, cte_results)
1219 } else {
1220 // SELECT without FROM - evaluate expressions as a single row
1221 self.execute_select_without_from(stmt)
1222 }
1223 }
1224
1225 /// Execute a FROM clause with WHERE, ORDER BY, and LIMIT for optimization
1226 ///
1227 /// The LIMIT parameter enables early termination optimization (#3253):
1228 /// - When ORDER BY is satisfied by an index and no post-filter is needed, the index scan can
1229 /// stop after fetching LIMIT rows
1230 ///
1231 /// Note: Table elimination (#3556) is now handled at the optimizer level
1232 /// via crate::optimizer::eliminate_unused_tables(), which runs before
1233 /// semi-join transformation to avoid complex interactions.
1234 pub(super) fn execute_from_with_where(
1235 &self,
1236 from: &vibesql_ast::FromClause,
1237 cte_results: &HashMap<String, CteResult>,
1238 where_clause: Option<&vibesql_ast::Expression>,
1239 order_by: Option<&[vibesql_ast::OrderByItem]>,
1240 limit: Option<usize>,
1241 _select_list: Option<&[vibesql_ast::SelectItem]>, /* No longer used - optimization moved
1242 * to optimizer pass */
1243 ) -> Result<FromResult, ExecutorError> {
1244 use crate::select::scan::execute_from_clause;
1245
1246 let from_result = execute_from_clause(
1247 from,
1248 cte_results,
1249 self.database,
1250 where_clause,
1251 order_by,
1252 limit,
1253 self.outer_row,
1254 self.outer_schema,
1255 |query| {
1256 // For derived table subqueries, create a child executor with:
1257 // 1. CTE context (allows CTEs from outer WITH clause to be referenced)
1258 // 2. Outer context (allows correlated columns from outer queries to be resolved)
1259 // Critical for:
1260 // - TPC-DS Q2: CTEs in FROM subqueries
1261 // - select1-18.x: Nested correlated subqueries referencing outer columns
1262 if !cte_results.is_empty() {
1263 // FIX for select1-18.x: When both CTE and outer context exist,
1264 // we need to use new_with_outer_and_cte_and_depth to pass both
1265 if let (Some(outer_row), Some(outer_schema)) =
1266 (self.outer_row, self.outer_schema)
1267 {
1268 let child = SelectExecutor::new_with_outer_and_cte_and_depth(
1269 self.database,
1270 outer_row,
1271 outer_schema,
1272 cte_results,
1273 self.subquery_depth,
1274 );
1275 child.execute_with_columns(query)
1276 } else {
1277 let child = SelectExecutor::new_with_cte_and_depth(
1278 self.database,
1279 cte_results,
1280 self.subquery_depth,
1281 );
1282 child.execute_with_columns(query)
1283 }
1284 } else if let (Some(outer_row), Some(outer_schema)) =
1285 (self.outer_row, self.outer_schema)
1286 {
1287 // FIX for select1-18.x: Pass outer context for derived table subqueries
1288 // This enables column resolution from outer scopes in deeply nested queries
1289 let child = SelectExecutor::new_with_outer_context_and_depth(
1290 self.database,
1291 outer_row,
1292 outer_schema,
1293 self.subquery_depth,
1294 );
1295 child.execute_with_columns(query)
1296 } else {
1297 self.execute_with_columns(query)
1298 }
1299 },
1300 )?;
1301
1302 // NOTE: We DON'T merge outer schema with from_result.schema here because:
1303 // 1. from_result.rows only contain values from inner tables
1304 // 2. Outer columns are resolved via the evaluator's outer_row/outer_schema
1305 // 3. Merging would create schema/row mismatch (schema has outer cols, rows don't)
1306
1307 Ok(from_result)
1308 }
1309}