quill_sql/execution/physical_plan/
nested_loop_join.rs

1use log::debug;
2use std::sync::{Arc, Mutex};
3
4use crate::catalog::SchemaRef;
5use crate::expression::{Expr, ExprTrait};
6use crate::utils::scalar::ScalarValue;
7use crate::{
8    error::QuillSQLResult,
9    execution::{ExecutionContext, VolcanoExecutor},
10    plan::logical_plan::JoinType,
11    storage::tuple::Tuple,
12};
13
14use super::PhysicalPlan;
15
16#[derive(Debug)]
17pub struct PhysicalNestedLoopJoin {
18    pub join_type: JoinType,
19    pub condition: Option<Expr>,
20    pub left_input: Arc<PhysicalPlan>,
21    pub right_input: Arc<PhysicalPlan>,
22    pub schema: SchemaRef,
23
24    left_tuple: Mutex<Option<Tuple>>,
25}
26impl PhysicalNestedLoopJoin {
27    pub fn new(
28        join_type: JoinType,
29        condition: Option<Expr>,
30        left_input: Arc<PhysicalPlan>,
31        right_input: Arc<PhysicalPlan>,
32        schema: SchemaRef,
33    ) -> Self {
34        PhysicalNestedLoopJoin {
35            join_type,
36            condition,
37            left_input,
38            right_input,
39            schema,
40            left_tuple: Mutex::new(None),
41        }
42    }
43}
44impl VolcanoExecutor for PhysicalNestedLoopJoin {
45    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
46        debug!("init nested loop join executor");
47        self.left_input.init(context)?;
48        self.right_input.init(context)?;
49        *self.left_tuple.lock().unwrap() = None;
50        Ok(())
51    }
52    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
53        let left_tuple = self.left_tuple.lock().unwrap();
54        let mut left_next_tuple = if left_tuple.is_none() {
55            self.left_input.next(context)?
56        } else {
57            Some(left_tuple.clone().unwrap())
58        };
59        // release mutex
60        drop(left_tuple);
61
62        while left_next_tuple.is_some() {
63            let left_tuple = left_next_tuple.clone().unwrap();
64
65            let mut right_next_tuple = self.right_input.next(context)?;
66            while right_next_tuple.is_some() {
67                let right_tuple = right_next_tuple.unwrap();
68
69                // TODO judge if matches
70                if self.condition.is_none() {
71                    // save latest left_next_result before return
72                    *self.left_tuple.lock().unwrap() = Some(left_tuple.clone());
73
74                    return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
75                } else {
76                    let condition = self.condition.clone().unwrap();
77                    let merged_tuple =
78                        Tuple::try_merge(vec![left_tuple.clone(), right_tuple.clone()])?;
79                    let evaluate_res = condition.evaluate(&merged_tuple)?;
80                    // TODO support left/right join after null support added
81                    if let ScalarValue::Boolean(Some(v)) = evaluate_res {
82                        if v {
83                            // save latest left_next_result before return
84                            *self.left_tuple.lock().unwrap() = Some(left_tuple.clone());
85
86                            return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
87                        }
88                    } else {
89                        panic!("nested loop join condition should be boolean")
90                    }
91                }
92
93                right_next_tuple = self.right_input.next(context)?;
94            }
95
96            // reset right executor
97            self.right_input.init(context)?;
98            left_next_tuple = self.left_input.next(context)?;
99        }
100        Ok(None)
101    }
102
103    fn output_schema(&self) -> SchemaRef {
104        self.schema.clone()
105    }
106}
107
108impl std::fmt::Display for PhysicalNestedLoopJoin {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        write!(f, "NestedLoopJoin")
111    }
112}