vibesql_executor/select/executor/
execute.rs

1//! Main execution methods for SelectExecutor
2//!
3//! This module implements the unified execution dispatcher that routes queries
4//! to the appropriate execution pipeline based on the selected strategy.
5//!
6//! ## Execution Pipeline Architecture
7//!
8//! The dispatcher uses the `ExecutionPipeline` trait to provide a unified interface
9//! for query execution across different strategies:
10//!
11//! - **NativeColumnar**: Zero-copy SIMD execution from columnar storage
12//! - **StandardColumnar**: SIMD execution with row-to-batch conversion
13//! - **RowOriented**: Traditional row-by-row execution
14//! - **ExpressionOnly**: SELECT without FROM clause (special case)
15//!
16//! ```text
17//! Strategy Selection → Create Pipeline → Execute Pipeline Stages → Results
18//!                          ↓
//!          apply_filter → (apply_projection | apply_aggregation) → apply_limit_offset
20//! ```
21
22use std::collections::HashMap;
23
24use super::builder::SelectExecutor;
25use crate::{
26    errors::ExecutorError,
27    optimizer::adaptive::{choose_execution_strategy, ExecutionStrategy, StrategyContext},
28    pipeline::{
29        ColumnarPipeline, ExecutionContext, ExecutionPipeline, NativeColumnarPipeline,
30        PipelineInput,
31    },
32    select::{
33        cte::{execute_ctes, execute_ctes_with_memory_check, CteResult},
34        helpers::{apply_limit_offset, estimate_result_size},
35        join::FromResult,
36        set_operations::apply_set_operation,
37        SelectResult,
38    },
39};
40
41impl SelectExecutor<'_> {
42    /// Execute a SELECT statement
43    pub fn execute(
44        &self,
45        stmt: &vibesql_ast::SelectStmt,
46    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
47        #[cfg(feature = "profile-q6")]
48        let execute_start = std::time::Instant::now();
49
50        // Reset arena for fresh query execution (only at top level)
51        if self.subquery_depth == 0 {
52            self.reset_arena();
53        }
54
55        // Check timeout before starting execution
56        self.check_timeout()?;
57
58        // Check subquery depth limit to prevent stack overflow
59        if self.subquery_depth >= crate::limits::MAX_EXPRESSION_DEPTH {
60            return Err(ExecutorError::ExpressionDepthExceeded {
61                depth: self.subquery_depth,
62                max_depth: crate::limits::MAX_EXPRESSION_DEPTH,
63            });
64        }
65
66        // Fast path for simple point-lookup queries (TPC-C optimization)
67        // This bypasses expensive optimizer passes for queries like:
68        // SELECT col FROM table WHERE pk = value
69        if self.subquery_depth == 0
70            && self.outer_row.is_none()
71            && self.cte_context.is_none()
72            && super::fast_path::is_simple_point_query(stmt)
73        {
74            return self.execute_fast_path(stmt);
75        }
76
77        // Streaming aggregate fast path (#3815)
78        // For queries like: SELECT SUM(k) FROM sbtest1 WHERE id BETWEEN ? AND ?
79        // Accumulates aggregates inline during PK range scan without materializing rows
80        if self.subquery_depth == 0
81            && self.outer_row.is_none()
82            && self.cte_context.is_none()
83            && super::fast_path::is_streaming_aggregate_query(stmt)
84        {
85            if let Ok(result) = self.execute_streaming_aggregate(stmt) {
86                return Ok(result);
87            }
88            // Fall through to standard path if streaming aggregate fails
89        }
90
91        #[cfg(feature = "profile-q6")]
92        let _setup_time = execute_start.elapsed();
93
94        // Apply subquery rewriting optimizations (Phase 2 of IN subquery optimization)
95        // - Rewrites correlated IN → EXISTS with LIMIT 1 for early termination
96        // - Adds DISTINCT to uncorrelated IN subqueries to reduce duplicate processing
97        // This works in conjunction with Phase 1 (HashSet optimization, #2136)
98        #[cfg(feature = "profile-q6")]
99        let optimizer_start = std::time::Instant::now();
100
101        let optimized_stmt = crate::optimizer::rewrite_subquery_optimizations(stmt);
102
103        #[cfg(feature = "profile-q6")]
104        let _optimizer_time = optimizer_start.elapsed();
105
106        // Eliminate unused tables that create unnecessary cross joins (#3556)
107        // Must run BEFORE semi-join transformation to avoid complex interactions
108        // with derived tables from EXISTS/IN transformations
109        let optimized_stmt = crate::optimizer::eliminate_unused_tables(&optimized_stmt);
110
111        // Transform decorrelated IN/EXISTS subqueries to semi/anti-joins (#2424)
112        // This enables hash-based join execution instead of row-by-row subquery evaluation
113        // Converts WHERE clauses like "WHERE x IN (SELECT y FROM t)" to "SEMI JOIN t ON x = y"
114        let optimized_stmt = crate::optimizer::transform_subqueries_to_joins(&optimized_stmt);
115
116        // Execute CTEs if present and merge with outer query's CTE context
117        let mut cte_results = if let Some(with_clause) = &optimized_stmt.with_clause {
118            // This query has its own CTEs - execute them with memory tracking
119            execute_ctes_with_memory_check(
120                with_clause,
121                |query, cte_ctx| self.execute_with_ctes(query, cte_ctx),
122                |size| self.track_memory_allocation(size),
123            )?
124        } else {
125            HashMap::new()
126        };
127
128        // If we have access to outer query's CTEs (for subqueries), merge them in
129        // Local CTEs take precedence over outer CTEs if there are name conflicts
130        if let Some(outer_cte_ctx) = self.cte_context {
131            for (name, result) in outer_cte_ctx {
132                cte_results.entry(name.clone()).or_insert_with(|| result.clone());
133            }
134        }
135
136        #[cfg(feature = "profile-q6")]
137        let _pre_execute_time = execute_start.elapsed();
138
139        // Execute the main query with CTE context
140        let result = self.execute_with_ctes(&optimized_stmt, &cte_results)?;
141
142        #[cfg(feature = "profile-q6")]
143        {
144            let _total_execute = execute_start.elapsed();
145        }
146
147        Ok(result)
148    }
149
150    /// Execute a SELECT statement and return an iterator over results
151    ///
152    /// This enables early termination when the full result set is not needed,
153    /// such as for IN subqueries where we stop after finding the first match.
154    ///
155    /// # Phase 1 Implementation (Early Termination for IN subqueries)
156    ///
157    /// Current implementation materializes results then returns an iterator.
158    /// This still enables early termination in the consumer (e.g., eval_in_subquery)
159    /// by stopping iteration when a match is found.
160    ///
161    /// Future optimization: Leverage the existing RowIterator infrastructure
162    /// (crate::select::iterator) for truly lazy evaluation that stops execution
163    /// early, not just iteration.
164    pub fn execute_iter(
165        &self,
166        stmt: &vibesql_ast::SelectStmt,
167    ) -> Result<impl Iterator<Item = vibesql_storage::Row>, ExecutorError> {
168        // For Phase 1, materialize then return iterator
169        // This still enables early termination in the consumer
170        let rows = self.execute(stmt)?;
171        Ok(rows.into_iter())
172    }
173
174    /// Execute a SELECT statement using the fast path directly
175    ///
176    /// This method is used by prepared statements with cached SimpleFastPath plans.
177    /// It bypasses the `is_simple_point_query()` check because the eligibility was
178    /// already determined at prepare time.
179    ///
180    /// # Performance
181    ///
182    /// For repeated execution of prepared statements, this saves the cost of
183    /// re-checking fast path eligibility on every execution (~5-10µs per query).
184    pub fn execute_fast_path_with_columns(
185        &self,
186        stmt: &vibesql_ast::SelectStmt,
187    ) -> Result<SelectResult, ExecutorError> {
188        // Reset arena for fresh query execution
189        if self.subquery_depth == 0 {
190            self.reset_arena();
191        }
192
193        // Check timeout before starting execution
194        self.check_timeout()?;
195
196        // Execute via fast path directly (skip is_simple_point_query check)
197        let rows = self.execute_fast_path(stmt)?;
198
199        // Derive column names from the SELECT list
200        // For fast path queries, we don't have a FromResult, so pass None
201        // The column derivation will use the SELECT list expressions directly
202        let columns = self.derive_fast_path_column_names(stmt)?;
203
204        Ok(SelectResult { columns, rows })
205    }
206
207    /// Derive column names for fast path execution
208    ///
209    /// For fast path queries, we derive column names directly from the SELECT list
210    /// and table schema without going through the full FROM clause execution.
211    ///
212    /// # Performance Note (#3780)
213    ///
214    /// This method is called by `Session::execute_prepared()` to cache column names
215    /// in `SimpleFastPathPlan`. After the first execution, cached column names are
216    /// reused to avoid repeated table lookups and column name derivation.
217    pub fn derive_fast_path_column_names(
218        &self,
219        stmt: &vibesql_ast::SelectStmt,
220    ) -> Result<Vec<String>, ExecutorError> {
221        use vibesql_ast::{FromClause, SelectItem};
222
223        // Get table name and schema for column resolution
224        let (table_name, table_alias) = match &stmt.from {
225            Some(FromClause::Table { name, alias, .. }) => (name.as_str(), alias.as_deref()),
226            _ => {
227                return Err(ExecutorError::Other(
228                    "Fast path requires simple table FROM clause".to_string(),
229                ))
230            }
231        };
232
233        let table = self.database.get_table(table_name).ok_or_else(|| {
234            ExecutorError::TableNotFound(table_name.to_string())
235        })?;
236
237        let mut columns = Vec::with_capacity(stmt.select_list.len());
238
239        for item in &stmt.select_list {
240            match item {
241                SelectItem::Wildcard { .. } => {
242                    // Add all columns from the table
243                    for col in &table.schema.columns {
244                        columns.push(col.name.clone());
245                    }
246                }
247                SelectItem::QualifiedWildcard { qualifier, .. } => {
248                    // Check if qualifier matches table name or alias
249                    let effective_name = table_alias.unwrap_or(table_name);
250                    if qualifier.eq_ignore_ascii_case(effective_name)
251                        || qualifier.eq_ignore_ascii_case(table_name)
252                    {
253                        for col in &table.schema.columns {
254                            columns.push(col.name.clone());
255                        }
256                    }
257                }
258                SelectItem::Expression { expr, alias: col_alias } => {
259                    // Use alias if provided, otherwise derive from expression
260                    let col_name = if let Some(a) = col_alias {
261                        a.clone()
262                    } else {
263                        self.derive_column_name_from_expr(expr)
264                    };
265                    columns.push(col_name);
266                }
267            }
268        }
269
270        Ok(columns)
271    }
272
273    /// Derive a column name from an expression
274    fn derive_column_name_from_expr(&self, expr: &vibesql_ast::Expression) -> String {
275        match expr {
276            vibesql_ast::Expression::ColumnRef { column, .. } => column.clone(),
277            vibesql_ast::Expression::Literal(val) => format!("{}", val),
278            _ => "?column?".to_string(),
279        }
280    }
281
282    /// Execute a SELECT statement and return both columns and rows
283    pub fn execute_with_columns(
284        &self,
285        stmt: &vibesql_ast::SelectStmt,
286    ) -> Result<SelectResult, ExecutorError> {
287        // First, get the FROM result to access the schema
288        let from_result = if let Some(from_clause) = &stmt.from {
289            let mut cte_results = if let Some(with_clause) = &stmt.with_clause {
290                execute_ctes(with_clause, |query, cte_ctx| self.execute_with_ctes(query, cte_ctx))?
291            } else {
292                HashMap::new()
293            };
294            // If we have access to outer query's CTEs (for subqueries/derived tables), merge them in
295            // Local CTEs take precedence over outer CTEs if there are name conflicts
296            // This is critical for queries like TPC-DS Q2 where CTEs are referenced from derived tables
297            if let Some(outer_cte_ctx) = self.cte_context {
298                for (name, result) in outer_cte_ctx {
299                    cte_results.entry(name.clone()).or_insert_with(|| result.clone());
300                }
301            }
302            // Pass WHERE, ORDER BY, and LIMIT for optimizations
303            // This is critical for GROUP BY queries to avoid CROSS JOINs
304            // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
305            // Pass select_list for table elimination optimization (#3556)
306            Some(self.execute_from_with_where(
307                from_clause,
308                &cte_results,
309                stmt.where_clause.as_ref(),
310                stmt.order_by.as_deref(),
311                stmt.limit,
312                Some(&stmt.select_list),
313            )?)
314        } else {
315            None
316        };
317
318        // Derive column names from the SELECT list
319        let columns = self.derive_column_names(&stmt.select_list, from_result.as_ref())?;
320
321        // Execute the query to get rows
322        let rows = self.execute(stmt)?;
323
324        Ok(SelectResult { columns, rows })
325    }
326
327    /// Execute SELECT statement with CTE context
328    ///
329    /// Uses unified strategy selection to determine the optimal execution path:
330    /// - NativeColumnar: Zero-copy SIMD execution from columnar storage
331    /// - StandardColumnar: SIMD execution with row-to-batch conversion
332    /// - RowOriented: Traditional row-by-row execution
333    /// - ExpressionOnly: SELECT without FROM clause (special case)
334    ///
335    /// ## Pipeline-Based Execution (Phase 5)
336    ///
337    /// This method uses the `ExecutionPipeline` trait to provide a unified interface
338    /// for query execution. Each strategy creates an appropriate pipeline that
339    /// implements filter, projection, aggregation, and limit/offset operations.
340    ///
341    /// ```text
342    /// Strategy Selection → Create Pipeline → Execute via Trait Methods
343    ///                              ↓
344    ///   NativeColumnar  → NativeColumnarPipeline::apply_*()
345    ///   StandardColumnar → ColumnarPipeline::apply_*()
346    ///   RowOriented     → RowOrientedPipeline::apply_*()
347    ///   ExpressionOnly  → Special case (no table scan)
348    /// ```
349    pub(super) fn execute_with_ctes(
350        &self,
351        stmt: &vibesql_ast::SelectStmt,
352        cte_results: &HashMap<String, CteResult>,
353    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
354        #[cfg(feature = "profile-q6")]
355        let _execute_ctes_start = std::time::Instant::now();
356
357        // Check if native columnar is enabled via feature flag or env var
358        let native_columnar_enabled =
359            cfg!(feature = "native-columnar") || std::env::var("VIBESQL_NATIVE_COLUMNAR").is_ok();
360
361        // Use unified strategy selection for the execution path
362        let strategy_ctx = StrategyContext::new(stmt, cte_results, native_columnar_enabled);
363        let strategy = choose_execution_strategy(&strategy_ctx);
364
365        log::debug!(
366            "Execution strategy selected: {} (reason: {})",
367            strategy.name(),
368            strategy.score().reason
369        );
370
371        #[cfg(feature = "profile-q6")]
372        eprintln!(
373            "[PROFILE-Q6] Execution strategy: {} ({})",
374            strategy.name(),
375            strategy.score().reason
376        );
377
378        // Dispatch based on selected strategy using ExecutionPipeline trait
379        // Pipeline execution returns Option<Vec<Row>> - None means fallback needed
380        let mut results = match strategy {
381            ExecutionStrategy::NativeColumnar { .. } => {
382                // First try the optimized zero-copy native columnar path
383                // This uses ColumnarBatch::from_storage_columnar() for zero-copy conversion
384                // and executes filter+aggregate in a single pass without row materialization
385                if let Some(result) = self.try_native_columnar_execution(stmt, cte_results)? {
386                    #[cfg(feature = "profile-q6")]
387                    eprintln!("[PROFILE-Q6] Native columnar: zero-copy path succeeded");
388                    result
389                } else {
390                    // Fall back to pipeline-based execution if zero-copy path is not applicable
391                    // (e.g., complex predicates, multiple tables, unsupported aggregates)
392                    log::debug!("Native columnar: zero-copy path not applicable, trying pipeline");
393                    match self.execute_via_pipeline(
394                        stmt,
395                        cte_results,
396                        NativeColumnarPipeline::new,
397                        "NativeColumnar",
398                    )? {
399                        Some(result) => result,
400                        None => {
401                            // Fall back to row-oriented if pipeline also fails
402                            log::debug!("Native columnar runtime fallback to row-oriented");
403                            #[cfg(feature = "profile-q6")]
404                            eprintln!("[PROFILE-Q6] Native columnar fallback to row-oriented");
405                            self.execute_row_oriented(stmt, cte_results)?
406                        }
407                    }
408                }
409            }
410
411            ExecutionStrategy::StandardColumnar { .. } => {
412                // StandardColumnar uses the pipeline-based execution path
413                // Note: We don't use try_native_columnar_execution here because row tables
414                // go through the pipeline which correctly handles all data types including dates.
415                // The native columnar zero-copy path has known limitations with certain date comparisons.
416                match self.execute_via_pipeline(
417                    stmt,
418                    cte_results,
419                    ColumnarPipeline::new,
420                    "StandardColumnar",
421                )? {
422                    Some(result) => result,
423                    None => {
424                        log::debug!("Standard columnar runtime fallback to row-oriented");
425                        #[cfg(feature = "profile-q6")]
426                        eprintln!("[PROFILE-Q6] Standard columnar fallback to row-oriented");
427                        self.execute_row_oriented(stmt, cte_results)?
428                    }
429                }
430            }
431
432            ExecutionStrategy::RowOriented { .. } => {
433                // Row-oriented uses the traditional path which has full feature support
434                // The RowOrientedPipeline is used for simpler queries, but complex
435                // queries (with JOINs, window functions, DISTINCT, etc.) need the
436                // full execute_row_oriented implementation
437
438                // Phase 4: Try columnar join execution for multi-table JOIN queries (#2943)
439                // This provides 3-5x speedup for TPC-H Q3 style queries
440                let has_joins = stmt
441                    .from
442                    .as_ref()
443                    .is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
444                if has_joins {
445                    if let Some(result) = self.try_columnar_join_execution(stmt, cte_results)? {
446                        log::info!("Columnar join execution succeeded");
447                        // Apply LIMIT/OFFSET to columnar join results (#3776)
448                        // Skip if set_operation exists - it will be applied later
449                        if stmt.set_operation.is_none() {
450                            apply_limit_offset(result, stmt.limit, stmt.offset)
451                        } else {
452                            result
453                        }
454                    } else {
455                        log::debug!(
456                            "Columnar join execution not applicable, falling back to row-oriented"
457                        );
458                        self.execute_row_oriented(stmt, cte_results)?
459                    }
460                } else {
461                    self.execute_row_oriented(stmt, cte_results)?
462                }
463            }
464
465            ExecutionStrategy::ExpressionOnly { .. } => {
466                // SELECT without FROM - special case that doesn't use pipelines
467                // May still have aggregates (e.g., SELECT COUNT(*), SELECT MAX(1))
468                // Note: Do NOT use early return here - we need to fall through to set operations handling
469                self.execute_expression_only(stmt, cte_results)?
470            }
471        };
472
473        // Handle set operations (UNION, INTERSECT, EXCEPT)
474        // Process operations left-to-right to ensure correct associativity
475        if let Some(set_op) = &stmt.set_operation {
476            results = self.execute_set_operations(results, set_op, cte_results)?;
477
478            // Apply LIMIT/OFFSET to the final result (after all set operations)
479            // For queries WITHOUT set operations, LIMIT/OFFSET is already applied
480            // in execute_without_aggregation() or execute_with_aggregation()
481            results = apply_limit_offset(results, stmt.limit, stmt.offset);
482        }
483
484        Ok(results)
485    }
486
487    /// Execute SELECT without FROM clause (ExpressionOnly strategy)
488    ///
489    /// This is a special case that doesn't use the pipeline trait since there's
490    /// no table scan involved. Handles both simple expressions and aggregates.
491    fn execute_expression_only(
492        &self,
493        stmt: &vibesql_ast::SelectStmt,
494        cte_results: &HashMap<String, CteResult>,
495    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
496        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
497
498        if has_aggregates {
499            // Aggregates without FROM need the aggregation path
500            self.execute_with_aggregation(stmt, cte_results)
501        } else {
502            // Simple expression evaluation (e.g., SELECT 1 + 1)
503            self.execute_select_without_from(stmt)
504        }
505    }
506
    /// Execute a query using the specified execution pipeline
    ///
    /// This method provides a unified interface for pipeline-based execution.
    /// It creates the pipeline, checks that the query shape is supported,
    /// executes the FROM clause to obtain input rows, and then runs the
    /// pipeline stages: filter → (aggregation | projection) → limit/offset.
    ///
    /// # Type Parameters
    ///
    /// * `P` - The pipeline type (must implement `ExecutionPipeline`)
    /// * `F` - Factory function to create the pipeline
    ///
    /// # Parameters
    ///
    /// * `stmt` - The SELECT statement to execute
    /// * `cte_results` - CTE results visible to this query
    /// * `create_pipeline` - Called exactly once to construct the pipeline
    /// * `strategy_name` - Used only in log/profile messages
    ///
    /// # Returns
    ///
    /// * `Ok(Some(results))` if the pipeline executed successfully
    /// * `Ok(None)` if the pipeline cannot handle the query (fallback needed):
    ///   the pipeline rejects the query pattern, the query uses ORDER BY,
    ///   DISTINCT, window functions, set operations or DISTINCT aggregates,
    ///   there is no FROM clause, or the filter/aggregation/projection stage
    ///   reports `UnsupportedFeature` / `UnsupportedExpression`
    ///
    /// # Errors
    ///
    /// Any other error from FROM execution, column validation, or a pipeline
    /// stage is returned as `Err`. Errors from the limit/offset stage are always
    /// propagated (no fallback).
    fn execute_via_pipeline<P, F>(
        &self,
        stmt: &vibesql_ast::SelectStmt,
        cte_results: &HashMap<String, CteResult>,
        create_pipeline: F,
        strategy_name: &str,
    ) -> Result<Option<Vec<vibesql_storage::Row>>, ExecutorError>
    where
        P: ExecutionPipeline,
        F: FnOnce() -> P,
    {
        #[cfg(feature = "profile-q6")]
        let start = std::time::Instant::now();

        // Check query complexity - pipelines don't support all features
        // A HAVING clause is treated as implying aggregation.
        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
        let has_group_by = stmt.group_by.is_some();
        let has_joins =
            stmt.from.as_ref().is_some_and(|f| matches!(f, vibesql_ast::FromClause::Join { .. }));
        let has_order_by = stmt.order_by.is_some();
        let has_distinct = stmt.distinct;
        let has_set_ops = stmt.set_operation.is_some();
        let has_window_funcs = self.has_window_functions(&stmt.select_list);
        let has_distinct_aggregates = self.has_distinct_aggregates(&stmt.select_list);

        // Create the pipeline
        let pipeline = create_pipeline();

        // Check if the pipeline supports this query pattern
        if !pipeline.supports_query_pattern(has_aggregates, has_group_by, has_joins) {
            log::debug!(
                "{} pipeline doesn't support query pattern (agg={}, group_by={}, joins={})",
                strategy_name,
                has_aggregates,
                has_group_by,
                has_joins
            );
            return Ok(None);
        }

        // For complex queries (ORDER BY, DISTINCT, window functions, set ops, DISTINCT aggregates),
        // fall back to full execution paths which have complete support
        if has_order_by
            || has_distinct
            || has_window_funcs
            || has_set_ops
            || has_distinct_aggregates
        {
            log::debug!(
                "{} pipeline doesn't support complex features (order_by={}, distinct={}, window={}, set_ops={}, distinct_agg={})",
                strategy_name,
                has_order_by,
                has_distinct,
                has_window_funcs,
                has_set_ops,
                has_distinct_aggregates
            );
            return Ok(None);
        }

        // Must have a FROM clause for pipeline execution
        let from_clause = match &stmt.from {
            Some(from) => from,
            None => return Ok(None),
        };

        // Execute FROM clause to get input data
        // Note: WHERE and LIMIT/OFFSET are applied by the pipeline stages below, not here.
        // ORDER BY queries never reach this point (they returned `Ok(None)` above).
        // Note: Table elimination requires WHERE clause, so pass None for select_list too
        let from_result = self.execute_from_with_where(
            from_clause,
            cte_results,
            None, // Pipeline will apply WHERE filter
            None, // ORDER BY handled separately
            None, // LIMIT applied after pipeline
            None, // No table elimination when WHERE is deferred
        )?;

        // Build execution context
        let mut exec_ctx = ExecutionContext::new(&from_result.schema, self.database);
        // Add outer context for correlated subqueries (#2998)
        // Only attached when both the outer row and its schema are available.
        if let (Some(outer_row), Some(outer_schema)) = (self.outer_row, self.outer_schema) {
            exec_ctx = exec_ctx.with_outer_context(outer_row, outer_schema);
        }
        // Add CTE context if available
        if !cte_results.is_empty() {
            exec_ctx = exec_ctx.with_cte_context(cte_results);
        }

        // Validate column references BEFORE processing
        // A validation failure is a hard error, not a fallback.
        super::validation::validate_select_columns_with_context(
            &stmt.select_list,
            stmt.where_clause.as_ref(),
            &from_result.schema,
            self.procedural_context,
            self.outer_schema,
        )?;

        // Prepare input from FROM result
        // The row data is moved out of `from_result`; its schema stays borrowed by `exec_ctx`.
        let input = PipelineInput::from_rows_owned(from_result.data.into_rows());

        // Execute pipeline stages with fallback on error
        // If the filter, aggregation, or projection stage fails with UnsupportedFeature or
        // UnsupportedExpression, return `Ok(None)` so the caller can fall back; any other
        // error is propagated unchanged.

        // Stage 1: Filter (WHERE clause)
        let filtered = match pipeline.apply_filter(input, stmt.where_clause.as_ref(), &exec_ctx) {
            Ok(result) => result,
            Err(ExecutorError::UnsupportedFeature(_))
            | Err(ExecutorError::UnsupportedExpression(_)) => {
                log::debug!("{} pipeline filter failed, falling back", strategy_name);
                return Ok(None);
            }
            Err(e) => return Err(e),
        };

        // Stage 2: Projection or Aggregation (exactly one of the two runs)
        let result = if has_aggregates || has_group_by {
            // Execute aggregation (includes projection)
            // Get GROUP BY expressions if present (as slice); this is `None` when there is no
            // GROUP BY or when `as_simple()` yields `None`.
            let group_by_slice: Option<&[vibesql_ast::Expression]> =
                stmt.group_by.as_ref().and_then(|g| g.as_simple()).map(|v| v.as_slice());
            match pipeline.apply_aggregation(
                filtered.into_input(),
                &stmt.select_list,
                group_by_slice,
                stmt.having.as_ref(),
                &exec_ctx,
            ) {
                Ok(result) => result,
                Err(ExecutorError::UnsupportedFeature(_))
                | Err(ExecutorError::UnsupportedExpression(_)) => {
                    log::debug!("{} pipeline aggregation failed, falling back", strategy_name);
                    return Ok(None);
                }
                Err(e) => return Err(e),
            }
        } else {
            // Execute projection only
            match pipeline.apply_projection(filtered.into_input(), &stmt.select_list, &exec_ctx) {
                Ok(result) => result,
                Err(ExecutorError::UnsupportedFeature(_))
                | Err(ExecutorError::UnsupportedExpression(_)) => {
                    log::debug!("{} pipeline projection failed, falling back", strategy_name);
                    return Ok(None);
                }
                Err(e) => return Err(e),
            }
        };

        // Stage 3: Limit/Offset (values are cast to u64 for the pipeline API)
        // Unlike the earlier stages, an error here is propagated rather than triggering fallback.
        let limit_u64 = stmt.limit.map(|l| l as u64);
        let offset_u64 = stmt.offset.map(|o| o as u64);
        let final_result = pipeline.apply_limit_offset(result, limit_u64, offset_u64)?;

        #[cfg(feature = "profile-q6")]
        {
            eprintln!("[PROFILE-Q6] ✓ {} pipeline execution: {:?}", strategy_name, start.elapsed());
        }

        log::debug!("✓ {} pipeline execution succeeded", strategy_name);
        Ok(Some(final_result))
    }
682
683    /// Check if the select list contains window functions
684    fn has_window_functions(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
685        select_list.iter().any(|item| {
686            if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
687                self.expr_has_window_function(expr)
688            } else {
689                false
690            }
691        })
692    }
693
694    /// Recursively check if an expression contains a window function
695    #[allow(clippy::only_used_in_recursion)]
696    fn expr_has_window_function(&self, expr: &vibesql_ast::Expression) -> bool {
697        match expr {
698            vibesql_ast::Expression::WindowFunction { .. } => true,
699            vibesql_ast::Expression::BinaryOp { left, right, .. } => {
700                self.expr_has_window_function(left) || self.expr_has_window_function(right)
701            }
702            vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_window_function(expr),
703            vibesql_ast::Expression::Function { args, .. } => {
704                args.iter().any(|arg| self.expr_has_window_function(arg))
705            }
706            vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
707                operand.as_ref().is_some_and(|e| self.expr_has_window_function(e))
708                    || when_clauses.iter().any(|case_when| {
709                        case_when.conditions.iter().any(|c| self.expr_has_window_function(c))
710                            || self.expr_has_window_function(&case_when.result)
711                    })
712                    || else_result.as_ref().is_some_and(|e| self.expr_has_window_function(e))
713            }
714            _ => false,
715        }
716    }
717
718    /// Check if the select list contains any DISTINCT aggregates (e.g., COUNT(DISTINCT x))
719    fn has_distinct_aggregates(&self, select_list: &[vibesql_ast::SelectItem]) -> bool {
720        select_list.iter().any(|item| {
721            if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
722                self.expr_has_distinct_aggregate(expr)
723            } else {
724                false
725            }
726        })
727    }
728
729    /// Recursively check if an expression contains a DISTINCT aggregate
730    #[allow(clippy::only_used_in_recursion)]
731    fn expr_has_distinct_aggregate(&self, expr: &vibesql_ast::Expression) -> bool {
732        match expr {
733            vibesql_ast::Expression::AggregateFunction { distinct, .. } => *distinct,
734            vibesql_ast::Expression::BinaryOp { left, right, .. } => {
735                self.expr_has_distinct_aggregate(left) || self.expr_has_distinct_aggregate(right)
736            }
737            vibesql_ast::Expression::UnaryOp { expr, .. } => self.expr_has_distinct_aggregate(expr),
738            vibesql_ast::Expression::Function { args, .. } => {
739                args.iter().any(|arg| self.expr_has_distinct_aggregate(arg))
740            }
741            vibesql_ast::Expression::Case { operand, when_clauses, else_result } => {
742                operand.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
743                    || when_clauses.iter().any(|case_when| {
744                        case_when.conditions.iter().any(|c| self.expr_has_distinct_aggregate(c))
745                            || self.expr_has_distinct_aggregate(&case_when.result)
746                    })
747                    || else_result.as_ref().is_some_and(|e| self.expr_has_distinct_aggregate(e))
748            }
749            _ => false,
750        }
751    }
752
753    /// Execute using traditional row-oriented path
754    ///
755    /// This is the fallback path when columnar execution is not available or not beneficial.
756    fn execute_row_oriented(
757        &self,
758        stmt: &vibesql_ast::SelectStmt,
759        cte_results: &HashMap<String, CteResult>,
760    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
761        let has_aggregates = self.has_aggregates(&stmt.select_list) || stmt.having.is_some();
762        let has_group_by = stmt.group_by.is_some();
763
764        if has_aggregates || has_group_by {
765            self.execute_with_aggregation(stmt, cte_results)
766        } else if let Some(from_clause) = &stmt.from {
767            // Re-enabled predicate pushdown for all queries (issue #1902)
768            //
769            // Previously, predicate pushdown was selectively disabled for multi-column IN clauses
770            // because index optimization happened in execute_without_aggregation() on row indices
771            // from the FROM result. When predicate pushdown filtered rows early, the indices no
772            // longer matched the original table, causing incorrect results.
773            //
774            // Now that all index optimization has been moved to the scan level (execute_index_scan),
775            // it happens BEFORE predicate pushdown, avoiding the row-index mismatch problem.
776            // This allows predicate pushdown to work correctly for all queries, improving performance.
777            //
778            // Fixes issues #1807, #1895, #1896, and #1902.
779
780            // Pass WHERE, ORDER BY, and LIMIT to execute_from for optimization
781            // LIMIT enables early termination when ORDER BY is satisfied by index (#3253)
782            // Pass select_list for table elimination optimization (#3556)
783            let from_result = self.execute_from_with_where(
784                from_clause,
785                cte_results,
786                stmt.where_clause.as_ref(),
787                stmt.order_by.as_deref(),
788                stmt.limit,
789                Some(&stmt.select_list),
790            )?;
791
792            // Validate column references BEFORE processing rows (issue #2654)
793            // This ensures column errors are caught even when tables are empty
794            // Pass procedural context to allow procedure variables in WHERE clause
795            // Pass outer_schema for correlated subqueries (#2694)
796            super::validation::validate_select_columns_with_context(
797                &stmt.select_list,
798                stmt.where_clause.as_ref(),
799                &from_result.schema,
800                self.procedural_context,
801                self.outer_schema,
802            )?;
803
804            self.execute_without_aggregation(stmt, from_result, cte_results)
805        } else {
806            // SELECT without FROM - evaluate expressions as a single row
807            self.execute_select_without_from(stmt)
808        }
809    }
810
811    /// Execute a chain of set operations left-to-right
812    ///
813    /// SQL set operations are left-associative, so:
814    /// A EXCEPT B EXCEPT C should evaluate as (A EXCEPT B) EXCEPT C
815    ///
816    /// The parser creates a right-recursive AST structure, but we need to execute left-to-right.
817    fn execute_set_operations(
818        &self,
819        mut left_results: Vec<vibesql_storage::Row>,
820        set_op: &vibesql_ast::SetOperation,
821        cte_results: &HashMap<String, CteResult>,
822    ) -> Result<Vec<vibesql_storage::Row>, ExecutorError> {
823        // Execute the immediate right query WITHOUT its set operations
824        // This prevents right-recursive evaluation
825        let right_stmt = &set_op.right;
826        let has_aggregates =
827            self.has_aggregates(&right_stmt.select_list) || right_stmt.having.is_some();
828        let has_group_by = right_stmt.group_by.is_some();
829
830        let right_results = if has_aggregates || has_group_by {
831            self.execute_with_aggregation(right_stmt, cte_results)?
832        } else if let Some(from_clause) = &right_stmt.from {
833            // Note: LIMIT is None for set operation sides - it's applied after the set operation
834            // Pass select_list for table elimination optimization (#3556)
835            let from_result = self.execute_from_with_where(
836                from_clause,
837                cte_results,
838                right_stmt.where_clause.as_ref(),
839                right_stmt.order_by.as_deref(),
840                None,
841                Some(&right_stmt.select_list),
842            )?;
843            self.execute_without_aggregation(right_stmt, from_result, cte_results)?
844        } else {
845            self.execute_select_without_from(right_stmt)?
846        };
847
848        // Track memory for right result before set operation
849        let right_size = estimate_result_size(&right_results);
850        self.track_memory_allocation(right_size)?;
851
852        // Apply the current operation
853        left_results = apply_set_operation(left_results, right_results, set_op)?;
854
855        // Track memory for combined result after set operation
856        let combined_size = estimate_result_size(&left_results);
857        self.track_memory_allocation(combined_size)?;
858
859        // If the right side has more set operations, continue processing them
860        // This creates the left-to-right evaluation: ((A op B) op C) op D
861        if let Some(next_set_op) = &right_stmt.set_operation {
862            left_results = self.execute_set_operations(left_results, next_set_op, cte_results)?;
863        }
864
865        Ok(left_results)
866    }
867
868    /// Execute a FROM clause with WHERE, ORDER BY, and LIMIT for optimization
869    ///
870    /// The LIMIT parameter enables early termination optimization (#3253):
871    /// - When ORDER BY is satisfied by an index and no post-filter is needed,
872    ///   the index scan can stop after fetching LIMIT rows
873    ///
874    /// Note: Table elimination (#3556) is now handled at the optimizer level
875    /// via crate::optimizer::eliminate_unused_tables(), which runs before
876    /// semi-join transformation to avoid complex interactions.
877    pub(super) fn execute_from_with_where(
878        &self,
879        from: &vibesql_ast::FromClause,
880        cte_results: &HashMap<String, CteResult>,
881        where_clause: Option<&vibesql_ast::Expression>,
882        order_by: Option<&[vibesql_ast::OrderByItem]>,
883        limit: Option<usize>,
884        _select_list: Option<&[vibesql_ast::SelectItem]>, // No longer used - optimization moved to optimizer pass
885    ) -> Result<FromResult, ExecutorError> {
886        use crate::select::scan::execute_from_clause;
887
888        let from_result = execute_from_clause(
889            from,
890            cte_results,
891            self.database,
892            where_clause,
893            order_by,
894            limit,
895            self.outer_row,
896            self.outer_schema,
897            |query| {
898                // For derived table subqueries, create a child executor with CTE context
899                // This allows CTEs from the outer WITH clause to be referenced in subqueries
900                // Critical for queries like TPC-DS Q2 where CTEs are used in FROM subqueries
901                if !cte_results.is_empty() {
902                    let child = SelectExecutor::new_with_cte_and_depth(
903                        self.database,
904                        cte_results,
905                        self.subquery_depth,
906                    );
907                    child.execute_with_columns(query)
908                } else {
909                    self.execute_with_columns(query)
910                }
911            },
912        )?;
913
914        // NOTE: We DON'T merge outer schema with from_result.schema here because:
915        // 1. from_result.rows only contain values from inner tables
916        // 2. Outer columns are resolved via the evaluator's outer_row/outer_schema
917        // 3. Merging would create schema/row mismatch (schema has outer cols, rows don't)
918
919        Ok(from_result)
920    }
921}