reifydb_transaction/multi/
mod.rs1use reifydb_core::{
5 CommitVersion, EncodedKey, EncodedKeyRange, Error,
6 event::EventBus,
7 interface::{
8 BoxedMultiVersionIter, MultiVersionCommandTransaction, MultiVersionQueryTransaction,
9 MultiVersionTransaction, MultiVersionValues, TransactionId, WithEventBus,
10 },
11 value::encoded::EncodedValues,
12};
13use reifydb_store_transaction::TransactionStore;
14
15use crate::{
16 multi::{
17 pending::PendingWrites,
18 transaction::{
19 optimistic::{
20 CommandTransaction as OptimisticCommandTransaction,
21 QueryTransaction as OptimisticQueryTransaction, TransactionOptimistic,
22 },
23 serializable::{
24 CommandTransaction as SerializableCommandTransaction,
25 QueryTransaction as SerializableQueryTransaction, TransactionSerializable,
26 },
27 },
28 },
29 single::TransactionSingleVersion,
30};
31
32pub mod conflict;
33pub mod marker;
34pub mod optimistic;
35pub mod pending;
36pub mod serializable;
37pub mod transaction;
38pub mod types;
39pub mod watermark;
40
41#[repr(u8)]
42#[derive(Clone)]
43pub enum TransactionMultiVersion {
44 Optimistic(TransactionOptimistic) = 0,
45 Serializable(TransactionSerializable) = 1,
46}
47
48impl TransactionMultiVersion {
49 pub fn optimistic(store: TransactionStore, single: TransactionSingleVersion, bus: EventBus) -> Self {
50 Self::Optimistic(TransactionOptimistic::new(store, single, bus))
51 }
52
53 pub fn serializable(store: TransactionStore, single: TransactionSingleVersion, bus: EventBus) -> Self {
54 Self::Serializable(TransactionSerializable::new(store, single, bus))
55 }
56}
57
58pub enum StandardQueryTransaction {
59 Optimistic(OptimisticQueryTransaction),
60 Serializable(SerializableQueryTransaction),
61}
62
63pub enum StandardCommandTransaction {
64 Optimistic(OptimisticCommandTransaction),
65 Serializable(SerializableCommandTransaction),
66}
67
68impl WithEventBus for TransactionMultiVersion {
69 fn event_bus(&self) -> &EventBus {
70 match self {
71 TransactionMultiVersion::Optimistic(t) => t.event_bus(),
72 TransactionMultiVersion::Serializable(t) => t.event_bus(),
73 }
74 }
75}
76
77impl MultiVersionQueryTransaction for StandardQueryTransaction {
78 fn version(&self) -> CommitVersion {
79 match self {
80 StandardQueryTransaction::Optimistic(q) => q.version(),
81 StandardQueryTransaction::Serializable(q) => q.version(),
82 }
83 }
84
85 fn id(&self) -> TransactionId {
86 match self {
87 StandardQueryTransaction::Optimistic(q) => q.tm.id(),
88 StandardQueryTransaction::Serializable(q) => q.tm.id(),
89 }
90 }
91
92 fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>, Error> {
93 match self {
94 StandardQueryTransaction::Optimistic(q) => Ok(q.get(key)?),
95 StandardQueryTransaction::Serializable(q) => Ok(q.get(key)?),
96 }
97 }
98
99 fn contains_key(&mut self, key: &EncodedKey) -> Result<bool, Error> {
100 match self {
101 StandardQueryTransaction::Optimistic(q) => q.contains_key(key),
102 StandardQueryTransaction::Serializable(q) => q.contains_key(key),
103 }
104 }
105
106 fn range_batched(&mut self, range: EncodedKeyRange, batch_size: u64) -> Result<BoxedMultiVersionIter, Error> {
107 match self {
108 StandardQueryTransaction::Optimistic(q) => {
109 let iter = q.range_batched(range, batch_size)?;
110 Ok(Box::new(iter.into_iter()))
111 }
112 StandardQueryTransaction::Serializable(q) => {
113 let iter = q.range_batched(range, batch_size)?;
114 Ok(Box::new(iter.into_iter()))
115 }
116 }
117 }
118
119 fn range_rev_batched(
120 &mut self,
121 range: EncodedKeyRange,
122 batch_size: u64,
123 ) -> Result<BoxedMultiVersionIter, Error> {
124 match self {
125 StandardQueryTransaction::Optimistic(q) => {
126 let iter = q.range_rev_batched(range, batch_size)?;
127 Ok(Box::new(iter.into_iter()))
128 }
129 StandardQueryTransaction::Serializable(q) => {
130 let iter = q.range_rev_batched(range, batch_size)?;
131 Ok(Box::new(iter.into_iter()))
132 }
133 }
134 }
135
136 fn prefix(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter, Error> {
137 match self {
138 StandardQueryTransaction::Optimistic(q) => {
139 let iter = q.prefix(prefix)?;
140 Ok(Box::new(iter.into_iter()))
141 }
142 StandardQueryTransaction::Serializable(q) => {
143 let iter = q.prefix(prefix)?;
144 Ok(Box::new(iter.into_iter()))
145 }
146 }
147 }
148
149 fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter, Error> {
150 match self {
151 StandardQueryTransaction::Optimistic(q) => {
152 let iter = q.prefix_rev(prefix)?;
153 Ok(Box::new(iter.into_iter()))
154 }
155 StandardQueryTransaction::Serializable(q) => {
156 let iter = q.prefix_rev(prefix)?;
157 Ok(Box::new(iter.into_iter()))
158 }
159 }
160 }
161
162 fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<(), Error> {
163 match self {
164 StandardQueryTransaction::Optimistic(q) => {
165 q.read_as_of_version_exclusive(version);
166 Ok(())
167 }
168 StandardQueryTransaction::Serializable(q) => {
169 q.read_as_of_version_exclusive(version);
170 Ok(())
171 }
172 }
173 }
174}
175
176impl MultiVersionCommandTransaction for StandardCommandTransaction {
177 fn set(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<(), Error> {
178 match self {
179 StandardCommandTransaction::Optimistic(c) => c.set(key, values),
180 StandardCommandTransaction::Serializable(c) => c.set(key, values),
181 }
182 }
183
184 fn remove(&mut self, key: &EncodedKey) -> Result<(), Error> {
185 match self {
186 StandardCommandTransaction::Optimistic(c) => c.remove(key),
187 StandardCommandTransaction::Serializable(c) => c.remove(key),
188 }
189 }
190
191 fn commit(self) -> Result<CommitVersion, Error> {
192 match self {
193 StandardCommandTransaction::Optimistic(c) => c.commit(),
194 StandardCommandTransaction::Serializable(c) => c.commit(),
195 }
196 }
197
198 fn rollback(self) -> Result<(), Error> {
199 Ok(())
201 }
202}
203
204impl MultiVersionQueryTransaction for StandardCommandTransaction {
205 fn version(&self) -> CommitVersion {
206 match self {
207 StandardCommandTransaction::Optimistic(c) => c.tm.version(),
208 StandardCommandTransaction::Serializable(c) => c.tm.version(),
209 }
210 }
211
212 fn id(&self) -> TransactionId {
213 match self {
214 StandardCommandTransaction::Optimistic(c) => c.tm.id(),
215 StandardCommandTransaction::Serializable(c) => c.tm.id(),
216 }
217 }
218
219 fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>, Error> {
220 match self {
221 StandardCommandTransaction::Optimistic(c) => {
222 Ok(c.get(key)?.map(|tv| tv.into_multi_version_values()))
223 }
224 StandardCommandTransaction::Serializable(c) => {
225 Ok(c.get(key)?.map(|tv| tv.into_multi_version_values()))
226 }
227 }
228 }
229
230 fn contains_key(&mut self, key: &EncodedKey) -> Result<bool, Error> {
231 match self {
232 StandardCommandTransaction::Optimistic(c) => c.contains_key(key),
233 StandardCommandTransaction::Serializable(c) => c.contains_key(key),
234 }
235 }
236
237 fn range_batched(&mut self, range: EncodedKeyRange, batch_size: u64) -> Result<BoxedMultiVersionIter, Error> {
238 match self {
239 StandardCommandTransaction::Optimistic(c) => {
240 let iter = c.range_batched(range, batch_size)?;
241 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
242 }
243 StandardCommandTransaction::Serializable(c) => {
244 let iter = c.range_batched(range, batch_size)?;
245 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
246 }
247 }
248 }
249
250 fn range_rev_batched(
251 &mut self,
252 range: EncodedKeyRange,
253 batch_size: u64,
254 ) -> Result<BoxedMultiVersionIter, Error> {
255 match self {
256 StandardCommandTransaction::Optimistic(c) => {
257 let iter = c.range_rev_batched(range, batch_size)?;
258 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
259 }
260 StandardCommandTransaction::Serializable(c) => {
261 let iter = c.range_rev_batched(range, batch_size)?;
262 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
263 }
264 }
265 }
266
267 fn prefix(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter, Error> {
268 match self {
269 StandardCommandTransaction::Optimistic(c) => {
270 let iter = c.prefix(prefix)?;
271 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
272 }
273 StandardCommandTransaction::Serializable(c) => {
274 let iter = c.prefix(prefix)?;
275 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
276 }
277 }
278 }
279
280 fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter, Error> {
281 match self {
282 StandardCommandTransaction::Optimistic(c) => {
283 let iter = c.prefix_rev(prefix)?;
284 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
285 }
286 StandardCommandTransaction::Serializable(c) => {
287 let iter = c.prefix_rev(prefix)?;
288 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
289 }
290 }
291 }
292
293 fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<(), Error> {
294 match self {
295 StandardCommandTransaction::Optimistic(c) => {
296 c.read_as_of_version_exclusive(version);
297 Ok(())
298 }
299 StandardCommandTransaction::Serializable(c) => {
300 c.read_as_of_version_exclusive(version);
301 Ok(())
302 }
303 }
304 }
305}
306
307impl StandardCommandTransaction {
308 pub fn pending_writes(&self) -> &PendingWrites {
310 match self {
311 StandardCommandTransaction::Optimistic(c) => c.pending_writes(),
312 StandardCommandTransaction::Serializable(c) => c.pending_writes(),
313 }
314 }
315}
316
317impl MultiVersionTransaction for TransactionMultiVersion {
318 type Query = StandardQueryTransaction;
319 type Command = StandardCommandTransaction;
320
321 fn begin_query(&self) -> Result<Self::Query, Error> {
322 match self {
323 TransactionMultiVersion::Optimistic(t) => {
324 Ok(StandardQueryTransaction::Optimistic(t.begin_query()?))
325 }
326 TransactionMultiVersion::Serializable(t) => {
327 Ok(StandardQueryTransaction::Serializable(t.begin_query()?))
328 }
329 }
330 }
331
332 fn begin_command(&self) -> Result<Self::Command, Error> {
333 match self {
334 TransactionMultiVersion::Optimistic(t) => {
335 Ok(StandardCommandTransaction::Optimistic(t.begin_command()?))
336 }
337 TransactionMultiVersion::Serializable(t) => {
338 Ok(StandardCommandTransaction::Serializable(t.begin_command()?))
339 }
340 }
341 }
342}