1use std::{ops::Deref, sync::Arc};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{error::diagnostic::subscription, util::ioc::IocContainer, value::column::columns::Columns};
9use reifydb_extension::transform::registry::Transforms;
10use reifydb_metric::metric::MetricReader;
11use reifydb_policy::inject_read_policies;
12use reifydb_routine::{function::registry::Functions, procedure::registry::Procedures};
13use reifydb_rql::{
14 ast::parse_str,
15 compiler::{CompilationResult, Compiled, constrain_policy},
16};
17use reifydb_runtime::context::RuntimeContext;
18use reifydb_store_single::SingleStore;
19use reifydb_transaction::transaction::{
20 RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
21 query::QueryTransaction, subscription::SubscriptionTransaction,
22};
23#[cfg(not(target_arch = "wasm32"))]
24use reifydb_type::error::Diagnostic;
25use reifydb_type::{
26 error::Error,
27 params::Params,
28 value::{Value, frame::frame::Frame, r#type::Type},
29};
30use tracing::instrument;
31
32#[cfg(not(target_arch = "wasm32"))]
33use crate::remote::{self, RemoteRegistry};
34use crate::{
35 Result,
36 policy::PolicyEvaluator,
37 vm::{
38 Admin, Command, Query, Subscription, Test,
39 services::Services,
40 stack::{SymbolTable, Variable},
41 vm::Vm,
42 },
43};
44
45pub struct Executor(Arc<Services>);
47
48impl Clone for Executor {
49 fn clone(&self) -> Self {
50 Self(self.0.clone())
51 }
52}
53
54impl Deref for Executor {
55 type Target = Services;
56
57 fn deref(&self) -> &Self::Target {
58 &self.0
59 }
60}
61
62impl Executor {
63 pub fn new(
64 catalog: Catalog,
65 runtime_context: RuntimeContext,
66 functions: Functions,
67 procedures: Procedures,
68 transforms: Transforms,
69 flow_operator_store: SystemFlowOperatorStore,
70 stats_reader: MetricReader<SingleStore>,
71 ioc: IocContainer,
72 #[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
73 ) -> Self {
74 Self(Arc::new(Services::new(
75 catalog,
76 runtime_context,
77 functions,
78 procedures,
79 transforms,
80 flow_operator_store,
81 stats_reader,
82 ioc,
83 #[cfg(not(target_arch = "wasm32"))]
84 remote_registry,
85 )))
86 }
87
88 pub fn services(&self) -> &Arc<Services> {
90 &self.0
91 }
92
93 pub fn from_services(services: Arc<Services>) -> Self {
95 Self(services)
96 }
97
98 #[allow(dead_code)]
99 pub fn testing() -> Self {
100 Self(Services::testing())
101 }
102
103 #[cfg(not(target_arch = "wasm32"))]
106 fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
107 if let Some(ref registry) = self.0.remote_registry {
108 if remote::is_remote_query(err) {
109 if let Some(address) = remote::extract_remote_address(err) {
110 let token = remote::extract_remote_token(err);
111 return registry
112 .forward_query(&address, rql, params, token.as_deref())
113 .map(Some);
114 }
115 }
116 }
117 Ok(None)
118 }
119}
120
121impl RqlExecutor for Executor {
122 fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> Result<Vec<Frame>> {
123 Executor::rql(self, tx, rql, params)
124 }
125}
126
127fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
129 match params {
130 Params::Positional(values) => {
131 for (index, value) in values.iter().enumerate() {
132 let param_name = (index + 1).to_string();
133 symbols.set(param_name, Variable::scalar(value.clone()), false)?;
134 }
135 }
136 Params::Named(map) => {
137 for (name, value) in map.iter() {
138 symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
139 }
140 }
141 Params::None => {}
142 }
143 Ok(())
144}
145
146fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
149 let identity = tx.identity();
150 if identity.is_privileged() {
151 return Ok(());
152 }
153 if identity.is_anonymous() {
154 let columns = Columns::single_row([
155 ("id", Value::IdentityId(identity)),
156 ("name", Value::none_of(Type::Utf8)),
157 ("roles", Value::List(vec![])),
158 ]);
159 symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
160 return Ok(());
161 }
162 if let Some(user) = catalog.find_identity(tx, identity)? {
163 let roles = catalog.find_role_names_for_identity(tx, identity)?;
164 let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
165 let columns = Columns::single_row([
166 ("id", Value::IdentityId(identity)),
167 ("name", Value::Utf8(user.name)),
168 ("roles", Value::List(role_values)),
169 ]);
170 symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
171 }
172 Ok(())
173}
174
175fn execute_compiled_units(
178 services: &Arc<Services>,
179 tx: &mut Transaction<'_>,
180 compiled_list: &[Compiled],
181 params: &Params,
182 mut symbols: SymbolTable,
183) -> Result<(Vec<Frame>, Vec<Frame>, SymbolTable)> {
184 let mut result = vec![];
185 let mut output_results: Vec<Frame> = Vec::new();
186
187 for compiled in compiled_list.iter() {
188 result.clear();
189 let mut vm = Vm::new(symbols);
190 vm.run(services, tx, &compiled.instructions, params, &mut result)?;
191 symbols = vm.symbols;
192
193 if compiled.is_output {
194 output_results.append(&mut result);
195 }
196 }
197
198 Ok((output_results, result, symbols))
199}
200
201fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
203 output_results.append(&mut remaining);
204 output_results
205}
206
207impl Executor {
208 fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
210 let mut symbols = SymbolTable::new();
211 populate_symbols(&mut symbols, params)?;
212 populate_identity(&mut symbols, &self.catalog, tx)?;
213 Ok(symbols)
214 }
215
216 #[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
221 pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> Result<Vec<Frame>> {
222 let mut symbols = self.setup_symbols(¶ms, tx)?;
223
224 let compiled = match self
225 .compiler
226 .compile_with_policy(tx, rql, |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx))
227 {
228 Ok(CompilationResult::Ready(compiled)) => compiled,
229 Ok(CompilationResult::Incremental(_)) => {
230 unreachable!("incremental compilation not supported in rql()")
231 }
232 Err(err) => {
233 #[cfg(not(target_arch = "wasm32"))]
234 if let Some(frames) = self.try_forward_remote_query(&err, rql, params)? {
235 return Ok(frames);
236 }
237 return Err(err);
238 }
239 };
240
241 let mut result = vec![];
242 for compiled in compiled.iter() {
243 result.clear();
244 let mut vm = Vm::new(symbols);
245 vm.run(&self.0, tx, &compiled.instructions, ¶ms, &mut result)?;
246 symbols = vm.symbols;
247 }
248
249 Ok(result)
250 }
251
252 #[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
253 pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> Result<Vec<Frame>> {
254 let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn))?;
255
256 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
257 &mut Transaction::Admin(&mut *txn),
258 "admin",
259 true,
260 )?;
261
262 match self.compiler.compile_with_policy(
263 &mut Transaction::Admin(txn),
264 cmd.rql,
265 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
266 ) {
267 Err(err) => {
268 #[cfg(not(target_arch = "wasm32"))]
269 if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
270 return Ok(frames);
271 }
272 Err(err)
273 }
274 Ok(CompilationResult::Ready(compiled)) => {
275 let (output, remaining, _) = execute_compiled_units(
276 &self.0,
277 &mut Transaction::Admin(txn),
278 &compiled,
279 &cmd.params,
280 symbols,
281 )?;
282 Ok(merge_results(output, remaining))
283 }
284 Ok(CompilationResult::Incremental(mut state)) => {
285 let policy = constrain_policy(|plans, bump, cat, tx| {
286 inject_read_policies(plans, bump, cat, tx)
287 });
288 let mut result = vec![];
289 let mut output_results: Vec<Frame> = Vec::new();
290 let mut symbols = symbols;
291 while let Some(compiled) = self.compiler.compile_next_with_policy(
292 &mut Transaction::Admin(txn),
293 &mut state,
294 &policy,
295 )? {
296 result.clear();
297 let mut tx = Transaction::Admin(txn);
298 let mut vm = Vm::new(symbols);
299 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
300 symbols = vm.symbols;
301 if compiled.is_output {
302 output_results.append(&mut result);
303 }
304 }
305 Ok(merge_results(output_results, result))
306 }
307 }
308 }
309
310 #[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
311 pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> Result<Vec<Frame>> {
312 let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Test(txn.reborrow()))?;
313
314 let session_type = txn.session_type.clone();
315 let session_default_deny = txn.session_default_deny;
316 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
317 &mut Transaction::Test(txn.reborrow()),
318 &session_type,
319 session_default_deny,
320 )?;
321
322 match self.compiler.compile_with_policy(
323 &mut Transaction::Test(txn.reborrow()),
324 cmd.rql,
325 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
326 ) {
327 Err(err) => {
328 #[cfg(not(target_arch = "wasm32"))]
329 if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
330 return Ok(frames);
331 }
332 Err(err)
333 }
334 Ok(CompilationResult::Ready(compiled)) => {
335 let (output, remaining, _) = execute_compiled_units(
336 &self.0,
337 &mut Transaction::Test(txn.reborrow()),
338 &compiled,
339 &cmd.params,
340 symbols,
341 )?;
342 Ok(merge_results(output, remaining))
343 }
344 Ok(CompilationResult::Incremental(mut state)) => {
345 let policy = constrain_policy(|plans, bump, cat, tx| {
346 inject_read_policies(plans, bump, cat, tx)
347 });
348 let mut result = vec![];
349 let mut output_results: Vec<Frame> = Vec::new();
350 let mut symbols = symbols;
351 while let Some(compiled) = self.compiler.compile_next_with_policy(
352 &mut Transaction::Test(txn.reborrow()),
353 &mut state,
354 &policy,
355 )? {
356 result.clear();
357 let mut tx = Transaction::Test(txn.reborrow());
358 let mut vm = Vm::new(symbols);
359 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
360 symbols = vm.symbols;
361 if compiled.is_output {
362 output_results.append(&mut result);
363 }
364 }
365 Ok(merge_results(output_results, result))
366 }
367 }
368 }
369
370 #[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
371 pub fn subscription(&self, txn: &mut SubscriptionTransaction, cmd: Subscription<'_>) -> Result<Vec<Frame>> {
372 let bump = Bump::new();
374 let statements = parse_str(&bump, cmd.rql)?;
375
376 if statements.len() != 1 {
377 return Err(Error(subscription::single_statement_required(
378 "Subscription endpoint requires exactly one statement",
379 )));
380 }
381
382 let statement = &statements[0];
383 if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
384 return Err(Error(subscription::invalid_statement(
385 "Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
386 )));
387 }
388
389 let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Subscription(&mut *txn))?;
390
391 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
392 &mut Transaction::Subscription(&mut *txn),
393 "subscription",
394 true,
395 )?;
396
397 let compiled = match self.compiler.compile_with_policy(
398 &mut Transaction::Subscription(txn),
399 cmd.rql,
400 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
401 ) {
402 Ok(CompilationResult::Ready(compiled)) => compiled,
403 Ok(CompilationResult::Incremental(_)) => {
404 unreachable!("Single subscription statement should not require incremental compilation")
405 }
406 Err(err) => return Err(err),
407 };
408
409 let (output, remaining, _) = execute_compiled_units(
410 &self.0,
411 &mut Transaction::Subscription(txn),
412 &compiled,
413 &cmd.params,
414 symbols,
415 )?;
416 Ok(merge_results(output, remaining))
417 }
418
419 #[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
420 pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> Result<Vec<Frame>> {
421 let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn))?;
422
423 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
424 &mut Transaction::Command(&mut *txn),
425 "command",
426 false,
427 )?;
428
429 let compiled = match self.compiler.compile_with_policy(
430 &mut Transaction::Command(txn),
431 cmd.rql,
432 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
433 ) {
434 Ok(CompilationResult::Ready(compiled)) => compiled,
435 Ok(CompilationResult::Incremental(_)) => {
436 unreachable!("DDL statements require admin transactions, not command transactions")
437 }
438 Err(err) => {
439 #[cfg(not(target_arch = "wasm32"))]
440 if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
441 return Err(Error(Diagnostic {
442 code: "REMOTE_002".to_string(),
443 message: "Write operations on remote namespaces are not supported"
444 .to_string(),
445 help: Some("Use the remote instance directly for write operations"
446 .to_string()),
447 ..Default::default()
448 }));
449 }
450 return Err(err);
451 }
452 };
453
454 let (output, remaining, _) = execute_compiled_units(
455 &self.0,
456 &mut Transaction::Command(txn),
457 &compiled,
458 &cmd.params,
459 symbols,
460 )?;
461 Ok(merge_results(output, remaining))
462 }
463
464 #[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
466 pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> Result<Vec<Frame>> {
467 let rql = format!("CALL {}()", name);
468 let symbols = self.setup_symbols(params, &mut Transaction::Command(&mut *txn))?;
469
470 let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql)? {
471 CompilationResult::Ready(compiled) => compiled,
472 CompilationResult::Incremental(_) => {
473 unreachable!("CALL statements should not require incremental compilation")
474 }
475 };
476
477 let mut result = vec![];
478 let mut symbols = symbols;
479 for compiled in compiled.iter() {
480 result.clear();
481 let mut tx = Transaction::Command(txn);
482 let mut vm = Vm::new(symbols);
483 vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result)?;
484 symbols = vm.symbols;
485 }
486
487 Ok(result)
488 }
489
490 #[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
491 pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> Result<Vec<Frame>> {
492 let symbols = self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn))?;
493
494 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
495 &mut Transaction::Query(&mut *txn),
496 "query",
497 false,
498 )?;
499
500 let compiled = match self.compiler.compile_with_policy(
501 &mut Transaction::Query(txn),
502 qry.rql,
503 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
504 ) {
505 Ok(CompilationResult::Ready(compiled)) => compiled,
506 Ok(CompilationResult::Incremental(_)) => {
507 unreachable!("DDL statements require admin transactions, not query transactions")
508 }
509 Err(err) => {
510 #[cfg(not(target_arch = "wasm32"))]
511 if let Some(frames) = self.try_forward_remote_query(&err, qry.rql, qry.params)? {
512 return Ok(frames);
513 }
514 return Err(err);
515 }
516 };
517
518 let (output, remaining, _) =
519 execute_compiled_units(&self.0, &mut Transaction::Query(txn), &compiled, &qry.params, symbols)?;
520 Ok(merge_results(output, remaining))
521 }
522}