1use std::{ops::Deref, result::Result as StdResult, sync::Arc, time::Duration};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{
9 error::diagnostic::subscription,
10 execution::ExecutionResult,
11 interface::catalog::policy::SessionOp,
12 metric::{ExecutionMetrics, StatementMetric},
13 value::column::columns::Columns,
14};
15use reifydb_metric::storage::metric::MetricReader;
16use reifydb_policy::inject_from_policies;
17use reifydb_rql::{
18 ast::parse_str,
19 compiler::{CompilationResult, Compiled, IncrementalCompilation, constrain_policy},
20 fingerprint::request::fingerprint_request,
21};
22use reifydb_runtime::context::clock::Instant;
23use reifydb_store_single::SingleStore;
24use reifydb_transaction::transaction::{
25 RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
26 query::QueryTransaction,
27};
28#[cfg(not(reifydb_single_threaded))]
29use reifydb_type::error::Diagnostic;
30use reifydb_type::{
31 error::Error,
32 params::Params,
33 value::{Value, frame::frame::Frame, r#type::Type},
34};
35use tracing::instrument;
36
37#[cfg(not(reifydb_single_threaded))]
38use crate::remote;
39use crate::{
40 Result,
41 policy::PolicyEvaluator,
42 vm::{
43 Admin, Command, Query, Subscription, Test,
44 services::{EngineConfig, Services},
45 stack::{SymbolTable, Variable},
46 vm::Vm,
47 },
48};
49
50pub struct Executor(Arc<Services>);
52
53impl Clone for Executor {
54 fn clone(&self) -> Self {
55 Self(self.0.clone())
56 }
57}
58
59impl Deref for Executor {
60 type Target = Services;
61
62 fn deref(&self) -> &Self::Target {
63 &self.0
64 }
65}
66
67impl Executor {
68 pub fn new(
69 catalog: Catalog,
70 config: EngineConfig,
71 flow_operator_store: SystemFlowOperatorStore,
72 stats_reader: MetricReader<SingleStore>,
73 ) -> Self {
74 Self(Arc::new(Services::new(catalog, config, flow_operator_store, stats_reader)))
75 }
76
77 pub fn services(&self) -> &Arc<Services> {
79 &self.0
80 }
81
82 pub fn from_services(services: Arc<Services>) -> Self {
84 Self(services)
85 }
86
87 #[allow(dead_code)]
88 pub fn testing() -> Self {
89 Self(Services::testing())
90 }
91
92 #[cfg(not(reifydb_single_threaded))]
95 fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
96 if let Some(ref registry) = self.0.remote_registry
97 && remote::is_remote_query(err)
98 && let Some(address) = remote::extract_remote_address(err)
99 {
100 let token = remote::extract_remote_token(err);
101 return registry.forward_query(&address, rql, params, token.as_deref()).map(Some);
102 }
103 Ok(None)
104 }
105}
106
107impl RqlExecutor for Executor {
108 fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
109 Executor::rql(self, tx, rql, params)
110 }
111}
112
113fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
115 match params {
116 Params::Positional(values) => {
117 for (index, value) in values.iter().enumerate() {
118 let param_name = (index + 1).to_string();
119 symbols.set(param_name, Variable::scalar(value.clone()), false)?;
120 }
121 }
122 Params::Named(map) => {
123 for (name, value) in map.iter() {
124 symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
125 }
126 }
127 Params::None => {}
128 }
129 Ok(())
130}
131
132fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
135 let identity = tx.identity();
136 if identity.is_privileged() {
137 return Ok(());
138 }
139 if identity.is_anonymous() {
140 let columns = Columns::single_row([
141 ("id", Value::IdentityId(identity)),
142 ("name", Value::none_of(Type::Utf8)),
143 ("roles", Value::List(vec![])),
144 ]);
145 symbols.set("identity".to_string(), Variable::columns(columns), false)?;
146 return Ok(());
147 }
148 if let Some(user) = catalog.find_identity(tx, identity)? {
149 let roles = catalog.find_role_names_for_identity(tx, identity)?;
150 let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
151 let columns = Columns::single_row([
152 ("id", Value::IdentityId(identity)),
153 ("name", Value::Utf8(user.name)),
154 ("roles", Value::List(role_values)),
155 ]);
156 symbols.set("identity".to_string(), Variable::columns(columns), false)?;
157 }
158 Ok(())
159}
160
161type CompiledUnitsResult = (Vec<Frame>, Vec<Frame>, SymbolTable, Vec<StatementMetric>);
163
164struct ExecutionFailure {
166 error: Error,
167 partial_metrics: Vec<StatementMetric>,
168}
169
170fn build_metrics(statements: Vec<StatementMetric>) -> ExecutionMetrics {
172 let fps: Vec<_> = statements.iter().map(|m| m.fingerprint).collect();
173 ExecutionMetrics {
174 fingerprint: fingerprint_request(&fps),
175 statements,
176 ..Default::default()
177 }
178}
179
180fn execute_compiled_units(
182 services: &Arc<Services>,
183 tx: &mut Transaction<'_>,
184 compiled_list: &[Compiled],
185 params: &Params,
186 mut symbols: SymbolTable,
187 compile_duration: Duration,
188) -> StdResult<CompiledUnitsResult, ExecutionFailure> {
189 let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
190 let mut result = vec![];
191 let mut output_results: Vec<Frame> = Vec::new();
192 let mut metrics = Vec::new();
193
194 for compiled in compiled_list.iter() {
195 result.clear();
196 let mut vm = Vm::from_services(symbols, services, params, tx.identity());
197 let start = services.runtime_context.clock.instant();
198 let run_result = vm.run(services, tx, &compiled.instructions, &mut result);
199 let execute_duration = start.elapsed();
200 symbols = vm.symbols;
201
202 metrics.push(StatementMetric {
203 fingerprint: compiled.fingerprint,
204 normalized_rql: compiled.normalized_rql.clone(),
205 compile_duration_us,
206 execute_duration_us: execute_duration.as_micros() as u64,
207 rows_affected: if run_result.is_ok() {
208 extract_rows_affected(&result)
209 } else {
210 0
211 },
212 });
213
214 if let Err(error) = run_result {
215 return Err(ExecutionFailure {
216 error,
217 partial_metrics: metrics,
218 });
219 }
220
221 if compiled.is_output {
222 output_results.append(&mut result);
223 }
224 }
225
226 Ok((output_results, result, symbols, metrics))
227}
228
229fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
231 output_results.append(&mut remaining);
232 output_results
233}
234
235#[inline]
236fn error_result(error: Error, metrics: ExecutionMetrics) -> ExecutionResult {
237 ExecutionResult {
238 frames: vec![],
239 error: Some(error),
240 metrics,
241 }
242}
243
244fn extract_rows_affected(result: &[Frame]) -> u64 {
251 if result.len() == 1 {
252 let frame = &result[0];
253 for col in &frame.columns {
254 match col.name.as_str() {
255 "inserted" | "updated" | "deleted" => {
256 if col.data.len() == 1
257 && let Value::Uint8(n) = col.data.get_value(0)
258 {
259 return n;
260 }
261 }
262 _ => {}
263 }
264 }
265 }
266 result.len() as u64
267}
268
269impl Executor {
270 fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
272 let mut symbols = SymbolTable::new();
273 populate_symbols(&mut symbols, params)?;
274 populate_identity(&mut symbols, &self.catalog, tx)?;
275 Ok(symbols)
276 }
277
278 #[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
283 pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
284 let mut symbols = match self.setup_symbols(¶ms, tx) {
285 Ok(s) => s,
286 Err(e) => {
287 return ExecutionResult {
288 frames: vec![],
289 error: Some(e),
290 metrics: ExecutionMetrics::default(),
291 };
292 }
293 };
294
295 let start_compile = self.0.runtime_context.clock.instant();
296 let compiled_list = match self.compiler.compile_with_policy(tx, rql, inject_from_policies) {
297 Ok(CompilationResult::Ready(compiled)) => compiled,
298 Ok(CompilationResult::Incremental(_)) => {
299 unreachable!("incremental compilation not supported in rql()")
300 }
301 Err(err) => {
302 #[cfg(not(reifydb_single_threaded))]
303 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
304 return ExecutionResult {
305 frames,
306 error: None,
307 metrics: ExecutionMetrics::default(),
308 };
309 }
310 return ExecutionResult {
311 frames: vec![],
312 error: Some(err),
313 metrics: ExecutionMetrics::default(),
314 };
315 }
316 };
317 let compile_duration = start_compile.elapsed();
318 let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
319
320 let mut result = vec![];
321 let mut metrics = Vec::new();
322 for compiled in compiled_list.iter() {
323 result.clear();
324 let mut vm = Vm::from_services(symbols, &self.0, ¶ms, tx.identity());
325 let start_execute = self.0.runtime_context.clock.instant();
326 let run_result = vm.run(&self.0, tx, &compiled.instructions, &mut result);
327 let execute_duration = start_execute.elapsed();
328 symbols = vm.symbols;
329
330 metrics.push(StatementMetric {
331 fingerprint: compiled.fingerprint,
332 normalized_rql: compiled.normalized_rql.clone(),
333 compile_duration_us,
334 execute_duration_us: execute_duration.as_micros() as u64,
335 rows_affected: if run_result.is_ok() {
336 extract_rows_affected(&result)
337 } else {
338 0
339 },
340 });
341
342 if let Err(e) = run_result {
343 return ExecutionResult {
344 frames: vec![],
345 error: Some(e),
346 metrics: build_metrics(metrics),
347 };
348 }
349 }
350
351 ExecutionResult {
352 frames: result,
353 error: None,
354 metrics: build_metrics(metrics),
355 }
356 }
357
358 #[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
359 pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> ExecutionResult {
360 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn)) {
361 Ok(s) => s,
362 Err(e) => return error_result(e, ExecutionMetrics::default()),
363 };
364 if let Err(e) = self.enforce_admin_policy(&symbols, txn) {
365 return error_result(e, ExecutionMetrics::default());
366 }
367 let start_compile = self.0.runtime_context.clock.instant();
368 match self.compiler.compile_with_policy(&mut Transaction::Admin(txn), cmd.rql, inject_from_policies) {
369 Err(err) => self.handle_admin_compile_error(err, cmd.rql, cmd.params),
370 Ok(CompilationResult::Ready(compiled)) => {
371 self.execute_admin_ready(txn, compiled, &cmd.params, symbols, start_compile)
372 }
373 Ok(CompilationResult::Incremental(state)) => {
374 self.execute_admin_incremental(txn, state, &cmd.params, symbols)
375 }
376 }
377 }
378
379 #[inline]
380 fn enforce_admin_policy(&self, symbols: &SymbolTable, txn: &mut AdminTransaction) -> Result<()> {
381 PolicyEvaluator::new(&self.0, symbols).enforce_session_policy(
382 &mut Transaction::Admin(txn),
383 SessionOp::Admin,
384 true,
385 )
386 }
387
388 #[inline]
389 #[cfg_attr(reifydb_single_threaded, allow(unused_variables))]
390 fn handle_admin_compile_error(&self, err: Error, rql: &str, params: Params) -> ExecutionResult {
391 #[cfg(not(reifydb_single_threaded))]
392 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
393 return ExecutionResult {
394 frames,
395 error: None,
396 metrics: ExecutionMetrics::default(),
397 };
398 }
399 error_result(err, ExecutionMetrics::default())
400 }
401
402 #[inline]
403 fn execute_admin_ready(
404 &self,
405 txn: &mut AdminTransaction,
406 compiled: Arc<Vec<Compiled>>,
407 params: &Params,
408 symbols: SymbolTable,
409 start_compile: Instant,
410 ) -> ExecutionResult {
411 let compile_duration = start_compile.elapsed();
412 match execute_compiled_units(
413 &self.0,
414 &mut Transaction::Admin(txn),
415 &compiled,
416 params,
417 symbols,
418 compile_duration,
419 ) {
420 Ok((output, remaining, _, metrics)) => ExecutionResult {
421 frames: merge_results(output, remaining),
422 error: None,
423 metrics: build_metrics(metrics),
424 },
425 Err(f) => ExecutionResult {
426 frames: vec![],
427 error: Some(f.error),
428 metrics: build_metrics(f.partial_metrics),
429 },
430 }
431 }
432
433 fn execute_admin_incremental(
434 &self,
435 txn: &mut AdminTransaction,
436 mut state: IncrementalCompilation,
437 params: &Params,
438 symbols: SymbolTable,
439 ) -> ExecutionResult {
440 let policy = constrain_policy(inject_from_policies);
441 let mut result = vec![];
442 let mut output_results: Vec<Frame> = Vec::new();
443 let mut symbols = symbols;
444 let mut metrics = Vec::new();
445 loop {
446 let start_incr = self.0.runtime_context.clock.instant();
447 let next = match self.compiler.compile_next_with_policy(
448 &mut Transaction::Admin(txn),
449 &mut state,
450 &policy,
451 ) {
452 Ok(n) => n,
453 Err(e) => return error_result(e, build_metrics(metrics)),
454 };
455 let compile_duration = start_incr.elapsed();
456
457 let Some(compiled) = next else {
458 break;
459 };
460
461 result.clear();
462 let mut tx = Transaction::Admin(txn);
463 let mut vm = Vm::from_services(symbols, &self.0, params, tx.identity());
464 let start_execute = self.0.runtime_context.clock.instant();
465 let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
466 let execute_duration = start_execute.elapsed();
467 symbols = vm.symbols;
468
469 metrics.push(StatementMetric {
470 fingerprint: compiled.fingerprint,
471 normalized_rql: compiled.normalized_rql,
472 compile_duration_us: compile_duration.as_micros() as u64,
473 execute_duration_us: execute_duration.as_micros() as u64,
474 rows_affected: if run_result.is_ok() {
475 extract_rows_affected(&result)
476 } else {
477 0
478 },
479 });
480
481 if let Err(e) = run_result {
482 return error_result(e, build_metrics(metrics));
483 }
484
485 if compiled.is_output {
486 output_results.append(&mut result);
487 }
488 }
489 ExecutionResult {
490 frames: merge_results(output_results, result),
491 error: None,
492 metrics: build_metrics(metrics),
493 }
494 }
495
496 #[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
497 pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> ExecutionResult {
498 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Test(Box::new(txn.reborrow()))) {
499 Ok(s) => s,
500 Err(e) => {
501 return ExecutionResult {
502 frames: vec![],
503 error: Some(e),
504 metrics: ExecutionMetrics::default(),
505 };
506 }
507 };
508
509 let session_type = txn.session_type;
510 let session_default_deny = txn.session_default_deny;
511 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
512 &mut Transaction::Test(Box::new(txn.reborrow())),
513 session_type,
514 session_default_deny,
515 ) {
516 return ExecutionResult {
517 frames: vec![],
518 error: Some(e),
519 metrics: ExecutionMetrics::default(),
520 };
521 }
522
523 let start_compile = self.0.runtime_context.clock.instant();
524 match self.compiler.compile_with_policy(
525 &mut Transaction::Test(Box::new(txn.reborrow())),
526 cmd.rql,
527 inject_from_policies,
528 ) {
529 Err(err) => {
530 #[cfg(not(reifydb_single_threaded))]
531 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
532 return ExecutionResult {
533 frames,
534 error: None,
535 metrics: ExecutionMetrics::default(),
536 };
537 }
538 ExecutionResult {
539 frames: vec![],
540 error: Some(err),
541 metrics: ExecutionMetrics::default(),
542 }
543 }
544 Ok(CompilationResult::Ready(compiled)) => {
545 let compile_duration = start_compile.elapsed();
546 match execute_compiled_units(
547 &self.0,
548 &mut Transaction::Test(Box::new(txn.reborrow())),
549 &compiled,
550 &cmd.params,
551 symbols,
552 compile_duration,
553 ) {
554 Ok((output, remaining, _, metrics)) => ExecutionResult {
555 frames: merge_results(output, remaining),
556 error: None,
557 metrics: build_metrics(metrics),
558 },
559 Err(f) => ExecutionResult {
560 frames: vec![],
561 error: Some(f.error),
562 metrics: build_metrics(f.partial_metrics),
563 },
564 }
565 }
566 Ok(CompilationResult::Incremental(mut state)) => {
567 let policy = constrain_policy(|plans, bump, cat, tx| {
568 inject_from_policies(plans, bump, cat, tx)
569 });
570 let mut result = vec![];
571 let mut output_results: Vec<Frame> = Vec::new();
572 let mut symbols = symbols;
573 let mut metrics = Vec::new();
574 loop {
575 let start_incr = self.0.runtime_context.clock.instant();
576 let next = match self.compiler.compile_next_with_policy(
577 &mut Transaction::Test(Box::new(txn.reborrow())),
578 &mut state,
579 &policy,
580 ) {
581 Ok(n) => n,
582 Err(e) => {
583 return ExecutionResult {
584 frames: vec![],
585 error: Some(e),
586 metrics: build_metrics(metrics),
587 };
588 }
589 };
590 let compile_duration = start_incr.elapsed();
591
592 let Some(compiled) = next else {
593 break;
594 };
595
596 result.clear();
597 let mut tx = Transaction::Test(Box::new(txn.reborrow()));
598 let mut vm = Vm::from_services(symbols, &self.0, &cmd.params, tx.identity());
599 let start_execute = self.0.runtime_context.clock.instant();
600 let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
601 let execute_duration = start_execute.elapsed();
602 symbols = vm.symbols;
603
604 metrics.push(StatementMetric {
605 fingerprint: compiled.fingerprint,
606 normalized_rql: compiled.normalized_rql,
607 compile_duration_us: compile_duration.as_micros() as u64,
608 execute_duration_us: execute_duration.as_micros() as u64,
609 rows_affected: if run_result.is_ok() {
610 extract_rows_affected(&result)
611 } else {
612 0
613 },
614 });
615
616 if let Err(e) = run_result {
617 return ExecutionResult {
618 frames: vec![],
619 error: Some(e),
620 metrics: build_metrics(metrics),
621 };
622 }
623
624 if compiled.is_output {
625 output_results.append(&mut result);
626 }
627 }
628 ExecutionResult {
629 frames: merge_results(output_results, result),
630 error: None,
631 metrics: build_metrics(metrics),
632 }
633 }
634 }
635 }
636
637 #[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
638 pub fn subscription(&self, txn: &mut QueryTransaction, cmd: Subscription<'_>) -> ExecutionResult {
639 let bump = Bump::new();
641 let statements = match parse_str(&bump, cmd.rql) {
642 Ok(s) => s,
643 Err(e) => {
644 return ExecutionResult {
645 frames: vec![],
646 error: Some(e),
647 metrics: ExecutionMetrics::default(),
648 };
649 }
650 };
651
652 if statements.len() != 1 {
653 return ExecutionResult {
654 frames: vec![],
655 error: Some(Error(Box::new(subscription::single_statement_required(
656 "Subscription endpoint requires exactly one statement",
657 )))),
658 metrics: ExecutionMetrics::default(),
659 };
660 }
661
662 let statement = &statements[0];
663 if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
664 return ExecutionResult {
665 frames: vec![],
666 error: Some(Error(Box::new(subscription::invalid_statement(
667 "Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
668 )))),
669 metrics: ExecutionMetrics::default(),
670 };
671 }
672
673 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Query(&mut *txn)) {
674 Ok(s) => s,
675 Err(e) => {
676 return ExecutionResult {
677 frames: vec![],
678 error: Some(e),
679 metrics: ExecutionMetrics::default(),
680 };
681 }
682 };
683
684 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
685 &mut Transaction::Query(&mut *txn),
686 SessionOp::Subscription,
687 true,
688 ) {
689 return ExecutionResult {
690 frames: vec![],
691 error: Some(e),
692 metrics: ExecutionMetrics::default(),
693 };
694 }
695
696 let start_compile = self.0.runtime_context.clock.instant();
697 let compiled = match self.compiler.compile_with_policy(
698 &mut Transaction::Query(txn),
699 cmd.rql,
700 inject_from_policies,
701 ) {
702 Ok(CompilationResult::Ready(compiled)) => compiled,
703 Ok(CompilationResult::Incremental(_)) => {
704 unreachable!("Single subscription statement should not require incremental compilation")
705 }
706 Err(err) => {
707 return ExecutionResult {
708 frames: vec![],
709 error: Some(err),
710 metrics: ExecutionMetrics::default(),
711 };
712 }
713 };
714 let compile_duration = start_compile.elapsed();
715
716 match execute_compiled_units(
717 &self.0,
718 &mut Transaction::Query(txn),
719 &compiled,
720 &cmd.params,
721 symbols,
722 compile_duration,
723 ) {
724 Ok((output, remaining, _, metrics)) => ExecutionResult {
725 frames: merge_results(output, remaining),
726 error: None,
727 metrics: build_metrics(metrics),
728 },
729 Err(f) => ExecutionResult {
730 frames: vec![],
731 error: Some(f.error),
732 metrics: build_metrics(f.partial_metrics),
733 },
734 }
735 }
736
737 #[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
738 pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> ExecutionResult {
739 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn)) {
740 Ok(s) => s,
741 Err(e) => {
742 return ExecutionResult {
743 frames: vec![],
744 error: Some(e),
745 metrics: ExecutionMetrics::default(),
746 };
747 }
748 };
749
750 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
751 &mut Transaction::Command(&mut *txn),
752 SessionOp::Command,
753 false,
754 ) {
755 return ExecutionResult {
756 frames: vec![],
757 error: Some(e),
758 metrics: ExecutionMetrics::default(),
759 };
760 }
761
762 let start_compile = self.0.runtime_context.clock.instant();
763 let compiled = match self.compiler.compile_with_policy(
764 &mut Transaction::Command(txn),
765 cmd.rql,
766 inject_from_policies,
767 ) {
768 Ok(CompilationResult::Ready(compiled)) => compiled,
769 Ok(CompilationResult::Incremental(_)) => {
770 unreachable!("DDL statements require admin transactions, not command transactions")
771 }
772 Err(err) => {
773 #[cfg(not(reifydb_single_threaded))]
774 if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
775 return ExecutionResult {
776 frames: vec![],
777 error: Some(Error(Box::new(Diagnostic {
778 code: "REMOTE_002".to_string(),
779 message: "Write operations on remote namespaces are not supported"
780 .to_string(),
781 help: Some("Use the remote instance directly for write operations"
782 .to_string()),
783 ..Default::default()
784 }))),
785 metrics: ExecutionMetrics::default(),
786 };
787 }
788 return ExecutionResult {
789 frames: vec![],
790 error: Some(err),
791 metrics: ExecutionMetrics::default(),
792 };
793 }
794 };
795 let compile_duration = start_compile.elapsed();
796
797 match execute_compiled_units(
798 &self.0,
799 &mut Transaction::Command(txn),
800 &compiled,
801 &cmd.params,
802 symbols,
803 compile_duration,
804 ) {
805 Ok((output, remaining, _, metrics)) => ExecutionResult {
806 frames: merge_results(output, remaining),
807 error: None,
808 metrics: build_metrics(metrics),
809 },
810 Err(f) => ExecutionResult {
811 frames: vec![],
812 error: Some(f.error),
813 metrics: build_metrics(f.partial_metrics),
814 },
815 }
816 }
817
818 #[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
820 pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> ExecutionResult {
821 let rql = format!("CALL {}()", name);
822 let symbols = match self.setup_symbols(params, &mut Transaction::Command(&mut *txn)) {
823 Ok(s) => s,
824 Err(e) => {
825 return ExecutionResult {
826 frames: vec![],
827 error: Some(e),
828 metrics: ExecutionMetrics::default(),
829 };
830 }
831 };
832
833 let start_compile = self.0.runtime_context.clock.instant();
834 let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql) {
835 Ok(CompilationResult::Ready(compiled)) => compiled,
836 Ok(CompilationResult::Incremental(_)) => {
837 unreachable!("CALL statements should not require incremental compilation")
838 }
839 Err(e) => {
840 return ExecutionResult {
841 frames: vec![],
842 error: Some(e),
843 metrics: ExecutionMetrics::default(),
844 };
845 }
846 };
847 let compile_duration = start_compile.elapsed();
848 let compile_duration_us = compile_duration.as_micros() as u64 / compiled.len().max(1) as u64;
849
850 let mut result = vec![];
851 let mut metrics = Vec::new();
852 let mut symbols = symbols;
853 for compiled in compiled.iter() {
854 result.clear();
855 let mut tx = Transaction::Command(txn);
856 let mut vm = Vm::from_services(symbols, &self.0, params, tx.identity());
857 let start_execute = self.0.runtime_context.clock.instant();
858 let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
859 let execute_duration = start_execute.elapsed();
860 symbols = vm.symbols;
861
862 metrics.push(StatementMetric {
863 fingerprint: compiled.fingerprint,
864 normalized_rql: compiled.normalized_rql.clone(),
865 compile_duration_us,
866 execute_duration_us: execute_duration.as_micros() as u64,
867 rows_affected: if run_result.is_ok() {
868 extract_rows_affected(&result)
869 } else {
870 0
871 },
872 });
873
874 if let Err(e) = run_result {
875 return ExecutionResult {
876 frames: vec![],
877 error: Some(e),
878 metrics: build_metrics(metrics),
879 };
880 }
881 }
882
883 ExecutionResult {
884 frames: result,
885 error: None,
886 metrics: build_metrics(metrics),
887 }
888 }
889
890 #[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
891 pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> ExecutionResult {
892 let symbols = match self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn)) {
893 Ok(s) => s,
894 Err(e) => {
895 return ExecutionResult {
896 frames: vec![],
897 error: Some(e),
898 metrics: ExecutionMetrics::default(),
899 };
900 }
901 };
902
903 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
904 &mut Transaction::Query(&mut *txn),
905 SessionOp::Query,
906 false,
907 ) {
908 return ExecutionResult {
909 frames: vec![],
910 error: Some(e),
911 metrics: ExecutionMetrics::default(),
912 };
913 }
914
915 let start_compile = self.0.runtime_context.clock.instant();
916 let compiled = match self.compiler.compile_with_policy(
917 &mut Transaction::Query(txn),
918 qry.rql,
919 inject_from_policies,
920 ) {
921 Ok(CompilationResult::Ready(compiled)) => compiled,
922 Ok(CompilationResult::Incremental(_)) => {
923 unreachable!("DDL statements require admin transactions, not query transactions")
924 }
925 Err(err) => {
926 #[cfg(not(reifydb_single_threaded))]
927 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, qry.rql, qry.params) {
928 return ExecutionResult {
929 frames,
930 error: None,
931 metrics: ExecutionMetrics::default(),
932 };
933 }
934 return ExecutionResult {
935 frames: vec![],
936 error: Some(err),
937 metrics: ExecutionMetrics::default(),
938 };
939 }
940 };
941 let compile_duration = start_compile.elapsed();
942
943 let exec_result = execute_compiled_units(
944 &self.0,
945 &mut Transaction::Query(txn),
946 &compiled,
947 &qry.params,
948 symbols,
949 compile_duration,
950 );
951
952 match exec_result {
953 Ok((output, remaining, _, metrics)) => ExecutionResult {
954 frames: merge_results(output, remaining),
955 error: None,
956 metrics: build_metrics(metrics),
957 },
958 Err(f) => ExecutionResult {
959 frames: vec![],
960 error: Some(f.error),
961 metrics: build_metrics(f.partial_metrics),
962 },
963 }
964 }
965}