1use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{internal_error, value::column::columns::Columns};
7use reifydb_routine::function::registry::Functions;
8use reifydb_rql::instruction::{Instruction, ScopeType};
9use reifydb_runtime::context::RuntimeContext;
10use reifydb_transaction::transaction::Transaction;
11use reifydb_type::{
12 params::Params,
13 util::bitvec::BitVec,
14 value::{Value, frame::frame::Frame, identity::IdentityId},
15};
16
17use super::{
18 exec::{
19 mask::{LoopMaskState, MaskFrame, extract_bool_bitvec},
20 stack::strip_dollar_prefix,
21 },
22 instruction::{
23 ddl::{
24 alter::{
25 remote_namespace::alter_remote_namespace, sequence::alter_table_sequence,
26 table::execute_alter_table,
27 },
28 create::{
29 binding::create_binding, deferred::create_deferred_view, dictionary::create_dictionary,
30 migration::create_migration, namespace::create_namespace,
31 primary_key::create_primary_key, procedure::create_procedure,
32 property::create_column_property, remote_namespace::create_remote_namespace,
33 ringbuffer::create_ringbuffer, series::create_series, sink::create_sink,
34 source::create_source, subscription::create_subscription, sumtype::create_sumtype,
35 table::create_table, tag::create_tag, test::create_test,
36 transactional::create_transactional_view,
37 },
38 drop::{
39 binding::drop_binding, dictionary::drop_dictionary, namespace::drop_namespace,
40 procedure::drop_procedure, ringbuffer::drop_ringbuffer, series::drop_series,
41 sink::drop_sink, source::drop_source, subscription::drop_subscription,
42 sumtype::drop_sumtype, table::drop_table, view::drop_view,
43 },
44 },
45 dml::{
46 dictionary_insert::insert_dictionary, ringbuffer_delete::delete_ringbuffer,
47 ringbuffer_insert::insert_ringbuffer, ringbuffer_update::update_ringbuffer,
48 series_delete::delete_series, series_insert::insert_series, series_update::update_series,
49 table_delete::delete, table_insert::insert_table, table_update::update_table,
50 },
51 },
52 services::Services,
53 stack::{ControlFlow, Stack, SymbolTable, Variable},
54};
55use crate::{
56 Result,
57 expression::context::EvalContext,
58 vm::instruction::ddl::{
59 alter::policy::alter_policy,
60 create::{
61 authentication::create_authentication, event::create_event, identity::create_identity,
62 policy::create_policy, role::create_role,
63 },
64 drop::{
65 authentication::drop_authentication, handler::drop_handler, identity::drop_identity,
66 policy::drop_policy, role::drop_role, test::drop_test,
67 },
68 grant::grant,
69 revoke::revoke,
70 },
71};
72
73pub static EMPTY_PARAMS: LazyLock<Params> = LazyLock::new(|| Params::None);
77
78pub struct Vm<'a> {
79 pub(crate) ip: usize,
80 pub(crate) iteration_count: usize,
81 pub(crate) stack: Stack,
82 pub symbols: SymbolTable,
83 pub control_flow: ControlFlow,
84 pub(crate) dispatch_depth: u8,
85 pub(crate) batch_size: usize,
87 pub(crate) active_mask: Option<BitVec>,
89 pub(crate) mask_stack: Vec<MaskFrame>,
91 pub(crate) loop_mask_stack: Vec<LoopMaskState>,
93 pub(crate) params: &'a Params,
98 pub(crate) functions: &'a Functions,
99 pub(crate) runtime_context: &'a RuntimeContext,
100 pub(crate) identity: IdentityId,
101}
102
103impl<'a> Vm<'a> {
104 pub fn from_services(
105 symbols: SymbolTable,
106 services: &'a Services,
107 params: &'a Params,
108 identity: IdentityId,
109 ) -> Self {
110 Self::build(symbols, 1, params, &services.functions, &services.runtime_context, identity)
111 }
112
113 pub fn with_batch_size_from_services(
114 symbols: SymbolTable,
115 batch_size: usize,
116 services: &'a Services,
117 params: &'a Params,
118 identity: IdentityId,
119 ) -> Self {
120 Self::build(symbols, batch_size, params, &services.functions, &services.runtime_context, identity)
121 }
122
123 fn build(
124 symbols: SymbolTable,
125 batch_size: usize,
126 params: &'a Params,
127 functions: &'a Functions,
128 runtime_context: &'a RuntimeContext,
129 identity: IdentityId,
130 ) -> Self {
131 Self {
132 ip: 0,
133 iteration_count: 0,
134 stack: Stack::new(),
135 symbols,
136 control_flow: ControlFlow::Normal,
137 dispatch_depth: 0,
138 batch_size,
139 active_mask: None,
140 mask_stack: Vec::new(),
141 loop_mask_stack: Vec::new(),
142 params,
143 functions,
144 runtime_context,
145 identity,
146 }
147 }
148
149 pub(crate) fn eval_ctx(&self) -> EvalContext<'_> {
150 EvalContext {
151 params: self.params,
152 symbols: &self.symbols,
153 functions: self.functions,
154 runtime_context: self.runtime_context,
155 arena: None,
156 identity: self.identity,
157 is_aggregate_context: false,
158 columns: Columns::empty(),
159 row_count: self.batch_size,
160 target: None,
161 take: None,
162 }
163 }
164
165 pub(crate) fn run_isolated_body(
170 &mut self,
171 services: &Arc<Services>,
172 tx: &mut Transaction<'_>,
173 instructions: &[Instruction],
174 result: &mut Vec<Frame>,
175 ) -> Result<()> {
176 let saved = self.params;
177 self.params = &EMPTY_PARAMS;
178 let run_result = self.run(services, tx, instructions, result);
179 self.params = saved;
180 run_result
181 }
182
183 pub(crate) fn pop_value(&mut self) -> Result<Value> {
194 match self.stack.pop()? {
195 Variable::Columns {
196 columns: c,
197 } if c.is_scalar() => Ok(c.scalar_value()),
198 _ => Err(internal_error!("Expected scalar value on stack")),
199 }
200 }
201
202 pub(crate) fn pop_as_columns(&mut self) -> Result<Columns> {
204 match self.stack.pop()? {
205 Variable::Columns {
206 columns: c,
207 ..
208 }
209 | Variable::ForIterator {
210 columns: c,
211 ..
212 } => Ok(c),
213 Variable::Closure(_) => Ok(Columns::scalar(Value::none())),
214 }
215 }
216
217 pub(crate) fn run(
218 &mut self,
219 services: &Arc<Services>,
220 tx: &mut Transaction<'_>,
221 instructions: &[Instruction],
222 result: &mut Vec<Frame>,
223 ) -> Result<()> {
224 let params = self.params;
225 while self.ip < instructions.len() {
226 if self.batch_size > 1 && self.check_mask_merge_point()? {
228 }
230
231 match &instructions[self.ip] {
232 Instruction::Halt => return Ok(()),
233 Instruction::Nop => {}
234
235 Instruction::PushConst(v) => self.exec_push_const(v),
236 Instruction::PushNone => self.exec_push_none(),
237 Instruction::Pop => self.exec_pop()?,
238 Instruction::Dup => self.exec_dup()?,
239
240 Instruction::LoadVar(f) => self.exec_load_var(f)?,
241 Instruction::StoreVar(f) => {
242 if self.batch_size > 1 && self.is_masked() {
243 let name = strip_dollar_prefix(f.text());
244 let value = self.stack.pop()?;
245 self.exec_store_var_masked(name, value)?;
246 } else if self.batch_size > 1 {
247 let name = strip_dollar_prefix(f.text());
249 let value = self.stack.pop()?;
250 self.symbols.reassign(name.to_string(), value)?;
251 } else {
252 self.exec_store_var(f)?;
253 }
254 }
255 Instruction::DeclareVar(f) => self.exec_declare_var(f)?,
256 Instruction::FieldAccess {
257 object,
258 field,
259 } => self.exec_field_access(object, field)?,
260
261 Instruction::Add => self.exec_add()?,
262 Instruction::Sub => self.exec_sub()?,
263 Instruction::Mul => self.exec_mul()?,
264 Instruction::Div => self.exec_div()?,
265 Instruction::Rem => self.exec_rem()?,
266 Instruction::Negate => self.exec_negate()?,
267 Instruction::LogicNot => self.exec_logic_not()?,
268
269 Instruction::CmpEq => self.exec_cmp_eq()?,
270 Instruction::CmpNe => self.exec_cmp_ne()?,
271 Instruction::CmpLt => self.exec_cmp_lt()?,
272 Instruction::CmpLe => self.exec_cmp_le()?,
273 Instruction::CmpGt => self.exec_cmp_gt()?,
274 Instruction::CmpGe => self.exec_cmp_ge()?,
275
276 Instruction::LogicAnd => self.exec_logic_and()?,
277 Instruction::LogicOr => self.exec_logic_or()?,
278 Instruction::LogicXor => self.exec_logic_xor()?,
279 Instruction::Between => self.exec_between()?,
280 Instruction::InList {
281 count,
282 negated,
283 } => self.exec_in_list(*count, *negated)?,
284 Instruction::Cast(target) => self.exec_cast(target)?,
285
286 Instruction::Jump(addr) => {
287 if self.batch_size > 1
288 && (!self.mask_stack.is_empty() || !self.loop_mask_stack.is_empty())
289 {
290 if self.exec_jump_masked(*addr)? {
291 continue;
292 }
293 } else {
294 self.exec_jump(*addr)?;
295 continue;
296 }
297 }
298 Instruction::JumpIfFalsePop(addr) => {
299 if self.batch_size > 1 {
300 let is_while_loop = instructions.get(self.ip + 1).is_some_and(|next| {
303 matches!(next, Instruction::EnterScope(ScopeType::Loop))
304 });
305
306 if is_while_loop
307 && self.loop_mask_stack
308 .last()
309 .is_none_or(|s| s.loop_end_addr != *addr)
310 {
311 let var = self.stack.pop()?;
313 let bool_bv = extract_bool_bitvec(&var)?;
314 let parent = self.effective_mask();
315 let candidate = self.intersect_condition(&bool_bv);
316
317 if candidate == parent {
318 } else if candidate.none() {
320 self.ip = *addr;
322 continue;
323 } else {
324 self.enter_loop_mask(*addr, candidate);
326 }
327 } else if self.exec_jump_if_false_pop_columnar(*addr)? {
328 continue;
329 }
330 } else if self.exec_jump_if_false_pop(*addr)? {
331 continue;
332 }
333 }
334 Instruction::JumpIfTruePop(addr) => {
335 if self.batch_size > 1 {
336 if self.exec_jump_if_true_pop_columnar(*addr)? {
337 continue;
338 }
339 } else if self.exec_jump_if_true_pop(*addr)? {
340 continue;
341 }
342 }
343 Instruction::EnterScope(scope_type) => self.exec_enter_scope(scope_type),
344 Instruction::ExitScope => self.exec_exit_scope()?,
345 Instruction::Break {
346 exit_scopes,
347 addr,
348 } => {
349 if self.batch_size > 1 && !self.loop_mask_stack.is_empty() {
350 self.exec_break_masked(*exit_scopes, *addr)?;
351 } else {
352 self.exec_break(*exit_scopes, *addr)?;
353 }
354 continue;
355 }
356 Instruction::Continue {
357 exit_scopes,
358 addr,
359 } => {
360 if self.batch_size > 1 && !self.loop_mask_stack.is_empty() {
361 self.exec_continue_masked(*exit_scopes, *addr)?;
362 } else {
363 self.exec_continue(*exit_scopes, *addr)?;
364 }
365 continue;
366 }
367
368 Instruction::ForInit {
369 variable_name,
370 } => self.exec_for_init(variable_name)?,
371 Instruction::ForNext {
372 variable_name,
373 addr,
374 } => {
375 if self.exec_for_next(variable_name, *addr)? {
376 continue;
377 }
378 }
379
380 Instruction::DefineFunction(node) => self.exec_define_function(node),
381 Instruction::Call {
382 name,
383 arity,
384 is_procedure_call,
385 } => {
386 self.exec_call(services, tx, name, *arity, *is_procedure_call)?;
387 }
388 Instruction::ReturnValue => {
389 self.exec_return_value()?;
390 return Ok(());
391 }
392 Instruction::ReturnVoid => {
393 self.exec_return_void();
394 return Ok(());
395 }
396 Instruction::DefineClosure(def) => self.exec_define_closure(def),
397
398 Instruction::Emit => self.exec_emit(result),
399 Instruction::Append {
400 target,
401 } => self.exec_append(target)?,
402
403 Instruction::Query(plan) => self.exec_query(services, tx, plan, params)?,
404
405 Instruction::CreateNamespace(n) => {
406 self.exec_ddl(services, tx, |s, t| create_namespace(s, t, n.clone()))?
407 }
408 Instruction::CreateRemoteNamespace(n) => {
409 self.exec_ddl(services, tx, |s, t| create_remote_namespace(s, t, n.clone()))?
410 }
411 Instruction::CreateTable(n) => {
412 self.exec_ddl(services, tx, |s, t| create_table(s, t, n.clone()))?
413 }
414 Instruction::CreateRingBuffer(n) => {
415 self.exec_ddl(services, tx, |s, t| create_ringbuffer(s, t, n.clone()))?
416 }
417 Instruction::CreateDeferredView(n) => {
418 self.exec_ddl(services, tx, |s, t| create_deferred_view(s, t, n.clone()))?
419 }
420 Instruction::CreateTransactionalView(n) => {
421 self.exec_ddl(services, tx, |s, t| create_transactional_view(s, t, n.clone()))?
422 }
423 Instruction::CreateDictionary(n) => {
424 self.exec_ddl(services, tx, |s, t| create_dictionary(s, t, n.clone()))?
425 }
426 Instruction::CreateSumType(n) => {
427 self.exec_ddl(services, tx, |s, t| create_sumtype(s, t, n.clone()))?
428 }
429 Instruction::CreatePrimaryKey(n) => {
430 self.exec_ddl(services, tx, |s, t| create_primary_key(s, t, n.clone()))?
431 }
432 Instruction::CreateColumnProperty(n) => {
433 self.exec_ddl(services, tx, |s, t| create_column_property(s, t, n.clone()))?
434 }
435 Instruction::CreateProcedure(n) => {
436 self.exec_ddl(services, tx, |s, t| create_procedure(s, t, n.clone()))?
437 }
438 Instruction::CreateSeries(n) => {
439 self.exec_ddl(services, tx, |s, t| create_series(s, t, n.clone()))?
440 }
441 Instruction::CreateEvent(n) => {
442 self.exec_ddl(services, tx, |s, t| create_event(s, t, n.clone()))?
443 }
444 Instruction::CreateTag(n) => {
445 self.exec_ddl(services, tx, |s, t| create_tag(s, t, n.clone()))?
446 }
447 Instruction::CreateSource(n) => {
448 self.exec_ddl(services, tx, |s, t| create_source(s, t, n.clone()))?
449 }
450 Instruction::CreateSink(n) => {
451 self.exec_ddl(services, tx, |s, t| create_sink(s, t, n.clone()))?
452 }
453 Instruction::CreateBinding(n) => {
454 self.exec_ddl(services, tx, |s, t| create_binding(s, t, n.clone()))?
455 }
456 Instruction::CreateTest(n) => {
457 self.exec_ddl(services, tx, |s, t| create_test(s, t, n.clone()))?
458 }
459 Instruction::CreateMigration(n) => {
460 self.exec_ddl(services, tx, |s, t| create_migration(s, t, n.clone()))?
461 }
462 Instruction::CreateIdentity(n) => {
463 self.exec_ddl(services, tx, |s, t| create_identity(s, t, n.clone()))?
464 }
465 Instruction::CreateRole(n) => {
466 self.exec_ddl(services, tx, |s, t| create_role(s, t, n.clone()))?
467 }
468 Instruction::CreatePolicy(n) => {
469 self.exec_ddl(services, tx, |s, t| create_policy(s, t, n.clone()))?
470 }
471 Instruction::CreateAuthentication(n) => {
472 self.exec_ddl(services, tx, |s, t| create_authentication(s, t, n.clone()))?
473 }
474 Instruction::Grant(n) => self.exec_ddl(services, tx, |s, t| grant(s, t, n.clone()))?,
475 Instruction::Revoke(n) => {
476 self.exec_ddl(services, tx, |s, t| revoke(s, t, n.clone()))?
477 }
478
479 Instruction::CreateSubscription(n) => {
480 self.exec_ddl_sub(services, tx, |s, t| create_subscription(s, t, n.clone()))?
481 }
482
483 Instruction::AlterTable(n) => {
484 self.exec_ddl(services, tx, |s, t| execute_alter_table(s, t, n.clone()))?
485 }
486 Instruction::AlterRemoteNamespace(n) => {
487 self.exec_ddl(services, tx, |s, t| alter_remote_namespace(s, t, n.clone()))?
488 }
489 Instruction::AlterSequence(n) => {
490 self.exec_ddl(services, tx, |s, t| alter_table_sequence(s, t, n.clone()))?
491 }
492 Instruction::AlterPolicy(n) => {
493 self.exec_ddl(services, tx, |s, t| alter_policy(s, t, n.clone()))?
494 }
495
496 Instruction::DropNamespace(n) => {
497 self.exec_ddl(services, tx, |s, t| drop_namespace(s, t, n.clone()))?
498 }
499 Instruction::DropTable(n) => {
500 self.exec_ddl(services, tx, |s, t| drop_table(s, t, n.clone()))?
501 }
502 Instruction::DropView(n) => {
503 self.exec_ddl(services, tx, |s, t| drop_view(s, t, n.clone()))?
504 }
505 Instruction::DropRingBuffer(n) => {
506 self.exec_ddl(services, tx, |s, t| drop_ringbuffer(s, t, n.clone()))?
507 }
508 Instruction::DropSeries(n) => {
509 self.exec_ddl(services, tx, |s, t| drop_series(s, t, n.clone()))?
510 }
511 Instruction::DropDictionary(n) => {
512 self.exec_ddl(services, tx, |s, t| drop_dictionary(s, t, n.clone()))?
513 }
514 Instruction::DropSumType(n) => {
515 self.exec_ddl(services, tx, |s, t| drop_sumtype(s, t, n.clone()))?
516 }
517 Instruction::DropSource(n) => {
518 self.exec_ddl(services, tx, |s, t| drop_source(s, t, n.clone()))?
519 }
520 Instruction::DropSink(n) => {
521 self.exec_ddl(services, tx, |s, t| drop_sink(s, t, n.clone()))?
522 }
523 Instruction::DropProcedure(n) => {
524 self.exec_ddl(services, tx, |s, t| drop_procedure(s, t, n.clone()))?
525 }
526 Instruction::DropHandler(n) => {
527 self.exec_ddl(services, tx, |s, t| drop_handler(s, t, n.clone()))?
528 }
529 Instruction::DropTest(n) => {
530 self.exec_ddl(services, tx, |s, t| drop_test(s, t, n.clone()))?
531 }
532 Instruction::DropBinding(n) => {
533 self.exec_ddl(services, tx, |s, t| drop_binding(s, t, n.clone()))?
534 }
535 Instruction::DropIdentity(n) => {
536 self.exec_ddl(services, tx, |s, t| drop_identity(s, t, n.clone()))?
537 }
538 Instruction::DropRole(n) => {
539 self.exec_ddl(services, tx, |s, t| drop_role(s, t, n.clone()))?
540 }
541 Instruction::DropPolicy(n) => {
542 self.exec_ddl(services, tx, |s, t| drop_policy(s, t, n.clone()))?
543 }
544 Instruction::DropAuthentication(n) => {
545 self.exec_ddl(services, tx, |s, t| drop_authentication(s, t, n.clone()))?
546 }
547
548 Instruction::DropSubscription(n) => {
549 self.exec_ddl_sub(services, tx, |s, t| drop_subscription(s, t, n.clone()))?
550 }
551
552 Instruction::Delete(n) => {
553 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
554 delete(s, t, n.clone(), p, sym)
555 })?
556 }
557 Instruction::DeleteRingBuffer(n) => {
558 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
559 delete_ringbuffer(s, t, n.clone(), p, sym)
560 })?
561 }
562 Instruction::DeleteSeries(n) => {
563 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
564 delete_series(s, t, n.clone(), p, sym)
565 })?
566 }
567 Instruction::Update(n) => {
568 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
569 update_table(s, t, n.clone(), p, sym)
570 })?
571 }
572 Instruction::UpdateRingBuffer(n) => {
573 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
574 update_ringbuffer(s, t, n.clone(), p, sym)
575 })?
576 }
577 Instruction::UpdateSeries(n) => {
578 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
579 update_series(s, t, n.clone(), p, sym)
580 })?
581 }
582 Instruction::InsertTable(n) => {
583 self.exec_dml_with_mut_symbols(services, tx, |s, t, sym| {
584 insert_table(s, t, n.clone(), sym)
585 })?
586 }
587 Instruction::InsertDictionary(n) => {
588 self.exec_dml_with_mut_symbols(services, tx, |s, t, sym| {
589 insert_dictionary(s, t, n.clone(), sym)
590 })?
591 }
592 Instruction::InsertRingBuffer(n) => {
593 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
594 insert_ringbuffer(s, t, n.clone(), p, sym)
595 })?
596 }
597 Instruction::InsertSeries(n) => {
598 self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
599 insert_series(s, t, n.clone(), p, sym)
600 })?
601 }
602
603 Instruction::Dispatch(n) => self.exec_dispatch(services, tx, n, params)?,
604 Instruction::Migrate(n) => self.exec_migrate(services, tx, n)?,
605 Instruction::RollbackMigration(n) => self.exec_rollback_migration(services, tx, n)?,
606 Instruction::AssertBlock(n) => self.exec_assert_block(services, tx, n)?,
607 }
608
609 self.ip += 1;
610
611 if !self.control_flow.is_normal() {
612 return Ok(());
613 }
614 }
615 Ok(())
616 }
617}