Skip to main content

reifydb_engine/
test_utils.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_catalog::{
7	catalog::{
8		Catalog,
9		namespace::NamespaceToCreate,
10		table::{TableColumnToCreate, TableToCreate},
11	},
12	materialized::MaterializedCatalog,
13	schema::SchemaRegistry,
14};
15use reifydb_cdc::{
16	produce::producer::{CdcProduceMsg, CdcProducerEventListener, spawn_cdc_producer},
17	storage::CdcStore,
18};
19use reifydb_core::{
20	config::SystemConfig,
21	event::{
22		EventBus,
23		metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
24		transaction::PostCommitEvent,
25	},
26	interface::catalog::id::NamespaceId,
27	util::ioc::IocContainer,
28};
29use reifydb_function::registry::Functions;
30use reifydb_metric::worker::{
31	CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
32};
33use reifydb_runtime::{
34	SharedRuntime, SharedRuntimeConfig,
35	actor::system::{ActorHandle, ActorSystem, ActorSystemConfig},
36	clock::Clock,
37};
38use reifydb_store_multi::MultiStore;
39use reifydb_store_single::SingleStore;
40use reifydb_transaction::{
41	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
42	multi::transaction::{MultiTransaction, register_oracle_defaults},
43	single::SingleTransaction,
44	transaction::admin::AdminTransaction,
45};
46use reifydb_type::{
47	fragment::Fragment,
48	value::{constraint::TypeConstraint, r#type::Type},
49};
50
51use crate::{engine::StandardEngine, procedure::registry::Procedures, transform::registry::Transforms};
52
53pub fn create_test_admin_transaction() -> AdminTransaction {
54	let multi_store = MultiStore::testing_memory();
55	let single_store = SingleStore::testing_memory();
56
57	let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
58	let event_bus = EventBus::new(&actor_system);
59	let single = SingleTransaction::new(single_store, event_bus.clone());
60	let system_config = SystemConfig::new();
61	register_oracle_defaults(&system_config);
62	let multi = MultiTransaction::new(
63		multi_store,
64		single.clone(),
65		event_bus.clone(),
66		actor_system,
67		Clock::default(),
68		system_config,
69	)
70	.unwrap();
71
72	AdminTransaction::new(multi, single, event_bus, Interceptors::new()).unwrap()
73}
74
75pub fn create_test_admin_transaction_with_internal_schema() -> AdminTransaction {
76	let multi_store = MultiStore::testing_memory();
77	let single_store = SingleStore::testing_memory();
78
79	let actor_system = ActorSystem::new(ActorSystemConfig {
80		pool_threads: 1,
81		max_in_flight: 1,
82	});
83	let event_bus = EventBus::new(&actor_system);
84	let single = SingleTransaction::new(single_store, event_bus.clone());
85	let system_config = SystemConfig::new();
86	register_oracle_defaults(&system_config);
87	let multi = MultiTransaction::new(
88		multi_store,
89		single.clone(),
90		event_bus.clone(),
91		actor_system,
92		Clock::default(),
93		system_config,
94	)
95	.unwrap();
96	let mut result = AdminTransaction::new(multi, single.clone(), event_bus.clone(), Interceptors::new()).unwrap();
97
98	let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
99	let schema_registry = SchemaRegistry::new(single);
100	let catalog = Catalog::new(materialized_catalog, schema_registry);
101
102	let namespace = catalog
103		.create_namespace(
104			&mut result,
105			NamespaceToCreate {
106				namespace_fragment: None,
107				name: "reifydb".to_string(),
108				local_name: "reifydb".to_string(),
109				parent_id: NamespaceId::ROOT,
110				grpc: None,
111			},
112		)
113		.unwrap();
114
115	catalog.create_table(
116		&mut result,
117		TableToCreate {
118			name: Fragment::internal("flows"),
119			namespace: namespace.id(),
120			columns: vec![
121				TableColumnToCreate {
122					name: Fragment::internal("id"),
123					fragment: Fragment::None,
124					constraint: TypeConstraint::unconstrained(Type::Int8),
125					properties: vec![],
126					auto_increment: true,
127					dictionary_id: None,
128				},
129				TableColumnToCreate {
130					name: Fragment::internal("data"),
131					fragment: Fragment::None,
132					constraint: TypeConstraint::unconstrained(Type::Blob),
133					properties: vec![],
134					auto_increment: false,
135					dictionary_id: None,
136				},
137			],
138			retention_policy: None,
139			primary_key_columns: None,
140		},
141	)
142	.unwrap();
143
144	result
145}
146
147/// Create a test StandardEngine with all required dependencies registered.
148pub fn create_test_engine() -> StandardEngine {
149	let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
150	let eventbus = EventBus::new(&actor_system);
151	let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
152	let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
153	let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
154	let runtime = SharedRuntime::from_config(
155		SharedRuntimeConfig::default()
156			.async_threads(2)
157			.compute_threads(2)
158			.compute_max_in_flight(32)
159			.mock_clock(1000),
160	);
161	let system_config = SystemConfig::new();
162	register_oracle_defaults(&system_config);
163	let multi = MultiTransaction::new(
164		multi_store.clone(),
165		single.clone(),
166		eventbus.clone(),
167		actor_system,
168		runtime.clock().clone(),
169		system_config,
170	)
171	.unwrap();
172
173	let mut ioc = IocContainer::new();
174
175	let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
176	ioc = ioc.register(materialized_catalog.clone());
177
178	let schema_registry = SchemaRegistry::new(single.clone());
179	ioc = ioc.register(schema_registry.clone());
180
181	ioc = ioc.register(runtime.clone());
182
183	let metrics_store = single_store.clone();
184
185	ioc = ioc.register(metrics_store.clone());
186
187	let metrics_worker = Arc::new(MetricsWorker::new(
188		MetricsWorkerConfig::default(),
189		metrics_store,
190		multi_store.clone(),
191		eventbus.clone(),
192	));
193	eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(metrics_worker.sender()));
194	eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
195	eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(metrics_worker.sender()));
196	ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
197
198	let cdc_store = CdcStore::memory();
199	ioc = ioc.register(cdc_store.clone());
200
201	let ioc_for_cdc = ioc.clone();
202
203	let engine = StandardEngine::new(
204		multi,
205		single,
206		eventbus.clone(),
207		InterceptorFactory::default(),
208		Catalog::new(materialized_catalog, schema_registry),
209		runtime.clock().clone(),
210		Functions::builder().build(),
211		Procedures::empty(),
212		Transforms::empty(),
213		ioc,
214		#[cfg(not(target_arch = "wasm32"))]
215		None,
216	);
217
218	let cdc_handle = spawn_cdc_producer(
219		&runtime.actor_system(),
220		cdc_store,
221		multi_store.clone(),
222		engine.clone(),
223		eventbus.clone(),
224	);
225	eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
226		cdc_handle.actor_ref().clone(),
227		runtime.clock().clone(),
228	));
229	ioc_for_cdc.register_service::<Arc<ActorHandle<CdcProduceMsg>>>(Arc::new(cdc_handle));
230
231	engine
232}