1use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7 catalog::{
8 Catalog,
9 namespace::NamespaceToCreate,
10 table::{TableColumnToCreate, TableToCreate},
11 },
12 materialized::MaterializedCatalog,
13 schema::RowSchemaRegistry,
14};
15use reifydb_cdc::{
16 produce::producer::{CdcProduceMsg, CdcProducerEventListener, spawn_cdc_producer},
17 storage::CdcStore,
18};
19use reifydb_core::{
20 config::SystemConfig,
21 event::{
22 EventBus,
23 metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
24 transaction::PostCommitEvent,
25 },
26 interface::catalog::id::NamespaceId,
27 util::ioc::IocContainer,
28};
29use reifydb_extension::transform::registry::Transforms;
30use reifydb_metric::worker::{
31 CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
32};
33use reifydb_routine::{function::default_functions, procedure::registry::Procedures};
34use reifydb_runtime::{
35 SharedRuntime, SharedRuntimeConfig,
36 actor::system::{ActorHandle, ActorSystem, ActorSystemConfig},
37 context::{RuntimeContext, clock::Clock},
38};
39use reifydb_store_multi::MultiStore;
40use reifydb_store_single::SingleStore;
41use reifydb_transaction::{
42 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
43 multi::transaction::{MultiTransaction, register_oracle_defaults},
44 single::SingleTransaction,
45 transaction::admin::AdminTransaction,
46};
47use reifydb_type::{
48 fragment::Fragment,
49 params::Params,
50 value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
51};
52
53use crate::engine::StandardEngine;
54
55pub struct TestEngine {
56 engine: StandardEngine,
57}
58
59impl TestEngine {
60 pub fn new() -> Self {
62 Self::builder().with_cdc().with_metrics().build()
63 }
64
65 pub fn builder() -> TestEngineBuilder {
67 TestEngineBuilder::default()
68 }
69
70 pub fn admin(&self, rql: &str) -> Vec<Frame> {
72 self.engine
73 .admin_as(IdentityId::system(), rql, Params::None)
74 .unwrap_or_else(|e| panic!("admin failed: {e:?}\nrql: {rql}"))
75 }
76
77 pub fn command(&self, rql: &str) -> Vec<Frame> {
79 self.engine
80 .command_as(IdentityId::system(), rql, Params::None)
81 .unwrap_or_else(|e| panic!("command failed: {e:?}\nrql: {rql}"))
82 }
83
84 pub fn query(&self, rql: &str) -> Vec<Frame> {
86 self.engine
87 .query_as(IdentityId::system(), rql, Params::None)
88 .unwrap_or_else(|e| panic!("query failed: {e:?}\nrql: {rql}"))
89 }
90
91 pub fn admin_err(&self, rql: &str) -> String {
93 match self.engine.admin_as(IdentityId::system(), rql, Params::None) {
94 Err(e) => format!("{e:?}"),
95 Ok(_) => panic!("Expected error but admin succeeded\nrql: {rql}"),
96 }
97 }
98
99 pub fn command_err(&self, rql: &str) -> String {
101 match self.engine.command_as(IdentityId::system(), rql, Params::None) {
102 Err(e) => format!("{e:?}"),
103 Ok(_) => panic!("Expected error but command succeeded\nrql: {rql}"),
104 }
105 }
106
107 pub fn query_err(&self, rql: &str) -> String {
109 match self.engine.query_as(IdentityId::system(), rql, Params::None) {
110 Err(e) => format!("{e:?}"),
111 Ok(_) => panic!("Expected error but query succeeded\nrql: {rql}"),
112 }
113 }
114
115 pub fn row_count(frames: &[Frame]) -> usize {
117 frames.first().map(|f| f.row_count()).unwrap_or(0)
118 }
119
120 pub fn identity() -> IdentityId {
122 IdentityId::system()
123 }
124
125 pub fn inner(&self) -> &StandardEngine {
127 &self.engine
128 }
129}
130
131impl Deref for TestEngine {
132 type Target = StandardEngine;
133
134 fn deref(&self) -> &StandardEngine {
135 &self.engine
136 }
137}
138
/// Builder for [`TestEngine`] that selects which optional subsystems are wired in.
///
/// Both flags default to `false`; enable them with `with_cdc` / `with_metrics`.
#[derive(Debug, Clone, Default)]
pub struct TestEngineBuilder {
    /// When `true`, `build` spawns the CDC producer and registers its post-commit listener.
    cdc: bool,
    /// When `true`, `build` creates a metrics worker and registers its stats listeners.
    metrics: bool,
}
144
145impl TestEngineBuilder {
146 pub fn with_cdc(mut self) -> Self {
147 self.cdc = true;
148 self
149 }
150
151 pub fn with_metrics(mut self) -> Self {
152 self.metrics = true;
153 self
154 }
155
156 pub fn build(self) -> TestEngine {
157 let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
158 let eventbus = EventBus::new(&actor_system);
159 let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
160 let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
161 let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
162 let runtime = SharedRuntime::from_config(
163 SharedRuntimeConfig::default()
164 .async_threads(2)
165 .compute_threads(2)
166 .compute_max_in_flight(32)
167 .deterministic_testing(1000),
168 );
169 let system_config = SystemConfig::new();
170 register_oracle_defaults(&system_config);
171 let multi = MultiTransaction::new(
172 multi_store.clone(),
173 single.clone(),
174 eventbus.clone(),
175 actor_system,
176 runtime.clock().clone(),
177 system_config,
178 )
179 .unwrap();
180
181 let mut ioc = IocContainer::new();
182
183 let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
184 ioc = ioc.register(materialized_catalog.clone());
185
186 let row_schema_registry = RowSchemaRegistry::new(single.clone());
187 ioc = ioc.register(row_schema_registry.clone());
188
189 ioc = ioc.register(runtime.clone());
190 ioc = ioc.register(single_store.clone());
191
192 if self.metrics {
193 let metrics_worker = Arc::new(MetricsWorker::new(
194 MetricsWorkerConfig::default(),
195 single_store.clone(),
196 multi_store.clone(),
197 eventbus.clone(),
198 ));
199 eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(
200 metrics_worker.sender(),
201 ));
202 eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
203 eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(
204 metrics_worker.sender(),
205 ));
206 ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
207 }
208
209 let cdc_store = CdcStore::memory();
210 ioc = ioc.register(cdc_store.clone());
211
212 let ioc_for_cdc = ioc.clone();
213
214 let engine = StandardEngine::new(
215 multi,
216 single,
217 eventbus.clone(),
218 InterceptorFactory::default(),
219 Catalog::new(materialized_catalog, row_schema_registry),
220 RuntimeContext::with_clock(runtime.clock().clone()),
221 default_functions().build(),
222 Procedures::empty(),
223 Transforms::empty(),
224 ioc,
225 #[cfg(not(target_arch = "wasm32"))]
226 None,
227 );
228
229 if self.cdc {
230 let cdc_handle = spawn_cdc_producer(
231 &runtime.actor_system(),
232 cdc_store,
233 multi_store.clone(),
234 engine.clone(),
235 eventbus.clone(),
236 );
237 eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
238 cdc_handle.actor_ref().clone(),
239 runtime.clock().clone(),
240 ));
241 ioc_for_cdc.register_service::<Arc<ActorHandle<CdcProduceMsg>>>(Arc::new(cdc_handle));
242 }
243
244 TestEngine {
245 engine,
246 }
247 }
248}
249
250pub fn create_test_admin_transaction() -> AdminTransaction {
251 let multi_store = MultiStore::testing_memory();
252 let single_store = SingleStore::testing_memory();
253
254 let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
255 let event_bus = EventBus::new(&actor_system);
256 let single = SingleTransaction::new(single_store, event_bus.clone());
257 let system_config = SystemConfig::new();
258 register_oracle_defaults(&system_config);
259 let multi = MultiTransaction::new(
260 multi_store,
261 single.clone(),
262 event_bus.clone(),
263 actor_system,
264 Clock::default(),
265 system_config,
266 )
267 .unwrap();
268
269 AdminTransaction::new(multi, single, event_bus, Interceptors::new(), IdentityId::system()).unwrap()
270}
271
272pub fn create_test_admin_transaction_with_internal_schema() -> AdminTransaction {
273 let multi_store = MultiStore::testing_memory();
274 let single_store = SingleStore::testing_memory();
275
276 let actor_system = ActorSystem::new(ActorSystemConfig {
277 pool_threads: 1,
278 max_in_flight: 1,
279 });
280 let event_bus = EventBus::new(&actor_system);
281 let single = SingleTransaction::new(single_store, event_bus.clone());
282 let system_config = SystemConfig::new();
283 register_oracle_defaults(&system_config);
284 let multi = MultiTransaction::new(
285 multi_store,
286 single.clone(),
287 event_bus.clone(),
288 actor_system,
289 Clock::default(),
290 system_config,
291 )
292 .unwrap();
293 let mut result = AdminTransaction::new(
294 multi,
295 single.clone(),
296 event_bus.clone(),
297 Interceptors::new(),
298 IdentityId::system(),
299 )
300 .unwrap();
301
302 let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
303 let row_schema_registry = RowSchemaRegistry::new(single);
304 let catalog = Catalog::new(materialized_catalog, row_schema_registry);
305
306 let namespace = catalog
307 .create_namespace(
308 &mut result,
309 NamespaceToCreate {
310 namespace_fragment: None,
311 name: "reifydb".to_string(),
312 local_name: "reifydb".to_string(),
313 parent_id: NamespaceId::ROOT,
314 grpc: None,
315 token: None,
316 },
317 )
318 .unwrap();
319
320 catalog.create_table(
321 &mut result,
322 TableToCreate {
323 name: Fragment::internal("flows"),
324 namespace: namespace.id(),
325 columns: vec![
326 TableColumnToCreate {
327 name: Fragment::internal("id"),
328 fragment: Fragment::None,
329 constraint: TypeConstraint::unconstrained(Type::Int8),
330 properties: vec![],
331 auto_increment: true,
332 dictionary_id: None,
333 },
334 TableColumnToCreate {
335 name: Fragment::internal("data"),
336 fragment: Fragment::None,
337 constraint: TypeConstraint::unconstrained(Type::Blob),
338 properties: vec![],
339 auto_increment: false,
340 dictionary_id: None,
341 },
342 ],
343 retention_policy: None,
344 primary_key_columns: None,
345 },
346 )
347 .unwrap();
348
349 result
350}