Skip to main content

reifydb_engine/
test_harness.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7	catalog::{
8		Catalog,
9		namespace::NamespaceToCreate,
10		table::{TableColumnToCreate, TableToCreate},
11	},
12	materialized::MaterializedCatalog,
13};
14use reifydb_cdc::{
15	produce::producer::{CdcProduceMsg, CdcProducerEventListener, spawn_cdc_producer},
16	storage::CdcStore,
17};
18use reifydb_core::{
19	config::SystemConfig,
20	event::{
21		EventBus,
22		metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
23		transaction::PostCommitEvent,
24	},
25	interface::catalog::id::NamespaceId,
26	util::ioc::IocContainer,
27};
28use reifydb_extension::transform::registry::Transforms;
29use reifydb_metric::worker::{
30	CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
31};
32use reifydb_routine::{function::default_functions, procedure::registry::Procedures};
33use reifydb_runtime::{
34	SharedRuntime, SharedRuntimeConfig,
35	actor::system::{ActorHandle, ActorSystem},
36	context::{
37		RuntimeContext,
38		clock::{Clock, MockClock},
39		rng::Rng,
40	},
41};
42use reifydb_store_multi::MultiStore;
43use reifydb_store_single::SingleStore;
44use reifydb_transaction::{
45	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
46	multi::transaction::{MultiTransaction, register_oracle_defaults},
47	single::SingleTransaction,
48	transaction::admin::AdminTransaction,
49};
50use reifydb_type::{
51	fragment::Fragment,
52	params::Params,
53	value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
54};
55
56use crate::{engine::StandardEngine, vm::services::EngineConfig};
57
58pub struct TestEngine {
59	engine: StandardEngine,
60}
61
62impl Default for TestEngine {
63	fn default() -> Self {
64		Self::new()
65	}
66}
67
68impl TestEngine {
69	/// Create a new TestEngine with all subsystems (CDC, metrics, etc.).
70	pub fn new() -> Self {
71		Self::builder().with_cdc().with_metrics().build()
72	}
73
74	/// Start configuring a test engine via the builder.
75	pub fn builder() -> TestEngineBuilder {
76		TestEngineBuilder::default()
77	}
78
79	/// Run an admin RQL statement as system identity. Panics on error.
80	pub fn admin(&self, rql: &str) -> Vec<Frame> {
81		self.engine
82			.admin_as(IdentityId::system(), rql, Params::None)
83			.unwrap_or_else(|e| panic!("admin failed: {e:?}\nrql: {rql}"))
84	}
85
86	/// Run a command RQL statement as system identity. Panics on error.
87	pub fn command(&self, rql: &str) -> Vec<Frame> {
88		self.engine
89			.command_as(IdentityId::system(), rql, Params::None)
90			.unwrap_or_else(|e| panic!("command failed: {e:?}\nrql: {rql}"))
91	}
92
93	/// Run a query RQL statement as system identity. Panics on error.
94	pub fn query(&self, rql: &str) -> Vec<Frame> {
95		self.engine
96			.query_as(IdentityId::system(), rql, Params::None)
97			.unwrap_or_else(|e| panic!("query failed: {e:?}\nrql: {rql}"))
98	}
99
100	/// Run an admin statement expecting an error. Panics if it succeeds.
101	pub fn admin_err(&self, rql: &str) -> String {
102		match self.engine.admin_as(IdentityId::system(), rql, Params::None) {
103			Err(e) => format!("{e:?}"),
104			Ok(_) => panic!("Expected error but admin succeeded\nrql: {rql}"),
105		}
106	}
107
108	/// Run a command statement expecting an error. Panics if it succeeds.
109	pub fn command_err(&self, rql: &str) -> String {
110		match self.engine.command_as(IdentityId::system(), rql, Params::None) {
111			Err(e) => format!("{e:?}"),
112			Ok(_) => panic!("Expected error but command succeeded\nrql: {rql}"),
113		}
114	}
115
116	/// Run a query statement expecting an error. Panics if it succeeds.
117	pub fn query_err(&self, rql: &str) -> String {
118		match self.engine.query_as(IdentityId::system(), rql, Params::None) {
119			Err(e) => format!("{e:?}"),
120			Ok(_) => panic!("Expected error but query succeeded\nrql: {rql}"),
121		}
122	}
123
124	/// Count rows in the first frame.
125	pub fn row_count(frames: &[Frame]) -> usize {
126		frames.first().map(|f| f.row_count()).unwrap_or(0)
127	}
128
129	/// Return the system identity used by this harness.
130	pub fn identity() -> IdentityId {
131		IdentityId::system()
132	}
133
134	/// Access the underlying StandardEngine.
135	pub fn inner(&self) -> &StandardEngine {
136		&self.engine
137	}
138}
139
140impl Deref for TestEngine {
141	type Target = StandardEngine;
142
143	fn deref(&self) -> &StandardEngine {
144		&self.engine
145	}
146}
147
/// Selects which optional subsystems a [`TestEngine`] is built with.
///
/// Both flags start out `false`; enable them with `with_cdc` / `with_metrics`
/// and finish with `build`.
#[derive(Debug, Clone, Default)]
pub struct TestEngineBuilder {
	/// When `true`, `build` spawns the CDC producer and registers its
	/// post-commit listener on the event bus.
	cdc: bool,
	/// When `true`, `build` creates a metrics worker and registers its
	/// stats listeners on the event bus.
	metrics: bool,
}
153
154impl TestEngineBuilder {
155	pub fn with_cdc(mut self) -> Self {
156		self.cdc = true;
157		self
158	}
159
160	pub fn with_metrics(mut self) -> Self {
161		self.metrics = true;
162		self
163	}
164
165	pub fn build(self) -> TestEngine {
166		let actor_system = ActorSystem::new(1);
167		let eventbus = EventBus::new(&actor_system);
168		let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
169		let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
170		let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
171		let runtime = SharedRuntime::from_config(
172			SharedRuntimeConfig::default().async_threads(2).compute_threads(2).deterministic_testing(1000),
173		);
174		let system_config = SystemConfig::new();
175		register_oracle_defaults(&system_config);
176		let multi = MultiTransaction::new(
177			multi_store.clone(),
178			single.clone(),
179			eventbus.clone(),
180			actor_system,
181			runtime.clock().clone(),
182			runtime.rng().clone(),
183			system_config,
184		)
185		.unwrap();
186
187		let mut ioc = IocContainer::new();
188
189		let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
190		ioc = ioc.register(materialized_catalog.clone());
191
192		ioc = ioc.register(runtime.clone());
193		ioc = ioc.register(single_store.clone());
194
195		if self.metrics {
196			let metrics_worker = Arc::new(MetricsWorker::new(
197				MetricsWorkerConfig::default(),
198				single_store.clone(),
199				multi_store.clone(),
200				eventbus.clone(),
201			));
202			eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(
203				metrics_worker.sender(),
204			));
205			eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
206			eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(
207				metrics_worker.sender(),
208			));
209			ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
210		}
211
212		let cdc_store = CdcStore::memory();
213		ioc = ioc.register(cdc_store.clone());
214
215		let ioc_for_cdc = ioc.clone();
216
217		let engine = StandardEngine::new(
218			multi,
219			single.clone(),
220			eventbus.clone(),
221			InterceptorFactory::default(),
222			Catalog::new(materialized_catalog),
223			EngineConfig {
224				runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
225				functions: default_functions().configure(),
226				procedures: Procedures::empty(),
227				transforms: Transforms::empty(),
228				ioc,
229				#[cfg(not(target_arch = "wasm32"))]
230				remote_registry: None,
231			},
232		);
233
234		if self.cdc {
235			let cdc_handle = spawn_cdc_producer(
236				&runtime.actor_system(),
237				cdc_store,
238				multi_store.clone(),
239				engine.clone(),
240				eventbus.clone(),
241			);
242			eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
243				cdc_handle.actor_ref().clone(),
244				runtime.clock().clone(),
245			));
246			ioc_for_cdc.register_service::<Arc<ActorHandle<CdcProduceMsg>>>(Arc::new(cdc_handle));
247		}
248
249		TestEngine {
250			engine,
251		}
252	}
253}
254
255pub fn create_test_admin_transaction() -> AdminTransaction {
256	let multi_store = MultiStore::testing_memory();
257	let single_store = SingleStore::testing_memory();
258
259	let actor_system = ActorSystem::new(1);
260	let event_bus = EventBus::new(&actor_system);
261	let single = SingleTransaction::new(single_store, event_bus.clone());
262	let system_config = SystemConfig::new();
263	register_oracle_defaults(&system_config);
264	let multi = MultiTransaction::new(
265		multi_store,
266		single.clone(),
267		event_bus.clone(),
268		actor_system,
269		Clock::Mock(MockClock::from_millis(1000)),
270		Rng::seeded(42),
271		system_config,
272	)
273	.unwrap();
274
275	AdminTransaction::new(
276		multi,
277		single,
278		event_bus,
279		Interceptors::new(),
280		IdentityId::system(),
281		Clock::Mock(MockClock::from_millis(1000)),
282	)
283	.unwrap()
284}
285
286pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
287	let multi_store = MultiStore::testing_memory();
288	let single_store = SingleStore::testing_memory();
289
290	let actor_system = ActorSystem::new(1);
291	let event_bus = EventBus::new(&actor_system);
292	let single = SingleTransaction::new(single_store, event_bus.clone());
293	let system_config = SystemConfig::new();
294	register_oracle_defaults(&system_config);
295	let multi = MultiTransaction::new(
296		multi_store,
297		single.clone(),
298		event_bus.clone(),
299		actor_system,
300		Clock::Mock(MockClock::from_millis(1000)),
301		Rng::seeded(42),
302		system_config,
303	)
304	.unwrap();
305	let mut result = AdminTransaction::new(
306		multi,
307		single.clone(),
308		event_bus.clone(),
309		Interceptors::new(),
310		IdentityId::system(),
311		Clock::Mock(MockClock::from_millis(1000)),
312	)
313	.unwrap();
314
315	let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
316	let catalog = Catalog::new(materialized_catalog);
317
318	let namespace = catalog
319		.create_namespace(
320			&mut result,
321			NamespaceToCreate {
322				namespace_fragment: None,
323				name: "reifydb".to_string(),
324				local_name: "reifydb".to_string(),
325				parent_id: NamespaceId::ROOT,
326				grpc: None,
327				token: None,
328			},
329		)
330		.unwrap();
331
332	catalog.create_table(
333		&mut result,
334		TableToCreate {
335			name: Fragment::internal("flows"),
336			namespace: namespace.id(),
337			columns: vec![
338				TableColumnToCreate {
339					name: Fragment::internal("id"),
340					fragment: Fragment::None,
341					constraint: TypeConstraint::unconstrained(Type::Int8),
342					properties: vec![],
343					auto_increment: true,
344					dictionary_id: None,
345				},
346				TableColumnToCreate {
347					name: Fragment::internal("data"),
348					fragment: Fragment::None,
349					constraint: TypeConstraint::unconstrained(Type::Blob),
350					properties: vec![],
351					auto_increment: false,
352					dictionary_id: None,
353				},
354			],
355			retention_strategy: None,
356			primary_key_columns: None,
357		},
358	)
359	.unwrap();
360
361	result
362}