reifydb_engine/transaction/
query.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::marker::PhantomData;
5
6use reifydb_catalog::{MaterializedCatalog, transaction::MaterializedCatalogTransaction};
7use reifydb_core::{
8	CommitVersion, EncodedKey, EncodedKeyRange,
9	interface::{
10		BoxedMultiVersionIter, CdcTransaction, MultiVersionQueryTransaction, MultiVersionTransaction,
11		MultiVersionValues, QueryTransaction, SingleVersionTransaction, TransactionId, TransactionalChanges,
12	},
13};
14use reifydb_transaction::{multi::TransactionMultiVersion, single::TransactionSingleVersion};
15use tracing::instrument;
16
17use crate::transaction::TransactionCdc;
18
19/// An active query transaction that holds a multi query transaction
20/// and provides query-only access to single storage.
21pub struct StandardQueryTransaction {
22	pub(crate) multi: <TransactionMultiVersion as MultiVersionTransaction>::Query,
23	pub(crate) single: TransactionSingleVersion,
24	pub(crate) cdc: TransactionCdc,
25	pub(crate) catalog: MaterializedCatalog,
26	// Marker to prevent Send and Sync
27	_not_send_sync: PhantomData<*const ()>,
28}
29
30impl StandardQueryTransaction {
31	/// Creates a new active query transaction
32	#[instrument(level = "debug", skip_all)]
33	pub fn new(
34		multi: <TransactionMultiVersion as MultiVersionTransaction>::Query,
35		single: TransactionSingleVersion,
36		cdc: TransactionCdc,
37		catalog: MaterializedCatalog,
38	) -> Self {
39		Self {
40			multi,
41			single,
42			cdc,
43			catalog,
44			_not_send_sync: PhantomData,
45		}
46	}
47
48	/// Execute a function with query access to the single transaction.
49	#[instrument(level = "trace", skip(self, keys, f))]
50	pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> crate::Result<R>
51	where
52		I: IntoIterator<Item = &'a EncodedKey>,
53		F: FnOnce(&mut <TransactionSingleVersion as SingleVersionTransaction>::Query<'_>) -> crate::Result<R>,
54	{
55		self.single.with_query(keys, f)
56	}
57
58	/// Execute a function with access to the multi query transaction.
59	/// This operates within the same transaction context.
60	#[instrument(level = "trace", skip(self, f))]
61	pub fn with_multi_query<F, R>(&mut self, f: F) -> crate::Result<R>
62	where
63		F: FnOnce(&mut <TransactionMultiVersion as MultiVersionTransaction>::Query) -> crate::Result<R>,
64	{
65		f(&mut self.multi)
66	}
67
68	/// Get access to the CDC transaction interface
69	#[instrument(level = "trace", skip(self))]
70	pub fn cdc(&self) -> &TransactionCdc {
71		&self.cdc
72	}
73}
74
75impl MultiVersionQueryTransaction for StandardQueryTransaction {
76	#[inline]
77	fn version(&self) -> CommitVersion {
78		MultiVersionQueryTransaction::version(&self.multi)
79	}
80
81	#[inline]
82	fn id(&self) -> TransactionId {
83		MultiVersionQueryTransaction::id(&self.multi)
84	}
85
86	#[inline]
87	fn get(&mut self, key: &EncodedKey) -> crate::Result<Option<MultiVersionValues>> {
88		MultiVersionQueryTransaction::get(&mut self.multi, key)
89	}
90
91	#[inline]
92	fn contains_key(&mut self, key: &EncodedKey) -> crate::Result<bool> {
93		MultiVersionQueryTransaction::contains_key(&mut self.multi, key)
94	}
95
96	#[inline]
97	fn range_batched(
98		&mut self,
99		range: EncodedKeyRange,
100		batch_size: u64,
101	) -> crate::Result<BoxedMultiVersionIter<'_>> {
102		MultiVersionQueryTransaction::range_batched(&mut self.multi, range, batch_size)
103	}
104
105	#[inline]
106	fn range_rev_batched(
107		&mut self,
108		range: EncodedKeyRange,
109		batch_size: u64,
110	) -> crate::Result<BoxedMultiVersionIter<'_>> {
111		MultiVersionQueryTransaction::range_rev_batched(&mut self.multi, range, batch_size)
112	}
113
114	#[inline]
115	fn prefix(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter<'_>> {
116		MultiVersionQueryTransaction::prefix(&mut self.multi, prefix)
117	}
118
119	#[inline]
120	fn prefix_rev(&mut self, prefix: &EncodedKey) -> crate::Result<BoxedMultiVersionIter<'_>> {
121		MultiVersionQueryTransaction::prefix_rev(&mut self.multi, prefix)
122	}
123
124	#[inline]
125	fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> crate::Result<()> {
126		MultiVersionQueryTransaction::read_as_of_version_exclusive(&mut self.multi, version)
127	}
128}
129
130impl QueryTransaction for StandardQueryTransaction {
131	type SingleVersionQuery<'a> = <TransactionSingleVersion as SingleVersionTransaction>::Query<'a>;
132	type CdcQuery<'a>
133		= <TransactionCdc as CdcTransaction>::Query<'a>
134	where
135		Self: 'a;
136
137	fn begin_single_query<'a, I>(&self, keys: I) -> crate::Result<Self::SingleVersionQuery<'_>>
138	where
139		I: IntoIterator<Item = &'a EncodedKey>,
140	{
141		self.single.begin_query(keys)
142	}
143
144	fn begin_cdc_query(&self) -> crate::Result<Self::CdcQuery<'_>> {
145		self.cdc.begin_query()
146	}
147}
148
149impl MaterializedCatalogTransaction for StandardQueryTransaction {
150	fn catalog(&self) -> &MaterializedCatalog {
151		&self.catalog
152	}
153}
154
155impl TransactionalChanges for StandardQueryTransaction {}