sql_cli/data/
hash_join.rs

1//! Hash join implementation for efficient JOIN operations
2
3use anyhow::{anyhow, Result};
4use std::collections::HashMap;
5use std::sync::Arc;
6use tracing::{debug, info};
7
8use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
9use crate::sql::parser::ast::{JoinClause, JoinOperator, JoinType};
10
/// Executor that joins two `DataTable`s according to a parsed `JoinClause`.
///
/// Despite the name, only joins with a single equality condition use a hash
/// index; other condition shapes are dispatched to nested-loop joins.
pub struct HashJoinExecutor {
    /// When `true`, join-column names are compared with table column names
    /// after lowercasing both sides.
    case_insensitive: bool,
}
15
16impl HashJoinExecutor {
17    pub fn new(case_insensitive: bool) -> Self {
18        Self { case_insensitive }
19    }
20
21    /// Execute a single join operation
22    pub fn execute_join(
23        &self,
24        left_table: Arc<DataTable>,
25        join_clause: &JoinClause,
26        right_table: Arc<DataTable>,
27    ) -> Result<DataTable> {
28        info!(
29            "Executing {:?} JOIN: {} rows x {} rows with {} conditions",
30            join_clause.join_type,
31            left_table.row_count(),
32            right_table.row_count(),
33            join_clause.condition.conditions.len()
34        );
35
36        // For multiple conditions, we need to track all column indices
37        let mut condition_indices = Vec::new();
38        let mut all_equal = true;
39
40        for single_condition in &join_clause.condition.conditions {
41            let (left_col_idx, right_col_idx) = self.resolve_join_columns(
42                &left_table,
43                &right_table,
44                &single_condition.left_column,
45                &single_condition.right_column,
46            )?;
47
48            if single_condition.operator != JoinOperator::Equal {
49                all_equal = false;
50            }
51
52            condition_indices.push((
53                left_col_idx,
54                right_col_idx,
55                single_condition.operator.clone(),
56            ));
57        }
58
59        // Choose join algorithm based on operators - use hash join only if all conditions use equality
60        let use_hash_join = all_equal;
61
62        // Perform the appropriate join based on type and operator
63        match join_clause.join_type {
64            JoinType::Inner => {
65                if use_hash_join && condition_indices.len() == 1 {
66                    // Single equality condition - use optimized hash join
67                    let (left_col_idx, right_col_idx, _) = condition_indices[0];
68                    self.hash_join_inner(
69                        left_table,
70                        right_table,
71                        left_col_idx,
72                        right_col_idx,
73                        &join_clause.condition.conditions[0].left_column,
74                        &join_clause.condition.conditions[0].right_column,
75                        &join_clause.alias,
76                    )
77                } else {
78                    // Multiple conditions or inequality - use nested loop join
79                    self.nested_loop_join_inner_multi(
80                        left_table,
81                        right_table,
82                        condition_indices,
83                        &join_clause.alias,
84                    )
85                }
86            }
87            JoinType::Left => {
88                if use_hash_join && condition_indices.len() == 1 {
89                    // Single equality condition - use optimized hash join
90                    let (left_col_idx, right_col_idx, _) = condition_indices[0];
91                    self.hash_join_left(
92                        left_table,
93                        right_table,
94                        left_col_idx,
95                        right_col_idx,
96                        &join_clause.condition.conditions[0].left_column,
97                        &join_clause.condition.conditions[0].right_column,
98                        &join_clause.alias,
99                    )
100                } else {
101                    // Multiple conditions or inequality - use nested loop join
102                    self.nested_loop_join_left_multi(
103                        left_table,
104                        right_table,
105                        condition_indices,
106                        &join_clause.alias,
107                    )
108                }
109            }
110            JoinType::Right => {
111                // Swap condition indices for right join
112                let swapped_indices: Vec<(usize, usize, JoinOperator)> = condition_indices
113                    .into_iter()
114                    .map(|(l, r, op)| (r, l, self.reverse_operator(&op)))
115                    .collect();
116
117                if use_hash_join && swapped_indices.len() == 1 {
118                    // Right join is just a left join with tables swapped
119                    let (right_col_idx, left_col_idx, _) = swapped_indices[0];
120                    self.hash_join_left(
121                        right_table,
122                        left_table,
123                        right_col_idx,
124                        left_col_idx,
125                        &join_clause.condition.conditions[0].right_column,
126                        &join_clause.condition.conditions[0].left_column,
127                        &join_clause.alias,
128                    )
129                } else {
130                    // Right join is just a left join with tables swapped
131                    self.nested_loop_join_left_multi(
132                        right_table,
133                        left_table,
134                        swapped_indices,
135                        &join_clause.alias,
136                    )
137                }
138            }
139            JoinType::Cross => self.cross_join(left_table, right_table),
140            JoinType::Full => {
141                return Err(anyhow!("FULL OUTER JOIN not yet implemented"));
142            }
143        }
144    }
145
146    /// Resolve which table each column belongs to in a join condition
147    fn resolve_join_columns(
148        &self,
149        left_table: &DataTable,
150        right_table: &DataTable,
151        left_col_name: &str,
152        right_col_name: &str,
153    ) -> Result<(usize, usize)> {
154        // Try to find the left column in left table, then right table
155        let left_col_idx = if let Ok(idx) = self.find_column_index(left_table, left_col_name) {
156            idx
157        } else if let Ok(idx) = self.find_column_index(right_table, left_col_name) {
158            // The "left" column in the condition is actually from the right table
159            // This means we need to swap the comparison
160            return Err(anyhow!(
161                "Column '{}' found in right table but specified as left operand. \
162                Please rewrite the condition with columns in correct positions.",
163                left_col_name
164            ));
165        } else {
166            return Err(anyhow!(
167                "Column '{}' not found in either table",
168                left_col_name
169            ));
170        };
171
172        // Try to find the right column in right table, then left table
173        let right_col_idx = if let Ok(idx) = self.find_column_index(right_table, right_col_name) {
174            idx
175        } else if let Ok(idx) = self.find_column_index(left_table, right_col_name) {
176            // The "right" column in the condition is actually from the left table
177            // This means we need to swap the comparison
178            return Err(anyhow!(
179                "Column '{}' found in left table but specified as right operand. \
180                Please rewrite the condition with columns in correct positions.",
181                right_col_name
182            ));
183        } else {
184            return Err(anyhow!(
185                "Column '{}' not found in either table",
186                right_col_name
187            ));
188        };
189
190        Ok((left_col_idx, right_col_idx))
191    }
192
193    /// Find column index in a table
194    fn find_column_index(&self, table: &DataTable, col_name: &str) -> Result<usize> {
195        // Handle table-qualified column names (e.g., "t1.id")
196        let col_name = if let Some(dot_pos) = col_name.rfind('.') {
197            &col_name[dot_pos + 1..]
198        } else {
199            col_name
200        };
201
202        debug!(
203            "Looking for column '{}' in table with columns: {:?}",
204            col_name,
205            table.column_names()
206        );
207
208        table
209            .columns
210            .iter()
211            .position(|col| {
212                if self.case_insensitive {
213                    col.name.to_lowercase() == col_name.to_lowercase()
214                } else {
215                    col.name == col_name
216                }
217            })
218            .ok_or_else(|| anyhow!("Column '{}' not found in table", col_name))
219    }
220
221    /// Hash join implementation for INNER JOIN
222    fn hash_join_inner(
223        &self,
224        left_table: Arc<DataTable>,
225        right_table: Arc<DataTable>,
226        left_col_idx: usize,
227        right_col_idx: usize,
228        _left_col_name: &str,
229        _right_col_name: &str,
230        join_alias: &Option<String>,
231    ) -> Result<DataTable> {
232        let start = std::time::Instant::now();
233
234        // Determine which table to use for building the hash index (prefer smaller)
235        let (build_table, probe_table, build_col_idx, probe_col_idx, build_is_left) =
236            if left_table.row_count() <= right_table.row_count() {
237                (
238                    left_table.clone(),
239                    right_table.clone(),
240                    left_col_idx,
241                    right_col_idx,
242                    true,
243                )
244            } else {
245                (
246                    right_table.clone(),
247                    left_table.clone(),
248                    right_col_idx,
249                    left_col_idx,
250                    false,
251                )
252            };
253
254        debug!(
255            "Building hash index on {} table ({} rows)",
256            if build_is_left { "left" } else { "right" },
257            build_table.row_count()
258        );
259
260        // Build hash index on the smaller table
261        let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
262        for (row_idx, row) in build_table.rows.iter().enumerate() {
263            let key = row.values[build_col_idx].clone();
264            hash_index.entry(key).or_default().push(row_idx);
265        }
266
267        debug!(
268            "Hash index built with {} unique keys in {:?}",
269            hash_index.len(),
270            start.elapsed()
271        );
272
273        // Create result table with columns from both tables
274        let mut result = DataTable::new("joined");
275
276        // Add columns from left table
277        for col in &left_table.columns {
278            result.add_column(DataColumn {
279                name: col.name.clone(),
280                data_type: col.data_type.clone(),
281                nullable: col.nullable,
282                unique_values: col.unique_values,
283                null_count: col.null_count,
284                metadata: col.metadata.clone(),
285                qualified_name: col.qualified_name.clone(), // Preserve qualified name
286                source_table: col.source_table.clone(),     // Preserve source table
287            });
288        }
289
290        // Add columns from right table
291        for col in &right_table.columns {
292            // Skip columns with duplicate names for now
293            if !left_table
294                .columns
295                .iter()
296                .any(|left_col| left_col.name == col.name)
297            {
298                result.add_column(DataColumn {
299                    name: col.name.clone(),
300                    data_type: col.data_type.clone(),
301                    nullable: col.nullable,
302                    unique_values: col.unique_values,
303                    null_count: col.null_count,
304                    metadata: col.metadata.clone(),
305                    qualified_name: col.qualified_name.clone(), // Preserve qualified name
306                    source_table: col.source_table.clone(),     // Preserve source table
307                });
308            } else {
309                // If there's a name conflict, add with a suffix
310                let (column_name, qualified_name) = if let Some(alias) = join_alias {
311                    // Use the join alias for the column name
312                    (
313                        format!("{}.{}", alias, col.name),
314                        Some(format!("{}.{}", alias, col.name)),
315                    )
316                } else {
317                    // Fall back to _right suffix
318                    (format!("{}_right", col.name), col.qualified_name.clone())
319                };
320                result.add_column(DataColumn {
321                    name: column_name,
322                    data_type: col.data_type.clone(),
323                    nullable: col.nullable,
324                    unique_values: col.unique_values,
325                    null_count: col.null_count,
326                    metadata: col.metadata.clone(),
327                    qualified_name,
328                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
329                });
330            }
331        }
332
333        debug!(
334            "Joined table will have {} columns: {:?}",
335            result.column_count(),
336            result.column_names()
337        );
338
339        // Probe phase: iterate through the larger table
340        let mut match_count = 0;
341        for probe_row in &probe_table.rows {
342            let probe_key = &probe_row.values[probe_col_idx];
343
344            if let Some(matching_indices) = hash_index.get(probe_key) {
345                for &build_idx in matching_indices {
346                    let build_row = &build_table.rows[build_idx];
347
348                    // Create joined row based on which table was used for building
349                    let mut joined_row = DataRow { values: Vec::new() };
350
351                    if build_is_left {
352                        // Build was left, probe was right
353                        joined_row.values.extend_from_slice(&build_row.values);
354                        joined_row.values.extend_from_slice(&probe_row.values);
355                    } else {
356                        // Build was right, probe was left
357                        joined_row.values.extend_from_slice(&probe_row.values);
358                        joined_row.values.extend_from_slice(&build_row.values);
359                    }
360
361                    result.add_row(joined_row);
362                    match_count += 1;
363                }
364            }
365        }
366
367        // Debug: log the qualified names in the result table
368        let qualified_cols: Vec<String> = result
369            .columns
370            .iter()
371            .filter_map(|c| c.qualified_name.clone())
372            .collect();
373
374        info!(
375            "INNER JOIN complete: {} matches found in {:?}. Result has {} columns ({} qualified: {:?})",
376            match_count,
377            start.elapsed(),
378            result.columns.len(),
379            qualified_cols.len(),
380            qualified_cols
381        );
382
383        Ok(result)
384    }
385
386    /// Hash join implementation for LEFT OUTER JOIN
387    fn hash_join_left(
388        &self,
389        left_table: Arc<DataTable>,
390        right_table: Arc<DataTable>,
391        left_col_idx: usize,
392        right_col_idx: usize,
393        _left_col_name: &str,
394        _right_col_name: &str,
395        join_alias: &Option<String>,
396    ) -> Result<DataTable> {
397        let start = std::time::Instant::now();
398
399        debug!(
400            "Building hash index on right table ({} rows)",
401            right_table.row_count()
402        );
403
404        // Build hash index on right table
405        let mut hash_index: HashMap<DataValue, Vec<usize>> = HashMap::new();
406        for (row_idx, row) in right_table.rows.iter().enumerate() {
407            let key = row.values[right_col_idx].clone();
408            hash_index.entry(key).or_default().push(row_idx);
409        }
410
411        // Create result table with columns from both tables
412        let mut result = DataTable::new("joined");
413
414        // Add columns from left table
415        for col in &left_table.columns {
416            result.add_column(DataColumn {
417                name: col.name.clone(),
418                data_type: col.data_type.clone(),
419                nullable: col.nullable,
420                unique_values: col.unique_values,
421                null_count: col.null_count,
422                metadata: col.metadata.clone(),
423                qualified_name: col.qualified_name.clone(), // Preserve qualified name
424                source_table: col.source_table.clone(),     // Preserve source table
425            });
426        }
427
428        // Add columns from right table (all nullable for LEFT JOIN)
429        for col in &right_table.columns {
430            // Skip columns with duplicate names for now
431            if !left_table
432                .columns
433                .iter()
434                .any(|left_col| left_col.name == col.name)
435            {
436                result.add_column(DataColumn {
437                    name: col.name.clone(),
438                    data_type: col.data_type.clone(),
439                    nullable: true, // Always nullable for outer join
440                    unique_values: col.unique_values,
441                    null_count: col.null_count,
442                    metadata: col.metadata.clone(),
443                    qualified_name: col.qualified_name.clone(), // Preserve qualified name
444                    source_table: col.source_table.clone(),     // Preserve source table
445                });
446            } else {
447                // If there's a name conflict, add with a suffix
448                let (column_name, qualified_name) = if let Some(alias) = join_alias {
449                    // Use the join alias for the column name
450                    (
451                        format!("{}.{}", alias, col.name),
452                        Some(format!("{}.{}", alias, col.name)),
453                    )
454                } else {
455                    // Fall back to _right suffix
456                    (format!("{}_right", col.name), col.qualified_name.clone())
457                };
458                result.add_column(DataColumn {
459                    name: column_name,
460                    data_type: col.data_type.clone(),
461                    nullable: true, // Always nullable for outer join
462                    unique_values: col.unique_values,
463                    null_count: col.null_count,
464                    metadata: col.metadata.clone(),
465                    qualified_name,
466                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
467                });
468            }
469        }
470
471        debug!(
472            "LEFT JOIN table will have {} columns: {:?}",
473            result.column_count(),
474            result.column_names()
475        );
476
477        // Probe phase: iterate through left table
478        let mut match_count = 0;
479        let mut null_count = 0;
480
481        for left_row in &left_table.rows {
482            let left_key = &left_row.values[left_col_idx];
483
484            if let Some(matching_indices) = hash_index.get(left_key) {
485                // Found matches - emit joined rows
486                for &right_idx in matching_indices {
487                    let right_row = &right_table.rows[right_idx];
488
489                    let mut joined_row = DataRow { values: Vec::new() };
490                    joined_row.values.extend_from_slice(&left_row.values);
491                    joined_row.values.extend_from_slice(&right_row.values);
492
493                    result.add_row(joined_row);
494                    match_count += 1;
495                }
496            } else {
497                // No match - emit left row with NULLs for right columns
498                let mut joined_row = DataRow { values: Vec::new() };
499                joined_row.values.extend_from_slice(&left_row.values);
500
501                // Add NULL values for all right table columns
502                for _ in 0..right_table.column_count() {
503                    joined_row.values.push(DataValue::Null);
504                }
505
506                result.add_row(joined_row);
507                null_count += 1;
508            }
509        }
510
511        // Debug: log the qualified names in the result table
512        let qualified_cols: Vec<String> = result
513            .columns
514            .iter()
515            .filter_map(|c| c.qualified_name.clone())
516            .collect();
517
518        info!(
519            "LEFT JOIN complete: {} matches, {} nulls in {:?}. Result has {} columns ({} qualified: {:?})",
520            match_count,
521            null_count,
522            start.elapsed(),
523            result.columns.len(),
524            qualified_cols.len(),
525            qualified_cols
526        );
527
528        Ok(result)
529    }
530
531    /// Cross join implementation
532    fn cross_join(
533        &self,
534        left_table: Arc<DataTable>,
535        right_table: Arc<DataTable>,
536    ) -> Result<DataTable> {
537        let start = std::time::Instant::now();
538
539        // Check for potential memory explosion
540        let result_rows = left_table.row_count() * right_table.row_count();
541        if result_rows > 1_000_000 {
542            return Err(anyhow!(
543                "CROSS JOIN would produce {} rows, which exceeds the safety limit",
544                result_rows
545            ));
546        }
547
548        // Create result table
549        let mut result = DataTable::new("joined");
550
551        // Add columns from both tables
552        for col in &left_table.columns {
553            result.add_column(col.clone());
554        }
555        for col in &right_table.columns {
556            result.add_column(col.clone());
557        }
558
559        // Generate Cartesian product
560        for left_row in &left_table.rows {
561            for right_row in &right_table.rows {
562                let mut joined_row = DataRow { values: Vec::new() };
563                joined_row.values.extend_from_slice(&left_row.values);
564                joined_row.values.extend_from_slice(&right_row.values);
565                result.add_row(joined_row);
566            }
567        }
568
569        info!(
570            "CROSS JOIN complete: {} rows in {:?}",
571            result.row_count(),
572            start.elapsed()
573        );
574
575        Ok(result)
576    }
577
578    /// Qualify column name to avoid conflicts
579    fn qualify_column_name(
580        &self,
581        col_name: &str,
582        table_side: &str,
583        left_join_col: &str,
584        right_join_col: &str,
585    ) -> String {
586        // Extract base column name (without table prefix)
587        let base_name = if let Some(dot_pos) = col_name.rfind('.') {
588            &col_name[dot_pos + 1..]
589        } else {
590            col_name
591        };
592
593        let left_base = if let Some(dot_pos) = left_join_col.rfind('.') {
594            &left_join_col[dot_pos + 1..]
595        } else {
596            left_join_col
597        };
598
599        let right_base = if let Some(dot_pos) = right_join_col.rfind('.') {
600            &right_join_col[dot_pos + 1..]
601        } else {
602            right_join_col
603        };
604
605        // If this column name appears in both join columns, qualify it
606        if base_name == left_base || base_name == right_base {
607            format!("{}_{}", table_side, base_name)
608        } else {
609            col_name.to_string()
610        }
611    }
612
613    /// Reverse a join operator for right joins
614    fn reverse_operator(&self, op: &JoinOperator) -> JoinOperator {
615        match op {
616            JoinOperator::Equal => JoinOperator::Equal,
617            JoinOperator::NotEqual => JoinOperator::NotEqual,
618            JoinOperator::LessThan => JoinOperator::GreaterThan,
619            JoinOperator::GreaterThan => JoinOperator::LessThan,
620            JoinOperator::LessThanOrEqual => JoinOperator::GreaterThanOrEqual,
621            JoinOperator::GreaterThanOrEqual => JoinOperator::LessThanOrEqual,
622        }
623    }
624
625    /// Compare two values based on the join operator
626    fn compare_values(&self, left: &DataValue, right: &DataValue, op: &JoinOperator) -> bool {
627        match op {
628            JoinOperator::Equal => left == right,
629            JoinOperator::NotEqual => left != right,
630            JoinOperator::LessThan => left < right,
631            JoinOperator::GreaterThan => left > right,
632            JoinOperator::LessThanOrEqual => left <= right,
633            JoinOperator::GreaterThanOrEqual => left >= right,
634        }
635    }
636
637    /// Nested loop join for INNER JOIN with inequality conditions
638    fn nested_loop_join_inner(
639        &self,
640        left_table: Arc<DataTable>,
641        right_table: Arc<DataTable>,
642        left_col_idx: usize,
643        right_col_idx: usize,
644        operator: &JoinOperator,
645        join_alias: &Option<String>,
646    ) -> Result<DataTable> {
647        let start = std::time::Instant::now();
648
649        info!(
650            "Executing nested loop INNER JOIN with {:?} operator: {} x {} rows",
651            operator,
652            left_table.row_count(),
653            right_table.row_count()
654        );
655
656        // Create result table with columns from both tables
657        let mut result = DataTable::new("joined");
658
659        // Add columns from left table
660        for col in &left_table.columns {
661            result.add_column(DataColumn {
662                name: col.name.clone(),
663                data_type: col.data_type.clone(),
664                nullable: col.nullable,
665                unique_values: col.unique_values,
666                null_count: col.null_count,
667                metadata: col.metadata.clone(),
668                qualified_name: col.qualified_name.clone(), // Preserve qualified name
669                source_table: col.source_table.clone(),     // Preserve source table
670            });
671        }
672
673        // Add columns from right table
674        for col in &right_table.columns {
675            if !left_table
676                .columns
677                .iter()
678                .any(|left_col| left_col.name == col.name)
679            {
680                result.add_column(DataColumn {
681                    name: col.name.clone(),
682                    data_type: col.data_type.clone(),
683                    nullable: col.nullable,
684                    unique_values: col.unique_values,
685                    null_count: col.null_count,
686                    metadata: col.metadata.clone(),
687                    qualified_name: col.qualified_name.clone(), // Preserve qualified name
688                    source_table: col.source_table.clone(),     // Preserve source table
689                });
690            } else {
691                let (column_name, qualified_name) = if let Some(alias) = join_alias {
692                    // Use the join alias for the column name
693                    (
694                        format!("{}.{}", alias, col.name),
695                        Some(format!("{}.{}", alias, col.name)),
696                    )
697                } else {
698                    // Fall back to _right suffix
699                    (format!("{}_right", col.name), col.qualified_name.clone())
700                };
701                result.add_column(DataColumn {
702                    name: column_name,
703                    data_type: col.data_type.clone(),
704                    nullable: col.nullable,
705                    unique_values: col.unique_values,
706                    null_count: col.null_count,
707                    metadata: col.metadata.clone(),
708                    qualified_name,
709                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
710                });
711            }
712        }
713
714        // Nested loop join
715        let mut match_count = 0;
716        for left_row in &left_table.rows {
717            let left_value = &left_row.values[left_col_idx];
718
719            for right_row in &right_table.rows {
720                let right_value = &right_row.values[right_col_idx];
721
722                if self.compare_values(left_value, right_value, operator) {
723                    let mut joined_row = DataRow { values: Vec::new() };
724                    joined_row.values.extend_from_slice(&left_row.values);
725                    joined_row.values.extend_from_slice(&right_row.values);
726                    result.add_row(joined_row);
727                    match_count += 1;
728                }
729            }
730        }
731
732        info!(
733            "Nested loop INNER JOIN complete: {} matches found in {:?}",
734            match_count,
735            start.elapsed()
736        );
737
738        Ok(result)
739    }
740
741    /// Nested loop join for INNER JOIN with multiple conditions
742    fn nested_loop_join_inner_multi(
743        &self,
744        left_table: Arc<DataTable>,
745        right_table: Arc<DataTable>,
746        conditions: Vec<(usize, usize, JoinOperator)>,
747        join_alias: &Option<String>,
748    ) -> Result<DataTable> {
749        let start = std::time::Instant::now();
750
751        info!(
752            "Executing nested loop INNER JOIN with {} conditions: {} x {} rows",
753            conditions.len(),
754            left_table.row_count(),
755            right_table.row_count()
756        );
757
758        // Create result table with columns from both tables
759        let mut result = DataTable::new("joined");
760
761        // Add columns from left table
762        for col in &left_table.columns {
763            result.add_column(DataColumn {
764                name: col.name.clone(),
765                data_type: col.data_type.clone(),
766                nullable: col.nullable,
767                unique_values: col.unique_values,
768                null_count: col.null_count,
769                metadata: col.metadata.clone(),
770                qualified_name: col.qualified_name.clone(),
771                source_table: col.source_table.clone(),
772            });
773        }
774
775        // Add columns from right table
776        for col in &right_table.columns {
777            if !left_table
778                .columns
779                .iter()
780                .any(|left_col| left_col.name == col.name)
781            {
782                result.add_column(DataColumn {
783                    name: col.name.clone(),
784                    data_type: col.data_type.clone(),
785                    nullable: col.nullable,
786                    unique_values: col.unique_values,
787                    null_count: col.null_count,
788                    metadata: col.metadata.clone(),
789                    qualified_name: col.qualified_name.clone(),
790                    source_table: col.source_table.clone(),
791                });
792            } else {
793                let (column_name, qualified_name) = if let Some(alias) = join_alias {
794                    (
795                        format!("{}.{}", alias, col.name),
796                        Some(format!("{}.{}", alias, col.name)),
797                    )
798                } else {
799                    (format!("{}_right", col.name), col.qualified_name.clone())
800                };
801                result.add_column(DataColumn {
802                    name: column_name,
803                    data_type: col.data_type.clone(),
804                    nullable: col.nullable,
805                    unique_values: col.unique_values,
806                    null_count: col.null_count,
807                    metadata: col.metadata.clone(),
808                    qualified_name,
809                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
810                });
811            }
812        }
813
814        // Nested loop join with multiple conditions
815        let mut match_count = 0;
816        for left_row in &left_table.rows {
817            for right_row in &right_table.rows {
818                // Check all conditions - all must be true for a match
819                let mut all_conditions_met = true;
820                for (left_idx, right_idx, operator) in &conditions {
821                    let left_value = &left_row.values[*left_idx];
822                    let right_value = &right_row.values[*right_idx];
823
824                    if !self.compare_values(left_value, right_value, operator) {
825                        all_conditions_met = false;
826                        break;
827                    }
828                }
829
830                if all_conditions_met {
831                    let mut joined_row = DataRow { values: Vec::new() };
832                    joined_row.values.extend_from_slice(&left_row.values);
833                    joined_row.values.extend_from_slice(&right_row.values);
834                    result.add_row(joined_row);
835                    match_count += 1;
836                }
837            }
838        }
839
840        info!(
841            "Nested loop INNER JOIN complete: {} matches found in {:?}",
842            match_count,
843            start.elapsed()
844        );
845
846        Ok(result)
847    }
848
849    /// Nested loop join for LEFT JOIN with multiple conditions
850    fn nested_loop_join_left_multi(
851        &self,
852        left_table: Arc<DataTable>,
853        right_table: Arc<DataTable>,
854        conditions: Vec<(usize, usize, JoinOperator)>,
855        join_alias: &Option<String>,
856    ) -> Result<DataTable> {
857        let start = std::time::Instant::now();
858
859        info!(
860            "Executing nested loop LEFT JOIN with {} conditions: {} x {} rows",
861            conditions.len(),
862            left_table.row_count(),
863            right_table.row_count()
864        );
865
866        // Create result table with columns from both tables
867        let mut result = DataTable::new("joined");
868
869        // Add columns from left table
870        for col in &left_table.columns {
871            result.add_column(DataColumn {
872                name: col.name.clone(),
873                data_type: col.data_type.clone(),
874                nullable: col.nullable,
875                unique_values: col.unique_values,
876                null_count: col.null_count,
877                metadata: col.metadata.clone(),
878                qualified_name: col.qualified_name.clone(),
879                source_table: col.source_table.clone(),
880            });
881        }
882
883        // Add columns from right table (all nullable for LEFT JOIN)
884        for col in &right_table.columns {
885            if !left_table
886                .columns
887                .iter()
888                .any(|left_col| left_col.name == col.name)
889            {
890                result.add_column(DataColumn {
891                    name: col.name.clone(),
892                    data_type: col.data_type.clone(),
893                    nullable: true, // Always nullable for outer join
894                    unique_values: col.unique_values,
895                    null_count: col.null_count,
896                    metadata: col.metadata.clone(),
897                    qualified_name: col.qualified_name.clone(),
898                    source_table: col.source_table.clone(),
899                });
900            } else {
901                let (column_name, qualified_name) = if let Some(alias) = join_alias {
902                    (
903                        format!("{}.{}", alias, col.name),
904                        Some(format!("{}.{}", alias, col.name)),
905                    )
906                } else {
907                    (format!("{}_right", col.name), col.qualified_name.clone())
908                };
909                result.add_column(DataColumn {
910                    name: column_name,
911                    data_type: col.data_type.clone(),
912                    nullable: true, // Always nullable for outer join
913                    unique_values: col.unique_values,
914                    null_count: col.null_count,
915                    metadata: col.metadata.clone(),
916                    qualified_name,
917                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
918                });
919            }
920        }
921
922        // Nested loop join with multiple conditions
923        let mut match_count = 0;
924        let mut null_count = 0;
925
926        for left_row in &left_table.rows {
927            let mut found_match = false;
928
929            for right_row in &right_table.rows {
930                // Check all conditions - all must be true for a match
931                let mut all_conditions_met = true;
932                for (left_idx, right_idx, operator) in &conditions {
933                    let left_value = &left_row.values[*left_idx];
934                    let right_value = &right_row.values[*right_idx];
935
936                    if !self.compare_values(left_value, right_value, operator) {
937                        all_conditions_met = false;
938                        break;
939                    }
940                }
941
942                if all_conditions_met {
943                    let mut joined_row = DataRow { values: Vec::new() };
944                    joined_row.values.extend_from_slice(&left_row.values);
945                    joined_row.values.extend_from_slice(&right_row.values);
946                    result.add_row(joined_row);
947                    match_count += 1;
948                    found_match = true;
949                }
950            }
951
952            // If no match found, emit left row with NULLs for right columns
953            if !found_match {
954                let mut joined_row = DataRow { values: Vec::new() };
955                joined_row.values.extend_from_slice(&left_row.values);
956                for _ in 0..right_table.column_count() {
957                    joined_row.values.push(DataValue::Null);
958                }
959                result.add_row(joined_row);
960                null_count += 1;
961            }
962        }
963
964        info!(
965            "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
966            match_count,
967            null_count,
968            start.elapsed()
969        );
970
971        Ok(result)
972    }
973
974    /// Nested loop join for LEFT JOIN with inequality conditions
975    fn nested_loop_join_left(
976        &self,
977        left_table: Arc<DataTable>,
978        right_table: Arc<DataTable>,
979        left_col_idx: usize,
980        right_col_idx: usize,
981        operator: &JoinOperator,
982        join_alias: &Option<String>,
983    ) -> Result<DataTable> {
984        let start = std::time::Instant::now();
985
986        info!(
987            "Executing nested loop LEFT JOIN with {:?} operator: {} x {} rows",
988            operator,
989            left_table.row_count(),
990            right_table.row_count()
991        );
992
993        // Create result table with columns from both tables
994        let mut result = DataTable::new("joined");
995
996        // Add columns from left table
997        for col in &left_table.columns {
998            result.add_column(DataColumn {
999                name: col.name.clone(),
1000                data_type: col.data_type.clone(),
1001                nullable: col.nullable,
1002                unique_values: col.unique_values,
1003                null_count: col.null_count,
1004                metadata: col.metadata.clone(),
1005                qualified_name: col.qualified_name.clone(), // Preserve qualified name
1006                source_table: col.source_table.clone(),     // Preserve source table
1007            });
1008        }
1009
1010        // Add columns from right table (all nullable for LEFT JOIN)
1011        for col in &right_table.columns {
1012            if !left_table
1013                .columns
1014                .iter()
1015                .any(|left_col| left_col.name == col.name)
1016            {
1017                result.add_column(DataColumn {
1018                    name: col.name.clone(),
1019                    data_type: col.data_type.clone(),
1020                    nullable: true, // Always nullable for outer join
1021                    unique_values: col.unique_values,
1022                    null_count: col.null_count,
1023                    metadata: col.metadata.clone(),
1024                    qualified_name: col.qualified_name.clone(), // Preserve qualified name
1025                    source_table: col.source_table.clone(),     // Preserve source table
1026                });
1027            } else {
1028                let (column_name, qualified_name) = if let Some(alias) = join_alias {
1029                    // Use the join alias for the column name
1030                    (
1031                        format!("{}.{}", alias, col.name),
1032                        Some(format!("{}.{}", alias, col.name)),
1033                    )
1034                } else {
1035                    // Fall back to _right suffix
1036                    (format!("{}_right", col.name), col.qualified_name.clone())
1037                };
1038                result.add_column(DataColumn {
1039                    name: column_name,
1040                    data_type: col.data_type.clone(),
1041                    nullable: true, // Always nullable for outer join
1042                    unique_values: col.unique_values,
1043                    null_count: col.null_count,
1044                    metadata: col.metadata.clone(),
1045                    qualified_name,
1046                    source_table: join_alias.clone().or_else(|| col.source_table.clone()),
1047                });
1048            }
1049        }
1050
1051        // Nested loop join
1052        let mut match_count = 0;
1053        let mut null_count = 0;
1054
1055        for left_row in &left_table.rows {
1056            let left_value = &left_row.values[left_col_idx];
1057            let mut found_match = false;
1058
1059            for right_row in &right_table.rows {
1060                let right_value = &right_row.values[right_col_idx];
1061
1062                if self.compare_values(left_value, right_value, operator) {
1063                    let mut joined_row = DataRow { values: Vec::new() };
1064                    joined_row.values.extend_from_slice(&left_row.values);
1065                    joined_row.values.extend_from_slice(&right_row.values);
1066                    result.add_row(joined_row);
1067                    match_count += 1;
1068                    found_match = true;
1069                }
1070            }
1071
1072            // If no match found, emit left row with NULLs for right columns
1073            if !found_match {
1074                let mut joined_row = DataRow { values: Vec::new() };
1075                joined_row.values.extend_from_slice(&left_row.values);
1076                for _ in 0..right_table.column_count() {
1077                    joined_row.values.push(DataValue::Null);
1078                }
1079                result.add_row(joined_row);
1080                null_count += 1;
1081            }
1082        }
1083
1084        info!(
1085            "Nested loop LEFT JOIN complete: {} matches, {} nulls in {:?}",
1086            match_count,
1087            null_count,
1088            start.elapsed()
1089        );
1090
1091        Ok(result)
1092    }
1093}