1use std::{ops::Deref, sync::Arc, time::Duration};
5
6use reifydb_auth::service::AuthEngine;
7use reifydb_catalog::{
8 catalog::Catalog,
9 materialized::MaterializedCatalog,
10 schema::RowSchemaRegistry,
11 vtable::{
12 system::flow_operator_store::{SystemFlowOperatorEventListener, SystemFlowOperatorStore},
13 tables::UserVTableDataFunction,
14 user::{UserVTable, UserVTableColumn, registry::UserVTableEntry},
15 },
16};
17use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
18use reifydb_core::{
19 common::CommitVersion,
20 error::diagnostic::catalog::namespace_not_found,
21 event::{Event, EventBus},
22 interface::{
23 WithEventBus,
24 catalog::{
25 column::{Column, ColumnIndex},
26 id::ColumnId,
27 vtable::{VTable, VTableId},
28 },
29 },
30 util::ioc::IocContainer,
31};
32use reifydb_extension::transform::registry::Transforms;
33use reifydb_metric::metric::MetricReader;
34use reifydb_routine::{function::registry::Functions, procedure::registry::Procedures};
35use reifydb_runtime::{actor::system::ActorSystem, context::RuntimeContext};
36use reifydb_store_single::SingleStore;
37use reifydb_transaction::{
38 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
39 multi::transaction::MultiTransaction,
40 single::SingleTransaction,
41 transaction::{
42 admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
43 subscription::SubscriptionTransaction,
44 },
45};
46use reifydb_type::{
47 error::Error,
48 fragment::Fragment,
49 params::Params,
50 value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId},
51};
52use tracing::instrument;
53
54#[cfg(not(target_arch = "wasm32"))]
55use crate::remote::RemoteRegistry;
56use crate::{
57 Result,
58 bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
59 interceptor::catalog::MaterializedCatalogInterceptor,
60 vm::{Admin, Command, Query, Subscription, executor::Executor},
61};
62
/// Handle to the engine's shared state.
///
/// The handle is a newtype around `Arc<Inner>`: every clone of a
/// `StandardEngine` points at the same [`Inner`] value.
pub struct StandardEngine(Arc<Inner>);
64
65impl WithEventBus for StandardEngine {
66 fn event_bus(&self) -> &EventBus {
67 &self.event_bus
68 }
69}
70
71impl AuthEngine for StandardEngine {
72 fn begin_admin(&self) -> Result<AdminTransaction> {
73 StandardEngine::begin_admin(self, IdentityId::system())
74 }
75
76 fn begin_query(&self) -> Result<QueryTransaction> {
77 StandardEngine::begin_query(self, IdentityId::system())
78 }
79
80 fn catalog(&self) -> Catalog {
81 StandardEngine::catalog(self)
82 }
83}
84
85impl StandardEngine {
87 #[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
88 pub fn begin_command(&self, identity: IdentityId) -> Result<CommandTransaction> {
89 let interceptors = self.interceptors.create();
90 let mut txn = CommandTransaction::new(
91 self.multi.clone(),
92 self.single.clone(),
93 self.event_bus.clone(),
94 interceptors,
95 identity,
96 )?;
97 txn.set_executor(Arc::new(self.executor.clone()));
98 Ok(txn)
99 }
100
101 #[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
102 pub fn begin_admin(&self, identity: IdentityId) -> Result<AdminTransaction> {
103 let interceptors = self.interceptors.create();
104 let mut txn = AdminTransaction::new(
105 self.multi.clone(),
106 self.single.clone(),
107 self.event_bus.clone(),
108 interceptors,
109 identity,
110 )?;
111 txn.set_executor(Arc::new(self.executor.clone()));
112 Ok(txn)
113 }
114
115 #[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
116 pub fn begin_query(&self, identity: IdentityId) -> Result<QueryTransaction> {
117 let mut txn = QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), identity);
118 txn.set_executor(Arc::new(self.executor.clone()));
119 Ok(txn)
120 }
121
122 #[instrument(name = "engine::transaction::begin_subscription", level = "debug", skip(self))]
123 pub fn begin_subscription(&self, identity: IdentityId) -> Result<SubscriptionTransaction> {
124 let interceptors = self.interceptors.create();
125 let mut txn = SubscriptionTransaction::new(
126 self.multi.clone(),
127 self.single.clone(),
128 self.event_bus.clone(),
129 interceptors,
130 identity,
131 )?;
132 txn.set_executor(Arc::new(self.executor.clone()));
133 Ok(txn)
134 }
135
136 #[instrument(name = "engine::admin", level = "debug", skip(self, params), fields(rql = %rql))]
137 pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
138 (|| {
139 let mut txn = self.begin_admin(identity)?;
140 let frames = self.executor.admin(
141 &mut txn,
142 Admin {
143 rql,
144 params,
145 },
146 )?;
147 txn.commit()?;
148 Ok(frames)
149 })()
150 .map_err(|mut err: Error| {
151 err.with_statement(rql.to_string());
152 err
153 })
154 }
155
156 #[instrument(name = "engine::command", level = "debug", skip(self, params), fields(rql = %rql))]
157 pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
158 (|| {
159 let mut txn = self.begin_command(identity)?;
160 let frames = self.executor.command(
161 &mut txn,
162 Command {
163 rql,
164 params,
165 },
166 )?;
167 txn.commit()?;
168 Ok(frames)
169 })()
170 .map_err(|mut err: Error| {
171 err.with_statement(rql.to_string());
172 err
173 })
174 }
175
176 #[instrument(name = "engine::query", level = "debug", skip(self, params), fields(rql = %rql))]
177 pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
178 (|| {
179 let mut txn = self.begin_query(identity)?;
180 self.executor.query(
181 &mut txn,
182 Query {
183 rql,
184 params,
185 },
186 )
187 })()
188 .map_err(|mut err: Error| {
189 err.with_statement(rql.to_string());
190 err
191 })
192 }
193
194 #[instrument(name = "engine::subscription", level = "debug", skip(self, params), fields(rql = %rql))]
195 pub fn subscription_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
196 (|| {
197 let mut txn = self.begin_subscription(identity)?;
198 let frames = self.executor.subscription(
199 &mut txn,
200 Subscription {
201 rql,
202 params,
203 },
204 )?;
205 txn.commit()?;
206 Ok(frames)
207 })()
208 .map_err(|mut err: Error| {
209 err.with_statement(rql.to_string());
210 err
211 })
212 }
213
214 #[instrument(name = "engine::procedure", level = "debug", skip(self, params), fields(name = %name))]
216 pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> Result<Vec<Frame>> {
217 let mut txn = self.begin_command(identity)?;
218 let frames = self.executor.call_procedure(&mut txn, name, ¶ms)?;
219 txn.commit()?;
220 Ok(frames)
221 }
222
223 pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
260 let catalog = self.materialized_catalog();
261
262 let ns_def = catalog
264 .find_namespace_by_name(namespace)
265 .ok_or_else(|| Error(namespace_not_found(Fragment::None, namespace)))?;
266
267 let table_id = self.executor.virtual_table_registry.allocate_id();
269 let table_columns = table.definition();
271 let columns = convert_vtable_user_columns_to_columns(&table_columns);
272
273 let def = Arc::new(VTable {
275 id: table_id,
276 namespace: ns_def.id(),
277 name: name.to_string(),
278 columns,
279 });
280
281 catalog.register_vtable_user(def.clone())?;
283 let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
285 let entry = UserVTableEntry {
287 def: def.clone(),
288 data_fn,
289 };
290 self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
291 Ok(table_id)
292 }
293}
294
295impl CdcHost for StandardEngine {
296 fn begin_command(&self) -> Result<CommandTransaction> {
297 StandardEngine::begin_command(self, IdentityId::system())
298 }
299
300 fn begin_query(&self) -> Result<QueryTransaction> {
301 StandardEngine::begin_query(self, IdentityId::system())
302 }
303
304 fn current_version(&self) -> Result<CommitVersion> {
305 StandardEngine::current_version(self)
306 }
307
308 fn done_until(&self) -> CommitVersion {
309 StandardEngine::done_until(self)
310 }
311
312 fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
313 StandardEngine::wait_for_mark_timeout(self, version, timeout)
314 }
315
316 fn row_schema_registry(&self) -> &RowSchemaRegistry {
317 &self.catalog.schema
318 }
319}
320
321impl Clone for StandardEngine {
322 fn clone(&self) -> Self {
323 Self(self.0.clone())
324 }
325}
326
327impl Deref for StandardEngine {
328 type Target = Inner;
329
330 fn deref(&self) -> &Self::Target {
331 &self.0
332 }
333}
334
/// State shared by every clone of a `StandardEngine`.
///
/// All fields are private to this module; they are populated by
/// `StandardEngine::new`.
pub struct Inner {
    /// Multi-version transaction handle; cloned into command, admin and
    /// subscription transactions and used to begin query transactions.
    multi: MultiTransaction,
    /// Single transaction handle; cloned into every transaction the engine opens.
    single: SingleTransaction,
    /// Event bus handed to write transactions and used by `emit`.
    event_bus: EventBus,
    /// Executor that runs statements; a clone is attached to every transaction.
    executor: Executor,
    /// Factory that produces the interceptor set for each write transaction.
    interceptors: Arc<InterceptorFactory>,
    /// Catalog handle; also passed (cloned) to the executor at construction.
    catalog: Catalog,
    /// Flow operator store; clones are also held by the event listener
    /// registered in `new` and by the executor.
    flow_operator_store: SystemFlowOperatorStore,
}
344
345impl StandardEngine {
346 pub fn new(
347 multi: MultiTransaction,
348 single: SingleTransaction,
349 event_bus: EventBus,
350 interceptors: InterceptorFactory,
351 catalog: Catalog,
352 runtime_context: RuntimeContext,
353 functions: Functions,
354 procedures: Procedures,
355 transforms: Transforms,
356 ioc: IocContainer,
357 #[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
358 ) -> Self {
359 let flow_operator_store = SystemFlowOperatorStore::new();
360 let listener = SystemFlowOperatorEventListener::new(flow_operator_store.clone());
361 event_bus.register(listener);
362
363 let metrics_store = ioc
365 .resolve::<SingleStore>()
366 .expect("SingleStore must be registered in IocContainer for metrics");
367 let stats_reader = MetricReader::new(metrics_store);
368
369 let materialized = catalog.materialized.clone();
371 interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
372 interceptors
373 .post_commit
374 .add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
375 }));
376
377 let interceptors = Arc::new(interceptors);
378
379 Self(Arc::new(Inner {
380 multi,
381 single,
382 event_bus,
383 executor: Executor::new(
384 catalog.clone(),
385 runtime_context,
386 functions,
387 procedures,
388 transforms,
389 flow_operator_store.clone(),
390 stats_reader,
391 ioc,
392 #[cfg(not(target_arch = "wasm32"))]
393 remote_registry,
394 ),
395 interceptors,
396 catalog,
397 flow_operator_store,
398 }))
399 }
400
401 pub fn create_interceptors(&self) -> Interceptors {
403 self.interceptors.create()
404 }
405
406 pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
411 self.interceptors.add_late(factory);
412 }
413
414 #[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
419 ))]
420 pub fn begin_query_at_version(&self, version: CommitVersion, identity: IdentityId) -> Result<QueryTransaction> {
421 let mut txn = QueryTransaction::new(
422 self.multi.begin_query_at_version(version)?,
423 self.single.clone(),
424 identity,
425 );
426 txn.set_executor(Arc::new(self.executor.clone()));
427 Ok(txn)
428 }
429
430 #[inline]
431 pub fn multi(&self) -> &MultiTransaction {
432 &self.multi
433 }
434
435 #[inline]
436 pub fn multi_owned(&self) -> MultiTransaction {
437 self.multi.clone()
438 }
439
440 #[inline]
442 pub fn actor_system(&self) -> ActorSystem {
443 self.multi.actor_system()
444 }
445
446 #[inline]
447 pub fn single(&self) -> &SingleTransaction {
448 &self.single
449 }
450
451 #[inline]
452 pub fn single_owned(&self) -> SingleTransaction {
453 self.single.clone()
454 }
455
456 #[inline]
457 pub fn emit<E: Event>(&self, event: E) {
458 self.event_bus.emit(event)
459 }
460
461 #[inline]
462 pub fn materialized_catalog(&self) -> &MaterializedCatalog {
463 &self.catalog.materialized
464 }
465
466 #[inline]
470 pub fn catalog(&self) -> Catalog {
471 self.catalog.clone()
472 }
473
474 #[inline]
475 pub fn flow_operator_store(&self) -> &SystemFlowOperatorStore {
476 &self.flow_operator_store
477 }
478
479 #[inline]
481 pub fn current_version(&self) -> Result<CommitVersion> {
482 self.multi.current_version()
483 }
484
485 #[inline]
489 pub fn done_until(&self) -> CommitVersion {
490 self.multi.done_until()
491 }
492
493 #[inline]
496 pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
497 self.multi.wait_for_mark_timeout(version, timeout)
498 }
499
500 #[inline]
501 pub fn executor(&self) -> Executor {
502 self.executor.clone()
503 }
504
505 #[inline]
510 pub fn cdc_store(&self) -> CdcStore {
511 self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
512 }
513
514 pub fn shutdown(&self) {
515 self.interceptors.clear_late();
516 self.executor.ioc.clear();
517 }
518
519 pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
537 BulkInsertBuilder::new(self, identity)
538 }
539
540 pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
550 BulkInsertBuilder::new_trusted(self, identity)
551 }
552}
553
554fn convert_vtable_user_columns_to_columns(columns: &[UserVTableColumn]) -> Vec<Column> {
556 columns.iter()
557 .enumerate()
558 .map(|(idx, col)| {
559 let constraint = TypeConstraint::unconstrained(col.data_type.clone());
562 Column {
563 id: ColumnId(idx as u64),
564 name: col.name.clone(),
565 constraint,
566 properties: vec![],
567 index: ColumnIndex(idx as u8),
568 auto_increment: false,
569 dictionary_id: None,
570 }
571 })
572 .collect()
573}