1use std::{ops::Deref, sync::Arc, time::Duration};
5
6use reifydb_catalog::{
7 catalog::Catalog,
8 materialized::MaterializedCatalog,
9 schema::SchemaRegistry,
10 vtable::{
11 system::flow_operator_store::{FlowOperatorEventListener, FlowOperatorStore},
12 tables::UserVTableDataFunction,
13 user::{UserVTable, UserVTableColumnDef, registry::UserVTableEntry},
14 },
15};
16use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
17use reifydb_core::{
18 common::CommitVersion,
19 error::diagnostic::catalog::namespace_not_found,
20 event::{Event, EventBus},
21 interface::{
22 WithEventBus,
23 catalog::{
24 column::{ColumnDef, ColumnIndex},
25 id::ColumnId,
26 vtable::{VTableDef, VTableId},
27 },
28 },
29 util::ioc::IocContainer,
30};
31use reifydb_function::registry::Functions;
32use reifydb_metric::metric::MetricReader;
33use reifydb_runtime::{actor::system::ActorSystem, clock::Clock};
34use reifydb_store_single::SingleStore;
35use reifydb_transaction::{
36 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
37 multi::transaction::MultiTransaction,
38 single::SingleTransaction,
39 transaction::{
40 admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
41 subscription::SubscriptionTransaction,
42 },
43};
44use reifydb_type::{
45 error::Error,
46 fragment::Fragment,
47 params::Params,
48 value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId},
49};
50use tracing::instrument;
51
52#[cfg(not(target_arch = "wasm32"))]
53use crate::remote::RemoteRegistry;
54use crate::{
55 Result,
56 bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
57 interceptor::catalog::MaterializedCatalogInterceptor,
58 procedure::registry::Procedures,
59 transform::registry::Transforms,
60 vm::{Admin, Command, Query, Subscription, executor::Executor},
61};
62
/// Engine handle: a newtype around `Arc<Inner>`.
///
/// All engine state lives in [`Inner`]; this wrapper only holds the shared pointer to it.
pub struct StandardEngine(Arc<Inner>);
64
65impl WithEventBus for StandardEngine {
66 fn event_bus(&self) -> &EventBus {
67 &self.event_bus
68 }
69}
70
71impl StandardEngine {
73 #[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
74 pub fn begin_command(&self) -> Result<CommandTransaction> {
75 let interceptors = self.interceptors.create();
76 CommandTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
77 }
78
79 #[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
80 pub fn begin_admin(&self) -> Result<AdminTransaction> {
81 let interceptors = self.interceptors.create();
82 AdminTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
83 }
84
85 #[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
86 pub fn begin_query(&self) -> Result<QueryTransaction> {
87 Ok(QueryTransaction::new(self.multi.begin_query()?, self.single.clone()))
88 }
89
90 #[instrument(name = "engine::transaction::begin_subscription", level = "debug", skip(self))]
91 pub fn begin_subscription(&self) -> Result<SubscriptionTransaction> {
92 let interceptors = self.interceptors.create();
93 SubscriptionTransaction::new(
94 self.multi.clone(),
95 self.single.clone(),
96 self.event_bus.clone(),
97 interceptors,
98 )
99 }
100
101 #[instrument(name = "engine::admin", level = "debug", skip(self, params), fields(rql = %rql))]
102 pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
103 (|| {
104 let mut txn = self.begin_admin()?;
105 let frames = self.executor.admin(
106 &mut txn,
107 Admin {
108 rql,
109 params,
110 identity,
111 },
112 )?;
113 txn.commit()?;
114 Ok(frames)
115 })()
116 .map_err(|mut err: Error| {
117 err.with_statement(rql.to_string());
118 err
119 })
120 }
121
122 #[instrument(name = "engine::command", level = "debug", skip(self, params), fields(rql = %rql))]
123 pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
124 (|| {
125 let mut txn = self.begin_command()?;
126 let frames = self.executor.command(
127 &mut txn,
128 Command {
129 rql,
130 params,
131 identity,
132 },
133 )?;
134 txn.commit()?;
135 Ok(frames)
136 })()
137 .map_err(|mut err: Error| {
138 err.with_statement(rql.to_string());
139 err
140 })
141 }
142
143 #[instrument(name = "engine::query", level = "debug", skip(self, params), fields(rql = %rql))]
144 pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
145 (|| {
146 let mut txn = self.begin_query()?;
147 self.executor.query(
148 &mut txn,
149 Query {
150 rql,
151 params,
152 identity,
153 },
154 )
155 })()
156 .map_err(|mut err: Error| {
157 err.with_statement(rql.to_string());
158 err
159 })
160 }
161
162 #[instrument(name = "engine::subscription", level = "debug", skip(self, params), fields(rql = %rql))]
163 pub fn subscription_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
164 (|| {
165 let mut txn = self.begin_subscription()?;
166 let frames = self.executor.subscription(
167 &mut txn,
168 Subscription {
169 rql,
170 params,
171 identity,
172 },
173 )?;
174 txn.commit()?;
175 Ok(frames)
176 })()
177 .map_err(|mut err: Error| {
178 err.with_statement(rql.to_string());
179 err
180 })
181 }
182
183 #[instrument(name = "engine::procedure", level = "debug", skip(self, params), fields(name = %name))]
185 pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> Result<Vec<Frame>> {
186 let mut txn = self.begin_command()?;
187 let frames = self.executor.call_procedure(&mut txn, identity, name, ¶ms)?;
188 txn.commit()?;
189 Ok(frames)
190 }
191
192 pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
229 let catalog = self.materialized_catalog();
230
231 let ns_def = catalog
233 .find_namespace_by_name(namespace)
234 .ok_or_else(|| Error(namespace_not_found(Fragment::None, namespace)))?;
235
236 let table_id = self.executor.virtual_table_registry.allocate_id();
238 let table_columns = table.definition();
240 let columns = convert_vtable_user_columns_to_column_defs(&table_columns);
241
242 let def = Arc::new(VTableDef {
244 id: table_id,
245 namespace: ns_def.id(),
246 name: name.to_string(),
247 columns,
248 });
249
250 catalog.register_vtable_user(def.clone())?;
252 let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
254 let entry = UserVTableEntry {
256 def: def.clone(),
257 data_fn,
258 };
259 self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
260 Ok(table_id)
261 }
262}
263
264impl CdcHost for StandardEngine {
265 fn begin_command(&self) -> Result<CommandTransaction> {
266 StandardEngine::begin_command(self)
267 }
268
269 fn begin_query(&self) -> Result<QueryTransaction> {
270 StandardEngine::begin_query(self)
271 }
272
273 fn current_version(&self) -> Result<CommitVersion> {
274 StandardEngine::current_version(self)
275 }
276
277 fn done_until(&self) -> CommitVersion {
278 StandardEngine::done_until(self)
279 }
280
281 fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
282 StandardEngine::wait_for_mark_timeout(self, version, timeout)
283 }
284
285 fn schema_registry(&self) -> &SchemaRegistry {
286 &self.catalog.schema
287 }
288}
289
290impl Clone for StandardEngine {
291 fn clone(&self) -> Self {
292 Self(self.0.clone())
293 }
294}
295
296impl Deref for StandardEngine {
297 type Target = Inner;
298
299 fn deref(&self) -> &Self::Target {
300 &self.0
301 }
302}
303
/// State shared by all clones of a `StandardEngine`; the handle derefs to this struct.
pub struct Inner {
    /// Multi transaction layer; used to begin transactions and to answer
    /// `current_version`, `done_until` and `wait_for_mark_timeout`.
    multi: MultiTransaction,
    /// Single transaction layer; a clone is handed to every transaction the engine begins.
    single: SingleTransaction,
    /// Event bus exposed through `WithEventBus` and `emit`.
    event_bus: EventBus,
    /// Executor that runs admin/command/query/subscription requests and procedure calls.
    executor: Executor,
    /// Factory that produces the interceptor set for each new transaction.
    interceptors: InterceptorFactory,
    /// Catalog handle; also provides the materialized catalog and the schema registry.
    catalog: Catalog,
    /// Flow operator store; a clone is also given to the executor and the event listener.
    flow_operator_store: FlowOperatorStore,
}
313
314impl StandardEngine {
315 pub fn new(
316 multi: MultiTransaction,
317 single: SingleTransaction,
318 event_bus: EventBus,
319 interceptors: InterceptorFactory,
320 catalog: Catalog,
321 clock: Clock,
322 functions: Functions,
323 procedures: Procedures,
324 transforms: Transforms,
325 ioc: IocContainer,
326 #[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
327 ) -> Self {
328 let flow_operator_store = FlowOperatorStore::new();
329 let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
330 event_bus.register(listener);
331
332 let metrics_store = ioc
334 .resolve::<SingleStore>()
335 .expect("SingleStore must be registered in IocContainer for metrics");
336 let stats_reader = MetricReader::new(metrics_store);
337
338 let materialized = catalog.materialized.clone();
340 interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
341 interceptors
342 .post_commit
343 .add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
344 }));
345
346 Self(Arc::new(Inner {
347 multi,
348 single,
349 event_bus,
350 executor: Executor::new(
351 catalog.clone(),
352 clock,
353 functions,
354 procedures,
355 transforms,
356 flow_operator_store.clone(),
357 stats_reader,
358 ioc,
359 #[cfg(not(target_arch = "wasm32"))]
360 remote_registry,
361 ),
362 interceptors,
363 catalog,
364 flow_operator_store,
365 }))
366 }
367
368 pub fn create_interceptors(&self) -> Interceptors {
370 self.interceptors.create()
371 }
372
373 pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
378 self.interceptors.add_late(factory);
379 }
380
381 #[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
386 ))]
387 pub fn begin_query_at_version(&self, version: CommitVersion) -> Result<QueryTransaction> {
388 Ok(QueryTransaction::new(self.multi.begin_query_at_version(version)?, self.single.clone()))
389 }
390
391 #[inline]
392 pub fn multi(&self) -> &MultiTransaction {
393 &self.multi
394 }
395
396 #[inline]
397 pub fn multi_owned(&self) -> MultiTransaction {
398 self.multi.clone()
399 }
400
401 #[inline]
403 pub fn actor_system(&self) -> ActorSystem {
404 self.multi.actor_system()
405 }
406
407 #[inline]
408 pub fn single(&self) -> &SingleTransaction {
409 &self.single
410 }
411
412 #[inline]
413 pub fn single_owned(&self) -> SingleTransaction {
414 self.single.clone()
415 }
416
417 #[inline]
418 pub fn emit<E: Event>(&self, event: E) {
419 self.event_bus.emit(event)
420 }
421
422 #[inline]
423 pub fn materialized_catalog(&self) -> &MaterializedCatalog {
424 &self.catalog.materialized
425 }
426
427 #[inline]
431 pub fn catalog(&self) -> Catalog {
432 self.catalog.clone()
433 }
434
435 #[inline]
436 pub fn flow_operator_store(&self) -> &FlowOperatorStore {
437 &self.flow_operator_store
438 }
439
440 #[inline]
442 pub fn current_version(&self) -> Result<CommitVersion> {
443 self.multi.current_version()
444 }
445
446 #[inline]
450 pub fn done_until(&self) -> CommitVersion {
451 self.multi.done_until()
452 }
453
454 #[inline]
457 pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
458 self.multi.wait_for_mark_timeout(version, timeout)
459 }
460
461 #[inline]
462 pub fn executor(&self) -> Executor {
463 self.executor.clone()
464 }
465
466 #[inline]
471 pub fn cdc_store(&self) -> CdcStore {
472 self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
473 }
474
475 pub fn shutdown(&self) {
476 self.interceptors.clear_late();
477 self.executor.ioc.clear();
478 }
479
480 pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
498 BulkInsertBuilder::new(self, identity)
499 }
500
501 pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
511 BulkInsertBuilder::new_trusted(self, identity)
512 }
513}
514
515fn convert_vtable_user_columns_to_column_defs(columns: &[UserVTableColumnDef]) -> Vec<ColumnDef> {
517 columns.iter()
518 .enumerate()
519 .map(|(idx, col)| {
520 let constraint = TypeConstraint::unconstrained(col.data_type.clone());
523 ColumnDef {
524 id: ColumnId(idx as u64),
525 name: col.name.clone(),
526 constraint,
527 properties: vec![],
528 index: ColumnIndex(idx as u8),
529 auto_increment: false,
530 dictionary_id: None,
531 }
532 })
533 .collect()
534}