1use std::{sync::Arc, time::Duration};
5
6use reifydb_catalog::{cache::CatalogCache, catalog::Catalog};
7use reifydb_core::{
8 common::CommitVersion,
9 encoded::{key::EncodedKey, row::EncodedRow},
10 event::EventBus,
11};
12use reifydb_runtime::{
13 actor::system::ActorSystem,
14 context::{
15 clock::{Clock, MockClock},
16 rng::Rng,
17 },
18 pool::Pools,
19};
20use reifydb_store_multi::MultiStore;
21use reifydb_store_single::SingleStore;
22use reifydb_transaction::{
23 interceptor::interceptors::Interceptors,
24 multi::transaction::MultiTransaction,
25 single::SingleTransaction,
26 transaction::{command::CommandTransaction, query::QueryTransaction},
27};
28use reifydb_type::{Result, util::cowvec::CowVec, value::identity::IdentityId};
29
30use crate::consume::host::CdcHost;
31
32#[derive(Clone)]
33pub struct TestCdcHost {
34 multi: MultiTransaction,
35 single: SingleTransaction,
36 pub event_bus: EventBus,
37 pub catalog: Catalog,
38 pub clock: Clock,
39 pub mock: MockClock,
40}
41
42impl TestCdcHost {
43 pub fn with_clock(initial_nanos: u64) -> Self {
44 let multi_store = MultiStore::testing_memory();
45 let single_store = SingleStore::testing_memory();
46 let actor_system = ActorSystem::new(Pools::default(), Clock::Real);
47 let event_bus = EventBus::new(&actor_system);
48 let single = SingleTransaction::new(single_store, event_bus.clone());
49 let catalog_cache = CatalogCache::new();
50 let mock = MockClock::new(initial_nanos);
51 let clock = Clock::Mock(mock.clone());
52 let multi = MultiTransaction::new(
53 multi_store,
54 single.clone(),
55 event_bus.clone(),
56 actor_system,
57 clock.clone(),
58 Rng::seeded(42),
59 Arc::new(catalog_cache.clone()),
60 )
61 .unwrap();
62 Self {
63 multi,
64 single,
65 event_bus,
66 catalog: Catalog::new(catalog_cache),
67 clock,
68 mock,
69 }
70 }
71
72 pub fn new() -> Self {
73 Self::with_clock(1_000_000_000)
74 }
75}
76
77impl Default for TestCdcHost {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
83impl CdcHost for TestCdcHost {
84 fn begin_command(&self) -> Result<CommandTransaction> {
85 CommandTransaction::new(
86 self.multi.clone(),
87 self.single.clone(),
88 self.event_bus.clone(),
89 Interceptors::new(),
90 IdentityId::system(),
91 self.clock.clone(),
92 )
93 }
94
95 fn begin_query(&self) -> Result<QueryTransaction> {
96 Ok(QueryTransaction::new(self.multi.begin_query()?, self.single.clone(), IdentityId::system()))
97 }
98
99 fn current_version(&self) -> Result<CommitVersion> {
100 Ok(CommitVersion(1))
101 }
102
103 fn done_until(&self) -> CommitVersion {
104 CommitVersion(1)
105 }
106
107 fn wait_for_mark_timeout(&self, _version: CommitVersion, _timeout: Duration) -> bool {
108 true
109 }
110
111 fn catalog(&self) -> &Catalog {
112 &self.catalog
113 }
114}
115
116pub fn make_key(s: &str) -> EncodedKey {
117 EncodedKey::new(s.as_bytes().to_vec())
118}
119
120pub fn make_row(s: &str) -> EncodedRow {
121 EncodedRow(CowVec::new(s.as_bytes().to_vec()))
122}