1use anyhow::{anyhow, Result};
4use std::collections::HashMap;
5use std::sync::Arc;
6use tracing::{debug, info};
7
8use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
9use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
10use crate::sql::parser::ast::{JoinClause, JoinOperator, JoinType};
11use crate::sql::recursive_parser::SqlExpression;
12
/// Executes SQL-style joins between two in-memory `DataTable`s.
///
/// The executor picks a hash-based strategy for a single equality condition
/// on plain columns and falls back to nested-loop evaluation otherwise.
pub struct HashJoinExecutor {
    /// When `true`, join column names are matched against table columns by
    /// comparing their lowercased forms instead of requiring an exact match.
    case_insensitive: bool,
}
17
18impl HashJoinExecutor {
19 pub fn new(case_insensitive: bool) -> Self {
20 Self { case_insensitive }
21 }
22
23 pub fn execute_join(
25 &self,
26 left_table: Arc<DataTable>,
27 join_clause: &JoinClause,
28 right_table: Arc<DataTable>,
29 ) -> Result<DataTable> {
30 info!(
31 "Executing {:?} JOIN: {} rows x {} rows with {} conditions",
32 join_clause.join_type,
33 left_table.row_count(),
34 right_table.row_count(),
35 join_clause.condition.conditions.len()
36 );
37
38 let mut condition_indices = Vec::new();
41 let mut all_equal = true;
42 let mut has_complex_expr = false;
43
44 for single_condition in &join_clause.condition.conditions {
45 let left_col_name = Self::extract_simple_column_name(&single_condition.left_expr);
47 let right_col_name = Self::extract_simple_column_name(&single_condition.right_expr);
48
49 if left_col_name.is_none() || right_col_name.is_none() {
50 has_complex_expr = true;
52 all_equal = false; break;
54 }
55
56 let (left_col_idx, right_col_idx) = self.resolve_join_columns(
57 &left_table,
58 &right_table,
59 &left_col_name.unwrap(),
60 &right_col_name.unwrap(),
61 )?;
62
63 if single_condition.operator != JoinOperator::Equal {
64 all_equal = false;
65 }
66
67 condition_indices.push((
68 left_col_idx,
69 right_col_idx,
70 single_condition.operator.clone(),
71 ));
72 }
73
74 let use_hash_join = all_equal && !has_complex_expr;
78
79 match join_clause.join_type {
81 JoinType::Inner => {
82 if use_hash_join && condition_indices.len() == 1 {
83 let (left_col_idx, right_col_idx, _) = condition_indices[0];
85 let left_col_name = Self::extract_simple_column_name(
86 &join_clause.condition.conditions[0].left_expr,
87 )
88 .expect("left_expr should be a simple column in hash join path");
89 let right_col_name = Self::extract_simple_column_name(
90 &join_clause.condition.conditions[0].right_expr,
91 )
92 .expect("right_expr should be a simple column in hash join path");
93 self.hash_join_inner(
94 left_table,
95 right_table,
96 left_col_idx,
97 right_col_idx,
98 &left_col_name,
99 &right_col_name,
100 &join_clause.alias,
101 )
102 } else {
103 self.nested_loop_join_inner_multi(
105 left_table,
106 right_table,
107 &join_clause.condition.conditions,
108 &join_clause.alias,
109 )
110 }
111 }
112 JoinType::Left => {
113 if use_hash_join && condition_indices.len() == 1 {
114 let (left_col_idx, right_col_idx, _) = condition_indices[0];
116 let left_col_name = Self::extract_simple_column_name(
117 &join_clause.condition.conditions[0].left_expr,
118 )
119 .expect("left_expr should be a simple column in hash join path");
120 let right_col_name = Self::extract_simple_column_name(
121 &join_clause.condition.conditions[0].right_expr,
122 )
123 .expect("right_expr should be a simple column in hash join path");
124 self.hash_join_left(
125 left_table,
126 right_table,
127 left_col_idx,
128 right_col_idx,
129 &left_col_name,
130 &right_col_name,
131 &join_clause.alias,
132 )
133 } else {
134 self.nested_loop_join_left_multi(
136 left_table,
137 right_table,
138 &join_clause.condition.conditions,
139 &join_clause.alias,
140 )
141 }
142 }
143 JoinType::Right => {
144 let swapped_indices: Vec<(usize, usize, JoinOperator)> = condition_indices
146 .into_iter()
147 .map(|(l, r, op)| (r, l, self.reverse_operator(&op)))
148 .collect();
149
150 if use_hash_join && swapped_indices.len() == 1 {
151 let (right_col_idx, left_col_idx, _) = swapped_indices[0];
153 let left_col_name = Self::extract_simple_column_name(
154 &join_clause.condition.conditions[0].left_expr,
155 )
156 .expect("left_expr should be a simple column in hash join path");
157 let right_col_name = Self::extract_simple_column_name(
158 &join_clause.condition.conditions[0].right_expr,
159 )
160 .expect("right_expr should be a simple column in hash join path");
161 self.hash_join_left(
162 right_table,
163 left_table,
164 right_col_idx,
165 left_col_idx,
166 &right_col_name,
167 &left_col_name,
168 &join_clause.alias,
169 )
170 } else {
171 self.nested_loop_join_left_multi(
174 right_table,
175 left_table,
176 &join_clause.condition.conditions,
177 &join_clause.alias,
178 )
179 }
180 }
181 JoinType::Cross => self.cross_join(left_table, right_table),
182 JoinType::Full => {
183 return Err(anyhow!("FULL OUTER JOIN not yet implemented"));
184 }
185 }
186 }
187
188 fn extract_simple_column_name(expr: &SqlExpression) -> Option<String> {
191 match expr {
192 SqlExpression::Column(col_ref) => {
193 if let Some(table_prefix) = &col_ref.table_prefix {
195 Some(format!("{}.{}", table_prefix, col_ref.name))
196 } else {
197 Some(col_ref.name.clone())
198 }
199 }
200 _ => None, }
202 }
203
204 fn resolve_join_columns(
206 &self,
207 left_table: &DataTable,
208 right_table: &DataTable,
209 left_col_name: &str,
210 right_col_name: &str,
211 ) -> Result<(usize, usize)> {
212 let left_col_idx = if let Ok(idx) = self.find_column_index(left_table, left_col_name) {
214 idx
215 } else if let Ok(_idx) = self.find_column_index(right_table, left_col_name) {
216 return Err(anyhow!(
219 "Column '{}' found in right table but specified as left operand. \
220 Please rewrite the condition with columns in correct positions.",
221 left_col_name
222 ));
223 } else {
224 return Err(anyhow!(
225 "Column '{}' not found in either table",
226 left_col_name
227 ));
228 };
229
230 let right_col_idx = if let Ok(idx) = self.find_column_index(right_table, right_col_name) {
232 idx
233 } else if let Ok(_idx) = self.find_column_index(left_table, right_col_name) {
234 return Err(anyhow!(
237 "Column '{}' found in left table but specified as right operand. \
238 Please rewrite the condition with columns in correct positions.",
239 right_col_name
240 ));
241 } else {
242 return Err(anyhow!(
243 "Column '{}' not found in either table",
244 right_col_name
245 ));
246 };
247
248 Ok((left_col_idx, right_col_idx))
249 }
250
251 fn find_column_index(&self, table: &DataTable, col_name: &str) -> Result<usize> {
253 let col_name = if let Some(dot_pos) = col_name.rfind('.') {
255 &col_name[dot_pos + 1..]
256 } else {
257 col_name
258 };
259
260 debug!(
261 "Looking for column '{}' in table with columns: {:?}",
262 col_name,
263 table.column_names()
264 );
265
266 table
267 .columns
268 .iter()
269 .position(|col| {
270 if self.case_insensitive {
271 col.name.to_lowercase() == col_name.to_lowercase()
272 } else {
273 col.name == col_name
274 }
275 })
276 .ok_or_else(|| anyhow!("Column '{}' not found in table", col_name))
277 }
278
279 fn hash_join_inner(
281 &self,
282 left_table: Arc<DataTable>,
283 right_table: Arc<DataTable>,
284 left_col_idx: usize,
285 right_col_idx: usize,
286 _left_col_name: &str,
287 _right_col_name: &str,
288 join_alias: &Option<String>,
289 ) -> Result<DataTable> {
290 let start = std::time::Instant::now();
291
292 let (build_table, probe_table, build_col_idx, probe_col_idx, build_is_left) =
294 if left_table.row_count() <= right_table.row_count() {
295 (
296 left_table.clone(),
297 right_table.clone(),
298 left_col_idx,
299 right_col_idx,
300 true,
301 )
302 } else {
303 (
304 right_table.clone(),
305 left_table.clone(),
306 right_col_idx,
307 left_col_idx,
308 false,
309 )
310 };
311
312 debug!(
313 "Building hash index on {} table ({} rows)",
314 if build_is_left { "left" } else { "right" },
315 build_table.row_count()
316 );
317
318 let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
320 for (row_idx, row) in build_table.rows.iter().enumerate() {
321 let key = row.values[build_col_idx].clone();
322 hash_index.entry(key).or_default().push(row_idx);
323 }
324
325 debug!(
326 "Hash index built with {} unique keys in {:?}",
327 hash_index.len(),
328 start.elapsed()
329 );
330
331 let mut result = DataTable::new("joined");
333
334 for col in &left_table.columns {
336 result.add_column(DataColumn {
337 name: col.name.clone(),
338 data_type: col.data_type.clone(),
339 nullable: col.nullable,
340 unique_values: col.unique_values,
341 null_count: col.null_count,
342 metadata: col.metadata.clone(),
343 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
346 }
347
348 for col in &right_table.columns {
350 if !left_table
352 .columns
353 .iter()
354 .any(|left_col| left_col.name == col.name)
355 {
356 result.add_column(DataColumn {
357 name: col.name.clone(),
358 data_type: col.data_type.clone(),
359 nullable: col.nullable,
360 unique_values: col.unique_values,
361 null_count: col.null_count,
362 metadata: col.metadata.clone(),
363 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
366 } else {
367 let (column_name, qualified_name) = if let Some(alias) = join_alias {
369 (
371 format!("{}.{}", alias, col.name),
372 Some(format!("{}.{}", alias, col.name)),
373 )
374 } else {
375 (format!("{}_right", col.name), col.qualified_name.clone())
377 };
378 result.add_column(DataColumn {
379 name: column_name,
380 data_type: col.data_type.clone(),
381 nullable: col.nullable,
382 unique_values: col.unique_values,
383 null_count: col.null_count,
384 metadata: col.metadata.clone(),
385 qualified_name,
386 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
387 });
388 }
389 }
390
391 debug!(
392 "Joined table will have {} columns: {:?}",
393 result.column_count(),
394 result.column_names()
395 );
396
397 let mut match_count = 0;
399 for probe_row in &probe_table.rows {
400 let probe_key = &probe_row.values[probe_col_idx];
401
402 if let Some(matching_indices) = hash_index.get(probe_key) {
403 for &build_idx in matching_indices {
404 let build_row = &build_table.rows[build_idx];
405
406 let mut joined_row = DataRow { values: Vec::new() };
408
409 if build_is_left {
410 joined_row.values.extend_from_slice(&build_row.values);
412 joined_row.values.extend_from_slice(&probe_row.values);
413 } else {
414 joined_row.values.extend_from_slice(&probe_row.values);
416 joined_row.values.extend_from_slice(&build_row.values);
417 }
418
419 result.add_row(joined_row);
420 match_count += 1;
421 }
422 }
423 }
424
425 let qualified_cols: Vec<String> = result
427 .columns
428 .iter()
429 .filter_map(|c| c.qualified_name.clone())
430 .collect();
431
432 info!(
433 "INNER JOIN complete: {} matches found in {:?}. Result has {} columns ({} qualified: {:?})",
434 match_count,
435 start.elapsed(),
436 result.columns.len(),
437 qualified_cols.len(),
438 qualified_cols
439 );
440
441 Ok(result)
442 }
443
444 fn hash_join_left(
446 &self,
447 left_table: Arc<DataTable>,
448 right_table: Arc<DataTable>,
449 left_col_idx: usize,
450 right_col_idx: usize,
451 _left_col_name: &str,
452 _right_col_name: &str,
453 join_alias: &Option<String>,
454 ) -> Result<DataTable> {
455 let start = std::time::Instant::now();
456
457 debug!(
458 "Building hash index on right table ({} rows)",
459 right_table.row_count()
460 );
461
462 let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
464 for (row_idx, row) in right_table.rows.iter().enumerate() {
465 let key = row.values[right_col_idx].clone();
466 hash_index.entry(key).or_default().push(row_idx);
467 }
468
469 let mut result = DataTable::new("joined");
471
472 for col in &left_table.columns {
474 result.add_column(DataColumn {
475 name: col.name.clone(),
476 data_type: col.data_type.clone(),
477 nullable: col.nullable,
478 unique_values: col.unique_values,
479 null_count: col.null_count,
480 metadata: col.metadata.clone(),
481 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
484 }
485
486 for col in &right_table.columns {
488 if !left_table
490 .columns
491 .iter()
492 .any(|left_col| left_col.name == col.name)
493 {
494 result.add_column(DataColumn {
495 name: col.name.clone(),
496 data_type: col.data_type.clone(),
497 nullable: true, unique_values: col.unique_values,
499 null_count: col.null_count,
500 metadata: col.metadata.clone(),
501 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
504 } else {
505 let (column_name, qualified_name) = if let Some(alias) = join_alias {
507 (
509 format!("{}.{}", alias, col.name),
510 Some(format!("{}.{}", alias, col.name)),
511 )
512 } else {
513 (format!("{}_right", col.name), col.qualified_name.clone())
515 };
516 result.add_column(DataColumn {
517 name: column_name,
518 data_type: col.data_type.clone(),
519 nullable: true, unique_values: col.unique_values,
521 null_count: col.null_count,
522 metadata: col.metadata.clone(),
523 qualified_name,
524 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
525 });
526 }
527 }
528
529 debug!(
530 "LEFT JOIN table will have {} columns: {:?}",
531 result.column_count(),
532 result.column_names()
533 );
534
535 let mut match_count = 0;
537 let mut null_count = 0;
538
539 for left_row in &left_table.rows {
540 let left_key = &left_row.values[left_col_idx];
541
542 if let Some(matching_indices) = hash_index.get(left_key) {
543 for &right_idx in matching_indices {
545 let right_row = &right_table.rows[right_idx];
546
547 let mut joined_row = DataRow { values: Vec::new() };
548 joined_row.values.extend_from_slice(&left_row.values);
549 joined_row.values.extend_from_slice(&right_row.values);
550
551 result.add_row(joined_row);
552 match_count += 1;
553 }
554 } else {
555 let mut joined_row = DataRow { values: Vec::new() };
557 joined_row.values.extend_from_slice(&left_row.values);
558
559 for _ in 0..right_table.column_count() {
561 joined_row.values.push(DataValue::Null);
562 }
563
564 result.add_row(joined_row);
565 null_count += 1;
566 }
567 }
568
569 let qualified_cols: Vec<String> = result
571 .columns
572 .iter()
573 .filter_map(|c| c.qualified_name.clone())
574 .collect();
575
576 info!(
577 "LEFT JOIN complete: {} matches, {} nulls in {:?}. Result has {} columns ({} qualified: {:?})",
578 match_count,
579 null_count,
580 start.elapsed(),
581 result.columns.len(),
582 qualified_cols.len(),
583 qualified_cols
584 );
585
586 Ok(result)
587 }
588
589 fn cross_join(
591 &self,
592 left_table: Arc<DataTable>,
593 right_table: Arc<DataTable>,
594 ) -> Result<DataTable> {
595 let start = std::time::Instant::now();
596
597 let result_rows = left_table.row_count() * right_table.row_count();
599 if result_rows > 1_000_000 {
600 return Err(anyhow!(
601 "CROSS JOIN would produce {} rows, which exceeds the safety limit",
602 result_rows
603 ));
604 }
605
606 let mut result = DataTable::new("joined");
608
609 for col in &left_table.columns {
611 result.add_column(col.clone());
612 }
613 for col in &right_table.columns {
614 result.add_column(col.clone());
615 }
616
617 for left_row in &left_table.rows {
619 for right_row in &right_table.rows {
620 let mut joined_row = DataRow { values: Vec::new() };
621 joined_row.values.extend_from_slice(&left_row.values);
622 joined_row.values.extend_from_slice(&right_row.values);
623 result.add_row(joined_row);
624 }
625 }
626
627 info!(
628 "CROSS JOIN complete: {} rows in {:?}",
629 result.row_count(),
630 start.elapsed()
631 );
632
633 Ok(result)
634 }
635
636 fn qualify_column_name(
638 &self,
639 col_name: &str,
640 table_side: &str,
641 left_join_col: &str,
642 right_join_col: &str,
643 ) -> String {
644 let base_name = if let Some(dot_pos) = col_name.rfind('.') {
646 &col_name[dot_pos + 1..]
647 } else {
648 col_name
649 };
650
651 let left_base = if let Some(dot_pos) = left_join_col.rfind('.') {
652 &left_join_col[dot_pos + 1..]
653 } else {
654 left_join_col
655 };
656
657 let right_base = if let Some(dot_pos) = right_join_col.rfind('.') {
658 &right_join_col[dot_pos + 1..]
659 } else {
660 right_join_col
661 };
662
663 if base_name == left_base || base_name == right_base {
665 format!("{}_{}", table_side, base_name)
666 } else {
667 col_name.to_string()
668 }
669 }
670
671 fn reverse_operator(&self, op: &JoinOperator) -> JoinOperator {
673 match op {
674 JoinOperator::Equal => JoinOperator::Equal,
675 JoinOperator::NotEqual => JoinOperator::NotEqual,
676 JoinOperator::LessThan => JoinOperator::GreaterThan,
677 JoinOperator::GreaterThan => JoinOperator::LessThan,
678 JoinOperator::LessThanOrEqual => JoinOperator::GreaterThanOrEqual,
679 JoinOperator::GreaterThanOrEqual => JoinOperator::LessThanOrEqual,
680 }
681 }
682
683 fn compare_values(&self, left: &DataValue, right: &DataValue, op: &JoinOperator) -> bool {
685 match op {
686 JoinOperator::Equal => left == right,
687 JoinOperator::NotEqual => left != right,
688 JoinOperator::LessThan => left < right,
689 JoinOperator::GreaterThan => left > right,
690 JoinOperator::LessThanOrEqual => left <= right,
691 JoinOperator::GreaterThanOrEqual => left >= right,
692 }
693 }
694
695 fn nested_loop_join_inner(
697 &self,
698 left_table: Arc<DataTable>,
699 right_table: Arc<DataTable>,
700 left_col_idx: usize,
701 right_col_idx: usize,
702 operator: &JoinOperator,
703 join_alias: &Option<String>,
704 ) -> Result<DataTable> {
705 let start = std::time::Instant::now();
706
707 info!(
708 "Executing nested loop INNER JOIN with {:?} operator: {} x {} rows",
709 operator,
710 left_table.row_count(),
711 right_table.row_count()
712 );
713
714 let mut result = DataTable::new("joined");
716
717 for col in &left_table.columns {
719 result.add_column(DataColumn {
720 name: col.name.clone(),
721 data_type: col.data_type.clone(),
722 nullable: col.nullable,
723 unique_values: col.unique_values,
724 null_count: col.null_count,
725 metadata: col.metadata.clone(),
726 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
729 }
730
731 for col in &right_table.columns {
733 if !left_table
734 .columns
735 .iter()
736 .any(|left_col| left_col.name == col.name)
737 {
738 result.add_column(DataColumn {
739 name: col.name.clone(),
740 data_type: col.data_type.clone(),
741 nullable: col.nullable,
742 unique_values: col.unique_values,
743 null_count: col.null_count,
744 metadata: col.metadata.clone(),
745 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
748 } else {
749 let (column_name, qualified_name) = if let Some(alias) = join_alias {
750 (
752 format!("{}.{}", alias, col.name),
753 Some(format!("{}.{}", alias, col.name)),
754 )
755 } else {
756 (format!("{}_right", col.name), col.qualified_name.clone())
758 };
759 result.add_column(DataColumn {
760 name: column_name,
761 data_type: col.data_type.clone(),
762 nullable: col.nullable,
763 unique_values: col.unique_values,
764 null_count: col.null_count,
765 metadata: col.metadata.clone(),
766 qualified_name,
767 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
768 });
769 }
770 }
771
772 let mut match_count = 0;
774 for left_row in &left_table.rows {
775 let left_value = &left_row.values[left_col_idx];
776
777 for right_row in &right_table.rows {
778 let right_value = &right_row.values[right_col_idx];
779
780 if self.compare_values(left_value, right_value, operator) {
781 let mut joined_row = DataRow { values: Vec::new() };
782 joined_row.values.extend_from_slice(&left_row.values);
783 joined_row.values.extend_from_slice(&right_row.values);
784 result.add_row(joined_row);
785 match_count += 1;
786 }
787 }
788 }
789
790 info!(
791 "Nested loop INNER JOIN complete: {} matches found in {:?}",
792 match_count,
793 start.elapsed()
794 );
795
796 Ok(result)
797 }
798
799 fn nested_loop_join_inner_multi(
801 &self,
802 left_table: Arc<DataTable>,
803 right_table: Arc<DataTable>,
804 conditions: &[crate::sql::parser::ast::SingleJoinCondition],
805 join_alias: &Option<String>,
806 ) -> Result<DataTable> {
807 let start = std::time::Instant::now();
808
809 info!(
810 "Executing nested loop INNER JOIN with {} conditions: {} x {} rows",
811 conditions.len(),
812 left_table.row_count(),
813 right_table.row_count()
814 );
815
816 let mut result = DataTable::new("joined");
818
819 for col in &left_table.columns {
821 result.add_column(DataColumn {
822 name: col.name.clone(),
823 data_type: col.data_type.clone(),
824 nullable: col.nullable,
825 unique_values: col.unique_values,
826 null_count: col.null_count,
827 metadata: col.metadata.clone(),
828 qualified_name: col.qualified_name.clone(),
829 source_table: col.source_table.clone(),
830 });
831 }
832
833 for col in &right_table.columns {
835 if !left_table
836 .columns
837 .iter()
838 .any(|left_col| left_col.name == col.name)
839 {
840 result.add_column(DataColumn {
841 name: col.name.clone(),
842 data_type: col.data_type.clone(),
843 nullable: col.nullable,
844 unique_values: col.unique_values,
845 null_count: col.null_count,
846 metadata: col.metadata.clone(),
847 qualified_name: col.qualified_name.clone(),
848 source_table: col.source_table.clone(),
849 });
850 } else {
851 let (column_name, qualified_name) = if let Some(alias) = join_alias {
852 (
853 format!("{}.{}", alias, col.name),
854 Some(format!("{}.{}", alias, col.name)),
855 )
856 } else {
857 (format!("{}_right", col.name), col.qualified_name.clone())
858 };
859 result.add_column(DataColumn {
860 name: column_name,
861 data_type: col.data_type.clone(),
862 nullable: col.nullable,
863 unique_values: col.unique_values,
864 null_count: col.null_count,
865 metadata: col.metadata.clone(),
866 qualified_name,
867 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
868 });
869 }
870 }
871
872 let mut left_evaluator = ArithmeticEvaluator::new(&left_table);
874 let mut right_evaluator = ArithmeticEvaluator::new(&right_table);
875
876 let mut match_count = 0;
878 for (left_row_idx, left_row) in left_table.rows.iter().enumerate() {
879 for (right_row_idx, right_row) in right_table.rows.iter().enumerate() {
880 let mut all_conditions_met = true;
882 for condition in conditions.iter() {
883 let left_value =
885 match left_evaluator.evaluate(&condition.left_expr, left_row_idx) {
886 Ok(val) => val,
887 Err(_) => {
888 all_conditions_met = false;
889 break;
890 }
891 };
892
893 let right_value =
895 match right_evaluator.evaluate(&condition.right_expr, right_row_idx) {
896 Ok(val) => val,
897 Err(_) => {
898 all_conditions_met = false;
899 break;
900 }
901 };
902
903 if !self.compare_values(&left_value, &right_value, &condition.operator) {
904 all_conditions_met = false;
905 break;
906 }
907 }
908
909 if all_conditions_met {
910 let mut joined_row = DataRow { values: Vec::new() };
911 joined_row.values.extend_from_slice(&left_row.values);
912 joined_row.values.extend_from_slice(&right_row.values);
913 result.add_row(joined_row);
914 match_count += 1;
915 }
916 }
917 }
918
919 info!(
920 "Nested loop INNER JOIN complete: {} matches found in {:?}",
921 match_count,
922 start.elapsed()
923 );
924
925 Ok(result)
926 }
927
928 fn nested_loop_join_left_multi(
930 &self,
931 left_table: Arc<DataTable>,
932 right_table: Arc<DataTable>,
933 conditions: &[crate::sql::parser::ast::SingleJoinCondition],
934 join_alias: &Option<String>,
935 ) -> Result<DataTable> {
936 let start = std::time::Instant::now();
937
938 info!(
939 "Executing nested loop LEFT JOIN with {} conditions: {} x {} rows",
940 conditions.len(),
941 left_table.row_count(),
942 right_table.row_count()
943 );
944
945 let mut result = DataTable::new("joined");
947
948 for col in &left_table.columns {
950 result.add_column(DataColumn {
951 name: col.name.clone(),
952 data_type: col.data_type.clone(),
953 nullable: col.nullable,
954 unique_values: col.unique_values,
955 null_count: col.null_count,
956 metadata: col.metadata.clone(),
957 qualified_name: col.qualified_name.clone(),
958 source_table: col.source_table.clone(),
959 });
960 }
961
962 for col in &right_table.columns {
964 if !left_table
965 .columns
966 .iter()
967 .any(|left_col| left_col.name == col.name)
968 {
969 result.add_column(DataColumn {
970 name: col.name.clone(),
971 data_type: col.data_type.clone(),
972 nullable: true, unique_values: col.unique_values,
974 null_count: col.null_count,
975 metadata: col.metadata.clone(),
976 qualified_name: col.qualified_name.clone(),
977 source_table: col.source_table.clone(),
978 });
979 } else {
980 let (column_name, qualified_name) = if let Some(alias) = join_alias {
981 (
982 format!("{}.{}", alias, col.name),
983 Some(format!("{}.{}", alias, col.name)),
984 )
985 } else {
986 (format!("{}_right", col.name), col.qualified_name.clone())
987 };
988 result.add_column(DataColumn {
989 name: column_name,
990 data_type: col.data_type.clone(),
991 nullable: true, unique_values: col.unique_values,
993 null_count: col.null_count,
994 metadata: col.metadata.clone(),
995 qualified_name,
996 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
997 });
998 }
999 }
1000
1001 let mut left_evaluator = ArithmeticEvaluator::new(&left_table);
1003 let mut right_evaluator = ArithmeticEvaluator::new(&right_table);
1004
1005 let mut match_count = 0;
1007 let mut null_count = 0;
1008
1009 for (left_row_idx, left_row) in left_table.rows.iter().enumerate() {
1010 let mut found_match = false;
1011
1012 for (right_row_idx, right_row) in right_table.rows.iter().enumerate() {
1013 let mut all_conditions_met = true;
1015 for condition in conditions.iter() {
1016 let left_value =
1018 match left_evaluator.evaluate(&condition.left_expr, left_row_idx) {
1019 Ok(val) => val,
1020 Err(_) => {
1021 all_conditions_met = false;
1022 break;
1023 }
1024 };
1025
1026 let right_value =
1028 match right_evaluator.evaluate(&condition.right_expr, right_row_idx) {
1029 Ok(val) => val,
1030 Err(_) => {
1031 all_conditions_met = false;
1032 break;
1033 }
1034 };
1035
1036 if !self.compare_values(&left_value, &right_value, &condition.operator) {
1037 all_conditions_met = false;
1038 break;
1039 }
1040 }
1041
1042 if all_conditions_met {
1043 let mut joined_row = DataRow { values: Vec::new() };
1044 joined_row.values.extend_from_slice(&left_row.values);
1045 joined_row.values.extend_from_slice(&right_row.values);
1046 result.add_row(joined_row);
1047 match_count += 1;
1048 found_match = true;
1049 }
1050 }
1051
1052 if !found_match {
1054 let mut joined_row = DataRow { values: Vec::new() };
1055 joined_row.values.extend_from_slice(&left_row.values);
1056 for _ in 0..right_table.column_count() {
1057 joined_row.values.push(DataValue::Null);
1058 }
1059 result.add_row(joined_row);
1060 null_count += 1;
1061 }
1062 }
1063
1064 info!(
1065 "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
1066 match_count,
1067 null_count,
1068 start.elapsed()
1069 );
1070
1071 Ok(result)
1072 }
1073
1074 fn nested_loop_join_left(
1076 &self,
1077 left_table: Arc<DataTable>,
1078 right_table: Arc<DataTable>,
1079 left_col_idx: usize,
1080 right_col_idx: usize,
1081 operator: &JoinOperator,
1082 join_alias: &Option<String>,
1083 ) -> Result<DataTable> {
1084 let start = std::time::Instant::now();
1085
1086 info!(
1087 "Executing nested loop LEFT JOIN with {:?} operator: {} x {} rows",
1088 operator,
1089 left_table.row_count(),
1090 right_table.row_count()
1091 );
1092
1093 let mut result = DataTable::new("joined");
1095
1096 for col in &left_table.columns {
1098 result.add_column(DataColumn {
1099 name: col.name.clone(),
1100 data_type: col.data_type.clone(),
1101 nullable: col.nullable,
1102 unique_values: col.unique_values,
1103 null_count: col.null_count,
1104 metadata: col.metadata.clone(),
1105 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
1108 }
1109
1110 for col in &right_table.columns {
1112 if !left_table
1113 .columns
1114 .iter()
1115 .any(|left_col| left_col.name == col.name)
1116 {
1117 result.add_column(DataColumn {
1118 name: col.name.clone(),
1119 data_type: col.data_type.clone(),
1120 nullable: true, unique_values: col.unique_values,
1122 null_count: col.null_count,
1123 metadata: col.metadata.clone(),
1124 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
1127 } else {
1128 let (column_name, qualified_name) = if let Some(alias) = join_alias {
1129 (
1131 format!("{}.{}", alias, col.name),
1132 Some(format!("{}.{}", alias, col.name)),
1133 )
1134 } else {
1135 (format!("{}_right", col.name), col.qualified_name.clone())
1137 };
1138 result.add_column(DataColumn {
1139 name: column_name,
1140 data_type: col.data_type.clone(),
1141 nullable: true, unique_values: col.unique_values,
1143 null_count: col.null_count,
1144 metadata: col.metadata.clone(),
1145 qualified_name,
1146 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
1147 });
1148 }
1149 }
1150
1151 let mut match_count = 0;
1153 let mut null_count = 0;
1154
1155 for left_row in &left_table.rows {
1156 let left_value = &left_row.values[left_col_idx];
1157 let mut found_match = false;
1158
1159 for right_row in &right_table.rows {
1160 let right_value = &right_row.values[right_col_idx];
1161
1162 if self.compare_values(left_value, right_value, operator) {
1163 let mut joined_row = DataRow { values: Vec::new() };
1164 joined_row.values.extend_from_slice(&left_row.values);
1165 joined_row.values.extend_from_slice(&right_row.values);
1166 result.add_row(joined_row);
1167 match_count += 1;
1168 found_match = true;
1169 }
1170 }
1171
1172 if !found_match {
1174 let mut joined_row = DataRow { values: Vec::new() };
1175 joined_row.values.extend_from_slice(&left_row.values);
1176 for _ in 0..right_table.column_count() {
1177 joined_row.values.push(DataValue::Null);
1178 }
1179 result.add_row(joined_row);
1180 null_count += 1;
1181 }
1182 }
1183
1184 info!(
1185 "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
1186 match_count,
1187 null_count,
1188 start.elapsed()
1189 );
1190
1191 Ok(result)
1192 }
1193}