Skip to main content

reifydb_transaction/multi/transaction/
mod.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

// This file includes and modifies code from the skipdb project (https://github.com/al8n/skipdb),
// originally licensed under the Apache License, Version 2.0.
// Original copyright:
//   Copyright (c) 2024 Al Liu
//
// The original Apache License can be found at:
//   http://www.apache.org/licenses/LICENSE-2.0
11
12use std::{ops::Deref, sync::Arc, time::Duration};
13
14use reifydb_core::{
15	common::CommitVersion,
16	encoded::key::EncodedKey,
17	event::EventBus,
18	interface::{
19		catalog::config::{ConfigKey, GetConfig},
20		store::{MultiVersionContains, MultiVersionGet},
21	},
22};
23use reifydb_runtime::{
24	actor::system::ActorSystem,
25	context::{
26		clock::{Clock, MockClock},
27		rng::Rng,
28	},
29	pool::{PoolConfig, Pools},
30};
31use reifydb_store_multi::MultiStore;
32use reifydb_type::{Result, util::hex, value::Value};
33use tracing::instrument;
34use version::{StandardVersionProvider, VersionProvider};
35
36pub(crate) use crate::multi::oracle::Oracle;
37use crate::{TransactionId, multi::types::*, single::SingleTransaction};
38
39pub mod manager;
40pub mod read;
41pub mod replica;
42pub(crate) mod version;
43pub mod write;
44
45use reifydb_store_single::SingleStore;
46
47use crate::multi::{
48	MultiReadTransaction, MultiReplicaTransaction, MultiWriteTransaction,
49	transaction::manager::TransactionManagerQuery,
50};
51
52pub struct TransactionManager<L>
53where
54	L: VersionProvider,
55{
56	inner: Arc<Oracle<L>>,
57}
58
59impl<L> Clone for TransactionManager<L>
60where
61	L: VersionProvider,
62{
63	fn clone(&self) -> Self {
64		Self {
65			inner: self.inner.clone(),
66		}
67	}
68}
69
70impl<L> TransactionManager<L>
71where
72	L: VersionProvider,
73{
74	#[instrument(
75		name = "transaction::manager::new",
76		level = "debug",
77		skip(clock, actor_system, metrics_clock, rng, config)
78	)]
79	pub fn new(
80		clock: L,
81		actor_system: ActorSystem,
82		metrics_clock: Clock,
83		rng: Rng,
84		config: Arc<dyn GetConfig>,
85	) -> Result<Self> {
86		let version = clock.next()?;
87		let oracle = Oracle::new(clock, actor_system, metrics_clock, rng, config);
88		oracle.query.mark_finished(version);
89		oracle.command.mark_finished(version);
90		Ok(Self {
91			inner: Arc::new(oracle),
92		})
93	}
94
95	/// Get the actor system
96	pub fn actor_system(&self) -> ActorSystem {
97		self.inner.actor_system()
98	}
99
100	/// Get the shared configuration.
101	pub fn config(&self) -> Arc<dyn GetConfig> {
102		self.inner.config()
103	}
104
105	/// Access the underlying oracle. Crate-private so the write/replica
106	/// transactions can read snapshot version, register on watermarks, and
107	/// invoke `new_commit` / `advance_unchecked` directly.
108	pub(crate) fn oracle(&self) -> &Arc<Oracle<L>> {
109		&self.inner
110	}
111
112	/// Clear the conflict detection window after bootstrap.
113	pub fn bootstrapping_completed(&self) {
114		self.inner.bootstrapping_completed();
115	}
116
117	#[instrument(name = "transaction::manager::version", level = "trace", skip(self))]
118	pub fn version(&self) -> Result<CommitVersion> {
119		self.inner.version()
120	}
121}
122
123impl<L> TransactionManager<L>
124where
125	L: VersionProvider,
126{
127	#[instrument(name = "transaction::manager::query", level = "debug", skip(self), fields(as_of_version = ?version))]
128	pub fn query(&self, version: Option<CommitVersion>) -> Result<TransactionManagerQuery<L>> {
129		let safe_version = self.inner.version()?;
130
131		Ok(if let Some(version) = version {
132			assert!(version <= safe_version);
133			TransactionManagerQuery::new_time_travel(
134				TransactionId::generate(self.inner.metrics_clock(), self.inner.rng()),
135				self.clone(),
136				version,
137			)
138		} else {
139			// Pair with `done_query(safe_version)` in TransactionManagerQuery::drop.
140			self.inner.query.register_in_flight(safe_version);
141			TransactionManagerQuery::new_current(
142				TransactionId::generate(self.inner.metrics_clock(), self.inner.rng()),
143				self.clone(),
144				safe_version,
145			)
146		})
147	}
148
149	/// Register a version with the command watermark before storage write.
150	/// Used by the replica applier to participate in the watermark system.
151	pub fn begin_commit(&self, version: CommitVersion) {
152		self.inner.command.register_in_flight(version);
153	}
154
155	/// Mark a commit version as done in the command watermark.
156	/// Used by the replica applier after storage write completes.
157	pub fn done_commit(&self, version: CommitVersion) {
158		self.inner.done_commit(version);
159	}
160
161	/// Advance the version provider's clock to at least the given version.
162	/// Used by the replica applier so that `clock.current()` returns
163	/// the latest replicated version for subsequent query transactions.
164	pub fn advance_clock_to(&self, version: CommitVersion) {
165		self.inner.clock.advance_to(version);
166	}
167
168	/// Returns the highest version where ALL prior versions have completed.
169	/// This is useful for CDC polling to know the safe upper bound for fetching
170	/// CDC events - all events up to this version are guaranteed to be in storage.
171	#[instrument(name = "transaction::manager::done_until", level = "trace", skip(self))]
172	pub fn done_until(&self) -> CommitVersion {
173		self.inner.command.done_until()
174	}
175
176	/// Wait for the watermark to reach the given version with a timeout.
177	/// Returns true if the watermark reached the target, false if timeout occurred.
178	#[instrument(name = "transaction::manager::wait_for_mark_timeout", level = "trace", skip(self))]
179	pub fn wait_for_mark_timeout(&self, version: CommitVersion, timeout: Duration) -> bool {
180		self.inner.command.wait_for_mark_timeout(version, timeout)
181	}
182
183	/// Advance the version state for replica replication.
184	///
185	/// This advances the watermark, the version provider counter, and the query
186	/// watermark so that queries can see replicated data. Must only be called
187	/// from the replica applier in sequential version order.
188	pub fn advance_version_for_replica(&self, version: CommitVersion) {
189		self.inner.advance_version_for_replica(version);
190		self.inner.command.advance_to(version);
191		self.inner.query.advance_to(version);
192	}
193}
194
195pub struct MultiTransaction(Arc<Inner>);
196
197pub struct Inner {
198	pub(crate) tm: TransactionManager<StandardVersionProvider>,
199	pub(crate) store: MultiStore,
200	pub(crate) event_bus: EventBus,
201}
202
203impl Deref for MultiTransaction {
204	type Target = Inner;
205
206	fn deref(&self) -> &Self::Target {
207		&self.0
208	}
209}
210
211impl Clone for MultiTransaction {
212	fn clone(&self) -> Self {
213		Self(self.0.clone())
214	}
215}
216
217impl Inner {
218	fn new(
219		store: MultiStore,
220		single: SingleTransaction,
221		event_bus: EventBus,
222		actor_system: ActorSystem,
223		metrics_clock: Clock,
224		rng: Rng,
225		config: Arc<dyn GetConfig>,
226	) -> Result<Self> {
227		let version_provider = StandardVersionProvider::new(single)?;
228		let tm = TransactionManager::new(version_provider, actor_system, metrics_clock, rng, config)?;
229
230		Ok(Self {
231			tm,
232			store,
233			event_bus,
234		})
235	}
236
237	fn version(&self) -> Result<CommitVersion> {
238		self.tm.version()
239	}
240
241	fn actor_system(&self) -> ActorSystem {
242		self.tm.actor_system()
243	}
244
245	fn bootstrapping_completed(&self) {
246		self.tm.bootstrapping_completed();
247	}
248}
249
250impl MultiTransaction {
251	pub fn testing() -> Self {
252		let multi_store = MultiStore::testing_memory();
253		let single_store = SingleStore::testing_memory();
254		let pools = Pools::new(PoolConfig::default());
255		let actor_system = ActorSystem::new(pools, Clock::Real);
256		let event_bus = EventBus::new(&actor_system);
257
258		struct DummyConfig;
259		impl GetConfig for DummyConfig {
260			fn get_config(&self, key: ConfigKey) -> Value {
261				key.default_value()
262			}
263			fn get_config_at(&self, key: ConfigKey, _version: CommitVersion) -> Value {
264				key.default_value()
265			}
266		}
267		let config = Arc::new(DummyConfig);
268
269		Self::new(
270			multi_store,
271			SingleTransaction::new(single_store, event_bus.clone()),
272			event_bus,
273			actor_system,
274			Clock::Mock(MockClock::from_millis(1000)),
275			Rng::seeded(42),
276			config,
277		)
278		.expect("failed to create testing MultiTransaction")
279	}
280}
281
282impl MultiTransaction {
283	#[instrument(
284		name = "transaction::new",
285		level = "debug",
286		skip(store, single, event_bus, actor_system, metrics_clock, rng, config)
287	)]
288	pub fn new(
289		store: MultiStore,
290		single: SingleTransaction,
291		event_bus: EventBus,
292		actor_system: ActorSystem,
293		metrics_clock: Clock,
294		rng: Rng,
295		config: Arc<dyn GetConfig>,
296	) -> Result<Self> {
297		Ok(Self(Arc::new(Inner::new(store, single, event_bus, actor_system, metrics_clock, rng, config)?)))
298	}
299
300	/// Get the actor system
301	pub fn actor_system(&self) -> ActorSystem {
302		self.0.actor_system()
303	}
304
305	/// Get the shared configuration from the oracle.
306	pub fn config(&self) -> Arc<dyn GetConfig> {
307		self.0.tm.config()
308	}
309
310	/// Clear the conflict detection window after bootstrap.
311	pub fn bootstrapping_completed(&self) {
312		self.0.bootstrapping_completed();
313	}
314}
315
316impl MultiTransaction {
317	#[instrument(name = "transaction::version", level = "trace", skip(self))]
318	pub fn version(&self) -> Result<CommitVersion> {
319		self.0.version()
320	}
321
322	#[instrument(name = "transaction::begin_query", level = "debug", skip(self))]
323	pub fn begin_query(&self) -> Result<MultiReadTransaction> {
324		MultiReadTransaction::new(self.clone(), None)
325	}
326
327	/// Begin a query transaction at a specific version.
328	///
329	/// This is used for parallel query execution where multiple tasks need to
330	/// read from the same snapshot (same CommitVersion) for consistency.
331	#[instrument(name = "transaction::begin_query_at_version", level = "debug", skip(self), fields(version = %version.0))]
332	pub fn begin_query_at_version(&self, version: CommitVersion) -> Result<MultiReadTransaction> {
333		MultiReadTransaction::new(self.clone(), Some(version))
334	}
335}
336
337impl MultiTransaction {
338	#[instrument(name = "transaction::begin_command", level = "debug", skip(self))]
339	pub fn begin_command(&self) -> Result<MultiWriteTransaction> {
340		MultiWriteTransaction::new(self.clone())
341	}
342
343	/// Begin a replica write transaction at the primary's exact version.
344	///
345	/// The returned transaction commits at the given version, bypassing
346	/// oracle conflict detection and version allocation.
347	#[instrument(name = "transaction::begin_replica", level = "debug", skip(self), fields(version = %version.0))]
348	pub fn begin_replica(&self, version: CommitVersion) -> Result<MultiReplicaTransaction> {
349		MultiReplicaTransaction::new(self.clone(), version)
350	}
351}
352
353pub enum TransactionType {
354	Query(MultiReadTransaction),
355	Command(Box<MultiWriteTransaction>),
356}
357
358impl MultiTransaction {
359	#[instrument(name = "transaction::get", level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
360	pub fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<Committed>> {
361		Ok(MultiVersionGet::get(&self.store, key, version)?.map(|sv| sv.into()))
362	}
363
364	#[instrument(name = "transaction::contains_key", level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
365	pub fn contains_key(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool> {
366		MultiVersionContains::contains(&self.store, key, version)
367	}
368
369	/// Get a reference to the underlying transaction store.
370	pub fn store(&self) -> &MultiStore {
371		&self.store
372	}
373}