Skip to main content

grafeo_core/execution/operators/
join.rs

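//! Join operators for combining data from two sources.
//!
//! This module provides:
//! - `JoinType`: the join semantics (inner, outer, cross, semi, anti)
//! - `HashKey`: a hashable normalization of `Value`s used as join keys
//! - `HashJoinOperator`: efficient hash-based join for equality conditions
//! - `NestedLoopJoinOperator`: general-purpose join for any condition
//! - `JoinCondition` / `EqualityCondition`: predicates for nested loop joins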
6
7use std::collections::HashMap;
8
9use grafeo_common::types::{LogicalType, Value};
10
11use super::{Operator, OperatorError, OperatorResult};
12use crate::execution::chunk::DataChunkBuilder;
13use crate::execution::{DataChunk, ValueVector};
14
/// Selects how rows from the two join inputs are combined.
///
/// "Left" refers to the probe input of a hash join and the outer input of a
/// nested loop join; "right" is the build / inner input.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JoinType {
    /// Keep only row pairs that satisfy the join condition.
    Inner,
    /// Keep every left row; right columns are NULL when nothing matches.
    Left,
    /// Keep every right row; left columns are NULL when nothing matches.
    Right,
    /// Keep every row from both inputs, NULL-padding the side without a match.
    Full,
    /// Pair every left row with every right row (cartesian product).
    Cross,
    /// Keep left rows that have at least one matching right row.
    Semi,
    /// Keep left rows that have no matching right row.
    Anti,
}
33
/// Hashable, comparable representation of a join key.
///
/// Arbitrary values are normalized into this small set of variants so they
/// can serve as `HashMap` keys (see `HashKey::from_value`).
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum HashKey {
    /// A missing / NULL key.
    Null,
    /// A boolean key.
    Bool(bool),
    /// A 64-bit integer key; several non-integer values are also encoded as this.
    Int64(i64),
    /// A string key, hashed and compared by content.
    String(String),
    /// An ordered sequence of keys (multi-column keys and nested values).
    Composite(Vec<HashKey>),
}
48
49impl HashKey {
50    /// Creates a hash key from a Value.
51    pub fn from_value(value: &Value) -> Self {
52        match value {
53            Value::Null => HashKey::Null,
54            Value::Bool(b) => HashKey::Bool(*b),
55            Value::Int64(i) => HashKey::Int64(*i),
56            Value::Float64(f) => {
57                // Convert float to bits for consistent hashing
58                HashKey::Int64(f.to_bits() as i64)
59            }
60            Value::String(s) => HashKey::String(s.to_string()),
61            Value::Bytes(b) => {
62                // Use byte content for hashing
63                HashKey::String(format!("{b:?}"))
64            }
65            Value::Timestamp(t) => HashKey::Int64(t.as_micros()),
66            Value::Date(d) => HashKey::Int64(d.as_days() as i64),
67            Value::Time(t) => HashKey::Int64(t.as_nanos() as i64),
68            Value::Duration(d) => HashKey::Composite(vec![
69                HashKey::Int64(d.months()),
70                HashKey::Int64(d.days()),
71                HashKey::Int64(d.nanos()),
72            ]),
73            Value::ZonedDatetime(zdt) => HashKey::Int64(zdt.as_timestamp().as_micros()),
74            Value::List(items) => {
75                HashKey::Composite(items.iter().map(HashKey::from_value).collect())
76            }
77            Value::Map(map) => {
78                let mut keys: Vec<_> = map
79                    .iter()
80                    .map(|(k, v)| {
81                        HashKey::Composite(vec![
82                            HashKey::String(k.to_string()),
83                            HashKey::from_value(v),
84                        ])
85                    })
86                    .collect();
87                keys.sort_by(|a, b| format!("{a:?}").cmp(&format!("{b:?}")));
88                HashKey::Composite(keys)
89            }
90            Value::Vector(v) => {
91                // Hash vectors by converting each f32 to its bit representation
92                HashKey::Composite(
93                    v.iter()
94                        .map(|f| HashKey::Int64(f.to_bits() as i64))
95                        .collect(),
96                )
97            }
98            Value::Path { nodes, edges } => {
99                let mut parts: Vec<_> = nodes.iter().map(HashKey::from_value).collect();
100                parts.extend(edges.iter().map(HashKey::from_value));
101                HashKey::Composite(parts)
102            }
103        }
104    }
105
106    /// Creates a hash key from a column value at a given row.
107    pub fn from_column(column: &ValueVector, row: usize) -> Option<Self> {
108        column.get_value(row).map(|v| Self::from_value(&v))
109    }
110}
111
112/// Hash join operator.
113///
114/// Builds a hash table from the build side (right) and probes with the probe side (left).
115/// Efficient for equality joins on one or more columns.
116pub struct HashJoinOperator {
117    /// Left (probe) side operator.
118    probe_side: Box<dyn Operator>,
119    /// Right (build) side operator.
120    build_side: Box<dyn Operator>,
121    /// Column indices on the probe side for join keys.
122    probe_keys: Vec<usize>,
123    /// Column indices on the build side for join keys.
124    build_keys: Vec<usize>,
125    /// Join type.
126    join_type: JoinType,
127    /// Output schema (combined from both sides).
128    output_schema: Vec<LogicalType>,
129    /// Hash table: key -> list of (chunk_index, row_index).
130    hash_table: HashMap<HashKey, Vec<(usize, usize)>>,
131    /// Materialized build side chunks.
132    build_chunks: Vec<DataChunk>,
133    /// Whether the build phase is complete.
134    build_complete: bool,
135    /// Current probe chunk being processed.
136    current_probe_chunk: Option<DataChunk>,
137    /// Current row in the probe chunk.
138    current_probe_row: usize,
139    /// Current position in the hash table matches for the current probe row.
140    current_match_position: usize,
141    /// Current matches for the current probe row.
142    current_matches: Vec<(usize, usize)>,
143    /// For left/full outer joins: track which probe rows had matches.
144    probe_matched: Vec<bool>,
145    /// For right/full outer joins: track which build rows were matched.
146    build_matched: Vec<Vec<bool>>,
147    /// Whether we're in the emit unmatched phase (for outer joins).
148    emitting_unmatched: bool,
149    /// Current chunk index when emitting unmatched rows.
150    unmatched_chunk_idx: usize,
151    /// Current row index when emitting unmatched rows.
152    unmatched_row_idx: usize,
153}
154
155impl HashJoinOperator {
156    /// Creates a new hash join operator.
157    ///
158    /// # Arguments
159    /// * `probe_side` - Left side operator (will be probed).
160    /// * `build_side` - Right side operator (will build hash table).
161    /// * `probe_keys` - Column indices on probe side for join keys.
162    /// * `build_keys` - Column indices on build side for join keys.
163    /// * `join_type` - Type of join to perform.
164    /// * `output_schema` - Schema of the output (probe columns + build columns).
165    pub fn new(
166        probe_side: Box<dyn Operator>,
167        build_side: Box<dyn Operator>,
168        probe_keys: Vec<usize>,
169        build_keys: Vec<usize>,
170        join_type: JoinType,
171        output_schema: Vec<LogicalType>,
172    ) -> Self {
173        Self {
174            probe_side,
175            build_side,
176            probe_keys,
177            build_keys,
178            join_type,
179            output_schema,
180            hash_table: HashMap::new(),
181            build_chunks: Vec::new(),
182            build_complete: false,
183            current_probe_chunk: None,
184            current_probe_row: 0,
185            current_match_position: 0,
186            current_matches: Vec::new(),
187            probe_matched: Vec::new(),
188            build_matched: Vec::new(),
189            emitting_unmatched: false,
190            unmatched_chunk_idx: 0,
191            unmatched_row_idx: 0,
192        }
193    }
194
195    /// Builds the hash table from the build side.
196    fn build_hash_table(&mut self) -> Result<(), OperatorError> {
197        while let Some(chunk) = self.build_side.next()? {
198            let chunk_idx = self.build_chunks.len();
199
200            // Initialize match tracking for outer joins
201            if matches!(self.join_type, JoinType::Right | JoinType::Full) {
202                self.build_matched.push(vec![false; chunk.row_count()]);
203            }
204
205            // Add each row to the hash table
206            for row in chunk.selected_indices() {
207                let key = self.extract_key(&chunk, row, &self.build_keys)?;
208
209                // Skip null keys for inner/semi/anti joins
210                if matches!(key, HashKey::Null)
211                    && !matches!(
212                        self.join_type,
213                        JoinType::Left | JoinType::Right | JoinType::Full
214                    )
215                {
216                    continue;
217                }
218
219                self.hash_table
220                    .entry(key)
221                    .or_default()
222                    .push((chunk_idx, row));
223            }
224
225            self.build_chunks.push(chunk);
226        }
227
228        self.build_complete = true;
229        Ok(())
230    }
231
232    /// Extracts a hash key from a chunk row.
233    fn extract_key(
234        &self,
235        chunk: &DataChunk,
236        row: usize,
237        key_columns: &[usize],
238    ) -> Result<HashKey, OperatorError> {
239        if key_columns.len() == 1 {
240            let col = chunk.column(key_columns[0]).ok_or_else(|| {
241                OperatorError::ColumnNotFound(format!("column {}", key_columns[0]))
242            })?;
243            Ok(HashKey::from_column(col, row).unwrap_or(HashKey::Null))
244        } else {
245            let keys: Vec<HashKey> = key_columns
246                .iter()
247                .map(|&col_idx| {
248                    chunk
249                        .column(col_idx)
250                        .and_then(|col| HashKey::from_column(col, row))
251                        .unwrap_or(HashKey::Null)
252                })
253                .collect();
254            Ok(HashKey::Composite(keys))
255        }
256    }
257
258    /// Produces an output row from a probe row and build row.
259    fn produce_output_row(
260        &self,
261        builder: &mut DataChunkBuilder,
262        probe_chunk: &DataChunk,
263        probe_row: usize,
264        build_chunk: Option<&DataChunk>,
265        build_row: Option<usize>,
266    ) -> Result<(), OperatorError> {
267        let probe_col_count = probe_chunk.column_count();
268
269        // Copy probe side columns
270        for col_idx in 0..probe_col_count {
271            let src_col = probe_chunk
272                .column(col_idx)
273                .ok_or_else(|| OperatorError::ColumnNotFound(format!("probe column {col_idx}")))?;
274            let dst_col = builder
275                .column_mut(col_idx)
276                .ok_or_else(|| OperatorError::ColumnNotFound(format!("output column {col_idx}")))?;
277
278            if let Some(value) = src_col.get_value(probe_row) {
279                dst_col.push_value(value);
280            } else {
281                dst_col.push_value(Value::Null);
282            }
283        }
284
285        // Copy build side columns
286        match (build_chunk, build_row) {
287            (Some(chunk), Some(row)) => {
288                for col_idx in 0..chunk.column_count() {
289                    let src_col = chunk.column(col_idx).ok_or_else(|| {
290                        OperatorError::ColumnNotFound(format!("build column {col_idx}"))
291                    })?;
292                    let dst_col =
293                        builder
294                            .column_mut(probe_col_count + col_idx)
295                            .ok_or_else(|| {
296                                OperatorError::ColumnNotFound(format!(
297                                    "output column {}",
298                                    probe_col_count + col_idx
299                                ))
300                            })?;
301
302                    if let Some(value) = src_col.get_value(row) {
303                        dst_col.push_value(value);
304                    } else {
305                        dst_col.push_value(Value::Null);
306                    }
307                }
308            }
309            _ => {
310                // Emit nulls for build side (left outer join case)
311                if !self.build_chunks.is_empty() {
312                    let build_col_count = self.build_chunks[0].column_count();
313                    for col_idx in 0..build_col_count {
314                        let dst_col =
315                            builder
316                                .column_mut(probe_col_count + col_idx)
317                                .ok_or_else(|| {
318                                    OperatorError::ColumnNotFound(format!(
319                                        "output column {}",
320                                        probe_col_count + col_idx
321                                    ))
322                                })?;
323                        dst_col.push_value(Value::Null);
324                    }
325                }
326            }
327        }
328
329        builder.advance_row();
330        Ok(())
331    }
332
333    /// Gets the next probe chunk.
334    fn get_next_probe_chunk(&mut self) -> Result<bool, OperatorError> {
335        let chunk = self.probe_side.next()?;
336        if let Some(ref c) = chunk {
337            // Initialize match tracking for outer joins
338            if matches!(self.join_type, JoinType::Left | JoinType::Full) {
339                self.probe_matched = vec![false; c.row_count()];
340            }
341        }
342        let has_chunk = chunk.is_some();
343        self.current_probe_chunk = chunk;
344        self.current_probe_row = 0;
345        Ok(has_chunk)
346    }
347
348    /// Emits unmatched build rows for right/full outer joins.
349    fn emit_unmatched_build(&mut self) -> OperatorResult {
350        if self.build_matched.is_empty() {
351            return Ok(None);
352        }
353
354        let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
355
356        // Determine probe column count from schema or first probe chunk
357        let probe_col_count = if !self.build_chunks.is_empty() {
358            self.output_schema.len() - self.build_chunks[0].column_count()
359        } else {
360            0
361        };
362
363        while self.unmatched_chunk_idx < self.build_chunks.len() {
364            let chunk = &self.build_chunks[self.unmatched_chunk_idx];
365            let matched = &self.build_matched[self.unmatched_chunk_idx];
366
367            while self.unmatched_row_idx < matched.len() {
368                if !matched[self.unmatched_row_idx] {
369                    // This row was not matched - emit with nulls on probe side
370
371                    // Emit nulls for probe side
372                    for col_idx in 0..probe_col_count {
373                        if let Some(dst_col) = builder.column_mut(col_idx) {
374                            dst_col.push_value(Value::Null);
375                        }
376                    }
377
378                    // Copy build side values
379                    for col_idx in 0..chunk.column_count() {
380                        if let (Some(src_col), Some(dst_col)) = (
381                            chunk.column(col_idx),
382                            builder.column_mut(probe_col_count + col_idx),
383                        ) {
384                            if let Some(value) = src_col.get_value(self.unmatched_row_idx) {
385                                dst_col.push_value(value);
386                            } else {
387                                dst_col.push_value(Value::Null);
388                            }
389                        }
390                    }
391
392                    builder.advance_row();
393
394                    if builder.is_full() {
395                        self.unmatched_row_idx += 1;
396                        return Ok(Some(builder.finish()));
397                    }
398                }
399
400                self.unmatched_row_idx += 1;
401            }
402
403            self.unmatched_chunk_idx += 1;
404            self.unmatched_row_idx = 0;
405        }
406
407        if builder.row_count() > 0 {
408            Ok(Some(builder.finish()))
409        } else {
410            Ok(None)
411        }
412    }
413}
414
415impl Operator for HashJoinOperator {
416    fn next(&mut self) -> OperatorResult {
417        // Phase 1: Build hash table
418        if !self.build_complete {
419            self.build_hash_table()?;
420        }
421
422        // Phase 3: Emit unmatched build rows (right/full outer join)
423        if self.emitting_unmatched {
424            return self.emit_unmatched_build();
425        }
426
427        // Phase 2: Probe
428        let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
429
430        loop {
431            // Get current probe chunk or fetch new one
432            if self.current_probe_chunk.is_none() && !self.get_next_probe_chunk()? {
433                // No more probe data
434                if matches!(self.join_type, JoinType::Right | JoinType::Full) {
435                    self.emitting_unmatched = true;
436                    return self.emit_unmatched_build();
437                }
438                return if builder.row_count() > 0 {
439                    Ok(Some(builder.finish()))
440                } else {
441                    Ok(None)
442                };
443            }
444
445            // Invariant: current_probe_chunk is Some here - the guard at line 396 either
446            // populates it via get_next_probe_chunk() or returns from the function
447            let probe_chunk = self
448                .current_probe_chunk
449                .as_ref()
450                .expect("probe chunk is Some: guard at line 396 ensures this");
451            let probe_rows: Vec<usize> = probe_chunk.selected_indices().collect();
452
453            while self.current_probe_row < probe_rows.len() {
454                let probe_row = probe_rows[self.current_probe_row];
455
456                // If we don't have current matches, look them up
457                if self.current_matches.is_empty() && self.current_match_position == 0 {
458                    let key = self.extract_key(probe_chunk, probe_row, &self.probe_keys)?;
459
460                    // Handle semi/anti joins differently
461                    match self.join_type {
462                        JoinType::Semi => {
463                            if self.hash_table.contains_key(&key) {
464                                // Emit probe row only
465                                for col_idx in 0..probe_chunk.column_count() {
466                                    if let (Some(src_col), Some(dst_col)) =
467                                        (probe_chunk.column(col_idx), builder.column_mut(col_idx))
468                                        && let Some(value) = src_col.get_value(probe_row)
469                                    {
470                                        dst_col.push_value(value);
471                                    }
472                                }
473                                builder.advance_row();
474                            }
475                            self.current_probe_row += 1;
476                            continue;
477                        }
478                        JoinType::Anti => {
479                            if !self.hash_table.contains_key(&key) {
480                                // Emit probe row only
481                                for col_idx in 0..probe_chunk.column_count() {
482                                    if let (Some(src_col), Some(dst_col)) =
483                                        (probe_chunk.column(col_idx), builder.column_mut(col_idx))
484                                        && let Some(value) = src_col.get_value(probe_row)
485                                    {
486                                        dst_col.push_value(value);
487                                    }
488                                }
489                                builder.advance_row();
490                            }
491                            self.current_probe_row += 1;
492                            continue;
493                        }
494                        _ => {
495                            self.current_matches =
496                                self.hash_table.get(&key).cloned().unwrap_or_default();
497                        }
498                    }
499                }
500
501                // Process matches
502                if self.current_matches.is_empty() {
503                    // No matches - for left/full outer join, emit with nulls
504                    if matches!(self.join_type, JoinType::Left | JoinType::Full) {
505                        self.produce_output_row(&mut builder, probe_chunk, probe_row, None, None)?;
506                    }
507                    self.current_probe_row += 1;
508                    self.current_match_position = 0;
509                } else {
510                    // Process each match
511                    while self.current_match_position < self.current_matches.len() {
512                        let (build_chunk_idx, build_row) =
513                            self.current_matches[self.current_match_position];
514                        let build_chunk = &self.build_chunks[build_chunk_idx];
515
516                        // Mark as matched for outer joins
517                        if matches!(self.join_type, JoinType::Left | JoinType::Full)
518                            && probe_row < self.probe_matched.len()
519                        {
520                            self.probe_matched[probe_row] = true;
521                        }
522                        if matches!(self.join_type, JoinType::Right | JoinType::Full)
523                            && build_chunk_idx < self.build_matched.len()
524                            && build_row < self.build_matched[build_chunk_idx].len()
525                        {
526                            self.build_matched[build_chunk_idx][build_row] = true;
527                        }
528
529                        self.produce_output_row(
530                            &mut builder,
531                            probe_chunk,
532                            probe_row,
533                            Some(build_chunk),
534                            Some(build_row),
535                        )?;
536
537                        self.current_match_position += 1;
538
539                        if builder.is_full() {
540                            return Ok(Some(builder.finish()));
541                        }
542                    }
543
544                    // Done with this probe row
545                    self.current_probe_row += 1;
546                    self.current_matches.clear();
547                    self.current_match_position = 0;
548                }
549
550                if builder.is_full() {
551                    return Ok(Some(builder.finish()));
552                }
553            }
554
555            // Done with current probe chunk
556            self.current_probe_chunk = None;
557            self.current_probe_row = 0;
558
559            if builder.row_count() > 0 {
560                return Ok(Some(builder.finish()));
561            }
562        }
563    }
564
565    fn reset(&mut self) {
566        self.probe_side.reset();
567        self.build_side.reset();
568        self.hash_table.clear();
569        self.build_chunks.clear();
570        self.build_complete = false;
571        self.current_probe_chunk = None;
572        self.current_probe_row = 0;
573        self.current_match_position = 0;
574        self.current_matches.clear();
575        self.probe_matched.clear();
576        self.build_matched.clear();
577        self.emitting_unmatched = false;
578        self.unmatched_chunk_idx = 0;
579        self.unmatched_row_idx = 0;
580    }
581
582    fn name(&self) -> &'static str {
583        "HashJoin"
584    }
585}
586
587/// Nested loop join operator.
588///
589/// Performs a cartesian product of both sides, filtering by the join condition.
590/// Less efficient than hash join but supports any join condition.
591pub struct NestedLoopJoinOperator {
592    /// Left side operator.
593    left: Box<dyn Operator>,
594    /// Right side operator.
595    right: Box<dyn Operator>,
596    /// Join condition predicate (if any).
597    condition: Option<Box<dyn JoinCondition>>,
598    /// Join type.
599    join_type: JoinType,
600    /// Output schema.
601    output_schema: Vec<LogicalType>,
602    /// Materialized right side chunks.
603    right_chunks: Vec<DataChunk>,
604    /// Whether the right side is materialized.
605    right_materialized: bool,
606    /// Current left chunk.
607    current_left_chunk: Option<DataChunk>,
608    /// Current row in the left chunk.
609    current_left_row: usize,
610    /// Current chunk index in the right side.
611    current_right_chunk: usize,
612    /// Whether the current left row has been matched (for Left Join).
613    current_left_matched: bool,
614    /// Current row in the current right chunk.
615    current_right_row: usize,
616}
617
618/// Trait for join conditions.
619pub trait JoinCondition: Send + Sync {
620    /// Evaluates the condition for a pair of rows.
621    fn evaluate(
622        &self,
623        left_chunk: &DataChunk,
624        left_row: usize,
625        right_chunk: &DataChunk,
626        right_row: usize,
627    ) -> bool;
628}
629
630/// A simple equality condition for nested loop joins.
631pub struct EqualityCondition {
632    /// Column index on the left side.
633    left_column: usize,
634    /// Column index on the right side.
635    right_column: usize,
636}
637
638impl EqualityCondition {
639    /// Creates a new equality condition.
640    pub fn new(left_column: usize, right_column: usize) -> Self {
641        Self {
642            left_column,
643            right_column,
644        }
645    }
646}
647
648impl JoinCondition for EqualityCondition {
649    fn evaluate(
650        &self,
651        left_chunk: &DataChunk,
652        left_row: usize,
653        right_chunk: &DataChunk,
654        right_row: usize,
655    ) -> bool {
656        let left_val = left_chunk
657            .column(self.left_column)
658            .and_then(|c| c.get_value(left_row));
659        let right_val = right_chunk
660            .column(self.right_column)
661            .and_then(|c| c.get_value(right_row));
662
663        match (left_val, right_val) {
664            (Some(l), Some(r)) => l == r,
665            _ => false,
666        }
667    }
668}
669
670impl NestedLoopJoinOperator {
671    /// Creates a new nested loop join operator.
672    pub fn new(
673        left: Box<dyn Operator>,
674        right: Box<dyn Operator>,
675        condition: Option<Box<dyn JoinCondition>>,
676        join_type: JoinType,
677        output_schema: Vec<LogicalType>,
678    ) -> Self {
679        Self {
680            left,
681            right,
682            condition,
683            join_type,
684            output_schema,
685            right_chunks: Vec::new(),
686            right_materialized: false,
687            current_left_chunk: None,
688            current_left_row: 0,
689            current_right_chunk: 0,
690            current_right_row: 0,
691            current_left_matched: false,
692        }
693    }
694
695    /// Materializes the right side.
696    fn materialize_right(&mut self) -> Result<(), OperatorError> {
697        while let Some(chunk) = self.right.next()? {
698            self.right_chunks.push(chunk);
699        }
700        self.right_materialized = true;
701        Ok(())
702    }
703
704    /// Produces an output row.
705    fn produce_row(
706        &self,
707        builder: &mut DataChunkBuilder,
708        left_chunk: &DataChunk,
709        left_row: usize,
710        right_chunk: &DataChunk,
711        right_row: usize,
712    ) {
713        // Copy left columns
714        for col_idx in 0..left_chunk.column_count() {
715            if let (Some(src), Some(dst)) =
716                (left_chunk.column(col_idx), builder.column_mut(col_idx))
717            {
718                if let Some(val) = src.get_value(left_row) {
719                    dst.push_value(val);
720                } else {
721                    dst.push_value(Value::Null);
722                }
723            }
724        }
725
726        // Copy right columns
727        let left_col_count = left_chunk.column_count();
728        for col_idx in 0..right_chunk.column_count() {
729            if let (Some(src), Some(dst)) = (
730                right_chunk.column(col_idx),
731                builder.column_mut(left_col_count + col_idx),
732            ) {
733                if let Some(val) = src.get_value(right_row) {
734                    dst.push_value(val);
735                } else {
736                    dst.push_value(Value::Null);
737                }
738            }
739        }
740
741        builder.advance_row();
742    }
743
744    /// Produces an output row with NULLs for the right side (for unmatched left rows in Left Join).
745    fn produce_left_unmatched_row(
746        &self,
747        builder: &mut DataChunkBuilder,
748        left_chunk: &DataChunk,
749        left_row: usize,
750        right_col_count: usize,
751    ) {
752        // Copy left columns
753        for col_idx in 0..left_chunk.column_count() {
754            if let (Some(src), Some(dst)) =
755                (left_chunk.column(col_idx), builder.column_mut(col_idx))
756            {
757                if let Some(val) = src.get_value(left_row) {
758                    dst.push_value(val);
759                } else {
760                    dst.push_value(Value::Null);
761                }
762            }
763        }
764
765        // Fill right columns with NULLs
766        let left_col_count = left_chunk.column_count();
767        for col_idx in 0..right_col_count {
768            if let Some(dst) = builder.column_mut(left_col_count + col_idx) {
769                dst.push_value(Value::Null);
770            }
771        }
772
773        builder.advance_row();
774    }
775}
776
777impl Operator for NestedLoopJoinOperator {
778    fn next(&mut self) -> OperatorResult {
779        // Materialize right side
780        if !self.right_materialized {
781            self.materialize_right()?;
782        }
783
784        // If right side is empty and not a left outer join, return nothing
785        if self.right_chunks.is_empty() && !matches!(self.join_type, JoinType::Left) {
786            return Ok(None);
787        }
788
789        let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
790
791        loop {
792            // Get current left chunk
793            if self.current_left_chunk.is_none() {
794                self.current_left_chunk = self.left.next()?;
795                self.current_left_row = 0;
796                self.current_right_chunk = 0;
797                self.current_right_row = 0;
798
799                if self.current_left_chunk.is_none() {
800                    // No more left data
801                    return if builder.row_count() > 0 {
802                        Ok(Some(builder.finish()))
803                    } else {
804                        Ok(None)
805                    };
806                }
807            }
808
809            let left_chunk = self
810                .current_left_chunk
811                .as_ref()
812                .expect("left chunk is Some: loaded in loop above");
813            let left_rows: Vec<usize> = left_chunk.selected_indices().collect();
814
815            // Calculate right column count for potential unmatched rows
816            let right_col_count = if !self.right_chunks.is_empty() {
817                self.right_chunks[0].column_count()
818            } else {
819                // Infer from output schema
820                self.output_schema
821                    .len()
822                    .saturating_sub(left_chunk.column_count())
823            };
824
825            // Process current left row against all right rows
826            while self.current_left_row < left_rows.len() {
827                let left_row = left_rows[self.current_left_row];
828
829                // Reset match tracking for this left row
830                if self.current_right_chunk == 0 && self.current_right_row == 0 {
831                    self.current_left_matched = false;
832                }
833
834                // Cross join or inner/other join
835                while self.current_right_chunk < self.right_chunks.len() {
836                    let right_chunk = &self.right_chunks[self.current_right_chunk];
837                    let right_rows: Vec<usize> = right_chunk.selected_indices().collect();
838
839                    while self.current_right_row < right_rows.len() {
840                        let right_row = right_rows[self.current_right_row];
841
842                        // Check condition
843                        let matches = match &self.condition {
844                            Some(cond) => {
845                                cond.evaluate(left_chunk, left_row, right_chunk, right_row)
846                            }
847                            None => true, // Cross join
848                        };
849
850                        if matches {
851                            self.current_left_matched = true;
852                            self.produce_row(
853                                &mut builder,
854                                left_chunk,
855                                left_row,
856                                right_chunk,
857                                right_row,
858                            );
859
860                            if builder.is_full() {
861                                self.current_right_row += 1;
862                                return Ok(Some(builder.finish()));
863                            }
864                        }
865
866                        self.current_right_row += 1;
867                    }
868
869                    self.current_right_chunk += 1;
870                    self.current_right_row = 0;
871                }
872
873                // Done processing all right rows for this left row
874                // For Left Join, emit unmatched left row with NULLs
875                if matches!(self.join_type, JoinType::Left) && !self.current_left_matched {
876                    self.produce_left_unmatched_row(
877                        &mut builder,
878                        left_chunk,
879                        left_row,
880                        right_col_count,
881                    );
882
883                    if builder.is_full() {
884                        self.current_left_row += 1;
885                        self.current_right_chunk = 0;
886                        self.current_right_row = 0;
887                        return Ok(Some(builder.finish()));
888                    }
889                }
890
891                // Move to next left row
892                self.current_left_row += 1;
893                self.current_right_chunk = 0;
894                self.current_right_row = 0;
895            }
896
897            // Done with current left chunk
898            self.current_left_chunk = None;
899
900            if builder.row_count() > 0 {
901                return Ok(Some(builder.finish()));
902            }
903        }
904    }
905
906    fn reset(&mut self) {
907        self.left.reset();
908        self.right.reset();
909        self.right_chunks.clear();
910        self.right_materialized = false;
911        self.current_left_chunk = None;
912        self.current_left_row = 0;
913        self.current_right_chunk = 0;
914        self.current_right_row = 0;
915        self.current_left_matched = false;
916    }
917
918    fn name(&self) -> &'static str {
919        "NestedLoopJoin"
920    }
921}
922
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Mock operator that yields a fixed list of chunks, then `None`.
    ///
    /// `next` moves each chunk out and leaves `DataChunk::empty()` in its slot. After
    /// `reset`, the mock therefore replays empty chunks, not the original data.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds a single-column Int64 chunk holding `values` in order.
    fn create_int_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op` and returns each output row as `(column 0, column 1)`.
    ///
    /// Column 0 must be a non-NULL Int64. A NULL in column 1 is reported as `None`.
    fn collect_pairs(op: &mut impl Operator) -> Vec<(i64, Option<i64>)> {
        let mut results = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                let left_val = chunk.column(0).unwrap().get_int64(row).unwrap();
                let right_val = chunk.column(1).unwrap().get_int64(row);
                results.push((left_val, right_val));
            }
        }
        results
    }

    /// Drains `op` and returns column 0 (a non-NULL Int64) of every output row.
    fn collect_first_column(op: &mut impl Operator) -> Vec<i64> {
        let mut results = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                results.push(chunk.column(0).unwrap().get_int64(row).unwrap());
            }
        }
        results
    }

    #[test]
    fn test_hash_join_inner() {
        // Left: [1, 2, 3, 4]
        // Right: [2, 3, 4, 5]
        // Inner join on column 0 should produce: [2, 3, 4]

        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 3, 4, 5])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Inner,
            output_schema,
        );

        let mut results = collect_pairs(&mut join);
        results.sort_unstable();
        assert_eq!(results, vec![(2, Some(2)), (3, Some(3)), (4, Some(4))]);
    }

    #[test]
    fn test_hash_join_left_outer() {
        // Left: [1, 2, 3]
        // Right: [2, 3]
        // Left outer join should produce: [(1, null), (2, 2), (3, 3)]

        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 3])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Left,
            output_schema,
        );

        let mut results = collect_pairs(&mut join);
        results.sort_unstable();
        assert_eq!(results, vec![(1, None), (2, Some(2)), (3, Some(3))]);
    }

    #[test]
    fn test_nested_loop_cross_join() {
        // Left: [1, 2]
        // Right: [10, 20]
        // Cross join should produce: [(1,10), (1,20), (2,10), (2,20)]

        let left = MockOperator::new(vec![create_int_chunk(&[1, 2])]);
        let right = MockOperator::new(vec![create_int_chunk(&[10, 20])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = NestedLoopJoinOperator::new(
            Box::new(left),
            Box::new(right),
            None,
            JoinType::Cross,
            output_schema,
        );

        let mut results = collect_pairs(&mut join);
        results.sort_unstable();
        assert_eq!(
            results,
            vec![(1, Some(10)), (1, Some(20)), (2, Some(10)), (2, Some(20))]
        );
    }

    #[test]
    fn test_nested_loop_left_outer_empty_right() {
        // Left: [1, 2]
        // Right: (no rows)
        // Left outer join keeps every left row and pads the right column with NULL:
        // [(1, null), (2, null)]

        let left = MockOperator::new(vec![create_int_chunk(&[1, 2])]);
        let right = MockOperator::new(Vec::new());

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = NestedLoopJoinOperator::new(
            Box::new(left),
            Box::new(right),
            None,
            JoinType::Left,
            output_schema,
        );

        let mut results = collect_pairs(&mut join);
        results.sort_unstable();
        assert_eq!(results, vec![(1, None), (2, None)]);
    }

    #[test]
    fn test_hash_join_semi() {
        // Left: [1, 2, 3, 4]
        // Right: [2, 4]
        // Semi join should produce: [2, 4] (only left rows that have matches)

        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 4])]);

        // Semi join only outputs probe (left) columns
        let output_schema = vec![LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Semi,
            output_schema,
        );

        let mut results = collect_first_column(&mut join);
        results.sort_unstable();
        assert_eq!(results, vec![2, 4]);
    }

    #[test]
    fn test_hash_join_anti() {
        // Left: [1, 2, 3, 4]
        // Right: [2, 4]
        // Anti join should produce: [1, 3] (left rows with no matches)

        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 4])]);

        let output_schema = vec![LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Anti,
            output_schema,
        );

        let mut results = collect_first_column(&mut join);
        results.sort_unstable();
        assert_eq!(results, vec![1, 3]);
    }
}