1mod join;
5mod transform;
6mod vtable;
7
8use std::sync::Arc;
9
10use reifydb_core::interface::{catalog::id::IndexId, resolved::ResolvedShape};
11use reifydb_rql::{
12 nodes::{
13 AggregateNode as RqlAggregateNode, AssertNode as RqlAssertNode, GeneratorNode as RqlGeneratorNode,
14 InlineDataNode as RqlInlineDataNode, RowListLookupNode as RqlRowListLookupNode,
15 RowPointLookupNode as RqlRowPointLookupNode, RowRangeScanNode as RqlRowRangeScanNode,
16 SortNode as RqlSortNode, TakeLimit, TakeNode as RqlTakeNode,
17 },
18 query::QueryPlan as RqlQueryPlan,
19};
20use reifydb_transaction::transaction::Transaction;
21use reifydb_type::fragment::Fragment;
22use tracing::instrument;
23
24use super::{apply_transform::ApplyTransformNode, run_tests::RunTestsQueryNode};
25use crate::vm::{
26 stack::Variable,
27 volcano::{
28 aggregate::AggregateNode,
29 assert::{AssertNode, AssertWithoutInputNode},
30 distinct::DistinctNode,
31 environment::EnvironmentNode,
32 filter::FilterNode,
33 generator::GeneratorNode,
34 inline::InlineDataNode,
35 query::{QueryContext, QueryNode},
36 row_lookup::{RowListLookupNode, RowPointLookupNode, RowRangeScanNode},
37 scalarize::ScalarizeNode,
38 scan::{
39 dictionary::DictionaryScanNode, index::IndexScanNode, remote::RemoteFetchNode,
40 ringbuffer::RingBufferScan, series::SeriesScanNode as VolcanoSeriesScanNode,
41 table::TableScanNode, view::ViewScanNode,
42 },
43 sort::SortNode,
44 take::TakeNode,
45 top_k::TopKNode,
46 variable::VariableNode,
47 },
48};
49
50fn extract_source_name_from_query(plan: &RqlQueryPlan) -> Option<Fragment> {
51 match plan {
52 RqlQueryPlan::TableScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
53 RqlQueryPlan::ViewScan(node) => Some(Fragment::internal(node.source.def().name())),
54 RqlQueryPlan::RingBufferScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
55 RqlQueryPlan::DictionaryScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
56 RqlQueryPlan::SeriesScan(node) => Some(Fragment::internal(node.source.def().name.clone())),
57 RqlQueryPlan::RemoteScan(_) => None,
58 RqlQueryPlan::Assert(node) => node.input.as_ref().and_then(|p| extract_source_name_from_query(p)),
59 RqlQueryPlan::Filter(node) => extract_source_name_from_query(&node.input),
60 RqlQueryPlan::Map(node) => node.input.as_ref().and_then(|p| extract_source_name_from_query(p)),
61 RqlQueryPlan::Take(node) => extract_source_name_from_query(&node.input),
62 _ => None,
63 }
64}
65
66pub(crate) fn extract_resolved_source(plan: &RqlQueryPlan) -> Option<ResolvedShape> {
67 match plan {
68 RqlQueryPlan::TableScan(node) => Some(ResolvedShape::Table(node.source.clone())),
69 RqlQueryPlan::ViewScan(node) => Some(ResolvedShape::View(node.source.clone())),
70 RqlQueryPlan::RingBufferScan(node) => Some(ResolvedShape::RingBuffer(node.source.clone())),
71 RqlQueryPlan::DictionaryScan(node) => Some(ResolvedShape::Dictionary(node.source.clone())),
72 RqlQueryPlan::SeriesScan(node) => Some(ResolvedShape::Series(node.source.clone())),
73 RqlQueryPlan::RemoteScan(_) => None,
74 RqlQueryPlan::Filter(node) => extract_resolved_source(&node.input),
75 RqlQueryPlan::Assert(node) => node.input.as_ref().and_then(|p| extract_resolved_source(p)),
76 RqlQueryPlan::Map(node) => node.input.as_ref().and_then(|p| extract_resolved_source(p)),
77 RqlQueryPlan::Take(node) => extract_resolved_source(&node.input),
78 RqlQueryPlan::Sort(node) => extract_resolved_source(&node.input),
79 _ => None,
80 }
81}
82
83#[instrument(name = "volcano::compile", level = "debug", skip_all)]
84pub(crate) fn compile<'a>(
85 plan: RqlQueryPlan,
86 rx: &mut Transaction<'a>,
87 context: Arc<QueryContext>,
88) -> Box<dyn QueryNode> {
89 match plan {
90 RqlQueryPlan::Aggregate(RqlAggregateNode {
91 by,
92 map,
93 input,
94 }) => {
95 let input_node = compile(*input, rx, context.clone());
96 Box::new(AggregateNode::new(input_node, by, map, context))
97 }
98 RqlQueryPlan::Distinct(node) => {
99 let input = compile(*node.input, rx, context);
100 Box::new(DistinctNode::new(input, node.columns))
101 }
102
103 RqlQueryPlan::Filter(node) => transform::compile_filter(node, rx, context),
104 RqlQueryPlan::Gate(node) => {
105 let input_node = compile(*node.input, rx, context);
106 Box::new(FilterNode::new(input_node, node.conditions))
107 }
108 RqlQueryPlan::Map(node) => transform::compile_map(node, rx, context),
109 RqlQueryPlan::Extend(node) => transform::compile_extend(node, rx, context),
110 RqlQueryPlan::Patch(node) => transform::compile_patch(node, rx, context),
111
112 RqlQueryPlan::Sort(RqlSortNode {
113 by,
114 input,
115 }) => {
116 let input_node = compile(*input, rx, context);
117 Box::new(SortNode::new(input_node, by))
118 }
119 RqlQueryPlan::Take(RqlTakeNode {
120 take,
121 input,
122 }) => {
123 let limit = match take {
124 TakeLimit::Literal(n) => n,
125 TakeLimit::Variable(ref name) => context
126 .symbols
127 .get(name)
128 .and_then(|var| match var {
129 Variable::Columns {
130 columns: cols,
131 ..
132 } => cols.scalar_value().to_usize(),
133 _ => None,
134 })
135 .unwrap_or_else(|| panic!("TAKE variable ${} must be a numeric value", name)),
136 };
137
138 if let RqlQueryPlan::Sort(sort_node) = *input {
139 let input_node = compile(*sort_node.input, rx, context);
140 return Box::new(TopKNode::new(input_node, sort_node.by, limit));
141 }
142 let input_node = compile(*input, rx, context);
143 Box::new(TakeNode::new(input_node, limit))
144 }
145
146 RqlQueryPlan::JoinInner(node) => join::compile_inner_join(node, rx, context),
147 RqlQueryPlan::JoinLeft(node) => join::compile_left_join(node, rx, context),
148 RqlQueryPlan::JoinNatural(node) => join::compile_natural_join(node, rx, context),
149
150 RqlQueryPlan::Assert(RqlAssertNode {
151 conditions,
152 input,
153 message,
154 }) => {
155 if let Some(input) = input {
156 let input_node = compile(*input, rx, context);
157 Box::new(AssertNode::new(input_node, conditions, message))
158 } else {
159 Box::new(AssertWithoutInputNode::new(conditions, message))
160 }
161 }
162
163 RqlQueryPlan::TableScan(node) => {
164 Box::new(TableScanNode::new(node.source.clone(), context, rx).unwrap())
165 }
166 RqlQueryPlan::ViewScan(node) => Box::new(ViewScanNode::new(node.source.clone(), context).unwrap()),
167 RqlQueryPlan::RingBufferScan(node) => {
168 Box::new(RingBufferScan::new(node.source.clone(), context, rx).unwrap())
169 }
170 RqlQueryPlan::DictionaryScan(node) => {
171 Box::new(DictionaryScanNode::new(node.source.clone(), context).unwrap())
172 }
173 RqlQueryPlan::SeriesScan(node) => Box::new(
174 VolcanoSeriesScanNode::new(
175 node.source.clone(),
176 node.key_range_start,
177 node.key_range_end,
178 node.variant_tag,
179 context,
180 )
181 .unwrap(),
182 ),
183 RqlQueryPlan::IndexScan(node) => {
184 let table = node.source.def().clone();
185 let Some(pk) = table.primary_key.clone() else {
186 unimplemented!()
187 };
188 Box::new(IndexScanNode::new(table, IndexId::primary(pk.id), context).unwrap())
189 }
190 RqlQueryPlan::RemoteScan(node) => {
191 Box::new(RemoteFetchNode::new(node.address, node.token, node.remote_rql, node.variables))
192 }
193 RqlQueryPlan::TableVirtualScan(node) => vtable::compile_virtual_scan(node, context),
194
195 RqlQueryPlan::RowPointLookup(RqlRowPointLookupNode {
196 source,
197 row_number,
198 }) => {
199 let resolved_source = source;
200 Box::new(
201 RowPointLookupNode::new(resolved_source, row_number, context)
202 .expect("Failed to create RowPointLookupNode"),
203 )
204 }
205 RqlQueryPlan::RowListLookup(RqlRowListLookupNode {
206 source,
207 row_numbers,
208 }) => {
209 let resolved_source = source;
210 Box::new(
211 RowListLookupNode::new(resolved_source, row_numbers, context)
212 .expect("Failed to create RowListLookupNode"),
213 )
214 }
215 RqlQueryPlan::RowRangeScan(RqlRowRangeScanNode {
216 source,
217 start,
218 end,
219 }) => {
220 let resolved_source = source;
221 Box::new(
222 RowRangeScanNode::new(resolved_source, start, end, context)
223 .expect("Failed to create RowRangeScanNode"),
224 )
225 }
226
227 RqlQueryPlan::InlineData(RqlInlineDataNode {
228 rows,
229 }) => Box::new(InlineDataNode::new(rows, context)),
230 RqlQueryPlan::Generator(RqlGeneratorNode {
231 name,
232 expressions,
233 }) => Box::new(GeneratorNode::new(name, expressions)),
234 RqlQueryPlan::Variable(node) => Box::new(VariableNode::new(node.variable_expr)),
235 RqlQueryPlan::Environment(_) => Box::new(EnvironmentNode::new()),
236 RqlQueryPlan::Scalarize(node) => {
237 let input = compile(*node.input, rx, context.clone());
238 Box::new(ScalarizeNode::new(input))
239 }
240 RqlQueryPlan::Apply(node) => {
241 let operator_name = node.operator.text().to_string();
242 let transform = context
243 .services
244 .transforms
245 .get_transform(&operator_name)
246 .unwrap_or_else(|| panic!("Unknown transform: {}", operator_name));
247 let input = node.input.expect("Apply requires input");
248 let input_node = compile(*input, rx, context);
249 Box::new(ApplyTransformNode::new(input_node, transform))
250 }
251 RqlQueryPlan::RunTests(node) => Box::new(RunTestsQueryNode::new(node, context.clone())),
252 RqlQueryPlan::CallFunction(node) => Box::new(GeneratorNode::new(node.name, node.arguments)),
253
254 RqlQueryPlan::Window(_) => {
255 unimplemented!(
256 "Window operator is only supported in deferred views and requires the flow engine."
257 )
258 }
259 RqlQueryPlan::Append(_) => {
260 unimplemented!(
261 "Append operator is only supported in deferred views and requires the flow engine."
262 )
263 }
264 }
265}