reifydb_engine/transaction/
mod.rs1use reifydb_catalog::MaterializedCatalog;
5use reifydb_core::{
6 CommitVersion, EncodedKey, EncodedKeyRange, TransactionId,
7 interface::{MultiVersionBatch, MultiVersionQueryTransaction, MultiVersionValues},
8};
9
10mod catalog;
11mod command;
12#[allow(dead_code)]
13pub(crate) mod operation;
14mod query;
15
16pub use command::StandardCommandTransaction;
17pub use query::StandardQueryTransaction;
18
19pub enum StandardTransaction<'a> {
22 Command(&'a mut StandardCommandTransaction),
23 Query(&'a mut StandardQueryTransaction),
24}
25
26impl<'a> StandardTransaction<'a> {
27 pub fn version(&self) -> CommitVersion {
29 match self {
30 Self::Command(txn) => MultiVersionQueryTransaction::version(*txn),
31 Self::Query(txn) => MultiVersionQueryTransaction::version(*txn),
32 }
33 }
34
35 pub fn id(&self) -> TransactionId {
37 match self {
38 Self::Command(txn) => txn.id(),
39 Self::Query(txn) => txn.id(),
40 }
41 }
42
43 pub async fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
45 match self {
46 Self::Command(txn) => txn.get(key).await,
47 Self::Query(txn) => txn.get(key).await,
48 }
49 }
50
51 pub async fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
53 match self {
54 Self::Command(txn) => txn.contains_key(key).await,
55 Self::Query(txn) => txn.contains_key(key).await,
56 }
57 }
58
59 pub async fn range_batch(
61 &mut self,
62 range: EncodedKeyRange,
63 batch_size: u64,
64 ) -> crate::Result<MultiVersionBatch> {
65 match self {
66 Self::Command(txn) => txn.range_batch(range, batch_size).await,
67 Self::Query(txn) => txn.range_batch(range, batch_size).await,
68 }
69 }
70
71 pub async fn range_rev_batch(
73 &mut self,
74 range: EncodedKeyRange,
75 batch_size: u64,
76 ) -> crate::Result<MultiVersionBatch> {
77 match self {
78 Self::Command(txn) => txn.range_rev_batch(range, batch_size).await,
79 Self::Query(txn) => txn.range_rev_batch(range, batch_size).await,
80 }
81 }
82
83 pub async fn prefix(&mut self, prefix: &EncodedKey) -> crate::Result<MultiVersionBatch> {
85 match self {
86 Self::Command(txn) => txn.prefix(prefix).await,
87 Self::Query(txn) => txn.prefix(prefix).await,
88 }
89 }
90
91 pub async fn prefix_rev(&mut self, prefix: &EncodedKey) -> crate::Result<MultiVersionBatch> {
93 match self {
94 Self::Command(txn) => txn.prefix_rev(prefix).await,
95 Self::Query(txn) => txn.prefix_rev(prefix).await,
96 }
97 }
98
99 pub async fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
101 match self {
102 StandardTransaction::Command(txn) => txn.read_as_of_version_exclusive(version).await,
103 StandardTransaction::Query(txn) => txn.read_as_of_version_exclusive(version).await,
104 }
105 }
106}
107
108impl<'a> From<&'a mut StandardCommandTransaction> for StandardTransaction<'a> {
109 fn from(txn: &'a mut StandardCommandTransaction) -> Self {
110 Self::Command(txn)
111 }
112}
113
114impl<'a> From<&'a mut StandardQueryTransaction> for StandardTransaction<'a> {
115 fn from(txn: &'a mut StandardQueryTransaction) -> Self {
116 Self::Query(txn)
117 }
118}
119
120impl<'a> StandardTransaction<'a> {
121 pub fn command(self) -> &'a mut StandardCommandTransaction {
124 match self {
125 Self::Command(txn) => txn,
126 Self::Query(_) => panic!("Expected Command transaction but found Query transaction"),
127 }
128 }
129
130 pub fn query(self) -> &'a mut StandardQueryTransaction {
133 match self {
134 Self::Query(txn) => txn,
135 Self::Command(_) => panic!("Expected Query transaction but found Command transaction"),
136 }
137 }
138
139 pub fn command_mut(&mut self) -> &mut StandardCommandTransaction {
142 match self {
143 Self::Command(txn) => txn,
144 Self::Query(_) => panic!("Expected Command transaction but found Query transaction"),
145 }
146 }
147
148 pub fn query_mut(&mut self) -> &mut StandardQueryTransaction {
151 match self {
152 Self::Query(txn) => txn,
153 Self::Command(_) => panic!("Expected Query transaction but found Command transaction"),
154 }
155 }
156
157 pub fn catalog(&self) -> &MaterializedCatalog {
158 match self {
159 StandardTransaction::Command(txn) => &txn.catalog,
160 StandardTransaction::Query(txn) => &txn.catalog,
161 }
162 }
163}
164
165use async_trait::async_trait;
166use reifydb_core::interface::QueryTransaction;
167
168#[async_trait]
171impl<'a> MultiVersionQueryTransaction for StandardTransaction<'a> {
172 fn version(&self) -> CommitVersion {
173 match self {
174 Self::Command(txn) => MultiVersionQueryTransaction::version(*txn),
175 Self::Query(txn) => MultiVersionQueryTransaction::version(*txn),
176 }
177 }
178
179 fn id(&self) -> TransactionId {
180 match self {
181 Self::Command(txn) => txn.id(),
182 Self::Query(txn) => txn.id(),
183 }
184 }
185
186 async fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
187 match self {
188 Self::Command(txn) => txn.get(key).await,
189 Self::Query(txn) => txn.get(key).await,
190 }
191 }
192
193 async fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
194 match self {
195 Self::Command(txn) => txn.contains_key(key).await,
196 Self::Query(txn) => txn.contains_key(key).await,
197 }
198 }
199
200 async fn range_batch(&mut self, range: EncodedKeyRange, batch_size: u64) -> crate::Result<MultiVersionBatch> {
201 match self {
202 Self::Command(txn) => txn.range_batch(range, batch_size).await,
203 Self::Query(txn) => txn.range_batch(range, batch_size).await,
204 }
205 }
206
207 async fn range_rev_batch(
208 &mut self,
209 range: EncodedKeyRange,
210 batch_size: u64,
211 ) -> crate::Result<MultiVersionBatch> {
212 match self {
213 Self::Command(txn) => txn.range_rev_batch(range, batch_size).await,
214 Self::Query(txn) => txn.range_rev_batch(range, batch_size).await,
215 }
216 }
217
218 async fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
219 match self {
220 StandardTransaction::Command(txn) => txn.read_as_of_version_exclusive(version).await,
221 StandardTransaction::Query(txn) => txn.read_as_of_version_exclusive(version).await,
222 }
223 }
224}
225
226#[async_trait]
227impl<'a> QueryTransaction for StandardTransaction<'a> {
228 type SingleVersionQuery<'b>
229 = <StandardQueryTransaction as QueryTransaction>::SingleVersionQuery<'b>
230 where
231 Self: 'b;
232 type CdcQuery<'b>
233 = <StandardQueryTransaction as QueryTransaction>::CdcQuery<'b>
234 where
235 Self: 'b;
236
237 async fn begin_single_query<'c, I>(&self, keys: I) -> crate::Result<Self::SingleVersionQuery<'_>>
238 where
239 I: IntoIterator<Item = &'c EncodedKey> + Send,
240 {
241 match self {
242 StandardTransaction::Command(txn) => txn.begin_single_query(keys).await,
243 StandardTransaction::Query(txn) => txn.begin_single_query(keys).await,
244 }
245 }
246
247 async fn begin_cdc_query(&self) -> crate::Result<Self::CdcQuery<'_>> {
248 match self {
249 StandardTransaction::Command(txn) => txn.begin_cdc_query().await,
250 StandardTransaction::Query(txn) => txn.begin_cdc_query().await,
251 }
252 }
253}