1use std::{
5 ops::Deref,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, Ordering},
9 },
10 time::Duration,
11};
12
13use reifydb_auth::service::AuthEngine;
14use reifydb_catalog::{
15 catalog::Catalog,
16 materialized::MaterializedCatalog,
17 vtable::{
18 system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
19 tables::UserVTableDataFunction,
20 user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
21 },
22};
23use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
24use reifydb_core::{
25 common::CommitVersion,
26 error::diagnostic::{catalog::namespace_not_found, engine::read_only_rejection},
27 event::{Event, EventBus},
28 execution::ExecutionResult,
29 interface::{
30 WithEventBus,
31 catalog::{
32 column::{Column, ColumnIndex},
33 id::ColumnId,
34 vtable::{VTable, VTableId},
35 },
36 },
37 metric::ExecutionMetrics,
38};
39use reifydb_metric_old::metric::MetricReader;
40use reifydb_runtime::{actor::system::ActorSystem, context::clock::Clock};
41use reifydb_store_single::SingleStore;
42use reifydb_transaction::{
43 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
44 multi::transaction::MultiTransaction,
45 single::SingleTransaction,
46 transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
47};
48use reifydb_type::{
49 error::Error,
50 fragment::Fragment,
51 params::Params,
52 value::{constraint::TypeConstraint, identity::IdentityId},
53};
54use tracing::instrument;
55
56use crate::{
57 Result,
58 bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
59 interceptor::catalog::MaterializedCatalogInterceptor,
60 vm::{Admin, Command, Query, Subscription, executor::Executor, services::EngineConfig},
61};
62
/// Handle to the engine.
///
/// A thin newtype around an [`Arc`] of the shared [`Inner`] state: cloning the
/// handle clones the `Arc`, and the handle dereferences to [`Inner`].
pub struct StandardEngine(Arc<Inner>);
64
65impl WithEventBus for StandardEngine {
66 fn event_bus(&self) -> &EventBus {
67 &self.event_bus
68 }
69}
70
71impl AuthEngine for StandardEngine {
72 fn begin_admin(&self) -> Result<AdminTransaction> {
73 StandardEngine::begin_admin(self, IdentityId::system())
74 }
75
76 fn begin_query(&self) -> Result<QueryTransaction> {
77 StandardEngine::begin_query(self, IdentityId::system())
78 }
79
80 fn catalog(&self) -> Catalog {
81 StandardEngine::catalog(self)
82 }
83}
84
85impl StandardEngine {
87 #[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
88 pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
89 let interceptors = self.interceptors.create();
90 let mut txn = CommandTransaction::new(
91 self.multi.clone(),
92 self.single.clone(),
93 self.event_bus.clone(),
94 interceptors,
95 identity,
96 self.executor.runtime_context.clock.clone(),
97 )?;
98 txn.set_executor(Arc::new(self.executor.clone()));
99 Ok(txn)
100 }
101
102 #[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
103 pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
104 let interceptors = self.interceptors.create();
105 let mut txn = AdminTransaction::new(
106 self.multi.clone(),
107 self.single.clone(),
108 self.event_bus.clone(),
109 interceptors,
110 identity,
111 self.executor.runtime_context.clock.clone(),
112 )?;
113 txn.set_executor(Arc::new(self.executor.clone()));
114 Ok(txn)
115 }
116
117 #[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
118 pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
119 let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
120 txn.set_executor(Arc::new(self.executor.clone()));
121 Ok(txn)
122 }
123
124 pub fn clock(&self) -> &Clock {
126 &self.executor.runtime_context.clock
127 }
128
129 #[instrument(name = "engine::admin_as", level = "debug", skip(self, params), fields(rql = %rql))]
130 pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
131 if let Err(e) = self.reject_if_read_only() {
132 return ExecutionResult {
133 frames: vec![],
134 error: Some(e),
135 metrics: ExecutionMetrics::default(),
136 };
137 }
138 let mut txn = match self.begin_admin(identity) {
139 Ok(t) => t,
140 Err(mut e) => {
141 e.with_statement(rql.to_string());
142 return ExecutionResult {
143 frames: vec![],
144 error: Some(e),
145 metrics: ExecutionMetrics::default(),
146 };
147 }
148 };
149 let mut outcome = self.executor.admin(
150 &mut txn,
151 Admin {
152 rql,
153 params,
154 },
155 );
156 if outcome.is_ok()
157 && let Err(mut e) = txn.commit()
158 {
159 e.with_statement(rql.to_string());
160 outcome.error = Some(e);
161 }
162 if let Some(ref mut e) = outcome.error {
163 e.with_statement(rql.to_string());
164 }
165 outcome
166 }
167
168 #[instrument(name = "engine::command_as", level = "debug", skip(self, params), fields(rql = %rql))]
169 pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
170 if let Err(e) = self.reject_if_read_only() {
171 return ExecutionResult {
172 frames: vec![],
173 error: Some(e),
174 metrics: ExecutionMetrics::default(),
175 };
176 }
177 let mut txn = match self.begin_command(identity) {
178 Ok(t) => t,
179 Err(mut e) => {
180 e.with_statement(rql.to_string());
181 return ExecutionResult {
182 frames: vec![],
183 error: Some(e),
184 metrics: ExecutionMetrics::default(),
185 };
186 }
187 };
188 let mut outcome = self.executor.command(
189 &mut txn,
190 Command {
191 rql,
192 params,
193 },
194 );
195 if outcome.is_ok()
196 && let Err(mut e) = txn.commit()
197 {
198 e.with_statement(rql.to_string());
199 outcome.error = Some(e);
200 }
201 if let Some(ref mut e) = outcome.error {
202 e.with_statement(rql.to_string());
203 }
204 outcome
205 }
206
207 #[instrument(name = "engine::query_as", level = "debug", skip(self, params), fields(rql = %rql))]
208 pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
209 let mut txn = match self.begin_query(identity) {
210 Ok(t) => t,
211 Err(mut e) => {
212 e.with_statement(rql.to_string());
213 return ExecutionResult {
214 frames: vec![],
215 error: Some(e),
216 metrics: ExecutionMetrics::default(),
217 };
218 }
219 };
220 let mut outcome = self.executor.query(
221 &mut txn,
222 Query {
223 rql,
224 params,
225 },
226 );
227 if let Some(ref mut e) = outcome.error {
228 e.with_statement(rql.to_string());
229 }
230 outcome
231 }
232
233 #[instrument(name = "engine::subscribe_as", level = "debug", skip(self, params), fields(rql = %rql))]
234 pub fn subscribe_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
235 let mut txn = match self.begin_query(identity) {
236 Ok(t) => t,
237 Err(mut e) => {
238 e.with_statement(rql.to_string());
239 return ExecutionResult {
240 frames: vec![],
241 error: Some(e),
242 metrics: ExecutionMetrics::default(),
243 };
244 }
245 };
246 let mut outcome = self.executor.subscription(
247 &mut txn,
248 Subscription {
249 rql,
250 params,
251 },
252 );
253 if let Some(ref mut e) = outcome.error {
254 e.with_statement(rql.to_string());
255 }
256 outcome
257 }
258
259 #[instrument(name = "engine::procedure_as", level = "debug", skip(self, params), fields(name = %name))]
261 pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> ExecutionResult {
262 if let Err(e) = self.reject_if_read_only() {
263 return ExecutionResult {
264 frames: vec![],
265 error: Some(e),
266 metrics: ExecutionMetrics::default(),
267 };
268 }
269 let mut txn = match self.begin_command(identity) {
270 Ok(t) => t,
271 Err(e) => {
272 return ExecutionResult {
273 frames: vec![],
274 error: Some(e),
275 metrics: ExecutionMetrics::default(),
276 };
277 }
278 };
279 let mut outcome = self.executor.call_procedure(&mut txn, name, ¶ms);
280 if outcome.is_ok()
281 && let Err(e) = txn.commit()
282 {
283 outcome.error = Some(e);
284 }
285 outcome
286 }
287
288 pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
325 let catalog = self.materialized_catalog();
326
327 let ns_def = catalog
329 .find_namespace_by_name(namespace)
330 .ok_or_else(|| Error(Box::new(namespace_not_found(Fragment::None, namespace))))?;
331
332 let table_id = self.executor.virtual_table_registry.allocate_id();
334 let table_columns = table.vtable();
336 let columns = convert_vtable_user_columns_to_columns(&table_columns);
337
338 let def = Arc::new(VTable {
340 id: table_id,
341 namespace: ns_def.id(),
342 name: name.to_string(),
343 columns,
344 });
345
346 catalog.register_vtable_user(def.clone())?;
348 let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
350 let entry = UserVTableEntry {
352 def: def.clone(),
353 data_fn,
354 };
355 self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
356 Ok(table_id)
357 }
358}
359
360impl CdcHost for StandardEngine {
361 fn begin_command(&self) -> Result<CommandTransaction> {
362 StandardEngine::begin_command(self, IdentityId::system())
363 }
364
365 fn begin_query(&self) -> Result<QueryTransaction> {
366 StandardEngine::begin_query(self, IdentityId::system())
367 }
368
369 fn current_version(&self) -> Result<CommitVersion> {
370 StandardEngine::current_version(self)
371 }
372
373 fn done_until(&self) -> CommitVersion {
374 StandardEngine::done_until(self)
375 }
376
377 fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
378 StandardEngine::wait_for_mark_timeout(self, version, timeout)
379 }
380
381 fn materialized_catalog(&self) -> &MaterializedCatalog {
382 &self.catalog.materialized
383 }
384}
385
386impl Clone for StandardEngine {
387 fn clone(&self) -> Self {
388 Self(self.0.clone())
389 }
390}
391
392impl Deref for StandardEngine {
393 type Target = Inner;
394
395 fn deref(&self) -> &Self::Target {
396 &self.0
397 }
398}
399
/// Shared state behind a [`StandardEngine`] handle.
///
/// All fields are private; they are reached through the engine's methods.
pub struct Inner {
    // Multi-version transaction handle: cloned into command/admin transactions,
    // used to begin query transactions, and queried for version information.
    multi: MultiTransaction,
    // Single transaction handle, cloned into every transaction the engine starts.
    single: SingleTransaction,
    // Event bus: cloned into command/admin transactions and used by `emit`.
    event_bus: EventBus,
    // Executor that runs statements; a copy is attached to each transaction.
    executor: Executor,
    // Factory that produces the interceptor set for each command/admin transaction.
    interceptors: Arc<InterceptorFactory>,
    // Catalog handle; `catalog.materialized` is the materialized catalog.
    catalog: Catalog,
    // Store shared with the executor and with the listener registered on the
    // event bus in `StandardEngine::new`.
    flow_operator_store: SystemFlowOperatorStore,
    // Read-only flag; starts as `false` and is checked by `reject_if_read_only`.
    read_only: AtomicBool,
}
410
411impl StandardEngine {
412 pub fn new(
413 multi: MultiTransaction,
414 single: SingleTransaction,
415 event_bus: EventBus,
416 interceptors: InterceptorFactory,
417 catalog: Catalog,
418 config: EngineConfig,
419 ) -> Self {
420 let flow_operator_store = SystemFlowOperatorStore::new();
421 let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
422 event_bus.register(listener);
423
424 let metrics_store = config
426 .ioc
427 .resolve::<SingleStore>()
428 .expect("SingleStore must be registered in IocContainer for metrics");
429 let stats_reader = MetricReader::new(metrics_store);
430
431 let materialized = catalog.materialized.clone();
433 interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
434 interceptors
435 .post_commit
436 .add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
437 }));
438
439 let interceptors = Arc::new(interceptors);
440
441 Self(Arc::new(Inner {
442 multi,
443 single,
444 event_bus,
445 executor: Executor::new(catalog.clone(), config, flow_operator_store.clone(), stats_reader),
446 interceptors,
447 catalog,
448 flow_operator_store,
449 read_only: AtomicBool::new(false),
450 }))
451 }
452
453 pub fn create_interceptors(&self) -> Interceptors {
455 self.interceptors.create()
456 }
457
458 pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
463 self.interceptors.add_late(factory);
464 }
465
466 #[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
471 ))]
472 pub fn begin_query_at_version(&self, version: CommitVersion, identity: IdentityId) -> Result<QueryTransaction> {
473 let mut txn = QueryTransaction::new(
474 self.multi.begin_query_at_version(version)?,
475 self.single.clone(),
476 identity,
477 );
478 txn.set_executor(Arc::new(self.executor.clone()));
479 Ok(txn)
480 }
481
482 #[inline]
483 pub fn multi(&self) -> &MultiTransaction {
484 &self.multi
485 }
486
487 #[inline]
488 pub fn multi_owned(&self) -> MultiTransaction {
489 self.multi.clone()
490 }
491
492 #[inline]
494 pub fn actor_system(&self) -> ActorSystem {
495 self.multi.actor_system()
496 }
497
498 #[inline]
499 pub fn single(&self) -> &SingleTransaction {
500 &self.single
501 }
502
503 #[inline]
504 pub fn single_owned(&self) -> SingleTransaction {
505 self.single.clone()
506 }
507
508 #[inline]
509 pub fn emit<E: Event>(&self, event: E) {
510 self.event_bus.emit(event)
511 }
512
513 #[inline]
514 pub fn materialized_catalog(&self) -> &MaterializedCatalog {
515 &self.catalog.materialized
516 }
517
518 #[inline]
522 pub fn catalog(&self) -> Catalog {
523 self.catalog.clone()
524 }
525
526 #[inline]
527 pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
528 &self.flow_operator_store
529 }
530
531 #[inline]
533 pub fn current_version(&self) -> Result<CommitVersion> {
534 self.multi.current_version()
535 }
536
537 #[inline]
541 pub fn done_until(&self) -> CommitVersion {
542 self.multi.done_until()
543 }
544
545 #[inline]
548 pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
549 self.multi.wait_for_mark_timeout(version, timeout)
550 }
551
552 #[inline]
553 pub fn executor(&self) -> Executor {
554 self.executor.clone()
555 }
556
557 #[inline]
562 pub fn cdc_store(&self) -> CdcStore {
563 self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
564 }
565
566 pub fn set_read_only(&self) {
569 self.read_only.store(true, Ordering::SeqCst);
570 }
571
572 pub fn is_read_only(&self) -> bool {
574 self.read_only.load(Ordering::SeqCst)
575 }
576
577 pub(crate) fn reject_if_read_only(&self) -> Result<()> {
578 if self.is_read_only() {
579 return Err(Error(Box::new(read_only_rejection(Fragment::None))));
580 }
581 Ok(())
582 }
583
584 pub fn shutdown(&self) {
585 self.interceptors.clear_late();
586 self.executor.ioc.clear();
587 }
588
    /// Creates a bulk-insert builder in its `Validated` mode.
    ///
    /// The builder borrows this engine for `'e` and acts on behalf of
    /// `identity`.
    pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
        BulkInsertBuilder::new(self, identity)
    }
609
    /// Creates a bulk-insert builder in its `Trusted` mode.
    ///
    /// The builder borrows this engine for `'e` and acts on behalf of
    /// `identity`.
    // NOTE(review): presumably `Trusted` skips checks that `Validated`
    // performs — confirm against `BulkInsertBuilder` before relying on it.
    pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
        BulkInsertBuilder::new_trusted(self, identity)
    }
622}
623
624fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
626 columns.iter()
627 .enumerate()
628 .map(|(idx, col)| {
629 let constraint = TypeConstraint::unconstrained(col.data_type.clone());
632 Column {
633 id: ColumnId(idx as u64),
634 name: col.name.clone(),
635 constraint,
636 properties: vec![],
637 index: ColumnIndex(idx as u8),
638 auto_increment: false,
639 dictionary_id: None,
640 }
641 })
642 .collect()
643}