reifydb_transaction/multi/
mod.rs1use std::time::Duration;
5
6use reifydb_core::{
7 CommitVersion, EncodedKey, EncodedKeyRange, Error,
8 event::EventBus,
9 interface::{
10 BoxedMultiVersionIter, MultiVersionCommandTransaction, MultiVersionQueryTransaction,
11 MultiVersionTransaction, MultiVersionValues, TransactionId, WithEventBus,
12 },
13 value::encoded::EncodedValues,
14};
15use reifydb_store_transaction::TransactionStore;
16
17#[derive(Debug, Clone)]
19pub struct AwaitWatermarkError {
20 pub version: CommitVersion,
21 pub timeout: Duration,
22}
23
24impl std::fmt::Display for AwaitWatermarkError {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 write!(f, "Timeout waiting for watermark to reach version {} after {:?}", self.version.0, self.timeout)
27 }
28}
29
30impl std::error::Error for AwaitWatermarkError {}
31
32use crate::{
33 multi::{
34 pending::PendingWrites,
35 transaction::{
36 optimistic::{
37 CommandTransaction as OptimisticCommandTransaction,
38 QueryTransaction as OptimisticQueryTransaction, TransactionOptimistic,
39 },
40 serializable::{
41 CommandTransaction as SerializableCommandTransaction,
42 QueryTransaction as SerializableQueryTransaction, TransactionSerializable,
43 },
44 },
45 },
46 single::TransactionSingleVersion,
47};
48
49pub mod conflict;
50pub mod marker;
51pub mod optimistic;
52pub mod pending;
53pub mod serializable;
54pub mod transaction;
55pub mod types;
56pub mod watermark;
57
58#[repr(u8)]
59#[derive(Clone)]
60pub enum TransactionMultiVersion {
61 Optimistic(TransactionOptimistic) = 0,
62 Serializable(TransactionSerializable) = 1,
63}
64
65impl TransactionMultiVersion {
66 pub fn optimistic(store: TransactionStore, single: TransactionSingleVersion, bus: EventBus) -> Self {
67 Self::Optimistic(TransactionOptimistic::new(store, single, bus))
68 }
69
70 pub fn serializable(store: TransactionStore, single: TransactionSingleVersion, bus: EventBus) -> Self {
71 Self::Serializable(TransactionSerializable::new(store, single, bus))
72 }
73
74 pub fn try_wait_for_watermark(
78 &self,
79 version: CommitVersion,
80 timeout: Duration,
81 ) -> Result<(), AwaitWatermarkError> {
82 match self {
83 Self::Optimistic(t) => t.tm.try_wait_for_watermark(version, timeout),
84 Self::Serializable(t) => t.tm.try_wait_for_watermark(version, timeout),
85 }
86 }
87
88 pub fn current_version(&self) -> crate::Result<CommitVersion> {
90 match self {
91 Self::Optimistic(t) => t.tm.version(),
92 Self::Serializable(t) => t.tm.version(),
93 }
94 }
95
96 pub fn done_until(&self) -> CommitVersion {
100 match self {
101 Self::Optimistic(t) => t.tm.done_until(),
102 Self::Serializable(t) => t.tm.done_until(),
103 }
104 }
105
106 pub fn watermarks(&self) -> (CommitVersion, CommitVersion) {
108 match self {
109 Self::Optimistic(t) => t.tm.watermarks(),
110 Self::Serializable(t) => t.tm.watermarks(),
111 }
112 }
113}
114
115pub enum StandardQueryTransaction {
116 Optimistic(OptimisticQueryTransaction),
117 Serializable(SerializableQueryTransaction),
118}
119
120pub enum StandardCommandTransaction {
121 Optimistic(OptimisticCommandTransaction),
122 Serializable(SerializableCommandTransaction),
123}
124
125impl WithEventBus for TransactionMultiVersion {
126 fn event_bus(&self) -> &EventBus {
127 match self {
128 TransactionMultiVersion::Optimistic(t) => t.event_bus(),
129 TransactionMultiVersion::Serializable(t) => t.event_bus(),
130 }
131 }
132}
133
134impl MultiVersionQueryTransaction for StandardQueryTransaction {
135 fn version(&self) -> CommitVersion {
136 match self {
137 StandardQueryTransaction::Optimistic(q) => q.version(),
138 StandardQueryTransaction::Serializable(q) => q.version(),
139 }
140 }
141
142 fn id(&self) -> TransactionId {
143 match self {
144 StandardQueryTransaction::Optimistic(q) => q.tm.id(),
145 StandardQueryTransaction::Serializable(q) => q.tm.id(),
146 }
147 }
148
149 fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>, Error> {
150 match self {
151 StandardQueryTransaction::Optimistic(q) => Ok(q.get(key)?),
152 StandardQueryTransaction::Serializable(q) => Ok(q.get(key)?),
153 }
154 }
155
156 fn contains_key(&mut self, key: &EncodedKey) -> Result<bool, Error> {
157 match self {
158 StandardQueryTransaction::Optimistic(q) => q.contains_key(key),
159 StandardQueryTransaction::Serializable(q) => q.contains_key(key),
160 }
161 }
162
163 fn range_batched(
164 &mut self,
165 range: EncodedKeyRange,
166 batch_size: u64,
167 ) -> Result<BoxedMultiVersionIter<'_>, Error> {
168 match self {
169 StandardQueryTransaction::Optimistic(q) => {
170 let iter = q.range_batched(range, batch_size)?;
171 Ok(Box::new(iter.into_iter()))
172 }
173 StandardQueryTransaction::Serializable(q) => {
174 let iter = q.range_batched(range, batch_size)?;
175 Ok(Box::new(iter.into_iter()))
176 }
177 }
178 }
179
180 fn range_rev_batched(
181 &mut self,
182 range: EncodedKeyRange,
183 batch_size: u64,
184 ) -> Result<BoxedMultiVersionIter<'_>, Error> {
185 match self {
186 StandardQueryTransaction::Optimistic(q) => {
187 let iter = q.range_rev_batched(range, batch_size)?;
188 Ok(Box::new(iter.into_iter()))
189 }
190 StandardQueryTransaction::Serializable(q) => {
191 let iter = q.range_rev_batched(range, batch_size)?;
192 Ok(Box::new(iter.into_iter()))
193 }
194 }
195 }
196
197 fn prefix(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter<'_>, Error> {
198 match self {
199 StandardQueryTransaction::Optimistic(q) => {
200 let iter = q.prefix(prefix)?;
201 Ok(Box::new(iter.into_iter()))
202 }
203 StandardQueryTransaction::Serializable(q) => {
204 let iter = q.prefix(prefix)?;
205 Ok(Box::new(iter.into_iter()))
206 }
207 }
208 }
209
210 fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter<'_>, Error> {
211 match self {
212 StandardQueryTransaction::Optimistic(q) => {
213 let iter = q.prefix_rev(prefix)?;
214 Ok(Box::new(iter.into_iter()))
215 }
216 StandardQueryTransaction::Serializable(q) => {
217 let iter = q.prefix_rev(prefix)?;
218 Ok(Box::new(iter.into_iter()))
219 }
220 }
221 }
222
223 fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<(), Error> {
224 match self {
225 StandardQueryTransaction::Optimistic(q) => {
226 q.read_as_of_version_exclusive(version);
227 Ok(())
228 }
229 StandardQueryTransaction::Serializable(q) => {
230 q.read_as_of_version_exclusive(version);
231 Ok(())
232 }
233 }
234 }
235}
236
237impl MultiVersionCommandTransaction for StandardCommandTransaction {
238 fn set(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<(), Error> {
239 match self {
240 StandardCommandTransaction::Optimistic(c) => c.set(key, values),
241 StandardCommandTransaction::Serializable(c) => c.set(key, values),
242 }
243 }
244
245 fn remove(&mut self, key: &EncodedKey) -> Result<(), Error> {
246 match self {
247 StandardCommandTransaction::Optimistic(c) => c.remove(key),
248 StandardCommandTransaction::Serializable(c) => c.remove(key),
249 }
250 }
251
252 fn commit(self) -> Result<CommitVersion, Error> {
253 match self {
254 StandardCommandTransaction::Optimistic(c) => c.commit(),
255 StandardCommandTransaction::Serializable(c) => c.commit(),
256 }
257 }
258
259 fn rollback(self) -> Result<(), Error> {
260 Ok(())
262 }
263}
264
265impl MultiVersionQueryTransaction for StandardCommandTransaction {
266 fn version(&self) -> CommitVersion {
267 match self {
268 StandardCommandTransaction::Optimistic(c) => c.tm.version(),
269 StandardCommandTransaction::Serializable(c) => c.tm.version(),
270 }
271 }
272
273 fn id(&self) -> TransactionId {
274 match self {
275 StandardCommandTransaction::Optimistic(c) => c.tm.id(),
276 StandardCommandTransaction::Serializable(c) => c.tm.id(),
277 }
278 }
279
280 fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>, Error> {
281 match self {
282 StandardCommandTransaction::Optimistic(c) => {
283 Ok(c.get(key)?.map(|tv| tv.into_multi_version_values()))
284 }
285 StandardCommandTransaction::Serializable(c) => {
286 Ok(c.get(key)?.map(|tv| tv.into_multi_version_values()))
287 }
288 }
289 }
290
291 fn contains_key(&mut self, key: &EncodedKey) -> Result<bool, Error> {
292 match self {
293 StandardCommandTransaction::Optimistic(c) => c.contains_key(key),
294 StandardCommandTransaction::Serializable(c) => c.contains_key(key),
295 }
296 }
297
298 fn range_batched(
299 &mut self,
300 range: EncodedKeyRange,
301 batch_size: u64,
302 ) -> Result<BoxedMultiVersionIter<'_>, Error> {
303 match self {
304 StandardCommandTransaction::Optimistic(c) => {
305 let iter = c.range_batched(range, batch_size)?;
306 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
307 }
308 StandardCommandTransaction::Serializable(c) => {
309 let iter = c.range_batched(range, batch_size)?;
310 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
311 }
312 }
313 }
314
315 fn range_rev_batched(
316 &mut self,
317 range: EncodedKeyRange,
318 batch_size: u64,
319 ) -> Result<BoxedMultiVersionIter<'_>, Error> {
320 match self {
321 StandardCommandTransaction::Optimistic(c) => {
322 let iter = c.range_rev_batched(range, batch_size)?;
323 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
324 }
325 StandardCommandTransaction::Serializable(c) => {
326 let iter = c.range_rev_batched(range, batch_size)?;
327 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
328 }
329 }
330 }
331
332 fn prefix(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter<'_>, Error> {
333 match self {
334 StandardCommandTransaction::Optimistic(c) => {
335 let iter = c.prefix(prefix)?;
336 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
337 }
338 StandardCommandTransaction::Serializable(c) => {
339 let iter = c.prefix(prefix)?;
340 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
341 }
342 }
343 }
344
345 fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<BoxedMultiVersionIter<'_>, Error> {
346 match self {
347 StandardCommandTransaction::Optimistic(c) => {
348 let iter = c.prefix_rev(prefix)?;
349 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
350 }
351 StandardCommandTransaction::Serializable(c) => {
352 let iter = c.prefix_rev(prefix)?;
353 Ok(Box::new(iter.into_iter().map(|tv| tv.into_multi_version_values())))
354 }
355 }
356 }
357
358 fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<(), Error> {
359 match self {
360 StandardCommandTransaction::Optimistic(c) => {
361 c.read_as_of_version_exclusive(version);
362 Ok(())
363 }
364 StandardCommandTransaction::Serializable(c) => {
365 c.read_as_of_version_exclusive(version);
366 Ok(())
367 }
368 }
369 }
370}
371
372impl StandardCommandTransaction {
373 pub fn pending_writes(&self) -> &PendingWrites {
375 match self {
376 StandardCommandTransaction::Optimistic(c) => c.pending_writes(),
377 StandardCommandTransaction::Serializable(c) => c.pending_writes(),
378 }
379 }
380}
381
382impl MultiVersionTransaction for TransactionMultiVersion {
383 type Query = StandardQueryTransaction;
384 type Command = StandardCommandTransaction;
385
386 fn begin_query(&self) -> Result<Self::Query, Error> {
387 match self {
388 TransactionMultiVersion::Optimistic(t) => {
389 Ok(StandardQueryTransaction::Optimistic(t.begin_query()?))
390 }
391 TransactionMultiVersion::Serializable(t) => {
392 Ok(StandardQueryTransaction::Serializable(t.begin_query()?))
393 }
394 }
395 }
396
397 fn begin_command(&self) -> Result<Self::Command, Error> {
398 match self {
399 TransactionMultiVersion::Optimistic(t) => {
400 Ok(StandardCommandTransaction::Optimistic(t.begin_command()?))
401 }
402 TransactionMultiVersion::Serializable(t) => {
403 Ok(StandardCommandTransaction::Serializable(t.begin_command()?))
404 }
405 }
406 }
407}