1use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7 catalog::{
8 Catalog,
9 namespace::NamespaceToCreate,
10 table::{TableColumnToCreate, TableToCreate},
11 },
12 materialized::MaterializedCatalog,
13};
14use reifydb_cdc::{
15 produce::producer::{CdcProduceMsg, CdcProducerEventListener, spawn_cdc_producer},
16 storage::CdcStore,
17};
18use reifydb_core::{
19 config::SystemConfig,
20 event::{
21 EventBus,
22 metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
23 transaction::PostCommitEvent,
24 },
25 interface::catalog::id::NamespaceId,
26 util::ioc::IocContainer,
27};
28use reifydb_extension::transform::registry::Transforms;
29use reifydb_metric::worker::{
30 CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
31};
32use reifydb_routine::{function::default_functions, procedure::registry::Procedures};
33use reifydb_runtime::{
34 SharedRuntime, SharedRuntimeConfig,
35 actor::system::{ActorHandle, ActorSystem},
36 context::{
37 RuntimeContext,
38 clock::{Clock, MockClock},
39 rng::Rng,
40 },
41};
42use reifydb_store_multi::MultiStore;
43use reifydb_store_single::SingleStore;
44use reifydb_transaction::{
45 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
46 multi::transaction::{MultiTransaction, register_oracle_defaults},
47 single::SingleTransaction,
48 transaction::admin::AdminTransaction,
49};
50use reifydb_type::{
51 fragment::Fragment,
52 params::Params,
53 value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
54};
55
56use crate::{engine::StandardEngine, vm::services::EngineConfig};
57
58pub struct TestEngine {
59 engine: StandardEngine,
60}
61
62impl Default for TestEngine {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68impl TestEngine {
69 pub fn new() -> Self {
71 Self::builder().with_cdc().with_metrics().build()
72 }
73
74 pub fn builder() -> TestEngineBuilder {
76 TestEngineBuilder::default()
77 }
78
79 pub fn admin(&self, rql: &str) -> Vec<Frame> {
81 self.engine
82 .admin_as(IdentityId::system(), rql, Params::None)
83 .unwrap_or_else(|e| panic!("admin failed: {e:?}\nrql: {rql}"))
84 }
85
86 pub fn command(&self, rql: &str) -> Vec<Frame> {
88 self.engine
89 .command_as(IdentityId::system(), rql, Params::None)
90 .unwrap_or_else(|e| panic!("command failed: {e:?}\nrql: {rql}"))
91 }
92
93 pub fn query(&self, rql: &str) -> Vec<Frame> {
95 self.engine
96 .query_as(IdentityId::system(), rql, Params::None)
97 .unwrap_or_else(|e| panic!("query failed: {e:?}\nrql: {rql}"))
98 }
99
100 pub fn admin_err(&self, rql: &str) -> String {
102 match self.engine.admin_as(IdentityId::system(), rql, Params::None) {
103 Err(e) => format!("{e:?}"),
104 Ok(_) => panic!("Expected error but admin succeeded\nrql: {rql}"),
105 }
106 }
107
108 pub fn command_err(&self, rql: &str) -> String {
110 match self.engine.command_as(IdentityId::system(), rql, Params::None) {
111 Err(e) => format!("{e:?}"),
112 Ok(_) => panic!("Expected error but command succeeded\nrql: {rql}"),
113 }
114 }
115
116 pub fn query_err(&self, rql: &str) -> String {
118 match self.engine.query_as(IdentityId::system(), rql, Params::None) {
119 Err(e) => format!("{e:?}"),
120 Ok(_) => panic!("Expected error but query succeeded\nrql: {rql}"),
121 }
122 }
123
124 pub fn row_count(frames: &[Frame]) -> usize {
126 frames.first().map(|f| f.row_count()).unwrap_or(0)
127 }
128
129 pub fn identity() -> IdentityId {
131 IdentityId::system()
132 }
133
134 pub fn inner(&self) -> &StandardEngine {
136 &self.engine
137 }
138}
139
140impl Deref for TestEngine {
141 type Target = StandardEngine;
142
143 fn deref(&self) -> &StandardEngine {
144 &self.engine
145 }
146}
147
/// Opt-in switches consumed by `TestEngineBuilder::build`.
///
/// The derived `Default` leaves both switches `false`.
#[derive(Default)]
pub struct TestEngineBuilder {
    /// When `true`, `build` spawns the CDC producer and registers its
    /// post-commit listener.
    cdc: bool,
    /// When `true`, `build` creates a metrics worker and registers its
    /// stats listeners.
    metrics: bool,
}
153
154impl TestEngineBuilder {
155 pub fn with_cdc(mut self) -> Self {
156 self.cdc = true;
157 self
158 }
159
160 pub fn with_metrics(mut self) -> Self {
161 self.metrics = true;
162 self
163 }
164
165 pub fn build(self) -> TestEngine {
166 let actor_system = ActorSystem::new(1);
167 let eventbus = EventBus::new(&actor_system);
168 let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
169 let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
170 let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
171 let runtime = SharedRuntime::from_config(
172 SharedRuntimeConfig::default().async_threads(2).compute_threads(2).deterministic_testing(1000),
173 );
174 let system_config = SystemConfig::new();
175 register_oracle_defaults(&system_config);
176 let multi = MultiTransaction::new(
177 multi_store.clone(),
178 single.clone(),
179 eventbus.clone(),
180 actor_system,
181 runtime.clock().clone(),
182 runtime.rng().clone(),
183 system_config,
184 )
185 .unwrap();
186
187 let mut ioc = IocContainer::new();
188
189 let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
190 ioc = ioc.register(materialized_catalog.clone());
191
192 ioc = ioc.register(runtime.clone());
193 ioc = ioc.register(single_store.clone());
194
195 if self.metrics {
196 let metrics_worker = Arc::new(MetricsWorker::new(
197 MetricsWorkerConfig::default(),
198 single_store.clone(),
199 multi_store.clone(),
200 eventbus.clone(),
201 ));
202 eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(
203 metrics_worker.sender(),
204 ));
205 eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
206 eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(
207 metrics_worker.sender(),
208 ));
209 ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
210 }
211
212 let cdc_store = CdcStore::memory();
213 ioc = ioc.register(cdc_store.clone());
214
215 let ioc_for_cdc = ioc.clone();
216
217 let engine = StandardEngine::new(
218 multi,
219 single.clone(),
220 eventbus.clone(),
221 InterceptorFactory::default(),
222 Catalog::new(materialized_catalog),
223 EngineConfig {
224 runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
225 functions: default_functions().configure(),
226 procedures: Procedures::empty(),
227 transforms: Transforms::empty(),
228 ioc,
229 #[cfg(not(target_arch = "wasm32"))]
230 remote_registry: None,
231 },
232 );
233
234 if self.cdc {
235 let cdc_handle = spawn_cdc_producer(
236 &runtime.actor_system(),
237 cdc_store,
238 multi_store.clone(),
239 engine.clone(),
240 eventbus.clone(),
241 );
242 eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
243 cdc_handle.actor_ref().clone(),
244 runtime.clock().clone(),
245 ));
246 ioc_for_cdc.register_service::<Arc<ActorHandle<CdcProduceMsg>>>(Arc::new(cdc_handle));
247 }
248
249 TestEngine {
250 engine,
251 }
252 }
253}
254
255pub fn create_test_admin_transaction() -> AdminTransaction {
256 let multi_store = MultiStore::testing_memory();
257 let single_store = SingleStore::testing_memory();
258
259 let actor_system = ActorSystem::new(1);
260 let event_bus = EventBus::new(&actor_system);
261 let single = SingleTransaction::new(single_store, event_bus.clone());
262 let system_config = SystemConfig::new();
263 register_oracle_defaults(&system_config);
264 let multi = MultiTransaction::new(
265 multi_store,
266 single.clone(),
267 event_bus.clone(),
268 actor_system,
269 Clock::Mock(MockClock::from_millis(1000)),
270 Rng::seeded(42),
271 system_config,
272 )
273 .unwrap();
274
275 AdminTransaction::new(
276 multi,
277 single,
278 event_bus,
279 Interceptors::new(),
280 IdentityId::system(),
281 Clock::Mock(MockClock::from_millis(1000)),
282 )
283 .unwrap()
284}
285
286pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
287 let multi_store = MultiStore::testing_memory();
288 let single_store = SingleStore::testing_memory();
289
290 let actor_system = ActorSystem::new(1);
291 let event_bus = EventBus::new(&actor_system);
292 let single = SingleTransaction::new(single_store, event_bus.clone());
293 let system_config = SystemConfig::new();
294 register_oracle_defaults(&system_config);
295 let multi = MultiTransaction::new(
296 multi_store,
297 single.clone(),
298 event_bus.clone(),
299 actor_system,
300 Clock::Mock(MockClock::from_millis(1000)),
301 Rng::seeded(42),
302 system_config,
303 )
304 .unwrap();
305 let mut result = AdminTransaction::new(
306 multi,
307 single.clone(),
308 event_bus.clone(),
309 Interceptors::new(),
310 IdentityId::system(),
311 Clock::Mock(MockClock::from_millis(1000)),
312 )
313 .unwrap();
314
315 let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
316 let catalog = Catalog::new(materialized_catalog);
317
318 let namespace = catalog
319 .create_namespace(
320 &mut result,
321 NamespaceToCreate {
322 namespace_fragment: None,
323 name: "reifydb".to_string(),
324 local_name: "reifydb".to_string(),
325 parent_id: NamespaceId::ROOT,
326 grpc: None,
327 token: None,
328 },
329 )
330 .unwrap();
331
332 catalog.create_table(
333 &mut result,
334 TableToCreate {
335 name: Fragment::internal("flows"),
336 namespace: namespace.id(),
337 columns: vec![
338 TableColumnToCreate {
339 name: Fragment::internal("id"),
340 fragment: Fragment::None,
341 constraint: TypeConstraint::unconstrained(Type::Int8),
342 properties: vec![],
343 auto_increment: true,
344 dictionary_id: None,
345 },
346 TableColumnToCreate {
347 name: Fragment::internal("data"),
348 fragment: Fragment::None,
349 constraint: TypeConstraint::unconstrained(Type::Blob),
350 properties: vec![],
351 auto_increment: false,
352 dictionary_id: None,
353 },
354 ],
355 retention_strategy: None,
356 primary_key_columns: None,
357 },
358 )
359 .unwrap();
360
361 result
362}