1use std::{ops::Deref, sync::Arc};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{error::diagnostic::subscription, util::ioc::IocContainer, value::column::columns::Columns};
9use reifydb_extension::transform::registry::Transforms;
10use reifydb_metric::metric::MetricReader;
11use reifydb_policy::inject_read_policies;
12use reifydb_routine::{function::registry::Functions, procedure::registry::Procedures};
13use reifydb_rql::{
14 ast::parse_str,
15 compiler::{CompilationResult, constrain_policy},
16};
17use reifydb_runtime::context::RuntimeContext;
18use reifydb_store_single::SingleStore;
19use reifydb_transaction::transaction::{
20 RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
21 query::QueryTransaction, subscription::SubscriptionTransaction,
22};
23#[cfg(not(target_arch = "wasm32"))]
24use reifydb_type::error::Diagnostic;
25use reifydb_type::{
26 error::Error,
27 params::Params,
28 value::{Value, frame::frame::Frame, r#type::Type},
29};
30use tracing::instrument;
31
32#[cfg(not(target_arch = "wasm32"))]
33use crate::remote::{self, RemoteRegistry};
34use crate::{
35 Result,
36 policy::PolicyEvaluator,
37 vm::{
38 Admin, Command, Query, Subscription, Test,
39 services::Services,
40 stack::{SymbolTable, Variable},
41 vm::Vm,
42 },
43};
44
45pub struct Executor(Arc<Services>);
47
48impl Clone for Executor {
49 fn clone(&self) -> Self {
50 Self(self.0.clone())
51 }
52}
53
54impl Deref for Executor {
55 type Target = Services;
56
57 fn deref(&self) -> &Self::Target {
58 &self.0
59 }
60}
61
62impl Executor {
63 pub fn new(
64 catalog: Catalog,
65 runtime_context: RuntimeContext,
66 functions: Functions,
67 procedures: Procedures,
68 transforms: Transforms,
69 flow_operator_store: SystemFlowOperatorStore,
70 stats_reader: MetricReader<SingleStore>,
71 ioc: IocContainer,
72 #[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
73 ) -> Self {
74 Self(Arc::new(Services::new(
75 catalog,
76 runtime_context,
77 functions,
78 procedures,
79 transforms,
80 flow_operator_store,
81 stats_reader,
82 ioc,
83 #[cfg(not(target_arch = "wasm32"))]
84 remote_registry,
85 )))
86 }
87
88 pub fn services(&self) -> &Arc<Services> {
90 &self.0
91 }
92
93 pub fn from_services(services: Arc<Services>) -> Self {
95 Self(services)
96 }
97
98 #[allow(dead_code)]
99 pub fn testing() -> Self {
100 Self(Services::testing())
101 }
102
103 #[cfg(not(target_arch = "wasm32"))]
106 fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
107 if let Some(ref registry) = self.0.remote_registry {
108 if remote::is_remote_query(err) {
109 if let Some(address) = remote::extract_remote_address(err) {
110 let token = remote::extract_remote_token(err);
111 return registry
112 .forward_query(&address, rql, params, token.as_deref())
113 .map(Some);
114 }
115 }
116 }
117 Ok(None)
118 }
119}
120
121impl RqlExecutor for Executor {
122 fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> Result<Vec<Frame>> {
123 Executor::rql(self, tx, rql, params)
124 }
125}
126
127fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
129 match params {
130 Params::Positional(values) => {
131 for (index, value) in values.iter().enumerate() {
132 let param_name = (index + 1).to_string();
133 symbols.set(param_name, Variable::scalar(value.clone()), false)?;
134 }
135 }
136 Params::Named(map) => {
137 for (name, value) in map.iter() {
138 symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
139 }
140 }
141 Params::None => {}
142 }
143 Ok(())
144}
145
146fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
149 let identity = tx.identity();
150 if identity.is_privileged() {
151 return Ok(());
152 }
153 if identity.is_anonymous() {
154 let columns = Columns::single_row([
155 ("id", Value::IdentityId(identity)),
156 ("name", Value::none_of(Type::Utf8)),
157 ("roles", Value::List(vec![])),
158 ]);
159 symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
160 return Ok(());
161 }
162 if let Some(user) = catalog.find_identity(tx, identity)? {
163 let roles = catalog.find_role_names_for_identity(tx, identity)?;
164 let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
165 let columns = Columns::single_row([
166 ("id", Value::IdentityId(identity)),
167 ("name", Value::Utf8(user.name)),
168 ("roles", Value::List(role_values)),
169 ]);
170 symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
171 }
172 Ok(())
173}
174
175impl Executor {
176 #[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
181 pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> Result<Vec<Frame>> {
182 let mut result = vec![];
183 let mut symbols = SymbolTable::new();
184 populate_symbols(&mut symbols, ¶ms)?;
185 populate_identity(&mut symbols, &self.catalog, tx)?;
186
187 let compiled = match self
188 .compiler
189 .compile_with_policy(tx, rql, |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx))
190 {
191 Ok(CompilationResult::Ready(compiled)) => compiled,
192 Ok(CompilationResult::Incremental(_)) => {
193 unreachable!("incremental compilation not supported in rql()")
194 }
195 Err(err) => {
196 #[cfg(not(target_arch = "wasm32"))]
197 if let Some(frames) = self.try_forward_remote_query(&err, rql, params)? {
198 return Ok(frames);
199 }
200 return Err(err);
201 }
202 };
203
204 for compiled in compiled.iter() {
205 result.clear();
206 let mut vm = Vm::new(symbols);
207 vm.run(&self.0, tx, &compiled.instructions, ¶ms, &mut result)?;
208 symbols = vm.symbols;
209 }
210
211 Ok(result)
212 }
213
214 #[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
215 pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> Result<Vec<Frame>> {
216 let mut result = vec![];
217 let mut output_results: Vec<Frame> = Vec::new();
218 let mut symbols = SymbolTable::new();
219 populate_symbols(&mut symbols, &cmd.params)?;
220
221 populate_identity(&mut symbols, &self.catalog, &mut Transaction::Admin(&mut *txn))?;
222
223 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
224 &mut Transaction::Admin(&mut *txn),
225 "admin",
226 true,
227 )?;
228
229 match self.compiler.compile_with_policy(
230 &mut Transaction::Admin(txn),
231 cmd.rql,
232 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
233 ) {
234 Err(err) => {
235 #[cfg(not(target_arch = "wasm32"))]
236 if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
237 return Ok(frames);
238 }
239 return Err(err);
240 }
241 Ok(CompilationResult::Ready(compiled)) => {
242 for compiled in compiled.iter() {
243 result.clear();
244 let mut tx = Transaction::Admin(txn);
245 let mut vm = Vm::new(symbols);
246 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
247 symbols = vm.symbols;
248
249 if compiled.is_output {
250 output_results.append(&mut result);
251 }
252 }
253 }
254 Ok(CompilationResult::Incremental(mut state)) => {
255 let policy = constrain_policy(|plans, bump, cat, tx| {
256 inject_read_policies(plans, bump, cat, tx)
257 });
258 while let Some(compiled) = self.compiler.compile_next_with_policy(
259 &mut Transaction::Admin(txn),
260 &mut state,
261 &policy,
262 )? {
263 result.clear();
264 let mut tx = Transaction::Admin(txn);
265 let mut vm = Vm::new(symbols);
266 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
267 symbols = vm.symbols;
268
269 if compiled.is_output {
270 output_results.append(&mut result);
271 }
272 }
273 }
274 }
275
276 let mut final_result = output_results;
277 final_result.append(&mut result);
278 Ok(final_result)
279 }
280
281 #[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
282 pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> Result<Vec<Frame>> {
283 let mut result = vec![];
284 let mut output_results: Vec<Frame> = Vec::new();
285 let mut symbols = SymbolTable::new();
286 populate_symbols(&mut symbols, &cmd.params)?;
287
288 populate_identity(&mut symbols, &self.catalog, &mut Transaction::Test(txn.reborrow()))?;
289
290 let session_type = txn.session_type.clone();
291 let session_default_deny = txn.session_default_deny;
292 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
293 &mut Transaction::Test(txn.reborrow()),
294 &session_type,
295 session_default_deny,
296 )?;
297
298 match self.compiler.compile_with_policy(
299 &mut Transaction::Test(txn.reborrow()),
300 cmd.rql,
301 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
302 ) {
303 Err(err) => {
304 #[cfg(not(target_arch = "wasm32"))]
305 if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
306 return Ok(frames);
307 }
308 return Err(err);
309 }
310 Ok(CompilationResult::Ready(compiled)) => {
311 for compiled in compiled.iter() {
312 result.clear();
313 let mut tx = Transaction::Test(txn.reborrow());
314 let mut vm = Vm::new(symbols);
315 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
316 symbols = vm.symbols;
317
318 if compiled.is_output {
319 output_results.append(&mut result);
320 }
321 }
322 }
323 Ok(CompilationResult::Incremental(mut state)) => {
324 let policy = constrain_policy(|plans, bump, cat, tx| {
325 inject_read_policies(plans, bump, cat, tx)
326 });
327 while let Some(compiled) = self.compiler.compile_next_with_policy(
328 &mut Transaction::Test(txn.reborrow()),
329 &mut state,
330 &policy,
331 )? {
332 result.clear();
333 let mut tx = Transaction::Test(txn.reborrow());
334 let mut vm = Vm::new(symbols);
335 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
336 symbols = vm.symbols;
337
338 if compiled.is_output {
339 output_results.append(&mut result);
340 }
341 }
342 }
343 }
344
345 let mut final_result = output_results;
346 final_result.append(&mut result);
347 Ok(final_result)
348 }
349
350 #[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
351 pub fn subscription(&self, txn: &mut SubscriptionTransaction, cmd: Subscription<'_>) -> Result<Vec<Frame>> {
352 let bump = Bump::new();
354 let statements = parse_str(&bump, cmd.rql)?;
355
356 if statements.len() != 1 {
357 return Err(Error(subscription::single_statement_required(
358 "Subscription endpoint requires exactly one statement",
359 )));
360 }
361
362 let statement = &statements[0];
363 if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
364 return Err(Error(subscription::invalid_statement(
365 "Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
366 )));
367 }
368
369 let mut result = vec![];
371 let mut output_results: Vec<Frame> = Vec::new();
372 let mut symbols = SymbolTable::new();
373 populate_symbols(&mut symbols, &cmd.params)?;
374
375 populate_identity(&mut symbols, &self.catalog, &mut Transaction::Subscription(&mut *txn))?;
376
377 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
378 &mut Transaction::Subscription(&mut *txn),
379 "subscription",
380 true,
381 )?;
382
383 let compiled = match self.compiler.compile_with_policy(
384 &mut Transaction::Subscription(txn),
385 cmd.rql,
386 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
387 ) {
388 Ok(CompilationResult::Ready(compiled)) => compiled,
389 Ok(CompilationResult::Incremental(_)) => {
390 unreachable!("Single subscription statement should not require incremental compilation")
391 }
392 Err(err) => return Err(err),
393 };
394
395 for compiled in compiled.iter() {
396 result.clear();
397 let mut tx = Transaction::Subscription(txn);
398 let mut vm = Vm::new(symbols);
399 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
400 symbols = vm.symbols;
401
402 if compiled.is_output {
403 output_results.append(&mut result);
404 }
405 }
406
407 let mut final_result = output_results;
408 final_result.append(&mut result);
409 Ok(final_result)
410 }
411
412 #[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
413 pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> Result<Vec<Frame>> {
414 let mut result = vec![];
415 let mut output_results: Vec<Frame> = Vec::new();
416 let mut symbols = SymbolTable::new();
417 populate_symbols(&mut symbols, &cmd.params)?;
418
419 populate_identity(&mut symbols, &self.catalog, &mut Transaction::Command(&mut *txn))?;
420
421 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
422 &mut Transaction::Command(&mut *txn),
423 "command",
424 false,
425 )?;
426
427 let compiled = match self.compiler.compile_with_policy(
428 &mut Transaction::Command(txn),
429 cmd.rql,
430 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
431 ) {
432 Ok(CompilationResult::Ready(compiled)) => compiled,
433 Ok(CompilationResult::Incremental(_)) => {
434 unreachable!("DDL statements require admin transactions, not command transactions")
435 }
436 Err(err) => {
437 #[cfg(not(target_arch = "wasm32"))]
438 if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
439 return Err(Error(Diagnostic {
440 code: "REMOTE_002".to_string(),
441 message: "Write operations on remote namespaces are not supported"
442 .to_string(),
443 help: Some("Use the remote instance directly for write operations"
444 .to_string()),
445 ..Default::default()
446 }));
447 }
448 return Err(err);
449 }
450 };
451
452 for compiled in compiled.iter() {
453 result.clear();
454 let mut tx = Transaction::Command(txn);
455 let mut vm = Vm::new(symbols);
456 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
457 symbols = vm.symbols;
458
459 if compiled.is_output {
460 output_results.append(&mut result);
461 }
462 }
463
464 let mut final_result = output_results;
465 final_result.append(&mut result);
466 Ok(final_result)
467 }
468
469 #[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
471 pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> Result<Vec<Frame>> {
472 let rql = format!("CALL {}()", name);
474 let mut result = vec![];
475 let mut symbols = SymbolTable::new();
476 populate_symbols(&mut symbols, params)?;
477 populate_identity(&mut symbols, &self.catalog, &mut Transaction::Command(&mut *txn))?;
478
479 let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql)? {
480 CompilationResult::Ready(compiled) => compiled,
481 CompilationResult::Incremental(_) => {
482 unreachable!("CALL statements should not require incremental compilation")
483 }
484 };
485
486 for compiled in compiled.iter() {
487 result.clear();
488 let mut tx = Transaction::Command(txn);
489 let mut vm = Vm::new(symbols);
490 vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result)?;
491 symbols = vm.symbols;
492 }
493
494 Ok(result)
495 }
496
497 #[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
498 pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> Result<Vec<Frame>> {
499 let mut result = vec![];
500 let mut output_results: Vec<Frame> = Vec::new();
501 let mut symbols = SymbolTable::new();
502 populate_symbols(&mut symbols, &qry.params)?;
503
504 populate_identity(&mut symbols, &self.catalog, &mut Transaction::Query(&mut *txn))?;
505
506 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
507 &mut Transaction::Query(&mut *txn),
508 "query",
509 false,
510 )?;
511
512 let compiled = match self.compiler.compile_with_policy(
513 &mut Transaction::Query(txn),
514 qry.rql,
515 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx),
516 ) {
517 Ok(CompilationResult::Ready(compiled)) => compiled,
518 Ok(CompilationResult::Incremental(_)) => {
519 unreachable!("DDL statements require admin transactions, not query transactions")
520 }
521 Err(err) => {
522 #[cfg(not(target_arch = "wasm32"))]
523 if let Some(frames) = self.try_forward_remote_query(&err, qry.rql, qry.params)? {
524 return Ok(frames);
525 }
526 return Err(err);
527 }
528 };
529
530 for compiled in compiled.iter() {
531 result.clear();
532 let mut tx = Transaction::Query(txn);
533 let mut vm = Vm::new(symbols);
534 vm.run(&self.0, &mut tx, &compiled.instructions, &qry.params, &mut result)?;
535 symbols = vm.symbols;
536
537 if compiled.is_output {
538 output_results.append(&mut result);
539 }
540 }
541
542 let mut final_result = output_results;
543 final_result.append(&mut result);
544 Ok(final_result)
545 }
546}