1use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{catalog::Catalog, vtable::system::flow_operator_store::FlowOperatorStore};
7use reifydb_core::{util::ioc::IocContainer, value::column::columns::Columns};
8use reifydb_function::registry::Functions;
9use reifydb_metric::metric::MetricReader;
10use reifydb_policy::inject_read_policies;
11use reifydb_rql::compiler::{CompilationResult, constrain_policy};
12use reifydb_runtime::clock::Clock;
13use reifydb_store_single::SingleStore;
14use reifydb_transaction::transaction::{
15 Transaction, admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
16};
17use reifydb_type::{
18 params::Params,
19 value::{Value, frame::frame::Frame, identity::IdentityId, r#type::Type},
20};
21use tracing::instrument;
22
23use crate::{
24 Result,
25 policy::PolicyEvaluator,
26 procedure::registry::Procedures,
27 transform::registry::Transforms,
28 vm::{
29 Admin, Command, Query,
30 services::Services,
31 stack::{SymbolTable, Variable},
32 vm::Vm,
33 },
34};
35
/// Handle to the execution services shared by every statement run.
///
/// The only field is an `Arc<Services>`, so cloning an `Executor` clones the
/// `Arc` rather than the services themselves. `Executor` dereferences to
/// `Services`, which is why methods can write `self.catalog` / `self.compiler`.
pub struct Executor(Arc<Services>);
38
39impl Clone for Executor {
40 fn clone(&self) -> Self {
41 Self(self.0.clone())
42 }
43}
44
45impl Deref for Executor {
46 type Target = Services;
47
48 fn deref(&self) -> &Self::Target {
49 &self.0
50 }
51}
52
53impl Executor {
54 pub fn new(
55 catalog: Catalog,
56 clock: Clock,
57 functions: Functions,
58 procedures: Procedures,
59 transforms: Transforms,
60 flow_operator_store: FlowOperatorStore,
61 stats_reader: MetricReader<SingleStore>,
62 ioc: IocContainer,
63 ) -> Self {
64 Self(Arc::new(Services::new(
65 catalog,
66 clock,
67 functions,
68 procedures,
69 transforms,
70 flow_operator_store,
71 stats_reader,
72 ioc,
73 )))
74 }
75
76 pub fn services(&self) -> &Arc<Services> {
78 &self.0
79 }
80
81 pub fn from_services(services: Arc<Services>) -> Self {
83 Self(services)
84 }
85
86 #[allow(dead_code)]
87 pub fn testing() -> Self {
88 Self(Services::testing())
89 }
90}
91
92fn populate_stack(stack: &mut SymbolTable, params: &Params) -> Result<()> {
94 match params {
95 Params::Positional(values) => {
96 for (index, value) in values.iter().enumerate() {
97 let param_name = (index + 1).to_string();
98 stack.set(param_name, Variable::scalar(value.clone()), false)?;
99 }
100 }
101 Params::Named(map) => {
102 for (name, value) in map {
103 stack.set(name.clone(), Variable::scalar(value.clone()), false)?;
104 }
105 }
106 Params::None => {}
107 }
108 Ok(())
109}
110
111fn populate_identity(
114 stack: &mut SymbolTable,
115 catalog: &Catalog,
116 tx: &mut Transaction<'_>,
117 identity: IdentityId,
118) -> Result<()> {
119 if identity.is_privileged() {
120 return Ok(());
121 }
122 if identity.is_anonymous() {
123 let columns = Columns::single_row([
124 ("id", Value::IdentityId(identity)),
125 ("name", Value::none_of(Type::Utf8)),
126 ("roles", Value::List(vec![])),
127 ]);
128 stack.set("identity".to_string(), Variable::Columns(columns), false)?;
129 return Ok(());
130 }
131 if let Some(user) = catalog.find_user_by_identity(tx, identity)? {
132 let roles = catalog.find_role_names_for_identity(tx, identity)?;
133 let role_values: Vec<Value> = roles.into_iter().map(Value::Utf8).collect();
134 let columns = Columns::single_row([
135 ("id", Value::IdentityId(identity)),
136 ("name", Value::Utf8(user.name)),
137 ("roles", Value::List(role_values)),
138 ]);
139 stack.set("identity".to_string(), Variable::Columns(columns), false)?;
140 }
141 Ok(())
142}
143
144impl Executor {
145 #[instrument(name = "executor::rql", level = "debug", skip(self, tx, params), fields(rql = %rql))]
150 pub fn rql(
151 &self,
152 tx: &mut Transaction<'_>,
153 identity: IdentityId,
154 rql: &str,
155 params: Params,
156 ) -> Result<Vec<Frame>> {
157 let mut result = vec![];
158 let mut symbol_table = SymbolTable::new();
159 populate_stack(&mut symbol_table, ¶ms)?;
160 populate_identity(&mut symbol_table, &self.catalog, tx, identity)?;
161
162 let compiled = match self.compiler.compile_with_policy(tx, rql, |plans, bump, cat, tx| {
163 inject_read_policies(plans, bump, cat, tx, identity)
164 })? {
165 CompilationResult::Ready(compiled) => compiled,
166 CompilationResult::Incremental(_) => {
167 unreachable!("incremental compilation not supported in rql()")
168 }
169 };
170
171 for compiled in compiled.iter() {
172 result.clear();
173 let mut vm = Vm::new(symbol_table, identity);
174 vm.run(&self.0, tx, &compiled.instructions, ¶ms, &mut result)?;
175 symbol_table = vm.symbol_table;
176 }
177
178 Ok(result)
179 }
180
181 #[instrument(name = "executor::admin", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
182 pub fn admin(&self, txn: &mut AdminTransaction, cmd: Admin<'_>) -> Result<Vec<Frame>> {
183 let mut result = vec![];
184 let mut output_results: Vec<Frame> = Vec::new();
185 let mut symbol_table = SymbolTable::new();
186 populate_stack(&mut symbol_table, &cmd.params)?;
187
188 let identity = cmd.identity;
189 populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Admin(&mut *txn), identity)?;
190
191 PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
192 &mut Transaction::Admin(&mut *txn),
193 identity,
194 "admin",
195 true,
196 )?;
197
198 match self.compiler.compile_with_policy(
199 &mut Transaction::Admin(txn),
200 cmd.rql,
201 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
202 )? {
203 CompilationResult::Ready(compiled) => {
204 for compiled in compiled.iter() {
205 result.clear();
206 let mut tx = Transaction::Admin(txn);
207 let mut vm = Vm::new(symbol_table, identity);
208 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
209 symbol_table = vm.symbol_table;
210
211 if compiled.is_output {
212 output_results.append(&mut result);
213 }
214 }
215 }
216 CompilationResult::Incremental(mut state) => {
217 let policy = constrain_policy(|plans, bump, cat, tx| {
218 inject_read_policies(plans, bump, cat, tx, identity)
219 });
220 while let Some(compiled) = self.compiler.compile_next_with_policy(
221 &mut Transaction::Admin(txn),
222 &mut state,
223 &policy,
224 )? {
225 result.clear();
226 let mut tx = Transaction::Admin(txn);
227 let mut vm = Vm::new(symbol_table, identity);
228 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
229 symbol_table = vm.symbol_table;
230
231 if compiled.is_output {
232 output_results.append(&mut result);
233 }
234 }
235 }
236 }
237
238 let mut final_result = output_results;
239 final_result.append(&mut result);
240 Ok(final_result)
241 }
242
243 #[instrument(name = "executor::command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
244 pub fn command(&self, txn: &mut CommandTransaction, cmd: Command<'_>) -> Result<Vec<Frame>> {
245 let mut result = vec![];
246 let mut output_results: Vec<Frame> = Vec::new();
247 let mut symbol_table = SymbolTable::new();
248 populate_stack(&mut symbol_table, &cmd.params)?;
249
250 let identity = cmd.identity;
251 populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Command(&mut *txn), identity)?;
252
253 PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
254 &mut Transaction::Command(&mut *txn),
255 identity,
256 "command",
257 false,
258 )?;
259
260 let compiled = match self.compiler.compile_with_policy(
261 &mut Transaction::Command(txn),
262 cmd.rql,
263 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
264 )? {
265 CompilationResult::Ready(compiled) => compiled,
266 CompilationResult::Incremental(_) => {
267 unreachable!("DDL statements require admin transactions, not command transactions")
268 }
269 };
270
271 for compiled in compiled.iter() {
272 result.clear();
273 let mut tx = Transaction::Command(txn);
274 let mut vm = Vm::new(symbol_table, identity);
275 vm.run(&self.0, &mut tx, &compiled.instructions, &cmd.params, &mut result)?;
276 symbol_table = vm.symbol_table;
277
278 if compiled.is_output {
279 output_results.append(&mut result);
280 }
281 }
282
283 let mut final_result = output_results;
284 final_result.append(&mut result);
285 Ok(final_result)
286 }
287
288 #[instrument(name = "executor::call_procedure", level = "debug", skip(self, txn, params), fields(name = %name))]
290 pub fn call_procedure(
291 &self,
292 txn: &mut CommandTransaction,
293 identity: IdentityId,
294 name: &str,
295 params: &Params,
296 ) -> Result<Vec<Frame>> {
297 let rql = format!("CALL {}()", name);
299 let mut result = vec![];
300 let mut symbol_table = SymbolTable::new();
301 populate_stack(&mut symbol_table, params)?;
302 populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Command(&mut *txn), identity)?;
303
304 let compiled = match self.compiler.compile(&mut Transaction::Command(txn), &rql)? {
305 CompilationResult::Ready(compiled) => compiled,
306 CompilationResult::Incremental(_) => {
307 unreachable!("CALL statements should not require incremental compilation")
308 }
309 };
310
311 for compiled in compiled.iter() {
312 result.clear();
313 let mut tx = Transaction::Command(txn);
314 let mut vm = Vm::new(symbol_table, identity);
315 vm.run(&self.0, &mut tx, &compiled.instructions, params, &mut result)?;
316 symbol_table = vm.symbol_table;
317 }
318
319 Ok(result)
320 }
321
322 #[instrument(name = "executor::query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
323 pub fn query(&self, txn: &mut QueryTransaction, qry: Query<'_>) -> Result<Vec<Frame>> {
324 let mut result = vec![];
325 let mut output_results: Vec<Frame> = Vec::new();
326 let mut symbol_table = SymbolTable::new();
327 populate_stack(&mut symbol_table, &qry.params)?;
328
329 let identity = qry.identity;
330 populate_identity(&mut symbol_table, &self.catalog, &mut Transaction::Query(&mut *txn), identity)?;
331
332 PolicyEvaluator::new(&self.0, &symbol_table).enforce_session_policy(
333 &mut Transaction::Query(&mut *txn),
334 identity,
335 "query",
336 false,
337 )?;
338
339 let compiled = match self.compiler.compile_with_policy(
340 &mut Transaction::Query(txn),
341 qry.rql,
342 |plans, bump, cat, tx| inject_read_policies(plans, bump, cat, tx, identity),
343 )? {
344 CompilationResult::Ready(compiled) => compiled,
345 CompilationResult::Incremental(_) => {
346 unreachable!("DDL statements require admin transactions, not query transactions")
347 }
348 };
349
350 for compiled in compiled.iter() {
351 result.clear();
352 let mut tx = Transaction::Query(txn);
353 let mut vm = Vm::new(symbol_table, identity);
354 vm.run(&self.0, &mut tx, &compiled.instructions, &qry.params, &mut result)?;
355 symbol_table = vm.symbol_table;
356
357 if compiled.is_output {
358 output_results.append(&mut result);
359 }
360 }
361
362 let mut final_result = output_results;
363 final_result.append(&mut result);
364 Ok(final_result)
365 }
366}