Skip to main content

reifydb_engine/
test_harness.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7	cache::CatalogCache,
8	catalog::{
9		Catalog,
10		namespace::NamespaceToCreate,
11		table::{TableColumnToCreate, TableToCreate},
12	},
13};
14use reifydb_cdc::{
15	produce::{
16		producer::{CdcProducerEventListener, spawn_cdc_producer},
17		watermark::CdcProducerWatermark,
18	},
19	storage::CdcStore,
20};
21use reifydb_core::{
22	actors::cdc::CdcProduceHandle,
23	event::{EventBus, transaction::PostCommitEvent},
24	interface::catalog::id::NamespaceId,
25	util::ioc::IocContainer,
26};
27use reifydb_extension::transform::registry::Transforms;
28use reifydb_routine::{
29	function::default_native_functions, procedure::default_native_procedures, routine::registry::Routines,
30};
31use reifydb_runtime::{
32	SharedRuntime, SharedRuntimeConfig,
33	actor::system::ActorSystem,
34	context::{
35		RuntimeContext,
36		clock::{Clock, MockClock},
37		rng::Rng,
38	},
39	pool::{PoolConfig, Pools},
40};
41#[cfg(not(target_arch = "wasm32"))]
42use reifydb_sqlite::SqliteConfig;
43use reifydb_store_multi::MultiStore;
44use reifydb_store_single::SingleStore;
45use reifydb_transaction::{
46	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
47	multi::transaction::MultiTransaction,
48	single::SingleTransaction,
49	transaction::admin::AdminTransaction,
50};
51use reifydb_type::{
52	fragment::Fragment,
53	params::Params,
54	value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
55};
56
57use crate::{engine::StandardEngine, vm::services::EngineConfig};
58
59pub struct TestEngine {
60	engine: StandardEngine,
61	mock_clock: MockClock,
62}
63
64impl Default for TestEngine {
65	fn default() -> Self {
66		Self::new()
67	}
68}
69
70impl TestEngine {
71	pub fn new() -> Self {
72		Self::builder().with_cdc().build()
73	}
74
75	pub fn builder() -> TestEngineBuilder {
76		TestEngineBuilder::default()
77	}
78
79	pub fn admin(&self, rql: &str) -> Vec<Frame> {
80		let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
81		if let Some(e) = r.error {
82			panic!("admin failed: {e:?}\nrql: {rql}")
83		}
84		r.frames
85	}
86
87	pub fn command(&self, rql: &str) -> Vec<Frame> {
88		let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
89		if let Some(e) = r.error {
90			panic!("command failed: {e:?}\nrql: {rql}")
91		}
92		r.frames
93	}
94
95	pub fn query(&self, rql: &str) -> Vec<Frame> {
96		let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
97		if let Some(e) = r.error {
98			panic!("query failed: {e:?}\nrql: {rql}")
99		}
100		r.frames
101	}
102
103	pub fn admin_err(&self, rql: &str) -> String {
104		let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
105		match r.error {
106			Some(e) => format!("{e:?}"),
107			None => panic!("Expected error but admin succeeded\nrql: {rql}"),
108		}
109	}
110
111	pub fn command_err(&self, rql: &str) -> String {
112		let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
113		match r.error {
114			Some(e) => format!("{e:?}"),
115			None => panic!("Expected error but command succeeded\nrql: {rql}"),
116		}
117	}
118
119	pub fn query_err(&self, rql: &str) -> String {
120		let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
121		match r.error {
122			Some(e) => format!("{e:?}"),
123			None => panic!("Expected error but query succeeded\nrql: {rql}"),
124		}
125	}
126
127	pub fn row_count(frames: &[Frame]) -> usize {
128		frames.first().map(|f| f.row_count()).unwrap_or(0)
129	}
130
131	pub fn identity() -> IdentityId {
132		IdentityId::system()
133	}
134
135	pub fn inner(&self) -> &StandardEngine {
136		&self.engine
137	}
138
139	pub fn mock_clock(&self) -> MockClock {
140		self.mock_clock.clone()
141	}
142}
143
144impl Deref for TestEngine {
145	type Target = StandardEngine;
146
147	fn deref(&self) -> &StandardEngine {
148		&self.engine
149	}
150}
151
152#[derive(Default)]
153pub struct TestEngineBuilder {
154	cdc: bool,
155	#[cfg(not(target_arch = "wasm32"))]
156	sqlite_cdc: Option<SqliteConfig>,
157}
158
159impl TestEngineBuilder {
160	pub fn with_cdc(mut self) -> Self {
161		self.cdc = true;
162		self
163	}
164
165	#[cfg(not(target_arch = "wasm32"))]
166	pub fn with_sqlite_cdc(mut self, config: SqliteConfig) -> Self {
167		self.cdc = true;
168		self.sqlite_cdc = Some(config);
169		self
170	}
171
172	pub fn build(self) -> TestEngine {
173		let mock_clock = MockClock::from_millis(1000);
174		let pools = Pools::new(PoolConfig::default());
175		let actor_system = ActorSystem::new(pools, Clock::Mock(mock_clock.clone()));
176		let eventbus = EventBus::new(&actor_system);
177		let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
178		let single_store = SingleStore::testing_memory();
179		let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
180		let runtime = make_test_runtime(&mock_clock);
181		let catalog_cache = CatalogCache::new();
182		let multi = MultiTransaction::new(
183			multi_store.clone(),
184			single.clone(),
185			eventbus.clone(),
186			actor_system,
187			runtime.clock().clone(),
188			runtime.rng().clone(),
189			Arc::new(catalog_cache.clone()),
190		)
191		.unwrap();
192
193		let mut ioc = IocContainer::new();
194		ioc = ioc.register(catalog_cache.clone());
195		ioc = ioc.register(runtime.clone());
196		ioc = ioc.register(single_store.clone());
197
198		#[cfg(not(target_arch = "wasm32"))]
199		let cdc_store = match self.sqlite_cdc {
200			Some(config) => CdcStore::sqlite(config),
201			None => CdcStore::memory(),
202		};
203		#[cfg(target_arch = "wasm32")]
204		let cdc_store = CdcStore::memory();
205		ioc = ioc.register(cdc_store.clone());
206
207		let cdc_producer_watermark = CdcProducerWatermark::new();
208		ioc = ioc.register(cdc_producer_watermark.clone());
209
210		let ioc_for_cdc = ioc.clone();
211
212		let engine = StandardEngine::new(
213			multi,
214			single.clone(),
215			eventbus.clone(),
216			InterceptorFactory::default(),
217			Catalog::new(catalog_cache),
218			EngineConfig {
219				runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
220				routines: {
221					let b = Routines::builder();
222					let b = default_native_functions(b);
223					default_native_procedures(b).configure()
224				},
225				transforms: Transforms::empty(),
226				ioc,
227				#[cfg(not(reifydb_single_threaded))]
228				remote_registry: None,
229			},
230		);
231
232		if self.cdc {
233			register_cdc_producer(
234				&runtime,
235				cdc_store,
236				multi_store,
237				&engine,
238				&eventbus,
239				ioc_for_cdc,
240				cdc_producer_watermark,
241			);
242		}
243
244		TestEngine {
245			engine,
246			mock_clock,
247		}
248	}
249}
250
251#[inline]
252fn make_test_runtime(mock_clock: &MockClock) -> SharedRuntime {
253	let config = SharedRuntimeConfig::default().seeded(1000);
254	let config = SharedRuntimeConfig {
255		clock: Clock::Mock(mock_clock.clone()),
256		..config
257	};
258	let pools = PoolConfig {
259		async_threads: 2,
260		system_threads: 2,
261		query_threads: 2,
262	};
263	SharedRuntime::from_config(config, pools)
264}
265
266fn register_cdc_producer(
267	runtime: &SharedRuntime,
268	cdc_store: CdcStore,
269	multi_store: MultiStore,
270	engine: &StandardEngine,
271	eventbus: &EventBus,
272	ioc_for_cdc: IocContainer,
273	watermark: CdcProducerWatermark,
274) {
275	let cdc_handle = spawn_cdc_producer(
276		&runtime.actor_system(),
277		cdc_store,
278		multi_store,
279		engine.clone(),
280		eventbus.clone(),
281		runtime.clock().clone(),
282		watermark,
283	);
284	eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
285		cdc_handle.actor_ref().clone(),
286		runtime.clock().clone(),
287	));
288	ioc_for_cdc.register_service::<Arc<CdcProduceHandle>>(Arc::new(cdc_handle));
289}
290
291pub fn create_test_admin_transaction() -> AdminTransaction {
292	let multi_store = MultiStore::testing_memory();
293	let single_store = SingleStore::testing_memory();
294
295	let pools = Pools::new(PoolConfig::sync_only());
296	let actor_system = ActorSystem::new(pools, Clock::Real);
297	let event_bus = EventBus::new(&actor_system);
298	let single = SingleTransaction::new(single_store, event_bus.clone());
299	let multi = MultiTransaction::new(
300		multi_store,
301		single.clone(),
302		event_bus.clone(),
303		actor_system,
304		Clock::Mock(MockClock::from_millis(1000)),
305		Rng::seeded(42),
306		Arc::new(CatalogCache::new()),
307	)
308	.unwrap();
309
310	AdminTransaction::new(
311		multi,
312		single,
313		event_bus,
314		Interceptors::new(),
315		IdentityId::system(),
316		Clock::Mock(MockClock::from_millis(1000)),
317	)
318	.unwrap()
319}
320
321pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
322	let multi_store = MultiStore::testing_memory();
323	let single_store = SingleStore::testing_memory();
324
325	let pools = Pools::new(PoolConfig::sync_only());
326	let actor_system = ActorSystem::new(pools, Clock::Real);
327	let event_bus = EventBus::new(&actor_system);
328	let single = SingleTransaction::new(single_store, event_bus.clone());
329	let multi = MultiTransaction::new(
330		multi_store,
331		single.clone(),
332		event_bus.clone(),
333		actor_system,
334		Clock::Mock(MockClock::from_millis(1000)),
335		Rng::seeded(42),
336		Arc::new(CatalogCache::new()),
337	)
338	.unwrap();
339	let mut result = AdminTransaction::new(
340		multi,
341		single.clone(),
342		event_bus.clone(),
343		Interceptors::new(),
344		IdentityId::system(),
345		Clock::Mock(MockClock::from_millis(1000)),
346	)
347	.unwrap();
348
349	let catalog_cache = CatalogCache::new();
350	let catalog = Catalog::new(catalog_cache);
351
352	let namespace = catalog
353		.create_namespace(
354			&mut result,
355			NamespaceToCreate {
356				namespace_fragment: None,
357				name: "reifydb".to_string(),
358				local_name: "reifydb".to_string(),
359				parent_id: NamespaceId::ROOT,
360				grpc: None,
361				token: None,
362			},
363		)
364		.unwrap();
365
366	catalog.create_table(
367		&mut result,
368		TableToCreate {
369			name: Fragment::internal("flows"),
370			namespace: namespace.id(),
371			columns: vec![
372				TableColumnToCreate {
373					name: Fragment::internal("id"),
374					fragment: Fragment::None,
375					constraint: TypeConstraint::unconstrained(Type::Int8),
376					properties: vec![],
377					auto_increment: true,
378					dictionary_id: None,
379				},
380				TableColumnToCreate {
381					name: Fragment::internal("data"),
382					fragment: Fragment::None,
383					constraint: TypeConstraint::unconstrained(Type::Blob),
384					properties: vec![],
385					auto_increment: false,
386					dictionary_id: None,
387				},
388			],
389			retention_strategy: None,
390			primary_key_columns: None,
391			underlying: false,
392		},
393	)
394	.unwrap();
395
396	result
397}