reifydb_engine/transaction/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_catalog::MaterializedCatalog;
5use reifydb_core::{
6	CommitVersion, EncodedKey, EncodedKeyRange, TransactionId,
7	interface::{MultiVersionBatch, MultiVersionQueryTransaction, MultiVersionValues},
8};
9
10mod catalog;
11mod command;
12#[allow(dead_code)]
13pub(crate) mod operation;
14mod query;
15
16pub use command::StandardCommandTransaction;
17pub use query::StandardQueryTransaction;
18
19/// An enum that can hold either a command or query transaction for flexible
20/// execution
21pub enum StandardTransaction<'a> {
22	Command(&'a mut StandardCommandTransaction),
23	Query(&'a mut StandardQueryTransaction),
24}
25
26impl<'a> StandardTransaction<'a> {
27	/// Get the transaction version
28	pub fn version(&self) -> CommitVersion {
29		match self {
30			Self::Command(txn) => MultiVersionQueryTransaction::version(*txn),
31			Self::Query(txn) => MultiVersionQueryTransaction::version(*txn),
32		}
33	}
34
35	/// Get the transaction ID
36	pub fn id(&self) -> TransactionId {
37		match self {
38			Self::Command(txn) => txn.id(),
39			Self::Query(txn) => txn.id(),
40		}
41	}
42
43	/// Get a value by key (async method)
44	pub async fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
45		match self {
46			Self::Command(txn) => txn.get(key).await,
47			Self::Query(txn) => txn.get(key).await,
48		}
49	}
50
51	/// Check if a key exists (async method)
52	pub async fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
53		match self {
54			Self::Command(txn) => txn.contains_key(key).await,
55			Self::Query(txn) => txn.contains_key(key).await,
56		}
57	}
58
59	/// Get a range batch (async method)
60	pub async fn range_batch(
61		&mut self,
62		range: EncodedKeyRange,
63		batch_size: u64,
64	) -> crate::Result<MultiVersionBatch> {
65		match self {
66			Self::Command(txn) => txn.range_batch(range, batch_size).await,
67			Self::Query(txn) => txn.range_batch(range, batch_size).await,
68		}
69	}
70
71	/// Get a reverse range batch (async method)
72	pub async fn range_rev_batch(
73		&mut self,
74		range: EncodedKeyRange,
75		batch_size: u64,
76	) -> crate::Result<MultiVersionBatch> {
77		match self {
78			Self::Command(txn) => txn.range_rev_batch(range, batch_size).await,
79			Self::Query(txn) => txn.range_rev_batch(range, batch_size).await,
80		}
81	}
82
83	/// Get a prefix batch (async method)
84	pub async fn prefix(&mut self, prefix: &EncodedKey) -> crate::Result<MultiVersionBatch> {
85		match self {
86			Self::Command(txn) => txn.prefix(prefix).await,
87			Self::Query(txn) => txn.prefix(prefix).await,
88		}
89	}
90
91	/// Get a reverse prefix batch (async method)
92	pub async fn prefix_rev(&mut self, prefix: &EncodedKey) -> crate::Result<MultiVersionBatch> {
93		match self {
94			Self::Command(txn) => txn.prefix_rev(prefix).await,
95			Self::Query(txn) => txn.prefix_rev(prefix).await,
96		}
97	}
98
99	/// Read as of version exclusive (async method)
100	pub async fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
101		match self {
102			StandardTransaction::Command(txn) => txn.read_as_of_version_exclusive(version).await,
103			StandardTransaction::Query(txn) => txn.read_as_of_version_exclusive(version).await,
104		}
105	}
106}
107
108impl<'a> From<&'a mut StandardCommandTransaction> for StandardTransaction<'a> {
109	fn from(txn: &'a mut StandardCommandTransaction) -> Self {
110		Self::Command(txn)
111	}
112}
113
114impl<'a> From<&'a mut StandardQueryTransaction> for StandardTransaction<'a> {
115	fn from(txn: &'a mut StandardQueryTransaction) -> Self {
116		Self::Query(txn)
117	}
118}
119
120impl<'a> StandardTransaction<'a> {
121	/// Extract the underlying StandardCommandTransaction, panics if this is
122	/// a Query transaction
123	pub fn command(self) -> &'a mut StandardCommandTransaction {
124		match self {
125			Self::Command(txn) => txn,
126			Self::Query(_) => panic!("Expected Command transaction but found Query transaction"),
127		}
128	}
129
130	/// Extract the underlying StandardQueryTransaction, panics if this is a
131	/// Command transaction
132	pub fn query(self) -> &'a mut StandardQueryTransaction {
133		match self {
134			Self::Query(txn) => txn,
135			Self::Command(_) => panic!("Expected Query transaction but found Command transaction"),
136		}
137	}
138
139	/// Get a mutable reference to the underlying
140	/// StandardCommandTransaction, panics if this is a Query transaction
141	pub fn command_mut(&mut self) -> &mut StandardCommandTransaction {
142		match self {
143			Self::Command(txn) => txn,
144			Self::Query(_) => panic!("Expected Command transaction but found Query transaction"),
145		}
146	}
147
148	/// Get a mutable reference to the underlying StandardQueryTransaction,
149	/// panics if this is a Command transaction
150	pub fn query_mut(&mut self) -> &mut StandardQueryTransaction {
151		match self {
152			Self::Query(txn) => txn,
153			Self::Command(_) => panic!("Expected Query transaction but found Command transaction"),
154		}
155	}
156
157	pub fn catalog(&self) -> &MaterializedCatalog {
158		match self {
159			StandardTransaction::Command(txn) => &txn.catalog,
160			StandardTransaction::Query(txn) => &txn.catalog,
161		}
162	}
163}
164
165use async_trait::async_trait;
166use reifydb_core::interface::QueryTransaction;
167
168// StandardTransaction already has MultiVersionQueryTransaction methods defined above,
169// but we need the trait implementation for trait bounds
170#[async_trait]
171impl<'a> MultiVersionQueryTransaction for StandardTransaction<'a> {
172	fn version(&self) -> CommitVersion {
173		match self {
174			Self::Command(txn) => MultiVersionQueryTransaction::version(*txn),
175			Self::Query(txn) => MultiVersionQueryTransaction::version(*txn),
176		}
177	}
178
179	fn id(&self) -> TransactionId {
180		match self {
181			Self::Command(txn) => txn.id(),
182			Self::Query(txn) => txn.id(),
183		}
184	}
185
186	async fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
187		match self {
188			Self::Command(txn) => txn.get(key).await,
189			Self::Query(txn) => txn.get(key).await,
190		}
191	}
192
193	async fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
194		match self {
195			Self::Command(txn) => txn.contains_key(key).await,
196			Self::Query(txn) => txn.contains_key(key).await,
197		}
198	}
199
200	async fn range_batch(&mut self, range: EncodedKeyRange, batch_size: u64) -> crate::Result<MultiVersionBatch> {
201		match self {
202			Self::Command(txn) => txn.range_batch(range, batch_size).await,
203			Self::Query(txn) => txn.range_batch(range, batch_size).await,
204		}
205	}
206
207	async fn range_rev_batch(
208		&mut self,
209		range: EncodedKeyRange,
210		batch_size: u64,
211	) -> crate::Result<MultiVersionBatch> {
212		match self {
213			Self::Command(txn) => txn.range_rev_batch(range, batch_size).await,
214			Self::Query(txn) => txn.range_rev_batch(range, batch_size).await,
215		}
216	}
217
218	async fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
219		match self {
220			StandardTransaction::Command(txn) => txn.read_as_of_version_exclusive(version).await,
221			StandardTransaction::Query(txn) => txn.read_as_of_version_exclusive(version).await,
222		}
223	}
224}
225
226#[async_trait]
227impl<'a> QueryTransaction for StandardTransaction<'a> {
228	type SingleVersionQuery<'b>
229		= <StandardQueryTransaction as QueryTransaction>::SingleVersionQuery<'b>
230	where
231		Self: 'b;
232	type CdcQuery<'b>
233		= <StandardQueryTransaction as QueryTransaction>::CdcQuery<'b>
234	where
235		Self: 'b;
236
237	async fn begin_single_query<'c, I>(&self, keys: I) -> crate::Result<Self::SingleVersionQuery<'_>>
238	where
239		I: IntoIterator<Item = &'c EncodedKey> + Send,
240	{
241		match self {
242			StandardTransaction::Command(txn) => txn.begin_single_query(keys).await,
243			StandardTransaction::Query(txn) => txn.begin_single_query(keys).await,
244		}
245	}
246
247	async fn begin_cdc_query(&self) -> crate::Result<Self::CdcQuery<'_>> {
248		match self {
249			StandardTransaction::Command(txn) => txn.begin_cdc_query().await,
250			StandardTransaction::Query(txn) => txn.begin_cdc_query().await,
251		}
252	}
253}