1use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7 catalog::{
8 Catalog,
9 namespace::NamespaceToCreate,
10 table::{TableColumnToCreate, TableToCreate},
11 },
12 materialized::MaterializedCatalog,
13};
14use reifydb_cdc::{
15 produce::producer::{CdcProduceMsg, CdcProducerEventListener, spawn_cdc_producer},
16 storage::CdcStore,
17};
18use reifydb_core::{
19 event::{
20 EventBus,
21 metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
22 transaction::PostCommitEvent,
23 },
24 interface::catalog::id::NamespaceId,
25 util::ioc::IocContainer,
26};
27use reifydb_extension::transform::registry::Transforms;
28use reifydb_metric_old::worker::{
29 CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
30};
31use reifydb_routine::{function::default_functions, procedure::registry::Procedures};
32use reifydb_runtime::{
33 SharedRuntime, SharedRuntimeConfig,
34 actor::system::{ActorHandle, ActorSystem},
35 context::{
36 RuntimeContext,
37 clock::{Clock, MockClock},
38 rng::Rng,
39 },
40};
41use reifydb_store_multi::MultiStore;
42use reifydb_store_single::SingleStore;
43use reifydb_transaction::{
44 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
45 multi::transaction::MultiTransaction,
46 single::SingleTransaction,
47 transaction::admin::AdminTransaction,
48};
49use reifydb_type::{
50 fragment::Fragment,
51 params::Params,
52 value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
53};
54
55use crate::{engine::StandardEngine, vm::services::EngineConfig};
56
57pub struct TestEngine {
58 engine: StandardEngine,
59}
60
61impl Default for TestEngine {
62 fn default() -> Self {
63 Self::new()
64 }
65}
66
67impl TestEngine {
68 pub fn new() -> Self {
70 Self::builder().with_cdc().with_metrics().build()
71 }
72
73 pub fn builder() -> TestEngineBuilder {
75 TestEngineBuilder::default()
76 }
77
78 pub fn admin(&self, rql: &str) -> Vec<Frame> {
80 self.engine
81 .admin_as(IdentityId::system(), rql, Params::None)
82 .unwrap_or_else(|e| panic!("admin failed: {e:?}\nrql: {rql}"))
83 }
84
85 pub fn command(&self, rql: &str) -> Vec<Frame> {
87 self.engine
88 .command_as(IdentityId::system(), rql, Params::None)
89 .unwrap_or_else(|e| panic!("command failed: {e:?}\nrql: {rql}"))
90 }
91
92 pub fn query(&self, rql: &str) -> Vec<Frame> {
94 self.engine
95 .query_as(IdentityId::system(), rql, Params::None)
96 .unwrap_or_else(|e| panic!("query failed: {e:?}\nrql: {rql}"))
97 }
98
99 pub fn admin_err(&self, rql: &str) -> String {
101 match self.engine.admin_as(IdentityId::system(), rql, Params::None) {
102 Err(e) => format!("{e:?}"),
103 Ok(_) => panic!("Expected error but admin succeeded\nrql: {rql}"),
104 }
105 }
106
107 pub fn command_err(&self, rql: &str) -> String {
109 match self.engine.command_as(IdentityId::system(), rql, Params::None) {
110 Err(e) => format!("{e:?}"),
111 Ok(_) => panic!("Expected error but command succeeded\nrql: {rql}"),
112 }
113 }
114
115 pub fn query_err(&self, rql: &str) -> String {
117 match self.engine.query_as(IdentityId::system(), rql, Params::None) {
118 Err(e) => format!("{e:?}"),
119 Ok(_) => panic!("Expected error but query succeeded\nrql: {rql}"),
120 }
121 }
122
123 pub fn row_count(frames: &[Frame]) -> usize {
125 frames.first().map(|f| f.row_count()).unwrap_or(0)
126 }
127
128 pub fn identity() -> IdentityId {
130 IdentityId::system()
131 }
132
133 pub fn inner(&self) -> &StandardEngine {
135 &self.engine
136 }
137}
138
139impl Deref for TestEngine {
140 type Target = StandardEngine;
141
142 fn deref(&self) -> &StandardEngine {
143 &self.engine
144 }
145}
146
147#[derive(Default)]
148pub struct TestEngineBuilder {
149 cdc: bool,
150 metrics: bool,
151}
152
153impl TestEngineBuilder {
154 pub fn with_cdc(mut self) -> Self {
155 self.cdc = true;
156 self
157 }
158
159 pub fn with_metrics(mut self) -> Self {
160 self.metrics = true;
161 self
162 }
163
164 pub fn build(self) -> TestEngine {
165 let actor_system = ActorSystem::new(1);
166 let eventbus = EventBus::new(&actor_system);
167 let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
168 let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
169 let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
170 let runtime = SharedRuntime::from_config(
171 SharedRuntimeConfig::default().async_threads(2).compute_threads(2).deterministic_testing(1000),
172 );
173 let materialized_catalog = MaterializedCatalog::new();
174 let multi = MultiTransaction::new(
175 multi_store.clone(),
176 single.clone(),
177 eventbus.clone(),
178 actor_system,
179 runtime.clock().clone(),
180 runtime.rng().clone(),
181 Arc::new(materialized_catalog.clone()),
182 )
183 .unwrap();
184
185 let mut ioc = IocContainer::new();
186
187 ioc = ioc.register(materialized_catalog.clone());
188
189 ioc = ioc.register(runtime.clone());
190 ioc = ioc.register(single_store.clone());
191
192 if self.metrics {
193 let metrics_worker = Arc::new(MetricsWorker::new(
194 MetricsWorkerConfig::default(),
195 single_store.clone(),
196 multi_store.clone(),
197 eventbus.clone(),
198 ));
199 eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(
200 metrics_worker.sender(),
201 ));
202 eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
203 eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(
204 metrics_worker.sender(),
205 ));
206 ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
207 }
208
209 let cdc_store = CdcStore::memory();
210 ioc = ioc.register(cdc_store.clone());
211
212 let ioc_for_cdc = ioc.clone();
213
214 let engine = StandardEngine::new(
215 multi,
216 single.clone(),
217 eventbus.clone(),
218 InterceptorFactory::default(),
219 Catalog::new(materialized_catalog),
220 EngineConfig {
221 runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
222 functions: default_functions().configure(),
223 procedures: Procedures::empty(),
224 transforms: Transforms::empty(),
225 ioc,
226 #[cfg(not(target_arch = "wasm32"))]
227 remote_registry: None,
228 },
229 );
230
231 if self.cdc {
232 let cdc_handle = spawn_cdc_producer(
233 &runtime.actor_system(),
234 cdc_store,
235 multi_store.clone(),
236 engine.clone(),
237 eventbus.clone(),
238 );
239 eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
240 cdc_handle.actor_ref().clone(),
241 runtime.clock().clone(),
242 ));
243 ioc_for_cdc.register_service::<Arc<ActorHandle<CdcProduceMsg>>>(Arc::new(cdc_handle));
244 }
245
246 TestEngine {
247 engine,
248 }
249 }
250}
251
252pub fn create_test_admin_transaction() -> AdminTransaction {
253 let multi_store = MultiStore::testing_memory();
254 let single_store = SingleStore::testing_memory();
255
256 let actor_system = ActorSystem::new(1);
257 let event_bus = EventBus::new(&actor_system);
258 let single = SingleTransaction::new(single_store, event_bus.clone());
259 let multi = MultiTransaction::new(
260 multi_store,
261 single.clone(),
262 event_bus.clone(),
263 actor_system,
264 Clock::Mock(MockClock::from_millis(1000)),
265 Rng::seeded(42),
266 Arc::new(MaterializedCatalog::new()),
267 )
268 .unwrap();
269
270 AdminTransaction::new(
271 multi,
272 single,
273 event_bus,
274 Interceptors::new(),
275 IdentityId::system(),
276 Clock::Mock(MockClock::from_millis(1000)),
277 )
278 .unwrap()
279}
280
281pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
282 let multi_store = MultiStore::testing_memory();
283 let single_store = SingleStore::testing_memory();
284
285 let actor_system = ActorSystem::new(1);
286 let event_bus = EventBus::new(&actor_system);
287 let single = SingleTransaction::new(single_store, event_bus.clone());
288 let multi = MultiTransaction::new(
289 multi_store,
290 single.clone(),
291 event_bus.clone(),
292 actor_system,
293 Clock::Mock(MockClock::from_millis(1000)),
294 Rng::seeded(42),
295 Arc::new(MaterializedCatalog::new()),
296 )
297 .unwrap();
298 let mut result = AdminTransaction::new(
299 multi,
300 single.clone(),
301 event_bus.clone(),
302 Interceptors::new(),
303 IdentityId::system(),
304 Clock::Mock(MockClock::from_millis(1000)),
305 )
306 .unwrap();
307
308 let materialized_catalog = MaterializedCatalog::new();
309 let catalog = Catalog::new(materialized_catalog);
310
311 let namespace = catalog
312 .create_namespace(
313 &mut result,
314 NamespaceToCreate {
315 namespace_fragment: None,
316 name: "reifydb".to_string(),
317 local_name: "reifydb".to_string(),
318 parent_id: NamespaceId::ROOT,
319 grpc: None,
320 token: None,
321 },
322 )
323 .unwrap();
324
325 catalog.create_table(
326 &mut result,
327 TableToCreate {
328 name: Fragment::internal("flows"),
329 namespace: namespace.id(),
330 columns: vec![
331 TableColumnToCreate {
332 name: Fragment::internal("id"),
333 fragment: Fragment::None,
334 constraint: TypeConstraint::unconstrained(Type::Int8),
335 properties: vec![],
336 auto_increment: true,
337 dictionary_id: None,
338 },
339 TableColumnToCreate {
340 name: Fragment::internal("data"),
341 fragment: Fragment::None,
342 constraint: TypeConstraint::unconstrained(Type::Blob),
343 properties: vec![],
344 auto_increment: false,
345 dictionary_id: None,
346 },
347 ],
348 retention_strategy: None,
349 primary_key_columns: None,
350 },
351 )
352 .unwrap();
353
354 result
355}