reifydb_engine/
engine.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{ops::Deref, rc::Rc, sync::Arc};
5
6use reifydb_catalog::MaterializedCatalog;
7use reifydb_core::{
8	Frame,
9	event::{Event, EventBus},
10	interceptor::InterceptorFactory,
11	interface::{
12		Command, Engine as EngineInterface, ExecuteCommand, ExecuteQuery, Identity, MultiVersionTransaction,
13		Params, Query, WithEventBus,
14	},
15};
16use reifydb_transaction::{cdc::TransactionCdc, multi::TransactionMultiVersion, single::TransactionSingleVersion};
17
18use crate::{
19	execute::Executor,
20	function::{Functions, generator, math},
21	interceptor::materialized_catalog::MaterializedCatalogInterceptor,
22	table_virtual::system::{FlowOperatorEventListener, FlowOperatorStore},
23	transaction::{StandardCommandTransaction, StandardQueryTransaction},
24};
25
26pub struct StandardEngine(Arc<EngineInner>);
27
28impl WithEventBus for StandardEngine {
29	fn event_bus(&self) -> &EventBus {
30		&self.event_bus
31	}
32}
33
34impl EngineInterface for StandardEngine {
35	type Command = StandardCommandTransaction;
36	type Query = StandardQueryTransaction;
37
38	fn begin_command(&self) -> crate::Result<Self::Command> {
39		let mut interceptors = self.interceptors.create();
40
41		interceptors.post_commit.add(Rc::new(MaterializedCatalogInterceptor::new(self.catalog.clone())));
42
43		StandardCommandTransaction::new(
44			self.multi.clone(),
45			self.single.clone(),
46			self.cdc.clone(),
47			self.event_bus.clone(),
48			self.catalog.clone(),
49			interceptors,
50		)
51	}
52
53	fn begin_query(&self) -> crate::Result<Self::Query> {
54		Ok(StandardQueryTransaction::new(
55			self.multi.begin_query()?,
56			self.single.clone(),
57			self.cdc.clone(),
58			self.catalog.clone(),
59		))
60	}
61
62	fn command_as(&self, identity: &Identity, rql: &str, params: Params) -> crate::Result<Vec<Frame>> {
63		let mut txn = self.begin_command()?;
64		let result = self.execute_command(
65			&mut txn,
66			Command {
67				rql,
68				params,
69				identity,
70			},
71		)?;
72		txn.commit()?;
73		Ok(result)
74	}
75
76	fn query_as(&self, identity: &Identity, rql: &str, params: Params) -> crate::Result<Vec<Frame>> {
77		let mut txn = self.begin_query()?;
78		let result = self.execute_query(
79			&mut txn,
80			Query {
81				rql,
82				params,
83				identity,
84			},
85		)?;
86		Ok(result)
87	}
88}
89
90impl ExecuteCommand<StandardCommandTransaction> for StandardEngine {
91	#[inline]
92	fn execute_command(&self, txn: &mut StandardCommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
93		self.executor.execute_command(txn, cmd)
94	}
95}
96
97impl ExecuteQuery<StandardQueryTransaction> for StandardEngine {
98	#[inline]
99	fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
100		self.executor.execute_query(txn, qry)
101	}
102}
103
104impl Clone for StandardEngine {
105	fn clone(&self) -> Self {
106		Self(self.0.clone())
107	}
108}
109
110impl Deref for StandardEngine {
111	type Target = EngineInner;
112
113	fn deref(&self) -> &Self::Target {
114		&self.0
115	}
116}
117
118pub struct EngineInner {
119	multi: TransactionMultiVersion,
120	single: TransactionSingleVersion,
121	cdc: TransactionCdc,
122	event_bus: EventBus,
123	executor: Executor,
124	interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
125	catalog: MaterializedCatalog,
126	flow_operator_store: FlowOperatorStore,
127}
128
129impl StandardEngine {
130	pub fn new(
131		multi: TransactionMultiVersion,
132		single: TransactionSingleVersion,
133		cdc: TransactionCdc,
134		event_bus: EventBus,
135		interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
136		catalog: MaterializedCatalog,
137	) -> Self {
138		Self::with_functions(multi, single, cdc, event_bus, interceptors, catalog, None)
139	}
140
141	pub fn with_functions(
142		multi: TransactionMultiVersion,
143		single: TransactionSingleVersion,
144		cdc: TransactionCdc,
145		event_bus: EventBus,
146		interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
147		catalog: MaterializedCatalog,
148		custom_functions: Option<Functions>,
149	) -> Self {
150		let functions = custom_functions.unwrap_or_else(|| {
151			Functions::builder()
152				.register_aggregate("math::sum", math::aggregate::Sum::new)
153				.register_aggregate("math::min", math::aggregate::Min::new)
154				.register_aggregate("math::max", math::aggregate::Max::new)
155				.register_aggregate("math::avg", math::aggregate::Avg::new)
156				.register_aggregate("math::count", math::aggregate::Count::new)
157				.register_scalar("math::abs", math::scalar::Abs::new)
158				.register_scalar("math::avg", math::scalar::Avg::new)
159				.register_generator("generate_series", generator::GenerateSeries::new)
160				.build()
161		});
162
163		// Create the flow operator store and register the event listener
164		let flow_operator_store = FlowOperatorStore::new();
165		let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
166		event_bus.register(listener);
167
168		Self(Arc::new(EngineInner {
169			multi,
170			single,
171			cdc,
172			event_bus,
173			executor: Executor::new(functions, flow_operator_store.clone()),
174			interceptors,
175			catalog,
176			flow_operator_store,
177		}))
178	}
179
180	#[inline]
181	pub fn multi(&self) -> &TransactionMultiVersion {
182		&self.multi
183	}
184
185	#[inline]
186	pub fn multi_owned(&self) -> TransactionMultiVersion {
187		self.multi.clone()
188	}
189
190	#[inline]
191	pub fn single(&self) -> &TransactionSingleVersion {
192		&self.single
193	}
194
195	#[inline]
196	pub fn single_owned(&self) -> TransactionSingleVersion {
197		self.single.clone()
198	}
199
200	#[inline]
201	pub fn cdc(&self) -> &TransactionCdc {
202		&self.cdc
203	}
204
205	#[inline]
206	pub fn cdc_owned(&self) -> TransactionCdc {
207		self.cdc.clone()
208	}
209
210	#[inline]
211	pub fn emit<E: Event>(&self, event: E) {
212		self.event_bus.emit(event)
213	}
214
215	#[inline]
216	pub fn catalog(&self) -> &MaterializedCatalog {
217		&self.catalog
218	}
219
220	#[inline]
221	pub fn flow_operator_store(&self) -> &FlowOperatorStore {
222		&self.flow_operator_store
223	}
224
225	#[inline]
226	pub fn executor(&self) -> Executor {
227		self.executor.clone()
228	}
229}