Skip to main content

reifydb_cdc/
testing.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{sync::Arc, time::Duration};
5
6use reifydb_catalog::materialized::MaterializedCatalog;
7use reifydb_core::{
8	common::CommitVersion,
9	encoded::{key::EncodedKey, row::EncodedRow},
10	event::EventBus,
11};
12use reifydb_runtime::{
13	actor::system::ActorSystem,
14	context::{
15		clock::{Clock, MockClock},
16		rng::Rng,
17	},
18	pool::Pools,
19};
20use reifydb_store_multi::MultiStore;
21use reifydb_store_single::SingleStore;
22use reifydb_transaction::{
23	interceptor::interceptors::Interceptors,
24	multi::transaction::MultiTransaction,
25	single::SingleTransaction,
26	transaction::{command::CommandTransaction, query::QueryTransaction},
27};
28use reifydb_type::{Result, util::cowvec::CowVec, value::identity::IdentityId};
29
30use crate::consume::host::CdcHost;
31
32/// In-memory `CdcHost` for tests. Owns its own `MaterializedCatalog`, `EventBus` and a
33/// `Clock::Mock` (cloned `MockClock` accessible via the public `mock` field).
34#[derive(Clone)]
35pub struct TestCdcHost {
36	multi: MultiTransaction,
37	single: SingleTransaction,
38	pub event_bus: EventBus,
39	pub materialized_catalog: MaterializedCatalog,
40	pub clock: Clock,
41	pub mock: MockClock,
42}
43
44impl TestCdcHost {
45	/// Build a fresh host with the mock clock initialised to `initial_nanos`.
46	pub fn with_clock(initial_nanos: u64) -> Self {
47		let multi_store = MultiStore::testing_memory();
48		let single_store = SingleStore::testing_memory();
49		let actor_system = ActorSystem::new(Pools::default(), Clock::Real);
50		let event_bus = EventBus::new(&actor_system);
51		let single = SingleTransaction::new(single_store, event_bus.clone());
52		let materialized_catalog = MaterializedCatalog::new();
53		let mock = MockClock::new(initial_nanos);
54		let clock = Clock::Mock(mock.clone());
55		let multi = MultiTransaction::new(
56			multi_store,
57			single.clone(),
58			event_bus.clone(),
59			actor_system,
60			clock.clone(),
61			Rng::seeded(42),
62			Arc::new(materialized_catalog.clone()),
63		)
64		.unwrap();
65		Self {
66			multi,
67			single,
68			event_bus,
69			materialized_catalog,
70			clock,
71			mock,
72		}
73	}
74
75	/// Build a host with the mock clock initialized to 1 s past the epoch.
76	pub fn new() -> Self {
77		Self::with_clock(1_000_000_000)
78	}
79}
80
81impl Default for TestCdcHost {
82	fn default() -> Self {
83		Self::new()
84	}
85}
86
87impl CdcHost for TestCdcHost {
88	fn begin_command(&self) -> Result<CommandTransaction> {
89		CommandTransaction::new(
90			self.multi.clone(),
91			self.single.clone(),
92			self.event_bus.clone(),
93			Interceptors::new(),
94			IdentityId::system(),
95			self.clock.clone(),
96		)
97	}
98
99	fn begin_query(&self) -> Result<QueryTransaction> {
100		Ok(QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), IdentityId::system()))
101	}
102
103	fn current_version(&self) -> Result<CommitVersion> {
104		Ok(CommitVersion(1))
105	}
106
107	fn done_until(&self) -> CommitVersion {
108		CommitVersion(1)
109	}
110
111	fn wait_for_mark_timeout(&self, _version: CommitVersion, _timeout: Duration) -> bool {
112		true
113	}
114
115	fn materialized_catalog(&self) -> &MaterializedCatalog {
116		&self.materialized_catalog
117	}
118}
119
120/// Convenience: build an `EncodedKey` from a string.
121pub fn make_key(s: &str) -> EncodedKey {
122	EncodedKey(CowVec::new(s.as_bytes().to_vec()))
123}
124
125/// Convenience: build an `EncodedRow` from a string.
126pub fn make_row(s: &str) -> EncodedRow {
127	EncodedRow(CowVec::new(s.as_bytes().to_vec()))
128}