1use std::{
5 ops::Deref,
6 sync::{
7 Arc,
8 atomic::{AtomicBool, Ordering},
9 },
10 time::Duration,
11};
12
13use reifydb_auth::service::AuthEngine;
14use reifydb_catalog::{
15 catalog::Catalog,
16 materialized::MaterializedCatalog,
17 vtable::{
18 system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
19 tables::UserVTableDataFunction,
20 user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
21 },
22};
23use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
24use reifydb_core::{
25 common::CommitVersion,
26 error::diagnostic::{catalog::namespace_not_found, engine::read_only_rejection},
27 event::{Event, EventBus},
28 interface::{
29 WithEventBus,
30 catalog::{
31 column::{Column, ColumnIndex},
32 id::ColumnId,
33 vtable::{VTable, VTableId},
34 },
35 },
36};
37use reifydb_metric_old::metric::MetricReader;
38use reifydb_runtime::{actor::system::ActorSystem, context::clock::Clock};
39use reifydb_store_single::SingleStore;
40use reifydb_transaction::{
41 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
42 multi::transaction::MultiTransaction,
43 single::SingleTransaction,
44 transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
45};
46use reifydb_type::{
47 error::Error,
48 fragment::Fragment,
49 params::Params,
50 value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId},
51};
52use tracing::instrument;
53
54use crate::{
55 Result,
56 bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
57 interceptor::catalog::MaterializedCatalogInterceptor,
58 vm::{Admin, Command, Query, Subscription, executor::Executor, services::EngineConfig},
59};
60
61pub struct StandardEngine(Arc<Inner>);
62
63impl WithEventBus for StandardEngine {
64 fn event_bus(&self) -> &EventBus {
65 &self.event_bus
66 }
67}
68
69impl AuthEngine for StandardEngine {
70 fn begin_admin(&self) -> Result<AdminTransaction> {
71 StandardEngine::begin_admin(self, IdentityId::system())
72 }
73
74 fn begin_query(&self) -> Result<QueryTransaction> {
75 StandardEngine::begin_query(self, IdentityId::system())
76 }
77
78 fn catalog(&self) -> Catalog {
79 StandardEngine::catalog(self)
80 }
81}
82
83impl StandardEngine {
85 #[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
86 pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
87 let interceptors = self.interceptors.create();
88 let mut txn = CommandTransaction::new(
89 self.multi.clone(),
90 self.single.clone(),
91 self.event_bus.clone(),
92 interceptors,
93 identity,
94 self.executor.runtime_context.clock.clone(),
95 )?;
96 txn.set_executor(Arc::new(self.executor.clone()));
97 Ok(txn)
98 }
99
100 #[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
101 pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
102 let interceptors = self.interceptors.create();
103 let mut txn = AdminTransaction::new(
104 self.multi.clone(),
105 self.single.clone(),
106 self.event_bus.clone(),
107 interceptors,
108 identity,
109 self.executor.runtime_context.clock.clone(),
110 )?;
111 txn.set_executor(Arc::new(self.executor.clone()));
112 Ok(txn)
113 }
114
115 #[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
116 pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
117 let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
118 txn.set_executor(Arc::new(self.executor.clone()));
119 Ok(txn)
120 }
121
122 pub fn clock(&self) -> &Clock {
124 &self.executor.runtime_context.clock
125 }
126
127 #[instrument(name = "engine::admin_as", level = "debug", skip(self, params), fields(rql = %rql))]
128 pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
129 self.reject_if_read_only()?;
130 (|| {
131 let mut txn = self.begin_admin(identity)?;
132 let frames = self.executor.admin(
133 &mut txn,
134 Admin {
135 rql,
136 params,
137 },
138 )?;
139 txn.commit()?;
140 Ok(frames)
141 })()
142 .map_err(|mut err: Error| {
143 err.with_statement(rql.to_string());
144 err
145 })
146 }
147
148 #[instrument(name = "engine::command_as", level = "debug", skip(self, params), fields(rql = %rql))]
149 pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
150 self.reject_if_read_only()?;
151 (|| {
152 let mut txn = self.begin_command(identity)?;
153 let frames = self.executor.command(
154 &mut txn,
155 Command {
156 rql,
157 params,
158 },
159 )?;
160 txn.commit()?;
161 Ok(frames)
162 })()
163 .map_err(|mut err: Error| {
164 err.with_statement(rql.to_string());
165 err
166 })
167 }
168
169 #[instrument(name = "engine::query_as", level = "debug", skip(self, params), fields(rql = %rql))]
170 pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
171 (|| {
172 let mut txn = self.begin_query(identity)?;
173 self.executor.query(
174 &mut txn,
175 Query {
176 rql,
177 params,
178 },
179 )
180 })()
181 .map_err(|mut err: Error| {
182 err.with_statement(rql.to_string());
183 err
184 })
185 }
186
187 #[instrument(name = "engine::subscribe_as", level = "debug", skip(self, params), fields(rql = %rql))]
188 pub fn subscribe_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
189 (|| {
190 let mut txn = self.begin_query(identity)?;
191 let frames = self.executor.subscription(
192 &mut txn,
193 Subscription {
194 rql,
195 params,
196 },
197 )?;
198 Ok(frames)
199 })()
200 .map_err(|mut err: Error| {
201 err.with_statement(rql.to_string());
202 err
203 })
204 }
205
206 #[instrument(name = "engine::procedure_as", level = "debug", skip(self, params), fields(name = %name))]
208 pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> Result<Vec<Frame>> {
209 self.reject_if_read_only()?;
210 let mut txn = self.begin_command(identity)?;
211 let frames = self.executor.call_procedure(&mut txn, name, ¶ms)?;
212 txn.commit()?;
213 Ok(frames)
214 }
215
216 pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
253 let catalog = self.materialized_catalog();
254
255 let ns_def = catalog
257 .find_namespace_by_name(namespace)
258 .ok_or_else(|| Error(Box::new(namespace_not_found(Fragment::None, namespace))))?;
259
260 let table_id = self.executor.virtual_table_registry.allocate_id();
262 let table_columns = table.vtable();
264 let columns = convert_vtable_user_columns_to_columns(&table_columns);
265
266 let def = Arc::new(VTable {
268 id: table_id,
269 namespace: ns_def.id(),
270 name: name.to_string(),
271 columns,
272 });
273
274 catalog.register_vtable_user(def.clone())?;
276 let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
278 let entry = UserVTableEntry {
280 def: def.clone(),
281 data_fn,
282 };
283 self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
284 Ok(table_id)
285 }
286}
287
288impl CdcHost for StandardEngine {
289 fn begin_command(&self) -> Result<CommandTransaction> {
290 StandardEngine::begin_command(self, IdentityId::system())
291 }
292
293 fn begin_query(&self) -> Result<QueryTransaction> {
294 StandardEngine::begin_query(self, IdentityId::system())
295 }
296
297 fn current_version(&self) -> Result<CommitVersion> {
298 StandardEngine::current_version(self)
299 }
300
301 fn done_until(&self) -> CommitVersion {
302 StandardEngine::done_until(self)
303 }
304
305 fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
306 StandardEngine::wait_for_mark_timeout(self, version, timeout)
307 }
308
309 fn materialized_catalog(&self) -> &MaterializedCatalog {
310 &self.catalog.materialized
311 }
312}
313
314impl Clone for StandardEngine {
315 fn clone(&self) -> Self {
316 Self(self.0.clone())
317 }
318}
319
320impl Deref for StandardEngine {
321 type Target = Inner;
322
323 fn deref(&self) -> &Self::Target {
324 &self.0
325 }
326}
327
328pub struct Inner {
329 multi: MultiTransaction,
330 single: SingleTransaction,
331 event_bus: EventBus,
332 executor: Executor,
333 interceptors: Arc<InterceptorFactory>,
334 catalog: Catalog,
335 flow_operator_store: SystemFlowOperatorStore,
336 read_only: AtomicBool,
337}
338
339impl StandardEngine {
340 pub fn new(
341 multi: MultiTransaction,
342 single: SingleTransaction,
343 event_bus: EventBus,
344 interceptors: InterceptorFactory,
345 catalog: Catalog,
346 config: EngineConfig,
347 ) -> Self {
348 let flow_operator_store = SystemFlowOperatorStore::new();
349 let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
350 event_bus.register(listener);
351
352 let metrics_store = config
354 .ioc
355 .resolve::<SingleStore>()
356 .expect("SingleStore must be registered in IocContainer for metrics");
357 let stats_reader = MetricReader::new(metrics_store);
358
359 let materialized = catalog.materialized.clone();
361 interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
362 interceptors
363 .post_commit
364 .add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
365 }));
366
367 let interceptors = Arc::new(interceptors);
368
369 Self(Arc::new(Inner {
370 multi,
371 single,
372 event_bus,
373 executor: Executor::new(catalog.clone(), config, flow_operator_store.clone(), stats_reader),
374 interceptors,
375 catalog,
376 flow_operator_store,
377 read_only: AtomicBool::new(false),
378 }))
379 }
380
381 pub fn create_interceptors(&self) -> Interceptors {
383 self.interceptors.create()
384 }
385
386 pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
391 self.interceptors.add_late(factory);
392 }
393
394 #[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
399 ))]
400 pub fn begin_query_at_version(&self, version: CommitVersion, identity: IdentityId) -> Result<QueryTransaction> {
401 let mut txn = QueryTransaction::new(
402 self.multi.begin_query_at_version(version)?,
403 self.single.clone(),
404 identity,
405 );
406 txn.set_executor(Arc::new(self.executor.clone()));
407 Ok(txn)
408 }
409
410 #[inline]
411 pub fn multi(&self) -> &MultiTransaction {
412 &self.multi
413 }
414
415 #[inline]
416 pub fn multi_owned(&self) -> MultiTransaction {
417 self.multi.clone()
418 }
419
420 #[inline]
422 pub fn actor_system(&self) -> ActorSystem {
423 self.multi.actor_system()
424 }
425
426 #[inline]
427 pub fn single(&self) -> &SingleTransaction {
428 &self.single
429 }
430
431 #[inline]
432 pub fn single_owned(&self) -> SingleTransaction {
433 self.single.clone()
434 }
435
436 #[inline]
437 pub fn emit<E: Event>(&self, event: E) {
438 self.event_bus.emit(event)
439 }
440
441 #[inline]
442 pub fn materialized_catalog(&self) -> &MaterializedCatalog {
443 &self.catalog.materialized
444 }
445
446 #[inline]
450 pub fn catalog(&self) -> Catalog {
451 self.catalog.clone()
452 }
453
454 #[inline]
455 pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
456 &self.flow_operator_store
457 }
458
459 #[inline]
461 pub fn current_version(&self) -> Result<CommitVersion> {
462 self.multi.current_version()
463 }
464
465 #[inline]
469 pub fn done_until(&self) -> CommitVersion {
470 self.multi.done_until()
471 }
472
473 #[inline]
476 pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
477 self.multi.wait_for_mark_timeout(version, timeout)
478 }
479
480 #[inline]
481 pub fn executor(&self) -> Executor {
482 self.executor.clone()
483 }
484
485 #[inline]
490 pub fn cdc_store(&self) -> CdcStore {
491 self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
492 }
493
494 pub fn set_read_only(&self) {
497 self.read_only.store(true, Ordering::SeqCst);
498 }
499
500 pub fn is_read_only(&self) -> bool {
502 self.read_only.load(Ordering::SeqCst)
503 }
504
505 pub(crate) fn reject_if_read_only(&self) -> Result<()> {
506 if self.is_read_only() {
507 return Err(Error(Box::new(read_only_rejection(Fragment::None))));
508 }
509 Ok(())
510 }
511
512 pub fn shutdown(&self) {
513 self.interceptors.clear_late();
514 self.executor.ioc.clear();
515 }
516
517 pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
535 BulkInsertBuilder::new(self, identity)
536 }
537
538 pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
548 BulkInsertBuilder::new_trusted(self, identity)
549 }
550}
551
552fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
554 columns.iter()
555 .enumerate()
556 .map(|(idx, col)| {
557 let constraint = TypeConstraint::unconstrained(col.data_type.clone());
560 Column {
561 id: ColumnId(idx as u64),
562 name: col.name.clone(),
563 constraint,
564 properties: vec![],
565 index: ColumnIndex(idx as u8),
566 auto_increment: false,
567 dictionary_id: None,
568 }
569 })
570 .collect()
571}