1use std::{ops::Deref, rc::Rc, sync::Arc};
5
6use reifydb_catalog::MaterializedCatalog;
7use reifydb_core::{
8 Frame,
9 event::{Event, EventBus},
10 interceptor::InterceptorFactory,
11 interface::{
12 Command, Engine as EngineInterface, ExecuteCommand, ExecuteQuery, Identity, MultiVersionTransaction,
13 Params, Query, WithEventBus,
14 },
15};
16use reifydb_transaction::{cdc::TransactionCdc, multi::TransactionMultiVersion, single::TransactionSingleVersion};
17
18use crate::{
19 execute::Executor,
20 function::{Functions, generator, math},
21 interceptor::materialized_catalog::MaterializedCatalogInterceptor,
22 table_virtual::system::{FlowOperatorEventListener, FlowOperatorStore},
23 transaction::{StandardCommandTransaction, StandardQueryTransaction},
24};
25
26pub struct StandardEngine(Arc<EngineInner>);
27
28impl WithEventBus for StandardEngine {
29 fn event_bus(&self) -> &EventBus {
30 &self.event_bus
31 }
32}
33
34impl EngineInterface for StandardEngine {
35 type Command = StandardCommandTransaction;
36 type Query = StandardQueryTransaction;
37
38 fn begin_command(&self) -> crate::Result<Self::Command> {
39 let mut interceptors = self.interceptors.create();
40
41 interceptors.post_commit.add(Rc::new(MaterializedCatalogInterceptor::new(self.catalog.clone())));
42
43 StandardCommandTransaction::new(
44 self.multi.clone(),
45 self.single.clone(),
46 self.cdc.clone(),
47 self.event_bus.clone(),
48 self.catalog.clone(),
49 interceptors,
50 )
51 }
52
53 fn begin_query(&self) -> crate::Result<Self::Query> {
54 Ok(StandardQueryTransaction::new(
55 self.multi.begin_query()?,
56 self.single.clone(),
57 self.cdc.clone(),
58 self.catalog.clone(),
59 ))
60 }
61
62 fn command_as(&self, identity: &Identity, rql: &str, params: Params) -> crate::Result<Vec<Frame>> {
63 let mut txn = self.begin_command()?;
64 let result = self.execute_command(
65 &mut txn,
66 Command {
67 rql,
68 params,
69 identity,
70 },
71 )?;
72 txn.commit()?;
73 Ok(result)
74 }
75
76 fn query_as(&self, identity: &Identity, rql: &str, params: Params) -> crate::Result<Vec<Frame>> {
77 let mut txn = self.begin_query()?;
78 let result = self.execute_query(
79 &mut txn,
80 Query {
81 rql,
82 params,
83 identity,
84 },
85 )?;
86 Ok(result)
87 }
88}
89
90impl ExecuteCommand<StandardCommandTransaction> for StandardEngine {
91 #[inline]
92 fn execute_command(&self, txn: &mut StandardCommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
93 self.executor.execute_command(txn, cmd)
94 }
95}
96
97impl ExecuteQuery<StandardQueryTransaction> for StandardEngine {
98 #[inline]
99 fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
100 self.executor.execute_query(txn, qry)
101 }
102}
103
104impl Clone for StandardEngine {
105 fn clone(&self) -> Self {
106 Self(self.0.clone())
107 }
108}
109
110impl Deref for StandardEngine {
111 type Target = EngineInner;
112
113 fn deref(&self) -> &Self::Target {
114 &self.0
115 }
116}
117
118pub struct EngineInner {
119 multi: TransactionMultiVersion,
120 single: TransactionSingleVersion,
121 cdc: TransactionCdc,
122 event_bus: EventBus,
123 executor: Executor,
124 interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
125 catalog: MaterializedCatalog,
126 flow_operator_store: FlowOperatorStore,
127}
128
129impl StandardEngine {
130 pub fn new(
131 multi: TransactionMultiVersion,
132 single: TransactionSingleVersion,
133 cdc: TransactionCdc,
134 event_bus: EventBus,
135 interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
136 catalog: MaterializedCatalog,
137 ) -> Self {
138 Self::with_functions(multi, single, cdc, event_bus, interceptors, catalog, None)
139 }
140
141 pub fn with_functions(
142 multi: TransactionMultiVersion,
143 single: TransactionSingleVersion,
144 cdc: TransactionCdc,
145 event_bus: EventBus,
146 interceptors: Box<dyn InterceptorFactory<StandardCommandTransaction>>,
147 catalog: MaterializedCatalog,
148 custom_functions: Option<Functions>,
149 ) -> Self {
150 let functions = custom_functions.unwrap_or_else(|| {
151 Functions::builder()
152 .register_aggregate("math::sum", math::aggregate::Sum::new)
153 .register_aggregate("math::min", math::aggregate::Min::new)
154 .register_aggregate("math::max", math::aggregate::Max::new)
155 .register_aggregate("math::avg", math::aggregate::Avg::new)
156 .register_aggregate("math::count", math::aggregate::Count::new)
157 .register_scalar("math::abs", math::scalar::Abs::new)
158 .register_scalar("math::avg", math::scalar::Avg::new)
159 .register_generator("generate_series", generator::GenerateSeries::new)
160 .build()
161 });
162
163 let flow_operator_store = FlowOperatorStore::new();
165 let listener = FlowOperatorEventListener::new(flow_operator_store.clone());
166 event_bus.register(listener);
167
168 Self(Arc::new(EngineInner {
169 multi,
170 single,
171 cdc,
172 event_bus,
173 executor: Executor::new(functions, flow_operator_store.clone()),
174 interceptors,
175 catalog,
176 flow_operator_store,
177 }))
178 }
179
180 #[inline]
181 pub fn multi(&self) -> &TransactionMultiVersion {
182 &self.multi
183 }
184
185 #[inline]
186 pub fn multi_owned(&self) -> TransactionMultiVersion {
187 self.multi.clone()
188 }
189
190 #[inline]
191 pub fn single(&self) -> &TransactionSingleVersion {
192 &self.single
193 }
194
195 #[inline]
196 pub fn single_owned(&self) -> TransactionSingleVersion {
197 self.single.clone()
198 }
199
200 #[inline]
201 pub fn cdc(&self) -> &TransactionCdc {
202 &self.cdc
203 }
204
205 #[inline]
206 pub fn cdc_owned(&self) -> TransactionCdc {
207 self.cdc.clone()
208 }
209
210 #[inline]
211 pub fn emit<E: Event>(&self, event: E) {
212 self.event_bus.emit(event)
213 }
214
215 #[inline]
216 pub fn catalog(&self) -> &MaterializedCatalog {
217 &self.catalog
218 }
219
220 #[inline]
221 pub fn flow_operator_store(&self) -> &FlowOperatorStore {
222 &self.flow_operator_store
223 }
224
225 #[inline]
226 pub fn executor(&self) -> Executor {
227 self.executor.clone()
228 }
229}