1use std::{ops::Deref, result::Result as StdResult, sync::Arc, time::Duration};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{
9 error::diagnostic::subscription,
10 execution::ExecutionResult,
11 interface::catalog::policy::SessionOp,
12 metric::{ExecutionMetrics, StatementMetric},
13 value::column::columns::Columns,
14};
15use reifydb_metric::storage::metric::MetricReader;
16use reifydb_policy::inject_from_policies;
17use reifydb_rql::{
18 ast::parse_str,
19 compiler::{CompilationResult, Compiled, constrain_policy},
20 fingerprint::request::fingerprint_request,
21};
22use reifydb_store_single::SingleStore;
23use reifydb_transaction::transaction::{
24 RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
25 query::QueryTransaction,
26};
27#[cfg(not(reifydb_single_threaded))]
28use reifydb_type::error::Diagnostic;
29use reifydb_type::{
30 error::Error,
31 params::Params,
32 value::{Value, frame::frame::Frame, r#type::Type},
33};
34use tracing::instrument;
35
36#[cfg(not(reifydb_single_threaded))]
37use crate::remote;
38use crate::{
39 Result,
40 policy::PolicyEvaluator,
41 vm::{
42 Admin, Command, Query, Subscription, Test,
43 services::{EngineConfig, Services},
44 stack::{SymbolTable, Variable},
45 vm::Vm,
46 },
47};
48
/// Handle to the engine's shared [`Services`], stored behind an [`Arc`].
///
/// Cloning an `Executor` clones only the `Arc`, and the `Deref` implementation
/// exposes the wrapped `Services` directly on the executor (for example
/// `self.catalog` and `self.compiler` in the methods of this file).
pub struct Executor(Arc<Services>);
51
52impl Clone for Executor {
53 fn clone(&self) -> Self {
54 Self(self.0.clone())
55 }
56}
57
58impl Deref for Executor {
59 type Target = Services;
60
61 fn deref(&self) -> &Self::Target {
62 &self.0
63 }
64}
65
66impl Executor {
67 pub fn new(
68 catalog: Catalog,
69 config: EngineConfig,
70 flow_operator_store: SystemFlowOperatorStore,
71 stats_reader: MetricReader<SingleStore>,
72 ) -> Self {
73 Self(Arc::new(Services::new(catalog, config, flow_operator_store, stats_reader)))
74 }
75
76 pub fn services(&self) -> &Arc<Services> {
78 &self.0
79 }
80
81 pub fn from_services(services: Arc<Services>) -> Self {
83 Self(services)
84 }
85
86 #[allow(dead_code)]
87 pub fn testing() -> Self {
88 Self(Services::testing())
89 }
90
91 #[cfg(not(reifydb_single_threaded))]
94 fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
95 if let Some(ref registry) = self.0.remote_registry
96 && remote::is_remote_query(err)
97 && let Some(address) = remote::extract_remote_address(err)
98 {
99 let token = remote::extract_remote_token(err);
100 return registry.forward_query(&address, rql, params, token.as_deref()).map(Some);
101 }
102 Ok(None)
103 }
104}
105
106impl RqlExecutor for Executor {
107 fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
108 Executor::rql(self, tx, rql, params)
109 }
110}
111
112fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
114 match params {
115 Params::Positional(values) => {
116 for (index, value) in values.iter().enumerate() {
117 let param_name = (index + 1).to_string();
118 symbols.set(param_name, Variable::scalar(value.clone()), false)?;
119 }
120 }
121 Params::Named(map) => {
122 for (name, value) in map.iter() {
123 symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
124 }
125 }
126 Params::None => {}
127 }
128 Ok(())
129}
130
131fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
134 let identity = tx.identity();
135 if identity.is_privileged() {
136 return Ok(());
137 }
138 if identity.is_anonymous() {
139 let columns = Columns::single_row([
140 ("id", Value::IdentityId(identity)),
141 ("name", Value::none_of(Type::Utf8)),
142 ("roles", Value::List(vec![])),
143 ]);
144 symbols.set("identity".to_string(), Variable::columns(columns), false)?;
145 return Ok(());
146 }
147 if let Some(user) = catalog.find_identity(tx, identity)? {
148 let roles = catalog.find_role_names_for_identity(tx, identity)?;
149 let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
150 let columns = Columns::single_row([
151 ("id", Value::IdentityId(identity)),
152 ("name", Value::Utf8(user.name)),
153 ("roles", Value::List(role_values)),
154 ]);
155 symbols.set("identity".to_string(), Variable::columns(columns), false)?;
156 }
157 Ok(())
158}
159
/// Success value of `execute_compiled_units`, in tuple order:
///
/// 1. frames collected from units whose `is_output` flag is set,
/// 2. frames still in the working buffer after the last unit (empty when that
///    unit was an output unit, because its frames were moved into element 1),
/// 3. the symbol table as left by the last executed unit,
/// 4. one `StatementMetric` per executed unit.
type CompiledUnitsResult = (Vec<Frame>, Vec<Frame>, SymbolTable, Vec<StatementMetric>);
162
/// Error value of `execute_compiled_units`: the error that stopped execution
/// plus the metrics recorded up to and including the failing unit.
struct ExecutionFailure {
    /// Error returned by the VM run that failed.
    error: Error,
    /// Metrics for every unit executed so far; the failing unit is included
    /// with `rows_affected` set to 0.
    partial_metrics: Vec<StatementMetric>,
}
168
169fn build_metrics(statements: Vec<StatementMetric>) -> ExecutionMetrics {
171 let fps: Vec<_> = statements.iter().map(|m| m.fingerprint).collect();
172 ExecutionMetrics {
173 fingerprint: fingerprint_request(&fps),
174 statements,
175 ..Default::default()
176 }
177}
178
179fn execute_compiled_units(
181 services: &Arc<Services>,
182 tx: &mut Transaction<'_>,
183 compiled_list: &[Compiled],
184 params: &Params,
185 mut symbols: SymbolTable,
186 compile_duration: Duration,
187) -> StdResult<CompiledUnitsResult, ExecutionFailure> {
188 let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
189 let mut result = vec![];
190 let mut output_results: Vec<Frame> = Vec::new();
191 let mut metrics = Vec::new();
192
193 for compiled in compiled_list.iter() {
194 result.clear();
195 let mut vm = Vm::from_services(symbols, services, params, tx.identity());
196 let start = services.runtime_context.clock.instant();
197 let run_result = vm.run(services, tx, &compiled.instructions, &mut result);
198 let execute_duration = start.elapsed();
199 symbols = vm.symbols;
200
201 metrics.push(StatementMetric {
202 fingerprint: compiled.fingerprint,
203 normalized_rql: compiled.normalized_rql.clone(),
204 compile_duration_us,
205 execute_duration_us: execute_duration.as_micros() as u64,
206 rows_affected: if run_result.is_ok() {
207 extract_rows_affected(&result)
208 } else {
209 0
210 },
211 });
212
213 if let Err(error) = run_result {
214 return Err(ExecutionFailure {
215 error,
216 partial_metrics: metrics,
217 });
218 }
219
220 if compiled.is_output {
221 output_results.append(&mut result);
222 }
223 }
224
225 Ok((output_results, result, symbols, metrics))
226}
227
228fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
230 output_results.append(&mut remaining);
231 output_results
232}
233
234fn extract_rows_affected(result: &[Frame]) -> u64 {
241 if result.len() == 1 {
242 let frame = &result[0];
243 for col in &frame.columns {
244 match col.name.as_str() {
245 "inserted" | "updated" | "deleted" => {
246 if col.data.len() == 1
247 && let Value::Uint8(n) = col.data.get_value(0)
248 {
249 return n;
250 }
251 }
252 _ => {}
253 }
254 }
255 }
256 result.len() as u64
257}
258
259impl Executor {
260 fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
262 let mut symbols = SymbolTable::new();
263 populate_symbols(&mut symbols, params)?;
264 populate_identity(&mut symbols, &self.catalog, tx)?;
265 Ok(symbols)
266 }
267
268 #[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
273 pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
274 let mut symbols = match self.setup_symbols(¶ms, tx) {
275 Ok(s) => s,
276 Err(e) => {
277 return ExecutionResult {
278 frames: vec![],
279 error: Some(e),
280 metrics: ExecutionMetrics::default(),
281 };
282 }
283 };
284
285 let start_compile = self.0.runtime_context.clock.instant();
286 let compiled_list = match self.compiler.compile_with_policy(tx, rql, inject_from_policies) {
287 Ok(CompilationResult::Ready(compiled)) => compiled,
288 Ok(CompilationResult::Incremental(_)) => {
289 unreachable!("incremental compilation not supported in rql()")
290 }
291 Err(err) => {
292 #[cfg(not(reifydb_single_threaded))]
293 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
294 return ExecutionResult {
295 frames,
296 error: None,
297 metrics: ExecutionMetrics::default(),
298 };
299 }
300 return ExecutionResult {
301 frames: vec![],
302 error: Some(err),
303 metrics: ExecutionMetrics::default(),
304 };
305 }
306 };
307 let compile_duration = start_compile.elapsed();
308 let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
309
310 let mut result = vec![];
311 let mut metrics = Vec::new();
312 for compiled in compiled_list.iter() {
313 result.clear();
314 let mut vm = Vm::from_services(symbols, &self.0, ¶ms, tx.identity());
315 let start_execute = self.0.runtime_context.clock.instant();
316 let run_result = vm.run(&self.0, tx, &compiled.instructions, &mut result);
317 let execute_duration = start_execute.elapsed();
318 symbols = vm.symbols;
319
320 metrics.push(StatementMetric {
321 fingerprint: compiled.fingerprint,
322 normalized_rql: compiled.normalized_rql.clone(),
323 compile_duration_us,
324 execute_duration_us: execute_duration.as_micros() as u64,
325 rows_affected: if run_result.is_ok() {
326 extract_rows_affected(&result)
327 } else {
328 0
329 },
330 });
331
332 if let Err(e) = run_result {
333 return ExecutionResult {
334 frames: vec![],
335 error: Some(e),
336 metrics: build_metrics(metrics),
337 };
338 }
339 }
340
341 ExecutionResult {
342 frames: result,
343 error: None,
344 metrics: build_metrics(metrics),
345 }
346 }
347
    /// Executes `cmd.rql` inside an admin transaction.
    ///
    /// Steps, in order:
    /// 1. Build the symbol table from `cmd.params` and the transaction identity.
    /// 2. Enforce the session policy for `SessionOp::Admin`.
    /// 3. Compile the RQL with `inject_from_policies`.
    /// 4. Execute: all units at once for `CompilationResult::Ready`, or
    ///    compile-then-run one unit at a time for `CompilationResult::Incremental`.
    ///
    /// Every failure is reported through `ExecutionResult::error` with empty
    /// frames. Failures before execution carry default metrics; failures during
    /// execution carry the metrics recorded so far.
    #[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
    pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> ExecutionResult {
        // `&mut *txn` reborrows explicitly so `txn` stays usable for the later steps.
        let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn)) {
            Ok(s) => s,
            Err(e) => {
                return ExecutionResult {
                    frames: vec![],
                    error: Some(e),
                    metrics: ExecutionMetrics::default(),
                };
            }
        };

        // NOTE(review): the trailing `true` is presumably the default-deny flag -
        // confirm against `enforce_session_policy`.
        if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
            &mut Transaction::Admin(&mut *txn),
            SessionOp::Admin,
            true,
        ) {
            return ExecutionResult {
                frames: vec![],
                error: Some(e),
                metrics: ExecutionMetrics::default(),
            };
        }

        let start_compile = self.0.runtime_context.clock.instant();
        match self.compiler.compile_with_policy(&mut Transaction::Admin(txn), cmd.rql, inject_from_policies) {
            Err(err) => {
                // Only a successful forward short-circuits; otherwise the original
                // compile error is returned below.
                // NOTE(review): an `Err` from forwarding is dropped here - confirm
                // this is intended.
                #[cfg(not(reifydb_single_threaded))]
                if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
                    return ExecutionResult {
                        frames,
                        error: None,
                        metrics: ExecutionMetrics::default(),
                    };
                }
                ExecutionResult {
                    frames: vec![],
                    error: Some(err),
                    metrics: ExecutionMetrics::default(),
                }
            }
            Ok(CompilationResult::Ready(compiled)) => {
                // Everything compiled up front: run all units through the shared helper.
                let compile_duration = start_compile.elapsed();
                match execute_compiled_units(
                    &self.0,
                    &mut Transaction::Admin(txn),
                    &compiled,
                    &cmd.params,
                    symbols,
                    compile_duration,
                ) {
                    Ok((output, remaining, _, metrics)) => ExecutionResult {
                        frames: merge_results(output, remaining),
                        error: None,
                        metrics: build_metrics(metrics),
                    },
                    Err(f) => ExecutionResult {
                        frames: vec![],
                        error: Some(f.error),
                        metrics: build_metrics(f.partial_metrics),
                    },
                }
            }
            Ok(CompilationResult::Incremental(mut state)) => {
                // Incremental mode: each unit is compiled against the transaction
                // only after the previous unit has been executed.
                let policy = constrain_policy(|plans, bump, cat, tx| {
                    inject_from_policies(plans, bump, cat, tx)
                });
                let mut result = vec![];
                let mut output_results: Vec<Frame> = Vec::new();
                let mut symbols = symbols;
                let mut metrics = Vec::new();
                loop {
                    // Compile time is measured per unit here, not split across units.
                    let start_incr = self.0.runtime_context.clock.instant();
                    let next = match self.compiler.compile_next_with_policy(
                        &mut Transaction::Admin(txn),
                        &mut state,
                        &policy,
                    ) {
                        Ok(n) => n,
                        Err(e) => {
                            // A compile error mid-stream still reports the metrics
                            // of the units that already ran.
                            return ExecutionResult {
                                frames: vec![],
                                error: Some(e),
                                metrics: build_metrics(metrics),
                            };
                        }
                    };
                    let compile_duration = start_incr.elapsed();

                    // `None` means there is nothing left to compile.
                    let Some(compiled) = next else {
                        break;
                    };

                    result.clear();
                    let mut tx = Transaction::Admin(txn);
                    let mut vm = Vm::from_services(symbols, &self.0, &cmd.params, tx.identity());
                    let start_execute = self.0.runtime_context.clock.instant();
                    let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
                    let execute_duration = start_execute.elapsed();
                    // Carry this unit's bindings over to the next one.
                    symbols = vm.symbols;

                    // Recorded before the error check so a failing unit is still reported.
                    metrics.push(StatementMetric {
                        fingerprint: compiled.fingerprint,
                        normalized_rql: compiled.normalized_rql,
                        compile_duration_us: compile_duration.as_micros() as u64,
                        execute_duration_us: execute_duration.as_micros() as u64,
                        rows_affected: if run_result.is_ok() {
                            extract_rows_affected(&result)
                        } else {
                            0
                        },
                    });

                    if let Err(e) = run_result {
                        return ExecutionResult {
                            frames: vec![],
                            error: Some(e),
                            metrics: build_metrics(metrics),
                        };
                    }

                    // Output units have their frames moved out of the working buffer.
                    if compiled.is_output {
                        output_results.append(&mut result);
                    }
                }
                ExecutionResult {
                    frames: merge_results(output_results, result),
                    error: None,
                    metrics: build_metrics(metrics),
                }
            }
        }
    }
482
    /// Executes `cmd.rql` inside a test transaction.
    ///
    /// Follows the same steps as the admin entry point (symbols, session policy,
    /// compile, then ready or incremental execution), except that the session
    /// policy arguments are read from `txn.session_type` and
    /// `txn.session_default_deny`, and every step works on a fresh
    /// `txn.reborrow()` boxed into `Transaction::Test`.
    ///
    /// Every failure is reported through `ExecutionResult::error` with empty
    /// frames. Failures before execution carry default metrics; failures during
    /// execution carry the metrics recorded so far.
    #[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
    pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> ExecutionResult {
        let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Test(Box::new(txn.reborrow()))) {
            Ok(s) => s,
            Err(e) => {
                return ExecutionResult {
                    frames: vec![],
                    error: Some(e),
                    metrics: ExecutionMetrics::default(),
                };
            }
        };

        // Read both values before `txn` is reborrowed for the policy check.
        let session_type = txn.session_type;
        let session_default_deny = txn.session_default_deny;
        if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
            &mut Transaction::Test(Box::new(txn.reborrow())),
            session_type,
            session_default_deny,
        ) {
            return ExecutionResult {
                frames: vec![],
                error: Some(e),
                metrics: ExecutionMetrics::default(),
            };
        }

        let start_compile = self.0.runtime_context.clock.instant();
        match self.compiler.compile_with_policy(
            &mut Transaction::Test(Box::new(txn.reborrow())),
            cmd.rql,
            inject_from_policies,
        ) {
            Err(err) => {
                // Only a successful forward short-circuits; otherwise the original
                // compile error is returned below.
                // NOTE(review): an `Err` from forwarding is dropped here - confirm
                // this is intended.
                #[cfg(not(reifydb_single_threaded))]
                if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
                    return ExecutionResult {
                        frames,
                        error: None,
                        metrics: ExecutionMetrics::default(),
                    };
                }
                ExecutionResult {
                    frames: vec![],
                    error: Some(err),
                    metrics: ExecutionMetrics::default(),
                }
            }
            Ok(CompilationResult::Ready(compiled)) => {
                // Everything compiled up front: run all units through the shared helper.
                let compile_duration = start_compile.elapsed();
                match execute_compiled_units(
                    &self.0,
                    &mut Transaction::Test(Box::new(txn.reborrow())),
                    &compiled,
                    &cmd.params,
                    symbols,
                    compile_duration,
                ) {
                    Ok((output, remaining, _, metrics)) => ExecutionResult {
                        frames: merge_results(output, remaining),
                        error: None,
                        metrics: build_metrics(metrics),
                    },
                    Err(f) => ExecutionResult {
                        frames: vec![],
                        error: Some(f.error),
                        metrics: build_metrics(f.partial_metrics),
                    },
                }
            }
            Ok(CompilationResult::Incremental(mut state)) => {
                // Incremental mode: each unit is compiled against the transaction
                // only after the previous unit has been executed.
                let policy = constrain_policy(|plans, bump, cat, tx| {
                    inject_from_policies(plans, bump, cat, tx)
                });
                let mut result = vec![];
                let mut output_results: Vec<Frame> = Vec::new();
                let mut symbols = symbols;
                let mut metrics = Vec::new();
                loop {
                    // Compile time is measured per unit here, not split across units.
                    let start_incr = self.0.runtime_context.clock.instant();
                    let next = match self.compiler.compile_next_with_policy(
                        &mut Transaction::Test(Box::new(txn.reborrow())),
                        &mut state,
                        &policy,
                    ) {
                        Ok(n) => n,
                        Err(e) => {
                            // A compile error mid-stream still reports the metrics
                            // of the units that already ran.
                            return ExecutionResult {
                                frames: vec![],
                                error: Some(e),
                                metrics: build_metrics(metrics),
                            };
                        }
                    };
                    let compile_duration = start_incr.elapsed();

                    // `None` means there is nothing left to compile.
                    let Some(compiled) = next else {
                        break;
                    };

                    result.clear();
                    let mut tx = Transaction::Test(Box::new(txn.reborrow()));
                    let mut vm = Vm::from_services(symbols, &self.0, &cmd.params, tx.identity());
                    let start_execute = self.0.runtime_context.clock.instant();
                    let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
                    let execute_duration = start_execute.elapsed();
                    // Carry this unit's bindings over to the next one.
                    symbols = vm.symbols;

                    // Recorded before the error check so a failing unit is still reported.
                    metrics.push(StatementMetric {
                        fingerprint: compiled.fingerprint,
                        normalized_rql: compiled.normalized_rql,
                        compile_duration_us: compile_duration.as_micros() as u64,
                        execute_duration_us: execute_duration.as_micros() as u64,
                        rows_affected: if run_result.is_ok() {
                            extract_rows_affected(&result)
                        } else {
                            0
                        },
                    });

                    if let Err(e) = run_result {
                        return ExecutionResult {
                            frames: vec![],
                            error: Some(e),
                            metrics: build_metrics(metrics),
                        };
                    }

                    // Output units have their frames moved out of the working buffer.
                    if compiled.is_output {
                        output_results.append(&mut result);
                    }
                }
                ExecutionResult {
                    frames: merge_results(output_results, result),
                    error: None,
                    metrics: build_metrics(metrics),
                }
            }
        }
    }
623
624 #[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
625 pub fn subscription(&self, txn: &mut QueryTransaction, cmd: Subscription<'_>) -> ExecutionResult {
626 let bump = Bump::new();
628 let statements = match parse_str(&bump, cmd.rql) {
629 Ok(s) => s,
630 Err(e) => {
631 return ExecutionResult {
632 frames: vec![],
633 error: Some(e),
634 metrics: ExecutionMetrics::default(),
635 };
636 }
637 };
638
639 if statements.len() != 1 {
640 return ExecutionResult {
641 frames: vec![],
642 error: Some(Error(Box::new(subscription::single_statement_required(
643 "Subscription endpoint requires exactly one statement",
644 )))),
645 metrics: ExecutionMetrics::default(),
646 };
647 }
648
649 let statement = &statements[0];
650 if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
651 return ExecutionResult {
652 frames: vec![],
653 error: Some(Error(Box::new(subscription::invalid_statement(
654 "Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
655 )))),
656 metrics: ExecutionMetrics::default(),
657 };
658 }
659
660 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Query(&mut *txn)) {
661 Ok(s) => s,
662 Err(e) => {
663 return ExecutionResult {
664 frames: vec![],
665 error: Some(e),
666 metrics: ExecutionMetrics::default(),
667 };
668 }
669 };
670
671 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
672 &mut Transaction::Query(&mut *txn),
673 SessionOp::Subscription,
674 true,
675 ) {
676 return ExecutionResult {
677 frames: vec![],
678 error: Some(e),
679 metrics: ExecutionMetrics::default(),
680 };
681 }
682
683 let start_compile = self.0.runtime_context.clock.instant();
684 let compiled = match self.compiler.compile_with_policy(
685 &mut Transaction::Query(txn),
686 cmd.rql,
687 inject_from_policies,
688 ) {
689 Ok(CompilationResult::Ready(compiled)) => compiled,
690 Ok(CompilationResult::Incremental(_)) => {
691 unreachable!("Single subscription statement should not require incremental compilation")
692 }
693 Err(err) => {
694 return ExecutionResult {
695 frames: vec![],
696 error: Some(err),
697 metrics: ExecutionMetrics::default(),
698 };
699 }
700 };
701 let compile_duration = start_compile.elapsed();
702
703 match execute_compiled_units(
704 &self.0,
705 &mut Transaction::Query(txn),
706 &compiled,
707 &cmd.params,
708 symbols,
709 compile_duration,
710 ) {
711 Ok((output, remaining, _, metrics)) => ExecutionResult {
712 frames: merge_results(output, remaining),
713 error: None,
714 metrics: build_metrics(metrics),
715 },
716 Err(f) => ExecutionResult {
717 frames: vec![],
718 error: Some(f.error),
719 metrics: build_metrics(f.partial_metrics),
720 },
721 }
722 }
723
724 #[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
725 pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> ExecutionResult {
726 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn)) {
727 Ok(s) => s,
728 Err(e) => {
729 return ExecutionResult {
730 frames: vec![],
731 error: Some(e),
732 metrics: ExecutionMetrics::default(),
733 };
734 }
735 };
736
737 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
738 &mut Transaction::Command(&mut *txn),
739 SessionOp::Command,
740 false,
741 ) {
742 return ExecutionResult {
743 frames: vec![],
744 error: Some(e),
745 metrics: ExecutionMetrics::default(),
746 };
747 }
748
749 let start_compile = self.0.runtime_context.clock.instant();
750 let compiled = match self.compiler.compile_with_policy(
751 &mut Transaction::Command(txn),
752 cmd.rql,
753 inject_from_policies,
754 ) {
755 Ok(CompilationResult::Ready(compiled)) => compiled,
756 Ok(CompilationResult::Incremental(_)) => {
757 unreachable!("DDL statements require admin transactions, not command transactions")
758 }
759 Err(err) => {
760 #[cfg(not(reifydb_single_threaded))]
761 if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
762 return ExecutionResult {
763 frames: vec![],
764 error: Some(Error(Box::new(Diagnostic {
765 code: "REMOTE_002".to_string(),
766 message: "Write operations on remote namespaces are not supported"
767 .to_string(),
768 help: Some("Use the remote instance directly for write operations"
769 .to_string()),
770 ..Default::default()
771 }))),
772 metrics: ExecutionMetrics::default(),
773 };
774 }
775 return ExecutionResult {
776 frames: vec![],
777 error: Some(err),
778 metrics: ExecutionMetrics::default(),
779 };
780 }
781 };
782 let compile_duration = start_compile.elapsed();
783
784 match execute_compiled_units(
785 &self.0,
786 &mut Transaction::Command(txn),
787 &compiled,
788 &cmd.params,
789 symbols,
790 compile_duration,
791 ) {
792 Ok((output, remaining, _, metrics)) => ExecutionResult {
793 frames: merge_results(output, remaining),
794 error: None,
795 metrics: build_metrics(metrics),
796 },
797 Err(f) => ExecutionResult {
798 frames: vec![],
799 error: Some(f.error),
800 metrics: build_metrics(f.partial_metrics),
801 },
802 }
803 }
804
805 #[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
807 pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> ExecutionResult {
808 let rql = format!("CALL {}()", name);
809 let symbols = match self.setup_symbols(params, &mut Transaction::Command(&mut *txn)) {
810 Ok(s) => s,
811 Err(e) => {
812 return ExecutionResult {
813 frames: vec![],
814 error: Some(e),
815 metrics: ExecutionMetrics::default(),
816 };
817 }
818 };
819
820 let start_compile = self.0.runtime_context.clock.instant();
821 let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql) {
822 Ok(CompilationResult::Ready(compiled)) => compiled,
823 Ok(CompilationResult::Incremental(_)) => {
824 unreachable!("CALL statements should not require incremental compilation")
825 }
826 Err(e) => {
827 return ExecutionResult {
828 frames: vec![],
829 error: Some(e),
830 metrics: ExecutionMetrics::default(),
831 };
832 }
833 };
834 let compile_duration = start_compile.elapsed();
835 let compile_duration_us = compile_duration.as_micros() as u64 / compiled.len().max(1) as u64;
836
837 let mut result = vec![];
838 let mut metrics = Vec::new();
839 let mut symbols = symbols;
840 for compiled in compiled.iter() {
841 result.clear();
842 let mut tx = Transaction::Command(txn);
843 let mut vm = Vm::from_services(symbols, &self.0, params, tx.identity());
844 let start_execute = self.0.runtime_context.clock.instant();
845 let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
846 let execute_duration = start_execute.elapsed();
847 symbols = vm.symbols;
848
849 metrics.push(StatementMetric {
850 fingerprint: compiled.fingerprint,
851 normalized_rql: compiled.normalized_rql.clone(),
852 compile_duration_us,
853 execute_duration_us: execute_duration.as_micros() as u64,
854 rows_affected: if run_result.is_ok() {
855 extract_rows_affected(&result)
856 } else {
857 0
858 },
859 });
860
861 if let Err(e) = run_result {
862 return ExecutionResult {
863 frames: vec![],
864 error: Some(e),
865 metrics: build_metrics(metrics),
866 };
867 }
868 }
869
870 ExecutionResult {
871 frames: result,
872 error: None,
873 metrics: build_metrics(metrics),
874 }
875 }
876
877 #[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
878 pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> ExecutionResult {
879 let symbols = match self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn)) {
880 Ok(s) => s,
881 Err(e) => {
882 return ExecutionResult {
883 frames: vec![],
884 error: Some(e),
885 metrics: ExecutionMetrics::default(),
886 };
887 }
888 };
889
890 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
891 &mut Transaction::Query(&mut *txn),
892 SessionOp::Query,
893 false,
894 ) {
895 return ExecutionResult {
896 frames: vec![],
897 error: Some(e),
898 metrics: ExecutionMetrics::default(),
899 };
900 }
901
902 let start_compile = self.0.runtime_context.clock.instant();
903 let compiled = match self.compiler.compile_with_policy(
904 &mut Transaction::Query(txn),
905 qry.rql,
906 inject_from_policies,
907 ) {
908 Ok(CompilationResult::Ready(compiled)) => compiled,
909 Ok(CompilationResult::Incremental(_)) => {
910 unreachable!("DDL statements require admin transactions, not query transactions")
911 }
912 Err(err) => {
913 #[cfg(not(reifydb_single_threaded))]
914 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, qry.rql, qry.params) {
915 return ExecutionResult {
916 frames,
917 error: None,
918 metrics: ExecutionMetrics::default(),
919 };
920 }
921 return ExecutionResult {
922 frames: vec![],
923 error: Some(err),
924 metrics: ExecutionMetrics::default(),
925 };
926 }
927 };
928 let compile_duration = start_compile.elapsed();
929
930 let exec_result = execute_compiled_units(
931 &self.0,
932 &mut Transaction::Query(txn),
933 &compiled,
934 &qry.params,
935 symbols,
936 compile_duration,
937 );
938
939 match exec_result {
940 Ok((output, remaining, _, metrics)) => ExecutionResult {
941 frames: merge_results(output, remaining),
942 error: None,
943 metrics: build_metrics(metrics),
944 },
945 Err(f) => ExecutionResult {
946 frames: vec![],
947 error: Some(f.error),
948 metrics: build_metrics(f.partial_metrics),
949 },
950 }
951 }
952}