1use std::{ops::Deref, result::Result as StdResult, sync::Arc, time::Duration};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{
9 error::diagnostic::subscription,
10 execution::ExecutionResult,
11 metric::{ExecutionMetrics, StatementMetric},
12 value::column::columns::Columns,
13};
14use reifydb_metric_old::metric::MetricReader;
15use reifydb_policy::inject_read_policies;
16use reifydb_rql::{
17 ast::parse_str,
18 compiler::{CompilationResult, Compiled, constrain_policy},
19 fingerprint::request::fingerprint_request,
20};
21use reifydb_store_single::SingleStore;
22use reifydb_transaction::transaction::{
23 RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
24 query::QueryTransaction,
25};
26#[cfg(not(reifydb_single_threaded))]
27use reifydb_type::error::Diagnostic;
28use reifydb_type::{
29 error::Error,
30 params::Params,
31 value::{Value, frame::frame::Frame, r#type::Type},
32};
33use tracing::instrument;
34
35#[cfg(not(reifydb_single_threaded))]
36use crate::remote;
37use crate::{
38 Result,
39 policy::PolicyEvaluator,
40 vm::{
41 Admin, Command, Query, Subscription, Test,
42 services::{EngineConfig, Services},
43 stack::{SymbolTable, Variable},
44 vm::Vm,
45 },
46};
47
/// Handle through which RQL is executed; a newtype around the shared `Arc<Services>`.
pub struct Executor(Arc<Services>);
50
51impl Clone for Executor {
52 fn clone(&self) -> Self {
53 Self(self.0.clone())
54 }
55}
56
57impl Deref for Executor {
58 type Target = Services;
59
60 fn deref(&self) -> &Self::Target {
61 &self.0
62 }
63}
64
65impl Executor {
66 pub fn new(
67 catalog: Catalog,
68 config: EngineConfig,
69 flow_operator_store: SystemFlowOperatorStore,
70 stats_reader: MetricReader<SingleStore>,
71 ) -> Self {
72 Self(Arc::new(Services::new(catalog, config, flow_operator_store, stats_reader)))
73 }
74
75 pub fn services(&self) -> &Arc<Services> {
77 &self.0
78 }
79
80 pub fn from_services(services: Arc<Services>) -> Self {
82 Self(services)
83 }
84
85 #[allow(dead_code)]
86 pub fn testing() -> Self {
87 Self(Services::testing())
88 }
89
90 #[cfg(not(reifydb_single_threaded))]
93 fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
94 if let Some(ref registry) = self.0.remote_registry
95 && remote::is_remote_query(err)
96 && let Some(address) = remote::extract_remote_address(err)
97 {
98 let token = remote::extract_remote_token(err);
99 return registry.forward_query(&address, rql, params, token.as_deref()).map(Some);
100 }
101 Ok(None)
102 }
103}
104
/// Exposes the executor through the `RqlExecutor` trait.
impl RqlExecutor for Executor {
    fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
        // Written as a path call so it resolves to the inherent `Executor::rql`
        // rather than recursing into this trait method.
        Executor::rql(self, tx, rql, params)
    }
}
110
111fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
113 match params {
114 Params::Positional(values) => {
115 for (index, value) in values.iter().enumerate() {
116 let param_name = (index + 1).to_string();
117 symbols.set(param_name, Variable::scalar(value.clone()), false)?;
118 }
119 }
120 Params::Named(map) => {
121 for (name, value) in map.iter() {
122 symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
123 }
124 }
125 Params::None => {}
126 }
127 Ok(())
128}
129
130fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
133 let identity = tx.identity();
134 if identity.is_privileged() {
135 return Ok(());
136 }
137 if identity.is_anonymous() {
138 let columns = Columns::single_row([
139 ("id", Value::IdentityId(identity)),
140 ("name", Value::none_of(Type::Utf8)),
141 ("roles", Value::List(vec![])),
142 ]);
143 symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
144 return Ok(());
145 }
146 if let Some(user) = catalog.find_identity(tx, identity)? {
147 let roles = catalog.find_role_names_for_identity(tx, identity)?;
148 let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
149 let columns = Columns::single_row([
150 ("id", Value::IdentityId(identity)),
151 ("name", Value::Utf8(user.name)),
152 ("roles", Value::List(role_values)),
153 ]);
154 symbols.set("identity".to_string(), Variable::Columns(columns), false)?;
155 }
156 Ok(())
157}
158
/// Success value of `execute_compiled_units`, in order: frames collected from output
/// units, frames left in the working buffer after the last unit, the final symbol
/// table, and one metric per executed statement.
type CompiledUnitsResult = (Vec<Frame>, Vec<Frame>, SymbolTable, Vec<StatementMetric>);
161
/// Error value of `execute_compiled_units`.
struct ExecutionFailure {
    /// The error returned by the unit whose run failed.
    error: Error,
    /// Metrics recorded up to and including the failing unit.
    partial_metrics: Vec<StatementMetric>,
}
167
168fn build_metrics(statements: Vec<StatementMetric>) -> ExecutionMetrics {
170 let fps: Vec<_> = statements.iter().map(|m| m.fingerprint).collect();
171 ExecutionMetrics {
172 request_fingerprint: fingerprint_request(&fps),
173 statements,
174 }
175}
176
177fn execute_compiled_units(
179 services: &Arc<Services>,
180 tx: &mut Transaction<'_>,
181 compiled_list: &[Compiled],
182 params: &Params,
183 mut symbols: SymbolTable,
184 compile_duration: Duration,
185) -> StdResult<CompiledUnitsResult, ExecutionFailure> {
186 let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
187 let mut result = vec![];
188 let mut output_results: Vec<Frame> = Vec::new();
189 let mut metrics = Vec::new();
190
191 for compiled in compiled_list.iter() {
192 result.clear();
193 let mut vm = Vm::new(symbols);
194 let start = services.runtime_context.clock.instant();
195 let run_result = vm.run(services, tx, &compiled.instructions, params, &mut result);
196 let execute_duration = start.elapsed();
197 symbols = vm.symbols;
198
199 metrics.push(StatementMetric {
200 fingerprint: compiled.fingerprint,
201 normalized_rql: compiled.normalized_rql.clone(),
202 compile_duration_us,
203 execute_duration_us: execute_duration.as_micros() as u64,
204 rows_affected: if run_result.is_ok() {
205 extract_rows_affected(&result)
206 } else {
207 0
208 },
209 });
210
211 if let Err(error) = run_result {
212 return Err(ExecutionFailure {
213 error,
214 partial_metrics: metrics,
215 });
216 }
217
218 if compiled.is_output {
219 output_results.append(&mut result);
220 }
221 }
222
223 Ok((output_results, result, symbols, metrics))
224}
225
226fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
228 output_results.append(&mut remaining);
229 output_results
230}
231
232fn extract_rows_affected(result: &[Frame]) -> u64 {
239 if result.len() == 1 {
240 let frame = &result[0];
241 for col in &frame.columns {
242 match col.name.as_str() {
243 "inserted" | "updated" | "deleted" => {
244 if col.data.len() == 1
245 && let Value::Uint8(n) = col.data.get_value(0)
246 {
247 return n;
248 }
249 }
250 _ => {}
251 }
252 }
253 }
254 result.len() as u64
255}
256
257impl Executor {
258 fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
260 let mut symbols = SymbolTable::new();
261 populate_symbols(&mut symbols, params)?;
262 populate_identity(&mut symbols, &self.catalog, tx)?;
263 Ok(symbols)
264 }
265
266 #[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
271 pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
272 let mut symbols = match self.setup_symbols(¶ms, tx) {
273 Ok(s) => s,
274 Err(e) => {
275 return ExecutionResult {
276 frames: vec![],
277 error: Some(e),
278 metrics: ExecutionMetrics::default(),
279 };
280 }
281 };
282
283 let start_compile = self.0.runtime_context.clock.instant();
284 let compiled_list = match self.compiler.compile_with_policy(tx, rql, inject_read_policies) {
285 Ok(CompilationResult::Ready(compiled)) => compiled,
286 Ok(CompilationResult::Incremental(_)) => {
287 unreachable!("incremental compilation not supported in rql()")
288 }
289 Err(err) => {
290 #[cfg(not(reifydb_single_threaded))]
291 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
292 return ExecutionResult {
293 frames,
294 error: None,
295 metrics: ExecutionMetrics::default(),
296 };
297 }
298 return ExecutionResult {
299 frames: vec![],
300 error: Some(err),
301 metrics: ExecutionMetrics::default(),
302 };
303 }
304 };
305 let compile_duration = start_compile.elapsed();
306 let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
307
308 let mut result = vec![];
309 let mut metrics = Vec::new();
310 for compiled in compiled_list.iter() {
311 result.clear();
312 let mut vm = Vm::new(symbols);
313 let start_execute = self.0.runtime_context.clock.instant();
314 let run_result = vm.run(&self.0, tx, &compiled.instructions, ¶ms, &mut result);
315 let execute_duration = start_execute.elapsed();
316 symbols = vm.symbols;
317
318 metrics.push(StatementMetric {
319 fingerprint: compiled.fingerprint,
320 normalized_rql: compiled.normalized_rql.clone(),
321 compile_duration_us,
322 execute_duration_us: execute_duration.as_micros() as u64,
323 rows_affected: if run_result.is_ok() {
324 extract_rows_affected(&result)
325 } else {
326 0
327 },
328 });
329
330 if let Err(e) = run_result {
331 return ExecutionResult {
332 frames: vec![],
333 error: Some(e),
334 metrics: build_metrics(metrics),
335 };
336 }
337 }
338
339 ExecutionResult {
340 frames: result,
341 error: None,
342 metrics: build_metrics(metrics),
343 }
344 }
345
346 #[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
347 pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> ExecutionResult {
348 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn)) {
349 Ok(s) => s,
350 Err(e) => {
351 return ExecutionResult {
352 frames: vec![],
353 error: Some(e),
354 metrics: ExecutionMetrics::default(),
355 };
356 }
357 };
358
359 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
360 &mut Transaction::Admin(&mut *txn),
361 "admin",
362 true,
363 ) {
364 return ExecutionResult {
365 frames: vec![],
366 error: Some(e),
367 metrics: ExecutionMetrics::default(),
368 };
369 }
370
371 let start_compile = self.0.runtime_context.clock.instant();
372 match self.compiler.compile_with_policy(&mut Transaction::Admin(txn), cmd.rql, inject_read_policies) {
373 Err(err) => {
374 #[cfg(not(reifydb_single_threaded))]
375 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
376 return ExecutionResult {
377 frames,
378 error: None,
379 metrics: ExecutionMetrics::default(),
380 };
381 }
382 ExecutionResult {
383 frames: vec![],
384 error: Some(err),
385 metrics: ExecutionMetrics::default(),
386 }
387 }
388 Ok(CompilationResult::Ready(compiled)) => {
389 let compile_duration = start_compile.elapsed();
390 match execute_compiled_units(
391 &self.0,
392 &mut Transaction::Admin(txn),
393 &compiled,
394 &cmd.params,
395 symbols,
396 compile_duration,
397 ) {
398 Ok((output, remaining, _, metrics)) => ExecutionResult {
399 frames: merge_results(output, remaining),
400 error: None,
401 metrics: build_metrics(metrics),
402 },
403 Err(f) => ExecutionResult {
404 frames: vec![],
405 error: Some(f.error),
406 metrics: build_metrics(f.partial_metrics),
407 },
408 }
409 }
410 Ok(CompilationResult::Incremental(mut state)) => {
411 let policy = constrain_policy(|plans, bump, cat, tx| {
412 inject_read_policies(plans, bump, cat, tx)
413 });
414 let mut result = vec![];
415 let mut output_results: Vec<Frame> = Vec::new();
416 let mut symbols = symbols;
417 let mut metrics = Vec::new();
418 loop {
419 let start_incr = self.0.runtime_context.clock.instant();
420 let next = match self.compiler.compile_next_with_policy(
421 &mut Transaction::Admin(txn),
422 &mut state,
423 &policy,
424 ) {
425 Ok(n) => n,
426 Err(e) => {
427 return ExecutionResult {
428 frames: vec![],
429 error: Some(e),
430 metrics: build_metrics(metrics),
431 };
432 }
433 };
434 let compile_duration = start_incr.elapsed();
435
436 let Some(compiled) = next else {
437 break;
438 };
439
440 result.clear();
441 let mut tx = Transaction::Admin(txn);
442 let mut vm = Vm::new(symbols);
443 let start_execute = self.0.runtime_context.clock.instant();
444 let run_result = vm.run(
445 &self.0,
446 &mut tx,
447 &compiled.instructions,
448 &cmd.params,
449 &mut result,
450 );
451 let execute_duration = start_execute.elapsed();
452 symbols = vm.symbols;
453
454 metrics.push(StatementMetric {
455 fingerprint: compiled.fingerprint,
456 normalized_rql: compiled.normalized_rql,
457 compile_duration_us: compile_duration.as_micros() as u64,
458 execute_duration_us: execute_duration.as_micros() as u64,
459 rows_affected: if run_result.is_ok() {
460 extract_rows_affected(&result)
461 } else {
462 0
463 },
464 });
465
466 if let Err(e) = run_result {
467 return ExecutionResult {
468 frames: vec![],
469 error: Some(e),
470 metrics: build_metrics(metrics),
471 };
472 }
473
474 if compiled.is_output {
475 output_results.append(&mut result);
476 }
477 }
478 ExecutionResult {
479 frames: merge_results(output_results, result),
480 error: None,
481 metrics: build_metrics(metrics),
482 }
483 }
484 }
485 }
486
487 #[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
488 pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> ExecutionResult {
489 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Test(Box::new(txn.reborrow()))) {
490 Ok(s) => s,
491 Err(e) => {
492 return ExecutionResult {
493 frames: vec![],
494 error: Some(e),
495 metrics: ExecutionMetrics::default(),
496 };
497 }
498 };
499
500 let session_type = txn.session_type.clone();
501 let session_default_deny = txn.session_default_deny;
502 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
503 &mut Transaction::Test(Box::new(txn.reborrow())),
504 &session_type,
505 session_default_deny,
506 ) {
507 return ExecutionResult {
508 frames: vec![],
509 error: Some(e),
510 metrics: ExecutionMetrics::default(),
511 };
512 }
513
514 let start_compile = self.0.runtime_context.clock.instant();
515 match self.compiler.compile_with_policy(
516 &mut Transaction::Test(Box::new(txn.reborrow())),
517 cmd.rql,
518 inject_read_policies,
519 ) {
520 Err(err) => {
521 #[cfg(not(reifydb_single_threaded))]
522 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
523 return ExecutionResult {
524 frames,
525 error: None,
526 metrics: ExecutionMetrics::default(),
527 };
528 }
529 ExecutionResult {
530 frames: vec![],
531 error: Some(err),
532 metrics: ExecutionMetrics::default(),
533 }
534 }
535 Ok(CompilationResult::Ready(compiled)) => {
536 let compile_duration = start_compile.elapsed();
537 match execute_compiled_units(
538 &self.0,
539 &mut Transaction::Test(Box::new(txn.reborrow())),
540 &compiled,
541 &cmd.params,
542 symbols,
543 compile_duration,
544 ) {
545 Ok((output, remaining, _, metrics)) => ExecutionResult {
546 frames: merge_results(output, remaining),
547 error: None,
548 metrics: build_metrics(metrics),
549 },
550 Err(f) => ExecutionResult {
551 frames: vec![],
552 error: Some(f.error),
553 metrics: build_metrics(f.partial_metrics),
554 },
555 }
556 }
557 Ok(CompilationResult::Incremental(mut state)) => {
558 let policy = constrain_policy(|plans, bump, cat, tx| {
559 inject_read_policies(plans, bump, cat, tx)
560 });
561 let mut result = vec![];
562 let mut output_results: Vec<Frame> = Vec::new();
563 let mut symbols = symbols;
564 let mut metrics = Vec::new();
565 loop {
566 let start_incr = self.0.runtime_context.clock.instant();
567 let next = match self.compiler.compile_next_with_policy(
568 &mut Transaction::Test(Box::new(txn.reborrow())),
569 &mut state,
570 &policy,
571 ) {
572 Ok(n) => n,
573 Err(e) => {
574 return ExecutionResult {
575 frames: vec![],
576 error: Some(e),
577 metrics: build_metrics(metrics),
578 };
579 }
580 };
581 let compile_duration = start_incr.elapsed();
582
583 let Some(compiled) = next else {
584 break;
585 };
586
587 result.clear();
588 let mut tx = Transaction::Test(Box::new(txn.reborrow()));
589 let mut vm = Vm::new(symbols);
590 let start_execute = self.0.runtime_context.clock.instant();
591 let run_result = vm.run(
592 &self.0,
593 &mut tx,
594 &compiled.instructions,
595 &cmd.params,
596 &mut result,
597 );
598 let execute_duration = start_execute.elapsed();
599 symbols = vm.symbols;
600
601 metrics.push(StatementMetric {
602 fingerprint: compiled.fingerprint,
603 normalized_rql: compiled.normalized_rql,
604 compile_duration_us: compile_duration.as_micros() as u64,
605 execute_duration_us: execute_duration.as_micros() as u64,
606 rows_affected: if run_result.is_ok() {
607 extract_rows_affected(&result)
608 } else {
609 0
610 },
611 });
612
613 if let Err(e) = run_result {
614 return ExecutionResult {
615 frames: vec![],
616 error: Some(e),
617 metrics: build_metrics(metrics),
618 };
619 }
620
621 if compiled.is_output {
622 output_results.append(&mut result);
623 }
624 }
625 ExecutionResult {
626 frames: merge_results(output_results, result),
627 error: None,
628 metrics: build_metrics(metrics),
629 }
630 }
631 }
632 }
633
634 #[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
635 pub fn subscription(&self, txn: &mut QueryTransaction, cmd: Subscription<'_>) -> ExecutionResult {
636 let bump = Bump::new();
638 let statements = match parse_str(&bump, cmd.rql) {
639 Ok(s) => s,
640 Err(e) => {
641 return ExecutionResult {
642 frames: vec![],
643 error: Some(e),
644 metrics: ExecutionMetrics::default(),
645 };
646 }
647 };
648
649 if statements.len() != 1 {
650 return ExecutionResult {
651 frames: vec![],
652 error: Some(Error(Box::new(subscription::single_statement_required(
653 "Subscription endpoint requires exactly one statement",
654 )))),
655 metrics: ExecutionMetrics::default(),
656 };
657 }
658
659 let statement = &statements[0];
660 if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
661 return ExecutionResult {
662 frames: vec![],
663 error: Some(Error(Box::new(subscription::invalid_statement(
664 "Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
665 )))),
666 metrics: ExecutionMetrics::default(),
667 };
668 }
669
670 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Query(&mut *txn)) {
671 Ok(s) => s,
672 Err(e) => {
673 return ExecutionResult {
674 frames: vec![],
675 error: Some(e),
676 metrics: ExecutionMetrics::default(),
677 };
678 }
679 };
680
681 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
682 &mut Transaction::Query(&mut *txn),
683 "subscription",
684 true,
685 ) {
686 return ExecutionResult {
687 frames: vec![],
688 error: Some(e),
689 metrics: ExecutionMetrics::default(),
690 };
691 }
692
693 let start_compile = self.0.runtime_context.clock.instant();
694 let compiled = match self.compiler.compile_with_policy(
695 &mut Transaction::Query(txn),
696 cmd.rql,
697 inject_read_policies,
698 ) {
699 Ok(CompilationResult::Ready(compiled)) => compiled,
700 Ok(CompilationResult::Incremental(_)) => {
701 unreachable!("Single subscription statement should not require incremental compilation")
702 }
703 Err(err) => {
704 return ExecutionResult {
705 frames: vec![],
706 error: Some(err),
707 metrics: ExecutionMetrics::default(),
708 };
709 }
710 };
711 let compile_duration = start_compile.elapsed();
712
713 match execute_compiled_units(
714 &self.0,
715 &mut Transaction::Query(txn),
716 &compiled,
717 &cmd.params,
718 symbols,
719 compile_duration,
720 ) {
721 Ok((output, remaining, _, metrics)) => ExecutionResult {
722 frames: merge_results(output, remaining),
723 error: None,
724 metrics: build_metrics(metrics),
725 },
726 Err(f) => ExecutionResult {
727 frames: vec![],
728 error: Some(f.error),
729 metrics: build_metrics(f.partial_metrics),
730 },
731 }
732 }
733
734 #[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
735 pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> ExecutionResult {
736 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn)) {
737 Ok(s) => s,
738 Err(e) => {
739 return ExecutionResult {
740 frames: vec![],
741 error: Some(e),
742 metrics: ExecutionMetrics::default(),
743 };
744 }
745 };
746
747 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
748 &mut Transaction::Command(&mut *txn),
749 "command",
750 false,
751 ) {
752 return ExecutionResult {
753 frames: vec![],
754 error: Some(e),
755 metrics: ExecutionMetrics::default(),
756 };
757 }
758
759 let start_compile = self.0.runtime_context.clock.instant();
760 let compiled = match self.compiler.compile_with_policy(
761 &mut Transaction::Command(txn),
762 cmd.rql,
763 inject_read_policies,
764 ) {
765 Ok(CompilationResult::Ready(compiled)) => compiled,
766 Ok(CompilationResult::Incremental(_)) => {
767 unreachable!("DDL statements require admin transactions, not command transactions")
768 }
769 Err(err) => {
770 #[cfg(not(reifydb_single_threaded))]
771 if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
772 return ExecutionResult {
773 frames: vec![],
774 error: Some(Error(Box::new(Diagnostic {
775 code: "REMOTE_002".to_string(),
776 message: "Write operations on remote namespaces are not supported"
777 .to_string(),
778 help: Some("Use the remote instance directly for write operations"
779 .to_string()),
780 ..Default::default()
781 }))),
782 metrics: ExecutionMetrics::default(),
783 };
784 }
785 return ExecutionResult {
786 frames: vec![],
787 error: Some(err),
788 metrics: ExecutionMetrics::default(),
789 };
790 }
791 };
792 let compile_duration = start_compile.elapsed();
793
794 match execute_compiled_units(
795 &self.0,
796 &mut Transaction::Command(txn),
797 &compiled,
798 &cmd.params,
799 symbols,
800 compile_duration,
801 ) {
802 Ok((output, remaining, _, metrics)) => ExecutionResult {
803 frames: merge_results(output, remaining),
804 error: None,
805 metrics: build_metrics(metrics),
806 },
807 Err(f) => ExecutionResult {
808 frames: vec![],
809 error: Some(f.error),
810 metrics: build_metrics(f.partial_metrics),
811 },
812 }
813 }
814
815 #[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
817 pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> ExecutionResult {
818 let rql = format!("CALL {}()", name);
819 let symbols = match self.setup_symbols(params, &mut Transaction::Command(&mut *txn)) {
820 Ok(s) => s,
821 Err(e) => {
822 return ExecutionResult {
823 frames: vec![],
824 error: Some(e),
825 metrics: ExecutionMetrics::default(),
826 };
827 }
828 };
829
830 let start_compile = self.0.runtime_context.clock.instant();
831 let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql) {
832 Ok(CompilationResult::Ready(compiled)) => compiled,
833 Ok(CompilationResult::Incremental(_)) => {
834 unreachable!("CALL statements should not require incremental compilation")
835 }
836 Err(e) => {
837 return ExecutionResult {
838 frames: vec![],
839 error: Some(e),
840 metrics: ExecutionMetrics::default(),
841 };
842 }
843 };
844 let compile_duration = start_compile.elapsed();
845 let compile_duration_us = compile_duration.as_micros() as u64 / compiled.len().max(1) as u64;
846
847 let mut result = vec![];
848 let mut metrics = Vec::new();
849 let mut symbols = symbols;
850 for compiled in compiled.iter() {
851 result.clear();
852 let mut tx = Transaction::Command(txn);
853 let mut vm = Vm::new(symbols);
854 let start_execute = self.0.runtime_context.clock.instant();
855 let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result);
856 let execute_duration = start_execute.elapsed();
857 symbols = vm.symbols;
858
859 metrics.push(StatementMetric {
860 fingerprint: compiled.fingerprint,
861 normalized_rql: compiled.normalized_rql.clone(),
862 compile_duration_us,
863 execute_duration_us: execute_duration.as_micros() as u64,
864 rows_affected: if run_result.is_ok() {
865 extract_rows_affected(&result)
866 } else {
867 0
868 },
869 });
870
871 if let Err(e) = run_result {
872 return ExecutionResult {
873 frames: vec![],
874 error: Some(e),
875 metrics: build_metrics(metrics),
876 };
877 }
878 }
879
880 ExecutionResult {
881 frames: result,
882 error: None,
883 metrics: build_metrics(metrics),
884 }
885 }
886
887 #[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
888 pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> ExecutionResult {
889 let symbols = match self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn)) {
890 Ok(s) => s,
891 Err(e) => {
892 return ExecutionResult {
893 frames: vec![],
894 error: Some(e),
895 metrics: ExecutionMetrics::default(),
896 };
897 }
898 };
899
900 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
901 &mut Transaction::Query(&mut *txn),
902 "query",
903 false,
904 ) {
905 return ExecutionResult {
906 frames: vec![],
907 error: Some(e),
908 metrics: ExecutionMetrics::default(),
909 };
910 }
911
912 let start_compile = self.0.runtime_context.clock.instant();
913 let compiled = match self.compiler.compile_with_policy(
914 &mut Transaction::Query(txn),
915 qry.rql,
916 inject_read_policies,
917 ) {
918 Ok(CompilationResult::Ready(compiled)) => compiled,
919 Ok(CompilationResult::Incremental(_)) => {
920 unreachable!("DDL statements require admin transactions, not query transactions")
921 }
922 Err(err) => {
923 #[cfg(not(reifydb_single_threaded))]
924 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, qry.rql, qry.params) {
925 return ExecutionResult {
926 frames,
927 error: None,
928 metrics: ExecutionMetrics::default(),
929 };
930 }
931 return ExecutionResult {
932 frames: vec![],
933 error: Some(err),
934 metrics: ExecutionMetrics::default(),
935 };
936 }
937 };
938 let compile_duration = start_compile.elapsed();
939
940 let exec_result = execute_compiled_units(
941 &self.0,
942 &mut Transaction::Query(txn),
943 &compiled,
944 &qry.params,
945 symbols,
946 compile_duration,
947 );
948
949 match exec_result {
950 Ok((output, remaining, _, metrics)) => ExecutionResult {
951 frames: merge_results(output, remaining),
952 error: None,
953 metrics: build_metrics(metrics),
954 },
955 Err(f) => ExecutionResult {
956 frames: vec![],
957 error: Some(f.error),
958 metrics: build_metrics(f.partial_metrics),
959 },
960 }
961 }
962}