1use std::{sync::Arc, time::Duration};
5
6use reifydb_catalog::materialized::MaterializedCatalog;
7use reifydb_core::{
8 common::CommitVersion,
9 encoded::{key::EncodedKey, row::EncodedRow},
10 event::EventBus,
11};
12use reifydb_runtime::{
13 actor::system::ActorSystem,
14 context::{
15 clock::{Clock, MockClock},
16 rng::Rng,
17 },
18 pool::Pools,
19};
20use reifydb_store_multi::MultiStore;
21use reifydb_store_single::SingleStore;
22use reifydb_transaction::{
23 interceptor::interceptors::Interceptors,
24 multi::transaction::MultiTransaction,
25 single::SingleTransaction,
26 transaction::{command::CommandTransaction, query::QueryTransaction},
27};
28use reifydb_type::{Result, util::cowvec::CowVec, value::identity::IdentityId};
29
30use crate::consume::host::CdcHost;
31
32#[derive(Clone)]
35pub struct TestCdcHost {
36 multi: MultiTransaction,
37 single: SingleTransaction,
38 pub event_bus: EventBus,
39 pub materialized_catalog: MaterializedCatalog,
40 pub clock: Clock,
41 pub mock: MockClock,
42}
43
44impl TestCdcHost {
45 pub fn with_clock(initial_nanos: u64) -> Self {
47 let multi_store = MultiStore::testing_memory();
48 let single_store = SingleStore::testing_memory();
49 let actor_system = ActorSystem::new(Pools::default(), Clock::Real);
50 let event_bus = EventBus::new(&actor_system);
51 let single = SingleTransaction::new(single_store, event_bus.clone());
52 let materialized_catalog = MaterializedCatalog::new();
53 let mock = MockClock::new(initial_nanos);
54 let clock = Clock::Mock(mock.clone());
55 let multi = MultiTransaction::new(
56 multi_store,
57 single.clone(),
58 event_bus.clone(),
59 actor_system,
60 clock.clone(),
61 Rng::seeded(42),
62 Arc::new(materialized_catalog.clone()),
63 )
64 .unwrap();
65 Self {
66 multi,
67 single,
68 event_bus,
69 materialized_catalog,
70 clock,
71 mock,
72 }
73 }
74
75 pub fn new() -> Self {
77 Self::with_clock(1_000_000_000)
78 }
79}
80
81impl Default for TestCdcHost {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87impl CdcHost for TestCdcHost {
88 fn begin_command(&self) -> Result<CommandTransaction> {
89 CommandTransaction::new(
90 self.multi.clone(),
91 self.single.clone(),
92 self.event_bus.clone(),
93 Interceptors::new(),
94 IdentityId::system(),
95 self.clock.clone(),
96 )
97 }
98
99 fn begin_query(&self) -> Result<QueryTransaction> {
100 Ok(QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), IdentityId::system()))
101 }
102
103 fn current_version(&self) -> Result<CommitVersion> {
104 Ok(CommitVersion(1))
105 }
106
107 fn done_until(&self) -> CommitVersion {
108 CommitVersion(1)
109 }
110
111 fn wait_for_mark_timeout(&self, _version: CommitVersion, _timeout: Duration) -> bool {
112 true
113 }
114
115 fn materialized_catalog(&self) -> &MaterializedCatalog {
116 &self.materialized_catalog
117 }
118}
119
120pub fn make_key(s: &str) -> EncodedKey {
122 EncodedKey(CowVec::new(s.as_bytes().to_vec()))
123}
124
125pub fn make_row(s: &str) -> EncodedRow {
127 EncodedRow(CowVec::new(s.as_bytes().to_vec()))
128}