reifydb_transaction/multi/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	CommitVersion, EncodedKey, EncodedKeyRange, Error,
6	event::EventBus,
7	interface::{
8		BoxedMultiVersionIter, MultiVersionCommandTransaction, MultiVersionQueryTransaction,
9		MultiVersionTransaction, MultiVersionValues, TransactionId, WithEventBus,
10	},
11	value::encoded::EncodedValues,
12};
13use reifydb_store_transaction::TransactionStore;
14
15use crate::{
16	multi::{
17		pending::PendingWrites,
18		transaction::{
19			optimistic::{
20				CommandTransaction as OptimisticCommandTransaction,
21				QueryTransaction as OptimisticQueryTransaction, TransactionOptimistic,
22			},
23			serializable::{
24				CommandTransaction as SerializableCommandTransaction,
25				QueryTransaction as SerializableQueryTransaction, TransactionSerializable,
26			},
27		},
28	},
29	single::TransactionSingleVersion,
30};
31
32pub mod conflict;
33pub mod marker;
34pub mod optimistic;
35pub mod pending;
36pub mod serializable;
37pub mod transaction;
38pub mod types;
39pub mod watermark;
40
41#[repr(u8)]
42#[derive(Clone)]
43pub enum TransactionMultiVersion {
44	Optimistic(TransactionOptimistic) = 0,
45	Serializable(TransactionSerializable) = 1,
46}
47
48impl TransactionMultiVersion {
49	pub fn optimistic(store: TransactionStore, single: TransactionSingleVersion, bus: EventBus) -> Self {
50		Self::Optimistic(TransactionOptimistic::new(store, single, bus))
51	}
52
53	pub fn serializable(store: TransactionStore, single: TransactionSingleVersion, bus: EventBus) -> Self {
54		Self::Serializable(TransactionSerializable::new(store, single, bus))
55	}
56}
57
58pub enum StandardQueryTransaction {
59	Optimistic(OptimisticQueryTransaction),
60	Serializable(SerializableQueryTransaction),
61}
62
63pub enum StandardCommandTransaction {
64	Optimistic(OptimisticCommandTransaction),
65	Serializable(SerializableCommandTransaction),
66}
67
68impl WithEventBus for TransactionMultiVersion {
69	fn event_bus(&self) -> &EventBus {
70		match self {
71			TransactionMultiVersion::Optimistic(t) => t.event_bus(),
72			TransactionMultiVersion::Serializable(t) => t.event_bus(),
73		}
74	}
75}
76
77impl MultiVersionQueryTransaction for StandardQueryTransaction {
78	fn version(&self) -> CommitVersion {
79		match self {
80			StandardQueryTransaction::Optimistic(q) => q.version(),
81			StandardQueryTransaction::Serializable(q) => q.version(),
82		}
83	}
84
85	fn id(&self) -> TransactionId {
86		match self {
87			StandardQueryTransaction::Optimistic(q) => q.tm.id(),
88			StandardQueryTransaction::Serializable(q) => q.tm.id(),
89		}
90	}
91
92	fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>, Error> {
93		match self {
94			StandardQueryTransaction::Optimistic(q) => Ok(q.get(key)?),
95			StandardQueryTransaction::Serializable(q) => Ok(q.get(key)?),
96		}
97	}
98
99	fn contains_key(&mut self, key: &EncodedKey) -> Result<bool, Error> {
100		match self {
101			StandardQueryTransaction::Optimistic(q) => q.contains_key(key),
102			StandardQueryTransaction::Serializable(q) => q.contains_key(key),
103		}
104	}
105
106	fn range_batched(&mut self, range: EncodedKeyRange, batch_size: u64) -> Result<BoxedMultiVersionIter, Error> {
107		match self {
108			StandardQueryTransaction::Optimistic(q) => {
109				let iter = q.range_batched(range, batch_size)?;
110				Ok(Box::new(iter.into_iter()))
111			}
112			StandardQueryTransaction::Serializable(q) => {
113				let iter = q.range_batched(range, batch_size)?;
114				Ok(Box::new(iter.into_iter()))
115			}
116		}
117	}
118
119	fn range_rev_batched(
120		&mut self,
121		range: EncodedKeyRange,
122		batch_size: u64,
123	) -> Result<BoxedMultiVersionIter, Error> {
124		match self {
125			StandardQueryTransaction::Optimistic(q) => {
126				let iter = q.range_rev_batched(range, batch_size)?;
127				Ok(Box::new(iter.into_iter()))
128			}
129			StandardQueryTransaction::Serializable(q) => {
130				let iter = q.range_rev_batched(range, batch_size)?;
131				Ok(Box::new(iter.into_iter()))
132			}
133		}
134	}
135
136	fn prefix(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter, Error> {
137		match self {
138			StandardQueryTransaction::Optimistic(q) => {
139				let iter = q.prefix(prefix)?;
140				Ok(Box::new(iter.into_iter()))
141			}
142			StandardQueryTransaction::Serializable(q) => {
143				let iter = q.prefix(prefix)?;
144				Ok(Box::new(iter.into_iter()))
145			}
146		}
147	}
148
149	fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter, Error> {
150		match self {
151			StandardQueryTransaction::Optimistic(q) => {
152				let iter = q.prefix_rev(prefix)?;
153				Ok(Box::new(iter.into_iter()))
154			}
155			StandardQueryTransaction::Serializable(q) => {
156				let iter = q.prefix_rev(prefix)?;
157				Ok(Box::new(iter.into_iter()))
158			}
159		}
160	}
161
162	fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<(), Error> {
163		match self {
164			StandardQueryTransaction::Optimistic(q) => {
165				q.read_as_of_version_exclusive(version);
166				Ok(())
167			}
168			StandardQueryTransaction::Serializable(q) => {
169				q.read_as_of_version_exclusive(version);
170				Ok(())
171			}
172		}
173	}
174}
175
176impl MultiVersionCommandTransaction for StandardCommandTransaction {
177	fn set(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<(), Error> {
178		match self {
179			StandardCommandTransaction::Optimistic(c) => c.set(key, values),
180			StandardCommandTransaction::Serializable(c) => c.set(key, values),
181		}
182	}
183
184	fn remove(&mut self, key: &EncodedKey) -> Result<(), Error> {
185		match self {
186			StandardCommandTransaction::Optimistic(c) => c.remove(key),
187			StandardCommandTransaction::Serializable(c) => c.remove(key),
188		}
189	}
190
191	fn commit(self) -> Result<CommitVersion, Error> {
192		match self {
193			StandardCommandTransaction::Optimistic(c) => c.commit(),
194			StandardCommandTransaction::Serializable(c) => c.commit(),
195		}
196	}
197
198	fn rollback(self) -> Result<(), Error> {
199		// Both transaction types auto-rollback when dropped
200		Ok(())
201	}
202}
203
204impl MultiVersionQueryTransaction for StandardCommandTransaction {
205	fn version(&self) -> CommitVersion {
206		match self {
207			StandardCommandTransaction::Optimistic(c) => c.tm.version(),
208			StandardCommandTransaction::Serializable(c) => c.tm.version(),
209		}
210	}
211
212	fn id(&self) -> TransactionId {
213		match self {
214			StandardCommandTransaction::Optimistic(c) => c.tm.id(),
215			StandardCommandTransaction::Serializable(c) => c.tm.id(),
216		}
217	}
218
219	fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>, Error> {
220		match self {
221			StandardCommandTransaction::Optimistic(c) => {
222				Ok(c.get(key)?.map(|tv| tv.into_multi_version_values()))
223			}
224			StandardCommandTransaction::Serializable(c) => {
225				Ok(c.get(key)?.map(|tv| tv.into_multi_version_values()))
226			}
227		}
228	}
229
230	fn contains_key(&mut self, key: &EncodedKey) -> Result<bool, Error> {
231		match self {
232			StandardCommandTransaction::Optimistic(c) => c.contains_key(key),
233			StandardCommandTransaction::Serializable(c) => c.contains_key(key),
234		}
235	}
236
237	fn range_batched(&mut self, range: EncodedKeyRange, batch_size: u64) -> Result<BoxedMultiVersionIter, Error> {
238		match self {
239			StandardCommandTransaction::Optimistic(c) => {
240				let iter = c.range_batched(range, batch_size)?;
241				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
242			}
243			StandardCommandTransaction::Serializable(c) => {
244				let iter = c.range_batched(range, batch_size)?;
245				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
246			}
247		}
248	}
249
250	fn range_rev_batched(
251		&mut self,
252		range: EncodedKeyRange,
253		batch_size: u64,
254	) -> Result<BoxedMultiVersionIter, Error> {
255		match self {
256			StandardCommandTransaction::Optimistic(c) => {
257				let iter = c.range_rev_batched(range, batch_size)?;
258				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
259			}
260			StandardCommandTransaction::Serializable(c) => {
261				let iter = c.range_rev_batched(range, batch_size)?;
262				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
263			}
264		}
265	}
266
267	fn prefix(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter, Error> {
268		match self {
269			StandardCommandTransaction::Optimistic(c) => {
270				let iter = c.prefix(prefix)?;
271				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
272			}
273			StandardCommandTransaction::Serializable(c) => {
274				let iter = c.prefix(prefix)?;
275				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
276			}
277		}
278	}
279
280	fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter, Error> {
281		match self {
282			StandardCommandTransaction::Optimistic(c) => {
283				let iter = c.prefix_rev(prefix)?;
284				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
285			}
286			StandardCommandTransaction::Serializable(c) => {
287				let iter = c.prefix_rev(prefix)?;
288				Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
289			}
290		}
291	}
292
293	fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<(), Error> {
294		match self {
295			StandardCommandTransaction::Optimistic(c) => {
296				c.read_as_of_version_exclusive(version);
297				Ok(())
298			}
299			StandardCommandTransaction::Serializable(c) => {
300				c.read_as_of_version_exclusive(version);
301				Ok(())
302			}
303		}
304	}
305}
306
307impl StandardCommandTransaction {
308	/// Get access to the pending writes in this transaction
309	pub fn pending_writes(&self) -> &PendingWrites {
310		match self {
311			StandardCommandTransaction::Optimistic(c) => c.pending_writes(),
312			StandardCommandTransaction::Serializable(c) => c.pending_writes(),
313		}
314	}
315}
316
317impl MultiVersionTransaction for TransactionMultiVersion {
318	type Query = StandardQueryTransaction;
319	type Command = StandardCommandTransaction;
320
321	fn begin_query(&self) -> Result<Self::Query, Error> {
322		match self {
323			TransactionMultiVersion::Optimistic(t) => {
324				Ok(StandardQueryTransaction::Optimistic(t.begin_query()?))
325			}
326			TransactionMultiVersion::Serializable(t) => {
327				Ok(StandardQueryTransaction::Serializable(t.begin_query()?))
328			}
329		}
330	}
331
332	fn begin_command(&self) -> Result<Self::Command, Error> {
333		match self {
334			TransactionMultiVersion::Optimistic(t) => {
335				Ok(StandardCommandTransaction::Optimistic(t.begin_command()?))
336			}
337			TransactionMultiVersion::Serializable(t) => {
338				Ok(StandardCommandTransaction::Serializable(t.begin_command()?))
339			}
340		}
341	}
342}