Skip to main content

reifydb_engine/vm/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, result::Result as StdResult, sync::Arc, time::Duration};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{
9	error::diagnostic::subscription,
10	execution::ExecutionResult,
11	metric::{ExecutionMetrics, StatementMetric},
12	value::column::columns::Columns,
13};
14use reifydb_metric::storage::metric::MetricReader;
15use reifydb_policy::inject_read_policies;
16use reifydb_rql::{
17	ast::parse_str,
18	compiler::{CompilationResult, Compiled, constrain_policy},
19	fingerprint::request::fingerprint_request,
20};
21use reifydb_store_single::SingleStore;
22use reifydb_transaction::transaction::{
23	RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
24	query::QueryTransaction,
25};
26#[cfg(not(reifydb_single_threaded))]
27use reifydb_type::error::Diagnostic;
28use reifydb_type::{
29	error::Error,
30	params::Params,
31	value::{Value, frame::frame::Frame, r#type::Type},
32};
33use tracing::instrument;
34
35#[cfg(not(reifydb_single_threaded))]
36use crate::remote;
37use crate::{
38	Result,
39	policy::PolicyEvaluator,
40	vm::{
41		Admin, Command, Query, Subscription, Test,
42		services::{EngineConfig, Services},
43		stack::{SymbolTable, Variable},
44		vm::Vm,
45	},
46};
47
48/// Executor is the orchestration layer for RQL statement execution.
49pub struct Executor(Arc<Services>);
50
51impl Clone for Executor {
52	fn clone(&self) -> Self {
53		Self(self.0.clone())
54	}
55}
56
57impl Deref for Executor {
58	type Target = Services;
59
60	fn deref(&self) -> &Self::Target {
61		&self.0
62	}
63}
64
65impl Executor {
66	pub fn new(
67		catalog: Catalog,
68		config: EngineConfig,
69		flow_operator_store: SystemFlowOperatorStore,
70		stats_reader: MetricReader<SingleStore>,
71	) -> Self {
72		Self(Arc::new(Services::new(catalog, config, flow_operator_store, stats_reader)))
73	}
74
75	/// Get a reference to the underlying Services
76	pub fn services(&self) -> &Arc<Services> {
77		&self.0
78	}
79
80	/// Construct an Executor from an existing `Arc<Services>`.
81	pub fn from_services(services: Arc<Services>) -> Self {
82		Self(services)
83	}
84
85	#[allow(dead_code)]
86	pub fn testing() -> Self {
87		Self(Services::testing())
88	}
89
90	/// If the error is a REMOTE_001 and we have a RemoteRegistry, forward the query.
91	/// Returns `Ok(Some(frames))` if forwarded, `Ok(None)` if not a remote query.
92	#[cfg(not(reifydb_single_threaded))]
93	fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
94		if let Some(ref registry) = self.0.remote_registry
95			&& remote::is_remote_query(err)
96			&& let Some(address) = remote::extract_remote_address(err)
97		{
98			let token = remote::extract_remote_token(err);
99			return registry.forward_query(&address, rql, params, token.as_deref()).map(Some);
100		}
101		Ok(None)
102	}
103}
104
105impl RqlExecutor for Executor {
106	fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
107		Executor::rql(self, tx, rql, params)
108	}
109}
110
111/// Populate a stack with parameters so they can be accessed as variables.
112fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
113	match params {
114		Params::Positional(values) => {
115			for (index, value) in values.iter().enumerate() {
116				let param_name = (index + 1).to_string();
117				symbols.set(param_name, Variable::scalar(value.clone()), false)?;
118			}
119		}
120		Params::Named(map) => {
121			for (name, value) in map.iter() {
122				symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
123			}
124		}
125		Params::None => {}
126	}
127	Ok(())
128}
129
130/// Populate the `$identity` variable in the symbol table so policy bodies
131/// (and user RQL) can reference `$identity.id`, `$identity.name`, and `$identity.roles`.
132fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
133	let identity = tx.identity();
134	if identity.is_privileged() {
135		return Ok(());
136	}
137	if identity.is_anonymous() {
138		let columns = Columns::single_row([
139			("id", Value::IdentityId(identity)),
140			("name", Value::none_of(Type::Utf8)),
141			("roles", Value::List(vec![])),
142		]);
143		symbols.set("identity".to_string(), Variable::columns(columns), false)?;
144		return Ok(());
145	}
146	if let Some(user) = catalog.find_identity(tx, identity)? {
147		let roles = catalog.find_role_names_for_identity(tx, identity)?;
148		let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
149		let columns = Columns::single_row([
150			("id", Value::IdentityId(identity)),
151			("name", Value::Utf8(user.name)),
152			("roles", Value::List(role_values)),
153		]);
154		symbols.set("identity".to_string(), Variable::columns(columns), false)?;
155	}
156	Ok(())
157}
158
159/// Execute a list of compiled units, tracking output frames separately.
160type CompiledUnitsResult = (Vec<Frame>, Vec<Frame>, SymbolTable, Vec<StatementMetric>);
161
162/// Error from `execute_compiled_units` that preserves partial metrics.
163struct ExecutionFailure {
164	error: Error,
165	partial_metrics: Vec<StatementMetric>,
166}
167
168/// Build `ExecutionMetrics` from a list of statement metrics.
169fn build_metrics(statements: Vec<StatementMetric>) -> ExecutionMetrics {
170	let fps: Vec<_> = statements.iter().map(|m| m.fingerprint).collect();
171	ExecutionMetrics {
172		fingerprint: fingerprint_request(&fps),
173		statements,
174	}
175}
176
177/// Returns (output_results, last_result, final_symbols, metrics).
178fn execute_compiled_units(
179	services: &Arc<Services>,
180	tx: &mut Transaction<'_>,
181	compiled_list: &[Compiled],
182	params: &Params,
183	mut symbols: SymbolTable,
184	compile_duration: Duration,
185) -> StdResult<CompiledUnitsResult, ExecutionFailure> {
186	let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
187	let mut result = vec![];
188	let mut output_results: Vec<Frame> = Vec::new();
189	let mut metrics = Vec::new();
190
191	for compiled in compiled_list.iter() {
192		result.clear();
193		let mut vm = Vm::from_services(symbols, services, params, tx.identity());
194		let start = services.runtime_context.clock.instant();
195		let run_result = vm.run(services, tx, &compiled.instructions, &mut result);
196		let execute_duration = start.elapsed();
197		symbols = vm.symbols;
198
199		metrics.push(StatementMetric {
200			fingerprint: compiled.fingerprint,
201			normalized_rql: compiled.normalized_rql.clone(),
202			compile_duration_us,
203			execute_duration_us: execute_duration.as_micros() as u64,
204			rows_affected: if run_result.is_ok() {
205				extract_rows_affected(&result)
206			} else {
207				0
208			},
209		});
210
211		if let Err(error) = run_result {
212			return Err(ExecutionFailure {
213				error,
214				partial_metrics: metrics,
215			});
216		}
217
218		if compiled.is_output {
219			output_results.append(&mut result);
220		}
221	}
222
223	Ok((output_results, result, symbols, metrics))
224}
225
226/// Merge output_results and remaining results into the final result.
227fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
228	output_results.append(&mut remaining);
229	output_results
230}
231
232/// Extract the actual rows-affected count from a DML result.
233///
234/// DML handlers (INSERT/UPDATE/DELETE) emit a single summary frame with a
235/// column named "inserted", "updated", or "deleted" containing the count as
236/// a `Uint8` value. When that pattern is detected, return the real count.
237/// Otherwise fall back to the number of frames (correct for SELECT, DDL, etc.).
238fn extract_rows_affected(result: &[Frame]) -> u64 {
239	if result.len() == 1 {
240		let frame = &result[0];
241		for col in &frame.columns {
242			match col.name.as_str() {
243				"inserted" | "updated" | "deleted" => {
244					if col.data.len() == 1
245						&& let Value::Uint8(n) = col.data.get_value(0)
246					{
247						return n;
248					}
249				}
250				_ => {}
251			}
252		}
253	}
254	result.len() as u64
255}
256
257impl Executor {
258	/// Shared setup: create symbols and populate with params + identity.
259	fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
260		let mut symbols = SymbolTable::new();
261		populate_symbols(&mut symbols, params)?;
262		populate_identity(&mut symbols, &self.catalog, tx)?;
263		Ok(symbols)
264	}
265
266	/// Execute RQL against an existing open transaction.
267	///
268	/// This is the universal RQL execution interface: it compiles and runs
269	/// arbitrary RQL within whatever transaction variant the caller provides.
270	#[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
271	pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
272		let mut symbols = match self.setup_symbols(&params, tx) {
273			Ok(s) => s,
274			Err(e) => {
275				return ExecutionResult {
276					frames: vec![],
277					error: Some(e),
278					metrics: ExecutionMetrics::default(),
279				};
280			}
281		};
282
283		let start_compile = self.0.runtime_context.clock.instant();
284		let compiled_list = match self.compiler.compile_with_policy(tx, rql, inject_read_policies) {
285			Ok(CompilationResult::Ready(compiled)) => compiled,
286			Ok(CompilationResult::Incremental(_)) => {
287				unreachable!("incremental compilation not supported in rql()")
288			}
289			Err(err) => {
290				#[cfg(not(reifydb_single_threaded))]
291				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
292					return ExecutionResult {
293						frames,
294						error: None,
295						metrics: ExecutionMetrics::default(),
296					};
297				}
298				return ExecutionResult {
299					frames: vec![],
300					error: Some(err),
301					metrics: ExecutionMetrics::default(),
302				};
303			}
304		};
305		let compile_duration = start_compile.elapsed();
306		let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
307
308		let mut result = vec![];
309		let mut metrics = Vec::new();
310		for compiled in compiled_list.iter() {
311			result.clear();
312			let mut vm = Vm::from_services(symbols, &self.0, &params, tx.identity());
313			let start_execute = self.0.runtime_context.clock.instant();
314			let run_result = vm.run(&self.0, tx, &compiled.instructions, &mut result);
315			let execute_duration = start_execute.elapsed();
316			symbols = vm.symbols;
317
318			metrics.push(StatementMetric {
319				fingerprint: compiled.fingerprint,
320				normalized_rql: compiled.normalized_rql.clone(),
321				compile_duration_us,
322				execute_duration_us: execute_duration.as_micros() as u64,
323				rows_affected: if run_result.is_ok() {
324					extract_rows_affected(&result)
325				} else {
326					0
327				},
328			});
329
330			if let Err(e) = run_result {
331				return ExecutionResult {
332					frames: vec![],
333					error: Some(e),
334					metrics: build_metrics(metrics),
335				};
336			}
337		}
338
339		ExecutionResult {
340			frames: result,
341			error: None,
342			metrics: build_metrics(metrics),
343		}
344	}
345
346	#[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
347	pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> ExecutionResult {
348		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn)) {
349			Ok(s) => s,
350			Err(e) => {
351				return ExecutionResult {
352					frames: vec![],
353					error: Some(e),
354					metrics: ExecutionMetrics::default(),
355				};
356			}
357		};
358
359		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
360			&mut Transaction::Admin(&mut *txn),
361			"admin",
362			true,
363		) {
364			return ExecutionResult {
365				frames: vec![],
366				error: Some(e),
367				metrics: ExecutionMetrics::default(),
368			};
369		}
370
371		let start_compile = self.0.runtime_context.clock.instant();
372		match self.compiler.compile_with_policy(&mut Transaction::Admin(txn), cmd.rql, inject_read_policies) {
373			Err(err) => {
374				#[cfg(not(reifydb_single_threaded))]
375				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
376					return ExecutionResult {
377						frames,
378						error: None,
379						metrics: ExecutionMetrics::default(),
380					};
381				}
382				ExecutionResult {
383					frames: vec![],
384					error: Some(err),
385					metrics: ExecutionMetrics::default(),
386				}
387			}
388			Ok(CompilationResult::Ready(compiled)) => {
389				let compile_duration = start_compile.elapsed();
390				match execute_compiled_units(
391					&self.0,
392					&mut Transaction::Admin(txn),
393					&compiled,
394					&cmd.params,
395					symbols,
396					compile_duration,
397				) {
398					Ok((output, remaining, _, metrics)) => ExecutionResult {
399						frames: merge_results(output, remaining),
400						error: None,
401						metrics: build_metrics(metrics),
402					},
403					Err(f) => ExecutionResult {
404						frames: vec![],
405						error: Some(f.error),
406						metrics: build_metrics(f.partial_metrics),
407					},
408				}
409			}
410			Ok(CompilationResult::Incremental(mut state)) => {
411				let policy = constrain_policy(|plans, bump, cat, tx| {
412					inject_read_policies(plans, bump, cat, tx)
413				});
414				let mut result = vec![];
415				let mut output_results: Vec<Frame> = Vec::new();
416				let mut symbols = symbols;
417				let mut metrics = Vec::new();
418				loop {
419					let start_incr = self.0.runtime_context.clock.instant();
420					let next = match self.compiler.compile_next_with_policy(
421						&mut Transaction::Admin(txn),
422						&mut state,
423						&policy,
424					) {
425						Ok(n) => n,
426						Err(e) => {
427							return ExecutionResult {
428								frames: vec![],
429								error: Some(e),
430								metrics: build_metrics(metrics),
431							};
432						}
433					};
434					let compile_duration = start_incr.elapsed();
435
436					let Some(compiled) = next else {
437						break;
438					};
439
440					result.clear();
441					let mut tx = Transaction::Admin(txn);
442					let mut vm = Vm::from_services(symbols, &self.0, &cmd.params, tx.identity());
443					let start_execute = self.0.runtime_context.clock.instant();
444					let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
445					let execute_duration = start_execute.elapsed();
446					symbols = vm.symbols;
447
448					metrics.push(StatementMetric {
449						fingerprint: compiled.fingerprint,
450						normalized_rql: compiled.normalized_rql,
451						compile_duration_us: compile_duration.as_micros() as u64,
452						execute_duration_us: execute_duration.as_micros() as u64,
453						rows_affected: if run_result.is_ok() {
454							extract_rows_affected(&result)
455						} else {
456							0
457						},
458					});
459
460					if let Err(e) = run_result {
461						return ExecutionResult {
462							frames: vec![],
463							error: Some(e),
464							metrics: build_metrics(metrics),
465						};
466					}
467
468					if compiled.is_output {
469						output_results.append(&mut result);
470					}
471				}
472				ExecutionResult {
473					frames: merge_results(output_results, result),
474					error: None,
475					metrics: build_metrics(metrics),
476				}
477			}
478		}
479	}
480
481	#[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
482	pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> ExecutionResult {
483		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Test(Box::new(txn.reborrow()))) {
484			Ok(s) => s,
485			Err(e) => {
486				return ExecutionResult {
487					frames: vec![],
488					error: Some(e),
489					metrics: ExecutionMetrics::default(),
490				};
491			}
492		};
493
494		let session_type = txn.session_type.clone();
495		let session_default_deny = txn.session_default_deny;
496		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
497			&mut Transaction::Test(Box::new(txn.reborrow())),
498			&session_type,
499			session_default_deny,
500		) {
501			return ExecutionResult {
502				frames: vec![],
503				error: Some(e),
504				metrics: ExecutionMetrics::default(),
505			};
506		}
507
508		let start_compile = self.0.runtime_context.clock.instant();
509		match self.compiler.compile_with_policy(
510			&mut Transaction::Test(Box::new(txn.reborrow())),
511			cmd.rql,
512			inject_read_policies,
513		) {
514			Err(err) => {
515				#[cfg(not(reifydb_single_threaded))]
516				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
517					return ExecutionResult {
518						frames,
519						error: None,
520						metrics: ExecutionMetrics::default(),
521					};
522				}
523				ExecutionResult {
524					frames: vec![],
525					error: Some(err),
526					metrics: ExecutionMetrics::default(),
527				}
528			}
529			Ok(CompilationResult::Ready(compiled)) => {
530				let compile_duration = start_compile.elapsed();
531				match execute_compiled_units(
532					&self.0,
533					&mut Transaction::Test(Box::new(txn.reborrow())),
534					&compiled,
535					&cmd.params,
536					symbols,
537					compile_duration,
538				) {
539					Ok((output, remaining, _, metrics)) => ExecutionResult {
540						frames: merge_results(output, remaining),
541						error: None,
542						metrics: build_metrics(metrics),
543					},
544					Err(f) => ExecutionResult {
545						frames: vec![],
546						error: Some(f.error),
547						metrics: build_metrics(f.partial_metrics),
548					},
549				}
550			}
551			Ok(CompilationResult::Incremental(mut state)) => {
552				let policy = constrain_policy(|plans, bump, cat, tx| {
553					inject_read_policies(plans, bump, cat, tx)
554				});
555				let mut result = vec![];
556				let mut output_results: Vec<Frame> = Vec::new();
557				let mut symbols = symbols;
558				let mut metrics = Vec::new();
559				loop {
560					let start_incr = self.0.runtime_context.clock.instant();
561					let next = match self.compiler.compile_next_with_policy(
562						&mut Transaction::Test(Box::new(txn.reborrow())),
563						&mut state,
564						&policy,
565					) {
566						Ok(n) => n,
567						Err(e) => {
568							return ExecutionResult {
569								frames: vec![],
570								error: Some(e),
571								metrics: build_metrics(metrics),
572							};
573						}
574					};
575					let compile_duration = start_incr.elapsed();
576
577					let Some(compiled) = next else {
578						break;
579					};
580
581					result.clear();
582					let mut tx = Transaction::Test(Box::new(txn.reborrow()));
583					let mut vm = Vm::from_services(symbols, &self.0, &cmd.params, tx.identity());
584					let start_execute = self.0.runtime_context.clock.instant();
585					let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
586					let execute_duration = start_execute.elapsed();
587					symbols = vm.symbols;
588
589					metrics.push(StatementMetric {
590						fingerprint: compiled.fingerprint,
591						normalized_rql: compiled.normalized_rql,
592						compile_duration_us: compile_duration.as_micros() as u64,
593						execute_duration_us: execute_duration.as_micros() as u64,
594						rows_affected: if run_result.is_ok() {
595							extract_rows_affected(&result)
596						} else {
597							0
598						},
599					});
600
601					if let Err(e) = run_result {
602						return ExecutionResult {
603							frames: vec![],
604							error: Some(e),
605							metrics: build_metrics(metrics),
606						};
607					}
608
609					if compiled.is_output {
610						output_results.append(&mut result);
611					}
612				}
613				ExecutionResult {
614					frames: merge_results(output_results, result),
615					error: None,
616					metrics: build_metrics(metrics),
617				}
618			}
619		}
620	}
621
622	#[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
623	pub fn subscription(&self, txn: &mut QueryTransaction, cmd: Subscription<'_>) -> ExecutionResult {
624		// Pre-compilation validation: parse and check statement constraints
625		let bump = Bump::new();
626		let statements = match parse_str(&bump, cmd.rql) {
627			Ok(s) => s,
628			Err(e) => {
629				return ExecutionResult {
630					frames: vec![],
631					error: Some(e),
632					metrics: ExecutionMetrics::default(),
633				};
634			}
635		};
636
637		if statements.len() != 1 {
638			return ExecutionResult {
639				frames: vec![],
640				error: Some(Error(Box::new(subscription::single_statement_required(
641					"Subscription endpoint requires exactly one statement",
642				)))),
643				metrics: ExecutionMetrics::default(),
644			};
645		}
646
647		let statement = &statements[0];
648		if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
649			return ExecutionResult {
650				frames: vec![],
651				error: Some(Error(Box::new(subscription::invalid_statement(
652					"Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
653				)))),
654				metrics: ExecutionMetrics::default(),
655			};
656		}
657
658		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Query(&mut *txn)) {
659			Ok(s) => s,
660			Err(e) => {
661				return ExecutionResult {
662					frames: vec![],
663					error: Some(e),
664					metrics: ExecutionMetrics::default(),
665				};
666			}
667		};
668
669		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
670			&mut Transaction::Query(&mut *txn),
671			"subscription",
672			true,
673		) {
674			return ExecutionResult {
675				frames: vec![],
676				error: Some(e),
677				metrics: ExecutionMetrics::default(),
678			};
679		}
680
681		let start_compile = self.0.runtime_context.clock.instant();
682		let compiled = match self.compiler.compile_with_policy(
683			&mut Transaction::Query(txn),
684			cmd.rql,
685			inject_read_policies,
686		) {
687			Ok(CompilationResult::Ready(compiled)) => compiled,
688			Ok(CompilationResult::Incremental(_)) => {
689				unreachable!("Single subscription statement should not require incremental compilation")
690			}
691			Err(err) => {
692				return ExecutionResult {
693					frames: vec![],
694					error: Some(err),
695					metrics: ExecutionMetrics::default(),
696				};
697			}
698		};
699		let compile_duration = start_compile.elapsed();
700
701		match execute_compiled_units(
702			&self.0,
703			&mut Transaction::Query(txn),
704			&compiled,
705			&cmd.params,
706			symbols,
707			compile_duration,
708		) {
709			Ok((output, remaining, _, metrics)) => ExecutionResult {
710				frames: merge_results(output, remaining),
711				error: None,
712				metrics: build_metrics(metrics),
713			},
714			Err(f) => ExecutionResult {
715				frames: vec![],
716				error: Some(f.error),
717				metrics: build_metrics(f.partial_metrics),
718			},
719		}
720	}
721
722	#[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
723	pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> ExecutionResult {
724		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn)) {
725			Ok(s) => s,
726			Err(e) => {
727				return ExecutionResult {
728					frames: vec![],
729					error: Some(e),
730					metrics: ExecutionMetrics::default(),
731				};
732			}
733		};
734
735		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
736			&mut Transaction::Command(&mut *txn),
737			"command",
738			false,
739		) {
740			return ExecutionResult {
741				frames: vec![],
742				error: Some(e),
743				metrics: ExecutionMetrics::default(),
744			};
745		}
746
747		let start_compile = self.0.runtime_context.clock.instant();
748		let compiled = match self.compiler.compile_with_policy(
749			&mut Transaction::Command(txn),
750			cmd.rql,
751			inject_read_policies,
752		) {
753			Ok(CompilationResult::Ready(compiled)) => compiled,
754			Ok(CompilationResult::Incremental(_)) => {
755				unreachable!("DDL statements require admin transactions, not command transactions")
756			}
757			Err(err) => {
758				#[cfg(not(reifydb_single_threaded))]
759				if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
760					return ExecutionResult {
761						frames: vec![],
762						error: Some(Error(Box::new(Diagnostic {
763							code: "REMOTE_002".to_string(),
764							message: "Write operations on remote namespaces are not supported"
765								.to_string(),
766							help: Some("Use the remote instance directly for write operations"
767								.to_string()),
768							..Default::default()
769						}))),
770						metrics: ExecutionMetrics::default(),
771					};
772				}
773				return ExecutionResult {
774					frames: vec![],
775					error: Some(err),
776					metrics: ExecutionMetrics::default(),
777				};
778			}
779		};
780		let compile_duration = start_compile.elapsed();
781
782		match execute_compiled_units(
783			&self.0,
784			&mut Transaction::Command(txn),
785			&compiled,
786			&cmd.params,
787			symbols,
788			compile_duration,
789		) {
790			Ok((output, remaining, _, metrics)) => ExecutionResult {
791				frames: merge_results(output, remaining),
792				error: None,
793				metrics: build_metrics(metrics),
794			},
795			Err(f) => ExecutionResult {
796				frames: vec![],
797				error: Some(f.error),
798				metrics: build_metrics(f.partial_metrics),
799			},
800		}
801	}
802
803	/// Call a procedure by fully-qualified name (e.g., "banking.transfer_funds").
804	#[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
805	pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> ExecutionResult {
806		let rql = format!("CALL {}()", name);
807		let symbols = match self.setup_symbols(params, &mut Transaction::Command(&mut *txn)) {
808			Ok(s) => s,
809			Err(e) => {
810				return ExecutionResult {
811					frames: vec![],
812					error: Some(e),
813					metrics: ExecutionMetrics::default(),
814				};
815			}
816		};
817
818		let start_compile = self.0.runtime_context.clock.instant();
819		let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql) {
820			Ok(CompilationResult::Ready(compiled)) => compiled,
821			Ok(CompilationResult::Incremental(_)) => {
822				unreachable!("CALL statements should not require incremental compilation")
823			}
824			Err(e) => {
825				return ExecutionResult {
826					frames: vec![],
827					error: Some(e),
828					metrics: ExecutionMetrics::default(),
829				};
830			}
831		};
832		let compile_duration = start_compile.elapsed();
833		let compile_duration_us = compile_duration.as_micros() as u64 / compiled.len().max(1) as u64;
834
835		let mut result = vec![];
836		let mut metrics = Vec::new();
837		let mut symbols = symbols;
838		for compiled in compiled.iter() {
839			result.clear();
840			let mut tx = Transaction::Command(txn);
841			let mut vm = Vm::from_services(symbols, &self.0, params, tx.identity());
842			let start_execute = self.0.runtime_context.clock.instant();
843			let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
844			let execute_duration = start_execute.elapsed();
845			symbols = vm.symbols;
846
847			metrics.push(StatementMetric {
848				fingerprint: compiled.fingerprint,
849				normalized_rql: compiled.normalized_rql.clone(),
850				compile_duration_us,
851				execute_duration_us: execute_duration.as_micros() as u64,
852				rows_affected: if run_result.is_ok() {
853					extract_rows_affected(&result)
854				} else {
855					0
856				},
857			});
858
859			if let Err(e) = run_result {
860				return ExecutionResult {
861					frames: vec![],
862					error: Some(e),
863					metrics: build_metrics(metrics),
864				};
865			}
866		}
867
868		ExecutionResult {
869			frames: result,
870			error: None,
871			metrics: build_metrics(metrics),
872		}
873	}
874
875	#[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
876	pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> ExecutionResult {
877		let symbols = match self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn)) {
878			Ok(s) => s,
879			Err(e) => {
880				return ExecutionResult {
881					frames: vec![],
882					error: Some(e),
883					metrics: ExecutionMetrics::default(),
884				};
885			}
886		};
887
888		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
889			&mut Transaction::Query(&mut *txn),
890			"query",
891			false,
892		) {
893			return ExecutionResult {
894				frames: vec![],
895				error: Some(e),
896				metrics: ExecutionMetrics::default(),
897			};
898		}
899
900		let start_compile = self.0.runtime_context.clock.instant();
901		let compiled = match self.compiler.compile_with_policy(
902			&mut Transaction::Query(txn),
903			qry.rql,
904			inject_read_policies,
905		) {
906			Ok(CompilationResult::Ready(compiled)) => compiled,
907			Ok(CompilationResult::Incremental(_)) => {
908				unreachable!("DDL statements require admin transactions, not query transactions")
909			}
910			Err(err) => {
911				#[cfg(not(reifydb_single_threaded))]
912				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, qry.rql, qry.params) {
913					return ExecutionResult {
914						frames,
915						error: None,
916						metrics: ExecutionMetrics::default(),
917					};
918				}
919				return ExecutionResult {
920					frames: vec![],
921					error: Some(err),
922					metrics: ExecutionMetrics::default(),
923				};
924			}
925		};
926		let compile_duration = start_compile.elapsed();
927
928		let exec_result = execute_compiled_units(
929			&self.0,
930			&mut Transaction::Query(txn),
931			&compiled,
932			&qry.params,
933			symbols,
934			compile_duration,
935		);
936
937		match exec_result {
938			Ok((output, remaining, _, metrics)) => ExecutionResult {
939				frames: merge_results(output, remaining),
940				error: None,
941				metrics: build_metrics(metrics),
942			},
943			Err(f) => ExecutionResult {
944				frames: vec![],
945				error: Some(f.error),
946				metrics: build_metrics(f.partial_metrics),
947			},
948		}
949	}
950}