Skip to main content

reifydb_engine/
engine.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	ops::Deref,
6	sync::{
7		Arc,
8		atomic::{AtomicBool, Ordering},
9	},
10	time::Duration,
11};
12
13use reifydb_auth::service::AuthEngine;
14use reifydb_catalog::{
15	catalog::Catalog,
16	materialized::MaterializedCatalog,
17	vtable::{
18		system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
19		tables::UserVTableDataFunction,
20		user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
21	},
22};
23use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
24use reifydb_core::{
25	common::CommitVersion,
26	error::diagnostic::{catalog::namespace_not_found, engine::read_only_rejection},
27	event::{Event, EventBus},
28	execution::ExecutionResult,
29	interface::{
30		WithEventBus,
31		catalog::{
32			column::{Column, ColumnIndex},
33			id::ColumnId,
34			vtable::{VTable, VTableId},
35		},
36	},
37	metric::ExecutionMetrics,
38};
39use reifydb_metric_old::metric::MetricReader;
40use reifydb_runtime::{actor::system::ActorSystem, context::clock::Clock};
41use reifydb_store_single::SingleStore;
42use reifydb_transaction::{
43	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
44	multi::transaction::MultiTransaction,
45	single::SingleTransaction,
46	transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
47};
48use reifydb_type::{
49	error::Error,
50	fragment::Fragment,
51	params::Params,
52	value::{constraint::TypeConstraint, identity::IdentityId},
53};
54use tracing::instrument;
55
56use crate::{
57	Result,
58	bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
59	interceptor::catalog::MaterializedCatalogInterceptor,
60	vm::{Admin, Command, Query, Subscription, executor::Executor, services::EngineConfig},
61};
62
/// Handle to an engine instance.
///
/// The handle wraps its shared state in an `Arc`, so clones are cheap and all refer to
/// the same `Inner`; the state's fields are reachable through `Deref`.
pub struct StandardEngine(Arc<Inner>);
64
65impl WithEventBus for StandardEngine {
66	fn event_bus(&self) -> &EventBus {
67		&self.event_bus
68	}
69}
70
71impl AuthEngine for StandardEngine {
72	fn begin_admin(&self) -> Result<AdminTransaction> {
73		StandardEngine::begin_admin(self, IdentityId::system())
74	}
75
76	fn begin_query(&self) -> Result<QueryTransaction> {
77		StandardEngine::begin_query(self, IdentityId::system())
78	}
79
80	fn catalog(&self) -> Catalog {
81		StandardEngine::catalog(self)
82	}
83}
84
85// Engine methods (formerly from Engine trait in reifydb-core)
86impl StandardEngine {
87	#[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
88	pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
89		let interceptors = self.interceptors.create();
90		let mut txn = CommandTransaction::new(
91			self.multi.clone(),
92			self.single.clone(),
93			self.event_bus.clone(),
94			interceptors,
95			identity,
96			self.executor.runtime_context.clock.clone(),
97		)?;
98		txn.set_executor(Arc::new(self.executor.clone()));
99		Ok(txn)
100	}
101
102	#[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
103	pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
104		let interceptors = self.interceptors.create();
105		let mut txn = AdminTransaction::new(
106			self.multi.clone(),
107			self.single.clone(),
108			self.event_bus.clone(),
109			interceptors,
110			identity,
111			self.executor.runtime_context.clock.clone(),
112		)?;
113		txn.set_executor(Arc::new(self.executor.clone()));
114		Ok(txn)
115	}
116
117	#[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
118	pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
119		let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
120		txn.set_executor(Arc::new(self.executor.clone()));
121		Ok(txn)
122	}
123
124	/// Get the runtime clock for timestamp operations.
125	pub fn clock(&self) -> &Clock {
126		&self.executor.runtime_context.clock
127	}
128
129	#[instrument(name = "engine::admin_as", level = "debug", skip(self, params), fields(rql = %rql))]
130	pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
131		if let Err(e) = self.reject_if_read_only() {
132			return ExecutionResult {
133				frames: vec![],
134				error: Some(e),
135				metrics: ExecutionMetrics::default(),
136			};
137		}
138		let mut txn = match self.begin_admin(identity) {
139			Ok(t) => t,
140			Err(mut e) => {
141				e.with_statement(rql.to_string());
142				return ExecutionResult {
143					frames: vec![],
144					error: Some(e),
145					metrics: ExecutionMetrics::default(),
146				};
147			}
148		};
149		let mut outcome = self.executor.admin(
150			&mut txn,
151			Admin {
152				rql,
153				params,
154			},
155		);
156		if outcome.is_ok()
157			&& let Err(mut e) = txn.commit()
158		{
159			e.with_statement(rql.to_string());
160			outcome.error = Some(e);
161		}
162		if let Some(ref mut e) = outcome.error {
163			e.with_statement(rql.to_string());
164		}
165		outcome
166	}
167
168	#[instrument(name = "engine::command_as", level = "debug", skip(self, params), fields(rql = %rql))]
169	pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
170		if let Err(e) = self.reject_if_read_only() {
171			return ExecutionResult {
172				frames: vec![],
173				error: Some(e),
174				metrics: ExecutionMetrics::default(),
175			};
176		}
177		let mut txn = match self.begin_command(identity) {
178			Ok(t) => t,
179			Err(mut e) => {
180				e.with_statement(rql.to_string());
181				return ExecutionResult {
182					frames: vec![],
183					error: Some(e),
184					metrics: ExecutionMetrics::default(),
185				};
186			}
187		};
188		let mut outcome = self.executor.command(
189			&mut txn,
190			Command {
191				rql,
192				params,
193			},
194		);
195		if outcome.is_ok()
196			&& let Err(mut e) = txn.commit()
197		{
198			e.with_statement(rql.to_string());
199			outcome.error = Some(e);
200		}
201		if let Some(ref mut e) = outcome.error {
202			e.with_statement(rql.to_string());
203		}
204		outcome
205	}
206
207	#[instrument(name = "engine::query_as", level = "debug", skip(self, params), fields(rql = %rql))]
208	pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
209		let mut txn = match self.begin_query(identity) {
210			Ok(t) => t,
211			Err(mut e) => {
212				e.with_statement(rql.to_string());
213				return ExecutionResult {
214					frames: vec![],
215					error: Some(e),
216					metrics: ExecutionMetrics::default(),
217				};
218			}
219		};
220		let mut outcome = self.executor.query(
221			&mut txn,
222			Query {
223				rql,
224				params,
225			},
226		);
227		if let Some(ref mut e) = outcome.error {
228			e.with_statement(rql.to_string());
229		}
230		outcome
231	}
232
233	#[instrument(name = "engine::subscribe_as", level = "debug", skip(self, params), fields(rql = %rql))]
234	pub fn subscribe_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
235		let mut txn = match self.begin_query(identity) {
236			Ok(t) => t,
237			Err(mut e) => {
238				e.with_statement(rql.to_string());
239				return ExecutionResult {
240					frames: vec![],
241					error: Some(e),
242					metrics: ExecutionMetrics::default(),
243				};
244			}
245		};
246		let mut outcome = self.executor.subscription(
247			&mut txn,
248			Subscription {
249				rql,
250				params,
251			},
252		);
253		if let Some(ref mut e) = outcome.error {
254			e.with_statement(rql.to_string());
255		}
256		outcome
257	}
258
259	/// Call a procedure by fully-qualified name.
260	#[instrument(name = "engine::procedure_as", level = "debug", skip(self, params), fields(name = %name))]
261	pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> ExecutionResult {
262		if let Err(e) = self.reject_if_read_only() {
263			return ExecutionResult {
264				frames: vec![],
265				error: Some(e),
266				metrics: ExecutionMetrics::default(),
267			};
268		}
269		let mut txn = match self.begin_command(identity) {
270			Ok(t) => t,
271			Err(e) => {
272				return ExecutionResult {
273					frames: vec![],
274					error: Some(e),
275					metrics: ExecutionMetrics::default(),
276				};
277			}
278		};
279		let mut outcome = self.executor.call_procedure(&mut txn, name, &params);
280		if outcome.is_ok()
281			&& let Err(e) = txn.commit()
282		{
283			outcome.error = Some(e);
284		}
285		outcome
286	}
287
288	/// Register a user-defined virtual table.
289	///
290	/// The virtual table will be available for queries using the given namespace and name.
291	///
292	/// # Arguments
293	///
294	/// * `namespace` - The namespace name (e.g., "default", "my_namespace")
295	/// * `name` - The table name
296	/// * `table` - The virtual table implementation
297	///
298	/// # Returns
299	///
300	/// The assigned `VTableId` on success.
301	///
302	/// # Example
303	///
304	/// ```ignore
305	/// use reifydb_engine::vtable::{UserVTable, UserVTableColumn};
306	/// use reifydb_type::value::r#type::Type;
307	/// use reifydb_core::value::Columns;
308	///
309	/// #[derive(Clone)]
310	/// struct MyTable;
311	///
312	/// impl UserVTable for MyTable {
313	///     fn definition(&self) -> Vec<UserVTableColumn> {
314	///         vec![UserVTableColumn::new("id", Type::Uint8)]
315	///     }
316	///     fn get(&self) -> Columns {
317	///         // Return column-oriented data
318	///         Columns::empty()
319	///     }
320	/// }
321	///
322	/// let id = engine.register_virtual_table("default", "my_table", MyTable)?;
323	/// ```
324	pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
325		let catalog = self.materialized_catalog();
326
327		// Look up namespace by name (use max u64 to get latest version)
328		let ns_def = catalog
329			.find_namespace_by_name(namespace)
330			.ok_or_else(|| Error(Box::new(namespace_not_found(Fragment::None, namespace))))?;
331
332		// Allocate a new table ID
333		let table_id = self.executor.virtual_table_registry.allocate_id();
334		// Convert user column definitions to internal column definitions
335		let table_columns = table.vtable();
336		let columns = convert_vtable_user_columns_to_columns(&table_columns);
337
338		// Create the table definition
339		let def = Arc::new(VTable {
340			id: table_id,
341			namespace: ns_def.id(),
342			name: name.to_string(),
343			columns,
344		});
345
346		// Register in catalog (for resolver lookups)
347		catalog.register_vtable_user(def.clone())?;
348		// Create the data function from the UserVTable trait
349		let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
350		// Create and register the entry
351		let entry = UserVTableEntry {
352			def: def.clone(),
353			data_fn,
354		};
355		self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
356		Ok(table_id)
357	}
358}
359
360impl CdcHost for StandardEngine {
361	fn begin_command(&self) -> Result<CommandTransaction> {
362		StandardEngine::begin_command(self, IdentityId::system())
363	}
364
365	fn begin_query(&self) -> Result<QueryTransaction> {
366		StandardEngine::begin_query(self, IdentityId::system())
367	}
368
369	fn current_version(&self) -> Result<CommitVersion> {
370		StandardEngine::current_version(self)
371	}
372
373	fn done_until(&self) -> CommitVersion {
374		StandardEngine::done_until(self)
375	}
376
377	fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
378		StandardEngine::wait_for_mark_timeout(self, version, timeout)
379	}
380
381	fn materialized_catalog(&self) -> &MaterializedCatalog {
382		&self.catalog.materialized
383	}
384}
385
386impl Clone for StandardEngine {
387	fn clone(&self) -> Self {
388		Self(self.0.clone())
389	}
390}
391
392impl Deref for StandardEngine {
393	type Target = Inner;
394
395	fn deref(&self) -> &Self::Target {
396		&self.0
397	}
398}
399
/// Shared engine state behind a `StandardEngine` handle (reached via `Deref`).
pub struct Inner {
	// Multi-version transaction layer; source of command, admin and query transactions.
	multi: MultiTransaction,
	// Single-version transaction layer; a clone is passed into each transaction.
	single: SingleTransaction,
	// Event bus given to command/admin transactions and used by `emit`.
	event_bus: EventBus,
	// RQL executor; each new transaction gets an `Arc` of a clone of it.
	executor: Executor,
	// Produces a fresh interceptor set for every command/admin transaction.
	interceptors: Arc<InterceptorFactory>,
	// Catalog handle; its materialized view backs virtual-table registration.
	catalog: Catalog,
	// Flow operator store; `new` registers a listener for it on the event bus.
	flow_operator_store: SystemFlowOperatorStore,
	// Replica-mode flag, set to true by `set_read_only` and read by `reject_if_read_only`.
	read_only: AtomicBool,
}
410
411impl StandardEngine {
412	pub fn new(
413		multi: MultiTransaction,
414		single: SingleTransaction,
415		event_bus: EventBus,
416		interceptors: InterceptorFactory,
417		catalog: Catalog,
418		config: EngineConfig,
419	) -> Self {
420		let flow_operator_store = SystemFlowOperatorStore::new();
421		let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
422		event_bus.register(listener);
423
424		// Get the metrics store from IoC to create the stats reader
425		let metrics_store = config
426			.ioc
427			.resolve::<SingleStore>()
428			.expect("SingleStore must be registered in IocContainer for metrics");
429		let stats_reader = MetricReader::new(metrics_store);
430
431		// Register MaterializedCatalogInterceptor as a factory function.
432		let materialized = catalog.materialized.clone();
433		interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
434			interceptors
435				.post_commit
436				.add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
437		}));
438
439		let interceptors = Arc::new(interceptors);
440
441		Self(Arc::new(Inner {
442			multi,
443			single,
444			event_bus,
445			executor: Executor::new(catalog.clone(), config, flow_operator_store.clone(), stats_reader),
446			interceptors,
447			catalog,
448			flow_operator_store,
449			read_only: AtomicBool::new(false),
450		}))
451	}
452
453	/// Create a new set of interceptors from the factory.
454	pub fn create_interceptors(&self) -> Interceptors {
455		self.interceptors.create()
456	}
457
458	/// Register an additional interceptor factory function.
459	///
460	/// The function will be called on every `create()` to augment the base interceptors.
461	/// This is thread-safe and can be called after the engine is constructed (e.g. by subsystems).
462	pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
463		self.interceptors.add_late(factory);
464	}
465
466	/// Begin a query transaction at a specific version.
467	///
468	/// This is used for parallel query execution where multiple tasks need to
469	/// read from the same snapshot (same CommitVersion) for consistency.
470	#[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
471    ))]
472	pub fn begin_query_at_version(&self, version: CommitVersion, identity: IdentityId) -> Result<QueryTransaction> {
473		let mut txn = QueryTransaction::new(
474			self.multi.begin_query_at_version(version)?,
475			self.single.clone(),
476			identity,
477		);
478		txn.set_executor(Arc::new(self.executor.clone()));
479		Ok(txn)
480	}
481
482	#[inline]
483	pub fn multi(&self) -> &MultiTransaction {
484		&self.multi
485	}
486
487	#[inline]
488	pub fn multi_owned(&self) -> MultiTransaction {
489		self.multi.clone()
490	}
491
492	/// Get the actor system
493	#[inline]
494	pub fn actor_system(&self) -> ActorSystem {
495		self.multi.actor_system()
496	}
497
498	#[inline]
499	pub fn single(&self) -> &SingleTransaction {
500		&self.single
501	}
502
503	#[inline]
504	pub fn single_owned(&self) -> SingleTransaction {
505		self.single.clone()
506	}
507
508	#[inline]
509	pub fn emit<E: Event>(&self, event: E) {
510		self.event_bus.emit(event)
511	}
512
513	#[inline]
514	pub fn materialized_catalog(&self) -> &MaterializedCatalog {
515		&self.catalog.materialized
516	}
517
518	/// Returns a `Catalog` instance for catalog lookups.
519	/// The Catalog provides three-tier lookup methods that check transactional changes,
520	/// then MaterializedCatalog, then fall back to storage.
521	#[inline]
522	pub fn catalog(&self) -> Catalog {
523		self.catalog.clone()
524	}
525
526	#[inline]
527	pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
528		&self.flow_operator_store
529	}
530
531	/// Get the current version from the transaction manager
532	#[inline]
533	pub fn current_version(&self) -> Result<CommitVersion> {
534		self.multi.current_version()
535	}
536
537	/// Returns the highest version where ALL prior versions have completed.
538	/// This is useful for CDC polling to know the safe upper bound for fetching
539	/// CDC events - all events up to this version are guaranteed to be in storage.
540	#[inline]
541	pub fn done_until(&self) -> CommitVersion {
542		self.multi.done_until()
543	}
544
545	/// Wait for the watermark to reach the given version with a timeout.
546	/// Returns true if the watermark reached the target, false if timeout occurred.
547	#[inline]
548	pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
549		self.multi.wait_for_mark_timeout(version, timeout)
550	}
551
552	#[inline]
553	pub fn executor(&self) -> Executor {
554		self.executor.clone()
555	}
556
557	/// Get the CDC store from the IoC container.
558	///
559	/// Returns the CdcStore that was registered during engine construction.
560	/// Panics if CdcStore was not registered.
561	#[inline]
562	pub fn cdc_store(&self) -> CdcStore {
563		self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
564	}
565
566	/// Mark this engine as read-only (replica mode).
567	/// Once set, all write-path methods will return ENG_007 immediately.
568	pub fn set_read_only(&self) {
569		self.read_only.store(true, Ordering::SeqCst);
570	}
571
572	/// Whether this engine is in read-only (replica) mode.
573	pub fn is_read_only(&self) -> bool {
574		self.read_only.load(Ordering::SeqCst)
575	}
576
577	pub(crate) fn reject_if_read_only(&self) -> Result<()> {
578		if self.is_read_only() {
579			return Err(Error(Box::new(read_only_rejection(Fragment::None))));
580		}
581		Ok(())
582	}
583
584	pub fn shutdown(&self) {
585		self.interceptors.clear_late();
586		self.executor.ioc.clear();
587	}
588
589	/// Start a bulk insert operation with full validation.
590	///
591	/// This provides a fluent API for fast bulk inserts that bypasses RQL parsing.
592	/// All inserts within a single builder execute in one transaction.
593	///
594	/// # Example
595	///
596	/// ```ignore
597	/// use reifydb_type::params;
598	///
599	/// engine.bulk_insert(&identity)
600	///     .table("namespace.users")
601	///         .row(params!{ id: 1, name: "Alice" })
602	///         .row(params!{ id: 2, name: "Bob" })
603	///         .done()
604	///     .execute()?;
605	/// ```
606	pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
607		BulkInsertBuilder::new(self, identity)
608	}
609
610	/// Start a bulk insert operation with validation disabled (trusted mode).
611	///
612	/// Use this for pre-validated internal data where constraint validation
613	/// can be skipped for maximum performance.
614	///
615	/// # Safety
616	///
617	/// The caller is responsible for ensuring the data conforms to the
618	/// shape constraints. Invalid data may cause undefined behavior.
619	pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
620		BulkInsertBuilder::new_trusted(self, identity)
621	}
622}
623
624/// Convert user column definitions to internal Column format.
625fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
626	columns.iter()
627		.enumerate()
628		.map(|(idx, col)| {
629			// Note: For virtual tables, we use unconstrained for all types.
630			// The nullable field is still available for documentation purposes.
631			let constraint = TypeConstraint::unconstrained(col.data_type.clone());
632			Column {
633				id: ColumnId(idx as u64),
634				name: col.name.clone(),
635				constraint,
636				properties: vec![],
637				index: ColumnIndex(idx as u8),
638				auto_increment: false,
639				dictionary_id: None,
640			}
641		})
642		.collect()
643}