vibesql_executor/select/join/
mod.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use super::{cte::CteResult, from_iterator::FromIterator};
5use crate::{
6    errors::ExecutorError, evaluator::CombinedExpressionEvaluator, optimizer::combine_with_and,
7    schema::CombinedSchema, timeout::TimeoutContext,
8};
9
10mod expression_mapper;
11mod hash_anti_join;
12pub(crate) mod hash_join;
13mod hash_join_iterator;
14mod hash_semi_join;
15mod join_analyzer;
16mod nested_loop;
17pub mod reorder;
18pub mod search;
19
20#[cfg(test)]
21mod tests;
22
23// Re-export join reorder analyzer for public tests
24// Re-export hash_join functions for internal use
25use hash_anti_join::{hash_anti_join, hash_anti_join_with_filter};
26use hash_join::{
27    hash_join_inner, hash_join_inner_arithmetic, hash_join_inner_multi, hash_join_left_outer,
28};
29use hash_semi_join::{hash_semi_join, hash_semi_join_with_filter};
30// Re-export hash join iterator for public use
31pub use hash_join_iterator::HashJoinIterator;
32// Re-export nested loop join variants for internal use
33use nested_loop::{
34    nested_loop_anti_join, nested_loop_cross_join, nested_loop_full_outer_join,
35    nested_loop_inner_join, nested_loop_left_outer_join, nested_loop_right_outer_join,
36    nested_loop_semi_join,
37};
38pub use reorder::JoinOrderAnalyzer;
39// Re-export join order search for public tests
40pub use search::JoinOrderSearch;
41
42/// Data source for FROM clause results
43///
44/// This enum allows FROM results to be either materialized (Vec<Row>) or lazy (iterator).
45/// Materialized results are used for JOINs, CTEs, and operations that need multiple passes.
46/// Lazy results are used for simple table scans to enable streaming execution.
47pub(super) enum FromData {
48    /// Materialized rows (for JOINs, CTEs, operations needing multiple passes)
49    Materialized(Vec<vibesql_storage::Row>),
50
51    /// Shared rows (for zero-copy CTE references without filtering)
52    ///
53    /// This variant enables O(1) cloning when CTEs are referenced multiple times
54    /// without any filtering. Only materializes to Vec when mutation is needed.
55    SharedRows(Arc<Vec<vibesql_storage::Row>>),
56
57    /// Lazy iterator (for streaming table scans)
58    Iterator(FromIterator),
59}
60
61impl FromData {
62    /// Get rows, materializing if needed
63    ///
64    /// For SharedRows, this will clone only if the Arc is shared.
65    /// If the Arc has a single reference, the Vec is moved out efficiently.
66    pub fn into_rows(self) -> Vec<vibesql_storage::Row> {
67        match self {
68            Self::Materialized(rows) => rows,
69            Self::SharedRows(arc) => Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()),
70            Self::Iterator(iter) => iter.collect_vec(),
71        }
72    }
73
74    /// Get a reference to materialized rows, or materialize if iterator
75    ///
76    /// For SharedRows, returns a reference to the shared data without cloning.
77    pub fn as_rows(&mut self) -> &Vec<vibesql_storage::Row> {
78        // If we have an iterator, materialize it
79        if let Self::Iterator(iter) = self {
80            #[cfg(feature = "profile-q6")]
81            let materialize_start = std::time::Instant::now();
82
83            let rows = std::mem::replace(iter, FromIterator::from_vec(vec![])).collect_vec();
84            *self = Self::Materialized(rows);
85
86            #[cfg(feature = "profile-q6")]
87            {
88                let materialize_time = materialize_start.elapsed();
89                if let Self::Materialized(rows) = self {
90                    eprintln!(
91                        "[Q6 PROFILE] Row materialization (collect_vec): {:?} ({} rows, {:?}/row)",
92                        materialize_time,
93                        rows.len(),
94                        materialize_time / rows.len() as u32
95                    );
96                }
97            }
98        }
99
100        // Now we're guaranteed to have materialized or shared rows
101        match self {
102            Self::Materialized(ref rows) => rows,
103            Self::SharedRows(ref arc) => arc.as_ref(),
104            Self::Iterator(_) => unreachable!(),
105        }
106    }
107
108    /// Get a slice reference to the underlying rows without triggering materialization
109    ///
110    /// This is a zero-cost operation that directly accesses the underlying Vec<Row>
111    /// without calling collect_vec(). This avoids the 137ms row materialization
112    /// bottleneck in Q6 by skipping iteration entirely.
113    ///
114    /// Critical for columnar execution performance (#2521).
115    pub fn as_slice(&self) -> &[vibesql_storage::Row] {
116        match self {
117            Self::Materialized(rows) => rows.as_slice(),
118            Self::SharedRows(arc) => arc.as_slice(),
119            Self::Iterator(iter) => iter.as_slice(),
120        }
121    }
122}
123
124/// Result of executing a FROM clause
125///
126/// Contains the combined schema and data (either materialized or lazy).
127pub(super) struct FromResult {
128    pub(super) schema: CombinedSchema,
129    pub(super) data: FromData,
130    /// If present, indicates that results are already sorted by the specified columns
131    /// in the given order (ASC/DESC). This allows skipping ORDER BY sorting.
132    pub(super) sorted_by: Option<Vec<(String, vibesql_ast::OrderDirection)>>,
133    /// If true, indicates that WHERE clause filtering has already been fully applied
134    /// during the scan (e.g., by index scan with predicate pushdown). This allows
135    /// skipping redundant WHERE clause evaluation in the executor.
136    pub(super) where_filtered: bool,
137}
138
139impl FromResult {
140    /// Create a FromResult from materialized rows
141    pub(super) fn from_rows(schema: CombinedSchema, rows: Vec<vibesql_storage::Row>) -> Self {
142        Self { schema, data: FromData::Materialized(rows), sorted_by: None, where_filtered: false }
143    }
144
145    /// Create a FromResult from shared rows (zero-copy for CTEs)
146    ///
147    /// This variant is used when CTE rows can be shared without cloning,
148    /// enabling O(1) memory usage for CTE references without filtering.
149    pub(super) fn from_shared_rows(
150        schema: CombinedSchema,
151        rows: Arc<Vec<vibesql_storage::Row>>,
152    ) -> Self {
153        Self { schema, data: FromData::SharedRows(rows), sorted_by: None, where_filtered: false }
154    }
155
156    /// Create a FromResult from materialized rows with sorting metadata
157    pub(super) fn from_rows_sorted(
158        schema: CombinedSchema,
159        rows: Vec<vibesql_storage::Row>,
160        sorted_by: Vec<(String, vibesql_ast::OrderDirection)>,
161    ) -> Self {
162        Self {
163            schema,
164            data: FromData::Materialized(rows),
165            sorted_by: Some(sorted_by),
166            where_filtered: false,
167        }
168    }
169
170    /// Create a FromResult from materialized rows with WHERE filtering already applied
171    pub(super) fn from_rows_where_filtered(
172        schema: CombinedSchema,
173        rows: Vec<vibesql_storage::Row>,
174        sorted_by: Option<Vec<(String, vibesql_ast::OrderDirection)>>,
175    ) -> Self {
176        Self { schema, data: FromData::Materialized(rows), sorted_by, where_filtered: true }
177    }
178
179    /// Create a FromResult from an iterator
180    pub(super) fn from_iterator(schema: CombinedSchema, iterator: FromIterator) -> Self {
181        Self { schema, data: FromData::Iterator(iterator), sorted_by: None, where_filtered: false }
182    }
183
184    /// Get the rows, materializing if needed
185    pub(super) fn into_rows(self) -> Vec<vibesql_storage::Row> {
186        self.data.into_rows()
187    }
188
189    /// Get a mutable reference to the rows, materializing if needed
190    ///
191    /// For SharedRows, this triggers copy-on-write: the shared data is cloned
192    /// into owned Materialized data to allow mutation.
193    #[allow(dead_code)]
194    pub(super) fn rows_mut(&mut self) -> &mut Vec<vibesql_storage::Row> {
195        // Convert iterator or shared to materialized
196        match &mut self.data {
197            FromData::Iterator(iter) => {
198                let rows = std::mem::replace(iter, FromIterator::from_vec(vec![])).collect_vec();
199                self.data = FromData::Materialized(rows);
200            }
201            FromData::SharedRows(arc) => {
202                // Copy-on-write: clone the shared data to allow mutation
203                let rows = arc.as_ref().clone();
204                self.data = FromData::Materialized(rows);
205            }
206            FromData::Materialized(_) => {}
207        }
208
209        // Now we're guaranteed to have materialized rows
210        match &mut self.data {
211            FromData::Materialized(rows) => rows,
212            FromData::SharedRows(_) | FromData::Iterator(_) => unreachable!(),
213        }
214    }
215
216    /// Get a reference to rows, materializing if needed
217    pub(super) fn rows(&mut self) -> &Vec<vibesql_storage::Row> {
218        self.data.as_rows()
219    }
220
221    /// Get a slice reference to rows without triggering materialization
222    ///
223    /// This is a zero-cost operation that accesses the underlying data directly
224    /// without calling collect_vec(). This is critical for performance as it
225    /// avoids the row materialization bottleneck (up to 57% of query time).
226    ///
227    /// Unlike `rows()` which may trigger iterator collection, this method
228    /// provides direct access to the underlying Vec<Row> or iterator buffer.
229    pub(super) fn as_slice(&self) -> &[vibesql_storage::Row] {
230        self.data.as_slice()
231    }
232}
233
234/// Helper function to combine two rows without unnecessary cloning
235/// Only creates a single combined row, avoiding intermediate clones
236#[inline]
237fn combine_rows(
238    left_row: &vibesql_storage::Row,
239    right_row: &vibesql_storage::Row,
240) -> vibesql_storage::Row {
241    let mut combined_values = Vec::with_capacity(left_row.values.len() + right_row.values.len());
242    combined_values.extend_from_slice(&left_row.values);
243    combined_values.extend_from_slice(&right_row.values);
244    vibesql_storage::Row::new(combined_values)
245}
246
247/// Apply a post-join filter expression to join result rows
248///
249/// This is used to filter rows produced by hash join with additional conditions
250/// from the WHERE clause that weren't used in the hash join itself.
251///
252/// Issue #3562: Added cte_results parameter so IN subqueries in filter expressions
253/// can resolve CTE references.
254fn apply_post_join_filter(
255    result: FromResult,
256    filter_expr: &vibesql_ast::Expression,
257    database: &vibesql_storage::Database,
258    cte_results: &HashMap<String, CteResult>,
259) -> Result<FromResult, ExecutorError> {
260    // Extract schema before moving result
261    let schema = result.schema.clone();
262    // Issue #3562: Use evaluator with CTE context if CTEs exist
263    let evaluator = if cte_results.is_empty() {
264        CombinedExpressionEvaluator::with_database(&schema, database)
265    } else {
266        CombinedExpressionEvaluator::with_database_and_cte(&schema, database, cte_results)
267    };
268
269    // Filter rows based on the expression
270    let mut filtered_rows = Vec::new();
271    for row in result.into_rows() {
272        match evaluator.eval(filter_expr, &row)? {
273            vibesql_types::SqlValue::Boolean(true) => filtered_rows.push(row),
274            vibesql_types::SqlValue::Boolean(false) => {} // Skip this row
275            vibesql_types::SqlValue::Null => {}           // Skip NULL results
276            // SQLLogicTest compatibility: treat integers as truthy/falsy
277            vibesql_types::SqlValue::Integer(0) => {} // Skip 0
278            vibesql_types::SqlValue::Integer(_) => filtered_rows.push(row),
279            vibesql_types::SqlValue::Smallint(0) => {} // Skip 0
280            vibesql_types::SqlValue::Smallint(_) => filtered_rows.push(row),
281            vibesql_types::SqlValue::Bigint(0) => {} // Skip 0
282            vibesql_types::SqlValue::Bigint(_) => filtered_rows.push(row),
283            vibesql_types::SqlValue::Float(0.0) => {} // Skip 0.0
284            vibesql_types::SqlValue::Float(_) => filtered_rows.push(row),
285            vibesql_types::SqlValue::Real(0.0) => {} // Skip 0.0
286            vibesql_types::SqlValue::Real(_) => filtered_rows.push(row),
287            vibesql_types::SqlValue::Double(0.0) => {} // Skip 0.0
288            vibesql_types::SqlValue::Double(_) => filtered_rows.push(row),
289            other => {
290                return Err(ExecutorError::InvalidWhereClause(format!(
291                    "Filter expression must evaluate to boolean, got: {:?}",
292                    other
293                )))
294            }
295        }
296    }
297
298    Ok(FromResult::from_rows(schema, filtered_rows))
299}
300
301/// Perform join between two FROM results, optimizing with hash join when possible
302///
303/// This function now supports predicate pushdown from WHERE clauses. Additional equijoin
304/// predicates from WHERE can be passed to optimize hash join selection and execution.
305///
306/// Note: This function combines rows from left and right according to the join type
307/// and join condition. For queries with many tables and large intermediate results,
308/// consider applying WHERE filters earlier to reduce memory usage.
309///
310/// Issue #3562: Added cte_results parameter so post-join filters with IN subqueries
311/// can resolve CTE references.
312#[allow(clippy::too_many_arguments)]
313pub(super) fn nested_loop_join(
314    left: FromResult,
315    right: FromResult,
316    join_type: &vibesql_ast::JoinType,
317    condition: &Option<vibesql_ast::Expression>,
318    natural: bool,
319    database: &vibesql_storage::Database,
320    additional_equijoins: &[vibesql_ast::Expression],
321    timeout_ctx: &TimeoutContext,
322    cte_results: &HashMap<String, CteResult>,
323) -> Result<FromResult, ExecutorError> {
324    // Try to use hash join for INNER JOINs with simple equi-join conditions
325    if let vibesql_ast::JoinType::Inner = join_type {
326        // Get column count and right table info once for analysis
327        // IMPORTANT: Sum up columns from ALL tables in the left schema,
328        // not just the first table, to handle accumulated multi-table joins
329        let left_col_count: usize =
330            left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
331
332        let right_table_name = right
333            .schema
334            .table_schemas
335            .keys()
336            .next()
337            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
338            .clone();
339
340        let right_schema = right
341            .schema
342            .table_schemas
343            .get(&right_table_name)
344            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
345            .1
346            .clone();
347
348        // Clone right_table_name before it gets moved into combine()
349        let right_table_name_for_natural = right_table_name.clone();
350
351        let temp_schema =
352            CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
353
354        // Phase 3.1: Try ON condition first (preferred for hash join)
355        // Now supports multi-column composite keys for better performance
356        if let Some(cond) = condition {
357            // First try multi-column hash join for composite keys (2+ equi-join conditions)
358            if let Some(multi_result) =
359                join_analyzer::analyze_multi_column_equi_join(cond, &temp_schema, left_col_count)
360            {
361                // Use multi-column hash join if there are 2+ join columns
362                if multi_result.equi_joins.left_col_indices.len() >= 2 {
363                    // Save schemas for NATURAL JOIN processing before moving left/right
364                    let (left_schema_for_natural, right_schema_for_natural) = if natural {
365                        (Some(left.schema.clone()), Some(right.schema.clone()))
366                    } else {
367                        (None, None)
368                    };
369
370                    let mut result = hash_join_inner_multi(
371                        left,
372                        right,
373                        &multi_result.equi_joins.left_col_indices,
374                        &multi_result.equi_joins.right_col_indices,
375                    )?;
376
377                    // Apply remaining conditions as post-join filter
378                    if !multi_result.remaining_conditions.is_empty() {
379                        if let Some(filter_expr) =
380                            combine_with_and(multi_result.remaining_conditions)
381                        {
382                            result = apply_post_join_filter(
383                                result,
384                                &filter_expr,
385                                database,
386                                cte_results,
387                            )?;
388                        }
389                    }
390
391                    // For NATURAL JOIN, remove duplicate columns from the result
392                    if natural {
393                        if let (Some(left_schema), Some(right_schema_orig)) =
394                            (left_schema_for_natural, right_schema_for_natural)
395                        {
396                            let right_schema_for_removal = CombinedSchema {
397                                table_schemas: vec![(
398                                    right_table_name_for_natural.clone(),
399                                    (
400                                        0,
401                                        right_schema_orig
402                                            .table_schemas
403                                            .values()
404                                            .next()
405                                            .unwrap()
406                                            .1
407                                            .clone(),
408                                    ),
409                                )]
410                                .into_iter()
411                                .collect(),
412                                total_columns: right_schema_orig.total_columns,
413                            };
414                            result = remove_duplicate_columns_for_natural_join(
415                                result,
416                                &left_schema,
417                                &right_schema_for_removal,
418                            )?;
419                        }
420                    }
421
422                    return Ok(result);
423                }
424
425                // Single-column equi-join: use standard hash join (more efficient for single key)
426                // Save schemas for NATURAL JOIN processing before moving left/right
427                let (left_schema_for_natural, right_schema_for_natural) = if natural {
428                    (Some(left.schema.clone()), Some(right.schema.clone()))
429                } else {
430                    (None, None)
431                };
432
433                let mut result = hash_join_inner(
434                    left,
435                    right,
436                    multi_result.equi_joins.left_col_indices[0],
437                    multi_result.equi_joins.right_col_indices[0],
438                )?;
439
440                // Apply remaining conditions as post-join filter
441                if !multi_result.remaining_conditions.is_empty() {
442                    if let Some(filter_expr) = combine_with_and(multi_result.remaining_conditions) {
443                        result =
444                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
445                    }
446                }
447
448                // For NATURAL JOIN, remove duplicate columns from the result
449                if natural {
450                    if let (Some(left_schema), Some(right_schema_orig)) =
451                        (left_schema_for_natural, right_schema_for_natural)
452                    {
453                        let right_schema_for_removal = CombinedSchema {
454                            table_schemas: vec![(
455                                right_table_name_for_natural.clone(),
456                                (
457                                    0,
458                                    right_schema_orig
459                                        .table_schemas
460                                        .values()
461                                        .next()
462                                        .unwrap()
463                                        .1
464                                        .clone(),
465                                ),
466                            )]
467                            .into_iter()
468                            .collect(),
469                            total_columns: right_schema_orig.total_columns,
470                        };
471                        result = remove_duplicate_columns_for_natural_join(
472                            result,
473                            &left_schema,
474                            &right_schema_for_removal,
475                        )?;
476                    }
477                }
478
479                return Ok(result);
480            }
481        }
482
483        // Phase 3.2: Try OR conditions with common equi-join (TPC-H Q19 optimization)
484        // For expressions like `(a.x = b.x AND ...) OR (a.x = b.x AND ...) OR (a.x = b.x AND ...)`,
485        // extract the common equi-join `a.x = b.x` for hash join
486        if let Some(cond) = condition {
487            if let Some(or_result) =
488                join_analyzer::analyze_or_equi_join(cond, &temp_schema, left_col_count)
489            {
490                // Save schemas for NATURAL JOIN processing before moving left/right
491                let (left_schema_for_natural, right_schema_for_natural) = if natural {
492                    (Some(left.schema.clone()), Some(right.schema.clone()))
493                } else {
494                    (None, None)
495                };
496
497                let mut result = hash_join_inner(
498                    left,
499                    right,
500                    or_result.equi_join.left_col_idx,
501                    or_result.equi_join.right_col_idx,
502                )?;
503
504                // Apply remaining OR conditions as post-join filter
505                if !or_result.remaining_conditions.is_empty() {
506                    if let Some(filter_expr) = combine_with_and(or_result.remaining_conditions) {
507                        result =
508                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
509                    }
510                }
511
512                // For NATURAL JOIN, remove duplicate columns from the result
513                if natural {
514                    if let (Some(left_schema), Some(right_schema_orig)) =
515                        (left_schema_for_natural, right_schema_for_natural)
516                    {
517                        let right_schema_for_removal = CombinedSchema {
518                            table_schemas: vec![(
519                                right_table_name_for_natural.clone(),
520                                (
521                                    0,
522                                    right_schema_orig
523                                        .table_schemas
524                                        .values()
525                                        .next()
526                                        .unwrap()
527                                        .1
528                                        .clone(),
529                                ),
530                            )]
531                            .into_iter()
532                            .collect(),
533                            total_columns: right_schema_orig.total_columns,
534                        };
535                        result = remove_duplicate_columns_for_natural_join(
536                            result,
537                            &left_schema,
538                            &right_schema_for_removal,
539                        )?;
540                    }
541                }
542
543                return Ok(result);
544            }
545        }
546
547        // Phase 3.3: Try arithmetic equi-join (TPC-DS Q2 optimization)
548        // For expressions like `col1 = col2 - 53`, extract the arithmetic offset for hash join
549        if let Some(cond) = condition {
550            if let Some(arith_info) =
551                join_analyzer::analyze_arithmetic_equi_join(cond, &temp_schema, left_col_count)
552            {
553                // Save schemas for NATURAL JOIN processing before moving left/right
554                let (left_schema_for_natural, right_schema_for_natural) = if natural {
555                    (Some(left.schema.clone()), Some(right.schema.clone()))
556                } else {
557                    (None, None)
558                };
559
560                let mut result = hash_join_inner_arithmetic(
561                    left,
562                    right,
563                    arith_info.left_col_idx,
564                    arith_info.right_col_idx,
565                    arith_info.offset,
566                )?;
567
568                // For NATURAL JOIN, remove duplicate columns from the result
569                if natural {
570                    if let (Some(left_schema), Some(right_schema_orig)) =
571                        (left_schema_for_natural, right_schema_for_natural)
572                    {
573                        let right_schema_for_removal = CombinedSchema {
574                            table_schemas: vec![(
575                                right_table_name_for_natural.clone(),
576                                (
577                                    0,
578                                    right_schema_orig
579                                        .table_schemas
580                                        .values()
581                                        .next()
582                                        .unwrap()
583                                        .1
584                                        .clone(),
585                                ),
586                            )]
587                            .into_iter()
588                            .collect(),
589                            total_columns: right_schema_orig.total_columns,
590                        };
591                        result = remove_duplicate_columns_for_natural_join(
592                            result,
593                            &left_schema,
594                            &right_schema_for_removal,
595                        )?;
596                    }
597                }
598
599                return Ok(result);
600            }
601        }
602
603        // Phase 3.4: If no ON condition hash join, try WHERE clause equijoins
604        // Iterate through all additional equijoins to find one suitable for hash join
605        for (idx, equijoin) in additional_equijoins.iter().enumerate() {
606            if let Some(equi_join_info) =
607                join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
608            {
609                // Save schemas for NATURAL JOIN processing before moving left/right
610                let (left_schema_for_natural, right_schema_for_natural) = if natural {
611                    (Some(left.schema.clone()), Some(right.schema.clone()))
612                } else {
613                    (None, None)
614                };
615
616                // Found a WHERE clause equijoin suitable for hash join!
617                let mut result = hash_join_inner(
618                    left,
619                    right,
620                    equi_join_info.left_col_idx,
621                    equi_join_info.right_col_idx,
622                )?;
623
624                // Apply remaining equijoins and conditions as post-join filters
625                let remaining_conditions: Vec<_> = additional_equijoins
626                    .iter()
627                    .enumerate()
628                    .filter(|(i, _)| *i != idx)
629                    .map(|(_, e)| e.clone())
630                    .collect();
631
632                if !remaining_conditions.is_empty() {
633                    if let Some(filter_expr) = combine_with_and(remaining_conditions) {
634                        result =
635                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
636                    }
637                }
638
639                // For NATURAL JOIN, remove duplicate columns from the result
640                if natural {
641                    if let (Some(left_schema), Some(right_schema_orig)) =
642                        (left_schema_for_natural, right_schema_for_natural)
643                    {
644                        let right_schema_for_removal = CombinedSchema {
645                            table_schemas: vec![(
646                                right_table_name_for_natural.clone(),
647                                (
648                                    0,
649                                    right_schema_orig
650                                        .table_schemas
651                                        .values()
652                                        .next()
653                                        .unwrap()
654                                        .1
655                                        .clone(),
656                                ),
657                            )]
658                            .into_iter()
659                            .collect(),
660                            total_columns: right_schema_orig.total_columns,
661                        };
662                        result = remove_duplicate_columns_for_natural_join(
663                            result,
664                            &left_schema,
665                            &right_schema_for_removal,
666                        )?;
667                    }
668                }
669
670                return Ok(result);
671            }
672        }
673
674        // Phase 3.5: Try arithmetic equijoins from WHERE clause for hash join
675        // For expressions like `col1 = col2 - 53` in WHERE clause with Inner joins
676        // This enables hash join for derived table joins with arithmetic conditions (TPC-DS Q2)
677        for (idx, equijoin) in additional_equijoins.iter().enumerate() {
678            if let Some(arith_info) =
679                join_analyzer::analyze_arithmetic_equi_join(equijoin, &temp_schema, left_col_count)
680            {
681                // Save schemas for NATURAL JOIN processing before moving left/right
682                let (left_schema_for_natural, right_schema_for_natural) = if natural {
683                    (Some(left.schema.clone()), Some(right.schema.clone()))
684                } else {
685                    (None, None)
686                };
687
688                // Found an arithmetic equijoin suitable for hash join!
689                let mut result = hash_join_inner_arithmetic(
690                    left,
691                    right,
692                    arith_info.left_col_idx,
693                    arith_info.right_col_idx,
694                    arith_info.offset,
695                )?;
696
697                // Apply remaining equijoins as post-join filters
698                let remaining_conditions: Vec<_> = additional_equijoins
699                    .iter()
700                    .enumerate()
701                    .filter(|(i, _)| *i != idx)
702                    .map(|(_, e)| e.clone())
703                    .collect();
704
705                if !remaining_conditions.is_empty() {
706                    if let Some(filter_expr) = combine_with_and(remaining_conditions) {
707                        result =
708                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
709                    }
710                }
711
712                // For NATURAL JOIN, remove duplicate columns from the result
713                if natural {
714                    if let (Some(left_schema), Some(right_schema_orig)) =
715                        (left_schema_for_natural, right_schema_for_natural)
716                    {
717                        let right_schema_for_removal = CombinedSchema {
718                            table_schemas: vec![(
719                                right_table_name_for_natural.clone(),
720                                (
721                                    0,
722                                    right_schema_orig
723                                        .table_schemas
724                                        .values()
725                                        .next()
726                                        .unwrap()
727                                        .1
728                                        .clone(),
729                                ),
730                            )]
731                            .into_iter()
732                            .collect(),
733                            total_columns: right_schema_orig.total_columns,
734                        };
735                        result = remove_duplicate_columns_for_natural_join(
736                            result,
737                            &left_schema,
738                            &right_schema_for_removal,
739                        )?;
740                    }
741                }
742
743                return Ok(result);
744            }
745        }
746    }
747
748    // Try to use hash join for LEFT OUTER JOINs with equi-join conditions
749    // This optimization is critical for Q13 (customer LEFT JOIN orders)
750    if let vibesql_ast::JoinType::LeftOuter = join_type {
751        // Get column count and right table info for analysis
752        let left_col_count: usize =
753            left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
754
755        let right_table_name = right
756            .schema
757            .table_schemas
758            .keys()
759            .next()
760            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
761            .clone();
762
763        let right_schema = right
764            .schema
765            .table_schemas
766            .get(&right_table_name)
767            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
768            .1
769            .clone();
770
771        // Clone right_table_name before it gets moved into combine()
772        let right_table_name_for_natural = right_table_name.clone();
773
774        let temp_schema =
775            CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
776
777        // Try ON condition for hash join with compound AND support
778        if let Some(cond) = condition {
779            if let Some(compound_result) =
780                join_analyzer::analyze_compound_equi_join(cond, &temp_schema, left_col_count)
781            {
782                // Save schemas for NATURAL JOIN processing before moving left/right
783                let (left_schema_for_natural, right_schema_for_natural) = if natural {
784                    (Some(left.schema.clone()), Some(right.schema.clone()))
785                } else {
786                    (None, None)
787                };
788
789                let mut result = hash_join_left_outer(
790                    left,
791                    right,
792                    compound_result.equi_join.left_col_idx,
793                    compound_result.equi_join.right_col_idx,
794                )?;
795
796                // Apply remaining conditions from compound AND as post-join filter
797                if !compound_result.remaining_conditions.is_empty() {
798                    if let Some(filter_expr) =
799                        combine_with_and(compound_result.remaining_conditions)
800                    {
801                        result =
802                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
803                    }
804                }
805
806                // For NATURAL JOIN, remove duplicate columns from the result
807                if natural {
808                    if let (Some(left_schema), Some(right_schema_orig)) =
809                        (left_schema_for_natural, right_schema_for_natural)
810                    {
811                        let right_schema_for_removal = CombinedSchema {
812                            table_schemas: vec![(
813                                right_table_name_for_natural.clone(),
814                                (
815                                    0,
816                                    right_schema_orig
817                                        .table_schemas
818                                        .values()
819                                        .next()
820                                        .unwrap()
821                                        .1
822                                        .clone(),
823                                ),
824                            )]
825                            .into_iter()
826                            .collect(),
827                            total_columns: right_schema_orig.total_columns,
828                        };
829                        result = remove_duplicate_columns_for_natural_join(
830                            result,
831                            &left_schema,
832                            &right_schema_for_removal,
833                        )?;
834                    }
835                }
836
837                return Ok(result);
838            }
839        }
840    }
841
842    // Try to use hash join for SEMI/ANTI JOINs with equi-join conditions
843    if matches!(join_type, vibesql_ast::JoinType::Semi | vibesql_ast::JoinType::Anti) {
844        // Get column count for analysis
845        let left_col_count: usize =
846            left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
847
848        let right_table_name = right
849            .schema
850            .table_schemas
851            .keys()
852            .next()
853            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
854            .clone();
855
856        let right_schema = right
857            .schema
858            .table_schemas
859            .get(&right_table_name)
860            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
861            .1
862            .clone();
863
864        let temp_schema =
865            CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
866
867        // Try ON condition first - use analyze_compound_equi_join to handle complex conditions
868        // This enables hash join optimization for EXISTS subqueries with additional predicates
869        // Example: EXISTS (SELECT * FROM t WHERE t.x = outer.x AND t.y <> outer.y)
870        // The equi-join (t.x = outer.x) is used for hash join, and the inequality is a post-filter
871        if let Some(cond) = condition {
872            if let Some(compound_result) =
873                join_analyzer::analyze_compound_equi_join(cond, &temp_schema, left_col_count)
874            {
875                // Build the combined remaining condition (if any)
876                let remaining_filter = combine_with_and(compound_result.remaining_conditions);
877
878                let result = if matches!(join_type, vibesql_ast::JoinType::Semi) {
879                    hash_semi_join_with_filter(
880                        left,
881                        right,
882                        compound_result.equi_join.left_col_idx,
883                        compound_result.equi_join.right_col_idx,
884                        remaining_filter.as_ref(),
885                        &temp_schema,
886                        database,
887                    )?
888                } else {
889                    hash_anti_join_with_filter(
890                        left,
891                        right,
892                        compound_result.equi_join.left_col_idx,
893                        compound_result.equi_join.right_col_idx,
894                        remaining_filter.as_ref(),
895                        &temp_schema,
896                        database,
897                    )?
898                };
899
900                return Ok(result);
901            }
902        }
903
904        // Try WHERE clause equijoins
905        for equijoin in additional_equijoins.iter() {
906            if let Some(equi_join_info) =
907                join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
908            {
909                let result = if matches!(join_type, vibesql_ast::JoinType::Semi) {
910                    hash_semi_join(
911                        left,
912                        right,
913                        equi_join_info.left_col_idx,
914                        equi_join_info.right_col_idx,
915                    )?
916                } else {
917                    hash_anti_join(
918                        left,
919                        right,
920                        equi_join_info.left_col_idx,
921                        equi_join_info.right_col_idx,
922                    )?
923                };
924
925                return Ok(result);
926            }
927        }
928    }
929
930    // Try to use hash join for CROSS JOINs when equijoin conditions exist in WHERE clause
931    // This is critical for Q21 and other TPC-H queries with implicit (comma-separated) joins
932    // CROSS JOIN with equijoin predicates should be executed as hash INNER JOIN
933    if let vibesql_ast::JoinType::Cross = join_type {
934        if !additional_equijoins.is_empty() {
935            // Get column count and right table info for analysis
936            let left_col_count: usize =
937                left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
938
939            let right_table_name = right
940                .schema
941                .table_schemas
942                .keys()
943                .next()
944                .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
945                .clone();
946
947            let right_schema = right
948                .schema
949                .table_schemas
950                .get(&right_table_name)
951                .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
952                .1
953                .clone();
954
955            let temp_schema =
956                CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
957
958            // Try WHERE clause equijoins for hash join
959            for (idx, equijoin) in additional_equijoins.iter().enumerate() {
960                if let Some(equi_join_info) =
961                    join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
962                {
963                    // Found a WHERE clause equijoin suitable for hash join!
964                    // Execute CROSS JOIN as hash INNER JOIN with the equijoin condition
965                    let mut result = hash_join_inner(
966                        left,
967                        right,
968                        equi_join_info.left_col_idx,
969                        equi_join_info.right_col_idx,
970                    )?;
971
972                    // Apply remaining equijoins as post-join filters
973                    let remaining_conditions: Vec<_> = additional_equijoins
974                        .iter()
975                        .enumerate()
976                        .filter(|(i, _)| *i != idx)
977                        .map(|(_, e)| e.clone())
978                        .collect();
979
980                    if !remaining_conditions.is_empty() {
981                        if let Some(filter_expr) = combine_with_and(remaining_conditions) {
982                            result = apply_post_join_filter(
983                                result,
984                                &filter_expr,
985                                database,
986                                cte_results,
987                            )?;
988                        }
989                    }
990
991                    return Ok(result);
992                }
993            }
994
995            // Try arithmetic equijoins for hash join (TPC-DS Q2 optimization)
996            // For expressions like `col1 = col2 - 53` in WHERE clause
997            for (idx, equijoin) in additional_equijoins.iter().enumerate() {
998                if let Some(arith_info) = join_analyzer::analyze_arithmetic_equi_join(
999                    equijoin,
1000                    &temp_schema,
1001                    left_col_count,
1002                ) {
1003                    // Found an arithmetic equijoin suitable for hash join!
1004                    let mut result = hash_join_inner_arithmetic(
1005                        left,
1006                        right,
1007                        arith_info.left_col_idx,
1008                        arith_info.right_col_idx,
1009                        arith_info.offset,
1010                    )?;
1011
1012                    // Apply remaining equijoins as post-join filters
1013                    let remaining_conditions: Vec<_> = additional_equijoins
1014                        .iter()
1015                        .enumerate()
1016                        .filter(|(i, _)| *i != idx)
1017                        .map(|(_, e)| e.clone())
1018                        .collect();
1019
1020                    if !remaining_conditions.is_empty() {
1021                        if let Some(filter_expr) = combine_with_and(remaining_conditions) {
1022                            result = apply_post_join_filter(
1023                                result,
1024                                &filter_expr,
1025                                database,
1026                                cte_results,
1027                            )?;
1028                        }
1029                    }
1030
1031                    return Ok(result);
1032                }
1033            }
1034        }
1035    }
1036
1037    // Prepare combined join condition including additional equijoins from WHERE clause
1038    let mut all_join_conditions = Vec::new();
1039    if let Some(cond) = condition {
1040        all_join_conditions.push(cond.clone());
1041    }
1042    all_join_conditions.extend_from_slice(additional_equijoins);
1043
1044    // Combine all join conditions with AND
1045    let combined_condition = combine_with_and(all_join_conditions);
1046
1047    // Fall back to nested loop join for all other cases
1048    // For NATURAL JOIN, we need to preserve the original schemas for duplicate removal
1049    let (left_schema_for_natural, right_schema_for_natural) = if natural {
1050        (Some(left.schema.clone()), Some(right.schema.clone()))
1051    } else {
1052        (None, None)
1053    };
1054
1055    let mut result = match join_type {
1056        vibesql_ast::JoinType::Inner => {
1057            nested_loop_inner_join(left, right, &combined_condition, database, timeout_ctx)
1058        }
1059        vibesql_ast::JoinType::LeftOuter => {
1060            nested_loop_left_outer_join(left, right, &combined_condition, database, timeout_ctx)
1061        }
1062        vibesql_ast::JoinType::RightOuter => {
1063            nested_loop_right_outer_join(left, right, &combined_condition, database, timeout_ctx)
1064        }
1065        vibesql_ast::JoinType::FullOuter => {
1066            nested_loop_full_outer_join(left, right, &combined_condition, database, timeout_ctx)
1067        }
1068        vibesql_ast::JoinType::Cross => {
1069            nested_loop_cross_join(left, right, &combined_condition, database, timeout_ctx)
1070        }
1071        vibesql_ast::JoinType::Semi => {
1072            nested_loop_semi_join(left, right, &combined_condition, database, timeout_ctx)
1073        }
1074        vibesql_ast::JoinType::Anti => {
1075            nested_loop_anti_join(left, right, &combined_condition, database, timeout_ctx)
1076        }
1077    }?;
1078
1079    // For NATURAL JOIN, remove duplicate columns from the result
1080    if natural {
1081        if let (Some(left_schema), Some(right_schema)) =
1082            (left_schema_for_natural, right_schema_for_natural)
1083        {
1084            result =
1085                remove_duplicate_columns_for_natural_join(result, &left_schema, &right_schema)?;
1086        }
1087    }
1088
1089    Ok(result)
1090}
1091
1092/// Remove duplicate columns for NATURAL JOIN
1093///
1094/// NATURAL JOIN should only include common columns once (from the left side).
1095/// This function identifies common columns and removes duplicates from the right side.
1096fn remove_duplicate_columns_for_natural_join(
1097    mut result: FromResult,
1098    left_schema: &CombinedSchema,
1099    right_schema: &CombinedSchema,
1100) -> Result<FromResult, ExecutorError> {
1101    use std::collections::{HashMap, HashSet};
1102
1103    // Find common column names (case-insensitive)
1104    let mut left_column_map: HashMap<String, Vec<(String, String, usize)>> = HashMap::new(); // lowercase -> [(table, actual_name, idx)]
1105    let mut col_idx = 0;
1106    for (table_name, (_table_idx, table_schema)) in &left_schema.table_schemas {
1107        for col in &table_schema.columns {
1108            let lowercase = col.name.to_lowercase();
1109            left_column_map.entry(lowercase).or_default().push((
1110                table_name.clone(),
1111                col.name.clone(),
1112                col_idx,
1113            ));
1114            col_idx += 1;
1115        }
1116    }
1117
1118    // Identify which columns from the right side are duplicates
1119    let mut right_duplicate_indices: HashSet<usize> = HashSet::new();
1120    let left_col_count = col_idx;
1121    col_idx = 0;
1122    for (_table_idx, table_schema) in right_schema.table_schemas.values() {
1123        for col in &table_schema.columns {
1124            let lowercase = col.name.to_lowercase();
1125            if left_column_map.contains_key(&lowercase) {
1126                // This is a common column, mark it as a duplicate to remove
1127                right_duplicate_indices.insert(left_col_count + col_idx);
1128            }
1129            col_idx += 1;
1130        }
1131    }
1132
1133    // If no duplicates, return as-is
1134    if right_duplicate_indices.is_empty() {
1135        return Ok(result);
1136    }
1137
1138    // Project out the duplicate columns from the result
1139    let total_cols = left_col_count + col_idx;
1140    let keep_indices: Vec<usize> =
1141        (0..total_cols).filter(|i| !right_duplicate_indices.contains(i)).collect();
1142
1143    // Build new schema without duplicate columns
1144    let mut new_schema = CombinedSchema { table_schemas: HashMap::new(), total_columns: 0 };
1145    for (table_name, (table_start_idx, table_schema)) in &result.schema.table_schemas {
1146        let mut new_cols = Vec::new();
1147
1148        for (idx, col) in table_schema.columns.iter().enumerate() {
1149            // Calculate absolute column index manually
1150            let abs_col_idx = table_start_idx + idx;
1151
1152            if keep_indices.contains(&abs_col_idx) {
1153                new_cols.push(col.clone());
1154            }
1155        }
1156
1157        if !new_cols.is_empty() {
1158            let new_table_schema =
1159                vibesql_catalog::TableSchema::new(table_schema.name.clone(), new_cols);
1160            new_schema
1161                .table_schemas
1162                .insert(table_name.clone(), (new_schema.total_columns, new_table_schema.clone()));
1163            new_schema.total_columns += new_table_schema.columns.len();
1164        }
1165    }
1166
1167    // Project the rows - get mutable reference to rows to work with FromResult API
1168    let rows = result.rows();
1169    let new_rows: Vec<vibesql_storage::Row> = rows
1170        .iter()
1171        .map(|row| {
1172            let new_values: Vec<vibesql_types::SqlValue> =
1173                keep_indices.iter().filter_map(|&i| row.values.get(i).cloned()).collect();
1174            vibesql_storage::Row::new(new_values)
1175        })
1176        .collect();
1177
1178    Ok(FromResult::from_rows(new_schema, new_rows))
1179}