reifydb_transaction/multi/
multi.rs1use async_trait::async_trait;
5use reifydb_core::{
6 CommitVersion, EncodedKey, EncodedKeyRange, Error,
7 event::EventBus,
8 interface::{
9 MultiVersionBatch, MultiVersionCommandTransaction, MultiVersionQueryTransaction,
10 MultiVersionTransaction, MultiVersionValues, TransactionId, WithEventBus,
11 },
12 value::encoded::EncodedValues,
13};
14
15use crate::multi::transaction::{CommandTransaction, QueryTransaction, TransactionMulti};
16
17impl WithEventBus for TransactionMulti {
18 fn event_bus(&self) -> &EventBus {
19 &self.event_bus
20 }
21}
22
23#[async_trait]
24impl MultiVersionTransaction for TransactionMulti {
25 type Query = QueryTransaction;
26 type Command = CommandTransaction;
27
28 async fn begin_query(&self) -> Result<Self::Query, Error> {
29 TransactionMulti::begin_query(self).await
30 }
31
32 async fn begin_command(&self) -> Result<Self::Command, Error> {
33 TransactionMulti::begin_command(self).await
34 }
35}
36
37#[async_trait]
38impl MultiVersionQueryTransaction for QueryTransaction {
39 fn version(&self) -> CommitVersion {
40 self.tm.version()
41 }
42
43 fn id(&self) -> TransactionId {
44 self.tm.id()
45 }
46
47 async fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>, Error> {
48 Ok(QueryTransaction::get(self, key).await?.map(|tv| MultiVersionValues {
49 key: tv.key().clone(),
50 values: tv.values().clone(),
51 version: tv.version(),
52 }))
53 }
54
55 async fn contains_key(&mut self, key: &EncodedKey) -> Result<bool, Error> {
56 QueryTransaction::contains_key(self, key).await
57 }
58
59 async fn range_batch(&mut self, range: EncodedKeyRange, batch_size: u64) -> Result<MultiVersionBatch, Error> {
60 let batch = QueryTransaction::range_batch(self, range, batch_size).await?;
61 Ok(MultiVersionBatch {
62 items: batch
63 .items
64 .into_iter()
65 .map(|mv| MultiVersionValues {
66 key: mv.key,
67 values: mv.values,
68 version: mv.version,
69 })
70 .collect(),
71 has_more: batch.has_more,
72 })
73 }
74
75 async fn range_rev_batch(
76 &mut self,
77 range: EncodedKeyRange,
78 batch_size: u64,
79 ) -> Result<MultiVersionBatch, Error> {
80 let batch = QueryTransaction::range_rev_batch(self, range, batch_size).await?;
81 Ok(MultiVersionBatch {
82 items: batch
83 .items
84 .into_iter()
85 .map(|mv| MultiVersionValues {
86 key: mv.key,
87 values: mv.values,
88 version: mv.version,
89 })
90 .collect(),
91 has_more: batch.has_more,
92 })
93 }
94
95 async fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<(), Error> {
96 QueryTransaction::read_as_of_version_exclusive(self, version);
97 Ok(())
98 }
99}
100
101#[async_trait]
102impl MultiVersionQueryTransaction for CommandTransaction {
103 fn version(&self) -> CommitVersion {
104 self.tm.version()
105 }
106
107 fn id(&self) -> TransactionId {
108 self.tm.id()
109 }
110
111 async fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>, Error> {
112 Ok(CommandTransaction::get(self, key).await?.map(|tv| MultiVersionValues {
113 key: tv.key().clone(),
114 values: tv.values().clone(),
115 version: tv.version(),
116 }))
117 }
118
119 async fn contains_key(&mut self, key: &EncodedKey) -> Result<bool, Error> {
120 CommandTransaction::contains_key(self, key).await
121 }
122
123 async fn range_batch(&mut self, range: EncodedKeyRange, batch_size: u64) -> Result<MultiVersionBatch, Error> {
124 let batch = CommandTransaction::range_batch(self, range, batch_size).await?;
125 Ok(MultiVersionBatch {
126 items: batch.items,
127 has_more: batch.has_more,
128 })
129 }
130
131 async fn range_rev_batch(
132 &mut self,
133 range: EncodedKeyRange,
134 batch_size: u64,
135 ) -> Result<MultiVersionBatch, Error> {
136 let batch = CommandTransaction::range_rev_batch(self, range, batch_size).await?;
137 Ok(MultiVersionBatch {
138 items: batch.items,
139 has_more: batch.has_more,
140 })
141 }
142
143 async fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<(), Error> {
144 CommandTransaction::read_as_of_version_exclusive(self, version);
145 Ok(())
146 }
147}
148
149#[async_trait]
150impl MultiVersionCommandTransaction for CommandTransaction {
151 async fn set(&mut self, key: &EncodedKey, values: EncodedValues) -> Result<(), Error> {
152 CommandTransaction::set(self, key, values)?;
153 Ok(())
154 }
155
156 async fn remove(&mut self, key: &EncodedKey) -> Result<(), Error> {
157 CommandTransaction::remove(self, key)?;
158 Ok(())
159 }
160
161 async fn commit(&mut self) -> Result<CommitVersion, Error> {
162 let version = CommandTransaction::commit(self).await?;
163 Ok(version)
164 }
165
166 async fn rollback(&mut self) -> Result<(), Error> {
167 CommandTransaction::rollback(self)?;
168 Ok(())
169 }
170}