// reifydb_engine/engine.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

4use std::{ops::Deref, sync::Arc, time::Duration};
5
6use reifydb_catalog::{
7	catalog::Catalog,
8	materialized::MaterializedCatalog,
9	schema::SchemaRegistry,
10	vtable::{
11		system::flow_operator_store::{FlowOperatorEventListener, FlowOperatorStore},
12		tables::UserVTableDataFunction,
13		user::{UserVTable, UserVTableColumnDef, registry::UserVTableEntry},
14	},
15};
16use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
17use reifydb_core::{
18	common::CommitVersion,
19	error::diagnostic::catalog::namespace_not_found,
20	event::{Event, EventBus},
21	interface::{
22		WithEventBus,
23		catalog::{
24			column::{ColumnDef, ColumnIndex},
25			id::ColumnId,
26			vtable::{VTableDef, VTableId},
27		},
28	},
29	util::ioc::IocContainer,
30};
31use reifydb_function::registry::Functions;
32use reifydb_metric::metric::MetricReader;
33use reifydb_runtime::{actor::system::ActorSystem, clock::Clock};
34use reifydb_store_single::SingleStore;
35use reifydb_transaction::{
36	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
37	multi::transaction::MultiTransaction,
38	single::SingleTransaction,
39	transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
40};
41use reifydb_type::{
42	error::Error,
43	fragment::Fragment,
44	params::Params,
45	value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId},
46};
47use tracing::instrument;
48
49#[cfg(not(target_arch = "wasm32"))]
50use crate::remote::RemoteRegistry;
51use crate::{
52	Result,
53	bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
54	interceptor::catalog::MaterializedCatalogInterceptor,
55	procedure::registry::Procedures,
56	transform::registry::Transforms,
57	vm::{Admin, Command, Query, executor::Executor},
58};
59
60pub struct StandardEngine(Arc<Inner>);
61
62impl WithEventBus for StandardEngine {
63	fn event_bus(&self) -> &EventBus {
64		&self.event_bus
65	}
66}
67
// Engine methods (formerly provided by the `Engine` trait in reifydb-core)
69impl StandardEngine {
70	#[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
71	pub fn begin_command(&self) -> Result<CommandTransaction> {
72		let interceptors = self.interceptors.create();
73		CommandTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
74	}
75
76	#[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
77	pub fn begin_admin(&self) -> Result<AdminTransaction> {
78		let interceptors = self.interceptors.create();
79		AdminTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
80	}
81
82	#[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
83	pub fn begin_query(&self) -> Result<QueryTransaction> {
84		Ok(QueryTransaction::new(self.multi.begin_query()?, self.single.clone()))
85	}
86
87	#[instrument(name = "engine::admin", level = "debug", skip(self, params), fields(rql = %rql))]
88	pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
89		(|| {
90			let mut txn = self.begin_admin()?;
91			let frames = self.executor.admin(
92				&mut txn,
93				Admin {
94					rql,
95					params,
96					identity,
97				},
98			)?;
99			txn.commit()?;
100			Ok(frames)
101		})()
102		.map_err(|mut err: Error| {
103			err.with_statement(rql.to_string());
104			err
105		})
106	}
107
108	#[instrument(name = "engine::command", level = "debug", skip(self, params), fields(rql = %rql))]
109	pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
110		(|| {
111			let mut txn = self.begin_command()?;
112			let frames = self.executor.command(
113				&mut txn,
114				Command {
115					rql,
116					params,
117					identity,
118				},
119			)?;
120			txn.commit()?;
121			Ok(frames)
122		})()
123		.map_err(|mut err: Error| {
124			err.with_statement(rql.to_string());
125			err
126		})
127	}
128
129	#[instrument(name = "engine::query", level = "debug", skip(self, params), fields(rql = %rql))]
130	pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
131		(|| {
132			let mut txn = self.begin_query()?;
133			self.executor.query(
134				&mut txn,
135				Query {
136					rql,
137					params,
138					identity,
139				},
140			)
141		})()
142		.map_err(|mut err: Error| {
143			err.with_statement(rql.to_string());
144			err
145		})
146	}
147
148	/// Call a procedure by fully-qualified name.
149	#[instrument(name = "engine::procedure", level = "debug", skip(self, params), fields(name = %name))]
150	pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> Result<Vec<Frame>> {
151		let mut txn = self.begin_command()?;
152		let frames = self.executor.call_procedure(&mut txn, identity, name, &params)?;
153		txn.commit()?;
154		Ok(frames)
155	}
156
157	/// Register a user-defined virtual table.
158	///
159	/// The virtual table will be available for queries using the given namespace and name.
160	///
161	/// # Arguments
162	///
163	/// * `namespace` - The namespace name (e.g., "default", "my_namespace")
164	/// * `name` - The table name
165	/// * `table` - The virtual table implementation
166	///
167	/// # Returns
168	///
169	/// The assigned `VTableId` on success.
170	///
171	/// # Example
172	///
173	/// ```ignore
174	/// use reifydb_engine::vtable::{UserVTable, UserVTableColumnDef};
175	/// use reifydb_type::value::r#type::Type;
176	/// use reifydb_core::value::Columns;
177	///
178	/// #[derive(Clone)]
179	/// struct MyTable;
180	///
181	/// impl UserVTable for MyTable {
182	///     fn definition(&self) -> Vec<UserVTableColumnDef> {
183	///         vec![UserVTableColumnDef::new("id", Type::Uint8)]
184	///     }
185	///     fn get(&self) -> Columns {
186	///         // Return column-oriented data
187	///         Columns::empty()
188	///     }
189	/// }
190	///
191	/// let id = engine.register_virtual_table("default", "my_table", MyTable)?;
192	/// ```
193	pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
194		let catalog = self.materialized_catalog();
195
196		// Look up namespace by name (use max u64 to get latest version)
197		let ns_def = catalog
198			.find_namespace_by_name(namespace)
199			.ok_or_else(|| Error(namespace_not_found(Fragment::None, namespace)))?;
200
201		// Allocate a new table ID
202		let table_id = self.executor.virtual_table_registry.allocate_id();
203		// Convert user column definitions to internal column definitions
204		let table_columns = table.definition();
205		let columns = convert_vtable_user_columns_to_column_defs(&table_columns);
206
207		// Create the table definition
208		let def = Arc::new(VTableDef {
209			id: table_id,
210			namespace: ns_def.id(),
211			name: name.to_string(),
212			columns,
213		});
214
215		// Register in catalog (for resolver lookups)
216		catalog.register_vtable_user(def.clone())?;
217		// Create the data function from the UserVTable trait
218		let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
219		// Create and register the entry
220		let entry = UserVTableEntry {
221			def: def.clone(),
222			data_fn,
223		};
224		self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
225		Ok(table_id)
226	}
227}
228
229impl CdcHost for StandardEngine {
230	fn begin_command(&self) -> Result<CommandTransaction> {
231		StandardEngine::begin_command(self)
232	}
233
234	fn begin_query(&self) -> Result<QueryTransaction> {
235		StandardEngine::begin_query(self)
236	}
237
238	fn current_version(&self) -> Result<CommitVersion> {
239		StandardEngine::current_version(self)
240	}
241
242	fn done_until(&self) -> CommitVersion {
243		StandardEngine::done_until(self)
244	}
245
246	fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
247		StandardEngine::wait_for_mark_timeout(self, version, timeout)
248	}
249
250	fn schema_registry(&self) -> &SchemaRegistry {
251		&self.catalog.schema
252	}
253}
254
255impl Clone for StandardEngine {
256	fn clone(&self) -> Self {
257		Self(self.0.clone())
258	}
259}
260
261impl Deref for StandardEngine {
262	type Target = Inner;
263
264	fn deref(&self) -> &Self::Target {
265		&self.0
266	}
267}
268
269pub struct Inner {
270	multi: MultiTransaction,
271	single: SingleTransaction,
272	event_bus: EventBus,
273	executor: Executor,
274	interceptors: InterceptorFactory,
275	catalog: Catalog,
276	flow_operator_store: FlowOperatorStore,
277}
278
279impl StandardEngine {
280	pub fn new(
281		multi: MultiTransaction,
282		single: SingleTransaction,
283		event_bus: EventBus,
284		interceptors: InterceptorFactory,
285		catalog: Catalog,
286		clock: Clock,
287		functions: Functions,
288		procedures: Procedures,
289		transforms: Transforms,
290		ioc: IocContainer,
291		#[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
292	) -> Self {
293		let flow_operator_store = FlowOperatorStore::new();
294		let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
295		event_bus.register(listener);
296
297		// Get the metrics store from IoC to create the stats reader
298		let metrics_store = ioc
299			.resolve::<SingleStore>()
300			.expect("SingleStore must be registered in IocContainer for metrics");
301		let stats_reader = MetricReader::new(metrics_store);
302
303		// Register MaterializedCatalogInterceptor as a factory function.
304		let materialized = catalog.materialized.clone();
305		interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
306			interceptors
307				.post_commit
308				.add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
309		}));
310
311		Self(Arc::new(Inner {
312			multi,
313			single,
314			event_bus,
315			executor: Executor::new(
316				catalog.clone(),
317				clock,
318				functions,
319				procedures,
320				transforms,
321				flow_operator_store.clone(),
322				stats_reader,
323				ioc,
324				#[cfg(not(target_arch = "wasm32"))]
325				remote_registry,
326			),
327			interceptors,
328			catalog,
329			flow_operator_store,
330		}))
331	}
332
333	/// Create a new set of interceptors from the factory.
334	pub fn create_interceptors(&self) -> Interceptors {
335		self.interceptors.create()
336	}
337
338	/// Register an additional interceptor factory function.
339	///
340	/// The function will be called on every `create()` to augment the base interceptors.
341	/// This is thread-safe and can be called after the engine is constructed (e.g. by subsystems).
342	pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
343		self.interceptors.add_late(factory);
344	}
345
346	/// Begin a query transaction at a specific version.
347	///
348	/// This is used for parallel query execution where multiple tasks need to
349	/// read from the same snapshot (same CommitVersion) for consistency.
350	#[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
351    ))]
352	pub fn begin_query_at_version(&self, version: CommitVersion) -> Result<QueryTransaction> {
353		Ok(QueryTransaction::new(self.multi.begin_query_at_version(version)?, self.single.clone()))
354	}
355
356	#[inline]
357	pub fn multi(&self) -> &MultiTransaction {
358		&self.multi
359	}
360
361	#[inline]
362	pub fn multi_owned(&self) -> MultiTransaction {
363		self.multi.clone()
364	}
365
366	/// Get the actor system
367	#[inline]
368	pub fn actor_system(&self) -> ActorSystem {
369		self.multi.actor_system()
370	}
371
372	#[inline]
373	pub fn single(&self) -> &SingleTransaction {
374		&self.single
375	}
376
377	#[inline]
378	pub fn single_owned(&self) -> SingleTransaction {
379		self.single.clone()
380	}
381
382	#[inline]
383	pub fn emit<E: Event>(&self, event: E) {
384		self.event_bus.emit(event)
385	}
386
387	#[inline]
388	pub fn materialized_catalog(&self) -> &MaterializedCatalog {
389		&self.catalog.materialized
390	}
391
392	/// Returns a `Catalog` instance for catalog lookups.
393	/// The Catalog provides three-tier lookup methods that check transactional changes,
394	/// then MaterializedCatalog, then fall back to storage.
395	#[inline]
396	pub fn catalog(&self) -> Catalog {
397		self.catalog.clone()
398	}
399
400	#[inline]
401	pub fn flow_operator_store(&self) -> &FlowOperatorStore {
402		&self.flow_operator_store
403	}
404
405	/// Get the current version from the transaction manager
406	#[inline]
407	pub fn current_version(&self) -> Result<CommitVersion> {
408		self.multi.current_version()
409	}
410
411	/// Returns the highest version where ALL prior versions have completed.
412	/// This is useful for CDC polling to know the safe upper bound for fetching
413	/// CDC events - all events up to this version are guaranteed to be in storage.
414	#[inline]
415	pub fn done_until(&self) -> CommitVersion {
416		self.multi.done_until()
417	}
418
419	/// Wait for the watermark to reach the given version with a timeout.
420	/// Returns true if the watermark reached the target, false if timeout occurred.
421	#[inline]
422	pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
423		self.multi.wait_for_mark_timeout(version, timeout)
424	}
425
426	#[inline]
427	pub fn executor(&self) -> Executor {
428		self.executor.clone()
429	}
430
431	/// Get the CDC store from the IoC container.
432	///
433	/// Returns the CdcStore that was registered during engine construction.
434	/// Panics if CdcStore was not registered.
435	#[inline]
436	pub fn cdc_store(&self) -> CdcStore {
437		self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
438	}
439
440	pub fn shutdown(&self) {
441		self.interceptors.clear_late();
442		self.executor.ioc.clear();
443	}
444
445	/// Start a bulk insert operation with full validation.
446	///
447	/// This provides a fluent API for fast bulk inserts that bypasses RQL parsing.
448	/// All inserts within a single builder execute in one transaction.
449	///
450	/// # Example
451	///
452	/// ```ignore
453	/// use reifydb_type::params;
454	///
455	/// engine.bulk_insert(&identity)
456	///     .table("namespace.users")
457	///         .row(params!{ id: 1, name: "Alice" })
458	///         .row(params!{ id: 2, name: "Bob" })
459	///         .done()
460	///     .execute()?;
461	/// ```
462	pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
463		BulkInsertBuilder::new(self, identity)
464	}
465
466	/// Start a bulk insert operation with validation disabled (trusted mode).
467	///
468	/// Use this for pre-validated internal data where constraint validation
469	/// can be skipped for maximum performance.
470	///
471	/// # Safety
472	///
473	/// The caller is responsible for ensuring the data conforms to the
474	/// schema constraints. Invalid data may cause undefined behavior.
475	pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
476		BulkInsertBuilder::new_trusted(self, identity)
477	}
478}
479
480/// Convert user column definitions to internal ColumnDef format.
481fn convert_vtable_user_columns_to_column_defs(columns: &[UserVTableColumnDef]) -> Vec<ColumnDef> {
482	columns.iter()
483		.enumerate()
484		.map(|(idx, col)| {
485			// Note: For virtual tables, we use unconstrained for all types.
486			// The nullable field is still available for documentation purposes.
487			let constraint = TypeConstraint::unconstrained(col.data_type.clone());
488			ColumnDef {
489				id: ColumnId(idx as u64),
490				name: col.name.clone(),
491				constraint,
492				properties: vec![],
493				index: ColumnIndex(idx as u8),
494				auto_increment: false,
495				dictionary_id: None,
496			}
497		})
498		.collect()
499}