Skip to main content

reifydb_engine/vm/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, result::Result as StdResult, sync::Arc, time::Duration};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{
9	error::diagnostic::subscription,
10	execution::ExecutionResult,
11	metric::{ExecutionMetrics, StatementMetric},
12	value::column::columns::Columns,
13};
14use reifydb_metric_old::metric::MetricReader;
15use reifydb_policy::inject_read_policies;
16use reifydb_rql::{
17	ast::parse_str,
18	compiler::{CompilationResult, Compiled, constrain_policy},
19	fingerprint::request::fingerprint_request,
20};
21use reifydb_store_single::SingleStore;
22use reifydb_transaction::transaction::{
23	RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
24	query::QueryTransaction,
25};
26#[cfg(not(reifydb_single_threaded))]
27use reifydb_type::error::Diagnostic;
28use reifydb_type::{
29	error::Error,
30	params::Params,
31	value::{Value, frame::frame::Frame, r#type::Type},
32};
33use tracing::instrument;
34
35#[cfg(not(reifydb_single_threaded))]
36use crate::remote;
37use crate::{
38	Result,
39	policy::PolicyEvaluator,
40	vm::{
41		Admin, Command, Query, Subscription, Test,
42		services::{EngineConfig, Services},
43		stack::{SymbolTable, Variable},
44		vm::Vm,
45	},
46};
47
48/// Executor is the orchestration layer for RQL statement execution.
49pub struct Executor(Arc<Services>);
50
51impl Clone for Executor {
52	fn clone(&self) -> Self {
53		Self(self.0.clone())
54	}
55}
56
57impl Deref for Executor {
58	type Target = Services;
59
60	fn deref(&self) -> &Self::Target {
61		&self.0
62	}
63}
64
65impl Executor {
66	pub fn new(
67		catalog: Catalog,
68		config: EngineConfig,
69		flow_operator_store: SystemFlowOperatorStore,
70		stats_reader: MetricReader<SingleStore>,
71	) -> Self {
72		Self(Arc::new(Services::new(catalog, config, flow_operator_store, stats_reader)))
73	}
74
75	/// Get a reference to the underlying Services
76	pub fn services(&self) -> &Arc<Services> {
77		&self.0
78	}
79
80	/// Construct an Executor from an existing `Arc<Services>`.
81	pub fn from_services(services: Arc<Services>) -> Self {
82		Self(services)
83	}
84
85	#[allow(dead_code)]
86	pub fn testing() -> Self {
87		Self(Services::testing())
88	}
89
90	/// If the error is a REMOTE_001 and we have a RemoteRegistry, forward the query.
91	/// Returns `Ok(Some(frames))` if forwarded, `Ok(None)` if not a remote query.
92	#[cfg(not(reifydb_single_threaded))]
93	fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
94		if let Some(ref registry) = self.0.remote_registry
95			&& remote::is_remote_query(err)
96			&& let Some(address) = remote::extract_remote_address(err)
97		{
98			let token = remote::extract_remote_token(err);
99			return registry.forward_query(&address, rql, params, token.as_deref()).map(Some);
100		}
101		Ok(None)
102	}
103}
104
105impl RqlExecutor for Executor {
106	fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
107		Executor::rql(self, tx, rql, params)
108	}
109}
110
111/// Populate a stack with parameters so they can be accessed as variables.
112fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
113	match params {
114		Params::Positional(values) => {
115			for (index, value) in values.iter().enumerate() {
116				let param_name = (index + 1).to_string();
117				symbols.set(param_name, Variable::scalar(value.clone()), false)?;
118			}
119		}
120		Params::Named(map) => {
121			for (name, value) in map.iter() {
122				symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
123			}
124		}
125		Params::None => {}
126	}
127	Ok(())
128}
129
130/// Populate the `$identity` variable in the symbol table so policy bodies
131/// (and user RQL) can reference `$identity.id`, `$identity.name`, and `$identity.roles`.
132fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
133	let identity = tx.identity();
134	if identity.is_privileged() {
135		return Ok(());
136	}
137	if identity.is_anonymous() {
138		let columns = Columns::single_row([
139			("id", Value::IdentityId(identity)),
140			("name", Value::none_of(Type::Utf8)),
141			("roles", Value::List(vec![])),
142		]);
143		symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
144		return Ok(());
145	}
146	if let Some(user) = catalog.find_identity(tx, identity)? {
147		let roles = catalog.find_role_names_for_identity(tx, identity)?;
148		let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
149		let columns = Columns::single_row([
150			("id", Value::IdentityId(identity)),
151			("name", Value::Utf8(user.name)),
152			("roles", Value::List(role_values)),
153		]);
154		symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
155	}
156	Ok(())
157}
158
159/// Execute a list of compiled units, tracking output frames separately.
160type CompiledUnitsResult = (Vec<Frame>, Vec<Frame>, SymbolTable, Vec<StatementMetric>);
161
162/// Error from `execute_compiled_units` that preserves partial metrics.
163struct ExecutionFailure {
164	error: Error,
165	partial_metrics: Vec<StatementMetric>,
166}
167
168/// Build `ExecutionMetrics` from a list of statement metrics.
169fn build_metrics(statements: Vec<StatementMetric>) -> ExecutionMetrics {
170	let fps: Vec<_> = statements.iter().map(|m| m.fingerprint).collect();
171	ExecutionMetrics {
172		request_fingerprint: fingerprint_request(&fps),
173		statements,
174	}
175}
176
177/// Returns (output_results, last_result, final_symbols, metrics).
178fn execute_compiled_units(
179	services: &Arc<Services>,
180	tx: &mut Transaction<'_>,
181	compiled_list: &[Compiled],
182	params: &Params,
183	mut symbols: SymbolTable,
184	compile_duration: Duration,
185) -> StdResult<CompiledUnitsResult, ExecutionFailure> {
186	let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
187	let mut result = vec![];
188	let mut output_results: Vec<Frame> = Vec::new();
189	let mut metrics = Vec::new();
190
191	for compiled in compiled_list.iter() {
192		result.clear();
193		let mut vm = Vm::new(symbols);
194		let start = services.runtime_context.clock.instant();
195		let run_result = vm.run(services, tx, &compiled.instructions, params, &mut result);
196		let execute_duration = start.elapsed();
197		symbols = vm.symbols;
198
199		metrics.push(StatementMetric {
200			fingerprint: compiled.fingerprint,
201			normalized_rql: compiled.normalized_rql.clone(),
202			compile_duration_us,
203			execute_duration_us: execute_duration.as_micros() as u64,
204			rows_affected: if run_result.is_ok() {
205				extract_rows_affected(&result)
206			} else {
207				0
208			},
209		});
210
211		if let Err(error) = run_result {
212			return Err(ExecutionFailure {
213				error,
214				partial_metrics: metrics,
215			});
216		}
217
218		if compiled.is_output {
219			output_results.append(&mut result);
220		}
221	}
222
223	Ok((output_results, result, symbols, metrics))
224}
225
226/// Merge output_results and remaining results into the final result.
227fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
228	output_results.append(&mut remaining);
229	output_results
230}
231
232/// Extract the actual rows-affected count from a DML result.
233///
234/// DML handlers (INSERT/UPDATE/DELETE) emit a single summary frame with a
235/// column named "inserted", "updated", or "deleted" containing the count as
236/// a `Uint8` value. When that pattern is detected, return the real count.
237/// Otherwise fall back to the number of frames (correct for SELECT, DDL, etc.).
238fn extract_rows_affected(result: &[Frame]) -> u64 {
239	if result.len() == 1 {
240		let frame = &result[0];
241		for col in &frame.columns {
242			match col.name.as_str() {
243				"inserted" | "updated" | "deleted" => {
244					if col.data.len() == 1
245						&& let Value::Uint8(n) = col.data.get_value(0)
246					{
247						return n;
248					}
249				}
250				_ => {}
251			}
252		}
253	}
254	result.len() as u64
255}
256
257impl Executor {
258	/// Shared setup: create symbols and populate with params + identity.
259	fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
260		let mut symbols = SymbolTable::new();
261		populate_symbols(&mut symbols, params)?;
262		populate_identity(&mut symbols, &self.catalog, tx)?;
263		Ok(symbols)
264	}
265
266	/// Execute RQL against an existing open transaction.
267	///
268	/// This is the universal RQL execution interface: it compiles and runs
269	/// arbitrary RQL within whatever transaction variant the caller provides.
270	#[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
271	pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
272		let mut symbols = match self.setup_symbols(&params, tx) {
273			Ok(s) => s,
274			Err(e) => {
275				return ExecutionResult {
276					frames: vec![],
277					error: Some(e),
278					metrics: ExecutionMetrics::default(),
279				};
280			}
281		};
282
283		let start_compile = self.0.runtime_context.clock.instant();
284		let compiled_list = match self.compiler.compile_with_policy(tx, rql, inject_read_policies) {
285			Ok(CompilationResult::Ready(compiled)) => compiled,
286			Ok(CompilationResult::Incremental(_)) => {
287				unreachable!("incremental compilation not supported in rql()")
288			}
289			Err(err) => {
290				#[cfg(not(reifydb_single_threaded))]
291				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
292					return ExecutionResult {
293						frames,
294						error: None,
295						metrics: ExecutionMetrics::default(),
296					};
297				}
298				return ExecutionResult {
299					frames: vec![],
300					error: Some(err),
301					metrics: ExecutionMetrics::default(),
302				};
303			}
304		};
305		let compile_duration = start_compile.elapsed();
306		let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
307
308		let mut result = vec![];
309		let mut metrics = Vec::new();
310		for compiled in compiled_list.iter() {
311			result.clear();
312			let mut vm = Vm::new(symbols);
313			let start_execute = self.0.runtime_context.clock.instant();
314			let run_result = vm.run(&self.0, tx, &compiled.instructions, &params, &mut result);
315			let execute_duration = start_execute.elapsed();
316			symbols = vm.symbols;
317
318			metrics.push(StatementMetric {
319				fingerprint: compiled.fingerprint,
320				normalized_rql: compiled.normalized_rql.clone(),
321				compile_duration_us,
322				execute_duration_us: execute_duration.as_micros() as u64,
323				rows_affected: if run_result.is_ok() {
324					extract_rows_affected(&result)
325				} else {
326					0
327				},
328			});
329
330			if let Err(e) = run_result {
331				return ExecutionResult {
332					frames: vec![],
333					error: Some(e),
334					metrics: build_metrics(metrics),
335				};
336			}
337		}
338
339		ExecutionResult {
340			frames: result,
341			error: None,
342			metrics: build_metrics(metrics),
343		}
344	}
345
346	#[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
347	pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> ExecutionResult {
348		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn)) {
349			Ok(s) => s,
350			Err(e) => {
351				return ExecutionResult {
352					frames: vec![],
353					error: Some(e),
354					metrics: ExecutionMetrics::default(),
355				};
356			}
357		};
358
359		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
360			&mut Transaction::Admin(&mut *txn),
361			"admin",
362			true,
363		) {
364			return ExecutionResult {
365				frames: vec![],
366				error: Some(e),
367				metrics: ExecutionMetrics::default(),
368			};
369		}
370
371		let start_compile = self.0.runtime_context.clock.instant();
372		match self.compiler.compile_with_policy(&mut Transaction::Admin(txn), cmd.rql, inject_read_policies) {
373			Err(err) => {
374				#[cfg(not(reifydb_single_threaded))]
375				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
376					return ExecutionResult {
377						frames,
378						error: None,
379						metrics: ExecutionMetrics::default(),
380					};
381				}
382				ExecutionResult {
383					frames: vec![],
384					error: Some(err),
385					metrics: ExecutionMetrics::default(),
386				}
387			}
388			Ok(CompilationResult::Ready(compiled)) => {
389				let compile_duration = start_compile.elapsed();
390				match execute_compiled_units(
391					&self.0,
392					&mut Transaction::Admin(txn),
393					&compiled,
394					&cmd.params,
395					symbols,
396					compile_duration,
397				) {
398					Ok((output, remaining, _, metrics)) => ExecutionResult {
399						frames: merge_results(output, remaining),
400						error: None,
401						metrics: build_metrics(metrics),
402					},
403					Err(f) => ExecutionResult {
404						frames: vec![],
405						error: Some(f.error),
406						metrics: build_metrics(f.partial_metrics),
407					},
408				}
409			}
410			Ok(CompilationResult::Incremental(mut state)) => {
411				let policy = constrain_policy(|plans, bump, cat, tx| {
412					inject_read_policies(plans, bump, cat, tx)
413				});
414				let mut result = vec![];
415				let mut output_results: Vec<Frame> = Vec::new();
416				let mut symbols = symbols;
417				let mut metrics = Vec::new();
418				loop {
419					let start_incr = self.0.runtime_context.clock.instant();
420					let next = match self.compiler.compile_next_with_policy(
421						&mut Transaction::Admin(txn),
422						&mut state,
423						&policy,
424					) {
425						Ok(n) => n,
426						Err(e) => {
427							return ExecutionResult {
428								frames: vec![],
429								error: Some(e),
430								metrics: build_metrics(metrics),
431							};
432						}
433					};
434					let compile_duration = start_incr.elapsed();
435
436					let Some(compiled) = next else {
437						break;
438					};
439
440					result.clear();
441					let mut tx = Transaction::Admin(txn);
442					let mut vm = Vm::new(symbols);
443					let start_execute = self.0.runtime_context.clock.instant();
444					let run_result = vm.run(
445						&self.0,
446						&mut tx,
447						&compiled.instructions,
448						&cmd.params,
449						&mut result,
450					);
451					let execute_duration = start_execute.elapsed();
452					symbols = vm.symbols;
453
454					metrics.push(StatementMetric {
455						fingerprint: compiled.fingerprint,
456						normalized_rql: compiled.normalized_rql,
457						compile_duration_us: compile_duration.as_micros() as u64,
458						execute_duration_us: execute_duration.as_micros() as u64,
459						rows_affected: if run_result.is_ok() {
460							extract_rows_affected(&result)
461						} else {
462							0
463						},
464					});
465
466					if let Err(e) = run_result {
467						return ExecutionResult {
468							frames: vec![],
469							error: Some(e),
470							metrics: build_metrics(metrics),
471						};
472					}
473
474					if compiled.is_output {
475						output_results.append(&mut result);
476					}
477				}
478				ExecutionResult {
479					frames: merge_results(output_results, result),
480					error: None,
481					metrics: build_metrics(metrics),
482				}
483			}
484		}
485	}
486
487	#[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
488	pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> ExecutionResult {
489		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Test(Box::new(txn.reborrow()))) {
490			Ok(s) => s,
491			Err(e) => {
492				return ExecutionResult {
493					frames: vec![],
494					error: Some(e),
495					metrics: ExecutionMetrics::default(),
496				};
497			}
498		};
499
500		let session_type = txn.session_type.clone();
501		let session_default_deny = txn.session_default_deny;
502		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
503			&mut Transaction::Test(Box::new(txn.reborrow())),
504			&session_type,
505			session_default_deny,
506		) {
507			return ExecutionResult {
508				frames: vec![],
509				error: Some(e),
510				metrics: ExecutionMetrics::default(),
511			};
512		}
513
514		let start_compile = self.0.runtime_context.clock.instant();
515		match self.compiler.compile_with_policy(
516			&mut Transaction::Test(Box::new(txn.reborrow())),
517			cmd.rql,
518			inject_read_policies,
519		) {
520			Err(err) => {
521				#[cfg(not(reifydb_single_threaded))]
522				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
523					return ExecutionResult {
524						frames,
525						error: None,
526						metrics: ExecutionMetrics::default(),
527					};
528				}
529				ExecutionResult {
530					frames: vec![],
531					error: Some(err),
532					metrics: ExecutionMetrics::default(),
533				}
534			}
535			Ok(CompilationResult::Ready(compiled)) => {
536				let compile_duration = start_compile.elapsed();
537				match execute_compiled_units(
538					&self.0,
539					&mut Transaction::Test(Box::new(txn.reborrow())),
540					&compiled,
541					&cmd.params,
542					symbols,
543					compile_duration,
544				) {
545					Ok((output, remaining, _, metrics)) => ExecutionResult {
546						frames: merge_results(output, remaining),
547						error: None,
548						metrics: build_metrics(metrics),
549					},
550					Err(f) => ExecutionResult {
551						frames: vec![],
552						error: Some(f.error),
553						metrics: build_metrics(f.partial_metrics),
554					},
555				}
556			}
557			Ok(CompilationResult::Incremental(mut state)) => {
558				let policy = constrain_policy(|plans, bump, cat, tx| {
559					inject_read_policies(plans, bump, cat, tx)
560				});
561				let mut result = vec![];
562				let mut output_results: Vec<Frame> = Vec::new();
563				let mut symbols = symbols;
564				let mut metrics = Vec::new();
565				loop {
566					let start_incr = self.0.runtime_context.clock.instant();
567					let next = match self.compiler.compile_next_with_policy(
568						&mut Transaction::Test(Box::new(txn.reborrow())),
569						&mut state,
570						&policy,
571					) {
572						Ok(n) => n,
573						Err(e) => {
574							return ExecutionResult {
575								frames: vec![],
576								error: Some(e),
577								metrics: build_metrics(metrics),
578							};
579						}
580					};
581					let compile_duration = start_incr.elapsed();
582
583					let Some(compiled) = next else {
584						break;
585					};
586
587					result.clear();
588					let mut tx = Transaction::Test(Box::new(txn.reborrow()));
589					let mut vm = Vm::new(symbols);
590					let start_execute = self.0.runtime_context.clock.instant();
591					let run_result = vm.run(
592						&self.0,
593						&mut tx,
594						&compiled.instructions,
595						&cmd.params,
596						&mut result,
597					);
598					let execute_duration = start_execute.elapsed();
599					symbols = vm.symbols;
600
601					metrics.push(StatementMetric {
602						fingerprint: compiled.fingerprint,
603						normalized_rql: compiled.normalized_rql,
604						compile_duration_us: compile_duration.as_micros() as u64,
605						execute_duration_us: execute_duration.as_micros() as u64,
606						rows_affected: if run_result.is_ok() {
607							extract_rows_affected(&result)
608						} else {
609							0
610						},
611					});
612
613					if let Err(e) = run_result {
614						return ExecutionResult {
615							frames: vec![],
616							error: Some(e),
617							metrics: build_metrics(metrics),
618						};
619					}
620
621					if compiled.is_output {
622						output_results.append(&mut result);
623					}
624				}
625				ExecutionResult {
626					frames: merge_results(output_results, result),
627					error: None,
628					metrics: build_metrics(metrics),
629				}
630			}
631		}
632	}
633
634	#[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
635	pub fn subscription(&self, txn: &mut QueryTransaction, cmd: Subscription<'_>) -> ExecutionResult {
636		// Pre-compilation validation: parse and check statement constraints
637		let bump = Bump::new();
638		let statements = match parse_str(&bump, cmd.rql) {
639			Ok(s) => s,
640			Err(e) => {
641				return ExecutionResult {
642					frames: vec![],
643					error: Some(e),
644					metrics: ExecutionMetrics::default(),
645				};
646			}
647		};
648
649		if statements.len() != 1 {
650			return ExecutionResult {
651				frames: vec![],
652				error: Some(Error(Box::new(subscription::single_statement_required(
653					"Subscription endpoint requires exactly one statement",
654				)))),
655				metrics: ExecutionMetrics::default(),
656			};
657		}
658
659		let statement = &statements[0];
660		if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
661			return ExecutionResult {
662				frames: vec![],
663				error: Some(Error(Box::new(subscription::invalid_statement(
664					"Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
665				)))),
666				metrics: ExecutionMetrics::default(),
667			};
668		}
669
670		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Query(&mut *txn)) {
671			Ok(s) => s,
672			Err(e) => {
673				return ExecutionResult {
674					frames: vec![],
675					error: Some(e),
676					metrics: ExecutionMetrics::default(),
677				};
678			}
679		};
680
681		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
682			&mut Transaction::Query(&mut *txn),
683			"subscription",
684			true,
685		) {
686			return ExecutionResult {
687				frames: vec![],
688				error: Some(e),
689				metrics: ExecutionMetrics::default(),
690			};
691		}
692
693		let start_compile = self.0.runtime_context.clock.instant();
694		let compiled = match self.compiler.compile_with_policy(
695			&mut Transaction::Query(txn),
696			cmd.rql,
697			inject_read_policies,
698		) {
699			Ok(CompilationResult::Ready(compiled)) => compiled,
700			Ok(CompilationResult::Incremental(_)) => {
701				unreachable!("Single subscription statement should not require incremental compilation")
702			}
703			Err(err) => {
704				return ExecutionResult {
705					frames: vec![],
706					error: Some(err),
707					metrics: ExecutionMetrics::default(),
708				};
709			}
710		};
711		let compile_duration = start_compile.elapsed();
712
713		match execute_compiled_units(
714			&self.0,
715			&mut Transaction::Query(txn),
716			&compiled,
717			&cmd.params,
718			symbols,
719			compile_duration,
720		) {
721			Ok((output, remaining, _, metrics)) => ExecutionResult {
722				frames: merge_results(output, remaining),
723				error: None,
724				metrics: build_metrics(metrics),
725			},
726			Err(f) => ExecutionResult {
727				frames: vec![],
728				error: Some(f.error),
729				metrics: build_metrics(f.partial_metrics),
730			},
731		}
732	}
733
734	#[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
735	pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> ExecutionResult {
736		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn)) {
737			Ok(s) => s,
738			Err(e) => {
739				return ExecutionResult {
740					frames: vec![],
741					error: Some(e),
742					metrics: ExecutionMetrics::default(),
743				};
744			}
745		};
746
747		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
748			&mut Transaction::Command(&mut *txn),
749			"command",
750			false,
751		) {
752			return ExecutionResult {
753				frames: vec![],
754				error: Some(e),
755				metrics: ExecutionMetrics::default(),
756			};
757		}
758
759		let start_compile = self.0.runtime_context.clock.instant();
760		let compiled = match self.compiler.compile_with_policy(
761			&mut Transaction::Command(txn),
762			cmd.rql,
763			inject_read_policies,
764		) {
765			Ok(CompilationResult::Ready(compiled)) => compiled,
766			Ok(CompilationResult::Incremental(_)) => {
767				unreachable!("DDL statements require admin transactions, not command transactions")
768			}
769			Err(err) => {
770				#[cfg(not(reifydb_single_threaded))]
771				if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
772					return ExecutionResult {
773						frames: vec![],
774						error: Some(Error(Box::new(Diagnostic {
775							code: "REMOTE_002".to_string(),
776							message: "Write operations on remote namespaces are not supported"
777								.to_string(),
778							help: Some("Use the remote instance directly for write operations"
779								.to_string()),
780							..Default::default()
781						}))),
782						metrics: ExecutionMetrics::default(),
783					};
784				}
785				return ExecutionResult {
786					frames: vec![],
787					error: Some(err),
788					metrics: ExecutionMetrics::default(),
789				};
790			}
791		};
792		let compile_duration = start_compile.elapsed();
793
794		match execute_compiled_units(
795			&self.0,
796			&mut Transaction::Command(txn),
797			&compiled,
798			&cmd.params,
799			symbols,
800			compile_duration,
801		) {
802			Ok((output, remaining, _, metrics)) => ExecutionResult {
803				frames: merge_results(output, remaining),
804				error: None,
805				metrics: build_metrics(metrics),
806			},
807			Err(f) => ExecutionResult {
808				frames: vec![],
809				error: Some(f.error),
810				metrics: build_metrics(f.partial_metrics),
811			},
812		}
813	}
814
815	/// Call a procedure by fully-qualified name (e.g., "banking.transfer_funds").
816	#[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
817	pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> ExecutionResult {
818		let rql = format!("CALL {}()", name);
819		let symbols = match self.setup_symbols(params, &mut Transaction::Command(&mut *txn)) {
820			Ok(s) => s,
821			Err(e) => {
822				return ExecutionResult {
823					frames: vec![],
824					error: Some(e),
825					metrics: ExecutionMetrics::default(),
826				};
827			}
828		};
829
830		let start_compile = self.0.runtime_context.clock.instant();
831		let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql) {
832			Ok(CompilationResult::Ready(compiled)) => compiled,
833			Ok(CompilationResult::Incremental(_)) => {
834				unreachable!("CALL statements should not require incremental compilation")
835			}
836			Err(e) => {
837				return ExecutionResult {
838					frames: vec![],
839					error: Some(e),
840					metrics: ExecutionMetrics::default(),
841				};
842			}
843		};
844		let compile_duration = start_compile.elapsed();
845		let compile_duration_us = compile_duration.as_micros() as u64 / compiled.len().max(1) as u64;
846
847		let mut result = vec![];
848		let mut metrics = Vec::new();
849		let mut symbols = symbols;
850		for compiled in compiled.iter() {
851			result.clear();
852			let mut tx = Transaction::Command(txn);
853			let mut vm = Vm::new(symbols);
854			let start_execute = self.0.runtime_context.clock.instant();
855			let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result);
856			let execute_duration = start_execute.elapsed();
857			symbols = vm.symbols;
858
859			metrics.push(StatementMetric {
860				fingerprint: compiled.fingerprint,
861				normalized_rql: compiled.normalized_rql.clone(),
862				compile_duration_us,
863				execute_duration_us: execute_duration.as_micros() as u64,
864				rows_affected: if run_result.is_ok() {
865					extract_rows_affected(&result)
866				} else {
867					0
868				},
869			});
870
871			if let Err(e) = run_result {
872				return ExecutionResult {
873					frames: vec![],
874					error: Some(e),
875					metrics: build_metrics(metrics),
876				};
877			}
878		}
879
880		ExecutionResult {
881			frames: result,
882			error: None,
883			metrics: build_metrics(metrics),
884		}
885	}
886
887	#[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
888	pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> ExecutionResult {
889		let symbols = match self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn)) {
890			Ok(s) => s,
891			Err(e) => {
892				return ExecutionResult {
893					frames: vec![],
894					error: Some(e),
895					metrics: ExecutionMetrics::default(),
896				};
897			}
898		};
899
900		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
901			&mut Transaction::Query(&mut *txn),
902			"query",
903			false,
904		) {
905			return ExecutionResult {
906				frames: vec![],
907				error: Some(e),
908				metrics: ExecutionMetrics::default(),
909			};
910		}
911
912		let start_compile = self.0.runtime_context.clock.instant();
913		let compiled = match self.compiler.compile_with_policy(
914			&mut Transaction::Query(txn),
915			qry.rql,
916			inject_read_policies,
917		) {
918			Ok(CompilationResult::Ready(compiled)) => compiled,
919			Ok(CompilationResult::Incremental(_)) => {
920				unreachable!("DDL statements require admin transactions, not query transactions")
921			}
922			Err(err) => {
923				#[cfg(not(reifydb_single_threaded))]
924				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, qry.rql, qry.params) {
925					return ExecutionResult {
926						frames,
927						error: None,
928						metrics: ExecutionMetrics::default(),
929					};
930				}
931				return ExecutionResult {
932					frames: vec![],
933					error: Some(err),
934					metrics: ExecutionMetrics::default(),
935				};
936			}
937		};
938		let compile_duration = start_compile.elapsed();
939
940		let exec_result = execute_compiled_units(
941			&self.0,
942			&mut Transaction::Query(txn),
943			&compiled,
944			&qry.params,
945			symbols,
946			compile_duration,
947		);
948
949		match exec_result {
950			Ok((output, remaining, _, metrics)) => ExecutionResult {
951				frames: merge_results(output, remaining),
952				error: None,
953				metrics: build_metrics(metrics),
954			},
955			Err(f) => ExecutionResult {
956				frames: vec![],
957				error: Some(f.error),
958				metrics: build_metrics(f.partial_metrics),
959			},
960		}
961	}
962}