reifydb_engine/vm/volcano/join/
nested_loop.rs1use reifydb_core::value::column::{columns::Columns, headers::ColumnHeaders};
5use reifydb_rql::expression::Expression;
6use reifydb_transaction::transaction::Transaction;
7use reifydb_type::{
8 fragment::Fragment,
9 value::{Value, row_number::RowNumber},
10};
11use tracing::instrument;
12
13use super::common::{JoinContext, build_eval_columns, load_and_merge_all, resolve_column_names};
14use crate::{
15 Result,
16 expression::{
17 compile::compile_expression,
18 context::{CompileContext, EvalContext},
19 },
20 vm::volcano::query::{QueryContext, QueryNode},
21};
22
/// Join flavour executed by [`NestedLoopJoinNode`].
///
/// The mode only affects what happens to a left row for which no right row
/// satisfied the join predicates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NestedLoopMode {
    /// Emit only left/right row pairs that satisfy every join predicate.
    Inner,
    /// Like `Inner`, but a left row without any match is still emitted once,
    /// with the right-hand columns filled with none values.
    Left,
}
28
29pub struct NestedLoopJoinNode {
30 left: Box<dyn QueryNode>,
31 right: Box<dyn QueryNode>,
32 on: Vec<Expression>,
33 alias: Option<Fragment>,
34 mode: NestedLoopMode,
35 headers: Option<ColumnHeaders>,
36 context: JoinContext,
37}
38
39impl NestedLoopJoinNode {
40 pub(crate) fn new_inner(
41 left: Box<dyn QueryNode>,
42 right: Box<dyn QueryNode>,
43 on: Vec<Expression>,
44 alias: Option<Fragment>,
45 ) -> Self {
46 Self {
47 left,
48 right,
49 on,
50 alias,
51 mode: NestedLoopMode::Inner,
52 headers: None,
53 context: JoinContext::new(),
54 }
55 }
56
57 pub(crate) fn new_left(
58 left: Box<dyn QueryNode>,
59 right: Box<dyn QueryNode>,
60 on: Vec<Expression>,
61 alias: Option<Fragment>,
62 ) -> Self {
63 Self {
64 left,
65 right,
66 on,
67 alias,
68 mode: NestedLoopMode::Left,
69 headers: None,
70 context: JoinContext::new(),
71 }
72 }
73}
74
75impl QueryNode for NestedLoopJoinNode {
76 #[instrument(level = "trace", skip_all, name = "volcano::join::nested_loop::initialize")]
77 fn initialize<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
78 let compile_ctx = CompileContext {
79 symbols: &ctx.symbols,
80 };
81 self.context.compiled =
82 self.on.iter().map(|e| compile_expression(&compile_ctx, e).expect("compile")).collect();
83 self.context.set(ctx);
84 self.left.initialize(rx, ctx)?;
85 self.right.initialize(rx, ctx)?;
86 Ok(())
87 }
88
89 #[instrument(level = "trace", skip_all, name = "volcano::join::nested_loop::next")]
90 fn next<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &mut QueryContext) -> Result<Option<Columns>> {
91 debug_assert!(self.context.is_initialized(), "NestedLoopJoinNode::next() called before initialize()");
92 let _stored_ctx = self.context.get();
93
94 if self.headers.is_some() {
95 return Ok(None);
96 }
97
98 let left_columns = load_and_merge_all(&mut self.left, rx, ctx)?;
99 let right_columns = load_and_merge_all(&mut self.right, rx, ctx)?;
100
101 let left_rows = left_columns.row_count();
102 let right_rows = right_columns.row_count();
103 let right_width = right_columns.len();
104 let left_row_numbers = left_columns.row_numbers.to_vec();
105
106 let resolved = resolve_column_names(&left_columns, &right_columns, &self.alias, None);
108
109 let session = EvalContext::from_query(ctx);
110 let mut result_rows = Vec::new();
111 let mut result_row_numbers: Vec<RowNumber> = Vec::new();
112
113 for i in 0..left_rows {
114 let left_row = left_columns.get_row(i);
115
116 let mut matched = false;
117 for j in 0..right_rows {
118 let right_row = right_columns.get_row(j);
119
120 let eval_columns = build_eval_columns(
122 &left_columns,
123 &right_columns,
124 &left_row,
125 &right_row,
126 &self.alias,
127 );
128
129 let exec_ctx = session.with_eval_join(Columns::new(eval_columns));
130
131 let all_true = self.context.compiled.iter().fold(true, |acc, compiled_expr| {
132 let col = compiled_expr.execute(&exec_ctx).unwrap();
133 matches!(col.data().get_value(0), Value::Boolean(true)) && acc
134 });
135
136 if all_true {
137 let mut combined = left_row.clone();
138 combined.extend(right_row.clone());
139 result_rows.push(combined);
140 matched = true;
141 if !left_row_numbers.is_empty() {
142 result_row_numbers.push(left_row_numbers[i]);
143 }
144 }
145 }
146
147 if self.mode == NestedLoopMode::Left && !matched {
149 let mut combined = left_row.clone();
150 combined.extend(vec![Value::none(); right_width]);
151 result_rows.push(combined);
152 if !left_row_numbers.is_empty() {
153 result_row_numbers.push(left_row_numbers[i]);
154 }
155 }
156 }
157
158 let names_refs: Vec<&str> = resolved.qualified_names.iter().map(|s| s.as_str()).collect();
160 let columns = if result_row_numbers.is_empty() {
161 Columns::from_rows(&names_refs, &result_rows)
162 } else {
163 Columns::from_rows(&names_refs, &result_rows).with_row_numbers(result_row_numbers)
164 };
165
166 self.headers = Some(ColumnHeaders::from_columns(&columns));
167 Ok(Some(columns))
168 }
169
170 fn headers(&self) -> Option<ColumnHeaders> {
171 self.headers.clone()
172 }
173}