1use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7 catalog::{
8 Catalog,
9 namespace::NamespaceToCreate,
10 table::{TableColumnToCreate, TableToCreate},
11 },
12 materialized::MaterializedCatalog,
13};
14use reifydb_cdc::{
15 produce::{
16 producer::{CdcProducerEventListener, spawn_cdc_producer},
17 watermark::CdcProducerWatermark,
18 },
19 storage::CdcStore,
20};
21use reifydb_core::{
22 actors::cdc::CdcProduceHandle,
23 event::{EventBus, transaction::PostCommitEvent},
24 interface::catalog::id::NamespaceId,
25 util::ioc::IocContainer,
26};
27use reifydb_extension::transform::registry::Transforms;
28use reifydb_routine::{
29 function::default_native_functions, procedure::default_native_procedures, routine::registry::Routines,
30};
31use reifydb_runtime::{
32 SharedRuntime, SharedRuntimeConfig,
33 actor::system::ActorSystem,
34 context::{
35 RuntimeContext,
36 clock::{Clock, MockClock},
37 rng::Rng,
38 },
39 pool::{PoolConfig, Pools},
40};
41#[cfg(not(target_arch = "wasm32"))]
42use reifydb_sqlite::SqliteConfig;
43use reifydb_store_multi::MultiStore;
44use reifydb_store_single::SingleStore;
45use reifydb_transaction::{
46 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
47 multi::transaction::MultiTransaction,
48 single::SingleTransaction,
49 transaction::admin::AdminTransaction,
50};
51use reifydb_type::{
52 fragment::Fragment,
53 params::Params,
54 value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
55};
56
57use crate::{engine::StandardEngine, vm::services::EngineConfig};
58
59pub struct TestEngine {
60 engine: StandardEngine,
61 mock_clock: MockClock,
62}
63
64impl Default for TestEngine {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl TestEngine {
71 pub fn new() -> Self {
73 Self::builder().with_cdc().build()
74 }
75
76 pub fn builder() -> TestEngineBuilder {
78 TestEngineBuilder::default()
79 }
80
81 pub fn admin(&self, rql: &str) -> Vec<Frame> {
83 let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
84 if let Some(e) = r.error {
85 panic!("admin failed: {e:?}\nrql: {rql}")
86 }
87 r.frames
88 }
89
90 pub fn command(&self, rql: &str) -> Vec<Frame> {
92 let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
93 if let Some(e) = r.error {
94 panic!("command failed: {e:?}\nrql: {rql}")
95 }
96 r.frames
97 }
98
99 pub fn query(&self, rql: &str) -> Vec<Frame> {
101 let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
102 if let Some(e) = r.error {
103 panic!("query failed: {e:?}\nrql: {rql}")
104 }
105 r.frames
106 }
107
108 pub fn admin_err(&self, rql: &str) -> String {
110 let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
111 match r.error {
112 Some(e) => format!("{e:?}"),
113 None => panic!("Expected error but admin succeeded\nrql: {rql}"),
114 }
115 }
116
117 pub fn command_err(&self, rql: &str) -> String {
119 let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
120 match r.error {
121 Some(e) => format!("{e:?}"),
122 None => panic!("Expected error but command succeeded\nrql: {rql}"),
123 }
124 }
125
126 pub fn query_err(&self, rql: &str) -> String {
128 let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
129 match r.error {
130 Some(e) => format!("{e:?}"),
131 None => panic!("Expected error but query succeeded\nrql: {rql}"),
132 }
133 }
134
135 pub fn row_count(frames: &[Frame]) -> usize {
137 frames.first().map(|f| f.row_count()).unwrap_or(0)
138 }
139
140 pub fn identity() -> IdentityId {
142 IdentityId::system()
143 }
144
145 pub fn inner(&self) -> &StandardEngine {
147 &self.engine
148 }
149
150 pub fn mock_clock(&self) -> MockClock {
153 self.mock_clock.clone()
154 }
155}
156
157impl Deref for TestEngine {
158 type Target = StandardEngine;
159
160 fn deref(&self) -> &StandardEngine {
161 &self.engine
162 }
163}
164
165#[derive(Default)]
166pub struct TestEngineBuilder {
167 cdc: bool,
168 #[cfg(not(target_arch = "wasm32"))]
169 sqlite_cdc: Option<SqliteConfig>,
170}
171
172impl TestEngineBuilder {
173 pub fn with_cdc(mut self) -> Self {
174 self.cdc = true;
175 self
176 }
177
178 #[cfg(not(target_arch = "wasm32"))]
181 pub fn with_sqlite_cdc(mut self, config: SqliteConfig) -> Self {
182 self.cdc = true;
183 self.sqlite_cdc = Some(config);
184 self
185 }
186
187 pub fn build(self) -> TestEngine {
188 let mock_clock = MockClock::from_millis(1000);
189 let pools = Pools::new(PoolConfig::default());
190 let actor_system = ActorSystem::new(pools, Clock::Mock(mock_clock.clone()));
191 let eventbus = EventBus::new(&actor_system);
192 let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
193 let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
194 let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
195 let runtime = make_test_runtime(&mock_clock);
196 let materialized_catalog = MaterializedCatalog::new();
197 let multi = MultiTransaction::new(
198 multi_store.clone(),
199 single.clone(),
200 eventbus.clone(),
201 actor_system,
202 runtime.clock().clone(),
203 runtime.rng().clone(),
204 Arc::new(materialized_catalog.clone()),
205 )
206 .unwrap();
207
208 let mut ioc = IocContainer::new();
209 ioc = ioc.register(materialized_catalog.clone());
210 ioc = ioc.register(runtime.clone());
211 ioc = ioc.register(single_store.clone());
212
213 #[cfg(not(target_arch = "wasm32"))]
214 let cdc_store = match self.sqlite_cdc {
215 Some(config) => CdcStore::sqlite(config),
216 None => CdcStore::memory(),
217 };
218 #[cfg(target_arch = "wasm32")]
219 let cdc_store = CdcStore::memory();
220 ioc = ioc.register(cdc_store.clone());
221
222 let cdc_producer_watermark = CdcProducerWatermark::new();
223 ioc = ioc.register(cdc_producer_watermark.clone());
224
225 let ioc_for_cdc = ioc.clone();
226
227 let engine = StandardEngine::new(
228 multi,
229 single.clone(),
230 eventbus.clone(),
231 InterceptorFactory::default(),
232 Catalog::new(materialized_catalog),
233 EngineConfig {
234 runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
235 routines: {
236 let b = Routines::builder();
237 let b = default_native_functions(b);
238 default_native_procedures(b).configure()
239 },
240 transforms: Transforms::empty(),
241 ioc,
242 #[cfg(not(reifydb_single_threaded))]
243 remote_registry: None,
244 },
245 );
246
247 if self.cdc {
248 register_cdc_producer(
249 &runtime,
250 cdc_store,
251 multi_store,
252 &engine,
253 &eventbus,
254 ioc_for_cdc,
255 cdc_producer_watermark,
256 );
257 }
258
259 TestEngine {
260 engine,
261 mock_clock,
262 }
263 }
264}
265
266#[inline]
267fn make_test_runtime(mock_clock: &MockClock) -> SharedRuntime {
268 let base = SharedRuntimeConfig::default().async_threads(2).system_threads(2).query_threads(2).seeded(1000);
269 let config = SharedRuntimeConfig {
270 clock: Clock::Mock(mock_clock.clone()),
271 ..base
272 };
273 SharedRuntime::from_config(config)
274}
275
276fn register_cdc_producer(
277 runtime: &SharedRuntime,
278 cdc_store: CdcStore,
279 multi_store: MultiStore,
280 engine: &StandardEngine,
281 eventbus: &EventBus,
282 ioc_for_cdc: IocContainer,
283 watermark: CdcProducerWatermark,
284) {
285 let cdc_handle = spawn_cdc_producer(
286 &runtime.actor_system(),
287 cdc_store,
288 multi_store,
289 engine.clone(),
290 eventbus.clone(),
291 runtime.clock().clone(),
292 watermark,
293 );
294 eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
295 cdc_handle.actor_ref().clone(),
296 runtime.clock().clone(),
297 ));
298 ioc_for_cdc.register_service::<Arc<CdcProduceHandle>>(Arc::new(cdc_handle));
299}
300
301pub fn create_test_admin_transaction() -> AdminTransaction {
302 let multi_store = MultiStore::testing_memory();
303 let single_store = SingleStore::testing_memory();
304
305 let pools = Pools::new(PoolConfig::default());
306 let actor_system = ActorSystem::new(pools, Clock::Real);
307 let event_bus = EventBus::new(&actor_system);
308 let single = SingleTransaction::new(single_store, event_bus.clone());
309 let multi = MultiTransaction::new(
310 multi_store,
311 single.clone(),
312 event_bus.clone(),
313 actor_system,
314 Clock::Mock(MockClock::from_millis(1000)),
315 Rng::seeded(42),
316 Arc::new(MaterializedCatalog::new()),
317 )
318 .unwrap();
319
320 AdminTransaction::new(
321 multi,
322 single,
323 event_bus,
324 Interceptors::new(),
325 IdentityId::system(),
326 Clock::Mock(MockClock::from_millis(1000)),
327 )
328 .unwrap()
329}
330
331pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
332 let multi_store = MultiStore::testing_memory();
333 let single_store = SingleStore::testing_memory();
334
335 let pools = Pools::new(PoolConfig::default());
336 let actor_system = ActorSystem::new(pools, Clock::Real);
337 let event_bus = EventBus::new(&actor_system);
338 let single = SingleTransaction::new(single_store, event_bus.clone());
339 let multi = MultiTransaction::new(
340 multi_store,
341 single.clone(),
342 event_bus.clone(),
343 actor_system,
344 Clock::Mock(MockClock::from_millis(1000)),
345 Rng::seeded(42),
346 Arc::new(MaterializedCatalog::new()),
347 )
348 .unwrap();
349 let mut result = AdminTransaction::new(
350 multi,
351 single.clone(),
352 event_bus.clone(),
353 Interceptors::new(),
354 IdentityId::system(),
355 Clock::Mock(MockClock::from_millis(1000)),
356 )
357 .unwrap();
358
359 let materialized_catalog = MaterializedCatalog::new();
360 let catalog = Catalog::new(materialized_catalog);
361
362 let namespace = catalog
363 .create_namespace(
364 &mut result,
365 NamespaceToCreate {
366 namespace_fragment: None,
367 name: "reifydb".to_string(),
368 local_name: "reifydb".to_string(),
369 parent_id: NamespaceId::ROOT,
370 grpc: None,
371 token: None,
372 },
373 )
374 .unwrap();
375
376 catalog.create_table(
377 &mut result,
378 TableToCreate {
379 name: Fragment::internal("flows"),
380 namespace: namespace.id(),
381 columns: vec![
382 TableColumnToCreate {
383 name: Fragment::internal("id"),
384 fragment: Fragment::None,
385 constraint: TypeConstraint::unconstrained(Type::Int8),
386 properties: vec![],
387 auto_increment: true,
388 dictionary_id: None,
389 },
390 TableColumnToCreate {
391 name: Fragment::internal("data"),
392 fragment: Fragment::None,
393 constraint: TypeConstraint::unconstrained(Type::Blob),
394 properties: vec![],
395 auto_increment: false,
396 dictionary_id: None,
397 },
398 ],
399 retention_strategy: None,
400 primary_key_columns: None,
401 underlying: false,
402 },
403 )
404 .unwrap();
405
406 result
407}