Skip to main content

reifydb_engine/vm/
vm.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{internal_error, value::column::columns::Columns};
7use reifydb_routine::function::registry::Functions;
8use reifydb_rql::instruction::{Instruction, ScopeType};
9use reifydb_runtime::context::RuntimeContext;
10use reifydb_transaction::transaction::Transaction;
11use reifydb_type::{
12	params::Params,
13	util::bitvec::BitVec,
14	value::{Value, frame::frame::Frame, identity::IdentityId},
15};
16
17use super::{
18	exec::{
19		mask::{LoopMaskState, MaskFrame, extract_bool_bitvec},
20		stack::strip_dollar_prefix,
21	},
22	instruction::{
23		ddl::{
24			alter::{
25				remote_namespace::alter_remote_namespace, sequence::alter_table_sequence,
26				table::execute_alter_table,
27			},
28			create::{
29				binding::create_binding, deferred::create_deferred_view, dictionary::create_dictionary,
30				migration::create_migration, namespace::create_namespace,
31				primary_key::create_primary_key, procedure::create_procedure,
32				property::create_column_property, remote_namespace::create_remote_namespace,
33				ringbuffer::create_ringbuffer, series::create_series, sink::create_sink,
34				source::create_source, subscription::create_subscription, sumtype::create_sumtype,
35				table::create_table, tag::create_tag, test::create_test,
36				transactional::create_transactional_view,
37			},
38			drop::{
39				binding::drop_binding, dictionary::drop_dictionary, namespace::drop_namespace,
40				procedure::drop_procedure, ringbuffer::drop_ringbuffer, series::drop_series,
41				sink::drop_sink, source::drop_source, subscription::drop_subscription,
42				sumtype::drop_sumtype, table::drop_table, view::drop_view,
43			},
44		},
45		dml::{
46			dictionary_insert::insert_dictionary, ringbuffer_delete::delete_ringbuffer,
47			ringbuffer_insert::insert_ringbuffer, ringbuffer_update::update_ringbuffer,
48			series_delete::delete_series, series_insert::insert_series, series_update::update_series,
49			table_delete::delete, table_insert::insert_table, table_update::update_table,
50		},
51	},
52	services::Services,
53	stack::{ControlFlow, Stack, SymbolTable, Variable},
54};
55use crate::{
56	Result,
57	expression::context::EvalContext,
58	vm::instruction::ddl::{
59		alter::policy::alter_policy,
60		create::{
61			authentication::create_authentication, event::create_event, identity::create_identity,
62			policy::create_policy, role::create_role,
63		},
64		drop::{
65			authentication::drop_authentication, handler::drop_handler, identity::drop_identity,
66			policy::drop_policy, role::drop_role, test::drop_test,
67		},
68		grant::grant,
69		revoke::revoke,
70	},
71};
72
73/// A `&'static Params` value pointing at `Params::None`. Used by UDF / test
74/// runner Vm construction sites to make the "no caller params inside a UDF
75/// body" semantics explicit at the call site.
76pub static EMPTY_PARAMS: LazyLock<Params> = LazyLock::new(|| Params::None);
77
78pub struct Vm<'a> {
79	pub(crate) ip: usize,
80	pub(crate) iteration_count: usize,
81	pub(crate) stack: Stack,
82	pub symbols: SymbolTable,
83	pub control_flow: ControlFlow,
84	pub(crate) dispatch_depth: u8,
85	/// Number of rows in the current batch. 1 = scalar mode.
86	pub(crate) batch_size: usize,
87	/// Current active execution mask. None means all rows are active.
88	pub(crate) active_mask: Option<BitVec>,
89	/// Stack of mask frames for nested masked conditionals.
90	pub(crate) mask_stack: Vec<MaskFrame>,
91	/// Stack of loop mask states for nested WHILE loops in columnar mode.
92	pub(crate) loop_mask_stack: Vec<LoopMaskState>,
93	/// Borrowed evaluation invariants — combined with `&self.symbols` on demand
94	/// in `eval_ctx()` to produce a full `EvalContext`. Stored as loose fields
95	/// rather than a nested `EvalContext` because the latter would need
96	/// `&self.symbols` and would be self-referential.
97	pub(crate) params: &'a Params,
98	pub(crate) functions: &'a Functions,
99	pub(crate) runtime_context: &'a RuntimeContext,
100	pub(crate) identity: IdentityId,
101}
102
103impl<'a> Vm<'a> {
104	pub fn from_services(
105		symbols: SymbolTable,
106		services: &'a Services,
107		params: &'a Params,
108		identity: IdentityId,
109	) -> Self {
110		Self::build(symbols, 1, params, &services.functions, &services.runtime_context, identity)
111	}
112
113	pub fn with_batch_size_from_services(
114		symbols: SymbolTable,
115		batch_size: usize,
116		services: &'a Services,
117		params: &'a Params,
118		identity: IdentityId,
119	) -> Self {
120		Self::build(symbols, batch_size, params, &services.functions, &services.runtime_context, identity)
121	}
122
123	fn build(
124		symbols: SymbolTable,
125		batch_size: usize,
126		params: &'a Params,
127		functions: &'a Functions,
128		runtime_context: &'a RuntimeContext,
129		identity: IdentityId,
130	) -> Self {
131		Self {
132			ip: 0,
133			iteration_count: 0,
134			stack: Stack::new(),
135			symbols,
136			control_flow: ControlFlow::Normal,
137			dispatch_depth: 0,
138			batch_size,
139			active_mask: None,
140			mask_stack: Vec::new(),
141			loop_mask_stack: Vec::new(),
142			params,
143			functions,
144			runtime_context,
145			identity,
146		}
147	}
148
149	pub(crate) fn eval_ctx(&self) -> EvalContext<'_> {
150		EvalContext {
151			params: self.params,
152			symbols: &self.symbols,
153			functions: self.functions,
154			runtime_context: self.runtime_context,
155			arena: None,
156			identity: self.identity,
157			is_aggregate_context: false,
158			columns: Columns::empty(),
159			row_count: self.batch_size,
160			target: None,
161			take: None,
162		}
163	}
164
165	/// Execute `instructions` on this Vm with `self.params` temporarily swapped to
166	/// `Params::None`. Used for UDF and closure bodies, which are isolated from the
167	/// caller's `$param` bindings (formal parameters are bound via the symbol table,
168	/// not `Params`). Ensures the swap is reverted even if the body errors.
169	pub(crate) fn run_isolated_body(
170		&mut self,
171		services: &Arc<Services>,
172		tx: &mut Transaction<'_>,
173		instructions: &[Instruction],
174		result: &mut Vec<Frame>,
175	) -> Result<()> {
176		let saved = self.params;
177		self.params = &EMPTY_PARAMS;
178		let run_result = self.run(services, tx, instructions, result);
179		self.params = saved;
180		run_result
181	}
182
183	/// Pop a scalar Value from the stack. Works for Scalar(Columns) and
184	/// 1x1 Columns variants.
185	///
186	/// Invariant: only legitimate in scalar-mode contexts (`batch_size == 1`) or
187	/// in DDL/DML code paths where a single value is required. All arithmetic,
188	/// comparison, and logic dispatch goes through columnar kernels via
189	/// `pop_as_column`; scalar-path jump helpers are guarded by `batch_size > 1`
190	/// checks in the dispatch in `run`. If you add a new caller, confirm it's
191	/// unreachable when the VM is batch-mode, or the call will fail on multi-row
192	/// input.
193	pub(crate) fn pop_value(&mut self) -> Result<Value> {
194		match self.stack.pop()? {
195			Variable::Columns {
196				columns: c,
197			} if c.is_scalar() => Ok(c.scalar_value()),
198			_ => Err(internal_error!("Expected scalar value on stack")),
199		}
200	}
201
202	/// Pop the top of stack as Columns. Works for any variant.
203	pub(crate) fn pop_as_columns(&mut self) -> Result<Columns> {
204		match self.stack.pop()? {
205			Variable::Columns {
206				columns: c,
207				..
208			}
209			| Variable::ForIterator {
210				columns: c,
211				..
212			} => Ok(c),
213			Variable::Closure(_) => Ok(Columns::scalar(Value::none())),
214		}
215	}
216
217	pub(crate) fn run(
218		&mut self,
219		services: &Arc<Services>,
220		tx: &mut Transaction<'_>,
221		instructions: &[Instruction],
222		result: &mut Vec<Frame>,
223	) -> Result<()> {
224		let params = self.params;
225		while self.ip < instructions.len() {
226			// Check if current IP is a mask merge point (end of else-branch)
227			if self.batch_size > 1 && self.check_mask_merge_point()? {
228				// Merge happened; IP is already at end_addr, continue to execute it
229			}
230
231			match &instructions[self.ip] {
232				Instruction::Halt => return Ok(()),
233				Instruction::Nop => {}
234
235				Instruction::PushConst(v) => self.exec_push_const(v),
236				Instruction::PushNone => self.exec_push_none(),
237				Instruction::Pop => self.exec_pop()?,
238				Instruction::Dup => self.exec_dup()?,
239
240				Instruction::LoadVar(f) => self.exec_load_var(f)?,
241				Instruction::StoreVar(f) => {
242					if self.batch_size > 1 && self.is_masked() {
243						let name = strip_dollar_prefix(f.text());
244						let value = self.stack.pop()?;
245						self.exec_store_var_masked(name, value)?;
246					} else if self.batch_size > 1 {
247						// Columnar mode without mask: store the full column
248						let name = strip_dollar_prefix(f.text());
249						let value = self.stack.pop()?;
250						self.symbols.reassign(name.to_string(), value)?;
251					} else {
252						self.exec_store_var(f)?;
253					}
254				}
255				Instruction::DeclareVar(f) => self.exec_declare_var(f)?,
256				Instruction::FieldAccess {
257					object,
258					field,
259				} => self.exec_field_access(object, field)?,
260
261				Instruction::Add => self.exec_add()?,
262				Instruction::Sub => self.exec_sub()?,
263				Instruction::Mul => self.exec_mul()?,
264				Instruction::Div => self.exec_div()?,
265				Instruction::Rem => self.exec_rem()?,
266				Instruction::Negate => self.exec_negate()?,
267				Instruction::LogicNot => self.exec_logic_not()?,
268
269				Instruction::CmpEq => self.exec_cmp_eq()?,
270				Instruction::CmpNe => self.exec_cmp_ne()?,
271				Instruction::CmpLt => self.exec_cmp_lt()?,
272				Instruction::CmpLe => self.exec_cmp_le()?,
273				Instruction::CmpGt => self.exec_cmp_gt()?,
274				Instruction::CmpGe => self.exec_cmp_ge()?,
275
276				Instruction::LogicAnd => self.exec_logic_and()?,
277				Instruction::LogicOr => self.exec_logic_or()?,
278				Instruction::LogicXor => self.exec_logic_xor()?,
279				Instruction::Between => self.exec_between()?,
280				Instruction::InList {
281					count,
282					negated,
283				} => self.exec_in_list(*count, *negated)?,
284				Instruction::Cast(target) => self.exec_cast(target)?,
285
286				Instruction::Jump(addr) => {
287					if self.batch_size > 1
288						&& (!self.mask_stack.is_empty() || !self.loop_mask_stack.is_empty())
289					{
290						if self.exec_jump_masked(*addr)? {
291							continue;
292						}
293					} else {
294						self.exec_jump(*addr)?;
295						continue;
296					}
297				}
298				Instruction::JumpIfFalsePop(addr) => {
299					if self.batch_size > 1 {
300						// Check if this is a WHILE loop condition (next instruction is
301						// EnterScope(Loop))
302						let is_while_loop = instructions.get(self.ip + 1).is_some_and(|next| {
303							matches!(next, Instruction::EnterScope(ScopeType::Loop))
304						});
305
306						if is_while_loop
307							&& self.loop_mask_stack
308								.last()
309								.is_none_or(|s| s.loop_end_addr != *addr)
310						{
311							// First entry into a WHILE loop — handle specially
312							let var = self.stack.pop()?;
313							let bool_bv = extract_bool_bitvec(&var)?;
314							let parent = self.effective_mask();
315							let candidate = self.intersect_condition(&bool_bv);
316
317							if candidate == parent {
318								// All true — no mask needed, proceed normally
319							} else if candidate.none() {
320								// All false — skip the loop
321								self.ip = *addr;
322								continue;
323							} else {
324								// Mixed — enter loop mask
325								self.enter_loop_mask(*addr, candidate);
326							}
327						} else if self.exec_jump_if_false_pop_columnar(*addr)? {
328							continue;
329						}
330					} else if self.exec_jump_if_false_pop(*addr)? {
331						continue;
332					}
333				}
334				Instruction::JumpIfTruePop(addr) => {
335					if self.batch_size > 1 {
336						if self.exec_jump_if_true_pop_columnar(*addr)? {
337							continue;
338						}
339					} else if self.exec_jump_if_true_pop(*addr)? {
340						continue;
341					}
342				}
343				Instruction::EnterScope(scope_type) => self.exec_enter_scope(scope_type),
344				Instruction::ExitScope => self.exec_exit_scope()?,
345				Instruction::Break {
346					exit_scopes,
347					addr,
348				} => {
349					if self.batch_size > 1 && !self.loop_mask_stack.is_empty() {
350						self.exec_break_masked(*exit_scopes, *addr)?;
351					} else {
352						self.exec_break(*exit_scopes, *addr)?;
353					}
354					continue;
355				}
356				Instruction::Continue {
357					exit_scopes,
358					addr,
359				} => {
360					if self.batch_size > 1 && !self.loop_mask_stack.is_empty() {
361						self.exec_continue_masked(*exit_scopes, *addr)?;
362					} else {
363						self.exec_continue(*exit_scopes, *addr)?;
364					}
365					continue;
366				}
367
368				Instruction::ForInit {
369					variable_name,
370				} => self.exec_for_init(variable_name)?,
371				Instruction::ForNext {
372					variable_name,
373					addr,
374				} => {
375					if self.exec_for_next(variable_name, *addr)? {
376						continue;
377					}
378				}
379
380				Instruction::DefineFunction(node) => self.exec_define_function(node),
381				Instruction::Call {
382					name,
383					arity,
384					is_procedure_call,
385				} => {
386					self.exec_call(services, tx, name, *arity, *is_procedure_call)?;
387				}
388				Instruction::ReturnValue => {
389					self.exec_return_value()?;
390					return Ok(());
391				}
392				Instruction::ReturnVoid => {
393					self.exec_return_void();
394					return Ok(());
395				}
396				Instruction::DefineClosure(def) => self.exec_define_closure(def),
397
398				Instruction::Emit => self.exec_emit(result),
399				Instruction::Append {
400					target,
401				} => self.exec_append(target)?,
402
403				Instruction::Query(plan) => self.exec_query(services, tx, plan, params)?,
404
405				Instruction::CreateNamespace(n) => {
406					self.exec_ddl(services, tx, |s, t| create_namespace(s, t, n.clone()))?
407				}
408				Instruction::CreateRemoteNamespace(n) => {
409					self.exec_ddl(services, tx, |s, t| create_remote_namespace(s, t, n.clone()))?
410				}
411				Instruction::CreateTable(n) => {
412					self.exec_ddl(services, tx, |s, t| create_table(s, t, n.clone()))?
413				}
414				Instruction::CreateRingBuffer(n) => {
415					self.exec_ddl(services, tx, |s, t| create_ringbuffer(s, t, n.clone()))?
416				}
417				Instruction::CreateDeferredView(n) => {
418					self.exec_ddl(services, tx, |s, t| create_deferred_view(s, t, n.clone()))?
419				}
420				Instruction::CreateTransactionalView(n) => {
421					self.exec_ddl(services, tx, |s, t| create_transactional_view(s, t, n.clone()))?
422				}
423				Instruction::CreateDictionary(n) => {
424					self.exec_ddl(services, tx, |s, t| create_dictionary(s, t, n.clone()))?
425				}
426				Instruction::CreateSumType(n) => {
427					self.exec_ddl(services, tx, |s, t| create_sumtype(s, t, n.clone()))?
428				}
429				Instruction::CreatePrimaryKey(n) => {
430					self.exec_ddl(services, tx, |s, t| create_primary_key(s, t, n.clone()))?
431				}
432				Instruction::CreateColumnProperty(n) => {
433					self.exec_ddl(services, tx, |s, t| create_column_property(s, t, n.clone()))?
434				}
435				Instruction::CreateProcedure(n) => {
436					self.exec_ddl(services, tx, |s, t| create_procedure(s, t, n.clone()))?
437				}
438				Instruction::CreateSeries(n) => {
439					self.exec_ddl(services, tx, |s, t| create_series(s, t, n.clone()))?
440				}
441				Instruction::CreateEvent(n) => {
442					self.exec_ddl(services, tx, |s, t| create_event(s, t, n.clone()))?
443				}
444				Instruction::CreateTag(n) => {
445					self.exec_ddl(services, tx, |s, t| create_tag(s, t, n.clone()))?
446				}
447				Instruction::CreateSource(n) => {
448					self.exec_ddl(services, tx, |s, t| create_source(s, t, n.clone()))?
449				}
450				Instruction::CreateSink(n) => {
451					self.exec_ddl(services, tx, |s, t| create_sink(s, t, n.clone()))?
452				}
453				Instruction::CreateBinding(n) => {
454					self.exec_ddl(services, tx, |s, t| create_binding(s, t, n.clone()))?
455				}
456				Instruction::CreateTest(n) => {
457					self.exec_ddl(services, tx, |s, t| create_test(s, t, n.clone()))?
458				}
459				Instruction::CreateMigration(n) => {
460					self.exec_ddl(services, tx, |s, t| create_migration(s, t, n.clone()))?
461				}
462				Instruction::CreateIdentity(n) => {
463					self.exec_ddl(services, tx, |s, t| create_identity(s, t, n.clone()))?
464				}
465				Instruction::CreateRole(n) => {
466					self.exec_ddl(services, tx, |s, t| create_role(s, t, n.clone()))?
467				}
468				Instruction::CreatePolicy(n) => {
469					self.exec_ddl(services, tx, |s, t| create_policy(s, t, n.clone()))?
470				}
471				Instruction::CreateAuthentication(n) => {
472					self.exec_ddl(services, tx, |s, t| create_authentication(s, t, n.clone()))?
473				}
474				Instruction::Grant(n) => self.exec_ddl(services, tx, |s, t| grant(s, t, n.clone()))?,
475				Instruction::Revoke(n) => {
476					self.exec_ddl(services, tx, |s, t| revoke(s, t, n.clone()))?
477				}
478
479				Instruction::CreateSubscription(n) => {
480					self.exec_ddl_sub(services, tx, |s, t| create_subscription(s, t, n.clone()))?
481				}
482
483				Instruction::AlterTable(n) => {
484					self.exec_ddl(services, tx, |s, t| execute_alter_table(s, t, n.clone()))?
485				}
486				Instruction::AlterRemoteNamespace(n) => {
487					self.exec_ddl(services, tx, |s, t| alter_remote_namespace(s, t, n.clone()))?
488				}
489				Instruction::AlterSequence(n) => {
490					self.exec_ddl(services, tx, |s, t| alter_table_sequence(s, t, n.clone()))?
491				}
492				Instruction::AlterPolicy(n) => {
493					self.exec_ddl(services, tx, |s, t| alter_policy(s, t, n.clone()))?
494				}
495
496				Instruction::DropNamespace(n) => {
497					self.exec_ddl(services, tx, |s, t| drop_namespace(s, t, n.clone()))?
498				}
499				Instruction::DropTable(n) => {
500					self.exec_ddl(services, tx, |s, t| drop_table(s, t, n.clone()))?
501				}
502				Instruction::DropView(n) => {
503					self.exec_ddl(services, tx, |s, t| drop_view(s, t, n.clone()))?
504				}
505				Instruction::DropRingBuffer(n) => {
506					self.exec_ddl(services, tx, |s, t| drop_ringbuffer(s, t, n.clone()))?
507				}
508				Instruction::DropSeries(n) => {
509					self.exec_ddl(services, tx, |s, t| drop_series(s, t, n.clone()))?
510				}
511				Instruction::DropDictionary(n) => {
512					self.exec_ddl(services, tx, |s, t| drop_dictionary(s, t, n.clone()))?
513				}
514				Instruction::DropSumType(n) => {
515					self.exec_ddl(services, tx, |s, t| drop_sumtype(s, t, n.clone()))?
516				}
517				Instruction::DropSource(n) => {
518					self.exec_ddl(services, tx, |s, t| drop_source(s, t, n.clone()))?
519				}
520				Instruction::DropSink(n) => {
521					self.exec_ddl(services, tx, |s, t| drop_sink(s, t, n.clone()))?
522				}
523				Instruction::DropProcedure(n) => {
524					self.exec_ddl(services, tx, |s, t| drop_procedure(s, t, n.clone()))?
525				}
526				Instruction::DropHandler(n) => {
527					self.exec_ddl(services, tx, |s, t| drop_handler(s, t, n.clone()))?
528				}
529				Instruction::DropTest(n) => {
530					self.exec_ddl(services, tx, |s, t| drop_test(s, t, n.clone()))?
531				}
532				Instruction::DropBinding(n) => {
533					self.exec_ddl(services, tx, |s, t| drop_binding(s, t, n.clone()))?
534				}
535				Instruction::DropIdentity(n) => {
536					self.exec_ddl(services, tx, |s, t| drop_identity(s, t, n.clone()))?
537				}
538				Instruction::DropRole(n) => {
539					self.exec_ddl(services, tx, |s, t| drop_role(s, t, n.clone()))?
540				}
541				Instruction::DropPolicy(n) => {
542					self.exec_ddl(services, tx, |s, t| drop_policy(s, t, n.clone()))?
543				}
544				Instruction::DropAuthentication(n) => {
545					self.exec_ddl(services, tx, |s, t| drop_authentication(s, t, n.clone()))?
546				}
547
548				Instruction::DropSubscription(n) => {
549					self.exec_ddl_sub(services, tx, |s, t| drop_subscription(s, t, n.clone()))?
550				}
551
552				Instruction::Delete(n) => {
553					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
554						delete(s, t, n.clone(), p, sym)
555					})?
556				}
557				Instruction::DeleteRingBuffer(n) => {
558					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
559						delete_ringbuffer(s, t, n.clone(), p, sym)
560					})?
561				}
562				Instruction::DeleteSeries(n) => {
563					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
564						delete_series(s, t, n.clone(), p, sym)
565					})?
566				}
567				Instruction::Update(n) => {
568					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
569						update_table(s, t, n.clone(), p, sym)
570					})?
571				}
572				Instruction::UpdateRingBuffer(n) => {
573					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
574						update_ringbuffer(s, t, n.clone(), p, sym)
575					})?
576				}
577				Instruction::UpdateSeries(n) => {
578					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
579						update_series(s, t, n.clone(), p, sym)
580					})?
581				}
582				Instruction::InsertTable(n) => {
583					self.exec_dml_with_mut_symbols(services, tx, |s, t, sym| {
584						insert_table(s, t, n.clone(), sym)
585					})?
586				}
587				Instruction::InsertDictionary(n) => {
588					self.exec_dml_with_mut_symbols(services, tx, |s, t, sym| {
589						insert_dictionary(s, t, n.clone(), sym)
590					})?
591				}
592				Instruction::InsertRingBuffer(n) => {
593					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
594						insert_ringbuffer(s, t, n.clone(), p, sym)
595					})?
596				}
597				Instruction::InsertSeries(n) => {
598					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
599						insert_series(s, t, n.clone(), p, sym)
600					})?
601				}
602
603				Instruction::Dispatch(n) => self.exec_dispatch(services, tx, n, params)?,
604				Instruction::Migrate(n) => self.exec_migrate(services, tx, n)?,
605				Instruction::RollbackMigration(n) => self.exec_rollback_migration(services, tx, n)?,
606				Instruction::AssertBlock(n) => self.exec_assert_block(services, tx, n)?,
607			}
608
609			self.ip += 1;
610
611			if !self.control_flow.is_normal() {
612				return Ok(());
613			}
614		}
615		Ok(())
616	}
617}