Skip to main content

reifydb_engine/vm/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, result::Result as StdResult, sync::Arc, time::Duration};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{
9	error::diagnostic::subscription,
10	execution::ExecutionResult,
11	interface::catalog::policy::SessionOp,
12	metric::{ExecutionMetrics, StatementMetric},
13	value::column::columns::Columns,
14};
15use reifydb_metric::storage::metric::MetricReader;
16use reifydb_policy::inject_from_policies;
17use reifydb_rql::{
18	ast::parse_str,
19	compiler::{CompilationResult, Compiled, IncrementalCompilation, constrain_policy},
20	fingerprint::request::fingerprint_request,
21};
22use reifydb_runtime::context::clock::Instant;
23use reifydb_store_single::SingleStore;
24use reifydb_transaction::transaction::{
25	RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
26	query::QueryTransaction,
27};
28#[cfg(not(reifydb_single_threaded))]
29use reifydb_type::error::Diagnostic;
30use reifydb_type::{
31	error::Error,
32	params::Params,
33	value::{Value, frame::frame::Frame, r#type::Type},
34};
35use tracing::instrument;
36
37#[cfg(not(reifydb_single_threaded))]
38use crate::remote;
39use crate::{
40	Result,
41	policy::PolicyEvaluator,
42	vm::{
43		Admin, Command, Query, Subscription, Test,
44		services::{EngineConfig, Services},
45		stack::{SymbolTable, Variable},
46		vm::Vm,
47	},
48};
49
50/// Executor is the orchestration layer for RQL statement execution.
51pub struct Executor(Arc<Services>);
52
53impl Clone for Executor {
54	fn clone(&self) -> Self {
55		Self(self.0.clone())
56	}
57}
58
59impl Deref for Executor {
60	type Target = Services;
61
62	fn deref(&self) -> &Self::Target {
63		&self.0
64	}
65}
66
67impl Executor {
68	pub fn new(
69		catalog: Catalog,
70		config: EngineConfig,
71		flow_operator_store: SystemFlowOperatorStore,
72		stats_reader: MetricReader<SingleStore>,
73	) -> Self {
74		Self(Arc::new(Services::new(catalog, config, flow_operator_store, stats_reader)))
75	}
76
77	/// Get a reference to the underlying Services
78	pub fn services(&self) -> &Arc<Services> {
79		&self.0
80	}
81
82	/// Construct an Executor from an existing `Arc<Services>`.
83	pub fn from_services(services: Arc<Services>) -> Self {
84		Self(services)
85	}
86
87	#[allow(dead_code)]
88	pub fn testing() -> Self {
89		Self(Services::testing())
90	}
91
92	/// If the error is a REMOTE_001 and we have a RemoteRegistry, forward the query.
93	/// Returns `Ok(Some(frames))` if forwarded, `Ok(None)` if not a remote query.
94	#[cfg(not(reifydb_single_threaded))]
95	fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
96		if let Some(ref registry) = self.0.remote_registry
97			&& remote::is_remote_query(err)
98			&& let Some(address) = remote::extract_remote_address(err)
99		{
100			let token = remote::extract_remote_token(err);
101			return registry.forward_query(&address, rql, params, token.as_deref()).map(Some);
102		}
103		Ok(None)
104	}
105}
106
107impl RqlExecutor for Executor {
108	fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
109		Executor::rql(self, tx, rql, params)
110	}
111}
112
113/// Populate a stack with parameters so they can be accessed as variables.
114fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
115	match params {
116		Params::Positional(values) => {
117			for (index, value) in values.iter().enumerate() {
118				let param_name = (index + 1).to_string();
119				symbols.set(param_name, Variable::scalar(value.clone()), false)?;
120			}
121		}
122		Params::Named(map) => {
123			for (name, value) in map.iter() {
124				symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
125			}
126		}
127		Params::None => {}
128	}
129	Ok(())
130}
131
132/// Populate the `$identity` variable in the symbol table so policy bodies
133/// (and user RQL) can reference `$identity.id`, `$identity.name`, and `$identity.roles`.
134fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
135	let identity = tx.identity();
136	if identity.is_privileged() {
137		return Ok(());
138	}
139	if identity.is_anonymous() {
140		let columns = Columns::single_row([
141			("id", Value::IdentityId(identity)),
142			("name", Value::none_of(Type::Utf8)),
143			("roles", Value::List(vec![])),
144		]);
145		symbols.set("identity".to_string(), Variable::columns(columns), false)?;
146		return Ok(());
147	}
148	if let Some(user) = catalog.find_identity(tx, identity)? {
149		let roles = catalog.find_role_names_for_identity(tx, identity)?;
150		let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
151		let columns = Columns::single_row([
152			("id", Value::IdentityId(identity)),
153			("name", Value::Utf8(user.name)),
154			("roles", Value::List(role_values)),
155		]);
156		symbols.set("identity".to_string(), Variable::columns(columns), false)?;
157	}
158	Ok(())
159}
160
161/// Execute a list of compiled units, tracking output frames separately.
162type CompiledUnitsResult = (Vec<Frame>, Vec<Frame>, SymbolTable, Vec<StatementMetric>);
163
164/// Error from `execute_compiled_units` that preserves partial metrics.
165struct ExecutionFailure {
166	error: Error,
167	partial_metrics: Vec<StatementMetric>,
168}
169
170/// Build `ExecutionMetrics` from a list of statement metrics.
171fn build_metrics(statements: Vec<StatementMetric>) -> ExecutionMetrics {
172	let fps: Vec<_> = statements.iter().map(|m| m.fingerprint).collect();
173	ExecutionMetrics {
174		fingerprint: fingerprint_request(&fps),
175		statements,
176		..Default::default()
177	}
178}
179
180/// Returns (output_results, last_result, final_symbols, metrics).
181fn execute_compiled_units(
182	services: &Arc<Services>,
183	tx: &mut Transaction<'_>,
184	compiled_list: &[Compiled],
185	params: &Params,
186	mut symbols: SymbolTable,
187	compile_duration: Duration,
188) -> StdResult<CompiledUnitsResult, ExecutionFailure> {
189	let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
190	let mut result = vec![];
191	let mut output_results: Vec<Frame> = Vec::new();
192	let mut metrics = Vec::new();
193
194	for compiled in compiled_list.iter() {
195		result.clear();
196		let mut vm = Vm::from_services(symbols, services, params, tx.identity());
197		let start = services.runtime_context.clock.instant();
198		let run_result = vm.run(services, tx, &compiled.instructions, &mut result);
199		let execute_duration = start.elapsed();
200		symbols = vm.symbols;
201
202		metrics.push(StatementMetric {
203			fingerprint: compiled.fingerprint,
204			normalized_rql: compiled.normalized_rql.clone(),
205			compile_duration_us,
206			execute_duration_us: execute_duration.as_micros() as u64,
207			rows_affected: if run_result.is_ok() {
208				extract_rows_affected(&result)
209			} else {
210				0
211			},
212		});
213
214		if let Err(error) = run_result {
215			return Err(ExecutionFailure {
216				error,
217				partial_metrics: metrics,
218			});
219		}
220
221		if compiled.is_output {
222			output_results.append(&mut result);
223		}
224	}
225
226	Ok((output_results, result, symbols, metrics))
227}
228
229/// Merge output_results and remaining results into the final result.
230fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
231	output_results.append(&mut remaining);
232	output_results
233}
234
235#[inline]
236fn error_result(error: Error, metrics: ExecutionMetrics) -> ExecutionResult {
237	ExecutionResult {
238		frames: vec![],
239		error: Some(error),
240		metrics,
241	}
242}
243
244/// Extract the actual rows-affected count from a DML result.
245///
246/// DML handlers (INSERT/UPDATE/DELETE) emit a single summary frame with a
247/// column named "inserted", "updated", or "deleted" containing the count as
248/// a `Uint8` value. When that pattern is detected, return the real count.
249/// Otherwise fall back to the number of frames (correct for SELECT, DDL, etc.).
250fn extract_rows_affected(result: &[Frame]) -> u64 {
251	if result.len() == 1 {
252		let frame = &result[0];
253		for col in &frame.columns {
254			match col.name.as_str() {
255				"inserted" | "updated" | "deleted" => {
256					if col.data.len() == 1
257						&& let Value::Uint8(n) = col.data.get_value(0)
258					{
259						return n;
260					}
261				}
262				_ => {}
263			}
264		}
265	}
266	result.len() as u64
267}
268
269impl Executor {
270	/// Shared setup: create symbols and populate with params + identity.
271	fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
272		let mut symbols = SymbolTable::new();
273		populate_symbols(&mut symbols, params)?;
274		populate_identity(&mut symbols, &self.catalog, tx)?;
275		Ok(symbols)
276	}
277
278	/// Execute RQL against an existing open transaction.
279	///
280	/// This is the universal RQL execution interface: it compiles and runs
281	/// arbitrary RQL within whatever transaction variant the caller provides.
282	#[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
283	pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
284		let mut symbols = match self.setup_symbols(&params, tx) {
285			Ok(s) => s,
286			Err(e) => {
287				return ExecutionResult {
288					frames: vec![],
289					error: Some(e),
290					metrics: ExecutionMetrics::default(),
291				};
292			}
293		};
294
295		let start_compile = self.0.runtime_context.clock.instant();
296		let compiled_list = match self.compiler.compile_with_policy(tx, rql, inject_from_policies) {
297			Ok(CompilationResult::Ready(compiled)) => compiled,
298			Ok(CompilationResult::Incremental(_)) => {
299				unreachable!("incremental compilation not supported in rql()")
300			}
301			Err(err) => {
302				#[cfg(not(reifydb_single_threaded))]
303				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
304					return ExecutionResult {
305						frames,
306						error: None,
307						metrics: ExecutionMetrics::default(),
308					};
309				}
310				return ExecutionResult {
311					frames: vec![],
312					error: Some(err),
313					metrics: ExecutionMetrics::default(),
314				};
315			}
316		};
317		let compile_duration = start_compile.elapsed();
318		let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
319
320		let mut result = vec![];
321		let mut metrics = Vec::new();
322		for compiled in compiled_list.iter() {
323			result.clear();
324			let mut vm = Vm::from_services(symbols, &self.0, &params, tx.identity());
325			let start_execute = self.0.runtime_context.clock.instant();
326			let run_result = vm.run(&self.0, tx, &compiled.instructions, &mut result);
327			let execute_duration = start_execute.elapsed();
328			symbols = vm.symbols;
329
330			metrics.push(StatementMetric {
331				fingerprint: compiled.fingerprint,
332				normalized_rql: compiled.normalized_rql.clone(),
333				compile_duration_us,
334				execute_duration_us: execute_duration.as_micros() as u64,
335				rows_affected: if run_result.is_ok() {
336					extract_rows_affected(&result)
337				} else {
338					0
339				},
340			});
341
342			if let Err(e) = run_result {
343				return ExecutionResult {
344					frames: vec![],
345					error: Some(e),
346					metrics: build_metrics(metrics),
347				};
348			}
349		}
350
351		ExecutionResult {
352			frames: result,
353			error: None,
354			metrics: build_metrics(metrics),
355		}
356	}
357
358	#[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
359	pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> ExecutionResult {
360		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn)) {
361			Ok(s) => s,
362			Err(e) => return error_result(e, ExecutionMetrics::default()),
363		};
364		if let Err(e) = self.enforce_admin_policy(&symbols, txn) {
365			return error_result(e, ExecutionMetrics::default());
366		}
367		let start_compile = self.0.runtime_context.clock.instant();
368		match self.compiler.compile_with_policy(&mut Transaction::Admin(txn), cmd.rql, inject_from_policies) {
369			Err(err) => self.handle_admin_compile_error(err, cmd.rql, cmd.params),
370			Ok(CompilationResult::Ready(compiled)) => {
371				self.execute_admin_ready(txn, compiled, &cmd.params, symbols, start_compile)
372			}
373			Ok(CompilationResult::Incremental(state)) => {
374				self.execute_admin_incremental(txn, state, &cmd.params, symbols)
375			}
376		}
377	}
378
379	#[inline]
380	fn enforce_admin_policy(&self, symbols: &SymbolTable, txn: &mut AdminTransaction) -> Result<()> {
381		PolicyEvaluator::new(&self.0, symbols).enforce_session_policy(
382			&mut Transaction::Admin(txn),
383			SessionOp::Admin,
384			true,
385		)
386	}
387
388	#[inline]
389	#[cfg_attr(reifydb_single_threaded, allow(unused_variables))]
390	fn handle_admin_compile_error(&self, err: Error, rql: &str, params: Params) -> ExecutionResult {
391		#[cfg(not(reifydb_single_threaded))]
392		if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
393			return ExecutionResult {
394				frames,
395				error: None,
396				metrics: ExecutionMetrics::default(),
397			};
398		}
399		error_result(err, ExecutionMetrics::default())
400	}
401
402	#[inline]
403	fn execute_admin_ready(
404		&self,
405		txn: &mut AdminTransaction,
406		compiled: Arc<Vec<Compiled>>,
407		params: &Params,
408		symbols: SymbolTable,
409		start_compile: Instant,
410	) -> ExecutionResult {
411		let compile_duration = start_compile.elapsed();
412		match execute_compiled_units(
413			&self.0,
414			&mut Transaction::Admin(txn),
415			&compiled,
416			params,
417			symbols,
418			compile_duration,
419		) {
420			Ok((output, remaining, _, metrics)) => ExecutionResult {
421				frames: merge_results(output, remaining),
422				error: None,
423				metrics: build_metrics(metrics),
424			},
425			Err(f) => ExecutionResult {
426				frames: vec![],
427				error: Some(f.error),
428				metrics: build_metrics(f.partial_metrics),
429			},
430		}
431	}
432
433	fn execute_admin_incremental(
434		&self,
435		txn: &mut AdminTransaction,
436		mut state: IncrementalCompilation,
437		params: &Params,
438		symbols: SymbolTable,
439	) -> ExecutionResult {
440		let policy = constrain_policy(inject_from_policies);
441		let mut result = vec![];
442		let mut output_results: Vec<Frame> = Vec::new();
443		let mut symbols = symbols;
444		let mut metrics = Vec::new();
445		loop {
446			let start_incr = self.0.runtime_context.clock.instant();
447			let next = match self.compiler.compile_next_with_policy(
448				&mut Transaction::Admin(txn),
449				&mut state,
450				&policy,
451			) {
452				Ok(n) => n,
453				Err(e) => return error_result(e, build_metrics(metrics)),
454			};
455			let compile_duration = start_incr.elapsed();
456
457			let Some(compiled) = next else {
458				break;
459			};
460
461			result.clear();
462			let mut tx = Transaction::Admin(txn);
463			let mut vm = Vm::from_services(symbols, &self.0, params, tx.identity());
464			let start_execute = self.0.runtime_context.clock.instant();
465			let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
466			let execute_duration = start_execute.elapsed();
467			symbols = vm.symbols;
468
469			metrics.push(StatementMetric {
470				fingerprint: compiled.fingerprint,
471				normalized_rql: compiled.normalized_rql,
472				compile_duration_us: compile_duration.as_micros() as u64,
473				execute_duration_us: execute_duration.as_micros() as u64,
474				rows_affected: if run_result.is_ok() {
475					extract_rows_affected(&result)
476				} else {
477					0
478				},
479			});
480
481			if let Err(e) = run_result {
482				return error_result(e, build_metrics(metrics));
483			}
484
485			if compiled.is_output {
486				output_results.append(&mut result);
487			}
488		}
489		ExecutionResult {
490			frames: merge_results(output_results, result),
491			error: None,
492			metrics: build_metrics(metrics),
493		}
494	}
495
496	#[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
497	pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> ExecutionResult {
498		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Test(Box::new(txn.reborrow()))) {
499			Ok(s) => s,
500			Err(e) => {
501				return ExecutionResult {
502					frames: vec![],
503					error: Some(e),
504					metrics: ExecutionMetrics::default(),
505				};
506			}
507		};
508
509		let session_type = txn.session_type;
510		let session_default_deny = txn.session_default_deny;
511		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
512			&mut Transaction::Test(Box::new(txn.reborrow())),
513			session_type,
514			session_default_deny,
515		) {
516			return ExecutionResult {
517				frames: vec![],
518				error: Some(e),
519				metrics: ExecutionMetrics::default(),
520			};
521		}
522
523		let start_compile = self.0.runtime_context.clock.instant();
524		match self.compiler.compile_with_policy(
525			&mut Transaction::Test(Box::new(txn.reborrow())),
526			cmd.rql,
527			inject_from_policies,
528		) {
529			Err(err) => {
530				#[cfg(not(reifydb_single_threaded))]
531				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
532					return ExecutionResult {
533						frames,
534						error: None,
535						metrics: ExecutionMetrics::default(),
536					};
537				}
538				ExecutionResult {
539					frames: vec![],
540					error: Some(err),
541					metrics: ExecutionMetrics::default(),
542				}
543			}
544			Ok(CompilationResult::Ready(compiled)) => {
545				let compile_duration = start_compile.elapsed();
546				match execute_compiled_units(
547					&self.0,
548					&mut Transaction::Test(Box::new(txn.reborrow())),
549					&compiled,
550					&cmd.params,
551					symbols,
552					compile_duration,
553				) {
554					Ok((output, remaining, _, metrics)) => ExecutionResult {
555						frames: merge_results(output, remaining),
556						error: None,
557						metrics: build_metrics(metrics),
558					},
559					Err(f) => ExecutionResult {
560						frames: vec![],
561						error: Some(f.error),
562						metrics: build_metrics(f.partial_metrics),
563					},
564				}
565			}
566			Ok(CompilationResult::Incremental(mut state)) => {
567				let policy = constrain_policy(|plans, bump, cat, tx| {
568					inject_from_policies(plans, bump, cat, tx)
569				});
570				let mut result = vec![];
571				let mut output_results: Vec<Frame> = Vec::new();
572				let mut symbols = symbols;
573				let mut metrics = Vec::new();
574				loop {
575					let start_incr = self.0.runtime_context.clock.instant();
576					let next = match self.compiler.compile_next_with_policy(
577						&mut Transaction::Test(Box::new(txn.reborrow())),
578						&mut state,
579						&policy,
580					) {
581						Ok(n) => n,
582						Err(e) => {
583							return ExecutionResult {
584								frames: vec![],
585								error: Some(e),
586								metrics: build_metrics(metrics),
587							};
588						}
589					};
590					let compile_duration = start_incr.elapsed();
591
592					let Some(compiled) = next else {
593						break;
594					};
595
596					result.clear();
597					let mut tx = Transaction::Test(Box::new(txn.reborrow()));
598					let mut vm = Vm::from_services(symbols, &self.0, &cmd.params, tx.identity());
599					let start_execute = self.0.runtime_context.clock.instant();
600					let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
601					let execute_duration = start_execute.elapsed();
602					symbols = vm.symbols;
603
604					metrics.push(StatementMetric {
605						fingerprint: compiled.fingerprint,
606						normalized_rql: compiled.normalized_rql,
607						compile_duration_us: compile_duration.as_micros() as u64,
608						execute_duration_us: execute_duration.as_micros() as u64,
609						rows_affected: if run_result.is_ok() {
610							extract_rows_affected(&result)
611						} else {
612							0
613						},
614					});
615
616					if let Err(e) = run_result {
617						return ExecutionResult {
618							frames: vec![],
619							error: Some(e),
620							metrics: build_metrics(metrics),
621						};
622					}
623
624					if compiled.is_output {
625						output_results.append(&mut result);
626					}
627				}
628				ExecutionResult {
629					frames: merge_results(output_results, result),
630					error: None,
631					metrics: build_metrics(metrics),
632				}
633			}
634		}
635	}
636
637	#[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
638	pub fn subscription(&self, txn: &mut QueryTransaction, cmd: Subscription<'_>) -> ExecutionResult {
639		// Pre-compilation validation: parse and check statement constraints
640		let bump = Bump::new();
641		let statements = match parse_str(&bump, cmd.rql) {
642			Ok(s) => s,
643			Err(e) => {
644				return ExecutionResult {
645					frames: vec![],
646					error: Some(e),
647					metrics: ExecutionMetrics::default(),
648				};
649			}
650		};
651
652		if statements.len() != 1 {
653			return ExecutionResult {
654				frames: vec![],
655				error: Some(Error(Box::new(subscription::single_statement_required(
656					"Subscription endpoint requires exactly one statement",
657				)))),
658				metrics: ExecutionMetrics::default(),
659			};
660		}
661
662		let statement = &statements[0];
663		if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
664			return ExecutionResult {
665				frames: vec![],
666				error: Some(Error(Box::new(subscription::invalid_statement(
667					"Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
668				)))),
669				metrics: ExecutionMetrics::default(),
670			};
671		}
672
673		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Query(&mut *txn)) {
674			Ok(s) => s,
675			Err(e) => {
676				return ExecutionResult {
677					frames: vec![],
678					error: Some(e),
679					metrics: ExecutionMetrics::default(),
680				};
681			}
682		};
683
684		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
685			&mut Transaction::Query(&mut *txn),
686			SessionOp::Subscription,
687			true,
688		) {
689			return ExecutionResult {
690				frames: vec![],
691				error: Some(e),
692				metrics: ExecutionMetrics::default(),
693			};
694		}
695
696		let start_compile = self.0.runtime_context.clock.instant();
697		let compiled = match self.compiler.compile_with_policy(
698			&mut Transaction::Query(txn),
699			cmd.rql,
700			inject_from_policies,
701		) {
702			Ok(CompilationResult::Ready(compiled)) => compiled,
703			Ok(CompilationResult::Incremental(_)) => {
704				unreachable!("Single subscription statement should not require incremental compilation")
705			}
706			Err(err) => {
707				return ExecutionResult {
708					frames: vec![],
709					error: Some(err),
710					metrics: ExecutionMetrics::default(),
711				};
712			}
713		};
714		let compile_duration = start_compile.elapsed();
715
716		match execute_compiled_units(
717			&self.0,
718			&mut Transaction::Query(txn),
719			&compiled,
720			&cmd.params,
721			symbols,
722			compile_duration,
723		) {
724			Ok((output, remaining, _, metrics)) => ExecutionResult {
725				frames: merge_results(output, remaining),
726				error: None,
727				metrics: build_metrics(metrics),
728			},
729			Err(f) => ExecutionResult {
730				frames: vec![],
731				error: Some(f.error),
732				metrics: build_metrics(f.partial_metrics),
733			},
734		}
735	}
736
737	#[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
738	pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> ExecutionResult {
739		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn)) {
740			Ok(s) => s,
741			Err(e) => {
742				return ExecutionResult {
743					frames: vec![],
744					error: Some(e),
745					metrics: ExecutionMetrics::default(),
746				};
747			}
748		};
749
750		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
751			&mut Transaction::Command(&mut *txn),
752			SessionOp::Command,
753			false,
754		) {
755			return ExecutionResult {
756				frames: vec![],
757				error: Some(e),
758				metrics: ExecutionMetrics::default(),
759			};
760		}
761
762		let start_compile = self.0.runtime_context.clock.instant();
763		let compiled = match self.compiler.compile_with_policy(
764			&mut Transaction::Command(txn),
765			cmd.rql,
766			inject_from_policies,
767		) {
768			Ok(CompilationResult::Ready(compiled)) => compiled,
769			Ok(CompilationResult::Incremental(_)) => {
770				unreachable!("DDL statements require admin transactions, not command transactions")
771			}
772			Err(err) => {
773				#[cfg(not(reifydb_single_threaded))]
774				if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
775					return ExecutionResult {
776						frames: vec![],
777						error: Some(Error(Box::new(Diagnostic {
778							code: "REMOTE_002".to_string(),
779							message: "Write operations on remote namespaces are not supported"
780								.to_string(),
781							help: Some("Use the remote instance directly for write operations"
782								.to_string()),
783							..Default::default()
784						}))),
785						metrics: ExecutionMetrics::default(),
786					};
787				}
788				return ExecutionResult {
789					frames: vec![],
790					error: Some(err),
791					metrics: ExecutionMetrics::default(),
792				};
793			}
794		};
795		let compile_duration = start_compile.elapsed();
796
797		match execute_compiled_units(
798			&self.0,
799			&mut Transaction::Command(txn),
800			&compiled,
801			&cmd.params,
802			symbols,
803			compile_duration,
804		) {
805			Ok((output, remaining, _, metrics)) => ExecutionResult {
806				frames: merge_results(output, remaining),
807				error: None,
808				metrics: build_metrics(metrics),
809			},
810			Err(f) => ExecutionResult {
811				frames: vec![],
812				error: Some(f.error),
813				metrics: build_metrics(f.partial_metrics),
814			},
815		}
816	}
817
818	/// Call a procedure by fully-qualified name (e.g., "banking.transfer_funds").
819	#[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
820	pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> ExecutionResult {
821		let rql = format!("CALL {}()", name);
822		let symbols = match self.setup_symbols(params, &mut Transaction::Command(&mut *txn)) {
823			Ok(s) => s,
824			Err(e) => {
825				return ExecutionResult {
826					frames: vec![],
827					error: Some(e),
828					metrics: ExecutionMetrics::default(),
829				};
830			}
831		};
832
833		let start_compile = self.0.runtime_context.clock.instant();
834		let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql) {
835			Ok(CompilationResult::Ready(compiled)) => compiled,
836			Ok(CompilationResult::Incremental(_)) => {
837				unreachable!("CALL statements should not require incremental compilation")
838			}
839			Err(e) => {
840				return ExecutionResult {
841					frames: vec![],
842					error: Some(e),
843					metrics: ExecutionMetrics::default(),
844				};
845			}
846		};
847		let compile_duration = start_compile.elapsed();
848		let compile_duration_us = compile_duration.as_micros() as u64 / compiled.len().max(1) as u64;
849
850		let mut result = vec![];
851		let mut metrics = Vec::new();
852		let mut symbols = symbols;
853		for compiled in compiled.iter() {
854			result.clear();
855			let mut tx = Transaction::Command(txn);
856			let mut vm = Vm::from_services(symbols, &self.0, params, tx.identity());
857			let start_execute = self.0.runtime_context.clock.instant();
858			let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
859			let execute_duration = start_execute.elapsed();
860			symbols = vm.symbols;
861
862			metrics.push(StatementMetric {
863				fingerprint: compiled.fingerprint,
864				normalized_rql: compiled.normalized_rql.clone(),
865				compile_duration_us,
866				execute_duration_us: execute_duration.as_micros() as u64,
867				rows_affected: if run_result.is_ok() {
868					extract_rows_affected(&result)
869				} else {
870					0
871				},
872			});
873
874			if let Err(e) = run_result {
875				return ExecutionResult {
876					frames: vec![],
877					error: Some(e),
878					metrics: build_metrics(metrics),
879				};
880			}
881		}
882
883		ExecutionResult {
884			frames: result,
885			error: None,
886			metrics: build_metrics(metrics),
887		}
888	}
889
890	#[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
891	pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> ExecutionResult {
892		let symbols = match self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn)) {
893			Ok(s) => s,
894			Err(e) => {
895				return ExecutionResult {
896					frames: vec![],
897					error: Some(e),
898					metrics: ExecutionMetrics::default(),
899				};
900			}
901		};
902
903		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
904			&mut Transaction::Query(&mut *txn),
905			SessionOp::Query,
906			false,
907		) {
908			return ExecutionResult {
909				frames: vec![],
910				error: Some(e),
911				metrics: ExecutionMetrics::default(),
912			};
913		}
914
915		let start_compile = self.0.runtime_context.clock.instant();
916		let compiled = match self.compiler.compile_with_policy(
917			&mut Transaction::Query(txn),
918			qry.rql,
919			inject_from_policies,
920		) {
921			Ok(CompilationResult::Ready(compiled)) => compiled,
922			Ok(CompilationResult::Incremental(_)) => {
923				unreachable!("DDL statements require admin transactions, not query transactions")
924			}
925			Err(err) => {
926				#[cfg(not(reifydb_single_threaded))]
927				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, qry.rql, qry.params) {
928					return ExecutionResult {
929						frames,
930						error: None,
931						metrics: ExecutionMetrics::default(),
932					};
933				}
934				return ExecutionResult {
935					frames: vec![],
936					error: Some(err),
937					metrics: ExecutionMetrics::default(),
938				};
939			}
940		};
941		let compile_duration = start_compile.elapsed();
942
943		let exec_result = execute_compiled_units(
944			&self.0,
945			&mut Transaction::Query(txn),
946			&compiled,
947			&qry.params,
948			symbols,
949			compile_duration,
950		);
951
952		match exec_result {
953			Ok((output, remaining, _, metrics)) => ExecutionResult {
954				frames: merge_results(output, remaining),
955				error: None,
956				metrics: build_metrics(metrics),
957			},
958			Err(f) => ExecutionResult {
959				frames: vec![],
960				error: Some(f.error),
961				metrics: build_metrics(f.partial_metrics),
962			},
963		}
964	}
965}