reifydb_transaction/multi/transaction/
mod.rs1use std::{ops::Deref, sync::Arc, time::Duration};
13
14use reifydb_core::{
15 common::CommitVersion,
16 encoded::key::EncodedKey,
17 event::EventBus,
18 interface::{
19 catalog::config::{ConfigKey, GetConfig},
20 store::{MultiVersionContains, MultiVersionGet},
21 },
22};
23use reifydb_runtime::{
24 actor::system::ActorSystem,
25 context::{
26 clock::{Clock, MockClock},
27 rng::Rng,
28 },
29 pool::{PoolConfig, Pools},
30};
31use reifydb_store_multi::MultiStore;
32use reifydb_type::{Result, util::hex, value::Value};
33use tracing::instrument;
34use version::{StandardVersionProvider, VersionProvider};
35
36pub(crate) use crate::multi::oracle::Oracle;
37use crate::{TransactionId, multi::types::*, single::SingleTransaction};
38
39pub mod manager;
40pub mod read;
41pub mod replica;
42pub(crate) mod version;
43pub mod write;
44
45use reifydb_store_single::SingleStore;
46
47use crate::multi::{
48 MultiReadTransaction, MultiReplicaTransaction, MultiWriteTransaction,
49 transaction::manager::TransactionManagerQuery,
50};
51
52pub struct TransactionManager<L>
53where
54 L: VersionProvider,
55{
56 inner: Arc<Oracle<L>>,
57}
58
59impl<L> Clone for TransactionManager<L>
60where
61 L: VersionProvider,
62{
63 fn clone(&self) -> Self {
64 Self {
65 inner: self.inner.clone(),
66 }
67 }
68}
69
70impl<L> TransactionManager<L>
71where
72 L: VersionProvider,
73{
74 #[instrument(
75 name = "transaction::manager::new",
76 level = "debug",
77 skip(clock, actor_system, metrics_clock, rng, config)
78 )]
79 pub fn new(
80 clock: L,
81 actor_system: ActorSystem,
82 metrics_clock: Clock,
83 rng: Rng,
84 config: Arc<dyn GetConfig>,
85 ) -> Result<Self> {
86 let version = clock.next()?;
87 let oracle = Oracle::new(clock, actor_system, metrics_clock, rng, config);
88 oracle.query.mark_finished(version);
89 oracle.command.mark_finished(version);
90 Ok(Self {
91 inner: Arc::new(oracle),
92 })
93 }
94
95 pub fn actor_system(&self) -> ActorSystem {
97 self.inner.actor_system()
98 }
99
100 pub fn config(&self) -> Arc<dyn GetConfig> {
102 self.inner.config()
103 }
104
105 pub(crate) fn oracle(&self) -> &Arc<Oracle<L>> {
109 &self.inner
110 }
111
112 pub fn bootstrapping_completed(&self) {
114 self.inner.bootstrapping_completed();
115 }
116
117 #[instrument(name = "transaction::manager::version", level = "trace", skip(self))]
118 pub fn version(&self) -> Result<CommitVersion> {
119 self.inner.version()
120 }
121}
122
123impl<L> TransactionManager<L>
124where
125 L: VersionProvider,
126{
127 #[instrument(name = "transaction::manager::query", level = "debug", skip(self), fields(as_of_version = ?version))]
128 pub fn query(&self, version: Option<CommitVersion>) -> Result<TransactionManagerQuery<L>> {
129 let safe_version = self.inner.version()?;
130
131 Ok(if let Some(version) = version {
132 assert!(version <= safe_version);
133 TransactionManagerQuery::new_time_travel(
134 TransactionId::generate(self.inner.metrics_clock(), self.inner.rng()),
135 self.clone(),
136 version,
137 )
138 } else {
139 self.inner.query.register_in_flight(safe_version);
141 TransactionManagerQuery::new_current(
142 TransactionId::generate(self.inner.metrics_clock(), self.inner.rng()),
143 self.clone(),
144 safe_version,
145 )
146 })
147 }
148
149 pub fn begin_commit(&self, version: CommitVersion) {
152 self.inner.command.register_in_flight(version);
153 }
154
155 pub fn done_commit(&self, version: CommitVersion) {
158 self.inner.done_commit(version);
159 }
160
161 pub fn advance_clock_to(&self, version: CommitVersion) {
165 self.inner.clock.advance_to(version);
166 }
167
168 #[instrument(name = "transaction::manager::done_until", level = "trace", skip(self))]
172 pub fn done_until(&self) -> CommitVersion {
173 self.inner.command.done_until()
174 }
175
176 #[instrument(name = "transaction::manager::wait_for_mark_timeout", level = "trace", skip(self))]
179 pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
180 self.inner.command.wait_for_mark_timeout(version, timeout)
181 }
182
183 pub fn advance_version_for_replica(&self, version: CommitVersion) {
189 self.inner.advance_version_for_replica(version);
190 self.inner.command.advance_to(version);
191 self.inner.query.advance_to(version);
192 }
193}
194
195pub struct MultiTransaction(Arc<Inner>);
196
197pub struct Inner {
198 pub(crate) tm: TransactionManager<StandardVersionProvider>,
199 pub(crate) store: MultiStore,
200 pub(crate) event_bus: EventBus,
201}
202
203impl Deref for MultiTransaction {
204 type Target = Inner;
205
206 fn deref(&self) -> &Self::Target {
207 &self.0
208 }
209}
210
211impl Clone for MultiTransaction {
212 fn clone(&self) -> Self {
213 Self(self.0.clone())
214 }
215}
216
217impl Inner {
218 fn new(
219 store: MultiStore,
220 single: SingleTransaction,
221 event_bus: EventBus,
222 actor_system: ActorSystem,
223 metrics_clock: Clock,
224 rng: Rng,
225 config: Arc<dyn GetConfig>,
226 ) -> Result<Self> {
227 let version_provider = StandardVersionProvider::new(single)?;
228 let tm = TransactionManager::new(version_provider, actor_system, metrics_clock, rng, config)?;
229
230 Ok(Self {
231 tm,
232 store,
233 event_bus,
234 })
235 }
236
237 fn version(&self) -> Result<CommitVersion> {
238 self.tm.version()
239 }
240
241 fn actor_system(&self) -> ActorSystem {
242 self.tm.actor_system()
243 }
244
245 fn bootstrapping_completed(&self) {
246 self.tm.bootstrapping_completed();
247 }
248}
249
250impl MultiTransaction {
251 pub fn testing() -> Self {
252 let multi_store = MultiStore::testing_memory();
253 let single_store = SingleStore::testing_memory();
254 let pools = Pools::new(PoolConfig::default());
255 let actor_system = ActorSystem::new(pools, Clock::Real);
256 let event_bus = EventBus::new(&actor_system);
257
258 struct DummyConfig;
259 impl GetConfig for DummyConfig {
260 fn get_config(&self, key: ConfigKey) -> Value {
261 key.default_value()
262 }
263 fn get_config_at(&self, key: ConfigKey, _version: CommitVersion) -> Value {
264 key.default_value()
265 }
266 }
267 let config = Arc::new(DummyConfig);
268
269 Self::new(
270 multi_store,
271 SingleTransaction::new(single_store, event_bus.clone()),
272 event_bus,
273 actor_system,
274 Clock::Mock(MockClock::from_millis(1000)),
275 Rng::seeded(42),
276 config,
277 )
278 .expect("failed to create testing MultiTransaction")
279 }
280}
281
282impl MultiTransaction {
283 #[instrument(
284 name = "transaction::new",
285 level = "debug",
286 skip(store, single, event_bus, actor_system, metrics_clock, rng, config)
287 )]
288 pub fn new(
289 store: MultiStore,
290 single: SingleTransaction,
291 event_bus: EventBus,
292 actor_system: ActorSystem,
293 metrics_clock: Clock,
294 rng: Rng,
295 config: Arc<dyn GetConfig>,
296 ) -> Result<Self> {
297 Ok(Self(Arc::new(Inner::new(store, single, event_bus, actor_system, metrics_clock, rng, config)?)))
298 }
299
300 pub fn actor_system(&self) -> ActorSystem {
302 self.0.actor_system()
303 }
304
305 pub fn config(&self) -> Arc<dyn GetConfig> {
307 self.0.tm.config()
308 }
309
310 pub fn bootstrapping_completed(&self) {
312 self.0.bootstrapping_completed();
313 }
314}
315
316impl MultiTransaction {
317 #[instrument(name = "transaction::version", level = "trace", skip(self))]
318 pub fn version(&self) -> Result<CommitVersion> {
319 self.0.version()
320 }
321
322 #[instrument(name = "transaction::begin_query", level = "debug", skip(self))]
323 pub fn begin_query(&self) -> Result<MultiReadTransaction> {
324 MultiReadTransaction::new(self.clone(), None)
325 }
326
327 #[instrument(name = "transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0))]
332 pub fn begin_query_at_version(&self, version: CommitVersion) -> Result<MultiReadTransaction> {
333 MultiReadTransaction::new(self.clone(), Some(version))
334 }
335}
336
337impl MultiTransaction {
338 #[instrument(name = "transaction::begin_command", level = "debug", skip(self))]
339 pub fn begin_command(&self) -> Result<MultiWriteTransaction> {
340 MultiWriteTransaction::new(self.clone())
341 }
342
343 #[instrument(name = "transaction::begin_replica", level = "debug", skip(self), fields(version = %version.0))]
348 pub fn begin_replica(&self, version: CommitVersion) -> Result<MultiReplicaTransaction> {
349 MultiReplicaTransaction::new(self.clone(), version)
350 }
351}
352
353pub enum TransactionType {
354 Query(MultiReadTransaction),
355 Command(Box<MultiWriteTransaction>),
356}
357
358impl MultiTransaction {
359 #[instrument(name = "transaction::get", level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
360 pub fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<Committed>> {
361 Ok(MultiVersionGet::get(&self.store, key, version)?.map(|sv| sv.into()))
362 }
363
364 #[instrument(name = "transaction::contains_key", level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
365 pub fn contains_key(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool> {
366 MultiVersionContains::contains(&self.store, key, version)
367 }
368
369 pub fn store(&self) -> &MultiStore {
371 &self.store
372 }
373}