Skip to main content

reifydb_engine/vm/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{error::diagnostic::subscription, value::column::columns::Columns};
9use reifydb_metric_old::metric::MetricReader;
10use reifydb_policy::inject_read_policies;
11use reifydb_rql::{
12	ast::parse_str,
13	compiler::{CompilationResult, Compiled, constrain_policy},
14};
15use reifydb_store_single::SingleStore;
16use reifydb_transaction::transaction::{
17	RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
18	query::QueryTransaction,
19};
20#[cfg(not(target_arch = "wasm32"))]
21use reifydb_type::error::Diagnostic;
22use reifydb_type::{
23	error::Error,
24	params::Params,
25	value::{Value, frame::frame::Frame, r#type::Type},
26};
27use tracing::instrument;
28
29#[cfg(not(target_arch = "wasm32"))]
30use crate::remote;
31use crate::{
32	Result,
33	policy::PolicyEvaluator,
34	vm::{
35		Admin, Command, Query, Subscription, Test,
36		services::{EngineConfig, Services},
37		stack::{SymbolTable, Variable},
38		vm::Vm,
39	},
40};
41
42/// Executor is the orchestration layer for RQL statement execution.
43pub struct Executor(Arc<Services>);
44
45impl Clone for Executor {
46	fn clone(&self) -> Self {
47		Self(self.0.clone())
48	}
49}
50
51impl Deref for Executor {
52	type Target = Services;
53
54	fn deref(&self) -> &Self::Target {
55		&self.0
56	}
57}
58
59impl Executor {
60	pub fn new(
61		catalog: Catalog,
62		config: EngineConfig,
63		flow_operator_store: SystemFlowOperatorStore,
64		stats_reader: MetricReader<SingleStore>,
65	) -> Self {
66		Self(Arc::new(Services::new(catalog, config, flow_operator_store, stats_reader)))
67	}
68
69	/// Get a reference to the underlying Services
70	pub fn services(&self) -> &Arc<Services> {
71		&self.0
72	}
73
74	/// Construct an Executor from an existing `Arc<Services>`.
75	pub fn from_services(services: Arc<Services>) -> Self {
76		Self(services)
77	}
78
79	#[allow(dead_code)]
80	pub fn testing() -> Self {
81		Self(Services::testing())
82	}
83
84	/// If the error is a REMOTE_001 and we have a RemoteRegistry, forward the query.
85	/// Returns `Ok(Some(frames))` if forwarded, `Ok(None)` if not a remote query.
86	#[cfg(not(target_arch = "wasm32"))]
87	fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
88		if let Some(ref registry) = self.0.remote_registry
89			&& remote::is_remote_query(err)
90			&& let Some(address) = remote::extract_remote_address(err)
91		{
92			let token = remote::extract_remote_token(err);
93			return registry.forward_query(&address, rql, params, token.as_deref()).map(Some);
94		}
95		Ok(None)
96	}
97}
98
99impl RqlExecutor for Executor {
100	fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> Result<Vec<Frame>> {
101		Executor::rql(self, tx, rql, params)
102	}
103}
104
105/// Populate a stack with parameters so they can be accessed as variables.
106fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
107	match params {
108		Params::Positional(values) => {
109			for (index, value) in values.iter().enumerate() {
110				let param_name = (index + 1).to_string();
111				symbols.set(param_name, Variable::scalar(value.clone()), false)?;
112			}
113		}
114		Params::Named(map) => {
115			for (name, value) in map.iter() {
116				symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
117			}
118		}
119		Params::None => {}
120	}
121	Ok(())
122}
123
124/// Populate the `$identity` variable in the symbol table so policy bodies
125/// (and user RQL) can reference `$identity.id`, `$identity.name`, and `$identity.roles`.
126fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
127	let identity = tx.identity();
128	if identity.is_privileged() {
129		return Ok(());
130	}
131	if identity.is_anonymous() {
132		let columns = Columns::single_row([
133			("id", Value::IdentityId(identity)),
134			("name", Value::none_of(Type::Utf8)),
135			("roles", Value::List(vec![])),
136		]);
137		symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
138		return Ok(());
139	}
140	if let Some(user) = catalog.find_identity(tx, identity)? {
141		let roles = catalog.find_role_names_for_identity(tx, identity)?;
142		let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
143		let columns = Columns::single_row([
144			("id", Value::IdentityId(identity)),
145			("name", Value::Utf8(user.name)),
146			("roles", Value::List(role_values)),
147		]);
148		symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
149	}
150	Ok(())
151}
152
153/// Execute a list of compiled units, tracking output frames separately.
154/// Returns (output_results, last_result, final_symbols).
155fn execute_compiled_units(
156	services: &Arc<Services>,
157	tx: &mut Transaction<'_>,
158	compiled_list: &[Compiled],
159	params: &Params,
160	mut symbols: SymbolTable,
161) -> Result<(Vec<Frame>, Vec<Frame>, SymbolTable)> {
162	let mut result = vec![];
163	let mut output_results: Vec<Frame> = Vec::new();
164
165	for compiled in compiled_list.iter() {
166		result.clear();
167		let mut vm = Vm::new(symbols);
168		vm.run(services, tx, &compiled.instructions, params, &mut result)?;
169		symbols = vm.symbols;
170
171		if compiled.is_output {
172			output_results.append(&mut result);
173		}
174	}
175
176	Ok((output_results, result, symbols))
177}
178
179/// Merge output_results and remaining results into the final result.
180fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
181	output_results.append(&mut remaining);
182	output_results
183}
184
185impl Executor {
186	/// Shared setup: create symbols and populate with params + identity.
187	fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
188		let mut symbols = SymbolTable::new();
189		populate_symbols(&mut symbols, params)?;
190		populate_identity(&mut symbols, &self.catalog, tx)?;
191		Ok(symbols)
192	}
193
194	/// Execute RQL against an existing open transaction.
195	///
196	/// This is the universal RQL execution interface: it compiles and runs
197	/// arbitrary RQL within whatever transaction variant the caller provides.
198	#[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
199	pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> Result<Vec<Frame>> {
200		let mut symbols = self.setup_symbols(&params, tx)?;
201
202		let compiled = match self.compiler.compile_with_policy(tx, rql, inject_read_policies) {
203			Ok(CompilationResult::Ready(compiled)) => compiled,
204			Ok(CompilationResult::Incremental(_)) => {
205				unreachable!("incremental compilation not supported in rql()")
206			}
207			Err(err) => {
208				#[cfg(not(target_arch = "wasm32"))]
209				if let Some(frames) = self.try_forward_remote_query(&err, rql, params)? {
210					return Ok(frames);
211				}
212				return Err(err);
213			}
214		};
215
216		let mut result = vec![];
217		for compiled in compiled.iter() {
218			result.clear();
219			let mut vm = Vm::new(symbols);
220			vm.run(&self.0, tx, &compiled.instructions, &params, &mut result)?;
221			symbols = vm.symbols;
222		}
223
224		Ok(result)
225	}
226
227	#[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
228	pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> Result<Vec<Frame>> {
229		let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn))?;
230
231		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
232			&mut Transaction::Admin(&mut *txn),
233			"admin",
234			true,
235		)?;
236
237		match self.compiler.compile_with_policy(&mut Transaction::Admin(txn), cmd.rql, inject_read_policies) {
238			Err(err) => {
239				#[cfg(not(target_arch = "wasm32"))]
240				if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
241					return Ok(frames);
242				}
243				Err(err)
244			}
245			Ok(CompilationResult::Ready(compiled)) => {
246				let (output, remaining, _) = execute_compiled_units(
247					&self.0,
248					&mut Transaction::Admin(txn),
249					&compiled,
250					&cmd.params,
251					symbols,
252				)?;
253				Ok(merge_results(output, remaining))
254			}
255			Ok(CompilationResult::Incremental(mut state)) => {
256				let policy = constrain_policy(|plans, bump, cat, tx| {
257					inject_read_policies(plans, bump, cat, tx)
258				});
259				let mut result = vec![];
260				let mut output_results: Vec<Frame> = Vec::new();
261				let mut symbols = symbols;
262				while let Some(compiled) = self.compiler.compile_next_with_policy(
263					&mut Transaction::Admin(txn),
264					&mut state,
265					&policy,
266				)? {
267					result.clear();
268					let mut tx = Transaction::Admin(txn);
269					let mut vm = Vm::new(symbols);
270					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
271					symbols = vm.symbols;
272					if compiled.is_output {
273						output_results.append(&mut result);
274					}
275				}
276				Ok(merge_results(output_results, result))
277			}
278		}
279	}
280
281	#[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
282	pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> Result<Vec<Frame>> {
283		let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Test(Box::new(txn.reborrow())))?;
284
285		let session_type = txn.session_type.clone();
286		let session_default_deny = txn.session_default_deny;
287		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
288			&mut Transaction::Test(Box::new(txn.reborrow())),
289			&session_type,
290			session_default_deny,
291		)?;
292
293		match self.compiler.compile_with_policy(
294			&mut Transaction::Test(Box::new(txn.reborrow())),
295			cmd.rql,
296			inject_read_policies,
297		) {
298			Err(err) => {
299				#[cfg(not(target_arch = "wasm32"))]
300				if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
301					return Ok(frames);
302				}
303				Err(err)
304			}
305			Ok(CompilationResult::Ready(compiled)) => {
306				let (output, remaining, _) = execute_compiled_units(
307					&self.0,
308					&mut Transaction::Test(Box::new(txn.reborrow())),
309					&compiled,
310					&cmd.params,
311					symbols,
312				)?;
313				Ok(merge_results(output, remaining))
314			}
315			Ok(CompilationResult::Incremental(mut state)) => {
316				let policy = constrain_policy(|plans, bump, cat, tx| {
317					inject_read_policies(plans, bump, cat, tx)
318				});
319				let mut result = vec![];
320				let mut output_results: Vec<Frame> = Vec::new();
321				let mut symbols = symbols;
322				while let Some(compiled) = self.compiler.compile_next_with_policy(
323					&mut Transaction::Test(Box::new(txn.reborrow())),
324					&mut state,
325					&policy,
326				)? {
327					result.clear();
328					let mut tx = Transaction::Test(Box::new(txn.reborrow()));
329					let mut vm = Vm::new(symbols);
330					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
331					symbols = vm.symbols;
332					if compiled.is_output {
333						output_results.append(&mut result);
334					}
335				}
336				Ok(merge_results(output_results, result))
337			}
338		}
339	}
340
341	#[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
342	pub fn subscription(&self, txn: &mut QueryTransaction, cmd: Subscription<'_>) -> Result<Vec<Frame>> {
343		// Pre-compilation validation: parse and check statement constraints
344		let bump = Bump::new();
345		let statements = parse_str(&bump, cmd.rql)?;
346
347		if statements.len() != 1 {
348			return Err(Error(Box::new(subscription::single_statement_required(
349				"Subscription endpoint requires exactly one statement",
350			))));
351		}
352
353		let statement = &statements[0];
354		if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
355			return Err(Error(Box::new(subscription::invalid_statement(
356				"Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
357			))));
358		}
359
360		let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Query(&mut *txn))?;
361
362		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
363			&mut Transaction::Query(&mut *txn),
364			"subscription",
365			true,
366		)?;
367
368		let compiled = match self.compiler.compile_with_policy(
369			&mut Transaction::Query(txn),
370			cmd.rql,
371			inject_read_policies,
372		) {
373			Ok(CompilationResult::Ready(compiled)) => compiled,
374			Ok(CompilationResult::Incremental(_)) => {
375				unreachable!("Single subscription statement should not require incremental compilation")
376			}
377			Err(err) => return Err(err),
378		};
379
380		let (output, remaining, _) =
381			execute_compiled_units(&self.0, &mut Transaction::Query(txn), &compiled, &cmd.params, symbols)?;
382		Ok(merge_results(output, remaining))
383	}
384
385	#[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
386	pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> Result<Vec<Frame>> {
387		let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn))?;
388
389		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
390			&mut Transaction::Command(&mut *txn),
391			"command",
392			false,
393		)?;
394
395		let compiled = match self.compiler.compile_with_policy(
396			&mut Transaction::Command(txn),
397			cmd.rql,
398			inject_read_policies,
399		) {
400			Ok(CompilationResult::Ready(compiled)) => compiled,
401			Ok(CompilationResult::Incremental(_)) => {
402				unreachable!("DDL statements require admin transactions, not command transactions")
403			}
404			Err(err) => {
405				#[cfg(not(target_arch = "wasm32"))]
406				if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
407					return Err(Error(Box::new(Diagnostic {
408						code: "REMOTE_002".to_string(),
409						message: "Write operations on remote namespaces are not supported"
410							.to_string(),
411						help: Some("Use the remote instance directly for write operations"
412							.to_string()),
413						..Default::default()
414					})));
415				}
416				return Err(err);
417			}
418		};
419
420		let (output, remaining, _) = execute_compiled_units(
421			&self.0,
422			&mut Transaction::Command(txn),
423			&compiled,
424			&cmd.params,
425			symbols,
426		)?;
427		Ok(merge_results(output, remaining))
428	}
429
430	/// Call a procedure by fully-qualified name (e.g., "banking.transfer_funds").
431	#[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
432	pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> Result<Vec<Frame>> {
433		let rql = format!("CALL {}()", name);
434		let symbols = self.setup_symbols(params, &mut Transaction::Command(&mut *txn))?;
435
436		let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql)? {
437			CompilationResult::Ready(compiled) => compiled,
438			CompilationResult::Incremental(_) => {
439				unreachable!("CALL statements should not require incremental compilation")
440			}
441		};
442
443		let mut result = vec![];
444		let mut symbols = symbols;
445		for compiled in compiled.iter() {
446			result.clear();
447			let mut tx = Transaction::Command(txn);
448			let mut vm = Vm::new(symbols);
449			vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result)?;
450			symbols = vm.symbols;
451		}
452
453		Ok(result)
454	}
455
456	#[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
457	pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> Result<Vec<Frame>> {
458		let symbols = self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn))?;
459
460		PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
461			&mut Transaction::Query(&mut *txn),
462			"query",
463			false,
464		)?;
465
466		let compiled = match self.compiler.compile_with_policy(
467			&mut Transaction::Query(txn),
468			qry.rql,
469			inject_read_policies,
470		) {
471			Ok(CompilationResult::Ready(compiled)) => compiled,
472			Ok(CompilationResult::Incremental(_)) => {
473				unreachable!("DDL statements require admin transactions, not query transactions")
474			}
475			Err(err) => {
476				#[cfg(not(target_arch = "wasm32"))]
477				if let Some(frames) = self.try_forward_remote_query(&err, qry.rql, qry.params)? {
478					return Ok(frames);
479				}
480				return Err(err);
481			}
482		};
483
484		let (output, remaining, _) =
485			execute_compiled_units(&self.0, &mut Transaction::Query(txn), &compiled, &qry.params, symbols)?;
486		Ok(merge_results(output, remaining))
487	}
488}