Skip to main content

reifydb_engine/
engine.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc, time::Duration};
5
6use reifydb_catalog::{
7	catalog::Catalog,
8	materialized::MaterializedCatalog,
9	vtable::{
10		system::flow_operator_store::{FlowOperatorEventListener, FlowOperatorStore},
11		tables::UserVTableDataFunction,
12		user::{UserVTable, UserVTableColumnDef, registry::UserVTableEntry},
13	},
14};
15use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
16use reifydb_core::{
17	common::CommitVersion,
18	error::diagnostic::catalog::namespace_not_found,
19	event::{Event, EventBus},
20	interface::{
21		WithEventBus,
22		auth::Identity,
23		catalog::{
24			column::{ColumnDef, ColumnIndex},
25			id::ColumnId,
26			vtable::{VTableDef, VTableId},
27		},
28	},
29	util::ioc::IocContainer,
30};
31use reifydb_function::registry::Functions;
32use reifydb_metric::metric::MetricReader;
33use reifydb_runtime::{actor::system::ActorSystem, clock::Clock};
34use reifydb_transaction::{
35	interceptor::factory::InterceptorFactory,
36	multi::transaction::MultiTransaction,
37	single::SingleTransaction,
38	transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
39};
40use reifydb_type::{
41	error::Error,
42	fragment::Fragment,
43	params::Params,
44	value::{constraint::TypeConstraint, frame::frame::Frame},
45};
46use tracing::instrument;
47
48use crate::{
49	bulk_insert::builder::BulkInsertBuilder,
50	interceptor::catalog::MaterializedCatalogInterceptor,
51	transform::registry::Transforms,
52	vm::{Admin, Command, Query, executor::Executor},
53};
54
/// Shared handle to the engine state.
///
/// The handle wraps an `Arc<Inner>`: cloning it clones the `Arc`, and it
/// dereferences to [`Inner`], so the engine's fields are reachable directly
/// through the handle inside this module.
55pub struct StandardEngine(Arc<Inner>);
56
57impl WithEventBus for StandardEngine {
58	fn event_bus(&self) -> &EventBus {
59		&self.event_bus
60	}
61}
62
63// Engine methods (formerly from Engine trait in reifydb-core)
64impl StandardEngine {
65	#[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
66	pub fn begin_command(&self) -> crate::Result<CommandTransaction> {
67		let mut interceptors = self.interceptors.create();
68
69		interceptors
70			.post_commit
71			.add(Arc::new(MaterializedCatalogInterceptor::new(self.catalog.materialized.clone())));
72
73		CommandTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
74	}
75
76	#[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
77	pub fn begin_admin(&self) -> crate::Result<AdminTransaction> {
78		let mut interceptors = self.interceptors.create();
79
80		interceptors
81			.post_commit
82			.add(Arc::new(MaterializedCatalogInterceptor::new(self.catalog.materialized.clone())));
83
84		AdminTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
85	}
86
87	#[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
88	pub fn begin_query(&self) -> crate::Result<QueryTransaction> {
89		Ok(QueryTransaction::new(self.multi.begin_query()?, self.single.clone()))
90	}
91
92	#[instrument(name = "engine::admin", level = "debug", skip(self, params), fields(rql = %rql))]
93	pub fn admin_as(&self, identity: &Identity, rql: &str, params: Params) -> Result<Vec<Frame>, Error> {
94		(|| {
95			let mut txn = self.begin_admin()?;
96			let frames = self.executor.admin(
97				&mut txn,
98				Admin {
99					rql,
100					params,
101					identity,
102				},
103			)?;
104			txn.commit()?;
105			Ok(frames)
106		})()
107		.map_err(|mut err: Error| {
108			err.with_statement(rql.to_string());
109			err
110		})
111	}
112
113	#[instrument(name = "engine::command", level = "debug", skip(self, params), fields(rql = %rql))]
114	pub fn command_as(&self, identity: &Identity, rql: &str, params: Params) -> Result<Vec<Frame>, Error> {
115		(|| {
116			let mut txn = self.begin_command()?;
117			let frames = self.executor.command(
118				&mut txn,
119				Command {
120					rql,
121					params,
122					identity,
123				},
124			)?;
125			txn.commit()?;
126			Ok(frames)
127		})()
128		.map_err(|mut err: Error| {
129			err.with_statement(rql.to_string());
130			err
131		})
132	}
133
134	#[instrument(name = "engine::query", level = "debug", skip(self, params), fields(rql = %rql))]
135	pub fn query_as(&self, identity: &Identity, rql: &str, params: Params) -> Result<Vec<Frame>, Error> {
136		(|| {
137			let mut txn = self.begin_query()?;
138			self.executor.query(
139				&mut txn,
140				Query {
141					rql,
142					params,
143					identity,
144				},
145			)
146		})()
147		.map_err(|mut err: Error| {
148			err.with_statement(rql.to_string());
149			err
150		})
151	}
152
153	/// Register a user-defined virtual table.
154	///
155	/// The virtual table will be available for queries using the given namespace and name.
156	///
157	/// # Arguments
158	///
159	/// * `namespace` - The namespace name (e.g., "default", "my_namespace")
160	/// * `name` - The table name
161	/// * `table` - The virtual table implementation
162	///
163	/// # Returns
164	///
165	/// The assigned `VTableId` on success.
166	///
167	/// # Example
168	///
169	/// ```ignore
170	/// use reifydb_engine::vtable::{UserVTable, UserVTableColumnDef};
171	/// use reifydb_type::value::r#type::Type;
172	/// use reifydb_core::value::Columns;
173	///
174	/// #[derive(Clone)]
175	/// struct MyTable;
176	///
177	/// impl UserVTable for MyTable {
178	///     fn definition(&self) -> Vec<UserVTableColumnDef> {
179	///         vec![UserVTableColumnDef::new("id", Type::Uint8)]
180	///     }
181	///     fn get(&self) -> Columns {
182	///         // Return column-oriented data
183	///         Columns::empty()
184	///     }
185	/// }
186	///
187	/// let id = engine.register_virtual_table("default", "my_table", MyTable)?;
188	/// ```
189	pub fn register_virtual_table<T: UserVTable>(
190		&self,
191		namespace: &str,
192		name: &str,
193		table: T,
194	) -> crate::Result<VTableId> {
195		let catalog = self.materialized_catalog();
196
197		// Look up namespace by name (use max u64 to get latest version)
198		let ns_def = catalog
199			.find_namespace_by_name(namespace)
200			.ok_or_else(|| Error(namespace_not_found(Fragment::None, namespace)))?;
201
202		// Allocate a new table ID
203		let table_id = self.executor.virtual_table_registry.allocate_id();
204		// Convert user column definitions to internal column definitions
205		let table_columns = table.definition();
206		let columns = convert_vtable_user_columns_to_column_defs(&table_columns);
207
208		// Create the table definition
209		let def = Arc::new(VTableDef {
210			id: table_id,
211			namespace: ns_def.id,
212			name: name.to_string(),
213			columns,
214		});
215
216		// Register in catalog (for resolver lookups)
217		catalog.register_vtable_user(def.clone())?;
218		// Create the data function from the UserVTable trait
219		let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
220		// Create and register the entry
221		let entry = UserVTableEntry {
222			def: def.clone(),
223			data_fn,
224		};
225		self.executor.virtual_table_registry.register(ns_def.id, name.to_string(), entry);
226		Ok(table_id)
227	}
228}
229
230impl CdcHost for StandardEngine {
231	fn begin_command(&self) -> reifydb_type::Result<CommandTransaction> {
232		StandardEngine::begin_command(self)
233	}
234
235	fn begin_query(&self) -> reifydb_type::Result<QueryTransaction> {
236		StandardEngine::begin_query(self)
237	}
238
239	fn current_version(&self) -> reifydb_type::Result<CommitVersion> {
240		StandardEngine::current_version(self)
241	}
242
243	fn done_until(&self) -> CommitVersion {
244		StandardEngine::done_until(self)
245	}
246
247	fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
248		StandardEngine::wait_for_mark_timeout(self, version, timeout)
249	}
250
251	fn schema_registry(&self) -> &reifydb_catalog::schema::SchemaRegistry {
252		&self.catalog.schema
253	}
254}
255
256impl Clone for StandardEngine {
257	fn clone(&self) -> Self {
258		Self(self.0.clone())
259	}
260}
261
262impl Deref for StandardEngine {
263	type Target = Inner;
264
265	fn deref(&self) -> &Self::Target {
266		&self.0
267	}
268}
269
/// Engine state shared by every `StandardEngine` handle.
///
/// All fields are private to this module; other code reaches them through
/// the accessor methods on `StandardEngine`.
270pub struct Inner {
	/// Multi-version transaction layer; used to begin queries and to read
	/// version / watermark information.
271	multi: MultiTransaction,
	/// Single-version transaction layer, cloned into every transaction the
	/// engine creates.
272	single: SingleTransaction,
	/// Event bus exposed through `WithEventBus` and `emit`.
273	event_bus: EventBus,
	/// Executor that runs admin, command and query requests.
274	executor: Executor,
	/// Factory producing a fresh interceptor set for each command/admin
	/// transaction.
275	interceptors: Box<dyn InterceptorFactory>,
	/// Catalog handle; its `materialized` part backs the post-commit
	/// catalog interceptor.
276	catalog: Catalog,
	/// Flow operator store; a listener holding a clone of it is registered on
	/// the event bus in `StandardEngine::new`.
277	flow_operator_store: FlowOperatorStore,
278}
279
280impl StandardEngine {
281	pub fn new(
282		multi: MultiTransaction,
283		single: SingleTransaction,
284		event_bus: EventBus,
285		interceptors: Box<dyn InterceptorFactory>,
286		catalog: Catalog,
287		clock: Clock,
288		functions: Functions,
289		transforms: Transforms,
290		ioc: IocContainer,
291	) -> Self {
292		let flow_operator_store = FlowOperatorStore::new();
293		let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
294		event_bus.register(listener);
295
296		// Get the metrics store from IoC to create the stats reader
297		let metrics_store = ioc
298			.resolve::<reifydb_store_single::SingleStore>()
299			.expect("SingleStore must be registered in IocContainer for metrics");
300		let stats_reader = MetricReader::new(metrics_store);
301
302		Self(Arc::new(Inner {
303			multi,
304			single,
305			event_bus,
306			executor: Executor::new(
307				catalog.clone(),
308				clock,
309				functions,
310				transforms,
311				flow_operator_store.clone(),
312				stats_reader,
313				ioc,
314			),
315			interceptors,
316			catalog,
317			flow_operator_store,
318		}))
319	}
320
321	/// Create a new set of interceptors from the factory.
322	pub fn create_interceptors(&self) -> reifydb_transaction::interceptor::interceptors::Interceptors {
323		self.interceptors.create()
324	}
325
326	/// Begin a query transaction at a specific version.
327	///
328	/// This is used for parallel query execution where multiple tasks need to
329	/// read from the same snapshot (same CommitVersion) for consistency.
330	#[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
331    ))]
332	pub fn begin_query_at_version(&self, version: CommitVersion) -> crate::Result<QueryTransaction> {
333		Ok(QueryTransaction::new(self.multi.begin_query_at_version(version)?, self.single.clone()))
334	}
335
336	#[inline]
337	pub fn multi(&self) -> &MultiTransaction {
338		&self.multi
339	}
340
341	#[inline]
342	pub fn multi_owned(&self) -> MultiTransaction {
343		self.multi.clone()
344	}
345
346	/// Get the actor system
347	#[inline]
348	pub fn actor_system(&self) -> ActorSystem {
349		self.multi.actor_system()
350	}
351
352	#[inline]
353	pub fn single(&self) -> &SingleTransaction {
354		&self.single
355	}
356
357	#[inline]
358	pub fn single_owned(&self) -> SingleTransaction {
359		self.single.clone()
360	}
361
362	#[inline]
363	pub fn emit<E: Event>(&self, event: E) {
364		self.event_bus.emit(event)
365	}
366
367	#[inline]
368	pub fn materialized_catalog(&self) -> &MaterializedCatalog {
369		&self.catalog.materialized
370	}
371
372	/// Returns a `Catalog` instance for catalog lookups.
373	/// The Catalog provides three-tier lookup methods that check transactional changes,
374	/// then MaterializedCatalog, then fall back to storage.
375	#[inline]
376	pub fn catalog(&self) -> Catalog {
377		self.catalog.clone()
378	}
379
380	#[inline]
381	pub fn flow_operator_store(&self) -> &FlowOperatorStore {
382		&self.flow_operator_store
383	}
384
385	/// Get the current version from the transaction manager
386	#[inline]
387	pub fn current_version(&self) -> crate::Result<CommitVersion> {
388		self.multi.current_version()
389	}
390
391	/// Returns the highest version where ALL prior versions have completed.
392	/// This is useful for CDC polling to know the safe upper bound for fetching
393	/// CDC events - all events up to this version are guaranteed to be in storage.
394	#[inline]
395	pub fn done_until(&self) -> CommitVersion {
396		self.multi.done_until()
397	}
398
399	/// Wait for the watermark to reach the given version with a timeout.
400	/// Returns true if the watermark reached the target, false if timeout occurred.
401	#[inline]
402	pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
403		self.multi.wait_for_mark_timeout(version, timeout)
404	}
405
406	#[inline]
407	pub fn executor(&self) -> Executor {
408		self.executor.clone()
409	}
410
411	/// Get the CDC store from the IoC container.
412	///
413	/// Returns the CdcStore that was registered during engine construction.
414	/// Panics if CdcStore was not registered.
415	#[inline]
416	pub fn cdc_store(&self) -> CdcStore {
417		self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
418	}
419
420	/// Start a bulk insert operation with full validation.
421	///
422	/// This provides a fluent API for fast bulk inserts that bypasses RQL parsing.
423	/// All inserts within a single builder execute in one transaction.
424	///
425	/// # Example
426	///
427	/// ```ignore
428	/// use reifydb_type::params;
429	///
430	/// engine.bulk_insert(&identity)
431	///     .table("namespace.users")
432	///         .row(params!{ id: 1, name: "Alice" })
433	///         .row(params!{ id: 2, name: "Bob" })
434	///         .done()
435	///     .execute()?;
436	/// ```
437	pub fn bulk_insert<'e>(
438		&'e self,
439		identity: &'e Identity,
440	) -> BulkInsertBuilder<'e, crate::bulk_insert::builder::Validated> {
441		BulkInsertBuilder::new(self, identity)
442	}
443
444	/// Start a bulk insert operation with validation disabled (trusted mode).
445	///
446	/// Use this for pre-validated internal data where constraint validation
447	/// can be skipped for maximum performance.
448	///
449	/// # Safety
450	///
451	/// The caller is responsible for ensuring the data conforms to the
452	/// schema constraints. Invalid data may cause undefined behavior.
453	pub fn bulk_insert_trusted<'e>(
454		&'e self,
455		identity: &'e Identity,
456	) -> BulkInsertBuilder<'e, crate::bulk_insert::builder::Trusted> {
457		BulkInsertBuilder::new_trusted(self, identity)
458	}
459}
460
461/// Convert user column definitions to internal ColumnDef format.
462fn convert_vtable_user_columns_to_column_defs(columns: &[UserVTableColumnDef]) -> Vec<ColumnDef> {
463	columns.iter()
464		.enumerate()
465		.map(|(idx, col)| {
466			// Note: For virtual tables, we use unconstrained for all types.
467			// The nullable field is still available for documentation purposes.
468			let constraint = TypeConstraint::unconstrained(col.data_type.clone());
469			ColumnDef {
470				id: ColumnId(idx as u64),
471				name: col.name.clone(),
472				constraint,
473				policies: vec![],
474				index: ColumnIndex(idx as u8),
475				auto_increment: false,
476				dictionary_id: None,
477			}
478		})
479		.collect()
480}