reifydb_transaction/multi/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::time::Duration;
5
6use reifydb_core::{
7	CommitVersion, EncodedKey, EncodedKeyRange, Error,
8	event::EventBus,
9	interface::{
10		BoxedMultiVersionIter, MultiVersionCommandTransaction, MultiVersionQueryTransaction,
11		MultiVersionTransaction, MultiVersionValues, TransactionId, WithEventBus,
12	},
13	value::encoded::EncodedValues,
14};
15use reifydb_store_transaction::TransactionStore;
16
17/// Error returned when waiting for watermark times out
18#[derive(Debug, Clone)]
19pub struct AwaitWatermarkError {
20	pub version: CommitVersion,
21	pub timeout: Duration,
22}
23
24impl std::fmt::Display for AwaitWatermarkError {
25	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26		write!(f, "Timeout waiting for watermark to reach version {} after {:?}", self.version.0, self.timeout)
27	}
28}
29
30impl std::error::Error for AwaitWatermarkError {}
31
32use crate::{
33	multi::{
34		pending::PendingWrites,
35		transaction::{
36			optimistic::{
37				CommandTransaction as OptimisticCommandTransaction,
38				QueryTransaction as OptimisticQueryTransaction, TransactionOptimistic,
39			},
40			serializable::{
41				CommandTransaction as SerializableCommandTransaction,
42				QueryTransaction as SerializableQueryTransaction, TransactionSerializable,
43			},
44		},
45	},
46	single::TransactionSingleVersion,
47};
48
49pub mod conflict;
50pub mod marker;
51pub mod optimistic;
52pub mod pending;
53pub mod serializable;
54pub mod transaction;
55pub mod types;
56pub mod watermark;
57
58#[repr(u8)]
59#[derive(Clone)]
60pub enum TransactionMultiVersion {
61	Optimistic(TransactionOptimistic) = 0,
62	Serializable(TransactionSerializable) = 1,
63}
64
65impl TransactionMultiVersion {
66	pub fn optimistic(store: TransactionStore, single: TransactionSingleVersion, bus: EventBus) -> Self {
67		Self::Optimistic(TransactionOptimistic::new(store, single, bus))
68	}
69
70	pub fn serializable(store: TransactionStore, single: TransactionSingleVersion, bus: EventBus) -> Self {
71		Self::Serializable(TransactionSerializable::new(store, single, bus))
72	}
73
74	/// Wait for the watermark to reach the specified version.
75	/// Returns Ok(()) if the watermark reaches the version within the timeout,
76	/// or Err(AwaitWatermarkError) if the timeout expires.
77	pub fn try_wait_for_watermark(
78		&self,
79		version: CommitVersion,
80		timeout: Duration,
81	) -> Result<(), AwaitWatermarkError> {
82		match self {
83			Self::Optimistic(t) => t.tm.try_wait_for_watermark(version, timeout),
84			Self::Serializable(t) => t.tm.try_wait_for_watermark(version, timeout),
85		}
86	}
87
88	/// Get the current version from the transaction manager
89	pub fn current_version(&self) -> crate::Result<CommitVersion> {
90		match self {
91			Self::Optimistic(t) => t.tm.version(),
92			Self::Serializable(t) => t.tm.version(),
93		}
94	}
95
96	/// Returns the highest version where ALL prior versions have completed.
97	/// This is useful for CDC polling to know the safe upper bound for fetching
98	/// CDC events - all events up to this version are guaranteed to be in storage.
99	pub fn done_until(&self) -> CommitVersion {
100		match self {
101			Self::Optimistic(t) => t.tm.done_until(),
102			Self::Serializable(t) => t.tm.done_until(),
103		}
104	}
105
106	/// Returns (query_done_until, command_done_until) for debugging watermark state.
107	pub fn watermarks(&self) -> (CommitVersion, CommitVersion) {
108		match self {
109			Self::Optimistic(t) => t.tm.watermarks(),
110			Self::Serializable(t) => t.tm.watermarks(),
111		}
112	}
113}
114
115pub enum StandardQueryTransaction {
116	Optimistic(OptimisticQueryTransaction),
117	Serializable(SerializableQueryTransaction),
118}
119
120pub enum StandardCommandTransaction {
121	Optimistic(OptimisticCommandTransaction),
122	Serializable(SerializableCommandTransaction),
123}
124
125impl WithEventBus for TransactionMultiVersion {
126	fn event_bus(&self) -> &EventBus {
127		match self {
128			TransactionMultiVersion::Optimistic(t) => t.event_bus(),
129			TransactionMultiVersion::Serializable(t) => t.event_bus(),
130		}
131	}
132}
133
134impl MultiVersionQueryTransaction for StandardQueryTransaction {
135	fn version(&self) -> CommitVersion {
136		match self {
137			StandardQueryTransaction::Optimistic(q) => q.version(),
138			StandardQueryTransaction::Serializable(q) => q.version(),
139		}
140	}
141
142	fn id(&self) -> TransactionId {
143		match self {
144			StandardQueryTransaction::Optimistic(q) => q.tm.id(),
145			StandardQueryTransaction::Serializable(q) => q.tm.id(),
146		}
147	}
148
149	fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>, Error> {
150		match self {
151			StandardQueryTransaction::Optimistic(q) => Ok(q.get(key)?),
152			StandardQueryTransaction::Serializable(q) => Ok(q.get(key)?),
153		}
154	}
155
156	fn contains_key(&mut self, key: &EncodedKey) -> Result<bool, Error> {
157		match self {
158			StandardQueryTransaction::Optimistic(q) => q.contains_key(key),
159			StandardQueryTransaction::Serializable(q) => q.contains_key(key),
160		}
161	}
162
163	fn range_batched(
164		&mut self,
165		range: EncodedKeyRange,
166		batch_size: u64,
167	) -> Result<BoxedMultiVersionIter<'_>, Error> {
168		match self {
169			StandardQueryTransaction::Optimistic(q) => {
170				let iter = q.range_batched(range, batch_size)?;
171				Ok(Box::new(iter.into_iter()))
172			}
173			StandardQueryTransaction::Serializable(q) => {
174				let iter = q.range_batched(range, batch_size)?;
175				Ok(Box::new(iter.into_iter()))
176			}
177		}
178	}
179
180	fn range_rev_batched(
181		&mut self,
182		range: EncodedKeyRange,
183		batch_size: u64,
184	) -> Result<BoxedMultiVersionIter<'_>, Error> {
185		match self {
186			StandardQueryTransaction::Optimistic(q) => {
187				let iter = q.range_rev_batched(range, batch_size)?;
188				Ok(Box::new(iter.into_iter()))
189			}
190			StandardQueryTransaction::Serializable(q) => {
191				let iter = q.range_rev_batched(range, batch_size)?;
192				Ok(Box::new(iter.into_iter()))
193			}
194		}
195	}
196
197	fn prefix(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter<'_>, Error> {
198		match self {
199			StandardQueryTransaction::Optimistic(q) => {
200				let iter = q.prefix(prefix)?;
201				Ok(Box::new(iter.into_iter()))
202			}
203			StandardQueryTransaction::Serializable(q) => {
204				let iter = q.prefix(prefix)?;
205				Ok(Box::new(iter.into_iter()))
206			}
207		}
208	}
209
210	fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter<'_>, Error> {
211		match self {
212			StandardQueryTransaction::Optimistic(q) => {
213				let iter = q.prefix_rev(prefix)?;
214				Ok(Box::new(iter.into_iter()))
215			}
216			StandardQueryTransaction::Serializable(q) => {
217				let iter = q.prefix_rev(prefix)?;
218				Ok(Box::new(iter.into_iter()))
219			}
220		}
221	}
222
223	fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<(), Error> {
224		match self {
225			StandardQueryTransaction::Optimistic(q) => {
226				q.read_as_of_version_exclusive(version);
227				Ok(())
228			}
229			StandardQueryTransaction::Serializable(q) => {
230				q.read_as_of_version_exclusive(version);
231				Ok(())
232			}
233		}
234	}
235}
236
237impl MultiVersionCommandTransaction for StandardCommandTransaction {
238	fn set(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<(), Error> {
239		match self {
240			StandardCommandTransaction::Optimistic(c) => c.set(key, values),
241			StandardCommandTransaction::Serializable(c) => c.set(key, values),
242		}
243	}
244
245	fn remove(&mut self, key: &EncodedKey) -> Result<(), Error> {
246		match self {
247			StandardCommandTransaction::Optimistic(c) => c.remove(key),
248			StandardCommandTransaction::Serializable(c) => c.remove(key),
249		}
250	}
251
252	fn commit(self) -> Result<CommitVersion, Error> {
253		match self {
254			StandardCommandTransaction::Optimistic(c) => c.commit(),
255			StandardCommandTransaction::Serializable(c) => c.commit(),
256		}
257	}
258
259	fn rollback(self) -> Result<(), Error> {
260		// Both transaction types auto-rollback when dropped
261		Ok(())
262	}
263}
264
265impl MultiVersionQueryTransaction for StandardCommandTransaction {
266	fn version(&self) -> CommitVersion {
267		match self {
268			StandardCommandTransaction::Optimistic(c) => c.tm.version(),
269			StandardCommandTransaction::Serializable(c) => c.tm.version(),
270		}
271	}
272
273	fn id(&self) -> TransactionId {
274		match self {
275			StandardCommandTransaction::Optimistic(c) => c.tm.id(),
276			StandardCommandTransaction::Serializable(c) => c.tm.id(),
277		}
278	}
279
280	fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>, Error> {
281		match self {
282			StandardCommandTransaction::Optimistic(c) => {
283				Ok(c.get(key)?.map(|tv| tv.into_multi_version_values()))
284			}
285			StandardCommandTransaction::Serializable(c) => {
286				Ok(c.get(key)?.map(|tv| tv.into_multi_version_values()))
287			}
288		}
289	}
290
291	fn contains_key(&mut self, key: &EncodedKey) -> Result<bool, Error> {
292		match self {
293			StandardCommandTransaction::Optimistic(c) => c.contains_key(key),
294			StandardCommandTransaction::Serializable(c) => c.contains_key(key),
295		}
296	}
297
298	fn range_batched(
299		&mut self,
300		range: EncodedKeyRange,
301		batch_size: u64,
302	) -> Result<BoxedMultiVersionIter<'_>, Error> {
303		match self {
304			StandardCommandTransaction::Optimistic(c) => {
305				let iter = c.range_batched(range, batch_size)?;
306				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
307			}
308			StandardCommandTransaction::Serializable(c) => {
309				let iter = c.range_batched(range, batch_size)?;
310				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
311			}
312		}
313	}
314
315	fn range_rev_batched(
316		&mut self,
317		range: EncodedKeyRange,
318		batch_size: u64,
319	) -> Result<BoxedMultiVersionIter<'_>, Error> {
320		match self {
321			StandardCommandTransaction::Optimistic(c) => {
322				let iter = c.range_rev_batched(range, batch_size)?;
323				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
324			}
325			StandardCommandTransaction::Serializable(c) => {
326				let iter = c.range_rev_batched(range, batch_size)?;
327				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
328			}
329		}
330	}
331
332	fn prefix(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter<'_>, Error> {
333		match self {
334			StandardCommandTransaction::Optimistic(c) => {
335				let iter = c.prefix(prefix)?;
336				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
337			}
338			StandardCommandTransaction::Serializable(c) => {
339				let iter = c.prefix(prefix)?;
340				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
341			}
342		}
343	}
344
345	fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter<'_>, Error> {
346		match self {
347			StandardCommandTransaction::Optimistic(c) => {
348				let iter = c.prefix_rev(prefix)?;
349				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
350			}
351			StandardCommandTransaction::Serializable(c) => {
352				let iter = c.prefix_rev(prefix)?;
353				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
354			}
355		}
356	}
357
358	fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<(), Error> {
359		match self {
360			StandardCommandTransaction::Optimistic(c) => {
361				c.read_as_of_version_exclusive(version);
362				Ok(())
363			}
364			StandardCommandTransaction::Serializable(c) => {
365				c.read_as_of_version_exclusive(version);
366				Ok(())
367			}
368		}
369	}
370}
371
372impl StandardCommandTransaction {
373	/// Get access to the pending writes in this transaction
374	pub fn pending_writes(&self) -> &PendingWrites {
375		match self {
376			StandardCommandTransaction::Optimistic(c) => c.pending_writes(),
377			StandardCommandTransaction::Serializable(c) => c.pending_writes(),
378		}
379	}
380}
381
382impl MultiVersionTransaction for TransactionMultiVersion {
383	type Query = StandardQueryTransaction;
384	type Command = StandardCommandTransaction;
385
386	fn begin_query(&self) -> Result<Self::Query, Error> {
387		match self {
388			TransactionMultiVersion::Optimistic(t) => {
389				Ok(StandardQueryTransaction::Optimistic(t.begin_query()?))
390			}
391			TransactionMultiVersion::Serializable(t) => {
392				Ok(StandardQueryTransaction::Serializable(t.begin_query()?))
393			}
394		}
395	}
396
397	fn begin_command(&self) -> Result<Self::Command, Error> {
398		match self {
399			TransactionMultiVersion::Optimistic(t) => {
400				Ok(StandardCommandTransaction::Optimistic(t.begin_command()?))
401			}
402			TransactionMultiVersion::Serializable(t) => {
403				Ok(StandardCommandTransaction::Serializable(t.begin_command()?))
404			}
405		}
406	}
407}