1use std::{
5 ops::Deref,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, Ordering},
9 },
10 time::Duration,
11};
12
13use reifydb_auth::service::AuthEngine;
14use reifydb_catalog::{
15 catalog::Catalog,
16 materialized::MaterializedCatalog,
17 vtable::{
18 system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
19 tables::UserVTableDataFunction,
20 user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
21 },
22};
23use reifydb_cdc::{consume::host::CdcHost, produce::watermark::CdcProducerWatermark, storage::CdcStore};
24use reifydb_core::{
25 common::CommitVersion,
26 error::diagnostic::{catalog::namespace_not_found, engine::read_only_rejection},
27 event::{Event, EventBus},
28 execution::ExecutionResult,
29 interface::{
30 WithEventBus,
31 catalog::{
32 column::{Column, ColumnIndex},
33 id::ColumnId,
34 vtable::{VTable, VTableId},
35 },
36 },
37 metric::ExecutionMetrics,
38 util::ioc::IocContainer,
39};
40use reifydb_metric::storage::metric::MetricReader;
41use reifydb_runtime::{
42 actor::{mailbox::ActorRef, system::ActorSystem},
43 context::{clock::Clock, rng::Rng},
44};
45use reifydb_store_single::SingleStore;
46use reifydb_transaction::{
47 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
48 multi::transaction::MultiTransaction,
49 single::SingleTransaction,
50 transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
51};
52use reifydb_type::{
53 error::Error,
54 fragment::Fragment,
55 params::Params,
56 value::{constraint::TypeConstraint, identity::IdentityId},
57};
58use tracing::instrument;
59
60use crate::{
61 Result,
62 bulk_insert::builder::{BulkInsertBuilder, Unchecked, Validated},
63 interceptor::catalog::MaterializedCatalogInterceptor,
64 vm::{
65 Admin, Command, Query, Subscription,
66 executor::Executor,
67 services::{EngineConfig, Services},
68 },
69};
70
/// Handle to the engine's shared state.
///
/// The handle wraps an `Arc<Inner>`: cloning it clones the `Arc`, so every clone refers to the
/// same [`Inner`]. The fields of [`Inner`] are reached through the `Deref` implementation.
pub struct StandardEngine(Arc<Inner>);
72
73impl WithEventBus for StandardEngine {
74 fn event_bus(&self) -> &EventBus {
75 &self.event_bus
76 }
77}
78
79impl AuthEngine for StandardEngine {
80 fn begin_admin(&self) -> Result<AdminTransaction> {
81 StandardEngine::begin_admin(self, IdentityId::system())
82 }
83
84 fn begin_query(&self) -> Result<QueryTransaction> {
85 StandardEngine::begin_query(self, IdentityId::system())
86 }
87
88 fn catalog(&self) -> Catalog {
89 StandardEngine::catalog(self)
90 }
91}
92
93impl StandardEngine {
95 #[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
96 pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
97 let interceptors = self.interceptors.create();
98 let mut txn = CommandTransaction::new(
99 self.multi.clone(),
100 self.single.clone(),
101 self.event_bus.clone(),
102 interceptors,
103 identity,
104 self.executor.runtime_context.clock.clone(),
105 )?;
106 txn.set_executor(Arc::new(self.executor.clone()));
107 Ok(txn)
108 }
109
110 #[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
111 pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
112 let interceptors = self.interceptors.create();
113 let mut txn = AdminTransaction::new(
114 self.multi.clone(),
115 self.single.clone(),
116 self.event_bus.clone(),
117 interceptors,
118 identity,
119 self.executor.runtime_context.clock.clone(),
120 )?;
121 txn.set_executor(Arc::new(self.executor.clone()));
122 Ok(txn)
123 }
124
125 #[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
126 pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
127 let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
128 txn.set_executor(Arc::new(self.executor.clone()));
129 Ok(txn)
130 }
131
132 pub fn clock(&self) -> &Clock {
134 &self.executor.runtime_context.clock
135 }
136
137 pub fn rng(&self) -> &Rng {
138 &self.executor.runtime_context.rng
139 }
140
141 #[instrument(name = "engine::admin_as", level = "debug", skip(self, params), fields(rql = %rql))]
142 pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
143 if let Err(e) = self.reject_if_read_only() {
144 return ExecutionResult {
145 frames: vec![],
146 error: Some(e),
147 metrics: ExecutionMetrics::default(),
148 };
149 }
150 let mut txn = match self.begin_admin(identity) {
151 Ok(t) => t,
152 Err(mut e) => {
153 e.with_rql(rql.to_string());
154 return ExecutionResult {
155 frames: vec![],
156 error: Some(e),
157 metrics: ExecutionMetrics::default(),
158 };
159 }
160 };
161 let mut outcome = self.executor.admin(
162 &mut txn,
163 Admin {
164 rql,
165 params,
166 },
167 );
168 if outcome.is_ok()
169 && let Err(mut e) = txn.commit()
170 {
171 e.with_rql(rql.to_string());
172 outcome.error = Some(e);
173 }
174 if let Some(ref mut e) = outcome.error {
175 e.with_rql(rql.to_string());
176 }
177 outcome
178 }
179
180 #[instrument(name = "engine::command_as", level = "debug", skip(self, params), fields(rql = %rql))]
181 pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
182 if let Err(e) = self.reject_if_read_only() {
183 return ExecutionResult {
184 frames: vec![],
185 error: Some(e),
186 metrics: ExecutionMetrics::default(),
187 };
188 }
189 let mut txn = match self.begin_command(identity) {
190 Ok(t) => t,
191 Err(mut e) => {
192 e.with_rql(rql.to_string());
193 return ExecutionResult {
194 frames: vec![],
195 error: Some(e),
196 metrics: ExecutionMetrics::default(),
197 };
198 }
199 };
200 let mut outcome = self.executor.command(
201 &mut txn,
202 Command {
203 rql,
204 params,
205 },
206 );
207 if outcome.is_ok()
208 && let Err(mut e) = txn.commit()
209 {
210 e.with_rql(rql.to_string());
211 outcome.error = Some(e);
212 }
213 if let Some(ref mut e) = outcome.error {
214 e.with_rql(rql.to_string());
215 }
216 outcome
217 }
218
219 #[instrument(name = "engine::query_as", level = "debug", skip(self, params), fields(rql = %rql))]
220 pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
221 let mut txn = match self.begin_query(identity) {
222 Ok(t) => t,
223 Err(mut e) => {
224 e.with_rql(rql.to_string());
225 return ExecutionResult {
226 frames: vec![],
227 error: Some(e),
228 metrics: ExecutionMetrics::default(),
229 };
230 }
231 };
232 let mut outcome = self.executor.query(
233 &mut txn,
234 Query {
235 rql,
236 params,
237 },
238 );
239 if let Some(ref mut e) = outcome.error {
240 e.with_rql(rql.to_string());
241 }
242 outcome
243 }
244
245 #[instrument(name = "engine::subscribe_as", level = "debug", skip(self, params), fields(rql = %rql))]
246 pub fn subscribe_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
247 let mut txn = match self.begin_query(identity) {
248 Ok(t) => t,
249 Err(mut e) => {
250 e.with_rql(rql.to_string());
251 return ExecutionResult {
252 frames: vec![],
253 error: Some(e),
254 metrics: ExecutionMetrics::default(),
255 };
256 }
257 };
258 let mut outcome = self.executor.subscription(
259 &mut txn,
260 Subscription {
261 rql,
262 params,
263 },
264 );
265 if let Some(ref mut e) = outcome.error {
266 e.with_rql(rql.to_string());
267 }
268 outcome
269 }
270
271 #[instrument(name = "engine::procedure_as", level = "debug", skip(self, params), fields(name = %name))]
273 pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> ExecutionResult {
274 if let Err(e) = self.reject_if_read_only() {
275 return ExecutionResult {
276 frames: vec![],
277 error: Some(e),
278 metrics: ExecutionMetrics::default(),
279 };
280 }
281 let mut txn = match self.begin_command(identity) {
282 Ok(t) => t,
283 Err(e) => {
284 return ExecutionResult {
285 frames: vec![],
286 error: Some(e),
287 metrics: ExecutionMetrics::default(),
288 };
289 }
290 };
291 let mut outcome = self.executor.call_procedure(&mut txn, name, ¶ms);
292 if outcome.is_ok()
293 && let Err(e) = txn.commit()
294 {
295 outcome.error = Some(e);
296 }
297 outcome
298 }
299
300 pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
337 let catalog = self.materialized_catalog();
338
339 let ns_def = catalog
341 .find_namespace_by_name(namespace)
342 .ok_or_else(|| Error(Box::new(namespace_not_found(Fragment::None, namespace))))?;
343
344 let table_id = self.executor.virtual_table_registry.allocate_id();
346 let table_columns = table.vtable();
348 let columns = convert_vtable_user_columns_to_columns(&table_columns);
349
350 let def = Arc::new(VTable {
352 id: table_id,
353 namespace: ns_def.id(),
354 name: name.to_string(),
355 columns,
356 });
357
358 catalog.register_vtable_user(def.clone())?;
360 let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
362 let entry = UserVTableEntry {
364 def: def.clone(),
365 data_fn,
366 };
367 self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
368 Ok(table_id)
369 }
370}
371
372impl CdcHost for StandardEngine {
373 fn begin_command(&self) -> Result<CommandTransaction> {
374 StandardEngine::begin_command(self, IdentityId::system())
375 }
376
377 fn begin_query(&self) -> Result<QueryTransaction> {
378 StandardEngine::begin_query(self, IdentityId::system())
379 }
380
381 fn current_version(&self) -> Result<CommitVersion> {
382 StandardEngine::current_version(self)
383 }
384
385 fn done_until(&self) -> CommitVersion {
386 StandardEngine::done_until(self)
387 }
388
389 fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
390 StandardEngine::wait_for_mark_timeout(self, version, timeout)
391 }
392
393 fn materialized_catalog(&self) -> &MaterializedCatalog {
394 &self.catalog.materialized
395 }
396}
397
398impl Clone for StandardEngine {
399 fn clone(&self) -> Self {
400 Self(self.0.clone())
401 }
402}
403
404impl Deref for StandardEngine {
405 type Target = Inner;
406
407 fn deref(&self) -> &Self::Target {
408 &self.0
409 }
410}
411
/// State shared by every [`StandardEngine`] handle.
pub struct Inner {
    /// Multi transaction handle: used to begin query transactions and cloned into command and
    /// admin transactions; also answers `current_version` / `done_until` queries.
    multi: MultiTransaction,
    /// Single transaction handle, cloned into every transaction the engine creates.
    single: SingleTransaction,
    /// Event bus; cloned into command and admin transactions and used by `emit`.
    event_bus: EventBus,
    /// Executor that runs statements; a clone is attached to every transaction.
    executor: Executor,
    /// Factory that creates the interceptor set for each command/admin transaction.
    interceptors: Arc<InterceptorFactory>,
    /// Catalog handle; also provides the materialized catalog.
    catalog: Catalog,
    /// Flow operator store; clones of it are given to the event listener registered in
    /// `StandardEngine::new` and to the executor.
    flow_operator_store: SystemFlowOperatorStore,
    /// When `true`, `admin_as`, `command_as` and `procedure_as` are rejected.
    read_only: AtomicBool,
}
422
423impl StandardEngine {
424 pub fn new(
425 multi: MultiTransaction,
426 single: SingleTransaction,
427 event_bus: EventBus,
428 interceptors: InterceptorFactory,
429 catalog: Catalog,
430 config: EngineConfig,
431 ) -> Self {
432 let flow_operator_store = SystemFlowOperatorStore::new();
433 let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
434 event_bus.register(listener);
435
436 let metrics_store = config
438 .ioc
439 .resolve::<SingleStore>()
440 .expect("SingleStore must be registered in IocContainer for metrics");
441 let stats_reader = MetricReader::new(metrics_store);
442
443 let materialized = catalog.materialized.clone();
445 interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
446 interceptors
447 .post_commit
448 .add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
449 }));
450
451 let interceptors = Arc::new(interceptors);
452
453 Self(Arc::new(Inner {
454 multi,
455 single,
456 event_bus,
457 executor: Executor::new(catalog.clone(), config, flow_operator_store.clone(), stats_reader),
458 interceptors,
459 catalog,
460 flow_operator_store,
461 read_only: AtomicBool::new(false),
462 }))
463 }
464
465 pub fn create_interceptors(&self) -> Interceptors {
467 self.interceptors.create()
468 }
469
470 pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
475 self.interceptors.add_late(factory);
476 }
477
478 #[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
483 ))]
484 pub fn begin_query_at_version(&self, version: CommitVersion, identity: IdentityId) -> Result<QueryTransaction> {
485 let mut txn = QueryTransaction::new(
486 self.multi.begin_query_at_version(version)?,
487 self.single.clone(),
488 identity,
489 );
490 txn.set_executor(Arc::new(self.executor.clone()));
491 Ok(txn)
492 }
493
494 #[inline]
495 pub fn multi(&self) -> &MultiTransaction {
496 &self.multi
497 }
498
499 #[inline]
500 pub fn multi_owned(&self) -> MultiTransaction {
501 self.multi.clone()
502 }
503
504 #[inline]
506 pub fn actor_system(&self) -> ActorSystem {
507 self.multi.actor_system()
508 }
509
510 #[inline]
511 pub fn single(&self) -> &SingleTransaction {
512 &self.single
513 }
514
515 #[inline]
516 pub fn single_owned(&self) -> SingleTransaction {
517 self.single.clone()
518 }
519
520 #[inline]
521 pub fn emit<E: Event>(&self, event: E) {
522 self.event_bus.emit(event)
523 }
524
525 #[inline]
526 pub fn materialized_catalog(&self) -> &MaterializedCatalog {
527 &self.catalog.materialized
528 }
529
530 #[inline]
534 pub fn catalog(&self) -> Catalog {
535 self.catalog.clone()
536 }
537
538 #[inline]
544 pub fn services(&self) -> Arc<Services> {
545 self.executor.services().clone()
546 }
547
548 #[inline]
549 pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
550 &self.flow_operator_store
551 }
552
553 #[inline]
555 pub fn current_version(&self) -> Result<CommitVersion> {
556 self.multi.current_version()
557 }
558
559 #[inline]
563 pub fn done_until(&self) -> CommitVersion {
564 self.multi.done_until()
565 }
566
567 #[inline]
570 pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
571 self.multi.wait_for_mark_timeout(version, timeout)
572 }
573
574 #[inline]
575 pub fn executor(&self) -> Executor {
576 self.executor.clone()
577 }
578
579 #[inline]
583 pub fn ioc(&self) -> &IocContainer {
584 &self.executor.ioc
585 }
586
587 #[inline]
592 pub fn cdc_store(&self) -> CdcStore {
593 self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
594 }
595
596 #[inline]
602 pub fn actor<M: 'static>(&self) -> Option<ActorRef<M>>
603 where
604 ActorRef<M>: Send + Sync,
605 {
606 self.executor.ioc.try_resolve::<ActorRef<M>>()
607 }
608
609 #[inline]
618 pub fn cdc_producer_watermark(&self) -> CommitVersion {
619 self.executor
620 .ioc
621 .resolve::<CdcProducerWatermark>()
622 .expect("CdcProducerWatermark must be registered")
623 .get()
624 }
625
626 pub fn set_read_only(&self) {
629 self.read_only.store(true, Ordering::SeqCst);
630 }
631
632 pub fn is_read_only(&self) -> bool {
634 self.read_only.load(Ordering::SeqCst)
635 }
636
637 pub(crate) fn reject_if_read_only(&self) -> Result<()> {
638 if self.is_read_only() {
639 return Err(Error(Box::new(read_only_rejection(Fragment::None))));
640 }
641 Ok(())
642 }
643
644 pub fn shutdown(&self) {
645 self.interceptors.clear_late();
646 self.executor.ioc.clear();
647 }
648
649 pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
667 BulkInsertBuilder::new(self, identity)
668 }
669
670 pub fn bulk_insert_unchecked<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Unchecked> {
717 BulkInsertBuilder::new_unchecked(self, identity)
718 }
719}
720
721fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
723 columns.iter()
724 .enumerate()
725 .map(|(idx, col)| {
726 let constraint = TypeConstraint::unconstrained(col.data_type.clone());
729 Column {
730 id: ColumnId(idx as u64),
731 name: col.name.clone(),
732 constraint,
733 properties: vec![],
734 index: ColumnIndex(idx as u8),
735 auto_increment: false,
736 dictionary_id: None,
737 }
738 })
739 .collect()
740}