Skip to main content

reifydb_engine/vm/volcano/join/
nested_loop.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::value::column::{columns::Columns, headers::ColumnHeaders};
5use reifydb_rql::expression::Expression;
6use reifydb_transaction::transaction::Transaction;
7use reifydb_type::{
8	fragment::Fragment,
9	value::{Value, row_number::RowNumber},
10};
11use tracing::instrument;
12
13use super::common::{JoinContext, build_eval_columns, load_and_merge_all, resolve_column_names};
14use crate::{
15	Result,
16	expression::{
17		compile::compile_expression,
18		context::{CompileContext, EvalContext},
19	},
20	vm::volcano::query::{QueryContext, QueryNode},
21};
22
/// Join flavor executed by [`NestedLoopJoinNode`].
///
/// `Debug` and `Eq` are derived alongside `PartialEq` so the mode can be used in
/// assertions and diagnostics; equality on a field-less enum is always total.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NestedLoopMode {
	/// Emit only the left/right row pairs for which every join condition holds.
	Inner,
	/// Like `Inner`, but a left row without any matching right row is still
	/// emitted, with its right-hand columns filled with `Value::none()`.
	Left,
}
28
/// Query node that joins the output of two child nodes with a nested loop.
///
/// On the first call to `next` both children are loaded through
/// `load_and_merge_all`, every left row is compared against every right row,
/// and the complete join result is returned as a single `Columns` batch.
/// Later calls return `None`.
pub struct NestedLoopJoinNode {
	/// Child node supplying the left (outer-loop) rows.
	left: Box<dyn QueryNode>,
	/// Child node supplying the right (inner-loop) rows.
	right: Box<dyn QueryNode>,
	/// Join conditions; a row pair matches only when every expression
	/// evaluates to `Value::Boolean(true)`.
	on: Vec<Expression>,
	/// Optional alias handed to `resolve_column_names` and `build_eval_columns`.
	alias: Option<Fragment>,
	/// Whether unmatched left rows are dropped (`Inner`) or kept (`Left`).
	mode: NestedLoopMode,
	/// Headers of the produced batch. `None` until `next` has produced its
	/// result; it also acts as the "already exhausted" flag.
	headers: Option<ColumnHeaders>,
	/// Holds the compiled `on` expressions and the query context recorded in `initialize`.
	context: JoinContext,
}
38
39impl NestedLoopJoinNode {
40	pub(crate) fn new_inner(
41		left: Box<dyn QueryNode>,
42		right: Box<dyn QueryNode>,
43		on: Vec<Expression>,
44		alias: Option<Fragment>,
45	) -> Self {
46		Self {
47			left,
48			right,
49			on,
50			alias,
51			mode: NestedLoopMode::Inner,
52			headers: None,
53			context: JoinContext::new(),
54		}
55	}
56
57	pub(crate) fn new_left(
58		left: Box<dyn QueryNode>,
59		right: Box<dyn QueryNode>,
60		on: Vec<Expression>,
61		alias: Option<Fragment>,
62	) -> Self {
63		Self {
64			left,
65			right,
66			on,
67			alias,
68			mode: NestedLoopMode::Left,
69			headers: None,
70			context: JoinContext::new(),
71		}
72	}
73}
74
75impl QueryNode for NestedLoopJoinNode {
76	#[instrument(level = "trace", skip_all, name = "volcano::join::nested_loop::initialize")]
77	fn initialize<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &QueryContext) -> Result<()> {
78		let compile_ctx = CompileContext {
79			symbols: &ctx.symbols,
80		};
81		self.context.compiled =
82			self.on.iter().map(|e| compile_expression(&compile_ctx, e).expect("compile")).collect();
83		self.context.set(ctx);
84		self.left.initialize(rx, ctx)?;
85		self.right.initialize(rx, ctx)?;
86		Ok(())
87	}
88
89	#[instrument(level = "trace", skip_all, name = "volcano::join::nested_loop::next")]
90	fn next<'a>(&mut self, rx: &mut Transaction<'a>, ctx: &mut QueryContext) -> Result<Option<Columns>> {
91		debug_assert!(self.context.is_initialized(), "NestedLoopJoinNode::next() called before initialize()");
92		let _stored_ctx = self.context.get();
93
94		if self.headers.is_some() {
95			return Ok(None);
96		}
97
98		let left_columns = load_and_merge_all(&mut self.left, rx, ctx)?;
99		let right_columns = load_and_merge_all(&mut self.right, rx, ctx)?;
100
101		let left_rows = left_columns.row_count();
102		let right_rows = right_columns.row_count();
103		let right_width = right_columns.len();
104		let left_row_numbers = left_columns.row_numbers.to_vec();
105
106		// Resolve column names with conflict detection
107		let resolved = resolve_column_names(&left_columns, &right_columns, &self.alias, None);
108
109		let session = EvalContext::from_query(ctx);
110		let mut result_rows = Vec::new();
111		let mut result_row_numbers: Vec<RowNumber> = Vec::new();
112
113		for i in 0..left_rows {
114			let left_row = left_columns.get_row(i);
115
116			let mut matched = false;
117			for j in 0..right_rows {
118				let right_row = right_columns.get_row(j);
119
120				// Build evaluation columns
121				let eval_columns = build_eval_columns(
122					&left_columns,
123					&right_columns,
124					&left_row,
125					&right_row,
126					&self.alias,
127				);
128
129				let exec_ctx = session.with_eval_join(Columns::new(eval_columns));
130
131				let all_true = self.context.compiled.iter().fold(true, |acc, compiled_expr| {
132					let col = compiled_expr.execute(&exec_ctx).unwrap();
133					matches!(col.data().get_value(0), Value::Boolean(true)) && acc
134				});
135
136				if all_true {
137					let mut combined = left_row.clone();
138					combined.extend(right_row.clone());
139					result_rows.push(combined);
140					matched = true;
141					if !left_row_numbers.is_empty() {
142						result_row_numbers.push(left_row_numbers[i]);
143					}
144				}
145			}
146
147			// Add unmatched left rows with undefined values for right columns
148			if self.mode == NestedLoopMode::Left && !matched {
149				let mut combined = left_row.clone();
150				combined.extend(vec![Value::none(); right_width]);
151				result_rows.push(combined);
152				if !left_row_numbers.is_empty() {
153					result_row_numbers.push(left_row_numbers[i]);
154				}
155			}
156		}
157
158		// Create columns with conflict-resolved names
159		let names_refs: Vec<&str> = resolved.qualified_names.iter().map(|s| s.as_str()).collect();
160		let columns = if result_row_numbers.is_empty() {
161			Columns::from_rows(&names_refs, &result_rows)
162		} else {
163			Columns::from_rows(&names_refs, &result_rows).with_row_numbers(result_row_numbers)
164		};
165
166		self.headers = Some(ColumnHeaders::from_columns(&columns));
167		Ok(Some(columns))
168	}
169
170	fn headers(&self) -> Option<ColumnHeaders> {
171		self.headers.clone()
172	}
173}