1use anyhow::{anyhow, Result};
4use std::collections::HashMap;
5use std::sync::Arc;
6use tracing::{debug, info};
7
8use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
9use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
10use crate::sql::parser::ast::{JoinClause, JoinOperator, JoinType};
11use crate::sql::recursive_parser::SqlExpression;
12
/// Executes SQL-style joins (INNER, LEFT, RIGHT, CROSS) between two tables.
///
/// Single-column equality joins are served by a hash join; other
/// conditions are handled by nested-loop joins.
#[derive(Debug, Clone)]
pub struct HashJoinExecutor {
    /// When `true`, column names in join conditions are matched ignoring case.
    case_insensitive: bool,
}
17
18impl HashJoinExecutor {
19 pub fn new(case_insensitive: bool) -> Self {
20 Self { case_insensitive }
21 }
22
23 pub fn execute_join(
25 &self,
26 left_table: Arc<DataTable>,
27 join_clause: &JoinClause,
28 right_table: Arc<DataTable>,
29 ) -> Result<DataTable> {
30 info!(
31 "Executing {:?} JOIN: {} rows x {} rows with {} conditions",
32 join_clause.join_type,
33 left_table.row_count(),
34 right_table.row_count(),
35 join_clause.condition.conditions.len()
36 );
37
38 let mut condition_indices = Vec::new();
41 let mut all_equal = true;
42 let mut has_complex_expr = false;
43
44 for single_condition in &join_clause.condition.conditions {
45 let right_col_name = Self::extract_simple_column_name(&single_condition.right_expr);
47
48 if right_col_name.is_none() {
49 has_complex_expr = true;
51 all_equal = false; break;
53 }
54
55 let (left_col_idx, right_col_idx) = self.resolve_join_columns(
56 &left_table,
57 &right_table,
58 &single_condition.left_column,
59 &right_col_name.unwrap(),
60 )?;
61
62 if single_condition.operator != JoinOperator::Equal {
63 all_equal = false;
64 }
65
66 condition_indices.push((
67 left_col_idx,
68 right_col_idx,
69 single_condition.operator.clone(),
70 ));
71 }
72
73 let use_hash_join = all_equal && !has_complex_expr;
77
78 match join_clause.join_type {
80 JoinType::Inner => {
81 if use_hash_join && condition_indices.len() == 1 {
82 let (left_col_idx, right_col_idx, _) = condition_indices[0];
84 let right_col_name = Self::extract_simple_column_name(
85 &join_clause.condition.conditions[0].right_expr,
86 )
87 .expect("right_expr should be a simple column in hash join path");
88 self.hash_join_inner(
89 left_table,
90 right_table,
91 left_col_idx,
92 right_col_idx,
93 &join_clause.condition.conditions[0].left_column,
94 &right_col_name,
95 &join_clause.alias,
96 )
97 } else {
98 self.nested_loop_join_inner_multi(
100 left_table,
101 right_table,
102 &join_clause.condition.conditions,
103 &join_clause.alias,
104 )
105 }
106 }
107 JoinType::Left => {
108 if use_hash_join && condition_indices.len() == 1 {
109 let (left_col_idx, right_col_idx, _) = condition_indices[0];
111 let right_col_name = Self::extract_simple_column_name(
112 &join_clause.condition.conditions[0].right_expr,
113 )
114 .expect("right_expr should be a simple column in hash join path");
115 self.hash_join_left(
116 left_table,
117 right_table,
118 left_col_idx,
119 right_col_idx,
120 &join_clause.condition.conditions[0].left_column,
121 &right_col_name,
122 &join_clause.alias,
123 )
124 } else {
125 self.nested_loop_join_left_multi(
127 left_table,
128 right_table,
129 &join_clause.condition.conditions,
130 &join_clause.alias,
131 )
132 }
133 }
134 JoinType::Right => {
135 let swapped_indices: Vec<(usize, usize, JoinOperator)> = condition_indices
137 .into_iter()
138 .map(|(l, r, op)| (r, l, self.reverse_operator(&op)))
139 .collect();
140
141 if use_hash_join && swapped_indices.len() == 1 {
142 let (right_col_idx, left_col_idx, _) = swapped_indices[0];
144 let right_col_name = Self::extract_simple_column_name(
145 &join_clause.condition.conditions[0].right_expr,
146 )
147 .expect("right_expr should be a simple column in hash join path");
148 self.hash_join_left(
149 right_table,
150 left_table,
151 right_col_idx,
152 left_col_idx,
153 &right_col_name,
154 &join_clause.condition.conditions[0].left_column,
155 &join_clause.alias,
156 )
157 } else {
158 self.nested_loop_join_left_multi(
161 right_table,
162 left_table,
163 &join_clause.condition.conditions,
164 &join_clause.alias,
165 )
166 }
167 }
168 JoinType::Cross => self.cross_join(left_table, right_table),
169 JoinType::Full => {
170 return Err(anyhow!("FULL OUTER JOIN not yet implemented"));
171 }
172 }
173 }
174
175 fn extract_simple_column_name(expr: &SqlExpression) -> Option<String> {
178 match expr {
179 SqlExpression::Column(col_ref) => {
180 if let Some(table_prefix) = &col_ref.table_prefix {
182 Some(format!("{}.{}", table_prefix, col_ref.name))
183 } else {
184 Some(col_ref.name.clone())
185 }
186 }
187 _ => None, }
189 }
190
191 fn resolve_join_columns(
193 &self,
194 left_table: &DataTable,
195 right_table: &DataTable,
196 left_col_name: &str,
197 right_col_name: &str,
198 ) -> Result<(usize, usize)> {
199 let left_col_idx = if let Ok(idx) = self.find_column_index(left_table, left_col_name) {
201 idx
202 } else if let Ok(_idx) = self.find_column_index(right_table, left_col_name) {
203 return Err(anyhow!(
206 "Column '{}' found in right table but specified as left operand. \
207 Please rewrite the condition with columns in correct positions.",
208 left_col_name
209 ));
210 } else {
211 return Err(anyhow!(
212 "Column '{}' not found in either table",
213 left_col_name
214 ));
215 };
216
217 let right_col_idx = if let Ok(idx) = self.find_column_index(right_table, right_col_name) {
219 idx
220 } else if let Ok(_idx) = self.find_column_index(left_table, right_col_name) {
221 return Err(anyhow!(
224 "Column '{}' found in left table but specified as right operand. \
225 Please rewrite the condition with columns in correct positions.",
226 right_col_name
227 ));
228 } else {
229 return Err(anyhow!(
230 "Column '{}' not found in either table",
231 right_col_name
232 ));
233 };
234
235 Ok((left_col_idx, right_col_idx))
236 }
237
238 fn find_column_index(&self, table: &DataTable, col_name: &str) -> Result<usize> {
240 let col_name = if let Some(dot_pos) = col_name.rfind('.') {
242 &col_name[dot_pos + 1..]
243 } else {
244 col_name
245 };
246
247 debug!(
248 "Looking for column '{}' in table with columns: {:?}",
249 col_name,
250 table.column_names()
251 );
252
253 table
254 .columns
255 .iter()
256 .position(|col| {
257 if self.case_insensitive {
258 col.name.to_lowercase() == col_name.to_lowercase()
259 } else {
260 col.name == col_name
261 }
262 })
263 .ok_or_else(|| anyhow!("Column '{}' not found in table", col_name))
264 }
265
266 fn hash_join_inner(
268 &self,
269 left_table: Arc<DataTable>,
270 right_table: Arc<DataTable>,
271 left_col_idx: usize,
272 right_col_idx: usize,
273 _left_col_name: &str,
274 _right_col_name: &str,
275 join_alias: &Option<String>,
276 ) -> Result<DataTable> {
277 let start = std::time::Instant::now();
278
279 let (build_table, probe_table, build_col_idx, probe_col_idx, build_is_left) =
281 if left_table.row_count() <= right_table.row_count() {
282 (
283 left_table.clone(),
284 right_table.clone(),
285 left_col_idx,
286 right_col_idx,
287 true,
288 )
289 } else {
290 (
291 right_table.clone(),
292 left_table.clone(),
293 right_col_idx,
294 left_col_idx,
295 false,
296 )
297 };
298
299 debug!(
300 "Building hash index on {} table ({} rows)",
301 if build_is_left { "left" } else { "right" },
302 build_table.row_count()
303 );
304
305 let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
307 for (row_idx, row) in build_table.rows.iter().enumerate() {
308 let key = row.values[build_col_idx].clone();
309 hash_index.entry(key).or_default().push(row_idx);
310 }
311
312 debug!(
313 "Hash index built with {} unique keys in {:?}",
314 hash_index.len(),
315 start.elapsed()
316 );
317
318 let mut result = DataTable::new("joined");
320
321 for col in &left_table.columns {
323 result.add_column(DataColumn {
324 name: col.name.clone(),
325 data_type: col.data_type.clone(),
326 nullable: col.nullable,
327 unique_values: col.unique_values,
328 null_count: col.null_count,
329 metadata: col.metadata.clone(),
330 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
333 }
334
335 for col in &right_table.columns {
337 if !left_table
339 .columns
340 .iter()
341 .any(|left_col| left_col.name == col.name)
342 {
343 result.add_column(DataColumn {
344 name: col.name.clone(),
345 data_type: col.data_type.clone(),
346 nullable: col.nullable,
347 unique_values: col.unique_values,
348 null_count: col.null_count,
349 metadata: col.metadata.clone(),
350 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
353 } else {
354 let (column_name, qualified_name) = if let Some(alias) = join_alias {
356 (
358 format!("{}.{}", alias, col.name),
359 Some(format!("{}.{}", alias, col.name)),
360 )
361 } else {
362 (format!("{}_right", col.name), col.qualified_name.clone())
364 };
365 result.add_column(DataColumn {
366 name: column_name,
367 data_type: col.data_type.clone(),
368 nullable: col.nullable,
369 unique_values: col.unique_values,
370 null_count: col.null_count,
371 metadata: col.metadata.clone(),
372 qualified_name,
373 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
374 });
375 }
376 }
377
378 debug!(
379 "Joined table will have {} columns: {:?}",
380 result.column_count(),
381 result.column_names()
382 );
383
384 let mut match_count = 0;
386 for probe_row in &probe_table.rows {
387 let probe_key = &probe_row.values[probe_col_idx];
388
389 if let Some(matching_indices) = hash_index.get(probe_key) {
390 for &build_idx in matching_indices {
391 let build_row = &build_table.rows[build_idx];
392
393 let mut joined_row = DataRow { values: Vec::new() };
395
396 if build_is_left {
397 joined_row.values.extend_from_slice(&build_row.values);
399 joined_row.values.extend_from_slice(&probe_row.values);
400 } else {
401 joined_row.values.extend_from_slice(&probe_row.values);
403 joined_row.values.extend_from_slice(&build_row.values);
404 }
405
406 result.add_row(joined_row);
407 match_count += 1;
408 }
409 }
410 }
411
412 let qualified_cols: Vec<String> = result
414 .columns
415 .iter()
416 .filter_map(|c| c.qualified_name.clone())
417 .collect();
418
419 info!(
420 "INNER JOIN complete: {} matches found in {:?}. Result has {} columns ({} qualified: {:?})",
421 match_count,
422 start.elapsed(),
423 result.columns.len(),
424 qualified_cols.len(),
425 qualified_cols
426 );
427
428 Ok(result)
429 }
430
431 fn hash_join_left(
433 &self,
434 left_table: Arc<DataTable>,
435 right_table: Arc<DataTable>,
436 left_col_idx: usize,
437 right_col_idx: usize,
438 _left_col_name: &str,
439 _right_col_name: &str,
440 join_alias: &Option<String>,
441 ) -> Result<DataTable> {
442 let start = std::time::Instant::now();
443
444 debug!(
445 "Building hash index on right table ({} rows)",
446 right_table.row_count()
447 );
448
449 let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
451 for (row_idx, row) in right_table.rows.iter().enumerate() {
452 let key = row.values[right_col_idx].clone();
453 hash_index.entry(key).or_default().push(row_idx);
454 }
455
456 let mut result = DataTable::new("joined");
458
459 for col in &left_table.columns {
461 result.add_column(DataColumn {
462 name: col.name.clone(),
463 data_type: col.data_type.clone(),
464 nullable: col.nullable,
465 unique_values: col.unique_values,
466 null_count: col.null_count,
467 metadata: col.metadata.clone(),
468 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
471 }
472
473 for col in &right_table.columns {
475 if !left_table
477 .columns
478 .iter()
479 .any(|left_col| left_col.name == col.name)
480 {
481 result.add_column(DataColumn {
482 name: col.name.clone(),
483 data_type: col.data_type.clone(),
484 nullable: true, unique_values: col.unique_values,
486 null_count: col.null_count,
487 metadata: col.metadata.clone(),
488 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
491 } else {
492 let (column_name, qualified_name) = if let Some(alias) = join_alias {
494 (
496 format!("{}.{}", alias, col.name),
497 Some(format!("{}.{}", alias, col.name)),
498 )
499 } else {
500 (format!("{}_right", col.name), col.qualified_name.clone())
502 };
503 result.add_column(DataColumn {
504 name: column_name,
505 data_type: col.data_type.clone(),
506 nullable: true, unique_values: col.unique_values,
508 null_count: col.null_count,
509 metadata: col.metadata.clone(),
510 qualified_name,
511 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
512 });
513 }
514 }
515
516 debug!(
517 "LEFT JOIN table will have {} columns: {:?}",
518 result.column_count(),
519 result.column_names()
520 );
521
522 let mut match_count = 0;
524 let mut null_count = 0;
525
526 for left_row in &left_table.rows {
527 let left_key = &left_row.values[left_col_idx];
528
529 if let Some(matching_indices) = hash_index.get(left_key) {
530 for &right_idx in matching_indices {
532 let right_row = &right_table.rows[right_idx];
533
534 let mut joined_row = DataRow { values: Vec::new() };
535 joined_row.values.extend_from_slice(&left_row.values);
536 joined_row.values.extend_from_slice(&right_row.values);
537
538 result.add_row(joined_row);
539 match_count += 1;
540 }
541 } else {
542 let mut joined_row = DataRow { values: Vec::new() };
544 joined_row.values.extend_from_slice(&left_row.values);
545
546 for _ in 0..right_table.column_count() {
548 joined_row.values.push(DataValue::Null);
549 }
550
551 result.add_row(joined_row);
552 null_count += 1;
553 }
554 }
555
556 let qualified_cols: Vec<String> = result
558 .columns
559 .iter()
560 .filter_map(|c| c.qualified_name.clone())
561 .collect();
562
563 info!(
564 "LEFT JOIN complete: {} matches, {} nulls in {:?}. Result has {} columns ({} qualified: {:?})",
565 match_count,
566 null_count,
567 start.elapsed(),
568 result.columns.len(),
569 qualified_cols.len(),
570 qualified_cols
571 );
572
573 Ok(result)
574 }
575
576 fn cross_join(
578 &self,
579 left_table: Arc<DataTable>,
580 right_table: Arc<DataTable>,
581 ) -> Result<DataTable> {
582 let start = std::time::Instant::now();
583
584 let result_rows = left_table.row_count() * right_table.row_count();
586 if result_rows > 1_000_000 {
587 return Err(anyhow!(
588 "CROSS JOIN would produce {} rows, which exceeds the safety limit",
589 result_rows
590 ));
591 }
592
593 let mut result = DataTable::new("joined");
595
596 for col in &left_table.columns {
598 result.add_column(col.clone());
599 }
600 for col in &right_table.columns {
601 result.add_column(col.clone());
602 }
603
604 for left_row in &left_table.rows {
606 for right_row in &right_table.rows {
607 let mut joined_row = DataRow { values: Vec::new() };
608 joined_row.values.extend_from_slice(&left_row.values);
609 joined_row.values.extend_from_slice(&right_row.values);
610 result.add_row(joined_row);
611 }
612 }
613
614 info!(
615 "CROSS JOIN complete: {} rows in {:?}",
616 result.row_count(),
617 start.elapsed()
618 );
619
620 Ok(result)
621 }
622
623 fn qualify_column_name(
625 &self,
626 col_name: &str,
627 table_side: &str,
628 left_join_col: &str,
629 right_join_col: &str,
630 ) -> String {
631 let base_name = if let Some(dot_pos) = col_name.rfind('.') {
633 &col_name[dot_pos + 1..]
634 } else {
635 col_name
636 };
637
638 let left_base = if let Some(dot_pos) = left_join_col.rfind('.') {
639 &left_join_col[dot_pos + 1..]
640 } else {
641 left_join_col
642 };
643
644 let right_base = if let Some(dot_pos) = right_join_col.rfind('.') {
645 &right_join_col[dot_pos + 1..]
646 } else {
647 right_join_col
648 };
649
650 if base_name == left_base || base_name == right_base {
652 format!("{}_{}", table_side, base_name)
653 } else {
654 col_name.to_string()
655 }
656 }
657
658 fn reverse_operator(&self, op: &JoinOperator) -> JoinOperator {
660 match op {
661 JoinOperator::Equal => JoinOperator::Equal,
662 JoinOperator::NotEqual => JoinOperator::NotEqual,
663 JoinOperator::LessThan => JoinOperator::GreaterThan,
664 JoinOperator::GreaterThan => JoinOperator::LessThan,
665 JoinOperator::LessThanOrEqual => JoinOperator::GreaterThanOrEqual,
666 JoinOperator::GreaterThanOrEqual => JoinOperator::LessThanOrEqual,
667 }
668 }
669
670 fn compare_values(&self, left: &DataValue, right: &DataValue, op: &JoinOperator) -> bool {
672 match op {
673 JoinOperator::Equal => left == right,
674 JoinOperator::NotEqual => left != right,
675 JoinOperator::LessThan => left < right,
676 JoinOperator::GreaterThan => left > right,
677 JoinOperator::LessThanOrEqual => left <= right,
678 JoinOperator::GreaterThanOrEqual => left >= right,
679 }
680 }
681
682 fn nested_loop_join_inner(
684 &self,
685 left_table: Arc<DataTable>,
686 right_table: Arc<DataTable>,
687 left_col_idx: usize,
688 right_col_idx: usize,
689 operator: &JoinOperator,
690 join_alias: &Option<String>,
691 ) -> Result<DataTable> {
692 let start = std::time::Instant::now();
693
694 info!(
695 "Executing nested loop INNER JOIN with {:?} operator: {} x {} rows",
696 operator,
697 left_table.row_count(),
698 right_table.row_count()
699 );
700
701 let mut result = DataTable::new("joined");
703
704 for col in &left_table.columns {
706 result.add_column(DataColumn {
707 name: col.name.clone(),
708 data_type: col.data_type.clone(),
709 nullable: col.nullable,
710 unique_values: col.unique_values,
711 null_count: col.null_count,
712 metadata: col.metadata.clone(),
713 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
716 }
717
718 for col in &right_table.columns {
720 if !left_table
721 .columns
722 .iter()
723 .any(|left_col| left_col.name == col.name)
724 {
725 result.add_column(DataColumn {
726 name: col.name.clone(),
727 data_type: col.data_type.clone(),
728 nullable: col.nullable,
729 unique_values: col.unique_values,
730 null_count: col.null_count,
731 metadata: col.metadata.clone(),
732 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
735 } else {
736 let (column_name, qualified_name) = if let Some(alias) = join_alias {
737 (
739 format!("{}.{}", alias, col.name),
740 Some(format!("{}.{}", alias, col.name)),
741 )
742 } else {
743 (format!("{}_right", col.name), col.qualified_name.clone())
745 };
746 result.add_column(DataColumn {
747 name: column_name,
748 data_type: col.data_type.clone(),
749 nullable: col.nullable,
750 unique_values: col.unique_values,
751 null_count: col.null_count,
752 metadata: col.metadata.clone(),
753 qualified_name,
754 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
755 });
756 }
757 }
758
759 let mut match_count = 0;
761 for left_row in &left_table.rows {
762 let left_value = &left_row.values[left_col_idx];
763
764 for right_row in &right_table.rows {
765 let right_value = &right_row.values[right_col_idx];
766
767 if self.compare_values(left_value, right_value, operator) {
768 let mut joined_row = DataRow { values: Vec::new() };
769 joined_row.values.extend_from_slice(&left_row.values);
770 joined_row.values.extend_from_slice(&right_row.values);
771 result.add_row(joined_row);
772 match_count += 1;
773 }
774 }
775 }
776
777 info!(
778 "Nested loop INNER JOIN complete: {} matches found in {:?}",
779 match_count,
780 start.elapsed()
781 );
782
783 Ok(result)
784 }
785
786 fn nested_loop_join_inner_multi(
788 &self,
789 left_table: Arc<DataTable>,
790 right_table: Arc<DataTable>,
791 conditions: &[crate::sql::parser::ast::SingleJoinCondition],
792 join_alias: &Option<String>,
793 ) -> Result<DataTable> {
794 let start = std::time::Instant::now();
795
796 info!(
797 "Executing nested loop INNER JOIN with {} conditions: {} x {} rows",
798 conditions.len(),
799 left_table.row_count(),
800 right_table.row_count()
801 );
802
803 let mut result = DataTable::new("joined");
805
806 for col in &left_table.columns {
808 result.add_column(DataColumn {
809 name: col.name.clone(),
810 data_type: col.data_type.clone(),
811 nullable: col.nullable,
812 unique_values: col.unique_values,
813 null_count: col.null_count,
814 metadata: col.metadata.clone(),
815 qualified_name: col.qualified_name.clone(),
816 source_table: col.source_table.clone(),
817 });
818 }
819
820 for col in &right_table.columns {
822 if !left_table
823 .columns
824 .iter()
825 .any(|left_col| left_col.name == col.name)
826 {
827 result.add_column(DataColumn {
828 name: col.name.clone(),
829 data_type: col.data_type.clone(),
830 nullable: col.nullable,
831 unique_values: col.unique_values,
832 null_count: col.null_count,
833 metadata: col.metadata.clone(),
834 qualified_name: col.qualified_name.clone(),
835 source_table: col.source_table.clone(),
836 });
837 } else {
838 let (column_name, qualified_name) = if let Some(alias) = join_alias {
839 (
840 format!("{}.{}", alias, col.name),
841 Some(format!("{}.{}", alias, col.name)),
842 )
843 } else {
844 (format!("{}_right", col.name), col.qualified_name.clone())
845 };
846 result.add_column(DataColumn {
847 name: column_name,
848 data_type: col.data_type.clone(),
849 nullable: col.nullable,
850 unique_values: col.unique_values,
851 null_count: col.null_count,
852 metadata: col.metadata.clone(),
853 qualified_name,
854 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
855 });
856 }
857 }
858
859 let mut left_col_indices = Vec::new();
861 for condition in conditions {
862 let left_idx = self.find_column_index(&left_table, &condition.left_column)?;
863 left_col_indices.push(left_idx);
864 }
865
866 let mut right_evaluator = ArithmeticEvaluator::new(&right_table);
868
869 let mut match_count = 0;
871 for left_row in &left_table.rows {
872 for (right_row_idx, right_row) in right_table.rows.iter().enumerate() {
873 let mut all_conditions_met = true;
875 for (i, condition) in conditions.iter().enumerate() {
876 let left_value = &left_row.values[left_col_indices[i]];
877
878 let right_value =
880 match right_evaluator.evaluate(&condition.right_expr, right_row_idx) {
881 Ok(val) => val,
882 Err(_) => {
883 all_conditions_met = false;
884 break;
885 }
886 };
887
888 if !self.compare_values(left_value, &right_value, &condition.operator) {
889 all_conditions_met = false;
890 break;
891 }
892 }
893
894 if all_conditions_met {
895 let mut joined_row = DataRow { values: Vec::new() };
896 joined_row.values.extend_from_slice(&left_row.values);
897 joined_row.values.extend_from_slice(&right_row.values);
898 result.add_row(joined_row);
899 match_count += 1;
900 }
901 }
902 }
903
904 info!(
905 "Nested loop INNER JOIN complete: {} matches found in {:?}",
906 match_count,
907 start.elapsed()
908 );
909
910 Ok(result)
911 }
912
913 fn nested_loop_join_left_multi(
915 &self,
916 left_table: Arc<DataTable>,
917 right_table: Arc<DataTable>,
918 conditions: &[crate::sql::parser::ast::SingleJoinCondition],
919 join_alias: &Option<String>,
920 ) -> Result<DataTable> {
921 let start = std::time::Instant::now();
922
923 info!(
924 "Executing nested loop LEFT JOIN with {} conditions: {} x {} rows",
925 conditions.len(),
926 left_table.row_count(),
927 right_table.row_count()
928 );
929
930 let mut result = DataTable::new("joined");
932
933 for col in &left_table.columns {
935 result.add_column(DataColumn {
936 name: col.name.clone(),
937 data_type: col.data_type.clone(),
938 nullable: col.nullable,
939 unique_values: col.unique_values,
940 null_count: col.null_count,
941 metadata: col.metadata.clone(),
942 qualified_name: col.qualified_name.clone(),
943 source_table: col.source_table.clone(),
944 });
945 }
946
947 for col in &right_table.columns {
949 if !left_table
950 .columns
951 .iter()
952 .any(|left_col| left_col.name == col.name)
953 {
954 result.add_column(DataColumn {
955 name: col.name.clone(),
956 data_type: col.data_type.clone(),
957 nullable: true, unique_values: col.unique_values,
959 null_count: col.null_count,
960 metadata: col.metadata.clone(),
961 qualified_name: col.qualified_name.clone(),
962 source_table: col.source_table.clone(),
963 });
964 } else {
965 let (column_name, qualified_name) = if let Some(alias) = join_alias {
966 (
967 format!("{}.{}", alias, col.name),
968 Some(format!("{}.{}", alias, col.name)),
969 )
970 } else {
971 (format!("{}_right", col.name), col.qualified_name.clone())
972 };
973 result.add_column(DataColumn {
974 name: column_name,
975 data_type: col.data_type.clone(),
976 nullable: true, unique_values: col.unique_values,
978 null_count: col.null_count,
979 metadata: col.metadata.clone(),
980 qualified_name,
981 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
982 });
983 }
984 }
985
986 let mut left_col_indices = Vec::new();
988 for condition in conditions {
989 let left_idx = self.find_column_index(&left_table, &condition.left_column)?;
990 left_col_indices.push(left_idx);
991 }
992
993 let mut right_evaluator = ArithmeticEvaluator::new(&right_table);
995
996 let mut match_count = 0;
998 let mut null_count = 0;
999
1000 for left_row in &left_table.rows {
1001 let mut found_match = false;
1002
1003 for (right_row_idx, right_row) in right_table.rows.iter().enumerate() {
1004 let mut all_conditions_met = true;
1006 for (i, condition) in conditions.iter().enumerate() {
1007 let left_value = &left_row.values[left_col_indices[i]];
1008
1009 let right_value =
1011 match right_evaluator.evaluate(&condition.right_expr, right_row_idx) {
1012 Ok(val) => val,
1013 Err(_) => {
1014 all_conditions_met = false;
1015 break;
1016 }
1017 };
1018
1019 if !self.compare_values(left_value, &right_value, &condition.operator) {
1020 all_conditions_met = false;
1021 break;
1022 }
1023 }
1024
1025 if all_conditions_met {
1026 let mut joined_row = DataRow { values: Vec::new() };
1027 joined_row.values.extend_from_slice(&left_row.values);
1028 joined_row.values.extend_from_slice(&right_row.values);
1029 result.add_row(joined_row);
1030 match_count += 1;
1031 found_match = true;
1032 }
1033 }
1034
1035 if !found_match {
1037 let mut joined_row = DataRow { values: Vec::new() };
1038 joined_row.values.extend_from_slice(&left_row.values);
1039 for _ in 0..right_table.column_count() {
1040 joined_row.values.push(DataValue::Null);
1041 }
1042 result.add_row(joined_row);
1043 null_count += 1;
1044 }
1045 }
1046
1047 info!(
1048 "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
1049 match_count,
1050 null_count,
1051 start.elapsed()
1052 );
1053
1054 Ok(result)
1055 }
1056
1057 fn nested_loop_join_left(
1059 &self,
1060 left_table: Arc<DataTable>,
1061 right_table: Arc<DataTable>,
1062 left_col_idx: usize,
1063 right_col_idx: usize,
1064 operator: &JoinOperator,
1065 join_alias: &Option<String>,
1066 ) -> Result<DataTable> {
1067 let start = std::time::Instant::now();
1068
1069 info!(
1070 "Executing nested loop LEFT JOIN with {:?} operator: {} x {} rows",
1071 operator,
1072 left_table.row_count(),
1073 right_table.row_count()
1074 );
1075
1076 let mut result = DataTable::new("joined");
1078
1079 for col in &left_table.columns {
1081 result.add_column(DataColumn {
1082 name: col.name.clone(),
1083 data_type: col.data_type.clone(),
1084 nullable: col.nullable,
1085 unique_values: col.unique_values,
1086 null_count: col.null_count,
1087 metadata: col.metadata.clone(),
1088 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
1091 }
1092
1093 for col in &right_table.columns {
1095 if !left_table
1096 .columns
1097 .iter()
1098 .any(|left_col| left_col.name == col.name)
1099 {
1100 result.add_column(DataColumn {
1101 name: col.name.clone(),
1102 data_type: col.data_type.clone(),
1103 nullable: true, unique_values: col.unique_values,
1105 null_count: col.null_count,
1106 metadata: col.metadata.clone(),
1107 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
1110 } else {
1111 let (column_name, qualified_name) = if let Some(alias) = join_alias {
1112 (
1114 format!("{}.{}", alias, col.name),
1115 Some(format!("{}.{}", alias, col.name)),
1116 )
1117 } else {
1118 (format!("{}_right", col.name), col.qualified_name.clone())
1120 };
1121 result.add_column(DataColumn {
1122 name: column_name,
1123 data_type: col.data_type.clone(),
1124 nullable: true, unique_values: col.unique_values,
1126 null_count: col.null_count,
1127 metadata: col.metadata.clone(),
1128 qualified_name,
1129 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
1130 });
1131 }
1132 }
1133
1134 let mut match_count = 0;
1136 let mut null_count = 0;
1137
1138 for left_row in &left_table.rows {
1139 let left_value = &left_row.values[left_col_idx];
1140 let mut found_match = false;
1141
1142 for right_row in &right_table.rows {
1143 let right_value = &right_row.values[right_col_idx];
1144
1145 if self.compare_values(left_value, right_value, operator) {
1146 let mut joined_row = DataRow { values: Vec::new() };
1147 joined_row.values.extend_from_slice(&left_row.values);
1148 joined_row.values.extend_from_slice(&right_row.values);
1149 result.add_row(joined_row);
1150 match_count += 1;
1151 found_match = true;
1152 }
1153 }
1154
1155 if !found_match {
1157 let mut joined_row = DataRow { values: Vec::new() };
1158 joined_row.values.extend_from_slice(&left_row.values);
1159 for _ in 0..right_table.column_count() {
1160 joined_row.values.push(DataValue::Null);
1161 }
1162 result.add_row(joined_row);
1163 null_count += 1;
1164 }
1165 }
1166
1167 info!(
1168 "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
1169 match_count,
1170 null_count,
1171 start.elapsed()
1172 );
1173
1174 Ok(result)
1175 }
1176}