vibesql_executor/select/join/
mod.rs

1use std::{collections::HashMap, sync::Arc};
2
3use super::{cte::CteResult, from_iterator::FromIterator};
4use crate::{
5    errors::ExecutorError, evaluator::CombinedExpressionEvaluator, optimizer::combine_with_and,
6    schema::CombinedSchema, timeout::TimeoutContext,
7};
8
9mod bloom_filter;
10mod expression_mapper;
11mod hash_anti_join;
12pub(crate) mod hash_join;
13mod hash_join_iterator;
14mod hash_semi_join;
15mod join_analyzer;
16mod nested_loop;
17pub mod reorder;
18pub mod search;
19
20// Re-export Bloom filter for use in hash join implementations
21pub(crate) use bloom_filter::BloomFilter;
22
23#[cfg(test)]
24mod tests;
25
// NOTE: the join reorder analyzer (`JoinOrderAnalyzer`) is re-exported further down for public tests
// Import hash join functions for internal use (private `use`, not a re-export)
28use hash_anti_join::{hash_anti_join, hash_anti_join_with_filter};
29use hash_join::{
30    hash_join_inner, hash_join_inner_arithmetic, hash_join_inner_multi, hash_join_left_outer,
31    hash_join_left_outer_multi, hash_join_left_outer_with_filter,
32};
33// Re-export hash join iterator for public use
34pub use hash_join_iterator::HashJoinIterator;
35use hash_semi_join::{hash_semi_join, hash_semi_join_with_filter};
36// Re-export nested loop join variants for internal use
37use nested_loop::{
38    nested_loop_anti_join, nested_loop_cross_join, nested_loop_full_outer_join,
39    nested_loop_inner_join, nested_loop_left_outer_join, nested_loop_right_outer_join,
40    nested_loop_semi_join,
41};
42pub use reorder::JoinOrderAnalyzer;
43// Re-export join order search for public tests
44pub use search::JoinOrderSearch;
45
46/// Iterator over `FromData` rows without forcing full materialization
47///
48/// This enum wraps either a Vec iterator or a lazy `FromIterator`, allowing
49/// uniform iteration over rows regardless of how they were stored.
50///
51/// # Issue #4060
52///
53/// This type enables deferred row materialization for LIMIT queries:
54/// - `SELECT * FROM t LIMIT 10` only clones 10 rows, not all of `t`
55/// - Memory usage is O(LIMIT) instead of O(table_size)
56#[allow(dead_code)]
57pub(crate) enum FromDataIterator {
58    /// Iterator over a materialized Vec<Row>
59    Vec(std::vec::IntoIter<vibesql_storage::Row>),
60    /// Lazy iterator from FromIterator (table scan)
61    Lazy(FromIterator),
62}
63
64impl Iterator for FromDataIterator {
65    type Item = vibesql_storage::Row;
66
67    #[inline]
68    fn next(&mut self) -> Option<Self::Item> {
69        match self {
70            Self::Vec(iter) => iter.next(),
71            Self::Lazy(iter) => iter.next(),
72        }
73    }
74
75    #[inline]
76    fn size_hint(&self) -> (usize, Option<usize>) {
77        match self {
78            Self::Vec(iter) => iter.size_hint(),
79            Self::Lazy(iter) => iter.size_hint(),
80        }
81    }
82}
83
84/// Data source for FROM clause results
85///
86/// This enum allows FROM results to be either materialized (Vec<Row>) or lazy (iterator).
87/// Materialized results are used for JOINs, CTEs, and operations that need multiple passes.
88/// Lazy results are used for simple table scans to enable streaming execution.
89pub(crate) enum FromData {
90    /// Materialized rows (for JOINs, CTEs, operations needing multiple passes)
91    Materialized(Vec<vibesql_storage::Row>),
92
93    /// Shared rows (for zero-copy CTE references without filtering)
94    ///
95    /// This variant enables O(1) cloning when CTEs are referenced multiple times
96    /// without any filtering. Only materializes to Vec when mutation is needed.
97    SharedRows(Arc<Vec<vibesql_storage::Row>>),
98
99    /// Lazy iterator (for streaming table scans)
100    Iterator(FromIterator),
101}
102
103impl FromData {
104    /// Get rows, materializing if needed
105    ///
106    /// For SharedRows, this will clone only if the Arc is shared.
107    /// If the Arc has a single reference, the Vec is moved out efficiently.
108    pub fn into_rows(self) -> Vec<vibesql_storage::Row> {
109        match self {
110            Self::Materialized(rows) => rows,
111            Self::SharedRows(arc) => Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()),
112            Self::Iterator(iter) => iter.collect_vec(),
113        }
114    }
115
116    /// Returns an iterator over rows without forcing full materialization
117    ///
118    /// This is more efficient than `into_rows()` when you don't need all rows,
119    /// particularly for LIMIT queries where only a subset will be consumed.
120    ///
121    /// For Materialized and SharedRows variants, this returns an iterator over
122    /// the owned/cloned Vec. For Iterator variant, it returns the lazy iterator
123    /// directly without collecting.
124    ///
125    /// # Performance
126    ///
127    /// - `into_rows()`: O(n) allocation + cloning for all rows
128    /// - `into_iter()`: O(k) where k is the number of rows actually consumed
129    ///
130    /// Use `into_iter()` when:
131    /// - Processing with LIMIT (only need first N rows)
132    /// - Filtering results (may discard many rows)
133    /// - Streaming output without full materialization
134    #[allow(dead_code)]
135    pub fn into_iter(self) -> FromDataIterator {
136        match self {
137            Self::Materialized(rows) => FromDataIterator::Vec(rows.into_iter()),
138            Self::SharedRows(arc) => {
139                // Try to unwrap the Arc; if shared, clone the Vec
140                let rows = Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone());
141                FromDataIterator::Vec(rows.into_iter())
142            }
143            Self::Iterator(iter) => FromDataIterator::Lazy(iter),
144        }
145    }
146
147    /// Get a reference to materialized rows, or materialize if iterator
148    ///
149    /// For SharedRows, returns a reference to the shared data without cloning.
150    pub fn as_rows(&mut self) -> &Vec<vibesql_storage::Row> {
151        // If we have an iterator, materialize it
152        if let Self::Iterator(iter) = self {
153            #[cfg(feature = "profile-q6")]
154            let materialize_start = std::time::Instant::now();
155
156            let rows = std::mem::replace(iter, FromIterator::from_vec(vec![])).collect_vec();
157            *self = Self::Materialized(rows);
158
159            #[cfg(feature = "profile-q6")]
160            {
161                let materialize_time = materialize_start.elapsed();
162                if let Self::Materialized(rows) = self {
163                    eprintln!(
164                        "[Q6 PROFILE] Row materialization (collect_vec): {:?} ({} rows, {:?}/row)",
165                        materialize_time,
166                        rows.len(),
167                        materialize_time / rows.len() as u32
168                    );
169                }
170            }
171        }
172
173        // Now we're guaranteed to have materialized or shared rows
174        match self {
175            Self::Materialized(ref rows) => rows,
176            Self::SharedRows(ref arc) => arc.as_ref(),
177            Self::Iterator(_) => unreachable!(),
178        }
179    }
180
181    /// Get a slice reference to the underlying rows without triggering materialization
182    ///
183    /// This is a zero-cost operation that directly accesses the underlying Vec<Row>
184    /// without calling collect_vec(). This avoids the 137ms row materialization
185    /// bottleneck in Q6 by skipping iteration entirely.
186    ///
187    /// Critical for columnar execution performance (#2521).
188    pub fn as_slice(&self) -> &[vibesql_storage::Row] {
189        match self {
190            Self::Materialized(rows) => rows.as_slice(),
191            Self::SharedRows(arc) => arc.as_slice(),
192            Self::Iterator(iter) => iter.as_slice(),
193        }
194    }
195}
196
197/// Result of executing a FROM clause
198///
199/// Contains the combined schema and data (either materialized or lazy).
200pub(crate) struct FromResult {
201    pub(crate) schema: CombinedSchema,
202    pub(crate) data: FromData,
203    /// If present, indicates that results are already sorted by the specified columns
204    /// in the given order (ASC/DESC). This allows skipping ORDER BY sorting.
205    pub(crate) sorted_by: Option<Vec<(String, vibesql_ast::OrderDirection)>>,
206    /// If true, indicates that WHERE clause filtering has already been fully applied
207    /// during the scan (e.g., by index scan with predicate pushdown). This allows
208    /// skipping redundant WHERE clause evaluation in the executor.
209    pub(crate) where_filtered: bool,
210}
211
212impl FromResult {
213    /// Create a FromResult from materialized rows
214    pub(super) fn from_rows(schema: CombinedSchema, rows: Vec<vibesql_storage::Row>) -> Self {
215        Self { schema, data: FromData::Materialized(rows), sorted_by: None, where_filtered: false }
216    }
217
218    /// Create a FromResult from shared rows (zero-copy for CTEs)
219    ///
220    /// This variant is used when CTE rows can be shared without cloning,
221    /// enabling O(1) memory usage for CTE references without filtering.
222    pub(super) fn from_shared_rows(
223        schema: CombinedSchema,
224        rows: Arc<Vec<vibesql_storage::Row>>,
225    ) -> Self {
226        Self { schema, data: FromData::SharedRows(rows), sorted_by: None, where_filtered: false }
227    }
228
229    /// Create a FromResult from materialized rows with sorting metadata
230    pub(super) fn from_rows_sorted(
231        schema: CombinedSchema,
232        rows: Vec<vibesql_storage::Row>,
233        sorted_by: Vec<(String, vibesql_ast::OrderDirection)>,
234    ) -> Self {
235        Self {
236            schema,
237            data: FromData::Materialized(rows),
238            sorted_by: Some(sorted_by),
239            where_filtered: false,
240        }
241    }
242
243    /// Create a FromResult from materialized rows with WHERE filtering already applied
244    pub(super) fn from_rows_where_filtered(
245        schema: CombinedSchema,
246        rows: Vec<vibesql_storage::Row>,
247        sorted_by: Option<Vec<(String, vibesql_ast::OrderDirection)>>,
248    ) -> Self {
249        Self { schema, data: FromData::Materialized(rows), sorted_by, where_filtered: true }
250    }
251
252    /// Create a FromResult from an iterator
253    pub(super) fn from_iterator(schema: CombinedSchema, iterator: FromIterator) -> Self {
254        Self { schema, data: FromData::Iterator(iterator), sorted_by: None, where_filtered: false }
255    }
256
257    /// Get the rows, materializing if needed
258    pub(super) fn into_rows(self) -> Vec<vibesql_storage::Row> {
259        self.data.into_rows()
260    }
261
262    /// Returns an iterator over rows without forcing full materialization
263    ///
264    /// This delegates to `FromData::into_iter()` and is more efficient than
265    /// `into_rows()` when only a subset of rows will be consumed.
266    ///
267    /// # Example Use Cases
268    ///
269    /// - LIMIT queries: `result.into_iter().take(10).collect()`
270    /// - Filtered iteration: `result.into_iter().filter(|r| ...).collect()`
271    /// - Early termination: Stop iterating when a condition is met
272    ///
273    /// # Issue #4060
274    #[allow(dead_code)]
275    pub(super) fn into_iter(self) -> FromDataIterator {
276        self.data.into_iter()
277    }
278
279    /// Take up to N rows without full materialization
280    ///
281    /// This is a convenience method equivalent to `self.into_iter().take(n).collect()`.
282    /// It's optimized for LIMIT queries where only a small subset of rows is needed.
283    ///
284    /// # Performance
285    ///
286    /// For a table with 10,000 rows and `take(10)`:
287    /// - `into_rows()` + truncate: clones all 10,000 rows, then discards 9,990
288    /// - `take(10)`: clones only 10 rows
289    ///
290    /// # Issue #4060
291    #[allow(dead_code)]
292    pub(super) fn take(self, n: usize) -> Vec<vibesql_storage::Row> {
293        self.into_iter().take(n).collect()
294    }
295
296    /// Get a mutable reference to the rows, materializing if needed
297    ///
298    /// For SharedRows, this triggers copy-on-write: the shared data is cloned
299    /// into owned Materialized data to allow mutation.
300    #[allow(dead_code)]
301    pub(super) fn rows_mut(&mut self) -> &mut Vec<vibesql_storage::Row> {
302        // Convert iterator or shared to materialized
303        match &mut self.data {
304            FromData::Iterator(iter) => {
305                let rows = std::mem::replace(iter, FromIterator::from_vec(vec![])).collect_vec();
306                self.data = FromData::Materialized(rows);
307            }
308            FromData::SharedRows(arc) => {
309                // Copy-on-write: clone the shared data to allow mutation
310                let rows = arc.as_ref().clone();
311                self.data = FromData::Materialized(rows);
312            }
313            FromData::Materialized(_) => {}
314        }
315
316        // Now we're guaranteed to have materialized rows
317        match &mut self.data {
318            FromData::Materialized(rows) => rows,
319            FromData::SharedRows(_) | FromData::Iterator(_) => unreachable!(),
320        }
321    }
322
323    /// Get a reference to rows, materializing if needed
324    pub(super) fn rows(&mut self) -> &Vec<vibesql_storage::Row> {
325        self.data.as_rows()
326    }
327
328    /// Get a slice reference to rows without triggering materialization
329    ///
330    /// This is a zero-cost operation that accesses the underlying data directly
331    /// without calling collect_vec(). This is critical for performance as it
332    /// avoids the row materialization bottleneck (up to 57% of query time).
333    ///
334    /// Unlike `rows()` which may trigger iterator collection, this method
335    /// provides direct access to the underlying Vec<Row> or iterator buffer.
336    pub(super) fn as_slice(&self) -> &[vibesql_storage::Row] {
337        self.data.as_slice()
338    }
339}
340
341/// Helper function to combine two rows without unnecessary cloning
342/// Only creates a single combined row, avoiding intermediate clones
343///
344/// Note: This preserves existing row_ids from both rows when present.
345/// For optimal ROWID tracking in JOINs, use combine_rows_with_schema.
346#[inline]
347fn combine_rows(
348    left_row: &vibesql_storage::Row,
349    right_row: &vibesql_storage::Row,
350) -> vibesql_storage::Row {
351    let mut combined_values = Vec::with_capacity(left_row.values.len() + right_row.values.len());
352    combined_values.extend_from_slice(&left_row.values);
353    combined_values.extend_from_slice(&right_row.values);
354
355    // Merge existing row_ids from both rows (issue #4370)
356    // This handles nested joins where intermediate results already have row_ids
357    let mut combined_row_ids = std::collections::HashMap::new();
358    let mut has_row_ids = false;
359
360    if let Some(ref row_ids) = left_row.row_ids {
361        combined_row_ids.extend(row_ids.iter().map(|(k, v)| (k.clone(), *v)));
362        has_row_ids = true;
363    }
364    if let Some(ref row_ids) = right_row.row_ids {
365        combined_row_ids.extend(row_ids.iter().map(|(k, v)| (k.clone(), *v)));
366        has_row_ids = true;
367    }
368
369    if has_row_ids {
370        vibesql_storage::Row::with_row_ids(combined_values, combined_row_ids)
371    } else {
372        vibesql_storage::Row::new(combined_values)
373    }
374}
375
376/// Apply a post-join filter expression to join result rows
377///
378/// This is used to filter rows produced by hash join with additional conditions
379/// from the WHERE clause that weren't used in the hash join itself.
380///
381/// Issue #3562: Added cte_results parameter so IN subqueries in filter expressions
382/// can resolve CTE references.
383fn apply_post_join_filter(
384    result: FromResult,
385    filter_expr: &vibesql_ast::Expression,
386    database: &vibesql_storage::Database,
387    cte_results: &HashMap<String, CteResult>,
388) -> Result<FromResult, ExecutorError> {
389    // Extract schema before moving result
390    let schema = result.schema.clone();
391    // Issue #3562: Use evaluator with CTE context if CTEs exist
392    let evaluator = if cte_results.is_empty() {
393        CombinedExpressionEvaluator::with_database(&schema, database)
394    } else {
395        CombinedExpressionEvaluator::with_database_and_cte(&schema, database, cte_results)
396    };
397
398    // Filter rows based on the expression
399    let mut filtered_rows = Vec::new();
400    for row in result.into_rows() {
401        match evaluator.eval(filter_expr, &row)? {
402            vibesql_types::SqlValue::Boolean(true) => filtered_rows.push(row),
403            vibesql_types::SqlValue::Boolean(false) => {} // Skip this row
404            vibesql_types::SqlValue::Null => {}           // Skip NULL results
405            // SQLLogicTest compatibility: treat integers as truthy/falsy
406            vibesql_types::SqlValue::Integer(0) => {} // Skip 0
407            vibesql_types::SqlValue::Integer(_) => filtered_rows.push(row),
408            vibesql_types::SqlValue::Smallint(0) => {} // Skip 0
409            vibesql_types::SqlValue::Smallint(_) => filtered_rows.push(row),
410            vibesql_types::SqlValue::Bigint(0) => {} // Skip 0
411            vibesql_types::SqlValue::Bigint(_) => filtered_rows.push(row),
412            vibesql_types::SqlValue::Float(0.0) => {} // Skip 0.0
413            vibesql_types::SqlValue::Float(_) => filtered_rows.push(row),
414            vibesql_types::SqlValue::Real(0.0) => {} // Skip 0.0
415            vibesql_types::SqlValue::Real(_) => filtered_rows.push(row),
416            vibesql_types::SqlValue::Double(0.0) => {} // Skip 0.0
417            vibesql_types::SqlValue::Double(_) => filtered_rows.push(row),
418            other => {
419                return Err(ExecutorError::InvalidWhereClause(format!(
420                    "Filter expression must evaluate to boolean, got: {:?}",
421                    other
422                )))
423            }
424        }
425    }
426
427    Ok(FromResult::from_rows(schema, filtered_rows))
428}
429
430/// Perform join between two FROM results, optimizing with hash join when possible
431///
432/// This function now supports predicate pushdown from WHERE clauses. Additional equijoin
433/// predicates from WHERE can be passed to optimize hash join selection and execution.
434///
435/// Note: This function combines rows from left and right according to the join type
436/// and join condition. For queries with many tables and large intermediate results,
437/// consider applying WHERE filters earlier to reduce memory usage.
438///
439/// Issue #3562: Added cte_results parameter so post-join filters with IN subqueries
440/// can resolve CTE references.
441#[allow(clippy::too_many_arguments)]
442pub(super) fn nested_loop_join(
443    left: FromResult,
444    right: FromResult,
445    join_type: &vibesql_ast::JoinType,
446    condition: &Option<vibesql_ast::Expression>,
447    natural: bool,
448    using_columns: Option<&[String]>,
449    database: &vibesql_storage::Database,
450    additional_equijoins: &[vibesql_ast::Expression],
451    timeout_ctx: &TimeoutContext,
452    cte_results: &HashMap<String, CteResult>,
453) -> Result<FromResult, ExecutorError> {
454    // Try to use hash join for INNER JOINs with simple equi-join conditions
455    if let vibesql_ast::JoinType::Inner = join_type {
456        // Get column count and right table info once for analysis
457        // IMPORTANT: Sum up columns from ALL tables in the left schema,
458        // not just the first table, to handle accumulated multi-table joins
459        let left_col_count = left.schema.total_columns;
460
461        // Use merge to preserve all tables from nested joins for column resolution
462        let temp_schema = CombinedSchema::merge(left.schema.clone(), right.schema.clone());
463
464        // Phase 3.1: Try ON condition first (preferred for hash join)
465        // Now supports multi-column composite keys for better performance
466        if let Some(cond) = condition {
467            // First try multi-column hash join for composite keys (2+ equi-join conditions)
468            if let Some(multi_result) =
469                join_analyzer::analyze_multi_column_equi_join(cond, &temp_schema, left_col_count)
470            {
471                // Use multi-column hash join if there are 2+ join columns
472                if multi_result.equi_joins.left_col_indices.len() >= 2 {
473                    // Save schemas for NATURAL JOIN processing before moving left/right
474                    let (left_schema_for_dedup, right_schema_for_dedup) =
475                        if natural || using_columns.is_some() {
476                            (Some(left.schema.clone()), Some(right.schema.clone()))
477                        } else {
478                            (None, None)
479                        };
480
481                    let mut result = hash_join_inner_multi(
482                        left,
483                        right,
484                        &multi_result.equi_joins.left_col_indices,
485                        &multi_result.equi_joins.right_col_indices,
486                    )?;
487
488                    // Apply remaining conditions as post-join filter
489                    if !multi_result.remaining_conditions.is_empty() {
490                        if let Some(filter_expr) =
491                            combine_with_and(multi_result.remaining_conditions)
492                        {
493                            result = apply_post_join_filter(
494                                result,
495                                &filter_expr,
496                                database,
497                                cte_results,
498                            )?;
499                        }
500                    }
501
502                    // For NATURAL JOIN or USING clause, remove duplicate columns from the result
503                    if natural {
504                        if let (Some(ref left_schema), Some(ref right_schema_orig)) =
505                            (left_schema_for_dedup, right_schema_for_dedup)
506                        {
507                            // Use the original right schema directly to handle nested joins
508                            // with multiple tables (e.g., t1 NATURAL JOIN (t2 JOIN t3))
509                            result = remove_duplicate_columns_for_natural_join(
510                                result,
511                                left_schema,
512                                right_schema_orig,
513                                join_type,
514                            )?;
515                        }
516                    } else if let Some(using_cols) = using_columns {
517                        if let (Some(ref left_schema), Some(ref right_schema_orig)) =
518                            (left_schema_for_dedup, right_schema_for_dedup)
519                        {
520                            result = remove_duplicate_columns_for_using_join(
521                                result,
522                                left_schema,
523                                right_schema_orig,
524                                using_cols,
525                                join_type,
526                            )?;
527                        }
528                    }
529
530                    return Ok(result);
531                }
532
533                // Single-column equi-join: use standard hash join (more efficient for single key)
534                // Save schemas for NATURAL JOIN processing before moving left/right
535                let (left_schema_for_dedup, right_schema_for_dedup) =
536                    if natural || using_columns.is_some() {
537                        (Some(left.schema.clone()), Some(right.schema.clone()))
538                    } else {
539                        (None, None)
540                    };
541
542                let mut result = hash_join_inner(
543                    left,
544                    right,
545                    multi_result.equi_joins.left_col_indices[0],
546                    multi_result.equi_joins.right_col_indices[0],
547                )?;
548
549                // Apply remaining conditions as post-join filter
550                if !multi_result.remaining_conditions.is_empty() {
551                    if let Some(filter_expr) = combine_with_and(multi_result.remaining_conditions) {
552                        result =
553                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
554                    }
555                }
556
557                // For NATURAL JOIN or USING clause, remove duplicate columns from the result
558                if natural {
559                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
560                        (left_schema_for_dedup, right_schema_for_dedup)
561                    {
562                        // Use the original right schema directly to handle nested joins
563                        // with multiple tables (e.g., t1 NATURAL JOIN (t2 JOIN t3))
564                        result = remove_duplicate_columns_for_natural_join(
565                            result,
566                            left_schema,
567                            right_schema_orig,
568                            join_type,
569                        )?;
570                    }
571                } else if let Some(using_cols) = using_columns {
572                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
573                        (left_schema_for_dedup, right_schema_for_dedup)
574                    {
575                        result = remove_duplicate_columns_for_using_join(
576                            result,
577                            left_schema,
578                            right_schema_orig,
579                            using_cols,
580                            join_type,
581                        )?;
582                    }
583                }
584
585                return Ok(result);
586            }
587        }
588
589        // Phase 3.2: Try OR conditions with common equi-join (TPC-H Q19 optimization)
590        // For expressions like `(a.x = b.x AND ...) OR (a.x = b.x AND ...) OR (a.x = b.x AND ...)`,
591        // extract the common equi-join `a.x = b.x` for hash join
592        if let Some(cond) = condition {
593            if let Some(or_result) =
594                join_analyzer::analyze_or_equi_join(cond, &temp_schema, left_col_count)
595            {
596                // Save schemas for NATURAL JOIN processing before moving left/right
597                let (left_schema_for_dedup, right_schema_for_dedup) =
598                    if natural || using_columns.is_some() {
599                        (Some(left.schema.clone()), Some(right.schema.clone()))
600                    } else {
601                        (None, None)
602                    };
603
604                let mut result = hash_join_inner(
605                    left,
606                    right,
607                    or_result.equi_join.left_col_idx,
608                    or_result.equi_join.right_col_idx,
609                )?;
610
611                // Apply remaining OR conditions as post-join filter
612                if !or_result.remaining_conditions.is_empty() {
613                    if let Some(filter_expr) = combine_with_and(or_result.remaining_conditions) {
614                        result =
615                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
616                    }
617                }
618
619                // For NATURAL JOIN or USING clause, remove duplicate columns from the result
620                if natural {
621                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
622                        (left_schema_for_dedup, right_schema_for_dedup)
623                    {
624                        // Use the original right schema directly to handle nested joins
625                        // with multiple tables (e.g., t1 NATURAL JOIN (t2 JOIN t3))
626                        result = remove_duplicate_columns_for_natural_join(
627                            result,
628                            left_schema,
629                            right_schema_orig,
630                            join_type,
631                        )?;
632                    }
633                } else if let Some(using_cols) = using_columns {
634                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
635                        (left_schema_for_dedup, right_schema_for_dedup)
636                    {
637                        result = remove_duplicate_columns_for_using_join(
638                            result,
639                            left_schema,
640                            right_schema_orig,
641                            using_cols,
642                            join_type,
643                        )?;
644                    }
645                }
646
647                return Ok(result);
648            }
649        }
650
651        // Phase 3.3: Try arithmetic equi-join (TPC-DS Q2 optimization)
652        // For expressions like `col1 = col2 - 53`, extract the arithmetic offset for hash join
653        if let Some(cond) = condition {
654            if let Some(arith_info) =
655                join_analyzer::analyze_arithmetic_equi_join(cond, &temp_schema, left_col_count)
656            {
657                // Save schemas for NATURAL JOIN processing before moving left/right
658                let (left_schema_for_dedup, right_schema_for_dedup) =
659                    if natural || using_columns.is_some() {
660                        (Some(left.schema.clone()), Some(right.schema.clone()))
661                    } else {
662                        (None, None)
663                    };
664
665                let mut result = hash_join_inner_arithmetic(
666                    left,
667                    right,
668                    arith_info.left_col_idx,
669                    arith_info.right_col_idx,
670                    arith_info.offset,
671                )?;
672
673                // For NATURAL JOIN or USING clause, remove duplicate columns from the result
674                if natural {
675                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
676                        (left_schema_for_dedup, right_schema_for_dedup)
677                    {
678                        // Use the original right schema directly to handle nested joins
679                        // with multiple tables (e.g., t1 NATURAL JOIN (t2 JOIN t3))
680                        result = remove_duplicate_columns_for_natural_join(
681                            result,
682                            left_schema,
683                            right_schema_orig,
684                            join_type,
685                        )?;
686                    }
687                } else if let Some(using_cols) = using_columns {
688                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
689                        (left_schema_for_dedup, right_schema_for_dedup)
690                    {
691                        result = remove_duplicate_columns_for_using_join(
692                            result,
693                            left_schema,
694                            right_schema_orig,
695                            using_cols,
696                            join_type,
697                        )?;
698                    }
699                }
700
701                return Ok(result);
702            }
703        }
704
705        // Phase 3.4: Try multi-column hash join from WHERE clause conditions
706        // When there are multiple equijoin conditions (e.g., ps_suppkey = l_suppkey AND ps_partkey
707        // = l_partkey), using composite key hash join is critical for performance.
708        // Single-key hash join with post-filter can cause catastrophic performance issues
709        // (48B cartesian products in Q9 at SF=0.1).
710        if additional_equijoins.len() >= 2 {
711            // Collect all valid equi-join conditions
712            let mut left_col_indices = Vec::new();
713            let mut right_col_indices = Vec::new();
714            let mut used_indices = Vec::new();
715
716            for (idx, equijoin) in additional_equijoins.iter().enumerate() {
717                if let Some(equi_join_info) =
718                    join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
719                {
720                    left_col_indices.push(equi_join_info.left_col_idx);
721                    right_col_indices.push(equi_join_info.right_col_idx);
722                    used_indices.push(idx);
723                }
724            }
725
726            // If we found 2+ equi-join conditions, use multi-column hash join
727            if left_col_indices.len() >= 2 {
728                // Save schemas for NATURAL JOIN processing before moving left/right
729                let (left_schema_for_dedup, right_schema_for_dedup) =
730                    if natural || using_columns.is_some() {
731                        (Some(left.schema.clone()), Some(right.schema.clone()))
732                    } else {
733                        (None, None)
734                    };
735
736                let mut result =
737                    hash_join_inner_multi(left, right, &left_col_indices, &right_col_indices)?;
738
739                // Apply remaining conditions (non-equijoins) as post-join filters
740                let remaining_conditions: Vec<_> = additional_equijoins
741                    .iter()
742                    .enumerate()
743                    .filter(|(i, _)| !used_indices.contains(i))
744                    .map(|(_, e)| e.clone())
745                    .collect();
746
747                if !remaining_conditions.is_empty() {
748                    if let Some(filter_expr) = combine_with_and(remaining_conditions) {
749                        result =
750                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
751                    }
752                }
753
754                // For NATURAL JOIN or USING clause, remove duplicate columns from the result
755                if natural {
756                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
757                        (left_schema_for_dedup, right_schema_for_dedup)
758                    {
759                        // Use the original right schema directly to handle nested joins
760                        // with multiple tables (e.g., t1 NATURAL JOIN (t2 JOIN t3))
761                        result = remove_duplicate_columns_for_natural_join(
762                            result,
763                            left_schema,
764                            right_schema_orig,
765                            join_type,
766                        )?;
767                    }
768                } else if let Some(using_cols) = using_columns {
769                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
770                        (left_schema_for_dedup, right_schema_for_dedup)
771                    {
772                        result = remove_duplicate_columns_for_using_join(
773                            result,
774                            left_schema,
775                            right_schema_orig,
776                            using_cols,
777                            join_type,
778                        )?;
779                    }
780                }
781
782                return Ok(result);
783            }
784        }
785
786        // Phase 3.5: If no multi-column hash join, try single-column WHERE clause equijoins
787        // Iterate through all additional equijoins to find one suitable for hash join
788        for (idx, equijoin) in additional_equijoins.iter().enumerate() {
789            if let Some(equi_join_info) =
790                join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
791            {
792                // Save schemas for NATURAL JOIN processing before moving left/right
793                let (left_schema_for_dedup, right_schema_for_dedup) =
794                    if natural || using_columns.is_some() {
795                        (Some(left.schema.clone()), Some(right.schema.clone()))
796                    } else {
797                        (None, None)
798                    };
799
800                // Found a WHERE clause equijoin suitable for hash join!
801                let mut result = hash_join_inner(
802                    left,
803                    right,
804                    equi_join_info.left_col_idx,
805                    equi_join_info.right_col_idx,
806                )?;
807
808                // Apply remaining equijoins and conditions as post-join filters
809                let remaining_conditions: Vec<_> = additional_equijoins
810                    .iter()
811                    .enumerate()
812                    .filter(|(i, _)| *i != idx)
813                    .map(|(_, e)| e.clone())
814                    .collect();
815
816                if !remaining_conditions.is_empty() {
817                    if let Some(filter_expr) = combine_with_and(remaining_conditions) {
818                        result =
819                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
820                    }
821                }
822
823                // For NATURAL JOIN or USING clause, remove duplicate columns from the result
824                if natural {
825                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
826                        (left_schema_for_dedup, right_schema_for_dedup)
827                    {
828                        // Use the original right schema directly to handle nested joins
829                        // with multiple tables (e.g., t1 NATURAL JOIN (t2 JOIN t3))
830                        result = remove_duplicate_columns_for_natural_join(
831                            result,
832                            left_schema,
833                            right_schema_orig,
834                            join_type,
835                        )?;
836                    }
837                } else if let Some(using_cols) = using_columns {
838                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
839                        (left_schema_for_dedup, right_schema_for_dedup)
840                    {
841                        result = remove_duplicate_columns_for_using_join(
842                            result,
843                            left_schema,
844                            right_schema_orig,
845                            using_cols,
846                            join_type,
847                        )?;
848                    }
849                }
850
851                return Ok(result);
852            }
853        }
854
855        // Phase 3.6: Try arithmetic equijoins from WHERE clause for hash join
856        // For expressions like `col1 = col2 - 53` in WHERE clause with Inner joins
857        // This enables hash join for derived table joins with arithmetic conditions (TPC-DS Q2)
858        for (idx, equijoin) in additional_equijoins.iter().enumerate() {
859            if let Some(arith_info) =
860                join_analyzer::analyze_arithmetic_equi_join(equijoin, &temp_schema, left_col_count)
861            {
862                // Save schemas for NATURAL JOIN processing before moving left/right
863                let (left_schema_for_dedup, right_schema_for_dedup) =
864                    if natural || using_columns.is_some() {
865                        (Some(left.schema.clone()), Some(right.schema.clone()))
866                    } else {
867                        (None, None)
868                    };
869
870                // Found an arithmetic equijoin suitable for hash join!
871                let mut result = hash_join_inner_arithmetic(
872                    left,
873                    right,
874                    arith_info.left_col_idx,
875                    arith_info.right_col_idx,
876                    arith_info.offset,
877                )?;
878
879                // Apply remaining equijoins as post-join filters
880                let remaining_conditions: Vec<_> = additional_equijoins
881                    .iter()
882                    .enumerate()
883                    .filter(|(i, _)| *i != idx)
884                    .map(|(_, e)| e.clone())
885                    .collect();
886
887                if !remaining_conditions.is_empty() {
888                    if let Some(filter_expr) = combine_with_and(remaining_conditions) {
889                        result =
890                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
891                    }
892                }
893
894                // For NATURAL JOIN or USING clause, remove duplicate columns from the result
895                if natural {
896                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
897                        (left_schema_for_dedup, right_schema_for_dedup)
898                    {
899                        // Use the original right schema directly to handle nested joins
900                        // with multiple tables (e.g., t1 NATURAL JOIN (t2 JOIN t3))
901                        result = remove_duplicate_columns_for_natural_join(
902                            result,
903                            left_schema,
904                            right_schema_orig,
905                            join_type,
906                        )?;
907                    }
908                } else if let Some(using_cols) = using_columns {
909                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
910                        (left_schema_for_dedup, right_schema_for_dedup)
911                    {
912                        result = remove_duplicate_columns_for_using_join(
913                            result,
914                            left_schema,
915                            right_schema_orig,
916                            using_cols,
917                            join_type,
918                        )?;
919                    }
920                }
921
922                return Ok(result);
923            }
924        }
925    }
926
927    // Try to use hash join for LEFT OUTER JOINs with equi-join conditions
928    // This optimization is critical for Q13 (customer LEFT JOIN orders)
929    if let vibesql_ast::JoinType::LeftOuter = join_type {
930        // Get column count and right table info for analysis
931        let left_col_count = left.schema.total_columns;
932
933        // Use merge to preserve all tables from nested joins for column resolution
934        let temp_schema = CombinedSchema::merge(left.schema.clone(), right.schema.clone());
935
936        // Try ON condition for hash join with multi-column support
937        // Use multi-column analysis to include ALL equi-join conditions in the hash join
938        // This is critical for LEFT JOIN correctness: post-filter conditions on NULL columns
939        // (from unmatched rows) incorrectly filter out valid LEFT JOIN results
940        if let Some(cond) = condition {
941            if let Some(multi_result) =
942                join_analyzer::analyze_multi_column_equi_join(cond, &temp_schema, left_col_count)
943            {
944                // Save schemas for NATURAL JOIN processing before moving left/right
945                let (left_schema_for_dedup, right_schema_for_dedup) =
946                    if natural || using_columns.is_some() {
947                        (Some(left.schema.clone()), Some(right.schema.clone()))
948                    } else {
949                        (None, None)
950                    };
951
952                // Build the remaining filter expression (if any)
953                let remaining_filter = combine_with_and(multi_result.remaining_conditions);
954
955                // Use multi-column hash join when there are multiple equi-join columns
956                // For remaining conditions, use hash_join_left_outer_with_filter which
957                // applies the filter DURING the join (not as post-filter) to preserve
958                // LEFT JOIN semantics: rows that match equi-join but fail filter get NULLs
959                let mut result = if multi_result.equi_joins.left_col_indices.len() > 1 {
960                    // Multi-column equi-join (remaining conditions not yet supported)
961                    // TODO: Add hash_join_left_outer_multi_with_filter
962                    let mut r = hash_join_left_outer_multi(
963                        left,
964                        right,
965                        &multi_result.equi_joins.left_col_indices,
966                        &multi_result.equi_joins.right_col_indices,
967                    )?;
968                    // For now, apply as post-filter (known issue for multi-column)
969                    if let Some(ref filter_expr) = remaining_filter {
970                        r = apply_post_join_filter(r, filter_expr, database, cte_results)?;
971                    }
972                    r
973                } else if let Some(ref filter_expr) = remaining_filter {
974                    // Single column with additional filter - use LEFT JOIN-aware filter
975                    // This applies the filter DURING the join to preserve unmatched rows
976                    hash_join_left_outer_with_filter(
977                        left,
978                        right,
979                        multi_result.equi_joins.left_col_indices[0],
980                        multi_result.equi_joins.right_col_indices[0],
981                        filter_expr,
982                        &temp_schema,
983                        database,
984                    )?
985                } else {
986                    // Single column without additional filter
987                    hash_join_left_outer(
988                        left,
989                        right,
990                        multi_result.equi_joins.left_col_indices[0],
991                        multi_result.equi_joins.right_col_indices[0],
992                    )?
993                };
994
995                // For NATURAL JOIN or USING clause, remove duplicate columns from the result
996                if natural {
997                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
998                        (left_schema_for_dedup, right_schema_for_dedup)
999                    {
1000                        // Use the original right schema directly to handle nested joins
1001                        // with multiple tables (e.g., t1 NATURAL JOIN (t2 JOIN t3))
1002                        result = remove_duplicate_columns_for_natural_join(
1003                            result,
1004                            left_schema,
1005                            right_schema_orig,
1006                            join_type,
1007                        )?;
1008                    }
1009                } else if let Some(using_cols) = using_columns {
1010                    if let (Some(ref left_schema), Some(ref right_schema_orig)) =
1011                        (left_schema_for_dedup, right_schema_for_dedup)
1012                    {
1013                        result = remove_duplicate_columns_for_using_join(
1014                            result,
1015                            left_schema,
1016                            right_schema_orig,
1017                            using_cols,
1018                            join_type,
1019                        )?;
1020                    }
1021                }
1022
1023                return Ok(result);
1024            }
1025        }
1026    }
1027
1028    // Try to use hash join for SEMI/ANTI JOINs with equi-join conditions
1029    if matches!(join_type, vibesql_ast::JoinType::Semi | vibesql_ast::JoinType::Anti) {
1030        // Get column count for analysis
1031        let left_col_count = left.schema.total_columns;
1032
1033        // Use merge to preserve all tables from nested joins for column resolution
1034        let temp_schema = CombinedSchema::merge(left.schema.clone(), right.schema.clone());
1035
1036        // Try ON condition first - use analyze_compound_equi_join to handle complex conditions
1037        // This enables hash join optimization for EXISTS subqueries with additional predicates
1038        // Example: EXISTS (SELECT * FROM t WHERE t.x = outer.x AND t.y <> outer.y)
1039        // The equi-join (t.x = outer.x) is used for hash join, and the inequality is a post-filter
1040        if let Some(cond) = condition {
1041            if let Some(compound_result) =
1042                join_analyzer::analyze_compound_equi_join(cond, &temp_schema, left_col_count)
1043            {
1044                // Build the combined remaining condition (if any)
1045                let remaining_filter = combine_with_and(compound_result.remaining_conditions);
1046
1047                let result = if matches!(join_type, vibesql_ast::JoinType::Semi) {
1048                    hash_semi_join_with_filter(
1049                        left,
1050                        right,
1051                        compound_result.equi_join.left_col_idx,
1052                        compound_result.equi_join.right_col_idx,
1053                        remaining_filter.as_ref(),
1054                        &temp_schema,
1055                        database,
1056                    )?
1057                } else {
1058                    hash_anti_join_with_filter(
1059                        left,
1060                        right,
1061                        compound_result.equi_join.left_col_idx,
1062                        compound_result.equi_join.right_col_idx,
1063                        remaining_filter.as_ref(),
1064                        &temp_schema,
1065                        database,
1066                    )?
1067                };
1068
1069                return Ok(result);
1070            }
1071        }
1072
1073        // Try WHERE clause equijoins
1074        for equijoin in additional_equijoins.iter() {
1075            if let Some(equi_join_info) =
1076                join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
1077            {
1078                let result = if matches!(join_type, vibesql_ast::JoinType::Semi) {
1079                    hash_semi_join(
1080                        left,
1081                        right,
1082                        equi_join_info.left_col_idx,
1083                        equi_join_info.right_col_idx,
1084                    )?
1085                } else {
1086                    hash_anti_join(
1087                        left,
1088                        right,
1089                        equi_join_info.left_col_idx,
1090                        equi_join_info.right_col_idx,
1091                    )?
1092                };
1093
1094                return Ok(result);
1095            }
1096        }
1097    }
1098
1099    // Try to use hash join for CROSS JOINs when equijoin conditions exist in WHERE clause
1100    // This is critical for Q21 and other TPC-H queries with implicit (comma-separated) joins
1101    // CROSS JOIN with equijoin predicates should be executed as hash INNER JOIN
1102    if let vibesql_ast::JoinType::Cross = join_type {
1103        if !additional_equijoins.is_empty() {
1104            // Get column count and right table info for analysis
1105            let left_col_count = left.schema.total_columns;
1106
1107            // Use merge to preserve all tables from nested joins for column resolution
1108            let temp_schema = CombinedSchema::merge(left.schema.clone(), right.schema.clone());
1109
1110            // Try WHERE clause equijoins for hash join
1111            for (idx, equijoin) in additional_equijoins.iter().enumerate() {
1112                if let Some(equi_join_info) =
1113                    join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
1114                {
1115                    // Found a WHERE clause equijoin suitable for hash join!
1116                    // Execute CROSS JOIN as hash INNER JOIN with the equijoin condition
1117                    let mut result = hash_join_inner(
1118                        left,
1119                        right,
1120                        equi_join_info.left_col_idx,
1121                        equi_join_info.right_col_idx,
1122                    )?;
1123
1124                    // Apply remaining equijoins as post-join filters
1125                    let remaining_conditions: Vec<_> = additional_equijoins
1126                        .iter()
1127                        .enumerate()
1128                        .filter(|(i, _)| *i != idx)
1129                        .map(|(_, e)| e.clone())
1130                        .collect();
1131
1132                    if !remaining_conditions.is_empty() {
1133                        if let Some(filter_expr) = combine_with_and(remaining_conditions) {
1134                            result = apply_post_join_filter(
1135                                result,
1136                                &filter_expr,
1137                                database,
1138                                cte_results,
1139                            )?;
1140                        }
1141                    }
1142
1143                    return Ok(result);
1144                }
1145            }
1146
1147            // Try arithmetic equijoins for hash join (TPC-DS Q2 optimization)
1148            // For expressions like `col1 = col2 - 53` in WHERE clause
1149            for (idx, equijoin) in additional_equijoins.iter().enumerate() {
1150                if let Some(arith_info) = join_analyzer::analyze_arithmetic_equi_join(
1151                    equijoin,
1152                    &temp_schema,
1153                    left_col_count,
1154                ) {
1155                    // Found an arithmetic equijoin suitable for hash join!
1156                    let mut result = hash_join_inner_arithmetic(
1157                        left,
1158                        right,
1159                        arith_info.left_col_idx,
1160                        arith_info.right_col_idx,
1161                        arith_info.offset,
1162                    )?;
1163
1164                    // Apply remaining equijoins as post-join filters
1165                    let remaining_conditions: Vec<_> = additional_equijoins
1166                        .iter()
1167                        .enumerate()
1168                        .filter(|(i, _)| *i != idx)
1169                        .map(|(_, e)| e.clone())
1170                        .collect();
1171
1172                    if !remaining_conditions.is_empty() {
1173                        if let Some(filter_expr) = combine_with_and(remaining_conditions) {
1174                            result = apply_post_join_filter(
1175                                result,
1176                                &filter_expr,
1177                                database,
1178                                cte_results,
1179                            )?;
1180                        }
1181                    }
1182
1183                    return Ok(result);
1184                }
1185            }
1186        }
1187    }
1188
1189    // Prepare combined join condition including additional equijoins from WHERE clause
1190    let mut all_join_conditions = Vec::new();
1191    if let Some(cond) = condition {
1192        all_join_conditions.push(cond.clone());
1193    }
1194    all_join_conditions.extend_from_slice(additional_equijoins);
1195
1196    // Combine all join conditions with AND
1197    let combined_condition = combine_with_and(all_join_conditions);
1198
1199    // Fall back to nested loop join for all other cases
1200    // For NATURAL JOIN, we need to preserve the original schemas for duplicate removal
1201    let (left_schema_for_dedup, right_schema_for_dedup) = if natural || using_columns.is_some() {
1202        (Some(left.schema.clone()), Some(right.schema.clone()))
1203    } else {
1204        (None, None)
1205    };
1206
1207    let mut result = match join_type {
1208        vibesql_ast::JoinType::Inner => {
1209            nested_loop_inner_join(left, right, &combined_condition, database, timeout_ctx)
1210        }
1211        vibesql_ast::JoinType::LeftOuter => {
1212            nested_loop_left_outer_join(left, right, &combined_condition, database, timeout_ctx)
1213        }
1214        vibesql_ast::JoinType::RightOuter => {
1215            nested_loop_right_outer_join(left, right, &combined_condition, database, timeout_ctx)
1216        }
1217        vibesql_ast::JoinType::FullOuter => {
1218            nested_loop_full_outer_join(left, right, &combined_condition, database, timeout_ctx)
1219        }
1220        vibesql_ast::JoinType::Cross => {
1221            // CROSS JOIN with any condition (from USING clause, NATURAL, or explicit ON)
1222            // should be executed as INNER JOIN - the condition filters the Cartesian product.
1223            // Note: `using_columns` may be None here if USING deduplication is handled
1224            // post-process by the caller (join_scan.rs), but `combined_condition` will
1225            // contain the generated USING equality conditions.
1226            if combined_condition.is_some() {
1227                nested_loop_inner_join(left, right, &combined_condition, database, timeout_ctx)
1228            } else {
1229                nested_loop_cross_join(left, right, &combined_condition, database, timeout_ctx)
1230            }
1231        }
1232        vibesql_ast::JoinType::Semi => {
1233            nested_loop_semi_join(left, right, &combined_condition, database, timeout_ctx)
1234        }
1235        vibesql_ast::JoinType::Anti => {
1236            nested_loop_anti_join(left, right, &combined_condition, database, timeout_ctx)
1237        }
1238    }?;
1239
1240    // For NATURAL JOIN or USING clause, remove duplicate columns from the result
1241    // For RIGHT/FULL OUTER JOINs, the updated remove_duplicate_columns_for_natural_join
1242    // now handles hiding the correct side and adding coalesce pairs (issue #4781, #4802)
1243    if natural {
1244        if let (Some(ref left_schema), Some(ref right_schema)) =
1245            (&left_schema_for_dedup, &right_schema_for_dedup)
1246        {
1247            result = remove_duplicate_columns_for_natural_join(
1248                result,
1249                left_schema,
1250                right_schema,
1251                join_type,
1252            )?;
1253        }
1254    } else if let Some(using_cols) = using_columns {
1255        if let (Some(ref left_schema), Some(ref right_schema)) =
1256            (&left_schema_for_dedup, &right_schema_for_dedup)
1257        {
1258            // For RIGHT/FULL OUTER JOIN, coalesce USING columns before hiding duplicates
1259            if matches!(
1260                join_type,
1261                vibesql_ast::JoinType::RightOuter | vibesql_ast::JoinType::FullOuter
1262            ) {
1263                coalesce_using_columns_for_outer_join(
1264                    &mut result,
1265                    left_schema,
1266                    right_schema,
1267                    using_cols,
1268                );
1269            }
1270            result = remove_duplicate_columns_for_using_join(
1271                result,
1272                left_schema,
1273                right_schema,
1274                using_cols,
1275                join_type,
1276            )?;
1277        }
1278    }
1279
1280    Ok(result)
1281}
1282
1283/// Coalesce USING column values for RIGHT/FULL OUTER JOIN
1284///
1285/// For RIGHT JOIN and FULL JOIN with USING clause, when a left row is NULL (unmatched),
1286/// the USING column value should come from the right side, not the left.
1287///
1288/// SQL standard semantics: USING clause columns use COALESCE(left.col, right.col)
1289/// - INNER JOIN: values are equal, doesn't matter
1290/// - LEFT OUTER JOIN: unmatched rows have right=NULL, so left value is used (no change needed)
1291/// - RIGHT OUTER JOIN: unmatched rows have left=NULL, so right value should be used
1292/// - FULL OUTER JOIN: COALESCE(left, right) is used for both unmatched cases
1293///
1294/// This function updates left column values to COALESCE(left, right) for USING columns only.
1295fn coalesce_using_columns_for_outer_join(
1296    result: &mut FromResult,
1297    left_schema: &CombinedSchema,
1298    right_schema: &CombinedSchema,
1299    using_cols: &[String],
1300) {
1301    use std::collections::{HashMap, HashSet};
1302
1303    // Create a set of USING column names (lowercase for case-insensitive comparison)
1304    let using_cols_lower: HashSet<String> = using_cols.iter().map(|c| c.to_lowercase()).collect();
1305
1306    // Build a map: lowercase column name -> list of left column indices
1307    // Only include columns that are in the USING list
1308    let mut left_column_map: HashMap<String, Vec<usize>> = HashMap::new();
1309    for (table_start_idx, table_schema) in left_schema.table_schemas.values() {
1310        for (col_idx, col) in table_schema.columns.iter().enumerate() {
1311            let lowercase = col.name.to_lowercase();
1312            if using_cols_lower.contains(&lowercase) {
1313                left_column_map
1314                    .entry(lowercase)
1315                    .or_default()
1316                    .push(table_start_idx + col_idx);
1317            }
1318        }
1319    }
1320
1321    // Build pairs of (left_idx, right_idx) for USING columns
1322    let left_col_count = left_schema.total_columns;
1323    let mut coalesce_pairs: Vec<(usize, usize)> = Vec::new();
1324
1325    for (table_start_idx, table_schema) in right_schema.table_schemas.values() {
1326        for (col_idx, col) in table_schema.columns.iter().enumerate() {
1327            let lowercase = col.name.to_lowercase();
1328            if let Some(left_indices) = left_column_map.get(&lowercase) {
1329                let right_idx = left_col_count + table_start_idx + col_idx;
1330                // For each left column with this name, create a coalesce pair
1331                for &left_idx in left_indices {
1332                    coalesce_pairs.push((left_idx, right_idx));
1333                }
1334            }
1335        }
1336    }
1337
1338    // Apply COALESCE to each row: if left value is NULL, use right value
1339    if !coalesce_pairs.is_empty() {
1340        let rows = result.rows_mut();
1341        for row in rows {
1342            for &(left_idx, right_idx) in &coalesce_pairs {
1343                if row.values[left_idx] == vibesql_types::SqlValue::Null {
1344                    row.values[left_idx] = row.values[right_idx].clone();
1345                }
1346            }
1347        }
1348    }
1349}
1350
1351/// Mark duplicate columns as hidden for NATURAL JOIN
1352///
1353/// NATURAL JOIN should only include common columns once (from the left side) in `SELECT *`.
1354/// However, qualified wildcards like `SELECT t1.*` should still return all columns from t1.
1355///
1356/// This function marks duplicate columns from the right side as hidden in the schema,
1357/// rather than removing them. This allows:
1358/// - `SELECT *` to correctly deduplicate columns (skips hidden columns)
1359/// - `SELECT t1.*` to return all columns from t1 (includes hidden columns)
1360///
1361/// Issue #4466: table.* should return all columns in NATURAL JOIN context
1362fn remove_duplicate_columns_for_natural_join(
1363    mut result: FromResult,
1364    left_schema: &CombinedSchema,
1365    right_schema: &CombinedSchema,
1366    join_type: &vibesql_ast::JoinType,
1367) -> Result<FromResult, ExecutorError> {
1368    use std::collections::HashMap;
1369
1370    // For RIGHT/FULL OUTER JOINs, we need to hide left-side columns and add coalesce pairs
1371    // For other join types, we hide right-side columns
1372    let is_right_or_full = matches!(
1373        join_type,
1374        vibesql_ast::JoinType::RightOuter | vibesql_ast::JoinType::FullOuter
1375    );
1376
1377    // Find common column names (case-insensitive)
1378    // Use table_start_idx to compute correct absolute positions
1379    // (HashMap iteration order is non-deterministic, so we can't use a sequential counter)
1380    let mut left_column_map: HashMap<String, Vec<(String, String, usize)>> = HashMap::new(); // lowercase -> [(table, actual_name, idx)]
1381    for (table_name, (table_start_idx, table_schema)) in &left_schema.table_schemas {
1382        for (col_idx, col) in table_schema.columns.iter().enumerate() {
1383            let lowercase = col.name.to_lowercase();
1384            left_column_map.entry(lowercase).or_default().push((
1385                table_name.to_string(),
1386                col.name.clone(),
1387                table_start_idx + col_idx,
1388            ));
1389        }
1390    }
1391
1392    // Identify which columns from the right side are duplicates
1393    // Use table_start_idx from right_schema to compute correct absolute positions
1394    let left_col_count = left_schema.total_columns;
1395
1396    if is_right_or_full {
1397        // For RIGHT/FULL OUTER JOINs: hide left-side columns and add coalesce pairs
1398        // This ensures SELECT * shows the right-side value (which is non-NULL for unmatched rows)
1399        // while qualified references like t5.id still return the actual NULL value
1400
1401        // Build a set of columns that are replacement targets from the left schema
1402        // (i.e., columns that other columns point to for their replacement value)
1403        let existing_replacement_targets: std::collections::HashSet<usize> =
1404            left_schema.column_replacement_map.values().copied().collect();
1405
1406        for (table_start_idx, table_schema) in right_schema.table_schemas.values() {
1407            for (col_idx, col) in table_schema.columns.iter().enumerate() {
1408                let lowercase = col.name.to_lowercase();
1409                if let Some(left_entries) = left_column_map.get(&lowercase) {
1410                    let right_idx = left_col_count + table_start_idx + col_idx;
1411
1412                    // Handle chained NATURAL JOINs: update existing replacements that point
1413                    // to columns being hidden in this join. For example:
1414                    // t5 NATURAL RIGHT JOIN t4 NATURAL RIGHT JOIN t6
1415                    // After first join: {0 -> 2}
1416                    // After second join: 2 is being hidden, so update {0 -> 4}
1417                    for (_, _, left_idx) in left_entries {
1418                        // If this column is a replacement target from a previous join,
1419                        // we need to update all replacements that point to it
1420                        if existing_replacement_targets.contains(left_idx) {
1421                            // Find and update all replacements pointing to this column
1422                            for (source, target) in left_schema.column_replacement_map.iter() {
1423                                if *target == *left_idx {
1424                                    // Update the existing replacement to point to the new right column
1425                                    result.schema.add_column_replacement(*source, right_idx);
1426                                }
1427                            }
1428                        } else {
1429                            // Normal case: add replacement for this column
1430                            result.schema.add_column_replacement(*left_idx, right_idx);
1431                        }
1432                    }
1433
1434                    // Hide all left columns that are common (replacement targets get hidden too)
1435                    for (_, _, left_idx) in left_entries {
1436                        result.schema.hide_column(*left_idx);
1437                    }
1438
1439                    // Add coalesce pair for COALESCE(left, right) semantics on unqualified references
1440                    // Use the first left column entry for the coalesce pair
1441                    // Issue #4906: For nested parenthesized JOINs, the right_schema may already have
1442                    // a coalesce chain for this column. We need to extend the chain with ALL indices
1443                    // from the right side, not just the current right column.
1444                    if let Some((_, actual_name, left_idx)) = left_entries.first() {
1445                        // Check if right_schema already has a coalesce chain for this column
1446                        if let Some(right_coalesce_indices) =
1447                            right_schema.using_coalesce_indices.get(&lowercase)
1448                        {
1449                            // Add left_idx first
1450                            result
1451                                .schema
1452                                .add_using_coalesce_pair(actual_name, *left_idx, *left_idx);
1453                            // Then add all indices from the right side's chain
1454                            for &existing_idx in right_coalesce_indices {
1455                                let adjusted_idx = left_col_count + existing_idx;
1456                                result
1457                                    .schema
1458                                    .add_using_coalesce_pair(actual_name, *left_idx, adjusted_idx);
1459                            }
1460                        } else {
1461                            // No existing chain, just add the single pair
1462                            result
1463                                .schema
1464                                .add_using_coalesce_pair(actual_name, *left_idx, right_idx);
1465                        }
1466                    }
1467
1468                    // Mark as joined column for ambiguity resolution
1469                    result.schema.add_joined_column(&col.name);
1470
1471                    // Also hide the right column since it will be output via replacement
1472                    result.schema.hide_column(right_idx);
1473                }
1474            }
1475        }
1476    } else {
1477        // For INNER/LEFT/CROSS JOINs: hide right-side columns (original behavior)
1478        for (table_start_idx, table_schema) in right_schema.table_schemas.values() {
1479            for (col_idx, col) in table_schema.columns.iter().enumerate() {
1480                let lowercase = col.name.to_lowercase();
1481                if left_column_map.contains_key(&lowercase) {
1482                    // This is a common column, mark it as hidden for SELECT * expansion
1483                    // Position in result = left_col_count + position_in_right_schema
1484                    let hidden_idx = left_col_count + table_start_idx + col_idx;
1485                    result.schema.hide_column(hidden_idx);
1486
1487                    // Also mark this column as a "joined column" so it won't be
1488                    // considered ambiguous when referenced without table qualification (issue #4517)
1489                    result.schema.add_joined_column(&col.name);
1490                }
1491            }
1492        }
1493    }
1494
1495    Ok(result)
1496}
1497
1498/// Mark USING columns as hidden for USING clause JOIN
1499///
1500/// USING clause should only include the specified columns once (from the left side) in `SELECT *`.
1501/// However, qualified wildcards like `SELECT t1.*` should still return all columns from t1.
1502///
1503/// This function marks USING columns from the right side as hidden in the schema,
1504/// rather than removing them. This allows:
1505/// - `SELECT *` to correctly deduplicate columns (skips hidden columns)
1506/// - `SELECT t1.*` to return all columns from t1 (includes hidden columns)
1507///
1508/// Issue #4466: table.* should return all columns in NATURAL JOIN context
1509fn remove_duplicate_columns_for_using_join(
1510    mut result: FromResult,
1511    left_schema: &CombinedSchema,
1512    right_schema: &CombinedSchema,
1513    using_cols: &[String],
1514    join_type: &vibesql_ast::JoinType,
1515) -> Result<FromResult, ExecutorError> {
1516    use std::collections::HashSet;
1517
1518    // Create a set of USING column names (lowercase for case-insensitive comparison)
1519    let using_cols_lower: HashSet<String> = using_cols.iter().map(|c| c.to_lowercase()).collect();
1520
1521    // Mark all USING columns as "joined columns" so they won't be
1522    // considered ambiguous when referenced without table qualification (issue #4517)
1523    for col in using_cols {
1524        result.schema.add_joined_column(col);
1525    }
1526
1527    // Count columns in left schema
1528    let left_col_count = left_schema.total_columns;
1529
1530    // For RIGHT OUTER and FULL OUTER joins, we need COALESCE semantics:
1531    // The USING column value should be COALESCE(left.col, right.col)
1532    // - RIGHT OUTER: left can be NULL for unmatched rows from right
1533    // - FULL OUTER: either side can be NULL for unmatched rows
1534    // Issue #4783: USING column semantics differ from SQLite in OUTER JOINs
1535    let _needs_coalesce = matches!(
1536        join_type,
1537        vibesql_ast::JoinType::RightOuter | vibesql_ast::JoinType::FullOuter
1538    );
1539
1540    // Build a map of column name -> leftmost left column index
1541    // This is the index that unqualified references will resolve to
1542    //
1543    // Issue #4786: We ALWAYS need to build this map, not just for FULL/RIGHT joins.
1544    // When a LEFT/INNER join has USING on a right side with nested USING joins,
1545    // we need to add the left column to the coalesce chain so that the inner
1546    // join's visible column becomes a non-first position and gets properly
1547    // filtered by is_using_coalesce_right_side() in projection.
1548    let mut leftmost_left_indices: std::collections::HashMap<String, usize> =
1549        std::collections::HashMap::new();
1550    for (table_start_idx, table_schema) in left_schema.table_schemas.values() {
1551        for (col_idx, col) in table_schema.columns.iter().enumerate() {
1552            let lowercase = col.name.to_lowercase();
1553            if using_cols_lower.contains(&lowercase) {
1554                let absolute_idx = table_start_idx + col_idx;
1555                leftmost_left_indices
1556                    .entry(lowercase)
1557                    .and_modify(|e| {
1558                        if absolute_idx < *e {
1559                            *e = absolute_idx
1560                        }
1561                    })
1562                    .or_insert(absolute_idx);
1563            }
1564        }
1565    }
1566
1567    // Identify which columns from the right side are USING columns to hide
1568    // Use table_start_idx from right_schema to compute correct absolute positions
1569    for (table_start_idx, table_schema) in right_schema.table_schemas.values() {
1570        for (col_idx, col) in table_schema.columns.iter().enumerate() {
1571            let lowercase = col.name.to_lowercase();
1572            if using_cols_lower.contains(&lowercase) {
1573                // This is a USING column, mark it as hidden for SELECT * expansion
1574                // Position in result = left_col_count + position_in_right_schema
1575                let right_idx = left_col_count + table_start_idx + col_idx;
1576                result.schema.hide_column(right_idx);
1577
1578                // Issue #4786: ALWAYS register the coalesce pair in the schema, not just
1579                // for FULL/RIGHT joins. This ensures that when a nested USING join's
1580                // visible column is hidden by an outer LEFT/INNER USING join, the
1581                // is_using_coalesce_right_side() check in projection will correctly
1582                // filter it out (because it's no longer first in the chain).
1583                //
1584                // For LEFT/INNER joins, the COALESCE semantics won't be applied in
1585                // output (the left column value is used directly), but the chain
1586                // structure is needed for correct column filtering.
1587                if let Some(&left_idx) = leftmost_left_indices.get(&lowercase) {
1588                    result
1589                        .schema
1590                        .add_using_coalesce_pair(&col.name, left_idx, right_idx);
1591                }
1592            }
1593        }
1594    }
1595
1596    Ok(result)
1597}