Skip to main content

reifydb_engine/vm/
vm.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4use std::sync::{Arc, LazyLock};
5
6use reifydb_core::{internal_error, value::column::columns::Columns};
7use reifydb_routine::routine::registry::Routines;
8use reifydb_rql::instruction::{Instruction, ScopeType};
9use reifydb_runtime::context::RuntimeContext;
10use reifydb_transaction::transaction::Transaction;
11use reifydb_value::{
12	params::Params,
13	util::bitvec::BitVec,
14	value::{Value, frame::frame::Frame, identity::IdentityId},
15};
16
17use super::{
18	exec::{
19		mask::{LoopMaskState, MaskFrame, extract_bool_bitvec},
20		stack::strip_dollar_prefix,
21	},
22	instruction::{
23		ddl::{
24			alter::{
25				remote_namespace::alter_remote_namespace, sequence::alter_table_sequence,
26				table::execute_alter_table,
27			},
28			create::{
29				binding::create_binding, deferred::create_deferred_view, dictionary::create_dictionary,
30				migration::create_migration, namespace::create_namespace,
31				primary_key::create_primary_key, procedure::create_procedure,
32				property::create_column_property, remote_namespace::create_remote_namespace,
33				ringbuffer::create_ringbuffer, series::create_series, sink::create_sink,
34				source::create_source, subscription::create_subscription, sumtype::create_sumtype,
35				table::create_table, tag::create_tag, test::create_test,
36				transactional::create_transactional_view,
37			},
38			drop::{
39				binding::drop_binding, dictionary::drop_dictionary, namespace::drop_namespace,
40				procedure::drop_procedure, ringbuffer::drop_ringbuffer, series::drop_series,
41				sink::drop_sink, source::drop_source, subscription::drop_subscription,
42				sumtype::drop_sumtype, table::drop_table, view::drop_view,
43			},
44		},
45		dml::{
46			dictionary_insert::insert_dictionary, ringbuffer_delete::delete_ringbuffer,
47			ringbuffer_insert::insert_ringbuffer, ringbuffer_update::update_ringbuffer,
48			series_delete::delete_series, series_insert::insert_series, series_update::update_series,
49			table_delete::delete, table_insert::insert_table, table_update::update_table,
50		},
51	},
52	services::Services,
53	stack::{ControlFlow, Stack, SymbolTable, Variable},
54};
55use crate::{
56	Result,
57	expression::context::EvalContext,
58	vm::instruction::ddl::{
59		alter::policy::alter_policy,
60		create::{
61			authentication::create_authentication, event::create_event, identity::create_identity,
62			policy::create_policy, role::create_role,
63		},
64		drop::{
65			authentication::drop_authentication, handler::drop_handler, identity::drop_identity,
66			policy::drop_policy, role::drop_role, test::drop_test,
67		},
68		grant::grant,
69		revoke::revoke,
70	},
71};
72
73pub static EMPTY_PARAMS: LazyLock<Params> = LazyLock::new(|| Params::None);
74
75pub struct Vm<'a> {
76	pub(crate) ip: usize,
77	pub(crate) iteration_count: usize,
78	pub(crate) stack: Stack,
79	pub symbols: SymbolTable,
80	pub control_flow: ControlFlow,
81	pub(crate) dispatch_depth: u8,
82
83	pub(crate) batch_size: usize,
84
85	pub(crate) active_mask: Option<BitVec>,
86
87	pub(crate) mask_stack: Vec<MaskFrame>,
88
89	pub(crate) loop_mask_stack: Vec<LoopMaskState>,
90
91	pub(crate) params: &'a Params,
92	pub(crate) routines: &'a Routines,
93	pub(crate) runtime_context: &'a RuntimeContext,
94	pub(crate) identity: IdentityId,
95}
96
97impl<'a> Vm<'a> {
98	pub fn from_services(
99		symbols: SymbolTable,
100		services: &'a Services,
101		params: &'a Params,
102		identity: IdentityId,
103	) -> Self {
104		Self::build(symbols, 1, params, &services.routines, &services.runtime_context, identity)
105	}
106
107	pub fn with_batch_size_from_services(
108		symbols: SymbolTable,
109		batch_size: usize,
110		services: &'a Services,
111		params: &'a Params,
112		identity: IdentityId,
113	) -> Self {
114		Self::build(symbols, batch_size, params, &services.routines, &services.runtime_context, identity)
115	}
116
117	fn build(
118		symbols: SymbolTable,
119		batch_size: usize,
120		params: &'a Params,
121		routines: &'a Routines,
122		runtime_context: &'a RuntimeContext,
123		identity: IdentityId,
124	) -> Self {
125		Self {
126			ip: 0,
127			iteration_count: 0,
128			stack: Stack::new(),
129			symbols,
130			control_flow: ControlFlow::Normal,
131			dispatch_depth: 0,
132			batch_size,
133			active_mask: None,
134			mask_stack: Vec::new(),
135			loop_mask_stack: Vec::new(),
136			params,
137			routines,
138			runtime_context,
139			identity,
140		}
141	}
142
143	pub(crate) fn eval_ctx(&self) -> EvalContext<'_> {
144		EvalContext {
145			params: self.params,
146			symbols: &self.symbols,
147			routines: self.routines,
148			runtime_context: self.runtime_context,
149			arena: None,
150			identity: self.identity,
151			is_aggregate_context: false,
152			columns: Columns::empty(),
153			row_count: self.batch_size,
154			target: None,
155			take: None,
156		}
157	}
158
159	pub(crate) fn run_isolated_body(
160		&mut self,
161		services: &Arc<Services>,
162		tx: &mut Transaction<'_>,
163		instructions: &[Instruction],
164		result: &mut Vec<Frame>,
165	) -> Result<()> {
166		let saved = self.params;
167		self.params = &EMPTY_PARAMS;
168		let run_result = self.run(services, tx, instructions, result);
169		self.params = saved;
170		run_result
171	}
172
173	pub(crate) fn pop_value(&mut self) -> Result<Value> {
174		match self.stack.pop()? {
175			Variable::Columns {
176				columns: c,
177			} if c.is_scalar() => Ok(c.scalar_value()),
178			_ => Err(internal_error!("Expected scalar value on stack")),
179		}
180	}
181
182	pub(crate) fn pop_as_columns(&mut self) -> Result<Columns> {
183		match self.stack.pop()? {
184			Variable::Columns {
185				columns: c,
186				..
187			}
188			| Variable::ForIterator {
189				columns: c,
190				..
191			} => Ok(c),
192			Variable::Closure(_) => Ok(Columns::single_row([("value", Value::none())])),
193		}
194	}
195
196	pub(crate) fn run(
197		&mut self,
198		services: &Arc<Services>,
199		tx: &mut Transaction<'_>,
200		instructions: &[Instruction],
201		result: &mut Vec<Frame>,
202	) -> Result<()> {
203		let params = self.params;
204		while self.ip < instructions.len() {
205			let _ = self.batch_size > 1 && self.check_mask_merge_point()?;
206
207			match &instructions[self.ip] {
208				Instruction::Halt => return Ok(()),
209				Instruction::Nop => {}
210
211				Instruction::PushConst(v) => self.exec_push_const(v),
212				Instruction::PushNone => self.exec_push_none(),
213				Instruction::Pop => self.exec_pop()?,
214				Instruction::Dup => self.exec_dup()?,
215
216				Instruction::LoadVar(f) => self.exec_load_var(f)?,
217				Instruction::StoreVar(f) => {
218					if self.batch_size > 1 && self.is_masked() {
219						let name = strip_dollar_prefix(f.text());
220						let value = self.stack.pop()?;
221						self.exec_store_var_masked(name, value)?;
222					} else if self.batch_size > 1 {
223						let name = strip_dollar_prefix(f.text());
224						let value = self.stack.pop()?;
225						self.symbols.reassign(name.to_string(), value)?;
226					} else {
227						self.exec_store_var(f)?;
228					}
229				}
230				Instruction::DeclareVar(f) => self.exec_declare_var(f)?,
231				Instruction::FieldAccess {
232					object,
233					field,
234				} => self.exec_field_access(object, field)?,
235
236				Instruction::Add => self.exec_add()?,
237				Instruction::Sub => self.exec_sub()?,
238				Instruction::Mul => self.exec_mul()?,
239				Instruction::Div => self.exec_div()?,
240				Instruction::Rem => self.exec_rem()?,
241				Instruction::Negate => self.exec_negate()?,
242				Instruction::LogicNot => self.exec_logic_not()?,
243
244				Instruction::CmpEq => self.exec_cmp_eq()?,
245				Instruction::CmpNe => self.exec_cmp_ne()?,
246				Instruction::CmpLt => self.exec_cmp_lt()?,
247				Instruction::CmpLe => self.exec_cmp_le()?,
248				Instruction::CmpGt => self.exec_cmp_gt()?,
249				Instruction::CmpGe => self.exec_cmp_ge()?,
250
251				Instruction::LogicAnd => self.exec_logic_and()?,
252				Instruction::LogicOr => self.exec_logic_or()?,
253				Instruction::LogicXor => self.exec_logic_xor()?,
254				Instruction::Between => self.exec_between()?,
255				Instruction::InList {
256					count,
257					negated,
258				} => self.exec_in_list(*count, *negated)?,
259				Instruction::Cast(target) => self.exec_cast(target)?,
260
261				Instruction::Jump(addr) => {
262					if self.batch_size > 1
263						&& (!self.mask_stack.is_empty() || !self.loop_mask_stack.is_empty())
264					{
265						if self.exec_jump_masked(*addr)? {
266							continue;
267						}
268					} else {
269						self.exec_jump(*addr)?;
270						continue;
271					}
272				}
273				Instruction::JumpIfFalsePop(addr) => {
274					if self.batch_size > 1 {
275						let is_while_loop = instructions.get(self.ip + 1).is_some_and(|next| {
276							matches!(next, Instruction::EnterScope(ScopeType::Loop))
277						});
278
279						if is_while_loop
280							&& self.loop_mask_stack
281								.last()
282								.is_none_or(|s| s.loop_end_addr != *addr)
283						{
284							let var = self.stack.pop()?;
285							let bool_bv = extract_bool_bitvec(&var)?;
286							let parent = self.effective_mask();
287							let candidate = self.intersect_condition(&bool_bv);
288
289							if candidate == parent {
290							} else if candidate.none() {
291								self.ip = *addr;
292								continue;
293							} else {
294								self.enter_loop_mask(*addr, candidate);
295							}
296						} else if self.exec_jump_if_false_pop_columnar(*addr)? {
297							continue;
298						}
299					} else if self.exec_jump_if_false_pop(*addr)? {
300						continue;
301					}
302				}
303				Instruction::JumpIfTruePop(addr) => {
304					if self.batch_size > 1 {
305						if self.exec_jump_if_true_pop_columnar(*addr)? {
306							continue;
307						}
308					} else if self.exec_jump_if_true_pop(*addr)? {
309						continue;
310					}
311				}
312				Instruction::EnterScope(scope_type) => self.exec_enter_scope(scope_type),
313				Instruction::ExitScope => self.exec_exit_scope()?,
314				Instruction::Break {
315					exit_scopes,
316					addr,
317				} => {
318					if self.batch_size > 1 && !self.loop_mask_stack.is_empty() {
319						self.exec_break_masked(*exit_scopes, *addr)?;
320					} else {
321						self.exec_break(*exit_scopes, *addr)?;
322					}
323					continue;
324				}
325				Instruction::Continue {
326					exit_scopes,
327					addr,
328				} => {
329					if self.batch_size > 1 && !self.loop_mask_stack.is_empty() {
330						self.exec_continue_masked(*exit_scopes, *addr)?;
331					} else {
332						self.exec_continue(*exit_scopes, *addr)?;
333					}
334					continue;
335				}
336
337				Instruction::ForInit {
338					variable_name,
339				} => self.exec_for_init(variable_name)?,
340				Instruction::ForNext {
341					variable_name,
342					addr,
343				} => {
344					if self.exec_for_next(variable_name, *addr)? {
345						continue;
346					}
347				}
348
349				Instruction::DefineFunction(node) => self.exec_define_function(node),
350				Instruction::Call {
351					name,
352					arity,
353					is_procedure_call,
354				} => {
355					self.exec_call(services, tx, name, *arity, *is_procedure_call)?;
356				}
357				Instruction::ReturnValue => {
358					self.exec_return_value()?;
359					return Ok(());
360				}
361				Instruction::ReturnVoid => {
362					self.exec_return_void();
363					return Ok(());
364				}
365				Instruction::DefineClosure(def) => self.exec_define_closure(def),
366
367				Instruction::Emit => self.exec_emit(result),
368				Instruction::Append {
369					target,
370				} => self.exec_append(target)?,
371
372				Instruction::Query(plan) => self.exec_query(services, tx, plan, params)?,
373
374				Instruction::CreateNamespace(n) => {
375					self.exec_ddl(services, tx, |s, t| create_namespace(s, t, n.clone()))?
376				}
377				Instruction::CreateRemoteNamespace(n) => {
378					self.exec_ddl(services, tx, |s, t| create_remote_namespace(s, t, n.clone()))?
379				}
380				Instruction::CreateTable(n) => {
381					self.exec_ddl(services, tx, |s, t| create_table(s, t, n.clone()))?
382				}
383				Instruction::CreateRingBuffer(n) => {
384					self.exec_ddl(services, tx, |s, t| create_ringbuffer(s, t, n.clone()))?
385				}
386				Instruction::CreateDeferredView(n) => {
387					self.exec_ddl(services, tx, |s, t| create_deferred_view(s, t, n.clone()))?
388				}
389				Instruction::CreateTransactionalView(n) => {
390					self.exec_ddl(services, tx, |s, t| create_transactional_view(s, t, n.clone()))?
391				}
392				Instruction::CreateDictionary(n) => {
393					self.exec_ddl(services, tx, |s, t| create_dictionary(s, t, n.clone()))?
394				}
395				Instruction::CreateSumType(n) => {
396					self.exec_ddl(services, tx, |s, t| create_sumtype(s, t, n.clone()))?
397				}
398				Instruction::CreatePrimaryKey(n) => {
399					self.exec_ddl(services, tx, |s, t| create_primary_key(s, t, n.clone()))?
400				}
401				Instruction::CreateColumnProperty(n) => {
402					self.exec_ddl(services, tx, |s, t| create_column_property(s, t, n.clone()))?
403				}
404				Instruction::CreateProcedure(n) => {
405					self.exec_ddl(services, tx, |s, t| create_procedure(s, t, n.clone()))?
406				}
407				Instruction::CreateSeries(n) => {
408					self.exec_ddl(services, tx, |s, t| create_series(s, t, n.clone()))?
409				}
410				Instruction::CreateEvent(n) => {
411					self.exec_ddl(services, tx, |s, t| create_event(s, t, n.clone()))?
412				}
413				Instruction::CreateTag(n) => {
414					self.exec_ddl(services, tx, |s, t| create_tag(s, t, n.clone()))?
415				}
416				Instruction::CreateSource(n) => {
417					self.exec_ddl(services, tx, |s, t| create_source(s, t, n.clone()))?
418				}
419				Instruction::CreateSink(n) => {
420					self.exec_ddl(services, tx, |s, t| create_sink(s, t, n.clone()))?
421				}
422				Instruction::CreateBinding(n) => {
423					self.exec_ddl(services, tx, |s, t| create_binding(s, t, n.clone()))?
424				}
425				Instruction::CreateTest(n) => {
426					self.exec_ddl(services, tx, |s, t| create_test(s, t, n.clone()))?
427				}
428				Instruction::CreateMigration(n) => {
429					self.exec_ddl(services, tx, |s, t| create_migration(s, t, n.clone()))?
430				}
431				Instruction::CreateIdentity(n) => {
432					self.exec_ddl(services, tx, |s, t| create_identity(s, t, n.clone()))?
433				}
434				Instruction::CreateRole(n) => {
435					self.exec_ddl(services, tx, |s, t| create_role(s, t, n.clone()))?
436				}
437				Instruction::CreatePolicy(n) => {
438					self.exec_ddl(services, tx, |s, t| create_policy(s, t, n.clone()))?
439				}
440				Instruction::CreateAuthentication(n) => {
441					self.exec_ddl(services, tx, |s, t| create_authentication(s, t, n.clone()))?
442				}
443				Instruction::Grant(n) => self.exec_ddl(services, tx, |s, t| grant(s, t, n.clone()))?,
444				Instruction::Revoke(n) => {
445					self.exec_ddl(services, tx, |s, t| revoke(s, t, n.clone()))?
446				}
447
448				Instruction::CreateSubscription(n) => {
449					self.exec_ddl_sub(services, tx, |s, t| create_subscription(s, t, n.clone()))?
450				}
451
452				Instruction::AlterTable(n) => {
453					self.exec_ddl(services, tx, |s, t| execute_alter_table(s, t, n.clone()))?
454				}
455				Instruction::AlterRemoteNamespace(n) => {
456					self.exec_ddl(services, tx, |s, t| alter_remote_namespace(s, t, n.clone()))?
457				}
458				Instruction::AlterSequence(n) => {
459					self.exec_ddl(services, tx, |s, t| alter_table_sequence(s, t, n.clone()))?
460				}
461				Instruction::AlterPolicy(n) => {
462					self.exec_ddl(services, tx, |s, t| alter_policy(s, t, n.clone()))?
463				}
464
465				Instruction::DropNamespace(n) => {
466					self.exec_ddl(services, tx, |s, t| drop_namespace(s, t, n.clone()))?
467				}
468				Instruction::DropTable(n) => {
469					self.exec_ddl(services, tx, |s, t| drop_table(s, t, n.clone()))?
470				}
471				Instruction::DropView(n) => {
472					self.exec_ddl(services, tx, |s, t| drop_view(s, t, n.clone()))?
473				}
474				Instruction::DropRingBuffer(n) => {
475					self.exec_ddl(services, tx, |s, t| drop_ringbuffer(s, t, n.clone()))?
476				}
477				Instruction::DropSeries(n) => {
478					self.exec_ddl(services, tx, |s, t| drop_series(s, t, n.clone()))?
479				}
480				Instruction::DropDictionary(n) => {
481					self.exec_ddl(services, tx, |s, t| drop_dictionary(s, t, n.clone()))?
482				}
483				Instruction::DropSumType(n) => {
484					self.exec_ddl(services, tx, |s, t| drop_sumtype(s, t, n.clone()))?
485				}
486				Instruction::DropSource(n) => {
487					self.exec_ddl(services, tx, |s, t| drop_source(s, t, n.clone()))?
488				}
489				Instruction::DropSink(n) => {
490					self.exec_ddl(services, tx, |s, t| drop_sink(s, t, n.clone()))?
491				}
492				Instruction::DropProcedure(n) => {
493					self.exec_ddl(services, tx, |s, t| drop_procedure(s, t, n.clone()))?
494				}
495				Instruction::DropHandler(n) => {
496					self.exec_ddl(services, tx, |s, t| drop_handler(s, t, n.clone()))?
497				}
498				Instruction::DropTest(n) => {
499					self.exec_ddl(services, tx, |s, t| drop_test(s, t, n.clone()))?
500				}
501				Instruction::DropBinding(n) => {
502					self.exec_ddl(services, tx, |s, t| drop_binding(s, t, n.clone()))?
503				}
504				Instruction::DropIdentity(n) => {
505					self.exec_ddl(services, tx, |s, t| drop_identity(s, t, n.clone()))?
506				}
507				Instruction::DropRole(n) => {
508					self.exec_ddl(services, tx, |s, t| drop_role(s, t, n.clone()))?
509				}
510				Instruction::DropPolicy(n) => {
511					self.exec_ddl(services, tx, |s, t| drop_policy(s, t, n.clone()))?
512				}
513				Instruction::DropAuthentication(n) => {
514					self.exec_ddl(services, tx, |s, t| drop_authentication(s, t, n.clone()))?
515				}
516
517				Instruction::DropSubscription(n) => {
518					self.exec_ddl_sub(services, tx, |s, t| drop_subscription(s, t, n.clone()))?
519				}
520
521				Instruction::Delete(n) => {
522					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
523						delete(s, t, n.clone(), p, sym)
524					})?
525				}
526				Instruction::DeleteRingBuffer(n) => {
527					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
528						delete_ringbuffer(s, t, n.clone(), p, sym)
529					})?
530				}
531				Instruction::DeleteSeries(n) => {
532					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
533						delete_series(s, t, n.clone(), p, sym)
534					})?
535				}
536				Instruction::Update(n) => {
537					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
538						update_table(s, t, n.clone(), p, sym)
539					})?
540				}
541				Instruction::UpdateRingBuffer(n) => {
542					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
543						update_ringbuffer(s, t, n.clone(), p, sym)
544					})?
545				}
546				Instruction::UpdateSeries(n) => {
547					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
548						update_series(s, t, n.clone(), p, sym)
549					})?
550				}
551				Instruction::InsertTable(n) => {
552					self.exec_dml_with_mut_symbols(services, tx, |s, t, sym| {
553						insert_table(s, t, n.clone(), sym)
554					})?
555				}
556				Instruction::InsertDictionary(n) => {
557					self.exec_dml_with_mut_symbols(services, tx, |s, t, sym| {
558						insert_dictionary(s, t, n.clone(), sym)
559					})?
560				}
561				Instruction::InsertRingBuffer(n) => {
562					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
563						insert_ringbuffer(s, t, n.clone(), p, sym)
564					})?
565				}
566				Instruction::InsertSeries(n) => {
567					self.exec_dml_with_params(services, tx, params, |s, t, p, sym| {
568						insert_series(s, t, n.clone(), p, sym)
569					})?
570				}
571
572				Instruction::Dispatch(n) => self.exec_dispatch(services, tx, n, params)?,
573				Instruction::Migrate(n) => self.exec_migrate(services, tx, n)?,
574				Instruction::RollbackMigration(n) => self.exec_rollback_migration(services, tx, n)?,
575				Instruction::AssertBlock(n) => self.exec_assert_block(services, tx, n)?,
576			}
577
578			self.ip += 1;
579
580			if !self.control_flow.is_normal() {
581				return Ok(());
582			}
583		}
584		Ok(())
585	}
586}