Skip to main content

reifydb_engine/vm/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::FlowOperatorStore};
7use reifydb_core::{util::ioc::IocContainer, value::column::columns::Columns};
8use reifydb_function::registry::Functions;
9use reifydb_metric::metric::MetricReader;
10use reifydb_policy::inject_read_policies;
11use reifydb_rql::compiler::{CompilationResult, constrain_policy};
12use reifydb_runtime::clock::Clock;
13use reifydb_store_single::SingleStore;
14use reifydb_transaction::transaction::{
15	Transaction, admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
16};
17#[cfg(not(target_arch = "wasm32"))]
18use reifydb_type::error::{Diagnostic, Error};
19use reifydb_type::{
20	params::Params,
21	value::{Value, frame::frame::Frame, identity::IdentityId, r#type::Type},
22};
23use tracing::instrument;
24
25#[cfg(not(target_arch = "wasm32"))]
26use crate::remote::{self, RemoteRegistry};
27use crate::{
28	Result,
29	policy::PolicyEvaluator,
30	procedure::registry::Procedures,
31	transform::registry::Transforms,
32	vm::{
33		Admin, Command, Query,
34		services::Services,
35		stack::{SymbolTable, Variable},
36		vm::Vm,
37	},
38};
39
40/// Executor is the orchestration layer for RQL statement execution.
41pub struct Executor(Arc<Services>);
42
43impl Clone for Executor {
44	fn clone(&self) -> Self {
45		Self(self.0.clone())
46	}
47}
48
49impl Deref for Executor {
50	type Target = Services;
51
52	fn deref(&self) -> &Self::Target {
53		&self.0
54	}
55}
56
57impl Executor {
58	pub fn new(
59		catalog: Catalog,
60		clock: Clock,
61		functions: Functions,
62		procedures: Procedures,
63		transforms: Transforms,
64		flow_operator_store: FlowOperatorStore,
65		stats_reader: MetricReader<SingleStore>,
66		ioc: IocContainer,
67		#[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
68	) -> Self {
69		Self(Arc::new(Services::new(
70			catalog,
71			clock,
72			functions,
73			procedures,
74			transforms,
75			flow_operator_store,
76			stats_reader,
77			ioc,
78			#[cfg(not(target_arch = "wasm32"))]
79			remote_registry,
80		)))
81	}
82
83	/// Get a reference to the underlying Services
84	pub fn services(&self) -> &Arc<Services> {
85		&self.0
86	}
87
88	/// Construct an Executor from an existing `Arc<Services>`.
89	pub fn from_services(services: Arc<Services>) -> Self {
90		Self(services)
91	}
92
93	#[allow(dead_code)]
94	pub fn testing() -> Self {
95		Self(Services::testing())
96	}
97
98	/// If the error is a REMOTE_001 and we have a RemoteRegistry, forward the query.
99	/// Returns `Ok(Some(frames))` if forwarded, `Ok(None)` if not a remote query.
100	#[cfg(not(target_arch = "wasm32"))]
101	fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
102		if let Some(ref registry) = self.0.remote_registry {
103			if remote::is_remote_query(err) {
104				if let Some(address) = remote::extract_remote_address(err) {
105					return registry.forward_query(&address, rql, params).map(Some);
106				}
107			}
108		}
109		Ok(None)
110	}
111}
112
113/// Populate a stack with parameters so they can be accessed as variables.
114fn populate_stack(stack: &mut SymbolTable, params: &Params) -> Result<()> {
115	match params {
116		Params::Positional(values) => {
117			for (index, value) in values.iter().enumerate() {
118				let param_name = (index + 1).to_string();
119				stack.set(param_name, Variable::scalar(value.clone()), false)?;
120			}
121		}
122		Params::Named(map) => {
123			for (name, value) in map {
124				stack.set(name.clone(), Variable::scalar(value.clone()), false)?;
125			}
126		}
127		Params::None => {}
128	}
129	Ok(())
130}
131
132/// Populate the `$identity` variable in the symbol table so policy bodies
133/// (and user RQL) can reference `$identity.id`, `$identity.name`, and `$identity.roles`.
134fn populate_identity(
135	stack: &mut SymbolTable,
136	catalog: &Catalog,
137	tx: &mut Transaction<'_>,
138	identity: IdentityId,
139) -> Result<()> {
140	if identity.is_privileged() {
141		return Ok(());
142	}
143	if identity.is_anonymous() {
144		let columns = Columns::single_row([
145			("id", Value::IdentityId(identity)),
146			("name", Value::none_of(Type::Utf8)),
147			("roles", Value::List(vec![])),
148		]);
149		stack.set("identity".to_string(), Variable::Columns(columns), false)?;
150		return Ok(());
151	}
152	if let Some(user) = catalog.find_user_by_identity(tx, identity)? {
153		let roles = catalog.find_role_names_for_identity(tx, identity)?;
154		let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
155		let columns = Columns::single_row([
156			("id", Value::IdentityId(identity)),
157			("name", Value::Utf8(user.name)),
158			("roles", Value::List(role_values)),
159		]);
160		stack.set("identity".to_string(), Variable::Columns(columns), false)?;
161	}
162	Ok(())
163}
164
165impl Executor {
166	/// Execute RQL against an existing open transaction.
167	///
168	/// This is the universal RQL execution interface: it compiles and runs
169	/// arbitrary RQL within whatever transaction variant the caller provides.
170	#[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
171	pub fn rql(
172		&self,
173		tx: &mut Transaction<'_>,
174		identity: IdentityId,
175		rql: &str,
176		params: Params,
177	) -> Result<Vec<Frame>> {
178		let mut result = vec![];
179		let mut symbol_table = SymbolTable::new();
180		populate_stack(&mut symbol_table, &params)?;
181		populate_identity(&mut symbol_table, &self.catalog, tx, identity)?;
182
183		let compiled = match self.compiler.compile_with_policy(tx, rql, |plans, bump, cat, tx| {
184			inject_read_policies(plans, bump, cat, tx, identity)
185		}) {
186			Ok(CompilationResult::Ready(compiled)) => compiled,
187			Ok(CompilationResult::Incremental(_)) => {
188				unreachable!("incremental compilation not supported in rql()")
189			}
190			Err(err) => {
191				#[cfg(not(target_arch = "wasm32"))]
192				if let Some(frames) = self.try_forward_remote_query(&err, rql, params)? {
193					return Ok(frames);
194				}
195				return Err(err);
196			}
197		};
198
199		for compiled in compiled.iter() {
200			result.clear();
201			let mut vm = Vm::new(symbol_table, identity);
202			vm.run(&self.0, tx, &compiled.instructions, &params, &mut result)?;
203			symbol_table = vm.symbol_table;
204		}
205
206		Ok(result)
207	}
208
209	#[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
210	pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> Result<Vec<Frame>> {
211		let mut result = vec![];
212		let mut output_results: Vec<Frame> = Vec::new();
213		let mut symbol_table = SymbolTable::new();
214		populate_stack(&mut symbol_table, &cmd.params)?;
215
216		let identity = cmd.identity;
217		populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Admin(&mut *txn), identity)?;
218
219		PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
220			&mut Transaction::Admin(&mut *txn),
221			identity,
222			"admin",
223			true,
224		)?;
225
226		match self.compiler.compile_with_policy(
227			&mut Transaction::Admin(txn),
228			cmd.rql,
229			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
230		) {
231			Err(err) => {
232				#[cfg(not(target_arch = "wasm32"))]
233				if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
234					return Ok(frames);
235				}
236				return Err(err);
237			}
238			Ok(CompilationResult::Ready(compiled)) => {
239				for compiled in compiled.iter() {
240					result.clear();
241					let mut tx = Transaction::Admin(txn);
242					let mut vm = Vm::new(symbol_table, identity);
243					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
244					symbol_table = vm.symbol_table;
245
246					if compiled.is_output {
247						output_results.append(&mut result);
248					}
249				}
250			}
251			Ok(CompilationResult::Incremental(mut state)) => {
252				let policy = constrain_policy(|plans, bump, cat, tx| {
253					inject_read_policies(plans, bump, cat, tx, identity)
254				});
255				while let Some(compiled) = self.compiler.compile_next_with_policy(
256					&mut Transaction::Admin(txn),
257					&mut state,
258					&policy,
259				)? {
260					result.clear();
261					let mut tx = Transaction::Admin(txn);
262					let mut vm = Vm::new(symbol_table, identity);
263					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
264					symbol_table = vm.symbol_table;
265
266					if compiled.is_output {
267						output_results.append(&mut result);
268					}
269				}
270			}
271		}
272
273		let mut final_result = output_results;
274		final_result.append(&mut result);
275		Ok(final_result)
276	}
277
278	#[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
279	pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> Result<Vec<Frame>> {
280		let mut result = vec![];
281		let mut output_results: Vec<Frame> = Vec::new();
282		let mut symbol_table = SymbolTable::new();
283		populate_stack(&mut symbol_table, &cmd.params)?;
284
285		let identity = cmd.identity;
286		populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Command(&mut *txn), identity)?;
287
288		PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
289			&mut Transaction::Command(&mut *txn),
290			identity,
291			"command",
292			false,
293		)?;
294
295		let compiled = match self.compiler.compile_with_policy(
296			&mut Transaction::Command(txn),
297			cmd.rql,
298			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
299		) {
300			Ok(CompilationResult::Ready(compiled)) => compiled,
301			Ok(CompilationResult::Incremental(_)) => {
302				unreachable!("DDL statements require admin transactions, not command transactions")
303			}
304			Err(err) => {
305				#[cfg(not(target_arch = "wasm32"))]
306				if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
307					return Err(Error(Diagnostic {
308						code: "REMOTE_002".to_string(),
309						message: "Write operations on remote namespaces are not supported"
310							.to_string(),
311						help: Some("Use the remote instance directly for write operations"
312							.to_string()),
313						..Default::default()
314					}));
315				}
316				return Err(err);
317			}
318		};
319
320		for compiled in compiled.iter() {
321			result.clear();
322			let mut tx = Transaction::Command(txn);
323			let mut vm = Vm::new(symbol_table, identity);
324			vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
325			symbol_table = vm.symbol_table;
326
327			if compiled.is_output {
328				output_results.append(&mut result);
329			}
330		}
331
332		let mut final_result = output_results;
333		final_result.append(&mut result);
334		Ok(final_result)
335	}
336
337	/// Call a procedure by fully-qualified name (e.g., "banking.transfer_funds").
338	#[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
339	pub fn call_procedure(
340		&self,
341		txn: &mut CommandTransaction,
342		identity: IdentityId,
343		name: &str,
344		params: &Params,
345	) -> Result<Vec<Frame>> {
346		// Compile and execute CALL <name>(<params>)
347		let rql = format!("CALL {}()", name);
348		let mut result = vec![];
349		let mut symbol_table = SymbolTable::new();
350		populate_stack(&mut symbol_table, params)?;
351		populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Command(&mut *txn), identity)?;
352
353		let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql)? {
354			CompilationResult::Ready(compiled) => compiled,
355			CompilationResult::Incremental(_) => {
356				unreachable!("CALL statements should not require incremental compilation")
357			}
358		};
359
360		for compiled in compiled.iter() {
361			result.clear();
362			let mut tx = Transaction::Command(txn);
363			let mut vm = Vm::new(symbol_table, identity);
364			vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result)?;
365			symbol_table = vm.symbol_table;
366		}
367
368		Ok(result)
369	}
370
371	#[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
372	pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> Result<Vec<Frame>> {
373		let mut result = vec![];
374		let mut output_results: Vec<Frame> = Vec::new();
375		let mut symbol_table = SymbolTable::new();
376		populate_stack(&mut symbol_table, &qry.params)?;
377
378		let identity = qry.identity;
379		populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Query(&mut *txn), identity)?;
380
381		PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
382			&mut Transaction::Query(&mut *txn),
383			identity,
384			"query",
385			false,
386		)?;
387
388		let compiled = match self.compiler.compile_with_policy(
389			&mut Transaction::Query(txn),
390			qry.rql,
391			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
392		) {
393			Ok(CompilationResult::Ready(compiled)) => compiled,
394			Ok(CompilationResult::Incremental(_)) => {
395				unreachable!("DDL statements require admin transactions, not query transactions")
396			}
397			Err(err) => {
398				#[cfg(not(target_arch = "wasm32"))]
399				if let Some(frames) = self.try_forward_remote_query(&err, qry.rql, qry.params)? {
400					return Ok(frames);
401				}
402				return Err(err);
403			}
404		};
405
406		for compiled in compiled.iter() {
407			result.clear();
408			let mut tx = Transaction::Query(txn);
409			let mut vm = Vm::new(symbol_table, identity);
410			vm.run(&self.0, &mut tx, &compiled.instructions, &qry.params, &mut result)?;
411			symbol_table = vm.symbol_table;
412
413			if compiled.is_output {
414				output_results.append(&mut result);
415			}
416		}
417
418		let mut final_result = output_results;
419		final_result.append(&mut result);
420		Ok(final_result)
421	}
422}