quill_sql/execution/physical_plan/
nested_loop_join.rs1use log::debug;
2use std::sync::{Arc, Mutex};
3
4use crate::catalog::SchemaRef;
5use crate::expression::{Expr, ExprTrait};
6use crate::utils::scalar::ScalarValue;
7use crate::{
8 error::QuillSQLResult,
9 execution::{ExecutionContext, VolcanoExecutor},
10 plan::logical_plan::JoinType,
11 storage::tuple::Tuple,
12};
13
14use super::PhysicalPlan;
15
16#[derive(Debug)]
17pub struct PhysicalNestedLoopJoin {
18 pub join_type: JoinType,
19 pub condition: Option<Expr>,
20 pub left_input: Arc<PhysicalPlan>,
21 pub right_input: Arc<PhysicalPlan>,
22 pub schema: SchemaRef,
23
24 left_tuple: Mutex<Option<Tuple>>,
25}
26impl PhysicalNestedLoopJoin {
27 pub fn new(
28 join_type: JoinType,
29 condition: Option<Expr>,
30 left_input: Arc<PhysicalPlan>,
31 right_input: Arc<PhysicalPlan>,
32 schema: SchemaRef,
33 ) -> Self {
34 PhysicalNestedLoopJoin {
35 join_type,
36 condition,
37 left_input,
38 right_input,
39 schema,
40 left_tuple: Mutex::new(None),
41 }
42 }
43}
44impl VolcanoExecutor for PhysicalNestedLoopJoin {
45 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
46 debug!("init nested loop join executor");
47 self.left_input.init(context)?;
48 self.right_input.init(context)?;
49 *self.left_tuple.lock().unwrap() = None;
50 Ok(())
51 }
52 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
53 let left_tuple = self.left_tuple.lock().unwrap();
54 let mut left_next_tuple = if left_tuple.is_none() {
55 self.left_input.next(context)?
56 } else {
57 Some(left_tuple.clone().unwrap())
58 };
59 drop(left_tuple);
61
62 while left_next_tuple.is_some() {
63 let left_tuple = left_next_tuple.clone().unwrap();
64
65 let mut right_next_tuple = self.right_input.next(context)?;
66 while right_next_tuple.is_some() {
67 let right_tuple = right_next_tuple.unwrap();
68
69 if self.condition.is_none() {
71 *self.left_tuple.lock().unwrap() = Some(left_tuple.clone());
73
74 return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
75 } else {
76 let condition = self.condition.clone().unwrap();
77 let merged_tuple =
78 Tuple::try_merge(vec![left_tuple.clone(), right_tuple.clone()])?;
79 let evaluate_res = condition.evaluate(&merged_tuple)?;
80 if let ScalarValue::Boolean(Some(v)) = evaluate_res {
82 if v {
83 *self.left_tuple.lock().unwrap() = Some(left_tuple.clone());
85
86 return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
87 }
88 } else {
89 panic!("nested loop join condition should be boolean")
90 }
91 }
92
93 right_next_tuple = self.right_input.next(context)?;
94 }
95
96 self.right_input.init(context)?;
98 left_next_tuple = self.left_input.next(context)?;
99 }
100 Ok(None)
101 }
102
103 fn output_schema(&self) -> SchemaRef {
104 self.schema.clone()
105 }
106}
107
108impl std::fmt::Display for PhysicalNestedLoopJoin {
109 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110 write!(f, "NestedLoopJoin")
111 }
112}