Skip to main content

reifydb_engine/
engine.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc, time::Duration};
5
6use reifydb_catalog::{
7	catalog::Catalog,
8	materialized::MaterializedCatalog,
9	schema::SchemaRegistry,
10	vtable::{
11		system::flow_operator_store::{FlowOperatorEventListener, FlowOperatorStore},
12		tables::UserVTableDataFunction,
13		user::{UserVTable, UserVTableColumnDef, registry::UserVTableEntry},
14	},
15};
16use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
17use reifydb_core::{
18	common::CommitVersion,
19	error::diagnostic::catalog::namespace_not_found,
20	event::{Event, EventBus},
21	interface::{
22		WithEventBus,
23		catalog::{
24			column::{ColumnDef, ColumnIndex},
25			id::ColumnId,
26			vtable::{VTableDef, VTableId},
27		},
28	},
29	util::ioc::IocContainer,
30};
31use reifydb_function::registry::Functions;
32use reifydb_metric::metric::MetricReader;
33use reifydb_runtime::{actor::system::ActorSystem, clock::Clock};
34use reifydb_store_single::SingleStore;
35use reifydb_transaction::{
36	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
37	multi::transaction::MultiTransaction,
38	single::SingleTransaction,
39	transaction::{
40		admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
41		subscription::SubscriptionTransaction,
42	},
43};
44use reifydb_type::{
45	error::Error,
46	fragment::Fragment,
47	params::Params,
48	value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId},
49};
50use tracing::instrument;
51
52#[cfg(not(target_arch = "wasm32"))]
53use crate::remote::RemoteRegistry;
54use crate::{
55	Result,
56	bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
57	interceptor::catalog::MaterializedCatalogInterceptor,
58	procedure::registry::Procedures,
59	transform::registry::Transforms,
60	vm::{Admin, Command, Query, Subscription, executor::Executor},
61};
62
63pub struct StandardEngine(Arc<Inner>);
64
65impl WithEventBus for StandardEngine {
66	fn event_bus(&self) -> &EventBus {
67		&self.event_bus
68	}
69}
70
71// Engine methods (formerly from Engine trait in reifydb-core)
72impl StandardEngine {
73	#[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
74	pub fn begin_command(&self) -> Result<CommandTransaction> {
75		let interceptors = self.interceptors.create();
76		CommandTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
77	}
78
79	#[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
80	pub fn begin_admin(&self) -> Result<AdminTransaction> {
81		let interceptors = self.interceptors.create();
82		AdminTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
83	}
84
85	#[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
86	pub fn begin_query(&self) -> Result<QueryTransaction> {
87		Ok(QueryTransaction::new(self.multi.begin_query()?, self.single.clone()))
88	}
89
90	#[instrument(name = "engine::transaction::begin_subscription", level = "debug", skip(self))]
91	pub fn begin_subscription(&self) -> Result<SubscriptionTransaction> {
92		let interceptors = self.interceptors.create();
93		SubscriptionTransaction::new(
94			self.multi.clone(),
95			self.single.clone(),
96			self.event_bus.clone(),
97			interceptors,
98		)
99	}
100
101	#[instrument(name = "engine::admin", level = "debug", skip(self, params), fields(rql = %rql))]
102	pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
103		(|| {
104			let mut txn = self.begin_admin()?;
105			let frames = self.executor.admin(
106				&mut txn,
107				Admin {
108					rql,
109					params,
110					identity,
111				},
112			)?;
113			txn.commit()?;
114			Ok(frames)
115		})()
116		.map_err(|mut err: Error| {
117			err.with_statement(rql.to_string());
118			err
119		})
120	}
121
122	#[instrument(name = "engine::command", level = "debug", skip(self, params), fields(rql = %rql))]
123	pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
124		(|| {
125			let mut txn = self.begin_command()?;
126			let frames = self.executor.command(
127				&mut txn,
128				Command {
129					rql,
130					params,
131					identity,
132				},
133			)?;
134			txn.commit()?;
135			Ok(frames)
136		})()
137		.map_err(|mut err: Error| {
138			err.with_statement(rql.to_string());
139			err
140		})
141	}
142
143	#[instrument(name = "engine::query", level = "debug", skip(self, params), fields(rql = %rql))]
144	pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
145		(|| {
146			let mut txn = self.begin_query()?;
147			self.executor.query(
148				&mut txn,
149				Query {
150					rql,
151					params,
152					identity,
153				},
154			)
155		})()
156		.map_err(|mut err: Error| {
157			err.with_statement(rql.to_string());
158			err
159		})
160	}
161
162	#[instrument(name = "engine::subscription", level = "debug", skip(self, params), fields(rql = %rql))]
163	pub fn subscription_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
164		(|| {
165			let mut txn = self.begin_subscription()?;
166			let frames = self.executor.subscription(
167				&mut txn,
168				Subscription {
169					rql,
170					params,
171					identity,
172				},
173			)?;
174			txn.commit()?;
175			Ok(frames)
176		})()
177		.map_err(|mut err: Error| {
178			err.with_statement(rql.to_string());
179			err
180		})
181	}
182
183	/// Call a procedure by fully-qualified name.
184	#[instrument(name = "engine::procedure", level = "debug", skip(self, params), fields(name = %name))]
185	pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> Result<Vec<Frame>> {
186		let mut txn = self.begin_command()?;
187		let frames = self.executor.call_procedure(&mut txn, identity, name, &params)?;
188		txn.commit()?;
189		Ok(frames)
190	}
191
192	/// Register a user-defined virtual table.
193	///
194	/// The virtual table will be available for queries using the given namespace and name.
195	///
196	/// # Arguments
197	///
198	/// * `namespace` - The namespace name (e.g., "default", "my_namespace")
199	/// * `name` - The table name
200	/// * `table` - The virtual table implementation
201	///
202	/// # Returns
203	///
204	/// The assigned `VTableId` on success.
205	///
206	/// # Example
207	///
208	/// ```ignore
209	/// use reifydb_engine::vtable::{UserVTable, UserVTableColumnDef};
210	/// use reifydb_type::value::r#type::Type;
211	/// use reifydb_core::value::Columns;
212	///
213	/// #[derive(Clone)]
214	/// struct MyTable;
215	///
216	/// impl UserVTable for MyTable {
217	///     fn definition(&self) -> Vec<UserVTableColumnDef> {
218	///         vec![UserVTableColumnDef::new("id", Type::Uint8)]
219	///     }
220	///     fn get(&self) -> Columns {
221	///         // Return column-oriented data
222	///         Columns::empty()
223	///     }
224	/// }
225	///
226	/// let id = engine.register_virtual_table("default", "my_table", MyTable)?;
227	/// ```
228	pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
229		let catalog = self.materialized_catalog();
230
231		// Look up namespace by name (use max u64 to get latest version)
232		let ns_def = catalog
233			.find_namespace_by_name(namespace)
234			.ok_or_else(|| Error(namespace_not_found(Fragment::None, namespace)))?;
235
236		// Allocate a new table ID
237		let table_id = self.executor.virtual_table_registry.allocate_id();
238		// Convert user column definitions to internal column definitions
239		let table_columns = table.definition();
240		let columns = convert_vtable_user_columns_to_column_defs(&table_columns);
241
242		// Create the table definition
243		let def = Arc::new(VTableDef {
244			id: table_id,
245			namespace: ns_def.id(),
246			name: name.to_string(),
247			columns,
248		});
249
250		// Register in catalog (for resolver lookups)
251		catalog.register_vtable_user(def.clone())?;
252		// Create the data function from the UserVTable trait
253		let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
254		// Create and register the entry
255		let entry = UserVTableEntry {
256			def: def.clone(),
257			data_fn,
258		};
259		self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
260		Ok(table_id)
261	}
262}
263
264impl CdcHost for StandardEngine {
265	fn begin_command(&self) -> Result<CommandTransaction> {
266		StandardEngine::begin_command(self)
267	}
268
269	fn begin_query(&self) -> Result<QueryTransaction> {
270		StandardEngine::begin_query(self)
271	}
272
273	fn current_version(&self) -> Result<CommitVersion> {
274		StandardEngine::current_version(self)
275	}
276
277	fn done_until(&self) -> CommitVersion {
278		StandardEngine::done_until(self)
279	}
280
281	fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
282		StandardEngine::wait_for_mark_timeout(self, version, timeout)
283	}
284
285	fn schema_registry(&self) -> &SchemaRegistry {
286		&self.catalog.schema
287	}
288}
289
290impl Clone for StandardEngine {
291	fn clone(&self) -> Self {
292		Self(self.0.clone())
293	}
294}
295
296impl Deref for StandardEngine {
297	type Target = Inner;
298
299	fn deref(&self) -> &Self::Target {
300		&self.0
301	}
302}
303
304pub struct Inner {
305	multi: MultiTransaction,
306	single: SingleTransaction,
307	event_bus: EventBus,
308	executor: Executor,
309	interceptors: InterceptorFactory,
310	catalog: Catalog,
311	flow_operator_store: FlowOperatorStore,
312}
313
314impl StandardEngine {
315	pub fn new(
316		multi: MultiTransaction,
317		single: SingleTransaction,
318		event_bus: EventBus,
319		interceptors: InterceptorFactory,
320		catalog: Catalog,
321		clock: Clock,
322		functions: Functions,
323		procedures: Procedures,
324		transforms: Transforms,
325		ioc: IocContainer,
326		#[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
327	) -> Self {
328		let flow_operator_store = FlowOperatorStore::new();
329		let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
330		event_bus.register(listener);
331
332		// Get the metrics store from IoC to create the stats reader
333		let metrics_store = ioc
334			.resolve::<SingleStore>()
335			.expect("SingleStore must be registered in IocContainer for metrics");
336		let stats_reader = MetricReader::new(metrics_store);
337
338		// Register MaterializedCatalogInterceptor as a factory function.
339		let materialized = catalog.materialized.clone();
340		interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
341			interceptors
342				.post_commit
343				.add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
344		}));
345
346		Self(Arc::new(Inner {
347			multi,
348			single,
349			event_bus,
350			executor: Executor::new(
351				catalog.clone(),
352				clock,
353				functions,
354				procedures,
355				transforms,
356				flow_operator_store.clone(),
357				stats_reader,
358				ioc,
359				#[cfg(not(target_arch = "wasm32"))]
360				remote_registry,
361			),
362			interceptors,
363			catalog,
364			flow_operator_store,
365		}))
366	}
367
368	/// Create a new set of interceptors from the factory.
369	pub fn create_interceptors(&self) -> Interceptors {
370		self.interceptors.create()
371	}
372
373	/// Register an additional interceptor factory function.
374	///
375	/// The function will be called on every `create()` to augment the base interceptors.
376	/// This is thread-safe and can be called after the engine is constructed (e.g. by subsystems).
377	pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
378		self.interceptors.add_late(factory);
379	}
380
381	/// Begin a query transaction at a specific version.
382	///
383	/// This is used for parallel query execution where multiple tasks need to
384	/// read from the same snapshot (same CommitVersion) for consistency.
385	#[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
386    ))]
387	pub fn begin_query_at_version(&self, version: CommitVersion) -> Result<QueryTransaction> {
388		Ok(QueryTransaction::new(self.multi.begin_query_at_version(version)?, self.single.clone()))
389	}
390
391	#[inline]
392	pub fn multi(&self) -> &MultiTransaction {
393		&self.multi
394	}
395
396	#[inline]
397	pub fn multi_owned(&self) -> MultiTransaction {
398		self.multi.clone()
399	}
400
401	/// Get the actor system
402	#[inline]
403	pub fn actor_system(&self) -> ActorSystem {
404		self.multi.actor_system()
405	}
406
407	#[inline]
408	pub fn single(&self) -> &SingleTransaction {
409		&self.single
410	}
411
412	#[inline]
413	pub fn single_owned(&self) -> SingleTransaction {
414		self.single.clone()
415	}
416
417	#[inline]
418	pub fn emit<E: Event>(&self, event: E) {
419		self.event_bus.emit(event)
420	}
421
422	#[inline]
423	pub fn materialized_catalog(&self) -> &MaterializedCatalog {
424		&self.catalog.materialized
425	}
426
427	/// Returns a `Catalog` instance for catalog lookups.
428	/// The Catalog provides three-tier lookup methods that check transactional changes,
429	/// then MaterializedCatalog, then fall back to storage.
430	#[inline]
431	pub fn catalog(&self) -> Catalog {
432		self.catalog.clone()
433	}
434
435	#[inline]
436	pub fn flow_operator_store(&self) -> &FlowOperatorStore {
437		&self.flow_operator_store
438	}
439
440	/// Get the current version from the transaction manager
441	#[inline]
442	pub fn current_version(&self) -> Result<CommitVersion> {
443		self.multi.current_version()
444	}
445
446	/// Returns the highest version where ALL prior versions have completed.
447	/// This is useful for CDC polling to know the safe upper bound for fetching
448	/// CDC events - all events up to this version are guaranteed to be in storage.
449	#[inline]
450	pub fn done_until(&self) -> CommitVersion {
451		self.multi.done_until()
452	}
453
454	/// Wait for the watermark to reach the given version with a timeout.
455	/// Returns true if the watermark reached the target, false if timeout occurred.
456	#[inline]
457	pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
458		self.multi.wait_for_mark_timeout(version, timeout)
459	}
460
461	#[inline]
462	pub fn executor(&self) -> Executor {
463		self.executor.clone()
464	}
465
466	/// Get the CDC store from the IoC container.
467	///
468	/// Returns the CdcStore that was registered during engine construction.
469	/// Panics if CdcStore was not registered.
470	#[inline]
471	pub fn cdc_store(&self) -> CdcStore {
472		self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
473	}
474
475	pub fn shutdown(&self) {
476		self.interceptors.clear_late();
477		self.executor.ioc.clear();
478	}
479
480	/// Start a bulk insert operation with full validation.
481	///
482	/// This provides a fluent API for fast bulk inserts that bypasses RQL parsing.
483	/// All inserts within a single builder execute in one transaction.
484	///
485	/// # Example
486	///
487	/// ```ignore
488	/// use reifydb_type::params;
489	///
490	/// engine.bulk_insert(&identity)
491	///     .table("namespace.users")
492	///         .row(params!{ id: 1, name: "Alice" })
493	///         .row(params!{ id: 2, name: "Bob" })
494	///         .done()
495	///     .execute()?;
496	/// ```
497	pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
498		BulkInsertBuilder::new(self, identity)
499	}
500
501	/// Start a bulk insert operation with validation disabled (trusted mode).
502	///
503	/// Use this for pre-validated internal data where constraint validation
504	/// can be skipped for maximum performance.
505	///
506	/// # Safety
507	///
508	/// The caller is responsible for ensuring the data conforms to the
509	/// schema constraints. Invalid data may cause undefined behavior.
510	pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
511		BulkInsertBuilder::new_trusted(self, identity)
512	}
513}
514
515/// Convert user column definitions to internal ColumnDef format.
516fn convert_vtable_user_columns_to_column_defs(columns: &[UserVTableColumnDef]) -> Vec<ColumnDef> {
517	columns.iter()
518		.enumerate()
519		.map(|(idx, col)| {
520			// Note: For virtual tables, we use unconstrained for all types.
521			// The nullable field is still available for documentation purposes.
522			let constraint = TypeConstraint::unconstrained(col.data_type.clone());
523			ColumnDef {
524				id: ColumnId(idx as u64),
525				name: col.name.clone(),
526				constraint,
527				properties: vec![],
528				index: ColumnIndex(idx as u8),
529				auto_increment: false,
530				dictionary_id: None,
531			}
532		})
533		.collect()
534}