Skip to main content

reifydb_engine/
test_utils.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_catalog::{
7	catalog::{
8		Catalog,
9		namespace::NamespaceToCreate,
10		table::{TableColumnToCreate, TableToCreate},
11	},
12	materialized::MaterializedCatalog,
13	schema::SchemaRegistry,
14};
15use reifydb_cdc::{
16	produce::producer::{CdcProducerEventListener, spawn_cdc_producer},
17	storage::CdcStore,
18};
19use reifydb_core::{
20	event::{
21		EventBus,
22		metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
23		transaction::PostCommitEvent,
24	},
25	util::ioc::IocContainer,
26};
27use reifydb_function::registry::Functions;
28use reifydb_metric::worker::{
29	CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
30};
31use reifydb_runtime::{
32	SharedRuntime, SharedRuntimeConfig,
33	actor::system::{ActorSystem, ActorSystemConfig},
34	clock::Clock,
35};
36use reifydb_store_multi::MultiStore;
37use reifydb_store_single::SingleStore;
38use reifydb_transaction::{
39	interceptor::{factory::StandardInterceptorFactory, interceptors::Interceptors},
40	multi::transaction::MultiTransaction,
41	single::SingleTransaction,
42	transaction::{admin::AdminTransaction, command::CommandTransaction},
43};
44use reifydb_type::{
45	fragment::Fragment,
46	value::{constraint::TypeConstraint, r#type::Type},
47};
48
49use crate::engine::StandardEngine;
50
51pub fn create_test_admin_transaction() -> AdminTransaction {
52	let multi_store = MultiStore::testing_memory();
53	let single_store = SingleStore::testing_memory();
54
55	let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
56	let event_bus = EventBus::new(&actor_system);
57	let single = SingleTransaction::new(single_store, event_bus.clone());
58	let multi =
59		MultiTransaction::new(multi_store, single.clone(), event_bus.clone(), actor_system, Clock::default())
60			.unwrap();
61
62	AdminTransaction::new(multi, single, event_bus, Interceptors::new()).unwrap()
63}
64
65pub fn create_test_admin_transaction_with_internal_schema() -> AdminTransaction {
66	let multi_store = MultiStore::testing_memory();
67	let single_store = SingleStore::testing_memory();
68
69	let actor_system = ActorSystem::new(ActorSystemConfig {
70		pool_threads: 1,
71		max_in_flight: 1,
72	});
73	let event_bus = EventBus::new(&actor_system);
74	let single = SingleTransaction::new(single_store, event_bus.clone());
75	let multi =
76		MultiTransaction::new(multi_store, single.clone(), event_bus.clone(), actor_system, Clock::default())
77			.unwrap();
78	let mut result = AdminTransaction::new(multi, single.clone(), event_bus.clone(), Interceptors::new()).unwrap();
79
80	let materialized_catalog = MaterializedCatalog::new();
81	let schema_registry = SchemaRegistry::new(single);
82	let catalog = Catalog::new(materialized_catalog, schema_registry);
83
84	let namespace = catalog
85		.create_namespace(
86			&mut result,
87			NamespaceToCreate {
88				namespace_fragment: None,
89				name: "reifydb".to_string(),
90				parent_id: reifydb_core::interface::catalog::id::NamespaceId::ROOT,
91			},
92		)
93		.unwrap();
94
95	catalog.create_table(
96		&mut result,
97		TableToCreate {
98			name: Fragment::internal("flows"),
99			namespace: namespace.id,
100			columns: vec![
101				TableColumnToCreate {
102					name: Fragment::internal("id"),
103					fragment: Fragment::None,
104					constraint: TypeConstraint::unconstrained(Type::Int8),
105					policies: vec![],
106					auto_increment: true,
107					dictionary_id: None,
108				},
109				TableColumnToCreate {
110					name: Fragment::internal("data"),
111					fragment: Fragment::None,
112					constraint: TypeConstraint::unconstrained(Type::Blob),
113					policies: vec![],
114					auto_increment: false,
115					dictionary_id: None,
116				},
117			],
118			retention_policy: None,
119			primary_key_columns: None,
120		},
121	)
122	.unwrap();
123
124	result
125}
126
127/// Create a test StandardEngine with all required dependencies registered.
128pub fn create_test_engine() -> StandardEngine {
129	let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
130	let eventbus = EventBus::new(&actor_system);
131	let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
132	let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
133	let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
134	let runtime = SharedRuntime::from_config(
135		SharedRuntimeConfig::default()
136			.async_threads(2)
137			.compute_threads(2)
138			.compute_max_in_flight(32)
139			.mock_clock(1000),
140	);
141	let multi = MultiTransaction::new(
142		multi_store.clone(),
143		single.clone(),
144		eventbus.clone(),
145		actor_system,
146		runtime.clock().clone(),
147	)
148	.unwrap();
149
150	let mut ioc = IocContainer::new();
151
152	let materialized_catalog = MaterializedCatalog::new();
153	ioc = ioc.register(materialized_catalog.clone());
154
155	let schema_registry = SchemaRegistry::new(single.clone());
156	ioc = ioc.register(schema_registry.clone());
157
158	ioc = ioc.register(runtime.clone());
159
160	let metrics_store = single_store.clone();
161
162	ioc = ioc.register(metrics_store.clone());
163
164	let metrics_worker = Arc::new(MetricsWorker::new(
165		MetricsWorkerConfig::default(),
166		metrics_store,
167		multi_store.clone(),
168		eventbus.clone(),
169	));
170	eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(metrics_worker.sender()));
171	eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
172	eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(metrics_worker.sender()));
173	ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
174
175	let cdc_store = CdcStore::memory();
176	ioc = ioc.register(cdc_store.clone());
177
178	let ioc_for_cdc = ioc.clone();
179
180	let engine = StandardEngine::new(
181		multi,
182		single,
183		eventbus.clone(),
184		Box::new(StandardInterceptorFactory::default()),
185		Catalog::new(materialized_catalog, schema_registry),
186		runtime.clock().clone(),
187		Functions::builder().build(),
188		crate::transform::registry::Transforms::empty(),
189		ioc,
190	);
191
192	let cdc_handle = spawn_cdc_producer(
193		&runtime.actor_system(),
194		cdc_store,
195		multi_store.clone(),
196		engine.clone(),
197		eventbus.clone(),
198	);
199	eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
200		cdc_handle.actor_ref().clone(),
201		runtime.clock().clone(),
202	));
203	ioc_for_cdc
204		.register_service::<Arc<reifydb_runtime::actor::system::ActorHandle<reifydb_cdc::produce::producer::CdcProduceMsg>>>(
205			Arc::new(cdc_handle),
206		);
207
208	engine
209}