1use std::sync::Arc;
2
3use crate::{
4 errors::ExecutorError, evaluator::CombinedExpressionEvaluator, optimizer::combine_with_and,
5 schema::CombinedSchema, timeout::TimeoutContext,
6};
7use super::from_iterator::FromIterator;
8
9mod expression_mapper;
10pub(crate) mod hash_join;
11mod hash_join_iterator;
12mod hash_semi_join;
13mod hash_anti_join;
14mod join_analyzer;
15mod nested_loop;
16pub mod reorder;
17pub mod search;
18
19#[cfg(test)]
20mod tests;
21
22use hash_join::{hash_join_inner, hash_join_inner_arithmetic, hash_join_inner_multi, hash_join_left_outer};
25use hash_semi_join::{hash_semi_join, hash_semi_join_with_filter};
26use hash_anti_join::{hash_anti_join, hash_anti_join_with_filter};
27pub use hash_join_iterator::HashJoinIterator;
29use nested_loop::{
31 nested_loop_anti_join, nested_loop_cross_join, nested_loop_full_outer_join,
32 nested_loop_inner_join, nested_loop_left_outer_join, nested_loop_right_outer_join,
33 nested_loop_semi_join,
34};
35pub use reorder::JoinOrderAnalyzer;
36pub use search::JoinOrderSearch;
38
/// Row storage backing a FROM-clause result.
///
/// The three variants let a result own its rows, share them behind an `Arc`,
/// or defer producing them until they are requested.
pub(super) enum FromData {
    /// Rows held in an owned vector.
    Materialized(Vec<vibesql_storage::Row>),

    /// Rows held behind an `Arc`; `into_rows` copies them only when the
    /// `Arc` has other owners.
    SharedRows(Arc<Vec<vibesql_storage::Row>>),

    /// Rows produced by a `FromIterator`; `as_rows` collects them and
    /// switches the value to `Materialized` on first access.
    Iterator(FromIterator),
}
57
58impl FromData {
59 pub fn into_rows(self) -> Vec<vibesql_storage::Row> {
64 match self {
65 Self::Materialized(rows) => rows,
66 Self::SharedRows(arc) => Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()),
67 Self::Iterator(iter) => iter.collect_vec(),
68 }
69 }
70
71 pub fn as_rows(&mut self) -> &Vec<vibesql_storage::Row> {
75 if let Self::Iterator(iter) = self {
77 #[cfg(feature = "profile-q6")]
78 let materialize_start = std::time::Instant::now();
79
80 let rows = std::mem::replace(iter, FromIterator::from_vec(vec![])).collect_vec();
81 *self = Self::Materialized(rows);
82
83 #[cfg(feature = "profile-q6")]
84 {
85 let materialize_time = materialize_start.elapsed();
86 if let Self::Materialized(rows) = self {
87 eprintln!("[Q6 PROFILE] Row materialization (collect_vec): {:?} ({} rows, {:?}/row)",
88 materialize_time, rows.len(), materialize_time / rows.len() as u32);
89 }
90 }
91 }
92
93 match self {
95 Self::Materialized(ref rows) => rows,
96 Self::SharedRows(ref arc) => arc.as_ref(),
97 Self::Iterator(_) => unreachable!(),
98 }
99 }
100
101 pub fn as_slice(&self) -> &[vibesql_storage::Row] {
109 match self {
110 Self::Materialized(rows) => rows.as_slice(),
111 Self::SharedRows(arc) => arc.as_slice(),
112 Self::Iterator(iter) => iter.as_slice(),
113 }
114 }
115}
116
/// Result of evaluating a FROM clause: a schema plus the rows it describes.
pub(super) struct FromResult {
    /// Schema describing the columns of every row in `data`.
    pub(super) schema: CombinedSchema,
    /// The rows themselves, in one of the `FromData` representations.
    pub(super) data: FromData,
    /// Optional list of (name, direction) pairs.
    /// NOTE(review): presumably the columns the rows are already ordered by — confirm with callers.
    pub(super) sorted_by: Option<Vec<(String, vibesql_ast::OrderDirection)>>,
    /// Set to `true` only by `from_rows_where_filtered`.
    /// NOTE(review): presumably marks rows that already had the WHERE clause applied — confirm with callers.
    pub(super) where_filtered: bool,
}
131
132impl FromResult {
133 pub(super) fn from_rows(schema: CombinedSchema, rows: Vec<vibesql_storage::Row>) -> Self {
135 Self { schema, data: FromData::Materialized(rows), sorted_by: None, where_filtered: false }
136 }
137
138 pub(super) fn from_shared_rows(schema: CombinedSchema, rows: Arc<Vec<vibesql_storage::Row>>) -> Self {
143 Self { schema, data: FromData::SharedRows(rows), sorted_by: None, where_filtered: false }
144 }
145
146 pub(super) fn from_rows_sorted(
148 schema: CombinedSchema,
149 rows: Vec<vibesql_storage::Row>,
150 sorted_by: Vec<(String, vibesql_ast::OrderDirection)>,
151 ) -> Self {
152 Self { schema, data: FromData::Materialized(rows), sorted_by: Some(sorted_by), where_filtered: false }
153 }
154
155 pub(super) fn from_rows_where_filtered(
157 schema: CombinedSchema,
158 rows: Vec<vibesql_storage::Row>,
159 sorted_by: Option<Vec<(String, vibesql_ast::OrderDirection)>>,
160 ) -> Self {
161 Self { schema, data: FromData::Materialized(rows), sorted_by, where_filtered: true }
162 }
163
164 pub(super) fn from_iterator(schema: CombinedSchema, iterator: FromIterator) -> Self {
166 Self { schema, data: FromData::Iterator(iterator), sorted_by: None, where_filtered: false }
167 }
168
169 pub(super) fn into_rows(self) -> Vec<vibesql_storage::Row> {
171 self.data.into_rows()
172 }
173
174 #[allow(dead_code)]
179 pub(super) fn rows_mut(&mut self) -> &mut Vec<vibesql_storage::Row> {
180 match &mut self.data {
182 FromData::Iterator(iter) => {
183 let rows = std::mem::replace(iter, FromIterator::from_vec(vec![])).collect_vec();
184 self.data = FromData::Materialized(rows);
185 }
186 FromData::SharedRows(arc) => {
187 let rows = arc.as_ref().clone();
189 self.data = FromData::Materialized(rows);
190 }
191 FromData::Materialized(_) => {}
192 }
193
194 match &mut self.data {
196 FromData::Materialized(rows) => rows,
197 FromData::SharedRows(_) | FromData::Iterator(_) => unreachable!(),
198 }
199 }
200
201 pub(super) fn rows(&mut self) -> &Vec<vibesql_storage::Row> {
203 self.data.as_rows()
204 }
205
206 pub(super) fn as_slice(&self) -> &[vibesql_storage::Row] {
215 self.data.as_slice()
216 }
217}
218
219#[inline]
222fn combine_rows(left_row: &vibesql_storage::Row, right_row: &vibesql_storage::Row) -> vibesql_storage::Row {
223 let mut combined_values = Vec::with_capacity(left_row.values.len() + right_row.values.len());
224 combined_values.extend_from_slice(&left_row.values);
225 combined_values.extend_from_slice(&right_row.values);
226 vibesql_storage::Row::new(combined_values)
227}
228
229fn apply_post_join_filter(
234 result: FromResult,
235 filter_expr: &vibesql_ast::Expression,
236 database: &vibesql_storage::Database,
237) -> Result<FromResult, ExecutorError> {
238 let schema = result.schema.clone();
240 let evaluator = CombinedExpressionEvaluator::with_database(&schema, database);
241
242 let mut filtered_rows = Vec::new();
244 for row in result.into_rows() {
245 match evaluator.eval(filter_expr, &row)? {
246 vibesql_types::SqlValue::Boolean(true) => filtered_rows.push(row),
247 vibesql_types::SqlValue::Boolean(false) => {} vibesql_types::SqlValue::Null => {} vibesql_types::SqlValue::Integer(0) => {} vibesql_types::SqlValue::Integer(_) => filtered_rows.push(row),
252 vibesql_types::SqlValue::Smallint(0) => {} vibesql_types::SqlValue::Smallint(_) => filtered_rows.push(row),
254 vibesql_types::SqlValue::Bigint(0) => {} vibesql_types::SqlValue::Bigint(_) => filtered_rows.push(row),
256 vibesql_types::SqlValue::Float(0.0) => {} vibesql_types::SqlValue::Float(_) => filtered_rows.push(row),
258 vibesql_types::SqlValue::Real(0.0) => {} vibesql_types::SqlValue::Real(_) => filtered_rows.push(row),
260 vibesql_types::SqlValue::Double(0.0) => {} vibesql_types::SqlValue::Double(_) => filtered_rows.push(row),
262 other => {
263 return Err(ExecutorError::InvalidWhereClause(format!(
264 "Filter expression must evaluate to boolean, got: {:?}",
265 other
266 )))
267 }
268 }
269 }
270
271 Ok(FromResult::from_rows(schema, filtered_rows))
272}
273
274#[allow(clippy::too_many_arguments)]
283pub(super) fn nested_loop_join(
284 left: FromResult,
285 right: FromResult,
286 join_type: &vibesql_ast::JoinType,
287 condition: &Option<vibesql_ast::Expression>,
288 natural: bool,
289 database: &vibesql_storage::Database,
290 additional_equijoins: &[vibesql_ast::Expression],
291 timeout_ctx: &TimeoutContext,
292) -> Result<FromResult, ExecutorError> {
293 if let vibesql_ast::JoinType::Inner = join_type {
295 let left_col_count: usize =
299 left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
300
301 let right_table_name = right
302 .schema
303 .table_schemas
304 .keys()
305 .next()
306 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
307 .clone();
308
309 let right_schema = right
310 .schema
311 .table_schemas
312 .get(&right_table_name)
313 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
314 .1
315 .clone();
316
317 let right_table_name_for_natural = right_table_name.clone();
319
320 let temp_schema =
321 CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
322
323 if let Some(cond) = condition {
326 if let Some(multi_result) =
328 join_analyzer::analyze_multi_column_equi_join(cond, &temp_schema, left_col_count)
329 {
330 if multi_result.equi_joins.left_col_indices.len() >= 2 {
332 let (left_schema_for_natural, right_schema_for_natural) = if natural {
334 (Some(left.schema.clone()), Some(right.schema.clone()))
335 } else {
336 (None, None)
337 };
338
339 let mut result = hash_join_inner_multi(
340 left,
341 right,
342 &multi_result.equi_joins.left_col_indices,
343 &multi_result.equi_joins.right_col_indices,
344 )?;
345
346 if !multi_result.remaining_conditions.is_empty() {
348 if let Some(filter_expr) = combine_with_and(multi_result.remaining_conditions) {
349 result = apply_post_join_filter(result, &filter_expr, database)?;
350 }
351 }
352
353 if natural {
355 if let (Some(left_schema), Some(right_schema_orig)) =
356 (left_schema_for_natural, right_schema_for_natural)
357 {
358 let right_schema_for_removal = CombinedSchema {
359 table_schemas: vec![(
360 right_table_name_for_natural.clone(),
361 (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
362 )]
363 .into_iter()
364 .collect(),
365 total_columns: right_schema_orig.total_columns,
366 };
367 result = remove_duplicate_columns_for_natural_join(
368 result,
369 &left_schema,
370 &right_schema_for_removal,
371 )?;
372 }
373 }
374
375 return Ok(result);
376 }
377
378 let (left_schema_for_natural, right_schema_for_natural) = if natural {
381 (Some(left.schema.clone()), Some(right.schema.clone()))
382 } else {
383 (None, None)
384 };
385
386 let mut result = hash_join_inner(
387 left,
388 right,
389 multi_result.equi_joins.left_col_indices[0],
390 multi_result.equi_joins.right_col_indices[0],
391 )?;
392
393 if !multi_result.remaining_conditions.is_empty() {
395 if let Some(filter_expr) = combine_with_and(multi_result.remaining_conditions) {
396 result = apply_post_join_filter(result, &filter_expr, database)?;
397 }
398 }
399
400 if natural {
402 if let (Some(left_schema), Some(right_schema_orig)) =
403 (left_schema_for_natural, right_schema_for_natural)
404 {
405 let right_schema_for_removal = CombinedSchema {
406 table_schemas: vec![(
407 right_table_name_for_natural.clone(),
408 (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
409 )]
410 .into_iter()
411 .collect(),
412 total_columns: right_schema_orig.total_columns,
413 };
414 result = remove_duplicate_columns_for_natural_join(
415 result,
416 &left_schema,
417 &right_schema_for_removal,
418 )?;
419 }
420 }
421
422 return Ok(result);
423 }
424 }
425
426 if let Some(cond) = condition {
430 if let Some(or_result) =
431 join_analyzer::analyze_or_equi_join(cond, &temp_schema, left_col_count)
432 {
433 let (left_schema_for_natural, right_schema_for_natural) = if natural {
435 (Some(left.schema.clone()), Some(right.schema.clone()))
436 } else {
437 (None, None)
438 };
439
440 let mut result = hash_join_inner(
441 left,
442 right,
443 or_result.equi_join.left_col_idx,
444 or_result.equi_join.right_col_idx,
445 )?;
446
447 if !or_result.remaining_conditions.is_empty() {
449 if let Some(filter_expr) = combine_with_and(or_result.remaining_conditions) {
450 result = apply_post_join_filter(result, &filter_expr, database)?;
451 }
452 }
453
454 if natural {
456 if let (Some(left_schema), Some(right_schema_orig)) =
457 (left_schema_for_natural, right_schema_for_natural)
458 {
459 let right_schema_for_removal = CombinedSchema {
460 table_schemas: vec![(
461 right_table_name_for_natural.clone(),
462 (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
463 )]
464 .into_iter()
465 .collect(),
466 total_columns: right_schema_orig.total_columns,
467 };
468 result = remove_duplicate_columns_for_natural_join(
469 result,
470 &left_schema,
471 &right_schema_for_removal,
472 )?;
473 }
474 }
475
476 return Ok(result);
477 }
478 }
479
480 if let Some(cond) = condition {
483 if let Some(arith_info) =
484 join_analyzer::analyze_arithmetic_equi_join(cond, &temp_schema, left_col_count)
485 {
486 let (left_schema_for_natural, right_schema_for_natural) = if natural {
488 (Some(left.schema.clone()), Some(right.schema.clone()))
489 } else {
490 (None, None)
491 };
492
493 let mut result = hash_join_inner_arithmetic(
494 left,
495 right,
496 arith_info.left_col_idx,
497 arith_info.right_col_idx,
498 arith_info.offset,
499 )?;
500
501 if natural {
503 if let (Some(left_schema), Some(right_schema_orig)) =
504 (left_schema_for_natural, right_schema_for_natural)
505 {
506 let right_schema_for_removal = CombinedSchema {
507 table_schemas: vec![(
508 right_table_name_for_natural.clone(),
509 (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
510 )]
511 .into_iter()
512 .collect(),
513 total_columns: right_schema_orig.total_columns,
514 };
515 result = remove_duplicate_columns_for_natural_join(
516 result,
517 &left_schema,
518 &right_schema_for_removal,
519 )?;
520 }
521 }
522
523 return Ok(result);
524 }
525 }
526
527 for (idx, equijoin) in additional_equijoins.iter().enumerate() {
530 if let Some(equi_join_info) =
531 join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
532 {
533 let (left_schema_for_natural, right_schema_for_natural) = if natural {
535 (Some(left.schema.clone()), Some(right.schema.clone()))
536 } else {
537 (None, None)
538 };
539
540 let mut result = hash_join_inner(
542 left,
543 right,
544 equi_join_info.left_col_idx,
545 equi_join_info.right_col_idx,
546 )?;
547
548 let remaining_conditions: Vec<_> = additional_equijoins
550 .iter()
551 .enumerate()
552 .filter(|(i, _)| *i != idx)
553 .map(|(_, e)| e.clone())
554 .collect();
555
556 if !remaining_conditions.is_empty() {
557 if let Some(filter_expr) = combine_with_and(remaining_conditions) {
558 result = apply_post_join_filter(result, &filter_expr, database)?;
559 }
560 }
561
562 if natural {
564 if let (Some(left_schema), Some(right_schema_orig)) =
565 (left_schema_for_natural, right_schema_for_natural)
566 {
567 let right_schema_for_removal = CombinedSchema {
568 table_schemas: vec![(
569 right_table_name_for_natural.clone(),
570 (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
571 )]
572 .into_iter()
573 .collect(),
574 total_columns: right_schema_orig.total_columns,
575 };
576 result = remove_duplicate_columns_for_natural_join(
577 result,
578 &left_schema,
579 &right_schema_for_removal,
580 )?;
581 }
582 }
583
584 return Ok(result);
585 }
586 }
587
588 for (idx, equijoin) in additional_equijoins.iter().enumerate() {
592 if let Some(arith_info) =
593 join_analyzer::analyze_arithmetic_equi_join(equijoin, &temp_schema, left_col_count)
594 {
595 let (left_schema_for_natural, right_schema_for_natural) = if natural {
597 (Some(left.schema.clone()), Some(right.schema.clone()))
598 } else {
599 (None, None)
600 };
601
602 let mut result = hash_join_inner_arithmetic(
604 left,
605 right,
606 arith_info.left_col_idx,
607 arith_info.right_col_idx,
608 arith_info.offset,
609 )?;
610
611 let remaining_conditions: Vec<_> = additional_equijoins
613 .iter()
614 .enumerate()
615 .filter(|(i, _)| *i != idx)
616 .map(|(_, e)| e.clone())
617 .collect();
618
619 if !remaining_conditions.is_empty() {
620 if let Some(filter_expr) = combine_with_and(remaining_conditions) {
621 result = apply_post_join_filter(result, &filter_expr, database)?;
622 }
623 }
624
625 if natural {
627 if let (Some(left_schema), Some(right_schema_orig)) =
628 (left_schema_for_natural, right_schema_for_natural)
629 {
630 let right_schema_for_removal = CombinedSchema {
631 table_schemas: vec![(
632 right_table_name_for_natural.clone(),
633 (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
634 )]
635 .into_iter()
636 .collect(),
637 total_columns: right_schema_orig.total_columns,
638 };
639 result = remove_duplicate_columns_for_natural_join(
640 result,
641 &left_schema,
642 &right_schema_for_removal,
643 )?;
644 }
645 }
646
647 return Ok(result);
648 }
649 }
650 }
651
652 if let vibesql_ast::JoinType::LeftOuter = join_type {
655 let left_col_count: usize =
657 left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
658
659 let right_table_name = right
660 .schema
661 .table_schemas
662 .keys()
663 .next()
664 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
665 .clone();
666
667 let right_schema = right
668 .schema
669 .table_schemas
670 .get(&right_table_name)
671 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
672 .1
673 .clone();
674
675 let right_table_name_for_natural = right_table_name.clone();
677
678 let temp_schema =
679 CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
680
681 if let Some(cond) = condition {
683 if let Some(compound_result) =
684 join_analyzer::analyze_compound_equi_join(cond, &temp_schema, left_col_count)
685 {
686 let (left_schema_for_natural, right_schema_for_natural) = if natural {
688 (Some(left.schema.clone()), Some(right.schema.clone()))
689 } else {
690 (None, None)
691 };
692
693 let mut result = hash_join_left_outer(
694 left,
695 right,
696 compound_result.equi_join.left_col_idx,
697 compound_result.equi_join.right_col_idx,
698 )?;
699
700 if !compound_result.remaining_conditions.is_empty() {
702 if let Some(filter_expr) = combine_with_and(compound_result.remaining_conditions) {
703 result = apply_post_join_filter(result, &filter_expr, database)?;
704 }
705 }
706
707 if natural {
709 if let (Some(left_schema), Some(right_schema_orig)) =
710 (left_schema_for_natural, right_schema_for_natural)
711 {
712 let right_schema_for_removal = CombinedSchema {
713 table_schemas: vec![(
714 right_table_name_for_natural.clone(),
715 (0, right_schema_orig.table_schemas.values().next().unwrap().1.clone()),
716 )]
717 .into_iter()
718 .collect(),
719 total_columns: right_schema_orig.total_columns,
720 };
721 result = remove_duplicate_columns_for_natural_join(
722 result,
723 &left_schema,
724 &right_schema_for_removal,
725 )?;
726 }
727 }
728
729 return Ok(result);
730 }
731 }
732 }
733
734 if matches!(join_type, vibesql_ast::JoinType::Semi | vibesql_ast::JoinType::Anti) {
736 let left_col_count: usize =
738 left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
739
740 let right_table_name = right
741 .schema
742 .table_schemas
743 .keys()
744 .next()
745 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
746 .clone();
747
748 let right_schema = right
749 .schema
750 .table_schemas
751 .get(&right_table_name)
752 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
753 .1
754 .clone();
755
756 let temp_schema =
757 CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
758
759 if let Some(cond) = condition {
764 if let Some(compound_result) =
765 join_analyzer::analyze_compound_equi_join(cond, &temp_schema, left_col_count)
766 {
767 let remaining_filter = combine_with_and(compound_result.remaining_conditions);
769
770 let result = if matches!(join_type, vibesql_ast::JoinType::Semi) {
771 hash_semi_join_with_filter(
772 left,
773 right,
774 compound_result.equi_join.left_col_idx,
775 compound_result.equi_join.right_col_idx,
776 remaining_filter.as_ref(),
777 &temp_schema,
778 database,
779 )?
780 } else {
781 hash_anti_join_with_filter(
782 left,
783 right,
784 compound_result.equi_join.left_col_idx,
785 compound_result.equi_join.right_col_idx,
786 remaining_filter.as_ref(),
787 &temp_schema,
788 database,
789 )?
790 };
791
792 return Ok(result);
793 }
794 }
795
796 for equijoin in additional_equijoins.iter() {
798 if let Some(equi_join_info) =
799 join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
800 {
801 let result = if matches!(join_type, vibesql_ast::JoinType::Semi) {
802 hash_semi_join(
803 left,
804 right,
805 equi_join_info.left_col_idx,
806 equi_join_info.right_col_idx,
807 )?
808 } else {
809 hash_anti_join(
810 left,
811 right,
812 equi_join_info.left_col_idx,
813 equi_join_info.right_col_idx,
814 )?
815 };
816
817 return Ok(result);
818 }
819 }
820 }
821
822 if let vibesql_ast::JoinType::Cross = join_type {
826 if !additional_equijoins.is_empty() {
827 let left_col_count: usize =
829 left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
830
831 let right_table_name = right
832 .schema
833 .table_schemas
834 .keys()
835 .next()
836 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
837 .clone();
838
839 let right_schema = right
840 .schema
841 .table_schemas
842 .get(&right_table_name)
843 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
844 .1
845 .clone();
846
847 let temp_schema =
848 CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
849
850 for (idx, equijoin) in additional_equijoins.iter().enumerate() {
852 if let Some(equi_join_info) =
853 join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
854 {
855 let mut result = hash_join_inner(
858 left,
859 right,
860 equi_join_info.left_col_idx,
861 equi_join_info.right_col_idx,
862 )?;
863
864 let remaining_conditions: Vec<_> = additional_equijoins
866 .iter()
867 .enumerate()
868 .filter(|(i, _)| *i != idx)
869 .map(|(_, e)| e.clone())
870 .collect();
871
872 if !remaining_conditions.is_empty() {
873 if let Some(filter_expr) = combine_with_and(remaining_conditions) {
874 result = apply_post_join_filter(result, &filter_expr, database)?;
875 }
876 }
877
878 return Ok(result);
879 }
880 }
881
882 for (idx, equijoin) in additional_equijoins.iter().enumerate() {
885 if let Some(arith_info) =
886 join_analyzer::analyze_arithmetic_equi_join(equijoin, &temp_schema, left_col_count)
887 {
888 let mut result = hash_join_inner_arithmetic(
890 left,
891 right,
892 arith_info.left_col_idx,
893 arith_info.right_col_idx,
894 arith_info.offset,
895 )?;
896
897 let remaining_conditions: Vec<_> = additional_equijoins
899 .iter()
900 .enumerate()
901 .filter(|(i, _)| *i != idx)
902 .map(|(_, e)| e.clone())
903 .collect();
904
905 if !remaining_conditions.is_empty() {
906 if let Some(filter_expr) = combine_with_and(remaining_conditions) {
907 result = apply_post_join_filter(result, &filter_expr, database)?;
908 }
909 }
910
911 return Ok(result);
912 }
913 }
914 }
915 }
916
917 let mut all_join_conditions = Vec::new();
919 if let Some(cond) = condition {
920 all_join_conditions.push(cond.clone());
921 }
922 all_join_conditions.extend_from_slice(additional_equijoins);
923
924 let combined_condition = combine_with_and(all_join_conditions);
926
927 let (left_schema_for_natural, right_schema_for_natural) = if natural {
930 (Some(left.schema.clone()), Some(right.schema.clone()))
931 } else {
932 (None, None)
933 };
934
935 let mut result = match join_type {
936 vibesql_ast::JoinType::Inner => nested_loop_inner_join(left, right, &combined_condition, database, timeout_ctx),
937 vibesql_ast::JoinType::LeftOuter => {
938 nested_loop_left_outer_join(left, right, &combined_condition, database, timeout_ctx)
939 }
940 vibesql_ast::JoinType::RightOuter => {
941 nested_loop_right_outer_join(left, right, &combined_condition, database, timeout_ctx)
942 }
943 vibesql_ast::JoinType::FullOuter => {
944 nested_loop_full_outer_join(left, right, &combined_condition, database, timeout_ctx)
945 }
946 vibesql_ast::JoinType::Cross => nested_loop_cross_join(left, right, &combined_condition, database, timeout_ctx),
947 vibesql_ast::JoinType::Semi => nested_loop_semi_join(left, right, &combined_condition, database, timeout_ctx),
948 vibesql_ast::JoinType::Anti => nested_loop_anti_join(left, right, &combined_condition, database, timeout_ctx),
949 }?;
950
951 if natural {
953 if let (Some(left_schema), Some(right_schema)) = (left_schema_for_natural, right_schema_for_natural) {
954 result = remove_duplicate_columns_for_natural_join(result, &left_schema, &right_schema)?;
955 }
956 }
957
958 Ok(result)
959}
960
961fn remove_duplicate_columns_for_natural_join(
966 mut result: FromResult,
967 left_schema: &CombinedSchema,
968 right_schema: &CombinedSchema,
969) -> Result<FromResult, ExecutorError> {
970 use std::collections::{HashMap, HashSet};
971
972 let mut left_column_map: HashMap<String, Vec<(String, String, usize)>> = HashMap::new(); let mut col_idx = 0;
975 for (table_name, (_table_idx, table_schema)) in &left_schema.table_schemas {
976 for col in &table_schema.columns {
977 let lowercase = col.name.to_lowercase();
978 left_column_map
979 .entry(lowercase)
980 .or_default()
981 .push((table_name.clone(), col.name.clone(), col_idx));
982 col_idx += 1;
983 }
984 }
985
986 let mut right_duplicate_indices: HashSet<usize> = HashSet::new();
988 let left_col_count = col_idx;
989 col_idx = 0;
990 for (_table_idx, table_schema) in right_schema.table_schemas.values() {
991 for col in &table_schema.columns {
992 let lowercase = col.name.to_lowercase();
993 if left_column_map.contains_key(&lowercase) {
994 right_duplicate_indices.insert(left_col_count + col_idx);
996 }
997 col_idx += 1;
998 }
999 }
1000
1001 if right_duplicate_indices.is_empty() {
1003 return Ok(result);
1004 }
1005
1006 let total_cols = left_col_count + col_idx;
1008 let keep_indices: Vec<usize> = (0..total_cols)
1009 .filter(|i| !right_duplicate_indices.contains(i))
1010 .collect();
1011
1012 let mut new_schema = CombinedSchema { table_schemas: HashMap::new(), total_columns: 0 };
1014 for (table_name, (table_start_idx, table_schema)) in &result.schema.table_schemas {
1015 let mut new_cols = Vec::new();
1016
1017 for (idx, col) in table_schema.columns.iter().enumerate() {
1018 let abs_col_idx = table_start_idx + idx;
1020
1021 if keep_indices.contains(&abs_col_idx) {
1022 new_cols.push(col.clone());
1023 }
1024 }
1025
1026 if !new_cols.is_empty() {
1027 let new_table_schema = vibesql_catalog::TableSchema::new(
1028 table_schema.name.clone(),
1029 new_cols,
1030 );
1031 new_schema.table_schemas.insert(
1032 table_name.clone(),
1033 (new_schema.total_columns, new_table_schema.clone()),
1034 );
1035 new_schema.total_columns += new_table_schema.columns.len();
1036 }
1037 }
1038
1039 let rows = result.rows();
1041 let new_rows: Vec<vibesql_storage::Row> = rows
1042 .iter()
1043 .map(|row| {
1044 let new_values: Vec<vibesql_types::SqlValue> = keep_indices
1045 .iter()
1046 .filter_map(|&i| row.values.get(i).cloned())
1047 .collect();
1048 vibesql_storage::Row::new(new_values)
1049 })
1050 .collect();
1051
1052 Ok(FromResult::from_rows(new_schema, new_rows))
1053}