1use std::{ops::Deref, sync::Arc};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::FlowOperatorStore};
8use reifydb_core::{error::diagnostic::subscription, util::ioc::IocContainer, value::column::columns::Columns};
9use reifydb_function::registry::Functions;
10use reifydb_metric::metric::MetricReader;
11use reifydb_policy::inject_read_policies;
12use reifydb_rql::{
13 ast::parse_str,
14 compiler::{CompilationResult, constrain_policy},
15};
16use reifydb_runtime::clock::Clock;
17use reifydb_store_single::SingleStore;
18use reifydb_transaction::transaction::{
19 Transaction, admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
20 subscription::SubscriptionTransaction,
21};
22#[cfg(not(target_arch = "wasm32"))]
23use reifydb_type::error::Diagnostic;
24use reifydb_type::{
25 error::Error,
26 params::Params,
27 value::{Value, frame::frame::Frame, identity::IdentityId, r#type::Type},
28};
29use tracing::instrument;
30
31#[cfg(not(target_arch = "wasm32"))]
32use crate::remote::{self, RemoteRegistry};
33use crate::{
34 Result,
35 policy::PolicyEvaluator,
36 procedure::registry::Procedures,
37 transform::registry::Transforms,
38 vm::{
39 Admin, Command, Query, Subscription,
40 services::Services,
41 stack::{SymbolTable, Variable},
42 vm::Vm,
43 },
44};
45
46pub struct Executor(Arc<Services>);
48
49impl Clone for Executor {
50 fn clone(&self) -> Self {
51 Self(self.0.clone())
52 }
53}
54
55impl Deref for Executor {
56 type Target = Services;
57
58 fn deref(&self) -> &Self::Target {
59 &self.0
60 }
61}
62
63impl Executor {
64 pub fn new(
65 catalog: Catalog,
66 clock: Clock,
67 functions: Functions,
68 procedures: Procedures,
69 transforms: Transforms,
70 flow_operator_store: FlowOperatorStore,
71 stats_reader: MetricReader<SingleStore>,
72 ioc: IocContainer,
73 #[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
74 ) -> Self {
75 Self(Arc::new(Services::new(
76 catalog,
77 clock,
78 functions,
79 procedures,
80 transforms,
81 flow_operator_store,
82 stats_reader,
83 ioc,
84 #[cfg(not(target_arch = "wasm32"))]
85 remote_registry,
86 )))
87 }
88
89 pub fn services(&self) -> &Arc<Services> {
91 &self.0
92 }
93
94 pub fn from_services(services: Arc<Services>) -> Self {
96 Self(services)
97 }
98
99 #[allow(dead_code)]
100 pub fn testing() -> Self {
101 Self(Services::testing())
102 }
103
104 #[cfg(not(target_arch = "wasm32"))]
107 fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
108 if let Some(ref registry) = self.0.remote_registry {
109 if remote::is_remote_query(err) {
110 if let Some(address) = remote::extract_remote_address(err) {
111 return registry.forward_query(&address, rql, params).map(Some);
112 }
113 }
114 }
115 Ok(None)
116 }
117}
118
119fn populate_stack(stack: &mut SymbolTable, params: &Params) -> Result<()> {
121 match params {
122 Params::Positional(values) => {
123 for (index, value) in values.iter().enumerate() {
124 let param_name = (index + 1).to_string();
125 stack.set(param_name, Variable::scalar(value.clone()), false)?;
126 }
127 }
128 Params::Named(map) => {
129 for (name, value) in map {
130 stack.set(name.clone(), Variable::scalar(value.clone()), false)?;
131 }
132 }
133 Params::None => {}
134 }
135 Ok(())
136}
137
138fn populate_identity(
141 stack: &mut SymbolTable,
142 catalog: &Catalog,
143 tx: &mut Transaction<'_>,
144 identity: IdentityId,
145) -> Result<()> {
146 if identity.is_privileged() {
147 return Ok(());
148 }
149 if identity.is_anonymous() {
150 let columns = Columns::single_row([
151 ("id", Value::IdentityId(identity)),
152 ("name", Value::none_of(Type::Utf8)),
153 ("roles", Value::List(vec![])),
154 ]);
155 stack.set("identity".to_string(), Variable::Columns(columns), false)?;
156 return Ok(());
157 }
158 if let Some(user) = catalog.find_user_by_identity(tx, identity)? {
159 let roles = catalog.find_role_names_for_identity(tx, identity)?;
160 let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
161 let columns = Columns::single_row([
162 ("id", Value::IdentityId(identity)),
163 ("name", Value::Utf8(user.name)),
164 ("roles", Value::List(role_values)),
165 ]);
166 stack.set("identity".to_string(), Variable::Columns(columns), false)?;
167 }
168 Ok(())
169}
170
171impl Executor {
172 #[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
177 pub fn rql(
178 &self,
179 tx: &mut Transaction<'_>,
180 identity: IdentityId,
181 rql: &str,
182 params: Params,
183 ) -> Result<Vec<Frame>> {
184 let mut result = vec![];
185 let mut symbol_table = SymbolTable::new();
186 populate_stack(&mut symbol_table, ¶ms)?;
187 populate_identity(&mut symbol_table, &self.catalog, tx, identity)?;
188
189 let compiled = match self.compiler.compile_with_policy(tx, rql, |plans, bump, cat, tx| {
190 inject_read_policies(plans, bump, cat, tx, identity)
191 }) {
192 Ok(CompilationResult::Ready(compiled)) => compiled,
193 Ok(CompilationResult::Incremental(_)) => {
194 unreachable!("incremental compilation not supported in rql()")
195 }
196 Err(err) => {
197 #[cfg(not(target_arch = "wasm32"))]
198 if let Some(frames) = self.try_forward_remote_query(&err, rql, params)? {
199 return Ok(frames);
200 }
201 return Err(err);
202 }
203 };
204
205 for compiled in compiled.iter() {
206 result.clear();
207 let mut vm = Vm::new(symbol_table, identity);
208 vm.run(&self.0, tx, &compiled.instructions, ¶ms, &mut result)?;
209 symbol_table = vm.symbol_table;
210 }
211
212 Ok(result)
213 }
214
215 #[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
216 pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> Result<Vec<Frame>> {
217 let mut result = vec![];
218 let mut output_results: Vec<Frame> = Vec::new();
219 let mut symbol_table = SymbolTable::new();
220 populate_stack(&mut symbol_table, &cmd.params)?;
221
222 let identity = cmd.identity;
223 populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Admin(&mut *txn), identity)?;
224
225 PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
226 &mut Transaction::Admin(&mut *txn),
227 identity,
228 "admin",
229 true,
230 )?;
231
232 match self.compiler.compile_with_policy(
233 &mut Transaction::Admin(txn),
234 cmd.rql,
235 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
236 ) {
237 Err(err) => {
238 #[cfg(not(target_arch = "wasm32"))]
239 if let Some(frames) = self.try_forward_remote_query(&err, cmd.rql, cmd.params)? {
240 return Ok(frames);
241 }
242 return Err(err);
243 }
244 Ok(CompilationResult::Ready(compiled)) => {
245 for compiled in compiled.iter() {
246 result.clear();
247 let mut tx = Transaction::Admin(txn);
248 let mut vm = Vm::new(symbol_table, identity);
249 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
250 symbol_table = vm.symbol_table;
251
252 if compiled.is_output {
253 output_results.append(&mut result);
254 }
255 }
256 }
257 Ok(CompilationResult::Incremental(mut state)) => {
258 let policy = constrain_policy(|plans, bump, cat, tx| {
259 inject_read_policies(plans, bump, cat, tx, identity)
260 });
261 while let Some(compiled) = self.compiler.compile_next_with_policy(
262 &mut Transaction::Admin(txn),
263 &mut state,
264 &policy,
265 )? {
266 result.clear();
267 let mut tx = Transaction::Admin(txn);
268 let mut vm = Vm::new(symbol_table, identity);
269 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
270 symbol_table = vm.symbol_table;
271
272 if compiled.is_output {
273 output_results.append(&mut result);
274 }
275 }
276 }
277 }
278
279 let mut final_result = output_results;
280 final_result.append(&mut result);
281 Ok(final_result)
282 }
283
284 #[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
285 pub fn subscription(&self, txn: &mut SubscriptionTransaction, cmd: Subscription<'_>) -> Result<Vec<Frame>> {
286 let bump = Bump::new();
288 let statements = parse_str(&bump, cmd.rql)?;
289
290 if statements.len() != 1 {
291 return Err(Error(subscription::single_statement_required(
292 "Subscription endpoint requires exactly one statement",
293 )));
294 }
295
296 let statement = &statements[0];
297 if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
298 return Err(Error(subscription::invalid_statement(
299 "Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
300 )));
301 }
302
303 let mut result = vec![];
305 let mut output_results: Vec<Frame> = Vec::new();
306 let mut symbol_table = SymbolTable::new();
307 populate_stack(&mut symbol_table, &cmd.params)?;
308
309 let identity = cmd.identity;
310 populate_identity(
311 &mut symbol_table,
312 &self.catalog,
313 &mut Transaction::Subscription(&mut *txn),
314 identity,
315 )?;
316
317 PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
318 &mut Transaction::Subscription(&mut *txn),
319 identity,
320 "subscription",
321 true,
322 )?;
323
324 let compiled = match self.compiler.compile_with_policy(
325 &mut Transaction::Subscription(txn),
326 cmd.rql,
327 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
328 ) {
329 Ok(CompilationResult::Ready(compiled)) => compiled,
330 Ok(CompilationResult::Incremental(_)) => {
331 unreachable!("Single subscription statement should not require incremental compilation")
332 }
333 Err(err) => return Err(err),
334 };
335
336 for compiled in compiled.iter() {
337 result.clear();
338 let mut tx = Transaction::Subscription(txn);
339 let mut vm = Vm::new(symbol_table, identity);
340 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
341 symbol_table = vm.symbol_table;
342
343 if compiled.is_output {
344 output_results.append(&mut result);
345 }
346 }
347
348 let mut final_result = output_results;
349 final_result.append(&mut result);
350 Ok(final_result)
351 }
352
353 #[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
354 pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> Result<Vec<Frame>> {
355 let mut result = vec![];
356 let mut output_results: Vec<Frame> = Vec::new();
357 let mut symbol_table = SymbolTable::new();
358 populate_stack(&mut symbol_table, &cmd.params)?;
359
360 let identity = cmd.identity;
361 populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Command(&mut *txn), identity)?;
362
363 PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
364 &mut Transaction::Command(&mut *txn),
365 identity,
366 "command",
367 false,
368 )?;
369
370 let compiled = match self.compiler.compile_with_policy(
371 &mut Transaction::Command(txn),
372 cmd.rql,
373 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
374 ) {
375 Ok(CompilationResult::Ready(compiled)) => compiled,
376 Ok(CompilationResult::Incremental(_)) => {
377 unreachable!("DDL statements require admin transactions, not command transactions")
378 }
379 Err(err) => {
380 #[cfg(not(target_arch = "wasm32"))]
381 if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
382 return Err(Error(Diagnostic {
383 code: "REMOTE_002".to_string(),
384 message: "Write operations on remote namespaces are not supported"
385 .to_string(),
386 help: Some("Use the remote instance directly for write operations"
387 .to_string()),
388 ..Default::default()
389 }));
390 }
391 return Err(err);
392 }
393 };
394
395 for compiled in compiled.iter() {
396 result.clear();
397 let mut tx = Transaction::Command(txn);
398 let mut vm = Vm::new(symbol_table, identity);
399 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
400 symbol_table = vm.symbol_table;
401
402 if compiled.is_output {
403 output_results.append(&mut result);
404 }
405 }
406
407 let mut final_result = output_results;
408 final_result.append(&mut result);
409 Ok(final_result)
410 }
411
412 #[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
414 pub fn call_procedure(
415 &self,
416 txn: &mut CommandTransaction,
417 identity: IdentityId,
418 name: &str,
419 params: &Params,
420 ) -> Result<Vec<Frame>> {
421 let rql = format!("CALL {}()", name);
423 let mut result = vec![];
424 let mut symbol_table = SymbolTable::new();
425 populate_stack(&mut symbol_table, params)?;
426 populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Command(&mut *txn), identity)?;
427
428 let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql)? {
429 CompilationResult::Ready(compiled) => compiled,
430 CompilationResult::Incremental(_) => {
431 unreachable!("CALL statements should not require incremental compilation")
432 }
433 };
434
435 for compiled in compiled.iter() {
436 result.clear();
437 let mut tx = Transaction::Command(txn);
438 let mut vm = Vm::new(symbol_table, identity);
439 vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result)?;
440 symbol_table = vm.symbol_table;
441 }
442
443 Ok(result)
444 }
445
446 #[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
447 pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> Result<Vec<Frame>> {
448 let mut result = vec![];
449 let mut output_results: Vec<Frame> = Vec::new();
450 let mut symbol_table = SymbolTable::new();
451 populate_stack(&mut symbol_table, &qry.params)?;
452
453 let identity = qry.identity;
454 populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Query(&mut *txn), identity)?;
455
456 PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
457 &mut Transaction::Query(&mut *txn),
458 identity,
459 "query",
460 false,
461 )?;
462
463 let compiled = match self.compiler.compile_with_policy(
464 &mut Transaction::Query(txn),
465 qry.rql,
466 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
467 ) {
468 Ok(CompilationResult::Ready(compiled)) => compiled,
469 Ok(CompilationResult::Incremental(_)) => {
470 unreachable!("DDL statements require admin transactions, not query transactions")
471 }
472 Err(err) => {
473 #[cfg(not(target_arch = "wasm32"))]
474 if let Some(frames) = self.try_forward_remote_query(&err, qry.rql, qry.params)? {
475 return Ok(frames);
476 }
477 return Err(err);
478 }
479 };
480
481 for compiled in compiled.iter() {
482 result.clear();
483 let mut tx = Transaction::Query(txn);
484 let mut vm = Vm::new(symbol_table, identity);
485 vm.run(&self.0, &mut tx, &compiled.instructions, &qry.params, &mut result)?;
486 symbol_table = vm.symbol_table;
487
488 if compiled.is_output {
489 output_results.append(&mut result);
490 }
491 }
492
493 let mut final_result = output_results;
494 final_result.append(&mut result);
495 Ok(final_result)
496 }
497}