Skip to main content

reifydb_engine/vm/
executor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, result::Result as StdResult, sync::Arc, time::Duration};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{
9	error::diagnostic::subscription,
10	execution::ExecutionResult,
11	interface::catalog::policy::SessionOp,
12	metric::{ExecutionMetrics, StatementMetric},
13	value::column::columns::Columns,
14};
15use reifydb_metric::storage::metric::MetricReader;
16use reifydb_policy::inject_from_policies;
17use reifydb_rql::{
18	ast::parse_str,
19	compiler::{CompilationResult, Compiled, IncrementalCompilation, constrain_policy},
20	fingerprint::request::fingerprint_request,
21};
22use reifydb_runtime::context::clock::Instant;
23use reifydb_store_single::SingleStore;
24use reifydb_transaction::transaction::{
25	RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
26	query::QueryTransaction,
27};
28#[cfg(not(reifydb_single_threaded))]
29use reifydb_type::error::Diagnostic;
30use reifydb_type::{
31	error::Error,
32	params::Params,
33	value::{Value, frame::frame::Frame, r#type::Type},
34};
35use tracing::instrument;
36
37#[cfg(not(reifydb_single_threaded))]
38use crate::remote;
39use crate::{
40	Result,
41	policy::PolicyEvaluator,
42	vm::{
43		Admin, Command, Query, Subscription, Test,
44		services::{EngineConfig, Services},
45		stack::{SymbolTable, Variable},
46		vm::Vm,
47	},
48};
49
/// Handle to the engine's shared [`Services`].
///
/// A newtype over `Arc<Services>`: cloning it clones the `Arc`, and it
/// dereferences to [`Services`], so service fields can be read directly
/// through an `Executor`.
pub struct Executor(Arc<Services>);
51
52impl Clone for Executor {
53	fn clone(&self) -> Self {
54		Self(self.0.clone())
55	}
56}
57
58impl Deref for Executor {
59	type Target = Services;
60
61	fn deref(&self) -> &Self::Target {
62		&self.0
63	}
64}
65
66impl Executor {
67	pub fn new(
68		catalog: Catalog,
69		config: EngineConfig,
70		flow_operator_store: SystemFlowOperatorStore,
71		stats_reader: MetricReader<SingleStore>,
72	) -> Self {
73		Self(Arc::new(Services::new(catalog, config, flow_operator_store, stats_reader)))
74	}
75
76	pub fn services(&self) -> &Arc<Services> {
77		&self.0
78	}
79
80	pub fn from_services(services: Arc<Services>) -> Self {
81		Self(services)
82	}
83
84	#[allow(dead_code)]
85	pub fn testing() -> Self {
86		Self(Services::testing())
87	}
88
89	#[cfg(not(reifydb_single_threaded))]
90	fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
91		if let Some(ref registry) = self.0.remote_registry
92			&& remote::is_remote_query(err)
93			&& let Some(address) = remote::extract_remote_address(err)
94		{
95			let token = remote::extract_remote_token(err);
96			return registry.forward_query(&address, rql, params, token.as_deref()).map(Some);
97		}
98		Ok(None)
99	}
100}
101
impl RqlExecutor for Executor {
	/// Trait entry point; delegates to the inherent [`Executor::rql`].
	fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
		// Fully-qualified call so this resolves to the inherent method on
		// `Executor` rather than back to this trait method.
		Executor::rql(self, tx, rql, params)
	}
}
107
108fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
109	match params {
110		Params::Positional(values) => {
111			for (index, value) in values.iter().enumerate() {
112				let param_name = (index + 1).to_string();
113				symbols.set(param_name, Variable::scalar(value.clone()), false)?;
114			}
115		}
116		Params::Named(map) => {
117			for (name, value) in map.iter() {
118				symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
119			}
120		}
121		Params::None => {}
122	}
123	Ok(())
124}
125
126fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
127	let identity = tx.identity();
128	if identity.is_privileged() {
129		return Ok(());
130	}
131	if identity.is_anonymous() {
132		let columns = Columns::single_row([
133			("id", Value::IdentityId(identity)),
134			("name", Value::none_of(Type::Utf8)),
135			("roles", Value::List(vec![])),
136		]);
137		symbols.set("identity".to_string(), Variable::columns(columns), false)?;
138		return Ok(());
139	}
140	if let Some(user) = catalog.find_identity(tx, identity)? {
141		let roles = catalog.find_role_names_for_identity(tx, identity)?;
142		let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
143		let columns = Columns::single_row([
144			("id", Value::IdentityId(identity)),
145			("name", Value::Utf8(user.name)),
146			("roles", Value::List(role_values)),
147		]);
148		symbols.set("identity".to_string(), Variable::columns(columns), false)?;
149	}
150	Ok(())
151}
152
/// Success value of `execute_compiled_units`, in tuple order:
/// frames gathered from units flagged `is_output`, the frames left in the
/// working buffer after the last unit, the symbol table after the last unit,
/// and one metric per executed unit.
type CompiledUnitsResult = (Vec<Frame>, Vec<Frame>, SymbolTable, Vec<StatementMetric>);
154
/// Failure value of `execute_compiled_units`.
struct ExecutionFailure {
	// The error returned by the unit that failed.
	error: Error,
	// Metrics for every unit run so far, including the failing one.
	partial_metrics: Vec<StatementMetric>,
}
159
160fn build_metrics(statements: Vec<StatementMetric>) -> ExecutionMetrics {
161	let fps: Vec<_> = statements.iter().map(|m| m.fingerprint).collect();
162	ExecutionMetrics {
163		fingerprint: fingerprint_request(&fps),
164		statements,
165		..Default::default()
166	}
167}
168
/// What `run_compiled_unit` hands back after running one compiled unit.
struct RunUnitOutcome {
	// Symbol table taken back out of the VM after the run.
	symbols: SymbolTable,
	// Result of `Vm::run` for this unit.
	run_result: Result<()>,
	// Elapsed time of the `Vm::run` call, in microseconds.
	execute_duration_us: u64,
}
174
175#[instrument(
176	name = "vm::run",
177	level = "debug",
178	skip_all,
179	fields(fingerprint = ?compiled.fingerprint, instr_count = compiled.instructions.len()),
180)]
181fn run_compiled_unit(
182	services: &Arc<Services>,
183	tx: &mut Transaction<'_>,
184	compiled: &Compiled,
185	params: &Params,
186	symbols: SymbolTable,
187	result: &mut Vec<Frame>,
188) -> RunUnitOutcome {
189	let mut vm = Vm::from_services(symbols, services, params, tx.identity());
190	let start = services.runtime_context.clock.instant();
191	let run_result = vm.run(services, tx, &compiled.instructions, result);
192	let execute_duration = start.elapsed();
193	RunUnitOutcome {
194		symbols: vm.symbols,
195		run_result,
196		execute_duration_us: execute_duration.as_micros() as u64,
197	}
198}
199
200#[instrument(
201	name = "executor::execute_units",
202	level = "debug",
203	skip_all,
204	fields(unit_count = compiled_list.len()),
205)]
206fn execute_compiled_units(
207	services: &Arc<Services>,
208	tx: &mut Transaction<'_>,
209	compiled_list: &[Compiled],
210	params: &Params,
211	mut symbols: SymbolTable,
212	compile_duration: Duration,
213) -> StdResult<CompiledUnitsResult, ExecutionFailure> {
214	let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
215	let mut result = vec![];
216	let mut output_results: Vec<Frame> = Vec::new();
217	let mut metrics = Vec::new();
218
219	for compiled in compiled_list.iter() {
220		result.clear();
221		let outcome = run_compiled_unit(services, tx, compiled, params, symbols, &mut result);
222		symbols = outcome.symbols;
223
224		metrics.push(StatementMetric {
225			fingerprint: compiled.fingerprint,
226			normalized_rql: compiled.normalized_rql.clone(),
227			compile_duration_us,
228			execute_duration_us: outcome.execute_duration_us,
229			rows_affected: if outcome.run_result.is_ok() {
230				extract_rows_affected(&result)
231			} else {
232				0
233			},
234		});
235
236		if let Err(error) = outcome.run_result {
237			return Err(ExecutionFailure {
238				error,
239				partial_metrics: metrics,
240			});
241		}
242
243		if compiled.is_output {
244			output_results.append(&mut result);
245		}
246	}
247
248	Ok((output_results, result, symbols, metrics))
249}
250
251fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
252	output_results.append(&mut remaining);
253	output_results
254}
255
256#[inline]
257fn error_result(error: Error, metrics: ExecutionMetrics) -> ExecutionResult {
258	ExecutionResult {
259		frames: vec![],
260		error: Some(error),
261		metrics,
262	}
263}
264
265fn extract_rows_affected(result: &[Frame]) -> u64 {
266	if result.len() == 1 {
267		let frame = &result[0];
268		for col in &frame.columns {
269			match col.name.as_str() {
270				"inserted" | "updated" | "deleted" => {
271					if col.data.len() == 1
272						&& let Value::Uint8(n) = col.data.get_value(0)
273					{
274						return n;
275					}
276				}
277				_ => {}
278			}
279		}
280	}
281	result.len() as u64
282}
283
284impl Executor {
285	#[instrument(name = "executor::setup_symbols", level = "debug", skip_all)]
286	fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
287		let mut symbols = SymbolTable::new();
288		populate_symbols(&mut symbols, params)?;
289		populate_identity(&mut symbols, &self.catalog, tx)?;
290		Ok(symbols)
291	}
292
	/// Compiles `rql` through the shared compiler, passing
	/// `inject_from_policies` as the policy hook.
	#[instrument(name = "executor::compile", level = "debug", skip(self, tx), fields(rql = %rql))]
	fn compile_query(&self, tx: &mut Transaction<'_>, rql: &str) -> Result<CompilationResult> {
		self.compiler.compile_with_policy(tx, rql, inject_from_policies)
	}
297
298	#[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
299	pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
300		let mut symbols = match self.setup_symbols(&params, tx) {
301			Ok(s) => s,
302			Err(e) => {
303				return ExecutionResult {
304					frames: vec![],
305					error: Some(e),
306					metrics: ExecutionMetrics::default(),
307				};
308			}
309		};
310
311		let start_compile = self.0.runtime_context.clock.instant();
312		let compiled_list = match self.compile_query(tx, rql) {
313			Ok(CompilationResult::Ready(compiled)) => compiled,
314			Ok(CompilationResult::Incremental(_)) => {
315				unreachable!("incremental compilation not supported in rql()")
316			}
317			Err(err) => {
318				#[cfg(not(reifydb_single_threaded))]
319				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
320					return ExecutionResult {
321						frames,
322						error: None,
323						metrics: ExecutionMetrics::default(),
324					};
325				}
326				return ExecutionResult {
327					frames: vec![],
328					error: Some(err),
329					metrics: ExecutionMetrics::default(),
330				};
331			}
332		};
333		let compile_duration = start_compile.elapsed();
334		let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
335
336		let mut result = vec![];
337		let mut metrics = Vec::new();
338		for compiled in compiled_list.iter() {
339			result.clear();
340			let outcome = run_compiled_unit(&self.0, tx, compiled, &params, symbols, &mut result);
341			symbols = outcome.symbols;
342
343			metrics.push(StatementMetric {
344				fingerprint: compiled.fingerprint,
345				normalized_rql: compiled.normalized_rql.clone(),
346				compile_duration_us,
347				execute_duration_us: outcome.execute_duration_us,
348				rows_affected: if outcome.run_result.is_ok() {
349					extract_rows_affected(&result)
350				} else {
351					0
352				},
353			});
354
355			if let Err(e) = outcome.run_result {
356				return ExecutionResult {
357					frames: vec![],
358					error: Some(e),
359					metrics: build_metrics(metrics),
360				};
361			}
362		}
363
364		ExecutionResult {
365			frames: result,
366			error: None,
367			metrics: build_metrics(metrics),
368		}
369	}
370
371	#[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
372	pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> ExecutionResult {
373		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn)) {
374			Ok(s) => s,
375			Err(e) => return error_result(e, ExecutionMetrics::default()),
376		};
377		if let Err(e) = self.enforce_admin_policy(&symbols, txn) {
378			return error_result(e, ExecutionMetrics::default());
379		}
380		let start_compile = self.0.runtime_context.clock.instant();
381		match self.compile_query(&mut Transaction::Admin(txn), cmd.rql) {
382			Err(err) => self.handle_admin_compile_error(err, cmd.rql, cmd.params),
383			Ok(CompilationResult::Ready(compiled)) => {
384				self.execute_admin_ready(txn, compiled, &cmd.params, symbols, start_compile)
385			}
386			Ok(CompilationResult::Incremental(state)) => {
387				self.execute_admin_incremental(txn, state, &cmd.params, symbols)
388			}
389		}
390	}
391
	/// Enforces the session policy for `SessionOp::Admin` against the admin
	/// transaction, using `symbols` for evaluation.
	#[inline]
	fn enforce_admin_policy(&self, symbols: &SymbolTable, txn: &mut AdminTransaction) -> Result<()> {
		PolicyEvaluator::new(&self.0, symbols).enforce_session_policy(
			&mut Transaction::Admin(txn),
			SessionOp::Admin,
			// NOTE(review): presumably the default-deny flag (`test` passes
			// `session_default_deny` in this position) - confirm.
			true,
		)
	}
400
	/// Turns an admin compile error into an [`ExecutionResult`].
	///
	/// In multi-threaded builds the query is first offered to
	/// `try_forward_remote_query`; if that yields frames they are returned
	/// with default metrics. Otherwise the original compile error is returned.
	#[inline]
	#[cfg_attr(reifydb_single_threaded, allow(unused_variables))]
	fn handle_admin_compile_error(&self, err: Error, rql: &str, params: Params) -> ExecutionResult {
		// NOTE(review): an `Err` from the forward attempt is dropped here and
		// the original compile error is reported instead - confirm this is
		// the intended best-effort behaviour.
		#[cfg(not(reifydb_single_threaded))]
		if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
			return ExecutionResult {
				frames,
				error: None,
				metrics: ExecutionMetrics::default(),
			};
		}
		error_result(err, ExecutionMetrics::default())
	}
414
415	#[inline]
416	fn execute_admin_ready(
417		&self,
418		txn: &mut AdminTransaction,
419		compiled: Arc<Vec<Compiled>>,
420		params: &Params,
421		symbols: SymbolTable,
422		start_compile: Instant,
423	) -> ExecutionResult {
424		let compile_duration = start_compile.elapsed();
425		match execute_compiled_units(
426			&self.0,
427			&mut Transaction::Admin(txn),
428			&compiled,
429			params,
430			symbols,
431			compile_duration,
432		) {
433			Ok((output, remaining, _, metrics)) => ExecutionResult {
434				frames: merge_results(output, remaining),
435				error: None,
436				metrics: build_metrics(metrics),
437			},
438			Err(f) => ExecutionResult {
439				frames: vec![],
440				error: Some(f.error),
441				metrics: build_metrics(f.partial_metrics),
442			},
443		}
444	}
445
446	fn execute_admin_incremental(
447		&self,
448		txn: &mut AdminTransaction,
449		mut state: IncrementalCompilation,
450		params: &Params,
451		symbols: SymbolTable,
452	) -> ExecutionResult {
453		let policy = constrain_policy(inject_from_policies);
454		let mut result = vec![];
455		let mut output_results: Vec<Frame> = Vec::new();
456		let mut symbols = symbols;
457		let mut metrics = Vec::new();
458		loop {
459			let start_incr = self.0.runtime_context.clock.instant();
460			let next = match self.compiler.compile_next_with_policy(
461				&mut Transaction::Admin(txn),
462				&mut state,
463				&policy,
464			) {
465				Ok(n) => n,
466				Err(e) => return error_result(e, build_metrics(metrics)),
467			};
468			let compile_duration = start_incr.elapsed();
469
470			let Some(compiled) = next else {
471				break;
472			};
473
474			result.clear();
475			let mut tx = Transaction::Admin(txn);
476			let mut vm = Vm::from_services(symbols, &self.0, params, tx.identity());
477			let start_execute = self.0.runtime_context.clock.instant();
478			let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
479			let execute_duration = start_execute.elapsed();
480			symbols = vm.symbols;
481
482			metrics.push(StatementMetric {
483				fingerprint: compiled.fingerprint,
484				normalized_rql: compiled.normalized_rql,
485				compile_duration_us: compile_duration.as_micros() as u64,
486				execute_duration_us: execute_duration.as_micros() as u64,
487				rows_affected: if run_result.is_ok() {
488					extract_rows_affected(&result)
489				} else {
490					0
491				},
492			});
493
494			if let Err(e) = run_result {
495				return error_result(e, build_metrics(metrics));
496			}
497
498			if compiled.is_output {
499				output_results.append(&mut result);
500			}
501		}
502		ExecutionResult {
503			frames: merge_results(output_results, result),
504			error: None,
505			metrics: build_metrics(metrics),
506		}
507	}
508
509	#[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
510	pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> ExecutionResult {
511		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Test(Box::new(txn.reborrow()))) {
512			Ok(s) => s,
513			Err(e) => {
514				return ExecutionResult {
515					frames: vec![],
516					error: Some(e),
517					metrics: ExecutionMetrics::default(),
518				};
519			}
520		};
521
522		let session_type = txn.session_type;
523		let session_default_deny = txn.session_default_deny;
524		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
525			&mut Transaction::Test(Box::new(txn.reborrow())),
526			session_type,
527			session_default_deny,
528		) {
529			return ExecutionResult {
530				frames: vec![],
531				error: Some(e),
532				metrics: ExecutionMetrics::default(),
533			};
534		}
535
536		let start_compile = self.0.runtime_context.clock.instant();
537		match self.compiler.compile_with_policy(
538			&mut Transaction::Test(Box::new(txn.reborrow())),
539			cmd.rql,
540			inject_from_policies,
541		) {
542			Err(err) => {
543				#[cfg(not(reifydb_single_threaded))]
544				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
545					return ExecutionResult {
546						frames,
547						error: None,
548						metrics: ExecutionMetrics::default(),
549					};
550				}
551				ExecutionResult {
552					frames: vec![],
553					error: Some(err),
554					metrics: ExecutionMetrics::default(),
555				}
556			}
557			Ok(CompilationResult::Ready(compiled)) => {
558				let compile_duration = start_compile.elapsed();
559				match execute_compiled_units(
560					&self.0,
561					&mut Transaction::Test(Box::new(txn.reborrow())),
562					&compiled,
563					&cmd.params,
564					symbols,
565					compile_duration,
566				) {
567					Ok((output, remaining, _, metrics)) => ExecutionResult {
568						frames: merge_results(output, remaining),
569						error: None,
570						metrics: build_metrics(metrics),
571					},
572					Err(f) => ExecutionResult {
573						frames: vec![],
574						error: Some(f.error),
575						metrics: build_metrics(f.partial_metrics),
576					},
577				}
578			}
579			Ok(CompilationResult::Incremental(mut state)) => {
580				let policy = constrain_policy(|plans, bump, cat, tx| {
581					inject_from_policies(plans, bump, cat, tx)
582				});
583				let mut result = vec![];
584				let mut output_results: Vec<Frame> = Vec::new();
585				let mut symbols = symbols;
586				let mut metrics = Vec::new();
587				loop {
588					let start_incr = self.0.runtime_context.clock.instant();
589					let next = match self.compiler.compile_next_with_policy(
590						&mut Transaction::Test(Box::new(txn.reborrow())),
591						&mut state,
592						&policy,
593					) {
594						Ok(n) => n,
595						Err(e) => {
596							return ExecutionResult {
597								frames: vec![],
598								error: Some(e),
599								metrics: build_metrics(metrics),
600							};
601						}
602					};
603					let compile_duration = start_incr.elapsed();
604
605					let Some(compiled) = next else {
606						break;
607					};
608
609					result.clear();
610					let mut tx = Transaction::Test(Box::new(txn.reborrow()));
611					let mut vm = Vm::from_services(symbols, &self.0, &cmd.params, tx.identity());
612					let start_execute = self.0.runtime_context.clock.instant();
613					let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
614					let execute_duration = start_execute.elapsed();
615					symbols = vm.symbols;
616
617					metrics.push(StatementMetric {
618						fingerprint: compiled.fingerprint,
619						normalized_rql: compiled.normalized_rql,
620						compile_duration_us: compile_duration.as_micros() as u64,
621						execute_duration_us: execute_duration.as_micros() as u64,
622						rows_affected: if run_result.is_ok() {
623							extract_rows_affected(&result)
624						} else {
625							0
626						},
627					});
628
629					if let Err(e) = run_result {
630						return ExecutionResult {
631							frames: vec![],
632							error: Some(e),
633							metrics: build_metrics(metrics),
634						};
635					}
636
637					if compiled.is_output {
638						output_results.append(&mut result);
639					}
640				}
641				ExecutionResult {
642					frames: merge_results(output_results, result),
643					error: None,
644					metrics: build_metrics(metrics),
645				}
646			}
647		}
648	}
649
650	#[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
651	pub fn subscription(&self, txn: &mut QueryTransaction, cmd: Subscription<'_>) -> ExecutionResult {
652		let bump = Bump::new();
653		let statements = match parse_str(&bump, cmd.rql) {
654			Ok(s) => s,
655			Err(e) => {
656				return ExecutionResult {
657					frames: vec![],
658					error: Some(e),
659					metrics: ExecutionMetrics::default(),
660				};
661			}
662		};
663
664		if statements.len() != 1 {
665			return ExecutionResult {
666				frames: vec![],
667				error: Some(Error(Box::new(subscription::single_statement_required(
668					"Subscription endpoint requires exactly one statement",
669				)))),
670				metrics: ExecutionMetrics::default(),
671			};
672		}
673
674		let statement = &statements[0];
675		if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
676			return ExecutionResult {
677				frames: vec![],
678				error: Some(Error(Box::new(subscription::invalid_statement(
679					"Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
680				)))),
681				metrics: ExecutionMetrics::default(),
682			};
683		}
684
685		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Query(&mut *txn)) {
686			Ok(s) => s,
687			Err(e) => {
688				return ExecutionResult {
689					frames: vec![],
690					error: Some(e),
691					metrics: ExecutionMetrics::default(),
692				};
693			}
694		};
695
696		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
697			&mut Transaction::Query(&mut *txn),
698			SessionOp::Subscription,
699			true,
700		) {
701			return ExecutionResult {
702				frames: vec![],
703				error: Some(e),
704				metrics: ExecutionMetrics::default(),
705			};
706		}
707
708		let start_compile = self.0.runtime_context.clock.instant();
709		let compiled = match self.compiler.compile_with_policy(
710			&mut Transaction::Query(txn),
711			cmd.rql,
712			inject_from_policies,
713		) {
714			Ok(CompilationResult::Ready(compiled)) => compiled,
715			Ok(CompilationResult::Incremental(_)) => {
716				unreachable!("Single subscription statement should not require incremental compilation")
717			}
718			Err(err) => {
719				return ExecutionResult {
720					frames: vec![],
721					error: Some(err),
722					metrics: ExecutionMetrics::default(),
723				};
724			}
725		};
726		let compile_duration = start_compile.elapsed();
727
728		match execute_compiled_units(
729			&self.0,
730			&mut Transaction::Query(txn),
731			&compiled,
732			&cmd.params,
733			symbols,
734			compile_duration,
735		) {
736			Ok((output, remaining, _, metrics)) => ExecutionResult {
737				frames: merge_results(output, remaining),
738				error: None,
739				metrics: build_metrics(metrics),
740			},
741			Err(f) => ExecutionResult {
742				frames: vec![],
743				error: Some(f.error),
744				metrics: build_metrics(f.partial_metrics),
745			},
746		}
747	}
748
749	#[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
750	pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> ExecutionResult {
751		let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn)) {
752			Ok(s) => s,
753			Err(e) => {
754				return ExecutionResult {
755					frames: vec![],
756					error: Some(e),
757					metrics: ExecutionMetrics::default(),
758				};
759			}
760		};
761
762		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
763			&mut Transaction::Command(&mut *txn),
764			SessionOp::Command,
765			false,
766		) {
767			return ExecutionResult {
768				frames: vec![],
769				error: Some(e),
770				metrics: ExecutionMetrics::default(),
771			};
772		}
773
774		let start_compile = self.0.runtime_context.clock.instant();
775		let compiled = match self.compile_query(&mut Transaction::Command(txn), cmd.rql) {
776			Ok(CompilationResult::Ready(compiled)) => compiled,
777			Ok(CompilationResult::Incremental(_)) => {
778				unreachable!("DDL statements require admin transactions, not command transactions")
779			}
780			Err(err) => {
781				#[cfg(not(reifydb_single_threaded))]
782				if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
783					return ExecutionResult {
784						frames: vec![],
785						error: Some(Error(Box::new(Diagnostic {
786							code: "REMOTE_002".to_string(),
787							message: "Write operations on remote namespaces are not supported"
788								.to_string(),
789							help: Some("Use the remote instance directly for write operations"
790								.to_string()),
791							..Default::default()
792						}))),
793						metrics: ExecutionMetrics::default(),
794					};
795				}
796				return ExecutionResult {
797					frames: vec![],
798					error: Some(err),
799					metrics: ExecutionMetrics::default(),
800				};
801			}
802		};
803		let compile_duration = start_compile.elapsed();
804
805		match execute_compiled_units(
806			&self.0,
807			&mut Transaction::Command(txn),
808			&compiled,
809			&cmd.params,
810			symbols,
811			compile_duration,
812		) {
813			Ok((output, remaining, _, metrics)) => ExecutionResult {
814				frames: merge_results(output, remaining),
815				error: None,
816				metrics: build_metrics(metrics),
817			},
818			Err(f) => ExecutionResult {
819				frames: vec![],
820				error: Some(f.error),
821				metrics: build_metrics(f.partial_metrics),
822			},
823		}
824	}
825
826	#[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
827	pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> ExecutionResult {
828		let rql = format!("CALL {}()", name);
829		let symbols = match self.setup_symbols(params, &mut Transaction::Command(&mut *txn)) {
830			Ok(s) => s,
831			Err(e) => {
832				return ExecutionResult {
833					frames: vec![],
834					error: Some(e),
835					metrics: ExecutionMetrics::default(),
836				};
837			}
838		};
839
840		let start_compile = self.0.runtime_context.clock.instant();
841		let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql) {
842			Ok(CompilationResult::Ready(compiled)) => compiled,
843			Ok(CompilationResult::Incremental(_)) => {
844				unreachable!("CALL statements should not require incremental compilation")
845			}
846			Err(e) => {
847				return ExecutionResult {
848					frames: vec![],
849					error: Some(e),
850					metrics: ExecutionMetrics::default(),
851				};
852			}
853		};
854		let compile_duration = start_compile.elapsed();
855		let compile_duration_us = compile_duration.as_micros() as u64 / compiled.len().max(1) as u64;
856
857		let mut result = vec![];
858		let mut metrics = Vec::new();
859		let mut symbols = symbols;
860		for compiled in compiled.iter() {
861			result.clear();
862			let mut tx = Transaction::Command(txn);
863			let mut vm = Vm::from_services(symbols, &self.0, params, tx.identity());
864			let start_execute = self.0.runtime_context.clock.instant();
865			let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
866			let execute_duration = start_execute.elapsed();
867			symbols = vm.symbols;
868
869			metrics.push(StatementMetric {
870				fingerprint: compiled.fingerprint,
871				normalized_rql: compiled.normalized_rql.clone(),
872				compile_duration_us,
873				execute_duration_us: execute_duration.as_micros() as u64,
874				rows_affected: if run_result.is_ok() {
875					extract_rows_affected(&result)
876				} else {
877					0
878				},
879			});
880
881			if let Err(e) = run_result {
882				return ExecutionResult {
883					frames: vec![],
884					error: Some(e),
885					metrics: build_metrics(metrics),
886				};
887			}
888		}
889
890		ExecutionResult {
891			frames: result,
892			error: None,
893			metrics: build_metrics(metrics),
894		}
895	}
896
897	#[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
898	pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> ExecutionResult {
899		let symbols = match self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn)) {
900			Ok(s) => s,
901			Err(e) => {
902				return ExecutionResult {
903					frames: vec![],
904					error: Some(e),
905					metrics: ExecutionMetrics::default(),
906				};
907			}
908		};
909
910		if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
911			&mut Transaction::Query(&mut *txn),
912			SessionOp::Query,
913			false,
914		) {
915			return ExecutionResult {
916				frames: vec![],
917				error: Some(e),
918				metrics: ExecutionMetrics::default(),
919			};
920		}
921
922		let start_compile = self.0.runtime_context.clock.instant();
923		let compiled = match self.compile_query(&mut Transaction::Query(txn), qry.rql) {
924			Ok(CompilationResult::Ready(compiled)) => compiled,
925			Ok(CompilationResult::Incremental(_)) => {
926				unreachable!("DDL statements require admin transactions, not query transactions")
927			}
928			Err(err) => {
929				#[cfg(not(reifydb_single_threaded))]
930				if let Ok(Some(frames)) = self.try_forward_remote_query(&err, qry.rql, qry.params) {
931					return ExecutionResult {
932						frames,
933						error: None,
934						metrics: ExecutionMetrics::default(),
935					};
936				}
937				return ExecutionResult {
938					frames: vec![],
939					error: Some(err),
940					metrics: ExecutionMetrics::default(),
941				};
942			}
943		};
944		let compile_duration = start_compile.elapsed();
945
946		let exec_result = execute_compiled_units(
947			&self.0,
948			&mut Transaction::Query(txn),
949			&compiled,
950			&qry.params,
951			symbols,
952			compile_duration,
953		);
954
955		match exec_result {
956			Ok((output, remaining, _, metrics)) => ExecutionResult {
957				frames: merge_results(output, remaining),
958				error: None,
959				metrics: build_metrics(metrics),
960			},
961			Err(f) => ExecutionResult {
962				frames: vec![],
963				error: Some(f.error),
964				metrics: build_metrics(f.partial_metrics),
965			},
966		}
967	}
968}