1use std::collections::HashMap;
8
9use grafeo_common::types::{LogicalType, Value};
10
11use super::{Operator, OperatorError, OperatorResult};
12use crate::execution::chunk::DataChunkBuilder;
13use crate::execution::{DataChunk, ValueVector};
14
/// The kind of join performed by [`HashJoinOperator`] and
/// [`NestedLoopJoinOperator`].
///
/// Not every operator necessarily supports every variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    /// Only pairs of rows that match.
    Inner,
    /// Matching pairs, plus every unmatched left (probe) row padded with NULLs.
    Left,
    /// Matching pairs, plus every unmatched right (build) row padded with NULLs.
    Right,
    /// Matching pairs, plus unmatched rows from both sides padded with NULLs.
    Full,
    /// Every pair of rows (Cartesian product).
    Cross,
    /// Left (probe) rows that have at least one match; only their columns are emitted.
    Semi,
    /// Left (probe) rows that have no match; only their columns are emitted.
    Anti,
}
33
/// A hashable, equality-comparable projection of a [`Value`], used as the key
/// of join hash tables.
///
/// See [`HashKey::from_value`] for how each `Value` variant is mapped. The
/// mapping is not injective across types (e.g. floats, temporal values and
/// integers all become `Int64`), so values of different types can produce
/// equal keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum HashKey {
    /// A NULL value (or a missing column / value during key extraction).
    Null,
    /// A boolean value.
    Bool(bool),
    /// Any value encoded as a 64-bit integer (integers, float bit patterns,
    /// temporal values).
    Int64(i64),
    /// String data; also used for the debug rendering of byte strings.
    String(String),
    /// An ordered sequence of keys: multi-column keys and nested values
    /// (lists, maps, durations, vectors, paths).
    Composite(Vec<HashKey>),
}
48
49impl HashKey {
50 pub fn from_value(value: &Value) -> Self {
52 match value {
53 Value::Null => HashKey::Null,
54 Value::Bool(b) => HashKey::Bool(*b),
55 Value::Int64(i) => HashKey::Int64(*i),
56 Value::Float64(f) => {
57 HashKey::Int64(f.to_bits() as i64)
59 }
60 Value::String(s) => HashKey::String(s.to_string()),
61 Value::Bytes(b) => {
62 HashKey::String(format!("{b:?}"))
64 }
65 Value::Timestamp(t) => HashKey::Int64(t.as_micros()),
66 Value::Date(d) => HashKey::Int64(d.as_days() as i64),
67 Value::Time(t) => HashKey::Int64(t.as_nanos() as i64),
68 Value::Duration(d) => HashKey::Composite(vec![
69 HashKey::Int64(d.months()),
70 HashKey::Int64(d.days()),
71 HashKey::Int64(d.nanos()),
72 ]),
73 Value::ZonedDatetime(zdt) => HashKey::Int64(zdt.as_timestamp().as_micros()),
74 Value::List(items) => {
75 HashKey::Composite(items.iter().map(HashKey::from_value).collect())
76 }
77 Value::Map(map) => {
78 let mut keys: Vec<_> = map
79 .iter()
80 .map(|(k, v)| {
81 HashKey::Composite(vec![
82 HashKey::String(k.to_string()),
83 HashKey::from_value(v),
84 ])
85 })
86 .collect();
87 keys.sort_by(|a, b| format!("{a:?}").cmp(&format!("{b:?}")));
88 HashKey::Composite(keys)
89 }
90 Value::Vector(v) => {
91 HashKey::Composite(
93 v.iter()
94 .map(|f| HashKey::Int64(f.to_bits() as i64))
95 .collect(),
96 )
97 }
98 Value::Path { nodes, edges } => {
99 let mut parts: Vec<_> = nodes.iter().map(HashKey::from_value).collect();
100 parts.extend(edges.iter().map(HashKey::from_value));
101 HashKey::Composite(parts)
102 }
103 }
104 }
105
106 pub fn from_column(column: &ValueVector, row: usize) -> Option<Self> {
108 column.get_value(row).map(|v| Self::from_value(&v))
109 }
110}
111
/// Equi-join operator that materializes the build side into a hash table and
/// streams the probe side against it.
///
/// Output rows consist of the probe columns followed by the build columns
/// (only the probe columns for `Semi`/`Anti` joins).
pub struct HashJoinOperator {
    /// Streaming input whose rows are looked up in the hash table.
    probe_side: Box<dyn Operator>,
    /// Input consumed into `build_chunks` / `hash_table` on the first `next()` call.
    build_side: Box<dyn Operator>,
    /// Column indices forming the join key on the probe side.
    probe_keys: Vec<usize>,
    /// Column indices forming the join key on the build side.
    build_keys: Vec<usize>,
    /// Join semantics to apply.
    join_type: JoinType,
    /// Types of the output columns (probe columns, then build columns).
    output_schema: Vec<LogicalType>,
    /// Maps a join key to `(build chunk index, row index)` of every indexed build row.
    hash_table: HashMap<HashKey, Vec<(usize, usize)>>,
    /// Materialized build chunks, addressed by the chunk index stored in `hash_table`.
    build_chunks: Vec<DataChunk>,
    /// Whether the build side has been fully consumed.
    build_complete: bool,
    /// Probe chunk currently being processed, if any.
    current_probe_chunk: Option<DataChunk>,
    /// Position within the current probe chunk's selected rows (not a physical row index).
    current_probe_row: usize,
    /// Next entry of `current_matches` to emit; lets output resume when a chunk fills mid-match.
    current_match_position: usize,
    /// Build rows matching the current probe row.
    current_matches: Vec<(usize, usize)>,
    /// Per-row match flags for the current probe chunk (Left/Full only).
    /// Written during probing but not read elsewhere in this file.
    probe_matched: Vec<bool>,
    /// Per-chunk, per-row match flags for build rows (Right/Full only).
    build_matched: Vec<Vec<bool>>,
    /// Set once the probe side is exhausted and unmatched build rows are being emitted.
    emitting_unmatched: bool,
    /// Resume position (chunk) while emitting unmatched build rows.
    unmatched_chunk_idx: usize,
    /// Resume position (row) while emitting unmatched build rows.
    unmatched_row_idx: usize,
}
154
155impl HashJoinOperator {
156 pub fn new(
166 probe_side: Box<dyn Operator>,
167 build_side: Box<dyn Operator>,
168 probe_keys: Vec<usize>,
169 build_keys: Vec<usize>,
170 join_type: JoinType,
171 output_schema: Vec<LogicalType>,
172 ) -> Self {
173 Self {
174 probe_side,
175 build_side,
176 probe_keys,
177 build_keys,
178 join_type,
179 output_schema,
180 hash_table: HashMap::new(),
181 build_chunks: Vec::new(),
182 build_complete: false,
183 current_probe_chunk: None,
184 current_probe_row: 0,
185 current_match_position: 0,
186 current_matches: Vec::new(),
187 probe_matched: Vec::new(),
188 build_matched: Vec::new(),
189 emitting_unmatched: false,
190 unmatched_chunk_idx: 0,
191 unmatched_row_idx: 0,
192 }
193 }
194
195 fn build_hash_table(&mut self) -> Result<(), OperatorError> {
197 while let Some(chunk) = self.build_side.next()? {
198 let chunk_idx = self.build_chunks.len();
199
200 if matches!(self.join_type, JoinType::Right | JoinType::Full) {
202 self.build_matched.push(vec![false; chunk.row_count()]);
203 }
204
205 for row in chunk.selected_indices() {
207 let key = self.extract_key(&chunk, row, &self.build_keys)?;
208
209 if matches!(key, HashKey::Null)
211 && !matches!(
212 self.join_type,
213 JoinType::Left | JoinType::Right | JoinType::Full
214 )
215 {
216 continue;
217 }
218
219 self.hash_table
220 .entry(key)
221 .or_default()
222 .push((chunk_idx, row));
223 }
224
225 self.build_chunks.push(chunk);
226 }
227
228 self.build_complete = true;
229 Ok(())
230 }
231
232 fn extract_key(
234 &self,
235 chunk: &DataChunk,
236 row: usize,
237 key_columns: &[usize],
238 ) -> Result<HashKey, OperatorError> {
239 if key_columns.len() == 1 {
240 let col = chunk.column(key_columns[0]).ok_or_else(|| {
241 OperatorError::ColumnNotFound(format!("column {}", key_columns[0]))
242 })?;
243 Ok(HashKey::from_column(col, row).unwrap_or(HashKey::Null))
244 } else {
245 let keys: Vec<HashKey> = key_columns
246 .iter()
247 .map(|&col_idx| {
248 chunk
249 .column(col_idx)
250 .and_then(|col| HashKey::from_column(col, row))
251 .unwrap_or(HashKey::Null)
252 })
253 .collect();
254 Ok(HashKey::Composite(keys))
255 }
256 }
257
258 fn produce_output_row(
260 &self,
261 builder: &mut DataChunkBuilder,
262 probe_chunk: &DataChunk,
263 probe_row: usize,
264 build_chunk: Option<&DataChunk>,
265 build_row: Option<usize>,
266 ) -> Result<(), OperatorError> {
267 let probe_col_count = probe_chunk.column_count();
268
269 for col_idx in 0..probe_col_count {
271 let src_col = probe_chunk
272 .column(col_idx)
273 .ok_or_else(|| OperatorError::ColumnNotFound(format!("probe column {col_idx}")))?;
274 let dst_col = builder
275 .column_mut(col_idx)
276 .ok_or_else(|| OperatorError::ColumnNotFound(format!("output column {col_idx}")))?;
277
278 if let Some(value) = src_col.get_value(probe_row) {
279 dst_col.push_value(value);
280 } else {
281 dst_col.push_value(Value::Null);
282 }
283 }
284
285 match (build_chunk, build_row) {
287 (Some(chunk), Some(row)) => {
288 for col_idx in 0..chunk.column_count() {
289 let src_col = chunk.column(col_idx).ok_or_else(|| {
290 OperatorError::ColumnNotFound(format!("build column {col_idx}"))
291 })?;
292 let dst_col =
293 builder
294 .column_mut(probe_col_count + col_idx)
295 .ok_or_else(|| {
296 OperatorError::ColumnNotFound(format!(
297 "output column {}",
298 probe_col_count + col_idx
299 ))
300 })?;
301
302 if let Some(value) = src_col.get_value(row) {
303 dst_col.push_value(value);
304 } else {
305 dst_col.push_value(Value::Null);
306 }
307 }
308 }
309 _ => {
310 if !self.build_chunks.is_empty() {
312 let build_col_count = self.build_chunks[0].column_count();
313 for col_idx in 0..build_col_count {
314 let dst_col =
315 builder
316 .column_mut(probe_col_count + col_idx)
317 .ok_or_else(|| {
318 OperatorError::ColumnNotFound(format!(
319 "output column {}",
320 probe_col_count + col_idx
321 ))
322 })?;
323 dst_col.push_value(Value::Null);
324 }
325 }
326 }
327 }
328
329 builder.advance_row();
330 Ok(())
331 }
332
333 fn get_next_probe_chunk(&mut self) -> Result<bool, OperatorError> {
335 let chunk = self.probe_side.next()?;
336 if let Some(ref c) = chunk {
337 if matches!(self.join_type, JoinType::Left | JoinType::Full) {
339 self.probe_matched = vec![false; c.row_count()];
340 }
341 }
342 let has_chunk = chunk.is_some();
343 self.current_probe_chunk = chunk;
344 self.current_probe_row = 0;
345 Ok(has_chunk)
346 }
347
348 fn emit_unmatched_build(&mut self) -> OperatorResult {
350 if self.build_matched.is_empty() {
351 return Ok(None);
352 }
353
354 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
355
356 let probe_col_count = if !self.build_chunks.is_empty() {
358 self.output_schema.len() - self.build_chunks[0].column_count()
359 } else {
360 0
361 };
362
363 while self.unmatched_chunk_idx < self.build_chunks.len() {
364 let chunk = &self.build_chunks[self.unmatched_chunk_idx];
365 let matched = &self.build_matched[self.unmatched_chunk_idx];
366
367 while self.unmatched_row_idx < matched.len() {
368 if !matched[self.unmatched_row_idx] {
369 for col_idx in 0..probe_col_count {
373 if let Some(dst_col) = builder.column_mut(col_idx) {
374 dst_col.push_value(Value::Null);
375 }
376 }
377
378 for col_idx in 0..chunk.column_count() {
380 if let (Some(src_col), Some(dst_col)) = (
381 chunk.column(col_idx),
382 builder.column_mut(probe_col_count + col_idx),
383 ) {
384 if let Some(value) = src_col.get_value(self.unmatched_row_idx) {
385 dst_col.push_value(value);
386 } else {
387 dst_col.push_value(Value::Null);
388 }
389 }
390 }
391
392 builder.advance_row();
393
394 if builder.is_full() {
395 self.unmatched_row_idx += 1;
396 return Ok(Some(builder.finish()));
397 }
398 }
399
400 self.unmatched_row_idx += 1;
401 }
402
403 self.unmatched_chunk_idx += 1;
404 self.unmatched_row_idx = 0;
405 }
406
407 if builder.row_count() > 0 {
408 Ok(Some(builder.finish()))
409 } else {
410 Ok(None)
411 }
412 }
413}
414
415impl Operator for HashJoinOperator {
416 fn next(&mut self) -> OperatorResult {
417 if !self.build_complete {
419 self.build_hash_table()?;
420 }
421
422 if self.emitting_unmatched {
424 return self.emit_unmatched_build();
425 }
426
427 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
429
430 loop {
431 if self.current_probe_chunk.is_none() && !self.get_next_probe_chunk()? {
433 if matches!(self.join_type, JoinType::Right | JoinType::Full) {
435 self.emitting_unmatched = true;
436 return self.emit_unmatched_build();
437 }
438 return if builder.row_count() > 0 {
439 Ok(Some(builder.finish()))
440 } else {
441 Ok(None)
442 };
443 }
444
445 let probe_chunk = self
448 .current_probe_chunk
449 .as_ref()
450 .expect("probe chunk is Some: guard at line 396 ensures this");
451 let probe_rows: Vec<usize> = probe_chunk.selected_indices().collect();
452
453 while self.current_probe_row < probe_rows.len() {
454 let probe_row = probe_rows[self.current_probe_row];
455
456 if self.current_matches.is_empty() && self.current_match_position == 0 {
458 let key = self.extract_key(probe_chunk, probe_row, &self.probe_keys)?;
459
460 match self.join_type {
462 JoinType::Semi => {
463 if self.hash_table.contains_key(&key) {
464 for col_idx in 0..probe_chunk.column_count() {
466 if let (Some(src_col), Some(dst_col)) =
467 (probe_chunk.column(col_idx), builder.column_mut(col_idx))
468 && let Some(value) = src_col.get_value(probe_row)
469 {
470 dst_col.push_value(value);
471 }
472 }
473 builder.advance_row();
474 }
475 self.current_probe_row += 1;
476 continue;
477 }
478 JoinType::Anti => {
479 if !self.hash_table.contains_key(&key) {
480 for col_idx in 0..probe_chunk.column_count() {
482 if let (Some(src_col), Some(dst_col)) =
483 (probe_chunk.column(col_idx), builder.column_mut(col_idx))
484 && let Some(value) = src_col.get_value(probe_row)
485 {
486 dst_col.push_value(value);
487 }
488 }
489 builder.advance_row();
490 }
491 self.current_probe_row += 1;
492 continue;
493 }
494 _ => {
495 self.current_matches =
496 self.hash_table.get(&key).cloned().unwrap_or_default();
497 }
498 }
499 }
500
501 if self.current_matches.is_empty() {
503 if matches!(self.join_type, JoinType::Left | JoinType::Full) {
505 self.produce_output_row(&mut builder, probe_chunk, probe_row, None, None)?;
506 }
507 self.current_probe_row += 1;
508 self.current_match_position = 0;
509 } else {
510 while self.current_match_position < self.current_matches.len() {
512 let (build_chunk_idx, build_row) =
513 self.current_matches[self.current_match_position];
514 let build_chunk = &self.build_chunks[build_chunk_idx];
515
516 if matches!(self.join_type, JoinType::Left | JoinType::Full)
518 && probe_row < self.probe_matched.len()
519 {
520 self.probe_matched[probe_row] = true;
521 }
522 if matches!(self.join_type, JoinType::Right | JoinType::Full)
523 && build_chunk_idx < self.build_matched.len()
524 && build_row < self.build_matched[build_chunk_idx].len()
525 {
526 self.build_matched[build_chunk_idx][build_row] = true;
527 }
528
529 self.produce_output_row(
530 &mut builder,
531 probe_chunk,
532 probe_row,
533 Some(build_chunk),
534 Some(build_row),
535 )?;
536
537 self.current_match_position += 1;
538
539 if builder.is_full() {
540 return Ok(Some(builder.finish()));
541 }
542 }
543
544 self.current_probe_row += 1;
546 self.current_matches.clear();
547 self.current_match_position = 0;
548 }
549
550 if builder.is_full() {
551 return Ok(Some(builder.finish()));
552 }
553 }
554
555 self.current_probe_chunk = None;
557 self.current_probe_row = 0;
558
559 if builder.row_count() > 0 {
560 return Ok(Some(builder.finish()));
561 }
562 }
563 }
564
565 fn reset(&mut self) {
566 self.probe_side.reset();
567 self.build_side.reset();
568 self.hash_table.clear();
569 self.build_chunks.clear();
570 self.build_complete = false;
571 self.current_probe_chunk = None;
572 self.current_probe_row = 0;
573 self.current_match_position = 0;
574 self.current_matches.clear();
575 self.probe_matched.clear();
576 self.build_matched.clear();
577 self.emitting_unmatched = false;
578 self.unmatched_chunk_idx = 0;
579 self.unmatched_row_idx = 0;
580 }
581
582 fn name(&self) -> &'static str {
583 "HashJoin"
584 }
585}
586
/// Join operator that compares every left row with every materialized right
/// row, using an optional [`JoinCondition`].
///
/// Matched output rows are the left columns followed by the right columns.
pub struct NestedLoopJoinOperator {
    /// Streaming (outer) input.
    left: Box<dyn Operator>,
    /// Inner input, fully read into `right_chunks` on the first `next()` call.
    right: Box<dyn Operator>,
    /// Predicate deciding whether a pair of rows joins; `None` accepts every pair.
    condition: Option<Box<dyn JoinCondition>>,
    /// Join semantics to apply.
    join_type: JoinType,
    /// Types of the output columns.
    output_schema: Vec<LogicalType>,
    /// Materialized right-side chunks.
    right_chunks: Vec<DataChunk>,
    /// Whether `right_chunks` has been populated.
    right_materialized: bool,
    /// Left chunk currently being processed, if any.
    current_left_chunk: Option<DataChunk>,
    /// Position within the current left chunk's selected rows.
    current_left_row: usize,
    /// Index of the right chunk being scanned for the current left row.
    current_right_chunk: usize,
    /// Whether the current left row has matched any right row so far.
    current_left_matched: bool,
    /// Position within the current right chunk's selected rows.
    current_right_row: usize,
}
617
/// A predicate evaluated on a pair of rows by [`NestedLoopJoinOperator`].
pub trait JoinCondition: Send + Sync {
    /// Returns `true` if row `left_row` of `left_chunk` should be joined with
    /// row `right_row` of `right_chunk`.
    ///
    /// Row indices are the physical indices yielded by each chunk's
    /// `selected_indices()`.
    fn evaluate(
        &self,
        left_chunk: &DataChunk,
        left_row: usize,
        right_chunk: &DataChunk,
        right_row: usize,
    ) -> bool;
}
629
/// Join predicate comparing one column of the left row with one column of the
/// right row for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EqualityCondition {
    /// Column index in the left chunk.
    left_column: usize,
    /// Column index in the right chunk.
    right_column: usize,
}

impl EqualityCondition {
    /// Creates a condition comparing `left_column` of the left input with
    /// `right_column` of the right input.
    pub fn new(left_column: usize, right_column: usize) -> Self {
        Self {
            left_column,
            right_column,
        }
    }
}
647
648impl JoinCondition for EqualityCondition {
649 fn evaluate(
650 &self,
651 left_chunk: &DataChunk,
652 left_row: usize,
653 right_chunk: &DataChunk,
654 right_row: usize,
655 ) -> bool {
656 let left_val = left_chunk
657 .column(self.left_column)
658 .and_then(|c| c.get_value(left_row));
659 let right_val = right_chunk
660 .column(self.right_column)
661 .and_then(|c| c.get_value(right_row));
662
663 match (left_val, right_val) {
664 (Some(l), Some(r)) => l == r,
665 _ => false,
666 }
667 }
668}
669
670impl NestedLoopJoinOperator {
671 pub fn new(
673 left: Box<dyn Operator>,
674 right: Box<dyn Operator>,
675 condition: Option<Box<dyn JoinCondition>>,
676 join_type: JoinType,
677 output_schema: Vec<LogicalType>,
678 ) -> Self {
679 Self {
680 left,
681 right,
682 condition,
683 join_type,
684 output_schema,
685 right_chunks: Vec::new(),
686 right_materialized: false,
687 current_left_chunk: None,
688 current_left_row: 0,
689 current_right_chunk: 0,
690 current_right_row: 0,
691 current_left_matched: false,
692 }
693 }
694
695 fn materialize_right(&mut self) -> Result<(), OperatorError> {
697 while let Some(chunk) = self.right.next()? {
698 self.right_chunks.push(chunk);
699 }
700 self.right_materialized = true;
701 Ok(())
702 }
703
704 fn produce_row(
706 &self,
707 builder: &mut DataChunkBuilder,
708 left_chunk: &DataChunk,
709 left_row: usize,
710 right_chunk: &DataChunk,
711 right_row: usize,
712 ) {
713 for col_idx in 0..left_chunk.column_count() {
715 if let (Some(src), Some(dst)) =
716 (left_chunk.column(col_idx), builder.column_mut(col_idx))
717 {
718 if let Some(val) = src.get_value(left_row) {
719 dst.push_value(val);
720 } else {
721 dst.push_value(Value::Null);
722 }
723 }
724 }
725
726 let left_col_count = left_chunk.column_count();
728 for col_idx in 0..right_chunk.column_count() {
729 if let (Some(src), Some(dst)) = (
730 right_chunk.column(col_idx),
731 builder.column_mut(left_col_count + col_idx),
732 ) {
733 if let Some(val) = src.get_value(right_row) {
734 dst.push_value(val);
735 } else {
736 dst.push_value(Value::Null);
737 }
738 }
739 }
740
741 builder.advance_row();
742 }
743
744 fn produce_left_unmatched_row(
746 &self,
747 builder: &mut DataChunkBuilder,
748 left_chunk: &DataChunk,
749 left_row: usize,
750 right_col_count: usize,
751 ) {
752 for col_idx in 0..left_chunk.column_count() {
754 if let (Some(src), Some(dst)) =
755 (left_chunk.column(col_idx), builder.column_mut(col_idx))
756 {
757 if let Some(val) = src.get_value(left_row) {
758 dst.push_value(val);
759 } else {
760 dst.push_value(Value::Null);
761 }
762 }
763 }
764
765 let left_col_count = left_chunk.column_count();
767 for col_idx in 0..right_col_count {
768 if let Some(dst) = builder.column_mut(left_col_count + col_idx) {
769 dst.push_value(Value::Null);
770 }
771 }
772
773 builder.advance_row();
774 }
775}
776
777impl Operator for NestedLoopJoinOperator {
778 fn next(&mut self) -> OperatorResult {
779 if !self.right_materialized {
781 self.materialize_right()?;
782 }
783
784 if self.right_chunks.is_empty() && !matches!(self.join_type, JoinType::Left) {
786 return Ok(None);
787 }
788
789 let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, 2048);
790
791 loop {
792 if self.current_left_chunk.is_none() {
794 self.current_left_chunk = self.left.next()?;
795 self.current_left_row = 0;
796 self.current_right_chunk = 0;
797 self.current_right_row = 0;
798
799 if self.current_left_chunk.is_none() {
800 return if builder.row_count() > 0 {
802 Ok(Some(builder.finish()))
803 } else {
804 Ok(None)
805 };
806 }
807 }
808
809 let left_chunk = self
810 .current_left_chunk
811 .as_ref()
812 .expect("left chunk is Some: loaded in loop above");
813 let left_rows: Vec<usize> = left_chunk.selected_indices().collect();
814
815 let right_col_count = if !self.right_chunks.is_empty() {
817 self.right_chunks[0].column_count()
818 } else {
819 self.output_schema
821 .len()
822 .saturating_sub(left_chunk.column_count())
823 };
824
825 while self.current_left_row < left_rows.len() {
827 let left_row = left_rows[self.current_left_row];
828
829 if self.current_right_chunk == 0 && self.current_right_row == 0 {
831 self.current_left_matched = false;
832 }
833
834 while self.current_right_chunk < self.right_chunks.len() {
836 let right_chunk = &self.right_chunks[self.current_right_chunk];
837 let right_rows: Vec<usize> = right_chunk.selected_indices().collect();
838
839 while self.current_right_row < right_rows.len() {
840 let right_row = right_rows[self.current_right_row];
841
842 let matches = match &self.condition {
844 Some(cond) => {
845 cond.evaluate(left_chunk, left_row, right_chunk, right_row)
846 }
847 None => true, };
849
850 if matches {
851 self.current_left_matched = true;
852 self.produce_row(
853 &mut builder,
854 left_chunk,
855 left_row,
856 right_chunk,
857 right_row,
858 );
859
860 if builder.is_full() {
861 self.current_right_row += 1;
862 return Ok(Some(builder.finish()));
863 }
864 }
865
866 self.current_right_row += 1;
867 }
868
869 self.current_right_chunk += 1;
870 self.current_right_row = 0;
871 }
872
873 if matches!(self.join_type, JoinType::Left) && !self.current_left_matched {
876 self.produce_left_unmatched_row(
877 &mut builder,
878 left_chunk,
879 left_row,
880 right_col_count,
881 );
882
883 if builder.is_full() {
884 self.current_left_row += 1;
885 self.current_right_chunk = 0;
886 self.current_right_row = 0;
887 return Ok(Some(builder.finish()));
888 }
889 }
890
891 self.current_left_row += 1;
893 self.current_right_chunk = 0;
894 self.current_right_row = 0;
895 }
896
897 self.current_left_chunk = None;
899
900 if builder.row_count() > 0 {
901 return Ok(Some(builder.finish()));
902 }
903 }
904 }
905
906 fn reset(&mut self) {
907 self.left.reset();
908 self.right.reset();
909 self.right_chunks.clear();
910 self.right_materialized = false;
911 self.current_left_chunk = None;
912 self.current_left_row = 0;
913 self.current_right_chunk = 0;
914 self.current_right_row = 0;
915 self.current_left_matched = false;
916 }
917
918 fn name(&self) -> &'static str {
919 "NestedLoopJoin"
920 }
921}
922
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test operator that yields a fixed list of chunks, one per `next()` call.
    ///
    /// `next()` moves each chunk out and leaves `DataChunk::empty()` in its
    /// place, so after `reset()` the operator replays those empty
    /// placeholders rather than the original data.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                // Move the chunk out; an empty placeholder stays behind.
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }
    }

    /// Builds a single-column `Int64` chunk containing `values` in order.
    fn create_int_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_hash_join_inner() {
        // Probe keys 1..=4, build keys 2..=5: only 2, 3 and 4 appear on both sides.
        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 3, 4, 5])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Inner,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let left_val = chunk.column(0).unwrap().get_int64(row).unwrap();
                let right_val = chunk.column(1).unwrap().get_int64(row).unwrap();
                results.push((left_val, right_val));
            }
        }

        // Output order is not part of the contract; sort before comparing.
        results.sort_unstable();
        assert_eq!(results, vec![(2, 2), (3, 3), (4, 4)]);
    }

    #[test]
    fn test_hash_join_left_outer() {
        // Probe key 1 has no build partner and must come back NULL-padded.
        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 3])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Left,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let left_val = chunk.column(0).unwrap().get_int64(row).unwrap();
                // The build column is read as `Option` because it may be NULL.
                let right_val = chunk.column(1).unwrap().get_int64(row);
                results.push((left_val, right_val));
            }
        }

        results.sort_by_key(|(l, _)| *l);
        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (1, None));
        assert_eq!(results[1], (2, Some(2)));
        assert_eq!(results[2], (3, Some(3)));
    }

    #[test]
    fn test_nested_loop_cross_join() {
        // No condition: every left row pairs with every right row.
        let left = MockOperator::new(vec![create_int_chunk(&[1, 2])]);
        let right = MockOperator::new(vec![create_int_chunk(&[10, 20])]);

        let output_schema = vec![LogicalType::Int64, LogicalType::Int64];
        let mut join = NestedLoopJoinOperator::new(
            Box::new(left),
            Box::new(right),
            None,
            JoinType::Cross,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let left_val = chunk.column(0).unwrap().get_int64(row).unwrap();
                let right_val = chunk.column(1).unwrap().get_int64(row).unwrap();
                results.push((left_val, right_val));
            }
        }

        results.sort_unstable();
        assert_eq!(results, vec![(1, 10), (1, 20), (2, 10), (2, 20)]);
    }

    #[test]
    fn test_hash_join_semi() {
        // Semi join keeps probe rows that have a match; only probe columns are output.
        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 4])]);

        let output_schema = vec![LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Semi,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let val = chunk.column(0).unwrap().get_int64(row).unwrap();
                results.push(val);
            }
        }

        results.sort_unstable();
        assert_eq!(results, vec![2, 4]);
    }

    #[test]
    fn test_hash_join_anti() {
        // Anti join keeps probe rows without a match; only probe columns are output.
        let left = MockOperator::new(vec![create_int_chunk(&[1, 2, 3, 4])]);
        let right = MockOperator::new(vec![create_int_chunk(&[2, 4])]);

        let output_schema = vec![LogicalType::Int64];
        let mut join = HashJoinOperator::new(
            Box::new(left),
            Box::new(right),
            vec![0],
            vec![0],
            JoinType::Anti,
            output_schema,
        );

        let mut results = Vec::new();
        while let Some(chunk) = join.next().unwrap() {
            for row in chunk.selected_indices() {
                let val = chunk.column(0).unwrap().get_int64(row).unwrap();
                results.push(val);
            }
        }

        results.sort_unstable();
        assert_eq!(results, vec![1, 3]);
    }
}