Skip to main content

reifydb_engine/
test_harness.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7	cache::CatalogCache,
8	catalog::{
9		Catalog,
10		namespace::NamespaceToCreate,
11		table::{TableColumnToCreate, TableToCreate},
12	},
13};
14#[cfg(not(target_arch = "wasm32"))]
15use reifydb_cdc::storage::recent_cache::RecentCdcCache;
16use reifydb_cdc::{
17	consume::wake::CdcWakeRegistry,
18	produce::{
19		producer::{CdcProducerEventListener, spawn_cdc_producer},
20		watermark::CdcProducerWatermark,
21	},
22	storage::CdcStore,
23};
24use reifydb_core::{
25	actors::cdc::CdcProduceHandle,
26	event::{EventBus, transaction::PostCommitEvent},
27	interface::catalog::id::NamespaceId,
28	util::ioc::IocContainer,
29};
30use reifydb_extension::transform::registry::Transforms;
31use reifydb_routine::{
32	function::default_native_functions, procedure::default_native_procedures, routine::registry::Routines,
33};
34use reifydb_runtime::{
35	SharedRuntime, SharedRuntimeConfig,
36	actor::system::ActorSystem,
37	context::{
38		RuntimeContext,
39		clock::{Clock, MockClock},
40		rng::Rng,
41	},
42	pool::{PoolConfig, Pools},
43};
44#[cfg(not(target_arch = "wasm32"))]
45use reifydb_sqlite::SqliteConfig;
46use reifydb_store_multi::MultiStore;
47use reifydb_store_single::SingleStore;
48use reifydb_transaction::{
49	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
50	multi::transaction::MultiTransaction,
51	single::SingleTransaction,
52	transaction::admin::AdminTransaction,
53};
54use reifydb_value::{
55	fragment::Fragment,
56	params::Params,
57	value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, value_type::ValueType},
58};
59
60use crate::{engine::StandardEngine, vm::services::EngineConfig};
61
62pub struct TestEngine {
63	engine: StandardEngine,
64	mock_clock: MockClock,
65}
66
67impl Default for TestEngine {
68	fn default() -> Self {
69		Self::new()
70	}
71}
72
73impl TestEngine {
74	pub fn new() -> Self {
75		Self::builder().with_cdc().build()
76	}
77
78	pub fn builder() -> TestEngineBuilder {
79		TestEngineBuilder::default()
80	}
81
82	pub fn admin(&self, rql: &str) -> Vec<Frame> {
83		let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
84		if let Some(e) = r.error {
85			panic!("admin failed: {e:?}\nrql: {rql}")
86		}
87		r.frames
88	}
89
90	pub fn command(&self, rql: &str) -> Vec<Frame> {
91		let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
92		if let Some(e) = r.error {
93			panic!("command failed: {e:?}\nrql: {rql}")
94		}
95		r.frames
96	}
97
98	pub fn query(&self, rql: &str) -> Vec<Frame> {
99		let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
100		if let Some(e) = r.error {
101			panic!("query failed: {e:?}\nrql: {rql}")
102		}
103		r.frames
104	}
105
106	pub fn admin_err(&self, rql: &str) -> String {
107		let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
108		match r.error {
109			Some(e) => format!("{e:?}"),
110			None => panic!("Expected error but admin succeeded\nrql: {rql}"),
111		}
112	}
113
114	pub fn command_err(&self, rql: &str) -> String {
115		let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
116		match r.error {
117			Some(e) => format!("{e:?}"),
118			None => panic!("Expected error but command succeeded\nrql: {rql}"),
119		}
120	}
121
122	pub fn query_err(&self, rql: &str) -> String {
123		let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
124		match r.error {
125			Some(e) => format!("{e:?}"),
126			None => panic!("Expected error but query succeeded\nrql: {rql}"),
127		}
128	}
129
130	pub fn row_count(frames: &[Frame]) -> usize {
131		frames.first().map(|f| f.row_count()).unwrap_or(0)
132	}
133
134	pub fn identity() -> IdentityId {
135		IdentityId::system()
136	}
137
138	pub fn inner(&self) -> &StandardEngine {
139		&self.engine
140	}
141
142	pub fn mock_clock(&self) -> MockClock {
143		self.mock_clock.clone()
144	}
145}
146
147impl Deref for TestEngine {
148	type Target = StandardEngine;
149
150	fn deref(&self) -> &StandardEngine {
151		&self.engine
152	}
153}
154
155#[derive(Default)]
156pub struct TestEngineBuilder {
157	cdc: bool,
158	#[cfg(not(target_arch = "wasm32"))]
159	sqlite_cdc: Option<SqliteConfig>,
160}
161
162impl TestEngineBuilder {
163	pub fn with_cdc(mut self) -> Self {
164		self.cdc = true;
165		self
166	}
167
168	#[cfg(not(target_arch = "wasm32"))]
169	pub fn with_sqlite_cdc(mut self, config: SqliteConfig) -> Self {
170		self.cdc = true;
171		self.sqlite_cdc = Some(config);
172		self
173	}
174
175	pub fn build(self) -> TestEngine {
176		let mock_clock = MockClock::from_millis(1000);
177		let pools = Pools::new(PoolConfig::default());
178		let actor_system = ActorSystem::new(pools, Clock::Mock(mock_clock.clone()));
179		let eventbus = EventBus::new(&actor_system);
180		let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
181		let single_store = SingleStore::testing_memory();
182		let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
183		let runtime = make_test_runtime(&mock_clock);
184		let catalog_cache = CatalogCache::new();
185		let multi = MultiTransaction::new(
186			multi_store.clone(),
187			single.clone(),
188			eventbus.clone(),
189			actor_system,
190			runtime.clock().clone(),
191			runtime.rng().clone(),
192			Arc::new(catalog_cache.clone()),
193		)
194		.unwrap();
195
196		let mut ioc = IocContainer::new();
197		ioc = ioc.register(catalog_cache.clone());
198		ioc = ioc.register(runtime.clone());
199		ioc = ioc.register(single_store.clone());
200		ioc = ioc.register(eventbus.clone());
201
202		#[cfg(not(target_arch = "wasm32"))]
203		let cdc_store = match self.sqlite_cdc {
204			Some(config) => CdcStore::sqlite(config, RecentCdcCache::DEFAULT_CAPACITY),
205			None => CdcStore::memory(),
206		};
207		#[cfg(target_arch = "wasm32")]
208		let cdc_store = CdcStore::memory();
209		ioc = ioc.register(cdc_store.clone());
210
211		let cdc_producer_watermark = CdcProducerWatermark::new();
212		ioc = ioc.register(cdc_producer_watermark.clone());
213
214		let cdc_wake_registry = CdcWakeRegistry::new();
215		ioc = ioc.register(cdc_wake_registry.clone());
216
217		let ioc_for_cdc = ioc.clone();
218
219		let engine = StandardEngine::new(
220			multi,
221			single.clone(),
222			eventbus.clone(),
223			InterceptorFactory::default(),
224			Catalog::new(catalog_cache),
225			EngineConfig {
226				runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
227				routines: {
228					let b = Routines::builder();
229					let b = default_native_functions(b);
230					default_native_procedures(b).configure()
231				},
232				transforms: Transforms::empty(),
233				ioc,
234				#[cfg(not(reifydb_single_threaded))]
235				remote_registry: None,
236			},
237		);
238
239		if self.cdc {
240			register_cdc_producer(
241				&runtime,
242				cdc_store,
243				multi_store,
244				&engine,
245				&eventbus,
246				ioc_for_cdc,
247				cdc_producer_watermark,
248				cdc_wake_registry,
249			);
250		}
251
252		TestEngine {
253			engine,
254			mock_clock,
255		}
256	}
257}
258
259#[inline]
260fn make_test_runtime(mock_clock: &MockClock) -> SharedRuntime {
261	let config = SharedRuntimeConfig::default().seeded(1000);
262	let config = SharedRuntimeConfig {
263		clock: Clock::Mock(mock_clock.clone()),
264		..config
265	};
266	let pools = PoolConfig {
267		async_threads: 2,
268		system_threads: 2,
269		query_threads: 2,
270		commit_threads: 2,
271		background_threads: 1,
272	};
273	SharedRuntime::from_config(config, pools)
274}
275
276#[allow(clippy::too_many_arguments)]
277fn register_cdc_producer(
278	runtime: &SharedRuntime,
279	cdc_store: CdcStore,
280	multi_store: MultiStore,
281	engine: &StandardEngine,
282	eventbus: &EventBus,
283	ioc_for_cdc: IocContainer,
284	watermark: CdcProducerWatermark,
285	wake_registry: CdcWakeRegistry,
286) {
287	let cdc_handle = spawn_cdc_producer(
288		&runtime.actor_system(),
289		cdc_store,
290		multi_store,
291		engine.clone(),
292		eventbus.clone(),
293		runtime.clock().clone(),
294		watermark,
295		wake_registry,
296	);
297	eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
298		cdc_handle.actor_ref().clone(),
299		runtime.clock().clone(),
300	));
301	ioc_for_cdc.register_service::<Arc<CdcProduceHandle>>(Arc::new(cdc_handle));
302}
303
304pub fn create_test_admin_transaction() -> AdminTransaction {
305	let multi_store = MultiStore::testing_memory();
306	let single_store = SingleStore::testing_memory();
307
308	let pools = Pools::new(PoolConfig::sync_only());
309	let actor_system = ActorSystem::new(pools, Clock::Real);
310	let event_bus = EventBus::new(&actor_system);
311	let single = SingleTransaction::new(single_store, event_bus.clone());
312	let multi = MultiTransaction::new(
313		multi_store,
314		single.clone(),
315		event_bus.clone(),
316		actor_system,
317		Clock::Mock(MockClock::from_millis(1000)),
318		Rng::seeded(42),
319		Arc::new(CatalogCache::new()),
320	)
321	.unwrap();
322
323	AdminTransaction::new(
324		multi,
325		single,
326		event_bus,
327		Interceptors::new(),
328		IdentityId::system(),
329		Clock::Mock(MockClock::from_millis(1000)),
330	)
331	.unwrap()
332}
333
334pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
335	let multi_store = MultiStore::testing_memory();
336	let single_store = SingleStore::testing_memory();
337
338	let pools = Pools::new(PoolConfig::sync_only());
339	let actor_system = ActorSystem::new(pools, Clock::Real);
340	let event_bus = EventBus::new(&actor_system);
341	let single = SingleTransaction::new(single_store, event_bus.clone());
342	let multi = MultiTransaction::new(
343		multi_store,
344		single.clone(),
345		event_bus.clone(),
346		actor_system,
347		Clock::Mock(MockClock::from_millis(1000)),
348		Rng::seeded(42),
349		Arc::new(CatalogCache::new()),
350	)
351	.unwrap();
352	let mut result = AdminTransaction::new(
353		multi,
354		single.clone(),
355		event_bus.clone(),
356		Interceptors::new(),
357		IdentityId::system(),
358		Clock::Mock(MockClock::from_millis(1000)),
359	)
360	.unwrap();
361
362	let catalog_cache = CatalogCache::new();
363	let catalog = Catalog::new(catalog_cache);
364
365	let namespace = catalog
366		.create_namespace(
367			&mut result,
368			NamespaceToCreate {
369				namespace_fragment: None,
370				name: "reifydb".to_string(),
371				local_name: "reifydb".to_string(),
372				parent_id: NamespaceId::ROOT,
373				grpc: None,
374				token: None,
375			},
376		)
377		.unwrap();
378
379	catalog.create_table(
380		&mut result,
381		TableToCreate {
382			name: Fragment::internal("flows"),
383			namespace: namespace.id(),
384			columns: vec![
385				TableColumnToCreate {
386					name: Fragment::internal("id"),
387					fragment: Fragment::None,
388					constraint: TypeConstraint::unconstrained(ValueType::Int8),
389					properties: vec![],
390					auto_increment: true,
391					dictionary_id: None,
392				},
393				TableColumnToCreate {
394					name: Fragment::internal("data"),
395					fragment: Fragment::None,
396					constraint: TypeConstraint::unconstrained(ValueType::Blob),
397					properties: vec![],
398					auto_increment: false,
399					dictionary_id: None,
400				},
401			],
402			retention_strategy: None,
403			primary_key_columns: None,
404			underlying: false,
405		},
406	)
407	.unwrap();
408
409	result
410}