1use anyhow::{anyhow, Result};
4use std::collections::HashMap;
5use std::sync::Arc;
6use tracing::{debug, info};
7
8use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
9use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
10use crate::sql::parser::ast::{JoinClause, JoinOperator, JoinType};
11use crate::sql::recursive_parser::SqlExpression;
12
/// Executes SQL-style joins between two `DataTable`s.
///
/// A single equality condition on plain columns is run as a hash join;
/// every other condition set falls back to a nested-loop join.
pub struct HashJoinExecutor {
    /// When `true`, column names are compared after lower-casing both sides.
    case_insensitive: bool,
}
17
18impl HashJoinExecutor {
19    pub fn new(case_insensitive: bool) -> Self {
20        Self { case_insensitive }
21    }
22
23    pub fn execute_join(
25        &self,
26        left_table: Arc<DataTable>,
27        join_clause: &JoinClause,
28        right_table: Arc<DataTable>,
29    ) -> Result<DataTable> {
30        info!(
31            "Executing {:?} JOIN: {} rows x {} rows with {} conditions",
32            join_clause.join_type,
33            left_table.row_count(),
34            right_table.row_count(),
35            join_clause.condition.conditions.len()
36        );
37
38        let mut condition_indices = Vec::new();
41        let mut all_equal = true;
42        let mut has_complex_expr = false;
43
44        for single_condition in &join_clause.condition.conditions {
45            let right_col_name = Self::extract_simple_column_name(&single_condition.right_expr);
47
48            if right_col_name.is_none() {
49                has_complex_expr = true;
51                all_equal = false; break;
53            }
54
55            let (left_col_idx, right_col_idx) = self.resolve_join_columns(
56                &left_table,
57                &right_table,
58                &single_condition.left_column,
59                &right_col_name.unwrap(),
60            )?;
61
62            if single_condition.operator != JoinOperator::Equal {
63                all_equal = false;
64            }
65
66            condition_indices.push((
67                left_col_idx,
68                right_col_idx,
69                single_condition.operator.clone(),
70            ));
71        }
72
73        let use_hash_join = all_equal && !has_complex_expr;
77
78        match join_clause.join_type {
80            JoinType::Inner => {
81                if use_hash_join && condition_indices.len() == 1 {
82                    let (left_col_idx, right_col_idx, _) = condition_indices[0];
84                    let right_col_name = Self::extract_simple_column_name(
85                        &join_clause.condition.conditions[0].right_expr,
86                    )
87                    .expect("right_expr should be a simple column in hash join path");
88                    self.hash_join_inner(
89                        left_table,
90                        right_table,
91                        left_col_idx,
92                        right_col_idx,
93                        &join_clause.condition.conditions[0].left_column,
94                        &right_col_name,
95                        &join_clause.alias,
96                    )
97                } else {
98                    self.nested_loop_join_inner_multi(
100                        left_table,
101                        right_table,
102                        &join_clause.condition.conditions,
103                        &join_clause.alias,
104                    )
105                }
106            }
107            JoinType::Left => {
108                if use_hash_join && condition_indices.len() == 1 {
109                    let (left_col_idx, right_col_idx, _) = condition_indices[0];
111                    let right_col_name = Self::extract_simple_column_name(
112                        &join_clause.condition.conditions[0].right_expr,
113                    )
114                    .expect("right_expr should be a simple column in hash join path");
115                    self.hash_join_left(
116                        left_table,
117                        right_table,
118                        left_col_idx,
119                        right_col_idx,
120                        &join_clause.condition.conditions[0].left_column,
121                        &right_col_name,
122                        &join_clause.alias,
123                    )
124                } else {
125                    self.nested_loop_join_left_multi(
127                        left_table,
128                        right_table,
129                        &join_clause.condition.conditions,
130                        &join_clause.alias,
131                    )
132                }
133            }
134            JoinType::Right => {
135                let swapped_indices: Vec<(usize, usize, JoinOperator)> = condition_indices
137                    .into_iter()
138                    .map(|(l, r, op)| (r, l, self.reverse_operator(&op)))
139                    .collect();
140
141                if use_hash_join && swapped_indices.len() == 1 {
142                    let (right_col_idx, left_col_idx, _) = swapped_indices[0];
144                    let right_col_name = Self::extract_simple_column_name(
145                        &join_clause.condition.conditions[0].right_expr,
146                    )
147                    .expect("right_expr should be a simple column in hash join path");
148                    self.hash_join_left(
149                        right_table,
150                        left_table,
151                        right_col_idx,
152                        left_col_idx,
153                        &right_col_name,
154                        &join_clause.condition.conditions[0].left_column,
155                        &join_clause.alias,
156                    )
157                } else {
158                    self.nested_loop_join_left_multi(
161                        right_table,
162                        left_table,
163                        &join_clause.condition.conditions,
164                        &join_clause.alias,
165                    )
166                }
167            }
168            JoinType::Cross => self.cross_join(left_table, right_table),
169            JoinType::Full => {
170                return Err(anyhow!("FULL OUTER JOIN not yet implemented"));
171            }
172        }
173    }
174
175    fn extract_simple_column_name(expr: &SqlExpression) -> Option<String> {
178        match expr {
179            SqlExpression::Column(col_ref) => {
180                if let Some(table_prefix) = &col_ref.table_prefix {
182                    Some(format!("{}.{}", table_prefix, col_ref.name))
183                } else {
184                    Some(col_ref.name.clone())
185                }
186            }
187            _ => None, }
189    }
190
191    fn resolve_join_columns(
193        &self,
194        left_table: &DataTable,
195        right_table: &DataTable,
196        left_col_name: &str,
197        right_col_name: &str,
198    ) -> Result<(usize, usize)> {
199        let left_col_idx = if let Ok(idx) = self.find_column_index(left_table, left_col_name) {
201            idx
202        } else if let Ok(_idx) = self.find_column_index(right_table, left_col_name) {
203            return Err(anyhow!(
206                "Column '{}' found in right table but specified as left operand. \
207                Please rewrite the condition with columns in correct positions.",
208                left_col_name
209            ));
210        } else {
211            return Err(anyhow!(
212                "Column '{}' not found in either table",
213                left_col_name
214            ));
215        };
216
217        let right_col_idx = if let Ok(idx) = self.find_column_index(right_table, right_col_name) {
219            idx
220        } else if let Ok(_idx) = self.find_column_index(left_table, right_col_name) {
221            return Err(anyhow!(
224                "Column '{}' found in left table but specified as right operand. \
225                Please rewrite the condition with columns in correct positions.",
226                right_col_name
227            ));
228        } else {
229            return Err(anyhow!(
230                "Column '{}' not found in either table",
231                right_col_name
232            ));
233        };
234
235        Ok((left_col_idx, right_col_idx))
236    }
237
238    fn find_column_index(&self, table: &DataTable, col_name: &str) -> Result<usize> {
240        let col_name = if let Some(dot_pos) = col_name.rfind('.') {
242            &col_name[dot_pos + 1..]
243        } else {
244            col_name
245        };
246
247        debug!(
248            "Looking for column '{}' in table with columns: {:?}",
249            col_name,
250            table.column_names()
251        );
252
253        table
254            .columns
255            .iter()
256            .position(|col| {
257                if self.case_insensitive {
258                    col.name.to_lowercase() == col_name.to_lowercase()
259                } else {
260                    col.name == col_name
261                }
262            })
263            .ok_or_else(|| anyhow!("Column '{}' not found in table", col_name))
264    }
265
266    fn hash_join_inner(
268        &self,
269        left_table: Arc<DataTable>,
270        right_table: Arc<DataTable>,
271        left_col_idx: usize,
272        right_col_idx: usize,
273        _left_col_name: &str,
274        _right_col_name: &str,
275        join_alias: &Option<String>,
276    ) -> Result<DataTable> {
277        let start = std::time::Instant::now();
278
279        let (build_table, probe_table, build_col_idx, probe_col_idx, build_is_left) =
281            if left_table.row_count() <= right_table.row_count() {
282                (
283                    left_table.clone(),
284                    right_table.clone(),
285                    left_col_idx,
286                    right_col_idx,
287                    true,
288                )
289            } else {
290                (
291                    right_table.clone(),
292                    left_table.clone(),
293                    right_col_idx,
294                    left_col_idx,
295                    false,
296                )
297            };
298
299        debug!(
300            "Building hash index on {} table ({} rows)",
301            if build_is_left { "left" } else { "right" },
302            build_table.row_count()
303        );
304
305        let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
307        for (row_idx, row) in build_table.rows.iter().enumerate() {
308            let key = row.values[build_col_idx].clone();
309            hash_index.entry(key).or_default().push(row_idx);
310        }
311
312        debug!(
313            "Hash index built with {} unique keys in {:?}",
314            hash_index.len(),
315            start.elapsed()
316        );
317
318        let mut result = DataTable::new("joined");
320
321        for col in &left_table.columns {
323            result.add_column(DataColumn {
324                name: col.name.clone(),
325                data_type: col.data_type.clone(),
326                nullable: col.nullable,
327                unique_values: col.unique_values,
328                null_count: col.null_count,
329                metadata: col.metadata.clone(),
330                qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(),     });
333        }
334
335        for col in &right_table.columns {
337            if !left_table
339                .columns
340                .iter()
341                .any(|left_col| left_col.name == col.name)
342            {
343                result.add_column(DataColumn {
344                    name: col.name.clone(),
345                    data_type: col.data_type.clone(),
346                    nullable: col.nullable,
347                    unique_values: col.unique_values,
348                    null_count: col.null_count,
349                    metadata: col.metadata.clone(),
350                    qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(),     });
353            } else {
354                let (column_name, qualified_name) = if let Some(alias) = join_alias {
356                    (
358                        format!("{}.{}", alias, col.name),
359                        Some(format!("{}.{}", alias, col.name)),
360                    )
361                } else {
362                    (format!("{}_right", col.name), col.qualified_name.clone())
364                };
365                result.add_column(DataColumn {
366                    name: column_name,
367                    data_type: col.data_type.clone(),
368                    nullable: col.nullable,
369                    unique_values: col.unique_values,
370                    null_count: col.null_count,
371                    metadata: col.metadata.clone(),
372                    qualified_name,
373                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
374                });
375            }
376        }
377
378        debug!(
379            "Joined table will have {} columns: {:?}",
380            result.column_count(),
381            result.column_names()
382        );
383
384        let mut match_count = 0;
386        for probe_row in &probe_table.rows {
387            let probe_key = &probe_row.values[probe_col_idx];
388
389            if let Some(matching_indices) = hash_index.get(probe_key) {
390                for &build_idx in matching_indices {
391                    let build_row = &build_table.rows[build_idx];
392
393                    let mut joined_row = DataRow { values: Vec::new() };
395
396                    if build_is_left {
397                        joined_row.values.extend_from_slice(&build_row.values);
399                        joined_row.values.extend_from_slice(&probe_row.values);
400                    } else {
401                        joined_row.values.extend_from_slice(&probe_row.values);
403                        joined_row.values.extend_from_slice(&build_row.values);
404                    }
405
406                    result.add_row(joined_row);
407                    match_count += 1;
408                }
409            }
410        }
411
412        let qualified_cols: Vec<String> = result
414            .columns
415            .iter()
416            .filter_map(|c| c.qualified_name.clone())
417            .collect();
418
419        info!(
420            "INNER JOIN complete: {} matches found in {:?}. Result has {} columns ({} qualified: {:?})",
421            match_count,
422            start.elapsed(),
423            result.columns.len(),
424            qualified_cols.len(),
425            qualified_cols
426        );
427
428        Ok(result)
429    }
430
431    fn hash_join_left(
433        &self,
434        left_table: Arc<DataTable>,
435        right_table: Arc<DataTable>,
436        left_col_idx: usize,
437        right_col_idx: usize,
438        _left_col_name: &str,
439        _right_col_name: &str,
440        join_alias: &Option<String>,
441    ) -> Result<DataTable> {
442        let start = std::time::Instant::now();
443
444        debug!(
445            "Building hash index on right table ({} rows)",
446            right_table.row_count()
447        );
448
449        let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
451        for (row_idx, row) in right_table.rows.iter().enumerate() {
452            let key = row.values[right_col_idx].clone();
453            hash_index.entry(key).or_default().push(row_idx);
454        }
455
456        let mut result = DataTable::new("joined");
458
459        for col in &left_table.columns {
461            result.add_column(DataColumn {
462                name: col.name.clone(),
463                data_type: col.data_type.clone(),
464                nullable: col.nullable,
465                unique_values: col.unique_values,
466                null_count: col.null_count,
467                metadata: col.metadata.clone(),
468                qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(),     });
471        }
472
473        for col in &right_table.columns {
475            if !left_table
477                .columns
478                .iter()
479                .any(|left_col| left_col.name == col.name)
480            {
481                result.add_column(DataColumn {
482                    name: col.name.clone(),
483                    data_type: col.data_type.clone(),
484                    nullable: true, unique_values: col.unique_values,
486                    null_count: col.null_count,
487                    metadata: col.metadata.clone(),
488                    qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(),     });
491            } else {
492                let (column_name, qualified_name) = if let Some(alias) = join_alias {
494                    (
496                        format!("{}.{}", alias, col.name),
497                        Some(format!("{}.{}", alias, col.name)),
498                    )
499                } else {
500                    (format!("{}_right", col.name), col.qualified_name.clone())
502                };
503                result.add_column(DataColumn {
504                    name: column_name,
505                    data_type: col.data_type.clone(),
506                    nullable: true, unique_values: col.unique_values,
508                    null_count: col.null_count,
509                    metadata: col.metadata.clone(),
510                    qualified_name,
511                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
512                });
513            }
514        }
515
516        debug!(
517            "LEFT JOIN table will have {} columns: {:?}",
518            result.column_count(),
519            result.column_names()
520        );
521
522        let mut match_count = 0;
524        let mut null_count = 0;
525
526        for left_row in &left_table.rows {
527            let left_key = &left_row.values[left_col_idx];
528
529            if let Some(matching_indices) = hash_index.get(left_key) {
530                for &right_idx in matching_indices {
532                    let right_row = &right_table.rows[right_idx];
533
534                    let mut joined_row = DataRow { values: Vec::new() };
535                    joined_row.values.extend_from_slice(&left_row.values);
536                    joined_row.values.extend_from_slice(&right_row.values);
537
538                    result.add_row(joined_row);
539                    match_count += 1;
540                }
541            } else {
542                let mut joined_row = DataRow { values: Vec::new() };
544                joined_row.values.extend_from_slice(&left_row.values);
545
546                for _ in 0..right_table.column_count() {
548                    joined_row.values.push(DataValue::Null);
549                }
550
551                result.add_row(joined_row);
552                null_count += 1;
553            }
554        }
555
556        let qualified_cols: Vec<String> = result
558            .columns
559            .iter()
560            .filter_map(|c| c.qualified_name.clone())
561            .collect();
562
563        info!(
564            "LEFT JOIN complete: {} matches, {} nulls in {:?}. Result has {} columns ({} qualified: {:?})",
565            match_count,
566            null_count,
567            start.elapsed(),
568            result.columns.len(),
569            qualified_cols.len(),
570            qualified_cols
571        );
572
573        Ok(result)
574    }
575
576    fn cross_join(
578        &self,
579        left_table: Arc<DataTable>,
580        right_table: Arc<DataTable>,
581    ) -> Result<DataTable> {
582        let start = std::time::Instant::now();
583
584        let result_rows = left_table.row_count() * right_table.row_count();
586        if result_rows > 1_000_000 {
587            return Err(anyhow!(
588                "CROSS JOIN would produce {} rows, which exceeds the safety limit",
589                result_rows
590            ));
591        }
592
593        let mut result = DataTable::new("joined");
595
596        for col in &left_table.columns {
598            result.add_column(col.clone());
599        }
600        for col in &right_table.columns {
601            result.add_column(col.clone());
602        }
603
604        for left_row in &left_table.rows {
606            for right_row in &right_table.rows {
607                let mut joined_row = DataRow { values: Vec::new() };
608                joined_row.values.extend_from_slice(&left_row.values);
609                joined_row.values.extend_from_slice(&right_row.values);
610                result.add_row(joined_row);
611            }
612        }
613
614        info!(
615            "CROSS JOIN complete: {} rows in {:?}",
616            result.row_count(),
617            start.elapsed()
618        );
619
620        Ok(result)
621    }
622
623    fn qualify_column_name(
625        &self,
626        col_name: &str,
627        table_side: &str,
628        left_join_col: &str,
629        right_join_col: &str,
630    ) -> String {
631        let base_name = if let Some(dot_pos) = col_name.rfind('.') {
633            &col_name[dot_pos + 1..]
634        } else {
635            col_name
636        };
637
638        let left_base = if let Some(dot_pos) = left_join_col.rfind('.') {
639            &left_join_col[dot_pos + 1..]
640        } else {
641            left_join_col
642        };
643
644        let right_base = if let Some(dot_pos) = right_join_col.rfind('.') {
645            &right_join_col[dot_pos + 1..]
646        } else {
647            right_join_col
648        };
649
650        if base_name == left_base || base_name == right_base {
652            format!("{}_{}", table_side, base_name)
653        } else {
654            col_name.to_string()
655        }
656    }
657
658    fn reverse_operator(&self, op: &JoinOperator) -> JoinOperator {
660        match op {
661            JoinOperator::Equal => JoinOperator::Equal,
662            JoinOperator::NotEqual => JoinOperator::NotEqual,
663            JoinOperator::LessThan => JoinOperator::GreaterThan,
664            JoinOperator::GreaterThan => JoinOperator::LessThan,
665            JoinOperator::LessThanOrEqual => JoinOperator::GreaterThanOrEqual,
666            JoinOperator::GreaterThanOrEqual => JoinOperator::LessThanOrEqual,
667        }
668    }
669
670    fn compare_values(&self, left: &DataValue, right: &DataValue, op: &JoinOperator) -> bool {
672        match op {
673            JoinOperator::Equal => left == right,
674            JoinOperator::NotEqual => left != right,
675            JoinOperator::LessThan => left < right,
676            JoinOperator::GreaterThan => left > right,
677            JoinOperator::LessThanOrEqual => left <= right,
678            JoinOperator::GreaterThanOrEqual => left >= right,
679        }
680    }
681
682    fn nested_loop_join_inner(
684        &self,
685        left_table: Arc<DataTable>,
686        right_table: Arc<DataTable>,
687        left_col_idx: usize,
688        right_col_idx: usize,
689        operator: &JoinOperator,
690        join_alias: &Option<String>,
691    ) -> Result<DataTable> {
692        let start = std::time::Instant::now();
693
694        info!(
695            "Executing nested loop INNER JOIN with {:?} operator: {} x {} rows",
696            operator,
697            left_table.row_count(),
698            right_table.row_count()
699        );
700
701        let mut result = DataTable::new("joined");
703
704        for col in &left_table.columns {
706            result.add_column(DataColumn {
707                name: col.name.clone(),
708                data_type: col.data_type.clone(),
709                nullable: col.nullable,
710                unique_values: col.unique_values,
711                null_count: col.null_count,
712                metadata: col.metadata.clone(),
713                qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(),     });
716        }
717
718        for col in &right_table.columns {
720            if !left_table
721                .columns
722                .iter()
723                .any(|left_col| left_col.name == col.name)
724            {
725                result.add_column(DataColumn {
726                    name: col.name.clone(),
727                    data_type: col.data_type.clone(),
728                    nullable: col.nullable,
729                    unique_values: col.unique_values,
730                    null_count: col.null_count,
731                    metadata: col.metadata.clone(),
732                    qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(),     });
735            } else {
736                let (column_name, qualified_name) = if let Some(alias) = join_alias {
737                    (
739                        format!("{}.{}", alias, col.name),
740                        Some(format!("{}.{}", alias, col.name)),
741                    )
742                } else {
743                    (format!("{}_right", col.name), col.qualified_name.clone())
745                };
746                result.add_column(DataColumn {
747                    name: column_name,
748                    data_type: col.data_type.clone(),
749                    nullable: col.nullable,
750                    unique_values: col.unique_values,
751                    null_count: col.null_count,
752                    metadata: col.metadata.clone(),
753                    qualified_name,
754                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
755                });
756            }
757        }
758
759        let mut match_count = 0;
761        for left_row in &left_table.rows {
762            let left_value = &left_row.values[left_col_idx];
763
764            for right_row in &right_table.rows {
765                let right_value = &right_row.values[right_col_idx];
766
767                if self.compare_values(left_value, right_value, operator) {
768                    let mut joined_row = DataRow { values: Vec::new() };
769                    joined_row.values.extend_from_slice(&left_row.values);
770                    joined_row.values.extend_from_slice(&right_row.values);
771                    result.add_row(joined_row);
772                    match_count += 1;
773                }
774            }
775        }
776
777        info!(
778            "Nested loop INNER JOIN complete: {} matches found in {:?}",
779            match_count,
780            start.elapsed()
781        );
782
783        Ok(result)
784    }
785
786    fn nested_loop_join_inner_multi(
788        &self,
789        left_table: Arc<DataTable>,
790        right_table: Arc<DataTable>,
791        conditions: &[crate::sql::parser::ast::SingleJoinCondition],
792        join_alias: &Option<String>,
793    ) -> Result<DataTable> {
794        let start = std::time::Instant::now();
795
796        info!(
797            "Executing nested loop INNER JOIN with {} conditions: {} x {} rows",
798            conditions.len(),
799            left_table.row_count(),
800            right_table.row_count()
801        );
802
803        let mut result = DataTable::new("joined");
805
806        for col in &left_table.columns {
808            result.add_column(DataColumn {
809                name: col.name.clone(),
810                data_type: col.data_type.clone(),
811                nullable: col.nullable,
812                unique_values: col.unique_values,
813                null_count: col.null_count,
814                metadata: col.metadata.clone(),
815                qualified_name: col.qualified_name.clone(),
816                source_table: col.source_table.clone(),
817            });
818        }
819
820        for col in &right_table.columns {
822            if !left_table
823                .columns
824                .iter()
825                .any(|left_col| left_col.name == col.name)
826            {
827                result.add_column(DataColumn {
828                    name: col.name.clone(),
829                    data_type: col.data_type.clone(),
830                    nullable: col.nullable,
831                    unique_values: col.unique_values,
832                    null_count: col.null_count,
833                    metadata: col.metadata.clone(),
834                    qualified_name: col.qualified_name.clone(),
835                    source_table: col.source_table.clone(),
836                });
837            } else {
838                let (column_name, qualified_name) = if let Some(alias) = join_alias {
839                    (
840                        format!("{}.{}", alias, col.name),
841                        Some(format!("{}.{}", alias, col.name)),
842                    )
843                } else {
844                    (format!("{}_right", col.name), col.qualified_name.clone())
845                };
846                result.add_column(DataColumn {
847                    name: column_name,
848                    data_type: col.data_type.clone(),
849                    nullable: col.nullable,
850                    unique_values: col.unique_values,
851                    null_count: col.null_count,
852                    metadata: col.metadata.clone(),
853                    qualified_name,
854                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
855                });
856            }
857        }
858
859        let mut left_col_indices = Vec::new();
861        for condition in conditions {
862            let left_idx = self.find_column_index(&left_table, &condition.left_column)?;
863            left_col_indices.push(left_idx);
864        }
865
866        let mut right_evaluator = ArithmeticEvaluator::new(&right_table);
868
869        let mut match_count = 0;
871        for left_row in &left_table.rows {
872            for (right_row_idx, right_row) in right_table.rows.iter().enumerate() {
873                let mut all_conditions_met = true;
875                for (i, condition) in conditions.iter().enumerate() {
876                    let left_value = &left_row.values[left_col_indices[i]];
877
878                    let right_value =
880                        match right_evaluator.evaluate(&condition.right_expr, right_row_idx) {
881                            Ok(val) => val,
882                            Err(_) => {
883                                all_conditions_met = false;
884                                break;
885                            }
886                        };
887
888                    if !self.compare_values(left_value, &right_value, &condition.operator) {
889                        all_conditions_met = false;
890                        break;
891                    }
892                }
893
894                if all_conditions_met {
895                    let mut joined_row = DataRow { values: Vec::new() };
896                    joined_row.values.extend_from_slice(&left_row.values);
897                    joined_row.values.extend_from_slice(&right_row.values);
898                    result.add_row(joined_row);
899                    match_count += 1;
900                }
901            }
902        }
903
904        info!(
905            "Nested loop INNER JOIN complete: {} matches found in {:?}",
906            match_count,
907            start.elapsed()
908        );
909
910        Ok(result)
911    }
912
913    fn nested_loop_join_left_multi(
915        &self,
916        left_table: Arc<DataTable>,
917        right_table: Arc<DataTable>,
918        conditions: &[crate::sql::parser::ast::SingleJoinCondition],
919        join_alias: &Option<String>,
920    ) -> Result<DataTable> {
921        let start = std::time::Instant::now();
922
923        info!(
924            "Executing nested loop LEFT JOIN with {} conditions: {} x {} rows",
925            conditions.len(),
926            left_table.row_count(),
927            right_table.row_count()
928        );
929
930        let mut result = DataTable::new("joined");
932
933        for col in &left_table.columns {
935            result.add_column(DataColumn {
936                name: col.name.clone(),
937                data_type: col.data_type.clone(),
938                nullable: col.nullable,
939                unique_values: col.unique_values,
940                null_count: col.null_count,
941                metadata: col.metadata.clone(),
942                qualified_name: col.qualified_name.clone(),
943                source_table: col.source_table.clone(),
944            });
945        }
946
947        for col in &right_table.columns {
949            if !left_table
950                .columns
951                .iter()
952                .any(|left_col| left_col.name == col.name)
953            {
954                result.add_column(DataColumn {
955                    name: col.name.clone(),
956                    data_type: col.data_type.clone(),
957                    nullable: true, unique_values: col.unique_values,
959                    null_count: col.null_count,
960                    metadata: col.metadata.clone(),
961                    qualified_name: col.qualified_name.clone(),
962                    source_table: col.source_table.clone(),
963                });
964            } else {
965                let (column_name, qualified_name) = if let Some(alias) = join_alias {
966                    (
967                        format!("{}.{}", alias, col.name),
968                        Some(format!("{}.{}", alias, col.name)),
969                    )
970                } else {
971                    (format!("{}_right", col.name), col.qualified_name.clone())
972                };
973                result.add_column(DataColumn {
974                    name: column_name,
975                    data_type: col.data_type.clone(),
976                    nullable: true, unique_values: col.unique_values,
978                    null_count: col.null_count,
979                    metadata: col.metadata.clone(),
980                    qualified_name,
981                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
982                });
983            }
984        }
985
986        let mut left_col_indices = Vec::new();
988        for condition in conditions {
989            let left_idx = self.find_column_index(&left_table, &condition.left_column)?;
990            left_col_indices.push(left_idx);
991        }
992
993        let mut right_evaluator = ArithmeticEvaluator::new(&right_table);
995
996        let mut match_count = 0;
998        let mut null_count = 0;
999
1000        for left_row in &left_table.rows {
1001            let mut found_match = false;
1002
1003            for (right_row_idx, right_row) in right_table.rows.iter().enumerate() {
1004                let mut all_conditions_met = true;
1006                for (i, condition) in conditions.iter().enumerate() {
1007                    let left_value = &left_row.values[left_col_indices[i]];
1008
1009                    let right_value =
1011                        match right_evaluator.evaluate(&condition.right_expr, right_row_idx) {
1012                            Ok(val) => val,
1013                            Err(_) => {
1014                                all_conditions_met = false;
1015                                break;
1016                            }
1017                        };
1018
1019                    if !self.compare_values(left_value, &right_value, &condition.operator) {
1020                        all_conditions_met = false;
1021                        break;
1022                    }
1023                }
1024
1025                if all_conditions_met {
1026                    let mut joined_row = DataRow { values: Vec::new() };
1027                    joined_row.values.extend_from_slice(&left_row.values);
1028                    joined_row.values.extend_from_slice(&right_row.values);
1029                    result.add_row(joined_row);
1030                    match_count += 1;
1031                    found_match = true;
1032                }
1033            }
1034
1035            if !found_match {
1037                let mut joined_row = DataRow { values: Vec::new() };
1038                joined_row.values.extend_from_slice(&left_row.values);
1039                for _ in 0..right_table.column_count() {
1040                    joined_row.values.push(DataValue::Null);
1041                }
1042                result.add_row(joined_row);
1043                null_count += 1;
1044            }
1045        }
1046
1047        info!(
1048            "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
1049            match_count,
1050            null_count,
1051            start.elapsed()
1052        );
1053
1054        Ok(result)
1055    }
1056
1057    fn nested_loop_join_left(
1059        &self,
1060        left_table: Arc<DataTable>,
1061        right_table: Arc<DataTable>,
1062        left_col_idx: usize,
1063        right_col_idx: usize,
1064        operator: &JoinOperator,
1065        join_alias: &Option<String>,
1066    ) -> Result<DataTable> {
1067        let start = std::time::Instant::now();
1068
1069        info!(
1070            "Executing nested loop LEFT JOIN with {:?} operator: {} x {} rows",
1071            operator,
1072            left_table.row_count(),
1073            right_table.row_count()
1074        );
1075
1076        let mut result = DataTable::new("joined");
1078
1079        for col in &left_table.columns {
1081            result.add_column(DataColumn {
1082                name: col.name.clone(),
1083                data_type: col.data_type.clone(),
1084                nullable: col.nullable,
1085                unique_values: col.unique_values,
1086                null_count: col.null_count,
1087                metadata: col.metadata.clone(),
1088                qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(),     });
1091        }
1092
1093        for col in &right_table.columns {
1095            if !left_table
1096                .columns
1097                .iter()
1098                .any(|left_col| left_col.name == col.name)
1099            {
1100                result.add_column(DataColumn {
1101                    name: col.name.clone(),
1102                    data_type: col.data_type.clone(),
1103                    nullable: true, unique_values: col.unique_values,
1105                    null_count: col.null_count,
1106                    metadata: col.metadata.clone(),
1107                    qualified_name: col.qualified_name.clone(), source_table: col.source_table.clone(),     });
1110            } else {
1111                let (column_name, qualified_name) = if let Some(alias) = join_alias {
1112                    (
1114                        format!("{}.{}", alias, col.name),
1115                        Some(format!("{}.{}", alias, col.name)),
1116                    )
1117                } else {
1118                    (format!("{}_right", col.name), col.qualified_name.clone())
1120                };
1121                result.add_column(DataColumn {
1122                    name: column_name,
1123                    data_type: col.data_type.clone(),
1124                    nullable: true, unique_values: col.unique_values,
1126                    null_count: col.null_count,
1127                    metadata: col.metadata.clone(),
1128                    qualified_name,
1129                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
1130                });
1131            }
1132        }
1133
1134        let mut match_count = 0;
1136        let mut null_count = 0;
1137
1138        for left_row in &left_table.rows {
1139            let left_value = &left_row.values[left_col_idx];
1140            let mut found_match = false;
1141
1142            for right_row in &right_table.rows {
1143                let right_value = &right_row.values[right_col_idx];
1144
1145                if self.compare_values(left_value, right_value, operator) {
1146                    let mut joined_row = DataRow { values: Vec::new() };
1147                    joined_row.values.extend_from_slice(&left_row.values);
1148                    joined_row.values.extend_from_slice(&right_row.values);
1149                    result.add_row(joined_row);
1150                    match_count += 1;
1151                    found_match = true;
1152                }
1153            }
1154
1155            if !found_match {
1157                let mut joined_row = DataRow { values: Vec::new() };
1158                joined_row.values.extend_from_slice(&left_row.values);
1159                for _ in 0..right_table.column_count() {
1160                    joined_row.values.push(DataValue::Null);
1161                }
1162                result.add_row(joined_row);
1163                null_count += 1;
1164            }
1165        }
1166
1167        info!(
1168            "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
1169            match_count,
1170            null_count,
1171            start.elapsed()
1172        );
1173
1174        Ok(result)
1175    }
1176}