1use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7 catalog::{
8 Catalog,
9 namespace::NamespaceToCreate,
10 table::{TableColumnToCreate, TableToCreate},
11 },
12 materialized::MaterializedCatalog,
13};
14use reifydb_cdc::{
15 produce::producer::{CdcProducerEventListener, spawn_cdc_producer},
16 storage::CdcStore,
17};
18use reifydb_core::{
19 actors::cdc::CdcProduceHandle,
20 event::{
21 EventBus,
22 metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
23 transaction::PostCommitEvent,
24 },
25 interface::catalog::id::NamespaceId,
26 util::ioc::IocContainer,
27};
28use reifydb_extension::transform::registry::Transforms;
29use reifydb_metric_old::worker::{
30 CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
31};
32use reifydb_routine::{function::default_functions, procedure::registry::Procedures};
33use reifydb_runtime::{
34 SharedRuntime, SharedRuntimeConfig,
35 actor::system::ActorSystem,
36 context::{
37 RuntimeContext,
38 clock::{Clock, MockClock},
39 rng::Rng,
40 },
41 pool::{PoolConfig, Pools},
42};
43use reifydb_store_multi::MultiStore;
44use reifydb_store_single::SingleStore;
45use reifydb_transaction::{
46 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
47 multi::transaction::MultiTransaction,
48 single::SingleTransaction,
49 transaction::admin::AdminTransaction,
50};
51use reifydb_type::{
52 fragment::Fragment,
53 params::Params,
54 value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
55};
56
57use crate::{engine::StandardEngine, vm::services::EngineConfig};
58
59pub struct TestEngine {
60 engine: StandardEngine,
61}
62
63impl Default for TestEngine {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69impl TestEngine {
70 pub fn new() -> Self {
72 Self::builder().with_cdc().with_metrics().build()
73 }
74
75 pub fn builder() -> TestEngineBuilder {
77 TestEngineBuilder::default()
78 }
79
80 pub fn admin(&self, rql: &str) -> Vec<Frame> {
82 let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
83 if let Some(e) = r.error {
84 panic!("admin failed: {e:?}\nrql: {rql}")
85 }
86 r.frames
87 }
88
89 pub fn command(&self, rql: &str) -> Vec<Frame> {
91 let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
92 if let Some(e) = r.error {
93 panic!("command failed: {e:?}\nrql: {rql}")
94 }
95 r.frames
96 }
97
98 pub fn query(&self, rql: &str) -> Vec<Frame> {
100 let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
101 if let Some(e) = r.error {
102 panic!("query failed: {e:?}\nrql: {rql}")
103 }
104 r.frames
105 }
106
107 pub fn admin_err(&self, rql: &str) -> String {
109 let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
110 match r.error {
111 Some(e) => format!("{e:?}"),
112 None => panic!("Expected error but admin succeeded\nrql: {rql}"),
113 }
114 }
115
116 pub fn command_err(&self, rql: &str) -> String {
118 let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
119 match r.error {
120 Some(e) => format!("{e:?}"),
121 None => panic!("Expected error but command succeeded\nrql: {rql}"),
122 }
123 }
124
125 pub fn query_err(&self, rql: &str) -> String {
127 let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
128 match r.error {
129 Some(e) => format!("{e:?}"),
130 None => panic!("Expected error but query succeeded\nrql: {rql}"),
131 }
132 }
133
134 pub fn row_count(frames: &[Frame]) -> usize {
136 frames.first().map(|f| f.row_count()).unwrap_or(0)
137 }
138
139 pub fn identity() -> IdentityId {
141 IdentityId::system()
142 }
143
144 pub fn inner(&self) -> &StandardEngine {
146 &self.engine
147 }
148}
149
150impl Deref for TestEngine {
151 type Target = StandardEngine;
152
153 fn deref(&self) -> &StandardEngine {
154 &self.engine
155 }
156}
157
158#[derive(Default)]
159pub struct TestEngineBuilder {
160 cdc: bool,
161 metrics: bool,
162}
163
164impl TestEngineBuilder {
165 pub fn with_cdc(mut self) -> Self {
166 self.cdc = true;
167 self
168 }
169
170 pub fn with_metrics(mut self) -> Self {
171 self.metrics = true;
172 self
173 }
174
175 pub fn build(self) -> TestEngine {
176 let pools = Pools::new(PoolConfig::default());
177 let actor_system = ActorSystem::new(pools, Clock::Real);
178 let eventbus = EventBus::new(&actor_system);
179 let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
180 let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
181 let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
182 let runtime = SharedRuntime::from_config(
183 SharedRuntimeConfig::default()
184 .async_threads(2)
185 .system_threads(2)
186 .query_threads(2)
187 .deterministic_testing(1000),
188 );
189 let materialized_catalog = MaterializedCatalog::new();
190 let multi = MultiTransaction::new(
191 multi_store.clone(),
192 single.clone(),
193 eventbus.clone(),
194 actor_system,
195 runtime.clock().clone(),
196 runtime.rng().clone(),
197 Arc::new(materialized_catalog.clone()),
198 )
199 .unwrap();
200
201 let mut ioc = IocContainer::new();
202
203 ioc = ioc.register(materialized_catalog.clone());
204
205 ioc = ioc.register(runtime.clone());
206 ioc = ioc.register(single_store.clone());
207
208 if self.metrics {
209 let metrics_worker = Arc::new(MetricsWorker::new(
210 MetricsWorkerConfig::default(),
211 single_store.clone(),
212 multi_store.clone(),
213 eventbus.clone(),
214 ));
215 eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(
216 metrics_worker.sender(),
217 ));
218 eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
219 eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(
220 metrics_worker.sender(),
221 ));
222 ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
223 }
224
225 let cdc_store = CdcStore::memory();
226 ioc = ioc.register(cdc_store.clone());
227
228 let ioc_for_cdc = ioc.clone();
229
230 let engine = StandardEngine::new(
231 multi,
232 single.clone(),
233 eventbus.clone(),
234 InterceptorFactory::default(),
235 Catalog::new(materialized_catalog),
236 EngineConfig {
237 runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
238 functions: default_functions().configure(),
239 procedures: Procedures::empty(),
240 transforms: Transforms::empty(),
241 ioc,
242 #[cfg(not(reifydb_single_threaded))]
243 remote_registry: None,
244 },
245 );
246
247 if self.cdc {
248 let cdc_handle = spawn_cdc_producer(
249 &runtime.actor_system(),
250 cdc_store,
251 multi_store.clone(),
252 engine.clone(),
253 eventbus.clone(),
254 );
255 eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
256 cdc_handle.actor_ref().clone(),
257 runtime.clock().clone(),
258 ));
259 ioc_for_cdc.register_service::<Arc<CdcProduceHandle>>(Arc::new(cdc_handle));
260 }
261
262 TestEngine {
263 engine,
264 }
265 }
266}
267
268pub fn create_test_admin_transaction() -> AdminTransaction {
269 let multi_store = MultiStore::testing_memory();
270 let single_store = SingleStore::testing_memory();
271
272 let pools = Pools::new(PoolConfig::default());
273 let actor_system = ActorSystem::new(pools, Clock::Real);
274 let event_bus = EventBus::new(&actor_system);
275 let single = SingleTransaction::new(single_store, event_bus.clone());
276 let multi = MultiTransaction::new(
277 multi_store,
278 single.clone(),
279 event_bus.clone(),
280 actor_system,
281 Clock::Mock(MockClock::from_millis(1000)),
282 Rng::seeded(42),
283 Arc::new(MaterializedCatalog::new()),
284 )
285 .unwrap();
286
287 AdminTransaction::new(
288 multi,
289 single,
290 event_bus,
291 Interceptors::new(),
292 IdentityId::system(),
293 Clock::Mock(MockClock::from_millis(1000)),
294 )
295 .unwrap()
296}
297
298pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
299 let multi_store = MultiStore::testing_memory();
300 let single_store = SingleStore::testing_memory();
301
302 let pools = Pools::new(PoolConfig::default());
303 let actor_system = ActorSystem::new(pools, Clock::Real);
304 let event_bus = EventBus::new(&actor_system);
305 let single = SingleTransaction::new(single_store, event_bus.clone());
306 let multi = MultiTransaction::new(
307 multi_store,
308 single.clone(),
309 event_bus.clone(),
310 actor_system,
311 Clock::Mock(MockClock::from_millis(1000)),
312 Rng::seeded(42),
313 Arc::new(MaterializedCatalog::new()),
314 )
315 .unwrap();
316 let mut result = AdminTransaction::new(
317 multi,
318 single.clone(),
319 event_bus.clone(),
320 Interceptors::new(),
321 IdentityId::system(),
322 Clock::Mock(MockClock::from_millis(1000)),
323 )
324 .unwrap();
325
326 let materialized_catalog = MaterializedCatalog::new();
327 let catalog = Catalog::new(materialized_catalog);
328
329 let namespace = catalog
330 .create_namespace(
331 &mut result,
332 NamespaceToCreate {
333 namespace_fragment: None,
334 name: "reifydb".to_string(),
335 local_name: "reifydb".to_string(),
336 parent_id: NamespaceId::ROOT,
337 grpc: None,
338 token: None,
339 },
340 )
341 .unwrap();
342
343 catalog.create_table(
344 &mut result,
345 TableToCreate {
346 name: Fragment::internal("flows"),
347 namespace: namespace.id(),
348 columns: vec![
349 TableColumnToCreate {
350 name: Fragment::internal("id"),
351 fragment: Fragment::None,
352 constraint: TypeConstraint::unconstrained(Type::Int8),
353 properties: vec![],
354 auto_increment: true,
355 dictionary_id: None,
356 },
357 TableColumnToCreate {
358 name: Fragment::internal("data"),
359 fragment: Fragment::None,
360 constraint: TypeConstraint::unconstrained(Type::Blob),
361 properties: vec![],
362 auto_increment: false,
363 dictionary_id: None,
364 },
365 ],
366 retention_strategy: None,
367 primary_key_columns: None,
368 underlying: false,
369 },
370 )
371 .unwrap();
372
373 result
374}