1use std::{ops::Deref, sync::Arc, time::Duration};
5
6use reifydb_catalog::{
7 catalog::Catalog,
8 materialized::MaterializedCatalog,
9 schema::SchemaRegistry,
10 vtable::{
11 system::flow_operator_store::{FlowOperatorEventListener, FlowOperatorStore},
12 tables::UserVTableDataFunction,
13 user::{UserVTable, UserVTableColumnDef, registry::UserVTableEntry},
14 },
15};
16use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
17use reifydb_core::{
18 common::CommitVersion,
19 error::diagnostic::catalog::namespace_not_found,
20 event::{Event, EventBus},
21 interface::{
22 WithEventBus,
23 catalog::{
24 column::{ColumnDef, ColumnIndex},
25 id::ColumnId,
26 vtable::{VTableDef, VTableId},
27 },
28 },
29 util::ioc::IocContainer,
30};
31use reifydb_function::registry::Functions;
32use reifydb_metric::metric::MetricReader;
33use reifydb_runtime::{actor::system::ActorSystem, clock::Clock};
34use reifydb_store_single::SingleStore;
35use reifydb_transaction::{
36 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
37 multi::transaction::MultiTransaction,
38 single::SingleTransaction,
39 transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
40};
41use reifydb_type::{
42 error::Error,
43 fragment::Fragment,
44 params::Params,
45 value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId},
46};
47use tracing::instrument;
48
49#[cfg(not(target_arch = "wasm32"))]
50use crate::remote::RemoteRegistry;
51use crate::{
52 Result,
53 bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
54 interceptor::catalog::MaterializedCatalogInterceptor,
55 procedure::registry::Procedures,
56 transform::registry::Transforms,
57 vm::{Admin, Command, Query, executor::Executor},
58};
59
60pub struct StandardEngine(Arc<Inner>);
61
62impl WithEventBus for StandardEngine {
63 fn event_bus(&self) -> &EventBus {
64 &self.event_bus
65 }
66}
67
68impl StandardEngine {
70 #[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
71 pub fn begin_command(&self) -> Result<CommandTransaction> {
72 let interceptors = self.interceptors.create();
73 CommandTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
74 }
75
76 #[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
77 pub fn begin_admin(&self) -> Result<AdminTransaction> {
78 let interceptors = self.interceptors.create();
79 AdminTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
80 }
81
82 #[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
83 pub fn begin_query(&self) -> Result<QueryTransaction> {
84 Ok(QueryTransaction::new(self.multi.begin_query()?, self.single.clone()))
85 }
86
87 #[instrument(name = "engine::admin", level = "debug", skip(self, params), fields(rql = %rql))]
88 pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
89 (|| {
90 let mut txn = self.begin_admin()?;
91 let frames = self.executor.admin(
92 &mut txn,
93 Admin {
94 rql,
95 params,
96 identity,
97 },
98 )?;
99 txn.commit()?;
100 Ok(frames)
101 })()
102 .map_err(|mut err: Error| {
103 err.with_statement(rql.to_string());
104 err
105 })
106 }
107
108 #[instrument(name = "engine::command", level = "debug", skip(self, params), fields(rql = %rql))]
109 pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
110 (|| {
111 let mut txn = self.begin_command()?;
112 let frames = self.executor.command(
113 &mut txn,
114 Command {
115 rql,
116 params,
117 identity,
118 },
119 )?;
120 txn.commit()?;
121 Ok(frames)
122 })()
123 .map_err(|mut err: Error| {
124 err.with_statement(rql.to_string());
125 err
126 })
127 }
128
129 #[instrument(name = "engine::query", level = "debug", skip(self, params), fields(rql = %rql))]
130 pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
131 (|| {
132 let mut txn = self.begin_query()?;
133 self.executor.query(
134 &mut txn,
135 Query {
136 rql,
137 params,
138 identity,
139 },
140 )
141 })()
142 .map_err(|mut err: Error| {
143 err.with_statement(rql.to_string());
144 err
145 })
146 }
147
148 #[instrument(name = "engine::procedure", level = "debug", skip(self, params), fields(name = %name))]
150 pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> Result<Vec<Frame>> {
151 let mut txn = self.begin_command()?;
152 let frames = self.executor.call_procedure(&mut txn, identity, name, ¶ms)?;
153 txn.commit()?;
154 Ok(frames)
155 }
156
157 pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
194 let catalog = self.materialized_catalog();
195
196 let ns_def = catalog
198 .find_namespace_by_name(namespace)
199 .ok_or_else(|| Error(namespace_not_found(Fragment::None, namespace)))?;
200
201 let table_id = self.executor.virtual_table_registry.allocate_id();
203 let table_columns = table.definition();
205 let columns = convert_vtable_user_columns_to_column_defs(&table_columns);
206
207 let def = Arc::new(VTableDef {
209 id: table_id,
210 namespace: ns_def.id(),
211 name: name.to_string(),
212 columns,
213 });
214
215 catalog.register_vtable_user(def.clone())?;
217 let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
219 let entry = UserVTableEntry {
221 def: def.clone(),
222 data_fn,
223 };
224 self.executor.virtual_table_registry.register(ns_def.id(), name.to_string(), entry);
225 Ok(table_id)
226 }
227}
228
229impl CdcHost for StandardEngine {
230 fn begin_command(&self) -> Result<CommandTransaction> {
231 StandardEngine::begin_command(self)
232 }
233
234 fn begin_query(&self) -> Result<QueryTransaction> {
235 StandardEngine::begin_query(self)
236 }
237
238 fn current_version(&self) -> Result<CommitVersion> {
239 StandardEngine::current_version(self)
240 }
241
242 fn done_until(&self) -> CommitVersion {
243 StandardEngine::done_until(self)
244 }
245
246 fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
247 StandardEngine::wait_for_mark_timeout(self, version, timeout)
248 }
249
250 fn schema_registry(&self) -> &SchemaRegistry {
251 &self.catalog.schema
252 }
253}
254
255impl Clone for StandardEngine {
256 fn clone(&self) -> Self {
257 Self(self.0.clone())
258 }
259}
260
261impl Deref for StandardEngine {
262 type Target = Inner;
263
264 fn deref(&self) -> &Self::Target {
265 &self.0
266 }
267}
268
269pub struct Inner {
270 multi: MultiTransaction,
271 single: SingleTransaction,
272 event_bus: EventBus,
273 executor: Executor,
274 interceptors: InterceptorFactory,
275 catalog: Catalog,
276 flow_operator_store: FlowOperatorStore,
277}
278
279impl StandardEngine {
280 pub fn new(
281 multi: MultiTransaction,
282 single: SingleTransaction,
283 event_bus: EventBus,
284 interceptors: InterceptorFactory,
285 catalog: Catalog,
286 clock: Clock,
287 functions: Functions,
288 procedures: Procedures,
289 transforms: Transforms,
290 ioc: IocContainer,
291 #[cfg(not(target_arch = "wasm32"))] remote_registry: Option<RemoteRegistry>,
292 ) -> Self {
293 let flow_operator_store = FlowOperatorStore::new();
294 let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
295 event_bus.register(listener);
296
297 let metrics_store = ioc
299 .resolve::<SingleStore>()
300 .expect("SingleStore must be registered in IocContainer for metrics");
301 let stats_reader = MetricReader::new(metrics_store);
302
303 let materialized = catalog.materialized.clone();
305 interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
306 interceptors
307 .post_commit
308 .add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
309 }));
310
311 Self(Arc::new(Inner {
312 multi,
313 single,
314 event_bus,
315 executor: Executor::new(
316 catalog.clone(),
317 clock,
318 functions,
319 procedures,
320 transforms,
321 flow_operator_store.clone(),
322 stats_reader,
323 ioc,
324 #[cfg(not(target_arch = "wasm32"))]
325 remote_registry,
326 ),
327 interceptors,
328 catalog,
329 flow_operator_store,
330 }))
331 }
332
333 pub fn create_interceptors(&self) -> Interceptors {
335 self.interceptors.create()
336 }
337
338 pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
343 self.interceptors.add_late(factory);
344 }
345
346 #[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
351 ))]
352 pub fn begin_query_at_version(&self, version: CommitVersion) -> Result<QueryTransaction> {
353 Ok(QueryTransaction::new(self.multi.begin_query_at_version(version)?, self.single.clone()))
354 }
355
356 #[inline]
357 pub fn multi(&self) -> &MultiTransaction {
358 &self.multi
359 }
360
361 #[inline]
362 pub fn multi_owned(&self) -> MultiTransaction {
363 self.multi.clone()
364 }
365
366 #[inline]
368 pub fn actor_system(&self) -> ActorSystem {
369 self.multi.actor_system()
370 }
371
372 #[inline]
373 pub fn single(&self) -> &SingleTransaction {
374 &self.single
375 }
376
377 #[inline]
378 pub fn single_owned(&self) -> SingleTransaction {
379 self.single.clone()
380 }
381
382 #[inline]
383 pub fn emit<E: Event>(&self, event: E) {
384 self.event_bus.emit(event)
385 }
386
387 #[inline]
388 pub fn materialized_catalog(&self) -> &MaterializedCatalog {
389 &self.catalog.materialized
390 }
391
392 #[inline]
396 pub fn catalog(&self) -> Catalog {
397 self.catalog.clone()
398 }
399
400 #[inline]
401 pub fn flow_operator_store(&self) -> &FlowOperatorStore {
402 &self.flow_operator_store
403 }
404
405 #[inline]
407 pub fn current_version(&self) -> Result<CommitVersion> {
408 self.multi.current_version()
409 }
410
411 #[inline]
415 pub fn done_until(&self) -> CommitVersion {
416 self.multi.done_until()
417 }
418
419 #[inline]
422 pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
423 self.multi.wait_for_mark_timeout(version, timeout)
424 }
425
426 #[inline]
427 pub fn executor(&self) -> Executor {
428 self.executor.clone()
429 }
430
431 #[inline]
436 pub fn cdc_store(&self) -> CdcStore {
437 self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
438 }
439
440 pub fn shutdown(&self) {
441 self.interceptors.clear_late();
442 self.executor.ioc.clear();
443 }
444
445 pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
463 BulkInsertBuilder::new(self, identity)
464 }
465
466 pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
476 BulkInsertBuilder::new_trusted(self, identity)
477 }
478}
479
480fn convert_vtable_user_columns_to_column_defs(columns: &[UserVTableColumnDef]) -> Vec<ColumnDef> {
482 columns.iter()
483 .enumerate()
484 .map(|(idx, col)| {
485 let constraint = TypeConstraint::unconstrained(col.data_type.clone());
488 ColumnDef {
489 id: ColumnId(idx as u64),
490 name: col.name.clone(),
491 constraint,
492 properties: vec![],
493 index: ColumnIndex(idx as u8),
494 auto_increment: false,
495 dictionary_id: None,
496 }
497 })
498 .collect()
499}