1use std::{
5 ops::Deref,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, Ordering},
9 },
10 time::Duration,
11};
12
13use reifydb_auth::service::AuthEngine;
14use reifydb_catalog::{
15 catalog::Catalog,
16 materialized::MaterializedCatalog,
17 vtable::{
18 system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
19 tables::UserVTableDataFunction,
20 user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
21 },
22};
23use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
24use reifydb_core::{
25 common::CommitVersion,
26 error::diagnostic::{catalog::namespace_not_found, engine::read_only_rejection},
27 event::{Event, EventBus},
28 execution::ExecutionResult,
29 interface::{
30 WithEventBus,
31 catalog::{
32 column::{Column, ColumnIndex},
33 id::ColumnId,
34 vtable::{VTable, VTableId},
35 },
36 },
37 metric::ExecutionMetrics,
38};
39use reifydb_metric::storage::metric::MetricReader;
40use reifydb_runtime::{
41 actor::system::ActorSystem,
42 context::{clock::Clock, rng::Rng},
43};
44use reifydb_store_single::SingleStore;
45use reifydb_transaction::{
46 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
47 multi::transaction::MultiTransaction,
48 single::SingleTransaction,
49 transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
50};
51use reifydb_type::{
52 error::Error,
53 fragment::Fragment,
54 params::Params,
55 value::{constraint::TypeConstraint, identity::IdentityId},
56};
57use tracing::instrument;
58
59use crate::{
60 Result,
61 bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
62 interceptor::catalog::MaterializedCatalogInterceptor,
63 vm::{Admin, Command, Query, Subscription, executor::Executor, services::EngineConfig},
64};
65
66pub struct StandardEngine(Arc<Inner>);
67
68impl WithEventBus for StandardEngine {
69 fn event_bus(&self) -> &EventBus {
70 &self.event_bus
71 }
72}
73
74impl AuthEngine for StandardEngine {
75 fn begin_admin(&self) -> Result<AdminTransaction> {
76 StandardEngine::begin_admin(self, IdentityId::system())
77 }
78
79 fn begin_query(&self) -> Result<QueryTransaction> {
80 StandardEngine::begin_query(self, IdentityId::system())
81 }
82
83 fn catalog(&self) -> Catalog {
84 StandardEngine::catalog(self)
85 }
86}
87
88impl StandardEngine {
90 #[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
91 pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
92 let interceptors = self.interceptors.create();
93 let mut txn = CommandTransaction::new(
94 self.multi.clone(),
95 self.single.clone(),
96 self.event_bus.clone(),
97 interceptors,
98 identity,
99 self.executor.runtime_context.clock.clone(),
100 )?;
101 txn.set_executor(Arc::new(self.executor.clone()));
102 Ok(txn)
103 }
104
105 #[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
106 pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
107 let interceptors = self.interceptors.create();
108 let mut txn = AdminTransaction::new(
109 self.multi.clone(),
110 self.single.clone(),
111 self.event_bus.clone(),
112 interceptors,
113 identity,
114 self.executor.runtime_context.clock.clone(),
115 )?;
116 txn.set_executor(Arc::new(self.executor.clone()));
117 Ok(txn)
118 }
119
120 #[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
121 pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
122 let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
123 txn.set_executor(Arc::new(self.executor.clone()));
124 Ok(txn)
125 }
126
127 pub fn clock(&self) -> &Clock {
129 &self.executor.runtime_context.clock
130 }
131
132 pub fn rng(&self) -> &Rng {
133 &self.executor.runtime_context.rng
134 }
135
136 #[instrument(name = "engine::admin_as", level = "debug", skip(self, params), fields(rql = %rql))]
137 pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
138 if let Err(e) = self.reject_if_read_only() {
139 return ExecutionResult {
140 frames: vec![],
141 error: Some(e),
142 metrics: ExecutionMetrics::default(),
143 };
144 }
145 let mut txn = match self.begin_admin(identity) {
146 Ok(t) => t,
147 Err(mut e) => {
148 e.with_rql(rql.to_string());
149 return ExecutionResult {
150 frames: vec![],
151 error: Some(e),
152 metrics: ExecutionMetrics::default(),
153 };
154 }
155 };
156 let mut outcome = self.executor.admin(
157 &mut txn,
158 Admin {
159 rql,
160 params,
161 },
162 );
163 if outcome.is_ok()
164 && let Err(mut e) = txn.commit()
165 {
166 e.with_rql(rql.to_string());
167 outcome.error = Some(e);
168 }
169 if let Some(ref mut e) = outcome.error {
170 e.with_rql(rql.to_string());
171 }
172 outcome
173 }
174
175 #[instrument(name = "engine::command_as", level = "debug", skip(self, params), fields(rql = %rql))]
176 pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
177 if let Err(e) = self.reject_if_read_only() {
178 return ExecutionResult {
179 frames: vec![],
180 error: Some(e),
181 metrics: ExecutionMetrics::default(),
182 };
183 }
184 let mut txn = match self.begin_command(identity) {
185 Ok(t) => t,
186 Err(mut e) => {
187 e.with_rql(rql.to_string());
188 return ExecutionResult {
189 frames: vec![],
190 error: Some(e),
191 metrics: ExecutionMetrics::default(),
192 };
193 }
194 };
195 let mut outcome = self.executor.command(
196 &mut txn,
197 Command {
198 rql,
199 params,
200 },
201 );
202 if outcome.is_ok()
203 && let Err(mut e) = txn.commit()
204 {
205 e.with_rql(rql.to_string());
206 outcome.error = Some(e);
207 }
208 if let Some(ref mut e) = outcome.error {
209 e.with_rql(rql.to_string());
210 }
211 outcome
212 }
213
214 #[instrument(name = "engine::query_as", level = "debug", skip(self, params), fields(rql = %rql))]
215 pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
216 let mut txn = match self.begin_query(identity) {
217 Ok(t) => t,
218 Err(mut e) => {
219 e.with_rql(rql.to_string());
220 return ExecutionResult {
221 frames: vec![],
222 error: Some(e),
223 metrics: ExecutionMetrics::default(),
224 };
225 }
226 };
227 let mut outcome = self.executor.query(
228 &mut txn,
229 Query {
230 rql,
231 params,
232 },
233 );
234 if let Some(ref mut e) = outcome.error {
235 e.with_rql(rql.to_string());
236 }
237 outcome
238 }
239
240 #[instrument(name = "engine::subscribe_as", level = "debug", skip(self, params), fields(rql = %rql))]
241 pub fn subscribe_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
242 let mut txn = match self.begin_query(identity) {
243 Ok(t) => t,
244 Err(mut e) => {
245 e.with_rql(rql.to_string());
246 return ExecutionResult {
247 frames: vec![],
248 error: Some(e),
249 metrics: ExecutionMetrics::default(),
250 };
251 }
252 };
253 let mut outcome = self.executor.subscription(
254 &mut txn,
255 Subscription {
256 rql,
257 params,
258 },
259 );
260 if let Some(ref mut e) = outcome.error {
261 e.with_rql(rql.to_string());
262 }
263 outcome
264 }
265
266 #[instrument(name = "engine::procedure_as", level = "debug", skip(self, params), fields(name = %name))]
268 pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> ExecutionResult {
269 if let Err(e) = self.reject_if_read_only() {
270 return ExecutionResult {
271 frames: vec![],
272 error: Some(e),
273 metrics: ExecutionMetrics::default(),
274 };
275 }
276 let mut txn = match self.begin_command(identity) {
277 Ok(t) => t,
278 Err(e) => {
279 return ExecutionResult {
280 frames: vec![],
281 error: Some(e),
282 metrics: ExecutionMetrics::default(),
283 };
284 }
285 };
286 let mut outcome = self.executor.call_procedure(&mut txn, name, ¶ms);
287 if outcome.is_ok()
288 && let Err(e) = txn.commit()
289 {
290 outcome.error = Some(e);
291 }
292 outcome
293 }
294
295 pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
332 let catalog = self.materialized_catalog();
333
334 let ns_def = catalog
336 .find_namespace_by_name(namespace)
337 .ok_or_else(|| Error(Box::new(namespace_not_found(Fragment::None, namespace))))?;
338
339 let table_id = self.executor.virtual_table_registry.allocate_id();
341 let table_columns = table.vtable();
343 let columns = convert_vtable_user_columns_to_columns(&table_columns);
344
345 let def = Arc::new(VTable {
347 id: table_id,
348 namespace: ns_def.id(),
349 name: name.to_string(),
350 columns,
351 });
352
353 catalog.register_vtable_user(def.clone())?;
355 let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
357 let entry = UserVTableEntry {
359 def: def.clone(),
360 data_fn,
361 };
362 self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
363 Ok(table_id)
364 }
365}
366
367impl CdcHost for StandardEngine {
368 fn begin_command(&self) -> Result<CommandTransaction> {
369 StandardEngine::begin_command(self, IdentityId::system())
370 }
371
372 fn begin_query(&self) -> Result<QueryTransaction> {
373 StandardEngine::begin_query(self, IdentityId::system())
374 }
375
376 fn current_version(&self) -> Result<CommitVersion> {
377 StandardEngine::current_version(self)
378 }
379
380 fn done_until(&self) -> CommitVersion {
381 StandardEngine::done_until(self)
382 }
383
384 fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
385 StandardEngine::wait_for_mark_timeout(self, version, timeout)
386 }
387
388 fn materialized_catalog(&self) -> &MaterializedCatalog {
389 &self.catalog.materialized
390 }
391}
392
393impl Clone for StandardEngine {
394 fn clone(&self) -> Self {
395 Self(self.0.clone())
396 }
397}
398
399impl Deref for StandardEngine {
400 type Target = Inner;
401
402 fn deref(&self) -> &Self::Target {
403 &self.0
404 }
405}
406
407pub struct Inner {
408 multi: MultiTransaction,
409 single: SingleTransaction,
410 event_bus: EventBus,
411 executor: Executor,
412 interceptors: Arc<InterceptorFactory>,
413 catalog: Catalog,
414 flow_operator_store: SystemFlowOperatorStore,
415 read_only: AtomicBool,
416}
417
418impl StandardEngine {
419 pub fn new(
420 multi: MultiTransaction,
421 single: SingleTransaction,
422 event_bus: EventBus,
423 interceptors: InterceptorFactory,
424 catalog: Catalog,
425 config: EngineConfig,
426 ) -> Self {
427 let flow_operator_store = SystemFlowOperatorStore::new();
428 let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
429 event_bus.register(listener);
430
431 let metrics_store = config
433 .ioc
434 .resolve::<SingleStore>()
435 .expect("SingleStore must be registered in IocContainer for metrics");
436 let stats_reader = MetricReader::new(metrics_store);
437
438 let materialized = catalog.materialized.clone();
440 interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
441 interceptors
442 .post_commit
443 .add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
444 }));
445
446 let interceptors = Arc::new(interceptors);
447
448 Self(Arc::new(Inner {
449 multi,
450 single,
451 event_bus,
452 executor: Executor::new(catalog.clone(), config, flow_operator_store.clone(), stats_reader),
453 interceptors,
454 catalog,
455 flow_operator_store,
456 read_only: AtomicBool::new(false),
457 }))
458 }
459
460 pub fn create_interceptors(&self) -> Interceptors {
462 self.interceptors.create()
463 }
464
465 pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
470 self.interceptors.add_late(factory);
471 }
472
473 #[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
478 ))]
479 pub fn begin_query_at_version(&self, version: CommitVersion, identity: IdentityId) -> Result<QueryTransaction> {
480 let mut txn = QueryTransaction::new(
481 self.multi.begin_query_at_version(version)?,
482 self.single.clone(),
483 identity,
484 );
485 txn.set_executor(Arc::new(self.executor.clone()));
486 Ok(txn)
487 }
488
489 #[inline]
490 pub fn multi(&self) -> &MultiTransaction {
491 &self.multi
492 }
493
494 #[inline]
495 pub fn multi_owned(&self) -> MultiTransaction {
496 self.multi.clone()
497 }
498
499 #[inline]
501 pub fn actor_system(&self) -> ActorSystem {
502 self.multi.actor_system()
503 }
504
505 #[inline]
506 pub fn single(&self) -> &SingleTransaction {
507 &self.single
508 }
509
510 #[inline]
511 pub fn single_owned(&self) -> SingleTransaction {
512 self.single.clone()
513 }
514
515 #[inline]
516 pub fn emit<E: Event>(&self, event: E) {
517 self.event_bus.emit(event)
518 }
519
520 #[inline]
521 pub fn materialized_catalog(&self) -> &MaterializedCatalog {
522 &self.catalog.materialized
523 }
524
525 #[inline]
529 pub fn catalog(&self) -> Catalog {
530 self.catalog.clone()
531 }
532
533 #[inline]
534 pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
535 &self.flow_operator_store
536 }
537
538 #[inline]
540 pub fn current_version(&self) -> Result<CommitVersion> {
541 self.multi.current_version()
542 }
543
544 #[inline]
548 pub fn done_until(&self) -> CommitVersion {
549 self.multi.done_until()
550 }
551
552 #[inline]
555 pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
556 self.multi.wait_for_mark_timeout(version, timeout)
557 }
558
559 #[inline]
560 pub fn executor(&self) -> Executor {
561 self.executor.clone()
562 }
563
564 #[inline]
569 pub fn cdc_store(&self) -> CdcStore {
570 self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
571 }
572
573 pub fn set_read_only(&self) {
576 self.read_only.store(true, Ordering::SeqCst);
577 }
578
579 pub fn is_read_only(&self) -> bool {
581 self.read_only.load(Ordering::SeqCst)
582 }
583
584 pub(crate) fn reject_if_read_only(&self) -> Result<()> {
585 if self.is_read_only() {
586 return Err(Error(Box::new(read_only_rejection(Fragment::None))));
587 }
588 Ok(())
589 }
590
591 pub fn shutdown(&self) {
592 self.interceptors.clear_late();
593 self.executor.ioc.clear();
594 }
595
596 pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
614 BulkInsertBuilder::new(self, identity)
615 }
616
617 pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
627 BulkInsertBuilder::new_trusted(self, identity)
628 }
629}
630
631fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
633 columns.iter()
634 .enumerate()
635 .map(|(idx, col)| {
636 let constraint = TypeConstraint::unconstrained(col.data_type.clone());
639 Column {
640 id: ColumnId(idx as u64),
641 name: col.name.clone(),
642 constraint,
643 properties: vec![],
644 index: ColumnIndex(idx as u8),
645 auto_increment: false,
646 dictionary_id: None,
647 }
648 })
649 .collect()
650}