vibesql_executor/select/join/
mod.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use super::{cte::CteResult, from_iterator::FromIterator};
5use crate::{
6    errors::ExecutorError, evaluator::CombinedExpressionEvaluator, optimizer::combine_with_and,
7    schema::CombinedSchema, timeout::TimeoutContext,
8};
9
10mod bloom_filter;
11mod expression_mapper;
12mod hash_anti_join;
13pub(crate) mod hash_join;
14mod hash_join_iterator;
15mod hash_semi_join;
16mod join_analyzer;
17mod nested_loop;
18pub mod reorder;
19pub mod search;
20
21// Re-export Bloom filter for use in hash join implementations
22pub(crate) use bloom_filter::BloomFilter;
23
24#[cfg(test)]
25mod tests;
26
27// Re-export join reorder analyzer for public tests
28// Re-export hash_join functions for internal use
29use hash_anti_join::{hash_anti_join, hash_anti_join_with_filter};
30use hash_join::{
31    hash_join_inner, hash_join_inner_arithmetic, hash_join_inner_multi, hash_join_left_outer,
32    hash_join_left_outer_multi,
33};
34use hash_semi_join::{hash_semi_join, hash_semi_join_with_filter};
35// Re-export hash join iterator for public use
36pub use hash_join_iterator::HashJoinIterator;
37// Re-export nested loop join variants for internal use
38use nested_loop::{
39    nested_loop_anti_join, nested_loop_cross_join, nested_loop_full_outer_join,
40    nested_loop_inner_join, nested_loop_left_outer_join, nested_loop_right_outer_join,
41    nested_loop_semi_join,
42};
43pub use reorder::JoinOrderAnalyzer;
44// Re-export join order search for public tests
45pub use search::JoinOrderSearch;
46
47/// Iterator over `FromData` rows without forcing full materialization
48///
49/// This enum wraps either a Vec iterator or a lazy `FromIterator`, allowing
50/// uniform iteration over rows regardless of how they were stored.
51///
52/// # Issue #4060
53///
54/// This type enables deferred row materialization for LIMIT queries:
55/// - `SELECT * FROM t LIMIT 10` only clones 10 rows, not all of `t`
56/// - Memory usage is O(LIMIT) instead of O(table_size)
57#[allow(dead_code)]
58pub(super) enum FromDataIterator {
59    /// Iterator over a materialized Vec<Row>
60    Vec(std::vec::IntoIter<vibesql_storage::Row>),
61    /// Lazy iterator from FromIterator (table scan)
62    Lazy(FromIterator),
63}
64
65impl Iterator for FromDataIterator {
66    type Item = vibesql_storage::Row;
67
68    #[inline]
69    fn next(&mut self) -> Option<Self::Item> {
70        match self {
71            Self::Vec(iter) => iter.next(),
72            Self::Lazy(iter) => iter.next(),
73        }
74    }
75
76    #[inline]
77    fn size_hint(&self) -> (usize, Option<usize>) {
78        match self {
79            Self::Vec(iter) => iter.size_hint(),
80            Self::Lazy(iter) => iter.size_hint(),
81        }
82    }
83}
84
85/// Data source for FROM clause results
86///
87/// This enum allows FROM results to be either materialized (Vec<Row>) or lazy (iterator).
88/// Materialized results are used for JOINs, CTEs, and operations that need multiple passes.
89/// Lazy results are used for simple table scans to enable streaming execution.
90pub(super) enum FromData {
91    /// Materialized rows (for JOINs, CTEs, operations needing multiple passes)
92    Materialized(Vec<vibesql_storage::Row>),
93
94    /// Shared rows (for zero-copy CTE references without filtering)
95    ///
96    /// This variant enables O(1) cloning when CTEs are referenced multiple times
97    /// without any filtering. Only materializes to Vec when mutation is needed.
98    SharedRows(Arc<Vec<vibesql_storage::Row>>),
99
100    /// Lazy iterator (for streaming table scans)
101    Iterator(FromIterator),
102}
103
104impl FromData {
105    /// Get rows, materializing if needed
106    ///
107    /// For SharedRows, this will clone only if the Arc is shared.
108    /// If the Arc has a single reference, the Vec is moved out efficiently.
109    pub fn into_rows(self) -> Vec<vibesql_storage::Row> {
110        match self {
111            Self::Materialized(rows) => rows,
112            Self::SharedRows(arc) => Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()),
113            Self::Iterator(iter) => iter.collect_vec(),
114        }
115    }
116
117    /// Returns an iterator over rows without forcing full materialization
118    ///
119    /// This is more efficient than `into_rows()` when you don't need all rows,
120    /// particularly for LIMIT queries where only a subset will be consumed.
121    ///
122    /// For Materialized and SharedRows variants, this returns an iterator over
123    /// the owned/cloned Vec. For Iterator variant, it returns the lazy iterator
124    /// directly without collecting.
125    ///
126    /// # Performance
127    ///
128    /// - `into_rows()`: O(n) allocation + cloning for all rows
129    /// - `into_iter()`: O(k) where k is the number of rows actually consumed
130    ///
131    /// Use `into_iter()` when:
132    /// - Processing with LIMIT (only need first N rows)
133    /// - Filtering results (may discard many rows)
134    /// - Streaming output without full materialization
135    #[allow(dead_code)]
136    pub fn into_iter(self) -> FromDataIterator {
137        match self {
138            Self::Materialized(rows) => FromDataIterator::Vec(rows.into_iter()),
139            Self::SharedRows(arc) => {
140                // Try to unwrap the Arc; if shared, clone the Vec
141                let rows = Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone());
142                FromDataIterator::Vec(rows.into_iter())
143            }
144            Self::Iterator(iter) => FromDataIterator::Lazy(iter),
145        }
146    }
147
148    /// Get a reference to materialized rows, or materialize if iterator
149    ///
150    /// For SharedRows, returns a reference to the shared data without cloning.
151    pub fn as_rows(&mut self) -> &Vec<vibesql_storage::Row> {
152        // If we have an iterator, materialize it
153        if let Self::Iterator(iter) = self {
154            #[cfg(feature = "profile-q6")]
155            let materialize_start = std::time::Instant::now();
156
157            let rows = std::mem::replace(iter, FromIterator::from_vec(vec![])).collect_vec();
158            *self = Self::Materialized(rows);
159
160            #[cfg(feature = "profile-q6")]
161            {
162                let materialize_time = materialize_start.elapsed();
163                if let Self::Materialized(rows) = self {
164                    eprintln!(
165                        "[Q6 PROFILE] Row materialization (collect_vec): {:?} ({} rows, {:?}/row)",
166                        materialize_time,
167                        rows.len(),
168                        materialize_time / rows.len() as u32
169                    );
170                }
171            }
172        }
173
174        // Now we're guaranteed to have materialized or shared rows
175        match self {
176            Self::Materialized(ref rows) => rows,
177            Self::SharedRows(ref arc) => arc.as_ref(),
178            Self::Iterator(_) => unreachable!(),
179        }
180    }
181
182    /// Get a slice reference to the underlying rows without triggering materialization
183    ///
184    /// This is a zero-cost operation that directly accesses the underlying Vec<Row>
185    /// without calling collect_vec(). This avoids the 137ms row materialization
186    /// bottleneck in Q6 by skipping iteration entirely.
187    ///
188    /// Critical for columnar execution performance (#2521).
189    pub fn as_slice(&self) -> &[vibesql_storage::Row] {
190        match self {
191            Self::Materialized(rows) => rows.as_slice(),
192            Self::SharedRows(arc) => arc.as_slice(),
193            Self::Iterator(iter) => iter.as_slice(),
194        }
195    }
196}
197
198/// Result of executing a FROM clause
199///
200/// Contains the combined schema and data (either materialized or lazy).
201pub(super) struct FromResult {
202    pub(super) schema: CombinedSchema,
203    pub(super) data: FromData,
204    /// If present, indicates that results are already sorted by the specified columns
205    /// in the given order (ASC/DESC). This allows skipping ORDER BY sorting.
206    pub(super) sorted_by: Option<Vec<(String, vibesql_ast::OrderDirection)>>,
207    /// If true, indicates that WHERE clause filtering has already been fully applied
208    /// during the scan (e.g., by index scan with predicate pushdown). This allows
209    /// skipping redundant WHERE clause evaluation in the executor.
210    pub(super) where_filtered: bool,
211}
212
213impl FromResult {
214    /// Create a FromResult from materialized rows
215    pub(super) fn from_rows(schema: CombinedSchema, rows: Vec<vibesql_storage::Row>) -> Self {
216        Self { schema, data: FromData::Materialized(rows), sorted_by: None, where_filtered: false }
217    }
218
219    /// Create a FromResult from shared rows (zero-copy for CTEs)
220    ///
221    /// This variant is used when CTE rows can be shared without cloning,
222    /// enabling O(1) memory usage for CTE references without filtering.
223    pub(super) fn from_shared_rows(
224        schema: CombinedSchema,
225        rows: Arc<Vec<vibesql_storage::Row>>,
226    ) -> Self {
227        Self { schema, data: FromData::SharedRows(rows), sorted_by: None, where_filtered: false }
228    }
229
230    /// Create a FromResult from materialized rows with sorting metadata
231    pub(super) fn from_rows_sorted(
232        schema: CombinedSchema,
233        rows: Vec<vibesql_storage::Row>,
234        sorted_by: Vec<(String, vibesql_ast::OrderDirection)>,
235    ) -> Self {
236        Self {
237            schema,
238            data: FromData::Materialized(rows),
239            sorted_by: Some(sorted_by),
240            where_filtered: false,
241        }
242    }
243
244    /// Create a FromResult from materialized rows with WHERE filtering already applied
245    pub(super) fn from_rows_where_filtered(
246        schema: CombinedSchema,
247        rows: Vec<vibesql_storage::Row>,
248        sorted_by: Option<Vec<(String, vibesql_ast::OrderDirection)>>,
249    ) -> Self {
250        Self { schema, data: FromData::Materialized(rows), sorted_by, where_filtered: true }
251    }
252
253    /// Create a FromResult from an iterator
254    pub(super) fn from_iterator(schema: CombinedSchema, iterator: FromIterator) -> Self {
255        Self { schema, data: FromData::Iterator(iterator), sorted_by: None, where_filtered: false }
256    }
257
258    /// Get the rows, materializing if needed
259    pub(super) fn into_rows(self) -> Vec<vibesql_storage::Row> {
260        self.data.into_rows()
261    }
262
263    /// Returns an iterator over rows without forcing full materialization
264    ///
265    /// This delegates to `FromData::into_iter()` and is more efficient than
266    /// `into_rows()` when only a subset of rows will be consumed.
267    ///
268    /// # Example Use Cases
269    ///
270    /// - LIMIT queries: `result.into_iter().take(10).collect()`
271    /// - Filtered iteration: `result.into_iter().filter(|r| ...).collect()`
272    /// - Early termination: Stop iterating when a condition is met
273    ///
274    /// # Issue #4060
275    #[allow(dead_code)]
276    pub(super) fn into_iter(self) -> FromDataIterator {
277        self.data.into_iter()
278    }
279
280    /// Take up to N rows without full materialization
281    ///
282    /// This is a convenience method equivalent to `self.into_iter().take(n).collect()`.
283    /// It's optimized for LIMIT queries where only a small subset of rows is needed.
284    ///
285    /// # Performance
286    ///
287    /// For a table with 10,000 rows and `take(10)`:
288    /// - `into_rows()` + truncate: clones all 10,000 rows, then discards 9,990
289    /// - `take(10)`: clones only 10 rows
290    ///
291    /// # Issue #4060
292    #[allow(dead_code)]
293    pub(super) fn take(self, n: usize) -> Vec<vibesql_storage::Row> {
294        self.into_iter().take(n).collect()
295    }
296
297    /// Get a mutable reference to the rows, materializing if needed
298    ///
299    /// For SharedRows, this triggers copy-on-write: the shared data is cloned
300    /// into owned Materialized data to allow mutation.
301    #[allow(dead_code)]
302    pub(super) fn rows_mut(&mut self) -> &mut Vec<vibesql_storage::Row> {
303        // Convert iterator or shared to materialized
304        match &mut self.data {
305            FromData::Iterator(iter) => {
306                let rows = std::mem::replace(iter, FromIterator::from_vec(vec![])).collect_vec();
307                self.data = FromData::Materialized(rows);
308            }
309            FromData::SharedRows(arc) => {
310                // Copy-on-write: clone the shared data to allow mutation
311                let rows = arc.as_ref().clone();
312                self.data = FromData::Materialized(rows);
313            }
314            FromData::Materialized(_) => {}
315        }
316
317        // Now we're guaranteed to have materialized rows
318        match &mut self.data {
319            FromData::Materialized(rows) => rows,
320            FromData::SharedRows(_) | FromData::Iterator(_) => unreachable!(),
321        }
322    }
323
324    /// Get a reference to rows, materializing if needed
325    pub(super) fn rows(&mut self) -> &Vec<vibesql_storage::Row> {
326        self.data.as_rows()
327    }
328
329    /// Get a slice reference to rows without triggering materialization
330    ///
331    /// This is a zero-cost operation that accesses the underlying data directly
332    /// without calling collect_vec(). This is critical for performance as it
333    /// avoids the row materialization bottleneck (up to 57% of query time).
334    ///
335    /// Unlike `rows()` which may trigger iterator collection, this method
336    /// provides direct access to the underlying Vec<Row> or iterator buffer.
337    pub(super) fn as_slice(&self) -> &[vibesql_storage::Row] {
338        self.data.as_slice()
339    }
340}
341
342/// Helper function to combine two rows without unnecessary cloning
343/// Only creates a single combined row, avoiding intermediate clones
344#[inline]
345fn combine_rows(
346    left_row: &vibesql_storage::Row,
347    right_row: &vibesql_storage::Row,
348) -> vibesql_storage::Row {
349    let mut combined_values = Vec::with_capacity(left_row.values.len() + right_row.values.len());
350    combined_values.extend_from_slice(&left_row.values);
351    combined_values.extend_from_slice(&right_row.values);
352    vibesql_storage::Row::new(combined_values)
353}
354
355/// Apply a post-join filter expression to join result rows
356///
357/// This is used to filter rows produced by hash join with additional conditions
358/// from the WHERE clause that weren't used in the hash join itself.
359///
360/// Issue #3562: Added cte_results parameter so IN subqueries in filter expressions
361/// can resolve CTE references.
362fn apply_post_join_filter(
363    result: FromResult,
364    filter_expr: &vibesql_ast::Expression,
365    database: &vibesql_storage::Database,
366    cte_results: &HashMap<String, CteResult>,
367) -> Result<FromResult, ExecutorError> {
368    // Extract schema before moving result
369    let schema = result.schema.clone();
370    // Issue #3562: Use evaluator with CTE context if CTEs exist
371    let evaluator = if cte_results.is_empty() {
372        CombinedExpressionEvaluator::with_database(&schema, database)
373    } else {
374        CombinedExpressionEvaluator::with_database_and_cte(&schema, database, cte_results)
375    };
376
377    // Filter rows based on the expression
378    let mut filtered_rows = Vec::new();
379    for row in result.into_rows() {
380        match evaluator.eval(filter_expr, &row)? {
381            vibesql_types::SqlValue::Boolean(true) => filtered_rows.push(row),
382            vibesql_types::SqlValue::Boolean(false) => {} // Skip this row
383            vibesql_types::SqlValue::Null => {}           // Skip NULL results
384            // SQLLogicTest compatibility: treat integers as truthy/falsy
385            vibesql_types::SqlValue::Integer(0) => {} // Skip 0
386            vibesql_types::SqlValue::Integer(_) => filtered_rows.push(row),
387            vibesql_types::SqlValue::Smallint(0) => {} // Skip 0
388            vibesql_types::SqlValue::Smallint(_) => filtered_rows.push(row),
389            vibesql_types::SqlValue::Bigint(0) => {} // Skip 0
390            vibesql_types::SqlValue::Bigint(_) => filtered_rows.push(row),
391            vibesql_types::SqlValue::Float(0.0) => {} // Skip 0.0
392            vibesql_types::SqlValue::Float(_) => filtered_rows.push(row),
393            vibesql_types::SqlValue::Real(0.0) => {} // Skip 0.0
394            vibesql_types::SqlValue::Real(_) => filtered_rows.push(row),
395            vibesql_types::SqlValue::Double(0.0) => {} // Skip 0.0
396            vibesql_types::SqlValue::Double(_) => filtered_rows.push(row),
397            other => {
398                return Err(ExecutorError::InvalidWhereClause(format!(
399                    "Filter expression must evaluate to boolean, got: {:?}",
400                    other
401                )))
402            }
403        }
404    }
405
406    Ok(FromResult::from_rows(schema, filtered_rows))
407}
408
409/// Perform join between two FROM results, optimizing with hash join when possible
410///
411/// This function now supports predicate pushdown from WHERE clauses. Additional equijoin
412/// predicates from WHERE can be passed to optimize hash join selection and execution.
413///
414/// Note: This function combines rows from left and right according to the join type
415/// and join condition. For queries with many tables and large intermediate results,
416/// consider applying WHERE filters earlier to reduce memory usage.
417///
418/// Issue #3562: Added cte_results parameter so post-join filters with IN subqueries
419/// can resolve CTE references.
420#[allow(clippy::too_many_arguments)]
421pub(super) fn nested_loop_join(
422    left: FromResult,
423    right: FromResult,
424    join_type: &vibesql_ast::JoinType,
425    condition: &Option<vibesql_ast::Expression>,
426    natural: bool,
427    database: &vibesql_storage::Database,
428    additional_equijoins: &[vibesql_ast::Expression],
429    timeout_ctx: &TimeoutContext,
430    cte_results: &HashMap<String, CteResult>,
431) -> Result<FromResult, ExecutorError> {
432    // Try to use hash join for INNER JOINs with simple equi-join conditions
433    if let vibesql_ast::JoinType::Inner = join_type {
434        // Get column count and right table info once for analysis
435        // IMPORTANT: Sum up columns from ALL tables in the left schema,
436        // not just the first table, to handle accumulated multi-table joins
437        let left_col_count: usize =
438            left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
439
440        let right_table_name = right
441            .schema
442            .table_schemas
443            .keys()
444            .next()
445            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
446            .clone();
447
448        let right_schema = right
449            .schema
450            .table_schemas
451            .get(&right_table_name)
452            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
453            .1
454            .clone();
455
456        // Clone right_table_name before it gets moved into combine()
457        let right_table_name_for_natural = right_table_name.clone();
458
459        let temp_schema =
460            CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
461
462        // Phase 3.1: Try ON condition first (preferred for hash join)
463        // Now supports multi-column composite keys for better performance
464        if let Some(cond) = condition {
465            // First try multi-column hash join for composite keys (2+ equi-join conditions)
466            if let Some(multi_result) =
467                join_analyzer::analyze_multi_column_equi_join(cond, &temp_schema, left_col_count)
468            {
469                // Use multi-column hash join if there are 2+ join columns
470                if multi_result.equi_joins.left_col_indices.len() >= 2 {
471                    // Save schemas for NATURAL JOIN processing before moving left/right
472                    let (left_schema_for_natural, right_schema_for_natural) = if natural {
473                        (Some(left.schema.clone()), Some(right.schema.clone()))
474                    } else {
475                        (None, None)
476                    };
477
478                    let mut result = hash_join_inner_multi(
479                        left,
480                        right,
481                        &multi_result.equi_joins.left_col_indices,
482                        &multi_result.equi_joins.right_col_indices,
483                    )?;
484
485                    // Apply remaining conditions as post-join filter
486                    if !multi_result.remaining_conditions.is_empty() {
487                        if let Some(filter_expr) =
488                            combine_with_and(multi_result.remaining_conditions)
489                        {
490                            result = apply_post_join_filter(
491                                result,
492                                &filter_expr,
493                                database,
494                                cte_results,
495                            )?;
496                        }
497                    }
498
499                    // For NATURAL JOIN, remove duplicate columns from the result
500                    if natural {
501                        if let (Some(left_schema), Some(right_schema_orig)) =
502                            (left_schema_for_natural, right_schema_for_natural)
503                        {
504                            let right_schema_for_removal = CombinedSchema {
505                                table_schemas: vec![(
506                                    right_table_name_for_natural.clone(),
507                                    (
508                                        0,
509                                        right_schema_orig
510                                            .table_schemas
511                                            .values()
512                                            .next()
513                                            .unwrap()
514                                            .1
515                                            .clone(),
516                                    ),
517                                )]
518                                .into_iter()
519                                .collect(),
520                                total_columns: right_schema_orig.total_columns,
521                            };
522                            result = remove_duplicate_columns_for_natural_join(
523                                result,
524                                &left_schema,
525                                &right_schema_for_removal,
526                            )?;
527                        }
528                    }
529
530                    return Ok(result);
531                }
532
533                // Single-column equi-join: use standard hash join (more efficient for single key)
534                // Save schemas for NATURAL JOIN processing before moving left/right
535                let (left_schema_for_natural, right_schema_for_natural) = if natural {
536                    (Some(left.schema.clone()), Some(right.schema.clone()))
537                } else {
538                    (None, None)
539                };
540
541                let mut result = hash_join_inner(
542                    left,
543                    right,
544                    multi_result.equi_joins.left_col_indices[0],
545                    multi_result.equi_joins.right_col_indices[0],
546                )?;
547
548                // Apply remaining conditions as post-join filter
549                if !multi_result.remaining_conditions.is_empty() {
550                    if let Some(filter_expr) = combine_with_and(multi_result.remaining_conditions) {
551                        result =
552                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
553                    }
554                }
555
556                // For NATURAL JOIN, remove duplicate columns from the result
557                if natural {
558                    if let (Some(left_schema), Some(right_schema_orig)) =
559                        (left_schema_for_natural, right_schema_for_natural)
560                    {
561                        let right_schema_for_removal = CombinedSchema {
562                            table_schemas: vec![(
563                                right_table_name_for_natural.clone(),
564                                (
565                                    0,
566                                    right_schema_orig
567                                        .table_schemas
568                                        .values()
569                                        .next()
570                                        .unwrap()
571                                        .1
572                                        .clone(),
573                                ),
574                            )]
575                            .into_iter()
576                            .collect(),
577                            total_columns: right_schema_orig.total_columns,
578                        };
579                        result = remove_duplicate_columns_for_natural_join(
580                            result,
581                            &left_schema,
582                            &right_schema_for_removal,
583                        )?;
584                    }
585                }
586
587                return Ok(result);
588            }
589        }
590
591        // Phase 3.2: Try OR conditions with common equi-join (TPC-H Q19 optimization)
592        // For expressions like `(a.x = b.x AND ...) OR (a.x = b.x AND ...) OR (a.x = b.x AND ...)`,
593        // extract the common equi-join `a.x = b.x` for hash join
594        if let Some(cond) = condition {
595            if let Some(or_result) =
596                join_analyzer::analyze_or_equi_join(cond, &temp_schema, left_col_count)
597            {
598                // Save schemas for NATURAL JOIN processing before moving left/right
599                let (left_schema_for_natural, right_schema_for_natural) = if natural {
600                    (Some(left.schema.clone()), Some(right.schema.clone()))
601                } else {
602                    (None, None)
603                };
604
605                let mut result = hash_join_inner(
606                    left,
607                    right,
608                    or_result.equi_join.left_col_idx,
609                    or_result.equi_join.right_col_idx,
610                )?;
611
612                // Apply remaining OR conditions as post-join filter
613                if !or_result.remaining_conditions.is_empty() {
614                    if let Some(filter_expr) = combine_with_and(or_result.remaining_conditions) {
615                        result =
616                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
617                    }
618                }
619
620                // For NATURAL JOIN, remove duplicate columns from the result
621                if natural {
622                    if let (Some(left_schema), Some(right_schema_orig)) =
623                        (left_schema_for_natural, right_schema_for_natural)
624                    {
625                        let right_schema_for_removal = CombinedSchema {
626                            table_schemas: vec![(
627                                right_table_name_for_natural.clone(),
628                                (
629                                    0,
630                                    right_schema_orig
631                                        .table_schemas
632                                        .values()
633                                        .next()
634                                        .unwrap()
635                                        .1
636                                        .clone(),
637                                ),
638                            )]
639                            .into_iter()
640                            .collect(),
641                            total_columns: right_schema_orig.total_columns,
642                        };
643                        result = remove_duplicate_columns_for_natural_join(
644                            result,
645                            &left_schema,
646                            &right_schema_for_removal,
647                        )?;
648                    }
649                }
650
651                return Ok(result);
652            }
653        }
654
655        // Phase 3.3: Try arithmetic equi-join (TPC-DS Q2 optimization)
656        // For expressions like `col1 = col2 - 53`, extract the arithmetic offset for hash join
657        if let Some(cond) = condition {
658            if let Some(arith_info) =
659                join_analyzer::analyze_arithmetic_equi_join(cond, &temp_schema, left_col_count)
660            {
661                // Save schemas for NATURAL JOIN processing before moving left/right
662                let (left_schema_for_natural, right_schema_for_natural) = if natural {
663                    (Some(left.schema.clone()), Some(right.schema.clone()))
664                } else {
665                    (None, None)
666                };
667
668                let mut result = hash_join_inner_arithmetic(
669                    left,
670                    right,
671                    arith_info.left_col_idx,
672                    arith_info.right_col_idx,
673                    arith_info.offset,
674                )?;
675
676                // For NATURAL JOIN, remove duplicate columns from the result
677                if natural {
678                    if let (Some(left_schema), Some(right_schema_orig)) =
679                        (left_schema_for_natural, right_schema_for_natural)
680                    {
681                        let right_schema_for_removal = CombinedSchema {
682                            table_schemas: vec![(
683                                right_table_name_for_natural.clone(),
684                                (
685                                    0,
686                                    right_schema_orig
687                                        .table_schemas
688                                        .values()
689                                        .next()
690                                        .unwrap()
691                                        .1
692                                        .clone(),
693                                ),
694                            )]
695                            .into_iter()
696                            .collect(),
697                            total_columns: right_schema_orig.total_columns,
698                        };
699                        result = remove_duplicate_columns_for_natural_join(
700                            result,
701                            &left_schema,
702                            &right_schema_for_removal,
703                        )?;
704                    }
705                }
706
707                return Ok(result);
708            }
709        }
710
711        // Phase 3.4: Try multi-column hash join from WHERE clause conditions
712        // When there are multiple equijoin conditions (e.g., ps_suppkey = l_suppkey AND ps_partkey = l_partkey),
713        // using composite key hash join is critical for performance. Single-key hash join with post-filter
714        // can cause catastrophic performance issues (48B cartesian products in Q9 at SF=0.1).
715        if additional_equijoins.len() >= 2 {
716            // Collect all valid equi-join conditions
717            let mut left_col_indices = Vec::new();
718            let mut right_col_indices = Vec::new();
719            let mut used_indices = Vec::new();
720
721            for (idx, equijoin) in additional_equijoins.iter().enumerate() {
722                if let Some(equi_join_info) =
723                    join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
724                {
725                    left_col_indices.push(equi_join_info.left_col_idx);
726                    right_col_indices.push(equi_join_info.right_col_idx);
727                    used_indices.push(idx);
728                }
729            }
730
731            // If we found 2+ equi-join conditions, use multi-column hash join
732            if left_col_indices.len() >= 2 {
733                // Save schemas for NATURAL JOIN processing before moving left/right
734                let (left_schema_for_natural, right_schema_for_natural) = if natural {
735                    (Some(left.schema.clone()), Some(right.schema.clone()))
736                } else {
737                    (None, None)
738                };
739
740                let mut result = hash_join_inner_multi(
741                    left,
742                    right,
743                    &left_col_indices,
744                    &right_col_indices,
745                )?;
746
747                // Apply remaining conditions (non-equijoins) as post-join filters
748                let remaining_conditions: Vec<_> = additional_equijoins
749                    .iter()
750                    .enumerate()
751                    .filter(|(i, _)| !used_indices.contains(i))
752                    .map(|(_, e)| e.clone())
753                    .collect();
754
755                if !remaining_conditions.is_empty() {
756                    if let Some(filter_expr) = combine_with_and(remaining_conditions) {
757                        result =
758                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
759                    }
760                }
761
762                // For NATURAL JOIN, remove duplicate columns from the result
763                if natural {
764                    if let (Some(left_schema), Some(right_schema_orig)) =
765                        (left_schema_for_natural, right_schema_for_natural)
766                    {
767                        let right_schema_for_removal = CombinedSchema {
768                            table_schemas: vec![(
769                                right_table_name_for_natural.clone(),
770                                (
771                                    0,
772                                    right_schema_orig
773                                        .table_schemas
774                                        .values()
775                                        .next()
776                                        .unwrap()
777                                        .1
778                                        .clone(),
779                                ),
780                            )]
781                            .into_iter()
782                            .collect(),
783                            total_columns: right_schema_orig.total_columns,
784                        };
785                        result = remove_duplicate_columns_for_natural_join(
786                            result,
787                            &left_schema,
788                            &right_schema_for_removal,
789                        )?;
790                    }
791                }
792
793                return Ok(result);
794            }
795        }
796
797        // Phase 3.5: If no multi-column hash join, try single-column WHERE clause equijoins
798        // Iterate through all additional equijoins to find one suitable for hash join
799        for (idx, equijoin) in additional_equijoins.iter().enumerate() {
800            if let Some(equi_join_info) =
801                join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
802            {
803                // Save schemas for NATURAL JOIN processing before moving left/right
804                let (left_schema_for_natural, right_schema_for_natural) = if natural {
805                    (Some(left.schema.clone()), Some(right.schema.clone()))
806                } else {
807                    (None, None)
808                };
809
810                // Found a WHERE clause equijoin suitable for hash join!
811                let mut result = hash_join_inner(
812                    left,
813                    right,
814                    equi_join_info.left_col_idx,
815                    equi_join_info.right_col_idx,
816                )?;
817
818                // Apply remaining equijoins and conditions as post-join filters
819                let remaining_conditions: Vec<_> = additional_equijoins
820                    .iter()
821                    .enumerate()
822                    .filter(|(i, _)| *i != idx)
823                    .map(|(_, e)| e.clone())
824                    .collect();
825
826                if !remaining_conditions.is_empty() {
827                    if let Some(filter_expr) = combine_with_and(remaining_conditions) {
828                        result =
829                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
830                    }
831                }
832
833                // For NATURAL JOIN, remove duplicate columns from the result
834                if natural {
835                    if let (Some(left_schema), Some(right_schema_orig)) =
836                        (left_schema_for_natural, right_schema_for_natural)
837                    {
838                        let right_schema_for_removal = CombinedSchema {
839                            table_schemas: vec![(
840                                right_table_name_for_natural.clone(),
841                                (
842                                    0,
843                                    right_schema_orig
844                                        .table_schemas
845                                        .values()
846                                        .next()
847                                        .unwrap()
848                                        .1
849                                        .clone(),
850                                ),
851                            )]
852                            .into_iter()
853                            .collect(),
854                            total_columns: right_schema_orig.total_columns,
855                        };
856                        result = remove_duplicate_columns_for_natural_join(
857                            result,
858                            &left_schema,
859                            &right_schema_for_removal,
860                        )?;
861                    }
862                }
863
864                return Ok(result);
865            }
866        }
867
868        // Phase 3.6: Try arithmetic equijoins from WHERE clause for hash join
869        // For expressions like `col1 = col2 - 53` in WHERE clause with Inner joins
870        // This enables hash join for derived table joins with arithmetic conditions (TPC-DS Q2)
871        for (idx, equijoin) in additional_equijoins.iter().enumerate() {
872            if let Some(arith_info) =
873                join_analyzer::analyze_arithmetic_equi_join(equijoin, &temp_schema, left_col_count)
874            {
875                // Save schemas for NATURAL JOIN processing before moving left/right
876                let (left_schema_for_natural, right_schema_for_natural) = if natural {
877                    (Some(left.schema.clone()), Some(right.schema.clone()))
878                } else {
879                    (None, None)
880                };
881
882                // Found an arithmetic equijoin suitable for hash join!
883                let mut result = hash_join_inner_arithmetic(
884                    left,
885                    right,
886                    arith_info.left_col_idx,
887                    arith_info.right_col_idx,
888                    arith_info.offset,
889                )?;
890
891                // Apply remaining equijoins as post-join filters
892                let remaining_conditions: Vec<_> = additional_equijoins
893                    .iter()
894                    .enumerate()
895                    .filter(|(i, _)| *i != idx)
896                    .map(|(_, e)| e.clone())
897                    .collect();
898
899                if !remaining_conditions.is_empty() {
900                    if let Some(filter_expr) = combine_with_and(remaining_conditions) {
901                        result =
902                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
903                    }
904                }
905
906                // For NATURAL JOIN, remove duplicate columns from the result
907                if natural {
908                    if let (Some(left_schema), Some(right_schema_orig)) =
909                        (left_schema_for_natural, right_schema_for_natural)
910                    {
911                        let right_schema_for_removal = CombinedSchema {
912                            table_schemas: vec![(
913                                right_table_name_for_natural.clone(),
914                                (
915                                    0,
916                                    right_schema_orig
917                                        .table_schemas
918                                        .values()
919                                        .next()
920                                        .unwrap()
921                                        .1
922                                        .clone(),
923                                ),
924                            )]
925                            .into_iter()
926                            .collect(),
927                            total_columns: right_schema_orig.total_columns,
928                        };
929                        result = remove_duplicate_columns_for_natural_join(
930                            result,
931                            &left_schema,
932                            &right_schema_for_removal,
933                        )?;
934                    }
935                }
936
937                return Ok(result);
938            }
939        }
940    }
941
942    // Try to use hash join for LEFT OUTER JOINs with equi-join conditions
943    // This optimization is critical for Q13 (customer LEFT JOIN orders)
944    if let vibesql_ast::JoinType::LeftOuter = join_type {
945        // Get column count and right table info for analysis
946        let left_col_count: usize =
947            left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
948
949        let right_table_name = right
950            .schema
951            .table_schemas
952            .keys()
953            .next()
954            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
955            .clone();
956
957        let right_schema = right
958            .schema
959            .table_schemas
960            .get(&right_table_name)
961            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
962            .1
963            .clone();
964
965        // Clone right_table_name before it gets moved into combine()
966        let right_table_name_for_natural = right_table_name.clone();
967
968        let temp_schema =
969            CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
970
971        // Try ON condition for hash join with multi-column support
972        // Use multi-column analysis to include ALL equi-join conditions in the hash join
973        // This is critical for LEFT JOIN correctness: post-filter conditions on NULL columns
974        // (from unmatched rows) incorrectly filter out valid LEFT JOIN results
975        if let Some(cond) = condition {
976            if let Some(multi_result) =
977                join_analyzer::analyze_multi_column_equi_join(cond, &temp_schema, left_col_count)
978            {
979                // Save schemas for NATURAL JOIN processing before moving left/right
980                let (left_schema_for_natural, right_schema_for_natural) = if natural {
981                    (Some(left.schema.clone()), Some(right.schema.clone()))
982                } else {
983                    (None, None)
984                };
985
986                // Use multi-column hash join when there are multiple equi-join columns
987                // This ensures all equi-join conditions are matched during hash lookup,
988                // not filtered afterward (which breaks LEFT JOIN semantics for NULL columns)
989                let mut result = if multi_result.equi_joins.left_col_indices.len() > 1 {
990                    hash_join_left_outer_multi(
991                        left,
992                        right,
993                        &multi_result.equi_joins.left_col_indices,
994                        &multi_result.equi_joins.right_col_indices,
995                    )?
996                } else {
997                    // Single column - use optimized single-column hash join
998                    hash_join_left_outer(
999                        left,
1000                        right,
1001                        multi_result.equi_joins.left_col_indices[0],
1002                        multi_result.equi_joins.right_col_indices[0],
1003                    )?
1004                };
1005
1006                // Apply remaining NON-equi-join conditions as post-join filter
1007                // Note: For LEFT JOIN, remaining conditions on right columns may evaluate
1008                // to NULL for unmatched rows. These rows should be preserved.
1009                // TODO: Consider a LEFT JOIN-aware post-filter that preserves NULL results
1010                if !multi_result.remaining_conditions.is_empty() {
1011                    if let Some(filter_expr) =
1012                        combine_with_and(multi_result.remaining_conditions)
1013                    {
1014                        result =
1015                            apply_post_join_filter(result, &filter_expr, database, cte_results)?;
1016                    }
1017                }
1018
1019                // For NATURAL JOIN, remove duplicate columns from the result
1020                if natural {
1021                    if let (Some(left_schema), Some(right_schema_orig)) =
1022                        (left_schema_for_natural, right_schema_for_natural)
1023                    {
1024                        let right_schema_for_removal = CombinedSchema {
1025                            table_schemas: vec![(
1026                                right_table_name_for_natural.clone(),
1027                                (
1028                                    0,
1029                                    right_schema_orig
1030                                        .table_schemas
1031                                        .values()
1032                                        .next()
1033                                        .unwrap()
1034                                        .1
1035                                        .clone(),
1036                                ),
1037                            )]
1038                            .into_iter()
1039                            .collect(),
1040                            total_columns: right_schema_orig.total_columns,
1041                        };
1042                        result = remove_duplicate_columns_for_natural_join(
1043                            result,
1044                            &left_schema,
1045                            &right_schema_for_removal,
1046                        )?;
1047                    }
1048                }
1049
1050                return Ok(result);
1051            }
1052        }
1053    }
1054
1055    // Try to use hash join for SEMI/ANTI JOINs with equi-join conditions
1056    if matches!(join_type, vibesql_ast::JoinType::Semi | vibesql_ast::JoinType::Anti) {
1057        // Get column count for analysis
1058        let left_col_count: usize =
1059            left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
1060
1061        let right_table_name = right
1062            .schema
1063            .table_schemas
1064            .keys()
1065            .next()
1066            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
1067            .clone();
1068
1069        let right_schema = right
1070            .schema
1071            .table_schemas
1072            .get(&right_table_name)
1073            .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
1074            .1
1075            .clone();
1076
1077        let temp_schema =
1078            CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
1079
1080        // Try ON condition first - use analyze_compound_equi_join to handle complex conditions
1081        // This enables hash join optimization for EXISTS subqueries with additional predicates
1082        // Example: EXISTS (SELECT * FROM t WHERE t.x = outer.x AND t.y <> outer.y)
1083        // The equi-join (t.x = outer.x) is used for hash join, and the inequality is a post-filter
1084        if let Some(cond) = condition {
1085            if let Some(compound_result) =
1086                join_analyzer::analyze_compound_equi_join(cond, &temp_schema, left_col_count)
1087            {
1088                // Build the combined remaining condition (if any)
1089                let remaining_filter = combine_with_and(compound_result.remaining_conditions);
1090
1091                let result = if matches!(join_type, vibesql_ast::JoinType::Semi) {
1092                    hash_semi_join_with_filter(
1093                        left,
1094                        right,
1095                        compound_result.equi_join.left_col_idx,
1096                        compound_result.equi_join.right_col_idx,
1097                        remaining_filter.as_ref(),
1098                        &temp_schema,
1099                        database,
1100                    )?
1101                } else {
1102                    hash_anti_join_with_filter(
1103                        left,
1104                        right,
1105                        compound_result.equi_join.left_col_idx,
1106                        compound_result.equi_join.right_col_idx,
1107                        remaining_filter.as_ref(),
1108                        &temp_schema,
1109                        database,
1110                    )?
1111                };
1112
1113                return Ok(result);
1114            }
1115        }
1116
1117        // Try WHERE clause equijoins
1118        for equijoin in additional_equijoins.iter() {
1119            if let Some(equi_join_info) =
1120                join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
1121            {
1122                let result = if matches!(join_type, vibesql_ast::JoinType::Semi) {
1123                    hash_semi_join(
1124                        left,
1125                        right,
1126                        equi_join_info.left_col_idx,
1127                        equi_join_info.right_col_idx,
1128                    )?
1129                } else {
1130                    hash_anti_join(
1131                        left,
1132                        right,
1133                        equi_join_info.left_col_idx,
1134                        equi_join_info.right_col_idx,
1135                    )?
1136                };
1137
1138                return Ok(result);
1139            }
1140        }
1141    }
1142
1143    // Try to use hash join for CROSS JOINs when equijoin conditions exist in WHERE clause
1144    // This is critical for Q21 and other TPC-H queries with implicit (comma-separated) joins
1145    // CROSS JOIN with equijoin predicates should be executed as hash INNER JOIN
1146    if let vibesql_ast::JoinType::Cross = join_type {
1147        if !additional_equijoins.is_empty() {
1148            // Get column count and right table info for analysis
1149            let left_col_count: usize =
1150                left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
1151
1152            let right_table_name = right
1153                .schema
1154                .table_schemas
1155                .keys()
1156                .next()
1157                .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
1158                .clone();
1159
1160            let right_schema = right
1161                .schema
1162                .table_schemas
1163                .get(&right_table_name)
1164                .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
1165                .1
1166                .clone();
1167
1168            let temp_schema =
1169                CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
1170
1171            // Try WHERE clause equijoins for hash join
1172            for (idx, equijoin) in additional_equijoins.iter().enumerate() {
1173                if let Some(equi_join_info) =
1174                    join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
1175                {
1176                    // Found a WHERE clause equijoin suitable for hash join!
1177                    // Execute CROSS JOIN as hash INNER JOIN with the equijoin condition
1178                    let mut result = hash_join_inner(
1179                        left,
1180                        right,
1181                        equi_join_info.left_col_idx,
1182                        equi_join_info.right_col_idx,
1183                    )?;
1184
1185                    // Apply remaining equijoins as post-join filters
1186                    let remaining_conditions: Vec<_> = additional_equijoins
1187                        .iter()
1188                        .enumerate()
1189                        .filter(|(i, _)| *i != idx)
1190                        .map(|(_, e)| e.clone())
1191                        .collect();
1192
1193                    if !remaining_conditions.is_empty() {
1194                        if let Some(filter_expr) = combine_with_and(remaining_conditions) {
1195                            result = apply_post_join_filter(
1196                                result,
1197                                &filter_expr,
1198                                database,
1199                                cte_results,
1200                            )?;
1201                        }
1202                    }
1203
1204                    return Ok(result);
1205                }
1206            }
1207
1208            // Try arithmetic equijoins for hash join (TPC-DS Q2 optimization)
1209            // For expressions like `col1 = col2 - 53` in WHERE clause
1210            for (idx, equijoin) in additional_equijoins.iter().enumerate() {
1211                if let Some(arith_info) = join_analyzer::analyze_arithmetic_equi_join(
1212                    equijoin,
1213                    &temp_schema,
1214                    left_col_count,
1215                ) {
1216                    // Found an arithmetic equijoin suitable for hash join!
1217                    let mut result = hash_join_inner_arithmetic(
1218                        left,
1219                        right,
1220                        arith_info.left_col_idx,
1221                        arith_info.right_col_idx,
1222                        arith_info.offset,
1223                    )?;
1224
1225                    // Apply remaining equijoins as post-join filters
1226                    let remaining_conditions: Vec<_> = additional_equijoins
1227                        .iter()
1228                        .enumerate()
1229                        .filter(|(i, _)| *i != idx)
1230                        .map(|(_, e)| e.clone())
1231                        .collect();
1232
1233                    if !remaining_conditions.is_empty() {
1234                        if let Some(filter_expr) = combine_with_and(remaining_conditions) {
1235                            result = apply_post_join_filter(
1236                                result,
1237                                &filter_expr,
1238                                database,
1239                                cte_results,
1240                            )?;
1241                        }
1242                    }
1243
1244                    return Ok(result);
1245                }
1246            }
1247        }
1248    }
1249
1250    // Prepare combined join condition including additional equijoins from WHERE clause
1251    let mut all_join_conditions = Vec::new();
1252    if let Some(cond) = condition {
1253        all_join_conditions.push(cond.clone());
1254    }
1255    all_join_conditions.extend_from_slice(additional_equijoins);
1256
1257    // Combine all join conditions with AND
1258    let combined_condition = combine_with_and(all_join_conditions);
1259
1260    // Fall back to nested loop join for all other cases
1261    // For NATURAL JOIN, we need to preserve the original schemas for duplicate removal
1262    let (left_schema_for_natural, right_schema_for_natural) = if natural {
1263        (Some(left.schema.clone()), Some(right.schema.clone()))
1264    } else {
1265        (None, None)
1266    };
1267
1268    let mut result = match join_type {
1269        vibesql_ast::JoinType::Inner => {
1270            nested_loop_inner_join(left, right, &combined_condition, database, timeout_ctx)
1271        }
1272        vibesql_ast::JoinType::LeftOuter => {
1273            nested_loop_left_outer_join(left, right, &combined_condition, database, timeout_ctx)
1274        }
1275        vibesql_ast::JoinType::RightOuter => {
1276            nested_loop_right_outer_join(left, right, &combined_condition, database, timeout_ctx)
1277        }
1278        vibesql_ast::JoinType::FullOuter => {
1279            nested_loop_full_outer_join(left, right, &combined_condition, database, timeout_ctx)
1280        }
1281        vibesql_ast::JoinType::Cross => {
1282            nested_loop_cross_join(left, right, &combined_condition, database, timeout_ctx)
1283        }
1284        vibesql_ast::JoinType::Semi => {
1285            nested_loop_semi_join(left, right, &combined_condition, database, timeout_ctx)
1286        }
1287        vibesql_ast::JoinType::Anti => {
1288            nested_loop_anti_join(left, right, &combined_condition, database, timeout_ctx)
1289        }
1290    }?;
1291
1292    // For NATURAL JOIN, remove duplicate columns from the result
1293    if natural {
1294        if let (Some(left_schema), Some(right_schema)) =
1295            (left_schema_for_natural, right_schema_for_natural)
1296        {
1297            result =
1298                remove_duplicate_columns_for_natural_join(result, &left_schema, &right_schema)?;
1299        }
1300    }
1301
1302    Ok(result)
1303}
1304
1305/// Remove duplicate columns for NATURAL JOIN
1306///
1307/// NATURAL JOIN should only include common columns once (from the left side).
1308/// This function identifies common columns and removes duplicates from the right side.
1309fn remove_duplicate_columns_for_natural_join(
1310    mut result: FromResult,
1311    left_schema: &CombinedSchema,
1312    right_schema: &CombinedSchema,
1313) -> Result<FromResult, ExecutorError> {
1314    use std::collections::{HashMap, HashSet};
1315
1316    // Find common column names (case-insensitive)
1317    let mut left_column_map: HashMap<String, Vec<(String, String, usize)>> = HashMap::new(); // lowercase -> [(table, actual_name, idx)]
1318    let mut col_idx = 0;
1319    for (table_name, (_table_idx, table_schema)) in &left_schema.table_schemas {
1320        for col in &table_schema.columns {
1321            let lowercase = col.name.to_lowercase();
1322            left_column_map.entry(lowercase).or_default().push((
1323                table_name.to_string(),
1324                col.name.clone(),
1325                col_idx,
1326            ));
1327            col_idx += 1;
1328        }
1329    }
1330
1331    // Identify which columns from the right side are duplicates
1332    let mut right_duplicate_indices: HashSet<usize> = HashSet::new();
1333    let left_col_count = col_idx;
1334    col_idx = 0;
1335    for (_table_idx, table_schema) in right_schema.table_schemas.values() {
1336        for col in &table_schema.columns {
1337            let lowercase = col.name.to_lowercase();
1338            if left_column_map.contains_key(&lowercase) {
1339                // This is a common column, mark it as a duplicate to remove
1340                right_duplicate_indices.insert(left_col_count + col_idx);
1341            }
1342            col_idx += 1;
1343        }
1344    }
1345
1346    // If no duplicates, return as-is
1347    if right_duplicate_indices.is_empty() {
1348        return Ok(result);
1349    }
1350
1351    // Project out the duplicate columns from the result
1352    let total_cols = left_col_count + col_idx;
1353    let keep_indices: Vec<usize> =
1354        (0..total_cols).filter(|i| !right_duplicate_indices.contains(i)).collect();
1355
1356    // Build new schema without duplicate columns
1357    let mut new_schema = CombinedSchema { table_schemas: HashMap::new(), total_columns: 0 };
1358    for (table_name, (table_start_idx, table_schema)) in &result.schema.table_schemas {
1359        let mut new_cols = Vec::new();
1360
1361        for (idx, col) in table_schema.columns.iter().enumerate() {
1362            // Calculate absolute column index manually
1363            let abs_col_idx = table_start_idx + idx;
1364
1365            if keep_indices.contains(&abs_col_idx) {
1366                new_cols.push(col.clone());
1367            }
1368        }
1369
1370        if !new_cols.is_empty() {
1371            let new_table_schema =
1372                vibesql_catalog::TableSchema::new(table_schema.name.clone(), new_cols);
1373            new_schema
1374                .table_schemas
1375                .insert(table_name.clone(), (new_schema.total_columns, new_table_schema.clone()));
1376            new_schema.total_columns += new_table_schema.columns.len();
1377        }
1378    }
1379
1380    // Project the rows - get mutable reference to rows to work with FromResult API
1381    let rows = result.rows();
1382    let new_rows: Vec<vibesql_storage::Row> = rows
1383        .iter()
1384        .map(|row| {
1385            let new_values: Vec<vibesql_types::SqlValue> =
1386                keep_indices.iter().filter_map(|&i| row.values.get(i).cloned()).collect();
1387            vibesql_storage::Row::new(new_values)
1388        })
1389        .collect();
1390
1391    Ok(FromResult::from_rows(new_schema, new_rows))
1392}