Skip to main content

reifydb_engine/vm/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, result::Result as StdResult, sync::Arc, time::Duration};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{
9	error::diagnostic::subscription,
10	execution::ExecutionResult,
11	interface::catalog::policy::SessionOp,
12	metric::{ExecutionMetrics, StatementMetric},
13	value::column::columns::Columns,
14};
15use reifydb_metric::storage::metric::MetricReader;
16use reifydb_policy::inject_from_policies;
17use reifydb_rql::{
18	ast::parse_str,
19	compiler::{CompilationResult, Compiled, constrain_policy},
20	fingerprint::request::fingerprint_request,
21};
22use reifydb_store_single::SingleStore;
23use reifydb_transaction::transaction::{
24	RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
25	query::QueryTransaction,
26};
27#[cfg(not(reifydb_single_threaded))]
28use reifydb_type::error::Diagnostic;
29use reifydb_type::{
30	error::Error,
31	params::Params,
32	value::{Value, frame::frame::Frame, r#type::Type},
33};
34use tracing::instrument;
35
36#[cfg(not(reifydb_single_threaded))]
37use crate::remote;
38use crate::{
39	Result,
40	policy::PolicyEvaluator,
41	vm::{
42		Admin, Command, Query, Subscription, Test,
43		services::{EngineConfig, Services},
44		stack::{SymbolTable, Variable},
45		vm::Vm,
46	},
47};
48
49/// Executor is the orchestration layer for RQL statement execution.
50pub struct Executor(Arc<Services>);
51
52impl Clone for Executor {
53	fn clone(&self) -> Self {
54		Self(self.0.clone())
55	}
56}
57
58impl Deref for Executor {
59	type Target = Services;
60
61	fn deref(&self) -> &Self::Target {
62		&self.0
63	}
64}
65
66impl Executor {
67	pub fn new(
68		catalog: Catalog,
69		config: EngineConfig,
70		flow_operator_store: SystemFlowOperatorStore,
71		stats_reader: MetricReader<SingleStore>,
72	) -> Self {
73		Self(Arc::new(Services::new(catalog, config, flow_operator_store, stats_reader)))
74	}
75
76	/// Get a reference to the underlying Services
77	pub fn services(&self) -> &Arc<Services> {
78		&self.0
79	}
80
81	/// Construct an Executor from an existing `Arc<Services>`.
82	pub fn from_services(services: Arc<Services>) -> Self {
83		Self(services)
84	}
85
86	#[allow(dead_code)]
87	pub fn testing() -> Self {
88		Self(Services::testing())
89	}
90
91	/// If the error is a REMOTE_001 and we have a RemoteRegistry, forward the query.
92	/// Returns `Ok(Some(frames))` if forwarded, `Ok(None)` if not a remote query.
93	#[cfg(not(reifydb_single_threaded))]
94	fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
95		if let Some(ref registry) = self.0.remote_registry
96			&& remote::is_remote_query(err)
97			&& let Some(address) = remote::extract_remote_address(err)
98		{
99			let token = remote::extract_remote_token(err);
100			return registry.forward_query(&address, rql, params, token.as_deref()).map(Some);
101		}
102		Ok(None)
103	}
104}
105
106impl RqlExecutor for Executor {
107	fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
108		Executor::rql(self, tx, rql, params)
109	}
110}
111
112/// Populate a stack with parameters so they can be accessed as variables.
113fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
114	match params {
115		Params::Positional(values) => {
116			for (index, value) in values.iter().enumerate() {
117				let param_name = (index + 1).to_string();
118				symbols.set(param_name, Variable::scalar(value.clone()), false)?;
119			}
120		}
121		Params::Named(map) => {
122			for (name, value) in map.iter() {
123				symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
124			}
125		}
126		Params::None => {}
127	}
128	Ok(())
129}
130
131/// Populate the `$identity` variable in the symbol table so policy bodies
132/// (and user RQL) can reference `$identity.id`, `$identity.name`, and `$identity.roles`.
133fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
134	let identity = tx.identity();
135	if identity.is_privileged() {
136		return Ok(());
137	}
138	if identity.is_anonymous() {
139		let columns = Columns::single_row([
140			("id", Value::IdentityId(identity)),
141			("name", Value::none_of(Type::Utf8)),
142			("roles", Value::List(vec![])),
143		]);
144		symbols.set("identity".to_string(), Variable::columns(columns), false)?;
145		return Ok(());
146	}
147	if let Some(user) = catalog.find_identity(tx, identity)? {
148		let roles = catalog.find_role_names_for_identity(tx, identity)?;
149		let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
150		let columns = Columns::single_row([
151			("id", Value::IdentityId(identity)),
152			("name", Value::Utf8(user.name)),
153			("roles", Value::List(role_values)),
154		]);
155		symbols.set("identity".to_string(), Variable::columns(columns), false)?;
156	}
157	Ok(())
158}
159
160/// Execute a list of compiled units, tracking output frames separately.
161type CompiledUnitsResult = (Vec<Frame>, Vec<Frame>, SymbolTable, Vec<StatementMetric>);
162
163/// Error from `execute_compiled_units` that preserves partial metrics.
164struct ExecutionFailure {
165	error: Error,
166	partial_metrics: Vec<StatementMetric>,
167}
168
169/// Build `ExecutionMetrics` from a list of statement metrics.
170fn build_metrics(statements: Vec<StatementMetric>) -> ExecutionMetrics {
171	let fps: Vec<_> = statements.iter().map(|m| m.fingerprint).collect();
172	ExecutionMetrics {
173		fingerprint: fingerprint_request(&fps),
174		statements,
175		..Default::default()
176	}
177}
178
179/// Returns (output_results, last_result, final_symbols, metrics).
180fn execute_compiled_units(
181	services: &Arc<Services>,
182	tx: &mut Transaction<'_>,
183	compiled_list: &[Compiled],
184	params: &Params,
185	mut symbols: SymbolTable,
186	compile_duration: Duration,
187) -> StdResult<CompiledUnitsResult, ExecutionFailure> {
188	let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
189	let mut result = vec![];
190	let mut output_results: Vec<Frame> = Vec::new();
191	let mut metrics = Vec::new();
192
193	for compiled in compiled_list.iter() {
194		result.clear();
195		let mut vm = Vm::from_services(symbols, services, params, tx.identity());
196		let start = services.runtime_context.clock.instant();
197		let run_result = vm.run(services, tx, &compiled.instructions, &mut result);
198		let execute_duration = start.elapsed();
199		symbols = vm.symbols;
200
201		metrics.push(StatementMetric {
202			fingerprint: compiled.fingerprint,
203			normalized_rql: compiled.normalized_rql.clone(),
204			compile_duration_us,
205			execute_duration_us: execute_duration.as_micros() as u64,
206			rows_affected: if run_result.is_ok() {
207				extract_rows_affected(&result)
208			} else {
209				0
210			},
211		});
212
213		if let Err(error) = run_result {
214			return Err(ExecutionFailure {
215				error,
216				partial_metrics: metrics,
217			});
218		}
219
220		if compiled.is_output {
221			output_results.append(&mut result);
222		}
223	}
224
225	Ok((output_results, result, symbols, metrics))
226}
227
228/// Merge output_results and remaining results into the final result.
229fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
230	output_results.append(&mut remaining);
231	output_results
232}
233
234/// Extract the actual rows-affected count from a DML result.
235///
236/// DML handlers (INSERT/UPDATE/DELETE) emit a single summary frame with a
237/// column named "inserted", "updated", or "deleted" containing the count as
238/// a `Uint8` value. When that pattern is detected, return the real count.
239/// Otherwise fall back to the number of frames (correct for SELECT, DDL, etc.).
240fn extract_rows_affected(result: &[Frame]) -> u64 {
241	if result.len() == 1 {
242		let frame = &result[0];
243		for col in &frame.columns {
244			match col.name.as_str() {
245				"inserted" | "updated" | "deleted" => {
246					if col.data.len() == 1
247						&& let Value::Uint8(n) = col.data.get_value(0)
248					{
249						return n;
250					}
251				}
252				_ => {}
253			}
254		}
255	}
256	result.len() as u64
257}
258
259impl Executor {
260	/// Shared setup: create symbols and populate with params + identity.
261	fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
262		let mut symbols = SymbolTable::new();
263		populate_symbols(&mut symbols, params)?;
264		populate_identity(&mut symbols, &self.catalog, tx)?;
265		Ok(symbols)
266	}
267
268	/// Execute RQL against an existing open transaction.
269	///
270	/// This is the universal RQL execution interface: it compiles and runs
271	/// arbitrary RQL within whatever transaction variant the caller provides.
272	#[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
273	pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
274		let mut symbols = match self.setup_symbols(&params, tx) {
275			Ok(s) => s,
276			Err(e) => {
277				return ExecutionResult {
278					frames: vec![],
279					error: Some(e),
280					metrics: ExecutionMetrics::default(),
281				};
282			}
283		};
284
285		let start_compile = self.0.runtime_context.clock.instant();
286		let compiled_list = match self.compiler.compile_with_policy(tx, rql, inject_from_policies) {
287			Ok(CompilationResult::Ready(compiled)) => compiled,
288			Ok(CompilationResult::Incremental(_)) => {
289				unreachable!("incremental compilation not supported in rql()")
290			}
291			Err(err) => {
292				#[cfg(not(reifydb_single_threaded))]
293				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
294					return ExecutionResult {
295						frames,
296						error: None,
297						metrics: ExecutionMetrics::default(),
298					};
299				}
300				return ExecutionResult {
301					frames: vec![],
302					error: Some(err),
303					metrics: ExecutionMetrics::default(),
304				};
305			}
306		};
307		let compile_duration = start_compile.elapsed();
308		let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
309
310		let mut result = vec![];
311		let mut metrics = Vec::new();
312		for compiled in compiled_list.iter() {
313			result.clear();
314			let mut vm = Vm::from_services(symbols, &self.0, &params, tx.identity());
315			let start_execute = self.0.runtime_context.clock.instant();
316			let run_result = vm.run(&self.0, tx, &compiled.instructions, &mut result);
317			let execute_duration = start_execute.elapsed();
318			symbols = vm.symbols;
319
320			metrics.push(StatementMetric {
321				fingerprint: compiled.fingerprint,
322				normalized_rql: compiled.normalized_rql.clone(),
323				compile_duration_us,
324				execute_duration_us: execute_duration.as_micros() as u64,
325				rows_affected: if run_result.is_ok() {
326					extract_rows_affected(&result)
327				} else {
328					0
329				},
330			});
331
332			if let Err(e) = run_result {
333				return ExecutionResult {
334					frames: vec![],
335					error: Some(e),
336					metrics: build_metrics(metrics),
337				};
338			}
339		}
340
341		ExecutionResult {
342			frames: result,
343			error: None,
344			metrics: build_metrics(metrics),
345		}
346	}
347
348	#[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
349	pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> ExecutionResult {
350		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn)) {
351			Ok(s) => s,
352			Err(e) => {
353				return ExecutionResult {
354					frames: vec![],
355					error: Some(e),
356					metrics: ExecutionMetrics::default(),
357				};
358			}
359		};
360
361		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
362			&mut Transaction::Admin(&mut *txn),
363			SessionOp::Admin,
364			true,
365		) {
366			return ExecutionResult {
367				frames: vec![],
368				error: Some(e),
369				metrics: ExecutionMetrics::default(),
370			};
371		}
372
373		let start_compile = self.0.runtime_context.clock.instant();
374		match self.compiler.compile_with_policy(&mut Transaction::Admin(txn), cmd.rql, inject_from_policies) {
375			Err(err) => {
376				#[cfg(not(reifydb_single_threaded))]
377				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
378					return ExecutionResult {
379						frames,
380						error: None,
381						metrics: ExecutionMetrics::default(),
382					};
383				}
384				ExecutionResult {
385					frames: vec![],
386					error: Some(err),
387					metrics: ExecutionMetrics::default(),
388				}
389			}
390			Ok(CompilationResult::Ready(compiled)) => {
391				let compile_duration = start_compile.elapsed();
392				match execute_compiled_units(
393					&self.0,
394					&mut Transaction::Admin(txn),
395					&compiled,
396					&cmd.params,
397					symbols,
398					compile_duration,
399				) {
400					Ok((output, remaining, _, metrics)) => ExecutionResult {
401						frames: merge_results(output, remaining),
402						error: None,
403						metrics: build_metrics(metrics),
404					},
405					Err(f) => ExecutionResult {
406						frames: vec![],
407						error: Some(f.error),
408						metrics: build_metrics(f.partial_metrics),
409					},
410				}
411			}
412			Ok(CompilationResult::Incremental(mut state)) => {
413				let policy = constrain_policy(|plans, bump, cat, tx| {
414					inject_from_policies(plans, bump, cat, tx)
415				});
416				let mut result = vec![];
417				let mut output_results: Vec<Frame> = Vec::new();
418				let mut symbols = symbols;
419				let mut metrics = Vec::new();
420				loop {
421					let start_incr = self.0.runtime_context.clock.instant();
422					let next = match self.compiler.compile_next_with_policy(
423						&mut Transaction::Admin(txn),
424						&mut state,
425						&policy,
426					) {
427						Ok(n) => n,
428						Err(e) => {
429							return ExecutionResult {
430								frames: vec![],
431								error: Some(e),
432								metrics: build_metrics(metrics),
433							};
434						}
435					};
436					let compile_duration = start_incr.elapsed();
437
438					let Some(compiled) = next else {
439						break;
440					};
441
442					result.clear();
443					let mut tx = Transaction::Admin(txn);
444					let mut vm = Vm::from_services(symbols, &self.0, &cmd.params, tx.identity());
445					let start_execute = self.0.runtime_context.clock.instant();
446					let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
447					let execute_duration = start_execute.elapsed();
448					symbols = vm.symbols;
449
450					metrics.push(StatementMetric {
451						fingerprint: compiled.fingerprint,
452						normalized_rql: compiled.normalized_rql,
453						compile_duration_us: compile_duration.as_micros() as u64,
454						execute_duration_us: execute_duration.as_micros() as u64,
455						rows_affected: if run_result.is_ok() {
456							extract_rows_affected(&result)
457						} else {
458							0
459						},
460					});
461
462					if let Err(e) = run_result {
463						return ExecutionResult {
464							frames: vec![],
465							error: Some(e),
466							metrics: build_metrics(metrics),
467						};
468					}
469
470					if compiled.is_output {
471						output_results.append(&mut result);
472					}
473				}
474				ExecutionResult {
475					frames: merge_results(output_results, result),
476					error: None,
477					metrics: build_metrics(metrics),
478				}
479			}
480		}
481	}
482
483	#[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
484	pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> ExecutionResult {
485		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Test(Box::new(txn.reborrow()))) {
486			Ok(s) => s,
487			Err(e) => {
488				return ExecutionResult {
489					frames: vec![],
490					error: Some(e),
491					metrics: ExecutionMetrics::default(),
492				};
493			}
494		};
495
496		let session_type = txn.session_type;
497		let session_default_deny = txn.session_default_deny;
498		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
499			&mut Transaction::Test(Box::new(txn.reborrow())),
500			session_type,
501			session_default_deny,
502		) {
503			return ExecutionResult {
504				frames: vec![],
505				error: Some(e),
506				metrics: ExecutionMetrics::default(),
507			};
508		}
509
510		let start_compile = self.0.runtime_context.clock.instant();
511		match self.compiler.compile_with_policy(
512			&mut Transaction::Test(Box::new(txn.reborrow())),
513			cmd.rql,
514			inject_from_policies,
515		) {
516			Err(err) => {
517				#[cfg(not(reifydb_single_threaded))]
518				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
519					return ExecutionResult {
520						frames,
521						error: None,
522						metrics: ExecutionMetrics::default(),
523					};
524				}
525				ExecutionResult {
526					frames: vec![],
527					error: Some(err),
528					metrics: ExecutionMetrics::default(),
529				}
530			}
531			Ok(CompilationResult::Ready(compiled)) => {
532				let compile_duration = start_compile.elapsed();
533				match execute_compiled_units(
534					&self.0,
535					&mut Transaction::Test(Box::new(txn.reborrow())),
536					&compiled,
537					&cmd.params,
538					symbols,
539					compile_duration,
540				) {
541					Ok((output, remaining, _, metrics)) => ExecutionResult {
542						frames: merge_results(output, remaining),
543						error: None,
544						metrics: build_metrics(metrics),
545					},
546					Err(f) => ExecutionResult {
547						frames: vec![],
548						error: Some(f.error),
549						metrics: build_metrics(f.partial_metrics),
550					},
551				}
552			}
553			Ok(CompilationResult::Incremental(mut state)) => {
554				let policy = constrain_policy(|plans, bump, cat, tx| {
555					inject_from_policies(plans, bump, cat, tx)
556				});
557				let mut result = vec![];
558				let mut output_results: Vec<Frame> = Vec::new();
559				let mut symbols = symbols;
560				let mut metrics = Vec::new();
561				loop {
562					let start_incr = self.0.runtime_context.clock.instant();
563					let next = match self.compiler.compile_next_with_policy(
564						&mut Transaction::Test(Box::new(txn.reborrow())),
565						&mut state,
566						&policy,
567					) {
568						Ok(n) => n,
569						Err(e) => {
570							return ExecutionResult {
571								frames: vec![],
572								error: Some(e),
573								metrics: build_metrics(metrics),
574							};
575						}
576					};
577					let compile_duration = start_incr.elapsed();
578
579					let Some(compiled) = next else {
580						break;
581					};
582
583					result.clear();
584					let mut tx = Transaction::Test(Box::new(txn.reborrow()));
585					let mut vm = Vm::from_services(symbols, &self.0, &cmd.params, tx.identity());
586					let start_execute = self.0.runtime_context.clock.instant();
587					let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
588					let execute_duration = start_execute.elapsed();
589					symbols = vm.symbols;
590
591					metrics.push(StatementMetric {
592						fingerprint: compiled.fingerprint,
593						normalized_rql: compiled.normalized_rql,
594						compile_duration_us: compile_duration.as_micros() as u64,
595						execute_duration_us: execute_duration.as_micros() as u64,
596						rows_affected: if run_result.is_ok() {
597							extract_rows_affected(&result)
598						} else {
599							0
600						},
601					});
602
603					if let Err(e) = run_result {
604						return ExecutionResult {
605							frames: vec![],
606							error: Some(e),
607							metrics: build_metrics(metrics),
608						};
609					}
610
611					if compiled.is_output {
612						output_results.append(&mut result);
613					}
614				}
615				ExecutionResult {
616					frames: merge_results(output_results, result),
617					error: None,
618					metrics: build_metrics(metrics),
619				}
620			}
621		}
622	}
623
624	#[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
625	pub fn subscription(&self, txn: &mut QueryTransaction, cmd: Subscription<'_>) -> ExecutionResult {
626		// Pre-compilation validation: parse and check statement constraints
627		let bump = Bump::new();
628		let statements = match parse_str(&bump, cmd.rql) {
629			Ok(s) => s,
630			Err(e) => {
631				return ExecutionResult {
632					frames: vec![],
633					error: Some(e),
634					metrics: ExecutionMetrics::default(),
635				};
636			}
637		};
638
639		if statements.len() != 1 {
640			return ExecutionResult {
641				frames: vec![],
642				error: Some(Error(Box::new(subscription::single_statement_required(
643					"Subscription endpoint requires exactly one statement",
644				)))),
645				metrics: ExecutionMetrics::default(),
646			};
647		}
648
649		let statement = &statements[0];
650		if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
651			return ExecutionResult {
652				frames: vec![],
653				error: Some(Error(Box::new(subscription::invalid_statement(
654					"Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
655				)))),
656				metrics: ExecutionMetrics::default(),
657			};
658		}
659
660		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Query(&mut *txn)) {
661			Ok(s) => s,
662			Err(e) => {
663				return ExecutionResult {
664					frames: vec![],
665					error: Some(e),
666					metrics: ExecutionMetrics::default(),
667				};
668			}
669		};
670
671		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
672			&mut Transaction::Query(&mut *txn),
673			SessionOp::Subscription,
674			true,
675		) {
676			return ExecutionResult {
677				frames: vec![],
678				error: Some(e),
679				metrics: ExecutionMetrics::default(),
680			};
681		}
682
683		let start_compile = self.0.runtime_context.clock.instant();
684		let compiled = match self.compiler.compile_with_policy(
685			&mut Transaction::Query(txn),
686			cmd.rql,
687			inject_from_policies,
688		) {
689			Ok(CompilationResult::Ready(compiled)) => compiled,
690			Ok(CompilationResult::Incremental(_)) => {
691				unreachable!("Single subscription statement should not require incremental compilation")
692			}
693			Err(err) => {
694				return ExecutionResult {
695					frames: vec![],
696					error: Some(err),
697					metrics: ExecutionMetrics::default(),
698				};
699			}
700		};
701		let compile_duration = start_compile.elapsed();
702
703		match execute_compiled_units(
704			&self.0,
705			&mut Transaction::Query(txn),
706			&compiled,
707			&cmd.params,
708			symbols,
709			compile_duration,
710		) {
711			Ok((output, remaining, _, metrics)) => ExecutionResult {
712				frames: merge_results(output, remaining),
713				error: None,
714				metrics: build_metrics(metrics),
715			},
716			Err(f) => ExecutionResult {
717				frames: vec![],
718				error: Some(f.error),
719				metrics: build_metrics(f.partial_metrics),
720			},
721		}
722	}
723
724	#[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
725	pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> ExecutionResult {
726		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn)) {
727			Ok(s) => s,
728			Err(e) => {
729				return ExecutionResult {
730					frames: vec![],
731					error: Some(e),
732					metrics: ExecutionMetrics::default(),
733				};
734			}
735		};
736
737		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
738			&mut Transaction::Command(&mut *txn),
739			SessionOp::Command,
740			false,
741		) {
742			return ExecutionResult {
743				frames: vec![],
744				error: Some(e),
745				metrics: ExecutionMetrics::default(),
746			};
747		}
748
749		let start_compile = self.0.runtime_context.clock.instant();
750		let compiled = match self.compiler.compile_with_policy(
751			&mut Transaction::Command(txn),
752			cmd.rql,
753			inject_from_policies,
754		) {
755			Ok(CompilationResult::Ready(compiled)) => compiled,
756			Ok(CompilationResult::Incremental(_)) => {
757				unreachable!("DDL statements require admin transactions, not command transactions")
758			}
759			Err(err) => {
760				#[cfg(not(reifydb_single_threaded))]
761				if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
762					return ExecutionResult {
763						frames: vec![],
764						error: Some(Error(Box::new(Diagnostic {
765							code: "REMOTE_002".to_string(),
766							message: "Write operations on remote namespaces are not supported"
767								.to_string(),
768							help: Some("Use the remote instance directly for write operations"
769								.to_string()),
770							..Default::default()
771						}))),
772						metrics: ExecutionMetrics::default(),
773					};
774				}
775				return ExecutionResult {
776					frames: vec![],
777					error: Some(err),
778					metrics: ExecutionMetrics::default(),
779				};
780			}
781		};
782		let compile_duration = start_compile.elapsed();
783
784		match execute_compiled_units(
785			&self.0,
786			&mut Transaction::Command(txn),
787			&compiled,
788			&cmd.params,
789			symbols,
790			compile_duration,
791		) {
792			Ok((output, remaining, _, metrics)) => ExecutionResult {
793				frames: merge_results(output, remaining),
794				error: None,
795				metrics: build_metrics(metrics),
796			},
797			Err(f) => ExecutionResult {
798				frames: vec![],
799				error: Some(f.error),
800				metrics: build_metrics(f.partial_metrics),
801			},
802		}
803	}
804
805	/// Call a procedure by fully-qualified name (e.g., "banking.transfer_funds").
806	#[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
807	pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> ExecutionResult {
808		let rql = format!("CALL {}()", name);
809		let symbols = match self.setup_symbols(params, &mut Transaction::Command(&mut *txn)) {
810			Ok(s) => s,
811			Err(e) => {
812				return ExecutionResult {
813					frames: vec![],
814					error: Some(e),
815					metrics: ExecutionMetrics::default(),
816				};
817			}
818		};
819
820		let start_compile = self.0.runtime_context.clock.instant();
821		let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql) {
822			Ok(CompilationResult::Ready(compiled)) => compiled,
823			Ok(CompilationResult::Incremental(_)) => {
824				unreachable!("CALL statements should not require incremental compilation")
825			}
826			Err(e) => {
827				return ExecutionResult {
828					frames: vec![],
829					error: Some(e),
830					metrics: ExecutionMetrics::default(),
831				};
832			}
833		};
834		let compile_duration = start_compile.elapsed();
835		let compile_duration_us = compile_duration.as_micros() as u64 / compiled.len().max(1) as u64;
836
837		let mut result = vec![];
838		let mut metrics = Vec::new();
839		let mut symbols = symbols;
840		for compiled in compiled.iter() {
841			result.clear();
842			let mut tx = Transaction::Command(txn);
843			let mut vm = Vm::from_services(symbols, &self.0, params, tx.identity());
844			let start_execute = self.0.runtime_context.clock.instant();
845			let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
846			let execute_duration = start_execute.elapsed();
847			symbols = vm.symbols;
848
849			metrics.push(StatementMetric {
850				fingerprint: compiled.fingerprint,
851				normalized_rql: compiled.normalized_rql.clone(),
852				compile_duration_us,
853				execute_duration_us: execute_duration.as_micros() as u64,
854				rows_affected: if run_result.is_ok() {
855					extract_rows_affected(&result)
856				} else {
857					0
858				},
859			});
860
861			if let Err(e) = run_result {
862				return ExecutionResult {
863					frames: vec![],
864					error: Some(e),
865					metrics: build_metrics(metrics),
866				};
867			}
868		}
869
870		ExecutionResult {
871			frames: result,
872			error: None,
873			metrics: build_metrics(metrics),
874		}
875	}
876
877	#[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
878	pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> ExecutionResult {
879		let symbols = match self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn)) {
880			Ok(s) => s,
881			Err(e) => {
882				return ExecutionResult {
883					frames: vec![],
884					error: Some(e),
885					metrics: ExecutionMetrics::default(),
886				};
887			}
888		};
889
890		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
891			&mut Transaction::Query(&mut *txn),
892			SessionOp::Query,
893			false,
894		) {
895			return ExecutionResult {
896				frames: vec![],
897				error: Some(e),
898				metrics: ExecutionMetrics::default(),
899			};
900		}
901
902		let start_compile = self.0.runtime_context.clock.instant();
903		let compiled = match self.compiler.compile_with_policy(
904			&mut Transaction::Query(txn),
905			qry.rql,
906			inject_from_policies,
907		) {
908			Ok(CompilationResult::Ready(compiled)) => compiled,
909			Ok(CompilationResult::Incremental(_)) => {
910				unreachable!("DDL statements require admin transactions, not query transactions")
911			}
912			Err(err) => {
913				#[cfg(not(reifydb_single_threaded))]
914				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, qry.rql, qry.params) {
915					return ExecutionResult {
916						frames,
917						error: None,
918						metrics: ExecutionMetrics::default(),
919					};
920				}
921				return ExecutionResult {
922					frames: vec![],
923					error: Some(err),
924					metrics: ExecutionMetrics::default(),
925				};
926			}
927		};
928		let compile_duration = start_compile.elapsed();
929
930		let exec_result = execute_compiled_units(
931			&self.0,
932			&mut Transaction::Query(txn),
933			&compiled,
934			&qry.params,
935			symbols,
936			compile_duration,
937		);
938
939		match exec_result {
940			Ok((output, remaining, _, metrics)) => ExecutionResult {
941				frames: merge_results(output, remaining),
942				error: None,
943				metrics: build_metrics(metrics),
944			},
945			Err(f) => ExecutionResult {
946				frames: vec![],
947				error: Some(f.error),
948				metrics: build_metrics(f.partial_metrics),
949			},
950		}
951	}
952}