Skip to main content

reifydb_engine/
engine.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	ops::Deref,
6	sync::{
7		Arc,
8		atomic::{AtomicBool, Ordering},
9	},
10	time::Duration,
11};
12
13use reifydb_auth::service::AuthEngine;
14use reifydb_catalog::{
15	catalog::Catalog,
16	materialized::MaterializedCatalog,
17	vtable::{
18		system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
19		tables::UserVTableDataFunction,
20		user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
21	},
22};
23use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
24use reifydb_core::{
25	common::CommitVersion,
26	error::diagnostic::{catalog::namespace_not_found, engine::read_only_rejection},
27	event::{Event, EventBus},
28	execution::ExecutionResult,
29	interface::{
30		WithEventBus,
31		catalog::{
32			column::{Column, ColumnIndex},
33			id::ColumnId,
34			vtable::{VTable, VTableId},
35		},
36	},
37	metric::ExecutionMetrics,
38};
39use reifydb_metric::storage::metric::MetricReader;
40use reifydb_runtime::{
41	actor::system::ActorSystem,
42	context::{clock::Clock, rng::Rng},
43};
44use reifydb_store_single::SingleStore;
45use reifydb_transaction::{
46	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
47	multi::transaction::MultiTransaction,
48	single::SingleTransaction,
49	transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
50};
51use reifydb_type::{
52	error::Error,
53	fragment::Fragment,
54	params::Params,
55	value::{constraint::TypeConstraint, identity::IdentityId},
56};
57use tracing::instrument;
58
59use crate::{
60	Result,
61	bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
62	interceptor::catalog::MaterializedCatalogInterceptor,
63	vm::{Admin, Command, Query, Subscription, executor::Executor, services::EngineConfig},
64};
65
/// Engine handle: a newtype around `Arc<Inner>`, so the engine state lives behind a
/// reference-counted pointer.
pub struct StandardEngine(Arc<Inner>);
67
68impl WithEventBus for StandardEngine {
69	fn event_bus(&self) -> &EventBus {
70		&self.event_bus
71	}
72}
73
74impl AuthEngine for StandardEngine {
75	fn begin_admin(&self) -> Result<AdminTransaction> {
76		StandardEngine::begin_admin(self, IdentityId::system())
77	}
78
79	fn begin_query(&self) -> Result<QueryTransaction> {
80		StandardEngine::begin_query(self, IdentityId::system())
81	}
82
83	fn catalog(&self) -> Catalog {
84		StandardEngine::catalog(self)
85	}
86}
87
88// Engine methods (formerly from Engine trait in reifydb-core)
89impl StandardEngine {
90	#[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
91	pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
92		let interceptors = self.interceptors.create();
93		let mut txn = CommandTransaction::new(
94			self.multi.clone(),
95			self.single.clone(),
96			self.event_bus.clone(),
97			interceptors,
98			identity,
99			self.executor.runtime_context.clock.clone(),
100		)?;
101		txn.set_executor(Arc::new(self.executor.clone()));
102		Ok(txn)
103	}
104
105	#[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
106	pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
107		let interceptors = self.interceptors.create();
108		let mut txn = AdminTransaction::new(
109			self.multi.clone(),
110			self.single.clone(),
111			self.event_bus.clone(),
112			interceptors,
113			identity,
114			self.executor.runtime_context.clock.clone(),
115		)?;
116		txn.set_executor(Arc::new(self.executor.clone()));
117		Ok(txn)
118	}
119
120	#[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
121	pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
122		let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
123		txn.set_executor(Arc::new(self.executor.clone()));
124		Ok(txn)
125	}
126
127	/// Get the runtime clock for timestamp operations.
128	pub fn clock(&self) -> &Clock {
129		&self.executor.runtime_context.clock
130	}
131
132	pub fn rng(&self) -> &Rng {
133		&self.executor.runtime_context.rng
134	}
135
136	#[instrument(name = "engine::admin_as", level = "debug", skip(self, params), fields(rql = %rql))]
137	pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
138		if let Err(e) = self.reject_if_read_only() {
139			return ExecutionResult {
140				frames: vec![],
141				error: Some(e),
142				metrics: ExecutionMetrics::default(),
143			};
144		}
145		let mut txn = match self.begin_admin(identity) {
146			Ok(t) => t,
147			Err(mut e) => {
148				e.with_rql(rql.to_string());
149				return ExecutionResult {
150					frames: vec![],
151					error: Some(e),
152					metrics: ExecutionMetrics::default(),
153				};
154			}
155		};
156		let mut outcome = self.executor.admin(
157			&mut txn,
158			Admin {
159				rql,
160				params,
161			},
162		);
163		if outcome.is_ok()
164			&& let Err(mut e) = txn.commit()
165		{
166			e.with_rql(rql.to_string());
167			outcome.error = Some(e);
168		}
169		if let Some(ref mut e) = outcome.error {
170			e.with_rql(rql.to_string());
171		}
172		outcome
173	}
174
175	#[instrument(name = "engine::command_as", level = "debug", skip(self, params), fields(rql = %rql))]
176	pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
177		if let Err(e) = self.reject_if_read_only() {
178			return ExecutionResult {
179				frames: vec![],
180				error: Some(e),
181				metrics: ExecutionMetrics::default(),
182			};
183		}
184		let mut txn = match self.begin_command(identity) {
185			Ok(t) => t,
186			Err(mut e) => {
187				e.with_rql(rql.to_string());
188				return ExecutionResult {
189					frames: vec![],
190					error: Some(e),
191					metrics: ExecutionMetrics::default(),
192				};
193			}
194		};
195		let mut outcome = self.executor.command(
196			&mut txn,
197			Command {
198				rql,
199				params,
200			},
201		);
202		if outcome.is_ok()
203			&& let Err(mut e) = txn.commit()
204		{
205			e.with_rql(rql.to_string());
206			outcome.error = Some(e);
207		}
208		if let Some(ref mut e) = outcome.error {
209			e.with_rql(rql.to_string());
210		}
211		outcome
212	}
213
214	#[instrument(name = "engine::query_as", level = "debug", skip(self, params), fields(rql = %rql))]
215	pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
216		let mut txn = match self.begin_query(identity) {
217			Ok(t) => t,
218			Err(mut e) => {
219				e.with_rql(rql.to_string());
220				return ExecutionResult {
221					frames: vec![],
222					error: Some(e),
223					metrics: ExecutionMetrics::default(),
224				};
225			}
226		};
227		let mut outcome = self.executor.query(
228			&mut txn,
229			Query {
230				rql,
231				params,
232			},
233		);
234		if let Some(ref mut e) = outcome.error {
235			e.with_rql(rql.to_string());
236		}
237		outcome
238	}
239
240	#[instrument(name = "engine::subscribe_as", level = "debug", skip(self, params), fields(rql = %rql))]
241	pub fn subscribe_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
242		let mut txn = match self.begin_query(identity) {
243			Ok(t) => t,
244			Err(mut e) => {
245				e.with_rql(rql.to_string());
246				return ExecutionResult {
247					frames: vec![],
248					error: Some(e),
249					metrics: ExecutionMetrics::default(),
250				};
251			}
252		};
253		let mut outcome = self.executor.subscription(
254			&mut txn,
255			Subscription {
256				rql,
257				params,
258			},
259		);
260		if let Some(ref mut e) = outcome.error {
261			e.with_rql(rql.to_string());
262		}
263		outcome
264	}
265
266	/// Call a procedure by fully-qualified name.
267	#[instrument(name = "engine::procedure_as", level = "debug", skip(self, params), fields(name = %name))]
268	pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> ExecutionResult {
269		if let Err(e) = self.reject_if_read_only() {
270			return ExecutionResult {
271				frames: vec![],
272				error: Some(e),
273				metrics: ExecutionMetrics::default(),
274			};
275		}
276		let mut txn = match self.begin_command(identity) {
277			Ok(t) => t,
278			Err(e) => {
279				return ExecutionResult {
280					frames: vec![],
281					error: Some(e),
282					metrics: ExecutionMetrics::default(),
283				};
284			}
285		};
286		let mut outcome = self.executor.call_procedure(&mut txn, name, &params);
287		if outcome.is_ok()
288			&& let Err(e) = txn.commit()
289		{
290			outcome.error = Some(e);
291		}
292		outcome
293	}
294
295	/// Register a user-defined virtual table.
296	///
297	/// The virtual table will be available for queries using the given namespace and name.
298	///
299	/// # Arguments
300	///
301	/// * `namespace` - The namespace name (e.g., "default", "my_namespace")
302	/// * `name` - The table name
303	/// * `table` - The virtual table implementation
304	///
305	/// # Returns
306	///
307	/// The assigned `VTableId` on success.
308	///
309	/// # Example
310	///
311	/// ```ignore
312	/// use reifydb_engine::vtable::{UserVTable, UserVTableColumn};
313	/// use reifydb_type::value::r#type::Type;
314	/// use reifydb_core::value::Columns;
315	///
316	/// #[derive(Clone)]
317	/// struct MyTable;
318	///
319	/// impl UserVTable for MyTable {
320	///     fn definition(&self) -> Vec<UserVTableColumn> {
321	///         vec![UserVTableColumn::new("id", Type::Uint8)]
322	///     }
323	///     fn get(&self) -> Columns {
324	///         // Return column-oriented data
325	///         Columns::empty()
326	///     }
327	/// }
328	///
329	/// let id = engine.register_virtual_table("default", "my_table", MyTable)?;
330	/// ```
331	pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
332		let catalog = self.materialized_catalog();
333
334		// Look up namespace by name (use max u64 to get latest version)
335		let ns_def = catalog
336			.find_namespace_by_name(namespace)
337			.ok_or_else(|| Error(Box::new(namespace_not_found(Fragment::None, namespace))))?;
338
339		// Allocate a new table ID
340		let table_id = self.executor.virtual_table_registry.allocate_id();
341		// Convert user column definitions to internal column definitions
342		let table_columns = table.vtable();
343		let columns = convert_vtable_user_columns_to_columns(&table_columns);
344
345		// Create the table definition
346		let def = Arc::new(VTable {
347			id: table_id,
348			namespace: ns_def.id(),
349			name: name.to_string(),
350			columns,
351		});
352
353		// Register in catalog (for resolver lookups)
354		catalog.register_vtable_user(def.clone())?;
355		// Create the data function from the UserVTable trait
356		let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
357		// Create and register the entry
358		let entry = UserVTableEntry {
359			def: def.clone(),
360			data_fn,
361		};
362		self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
363		Ok(table_id)
364	}
365}
366
367impl CdcHost for StandardEngine {
368	fn begin_command(&self) -> Result<CommandTransaction> {
369		StandardEngine::begin_command(self, IdentityId::system())
370	}
371
372	fn begin_query(&self) -> Result<QueryTransaction> {
373		StandardEngine::begin_query(self, IdentityId::system())
374	}
375
376	fn current_version(&self) -> Result<CommitVersion> {
377		StandardEngine::current_version(self)
378	}
379
380	fn done_until(&self) -> CommitVersion {
381		StandardEngine::done_until(self)
382	}
383
384	fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
385		StandardEngine::wait_for_mark_timeout(self, version, timeout)
386	}
387
388	fn materialized_catalog(&self) -> &MaterializedCatalog {
389		&self.catalog.materialized
390	}
391}
392
393impl Clone for StandardEngine {
394	fn clone(&self) -> Self {
395		Self(self.0.clone())
396	}
397}
398
399impl Deref for StandardEngine {
400	type Target = Inner;
401
402	fn deref(&self) -> &Self::Target {
403		&self.0
404	}
405}
406
/// Engine state that `StandardEngine` keeps behind an `Arc` and dereferences to.
pub struct Inner {
	/// Multi transaction layer; used to begin query transactions and to read version/watermark state.
	multi: MultiTransaction,
	/// Single transaction layer; a clone is handed to every transaction the engine creates.
	single: SingleTransaction,
	/// Event bus; cloned into command/admin transactions and used by `emit`.
	event_bus: EventBus,
	/// Executor that runs admin, command, query, subscription and procedure requests.
	executor: Executor,
	/// Factory that creates the interceptor set for each command/admin transaction.
	interceptors: Arc<InterceptorFactory>,
	/// Catalog; also gives access to the materialized catalog.
	catalog: Catalog,
	/// Flow-operator store created in `StandardEngine::new` and also passed to the executor.
	flow_operator_store: SystemFlowOperatorStore,
	/// Read-only flag; set by `set_read_only` and checked by `reject_if_read_only`.
	read_only: AtomicBool,
}
417
418impl StandardEngine {
419	pub fn new(
420		multi: MultiTransaction,
421		single: SingleTransaction,
422		event_bus: EventBus,
423		interceptors: InterceptorFactory,
424		catalog: Catalog,
425		config: EngineConfig,
426	) -> Self {
427		let flow_operator_store = SystemFlowOperatorStore::new();
428		let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
429		event_bus.register(listener);
430
431		// Get the metrics store from IoC to create the stats reader
432		let metrics_store = config
433			.ioc
434			.resolve::<SingleStore>()
435			.expect("SingleStore must be registered in IocContainer for metrics");
436		let stats_reader = MetricReader::new(metrics_store);
437
438		// Register MaterializedCatalogInterceptor as a factory function.
439		let materialized = catalog.materialized.clone();
440		interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
441			interceptors
442				.post_commit
443				.add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
444		}));
445
446		let interceptors = Arc::new(interceptors);
447
448		Self(Arc::new(Inner {
449			multi,
450			single,
451			event_bus,
452			executor: Executor::new(catalog.clone(), config, flow_operator_store.clone(), stats_reader),
453			interceptors,
454			catalog,
455			flow_operator_store,
456			read_only: AtomicBool::new(false),
457		}))
458	}
459
460	/// Create a new set of interceptors from the factory.
461	pub fn create_interceptors(&self) -> Interceptors {
462		self.interceptors.create()
463	}
464
465	/// Register an additional interceptor factory function.
466	///
467	/// The function will be called on every `create()` to augment the base interceptors.
468	/// This is thread-safe and can be called after the engine is constructed (e.g. by subsystems).
469	pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
470		self.interceptors.add_late(factory);
471	}
472
473	/// Begin a query transaction at a specific version.
474	///
475	/// This is used for parallel query execution where multiple tasks need to
476	/// read from the same snapshot (same CommitVersion) for consistency.
477	#[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
478    ))]
479	pub fn begin_query_at_version(&self, version: CommitVersion, identity: IdentityId) -> Result<QueryTransaction> {
480		let mut txn = QueryTransaction::new(
481			self.multi.begin_query_at_version(version)?,
482			self.single.clone(),
483			identity,
484		);
485		txn.set_executor(Arc::new(self.executor.clone()));
486		Ok(txn)
487	}
488
489	#[inline]
490	pub fn multi(&self) -> &MultiTransaction {
491		&self.multi
492	}
493
494	#[inline]
495	pub fn multi_owned(&self) -> MultiTransaction {
496		self.multi.clone()
497	}
498
499	/// Get the actor system
500	#[inline]
501	pub fn actor_system(&self) -> ActorSystem {
502		self.multi.actor_system()
503	}
504
505	#[inline]
506	pub fn single(&self) -> &SingleTransaction {
507		&self.single
508	}
509
510	#[inline]
511	pub fn single_owned(&self) -> SingleTransaction {
512		self.single.clone()
513	}
514
515	#[inline]
516	pub fn emit<E: Event>(&self, event: E) {
517		self.event_bus.emit(event)
518	}
519
520	#[inline]
521	pub fn materialized_catalog(&self) -> &MaterializedCatalog {
522		&self.catalog.materialized
523	}
524
525	/// Returns a `Catalog` instance for catalog lookups.
526	/// The Catalog provides three-tier lookup methods that check transactional changes,
527	/// then MaterializedCatalog, then fall back to storage.
528	#[inline]
529	pub fn catalog(&self) -> Catalog {
530		self.catalog.clone()
531	}
532
533	#[inline]
534	pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
535		&self.flow_operator_store
536	}
537
538	/// Get the current version from the transaction manager
539	#[inline]
540	pub fn current_version(&self) -> Result<CommitVersion> {
541		self.multi.current_version()
542	}
543
544	/// Returns the highest version where ALL prior versions have completed.
545	/// This is useful for CDC polling to know the safe upper bound for fetching
546	/// CDC events - all events up to this version are guaranteed to be in storage.
547	#[inline]
548	pub fn done_until(&self) -> CommitVersion {
549		self.multi.done_until()
550	}
551
552	/// Wait for the watermark to reach the given version with a timeout.
553	/// Returns true if the watermark reached the target, false if timeout occurred.
554	#[inline]
555	pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
556		self.multi.wait_for_mark_timeout(version, timeout)
557	}
558
559	#[inline]
560	pub fn executor(&self) -> Executor {
561		self.executor.clone()
562	}
563
564	/// Get the CDC store from the IoC container.
565	///
566	/// Returns the CdcStore that was registered during engine construction.
567	/// Panics if CdcStore was not registered.
568	#[inline]
569	pub fn cdc_store(&self) -> CdcStore {
570		self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
571	}
572
573	/// Mark this engine as read-only (replica mode).
574	/// Once set, all write-path methods will return ENG_007 immediately.
575	pub fn set_read_only(&self) {
576		self.read_only.store(true, Ordering::SeqCst);
577	}
578
579	/// Whether this engine is in read-only (replica) mode.
580	pub fn is_read_only(&self) -> bool {
581		self.read_only.load(Ordering::SeqCst)
582	}
583
584	pub(crate) fn reject_if_read_only(&self) -> Result<()> {
585		if self.is_read_only() {
586			return Err(Error(Box::new(read_only_rejection(Fragment::None))));
587		}
588		Ok(())
589	}
590
591	pub fn shutdown(&self) {
592		self.interceptors.clear_late();
593		self.executor.ioc.clear();
594	}
595
596	/// Start a bulk insert operation with full validation.
597	///
598	/// This provides a fluent API for fast bulk inserts that bypasses RQL parsing.
599	/// All inserts within a single builder execute in one transaction.
600	///
601	/// # Example
602	///
603	/// ```ignore
604	/// use reifydb_type::params;
605	///
606	/// engine.bulk_insert(&identity)
607	///     .table("namespace.users")
608	///         .row(params!{ id: 1, name: "Alice" })
609	///         .row(params!{ id: 2, name: "Bob" })
610	///         .done()
611	///     .execute()?;
612	/// ```
613	pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
614		BulkInsertBuilder::new(self, identity)
615	}
616
617	/// Start a bulk insert operation with validation disabled (trusted mode).
618	///
619	/// Use this for pre-validated internal data where constraint validation
620	/// can be skipped for maximum performance.
621	///
622	/// # Safety
623	///
624	/// The caller is responsible for ensuring the data conforms to the
625	/// shape constraints. Invalid data may cause undefined behavior.
626	pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
627		BulkInsertBuilder::new_trusted(self, identity)
628	}
629}
630
631/// Convert user column definitions to internal Column format.
632fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
633	columns.iter()
634		.enumerate()
635		.map(|(idx, col)| {
636			// Note: For virtual tables, we use unconstrained for all types.
637			// The nullable field is still available for documentation purposes.
638			let constraint = TypeConstraint::unconstrained(col.data_type.clone());
639			Column {
640				id: ColumnId(idx as u64),
641				name: col.name.clone(),
642				constraint,
643				properties: vec![],
644				index: ColumnIndex(idx as u8),
645				auto_increment: false,
646				dictionary_id: None,
647			}
648		})
649		.collect()
650}