1use std::{
5 ops::Deref,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, Ordering},
9 },
10 time::Duration,
11};
12
13use reifydb_auth::service::AuthEngine;
14use reifydb_catalog::{
15 catalog::Catalog,
16 interceptor::CatalogCacheInterceptor,
17 vtable::{
18 system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
19 tables::UserVTableDataFunction,
20 user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
21 },
22};
23use reifydb_cdc::{
24 consume::{host::CdcHost, watermark::CdcConsumerWatermark},
25 produce::watermark::CdcProducerWatermark,
26 storage::CdcStore,
27};
28use reifydb_core::{
29 common::CommitVersion,
30 error::diagnostic::{catalog::namespace_not_found, engine::read_only_rejection},
31 event::{Event, EventBus},
32 execution::ExecutionResult,
33 interface::{
34 WithEventBus,
35 catalog::{
36 column::{Column, ColumnIndex},
37 id::ColumnId,
38 vtable::{VTable, VTableId},
39 },
40 },
41 metric::ExecutionMetrics,
42 util::ioc::IocContainer,
43};
44use reifydb_metric::storage::metric::MetricReader;
45use reifydb_runtime::{
46 actor::{mailbox::ActorRef, system::ActorSystem},
47 context::{clock::Clock, rng::Rng},
48};
49use reifydb_store_single::SingleStore;
50use reifydb_transaction::{
51 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
52 multi::{lease::VersionLeaseGuard, transaction::MultiTransaction},
53 single::SingleTransaction,
54 transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
55};
56use reifydb_type::{
57 error::Error,
58 fragment::Fragment,
59 params::Params,
60 value::{constraint::TypeConstraint, identity::IdentityId},
61};
62use tracing::instrument;
63
64use crate::{
65 Result,
66 bulk_insert::builder::{BulkInsertBuilder, Unchecked, Validated},
67 vm::{
68 Admin, Command, Query, Subscription,
69 executor::Executor,
70 services::{EngineConfig, Services},
71 },
72};
73
74pub struct StandardEngine(Arc<Inner>);
75
76impl WithEventBus for StandardEngine {
77 fn event_bus(&self) -> &EventBus {
78 &self.event_bus
79 }
80}
81
82impl AuthEngine for StandardEngine {
83 fn begin_admin(&self) -> Result<AdminTransaction> {
84 StandardEngine::begin_admin(self, IdentityId::system())
85 }
86
87 fn begin_query(&self) -> Result<QueryTransaction> {
88 StandardEngine::begin_query(self, IdentityId::system())
89 }
90
91 fn catalog(&self) -> Catalog {
92 StandardEngine::catalog(self)
93 }
94}
95
96impl StandardEngine {
97 #[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
98 pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
99 let interceptors = self.interceptors.create();
100 let mut txn = CommandTransaction::new(
101 self.multi.clone(),
102 self.single.clone(),
103 self.event_bus.clone(),
104 interceptors,
105 identity,
106 self.executor.runtime_context.clock.clone(),
107 )?;
108 txn.set_executor(Arc::new(self.executor.clone()));
109 Ok(txn)
110 }
111
112 #[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
113 pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
114 let interceptors = self.interceptors.create();
115 let mut txn = AdminTransaction::new(
116 self.multi.clone(),
117 self.single.clone(),
118 self.event_bus.clone(),
119 interceptors,
120 identity,
121 self.executor.runtime_context.clock.clone(),
122 )?;
123 txn.set_executor(Arc::new(self.executor.clone()));
124 Ok(txn)
125 }
126
127 #[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
128 pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
129 let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
130 txn.set_executor(Arc::new(self.executor.clone()));
131 Ok(txn)
132 }
133
134 pub fn clock(&self) -> &Clock {
135 &self.executor.runtime_context.clock
136 }
137
138 pub fn rng(&self) -> &Rng {
139 &self.executor.runtime_context.rng
140 }
141
142 #[instrument(name = "engine::admin_as", level = "debug", skip(self, params), fields(rql = %rql))]
143 pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
144 if let Err(e) = self.reject_if_read_only() {
145 return ExecutionResult {
146 frames: vec![],
147 error: Some(e),
148 metrics: ExecutionMetrics::default(),
149 };
150 }
151 let mut txn = match self.begin_admin(identity) {
152 Ok(t) => t,
153 Err(mut e) => {
154 e.with_rql(rql.to_string());
155 return ExecutionResult {
156 frames: vec![],
157 error: Some(e),
158 metrics: ExecutionMetrics::default(),
159 };
160 }
161 };
162 let mut outcome = self.executor.admin(
163 &mut txn,
164 Admin {
165 rql,
166 params,
167 },
168 );
169 if outcome.is_ok()
170 && let Err(mut e) = txn.commit()
171 {
172 e.with_rql(rql.to_string());
173 outcome.error = Some(e);
174 }
175 if let Some(ref mut e) = outcome.error {
176 e.with_rql(rql.to_string());
177 }
178 outcome
179 }
180
181 #[instrument(name = "engine::command_as", level = "debug", skip(self, params), fields(rql = %rql))]
182 pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
183 if let Err(e) = self.reject_if_read_only() {
184 return ExecutionResult {
185 frames: vec![],
186 error: Some(e),
187 metrics: ExecutionMetrics::default(),
188 };
189 }
190 let mut txn = match self.begin_command(identity) {
191 Ok(t) => t,
192 Err(mut e) => {
193 e.with_rql(rql.to_string());
194 return ExecutionResult {
195 frames: vec![],
196 error: Some(e),
197 metrics: ExecutionMetrics::default(),
198 };
199 }
200 };
201 let mut outcome = self.executor.command(
202 &mut txn,
203 Command {
204 rql,
205 params,
206 },
207 );
208 if outcome.is_ok()
209 && let Err(mut e) = txn.commit()
210 {
211 e.with_rql(rql.to_string());
212 outcome.error = Some(e);
213 }
214 if let Some(ref mut e) = outcome.error {
215 e.with_rql(rql.to_string());
216 }
217 outcome
218 }
219
220 #[instrument(name = "engine::query_as", level = "debug", skip(self, params), fields(rql = %rql))]
221 pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
222 let mut txn = match self.begin_query(identity) {
223 Ok(t) => t,
224 Err(mut e) => {
225 e.with_rql(rql.to_string());
226 return ExecutionResult {
227 frames: vec![],
228 error: Some(e),
229 metrics: ExecutionMetrics::default(),
230 };
231 }
232 };
233 let mut outcome = self.executor.query(
234 &mut txn,
235 Query {
236 rql,
237 params,
238 },
239 );
240 if let Some(ref mut e) = outcome.error {
241 e.with_rql(rql.to_string());
242 }
243 outcome
244 }
245
246 #[instrument(name = "engine::query_as_at_version", level = "debug", skip(self, params, lease), fields(rql = %rql, version = %lease.version().0))]
247 pub fn query_as_at_version(
248 &self,
249 identity: IdentityId,
250 rql: &str,
251 params: Params,
252 lease: &VersionLeaseGuard,
253 ) -> ExecutionResult {
254 let mut txn = match self.begin_query_at_version(lease, identity) {
255 Ok(t) => t,
256 Err(mut e) => {
257 e.with_rql(rql.to_string());
258 return ExecutionResult {
259 frames: vec![],
260 error: Some(e),
261 metrics: ExecutionMetrics::default(),
262 };
263 }
264 };
265 let mut outcome = self.executor.query(
266 &mut txn,
267 Query {
268 rql,
269 params,
270 },
271 );
272 if let Some(ref mut e) = outcome.error {
273 e.with_rql(rql.to_string());
274 }
275 outcome
276 }
277
278 #[instrument(name = "engine::query_in_txn", level = "debug", skip(self, txn, params), fields(rql = %rql))]
279 pub fn query_in_txn(&self, txn: &mut QueryTransaction, rql: &str, params: Params) -> ExecutionResult {
280 let mut outcome = self.executor.query(
281 txn,
282 Query {
283 rql,
284 params,
285 },
286 );
287 if let Some(ref mut e) = outcome.error {
288 e.with_rql(rql.to_string());
289 }
290 outcome
291 }
292
293 #[instrument(name = "engine::subscribe_as", level = "debug", skip(self, params), fields(rql = %rql))]
294 pub fn subscribe_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
295 let mut txn = match self.begin_query(identity) {
296 Ok(t) => t,
297 Err(mut e) => {
298 e.with_rql(rql.to_string());
299 return ExecutionResult {
300 frames: vec![],
301 error: Some(e),
302 metrics: ExecutionMetrics::default(),
303 };
304 }
305 };
306 let mut outcome = self.executor.subscription(
307 &mut txn,
308 Subscription {
309 rql,
310 params,
311 },
312 );
313 if let Some(ref mut e) = outcome.error {
314 e.with_rql(rql.to_string());
315 }
316 outcome
317 }
318
319 #[instrument(name = "engine::procedure_as", level = "debug", skip(self, params), fields(name = %name))]
320 pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> ExecutionResult {
321 if let Err(e) = self.reject_if_read_only() {
322 return ExecutionResult {
323 frames: vec![],
324 error: Some(e),
325 metrics: ExecutionMetrics::default(),
326 };
327 }
328 let mut txn = match self.begin_command(identity) {
329 Ok(t) => t,
330 Err(e) => {
331 return ExecutionResult {
332 frames: vec![],
333 error: Some(e),
334 metrics: ExecutionMetrics::default(),
335 };
336 }
337 };
338 let mut outcome = self.executor.call_procedure(&mut txn, name, ¶ms);
339 if outcome.is_ok()
340 && let Err(e) = txn.commit()
341 {
342 outcome.error = Some(e);
343 }
344 outcome
345 }
346
347 pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
348 let catalog = self.catalog();
349
350 let mut qry = self.begin_query(IdentityId::root())?;
351 let ns_def = catalog
352 .find_namespace_by_name(&mut Transaction::Query(&mut qry), namespace)?
353 .ok_or_else(|| Error(Box::new(namespace_not_found(Fragment::None, namespace))))?;
354
355 let table_id = self.executor.virtual_table_registry.allocate_id();
356
357 let table_columns = table.vtable();
358 let columns = convert_vtable_user_columns_to_columns(&table_columns);
359
360 let def = Arc::new(VTable {
361 id: table_id,
362 namespace: ns_def.id(),
363 name: name.to_string(),
364 columns,
365 });
366
367 catalog.register_vtable_user(def.clone())?;
368
369 let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
370
371 let entry = UserVTableEntry {
372 def: def.clone(),
373 data_fn,
374 };
375 self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
376 Ok(table_id)
377 }
378}
379
380impl CdcHost for StandardEngine {
381 fn begin_command(&self) -> Result<CommandTransaction> {
382 StandardEngine::begin_command(self, IdentityId::system())
383 }
384
385 fn begin_query(&self) -> Result<QueryTransaction> {
386 StandardEngine::begin_query(self, IdentityId::system())
387 }
388
389 fn current_version(&self) -> Result<CommitVersion> {
390 StandardEngine::current_version(self)
391 }
392
393 fn done_until(&self) -> CommitVersion {
394 StandardEngine::done_until(self)
395 }
396
397 fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
398 StandardEngine::wait_for_mark_timeout(self, version, timeout)
399 }
400
401 fn catalog(&self) -> &Catalog {
402 &self.catalog
403 }
404}
405
406impl Clone for StandardEngine {
407 fn clone(&self) -> Self {
408 Self(self.0.clone())
409 }
410}
411
412impl Deref for StandardEngine {
413 type Target = Inner;
414
415 fn deref(&self) -> &Self::Target {
416 &self.0
417 }
418}
419
420pub struct Inner {
421 multi: MultiTransaction,
422 single: SingleTransaction,
423 event_bus: EventBus,
424 executor: Executor,
425 interceptors: Arc<InterceptorFactory>,
426 catalog: Catalog,
427 flow_operator_store: SystemFlowOperatorStore,
428 read_only: AtomicBool,
429}
430
431impl StandardEngine {
432 pub fn new(
433 multi: MultiTransaction,
434 single: SingleTransaction,
435 event_bus: EventBus,
436 interceptors: InterceptorFactory,
437 catalog: Catalog,
438 config: EngineConfig,
439 ) -> Self {
440 let flow_operator_store = SystemFlowOperatorStore::new();
441 let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
442 event_bus.register(listener);
443
444 let metrics_store = config
445 .ioc
446 .resolve::<SingleStore>()
447 .expect("SingleStore must be registered in IocContainer for metrics");
448 let stats_reader = MetricReader::new(metrics_store);
449
450 let catalog_for_interceptor = catalog.clone();
451 interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
452 interceptors.post_commit.add(Arc::new(CatalogCacheInterceptor::new(&catalog_for_interceptor)));
453 }));
454
455 let interceptors = Arc::new(interceptors);
456
457 Self(Arc::new(Inner {
458 multi,
459 single,
460 event_bus,
461 executor: Executor::new(catalog.clone(), config, flow_operator_store.clone(), stats_reader),
462 interceptors,
463 catalog,
464 flow_operator_store,
465 read_only: AtomicBool::new(false),
466 }))
467 }
468
469 pub fn create_interceptors(&self) -> Interceptors {
470 self.interceptors.create()
471 }
472
473 pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
474 self.interceptors.add_late(factory);
475 }
476
477 #[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self, lease), fields(version = %lease.version().0
478 ))]
479 pub fn begin_query_at_version(
480 &self,
481 lease: &VersionLeaseGuard,
482 identity: IdentityId,
483 ) -> Result<QueryTransaction> {
484 let mut txn =
485 QueryTransaction::new(self.multi.begin_query_at_version(lease)?, self.single.clone(), identity);
486 txn.set_executor(Arc::new(self.executor.clone()));
487 Ok(txn)
488 }
489
490 #[instrument(name = "engine::acquire_version_lease", level = "debug", skip(self), fields(version = %version.0))]
491 pub fn acquire_version_lease(&self, version: CommitVersion) -> Result<VersionLeaseGuard> {
492 self.multi.acquire_version_lease(version)
493 }
494
495 #[instrument(name = "engine::acquire_current_snapshot_lease", level = "debug", skip(self))]
496 pub fn acquire_current_snapshot_lease(&self) -> Result<(CommitVersion, VersionLeaseGuard)> {
497 self.multi.acquire_current_snapshot_lease()
498 }
499
500 #[inline]
501 pub fn multi(&self) -> &MultiTransaction {
502 &self.multi
503 }
504
505 #[inline]
506 pub fn multi_owned(&self) -> MultiTransaction {
507 self.multi.clone()
508 }
509
510 #[inline]
511 pub fn actor_system(&self) -> ActorSystem {
512 self.multi.actor_system()
513 }
514
515 #[inline]
516 pub fn single(&self) -> &SingleTransaction {
517 &self.single
518 }
519
520 #[inline]
521 pub fn single_owned(&self) -> SingleTransaction {
522 self.single.clone()
523 }
524
525 #[inline]
526 pub fn emit<E: Event>(&self, event: E) {
527 self.event_bus.emit(event)
528 }
529
530 #[inline]
531 pub fn catalog(&self) -> Catalog {
532 self.catalog.clone()
533 }
534
535 #[inline]
536 pub fn services(&self) -> Arc<Services> {
537 self.executor.services().clone()
538 }
539
540 #[inline]
541 pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
542 &self.flow_operator_store
543 }
544
545 #[inline]
546 pub fn current_version(&self) -> Result<CommitVersion> {
547 self.multi.current_version()
548 }
549
550 #[inline]
551 pub fn done_until(&self) -> CommitVersion {
552 self.multi.done_until()
553 }
554
555 #[inline]
556 pub fn query_done_until(&self) -> CommitVersion {
557 self.multi.query_done_until()
558 }
559
560 #[inline]
561 pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
562 self.multi.wait_for_mark_timeout(version, timeout)
563 }
564
565 #[inline]
566 pub fn executor(&self) -> Executor {
567 self.executor.clone()
568 }
569
570 #[inline]
571 pub fn ioc(&self) -> &IocContainer {
572 &self.executor.ioc
573 }
574
575 #[inline]
576 pub fn cdc_store(&self) -> CdcStore {
577 self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
578 }
579
580 #[inline]
581 pub fn actor<M: 'static>(&self) -> Option<ActorRef<M>>
582 where
583 ActorRef<M>: Send + Sync,
584 {
585 self.executor.ioc.try_resolve::<ActorRef<M>>()
586 }
587
588 #[inline]
589 pub fn cdc_producer_watermark(&self) -> CommitVersion {
590 self.executor
591 .ioc
592 .resolve::<CdcProducerWatermark>()
593 .expect("CdcProducerWatermark must be registered")
594 .get()
595 }
596
597 #[inline]
598 pub fn cdc_consumer_watermark(&self) -> CommitVersion {
599 self.executor.ioc.try_resolve::<CdcConsumerWatermark>().map(|w| w.get()).unwrap_or(CommitVersion(0))
600 }
601
602 pub fn set_read_only(&self) {
603 self.read_only.store(true, Ordering::SeqCst);
604 }
605
606 pub fn is_read_only(&self) -> bool {
607 self.read_only.load(Ordering::SeqCst)
608 }
609
610 pub(crate) fn reject_if_read_only(&self) -> Result<()> {
611 if self.is_read_only() {
612 return Err(Error(Box::new(read_only_rejection(Fragment::None))));
613 }
614 Ok(())
615 }
616
617 pub fn shutdown(&self) {
618 self.interceptors.clear_late();
619 self.executor.ioc.clear();
620 }
621
622 pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
623 BulkInsertBuilder::new(self, identity)
624 }
625
626 pub fn bulk_insert_unchecked<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Unchecked> {
627 BulkInsertBuilder::new_unchecked(self, identity)
628 }
629}
630
631fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
632 columns.iter()
633 .enumerate()
634 .map(|(idx, col)| {
635 let constraint = TypeConstraint::unconstrained(col.data_type.clone());
636 Column {
637 id: ColumnId(idx as u64),
638 name: col.name.clone(),
639 constraint,
640 properties: vec![],
641 index: ColumnIndex(idx as u8),
642 auto_increment: false,
643 dictionary_id: None,
644 }
645 })
646 .collect()
647}