1use std::sync::Arc;
5
6use reifydb_catalog::{
7 catalog::{
8 Catalog,
9 namespace::NamespaceToCreate,
10 table::{TableColumnToCreate, TableToCreate},
11 },
12 materialized::MaterializedCatalog,
13 schema::SchemaRegistry,
14};
15use reifydb_cdc::{
16 produce::producer::{CdcProduceMsg, CdcProducerEventListener, spawn_cdc_producer},
17 storage::CdcStore,
18};
19use reifydb_core::{
20 config::SystemConfig,
21 event::{
22 EventBus,
23 metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
24 transaction::PostCommitEvent,
25 },
26 interface::catalog::id::NamespaceId,
27 util::ioc::IocContainer,
28};
29use reifydb_function::registry::Functions;
30use reifydb_metric::worker::{
31 CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
32};
33use reifydb_runtime::{
34 SharedRuntime, SharedRuntimeConfig,
35 actor::system::{ActorHandle, ActorSystem, ActorSystemConfig},
36 clock::Clock,
37};
38use reifydb_store_multi::MultiStore;
39use reifydb_store_single::SingleStore;
40use reifydb_transaction::{
41 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
42 multi::transaction::{MultiTransaction, register_oracle_defaults},
43 single::SingleTransaction,
44 transaction::{admin::AdminTransaction, command::CommandTransaction},
45};
46use reifydb_type::{
47 fragment::Fragment,
48 value::{constraint::TypeConstraint, r#type::Type},
49};
50
51use crate::{engine::StandardEngine, procedure::registry::Procedures, transform::registry::Transforms};
52
53pub fn create_test_admin_transaction() -> AdminTransaction {
54 let multi_store = MultiStore::testing_memory();
55 let single_store = SingleStore::testing_memory();
56
57 let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
58 let event_bus = EventBus::new(&actor_system);
59 let single = SingleTransaction::new(single_store, event_bus.clone());
60 let system_config = SystemConfig::new();
61 register_oracle_defaults(&system_config);
62 let multi = MultiTransaction::new(
63 multi_store,
64 single.clone(),
65 event_bus.clone(),
66 actor_system,
67 Clock::default(),
68 system_config,
69 )
70 .unwrap();
71
72 AdminTransaction::new(multi, single, event_bus, Interceptors::new()).unwrap()
73}
74
75pub fn create_test_admin_transaction_with_internal_schema() -> AdminTransaction {
76 let multi_store = MultiStore::testing_memory();
77 let single_store = SingleStore::testing_memory();
78
79 let actor_system = ActorSystem::new(ActorSystemConfig {
80 pool_threads: 1,
81 max_in_flight: 1,
82 });
83 let event_bus = EventBus::new(&actor_system);
84 let single = SingleTransaction::new(single_store, event_bus.clone());
85 let system_config = SystemConfig::new();
86 register_oracle_defaults(&system_config);
87 let multi = MultiTransaction::new(
88 multi_store,
89 single.clone(),
90 event_bus.clone(),
91 actor_system,
92 Clock::default(),
93 system_config,
94 )
95 .unwrap();
96 let mut result = AdminTransaction::new(multi, single.clone(), event_bus.clone(), Interceptors::new()).unwrap();
97
98 let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
99 let schema_registry = SchemaRegistry::new(single);
100 let catalog = Catalog::new(materialized_catalog, schema_registry);
101
102 let namespace = catalog
103 .create_namespace(
104 &mut result,
105 NamespaceToCreate {
106 namespace_fragment: None,
107 name: "reifydb".to_string(),
108 parent_id: NamespaceId::ROOT,
109 },
110 )
111 .unwrap();
112
113 catalog.create_table(
114 &mut result,
115 TableToCreate {
116 name: Fragment::internal("flows"),
117 namespace: namespace.id,
118 columns: vec![
119 TableColumnToCreate {
120 name: Fragment::internal("id"),
121 fragment: Fragment::None,
122 constraint: TypeConstraint::unconstrained(Type::Int8),
123 properties: vec![],
124 auto_increment: true,
125 dictionary_id: None,
126 },
127 TableColumnToCreate {
128 name: Fragment::internal("data"),
129 fragment: Fragment::None,
130 constraint: TypeConstraint::unconstrained(Type::Blob),
131 properties: vec![],
132 auto_increment: false,
133 dictionary_id: None,
134 },
135 ],
136 retention_policy: None,
137 primary_key_columns: None,
138 },
139 )
140 .unwrap();
141
142 result
143}
144
145pub fn create_test_engine() -> StandardEngine {
147 let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
148 let eventbus = EventBus::new(&actor_system);
149 let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
150 let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
151 let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
152 let runtime = SharedRuntime::from_config(
153 SharedRuntimeConfig::default()
154 .async_threads(2)
155 .compute_threads(2)
156 .compute_max_in_flight(32)
157 .mock_clock(1000),
158 );
159 let system_config = SystemConfig::new();
160 register_oracle_defaults(&system_config);
161 let multi = MultiTransaction::new(
162 multi_store.clone(),
163 single.clone(),
164 eventbus.clone(),
165 actor_system,
166 runtime.clock().clone(),
167 system_config,
168 )
169 .unwrap();
170
171 let mut ioc = IocContainer::new();
172
173 let materialized_catalog = MaterializedCatalog::new(SystemConfig::new());
174 ioc = ioc.register(materialized_catalog.clone());
175
176 let schema_registry = SchemaRegistry::new(single.clone());
177 ioc = ioc.register(schema_registry.clone());
178
179 ioc = ioc.register(runtime.clone());
180
181 let metrics_store = single_store.clone();
182
183 ioc = ioc.register(metrics_store.clone());
184
185 let metrics_worker = Arc::new(MetricsWorker::new(
186 MetricsWorkerConfig::default(),
187 metrics_store,
188 multi_store.clone(),
189 eventbus.clone(),
190 ));
191 eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(metrics_worker.sender()));
192 eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
193 eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(metrics_worker.sender()));
194 ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
195
196 let cdc_store = CdcStore::memory();
197 ioc = ioc.register(cdc_store.clone());
198
199 let ioc_for_cdc = ioc.clone();
200
201 let engine = StandardEngine::new(
202 multi,
203 single,
204 eventbus.clone(),
205 InterceptorFactory::default(),
206 Catalog::new(materialized_catalog, schema_registry),
207 runtime.clock().clone(),
208 Functions::builder().build(),
209 Procedures::empty(),
210 Transforms::empty(),
211 ioc,
212 );
213
214 let cdc_handle = spawn_cdc_producer(
215 &runtime.actor_system(),
216 cdc_store,
217 multi_store.clone(),
218 engine.clone(),
219 eventbus.clone(),
220 );
221 eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
222 cdc_handle.actor_ref().clone(),
223 runtime.clock().clone(),
224 ));
225 ioc_for_cdc.register_service::<Arc<ActorHandle<CdcProduceMsg>>>(Arc::new(cdc_handle));
226
227 engine
228}