1use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7 cache::CatalogCache,
8 catalog::{
9 Catalog,
10 namespace::NamespaceToCreate,
11 table::{TableColumnToCreate, TableToCreate},
12 },
13};
14use reifydb_cdc::{
15 produce::{
16 producer::{CdcProducerEventListener, spawn_cdc_producer},
17 watermark::CdcProducerWatermark,
18 },
19 storage::CdcStore,
20};
21use reifydb_core::{
22 actors::cdc::CdcProduceHandle,
23 event::{EventBus, transaction::PostCommitEvent},
24 interface::catalog::id::NamespaceId,
25 util::ioc::IocContainer,
26};
27use reifydb_extension::transform::registry::Transforms;
28use reifydb_routine::{
29 function::default_native_functions, procedure::default_native_procedures, routine::registry::Routines,
30};
31use reifydb_runtime::{
32 SharedRuntime, SharedRuntimeConfig,
33 actor::system::ActorSystem,
34 context::{
35 RuntimeContext,
36 clock::{Clock, MockClock},
37 rng::Rng,
38 },
39 pool::{PoolConfig, Pools},
40};
41#[cfg(not(target_arch = "wasm32"))]
42use reifydb_sqlite::SqliteConfig;
43use reifydb_store_multi::MultiStore;
44use reifydb_store_single::SingleStore;
45use reifydb_transaction::{
46 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
47 multi::transaction::MultiTransaction,
48 single::SingleTransaction,
49 transaction::admin::AdminTransaction,
50};
51use reifydb_type::{
52 fragment::Fragment,
53 params::Params,
54 value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
55};
56
57use crate::{engine::StandardEngine, vm::services::EngineConfig};
58
/// Test harness that pairs a [`StandardEngine`] with the [`MockClock`] it was
/// constructed with.
///
/// Instances are normally obtained through `TestEngine::new` or
/// `TestEngine::builder`.
pub struct TestEngine {
	/// The wrapped engine; also reachable through `Deref` and `inner()`.
	engine: StandardEngine,
	/// The mock clock created by the builder and handed to the actor system
	/// and the test runtime.
	mock_clock: MockClock,
}
63
64impl Default for TestEngine {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl TestEngine {
71 pub fn new() -> Self {
72 Self::builder().with_cdc().build()
73 }
74
75 pub fn builder() -> TestEngineBuilder {
76 TestEngineBuilder::default()
77 }
78
79 pub fn admin(&self, rql: &str) -> Vec<Frame> {
80 let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
81 if let Some(e) = r.error {
82 panic!("admin failed: {e:?}\nrql: {rql}")
83 }
84 r.frames
85 }
86
87 pub fn command(&self, rql: &str) -> Vec<Frame> {
88 let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
89 if let Some(e) = r.error {
90 panic!("command failed: {e:?}\nrql: {rql}")
91 }
92 r.frames
93 }
94
95 pub fn query(&self, rql: &str) -> Vec<Frame> {
96 let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
97 if let Some(e) = r.error {
98 panic!("query failed: {e:?}\nrql: {rql}")
99 }
100 r.frames
101 }
102
103 pub fn admin_err(&self, rql: &str) -> String {
104 let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
105 match r.error {
106 Some(e) => format!("{e:?}"),
107 None => panic!("Expected error but admin succeeded\nrql: {rql}"),
108 }
109 }
110
111 pub fn command_err(&self, rql: &str) -> String {
112 let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
113 match r.error {
114 Some(e) => format!("{e:?}"),
115 None => panic!("Expected error but command succeeded\nrql: {rql}"),
116 }
117 }
118
119 pub fn query_err(&self, rql: &str) -> String {
120 let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
121 match r.error {
122 Some(e) => format!("{e:?}"),
123 None => panic!("Expected error but query succeeded\nrql: {rql}"),
124 }
125 }
126
127 pub fn row_count(frames: &[Frame]) -> usize {
128 frames.first().map(|f| f.row_count()).unwrap_or(0)
129 }
130
131 pub fn identity() -> IdentityId {
132 IdentityId::system()
133 }
134
135 pub fn inner(&self) -> &StandardEngine {
136 &self.engine
137 }
138
139 pub fn mock_clock(&self) -> MockClock {
140 self.mock_clock.clone()
141 }
142}
143
144impl Deref for TestEngine {
145 type Target = StandardEngine;
146
147 fn deref(&self) -> &StandardEngine {
148 &self.engine
149 }
150}
151
/// Builder for [`TestEngine`].
///
/// The derived default leaves CDC disabled and no SQLite configuration set.
#[derive(Default)]
pub struct TestEngineBuilder {
	/// When `true`, `build` registers the CDC producer.
	cdc: bool,
	/// When set, `build` backs the CDC store with `CdcStore::sqlite` using
	/// this configuration; otherwise `CdcStore::memory` is used. Only present
	/// on non-`wasm32` targets.
	#[cfg(not(target_arch = "wasm32"))]
	sqlite_cdc: Option<SqliteConfig>,
}
158
159impl TestEngineBuilder {
160 pub fn with_cdc(mut self) -> Self {
161 self.cdc = true;
162 self
163 }
164
165 #[cfg(not(target_arch = "wasm32"))]
166 pub fn with_sqlite_cdc(mut self, config: SqliteConfig) -> Self {
167 self.cdc = true;
168 self.sqlite_cdc = Some(config);
169 self
170 }
171
172 pub fn build(self) -> TestEngine {
173 let mock_clock = MockClock::from_millis(1000);
174 let pools = Pools::new(PoolConfig::default());
175 let actor_system = ActorSystem::new(pools, Clock::Mock(mock_clock.clone()));
176 let eventbus = EventBus::new(&actor_system);
177 let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
178 let single_store = SingleStore::testing_memory();
179 let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
180 let runtime = make_test_runtime(&mock_clock);
181 let catalog_cache = CatalogCache::new();
182 let multi = MultiTransaction::new(
183 multi_store.clone(),
184 single.clone(),
185 eventbus.clone(),
186 actor_system,
187 runtime.clock().clone(),
188 runtime.rng().clone(),
189 Arc::new(catalog_cache.clone()),
190 )
191 .unwrap();
192
193 let mut ioc = IocContainer::new();
194 ioc = ioc.register(catalog_cache.clone());
195 ioc = ioc.register(runtime.clone());
196 ioc = ioc.register(single_store.clone());
197
198 #[cfg(not(target_arch = "wasm32"))]
199 let cdc_store = match self.sqlite_cdc {
200 Some(config) => CdcStore::sqlite(config),
201 None => CdcStore::memory(),
202 };
203 #[cfg(target_arch = "wasm32")]
204 let cdc_store = CdcStore::memory();
205 ioc = ioc.register(cdc_store.clone());
206
207 let cdc_producer_watermark = CdcProducerWatermark::new();
208 ioc = ioc.register(cdc_producer_watermark.clone());
209
210 let ioc_for_cdc = ioc.clone();
211
212 let engine = StandardEngine::new(
213 multi,
214 single.clone(),
215 eventbus.clone(),
216 InterceptorFactory::default(),
217 Catalog::new(catalog_cache),
218 EngineConfig {
219 runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
220 routines: {
221 let b = Routines::builder();
222 let b = default_native_functions(b);
223 default_native_procedures(b).configure()
224 },
225 transforms: Transforms::empty(),
226 ioc,
227 #[cfg(not(reifydb_single_threaded))]
228 remote_registry: None,
229 },
230 );
231
232 if self.cdc {
233 register_cdc_producer(
234 &runtime,
235 cdc_store,
236 multi_store,
237 &engine,
238 &eventbus,
239 ioc_for_cdc,
240 cdc_producer_watermark,
241 );
242 }
243
244 TestEngine {
245 engine,
246 mock_clock,
247 }
248 }
249}
250
251#[inline]
252fn make_test_runtime(mock_clock: &MockClock) -> SharedRuntime {
253 let config = SharedRuntimeConfig::default().seeded(1000);
254 let config = SharedRuntimeConfig {
255 clock: Clock::Mock(mock_clock.clone()),
256 ..config
257 };
258 let pools = PoolConfig {
259 async_threads: 2,
260 system_threads: 2,
261 query_threads: 2,
262 };
263 SharedRuntime::from_config(config, pools)
264}
265
266fn register_cdc_producer(
267 runtime: &SharedRuntime,
268 cdc_store: CdcStore,
269 multi_store: MultiStore,
270 engine: &StandardEngine,
271 eventbus: &EventBus,
272 ioc_for_cdc: IocContainer,
273 watermark: CdcProducerWatermark,
274) {
275 let cdc_handle = spawn_cdc_producer(
276 &runtime.actor_system(),
277 cdc_store,
278 multi_store,
279 engine.clone(),
280 eventbus.clone(),
281 runtime.clock().clone(),
282 watermark,
283 );
284 eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
285 cdc_handle.actor_ref().clone(),
286 runtime.clock().clone(),
287 ));
288 ioc_for_cdc.register_service::<Arc<CdcProduceHandle>>(Arc::new(cdc_handle));
289}
290
291pub fn create_test_admin_transaction() -> AdminTransaction {
292 let multi_store = MultiStore::testing_memory();
293 let single_store = SingleStore::testing_memory();
294
295 let pools = Pools::new(PoolConfig::sync_only());
296 let actor_system = ActorSystem::new(pools, Clock::Real);
297 let event_bus = EventBus::new(&actor_system);
298 let single = SingleTransaction::new(single_store, event_bus.clone());
299 let multi = MultiTransaction::new(
300 multi_store,
301 single.clone(),
302 event_bus.clone(),
303 actor_system,
304 Clock::Mock(MockClock::from_millis(1000)),
305 Rng::seeded(42),
306 Arc::new(CatalogCache::new()),
307 )
308 .unwrap();
309
310 AdminTransaction::new(
311 multi,
312 single,
313 event_bus,
314 Interceptors::new(),
315 IdentityId::system(),
316 Clock::Mock(MockClock::from_millis(1000)),
317 )
318 .unwrap()
319}
320
321pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
322 let multi_store = MultiStore::testing_memory();
323 let single_store = SingleStore::testing_memory();
324
325 let pools = Pools::new(PoolConfig::sync_only());
326 let actor_system = ActorSystem::new(pools, Clock::Real);
327 let event_bus = EventBus::new(&actor_system);
328 let single = SingleTransaction::new(single_store, event_bus.clone());
329 let multi = MultiTransaction::new(
330 multi_store,
331 single.clone(),
332 event_bus.clone(),
333 actor_system,
334 Clock::Mock(MockClock::from_millis(1000)),
335 Rng::seeded(42),
336 Arc::new(CatalogCache::new()),
337 )
338 .unwrap();
339 let mut result = AdminTransaction::new(
340 multi,
341 single.clone(),
342 event_bus.clone(),
343 Interceptors::new(),
344 IdentityId::system(),
345 Clock::Mock(MockClock::from_millis(1000)),
346 )
347 .unwrap();
348
349 let catalog_cache = CatalogCache::new();
350 let catalog = Catalog::new(catalog_cache);
351
352 let namespace = catalog
353 .create_namespace(
354 &mut result,
355 NamespaceToCreate {
356 namespace_fragment: None,
357 name: "reifydb".to_string(),
358 local_name: "reifydb".to_string(),
359 parent_id: NamespaceId::ROOT,
360 grpc: None,
361 token: None,
362 },
363 )
364 .unwrap();
365
366 catalog.create_table(
367 &mut result,
368 TableToCreate {
369 name: Fragment::internal("flows"),
370 namespace: namespace.id(),
371 columns: vec![
372 TableColumnToCreate {
373 name: Fragment::internal("id"),
374 fragment: Fragment::None,
375 constraint: TypeConstraint::unconstrained(Type::Int8),
376 properties: vec![],
377 auto_increment: true,
378 dictionary_id: None,
379 },
380 TableColumnToCreate {
381 name: Fragment::internal("data"),
382 fragment: Fragment::None,
383 constraint: TypeConstraint::unconstrained(Type::Blob),
384 properties: vec![],
385 auto_increment: false,
386 dictionary_id: None,
387 },
388 ],
389 retention_strategy: None,
390 primary_key_columns: None,
391 underlying: false,
392 },
393 )
394 .unwrap();
395
396 result
397}