Skip to main content

grafeo_core/execution/operators/
join.rs

1//! Join operators for combining data from two sources.
2//!
3//! This module provides:
4//! - `HashJoinOperator`: Efficient hash-based join for equality conditions
5//! - `NestedLoopJoinOperator`: General-purpose join for any condition
6
7use std::collections::HashMap;
8
9use grafeo_common::types::{LogicalType, Value};
10
11use super::{Operator, OperatorError, OperatorResult};
12use crate::execution::chunk::DataChunkBuilder;
13use crate::execution::{DataChunk, ValueVector};
14
/// Selects which combination of left-side and right-side rows a join emits.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JoinType {
    /// Emits only the row pairs that match on both sides.
    Inner,
    /// Emits every left row; right columns are NULL when nothing matches.
    Left,
    /// Emits every right row; left columns are NULL when nothing matches.
    Right,
    /// Emits every row from both sides, NULL-padding whichever side has no match.
    Full,
    /// Emits the cartesian product of the two sides.
    Cross,
    /// Emits left rows that have at least one match on the right.
    Semi,
    /// Emits left rows that have no match on the right.
    Anti,
}
33
/// A hashable, comparable representation of a join-key value.
///
/// Instances are used as the lookup key of the hash-join table, so the type
/// derives both `Eq` and `Hash`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum HashKey {
    /// Key for a NULL value.
    Null,
    /// Key for a boolean value.
    Bool(bool),
    /// Key for a 64-bit integer value.
    Int64(i64),
    /// Key for a string value; hashing and equality use the string content.
    String(String),
    /// Key built from several component keys (multi-column joins).
    Composite(Vec<HashKey>),
}
48
49impl HashKey {
50    /// Creates a hash key from a Value.
51    pub fn from_value(value: &Value) -> Self {
52        match value {
53            Value::Null => HashKey::Null,
54            Value::Bool(b) => HashKey::Bool(*b),
55            Value::Int64(i) => HashKey::Int64(*i),
56            Value::Float64(f) => {
57                // Convert float to bits for consistent hashing
58                HashKey::Int64(f.to_bits() as i64)
59            }
60            Value::String(s) => HashKey::String(s.to_string()),
61            Value::Bytes(b) => {
62                // Use byte content for hashing
63                HashKey::String(format!("{b:?}"))
64            }
65            Value::Timestamp(t) => HashKey::Int64(t.as_micros()),
66            Value::List(items) => {
67                HashKey::Composite(items.iter().map(HashKey::from_value).collect())
68            }
69            Value::Map(map) => {
70                let mut keys: Vec<_> = map
71                    .iter()
72                    .map(|(k, v)| {
73                        HashKey::Composite(vec![
74                            HashKey::String(k.to_string()),
75                            HashKey::from_value(v),
76                        ])
77                    })
78                    .collect();
79                keys.sort_by(|a, b| format!("{a:?}").cmp(&format!("{b:?}")));
80                HashKey::Composite(keys)
81            }
82            Value::Vector(v) => {
83                // Hash vectors by converting each f32 to its bit representation
84                HashKey::Composite(
85                    v.iter()
86                        .map(|f| HashKey::Int64(f.to_bits() as i64))
87                        .collect(),
88                )
89            }
90        }
91    }
92
93    /// Creates a hash key from a column value at a given row.
94    pub fn from_column(column: &ValueVector, row: usize) -> Option<Self> {
95        column.get_value(row).map(|v| Self::from_value(&v))
96    }
97}
98
99/// Hash join operator.
100///
101/// Builds a hash table from the build side (right) and probes with the probe side (left).
102/// Efficient for equality joins on one or more columns.
103pub struct HashJoinOperator {
104    /// Left (probe) side operator.
105    probe_side: Box<dyn Operator>,
106    /// Right (build) side operator.
107    build_side: Box<dyn Operator>,
108    /// Column indices on the probe side for join keys.
109    probe_keys: Vec<usize>,
110    /// Column indices on the build side for join keys.
111    build_keys: Vec<usize>,
112    /// Join type.
113    join_type: JoinType,
114    /// Output schema (combined from both sides).
115    output_schema: Vec<LogicalType>,
116    /// Hash table: key -> list of (chunk_index, row_index).
117    hash_table: HashMap<HashKey, Vec<(usize, usize)>>,
118    /// Materialized build side chunks.
119    build_chunks: Vec<DataChunk>,
120    /// Whether the build phase is complete.
121    build_complete: bool,
122    /// Current probe chunk being processed.
123    current_probe_chunk: Option<DataChunk>,
124    /// Current row in the probe chunk.
125    current_probe_row: usize,
126    /// Current position in the hash table matches for the current probe row.
127    current_match_position: usize,
128    /// Current matches for the current probe row.
129    current_matches: Vec<(usize, usize)>,
130    /// For left/full outer joins: track which probe rows had matches.
131    probe_matched: Vec<bool>,
132    /// For right/full outer joins: track which build rows were matched.
133    build_matched: Vec<Vec<bool>>,
134    /// Whether we're in the emit unmatched phase (for outer joins).
135    emitting_unmatched: bool,
136    /// Current chunk index when emitting unmatched rows.
137    unmatched_chunk_idx: usize,
138    /// Current row index when emitting unmatched rows.
139    unmatched_row_idx: usize,
140}
141
142impl HashJoinOperator {
143    /// Creates a new hash join operator.
144    ///
145    /// # Arguments
146    /// * `probe_side` - Left side operator (will be probed).
147    /// * `build_side` - Right side operator (will build hash table).
148    /// * `probe_keys` - Column indices on probe side for join keys.
149    /// * `build_keys` - Column indices on build side for join keys.
150    /// * `join_type` - Type of join to perform.
151    /// * `output_schema` - Schema of the output (probe columns + build columns).
152    pub fn new(
153        probe_side: Box<dyn Operator>,
154        build_side: Box<dyn Operator>,
155        probe_keys: Vec<usize>,
156        build_keys: Vec<usize>,
157        join_type: JoinType,
158        output_schema: Vec<LogicalType>,
159    ) -> Self {
160        Self {
161            probe_side,
162            build_side,
163            probe_keys,
164            build_keys,
165            join_type,
166            output_schema,
167            hash_table: HashMap::new(),
168            build_chunks: Vec::new(),
169            build_complete: false,
170            current_probe_chunk: None,
171            current_probe_row: 0,
172            current_match_position: 0,
173            current_matches: Vec::new(),
174            probe_matched: Vec::new(),
175            build_matched: Vec::new(),
176            emitting_unmatched: false,
177            unmatched_chunk_idx: 0,
178            unmatched_row_idx: 0,
179        }
180    }
181
182    /// Builds the hash table from the build side.
183    fn build_hash_table(&mut self) -> Result<(), OperatorError> {
184        while let Some(chunk) = self.build_side.next()? {
185            let chunk_idx = self.build_chunks.len();
186
187            // Initialize match tracking for outer joins
188            if matches!(self.join_type, JoinType::Right | JoinType::Full) {
189                self.build_matched.push(vec![false; chunk.row_count()]);
190            }
191
192            // Add each row to the hash table
193            for row in chunk.selected_indices() {
194                let key = self.extract_key(&chunk, row, &self.build_keys)?;
195
196                // Skip null keys for inner/semi/anti joins
197                if matches!(key, HashKey::Null)
198                    && !matches!(
199                        self.join_type,
200                        JoinType::Left | JoinType::Right | JoinType::Full
201                    )
202                {
203                    continue;
204                }
205
206                self.hash_table
207                    .entry(key)
208                    .or_default()
209                    .push((chunk_idx, row));
210            }
211
212            self.build_chunks.push(chunk);
213        }
214
215        self.build_complete = true;
216        Ok(())
217    }
218
219    /// Extracts a hash key from a chunk row.
220    fn extract_key(
221        &self,
222        chunk: &DataChunk,
223        row: usize,
224        key_columns: &[usize],
225    ) -> Result<HashKey, OperatorError> {
226        if key_columns.len() == 1 {
227            let col = chunk.column(key_columns[0]).ok_or_else(|| {
228                OperatorError::ColumnNotFound(format!("column {}", key_columns[0]))
229            })?;
230            Ok(HashKey::from_column(col, row).unwrap_or(HashKey::Null))
231        } else {
232            let keys: Vec<HashKey> = key_columns
233                .iter()
234                .map(|&col_idx| {
235                    chunk
236                        .column(col_idx)
237                        .and_then(|col| HashKey::from_column(col, row))
238                        .unwrap_or(HashKey::Null)
239                })
240                .collect();
241            Ok(HashKey::Composite(keys))
242        }
243    }
244
245    /// Produces an output row from a probe row and build row.
246    fn produce_output_row(
247        &self,
248        builder: &mut DataChunkBuilder,
249        probe_chunk: &DataChunk,
250        probe_row: usize,
251        build_chunk: Option<&DataChunk>,
252        build_row: Option<usize>,
253    ) -> Result<(), OperatorError> {
254        let probe_col_count = probe_chunk.column_count();
255
256        // Copy probe side columns
257        for col_idx in 0..probe_col_count {
258            let src_col = probe_chunk
259                .column(col_idx)
260                .ok_or_else(|| OperatorError::ColumnNotFound(format!("probe column {col_idx}")))?;
261            let dst_col = builder
262                .column_mut(col_idx)
263                .ok_or_else(|| OperatorError::ColumnNotFound(format!("output column {col_idx}")))?;
264
265            if let Some(value) = src_col.get_value(probe_row) {
266                dst_col.push_value(value);
267            } else {
268                dst_col.push_value(Value::Null);
269            }
270        }
271
272        // Copy build side columns
273        match (build_chunk, build_row) {
274            (Some(chunk), Some(row)) => {
275                for col_idx in 0..chunk.column_count() {
276                    let src_col = chunk.column(col_idx).ok_or_else(|| {
277                        OperatorError::ColumnNotFound(format!("build column {col_idx}"))
278                    })?;
279                    let dst_col =
280                        builder
281                            .column_mut(probe_col_count + col_idx)
282                            .ok_or_else(|| {
283                                OperatorError::ColumnNotFound(format!(
284                                    "output column {}",
285                                    probe_col_count + col_idx
286                                ))
287                            })?;
288
289                    if let Some(value) = src_col.get_value(row) {
290                        dst_col.push_value(value);
291                    } else {
292                        dst_col.push_value(Value::Null);
293                    }
294                }
295            }
296            _ => {
297                // Emit nulls for build side (left outer join case)
298                if !self.build_chunks.is_empty() {
299                    let build_col_count = self.build_chunks[0].column_count();
300                    for col_idx in 0..build_col_count {
301                        let dst_col =
302                            builder
303                                .column_mut(probe_col_count + col_idx)
304                                .ok_or_else(|| {
305                                    OperatorError::ColumnNotFound(format!(
306                                        "output column {}",
307                                        probe_col_count + col_idx
308                                    ))
309                                })?;
310                        dst_col.push_value(Value::Null);
311                    }
312                }
313            }
314        }
315
316        builder.advance_row();
317        Ok(())
318    }
319
320    /// Gets the next probe chunk.
321    fn get_next_probe_chunk(&mut self) -> Result<bool, OperatorError> {
322        let chunk = self.probe_side.next()?;
323        if let Some(ref c) = chunk {
324            // Initialize match tracking for outer joins
325            if matches!(self.join_type, JoinType::Left | JoinType::Full) {
326                self.probe_matched = vec![false; c.row_count()];
327            }
328        }
329        let has_chunk = chunk.is_some();
330        self.current_probe_chunk = chunk;
331        self.current_probe_row = 0;
332        Ok(has_chunk)
333    }
334
335    /// Emits unmatched build rows for right/full outer joins.
336    fn emit_unmatched_build(&mut self) -> OperatorResult {
337        if self.build_matched.is_empty() {
338            return Ok(None);
339        }
340
341        let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
342
343        // Determine probe column count from schema or first probe chunk
344        let probe_col_count = if !self.build_chunks.is_empty() {
345            self.output_schema.len() - self.build_chunks[0].column_count()
346        } else {
347            0
348        };
349
350        while self.unmatched_chunk_idx < self.build_chunks.len() {
351            let chunk = &self.build_chunks[self.unmatched_chunk_idx];
352            let matched = &self.build_matched[self.unmatched_chunk_idx];
353
354            while self.unmatched_row_idx < matched.len() {
355                if !matched[self.unmatched_row_idx] {
356                    // This row was not matched - emit with nulls on probe side
357
358                    // Emit nulls for probe side
359                    for col_idx in 0..probe_col_count {
360                        if let Some(dst_col) = builder.column_mut(col_idx) {
361                            dst_col.push_value(Value::Null);
362                        }
363                    }
364
365                    // Copy build side values
366                    for col_idx in 0..chunk.column_count() {
367                        if let (Some(src_col), Some(dst_col)) = (
368                            chunk.column(col_idx),
369                            builder.column_mut(probe_col_count + col_idx),
370                        ) {
371                            if let Some(value) = src_col.get_value(self.unmatched_row_idx) {
372                                dst_col.push_value(value);
373                            } else {
374                                dst_col.push_value(Value::Null);
375                            }
376                        }
377                    }
378
379                    builder.advance_row();
380
381                    if builder.is_full() {
382                        self.unmatched_row_idx += 1;
383                        return Ok(Some(builder.finish()));
384                    }
385                }
386
387                self.unmatched_row_idx += 1;
388            }
389
390            self.unmatched_chunk_idx += 1;
391            self.unmatched_row_idx = 0;
392        }
393
394        if builder.row_count() > 0 {
395            Ok(Some(builder.finish()))
396        } else {
397            Ok(None)
398        }
399    }
400}
401
402impl Operator for HashJoinOperator {
403    fn next(&mut self) -> OperatorResult {
404        // Phase 1: Build hash table
405        if !self.build_complete {
406            self.build_hash_table()?;
407        }
408
409        // Phase 3: Emit unmatched build rows (right/full outer join)
410        if self.emitting_unmatched {
411            return self.emit_unmatched_build();
412        }
413
414        // Phase 2: Probe
415        let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
416
417        loop {
418            // Get current probe chunk or fetch new one
419            if self.current_probe_chunk.is_none() {
420                if !self.get_next_probe_chunk()? {
421                    // No more probe data
422                    if matches!(self.join_type, JoinType::Right | JoinType::Full) {
423                        self.emitting_unmatched = true;
424                        return self.emit_unmatched_build();
425                    }
426                    return if builder.row_count() > 0 {
427                        Ok(Some(builder.finish()))
428                    } else {
429                        Ok(None)
430                    };
431                }
432            }
433
434            // Invariant: current_probe_chunk is Some here - the guard at line 396 either
435            // populates it via get_next_probe_chunk() or returns from the function
436            let probe_chunk = self
437                .current_probe_chunk
438                .as_ref()
439                .expect("probe chunk is Some: guard at line 396 ensures this");
440            let probe_rows: Vec<usize> = probe_chunk.selected_indices().collect();
441
442            while self.current_probe_row < probe_rows.len() {
443                let probe_row = probe_rows[self.current_probe_row];
444
445                // If we don't have current matches, look them up
446                if self.current_matches.is_empty() && self.current_match_position == 0 {
447                    let key = self.extract_key(probe_chunk, probe_row, &self.probe_keys)?;
448
449                    // Handle semi/anti joins differently
450                    match self.join_type {
451                        JoinType::Semi => {
452                            if self.hash_table.contains_key(&key) {
453                                // Emit probe row only
454                                for col_idx in 0..probe_chunk.column_count() {
455                                    if let (Some(src_col), Some(dst_col)) =
456                                        (probe_chunk.column(col_idx), builder.column_mut(col_idx))
457                                    {
458                                        if let Some(value) = src_col.get_value(probe_row) {
459                                            dst_col.push_value(value);
460                                        }
461                                    }
462                                }
463                                builder.advance_row();
464                            }
465                            self.current_probe_row += 1;
466                            continue;
467                        }
468                        JoinType::Anti => {
469                            if !self.hash_table.contains_key(&key) {
470                                // Emit probe row only
471                                for col_idx in 0..probe_chunk.column_count() {
472                                    if let (Some(src_col), Some(dst_col)) =
473                                        (probe_chunk.column(col_idx), builder.column_mut(col_idx))
474                                    {
475                                        if let Some(value) = src_col.get_value(probe_row) {
476                                            dst_col.push_value(value);
477                                        }
478                                    }
479                                }
480                                builder.advance_row();
481                            }
482                            self.current_probe_row += 1;
483                            continue;
484                        }
485                        _ => {
486                            self.current_matches =
487                                self.hash_table.get(&key).cloned().unwrap_or_default();
488                        }
489                    }
490                }
491
492                // Process matches
493                if self.current_matches.is_empty() {
494                    // No matches - for left/full outer join, emit with nulls
495                    if matches!(self.join_type, JoinType::Left | JoinType::Full) {
496                        self.produce_output_row(&mut builder, probe_chunk, probe_row, None, None)?;
497                    }
498                    self.current_probe_row += 1;
499                    self.current_match_position = 0;
500                } else {
501                    // Process each match
502                    while self.current_match_position < self.current_matches.len() {
503                        let (build_chunk_idx, build_row) =
504                            self.current_matches[self.current_match_position];
505                        let build_chunk = &self.build_chunks[build_chunk_idx];
506
507                        // Mark as matched for outer joins
508                        if matches!(self.join_type, JoinType::Left | JoinType::Full) {
509                            if probe_row < self.probe_matched.len() {
510                                self.probe_matched[probe_row] = true;
511                            }
512                        }
513                        if matches!(self.join_type, JoinType::Right | JoinType::Full) {
514                            if build_chunk_idx < self.build_matched.len()
515                                && build_row < self.build_matched[build_chunk_idx].len()
516                            {
517                                self.build_matched[build_chunk_idx][build_row] = true;
518                            }
519                        }
520
521                        self.produce_output_row(
522                            &mut builder,
523                            probe_chunk,
524                            probe_row,
525                            Some(build_chunk),
526                            Some(build_row),
527                        )?;
528
529                        self.current_match_position += 1;
530
531                        if builder.is_full() {
532                            return Ok(Some(builder.finish()));
533                        }
534                    }
535
536                    // Done with this probe row
537                    self.current_probe_row += 1;
538                    self.current_matches.clear();
539                    self.current_match_position = 0;
540                }
541
542                if builder.is_full() {
543                    return Ok(Some(builder.finish()));
544                }
545            }
546
547            // Done with current probe chunk
548            self.current_probe_chunk = None;
549            self.current_probe_row = 0;
550
551            if builder.row_count() > 0 {
552                return Ok(Some(builder.finish()));
553            }
554        }
555    }
556
557    fn reset(&mut self) {
558        self.probe_side.reset();
559        self.build_side.reset();
560        self.hash_table.clear();
561        self.build_chunks.clear();
562        self.build_complete = false;
563        self.current_probe_chunk = None;
564        self.current_probe_row = 0;
565        self.current_match_position = 0;
566        self.current_matches.clear();
567        self.probe_matched.clear();
568        self.build_matched.clear();
569        self.emitting_unmatched = false;
570        self.unmatched_chunk_idx = 0;
571        self.unmatched_row_idx = 0;
572    }
573
574    fn name(&self) -> &'static str {
575        "HashJoin"
576    }
577}
578
579/// Nested loop join operator.
580///
581/// Performs a cartesian product of both sides, filtering by the join condition.
582/// Less efficient than hash join but supports any join condition.
583pub struct NestedLoopJoinOperator {
584    /// Left side operator.
585    left: Box<dyn Operator>,
586    /// Right side operator.
587    right: Box<dyn Operator>,
588    /// Join condition predicate (if any).
589    condition: Option<Box<dyn JoinCondition>>,
590    /// Join type.
591    join_type: JoinType,
592    /// Output schema.
593    output_schema: Vec<LogicalType>,
594    /// Materialized right side chunks.
595    right_chunks: Vec<DataChunk>,
596    /// Whether the right side is materialized.
597    right_materialized: bool,
598    /// Current left chunk.
599    current_left_chunk: Option<DataChunk>,
600    /// Current row in the left chunk.
601    current_left_row: usize,
602    /// Current chunk index in the right side.
603    current_right_chunk: usize,
604    /// Whether the current left row has been matched (for Left Join).
605    current_left_matched: bool,
606    /// Current row in the current right chunk.
607    current_right_row: usize,
608}
609
610/// Trait for join conditions.
611pub trait JoinCondition: Send + Sync {
612    /// Evaluates the condition for a pair of rows.
613    fn evaluate(
614        &self,
615        left_chunk: &DataChunk,
616        left_row: usize,
617        right_chunk: &DataChunk,
618        right_row: usize,
619    ) -> bool;
620}
621
/// A simple equality condition for nested loop joins.
///
/// Compares one column of the left row with one column of the right row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EqualityCondition {
    /// Column index on the left side.
    left_column: usize,
    /// Column index on the right side.
    right_column: usize,
}

impl EqualityCondition {
    /// Creates a condition comparing `left_column` of the left chunk with
    /// `right_column` of the right chunk.
    #[must_use]
    pub fn new(left_column: usize, right_column: usize) -> Self {
        Self {
            left_column,
            right_column,
        }
    }
}
639
640impl JoinCondition for EqualityCondition {
641    fn evaluate(
642        &self,
643        left_chunk: &DataChunk,
644        left_row: usize,
645        right_chunk: &DataChunk,
646        right_row: usize,
647    ) -> bool {
648        let left_val = left_chunk
649            .column(self.left_column)
650            .and_then(|c| c.get_value(left_row));
651        let right_val = right_chunk
652            .column(self.right_column)
653            .and_then(|c| c.get_value(right_row));
654
655        match (left_val, right_val) {
656            (Some(l), Some(r)) => l == r,
657            _ => false,
658        }
659    }
660}
661
662impl NestedLoopJoinOperator {
663    /// Creates a new nested loop join operator.
664    pub fn new(
665        left: Box<dyn Operator>,
666        right: Box<dyn Operator>,
667        condition: Option<Box<dyn JoinCondition>>,
668        join_type: JoinType,
669        output_schema: Vec<LogicalType>,
670    ) -> Self {
671        Self {
672            left,
673            right,
674            condition,
675            join_type,
676            output_schema,
677            right_chunks: Vec::new(),
678            right_materialized: false,
679            current_left_chunk: None,
680            current_left_row: 0,
681            current_right_chunk: 0,
682            current_right_row: 0,
683            current_left_matched: false,
684        }
685    }
686
687    /// Materializes the right side.
688    fn materialize_right(&mut self) -> Result<(), OperatorError> {
689        while let Some(chunk) = self.right.next()? {
690            self.right_chunks.push(chunk);
691        }
692        self.right_materialized = true;
693        Ok(())
694    }
695
696    /// Produces an output row.
697    fn produce_row(
698        &self,
699        builder: &mut DataChunkBuilder,
700        left_chunk: &DataChunk,
701        left_row: usize,
702        right_chunk: &DataChunk,
703        right_row: usize,
704    ) {
705        // Copy left columns
706        for col_idx in 0..left_chunk.column_count() {
707            if let (Some(src), Some(dst)) =
708                (left_chunk.column(col_idx), builder.column_mut(col_idx))
709            {
710                if let Some(val) = src.get_value(left_row) {
711                    dst.push_value(val);
712                } else {
713                    dst.push_value(Value::Null);
714                }
715            }
716        }
717
718        // Copy right columns
719        let left_col_count = left_chunk.column_count();
720        for col_idx in 0..right_chunk.column_count() {
721            if let (Some(src), Some(dst)) = (
722                right_chunk.column(col_idx),
723                builder.column_mut(left_col_count + col_idx),
724            ) {
725                if let Some(val) = src.get_value(right_row) {
726                    dst.push_value(val);
727                } else {
728                    dst.push_value(Value::Null);
729                }
730            }
731        }
732
733        builder.advance_row();
734    }
735
736    /// Produces an output row with NULLs for the right side (for unmatched left rows in Left Join).
737    fn produce_left_unmatched_row(
738        &self,
739        builder: &mut DataChunkBuilder,
740        left_chunk: &DataChunk,
741        left_row: usize,
742        right_col_count: usize,
743    ) {
744        // Copy left columns
745        for col_idx in 0..left_chunk.column_count() {
746            if let (Some(src), Some(dst)) =
747                (left_chunk.column(col_idx), builder.column_mut(col_idx))
748            {
749                if let Some(val) = src.get_value(left_row) {
750                    dst.push_value(val);
751                } else {
752                    dst.push_value(Value::Null);
753                }
754            }
755        }
756
757        // Fill right columns with NULLs
758        let left_col_count = left_chunk.column_count();
759        for col_idx in 0..right_col_count {
760            if let Some(dst) = builder.column_mut(left_col_count + col_idx) {
761                dst.push_value(Value::Null);
762            }
763        }
764
765        builder.advance_row();
766    }
767}
768
impl Operator for NestedLoopJoinOperator {
    /// Produces the next chunk of joined rows, or `Ok(None)` when there is nothing left.
    ///
    /// The call is resumable: the cursor fields (`current_left_chunk`,
    /// `current_left_row`, `current_right_chunk`, `current_right_row`,
    /// `current_left_matched`) live on `self`, so a call that returns early because
    /// the output builder filled up is continued from the same left/right position
    /// by the following call.
    ///
    /// A chunk is returned when
    /// - the builder reports `is_full()` after a row was added,
    /// - the current left chunk has been fully processed and at least one row was
    ///   produced, or
    /// - the left input is exhausted and the builder still holds rows.
    ///
    /// Only `JoinType::Left` receives unmatched-row handling in this method; for
    /// every other join type a row is produced solely for (left, right) pairs that
    /// satisfy the condition (or for every pair when there is no condition).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `materialize_right` or by the left child's
    /// `next`.
    fn next(&mut self) -> OperatorResult {
        // Materialize the right side once; skipped on later calls while
        // `right_materialized` is set.
        // NOTE(review): presumably `materialize_right` fills `right_chunks` and sets
        // `right_materialized` — it is defined outside this block, confirm there.
        if !self.right_materialized {
            self.materialize_right()?;
        }

        // If right side is empty and not a left outer join, return nothing.
        // (The left input is not pulled at all in that case.)
        if self.right_chunks.is_empty() && !matches!(self.join_type, JoinType::Left) {
            return Ok(None);
        }

        // A fresh output builder per call; 2048 is the row capacity requested here.
        let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);

        loop {
            // Get current left chunk. It is only fetched when none is in progress,
            // which is what lets a previous early return resume mid-chunk.
            if self.current_left_chunk.is_none() {
                self.current_left_chunk = self.left.next()?;
                self.current_left_row = 0;
                self.current_right_chunk = 0;
                self.current_right_row = 0;

                if self.current_left_chunk.is_none() {
                    // No more left data: flush whatever was accumulated in this call.
                    return if builder.row_count() > 0 {
                        Ok(Some(builder.finish()))
                    } else {
                        Ok(None)
                    };
                }
            }

            // The `is_none` branch above either filled the slot or returned, so the
            // unwrap cannot fail here.
            let left_chunk = self.current_left_chunk.as_ref().unwrap();
            // `current_left_row` indexes into this list of selected rows, not into
            // the chunk directly.
            let left_rows: Vec<usize> = left_chunk.selected_indices().collect();

            // Calculate right column count for potential unmatched rows
            let right_col_count = if !self.right_chunks.is_empty() {
                self.right_chunks[0].column_count()
            } else {
                // Infer from output schema: whatever is not a left column.
                self.output_schema
                    .len()
                    .saturating_sub(left_chunk.column_count())
            };

            // Process current left row against all right rows
            while self.current_left_row < left_rows.len() {
                let left_row = left_rows[self.current_left_row];

                // Reset match tracking for this left row. A right cursor at (0, 0)
                // means the row is being started fresh; when resuming after a
                // full-builder return inside the right scan the cursor is already
                // past (0, 0) (it is incremented before that return), so the flag
                // from the earlier call is kept.
                if self.current_right_chunk == 0 && self.current_right_row == 0 {
                    self.current_left_matched = false;
                }

                // Scan every materialized right chunk from the saved cursor onwards.
                while self.current_right_chunk < self.right_chunks.len() {
                    let right_chunk = &self.right_chunks[self.current_right_chunk];
                    let right_rows: Vec<usize> = right_chunk.selected_indices().collect();

                    while self.current_right_row < right_rows.len() {
                        let right_row = right_rows[self.current_right_row];

                        // Check condition
                        let matches = match &self.condition {
                            Some(cond) => {
                                cond.evaluate(left_chunk, left_row, right_chunk, right_row)
                            }
                            None => true, // Cross join
                        };

                        if matches {
                            self.current_left_matched = true;
                            self.produce_row(
                                &mut builder,
                                left_chunk,
                                left_row,
                                right_chunk,
                                right_row,
                            );

                            if builder.is_full() {
                                // Step past the pair just emitted before handing the
                                // chunk out, so the next call does not emit it again.
                                self.current_right_row += 1;
                                return Ok(Some(builder.finish()));
                            }
                        }

                        self.current_right_row += 1;
                    }

                    // Next right chunk, starting at its first selected row.
                    self.current_right_chunk += 1;
                    self.current_right_row = 0;
                }

                // Done processing all right rows for this left row
                // For Left Join, emit unmatched left row with NULLs
                if matches!(self.join_type, JoinType::Left) && !self.current_left_matched {
                    self.produce_left_unmatched_row(
                        &mut builder,
                        left_chunk,
                        left_row,
                        right_col_count,
                    );

                    if builder.is_full() {
                        // Same cursor advance as the normal path below, performed
                        // here because we return before reaching it.
                        self.current_left_row += 1;
                        self.current_right_chunk = 0;
                        self.current_right_row = 0;
                        return Ok(Some(builder.finish()));
                    }
                }

                // Move to next left row and rewind the right cursor.
                self.current_left_row += 1;
                self.current_right_chunk = 0;
                self.current_right_row = 0;
            }

            // Done with current left chunk; the next loop iteration (or call) pulls
            // a new one.
            self.current_left_chunk = None;

            // Hand out what was produced for this left chunk instead of waiting for
            // the builder to fill; if nothing was produced, keep looping.
            if builder.row_count() > 0 {
                return Ok(Some(builder.finish()));
            }
        }
    }

    /// Returns the operator to its initial state.
    ///
    /// Resets both children, drops the materialized right chunks, clears
    /// `right_materialized` (so the next call to `next` invokes
    /// `materialize_right` again) and rewinds all cursor fields.
    fn reset(&mut self) {
        self.left.reset();
        self.right.reset();
        self.right_chunks.clear();
        self.right_materialized = false;
        self.current_left_chunk = None;
        self.current_left_row = 0;
        self.current_right_chunk = 0;
        self.current_right_row = 0;
        self.current_left_matched = false;
    }

    /// Returns the fixed operator name, `"NestedLoopJoin"`.
    fn name(&self) -> &'static str {
        "NestedLoopJoin"
    }
}
911
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        /// Wraps `chunks`; the first call to `next` yields the first chunk.
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        /// Moves the chunk at `position` out of the list (leaving
        /// `DataChunk::empty()` in its slot) and advances; `Ok(None)` once the
        /// list is exhausted.
        fn next(&mut self) -> OperatorResult {
            match self.chunks.get_mut(self.position) {
                Some(slot) => {
                    let chunk = std::mem::replace(slot, DataChunk::empty());
                    self.position += 1;
                    Ok(Some(chunk))
                }
                None => Ok(None),
            }
        }

        /// Rewinds the position only; slots already handed out still hold the
        /// empty placeholder chunks left behind by `next`.
        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds a single-column `Int64` chunk with one row per entry of `values`.
    fn create_int_chunk(values: &[i64]) -> DataChunk {
        let mut chunk_builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        values.iter().copied().for_each(|value| {
            chunk_builder.column_mut(0).unwrap().push_int64(value);
            chunk_builder.advance_row();
        });
        chunk_builder.finish()
    }

    /// Calls `pull` until it yields `None` and maps every selected row of every
    /// chunk through `read`, in the order the rows are produced.
    ///
    /// Panics if `pull` returns an error.
    fn drain_rows<T>(
        mut pull: impl FnMut() -> OperatorResult,
        read: impl Fn(&DataChunk, usize) -> T,
    ) -> Vec<T> {
        let mut rows = Vec::new();
        while let Some(chunk) = pull().unwrap() {
            rows.extend(chunk.selected_indices().map(|row| read(&chunk, row)));
        }
        rows
    }

    /// Reads column 0 of `row` as a non-null integer.
    fn first_column(chunk: &DataChunk, row: usize) -> i64 {
        chunk.column(0).unwrap().get_int64(row).unwrap()
    }

    /// Reads column 1 of `row`; `None` when `get_int64` yields no value.
    fn second_column(chunk: &DataChunk, row: usize) -> Option<i64> {
        chunk.column(1).unwrap().get_int64(row)
    }

    #[test]
    fn test_hash_join_inner() {
        // Keys 2, 3 and 4 exist on both sides; 1 (left only) and 5 (right only)
        // must not show up in an inner join.
        let left_input = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right_input = MockOperator::new(vec![create_int_chunk(&[2, 3, 4, 5])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left_input),
            Box::new(right_input),
            vec![0],
            vec![0],
            JoinType::Inner,
            output_schema,
        );

        let mut pairs = drain_rows(
            || join.next(),
            |chunk, row| {
                (
                    first_column(chunk, row),
                    second_column(chunk, row).unwrap(),
                )
            },
        );

        pairs.sort_unstable();
        assert_eq!(pairs, vec![(2, 2), (3, 3), (4, 4)]);
    }

    #[test]
    fn test_hash_join_left_outer() {
        // Every left key (1, 2, 3) must appear; key 1 has no partner on the
        // right, so its right column is expected to read back as `None`.
        let left_input = MockOperator::new(vec![create_int_chunk(&[1, 2, 3])]);
        let right_input = MockOperator::new(vec![create_int_chunk(&[2, 3])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left_input),
            Box::new(right_input),
            vec![0],
            vec![0],
            JoinType::Left,
            output_schema,
        );

        let mut pairs = drain_rows(
            || join.next(),
            |chunk, row| (first_column(chunk, row), second_column(chunk, row)),
        );

        pairs.sort_by_key(|&(left_val, _)| left_val);
        assert_eq!(pairs.len(), 3);
        assert_eq!(pairs[0], (1, None)); // No match
        assert_eq!(pairs[1], (2, Some(2)));
        assert_eq!(pairs[2], (3, Some(3)));
    }

    #[test]
    fn test_nested_loop_cross_join() {
        // No condition: each of the two left rows pairs with each of the two
        // right rows, giving four combinations.
        let left_input = MockOperator::new(vec![create_int_chunk(&[1, 2])]);
        let right_input = MockOperator::new(vec![create_int_chunk(&[10, 20])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = NestedLoopJoinOperator::new(
            Box::new(left_input),
            Box::new(right_input),
            None,
            JoinType::Cross,
            output_schema,
        );

        let mut pairs = drain_rows(
            || join.next(),
            |chunk, row| {
                (
                    first_column(chunk, row),
                    second_column(chunk, row).unwrap(),
                )
            },
        );

        pairs.sort_unstable();
        assert_eq!(pairs, vec![(1, 10), (1, 20), (2, 10), (2, 20)]);
    }

    #[test]
    fn test_hash_join_semi() {
        // Left rows 2 and 4 have a partner on the right; 1 and 3 do not.
        let left_input = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right_input = MockOperator::new(vec![create_int_chunk(&[2, 4])]);

        // Semi join only outputs probe (left) columns
        let output_schema = vec![LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left_input),
            Box::new(right_input),
            vec![0],
            vec![0],
            JoinType::Semi,
            output_schema,
        );

        let mut kept = drain_rows(|| join.next(), first_column);

        kept.sort_unstable();
        assert_eq!(kept, vec![2, 4]);
    }

    #[test]
    fn test_hash_join_anti() {
        // The complement of the semi join: left rows 1 and 3 have no partner.
        let left_input = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right_input = MockOperator::new(vec![create_int_chunk(&[2, 4])]);

        let output_schema = vec![LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left_input),
            Box::new(right_input),
            vec![0],
            vec![0],
            JoinType::Anti,
            output_schema,
        );

        let mut kept = drain_rows(|| join.next(), first_column);

        kept.sort_unstable();
        assert_eq!(kept, vec![1, 3]);
    }
}