Skip to main content

grafeo_core/execution/operators/
join.rs

1//! Join operators for combining data from two sources.
2//!
3//! This module provides:
4//! - `HashJoinOperator`: Efficient hash-based join for equality conditions
5//! - `NestedLoopJoinOperator`: General-purpose join for any condition
6
7use std::collections::HashMap;
8
9use grafeo_common::types::{LogicalType, Value};
10
11use super::{Operator, OperatorError, OperatorResult};
12use crate::execution::chunk::DataChunkBuilder;
13use crate::execution::{DataChunk, ValueVector};
14
/// Selects how rows from the two join inputs are combined.
///
/// The outer variants (`Left`, `Right`, `Full`) pad whichever side has no
/// matching row with NULLs, while `Semi` and `Anti` only filter left rows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    /// Keep only row pairs that satisfy the join condition.
    Inner,
    /// Keep every left row; right columns are NULL when nothing matched.
    Left,
    /// Keep every right row; left columns are NULL when nothing matched.
    Right,
    /// Keep every row from both inputs, NULL-padding the missing side.
    Full,
    /// Produce the cartesian product of both inputs.
    Cross,
    /// Keep left rows that have at least one match on the right.
    Semi,
    /// Keep left rows that have no match on the right.
    Anti,
}
33
/// Hashable, comparable representation of a join key.
///
/// Join values are lowered into this enum so they can serve as `HashMap`
/// keys; [`HashKey::from_value`] defines how each value kind is mapped.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum HashKey {
    /// A NULL key.
    Null,
    /// A boolean key.
    Bool(bool),
    /// A 64-bit integer key (float bit patterns and timestamps are stored here too).
    Int64(i64),
    /// A textual key, hashed by its string content.
    String(String),
    /// An ordered sequence of keys (multi-column keys, lists and maps).
    Composite(Vec<HashKey>),
}
48
49impl HashKey {
50    /// Creates a hash key from a Value.
51    pub fn from_value(value: &Value) -> Self {
52        match value {
53            Value::Null => HashKey::Null,
54            Value::Bool(b) => HashKey::Bool(*b),
55            Value::Int64(i) => HashKey::Int64(*i),
56            Value::Float64(f) => {
57                // Convert float to bits for consistent hashing
58                HashKey::Int64(f.to_bits() as i64)
59            }
60            Value::String(s) => HashKey::String(s.to_string()),
61            Value::Bytes(b) => {
62                // Use byte content for hashing
63                HashKey::String(format!("{b:?}"))
64            }
65            Value::Timestamp(t) => HashKey::Int64(t.as_micros()),
66            Value::List(items) => {
67                HashKey::Composite(items.iter().map(HashKey::from_value).collect())
68            }
69            Value::Map(map) => {
70                let mut keys: Vec<_> = map
71                    .iter()
72                    .map(|(k, v)| {
73                        HashKey::Composite(vec![
74                            HashKey::String(k.to_string()),
75                            HashKey::from_value(v),
76                        ])
77                    })
78                    .collect();
79                keys.sort_by(|a, b| format!("{a:?}").cmp(&format!("{b:?}")));
80                HashKey::Composite(keys)
81            }
82        }
83    }
84
85    /// Creates a hash key from a column value at a given row.
86    pub fn from_column(column: &ValueVector, row: usize) -> Option<Self> {
87        column.get_value(row).map(|v| Self::from_value(&v))
88    }
89}
90
91/// Hash join operator.
92///
93/// Builds a hash table from the build side (right) and probes with the probe side (left).
94/// Efficient for equality joins on one or more columns.
95pub struct HashJoinOperator {
96    /// Left (probe) side operator.
97    probe_side: Box<dyn Operator>,
98    /// Right (build) side operator.
99    build_side: Box<dyn Operator>,
100    /// Column indices on the probe side for join keys.
101    probe_keys: Vec<usize>,
102    /// Column indices on the build side for join keys.
103    build_keys: Vec<usize>,
104    /// Join type.
105    join_type: JoinType,
106    /// Output schema (combined from both sides).
107    output_schema: Vec<LogicalType>,
108    /// Hash table: key -> list of (chunk_index, row_index).
109    hash_table: HashMap<HashKey, Vec<(usize, usize)>>,
110    /// Materialized build side chunks.
111    build_chunks: Vec<DataChunk>,
112    /// Whether the build phase is complete.
113    build_complete: bool,
114    /// Current probe chunk being processed.
115    current_probe_chunk: Option<DataChunk>,
116    /// Current row in the probe chunk.
117    current_probe_row: usize,
118    /// Current position in the hash table matches for the current probe row.
119    current_match_position: usize,
120    /// Current matches for the current probe row.
121    current_matches: Vec<(usize, usize)>,
122    /// For left/full outer joins: track which probe rows had matches.
123    probe_matched: Vec<bool>,
124    /// For right/full outer joins: track which build rows were matched.
125    build_matched: Vec<Vec<bool>>,
126    /// Whether we're in the emit unmatched phase (for outer joins).
127    emitting_unmatched: bool,
128    /// Current chunk index when emitting unmatched rows.
129    unmatched_chunk_idx: usize,
130    /// Current row index when emitting unmatched rows.
131    unmatched_row_idx: usize,
132}
133
134impl HashJoinOperator {
135    /// Creates a new hash join operator.
136    ///
137    /// # Arguments
138    /// * `probe_side` - Left side operator (will be probed).
139    /// * `build_side` - Right side operator (will build hash table).
140    /// * `probe_keys` - Column indices on probe side for join keys.
141    /// * `build_keys` - Column indices on build side for join keys.
142    /// * `join_type` - Type of join to perform.
143    /// * `output_schema` - Schema of the output (probe columns + build columns).
144    pub fn new(
145        probe_side: Box<dyn Operator>,
146        build_side: Box<dyn Operator>,
147        probe_keys: Vec<usize>,
148        build_keys: Vec<usize>,
149        join_type: JoinType,
150        output_schema: Vec<LogicalType>,
151    ) -> Self {
152        Self {
153            probe_side,
154            build_side,
155            probe_keys,
156            build_keys,
157            join_type,
158            output_schema,
159            hash_table: HashMap::new(),
160            build_chunks: Vec::new(),
161            build_complete: false,
162            current_probe_chunk: None,
163            current_probe_row: 0,
164            current_match_position: 0,
165            current_matches: Vec::new(),
166            probe_matched: Vec::new(),
167            build_matched: Vec::new(),
168            emitting_unmatched: false,
169            unmatched_chunk_idx: 0,
170            unmatched_row_idx: 0,
171        }
172    }
173
174    /// Builds the hash table from the build side.
175    fn build_hash_table(&mut self) -> Result<(), OperatorError> {
176        while let Some(chunk) = self.build_side.next()? {
177            let chunk_idx = self.build_chunks.len();
178
179            // Initialize match tracking for outer joins
180            if matches!(self.join_type, JoinType::Right | JoinType::Full) {
181                self.build_matched.push(vec![false; chunk.row_count()]);
182            }
183
184            // Add each row to the hash table
185            for row in chunk.selected_indices() {
186                let key = self.extract_key(&chunk, row, &self.build_keys)?;
187
188                // Skip null keys for inner/semi/anti joins
189                if matches!(key, HashKey::Null)
190                    && !matches!(
191                        self.join_type,
192                        JoinType::Left | JoinType::Right | JoinType::Full
193                    )
194                {
195                    continue;
196                }
197
198                self.hash_table
199                    .entry(key)
200                    .or_default()
201                    .push((chunk_idx, row));
202            }
203
204            self.build_chunks.push(chunk);
205        }
206
207        self.build_complete = true;
208        Ok(())
209    }
210
211    /// Extracts a hash key from a chunk row.
212    fn extract_key(
213        &self,
214        chunk: &DataChunk,
215        row: usize,
216        key_columns: &[usize],
217    ) -> Result<HashKey, OperatorError> {
218        if key_columns.len() == 1 {
219            let col = chunk.column(key_columns[0]).ok_or_else(|| {
220                OperatorError::ColumnNotFound(format!("column {}", key_columns[0]))
221            })?;
222            Ok(HashKey::from_column(col, row).unwrap_or(HashKey::Null))
223        } else {
224            let keys: Vec<HashKey> = key_columns
225                .iter()
226                .map(|&col_idx| {
227                    chunk
228                        .column(col_idx)
229                        .and_then(|col| HashKey::from_column(col, row))
230                        .unwrap_or(HashKey::Null)
231                })
232                .collect();
233            Ok(HashKey::Composite(keys))
234        }
235    }
236
237    /// Produces an output row from a probe row and build row.
238    fn produce_output_row(
239        &self,
240        builder: &mut DataChunkBuilder,
241        probe_chunk: &DataChunk,
242        probe_row: usize,
243        build_chunk: Option<&DataChunk>,
244        build_row: Option<usize>,
245    ) -> Result<(), OperatorError> {
246        let probe_col_count = probe_chunk.column_count();
247
248        // Copy probe side columns
249        for col_idx in 0..probe_col_count {
250            let src_col = probe_chunk
251                .column(col_idx)
252                .ok_or_else(|| OperatorError::ColumnNotFound(format!("probe column {col_idx}")))?;
253            let dst_col = builder
254                .column_mut(col_idx)
255                .ok_or_else(|| OperatorError::ColumnNotFound(format!("output column {col_idx}")))?;
256
257            if let Some(value) = src_col.get_value(probe_row) {
258                dst_col.push_value(value);
259            } else {
260                dst_col.push_value(Value::Null);
261            }
262        }
263
264        // Copy build side columns
265        match (build_chunk, build_row) {
266            (Some(chunk), Some(row)) => {
267                for col_idx in 0..chunk.column_count() {
268                    let src_col = chunk.column(col_idx).ok_or_else(|| {
269                        OperatorError::ColumnNotFound(format!("build column {col_idx}"))
270                    })?;
271                    let dst_col =
272                        builder
273                            .column_mut(probe_col_count + col_idx)
274                            .ok_or_else(|| {
275                                OperatorError::ColumnNotFound(format!(
276                                    "output column {}",
277                                    probe_col_count + col_idx
278                                ))
279                            })?;
280
281                    if let Some(value) = src_col.get_value(row) {
282                        dst_col.push_value(value);
283                    } else {
284                        dst_col.push_value(Value::Null);
285                    }
286                }
287            }
288            _ => {
289                // Emit nulls for build side (left outer join case)
290                if !self.build_chunks.is_empty() {
291                    let build_col_count = self.build_chunks[0].column_count();
292                    for col_idx in 0..build_col_count {
293                        let dst_col =
294                            builder
295                                .column_mut(probe_col_count + col_idx)
296                                .ok_or_else(|| {
297                                    OperatorError::ColumnNotFound(format!(
298                                        "output column {}",
299                                        probe_col_count + col_idx
300                                    ))
301                                })?;
302                        dst_col.push_value(Value::Null);
303                    }
304                }
305            }
306        }
307
308        builder.advance_row();
309        Ok(())
310    }
311
312    /// Gets the next probe chunk.
313    fn get_next_probe_chunk(&mut self) -> Result<bool, OperatorError> {
314        let chunk = self.probe_side.next()?;
315        if let Some(ref c) = chunk {
316            // Initialize match tracking for outer joins
317            if matches!(self.join_type, JoinType::Left | JoinType::Full) {
318                self.probe_matched = vec![false; c.row_count()];
319            }
320        }
321        let has_chunk = chunk.is_some();
322        self.current_probe_chunk = chunk;
323        self.current_probe_row = 0;
324        Ok(has_chunk)
325    }
326
327    /// Emits unmatched build rows for right/full outer joins.
328    fn emit_unmatched_build(&mut self) -> OperatorResult {
329        if self.build_matched.is_empty() {
330            return Ok(None);
331        }
332
333        let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
334
335        // Determine probe column count from schema or first probe chunk
336        let probe_col_count = if !self.build_chunks.is_empty() {
337            self.output_schema.len() - self.build_chunks[0].column_count()
338        } else {
339            0
340        };
341
342        while self.unmatched_chunk_idx < self.build_chunks.len() {
343            let chunk = &self.build_chunks[self.unmatched_chunk_idx];
344            let matched = &self.build_matched[self.unmatched_chunk_idx];
345
346            while self.unmatched_row_idx < matched.len() {
347                if !matched[self.unmatched_row_idx] {
348                    // This row was not matched - emit with nulls on probe side
349
350                    // Emit nulls for probe side
351                    for col_idx in 0..probe_col_count {
352                        if let Some(dst_col) = builder.column_mut(col_idx) {
353                            dst_col.push_value(Value::Null);
354                        }
355                    }
356
357                    // Copy build side values
358                    for col_idx in 0..chunk.column_count() {
359                        if let (Some(src_col), Some(dst_col)) = (
360                            chunk.column(col_idx),
361                            builder.column_mut(probe_col_count + col_idx),
362                        ) {
363                            if let Some(value) = src_col.get_value(self.unmatched_row_idx) {
364                                dst_col.push_value(value);
365                            } else {
366                                dst_col.push_value(Value::Null);
367                            }
368                        }
369                    }
370
371                    builder.advance_row();
372
373                    if builder.is_full() {
374                        self.unmatched_row_idx += 1;
375                        return Ok(Some(builder.finish()));
376                    }
377                }
378
379                self.unmatched_row_idx += 1;
380            }
381
382            self.unmatched_chunk_idx += 1;
383            self.unmatched_row_idx = 0;
384        }
385
386        if builder.row_count() > 0 {
387            Ok(Some(builder.finish()))
388        } else {
389            Ok(None)
390        }
391    }
392}
393
394impl Operator for HashJoinOperator {
395    fn next(&mut self) -> OperatorResult {
396        // Phase 1: Build hash table
397        if !self.build_complete {
398            self.build_hash_table()?;
399        }
400
401        // Phase 3: Emit unmatched build rows (right/full outer join)
402        if self.emitting_unmatched {
403            return self.emit_unmatched_build();
404        }
405
406        // Phase 2: Probe
407        let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
408
409        loop {
410            // Get current probe chunk or fetch new one
411            if self.current_probe_chunk.is_none() {
412                if !self.get_next_probe_chunk()? {
413                    // No more probe data
414                    if matches!(self.join_type, JoinType::Right | JoinType::Full) {
415                        self.emitting_unmatched = true;
416                        return self.emit_unmatched_build();
417                    }
418                    return if builder.row_count() > 0 {
419                        Ok(Some(builder.finish()))
420                    } else {
421                        Ok(None)
422                    };
423                }
424            }
425
426            // Invariant: current_probe_chunk is Some here - the guard at line 396 either
427            // populates it via get_next_probe_chunk() or returns from the function
428            let probe_chunk = self
429                .current_probe_chunk
430                .as_ref()
431                .expect("probe chunk is Some: guard at line 396 ensures this");
432            let probe_rows: Vec<usize> = probe_chunk.selected_indices().collect();
433
434            while self.current_probe_row < probe_rows.len() {
435                let probe_row = probe_rows[self.current_probe_row];
436
437                // If we don't have current matches, look them up
438                if self.current_matches.is_empty() && self.current_match_position == 0 {
439                    let key = self.extract_key(probe_chunk, probe_row, &self.probe_keys)?;
440
441                    // Handle semi/anti joins differently
442                    match self.join_type {
443                        JoinType::Semi => {
444                            if self.hash_table.contains_key(&key) {
445                                // Emit probe row only
446                                for col_idx in 0..probe_chunk.column_count() {
447                                    if let (Some(src_col), Some(dst_col)) =
448                                        (probe_chunk.column(col_idx), builder.column_mut(col_idx))
449                                    {
450                                        if let Some(value) = src_col.get_value(probe_row) {
451                                            dst_col.push_value(value);
452                                        }
453                                    }
454                                }
455                                builder.advance_row();
456                            }
457                            self.current_probe_row += 1;
458                            continue;
459                        }
460                        JoinType::Anti => {
461                            if !self.hash_table.contains_key(&key) {
462                                // Emit probe row only
463                                for col_idx in 0..probe_chunk.column_count() {
464                                    if let (Some(src_col), Some(dst_col)) =
465                                        (probe_chunk.column(col_idx), builder.column_mut(col_idx))
466                                    {
467                                        if let Some(value) = src_col.get_value(probe_row) {
468                                            dst_col.push_value(value);
469                                        }
470                                    }
471                                }
472                                builder.advance_row();
473                            }
474                            self.current_probe_row += 1;
475                            continue;
476                        }
477                        _ => {
478                            self.current_matches =
479                                self.hash_table.get(&key).cloned().unwrap_or_default();
480                        }
481                    }
482                }
483
484                // Process matches
485                if self.current_matches.is_empty() {
486                    // No matches - for left/full outer join, emit with nulls
487                    if matches!(self.join_type, JoinType::Left | JoinType::Full) {
488                        self.produce_output_row(&mut builder, probe_chunk, probe_row, None, None)?;
489                    }
490                    self.current_probe_row += 1;
491                    self.current_match_position = 0;
492                } else {
493                    // Process each match
494                    while self.current_match_position < self.current_matches.len() {
495                        let (build_chunk_idx, build_row) =
496                            self.current_matches[self.current_match_position];
497                        let build_chunk = &self.build_chunks[build_chunk_idx];
498
499                        // Mark as matched for outer joins
500                        if matches!(self.join_type, JoinType::Left | JoinType::Full) {
501                            if probe_row < self.probe_matched.len() {
502                                self.probe_matched[probe_row] = true;
503                            }
504                        }
505                        if matches!(self.join_type, JoinType::Right | JoinType::Full) {
506                            if build_chunk_idx < self.build_matched.len()
507                                && build_row < self.build_matched[build_chunk_idx].len()
508                            {
509                                self.build_matched[build_chunk_idx][build_row] = true;
510                            }
511                        }
512
513                        self.produce_output_row(
514                            &mut builder,
515                            probe_chunk,
516                            probe_row,
517                            Some(build_chunk),
518                            Some(build_row),
519                        )?;
520
521                        self.current_match_position += 1;
522
523                        if builder.is_full() {
524                            return Ok(Some(builder.finish()));
525                        }
526                    }
527
528                    // Done with this probe row
529                    self.current_probe_row += 1;
530                    self.current_matches.clear();
531                    self.current_match_position = 0;
532                }
533
534                if builder.is_full() {
535                    return Ok(Some(builder.finish()));
536                }
537            }
538
539            // Done with current probe chunk
540            self.current_probe_chunk = None;
541            self.current_probe_row = 0;
542
543            if builder.row_count() > 0 {
544                return Ok(Some(builder.finish()));
545            }
546        }
547    }
548
549    fn reset(&mut self) {
550        self.probe_side.reset();
551        self.build_side.reset();
552        self.hash_table.clear();
553        self.build_chunks.clear();
554        self.build_complete = false;
555        self.current_probe_chunk = None;
556        self.current_probe_row = 0;
557        self.current_match_position = 0;
558        self.current_matches.clear();
559        self.probe_matched.clear();
560        self.build_matched.clear();
561        self.emitting_unmatched = false;
562        self.unmatched_chunk_idx = 0;
563        self.unmatched_row_idx = 0;
564    }
565
566    fn name(&self) -> &'static str {
567        "HashJoin"
568    }
569}
570
571/// Nested loop join operator.
572///
573/// Performs a cartesian product of both sides, filtering by the join condition.
574/// Less efficient than hash join but supports any join condition.
575pub struct NestedLoopJoinOperator {
576    /// Left side operator.
577    left: Box<dyn Operator>,
578    /// Right side operator.
579    right: Box<dyn Operator>,
580    /// Join condition predicate (if any).
581    condition: Option<Box<dyn JoinCondition>>,
582    /// Join type.
583    join_type: JoinType,
584    /// Output schema.
585    output_schema: Vec<LogicalType>,
586    /// Materialized right side chunks.
587    right_chunks: Vec<DataChunk>,
588    /// Whether the right side is materialized.
589    right_materialized: bool,
590    /// Current left chunk.
591    current_left_chunk: Option<DataChunk>,
592    /// Current row in the left chunk.
593    current_left_row: usize,
594    /// Current chunk index in the right side.
595    current_right_chunk: usize,
596    /// Whether the current left row has been matched (for Left Join).
597    current_left_matched: bool,
598    /// Current row in the current right chunk.
599    current_right_row: usize,
600}
601
602/// Trait for join conditions.
603pub trait JoinCondition: Send + Sync {
604    /// Evaluates the condition for a pair of rows.
605    fn evaluate(
606        &self,
607        left_chunk: &DataChunk,
608        left_row: usize,
609        right_chunk: &DataChunk,
610        right_row: usize,
611    ) -> bool;
612}
613
/// Nested-loop join condition comparing one left column with one right column.
pub struct EqualityCondition {
    /// Index of the compared column in left-side chunks.
    left_column: usize,
    /// Index of the compared column in right-side chunks.
    right_column: usize,
}

impl EqualityCondition {
    /// Builds a condition that compares column `left_column` of the left input
    /// with column `right_column` of the right input.
    pub fn new(left_column: usize, right_column: usize) -> Self {
        EqualityCondition {
            left_column,
            right_column,
        }
    }
}
631
632impl JoinCondition for EqualityCondition {
633    fn evaluate(
634        &self,
635        left_chunk: &DataChunk,
636        left_row: usize,
637        right_chunk: &DataChunk,
638        right_row: usize,
639    ) -> bool {
640        let left_val = left_chunk
641            .column(self.left_column)
642            .and_then(|c| c.get_value(left_row));
643        let right_val = right_chunk
644            .column(self.right_column)
645            .and_then(|c| c.get_value(right_row));
646
647        match (left_val, right_val) {
648            (Some(l), Some(r)) => l == r,
649            _ => false,
650        }
651    }
652}
653
654impl NestedLoopJoinOperator {
655    /// Creates a new nested loop join operator.
656    pub fn new(
657        left: Box<dyn Operator>,
658        right: Box<dyn Operator>,
659        condition: Option<Box<dyn JoinCondition>>,
660        join_type: JoinType,
661        output_schema: Vec<LogicalType>,
662    ) -> Self {
663        Self {
664            left,
665            right,
666            condition,
667            join_type,
668            output_schema,
669            right_chunks: Vec::new(),
670            right_materialized: false,
671            current_left_chunk: None,
672            current_left_row: 0,
673            current_right_chunk: 0,
674            current_right_row: 0,
675            current_left_matched: false,
676        }
677    }
678
679    /// Materializes the right side.
680    fn materialize_right(&mut self) -> Result<(), OperatorError> {
681        while let Some(chunk) = self.right.next()? {
682            self.right_chunks.push(chunk);
683        }
684        self.right_materialized = true;
685        Ok(())
686    }
687
688    /// Produces an output row.
689    fn produce_row(
690        &self,
691        builder: &mut DataChunkBuilder,
692        left_chunk: &DataChunk,
693        left_row: usize,
694        right_chunk: &DataChunk,
695        right_row: usize,
696    ) {
697        // Copy left columns
698        for col_idx in 0..left_chunk.column_count() {
699            if let (Some(src), Some(dst)) =
700                (left_chunk.column(col_idx), builder.column_mut(col_idx))
701            {
702                if let Some(val) = src.get_value(left_row) {
703                    dst.push_value(val);
704                } else {
705                    dst.push_value(Value::Null);
706                }
707            }
708        }
709
710        // Copy right columns
711        let left_col_count = left_chunk.column_count();
712        for col_idx in 0..right_chunk.column_count() {
713            if let (Some(src), Some(dst)) = (
714                right_chunk.column(col_idx),
715                builder.column_mut(left_col_count + col_idx),
716            ) {
717                if let Some(val) = src.get_value(right_row) {
718                    dst.push_value(val);
719                } else {
720                    dst.push_value(Value::Null);
721                }
722            }
723        }
724
725        builder.advance_row();
726    }
727
728    /// Produces an output row with NULLs for the right side (for unmatched left rows in Left Join).
729    fn produce_left_unmatched_row(
730        &self,
731        builder: &mut DataChunkBuilder,
732        left_chunk: &DataChunk,
733        left_row: usize,
734        right_col_count: usize,
735    ) {
736        // Copy left columns
737        for col_idx in 0..left_chunk.column_count() {
738            if let (Some(src), Some(dst)) =
739                (left_chunk.column(col_idx), builder.column_mut(col_idx))
740            {
741                if let Some(val) = src.get_value(left_row) {
742                    dst.push_value(val);
743                } else {
744                    dst.push_value(Value::Null);
745                }
746            }
747        }
748
749        // Fill right columns with NULLs
750        let left_col_count = left_chunk.column_count();
751        for col_idx in 0..right_col_count {
752            if let Some(dst) = builder.column_mut(left_col_count + col_idx) {
753                dst.push_value(Value::Null);
754            }
755        }
756
757        builder.advance_row();
758    }
759}
760
761impl Operator for NestedLoopJoinOperator {
762    fn next(&mut self) -> OperatorResult {
763        // Materialize right side
764        if !self.right_materialized {
765            self.materialize_right()?;
766        }
767
768        // If right side is empty and not a left outer join, return nothing
769        if self.right_chunks.is_empty() && !matches!(self.join_type, JoinType::Left) {
770            return Ok(None);
771        }
772
773        let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
774
775        loop {
776            // Get current left chunk
777            if self.current_left_chunk.is_none() {
778                self.current_left_chunk = self.left.next()?;
779                self.current_left_row = 0;
780                self.current_right_chunk = 0;
781                self.current_right_row = 0;
782
783                if self.current_left_chunk.is_none() {
784                    // No more left data
785                    return if builder.row_count() > 0 {
786                        Ok(Some(builder.finish()))
787                    } else {
788                        Ok(None)
789                    };
790                }
791            }
792
793            let left_chunk = self.current_left_chunk.as_ref().unwrap();
794            let left_rows: Vec<usize> = left_chunk.selected_indices().collect();
795
796            // Calculate right column count for potential unmatched rows
797            let right_col_count = if !self.right_chunks.is_empty() {
798                self.right_chunks[0].column_count()
799            } else {
800                // Infer from output schema
801                self.output_schema
802                    .len()
803                    .saturating_sub(left_chunk.column_count())
804            };
805
806            // Process current left row against all right rows
807            while self.current_left_row < left_rows.len() {
808                let left_row = left_rows[self.current_left_row];
809
810                // Reset match tracking for this left row
811                if self.current_right_chunk == 0 && self.current_right_row == 0 {
812                    self.current_left_matched = false;
813                }
814
815                // Cross join or inner/other join
816                while self.current_right_chunk < self.right_chunks.len() {
817                    let right_chunk = &self.right_chunks[self.current_right_chunk];
818                    let right_rows: Vec<usize> = right_chunk.selected_indices().collect();
819
820                    while self.current_right_row < right_rows.len() {
821                        let right_row = right_rows[self.current_right_row];
822
823                        // Check condition
824                        let matches = match &self.condition {
825                            Some(cond) => {
826                                cond.evaluate(left_chunk, left_row, right_chunk, right_row)
827                            }
828                            None => true, // Cross join
829                        };
830
831                        if matches {
832                            self.current_left_matched = true;
833                            self.produce_row(
834                                &mut builder,
835                                left_chunk,
836                                left_row,
837                                right_chunk,
838                                right_row,
839                            );
840
841                            if builder.is_full() {
842                                self.current_right_row += 1;
843                                return Ok(Some(builder.finish()));
844                            }
845                        }
846
847                        self.current_right_row += 1;
848                    }
849
850                    self.current_right_chunk += 1;
851                    self.current_right_row = 0;
852                }
853
854                // Done processing all right rows for this left row
855                // For Left Join, emit unmatched left row with NULLs
856                if matches!(self.join_type, JoinType::Left) && !self.current_left_matched {
857                    self.produce_left_unmatched_row(
858                        &mut builder,
859                        left_chunk,
860                        left_row,
861                        right_col_count,
862                    );
863
864                    if builder.is_full() {
865                        self.current_left_row += 1;
866                        self.current_right_chunk = 0;
867                        self.current_right_row = 0;
868                        return Ok(Some(builder.finish()));
869                    }
870                }
871
872                // Move to next left row
873                self.current_left_row += 1;
874                self.current_right_chunk = 0;
875                self.current_right_row = 0;
876            }
877
878            // Done with current left chunk
879            self.current_left_chunk = None;
880
881            if builder.row_count() > 0 {
882                return Ok(Some(builder.finish()));
883            }
884        }
885    }
886
887    fn reset(&mut self) {
888        self.left.reset();
889        self.right.reset();
890        self.right_chunks.clear();
891        self.right_materialized = false;
892        self.current_left_chunk = None;
893        self.current_left_row = 0;
894        self.current_right_chunk = 0;
895        self.current_right_row = 0;
896        self.current_left_matched = false;
897    }
898
899    fn name(&self) -> &'static str {
900        "NestedLoopJoin"
901    }
902}
903
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Mock operator for testing: yields the given chunks in order, then `None`.
    ///
    /// Each chunk is moved out when it is yielded; its slot is refilled with
    /// `DataChunk::empty()`. Consequently `reset` only rewinds the position:
    /// a second pass yields empty placeholder chunks, not the original data.
    /// Tests that need to re-run an operator tree must build a fresh mock.
    struct MockOperator {
        // Chunks still to emit; slots already emitted hold `DataChunk::empty()`.
        chunks: Vec<DataChunk>,
        // Index of the next chunk `next` will return.
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                // Move the chunk out without requiring `DataChunk: Clone`.
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            // NOTE: see the type-level docs; consumed chunks are not restored.
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds a single-column `Int64` chunk containing `values` in order.
    fn create_int_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_hash_join_inner() {
        // Left: [1, 2, 3, 4]
        // Right: [2, 3, 4, 5]
        // Inner join on column 0 should produce: [2, 3, 4]

        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 3, 4, 5])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Inner,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let left_val = chunk.column(0).unwrap().get_int64(row).unwrap();
                let right_val = chunk.column(1).unwrap().get_int64(row).unwrap();
                results.push((left_val, right_val));
            }
        }

        results.sort();
        assert_eq!(results, vec![(2, 2), (3, 3), (4, 4)]);
    }

    #[test]
    fn test_hash_join_left_outer() {
        // Left: [1, 2, 3]
        // Right: [2, 3]
        // Left outer join should produce: [(1, null), (2, 2), (3, 3)]

        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 3])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Left,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let left_val = chunk.column(0).unwrap().get_int64(row).unwrap();
                let right_val = chunk.column(1).unwrap().get_int64(row);
                results.push((left_val, right_val));
            }
        }

        results.sort_by_key(|(l, _)| *l);
        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (1, None)); // No match
        assert_eq!(results[1], (2, Some(2)));
        assert_eq!(results[2], (3, Some(3)));
    }

    #[test]
    fn test_nested_loop_cross_join() {
        // Left: [1, 2]
        // Right: [10, 20]
        // Cross join should produce: [(1,10), (1,20), (2,10), (2,20)]

        let left = MockOperator::new(vec![create_int_chunk(&[1, 2])]);
        let right = MockOperator::new(vec![create_int_chunk(&[10, 20])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = NestedLoopJoinOperator::new(
            Box::new(left),
            Box::new(right),
            None,
            JoinType::Cross,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let left_val = chunk.column(0).unwrap().get_int64(row).unwrap();
                let right_val = chunk.column(1).unwrap().get_int64(row).unwrap();
                results.push((left_val, right_val));
            }
        }

        results.sort();
        assert_eq!(results, vec![(1, 10), (1, 20), (2, 10), (2, 20)]);
    }

    #[test]
    fn test_nested_loop_cross_join_empty_right() {
        // Left: [1, 2]
        // Right: (no chunks)
        // A cross join with an empty right side produces no rows; `next`
        // takes the early-return path for non-Left joins.

        let left = MockOperator::new(vec![create_int_chunk(&[1, 2])]);
        let right = MockOperator::new(Vec::new());

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = NestedLoopJoinOperator::new(
            Box::new(left),
            Box::new(right),
            None,
            JoinType::Cross,
            output_schema,
        );

        assert!(join.next().unwrap().is_none());
    }

    #[test]
    fn test_nested_loop_left_join_empty_right() {
        // Left: [1, 2]
        // Right: (no chunks)
        // Left outer join must keep every left row and pad the right column
        // (count inferred from the output schema) with NULL:
        // [(1, null), (2, null)]

        let left = MockOperator::new(vec![create_int_chunk(&[1, 2])]);
        let right = MockOperator::new(Vec::new());

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = NestedLoopJoinOperator::new(
            Box::new(left),
            Box::new(right),
            None,
            JoinType::Left,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let left_val = chunk.column(0).unwrap().get_int64(row).unwrap();
                let right_val = chunk.column(1).unwrap().get_int64(row);
                results.push((left_val, right_val));
            }
        }

        results.sort_by_key(|(l, _)| *l);
        assert_eq!(results, vec![(1, None), (2, None)]);
    }

    #[test]
    fn test_hash_join_semi() {
        // Left: [1, 2, 3, 4]
        // Right: [2, 4]
        // Semi join should produce: [2, 4] (only left rows that have matches)

        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 4])]);

        // Semi join only outputs probe (left) columns
        let output_schema = vec![LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Semi,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let val = chunk.column(0).unwrap().get_int64(row).unwrap();
                results.push(val);
            }
        }

        results.sort();
        assert_eq!(results, vec![2, 4]);
    }

    #[test]
    fn test_hash_join_anti() {
        // Left: [1, 2, 3, 4]
        // Right: [2, 4]
        // Anti join should produce: [1, 3] (left rows with no matches)

        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 4])]);

        let output_schema = vec![LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Anti,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let val = chunk.column(0).unwrap().get_int64(row).unwrap();
                results.push(val);
            }
        }

        results.sort();
        assert_eq!(results, vec![1, 3]);
    }
}