Skip to main content

reifydb_engine/
test_harness.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7	catalog::{
8		Catalog,
9		namespace::NamespaceToCreate,
10		table::{TableColumnToCreate, TableToCreate},
11	},
12	materialized::MaterializedCatalog,
13};
14use reifydb_cdc::{
15	produce::producer::{CdcProducerEventListener, spawn_cdc_producer},
16	storage::CdcStore,
17};
18use reifydb_core::{
19	actors::cdc::CdcProduceHandle,
20	event::{
21		EventBus,
22		metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
23		transaction::PostCommitEvent,
24	},
25	interface::catalog::id::NamespaceId,
26	util::ioc::IocContainer,
27};
28use reifydb_extension::transform::registry::Transforms;
29use reifydb_metric_old::worker::{
30	CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
31};
32use reifydb_routine::{function::default_functions, procedure::registry::Procedures};
33use reifydb_runtime::{
34	SharedRuntime, SharedRuntimeConfig,
35	actor::system::ActorSystem,
36	context::{
37		RuntimeContext,
38		clock::{Clock, MockClock},
39		rng::Rng,
40	},
41	pool::{PoolConfig, Pools},
42};
43use reifydb_store_multi::MultiStore;
44use reifydb_store_single::SingleStore;
45use reifydb_transaction::{
46	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
47	multi::transaction::MultiTransaction,
48	single::SingleTransaction,
49	transaction::admin::AdminTransaction,
50};
51use reifydb_type::{
52	fragment::Fragment,
53	params::Params,
54	value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
55};
56
57use crate::{engine::StandardEngine, vm::services::EngineConfig};
58
59pub struct TestEngine {
60	engine: StandardEngine,
61}
62
63impl Default for TestEngine {
64	fn default() -> Self {
65		Self::new()
66	}
67}
68
69impl TestEngine {
70	/// Create a new TestEngine with all subsystems (CDC, metrics, etc.).
71	pub fn new() -> Self {
72		Self::builder().with_cdc().with_metrics().build()
73	}
74
75	/// Start configuring a test engine via the builder.
76	pub fn builder() -> TestEngineBuilder {
77		TestEngineBuilder::default()
78	}
79
80	/// Run an admin RQL statement as system identity. Panics on error.
81	pub fn admin(&self, rql: &str) -> Vec<Frame> {
82		let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
83		if let Some(e) = r.error {
84			panic!("admin failed: {e:?}\nrql: {rql}")
85		}
86		r.frames
87	}
88
89	/// Run a command RQL statement as system identity. Panics on error.
90	pub fn command(&self, rql: &str) -> Vec<Frame> {
91		let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
92		if let Some(e) = r.error {
93			panic!("command failed: {e:?}\nrql: {rql}")
94		}
95		r.frames
96	}
97
98	/// Run a query RQL statement as system identity. Panics on error.
99	pub fn query(&self, rql: &str) -> Vec<Frame> {
100		let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
101		if let Some(e) = r.error {
102			panic!("query failed: {e:?}\nrql: {rql}")
103		}
104		r.frames
105	}
106
107	/// Run an admin statement expecting an error. Panics if it succeeds.
108	pub fn admin_err(&self, rql: &str) -> String {
109		let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
110		match r.error {
111			Some(e) => format!("{e:?}"),
112			None => panic!("Expected error but admin succeeded\nrql: {rql}"),
113		}
114	}
115
116	/// Run a command statement expecting an error. Panics if it succeeds.
117	pub fn command_err(&self, rql: &str) -> String {
118		let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
119		match r.error {
120			Some(e) => format!("{e:?}"),
121			None => panic!("Expected error but command succeeded\nrql: {rql}"),
122		}
123	}
124
125	/// Run a query statement expecting an error. Panics if it succeeds.
126	pub fn query_err(&self, rql: &str) -> String {
127		let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
128		match r.error {
129			Some(e) => format!("{e:?}"),
130			None => panic!("Expected error but query succeeded\nrql: {rql}"),
131		}
132	}
133
134	/// Count rows in the first frame.
135	pub fn row_count(frames: &[Frame]) -> usize {
136		frames.first().map(|f| f.row_count()).unwrap_or(0)
137	}
138
139	/// Return the system identity used by this harness.
140	pub fn identity() -> IdentityId {
141		IdentityId::system()
142	}
143
144	/// Access the underlying StandardEngine.
145	pub fn inner(&self) -> &StandardEngine {
146		&self.engine
147	}
148}
149
150impl Deref for TestEngine {
151	type Target = StandardEngine;
152
153	fn deref(&self) -> &StandardEngine {
154		&self.engine
155	}
156}
157
158#[derive(Default)]
159pub struct TestEngineBuilder {
160	cdc: bool,
161	metrics: bool,
162}
163
164impl TestEngineBuilder {
165	pub fn with_cdc(mut self) -> Self {
166		self.cdc = true;
167		self
168	}
169
170	pub fn with_metrics(mut self) -> Self {
171		self.metrics = true;
172		self
173	}
174
175	pub fn build(self) -> TestEngine {
176		let pools = Pools::new(PoolConfig::default());
177		let actor_system = ActorSystem::new(pools, Clock::Real);
178		let eventbus = EventBus::new(&actor_system);
179		let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
180		let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
181		let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
182		let runtime = SharedRuntime::from_config(
183			SharedRuntimeConfig::default()
184				.async_threads(2)
185				.system_threads(2)
186				.query_threads(2)
187				.deterministic_testing(1000),
188		);
189		let materialized_catalog = MaterializedCatalog::new();
190		let multi = MultiTransaction::new(
191			multi_store.clone(),
192			single.clone(),
193			eventbus.clone(),
194			actor_system,
195			runtime.clock().clone(),
196			runtime.rng().clone(),
197			Arc::new(materialized_catalog.clone()),
198		)
199		.unwrap();
200
201		let mut ioc = IocContainer::new();
202
203		ioc = ioc.register(materialized_catalog.clone());
204
205		ioc = ioc.register(runtime.clone());
206		ioc = ioc.register(single_store.clone());
207
208		if self.metrics {
209			let metrics_worker = Arc::new(MetricsWorker::new(
210				MetricsWorkerConfig::default(),
211				single_store.clone(),
212				multi_store.clone(),
213				eventbus.clone(),
214			));
215			eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(
216				metrics_worker.sender(),
217			));
218			eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
219			eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(
220				metrics_worker.sender(),
221			));
222			ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
223		}
224
225		let cdc_store = CdcStore::memory();
226		ioc = ioc.register(cdc_store.clone());
227
228		let ioc_for_cdc = ioc.clone();
229
230		let engine = StandardEngine::new(
231			multi,
232			single.clone(),
233			eventbus.clone(),
234			InterceptorFactory::default(),
235			Catalog::new(materialized_catalog),
236			EngineConfig {
237				runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
238				functions: default_functions().configure(),
239				procedures: Procedures::empty(),
240				transforms: Transforms::empty(),
241				ioc,
242				#[cfg(not(reifydb_single_threaded))]
243				remote_registry: None,
244			},
245		);
246
247		if self.cdc {
248			let cdc_handle = spawn_cdc_producer(
249				&runtime.actor_system(),
250				cdc_store,
251				multi_store.clone(),
252				engine.clone(),
253				eventbus.clone(),
254			);
255			eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
256				cdc_handle.actor_ref().clone(),
257				runtime.clock().clone(),
258			));
259			ioc_for_cdc.register_service::<Arc<CdcProduceHandle>>(Arc::new(cdc_handle));
260		}
261
262		TestEngine {
263			engine,
264		}
265	}
266}
267
268pub fn create_test_admin_transaction() -> AdminTransaction {
269	let multi_store = MultiStore::testing_memory();
270	let single_store = SingleStore::testing_memory();
271
272	let pools = Pools::new(PoolConfig::default());
273	let actor_system = ActorSystem::new(pools, Clock::Real);
274	let event_bus = EventBus::new(&actor_system);
275	let single = SingleTransaction::new(single_store, event_bus.clone());
276	let multi = MultiTransaction::new(
277		multi_store,
278		single.clone(),
279		event_bus.clone(),
280		actor_system,
281		Clock::Mock(MockClock::from_millis(1000)),
282		Rng::seeded(42),
283		Arc::new(MaterializedCatalog::new()),
284	)
285	.unwrap();
286
287	AdminTransaction::new(
288		multi,
289		single,
290		event_bus,
291		Interceptors::new(),
292		IdentityId::system(),
293		Clock::Mock(MockClock::from_millis(1000)),
294	)
295	.unwrap()
296}
297
298pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
299	let multi_store = MultiStore::testing_memory();
300	let single_store = SingleStore::testing_memory();
301
302	let pools = Pools::new(PoolConfig::default());
303	let actor_system = ActorSystem::new(pools, Clock::Real);
304	let event_bus = EventBus::new(&actor_system);
305	let single = SingleTransaction::new(single_store, event_bus.clone());
306	let multi = MultiTransaction::new(
307		multi_store,
308		single.clone(),
309		event_bus.clone(),
310		actor_system,
311		Clock::Mock(MockClock::from_millis(1000)),
312		Rng::seeded(42),
313		Arc::new(MaterializedCatalog::new()),
314	)
315	.unwrap();
316	let mut result = AdminTransaction::new(
317		multi,
318		single.clone(),
319		event_bus.clone(),
320		Interceptors::new(),
321		IdentityId::system(),
322		Clock::Mock(MockClock::from_millis(1000)),
323	)
324	.unwrap();
325
326	let materialized_catalog = MaterializedCatalog::new();
327	let catalog = Catalog::new(materialized_catalog);
328
329	let namespace = catalog
330		.create_namespace(
331			&mut result,
332			NamespaceToCreate {
333				namespace_fragment: None,
334				name: "reifydb".to_string(),
335				local_name: "reifydb".to_string(),
336				parent_id: NamespaceId::ROOT,
337				grpc: None,
338				token: None,
339			},
340		)
341		.unwrap();
342
343	catalog.create_table(
344		&mut result,
345		TableToCreate {
346			name: Fragment::internal("flows"),
347			namespace: namespace.id(),
348			columns: vec![
349				TableColumnToCreate {
350					name: Fragment::internal("id"),
351					fragment: Fragment::None,
352					constraint: TypeConstraint::unconstrained(Type::Int8),
353					properties: vec![],
354					auto_increment: true,
355					dictionary_id: None,
356				},
357				TableColumnToCreate {
358					name: Fragment::internal("data"),
359					fragment: Fragment::None,
360					constraint: TypeConstraint::unconstrained(Type::Blob),
361					properties: vec![],
362					auto_increment: false,
363					dictionary_id: None,
364				},
365			],
366			retention_strategy: None,
367			primary_key_columns: None,
368			underlying: false,
369		},
370	)
371	.unwrap();
372
373	result
374}