Skip to main content

reifydb_engine/
engine.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	ops::Deref,
6	sync::{
7		Arc,
8		atomic::{AtomicBool, Ordering},
9	},
10	time::Duration,
11};
12
13use reifydb_auth::service::AuthEngine;
14use reifydb_catalog::{
15	catalog::Catalog,
16	materialized::MaterializedCatalog,
17	vtable::{
18		system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
19		tables::UserVTableDataFunction,
20		user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
21	},
22};
23use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
24use reifydb_core::{
25	common::CommitVersion,
26	error::diagnostic::{catalog::namespace_not_found, engine::read_only_rejection},
27	event::{Event, EventBus},
28	interface::{
29		WithEventBus,
30		catalog::{
31			column::{Column, ColumnIndex},
32			id::ColumnId,
33			vtable::{VTable, VTableId},
34		},
35	},
36};
37use reifydb_metric_old::metric::MetricReader;
38use reifydb_runtime::{actor::system::ActorSystem, context::clock::Clock};
39use reifydb_store_single::SingleStore;
40use reifydb_transaction::{
41	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
42	multi::transaction::MultiTransaction,
43	single::SingleTransaction,
44	transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
45};
46use reifydb_type::{
47	error::Error,
48	fragment::Fragment,
49	params::Params,
50	value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId},
51};
52use tracing::instrument;
53
54use crate::{
55	Result,
56	bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
57	interceptor::catalog::MaterializedCatalogInterceptor,
58	vm::{Admin, Command, Query, Subscription, executor::Executor, services::EngineConfig},
59};
60
/// Handle to the engine. All state lives in a reference-counted [`Inner`], so the handle
/// itself is only a pointer to that shared state.
pub struct StandardEngine(Arc<Inner>);
62
63impl WithEventBus for StandardEngine {
64	fn event_bus(&self) -> &EventBus {
65		&self.event_bus
66	}
67}
68
/// `AuthEngine` adapter: each method forwards to the inherent `StandardEngine` method of the
/// same name, supplying `IdentityId::system()` where the inherent method expects an identity.
impl AuthEngine for StandardEngine {
	/// Begins an admin transaction under `IdentityId::system()`.
	fn begin_admin(&self) -> Result<AdminTransaction> {
		// The qualified path resolves to the inherent method (which takes an identity),
		// not to this trait method.
		StandardEngine::begin_admin(self, IdentityId::system())
	}

	/// Begins a query transaction under `IdentityId::system()`.
	fn begin_query(&self) -> Result<QueryTransaction> {
		StandardEngine::begin_query(self, IdentityId::system())
	}

	/// Returns the engine's `Catalog` via the inherent `StandardEngine::catalog`.
	fn catalog(&self) -> Catalog {
		// Inherent associated functions take precedence in path resolution, so this does not recurse.
		StandardEngine::catalog(self)
	}
}
82
83// Engine methods (formerly from Engine trait in reifydb-core)
84impl StandardEngine {
85	#[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
86	pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
87		let interceptors = self.interceptors.create();
88		let mut txn = CommandTransaction::new(
89			self.multi.clone(),
90			self.single.clone(),
91			self.event_bus.clone(),
92			interceptors,
93			identity,
94			self.executor.runtime_context.clock.clone(),
95		)?;
96		txn.set_executor(Arc::new(self.executor.clone()));
97		Ok(txn)
98	}
99
100	#[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
101	pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
102		let interceptors = self.interceptors.create();
103		let mut txn = AdminTransaction::new(
104			self.multi.clone(),
105			self.single.clone(),
106			self.event_bus.clone(),
107			interceptors,
108			identity,
109			self.executor.runtime_context.clock.clone(),
110		)?;
111		txn.set_executor(Arc::new(self.executor.clone()));
112		Ok(txn)
113	}
114
115	#[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
116	pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
117		let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
118		txn.set_executor(Arc::new(self.executor.clone()));
119		Ok(txn)
120	}
121
122	/// Get the runtime clock for timestamp operations.
123	pub fn clock(&self) -> &Clock {
124		&self.executor.runtime_context.clock
125	}
126
127	#[instrument(name = "engine::admin_as", level = "debug", skip(self, params), fields(rql = %rql))]
128	pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
129		self.reject_if_read_only()?;
130		(|| {
131			let mut txn = self.begin_admin(identity)?;
132			let frames = self.executor.admin(
133				&mut txn,
134				Admin {
135					rql,
136					params,
137				},
138			)?;
139			txn.commit()?;
140			Ok(frames)
141		})()
142		.map_err(|mut err: Error| {
143			err.with_statement(rql.to_string());
144			err
145		})
146	}
147
148	#[instrument(name = "engine::command_as", level = "debug", skip(self, params), fields(rql = %rql))]
149	pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
150		self.reject_if_read_only()?;
151		(|| {
152			let mut txn = self.begin_command(identity)?;
153			let frames = self.executor.command(
154				&mut txn,
155				Command {
156					rql,
157					params,
158				},
159			)?;
160			txn.commit()?;
161			Ok(frames)
162		})()
163		.map_err(|mut err: Error| {
164			err.with_statement(rql.to_string());
165			err
166		})
167	}
168
169	#[instrument(name = "engine::query_as", level = "debug", skip(self, params), fields(rql = %rql))]
170	pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
171		(|| {
172			let mut txn = self.begin_query(identity)?;
173			self.executor.query(
174				&mut txn,
175				Query {
176					rql,
177					params,
178				},
179			)
180		})()
181		.map_err(|mut err: Error| {
182			err.with_statement(rql.to_string());
183			err
184		})
185	}
186
187	#[instrument(name = "engine::subscribe_as", level = "debug", skip(self, params), fields(rql = %rql))]
188	pub fn subscribe_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
189		(|| {
190			let mut txn = self.begin_query(identity)?;
191			let frames = self.executor.subscription(
192				&mut txn,
193				Subscription {
194					rql,
195					params,
196				},
197			)?;
198			Ok(frames)
199		})()
200		.map_err(|mut err: Error| {
201			err.with_statement(rql.to_string());
202			err
203		})
204	}
205
206	/// Call a procedure by fully-qualified name.
207	#[instrument(name = "engine::procedure_as", level = "debug", skip(self, params), fields(name = %name))]
208	pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> Result<Vec<Frame>> {
209		self.reject_if_read_only()?;
210		let mut txn = self.begin_command(identity)?;
211		let frames = self.executor.call_procedure(&mut txn, name, &params)?;
212		txn.commit()?;
213		Ok(frames)
214	}
215
216	/// Register a user-defined virtual table.
217	///
218	/// The virtual table will be available for queries using the given namespace and name.
219	///
220	/// # Arguments
221	///
222	/// * `namespace` - The namespace name (e.g., "default", "my_namespace")
223	/// * `name` - The table name
224	/// * `table` - The virtual table implementation
225	///
226	/// # Returns
227	///
228	/// The assigned `VTableId` on success.
229	///
230	/// # Example
231	///
232	/// ```ignore
233	/// use reifydb_engine::vtable::{UserVTable, UserVTableColumn};
234	/// use reifydb_type::value::r#type::Type;
235	/// use reifydb_core::value::Columns;
236	///
237	/// #[derive(Clone)]
238	/// struct MyTable;
239	///
240	/// impl UserVTable for MyTable {
241	///     fn definition(&self) -> Vec<UserVTableColumn> {
242	///         vec![UserVTableColumn::new("id", Type::Uint8)]
243	///     }
244	///     fn get(&self) -> Columns {
245	///         // Return column-oriented data
246	///         Columns::empty()
247	///     }
248	/// }
249	///
250	/// let id = engine.register_virtual_table("default", "my_table", MyTable)?;
251	/// ```
252	pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
253		let catalog = self.materialized_catalog();
254
255		// Look up namespace by name (use max u64 to get latest version)
256		let ns_def = catalog
257			.find_namespace_by_name(namespace)
258			.ok_or_else(|| Error(Box::new(namespace_not_found(Fragment::None, namespace))))?;
259
260		// Allocate a new table ID
261		let table_id = self.executor.virtual_table_registry.allocate_id();
262		// Convert user column definitions to internal column definitions
263		let table_columns = table.vtable();
264		let columns = convert_vtable_user_columns_to_columns(&table_columns);
265
266		// Create the table definition
267		let def = Arc::new(VTable {
268			id: table_id,
269			namespace: ns_def.id(),
270			name: name.to_string(),
271			columns,
272		});
273
274		// Register in catalog (for resolver lookups)
275		catalog.register_vtable_user(def.clone())?;
276		// Create the data function from the UserVTable trait
277		let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
278		// Create and register the entry
279		let entry = UserVTableEntry {
280			def: def.clone(),
281			data_fn,
282		};
283		self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
284		Ok(table_id)
285	}
286}
287
/// `CdcHost` adapter: each method forwards to the inherent `StandardEngine` method of the same
/// name, supplying `IdentityId::system()` where the inherent method expects an identity.
impl CdcHost for StandardEngine {
	/// Begins a command transaction under `IdentityId::system()`.
	fn begin_command(&self) -> Result<CommandTransaction> {
		// The qualified path resolves to the inherent method (which takes an identity),
		// not to this trait method.
		StandardEngine::begin_command(self, IdentityId::system())
	}

	/// Begins a query transaction under `IdentityId::system()`.
	fn begin_query(&self) -> Result<QueryTransaction> {
		StandardEngine::begin_query(self, IdentityId::system())
	}

	/// Forwards to the inherent `StandardEngine::current_version`.
	fn current_version(&self) -> Result<CommitVersion> {
		StandardEngine::current_version(self)
	}

	/// Forwards to the inherent `StandardEngine::done_until`.
	fn done_until(&self) -> CommitVersion {
		StandardEngine::done_until(self)
	}

	/// Forwards to the inherent `StandardEngine::wait_for_mark_timeout`.
	fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
		StandardEngine::wait_for_mark_timeout(self, version, timeout)
	}

	/// Borrows the materialized catalog held by the engine's `Catalog`.
	fn materialized_catalog(&self) -> &MaterializedCatalog {
		&self.catalog.materialized
	}
}
313
314impl Clone for StandardEngine {
315	fn clone(&self) -> Self {
316		Self(self.0.clone())
317	}
318}
319
320impl Deref for StandardEngine {
321	type Target = Inner;
322
323	fn deref(&self) -> &Self::Target {
324		&self.0
325	}
326}
327
/// Shared state behind every `StandardEngine` handle.
pub struct Inner {
	/// Multi-version transaction layer; begins query transactions and answers version/watermark queries.
	multi: MultiTransaction,
	/// Single-version transaction layer; a clone is handed to every transaction the engine begins.
	single: SingleTransaction,
	/// Event bus used by `emit` and passed to command/admin transactions.
	event_bus: EventBus,
	/// Executes admin, command, query, subscription and procedure calls; also holds the runtime
	/// clock, the IoC container and the virtual table registry used by this file.
	executor: Executor,
	/// Factory that creates a fresh `Interceptors` set for each command/admin transaction.
	interceptors: Arc<InterceptorFactory>,
	/// Catalog handle; its `materialized` field is the `MaterializedCatalog` exposed by the engine.
	catalog: Catalog,
	/// Store shared with the `SystemFlowOperatorEventListener` registered in `new` and with the executor.
	flow_operator_store: SystemFlowOperatorStore,
	/// Read-only flag; set by `set_read_only` and checked by `reject_if_read_only`.
	read_only: AtomicBool,
}
338
339impl StandardEngine {
340	pub fn new(
341		multi: MultiTransaction,
342		single: SingleTransaction,
343		event_bus: EventBus,
344		interceptors: InterceptorFactory,
345		catalog: Catalog,
346		config: EngineConfig,
347	) -> Self {
348		let flow_operator_store = SystemFlowOperatorStore::new();
349		let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
350		event_bus.register(listener);
351
352		// Get the metrics store from IoC to create the stats reader
353		let metrics_store = config
354			.ioc
355			.resolve::<SingleStore>()
356			.expect("SingleStore must be registered in IocContainer for metrics");
357		let stats_reader = MetricReader::new(metrics_store);
358
359		// Register MaterializedCatalogInterceptor as a factory function.
360		let materialized = catalog.materialized.clone();
361		interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
362			interceptors
363				.post_commit
364				.add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
365		}));
366
367		let interceptors = Arc::new(interceptors);
368
369		Self(Arc::new(Inner {
370			multi,
371			single,
372			event_bus,
373			executor: Executor::new(catalog.clone(), config, flow_operator_store.clone(), stats_reader),
374			interceptors,
375			catalog,
376			flow_operator_store,
377			read_only: AtomicBool::new(false),
378		}))
379	}
380
381	/// Create a new set of interceptors from the factory.
382	pub fn create_interceptors(&self) -> Interceptors {
383		self.interceptors.create()
384	}
385
386	/// Register an additional interceptor factory function.
387	///
388	/// The function will be called on every `create()` to augment the base interceptors.
389	/// This is thread-safe and can be called after the engine is constructed (e.g. by subsystems).
390	pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
391		self.interceptors.add_late(factory);
392	}
393
394	/// Begin a query transaction at a specific version.
395	///
396	/// This is used for parallel query execution where multiple tasks need to
397	/// read from the same snapshot (same CommitVersion) for consistency.
398	#[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
399    ))]
400	pub fn begin_query_at_version(&self, version: CommitVersion, identity: IdentityId) -> Result<QueryTransaction> {
401		let mut txn = QueryTransaction::new(
402			self.multi.begin_query_at_version(version)?,
403			self.single.clone(),
404			identity,
405		);
406		txn.set_executor(Arc::new(self.executor.clone()));
407		Ok(txn)
408	}
409
410	#[inline]
411	pub fn multi(&self) -> &MultiTransaction {
412		&self.multi
413	}
414
415	#[inline]
416	pub fn multi_owned(&self) -> MultiTransaction {
417		self.multi.clone()
418	}
419
420	/// Get the actor system
421	#[inline]
422	pub fn actor_system(&self) -> ActorSystem {
423		self.multi.actor_system()
424	}
425
426	#[inline]
427	pub fn single(&self) -> &SingleTransaction {
428		&self.single
429	}
430
431	#[inline]
432	pub fn single_owned(&self) -> SingleTransaction {
433		self.single.clone()
434	}
435
436	#[inline]
437	pub fn emit<E: Event>(&self, event: E) {
438		self.event_bus.emit(event)
439	}
440
441	#[inline]
442	pub fn materialized_catalog(&self) -> &MaterializedCatalog {
443		&self.catalog.materialized
444	}
445
446	/// Returns a `Catalog` instance for catalog lookups.
447	/// The Catalog provides three-tier lookup methods that check transactional changes,
448	/// then MaterializedCatalog, then fall back to storage.
449	#[inline]
450	pub fn catalog(&self) -> Catalog {
451		self.catalog.clone()
452	}
453
454	#[inline]
455	pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
456		&self.flow_operator_store
457	}
458
459	/// Get the current version from the transaction manager
460	#[inline]
461	pub fn current_version(&self) -> Result<CommitVersion> {
462		self.multi.current_version()
463	}
464
465	/// Returns the highest version where ALL prior versions have completed.
466	/// This is useful for CDC polling to know the safe upper bound for fetching
467	/// CDC events - all events up to this version are guaranteed to be in storage.
468	#[inline]
469	pub fn done_until(&self) -> CommitVersion {
470		self.multi.done_until()
471	}
472
473	/// Wait for the watermark to reach the given version with a timeout.
474	/// Returns true if the watermark reached the target, false if timeout occurred.
475	#[inline]
476	pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
477		self.multi.wait_for_mark_timeout(version, timeout)
478	}
479
480	#[inline]
481	pub fn executor(&self) -> Executor {
482		self.executor.clone()
483	}
484
485	/// Get the CDC store from the IoC container.
486	///
487	/// Returns the CdcStore that was registered during engine construction.
488	/// Panics if CdcStore was not registered.
489	#[inline]
490	pub fn cdc_store(&self) -> CdcStore {
491		self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
492	}
493
494	/// Mark this engine as read-only (replica mode).
495	/// Once set, all write-path methods will return ENG_007 immediately.
496	pub fn set_read_only(&self) {
497		self.read_only.store(true, Ordering::SeqCst);
498	}
499
500	/// Whether this engine is in read-only (replica) mode.
501	pub fn is_read_only(&self) -> bool {
502		self.read_only.load(Ordering::SeqCst)
503	}
504
505	pub(crate) fn reject_if_read_only(&self) -> Result<()> {
506		if self.is_read_only() {
507			return Err(Error(Box::new(read_only_rejection(Fragment::None))));
508		}
509		Ok(())
510	}
511
512	pub fn shutdown(&self) {
513		self.interceptors.clear_late();
514		self.executor.ioc.clear();
515	}
516
517	/// Start a bulk insert operation with full validation.
518	///
519	/// This provides a fluent API for fast bulk inserts that bypasses RQL parsing.
520	/// All inserts within a single builder execute in one transaction.
521	///
522	/// # Example
523	///
524	/// ```ignore
525	/// use reifydb_type::params;
526	///
527	/// engine.bulk_insert(&identity)
528	///     .table("namespace.users")
529	///         .row(params!{ id: 1, name: "Alice" })
530	///         .row(params!{ id: 2, name: "Bob" })
531	///         .done()
532	///     .execute()?;
533	/// ```
534	pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
535		BulkInsertBuilder::new(self, identity)
536	}
537
538	/// Start a bulk insert operation with validation disabled (trusted mode).
539	///
540	/// Use this for pre-validated internal data where constraint validation
541	/// can be skipped for maximum performance.
542	///
543	/// # Safety
544	///
545	/// The caller is responsible for ensuring the data conforms to the
546	/// shape constraints. Invalid data may cause undefined behavior.
547	pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
548		BulkInsertBuilder::new_trusted(self, identity)
549	}
550}
551
552/// Convert user column definitions to internal Column format.
553fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
554	columns.iter()
555		.enumerate()
556		.map(|(idx, col)| {
557			// Note: For virtual tables, we use unconstrained for all types.
558			// The nullable field is still available for documentation purposes.
559			let constraint = TypeConstraint::unconstrained(col.data_type.clone());
560			Column {
561				id: ColumnId(idx as u64),
562				name: col.name.clone(),
563				constraint,
564				properties: vec![],
565				index: ColumnIndex(idx as u8),
566				auto_increment: false,
567				dictionary_id: None,
568			}
569		})
570		.collect()
571}