// reifydb_cdc/testing.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB
3
4use std::{sync::Arc, time::Duration};
5
6use reifydb_catalog::{cache::CatalogCache, catalog::Catalog};
7use reifydb_core::{
8	common::CommitVersion,
9	encoded::{key::EncodedKey, row::EncodedRow},
10	event::EventBus,
11};
12use reifydb_runtime::{
13	actor::system::ActorSystem,
14	context::{
15		clock::{Clock, MockClock},
16		rng::Rng,
17	},
18	pool::Pools,
19};
20use reifydb_store_multi::MultiStore;
21use reifydb_store_single::SingleStore;
22use reifydb_transaction::{
23	interceptor::interceptors::Interceptors,
24	multi::transaction::MultiTransaction,
25	single::SingleTransaction,
26	transaction::{command::CommandTransaction, query::QueryTransaction},
27};
28use reifydb_type::{Result, util::cowvec::CowVec, value::identity::IdentityId};
29
30use crate::consume::host::CdcHost;
31
32#[derive(Clone)]
33pub struct TestCdcHost {
34	multi: MultiTransaction,
35	single: SingleTransaction,
36	pub event_bus: EventBus,
37	pub catalog: Catalog,
38	pub clock: Clock,
39	pub mock: MockClock,
40}
41
42impl TestCdcHost {
43	pub fn with_clock(initial_nanos: u64) -> Self {
44		let multi_store = MultiStore::testing_memory();
45		let single_store = SingleStore::testing_memory();
46		let actor_system = ActorSystem::new(Pools::default(), Clock::Real);
47		let event_bus = EventBus::new(&actor_system);
48		let single = SingleTransaction::new(single_store, event_bus.clone());
49		let catalog_cache = CatalogCache::new();
50		let mock = MockClock::new(initial_nanos);
51		let clock = Clock::Mock(mock.clone());
52		let multi = MultiTransaction::new(
53			multi_store,
54			single.clone(),
55			event_bus.clone(),
56			actor_system,
57			clock.clone(),
58			Rng::seeded(42),
59			Arc::new(catalog_cache.clone()),
60		)
61		.unwrap();
62		Self {
63			multi,
64			single,
65			event_bus,
66			catalog: Catalog::new(catalog_cache),
67			clock,
68			mock,
69		}
70	}
71
72	pub fn new() -> Self {
73		Self::with_clock(1_000_000_000)
74	}
75}
76
77impl Default for TestCdcHost {
78	fn default() -> Self {
79		Self::new()
80	}
81}
82
83impl CdcHost for TestCdcHost {
84	fn begin_command(&self) -> Result<CommandTransaction> {
85		CommandTransaction::new(
86			self.multi.clone(),
87			self.single.clone(),
88			self.event_bus.clone(),
89			Interceptors::new(),
90			IdentityId::system(),
91			self.clock.clone(),
92		)
93	}
94
95	fn begin_query(&self) -> Result<QueryTransaction> {
96		Ok(QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), IdentityId::system()))
97	}
98
99	fn current_version(&self) -> Result<CommitVersion> {
100		Ok(CommitVersion(1))
101	}
102
103	fn done_until(&self) -> CommitVersion {
104		CommitVersion(1)
105	}
106
107	fn wait_for_mark_timeout(&self, _version: CommitVersion, _timeout: Duration) -> bool {
108		true
109	}
110
111	fn catalog(&self) -> &Catalog {
112		&self.catalog
113	}
114}
115
116pub fn make_key(s: &str) -> EncodedKey {
117	EncodedKey::new(s.as_bytes().to_vec())
118}
119
120pub fn make_row(s: &str) -> EncodedRow {
121	EncodedRow(CowVec::new(s.as_bytes().to_vec()))
122}