reifydb_transaction/multi/transaction/
mod.rs

// Copyright (c) reifydb.com 2025
// This file is licensed under the AGPL-3.0-or-later, see license.md file

// This file includes and modifies code from the skipdb project (https://github.com/al8n/skipdb),
// originally licensed under the Apache License, Version 2.0.
// Original copyright:
//   Copyright (c) 2024 Al Liu
//
// The original Apache License can be found at:
//   http://www.apache.org/licenses/LICENSE-2.0

12use core::mem;
13use std::{ops::Deref, sync::Arc, time::Duration};
14
15pub use command::*;
16use oracle::*;
17use reifydb_core::{CommitVersion, EncodedKey, EncodedKeyRange, event::EventBus, interface::TransactionId};
18use reifydb_store_transaction::{
19	MultiVersionContains, MultiVersionGet, MultiVersionRange, MultiVersionRangeRev, TransactionStore,
20};
21use reifydb_type::util::hex;
22use tracing::instrument;
23use version::{StandardVersionProvider, VersionProvider};
24
25pub use crate::multi::types::*;
26use crate::single::{TransactionSingleVersion, TransactionSvl};
27
28mod command;
29mod command_tx;
30mod oracle;
31mod oracle_cleanup;
32pub mod query;
33mod query_tx;
34pub mod range;
35pub mod range_rev;
36mod version;
37
38pub use command_tx::CommandTransaction;
39pub use oracle::MAX_COMMITTED_TXNS;
40pub use query_tx::QueryTransaction;
41
42use crate::multi::{
43	AwaitWatermarkError, conflict::ConflictManager, pending::PendingWrites,
44	transaction::query::TransactionManagerQuery,
45};
46
47pub struct TransactionManager<L>
48where
49	L: VersionProvider,
50{
51	inner: Arc<Oracle<L>>,
52}
53
54impl<L> Clone for TransactionManager<L>
55where
56	L: VersionProvider,
57{
58	fn clone(&self) -> Self {
59		Self {
60			inner: self.inner.clone(),
61		}
62	}
63}
64
65impl<L> TransactionManager<L>
66where
67	L: VersionProvider,
68{
69	#[instrument(level = "debug", skip(self))]
70	pub fn write(&self) -> Result<TransactionManagerCommand<L>, reifydb_type::Error> {
71		Ok(TransactionManagerCommand {
72			id: TransactionId::generate(),
73			oracle: self.inner.clone(),
74			version: self.inner.version()?,
75			read_version: None,
76			size: 0,
77			count: 0,
78			conflicts: ConflictManager::new(),
79			pending_writes: PendingWrites::new(),
80			duplicates: Vec::new(),
81			discarded: false,
82			done_query: false,
83		})
84	}
85}
86
87impl<L> TransactionManager<L>
88where
89	L: VersionProvider,
90{
91	#[instrument(level = "debug", skip(clock))]
92	pub fn new(clock: L) -> crate::Result<Self> {
93		let version = clock.next()?;
94		Ok(Self {
95			inner: Arc::new({
96				let oracle = Oracle::new(clock);
97				oracle.query.done(version);
98				oracle.command.done(version);
99				oracle
100			}),
101		})
102	}
103
104	#[instrument(level = "trace", skip(self))]
105	pub fn version(&self) -> crate::Result<CommitVersion> {
106		self.inner.version()
107	}
108}
109
110impl<L> TransactionManager<L>
111where
112	L: VersionProvider,
113{
114	#[instrument(level = "trace", skip(self))]
115	pub fn discard_hint(&self) -> CommitVersion {
116		self.inner.discard_at_or_below()
117	}
118
119	#[instrument(level = "debug", skip(self), fields(as_of_version = ?version))]
120	pub fn query(&self, version: Option<CommitVersion>) -> crate::Result<TransactionManagerQuery<L>> {
121		Ok(if let Some(version) = version {
122			TransactionManagerQuery::new_time_travel(TransactionId::generate(), self.clone(), version)
123		} else {
124			TransactionManagerQuery::new_current(
125				TransactionId::generate(),
126				self.clone(),
127				self.inner.version()?,
128			)
129		})
130	}
131
132	/// Wait for the command watermark to reach the specified version.
133	/// Returns Ok(()) if the watermark reaches the version within the timeout,
134	/// or Err(AwaitWatermarkError) if the timeout expires.
135	///
136	/// This is useful for CDC polling to ensure all in-flight commits have
137	/// completed their storage writes before querying for CDC events.
138	#[instrument(level = "debug", skip(self))]
139	pub fn try_wait_for_watermark(
140		&self,
141		version: CommitVersion,
142		timeout: Duration,
143	) -> Result<(), AwaitWatermarkError> {
144		if self.inner.command.wait_for_mark_timeout(version, timeout) {
145			Ok(())
146		} else {
147			Err(AwaitWatermarkError {
148				version,
149				timeout,
150			})
151		}
152	}
153
154	/// Returns the highest version where ALL prior versions have completed.
155	/// This is useful for CDC polling to know the safe upper bound for fetching
156	/// CDC events - all events up to this version are guaranteed to be in storage.
157	#[instrument(level = "trace", skip(self))]
158	pub fn done_until(&self) -> CommitVersion {
159		self.inner.command.done_until()
160	}
161
162	/// Returns (query_done_until, command_done_until) for debugging watermark state.
163	pub fn watermarks(&self) -> (CommitVersion, CommitVersion) {
164		(self.inner.query.done_until(), self.inner.command.done_until())
165	}
166}
167
// ============================================================================
// Transaction - The main multi-version transaction type
// ============================================================================
171
172pub struct Transaction(Arc<Inner>);
173
174pub struct Inner {
175	pub(crate) tm: TransactionManager<StandardVersionProvider>,
176	pub(crate) store: TransactionStore,
177	pub(crate) event_bus: EventBus,
178}
179
180impl Deref for Transaction {
181	type Target = Inner;
182
183	fn deref(&self) -> &Self::Target {
184		&self.0
185	}
186}
187
188impl Clone for Transaction {
189	fn clone(&self) -> Self {
190		Self(self.0.clone())
191	}
192}
193
194impl Inner {
195	fn new(store: TransactionStore, single: TransactionSingleVersion, event_bus: EventBus) -> Self {
196		let tm = TransactionManager::new(StandardVersionProvider::new(single).unwrap()).unwrap();
197
198		Self {
199			tm,
200			store,
201			event_bus,
202		}
203	}
204
205	fn version(&self) -> crate::Result<CommitVersion> {
206		self.tm.version()
207	}
208}
209
210impl Transaction {
211	pub fn testing() -> Self {
212		let store = TransactionStore::testing_memory();
213		let event_bus = EventBus::new();
214		Self::new(
215			store.clone(),
216			TransactionSingleVersion::SingleVersionLock(TransactionSvl::new(store, event_bus.clone())),
217			event_bus,
218		)
219	}
220}
221
222impl Transaction {
223	#[instrument(level = "debug", skip(store, single, event_bus))]
224	pub fn new(store: TransactionStore, single: TransactionSingleVersion, event_bus: EventBus) -> Self {
225		Self(Arc::new(Inner::new(store, single, event_bus)))
226	}
227}
228
229impl Transaction {
230	#[instrument(level = "trace", skip(self))]
231	pub fn version(&self) -> crate::Result<CommitVersion> {
232		self.0.version()
233	}
234
235	#[instrument(level = "debug", skip(self))]
236	pub fn begin_query(&self) -> crate::Result<QueryTransaction> {
237		QueryTransaction::new(self.clone(), None)
238	}
239}
240
241impl Transaction {
242	#[instrument(level = "debug", skip(self))]
243	pub fn begin_command(&self) -> crate::Result<CommandTransaction> {
244		CommandTransaction::new(self.clone())
245	}
246}
247
248pub enum TransactionType {
249	Query(QueryTransaction),
250	Command(CommandTransaction),
251}
252
253impl Transaction {
254	#[instrument(level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
255	pub fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<Committed>, reifydb_type::Error> {
256		Ok(self.store.get(key, version)?.map(|sv| sv.into()))
257	}
258
259	#[instrument(level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
260	pub fn contains_key(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool, reifydb_type::Error> {
261		self.store.contains(key, version)
262	}
263
264	#[instrument(level = "trace", skip(self), fields(version = version.0, batch_size = batch_size))]
265	pub fn range_batched(
266		&self,
267		range: EncodedKeyRange,
268		version: CommitVersion,
269		batch_size: u64,
270	) -> reifydb_type::Result<<TransactionStore as MultiVersionRange>::RangeIter<'_>> {
271		self.store.range_batched(range, version, batch_size)
272	}
273
274	pub fn range(
275		&self,
276		range: EncodedKeyRange,
277		version: CommitVersion,
278	) -> reifydb_type::Result<<TransactionStore as MultiVersionRange>::RangeIter<'_>> {
279		self.range_batched(range, version, 1024)
280	}
281
282	pub fn range_rev_batched(
283		&self,
284		range: EncodedKeyRange,
285		version: CommitVersion,
286		batch_size: u64,
287	) -> reifydb_type::Result<<TransactionStore as MultiVersionRangeRev>::RangeIterRev<'_>> {
288		self.store.range_rev_batched(range, version, batch_size)
289	}
290
291	pub fn range_rev(
292		&self,
293		range: EncodedKeyRange,
294		version: CommitVersion,
295	) -> reifydb_type::Result<<TransactionStore as MultiVersionRangeRev>::RangeIterRev<'_>> {
296		self.range_rev_batched(range, version, 1024)
297	}
298}