quill_sql/execution/physical_plan/
nested_loop_join.rs

1use log::debug;
2use std::sync::{Arc, Mutex};
3
4use crate::catalog::SchemaRef;
5use crate::expression::Expr;
6use crate::{
7    error::QuillSQLResult,
8    execution::{ExecutionContext, VolcanoExecutor},
9    plan::logical_plan::JoinType,
10    storage::tuple::Tuple,
11};
12
13use super::PhysicalPlan;
14
15#[derive(Debug)]
16pub struct PhysicalNestedLoopJoin {
17    pub join_type: JoinType,
18    pub condition: Option<Expr>,
19    pub left_input: Arc<PhysicalPlan>,
20    pub right_input: Arc<PhysicalPlan>,
21    pub schema: SchemaRef,
22
23    left_tuple: Mutex<Option<Tuple>>,
24}
25impl PhysicalNestedLoopJoin {
26    pub fn new(
27        join_type: JoinType,
28        condition: Option<Expr>,
29        left_input: Arc<PhysicalPlan>,
30        right_input: Arc<PhysicalPlan>,
31        schema: SchemaRef,
32    ) -> Self {
33        PhysicalNestedLoopJoin {
34            join_type,
35            condition,
36            left_input,
37            right_input,
38            schema,
39            left_tuple: Mutex::new(None),
40        }
41    }
42}
43impl VolcanoExecutor for PhysicalNestedLoopJoin {
44    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
45        debug!("init nested loop join executor");
46        self.left_input.init(context)?;
47        self.right_input.init(context)?;
48        *self.left_tuple.lock().unwrap() = None;
49        Ok(())
50    }
51    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
52        let left_tuple = self.left_tuple.lock().unwrap();
53        let mut left_next_tuple = if left_tuple.is_none() {
54            self.left_input.next(context)?
55        } else {
56            Some(left_tuple.clone().unwrap())
57        };
58        // release mutex
59        drop(left_tuple);
60
61        while left_next_tuple.is_some() {
62            let left_tuple = left_next_tuple.clone().unwrap();
63
64            let mut right_next_tuple = self.right_input.next(context)?;
65            while right_next_tuple.is_some() {
66                let right_tuple = right_next_tuple.unwrap();
67
68                // TODO judge if matches
69                if let Some(condition) = &self.condition {
70                    let merged_tuple =
71                        Tuple::try_merge(vec![left_tuple.clone(), right_tuple.clone()])?;
72                    if context.eval_predicate(condition, &merged_tuple)? {
73                        *self.left_tuple.lock().unwrap() = Some(left_tuple.clone());
74                        return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
75                    }
76                } else {
77                    // save latest left_next_result before return
78                    *self.left_tuple.lock().unwrap() = Some(left_tuple.clone());
79
80                    return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
81                }
82
83                right_next_tuple = self.right_input.next(context)?;
84            }
85
86            // reset right executor
87            self.right_input.init(context)?;
88            left_next_tuple = self.left_input.next(context)?;
89        }
90        Ok(None)
91    }
92
93    fn output_schema(&self) -> SchemaRef {
94        self.schema.clone()
95    }
96}
97
98impl std::fmt::Display for PhysicalNestedLoopJoin {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        write!(f, "NestedLoopJoin")
101    }
102}