reifydb_engine/vm/
executor.rs1use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::FlowOperatorStore};
7use reifydb_core::util::ioc::IocContainer;
8use reifydb_function::registry::Functions;
9use reifydb_metric::metric::MetricReader;
10use reifydb_rql::compiler::CompilationResult;
11use reifydb_runtime::clock::Clock;
12use reifydb_store_single::SingleStore;
13use reifydb_transaction::transaction::{
14 Transaction, admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
15};
16use reifydb_type::{params::Params, value::frame::frame::Frame};
17use tracing::instrument;
18
19use crate::{
20 transform::registry::Transforms,
21 vm::{
22 Admin, Command, Query,
23 services::Services,
24 stack::{SymbolTable, Variable},
25 vm::Vm,
26 },
27};
28
29pub struct Executor(Arc<Services>);
31
32impl Clone for Executor {
33 fn clone(&self) -> Self {
34 Self(self.0.clone())
35 }
36}
37
38impl Deref for Executor {
39 type Target = Services;
40
41 fn deref(&self) -> &Self::Target {
42 &self.0
43 }
44}
45
46impl Executor {
47 pub fn new(
48 catalog: Catalog,
49 clock: Clock,
50 functions: Functions,
51 transforms: Transforms,
52 flow_operator_store: FlowOperatorStore,
53 stats_reader: MetricReader<SingleStore>,
54 ioc: IocContainer,
55 ) -> Self {
56 Self(Arc::new(Services::new(
57 catalog,
58 clock,
59 functions,
60 transforms,
61 flow_operator_store,
62 stats_reader,
63 ioc,
64 )))
65 }
66
67 pub fn services(&self) -> &Arc<Services> {
69 &self.0
70 }
71
72 #[allow(dead_code)]
73 pub fn testing() -> Self {
74 Self(Services::testing())
75 }
76}
77
78fn populate_stack(stack: &mut SymbolTable, params: &Params) -> crate::Result<()> {
80 match params {
81 Params::Positional(values) => {
82 for (index, value) in values.iter().enumerate() {
83 let param_name = (index + 1).to_string();
84 stack.set(param_name, Variable::scalar(value.clone()), false)?;
85 }
86 }
87 Params::Named(map) => {
88 for (name, value) in map {
89 stack.set(name.clone(), Variable::scalar(value.clone()), false)?;
90 }
91 }
92 Params::None => {}
93 }
94 Ok(())
95}
96
97impl Executor {
98 #[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
99 pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> crate::Result<Vec<Frame>> {
100 let mut result = vec![];
101 let mut output_results: Vec<Frame> = Vec::new();
102 let mut symbol_table = SymbolTable::new();
103 populate_stack(&mut symbol_table, &cmd.params)?;
104
105 match self.compiler.compile(&mut Transaction::Admin(txn), cmd.rql)? {
106 CompilationResult::Ready(compiled) => {
107 for compiled in compiled.iter() {
108 result.clear();
109 let mut tx = Transaction::Admin(txn);
110 let mut vm = Vm::new(symbol_table);
111 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
112 symbol_table = vm.symbol_table;
113
114 if compiled.is_output {
115 output_results.append(&mut result);
116 }
117 }
118 }
119 CompilationResult::Incremental(mut state) => {
120 while let Some(compiled) =
121 self.compiler.compile_next(&mut Transaction::Admin(txn), &mut state)?
122 {
123 result.clear();
124 let mut tx = Transaction::Admin(txn);
125 let mut vm = Vm::new(symbol_table);
126 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
127 symbol_table = vm.symbol_table;
128
129 if compiled.is_output {
130 output_results.append(&mut result);
131 }
132 }
133 }
134 }
135
136 let mut final_result = output_results;
137 final_result.append(&mut result);
138 Ok(final_result)
139 }
140
141 #[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
142 pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
143 let mut result = vec![];
144 let mut output_results: Vec<Frame> = Vec::new();
145 let mut symbol_table = SymbolTable::new();
146 populate_stack(&mut symbol_table, &cmd.params)?;
147
148 let compiled = match self.compiler.compile(&mut Transaction::Command(txn), cmd.rql)? {
149 CompilationResult::Ready(compiled) => compiled,
150 CompilationResult::Incremental(_) => {
151 unreachable!("DDL statements require admin transactions, not command transactions")
152 }
153 };
154
155 for compiled in compiled.iter() {
156 result.clear();
157 let mut tx = Transaction::Command(txn);
158 let mut vm = Vm::new(symbol_table);
159 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
160 symbol_table = vm.symbol_table;
161
162 if compiled.is_output {
163 output_results.append(&mut result);
164 }
165 }
166
167 let mut final_result = output_results;
168 final_result.append(&mut result);
169 Ok(final_result)
170 }
171
172 #[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
173 pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
174 let mut result = vec![];
175 let mut output_results: Vec<Frame> = Vec::new();
176 let mut symbol_table = SymbolTable::new();
177 populate_stack(&mut symbol_table, &qry.params)?;
178
179 let compiled = match self.compiler.compile(&mut Transaction::Query(txn), qry.rql)? {
180 CompilationResult::Ready(compiled) => compiled,
181 CompilationResult::Incremental(_) => {
182 unreachable!("DDL statements require admin transactions, not query transactions")
183 }
184 };
185
186 for compiled in compiled.iter() {
187 result.clear();
188 let mut tx = Transaction::Query(txn);
189 let mut vm = Vm::new(symbol_table);
190 vm.run(&self.0, &mut tx, &compiled.instructions, &qry.params, &mut result)?;
191 symbol_table = vm.symbol_table;
192
193 if compiled.is_output {
194 output_results.append(&mut result);
195 }
196 }
197
198 let mut final_result = output_results;
199 final_result.append(&mut result);
200 Ok(final_result)
201 }
202}