1use std::{ops::Deref, sync::Arc};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{error::diagnostic::subscription, value::column::columns::Columns};
9use reifydb_metric_old::metric::MetricReader;
10use reifydb_policy::inject_read_policies;
11use reifydb_rql::{
12 ast::parse_str,
13 compiler::{CompilationResult, Compiled, constrain_policy},
14};
15use reifydb_store_single::SingleStore;
16use reifydb_transaction::transaction::{
17 RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
18 query::QueryTransaction,
19};
20#[cfg(not(target_arch = "wasm32"))]
21use reifydb_type::error::Diagnostic;
22use reifydb_type::{
23 error::Error,
24 params::Params,
25 value::{Value, frame::frame::Frame, r#type::Type},
26};
27use tracing::instrument;
28
29#[cfg(not(target_arch = "wasm32"))]
30use crate::remote;
31use crate::{
32 Result,
33 policy::PolicyEvaluator,
34 vm::{
35 Admin, Command, Query, Subscription, Test,
36 services::{EngineConfig, Services},
37 stack::{SymbolTable, Variable},
38 vm::Vm,
39 },
40};
41
42pub struct Executor(Arc<Services>);
44
45impl Clone for Executor {
46 fn clone(&self) -> Self {
47 Self(self.0.clone())
48 }
49}
50
51impl Deref for Executor {
52 type Target = Services;
53
54 fn deref(&self) -> &Self::Target {
55 &self.0
56 }
57}
58
59impl Executor {
60 pub fn new(
61 catalog: Catalog,
62 config: EngineConfig,
63 flow_operator_store: SystemFlowOperatorStore,
64 stats_reader: MetricReader<SingleStore>,
65 ) -> Self {
66 Self(Arc::new(Services::new(catalog, config, flow_operator_store, stats_reader)))
67 }
68
69 pub fn services(&self) -> &Arc<Services> {
71 &self.0
72 }
73
74 pub fn from_services(services: Arc<Services>) -> Self {
76 Self(services)
77 }
78
79 #[allow(dead_code)]
80 pub fn testing() -> Self {
81 Self(Services::testing())
82 }
83
84 #[cfg(not(target_arch = "wasm32"))]
87 fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
88 if let Some(ref registry) = self.0.remote_registry
89 && remote::is_remote_query(err)
90 && let Some(address) = remote::extract_remote_address(err)
91 {
92 let token = remote::extract_remote_token(err);
93 return registry.forward_query(&address, rql, params, token.as_deref()).map(Some);
94 }
95 Ok(None)
96 }
97}
98
99impl RqlExecutor for Executor {
100 fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> Result<Vec<Frame>> {
101 Executor::rql(self, tx, rql, params)
102 }
103}
104
105fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
107 match params {
108 Params::Positional(values) => {
109 for (index, value) in values.iter().enumerate() {
110 let param_name = (index + 1).to_string();
111 symbols.set(param_name, Variable::scalar(value.clone()), false)?;
112 }
113 }
114 Params::Named(map) => {
115 for (name, value) in map.iter() {
116 symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
117 }
118 }
119 Params::None => {}
120 }
121 Ok(())
122}
123
124fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
127 let identity = tx.identity();
128 if identity.is_privileged() {
129 return Ok(());
130 }
131 if identity.is_anonymous() {
132 let columns = Columns::single_row([
133 ("id", Value::IdentityId(identity)),
134 ("name", Value::none_of(Type::Utf8)),
135 ("roles", Value::List(vec![])),
136 ]);
137 symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
138 return Ok(());
139 }
140 if let Some(user) = catalog.find_identity(tx, identity)? {
141 let roles = catalog.find_role_names_for_identity(tx, identity)?;
142 let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
143 let columns = Columns::single_row([
144 ("id", Value::IdentityId(identity)),
145 ("name", Value::Utf8(user.name)),
146 ("roles", Value::List(role_values)),
147 ]);
148 symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
149 }
150 Ok(())
151}
152
153fn execute_compiled_units(
156 services: &Arc<Services>,
157 tx: &mut Transaction<'_>,
158 compiled_list: &[Compiled],
159 params: &Params,
160 mut symbols: SymbolTable,
161) -> Result<(Vec<Frame>, Vec<Frame>, SymbolTable)> {
162 let mut result = vec![];
163 let mut output_results: Vec<Frame> = Vec::new();
164
165 for compiled in compiled_list.iter() {
166 result.clear();
167 let mut vm = Vm::new(symbols);
168 vm.run(services, tx, &compiled.instructions, params, &mut result)?;
169 symbols = vm.symbols;
170
171 if compiled.is_output {
172 output_results.append(&mut result);
173 }
174 }
175
176 Ok((output_results, result, symbols))
177}
178
179fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
181 output_results.append(&mut remaining);
182 output_results
183}
184
185impl Executor {
186 fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
188 let mut symbols = SymbolTable::new();
189 populate_symbols(&mut symbols, params)?;
190 populate_identity(&mut symbols, &self.catalog, tx)?;
191 Ok(symbols)
192 }
193
194 #[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
199 pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> Result<Vec<Frame>> {
200 let mut symbols = self.setup_symbols(¶ms, tx)?;
201
202 let compiled = match self.compiler.compile_with_policy(tx, rql, inject_read_policies) {
203 Ok(CompilationResult::Ready(compiled)) => compiled,
204 Ok(CompilationResult::Incremental(_)) => {
205 unreachable!("incremental compilation not supported in rql()")
206 }
207 Err(err) => {
208 #[cfg(not(target_arch = "wasm32"))]
209 if let Some(frames) = self.try_forward_remote_query(&err, rql, params)? {
210 return Ok(frames);
211 }
212 return Err(err);
213 }
214 };
215
216 let mut result = vec![];
217 for compiled in compiled.iter() {
218 result.clear();
219 let mut vm = Vm::new(symbols);
220 vm.run(&self.0, tx, &compiled.instructions, ¶ms, &mut result)?;
221 symbols = vm.symbols;
222 }
223
224 Ok(result)
225 }
226
227 #[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
228 pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> Result<Vec<Frame>> {
229 let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn))?;
230
231 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
232 &mut Transaction::Admin(&mut *txn),
233 "admin",
234 true,
235 )?;
236
237 match self.compiler.compile_with_policy(&mut Transaction::Admin(txn), cmd.rql, inject_read_policies) {
238 Err(err) => {
239 #[cfg(not(target_arch = "wasm32"))]
240 if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
241 return Ok(frames);
242 }
243 Err(err)
244 }
245 Ok(CompilationResult::Ready(compiled)) => {
246 let (output, remaining, _) = execute_compiled_units(
247 &self.0,
248 &mut Transaction::Admin(txn),
249 &compiled,
250 &cmd.params,
251 symbols,
252 )?;
253 Ok(merge_results(output, remaining))
254 }
255 Ok(CompilationResult::Incremental(mut state)) => {
256 let policy = constrain_policy(|plans, bump, cat, tx| {
257 inject_read_policies(plans, bump, cat, tx)
258 });
259 let mut result = vec![];
260 let mut output_results: Vec<Frame> = Vec::new();
261 let mut symbols = symbols;
262 while let Some(compiled) = self.compiler.compile_next_with_policy(
263 &mut Transaction::Admin(txn),
264 &mut state,
265 &policy,
266 )? {
267 result.clear();
268 let mut tx = Transaction::Admin(txn);
269 let mut vm = Vm::new(symbols);
270 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
271 symbols = vm.symbols;
272 if compiled.is_output {
273 output_results.append(&mut result);
274 }
275 }
276 Ok(merge_results(output_results, result))
277 }
278 }
279 }
280
281 #[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
282 pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> Result<Vec<Frame>> {
283 let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Test(Box::new(txn.reborrow())))?;
284
285 let session_type = txn.session_type.clone();
286 let session_default_deny = txn.session_default_deny;
287 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
288 &mut Transaction::Test(Box::new(txn.reborrow())),
289 &session_type,
290 session_default_deny,
291 )?;
292
293 match self.compiler.compile_with_policy(
294 &mut Transaction::Test(Box::new(txn.reborrow())),
295 cmd.rql,
296 inject_read_policies,
297 ) {
298 Err(err) => {
299 #[cfg(not(target_arch = "wasm32"))]
300 if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
301 return Ok(frames);
302 }
303 Err(err)
304 }
305 Ok(CompilationResult::Ready(compiled)) => {
306 let (output, remaining, _) = execute_compiled_units(
307 &self.0,
308 &mut Transaction::Test(Box::new(txn.reborrow())),
309 &compiled,
310 &cmd.params,
311 symbols,
312 )?;
313 Ok(merge_results(output, remaining))
314 }
315 Ok(CompilationResult::Incremental(mut state)) => {
316 let policy = constrain_policy(|plans, bump, cat, tx| {
317 inject_read_policies(plans, bump, cat, tx)
318 });
319 let mut result = vec![];
320 let mut output_results: Vec<Frame> = Vec::new();
321 let mut symbols = symbols;
322 while let Some(compiled) = self.compiler.compile_next_with_policy(
323 &mut Transaction::Test(Box::new(txn.reborrow())),
324 &mut state,
325 &policy,
326 )? {
327 result.clear();
328 let mut tx = Transaction::Test(Box::new(txn.reborrow()));
329 let mut vm = Vm::new(symbols);
330 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
331 symbols = vm.symbols;
332 if compiled.is_output {
333 output_results.append(&mut result);
334 }
335 }
336 Ok(merge_results(output_results, result))
337 }
338 }
339 }
340
341 #[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
342 pub fn subscription(&self, txn: &mut QueryTransaction, cmd: Subscription<'_>) -> Result<Vec<Frame>> {
343 let bump = Bump::new();
345 let statements = parse_str(&bump, cmd.rql)?;
346
347 if statements.len() != 1 {
348 return Err(Error(Box::new(subscription::single_statement_required(
349 "Subscription endpoint requires exactly one statement",
350 ))));
351 }
352
353 let statement = &statements[0];
354 if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
355 return Err(Error(Box::new(subscription::invalid_statement(
356 "Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
357 ))));
358 }
359
360 let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Query(&mut *txn))?;
361
362 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
363 &mut Transaction::Query(&mut *txn),
364 "subscription",
365 true,
366 )?;
367
368 let compiled = match self.compiler.compile_with_policy(
369 &mut Transaction::Query(txn),
370 cmd.rql,
371 inject_read_policies,
372 ) {
373 Ok(CompilationResult::Ready(compiled)) => compiled,
374 Ok(CompilationResult::Incremental(_)) => {
375 unreachable!("Single subscription statement should not require incremental compilation")
376 }
377 Err(err) => return Err(err),
378 };
379
380 let (output, remaining, _) =
381 execute_compiled_units(&self.0, &mut Transaction::Query(txn), &compiled, &cmd.params, symbols)?;
382 Ok(merge_results(output, remaining))
383 }
384
385 #[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
386 pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> Result<Vec<Frame>> {
387 let symbols = self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn))?;
388
389 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
390 &mut Transaction::Command(&mut *txn),
391 "command",
392 false,
393 )?;
394
395 let compiled = match self.compiler.compile_with_policy(
396 &mut Transaction::Command(txn),
397 cmd.rql,
398 inject_read_policies,
399 ) {
400 Ok(CompilationResult::Ready(compiled)) => compiled,
401 Ok(CompilationResult::Incremental(_)) => {
402 unreachable!("DDL statements require admin transactions, not command transactions")
403 }
404 Err(err) => {
405 #[cfg(not(target_arch = "wasm32"))]
406 if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
407 return Err(Error(Box::new(Diagnostic {
408 code: "REMOTE_002".to_string(),
409 message: "Write operations on remote namespaces are not supported"
410 .to_string(),
411 help: Some("Use the remote instance directly for write operations"
412 .to_string()),
413 ..Default::default()
414 })));
415 }
416 return Err(err);
417 }
418 };
419
420 let (output, remaining, _) = execute_compiled_units(
421 &self.0,
422 &mut Transaction::Command(txn),
423 &compiled,
424 &cmd.params,
425 symbols,
426 )?;
427 Ok(merge_results(output, remaining))
428 }
429
430 #[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
432 pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> Result<Vec<Frame>> {
433 let rql = format!("CALL {}()", name);
434 let symbols = self.setup_symbols(params, &mut Transaction::Command(&mut *txn))?;
435
436 let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql)? {
437 CompilationResult::Ready(compiled) => compiled,
438 CompilationResult::Incremental(_) => {
439 unreachable!("CALL statements should not require incremental compilation")
440 }
441 };
442
443 let mut result = vec![];
444 let mut symbols = symbols;
445 for compiled in compiled.iter() {
446 result.clear();
447 let mut tx = Transaction::Command(txn);
448 let mut vm = Vm::new(symbols);
449 vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result)?;
450 symbols = vm.symbols;
451 }
452
453 Ok(result)
454 }
455
456 #[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
457 pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> Result<Vec<Frame>> {
458 let symbols = self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn))?;
459
460 PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
461 &mut Transaction::Query(&mut *txn),
462 "query",
463 false,
464 )?;
465
466 let compiled = match self.compiler.compile_with_policy(
467 &mut Transaction::Query(txn),
468 qry.rql,
469 inject_read_policies,
470 ) {
471 Ok(CompilationResult::Ready(compiled)) => compiled,
472 Ok(CompilationResult::Incremental(_)) => {
473 unreachable!("DDL statements require admin transactions, not query transactions")
474 }
475 Err(err) => {
476 #[cfg(not(target_arch = "wasm32"))]
477 if let Some(frames) = self.try_forward_remote_query(&err, qry.rql, qry.params)? {
478 return Ok(frames);
479 }
480 return Err(err);
481 }
482 };
483
484 let (output, remaining, _) =
485 execute_compiled_units(&self.0, &mut Transaction::Query(txn), &compiled, &qry.params, symbols)?;
486 Ok(merge_results(output, remaining))
487 }
488}