quill_sql/execution/physical_plan/
nested_loop_join.rs1use log::debug;
2use std::sync::{Arc, Mutex};
3
4use crate::catalog::SchemaRef;
5use crate::expression::Expr;
6use crate::{
7 error::QuillSQLResult,
8 execution::{ExecutionContext, VolcanoExecutor},
9 plan::logical_plan::JoinType,
10 storage::tuple::Tuple,
11};
12
13use super::PhysicalPlan;
14
15#[derive(Debug)]
16pub struct PhysicalNestedLoopJoin {
17 pub join_type: JoinType,
18 pub condition: Option<Expr>,
19 pub left_input: Arc<PhysicalPlan>,
20 pub right_input: Arc<PhysicalPlan>,
21 pub schema: SchemaRef,
22
23 left_tuple: Mutex<Option<Tuple>>,
24}
25impl PhysicalNestedLoopJoin {
26 pub fn new(
27 join_type: JoinType,
28 condition: Option<Expr>,
29 left_input: Arc<PhysicalPlan>,
30 right_input: Arc<PhysicalPlan>,
31 schema: SchemaRef,
32 ) -> Self {
33 PhysicalNestedLoopJoin {
34 join_type,
35 condition,
36 left_input,
37 right_input,
38 schema,
39 left_tuple: Mutex::new(None),
40 }
41 }
42}
43impl VolcanoExecutor for PhysicalNestedLoopJoin {
44 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
45 debug!("init nested loop join executor");
46 self.left_input.init(context)?;
47 self.right_input.init(context)?;
48 *self.left_tuple.lock().unwrap() = None;
49 Ok(())
50 }
51 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
52 let left_tuple = self.left_tuple.lock().unwrap();
53 let mut left_next_tuple = if left_tuple.is_none() {
54 self.left_input.next(context)?
55 } else {
56 Some(left_tuple.clone().unwrap())
57 };
58 drop(left_tuple);
60
61 while left_next_tuple.is_some() {
62 let left_tuple = left_next_tuple.clone().unwrap();
63
64 let mut right_next_tuple = self.right_input.next(context)?;
65 while right_next_tuple.is_some() {
66 let right_tuple = right_next_tuple.unwrap();
67
68 if let Some(condition) = &self.condition {
70 let merged_tuple =
71 Tuple::try_merge(vec![left_tuple.clone(), right_tuple.clone()])?;
72 if context.eval_predicate(condition, &merged_tuple)? {
73 *self.left_tuple.lock().unwrap() = Some(left_tuple.clone());
74 return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
75 }
76 } else {
77 *self.left_tuple.lock().unwrap() = Some(left_tuple.clone());
79
80 return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
81 }
82
83 right_next_tuple = self.right_input.next(context)?;
84 }
85
86 self.right_input.init(context)?;
88 left_next_tuple = self.left_input.next(context)?;
89 }
90 Ok(None)
91 }
92
93 fn output_schema(&self) -> SchemaRef {
94 self.schema.clone()
95 }
96}
97
98impl std::fmt::Display for PhysicalNestedLoopJoin {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 write!(f, "NestedLoopJoin")
101 }
102}