1use std::{ops::Deref, rc::Rc, sync::Arc, time::Duration};
5
6use reifydb_catalog::MaterializedCatalog;
7use reifydb_core::{
8 CommitVersion, Frame,
9 event::{Event, EventBus},
10 interceptor::InterceptorFactory,
11 interface::{
12 ColumnDef, ColumnId, ColumnIndex, Command, Engine as EngineInterface, ExecuteCommand, ExecuteQuery,
13 Identity, MultiVersionTransaction, Params, Query, TableVirtualDef, TableVirtualId, WithEventBus,
14 },
15};
16use reifydb_transaction::{
17 cdc::TransactionCdc,
18 multi::{AwaitWatermarkError, TransactionMultiVersion},
19 single::TransactionSingleVersion,
20};
21use reifydb_type::{OwnedFragment, TypeConstraint};
22use tracing::instrument;
23
24use crate::{
25 execute::Executor,
26 function::{Functions, generator, math},
27 interceptor::{CatalogEventInterceptor, materialized_catalog::MaterializedCatalogInterceptor},
28 table_virtual::{
29 IteratorVirtualTableFactory, SimpleVirtualTableFactory, TableVirtualUser, TableVirtualUserColumnDef,
30 TableVirtualUserIterator,
31 system::{FlowOperatorEventListener, FlowOperatorStore},
32 },
33 transaction::{StandardCommandTransaction, StandardQueryTransaction},
34};
35
36pub struct StandardEngine(Arc<EngineInner>);
37
38impl WithEventBus for StandardEngine {
39 fn event_bus(&self) -> &EventBus {
40 &self.event_bus
41 }
42}
43
44impl EngineInterface for StandardEngine {
45 type Command = StandardCommandTransaction;
46 type Query = StandardQueryTransaction;
47
48 #[instrument(level = "debug", skip(self))]
49 fn begin_command(&self) -> crate::Result<Self::Command> {
50 let mut interceptors = self.interceptors.create();
51
52 interceptors.post_commit.add(Rc::new(MaterializedCatalogInterceptor::new(self.catalog.clone())));
53 interceptors
54 .post_commit
55 .add(Rc::new(CatalogEventInterceptor::new(self.event_bus.clone(), self.catalog.clone())));
56
57 StandardCommandTransaction::new(
58 self.multi.clone(),
59 self.single.clone(),
60 self.cdc.clone(),
61 self.event_bus.clone(),
62 self.catalog.clone(),
63 interceptors,
64 )
65 }
66
67 #[instrument(level = "debug", skip(self))]
68 fn begin_query(&self) -> crate::Result<Self::Query> {
69 Ok(StandardQueryTransaction::new(
70 self.multi.begin_query()?,
71 self.single.clone(),
72 self.cdc.clone(),
73 self.catalog.clone(),
74 ))
75 }
76
77 #[instrument(level = "info", skip(self, params), fields(rql = %rql))]
78 fn command_as(&self, identity: &Identity, rql: &str, params: Params) -> crate::Result<Vec<Frame>> {
79 let mut txn = self.begin_command()?;
80 let result = self.execute_command(
81 &mut txn,
82 Command {
83 rql,
84 params,
85 identity,
86 },
87 )?;
88 txn.commit()?;
89 Ok(result)
90 }
91
92 #[instrument(level = "info", skip(self, params), fields(rql = %rql))]
93 fn query_as(&self, identity: &Identity, rql: &str, params: Params) -> crate::Result<Vec<Frame>> {
94 let mut txn = self.begin_query()?;
95 let result = self.execute_query(
96 &mut txn,
97 Query {
98 rql,
99 params,
100 identity,
101 },
102 )?;
103 Ok(result)
104 }
105}
106
107impl ExecuteCommand<StandardCommandTransaction> for StandardEngine {
108 #[inline]
109 fn execute_command(&self, txn: &mut StandardCommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
110 self.executor.execute_command(txn, cmd)
111 }
112}
113
114impl ExecuteQuery<StandardQueryTransaction> for StandardEngine {
115 #[inline]
116 fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
117 self.executor.execute_query(txn, qry)
118 }
119}
120
121impl Clone for StandardEngine {
122 fn clone(&self) -> Self {
123 Self(self.0.clone())
124 }
125}
126
127impl Deref for StandardEngine {
128 type Target = EngineInner;
129
130 fn deref(&self) -> &Self::Target {
131 &self.0
132 }
133}
134
135pub struct EngineInner {
136 multi: TransactionMultiVersion,
137 single: TransactionSingleVersion,
138 cdc: TransactionCdc,
139 event_bus: EventBus,
140 executor: Executor,
141 interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
142 catalog: MaterializedCatalog,
143 flow_operator_store: FlowOperatorStore,
144}
145
146impl StandardEngine {
147 pub fn new(
148 multi: TransactionMultiVersion,
149 single: TransactionSingleVersion,
150 cdc: TransactionCdc,
151 event_bus: EventBus,
152 interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
153 catalog: MaterializedCatalog,
154 ) -> Self {
155 Self::with_functions(multi, single, cdc, event_bus, interceptors, catalog, None)
156 }
157
158 pub fn with_functions(
159 multi: TransactionMultiVersion,
160 single: TransactionSingleVersion,
161 cdc: TransactionCdc,
162 event_bus: EventBus,
163 interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
164 catalog: MaterializedCatalog,
165 custom_functions: Option<Functions>,
166 ) -> Self {
167 let functions = custom_functions.unwrap_or_else(|| {
168 Functions::builder()
169 .register_aggregate("math::sum", math::aggregate::Sum::new)
170 .register_aggregate("math::min", math::aggregate::Min::new)
171 .register_aggregate("math::max", math::aggregate::Max::new)
172 .register_aggregate("math::avg", math::aggregate::Avg::new)
173 .register_aggregate("math::count", math::aggregate::Count::new)
174 .register_scalar("math::abs", math::scalar::Abs::new)
175 .register_scalar("math::avg", math::scalar::Avg::new)
176 .register_generator("generate_series", generator::GenerateSeries::new)
177 .build()
178 });
179
180 let flow_operator_store = FlowOperatorStore::new();
182 let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
183 event_bus.register(listener);
184
185 Self(Arc::new(EngineInner {
186 multi,
187 single,
188 cdc,
189 event_bus,
190 executor: Executor::new(functions, flow_operator_store.clone()),
191 interceptors,
192 catalog,
193 flow_operator_store,
194 }))
195 }
196
197 #[inline]
198 pub fn multi(&self) -> &TransactionMultiVersion {
199 &self.multi
200 }
201
202 #[inline]
203 pub fn multi_owned(&self) -> TransactionMultiVersion {
204 self.multi.clone()
205 }
206
207 #[inline]
208 pub fn single(&self) -> &TransactionSingleVersion {
209 &self.single
210 }
211
212 #[inline]
213 pub fn single_owned(&self) -> TransactionSingleVersion {
214 self.single.clone()
215 }
216
217 #[inline]
218 pub fn cdc(&self) -> &TransactionCdc {
219 &self.cdc
220 }
221
222 #[inline]
223 pub fn cdc_owned(&self) -> TransactionCdc {
224 self.cdc.clone()
225 }
226
227 #[inline]
228 pub fn emit<E: Event>(&self, event: E) {
229 self.event_bus.emit(event)
230 }
231
232 #[inline]
233 pub fn catalog(&self) -> &MaterializedCatalog {
234 &self.catalog
235 }
236
237 #[inline]
238 pub fn flow_operator_store(&self) -> &FlowOperatorStore {
239 &self.flow_operator_store
240 }
241
242 #[inline]
244 pub fn current_version(&self) -> crate::Result<CommitVersion> {
245 self.multi.current_version()
246 }
247
248 #[inline]
255 pub fn try_wait_for_watermark(
256 &self,
257 version: CommitVersion,
258 timeout: Duration,
259 ) -> Result<(), AwaitWatermarkError> {
260 self.multi.try_wait_for_watermark(version, timeout)
261 }
262
263 #[inline]
267 pub fn done_until(&self) -> CommitVersion {
268 self.multi.done_until()
269 }
270
271 #[inline]
273 pub fn watermarks(&self) -> (CommitVersion, CommitVersion) {
274 self.multi.watermarks()
275 }
276
277 #[inline]
278 pub fn executor(&self) -> Executor {
279 self.executor.clone()
280 }
281
282 pub fn register_virtual_table<T: TableVirtualUser + Clone>(
318 &self,
319 namespace: &str,
320 name: &str,
321 table: T,
322 ) -> crate::Result<TableVirtualId> {
323 let ns_def =
325 self.catalog.find_namespace_by_name(namespace, CommitVersion(u64::MAX)).ok_or_else(|| {
326 reifydb_type::Error(reifydb_type::diagnostic::catalog::namespace_not_found(
327 OwnedFragment::None,
328 namespace,
329 ))
330 })?;
331
332 let table_id = self.executor.virtual_table_registry.allocate_id();
334
335 let table_columns = table.columns();
337 let columns = convert_table_virtual_user_columns_to_column_defs(&table_columns);
338
339 let def = Arc::new(TableVirtualDef {
341 id: table_id,
342 namespace: ns_def.id,
343 name: name.to_string(),
344 columns,
345 });
346
347 self.catalog.register_table_virtual_user(def.clone())?;
349
350 let factory = Arc::new(SimpleVirtualTableFactory::new(table, def.clone()));
352 self.executor.virtual_table_registry.register(ns_def.id, name.to_string(), factory);
353
354 Ok(table_id)
355 }
356
357 pub fn unregister_virtual_table(&self, namespace: &str, name: &str) -> crate::Result<()> {
364 let ns_def =
366 self.catalog.find_namespace_by_name(namespace, CommitVersion(u64::MAX)).ok_or_else(|| {
367 reifydb_type::Error(reifydb_type::diagnostic::catalog::namespace_not_found(
368 OwnedFragment::None,
369 namespace,
370 ))
371 })?;
372
373 self.catalog.unregister_table_virtual_user(ns_def.id, name)?;
375
376 self.executor.virtual_table_registry.unregister(ns_def.id, name);
378
379 Ok(())
380 }
381
382 pub fn register_virtual_table_iterator<F>(
398 &self,
399 namespace: &str,
400 name: &str,
401 creator: F,
402 ) -> crate::Result<TableVirtualId>
403 where
404 F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
405 {
406 let ns_def =
408 self.catalog.find_namespace_by_name(namespace, CommitVersion(u64::MAX)).ok_or_else(|| {
409 reifydb_type::Error(reifydb_type::diagnostic::catalog::namespace_not_found(
410 OwnedFragment::None,
411 namespace,
412 ))
413 })?;
414
415 let table_id = self.executor.virtual_table_registry.allocate_id();
417
418 let temp_iter = creator();
420 let table_columns = temp_iter.columns();
421 let columns = convert_table_virtual_user_columns_to_column_defs(&table_columns);
422
423 let def = Arc::new(TableVirtualDef {
425 id: table_id,
426 namespace: ns_def.id,
427 name: name.to_string(),
428 columns,
429 });
430
431 self.catalog.register_table_virtual_user(def.clone())?;
433
434 let factory = Arc::new(IteratorVirtualTableFactory::new(creator, def.clone()));
436 self.executor.virtual_table_registry.register(ns_def.id, name.to_string(), factory);
437
438 Ok(table_id)
439 }
440}
441
442fn convert_table_virtual_user_columns_to_column_defs(columns: &[TableVirtualUserColumnDef]) -> Vec<ColumnDef> {
444 columns.iter()
445 .enumerate()
446 .map(|(idx, col)| {
447 let constraint = TypeConstraint::unconstrained(col.data_type);
450 ColumnDef {
451 id: ColumnId(idx as u64),
452 name: col.name.clone(),
453 constraint,
454 policies: vec![],
455 index: ColumnIndex(idx as u8),
456 auto_increment: false,
457 dictionary_id: None,
458 }
459 })
460 .collect()
461}