Skip to main content

reifydb_engine/vm/volcano/compile/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4mod join;
5mod transform;
6mod vtable;
7
8use std::sync::Arc;
9
10use reifydb_core::interface::{catalog::id::IndexId, resolved::ResolvedShape};
11use reifydb_rql::{
12	nodes::{
13		AggregateNode as RqlAggregateNode, AssertNode as RqlAssertNode, GeneratorNode as RqlGeneratorNode,
14		InlineDataNode as RqlInlineDataNode, RowListLookupNode as RqlRowListLookupNode,
15		RowPointLookupNode as RqlRowPointLookupNode, RowRangeScanNode as RqlRowRangeScanNode,
16		SortNode as RqlSortNode, TakeLimit, TakeNode as RqlTakeNode,
17	},
18	query::QueryPlan as RqlQueryPlan,
19};
20use reifydb_transaction::transaction::Transaction;
21use reifydb_type::fragment::Fragment;
22use tracing::instrument;
23
24use super::{apply_transform::ApplyTransformNode, run_tests::RunTestsQueryNode};
25use crate::vm::{
26	stack::Variable,
27	volcano::{
28		aggregate::AggregateNode,
29		assert::{AssertNode, AssertWithoutInputNode},
30		distinct::DistinctNode,
31		environment::EnvironmentNode,
32		filter::FilterNode,
33		generator::GeneratorNode,
34		inline::InlineDataNode,
35		query::{QueryContext, QueryNode},
36		row_lookup::{RowListLookupNode, RowPointLookupNode, RowRangeScanNode},
37		scalarize::ScalarizeNode,
38		scan::{
39			dictionary::DictionaryScanNode, index::IndexScanNode, remote::RemoteFetchNode,
40			ringbuffer::RingBufferScan, series::SeriesScanNode as VolcanoSeriesScanNode,
41			table::TableScanNode, view::ViewScanNode,
42		},
43		sort::SortNode,
44		take::TakeNode,
45		top_k::TopKNode,
46		variable::VariableNode,
47	},
48};
49
50// Helpers
51
52/// Extract the source name from a query plan if it's a scan node.
53fn extract_source_name_from_query(plan: &RqlQueryPlan) -> Option<Fragment> {
54	match plan {
55		RqlQueryPlan::TableScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
56		RqlQueryPlan::ViewScan(node) => Some(Fragment::internal(node.source.def().name())),
57		RqlQueryPlan::RingBufferScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
58		RqlQueryPlan::DictionaryScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
59		RqlQueryPlan::SeriesScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
60		RqlQueryPlan::RemoteScan(_) => None,
61		RqlQueryPlan::Assert(node) => node.input.as_ref().and_then(|p| extract_source_name_from_query(p)),
62		RqlQueryPlan::Filter(node) => extract_source_name_from_query(&node.input),
63		RqlQueryPlan::Map(node) => node.input.as_ref().and_then(|p| extract_source_name_from_query(p)),
64		RqlQueryPlan::Take(node) => extract_source_name_from_query(&node.input),
65		_ => None,
66	}
67}
68
69pub(crate) fn extract_resolved_source(plan: &RqlQueryPlan) -> Option<ResolvedShape> {
70	match plan {
71		RqlQueryPlan::TableScan(node) => Some(ResolvedShape::Table(node.source.clone())),
72		RqlQueryPlan::ViewScan(node) => Some(ResolvedShape::View(node.source.clone())),
73		RqlQueryPlan::RingBufferScan(node) => Some(ResolvedShape::RingBuffer(node.source.clone())),
74		RqlQueryPlan::DictionaryScan(node) => Some(ResolvedShape::Dictionary(node.source.clone())),
75		RqlQueryPlan::SeriesScan(node) => Some(ResolvedShape::Series(node.source.clone())),
76		RqlQueryPlan::RemoteScan(_) => None,
77		RqlQueryPlan::Filter(node) => extract_resolved_source(&node.input),
78		RqlQueryPlan::Assert(node) => node.input.as_ref().and_then(|p| extract_resolved_source(p)),
79		RqlQueryPlan::Map(node) => node.input.as_ref().and_then(|p| extract_resolved_source(p)),
80		RqlQueryPlan::Take(node) => extract_resolved_source(&node.input),
81		RqlQueryPlan::Sort(node) => extract_resolved_source(&node.input),
82		_ => None,
83	}
84}
85
86// Main compile function
87
88#[instrument(name = "volcano::compile", level = "debug", skip_all)]
89pub(crate) fn compile<'a>(
90	plan: RqlQueryPlan,
91	rx: &mut Transaction<'a>,
92	context: Arc<QueryContext>,
93) -> Box<dyn QueryNode> {
94	match plan {
95		RqlQueryPlan::Aggregate(RqlAggregateNode {
96			by,
97			map,
98			input,
99		}) => {
100			let input_node = compile(*input, rx, context.clone());
101			Box::new(AggregateNode::new(input_node, by, map, context))
102		}
103		RqlQueryPlan::Distinct(node) => {
104			let input = compile(*node.input, rx, context);
105			Box::new(DistinctNode::new(input, node.columns))
106		}
107
108		RqlQueryPlan::Filter(node) => transform::compile_filter(node, rx, context),
109		RqlQueryPlan::Gate(node) => {
110			let input_node = compile(*node.input, rx, context);
111			Box::new(FilterNode::new(input_node, node.conditions))
112		}
113		RqlQueryPlan::Map(node) => transform::compile_map(node, rx, context),
114		RqlQueryPlan::Extend(node) => transform::compile_extend(node, rx, context),
115		RqlQueryPlan::Patch(node) => transform::compile_patch(node, rx, context),
116
117		RqlQueryPlan::Sort(RqlSortNode {
118			by,
119			input,
120		}) => {
121			let input_node = compile(*input, rx, context);
122			Box::new(SortNode::new(input_node, by))
123		}
124		RqlQueryPlan::Take(RqlTakeNode {
125			take,
126			input,
127		}) => {
128			let limit = match take {
129				TakeLimit::Literal(n) => n,
130				TakeLimit::Variable(ref name) => context
131					.symbols
132					.get(name)
133					.and_then(|var| match var {
134						Variable::Columns {
135							columns: cols,
136							..
137						} => cols.scalar_value().to_usize(),
138						_ => None,
139					})
140					.unwrap_or_else(|| panic!("TAKE variable ${} must be a numeric value", name)),
141			};
142			// Optimize: TAKE over SORT becomes TopK
143			if let RqlQueryPlan::Sort(sort_node) = *input {
144				let input_node = compile(*sort_node.input, rx, context);
145				return Box::new(TopKNode::new(input_node, sort_node.by, limit));
146			}
147			let mut input_node = compile(*input, rx, context);
148			// Push limit hint into scan operators so they read fewer rows
149			input_node.set_scan_limit(limit);
150			Box::new(TakeNode::new(input_node, limit))
151		}
152
153		RqlQueryPlan::JoinInner(node) => join::compile_inner_join(node, rx, context),
154		RqlQueryPlan::JoinLeft(node) => join::compile_left_join(node, rx, context),
155		RqlQueryPlan::JoinNatural(node) => join::compile_natural_join(node, rx, context),
156
157		RqlQueryPlan::Assert(RqlAssertNode {
158			conditions,
159			input,
160			message,
161		}) => {
162			if let Some(input) = input {
163				let input_node = compile(*input, rx, context);
164				Box::new(AssertNode::new(input_node, conditions, message))
165			} else {
166				Box::new(AssertWithoutInputNode::new(conditions, message))
167			}
168		}
169
170		RqlQueryPlan::TableScan(node) => {
171			Box::new(TableScanNode::new(node.source.clone(), context, rx).unwrap())
172		}
173		RqlQueryPlan::ViewScan(node) => Box::new(ViewScanNode::new(node.source.clone(), context).unwrap()),
174		RqlQueryPlan::RingBufferScan(node) => {
175			Box::new(RingBufferScan::new(node.source.clone(), context, rx).unwrap())
176		}
177		RqlQueryPlan::DictionaryScan(node) => {
178			Box::new(DictionaryScanNode::new(node.source.clone(), context).unwrap())
179		}
180		RqlQueryPlan::SeriesScan(node) => Box::new(
181			VolcanoSeriesScanNode::new(
182				node.source.clone(),
183				node.key_range_start,
184				node.key_range_end,
185				node.variant_tag,
186				context,
187			)
188			.unwrap(),
189		),
190		RqlQueryPlan::IndexScan(node) => {
191			let table = node.source.def().clone();
192			let Some(pk) = table.primary_key.clone() else {
193				unimplemented!()
194			};
195			Box::new(IndexScanNode::new(table, IndexId::primary(pk.id), context).unwrap())
196		}
197		RqlQueryPlan::RemoteScan(node) => {
198			Box::new(RemoteFetchNode::new(node.address, node.token, node.remote_rql, node.variables))
199		}
200		RqlQueryPlan::TableVirtualScan(node) => vtable::compile_virtual_scan(node, context),
201
202		RqlQueryPlan::RowPointLookup(RqlRowPointLookupNode {
203			source,
204			row_number,
205		}) => {
206			let resolved_source = source;
207			Box::new(
208				RowPointLookupNode::new(resolved_source, row_number, context)
209					.expect("Failed to create RowPointLookupNode"),
210			)
211		}
212		RqlQueryPlan::RowListLookup(RqlRowListLookupNode {
213			source,
214			row_numbers,
215		}) => {
216			let resolved_source = source;
217			Box::new(
218				RowListLookupNode::new(resolved_source, row_numbers, context)
219					.expect("Failed to create RowListLookupNode"),
220			)
221		}
222		RqlQueryPlan::RowRangeScan(RqlRowRangeScanNode {
223			source,
224			start,
225			end,
226		}) => {
227			let resolved_source = source;
228			Box::new(
229				RowRangeScanNode::new(resolved_source, start, end, context)
230					.expect("Failed to create RowRangeScanNode"),
231			)
232		}
233
234		RqlQueryPlan::InlineData(RqlInlineDataNode {
235			rows,
236		}) => Box::new(InlineDataNode::new(rows, context)),
237		RqlQueryPlan::Generator(RqlGeneratorNode {
238			name,
239			expressions,
240		}) => Box::new(GeneratorNode::new(name, expressions)),
241		RqlQueryPlan::Variable(node) => Box::new(VariableNode::new(node.variable_expr)),
242		RqlQueryPlan::Environment(_) => Box::new(EnvironmentNode::new()),
243		RqlQueryPlan::Scalarize(node) => {
244			let input = compile(*node.input, rx, context.clone());
245			Box::new(ScalarizeNode::new(input))
246		}
247		RqlQueryPlan::Apply(node) => {
248			let operator_name = node.operator.text().to_string();
249			let transform = context
250				.services
251				.transforms
252				.get_transform(&operator_name)
253				.unwrap_or_else(|| panic!("Unknown transform: {}", operator_name));
254			let input = node.input.expect("Apply requires input");
255			let input_node = compile(*input, rx, context);
256			Box::new(ApplyTransformNode::new(input_node, transform))
257		}
258		RqlQueryPlan::RunTests(node) => Box::new(RunTestsQueryNode::new(node, context.clone())),
259		RqlQueryPlan::CallFunction(node) => Box::new(GeneratorNode::new(node.name, node.arguments)),
260
261		RqlQueryPlan::Window(_) => {
262			unimplemented!(
263				"Window operator is only supported in deferred views and requires the flow engine."
264			)
265		}
266		RqlQueryPlan::Append(_) => {
267			unimplemented!(
268				"Append operator is only supported in deferred views and requires the flow engine."
269			)
270		}
271	}
272}