1mod join;
5mod transform;
6mod vtable;
7
8use std::sync::Arc;
9
10use reifydb_core::interface::{catalog::id::IndexId, resolved::ResolvedShape};
11use reifydb_rql::{
12 nodes::{
13 AggregateNode as RqlAggregateNode, AssertNode as RqlAssertNode, GeneratorNode as RqlGeneratorNode,
14 InlineDataNode as RqlInlineDataNode, RowListLookupNode as RqlRowListLookupNode,
15 RowPointLookupNode as RqlRowPointLookupNode, RowRangeScanNode as RqlRowRangeScanNode,
16 SortNode as RqlSortNode, TakeLimit, TakeNode as RqlTakeNode,
17 },
18 query::QueryPlan as RqlQueryPlan,
19};
20use reifydb_transaction::transaction::Transaction;
21use reifydb_type::fragment::Fragment;
22use tracing::instrument;
23
24use super::{apply_transform::ApplyTransformNode, run_tests::RunTestsQueryNode};
25use crate::vm::{
26 stack::Variable,
27 volcano::{
28 aggregate::AggregateNode,
29 assert::{AssertNode, AssertWithoutInputNode},
30 distinct::DistinctNode,
31 environment::EnvironmentNode,
32 filter::FilterNode,
33 generator::GeneratorNode,
34 inline::InlineDataNode,
35 query::{QueryContext, QueryNode},
36 row_lookup::{RowListLookupNode, RowPointLookupNode, RowRangeScanNode},
37 scalarize::ScalarizeNode,
38 scan::{
39 dictionary::DictionaryScanNode, index::IndexScanNode, remote::RemoteFetchNode,
40 ringbuffer::RingBufferScan, series::SeriesScanNode as VolcanoSeriesScanNode,
41 table::TableScanNode, view::ViewScanNode,
42 },
43 sort::SortNode,
44 take::TakeNode,
45 top_k::TopKNode,
46 variable::VariableNode,
47 },
48};
49
50fn extract_source_name_from_query(plan: &RqlQueryPlan) -> Option<Fragment> {
54 match plan {
55 RqlQueryPlan::TableScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
56 RqlQueryPlan::ViewScan(node) => Some(Fragment::internal(node.source.def().name())),
57 RqlQueryPlan::RingBufferScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
58 RqlQueryPlan::DictionaryScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
59 RqlQueryPlan::SeriesScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
60 RqlQueryPlan::RemoteScan(_) => None,
61 RqlQueryPlan::Assert(node) => node.input.as_ref().and_then(|p| extract_source_name_from_query(p)),
62 RqlQueryPlan::Filter(node) => extract_source_name_from_query(&node.input),
63 RqlQueryPlan::Map(node) => node.input.as_ref().and_then(|p| extract_source_name_from_query(p)),
64 RqlQueryPlan::Take(node) => extract_source_name_from_query(&node.input),
65 _ => None,
66 }
67}
68
69pub(crate) fn extract_resolved_source(plan: &RqlQueryPlan) -> Option<ResolvedShape> {
70 match plan {
71 RqlQueryPlan::TableScan(node) => Some(ResolvedShape::Table(node.source.clone())),
72 RqlQueryPlan::ViewScan(node) => Some(ResolvedShape::View(node.source.clone())),
73 RqlQueryPlan::RingBufferScan(node) => Some(ResolvedShape::RingBuffer(node.source.clone())),
74 RqlQueryPlan::DictionaryScan(node) => Some(ResolvedShape::Dictionary(node.source.clone())),
75 RqlQueryPlan::SeriesScan(node) => Some(ResolvedShape::Series(node.source.clone())),
76 RqlQueryPlan::RemoteScan(_) => None,
77 RqlQueryPlan::Filter(node) => extract_resolved_source(&node.input),
78 RqlQueryPlan::Assert(node) => node.input.as_ref().and_then(|p| extract_resolved_source(p)),
79 RqlQueryPlan::Map(node) => node.input.as_ref().and_then(|p| extract_resolved_source(p)),
80 RqlQueryPlan::Take(node) => extract_resolved_source(&node.input),
81 RqlQueryPlan::Sort(node) => extract_resolved_source(&node.input),
82 _ => None,
83 }
84}
85
86#[instrument(name = "volcano::compile", level = "debug", skip_all)]
89pub(crate) fn compile<'a>(
90 plan: RqlQueryPlan,
91 rx: &mut Transaction<'a>,
92 context: Arc<QueryContext>,
93) -> Box<dyn QueryNode> {
94 match plan {
95 RqlQueryPlan::Aggregate(RqlAggregateNode {
96 by,
97 map,
98 input,
99 }) => {
100 let input_node = compile(*input, rx, context.clone());
101 Box::new(AggregateNode::new(input_node, by, map, context))
102 }
103 RqlQueryPlan::Distinct(node) => {
104 let input = compile(*node.input, rx, context);
105 Box::new(DistinctNode::new(input, node.columns))
106 }
107
108 RqlQueryPlan::Filter(node) => transform::compile_filter(node, rx, context),
109 RqlQueryPlan::Gate(node) => {
110 let input_node = compile(*node.input, rx, context);
111 Box::new(FilterNode::new(input_node, node.conditions))
112 }
113 RqlQueryPlan::Map(node) => transform::compile_map(node, rx, context),
114 RqlQueryPlan::Extend(node) => transform::compile_extend(node, rx, context),
115 RqlQueryPlan::Patch(node) => transform::compile_patch(node, rx, context),
116
117 RqlQueryPlan::Sort(RqlSortNode {
118 by,
119 input,
120 }) => {
121 let input_node = compile(*input, rx, context);
122 Box::new(SortNode::new(input_node, by))
123 }
124 RqlQueryPlan::Take(RqlTakeNode {
125 take,
126 input,
127 }) => {
128 let limit = match take {
129 TakeLimit::Literal(n) => n,
130 TakeLimit::Variable(ref name) => context
131 .symbols
132 .get(name)
133 .and_then(|var| match var {
134 Variable::Columns {
135 columns: cols,
136 ..
137 } => cols.scalar_value().to_usize(),
138 _ => None,
139 })
140 .unwrap_or_else(|| panic!("TAKE variable ${} must be a numeric value", name)),
141 };
142 if let RqlQueryPlan::Sort(sort_node) = *input {
144 let input_node = compile(*sort_node.input, rx, context);
145 return Box::new(TopKNode::new(input_node, sort_node.by, limit));
146 }
147 let mut input_node = compile(*input, rx, context);
148 input_node.set_scan_limit(limit);
150 Box::new(TakeNode::new(input_node, limit))
151 }
152
153 RqlQueryPlan::JoinInner(node) => join::compile_inner_join(node, rx, context),
154 RqlQueryPlan::JoinLeft(node) => join::compile_left_join(node, rx, context),
155 RqlQueryPlan::JoinNatural(node) => join::compile_natural_join(node, rx, context),
156
157 RqlQueryPlan::Assert(RqlAssertNode {
158 conditions,
159 input,
160 message,
161 }) => {
162 if let Some(input) = input {
163 let input_node = compile(*input, rx, context);
164 Box::new(AssertNode::new(input_node, conditions, message))
165 } else {
166 Box::new(AssertWithoutInputNode::new(conditions, message))
167 }
168 }
169
170 RqlQueryPlan::TableScan(node) => {
171 Box::new(TableScanNode::new(node.source.clone(), context, rx).unwrap())
172 }
173 RqlQueryPlan::ViewScan(node) => Box::new(ViewScanNode::new(node.source.clone(), context).unwrap()),
174 RqlQueryPlan::RingBufferScan(node) => {
175 Box::new(RingBufferScan::new(node.source.clone(), context, rx).unwrap())
176 }
177 RqlQueryPlan::DictionaryScan(node) => {
178 Box::new(DictionaryScanNode::new(node.source.clone(), context).unwrap())
179 }
180 RqlQueryPlan::SeriesScan(node) => Box::new(
181 VolcanoSeriesScanNode::new(
182 node.source.clone(),
183 node.key_range_start,
184 node.key_range_end,
185 node.variant_tag,
186 context,
187 )
188 .unwrap(),
189 ),
190 RqlQueryPlan::IndexScan(node) => {
191 let table = node.source.def().clone();
192 let Some(pk) = table.primary_key.clone() else {
193 unimplemented!()
194 };
195 Box::new(IndexScanNode::new(table, IndexId::primary(pk.id), context).unwrap())
196 }
197 RqlQueryPlan::RemoteScan(node) => {
198 Box::new(RemoteFetchNode::new(node.address, node.token, node.remote_rql, node.variables))
199 }
200 RqlQueryPlan::TableVirtualScan(node) => vtable::compile_virtual_scan(node, context),
201
202 RqlQueryPlan::RowPointLookup(RqlRowPointLookupNode {
203 source,
204 row_number,
205 }) => {
206 let resolved_source = source;
207 Box::new(
208 RowPointLookupNode::new(resolved_source, row_number, context)
209 .expect("Failed to create RowPointLookupNode"),
210 )
211 }
212 RqlQueryPlan::RowListLookup(RqlRowListLookupNode {
213 source,
214 row_numbers,
215 }) => {
216 let resolved_source = source;
217 Box::new(
218 RowListLookupNode::new(resolved_source, row_numbers, context)
219 .expect("Failed to create RowListLookupNode"),
220 )
221 }
222 RqlQueryPlan::RowRangeScan(RqlRowRangeScanNode {
223 source,
224 start,
225 end,
226 }) => {
227 let resolved_source = source;
228 Box::new(
229 RowRangeScanNode::new(resolved_source, start, end, context)
230 .expect("Failed to create RowRangeScanNode"),
231 )
232 }
233
234 RqlQueryPlan::InlineData(RqlInlineDataNode {
235 rows,
236 }) => Box::new(InlineDataNode::new(rows, context)),
237 RqlQueryPlan::Generator(RqlGeneratorNode {
238 name,
239 expressions,
240 }) => Box::new(GeneratorNode::new(name, expressions)),
241 RqlQueryPlan::Variable(node) => Box::new(VariableNode::new(node.variable_expr)),
242 RqlQueryPlan::Environment(_) => Box::new(EnvironmentNode::new()),
243 RqlQueryPlan::Scalarize(node) => {
244 let input = compile(*node.input, rx, context.clone());
245 Box::new(ScalarizeNode::new(input))
246 }
247 RqlQueryPlan::Apply(node) => {
248 let operator_name = node.operator.text().to_string();
249 let transform = context
250 .services
251 .transforms
252 .get_transform(&operator_name)
253 .unwrap_or_else(|| panic!("Unknown transform: {}", operator_name));
254 let input = node.input.expect("Apply requires input");
255 let input_node = compile(*input, rx, context);
256 Box::new(ApplyTransformNode::new(input_node, transform))
257 }
258 RqlQueryPlan::RunTests(node) => Box::new(RunTestsQueryNode::new(node, context.clone())),
259 RqlQueryPlan::CallFunction(node) => Box::new(GeneratorNode::new(node.name, node.arguments)),
260
261 RqlQueryPlan::Window(_) => {
262 unimplemented!(
263 "Window operator is only supported in deferred views and requires the flow engine."
264 )
265 }
266 RqlQueryPlan::Append(_) => {
267 unimplemented!(
268 "Append operator is only supported in deferred views and requires the flow engine."
269 )
270 }
271 }
272}