reifydb_transaction/multi/transaction/serializable/
mod.rs1use std::{ops::Deref, sync::Arc};
13
14use TransactionSingleVersion::SingleVersionLock;
15pub use command::*;
16pub use query::*;
17use reifydb_core::{CommitVersion, EncodedKey, EncodedKeyRange, event::EventBus};
18use reifydb_store_transaction::{
19 MultiVersionContains, MultiVersionGet, MultiVersionRange, MultiVersionRangeRev, TransactionStore,
20};
21use reifydb_type::util::hex;
22use tracing::instrument;
23
24use crate::multi::transaction::version::StandardVersionProvider;
25
26#[allow(clippy::module_inception)]
27mod command;
28pub(crate) mod query;
29
30use crate::{
31 multi::transaction::{Committed, TransactionManager},
32 single::{TransactionSingleVersion, TransactionSvl},
33};
34
35pub struct TransactionSerializable(Arc<Inner>);
36
37pub struct Inner {
38 pub(crate) tm: TransactionManager<StandardVersionProvider>,
39 pub(crate) store: TransactionStore,
40 pub(crate) event_bus: EventBus,
41}
42
43impl Deref for TransactionSerializable {
44 type Target = Inner;
45
46 fn deref(&self) -> &Self::Target {
47 &self.0
48 }
49}
50
51impl Clone for TransactionSerializable {
52 fn clone(&self) -> Self {
53 Self(self.0.clone())
54 }
55}
56
57impl Inner {
58 fn new(store: TransactionStore, single: TransactionSingleVersion, event_bus: EventBus) -> Self {
59 let tm = TransactionManager::new(StandardVersionProvider::new(single).unwrap()).unwrap();
60
61 Self {
62 tm,
63 store,
64 event_bus,
65 }
66 }
67
68 fn version(&self) -> crate::Result<CommitVersion> {
69 self.tm.version()
70 }
71}
72
73impl TransactionSerializable {
74 pub fn testing() -> Self {
75 let store = TransactionStore::testing_memory();
76 let event_bus = EventBus::new();
77 Self::new(store.clone(), SingleVersionLock(TransactionSvl::new(store, event_bus.clone())), event_bus)
78 }
79}
80
81impl TransactionSerializable {
82 #[instrument(level = "debug", skip(store, single, event_bus))]
83 pub fn new(store: TransactionStore, single: TransactionSingleVersion, event_bus: EventBus) -> Self {
84 Self(Arc::new(Inner::new(store, single, event_bus)))
85 }
86}
87
88impl TransactionSerializable {
89 #[instrument(level = "trace", skip(self))]
90 pub fn version(&self) -> crate::Result<CommitVersion> {
91 self.0.version()
92 }
93
94 #[instrument(level = "debug", skip(self))]
95 pub fn begin_query(&self) -> crate::Result<QueryTransaction> {
96 QueryTransaction::new(self.clone(), None)
97 }
98}
99
100impl TransactionSerializable {
101 #[instrument(level = "debug", skip(self))]
102 pub fn begin_command(&self) -> crate::Result<CommandTransaction> {
103 CommandTransaction::new(self.clone())
104 }
105}
106
107pub enum Transaction {
108 Query(QueryTransaction),
109 Command(CommandTransaction),
110}
111
112impl TransactionSerializable {
113 #[instrument(level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
114 pub fn get(&self, key: &EncodedKey, version: CommitVersion) -> Result<Option<Committed>, reifydb_type::Error> {
115 Ok(self.store.get(key, version)?.map(|sv| sv.into()))
116 }
117
118 #[instrument(level = "trace", skip(self), fields(key_hex = %hex::encode(key.as_ref()), version = version.0))]
119 pub fn contains_key(&self, key: &EncodedKey, version: CommitVersion) -> Result<bool, reifydb_type::Error> {
120 self.store.contains(key, version)
121 }
122
123 #[instrument(level = "trace", skip(self), fields(version = version.0, batch_size = batch_size))]
124 pub fn range_batched(
125 &self,
126 range: EncodedKeyRange,
127 version: CommitVersion,
128 batch_size: u64,
129 ) -> reifydb_type::Result<<TransactionStore as MultiVersionRange>::RangeIter<'_>> {
130 self.store.range_batched(range, version, batch_size)
131 }
132
133 pub fn range(
134 &self,
135 range: EncodedKeyRange,
136 version: CommitVersion,
137 ) -> reifydb_type::Result<<TransactionStore as MultiVersionRange>::RangeIter<'_>> {
138 self.range_batched(range, version, 1024)
139 }
140
141 pub fn range_rev_batched(
142 &self,
143 range: EncodedKeyRange,
144 version: CommitVersion,
145 batch_size: u64,
146 ) -> reifydb_type::Result<<TransactionStore as MultiVersionRangeRev>::RangeIterRev<'_>> {
147 self.store.range_rev_batched(range, version, batch_size)
148 }
149
150 pub fn range_rev(
151 &self,
152 range: EncodedKeyRange,
153 version: CommitVersion,
154 ) -> reifydb_type::Result<<TransactionStore as MultiVersionRangeRev>::RangeIterRev<'_>> {
155 self.range_rev_batched(range, version, 1024)
156 }
157}