vibesql_executor/select/join/
mod.rs

1use std::sync::Arc;
2
3use crate::{
4    errors::ExecutorError, evaluator::CombinedExpressionEvaluator, optimizer::combine_with_and,
5    schema::CombinedSchema, timeout::TimeoutContext,
6};
7use super::from_iterator::FromIterator;
8
9mod expression_mapper;
10pub(crate) mod hash_join;
11mod hash_join_iterator;
12mod hash_semi_join;
13mod hash_anti_join;
14mod join_analyzer;
15mod nested_loop;
16pub mod reorder;
17pub mod search;
18
19#[cfg(test)]
20mod tests;
21
22// Re-export join reorder analyzer for public tests
23// Re-export hash_join functions for internal use
24use hash_join::{hash_join_inner, hash_join_inner_arithmetic, hash_join_inner_multi, hash_join_left_outer};
25use hash_semi_join::{hash_semi_join, hash_semi_join_with_filter};
26use hash_anti_join::{hash_anti_join, hash_anti_join_with_filter};
27// Re-export hash join iterator for public use
28pub use hash_join_iterator::HashJoinIterator;
29// Re-export nested loop join variants for internal use
30use nested_loop::{
31    nested_loop_anti_join, nested_loop_cross_join, nested_loop_full_outer_join,
32    nested_loop_inner_join, nested_loop_left_outer_join, nested_loop_right_outer_join,
33    nested_loop_semi_join,
34};
35pub use reorder::JoinOrderAnalyzer;
36// Re-export join order search for public tests
37pub use search::JoinOrderSearch;
38
39/// Data source for FROM clause results
40///
41/// This enum allows FROM results to be either materialized (Vec<Row>) or lazy (iterator).
42/// Materialized results are used for JOINs, CTEs, and operations that need multiple passes.
43/// Lazy results are used for simple table scans to enable streaming execution.
44pub(super) enum FromData {
45    /// Materialized rows (for JOINs, CTEs, operations needing multiple passes)
46    Materialized(Vec<vibesql_storage::Row>),
47
48    /// Shared rows (for zero-copy CTE references without filtering)
49    ///
50    /// This variant enables O(1) cloning when CTEs are referenced multiple times
51    /// without any filtering. Only materializes to Vec when mutation is needed.
52    SharedRows(Arc<Vec<vibesql_storage::Row>>),
53
54    /// Lazy iterator (for streaming table scans)
55    Iterator(FromIterator),
56}
57
58impl FromData {
59    /// Get rows, materializing if needed
60    ///
61    /// For SharedRows, this will clone only if the Arc is shared.
62    /// If the Arc has a single reference, the Vec is moved out efficiently.
63    pub fn into_rows(self) -> Vec<vibesql_storage::Row> {
64        match self {
65            Self::Materialized(rows) => rows,
66            Self::SharedRows(arc) => Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()),
67            Self::Iterator(iter) => iter.collect_vec(),
68        }
69    }
70
71    /// Get a reference to materialized rows, or materialize if iterator
72    ///
73    /// For SharedRows, returns a reference to the shared data without cloning.
74    pub fn as_rows(&mut self) -> &Vec<vibesql_storage::Row> {
75        // If we have an iterator, materialize it
76        if let Self::Iterator(iter) = self {
77            #[cfg(feature = "profile-q6")]
78            let materialize_start = std::time::Instant::now();
79
80            let rows = std::mem::replace(iter, FromIterator::from_vec(vec![])).collect_vec();
81            *self = Self::Materialized(rows);
82
83            #[cfg(feature = "profile-q6")]
84            {
85                let materialize_time = materialize_start.elapsed();
86                if let Self::Materialized(rows) = self {
87                    eprintln!("[Q6 PROFILE] Row materialization (collect_vec): {:?} ({} rows, {:?}/row)",
88                        materialize_time, rows.len(), materialize_time / rows.len() as u32);
89                }
90            }
91        }
92
93        // Now we're guaranteed to have materialized or shared rows
94        match self {
95            Self::Materialized(ref rows) => rows,
96            Self::SharedRows(ref arc) => arc.as_ref(),
97            Self::Iterator(_) => unreachable!(),
98        }
99    }
100
101    /// Get a slice reference to the underlying rows without triggering materialization
102    ///
103    /// This is a zero-cost operation that directly accesses the underlying Vec<Row>
104    /// without calling collect_vec(). This avoids the 137ms row materialization
105    /// bottleneck in Q6 by skipping iteration entirely.
106    ///
107    /// Critical for columnar execution performance (#2521).
108    pub fn as_slice(&self) -> &[vibesql_storage::Row] {
109        match self {
110            Self::Materialized(rows) => rows.as_slice(),
111            Self::SharedRows(arc) => arc.as_slice(),
112            Self::Iterator(iter) => iter.as_slice(),
113        }
114    }
115}
116
117/// Result of executing a FROM clause
118///
119/// Contains the combined schema and data (either materialized or lazy).
120pub(super) struct FromResult {
121    pub(super) schema: CombinedSchema,
122    pub(super) data: FromData,
123    /// If present, indicates that results are already sorted by the specified columns
124    /// in the given order (ASC/DESC). This allows skipping ORDER BY sorting.
125    pub(super) sorted_by: Option<Vec<(String, vibesql_ast::OrderDirection)>>,
126    /// If true, indicates that WHERE clause filtering has already been fully applied
127    /// during the scan (e.g., by index scan with predicate pushdown). This allows
128    /// skipping redundant WHERE clause evaluation in the executor.
129    pub(super) where_filtered: bool,
130}
131
132impl FromResult {
133    /// Create a FromResult from materialized rows
134    pub(super) fn from_rows(schema: CombinedSchema, rows: Vec<vibesql_storage::Row>) -> Self {
135        Self { schema, data: FromData::Materialized(rows), sorted_by: None, where_filtered: false }
136    }
137
138    /// Create a FromResult from shared rows (zero-copy for CTEs)
139    ///
140    /// This variant is used when CTE rows can be shared without cloning,
141    /// enabling O(1) memory usage for CTE references without filtering.
142    pub(super) fn from_shared_rows(schema: CombinedSchema, rows: Arc<Vec<vibesql_storage::Row>>) -> Self {
143        Self { schema, data: FromData::SharedRows(rows), sorted_by: None, where_filtered: false }
144    }
145
146    /// Create a FromResult from materialized rows with sorting metadata
147    pub(super) fn from_rows_sorted(
148        schema: CombinedSchema,
149        rows: Vec<vibesql_storage::Row>,
150        sorted_by: Vec<(String, vibesql_ast::OrderDirection)>,
151    ) -> Self {
152        Self { schema, data: FromData::Materialized(rows), sorted_by: Some(sorted_by), where_filtered: false }
153    }
154
155    /// Create a FromResult from materialized rows with WHERE filtering already applied
156    pub(super) fn from_rows_where_filtered(
157        schema: CombinedSchema,
158        rows: Vec<vibesql_storage::Row>,
159        sorted_by: Option<Vec<(String, vibesql_ast::OrderDirection)>>,
160    ) -> Self {
161        Self { schema, data: FromData::Materialized(rows), sorted_by, where_filtered: true }
162    }
163
164    /// Create a FromResult from an iterator
165    pub(super) fn from_iterator(schema: CombinedSchema, iterator: FromIterator) -> Self {
166        Self { schema, data: FromData::Iterator(iterator), sorted_by: None, where_filtered: false }
167    }
168
169    /// Get the rows, materializing if needed
170    pub(super) fn into_rows(self) -> Vec<vibesql_storage::Row> {
171        self.data.into_rows()
172    }
173
174    /// Get a mutable reference to the rows, materializing if needed
175    ///
176    /// For SharedRows, this triggers copy-on-write: the shared data is cloned
177    /// into owned Materialized data to allow mutation.
178    #[allow(dead_code)]
179    pub(super) fn rows_mut(&mut self) -> &mut Vec<vibesql_storage::Row> {
180        // Convert iterator or shared to materialized
181        match &mut self.data {
182            FromData::Iterator(iter) => {
183                let rows = std::mem::replace(iter, FromIterator::from_vec(vec![])).collect_vec();
184                self.data = FromData::Materialized(rows);
185            }
186            FromData::SharedRows(arc) => {
187                // Copy-on-write: clone the shared data to allow mutation
188                let rows = arc.as_ref().clone();
189                self.data = FromData::Materialized(rows);
190            }
191            FromData::Materialized(_) => {}
192        }
193
194        // Now we're guaranteed to have materialized rows
195        match &mut self.data {
196            FromData::Materialized(rows) => rows,
197            FromData::SharedRows(_) | FromData::Iterator(_) => unreachable!(),
198        }
199    }
200
201    /// Get a reference to rows, materializing if needed
202    pub(super) fn rows(&mut self) -> &Vec<vibesql_storage::Row> {
203        self.data.as_rows()
204    }
205
206    /// Get a slice reference to rows without triggering materialization
207    ///
208    /// This is a zero-cost operation that accesses the underlying data directly
209    /// without calling collect_vec(). This is critical for performance as it
210    /// avoids the row materialization bottleneck (up to 57% of query time).
211    ///
212    /// Unlike `rows()` which may trigger iterator collection, this method
213    /// provides direct access to the underlying Vec<Row> or iterator buffer.
214    pub(super) fn as_slice(&self) -> &[vibesql_storage::Row] {
215        self.data.as_slice()
216    }
217}
218
219/// Helper function to combine two rows without unnecessary cloning
220/// Only creates a single combined row, avoiding intermediate clones
221#[inline]
222fn combine_rows(left_row: &vibesql_storage::Row, right_row: &vibesql_storage::Row) -> vibesql_storage::Row {
223    let mut combined_values = Vec::with_capacity(left_row.values.len() + right_row.values.len());
224    combined_values.extend_from_slice(&left_row.values);
225    combined_values.extend_from_slice(&right_row.values);
226    vibesql_storage::Row::new(combined_values)
227}
228
229/// Apply a post-join filter expression to join result rows
230///
231/// This is used to filter rows produced by hash join with additional conditions
232/// from the WHERE clause that weren't used in the hash join itself.
233fn apply_post_join_filter(
234    result: FromResult,
235    filter_expr: &vibesql_ast::Expression,
236    database: &vibesql_storage::Database,
237) -> Result<FromResult, ExecutorError> {
238    // Extract schema before moving result
239    let schema = result.schema.clone();
240    let evaluator = CombinedExpressionEvaluator::with_database(&schema, database);
241
242    // Filter rows based on the expression
243    let mut filtered_rows = Vec::new();
244    for row in result.into_rows() {
245        match evaluator.eval(filter_expr, &row)? {
246            vibesql_types::SqlValue::Boolean(true) => filtered_rows.push(row),
247            vibesql_types::SqlValue::Boolean(false) => {} // Skip this row
248            vibesql_types::SqlValue::Null => {}           // Skip NULL results
249            // SQLLogicTest compatibility: treat integers as truthy/falsy
250            vibesql_types::SqlValue::Integer(0) => {} // Skip 0
251            vibesql_types::SqlValue::Integer(_) => filtered_rows.push(row),
252            vibesql_types::SqlValue::Smallint(0) => {} // Skip 0
253            vibesql_types::SqlValue::Smallint(_) => filtered_rows.push(row),
254            vibesql_types::SqlValue::Bigint(0) => {} // Skip 0
255            vibesql_types::SqlValue::Bigint(_) => filtered_rows.push(row),
256            vibesql_types::SqlValue::Float(0.0) => {} // Skip 0.0
257            vibesql_types::SqlValue::Float(_) => filtered_rows.push(row),
258            vibesql_types::SqlValue::Real(0.0) => {} // Skip 0.0
259            vibesql_types::SqlValue::Real(_) => filtered_rows.push(row),
260            vibesql_types::SqlValue::Double(0.0) => {} // Skip 0.0
261            vibesql_types::SqlValue::Double(_) => filtered_rows.push(row),
262            other => {
263                return Err(ExecutorError::InvalidWhereClause(format!(
264                    "Filter expression must evaluate to boolean, got: {:?}",
265                    other
266                )))
267            }
268        }
269    }
270
271    Ok(FromResult::from_rows(schema, filtered_rows))
272}
273
274/// Perform join between two FROM results, optimizing with hash join when possible
275///
276/// This function now supports predicate pushdown from WHERE clauses. Additional equijoin
277/// predicates from WHERE can be passed to optimize hash join selection and execution.
278///
279/// Note: This function combines rows from left and right according to the join type
280/// and join condition. For queries with many tables and large intermediate results,
281/// consider applying WHERE filters earlier to reduce memory usage.
282#[allow(clippy::too_many_arguments)]
283pub(super) fn nested_loop_join(
284    left: FromResult,
285    right: FromResult,
286    join_type: &vibesql_ast::JoinType,
287    condition: &Option<vibesql_ast::Expression>,
288    natural: bool,
289    database: &vibesql_storage::Database,
290    additional_equijoins: &[vibesql_ast::Expression],
291    timeout_ctx: &TimeoutContext,
292) -> Result<FromResult, ExecutorError> {
293    // Try to use hash join for INNER JOINs with simple equi-join conditions
294    if let vibesql_ast::JoinType::Inner = join_type {
295        // Get column count and right table info once for analysis
296        // IMPORTANT: Sum up columns from ALL tables in the left schema,
297        // not just the first table, to handle accumulated multi-table joins
298        let left_col_count: usize =
299            left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
300
301        let right_table_name = right
302            .schema
303            .table_schemas
304            .keys()
305            .next()
306            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
307            .clone();
308
309        let right_schema = right
310            .schema
311            .table_schemas
312            .get(&right_table_name)
313            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
314            .1
315            .clone();
316
317        // Clone right_table_name before it gets moved into combine()
318        let right_table_name_for_natural = right_table_name.clone();
319
320        let temp_schema =
321            CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
322
323        // Phase 3.1: Try ON condition first (preferred for hash join)
324        // Now supports multi-column composite keys for better performance
325        if let Some(cond) = condition {
326            // First try multi-column hash join for composite keys (2+ equi-join conditions)
327            if let Some(multi_result) =
328                join_analyzer::analyze_multi_column_equi_join(cond, &temp_schema, left_col_count)
329            {
330                // Use multi-column hash join if there are 2+ join columns
331                if multi_result.equi_joins.left_col_indices.len() >= 2 {
332                    // Save schemas for NATURAL JOIN processing before moving left/right
333                    let (left_schema_for_natural, right_schema_for_natural) = if natural {
334                        (Some(left.schema.clone()), Some(right.schema.clone()))
335                    } else {
336                        (None, None)
337                    };
338
339                    let mut result = hash_join_inner_multi(
340                        left,
341                        right,
342                        &multi_result.equi_joins.left_col_indices,
343                        &multi_result.equi_joins.right_col_indices,
344                    )?;
345
346                    // Apply remaining conditions as post-join filter
347                    if !multi_result.remaining_conditions.is_empty() {
348                        if let Some(filter_expr) = combine_with_and(multi_result.remaining_conditions) {
349                            result = apply_post_join_filter(result, &filter_expr, database)?;
350                        }
351                    }
352
353                    // For NATURAL JOIN, remove duplicate columns from the result
354                    if natural {
355                        if let (Some(left_schema), Some(right_schema_orig)) =
356                            (left_schema_for_natural, right_schema_for_natural)
357                        {
358                            let right_schema_for_removal = CombinedSchema {
359                                table_schemas: vec![(
360                                    right_table_name_for_natural.clone(),
361                                    (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
362                                )]
363                                .into_iter()
364                                .collect(),
365                                total_columns: right_schema_orig.total_columns,
366                            };
367                            result = remove_duplicate_columns_for_natural_join(
368                                result,
369                                &left_schema,
370                                &right_schema_for_removal,
371                            )?;
372                        }
373                    }
374
375                    return Ok(result);
376                }
377
378                // Single-column equi-join: use standard hash join (more efficient for single key)
379                // Save schemas for NATURAL JOIN processing before moving left/right
380                let (left_schema_for_natural, right_schema_for_natural) = if natural {
381                    (Some(left.schema.clone()), Some(right.schema.clone()))
382                } else {
383                    (None, None)
384                };
385
386                let mut result = hash_join_inner(
387                    left,
388                    right,
389                    multi_result.equi_joins.left_col_indices[0],
390                    multi_result.equi_joins.right_col_indices[0],
391                )?;
392
393                // Apply remaining conditions as post-join filter
394                if !multi_result.remaining_conditions.is_empty() {
395                    if let Some(filter_expr) = combine_with_and(multi_result.remaining_conditions) {
396                        result = apply_post_join_filter(result, &filter_expr, database)?;
397                    }
398                }
399
400                // For NATURAL JOIN, remove duplicate columns from the result
401                if natural {
402                    if let (Some(left_schema), Some(right_schema_orig)) =
403                        (left_schema_for_natural, right_schema_for_natural)
404                    {
405                        let right_schema_for_removal = CombinedSchema {
406                            table_schemas: vec![(
407                                right_table_name_for_natural.clone(),
408                                (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
409                            )]
410                            .into_iter()
411                            .collect(),
412                            total_columns: right_schema_orig.total_columns,
413                        };
414                        result = remove_duplicate_columns_for_natural_join(
415                            result,
416                            &left_schema,
417                            &right_schema_for_removal,
418                        )?;
419                    }
420                }
421
422                return Ok(result);
423            }
424        }
425
426        // Phase 3.2: Try OR conditions with common equi-join (TPC-H Q19 optimization)
427        // For expressions like `(a.x = b.x AND ...) OR (a.x = b.x AND ...) OR (a.x = b.x AND ...)`,
428        // extract the common equi-join `a.x = b.x` for hash join
429        if let Some(cond) = condition {
430            if let Some(or_result) =
431                join_analyzer::analyze_or_equi_join(cond, &temp_schema, left_col_count)
432            {
433                // Save schemas for NATURAL JOIN processing before moving left/right
434                let (left_schema_for_natural, right_schema_for_natural) = if natural {
435                    (Some(left.schema.clone()), Some(right.schema.clone()))
436                } else {
437                    (None, None)
438                };
439
440                let mut result = hash_join_inner(
441                    left,
442                    right,
443                    or_result.equi_join.left_col_idx,
444                    or_result.equi_join.right_col_idx,
445                )?;
446
447                // Apply remaining OR conditions as post-join filter
448                if !or_result.remaining_conditions.is_empty() {
449                    if let Some(filter_expr) = combine_with_and(or_result.remaining_conditions) {
450                        result = apply_post_join_filter(result, &filter_expr, database)?;
451                    }
452                }
453
454                // For NATURAL JOIN, remove duplicate columns from the result
455                if natural {
456                    if let (Some(left_schema), Some(right_schema_orig)) =
457                        (left_schema_for_natural, right_schema_for_natural)
458                    {
459                        let right_schema_for_removal = CombinedSchema {
460                            table_schemas: vec![(
461                                right_table_name_for_natural.clone(),
462                                (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
463                            )]
464                            .into_iter()
465                            .collect(),
466                            total_columns: right_schema_orig.total_columns,
467                        };
468                        result = remove_duplicate_columns_for_natural_join(
469                            result,
470                            &left_schema,
471                            &right_schema_for_removal,
472                        )?;
473                    }
474                }
475
476                return Ok(result);
477            }
478        }
479
480        // Phase 3.3: Try arithmetic equi-join (TPC-DS Q2 optimization)
481        // For expressions like `col1 = col2 - 53`, extract the arithmetic offset for hash join
482        if let Some(cond) = condition {
483            if let Some(arith_info) =
484                join_analyzer::analyze_arithmetic_equi_join(cond, &temp_schema, left_col_count)
485            {
486                // Save schemas for NATURAL JOIN processing before moving left/right
487                let (left_schema_for_natural, right_schema_for_natural) = if natural {
488                    (Some(left.schema.clone()), Some(right.schema.clone()))
489                } else {
490                    (None, None)
491                };
492
493                let mut result = hash_join_inner_arithmetic(
494                    left,
495                    right,
496                    arith_info.left_col_idx,
497                    arith_info.right_col_idx,
498                    arith_info.offset,
499                )?;
500
501                // For NATURAL JOIN, remove duplicate columns from the result
502                if natural {
503                    if let (Some(left_schema), Some(right_schema_orig)) =
504                        (left_schema_for_natural, right_schema_for_natural)
505                    {
506                        let right_schema_for_removal = CombinedSchema {
507                            table_schemas: vec![(
508                                right_table_name_for_natural.clone(),
509                                (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
510                            )]
511                            .into_iter()
512                            .collect(),
513                            total_columns: right_schema_orig.total_columns,
514                        };
515                        result = remove_duplicate_columns_for_natural_join(
516                            result,
517                            &left_schema,
518                            &right_schema_for_removal,
519                        )?;
520                    }
521                }
522
523                return Ok(result);
524            }
525        }
526
527        // Phase 3.4: If no ON condition hash join, try WHERE clause equijoins
528        // Iterate through all additional equijoins to find one suitable for hash join
529        for (idx, equijoin) in additional_equijoins.iter().enumerate() {
530            if let Some(equi_join_info) =
531                join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
532            {
533                // Save schemas for NATURAL JOIN processing before moving left/right
534                let (left_schema_for_natural, right_schema_for_natural) = if natural {
535                    (Some(left.schema.clone()), Some(right.schema.clone()))
536                } else {
537                    (None, None)
538                };
539
540                // Found a WHERE clause equijoin suitable for hash join!
541                let mut result = hash_join_inner(
542                    left,
543                    right,
544                    equi_join_info.left_col_idx,
545                    equi_join_info.right_col_idx,
546                )?;
547
548                // Apply remaining equijoins and conditions as post-join filters
549                let remaining_conditions: Vec<_> = additional_equijoins
550                    .iter()
551                    .enumerate()
552                    .filter(|(i, _)| *i != idx)
553                    .map(|(_, e)| e.clone())
554                    .collect();
555
556                if !remaining_conditions.is_empty() {
557                    if let Some(filter_expr) = combine_with_and(remaining_conditions) {
558                        result = apply_post_join_filter(result, &filter_expr, database)?;
559                    }
560                }
561
562                // For NATURAL JOIN, remove duplicate columns from the result
563                if natural {
564                    if let (Some(left_schema), Some(right_schema_orig)) =
565                        (left_schema_for_natural, right_schema_for_natural)
566                    {
567                        let right_schema_for_removal = CombinedSchema {
568                            table_schemas: vec![(
569                                right_table_name_for_natural.clone(),
570                                (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
571                            )]
572                            .into_iter()
573                            .collect(),
574                            total_columns: right_schema_orig.total_columns,
575                        };
576                        result = remove_duplicate_columns_for_natural_join(
577                            result,
578                            &left_schema,
579                            &right_schema_for_removal,
580                        )?;
581                    }
582                }
583
584                return Ok(result);
585            }
586        }
587
588        // Phase 3.5: Try arithmetic equijoins from WHERE clause for hash join
589        // For expressions like `col1 = col2 - 53` in WHERE clause with Inner joins
590        // This enables hash join for derived table joins with arithmetic conditions (TPC-DS Q2)
591        for (idx, equijoin) in additional_equijoins.iter().enumerate() {
592            if let Some(arith_info) =
593                join_analyzer::analyze_arithmetic_equi_join(equijoin, &temp_schema, left_col_count)
594            {
595                // Save schemas for NATURAL JOIN processing before moving left/right
596                let (left_schema_for_natural, right_schema_for_natural) = if natural {
597                    (Some(left.schema.clone()), Some(right.schema.clone()))
598                } else {
599                    (None, None)
600                };
601
602                // Found an arithmetic equijoin suitable for hash join!
603                let mut result = hash_join_inner_arithmetic(
604                    left,
605                    right,
606                    arith_info.left_col_idx,
607                    arith_info.right_col_idx,
608                    arith_info.offset,
609                )?;
610
611                // Apply remaining equijoins as post-join filters
612                let remaining_conditions: Vec<_> = additional_equijoins
613                    .iter()
614                    .enumerate()
615                    .filter(|(i, _)| *i != idx)
616                    .map(|(_, e)| e.clone())
617                    .collect();
618
619                if !remaining_conditions.is_empty() {
620                    if let Some(filter_expr) = combine_with_and(remaining_conditions) {
621                        result = apply_post_join_filter(result, &filter_expr, database)?;
622                    }
623                }
624
625                // For NATURAL JOIN, remove duplicate columns from the result
626                if natural {
627                    if let (Some(left_schema), Some(right_schema_orig)) =
628                        (left_schema_for_natural, right_schema_for_natural)
629                    {
630                        let right_schema_for_removal = CombinedSchema {
631                            table_schemas: vec![(
632                                right_table_name_for_natural.clone(),
633                                (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
634                            )]
635                            .into_iter()
636                            .collect(),
637                            total_columns: right_schema_orig.total_columns,
638                        };
639                        result = remove_duplicate_columns_for_natural_join(
640                            result,
641                            &left_schema,
642                            &right_schema_for_removal,
643                        )?;
644                    }
645                }
646
647                return Ok(result);
648            }
649        }
650    }
651
652    // Try to use hash join for LEFT OUTER JOINs with equi-join conditions
653    // This optimization is critical for Q13 (customer LEFT JOIN orders)
654    if let vibesql_ast::JoinType::LeftOuter = join_type {
655        // Get column count and right table info for analysis
656        let left_col_count: usize =
657            left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
658
659        let right_table_name = right
660            .schema
661            .table_schemas
662            .keys()
663            .next()
664            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
665            .clone();
666
667        let right_schema = right
668            .schema
669            .table_schemas
670            .get(&right_table_name)
671            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
672            .1
673            .clone();
674
675        // Clone right_table_name before it gets moved into combine()
676        let right_table_name_for_natural = right_table_name.clone();
677
678        let temp_schema =
679            CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
680
681        // Try ON condition for hash join with compound AND support
682        if let Some(cond) = condition {
683            if let Some(compound_result) =
684                join_analyzer::analyze_compound_equi_join(cond, &temp_schema, left_col_count)
685            {
686                // Save schemas for NATURAL JOIN processing before moving left/right
687                let (left_schema_for_natural, right_schema_for_natural) = if natural {
688                    (Some(left.schema.clone()), Some(right.schema.clone()))
689                } else {
690                    (None, None)
691                };
692
693                let mut result = hash_join_left_outer(
694                    left,
695                    right,
696                    compound_result.equi_join.left_col_idx,
697                    compound_result.equi_join.right_col_idx,
698                )?;
699
700                // Apply remaining conditions from compound AND as post-join filter
701                if !compound_result.remaining_conditions.is_empty() {
702                    if let Some(filter_expr) = combine_with_and(compound_result.remaining_conditions) {
703                        result = apply_post_join_filter(result, &filter_expr, database)?;
704                    }
705                }
706
707                // For NATURAL JOIN, remove duplicate columns from the result
708                if natural {
709                    if let (Some(left_schema), Some(right_schema_orig)) =
710                        (left_schema_for_natural, right_schema_for_natural)
711                    {
712                        let right_schema_for_removal = CombinedSchema {
713                            table_schemas: vec![(
714                                right_table_name_for_natural.clone(),
715                                (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
716                            )]
717                            .into_iter()
718                            .collect(),
719                            total_columns: right_schema_orig.total_columns,
720                        };
721                        result = remove_duplicate_columns_for_natural_join(
722                            result,
723                            &left_schema,
724                            &right_schema_for_removal,
725                        )?;
726                    }
727                }
728
729                return Ok(result);
730            }
731        }
732    }
733
734    // Try to use hash join for SEMI/ANTI JOINs with equi-join conditions
735    if matches!(join_type, vibesql_ast::JoinType::Semi | vibesql_ast::JoinType::Anti) {
736        // Get column count for analysis
737        let left_col_count: usize =
738            left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
739
740        let right_table_name = right
741            .schema
742            .table_schemas
743            .keys()
744            .next()
745            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
746            .clone();
747
748        let right_schema = right
749            .schema
750            .table_schemas
751            .get(&right_table_name)
752            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
753            .1
754            .clone();
755
756        let temp_schema =
757            CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
758
759        // Try ON condition first - use analyze_compound_equi_join to handle complex conditions
760        // This enables hash join optimization for EXISTS subqueries with additional predicates
761        // Example: EXISTS (SELECT * FROM t WHERE t.x = outer.x AND t.y <> outer.y)
762        // The equi-join (t.x = outer.x) is used for hash join, and the inequality is a post-filter
763        if let Some(cond) = condition {
764            if let Some(compound_result) =
765                join_analyzer::analyze_compound_equi_join(cond, &temp_schema, left_col_count)
766            {
767                // Build the combined remaining condition (if any)
768                let remaining_filter = combine_with_and(compound_result.remaining_conditions);
769
770                let result = if matches!(join_type, vibesql_ast::JoinType::Semi) {
771                    hash_semi_join_with_filter(
772                        left,
773                        right,
774                        compound_result.equi_join.left_col_idx,
775                        compound_result.equi_join.right_col_idx,
776                        remaining_filter.as_ref(),
777                        &temp_schema,
778                        database,
779                    )?
780                } else {
781                    hash_anti_join_with_filter(
782                        left,
783                        right,
784                        compound_result.equi_join.left_col_idx,
785                        compound_result.equi_join.right_col_idx,
786                        remaining_filter.as_ref(),
787                        &temp_schema,
788                        database,
789                    )?
790                };
791
792                return Ok(result);
793            }
794        }
795
796        // Try WHERE clause equijoins
797        for equijoin in additional_equijoins.iter() {
798            if let Some(equi_join_info) =
799                join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
800            {
801                let result = if matches!(join_type, vibesql_ast::JoinType::Semi) {
802                    hash_semi_join(
803                        left,
804                        right,
805                        equi_join_info.left_col_idx,
806                        equi_join_info.right_col_idx,
807                    )?
808                } else {
809                    hash_anti_join(
810                        left,
811                        right,
812                        equi_join_info.left_col_idx,
813                        equi_join_info.right_col_idx,
814                    )?
815                };
816
817                return Ok(result);
818            }
819        }
820    }
821
822    // Try to use hash join for CROSS JOINs when equijoin conditions exist in WHERE clause
823    // This is critical for Q21 and other TPC-H queries with implicit (comma-separated) joins
824    // CROSS JOIN with equijoin predicates should be executed as hash INNER JOIN
825    if let vibesql_ast::JoinType::Cross = join_type {
826        if !additional_equijoins.is_empty() {
827            // Get column count and right table info for analysis
828            let left_col_count: usize =
829                left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
830
831            let right_table_name = right
832                .schema
833                .table_schemas
834                .keys()
835                .next()
836                .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
837                .clone();
838
839            let right_schema = right
840                .schema
841                .table_schemas
842                .get(&right_table_name)
843                .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
844                .1
845                .clone();
846
847            let temp_schema =
848                CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
849
850            // Try WHERE clause equijoins for hash join
851            for (idx, equijoin) in additional_equijoins.iter().enumerate() {
852                if let Some(equi_join_info) =
853                    join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
854                {
855                    // Found a WHERE clause equijoin suitable for hash join!
856                    // Execute CROSS JOIN as hash INNER JOIN with the equijoin condition
857                    let mut result = hash_join_inner(
858                        left,
859                        right,
860                        equi_join_info.left_col_idx,
861                        equi_join_info.right_col_idx,
862                    )?;
863
864                    // Apply remaining equijoins as post-join filters
865                    let remaining_conditions: Vec<_> = additional_equijoins
866                        .iter()
867                        .enumerate()
868                        .filter(|(i, _)| *i != idx)
869                        .map(|(_, e)| e.clone())
870                        .collect();
871
872                    if !remaining_conditions.is_empty() {
873                        if let Some(filter_expr) = combine_with_and(remaining_conditions) {
874                            result = apply_post_join_filter(result, &filter_expr, database)?;
875                        }
876                    }
877
878                    return Ok(result);
879                }
880            }
881
882            // Try arithmetic equijoins for hash join (TPC-DS Q2 optimization)
883            // For expressions like `col1 = col2 - 53` in WHERE clause
884            for (idx, equijoin) in additional_equijoins.iter().enumerate() {
885                if let Some(arith_info) =
886                    join_analyzer::analyze_arithmetic_equi_join(equijoin, &temp_schema, left_col_count)
887                {
888                    // Found an arithmetic equijoin suitable for hash join!
889                    let mut result = hash_join_inner_arithmetic(
890                        left,
891                        right,
892                        arith_info.left_col_idx,
893                        arith_info.right_col_idx,
894                        arith_info.offset,
895                    )?;
896
897                    // Apply remaining equijoins as post-join filters
898                    let remaining_conditions: Vec<_> = additional_equijoins
899                        .iter()
900                        .enumerate()
901                        .filter(|(i, _)| *i != idx)
902                        .map(|(_, e)| e.clone())
903                        .collect();
904
905                    if !remaining_conditions.is_empty() {
906                        if let Some(filter_expr) = combine_with_and(remaining_conditions) {
907                            result = apply_post_join_filter(result, &filter_expr, database)?;
908                        }
909                    }
910
911                    return Ok(result);
912                }
913            }
914        }
915    }
916
917    // Prepare combined join condition including additional equijoins from WHERE clause
918    let mut all_join_conditions = Vec::new();
919    if let Some(cond) = condition {
920        all_join_conditions.push(cond.clone());
921    }
922    all_join_conditions.extend_from_slice(additional_equijoins);
923
924    // Combine all join conditions with AND
925    let combined_condition = combine_with_and(all_join_conditions);
926
927    // Fall back to nested loop join for all other cases
928    // For NATURAL JOIN, we need to preserve the original schemas for duplicate removal
929    let (left_schema_for_natural, right_schema_for_natural) = if natural {
930        (Some(left.schema.clone()), Some(right.schema.clone()))
931    } else {
932        (None, None)
933    };
934
935    let mut result = match join_type {
936        vibesql_ast::JoinType::Inner => nested_loop_inner_join(left, right, &combined_condition, database, timeout_ctx),
937        vibesql_ast::JoinType::LeftOuter => {
938            nested_loop_left_outer_join(left, right, &combined_condition, database, timeout_ctx)
939        }
940        vibesql_ast::JoinType::RightOuter => {
941            nested_loop_right_outer_join(left, right, &combined_condition, database, timeout_ctx)
942        }
943        vibesql_ast::JoinType::FullOuter => {
944            nested_loop_full_outer_join(left, right, &combined_condition, database, timeout_ctx)
945        }
946        vibesql_ast::JoinType::Cross => nested_loop_cross_join(left, right, &combined_condition, database, timeout_ctx),
947        vibesql_ast::JoinType::Semi => nested_loop_semi_join(left, right, &combined_condition, database, timeout_ctx),
948        vibesql_ast::JoinType::Anti => nested_loop_anti_join(left, right, &combined_condition, database, timeout_ctx),
949    }?;
950
951    // For NATURAL JOIN, remove duplicate columns from the result
952    if natural {
953        if let (Some(left_schema), Some(right_schema)) = (left_schema_for_natural, right_schema_for_natural) {
954            result = remove_duplicate_columns_for_natural_join(result, &left_schema, &right_schema)?;
955        }
956    }
957
958    Ok(result)
959}
960
961/// Remove duplicate columns for NATURAL JOIN
962///
963/// NATURAL JOIN should only include common columns once (from the left side).
964/// This function identifies common columns and removes duplicates from the right side.
965fn remove_duplicate_columns_for_natural_join(
966    mut result: FromResult,
967    left_schema: &CombinedSchema,
968    right_schema: &CombinedSchema,
969) -> Result<FromResult, ExecutorError> {
970    use std::collections::{HashMap, HashSet};
971
972    // Find common column names (case-insensitive)
973    let mut left_column_map: HashMap<String, Vec<(String, String, usize)>> = HashMap::new(); // lowercase -> [(table, actual_name, idx)]
974    let mut col_idx = 0;
975    for (table_name, (_table_idx, table_schema)) in &left_schema.table_schemas {
976        for col in &table_schema.columns {
977            let lowercase = col.name.to_lowercase();
978            left_column_map
979                .entry(lowercase)
980                .or_default()
981                .push((table_name.clone(), col.name.clone(), col_idx));
982            col_idx += 1;
983        }
984    }
985
986    // Identify which columns from the right side are duplicates
987    let mut right_duplicate_indices: HashSet<usize> = HashSet::new();
988    let left_col_count = col_idx;
989    col_idx = 0;
990    for (_table_idx, table_schema) in right_schema.table_schemas.values() {
991        for col in &table_schema.columns {
992            let lowercase = col.name.to_lowercase();
993            if left_column_map.contains_key(&lowercase) {
994                // This is a common column, mark it as a duplicate to remove
995                right_duplicate_indices.insert(left_col_count + col_idx);
996            }
997            col_idx += 1;
998        }
999    }
1000
1001    // If no duplicates, return as-is
1002    if right_duplicate_indices.is_empty() {
1003        return Ok(result);
1004    }
1005
1006    // Project out the duplicate columns from the result
1007    let total_cols = left_col_count + col_idx;
1008    let keep_indices: Vec<usize> = (0..total_cols)
1009        .filter(|i| !right_duplicate_indices.contains(i))
1010        .collect();
1011
1012    // Build new schema without duplicate columns
1013    let mut new_schema = CombinedSchema { table_schemas: HashMap::new(), total_columns: 0 };
1014    for (table_name, (table_start_idx, table_schema)) in &result.schema.table_schemas {
1015        let mut new_cols = Vec::new();
1016
1017        for (idx, col) in table_schema.columns.iter().enumerate() {
1018            // Calculate absolute column index manually
1019            let abs_col_idx = table_start_idx + idx;
1020
1021            if keep_indices.contains(&abs_col_idx) {
1022                new_cols.push(col.clone());
1023            }
1024        }
1025
1026        if !new_cols.is_empty() {
1027            let new_table_schema = vibesql_catalog::TableSchema::new(
1028                table_schema.name.clone(),
1029                new_cols,
1030            );
1031            new_schema.table_schemas.insert(
1032                table_name.clone(),
1033                (new_schema.total_columns, new_table_schema.clone()),
1034            );
1035            new_schema.total_columns += new_table_schema.columns.len();
1036        }
1037    }
1038
1039    // Project the rows - get mutable reference to rows to work with FromResult API
1040    let rows = result.rows();
1041    let new_rows: Vec<vibesql_storage::Row> = rows
1042        .iter()
1043        .map(|row| {
1044            let new_values: Vec<vibesql_types::SqlValue> = keep_indices
1045                .iter()
1046                .filter_map(|&i| row.values.get(i).cloned())
1047                .collect();
1048            vibesql_storage::Row::new(new_values)
1049        })
1050        .collect();
1051
1052    Ok(FromResult::from_rows(new_schema, new_rows))
1053}