1use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7 cache::CatalogCache,
8 catalog::{
9 Catalog,
10 namespace::NamespaceToCreate,
11 table::{TableColumnToCreate, TableToCreate},
12 },
13};
14#[cfg(not(target_arch = "wasm32"))]
15use reifydb_cdc::storage::recent_cache::RecentCdcCache;
16use reifydb_cdc::{
17 consume::wake::CdcWakeRegistry,
18 produce::{
19 producer::{CdcProducerEventListener, spawn_cdc_producer},
20 watermark::CdcProducerWatermark,
21 },
22 storage::CdcStore,
23};
24use reifydb_core::{
25 actors::cdc::CdcProduceHandle,
26 event::{EventBus, transaction::PostCommitEvent},
27 interface::catalog::id::NamespaceId,
28 util::ioc::IocContainer,
29};
30use reifydb_extension::transform::registry::Transforms;
31use reifydb_routine::{
32 function::default_native_functions, procedure::default_native_procedures, routine::registry::Routines,
33};
34use reifydb_runtime::{
35 SharedRuntime, SharedRuntimeConfig,
36 actor::system::ActorSystem,
37 context::{
38 RuntimeContext,
39 clock::{Clock, MockClock},
40 rng::Rng,
41 },
42 pool::{PoolConfig, Pools},
43};
44#[cfg(not(target_arch = "wasm32"))]
45use reifydb_sqlite::SqliteConfig;
46use reifydb_store_multi::MultiStore;
47use reifydb_store_single::SingleStore;
48use reifydb_transaction::{
49 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
50 multi::transaction::MultiTransaction,
51 single::SingleTransaction,
52 transaction::admin::AdminTransaction,
53};
54use reifydb_value::{
55 fragment::Fragment,
56 params::Params,
57 value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, value_type::ValueType},
58};
59
60use crate::{engine::StandardEngine, vm::services::EngineConfig};
61
/// Test harness that bundles a [`StandardEngine`] with the [`MockClock`] created for it.
///
/// Instances are produced by `TestEngineBuilder::build` (or the `TestEngine::new` shortcut).
pub struct TestEngine {
	/// The wrapped engine; the helper methods on this type forward to it.
	engine: StandardEngine,
	/// The mock clock created in `TestEngineBuilder::build`; clones of it are handed to the
	/// actor system and the test runtime there.
	mock_clock: MockClock,
}
66
67impl Default for TestEngine {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl TestEngine {
74 pub fn new() -> Self {
75 Self::builder().with_cdc().build()
76 }
77
78 pub fn builder() -> TestEngineBuilder {
79 TestEngineBuilder::default()
80 }
81
82 pub fn admin(&self, rql: &str) -> Vec<Frame> {
83 let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
84 if let Some(e) = r.error {
85 panic!("admin failed: {e:?}\nrql: {rql}")
86 }
87 r.frames
88 }
89
90 pub fn command(&self, rql: &str) -> Vec<Frame> {
91 let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
92 if let Some(e) = r.error {
93 panic!("command failed: {e:?}\nrql: {rql}")
94 }
95 r.frames
96 }
97
98 pub fn query(&self, rql: &str) -> Vec<Frame> {
99 let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
100 if let Some(e) = r.error {
101 panic!("query failed: {e:?}\nrql: {rql}")
102 }
103 r.frames
104 }
105
106 pub fn admin_err(&self, rql: &str) -> String {
107 let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
108 match r.error {
109 Some(e) => format!("{e:?}"),
110 None => panic!("Expected error but admin succeeded\nrql: {rql}"),
111 }
112 }
113
114 pub fn command_err(&self, rql: &str) -> String {
115 let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
116 match r.error {
117 Some(e) => format!("{e:?}"),
118 None => panic!("Expected error but command succeeded\nrql: {rql}"),
119 }
120 }
121
122 pub fn query_err(&self, rql: &str) -> String {
123 let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
124 match r.error {
125 Some(e) => format!("{e:?}"),
126 None => panic!("Expected error but query succeeded\nrql: {rql}"),
127 }
128 }
129
130 pub fn row_count(frames: &[Frame]) -> usize {
131 frames.first().map(|f| f.row_count()).unwrap_or(0)
132 }
133
134 pub fn identity() -> IdentityId {
135 IdentityId::system()
136 }
137
138 pub fn inner(&self) -> &StandardEngine {
139 &self.engine
140 }
141
142 pub fn mock_clock(&self) -> MockClock {
143 self.mock_clock.clone()
144 }
145}
146
147impl Deref for TestEngine {
148 type Target = StandardEngine;
149
150 fn deref(&self) -> &StandardEngine {
151 &self.engine
152 }
153}
154
/// Builder for [`TestEngine`].
///
/// The derived `Default` starts with CDC disabled and no SQLite configuration.
#[derive(Default)]
pub struct TestEngineBuilder {
	/// When `true`, `build` registers the CDC producer on the engine.
	cdc: bool,
	/// SQLite configuration for the CDC store; when `None`, `build` falls back to
	/// `CdcStore::memory()`. Not available on `wasm32`.
	#[cfg(not(target_arch = "wasm32"))]
	sqlite_cdc: Option<SqliteConfig>,
}
161
162impl TestEngineBuilder {
163 pub fn with_cdc(mut self) -> Self {
164 self.cdc = true;
165 self
166 }
167
168 #[cfg(not(target_arch = "wasm32"))]
169 pub fn with_sqlite_cdc(mut self, config: SqliteConfig) -> Self {
170 self.cdc = true;
171 self.sqlite_cdc = Some(config);
172 self
173 }
174
175 pub fn build(self) -> TestEngine {
176 let mock_clock = MockClock::from_millis(1000);
177 let pools = Pools::new(PoolConfig::default());
178 let actor_system = ActorSystem::new(pools, Clock::Mock(mock_clock.clone()));
179 let eventbus = EventBus::new(&actor_system);
180 let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
181 let single_store = SingleStore::testing_memory();
182 let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
183 let runtime = make_test_runtime(&mock_clock);
184 let catalog_cache = CatalogCache::new();
185 let multi = MultiTransaction::new(
186 multi_store.clone(),
187 single.clone(),
188 eventbus.clone(),
189 actor_system,
190 runtime.clock().clone(),
191 runtime.rng().clone(),
192 Arc::new(catalog_cache.clone()),
193 )
194 .unwrap();
195
196 let mut ioc = IocContainer::new();
197 ioc = ioc.register(catalog_cache.clone());
198 ioc = ioc.register(runtime.clone());
199 ioc = ioc.register(single_store.clone());
200 ioc = ioc.register(eventbus.clone());
201
202 #[cfg(not(target_arch = "wasm32"))]
203 let cdc_store = match self.sqlite_cdc {
204 Some(config) => CdcStore::sqlite(config, RecentCdcCache::DEFAULT_CAPACITY),
205 None => CdcStore::memory(),
206 };
207 #[cfg(target_arch = "wasm32")]
208 let cdc_store = CdcStore::memory();
209 ioc = ioc.register(cdc_store.clone());
210
211 let cdc_producer_watermark = CdcProducerWatermark::new();
212 ioc = ioc.register(cdc_producer_watermark.clone());
213
214 let cdc_wake_registry = CdcWakeRegistry::new();
215 ioc = ioc.register(cdc_wake_registry.clone());
216
217 let ioc_for_cdc = ioc.clone();
218
219 let engine = StandardEngine::new(
220 multi,
221 single.clone(),
222 eventbus.clone(),
223 InterceptorFactory::default(),
224 Catalog::new(catalog_cache),
225 EngineConfig {
226 runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
227 routines: {
228 let b = Routines::builder();
229 let b = default_native_functions(b);
230 default_native_procedures(b).configure()
231 },
232 transforms: Transforms::empty(),
233 ioc,
234 #[cfg(not(reifydb_single_threaded))]
235 remote_registry: None,
236 },
237 );
238
239 if self.cdc {
240 register_cdc_producer(
241 &runtime,
242 cdc_store,
243 multi_store,
244 &engine,
245 &eventbus,
246 ioc_for_cdc,
247 cdc_producer_watermark,
248 cdc_wake_registry,
249 );
250 }
251
252 TestEngine {
253 engine,
254 mock_clock,
255 }
256 }
257}
258
259#[inline]
260fn make_test_runtime(mock_clock: &MockClock) -> SharedRuntime {
261 let config = SharedRuntimeConfig::default().seeded(1000);
262 let config = SharedRuntimeConfig {
263 clock: Clock::Mock(mock_clock.clone()),
264 ..config
265 };
266 let pools = PoolConfig {
267 async_threads: 2,
268 system_threads: 2,
269 query_threads: 2,
270 commit_threads: 2,
271 background_threads: 1,
272 };
273 SharedRuntime::from_config(config, pools)
274}
275
276#[allow(clippy::too_many_arguments)]
277fn register_cdc_producer(
278 runtime: &SharedRuntime,
279 cdc_store: CdcStore,
280 multi_store: MultiStore,
281 engine: &StandardEngine,
282 eventbus: &EventBus,
283 ioc_for_cdc: IocContainer,
284 watermark: CdcProducerWatermark,
285 wake_registry: CdcWakeRegistry,
286) {
287 let cdc_handle = spawn_cdc_producer(
288 &runtime.actor_system(),
289 cdc_store,
290 multi_store,
291 engine.clone(),
292 eventbus.clone(),
293 runtime.clock().clone(),
294 watermark,
295 wake_registry,
296 );
297 eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
298 cdc_handle.actor_ref().clone(),
299 runtime.clock().clone(),
300 ));
301 ioc_for_cdc.register_service::<Arc<CdcProduceHandle>>(Arc::new(cdc_handle));
302}
303
304pub fn create_test_admin_transaction() -> AdminTransaction {
305 let multi_store = MultiStore::testing_memory();
306 let single_store = SingleStore::testing_memory();
307
308 let pools = Pools::new(PoolConfig::sync_only());
309 let actor_system = ActorSystem::new(pools, Clock::Real);
310 let event_bus = EventBus::new(&actor_system);
311 let single = SingleTransaction::new(single_store, event_bus.clone());
312 let multi = MultiTransaction::new(
313 multi_store,
314 single.clone(),
315 event_bus.clone(),
316 actor_system,
317 Clock::Mock(MockClock::from_millis(1000)),
318 Rng::seeded(42),
319 Arc::new(CatalogCache::new()),
320 )
321 .unwrap();
322
323 AdminTransaction::new(
324 multi,
325 single,
326 event_bus,
327 Interceptors::new(),
328 IdentityId::system(),
329 Clock::Mock(MockClock::from_millis(1000)),
330 )
331 .unwrap()
332}
333
334pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
335 let multi_store = MultiStore::testing_memory();
336 let single_store = SingleStore::testing_memory();
337
338 let pools = Pools::new(PoolConfig::sync_only());
339 let actor_system = ActorSystem::new(pools, Clock::Real);
340 let event_bus = EventBus::new(&actor_system);
341 let single = SingleTransaction::new(single_store, event_bus.clone());
342 let multi = MultiTransaction::new(
343 multi_store,
344 single.clone(),
345 event_bus.clone(),
346 actor_system,
347 Clock::Mock(MockClock::from_millis(1000)),
348 Rng::seeded(42),
349 Arc::new(CatalogCache::new()),
350 )
351 .unwrap();
352 let mut result = AdminTransaction::new(
353 multi,
354 single.clone(),
355 event_bus.clone(),
356 Interceptors::new(),
357 IdentityId::system(),
358 Clock::Mock(MockClock::from_millis(1000)),
359 )
360 .unwrap();
361
362 let catalog_cache = CatalogCache::new();
363 let catalog = Catalog::new(catalog_cache);
364
365 let namespace = catalog
366 .create_namespace(
367 &mut result,
368 NamespaceToCreate {
369 namespace_fragment: None,
370 name: "reifydb".to_string(),
371 local_name: "reifydb".to_string(),
372 parent_id: NamespaceId::ROOT,
373 grpc: None,
374 token: None,
375 },
376 )
377 .unwrap();
378
379 catalog.create_table(
380 &mut result,
381 TableToCreate {
382 name: Fragment::internal("flows"),
383 namespace: namespace.id(),
384 columns: vec![
385 TableColumnToCreate {
386 name: Fragment::internal("id"),
387 fragment: Fragment::None,
388 constraint: TypeConstraint::unconstrained(ValueType::Int8),
389 properties: vec![],
390 auto_increment: true,
391 dictionary_id: None,
392 },
393 TableColumnToCreate {
394 name: Fragment::internal("data"),
395 fragment: Fragment::None,
396 constraint: TypeConstraint::unconstrained(ValueType::Blob),
397 properties: vec![],
398 auto_increment: false,
399 dictionary_id: None,
400 },
401 ],
402 retention_strategy: None,
403 primary_key_columns: None,
404 underlying: false,
405 },
406 )
407 .unwrap();
408
409 result
410}