1use std::sync::Arc;
5
6use reifydb_catalog::{
7 catalog::{
8 Catalog,
9 namespace::NamespaceToCreate,
10 table::{TableColumnToCreate, TableToCreate},
11 },
12 materialized::MaterializedCatalog,
13 schema::SchemaRegistry,
14};
15use reifydb_cdc::{
16 produce::producer::{CdcProducerEventListener, spawn_cdc_producer},
17 storage::CdcStore,
18};
19use reifydb_core::{
20 event::{
21 EventBus,
22 metric::{CdcStatsDroppedEvent, CdcStatsRecordedEvent, StorageStatsRecordedEvent},
23 transaction::PostCommitEvent,
24 },
25 util::ioc::IocContainer,
26};
27use reifydb_function::registry::Functions;
28use reifydb_metric::worker::{
29 CdcStatsDroppedListener, CdcStatsListener, MetricsWorker, MetricsWorkerConfig, StorageStatsListener,
30};
31use reifydb_runtime::{
32 SharedRuntime, SharedRuntimeConfig,
33 actor::system::{ActorSystem, ActorSystemConfig},
34 clock::Clock,
35};
36use reifydb_store_multi::MultiStore;
37use reifydb_store_single::SingleStore;
38use reifydb_transaction::{
39 interceptor::{factory::StandardInterceptorFactory, interceptors::Interceptors},
40 multi::transaction::MultiTransaction,
41 single::SingleTransaction,
42 transaction::{admin::AdminTransaction, command::CommandTransaction},
43};
44use reifydb_type::{
45 fragment::Fragment,
46 value::{constraint::TypeConstraint, r#type::Type},
47};
48
49use crate::engine::StandardEngine;
50
51pub fn create_test_admin_transaction() -> AdminTransaction {
52 let multi_store = MultiStore::testing_memory();
53 let single_store = SingleStore::testing_memory();
54
55 let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
56 let event_bus = EventBus::new(&actor_system);
57 let single = SingleTransaction::new(single_store, event_bus.clone());
58 let multi =
59 MultiTransaction::new(multi_store, single.clone(), event_bus.clone(), actor_system, Clock::default())
60 .unwrap();
61
62 AdminTransaction::new(multi, single, event_bus, Interceptors::new()).unwrap()
63}
64
65pub fn create_test_admin_transaction_with_internal_schema() -> AdminTransaction {
66 let multi_store = MultiStore::testing_memory();
67 let single_store = SingleStore::testing_memory();
68
69 let actor_system = ActorSystem::new(ActorSystemConfig {
70 pool_threads: 1,
71 max_in_flight: 1,
72 });
73 let event_bus = EventBus::new(&actor_system);
74 let single = SingleTransaction::new(single_store, event_bus.clone());
75 let multi =
76 MultiTransaction::new(multi_store, single.clone(), event_bus.clone(), actor_system, Clock::default())
77 .unwrap();
78 let mut result = AdminTransaction::new(multi, single.clone(), event_bus.clone(), Interceptors::new()).unwrap();
79
80 let materialized_catalog = MaterializedCatalog::new();
81 let schema_registry = SchemaRegistry::new(single);
82 let catalog = Catalog::new(materialized_catalog, schema_registry);
83
84 let namespace = catalog
85 .create_namespace(
86 &mut result,
87 NamespaceToCreate {
88 namespace_fragment: None,
89 name: "reifydb".to_string(),
90 parent_id: reifydb_core::interface::catalog::id::NamespaceId::ROOT,
91 },
92 )
93 .unwrap();
94
95 catalog.create_table(
96 &mut result,
97 TableToCreate {
98 name: Fragment::internal("flows"),
99 namespace: namespace.id,
100 columns: vec![
101 TableColumnToCreate {
102 name: Fragment::internal("id"),
103 fragment: Fragment::None,
104 constraint: TypeConstraint::unconstrained(Type::Int8),
105 policies: vec![],
106 auto_increment: true,
107 dictionary_id: None,
108 },
109 TableColumnToCreate {
110 name: Fragment::internal("data"),
111 fragment: Fragment::None,
112 constraint: TypeConstraint::unconstrained(Type::Blob),
113 policies: vec![],
114 auto_increment: false,
115 dictionary_id: None,
116 },
117 ],
118 retention_policy: None,
119 primary_key_columns: None,
120 },
121 )
122 .unwrap();
123
124 result
125}
126
127pub fn create_test_engine() -> StandardEngine {
129 let actor_system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
130 let eventbus = EventBus::new(&actor_system);
131 let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
132 let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
133 let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
134 let runtime = SharedRuntime::from_config(
135 SharedRuntimeConfig::default()
136 .async_threads(2)
137 .compute_threads(2)
138 .compute_max_in_flight(32)
139 .mock_clock(1000),
140 );
141 let multi = MultiTransaction::new(
142 multi_store.clone(),
143 single.clone(),
144 eventbus.clone(),
145 actor_system,
146 runtime.clock().clone(),
147 )
148 .unwrap();
149
150 let mut ioc = IocContainer::new();
151
152 let materialized_catalog = MaterializedCatalog::new();
153 ioc = ioc.register(materialized_catalog.clone());
154
155 let schema_registry = SchemaRegistry::new(single.clone());
156 ioc = ioc.register(schema_registry.clone());
157
158 ioc = ioc.register(runtime.clone());
159
160 let metrics_store = single_store.clone();
161
162 ioc = ioc.register(metrics_store.clone());
163
164 let metrics_worker = Arc::new(MetricsWorker::new(
165 MetricsWorkerConfig::default(),
166 metrics_store,
167 multi_store.clone(),
168 eventbus.clone(),
169 ));
170 eventbus.register::<StorageStatsRecordedEvent, _>(StorageStatsListener::new(metrics_worker.sender()));
171 eventbus.register::<CdcStatsRecordedEvent, _>(CdcStatsListener::new(metrics_worker.sender()));
172 eventbus.register::<CdcStatsDroppedEvent, _>(CdcStatsDroppedListener::new(metrics_worker.sender()));
173 ioc.register_service::<Arc<MetricsWorker>>(metrics_worker);
174
175 let cdc_store = CdcStore::memory();
176 ioc = ioc.register(cdc_store.clone());
177
178 let ioc_for_cdc = ioc.clone();
179
180 let engine = StandardEngine::new(
181 multi,
182 single,
183 eventbus.clone(),
184 Box::new(StandardInterceptorFactory::default()),
185 Catalog::new(materialized_catalog, schema_registry),
186 runtime.clock().clone(),
187 Functions::builder().build(),
188 crate::transform::registry::Transforms::empty(),
189 ioc,
190 );
191
192 let cdc_handle = spawn_cdc_producer(
193 &runtime.actor_system(),
194 cdc_store,
195 multi_store.clone(),
196 engine.clone(),
197 eventbus.clone(),
198 );
199 eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
200 cdc_handle.actor_ref().clone(),
201 runtime.clock().clone(),
202 ));
203 ioc_for_cdc
204 .register_service::<Arc<reifydb_runtime::actor::system::ActorHandle<reifydb_cdc::produce::producer::CdcProduceMsg>>>(
205 Arc::new(cdc_handle),
206 );
207
208 engine
209}