Skip to main content

reifydb_engine/
test_harness.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7	catalog::{
8		Catalog,
9		namespace::NamespaceToCreate,
10		table::{TableColumnToCreate, TableToCreate},
11	},
12	materialized::MaterializedCatalog,
13};
14use reifydb_cdc::{
15	produce::producer::{CdcProduceMsg, CdcProducerEventListener, spawn_cdc_producer},
16	storage::CdcStore,
17};
18use reifydb_core::{
19	event::{
20		EventBus,
21		metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
22		transaction::PostCommitEvent,
23	},
24	interface::catalog::id::NamespaceId,
25	util::ioc::IocContainer,
26};
27use reifydb_extension::transform::registry::Transforms;
28use reifydb_metric_old::worker::{
29	CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
30};
31use reifydb_routine::{function::default_functions, procedure::registry::Procedures};
32use reifydb_runtime::{
33	SharedRuntime, SharedRuntimeConfig,
34	actor::system::{ActorHandle, ActorSystem},
35	context::{
36		RuntimeContext,
37		clock::{Clock, MockClock},
38		rng::Rng,
39	},
40};
41use reifydb_store_multi::MultiStore;
42use reifydb_store_single::SingleStore;
43use reifydb_transaction::{
44	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
45	multi::transaction::MultiTransaction,
46	single::SingleTransaction,
47	transaction::admin::AdminTransaction,
48};
49use reifydb_type::{
50	fragment::Fragment,
51	params::Params,
52	value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
53};
54
55use crate::{engine::StandardEngine, vm::services::EngineConfig};
56
57pub struct TestEngine {
58	engine: StandardEngine,
59}
60
61impl Default for TestEngine {
62	fn default() -> Self {
63		Self::new()
64	}
65}
66
67impl TestEngine {
68	/// Create a new TestEngine with all subsystems (CDC, metrics, etc.).
69	pub fn new() -> Self {
70		Self::builder().with_cdc().with_metrics().build()
71	}
72
73	/// Start configuring a test engine via the builder.
74	pub fn builder() -> TestEngineBuilder {
75		TestEngineBuilder::default()
76	}
77
78	/// Run an admin RQL statement as system identity. Panics on error.
79	pub fn admin(&self, rql: &str) -> Vec<Frame> {
80		self.engine
81			.admin_as(IdentityId::system(), rql, Params::None)
82			.unwrap_or_else(|e| panic!("admin failed: {e:?}\nrql: {rql}"))
83	}
84
85	/// Run a command RQL statement as system identity. Panics on error.
86	pub fn command(&self, rql: &str) -> Vec<Frame> {
87		self.engine
88			.command_as(IdentityId::system(), rql, Params::None)
89			.unwrap_or_else(|e| panic!("command failed: {e:?}\nrql: {rql}"))
90	}
91
92	/// Run a query RQL statement as system identity. Panics on error.
93	pub fn query(&self, rql: &str) -> Vec<Frame> {
94		self.engine
95			.query_as(IdentityId::system(), rql, Params::None)
96			.unwrap_or_else(|e| panic!("query failed: {e:?}\nrql: {rql}"))
97	}
98
99	/// Run an admin statement expecting an error. Panics if it succeeds.
100	pub fn admin_err(&self, rql: &str) -> String {
101		match self.engine.admin_as(IdentityId::system(), rql, Params::None) {
102			Err(e) => format!("{e:?}"),
103			Ok(_) => panic!("Expected error but admin succeeded\nrql: {rql}"),
104		}
105	}
106
107	/// Run a command statement expecting an error. Panics if it succeeds.
108	pub fn command_err(&self, rql: &str) -> String {
109		match self.engine.command_as(IdentityId::system(), rql, Params::None) {
110			Err(e) => format!("{e:?}"),
111			Ok(_) => panic!("Expected error but command succeeded\nrql: {rql}"),
112		}
113	}
114
115	/// Run a query statement expecting an error. Panics if it succeeds.
116	pub fn query_err(&self, rql: &str) -> String {
117		match self.engine.query_as(IdentityId::system(), rql, Params::None) {
118			Err(e) => format!("{e:?}"),
119			Ok(_) => panic!("Expected error but query succeeded\nrql: {rql}"),
120		}
121	}
122
123	/// Count rows in the first frame.
124	pub fn row_count(frames: &[Frame]) -> usize {
125		frames.first().map(|f| f.row_count()).unwrap_or(0)
126	}
127
128	/// Return the system identity used by this harness.
129	pub fn identity() -> IdentityId {
130		IdentityId::system()
131	}
132
133	/// Access the underlying StandardEngine.
134	pub fn inner(&self) -> &StandardEngine {
135		&self.engine
136	}
137}
138
139impl Deref for TestEngine {
140	type Target = StandardEngine;
141
142	fn deref(&self) -> &StandardEngine {
143		&self.engine
144	}
145}
146
147#[derive(Default)]
148pub struct TestEngineBuilder {
149	cdc: bool,
150	metrics: bool,
151}
152
153impl TestEngineBuilder {
154	pub fn with_cdc(mut self) -> Self {
155		self.cdc = true;
156		self
157	}
158
159	pub fn with_metrics(mut self) -> Self {
160		self.metrics = true;
161		self
162	}
163
164	pub fn build(self) -> TestEngine {
165		let actor_system = ActorSystem::new(1);
166		let eventbus = EventBus::new(&actor_system);
167		let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
168		let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
169		let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
170		let runtime = SharedRuntime::from_config(
171			SharedRuntimeConfig::default().async_threads(2).compute_threads(2).deterministic_testing(1000),
172		);
173		let materialized_catalog = MaterializedCatalog::new();
174		let multi = MultiTransaction::new(
175			multi_store.clone(),
176			single.clone(),
177			eventbus.clone(),
178			actor_system,
179			runtime.clock().clone(),
180			runtime.rng().clone(),
181			Arc::new(materialized_catalog.clone()),
182		)
183		.unwrap();
184
185		let mut ioc = IocContainer::new();
186
187		ioc = ioc.register(materialized_catalog.clone());
188
189		ioc = ioc.register(runtime.clone());
190		ioc = ioc.register(single_store.clone());
191
192		if self.metrics {
193			let metrics_worker = Arc::new(MetricsWorker::new(
194				MetricsWorkerConfig::default(),
195				single_store.clone(),
196				multi_store.clone(),
197				eventbus.clone(),
198			));
199			eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(
200				metrics_worker.sender(),
201			));
202			eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
203			eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(
204				metrics_worker.sender(),
205			));
206			ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
207		}
208
209		let cdc_store = CdcStore::memory();
210		ioc = ioc.register(cdc_store.clone());
211
212		let ioc_for_cdc = ioc.clone();
213
214		let engine = StandardEngine::new(
215			multi,
216			single.clone(),
217			eventbus.clone(),
218			InterceptorFactory::default(),
219			Catalog::new(materialized_catalog),
220			EngineConfig {
221				runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
222				functions: default_functions().configure(),
223				procedures: Procedures::empty(),
224				transforms: Transforms::empty(),
225				ioc,
226				#[cfg(not(target_arch = "wasm32"))]
227				remote_registry: None,
228			},
229		);
230
231		if self.cdc {
232			let cdc_handle = spawn_cdc_producer(
233				&runtime.actor_system(),
234				cdc_store,
235				multi_store.clone(),
236				engine.clone(),
237				eventbus.clone(),
238			);
239			eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
240				cdc_handle.actor_ref().clone(),
241				runtime.clock().clone(),
242			));
243			ioc_for_cdc.register_service::<Arc<ActorHandle<CdcProduceMsg>>>(Arc::new(cdc_handle));
244		}
245
246		TestEngine {
247			engine,
248		}
249	}
250}
251
252pub fn create_test_admin_transaction() -> AdminTransaction {
253	let multi_store = MultiStore::testing_memory();
254	let single_store = SingleStore::testing_memory();
255
256	let actor_system = ActorSystem::new(1);
257	let event_bus = EventBus::new(&actor_system);
258	let single = SingleTransaction::new(single_store, event_bus.clone());
259	let multi = MultiTransaction::new(
260		multi_store,
261		single.clone(),
262		event_bus.clone(),
263		actor_system,
264		Clock::Mock(MockClock::from_millis(1000)),
265		Rng::seeded(42),
266		Arc::new(MaterializedCatalog::new()),
267	)
268	.unwrap();
269
270	AdminTransaction::new(
271		multi,
272		single,
273		event_bus,
274		Interceptors::new(),
275		IdentityId::system(),
276		Clock::Mock(MockClock::from_millis(1000)),
277	)
278	.unwrap()
279}
280
281pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
282	let multi_store = MultiStore::testing_memory();
283	let single_store = SingleStore::testing_memory();
284
285	let actor_system = ActorSystem::new(1);
286	let event_bus = EventBus::new(&actor_system);
287	let single = SingleTransaction::new(single_store, event_bus.clone());
288	let multi = MultiTransaction::new(
289		multi_store,
290		single.clone(),
291		event_bus.clone(),
292		actor_system,
293		Clock::Mock(MockClock::from_millis(1000)),
294		Rng::seeded(42),
295		Arc::new(MaterializedCatalog::new()),
296	)
297	.unwrap();
298	let mut result = AdminTransaction::new(
299		multi,
300		single.clone(),
301		event_bus.clone(),
302		Interceptors::new(),
303		IdentityId::system(),
304		Clock::Mock(MockClock::from_millis(1000)),
305	)
306	.unwrap();
307
308	let materialized_catalog = MaterializedCatalog::new();
309	let catalog = Catalog::new(materialized_catalog);
310
311	let namespace = catalog
312		.create_namespace(
313			&mut result,
314			NamespaceToCreate {
315				namespace_fragment: None,
316				name: "reifydb".to_string(),
317				local_name: "reifydb".to_string(),
318				parent_id: NamespaceId::ROOT,
319				grpc: None,
320				token: None,
321			},
322		)
323		.unwrap();
324
325	catalog.create_table(
326		&mut result,
327		TableToCreate {
328			name: Fragment::internal("flows"),
329			namespace: namespace.id(),
330			columns: vec![
331				TableColumnToCreate {
332					name: Fragment::internal("id"),
333					fragment: Fragment::None,
334					constraint: TypeConstraint::unconstrained(Type::Int8),
335					properties: vec![],
336					auto_increment: true,
337					dictionary_id: None,
338				},
339				TableColumnToCreate {
340					name: Fragment::internal("data"),
341					fragment: Fragment::None,
342					constraint: TypeConstraint::unconstrained(Type::Blob),
343					properties: vec![],
344					auto_increment: false,
345					dictionary_id: None,
346				},
347			],
348			retention_strategy: None,
349			primary_key_columns: None,
350		},
351	)
352	.unwrap();
353
354	result
355}