reifydb_transaction/transaction/
mod.rs1use reifydb_core::{
5 common::CommitVersion,
6 encoded::{
7 encoded::EncodedValues,
8 key::{EncodedKey, EncodedKeyRange},
9 },
10 interface::{
11 change::Change,
12 store::{MultiVersionBatch, MultiVersionValues},
13 },
14};
15use reifydb_type::Result;
16
17use crate::{
18 TransactionId,
19 change::RowChange,
20 single::{read::SingleReadTransaction, write::SingleWriteTransaction},
21 transaction::{
22 admin::AdminTransaction, command::CommandTransaction, query::QueryTransaction,
23 subscription::SubscriptionTransaction,
24 },
25};
26
27pub mod admin;
28pub mod catalog;
29pub mod command;
30pub mod query;
31pub mod subscription;
32
33pub enum Transaction<'a> {
36 Command(&'a mut CommandTransaction),
37 Admin(&'a mut AdminTransaction),
38 Query(&'a mut QueryTransaction),
39 Subscription(&'a mut SubscriptionTransaction),
40}
41
42impl<'a> Transaction<'a> {
43 pub fn version(&self) -> CommitVersion {
45 match self {
46 Self::Command(txn) => txn.version(),
47 Self::Admin(txn) => txn.version(),
48 Self::Query(txn) => txn.version(),
49 Self::Subscription(txn) => txn.version(),
50 }
51 }
52
53 pub fn id(&self) -> TransactionId {
55 match self {
56 Self::Command(txn) => txn.id(),
57 Self::Admin(txn) => txn.id(),
58 Self::Query(txn) => txn.id(),
59 Self::Subscription(txn) => txn.id(),
60 }
61 }
62
63 pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>> {
65 match self {
66 Self::Command(txn) => txn.get(key),
67 Self::Admin(txn) => txn.get(key),
68 Self::Query(txn) => txn.get(key),
69 Self::Subscription(txn) => txn.get(key),
70 }
71 }
72
73 pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
75 match self {
76 Self::Command(txn) => txn.contains_key(key),
77 Self::Admin(txn) => txn.contains_key(key),
78 Self::Query(txn) => txn.contains_key(key),
79 Self::Subscription(txn) => txn.contains_key(key),
80 }
81 }
82
83 pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
85 match self {
86 Self::Command(txn) => txn.prefix(prefix),
87 Self::Admin(txn) => txn.prefix(prefix),
88 Self::Query(txn) => txn.prefix(prefix),
89 Self::Subscription(txn) => txn.prefix(prefix),
90 }
91 }
92
93 pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
95 match self {
96 Self::Command(txn) => txn.prefix_rev(prefix),
97 Self::Admin(txn) => txn.prefix_rev(prefix),
98 Self::Query(txn) => txn.prefix_rev(prefix),
99 Self::Subscription(txn) => txn.prefix_rev(prefix),
100 }
101 }
102
103 pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
105 match self {
106 Transaction::Command(txn) => txn.read_as_of_version_exclusive(version),
107 Transaction::Admin(txn) => txn.read_as_of_version_exclusive(version),
108 Transaction::Query(txn) => txn.read_as_of_version_exclusive(version),
109 Transaction::Subscription(txn) => txn.read_as_of_version_exclusive(version),
110 }
111 }
112
113 pub fn range(
115 &mut self,
116 range: EncodedKeyRange,
117 batch_size: usize,
118 ) -> Result<Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>> {
119 match self {
120 Transaction::Command(txn) => txn.range(range, batch_size),
121 Transaction::Admin(txn) => txn.range(range, batch_size),
122 Transaction::Query(txn) => Ok(txn.range(range, batch_size)),
123 Transaction::Subscription(txn) => txn.range(range, batch_size),
124 }
125 }
126
127 pub fn range_rev(
129 &mut self,
130 range: EncodedKeyRange,
131 batch_size: usize,
132 ) -> Result<Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_>> {
133 match self {
134 Transaction::Command(txn) => txn.range_rev(range, batch_size),
135 Transaction::Admin(txn) => txn.range_rev(range, batch_size),
136 Transaction::Query(txn) => Ok(txn.range_rev(range, batch_size)),
137 Transaction::Subscription(txn) => txn.range_rev(range, batch_size),
138 }
139 }
140}
141
142impl<'a> From<&'a mut CommandTransaction> for Transaction<'a> {
143 fn from(txn: &'a mut CommandTransaction) -> Self {
144 Self::Command(txn)
145 }
146}
147
148impl<'a> From<&'a mut AdminTransaction> for Transaction<'a> {
149 fn from(txn: &'a mut AdminTransaction) -> Self {
150 Self::Admin(txn)
151 }
152}
153
154impl<'a> From<&'a mut QueryTransaction> for Transaction<'a> {
155 fn from(txn: &'a mut QueryTransaction) -> Self {
156 Self::Query(txn)
157 }
158}
159
160impl<'a> From<&'a mut SubscriptionTransaction> for Transaction<'a> {
161 fn from(txn: &'a mut SubscriptionTransaction) -> Self {
162 Self::Subscription(txn)
163 }
164}
165
166impl<'a> Transaction<'a> {
167 pub fn reborrow(&mut self) -> Transaction<'_> {
170 match self {
171 Transaction::Command(cmd) => Transaction::Command(cmd),
172 Transaction::Admin(admin) => Transaction::Admin(admin),
173 Transaction::Query(qry) => Transaction::Query(qry),
174 Transaction::Subscription(sub) => Transaction::Subscription(sub),
175 }
176 }
177
178 pub fn command(self) -> &'a mut CommandTransaction {
181 match self {
182 Self::Command(txn) => txn,
183 _ => panic!("Expected Command transaction"),
184 }
185 }
186
187 pub fn admin(self) -> &'a mut AdminTransaction {
190 match self {
191 Self::Admin(txn) => txn,
192 _ => panic!("Expected Admin transaction"),
193 }
194 }
195
196 pub fn query(self) -> &'a mut QueryTransaction {
199 match self {
200 Self::Query(txn) => txn,
201 _ => panic!("Expected Query transaction"),
202 }
203 }
204
205 pub fn subscription(self) -> &'a mut SubscriptionTransaction {
208 match self {
209 Self::Subscription(txn) => txn,
210 _ => panic!("Expected Subscription transaction"),
211 }
212 }
213
214 pub fn command_mut(&mut self) -> &mut CommandTransaction {
217 match self {
218 Self::Command(txn) => txn,
219 _ => panic!("Expected Command transaction"),
220 }
221 }
222
223 pub fn admin_mut(&mut self) -> &mut AdminTransaction {
226 match self {
227 Self::Admin(txn) => txn,
228 _ => panic!("Expected Admin transaction"),
229 }
230 }
231
232 pub fn query_mut(&mut self) -> &mut QueryTransaction {
235 match self {
236 Self::Query(txn) => txn,
237 _ => panic!("Expected Query transaction"),
238 }
239 }
240
241 pub fn subscription_mut(&mut self) -> &mut SubscriptionTransaction {
244 match self {
245 Self::Subscription(txn) => txn,
246 _ => panic!("Expected Subscription transaction"),
247 }
248 }
249
250 pub fn begin_single_query<'b, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
252 where
253 I: IntoIterator<Item = &'b EncodedKey>,
254 {
255 match self {
256 Transaction::Command(txn) => txn.begin_single_query(keys),
257 Transaction::Admin(txn) => txn.begin_single_query(keys),
258 Transaction::Query(txn) => txn.begin_single_query(keys),
259 Transaction::Subscription(txn) => txn.begin_single_query(keys),
260 }
261 }
262
263 pub fn begin_single_command<'b, I>(&self, keys: I) -> Result<SingleWriteTransaction<'_>>
266 where
267 I: IntoIterator<Item = &'b EncodedKey>,
268 {
269 match self {
270 Transaction::Command(txn) => txn.begin_single_command(keys),
271 Transaction::Admin(txn) => txn.begin_single_command(keys),
272 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
273 Transaction::Subscription(txn) => txn.begin_single_command(keys),
274 }
275 }
276
277 pub fn set(&mut self, key: &EncodedKey, row: EncodedValues) -> Result<()> {
279 match self {
280 Transaction::Command(txn) => txn.set(key, row),
281 Transaction::Admin(txn) => txn.set(key, row),
282 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
283 Transaction::Subscription(txn) => txn.set(key, row),
284 }
285 }
286
287 pub fn unset(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<()> {
289 match self {
290 Transaction::Command(txn) => txn.unset(key, values),
291 Transaction::Admin(txn) => txn.unset(key, values),
292 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
293 Transaction::Subscription(txn) => txn.unset(key, values),
294 }
295 }
296
297 pub fn remove(&mut self, key: &EncodedKey) -> Result<()> {
299 match self {
300 Transaction::Command(txn) => txn.remove(key),
301 Transaction::Admin(txn) => txn.remove(key),
302 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
303 Transaction::Subscription(txn) => txn.remove(key),
304 }
305 }
306
307 pub fn track_row_change(&mut self, change: RowChange) {
309 match self {
310 Transaction::Command(txn) => txn.track_row_change(change),
311 Transaction::Admin(txn) => txn.track_row_change(change),
312 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
313 Transaction::Subscription(txn) => txn.track_row_change(change),
314 }
315 }
316
317 pub fn track_flow_change(&mut self, change: Change) {
319 match self {
320 Transaction::Command(txn) => txn.track_flow_change(change),
321 Transaction::Admin(txn) => txn.track_flow_change(change),
322 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
323 Transaction::Subscription(txn) => txn.track_flow_change(change),
324 }
325 }
326}