Skip to main content

reifydb_engine/
engine.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	ops::Deref,
6	sync::{
7		Arc,
8		atomic::{AtomicBool, Ordering},
9	},
10	time::Duration,
11};
12
13use reifydb_auth::service::AuthEngine;
14use reifydb_catalog::{
15	catalog::Catalog,
16	interceptor::CatalogCacheInterceptor,
17	vtable::{
18		system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
19		tables::UserVTableDataFunction,
20		user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
21	},
22};
23use reifydb_cdc::{
24	consume::{host::CdcHost, watermark::CdcConsumerWatermark},
25	produce::watermark::CdcProducerWatermark,
26	storage::CdcStore,
27};
28use reifydb_core::{
29	common::CommitVersion,
30	error::diagnostic::{catalog::namespace_not_found, engine::read_only_rejection},
31	event::{Event, EventBus},
32	execution::ExecutionResult,
33	interface::{
34		WithEventBus,
35		catalog::{
36			column::{Column, ColumnIndex},
37			id::ColumnId,
38			vtable::{VTable, VTableId},
39		},
40	},
41	metric::ExecutionMetrics,
42	util::ioc::IocContainer,
43};
44use reifydb_metric::storage::metric::MetricReader;
45use reifydb_runtime::{
46	actor::{mailbox::ActorRef, system::ActorSystem},
47	context::{clock::Clock, rng::Rng},
48};
49use reifydb_store_single::SingleStore;
50use reifydb_transaction::{
51	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
52	multi::{lease::VersionLeaseGuard, transaction::MultiTransaction},
53	single::SingleTransaction,
54	transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
55};
56use reifydb_type::{
57	error::Error,
58	fragment::Fragment,
59	params::Params,
60	value::{constraint::TypeConstraint, identity::IdentityId},
61};
62use tracing::instrument;
63
64use crate::{
65	Result,
66	bulk_insert::builder::{BulkInsertBuilder, Unchecked, Validated},
67	vm::{
68		Admin, Command, Query, Subscription,
69		executor::Executor,
70		services::{EngineConfig, Services},
71	},
72};
73
74pub struct StandardEngine(Arc<Inner>);
75
76impl WithEventBus for StandardEngine {
77	fn event_bus(&self) -> &EventBus {
78		&self.event_bus
79	}
80}
81
82impl AuthEngine for StandardEngine {
83	fn begin_admin(&self) -> Result<AdminTransaction> {
84		StandardEngine::begin_admin(self, IdentityId::system())
85	}
86
87	fn begin_query(&self) -> Result<QueryTransaction> {
88		StandardEngine::begin_query(self, IdentityId::system())
89	}
90
91	fn catalog(&self) -> Catalog {
92		StandardEngine::catalog(self)
93	}
94}
95
96impl StandardEngine {
97	#[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
98	pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
99		let interceptors = self.interceptors.create();
100		let mut txn = CommandTransaction::new(
101			self.multi.clone(),
102			self.single.clone(),
103			self.event_bus.clone(),
104			interceptors,
105			identity,
106			self.executor.runtime_context.clock.clone(),
107		)?;
108		txn.set_executor(Arc::new(self.executor.clone()));
109		Ok(txn)
110	}
111
112	#[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
113	pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
114		let interceptors = self.interceptors.create();
115		let mut txn = AdminTransaction::new(
116			self.multi.clone(),
117			self.single.clone(),
118			self.event_bus.clone(),
119			interceptors,
120			identity,
121			self.executor.runtime_context.clock.clone(),
122		)?;
123		txn.set_executor(Arc::new(self.executor.clone()));
124		Ok(txn)
125	}
126
127	#[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
128	pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
129		let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
130		txn.set_executor(Arc::new(self.executor.clone()));
131		Ok(txn)
132	}
133
134	pub fn clock(&self) -> &Clock {
135		&self.executor.runtime_context.clock
136	}
137
138	pub fn rng(&self) -> &Rng {
139		&self.executor.runtime_context.rng
140	}
141
142	#[instrument(name = "engine::admin_as", level = "debug", skip(self, params), fields(rql = %rql))]
143	pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
144		if let Err(e) = self.reject_if_read_only() {
145			return ExecutionResult {
146				frames: vec![],
147				error: Some(e),
148				metrics: ExecutionMetrics::default(),
149			};
150		}
151		let mut txn = match self.begin_admin(identity) {
152			Ok(t) => t,
153			Err(mut e) => {
154				e.with_rql(rql.to_string());
155				return ExecutionResult {
156					frames: vec![],
157					error: Some(e),
158					metrics: ExecutionMetrics::default(),
159				};
160			}
161		};
162		let mut outcome = self.executor.admin(
163			&mut txn,
164			Admin {
165				rql,
166				params,
167			},
168		);
169		if outcome.is_ok()
170			&& let Err(mut e) = txn.commit()
171		{
172			e.with_rql(rql.to_string());
173			outcome.error = Some(e);
174		}
175		if let Some(ref mut e) = outcome.error {
176			e.with_rql(rql.to_string());
177		}
178		outcome
179	}
180
181	#[instrument(name = "engine::command_as", level = "debug", skip(self, params), fields(rql = %rql))]
182	pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
183		if let Err(e) = self.reject_if_read_only() {
184			return ExecutionResult {
185				frames: vec![],
186				error: Some(e),
187				metrics: ExecutionMetrics::default(),
188			};
189		}
190		let mut txn = match self.begin_command(identity) {
191			Ok(t) => t,
192			Err(mut e) => {
193				e.with_rql(rql.to_string());
194				return ExecutionResult {
195					frames: vec![],
196					error: Some(e),
197					metrics: ExecutionMetrics::default(),
198				};
199			}
200		};
201		let mut outcome = self.executor.command(
202			&mut txn,
203			Command {
204				rql,
205				params,
206			},
207		);
208		if outcome.is_ok()
209			&& let Err(mut e) = txn.commit()
210		{
211			e.with_rql(rql.to_string());
212			outcome.error = Some(e);
213		}
214		if let Some(ref mut e) = outcome.error {
215			e.with_rql(rql.to_string());
216		}
217		outcome
218	}
219
220	#[instrument(name = "engine::query_as", level = "debug", skip(self, params), fields(rql = %rql))]
221	pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
222		let mut txn = match self.begin_query(identity) {
223			Ok(t) => t,
224			Err(mut e) => {
225				e.with_rql(rql.to_string());
226				return ExecutionResult {
227					frames: vec![],
228					error: Some(e),
229					metrics: ExecutionMetrics::default(),
230				};
231			}
232		};
233		let mut outcome = self.executor.query(
234			&mut txn,
235			Query {
236				rql,
237				params,
238			},
239		);
240		if let Some(ref mut e) = outcome.error {
241			e.with_rql(rql.to_string());
242		}
243		outcome
244	}
245
246	#[instrument(name = "engine::query_as_at_version", level = "debug", skip(self, params, lease), fields(rql = %rql, version = %lease.version().0))]
247	pub fn query_as_at_version(
248		&self,
249		identity: IdentityId,
250		rql: &str,
251		params: Params,
252		lease: &VersionLeaseGuard,
253	) -> ExecutionResult {
254		let mut txn = match self.begin_query_at_version(lease, identity) {
255			Ok(t) => t,
256			Err(mut e) => {
257				e.with_rql(rql.to_string());
258				return ExecutionResult {
259					frames: vec![],
260					error: Some(e),
261					metrics: ExecutionMetrics::default(),
262				};
263			}
264		};
265		let mut outcome = self.executor.query(
266			&mut txn,
267			Query {
268				rql,
269				params,
270			},
271		);
272		if let Some(ref mut e) = outcome.error {
273			e.with_rql(rql.to_string());
274		}
275		outcome
276	}
277
278	#[instrument(name = "engine::query_in_txn", level = "debug", skip(self, txn, params), fields(rql = %rql))]
279	pub fn query_in_txn(&self, txn: &mut QueryTransaction, rql: &str, params: Params) -> ExecutionResult {
280		let mut outcome = self.executor.query(
281			txn,
282			Query {
283				rql,
284				params,
285			},
286		);
287		if let Some(ref mut e) = outcome.error {
288			e.with_rql(rql.to_string());
289		}
290		outcome
291	}
292
293	#[instrument(name = "engine::subscribe_as", level = "debug", skip(self, params), fields(rql = %rql))]
294	pub fn subscribe_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
295		let mut txn = match self.begin_query(identity) {
296			Ok(t) => t,
297			Err(mut e) => {
298				e.with_rql(rql.to_string());
299				return ExecutionResult {
300					frames: vec![],
301					error: Some(e),
302					metrics: ExecutionMetrics::default(),
303				};
304			}
305		};
306		let mut outcome = self.executor.subscription(
307			&mut txn,
308			Subscription {
309				rql,
310				params,
311			},
312		);
313		if let Some(ref mut e) = outcome.error {
314			e.with_rql(rql.to_string());
315		}
316		outcome
317	}
318
319	#[instrument(name = "engine::procedure_as", level = "debug", skip(self, params), fields(name = %name))]
320	pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> ExecutionResult {
321		if let Err(e) = self.reject_if_read_only() {
322			return ExecutionResult {
323				frames: vec![],
324				error: Some(e),
325				metrics: ExecutionMetrics::default(),
326			};
327		}
328		let mut txn = match self.begin_command(identity) {
329			Ok(t) => t,
330			Err(e) => {
331				return ExecutionResult {
332					frames: vec![],
333					error: Some(e),
334					metrics: ExecutionMetrics::default(),
335				};
336			}
337		};
338		let mut outcome = self.executor.call_procedure(&mut txn, name, &params);
339		if outcome.is_ok()
340			&& let Err(e) = txn.commit()
341		{
342			outcome.error = Some(e);
343		}
344		outcome
345	}
346
347	pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
348		let catalog = self.catalog();
349
350		let mut qry = self.begin_query(IdentityId::root())?;
351		let ns_def = catalog
352			.find_namespace_by_name(&mut Transaction::Query(&mut qry), namespace)?
353			.ok_or_else(|| Error(Box::new(namespace_not_found(Fragment::None, namespace))))?;
354
355		let table_id = self.executor.virtual_table_registry.allocate_id();
356
357		let table_columns = table.vtable();
358		let columns = convert_vtable_user_columns_to_columns(&table_columns);
359
360		let def = Arc::new(VTable {
361			id: table_id,
362			namespace: ns_def.id(),
363			name: name.to_string(),
364			columns,
365		});
366
367		catalog.register_vtable_user(def.clone())?;
368
369		let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
370
371		let entry = UserVTableEntry {
372			def: def.clone(),
373			data_fn,
374		};
375		self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
376		Ok(table_id)
377	}
378}
379
380impl CdcHost for StandardEngine {
381	fn begin_command(&self) -> Result<CommandTransaction> {
382		StandardEngine::begin_command(self, IdentityId::system())
383	}
384
385	fn begin_query(&self) -> Result<QueryTransaction> {
386		StandardEngine::begin_query(self, IdentityId::system())
387	}
388
389	fn current_version(&self) -> Result<CommitVersion> {
390		StandardEngine::current_version(self)
391	}
392
393	fn done_until(&self) -> CommitVersion {
394		StandardEngine::done_until(self)
395	}
396
397	fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
398		StandardEngine::wait_for_mark_timeout(self, version, timeout)
399	}
400
401	fn catalog(&self) -> &Catalog {
402		&self.catalog
403	}
404}
405
406impl Clone for StandardEngine {
407	fn clone(&self) -> Self {
408		Self(self.0.clone())
409	}
410}
411
412impl Deref for StandardEngine {
413	type Target = Inner;
414
415	fn deref(&self) -> &Self::Target {
416		&self.0
417	}
418}
419
420pub struct Inner {
421	multi: MultiTransaction,
422	single: SingleTransaction,
423	event_bus: EventBus,
424	executor: Executor,
425	interceptors: Arc<InterceptorFactory>,
426	catalog: Catalog,
427	flow_operator_store: SystemFlowOperatorStore,
428	read_only: AtomicBool,
429}
430
431impl StandardEngine {
432	pub fn new(
433		multi: MultiTransaction,
434		single: SingleTransaction,
435		event_bus: EventBus,
436		interceptors: InterceptorFactory,
437		catalog: Catalog,
438		config: EngineConfig,
439	) -> Self {
440		let flow_operator_store = SystemFlowOperatorStore::new();
441		let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
442		event_bus.register(listener);
443
444		let metrics_store = config
445			.ioc
446			.resolve::<SingleStore>()
447			.expect("SingleStore must be registered in IocContainer for metrics");
448		let stats_reader = MetricReader::new(metrics_store);
449
450		let catalog_for_interceptor = catalog.clone();
451		interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
452			interceptors.post_commit.add(Arc::new(CatalogCacheInterceptor::new(&catalog_for_interceptor)));
453		}));
454
455		let interceptors = Arc::new(interceptors);
456
457		Self(Arc::new(Inner {
458			multi,
459			single,
460			event_bus,
461			executor: Executor::new(catalog.clone(), config, flow_operator_store.clone(), stats_reader),
462			interceptors,
463			catalog,
464			flow_operator_store,
465			read_only: AtomicBool::new(false),
466		}))
467	}
468
469	pub fn create_interceptors(&self) -> Interceptors {
470		self.interceptors.create()
471	}
472
473	pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
474		self.interceptors.add_late(factory);
475	}
476
477	#[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self, lease), fields(version = %lease.version().0
478    ))]
479	pub fn begin_query_at_version(
480		&self,
481		lease: &VersionLeaseGuard,
482		identity: IdentityId,
483	) -> Result<QueryTransaction> {
484		let mut txn =
485			QueryTransaction::new(self.multi.begin_query_at_version(lease)?, self.single.clone(), identity);
486		txn.set_executor(Arc::new(self.executor.clone()));
487		Ok(txn)
488	}
489
490	#[instrument(name = "engine::acquire_version_lease", level = "debug", skip(self), fields(version = %version.0))]
491	pub fn acquire_version_lease(&self, version: CommitVersion) -> Result<VersionLeaseGuard> {
492		self.multi.acquire_version_lease(version)
493	}
494
495	#[instrument(name = "engine::acquire_current_snapshot_lease", level = "debug", skip(self))]
496	pub fn acquire_current_snapshot_lease(&self) -> Result<(CommitVersion, VersionLeaseGuard)> {
497		self.multi.acquire_current_snapshot_lease()
498	}
499
500	#[inline]
501	pub fn multi(&self) -> &MultiTransaction {
502		&self.multi
503	}
504
505	#[inline]
506	pub fn multi_owned(&self) -> MultiTransaction {
507		self.multi.clone()
508	}
509
510	#[inline]
511	pub fn actor_system(&self) -> ActorSystem {
512		self.multi.actor_system()
513	}
514
515	#[inline]
516	pub fn single(&self) -> &SingleTransaction {
517		&self.single
518	}
519
520	#[inline]
521	pub fn single_owned(&self) -> SingleTransaction {
522		self.single.clone()
523	}
524
525	#[inline]
526	pub fn emit<E: Event>(&self, event: E) {
527		self.event_bus.emit(event)
528	}
529
530	#[inline]
531	pub fn catalog(&self) -> Catalog {
532		self.catalog.clone()
533	}
534
535	#[inline]
536	pub fn services(&self) -> Arc<Services> {
537		self.executor.services().clone()
538	}
539
540	#[inline]
541	pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
542		&self.flow_operator_store
543	}
544
545	#[inline]
546	pub fn current_version(&self) -> Result<CommitVersion> {
547		self.multi.current_version()
548	}
549
550	#[inline]
551	pub fn done_until(&self) -> CommitVersion {
552		self.multi.done_until()
553	}
554
555	#[inline]
556	pub fn query_done_until(&self) -> CommitVersion {
557		self.multi.query_done_until()
558	}
559
560	#[inline]
561	pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
562		self.multi.wait_for_mark_timeout(version, timeout)
563	}
564
565	#[inline]
566	pub fn executor(&self) -> Executor {
567		self.executor.clone()
568	}
569
570	#[inline]
571	pub fn ioc(&self) -> &IocContainer {
572		&self.executor.ioc
573	}
574
575	#[inline]
576	pub fn cdc_store(&self) -> CdcStore {
577		self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
578	}
579
580	#[inline]
581	pub fn actor<M: 'static>(&self) -> Option<ActorRef<M>>
582	where
583		ActorRef<M>: Send + Sync,
584	{
585		self.executor.ioc.try_resolve::<ActorRef<M>>()
586	}
587
588	#[inline]
589	pub fn cdc_producer_watermark(&self) -> CommitVersion {
590		self.executor
591			.ioc
592			.resolve::<CdcProducerWatermark>()
593			.expect("CdcProducerWatermark must be registered")
594			.get()
595	}
596
597	#[inline]
598	pub fn cdc_consumer_watermark(&self) -> CommitVersion {
599		self.executor.ioc.try_resolve::<CdcConsumerWatermark>().map(|w| w.get()).unwrap_or(CommitVersion(0))
600	}
601
602	pub fn set_read_only(&self) {
603		self.read_only.store(true, Ordering::SeqCst);
604	}
605
606	pub fn is_read_only(&self) -> bool {
607		self.read_only.load(Ordering::SeqCst)
608	}
609
610	pub(crate) fn reject_if_read_only(&self) -> Result<()> {
611		if self.is_read_only() {
612			return Err(Error(Box::new(read_only_rejection(Fragment::None))));
613		}
614		Ok(())
615	}
616
617	pub fn shutdown(&self) {
618		self.interceptors.clear_late();
619		self.executor.ioc.clear();
620	}
621
622	pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
623		BulkInsertBuilder::new(self, identity)
624	}
625
626	pub fn bulk_insert_unchecked<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Unchecked> {
627		BulkInsertBuilder::new_unchecked(self, identity)
628	}
629}
630
631fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
632	columns.iter()
633		.enumerate()
634		.map(|(idx, col)| {
635			let constraint = TypeConstraint::unconstrained(col.data_type.clone());
636			Column {
637				id: ColumnId(idx as u64),
638				name: col.name.clone(),
639				constraint,
640				properties: vec![],
641				index: ColumnIndex(idx as u8),
642				auto_increment: false,
643				dictionary_id: None,
644			}
645		})
646		.collect()
647}