Skip to main content

reifydb_engine/
test_utils.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_catalog::{
7	catalog::{
8		Catalog,
9		namespace::NamespaceToCreate,
10		table::{TableColumnToCreate, TableToCreate},
11	},
12	materialized::MaterializedCatalog,
13	schema::SchemaRegistry,
14};
15use reifydb_cdc::{
16	produce::producer::{CdcProduceMsg, CdcProducerEventListener, spawn_cdc_producer},
17	storage::CdcStore,
18};
19use reifydb_core::{
20	config::SystemConfig,
21	event::{
22		EventBus,
23		metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
24		transaction::PostCommitEvent,
25	},
26	interface::catalog::id::NamespaceId,
27	util::ioc::IocContainer,
28};
29use reifydb_function::registry::Functions;
30use reifydb_metric::worker::{
31	CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
32};
33use reifydb_runtime::{
34	SharedRuntime, SharedRuntimeConfig,
35	actor::system::{ActorHandle, ActorSystem, ActorSystemConfig},
36	clock::Clock,
37};
38use reifydb_store_multi::MultiStore;
39use reifydb_store_single::SingleStore;
40use reifydb_transaction::{
41	interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
42	multi::transaction::{MultiTransaction, register_oracle_defaults},
43	single::SingleTransaction,
44	transaction::{admin::AdminTransaction, command::CommandTransaction},
45};
46use reifydb_type::{
47	fragment::Fragment,
48	value::{constraint::TypeConstraint, r#type::Type},
49};
50
51use crate::{engine::StandardEngine, procedure::registry::Procedures, transform::registry::Transforms};
52
53pub fn create_test_admin_transaction() -> AdminTransaction {
54	let multi_store = MultiStore::testing_memory();
55	let single_store = SingleStore::testing_memory();
56
57	let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
58	let event_bus = EventBus::new(&actor_system);
59	let single = SingleTransaction::new(single_store, event_bus.clone());
60	let system_config = SystemConfig::new();
61	register_oracle_defaults(&system_config);
62	let multi = MultiTransaction::new(
63		multi_store,
64		single.clone(),
65		event_bus.clone(),
66		actor_system,
67		Clock::default(),
68		system_config,
69	)
70	.unwrap();
71
72	AdminTransaction::new(multi, single, event_bus, Interceptors::new()).unwrap()
73}
74
75pub fn create_test_admin_transaction_with_internal_schema() -> AdminTransaction {
76	let multi_store = MultiStore::testing_memory();
77	let single_store = SingleStore::testing_memory();
78
79	let actor_system = ActorSystem::new(ActorSystemConfig {
80		pool_threads: 1,
81		max_in_flight: 1,
82	});
83	let event_bus = EventBus::new(&actor_system);
84	let single = SingleTransaction::new(single_store, event_bus.clone());
85	let system_config = SystemConfig::new();
86	register_oracle_defaults(&system_config);
87	let multi = MultiTransaction::new(
88		multi_store,
89		single.clone(),
90		event_bus.clone(),
91		actor_system,
92		Clock::default(),
93		system_config,
94	)
95	.unwrap();
96	let mut result = AdminTransaction::new(multi, single.clone(), event_bus.clone(), Interceptors::new()).unwrap();
97
98	let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
99	let schema_registry = SchemaRegistry::new(single);
100	let catalog = Catalog::new(materialized_catalog, schema_registry);
101
102	let namespace = catalog
103		.create_namespace(
104			&mut result,
105			NamespaceToCreate {
106				namespace_fragment: None,
107				name: "reifydb".to_string(),
108				parent_id: NamespaceId::ROOT,
109			},
110		)
111		.unwrap();
112
113	catalog.create_table(
114		&mut result,
115		TableToCreate {
116			name: Fragment::internal("flows"),
117			namespace: namespace.id,
118			columns: vec![
119				TableColumnToCreate {
120					name: Fragment::internal("id"),
121					fragment: Fragment::None,
122					constraint: TypeConstraint::unconstrained(Type::Int8),
123					properties: vec![],
124					auto_increment: true,
125					dictionary_id: None,
126				},
127				TableColumnToCreate {
128					name: Fragment::internal("data"),
129					fragment: Fragment::None,
130					constraint: TypeConstraint::unconstrained(Type::Blob),
131					properties: vec![],
132					auto_increment: false,
133					dictionary_id: None,
134				},
135			],
136			retention_policy: None,
137			primary_key_columns: None,
138		},
139	)
140	.unwrap();
141
142	result
143}
144
145/// Create a test StandardEngine with all required dependencies registered.
146pub fn create_test_engine() -> StandardEngine {
147	let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
148	let eventbus = EventBus::new(&actor_system);
149	let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
150	let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
151	let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
152	let runtime = SharedRuntime::from_config(
153		SharedRuntimeConfig::default()
154			.async_threads(2)
155			.compute_threads(2)
156			.compute_max_in_flight(32)
157			.mock_clock(1000),
158	);
159	let system_config = SystemConfig::new();
160	register_oracle_defaults(&system_config);
161	let multi = MultiTransaction::new(
162		multi_store.clone(),
163		single.clone(),
164		eventbus.clone(),
165		actor_system,
166		runtime.clock().clone(),
167		system_config,
168	)
169	.unwrap();
170
171	let mut ioc = IocContainer::new();
172
173	let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
174	ioc = ioc.register(materialized_catalog.clone());
175
176	let schema_registry = SchemaRegistry::new(single.clone());
177	ioc = ioc.register(schema_registry.clone());
178
179	ioc = ioc.register(runtime.clone());
180
181	let metrics_store = single_store.clone();
182
183	ioc = ioc.register(metrics_store.clone());
184
185	let metrics_worker = Arc::new(MetricsWorker::new(
186		MetricsWorkerConfig::default(),
187		metrics_store,
188		multi_store.clone(),
189		eventbus.clone(),
190	));
191	eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(metrics_worker.sender()));
192	eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
193	eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(metrics_worker.sender()));
194	ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
195
196	let cdc_store = CdcStore::memory();
197	ioc = ioc.register(cdc_store.clone());
198
199	let ioc_for_cdc = ioc.clone();
200
201	let engine = StandardEngine::new(
202		multi,
203		single,
204		eventbus.clone(),
205		InterceptorFactory::default(),
206		Catalog::new(materialized_catalog, schema_registry),
207		runtime.clock().clone(),
208		Functions::builder().build(),
209		Procedures::empty(),
210		Transforms::empty(),
211		ioc,
212	);
213
214	let cdc_handle = spawn_cdc_producer(
215		&runtime.actor_system(),
216		cdc_store,
217		multi_store.clone(),
218		engine.clone(),
219		eventbus.clone(),
220	);
221	eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
222		cdc_handle.actor_ref().clone(),
223		runtime.clock().clone(),
224	));
225	ioc_for_cdc.register_service::<Arc<ActorHandle<CdcProduceMsg>>>(Arc::new(cdc_handle));
226
227	engine
228}