1use std::sync::Arc;
5
6use reifydb_catalog::{
7 catalog::{
8 Catalog,
9 namespace::NamespaceToCreate,
10 table::{TableColumnToCreate, TableToCreate},
11 },
12 materialized::MaterializedCatalog,
13 schema::SchemaRegistry,
14};
15use reifydb_cdc::{
16 produce::producer::{CdcProduceMsg, CdcProducerEventListener, spawn_cdc_producer},
17 storage::CdcStore,
18};
19use reifydb_core::{
20 config::SystemConfig,
21 event::{
22 EventBus,
23 metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
24 transaction::PostCommitEvent,
25 },
26 interface::catalog::id::NamespaceId,
27 util::ioc::IocContainer,
28};
29use reifydb_function::registry::Functions;
30use reifydb_metric::worker::{
31 CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
32};
33use reifydb_runtime::{
34 SharedRuntime, SharedRuntimeConfig,
35 actor::system::{ActorHandle, ActorSystem, ActorSystemConfig},
36 clock::Clock,
37};
38use reifydb_store_multi::MultiStore;
39use reifydb_store_single::SingleStore;
40use reifydb_transaction::{
41 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
42 multi::transaction::{MultiTransaction, register_oracle_defaults},
43 single::SingleTransaction,
44 transaction::admin::AdminTransaction,
45};
46use reifydb_type::{
47 fragment::Fragment,
48 value::{constraint::TypeConstraint, r#type::Type},
49};
50
51use crate::{engine::StandardEngine, procedure::registry::Procedures, transform::registry::Transforms};
52
53pub fn create_test_admin_transaction() -> AdminTransaction {
54 let multi_store = MultiStore::testing_memory();
55 let single_store = SingleStore::testing_memory();
56
57 let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
58 let event_bus = EventBus::new(&actor_system);
59 let single = SingleTransaction::new(single_store, event_bus.clone());
60 let system_config = SystemConfig::new();
61 register_oracle_defaults(&system_config);
62 let multi = MultiTransaction::new(
63 multi_store,
64 single.clone(),
65 event_bus.clone(),
66 actor_system,
67 Clock::default(),
68 system_config,
69 )
70 .unwrap();
71
72 AdminTransaction::new(multi, single, event_bus, Interceptors::new()).unwrap()
73}
74
75pub fn create_test_admin_transaction_with_internal_schema() -> AdminTransaction {
76 let multi_store = MultiStore::testing_memory();
77 let single_store = SingleStore::testing_memory();
78
79 let actor_system = ActorSystem::new(ActorSystemConfig {
80 pool_threads: 1,
81 max_in_flight: 1,
82 });
83 let event_bus = EventBus::new(&actor_system);
84 let single = SingleTransaction::new(single_store, event_bus.clone());
85 let system_config = SystemConfig::new();
86 register_oracle_defaults(&system_config);
87 let multi = MultiTransaction::new(
88 multi_store,
89 single.clone(),
90 event_bus.clone(),
91 actor_system,
92 Clock::default(),
93 system_config,
94 )
95 .unwrap();
96 let mut result = AdminTransaction::new(multi, single.clone(), event_bus.clone(), Interceptors::new()).unwrap();
97
98 let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
99 let schema_registry = SchemaRegistry::new(single);
100 let catalog = Catalog::new(materialized_catalog, schema_registry);
101
102 let namespace = catalog
103 .create_namespace(
104 &mut result,
105 NamespaceToCreate {
106 namespace_fragment: None,
107 name: "reifydb".to_string(),
108 local_name: "reifydb".to_string(),
109 parent_id: NamespaceId::ROOT,
110 grpc: None,
111 },
112 )
113 .unwrap();
114
115 catalog.create_table(
116 &mut result,
117 TableToCreate {
118 name: Fragment::internal("flows"),
119 namespace: namespace.id(),
120 columns: vec![
121 TableColumnToCreate {
122 name: Fragment::internal("id"),
123 fragment: Fragment::None,
124 constraint: TypeConstraint::unconstrained(Type::Int8),
125 properties: vec![],
126 auto_increment: true,
127 dictionary_id: None,
128 },
129 TableColumnToCreate {
130 name: Fragment::internal("data"),
131 fragment: Fragment::None,
132 constraint: TypeConstraint::unconstrained(Type::Blob),
133 properties: vec![],
134 auto_increment: false,
135 dictionary_id: None,
136 },
137 ],
138 retention_policy: None,
139 primary_key_columns: None,
140 },
141 )
142 .unwrap();
143
144 result
145}
146
147pub fn create_test_engine() -> StandardEngine {
149 let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
150 let eventbus = EventBus::new(&actor_system);
151 let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
152 let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
153 let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
154 let runtime = SharedRuntime::from_config(
155 SharedRuntimeConfig::default()
156 .async_threads(2)
157 .compute_threads(2)
158 .compute_max_in_flight(32)
159 .mock_clock(1000),
160 );
161 let system_config = SystemConfig::new();
162 register_oracle_defaults(&system_config);
163 let multi = MultiTransaction::new(
164 multi_store.clone(),
165 single.clone(),
166 eventbus.clone(),
167 actor_system,
168 runtime.clock().clone(),
169 system_config,
170 )
171 .unwrap();
172
173 let mut ioc = IocContainer::new();
174
175 let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
176 ioc = ioc.register(materialized_catalog.clone());
177
178 let schema_registry = SchemaRegistry::new(single.clone());
179 ioc = ioc.register(schema_registry.clone());
180
181 ioc = ioc.register(runtime.clone());
182
183 let metrics_store = single_store.clone();
184
185 ioc = ioc.register(metrics_store.clone());
186
187 let metrics_worker = Arc::new(MetricsWorker::new(
188 MetricsWorkerConfig::default(),
189 metrics_store,
190 multi_store.clone(),
191 eventbus.clone(),
192 ));
193 eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(metrics_worker.sender()));
194 eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
195 eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(metrics_worker.sender()));
196 ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
197
198 let cdc_store = CdcStore::memory();
199 ioc = ioc.register(cdc_store.clone());
200
201 let ioc_for_cdc = ioc.clone();
202
203 let engine = StandardEngine::new(
204 multi,
205 single,
206 eventbus.clone(),
207 InterceptorFactory::default(),
208 Catalog::new(materialized_catalog, schema_registry),
209 runtime.clock().clone(),
210 Functions::builder().build(),
211 Procedures::empty(),
212 Transforms::empty(),
213 ioc,
214 #[cfg(not(target_arch = "wasm32"))]
215 None,
216 );
217
218 let cdc_handle = spawn_cdc_producer(
219 &runtime.actor_system(),
220 cdc_store,
221 multi_store.clone(),
222 engine.clone(),
223 eventbus.clone(),
224 );
225 eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
226 cdc_handle.actor_ref().clone(),
227 runtime.clock().clone(),
228 ));
229 ioc_for_cdc.register_service::<Arc<ActorHandle<CdcProduceMsg>>>(Arc::new(cdc_handle));
230
231 engine
232}