Skip to main content

reifydb_routine/procedure/rql/
logical.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{fmt::Display, sync::LazyLock};
5
6use bumpalo::Bump;
7use reifydb_core::{
8	common::JoinType,
9	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
10};
11use reifydb_rql::{
12	ast::{ast::AstAlterPolicyAction, parse_str},
13	plan::logical::{
14		AggregateNode, AlterSequenceNode, AppendNode, AssertNode, CreateColumnPropertyNode, CreateIndexNode,
15		CreatePrimaryKeyNode, DistinctNode, ExtendNode, FilterNode, GateNode, GeneratorNode, InlineDataNode,
16		JoinInnerNode, JoinLeftNode, JoinNaturalNode, LogicalPlan, MapNode, OrderNode, PatchNode,
17		RemoteScanNode, ShapeScanNode, TakeNode, VariableSourceNode, compile_logical,
18	},
19};
20use reifydb_type::value::r#type::Type;
21
22use crate::{
23	procedure::rql::extract_query,
24	routine::{Routine, RoutineInfo, context::ProcedureContext, error::RoutineError},
25};
26
/// Lazily-initialized descriptor for this routine, constructed with the name `rql::logical`.
static INFO: LazyLock<RoutineInfo> = LazyLock::new(|| RoutineInfo::new("rql::logical"));
28
/// Stateless routine identified as `rql::logical`.
///
/// Its `Routine` implementation parses an RQL query, compiles each statement to logical plans and
/// returns the plan tree flattened into rows.
pub struct RqlLogical;
30
31impl Default for RqlLogical {
32	fn default() -> Self {
33		Self::new()
34	}
35}
36
37impl RqlLogical {
38	pub fn new() -> Self {
39		Self
40	}
41}
42
43impl<'a, 'tx> Routine<ProcedureContext<'a, 'tx>> for RqlLogical {
44	fn info(&self) -> &RoutineInfo {
45		&INFO
46	}
47
48	fn return_type(&self, _input_types: &[Type]) -> Type {
49		Type::Any
50	}
51
52	fn attaches_row_metadata(&self) -> bool {
53		false
54	}
55
56	fn execute(&self, ctx: &mut ProcedureContext<'a, 'tx>, _args: &Columns) -> Result<Columns, RoutineError> {
57		let query = extract_query(ctx.params, "rql::logical")?;
58
59		let bump = Bump::new();
60		let statements = parse_str(&bump, query.as_str())?;
61
62		let mut walker = LogicalWalker::default();
63		for statement in statements {
64			let plans = compile_logical(&bump, ctx.catalog, ctx.tx, statement)?;
65			for plan in plans.iter() {
66				walker.walk(plan, 0, None);
67			}
68		}
69
70		Ok(walker.into_columns())
71	}
72}
73
/// Accumulates a flattened plan tree as five parallel vectors, one entry per emitted node.
#[derive(Default)]
struct LogicalWalker {
	/// Index of each node, assigned sequentially in emission order.
	idx: Vec<i32>,
	/// Nesting depth of each node; `walk` passes `depth + 1` to a node's children.
	depth: Vec<i32>,
	/// Index of the parent node, or `None` when no parent was supplied.
	parent: Vec<Option<i32>>,
	/// Node kind label as produced by `describe`.
	kind: Vec<String>,
	/// Node detail text as produced by `describe`.
	detail: Vec<String>,
}
82
83impl LogicalWalker {
84	fn emit(&mut self, depth: i32, parent: Option<i32>, kind: &str, detail: String) -> i32 {
85		let next = self.idx.len() as i32;
86		self.idx.push(next);
87		self.depth.push(depth);
88		self.parent.push(parent);
89		self.kind.push(kind.to_string());
90		self.detail.push(detail);
91		next
92	}
93
94	fn into_columns(self) -> Columns {
95		Columns::new(vec![
96			ColumnWithName::int4("idx", self.idx),
97			ColumnWithName::int4("depth", self.depth),
98			ColumnWithName::new("parent", ColumnBuffer::int4_optional(self.parent)),
99			ColumnWithName::utf8("kind", self.kind),
100			ColumnWithName::utf8("detail", self.detail),
101		])
102	}
103
104	fn walk(&mut self, plan: &LogicalPlan<'_>, depth: i32, parent: Option<i32>) {
105		let (kind, detail) = describe(plan);
106		let me = self.emit(depth, parent, kind, detail);
107		self.recurse_children(plan, depth + 1, Some(me));
108	}
109
110	fn recurse_children(&mut self, plan: &LogicalPlan<'_>, depth: i32, parent: Option<i32>) {
111		match plan {
112			LogicalPlan::Pipeline(pipeline) => {
113				for step in pipeline.steps.iter() {
114					self.walk(step, depth, parent);
115				}
116			}
117			LogicalPlan::JoinInner(JoinInnerNode {
118				with,
119				..
120			})
121			| LogicalPlan::JoinLeft(JoinLeftNode {
122				with,
123				..
124			})
125			| LogicalPlan::JoinNatural(JoinNaturalNode {
126				with,
127				..
128			}) => {
129				for child in with.iter() {
130					self.walk(child, depth, parent);
131				}
132			}
133			LogicalPlan::Append(AppendNode::Query {
134				with,
135			}) => {
136				for child in with.iter() {
137					self.walk(child, depth, parent);
138				}
139			}
140			LogicalPlan::Scalarize(scalarize) => {
141				self.walk(&scalarize.input, depth, parent);
142			}
143			LogicalPlan::DeleteTable(node) => {
144				if let Some(input) = &node.input {
145					self.walk(input, depth, parent);
146				}
147			}
148			LogicalPlan::DeleteRingBuffer(node) => {
149				if let Some(input) = &node.input {
150					self.walk(input, depth, parent);
151				}
152			}
153			LogicalPlan::Update(node) => {
154				if let Some(input) = &node.input {
155					self.walk(input, depth, parent);
156				}
157			}
158			LogicalPlan::UpdateRingBuffer(node) => {
159				if let Some(input) = &node.input {
160					self.walk(input, depth, parent);
161				}
162			}
163			LogicalPlan::UpdateSeries(node) => {
164				if let Some(input) = &node.input {
165					self.walk(input, depth, parent);
166				}
167			}
168			LogicalPlan::Conditional(node) => {
169				self.walk(&node.then_branch, depth, parent);
170				for else_if in node.else_ifs.iter() {
171					self.walk(&else_if.then_branch, depth, parent);
172				}
173				if let Some(else_branch) = &node.else_branch {
174					self.walk(else_branch, depth, parent);
175				}
176			}
177			LogicalPlan::DefineFunction(def) => {
178				for stmt in def.body.iter() {
179					for child in stmt.iter() {
180						self.walk(child, depth, parent);
181					}
182				}
183			}
184			LogicalPlan::DefineClosure(def) => {
185				for stmt in def.body.iter() {
186					for child in stmt.iter() {
187						self.walk(child, depth, parent);
188					}
189				}
190			}
191			_ => {}
192		}
193	}
194}
195
196fn describe(plan: &LogicalPlan<'_>) -> (&'static str, String) {
197	match plan {
198		LogicalPlan::Loop(_) => ("Loop", String::new()),
199		LogicalPlan::While(_) => ("While", String::new()),
200		LogicalPlan::For(_) => ("For", String::new()),
201		LogicalPlan::Break => ("Break", String::new()),
202		LogicalPlan::Continue => ("Continue", String::new()),
203		LogicalPlan::CreateDeferredView(_) => ("CreateDeferredView", String::new()),
204		LogicalPlan::CreateTransactionalView(_) => ("CreateTransactionalView", String::new()),
205		LogicalPlan::CreateNamespace(_) => ("CreateNamespace", String::new()),
206		LogicalPlan::CreateRemoteNamespace(_) => ("CreateRemoteNamespace", String::new()),
207		LogicalPlan::CreateSequence(_) => ("CreateSequence", String::new()),
208		LogicalPlan::CreateTable(_) => ("CreateTable", String::new()),
209		LogicalPlan::CreateRingBuffer(_) => ("CreateRingBuffer", String::new()),
210		LogicalPlan::CreateDictionary(_) => ("CreateDictionary", String::new()),
211		LogicalPlan::CreateSumType(_) => ("CreateSumType", String::new()),
212		LogicalPlan::CreateSubscription(_) => ("CreateSubscription", String::new()),
213		LogicalPlan::DropNamespace(_) => ("DropNamespace", String::new()),
214		LogicalPlan::DropTable(_) => ("DropTable", String::new()),
215		LogicalPlan::DropView(_) => ("DropView", String::new()),
216		LogicalPlan::DropRingBuffer(_) => ("DropRingBuffer", String::new()),
217		LogicalPlan::DropDictionary(_) => ("DropDictionary", String::new()),
218		LogicalPlan::DropSumType(_) => ("DropSumType", String::new()),
219		LogicalPlan::DropSubscription(_) => ("DropSubscription", String::new()),
220		LogicalPlan::DropSeries(_) => ("DropSeries", String::new()),
221		LogicalPlan::DropProcedure(_) => ("DropProcedure", String::new()),
222		LogicalPlan::DropHandler(_) => ("DropHandler", String::new()),
223		LogicalPlan::DropTest(_) => ("DropTest", String::new()),
224		LogicalPlan::CreateSource(_) => ("CreateSource", String::new()),
225		LogicalPlan::CreateSink(_) => ("CreateSink", String::new()),
226		LogicalPlan::CreateBinding(_) => ("CreateBinding", String::new()),
227		LogicalPlan::DropSource(_) => ("DropSource", String::new()),
228		LogicalPlan::DropSink(_) => ("DropSink", String::new()),
229		LogicalPlan::DropBinding(_) => ("DropBinding", String::new()),
230		LogicalPlan::CreateIdentity(n) => ("CreateIdentity", format!("name={}", n.name.text())),
231		LogicalPlan::CreateRole(n) => ("CreateRole", format!("name={}", n.name.text())),
232		LogicalPlan::Grant(n) => ("Grant", format!("role={} user={}", n.role.text(), n.user.text())),
233		LogicalPlan::Revoke(n) => ("Revoke", format!("role={} user={}", n.role.text(), n.user.text())),
234		LogicalPlan::DropIdentity(n) => {
235			("DropIdentity", format!("name={} if_exists={}", n.name.text(), n.if_exists))
236		}
237		LogicalPlan::DropRole(n) => ("DropRole", format!("name={} if_exists={}", n.name.text(), n.if_exists)),
238		LogicalPlan::CreateAuthentication(n) => ("CreateAuthentication", format!("user={}", n.user.text())),
239		LogicalPlan::DropAuthentication(n) => {
240			("DropAuthentication", format!("user={} if_exists={}", n.user.text(), n.if_exists))
241		}
242		LogicalPlan::CreatePolicy(n) => {
243			let name = n.name.as_ref().map(|f| f.text()).unwrap_or("<unnamed>");
244			("CreatePolicy", format!("name={} type={:?}", name, n.target_type))
245		}
246		LogicalPlan::AlterPolicy(n) => {
247			let enabled = n.action == AstAlterPolicyAction::Enable;
248			("AlterPolicy", format!("name={} enabled={}", n.name.text(), enabled))
249		}
250		LogicalPlan::DropPolicy(n) => {
251			("DropPolicy", format!("name={} if_exists={}", n.name.text(), n.if_exists))
252		}
253		LogicalPlan::AlterSequence(AlterSequenceNode {
254			sequence,
255			column,
256			value,
257		}) => {
258			let namespace =
259				sequence.namespace.first().map(|s| format!("{}.", s.text())).unwrap_or_default();
260			(
261				"AlterSequence",
262				format!("{}{}.{} = {}", namespace, sequence.name.text(), column.name.text(), value),
263			)
264		}
265		LogicalPlan::CreateIndex(CreateIndexNode {
266			index_type,
267			index,
268			columns,
269			..
270		}) => {
271			let cols = columns
272				.iter()
273				.map(|c| {
274					if let Some(order) = &c.order {
275						format!("{} {:?}", c.column.text(), order)
276					} else {
277						c.column.text().to_string()
278					}
279				})
280				.collect::<Vec<_>>()
281				.join(", ");
282			(
283				"CreateIndex",
284				format!(
285					"type={:?} name={} table={} columns=[{}]",
286					index_type,
287					index.name.text(),
288					index.table.text(),
289					cols
290				),
291			)
292		}
293		LogicalPlan::DeleteTable(node) => {
294			let target = node
295				.target
296				.as_ref()
297				.map(|t| {
298					let ns = t.namespace.first().map(|n| n.text()).unwrap_or("default");
299					format!("{}::{}", ns, t.name.text())
300				})
301				.unwrap_or_else(|| "<inferred>".to_string());
302			("DeleteTable", format!("target={}", target))
303		}
304		LogicalPlan::DeleteRingBuffer(node) => {
305			let ns = node.target.namespace.first().map(|n| n.text()).unwrap_or("default");
306			("DeleteRingBuffer", format!("target={}::{}", ns, node.target.name.text()))
307		}
308		LogicalPlan::InsertTable(_) => ("InsertTable", String::new()),
309		LogicalPlan::InsertRingBuffer(_) => ("InsertRingBuffer", String::new()),
310		LogicalPlan::InsertDictionary(_) => ("InsertDictionary", String::new()),
311		LogicalPlan::InsertSeries(_) => ("InsertSeries", String::new()),
312		LogicalPlan::DeleteSeries(_) => ("DeleteSeries", String::new()),
313		LogicalPlan::UpdateSeries(node) => {
314			let ns = node.target.namespace.first().map(|n| n.text()).unwrap_or("default");
315			("UpdateSeries", format!("target={}::{}", ns, node.target.name.text()))
316		}
317		LogicalPlan::Update(node) => {
318			let target = node
319				.target
320				.as_ref()
321				.map(|t| {
322					let ns = t.namespace.first().map(|n| n.text()).unwrap_or("default");
323					format!("{}::{}", ns, t.name.text())
324				})
325				.unwrap_or_else(|| "<inferred>".to_string());
326			("Update", format!("target={}", target))
327		}
328		LogicalPlan::UpdateRingBuffer(node) => {
329			let ns = node.target.namespace.first().map(|n| n.text()).unwrap_or("default");
330			("UpdateRingBuffer", format!("target={}::{}", ns, node.target.name.text()))
331		}
332		LogicalPlan::Take(TakeNode {
333			take,
334		}) => ("Take", take.to_string()),
335		LogicalPlan::Assert(AssertNode {
336			condition,
337			message,
338			..
339		}) => {
340			let msg = message.as_deref().unwrap_or("assertion failed");
341			("Assert", format!("\"{}\" condition: {}", msg, condition))
342		}
343		LogicalPlan::AssertBlock(node) => {
344			let kind = if node.expect_error {
345				"AssertError"
346			} else {
347				"AssertBlock"
348			};
349			let msg = node.message.as_deref().unwrap_or("");
350			(kind, format!("\"{}\"", msg))
351		}
352		LogicalPlan::Filter(FilterNode {
353			condition,
354			..
355		}) => ("Filter", format!("condition: {}", condition)),
356		LogicalPlan::Gate(GateNode {
357			condition,
358			..
359		}) => ("Gate", format!("condition: {}", condition)),
360		LogicalPlan::Map(MapNode {
361			map,
362			..
363		}) => ("Map", expressions_inline(map)),
364		LogicalPlan::Extend(ExtendNode {
365			extend,
366			..
367		}) => ("Extend", expressions_inline(extend)),
368		LogicalPlan::Patch(PatchNode {
369			assignments,
370			..
371		}) => ("Patch", expressions_inline(assignments)),
372		LogicalPlan::Aggregate(AggregateNode {
373			by,
374			map,
375			..
376		}) => {
377			let by_str = expressions_inline(by);
378			let map_str = expressions_inline(map);
379			("Aggregate", format!("by=[{}] map=[{}]", by_str, map_str))
380		}
381		LogicalPlan::Order(OrderNode {
382			by,
383			..
384		}) => ("Order", expressions_inline(by)),
385		LogicalPlan::JoinInner(JoinInnerNode {
386			on,
387			..
388		}) => ("JoinInner", expressions_inline(on)),
389		LogicalPlan::JoinLeft(JoinLeftNode {
390			on,
391			..
392		}) => ("JoinLeft", expressions_inline(on)),
393		LogicalPlan::JoinNatural(JoinNaturalNode {
394			join_type,
395			..
396		}) => {
397			let kind = match join_type {
398				JoinType::Inner => "Inner",
399				JoinType::Left => "Left",
400			};
401			("JoinNatural", format!("type={}", kind))
402		}
403		LogicalPlan::PrimitiveScan(ShapeScanNode {
404			source,
405			index,
406			..
407		}) => {
408			let name =
409				source.fully_qualified_name().unwrap_or_else(|| source.identifier().text().to_string());
410			if let Some(idx) = index {
411				("IndexScan", format!("{}::{}", name, idx.identifier().text()))
412			} else {
413				("TableScan", name)
414			}
415		}
416		LogicalPlan::RemoteScan(RemoteScanNode {
417			address,
418			local_namespace,
419			remote_name,
420			..
421		}) => ("RemoteScan", format!("{}::{} @ {}", local_namespace, remote_name, address)),
422		LogicalPlan::InlineData(InlineDataNode {
423			rows,
424		}) => {
425			let total_fields: usize = rows.iter().map(|row| row.len()).sum();
426			("InlineData", format!("rows={} fields={}", rows.len(), total_fields))
427		}
428		LogicalPlan::Generator(GeneratorNode {
429			name,
430			expressions,
431		}) => ("Generator", format!("name={} parameters={}", name.text(), expressions.len())),
432		LogicalPlan::VariableSource(VariableSourceNode {
433			name,
434		}) => ("VariableSource", name.text().to_string()),
435		LogicalPlan::Environment(_) => ("Environment", String::new()),
436		LogicalPlan::Distinct(DistinctNode {
437			columns,
438			..
439		}) => {
440			let detail = if columns.is_empty() {
441				"primary key".to_string()
442			} else {
443				columns.iter().map(|c| c.name.text().to_string()).collect::<Vec<_>>().join(", ")
444			};
445			("Distinct", detail)
446		}
447		LogicalPlan::Apply(apply) => {
448			("Apply", format!("operator={} arguments={}", apply.operator.text(), apply.arguments.len()))
449		}
450		LogicalPlan::Pipeline(_) => ("Pipeline", String::new()),
451		LogicalPlan::CreatePrimaryKey(CreatePrimaryKeyNode {
452			..
453		}) => ("CreatePrimaryKey", String::new()),
454		LogicalPlan::CreateColumnProperty(CreateColumnPropertyNode {
455			..
456		}) => ("CreateColumnProperty", String::new()),
457		LogicalPlan::CreateProcedure(_) => ("CreateProcedure", String::new()),
458		LogicalPlan::CreateSeries(_) => ("CreateSeries", String::new()),
459		LogicalPlan::CreateEvent(_) => ("CreateEvent", String::new()),
460		LogicalPlan::CreateTag(_) => ("CreateTag", String::new()),
461		LogicalPlan::CreateTest(_) => ("CreateTest", String::new()),
462		LogicalPlan::RunTests(_) => ("RunTests", String::new()),
463		LogicalPlan::CreateMigration(_) => ("CreateMigration", String::new()),
464		LogicalPlan::Migrate(_) => ("Migrate", String::new()),
465		LogicalPlan::RollbackMigration(_) => ("RollbackMigration", String::new()),
466		LogicalPlan::Dispatch(_) => ("Dispatch", String::new()),
467		LogicalPlan::Window(window) => {
468			let group = window.group_by.len();
469			let agg = window.aggregations.len();
470			("Window", format!("kind={:?} group_by={} aggregations={}", window.kind, group, agg))
471		}
472		LogicalPlan::Declare(node) => ("Declare", format!("{} = {}", node.name.text(), node.value)),
473		LogicalPlan::Assign(node) => ("Assign", format!("{} = {}", node.name.text(), node.value)),
474		LogicalPlan::Conditional(node) => ("Conditional", format!("if {}", node.condition)),
475		LogicalPlan::Scalarize(_) => ("Scalarize", String::new()),
476		LogicalPlan::AlterTable(node) => {
477			let name = if let Some(ns) = node.table.namespace.first() {
478				format!("{}::{}", ns.text(), node.table.name.text())
479			} else {
480				node.table.name.text().to_string()
481			};
482			("AlterTable", name)
483		}
484		LogicalPlan::AlterRemoteNamespace(_) => ("AlterRemoteNamespace", String::new()),
485		LogicalPlan::DefineFunction(def) => {
486			let params: Vec<String> = def
487				.parameters
488				.iter()
489				.map(|p| {
490					if let Some(ref tc) = p.type_constraint {
491						format!("${}: {:?}", p.name.text(), tc)
492					} else {
493						format!("${}", p.name.text())
494					}
495				})
496				.collect();
497			let ret = if let Some(ref rt) = def.return_type {
498				format!(" -> {:?}", rt)
499			} else {
500				String::new()
501			};
502			("DefineFunction", format!("{}[{}]{}", def.name.text(), params.join(", "), ret))
503		}
504		LogicalPlan::Return(ret) => {
505			let value = ret.value.as_ref().map(|expr| expr.to_string()).unwrap_or_default();
506			("Return", value)
507		}
508		LogicalPlan::CallFunction(call) => {
509			let args: Vec<String> = call.arguments.iter().map(|a| format!("{}", a)).collect();
510			("CallFunction", format!("{}({})", call.name.text(), args.join(", ")))
511		}
512		LogicalPlan::Append(node) => match node {
513			AppendNode::IntoVariable {
514				target,
515				..
516			} => ("Append", format!("${}", target.text())),
517			AppendNode::Query {
518				..
519			} => ("Append", String::new()),
520		},
521		LogicalPlan::DefineClosure(node) => {
522			let params: Vec<String> = node
523				.parameters
524				.iter()
525				.map(|p| {
526					if let Some(ref tc) = p.type_constraint {
527						format!("${}: {:?}", p.name.text(), tc)
528					} else {
529						format!("${}", p.name.text())
530					}
531				})
532				.collect();
533			("DefineClosure", format!("[{}]", params.join(", ")))
534		}
535	}
536}
537
/// Renders every item through its `Display` implementation and joins the results with `", "`.
///
/// An empty slice yields an empty string.
fn expressions_inline<E: Display>(items: &[E]) -> String {
	let mut joined = String::new();
	for (position, item) in items.iter().enumerate() {
		// The separator goes between items only, never before the first one.
		if position > 0 {
			joined.push_str(", ");
		}
		joined.push_str(&item.to_string());
	}
	joined
}