Skip to main content

reifydb_engine/vm/volcano/compile/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4mod join;
5mod transform;
6mod vtable;
7
8use std::sync::Arc;
9
10use reifydb_core::interface::{catalog::id::IndexId, resolved::ResolvedShape};
11use reifydb_rql::{
12	nodes::{
13		AggregateNode as RqlAggregateNode, AssertNode as RqlAssertNode, GeneratorNode as RqlGeneratorNode,
14		InlineDataNode as RqlInlineDataNode, RowListLookupNode as RqlRowListLookupNode,
15		RowPointLookupNode as RqlRowPointLookupNode, RowRangeScanNode as RqlRowRangeScanNode,
16		SortNode as RqlSortNode, TakeLimit, TakeNode as RqlTakeNode,
17	},
18	query::QueryPlan as RqlQueryPlan,
19};
20use reifydb_transaction::transaction::Transaction;
21use reifydb_type::fragment::Fragment;
22use tracing::instrument;
23
24use super::{apply_transform::ApplyTransformNode, run_tests::RunTestsQueryNode};
25use crate::vm::{
26	stack::Variable,
27	volcano::{
28		aggregate::AggregateNode,
29		assert::{AssertNode, AssertWithoutInputNode},
30		distinct::DistinctNode,
31		environment::EnvironmentNode,
32		filter::FilterNode,
33		generator::GeneratorNode,
34		inline::InlineDataNode,
35		query::{QueryContext, QueryNode},
36		row_lookup::{RowListLookupNode, RowPointLookupNode, RowRangeScanNode},
37		scalarize::ScalarizeNode,
38		scan::{
39			dictionary::DictionaryScanNode, index::IndexScanNode, remote::RemoteFetchNode,
40			ringbuffer::RingBufferScan, series::SeriesScanNode as VolcanoSeriesScanNode,
41			table::TableScanNode, view::ViewScanNode,
42		},
43		sort::SortNode,
44		take::TakeNode,
45		top_k::TopKNode,
46		variable::VariableNode,
47	},
48};
49
50fn extract_source_name_from_query(plan: &RqlQueryPlan) -> Option<Fragment> {
51	match plan {
52		RqlQueryPlan::TableScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
53		RqlQueryPlan::ViewScan(node) => Some(Fragment::internal(node.source.def().name())),
54		RqlQueryPlan::RingBufferScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
55		RqlQueryPlan::DictionaryScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
56		RqlQueryPlan::SeriesScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
57		RqlQueryPlan::RemoteScan(_) => None,
58		RqlQueryPlan::Assert(node) => node.input.as_ref().and_then(|p| extract_source_name_from_query(p)),
59		RqlQueryPlan::Filter(node) => extract_source_name_from_query(&node.input),
60		RqlQueryPlan::Map(node) => node.input.as_ref().and_then(|p| extract_source_name_from_query(p)),
61		RqlQueryPlan::Take(node) => extract_source_name_from_query(&node.input),
62		_ => None,
63	}
64}
65
66pub(crate) fn extract_resolved_source(plan: &RqlQueryPlan) -> Option<ResolvedShape> {
67	match plan {
68		RqlQueryPlan::TableScan(node) => Some(ResolvedShape::Table(node.source.clone())),
69		RqlQueryPlan::ViewScan(node) => Some(ResolvedShape::View(node.source.clone())),
70		RqlQueryPlan::RingBufferScan(node) => Some(ResolvedShape::RingBuffer(node.source.clone())),
71		RqlQueryPlan::DictionaryScan(node) => Some(ResolvedShape::Dictionary(node.source.clone())),
72		RqlQueryPlan::SeriesScan(node) => Some(ResolvedShape::Series(node.source.clone())),
73		RqlQueryPlan::RemoteScan(_) => None,
74		RqlQueryPlan::Filter(node) => extract_resolved_source(&node.input),
75		RqlQueryPlan::Assert(node) => node.input.as_ref().and_then(|p| extract_resolved_source(p)),
76		RqlQueryPlan::Map(node) => node.input.as_ref().and_then(|p| extract_resolved_source(p)),
77		RqlQueryPlan::Take(node) => extract_resolved_source(&node.input),
78		RqlQueryPlan::Sort(node) => extract_resolved_source(&node.input),
79		_ => None,
80	}
81}
82
83#[instrument(name = "volcano::compile", level = "debug", skip_all)]
84pub(crate) fn compile<'a>(
85	plan: RqlQueryPlan,
86	rx: &mut Transaction<'a>,
87	context: Arc<QueryContext>,
88) -> Box<dyn QueryNode> {
89	match plan {
90		RqlQueryPlan::Aggregate(RqlAggregateNode {
91			by,
92			map,
93			input,
94		}) => {
95			let input_node = compile(*input, rx, context.clone());
96			Box::new(AggregateNode::new(input_node, by, map, context))
97		}
98		RqlQueryPlan::Distinct(node) => {
99			let input = compile(*node.input, rx, context);
100			Box::new(DistinctNode::new(input, node.columns))
101		}
102
103		RqlQueryPlan::Filter(node) => transform::compile_filter(node, rx, context),
104		RqlQueryPlan::Gate(node) => {
105			let input_node = compile(*node.input, rx, context);
106			Box::new(FilterNode::new(input_node, node.conditions))
107		}
108		RqlQueryPlan::Map(node) => transform::compile_map(node, rx, context),
109		RqlQueryPlan::Extend(node) => transform::compile_extend(node, rx, context),
110		RqlQueryPlan::Patch(node) => transform::compile_patch(node, rx, context),
111
112		RqlQueryPlan::Sort(RqlSortNode {
113			by,
114			input,
115		}) => {
116			let input_node = compile(*input, rx, context);
117			Box::new(SortNode::new(input_node, by))
118		}
119		RqlQueryPlan::Take(RqlTakeNode {
120			take,
121			input,
122		}) => {
123			let limit = match take {
124				TakeLimit::Literal(n) => n,
125				TakeLimit::Variable(ref name) => context
126					.symbols
127					.get(name)
128					.and_then(|var| match var {
129						Variable::Columns {
130							columns: cols,
131							..
132						} => cols.scalar_value().to_usize(),
133						_ => None,
134					})
135					.unwrap_or_else(|| panic!("TAKE variable ${} must be a numeric value", name)),
136			};
137
138			if let RqlQueryPlan::Sort(sort_node) = *input {
139				let input_node = compile(*sort_node.input, rx, context);
140				return Box::new(TopKNode::new(input_node, sort_node.by, limit));
141			}
142			let input_node = compile(*input, rx, context);
143			Box::new(TakeNode::new(input_node, limit))
144		}
145
146		RqlQueryPlan::JoinInner(node) => join::compile_inner_join(node, rx, context),
147		RqlQueryPlan::JoinLeft(node) => join::compile_left_join(node, rx, context),
148		RqlQueryPlan::JoinNatural(node) => join::compile_natural_join(node, rx, context),
149
150		RqlQueryPlan::Assert(RqlAssertNode {
151			conditions,
152			input,
153			message,
154		}) => {
155			if let Some(input) = input {
156				let input_node = compile(*input, rx, context);
157				Box::new(AssertNode::new(input_node, conditions, message))
158			} else {
159				Box::new(AssertWithoutInputNode::new(conditions, message))
160			}
161		}
162
163		RqlQueryPlan::TableScan(node) => {
164			Box::new(TableScanNode::new(node.source.clone(), context, rx).unwrap())
165		}
166		RqlQueryPlan::ViewScan(node) => Box::new(ViewScanNode::new(node.source.clone(), context).unwrap()),
167		RqlQueryPlan::RingBufferScan(node) => {
168			Box::new(RingBufferScan::new(node.source.clone(), context, rx).unwrap())
169		}
170		RqlQueryPlan::DictionaryScan(node) => {
171			Box::new(DictionaryScanNode::new(node.source.clone(), context).unwrap())
172		}
173		RqlQueryPlan::SeriesScan(node) => Box::new(
174			VolcanoSeriesScanNode::new(
175				node.source.clone(),
176				node.key_range_start,
177				node.key_range_end,
178				node.variant_tag,
179				context,
180			)
181			.unwrap(),
182		),
183		RqlQueryPlan::IndexScan(node) => {
184			let table = node.source.def().clone();
185			let Some(pk) = table.primary_key.clone() else {
186				unimplemented!()
187			};
188			Box::new(IndexScanNode::new(table, IndexId::primary(pk.id), context).unwrap())
189		}
190		RqlQueryPlan::RemoteScan(node) => {
191			Box::new(RemoteFetchNode::new(node.address, node.token, node.remote_rql, node.variables))
192		}
193		RqlQueryPlan::TableVirtualScan(node) => vtable::compile_virtual_scan(node, context),
194
195		RqlQueryPlan::RowPointLookup(RqlRowPointLookupNode {
196			source,
197			row_number,
198		}) => {
199			let resolved_source = source;
200			Box::new(
201				RowPointLookupNode::new(resolved_source, row_number, context)
202					.expect("Failed to create RowPointLookupNode"),
203			)
204		}
205		RqlQueryPlan::RowListLookup(RqlRowListLookupNode {
206			source,
207			row_numbers,
208		}) => {
209			let resolved_source = source;
210			Box::new(
211				RowListLookupNode::new(resolved_source, row_numbers, context)
212					.expect("Failed to create RowListLookupNode"),
213			)
214		}
215		RqlQueryPlan::RowRangeScan(RqlRowRangeScanNode {
216			source,
217			start,
218			end,
219		}) => {
220			let resolved_source = source;
221			Box::new(
222				RowRangeScanNode::new(resolved_source, start, end, context)
223					.expect("Failed to create RowRangeScanNode"),
224			)
225		}
226
227		RqlQueryPlan::InlineData(RqlInlineDataNode {
228			rows,
229		}) => Box::new(InlineDataNode::new(rows, context)),
230		RqlQueryPlan::Generator(RqlGeneratorNode {
231			name,
232			expressions,
233		}) => Box::new(GeneratorNode::new(name, expressions)),
234		RqlQueryPlan::Variable(node) => Box::new(VariableNode::new(node.variable_expr)),
235		RqlQueryPlan::Environment(_) => Box::new(EnvironmentNode::new()),
236		RqlQueryPlan::Scalarize(node) => {
237			let input = compile(*node.input, rx, context.clone());
238			Box::new(ScalarizeNode::new(input))
239		}
240		RqlQueryPlan::Apply(node) => {
241			let operator_name = node.operator.text().to_string();
242			let transform = context
243				.services
244				.transforms
245				.get_transform(&operator_name)
246				.unwrap_or_else(|| panic!("Unknown transform: {}", operator_name));
247			let input = node.input.expect("Apply requires input");
248			let input_node = compile(*input, rx, context);
249			Box::new(ApplyTransformNode::new(input_node, transform))
250		}
251		RqlQueryPlan::RunTests(node) => Box::new(RunTestsQueryNode::new(node, context.clone())),
252		RqlQueryPlan::CallFunction(node) => Box::new(GeneratorNode::new(node.name, node.arguments)),
253
254		RqlQueryPlan::Window(_) => {
255			unimplemented!(
256				"Window operator is only supported in deferred views and requires the flow engine."
257			)
258		}
259		RqlQueryPlan::Append(_) => {
260			unimplemented!(
261				"Append operator is only supported in deferred views and requires the flow engine."
262			)
263		}
264	}
265}