1use anyhow::{anyhow, Result};
4use std::collections::HashMap;
5use std::sync::Arc;
6use tracing::{debug, info};
7
8use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
9use crate::sql::parser::ast::{JoinClause, JoinOperator, JoinType};
10
/// Executes joins between two in-memory `DataTable`s.
///
/// The executor itself is stateless apart from one configuration flag, so it
/// is cheap to clone and safe to print in diagnostics.
#[derive(Debug, Clone)]
pub struct HashJoinExecutor {
    /// When `true`, join column names are compared ignoring case while they
    /// are being resolved against a table's columns.
    case_insensitive: bool,
}
15
16impl HashJoinExecutor {
17 pub fn new(case_insensitive: bool) -> Self {
18 Self { case_insensitive }
19 }
20
21 pub fn execute_join(
23 &self,
24 left_table: Arc<DataTable>,
25 join_clause: &JoinClause,
26 right_table: Arc<DataTable>,
27 ) -> Result<DataTable> {
28 info!(
29 "Executing {:?} JOIN: {} rows x {} rows with {} conditions",
30 join_clause.join_type,
31 left_table.row_count(),
32 right_table.row_count(),
33 join_clause.condition.conditions.len()
34 );
35
36 let mut condition_indices = Vec::new();
38 let mut all_equal = true;
39
40 for single_condition in &join_clause.condition.conditions {
41 let (left_col_idx, right_col_idx) = self.resolve_join_columns(
42 &left_table,
43 &right_table,
44 &single_condition.left_column,
45 &single_condition.right_column,
46 )?;
47
48 if single_condition.operator != JoinOperator::Equal {
49 all_equal = false;
50 }
51
52 condition_indices.push((
53 left_col_idx,
54 right_col_idx,
55 single_condition.operator.clone(),
56 ));
57 }
58
59 let use_hash_join = all_equal;
61
62 match join_clause.join_type {
64 JoinType::Inner => {
65 if use_hash_join && condition_indices.len() == 1 {
66 let (left_col_idx, right_col_idx, _) = condition_indices[0];
68 self.hash_join_inner(
69 left_table,
70 right_table,
71 left_col_idx,
72 right_col_idx,
73 &join_clause.condition.conditions[0].left_column,
74 &join_clause.condition.conditions[0].right_column,
75 &join_clause.alias,
76 )
77 } else {
78 self.nested_loop_join_inner_multi(
80 left_table,
81 right_table,
82 condition_indices,
83 &join_clause.alias,
84 )
85 }
86 }
87 JoinType::Left => {
88 if use_hash_join && condition_indices.len() == 1 {
89 let (left_col_idx, right_col_idx, _) = condition_indices[0];
91 self.hash_join_left(
92 left_table,
93 right_table,
94 left_col_idx,
95 right_col_idx,
96 &join_clause.condition.conditions[0].left_column,
97 &join_clause.condition.conditions[0].right_column,
98 &join_clause.alias,
99 )
100 } else {
101 self.nested_loop_join_left_multi(
103 left_table,
104 right_table,
105 condition_indices,
106 &join_clause.alias,
107 )
108 }
109 }
110 JoinType::Right => {
111 let swapped_indices: Vec<(usize, usize, JoinOperator)> = condition_indices
113 .into_iter()
114 .map(|(l, r, op)| (r, l, self.reverse_operator(&op)))
115 .collect();
116
117 if use_hash_join && swapped_indices.len() == 1 {
118 let (right_col_idx, left_col_idx, _) = swapped_indices[0];
120 self.hash_join_left(
121 right_table,
122 left_table,
123 right_col_idx,
124 left_col_idx,
125 &join_clause.condition.conditions[0].right_column,
126 &join_clause.condition.conditions[0].left_column,
127 &join_clause.alias,
128 )
129 } else {
130 self.nested_loop_join_left_multi(
132 right_table,
133 left_table,
134 swapped_indices,
135 &join_clause.alias,
136 )
137 }
138 }
139 JoinType::Cross => self.cross_join(left_table, right_table),
140 JoinType::Full => {
141 return Err(anyhow!("FULL OUTER JOIN not yet implemented"));
142 }
143 }
144 }
145
146 fn resolve_join_columns(
148 &self,
149 left_table: &DataTable,
150 right_table: &DataTable,
151 left_col_name: &str,
152 right_col_name: &str,
153 ) -> Result<(usize, usize)> {
154 let left_col_idx = if let Ok(idx) = self.find_column_index(left_table, left_col_name) {
156 idx
157 } else if let Ok(_idx) = self.find_column_index(right_table, left_col_name) {
158 return Err(anyhow!(
161 "Column '{}' found in right table but specified as left operand. \
162 Please rewrite the condition with columns in correct positions.",
163 left_col_name
164 ));
165 } else {
166 return Err(anyhow!(
167 "Column '{}' not found in either table",
168 left_col_name
169 ));
170 };
171
172 let right_col_idx = if let Ok(idx) = self.find_column_index(right_table, right_col_name) {
174 idx
175 } else if let Ok(_idx) = self.find_column_index(left_table, right_col_name) {
176 return Err(anyhow!(
179 "Column '{}' found in left table but specified as right operand. \
180 Please rewrite the condition with columns in correct positions.",
181 right_col_name
182 ));
183 } else {
184 return Err(anyhow!(
185 "Column '{}' not found in either table",
186 right_col_name
187 ));
188 };
189
190 Ok((left_col_idx, right_col_idx))
191 }
192
193 fn find_column_index(&self, table: &DataTable, col_name: &str) -> Result<usize> {
195 let col_name = if let Some(dot_pos) = col_name.rfind('.') {
197 &col_name[dot_pos + 1..]
198 } else {
199 col_name
200 };
201
202 debug!(
203 "Looking for column '{}' in table with columns: {:?}",
204 col_name,
205 table.column_names()
206 );
207
208 table
209 .columns
210 .iter()
211 .position(|col| {
212 if self.case_insensitive {
213 col.name.to_lowercase() == col_name.to_lowercase()
214 } else {
215 col.name == col_name
216 }
217 })
218 .ok_or_else(|| anyhow!("Column '{}' not found in table", col_name))
219 }
220
221 fn hash_join_inner(
223 &self,
224 left_table: Arc<DataTable>,
225 right_table: Arc<DataTable>,
226 left_col_idx: usize,
227 right_col_idx: usize,
228 _left_col_name: &str,
229 _right_col_name: &str,
230 join_alias: &Option<String>,
231 ) -> Result<DataTable> {
232 let start = std::time::Instant::now();
233
234 let (build_table, probe_table, build_col_idx, probe_col_idx, build_is_left) =
236 if left_table.row_count() <= right_table.row_count() {
237 (
238 left_table.clone(),
239 right_table.clone(),
240 left_col_idx,
241 right_col_idx,
242 true,
243 )
244 } else {
245 (
246 right_table.clone(),
247 left_table.clone(),
248 right_col_idx,
249 left_col_idx,
250 false,
251 )
252 };
253
254 debug!(
255 "Building hash index on {} table ({} rows)",
256 if build_is_left { "left" } else { "right" },
257 build_table.row_count()
258 );
259
260 let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
262 for (row_idx, row) in build_table.rows.iter().enumerate() {
263 let key = row.values[build_col_idx].clone();
264 hash_index.entry(key).or_default().push(row_idx);
265 }
266
267 debug!(
268 "Hash index built with {} unique keys in {:?}",
269 hash_index.len(),
270 start.elapsed()
271 );
272
273 let mut result = DataTable::new("joined");
275
276 for col in &left_table.columns {
278 result.add_column(DataColumn {
279 name: col.name.clone(),
280 data_type: col.data_type.clone(),
281 nullable: col.nullable,
282 unique_values: col.unique_values,
283 null_count: col.null_count,
284 metadata: col.metadata.clone(),
285 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
288 }
289
290 for col in &right_table.columns {
292 if !left_table
294 .columns
295 .iter()
296 .any(|left_col| left_col.name == col.name)
297 {
298 result.add_column(DataColumn {
299 name: col.name.clone(),
300 data_type: col.data_type.clone(),
301 nullable: col.nullable,
302 unique_values: col.unique_values,
303 null_count: col.null_count,
304 metadata: col.metadata.clone(),
305 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
308 } else {
309 let (column_name, qualified_name) = if let Some(alias) = join_alias {
311 (
313 format!("{}.{}", alias, col.name),
314 Some(format!("{}.{}", alias, col.name)),
315 )
316 } else {
317 (format!("{}_right", col.name), col.qualified_name.clone())
319 };
320 result.add_column(DataColumn {
321 name: column_name,
322 data_type: col.data_type.clone(),
323 nullable: col.nullable,
324 unique_values: col.unique_values,
325 null_count: col.null_count,
326 metadata: col.metadata.clone(),
327 qualified_name,
328 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
329 });
330 }
331 }
332
333 debug!(
334 "Joined table will have {} columns: {:?}",
335 result.column_count(),
336 result.column_names()
337 );
338
339 let mut match_count = 0;
341 for probe_row in &probe_table.rows {
342 let probe_key = &probe_row.values[probe_col_idx];
343
344 if let Some(matching_indices) = hash_index.get(probe_key) {
345 for &build_idx in matching_indices {
346 let build_row = &build_table.rows[build_idx];
347
348 let mut joined_row = DataRow { values: Vec::new() };
350
351 if build_is_left {
352 joined_row.values.extend_from_slice(&build_row.values);
354 joined_row.values.extend_from_slice(&probe_row.values);
355 } else {
356 joined_row.values.extend_from_slice(&probe_row.values);
358 joined_row.values.extend_from_slice(&build_row.values);
359 }
360
361 result.add_row(joined_row);
362 match_count += 1;
363 }
364 }
365 }
366
367 let qualified_cols: Vec<String> = result
369 .columns
370 .iter()
371 .filter_map(|c| c.qualified_name.clone())
372 .collect();
373
374 info!(
375 "INNER JOIN complete: {} matches found in {:?}. Result has {} columns ({} qualified: {:?})",
376 match_count,
377 start.elapsed(),
378 result.columns.len(),
379 qualified_cols.len(),
380 qualified_cols
381 );
382
383 Ok(result)
384 }
385
386 fn hash_join_left(
388 &self,
389 left_table: Arc<DataTable>,
390 right_table: Arc<DataTable>,
391 left_col_idx: usize,
392 right_col_idx: usize,
393 _left_col_name: &str,
394 _right_col_name: &str,
395 join_alias: &Option<String>,
396 ) -> Result<DataTable> {
397 let start = std::time::Instant::now();
398
399 debug!(
400 "Building hash index on right table ({} rows)",
401 right_table.row_count()
402 );
403
404 let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
406 for (row_idx, row) in right_table.rows.iter().enumerate() {
407 let key = row.values[right_col_idx].clone();
408 hash_index.entry(key).or_default().push(row_idx);
409 }
410
411 let mut result = DataTable::new("joined");
413
414 for col in &left_table.columns {
416 result.add_column(DataColumn {
417 name: col.name.clone(),
418 data_type: col.data_type.clone(),
419 nullable: col.nullable,
420 unique_values: col.unique_values,
421 null_count: col.null_count,
422 metadata: col.metadata.clone(),
423 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
426 }
427
428 for col in &right_table.columns {
430 if !left_table
432 .columns
433 .iter()
434 .any(|left_col| left_col.name == col.name)
435 {
436 result.add_column(DataColumn {
437 name: col.name.clone(),
438 data_type: col.data_type.clone(),
439 nullable: true, unique_values: col.unique_values,
441 null_count: col.null_count,
442 metadata: col.metadata.clone(),
443 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
446 } else {
447 let (column_name, qualified_name) = if let Some(alias) = join_alias {
449 (
451 format!("{}.{}", alias, col.name),
452 Some(format!("{}.{}", alias, col.name)),
453 )
454 } else {
455 (format!("{}_right", col.name), col.qualified_name.clone())
457 };
458 result.add_column(DataColumn {
459 name: column_name,
460 data_type: col.data_type.clone(),
461 nullable: true, unique_values: col.unique_values,
463 null_count: col.null_count,
464 metadata: col.metadata.clone(),
465 qualified_name,
466 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
467 });
468 }
469 }
470
471 debug!(
472 "LEFT JOIN table will have {} columns: {:?}",
473 result.column_count(),
474 result.column_names()
475 );
476
477 let mut match_count = 0;
479 let mut null_count = 0;
480
481 for left_row in &left_table.rows {
482 let left_key = &left_row.values[left_col_idx];
483
484 if let Some(matching_indices) = hash_index.get(left_key) {
485 for &right_idx in matching_indices {
487 let right_row = &right_table.rows[right_idx];
488
489 let mut joined_row = DataRow { values: Vec::new() };
490 joined_row.values.extend_from_slice(&left_row.values);
491 joined_row.values.extend_from_slice(&right_row.values);
492
493 result.add_row(joined_row);
494 match_count += 1;
495 }
496 } else {
497 let mut joined_row = DataRow { values: Vec::new() };
499 joined_row.values.extend_from_slice(&left_row.values);
500
501 for _ in 0..right_table.column_count() {
503 joined_row.values.push(DataValue::Null);
504 }
505
506 result.add_row(joined_row);
507 null_count += 1;
508 }
509 }
510
511 let qualified_cols: Vec<String> = result
513 .columns
514 .iter()
515 .filter_map(|c| c.qualified_name.clone())
516 .collect();
517
518 info!(
519 "LEFT JOIN complete: {} matches, {} nulls in {:?}. Result has {} columns ({} qualified: {:?})",
520 match_count,
521 null_count,
522 start.elapsed(),
523 result.columns.len(),
524 qualified_cols.len(),
525 qualified_cols
526 );
527
528 Ok(result)
529 }
530
531 fn cross_join(
533 &self,
534 left_table: Arc<DataTable>,
535 right_table: Arc<DataTable>,
536 ) -> Result<DataTable> {
537 let start = std::time::Instant::now();
538
539 let result_rows = left_table.row_count() * right_table.row_count();
541 if result_rows > 1_000_000 {
542 return Err(anyhow!(
543 "CROSS JOIN would produce {} rows, which exceeds the safety limit",
544 result_rows
545 ));
546 }
547
548 let mut result = DataTable::new("joined");
550
551 for col in &left_table.columns {
553 result.add_column(col.clone());
554 }
555 for col in &right_table.columns {
556 result.add_column(col.clone());
557 }
558
559 for left_row in &left_table.rows {
561 for right_row in &right_table.rows {
562 let mut joined_row = DataRow { values: Vec::new() };
563 joined_row.values.extend_from_slice(&left_row.values);
564 joined_row.values.extend_from_slice(&right_row.values);
565 result.add_row(joined_row);
566 }
567 }
568
569 info!(
570 "CROSS JOIN complete: {} rows in {:?}",
571 result.row_count(),
572 start.elapsed()
573 );
574
575 Ok(result)
576 }
577
578 fn qualify_column_name(
580 &self,
581 col_name: &str,
582 table_side: &str,
583 left_join_col: &str,
584 right_join_col: &str,
585 ) -> String {
586 let base_name = if let Some(dot_pos) = col_name.rfind('.') {
588 &col_name[dot_pos + 1..]
589 } else {
590 col_name
591 };
592
593 let left_base = if let Some(dot_pos) = left_join_col.rfind('.') {
594 &left_join_col[dot_pos + 1..]
595 } else {
596 left_join_col
597 };
598
599 let right_base = if let Some(dot_pos) = right_join_col.rfind('.') {
600 &right_join_col[dot_pos + 1..]
601 } else {
602 right_join_col
603 };
604
605 if base_name == left_base || base_name == right_base {
607 format!("{}_{}", table_side, base_name)
608 } else {
609 col_name.to_string()
610 }
611 }
612
613 fn reverse_operator(&self, op: &JoinOperator) -> JoinOperator {
615 match op {
616 JoinOperator::Equal => JoinOperator::Equal,
617 JoinOperator::NotEqual => JoinOperator::NotEqual,
618 JoinOperator::LessThan => JoinOperator::GreaterThan,
619 JoinOperator::GreaterThan => JoinOperator::LessThan,
620 JoinOperator::LessThanOrEqual => JoinOperator::GreaterThanOrEqual,
621 JoinOperator::GreaterThanOrEqual => JoinOperator::LessThanOrEqual,
622 }
623 }
624
625 fn compare_values(&self, left: &DataValue, right: &DataValue, op: &JoinOperator) -> bool {
627 match op {
628 JoinOperator::Equal => left == right,
629 JoinOperator::NotEqual => left != right,
630 JoinOperator::LessThan => left < right,
631 JoinOperator::GreaterThan => left > right,
632 JoinOperator::LessThanOrEqual => left <= right,
633 JoinOperator::GreaterThanOrEqual => left >= right,
634 }
635 }
636
637 fn nested_loop_join_inner(
639 &self,
640 left_table: Arc<DataTable>,
641 right_table: Arc<DataTable>,
642 left_col_idx: usize,
643 right_col_idx: usize,
644 operator: &JoinOperator,
645 join_alias: &Option<String>,
646 ) -> Result<DataTable> {
647 let start = std::time::Instant::now();
648
649 info!(
650 "Executing nested loop INNER JOIN with {:?} operator: {} x {} rows",
651 operator,
652 left_table.row_count(),
653 right_table.row_count()
654 );
655
656 let mut result = DataTable::new("joined");
658
659 for col in &left_table.columns {
661 result.add_column(DataColumn {
662 name: col.name.clone(),
663 data_type: col.data_type.clone(),
664 nullable: col.nullable,
665 unique_values: col.unique_values,
666 null_count: col.null_count,
667 metadata: col.metadata.clone(),
668 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
671 }
672
673 for col in &right_table.columns {
675 if !left_table
676 .columns
677 .iter()
678 .any(|left_col| left_col.name == col.name)
679 {
680 result.add_column(DataColumn {
681 name: col.name.clone(),
682 data_type: col.data_type.clone(),
683 nullable: col.nullable,
684 unique_values: col.unique_values,
685 null_count: col.null_count,
686 metadata: col.metadata.clone(),
687 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
690 } else {
691 let (column_name, qualified_name) = if let Some(alias) = join_alias {
692 (
694 format!("{}.{}", alias, col.name),
695 Some(format!("{}.{}", alias, col.name)),
696 )
697 } else {
698 (format!("{}_right", col.name), col.qualified_name.clone())
700 };
701 result.add_column(DataColumn {
702 name: column_name,
703 data_type: col.data_type.clone(),
704 nullable: col.nullable,
705 unique_values: col.unique_values,
706 null_count: col.null_count,
707 metadata: col.metadata.clone(),
708 qualified_name,
709 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
710 });
711 }
712 }
713
714 let mut match_count = 0;
716 for left_row in &left_table.rows {
717 let left_value = &left_row.values[left_col_idx];
718
719 for right_row in &right_table.rows {
720 let right_value = &right_row.values[right_col_idx];
721
722 if self.compare_values(left_value, right_value, operator) {
723 let mut joined_row = DataRow { values: Vec::new() };
724 joined_row.values.extend_from_slice(&left_row.values);
725 joined_row.values.extend_from_slice(&right_row.values);
726 result.add_row(joined_row);
727 match_count += 1;
728 }
729 }
730 }
731
732 info!(
733 "Nested loop INNER JOIN complete: {} matches found in {:?}",
734 match_count,
735 start.elapsed()
736 );
737
738 Ok(result)
739 }
740
741 fn nested_loop_join_inner_multi(
743 &self,
744 left_table: Arc<DataTable>,
745 right_table: Arc<DataTable>,
746 conditions: Vec<(usize, usize, JoinOperator)>,
747 join_alias: &Option<String>,
748 ) -> Result<DataTable> {
749 let start = std::time::Instant::now();
750
751 info!(
752 "Executing nested loop INNER JOIN with {} conditions: {} x {} rows",
753 conditions.len(),
754 left_table.row_count(),
755 right_table.row_count()
756 );
757
758 let mut result = DataTable::new("joined");
760
761 for col in &left_table.columns {
763 result.add_column(DataColumn {
764 name: col.name.clone(),
765 data_type: col.data_type.clone(),
766 nullable: col.nullable,
767 unique_values: col.unique_values,
768 null_count: col.null_count,
769 metadata: col.metadata.clone(),
770 qualified_name: col.qualified_name.clone(),
771 source_table: col.source_table.clone(),
772 });
773 }
774
775 for col in &right_table.columns {
777 if !left_table
778 .columns
779 .iter()
780 .any(|left_col| left_col.name == col.name)
781 {
782 result.add_column(DataColumn {
783 name: col.name.clone(),
784 data_type: col.data_type.clone(),
785 nullable: col.nullable,
786 unique_values: col.unique_values,
787 null_count: col.null_count,
788 metadata: col.metadata.clone(),
789 qualified_name: col.qualified_name.clone(),
790 source_table: col.source_table.clone(),
791 });
792 } else {
793 let (column_name, qualified_name) = if let Some(alias) = join_alias {
794 (
795 format!("{}.{}", alias, col.name),
796 Some(format!("{}.{}", alias, col.name)),
797 )
798 } else {
799 (format!("{}_right", col.name), col.qualified_name.clone())
800 };
801 result.add_column(DataColumn {
802 name: column_name,
803 data_type: col.data_type.clone(),
804 nullable: col.nullable,
805 unique_values: col.unique_values,
806 null_count: col.null_count,
807 metadata: col.metadata.clone(),
808 qualified_name,
809 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
810 });
811 }
812 }
813
814 let mut match_count = 0;
816 for left_row in &left_table.rows {
817 for right_row in &right_table.rows {
818 let mut all_conditions_met = true;
820 for (left_idx, right_idx, operator) in &conditions {
821 let left_value = &left_row.values[*left_idx];
822 let right_value = &right_row.values[*right_idx];
823
824 if !self.compare_values(left_value, right_value, operator) {
825 all_conditions_met = false;
826 break;
827 }
828 }
829
830 if all_conditions_met {
831 let mut joined_row = DataRow { values: Vec::new() };
832 joined_row.values.extend_from_slice(&left_row.values);
833 joined_row.values.extend_from_slice(&right_row.values);
834 result.add_row(joined_row);
835 match_count += 1;
836 }
837 }
838 }
839
840 info!(
841 "Nested loop INNER JOIN complete: {} matches found in {:?}",
842 match_count,
843 start.elapsed()
844 );
845
846 Ok(result)
847 }
848
849 fn nested_loop_join_left_multi(
851 &self,
852 left_table: Arc<DataTable>,
853 right_table: Arc<DataTable>,
854 conditions: Vec<(usize, usize, JoinOperator)>,
855 join_alias: &Option<String>,
856 ) -> Result<DataTable> {
857 let start = std::time::Instant::now();
858
859 info!(
860 "Executing nested loop LEFT JOIN with {} conditions: {} x {} rows",
861 conditions.len(),
862 left_table.row_count(),
863 right_table.row_count()
864 );
865
866 let mut result = DataTable::new("joined");
868
869 for col in &left_table.columns {
871 result.add_column(DataColumn {
872 name: col.name.clone(),
873 data_type: col.data_type.clone(),
874 nullable: col.nullable,
875 unique_values: col.unique_values,
876 null_count: col.null_count,
877 metadata: col.metadata.clone(),
878 qualified_name: col.qualified_name.clone(),
879 source_table: col.source_table.clone(),
880 });
881 }
882
883 for col in &right_table.columns {
885 if !left_table
886 .columns
887 .iter()
888 .any(|left_col| left_col.name == col.name)
889 {
890 result.add_column(DataColumn {
891 name: col.name.clone(),
892 data_type: col.data_type.clone(),
893 nullable: true, unique_values: col.unique_values,
895 null_count: col.null_count,
896 metadata: col.metadata.clone(),
897 qualified_name: col.qualified_name.clone(),
898 source_table: col.source_table.clone(),
899 });
900 } else {
901 let (column_name, qualified_name) = if let Some(alias) = join_alias {
902 (
903 format!("{}.{}", alias, col.name),
904 Some(format!("{}.{}", alias, col.name)),
905 )
906 } else {
907 (format!("{}_right", col.name), col.qualified_name.clone())
908 };
909 result.add_column(DataColumn {
910 name: column_name,
911 data_type: col.data_type.clone(),
912 nullable: true, unique_values: col.unique_values,
914 null_count: col.null_count,
915 metadata: col.metadata.clone(),
916 qualified_name,
917 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
918 });
919 }
920 }
921
922 let mut match_count = 0;
924 let mut null_count = 0;
925
926 for left_row in &left_table.rows {
927 let mut found_match = false;
928
929 for right_row in &right_table.rows {
930 let mut all_conditions_met = true;
932 for (left_idx, right_idx, operator) in &conditions {
933 let left_value = &left_row.values[*left_idx];
934 let right_value = &right_row.values[*right_idx];
935
936 if !self.compare_values(left_value, right_value, operator) {
937 all_conditions_met = false;
938 break;
939 }
940 }
941
942 if all_conditions_met {
943 let mut joined_row = DataRow { values: Vec::new() };
944 joined_row.values.extend_from_slice(&left_row.values);
945 joined_row.values.extend_from_slice(&right_row.values);
946 result.add_row(joined_row);
947 match_count += 1;
948 found_match = true;
949 }
950 }
951
952 if !found_match {
954 let mut joined_row = DataRow { values: Vec::new() };
955 joined_row.values.extend_from_slice(&left_row.values);
956 for _ in 0..right_table.column_count() {
957 joined_row.values.push(DataValue::Null);
958 }
959 result.add_row(joined_row);
960 null_count += 1;
961 }
962 }
963
964 info!(
965 "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
966 match_count,
967 null_count,
968 start.elapsed()
969 );
970
971 Ok(result)
972 }
973
974 fn nested_loop_join_left(
976 &self,
977 left_table: Arc<DataTable>,
978 right_table: Arc<DataTable>,
979 left_col_idx: usize,
980 right_col_idx: usize,
981 operator: &JoinOperator,
982 join_alias: &Option<String>,
983 ) -> Result<DataTable> {
984 let start = std::time::Instant::now();
985
986 info!(
987 "Executing nested loop LEFT JOIN with {:?} operator: {} x {} rows",
988 operator,
989 left_table.row_count(),
990 right_table.row_count()
991 );
992
993 let mut result = DataTable::new("joined");
995
996 for col in &left_table.columns {
998 result.add_column(DataColumn {
999 name: col.name.clone(),
1000 data_type: col.data_type.clone(),
1001 nullable: col.nullable,
1002 unique_values: col.unique_values,
1003 null_count: col.null_count,
1004 metadata: col.metadata.clone(),
1005 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
1008 }
1009
1010 for col in &right_table.columns {
1012 if !left_table
1013 .columns
1014 .iter()
1015 .any(|left_col| left_col.name == col.name)
1016 {
1017 result.add_column(DataColumn {
1018 name: col.name.clone(),
1019 data_type: col.data_type.clone(),
1020 nullable: true, unique_values: col.unique_values,
1022 null_count: col.null_count,
1023 metadata: col.metadata.clone(),
1024 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
1027 } else {
1028 let (column_name, qualified_name) = if let Some(alias) = join_alias {
1029 (
1031 format!("{}.{}", alias, col.name),
1032 Some(format!("{}.{}", alias, col.name)),
1033 )
1034 } else {
1035 (format!("{}_right", col.name), col.qualified_name.clone())
1037 };
1038 result.add_column(DataColumn {
1039 name: column_name,
1040 data_type: col.data_type.clone(),
1041 nullable: true, unique_values: col.unique_values,
1043 null_count: col.null_count,
1044 metadata: col.metadata.clone(),
1045 qualified_name,
1046 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
1047 });
1048 }
1049 }
1050
1051 let mut match_count = 0;
1053 let mut null_count = 0;
1054
1055 for left_row in &left_table.rows {
1056 let left_value = &left_row.values[left_col_idx];
1057 let mut found_match = false;
1058
1059 for right_row in &right_table.rows {
1060 let right_value = &right_row.values[right_col_idx];
1061
1062 if self.compare_values(left_value, right_value, operator) {
1063 let mut joined_row = DataRow { values: Vec::new() };
1064 joined_row.values.extend_from_slice(&left_row.values);
1065 joined_row.values.extend_from_slice(&right_row.values);
1066 result.add_row(joined_row);
1067 match_count += 1;
1068 found_match = true;
1069 }
1070 }
1071
1072 if !found_match {
1074 let mut joined_row = DataRow { values: Vec::new() };
1075 joined_row.values.extend_from_slice(&left_row.values);
1076 for _ in 0..right_table.column_count() {
1077 joined_row.values.push(DataValue::Null);
1078 }
1079 result.add_row(joined_row);
1080 null_count += 1;
1081 }
1082 }
1083
1084 info!(
1085 "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
1086 match_count,
1087 null_count,
1088 start.elapsed()
1089 );
1090
1091 Ok(result)
1092 }
1093}