Skip to main content

reifydb_engine/vm/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{error::diagnostic::subscription, util::ioc::IocContainer, value::column::columns::Columns};
9use reifydb_extension::transform::registry::Transforms;
10use reifydb_metric::metric::MetricReader;
11use reifydb_policy::inject_read_policies;
12use reifydb_routine::{function::registry::Functions, procedure::registry::Procedures};
13use reifydb_rql::{
14	ast::parse_str,
15	compiler::{CompilationResult, Compiled, constrain_policy},
16};
17use reifydb_runtime::context::RuntimeContext;
18use reifydb_store_single::SingleStore;
19use reifydb_transaction::transaction::{
20	RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
21	query::QueryTransaction, subscription::SubscriptionTransaction,
22};
23#[cfg(not(target_arch = "wasm32"))]
24use reifydb_type::error::Diagnostic;
25use reifydb_type::{
26	error::Error,
27	params::Params,
28	value::{Value, frame::frame::Frame, r#type::Type},
29};
30use tracing::instrument;
31
32#[cfg(not(target_arch = "wasm32"))]
33use crate::remote::{self, RemoteRegistry};
34use crate::{
35	Result,
36	policy::PolicyEvaluator,
37	vm::{
38		Admin, Command, Query, Subscription, Test,
39		services::Services,
40		stack::{SymbolTable, Variable},
41		vm::Vm,
42	},
43};
44
45/// Executor is the orchestration layer for RQL statement execution.
46pub struct Executor(Arc<Services>);
47
48impl Clone for Executor {
49	fn clone(&self) -> Self {
50		Self(self.0.clone())
51	}
52}
53
54impl Deref for Executor {
55	type Target = Services;
56
57	fn deref(&self) -> &Self::Target {
58		&self.0
59	}
60}
61
62impl Executor {
63	pub fn new(
64		catalog: Catalog,
65		runtime_context: RuntimeContext,
66		functions: Functions,
67		procedures: Procedures,
68		transforms: Transforms,
69		flow_operator_store: SystemFlowOperatorStore,
70		stats_reader: MetricReader<SingleStore>,
71		ioc: IocContainer,
72		#[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
73	) -> Self {
74		Self(Arc::new(Services::new(
75			catalog,
76			runtime_context,
77			functions,
78			procedures,
79			transforms,
80			flow_operator_store,
81			stats_reader,
82			ioc,
83			#[cfg(not(target_arch = "wasm32"))]
84			remote_registry,
85		)))
86	}
87
88	/// Get a reference to the underlying Services
89	pub fn services(&self) -> &Arc<Services> {
90		&self.0
91	}
92
93	/// Construct an Executor from an existing `Arc<Services>`.
94	pub fn from_services(services: Arc<Services>) -> Self {
95		Self(services)
96	}
97
98	#[allow(dead_code)]
99	pub fn testing() -> Self {
100		Self(Services::testing())
101	}
102
103	/// If the error is a REMOTE_001 and we have a RemoteRegistry, forward the query.
104	/// Returns `Ok(Some(frames))` if forwarded, `Ok(None)` if not a remote query.
105	#[cfg(not(target_arch = "wasm32"))]
106	fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
107		if let Some(ref registry) = self.0.remote_registry {
108			if remote::is_remote_query(err) {
109				if let Some(address) = remote::extract_remote_address(err) {
110					let token = remote::extract_remote_token(err);
111					return registry
112						.forward_query(&address, rql, params, token.as_deref())
113						.map(Some);
114				}
115			}
116		}
117		Ok(None)
118	}
119}
120
121impl RqlExecutor for Executor {
122	fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> Result<Vec<Frame>> {
123		Executor::rql(self, tx, rql, params)
124	}
125}
126
127/// Populate a stack with parameters so they can be accessed as variables.
128fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
129	match params {
130		Params::Positional(values) => {
131			for (index, value) in values.iter().enumerate() {
132				let param_name = (index + 1).to_string();
133				symbols.set(param_name, Variable::scalar(value.clone()), false)?;
134			}
135		}
136		Params::Named(map) => {
137			for (name, value) in map.iter() {
138				symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
139			}
140		}
141		Params::None => {}
142	}
143	Ok(())
144}
145
146/// Populate the `$identity` variable in the symbol table so policy bodies
147/// (and user RQL) can reference `$identity.id`, `$identity.name`, and `$identity.roles`.
148fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
149	let identity = tx.identity();
150	if identity.is_privileged() {
151		return Ok(());
152	}
153	if identity.is_anonymous() {
154		let columns = Columns::single_row([
155			("id", Value::IdentityId(identity)),
156			("name", Value::none_of(Type::Utf8)),
157			("roles", Value::List(vec![])),
158		]);
159		symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
160		return Ok(());
161	}
162	if let Some(user) = catalog.find_identity(tx, identity)? {
163		let roles = catalog.find_role_names_for_identity(tx, identity)?;
164		let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
165		let columns = Columns::single_row([
166			("id", Value::IdentityId(identity)),
167			("name", Value::Utf8(user.name)),
168			("roles", Value::List(role_values)),
169		]);
170		symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
171	}
172	Ok(())
173}
174
175/// Execute a list of compiled units, tracking output frames separately.
176/// Returns (output_results, last_result, final_symbols).
177fn execute_compiled_units(
178	services: &Arc<Services>,
179	tx: &mut Transaction<'_>,
180	compiled_list: &[Compiled],
181	params: &Params,
182	mut symbols: SymbolTable,
183) -> Result<(Vec<Frame>, Vec<Frame>, SymbolTable)> {
184	let mut result = vec![];
185	let mut output_results: Vec<Frame> = Vec::new();
186
187	for compiled in compiled_list.iter() {
188		result.clear();
189		let mut vm = Vm::new(symbols);
190		vm.run(services, tx, &compiled.instructions, params, &mut result)?;
191		symbols = vm.symbols;
192
193		if compiled.is_output {
194			output_results.append(&mut result);
195		}
196	}
197
198	Ok((output_results, result, symbols))
199}
200
201/// Merge output_results and remaining results into the final result.
202fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
203	output_results.append(&mut remaining);
204	output_results
205}
206
207impl Executor {
208	/// Shared setup: create symbols and populate with params + identity.
209	fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
210		let mut symbols = SymbolTable::new();
211		populate_symbols(&mut symbols, params)?;
212		populate_identity(&mut symbols, &self.catalog, tx)?;
213		Ok(symbols)
214	}
215
216	/// Execute RQL against an existing open transaction.
217	///
218	/// This is the universal RQL execution interface: it compiles and runs
219	/// arbitrary RQL within whatever transaction variant the caller provides.
220	#[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
221	pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> Result<Vec<Frame>> {
222		let mut symbols = self.setup_symbols(&params, tx)?;
223
224		let compiled = match self
225			.compiler
226			.compile_with_policy(tx, rql, |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx))
227		{
228			Ok(CompilationResult::Ready(compiled)) => compiled,
229			Ok(CompilationResult::Incremental(_)) => {
230				unreachable!("incremental compilation not supported in rql()")
231			}
232			Err(err) => {
233				#[cfg(not(target_arch = "wasm32"))]
234				if let Some(frames) = self.try_forward_remote_query(&err, rql, params)? {
235					return Ok(frames);
236				}
237				return Err(err);
238			}
239		};
240
241		let mut result = vec![];
242		for compiled in compiled.iter() {
243			result.clear();
244			let mut vm = Vm::new(symbols);
245			vm.run(&self.0, tx, &compiled.instructions, &params, &mut result)?;
246			symbols = vm.symbols;
247		}
248
249		Ok(result)
250	}
251
252	#[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
253	pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> Result<Vec<Frame>> {
254		let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn))?;
255
256		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
257			&mut Transaction::Admin(&mut *txn),
258			"admin",
259			true,
260		)?;
261
262		match self.compiler.compile_with_policy(
263			&mut Transaction::Admin(txn),
264			cmd.rql,
265			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
266		) {
267			Err(err) => {
268				#[cfg(not(target_arch = "wasm32"))]
269				if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
270					return Ok(frames);
271				}
272				Err(err)
273			}
274			Ok(CompilationResult::Ready(compiled)) => {
275				let (output, remaining, _) = execute_compiled_units(
276					&self.0,
277					&mut Transaction::Admin(txn),
278					&compiled,
279					&cmd.params,
280					symbols,
281				)?;
282				Ok(merge_results(output, remaining))
283			}
284			Ok(CompilationResult::Incremental(mut state)) => {
285				let policy = constrain_policy(|plans, bump, cat, tx| {
286					inject_read_policies(plans, bump, cat, tx)
287				});
288				let mut result = vec![];
289				let mut output_results: Vec<Frame> = Vec::new();
290				let mut symbols = symbols;
291				while let Some(compiled) = self.compiler.compile_next_with_policy(
292					&mut Transaction::Admin(txn),
293					&mut state,
294					&policy,
295				)? {
296					result.clear();
297					let mut tx = Transaction::Admin(txn);
298					let mut vm = Vm::new(symbols);
299					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
300					symbols = vm.symbols;
301					if compiled.is_output {
302						output_results.append(&mut result);
303					}
304				}
305				Ok(merge_results(output_results, result))
306			}
307		}
308	}
309
310	#[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
311	pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> Result<Vec<Frame>> {
312		let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Test(txn.reborrow()))?;
313
314		let session_type = txn.session_type.clone();
315		let session_default_deny = txn.session_default_deny;
316		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
317			&mut Transaction::Test(txn.reborrow()),
318			&session_type,
319			session_default_deny,
320		)?;
321
322		match self.compiler.compile_with_policy(
323			&mut Transaction::Test(txn.reborrow()),
324			cmd.rql,
325			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
326		) {
327			Err(err) => {
328				#[cfg(not(target_arch = "wasm32"))]
329				if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
330					return Ok(frames);
331				}
332				Err(err)
333			}
334			Ok(CompilationResult::Ready(compiled)) => {
335				let (output, remaining, _) = execute_compiled_units(
336					&self.0,
337					&mut Transaction::Test(txn.reborrow()),
338					&compiled,
339					&cmd.params,
340					symbols,
341				)?;
342				Ok(merge_results(output, remaining))
343			}
344			Ok(CompilationResult::Incremental(mut state)) => {
345				let policy = constrain_policy(|plans, bump, cat, tx| {
346					inject_read_policies(plans, bump, cat, tx)
347				});
348				let mut result = vec![];
349				let mut output_results: Vec<Frame> = Vec::new();
350				let mut symbols = symbols;
351				while let Some(compiled) = self.compiler.compile_next_with_policy(
352					&mut Transaction::Test(txn.reborrow()),
353					&mut state,
354					&policy,
355				)? {
356					result.clear();
357					let mut tx = Transaction::Test(txn.reborrow());
358					let mut vm = Vm::new(symbols);
359					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
360					symbols = vm.symbols;
361					if compiled.is_output {
362						output_results.append(&mut result);
363					}
364				}
365				Ok(merge_results(output_results, result))
366			}
367		}
368	}
369
370	#[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
371	pub fn subscription(&self, txn: &mut SubscriptionTransaction, cmd: Subscription<'_>) -> Result<Vec<Frame>> {
372		// Pre-compilation validation: parse and check statement constraints
373		let bump = Bump::new();
374		let statements = parse_str(&bump, cmd.rql)?;
375
376		if statements.len() != 1 {
377			return Err(Error(subscription::single_statement_required(
378				"Subscription endpoint requires exactly one statement",
379			)));
380		}
381
382		let statement = &statements[0];
383		if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
384			return Err(Error(subscription::invalid_statement(
385				"Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
386			)));
387		}
388
389		let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Subscription(&mut *txn))?;
390
391		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
392			&mut Transaction::Subscription(&mut *txn),
393			"subscription",
394			true,
395		)?;
396
397		let compiled = match self.compiler.compile_with_policy(
398			&mut Transaction::Subscription(txn),
399			cmd.rql,
400			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
401		) {
402			Ok(CompilationResult::Ready(compiled)) => compiled,
403			Ok(CompilationResult::Incremental(_)) => {
404				unreachable!("Single subscription statement should not require incremental compilation")
405			}
406			Err(err) => return Err(err),
407		};
408
409		let (output, remaining, _) = execute_compiled_units(
410			&self.0,
411			&mut Transaction::Subscription(txn),
412			&compiled,
413			&cmd.params,
414			symbols,
415		)?;
416		Ok(merge_results(output, remaining))
417	}
418
419	#[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
420	pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> Result<Vec<Frame>> {
421		let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn))?;
422
423		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
424			&mut Transaction::Command(&mut *txn),
425			"command",
426			false,
427		)?;
428
429		let compiled = match self.compiler.compile_with_policy(
430			&mut Transaction::Command(txn),
431			cmd.rql,
432			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
433		) {
434			Ok(CompilationResult::Ready(compiled)) => compiled,
435			Ok(CompilationResult::Incremental(_)) => {
436				unreachable!("DDL statements require admin transactions, not command transactions")
437			}
438			Err(err) => {
439				#[cfg(not(target_arch = "wasm32"))]
440				if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
441					return Err(Error(Diagnostic {
442						code: "REMOTE_002".to_string(),
443						message: "Write operations on remote namespaces are not supported"
444							.to_string(),
445						help: Some("Use the remote instance directly for write operations"
446							.to_string()),
447						..Default::default()
448					}));
449				}
450				return Err(err);
451			}
452		};
453
454		let (output, remaining, _) = execute_compiled_units(
455			&self.0,
456			&mut Transaction::Command(txn),
457			&compiled,
458			&cmd.params,
459			symbols,
460		)?;
461		Ok(merge_results(output, remaining))
462	}
463
464	/// Call a procedure by fully-qualified name (e.g., "banking.transfer_funds").
465	#[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
466	pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> Result<Vec<Frame>> {
467		let rql = format!("CALL {}()", name);
468		let symbols = self.setup_symbols(params, &mut Transaction::Command(&mut *txn))?;
469
470		let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql)? {
471			CompilationResult::Ready(compiled) => compiled,
472			CompilationResult::Incremental(_) => {
473				unreachable!("CALL statements should not require incremental compilation")
474			}
475		};
476
477		let mut result = vec![];
478		let mut symbols = symbols;
479		for compiled in compiled.iter() {
480			result.clear();
481			let mut tx = Transaction::Command(txn);
482			let mut vm = Vm::new(symbols);
483			vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result)?;
484			symbols = vm.symbols;
485		}
486
487		Ok(result)
488	}
489
490	#[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
491	pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> Result<Vec<Frame>> {
492		let symbols = self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn))?;
493
494		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
495			&mut Transaction::Query(&mut *txn),
496			"query",
497			false,
498		)?;
499
500		let compiled = match self.compiler.compile_with_policy(
501			&mut Transaction::Query(txn),
502			qry.rql,
503			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
504		) {
505			Ok(CompilationResult::Ready(compiled)) => compiled,
506			Ok(CompilationResult::Incremental(_)) => {
507				unreachable!("DDL statements require admin transactions, not query transactions")
508			}
509			Err(err) => {
510				#[cfg(not(target_arch = "wasm32"))]
511				if let Some(frames) = self.try_forward_remote_query(&err, qry.rql, qry.params)? {
512					return Ok(frames);
513				}
514				return Err(err);
515			}
516		};
517
518		let (output, remaining, _) =
519			execute_compiled_units(&self.0, &mut Transaction::Query(txn), &compiled, &qry.params, symbols)?;
520		Ok(merge_results(output, remaining))
521	}
522}