reifydb_engine/
engine.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{ops::Deref, rc::Rc, sync::Arc, time::Duration};
5
6use reifydb_catalog::MaterializedCatalog;
7use reifydb_core::{
8	CommitVersion, Frame,
9	event::{Event, EventBus},
10	interceptor::InterceptorFactory,
11	interface::{
12		ColumnDef, ColumnId, ColumnIndex, Command, Engine as EngineInterface, ExecuteCommand, ExecuteQuery,
13		Identity, MultiVersionTransaction, Params, Query, TableVirtualDef, TableVirtualId, WithEventBus,
14	},
15};
16use reifydb_transaction::{
17	cdc::TransactionCdc,
18	multi::{AwaitWatermarkError, TransactionMultiVersion},
19	single::TransactionSingleVersion,
20};
21use reifydb_type::{OwnedFragment, TypeConstraint};
22use tracing::instrument;
23
24use crate::{
25	execute::Executor,
26	function::{Functions, generator, math},
27	interceptor::{CatalogEventInterceptor, materialized_catalog::MaterializedCatalogInterceptor},
28	table_virtual::{
29		IteratorVirtualTableFactory, SimpleVirtualTableFactory, TableVirtualUser, TableVirtualUserColumnDef,
30		TableVirtualUserIterator,
31		system::{FlowOperatorEventListener, FlowOperatorStore},
32	},
33	transaction::{StandardCommandTransaction, StandardQueryTransaction},
34};
35
36pub struct StandardEngine(Arc<EngineInner>);
37
38impl WithEventBus for StandardEngine {
39	fn event_bus(&self) -> &EventBus {
40		&self.event_bus
41	}
42}
43
44impl EngineInterface for StandardEngine {
45	type Command = StandardCommandTransaction;
46	type Query = StandardQueryTransaction;
47
48	#[instrument(level = "debug", skip(self))]
49	fn begin_command(&self) -> crate::Result<Self::Command> {
50		let mut interceptors = self.interceptors.create();
51
52		interceptors.post_commit.add(Rc::new(MaterializedCatalogInterceptor::new(self.catalog.clone())));
53		interceptors
54			.post_commit
55			.add(Rc::new(CatalogEventInterceptor::new(self.event_bus.clone(), self.catalog.clone())));
56
57		StandardCommandTransaction::new(
58			self.multi.clone(),
59			self.single.clone(),
60			self.cdc.clone(),
61			self.event_bus.clone(),
62			self.catalog.clone(),
63			interceptors,
64		)
65	}
66
67	#[instrument(level = "debug", skip(self))]
68	fn begin_query(&self) -> crate::Result<Self::Query> {
69		Ok(StandardQueryTransaction::new(
70			self.multi.begin_query()?,
71			self.single.clone(),
72			self.cdc.clone(),
73			self.catalog.clone(),
74		))
75	}
76
77	#[instrument(level = "info", skip(self, params), fields(rql = %rql))]
78	fn command_as(&self, identity: &Identity, rql: &str, params: Params) -> crate::Result<Vec<Frame>> {
79		let mut txn = self.begin_command()?;
80		let result = self.execute_command(
81			&mut txn,
82			Command {
83				rql,
84				params,
85				identity,
86			},
87		)?;
88		txn.commit()?;
89		Ok(result)
90	}
91
92	#[instrument(level = "info", skip(self, params), fields(rql = %rql))]
93	fn query_as(&self, identity: &Identity, rql: &str, params: Params) -> crate::Result<Vec<Frame>> {
94		let mut txn = self.begin_query()?;
95		let result = self.execute_query(
96			&mut txn,
97			Query {
98				rql,
99				params,
100				identity,
101			},
102		)?;
103		Ok(result)
104	}
105}
106
107impl ExecuteCommand<StandardCommandTransaction> for StandardEngine {
108	#[inline]
109	fn execute_command(&self, txn: &mut StandardCommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
110		self.executor.execute_command(txn, cmd)
111	}
112}
113
114impl ExecuteQuery<StandardQueryTransaction> for StandardEngine {
115	#[inline]
116	fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
117		self.executor.execute_query(txn, qry)
118	}
119}
120
121impl Clone for StandardEngine {
122	fn clone(&self) -> Self {
123		Self(self.0.clone())
124	}
125}
126
127impl Deref for StandardEngine {
128	type Target = EngineInner;
129
130	fn deref(&self) -> &Self::Target {
131		&self.0
132	}
133}
134
135pub struct EngineInner {
136	multi: TransactionMultiVersion,
137	single: TransactionSingleVersion,
138	cdc: TransactionCdc,
139	event_bus: EventBus,
140	executor: Executor,
141	interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
142	catalog: MaterializedCatalog,
143	flow_operator_store: FlowOperatorStore,
144}
145
146impl StandardEngine {
147	pub fn new(
148		multi: TransactionMultiVersion,
149		single: TransactionSingleVersion,
150		cdc: TransactionCdc,
151		event_bus: EventBus,
152		interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
153		catalog: MaterializedCatalog,
154	) -> Self {
155		Self::with_functions(multi, single, cdc, event_bus, interceptors, catalog, None)
156	}
157
158	pub fn with_functions(
159		multi: TransactionMultiVersion,
160		single: TransactionSingleVersion,
161		cdc: TransactionCdc,
162		event_bus: EventBus,
163		interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
164		catalog: MaterializedCatalog,
165		custom_functions: Option<Functions>,
166	) -> Self {
167		let functions = custom_functions.unwrap_or_else(|| {
168			Functions::builder()
169				.register_aggregate("math::sum", math::aggregate::Sum::new)
170				.register_aggregate("math::min", math::aggregate::Min::new)
171				.register_aggregate("math::max", math::aggregate::Max::new)
172				.register_aggregate("math::avg", math::aggregate::Avg::new)
173				.register_aggregate("math::count", math::aggregate::Count::new)
174				.register_scalar("math::abs", math::scalar::Abs::new)
175				.register_scalar("math::avg", math::scalar::Avg::new)
176				.register_generator("generate_series", generator::GenerateSeries::new)
177				.build()
178		});
179
180		// Create the flow operator store and register the event listener
181		let flow_operator_store = FlowOperatorStore::new();
182		let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
183		event_bus.register(listener);
184
185		Self(Arc::new(EngineInner {
186			multi,
187			single,
188			cdc,
189			event_bus,
190			executor: Executor::new(functions, flow_operator_store.clone()),
191			interceptors,
192			catalog,
193			flow_operator_store,
194		}))
195	}
196
197	#[inline]
198	pub fn multi(&self) -> &TransactionMultiVersion {
199		&self.multi
200	}
201
202	#[inline]
203	pub fn multi_owned(&self) -> TransactionMultiVersion {
204		self.multi.clone()
205	}
206
207	#[inline]
208	pub fn single(&self) -> &TransactionSingleVersion {
209		&self.single
210	}
211
212	#[inline]
213	pub fn single_owned(&self) -> TransactionSingleVersion {
214		self.single.clone()
215	}
216
217	#[inline]
218	pub fn cdc(&self) -> &TransactionCdc {
219		&self.cdc
220	}
221
222	#[inline]
223	pub fn cdc_owned(&self) -> TransactionCdc {
224		self.cdc.clone()
225	}
226
227	#[inline]
228	pub fn emit<E: Event>(&self, event: E) {
229		self.event_bus.emit(event)
230	}
231
232	#[inline]
233	pub fn catalog(&self) -> &MaterializedCatalog {
234		&self.catalog
235	}
236
237	#[inline]
238	pub fn flow_operator_store(&self) -> &FlowOperatorStore {
239		&self.flow_operator_store
240	}
241
242	/// Get the current version from the transaction manager
243	#[inline]
244	pub fn current_version(&self) -> crate::Result<CommitVersion> {
245		self.multi.current_version()
246	}
247
248	/// Wait for the watermark to reach the specified version.
249	/// Returns Ok(()) if the watermark reaches the version within the timeout,
250	/// or Err(AwaitWatermarkError) if the timeout expires.
251	///
252	/// This is useful for CDC polling to ensure all in-flight commits have
253	/// completed their storage writes before querying for CDC events.
254	#[inline]
255	pub fn try_wait_for_watermark(
256		&self,
257		version: CommitVersion,
258		timeout: Duration,
259	) -> Result<(), AwaitWatermarkError> {
260		self.multi.try_wait_for_watermark(version, timeout)
261	}
262
263	/// Returns the highest version where ALL prior versions have completed.
264	/// This is useful for CDC polling to know the safe upper bound for fetching
265	/// CDC events - all events up to this version are guaranteed to be in storage.
266	#[inline]
267	pub fn done_until(&self) -> CommitVersion {
268		self.multi.done_until()
269	}
270
271	/// Returns (query_done_until, command_done_until) for debugging watermark state.
272	#[inline]
273	pub fn watermarks(&self) -> (CommitVersion, CommitVersion) {
274		self.multi.watermarks()
275	}
276
277	#[inline]
278	pub fn executor(&self) -> Executor {
279		self.executor.clone()
280	}
281
282	/// Register a user-defined virtual table.
283	///
284	/// The virtual table will be available for queries using the given namespace and name.
285	///
286	/// # Arguments
287	///
288	/// * `namespace` - The namespace name (e.g., "default", "my_namespace")
289	/// * `name` - The table name
290	/// * `table` - The virtual table implementation
291	///
292	/// # Returns
293	///
294	/// The assigned `TableVirtualId` on success.
295	///
296	/// # Example
297	///
298	/// ```ignore
299	/// use reifydb_engine::table_virtual::{TableVirtualUser, TableVirtualUserColumnDef};
300	/// use reifydb_type::Type;
301	/// use reifydb_core::value::Value;
302	///
303	/// #[derive(Clone)]
304	/// struct MyTable;
305	///
306	/// impl TableVirtualUser for MyTable {
307	///     fn columns(&self) -> Vec<TableVirtualUserColumnDef> {
308	///         vec![TableVirtualUserColumnDef::new("id", Type::Uint8)]
309	///     }
310	///     fn rows(&self) -> Vec<Vec<Value>> {
311	///         vec![vec![Value::Uint8(1)], vec![Value::Uint8(2)]]
312	///     }
313	/// }
314	///
315	/// let id = engine.register_virtual_table("default", "my_table", MyTable)?;
316	/// ```
317	pub fn register_virtual_table<T: TableVirtualUser + Clone>(
318		&self,
319		namespace: &str,
320		name: &str,
321		table: T,
322	) -> crate::Result<TableVirtualId> {
323		// Look up namespace by name (use max u64 to get latest version)
324		let ns_def =
325			self.catalog.find_namespace_by_name(namespace, CommitVersion(u64::MAX)).ok_or_else(|| {
326				reifydb_type::Error(reifydb_type::diagnostic::catalog::namespace_not_found(
327					OwnedFragment::None,
328					namespace,
329				))
330			})?;
331
332		// Allocate a new table ID
333		let table_id = self.executor.virtual_table_registry.allocate_id();
334
335		// Convert user columns to internal column definitions
336		let table_columns = table.columns();
337		let columns = convert_table_virtual_user_columns_to_column_defs(&table_columns);
338
339		// Create the table definition
340		let def = Arc::new(TableVirtualDef {
341			id: table_id,
342			namespace: ns_def.id,
343			name: name.to_string(),
344			columns,
345		});
346
347		// Register in catalog (for resolver lookups)
348		self.catalog.register_table_virtual_user(def.clone())?;
349
350		// Create and register the factory (for runtime instantiation)
351		let factory = Arc::new(SimpleVirtualTableFactory::new(table, def.clone()));
352		self.executor.virtual_table_registry.register(ns_def.id, name.to_string(), factory);
353
354		Ok(table_id)
355	}
356
357	/// Unregister a user-defined virtual table.
358	///
359	/// # Arguments
360	///
361	/// * `namespace` - The namespace name
362	/// * `name` - The table name
363	pub fn unregister_virtual_table(&self, namespace: &str, name: &str) -> crate::Result<()> {
364		// Look up namespace by name (use max u64 to get latest version)
365		let ns_def =
366			self.catalog.find_namespace_by_name(namespace, CommitVersion(u64::MAX)).ok_or_else(|| {
367				reifydb_type::Error(reifydb_type::diagnostic::catalog::namespace_not_found(
368					OwnedFragment::None,
369					namespace,
370				))
371			})?;
372
373		// Unregister from catalog
374		self.catalog.unregister_table_virtual_user(ns_def.id, name)?;
375
376		// Unregister from executor registry
377		self.executor.virtual_table_registry.unregister(ns_def.id, name);
378
379		Ok(())
380	}
381
382	/// Register a user-defined virtual table using an iterator-based implementation.
383	///
384	/// This method is for tables that stream data in batches, which is more efficient
385	/// for large datasets. The creator function is called once per query to create
386	/// a fresh iterator instance.
387	///
388	/// # Arguments
389	///
390	/// * `namespace` - The namespace to register the table in
391	/// * `name` - The table name
392	/// * `creator` - A function that creates a new iterator instance for each query
393	///
394	/// # Returns
395	///
396	/// The ID of the registered virtual table
397	pub fn register_virtual_table_iterator<F>(
398		&self,
399		namespace: &str,
400		name: &str,
401		creator: F,
402	) -> crate::Result<TableVirtualId>
403	where
404		F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
405	{
406		// Look up namespace by name (use max u64 to get latest version)
407		let ns_def =
408			self.catalog.find_namespace_by_name(namespace, CommitVersion(u64::MAX)).ok_or_else(|| {
409				reifydb_type::Error(reifydb_type::diagnostic::catalog::namespace_not_found(
410					OwnedFragment::None,
411					namespace,
412				))
413			})?;
414
415		// Allocate a new table ID
416		let table_id = self.executor.virtual_table_registry.allocate_id();
417
418		// Get columns from a temporary instance
419		let temp_iter = creator();
420		let table_columns = temp_iter.columns();
421		let columns = convert_table_virtual_user_columns_to_column_defs(&table_columns);
422
423		// Create the table definition
424		let def = Arc::new(TableVirtualDef {
425			id: table_id,
426			namespace: ns_def.id,
427			name: name.to_string(),
428			columns,
429		});
430
431		// Register in catalog (for resolver lookups)
432		self.catalog.register_table_virtual_user(def.clone())?;
433
434		// Create and register the factory (for runtime instantiation)
435		let factory = Arc::new(IteratorVirtualTableFactory::new(creator, def.clone()));
436		self.executor.virtual_table_registry.register(ns_def.id, name.to_string(), factory);
437
438		Ok(table_id)
439	}
440}
441
442/// Convert user column definitions to internal ColumnDef format.
443fn convert_table_virtual_user_columns_to_column_defs(columns: &[TableVirtualUserColumnDef]) -> Vec<ColumnDef> {
444	columns.iter()
445		.enumerate()
446		.map(|(idx, col)| {
447			// Note: For virtual tables, we use unconstrained for all types.
448			// The nullable field is still available for documentation purposes.
449			let constraint = TypeConstraint::unconstrained(col.data_type);
450			ColumnDef {
451				id: ColumnId(idx as u64),
452				name: col.name.clone(),
453				constraint,
454				policies: vec![],
455				index: ColumnIndex(idx as u8),
456				auto_increment: false,
457				dictionary_id: None,
458			}
459		})
460		.collect()
461}