Skip to main content

reifydb_engine/
engine.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc, time::Duration};
5
6use reifydb_auth::service::AuthEngine;
7use reifydb_catalog::{
8	catalog::Catalog,
9	materialized::MaterializedCatalog,
10	schema::RowSchemaRegistry,
11	vtable::{
12		system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
13		tables::UserVTableDataFunction,
14		user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
15	},
16};
17use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
18use reifydb_core::{
19	common::CommitVersion,
20	error::diagnostic::catalog::namespace_not_found,
21	event::{Event, EventBus},
22	interface::{
23		WithEventBus,
24		catalog::{
25			column::{Column, ColumnIndex},
26			id::ColumnId,
27			vtable::{VTable, VTableId},
28		},
29	},
30	util::ioc::IocContainer,
31};
32use reifydb_extension::transform::registry::Transforms;
33use reifydb_metric::metric::MetricReader;
34use reifydb_routine::{function::registry::Functions, procedure::registry::Procedures};
35use reifydb_runtime::{actor::system::ActorSystem, context::RuntimeContext};
36use reifydb_store_single::SingleStore;
37use reifydb_transaction::{
38	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
39	multi::transaction::MultiTransaction,
40	single::SingleTransaction,
41	transaction::{
42		admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
43		subscription::SubscriptionTransaction,
44	},
45};
46use reifydb_type::{
47	error::Error,
48	fragment::Fragment,
49	params::Params,
50	value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId},
51};
52use tracing::instrument;
53
54#[cfg(not(target_arch = "wasm32"))]
55use crate::remote::RemoteRegistry;
56use crate::{
57	Result,
58	bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
59	interceptor::catalog::MaterializedCatalogInterceptor,
60	vm::{Admin, Command, Query, Subscription, executor::Executor},
61};
62
63pub struct StandardEngine(Arc<Inner>);
64
65impl WithEventBus for StandardEngine {
66	fn event_bus(&self) -> &EventBus {
67		&self.event_bus
68	}
69}
70
71impl AuthEngine for StandardEngine {
72	fn begin_admin(&self) -> Result<AdminTransaction> {
73		StandardEngine::begin_admin(self, IdentityId::system())
74	}
75
76	fn begin_query(&self) -> Result<QueryTransaction> {
77		StandardEngine::begin_query(self, IdentityId::system())
78	}
79
80	fn catalog(&self) -> Catalog {
81		StandardEngine::catalog(self)
82	}
83}
84
85// Engine methods (formerly from Engine trait in reifydb-core)
86impl StandardEngine {
87	#[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
88	pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
89		let interceptors = self.interceptors.create();
90		let mut txn = CommandTransaction::new(
91			self.multi.clone(),
92			self.single.clone(),
93			self.event_bus.clone(),
94			interceptors,
95			identity,
96		)?;
97		txn.set_executor(Arc::new(self.executor.clone()));
98		Ok(txn)
99	}
100
101	#[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
102	pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
103		let interceptors = self.interceptors.create();
104		let mut txn = AdminTransaction::new(
105			self.multi.clone(),
106			self.single.clone(),
107			self.event_bus.clone(),
108			interceptors,
109			identity,
110		)?;
111		txn.set_executor(Arc::new(self.executor.clone()));
112		Ok(txn)
113	}
114
115	#[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
116	pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
117		let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
118		txn.set_executor(Arc::new(self.executor.clone()));
119		Ok(txn)
120	}
121
122	#[instrument(name = "engine::transaction::begin_subscription", level = "debug", skip(self))]
123	pub fn begin_subscription(&self, identity: IdentityId) -> Result<SubscriptionTransaction> {
124		let interceptors = self.interceptors.create();
125		let mut txn = SubscriptionTransaction::new(
126			self.multi.clone(),
127			self.single.clone(),
128			self.event_bus.clone(),
129			interceptors,
130			identity,
131		)?;
132		txn.set_executor(Arc::new(self.executor.clone()));
133		Ok(txn)
134	}
135
136	#[instrument(name = "engine::admin", level = "debug", skip(self, params), fields(rql = %rql))]
137	pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
138		(|| {
139			let mut txn = self.begin_admin(identity)?;
140			let frames = self.executor.admin(
141				&mut txn,
142				Admin {
143					rql,
144					params,
145				},
146			)?;
147			txn.commit()?;
148			Ok(frames)
149		})()
150		.map_err(|mut err: Error| {
151			err.with_statement(rql.to_string());
152			err
153		})
154	}
155
156	#[instrument(name = "engine::command", level = "debug", skip(self, params), fields(rql = %rql))]
157	pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
158		(|| {
159			let mut txn = self.begin_command(identity)?;
160			let frames = self.executor.command(
161				&mut txn,
162				Command {
163					rql,
164					params,
165				},
166			)?;
167			txn.commit()?;
168			Ok(frames)
169		})()
170		.map_err(|mut err: Error| {
171			err.with_statement(rql.to_string());
172			err
173		})
174	}
175
176	#[instrument(name = "engine::query", level = "debug", skip(self, params), fields(rql = %rql))]
177	pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
178		(|| {
179			let mut txn = self.begin_query(identity)?;
180			self.executor.query(
181				&mut txn,
182				Query {
183					rql,
184					params,
185				},
186			)
187		})()
188		.map_err(|mut err: Error| {
189			err.with_statement(rql.to_string());
190			err
191		})
192	}
193
194	#[instrument(name = "engine::subscription", level = "debug", skip(self, params), fields(rql = %rql))]
195	pub fn subscription_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
196		(|| {
197			let mut txn = self.begin_subscription(identity)?;
198			let frames = self.executor.subscription(
199				&mut txn,
200				Subscription {
201					rql,
202					params,
203				},
204			)?;
205			txn.commit()?;
206			Ok(frames)
207		})()
208		.map_err(|mut err: Error| {
209			err.with_statement(rql.to_string());
210			err
211		})
212	}
213
214	/// Call a procedure by fully-qualified name.
215	#[instrument(name = "engine::procedure", level = "debug", skip(self, params), fields(name = %name))]
216	pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> Result<Vec<Frame>> {
217		let mut txn = self.begin_command(identity)?;
218		let frames = self.executor.call_procedure(&mut txn, name, &params)?;
219		txn.commit()?;
220		Ok(frames)
221	}
222
223	/// Register a user-defined virtual table.
224	///
225	/// The virtual table will be available for queries using the given namespace and name.
226	///
227	/// # Arguments
228	///
229	/// * `namespace` - The namespace name (e.g., "default", "my_namespace")
230	/// * `name` - The table name
231	/// * `table` - The virtual table implementation
232	///
233	/// # Returns
234	///
235	/// The assigned `VTableId` on success.
236	///
237	/// # Example
238	///
239	/// ```ignore
240	/// use reifydb_engine::vtable::{UserVTable, UserVTableColumn};
241	/// use reifydb_type::value::r#type::Type;
242	/// use reifydb_core::value::Columns;
243	///
244	/// #[derive(Clone)]
245	/// struct MyTable;
246	///
247	/// impl UserVTable for MyTable {
248	///     fn definition(&self) -> Vec<UserVTableColumn> {
249	///         vec![UserVTableColumn::new("id", Type::Uint8)]
250	///     }
251	///     fn get(&self) -> Columns {
252	///         // Return column-oriented data
253	///         Columns::empty()
254	///     }
255	/// }
256	///
257	/// let id = engine.register_virtual_table("default", "my_table", MyTable)?;
258	/// ```
259	pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
260		let catalog = self.materialized_catalog();
261
262		// Look up namespace by name (use max u64 to get latest version)
263		let ns_def = catalog
264			.find_namespace_by_name(namespace)
265			.ok_or_else(|| Error(namespace_not_found(Fragment::None, namespace)))?;
266
267		// Allocate a new table ID
268		let table_id = self.executor.virtual_table_registry.allocate_id();
269		// Convert user column definitions to internal column definitions
270		let table_columns = table.definition();
271		let columns = convert_vtable_user_columns_to_columns(&table_columns);
272
273		// Create the table definition
274		let def = Arc::new(VTable {
275			id: table_id,
276			namespace: ns_def.id(),
277			name: name.to_string(),
278			columns,
279		});
280
281		// Register in catalog (for resolver lookups)
282		catalog.register_vtable_user(def.clone())?;
283		// Create the data function from the UserVTable trait
284		let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
285		// Create and register the entry
286		let entry = UserVTableEntry {
287			def: def.clone(),
288			data_fn,
289		};
290		self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
291		Ok(table_id)
292	}
293}
294
295impl CdcHost for StandardEngine {
296	fn begin_command(&self) -> Result<CommandTransaction> {
297		StandardEngine::begin_command(self, IdentityId::system())
298	}
299
300	fn begin_query(&self) -> Result<QueryTransaction> {
301		StandardEngine::begin_query(self, IdentityId::system())
302	}
303
304	fn current_version(&self) -> Result<CommitVersion> {
305		StandardEngine::current_version(self)
306	}
307
308	fn done_until(&self) -> CommitVersion {
309		StandardEngine::done_until(self)
310	}
311
312	fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
313		StandardEngine::wait_for_mark_timeout(self, version, timeout)
314	}
315
316	fn row_schema_registry(&self) -> &RowSchemaRegistry {
317		&self.catalog.schema
318	}
319}
320
321impl Clone for StandardEngine {
322	fn clone(&self) -> Self {
323		Self(self.0.clone())
324	}
325}
326
327impl Deref for StandardEngine {
328	type Target = Inner;
329
330	fn deref(&self) -> &Self::Target {
331		&self.0
332	}
333}
334
335pub struct Inner {
336	multi: MultiTransaction,
337	single: SingleTransaction,
338	event_bus: EventBus,
339	executor: Executor,
340	interceptors: Arc<InterceptorFactory>,
341	catalog: Catalog,
342	flow_operator_store: SystemFlowOperatorStore,
343}
344
345impl StandardEngine {
346	pub fn new(
347		multi: MultiTransaction,
348		single: SingleTransaction,
349		event_bus: EventBus,
350		interceptors: InterceptorFactory,
351		catalog: Catalog,
352		runtime_context: RuntimeContext,
353		functions: Functions,
354		procedures: Procedures,
355		transforms: Transforms,
356		ioc: IocContainer,
357		#[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
358	) -> Self {
359		let flow_operator_store = SystemFlowOperatorStore::new();
360		let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
361		event_bus.register(listener);
362
363		// Get the metrics store from IoC to create the stats reader
364		let metrics_store = ioc
365			.resolve::<SingleStore>()
366			.expect("SingleStore must be registered in IocContainer for metrics");
367		let stats_reader = MetricReader::new(metrics_store);
368
369		// Register MaterializedCatalogInterceptor as a factory function.
370		let materialized = catalog.materialized.clone();
371		interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
372			interceptors
373				.post_commit
374				.add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
375		}));
376
377		let interceptors = Arc::new(interceptors);
378
379		Self(Arc::new(Inner {
380			multi,
381			single,
382			event_bus,
383			executor: Executor::new(
384				catalog.clone(),
385				runtime_context,
386				functions,
387				procedures,
388				transforms,
389				flow_operator_store.clone(),
390				stats_reader,
391				ioc,
392				#[cfg(not(target_arch = "wasm32"))]
393				remote_registry,
394			),
395			interceptors,
396			catalog,
397			flow_operator_store,
398		}))
399	}
400
401	/// Create a new set of interceptors from the factory.
402	pub fn create_interceptors(&self) -> Interceptors {
403		self.interceptors.create()
404	}
405
406	/// Register an additional interceptor factory function.
407	///
408	/// The function will be called on every `create()` to augment the base interceptors.
409	/// This is thread-safe and can be called after the engine is constructed (e.g. by subsystems).
410	pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
411		self.interceptors.add_late(factory);
412	}
413
414	/// Begin a query transaction at a specific version.
415	///
416	/// This is used for parallel query execution where multiple tasks need to
417	/// read from the same snapshot (same CommitVersion) for consistency.
418	#[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
419    ))]
420	pub fn begin_query_at_version(&self, version: CommitVersion, identity: IdentityId) -> Result<QueryTransaction> {
421		let mut txn = QueryTransaction::new(
422			self.multi.begin_query_at_version(version)?,
423			self.single.clone(),
424			identity,
425		);
426		txn.set_executor(Arc::new(self.executor.clone()));
427		Ok(txn)
428	}
429
430	#[inline]
431	pub fn multi(&self) -> &MultiTransaction {
432		&self.multi
433	}
434
435	#[inline]
436	pub fn multi_owned(&self) -> MultiTransaction {
437		self.multi.clone()
438	}
439
440	/// Get the actor system
441	#[inline]
442	pub fn actor_system(&self) -> ActorSystem {
443		self.multi.actor_system()
444	}
445
446	#[inline]
447	pub fn single(&self) -> &SingleTransaction {
448		&self.single
449	}
450
451	#[inline]
452	pub fn single_owned(&self) -> SingleTransaction {
453		self.single.clone()
454	}
455
456	#[inline]
457	pub fn emit<E: Event>(&self, event: E) {
458		self.event_bus.emit(event)
459	}
460
461	#[inline]
462	pub fn materialized_catalog(&self) -> &MaterializedCatalog {
463		&self.catalog.materialized
464	}
465
466	/// Returns a `Catalog` instance for catalog lookups.
467	/// The Catalog provides three-tier lookup methods that check transactional changes,
468	/// then MaterializedCatalog, then fall back to storage.
469	#[inline]
470	pub fn catalog(&self) -> Catalog {
471		self.catalog.clone()
472	}
473
474	#[inline]
475	pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
476		&self.flow_operator_store
477	}
478
479	/// Get the current version from the transaction manager
480	#[inline]
481	pub fn current_version(&self) -> Result<CommitVersion> {
482		self.multi.current_version()
483	}
484
485	/// Returns the highest version where ALL prior versions have completed.
486	/// This is useful for CDC polling to know the safe upper bound for fetching
487	/// CDC events - all events up to this version are guaranteed to be in storage.
488	#[inline]
489	pub fn done_until(&self) -> CommitVersion {
490		self.multi.done_until()
491	}
492
493	/// Wait for the watermark to reach the given version with a timeout.
494	/// Returns true if the watermark reached the target, false if timeout occurred.
495	#[inline]
496	pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
497		self.multi.wait_for_mark_timeout(version, timeout)
498	}
499
500	#[inline]
501	pub fn executor(&self) -> Executor {
502		self.executor.clone()
503	}
504
505	/// Get the CDC store from the IoC container.
506	///
507	/// Returns the CdcStore that was registered during engine construction.
508	/// Panics if CdcStore was not registered.
509	#[inline]
510	pub fn cdc_store(&self) -> CdcStore {
511		self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
512	}
513
514	pub fn shutdown(&self) {
515		self.interceptors.clear_late();
516		self.executor.ioc.clear();
517	}
518
519	/// Start a bulk insert operation with full validation.
520	///
521	/// This provides a fluent API for fast bulk inserts that bypasses RQL parsing.
522	/// All inserts within a single builder execute in one transaction.
523	///
524	/// # Example
525	///
526	/// ```ignore
527	/// use reifydb_type::params;
528	///
529	/// engine.bulk_insert(&identity)
530	///     .table("namespace.users")
531	///         .row(params!{ id: 1, name: "Alice" })
532	///         .row(params!{ id: 2, name: "Bob" })
533	///         .done()
534	///     .execute()?;
535	/// ```
536	pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
537		BulkInsertBuilder::new(self, identity)
538	}
539
540	/// Start a bulk insert operation with validation disabled (trusted mode).
541	///
542	/// Use this for pre-validated internal data where constraint validation
543	/// can be skipped for maximum performance.
544	///
545	/// # Safety
546	///
547	/// The caller is responsible for ensuring the data conforms to the
548	/// schema constraints. Invalid data may cause undefined behavior.
549	pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
550		BulkInsertBuilder::new_trusted(self, identity)
551	}
552}
553
554/// Convert user column definitions to internal Column format.
555fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
556	columns.iter()
557		.enumerate()
558		.map(|(idx, col)| {
559			// Note: For virtual tables, we use unconstrained for all types.
560			// The nullable field is still available for documentation purposes.
561			let constraint = TypeConstraint::unconstrained(col.data_type.clone());
562			Column {
563				id: ColumnId(idx as u64),
564				name: col.name.clone(),
565				constraint,
566				properties: vec![],
567				index: ColumnIndex(idx as u8),
568				auto_increment: false,
569				dictionary_id: None,
570			}
571		})
572		.collect()
573}