Skip to main content

reifydb_engine/vm/
executor.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::FlowOperatorStore};
7use reifydb_core::util::ioc::IocContainer;
8use reifydb_function::registry::Functions;
9use reifydb_metric::metric::MetricReader;
10use reifydb_rql::compiler::CompilationResult;
11use reifydb_runtime::clock::Clock;
12use reifydb_store_single::SingleStore;
13use reifydb_transaction::transaction::{
14	Transaction, admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
15};
16use reifydb_type::{params::Params, value::frame::frame::Frame};
17use tracing::instrument;
18
19use crate::{
20	transform::registry::Transforms,
21	vm::{
22		Admin, Command, Query,
23		services::Services,
24		stack::{SymbolTable, Variable},
25		vm::Vm,
26	},
27};
28
29/// Executor is the orchestration layer for RQL statement execution.
30pub struct Executor(Arc<Services>);
31
32impl Clone for Executor {
33	fn clone(&self) -> Self {
34		Self(self.0.clone())
35	}
36}
37
38impl Deref for Executor {
39	type Target = Services;
40
41	fn deref(&self) -> &Self::Target {
42		&self.0
43	}
44}
45
46impl Executor {
47	pub fn new(
48		catalog: Catalog,
49		clock: Clock,
50		functions: Functions,
51		transforms: Transforms,
52		flow_operator_store: FlowOperatorStore,
53		stats_reader: MetricReader<SingleStore>,
54		ioc: IocContainer,
55	) -> Self {
56		Self(Arc::new(Services::new(
57			catalog,
58			clock,
59			functions,
60			transforms,
61			flow_operator_store,
62			stats_reader,
63			ioc,
64		)))
65	}
66
67	/// Get a reference to the underlying Services
68	pub fn services(&self) -> &Arc<Services> {
69		&self.0
70	}
71
72	#[allow(dead_code)]
73	pub fn testing() -> Self {
74		Self(Services::testing())
75	}
76}
77
78/// Populate a stack with parameters so they can be accessed as variables.
79fn populate_stack(stack: &mut SymbolTable, params: &Params) -> crate::Result<()> {
80	match params {
81		Params::Positional(values) => {
82			for (index, value) in values.iter().enumerate() {
83				let param_name = (index + 1).to_string();
84				stack.set(param_name, Variable::scalar(value.clone()), false)?;
85			}
86		}
87		Params::Named(map) => {
88			for (name, value) in map {
89				stack.set(name.clone(), Variable::scalar(value.clone()), false)?;
90			}
91		}
92		Params::None => {}
93	}
94	Ok(())
95}
96
97impl Executor {
98	#[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
99	pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> crate::Result<Vec<Frame>> {
100		let mut result = vec![];
101		let mut output_results: Vec<Frame> = Vec::new();
102		let mut symbol_table = SymbolTable::new();
103		populate_stack(&mut symbol_table, &cmd.params)?;
104
105		match self.compiler.compile(&mut Transaction::Admin(txn), cmd.rql)? {
106			CompilationResult::Ready(compiled) => {
107				for compiled in compiled.iter() {
108					result.clear();
109					let mut tx = Transaction::Admin(txn);
110					let mut vm = Vm::new(symbol_table);
111					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
112					symbol_table = vm.symbol_table;
113
114					if compiled.is_output {
115						output_results.append(&mut result);
116					}
117				}
118			}
119			CompilationResult::Incremental(mut state) => {
120				while let Some(compiled) =
121					self.compiler.compile_next(&mut Transaction::Admin(txn), &mut state)?
122				{
123					result.clear();
124					let mut tx = Transaction::Admin(txn);
125					let mut vm = Vm::new(symbol_table);
126					vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
127					symbol_table = vm.symbol_table;
128
129					if compiled.is_output {
130						output_results.append(&mut result);
131					}
132				}
133			}
134		}
135
136		let mut final_result = output_results;
137		final_result.append(&mut result);
138		Ok(final_result)
139	}
140
141	#[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
142	pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
143		let mut result = vec![];
144		let mut output_results: Vec<Frame> = Vec::new();
145		let mut symbol_table = SymbolTable::new();
146		populate_stack(&mut symbol_table, &cmd.params)?;
147
148		let compiled = match self.compiler.compile(&mut Transaction::Command(txn), cmd.rql)? {
149			CompilationResult::Ready(compiled) => compiled,
150			CompilationResult::Incremental(_) => {
151				unreachable!("DDL statements require admin transactions, not command transactions")
152			}
153		};
154
155		for compiled in compiled.iter() {
156			result.clear();
157			let mut tx = Transaction::Command(txn);
158			let mut vm = Vm::new(symbol_table);
159			vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
160			symbol_table = vm.symbol_table;
161
162			if compiled.is_output {
163				output_results.append(&mut result);
164			}
165		}
166
167		let mut final_result = output_results;
168		final_result.append(&mut result);
169		Ok(final_result)
170	}
171
172	#[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
173	pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
174		let mut result = vec![];
175		let mut output_results: Vec<Frame> = Vec::new();
176		let mut symbol_table = SymbolTable::new();
177		populate_stack(&mut symbol_table, &qry.params)?;
178
179		let compiled = match self.compiler.compile(&mut Transaction::Query(txn), qry.rql)? {
180			CompilationResult::Ready(compiled) => compiled,
181			CompilationResult::Incremental(_) => {
182				unreachable!("DDL statements require admin transactions, not query transactions")
183			}
184		};
185
186		for compiled in compiled.iter() {
187			result.clear();
188			let mut tx = Transaction::Query(txn);
189			let mut vm = Vm::new(symbol_table);
190			vm.run(&self.0, &mut tx, &compiled.instructions, &qry.params, &mut result)?;
191			symbol_table = vm.symbol_table;
192
193			if compiled.is_output {
194				output_results.append(&mut result);
195			}
196		}
197
198		let mut final_result = output_results;
199		final_result.append(&mut result);
200		Ok(final_result)
201	}
202}