// reifydb_engine/engine.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4use std::{
5	ops::Deref,
6	sync::{
7		Arc,
8		atomic::{AtomicBool, Ordering},
9	},
10	time::Duration,
11};
12
13use reifydb_auth::service::AuthEngine;
14use reifydb_catalog::{
15	catalog::Catalog,
16	interceptor::CatalogCacheInterceptor,
17	vtable::{
18		system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
19		tables::UserVTableDataFunction,
20		user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
21	},
22};
23use reifydb_cdc::{
24	consume::{host::CdcHost, watermark::CdcConsumerWatermark},
25	produce::watermark::CdcProducerWatermark,
26	storage::CdcStore,
27};
28use reifydb_core::{
29	common::CommitVersion,
30	error::diagnostic::engine::read_only_rejection,
31	event::{Event, EventBus},
32	execution::ExecutionResult,
33	interface::{
34		WithEventBus,
35		catalog::{
36			column::{Column, ColumnIndex},
37			id::{ColumnId, NamespaceId},
38			vtable::{VTable, VTableId},
39		},
40	},
41	metric::ExecutionMetrics,
42	util::ioc::IocContainer,
43};
44use reifydb_metric::storage::metric::MetricReader;
45use reifydb_runtime::{
46	actor::{mailbox::ActorRef, system::ActorSystem},
47	context::{clock::Clock, rng::Rng},
48};
49use reifydb_store_single::SingleStore;
50use reifydb_transaction::{
51	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
52	multi::{lease::VersionLeaseGuard, transaction::MultiTransaction},
53	single::SingleTransaction,
54	transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
55};
56use reifydb_value::{
57	error::Error,
58	fragment::Fragment,
59	params::Params,
60	value::{constraint::TypeConstraint, identity::IdentityId},
61};
62use tracing::instrument;
63
64use crate::{
65	Result,
66	bulk_insert::builder::{BulkInsertBuilder, Unchecked, Validated},
67	vm::{
68		Admin, Command, Query, Subscription,
69		executor::Executor,
70		services::{EngineConfig, Services},
71	},
72};
73
74pub struct StandardEngine(Arc<Inner>);
75
76impl WithEventBus for StandardEngine {
77	fn event_bus(&self) -> &EventBus {
78		&self.event_bus
79	}
80}
81
82impl AuthEngine for StandardEngine {
83	fn begin_admin(&self) -> Result<AdminTransaction> {
84		StandardEngine::begin_admin(self, IdentityId::system())
85	}
86
87	fn begin_query(&self) -> Result<QueryTransaction> {
88		StandardEngine::begin_query(self, IdentityId::system())
89	}
90
91	fn catalog(&self) -> Catalog {
92		StandardEngine::catalog(self)
93	}
94}
95
96impl StandardEngine {
97	#[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
98	pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
99		#[cfg(reifydb_assertions)]
100		{
101			assert!(
102				!self.is_read_only(),
103				"begin_command called on a read-only engine: writes are permanently disabled after set_read_only(), so any caller reaching this point has bypassed the reject_if_read_only guard (identity={:?})",
104				identity
105			);
106		}
107		let interceptors = self.interceptors.create();
108		let mut txn = CommandTransaction::new(
109			self.multi.clone(),
110			self.single.clone(),
111			self.event_bus.clone(),
112			interceptors,
113			identity,
114			self.executor.runtime_context.clock.clone(),
115		)?;
116		txn.set_executor(Arc::new(self.executor.clone()));
117		Ok(txn)
118	}
119
120	#[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
121	pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
122		let interceptors = self.interceptors.create();
123		let mut txn = AdminTransaction::new(
124			self.multi.clone(),
125			self.single.clone(),
126			self.event_bus.clone(),
127			interceptors,
128			identity,
129			self.executor.runtime_context.clock.clone(),
130		)?;
131		txn.set_executor(Arc::new(self.executor.clone()));
132		Ok(txn)
133	}
134
135	#[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
136	pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
137		let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
138		txn.set_executor(Arc::new(self.executor.clone()));
139		Ok(txn)
140	}
141
142	pub fn clock(&self) -> &Clock {
143		&self.executor.runtime_context.clock
144	}
145
146	pub fn rng(&self) -> &Rng {
147		&self.executor.runtime_context.rng
148	}
149
150	#[instrument(name = "engine::admin_as", level = "debug", skip(self, params), fields(rql = %rql))]
151	pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
152		if let Err(e) = self.reject_if_read_only() {
153			return ExecutionResult {
154				frames: vec![],
155				error: Some(e),
156				metrics: ExecutionMetrics::default(),
157			};
158		}
159		let mut txn = match self.begin_admin(identity) {
160			Ok(t) => t,
161			Err(mut e) => {
162				e.with_rql(rql.to_string());
163				return ExecutionResult {
164					frames: vec![],
165					error: Some(e),
166					metrics: ExecutionMetrics::default(),
167				};
168			}
169		};
170		let mut outcome = self.executor.admin(
171			&mut txn,
172			Admin {
173				rql,
174				params,
175			},
176		);
177		if outcome.is_ok()
178			&& let Err(mut e) = txn.commit()
179		{
180			e.with_rql(rql.to_string());
181			outcome.error = Some(e);
182		}
183		if let Some(ref mut e) = outcome.error {
184			e.with_rql(rql.to_string());
185		}
186		outcome
187	}
188
189	#[instrument(name = "engine::command_as", level = "debug", skip(self, params), fields(rql = %rql))]
190	pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
191		if let Err(e) = self.reject_if_read_only() {
192			return ExecutionResult {
193				frames: vec![],
194				error: Some(e),
195				metrics: ExecutionMetrics::default(),
196			};
197		}
198		let mut txn = match self.begin_command(identity) {
199			Ok(t) => t,
200			Err(mut e) => {
201				e.with_rql(rql.to_string());
202				return ExecutionResult {
203					frames: vec![],
204					error: Some(e),
205					metrics: ExecutionMetrics::default(),
206				};
207			}
208		};
209		let mut outcome = self.executor.command(
210			&mut txn,
211			Command {
212				rql,
213				params,
214			},
215		);
216		if outcome.is_ok()
217			&& let Err(mut e) = txn.commit()
218		{
219			e.with_rql(rql.to_string());
220			outcome.error = Some(e);
221		}
222		if let Some(ref mut e) = outcome.error {
223			e.with_rql(rql.to_string());
224		}
225		outcome
226	}
227
228	#[instrument(name = "engine::query_as", level = "debug", skip(self, params), fields(rql = %rql))]
229	pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
230		let mut txn = match self.begin_query(identity) {
231			Ok(t) => t,
232			Err(mut e) => {
233				e.with_rql(rql.to_string());
234				return ExecutionResult {
235					frames: vec![],
236					error: Some(e),
237					metrics: ExecutionMetrics::default(),
238				};
239			}
240		};
241		let mut outcome = self.executor.query(
242			&mut txn,
243			Query {
244				rql,
245				params,
246			},
247		);
248		if let Some(ref mut e) = outcome.error {
249			e.with_rql(rql.to_string());
250		}
251		outcome
252	}
253
254	#[instrument(name = "engine::query_as_at_version", level = "debug", skip(self, params, lease), fields(rql = %rql, version = %lease.version().0))]
255	pub fn query_as_at_version(
256		&self,
257		identity: IdentityId,
258		rql: &str,
259		params: Params,
260		lease: &VersionLeaseGuard,
261	) -> ExecutionResult {
262		let mut txn = match self.begin_query_at_version(lease, identity) {
263			Ok(t) => t,
264			Err(mut e) => {
265				e.with_rql(rql.to_string());
266				return ExecutionResult {
267					frames: vec![],
268					error: Some(e),
269					metrics: ExecutionMetrics::default(),
270				};
271			}
272		};
273		let mut outcome = self.executor.query(
274			&mut txn,
275			Query {
276				rql,
277				params,
278			},
279		);
280		if let Some(ref mut e) = outcome.error {
281			e.with_rql(rql.to_string());
282		}
283		outcome
284	}
285
286	#[instrument(name = "engine::query_in_txn", level = "debug", skip(self, txn, params), fields(rql = %rql))]
287	pub fn query_in_txn(&self, txn: &mut QueryTransaction, rql: &str, params: Params) -> ExecutionResult {
288		let mut outcome = self.executor.query(
289			txn,
290			Query {
291				rql,
292				params,
293			},
294		);
295		if let Some(ref mut e) = outcome.error {
296			e.with_rql(rql.to_string());
297		}
298		outcome
299	}
300
301	#[instrument(name = "engine::subscribe_as", level = "debug", skip(self, params), fields(rql = %rql))]
302	pub fn subscribe_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
303		let mut txn = match self.begin_query(identity) {
304			Ok(t) => t,
305			Err(mut e) => {
306				e.with_rql(rql.to_string());
307				return ExecutionResult {
308					frames: vec![],
309					error: Some(e),
310					metrics: ExecutionMetrics::default(),
311				};
312			}
313		};
314		let mut outcome = self.executor.subscription(
315			&mut txn,
316			Subscription {
317				rql,
318				params,
319			},
320		);
321		if let Some(ref mut e) = outcome.error {
322			e.with_rql(rql.to_string());
323		}
324		outcome
325	}
326
327	#[instrument(name = "engine::procedure_as", level = "debug", skip(self, params), fields(name = %name))]
328	pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> ExecutionResult {
329		if let Err(e) = self.reject_if_read_only() {
330			return ExecutionResult {
331				frames: vec![],
332				error: Some(e),
333				metrics: ExecutionMetrics::default(),
334			};
335		}
336		let mut txn = match self.begin_command(identity) {
337			Ok(t) => t,
338			Err(e) => {
339				return ExecutionResult {
340					frames: vec![],
341					error: Some(e),
342					metrics: ExecutionMetrics::default(),
343				};
344			}
345		};
346		let mut outcome = self.executor.call_procedure(&mut txn, name, &params);
347		if outcome.is_ok()
348			&& let Err(e) = txn.commit()
349		{
350			outcome.error = Some(e);
351		}
352		outcome
353	}
354
355	pub fn register_virtual_table<T: UserVTable>(
356		&self,
357		namespace_id: NamespaceId,
358		name: &str,
359		table: T,
360	) -> Result<VTableId> {
361		let catalog = self.catalog();
362		let table_id = self.executor.virtual_table_registry.allocate_id();
363
364		let table_columns = table.vtable();
365		let columns = convert_vtable_user_columns_to_columns(&table_columns);
366
367		let def = Arc::new(VTable {
368			id: table_id,
369			namespace: namespace_id,
370			name: name.to_string(),
371			columns,
372		});
373
374		catalog.register_vtable_user(def.clone())?;
375
376		let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
377
378		let entry = UserVTableEntry {
379			def: def.clone(),
380			data_fn,
381		};
382		self.executor.virtual_table_registry.register(namespace_id, name.to_string(), entry);
383		Ok(table_id)
384	}
385}
386
387impl CdcHost for StandardEngine {
388	fn begin_command(&self) -> Result<CommandTransaction> {
389		StandardEngine::begin_command(self, IdentityId::system())
390	}
391
392	fn begin_query(&self) -> Result<QueryTransaction> {
393		StandardEngine::begin_query(self, IdentityId::system())
394	}
395
396	fn current_version(&self) -> Result<CommitVersion> {
397		StandardEngine::current_version(self)
398	}
399
400	fn done_until(&self) -> CommitVersion {
401		StandardEngine::done_until(self)
402	}
403
404	fn cdc_producer_watermark(&self) -> CommitVersion {
405		StandardEngine::cdc_producer_watermark(self)
406	}
407
408	fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
409		StandardEngine::wait_for_mark_timeout(self, version, timeout)
410	}
411
412	fn catalog(&self) -> &Catalog {
413		&self.catalog
414	}
415}
416
417impl Clone for StandardEngine {
418	fn clone(&self) -> Self {
419		Self(self.0.clone())
420	}
421}
422
423impl Deref for StandardEngine {
424	type Target = Inner;
425
426	fn deref(&self) -> &Self::Target {
427		&self.0
428	}
429}
430
431pub struct Inner {
432	multi: MultiTransaction,
433	single: SingleTransaction,
434	event_bus: EventBus,
435	executor: Executor,
436	interceptors: Arc<InterceptorFactory>,
437	catalog: Catalog,
438	flow_operator_store: SystemFlowOperatorStore,
439	read_only: AtomicBool,
440}
441
442impl StandardEngine {
443	pub fn new(
444		multi: MultiTransaction,
445		single: SingleTransaction,
446		event_bus: EventBus,
447		interceptors: InterceptorFactory,
448		catalog: Catalog,
449		config: EngineConfig,
450	) -> Self {
451		let flow_operator_store = SystemFlowOperatorStore::new();
452		let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
453		event_bus.register(listener);
454
455		let metrics_store = config
456			.ioc
457			.resolve::<SingleStore>()
458			.expect("SingleStore must be registered in IocContainer for metrics");
459		let stats_reader = MetricReader::new(metrics_store);
460
461		let catalog_for_interceptor = catalog.clone();
462		interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
463			interceptors.post_commit.add(Arc::new(CatalogCacheInterceptor::new(&catalog_for_interceptor)));
464		}));
465
466		let interceptors = Arc::new(interceptors);
467
468		Self(Arc::new(Inner {
469			multi,
470			single,
471			event_bus,
472			executor: Executor::new(catalog.clone(), config, flow_operator_store.clone(), stats_reader),
473			interceptors,
474			catalog,
475			flow_operator_store,
476			read_only: AtomicBool::new(false),
477		}))
478	}
479
480	pub fn create_interceptors(&self) -> Interceptors {
481		self.interceptors.create()
482	}
483
484	pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
485		self.interceptors.add_late(factory);
486	}
487
488	#[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self, lease), fields(version = %lease.version().0
489    ))]
490	pub fn begin_query_at_version(
491		&self,
492		lease: &VersionLeaseGuard,
493		identity: IdentityId,
494	) -> Result<QueryTransaction> {
495		let mut txn =
496			QueryTransaction::new(self.multi.begin_query_at_version(lease)?, self.single.clone(), identity);
497		txn.set_executor(Arc::new(self.executor.clone()));
498		Ok(txn)
499	}
500
501	#[instrument(name = "engine::acquire_version_lease", level = "debug", skip(self), fields(version = %version.0))]
502	pub fn acquire_version_lease(&self, version: CommitVersion) -> Result<VersionLeaseGuard> {
503		self.multi.acquire_version_lease(version)
504	}
505
506	#[instrument(name = "engine::acquire_current_snapshot_lease", level = "debug", skip(self))]
507	pub fn acquire_current_snapshot_lease(&self) -> Result<(CommitVersion, VersionLeaseGuard)> {
508		self.multi.acquire_current_snapshot_lease()
509	}
510
511	#[inline]
512	pub fn multi(&self) -> &MultiTransaction {
513		&self.multi
514	}
515
516	#[inline]
517	pub fn multi_owned(&self) -> MultiTransaction {
518		self.multi.clone()
519	}
520
521	#[inline]
522	pub fn actor_system(&self) -> ActorSystem {
523		self.multi.actor_system()
524	}
525
526	#[inline]
527	pub fn single(&self) -> &SingleTransaction {
528		&self.single
529	}
530
531	#[inline]
532	pub fn single_owned(&self) -> SingleTransaction {
533		self.single.clone()
534	}
535
536	#[inline]
537	pub fn emit<E: Event>(&self, event: E) {
538		self.event_bus.emit(event)
539	}
540
541	#[inline]
542	pub fn catalog(&self) -> Catalog {
543		self.catalog.clone()
544	}
545
546	#[inline]
547	pub fn services(&self) -> Arc<Services> {
548		self.executor.services().clone()
549	}
550
551	#[inline]
552	pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
553		&self.flow_operator_store
554	}
555
556	#[inline]
557	pub fn current_version(&self) -> Result<CommitVersion> {
558		self.multi.current_version()
559	}
560
561	#[inline]
562	pub fn done_until(&self) -> CommitVersion {
563		self.multi.done_until()
564	}
565
566	#[inline]
567	pub fn query_done_until(&self) -> CommitVersion {
568		self.multi.query_done_until()
569	}
570
571	#[inline]
572	pub fn oracle_window_count(&self) -> usize {
573		self.multi.oracle_window_count()
574	}
575
576	#[inline]
577	pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
578		self.multi.wait_for_mark_timeout(version, timeout)
579	}
580
581	#[inline]
582	pub fn executor(&self) -> Executor {
583		self.executor.clone()
584	}
585
586	#[inline]
587	pub fn ioc(&self) -> &IocContainer {
588		&self.executor.ioc
589	}
590
591	#[inline]
592	pub fn cdc_store(&self) -> CdcStore {
593		self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
594	}
595
596	#[inline]
597	pub fn actor<M: 'static>(&self) -> Option<ActorRef<M>>
598	where
599		ActorRef<M>: Send + Sync,
600	{
601		self.executor.ioc.try_resolve::<ActorRef<M>>()
602	}
603
604	#[inline]
605	pub fn cdc_producer_watermark(&self) -> CommitVersion {
606		self.executor.ioc.try_resolve::<CdcProducerWatermark>().map(|w| w.get()).unwrap_or(CommitVersion(0))
607	}
608
609	#[inline]
610	pub fn cdc_consumer_watermark(&self) -> CommitVersion {
611		self.executor.ioc.try_resolve::<CdcConsumerWatermark>().map(|w| w.get()).unwrap_or(CommitVersion(0))
612	}
613
614	pub fn set_read_only(&self) {
615		self.read_only.store(true, Ordering::SeqCst);
616	}
617
618	pub fn is_read_only(&self) -> bool {
619		self.read_only.load(Ordering::SeqCst)
620	}
621
622	pub(crate) fn reject_if_read_only(&self) -> Result<()> {
623		if self.is_read_only() {
624			return Err(Error(Box::new(read_only_rejection(Fragment::None))));
625		}
626		Ok(())
627	}
628
629	pub fn shutdown(&self) {
630		self.interceptors.clear_late();
631		self.executor.ioc.clear();
632	}
633
634	pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
635		BulkInsertBuilder::new(self, identity)
636	}
637
638	pub fn bulk_insert_unchecked<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Unchecked> {
639		BulkInsertBuilder::new_unchecked(self, identity)
640	}
641}
642
643fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
644	columns.iter()
645		.enumerate()
646		.map(|(idx, col)| {
647			let constraint = TypeConstraint::unconstrained(col.data_type.clone());
648			Column {
649				id: ColumnId(idx as u64),
650				name: col.name.clone(),
651				constraint,
652				properties: vec![],
653				index: ColumnIndex(idx as u8),
654				auto_increment: false,
655				dictionary_id: None,
656			}
657		})
658		.collect()
659}