reifydb_transaction/multi/transaction/
mod.rs1use core::mem;
13use std::{ops::Deref, sync::Arc, time::Duration};
14
15pub use command::*;
16use oracle::*;
17use reifydb_core::{CommitVersion, EncodedKey, EncodedKeyRange, event::EventBus, interface::TransactionId};
18use reifydb_store_transaction::{
19 MultiVersionContains, MultiVersionGet, MultiVersionRange, MultiVersionRangeRev, TransactionStore,
20};
21use reifydb_type::util::hex;
22use tracing::instrument;
23use version::{StandardVersionProvider, VersionProvider};
24
25pub use crate::multi::types::*;
26use crate::single::{TransactionSingleVersion, TransactionSvl};
27
28mod command;
29mod command_tx;
30mod oracle;
31mod oracle_cleanup;
32pub mod query;
33mod query_tx;
34pub mod range;
35pub mod range_rev;
36mod version;
37
38pub use command_tx::CommandTransaction;
39pub use oracle::MAX_COMMITTED_TXNS;
40pub use query_tx::QueryTransaction;
41
42use crate::multi::{
43 AwaitWatermarkError, conflict::ConflictManager, pending::PendingWrites,
44 transaction::query::TransactionManagerQuery,
45};
46
47pub struct TransactionManager<L>
48where
49 L: VersionProvider,
50{
51 inner: Arc<Oracle<L>>,
52}
53
54impl<L> Clone for TransactionManager<L>
55where
56 L: VersionProvider,
57{
58 fn clone(&self) -> Self {
59 Self {
60 inner: self.inner.clone(),
61 }
62 }
63}
64
65impl<L> TransactionManager<L>
66where
67 L: VersionProvider,
68{
69 #[instrument(level = "debug", skip(self))]
70 pub fn write(&self) -> Result<TransactionManagerCommand<L>, reifydb_type::Error> {
71 Ok(TransactionManagerCommand {
72 id: TransactionId::generate(),
73 oracle: self.inner.clone(),
74 version: self.inner.version()?,
75 read_version: None,
76 size: 0,
77 count: 0,
78 conflicts: ConflictManager::new(),
79 pending_writes: PendingWrites::new(),
80 duplicates: Vec::new(),
81 discarded: false,
82 done_query: false,
83 })
84 }
85}
86
87impl<L> TransactionManager<L>
88where
89 L: VersionProvider,
90{
91 #[instrument(level = "debug", skip(clock))]
92 pub fn new(clock: L) -> crate::Result<Self> {
93 let version = clock.next()?;
94 Ok(Self {
95 inner: Arc::new({
96 let oracle = Oracle::new(clock);
97 oracle.query.done(version);
98 oracle.command.done(version);
99 oracle
100 }),
101 })
102 }
103
104 #[instrument(level = "trace", skip(self))]
105 pub fn version(&self) -> crate::Result<CommitVersion> {
106 self.inner.version()
107 }
108}
109
110impl<L> TransactionManager<L>
111where
112 L: VersionProvider,
113{
114 #[instrument(level = "trace", skip(self))]
115 pub fn discard_hint(&self) -> CommitVersion {
116 self.inner.discard_at_or_below()
117 }
118
119 #[instrument(level = "debug", skip(self), fields(as_of_version = ?version))]
120 pub fn query(&self, version: Option<CommitVersion>) -> crate::Result<TransactionManagerQuery<L>> {
121 Ok(if let Some(version) = version {
122 TransactionManagerQuery::new_time_travel(TransactionId::generate(), self.clone(), version)
123 } else {
124 TransactionManagerQuery::new_current(
125 TransactionId::generate(),
126 self.clone(),
127 self.inner.version()?,
128 )
129 })
130 }
131
132 #[instrument(level = "debug", skip(self))]
139 pub fn try_wait_for_watermark(
140 &self,
141 version: CommitVersion,
142 timeout: Duration,
143 ) -> Result<(), AwaitWatermarkError> {
144 if self.inner.command.wait_for_mark_timeout(version, timeout) {
145 Ok(())
146 } else {
147 Err(AwaitWatermarkError {
148 version,
149 timeout,
150 })
151 }
152 }
153
154 #[instrument(level = "trace", skip(self))]
158 pub fn done_until(&self) -> CommitVersion {
159 self.inner.command.done_until()
160 }
161
162 pub fn watermarks(&self) -> (CommitVersion, CommitVersion) {
164 (self.inner.query.done_until(), self.inner.command.done_until())
165 }
166}
167
168pub struct Transaction(Arc<Inner>);
173
174pub struct Inner {
175 pub(crate) tm: TransactionManager<StandardVersionProvider>,
176 pub(crate) store: TransactionStore,
177 pub(crate) event_bus: EventBus,
178}
179
180impl Deref for Transaction {
181 type Target = Inner;
182
183 fn deref(&self) -> &Self::Target {
184 &self.0
185 }
186}
187
188impl Clone for Transaction {
189 fn clone(&self) -> Self {
190 Self(self.0.clone())
191 }
192}
193
194impl Inner {
195 fn new(store: TransactionStore, single: TransactionSingleVersion, event_bus: EventBus) -> Self {
196 let tm = TransactionManager::new(StandardVersionProvider::new(single).unwrap()).unwrap();
197
198 Self {
199 tm,
200 store,
201 event_bus,
202 }
203 }
204
205 fn version(&self) -> crate::Result<CommitVersion> {
206 self.tm.version()
207 }
208}
209
210impl Transaction {
211 pub fn testing() -> Self {
212 let store = TransactionStore::testing_memory();
213 let event_bus = EventBus::new();
214 Self::new(
215 store.clone(),
216 TransactionSingleVersion::SingleVersionLock(TransactionSvl::new(store, event_bus.clone())),
217 event_bus,
218 )
219 }
220}
221
222impl Transaction {
223 #[instrument(level = "debug", skip(store, single, event_bus))]
224 pub fn new(store: TransactionStore, single: TransactionSingleVersion, event_bus: EventBus) -> Self {
225 Self(Arc::new(Inner::new(store, single, event_bus)))
226 }
227}
228
229impl Transaction {
230 #[instrument(level = "trace", skip(self))]
231 pub fn version(&self) -> crate::Result<CommitVersion> {
232 self.0.version()
233 }
234
235 #[instrument(level = "debug", skip(self))]
236 pub fn begin_query(&self) -> crate::Result<QueryTransaction> {
237 QueryTransaction::new(self.clone(), None)
238 }
239}
240
241impl Transaction {
242 #[instrument(level = "debug", skip(self))]
243 pub fn begin_command(&self) -> crate::Result<CommandTransaction> {
244 CommandTransaction::new(self.clone())
245 }
246}
247
248pub enum TransactionType {
249 Query(QueryTransaction),
250 Command(CommandTransaction),
251}
252
253impl Transaction {
254 #[instrument(level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
255 pub fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<Committed>, reifydb_type::Error> {
256 Ok(self.store.get(key, version)?.map(|sv| sv.into()))
257 }
258
259 #[instrument(level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
260 pub fn contains_key(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool, reifydb_type::Error> {
261 self.store.contains(key, version)
262 }
263
264 #[instrument(level = "trace", skip(self), fields(version = version.0, batch_size = batch_size))]
265 pub fn range_batched(
266 &self,
267 range: EncodedKeyRange,
268 version: CommitVersion,
269 batch_size: u64,
270 ) -> reifydb_type::Result<<TransactionStore as MultiVersionRange>::RangeIter<'_>> {
271 self.store.range_batched(range, version, batch_size)
272 }
273
274 pub fn range(
275 &self,
276 range: EncodedKeyRange,
277 version: CommitVersion,
278 ) -> reifydb_type::Result<<TransactionStore as MultiVersionRange>::RangeIter<'_>> {
279 self.range_batched(range, version, 1024)
280 }
281
282 pub fn range_rev_batched(
283 &self,
284 range: EncodedKeyRange,
285 version: CommitVersion,
286 batch_size: u64,
287 ) -> reifydb_type::Result<<TransactionStore as MultiVersionRangeRev>::RangeIterRev<'_>> {
288 self.store.range_rev_batched(range, version, batch_size)
289 }
290
291 pub fn range_rev(
292 &self,
293 range: EncodedKeyRange,
294 version: CommitVersion,
295 ) -> reifydb_type::Result<<TransactionStore as MultiVersionRangeRev>::RangeIterRev<'_>> {
296 self.range_rev_batched(range, version, 1024)
297 }
298}