1use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::FlowOperatorStore};
7use reifydb_core::{util::ioc::IocContainer, value::column::columns::Columns};
8use reifydb_function::registry::Functions;
9use reifydb_metric::metric::MetricReader;
10use reifydb_policy::inject_read_policies;
11use reifydb_rql::compiler::{CompilationResult, constrain_policy};
12use reifydb_runtime::clock::Clock;
13use reifydb_store_single::SingleStore;
14use reifydb_transaction::transaction::{
15 Transaction, admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
16};
17#[cfg(not(target_arch = "wasm32"))]
18use reifydb_type::error::{Diagnostic, Error};
19use reifydb_type::{
20 params::Params,
21 value::{Value, frame::frame::Frame, identity::IdentityId, r#type::Type},
22};
23use tracing::instrument;
24
25#[cfg(not(target_arch = "wasm32"))]
26use crate::remote::{self, RemoteRegistry};
27use crate::{
28 Result,
29 policy::PolicyEvaluator,
30 procedure::registry::Procedures,
31 transform::registry::Transforms,
32 vm::{
33 Admin, Command, Query,
34 services::Services,
35 stack::{SymbolTable, Variable},
36 vm::Vm,
37 },
38};
39
40pub struct Executor(Arc<Services>);
42
43impl Clone for Executor {
44 fn clone(&self) -> Self {
45 Self(self.0.clone())
46 }
47}
48
49impl Deref for Executor {
50 type Target = Services;
51
52 fn deref(&self) -> &Self::Target {
53 &self.0
54 }
55}
56
57impl Executor {
58 pub fn new(
59 catalog: Catalog,
60 clock: Clock,
61 functions: Functions,
62 procedures: Procedures,
63 transforms: Transforms,
64 flow_operator_store: FlowOperatorStore,
65 stats_reader: MetricReader<SingleStore>,
66 ioc: IocContainer,
67 #[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
68 ) -> Self {
69 Self(Arc::new(Services::new(
70 catalog,
71 clock,
72 functions,
73 procedures,
74 transforms,
75 flow_operator_store,
76 stats_reader,
77 ioc,
78 #[cfg(not(target_arch = "wasm32"))]
79 remote_registry,
80 )))
81 }
82
83 pub fn services(&self) -> &Arc<Services> {
85 &self.0
86 }
87
88 pub fn from_services(services: Arc<Services>) -> Self {
90 Self(services)
91 }
92
93 #[allow(dead_code)]
94 pub fn testing() -> Self {
95 Self(Services::testing())
96 }
97
98 #[cfg(not(target_arch = "wasm32"))]
101 fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
102 if let Some(ref registry) = self.0.remote_registry {
103 if remote::is_remote_query(err) {
104 if let Some(address) = remote::extract_remote_address(err) {
105 return registry.forward_query(&address, rql, params).map(Some);
106 }
107 }
108 }
109 Ok(None)
110 }
111}
112
113fn populate_stack(stack: &mut SymbolTable, params: &Params) -> Result<()> {
115 match params {
116 Params::Positional(values) => {
117 for (index, value) in values.iter().enumerate() {
118 let param_name = (index + 1).to_string();
119 stack.set(param_name, Variable::scalar(value.clone()), false)?;
120 }
121 }
122 Params::Named(map) => {
123 for (name, value) in map {
124 stack.set(name.clone(), Variable::scalar(value.clone()), false)?;
125 }
126 }
127 Params::None => {}
128 }
129 Ok(())
130}
131
132fn populate_identity(
135 stack: &mut SymbolTable,
136 catalog: &Catalog,
137 tx: &mut Transaction<'_>,
138 identity: IdentityId,
139) -> Result<()> {
140 if identity.is_privileged() {
141 return Ok(());
142 }
143 if identity.is_anonymous() {
144 let columns = Columns::single_row([
145 ("id", Value::IdentityId(identity)),
146 ("name", Value::none_of(Type::Utf8)),
147 ("roles", Value::List(vec![])),
148 ]);
149 stack.set("identity".to_string(), Variable::Columns(columns), false)?;
150 return Ok(());
151 }
152 if let Some(user) = catalog.find_user_by_identity(tx, identity)? {
153 let roles = catalog.find_role_names_for_identity(tx, identity)?;
154 let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
155 let columns = Columns::single_row([
156 ("id", Value::IdentityId(identity)),
157 ("name", Value::Utf8(user.name)),
158 ("roles", Value::List(role_values)),
159 ]);
160 stack.set("identity".to_string(), Variable::Columns(columns), false)?;
161 }
162 Ok(())
163}
164
165impl Executor {
166 #[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
171 pub fn rql(
172 &self,
173 tx: &mut Transaction<'_>,
174 identity: IdentityId,
175 rql: &str,
176 params: Params,
177 ) -> Result<Vec<Frame>> {
178 let mut result = vec![];
179 let mut symbol_table = SymbolTable::new();
180 populate_stack(&mut symbol_table, ¶ms)?;
181 populate_identity(&mut symbol_table, &self.catalog, tx, identity)?;
182
183 let compiled = match self.compiler.compile_with_policy(tx, rql, |plans, bump, cat, tx| {
184 inject_read_policies(plans, bump, cat, tx, identity)
185 }) {
186 Ok(CompilationResult::Ready(compiled)) => compiled,
187 Ok(CompilationResult::Incremental(_)) => {
188 unreachable!("incremental compilation not supported in rql()")
189 }
190 Err(err) => {
191 #[cfg(not(target_arch = "wasm32"))]
192 if let Some(frames) = self.try_forward_remote_query(&err, rql, params)? {
193 return Ok(frames);
194 }
195 return Err(err);
196 }
197 };
198
199 for compiled in compiled.iter() {
200 result.clear();
201 let mut vm = Vm::new(symbol_table, identity);
202 vm.run(&self.0, tx, &compiled.instructions, ¶ms, &mut result)?;
203 symbol_table = vm.symbol_table;
204 }
205
206 Ok(result)
207 }
208
209 #[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
210 pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> Result<Vec<Frame>> {
211 let mut result = vec![];
212 let mut output_results: Vec<Frame> = Vec::new();
213 let mut symbol_table = SymbolTable::new();
214 populate_stack(&mut symbol_table, &cmd.params)?;
215
216 let identity = cmd.identity;
217 populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Admin(&mut *txn), identity)?;
218
219 PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
220 &mut Transaction::Admin(&mut *txn),
221 identity,
222 "admin",
223 true,
224 )?;
225
226 match self.compiler.compile_with_policy(
227 &mut Transaction::Admin(txn),
228 cmd.rql,
229 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
230 ) {
231 Err(err) => {
232 #[cfg(not(target_arch = "wasm32"))]
233 if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
234 return Ok(frames);
235 }
236 return Err(err);
237 }
238 Ok(CompilationResult::Ready(compiled)) => {
239 for compiled in compiled.iter() {
240 result.clear();
241 let mut tx = Transaction::Admin(txn);
242 let mut vm = Vm::new(symbol_table, identity);
243 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
244 symbol_table = vm.symbol_table;
245
246 if compiled.is_output {
247 output_results.append(&mut result);
248 }
249 }
250 }
251 Ok(CompilationResult::Incremental(mut state)) => {
252 let policy = constrain_policy(|plans, bump, cat, tx| {
253 inject_read_policies(plans, bump, cat, tx, identity)
254 });
255 while let Some(compiled) = self.compiler.compile_next_with_policy(
256 &mut Transaction::Admin(txn),
257 &mut state,
258 &policy,
259 )? {
260 result.clear();
261 let mut tx = Transaction::Admin(txn);
262 let mut vm = Vm::new(symbol_table, identity);
263 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
264 symbol_table = vm.symbol_table;
265
266 if compiled.is_output {
267 output_results.append(&mut result);
268 }
269 }
270 }
271 }
272
273 let mut final_result = output_results;
274 final_result.append(&mut result);
275 Ok(final_result)
276 }
277
278 #[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
279 pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> Result<Vec<Frame>> {
280 let mut result = vec![];
281 let mut output_results: Vec<Frame> = Vec::new();
282 let mut symbol_table = SymbolTable::new();
283 populate_stack(&mut symbol_table, &cmd.params)?;
284
285 let identity = cmd.identity;
286 populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Command(&mut *txn), identity)?;
287
288 PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
289 &mut Transaction::Command(&mut *txn),
290 identity,
291 "command",
292 false,
293 )?;
294
295 let compiled = match self.compiler.compile_with_policy(
296 &mut Transaction::Command(txn),
297 cmd.rql,
298 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
299 ) {
300 Ok(CompilationResult::Ready(compiled)) => compiled,
301 Ok(CompilationResult::Incremental(_)) => {
302 unreachable!("DDL statements require admin transactions, not command transactions")
303 }
304 Err(err) => {
305 #[cfg(not(target_arch = "wasm32"))]
306 if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
307 return Err(Error(Diagnostic {
308 code: "REMOTE_002".to_string(),
309 message: "Write operations on remote namespaces are not supported"
310 .to_string(),
311 help: Some("Use the remote instance directly for write operations"
312 .to_string()),
313 ..Default::default()
314 }));
315 }
316 return Err(err);
317 }
318 };
319
320 for compiled in compiled.iter() {
321 result.clear();
322 let mut tx = Transaction::Command(txn);
323 let mut vm = Vm::new(symbol_table, identity);
324 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
325 symbol_table = vm.symbol_table;
326
327 if compiled.is_output {
328 output_results.append(&mut result);
329 }
330 }
331
332 let mut final_result = output_results;
333 final_result.append(&mut result);
334 Ok(final_result)
335 }
336
337 #[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
339 pub fn call_procedure(
340 &self,
341 txn: &mut CommandTransaction,
342 identity: IdentityId,
343 name: &str,
344 params: &Params,
345 ) -> Result<Vec<Frame>> {
346 let rql = format!("CALL {}()", name);
348 let mut result = vec![];
349 let mut symbol_table = SymbolTable::new();
350 populate_stack(&mut symbol_table, params)?;
351 populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Command(&mut *txn), identity)?;
352
353 let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql)? {
354 CompilationResult::Ready(compiled) => compiled,
355 CompilationResult::Incremental(_) => {
356 unreachable!("CALL statements should not require incremental compilation")
357 }
358 };
359
360 for compiled in compiled.iter() {
361 result.clear();
362 let mut tx = Transaction::Command(txn);
363 let mut vm = Vm::new(symbol_table, identity);
364 vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result)?;
365 symbol_table = vm.symbol_table;
366 }
367
368 Ok(result)
369 }
370
371 #[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
372 pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> Result<Vec<Frame>> {
373 let mut result = vec![];
374 let mut output_results: Vec<Frame> = Vec::new();
375 let mut symbol_table = SymbolTable::new();
376 populate_stack(&mut symbol_table, &qry.params)?;
377
378 let identity = qry.identity;
379 populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Query(&mut *txn), identity)?;
380
381 PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
382 &mut Transaction::Query(&mut *txn),
383 identity,
384 "query",
385 false,
386 )?;
387
388 let compiled = match self.compiler.compile_with_policy(
389 &mut Transaction::Query(txn),
390 qry.rql,
391 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
392 ) {
393 Ok(CompilationResult::Ready(compiled)) => compiled,
394 Ok(CompilationResult::Incremental(_)) => {
395 unreachable!("DDL statements require admin transactions, not query transactions")
396 }
397 Err(err) => {
398 #[cfg(not(target_arch = "wasm32"))]
399 if let Some(frames) = self.try_forward_remote_query(&err, qry.rql, qry.params)? {
400 return Ok(frames);
401 }
402 return Err(err);
403 }
404 };
405
406 for compiled in compiled.iter() {
407 result.clear();
408 let mut tx = Transaction::Query(txn);
409 let mut vm = Vm::new(symbol_table, identity);
410 vm.run(&self.0, &mut tx, &compiled.instructions, &qry.params, &mut result)?;
411 symbol_table = vm.symbol_table;
412
413 if compiled.is_output {
414 output_results.append(&mut result);
415 }
416 }
417
418 let mut final_result = output_results;
419 final_result.append(&mut result);
420 Ok(final_result)
421 }
422}