Skip to main content

reifydb_engine/
test_harness.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7	catalog::{
8		Catalog,
9		namespace::NamespaceToCreate,
10		table::{TableColumnToCreate, TableToCreate},
11	},
12	materialized::MaterializedCatalog,
13	schema::RowSchemaRegistry,
14};
15use reifydb_cdc::{
16	produce::producer::{CdcProduceMsg, CdcProducerEventListener, spawn_cdc_producer},
17	storage::CdcStore,
18};
19use reifydb_core::{
20	config::SystemConfig,
21	event::{
22		EventBus,
23		metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
24		transaction::PostCommitEvent,
25	},
26	interface::catalog::id::NamespaceId,
27	util::ioc::IocContainer,
28};
29use reifydb_extension::transform::registry::Transforms;
30use reifydb_metric::worker::{
31	CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
32};
33use reifydb_routine::{function::default_functions, procedure::registry::Procedures};
34use reifydb_runtime::{
35	SharedRuntime, SharedRuntimeConfig,
36	actor::system::{ActorHandle, ActorSystem, ActorSystemConfig},
37	context::{RuntimeContext, clock::Clock},
38};
39use reifydb_store_multi::MultiStore;
40use reifydb_store_single::SingleStore;
41use reifydb_transaction::{
42	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
43	multi::transaction::{MultiTransaction, register_oracle_defaults},
44	single::SingleTransaction,
45	transaction::admin::AdminTransaction,
46};
47use reifydb_type::{
48	fragment::Fragment,
49	params::Params,
50	value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
51};
52
53use crate::engine::StandardEngine;
54
55pub struct TestEngine {
56	engine: StandardEngine,
57}
58
59impl TestEngine {
60	/// Create a new TestEngine with all subsystems (CDC, metrics, etc.).
61	pub fn new() -> Self {
62		Self::builder().with_cdc().with_metrics().build()
63	}
64
65	/// Start configuring a test engine via the builder.
66	pub fn builder() -> TestEngineBuilder {
67		TestEngineBuilder::default()
68	}
69
70	/// Run an admin RQL statement as system identity. Panics on error.
71	pub fn admin(&self, rql: &str) -> Vec<Frame> {
72		self.engine
73			.admin_as(IdentityId::system(), rql, Params::None)
74			.unwrap_or_else(|e| panic!("admin failed: {e:?}\nrql: {rql}"))
75	}
76
77	/// Run a command RQL statement as system identity. Panics on error.
78	pub fn command(&self, rql: &str) -> Vec<Frame> {
79		self.engine
80			.command_as(IdentityId::system(), rql, Params::None)
81			.unwrap_or_else(|e| panic!("command failed: {e:?}\nrql: {rql}"))
82	}
83
84	/// Run a query RQL statement as system identity. Panics on error.
85	pub fn query(&self, rql: &str) -> Vec<Frame> {
86		self.engine
87			.query_as(IdentityId::system(), rql, Params::None)
88			.unwrap_or_else(|e| panic!("query failed: {e:?}\nrql: {rql}"))
89	}
90
91	/// Run an admin statement expecting an error. Panics if it succeeds.
92	pub fn admin_err(&self, rql: &str) -> String {
93		match self.engine.admin_as(IdentityId::system(), rql, Params::None) {
94			Err(e) => format!("{e:?}"),
95			Ok(_) => panic!("Expected error but admin succeeded\nrql: {rql}"),
96		}
97	}
98
99	/// Run a command statement expecting an error. Panics if it succeeds.
100	pub fn command_err(&self, rql: &str) -> String {
101		match self.engine.command_as(IdentityId::system(), rql, Params::None) {
102			Err(e) => format!("{e:?}"),
103			Ok(_) => panic!("Expected error but command succeeded\nrql: {rql}"),
104		}
105	}
106
107	/// Run a query statement expecting an error. Panics if it succeeds.
108	pub fn query_err(&self, rql: &str) -> String {
109		match self.engine.query_as(IdentityId::system(), rql, Params::None) {
110			Err(e) => format!("{e:?}"),
111			Ok(_) => panic!("Expected error but query succeeded\nrql: {rql}"),
112		}
113	}
114
115	/// Count rows in the first frame.
116	pub fn row_count(frames: &[Frame]) -> usize {
117		frames.first().map(|f| f.row_count()).unwrap_or(0)
118	}
119
120	/// Return the system identity used by this harness.
121	pub fn identity() -> IdentityId {
122		IdentityId::system()
123	}
124
125	/// Access the underlying StandardEngine.
126	pub fn inner(&self) -> &StandardEngine {
127		&self.engine
128	}
129}
130
131impl Deref for TestEngine {
132	type Target = StandardEngine;
133
134	fn deref(&self) -> &StandardEngine {
135		&self.engine
136	}
137}
138
139#[derive(Default)]
140pub struct TestEngineBuilder {
141	cdc: bool,
142	metrics: bool,
143}
144
145impl TestEngineBuilder {
146	pub fn with_cdc(mut self) -> Self {
147		self.cdc = true;
148		self
149	}
150
151	pub fn with_metrics(mut self) -> Self {
152		self.metrics = true;
153		self
154	}
155
156	pub fn build(self) -> TestEngine {
157		let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
158		let eventbus = EventBus::new(&actor_system);
159		let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
160		let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
161		let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
162		let runtime = SharedRuntime::from_config(
163			SharedRuntimeConfig::default()
164				.async_threads(2)
165				.compute_threads(2)
166				.compute_max_in_flight(32)
167				.deterministic_testing(1000),
168		);
169		let system_config = SystemConfig::new();
170		register_oracle_defaults(&system_config);
171		let multi = MultiTransaction::new(
172			multi_store.clone(),
173			single.clone(),
174			eventbus.clone(),
175			actor_system,
176			runtime.clock().clone(),
177			system_config,
178		)
179		.unwrap();
180
181		let mut ioc = IocContainer::new();
182
183		let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
184		ioc = ioc.register(materialized_catalog.clone());
185
186		let row_schema_registry = RowSchemaRegistry::new(single.clone());
187		ioc = ioc.register(row_schema_registry.clone());
188
189		ioc = ioc.register(runtime.clone());
190		ioc = ioc.register(single_store.clone());
191
192		if self.metrics {
193			let metrics_worker = Arc::new(MetricsWorker::new(
194				MetricsWorkerConfig::default(),
195				single_store.clone(),
196				multi_store.clone(),
197				eventbus.clone(),
198			));
199			eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(
200				metrics_worker.sender(),
201			));
202			eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
203			eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(
204				metrics_worker.sender(),
205			));
206			ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
207		}
208
209		let cdc_store = CdcStore::memory();
210		ioc = ioc.register(cdc_store.clone());
211
212		let ioc_for_cdc = ioc.clone();
213
214		let engine = StandardEngine::new(
215			multi,
216			single,
217			eventbus.clone(),
218			InterceptorFactory::default(),
219			Catalog::new(materialized_catalog, row_schema_registry),
220			RuntimeContext::with_clock(runtime.clock().clone()),
221			default_functions().build(),
222			Procedures::empty(),
223			Transforms::empty(),
224			ioc,
225			#[cfg(not(target_arch = "wasm32"))]
226			None,
227		);
228
229		if self.cdc {
230			let cdc_handle = spawn_cdc_producer(
231				&runtime.actor_system(),
232				cdc_store,
233				multi_store.clone(),
234				engine.clone(),
235				eventbus.clone(),
236			);
237			eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
238				cdc_handle.actor_ref().clone(),
239				runtime.clock().clone(),
240			));
241			ioc_for_cdc.register_service::<Arc<ActorHandle<CdcProduceMsg>>>(Arc::new(cdc_handle));
242		}
243
244		TestEngine {
245			engine,
246		}
247	}
248}
249
250pub fn create_test_admin_transaction() -> AdminTransaction {
251	let multi_store = MultiStore::testing_memory();
252	let single_store = SingleStore::testing_memory();
253
254	let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
255	let event_bus = EventBus::new(&actor_system);
256	let single = SingleTransaction::new(single_store, event_bus.clone());
257	let system_config = SystemConfig::new();
258	register_oracle_defaults(&system_config);
259	let multi = MultiTransaction::new(
260		multi_store,
261		single.clone(),
262		event_bus.clone(),
263		actor_system,
264		Clock::default(),
265		system_config,
266	)
267	.unwrap();
268
269	AdminTransaction::new(multi, single, event_bus, Interceptors::new(), IdentityId::system()).unwrap()
270}
271
272pub fn create_test_admin_transaction_with_internal_schema() -> AdminTransaction {
273	let multi_store = MultiStore::testing_memory();
274	let single_store = SingleStore::testing_memory();
275
276	let actor_system = ActorSystem::new(ActorSystemConfig {
277		pool_threads: 1,
278		max_in_flight: 1,
279	});
280	let event_bus = EventBus::new(&actor_system);
281	let single = SingleTransaction::new(single_store, event_bus.clone());
282	let system_config = SystemConfig::new();
283	register_oracle_defaults(&system_config);
284	let multi = MultiTransaction::new(
285		multi_store,
286		single.clone(),
287		event_bus.clone(),
288		actor_system,
289		Clock::default(),
290		system_config,
291	)
292	.unwrap();
293	let mut result = AdminTransaction::new(
294		multi,
295		single.clone(),
296		event_bus.clone(),
297		Interceptors::new(),
298		IdentityId::system(),
299	)
300	.unwrap();
301
302	let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
303	let row_schema_registry = RowSchemaRegistry::new(single);
304	let catalog = Catalog::new(materialized_catalog, row_schema_registry);
305
306	let namespace = catalog
307		.create_namespace(
308			&mut result,
309			NamespaceToCreate {
310				namespace_fragment: None,
311				name: "reifydb".to_string(),
312				local_name: "reifydb".to_string(),
313				parent_id: NamespaceId::ROOT,
314				grpc: None,
315				token: None,
316			},
317		)
318		.unwrap();
319
320	catalog.create_table(
321		&mut result,
322		TableToCreate {
323			name: Fragment::internal("flows"),
324			namespace: namespace.id(),
325			columns: vec![
326				TableColumnToCreate {
327					name: Fragment::internal("id"),
328					fragment: Fragment::None,
329					constraint: TypeConstraint::unconstrained(Type::Int8),
330					properties: vec![],
331					auto_increment: true,
332					dictionary_id: None,
333				},
334				TableColumnToCreate {
335					name: Fragment::internal("data"),
336					fragment: Fragment::None,
337					constraint: TypeConstraint::unconstrained(Type::Blob),
338					properties: vec![],
339					auto_increment: false,
340					dictionary_id: None,
341				},
342			],
343			retention_policy: None,
344			primary_key_columns: None,
345		},
346	)
347	.unwrap();
348
349	result
350}