reifydb_engine/
engine.rs

// Copyright (c) reifydb.com 2025
// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{ops::Deref, sync::Arc, time::Duration};
5
6use async_trait::async_trait;
7use reifydb_catalog::MaterializedCatalog;
8use reifydb_core::{
9	CommitVersion, Frame,
10	event::{Event, EventBus},
11	interceptor::InterceptorFactory,
12	interface::{
13		ColumnDef, ColumnId, ColumnIndex, Command, Engine as EngineInterface, ExecuteCommand, ExecuteQuery,
14		Identity, Params, Query, TableVirtualDef, TableVirtualId, WithEventBus,
15	},
16	stream::{SendableFrameStream, StreamError},
17};
18use reifydb_transaction::{
19	cdc::TransactionCdc,
20	multi::{AwaitWatermarkError, TransactionMultiVersion},
21	single::TransactionSingle,
22};
23use reifydb_type::{Fragment, TypeConstraint};
24use tokio::spawn;
25use tokio_util::sync::CancellationToken;
26use tracing::instrument;
27
28use crate::{
29	execute::Executor,
30	function::{Functions, generator, math},
31	interceptor::{CatalogEventInterceptor, materialized_catalog::MaterializedCatalogInterceptor},
32	stream::{ChannelFrameStream, FrameSender},
33	table_virtual::{
34		IteratorVirtualTableFactory, SimpleVirtualTableFactory, TableVirtualUser, TableVirtualUserColumnDef,
35		TableVirtualUserIterator,
36		system::{FlowOperatorEventListener, FlowOperatorStore},
37	},
38	transaction::{StandardCommandTransaction, StandardQueryTransaction},
39};
40
41pub struct StandardEngine(Arc<EngineInner>);
42
43impl WithEventBus for StandardEngine {
44	fn event_bus(&self) -> &EventBus {
45		&self.event_bus
46	}
47}
48
49#[async_trait]
50impl EngineInterface for StandardEngine {
51	type Command = StandardCommandTransaction;
52	type Query = StandardQueryTransaction;
53
54	#[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
55	async fn begin_command(&self) -> crate::Result<Self::Command> {
56		let mut interceptors = self.interceptors.create();
57
58		interceptors.post_commit.add(Arc::new(MaterializedCatalogInterceptor::new(self.catalog.clone())));
59		interceptors
60			.post_commit
61			.add(Arc::new(CatalogEventInterceptor::new(self.event_bus.clone(), self.catalog.clone())));
62
63		StandardCommandTransaction::new(
64			self.multi.clone(),
65			self.single.clone(),
66			self.cdc.clone(),
67			self.event_bus.clone(),
68			self.catalog.clone(),
69			interceptors,
70		)
71		.await
72	}
73
74	#[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
75	async fn begin_query(&self) -> crate::Result<Self::Query> {
76		Ok(StandardQueryTransaction::new(
77			self.multi.begin_query().await?,
78			self.single.clone(),
79			self.cdc.clone(),
80			self.catalog.clone(),
81		))
82	}
83
84	#[instrument(name = "engine::command", level = "info", skip(self, params), fields(rql = %rql))]
85	fn command_as(&self, identity: &Identity, rql: &str, params: Params) -> SendableFrameStream {
86		let engine = self.clone();
87		let identity = identity.clone();
88		let rql = rql.to_string();
89		let cancel_token = CancellationToken::new();
90
91		let (sender, stream) = ChannelFrameStream::new(8, cancel_token.clone());
92
93		spawn(execute_command(engine, identity, rql, params, sender, cancel_token));
94
95		Box::pin(stream)
96	}
97
98	#[instrument(name = "engine::query", level = "info", skip(self, params), fields(rql = %rql))]
99	fn query_as(&self, identity: &Identity, rql: &str, params: Params) -> SendableFrameStream {
100		let engine = self.clone();
101		let identity = identity.clone();
102		let rql = rql.to_string();
103		let cancel_token = CancellationToken::new();
104
105		let (sender, stream) = ChannelFrameStream::new(8, cancel_token.clone());
106
107		spawn(execute_query(engine, identity, rql, params, sender, cancel_token));
108
109		Box::pin(stream)
110	}
111}
112
113/// Execute a command and send results to the stream.
114async fn execute_command(
115	engine: StandardEngine,
116	identity: Identity,
117	rql: String,
118	params: Params,
119	sender: FrameSender,
120	cancel_token: CancellationToken,
121) {
122	// Check for cancellation before starting
123	if cancel_token.is_cancelled() {
124		return;
125	}
126
127	// Begin transaction
128	let txn_result = engine.begin_command().await;
129	let mut txn = match txn_result {
130		Ok(txn) => txn,
131		Err(e) => {
132			let _ = sender.try_send(Err(StreamError::query_with_statement(e, rql.clone())));
133			return;
134		}
135	};
136
137	// Execute command - call executor directly to avoid trait object indirection
138	let result = engine
139		.executor
140		.execute_command(
141			&mut txn,
142			Command {
143				rql: &rql,
144				params,
145				identity: &identity,
146			},
147		)
148		.await;
149
150	match result {
151		Ok(frames) => {
152			// Commit transaction
153			if let Err(e) = txn.commit().await {
154				let _ = sender.try_send(Err(StreamError::query_with_statement(e, rql)));
155				return;
156			}
157
158			// Send each frame through the channel
159			for frame in frames {
160				if cancel_token.is_cancelled() {
161					return;
162				}
163				if sender.send(Ok(frame)).await.is_err() {
164					return; // Receiver dropped
165				}
166			}
167		}
168		Err(e) => {
169			// Rollback on error (drop will handle it)
170			let _ = sender.try_send(Err(StreamError::query_with_statement(e, rql)));
171		}
172	}
173}
174
175/// Execute a query and send results to the stream.
176async fn execute_query(
177	engine: StandardEngine,
178	identity: Identity,
179	rql: String,
180	params: Params,
181	sender: FrameSender,
182	cancel_token: CancellationToken,
183) {
184	// Check for cancellation before starting
185	if cancel_token.is_cancelled() {
186		return;
187	}
188
189	// Begin transaction
190	let txn_result = engine.begin_query().await;
191	let mut txn = match txn_result {
192		Ok(txn) => txn,
193		Err(e) => {
194			let _ = sender.try_send(Err(StreamError::query_with_statement(e, rql.clone())));
195			return;
196		}
197	};
198
199	// Execute query - call executor directly to avoid trait object indirection
200	let result = engine
201		.executor
202		.execute_query(
203			&mut txn,
204			Query {
205				rql: &rql,
206				params,
207				identity: &identity,
208			},
209		)
210		.await;
211
212	match result {
213		Ok(frames) => {
214			// Send each frame through the channel
215			for frame in frames {
216				if cancel_token.is_cancelled() {
217					return;
218				}
219				if sender.send(Ok(frame)).await.is_err() {
220					return; // Receiver dropped
221				}
222			}
223		}
224		Err(e) => {
225			let _ = sender.try_send(Err(StreamError::query_with_statement(e, rql)));
226		}
227	}
228}
229
230#[async_trait]
231impl ExecuteCommand<StandardCommandTransaction> for StandardEngine {
232	#[inline]
233	async fn execute_command(
234		&self,
235		txn: &mut StandardCommandTransaction,
236		cmd: Command<'_>,
237	) -> crate::Result<Vec<Frame>> {
238		self.executor.execute_command(txn, cmd).await
239	}
240}
241
242#[async_trait]
243impl ExecuteQuery<StandardQueryTransaction> for StandardEngine {
244	#[inline]
245	async fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
246		self.executor.execute_query(txn, qry).await
247	}
248}
249
250impl Clone for StandardEngine {
251	fn clone(&self) -> Self {
252		Self(self.0.clone())
253	}
254}
255
256impl Deref for StandardEngine {
257	type Target = EngineInner;
258
259	fn deref(&self) -> &Self::Target {
260		&self.0
261	}
262}
263
264pub struct EngineInner {
265	multi: TransactionMultiVersion,
266	single: TransactionSingle,
267	cdc: TransactionCdc,
268	event_bus: EventBus,
269	executor: Executor,
270	interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
271	catalog: MaterializedCatalog,
272	flow_operator_store: FlowOperatorStore,
273}
274
275impl StandardEngine {
276	pub async fn new(
277		multi: TransactionMultiVersion,
278		single: TransactionSingle,
279		cdc: TransactionCdc,
280		event_bus: EventBus,
281		interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
282		catalog: MaterializedCatalog,
283		custom_functions: Option<Functions>,
284	) -> Self {
285		let functions = custom_functions.unwrap_or_else(|| {
286			Functions::builder()
287				.register_aggregate("math::sum", math::aggregate::Sum::new)
288				.register_aggregate("math::min", math::aggregate::Min::new)
289				.register_aggregate("math::max", math::aggregate::Max::new)
290				.register_aggregate("math::avg", math::aggregate::Avg::new)
291				.register_aggregate("math::count", math::aggregate::Count::new)
292				.register_scalar("math::abs", math::scalar::Abs::new)
293				.register_scalar("math::avg", math::scalar::Avg::new)
294				.register_generator("generate_series", generator::GenerateSeries::new)
295				.build()
296		});
297
298		// Create the flow operator store and register the event listener
299		let flow_operator_store = FlowOperatorStore::new();
300		let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
301		event_bus.register(listener).await;
302
303		let stats_tracker = multi.store().stats_tracker().clone();
304
305		Self(Arc::new(EngineInner {
306			multi,
307			single,
308			cdc,
309			event_bus,
310			executor: Executor::new(functions, flow_operator_store.clone(), stats_tracker),
311			interceptors,
312			catalog,
313			flow_operator_store,
314		}))
315	}
316
317	#[inline]
318	pub fn multi(&self) -> &TransactionMultiVersion {
319		&self.multi
320	}
321
322	#[inline]
323	pub fn multi_owned(&self) -> TransactionMultiVersion {
324		self.multi.clone()
325	}
326
327	#[inline]
328	pub fn single(&self) -> &TransactionSingle {
329		&self.single
330	}
331
332	#[inline]
333	pub fn single_owned(&self) -> TransactionSingle {
334		self.single.clone()
335	}
336
337	#[inline]
338	pub fn cdc(&self) -> &TransactionCdc {
339		&self.cdc
340	}
341
342	#[inline]
343	pub fn cdc_owned(&self) -> TransactionCdc {
344		self.cdc.clone()
345	}
346
347	#[inline]
348	pub async fn emit<E: Event>(&self, event: E) {
349		self.event_bus.emit(event).await
350	}
351
352	#[inline]
353	pub fn catalog(&self) -> &MaterializedCatalog {
354		&self.catalog
355	}
356
357	#[inline]
358	pub fn flow_operator_store(&self) -> &FlowOperatorStore {
359		&self.flow_operator_store
360	}
361
362	/// Get the current version from the transaction manager
363	#[inline]
364	pub async fn current_version(&self) -> crate::Result<CommitVersion> {
365		self.multi.current_version().await
366	}
367
368	/// Wait for the watermark to reach the specified version.
369	/// Returns Ok(()) if the watermark reaches the version within the timeout,
370	/// or Err(AwaitWatermarkError) if the timeout expires.
371	///
372	/// This is useful for CDC polling to ensure all in-flight commits have
373	/// completed their storage writes before querying for CDC events.
374	#[inline]
375	pub async fn try_wait_for_watermark(
376		&self,
377		version: CommitVersion,
378		timeout: Duration,
379	) -> Result<(), AwaitWatermarkError> {
380		self.multi.try_wait_for_watermark(version, timeout).await
381	}
382
383	/// Returns the highest version where ALL prior versions have completed.
384	/// This is useful for CDC polling to know the safe upper bound for fetching
385	/// CDC events - all events up to this version are guaranteed to be in storage.
386	#[inline]
387	pub fn done_until(&self) -> CommitVersion {
388		self.multi.done_until()
389	}
390
391	/// Returns (query_done_until, command_done_until) for debugging watermark state.
392	#[inline]
393	pub fn watermarks(&self) -> (CommitVersion, CommitVersion) {
394		self.multi.watermarks()
395	}
396
397	#[inline]
398	pub fn executor(&self) -> Executor {
399		self.executor.clone()
400	}
401
402	/// Register a user-defined virtual table.
403	///
404	/// The virtual table will be available for queries using the given namespace and name.
405	///
406	/// # Arguments
407	///
408	/// * `namespace` - The namespace name (e.g., "default", "my_namespace")
409	/// * `name` - The table name
410	/// * `table` - The virtual table implementation
411	///
412	/// # Returns
413	///
414	/// The assigned `TableVirtualId` on success.
415	///
416	/// # Example
417	///
418	/// ```ignore
419	/// use reifydb_engine::table_virtual::{TableVirtualUser, TableVirtualUserColumnDef};
420	/// use reifydb_type::Type;
421	/// use reifydb_core::value::Value;
422	///
423	/// #[derive(Clone)]
424	/// struct MyTable;
425	///
426	/// impl TableVirtualUser for MyTable {
427	///     fn columns(&self) -> Vec<TableVirtualUserColumnDef> {
428	///         vec![TableVirtualUserColumnDef::new("id", Type::Uint8)]
429	///     }
430	///     fn rows(&self) -> Vec<Vec<Value>> {
431	///         vec![vec![Value::Uint8(1)], vec![Value::Uint8(2)]]
432	///     }
433	/// }
434	///
435	/// let id = engine.register_virtual_table("default", "my_table", MyTable)?;
436	/// ```
437	pub fn register_virtual_table<T: TableVirtualUser + Clone>(
438		&self,
439		namespace: &str,
440		name: &str,
441		table: T,
442	) -> crate::Result<TableVirtualId> {
443		// Look up namespace by name (use max u64 to get latest version)
444		let ns_def =
445			self.catalog.find_namespace_by_name(namespace, CommitVersion(u64::MAX)).ok_or_else(|| {
446				reifydb_type::Error(reifydb_type::diagnostic::catalog::namespace_not_found(
447					Fragment::None,
448					namespace,
449				))
450			})?;
451
452		// Allocate a new table ID
453		let table_id = self.executor.virtual_table_registry.allocate_id();
454
455		// Convert user columns to internal column definitions
456		let table_columns = table.columns();
457		let columns = convert_table_virtual_user_columns_to_column_defs(&table_columns);
458
459		// Create the table definition
460		let def = Arc::new(TableVirtualDef {
461			id: table_id,
462			namespace: ns_def.id,
463			name: name.to_string(),
464			columns,
465		});
466
467		// Register in catalog (for resolver lookups)
468		self.catalog.register_table_virtual_user(def.clone())?;
469
470		// Create and register the factory (for runtime instantiation)
471		let factory = Arc::new(SimpleVirtualTableFactory::new(table, def.clone()));
472		self.executor.virtual_table_registry.register(ns_def.id, name.to_string(), factory);
473
474		Ok(table_id)
475	}
476
477	/// Unregister a user-defined virtual table.
478	///
479	/// # Arguments
480	///
481	/// * `namespace` - The namespace name
482	/// * `name` - The table name
483	pub fn unregister_virtual_table(&self, namespace: &str, name: &str) -> crate::Result<()> {
484		// Look up namespace by name (use max u64 to get latest version)
485		let ns_def =
486			self.catalog.find_namespace_by_name(namespace, CommitVersion(u64::MAX)).ok_or_else(|| {
487				reifydb_type::Error(reifydb_type::diagnostic::catalog::namespace_not_found(
488					Fragment::None,
489					namespace,
490				))
491			})?;
492
493		// Unregister from catalog
494		self.catalog.unregister_table_virtual_user(ns_def.id, name)?;
495
496		// Unregister from executor registry
497		self.executor.virtual_table_registry.unregister(ns_def.id, name);
498
499		Ok(())
500	}
501
502	/// Register a user-defined virtual table using an iterator-based implementation.
503	///
504	/// This method is for tables that stream data in batches, which is more efficient
505	/// for large datasets. The creator function is called once per query to create
506	/// a fresh iterator instance.
507	///
508	/// # Arguments
509	///
510	/// * `namespace` - The namespace to register the table in
511	/// * `name` - The table name
512	/// * `creator` - A function that creates a new iterator instance for each query
513	///
514	/// # Returns
515	///
516	/// The ID of the registered virtual table
517	pub fn register_virtual_table_iterator<F>(
518		&self,
519		namespace: &str,
520		name: &str,
521		creator: F,
522	) -> crate::Result<TableVirtualId>
523	where
524		F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
525	{
526		// Look up namespace by name (use max u64 to get latest version)
527		let ns_def =
528			self.catalog.find_namespace_by_name(namespace, CommitVersion(u64::MAX)).ok_or_else(|| {
529				reifydb_type::Error(reifydb_type::diagnostic::catalog::namespace_not_found(
530					Fragment::None,
531					namespace,
532				))
533			})?;
534
535		// Allocate a new table ID
536		let table_id = self.executor.virtual_table_registry.allocate_id();
537
538		// Get columns from a temporary instance
539		let temp_iter = creator();
540		let table_columns = temp_iter.columns();
541		let columns = convert_table_virtual_user_columns_to_column_defs(&table_columns);
542
543		// Create the table definition
544		let def = Arc::new(TableVirtualDef {
545			id: table_id,
546			namespace: ns_def.id,
547			name: name.to_string(),
548			columns,
549		});
550
551		// Register in catalog (for resolver lookups)
552		self.catalog.register_table_virtual_user(def.clone())?;
553
554		// Create and register the factory (for runtime instantiation)
555		let factory = Arc::new(IteratorVirtualTableFactory::new(creator, def.clone()));
556		self.executor.virtual_table_registry.register(ns_def.id, name.to_string(), factory);
557
558		Ok(table_id)
559	}
560
561	/// Start a bulk insert operation with full validation.
562	///
563	/// This provides a fluent API for fast bulk inserts that bypasses RQL parsing.
564	/// All inserts within a single builder execute in one transaction.
565	///
566	/// # Example
567	///
568	/// ```ignore
569	/// use reifydb_type::params;
570	///
571	/// engine.bulk_insert(&identity)
572	///     .table("namespace.users")
573	///         .row(params!{ id: 1, name: "Alice" })
574	///         .row(params!{ id: 2, name: "Bob" })
575	///         .done()
576	///     .execute()?;
577	/// ```
578	pub fn bulk_insert<'e>(
579		&'e self,
580		identity: &'e Identity,
581	) -> crate::bulk_insert::BulkInsertBuilder<'e, crate::bulk_insert::Validated> {
582		crate::bulk_insert::BulkInsertBuilder::new(self, identity)
583	}
584
585	/// Start a bulk insert operation with validation disabled (trusted mode).
586	///
587	/// Use this for pre-validated internal data where constraint validation
588	/// can be skipped for maximum performance.
589	///
590	/// # Safety
591	///
592	/// The caller is responsible for ensuring the data conforms to the
593	/// schema constraints. Invalid data may cause undefined behavior.
594	pub fn bulk_insert_trusted<'e>(
595		&'e self,
596		identity: &'e Identity,
597	) -> crate::bulk_insert::BulkInsertBuilder<'e, crate::bulk_insert::Trusted> {
598		crate::bulk_insert::BulkInsertBuilder::new_trusted(self, identity)
599	}
600}
601
602/// Convert user column definitions to internal ColumnDef format.
603fn convert_table_virtual_user_columns_to_column_defs(columns: &[TableVirtualUserColumnDef]) -> Vec<ColumnDef> {
604	columns.iter()
605		.enumerate()
606		.map(|(idx, col)| {
607			// Note: For virtual tables, we use unconstrained for all types.
608			// The nullable field is still available for documentation purposes.
609			let constraint = TypeConstraint::unconstrained(col.data_type);
610			ColumnDef {
611				id: ColumnId(idx as u64),
612				name: col.name.clone(),
613				constraint,
614				policies: vec![],
615				index: ColumnIndex(idx as u8),
616				auto_increment: false,
617				dictionary_id: None,
618			}
619		})
620		.collect()
621}