1use std::{ops::Deref, sync::Arc};
5
6use reifydb_catalog::{
7 catalog::{
8 Catalog,
9 namespace::NamespaceToCreate,
10 table::{TableColumnToCreate, TableToCreate},
11 },
12 materialized::MaterializedCatalog,
13};
14use reifydb_cdc::{
15 produce::producer::{CdcProducerEventListener, spawn_cdc_producer},
16 storage::CdcStore,
17};
18use reifydb_core::{
19 actors::cdc::CdcProduceHandle,
20 event::{EventBus, transaction::PostCommitEvent},
21 interface::catalog::id::NamespaceId,
22 util::ioc::IocContainer,
23};
24use reifydb_extension::transform::registry::Transforms;
25use reifydb_routine::{function::default_functions, procedure::registry::Procedures};
26use reifydb_runtime::{
27 SharedRuntime, SharedRuntimeConfig,
28 actor::system::ActorSystem,
29 context::{
30 RuntimeContext,
31 clock::{Clock, MockClock},
32 rng::Rng,
33 },
34 pool::{PoolConfig, Pools},
35};
36use reifydb_store_multi::MultiStore;
37use reifydb_store_single::SingleStore;
38use reifydb_transaction::{
39 interceptor::{factory::InterceptorFactory, interceptors::Interceptors},
40 multi::transaction::MultiTransaction,
41 single::SingleTransaction,
42 transaction::admin::AdminTransaction,
43};
44use reifydb_type::{
45 fragment::Fragment,
46 params::Params,
47 value::{constraint::TypeConstraint, frame::frame::Frame, identity::IdentityId, r#type::Type},
48};
49
50use crate::{engine::StandardEngine, vm::services::EngineConfig};
51
52pub struct TestEngine {
53 engine: StandardEngine,
54}
55
56impl Default for TestEngine {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62impl TestEngine {
63 pub fn new() -> Self {
65 Self::builder().with_cdc().build()
66 }
67
68 pub fn builder() -> TestEngineBuilder {
70 TestEngineBuilder::default()
71 }
72
73 pub fn admin(&self, rql: &str) -> Vec<Frame> {
75 let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
76 if let Some(e) = r.error {
77 panic!("admin failed: {e:?}\nrql: {rql}")
78 }
79 r.frames
80 }
81
82 pub fn command(&self, rql: &str) -> Vec<Frame> {
84 let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
85 if let Some(e) = r.error {
86 panic!("command failed: {e:?}\nrql: {rql}")
87 }
88 r.frames
89 }
90
91 pub fn query(&self, rql: &str) -> Vec<Frame> {
93 let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
94 if let Some(e) = r.error {
95 panic!("query failed: {e:?}\nrql: {rql}")
96 }
97 r.frames
98 }
99
100 pub fn admin_err(&self, rql: &str) -> String {
102 let r = self.engine.admin_as(IdentityId::system(), rql, Params::None);
103 match r.error {
104 Some(e) => format!("{e:?}"),
105 None => panic!("Expected error but admin succeeded\nrql: {rql}"),
106 }
107 }
108
109 pub fn command_err(&self, rql: &str) -> String {
111 let r = self.engine.command_as(IdentityId::system(), rql, Params::None);
112 match r.error {
113 Some(e) => format!("{e:?}"),
114 None => panic!("Expected error but command succeeded\nrql: {rql}"),
115 }
116 }
117
118 pub fn query_err(&self, rql: &str) -> String {
120 let r = self.engine.query_as(IdentityId::system(), rql, Params::None);
121 match r.error {
122 Some(e) => format!("{e:?}"),
123 None => panic!("Expected error but query succeeded\nrql: {rql}"),
124 }
125 }
126
127 pub fn row_count(frames: &[Frame]) -> usize {
129 frames.first().map(|f| f.row_count()).unwrap_or(0)
130 }
131
132 pub fn identity() -> IdentityId {
134 IdentityId::system()
135 }
136
137 pub fn inner(&self) -> &StandardEngine {
139 &self.engine
140 }
141}
142
143impl Deref for TestEngine {
144 type Target = StandardEngine;
145
146 fn deref(&self) -> &StandardEngine {
147 &self.engine
148 }
149}
150
151#[derive(Default)]
152pub struct TestEngineBuilder {
153 cdc: bool,
154}
155
156impl TestEngineBuilder {
157 pub fn with_cdc(mut self) -> Self {
158 self.cdc = true;
159 self
160 }
161
162 pub fn build(self) -> TestEngine {
163 let pools = Pools::new(PoolConfig::default());
164 let actor_system = ActorSystem::new(pools, Clock::Real);
165 let eventbus = EventBus::new(&actor_system);
166 let multi_store = MultiStore::testing_memory_with_eventbus(eventbus.clone());
167 let single_store = SingleStore::testing_memory_with_eventbus(eventbus.clone());
168 let single = SingleTransaction::new(single_store.clone(), eventbus.clone());
169 let runtime = SharedRuntime::from_config(
170 SharedRuntimeConfig::default().async_threads(2).system_threads(2).query_threads(2).seeded(1000),
171 );
172 let materialized_catalog = MaterializedCatalog::new();
173 let multi = MultiTransaction::new(
174 multi_store.clone(),
175 single.clone(),
176 eventbus.clone(),
177 actor_system,
178 runtime.clock().clone(),
179 runtime.rng().clone(),
180 Arc::new(materialized_catalog.clone()),
181 )
182 .unwrap();
183
184 let mut ioc = IocContainer::new();
185
186 ioc = ioc.register(materialized_catalog.clone());
187
188 ioc = ioc.register(runtime.clone());
189 ioc = ioc.register(single_store.clone());
190
191 let cdc_store = CdcStore::memory();
192 ioc = ioc.register(cdc_store.clone());
193
194 let ioc_for_cdc = ioc.clone();
195
196 let engine = StandardEngine::new(
197 multi,
198 single.clone(),
199 eventbus.clone(),
200 InterceptorFactory::default(),
201 Catalog::new(materialized_catalog),
202 EngineConfig {
203 runtime_context: RuntimeContext::new(runtime.clock().clone(), runtime.rng().clone()),
204 functions: default_functions().configure(),
205 procedures: Procedures::empty(),
206 transforms: Transforms::empty(),
207 ioc,
208 #[cfg(not(reifydb_single_threaded))]
209 remote_registry: None,
210 },
211 );
212
213 if self.cdc {
214 let cdc_handle = spawn_cdc_producer(
215 &runtime.actor_system(),
216 cdc_store,
217 multi_store.clone(),
218 engine.clone(),
219 eventbus.clone(),
220 );
221 eventbus.register::<PostCommitEvent, _>(CdcProducerEventListener::new(
222 cdc_handle.actor_ref().clone(),
223 runtime.clock().clone(),
224 ));
225 ioc_for_cdc.register_service::<Arc<CdcProduceHandle>>(Arc::new(cdc_handle));
226 }
227
228 TestEngine {
229 engine,
230 }
231 }
232}
233
234pub fn create_test_admin_transaction() -> AdminTransaction {
235 let multi_store = MultiStore::testing_memory();
236 let single_store = SingleStore::testing_memory();
237
238 let pools = Pools::new(PoolConfig::default());
239 let actor_system = ActorSystem::new(pools, Clock::Real);
240 let event_bus = EventBus::new(&actor_system);
241 let single = SingleTransaction::new(single_store, event_bus.clone());
242 let multi = MultiTransaction::new(
243 multi_store,
244 single.clone(),
245 event_bus.clone(),
246 actor_system,
247 Clock::Mock(MockClock::from_millis(1000)),
248 Rng::seeded(42),
249 Arc::new(MaterializedCatalog::new()),
250 )
251 .unwrap();
252
253 AdminTransaction::new(
254 multi,
255 single,
256 event_bus,
257 Interceptors::new(),
258 IdentityId::system(),
259 Clock::Mock(MockClock::from_millis(1000)),
260 )
261 .unwrap()
262}
263
264pub fn create_test_admin_transaction_with_internal_shape() -> AdminTransaction {
265 let multi_store = MultiStore::testing_memory();
266 let single_store = SingleStore::testing_memory();
267
268 let pools = Pools::new(PoolConfig::default());
269 let actor_system = ActorSystem::new(pools, Clock::Real);
270 let event_bus = EventBus::new(&actor_system);
271 let single = SingleTransaction::new(single_store, event_bus.clone());
272 let multi = MultiTransaction::new(
273 multi_store,
274 single.clone(),
275 event_bus.clone(),
276 actor_system,
277 Clock::Mock(MockClock::from_millis(1000)),
278 Rng::seeded(42),
279 Arc::new(MaterializedCatalog::new()),
280 )
281 .unwrap();
282 let mut result = AdminTransaction::new(
283 multi,
284 single.clone(),
285 event_bus.clone(),
286 Interceptors::new(),
287 IdentityId::system(),
288 Clock::Mock(MockClock::from_millis(1000)),
289 )
290 .unwrap();
291
292 let materialized_catalog = MaterializedCatalog::new();
293 let catalog = Catalog::new(materialized_catalog);
294
295 let namespace = catalog
296 .create_namespace(
297 &mut result,
298 NamespaceToCreate {
299 namespace_fragment: None,
300 name: "reifydb".to_string(),
301 local_name: "reifydb".to_string(),
302 parent_id: NamespaceId::ROOT,
303 grpc: None,
304 token: None,
305 },
306 )
307 .unwrap();
308
309 catalog.create_table(
310 &mut result,
311 TableToCreate {
312 name: Fragment::internal("flows"),
313 namespace: namespace.id(),
314 columns: vec![
315 TableColumnToCreate {
316 name: Fragment::internal("id"),
317 fragment: Fragment::None,
318 constraint: TypeConstraint::unconstrained(Type::Int8),
319 properties: vec![],
320 auto_increment: true,
321 dictionary_id: None,
322 },
323 TableColumnToCreate {
324 name: Fragment::internal("data"),
325 fragment: Fragment::None,
326 constraint: TypeConstraint::unconstrained(Type::Blob),
327 properties: vec![],
328 auto_increment: false,
329 dictionary_id: None,
330 },
331 ],
332 retention_strategy: None,
333 primary_key_columns: None,
334 underlying: false,
335 },
336 )
337 .unwrap();
338
339 result
340}