// reifydb_transaction/transaction/mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::CommitVersion,
6	encoded::{
7		encoded::EncodedValues,
8		key::{EncodedKey, EncodedKeyRange},
9	},
10	interface::{
11		change::Change,
12		store::{MultiVersionBatch, MultiVersionValues},
13	},
14};
15use reifydb_type::Result;
16
17use crate::{
18	TransactionId,
19	change::RowChange,
20	single::{read::SingleReadTransaction, write::SingleWriteTransaction},
21	transaction::{
22		admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
23		subscription::SubscriptionTransaction,
24	},
25};
26
27pub mod admin;
28pub mod catalog;
29pub mod command;
30pub mod query;
31pub mod subscription;
32
33/// An enum that can hold either a command, admin, query, or subscription transaction
34/// for flexible execution
35pub enum Transaction<'a> {
36	Command(&'a mut CommandTransaction),
37	Admin(&'a mut AdminTransaction),
38	Query(&'a mut QueryTransaction),
39	Subscription(&'a mut SubscriptionTransaction),
40}
41
42impl<'a> Transaction<'a> {
43	/// Get the transaction version
44	pub fn version(&self) -> CommitVersion {
45		match self {
46			Self::Command(txn) => txn.version(),
47			Self::Admin(txn) => txn.version(),
48			Self::Query(txn) => txn.version(),
49			Self::Subscription(txn) => txn.version(),
50		}
51	}
52
53	/// Get the transaction ID
54	pub fn id(&self) -> TransactionId {
55		match self {
56			Self::Command(txn) => txn.id(),
57			Self::Admin(txn) => txn.id(),
58			Self::Query(txn) => txn.id(),
59			Self::Subscription(txn) => txn.id(),
60		}
61	}
62
63	/// Get a value by key (async method)
64	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>> {
65		match self {
66			Self::Command(txn) => txn.get(key),
67			Self::Admin(txn) => txn.get(key),
68			Self::Query(txn) => txn.get(key),
69			Self::Subscription(txn) => txn.get(key),
70		}
71	}
72
73	/// Check if a key exists (async method)
74	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
75		match self {
76			Self::Command(txn) => txn.contains_key(key),
77			Self::Admin(txn) => txn.contains_key(key),
78			Self::Query(txn) => txn.contains_key(key),
79			Self::Subscription(txn) => txn.contains_key(key),
80		}
81	}
82
83	/// Get a prefix batch (async method)
84	pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
85		match self {
86			Self::Command(txn) => txn.prefix(prefix),
87			Self::Admin(txn) => txn.prefix(prefix),
88			Self::Query(txn) => txn.prefix(prefix),
89			Self::Subscription(txn) => txn.prefix(prefix),
90		}
91	}
92
93	/// Get a reverse prefix batch (async method)
94	pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
95		match self {
96			Self::Command(txn) => txn.prefix_rev(prefix),
97			Self::Admin(txn) => txn.prefix_rev(prefix),
98			Self::Query(txn) => txn.prefix_rev(prefix),
99			Self::Subscription(txn) => txn.prefix_rev(prefix),
100		}
101	}
102
103	/// Read as of version exclusive (async method)
104	pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
105		match self {
106			Transaction::Command(txn) => txn.read_as_of_version_exclusive(version),
107			Transaction::Admin(txn) => txn.read_as_of_version_exclusive(version),
108			Transaction::Query(txn) => txn.read_as_of_version_exclusive(version),
109			Transaction::Subscription(txn) => txn.read_as_of_version_exclusive(version),
110		}
111	}
112
113	/// Create a streaming iterator for forward range queries.
114	pub fn range(
115		&mut self,
116		range: EncodedKeyRange,
117		batch_size: usize,
118	) -> Result<Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>> {
119		match self {
120			Transaction::Command(txn) => txn.range(range, batch_size),
121			Transaction::Admin(txn) => txn.range(range, batch_size),
122			Transaction::Query(txn) => Ok(txn.range(range, batch_size)),
123			Transaction::Subscription(txn) => txn.range(range, batch_size),
124		}
125	}
126
127	/// Create a streaming iterator for reverse range queries.
128	pub fn range_rev(
129		&mut self,
130		range: EncodedKeyRange,
131		batch_size: usize,
132	) -> Result<Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>> {
133		match self {
134			Transaction::Command(txn) => txn.range_rev(range, batch_size),
135			Transaction::Admin(txn) => txn.range_rev(range, batch_size),
136			Transaction::Query(txn) => Ok(txn.range_rev(range, batch_size)),
137			Transaction::Subscription(txn) => txn.range_rev(range, batch_size),
138		}
139	}
140}
141
142impl<'a> From<&'a mut CommandTransaction> for Transaction<'a> {
143	fn from(txn: &'a mut CommandTransaction) -> Self {
144		Self::Command(txn)
145	}
146}
147
148impl<'a> From<&'a mut AdminTransaction> for Transaction<'a> {
149	fn from(txn: &'a mut AdminTransaction) -> Self {
150		Self::Admin(txn)
151	}
152}
153
154impl<'a> From<&'a mut QueryTransaction> for Transaction<'a> {
155	fn from(txn: &'a mut QueryTransaction) -> Self {
156		Self::Query(txn)
157	}
158}
159
160impl<'a> From<&'a mut SubscriptionTransaction> for Transaction<'a> {
161	fn from(txn: &'a mut SubscriptionTransaction) -> Self {
162		Self::Subscription(txn)
163	}
164}
165
166impl<'a> Transaction<'a> {
167	/// Re-borrow this transaction with a shorter lifetime, enabling
168	/// multiple sequential uses of the same transaction binding.
169	pub fn reborrow(&mut self) -> Transaction<'_> {
170		match self {
171			Transaction::Command(cmd) => Transaction::Command(cmd),
172			Transaction::Admin(admin) => Transaction::Admin(admin),
173			Transaction::Query(qry) => Transaction::Query(qry),
174			Transaction::Subscription(sub) => Transaction::Subscription(sub),
175		}
176	}
177
178	/// Extract the underlying CommandTransaction, panics if this is
179	/// not a Command transaction
180	pub fn command(self) -> &'a mut CommandTransaction {
181		match self {
182			Self::Command(txn) => txn,
183			_ => panic!("Expected Command transaction"),
184		}
185	}
186
187	/// Extract the underlying AdminTransaction, panics if this is
188	/// not an Admin transaction
189	pub fn admin(self) -> &'a mut AdminTransaction {
190		match self {
191			Self::Admin(txn) => txn,
192			_ => panic!("Expected Admin transaction"),
193		}
194	}
195
196	/// Extract the underlying QueryTransaction, panics if this is
197	/// not a Query transaction
198	pub fn query(self) -> &'a mut QueryTransaction {
199		match self {
200			Self::Query(txn) => txn,
201			_ => panic!("Expected Query transaction"),
202		}
203	}
204
205	/// Extract the underlying SubscriptionTransaction, panics if this is
206	/// not a Subscription transaction
207	pub fn subscription(self) -> &'a mut SubscriptionTransaction {
208		match self {
209			Self::Subscription(txn) => txn,
210			_ => panic!("Expected Subscription transaction"),
211		}
212	}
213
214	/// Get a mutable reference to the underlying
215	/// CommandTransaction, panics if this is not a Command transaction
216	pub fn command_mut(&mut self) -> &mut CommandTransaction {
217		match self {
218			Self::Command(txn) => txn,
219			_ => panic!("Expected Command transaction"),
220		}
221	}
222
223	/// Get a mutable reference to the underlying
224	/// AdminTransaction, panics if this is not an Admin transaction
225	pub fn admin_mut(&mut self) -> &mut AdminTransaction {
226		match self {
227			Self::Admin(txn) => txn,
228			_ => panic!("Expected Admin transaction"),
229		}
230	}
231
232	/// Get a mutable reference to the underlying QueryTransaction,
233	/// panics if this is not a Query transaction
234	pub fn query_mut(&mut self) -> &mut QueryTransaction {
235		match self {
236			Self::Query(txn) => txn,
237			_ => panic!("Expected Query transaction"),
238		}
239	}
240
241	/// Get a mutable reference to the underlying SubscriptionTransaction,
242	/// panics if this is not a Subscription transaction
243	pub fn subscription_mut(&mut self) -> &mut SubscriptionTransaction {
244		match self {
245			Self::Subscription(txn) => txn,
246			_ => panic!("Expected Subscription transaction"),
247		}
248	}
249
250	/// Begin a single-version query transaction for specific keys
251	pub fn begin_single_query<'b, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
252	where
253		I: IntoIterator<Item = &'b EncodedKey>,
254	{
255		match self {
256			Transaction::Command(txn) => txn.begin_single_query(keys),
257			Transaction::Admin(txn) => txn.begin_single_query(keys),
258			Transaction::Query(txn) => txn.begin_single_query(keys),
259			Transaction::Subscription(txn) => txn.begin_single_query(keys),
260		}
261	}
262
263	/// Begin a single-version write transaction for specific keys.
264	/// Panics on Query transactions.
265	pub fn begin_single_command<'b, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
266	where
267		I: IntoIterator<Item = &'b EncodedKey>,
268	{
269		match self {
270			Transaction::Command(txn) => txn.begin_single_command(keys),
271			Transaction::Admin(txn) => txn.begin_single_command(keys),
272			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
273			Transaction::Subscription(txn) => txn.begin_single_command(keys),
274		}
275	}
276
277	/// Set a key-value pair. Panics on Query transactions.
278	pub fn set(&mut self, key: &EncodedKey, row: EncodedValues) -> Result<()> {
279		match self {
280			Transaction::Command(txn) => txn.set(key, row),
281			Transaction::Admin(txn) => txn.set(key, row),
282			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
283			Transaction::Subscription(txn) => txn.set(key, row),
284		}
285	}
286
287	/// Unset (delete with tombstone) a key-value pair. Panics on Query transactions.
288	pub fn unset(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<()> {
289		match self {
290			Transaction::Command(txn) => txn.unset(key, values),
291			Transaction::Admin(txn) => txn.unset(key, values),
292			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
293			Transaction::Subscription(txn) => txn.unset(key, values),
294		}
295	}
296
297	/// Remove a key. Panics on Query transactions.
298	pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
299		match self {
300			Transaction::Command(txn) => txn.remove(key),
301			Transaction::Admin(txn) => txn.remove(key),
302			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
303			Transaction::Subscription(txn) => txn.remove(key),
304		}
305	}
306
307	/// Track a row change for post-commit event emission. Panics on Query transactions.
308	pub fn track_row_change(&mut self, change: RowChange) {
309		match self {
310			Transaction::Command(txn) => txn.track_row_change(change),
311			Transaction::Admin(txn) => txn.track_row_change(change),
312			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
313			Transaction::Subscription(txn) => txn.track_row_change(change),
314		}
315	}
316
317	/// Track a flow change for transactional view pre-commit processing. Panics on Query transactions.
318	pub fn track_flow_change(&mut self, change: Change) {
319		match self {
320			Transaction::Command(txn) => txn.track_flow_change(change),
321			Transaction::Admin(txn) => txn.track_flow_change(change),
322			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
323			Transaction::Subscription(txn) => txn.track_flow_change(change),
324		}
325	}
326}