1use std::{ops::Deref, result::Result as StdResult, sync::Arc, time::Duration};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{
9 error::diagnostic::subscription,
10 execution::ExecutionResult,
11 interface::catalog::policy::SessionOp,
12 metric::{ExecutionMetrics, StatementMetric},
13 value::column::columns::Columns,
14};
15use reifydb_metric::storage::metric::MetricReader;
16use reifydb_policy::inject_from_policies;
17use reifydb_rql::{
18 ast::parse_str,
19 compiler::{CompilationResult, Compiled, IncrementalCompilation, constrain_policy},
20 fingerprint::request::fingerprint_request,
21};
22use reifydb_runtime::context::clock::Instant;
23use reifydb_store_single::SingleStore;
24use reifydb_transaction::transaction::{
25 RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
26 query::QueryTransaction,
27};
28#[cfg(not(reifydb_single_threaded))]
29use reifydb_type::error::Diagnostic;
30use reifydb_type::{
31 error::Error,
32 params::Params,
33 value::{Value, frame::frame::Frame, r#type::Type},
34};
35use tracing::instrument;
36
37#[cfg(not(reifydb_single_threaded))]
38use crate::remote;
39use crate::{
40 Result,
41 policy::PolicyEvaluator,
42 vm::{
43 Admin, Command, Query, Subscription, Test,
44 services::{EngineConfig, Services},
45 stack::{SymbolTable, Variable},
46 vm::Vm,
47 },
48};
49
/// Handle to the engine's shared [`Services`].
///
/// The executor is a thin wrapper around an `Arc<Services>`: cloning it only
/// clones the `Arc`, and the `Deref` impl in this file exposes the wrapped
/// `Services` directly on the executor.
pub struct Executor(Arc<Services>);
51
52impl Clone for Executor {
53 fn clone(&self) -> Self {
54 Self(self.0.clone())
55 }
56}
57
58impl Deref for Executor {
59 type Target = Services;
60
61 fn deref(&self) -> &Self::Target {
62 &self.0
63 }
64}
65
66impl Executor {
67 pub fn new(
68 catalog: Catalog,
69 config: EngineConfig,
70 flow_operator_store: SystemFlowOperatorStore,
71 stats_reader: MetricReader<SingleStore>,
72 ) -> Self {
73 Self(Arc::new(Services::new(catalog, config, flow_operator_store, stats_reader)))
74 }
75
76 pub fn services(&self) -> &Arc<Services> {
77 &self.0
78 }
79
80 pub fn from_services(services: Arc<Services>) -> Self {
81 Self(services)
82 }
83
84 #[allow(dead_code)]
85 pub fn testing() -> Self {
86 Self(Services::testing())
87 }
88
89 #[cfg(not(reifydb_single_threaded))]
90 fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
91 if let Some(ref registry) = self.0.remote_registry
92 && remote::is_remote_query(err)
93 && let Some(address) = remote::extract_remote_address(err)
94 {
95 let token = remote::extract_remote_token(err);
96 return registry.forward_query(&address, rql, params, token.as_deref()).map(Some);
97 }
98 Ok(None)
99 }
100}
101
/// Exposes the executor through the transaction crate's `RqlExecutor` trait.
impl RqlExecutor for Executor {
    /// Delegates to the `rql` method defined on `Executor` in this file.
    fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
        // Path-qualified call so the target is spelled out explicitly rather
        // than left to method-call resolution.
        Executor::rql(self, tx, rql, params)
    }
}
107
108fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
109 match params {
110 Params::Positional(values) => {
111 for (index, value) in values.iter().enumerate() {
112 let param_name = (index + 1).to_string();
113 symbols.set(param_name, Variable::scalar(value.clone()), false)?;
114 }
115 }
116 Params::Named(map) => {
117 for (name, value) in map.iter() {
118 symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
119 }
120 }
121 Params::None => {}
122 }
123 Ok(())
124}
125
126fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
127 let identity = tx.identity();
128 if identity.is_privileged() {
129 return Ok(());
130 }
131 if identity.is_anonymous() {
132 let columns = Columns::single_row([
133 ("id", Value::IdentityId(identity)),
134 ("name", Value::none_of(Type::Utf8)),
135 ("roles", Value::List(vec![])),
136 ]);
137 symbols.set("identity".to_string(), Variable::columns(columns), false)?;
138 return Ok(());
139 }
140 if let Some(user) = catalog.find_identity(tx, identity)? {
141 let roles = catalog.find_role_names_for_identity(tx, identity)?;
142 let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
143 let columns = Columns::single_row([
144 ("id", Value::IdentityId(identity)),
145 ("name", Value::Utf8(user.name)),
146 ("roles", Value::List(role_values)),
147 ]);
148 symbols.set("identity".to_string(), Variable::columns(columns), false)?;
149 }
150 Ok(())
151}
152
/// Success payload of `execute_compiled_units`, in order: the frames collected
/// from units flagged `is_output`, the frames left in the working buffer after
/// the last unit, the final symbol table, and one metric per executed unit.
type CompiledUnitsResult = (Vec<Frame>, Vec<Frame>, SymbolTable, Vec<StatementMetric>);
154
/// Failure payload of `execute_compiled_units`.
struct ExecutionFailure {
    /// The error returned by the unit that failed.
    error: Error,
    /// Metrics recorded up to and including the failing unit.
    partial_metrics: Vec<StatementMetric>,
}
159
160fn build_metrics(statements: Vec<StatementMetric>) -> ExecutionMetrics {
161 let fps: Vec<_> = statements.iter().map(|m| m.fingerprint).collect();
162 ExecutionMetrics {
163 fingerprint: fingerprint_request(&fps),
164 statements,
165 ..Default::default()
166 }
167}
168
/// Everything `run_compiled_unit` hands back after running one compiled unit.
struct RunUnitOutcome {
    /// Symbol table taken back from the VM after the run.
    symbols: SymbolTable,
    /// Result of `Vm::run` for the unit.
    run_result: Result<()>,
    /// Time spent inside `Vm::run`, in microseconds.
    execute_duration_us: u64,
}
174
175#[instrument(
176 name = "vm::run",
177 level = "debug",
178 skip_all,
179 fields(fingerprint = ?compiled.fingerprint, instr_count = compiled.instructions.len()),
180)]
181fn run_compiled_unit(
182 services: &Arc<Services>,
183 tx: &mut Transaction<'_>,
184 compiled: &Compiled,
185 params: &Params,
186 symbols: SymbolTable,
187 result: &mut Vec<Frame>,
188) -> RunUnitOutcome {
189 let mut vm = Vm::from_services(symbols, services, params, tx.identity());
190 let start = services.runtime_context.clock.instant();
191 let run_result = vm.run(services, tx, &compiled.instructions, result);
192 let execute_duration = start.elapsed();
193 RunUnitOutcome {
194 symbols: vm.symbols,
195 run_result,
196 execute_duration_us: execute_duration.as_micros() as u64,
197 }
198}
199
200#[instrument(
201 name = "executor::execute_units",
202 level = "debug",
203 skip_all,
204 fields(unit_count = compiled_list.len()),
205)]
206fn execute_compiled_units(
207 services: &Arc<Services>,
208 tx: &mut Transaction<'_>,
209 compiled_list: &[Compiled],
210 params: &Params,
211 mut symbols: SymbolTable,
212 compile_duration: Duration,
213) -> StdResult<CompiledUnitsResult, ExecutionFailure> {
214 let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
215 let mut result = vec![];
216 let mut output_results: Vec<Frame> = Vec::new();
217 let mut metrics = Vec::new();
218
219 for compiled in compiled_list.iter() {
220 result.clear();
221 let outcome = run_compiled_unit(services, tx, compiled, params, symbols, &mut result);
222 symbols = outcome.symbols;
223
224 metrics.push(StatementMetric {
225 fingerprint: compiled.fingerprint,
226 normalized_rql: compiled.normalized_rql.clone(),
227 compile_duration_us,
228 execute_duration_us: outcome.execute_duration_us,
229 rows_affected: if outcome.run_result.is_ok() {
230 extract_rows_affected(&result)
231 } else {
232 0
233 },
234 });
235
236 if let Err(error) = outcome.run_result {
237 return Err(ExecutionFailure {
238 error,
239 partial_metrics: metrics,
240 });
241 }
242
243 if compiled.is_output {
244 output_results.append(&mut result);
245 }
246 }
247
248 Ok((output_results, result, symbols, metrics))
249}
250
251fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
252 output_results.append(&mut remaining);
253 output_results
254}
255
256#[inline]
257fn error_result(error: Error, metrics: ExecutionMetrics) -> ExecutionResult {
258 ExecutionResult {
259 frames: vec![],
260 error: Some(error),
261 metrics,
262 }
263}
264
265fn extract_rows_affected(result: &[Frame]) -> u64 {
266 if result.len() == 1 {
267 let frame = &result[0];
268 for col in &frame.columns {
269 match col.name.as_str() {
270 "inserted" | "updated" | "deleted" => {
271 if col.data.len() == 1
272 && let Value::Uint8(n) = col.data.get_value(0)
273 {
274 return n;
275 }
276 }
277 _ => {}
278 }
279 }
280 }
281 result.len() as u64
282}
283
284impl Executor {
285 #[instrument(name = "executor::setup_symbols", level = "debug", skip_all)]
286 fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
287 let mut symbols = SymbolTable::new();
288 populate_symbols(&mut symbols, params)?;
289 populate_identity(&mut symbols, &self.catalog, tx)?;
290 Ok(symbols)
291 }
292
 /// Compiles `rql` within `tx`, passing `inject_from_policies` to the
 /// compiler as its policy hook.
 #[instrument(name = "executor::compile", level = "debug", skip(self, tx), fields(rql = %rql))]
 fn compile_query(&self, tx: &mut Transaction<'_>, rql: &str) -> Result<CompilationResult> {
     self.compiler.compile_with_policy(tx, rql, inject_from_policies)
 }
297
298 #[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
299 pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
300 let mut symbols = match self.setup_symbols(¶ms, tx) {
301 Ok(s) => s,
302 Err(e) => {
303 return ExecutionResult {
304 frames: vec![],
305 error: Some(e),
306 metrics: ExecutionMetrics::default(),
307 };
308 }
309 };
310
311 let start_compile = self.0.runtime_context.clock.instant();
312 let compiled_list = match self.compile_query(tx, rql) {
313 Ok(CompilationResult::Ready(compiled)) => compiled,
314 Ok(CompilationResult::Incremental(_)) => {
315 unreachable!("incremental compilation not supported in rql()")
316 }
317 Err(err) => {
318 #[cfg(not(reifydb_single_threaded))]
319 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
320 return ExecutionResult {
321 frames,
322 error: None,
323 metrics: ExecutionMetrics::default(),
324 };
325 }
326 return ExecutionResult {
327 frames: vec![],
328 error: Some(err),
329 metrics: ExecutionMetrics::default(),
330 };
331 }
332 };
333 let compile_duration = start_compile.elapsed();
334 let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
335
336 let mut result = vec![];
337 let mut metrics = Vec::new();
338 for compiled in compiled_list.iter() {
339 result.clear();
340 let outcome = run_compiled_unit(&self.0, tx, compiled, ¶ms, symbols, &mut result);
341 symbols = outcome.symbols;
342
343 metrics.push(StatementMetric {
344 fingerprint: compiled.fingerprint,
345 normalized_rql: compiled.normalized_rql.clone(),
346 compile_duration_us,
347 execute_duration_us: outcome.execute_duration_us,
348 rows_affected: if outcome.run_result.is_ok() {
349 extract_rows_affected(&result)
350 } else {
351 0
352 },
353 });
354
355 if let Err(e) = outcome.run_result {
356 return ExecutionResult {
357 frames: vec![],
358 error: Some(e),
359 metrics: build_metrics(metrics),
360 };
361 }
362 }
363
364 ExecutionResult {
365 frames: result,
366 error: None,
367 metrics: build_metrics(metrics),
368 }
369 }
370
371 #[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
372 pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> ExecutionResult {
373 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn)) {
374 Ok(s) => s,
375 Err(e) => return error_result(e, ExecutionMetrics::default()),
376 };
377 if let Err(e) = self.enforce_admin_policy(&symbols, txn) {
378 return error_result(e, ExecutionMetrics::default());
379 }
380 let start_compile = self.0.runtime_context.clock.instant();
381 match self.compile_query(&mut Transaction::Admin(txn), cmd.rql) {
382 Err(err) => self.handle_admin_compile_error(err, cmd.rql, cmd.params),
383 Ok(CompilationResult::Ready(compiled)) => {
384 self.execute_admin_ready(txn, compiled, &cmd.params, symbols, start_compile)
385 }
386 Ok(CompilationResult::Incremental(state)) => {
387 self.execute_admin_incremental(txn, state, &cmd.params, symbols)
388 }
389 }
390 }
391
 /// Enforces the session policy for `SessionOp::Admin` on `txn`, using
 /// `symbols` for evaluation.
 ///
 /// The final `true` argument is the same flag that `command` and `query`
 /// pass as `false` and `test` fills from `session_default_deny`.
 /// NOTE(review): presumably "deny by default" — confirm against
 /// `PolicyEvaluator::enforce_session_policy`.
 #[inline]
 fn enforce_admin_policy(&self, symbols: &SymbolTable, txn: &mut AdminTransaction) -> Result<()> {
     PolicyEvaluator::new(&self.0, symbols).enforce_session_policy(
         &mut Transaction::Admin(txn),
         SessionOp::Admin,
         true,
     )
 }
400
401 #[inline]
402 #[cfg_attr(reifydb_single_threaded, allow(unused_variables))]
403 fn handle_admin_compile_error(&self, err: Error, rql: &str, params: Params) -> ExecutionResult {
404 #[cfg(not(reifydb_single_threaded))]
405 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
406 return ExecutionResult {
407 frames,
408 error: None,
409 metrics: ExecutionMetrics::default(),
410 };
411 }
412 error_result(err, ExecutionMetrics::default())
413 }
414
415 #[inline]
416 fn execute_admin_ready(
417 &self,
418 txn: &mut AdminTransaction,
419 compiled: Arc<Vec<Compiled>>,
420 params: &Params,
421 symbols: SymbolTable,
422 start_compile: Instant,
423 ) -> ExecutionResult {
424 let compile_duration = start_compile.elapsed();
425 match execute_compiled_units(
426 &self.0,
427 &mut Transaction::Admin(txn),
428 &compiled,
429 params,
430 symbols,
431 compile_duration,
432 ) {
433 Ok((output, remaining, _, metrics)) => ExecutionResult {
434 frames: merge_results(output, remaining),
435 error: None,
436 metrics: build_metrics(metrics),
437 },
438 Err(f) => ExecutionResult {
439 frames: vec![],
440 error: Some(f.error),
441 metrics: build_metrics(f.partial_metrics),
442 },
443 }
444 }
445
446 fn execute_admin_incremental(
447 &self,
448 txn: &mut AdminTransaction,
449 mut state: IncrementalCompilation,
450 params: &Params,
451 symbols: SymbolTable,
452 ) -> ExecutionResult {
453 let policy = constrain_policy(inject_from_policies);
454 let mut result = vec![];
455 let mut output_results: Vec<Frame> = Vec::new();
456 let mut symbols = symbols;
457 let mut metrics = Vec::new();
458 loop {
459 let start_incr = self.0.runtime_context.clock.instant();
460 let next = match self.compiler.compile_next_with_policy(
461 &mut Transaction::Admin(txn),
462 &mut state,
463 &policy,
464 ) {
465 Ok(n) => n,
466 Err(e) => return error_result(e, build_metrics(metrics)),
467 };
468 let compile_duration = start_incr.elapsed();
469
470 let Some(compiled) = next else {
471 break;
472 };
473
474 result.clear();
475 let mut tx = Transaction::Admin(txn);
476 let mut vm = Vm::from_services(symbols, &self.0, params, tx.identity());
477 let start_execute = self.0.runtime_context.clock.instant();
478 let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
479 let execute_duration = start_execute.elapsed();
480 symbols = vm.symbols;
481
482 metrics.push(StatementMetric {
483 fingerprint: compiled.fingerprint,
484 normalized_rql: compiled.normalized_rql,
485 compile_duration_us: compile_duration.as_micros() as u64,
486 execute_duration_us: execute_duration.as_micros() as u64,
487 rows_affected: if run_result.is_ok() {
488 extract_rows_affected(&result)
489 } else {
490 0
491 },
492 });
493
494 if let Err(e) = run_result {
495 return error_result(e, build_metrics(metrics));
496 }
497
498 if compiled.is_output {
499 output_results.append(&mut result);
500 }
501 }
502 ExecutionResult {
503 frames: merge_results(output_results, result),
504 error: None,
505 metrics: build_metrics(metrics),
506 }
507 }
508
509 #[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
510 pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> ExecutionResult {
511 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Test(Box::new(txn.reborrow()))) {
512 Ok(s) => s,
513 Err(e) => {
514 return ExecutionResult {
515 frames: vec![],
516 error: Some(e),
517 metrics: ExecutionMetrics::default(),
518 };
519 }
520 };
521
522 let session_type = txn.session_type;
523 let session_default_deny = txn.session_default_deny;
524 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
525 &mut Transaction::Test(Box::new(txn.reborrow())),
526 session_type,
527 session_default_deny,
528 ) {
529 return ExecutionResult {
530 frames: vec![],
531 error: Some(e),
532 metrics: ExecutionMetrics::default(),
533 };
534 }
535
536 let start_compile = self.0.runtime_context.clock.instant();
537 match self.compiler.compile_with_policy(
538 &mut Transaction::Test(Box::new(txn.reborrow())),
539 cmd.rql,
540 inject_from_policies,
541 ) {
542 Err(err) => {
543 #[cfg(not(reifydb_single_threaded))]
544 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
545 return ExecutionResult {
546 frames,
547 error: None,
548 metrics: ExecutionMetrics::default(),
549 };
550 }
551 ExecutionResult {
552 frames: vec![],
553 error: Some(err),
554 metrics: ExecutionMetrics::default(),
555 }
556 }
557 Ok(CompilationResult::Ready(compiled)) => {
558 let compile_duration = start_compile.elapsed();
559 match execute_compiled_units(
560 &self.0,
561 &mut Transaction::Test(Box::new(txn.reborrow())),
562 &compiled,
563 &cmd.params,
564 symbols,
565 compile_duration,
566 ) {
567 Ok((output, remaining, _, metrics)) => ExecutionResult {
568 frames: merge_results(output, remaining),
569 error: None,
570 metrics: build_metrics(metrics),
571 },
572 Err(f) => ExecutionResult {
573 frames: vec![],
574 error: Some(f.error),
575 metrics: build_metrics(f.partial_metrics),
576 },
577 }
578 }
579 Ok(CompilationResult::Incremental(mut state)) => {
580 let policy = constrain_policy(|plans, bump, cat, tx| {
581 inject_from_policies(plans, bump, cat, tx)
582 });
583 let mut result = vec![];
584 let mut output_results: Vec<Frame> = Vec::new();
585 let mut symbols = symbols;
586 let mut metrics = Vec::new();
587 loop {
588 let start_incr = self.0.runtime_context.clock.instant();
589 let next = match self.compiler.compile_next_with_policy(
590 &mut Transaction::Test(Box::new(txn.reborrow())),
591 &mut state,
592 &policy,
593 ) {
594 Ok(n) => n,
595 Err(e) => {
596 return ExecutionResult {
597 frames: vec![],
598 error: Some(e),
599 metrics: build_metrics(metrics),
600 };
601 }
602 };
603 let compile_duration = start_incr.elapsed();
604
605 let Some(compiled) = next else {
606 break;
607 };
608
609 result.clear();
610 let mut tx = Transaction::Test(Box::new(txn.reborrow()));
611 let mut vm = Vm::from_services(symbols, &self.0, &cmd.params, tx.identity());
612 let start_execute = self.0.runtime_context.clock.instant();
613 let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
614 let execute_duration = start_execute.elapsed();
615 symbols = vm.symbols;
616
617 metrics.push(StatementMetric {
618 fingerprint: compiled.fingerprint,
619 normalized_rql: compiled.normalized_rql,
620 compile_duration_us: compile_duration.as_micros() as u64,
621 execute_duration_us: execute_duration.as_micros() as u64,
622 rows_affected: if run_result.is_ok() {
623 extract_rows_affected(&result)
624 } else {
625 0
626 },
627 });
628
629 if let Err(e) = run_result {
630 return ExecutionResult {
631 frames: vec![],
632 error: Some(e),
633 metrics: build_metrics(metrics),
634 };
635 }
636
637 if compiled.is_output {
638 output_results.append(&mut result);
639 }
640 }
641 ExecutionResult {
642 frames: merge_results(output_results, result),
643 error: None,
644 metrics: build_metrics(metrics),
645 }
646 }
647 }
648 }
649
650 #[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
651 pub fn subscription(&self, txn: &mut QueryTransaction, cmd: Subscription<'_>) -> ExecutionResult {
652 let bump = Bump::new();
653 let statements = match parse_str(&bump, cmd.rql) {
654 Ok(s) => s,
655 Err(e) => {
656 return ExecutionResult {
657 frames: vec![],
658 error: Some(e),
659 metrics: ExecutionMetrics::default(),
660 };
661 }
662 };
663
664 if statements.len() != 1 {
665 return ExecutionResult {
666 frames: vec![],
667 error: Some(Error(Box::new(subscription::single_statement_required(
668 "Subscription endpoint requires exactly one statement",
669 )))),
670 metrics: ExecutionMetrics::default(),
671 };
672 }
673
674 let statement = &statements[0];
675 if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
676 return ExecutionResult {
677 frames: vec![],
678 error: Some(Error(Box::new(subscription::invalid_statement(
679 "Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
680 )))),
681 metrics: ExecutionMetrics::default(),
682 };
683 }
684
685 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Query(&mut *txn)) {
686 Ok(s) => s,
687 Err(e) => {
688 return ExecutionResult {
689 frames: vec![],
690 error: Some(e),
691 metrics: ExecutionMetrics::default(),
692 };
693 }
694 };
695
696 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
697 &mut Transaction::Query(&mut *txn),
698 SessionOp::Subscription,
699 true,
700 ) {
701 return ExecutionResult {
702 frames: vec![],
703 error: Some(e),
704 metrics: ExecutionMetrics::default(),
705 };
706 }
707
708 let start_compile = self.0.runtime_context.clock.instant();
709 let compiled = match self.compiler.compile_with_policy(
710 &mut Transaction::Query(txn),
711 cmd.rql,
712 inject_from_policies,
713 ) {
714 Ok(CompilationResult::Ready(compiled)) => compiled,
715 Ok(CompilationResult::Incremental(_)) => {
716 unreachable!("Single subscription statement should not require incremental compilation")
717 }
718 Err(err) => {
719 return ExecutionResult {
720 frames: vec![],
721 error: Some(err),
722 metrics: ExecutionMetrics::default(),
723 };
724 }
725 };
726 let compile_duration = start_compile.elapsed();
727
728 match execute_compiled_units(
729 &self.0,
730 &mut Transaction::Query(txn),
731 &compiled,
732 &cmd.params,
733 symbols,
734 compile_duration,
735 ) {
736 Ok((output, remaining, _, metrics)) => ExecutionResult {
737 frames: merge_results(output, remaining),
738 error: None,
739 metrics: build_metrics(metrics),
740 },
741 Err(f) => ExecutionResult {
742 frames: vec![],
743 error: Some(f.error),
744 metrics: build_metrics(f.partial_metrics),
745 },
746 }
747 }
748
749 #[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
750 pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> ExecutionResult {
751 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn)) {
752 Ok(s) => s,
753 Err(e) => {
754 return ExecutionResult {
755 frames: vec![],
756 error: Some(e),
757 metrics: ExecutionMetrics::default(),
758 };
759 }
760 };
761
762 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
763 &mut Transaction::Command(&mut *txn),
764 SessionOp::Command,
765 false,
766 ) {
767 return ExecutionResult {
768 frames: vec![],
769 error: Some(e),
770 metrics: ExecutionMetrics::default(),
771 };
772 }
773
774 let start_compile = self.0.runtime_context.clock.instant();
775 let compiled = match self.compile_query(&mut Transaction::Command(txn), cmd.rql) {
776 Ok(CompilationResult::Ready(compiled)) => compiled,
777 Ok(CompilationResult::Incremental(_)) => {
778 unreachable!("DDL statements require admin transactions, not command transactions")
779 }
780 Err(err) => {
781 #[cfg(not(reifydb_single_threaded))]
782 if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
783 return ExecutionResult {
784 frames: vec![],
785 error: Some(Error(Box::new(Diagnostic {
786 code: "REMOTE_002".to_string(),
787 message: "Write operations on remote namespaces are not supported"
788 .to_string(),
789 help: Some("Use the remote instance directly for write operations"
790 .to_string()),
791 ..Default::default()
792 }))),
793 metrics: ExecutionMetrics::default(),
794 };
795 }
796 return ExecutionResult {
797 frames: vec![],
798 error: Some(err),
799 metrics: ExecutionMetrics::default(),
800 };
801 }
802 };
803 let compile_duration = start_compile.elapsed();
804
805 match execute_compiled_units(
806 &self.0,
807 &mut Transaction::Command(txn),
808 &compiled,
809 &cmd.params,
810 symbols,
811 compile_duration,
812 ) {
813 Ok((output, remaining, _, metrics)) => ExecutionResult {
814 frames: merge_results(output, remaining),
815 error: None,
816 metrics: build_metrics(metrics),
817 },
818 Err(f) => ExecutionResult {
819 frames: vec![],
820 error: Some(f.error),
821 metrics: build_metrics(f.partial_metrics),
822 },
823 }
824 }
825
826 #[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
827 pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> ExecutionResult {
828 let rql = format!("CALL {}()", name);
829 let symbols = match self.setup_symbols(params, &mut Transaction::Command(&mut *txn)) {
830 Ok(s) => s,
831 Err(e) => {
832 return ExecutionResult {
833 frames: vec![],
834 error: Some(e),
835 metrics: ExecutionMetrics::default(),
836 };
837 }
838 };
839
840 let start_compile = self.0.runtime_context.clock.instant();
841 let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql) {
842 Ok(CompilationResult::Ready(compiled)) => compiled,
843 Ok(CompilationResult::Incremental(_)) => {
844 unreachable!("CALL statements should not require incremental compilation")
845 }
846 Err(e) => {
847 return ExecutionResult {
848 frames: vec![],
849 error: Some(e),
850 metrics: ExecutionMetrics::default(),
851 };
852 }
853 };
854 let compile_duration = start_compile.elapsed();
855 let compile_duration_us = compile_duration.as_micros() as u64 / compiled.len().max(1) as u64;
856
857 let mut result = vec![];
858 let mut metrics = Vec::new();
859 let mut symbols = symbols;
860 for compiled in compiled.iter() {
861 result.clear();
862 let mut tx = Transaction::Command(txn);
863 let mut vm = Vm::from_services(symbols, &self.0, params, tx.identity());
864 let start_execute = self.0.runtime_context.clock.instant();
865 let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
866 let execute_duration = start_execute.elapsed();
867 symbols = vm.symbols;
868
869 metrics.push(StatementMetric {
870 fingerprint: compiled.fingerprint,
871 normalized_rql: compiled.normalized_rql.clone(),
872 compile_duration_us,
873 execute_duration_us: execute_duration.as_micros() as u64,
874 rows_affected: if run_result.is_ok() {
875 extract_rows_affected(&result)
876 } else {
877 0
878 },
879 });
880
881 if let Err(e) = run_result {
882 return ExecutionResult {
883 frames: vec![],
884 error: Some(e),
885 metrics: build_metrics(metrics),
886 };
887 }
888 }
889
890 ExecutionResult {
891 frames: result,
892 error: None,
893 metrics: build_metrics(metrics),
894 }
895 }
896
897 #[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
898 pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> ExecutionResult {
899 let symbols = match self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn)) {
900 Ok(s) => s,
901 Err(e) => {
902 return ExecutionResult {
903 frames: vec![],
904 error: Some(e),
905 metrics: ExecutionMetrics::default(),
906 };
907 }
908 };
909
910 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
911 &mut Transaction::Query(&mut *txn),
912 SessionOp::Query,
913 false,
914 ) {
915 return ExecutionResult {
916 frames: vec![],
917 error: Some(e),
918 metrics: ExecutionMetrics::default(),
919 };
920 }
921
922 let start_compile = self.0.runtime_context.clock.instant();
923 let compiled = match self.compile_query(&mut Transaction::Query(txn), qry.rql) {
924 Ok(CompilationResult::Ready(compiled)) => compiled,
925 Ok(CompilationResult::Incremental(_)) => {
926 unreachable!("DDL statements require admin transactions, not query transactions")
927 }
928 Err(err) => {
929 #[cfg(not(reifydb_single_threaded))]
930 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, qry.rql, qry.params) {
931 return ExecutionResult {
932 frames,
933 error: None,
934 metrics: ExecutionMetrics::default(),
935 };
936 }
937 return ExecutionResult {
938 frames: vec![],
939 error: Some(err),
940 metrics: ExecutionMetrics::default(),
941 };
942 }
943 };
944 let compile_duration = start_compile.elapsed();
945
946 let exec_result = execute_compiled_units(
947 &self.0,
948 &mut Transaction::Query(txn),
949 &compiled,
950 &qry.params,
951 symbols,
952 compile_duration,
953 );
954
955 match exec_result {
956 Ok((output, remaining, _, metrics)) => ExecutionResult {
957 frames: merge_results(output, remaining),
958 error: None,
959 metrics: build_metrics(metrics),
960 },
961 Err(f) => ExecutionResult {
962 frames: vec![],
963 error: Some(f.error),
964 metrics: build_metrics(f.partial_metrics),
965 },
966 }
967 }
968}