1use std::collections::HashMap;
8
9use grafeo_common::types::{LogicalType, Value};
10
11use super::{Operator, OperatorError, OperatorResult};
12use crate::execution::chunk::DataChunkBuilder;
13use crate::execution::{DataChunk, ValueVector};
14
/// The kind of join an operator performs.
///
/// The hash join treats its probe side as the left input and its build side as
/// the right input.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    /// Only pairs of rows that satisfy the join predicate.
    Inner,
    /// Like `Inner`, plus every left row without a match, with null right columns.
    Left,
    /// Like `Inner`, plus every right row without a match, with null left columns.
    Right,
    /// Like `Inner`, plus unmatched rows from both sides, null-padded.
    Full,
    /// Every combination of a left row and a right row (cartesian product).
    Cross,
    /// Each left row that has at least one match, emitted once with left columns only.
    Semi,
    /// Each left row that has no match, emitted with left columns only.
    Anti,
}
33
/// A hashable, equality-comparable stand-in for a [`Value`], used as the key
/// type of the hash join's build table.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum HashKey {
    /// Key for a null value.
    Null,
    /// Key for a boolean value.
    Bool(bool),
    /// Key for 64-bit integers; `HashKey::from_value` also maps floats (by raw
    /// bit pattern) and timestamps (as microseconds) into this variant.
    Int64(i64),
    /// Key for strings; `HashKey::from_value` also maps byte strings here via
    /// their `Debug` rendering.
    String(String),
    /// Ordered sequence of keys: used for list values, map values, and keys
    /// built from several columns.
    Composite(Vec<HashKey>),
}
48
49impl HashKey {
50 pub fn from_value(value: &Value) -> Self {
52 match value {
53 Value::Null => HashKey::Null,
54 Value::Bool(b) => HashKey::Bool(*b),
55 Value::Int64(i) => HashKey::Int64(*i),
56 Value::Float64(f) => {
57 HashKey::Int64(f.to_bits() as i64)
59 }
60 Value::String(s) => HashKey::String(s.to_string()),
61 Value::Bytes(b) => {
62 HashKey::String(format!("{b:?}"))
64 }
65 Value::Timestamp(t) => HashKey::Int64(t.as_micros()),
66 Value::List(items) => {
67 HashKey::Composite(items.iter().map(HashKey::from_value).collect())
68 }
69 Value::Map(map) => {
70 let mut keys: Vec<_> = map
71 .iter()
72 .map(|(k, v)| {
73 HashKey::Composite(vec![
74 HashKey::String(k.to_string()),
75 HashKey::from_value(v),
76 ])
77 })
78 .collect();
79 keys.sort_by(|a, b| format!("{a:?}").cmp(&format!("{b:?}")));
80 HashKey::Composite(keys)
81 }
82 }
83 }
84
85 pub fn from_column(column: &ValueVector, row: usize) -> Option<Self> {
87 column.get_value(row).map(|v| Self::from_value(&v))
88 }
89}
90
/// Equi-join operator that materializes its build side into a hash table and
/// streams its probe side against it.
///
/// Output rows are all probe columns followed by all build columns (probe
/// columns only for `Semi`/`Anti` joins).
pub struct HashJoinOperator {
    /// Streamed (left) input.
    probe_side: Box<dyn Operator>,
    /// Input fully drained into `build_chunks`/`hash_table` before probing (right).
    build_side: Box<dyn Operator>,
    /// Column indices in probe chunks that form the join key.
    probe_keys: Vec<usize>,
    /// Column indices in build chunks that form the join key.
    build_keys: Vec<usize>,
    /// Join semantics to apply.
    join_type: JoinType,
    /// Column types of the produced chunks.
    output_schema: Vec<LogicalType>,
    /// Join key -> list of `(build chunk index, row index)` positions.
    hash_table: HashMap<HashKey, Vec<(usize, usize)>>,
    /// Every chunk pulled from the build side; indexed by `hash_table` entries.
    build_chunks: Vec<DataChunk>,
    /// Whether the build side has been consumed.
    build_complete: bool,
    /// Probe chunk currently being joined, if any.
    current_probe_chunk: Option<DataChunk>,
    /// Position within the current probe chunk's selected rows.
    current_probe_row: usize,
    /// Next entry of `current_matches` to emit; lets output resume in the
    /// middle of a probe row after a full chunk has been returned.
    current_match_position: usize,
    /// Build-side matches of the current probe row.
    current_matches: Vec<(usize, usize)>,
    /// Per-row "matched" flags for the current probe chunk (Left/Full only).
    /// NOTE(review): written while probing but never read in the visible code.
    probe_matched: Vec<bool>,
    /// Per-chunk, per-row "matched" flags for build rows (Right/Full only),
    /// used to emit unmatched build rows once the probe side is exhausted.
    build_matched: Vec<Vec<bool>>,
    /// Whether the operator has switched to emitting unmatched build rows.
    emitting_unmatched: bool,
    /// Build chunk cursor for unmatched-row emission.
    unmatched_chunk_idx: usize,
    /// Row cursor inside build chunk `unmatched_chunk_idx` for unmatched-row emission.
    unmatched_row_idx: usize,
}
133
134impl HashJoinOperator {
135 pub fn new(
145 probe_side: Box<dyn Operator>,
146 build_side: Box<dyn Operator>,
147 probe_keys: Vec<usize>,
148 build_keys: Vec<usize>,
149 join_type: JoinType,
150 output_schema: Vec<LogicalType>,
151 ) -> Self {
152 Self {
153 probe_side,
154 build_side,
155 probe_keys,
156 build_keys,
157 join_type,
158 output_schema,
159 hash_table: HashMap::new(),
160 build_chunks: Vec::new(),
161 build_complete: false,
162 current_probe_chunk: None,
163 current_probe_row: 0,
164 current_match_position: 0,
165 current_matches: Vec::new(),
166 probe_matched: Vec::new(),
167 build_matched: Vec::new(),
168 emitting_unmatched: false,
169 unmatched_chunk_idx: 0,
170 unmatched_row_idx: 0,
171 }
172 }
173
174 fn build_hash_table(&mut self) -> Result<(), OperatorError> {
176 while let Some(chunk) = self.build_side.next()? {
177 let chunk_idx = self.build_chunks.len();
178
179 if matches!(self.join_type, JoinType::Right | JoinType::Full) {
181 self.build_matched.push(vec![false; chunk.row_count()]);
182 }
183
184 for row in chunk.selected_indices() {
186 let key = self.extract_key(&chunk, row, &self.build_keys)?;
187
188 if matches!(key, HashKey::Null)
190 && !matches!(
191 self.join_type,
192 JoinType::Left | JoinType::Right | JoinType::Full
193 )
194 {
195 continue;
196 }
197
198 self.hash_table
199 .entry(key)
200 .or_default()
201 .push((chunk_idx, row));
202 }
203
204 self.build_chunks.push(chunk);
205 }
206
207 self.build_complete = true;
208 Ok(())
209 }
210
211 fn extract_key(
213 &self,
214 chunk: &DataChunk,
215 row: usize,
216 key_columns: &[usize],
217 ) -> Result<HashKey, OperatorError> {
218 if key_columns.len() == 1 {
219 let col = chunk.column(key_columns[0]).ok_or_else(|| {
220 OperatorError::ColumnNotFound(format!("column {}", key_columns[0]))
221 })?;
222 Ok(HashKey::from_column(col, row).unwrap_or(HashKey::Null))
223 } else {
224 let keys: Vec<HashKey> = key_columns
225 .iter()
226 .map(|&col_idx| {
227 chunk
228 .column(col_idx)
229 .and_then(|col| HashKey::from_column(col, row))
230 .unwrap_or(HashKey::Null)
231 })
232 .collect();
233 Ok(HashKey::Composite(keys))
234 }
235 }
236
237 fn produce_output_row(
239 &self,
240 builder: &mut DataChunkBuilder,
241 probe_chunk: &DataChunk,
242 probe_row: usize,
243 build_chunk: Option<&DataChunk>,
244 build_row: Option<usize>,
245 ) -> Result<(), OperatorError> {
246 let probe_col_count = probe_chunk.column_count();
247
248 for col_idx in 0..probe_col_count {
250 let src_col = probe_chunk
251 .column(col_idx)
252 .ok_or_else(|| OperatorError::ColumnNotFound(format!("probe column {col_idx}")))?;
253 let dst_col = builder
254 .column_mut(col_idx)
255 .ok_or_else(|| OperatorError::ColumnNotFound(format!("output column {col_idx}")))?;
256
257 if let Some(value) = src_col.get_value(probe_row) {
258 dst_col.push_value(value);
259 } else {
260 dst_col.push_value(Value::Null);
261 }
262 }
263
264 match (build_chunk, build_row) {
266 (Some(chunk), Some(row)) => {
267 for col_idx in 0..chunk.column_count() {
268 let src_col = chunk.column(col_idx).ok_or_else(|| {
269 OperatorError::ColumnNotFound(format!("build column {col_idx}"))
270 })?;
271 let dst_col =
272 builder
273 .column_mut(probe_col_count + col_idx)
274 .ok_or_else(|| {
275 OperatorError::ColumnNotFound(format!(
276 "output column {}",
277 probe_col_count + col_idx
278 ))
279 })?;
280
281 if let Some(value) = src_col.get_value(row) {
282 dst_col.push_value(value);
283 } else {
284 dst_col.push_value(Value::Null);
285 }
286 }
287 }
288 _ => {
289 if !self.build_chunks.is_empty() {
291 let build_col_count = self.build_chunks[0].column_count();
292 for col_idx in 0..build_col_count {
293 let dst_col =
294 builder
295 .column_mut(probe_col_count + col_idx)
296 .ok_or_else(|| {
297 OperatorError::ColumnNotFound(format!(
298 "output column {}",
299 probe_col_count + col_idx
300 ))
301 })?;
302 dst_col.push_value(Value::Null);
303 }
304 }
305 }
306 }
307
308 builder.advance_row();
309 Ok(())
310 }
311
312 fn get_next_probe_chunk(&mut self) -> Result<bool, OperatorError> {
314 let chunk = self.probe_side.next()?;
315 if let Some(ref c) = chunk {
316 if matches!(self.join_type, JoinType::Left | JoinType::Full) {
318 self.probe_matched = vec![false; c.row_count()];
319 }
320 }
321 let has_chunk = chunk.is_some();
322 self.current_probe_chunk = chunk;
323 self.current_probe_row = 0;
324 Ok(has_chunk)
325 }
326
327 fn emit_unmatched_build(&mut self) -> OperatorResult {
329 if self.build_matched.is_empty() {
330 return Ok(None);
331 }
332
333 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
334
335 let probe_col_count = if !self.build_chunks.is_empty() {
337 self.output_schema.len() - self.build_chunks[0].column_count()
338 } else {
339 0
340 };
341
342 while self.unmatched_chunk_idx < self.build_chunks.len() {
343 let chunk = &self.build_chunks[self.unmatched_chunk_idx];
344 let matched = &self.build_matched[self.unmatched_chunk_idx];
345
346 while self.unmatched_row_idx < matched.len() {
347 if !matched[self.unmatched_row_idx] {
348 for col_idx in 0..probe_col_count {
352 if let Some(dst_col) = builder.column_mut(col_idx) {
353 dst_col.push_value(Value::Null);
354 }
355 }
356
357 for col_idx in 0..chunk.column_count() {
359 if let (Some(src_col), Some(dst_col)) = (
360 chunk.column(col_idx),
361 builder.column_mut(probe_col_count + col_idx),
362 ) {
363 if let Some(value) = src_col.get_value(self.unmatched_row_idx) {
364 dst_col.push_value(value);
365 } else {
366 dst_col.push_value(Value::Null);
367 }
368 }
369 }
370
371 builder.advance_row();
372
373 if builder.is_full() {
374 self.unmatched_row_idx += 1;
375 return Ok(Some(builder.finish()));
376 }
377 }
378
379 self.unmatched_row_idx += 1;
380 }
381
382 self.unmatched_chunk_idx += 1;
383 self.unmatched_row_idx = 0;
384 }
385
386 if builder.row_count() > 0 {
387 Ok(Some(builder.finish()))
388 } else {
389 Ok(None)
390 }
391 }
392}
393
394impl Operator for HashJoinOperator {
395 fn next(&mut self) -> OperatorResult {
396 if !self.build_complete {
398 self.build_hash_table()?;
399 }
400
401 if self.emitting_unmatched {
403 return self.emit_unmatched_build();
404 }
405
406 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
408
409 loop {
410 if self.current_probe_chunk.is_none() {
412 if !self.get_next_probe_chunk()? {
413 if matches!(self.join_type, JoinType::Right | JoinType::Full) {
415 self.emitting_unmatched = true;
416 return self.emit_unmatched_build();
417 }
418 return if builder.row_count() > 0 {
419 Ok(Some(builder.finish()))
420 } else {
421 Ok(None)
422 };
423 }
424 }
425
426 let probe_chunk = self
429 .current_probe_chunk
430 .as_ref()
431 .expect("probe chunk is Some: guard at line 396 ensures this");
432 let probe_rows: Vec<usize> = probe_chunk.selected_indices().collect();
433
434 while self.current_probe_row < probe_rows.len() {
435 let probe_row = probe_rows[self.current_probe_row];
436
437 if self.current_matches.is_empty() && self.current_match_position == 0 {
439 let key = self.extract_key(probe_chunk, probe_row, &self.probe_keys)?;
440
441 match self.join_type {
443 JoinType::Semi => {
444 if self.hash_table.contains_key(&key) {
445 for col_idx in 0..probe_chunk.column_count() {
447 if let (Some(src_col), Some(dst_col)) =
448 (probe_chunk.column(col_idx), builder.column_mut(col_idx))
449 {
450 if let Some(value) = src_col.get_value(probe_row) {
451 dst_col.push_value(value);
452 }
453 }
454 }
455 builder.advance_row();
456 }
457 self.current_probe_row += 1;
458 continue;
459 }
460 JoinType::Anti => {
461 if !self.hash_table.contains_key(&key) {
462 for col_idx in 0..probe_chunk.column_count() {
464 if let (Some(src_col), Some(dst_col)) =
465 (probe_chunk.column(col_idx), builder.column_mut(col_idx))
466 {
467 if let Some(value) = src_col.get_value(probe_row) {
468 dst_col.push_value(value);
469 }
470 }
471 }
472 builder.advance_row();
473 }
474 self.current_probe_row += 1;
475 continue;
476 }
477 _ => {
478 self.current_matches =
479 self.hash_table.get(&key).cloned().unwrap_or_default();
480 }
481 }
482 }
483
484 if self.current_matches.is_empty() {
486 if matches!(self.join_type, JoinType::Left | JoinType::Full) {
488 self.produce_output_row(&mut builder, probe_chunk, probe_row, None, None)?;
489 }
490 self.current_probe_row += 1;
491 self.current_match_position = 0;
492 } else {
493 while self.current_match_position < self.current_matches.len() {
495 let (build_chunk_idx, build_row) =
496 self.current_matches[self.current_match_position];
497 let build_chunk = &self.build_chunks[build_chunk_idx];
498
499 if matches!(self.join_type, JoinType::Left | JoinType::Full) {
501 if probe_row < self.probe_matched.len() {
502 self.probe_matched[probe_row] = true;
503 }
504 }
505 if matches!(self.join_type, JoinType::Right | JoinType::Full) {
506 if build_chunk_idx < self.build_matched.len()
507 && build_row < self.build_matched[build_chunk_idx].len()
508 {
509 self.build_matched[build_chunk_idx][build_row] = true;
510 }
511 }
512
513 self.produce_output_row(
514 &mut builder,
515 probe_chunk,
516 probe_row,
517 Some(build_chunk),
518 Some(build_row),
519 )?;
520
521 self.current_match_position += 1;
522
523 if builder.is_full() {
524 return Ok(Some(builder.finish()));
525 }
526 }
527
528 self.current_probe_row += 1;
530 self.current_matches.clear();
531 self.current_match_position = 0;
532 }
533
534 if builder.is_full() {
535 return Ok(Some(builder.finish()));
536 }
537 }
538
539 self.current_probe_chunk = None;
541 self.current_probe_row = 0;
542
543 if builder.row_count() > 0 {
544 return Ok(Some(builder.finish()));
545 }
546 }
547 }
548
549 fn reset(&mut self) {
550 self.probe_side.reset();
551 self.build_side.reset();
552 self.hash_table.clear();
553 self.build_chunks.clear();
554 self.build_complete = false;
555 self.current_probe_chunk = None;
556 self.current_probe_row = 0;
557 self.current_match_position = 0;
558 self.current_matches.clear();
559 self.probe_matched.clear();
560 self.build_matched.clear();
561 self.emitting_unmatched = false;
562 self.unmatched_chunk_idx = 0;
563 self.unmatched_row_idx = 0;
564 }
565
566 fn name(&self) -> &'static str {
567 "HashJoin"
568 }
569}
570
/// Join operator that compares every left row with every materialized right
/// row, evaluating an optional [`JoinCondition`] for each pair.
///
/// Output rows are all left columns followed by all right columns.
pub struct NestedLoopJoinOperator {
    /// Streamed (outer) input.
    left: Box<dyn Operator>,
    /// Input drained into `right_chunks` on the first call to `next`.
    right: Box<dyn Operator>,
    /// Predicate a row pair must satisfy; `None` accepts every pair.
    condition: Option<Box<dyn JoinCondition>>,
    /// Join semantics; only `Left` (null-padded unmatched left rows) gets
    /// special handling in this operator.
    join_type: JoinType,
    /// Column types of the produced chunks.
    output_schema: Vec<LogicalType>,
    /// All chunks read from `right`.
    right_chunks: Vec<DataChunk>,
    /// Whether `right` has been drained into `right_chunks`.
    right_materialized: bool,
    /// Left chunk currently being joined, if any.
    current_left_chunk: Option<DataChunk>,
    /// Position within the current left chunk's selected rows.
    current_left_row: usize,
    /// Index of the right chunk being scanned for the current left row.
    current_right_chunk: usize,
    /// Whether the current left row has matched at least one right row.
    current_left_matched: bool,
    /// Position within the current right chunk's selected rows.
    current_right_row: usize,
}
601
/// Predicate evaluated by [`NestedLoopJoinOperator`] for each candidate pair of
/// rows.
pub trait JoinCondition: Send + Sync {
    /// Returns `true` if row `left_row` of `left_chunk` and row `right_row` of
    /// `right_chunk` should be joined.
    fn evaluate(
        &self,
        left_chunk: &DataChunk,
        left_row: usize,
        right_chunk: &DataChunk,
        right_row: usize,
    ) -> bool;
}
613
/// [`JoinCondition`] comparing one left column with one right column for
/// equality.
pub struct EqualityCondition {
    /// Column index read from the left chunk.
    left_column: usize,
    /// Column index read from the right chunk.
    right_column: usize,
}
621
622impl EqualityCondition {
623 pub fn new(left_column: usize, right_column: usize) -> Self {
625 Self {
626 left_column,
627 right_column,
628 }
629 }
630}
631
632impl JoinCondition for EqualityCondition {
633 fn evaluate(
634 &self,
635 left_chunk: &DataChunk,
636 left_row: usize,
637 right_chunk: &DataChunk,
638 right_row: usize,
639 ) -> bool {
640 let left_val = left_chunk
641 .column(self.left_column)
642 .and_then(|c| c.get_value(left_row));
643 let right_val = right_chunk
644 .column(self.right_column)
645 .and_then(|c| c.get_value(right_row));
646
647 match (left_val, right_val) {
648 (Some(l), Some(r)) => l == r,
649 _ => false,
650 }
651 }
652}
653
654impl NestedLoopJoinOperator {
655 pub fn new(
657 left: Box<dyn Operator>,
658 right: Box<dyn Operator>,
659 condition: Option<Box<dyn JoinCondition>>,
660 join_type: JoinType,
661 output_schema: Vec<LogicalType>,
662 ) -> Self {
663 Self {
664 left,
665 right,
666 condition,
667 join_type,
668 output_schema,
669 right_chunks: Vec::new(),
670 right_materialized: false,
671 current_left_chunk: None,
672 current_left_row: 0,
673 current_right_chunk: 0,
674 current_right_row: 0,
675 current_left_matched: false,
676 }
677 }
678
679 fn materialize_right(&mut self) -> Result<(), OperatorError> {
681 while let Some(chunk) = self.right.next()? {
682 self.right_chunks.push(chunk);
683 }
684 self.right_materialized = true;
685 Ok(())
686 }
687
688 fn produce_row(
690 &self,
691 builder: &mut DataChunkBuilder,
692 left_chunk: &DataChunk,
693 left_row: usize,
694 right_chunk: &DataChunk,
695 right_row: usize,
696 ) {
697 for col_idx in 0..left_chunk.column_count() {
699 if let (Some(src), Some(dst)) =
700 (left_chunk.column(col_idx), builder.column_mut(col_idx))
701 {
702 if let Some(val) = src.get_value(left_row) {
703 dst.push_value(val);
704 } else {
705 dst.push_value(Value::Null);
706 }
707 }
708 }
709
710 let left_col_count = left_chunk.column_count();
712 for col_idx in 0..right_chunk.column_count() {
713 if let (Some(src), Some(dst)) = (
714 right_chunk.column(col_idx),
715 builder.column_mut(left_col_count + col_idx),
716 ) {
717 if let Some(val) = src.get_value(right_row) {
718 dst.push_value(val);
719 } else {
720 dst.push_value(Value::Null);
721 }
722 }
723 }
724
725 builder.advance_row();
726 }
727
728 fn produce_left_unmatched_row(
730 &self,
731 builder: &mut DataChunkBuilder,
732 left_chunk: &DataChunk,
733 left_row: usize,
734 right_col_count: usize,
735 ) {
736 for col_idx in 0..left_chunk.column_count() {
738 if let (Some(src), Some(dst)) =
739 (left_chunk.column(col_idx), builder.column_mut(col_idx))
740 {
741 if let Some(val) = src.get_value(left_row) {
742 dst.push_value(val);
743 } else {
744 dst.push_value(Value::Null);
745 }
746 }
747 }
748
749 let left_col_count = left_chunk.column_count();
751 for col_idx in 0..right_col_count {
752 if let Some(dst) = builder.column_mut(left_col_count + col_idx) {
753 dst.push_value(Value::Null);
754 }
755 }
756
757 builder.advance_row();
758 }
759}
760
impl Operator for NestedLoopJoinOperator {
    /// Produces the next output chunk of at most 2048 rows, or `Ok(None)` when
    /// the join is exhausted.
    ///
    /// Every selected left row is compared with every selected row of every
    /// right chunk. Pairs accepted by the condition (all pairs when there is no
    /// condition) are emitted. For `Left` joins, a left row without any match is
    /// emitted once with null right columns. No other join type is treated
    /// specially here. The left/right cursors live on `self`, so a call that
    /// fills its chunk resumes at the next pair.
    fn next(&mut self) -> OperatorResult {
        if !self.right_materialized {
            self.materialize_right()?;
        }

        // With no right rows, only a Left join can still produce output.
        if self.right_chunks.is_empty() && !matches!(self.join_type, JoinType::Left) {
            return Ok(None);
        }

        let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);

        loop {
            if self.current_left_chunk.is_none() {
                self.current_left_chunk = self.left.next()?;
                self.current_left_row = 0;
                self.current_right_chunk = 0;
                self.current_right_row = 0;

                // Left input exhausted: flush whatever has been built.
                if self.current_left_chunk.is_none() {
                    return if builder.row_count() > 0 {
                        Ok(Some(builder.finish()))
                    } else {
                        Ok(None)
                    };
                }
            }

            let left_chunk = self.current_left_chunk.as_ref().unwrap();
            let left_rows: Vec<usize> = left_chunk.selected_indices().collect();

            // Width of the null padding for unmatched Left rows; falls back to
            // the output schema when the right side produced no chunks.
            let right_col_count = if !self.right_chunks.is_empty() {
                self.right_chunks[0].column_count()
            } else {
                self.output_schema
                    .len()
                    .saturating_sub(left_chunk.column_count())
            };

            while self.current_left_row < left_rows.len() {
                let left_row = left_rows[self.current_left_row];

                // (0, 0) means this left row has not been compared with anything
                // yet; otherwise we are resuming it after a full output chunk and
                // must keep its matched flag.
                if self.current_right_chunk == 0 && self.current_right_row == 0 {
                    self.current_left_matched = false;
                }

                while self.current_right_chunk < self.right_chunks.len() {
                    let right_chunk = &self.right_chunks[self.current_right_chunk];
                    let right_rows: Vec<usize> = right_chunk.selected_indices().collect();

                    while self.current_right_row < right_rows.len() {
                        let right_row = right_rows[self.current_right_row];

                        let matches = match &self.condition {
                            Some(cond) => {
                                cond.evaluate(left_chunk, left_row, right_chunk, right_row)
                            }
                            // No condition: every pair joins (cross product).
                            None => true,
                        };

                        if matches {
                            self.current_left_matched = true;
                            self.produce_row(
                                &mut builder,
                                left_chunk,
                                left_row,
                                right_chunk,
                                right_row,
                            );

                            // Advance past this pair before returning so the
                            // next call resumes at the following right row.
                            if builder.is_full() {
                                self.current_right_row += 1;
                                return Ok(Some(builder.finish()));
                            }
                        }

                        self.current_right_row += 1;
                    }

                    self.current_right_chunk += 1;
                    self.current_right_row = 0;
                }

                if matches!(self.join_type, JoinType::Left) && !self.current_left_matched {
                    self.produce_left_unmatched_row(
                        &mut builder,
                        left_chunk,
                        left_row,
                        right_col_count,
                    );

                    // This left row is finished; move the cursors to the next one
                    // before handing back the full chunk.
                    if builder.is_full() {
                        self.current_left_row += 1;
                        self.current_right_chunk = 0;
                        self.current_right_row = 0;
                        return Ok(Some(builder.finish()));
                    }
                }

                self.current_left_row += 1;
                self.current_right_chunk = 0;
                self.current_right_row = 0;
            }

            // Current left chunk fully consumed.
            self.current_left_chunk = None;

            if builder.row_count() > 0 {
                return Ok(Some(builder.finish()));
            }
        }
    }

    /// Resets both inputs and discards the materialized right side and all
    /// cursors.
    fn reset(&mut self) {
        self.left.reset();
        self.right.reset();
        self.right_chunks.clear();
        self.right_materialized = false;
        self.current_left_chunk = None;
        self.current_left_row = 0;
        self.current_right_chunk = 0;
        self.current_right_row = 0;
        self.current_left_matched = false;
    }

    fn name(&self) -> &'static str {
        "NestedLoopJoin"
    }
}
903
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Operator that yields a fixed list of chunks once.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds a single-column Int64 chunk from `values`.
    fn create_int_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op` and returns the first two columns of every output row as
    /// nullable integers, sorted.
    fn collect_nullable_pairs(op: &mut dyn Operator) -> Vec<(Option<i64>, Option<i64>)> {
        let mut results = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                results.push((
                    chunk.column(0).unwrap().get_int64(row),
                    chunk.column(1).unwrap().get_int64(row),
                ));
            }
        }
        results.sort();
        results
    }

    #[test]
    fn test_hash_join_inner() {
        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 3, 4, 5])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Inner,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let left_val = chunk.column(0).unwrap().get_int64(row).unwrap();
                let right_val = chunk.column(1).unwrap().get_int64(row).unwrap();
                results.push((left_val, right_val));
            }
        }

        results.sort();
        assert_eq!(results, vec![(2, 2), (3, 3), (4, 4)]);
    }

    #[test]
    fn test_hash_join_left_outer() {
        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 3])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Left,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let left_val = chunk.column(0).unwrap().get_int64(row).unwrap();
                let right_val = chunk.column(1).unwrap().get_int64(row);
                results.push((left_val, right_val));
            }
        }

        results.sort_by_key(|(l, _)| *l);
        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (1, None));
        assert_eq!(results[1], (2, Some(2)));
        assert_eq!(results[2], (3, Some(3)));
    }

    #[test]
    fn test_hash_join_right_outer() {
        let left = MockOperator::new(vec![create_int_chunk(&[1, 2])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 3])]);

        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Right,
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        // Unmatched build row 3 is emitted with a null probe column.
        assert_eq!(
            collect_nullable_pairs(&mut join),
            vec![(None, Some(3)), (Some(2), Some(2))]
        );
    }

    #[test]
    fn test_hash_join_full_outer() {
        let left = MockOperator::new(vec![create_int_chunk(&[1, 2])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 3])]);

        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Full,
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        assert_eq!(
            collect_nullable_pairs(&mut join),
            vec![(None, Some(3)), (Some(1), None), (Some(2), Some(2))]
        );
    }

    #[test]
    fn test_nested_loop_cross_join() {
        let left = MockOperator::new(vec![create_int_chunk(&[1, 2])]);
        let right = MockOperator::new(vec![create_int_chunk(&[10, 20])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = NestedLoopJoinOperator::new(
            Box::new(left),
            Box::new(right),
            None,
            JoinType::Cross,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let left_val = chunk.column(0).unwrap().get_int64(row).unwrap();
                let right_val = chunk.column(1).unwrap().get_int64(row).unwrap();
                results.push((left_val, right_val));
            }
        }

        results.sort();
        assert_eq!(results, vec![(1, 10), (1, 20), (2, 10), (2, 20)]);
    }

    #[test]
    fn test_nested_loop_left_join_with_condition() {
        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 3])]);
        let condition: Box<dyn JoinCondition> = Box::new(EqualityCondition::new(0, 0));

        let mut join = NestedLoopJoinOperator::new(
            Box::new(left),
            Box::new(right),
            Some(condition),
            JoinType::Left,
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        assert_eq!(
            collect_nullable_pairs(&mut join),
            vec![(Some(1), None), (Some(2), Some(2)), (Some(3), Some(3))]
        );
    }

    #[test]
    fn test_hash_join_semi() {
        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 4])]);

        let output_schema = vec![LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Semi,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let val = chunk.column(0).unwrap().get_int64(row).unwrap();
                results.push(val);
            }
        }

        results.sort();
        assert_eq!(results, vec![2, 4]);
    }

    #[test]
    fn test_hash_join_anti() {
        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 4])]);

        let output_schema = vec![LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Anti,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let val = chunk.column(0).unwrap().get_int64(row).unwrap();
                results.push(val);
            }
        }

        results.sort();
        assert_eq!(results, vec![1, 3]);
    }
}