1use std::collections::HashMap;
8
9use grafeo_common::types::{LogicalType, Value};
10
11use super::{Operator, OperatorError, OperatorResult};
12use crate::execution::chunk::DataChunkBuilder;
13use crate::execution::{DataChunk, ValueVector};
14
/// The kind of join an operator performs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    /// Emit only pairs of rows whose keys match.
    Inner,
    /// Emit every left (probe) row; rows without a match are padded with NULLs.
    Left,
    /// Emit every right (build) row; rows without a match are padded with NULLs.
    Right,
    /// Emit every row of both inputs, padding the unmatched side with NULLs.
    Full,
    /// Emit every pairing of left and right rows.
    Cross,
    /// Emit left (probe) rows that have at least one match, left columns only.
    Semi,
    /// Emit left (probe) rows that have no match, left columns only.
    Anti,
}
33
/// Hashable representation of a join key, derived from a `Value`.
///
/// Variant order is significant for the derived `Hash` impl; do not reorder.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum HashKey {
    /// A NULL or missing value.
    Null,
    /// A boolean value.
    Bool(bool),
    /// A 64-bit integer. `from_value` also stores float bit patterns and
    /// timestamps here.
    Int64(i64),
    /// A string. `from_value` also stores the `Debug` rendering of byte
    /// strings here.
    String(String),
    /// An ordered sequence of keys: list/vector elements, map entries, or the
    /// per-column keys of a multi-column join key.
    Composite(Vec<HashKey>),
}
48
49impl HashKey {
50 pub fn from_value(value: &Value) -> Self {
52 match value {
53 Value::Null => HashKey::Null,
54 Value::Bool(b) => HashKey::Bool(*b),
55 Value::Int64(i) => HashKey::Int64(*i),
56 Value::Float64(f) => {
57 HashKey::Int64(f.to_bits() as i64)
59 }
60 Value::String(s) => HashKey::String(s.to_string()),
61 Value::Bytes(b) => {
62 HashKey::String(format!("{b:?}"))
64 }
65 Value::Timestamp(t) => HashKey::Int64(t.as_micros()),
66 Value::List(items) => {
67 HashKey::Composite(items.iter().map(HashKey::from_value).collect())
68 }
69 Value::Map(map) => {
70 let mut keys: Vec<_> = map
71 .iter()
72 .map(|(k, v)| {
73 HashKey::Composite(vec![
74 HashKey::String(k.to_string()),
75 HashKey::from_value(v),
76 ])
77 })
78 .collect();
79 keys.sort_by(|a, b| format!("{a:?}").cmp(&format!("{b:?}")));
80 HashKey::Composite(keys)
81 }
82 Value::Vector(v) => {
83 HashKey::Composite(
85 v.iter()
86 .map(|f| HashKey::Int64(f.to_bits() as i64))
87 .collect(),
88 )
89 }
90 }
91 }
92
93 pub fn from_column(column: &ValueVector, row: usize) -> Option<Self> {
95 column.get_value(row).map(|v| Self::from_value(&v))
96 }
97}
98
99pub struct HashJoinOperator {
104 probe_side: Box<dyn Operator>,
106 build_side: Box<dyn Operator>,
108 probe_keys: Vec<usize>,
110 build_keys: Vec<usize>,
112 join_type: JoinType,
114 output_schema: Vec<LogicalType>,
116 hash_table: HashMap<HashKey, Vec<(usize, usize)>>,
118 build_chunks: Vec<DataChunk>,
120 build_complete: bool,
122 current_probe_chunk: Option<DataChunk>,
124 current_probe_row: usize,
126 current_match_position: usize,
128 current_matches: Vec<(usize, usize)>,
130 probe_matched: Vec<bool>,
132 build_matched: Vec<Vec<bool>>,
134 emitting_unmatched: bool,
136 unmatched_chunk_idx: usize,
138 unmatched_row_idx: usize,
140}
141
142impl HashJoinOperator {
143 pub fn new(
153 probe_side: Box<dyn Operator>,
154 build_side: Box<dyn Operator>,
155 probe_keys: Vec<usize>,
156 build_keys: Vec<usize>,
157 join_type: JoinType,
158 output_schema: Vec<LogicalType>,
159 ) -> Self {
160 Self {
161 probe_side,
162 build_side,
163 probe_keys,
164 build_keys,
165 join_type,
166 output_schema,
167 hash_table: HashMap::new(),
168 build_chunks: Vec::new(),
169 build_complete: false,
170 current_probe_chunk: None,
171 current_probe_row: 0,
172 current_match_position: 0,
173 current_matches: Vec::new(),
174 probe_matched: Vec::new(),
175 build_matched: Vec::new(),
176 emitting_unmatched: false,
177 unmatched_chunk_idx: 0,
178 unmatched_row_idx: 0,
179 }
180 }
181
182 fn build_hash_table(&mut self) -> Result<(), OperatorError> {
184 while let Some(chunk) = self.build_side.next()? {
185 let chunk_idx = self.build_chunks.len();
186
187 if matches!(self.join_type, JoinType::Right | JoinType::Full) {
189 self.build_matched.push(vec![false; chunk.row_count()]);
190 }
191
192 for row in chunk.selected_indices() {
194 let key = self.extract_key(&chunk, row, &self.build_keys)?;
195
196 if matches!(key, HashKey::Null)
198 && !matches!(
199 self.join_type,
200 JoinType::Left | JoinType::Right | JoinType::Full
201 )
202 {
203 continue;
204 }
205
206 self.hash_table
207 .entry(key)
208 .or_default()
209 .push((chunk_idx, row));
210 }
211
212 self.build_chunks.push(chunk);
213 }
214
215 self.build_complete = true;
216 Ok(())
217 }
218
219 fn extract_key(
221 &self,
222 chunk: &DataChunk,
223 row: usize,
224 key_columns: &[usize],
225 ) -> Result<HashKey, OperatorError> {
226 if key_columns.len() == 1 {
227 let col = chunk.column(key_columns[0]).ok_or_else(|| {
228 OperatorError::ColumnNotFound(format!("column {}", key_columns[0]))
229 })?;
230 Ok(HashKey::from_column(col, row).unwrap_or(HashKey::Null))
231 } else {
232 let keys: Vec<HashKey> = key_columns
233 .iter()
234 .map(|&col_idx| {
235 chunk
236 .column(col_idx)
237 .and_then(|col| HashKey::from_column(col, row))
238 .unwrap_or(HashKey::Null)
239 })
240 .collect();
241 Ok(HashKey::Composite(keys))
242 }
243 }
244
245 fn produce_output_row(
247 &self,
248 builder: &mut DataChunkBuilder,
249 probe_chunk: &DataChunk,
250 probe_row: usize,
251 build_chunk: Option<&DataChunk>,
252 build_row: Option<usize>,
253 ) -> Result<(), OperatorError> {
254 let probe_col_count = probe_chunk.column_count();
255
256 for col_idx in 0..probe_col_count {
258 let src_col = probe_chunk
259 .column(col_idx)
260 .ok_or_else(|| OperatorError::ColumnNotFound(format!("probe column {col_idx}")))?;
261 let dst_col = builder
262 .column_mut(col_idx)
263 .ok_or_else(|| OperatorError::ColumnNotFound(format!("output column {col_idx}")))?;
264
265 if let Some(value) = src_col.get_value(probe_row) {
266 dst_col.push_value(value);
267 } else {
268 dst_col.push_value(Value::Null);
269 }
270 }
271
272 match (build_chunk, build_row) {
274 (Some(chunk), Some(row)) => {
275 for col_idx in 0..chunk.column_count() {
276 let src_col = chunk.column(col_idx).ok_or_else(|| {
277 OperatorError::ColumnNotFound(format!("build column {col_idx}"))
278 })?;
279 let dst_col =
280 builder
281 .column_mut(probe_col_count + col_idx)
282 .ok_or_else(|| {
283 OperatorError::ColumnNotFound(format!(
284 "output column {}",
285 probe_col_count + col_idx
286 ))
287 })?;
288
289 if let Some(value) = src_col.get_value(row) {
290 dst_col.push_value(value);
291 } else {
292 dst_col.push_value(Value::Null);
293 }
294 }
295 }
296 _ => {
297 if !self.build_chunks.is_empty() {
299 let build_col_count = self.build_chunks[0].column_count();
300 for col_idx in 0..build_col_count {
301 let dst_col =
302 builder
303 .column_mut(probe_col_count + col_idx)
304 .ok_or_else(|| {
305 OperatorError::ColumnNotFound(format!(
306 "output column {}",
307 probe_col_count + col_idx
308 ))
309 })?;
310 dst_col.push_value(Value::Null);
311 }
312 }
313 }
314 }
315
316 builder.advance_row();
317 Ok(())
318 }
319
320 fn get_next_probe_chunk(&mut self) -> Result<bool, OperatorError> {
322 let chunk = self.probe_side.next()?;
323 if let Some(ref c) = chunk {
324 if matches!(self.join_type, JoinType::Left | JoinType::Full) {
326 self.probe_matched = vec![false; c.row_count()];
327 }
328 }
329 let has_chunk = chunk.is_some();
330 self.current_probe_chunk = chunk;
331 self.current_probe_row = 0;
332 Ok(has_chunk)
333 }
334
335 fn emit_unmatched_build(&mut self) -> OperatorResult {
337 if self.build_matched.is_empty() {
338 return Ok(None);
339 }
340
341 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
342
343 let probe_col_count = if !self.build_chunks.is_empty() {
345 self.output_schema.len() - self.build_chunks[0].column_count()
346 } else {
347 0
348 };
349
350 while self.unmatched_chunk_idx < self.build_chunks.len() {
351 let chunk = &self.build_chunks[self.unmatched_chunk_idx];
352 let matched = &self.build_matched[self.unmatched_chunk_idx];
353
354 while self.unmatched_row_idx < matched.len() {
355 if !matched[self.unmatched_row_idx] {
356 for col_idx in 0..probe_col_count {
360 if let Some(dst_col) = builder.column_mut(col_idx) {
361 dst_col.push_value(Value::Null);
362 }
363 }
364
365 for col_idx in 0..chunk.column_count() {
367 if let (Some(src_col), Some(dst_col)) = (
368 chunk.column(col_idx),
369 builder.column_mut(probe_col_count + col_idx),
370 ) {
371 if let Some(value) = src_col.get_value(self.unmatched_row_idx) {
372 dst_col.push_value(value);
373 } else {
374 dst_col.push_value(Value::Null);
375 }
376 }
377 }
378
379 builder.advance_row();
380
381 if builder.is_full() {
382 self.unmatched_row_idx += 1;
383 return Ok(Some(builder.finish()));
384 }
385 }
386
387 self.unmatched_row_idx += 1;
388 }
389
390 self.unmatched_chunk_idx += 1;
391 self.unmatched_row_idx = 0;
392 }
393
394 if builder.row_count() > 0 {
395 Ok(Some(builder.finish()))
396 } else {
397 Ok(None)
398 }
399 }
400}
401
402impl Operator for HashJoinOperator {
403 fn next(&mut self) -> OperatorResult {
404 if !self.build_complete {
406 self.build_hash_table()?;
407 }
408
409 if self.emitting_unmatched {
411 return self.emit_unmatched_build();
412 }
413
414 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
416
417 loop {
418 if self.current_probe_chunk.is_none() {
420 if !self.get_next_probe_chunk()? {
421 if matches!(self.join_type, JoinType::Right | JoinType::Full) {
423 self.emitting_unmatched = true;
424 return self.emit_unmatched_build();
425 }
426 return if builder.row_count() > 0 {
427 Ok(Some(builder.finish()))
428 } else {
429 Ok(None)
430 };
431 }
432 }
433
434 let probe_chunk = self
437 .current_probe_chunk
438 .as_ref()
439 .expect("probe chunk is Some: guard at line 396 ensures this");
440 let probe_rows: Vec<usize> = probe_chunk.selected_indices().collect();
441
442 while self.current_probe_row < probe_rows.len() {
443 let probe_row = probe_rows[self.current_probe_row];
444
445 if self.current_matches.is_empty() && self.current_match_position == 0 {
447 let key = self.extract_key(probe_chunk, probe_row, &self.probe_keys)?;
448
449 match self.join_type {
451 JoinType::Semi => {
452 if self.hash_table.contains_key(&key) {
453 for col_idx in 0..probe_chunk.column_count() {
455 if let (Some(src_col), Some(dst_col)) =
456 (probe_chunk.column(col_idx), builder.column_mut(col_idx))
457 {
458 if let Some(value) = src_col.get_value(probe_row) {
459 dst_col.push_value(value);
460 }
461 }
462 }
463 builder.advance_row();
464 }
465 self.current_probe_row += 1;
466 continue;
467 }
468 JoinType::Anti => {
469 if !self.hash_table.contains_key(&key) {
470 for col_idx in 0..probe_chunk.column_count() {
472 if let (Some(src_col), Some(dst_col)) =
473 (probe_chunk.column(col_idx), builder.column_mut(col_idx))
474 {
475 if let Some(value) = src_col.get_value(probe_row) {
476 dst_col.push_value(value);
477 }
478 }
479 }
480 builder.advance_row();
481 }
482 self.current_probe_row += 1;
483 continue;
484 }
485 _ => {
486 self.current_matches =
487 self.hash_table.get(&key).cloned().unwrap_or_default();
488 }
489 }
490 }
491
492 if self.current_matches.is_empty() {
494 if matches!(self.join_type, JoinType::Left | JoinType::Full) {
496 self.produce_output_row(&mut builder, probe_chunk, probe_row, None, None)?;
497 }
498 self.current_probe_row += 1;
499 self.current_match_position = 0;
500 } else {
501 while self.current_match_position < self.current_matches.len() {
503 let (build_chunk_idx, build_row) =
504 self.current_matches[self.current_match_position];
505 let build_chunk = &self.build_chunks[build_chunk_idx];
506
507 if matches!(self.join_type, JoinType::Left | JoinType::Full) {
509 if probe_row < self.probe_matched.len() {
510 self.probe_matched[probe_row] = true;
511 }
512 }
513 if matches!(self.join_type, JoinType::Right | JoinType::Full) {
514 if build_chunk_idx < self.build_matched.len()
515 && build_row < self.build_matched[build_chunk_idx].len()
516 {
517 self.build_matched[build_chunk_idx][build_row] = true;
518 }
519 }
520
521 self.produce_output_row(
522 &mut builder,
523 probe_chunk,
524 probe_row,
525 Some(build_chunk),
526 Some(build_row),
527 )?;
528
529 self.current_match_position += 1;
530
531 if builder.is_full() {
532 return Ok(Some(builder.finish()));
533 }
534 }
535
536 self.current_probe_row += 1;
538 self.current_matches.clear();
539 self.current_match_position = 0;
540 }
541
542 if builder.is_full() {
543 return Ok(Some(builder.finish()));
544 }
545 }
546
547 self.current_probe_chunk = None;
549 self.current_probe_row = 0;
550
551 if builder.row_count() > 0 {
552 return Ok(Some(builder.finish()));
553 }
554 }
555 }
556
557 fn reset(&mut self) {
558 self.probe_side.reset();
559 self.build_side.reset();
560 self.hash_table.clear();
561 self.build_chunks.clear();
562 self.build_complete = false;
563 self.current_probe_chunk = None;
564 self.current_probe_row = 0;
565 self.current_match_position = 0;
566 self.current_matches.clear();
567 self.probe_matched.clear();
568 self.build_matched.clear();
569 self.emitting_unmatched = false;
570 self.unmatched_chunk_idx = 0;
571 self.unmatched_row_idx = 0;
572 }
573
574 fn name(&self) -> &'static str {
575 "HashJoin"
576 }
577}
578
579pub struct NestedLoopJoinOperator {
584 left: Box<dyn Operator>,
586 right: Box<dyn Operator>,
588 condition: Option<Box<dyn JoinCondition>>,
590 join_type: JoinType,
592 output_schema: Vec<LogicalType>,
594 right_chunks: Vec<DataChunk>,
596 right_materialized: bool,
598 current_left_chunk: Option<DataChunk>,
600 current_left_row: usize,
602 current_right_chunk: usize,
604 current_left_matched: bool,
606 current_right_row: usize,
608}
609
610pub trait JoinCondition: Send + Sync {
612 fn evaluate(
614 &self,
615 left_chunk: &DataChunk,
616 left_row: usize,
617 right_chunk: &DataChunk,
618 right_row: usize,
619 ) -> bool;
620}
621
/// Join condition comparing one left column with one right column for equality.
pub struct EqualityCondition {
    /// Column index read from the left chunk.
    left_column: usize,
    /// Column index read from the right chunk.
    right_column: usize,
}
629
630impl EqualityCondition {
631 pub fn new(left_column: usize, right_column: usize) -> Self {
633 Self {
634 left_column,
635 right_column,
636 }
637 }
638}
639
640impl JoinCondition for EqualityCondition {
641 fn evaluate(
642 &self,
643 left_chunk: &DataChunk,
644 left_row: usize,
645 right_chunk: &DataChunk,
646 right_row: usize,
647 ) -> bool {
648 let left_val = left_chunk
649 .column(self.left_column)
650 .and_then(|c| c.get_value(left_row));
651 let right_val = right_chunk
652 .column(self.right_column)
653 .and_then(|c| c.get_value(right_row));
654
655 match (left_val, right_val) {
656 (Some(l), Some(r)) => l == r,
657 _ => false,
658 }
659 }
660}
661
662impl NestedLoopJoinOperator {
663 pub fn new(
665 left: Box<dyn Operator>,
666 right: Box<dyn Operator>,
667 condition: Option<Box<dyn JoinCondition>>,
668 join_type: JoinType,
669 output_schema: Vec<LogicalType>,
670 ) -> Self {
671 Self {
672 left,
673 right,
674 condition,
675 join_type,
676 output_schema,
677 right_chunks: Vec::new(),
678 right_materialized: false,
679 current_left_chunk: None,
680 current_left_row: 0,
681 current_right_chunk: 0,
682 current_right_row: 0,
683 current_left_matched: false,
684 }
685 }
686
687 fn materialize_right(&mut self) -> Result<(), OperatorError> {
689 while let Some(chunk) = self.right.next()? {
690 self.right_chunks.push(chunk);
691 }
692 self.right_materialized = true;
693 Ok(())
694 }
695
696 fn produce_row(
698 &self,
699 builder: &mut DataChunkBuilder,
700 left_chunk: &DataChunk,
701 left_row: usize,
702 right_chunk: &DataChunk,
703 right_row: usize,
704 ) {
705 for col_idx in 0..left_chunk.column_count() {
707 if let (Some(src), Some(dst)) =
708 (left_chunk.column(col_idx), builder.column_mut(col_idx))
709 {
710 if let Some(val) = src.get_value(left_row) {
711 dst.push_value(val);
712 } else {
713 dst.push_value(Value::Null);
714 }
715 }
716 }
717
718 let left_col_count = left_chunk.column_count();
720 for col_idx in 0..right_chunk.column_count() {
721 if let (Some(src), Some(dst)) = (
722 right_chunk.column(col_idx),
723 builder.column_mut(left_col_count + col_idx),
724 ) {
725 if let Some(val) = src.get_value(right_row) {
726 dst.push_value(val);
727 } else {
728 dst.push_value(Value::Null);
729 }
730 }
731 }
732
733 builder.advance_row();
734 }
735
736 fn produce_left_unmatched_row(
738 &self,
739 builder: &mut DataChunkBuilder,
740 left_chunk: &DataChunk,
741 left_row: usize,
742 right_col_count: usize,
743 ) {
744 for col_idx in 0..left_chunk.column_count() {
746 if let (Some(src), Some(dst)) =
747 (left_chunk.column(col_idx), builder.column_mut(col_idx))
748 {
749 if let Some(val) = src.get_value(left_row) {
750 dst.push_value(val);
751 } else {
752 dst.push_value(Value::Null);
753 }
754 }
755 }
756
757 let left_col_count = left_chunk.column_count();
759 for col_idx in 0..right_col_count {
760 if let Some(dst) = builder.column_mut(left_col_count + col_idx) {
761 dst.push_value(Value::Null);
762 }
763 }
764
765 builder.advance_row();
766 }
767}
768
769impl Operator for NestedLoopJoinOperator {
770 fn next(&mut self) -> OperatorResult {
771 if !self.right_materialized {
773 self.materialize_right()?;
774 }
775
776 if self.right_chunks.is_empty() && !matches!(self.join_type, JoinType::Left) {
778 return Ok(None);
779 }
780
781 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
782
783 loop {
784 if self.current_left_chunk.is_none() {
786 self.current_left_chunk = self.left.next()?;
787 self.current_left_row = 0;
788 self.current_right_chunk = 0;
789 self.current_right_row = 0;
790
791 if self.current_left_chunk.is_none() {
792 return if builder.row_count() > 0 {
794 Ok(Some(builder.finish()))
795 } else {
796 Ok(None)
797 };
798 }
799 }
800
801 let left_chunk = self.current_left_chunk.as_ref().unwrap();
802 let left_rows: Vec<usize> = left_chunk.selected_indices().collect();
803
804 let right_col_count = if !self.right_chunks.is_empty() {
806 self.right_chunks[0].column_count()
807 } else {
808 self.output_schema
810 .len()
811 .saturating_sub(left_chunk.column_count())
812 };
813
814 while self.current_left_row < left_rows.len() {
816 let left_row = left_rows[self.current_left_row];
817
818 if self.current_right_chunk == 0 && self.current_right_row == 0 {
820 self.current_left_matched = false;
821 }
822
823 while self.current_right_chunk < self.right_chunks.len() {
825 let right_chunk = &self.right_chunks[self.current_right_chunk];
826 let right_rows: Vec<usize> = right_chunk.selected_indices().collect();
827
828 while self.current_right_row < right_rows.len() {
829 let right_row = right_rows[self.current_right_row];
830
831 let matches = match &self.condition {
833 Some(cond) => {
834 cond.evaluate(left_chunk, left_row, right_chunk, right_row)
835 }
836 None => true, };
838
839 if matches {
840 self.current_left_matched = true;
841 self.produce_row(
842 &mut builder,
843 left_chunk,
844 left_row,
845 right_chunk,
846 right_row,
847 );
848
849 if builder.is_full() {
850 self.current_right_row += 1;
851 return Ok(Some(builder.finish()));
852 }
853 }
854
855 self.current_right_row += 1;
856 }
857
858 self.current_right_chunk += 1;
859 self.current_right_row = 0;
860 }
861
862 if matches!(self.join_type, JoinType::Left) && !self.current_left_matched {
865 self.produce_left_unmatched_row(
866 &mut builder,
867 left_chunk,
868 left_row,
869 right_col_count,
870 );
871
872 if builder.is_full() {
873 self.current_left_row += 1;
874 self.current_right_chunk = 0;
875 self.current_right_row = 0;
876 return Ok(Some(builder.finish()));
877 }
878 }
879
880 self.current_left_row += 1;
882 self.current_right_chunk = 0;
883 self.current_right_row = 0;
884 }
885
886 self.current_left_chunk = None;
888
889 if builder.row_count() > 0 {
890 return Ok(Some(builder.finish()));
891 }
892 }
893 }
894
895 fn reset(&mut self) {
896 self.left.reset();
897 self.right.reset();
898 self.right_chunks.clear();
899 self.right_materialized = false;
900 self.current_left_chunk = None;
901 self.current_left_row = 0;
902 self.current_right_chunk = 0;
903 self.current_right_row = 0;
904 self.current_left_matched = false;
905 }
906
907 fn name(&self) -> &'static str {
908 "NestedLoopJoin"
909 }
910}
911
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test input that hands out a fixed list of chunks, each exactly once.
    ///
    /// Emitted chunks are swapped for `DataChunk::empty()`, so after `reset`
    /// the replay yields empty chunks rather than the original data.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            match self.chunks.get_mut(self.position) {
                Some(slot) => {
                    let chunk = std::mem::replace(slot, DataChunk::empty());
                    self.position += 1;
                    Ok(Some(chunk))
                }
                None => Ok(None),
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds a single-column `Int64` chunk holding `values` in order.
    fn create_int_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        values.iter().for_each(|&v| {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        });
        builder.finish()
    }

    /// Wraps a single integer chunk in a boxed [`MockOperator`].
    fn int_source(values: &[i64]) -> Box<dyn Operator> {
        Box::new(MockOperator::new(vec![create_int_chunk(values)]))
    }

    /// Drains `op`, mapping every selected row of every chunk through `read`.
    fn collect_rows<T>(
        op: &mut dyn Operator,
        mut read: impl FnMut(&DataChunk, usize) -> T,
    ) -> Vec<T> {
        let mut rows = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                rows.push(read(&chunk, row));
            }
        }
        rows
    }

    /// Reads an `Int64` cell that is expected to be present.
    fn int_at(chunk: &DataChunk, col: usize, row: usize) -> i64 {
        chunk.column(col).unwrap().get_int64(row).unwrap()
    }

    #[test]
    fn test_hash_join_inner() {
        let mut join = HashJoinOperator::new(
            int_source(&[1, 2, 3, 4]),
            int_source(&[2, 3, 4, 5]),
            vec![0],
            vec![0],
            JoinType::Inner,
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let mut pairs = collect_rows(&mut join, |chunk, row| {
            (int_at(chunk, 0, row), int_at(chunk, 1, row))
        });
        pairs.sort();
        assert_eq!(pairs, vec![(2, 2), (3, 3), (4, 4)]);
    }

    #[test]
    fn test_hash_join_left_outer() {
        let mut join = HashJoinOperator::new(
            int_source(&[1, 2, 3]),
            int_source(&[2, 3]),
            vec![0],
            vec![0],
            JoinType::Left,
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let mut pairs = collect_rows(&mut join, |chunk, row| {
            (int_at(chunk, 0, row), chunk.column(1).unwrap().get_int64(row))
        });
        pairs.sort_by_key(|(l, _)| *l);
        assert_eq!(pairs.len(), 3);
        assert_eq!(pairs[0], (1, None));
        assert_eq!(pairs[1], (2, Some(2)));
        assert_eq!(pairs[2], (3, Some(3)));
    }

    #[test]
    fn test_nested_loop_cross_join() {
        let mut join = NestedLoopJoinOperator::new(
            int_source(&[1, 2]),
            int_source(&[10, 20]),
            None,
            JoinType::Cross,
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let mut pairs = collect_rows(&mut join, |chunk, row| {
            (int_at(chunk, 0, row), int_at(chunk, 1, row))
        });
        pairs.sort();
        assert_eq!(pairs, vec![(1, 10), (1, 20), (2, 10), (2, 20)]);
    }

    #[test]
    fn test_hash_join_semi() {
        // Semi joins emit probe columns only.
        let mut join = HashJoinOperator::new(
            int_source(&[1, 2, 3, 4]),
            int_source(&[2, 4]),
            vec![0],
            vec![0],
            JoinType::Semi,
            vec![LogicalType::Int64],
        );

        let mut values = collect_rows(&mut join, |chunk, row| int_at(chunk, 0, row));
        values.sort();
        assert_eq!(values, vec![2, 4]);
    }

    #[test]
    fn test_hash_join_anti() {
        let mut join = HashJoinOperator::new(
            int_source(&[1, 2, 3, 4]),
            int_source(&[2, 4]),
            vec![0],
            vec![0],
            JoinType::Anti,
            vec![LogicalType::Int64],
        );

        let mut values = collect_rows(&mut join, |chunk, row| int_at(chunk, 0, row));
        values.sort();
        assert_eq!(values, vec![1, 3]);
    }
}