1use anyhow::{anyhow, Result};
4use std::collections::HashMap;
5use std::sync::Arc;
6use tracing::{debug, info};
7
8use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
9use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
10use crate::data::value_comparisons::compare_with_op;
11use crate::sql::parser::ast::{JoinClause, JoinOperator, JoinType};
12use crate::sql::recursive_parser::SqlExpression;
13
14fn canonical_join_key(value: &DataValue, coerce_numeric: bool) -> DataValue {
43 match value {
44 DataValue::String(s) => normalize_join_text(s, coerce_numeric),
45 DataValue::InternedString(s) => normalize_join_text(s.as_str(), coerce_numeric),
46 DataValue::Float(f) => fold_whole_float(*f),
47 other => other.clone(),
48 }
49}
50
51fn normalize_join_text(s: &str, coerce_numeric: bool) -> DataValue {
54 if coerce_numeric {
55 if let Ok(i) = s.parse::<i64>() {
56 return DataValue::Integer(i);
57 }
58 if let Ok(f) = s.parse::<f64>() {
59 if f.is_finite() {
60 return fold_whole_float(f);
61 }
62 }
63 }
64 DataValue::String(s.to_string())
65}
66
/// Coarse classification of a join-key value, used to decide whether the two
/// sides of a join need numeric coercion.
///
/// `Debug`, `Clone` and `Copy` are derived so the kind can be logged, used in
/// `assert_eq!`, and passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KeyKind {
    /// Assigned by `value_kind` to plain and interned strings.
    Stringy,
    /// Assigned by `value_kind` to integers and floats.
    Numeric,
    /// Assigned by `value_kind` to every other variant.
    Other,
}
75
76fn value_kind(value: &DataValue) -> KeyKind {
77 match value {
78 DataValue::String(_) | DataValue::InternedString(_) => KeyKind::Stringy,
79 DataValue::Integer(_) | DataValue::Float(_) => KeyKind::Numeric,
80 _ => KeyKind::Other,
81 }
82}
83
84fn column_key_kind(table: &DataTable, col_idx: usize) -> Option<KeyKind> {
90 table
91 .rows
92 .iter()
93 .filter_map(|r| r.values.get(col_idx))
94 .find(|v| !matches!(v, DataValue::Null))
95 .map(value_kind)
96}
97
98fn join_key_coercion(
104 left_table: &DataTable,
105 left_col_idx: usize,
106 right_table: &DataTable,
107 right_col_idx: usize,
108) -> bool {
109 match (
110 column_key_kind(left_table, left_col_idx),
111 column_key_kind(right_table, right_col_idx),
112 ) {
113 (Some(l), Some(r)) => l != r,
114 _ => true,
115 }
116}
117
118fn fold_whole_float(f: f64) -> DataValue {
121 if f.is_finite() && f.fract() == 0.0 && f >= i64::MIN as f64 && f <= i64::MAX as f64 {
122 DataValue::Integer(f as i64)
123 } else {
124 DataValue::Float(f)
125 }
126}
127
/// Executes SQL joins between two `DataTable`s.
///
/// `Debug` and `Clone` are derived so the executor can be logged and copied
/// like any other small configuration value.
#[derive(Debug, Clone)]
pub struct HashJoinExecutor {
    /// When true, column names are matched after lowercasing, and the flag is
    /// forwarded to `compare_with_op` for value comparisons.
    case_insensitive: bool,
}
132
133impl HashJoinExecutor {
134 pub fn new(case_insensitive: bool) -> Self {
135 Self { case_insensitive }
136 }
137
138 pub fn execute_join(
140 &self,
141 left_table: Arc<DataTable>,
142 join_clause: &JoinClause,
143 right_table: Arc<DataTable>,
144 ) -> Result<DataTable> {
145 info!(
146 "Executing {:?} JOIN: {} rows x {} rows with {} conditions",
147 join_clause.join_type,
148 left_table.row_count(),
149 right_table.row_count(),
150 join_clause.condition.conditions.len()
151 );
152
153 let mut condition_indices = Vec::new();
156 let mut all_equal = true;
157 let mut has_complex_expr = false;
158
159 for single_condition in &join_clause.condition.conditions {
160 let left_col_name = Self::extract_simple_column_name(&single_condition.left_expr);
162 let right_col_name = Self::extract_simple_column_name(&single_condition.right_expr);
163
164 if left_col_name.is_none() || right_col_name.is_none() {
165 has_complex_expr = true;
167 all_equal = false; break;
169 }
170
171 let (left_col_idx, right_col_idx) = self.resolve_join_columns(
172 &left_table,
173 &right_table,
174 &left_col_name.unwrap(),
175 &right_col_name.unwrap(),
176 )?;
177
178 if single_condition.operator != JoinOperator::Equal {
179 all_equal = false;
180 }
181
182 condition_indices.push((
183 left_col_idx,
184 right_col_idx,
185 single_condition.operator.clone(),
186 ));
187 }
188
189 let use_hash_join = all_equal && !has_complex_expr;
193
194 match join_clause.join_type {
196 JoinType::Inner => {
197 if use_hash_join && condition_indices.len() == 1 {
198 let (left_col_idx, right_col_idx, _) = condition_indices[0];
200 let left_col_name = Self::extract_simple_column_name(
201 &join_clause.condition.conditions[0].left_expr,
202 )
203 .expect("left_expr should be a simple column in hash join path");
204 let right_col_name = Self::extract_simple_column_name(
205 &join_clause.condition.conditions[0].right_expr,
206 )
207 .expect("right_expr should be a simple column in hash join path");
208 self.hash_join_inner(
209 left_table,
210 right_table,
211 left_col_idx,
212 right_col_idx,
213 &left_col_name,
214 &right_col_name,
215 &join_clause.alias,
216 )
217 } else {
218 self.nested_loop_join_inner_multi(
220 left_table,
221 right_table,
222 &join_clause.condition.conditions,
223 &join_clause.alias,
224 )
225 }
226 }
227 JoinType::Left => {
228 if use_hash_join && condition_indices.len() == 1 {
229 let (left_col_idx, right_col_idx, _) = condition_indices[0];
231 let left_col_name = Self::extract_simple_column_name(
232 &join_clause.condition.conditions[0].left_expr,
233 )
234 .expect("left_expr should be a simple column in hash join path");
235 let right_col_name = Self::extract_simple_column_name(
236 &join_clause.condition.conditions[0].right_expr,
237 )
238 .expect("right_expr should be a simple column in hash join path");
239 self.hash_join_left(
240 left_table,
241 right_table,
242 left_col_idx,
243 right_col_idx,
244 &left_col_name,
245 &right_col_name,
246 &join_clause.alias,
247 )
248 } else {
249 self.nested_loop_join_left_multi(
251 left_table,
252 right_table,
253 &join_clause.condition.conditions,
254 &join_clause.alias,
255 )
256 }
257 }
258 JoinType::Right => {
259 let swapped_indices: Vec<(usize, usize, JoinOperator)> = condition_indices
261 .into_iter()
262 .map(|(l, r, op)| (r, l, self.reverse_operator(&op)))
263 .collect();
264
265 if use_hash_join && swapped_indices.len() == 1 {
266 let (right_col_idx, left_col_idx, _) = swapped_indices[0];
268 let left_col_name = Self::extract_simple_column_name(
269 &join_clause.condition.conditions[0].left_expr,
270 )
271 .expect("left_expr should be a simple column in hash join path");
272 let right_col_name = Self::extract_simple_column_name(
273 &join_clause.condition.conditions[0].right_expr,
274 )
275 .expect("right_expr should be a simple column in hash join path");
276 self.hash_join_left(
277 right_table,
278 left_table,
279 right_col_idx,
280 left_col_idx,
281 &right_col_name,
282 &left_col_name,
283 &join_clause.alias,
284 )
285 } else {
286 self.nested_loop_join_left_multi(
289 right_table,
290 left_table,
291 &join_clause.condition.conditions,
292 &join_clause.alias,
293 )
294 }
295 }
296 JoinType::Cross => self.cross_join(left_table, right_table),
297 JoinType::Full => {
298 return Err(anyhow!("FULL OUTER JOIN not yet implemented"));
299 }
300 }
301 }
302
303 fn extract_simple_column_name(expr: &SqlExpression) -> Option<String> {
306 match expr {
307 SqlExpression::Column(col_ref) => {
308 if let Some(table_prefix) = &col_ref.table_prefix {
310 Some(format!("{}.{}", table_prefix, col_ref.name))
311 } else {
312 Some(col_ref.name.clone())
313 }
314 }
315 _ => None, }
317 }
318
319 fn resolve_join_columns(
321 &self,
322 left_table: &DataTable,
323 right_table: &DataTable,
324 left_col_name: &str,
325 right_col_name: &str,
326 ) -> Result<(usize, usize)> {
327 let left_col_idx = if let Ok(idx) = self.find_column_index(left_table, left_col_name) {
329 idx
330 } else if let Ok(_idx) = self.find_column_index(right_table, left_col_name) {
331 return Err(anyhow!(
334 "Column '{}' found in right table but specified as left operand. \
335 Please rewrite the condition with columns in correct positions.",
336 left_col_name
337 ));
338 } else {
339 return Err(anyhow!(
340 "Column '{}' not found in either table",
341 left_col_name
342 ));
343 };
344
345 let right_col_idx = if let Ok(idx) = self.find_column_index(right_table, right_col_name) {
347 idx
348 } else if let Ok(_idx) = self.find_column_index(left_table, right_col_name) {
349 return Err(anyhow!(
352 "Column '{}' found in left table but specified as right operand. \
353 Please rewrite the condition with columns in correct positions.",
354 right_col_name
355 ));
356 } else {
357 return Err(anyhow!(
358 "Column '{}' not found in either table",
359 right_col_name
360 ));
361 };
362
363 Ok((left_col_idx, right_col_idx))
364 }
365
366 fn find_column_index(&self, table: &DataTable, col_name: &str) -> Result<usize> {
368 let col_name = if let Some(dot_pos) = col_name.rfind('.') {
370 &col_name[dot_pos + 1..]
371 } else {
372 col_name
373 };
374
375 debug!(
376 "Looking for column '{}' in table with columns: {:?}",
377 col_name,
378 table.column_names()
379 );
380
381 table
382 .columns
383 .iter()
384 .position(|col| {
385 if self.case_insensitive {
386 col.name.to_lowercase() == col_name.to_lowercase()
387 } else {
388 col.name == col_name
389 }
390 })
391 .ok_or_else(|| anyhow!("Column '{}' not found in table", col_name))
392 }
393
394 fn hash_join_inner(
396 &self,
397 left_table: Arc<DataTable>,
398 right_table: Arc<DataTable>,
399 left_col_idx: usize,
400 right_col_idx: usize,
401 _left_col_name: &str,
402 _right_col_name: &str,
403 join_alias: &Option<String>,
404 ) -> Result<DataTable> {
405 let start = std::time::Instant::now();
406
407 let coerce = join_key_coercion(&left_table, left_col_idx, &right_table, right_col_idx);
411
412 let (build_table, probe_table, build_col_idx, probe_col_idx, build_is_left) =
414 if left_table.row_count() <= right_table.row_count() {
415 (
416 left_table.clone(),
417 right_table.clone(),
418 left_col_idx,
419 right_col_idx,
420 true,
421 )
422 } else {
423 (
424 right_table.clone(),
425 left_table.clone(),
426 right_col_idx,
427 left_col_idx,
428 false,
429 )
430 };
431
432 debug!(
433 "Building hash index on {} table ({} rows)",
434 if build_is_left { "left" } else { "right" },
435 build_table.row_count()
436 );
437
438 let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
440 for (row_idx, row) in build_table.rows.iter().enumerate() {
441 let key = canonical_join_key(&row.values[build_col_idx], coerce);
442 hash_index.entry(key).or_default().push(row_idx);
443 }
444
445 debug!(
446 "Hash index built with {} unique keys in {:?}",
447 hash_index.len(),
448 start.elapsed()
449 );
450
451 let mut result = DataTable::new("joined");
453
454 for col in &left_table.columns {
456 result.add_column(DataColumn {
457 name: col.name.clone(),
458 data_type: col.data_type.clone(),
459 nullable: col.nullable,
460 unique_values: col.unique_values,
461 null_count: col.null_count,
462 metadata: col.metadata.clone(),
463 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
466 }
467
468 for col in &right_table.columns {
470 if !left_table
472 .columns
473 .iter()
474 .any(|left_col| left_col.name == col.name)
475 {
476 result.add_column(DataColumn {
477 name: col.name.clone(),
478 data_type: col.data_type.clone(),
479 nullable: col.nullable,
480 unique_values: col.unique_values,
481 null_count: col.null_count,
482 metadata: col.metadata.clone(),
483 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
486 } else {
487 let (column_name, qualified_name) = if let Some(alias) = join_alias {
489 (
491 format!("{}.{}", alias, col.name),
492 Some(format!("{}.{}", alias, col.name)),
493 )
494 } else {
495 (format!("{}_right", col.name), col.qualified_name.clone())
497 };
498 result.add_column(DataColumn {
499 name: column_name,
500 data_type: col.data_type.clone(),
501 nullable: col.nullable,
502 unique_values: col.unique_values,
503 null_count: col.null_count,
504 metadata: col.metadata.clone(),
505 qualified_name,
506 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
507 });
508 }
509 }
510
511 debug!(
512 "Joined table will have {} columns: {:?}",
513 result.column_count(),
514 result.column_names()
515 );
516
517 let mut match_count = 0;
519 for probe_row in &probe_table.rows {
520 let probe_key = canonical_join_key(&probe_row.values[probe_col_idx], coerce);
521
522 if let Some(matching_indices) = hash_index.get(&probe_key) {
523 for &build_idx in matching_indices {
524 let build_row = &build_table.rows[build_idx];
525
526 let mut joined_row = DataRow { values: Vec::new() };
528
529 if build_is_left {
530 joined_row.values.extend_from_slice(&build_row.values);
532 joined_row.values.extend_from_slice(&probe_row.values);
533 } else {
534 joined_row.values.extend_from_slice(&probe_row.values);
536 joined_row.values.extend_from_slice(&build_row.values);
537 }
538
539 result.add_row(joined_row);
540 match_count += 1;
541 }
542 }
543 }
544
545 let qualified_cols: Vec<String> = result
547 .columns
548 .iter()
549 .filter_map(|c| c.qualified_name.clone())
550 .collect();
551
552 info!(
553 "INNER JOIN complete: {} matches found in {:?}. Result has {} columns ({} qualified: {:?})",
554 match_count,
555 start.elapsed(),
556 result.columns.len(),
557 qualified_cols.len(),
558 qualified_cols
559 );
560
561 Ok(result)
562 }
563
564 fn hash_join_left(
566 &self,
567 left_table: Arc<DataTable>,
568 right_table: Arc<DataTable>,
569 left_col_idx: usize,
570 right_col_idx: usize,
571 _left_col_name: &str,
572 _right_col_name: &str,
573 join_alias: &Option<String>,
574 ) -> Result<DataTable> {
575 let start = std::time::Instant::now();
576
577 let coerce = join_key_coercion(&left_table, left_col_idx, &right_table, right_col_idx);
579
580 debug!(
581 "Building hash index on right table ({} rows)",
582 right_table.row_count()
583 );
584
585 let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
587 for (row_idx, row) in right_table.rows.iter().enumerate() {
588 let key = canonical_join_key(&row.values[right_col_idx], coerce);
589 hash_index.entry(key).or_default().push(row_idx);
590 }
591
592 let mut result = DataTable::new("joined");
594
595 for col in &left_table.columns {
597 result.add_column(DataColumn {
598 name: col.name.clone(),
599 data_type: col.data_type.clone(),
600 nullable: col.nullable,
601 unique_values: col.unique_values,
602 null_count: col.null_count,
603 metadata: col.metadata.clone(),
604 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
607 }
608
609 for col in &right_table.columns {
611 if !left_table
613 .columns
614 .iter()
615 .any(|left_col| left_col.name == col.name)
616 {
617 result.add_column(DataColumn {
618 name: col.name.clone(),
619 data_type: col.data_type.clone(),
620 nullable: true, unique_values: col.unique_values,
622 null_count: col.null_count,
623 metadata: col.metadata.clone(),
624 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
627 } else {
628 let (column_name, qualified_name) = if let Some(alias) = join_alias {
630 (
632 format!("{}.{}", alias, col.name),
633 Some(format!("{}.{}", alias, col.name)),
634 )
635 } else {
636 (format!("{}_right", col.name), col.qualified_name.clone())
638 };
639 result.add_column(DataColumn {
640 name: column_name,
641 data_type: col.data_type.clone(),
642 nullable: true, unique_values: col.unique_values,
644 null_count: col.null_count,
645 metadata: col.metadata.clone(),
646 qualified_name,
647 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
648 });
649 }
650 }
651
652 debug!(
653 "LEFT JOIN table will have {} columns: {:?}",
654 result.column_count(),
655 result.column_names()
656 );
657
658 let mut match_count = 0;
660 let mut null_count = 0;
661
662 for left_row in &left_table.rows {
663 let left_key = canonical_join_key(&left_row.values[left_col_idx], coerce);
664
665 if let Some(matching_indices) = hash_index.get(&left_key) {
666 for &right_idx in matching_indices {
668 let right_row = &right_table.rows[right_idx];
669
670 let mut joined_row = DataRow { values: Vec::new() };
671 joined_row.values.extend_from_slice(&left_row.values);
672 joined_row.values.extend_from_slice(&right_row.values);
673
674 result.add_row(joined_row);
675 match_count += 1;
676 }
677 } else {
678 let mut joined_row = DataRow { values: Vec::new() };
680 joined_row.values.extend_from_slice(&left_row.values);
681
682 for _ in 0..right_table.column_count() {
684 joined_row.values.push(DataValue::Null);
685 }
686
687 result.add_row(joined_row);
688 null_count += 1;
689 }
690 }
691
692 let qualified_cols: Vec<String> = result
694 .columns
695 .iter()
696 .filter_map(|c| c.qualified_name.clone())
697 .collect();
698
699 info!(
700 "LEFT JOIN complete: {} matches, {} nulls in {:?}. Result has {} columns ({} qualified: {:?})",
701 match_count,
702 null_count,
703 start.elapsed(),
704 result.columns.len(),
705 qualified_cols.len(),
706 qualified_cols
707 );
708
709 Ok(result)
710 }
711
712 fn cross_join(
714 &self,
715 left_table: Arc<DataTable>,
716 right_table: Arc<DataTable>,
717 ) -> Result<DataTable> {
718 let start = std::time::Instant::now();
719
720 let result_rows = left_table.row_count() * right_table.row_count();
722 if result_rows > 1_000_000 {
723 return Err(anyhow!(
724 "CROSS JOIN would produce {} rows, which exceeds the safety limit",
725 result_rows
726 ));
727 }
728
729 let mut result = DataTable::new("joined");
731
732 for col in &left_table.columns {
734 result.add_column(col.clone());
735 }
736 for col in &right_table.columns {
737 result.add_column(col.clone());
738 }
739
740 for left_row in &left_table.rows {
742 for right_row in &right_table.rows {
743 let mut joined_row = DataRow { values: Vec::new() };
744 joined_row.values.extend_from_slice(&left_row.values);
745 joined_row.values.extend_from_slice(&right_row.values);
746 result.add_row(joined_row);
747 }
748 }
749
750 info!(
751 "CROSS JOIN complete: {} rows in {:?}",
752 result.row_count(),
753 start.elapsed()
754 );
755
756 Ok(result)
757 }
758
759 fn qualify_column_name(
761 &self,
762 col_name: &str,
763 table_side: &str,
764 left_join_col: &str,
765 right_join_col: &str,
766 ) -> String {
767 let base_name = if let Some(dot_pos) = col_name.rfind('.') {
769 &col_name[dot_pos + 1..]
770 } else {
771 col_name
772 };
773
774 let left_base = if let Some(dot_pos) = left_join_col.rfind('.') {
775 &left_join_col[dot_pos + 1..]
776 } else {
777 left_join_col
778 };
779
780 let right_base = if let Some(dot_pos) = right_join_col.rfind('.') {
781 &right_join_col[dot_pos + 1..]
782 } else {
783 right_join_col
784 };
785
786 if base_name == left_base || base_name == right_base {
788 format!("{}_{}", table_side, base_name)
789 } else {
790 col_name.to_string()
791 }
792 }
793
794 fn reverse_operator(&self, op: &JoinOperator) -> JoinOperator {
796 match op {
797 JoinOperator::Equal => JoinOperator::Equal,
798 JoinOperator::NotEqual => JoinOperator::NotEqual,
799 JoinOperator::LessThan => JoinOperator::GreaterThan,
800 JoinOperator::GreaterThan => JoinOperator::LessThan,
801 JoinOperator::LessThanOrEqual => JoinOperator::GreaterThanOrEqual,
802 JoinOperator::GreaterThanOrEqual => JoinOperator::LessThanOrEqual,
803 }
804 }
805
806 fn compare_values(&self, left: &DataValue, right: &DataValue, op: &JoinOperator) -> bool {
814 let op_str = match op {
815 JoinOperator::Equal => "=",
816 JoinOperator::NotEqual => "!=",
817 JoinOperator::LessThan => "<",
818 JoinOperator::GreaterThan => ">",
819 JoinOperator::LessThanOrEqual => "<=",
820 JoinOperator::GreaterThanOrEqual => ">=",
821 };
822 compare_with_op(left, right, op_str, self.case_insensitive)
823 }
824
825 fn nested_loop_join_inner(
827 &self,
828 left_table: Arc<DataTable>,
829 right_table: Arc<DataTable>,
830 left_col_idx: usize,
831 right_col_idx: usize,
832 operator: &JoinOperator,
833 join_alias: &Option<String>,
834 ) -> Result<DataTable> {
835 let start = std::time::Instant::now();
836
837 info!(
838 "Executing nested loop INNER JOIN with {:?} operator: {} x {} rows",
839 operator,
840 left_table.row_count(),
841 right_table.row_count()
842 );
843
844 let mut result = DataTable::new("joined");
846
847 for col in &left_table.columns {
849 result.add_column(DataColumn {
850 name: col.name.clone(),
851 data_type: col.data_type.clone(),
852 nullable: col.nullable,
853 unique_values: col.unique_values,
854 null_count: col.null_count,
855 metadata: col.metadata.clone(),
856 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
859 }
860
861 for col in &right_table.columns {
863 if !left_table
864 .columns
865 .iter()
866 .any(|left_col| left_col.name == col.name)
867 {
868 result.add_column(DataColumn {
869 name: col.name.clone(),
870 data_type: col.data_type.clone(),
871 nullable: col.nullable,
872 unique_values: col.unique_values,
873 null_count: col.null_count,
874 metadata: col.metadata.clone(),
875 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
878 } else {
879 let (column_name, qualified_name) = if let Some(alias) = join_alias {
880 (
882 format!("{}.{}", alias, col.name),
883 Some(format!("{}.{}", alias, col.name)),
884 )
885 } else {
886 (format!("{}_right", col.name), col.qualified_name.clone())
888 };
889 result.add_column(DataColumn {
890 name: column_name,
891 data_type: col.data_type.clone(),
892 nullable: col.nullable,
893 unique_values: col.unique_values,
894 null_count: col.null_count,
895 metadata: col.metadata.clone(),
896 qualified_name,
897 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
898 });
899 }
900 }
901
902 let mut match_count = 0;
904 for left_row in &left_table.rows {
905 let left_value = &left_row.values[left_col_idx];
906
907 for right_row in &right_table.rows {
908 let right_value = &right_row.values[right_col_idx];
909
910 if self.compare_values(left_value, right_value, operator) {
911 let mut joined_row = DataRow { values: Vec::new() };
912 joined_row.values.extend_from_slice(&left_row.values);
913 joined_row.values.extend_from_slice(&right_row.values);
914 result.add_row(joined_row);
915 match_count += 1;
916 }
917 }
918 }
919
920 info!(
921 "Nested loop INNER JOIN complete: {} matches found in {:?}",
922 match_count,
923 start.elapsed()
924 );
925
926 Ok(result)
927 }
928
929 fn nested_loop_join_inner_multi(
931 &self,
932 left_table: Arc<DataTable>,
933 right_table: Arc<DataTable>,
934 conditions: &[crate::sql::parser::ast::SingleJoinCondition],
935 join_alias: &Option<String>,
936 ) -> Result<DataTable> {
937 let start = std::time::Instant::now();
938
939 info!(
940 "Executing nested loop INNER JOIN with {} conditions: {} x {} rows",
941 conditions.len(),
942 left_table.row_count(),
943 right_table.row_count()
944 );
945
946 let mut result = DataTable::new("joined");
948
949 for col in &left_table.columns {
951 result.add_column(DataColumn {
952 name: col.name.clone(),
953 data_type: col.data_type.clone(),
954 nullable: col.nullable,
955 unique_values: col.unique_values,
956 null_count: col.null_count,
957 metadata: col.metadata.clone(),
958 qualified_name: col.qualified_name.clone(),
959 source_table: col.source_table.clone(),
960 });
961 }
962
963 for col in &right_table.columns {
965 if !left_table
966 .columns
967 .iter()
968 .any(|left_col| left_col.name == col.name)
969 {
970 result.add_column(DataColumn {
971 name: col.name.clone(),
972 data_type: col.data_type.clone(),
973 nullable: col.nullable,
974 unique_values: col.unique_values,
975 null_count: col.null_count,
976 metadata: col.metadata.clone(),
977 qualified_name: col.qualified_name.clone(),
978 source_table: col.source_table.clone(),
979 });
980 } else {
981 let (column_name, qualified_name) = if let Some(alias) = join_alias {
982 (
983 format!("{}.{}", alias, col.name),
984 Some(format!("{}.{}", alias, col.name)),
985 )
986 } else {
987 (format!("{}_right", col.name), col.qualified_name.clone())
988 };
989 result.add_column(DataColumn {
990 name: column_name,
991 data_type: col.data_type.clone(),
992 nullable: col.nullable,
993 unique_values: col.unique_values,
994 null_count: col.null_count,
995 metadata: col.metadata.clone(),
996 qualified_name,
997 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
998 });
999 }
1000 }
1001
1002 let mut left_evaluator = ArithmeticEvaluator::new(&left_table);
1004 let mut right_evaluator = ArithmeticEvaluator::new(&right_table);
1005
1006 let mut match_count = 0;
1008 for (left_row_idx, left_row) in left_table.rows.iter().enumerate() {
1009 for (right_row_idx, right_row) in right_table.rows.iter().enumerate() {
1010 let mut all_conditions_met = true;
1012 for condition in conditions.iter() {
1013 let left_value =
1015 match left_evaluator.evaluate(&condition.left_expr, left_row_idx) {
1016 Ok(val) => val,
1017 Err(_) => {
1018 all_conditions_met = false;
1019 break;
1020 }
1021 };
1022
1023 let right_value =
1025 match right_evaluator.evaluate(&condition.right_expr, right_row_idx) {
1026 Ok(val) => val,
1027 Err(_) => {
1028 all_conditions_met = false;
1029 break;
1030 }
1031 };
1032
1033 if !self.compare_values(&left_value, &right_value, &condition.operator) {
1034 all_conditions_met = false;
1035 break;
1036 }
1037 }
1038
1039 if all_conditions_met {
1040 let mut joined_row = DataRow { values: Vec::new() };
1041 joined_row.values.extend_from_slice(&left_row.values);
1042 joined_row.values.extend_from_slice(&right_row.values);
1043 result.add_row(joined_row);
1044 match_count += 1;
1045 }
1046 }
1047 }
1048
1049 info!(
1050 "Nested loop INNER JOIN complete: {} matches found in {:?}",
1051 match_count,
1052 start.elapsed()
1053 );
1054
1055 Ok(result)
1056 }
1057
1058 fn nested_loop_join_left_multi(
1060 &self,
1061 left_table: Arc<DataTable>,
1062 right_table: Arc<DataTable>,
1063 conditions: &[crate::sql::parser::ast::SingleJoinCondition],
1064 join_alias: &Option<String>,
1065 ) -> Result<DataTable> {
1066 let start = std::time::Instant::now();
1067
1068 info!(
1069 "Executing nested loop LEFT JOIN with {} conditions: {} x {} rows",
1070 conditions.len(),
1071 left_table.row_count(),
1072 right_table.row_count()
1073 );
1074
1075 let mut result = DataTable::new("joined");
1077
1078 for col in &left_table.columns {
1080 result.add_column(DataColumn {
1081 name: col.name.clone(),
1082 data_type: col.data_type.clone(),
1083 nullable: col.nullable,
1084 unique_values: col.unique_values,
1085 null_count: col.null_count,
1086 metadata: col.metadata.clone(),
1087 qualified_name: col.qualified_name.clone(),
1088 source_table: col.source_table.clone(),
1089 });
1090 }
1091
1092 for col in &right_table.columns {
1094 if !left_table
1095 .columns
1096 .iter()
1097 .any(|left_col| left_col.name == col.name)
1098 {
1099 result.add_column(DataColumn {
1100 name: col.name.clone(),
1101 data_type: col.data_type.clone(),
1102 nullable: true, unique_values: col.unique_values,
1104 null_count: col.null_count,
1105 metadata: col.metadata.clone(),
1106 qualified_name: col.qualified_name.clone(),
1107 source_table: col.source_table.clone(),
1108 });
1109 } else {
1110 let (column_name, qualified_name) = if let Some(alias) = join_alias {
1111 (
1112 format!("{}.{}", alias, col.name),
1113 Some(format!("{}.{}", alias, col.name)),
1114 )
1115 } else {
1116 (format!("{}_right", col.name), col.qualified_name.clone())
1117 };
1118 result.add_column(DataColumn {
1119 name: column_name,
1120 data_type: col.data_type.clone(),
1121 nullable: true, unique_values: col.unique_values,
1123 null_count: col.null_count,
1124 metadata: col.metadata.clone(),
1125 qualified_name,
1126 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
1127 });
1128 }
1129 }
1130
1131 let mut left_evaluator = ArithmeticEvaluator::new(&left_table);
1133 let mut right_evaluator = ArithmeticEvaluator::new(&right_table);
1134
1135 let mut match_count = 0;
1137 let mut null_count = 0;
1138
1139 for (left_row_idx, left_row) in left_table.rows.iter().enumerate() {
1140 let mut found_match = false;
1141
1142 for (right_row_idx, right_row) in right_table.rows.iter().enumerate() {
1143 let mut all_conditions_met = true;
1145 for condition in conditions.iter() {
1146 let left_value =
1148 match left_evaluator.evaluate(&condition.left_expr, left_row_idx) {
1149 Ok(val) => val,
1150 Err(_) => {
1151 all_conditions_met = false;
1152 break;
1153 }
1154 };
1155
1156 let right_value =
1158 match right_evaluator.evaluate(&condition.right_expr, right_row_idx) {
1159 Ok(val) => val,
1160 Err(_) => {
1161 all_conditions_met = false;
1162 break;
1163 }
1164 };
1165
1166 if !self.compare_values(&left_value, &right_value, &condition.operator) {
1167 all_conditions_met = false;
1168 break;
1169 }
1170 }
1171
1172 if all_conditions_met {
1173 let mut joined_row = DataRow { values: Vec::new() };
1174 joined_row.values.extend_from_slice(&left_row.values);
1175 joined_row.values.extend_from_slice(&right_row.values);
1176 result.add_row(joined_row);
1177 match_count += 1;
1178 found_match = true;
1179 }
1180 }
1181
1182 if !found_match {
1184 let mut joined_row = DataRow { values: Vec::new() };
1185 joined_row.values.extend_from_slice(&left_row.values);
1186 for _ in 0..right_table.column_count() {
1187 joined_row.values.push(DataValue::Null);
1188 }
1189 result.add_row(joined_row);
1190 null_count += 1;
1191 }
1192 }
1193
1194 info!(
1195 "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
1196 match_count,
1197 null_count,
1198 start.elapsed()
1199 );
1200
1201 Ok(result)
1202 }
1203
1204 fn nested_loop_join_left(
1206 &self,
1207 left_table: Arc<DataTable>,
1208 right_table: Arc<DataTable>,
1209 left_col_idx: usize,
1210 right_col_idx: usize,
1211 operator: &JoinOperator,
1212 join_alias: &Option<String>,
1213 ) -> Result<DataTable> {
1214 let start = std::time::Instant::now();
1215
1216 info!(
1217 "Executing nested loop LEFT JOIN with {:?} operator: {} x {} rows",
1218 operator,
1219 left_table.row_count(),
1220 right_table.row_count()
1221 );
1222
1223 let mut result = DataTable::new("joined");
1225
1226 for col in &left_table.columns {
1228 result.add_column(DataColumn {
1229 name: col.name.clone(),
1230 data_type: col.data_type.clone(),
1231 nullable: col.nullable,
1232 unique_values: col.unique_values,
1233 null_count: col.null_count,
1234 metadata: col.metadata.clone(),
1235 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
1238 }
1239
1240 for col in &right_table.columns {
1242 if !left_table
1243 .columns
1244 .iter()
1245 .any(|left_col| left_col.name == col.name)
1246 {
1247 result.add_column(DataColumn {
1248 name: col.name.clone(),
1249 data_type: col.data_type.clone(),
1250 nullable: true, unique_values: col.unique_values,
1252 null_count: col.null_count,
1253 metadata: col.metadata.clone(),
1254 qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(), });
1257 } else {
1258 let (column_name, qualified_name) = if let Some(alias) = join_alias {
1259 (
1261 format!("{}.{}", alias, col.name),
1262 Some(format!("{}.{}", alias, col.name)),
1263 )
1264 } else {
1265 (format!("{}_right", col.name), col.qualified_name.clone())
1267 };
1268 result.add_column(DataColumn {
1269 name: column_name,
1270 data_type: col.data_type.clone(),
1271 nullable: true, unique_values: col.unique_values,
1273 null_count: col.null_count,
1274 metadata: col.metadata.clone(),
1275 qualified_name,
1276 source_table: join_alias.clone().or_else(|| col.source_table.clone()),
1277 });
1278 }
1279 }
1280
1281 let mut match_count = 0;
1283 let mut null_count = 0;
1284
1285 for left_row in &left_table.rows {
1286 let left_value = &left_row.values[left_col_idx];
1287 let mut found_match = false;
1288
1289 for right_row in &right_table.rows {
1290 let right_value = &right_row.values[right_col_idx];
1291
1292 if self.compare_values(left_value, right_value, operator) {
1293 let mut joined_row = DataRow { values: Vec::new() };
1294 joined_row.values.extend_from_slice(&left_row.values);
1295 joined_row.values.extend_from_slice(&right_row.values);
1296 result.add_row(joined_row);
1297 match_count += 1;
1298 found_match = true;
1299 }
1300 }
1301
1302 if !found_match {
1304 let mut joined_row = DataRow { values: Vec::new() };
1305 joined_row.values.extend_from_slice(&left_row.values);
1306 for _ in 0..right_table.column_count() {
1307 joined_row.values.push(DataValue::Null);
1308 }
1309 result.add_row(joined_row);
1310 null_count += 1;
1311 }
1312 }
1313
1314 info!(
1315 "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
1316 match_count,
1317 null_count,
1318 start.elapsed()
1319 );
1320
1321 Ok(result)
1322 }
1323}
1324
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Shorthand for an owned `DataValue::String`.
    fn text(s: &str) -> DataValue {
        DataValue::String(s.to_string())
    }

    /// Builds a one-column table named "t" (column "k") holding one row.
    fn single_col_table(value: DataValue) -> DataTable {
        let mut t = DataTable::new("t");
        t.add_column(DataColumn::new("k"));
        let _ = t.add_row(DataRow {
            values: vec![value],
        });
        t
    }

    #[test]
    fn numeric_string_folds_to_integer_when_coercing() {
        let from_text = canonical_join_key(&text("220"), true);
        let from_int = canonical_join_key(&DataValue::Integer(220), true);

        assert_eq!(from_text, DataValue::Integer(220));
        assert_eq!(from_int, DataValue::Integer(220));
        assert_eq!(
            canonical_join_key(&text("220"), true),
            canonical_join_key(&DataValue::Integer(220), true)
        );
    }

    #[test]
    fn numeric_strings_stay_distinct_when_not_coercing() {
        // Leading zeros survive when coercion is off.
        assert_eq!(canonical_join_key(&text("007"), false), text("007"));
        assert_ne!(
            canonical_join_key(&text("007"), false),
            canonical_join_key(&text("7"), false)
        );
        // A string never equals an integer without coercion.
        assert_ne!(
            canonical_join_key(&text("7"), false),
            canonical_join_key(&DataValue::Integer(7), false)
        );
    }

    #[test]
    fn interned_and_plain_strings_collapse_regardless_of_coercion() {
        for coerce in [true, false] {
            let interned = DataValue::InternedString(Arc::new("North".to_string()));
            assert_eq!(
                canonical_join_key(&interned, coerce),
                canonical_join_key(&text("North"), coerce),
                "interned/plain strings must collapse (coerce = {coerce})"
            );
        }
    }

    #[test]
    fn whole_float_folds_to_integer_when_coercing() {
        assert_eq!(
            canonical_join_key(&DataValue::Float(220.0), true),
            DataValue::Integer(220)
        );
        assert_eq!(
            canonical_join_key(&text("220.0"), true),
            DataValue::Integer(220)
        );
        // Fractional floats are left alone.
        assert_eq!(
            canonical_join_key(&DataValue::Float(220.5), true),
            DataValue::Float(220.5)
        );
        // Float folding applies even when coercion is off.
        assert_eq!(
            canonical_join_key(&DataValue::Float(220.0), false),
            DataValue::Integer(220)
        );
    }

    #[test]
    fn non_numeric_text_is_preserved() {
        assert_eq!(canonical_join_key(&text("North"), true), text("North"));
        // Leading whitespace prevents numeric parsing.
        assert_eq!(canonical_join_key(&text(" 220"), true), text(" 220"));
    }

    #[test]
    fn non_finite_strings_stay_strings() {
        assert_eq!(canonical_join_key(&text("inf"), true), text("inf"));
        assert_eq!(canonical_join_key(&text("NaN"), true), text("NaN"));
    }

    #[test]
    fn null_is_unchanged() {
        assert_eq!(canonical_join_key(&DataValue::Null, true), DataValue::Null);
    }

    #[test]
    fn coercion_enabled_only_for_differing_value_kinds() {
        let stringy = single_col_table(text("7"));
        let numeric = single_col_table(DataValue::Integer(7));
        let stringy2 = single_col_table(text("8"));
        let empty = DataTable::new("empty");

        // Different kinds: coerce.
        assert!(join_key_coercion(&stringy, 0, &numeric, 0));
        // Same kind on both sides: no coercion.
        assert!(!join_key_coercion(&stringy, 0, &stringy2, 0));
        assert!(!join_key_coercion(&numeric, 0, &numeric, 0));
        // One side has nothing to sample: coerce.
        assert!(join_key_coercion(&stringy, 0, &empty, 0));
    }
}