1use std::{ops::Deref, sync::Arc, time::Duration};
5
6use async_trait::async_trait;
7use reifydb_catalog::MaterializedCatalog;
8use reifydb_core::{
9 CommitVersion, Frame,
10 event::{Event, EventBus},
11 interceptor::InterceptorFactory,
12 interface::{
13 ColumnDef, ColumnId, ColumnIndex, Command, Engine as EngineInterface, ExecuteCommand, ExecuteQuery,
14 Identity, Params, Query, TableVirtualDef, TableVirtualId, WithEventBus,
15 },
16 stream::{SendableFrameStream, StreamError},
17};
18use reifydb_transaction::{
19 cdc::TransactionCdc,
20 multi::{AwaitWatermarkError, TransactionMultiVersion},
21 single::TransactionSingle,
22};
23use reifydb_type::{Fragment, TypeConstraint};
24use tokio::spawn;
25use tokio_util::sync::CancellationToken;
26use tracing::instrument;
27
28use crate::{
29 execute::Executor,
30 function::{Functions, generator, math},
31 interceptor::{CatalogEventInterceptor, materialized_catalog::MaterializedCatalogInterceptor},
32 stream::{ChannelFrameStream, FrameSender},
33 table_virtual::{
34 IteratorVirtualTableFactory, SimpleVirtualTableFactory, TableVirtualUser, TableVirtualUserColumnDef,
35 TableVirtualUserIterator,
36 system::{FlowOperatorEventListener, FlowOperatorStore},
37 },
38 transaction::{StandardCommandTransaction, StandardQueryTransaction},
39};
40
/// Handle to the engine.
///
/// A thin wrapper around an [`Arc`] of [`EngineInner`]: cloning the handle
/// clones the `Arc`, so all clones refer to the same underlying state.
pub struct StandardEngine(Arc<EngineInner>);
42
43impl WithEventBus for StandardEngine {
44 fn event_bus(&self) -> &EventBus {
45 &self.event_bus
46 }
47}
48
49#[async_trait]
50impl EngineInterface for StandardEngine {
51 type Command = StandardCommandTransaction;
52 type Query = StandardQueryTransaction;
53
54 #[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
55 async fn begin_command(&self) -> crate::Result<Self::Command> {
56 let mut interceptors = self.interceptors.create();
57
58 interceptors.post_commit.add(Arc::new(MaterializedCatalogInterceptor::new(self.catalog.clone())));
59 interceptors
60 .post_commit
61 .add(Arc::new(CatalogEventInterceptor::new(self.event_bus.clone(), self.catalog.clone())));
62
63 StandardCommandTransaction::new(
64 self.multi.clone(),
65 self.single.clone(),
66 self.cdc.clone(),
67 self.event_bus.clone(),
68 self.catalog.clone(),
69 interceptors,
70 )
71 .await
72 }
73
74 #[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
75 async fn begin_query(&self) -> crate::Result<Self::Query> {
76 Ok(StandardQueryTransaction::new(
77 self.multi.begin_query().await?,
78 self.single.clone(),
79 self.cdc.clone(),
80 self.catalog.clone(),
81 ))
82 }
83
84 #[instrument(name = "engine::command", level = "info", skip(self, params), fields(rql = %rql))]
85 fn command_as(&self, identity: &Identity, rql: &str, params: Params) -> SendableFrameStream {
86 let engine = self.clone();
87 let identity = identity.clone();
88 let rql = rql.to_string();
89 let cancel_token = CancellationToken::new();
90
91 let (sender, stream) = ChannelFrameStream::new(8, cancel_token.clone());
92
93 spawn(execute_command(engine, identity, rql, params, sender, cancel_token));
94
95 Box::pin(stream)
96 }
97
98 #[instrument(name = "engine::query", level = "info", skip(self, params), fields(rql = %rql))]
99 fn query_as(&self, identity: &Identity, rql: &str, params: Params) -> SendableFrameStream {
100 let engine = self.clone();
101 let identity = identity.clone();
102 let rql = rql.to_string();
103 let cancel_token = CancellationToken::new();
104
105 let (sender, stream) = ChannelFrameStream::new(8, cancel_token.clone());
106
107 spawn(execute_query(engine, identity, rql, params, sender, cancel_token));
108
109 Box::pin(stream)
110 }
111}
112
113async fn execute_command(
115 engine: StandardEngine,
116 identity: Identity,
117 rql: String,
118 params: Params,
119 sender: FrameSender,
120 cancel_token: CancellationToken,
121) {
122 if cancel_token.is_cancelled() {
124 return;
125 }
126
127 let txn_result = engine.begin_command().await;
129 let mut txn = match txn_result {
130 Ok(txn) => txn,
131 Err(e) => {
132 let _ = sender.try_send(Err(StreamError::query_with_statement(e, rql.clone())));
133 return;
134 }
135 };
136
137 let result = engine
139 .executor
140 .execute_command(
141 &mut txn,
142 Command {
143 rql: &rql,
144 params,
145 identity: &identity,
146 },
147 )
148 .await;
149
150 match result {
151 Ok(frames) => {
152 if let Err(e) = txn.commit().await {
154 let _ = sender.try_send(Err(StreamError::query_with_statement(e, rql)));
155 return;
156 }
157
158 for frame in frames {
160 if cancel_token.is_cancelled() {
161 return;
162 }
163 if sender.send(Ok(frame)).await.is_err() {
164 return; }
166 }
167 }
168 Err(e) => {
169 let _ = sender.try_send(Err(StreamError::query_with_statement(e, rql)));
171 }
172 }
173}
174
175async fn execute_query(
177 engine: StandardEngine,
178 identity: Identity,
179 rql: String,
180 params: Params,
181 sender: FrameSender,
182 cancel_token: CancellationToken,
183) {
184 if cancel_token.is_cancelled() {
186 return;
187 }
188
189 let txn_result = engine.begin_query().await;
191 let mut txn = match txn_result {
192 Ok(txn) => txn,
193 Err(e) => {
194 let _ = sender.try_send(Err(StreamError::query_with_statement(e, rql.clone())));
195 return;
196 }
197 };
198
199 let result = engine
201 .executor
202 .execute_query(
203 &mut txn,
204 Query {
205 rql: &rql,
206 params,
207 identity: &identity,
208 },
209 )
210 .await;
211
212 match result {
213 Ok(frames) => {
214 for frame in frames {
216 if cancel_token.is_cancelled() {
217 return;
218 }
219 if sender.send(Ok(frame)).await.is_err() {
220 return; }
222 }
223 }
224 Err(e) => {
225 let _ = sender.try_send(Err(StreamError::query_with_statement(e, rql)));
226 }
227 }
228}
229
230#[async_trait]
231impl ExecuteCommand<StandardCommandTransaction> for StandardEngine {
232 #[inline]
233 async fn execute_command(
234 &self,
235 txn: &mut StandardCommandTransaction,
236 cmd: Command<'_>,
237 ) -> crate::Result<Vec<Frame>> {
238 self.executor.execute_command(txn, cmd).await
239 }
240}
241
242#[async_trait]
243impl ExecuteQuery<StandardQueryTransaction> for StandardEngine {
244 #[inline]
245 async fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
246 self.executor.execute_query(txn, qry).await
247 }
248}
249
250impl Clone for StandardEngine {
251 fn clone(&self) -> Self {
252 Self(self.0.clone())
253 }
254}
255
256impl Deref for StandardEngine {
257 type Target = EngineInner;
258
259 fn deref(&self) -> &Self::Target {
260 &self.0
261 }
262}
263
/// State shared by every [`StandardEngine`] handle.
pub struct EngineInner {
    /// Multi-version transaction layer; used to begin transactions and to read versions/watermarks.
    multi: TransactionMultiVersion,
    /// Single-version transaction layer handed to every new transaction.
    single: TransactionSingle,
    /// CDC transaction layer handed to every new transaction.
    cdc: TransactionCdc,
    /// Event bus used for emitting events and for the catalog-event interceptor.
    event_bus: EventBus,
    /// Executor that runs commands and queries and owns the virtual table registry.
    executor: Executor,
    /// Factory producing the interceptor set for each new command transaction.
    interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
    /// Materialized catalog used for namespace lookups and virtual table registration.
    catalog: MaterializedCatalog,
    /// Flow operator store; also shared with the executor and the flow operator event listener.
    flow_operator_store: FlowOperatorStore,
}
274
275impl StandardEngine {
276 pub async fn new(
277 multi: TransactionMultiVersion,
278 single: TransactionSingle,
279 cdc: TransactionCdc,
280 event_bus: EventBus,
281 interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
282 catalog: MaterializedCatalog,
283 custom_functions: Option<Functions>,
284 ) -> Self {
285 let functions = custom_functions.unwrap_or_else(|| {
286 Functions::builder()
287 .register_aggregate("math::sum", math::aggregate::Sum::new)
288 .register_aggregate("math::min", math::aggregate::Min::new)
289 .register_aggregate("math::max", math::aggregate::Max::new)
290 .register_aggregate("math::avg", math::aggregate::Avg::new)
291 .register_aggregate("math::count", math::aggregate::Count::new)
292 .register_scalar("math::abs", math::scalar::Abs::new)
293 .register_scalar("math::avg", math::scalar::Avg::new)
294 .register_generator("generate_series", generator::GenerateSeries::new)
295 .build()
296 });
297
298 let flow_operator_store = FlowOperatorStore::new();
300 let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
301 event_bus.register(listener).await;
302
303 let stats_tracker = multi.store().stats_tracker().clone();
304
305 Self(Arc::new(EngineInner {
306 multi,
307 single,
308 cdc,
309 event_bus,
310 executor: Executor::new(functions, flow_operator_store.clone(), stats_tracker),
311 interceptors,
312 catalog,
313 flow_operator_store,
314 }))
315 }
316
317 #[inline]
318 pub fn multi(&self) -> &TransactionMultiVersion {
319 &self.multi
320 }
321
322 #[inline]
323 pub fn multi_owned(&self) -> TransactionMultiVersion {
324 self.multi.clone()
325 }
326
327 #[inline]
328 pub fn single(&self) -> &TransactionSingle {
329 &self.single
330 }
331
332 #[inline]
333 pub fn single_owned(&self) -> TransactionSingle {
334 self.single.clone()
335 }
336
337 #[inline]
338 pub fn cdc(&self) -> &TransactionCdc {
339 &self.cdc
340 }
341
342 #[inline]
343 pub fn cdc_owned(&self) -> TransactionCdc {
344 self.cdc.clone()
345 }
346
347 #[inline]
348 pub async fn emit<E: Event>(&self, event: E) {
349 self.event_bus.emit(event).await
350 }
351
352 #[inline]
353 pub fn catalog(&self) -> &MaterializedCatalog {
354 &self.catalog
355 }
356
357 #[inline]
358 pub fn flow_operator_store(&self) -> &FlowOperatorStore {
359 &self.flow_operator_store
360 }
361
362 #[inline]
364 pub async fn current_version(&self) -> crate::Result<CommitVersion> {
365 self.multi.current_version().await
366 }
367
368 #[inline]
375 pub async fn try_wait_for_watermark(
376 &self,
377 version: CommitVersion,
378 timeout: Duration,
379 ) -> Result<(), AwaitWatermarkError> {
380 self.multi.try_wait_for_watermark(version, timeout).await
381 }
382
383 #[inline]
387 pub fn done_until(&self) -> CommitVersion {
388 self.multi.done_until()
389 }
390
391 #[inline]
393 pub fn watermarks(&self) -> (CommitVersion, CommitVersion) {
394 self.multi.watermarks()
395 }
396
397 #[inline]
398 pub fn executor(&self) -> Executor {
399 self.executor.clone()
400 }
401
402 pub fn register_virtual_table<T: TableVirtualUser + Clone>(
438 &self,
439 namespace: &str,
440 name: &str,
441 table: T,
442 ) -> crate::Result<TableVirtualId> {
443 let ns_def =
445 self.catalog.find_namespace_by_name(namespace, CommitVersion(u64::MAX)).ok_or_else(|| {
446 reifydb_type::Error(reifydb_type::diagnostic::catalog::namespace_not_found(
447 Fragment::None,
448 namespace,
449 ))
450 })?;
451
452 let table_id = self.executor.virtual_table_registry.allocate_id();
454
455 let table_columns = table.columns();
457 let columns = convert_table_virtual_user_columns_to_column_defs(&table_columns);
458
459 let def = Arc::new(TableVirtualDef {
461 id: table_id,
462 namespace: ns_def.id,
463 name: name.to_string(),
464 columns,
465 });
466
467 self.catalog.register_table_virtual_user(def.clone())?;
469
470 let factory = Arc::new(SimpleVirtualTableFactory::new(table, def.clone()));
472 self.executor.virtual_table_registry.register(ns_def.id, name.to_string(), factory);
473
474 Ok(table_id)
475 }
476
477 pub fn unregister_virtual_table(&self, namespace: &str, name: &str) -> crate::Result<()> {
484 let ns_def =
486 self.catalog.find_namespace_by_name(namespace, CommitVersion(u64::MAX)).ok_or_else(|| {
487 reifydb_type::Error(reifydb_type::diagnostic::catalog::namespace_not_found(
488 Fragment::None,
489 namespace,
490 ))
491 })?;
492
493 self.catalog.unregister_table_virtual_user(ns_def.id, name)?;
495
496 self.executor.virtual_table_registry.unregister(ns_def.id, name);
498
499 Ok(())
500 }
501
502 pub fn register_virtual_table_iterator<F>(
518 &self,
519 namespace: &str,
520 name: &str,
521 creator: F,
522 ) -> crate::Result<TableVirtualId>
523 where
524 F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
525 {
526 let ns_def =
528 self.catalog.find_namespace_by_name(namespace, CommitVersion(u64::MAX)).ok_or_else(|| {
529 reifydb_type::Error(reifydb_type::diagnostic::catalog::namespace_not_found(
530 Fragment::None,
531 namespace,
532 ))
533 })?;
534
535 let table_id = self.executor.virtual_table_registry.allocate_id();
537
538 let temp_iter = creator();
540 let table_columns = temp_iter.columns();
541 let columns = convert_table_virtual_user_columns_to_column_defs(&table_columns);
542
543 let def = Arc::new(TableVirtualDef {
545 id: table_id,
546 namespace: ns_def.id,
547 name: name.to_string(),
548 columns,
549 });
550
551 self.catalog.register_table_virtual_user(def.clone())?;
553
554 let factory = Arc::new(IteratorVirtualTableFactory::new(creator, def.clone()));
556 self.executor.virtual_table_registry.register(ns_def.id, name.to_string(), factory);
557
558 Ok(table_id)
559 }
560
561 pub fn bulk_insert<'e>(
579 &'e self,
580 identity: &'e Identity,
581 ) -> crate::bulk_insert::BulkInsertBuilder<'e, crate::bulk_insert::Validated> {
582 crate::bulk_insert::BulkInsertBuilder::new(self, identity)
583 }
584
585 pub fn bulk_insert_trusted<'e>(
595 &'e self,
596 identity: &'e Identity,
597 ) -> crate::bulk_insert::BulkInsertBuilder<'e, crate::bulk_insert::Trusted> {
598 crate::bulk_insert::BulkInsertBuilder::new_trusted(self, identity)
599 }
600}
601
602fn convert_table_virtual_user_columns_to_column_defs(columns: &[TableVirtualUserColumnDef]) -> Vec<ColumnDef> {
604 columns.iter()
605 .enumerate()
606 .map(|(idx, col)| {
607 let constraint = TypeConstraint::unconstrained(col.data_type);
610 ColumnDef {
611 id: ColumnId(idx as u64),
612 name: col.name.clone(),
613 constraint,
614 policies: vec![],
615 index: ColumnIndex(idx as u8),
616 auto_increment: false,
617 dictionary_id: None,
618 }
619 })
620 .collect()
621}