1use std::collections::HashMap;
2use std::sync::Arc;
3
4use super::{cte::CteResult, from_iterator::FromIterator};
5use crate::{
6 errors::ExecutorError, evaluator::CombinedExpressionEvaluator, optimizer::combine_with_and,
7 schema::CombinedSchema, timeout::TimeoutContext,
8};
9
10mod expression_mapper;
11mod hash_anti_join;
12pub(crate) mod hash_join;
13mod hash_join_iterator;
14mod hash_semi_join;
15mod join_analyzer;
16mod nested_loop;
17pub mod reorder;
18pub mod search;
19
20#[cfg(test)]
21mod tests;
22
23use hash_anti_join::{hash_anti_join, hash_anti_join_with_filter};
26use hash_join::{
27 hash_join_inner, hash_join_inner_arithmetic, hash_join_inner_multi, hash_join_left_outer,
28};
29use hash_semi_join::{hash_semi_join, hash_semi_join_with_filter};
30pub use hash_join_iterator::HashJoinIterator;
32use nested_loop::{
34 nested_loop_anti_join, nested_loop_cross_join, nested_loop_full_outer_join,
35 nested_loop_inner_join, nested_loop_left_outer_join, nested_loop_right_outer_join,
36 nested_loop_semi_join,
37};
38pub use reorder::JoinOrderAnalyzer;
39pub use search::JoinOrderSearch;
41
/// Row storage behind a [`FromResult`].
///
/// The three variants differ only in how the rows are held; `into_rows`, `as_rows` and
/// `as_slice` give uniform access to them.
pub(super) enum FromData {
    /// Rows held in an owned vector.
    Materialized(Vec<vibesql_storage::Row>),

    /// Rows held behind an `Arc`; `into_rows` clones the vector only if other handles exist.
    SharedRows(Arc<Vec<vibesql_storage::Row>>),

    /// Rows still represented by a [`FromIterator`]; `as_rows` collects them on first access.
    Iterator(FromIterator),
}
60
61impl FromData {
62 pub fn into_rows(self) -> Vec<vibesql_storage::Row> {
67 match self {
68 Self::Materialized(rows) => rows,
69 Self::SharedRows(arc) => Arc::try_unwrap(arc).unwrap_or_else(|arc| (*arc).clone()),
70 Self::Iterator(iter) => iter.collect_vec(),
71 }
72 }
73
74 pub fn as_rows(&mut self) -> &Vec<vibesql_storage::Row> {
78 if let Self::Iterator(iter) = self {
80 #[cfg(feature = "profile-q6")]
81 let materialize_start = std::time::Instant::now();
82
83 let rows = std::mem::replace(iter, FromIterator::from_vec(vec![])).collect_vec();
84 *self = Self::Materialized(rows);
85
86 #[cfg(feature = "profile-q6")]
87 {
88 let materialize_time = materialize_start.elapsed();
89 if let Self::Materialized(rows) = self {
90 eprintln!(
91 "[Q6 PROFILE] Row materialization (collect_vec): {:?} ({} rows, {:?}/row)",
92 materialize_time,
93 rows.len(),
94 materialize_time / rows.len() as u32
95 );
96 }
97 }
98 }
99
100 match self {
102 Self::Materialized(ref rows) => rows,
103 Self::SharedRows(ref arc) => arc.as_ref(),
104 Self::Iterator(_) => unreachable!(),
105 }
106 }
107
108 pub fn as_slice(&self) -> &[vibesql_storage::Row] {
116 match self {
117 Self::Materialized(rows) => rows.as_slice(),
118 Self::SharedRows(arc) => arc.as_slice(),
119 Self::Iterator(iter) => iter.as_slice(),
120 }
121 }
122}
123
/// A schema together with its rows; the input and output type of the join functions in this
/// module.
pub(super) struct FromResult {
    /// Schema describing the columns of `data`.
    pub(super) schema: CombinedSchema,
    /// The rows themselves (owned, shared, or not yet collected).
    pub(super) data: FromData,
    /// Column/direction pairs supplied through `from_rows_sorted` or
    /// `from_rows_where_filtered`; presumably the order the rows are already sorted in —
    /// TODO confirm against callers.
    pub(super) sorted_by: Option<Vec<(String, vibesql_ast::OrderDirection)>>,
    /// `true` only when built through `from_rows_where_filtered`; presumably signals that the
    /// WHERE clause has already been applied to these rows — TODO confirm against callers.
    pub(super) where_filtered: bool,
}
138
139impl FromResult {
140 pub(super) fn from_rows(schema: CombinedSchema, rows: Vec<vibesql_storage::Row>) -> Self {
142 Self { schema, data: FromData::Materialized(rows), sorted_by: None, where_filtered: false }
143 }
144
145 pub(super) fn from_shared_rows(
150 schema: CombinedSchema,
151 rows: Arc<Vec<vibesql_storage::Row>>,
152 ) -> Self {
153 Self { schema, data: FromData::SharedRows(rows), sorted_by: None, where_filtered: false }
154 }
155
156 pub(super) fn from_rows_sorted(
158 schema: CombinedSchema,
159 rows: Vec<vibesql_storage::Row>,
160 sorted_by: Vec<(String, vibesql_ast::OrderDirection)>,
161 ) -> Self {
162 Self {
163 schema,
164 data: FromData::Materialized(rows),
165 sorted_by: Some(sorted_by),
166 where_filtered: false,
167 }
168 }
169
170 pub(super) fn from_rows_where_filtered(
172 schema: CombinedSchema,
173 rows: Vec<vibesql_storage::Row>,
174 sorted_by: Option<Vec<(String, vibesql_ast::OrderDirection)>>,
175 ) -> Self {
176 Self { schema, data: FromData::Materialized(rows), sorted_by, where_filtered: true }
177 }
178
179 pub(super) fn from_iterator(schema: CombinedSchema, iterator: FromIterator) -> Self {
181 Self { schema, data: FromData::Iterator(iterator), sorted_by: None, where_filtered: false }
182 }
183
184 pub(super) fn into_rows(self) -> Vec<vibesql_storage::Row> {
186 self.data.into_rows()
187 }
188
189 #[allow(dead_code)]
194 pub(super) fn rows_mut(&mut self) -> &mut Vec<vibesql_storage::Row> {
195 match &mut self.data {
197 FromData::Iterator(iter) => {
198 let rows = std::mem::replace(iter, FromIterator::from_vec(vec![])).collect_vec();
199 self.data = FromData::Materialized(rows);
200 }
201 FromData::SharedRows(arc) => {
202 let rows = arc.as_ref().clone();
204 self.data = FromData::Materialized(rows);
205 }
206 FromData::Materialized(_) => {}
207 }
208
209 match &mut self.data {
211 FromData::Materialized(rows) => rows,
212 FromData::SharedRows(_) | FromData::Iterator(_) => unreachable!(),
213 }
214 }
215
216 pub(super) fn rows(&mut self) -> &Vec<vibesql_storage::Row> {
218 self.data.as_rows()
219 }
220
221 pub(super) fn as_slice(&self) -> &[vibesql_storage::Row] {
230 self.data.as_slice()
231 }
232}
233
234#[inline]
237fn combine_rows(
238 left_row: &vibesql_storage::Row,
239 right_row: &vibesql_storage::Row,
240) -> vibesql_storage::Row {
241 let mut combined_values = Vec::with_capacity(left_row.values.len() + right_row.values.len());
242 combined_values.extend_from_slice(&left_row.values);
243 combined_values.extend_from_slice(&right_row.values);
244 vibesql_storage::Row::new(combined_values)
245}
246
247fn apply_post_join_filter(
255 result: FromResult,
256 filter_expr: &vibesql_ast::Expression,
257 database: &vibesql_storage::Database,
258 cte_results: &HashMap<String, CteResult>,
259) -> Result<FromResult, ExecutorError> {
260 let schema = result.schema.clone();
262 let evaluator = if cte_results.is_empty() {
264 CombinedExpressionEvaluator::with_database(&schema, database)
265 } else {
266 CombinedExpressionEvaluator::with_database_and_cte(&schema, database, cte_results)
267 };
268
269 let mut filtered_rows = Vec::new();
271 for row in result.into_rows() {
272 match evaluator.eval(filter_expr, &row)? {
273 vibesql_types::SqlValue::Boolean(true) => filtered_rows.push(row),
274 vibesql_types::SqlValue::Boolean(false) => {} vibesql_types::SqlValue::Null => {} vibesql_types::SqlValue::Integer(0) => {} vibesql_types::SqlValue::Integer(_) => filtered_rows.push(row),
279 vibesql_types::SqlValue::Smallint(0) => {} vibesql_types::SqlValue::Smallint(_) => filtered_rows.push(row),
281 vibesql_types::SqlValue::Bigint(0) => {} vibesql_types::SqlValue::Bigint(_) => filtered_rows.push(row),
283 vibesql_types::SqlValue::Float(0.0) => {} vibesql_types::SqlValue::Float(_) => filtered_rows.push(row),
285 vibesql_types::SqlValue::Real(0.0) => {} vibesql_types::SqlValue::Real(_) => filtered_rows.push(row),
287 vibesql_types::SqlValue::Double(0.0) => {} vibesql_types::SqlValue::Double(_) => filtered_rows.push(row),
289 other => {
290 return Err(ExecutorError::InvalidWhereClause(format!(
291 "Filter expression must evaluate to boolean, got: {:?}",
292 other
293 )))
294 }
295 }
296 }
297
298 Ok(FromResult::from_rows(schema, filtered_rows))
299}
300
301#[allow(clippy::too_many_arguments)]
313pub(super) fn nested_loop_join(
314 left: FromResult,
315 right: FromResult,
316 join_type: &vibesql_ast::JoinType,
317 condition: &Option<vibesql_ast::Expression>,
318 natural: bool,
319 database: &vibesql_storage::Database,
320 additional_equijoins: &[vibesql_ast::Expression],
321 timeout_ctx: &TimeoutContext,
322 cte_results: &HashMap<String, CteResult>,
323) -> Result<FromResult, ExecutorError> {
324 if let vibesql_ast::JoinType::Inner = join_type {
326 let left_col_count: usize =
330 left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
331
332 let right_table_name = right
333 .schema
334 .table_schemas
335 .keys()
336 .next()
337 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
338 .clone();
339
340 let right_schema = right
341 .schema
342 .table_schemas
343 .get(&right_table_name)
344 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
345 .1
346 .clone();
347
348 let right_table_name_for_natural = right_table_name.clone();
350
351 let temp_schema =
352 CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
353
354 if let Some(cond) = condition {
357 if let Some(multi_result) =
359 join_analyzer::analyze_multi_column_equi_join(cond, &temp_schema, left_col_count)
360 {
361 if multi_result.equi_joins.left_col_indices.len() >= 2 {
363 let (left_schema_for_natural, right_schema_for_natural) = if natural {
365 (Some(left.schema.clone()), Some(right.schema.clone()))
366 } else {
367 (None, None)
368 };
369
370 let mut result = hash_join_inner_multi(
371 left,
372 right,
373 &multi_result.equi_joins.left_col_indices,
374 &multi_result.equi_joins.right_col_indices,
375 )?;
376
377 if !multi_result.remaining_conditions.is_empty() {
379 if let Some(filter_expr) =
380 combine_with_and(multi_result.remaining_conditions)
381 {
382 result = apply_post_join_filter(
383 result,
384 &filter_expr,
385 database,
386 cte_results,
387 )?;
388 }
389 }
390
391 if natural {
393 if let (Some(left_schema), Some(right_schema_orig)) =
394 (left_schema_for_natural, right_schema_for_natural)
395 {
396 let right_schema_for_removal = CombinedSchema {
397 table_schemas: vec![(
398 right_table_name_for_natural.clone(),
399 (
400 0,
401 right_schema_orig
402 .table_schemas
403 .values()
404 .next()
405 .unwrap()
406 .1
407 .clone(),
408 ),
409 )]
410 .into_iter()
411 .collect(),
412 total_columns: right_schema_orig.total_columns,
413 };
414 result = remove_duplicate_columns_for_natural_join(
415 result,
416 &left_schema,
417 &right_schema_for_removal,
418 )?;
419 }
420 }
421
422 return Ok(result);
423 }
424
425 let (left_schema_for_natural, right_schema_for_natural) = if natural {
428 (Some(left.schema.clone()), Some(right.schema.clone()))
429 } else {
430 (None, None)
431 };
432
433 let mut result = hash_join_inner(
434 left,
435 right,
436 multi_result.equi_joins.left_col_indices[0],
437 multi_result.equi_joins.right_col_indices[0],
438 )?;
439
440 if !multi_result.remaining_conditions.is_empty() {
442 if let Some(filter_expr) = combine_with_and(multi_result.remaining_conditions) {
443 result =
444 apply_post_join_filter(result, &filter_expr, database, cte_results)?;
445 }
446 }
447
448 if natural {
450 if let (Some(left_schema), Some(right_schema_orig)) =
451 (left_schema_for_natural, right_schema_for_natural)
452 {
453 let right_schema_for_removal = CombinedSchema {
454 table_schemas: vec![(
455 right_table_name_for_natural.clone(),
456 (
457 0,
458 right_schema_orig
459 .table_schemas
460 .values()
461 .next()
462 .unwrap()
463 .1
464 .clone(),
465 ),
466 )]
467 .into_iter()
468 .collect(),
469 total_columns: right_schema_orig.total_columns,
470 };
471 result = remove_duplicate_columns_for_natural_join(
472 result,
473 &left_schema,
474 &right_schema_for_removal,
475 )?;
476 }
477 }
478
479 return Ok(result);
480 }
481 }
482
483 if let Some(cond) = condition {
487 if let Some(or_result) =
488 join_analyzer::analyze_or_equi_join(cond, &temp_schema, left_col_count)
489 {
490 let (left_schema_for_natural, right_schema_for_natural) = if natural {
492 (Some(left.schema.clone()), Some(right.schema.clone()))
493 } else {
494 (None, None)
495 };
496
497 let mut result = hash_join_inner(
498 left,
499 right,
500 or_result.equi_join.left_col_idx,
501 or_result.equi_join.right_col_idx,
502 )?;
503
504 if !or_result.remaining_conditions.is_empty() {
506 if let Some(filter_expr) = combine_with_and(or_result.remaining_conditions) {
507 result =
508 apply_post_join_filter(result, &filter_expr, database, cte_results)?;
509 }
510 }
511
512 if natural {
514 if let (Some(left_schema), Some(right_schema_orig)) =
515 (left_schema_for_natural, right_schema_for_natural)
516 {
517 let right_schema_for_removal = CombinedSchema {
518 table_schemas: vec![(
519 right_table_name_for_natural.clone(),
520 (
521 0,
522 right_schema_orig
523 .table_schemas
524 .values()
525 .next()
526 .unwrap()
527 .1
528 .clone(),
529 ),
530 )]
531 .into_iter()
532 .collect(),
533 total_columns: right_schema_orig.total_columns,
534 };
535 result = remove_duplicate_columns_for_natural_join(
536 result,
537 &left_schema,
538 &right_schema_for_removal,
539 )?;
540 }
541 }
542
543 return Ok(result);
544 }
545 }
546
547 if let Some(cond) = condition {
550 if let Some(arith_info) =
551 join_analyzer::analyze_arithmetic_equi_join(cond, &temp_schema, left_col_count)
552 {
553 let (left_schema_for_natural, right_schema_for_natural) = if natural {
555 (Some(left.schema.clone()), Some(right.schema.clone()))
556 } else {
557 (None, None)
558 };
559
560 let mut result = hash_join_inner_arithmetic(
561 left,
562 right,
563 arith_info.left_col_idx,
564 arith_info.right_col_idx,
565 arith_info.offset,
566 )?;
567
568 if natural {
570 if let (Some(left_schema), Some(right_schema_orig)) =
571 (left_schema_for_natural, right_schema_for_natural)
572 {
573 let right_schema_for_removal = CombinedSchema {
574 table_schemas: vec![(
575 right_table_name_for_natural.clone(),
576 (
577 0,
578 right_schema_orig
579 .table_schemas
580 .values()
581 .next()
582 .unwrap()
583 .1
584 .clone(),
585 ),
586 )]
587 .into_iter()
588 .collect(),
589 total_columns: right_schema_orig.total_columns,
590 };
591 result = remove_duplicate_columns_for_natural_join(
592 result,
593 &left_schema,
594 &right_schema_for_removal,
595 )?;
596 }
597 }
598
599 return Ok(result);
600 }
601 }
602
603 for (idx, equijoin) in additional_equijoins.iter().enumerate() {
606 if let Some(equi_join_info) =
607 join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
608 {
609 let (left_schema_for_natural, right_schema_for_natural) = if natural {
611 (Some(left.schema.clone()), Some(right.schema.clone()))
612 } else {
613 (None, None)
614 };
615
616 let mut result = hash_join_inner(
618 left,
619 right,
620 equi_join_info.left_col_idx,
621 equi_join_info.right_col_idx,
622 )?;
623
624 let remaining_conditions: Vec<_> = additional_equijoins
626 .iter()
627 .enumerate()
628 .filter(|(i, _)| *i != idx)
629 .map(|(_, e)| e.clone())
630 .collect();
631
632 if !remaining_conditions.is_empty() {
633 if let Some(filter_expr) = combine_with_and(remaining_conditions) {
634 result =
635 apply_post_join_filter(result, &filter_expr, database, cte_results)?;
636 }
637 }
638
639 if natural {
641 if let (Some(left_schema), Some(right_schema_orig)) =
642 (left_schema_for_natural, right_schema_for_natural)
643 {
644 let right_schema_for_removal = CombinedSchema {
645 table_schemas: vec![(
646 right_table_name_for_natural.clone(),
647 (
648 0,
649 right_schema_orig
650 .table_schemas
651 .values()
652 .next()
653 .unwrap()
654 .1
655 .clone(),
656 ),
657 )]
658 .into_iter()
659 .collect(),
660 total_columns: right_schema_orig.total_columns,
661 };
662 result = remove_duplicate_columns_for_natural_join(
663 result,
664 &left_schema,
665 &right_schema_for_removal,
666 )?;
667 }
668 }
669
670 return Ok(result);
671 }
672 }
673
674 for (idx, equijoin) in additional_equijoins.iter().enumerate() {
678 if let Some(arith_info) =
679 join_analyzer::analyze_arithmetic_equi_join(equijoin, &temp_schema, left_col_count)
680 {
681 let (left_schema_for_natural, right_schema_for_natural) = if natural {
683 (Some(left.schema.clone()), Some(right.schema.clone()))
684 } else {
685 (None, None)
686 };
687
688 let mut result = hash_join_inner_arithmetic(
690 left,
691 right,
692 arith_info.left_col_idx,
693 arith_info.right_col_idx,
694 arith_info.offset,
695 )?;
696
697 let remaining_conditions: Vec<_> = additional_equijoins
699 .iter()
700 .enumerate()
701 .filter(|(i, _)| *i != idx)
702 .map(|(_, e)| e.clone())
703 .collect();
704
705 if !remaining_conditions.is_empty() {
706 if let Some(filter_expr) = combine_with_and(remaining_conditions) {
707 result =
708 apply_post_join_filter(result, &filter_expr, database, cte_results)?;
709 }
710 }
711
712 if natural {
714 if let (Some(left_schema), Some(right_schema_orig)) =
715 (left_schema_for_natural, right_schema_for_natural)
716 {
717 let right_schema_for_removal = CombinedSchema {
718 table_schemas: vec![(
719 right_table_name_for_natural.clone(),
720 (
721 0,
722 right_schema_orig
723 .table_schemas
724 .values()
725 .next()
726 .unwrap()
727 .1
728 .clone(),
729 ),
730 )]
731 .into_iter()
732 .collect(),
733 total_columns: right_schema_orig.total_columns,
734 };
735 result = remove_duplicate_columns_for_natural_join(
736 result,
737 &left_schema,
738 &right_schema_for_removal,
739 )?;
740 }
741 }
742
743 return Ok(result);
744 }
745 }
746 }
747
748 if let vibesql_ast::JoinType::LeftOuter = join_type {
751 let left_col_count: usize =
753 left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
754
755 let right_table_name = right
756 .schema
757 .table_schemas
758 .keys()
759 .next()
760 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
761 .clone();
762
763 let right_schema = right
764 .schema
765 .table_schemas
766 .get(&right_table_name)
767 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
768 .1
769 .clone();
770
771 let right_table_name_for_natural = right_table_name.clone();
773
774 let temp_schema =
775 CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
776
777 if let Some(cond) = condition {
779 if let Some(compound_result) =
780 join_analyzer::analyze_compound_equi_join(cond, &temp_schema, left_col_count)
781 {
782 let (left_schema_for_natural, right_schema_for_natural) = if natural {
784 (Some(left.schema.clone()), Some(right.schema.clone()))
785 } else {
786 (None, None)
787 };
788
789 let mut result = hash_join_left_outer(
790 left,
791 right,
792 compound_result.equi_join.left_col_idx,
793 compound_result.equi_join.right_col_idx,
794 )?;
795
796 if !compound_result.remaining_conditions.is_empty() {
798 if let Some(filter_expr) =
799 combine_with_and(compound_result.remaining_conditions)
800 {
801 result =
802 apply_post_join_filter(result, &filter_expr, database, cte_results)?;
803 }
804 }
805
806 if natural {
808 if let (Some(left_schema), Some(right_schema_orig)) =
809 (left_schema_for_natural, right_schema_for_natural)
810 {
811 let right_schema_for_removal = CombinedSchema {
812 table_schemas: vec![(
813 right_table_name_for_natural.clone(),
814 (
815 0,
816 right_schema_orig
817 .table_schemas
818 .values()
819 .next()
820 .unwrap()
821 .1
822 .clone(),
823 ),
824 )]
825 .into_iter()
826 .collect(),
827 total_columns: right_schema_orig.total_columns,
828 };
829 result = remove_duplicate_columns_for_natural_join(
830 result,
831 &left_schema,
832 &right_schema_for_removal,
833 )?;
834 }
835 }
836
837 return Ok(result);
838 }
839 }
840 }
841
842 if matches!(join_type, vibesql_ast::JoinType::Semi | vibesql_ast::JoinType::Anti) {
844 let left_col_count: usize =
846 left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
847
848 let right_table_name = right
849 .schema
850 .table_schemas
851 .keys()
852 .next()
853 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
854 .clone();
855
856 let right_schema = right
857 .schema
858 .table_schemas
859 .get(&right_table_name)
860 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
861 .1
862 .clone();
863
864 let temp_schema =
865 CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
866
867 if let Some(cond) = condition {
872 if let Some(compound_result) =
873 join_analyzer::analyze_compound_equi_join(cond, &temp_schema, left_col_count)
874 {
875 let remaining_filter = combine_with_and(compound_result.remaining_conditions);
877
878 let result = if matches!(join_type, vibesql_ast::JoinType::Semi) {
879 hash_semi_join_with_filter(
880 left,
881 right,
882 compound_result.equi_join.left_col_idx,
883 compound_result.equi_join.right_col_idx,
884 remaining_filter.as_ref(),
885 &temp_schema,
886 database,
887 )?
888 } else {
889 hash_anti_join_with_filter(
890 left,
891 right,
892 compound_result.equi_join.left_col_idx,
893 compound_result.equi_join.right_col_idx,
894 remaining_filter.as_ref(),
895 &temp_schema,
896 database,
897 )?
898 };
899
900 return Ok(result);
901 }
902 }
903
904 for equijoin in additional_equijoins.iter() {
906 if let Some(equi_join_info) =
907 join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
908 {
909 let result = if matches!(join_type, vibesql_ast::JoinType::Semi) {
910 hash_semi_join(
911 left,
912 right,
913 equi_join_info.left_col_idx,
914 equi_join_info.right_col_idx,
915 )?
916 } else {
917 hash_anti_join(
918 left,
919 right,
920 equi_join_info.left_col_idx,
921 equi_join_info.right_col_idx,
922 )?
923 };
924
925 return Ok(result);
926 }
927 }
928 }
929
930 if let vibesql_ast::JoinType::Cross = join_type {
934 if !additional_equijoins.is_empty() {
935 let left_col_count: usize =
937 left.schema.table_schemas.values().map(|(_, schema)| schema.columns.len()).sum();
938
939 let right_table_name = right
940 .schema
941 .table_schemas
942 .keys()
943 .next()
944 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
945 .clone();
946
947 let right_schema = right
948 .schema
949 .table_schemas
950 .get(&right_table_name)
951 .ok_or_else(|| ExecutorError::UnsupportedFeature("Complex JOIN".to_string()))?
952 .1
953 .clone();
954
955 let temp_schema =
956 CombinedSchema::combine(left.schema.clone(), right_table_name, right_schema);
957
958 for (idx, equijoin) in additional_equijoins.iter().enumerate() {
960 if let Some(equi_join_info) =
961 join_analyzer::analyze_equi_join(equijoin, &temp_schema, left_col_count)
962 {
963 let mut result = hash_join_inner(
966 left,
967 right,
968 equi_join_info.left_col_idx,
969 equi_join_info.right_col_idx,
970 )?;
971
972 let remaining_conditions: Vec<_> = additional_equijoins
974 .iter()
975 .enumerate()
976 .filter(|(i, _)| *i != idx)
977 .map(|(_, e)| e.clone())
978 .collect();
979
980 if !remaining_conditions.is_empty() {
981 if let Some(filter_expr) = combine_with_and(remaining_conditions) {
982 result = apply_post_join_filter(
983 result,
984 &filter_expr,
985 database,
986 cte_results,
987 )?;
988 }
989 }
990
991 return Ok(result);
992 }
993 }
994
995 for (idx, equijoin) in additional_equijoins.iter().enumerate() {
998 if let Some(arith_info) = join_analyzer::analyze_arithmetic_equi_join(
999 equijoin,
1000 &temp_schema,
1001 left_col_count,
1002 ) {
1003 let mut result = hash_join_inner_arithmetic(
1005 left,
1006 right,
1007 arith_info.left_col_idx,
1008 arith_info.right_col_idx,
1009 arith_info.offset,
1010 )?;
1011
1012 let remaining_conditions: Vec<_> = additional_equijoins
1014 .iter()
1015 .enumerate()
1016 .filter(|(i, _)| *i != idx)
1017 .map(|(_, e)| e.clone())
1018 .collect();
1019
1020 if !remaining_conditions.is_empty() {
1021 if let Some(filter_expr) = combine_with_and(remaining_conditions) {
1022 result = apply_post_join_filter(
1023 result,
1024 &filter_expr,
1025 database,
1026 cte_results,
1027 )?;
1028 }
1029 }
1030
1031 return Ok(result);
1032 }
1033 }
1034 }
1035 }
1036
1037 let mut all_join_conditions = Vec::new();
1039 if let Some(cond) = condition {
1040 all_join_conditions.push(cond.clone());
1041 }
1042 all_join_conditions.extend_from_slice(additional_equijoins);
1043
1044 let combined_condition = combine_with_and(all_join_conditions);
1046
1047 let (left_schema_for_natural, right_schema_for_natural) = if natural {
1050 (Some(left.schema.clone()), Some(right.schema.clone()))
1051 } else {
1052 (None, None)
1053 };
1054
1055 let mut result = match join_type {
1056 vibesql_ast::JoinType::Inner => {
1057 nested_loop_inner_join(left, right, &combined_condition, database, timeout_ctx)
1058 }
1059 vibesql_ast::JoinType::LeftOuter => {
1060 nested_loop_left_outer_join(left, right, &combined_condition, database, timeout_ctx)
1061 }
1062 vibesql_ast::JoinType::RightOuter => {
1063 nested_loop_right_outer_join(left, right, &combined_condition, database, timeout_ctx)
1064 }
1065 vibesql_ast::JoinType::FullOuter => {
1066 nested_loop_full_outer_join(left, right, &combined_condition, database, timeout_ctx)
1067 }
1068 vibesql_ast::JoinType::Cross => {
1069 nested_loop_cross_join(left, right, &combined_condition, database, timeout_ctx)
1070 }
1071 vibesql_ast::JoinType::Semi => {
1072 nested_loop_semi_join(left, right, &combined_condition, database, timeout_ctx)
1073 }
1074 vibesql_ast::JoinType::Anti => {
1075 nested_loop_anti_join(left, right, &combined_condition, database, timeout_ctx)
1076 }
1077 }?;
1078
1079 if natural {
1081 if let (Some(left_schema), Some(right_schema)) =
1082 (left_schema_for_natural, right_schema_for_natural)
1083 {
1084 result =
1085 remove_duplicate_columns_for_natural_join(result, &left_schema, &right_schema)?;
1086 }
1087 }
1088
1089 Ok(result)
1090}
1091
1092fn remove_duplicate_columns_for_natural_join(
1097 mut result: FromResult,
1098 left_schema: &CombinedSchema,
1099 right_schema: &CombinedSchema,
1100) -> Result<FromResult, ExecutorError> {
1101 use std::collections::{HashMap, HashSet};
1102
1103 let mut left_column_map: HashMap<String, Vec<(String, String, usize)>> = HashMap::new(); let mut col_idx = 0;
1106 for (table_name, (_table_idx, table_schema)) in &left_schema.table_schemas {
1107 for col in &table_schema.columns {
1108 let lowercase = col.name.to_lowercase();
1109 left_column_map.entry(lowercase).or_default().push((
1110 table_name.clone(),
1111 col.name.clone(),
1112 col_idx,
1113 ));
1114 col_idx += 1;
1115 }
1116 }
1117
1118 let mut right_duplicate_indices: HashSet<usize> = HashSet::new();
1120 let left_col_count = col_idx;
1121 col_idx = 0;
1122 for (_table_idx, table_schema) in right_schema.table_schemas.values() {
1123 for col in &table_schema.columns {
1124 let lowercase = col.name.to_lowercase();
1125 if left_column_map.contains_key(&lowercase) {
1126 right_duplicate_indices.insert(left_col_count + col_idx);
1128 }
1129 col_idx += 1;
1130 }
1131 }
1132
1133 if right_duplicate_indices.is_empty() {
1135 return Ok(result);
1136 }
1137
1138 let total_cols = left_col_count + col_idx;
1140 let keep_indices: Vec<usize> =
1141 (0..total_cols).filter(|i| !right_duplicate_indices.contains(i)).collect();
1142
1143 let mut new_schema = CombinedSchema { table_schemas: HashMap::new(), total_columns: 0 };
1145 for (table_name, (table_start_idx, table_schema)) in &result.schema.table_schemas {
1146 let mut new_cols = Vec::new();
1147
1148 for (idx, col) in table_schema.columns.iter().enumerate() {
1149 let abs_col_idx = table_start_idx + idx;
1151
1152 if keep_indices.contains(&abs_col_idx) {
1153 new_cols.push(col.clone());
1154 }
1155 }
1156
1157 if !new_cols.is_empty() {
1158 let new_table_schema =
1159 vibesql_catalog::TableSchema::new(table_schema.name.clone(), new_cols);
1160 new_schema
1161 .table_schemas
1162 .insert(table_name.clone(), (new_schema.total_columns, new_table_schema.clone()));
1163 new_schema.total_columns += new_table_schema.columns.len();
1164 }
1165 }
1166
1167 let rows = result.rows();
1169 let new_rows: Vec<vibesql_storage::Row> = rows
1170 .iter()
1171 .map(|row| {
1172 let new_values: Vec<vibesql_types::SqlValue> =
1173 keep_indices.iter().filter_map(|&i| row.values.get(i).cloned()).collect();
1174 vibesql_storage::Row::new(new_values)
1175 })
1176 .collect();
1177
1178 Ok(FromResult::from_rows(new_schema, new_rows))
1179}