quill_sql/execution/physical_plan/
nested_loop_join.rs

1//! Teaching-first nested loop join with optional predicate evaluation.
2
3use log::debug;
4use std::cell::RefCell;
5use std::sync::Arc;
6
7use crate::catalog::SchemaRef;
8use crate::expression::Expr;
9use crate::{
10    error::QuillSQLResult,
11    execution::{ExecutionContext, VolcanoExecutor},
12    plan::logical_plan::JoinType,
13    storage::tuple::Tuple,
14};
15
16use super::PhysicalPlan;
17
18#[derive(Debug)]
19pub struct PhysicalNestedLoopJoin {
20    pub join_type: JoinType,
21    pub condition: Option<Expr>,
22    pub left_input: Arc<PhysicalPlan>,
23    pub right_input: Arc<PhysicalPlan>,
24    pub schema: SchemaRef,
25
26    left_tuple: RefCell<Option<Tuple>>,
27}
28impl PhysicalNestedLoopJoin {
29    pub fn new(
30        join_type: JoinType,
31        condition: Option<Expr>,
32        left_input: Arc<PhysicalPlan>,
33        right_input: Arc<PhysicalPlan>,
34        schema: SchemaRef,
35    ) -> Self {
36        PhysicalNestedLoopJoin {
37            join_type,
38            condition,
39            left_input,
40            right_input,
41            schema,
42            left_tuple: RefCell::new(None),
43        }
44    }
45}
46impl VolcanoExecutor for PhysicalNestedLoopJoin {
47    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
48        debug!("init nested loop join executor");
49        self.left_input.init(context)?;
50        self.right_input.init(context)?;
51        self.left_tuple.borrow_mut().take();
52        Ok(())
53    }
54    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
55        let mut left_next_tuple = if self.left_tuple.borrow().is_none() {
56            self.left_input.next(context)?
57        } else {
58            self.left_tuple.borrow().clone()
59        };
60
61        while left_next_tuple.is_some() {
62            let left_tuple = left_next_tuple.clone().unwrap();
63
64            let mut right_next_tuple = self.right_input.next(context)?;
65            while right_next_tuple.is_some() {
66                let right_tuple = right_next_tuple.unwrap();
67
68                // TODO judge if matches
69                if let Some(condition) = &self.condition {
70                    let merged_tuple =
71                        Tuple::try_merge(vec![left_tuple.clone(), right_tuple.clone()])?;
72                    if context.eval_predicate(condition, &merged_tuple)? {
73                        self.left_tuple.borrow_mut().replace(left_tuple.clone());
74                        return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
75                    }
76                } else {
77                    // save latest left_next_result before return
78                    self.left_tuple.borrow_mut().replace(left_tuple.clone());
79
80                    return Ok(Some(Tuple::try_merge(vec![left_tuple, right_tuple])?));
81                }
82
83                right_next_tuple = self.right_input.next(context)?;
84            }
85
86            // reset right executor
87            self.right_input.init(context)?;
88            left_next_tuple = self.left_input.next(context)?;
89        }
90        Ok(None)
91    }
92
93    fn output_schema(&self) -> SchemaRef {
94        self.schema.clone()
95    }
96}
97
98impl std::fmt::Display for PhysicalNestedLoopJoin {
99    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100        write!(f, "NestedLoopJoin")
101    }
102}