Skip to main content

reifydb_engine/
test_harness.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7	catalog::{
8		Catalog,
9		namespace::NamespaceToCreate,
10		table::{TableColumnToCreate, TableToCreate},
11	},
12	materialized::MaterializedCatalog,
13};
14use reifydb_cdc::{
15	produce::{
16		producer::{CdcProducerEventListener, spawn_cdc_producer},
17		watermark::CdcProducerWatermark,
18	},
19	storage::CdcStore,
20};
21use reifydb_core::{
22	actors::cdc::CdcProduceHandle,
23	event::{EventBus, transaction::PostCommitEvent},
24	interface::catalog::id::NamespaceId,
25	util::ioc::IocContainer,
26};
27use reifydb_extension::transform::registry::Transforms;
28use reifydb_routine::{
29	function::default_native_functions, procedure::default_native_procedures, routine::registry::Routines,
30};
31use reifydb_runtime::{
32	SharedRuntime, SharedRuntimeConfig,
33	actor::system::ActorSystem,
34	context::{
35		RuntimeContext,
36		clock::{Clock, MockClock},
37		rng::Rng,
38	},
39	pool::{PoolConfig, Pools},
40};
41#[cfg(not(target_arch = "wasm32"))]
42use reifydb_sqlite::SqliteConfig;
43use reifydb_store_multi::MultiStore;
44use reifydb_store_single::SingleStore;
45use reifydb_transaction::{
46	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
47	multi::transaction::MultiTransaction,
48	single::SingleTransaction,
49	transaction::admin::AdminTransaction,
50};
51use reifydb_type::{
52	fragment::Fragment,
53	params::Params,
54	value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
55};
56
57use crate::{engine::StandardEngine, vm::services::EngineConfig};
58
59pub struct TestEngine {
60	engine: StandardEngine,
61	mock_clock: MockClock,
62}
63
64impl Default for TestEngine {
65	fn default() -> Self {
66		Self::new()
67	}
68}
69
70impl TestEngine {
71	/// Create a new TestEngine with all subsystems (CDC, metrics, etc.).
72	pub fn new() -> Self {
73		Self::builder().with_cdc().build()
74	}
75
76	/// Start configuring a test engine via the builder.
77	pub fn builder() -> TestEngineBuilder {
78		TestEngineBuilder::default()
79	}
80
81	/// Run an admin RQL statement as system identity. Panics on error.
82	pub fn admin(&self, rql: &str) -> Vec<Frame> {
83		let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
84		if let Some(e) = r.error {
85			panic!("admin failed: {e:?}\nrql: {rql}")
86		}
87		r.frames
88	}
89
90	/// Run a command RQL statement as system identity. Panics on error.
91	pub fn command(&self, rql: &str) -> Vec<Frame> {
92		let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
93		if let Some(e) = r.error {
94			panic!("command failed: {e:?}\nrql: {rql}")
95		}
96		r.frames
97	}
98
99	/// Run a query RQL statement as system identity. Panics on error.
100	pub fn query(&self, rql: &str) -> Vec<Frame> {
101		let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
102		if let Some(e) = r.error {
103			panic!("query failed: {e:?}\nrql: {rql}")
104		}
105		r.frames
106	}
107
108	/// Run an admin statement expecting an error. Panics if it succeeds.
109	pub fn admin_err(&self, rql: &str) -> String {
110		let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
111		match r.error {
112			Some(e) => format!("{e:?}"),
113			None => panic!("Expected error but admin succeeded\nrql: {rql}"),
114		}
115	}
116
117	/// Run a command statement expecting an error. Panics if it succeeds.
118	pub fn command_err(&self, rql: &str) -> String {
119		let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
120		match r.error {
121			Some(e) => format!("{e:?}"),
122			None => panic!("Expected error but command succeeded\nrql: {rql}"),
123		}
124	}
125
126	/// Run a query statement expecting an error. Panics if it succeeds.
127	pub fn query_err(&self, rql: &str) -> String {
128		let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
129		match r.error {
130			Some(e) => format!("{e:?}"),
131			None => panic!("Expected error but query succeeded\nrql: {rql}"),
132		}
133	}
134
135	/// Count rows in the first frame.
136	pub fn row_count(frames: &[Frame]) -> usize {
137		frames.first().map(|f| f.row_count()).unwrap_or(0)
138	}
139
140	/// Return the system identity used by this harness.
141	pub fn identity() -> IdentityId {
142		IdentityId::system()
143	}
144
145	/// Access the underlying StandardEngine.
146	pub fn inner(&self) -> &StandardEngine {
147		&self.engine
148	}
149
150	/// The mock clock backing the test engine. Use `.advance_millis()` etc. to
151	/// move time forward deterministically.
152	pub fn mock_clock(&self) -> MockClock {
153		self.mock_clock.clone()
154	}
155}
156
157impl Deref for TestEngine {
158	type Target = StandardEngine;
159
160	fn deref(&self) -> &StandardEngine {
161		&self.engine
162	}
163}
164
165#[derive(Default)]
166pub struct TestEngineBuilder {
167	cdc: bool,
168	#[cfg(not(target_arch = "wasm32"))]
169	sqlite_cdc: Option<SqliteConfig>,
170}
171
172impl TestEngineBuilder {
173	pub fn with_cdc(mut self) -> Self {
174		self.cdc = true;
175		self
176	}
177
178	/// Use a SQLite-backed CDC store instead of the default in-memory backend.
179	/// Implies `with_cdc()`.
180	#[cfg(not(target_arch = "wasm32"))]
181	pub fn with_sqlite_cdc(mut self, config: SqliteConfig) -> Self {
182		self.cdc = true;
183		self.sqlite_cdc = Some(config);
184		self
185	}
186
187	pub fn build(self) -> TestEngine {
188		let mock_clock = MockClock::from_millis(1000);
189		let pools = Pools::new(PoolConfig::default());
190		let actor_system = ActorSystem::new(pools, Clock::Mock(mock_clock.clone()));
191		let eventbus = EventBus::new(&actor_system);
192		let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
193		let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
194		let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
195		let runtime = make_test_runtime(&mock_clock);
196		let materialized_catalog = MaterializedCatalog::new();
197		let multi = MultiTransaction::new(
198			multi_store.clone(),
199			single.clone(),
200			eventbus.clone(),
201			actor_system,
202			runtime.clock().clone(),
203			runtime.rng().clone(),
204			Arc::new(materialized_catalog.clone()),
205		)
206		.unwrap();
207
208		let mut ioc = IocContainer::new();
209		ioc = ioc.register(materialized_catalog.clone());
210		ioc = ioc.register(runtime.clone());
211		ioc = ioc.register(single_store.clone());
212
213		#[cfg(not(target_arch = "wasm32"))]
214		let cdc_store = match self.sqlite_cdc {
215			Some(config) => CdcStore::sqlite(config),
216			None => CdcStore::memory(),
217		};
218		#[cfg(target_arch = "wasm32")]
219		let cdc_store = CdcStore::memory();
220		ioc = ioc.register(cdc_store.clone());
221
222		let cdc_producer_watermark = CdcProducerWatermark::new();
223		ioc = ioc.register(cdc_producer_watermark.clone());
224
225		let ioc_for_cdc = ioc.clone();
226
227		let engine = StandardEngine::new(
228			multi,
229			single.clone(),
230			eventbus.clone(),
231			InterceptorFactory::default(),
232			Catalog::new(materialized_catalog),
233			EngineConfig {
234				runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
235				routines: {
236					let b = Routines::builder();
237					let b = default_native_functions(b);
238					default_native_procedures(b).configure()
239				},
240				transforms: Transforms::empty(),
241				ioc,
242				#[cfg(not(reifydb_single_threaded))]
243				remote_registry: None,
244			},
245		);
246
247		if self.cdc {
248			register_cdc_producer(
249				&runtime,
250				cdc_store,
251				multi_store,
252				&engine,
253				&eventbus,
254				ioc_for_cdc,
255				cdc_producer_watermark,
256			);
257		}
258
259		TestEngine {
260			engine,
261			mock_clock,
262		}
263	}
264}
265
266#[inline]
267fn make_test_runtime(mock_clock: &MockClock) -> SharedRuntime {
268	let base = SharedRuntimeConfig::default().async_threads(2).system_threads(2).query_threads(2).seeded(1000);
269	let config = SharedRuntimeConfig {
270		clock: Clock::Mock(mock_clock.clone()),
271		..base
272	};
273	SharedRuntime::from_config(config)
274}
275
276fn register_cdc_producer(
277	runtime: &SharedRuntime,
278	cdc_store: CdcStore,
279	multi_store: MultiStore,
280	engine: &StandardEngine,
281	eventbus: &EventBus,
282	ioc_for_cdc: IocContainer,
283	watermark: CdcProducerWatermark,
284) {
285	let cdc_handle = spawn_cdc_producer(
286		&runtime.actor_system(),
287		cdc_store,
288		multi_store,
289		engine.clone(),
290		eventbus.clone(),
291		runtime.clock().clone(),
292		watermark,
293	);
294	eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
295		cdc_handle.actor_ref().clone(),
296		runtime.clock().clone(),
297	));
298	ioc_for_cdc.register_service::<Arc<CdcProduceHandle>>(Arc::new(cdc_handle));
299}
300
301pub fn create_test_admin_transaction() -> AdminTransaction {
302	let multi_store = MultiStore::testing_memory();
303	let single_store = SingleStore::testing_memory();
304
305	let pools = Pools::new(PoolConfig::default());
306	let actor_system = ActorSystem::new(pools, Clock::Real);
307	let event_bus = EventBus::new(&actor_system);
308	let single = SingleTransaction::new(single_store, event_bus.clone());
309	let multi = MultiTransaction::new(
310		multi_store,
311		single.clone(),
312		event_bus.clone(),
313		actor_system,
314		Clock::Mock(MockClock::from_millis(1000)),
315		Rng::seeded(42),
316		Arc::new(MaterializedCatalog::new()),
317	)
318	.unwrap();
319
320	AdminTransaction::new(
321		multi,
322		single,
323		event_bus,
324		Interceptors::new(),
325		IdentityId::system(),
326		Clock::Mock(MockClock::from_millis(1000)),
327	)
328	.unwrap()
329}
330
331pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
332	let multi_store = MultiStore::testing_memory();
333	let single_store = SingleStore::testing_memory();
334
335	let pools = Pools::new(PoolConfig::default());
336	let actor_system = ActorSystem::new(pools, Clock::Real);
337	let event_bus = EventBus::new(&actor_system);
338	let single = SingleTransaction::new(single_store, event_bus.clone());
339	let multi = MultiTransaction::new(
340		multi_store,
341		single.clone(),
342		event_bus.clone(),
343		actor_system,
344		Clock::Mock(MockClock::from_millis(1000)),
345		Rng::seeded(42),
346		Arc::new(MaterializedCatalog::new()),
347	)
348	.unwrap();
349	let mut result = AdminTransaction::new(
350		multi,
351		single.clone(),
352		event_bus.clone(),
353		Interceptors::new(),
354		IdentityId::system(),
355		Clock::Mock(MockClock::from_millis(1000)),
356	)
357	.unwrap();
358
359	let materialized_catalog = MaterializedCatalog::new();
360	let catalog = Catalog::new(materialized_catalog);
361
362	let namespace = catalog
363		.create_namespace(
364			&mut result,
365			NamespaceToCreate {
366				namespace_fragment: None,
367				name: "reifydb".to_string(),
368				local_name: "reifydb".to_string(),
369				parent_id: NamespaceId::ROOT,
370				grpc: None,
371				token: None,
372			},
373		)
374		.unwrap();
375
376	catalog.create_table(
377		&mut result,
378		TableToCreate {
379			name: Fragment::internal("flows"),
380			namespace: namespace.id(),
381			columns: vec![
382				TableColumnToCreate {
383					name: Fragment::internal("id"),
384					fragment: Fragment::None,
385					constraint: TypeConstraint::unconstrained(Type::Int8),
386					properties: vec![],
387					auto_increment: true,
388					dictionary_id: None,
389				},
390				TableColumnToCreate {
391					name: Fragment::internal("data"),
392					fragment: Fragment::None,
393					constraint: TypeConstraint::unconstrained(Type::Blob),
394					properties: vec![],
395					auto_increment: false,
396					dictionary_id: None,
397				},
398			],
399			retention_strategy: None,
400			primary_key_columns: None,
401			underlying: false,
402		},
403	)
404	.unwrap();
405
406	result
407}