sql_cli/data/
hash_join.rs

1//! Hash join implementation for efficient JOIN operations
2
3use anyhow::{anyhow, Result};
4use std::collections::HashMap;
5use std::sync::Arc;
6use tracing::{debug, info};
7
8use crate::data::arithmetic_evaluator::ArithmeticEvaluator;
9use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
10use crate::sql::parser::ast::{JoinClause, JoinOperator, JoinType};
11use crate::sql::recursive_parser::SqlExpression;
12
13/// Hash join executor for efficient JOIN operations
14pub struct HashJoinExecutor {
15    case_insensitive: bool,
16}
17
18impl HashJoinExecutor {
19    pub fn new(case_insensitive: bool) -> Self {
20        Self { case_insensitive }
21    }
22
23    /// Execute a single join operation
24    pub fn execute_join(
25        &self,
26        left_table: Arc<DataTable>,
27        join_clause: &JoinClause,
28        right_table: Arc<DataTable>,
29    ) -> Result<DataTable> {
30        info!(
31            "Executing {:?} JOIN: {} rows x {} rows with {} conditions",
32            join_clause.join_type,
33            left_table.row_count(),
34            right_table.row_count(),
35            join_clause.condition.conditions.len()
36        );
37
38        // For multiple conditions, we need to track all column indices
39        // If any condition has a complex right expression, we must use nested loop
40        let mut condition_indices = Vec::new();
41        let mut all_equal = true;
42        let mut has_complex_expr = false;
43
44        for single_condition in &join_clause.condition.conditions {
45            // Check if both sides are simple column references
46            let left_col_name = Self::extract_simple_column_name(&single_condition.left_expr);
47            let right_col_name = Self::extract_simple_column_name(&single_condition.right_expr);
48
49            if left_col_name.is_none() || right_col_name.is_none() {
50                // Complex expression on either side - must use nested loop with expression evaluation
51                has_complex_expr = true;
52                all_equal = false; // Force nested loop
53                break;
54            }
55
56            let (left_col_idx, right_col_idx) = self.resolve_join_columns(
57                &left_table,
58                &right_table,
59                &left_col_name.unwrap(),
60                &right_col_name.unwrap(),
61            )?;
62
63            if single_condition.operator != JoinOperator::Equal {
64                all_equal = false;
65            }
66
67            condition_indices.push((
68                left_col_idx,
69                right_col_idx,
70                single_condition.operator.clone(),
71            ));
72        }
73
74        // Choose join algorithm based on operators - use hash join only if:
75        // 1. All conditions use equality
76        // 2. No complex expressions (all simple column references)
77        let use_hash_join = all_equal && !has_complex_expr;
78
79        // Perform the appropriate join based on type and operator
80        match join_clause.join_type {
81            JoinType::Inner => {
82                if use_hash_join && condition_indices.len() == 1 {
83                    // Single equality condition with simple columns - use optimized hash join
84                    let (left_col_idx, right_col_idx, _) = condition_indices[0];
85                    let left_col_name = Self::extract_simple_column_name(
86                        &join_clause.condition.conditions[0].left_expr,
87                    )
88                    .expect("left_expr should be a simple column in hash join path");
89                    let right_col_name = Self::extract_simple_column_name(
90                        &join_clause.condition.conditions[0].right_expr,
91                    )
92                    .expect("right_expr should be a simple column in hash join path");
93                    self.hash_join_inner(
94                        left_table,
95                        right_table,
96                        left_col_idx,
97                        right_col_idx,
98                        &left_col_name,
99                        &right_col_name,
100                        &join_clause.alias,
101                    )
102                } else {
103                    // Multiple conditions, inequality, or expressions - use nested loop join
104                    self.nested_loop_join_inner_multi(
105                        left_table,
106                        right_table,
107                        &join_clause.condition.conditions,
108                        &join_clause.alias,
109                    )
110                }
111            }
112            JoinType::Left => {
113                if use_hash_join && condition_indices.len() == 1 {
114                    // Single equality condition with simple columns - use optimized hash join
115                    let (left_col_idx, right_col_idx, _) = condition_indices[0];
116                    let left_col_name = Self::extract_simple_column_name(
117                        &join_clause.condition.conditions[0].left_expr,
118                    )
119                    .expect("left_expr should be a simple column in hash join path");
120                    let right_col_name = Self::extract_simple_column_name(
121                        &join_clause.condition.conditions[0].right_expr,
122                    )
123                    .expect("right_expr should be a simple column in hash join path");
124                    self.hash_join_left(
125                        left_table,
126                        right_table,
127                        left_col_idx,
128                        right_col_idx,
129                        &left_col_name,
130                        &right_col_name,
131                        &join_clause.alias,
132                    )
133                } else {
134                    // Multiple conditions, inequality, or expressions - use nested loop join
135                    self.nested_loop_join_left_multi(
136                        left_table,
137                        right_table,
138                        &join_clause.condition.conditions,
139                        &join_clause.alias,
140                    )
141                }
142            }
143            JoinType::Right => {
144                // Swap condition indices for right join
145                let swapped_indices: Vec<(usize, usize, JoinOperator)> = condition_indices
146                    .into_iter()
147                    .map(|(l, r, op)| (r, l, self.reverse_operator(&op)))
148                    .collect();
149
150                if use_hash_join && swapped_indices.len() == 1 {
151                    // Right join is just a left join with tables swapped
152                    let (right_col_idx, left_col_idx, _) = swapped_indices[0];
153                    let left_col_name = Self::extract_simple_column_name(
154                        &join_clause.condition.conditions[0].left_expr,
155                    )
156                    .expect("left_expr should be a simple column in hash join path");
157                    let right_col_name = Self::extract_simple_column_name(
158                        &join_clause.condition.conditions[0].right_expr,
159                    )
160                    .expect("right_expr should be a simple column in hash join path");
161                    self.hash_join_left(
162                        right_table,
163                        left_table,
164                        right_col_idx,
165                        left_col_idx,
166                        &right_col_name,
167                        &left_col_name,
168                        &join_clause.alias,
169                    )
170                } else {
171                    // Right join is just a left join with tables swapped
172                    // Pass the original conditions - nested_loop_join_left_multi will handle the swap
173                    self.nested_loop_join_left_multi(
174                        right_table,
175                        left_table,
176                        &join_clause.condition.conditions,
177                        &join_clause.alias,
178                    )
179                }
180            }
181            JoinType::Cross => self.cross_join(left_table, right_table),
182            JoinType::Full => {
183                return Err(anyhow!("FULL OUTER JOIN not yet implemented"));
184            }
185        }
186    }
187
188    /// Extract column name from expression if it's a simple column reference
189    /// Returns None if the expression is complex (function, operation, etc.)
190    fn extract_simple_column_name(expr: &SqlExpression) -> Option<String> {
191        match expr {
192            SqlExpression::Column(col_ref) => {
193                // Build the full column name including table prefix if present
194                if let Some(table_prefix) = &col_ref.table_prefix {
195                    Some(format!("{}.{}", table_prefix, col_ref.name))
196                } else {
197                    Some(col_ref.name.clone())
198                }
199            }
200            _ => None, // Complex expression - cannot use fast path
201        }
202    }
203
204    /// Resolve which table each column belongs to in a join condition
205    fn resolve_join_columns(
206        &self,
207        left_table: &DataTable,
208        right_table: &DataTable,
209        left_col_name: &str,
210        right_col_name: &str,
211    ) -> Result<(usize, usize)> {
212        // Try to find the left column in left table, then right table
213        let left_col_idx = if let Ok(idx) = self.find_column_index(left_table, left_col_name) {
214            idx
215        } else if let Ok(_idx) = self.find_column_index(right_table, left_col_name) {
216            // The "left" column in the condition is actually from the right table
217            // This means we need to swap the comparison
218            return Err(anyhow!(
219                "Column '{}' found in right table but specified as left operand. \
220                Please rewrite the condition with columns in correct positions.",
221                left_col_name
222            ));
223        } else {
224            return Err(anyhow!(
225                "Column '{}' not found in either table",
226                left_col_name
227            ));
228        };
229
230        // Try to find the right column in right table, then left table
231        let right_col_idx = if let Ok(idx) = self.find_column_index(right_table, right_col_name) {
232            idx
233        } else if let Ok(_idx) = self.find_column_index(left_table, right_col_name) {
234            // The "right" column in the condition is actually from the left table
235            // This means we need to swap the comparison
236            return Err(anyhow!(
237                "Column '{}' found in left table but specified as right operand. \
238                Please rewrite the condition with columns in correct positions.",
239                right_col_name
240            ));
241        } else {
242            return Err(anyhow!(
243                "Column '{}' not found in either table",
244                right_col_name
245            ));
246        };
247
248        Ok((left_col_idx, right_col_idx))
249    }
250
251    /// Find column index in a table
252    fn find_column_index(&self, table: &DataTable, col_name: &str) -> Result<usize> {
253        // Handle table-qualified column names (e.g., "t1.id")
254        let col_name = if let Some(dot_pos) = col_name.rfind('.') {
255            &col_name[dot_pos + 1..]
256        } else {
257            col_name
258        };
259
260        debug!(
261            "Looking for column '{}' in table with columns: {:?}",
262            col_name,
263            table.column_names()
264        );
265
266        table
267            .columns
268            .iter()
269            .position(|col| {
270                if self.case_insensitive {
271                    col.name.to_lowercase() == col_name.to_lowercase()
272                } else {
273                    col.name == col_name
274                }
275            })
276            .ok_or_else(|| anyhow!("Column '{}' not found in table", col_name))
277    }
278
279    /// Hash join implementation for INNER JOIN
280    fn hash_join_inner(
281        &self,
282        left_table: Arc<DataTable>,
283        right_table: Arc<DataTable>,
284        left_col_idx: usize,
285        right_col_idx: usize,
286        _left_col_name: &str,
287        _right_col_name: &str,
288        join_alias: &Option<String>,
289    ) -> Result<DataTable> {
290        let start = std::time::Instant::now();
291
292        // Determine which table to use for building the hash index (prefer smaller)
293        let (build_table, probe_table, build_col_idx, probe_col_idx, build_is_left) =
294            if left_table.row_count() <= right_table.row_count() {
295                (
296                    left_table.clone(),
297                    right_table.clone(),
298                    left_col_idx,
299                    right_col_idx,
300                    true,
301                )
302            } else {
303                (
304                    right_table.clone(),
305                    left_table.clone(),
306                    right_col_idx,
307                    left_col_idx,
308                    false,
309                )
310            };
311
312        debug!(
313            "Building hash index on {} table ({} rows)",
314            if build_is_left { "left" } else { "right" },
315            build_table.row_count()
316        );
317
318        // Build hash index on the smaller table
319        let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
320        for (row_idx, row) in build_table.rows.iter().enumerate() {
321            let key = row.values[build_col_idx].clone();
322            hash_index.entry(key).or_default().push(row_idx);
323        }
324
325        debug!(
326            "Hash index built with {} unique keys in {:?}",
327            hash_index.len(),
328            start.elapsed()
329        );
330
331        // Create result table with columns from both tables
332        let mut result = DataTable::new("joined");
333
334        // Add columns from left table
335        for col in &left_table.columns {
336            result.add_column(DataColumn {
337                name: col.name.clone(),
338                data_type: col.data_type.clone(),
339                nullable: col.nullable,
340                unique_values: col.unique_values,
341                null_count: col.null_count,
342                metadata: col.metadata.clone(),
343                qualified_name: col.qualified_name.clone(), // Preserve qualified name
344                source_table: col.source_table.clone(),     // Preserve source table
345            });
346        }
347
348        // Add columns from right table
349        for col in &right_table.columns {
350            // Skip columns with duplicate names for now
351            if !left_table
352                .columns
353                .iter()
354                .any(|left_col| left_col.name == col.name)
355            {
356                result.add_column(DataColumn {
357                    name: col.name.clone(),
358                    data_type: col.data_type.clone(),
359                    nullable: col.nullable,
360                    unique_values: col.unique_values,
361                    null_count: col.null_count,
362                    metadata: col.metadata.clone(),
363                    qualified_name: col.qualified_name.clone(), // Preserve qualified name
364                    source_table: col.source_table.clone(),     // Preserve source table
365                });
366            } else {
367                // If there's a name conflict, add with a suffix
368                let (column_name, qualified_name) = if let Some(alias) = join_alias {
369                    // Use the join alias for the column name
370                    (
371                        format!("{}.{}", alias, col.name),
372                        Some(format!("{}.{}", alias, col.name)),
373                    )
374                } else {
375                    // Fall back to _right suffix
376                    (format!("{}_right", col.name), col.qualified_name.clone())
377                };
378                result.add_column(DataColumn {
379                    name: column_name,
380                    data_type: col.data_type.clone(),
381                    nullable: col.nullable,
382                    unique_values: col.unique_values,
383                    null_count: col.null_count,
384                    metadata: col.metadata.clone(),
385                    qualified_name,
386                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
387                });
388            }
389        }
390
391        debug!(
392            "Joined table will have {} columns: {:?}",
393            result.column_count(),
394            result.column_names()
395        );
396
397        // Probe phase: iterate through the larger table
398        let mut match_count = 0;
399        for probe_row in &probe_table.rows {
400            let probe_key = &probe_row.values[probe_col_idx];
401
402            if let Some(matching_indices) = hash_index.get(probe_key) {
403                for &build_idx in matching_indices {
404                    let build_row = &build_table.rows[build_idx];
405
406                    // Create joined row based on which table was used for building
407                    let mut joined_row = DataRow { values: Vec::new() };
408
409                    if build_is_left {
410                        // Build was left, probe was right
411                        joined_row.values.extend_from_slice(&build_row.values);
412                        joined_row.values.extend_from_slice(&probe_row.values);
413                    } else {
414                        // Build was right, probe was left
415                        joined_row.values.extend_from_slice(&probe_row.values);
416                        joined_row.values.extend_from_slice(&build_row.values);
417                    }
418
419                    result.add_row(joined_row);
420                    match_count += 1;
421                }
422            }
423        }
424
425        // Debug: log the qualified names in the result table
426        let qualified_cols: Vec<String> = result
427            .columns
428            .iter()
429            .filter_map(|c| c.qualified_name.clone())
430            .collect();
431
432        info!(
433            "INNER JOIN complete: {} matches found in {:?}. Result has {} columns ({} qualified: {:?})",
434            match_count,
435            start.elapsed(),
436            result.columns.len(),
437            qualified_cols.len(),
438            qualified_cols
439        );
440
441        Ok(result)
442    }
443
444    /// Hash join implementation for LEFT OUTER JOIN
445    fn hash_join_left(
446        &self,
447        left_table: Arc<DataTable>,
448        right_table: Arc<DataTable>,
449        left_col_idx: usize,
450        right_col_idx: usize,
451        _left_col_name: &str,
452        _right_col_name: &str,
453        join_alias: &Option<String>,
454    ) -> Result<DataTable> {
455        let start = std::time::Instant::now();
456
457        debug!(
458            "Building hash index on right table ({} rows)",
459            right_table.row_count()
460        );
461
462        // Build hash index on right table
463        let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
464        for (row_idx, row) in right_table.rows.iter().enumerate() {
465            let key = row.values[right_col_idx].clone();
466            hash_index.entry(key).or_default().push(row_idx);
467        }
468
469        // Create result table with columns from both tables
470        let mut result = DataTable::new("joined");
471
472        // Add columns from left table
473        for col in &left_table.columns {
474            result.add_column(DataColumn {
475                name: col.name.clone(),
476                data_type: col.data_type.clone(),
477                nullable: col.nullable,
478                unique_values: col.unique_values,
479                null_count: col.null_count,
480                metadata: col.metadata.clone(),
481                qualified_name: col.qualified_name.clone(), // Preserve qualified name
482                source_table: col.source_table.clone(),     // Preserve source table
483            });
484        }
485
486        // Add columns from right table (all nullable for LEFT JOIN)
487        for col in &right_table.columns {
488            // Skip columns with duplicate names for now
489            if !left_table
490                .columns
491                .iter()
492                .any(|left_col| left_col.name == col.name)
493            {
494                result.add_column(DataColumn {
495                    name: col.name.clone(),
496                    data_type: col.data_type.clone(),
497                    nullable: true, // Always nullable for outer join
498                    unique_values: col.unique_values,
499                    null_count: col.null_count,
500                    metadata: col.metadata.clone(),
501                    qualified_name: col.qualified_name.clone(), // Preserve qualified name
502                    source_table: col.source_table.clone(),     // Preserve source table
503                });
504            } else {
505                // If there's a name conflict, add with a suffix
506                let (column_name, qualified_name) = if let Some(alias) = join_alias {
507                    // Use the join alias for the column name
508                    (
509                        format!("{}.{}", alias, col.name),
510                        Some(format!("{}.{}", alias, col.name)),
511                    )
512                } else {
513                    // Fall back to _right suffix
514                    (format!("{}_right", col.name), col.qualified_name.clone())
515                };
516                result.add_column(DataColumn {
517                    name: column_name,
518                    data_type: col.data_type.clone(),
519                    nullable: true, // Always nullable for outer join
520                    unique_values: col.unique_values,
521                    null_count: col.null_count,
522                    metadata: col.metadata.clone(),
523                    qualified_name,
524                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
525                });
526            }
527        }
528
529        debug!(
530            "LEFT JOIN table will have {} columns: {:?}",
531            result.column_count(),
532            result.column_names()
533        );
534
535        // Probe phase: iterate through left table
536        let mut match_count = 0;
537        let mut null_count = 0;
538
539        for left_row in &left_table.rows {
540            let left_key = &left_row.values[left_col_idx];
541
542            if let Some(matching_indices) = hash_index.get(left_key) {
543                // Found matches - emit joined rows
544                for &right_idx in matching_indices {
545                    let right_row = &right_table.rows[right_idx];
546
547                    let mut joined_row = DataRow { values: Vec::new() };
548                    joined_row.values.extend_from_slice(&left_row.values);
549                    joined_row.values.extend_from_slice(&right_row.values);
550
551                    result.add_row(joined_row);
552                    match_count += 1;
553                }
554            } else {
555                // No match - emit left row with NULLs for right columns
556                let mut joined_row = DataRow { values: Vec::new() };
557                joined_row.values.extend_from_slice(&left_row.values);
558
559                // Add NULL values for all right table columns
560                for _ in 0..right_table.column_count() {
561                    joined_row.values.push(DataValue::Null);
562                }
563
564                result.add_row(joined_row);
565                null_count += 1;
566            }
567        }
568
569        // Debug: log the qualified names in the result table
570        let qualified_cols: Vec<String> = result
571            .columns
572            .iter()
573            .filter_map(|c| c.qualified_name.clone())
574            .collect();
575
576        info!(
577            "LEFT JOIN complete: {} matches, {} nulls in {:?}. Result has {} columns ({} qualified: {:?})",
578            match_count,
579            null_count,
580            start.elapsed(),
581            result.columns.len(),
582            qualified_cols.len(),
583            qualified_cols
584        );
585
586        Ok(result)
587    }
588
589    /// Cross join implementation
590    fn cross_join(
591        &self,
592        left_table: Arc<DataTable>,
593        right_table: Arc<DataTable>,
594    ) -> Result<DataTable> {
595        let start = std::time::Instant::now();
596
597        // Check for potential memory explosion
598        let result_rows = left_table.row_count() * right_table.row_count();
599        if result_rows > 1_000_000 {
600            return Err(anyhow!(
601                "CROSS JOIN would produce {} rows, which exceeds the safety limit",
602                result_rows
603            ));
604        }
605
606        // Create result table
607        let mut result = DataTable::new("joined");
608
609        // Add columns from both tables
610        for col in &left_table.columns {
611            result.add_column(col.clone());
612        }
613        for col in &right_table.columns {
614            result.add_column(col.clone());
615        }
616
617        // Generate Cartesian product
618        for left_row in &left_table.rows {
619            for right_row in &right_table.rows {
620                let mut joined_row = DataRow { values: Vec::new() };
621                joined_row.values.extend_from_slice(&left_row.values);
622                joined_row.values.extend_from_slice(&right_row.values);
623                result.add_row(joined_row);
624            }
625        }
626
627        info!(
628            "CROSS JOIN complete: {} rows in {:?}",
629            result.row_count(),
630            start.elapsed()
631        );
632
633        Ok(result)
634    }
635
636    /// Qualify column name to avoid conflicts
637    fn qualify_column_name(
638        &self,
639        col_name: &str,
640        table_side: &str,
641        left_join_col: &str,
642        right_join_col: &str,
643    ) -> String {
644        // Extract base column name (without table prefix)
645        let base_name = if let Some(dot_pos) = col_name.rfind('.') {
646            &col_name[dot_pos + 1..]
647        } else {
648            col_name
649        };
650
651        let left_base = if let Some(dot_pos) = left_join_col.rfind('.') {
652            &left_join_col[dot_pos + 1..]
653        } else {
654            left_join_col
655        };
656
657        let right_base = if let Some(dot_pos) = right_join_col.rfind('.') {
658            &right_join_col[dot_pos + 1..]
659        } else {
660            right_join_col
661        };
662
663        // If this column name appears in both join columns, qualify it
664        if base_name == left_base || base_name == right_base {
665            format!("{}_{}", table_side, base_name)
666        } else {
667            col_name.to_string()
668        }
669    }
670
671    /// Reverse a join operator for right joins
672    fn reverse_operator(&self, op: &JoinOperator) -> JoinOperator {
673        match op {
674            JoinOperator::Equal => JoinOperator::Equal,
675            JoinOperator::NotEqual => JoinOperator::NotEqual,
676            JoinOperator::LessThan => JoinOperator::GreaterThan,
677            JoinOperator::GreaterThan => JoinOperator::LessThan,
678            JoinOperator::LessThanOrEqual => JoinOperator::GreaterThanOrEqual,
679            JoinOperator::GreaterThanOrEqual => JoinOperator::LessThanOrEqual,
680        }
681    }
682
683    /// Compare two values based on the join operator
684    fn compare_values(&self, left: &DataValue, right: &DataValue, op: &JoinOperator) -> bool {
685        match op {
686            JoinOperator::Equal => left == right,
687            JoinOperator::NotEqual => left != right,
688            JoinOperator::LessThan => left < right,
689            JoinOperator::GreaterThan => left > right,
690            JoinOperator::LessThanOrEqual => left <= right,
691            JoinOperator::GreaterThanOrEqual => left >= right,
692        }
693    }
694
695    /// Nested loop join for INNER JOIN with inequality conditions
696    fn nested_loop_join_inner(
697        &self,
698        left_table: Arc<DataTable>,
699        right_table: Arc<DataTable>,
700        left_col_idx: usize,
701        right_col_idx: usize,
702        operator: &JoinOperator,
703        join_alias: &Option<String>,
704    ) -> Result<DataTable> {
705        let start = std::time::Instant::now();
706
707        info!(
708            "Executing nested loop INNER JOIN with {:?} operator: {} x {} rows",
709            operator,
710            left_table.row_count(),
711            right_table.row_count()
712        );
713
714        // Create result table with columns from both tables
715        let mut result = DataTable::new("joined");
716
717        // Add columns from left table
718        for col in &left_table.columns {
719            result.add_column(DataColumn {
720                name: col.name.clone(),
721                data_type: col.data_type.clone(),
722                nullable: col.nullable,
723                unique_values: col.unique_values,
724                null_count: col.null_count,
725                metadata: col.metadata.clone(),
726                qualified_name: col.qualified_name.clone(), // Preserve qualified name
727                source_table: col.source_table.clone(),     // Preserve source table
728            });
729        }
730
731        // Add columns from right table
732        for col in &right_table.columns {
733            if !left_table
734                .columns
735                .iter()
736                .any(|left_col| left_col.name == col.name)
737            {
738                result.add_column(DataColumn {
739                    name: col.name.clone(),
740                    data_type: col.data_type.clone(),
741                    nullable: col.nullable,
742                    unique_values: col.unique_values,
743                    null_count: col.null_count,
744                    metadata: col.metadata.clone(),
745                    qualified_name: col.qualified_name.clone(), // Preserve qualified name
746                    source_table: col.source_table.clone(),     // Preserve source table
747                });
748            } else {
749                let (column_name, qualified_name) = if let Some(alias) = join_alias {
750                    // Use the join alias for the column name
751                    (
752                        format!("{}.{}", alias, col.name),
753                        Some(format!("{}.{}", alias, col.name)),
754                    )
755                } else {
756                    // Fall back to _right suffix
757                    (format!("{}_right", col.name), col.qualified_name.clone())
758                };
759                result.add_column(DataColumn {
760                    name: column_name,
761                    data_type: col.data_type.clone(),
762                    nullable: col.nullable,
763                    unique_values: col.unique_values,
764                    null_count: col.null_count,
765                    metadata: col.metadata.clone(),
766                    qualified_name,
767                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
768                });
769            }
770        }
771
772        // Nested loop join
773        let mut match_count = 0;
774        for left_row in &left_table.rows {
775            let left_value = &left_row.values[left_col_idx];
776
777            for right_row in &right_table.rows {
778                let right_value = &right_row.values[right_col_idx];
779
780                if self.compare_values(left_value, right_value, operator) {
781                    let mut joined_row = DataRow { values: Vec::new() };
782                    joined_row.values.extend_from_slice(&left_row.values);
783                    joined_row.values.extend_from_slice(&right_row.values);
784                    result.add_row(joined_row);
785                    match_count += 1;
786                }
787            }
788        }
789
790        info!(
791            "Nested loop INNER JOIN complete: {} matches found in {:?}",
792            match_count,
793            start.elapsed()
794        );
795
796        Ok(result)
797    }
798
799    /// Nested loop join for INNER JOIN with multiple conditions
800    fn nested_loop_join_inner_multi(
801        &self,
802        left_table: Arc<DataTable>,
803        right_table: Arc<DataTable>,
804        conditions: &[crate::sql::parser::ast::SingleJoinCondition],
805        join_alias: &Option<String>,
806    ) -> Result<DataTable> {
807        let start = std::time::Instant::now();
808
809        info!(
810            "Executing nested loop INNER JOIN with {} conditions: {} x {} rows",
811            conditions.len(),
812            left_table.row_count(),
813            right_table.row_count()
814        );
815
816        // Create result table with columns from both tables
817        let mut result = DataTable::new("joined");
818
819        // Add columns from left table
820        for col in &left_table.columns {
821            result.add_column(DataColumn {
822                name: col.name.clone(),
823                data_type: col.data_type.clone(),
824                nullable: col.nullable,
825                unique_values: col.unique_values,
826                null_count: col.null_count,
827                metadata: col.metadata.clone(),
828                qualified_name: col.qualified_name.clone(),
829                source_table: col.source_table.clone(),
830            });
831        }
832
833        // Add columns from right table
834        for col in &right_table.columns {
835            if !left_table
836                .columns
837                .iter()
838                .any(|left_col| left_col.name == col.name)
839            {
840                result.add_column(DataColumn {
841                    name: col.name.clone(),
842                    data_type: col.data_type.clone(),
843                    nullable: col.nullable,
844                    unique_values: col.unique_values,
845                    null_count: col.null_count,
846                    metadata: col.metadata.clone(),
847                    qualified_name: col.qualified_name.clone(),
848                    source_table: col.source_table.clone(),
849                });
850            } else {
851                let (column_name, qualified_name) = if let Some(alias) = join_alias {
852                    (
853                        format!("{}.{}", alias, col.name),
854                        Some(format!("{}.{}", alias, col.name)),
855                    )
856                } else {
857                    (format!("{}_right", col.name), col.qualified_name.clone())
858                };
859                result.add_column(DataColumn {
860                    name: column_name,
861                    data_type: col.data_type.clone(),
862                    nullable: col.nullable,
863                    unique_values: col.unique_values,
864                    null_count: col.null_count,
865                    metadata: col.metadata.clone(),
866                    qualified_name,
867                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
868                });
869            }
870        }
871
872        // Create evaluators for both sides
873        let mut left_evaluator = ArithmeticEvaluator::new(&left_table);
874        let mut right_evaluator = ArithmeticEvaluator::new(&right_table);
875
876        // Nested loop join with multiple conditions
877        let mut match_count = 0;
878        for (left_row_idx, left_row) in left_table.rows.iter().enumerate() {
879            for (right_row_idx, right_row) in right_table.rows.iter().enumerate() {
880                // Check all conditions - all must be true for a match
881                let mut all_conditions_met = true;
882                for condition in conditions.iter() {
883                    // Evaluate left expression for this row
884                    let left_value =
885                        match left_evaluator.evaluate(&condition.left_expr, left_row_idx) {
886                            Ok(val) => val,
887                            Err(_) => {
888                                all_conditions_met = false;
889                                break;
890                            }
891                        };
892
893                    // Evaluate right expression for this row
894                    let right_value =
895                        match right_evaluator.evaluate(&condition.right_expr, right_row_idx) {
896                            Ok(val) => val,
897                            Err(_) => {
898                                all_conditions_met = false;
899                                break;
900                            }
901                        };
902
903                    if !self.compare_values(&left_value, &right_value, &condition.operator) {
904                        all_conditions_met = false;
905                        break;
906                    }
907                }
908
909                if all_conditions_met {
910                    let mut joined_row = DataRow { values: Vec::new() };
911                    joined_row.values.extend_from_slice(&left_row.values);
912                    joined_row.values.extend_from_slice(&right_row.values);
913                    result.add_row(joined_row);
914                    match_count += 1;
915                }
916            }
917        }
918
919        info!(
920            "Nested loop INNER JOIN complete: {} matches found in {:?}",
921            match_count,
922            start.elapsed()
923        );
924
925        Ok(result)
926    }
927
928    /// Nested loop join for LEFT JOIN with multiple conditions
929    fn nested_loop_join_left_multi(
930        &self,
931        left_table: Arc<DataTable>,
932        right_table: Arc<DataTable>,
933        conditions: &[crate::sql::parser::ast::SingleJoinCondition],
934        join_alias: &Option<String>,
935    ) -> Result<DataTable> {
936        let start = std::time::Instant::now();
937
938        info!(
939            "Executing nested loop LEFT JOIN with {} conditions: {} x {} rows",
940            conditions.len(),
941            left_table.row_count(),
942            right_table.row_count()
943        );
944
945        // Create result table with columns from both tables
946        let mut result = DataTable::new("joined");
947
948        // Add columns from left table
949        for col in &left_table.columns {
950            result.add_column(DataColumn {
951                name: col.name.clone(),
952                data_type: col.data_type.clone(),
953                nullable: col.nullable,
954                unique_values: col.unique_values,
955                null_count: col.null_count,
956                metadata: col.metadata.clone(),
957                qualified_name: col.qualified_name.clone(),
958                source_table: col.source_table.clone(),
959            });
960        }
961
962        // Add columns from right table (all nullable for LEFT JOIN)
963        for col in &right_table.columns {
964            if !left_table
965                .columns
966                .iter()
967                .any(|left_col| left_col.name == col.name)
968            {
969                result.add_column(DataColumn {
970                    name: col.name.clone(),
971                    data_type: col.data_type.clone(),
972                    nullable: true, // Always nullable for outer join
973                    unique_values: col.unique_values,
974                    null_count: col.null_count,
975                    metadata: col.metadata.clone(),
976                    qualified_name: col.qualified_name.clone(),
977                    source_table: col.source_table.clone(),
978                });
979            } else {
980                let (column_name, qualified_name) = if let Some(alias) = join_alias {
981                    (
982                        format!("{}.{}", alias, col.name),
983                        Some(format!("{}.{}", alias, col.name)),
984                    )
985                } else {
986                    (format!("{}_right", col.name), col.qualified_name.clone())
987                };
988                result.add_column(DataColumn {
989                    name: column_name,
990                    data_type: col.data_type.clone(),
991                    nullable: true, // Always nullable for outer join
992                    unique_values: col.unique_values,
993                    null_count: col.null_count,
994                    metadata: col.metadata.clone(),
995                    qualified_name,
996                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
997                });
998            }
999        }
1000
1001        // Create evaluators for both sides
1002        let mut left_evaluator = ArithmeticEvaluator::new(&left_table);
1003        let mut right_evaluator = ArithmeticEvaluator::new(&right_table);
1004
1005        // Nested loop join with multiple conditions
1006        let mut match_count = 0;
1007        let mut null_count = 0;
1008
1009        for (left_row_idx, left_row) in left_table.rows.iter().enumerate() {
1010            let mut found_match = false;
1011
1012            for (right_row_idx, right_row) in right_table.rows.iter().enumerate() {
1013                // Check all conditions - all must be true for a match
1014                let mut all_conditions_met = true;
1015                for condition in conditions.iter() {
1016                    // Evaluate left expression for this row
1017                    let left_value =
1018                        match left_evaluator.evaluate(&condition.left_expr, left_row_idx) {
1019                            Ok(val) => val,
1020                            Err(_) => {
1021                                all_conditions_met = false;
1022                                break;
1023                            }
1024                        };
1025
1026                    // Evaluate right expression for this row
1027                    let right_value =
1028                        match right_evaluator.evaluate(&condition.right_expr, right_row_idx) {
1029                            Ok(val) => val,
1030                            Err(_) => {
1031                                all_conditions_met = false;
1032                                break;
1033                            }
1034                        };
1035
1036                    if !self.compare_values(&left_value, &right_value, &condition.operator) {
1037                        all_conditions_met = false;
1038                        break;
1039                    }
1040                }
1041
1042                if all_conditions_met {
1043                    let mut joined_row = DataRow { values: Vec::new() };
1044                    joined_row.values.extend_from_slice(&left_row.values);
1045                    joined_row.values.extend_from_slice(&right_row.values);
1046                    result.add_row(joined_row);
1047                    match_count += 1;
1048                    found_match = true;
1049                }
1050            }
1051
1052            // If no match found, emit left row with NULLs for right columns
1053            if !found_match {
1054                let mut joined_row = DataRow { values: Vec::new() };
1055                joined_row.values.extend_from_slice(&left_row.values);
1056                for _ in 0..right_table.column_count() {
1057                    joined_row.values.push(DataValue::Null);
1058                }
1059                result.add_row(joined_row);
1060                null_count += 1;
1061            }
1062        }
1063
1064        info!(
1065            "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
1066            match_count,
1067            null_count,
1068            start.elapsed()
1069        );
1070
1071        Ok(result)
1072    }
1073
1074    /// Nested loop join for LEFT JOIN with inequality conditions
1075    fn nested_loop_join_left(
1076        &self,
1077        left_table: Arc<DataTable>,
1078        right_table: Arc<DataTable>,
1079        left_col_idx: usize,
1080        right_col_idx: usize,
1081        operator: &JoinOperator,
1082        join_alias: &Option<String>,
1083    ) -> Result<DataTable> {
1084        let start = std::time::Instant::now();
1085
1086        info!(
1087            "Executing nested loop LEFT JOIN with {:?} operator: {} x {} rows",
1088            operator,
1089            left_table.row_count(),
1090            right_table.row_count()
1091        );
1092
1093        // Create result table with columns from both tables
1094        let mut result = DataTable::new("joined");
1095
1096        // Add columns from left table
1097        for col in &left_table.columns {
1098            result.add_column(DataColumn {
1099                name: col.name.clone(),
1100                data_type: col.data_type.clone(),
1101                nullable: col.nullable,
1102                unique_values: col.unique_values,
1103                null_count: col.null_count,
1104                metadata: col.metadata.clone(),
1105                qualified_name: col.qualified_name.clone(), // Preserve qualified name
1106                source_table: col.source_table.clone(),     // Preserve source table
1107            });
1108        }
1109
1110        // Add columns from right table (all nullable for LEFT JOIN)
1111        for col in &right_table.columns {
1112            if !left_table
1113                .columns
1114                .iter()
1115                .any(|left_col| left_col.name == col.name)
1116            {
1117                result.add_column(DataColumn {
1118                    name: col.name.clone(),
1119                    data_type: col.data_type.clone(),
1120                    nullable: true, // Always nullable for outer join
1121                    unique_values: col.unique_values,
1122                    null_count: col.null_count,
1123                    metadata: col.metadata.clone(),
1124                    qualified_name: col.qualified_name.clone(), // Preserve qualified name
1125                    source_table: col.source_table.clone(),     // Preserve source table
1126                });
1127            } else {
1128                let (column_name, qualified_name) = if let Some(alias) = join_alias {
1129                    // Use the join alias for the column name
1130                    (
1131                        format!("{}.{}", alias, col.name),
1132                        Some(format!("{}.{}", alias, col.name)),
1133                    )
1134                } else {
1135                    // Fall back to _right suffix
1136                    (format!("{}_right", col.name), col.qualified_name.clone())
1137                };
1138                result.add_column(DataColumn {
1139                    name: column_name,
1140                    data_type: col.data_type.clone(),
1141                    nullable: true, // Always nullable for outer join
1142                    unique_values: col.unique_values,
1143                    null_count: col.null_count,
1144                    metadata: col.metadata.clone(),
1145                    qualified_name,
1146                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
1147                });
1148            }
1149        }
1150
1151        // Nested loop join
1152        let mut match_count = 0;
1153        let mut null_count = 0;
1154
1155        for left_row in &left_table.rows {
1156            let left_value = &left_row.values[left_col_idx];
1157            let mut found_match = false;
1158
1159            for right_row in &right_table.rows {
1160                let right_value = &right_row.values[right_col_idx];
1161
1162                if self.compare_values(left_value, right_value, operator) {
1163                    let mut joined_row = DataRow { values: Vec::new() };
1164                    joined_row.values.extend_from_slice(&left_row.values);
1165                    joined_row.values.extend_from_slice(&right_row.values);
1166                    result.add_row(joined_row);
1167                    match_count += 1;
1168                    found_match = true;
1169                }
1170            }
1171
1172            // If no match found, emit left row with NULLs for right columns
1173            if !found_match {
1174                let mut joined_row = DataRow { values: Vec::new() };
1175                joined_row.values.extend_from_slice(&left_row.values);
1176                for _ in 0..right_table.column_count() {
1177                    joined_row.values.push(DataValue::Null);
1178                }
1179                result.add_row(joined_row);
1180                null_count += 1;
1181            }
1182        }
1183
1184        info!(
1185            "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
1186            match_count,
1187            null_count,
1188            start.elapsed()
1189        );
1190
1191        Ok(result)
1192    }
1193}