Skip to main content

reifydb_engine/vm/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{error::diagnostic::subscription, util::ioc::IocContainer, value::column::columns::Columns};
9use reifydb_extension::transform::registry::Transforms;
10use reifydb_metric::metric::MetricReader;
11use reifydb_policy::inject_read_policies;
12use reifydb_routine::{function::registry::Functions, procedure::registry::Procedures};
13use reifydb_rql::{
14	ast::parse_str,
15	compiler::{CompilationResult, constrain_policy},
16};
17use reifydb_runtime::context::RuntimeContext;
18use reifydb_store_single::SingleStore;
19use reifydb_transaction::transaction::{
20	RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
21	query::QueryTransaction, subscription::SubscriptionTransaction,
22};
23#[cfg(not(target_arch = "wasm32"))]
24use reifydb_type::error::Diagnostic;
25use reifydb_type::{
26	error::Error,
27	params::Params,
28	value::{Value, frame::frame::Frame, r#type::Type},
29};
30use tracing::instrument;
31
32#[cfg(not(target_arch = "wasm32"))]
33use crate::remote::{self, RemoteRegistry};
34use crate::{
35	Result,
36	policy::PolicyEvaluator,
37	vm::{
38		Admin, Command, Query, Subscription, Test,
39		services::Services,
40		stack::{SymbolTable, Variable},
41		vm::Vm,
42	},
43};
44
/// Executor is the orchestration layer for RQL statement execution.
///
/// It is a thin newtype around a shared `Arc<Services>`; all state lives in
/// the wrapped `Services` value.
pub struct Executor(Arc<Services>);
47
48impl Clone for Executor {
49	fn clone(&self) -> Self {
50		Self(self.0.clone())
51	}
52}
53
54impl Deref for Executor {
55	type Target = Services;
56
57	fn deref(&self) -> &Self::Target {
58		&self.0
59	}
60}
61
62impl Executor {
63	pub fn new(
64		catalog: Catalog,
65		runtime_context: RuntimeContext,
66		functions: Functions,
67		procedures: Procedures,
68		transforms: Transforms,
69		flow_operator_store: SystemFlowOperatorStore,
70		stats_reader: MetricReader<SingleStore>,
71		ioc: IocContainer,
72		#[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
73	) -> Self {
74		Self(Arc::new(Services::new(
75			catalog,
76			runtime_context,
77			functions,
78			procedures,
79			transforms,
80			flow_operator_store,
81			stats_reader,
82			ioc,
83			#[cfg(not(target_arch = "wasm32"))]
84			remote_registry,
85		)))
86	}
87
	/// Get a reference to the underlying Services.
	///
	/// Returns the `Arc` itself (not just `&Services`), so callers can clone it
	/// to obtain their own shared handle.
	pub fn services(&self) -> &Arc<Services> {
		&self.0
	}
92
	/// Construct an Executor from an existing `Arc<Services>`.
	///
	/// The `Arc` is stored as-is; no new `Services` value is created.
	pub fn from_services(services: Arc<Services>) -> Self {
		Self(services)
	}
97
	/// Construct an Executor backed by whatever `Services::testing()` returns.
	// NOTE(review): `dead_code` is presumably allowed because this constructor is
	// only reached from test code in some build configurations — confirm.
	#[allow(dead_code)]
	pub fn testing() -> Self {
		Self(Services::testing())
	}
102
103	/// If the error is a REMOTE_001 and we have a RemoteRegistry, forward the query.
104	/// Returns `Ok(Some(frames))` if forwarded, `Ok(None)` if not a remote query.
105	#[cfg(not(target_arch = "wasm32"))]
106	fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
107		if let Some(ref registry) = self.0.remote_registry {
108			if remote::is_remote_query(err) {
109				if let Some(address) = remote::extract_remote_address(err) {
110					let token = remote::extract_remote_token(err);
111					return registry
112						.forward_query(&address, rql, params, token.as_deref())
113						.map(Some);
114				}
115			}
116		}
117		Ok(None)
118	}
119}
120
/// Exposes the executor through the `RqlExecutor` trait by delegating to the
/// inherent [`Executor::rql`] method.
impl RqlExecutor for Executor {
	fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> Result<Vec<Frame>> {
		// Called by path so it is explicit that the inherent method is meant,
		// not this trait method of the same name.
		Executor::rql(self, tx, rql, params)
	}
}
126
127/// Populate a stack with parameters so they can be accessed as variables.
128fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
129	match params {
130		Params::Positional(values) => {
131			for (index, value) in values.iter().enumerate() {
132				let param_name = (index + 1).to_string();
133				symbols.set(param_name, Variable::scalar(value.clone()), false)?;
134			}
135		}
136		Params::Named(map) => {
137			for (name, value) in map.iter() {
138				symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
139			}
140		}
141		Params::None => {}
142	}
143	Ok(())
144}
145
146/// Populate the `$identity` variable in the symbol table so policy bodies
147/// (and user RQL) can reference `$identity.id`, `$identity.name`, and `$identity.roles`.
148fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
149	let identity = tx.identity();
150	if identity.is_privileged() {
151		return Ok(());
152	}
153	if identity.is_anonymous() {
154		let columns = Columns::single_row([
155			("id", Value::IdentityId(identity)),
156			("name", Value::none_of(Type::Utf8)),
157			("roles", Value::List(vec![])),
158		]);
159		symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
160		return Ok(());
161	}
162	if let Some(user) = catalog.find_identity(tx, identity)? {
163		let roles = catalog.find_role_names_for_identity(tx, identity)?;
164		let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
165		let columns = Columns::single_row([
166			("id", Value::IdentityId(identity)),
167			("name", Value::Utf8(user.name)),
168			("roles", Value::List(role_values)),
169		]);
170		symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
171	}
172	Ok(())
173}
174
175impl Executor {
176	/// Execute RQL against an existing open transaction.
177	///
178	/// This is the universal RQL execution interface: it compiles and runs
179	/// arbitrary RQL within whatever transaction variant the caller provides.
180	#[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
181	pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> Result<Vec<Frame>> {
182		let mut result = vec![];
183		let mut symbols = SymbolTable::new();
184		populate_symbols(&mut symbols, &params)?;
185		populate_identity(&mut symbols, &self.catalog, tx)?;
186
187		let compiled = match self
188			.compiler
189			.compile_with_policy(tx, rql, |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx))
190		{
191			Ok(CompilationResult::Ready(compiled)) => compiled,
192			Ok(CompilationResult::Incremental(_)) => {
193				unreachable!("incremental compilation not supported in rql()")
194			}
195			Err(err) => {
196				#[cfg(not(target_arch = "wasm32"))]
197				if let Some(frames) = self.try_forward_remote_query(&err, rql, params)? {
198					return Ok(frames);
199				}
200				return Err(err);
201			}
202		};
203
204		for compiled in compiled.iter() {
205			result.clear();
206			let mut vm = Vm::new(symbols);
207			vm.run(&self.0, tx, &compiled.instructions, &params, &mut result)?;
208			symbols = vm.symbols;
209		}
210
211		Ok(result)
212	}
213
214	#[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
215	pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> Result<Vec<Frame>> {
216		let mut result = vec![];
217		let mut output_results: Vec<Frame> = Vec::new();
218		let mut symbols = SymbolTable::new();
219		populate_symbols(&mut symbols, &cmd.params)?;
220
221		populate_identity(&mut symbols, &self.catalog, &mut Transaction::Admin(&mut *txn))?;
222
223		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
224			&mut Transaction::Admin(&mut *txn),
225			"admin",
226			true,
227		)?;
228
229		match self.compiler.compile_with_policy(
230			&mut Transaction::Admin(txn),
231			cmd.rql,
232			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
233		) {
234			Err(err) => {
235				#[cfg(not(target_arch = "wasm32"))]
236				if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
237					return Ok(frames);
238				}
239				return Err(err);
240			}
241			Ok(CompilationResult::Ready(compiled)) => {
242				for compiled in compiled.iter() {
243					result.clear();
244					let mut tx = Transaction::Admin(txn);
245					let mut vm = Vm::new(symbols);
246					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
247					symbols = vm.symbols;
248
249					if compiled.is_output {
250						output_results.append(&mut result);
251					}
252				}
253			}
254			Ok(CompilationResult::Incremental(mut state)) => {
255				let policy = constrain_policy(|plans, bump, cat, tx| {
256					inject_read_policies(plans, bump, cat, tx)
257				});
258				while let Some(compiled) = self.compiler.compile_next_with_policy(
259					&mut Transaction::Admin(txn),
260					&mut state,
261					&policy,
262				)? {
263					result.clear();
264					let mut tx = Transaction::Admin(txn);
265					let mut vm = Vm::new(symbols);
266					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
267					symbols = vm.symbols;
268
269					if compiled.is_output {
270						output_results.append(&mut result);
271					}
272				}
273			}
274		}
275
276		let mut final_result = output_results;
277		final_result.append(&mut result);
278		Ok(final_result)
279	}
280
281	#[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
282	pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> Result<Vec<Frame>> {
283		let mut result = vec![];
284		let mut output_results: Vec<Frame> = Vec::new();
285		let mut symbols = SymbolTable::new();
286		populate_symbols(&mut symbols, &cmd.params)?;
287
288		populate_identity(&mut symbols, &self.catalog, &mut Transaction::Test(txn.reborrow()))?;
289
290		let session_type = txn.session_type.clone();
291		let session_default_deny = txn.session_default_deny;
292		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
293			&mut Transaction::Test(txn.reborrow()),
294			&session_type,
295			session_default_deny,
296		)?;
297
298		match self.compiler.compile_with_policy(
299			&mut Transaction::Test(txn.reborrow()),
300			cmd.rql,
301			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
302		) {
303			Err(err) => {
304				#[cfg(not(target_arch = "wasm32"))]
305				if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
306					return Ok(frames);
307				}
308				return Err(err);
309			}
310			Ok(CompilationResult::Ready(compiled)) => {
311				for compiled in compiled.iter() {
312					result.clear();
313					let mut tx = Transaction::Test(txn.reborrow());
314					let mut vm = Vm::new(symbols);
315					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
316					symbols = vm.symbols;
317
318					if compiled.is_output {
319						output_results.append(&mut result);
320					}
321				}
322			}
323			Ok(CompilationResult::Incremental(mut state)) => {
324				let policy = constrain_policy(|plans, bump, cat, tx| {
325					inject_read_policies(plans, bump, cat, tx)
326				});
327				while let Some(compiled) = self.compiler.compile_next_with_policy(
328					&mut Transaction::Test(txn.reborrow()),
329					&mut state,
330					&policy,
331				)? {
332					result.clear();
333					let mut tx = Transaction::Test(txn.reborrow());
334					let mut vm = Vm::new(symbols);
335					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
336					symbols = vm.symbols;
337
338					if compiled.is_output {
339						output_results.append(&mut result);
340					}
341				}
342			}
343		}
344
345		let mut final_result = output_results;
346		final_result.append(&mut result);
347		Ok(final_result)
348	}
349
350	#[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
351	pub fn subscription(&self, txn: &mut SubscriptionTransaction, cmd: Subscription<'_>) -> Result<Vec<Frame>> {
352		// Pre-compilation validation: parse and check statement constraints
353		let bump = Bump::new();
354		let statements = parse_str(&bump, cmd.rql)?;
355
356		if statements.len() != 1 {
357			return Err(Error(subscription::single_statement_required(
358				"Subscription endpoint requires exactly one statement",
359			)));
360		}
361
362		let statement = &statements[0];
363		if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
364			return Err(Error(subscription::invalid_statement(
365				"Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
366			)));
367		}
368
369		// Proceed with standard compilation and execution
370		let mut result = vec![];
371		let mut output_results: Vec<Frame> = Vec::new();
372		let mut symbols = SymbolTable::new();
373		populate_symbols(&mut symbols, &cmd.params)?;
374
375		populate_identity(&mut symbols, &self.catalog, &mut Transaction::Subscription(&mut *txn))?;
376
377		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
378			&mut Transaction::Subscription(&mut *txn),
379			"subscription",
380			true,
381		)?;
382
383		let compiled = match self.compiler.compile_with_policy(
384			&mut Transaction::Subscription(txn),
385			cmd.rql,
386			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
387		) {
388			Ok(CompilationResult::Ready(compiled)) => compiled,
389			Ok(CompilationResult::Incremental(_)) => {
390				unreachable!("Single subscription statement should not require incremental compilation")
391			}
392			Err(err) => return Err(err),
393		};
394
395		for compiled in compiled.iter() {
396			result.clear();
397			let mut tx = Transaction::Subscription(txn);
398			let mut vm = Vm::new(symbols);
399			vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
400			symbols = vm.symbols;
401
402			if compiled.is_output {
403				output_results.append(&mut result);
404			}
405		}
406
407		let mut final_result = output_results;
408		final_result.append(&mut result);
409		Ok(final_result)
410	}
411
412	#[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
413	pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> Result<Vec<Frame>> {
414		let mut result = vec![];
415		let mut output_results: Vec<Frame> = Vec::new();
416		let mut symbols = SymbolTable::new();
417		populate_symbols(&mut symbols, &cmd.params)?;
418
419		populate_identity(&mut symbols, &self.catalog, &mut Transaction::Command(&mut *txn))?;
420
421		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
422			&mut Transaction::Command(&mut *txn),
423			"command",
424			false,
425		)?;
426
427		let compiled = match self.compiler.compile_with_policy(
428			&mut Transaction::Command(txn),
429			cmd.rql,
430			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
431		) {
432			Ok(CompilationResult::Ready(compiled)) => compiled,
433			Ok(CompilationResult::Incremental(_)) => {
434				unreachable!("DDL statements require admin transactions, not command transactions")
435			}
436			Err(err) => {
437				#[cfg(not(target_arch = "wasm32"))]
438				if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
439					return Err(Error(Diagnostic {
440						code: "REMOTE_002".to_string(),
441						message: "Write operations on remote namespaces are not supported"
442							.to_string(),
443						help: Some("Use the remote instance directly for write operations"
444							.to_string()),
445						..Default::default()
446					}));
447				}
448				return Err(err);
449			}
450		};
451
452		for compiled in compiled.iter() {
453			result.clear();
454			let mut tx = Transaction::Command(txn);
455			let mut vm = Vm::new(symbols);
456			vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
457			symbols = vm.symbols;
458
459			if compiled.is_output {
460				output_results.append(&mut result);
461			}
462		}
463
464		let mut final_result = output_results;
465		final_result.append(&mut result);
466		Ok(final_result)
467	}
468
469	/// Call a procedure by fully-qualified name (e.g., "banking.transfer_funds").
470	#[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
471	pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> Result<Vec<Frame>> {
472		// Compile and execute CALL <name>(<params>)
473		let rql = format!("CALL {}()", name);
474		let mut result = vec![];
475		let mut symbols = SymbolTable::new();
476		populate_symbols(&mut symbols, params)?;
477		populate_identity(&mut symbols, &self.catalog, &mut Transaction::Command(&mut *txn))?;
478
479		let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql)? {
480			CompilationResult::Ready(compiled) => compiled,
481			CompilationResult::Incremental(_) => {
482				unreachable!("CALL statements should not require incremental compilation")
483			}
484		};
485
486		for compiled in compiled.iter() {
487			result.clear();
488			let mut tx = Transaction::Command(txn);
489			let mut vm = Vm::new(symbols);
490			vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result)?;
491			symbols = vm.symbols;
492		}
493
494		Ok(result)
495	}
496
497	#[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
498	pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> Result<Vec<Frame>> {
499		let mut result = vec![];
500		let mut output_results: Vec<Frame> = Vec::new();
501		let mut symbols = SymbolTable::new();
502		populate_symbols(&mut symbols, &qry.params)?;
503
504		populate_identity(&mut symbols, &self.catalog, &mut Transaction::Query(&mut *txn))?;
505
506		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
507			&mut Transaction::Query(&mut *txn),
508			"query",
509			false,
510		)?;
511
512		let compiled = match self.compiler.compile_with_policy(
513			&mut Transaction::Query(txn),
514			qry.rql,
515			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
516		) {
517			Ok(CompilationResult::Ready(compiled)) => compiled,
518			Ok(CompilationResult::Incremental(_)) => {
519				unreachable!("DDL statements require admin transactions, not query transactions")
520			}
521			Err(err) => {
522				#[cfg(not(target_arch = "wasm32"))]
523				if let Some(frames) = self.try_forward_remote_query(&err, qry.rql, qry.params)? {
524					return Ok(frames);
525				}
526				return Err(err);
527			}
528		};
529
530		for compiled in compiled.iter() {
531			result.clear();
532			let mut tx = Transaction::Query(txn);
533			let mut vm = Vm::new(symbols);
534			vm.run(&self.0, &mut tx, &compiled.instructions, &qry.params, &mut result)?;
535			symbols = vm.symbols;
536
537			if compiled.is_output {
538				output_results.append(&mut result);
539			}
540		}
541
542		let mut final_result = output_results;
543		final_result.append(&mut result);
544		Ok(final_result)
545	}
546}