1use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{internal_error, value::column::columns::Columns};
7use reifydb_routine::routine::registry::Routines;
8use reifydb_rql::instruction::{Instruction, ScopeType};
9use reifydb_runtime::context::RuntimeContext;
10use reifydb_transaction::transaction::Transaction;
11use reifydb_value::{
12 params::Params,
13 util::bitvec::BitVec,
14 value::{Value, frame::frame::Frame, identity::IdentityId},
15};
16
17use super::{
18 exec::{
19 mask::{LoopMaskState, MaskFrame, extract_bool_bitvec},
20 stack::strip_dollar_prefix,
21 },
22 instruction::{
23 ddl::{
24 alter::{
25 remote_namespace::alter_remote_namespace, sequence::alter_table_sequence,
26 table::execute_alter_table,
27 },
28 create::{
29 binding::create_binding, deferred::create_deferred_view, dictionary::create_dictionary,
30 migration::create_migration, namespace::create_namespace,
31 primary_key::create_primary_key, procedure::create_procedure,
32 property::create_column_property, remote_namespace::create_remote_namespace,
33 ringbuffer::create_ringbuffer, series::create_series, sink::create_sink,
34 source::create_source, subscription::create_subscription, sumtype::create_sumtype,
35 table::create_table, tag::create_tag, test::create_test,
36 transactional::create_transactional_view,
37 },
38 drop::{
39 binding::drop_binding, dictionary::drop_dictionary, namespace::drop_namespace,
40 procedure::drop_procedure, ringbuffer::drop_ringbuffer, series::drop_series,
41 sink::drop_sink, source::drop_source, subscription::drop_subscription,
42 sumtype::drop_sumtype, table::drop_table, view::drop_view,
43 },
44 },
45 dml::{
46 dictionary_insert::insert_dictionary, ringbuffer_delete::delete_ringbuffer,
47 ringbuffer_insert::insert_ringbuffer, ringbuffer_update::update_ringbuffer,
48 series_delete::delete_series, series_insert::insert_series, series_update::update_series,
49 table_delete::delete, table_insert::insert_table, table_update::update_table,
50 },
51 },
52 services::Services,
53 stack::{ControlFlow, Stack, SymbolTable, Variable},
54};
55use crate::{
56 Result,
57 expression::context::EvalContext,
58 vm::instruction::ddl::{
59 alter::policy::alter_policy,
60 create::{
61 authentication::create_authentication, event::create_event, identity::create_identity,
62 policy::create_policy, role::create_role,
63 },
64 drop::{
65 authentication::drop_authentication, handler::drop_handler, identity::drop_identity,
66 policy::drop_policy, role::drop_role, test::drop_test,
67 },
68 grant::grant,
69 revoke::revoke,
70 },
71};
72
/// Shared, lazily-initialised empty parameter set (`Params::None`).
///
/// `Vm::run_isolated_body` temporarily points the VM's `params` at this value
/// so that the isolated body does not see the caller's parameters.
pub static EMPTY_PARAMS: LazyLock<Params> = LazyLock::new(|| Params::None);
74
/// Execution state of the instruction-level virtual machine.
///
/// A `Vm` borrows its environment (`params`, `routines`, `runtime_context`)
/// for the lifetime `'a` and owns everything that changes while instructions
/// are executed by `Vm::run`.
pub struct Vm<'a> {
	/// Index of the next instruction to execute in the slice handed to `run`.
	pub(crate) ip: usize,
	/// Starts at 0. NOTE(review): presumably counts loop iterations; confirm
	/// against the loop instruction handlers.
	pub(crate) iteration_count: usize,
	/// Operand stack that instructions push to and pop from.
	pub(crate) stack: Stack,
	/// Variable bindings visible to the running program.
	pub symbols: SymbolTable,
	/// Checked after every instruction; `run` returns as soon as this is no
	/// longer normal.
	pub control_flow: ControlFlow,
	/// Starts at 0. NOTE(review): presumably the nesting depth of dispatch
	/// calls; confirm against `exec_dispatch`.
	pub(crate) dispatch_depth: u8,

	/// Passed to the evaluation context as its row count. `from_services`
	/// uses 1; values greater than 1 make `run` take the masked / columnar
	/// code paths.
	pub(crate) batch_size: usize,

	/// Starts as `None`. NOTE(review): presumably the rows currently active
	/// in batched execution; confirm against the mask helpers.
	pub(crate) active_mask: Option<BitVec>,

	/// Consulted by `run` (together with `loop_mask_stack`) to decide whether
	/// a `Jump` must be executed in masked form.
	pub(crate) mask_stack: Vec<MaskFrame>,

	/// Per-loop mask state; when non-empty in batched mode, `Break` and
	/// `Continue` use their masked variants.
	pub(crate) loop_mask_stack: Vec<LoopMaskState>,

	/// Parameters of the current execution; temporarily replaced by
	/// `EMPTY_PARAMS` in `run_isolated_body`.
	pub(crate) params: &'a Params,
	/// Routine registry, forwarded to the evaluation context.
	pub(crate) routines: &'a Routines,
	/// Runtime context, forwarded to the evaluation context.
	pub(crate) runtime_context: &'a RuntimeContext,
	/// Identity on whose behalf the program runs; forwarded to the evaluation
	/// context.
	pub(crate) identity: IdentityId,
}
96
97impl<'a> Vm<'a> {
98 pub fn from_services(
99 symbols: SymbolTable,
100 services: &'a Services,
101 params: &'a Params,
102 identity: IdentityId,
103 ) -> Self {
104 Self::build(symbols, 1, params, &services.routines, &services.runtime_context, identity)
105 }
106
107 pub fn with_batch_size_from_services(
108 symbols: SymbolTable,
109 batch_size: usize,
110 services: &'a Services,
111 params: &'a Params,
112 identity: IdentityId,
113 ) -> Self {
114 Self::build(symbols, batch_size, params, &services.routines, &services.runtime_context, identity)
115 }
116
117 fn build(
118 symbols: SymbolTable,
119 batch_size: usize,
120 params: &'a Params,
121 routines: &'a Routines,
122 runtime_context: &'a RuntimeContext,
123 identity: IdentityId,
124 ) -> Self {
125 Self {
126 ip: 0,
127 iteration_count: 0,
128 stack: Stack::new(),
129 symbols,
130 control_flow: ControlFlow::Normal,
131 dispatch_depth: 0,
132 batch_size,
133 active_mask: None,
134 mask_stack: Vec::new(),
135 loop_mask_stack: Vec::new(),
136 params,
137 routines,
138 runtime_context,
139 identity,
140 }
141 }
142
143 pub(crate) fn eval_ctx(&self) -> EvalContext<'_> {
144 EvalContext {
145 params: self.params,
146 symbols: &self.symbols,
147 routines: self.routines,
148 runtime_context: self.runtime_context,
149 arena: None,
150 identity: self.identity,
151 is_aggregate_context: false,
152 columns: Columns::empty(),
153 row_count: self.batch_size,
154 target: None,
155 take: None,
156 }
157 }
158
159 pub(crate) fn run_isolated_body(
160 &mut self,
161 services: &Arc<Services>,
162 tx: &mut Transaction<'_>,
163 instructions: &[Instruction],
164 result: &mut Vec<Frame>,
165 ) -> Result<()> {
166 let saved = self.params;
167 self.params = &EMPTY_PARAMS;
168 let run_result = self.run(services, tx, instructions, result);
169 self.params = saved;
170 run_result
171 }
172
173 pub(crate) fn pop_value(&mut self) -> Result<Value> {
174 match self.stack.pop()? {
175 Variable::Columns {
176 columns: c,
177 } if c.is_scalar() => Ok(c.scalar_value()),
178 _ => Err(internal_error!("Expected scalar value on stack")),
179 }
180 }
181
182 pub(crate) fn pop_as_columns(&mut self) -> Result<Columns> {
183 match self.stack.pop()? {
184 Variable::Columns {
185 columns: c,
186 ..
187 }
188 | Variable::ForIterator {
189 columns: c,
190 ..
191 } => Ok(c),
192 Variable::Closure(_) => Ok(Columns::single_row([("value", Value::none())])),
193 }
194 }
195
196 pub(crate) fn run(
197 &mut self,
198 services: &Arc<Services>,
199 tx: &mut Transaction<'_>,
200 instructions: &[Instruction],
201 result: &mut Vec<Frame>,
202 ) -> Result<()> {
203 let params = self.params;
204 while self.ip < instructions.len() {
205 let _ = self.batch_size > 1 && self.check_mask_merge_point()?;
206
207 match &instructions[self.ip] {
208 Instruction::Halt => return Ok(()),
209 Instruction::Nop => {}
210
211 Instruction::PushConst(v) => self.exec_push_const(v),
212 Instruction::PushNone => self.exec_push_none(),
213 Instruction::Pop => self.exec_pop()?,
214 Instruction::Dup => self.exec_dup()?,
215
216 Instruction::LoadVar(f) => self.exec_load_var(f)?,
217 Instruction::StoreVar(f) => {
218 if self.batch_size > 1 && self.is_masked() {
219 let name = strip_dollar_prefix(f.text());
220 let value = self.stack.pop()?;
221 self.exec_store_var_masked(name, value)?;
222 } else if self.batch_size > 1 {
223 let name = strip_dollar_prefix(f.text());
224 let value = self.stack.pop()?;
225 self.symbols.reassign(name.to_string(), value)?;
226 } else {
227 self.exec_store_var(f)?;
228 }
229 }
230 Instruction::DeclareVar(f) => self.exec_declare_var(f)?,
231 Instruction::FieldAccess {
232 object,
233 field,
234 } => self.exec_field_access(object, field)?,
235
236 Instruction::Add => self.exec_add()?,
237 Instruction::Sub => self.exec_sub()?,
238 Instruction::Mul => self.exec_mul()?,
239 Instruction::Div => self.exec_div()?,
240 Instruction::Rem => self.exec_rem()?,
241 Instruction::Negate => self.exec_negate()?,
242 Instruction::LogicNot => self.exec_logic_not()?,
243
244 Instruction::CmpEq => self.exec_cmp_eq()?,
245 Instruction::CmpNe => self.exec_cmp_ne()?,
246 Instruction::CmpLt => self.exec_cmp_lt()?,
247 Instruction::CmpLe => self.exec_cmp_le()?,
248 Instruction::CmpGt => self.exec_cmp_gt()?,
249 Instruction::CmpGe => self.exec_cmp_ge()?,
250
251 Instruction::LogicAnd => self.exec_logic_and()?,
252 Instruction::LogicOr => self.exec_logic_or()?,
253 Instruction::LogicXor => self.exec_logic_xor()?,
254 Instruction::Between => self.exec_between()?,
255 Instruction::InList {
256 count,
257 negated,
258 } => self.exec_in_list(*count, *negated)?,
259 Instruction::Cast(target) => self.exec_cast(target)?,
260
261 Instruction::Jump(addr) => {
262 if self.batch_size > 1
263 && (!self.mask_stack.is_empty() || !self.loop_mask_stack.is_empty())
264 {
265 if self.exec_jump_masked(*addr)? {
266 continue;
267 }
268 } else {
269 self.exec_jump(*addr)?;
270 continue;
271 }
272 }
273 Instruction::JumpIfFalsePop(addr) => {
274 if self.batch_size > 1 {
275 let is_while_loop = instructions.get(self.ip + 1).is_some_and(|next| {
276 matches!(next, Instruction::EnterScope(ScopeType::Loop))
277 });
278
279 if is_while_loop
280 && self.loop_mask_stack
281 .last()
282 .is_none_or(|s| s.loop_end_addr != *addr)
283 {
284 let var = self.stack.pop()?;
285 let bool_bv = extract_bool_bitvec(&var)?;
286 let parent = self.effective_mask();
287 let candidate = self.intersect_condition(&bool_bv);
288
289 if candidate == parent {
290 } else if candidate.none() {
291 self.ip = *addr;
292 continue;
293 } else {
294 self.enter_loop_mask(*addr, candidate);
295 }
296 } else if self.exec_jump_if_false_pop_columnar(*addr)? {
297 continue;
298 }
299 } else if self.exec_jump_if_false_pop(*addr)? {
300 continue;
301 }
302 }
303 Instruction::JumpIfTruePop(addr) => {
304 if self.batch_size > 1 {
305 if self.exec_jump_if_true_pop_columnar(*addr)? {
306 continue;
307 }
308 } else if self.exec_jump_if_true_pop(*addr)? {
309 continue;
310 }
311 }
312 Instruction::EnterScope(scope_type) => self.exec_enter_scope(scope_type),
313 Instruction::ExitScope => self.exec_exit_scope()?,
314 Instruction::Break {
315 exit_scopes,
316 addr,
317 } => {
318 if self.batch_size > 1 && !self.loop_mask_stack.is_empty() {
319 self.exec_break_masked(*exit_scopes, *addr)?;
320 } else {
321 self.exec_break(*exit_scopes, *addr)?;
322 }
323 continue;
324 }
325 Instruction::Continue {
326 exit_scopes,
327 addr,
328 } => {
329 if self.batch_size > 1 && !self.loop_mask_stack.is_empty() {
330 self.exec_continue_masked(*exit_scopes, *addr)?;
331 } else {
332 self.exec_continue(*exit_scopes, *addr)?;
333 }
334 continue;
335 }
336
337 Instruction::ForInit {
338 variable_name,
339 } => self.exec_for_init(variable_name)?,
340 Instruction::ForNext {
341 variable_name,
342 addr,
343 } => {
344 if self.exec_for_next(variable_name, *addr)? {
345 continue;
346 }
347 }
348
349 Instruction::DefineFunction(node) => self.exec_define_function(node),
350 Instruction::Call {
351 name,
352 arity,
353 is_procedure_call,
354 } => {
355 self.exec_call(services, tx, name, *arity, *is_procedure_call)?;
356 }
357 Instruction::ReturnValue => {
358 self.exec_return_value()?;
359 return Ok(());
360 }
361 Instruction::ReturnVoid => {
362 self.exec_return_void();
363 return Ok(());
364 }
365 Instruction::DefineClosure(def) => self.exec_define_closure(def),
366
367 Instruction::Emit => self.exec_emit(result),
368 Instruction::Append {
369 target,
370 } => self.exec_append(target)?,
371
372 Instruction::Query(plan) => self.exec_query(services, tx, plan, params)?,
373
374 Instruction::CreateNamespace(n) => {
375 self.exec_ddl(services, tx, |s, t| create_namespace(s, t, n.clone()))?
376 }
377 Instruction::CreateRemoteNamespace(n) => {
378 self.exec_ddl(services, tx, |s, t| create_remote_namespace(s, t, n.clone()))?
379 }
380 Instruction::CreateTable(n) => {
381 self.exec_ddl(services, tx, |s, t| create_table(s, t, n.clone()))?
382 }
383 Instruction::CreateRingBuffer(n) => {
384 self.exec_ddl(services, tx, |s, t| create_ringbuffer(s, t, n.clone()))?
385 }
386 Instruction::CreateDeferredView(n) => {
387 self.exec_ddl(services, tx, |s, t| create_deferred_view(s, t, n.clone()))?
388 }
389 Instruction::CreateTransactionalView(n) => {
390 self.exec_ddl(services, tx, |s, t| create_transactional_view(s, t, n.clone()))?
391 }
392 Instruction::CreateDictionary(n) => {
393 self.exec_ddl(services, tx, |s, t| create_dictionary(s, t, n.clone()))?
394 }
395 Instruction::CreateSumType(n) => {
396 self.exec_ddl(services, tx, |s, t| create_sumtype(s, t, n.clone()))?
397 }
398 Instruction::CreatePrimaryKey(n) => {
399 self.exec_ddl(services, tx, |s, t| create_primary_key(s, t, n.clone()))?
400 }
401 Instruction::CreateColumnProperty(n) => {
402 self.exec_ddl(services, tx, |s, t| create_column_property(s, t, n.clone()))?
403 }
404 Instruction::CreateProcedure(n) => {
405 self.exec_ddl(services, tx, |s, t| create_procedure(s, t, n.clone()))?
406 }
407 Instruction::CreateSeries(n) => {
408 self.exec_ddl(services, tx, |s, t| create_series(s, t, n.clone()))?
409 }
410 Instruction::CreateEvent(n) => {
411 self.exec_ddl(services, tx, |s, t| create_event(s, t, n.clone()))?
412 }
413 Instruction::CreateTag(n) => {
414 self.exec_ddl(services, tx, |s, t| create_tag(s, t, n.clone()))?
415 }
416 Instruction::CreateSource(n) => {
417 self.exec_ddl(services, tx, |s, t| create_source(s, t, n.clone()))?
418 }
419 Instruction::CreateSink(n) => {
420 self.exec_ddl(services, tx, |s, t| create_sink(s, t, n.clone()))?
421 }
422 Instruction::CreateBinding(n) => {
423 self.exec_ddl(services, tx, |s, t| create_binding(s, t, n.clone()))?
424 }
425 Instruction::CreateTest(n) => {
426 self.exec_ddl(services, tx, |s, t| create_test(s, t, n.clone()))?
427 }
428 Instruction::CreateMigration(n) => {
429 self.exec_ddl(services, tx, |s, t| create_migration(s, t, n.clone()))?
430 }
431 Instruction::CreateIdentity(n) => {
432 self.exec_ddl(services, tx, |s, t| create_identity(s, t, n.clone()))?
433 }
434 Instruction::CreateRole(n) => {
435 self.exec_ddl(services, tx, |s, t| create_role(s, t, n.clone()))?
436 }
437 Instruction::CreatePolicy(n) => {
438 self.exec_ddl(services, tx, |s, t| create_policy(s, t, n.clone()))?
439 }
440 Instruction::CreateAuthentication(n) => {
441 self.exec_ddl(services, tx, |s, t| create_authentication(s, t, n.clone()))?
442 }
443 Instruction::Grant(n) => self.exec_ddl(services, tx, |s, t| grant(s, t, n.clone()))?,
444 Instruction::Revoke(n) => {
445 self.exec_ddl(services, tx, |s, t| revoke(s, t, n.clone()))?
446 }
447
448 Instruction::CreateSubscription(n) => {
449 self.exec_ddl_sub(services, tx, |s, t| create_subscription(s, t, n.clone()))?
450 }
451
452 Instruction::AlterTable(n) => {
453 self.exec_ddl(services, tx, |s, t| execute_alter_table(s, t, n.clone()))?
454 }
455 Instruction::AlterRemoteNamespace(n) => {
456 self.exec_ddl(services, tx, |s, t| alter_remote_namespace(s, t, n.clone()))?
457 }
458 Instruction::AlterSequence(n) => {
459 self.exec_ddl(services, tx, |s, t| alter_table_sequence(s, t, n.clone()))?
460 }
461 Instruction::AlterPolicy(n) => {
462 self.exec_ddl(services, tx, |s, t| alter_policy(s, t, n.clone()))?
463 }
464
465 Instruction::DropNamespace(n) => {
466 self.exec_ddl(services, tx, |s, t| drop_namespace(s, t, n.clone()))?
467 }
468 Instruction::DropTable(n) => {
469 self.exec_ddl(services, tx, |s, t| drop_table(s, t, n.clone()))?
470 }
471 Instruction::DropView(n) => {
472 self.exec_ddl(services, tx, |s, t| drop_view(s, t, n.clone()))?
473 }
474 Instruction::DropRingBuffer(n) => {
475 self.exec_ddl(services, tx, |s, t| drop_ringbuffer(s, t, n.clone()))?
476 }
477 Instruction::DropSeries(n) => {
478 self.exec_ddl(services, tx, |s, t| drop_series(s, t, n.clone()))?
479 }
480 Instruction::DropDictionary(n) => {
481 self.exec_ddl(services, tx, |s, t| drop_dictionary(s, t, n.clone()))?
482 }
483 Instruction::DropSumType(n) => {
484 self.exec_ddl(services, tx, |s, t| drop_sumtype(s, t, n.clone()))?
485 }
486 Instruction::DropSource(n) => {
487 self.exec_ddl(services, tx, |s, t| drop_source(s, t, n.clone()))?
488 }
489 Instruction::DropSink(n) => {
490 self.exec_ddl(services, tx, |s, t| drop_sink(s, t, n.clone()))?
491 }
492 Instruction::DropProcedure(n) => {
493 self.exec_ddl(services, tx, |s, t| drop_procedure(s, t, n.clone()))?
494 }
495 Instruction::DropHandler(n) => {
496 self.exec_ddl(services, tx, |s, t| drop_handler(s, t, n.clone()))?
497 }
498 Instruction::DropTest(n) => {
499 self.exec_ddl(services, tx, |s, t| drop_test(s, t, n.clone()))?
500 }
501 Instruction::DropBinding(n) => {
502 self.exec_ddl(services, tx, |s, t| drop_binding(s, t, n.clone()))?
503 }
504 Instruction::DropIdentity(n) => {
505 self.exec_ddl(services, tx, |s, t| drop_identity(s, t, n.clone()))?
506 }
507 Instruction::DropRole(n) => {
508 self.exec_ddl(services, tx, |s, t| drop_role(s, t, n.clone()))?
509 }
510 Instruction::DropPolicy(n) => {
511 self.exec_ddl(services, tx, |s, t| drop_policy(s, t, n.clone()))?
512 }
513 Instruction::DropAuthentication(n) => {
514 self.exec_ddl(services, tx, |s, t| drop_authentication(s, t, n.clone()))?
515 }
516
517 Instruction::DropSubscription(n) => {
518 self.exec_ddl_sub(services, tx, |s, t| drop_subscription(s, t, n.clone()))?
519 }
520
521 Instruction::Delete(n) => {
522 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
523 delete(s, t, n.clone(), p, sym)
524 })?
525 }
526 Instruction::DeleteRingBuffer(n) => {
527 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
528 delete_ringbuffer(s, t, n.clone(), p, sym)
529 })?
530 }
531 Instruction::DeleteSeries(n) => {
532 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
533 delete_series(s, t, n.clone(), p, sym)
534 })?
535 }
536 Instruction::Update(n) => {
537 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
538 update_table(s, t, n.clone(), p, sym)
539 })?
540 }
541 Instruction::UpdateRingBuffer(n) => {
542 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
543 update_ringbuffer(s, t, n.clone(), p, sym)
544 })?
545 }
546 Instruction::UpdateSeries(n) => {
547 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
548 update_series(s, t, n.clone(), p, sym)
549 })?
550 }
551 Instruction::InsertTable(n) => {
552 self.exec_dml_with_mut_symbols(services, tx, |s, t, sym| {
553 insert_table(s, t, n.clone(), sym)
554 })?
555 }
556 Instruction::InsertDictionary(n) => {
557 self.exec_dml_with_mut_symbols(services, tx, |s, t, sym| {
558 insert_dictionary(s, t, n.clone(), sym)
559 })?
560 }
561 Instruction::InsertRingBuffer(n) => {
562 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
563 insert_ringbuffer(s, t, n.clone(), p, sym)
564 })?
565 }
566 Instruction::InsertSeries(n) => {
567 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
568 insert_series(s, t, n.clone(), p, sym)
569 })?
570 }
571
572 Instruction::Dispatch(n) => self.exec_dispatch(services, tx, n, params)?,
573 Instruction::Migrate(n) => self.exec_migrate(services, tx, n)?,
574 Instruction::RollbackMigration(n) => self.exec_rollback_migration(services, tx, n)?,
575 Instruction::AssertBlock(n) => self.exec_assert_block(services, tx, n)?,
576 }
577
578 self.ip += 1;
579
580 if !self.control_flow.is_normal() {
581 return Ok(());
582 }
583 }
584 Ok(())
585 }
586}