reifydb_engine/transaction/
query.rs1use async_trait::async_trait;
5use reifydb_catalog::{MaterializedCatalog, transaction::MaterializedCatalogTransaction};
6use reifydb_core::{
7 CommitVersion, EncodedKey, EncodedKeyRange,
8 interface::{
9 CdcTransaction, MultiVersionBatch, MultiVersionQueryTransaction, MultiVersionTransaction,
10 MultiVersionValues, QueryTransaction, SingleVersionTransaction, TransactionId, TransactionalChanges,
11 },
12};
13use reifydb_transaction::{cdc::TransactionCdc, multi::TransactionMultiVersion, single::TransactionSingle};
14use tracing::instrument;
15
16pub struct StandardQueryTransaction {
19 pub(crate) multi: <TransactionMultiVersion as MultiVersionTransaction>::Query,
20 pub(crate) single: TransactionSingle,
21 pub(crate) cdc: TransactionCdc,
22 pub(crate) catalog: MaterializedCatalog,
23}
24
25impl StandardQueryTransaction {
26 #[instrument(name = "engine::transaction::query::new", level = "debug", skip_all)]
28 pub fn new(
29 multi: <TransactionMultiVersion as MultiVersionTransaction>::Query,
30 single: TransactionSingle,
31 cdc: TransactionCdc,
32 catalog: MaterializedCatalog,
33 ) -> Self {
34 Self {
35 multi,
36 single,
37 cdc,
38 catalog,
39 }
40 }
41
42 #[instrument(name = "engine::transaction::query::with_single_query", level = "trace", skip(self, keys, f))]
44 pub async fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> crate::Result<R>
45 where
46 I: IntoIterator<Item = &'a EncodedKey> + Send,
47 F: FnOnce(&mut <TransactionSingle as SingleVersionTransaction>::Query<'_>) -> crate::Result<R> + Send,
48 R: Send,
49 {
50 self.single.with_query(keys, f).await
51 }
52
53 #[instrument(name = "engine::transaction::query::with_multi_query", level = "trace", skip(self, f))]
56 pub fn with_multi_query<F, R>(&mut self, f: F) -> crate::Result<R>
57 where
58 F: FnOnce(&mut <TransactionMultiVersion as MultiVersionTransaction>::Query) -> crate::Result<R>,
59 {
60 f(&mut self.multi)
61 }
62
63 #[instrument(name = "engine::transaction::query::cdc", level = "trace", skip(self))]
65 pub fn cdc(&self) -> &TransactionCdc {
66 &self.cdc
67 }
68}
69
70#[async_trait]
71impl MultiVersionQueryTransaction for StandardQueryTransaction {
72 #[inline]
73 fn version(&self) -> CommitVersion {
74 MultiVersionQueryTransaction::version(&self.multi)
75 }
76
77 #[inline]
78 fn id(&self) -> TransactionId {
79 MultiVersionQueryTransaction::id(&self.multi)
80 }
81
82 #[inline]
83 async fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
84 MultiVersionQueryTransaction::get(&mut self.multi, key).await
85 }
86
87 #[inline]
88 async fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
89 MultiVersionQueryTransaction::contains_key(&mut self.multi, key).await
90 }
91
92 #[inline]
93 async fn range_batch(&mut self, range: EncodedKeyRange, batch_size: u64) -> crate::Result<MultiVersionBatch> {
94 MultiVersionQueryTransaction::range_batch(&mut self.multi, range, batch_size).await
95 }
96
97 #[inline]
98 async fn range_rev_batch(
99 &mut self,
100 range: EncodedKeyRange,
101 batch_size: u64,
102 ) -> crate::Result<MultiVersionBatch> {
103 MultiVersionQueryTransaction::range_rev_batch(&mut self.multi, range, batch_size).await
104 }
105
106 #[inline]
107 async fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
108 MultiVersionQueryTransaction::read_as_of_version_exclusive(&mut self.multi, version).await
109 }
110}
111
112#[async_trait]
113impl QueryTransaction for StandardQueryTransaction {
114 type SingleVersionQuery<'a> = <TransactionSingle as SingleVersionTransaction>::Query<'a>;
115 type CdcQuery<'a>
116 = <TransactionCdc as CdcTransaction>::Query<'a>
117 where
118 Self: 'a;
119
120 async fn begin_single_query<'a, I>(&self, keys: I) -> crate::Result<Self::SingleVersionQuery<'_>>
121 where
122 I: IntoIterator<Item = &'a EncodedKey> + Send,
123 {
124 self.single.begin_query(keys).await
125 }
126
127 async fn begin_cdc_query(&self) -> crate::Result<Self::CdcQuery<'_>> {
128 Ok(self.cdc.begin_query()?)
129 }
130}
131
132impl MaterializedCatalogTransaction for StandardQueryTransaction {
133 fn catalog(&self) -> &MaterializedCatalog {
134 &self.catalog
135 }
136}
137
138impl TransactionalChanges for StandardQueryTransaction {}