Skip to main content

reifydb_engine/vm/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::FlowOperatorStore};
8use reifydb_core::{error::diagnostic::subscription, util::ioc::IocContainer, value::column::columns::Columns};
9use reifydb_function::registry::Functions;
10use reifydb_metric::metric::MetricReader;
11use reifydb_policy::inject_read_policies;
12use reifydb_rql::{
13	ast::parse_str,
14	compiler::{CompilationResult, constrain_policy},
15};
16use reifydb_runtime::clock::Clock;
17use reifydb_store_single::SingleStore;
18use reifydb_transaction::transaction::{
19	Transaction, admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
20	subscription::SubscriptionTransaction,
21};
22#[cfg(not(target_arch = "wasm32"))]
23use reifydb_type::error::Diagnostic;
24use reifydb_type::{
25	error::Error,
26	params::Params,
27	value::{Value, frame::frame::Frame, identity::IdentityId, r#type::Type},
28};
29use tracing::instrument;
30
31#[cfg(not(target_arch = "wasm32"))]
32use crate::remote::{self, RemoteRegistry};
33use crate::{
34	Result,
35	policy::PolicyEvaluator,
36	procedure::registry::Procedures,
37	transform::registry::Transforms,
38	vm::{
39		Admin, Command, Query, Subscription,
40		services::Services,
41		stack::{SymbolTable, Variable},
42		vm::Vm,
43	},
44};
45
46/// Executor is the orchestration layer for RQL statement execution.
47pub struct Executor(Arc<Services>);
48
49impl Clone for Executor {
50	fn clone(&self) -> Self {
51		Self(self.0.clone())
52	}
53}
54
55impl Deref for Executor {
56	type Target = Services;
57
58	fn deref(&self) -> &Self::Target {
59		&self.0
60	}
61}
62
63impl Executor {
64	pub fn new(
65		catalog: Catalog,
66		clock: Clock,
67		functions: Functions,
68		procedures: Procedures,
69		transforms: Transforms,
70		flow_operator_store: FlowOperatorStore,
71		stats_reader: MetricReader<SingleStore>,
72		ioc: IocContainer,
73		#[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
74	) -> Self {
75		Self(Arc::new(Services::new(
76			catalog,
77			clock,
78			functions,
79			procedures,
80			transforms,
81			flow_operator_store,
82			stats_reader,
83			ioc,
84			#[cfg(not(target_arch = "wasm32"))]
85			remote_registry,
86		)))
87	}
88
89	/// Get a reference to the underlying Services
90	pub fn services(&self) -> &Arc<Services> {
91		&self.0
92	}
93
94	/// Construct an Executor from an existing `Arc<Services>`.
95	pub fn from_services(services: Arc<Services>) -> Self {
96		Self(services)
97	}
98
99	#[allow(dead_code)]
100	pub fn testing() -> Self {
101		Self(Services::testing())
102	}
103
104	/// If the error is a REMOTE_001 and we have a RemoteRegistry, forward the query.
105	/// Returns `Ok(Some(frames))` if forwarded, `Ok(None)` if not a remote query.
106	#[cfg(not(target_arch = "wasm32"))]
107	fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
108		if let Some(ref registry) = self.0.remote_registry {
109			if remote::is_remote_query(err) {
110				if let Some(address) = remote::extract_remote_address(err) {
111					return registry.forward_query(&address, rql, params).map(Some);
112				}
113			}
114		}
115		Ok(None)
116	}
117}
118
119/// Populate a stack with parameters so they can be accessed as variables.
120fn populate_stack(stack: &mut SymbolTable, params: &Params) -> Result<()> {
121	match params {
122		Params::Positional(values) => {
123			for (index, value) in values.iter().enumerate() {
124				let param_name = (index + 1).to_string();
125				stack.set(param_name, Variable::scalar(value.clone()), false)?;
126			}
127		}
128		Params::Named(map) => {
129			for (name, value) in map {
130				stack.set(name.clone(), Variable::scalar(value.clone()), false)?;
131			}
132		}
133		Params::None => {}
134	}
135	Ok(())
136}
137
138/// Populate the `$identity` variable in the symbol table so policy bodies
139/// (and user RQL) can reference `$identity.id`, `$identity.name`, and `$identity.roles`.
140fn populate_identity(
141	stack: &mut SymbolTable,
142	catalog: &Catalog,
143	tx: &mut Transaction<'_>,
144	identity: IdentityId,
145) -> Result<()> {
146	if identity.is_privileged() {
147		return Ok(());
148	}
149	if identity.is_anonymous() {
150		let columns = Columns::single_row([
151			("id", Value::IdentityId(identity)),
152			("name", Value::none_of(Type::Utf8)),
153			("roles", Value::List(vec![])),
154		]);
155		stack.set("identity".to_string(), Variable::Columns(columns), false)?;
156		return Ok(());
157	}
158	if let Some(user) = catalog.find_user_by_identity(tx, identity)? {
159		let roles = catalog.find_role_names_for_identity(tx, identity)?;
160		let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
161		let columns = Columns::single_row([
162			("id", Value::IdentityId(identity)),
163			("name", Value::Utf8(user.name)),
164			("roles", Value::List(role_values)),
165		]);
166		stack.set("identity".to_string(), Variable::Columns(columns), false)?;
167	}
168	Ok(())
169}
170
171impl Executor {
172	/// Execute RQL against an existing open transaction.
173	///
174	/// This is the universal RQL execution interface: it compiles and runs
175	/// arbitrary RQL within whatever transaction variant the caller provides.
176	#[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
177	pub fn rql(
178		&self,
179		tx: &mut Transaction<'_>,
180		identity: IdentityId,
181		rql: &str,
182		params: Params,
183	) -> Result<Vec<Frame>> {
184		let mut result = vec![];
185		let mut symbol_table = SymbolTable::new();
186		populate_stack(&mut symbol_table, &params)?;
187		populate_identity(&mut symbol_table, &self.catalog, tx, identity)?;
188
189		let compiled = match self.compiler.compile_with_policy(tx, rql, |plans, bump, cat, tx| {
190			inject_read_policies(plans, bump, cat, tx, identity)
191		}) {
192			Ok(CompilationResult::Ready(compiled)) => compiled,
193			Ok(CompilationResult::Incremental(_)) => {
194				unreachable!("incremental compilation not supported in rql()")
195			}
196			Err(err) => {
197				#[cfg(not(target_arch = "wasm32"))]
198				if let Some(frames) = self.try_forward_remote_query(&err, rql, params)? {
199					return Ok(frames);
200				}
201				return Err(err);
202			}
203		};
204
205		for compiled in compiled.iter() {
206			result.clear();
207			let mut vm = Vm::new(symbol_table, identity);
208			vm.run(&self.0, tx, &compiled.instructions, &params, &mut result)?;
209			symbol_table = vm.symbol_table;
210		}
211
212		Ok(result)
213	}
214
215	#[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
216	pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> Result<Vec<Frame>> {
217		let mut result = vec![];
218		let mut output_results: Vec<Frame> = Vec::new();
219		let mut symbol_table = SymbolTable::new();
220		populate_stack(&mut symbol_table, &cmd.params)?;
221
222		let identity = cmd.identity;
223		populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Admin(&mut *txn), identity)?;
224
225		PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
226			&mut Transaction::Admin(&mut *txn),
227			identity,
228			"admin",
229			true,
230		)?;
231
232		match self.compiler.compile_with_policy(
233			&mut Transaction::Admin(txn),
234			cmd.rql,
235			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
236		) {
237			Err(err) => {
238				#[cfg(not(target_arch = "wasm32"))]
239				if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
240					return Ok(frames);
241				}
242				return Err(err);
243			}
244			Ok(CompilationResult::Ready(compiled)) => {
245				for compiled in compiled.iter() {
246					result.clear();
247					let mut tx = Transaction::Admin(txn);
248					let mut vm = Vm::new(symbol_table, identity);
249					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
250					symbol_table = vm.symbol_table;
251
252					if compiled.is_output {
253						output_results.append(&mut result);
254					}
255				}
256			}
257			Ok(CompilationResult::Incremental(mut state)) => {
258				let policy = constrain_policy(|plans, bump, cat, tx| {
259					inject_read_policies(plans, bump, cat, tx, identity)
260				});
261				while let Some(compiled) = self.compiler.compile_next_with_policy(
262					&mut Transaction::Admin(txn),
263					&mut state,
264					&policy,
265				)? {
266					result.clear();
267					let mut tx = Transaction::Admin(txn);
268					let mut vm = Vm::new(symbol_table, identity);
269					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
270					symbol_table = vm.symbol_table;
271
272					if compiled.is_output {
273						output_results.append(&mut result);
274					}
275				}
276			}
277		}
278
279		let mut final_result = output_results;
280		final_result.append(&mut result);
281		Ok(final_result)
282	}
283
284	#[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
285	pub fn subscription(&self, txn: &mut SubscriptionTransaction, cmd: Subscription<'_>) -> Result<Vec<Frame>> {
286		// Pre-compilation validation: parse and check statement constraints
287		let bump = Bump::new();
288		let statements = parse_str(&bump, cmd.rql)?;
289
290		if statements.len() != 1 {
291			return Err(Error(subscription::single_statement_required(
292				"Subscription endpoint requires exactly one statement",
293			)));
294		}
295
296		let statement = &statements[0];
297		if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
298			return Err(Error(subscription::invalid_statement(
299				"Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
300			)));
301		}
302
303		// Proceed with standard compilation and execution
304		let mut result = vec![];
305		let mut output_results: Vec<Frame> = Vec::new();
306		let mut symbol_table = SymbolTable::new();
307		populate_stack(&mut symbol_table, &cmd.params)?;
308
309		let identity = cmd.identity;
310		populate_identity(
311			&mut symbol_table,
312			&self.catalog,
313			&mut Transaction::Subscription(&mut *txn),
314			identity,
315		)?;
316
317		PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
318			&mut Transaction::Subscription(&mut *txn),
319			identity,
320			"subscription",
321			true,
322		)?;
323
324		let compiled = match self.compiler.compile_with_policy(
325			&mut Transaction::Subscription(txn),
326			cmd.rql,
327			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
328		) {
329			Ok(CompilationResult::Ready(compiled)) => compiled,
330			Ok(CompilationResult::Incremental(_)) => {
331				unreachable!("Single subscription statement should not require incremental compilation")
332			}
333			Err(err) => return Err(err),
334		};
335
336		for compiled in compiled.iter() {
337			result.clear();
338			let mut tx = Transaction::Subscription(txn);
339			let mut vm = Vm::new(symbol_table, identity);
340			vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
341			symbol_table = vm.symbol_table;
342
343			if compiled.is_output {
344				output_results.append(&mut result);
345			}
346		}
347
348		let mut final_result = output_results;
349		final_result.append(&mut result);
350		Ok(final_result)
351	}
352
353	#[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
354	pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> Result<Vec<Frame>> {
355		let mut result = vec![];
356		let mut output_results: Vec<Frame> = Vec::new();
357		let mut symbol_table = SymbolTable::new();
358		populate_stack(&mut symbol_table, &cmd.params)?;
359
360		let identity = cmd.identity;
361		populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Command(&mut *txn), identity)?;
362
363		PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
364			&mut Transaction::Command(&mut *txn),
365			identity,
366			"command",
367			false,
368		)?;
369
370		let compiled = match self.compiler.compile_with_policy(
371			&mut Transaction::Command(txn),
372			cmd.rql,
373			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
374		) {
375			Ok(CompilationResult::Ready(compiled)) => compiled,
376			Ok(CompilationResult::Incremental(_)) => {
377				unreachable!("DDL statements require admin transactions, not command transactions")
378			}
379			Err(err) => {
380				#[cfg(not(target_arch = "wasm32"))]
381				if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
382					return Err(Error(Diagnostic {
383						code: "REMOTE_002".to_string(),
384						message: "Write operations on remote namespaces are not supported"
385							.to_string(),
386						help: Some("Use the remote instance directly for write operations"
387							.to_string()),
388						..Default::default()
389					}));
390				}
391				return Err(err);
392			}
393		};
394
395		for compiled in compiled.iter() {
396			result.clear();
397			let mut tx = Transaction::Command(txn);
398			let mut vm = Vm::new(symbol_table, identity);
399			vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
400			symbol_table = vm.symbol_table;
401
402			if compiled.is_output {
403				output_results.append(&mut result);
404			}
405		}
406
407		let mut final_result = output_results;
408		final_result.append(&mut result);
409		Ok(final_result)
410	}
411
412	/// Call a procedure by fully-qualified name (e.g., "banking.transfer_funds").
413	#[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
414	pub fn call_procedure(
415		&self,
416		txn: &mut CommandTransaction,
417		identity: IdentityId,
418		name: &str,
419		params: &Params,
420	) -> Result<Vec<Frame>> {
421		// Compile and execute CALL <name>(<params>)
422		let rql = format!("CALL {}()", name);
423		let mut result = vec![];
424		let mut symbol_table = SymbolTable::new();
425		populate_stack(&mut symbol_table, params)?;
426		populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Command(&mut *txn), identity)?;
427
428		let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql)? {
429			CompilationResult::Ready(compiled) => compiled,
430			CompilationResult::Incremental(_) => {
431				unreachable!("CALL statements should not require incremental compilation")
432			}
433		};
434
435		for compiled in compiled.iter() {
436			result.clear();
437			let mut tx = Transaction::Command(txn);
438			let mut vm = Vm::new(symbol_table, identity);
439			vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result)?;
440			symbol_table = vm.symbol_table;
441		}
442
443		Ok(result)
444	}
445
446	#[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
447	pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> Result<Vec<Frame>> {
448		let mut result = vec![];
449		let mut output_results: Vec<Frame> = Vec::new();
450		let mut symbol_table = SymbolTable::new();
451		populate_stack(&mut symbol_table, &qry.params)?;
452
453		let identity = qry.identity;
454		populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Query(&mut *txn), identity)?;
455
456		PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
457			&mut Transaction::Query(&mut *txn),
458			identity,
459			"query",
460			false,
461		)?;
462
463		let compiled = match self.compiler.compile_with_policy(
464			&mut Transaction::Query(txn),
465			qry.rql,
466			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
467		) {
468			Ok(CompilationResult::Ready(compiled)) => compiled,
469			Ok(CompilationResult::Incremental(_)) => {
470				unreachable!("DDL statements require admin transactions, not query transactions")
471			}
472			Err(err) => {
473				#[cfg(not(target_arch = "wasm32"))]
474				if let Some(frames) = self.try_forward_remote_query(&err, qry.rql, qry.params)? {
475					return Ok(frames);
476				}
477				return Err(err);
478			}
479		};
480
481		for compiled in compiled.iter() {
482			result.clear();
483			let mut tx = Transaction::Query(txn);
484			let mut vm = Vm::new(symbol_table, identity);
485			vm.run(&self.0, &mut tx, &compiled.instructions, &qry.params, &mut result)?;
486			symbol_table = vm.symbol_table;
487
488			if compiled.is_output {
489				output_results.append(&mut result);
490			}
491		}
492
493		let mut final_result = output_results;
494		final_result.append(&mut result);
495		Ok(final_result)
496	}
497}