1use std::{ops::Deref, result::Result as StdResult, sync::Arc, time::Duration};
5
6use bumpalo::Bump;
7use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::SystemFlowOperatorStore};
8use reifydb_core::{
9 error::diagnostic::subscription,
10 execution::ExecutionResult,
11 metric::{ExecutionMetrics, StatementMetric},
12 value::column::columns::Columns,
13};
14use reifydb_metric::storage::metric::MetricReader;
15use reifydb_policy::inject_read_policies;
16use reifydb_rql::{
17 ast::parse_str,
18 compiler::{CompilationResult, Compiled, constrain_policy},
19 fingerprint::request::fingerprint_request,
20};
21use reifydb_store_single::SingleStore;
22use reifydb_transaction::transaction::{
23 RqlExecutor, TestTransaction, Transaction, admin::AdminTransaction, command::CommandTransaction,
24 query::QueryTransaction,
25};
26#[cfg(not(reifydb_single_threaded))]
27use reifydb_type::error::Diagnostic;
28use reifydb_type::{
29 error::Error,
30 params::Params,
31 value::{Value, frame::frame::Frame, r#type::Type},
32};
33use tracing::instrument;
34
35#[cfg(not(reifydb_single_threaded))]
36use crate::remote;
37use crate::{
38 Result,
39 policy::PolicyEvaluator,
40 vm::{
41 Admin, Command, Query, Subscription, Test,
42 services::{EngineConfig, Services},
43 stack::{SymbolTable, Variable},
44 vm::Vm,
45 },
46};
47
48pub struct Executor(Arc<Services>);
50
51impl Clone for Executor {
52 fn clone(&self) -> Self {
53 Self(self.0.clone())
54 }
55}
56
57impl Deref for Executor {
58 type Target = Services;
59
60 fn deref(&self) -> &Self::Target {
61 &self.0
62 }
63}
64
65impl Executor {
66 pub fn new(
67 catalog: Catalog,
68 config: EngineConfig,
69 flow_operator_store: SystemFlowOperatorStore,
70 stats_reader: MetricReader<SingleStore>,
71 ) -> Self {
72 Self(Arc::new(Services::new(catalog, config, flow_operator_store, stats_reader)))
73 }
74
75 pub fn services(&self) -> &Arc<Services> {
77 &self.0
78 }
79
80 pub fn from_services(services: Arc<Services>) -> Self {
82 Self(services)
83 }
84
85 #[allow(dead_code)]
86 pub fn testing() -> Self {
87 Self(Services::testing())
88 }
89
90 #[cfg(not(reifydb_single_threaded))]
93 fn try_forward_remote_query(&self, err: &Error, rql: &str, params: Params) -> Result<Option<Vec<Frame>>> {
94 if let Some(ref registry) = self.0.remote_registry
95 && remote::is_remote_query(err)
96 && let Some(address) = remote::extract_remote_address(err)
97 {
98 let token = remote::extract_remote_token(err);
99 return registry.forward_query(&address, rql, params, token.as_deref()).map(Some);
100 }
101 Ok(None)
102 }
103}
104
105impl RqlExecutor for Executor {
106 fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
107 Executor::rql(self, tx, rql, params)
108 }
109}
110
111fn populate_symbols(symbols: &mut SymbolTable, params: &Params) -> Result<()> {
113 match params {
114 Params::Positional(values) => {
115 for (index, value) in values.iter().enumerate() {
116 let param_name = (index + 1).to_string();
117 symbols.set(param_name, Variable::scalar(value.clone()), false)?;
118 }
119 }
120 Params::Named(map) => {
121 for (name, value) in map.iter() {
122 symbols.set(name.clone(), Variable::scalar(value.clone()), false)?;
123 }
124 }
125 Params::None => {}
126 }
127 Ok(())
128}
129
130fn populate_identity(symbols: &mut SymbolTable, catalog: &Catalog, tx: &mut Transaction<'_>) -> Result<()> {
133 let identity = tx.identity();
134 if identity.is_privileged() {
135 return Ok(());
136 }
137 if identity.is_anonymous() {
138 let columns = Columns::single_row([
139 ("id", Value::IdentityId(identity)),
140 ("name", Value::none_of(Type::Utf8)),
141 ("roles", Value::List(vec![])),
142 ]);
143 symbols.set("identity".to_string(), Variable::columns(columns), false)?;
144 return Ok(());
145 }
146 if let Some(user) = catalog.find_identity(tx, identity)? {
147 let roles = catalog.find_role_names_for_identity(tx, identity)?;
148 let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
149 let columns = Columns::single_row([
150 ("id", Value::IdentityId(identity)),
151 ("name", Value::Utf8(user.name)),
152 ("roles", Value::List(role_values)),
153 ]);
154 symbols.set("identity".to_string(), Variable::columns(columns), false)?;
155 }
156 Ok(())
157}
158
/// Successful outcome of `execute_compiled_units`, in tuple order:
/// 1. frames accumulated from units flagged `is_output`,
/// 2. frames still in the working buffer after the last unit ran,
/// 3. the symbol table as left by the last unit,
/// 4. one `StatementMetric` per executed unit.
type CompiledUnitsResult = (Vec<Frame>, Vec<Frame>, SymbolTable, Vec<StatementMetric>);
161
/// Error outcome of `execute_compiled_units`: the failure plus the metrics gathered so far.
struct ExecutionFailure {
    /// The error returned by the unit that failed to run.
    error: Error,
    /// Metrics for every unit that ran before the failure, followed by the failing unit itself.
    partial_metrics: Vec<StatementMetric>,
}
167
168fn build_metrics(statements: Vec<StatementMetric>) -> ExecutionMetrics {
170 let fps: Vec<_> = statements.iter().map(|m| m.fingerprint).collect();
171 ExecutionMetrics {
172 fingerprint: fingerprint_request(&fps),
173 statements,
174 }
175}
176
177fn execute_compiled_units(
179 services: &Arc<Services>,
180 tx: &mut Transaction<'_>,
181 compiled_list: &[Compiled],
182 params: &Params,
183 mut symbols: SymbolTable,
184 compile_duration: Duration,
185) -> StdResult<CompiledUnitsResult, ExecutionFailure> {
186 let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
187 let mut result = vec![];
188 let mut output_results: Vec<Frame> = Vec::new();
189 let mut metrics = Vec::new();
190
191 for compiled in compiled_list.iter() {
192 result.clear();
193 let mut vm = Vm::from_services(symbols, services, params, tx.identity());
194 let start = services.runtime_context.clock.instant();
195 let run_result = vm.run(services, tx, &compiled.instructions, &mut result);
196 let execute_duration = start.elapsed();
197 symbols = vm.symbols;
198
199 metrics.push(StatementMetric {
200 fingerprint: compiled.fingerprint,
201 normalized_rql: compiled.normalized_rql.clone(),
202 compile_duration_us,
203 execute_duration_us: execute_duration.as_micros() as u64,
204 rows_affected: if run_result.is_ok() {
205 extract_rows_affected(&result)
206 } else {
207 0
208 },
209 });
210
211 if let Err(error) = run_result {
212 return Err(ExecutionFailure {
213 error,
214 partial_metrics: metrics,
215 });
216 }
217
218 if compiled.is_output {
219 output_results.append(&mut result);
220 }
221 }
222
223 Ok((output_results, result, symbols, metrics))
224}
225
226fn merge_results(mut output_results: Vec<Frame>, mut remaining: Vec<Frame>) -> Vec<Frame> {
228 output_results.append(&mut remaining);
229 output_results
230}
231
232fn extract_rows_affected(result: &[Frame]) -> u64 {
239 if result.len() == 1 {
240 let frame = &result[0];
241 for col in &frame.columns {
242 match col.name.as_str() {
243 "inserted" | "updated" | "deleted" => {
244 if col.data.len() == 1
245 && let Value::Uint8(n) = col.data.get_value(0)
246 {
247 return n;
248 }
249 }
250 _ => {}
251 }
252 }
253 }
254 result.len() as u64
255}
256
257impl Executor {
258 fn setup_symbols(&self, params: &Params, tx: &mut Transaction<'_>) -> Result<SymbolTable> {
260 let mut symbols = SymbolTable::new();
261 populate_symbols(&mut symbols, params)?;
262 populate_identity(&mut symbols, &self.catalog, tx)?;
263 Ok(symbols)
264 }
265
266 #[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
271 pub fn rql(&self, tx: &mut Transaction<'_>, rql: &str, params: Params) -> ExecutionResult {
272 let mut symbols = match self.setup_symbols(¶ms, tx) {
273 Ok(s) => s,
274 Err(e) => {
275 return ExecutionResult {
276 frames: vec![],
277 error: Some(e),
278 metrics: ExecutionMetrics::default(),
279 };
280 }
281 };
282
283 let start_compile = self.0.runtime_context.clock.instant();
284 let compiled_list = match self.compiler.compile_with_policy(tx, rql, inject_read_policies) {
285 Ok(CompilationResult::Ready(compiled)) => compiled,
286 Ok(CompilationResult::Incremental(_)) => {
287 unreachable!("incremental compilation not supported in rql()")
288 }
289 Err(err) => {
290 #[cfg(not(reifydb_single_threaded))]
291 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, rql, params) {
292 return ExecutionResult {
293 frames,
294 error: None,
295 metrics: ExecutionMetrics::default(),
296 };
297 }
298 return ExecutionResult {
299 frames: vec![],
300 error: Some(err),
301 metrics: ExecutionMetrics::default(),
302 };
303 }
304 };
305 let compile_duration = start_compile.elapsed();
306 let compile_duration_us = compile_duration.as_micros() as u64 / compiled_list.len().max(1) as u64;
307
308 let mut result = vec![];
309 let mut metrics = Vec::new();
310 for compiled in compiled_list.iter() {
311 result.clear();
312 let mut vm = Vm::from_services(symbols, &self.0, ¶ms, tx.identity());
313 let start_execute = self.0.runtime_context.clock.instant();
314 let run_result = vm.run(&self.0, tx, &compiled.instructions, &mut result);
315 let execute_duration = start_execute.elapsed();
316 symbols = vm.symbols;
317
318 metrics.push(StatementMetric {
319 fingerprint: compiled.fingerprint,
320 normalized_rql: compiled.normalized_rql.clone(),
321 compile_duration_us,
322 execute_duration_us: execute_duration.as_micros() as u64,
323 rows_affected: if run_result.is_ok() {
324 extract_rows_affected(&result)
325 } else {
326 0
327 },
328 });
329
330 if let Err(e) = run_result {
331 return ExecutionResult {
332 frames: vec![],
333 error: Some(e),
334 metrics: build_metrics(metrics),
335 };
336 }
337 }
338
339 ExecutionResult {
340 frames: result,
341 error: None,
342 metrics: build_metrics(metrics),
343 }
344 }
345
    /// Executes an admin command inside `txn`.
    ///
    /// Steps, in order: build the symbol table from `cmd.params` and the transaction identity,
    /// enforce the `"admin"` session policy, compile `cmd.rql` with read policies injected, then
    /// run the result. The compiler may return either a ready list of units or an incremental
    /// state; in the incremental case each unit is compiled immediately before it is run.
    ///
    /// Failures never surface as `Err`; they are reported through the `error` field of the
    /// returned `ExecutionResult`, always with empty `frames`. Metrics are default for failures
    /// before execution starts, and otherwise cover the units processed so far.
    ///
    /// Unless built with `reifydb_single_threaded`, an initial compilation error is first
    /// offered to `try_forward_remote_query`; a forwarding failure is ignored and the original
    /// compilation error is returned.
    #[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
    pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> ExecutionResult {
        let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Admin(&mut *txn)) {
            Ok(s) => s,
            Err(e) => {
                return ExecutionResult {
                    frames: vec![],
                    error: Some(e),
                    metrics: ExecutionMetrics::default(),
                };
            }
        };

        // NOTE(review): the third argument sits in the slot where `test` passes
        // `session_default_deny`, so `true` presumably means "deny by default" — confirm.
        if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
            &mut Transaction::Admin(&mut *txn),
            "admin",
            true,
        ) {
            return ExecutionResult {
                frames: vec![],
                error: Some(e),
                metrics: ExecutionMetrics::default(),
            };
        }

        let start_compile = self.0.runtime_context.clock.instant();
        match self.compiler.compile_with_policy(&mut Transaction::Admin(txn), cmd.rql, inject_read_policies) {
            Err(err) => {
                // `cmd.params` is moved here; this arm never reaches the code that borrows it.
                #[cfg(not(reifydb_single_threaded))]
                if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
                    return ExecutionResult {
                        frames,
                        error: None,
                        metrics: ExecutionMetrics::default(),
                    };
                }
                ExecutionResult {
                    frames: vec![],
                    error: Some(err),
                    metrics: ExecutionMetrics::default(),
                }
            }
            // Everything compiled up front: run all units through the shared helper.
            Ok(CompilationResult::Ready(compiled)) => {
                let compile_duration = start_compile.elapsed();
                match execute_compiled_units(
                    &self.0,
                    &mut Transaction::Admin(txn),
                    &compiled,
                    &cmd.params,
                    symbols,
                    compile_duration,
                ) {
                    Ok((output, remaining, _, metrics)) => ExecutionResult {
                        frames: merge_results(output, remaining),
                        error: None,
                        metrics: build_metrics(metrics),
                    },
                    Err(f) => ExecutionResult {
                        frames: vec![],
                        error: Some(f.error),
                        metrics: build_metrics(f.partial_metrics),
                    },
                }
            }
            // Incremental mode: alternate between compiling the next unit and running it.
            Ok(CompilationResult::Incremental(mut state)) => {
                let policy = constrain_policy(|plans, bump, cat, tx| {
                    inject_read_policies(plans, bump, cat, tx)
                });
                let mut result = vec![];
                let mut output_results: Vec<Frame> = Vec::new();
                let mut symbols = symbols;
                let mut metrics = Vec::new();
                loop {
                    // Compile time is measured per unit here, not averaged over the batch.
                    let start_incr = self.0.runtime_context.clock.instant();
                    let next = match self.compiler.compile_next_with_policy(
                        &mut Transaction::Admin(txn),
                        &mut state,
                        &policy,
                    ) {
                        Ok(n) => n,
                        Err(e) => {
                            // A compile failure records no metric for the unit that failed to compile.
                            return ExecutionResult {
                                frames: vec![],
                                error: Some(e),
                                metrics: build_metrics(metrics),
                            };
                        }
                    };
                    let compile_duration = start_incr.elapsed();

                    // `None` ends the loop: nothing is left to compile.
                    let Some(compiled) = next else {
                        break;
                    };

                    result.clear();
                    let mut tx = Transaction::Admin(txn);
                    let mut vm = Vm::from_services(symbols, &self.0, &cmd.params, tx.identity());
                    let start_execute = self.0.runtime_context.clock.instant();
                    let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
                    let execute_duration = start_execute.elapsed();
                    // Take the symbol table back so the next unit sees this unit's variables.
                    symbols = vm.symbols;

                    // Recorded before the error check so a failing unit is still reported.
                    metrics.push(StatementMetric {
                        fingerprint: compiled.fingerprint,
                        normalized_rql: compiled.normalized_rql,
                        compile_duration_us: compile_duration.as_micros() as u64,
                        execute_duration_us: execute_duration.as_micros() as u64,
                        rows_affected: if run_result.is_ok() {
                            extract_rows_affected(&result)
                        } else {
                            0
                        },
                    });

                    if let Err(e) = run_result {
                        return ExecutionResult {
                            frames: vec![],
                            error: Some(e),
                            metrics: build_metrics(metrics),
                        };
                    }

                    // Output units hand their frames to the accumulated output, emptying `result`.
                    if compiled.is_output {
                        output_results.append(&mut result);
                    }
                }
                ExecutionResult {
                    frames: merge_results(output_results, result),
                    error: None,
                    metrics: build_metrics(metrics),
                }
            }
        }
    }
480
    /// Executes a test command inside a `TestTransaction`.
    ///
    /// Steps, in order: build the symbol table from `cmd.params` and the transaction identity,
    /// enforce the session policy using the transaction's own `session_type` and
    /// `session_default_deny`, compile `cmd.rql` with read policies injected, then run the
    /// result. The compiler may return either a ready list of units or an incremental state; in
    /// the incremental case each unit is compiled immediately before it is run.
    ///
    /// Each step wraps a fresh `txn.reborrow()` in `Transaction::Test`.
    ///
    /// Failures never surface as `Err`; they are reported through the `error` field of the
    /// returned `ExecutionResult`, always with empty `frames`. Metrics are default for failures
    /// before execution starts, and otherwise cover the units processed so far.
    ///
    /// Unless built with `reifydb_single_threaded`, an initial compilation error is first
    /// offered to `try_forward_remote_query`; a forwarding failure is ignored and the original
    /// compilation error is returned.
    #[instrument(name = "executor::test", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
    pub fn test(&self, txn: &mut TestTransaction<'_>, cmd: Test<'_>) -> ExecutionResult {
        let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Test(Box::new(txn.reborrow()))) {
            Ok(s) => s,
            Err(e) => {
                return ExecutionResult {
                    frames: vec![],
                    error: Some(e),
                    metrics: ExecutionMetrics::default(),
                };
            }
        };

        // Copied out first so `txn` can be reborrowed mutably for the policy check below.
        let session_type = txn.session_type.clone();
        let session_default_deny = txn.session_default_deny;
        if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
            &mut Transaction::Test(Box::new(txn.reborrow())),
            &session_type,
            session_default_deny,
        ) {
            return ExecutionResult {
                frames: vec![],
                error: Some(e),
                metrics: ExecutionMetrics::default(),
            };
        }

        let start_compile = self.0.runtime_context.clock.instant();
        match self.compiler.compile_with_policy(
            &mut Transaction::Test(Box::new(txn.reborrow())),
            cmd.rql,
            inject_read_policies,
        ) {
            Err(err) => {
                // `cmd.params` is moved here; this arm never reaches the code that borrows it.
                #[cfg(not(reifydb_single_threaded))]
                if let Ok(Some(frames)) = self.try_forward_remote_query(&err, cmd.rql, cmd.params) {
                    return ExecutionResult {
                        frames,
                        error: None,
                        metrics: ExecutionMetrics::default(),
                    };
                }
                ExecutionResult {
                    frames: vec![],
                    error: Some(err),
                    metrics: ExecutionMetrics::default(),
                }
            }
            // Everything compiled up front: run all units through the shared helper.
            Ok(CompilationResult::Ready(compiled)) => {
                let compile_duration = start_compile.elapsed();
                match execute_compiled_units(
                    &self.0,
                    &mut Transaction::Test(Box::new(txn.reborrow())),
                    &compiled,
                    &cmd.params,
                    symbols,
                    compile_duration,
                ) {
                    Ok((output, remaining, _, metrics)) => ExecutionResult {
                        frames: merge_results(output, remaining),
                        error: None,
                        metrics: build_metrics(metrics),
                    },
                    Err(f) => ExecutionResult {
                        frames: vec![],
                        error: Some(f.error),
                        metrics: build_metrics(f.partial_metrics),
                    },
                }
            }
            // Incremental mode: alternate between compiling the next unit and running it.
            Ok(CompilationResult::Incremental(mut state)) => {
                let policy = constrain_policy(|plans, bump, cat, tx| {
                    inject_read_policies(plans, bump, cat, tx)
                });
                let mut result = vec![];
                let mut output_results: Vec<Frame> = Vec::new();
                let mut symbols = symbols;
                let mut metrics = Vec::new();
                loop {
                    // Compile time is measured per unit here, not averaged over the batch.
                    let start_incr = self.0.runtime_context.clock.instant();
                    let next = match self.compiler.compile_next_with_policy(
                        &mut Transaction::Test(Box::new(txn.reborrow())),
                        &mut state,
                        &policy,
                    ) {
                        Ok(n) => n,
                        Err(e) => {
                            // A compile failure records no metric for the unit that failed to compile.
                            return ExecutionResult {
                                frames: vec![],
                                error: Some(e),
                                metrics: build_metrics(metrics),
                            };
                        }
                    };
                    let compile_duration = start_incr.elapsed();

                    // `None` ends the loop: nothing is left to compile.
                    let Some(compiled) = next else {
                        break;
                    };

                    result.clear();
                    let mut tx = Transaction::Test(Box::new(txn.reborrow()));
                    let mut vm = Vm::from_services(symbols, &self.0, &cmd.params, tx.identity());
                    let start_execute = self.0.runtime_context.clock.instant();
                    let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
                    let execute_duration = start_execute.elapsed();
                    // Take the symbol table back so the next unit sees this unit's variables.
                    symbols = vm.symbols;

                    // Recorded before the error check so a failing unit is still reported.
                    metrics.push(StatementMetric {
                        fingerprint: compiled.fingerprint,
                        normalized_rql: compiled.normalized_rql,
                        compile_duration_us: compile_duration.as_micros() as u64,
                        execute_duration_us: execute_duration.as_micros() as u64,
                        rows_affected: if run_result.is_ok() {
                            extract_rows_affected(&result)
                        } else {
                            0
                        },
                    });

                    if let Err(e) = run_result {
                        return ExecutionResult {
                            frames: vec![],
                            error: Some(e),
                            metrics: build_metrics(metrics),
                        };
                    }

                    // Output units hand their frames to the accumulated output, emptying `result`.
                    if compiled.is_output {
                        output_results.append(&mut result);
                    }
                }
                ExecutionResult {
                    frames: merge_results(output_results, result),
                    error: None,
                    metrics: build_metrics(metrics),
                }
            }
        }
    }
621
622 #[instrument(name = "executor::subscription", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
623 pub fn subscription(&self, txn: &mut QueryTransaction, cmd: Subscription<'_>) -> ExecutionResult {
624 let bump = Bump::new();
626 let statements = match parse_str(&bump, cmd.rql) {
627 Ok(s) => s,
628 Err(e) => {
629 return ExecutionResult {
630 frames: vec![],
631 error: Some(e),
632 metrics: ExecutionMetrics::default(),
633 };
634 }
635 };
636
637 if statements.len() != 1 {
638 return ExecutionResult {
639 frames: vec![],
640 error: Some(Error(Box::new(subscription::single_statement_required(
641 "Subscription endpoint requires exactly one statement",
642 )))),
643 metrics: ExecutionMetrics::default(),
644 };
645 }
646
647 let statement = &statements[0];
648 if statement.nodes.len() != 1 || !statement.nodes[0].is_subscription_ddl() {
649 return ExecutionResult {
650 frames: vec![],
651 error: Some(Error(Box::new(subscription::invalid_statement(
652 "Subscription endpoint only supports CREATE SUBSCRIPTION or DROP SUBSCRIPTION",
653 )))),
654 metrics: ExecutionMetrics::default(),
655 };
656 }
657
658 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Query(&mut *txn)) {
659 Ok(s) => s,
660 Err(e) => {
661 return ExecutionResult {
662 frames: vec![],
663 error: Some(e),
664 metrics: ExecutionMetrics::default(),
665 };
666 }
667 };
668
669 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
670 &mut Transaction::Query(&mut *txn),
671 "subscription",
672 true,
673 ) {
674 return ExecutionResult {
675 frames: vec![],
676 error: Some(e),
677 metrics: ExecutionMetrics::default(),
678 };
679 }
680
681 let start_compile = self.0.runtime_context.clock.instant();
682 let compiled = match self.compiler.compile_with_policy(
683 &mut Transaction::Query(txn),
684 cmd.rql,
685 inject_read_policies,
686 ) {
687 Ok(CompilationResult::Ready(compiled)) => compiled,
688 Ok(CompilationResult::Incremental(_)) => {
689 unreachable!("Single subscription statement should not require incremental compilation")
690 }
691 Err(err) => {
692 return ExecutionResult {
693 frames: vec![],
694 error: Some(err),
695 metrics: ExecutionMetrics::default(),
696 };
697 }
698 };
699 let compile_duration = start_compile.elapsed();
700
701 match execute_compiled_units(
702 &self.0,
703 &mut Transaction::Query(txn),
704 &compiled,
705 &cmd.params,
706 symbols,
707 compile_duration,
708 ) {
709 Ok((output, remaining, _, metrics)) => ExecutionResult {
710 frames: merge_results(output, remaining),
711 error: None,
712 metrics: build_metrics(metrics),
713 },
714 Err(f) => ExecutionResult {
715 frames: vec![],
716 error: Some(f.error),
717 metrics: build_metrics(f.partial_metrics),
718 },
719 }
720 }
721
722 #[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
723 pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> ExecutionResult {
724 let symbols = match self.setup_symbols(&cmd.params, &mut Transaction::Command(&mut *txn)) {
725 Ok(s) => s,
726 Err(e) => {
727 return ExecutionResult {
728 frames: vec![],
729 error: Some(e),
730 metrics: ExecutionMetrics::default(),
731 };
732 }
733 };
734
735 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
736 &mut Transaction::Command(&mut *txn),
737 "command",
738 false,
739 ) {
740 return ExecutionResult {
741 frames: vec![],
742 error: Some(e),
743 metrics: ExecutionMetrics::default(),
744 };
745 }
746
747 let start_compile = self.0.runtime_context.clock.instant();
748 let compiled = match self.compiler.compile_with_policy(
749 &mut Transaction::Command(txn),
750 cmd.rql,
751 inject_read_policies,
752 ) {
753 Ok(CompilationResult::Ready(compiled)) => compiled,
754 Ok(CompilationResult::Incremental(_)) => {
755 unreachable!("DDL statements require admin transactions, not command transactions")
756 }
757 Err(err) => {
758 #[cfg(not(reifydb_single_threaded))]
759 if self.0.remote_registry.is_some() && remote::is_remote_query(&err) {
760 return ExecutionResult {
761 frames: vec![],
762 error: Some(Error(Box::new(Diagnostic {
763 code: "REMOTE_002".to_string(),
764 message: "Write operations on remote namespaces are not supported"
765 .to_string(),
766 help: Some("Use the remote instance directly for write operations"
767 .to_string()),
768 ..Default::default()
769 }))),
770 metrics: ExecutionMetrics::default(),
771 };
772 }
773 return ExecutionResult {
774 frames: vec![],
775 error: Some(err),
776 metrics: ExecutionMetrics::default(),
777 };
778 }
779 };
780 let compile_duration = start_compile.elapsed();
781
782 match execute_compiled_units(
783 &self.0,
784 &mut Transaction::Command(txn),
785 &compiled,
786 &cmd.params,
787 symbols,
788 compile_duration,
789 ) {
790 Ok((output, remaining, _, metrics)) => ExecutionResult {
791 frames: merge_results(output, remaining),
792 error: None,
793 metrics: build_metrics(metrics),
794 },
795 Err(f) => ExecutionResult {
796 frames: vec![],
797 error: Some(f.error),
798 metrics: build_metrics(f.partial_metrics),
799 },
800 }
801 }
802
803 #[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
805 pub fn call_procedure(&self, txn: &mut CommandTransaction, name: &str, params: &Params) -> ExecutionResult {
806 let rql = format!("CALL {}()", name);
807 let symbols = match self.setup_symbols(params, &mut Transaction::Command(&mut *txn)) {
808 Ok(s) => s,
809 Err(e) => {
810 return ExecutionResult {
811 frames: vec![],
812 error: Some(e),
813 metrics: ExecutionMetrics::default(),
814 };
815 }
816 };
817
818 let start_compile = self.0.runtime_context.clock.instant();
819 let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql) {
820 Ok(CompilationResult::Ready(compiled)) => compiled,
821 Ok(CompilationResult::Incremental(_)) => {
822 unreachable!("CALL statements should not require incremental compilation")
823 }
824 Err(e) => {
825 return ExecutionResult {
826 frames: vec![],
827 error: Some(e),
828 metrics: ExecutionMetrics::default(),
829 };
830 }
831 };
832 let compile_duration = start_compile.elapsed();
833 let compile_duration_us = compile_duration.as_micros() as u64 / compiled.len().max(1) as u64;
834
835 let mut result = vec![];
836 let mut metrics = Vec::new();
837 let mut symbols = symbols;
838 for compiled in compiled.iter() {
839 result.clear();
840 let mut tx = Transaction::Command(txn);
841 let mut vm = Vm::from_services(symbols, &self.0, params, tx.identity());
842 let start_execute = self.0.runtime_context.clock.instant();
843 let run_result = vm.run(&self.0, &mut tx, &compiled.instructions, &mut result);
844 let execute_duration = start_execute.elapsed();
845 symbols = vm.symbols;
846
847 metrics.push(StatementMetric {
848 fingerprint: compiled.fingerprint,
849 normalized_rql: compiled.normalized_rql.clone(),
850 compile_duration_us,
851 execute_duration_us: execute_duration.as_micros() as u64,
852 rows_affected: if run_result.is_ok() {
853 extract_rows_affected(&result)
854 } else {
855 0
856 },
857 });
858
859 if let Err(e) = run_result {
860 return ExecutionResult {
861 frames: vec![],
862 error: Some(e),
863 metrics: build_metrics(metrics),
864 };
865 }
866 }
867
868 ExecutionResult {
869 frames: result,
870 error: None,
871 metrics: build_metrics(metrics),
872 }
873 }
874
875 #[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
876 pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> ExecutionResult {
877 let symbols = match self.setup_symbols(&qry.params, &mut Transaction::Query(&mut *txn)) {
878 Ok(s) => s,
879 Err(e) => {
880 return ExecutionResult {
881 frames: vec![],
882 error: Some(e),
883 metrics: ExecutionMetrics::default(),
884 };
885 }
886 };
887
888 if let Err(e) = PolicyEvaluator::new(&self.0, &symbols).enforce_session_policy(
889 &mut Transaction::Query(&mut *txn),
890 "query",
891 false,
892 ) {
893 return ExecutionResult {
894 frames: vec![],
895 error: Some(e),
896 metrics: ExecutionMetrics::default(),
897 };
898 }
899
900 let start_compile = self.0.runtime_context.clock.instant();
901 let compiled = match self.compiler.compile_with_policy(
902 &mut Transaction::Query(txn),
903 qry.rql,
904 inject_read_policies,
905 ) {
906 Ok(CompilationResult::Ready(compiled)) => compiled,
907 Ok(CompilationResult::Incremental(_)) => {
908 unreachable!("DDL statements require admin transactions, not query transactions")
909 }
910 Err(err) => {
911 #[cfg(not(reifydb_single_threaded))]
912 if let Ok(Some(frames)) = self.try_forward_remote_query(&err, qry.rql, qry.params) {
913 return ExecutionResult {
914 frames,
915 error: None,
916 metrics: ExecutionMetrics::default(),
917 };
918 }
919 return ExecutionResult {
920 frames: vec![],
921 error: Some(err),
922 metrics: ExecutionMetrics::default(),
923 };
924 }
925 };
926 let compile_duration = start_compile.elapsed();
927
928 let exec_result = execute_compiled_units(
929 &self.0,
930 &mut Transaction::Query(txn),
931 &compiled,
932 &qry.params,
933 symbols,
934 compile_duration,
935 );
936
937 match exec_result {
938 Ok((output, remaining, _, metrics)) => ExecutionResult {
939 frames: merge_results(output, remaining),
940 error: None,
941 metrics: build_metrics(metrics),
942 },
943 Err(f) => ExecutionResult {
944 frames: vec![],
945 error: Some(f.error),
946 metrics: build_metrics(f.partial_metrics),
947 },
948 }
949 }
950}