Skip to main content

reifydb_engine/
engine.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc, time::Duration};
5
6use reifydb_catalog::{
7	catalog::Catalog,
8	materialized::MaterializedCatalog,
9	schema::SchemaRegistry,
10	vtable::{
11		system::flow_operator_store::{FlowOperatorEventListener, FlowOperatorStore},
12		tables::UserVTableDataFunction,
13		user::{UserVTable, UserVTableColumnDef, registry::UserVTableEntry},
14	},
15};
16use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
17use reifydb_core::{
18	common::CommitVersion,
19	error::diagnostic::catalog::namespace_not_found,
20	event::{Event, EventBus},
21	interface::{
22		WithEventBus,
23		catalog::{
24			column::{ColumnDef, ColumnIndex},
25			id::ColumnId,
26			vtable::{VTableDef, VTableId},
27		},
28	},
29	util::ioc::IocContainer,
30};
31use reifydb_function::registry::Functions;
32use reifydb_metric::metric::MetricReader;
33use reifydb_runtime::{actor::system::ActorSystem, clock::Clock};
34use reifydb_store_single::SingleStore;
35use reifydb_transaction::{
36	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
37	multi::transaction::MultiTransaction,
38	single::SingleTransaction,
39	transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
40};
41use reifydb_type::{
42	error::Error,
43	fragment::Fragment,
44	params::Params,
45	value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId},
46};
47use tracing::instrument;
48
49use crate::{
50	Result,
51	bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
52	interceptor::catalog::MaterializedCatalogInterceptor,
53	procedure::registry::Procedures,
54	transform::registry::Transforms,
55	vm::{Admin, Command, Query, executor::Executor},
56};
57
58pub struct StandardEngine(Arc<Inner>);
59
60impl WithEventBus for StandardEngine {
61	fn event_bus(&self) -> &EventBus {
62		&self.event_bus
63	}
64}
65
66// Engine methods (formerly from Engine trait in reifydb-core)
67impl StandardEngine {
68	#[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
69	pub fn begin_command(&self) -> Result<CommandTransaction> {
70		let interceptors = self.interceptors.create();
71		CommandTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
72	}
73
74	#[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
75	pub fn begin_admin(&self) -> Result<AdminTransaction> {
76		let interceptors = self.interceptors.create();
77		AdminTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
78	}
79
80	#[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
81	pub fn begin_query(&self) -> Result<QueryTransaction> {
82		Ok(QueryTransaction::new(self.multi.begin_query()?, self.single.clone()))
83	}
84
85	#[instrument(name = "engine::admin", level = "debug", skip(self, params), fields(rql = %rql))]
86	pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
87		(|| {
88			let mut txn = self.begin_admin()?;
89			let frames = self.executor.admin(
90				&mut txn,
91				Admin {
92					rql,
93					params,
94					identity,
95				},
96			)?;
97			txn.commit()?;
98			Ok(frames)
99		})()
100		.map_err(|mut err: Error| {
101			err.with_statement(rql.to_string());
102			err
103		})
104	}
105
106	#[instrument(name = "engine::command", level = "debug", skip(self, params), fields(rql = %rql))]
107	pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
108		(|| {
109			let mut txn = self.begin_command()?;
110			let frames = self.executor.command(
111				&mut txn,
112				Command {
113					rql,
114					params,
115					identity,
116				},
117			)?;
118			txn.commit()?;
119			Ok(frames)
120		})()
121		.map_err(|mut err: Error| {
122			err.with_statement(rql.to_string());
123			err
124		})
125	}
126
127	#[instrument(name = "engine::query", level = "debug", skip(self, params), fields(rql = %rql))]
128	pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
129		(|| {
130			let mut txn = self.begin_query()?;
131			self.executor.query(
132				&mut txn,
133				Query {
134					rql,
135					params,
136					identity,
137				},
138			)
139		})()
140		.map_err(|mut err: Error| {
141			err.with_statement(rql.to_string());
142			err
143		})
144	}
145
146	/// Call a procedure by fully-qualified name.
147	#[instrument(name = "engine::procedure", level = "debug", skip(self, params), fields(name = %name))]
148	pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> Result<Vec<Frame>> {
149		let mut txn = self.begin_command()?;
150		let frames = self.executor.call_procedure(&mut txn, identity, name, &params)?;
151		txn.commit()?;
152		Ok(frames)
153	}
154
155	/// Register a user-defined virtual table.
156	///
157	/// The virtual table will be available for queries using the given namespace and name.
158	///
159	/// # Arguments
160	///
161	/// * `namespace` - The namespace name (e.g., "default", "my_namespace")
162	/// * `name` - The table name
163	/// * `table` - The virtual table implementation
164	///
165	/// # Returns
166	///
167	/// The assigned `VTableId` on success.
168	///
169	/// # Example
170	///
171	/// ```ignore
172	/// use reifydb_engine::vtable::{UserVTable, UserVTableColumnDef};
173	/// use reifydb_type::value::r#type::Type;
174	/// use reifydb_core::value::Columns;
175	///
176	/// #[derive(Clone)]
177	/// struct MyTable;
178	///
179	/// impl UserVTable for MyTable {
180	///     fn definition(&self) -> Vec<UserVTableColumnDef> {
181	///         vec![UserVTableColumnDef::new("id", Type::Uint8)]
182	///     }
183	///     fn get(&self) -> Columns {
184	///         // Return column-oriented data
185	///         Columns::empty()
186	///     }
187	/// }
188	///
189	/// let id = engine.register_virtual_table("default", "my_table", MyTable)?;
190	/// ```
191	pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
192		let catalog = self.materialized_catalog();
193
194		// Look up namespace by name (use max u64 to get latest version)
195		let ns_def = catalog
196			.find_namespace_by_name(namespace)
197			.ok_or_else(|| Error(namespace_not_found(Fragment::None, namespace)))?;
198
199		// Allocate a new table ID
200		let table_id = self.executor.virtual_table_registry.allocate_id();
201		// Convert user column definitions to internal column definitions
202		let table_columns = table.definition();
203		let columns = convert_vtable_user_columns_to_column_defs(&table_columns);
204
205		// Create the table definition
206		let def = Arc::new(VTableDef {
207			id: table_id,
208			namespace: ns_def.id,
209			name: name.to_string(),
210			columns,
211		});
212
213		// Register in catalog (for resolver lookups)
214		catalog.register_vtable_user(def.clone())?;
215		// Create the data function from the UserVTable trait
216		let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
217		// Create and register the entry
218		let entry = UserVTableEntry {
219			def: def.clone(),
220			data_fn,
221		};
222		self.executor.virtual_table_registry.register(ns_def.id, name.to_string(), entry);
223		Ok(table_id)
224	}
225}
226
227impl CdcHost for StandardEngine {
228	fn begin_command(&self) -> Result<CommandTransaction> {
229		StandardEngine::begin_command(self)
230	}
231
232	fn begin_query(&self) -> Result<QueryTransaction> {
233		StandardEngine::begin_query(self)
234	}
235
236	fn current_version(&self) -> Result<CommitVersion> {
237		StandardEngine::current_version(self)
238	}
239
240	fn done_until(&self) -> CommitVersion {
241		StandardEngine::done_until(self)
242	}
243
244	fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
245		StandardEngine::wait_for_mark_timeout(self, version, timeout)
246	}
247
248	fn schema_registry(&self) -> &SchemaRegistry {
249		&self.catalog.schema
250	}
251}
252
253impl Clone for StandardEngine {
254	fn clone(&self) -> Self {
255		Self(self.0.clone())
256	}
257}
258
259impl Deref for StandardEngine {
260	type Target = Inner;
261
262	fn deref(&self) -> &Self::Target {
263		&self.0
264	}
265}
266
267pub struct Inner {
268	multi: MultiTransaction,
269	single: SingleTransaction,
270	event_bus: EventBus,
271	executor: Executor,
272	interceptors: InterceptorFactory,
273	catalog: Catalog,
274	flow_operator_store: FlowOperatorStore,
275}
276
277impl StandardEngine {
278	pub fn new(
279		multi: MultiTransaction,
280		single: SingleTransaction,
281		event_bus: EventBus,
282		interceptors: InterceptorFactory,
283		catalog: Catalog,
284		clock: Clock,
285		functions: Functions,
286		procedures: Procedures,
287		transforms: Transforms,
288		ioc: IocContainer,
289	) -> Self {
290		let flow_operator_store = FlowOperatorStore::new();
291		let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
292		event_bus.register(listener);
293
294		// Get the metrics store from IoC to create the stats reader
295		let metrics_store = ioc
296			.resolve::<SingleStore>()
297			.expect("SingleStore must be registered in IocContainer for metrics");
298		let stats_reader = MetricReader::new(metrics_store);
299
300		// Register MaterializedCatalogInterceptor as a factory function.
301		let materialized = catalog.materialized.clone();
302		interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
303			interceptors
304				.post_commit
305				.add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
306		}));
307
308		Self(Arc::new(Inner {
309			multi,
310			single,
311			event_bus,
312			executor: Executor::new(
313				catalog.clone(),
314				clock,
315				functions,
316				procedures,
317				transforms,
318				flow_operator_store.clone(),
319				stats_reader,
320				ioc,
321			),
322			interceptors,
323			catalog,
324			flow_operator_store,
325		}))
326	}
327
328	/// Create a new set of interceptors from the factory.
329	pub fn create_interceptors(&self) -> Interceptors {
330		self.interceptors.create()
331	}
332
333	/// Register an additional interceptor factory function.
334	///
335	/// The function will be called on every `create()` to augment the base interceptors.
336	/// This is thread-safe and can be called after the engine is constructed (e.g. by subsystems).
337	pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
338		self.interceptors.add_late(factory);
339	}
340
341	/// Begin a query transaction at a specific version.
342	///
343	/// This is used for parallel query execution where multiple tasks need to
344	/// read from the same snapshot (same CommitVersion) for consistency.
345	#[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
346    ))]
347	pub fn begin_query_at_version(&self, version: CommitVersion) -> Result<QueryTransaction> {
348		Ok(QueryTransaction::new(self.multi.begin_query_at_version(version)?, self.single.clone()))
349	}
350
351	#[inline]
352	pub fn multi(&self) -> &MultiTransaction {
353		&self.multi
354	}
355
356	#[inline]
357	pub fn multi_owned(&self) -> MultiTransaction {
358		self.multi.clone()
359	}
360
361	/// Get the actor system
362	#[inline]
363	pub fn actor_system(&self) -> ActorSystem {
364		self.multi.actor_system()
365	}
366
367	#[inline]
368	pub fn single(&self) -> &SingleTransaction {
369		&self.single
370	}
371
372	#[inline]
373	pub fn single_owned(&self) -> SingleTransaction {
374		self.single.clone()
375	}
376
377	#[inline]
378	pub fn emit<E: Event>(&self, event: E) {
379		self.event_bus.emit(event)
380	}
381
382	#[inline]
383	pub fn materialized_catalog(&self) -> &MaterializedCatalog {
384		&self.catalog.materialized
385	}
386
387	/// Returns a `Catalog` instance for catalog lookups.
388	/// The Catalog provides three-tier lookup methods that check transactional changes,
389	/// then MaterializedCatalog, then fall back to storage.
390	#[inline]
391	pub fn catalog(&self) -> Catalog {
392		self.catalog.clone()
393	}
394
395	#[inline]
396	pub fn flow_operator_store(&self) -> &FlowOperatorStore {
397		&self.flow_operator_store
398	}
399
400	/// Get the current version from the transaction manager
401	#[inline]
402	pub fn current_version(&self) -> Result<CommitVersion> {
403		self.multi.current_version()
404	}
405
406	/// Returns the highest version where ALL prior versions have completed.
407	/// This is useful for CDC polling to know the safe upper bound for fetching
408	/// CDC events - all events up to this version are guaranteed to be in storage.
409	#[inline]
410	pub fn done_until(&self) -> CommitVersion {
411		self.multi.done_until()
412	}
413
414	/// Wait for the watermark to reach the given version with a timeout.
415	/// Returns true if the watermark reached the target, false if timeout occurred.
416	#[inline]
417	pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
418		self.multi.wait_for_mark_timeout(version, timeout)
419	}
420
421	#[inline]
422	pub fn executor(&self) -> Executor {
423		self.executor.clone()
424	}
425
426	/// Get the CDC store from the IoC container.
427	///
428	/// Returns the CdcStore that was registered during engine construction.
429	/// Panics if CdcStore was not registered.
430	#[inline]
431	pub fn cdc_store(&self) -> CdcStore {
432		self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
433	}
434
435	pub fn shutdown(&self) {
436		self.interceptors.clear_late();
437		self.executor.ioc.clear();
438	}
439
440	/// Start a bulk insert operation with full validation.
441	///
442	/// This provides a fluent API for fast bulk inserts that bypasses RQL parsing.
443	/// All inserts within a single builder execute in one transaction.
444	///
445	/// # Example
446	///
447	/// ```ignore
448	/// use reifydb_type::params;
449	///
450	/// engine.bulk_insert(&identity)
451	///     .table("namespace.users")
452	///         .row(params!{ id: 1, name: "Alice" })
453	///         .row(params!{ id: 2, name: "Bob" })
454	///         .done()
455	///     .execute()?;
456	/// ```
457	pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
458		BulkInsertBuilder::new(self, identity)
459	}
460
461	/// Start a bulk insert operation with validation disabled (trusted mode).
462	///
463	/// Use this for pre-validated internal data where constraint validation
464	/// can be skipped for maximum performance.
465	///
466	/// # Safety
467	///
468	/// The caller is responsible for ensuring the data conforms to the
469	/// schema constraints. Invalid data may cause undefined behavior.
470	pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
471		BulkInsertBuilder::new_trusted(self, identity)
472	}
473}
474
475/// Convert user column definitions to internal ColumnDef format.
476fn convert_vtable_user_columns_to_column_defs(columns: &[UserVTableColumnDef]) -> Vec<ColumnDef> {
477	columns.iter()
478		.enumerate()
479		.map(|(idx, col)| {
480			// Note: For virtual tables, we use unconstrained for all types.
481			// The nullable field is still available for documentation purposes.
482			let constraint = TypeConstraint::unconstrained(col.data_type.clone());
483			ColumnDef {
484				id: ColumnId(idx as u64),
485				name: col.name.clone(),
486				constraint,
487				properties: vec![],
488				index: ColumnIndex(idx as u8),
489				auto_increment: false,
490				dictionary_id: None,
491			}
492		})
493		.collect()
494}