Skip to main content

reifydb_engine/vm/
executor.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::FlowOperatorStore};
7use reifydb_core::{util::ioc::IocContainer, value::column::columns::Columns};
8use reifydb_function::registry::Functions;
9use reifydb_metric::metric::MetricReader;
10use reifydb_policy::inject_read_policies;
11use reifydb_rql::compiler::{CompilationResult, constrain_policy};
12use reifydb_runtime::clock::Clock;
13use reifydb_store_single::SingleStore;
14use reifydb_transaction::transaction::{
15	Transaction, admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
16};
17use reifydb_type::{
18	params::Params,
19	value::{Value, frame::frame::Frame, identity::IdentityId, r#type::Type},
20};
21use tracing::instrument;
22
23use crate::{
24	Result,
25	policy::PolicyEvaluator,
26	procedure::registry::Procedures,
27	transform::registry::Transforms,
28	vm::{
29		Admin, Command, Query,
30		services::Services,
31		stack::{SymbolTable, Variable},
32		vm::Vm,
33	},
34};
35
36/// Executor is the orchestration layer for RQL statement execution.
37pub struct Executor(Arc<Services>);
38
39impl Clone for Executor {
40	fn clone(&self) -> Self {
41		Self(self.0.clone())
42	}
43}
44
45impl Deref for Executor {
46	type Target = Services;
47
48	fn deref(&self) -> &Self::Target {
49		&self.0
50	}
51}
52
53impl Executor {
54	pub fn new(
55		catalog: Catalog,
56		clock: Clock,
57		functions: Functions,
58		procedures: Procedures,
59		transforms: Transforms,
60		flow_operator_store: FlowOperatorStore,
61		stats_reader: MetricReader<SingleStore>,
62		ioc: IocContainer,
63	) -> Self {
64		Self(Arc::new(Services::new(
65			catalog,
66			clock,
67			functions,
68			procedures,
69			transforms,
70			flow_operator_store,
71			stats_reader,
72			ioc,
73		)))
74	}
75
76	/// Get a reference to the underlying Services
77	pub fn services(&self) -> &Arc<Services> {
78		&self.0
79	}
80
81	/// Construct an Executor from an existing `Arc<Services>`.
82	pub fn from_services(services: Arc<Services>) -> Self {
83		Self(services)
84	}
85
86	#[allow(dead_code)]
87	pub fn testing() -> Self {
88		Self(Services::testing())
89	}
90}
91
92/// Populate a stack with parameters so they can be accessed as variables.
93fn populate_stack(stack: &mut SymbolTable, params: &Params) -> Result<()> {
94	match params {
95		Params::Positional(values) => {
96			for (index, value) in values.iter().enumerate() {
97				let param_name = (index + 1).to_string();
98				stack.set(param_name, Variable::scalar(value.clone()), false)?;
99			}
100		}
101		Params::Named(map) => {
102			for (name, value) in map {
103				stack.set(name.clone(), Variable::scalar(value.clone()), false)?;
104			}
105		}
106		Params::None => {}
107	}
108	Ok(())
109}
110
111/// Populate the `$identity` variable in the symbol table so policy bodies
112/// (and user RQL) can reference `$identity.id`, `$identity.name`, and `$identity.roles`.
113fn populate_identity(
114	stack: &mut SymbolTable,
115	catalog: &Catalog,
116	tx: &mut Transaction<'_>,
117	identity: IdentityId,
118) -> Result<()> {
119	if identity.is_privileged() {
120		return Ok(());
121	}
122	if identity.is_anonymous() {
123		let columns = Columns::single_row([
124			("id", Value::IdentityId(identity)),
125			("name", Value::none_of(Type::Utf8)),
126			("roles", Value::List(vec![])),
127		]);
128		stack.set("identity".to_string(), Variable::Columns(columns), false)?;
129		return Ok(());
130	}
131	if let Some(user) = catalog.find_user_by_identity(tx, identity)? {
132		let roles = catalog.find_role_names_for_identity(tx, identity)?;
133		let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
134		let columns = Columns::single_row([
135			("id", Value::IdentityId(identity)),
136			("name", Value::Utf8(user.name)),
137			("roles", Value::List(role_values)),
138		]);
139		stack.set("identity".to_string(), Variable::Columns(columns), false)?;
140	}
141	Ok(())
142}
143
144impl Executor {
145	/// Execute RQL against an existing open transaction.
146	///
147	/// This is the universal RQL execution interface: it compiles and runs
148	/// arbitrary RQL within whatever transaction variant the caller provides.
149	#[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
150	pub fn rql(
151		&self,
152		tx: &mut Transaction<'_>,
153		identity: IdentityId,
154		rql: &str,
155		params: Params,
156	) -> Result<Vec<Frame>> {
157		let mut result = vec![];
158		let mut symbol_table = SymbolTable::new();
159		populate_stack(&mut symbol_table, &params)?;
160		populate_identity(&mut symbol_table, &self.catalog, tx, identity)?;
161
162		let compiled = match self.compiler.compile_with_policy(tx, rql, |plans, bump, cat, tx| {
163			inject_read_policies(plans, bump, cat, tx, identity)
164		})? {
165			CompilationResult::Ready(compiled) => compiled,
166			CompilationResult::Incremental(_) => {
167				unreachable!("incremental compilation not supported in rql()")
168			}
169		};
170
171		for compiled in compiled.iter() {
172			result.clear();
173			let mut vm = Vm::new(symbol_table, identity);
174			vm.run(&self.0, tx, &compiled.instructions, &params, &mut result)?;
175			symbol_table = vm.symbol_table;
176		}
177
178		Ok(result)
179	}
180
181	#[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
182	pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> Result<Vec<Frame>> {
183		let mut result = vec![];
184		let mut output_results: Vec<Frame> = Vec::new();
185		let mut symbol_table = SymbolTable::new();
186		populate_stack(&mut symbol_table, &cmd.params)?;
187
188		let identity = cmd.identity;
189		populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Admin(&mut *txn), identity)?;
190
191		PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
192			&mut Transaction::Admin(&mut *txn),
193			identity,
194			"admin",
195			true,
196		)?;
197
198		match self.compiler.compile_with_policy(
199			&mut Transaction::Admin(txn),
200			cmd.rql,
201			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
202		)? {
203			CompilationResult::Ready(compiled) => {
204				for compiled in compiled.iter() {
205					result.clear();
206					let mut tx = Transaction::Admin(txn);
207					let mut vm = Vm::new(symbol_table, identity);
208					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
209					symbol_table = vm.symbol_table;
210
211					if compiled.is_output {
212						output_results.append(&mut result);
213					}
214				}
215			}
216			CompilationResult::Incremental(mut state) => {
217				let policy = constrain_policy(|plans, bump, cat, tx| {
218					inject_read_policies(plans, bump, cat, tx, identity)
219				});
220				while let Some(compiled) = self.compiler.compile_next_with_policy(
221					&mut Transaction::Admin(txn),
222					&mut state,
223					&policy,
224				)? {
225					result.clear();
226					let mut tx = Transaction::Admin(txn);
227					let mut vm = Vm::new(symbol_table, identity);
228					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
229					symbol_table = vm.symbol_table;
230
231					if compiled.is_output {
232						output_results.append(&mut result);
233					}
234				}
235			}
236		}
237
238		let mut final_result = output_results;
239		final_result.append(&mut result);
240		Ok(final_result)
241	}
242
243	#[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
244	pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> Result<Vec<Frame>> {
245		let mut result = vec![];
246		let mut output_results: Vec<Frame> = Vec::new();
247		let mut symbol_table = SymbolTable::new();
248		populate_stack(&mut symbol_table, &cmd.params)?;
249
250		let identity = cmd.identity;
251		populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Command(&mut *txn), identity)?;
252
253		PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
254			&mut Transaction::Command(&mut *txn),
255			identity,
256			"command",
257			false,
258		)?;
259
260		let compiled = match self.compiler.compile_with_policy(
261			&mut Transaction::Command(txn),
262			cmd.rql,
263			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
264		)? {
265			CompilationResult::Ready(compiled) => compiled,
266			CompilationResult::Incremental(_) => {
267				unreachable!("DDL statements require admin transactions, not command transactions")
268			}
269		};
270
271		for compiled in compiled.iter() {
272			result.clear();
273			let mut tx = Transaction::Command(txn);
274			let mut vm = Vm::new(symbol_table, identity);
275			vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
276			symbol_table = vm.symbol_table;
277
278			if compiled.is_output {
279				output_results.append(&mut result);
280			}
281		}
282
283		let mut final_result = output_results;
284		final_result.append(&mut result);
285		Ok(final_result)
286	}
287
288	/// Call a procedure by fully-qualified name (e.g., "banking.transfer_funds").
289	#[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
290	pub fn call_procedure(
291		&self,
292		txn: &mut CommandTransaction,
293		identity: IdentityId,
294		name: &str,
295		params: &Params,
296	) -> Result<Vec<Frame>> {
297		// Compile and execute CALL <name>(<params>)
298		let rql = format!("CALL {}()", name);
299		let mut result = vec![];
300		let mut symbol_table = SymbolTable::new();
301		populate_stack(&mut symbol_table, params)?;
302		populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Command(&mut *txn), identity)?;
303
304		let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql)? {
305			CompilationResult::Ready(compiled) => compiled,
306			CompilationResult::Incremental(_) => {
307				unreachable!("CALL statements should not require incremental compilation")
308			}
309		};
310
311		for compiled in compiled.iter() {
312			result.clear();
313			let mut tx = Transaction::Command(txn);
314			let mut vm = Vm::new(symbol_table, identity);
315			vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result)?;
316			symbol_table = vm.symbol_table;
317		}
318
319		Ok(result)
320	}
321
322	#[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
323	pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> Result<Vec<Frame>> {
324		let mut result = vec![];
325		let mut output_results: Vec<Frame> = Vec::new();
326		let mut symbol_table = SymbolTable::new();
327		populate_stack(&mut symbol_table, &qry.params)?;
328
329		let identity = qry.identity;
330		populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Query(&mut *txn), identity)?;
331
332		PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
333			&mut Transaction::Query(&mut *txn),
334			identity,
335			"query",
336			false,
337		)?;
338
339		let compiled = match self.compiler.compile_with_policy(
340			&mut Transaction::Query(txn),
341			qry.rql,
342			|plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
343		)? {
344			CompilationResult::Ready(compiled) => compiled,
345			CompilationResult::Incremental(_) => {
346				unreachable!("DDL statements require admin transactions, not query transactions")
347			}
348		};
349
350		for compiled in compiled.iter() {
351			result.clear();
352			let mut tx = Transaction::Query(txn);
353			let mut vm = Vm::new(symbol_table, identity);
354			vm.run(&self.0, &mut tx, &compiled.instructions, &qry.params, &mut result)?;
355			symbol_table = vm.symbol_table;
356
357			if compiled.is_output {
358				output_results.append(&mut result);
359			}
360		}
361
362		let mut final_result = output_results;
363		final_result.append(&mut result);
364		Ok(final_result)
365	}
366}