1use std::{fmt::Display, sync::LazyLock};
5
6use bumpalo::Bump;
7use reifydb_core::{
8 common::JoinType,
9 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
10};
11use reifydb_rql::{
12 ast::{ast::AstAlterPolicyAction, parse_str},
13 plan::logical::{
14 AggregateNode, AlterSequenceNode, AppendNode, AssertNode, CreateColumnPropertyNode, CreateIndexNode,
15 CreatePrimaryKeyNode, DistinctNode, ExtendNode, FilterNode, GateNode, GeneratorNode, InlineDataNode,
16 JoinInnerNode, JoinLeftNode, JoinNaturalNode, LogicalPlan, MapNode, OrderNode, PatchNode,
17 RemoteScanNode, ShapeScanNode, TakeNode, VariableSourceNode, compile_logical,
18 },
19};
20use reifydb_type::value::r#type::Type;
21
22use crate::{
23 procedure::rql::extract_query,
24 routine::{Routine, RoutineInfo, context::ProcedureContext, error::RoutineError},
25};
26
/// Lazily constructed descriptor for this routine, created with the name
/// `rql::logical` on first access and handed out by `RqlLogical::info`.
static INFO: LazyLock<RoutineInfo> = LazyLock::new(|| RoutineInfo::new("rql::logical"));
28
/// Stateless routine exposed as `rql::logical`.
///
/// The type carries no data; all work happens in its `Routine` implementation.
#[derive(Default)]
pub struct RqlLogical;

impl RqlLogical {
    /// Creates the routine. Produces the same value as `RqlLogical::default()`.
    pub fn new() -> Self {
        RqlLogical
    }
}
42
43impl<'a, 'tx> Routine<ProcedureContext<'a, 'tx>> for RqlLogical {
44 fn info(&self) -> &RoutineInfo {
45 &INFO
46 }
47
48 fn return_type(&self, _input_types: &[Type]) -> Type {
49 Type::Any
50 }
51
52 fn attaches_row_metadata(&self) -> bool {
53 false
54 }
55
56 fn execute(&self, ctx: &mut ProcedureContext<'a, 'tx>, _args: &Columns) -> Result<Columns, RoutineError> {
57 let query = extract_query(ctx.params, "rql::logical")?;
58
59 let bump = Bump::new();
60 let statements = parse_str(&bump, query.as_str())?;
61
62 let mut walker = LogicalWalker::default();
63 for statement in statements {
64 let plans = compile_logical(&bump, ctx.catalog, ctx.tx, statement)?;
65 for plan in plans.iter() {
66 walker.walk(plan, 0, None);
67 }
68 }
69
70 Ok(walker.into_columns())
71 }
72}
73
/// Accumulates a flattened logical plan tree as parallel vectors.
///
/// Every emitted node appends exactly one entry to each vector, so position
/// `i` across all five vectors describes the same node.
#[derive(Default)]
struct LogicalWalker {
    /// Row index of the node: the number of nodes emitted before it.
    idx: Vec<i32>,
    /// Nesting depth; plans walked from `execute` start at 0.
    depth: Vec<i32>,
    /// `idx` of the parent node, or `None` for a node walked without a parent.
    parent: Vec<Option<i32>>,
    /// Static label of the node as produced by `describe`.
    kind: Vec<String>,
    /// Free-form summary of the node as produced by `describe`; may be empty.
    detail: Vec<String>,
}
82
83impl LogicalWalker {
84 fn emit(&mut self, depth: i32, parent: Option<i32>, kind: &str, detail: String) -> i32 {
85 let next = self.idx.len() as i32;
86 self.idx.push(next);
87 self.depth.push(depth);
88 self.parent.push(parent);
89 self.kind.push(kind.to_string());
90 self.detail.push(detail);
91 next
92 }
93
94 fn into_columns(self) -> Columns {
95 Columns::new(vec![
96 ColumnWithName::int4("idx", self.idx),
97 ColumnWithName::int4("depth", self.depth),
98 ColumnWithName::new("parent", ColumnBuffer::int4_optional(self.parent)),
99 ColumnWithName::utf8("kind", self.kind),
100 ColumnWithName::utf8("detail", self.detail),
101 ])
102 }
103
104 fn walk(&mut self, plan: &LogicalPlan<'_>, depth: i32, parent: Option<i32>) {
105 let (kind, detail) = describe(plan);
106 let me = self.emit(depth, parent, kind, detail);
107 self.recurse_children(plan, depth + 1, Some(me));
108 }
109
110 fn recurse_children(&mut self, plan: &LogicalPlan<'_>, depth: i32, parent: Option<i32>) {
111 match plan {
112 LogicalPlan::Pipeline(pipeline) => {
113 for step in pipeline.steps.iter() {
114 self.walk(step, depth, parent);
115 }
116 }
117 LogicalPlan::JoinInner(JoinInnerNode {
118 with,
119 ..
120 })
121 | LogicalPlan::JoinLeft(JoinLeftNode {
122 with,
123 ..
124 })
125 | LogicalPlan::JoinNatural(JoinNaturalNode {
126 with,
127 ..
128 }) => {
129 for child in with.iter() {
130 self.walk(child, depth, parent);
131 }
132 }
133 LogicalPlan::Append(AppendNode::Query {
134 with,
135 }) => {
136 for child in with.iter() {
137 self.walk(child, depth, parent);
138 }
139 }
140 LogicalPlan::Scalarize(scalarize) => {
141 self.walk(&scalarize.input, depth, parent);
142 }
143 LogicalPlan::DeleteTable(node) => {
144 if let Some(input) = &node.input {
145 self.walk(input, depth, parent);
146 }
147 }
148 LogicalPlan::DeleteRingBuffer(node) => {
149 if let Some(input) = &node.input {
150 self.walk(input, depth, parent);
151 }
152 }
153 LogicalPlan::Update(node) => {
154 if let Some(input) = &node.input {
155 self.walk(input, depth, parent);
156 }
157 }
158 LogicalPlan::UpdateRingBuffer(node) => {
159 if let Some(input) = &node.input {
160 self.walk(input, depth, parent);
161 }
162 }
163 LogicalPlan::UpdateSeries(node) => {
164 if let Some(input) = &node.input {
165 self.walk(input, depth, parent);
166 }
167 }
168 LogicalPlan::Conditional(node) => {
169 self.walk(&node.then_branch, depth, parent);
170 for else_if in node.else_ifs.iter() {
171 self.walk(&else_if.then_branch, depth, parent);
172 }
173 if let Some(else_branch) = &node.else_branch {
174 self.walk(else_branch, depth, parent);
175 }
176 }
177 LogicalPlan::DefineFunction(def) => {
178 for stmt in def.body.iter() {
179 for child in stmt.iter() {
180 self.walk(child, depth, parent);
181 }
182 }
183 }
184 LogicalPlan::DefineClosure(def) => {
185 for stmt in def.body.iter() {
186 for child in stmt.iter() {
187 self.walk(child, depth, parent);
188 }
189 }
190 }
191 _ => {}
192 }
193 }
194}
195
196fn describe(plan: &LogicalPlan<'_>) -> (&'static str, String) {
197 match plan {
198 LogicalPlan::Loop(_) => ("Loop", String::new()),
199 LogicalPlan::While(_) => ("While", String::new()),
200 LogicalPlan::For(_) => ("For", String::new()),
201 LogicalPlan::Break => ("Break", String::new()),
202 LogicalPlan::Continue => ("Continue", String::new()),
203 LogicalPlan::CreateDeferredView(_) => ("CreateDeferredView", String::new()),
204 LogicalPlan::CreateTransactionalView(_) => ("CreateTransactionalView", String::new()),
205 LogicalPlan::CreateNamespace(_) => ("CreateNamespace", String::new()),
206 LogicalPlan::CreateRemoteNamespace(_) => ("CreateRemoteNamespace", String::new()),
207 LogicalPlan::CreateSequence(_) => ("CreateSequence", String::new()),
208 LogicalPlan::CreateTable(_) => ("CreateTable", String::new()),
209 LogicalPlan::CreateRingBuffer(_) => ("CreateRingBuffer", String::new()),
210 LogicalPlan::CreateDictionary(_) => ("CreateDictionary", String::new()),
211 LogicalPlan::CreateSumType(_) => ("CreateSumType", String::new()),
212 LogicalPlan::CreateSubscription(_) => ("CreateSubscription", String::new()),
213 LogicalPlan::DropNamespace(_) => ("DropNamespace", String::new()),
214 LogicalPlan::DropTable(_) => ("DropTable", String::new()),
215 LogicalPlan::DropView(_) => ("DropView", String::new()),
216 LogicalPlan::DropRingBuffer(_) => ("DropRingBuffer", String::new()),
217 LogicalPlan::DropDictionary(_) => ("DropDictionary", String::new()),
218 LogicalPlan::DropSumType(_) => ("DropSumType", String::new()),
219 LogicalPlan::DropSubscription(_) => ("DropSubscription", String::new()),
220 LogicalPlan::DropSeries(_) => ("DropSeries", String::new()),
221 LogicalPlan::DropProcedure(_) => ("DropProcedure", String::new()),
222 LogicalPlan::DropHandler(_) => ("DropHandler", String::new()),
223 LogicalPlan::DropTest(_) => ("DropTest", String::new()),
224 LogicalPlan::CreateSource(_) => ("CreateSource", String::new()),
225 LogicalPlan::CreateSink(_) => ("CreateSink", String::new()),
226 LogicalPlan::CreateBinding(_) => ("CreateBinding", String::new()),
227 LogicalPlan::DropSource(_) => ("DropSource", String::new()),
228 LogicalPlan::DropSink(_) => ("DropSink", String::new()),
229 LogicalPlan::DropBinding(_) => ("DropBinding", String::new()),
230 LogicalPlan::CreateIdentity(n) => ("CreateIdentity", format!("name={}", n.name.text())),
231 LogicalPlan::CreateRole(n) => ("CreateRole", format!("name={}", n.name.text())),
232 LogicalPlan::Grant(n) => ("Grant", format!("role={} user={}", n.role.text(), n.user.text())),
233 LogicalPlan::Revoke(n) => ("Revoke", format!("role={} user={}", n.role.text(), n.user.text())),
234 LogicalPlan::DropIdentity(n) => {
235 ("DropIdentity", format!("name={} if_exists={}", n.name.text(), n.if_exists))
236 }
237 LogicalPlan::DropRole(n) => ("DropRole", format!("name={} if_exists={}", n.name.text(), n.if_exists)),
238 LogicalPlan::CreateAuthentication(n) => ("CreateAuthentication", format!("user={}", n.user.text())),
239 LogicalPlan::DropAuthentication(n) => {
240 ("DropAuthentication", format!("user={} if_exists={}", n.user.text(), n.if_exists))
241 }
242 LogicalPlan::CreatePolicy(n) => {
243 let name = n.name.as_ref().map(|f| f.text()).unwrap_or("<unnamed>");
244 ("CreatePolicy", format!("name={} type={:?}", name, n.target_type))
245 }
246 LogicalPlan::AlterPolicy(n) => {
247 let enabled = n.action == AstAlterPolicyAction::Enable;
248 ("AlterPolicy", format!("name={} enabled={}", n.name.text(), enabled))
249 }
250 LogicalPlan::DropPolicy(n) => {
251 ("DropPolicy", format!("name={} if_exists={}", n.name.text(), n.if_exists))
252 }
253 LogicalPlan::AlterSequence(AlterSequenceNode {
254 sequence,
255 column,
256 value,
257 }) => {
258 let namespace =
259 sequence.namespace.first().map(|s| format!("{}.", s.text())).unwrap_or_default();
260 (
261 "AlterSequence",
262 format!("{}{}.{} = {}", namespace, sequence.name.text(), column.name.text(), value),
263 )
264 }
265 LogicalPlan::CreateIndex(CreateIndexNode {
266 index_type,
267 index,
268 columns,
269 ..
270 }) => {
271 let cols = columns
272 .iter()
273 .map(|c| {
274 if let Some(order) = &c.order {
275 format!("{} {:?}", c.column.text(), order)
276 } else {
277 c.column.text().to_string()
278 }
279 })
280 .collect::<Vec<_>>()
281 .join(", ");
282 (
283 "CreateIndex",
284 format!(
285 "type={:?} name={} table={} columns=[{}]",
286 index_type,
287 index.name.text(),
288 index.table.text(),
289 cols
290 ),
291 )
292 }
293 LogicalPlan::DeleteTable(node) => {
294 let target = node
295 .target
296 .as_ref()
297 .map(|t| {
298 let ns = t.namespace.first().map(|n| n.text()).unwrap_or("default");
299 format!("{}::{}", ns, t.name.text())
300 })
301 .unwrap_or_else(|| "<inferred>".to_string());
302 ("DeleteTable", format!("target={}", target))
303 }
304 LogicalPlan::DeleteRingBuffer(node) => {
305 let ns = node.target.namespace.first().map(|n| n.text()).unwrap_or("default");
306 ("DeleteRingBuffer", format!("target={}::{}", ns, node.target.name.text()))
307 }
308 LogicalPlan::InsertTable(_) => ("InsertTable", String::new()),
309 LogicalPlan::InsertRingBuffer(_) => ("InsertRingBuffer", String::new()),
310 LogicalPlan::InsertDictionary(_) => ("InsertDictionary", String::new()),
311 LogicalPlan::InsertSeries(_) => ("InsertSeries", String::new()),
312 LogicalPlan::DeleteSeries(_) => ("DeleteSeries", String::new()),
313 LogicalPlan::UpdateSeries(node) => {
314 let ns = node.target.namespace.first().map(|n| n.text()).unwrap_or("default");
315 ("UpdateSeries", format!("target={}::{}", ns, node.target.name.text()))
316 }
317 LogicalPlan::Update(node) => {
318 let target = node
319 .target
320 .as_ref()
321 .map(|t| {
322 let ns = t.namespace.first().map(|n| n.text()).unwrap_or("default");
323 format!("{}::{}", ns, t.name.text())
324 })
325 .unwrap_or_else(|| "<inferred>".to_string());
326 ("Update", format!("target={}", target))
327 }
328 LogicalPlan::UpdateRingBuffer(node) => {
329 let ns = node.target.namespace.first().map(|n| n.text()).unwrap_or("default");
330 ("UpdateRingBuffer", format!("target={}::{}", ns, node.target.name.text()))
331 }
332 LogicalPlan::Take(TakeNode {
333 take,
334 }) => ("Take", take.to_string()),
335 LogicalPlan::Assert(AssertNode {
336 condition,
337 message,
338 ..
339 }) => {
340 let msg = message.as_deref().unwrap_or("assertion failed");
341 ("Assert", format!("\"{}\" condition: {}", msg, condition))
342 }
343 LogicalPlan::AssertBlock(node) => {
344 let kind = if node.expect_error {
345 "AssertError"
346 } else {
347 "AssertBlock"
348 };
349 let msg = node.message.as_deref().unwrap_or("");
350 (kind, format!("\"{}\"", msg))
351 }
352 LogicalPlan::Filter(FilterNode {
353 condition,
354 ..
355 }) => ("Filter", format!("condition: {}", condition)),
356 LogicalPlan::Gate(GateNode {
357 condition,
358 ..
359 }) => ("Gate", format!("condition: {}", condition)),
360 LogicalPlan::Map(MapNode {
361 map,
362 ..
363 }) => ("Map", expressions_inline(map)),
364 LogicalPlan::Extend(ExtendNode {
365 extend,
366 ..
367 }) => ("Extend", expressions_inline(extend)),
368 LogicalPlan::Patch(PatchNode {
369 assignments,
370 ..
371 }) => ("Patch", expressions_inline(assignments)),
372 LogicalPlan::Aggregate(AggregateNode {
373 by,
374 map,
375 ..
376 }) => {
377 let by_str = expressions_inline(by);
378 let map_str = expressions_inline(map);
379 ("Aggregate", format!("by=[{}] map=[{}]", by_str, map_str))
380 }
381 LogicalPlan::Order(OrderNode {
382 by,
383 ..
384 }) => ("Order", expressions_inline(by)),
385 LogicalPlan::JoinInner(JoinInnerNode {
386 on,
387 ..
388 }) => ("JoinInner", expressions_inline(on)),
389 LogicalPlan::JoinLeft(JoinLeftNode {
390 on,
391 ..
392 }) => ("JoinLeft", expressions_inline(on)),
393 LogicalPlan::JoinNatural(JoinNaturalNode {
394 join_type,
395 ..
396 }) => {
397 let kind = match join_type {
398 JoinType::Inner => "Inner",
399 JoinType::Left => "Left",
400 };
401 ("JoinNatural", format!("type={}", kind))
402 }
403 LogicalPlan::PrimitiveScan(ShapeScanNode {
404 source,
405 index,
406 ..
407 }) => {
408 let name =
409 source.fully_qualified_name().unwrap_or_else(|| source.identifier().text().to_string());
410 if let Some(idx) = index {
411 ("IndexScan", format!("{}::{}", name, idx.identifier().text()))
412 } else {
413 ("TableScan", name)
414 }
415 }
416 LogicalPlan::RemoteScan(RemoteScanNode {
417 address,
418 local_namespace,
419 remote_name,
420 ..
421 }) => ("RemoteScan", format!("{}::{} @ {}", local_namespace, remote_name, address)),
422 LogicalPlan::InlineData(InlineDataNode {
423 rows,
424 }) => {
425 let total_fields: usize = rows.iter().map(|row| row.len()).sum();
426 ("InlineData", format!("rows={} fields={}", rows.len(), total_fields))
427 }
428 LogicalPlan::Generator(GeneratorNode {
429 name,
430 expressions,
431 }) => ("Generator", format!("name={} parameters={}", name.text(), expressions.len())),
432 LogicalPlan::VariableSource(VariableSourceNode {
433 name,
434 }) => ("VariableSource", name.text().to_string()),
435 LogicalPlan::Environment(_) => ("Environment", String::new()),
436 LogicalPlan::Distinct(DistinctNode {
437 columns,
438 ..
439 }) => {
440 let detail = if columns.is_empty() {
441 "primary key".to_string()
442 } else {
443 columns.iter().map(|c| c.name.text().to_string()).collect::<Vec<_>>().join(", ")
444 };
445 ("Distinct", detail)
446 }
447 LogicalPlan::Apply(apply) => {
448 ("Apply", format!("operator={} arguments={}", apply.operator.text(), apply.arguments.len()))
449 }
450 LogicalPlan::Pipeline(_) => ("Pipeline", String::new()),
451 LogicalPlan::CreatePrimaryKey(CreatePrimaryKeyNode {
452 ..
453 }) => ("CreatePrimaryKey", String::new()),
454 LogicalPlan::CreateColumnProperty(CreateColumnPropertyNode {
455 ..
456 }) => ("CreateColumnProperty", String::new()),
457 LogicalPlan::CreateProcedure(_) => ("CreateProcedure", String::new()),
458 LogicalPlan::CreateSeries(_) => ("CreateSeries", String::new()),
459 LogicalPlan::CreateEvent(_) => ("CreateEvent", String::new()),
460 LogicalPlan::CreateTag(_) => ("CreateTag", String::new()),
461 LogicalPlan::CreateTest(_) => ("CreateTest", String::new()),
462 LogicalPlan::RunTests(_) => ("RunTests", String::new()),
463 LogicalPlan::CreateMigration(_) => ("CreateMigration", String::new()),
464 LogicalPlan::Migrate(_) => ("Migrate", String::new()),
465 LogicalPlan::RollbackMigration(_) => ("RollbackMigration", String::new()),
466 LogicalPlan::Dispatch(_) => ("Dispatch", String::new()),
467 LogicalPlan::Window(window) => {
468 let group = window.group_by.len();
469 let agg = window.aggregations.len();
470 ("Window", format!("kind={:?} group_by={} aggregations={}", window.kind, group, agg))
471 }
472 LogicalPlan::Declare(node) => ("Declare", format!("{} = {}", node.name.text(), node.value)),
473 LogicalPlan::Assign(node) => ("Assign", format!("{} = {}", node.name.text(), node.value)),
474 LogicalPlan::Conditional(node) => ("Conditional", format!("if {}", node.condition)),
475 LogicalPlan::Scalarize(_) => ("Scalarize", String::new()),
476 LogicalPlan::AlterTable(node) => {
477 let name = if let Some(ns) = node.table.namespace.first() {
478 format!("{}::{}", ns.text(), node.table.name.text())
479 } else {
480 node.table.name.text().to_string()
481 };
482 ("AlterTable", name)
483 }
484 LogicalPlan::AlterRemoteNamespace(_) => ("AlterRemoteNamespace", String::new()),
485 LogicalPlan::DefineFunction(def) => {
486 let params: Vec<String> = def
487 .parameters
488 .iter()
489 .map(|p| {
490 if let Some(ref tc) = p.type_constraint {
491 format!("${}: {:?}", p.name.text(), tc)
492 } else {
493 format!("${}", p.name.text())
494 }
495 })
496 .collect();
497 let ret = if let Some(ref rt) = def.return_type {
498 format!(" -> {:?}", rt)
499 } else {
500 String::new()
501 };
502 ("DefineFunction", format!("{}[{}]{}", def.name.text(), params.join(", "), ret))
503 }
504 LogicalPlan::Return(ret) => {
505 let value = ret.value.as_ref().map(|expr| expr.to_string()).unwrap_or_default();
506 ("Return", value)
507 }
508 LogicalPlan::CallFunction(call) => {
509 let args: Vec<String> = call.arguments.iter().map(|a| format!("{}", a)).collect();
510 ("CallFunction", format!("{}({})", call.name.text(), args.join(", ")))
511 }
512 LogicalPlan::Append(node) => match node {
513 AppendNode::IntoVariable {
514 target,
515 ..
516 } => ("Append", format!("${}", target.text())),
517 AppendNode::Query {
518 ..
519 } => ("Append", String::new()),
520 },
521 LogicalPlan::DefineClosure(node) => {
522 let params: Vec<String> = node
523 .parameters
524 .iter()
525 .map(|p| {
526 if let Some(ref tc) = p.type_constraint {
527 format!("${}: {:?}", p.name.text(), tc)
528 } else {
529 format!("${}", p.name.text())
530 }
531 })
532 .collect();
533 ("DefineClosure", format!("[{}]", params.join(", ")))
534 }
535 }
536}
537
/// Renders every item with its `Display` implementation and joins the results
/// with `", "`. An empty slice yields an empty string.
fn expressions_inline<E: Display>(items: &[E]) -> String {
    let mut rendered = String::new();
    for (position, item) in items.iter().enumerate() {
        // The separator goes between items, never before the first one.
        if position > 0 {
            rendered.push_str(", ");
        }
        rendered.push_str(&item.to_string());
    }
    rendered
}