1use std::{ops::Deref, sync::Arc, time::Duration};
5
6use reifydb_catalog::{
7 catalog::Catalog,
8 materialized::MaterializedCatalog,
9 schema::SchemaRegistry,
10 vtable::{
11 system::flow_operator_store::{FlowOperatorEventListener, FlowOperatorStore},
12 tables::UserVTableDataFunction,
13 user::{UserVTable, UserVTableColumnDef, registry::UserVTableEntry},
14 },
15};
16use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
17use reifydb_core::{
18 common::CommitVersion,
19 error::diagnostic::catalog::namespace_not_found,
20 event::{Event, EventBus},
21 interface::{
22 WithEventBus,
23 catalog::{
24 column::{ColumnDef, ColumnIndex},
25 id::ColumnId,
26 vtable::{VTableDef, VTableId},
27 },
28 },
29 util::ioc::IocContainer,
30};
31use reifydb_function::registry::Functions;
32use reifydb_metric::metric::MetricReader;
33use reifydb_runtime::{actor::system::ActorSystem, clock::Clock};
34use reifydb_store_single::SingleStore;
35use reifydb_transaction::{
36 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
37 multi::transaction::MultiTransaction,
38 single::SingleTransaction,
39 transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
40};
41use reifydb_type::{
42 error::Error,
43 fragment::Fragment,
44 params::Params,
45 value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId},
46};
47use tracing::instrument;
48
49use crate::{
50 Result,
51 bulk_insert::builder::{BulkInsertBuilder, Trusted, Validated},
52 interceptor::catalog::MaterializedCatalogInterceptor,
53 procedure::registry::Procedures,
54 transform::registry::Transforms,
55 vm::{Admin, Command, Query, executor::Executor},
56};
57
/// Engine handle: a newtype around a reference-counted [`Inner`].
///
/// Cloning the handle clones the `Arc`, so all clones share the same `Inner` state.
pub struct StandardEngine(Arc<Inner>);
59
60impl WithEventBus for StandardEngine {
61 fn event_bus(&self) -> &EventBus {
62 &self.event_bus
63 }
64}
65
66impl StandardEngine {
68 #[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
69 pub fn begin_command(&self) -> Result<CommandTransaction> {
70 let interceptors = self.interceptors.create();
71 CommandTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
72 }
73
74 #[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
75 pub fn begin_admin(&self) -> Result<AdminTransaction> {
76 let interceptors = self.interceptors.create();
77 AdminTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
78 }
79
80 #[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
81 pub fn begin_query(&self) -> Result<QueryTransaction> {
82 Ok(QueryTransaction::new(self.multi.begin_query()?, self.single.clone()))
83 }
84
85 #[instrument(name = "engine::admin", level = "debug", skip(self, params), fields(rql = %rql))]
86 pub fn admin_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
87 (|| {
88 let mut txn = self.begin_admin()?;
89 let frames = self.executor.admin(
90 &mut txn,
91 Admin {
92 rql,
93 params,
94 identity,
95 },
96 )?;
97 txn.commit()?;
98 Ok(frames)
99 })()
100 .map_err(|mut err: Error| {
101 err.with_statement(rql.to_string());
102 err
103 })
104 }
105
106 #[instrument(name = "engine::command", level = "debug", skip(self, params), fields(rql = %rql))]
107 pub fn command_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
108 (|| {
109 let mut txn = self.begin_command()?;
110 let frames = self.executor.command(
111 &mut txn,
112 Command {
113 rql,
114 params,
115 identity,
116 },
117 )?;
118 txn.commit()?;
119 Ok(frames)
120 })()
121 .map_err(|mut err: Error| {
122 err.with_statement(rql.to_string());
123 err
124 })
125 }
126
127 #[instrument(name = "engine::query", level = "debug", skip(self, params), fields(rql = %rql))]
128 pub fn query_as(&self, identity: IdentityId, rql: &str, params: Params) -> Result<Vec<Frame>> {
129 (|| {
130 let mut txn = self.begin_query()?;
131 self.executor.query(
132 &mut txn,
133 Query {
134 rql,
135 params,
136 identity,
137 },
138 )
139 })()
140 .map_err(|mut err: Error| {
141 err.with_statement(rql.to_string());
142 err
143 })
144 }
145
146 #[instrument(name = "engine::procedure", level = "debug", skip(self, params), fields(name = %name))]
148 pub fn procedure_as(&self, identity: IdentityId, name: &str, params: Params) -> Result<Vec<Frame>> {
149 let mut txn = self.begin_command()?;
150 let frames = self.executor.call_procedure(&mut txn, identity, name, ¶ms)?;
151 txn.commit()?;
152 Ok(frames)
153 }
154
155 pub fn register_virtual_table<T: UserVTable>(&self, namespace: &str, name: &str, table: T) -> Result<VTableId> {
192 let catalog = self.materialized_catalog();
193
194 let ns_def = catalog
196 .find_namespace_by_name(namespace)
197 .ok_or_else(|| Error(namespace_not_found(Fragment::None, namespace)))?;
198
199 let table_id = self.executor.virtual_table_registry.allocate_id();
201 let table_columns = table.definition();
203 let columns = convert_vtable_user_columns_to_column_defs(&table_columns);
204
205 let def = Arc::new(VTableDef {
207 id: table_id,
208 namespace: ns_def.id,
209 name: name.to_string(),
210 columns,
211 });
212
213 catalog.register_vtable_user(def.clone())?;
215 let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
217 let entry = UserVTableEntry {
219 def: def.clone(),
220 data_fn,
221 };
222 self.executor.virtual_table_registry.register(ns_def.id, name.to_string(), entry);
223 Ok(table_id)
224 }
225}
226
227impl CdcHost for StandardEngine {
228 fn begin_command(&self) -> Result<CommandTransaction> {
229 StandardEngine::begin_command(self)
230 }
231
232 fn begin_query(&self) -> Result<QueryTransaction> {
233 StandardEngine::begin_query(self)
234 }
235
236 fn current_version(&self) -> Result<CommitVersion> {
237 StandardEngine::current_version(self)
238 }
239
240 fn done_until(&self) -> CommitVersion {
241 StandardEngine::done_until(self)
242 }
243
244 fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
245 StandardEngine::wait_for_mark_timeout(self, version, timeout)
246 }
247
248 fn schema_registry(&self) -> &SchemaRegistry {
249 &self.catalog.schema
250 }
251}
252
253impl Clone for StandardEngine {
254 fn clone(&self) -> Self {
255 Self(self.0.clone())
256 }
257}
258
259impl Deref for StandardEngine {
260 type Target = Inner;
261
262 fn deref(&self) -> &Self::Target {
263 &self.0
264 }
265}
266
/// State shared by every [`StandardEngine`] handle (held behind an `Arc`).
pub struct Inner {
	// Used to begin queries and to read version information; cloned into each command/admin transaction.
	multi: MultiTransaction,
	// Cloned into every transaction the engine begins.
	single: SingleTransaction,
	// Bus that `emit` publishes to; also cloned into command/admin transactions.
	event_bus: EventBus,
	// Runs admin, command, query and procedure requests; also owns the virtual table registry.
	executor: Executor,
	// Creates the interceptor set handed to each new command/admin transaction.
	interceptors: InterceptorFactory,
	// Catalog handle; exposes the materialized catalog and the schema registry.
	catalog: Catalog,
	// Store that the engine shares with the executor and with the flow-operator event listener.
	flow_operator_store: FlowOperatorStore,
}
276
277impl StandardEngine {
278 pub fn new(
279 multi: MultiTransaction,
280 single: SingleTransaction,
281 event_bus: EventBus,
282 interceptors: InterceptorFactory,
283 catalog: Catalog,
284 clock: Clock,
285 functions: Functions,
286 procedures: Procedures,
287 transforms: Transforms,
288 ioc: IocContainer,
289 ) -> Self {
290 let flow_operator_store = FlowOperatorStore::new();
291 let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
292 event_bus.register(listener);
293
294 let metrics_store = ioc
296 .resolve::<SingleStore>()
297 .expect("SingleStore must be registered in IocContainer for metrics");
298 let stats_reader = MetricReader::new(metrics_store);
299
300 let materialized = catalog.materialized.clone();
302 interceptors.add_late(Arc::new(move |interceptors: &mut Interceptors| {
303 interceptors
304 .post_commit
305 .add(Arc::new(MaterializedCatalogInterceptor::new(materialized.clone())));
306 }));
307
308 Self(Arc::new(Inner {
309 multi,
310 single,
311 event_bus,
312 executor: Executor::new(
313 catalog.clone(),
314 clock,
315 functions,
316 procedures,
317 transforms,
318 flow_operator_store.clone(),
319 stats_reader,
320 ioc,
321 ),
322 interceptors,
323 catalog,
324 flow_operator_store,
325 }))
326 }
327
328 pub fn create_interceptors(&self) -> Interceptors {
330 self.interceptors.create()
331 }
332
333 pub fn add_interceptor_factory(&self, factory: Arc<dyn Fn(&mut Interceptors) + Send + Sync>) {
338 self.interceptors.add_late(factory);
339 }
340
341 #[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
346 ))]
347 pub fn begin_query_at_version(&self, version: CommitVersion) -> Result<QueryTransaction> {
348 Ok(QueryTransaction::new(self.multi.begin_query_at_version(version)?, self.single.clone()))
349 }
350
351 #[inline]
352 pub fn multi(&self) -> &MultiTransaction {
353 &self.multi
354 }
355
356 #[inline]
357 pub fn multi_owned(&self) -> MultiTransaction {
358 self.multi.clone()
359 }
360
361 #[inline]
363 pub fn actor_system(&self) -> ActorSystem {
364 self.multi.actor_system()
365 }
366
367 #[inline]
368 pub fn single(&self) -> &SingleTransaction {
369 &self.single
370 }
371
372 #[inline]
373 pub fn single_owned(&self) -> SingleTransaction {
374 self.single.clone()
375 }
376
377 #[inline]
378 pub fn emit<E: Event>(&self, event: E) {
379 self.event_bus.emit(event)
380 }
381
382 #[inline]
383 pub fn materialized_catalog(&self) -> &MaterializedCatalog {
384 &self.catalog.materialized
385 }
386
387 #[inline]
391 pub fn catalog(&self) -> Catalog {
392 self.catalog.clone()
393 }
394
395 #[inline]
396 pub fn flow_operator_store(&self) -> &FlowOperatorStore {
397 &self.flow_operator_store
398 }
399
400 #[inline]
402 pub fn current_version(&self) -> Result<CommitVersion> {
403 self.multi.current_version()
404 }
405
406 #[inline]
410 pub fn done_until(&self) -> CommitVersion {
411 self.multi.done_until()
412 }
413
414 #[inline]
417 pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
418 self.multi.wait_for_mark_timeout(version, timeout)
419 }
420
421 #[inline]
422 pub fn executor(&self) -> Executor {
423 self.executor.clone()
424 }
425
426 #[inline]
431 pub fn cdc_store(&self) -> CdcStore {
432 self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
433 }
434
435 pub fn shutdown(&self) {
436 self.interceptors.clear_late();
437 self.executor.ioc.clear();
438 }
439
440 pub fn bulk_insert<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Validated> {
458 BulkInsertBuilder::new(self, identity)
459 }
460
461 pub fn bulk_insert_trusted<'e>(&'e self, identity: IdentityId) -> BulkInsertBuilder<'e, Trusted> {
471 BulkInsertBuilder::new_trusted(self, identity)
472 }
473}
474
475fn convert_vtable_user_columns_to_column_defs(columns: &[UserVTableColumnDef]) -> Vec<ColumnDef> {
477 columns.iter()
478 .enumerate()
479 .map(|(idx, col)| {
480 let constraint = TypeConstraint::unconstrained(col.data_type.clone());
483 ColumnDef {
484 id: ColumnId(idx as u64),
485 name: col.name.clone(),
486 constraint,
487 properties: vec![],
488 index: ColumnIndex(idx as u8),
489 auto_increment: false,
490 dictionary_id: None,
491 }
492 })
493 .collect()
494}