1use std::{ops::Deref, sync::Arc, time::Duration};
5
6use reifydb_catalog::{
7 catalog::Catalog,
8 materialized::MaterializedCatalog,
9 vtable::{
10 system::flow_operator_store::{FlowOperatorEventListener, FlowOperatorStore},
11 tables::UserVTableDataFunction,
12 user::{UserVTable, UserVTableColumnDef, registry::UserVTableEntry},
13 },
14};
15use reifydb_cdc::{consume::host::CdcHost, storage::CdcStore};
16use reifydb_core::{
17 common::CommitVersion,
18 error::diagnostic::catalog::namespace_not_found,
19 event::{Event, EventBus},
20 interface::{
21 WithEventBus,
22 auth::Identity,
23 catalog::{
24 column::{ColumnDef, ColumnIndex},
25 id::ColumnId,
26 vtable::{VTableDef, VTableId},
27 },
28 },
29 util::ioc::IocContainer,
30};
31use reifydb_function::registry::Functions;
32use reifydb_metric::metric::MetricReader;
33use reifydb_runtime::{actor::system::ActorSystem, clock::Clock};
34use reifydb_transaction::{
35 interceptor::factory::InterceptorFactory,
36 multi::transaction::MultiTransaction,
37 single::SingleTransaction,
38 transaction::{admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction},
39};
40use reifydb_type::{
41 error::Error,
42 fragment::Fragment,
43 params::Params,
44 value::{constraint::TypeConstraint, frame::frame::Frame},
45};
46use tracing::instrument;
47
48use crate::{
49 bulk_insert::builder::BulkInsertBuilder,
50 interceptor::catalog::MaterializedCatalogInterceptor,
51 transform::registry::Transforms,
52 vm::{Admin, Command, Query, executor::Executor},
53};
54
55pub struct StandardEngine(Arc<Inner>);
56
57impl WithEventBus for StandardEngine {
58 fn event_bus(&self) -> &EventBus {
59 &self.event_bus
60 }
61}
62
63impl StandardEngine {
65 #[instrument(name = "engine::transaction::begin_command", level = "debug", skip(self))]
66 pub fn begin_command(&self) -> crate::Result<CommandTransaction> {
67 let mut interceptors = self.interceptors.create();
68
69 interceptors
70 .post_commit
71 .add(Arc::new(MaterializedCatalogInterceptor::new(self.catalog.materialized.clone())));
72
73 CommandTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
74 }
75
76 #[instrument(name = "engine::transaction::begin_admin", level = "debug", skip(self))]
77 pub fn begin_admin(&self) -> crate::Result<AdminTransaction> {
78 let mut interceptors = self.interceptors.create();
79
80 interceptors
81 .post_commit
82 .add(Arc::new(MaterializedCatalogInterceptor::new(self.catalog.materialized.clone())));
83
84 AdminTransaction::new(self.multi.clone(), self.single.clone(), self.event_bus.clone(), interceptors)
85 }
86
87 #[instrument(name = "engine::transaction::begin_query", level = "debug", skip(self))]
88 pub fn begin_query(&self) -> crate::Result<QueryTransaction> {
89 Ok(QueryTransaction::new(self.multi.begin_query()?, self.single.clone()))
90 }
91
92 #[instrument(name = "engine::admin", level = "debug", skip(self, params), fields(rql = %rql))]
93 pub fn admin_as(&self, identity: &Identity, rql: &str, params: Params) -> Result<Vec<Frame>, Error> {
94 (|| {
95 let mut txn = self.begin_admin()?;
96 let frames = self.executor.admin(
97 &mut txn,
98 Admin {
99 rql,
100 params,
101 identity,
102 },
103 )?;
104 txn.commit()?;
105 Ok(frames)
106 })()
107 .map_err(|mut err: Error| {
108 err.with_statement(rql.to_string());
109 err
110 })
111 }
112
113 #[instrument(name = "engine::command", level = "debug", skip(self, params), fields(rql = %rql))]
114 pub fn command_as(&self, identity: &Identity, rql: &str, params: Params) -> Result<Vec<Frame>, Error> {
115 (|| {
116 let mut txn = self.begin_command()?;
117 let frames = self.executor.command(
118 &mut txn,
119 Command {
120 rql,
121 params,
122 identity,
123 },
124 )?;
125 txn.commit()?;
126 Ok(frames)
127 })()
128 .map_err(|mut err: Error| {
129 err.with_statement(rql.to_string());
130 err
131 })
132 }
133
134 #[instrument(name = "engine::query", level = "debug", skip(self, params), fields(rql = %rql))]
135 pub fn query_as(&self, identity: &Identity, rql: &str, params: Params) -> Result<Vec<Frame>, Error> {
136 (|| {
137 let mut txn = self.begin_query()?;
138 self.executor.query(
139 &mut txn,
140 Query {
141 rql,
142 params,
143 identity,
144 },
145 )
146 })()
147 .map_err(|mut err: Error| {
148 err.with_statement(rql.to_string());
149 err
150 })
151 }
152
153 pub fn register_virtual_table<T: UserVTable>(
190 &self,
191 namespace: &str,
192 name: &str,
193 table: T,
194 ) -> crate::Result<VTableId> {
195 let catalog = self.materialized_catalog();
196
197 let ns_def = catalog
199 .find_namespace_by_name(namespace)
200 .ok_or_else(|| Error(namespace_not_found(Fragment::None, namespace)))?;
201
202 let table_id = self.executor.virtual_table_registry.allocate_id();
204 let table_columns = table.definition();
206 let columns = convert_vtable_user_columns_to_column_defs(&table_columns);
207
208 let def = Arc::new(VTableDef {
210 id: table_id,
211 namespace: ns_def.id,
212 name: name.to_string(),
213 columns,
214 });
215
216 catalog.register_vtable_user(def.clone())?;
218 let data_fn: UserVTableDataFunction = Arc::new(move |_params| table.get());
220 let entry = UserVTableEntry {
222 def: def.clone(),
223 data_fn,
224 };
225 self.executor.virtual_table_registry.register(ns_def.id, name.to_string(), entry);
226 Ok(table_id)
227 }
228}
229
230impl CdcHost for StandardEngine {
231 fn begin_command(&self) -> reifydb_type::Result<CommandTransaction> {
232 StandardEngine::begin_command(self)
233 }
234
235 fn begin_query(&self) -> reifydb_type::Result<QueryTransaction> {
236 StandardEngine::begin_query(self)
237 }
238
239 fn current_version(&self) -> reifydb_type::Result<CommitVersion> {
240 StandardEngine::current_version(self)
241 }
242
243 fn done_until(&self) -> CommitVersion {
244 StandardEngine::done_until(self)
245 }
246
247 fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
248 StandardEngine::wait_for_mark_timeout(self, version, timeout)
249 }
250
251 fn schema_registry(&self) -> &reifydb_catalog::schema::SchemaRegistry {
252 &self.catalog.schema
253 }
254}
255
256impl Clone for StandardEngine {
257 fn clone(&self) -> Self {
258 Self(self.0.clone())
259 }
260}
261
262impl Deref for StandardEngine {
263 type Target = Inner;
264
265 fn deref(&self) -> &Self::Target {
266 &self.0
267 }
268}
269
270pub struct Inner {
271 multi: MultiTransaction,
272 single: SingleTransaction,
273 event_bus: EventBus,
274 executor: Executor,
275 interceptors: Box<dyn InterceptorFactory>,
276 catalog: Catalog,
277 flow_operator_store: FlowOperatorStore,
278}
279
280impl StandardEngine {
281 pub fn new(
282 multi: MultiTransaction,
283 single: SingleTransaction,
284 event_bus: EventBus,
285 interceptors: Box<dyn InterceptorFactory>,
286 catalog: Catalog,
287 clock: Clock,
288 functions: Functions,
289 transforms: Transforms,
290 ioc: IocContainer,
291 ) -> Self {
292 let flow_operator_store = FlowOperatorStore::new();
293 let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
294 event_bus.register(listener);
295
296 let metrics_store = ioc
298 .resolve::<reifydb_store_single::SingleStore>()
299 .expect("SingleStore must be registered in IocContainer for metrics");
300 let stats_reader = MetricReader::new(metrics_store);
301
302 Self(Arc::new(Inner {
303 multi,
304 single,
305 event_bus,
306 executor: Executor::new(
307 catalog.clone(),
308 clock,
309 functions,
310 transforms,
311 flow_operator_store.clone(),
312 stats_reader,
313 ioc,
314 ),
315 interceptors,
316 catalog,
317 flow_operator_store,
318 }))
319 }
320
321 pub fn create_interceptors(&self) -> reifydb_transaction::interceptor::interceptors::Interceptors {
323 self.interceptors.create()
324 }
325
326 #[instrument(name = "engine::transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0
331 ))]
332 pub fn begin_query_at_version(&self, version: CommitVersion) -> crate::Result<QueryTransaction> {
333 Ok(QueryTransaction::new(self.multi.begin_query_at_version(version)?, self.single.clone()))
334 }
335
336 #[inline]
337 pub fn multi(&self) -> &MultiTransaction {
338 &self.multi
339 }
340
341 #[inline]
342 pub fn multi_owned(&self) -> MultiTransaction {
343 self.multi.clone()
344 }
345
346 #[inline]
348 pub fn actor_system(&self) -> ActorSystem {
349 self.multi.actor_system()
350 }
351
352 #[inline]
353 pub fn single(&self) -> &SingleTransaction {
354 &self.single
355 }
356
357 #[inline]
358 pub fn single_owned(&self) -> SingleTransaction {
359 self.single.clone()
360 }
361
362 #[inline]
363 pub fn emit<E: Event>(&self, event: E) {
364 self.event_bus.emit(event)
365 }
366
367 #[inline]
368 pub fn materialized_catalog(&self) -> &MaterializedCatalog {
369 &self.catalog.materialized
370 }
371
372 #[inline]
376 pub fn catalog(&self) -> Catalog {
377 self.catalog.clone()
378 }
379
380 #[inline]
381 pub fn flow_operator_store(&self) -> &FlowOperatorStore {
382 &self.flow_operator_store
383 }
384
385 #[inline]
387 pub fn current_version(&self) -> crate::Result<CommitVersion> {
388 self.multi.current_version()
389 }
390
391 #[inline]
395 pub fn done_until(&self) -> CommitVersion {
396 self.multi.done_until()
397 }
398
399 #[inline]
402 pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
403 self.multi.wait_for_mark_timeout(version, timeout)
404 }
405
406 #[inline]
407 pub fn executor(&self) -> Executor {
408 self.executor.clone()
409 }
410
411 #[inline]
416 pub fn cdc_store(&self) -> CdcStore {
417 self.executor.ioc.resolve::<CdcStore>().expect("CdcStore must be registered")
418 }
419
420 pub fn bulk_insert<'e>(
438 &'e self,
439 identity: &'e Identity,
440 ) -> BulkInsertBuilder<'e, crate::bulk_insert::builder::Validated> {
441 BulkInsertBuilder::new(self, identity)
442 }
443
444 pub fn bulk_insert_trusted<'e>(
454 &'e self,
455 identity: &'e Identity,
456 ) -> BulkInsertBuilder<'e, crate::bulk_insert::builder::Trusted> {
457 BulkInsertBuilder::new_trusted(self, identity)
458 }
459}
460
461fn convert_vtable_user_columns_to_column_defs(columns: &[UserVTableColumnDef]) -> Vec<ColumnDef> {
463 columns.iter()
464 .enumerate()
465 .map(|(idx, col)| {
466 let constraint = TypeConstraint::unconstrained(col.data_type.clone());
469 ColumnDef {
470 id: ColumnId(idx as u64),
471 name: col.name.clone(),
472 constraint,
473 policies: vec![],
474 index: ColumnIndex(idx as u8),
475 auto_increment: false,
476 dictionary_id: None,
477 }
478 })
479 .collect()
480}