Skip to main content

reifydb_engine/
test_harness.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7	catalog::{
8		Catalog,
9		namespace::NamespaceToCreate,
10		table::{TableColumnToCreate, TableToCreate},
11	},
12	materialized::MaterializedCatalog,
13};
14use reifydb_cdc::{
15	produce::producer::{CdcProducerEventListener, spawn_cdc_producer},
16	storage::CdcStore,
17};
18use reifydb_core::{
19	actors::cdc::CdcProduceHandle,
20	event::{EventBus, transaction::PostCommitEvent},
21	interface::catalog::id::NamespaceId,
22	util::ioc::IocContainer,
23};
24use reifydb_extension::transform::registry::Transforms;
25use reifydb_routine::{function::default_functions, procedure::registry::Procedures};
26use reifydb_runtime::{
27	SharedRuntime, SharedRuntimeConfig,
28	actor::system::ActorSystem,
29	context::{
30		RuntimeContext,
31		clock::{Clock, MockClock},
32		rng::Rng,
33	},
34	pool::{PoolConfig, Pools},
35};
36use reifydb_store_multi::MultiStore;
37use reifydb_store_single::SingleStore;
38use reifydb_transaction::{
39	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
40	multi::transaction::MultiTransaction,
41	single::SingleTransaction,
42	transaction::admin::AdminTransaction,
43};
44use reifydb_type::{
45	fragment::Fragment,
46	params::Params,
47	value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
48};
49
50use crate::{engine::StandardEngine, vm::services::EngineConfig};
51
52pub struct TestEngine {
53	engine: StandardEngine,
54}
55
56impl Default for TestEngine {
57	fn default() -> Self {
58		Self::new()
59	}
60}
61
62impl TestEngine {
63	/// Create a new TestEngine with all subsystems (CDC, metrics, etc.).
64	pub fn new() -> Self {
65		Self::builder().with_cdc().build()
66	}
67
68	/// Start configuring a test engine via the builder.
69	pub fn builder() -> TestEngineBuilder {
70		TestEngineBuilder::default()
71	}
72
73	/// Run an admin RQL statement as system identity. Panics on error.
74	pub fn admin(&self, rql: &str) -> Vec<Frame> {
75		let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
76		if let Some(e) = r.error {
77			panic!("admin failed: {e:?}\nrql: {rql}")
78		}
79		r.frames
80	}
81
82	/// Run a command RQL statement as system identity. Panics on error.
83	pub fn command(&self, rql: &str) -> Vec<Frame> {
84		let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
85		if let Some(e) = r.error {
86			panic!("command failed: {e:?}\nrql: {rql}")
87		}
88		r.frames
89	}
90
91	/// Run a query RQL statement as system identity. Panics on error.
92	pub fn query(&self, rql: &str) -> Vec<Frame> {
93		let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
94		if let Some(e) = r.error {
95			panic!("query failed: {e:?}\nrql: {rql}")
96		}
97		r.frames
98	}
99
100	/// Run an admin statement expecting an error. Panics if it succeeds.
101	pub fn admin_err(&self, rql: &str) -> String {
102		let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
103		match r.error {
104			Some(e) => format!("{e:?}"),
105			None => panic!("Expected error but admin succeeded\nrql: {rql}"),
106		}
107	}
108
109	/// Run a command statement expecting an error. Panics if it succeeds.
110	pub fn command_err(&self, rql: &str) -> String {
111		let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
112		match r.error {
113			Some(e) => format!("{e:?}"),
114			None => panic!("Expected error but command succeeded\nrql: {rql}"),
115		}
116	}
117
118	/// Run a query statement expecting an error. Panics if it succeeds.
119	pub fn query_err(&self, rql: &str) -> String {
120		let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
121		match r.error {
122			Some(e) => format!("{e:?}"),
123			None => panic!("Expected error but query succeeded\nrql: {rql}"),
124		}
125	}
126
127	/// Count rows in the first frame.
128	pub fn row_count(frames: &[Frame]) -> usize {
129		frames.first().map(|f| f.row_count()).unwrap_or(0)
130	}
131
132	/// Return the system identity used by this harness.
133	pub fn identity() -> IdentityId {
134		IdentityId::system()
135	}
136
137	/// Access the underlying StandardEngine.
138	pub fn inner(&self) -> &StandardEngine {
139		&self.engine
140	}
141}
142
143impl Deref for TestEngine {
144	type Target = StandardEngine;
145
146	fn deref(&self) -> &StandardEngine {
147		&self.engine
148	}
149}
150
/// Builder for a [`TestEngine`].
///
/// CDC starts disabled (the `Default` value of `cdc` is `false`); opt in with
/// `with_cdc` before calling `build`.
#[derive(Debug, Clone, Default)]
pub struct TestEngineBuilder {
	/// Whether `build` spawns the CDC producer and registers its post-commit listener.
	cdc: bool,
}
155
156impl TestEngineBuilder {
157	pub fn with_cdc(mut self) -> Self {
158		self.cdc = true;
159		self
160	}
161
162	pub fn build(self) -> TestEngine {
163		let pools = Pools::new(PoolConfig::default());
164		let actor_system = ActorSystem::new(pools, Clock::Real);
165		let eventbus = EventBus::new(&actor_system);
166		let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
167		let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
168		let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
169		let runtime = SharedRuntime::from_config(
170			SharedRuntimeConfig::default().async_threads(2).system_threads(2).query_threads(2).seeded(1000),
171		);
172		let materialized_catalog = MaterializedCatalog::new();
173		let multi = MultiTransaction::new(
174			multi_store.clone(),
175			single.clone(),
176			eventbus.clone(),
177			actor_system,
178			runtime.clock().clone(),
179			runtime.rng().clone(),
180			Arc::new(materialized_catalog.clone()),
181		)
182		.unwrap();
183
184		let mut ioc = IocContainer::new();
185
186		ioc = ioc.register(materialized_catalog.clone());
187
188		ioc = ioc.register(runtime.clone());
189		ioc = ioc.register(single_store.clone());
190
191		let cdc_store = CdcStore::memory();
192		ioc = ioc.register(cdc_store.clone());
193
194		let ioc_for_cdc = ioc.clone();
195
196		let engine = StandardEngine::new(
197			multi,
198			single.clone(),
199			eventbus.clone(),
200			InterceptorFactory::default(),
201			Catalog::new(materialized_catalog),
202			EngineConfig {
203				runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
204				functions: default_functions().configure(),
205				procedures: Procedures::empty(),
206				transforms: Transforms::empty(),
207				ioc,
208				#[cfg(not(reifydb_single_threaded))]
209				remote_registry: None,
210			},
211		);
212
213		if self.cdc {
214			let cdc_handle = spawn_cdc_producer(
215				&runtime.actor_system(),
216				cdc_store,
217				multi_store.clone(),
218				engine.clone(),
219				eventbus.clone(),
220			);
221			eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
222				cdc_handle.actor_ref().clone(),
223				runtime.clock().clone(),
224			));
225			ioc_for_cdc.register_service::<Arc<CdcProduceHandle>>(Arc::new(cdc_handle));
226		}
227
228		TestEngine {
229			engine,
230		}
231	}
232}
233
234pub fn create_test_admin_transaction() -> AdminTransaction {
235	let multi_store = MultiStore::testing_memory();
236	let single_store = SingleStore::testing_memory();
237
238	let pools = Pools::new(PoolConfig::default());
239	let actor_system = ActorSystem::new(pools, Clock::Real);
240	let event_bus = EventBus::new(&actor_system);
241	let single = SingleTransaction::new(single_store, event_bus.clone());
242	let multi = MultiTransaction::new(
243		multi_store,
244		single.clone(),
245		event_bus.clone(),
246		actor_system,
247		Clock::Mock(MockClock::from_millis(1000)),
248		Rng::seeded(42),
249		Arc::new(MaterializedCatalog::new()),
250	)
251	.unwrap();
252
253	AdminTransaction::new(
254		multi,
255		single,
256		event_bus,
257		Interceptors::new(),
258		IdentityId::system(),
259		Clock::Mock(MockClock::from_millis(1000)),
260	)
261	.unwrap()
262}
263
264pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
265	let multi_store = MultiStore::testing_memory();
266	let single_store = SingleStore::testing_memory();
267
268	let pools = Pools::new(PoolConfig::default());
269	let actor_system = ActorSystem::new(pools, Clock::Real);
270	let event_bus = EventBus::new(&actor_system);
271	let single = SingleTransaction::new(single_store, event_bus.clone());
272	let multi = MultiTransaction::new(
273		multi_store,
274		single.clone(),
275		event_bus.clone(),
276		actor_system,
277		Clock::Mock(MockClock::from_millis(1000)),
278		Rng::seeded(42),
279		Arc::new(MaterializedCatalog::new()),
280	)
281	.unwrap();
282	let mut result = AdminTransaction::new(
283		multi,
284		single.clone(),
285		event_bus.clone(),
286		Interceptors::new(),
287		IdentityId::system(),
288		Clock::Mock(MockClock::from_millis(1000)),
289	)
290	.unwrap();
291
292	let materialized_catalog = MaterializedCatalog::new();
293	let catalog = Catalog::new(materialized_catalog);
294
295	let namespace = catalog
296		.create_namespace(
297			&mut result,
298			NamespaceToCreate {
299				namespace_fragment: None,
300				name: "reifydb".to_string(),
301				local_name: "reifydb".to_string(),
302				parent_id: NamespaceId::ROOT,
303				grpc: None,
304				token: None,
305			},
306		)
307		.unwrap();
308
309	catalog.create_table(
310		&mut result,
311		TableToCreate {
312			name: Fragment::internal("flows"),
313			namespace: namespace.id(),
314			columns: vec![
315				TableColumnToCreate {
316					name: Fragment::internal("id"),
317					fragment: Fragment::None,
318					constraint: TypeConstraint::unconstrained(Type::Int8),
319					properties: vec![],
320					auto_increment: true,
321					dictionary_id: None,
322				},
323				TableColumnToCreate {
324					name: Fragment::internal("data"),
325					fragment: Fragment::None,
326					constraint: TypeConstraint::unconstrained(Type::Blob),
327					properties: vec![],
328					auto_increment: false,
329					dictionary_id: None,
330				},
331			],
332			retention_strategy: None,
333			primary_key_columns: None,
334			underlying: false,
335		},
336	)
337	.unwrap();
338
339	result
340}