reifydb_engine/transaction/
query.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use async_trait::async_trait;
5use reifydb_catalog::{MaterializedCatalog, transaction::MaterializedCatalogTransaction};
6use reifydb_core::{
7	CommitVersion, EncodedKey, EncodedKeyRange,
8	interface::{
9		CdcTransaction, MultiVersionBatch, MultiVersionQueryTransaction, MultiVersionTransaction,
10		MultiVersionValues, QueryTransaction, SingleVersionTransaction, TransactionId, TransactionalChanges,
11	},
12};
13use reifydb_transaction::{cdc::TransactionCdc, multi::TransactionMultiVersion, single::TransactionSingle};
14use tracing::instrument;
15
16/// An active query transaction that holds a multi query transaction
17/// and provides query-only access to single storage.
18pub struct StandardQueryTransaction {
19	pub(crate) multi: <TransactionMultiVersion as MultiVersionTransaction>::Query,
20	pub(crate) single: TransactionSingle,
21	pub(crate) cdc: TransactionCdc,
22	pub(crate) catalog: MaterializedCatalog,
23}
24
25impl StandardQueryTransaction {
26	/// Creates a new active query transaction
27	#[instrument(name = "engine::transaction::query::new", level = "debug", skip_all)]
28	pub fn new(
29		multi: <TransactionMultiVersion as MultiVersionTransaction>::Query,
30		single: TransactionSingle,
31		cdc: TransactionCdc,
32		catalog: MaterializedCatalog,
33	) -> Self {
34		Self {
35			multi,
36			single,
37			cdc,
38			catalog,
39		}
40	}
41
42	/// Execute a function with query access to the single transaction.
43	#[instrument(name = "engine::transaction::query::with_single_query", level = "trace", skip(self, keys, f))]
44	pub async fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> crate::Result<R>
45	where
46		I: IntoIterator<Item = &'a EncodedKey> + Send,
47		F: FnOnce(&mut <TransactionSingle as SingleVersionTransaction>::Query<'_>) -> crate::Result<R> + Send,
48		R: Send,
49	{
50		self.single.with_query(keys, f).await
51	}
52
53	/// Execute a function with access to the multi query transaction.
54	/// This operates within the same transaction context.
55	#[instrument(name = "engine::transaction::query::with_multi_query", level = "trace", skip(self, f))]
56	pub fn with_multi_query<F, R>(&mut self, f: F) -> crate::Result<R>
57	where
58		F: FnOnce(&mut <TransactionMultiVersion as MultiVersionTransaction>::Query) -> crate::Result<R>,
59	{
60		f(&mut self.multi)
61	}
62
63	/// Get access to the CDC transaction interface
64	#[instrument(name = "engine::transaction::query::cdc", level = "trace", skip(self))]
65	pub fn cdc(&self) -> &TransactionCdc {
66		&self.cdc
67	}
68}
69
70#[async_trait]
71impl MultiVersionQueryTransaction for StandardQueryTransaction {
72	#[inline]
73	fn version(&self) -> CommitVersion {
74		MultiVersionQueryTransaction::version(&self.multi)
75	}
76
77	#[inline]
78	fn id(&self) -> TransactionId {
79		MultiVersionQueryTransaction::id(&self.multi)
80	}
81
82	#[inline]
83	async fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
84		MultiVersionQueryTransaction::get(&mut self.multi, key).await
85	}
86
87	#[inline]
88	async fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
89		MultiVersionQueryTransaction::contains_key(&mut self.multi, key).await
90	}
91
92	#[inline]
93	async fn range_batch(&mut self, range: EncodedKeyRange, batch_size: u64) -> crate::Result<MultiVersionBatch> {
94		MultiVersionQueryTransaction::range_batch(&mut self.multi, range, batch_size).await
95	}
96
97	#[inline]
98	async fn range_rev_batch(
99		&mut self,
100		range: EncodedKeyRange,
101		batch_size: u64,
102	) -> crate::Result<MultiVersionBatch> {
103		MultiVersionQueryTransaction::range_rev_batch(&mut self.multi, range, batch_size).await
104	}
105
106	#[inline]
107	async fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
108		MultiVersionQueryTransaction::read_as_of_version_exclusive(&mut self.multi, version).await
109	}
110}
111
112#[async_trait]
113impl QueryTransaction for StandardQueryTransaction {
114	type SingleVersionQuery<'a> = <TransactionSingle as SingleVersionTransaction>::Query<'a>;
115	type CdcQuery<'a>
116		= <TransactionCdc as CdcTransaction>::Query<'a>
117	where
118		Self: 'a;
119
120	async fn begin_single_query<'a, I>(&self, keys: I) -> crate::Result<Self::SingleVersionQuery<'_>>
121	where
122		I: IntoIterator<Item = &'a EncodedKey> + Send,
123	{
124		self.single.begin_query(keys).await
125	}
126
127	async fn begin_cdc_query(&self) -> crate::Result<Self::CdcQuery<'_>> {
128		Ok(self.cdc.begin_query()?)
129	}
130}
131
132impl MaterializedCatalogTransaction for StandardQueryTransaction {
133	fn catalog(&self) -> &MaterializedCatalog {
134		&self.catalog
135	}
136}
137
138impl TransactionalChanges for StandardQueryTransaction {}