quill_sql/execution/physical_plan/
nested_loop_join.rs1use log::debug;
4use std::cell::RefCell;
5use std::rc::Rc;
6
7use crate::catalog::SchemaRef;
8use crate::expression::Expr;
9use crate::{
10 error::QuillSQLResult,
11 execution::{ExecutionContext, VolcanoExecutor},
12 plan::logical_plan::JoinType,
13 storage::tuple::Tuple,
14};
15
16use super::PhysicalPlan;
17
18#[derive(Debug)]
19pub struct PhysicalNestedLoopJoin {
20 pub join_type: JoinType,
21 pub condition: Option<Expr>,
22 pub left_input: Rc<PhysicalPlan>,
23 pub right_input: Rc<PhysicalPlan>,
24 pub schema: SchemaRef,
25
26 left_tuple: RefCell<Option<Tuple>>,
27}
28impl PhysicalNestedLoopJoin {
29 pub fn new(
30 join_type: JoinType,
31 condition: Option<Expr>,
32 left_input: Rc<PhysicalPlan>,
33 right_input: Rc<PhysicalPlan>,
34 schema: SchemaRef,
35 ) -> Self {
36 PhysicalNestedLoopJoin {
37 join_type,
38 condition,
39 left_input,
40 right_input,
41 schema,
42 left_tuple: RefCell::new(None),
43 }
44 }
45}
46impl VolcanoExecutor for PhysicalNestedLoopJoin {
47 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
48 debug!("init nested loop join executor");
49 self.left_input.init(context)?;
50 self.right_input.init(context)?;
51 self.left_tuple.borrow_mut().take();
52 Ok(())
53 }
54 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
55 let mut left_next_tuple = if self.left_tuple.borrow().is_none() {
56 self.left_input.next(context)?
57 } else {
58 self.left_tuple.borrow().clone()
59 };
60
61 while left_next_tuple.is_some() {
62 let left_tuple = left_next_tuple.clone().unwrap();
63
64 let mut right_next_tuple = self.right_input.next(context)?;
65 while right_next_tuple.is_some() {
66 let right_tuple = right_next_tuple.unwrap();
67
68 if let Some(condition) = &self.condition {
70 let merged_tuple =
71 Tuple::try_merge(vec![left_tuple.clone(), right_tuple.clone()])?;
72 if context.eval_predicate(condition, &merged_tuple)? {
73 self.left_tuple.borrow_mut().replace(left_tuple.clone());
74 return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
75 }
76 } else {
77 self.left_tuple.borrow_mut().replace(left_tuple.clone());
79
80 return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
81 }
82
83 right_next_tuple = self.right_input.next(context)?;
84 }
85
86 self.right_input.init(context)?;
88 left_next_tuple = self.left_input.next(context)?;
89 }
90 Ok(None)
91 }
92
93 fn output_schema(&self) -> SchemaRef {
94 self.schema.clone()
95 }
96}
97
98impl std::fmt::Display for PhysicalNestedLoopJoin {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 write!(f, "NestedLoopJoin")
101 }
102}