1use std::{
5 ops::Deref,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, Ordering},
9 },
10 time::Duration,
11};
12
13use reifydb_auth::service::AuthEngine;
14use reifydb_catalog::{
15 catalog::Catalog,
16 interceptor::CatalogCacheInterceptor,
17 vtable::{
18 system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
19 tables::UserVTableDataFunction,
20 user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
21 },
22};
23use reifydb_cdc::{
24 consume::{host::CdcHost, watermark::CdcConsumerWatermark},
25 produce::watermark::CdcProducerWatermark,
26 storage::CdcStore,
27};
28use reifydb_core::{
29 common::CommitVersion,
30 error::diagnostic::engine::read_only_rejection,
31 event::{Event, EventBus},
32 execution::ExecutionResult,
33 interface::{
34 WithEventBus,
35 catalog::{
36 column::{Column, ColumnIndex},
37 id::{ColumnId, NamespaceId},
38 vtable::{VTable, VTableId},
39 },
40 },
41 metric::ExecutionMetrics,
42 util::ioc::IocContainer,
43};
44use reifydb_metric::storage::metric::MetricReader;
45use reifydb_runtime::{
46 actor::{mailbox::ActorRef, system::ActorSystem},
47 context::{clock::Clock, rng::Rng},
48};
49use reifydb_store_single::SingleStore;
50use reifydb_transaction::{
51 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
52 multi::{lease::VersionLeaseGuard, transaction::MultiTransaction},
53 single::SingleTransaction,
54 transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
55};
56use reifydb_value::{
57 error::Error,
58 fragment::Fragment,
59 params::Params,
60 value::{constraint::TypeConstraint, identity::IdentityId},
61};
62use tracing::instrument;
63
64use crate::{
65 Result,
66 bulk_insert::builder::{BulkInsertBuilder, Unchecked, Validated},
67 vm::{
68 Admin, Command, Query, Subscription,
69 executor::Executor,
70 services::{EngineConfig, Services},
71 },
72};
73
/// Engine handle wrapping the shared [`Inner`] state in an [`Arc`].
///
/// Cloning the handle clones the `Arc`, and the handle dereferences to
/// [`Inner`], so the engine's fields are read directly through `self`.
pub struct StandardEngine(Arc<Inner>);
75
76impl WithEventBus for StandardEngine {
77 fn event_bus(&self) -> &EventBus {
78 &self.event_bus
79 }
80}
81
82impl AuthEngine for StandardEngine {
83 fn begin_admin(&self) -> Result<AdminTransaction> {
84 StandardEngine::begin_admin(self, IdentityId::system())
85 }
86
87 fn begin_query(&self) -> Result<QueryTransaction> {
88 StandardEngine::begin_query(self, IdentityId::system())
89 }
90
91 fn catalog(&self) -> Catalog {
92 StandardEngine::catalog(self)
93 }
94}
95
96impl StandardEngine {
97 #[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
98 pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
99 #[cfg(reifydb_assertions)]
100 {
101 assert!(
102 !self.is_read_only(),
103 "begin_command called on a read-only engine: writes are permanently disabled after set_read_only(), so any caller reaching this point has bypassed the reject_if_read_only guard (identity={:?})",
104 identity
105 );
106 }
107 let interceptors = self.interceptors.create();
108 let mut txn = CommandTransaction::new(
109 self.multi.clone(),
110 self.single.clone(),
111 self.event_bus.clone(),
112 interceptors,
113 identity,
114 self.executor.runtime_context.clock.clone(),
115 )?;
116 txn.set_executor(Arc::new(self.executor.clone()));
117 Ok(txn)
118 }
119
120 #[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
121 pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
122 let interceptors = self.interceptors.create();
123 let mut txn = AdminTransaction::new(
124 self.multi.clone(),
125 self.single.clone(),
126 self.event_bus.clone(),
127 interceptors,
128 identity,
129 self.executor.runtime_context.clock.clone(),
130 )?;
131 txn.set_executor(Arc::new(self.executor.clone()));
132 Ok(txn)
133 }
134
135 #[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
136 pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
137 let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
138 txn.set_executor(Arc::new(self.executor.clone()));
139 Ok(txn)
140 }
141
142 pub fn clock(&self) -> &Clock {
143 &self.executor.runtime_context.clock
144 }
145
146 pub fn rng(&self) -> &Rng {
147 &self.executor.runtime_context.rng
148 }
149
150 #[instrument(name = "engine::admin_as", level = "debug", skip(self, params), fields(rql = %rql))]
151 pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
152 if let Err(e) = self.reject_if_read_only() {
153 return ExecutionResult {
154 frames: vec![],
155 error: Some(e),
156 metrics: ExecutionMetrics::default(),
157 };
158 }
159 let mut txn = match self.begin_admin(identity) {
160 Ok(t) => t,
161 Err(mut e) => {
162 e.with_rql(rql.to_string());
163 return ExecutionResult {
164 frames: vec![],
165 error: Some(e),
166 metrics: ExecutionMetrics::default(),
167 };
168 }
169 };
170 let mut outcome = self.executor.admin(
171 &mut txn,
172 Admin {
173 rql,
174 params,
175 },
176 );
177 if outcome.is_ok()
178 && let Err(mut e) = txn.commit()
179 {
180 e.with_rql(rql.to_string());
181 outcome.error = Some(e);
182 }
183 if let Some(ref mut e) = outcome.error {
184 e.with_rql(rql.to_string());
185 }
186 outcome
187 }
188
189 #[instrument(name = "engine::command_as", level = "debug", skip(self, params), fields(rql = %rql))]
190 pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
191 if let Err(e) = self.reject_if_read_only() {
192 return ExecutionResult {
193 frames: vec![],
194 error: Some(e),
195 metrics: ExecutionMetrics::default(),
196 };
197 }
198 let mut txn = match self.begin_command(identity) {
199 Ok(t) => t,
200 Err(mut e) => {
201 e.with_rql(rql.to_string());
202 return ExecutionResult {
203 frames: vec![],
204 error: Some(e),
205 metrics: ExecutionMetrics::default(),
206 };
207 }
208 };
209 let mut outcome = self.executor.command(
210 &mut txn,
211 Command {
212 rql,
213 params,
214 },
215 );
216 if outcome.is_ok()
217 && let Err(mut e) = txn.commit()
218 {
219 e.with_rql(rql.to_string());
220 outcome.error = Some(e);
221 }
222 if let Some(ref mut e) = outcome.error {
223 e.with_rql(rql.to_string());
224 }
225 outcome
226 }
227
228 #[instrument(name = "engine::query_as", level = "debug", skip(self, params), fields(rql = %rql))]
229 pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
230 let mut txn = match self.begin_query(identity) {
231 Ok(t) => t,
232 Err(mut e) => {
233 e.with_rql(rql.to_string());
234 return ExecutionResult {
235 frames: vec![],
236 error: Some(e),
237 metrics: ExecutionMetrics::default(),
238 };
239 }
240 };
241 let mut outcome = self.executor.query(
242 &mut txn,
243 Query {
244 rql,
245 params,
246 },
247 );
248 if let Some(ref mut e) = outcome.error {
249 e.with_rql(rql.to_string());
250 }
251 outcome
252 }
253
254 #[instrument(name = "engine::query_as_at_version", level = "debug", skip(self, params, lease), fields(rql = %rql, version = %lease.version().0))]
255 pub fn query_as_at_version(
256 &self,
257 identity: IdentityId,
258 rql: &str,
259 params: Params,
260 lease: &VersionLeaseGuard,
261 ) -> ExecutionResult {
262 let mut txn = match self.begin_query_at_version(lease, identity) {
263 Ok(t) => t,
264 Err(mut e) => {
265 e.with_rql(rql.to_string());
266 return ExecutionResult {
267 frames: vec![],
268 error: Some(e),
269 metrics: ExecutionMetrics::default(),
270 };
271 }
272 };
273 let mut outcome = self.executor.query(
274 &mut txn,
275 Query {
276 rql,
277 params,
278 },
279 );
280 if let Some(ref mut e) = outcome.error {
281 e.with_rql(rql.to_string());
282 }
283 outcome
284 }
285
286 #[instrument(name = "engine::query_in_txn", level = "debug", skip(self, txn, params), fields(rql = %rql))]
287 pub fn query_in_txn(&self, txn: &mut QueryTransaction, rql: &str, params: Params) -> ExecutionResult {
288 let mut outcome = self.executor.query(
289 txn,
290 Query {
291 rql,
292 params,
293 },
294 );
295 if let Some(ref mut e) = outcome.error {
296 e.with_rql(rql.to_string());
297 }
298 outcome
299 }
300
301 #[instrument(name = "engine::subscribe_as", level = "debug", skip(self, params), fields(rql = %rql))]
302 pub fn subscribe_as(&self, identity: IdentityId, rql: &str, params: Params) -> ExecutionResult {
303 let mut txn = match self.begin_query(identity) {
304 Ok(t) => t,
305 Err(mut e) => {
306 e.with_rql(rql.to_string());
307 return ExecutionResult {
308 frames: vec![],
309 error: Some(e),
310 metrics: ExecutionMetrics::default(),
311 };
312 }
313 };
314 let mut outcome = self.executor.subscription(
315 &mut txn,
316 Subscription {
317 rql,
318 params,
319 },
320 );
321 if let Some(ref mut e) = outcome.error {
322 e.with_rql(rql.to_string());
323 }
324 outcome
325 }
326
327 #[instrument(name = "engine::procedure_as", level = "debug", skip(self, params), fields(name = %name))]
328 pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> ExecutionResult {
329 if let Err(e) = self.reject_if_read_only() {
330 return ExecutionResult {
331 frames: vec![],
332 error: Some(e),
333 metrics: ExecutionMetrics::default(),
334 };
335 }
336 let mut txn = match self.begin_command(identity) {
337 Ok(t) => t,
338 Err(e) => {
339 return ExecutionResult {
340 frames: vec![],
341 error: Some(e),
342 metrics: ExecutionMetrics::default(),
343 };
344 }
345 };
346 let mut outcome = self.executor.call_procedure(&mut txn, name, ¶ms);
347 if outcome.is_ok()
348 && let Err(e) = txn.commit()
349 {
350 outcome.error = Some(e);
351 }
352 outcome
353 }
354
355 pub fn register_virtual_table<T: UserVTable>(
356 &self,
357 namespace_id: NamespaceId,
358 name: &str,
359 table: T,
360 ) -> Result<VTableId> {
361 let catalog = self.catalog();
362 let table_id = self.executor.virtual_table_registry.allocate_id();
363
364 let table_columns = table.vtable();
365 let columns = convert_vtable_user_columns_to_columns(&table_columns);
366
367 let def = Arc::new(VTable {
368 id: table_id,
369 namespace: namespace_id,
370 name: name.to_string(),
371 columns,
372 });
373
374 catalog.register_vtable_user(def.clone())?;
375
376 let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
377
378 let entry = UserVTableEntry {
379 def: def.clone(),
380 data_fn,
381 };
382 self.executor.virtual_table_registry.register(namespace_id, name.to_string(), entry);
383 Ok(table_id)
384 }
385}
386
387impl CdcHost for StandardEngine {
388 fn begin_command(&self) -> Result<CommandTransaction> {
389 StandardEngine::begin_command(self, IdentityId::system())
390 }
391
392 fn begin_query(&self) -> Result<QueryTransaction> {
393 StandardEngine::begin_query(self, IdentityId::system())
394 }
395
396 fn current_version(&self) -> Result<CommitVersion> {
397 StandardEngine::current_version(self)
398 }
399
400 fn done_until(&self) -> CommitVersion {
401 StandardEngine::done_until(self)
402 }
403
404 fn cdc_producer_watermark(&self) -> CommitVersion {
405 StandardEngine::cdc_producer_watermark(self)
406 }
407
408 fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
409 StandardEngine::wait_for_mark_timeout(self, version, timeout)
410 }
411
412 fn catalog(&self) -> &Catalog {
413 &self.catalog
414 }
415}
416
417impl Clone for StandardEngine {
418 fn clone(&self) -> Self {
419 Self(self.0.clone())
420 }
421}
422
423impl Deref for StandardEngine {
424 type Target = Inner;
425
426 fn deref(&self) -> &Self::Target {
427 &self.0
428 }
429}
430
/// Shared state behind a [`StandardEngine`] handle.
pub struct Inner {
    // Multi-version transaction handle; cloned into command/admin transactions and
    // used to begin queries, acquire leases and read version information.
    multi: MultiTransaction,
    // Single transaction handle; cloned into every transaction the engine opens.
    single: SingleTransaction,
    // Event bus used by `emit` and cloned into command/admin transactions.
    event_bus: EventBus,
    // Executor that runs admin, command, query, subscription and procedure calls.
    executor: Executor,
    // Factory producing the interceptor set for each command/admin transaction.
    interceptors: Arc<InterceptorFactory>,
    // Catalog handle, also passed to the executor at construction time.
    catalog: Catalog,
    // Flow operator store shared with the event listener registered in `new`.
    flow_operator_store: SystemFlowOperatorStore,
    // Read-only flag; set by `set_read_only`, read by `is_read_only`.
    read_only: AtomicBool,
}
441
442impl StandardEngine {
443 pub fn new(
444 multi: MultiTransaction,
445 single: SingleTransaction,
446 event_bus: EventBus,
447 interceptors: InterceptorFactory,
448 catalog: Catalog,
449 config: EngineConfig,
450 ) -> Self {
451 let flow_operator_store = SystemFlowOperatorStore::new();
452 let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
453 event_bus.register(listener);
454
455 let metrics_store = config
456 .ioc
457 .resolve::<SingleStore>()
458 .expect("SingleStore must be registered in IocContainer for metrics");
459 let stats_reader = MetricReader::new(metrics_store);
460
461 let catalog_for_interceptor = catalog.clone();
462 interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
463 interceptors.post_commit.add(Arc::new(CatalogCacheInterceptor::new(&catalog_for_interceptor)));
464 }));
465
466 let interceptors = Arc::new(interceptors);
467
468 Self(Arc::new(Inner {
469 multi,
470 single,
471 event_bus,
472 executor: Executor::new(catalog.clone(), config, flow_operator_store.clone(), stats_reader),
473 interceptors,
474 catalog,
475 flow_operator_store,
476 read_only: AtomicBool::new(false),
477 }))
478 }
479
480 pub fn create_interceptors(&self) -> Interceptors {
481 self.interceptors.create()
482 }
483
484 pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
485 self.interceptors.add_late(factory);
486 }
487
488 #[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self, lease), fields(version = %lease.version().0
489 ))]
490 pub fn begin_query_at_version(
491 &self,
492 lease: &VersionLeaseGuard,
493 identity: IdentityId,
494 ) -> Result<QueryTransaction> {
495 let mut txn =
496 QueryTransaction::new(self.multi.begin_query_at_version(lease)?, self.single.clone(), identity);
497 txn.set_executor(Arc::new(self.executor.clone()));
498 Ok(txn)
499 }
500
501 #[instrument(name = "engine::acquire_version_lease", level = "debug", skip(self), fields(version = %version.0))]
502 pub fn acquire_version_lease(&self, version: CommitVersion) -> Result<VersionLeaseGuard> {
503 self.multi.acquire_version_lease(version)
504 }
505
506 #[instrument(name = "engine::acquire_current_snapshot_lease", level = "debug", skip(self))]
507 pub fn acquire_current_snapshot_lease(&self) -> Result<(CommitVersion, VersionLeaseGuard)> {
508 self.multi.acquire_current_snapshot_lease()
509 }
510
511 #[inline]
512 pub fn multi(&self) -> &MultiTransaction {
513 &self.multi
514 }
515
516 #[inline]
517 pub fn multi_owned(&self) -> MultiTransaction {
518 self.multi.clone()
519 }
520
521 #[inline]
522 pub fn actor_system(&self) -> ActorSystem {
523 self.multi.actor_system()
524 }
525
526 #[inline]
527 pub fn single(&self) -> &SingleTransaction {
528 &self.single
529 }
530
531 #[inline]
532 pub fn single_owned(&self) -> SingleTransaction {
533 self.single.clone()
534 }
535
536 #[inline]
537 pub fn emit<E: Event>(&self, event: E) {
538 self.event_bus.emit(event)
539 }
540
541 #[inline]
542 pub fn catalog(&self) -> Catalog {
543 self.catalog.clone()
544 }
545
546 #[inline]
547 pub fn services(&self) -> Arc<Services> {
548 self.executor.services().clone()
549 }
550
551 #[inline]
552 pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
553 &self.flow_operator_store
554 }
555
556 #[inline]
557 pub fn current_version(&self) -> Result<CommitVersion> {
558 self.multi.current_version()
559 }
560
561 #[inline]
562 pub fn done_until(&self) -> CommitVersion {
563 self.multi.done_until()
564 }
565
566 #[inline]
567 pub fn query_done_until(&self) -> CommitVersion {
568 self.multi.query_done_until()
569 }
570
571 #[inline]
572 pub fn oracle_window_count(&self) -> usize {
573 self.multi.oracle_window_count()
574 }
575
576 #[inline]
577 pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
578 self.multi.wait_for_mark_timeout(version, timeout)
579 }
580
581 #[inline]
582 pub fn executor(&self) -> Executor {
583 self.executor.clone()
584 }
585
586 #[inline]
587 pub fn ioc(&self) -> &IocContainer {
588 &self.executor.ioc
589 }
590
591 #[inline]
592 pub fn cdc_store(&self) -> CdcStore {
593 self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
594 }
595
596 #[inline]
597 pub fn actor<M: 'static>(&self) -> Option<ActorRef<M>>
598 where
599 ActorRef<M>: Send + Sync,
600 {
601 self.executor.ioc.try_resolve::<ActorRef<M>>()
602 }
603
604 #[inline]
605 pub fn cdc_producer_watermark(&self) -> CommitVersion {
606 self.executor.ioc.try_resolve::<CdcProducerWatermark>().map(|w| w.get()).unwrap_or(CommitVersion(0))
607 }
608
609 #[inline]
610 pub fn cdc_consumer_watermark(&self) -> CommitVersion {
611 self.executor.ioc.try_resolve::<CdcConsumerWatermark>().map(|w| w.get()).unwrap_or(CommitVersion(0))
612 }
613
614 pub fn set_read_only(&self) {
615 self.read_only.store(true, Ordering::SeqCst);
616 }
617
618 pub fn is_read_only(&self) -> bool {
619 self.read_only.load(Ordering::SeqCst)
620 }
621
622 pub(crate) fn reject_if_read_only(&self) -> Result<()> {
623 if self.is_read_only() {
624 return Err(Error(Box::new(read_only_rejection(Fragment::None))));
625 }
626 Ok(())
627 }
628
629 pub fn shutdown(&self) {
630 self.interceptors.clear_late();
631 self.executor.ioc.clear();
632 }
633
    /// Starts a bulk insert for `identity`, returning a builder in the `Validated`
    /// state that borrows this engine for `'e`.
    pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
        BulkInsertBuilder::new(self, identity)
    }
637
    /// Starts a bulk insert for `identity`, returning a builder in the `Unchecked`
    /// state that borrows this engine for `'e`.
    // NOTE(review): presumably `Unchecked` skips validation that `Validated` performs — confirm in the builder module.
    pub fn bulk_insert_unchecked<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Unchecked> {
        BulkInsertBuilder::new_unchecked(self, identity)
    }
641}
642
643fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
644 columns.iter()
645 .enumerate()
646 .map(|(idx, col)| {
647 let constraint = TypeConstraint::unconstrained(col.data_type.clone());
648 Column {
649 id: ColumnId(idx as u64),
650 name: col.name.clone(),
651 constraint,
652 properties: vec![],
653 index: ColumnIndex(idx as u8),
654 auto_increment: false,
655 dictionary_id: None,
656 }
657 })
658 .collect()
659}