Skip to main content

reifydb_transaction/transaction/
query.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::CommitVersion,
6	encoded::key::{EncodedKey, EncodedKeyRange},
7	interface::{
8		catalog::{
9			dictionary::DictionaryDef,
10			flow::{FlowDef, FlowId},
11			handler::HandlerDef,
12			id::{
13				HandlerId, MigrationId, NamespaceId, ProcedureId, RingBufferId, SeriesId,
14				SubscriptionId, TableId, TestId, ViewId,
15			},
16			migration::MigrationDef,
17			namespace::Namespace,
18			policy::{PolicyDef, PolicyId},
19			procedure::ProcedureDef,
20			ringbuffer::RingBufferDef,
21			series::SeriesDef,
22			subscription::SubscriptionDef,
23			sumtype::SumTypeDef,
24			table::TableDef,
25			test::TestDef,
26			user::{RoleDef, RoleId, UserDef, UserId, UserRoleDef},
27			user_authentication::{UserAuthenticationDef, UserAuthenticationId},
28			view::ViewDef,
29		},
30		store::{MultiVersionBatch, MultiVersionValues},
31	},
32};
33use reifydb_type::{
34	Result,
35	value::{dictionary::DictionaryId, sumtype::SumTypeId},
36};
37use tracing::instrument;
38
39use crate::{
40	TransactionId,
41	change::{
42		TransactionalChanges, TransactionalDictionaryChanges, TransactionalFlowChanges,
43		TransactionalHandlerChanges, TransactionalMigrationChanges, TransactionalNamespaceChanges,
44		TransactionalPolicyChanges, TransactionalProcedureChanges, TransactionalRingBufferChanges,
45		TransactionalRoleChanges, TransactionalSeriesChanges, TransactionalSubscriptionChanges,
46		TransactionalSumTypeChanges, TransactionalTableChanges, TransactionalTestChanges,
47		TransactionalUserAuthenticationChanges, TransactionalUserChanges, TransactionalUserRoleChanges,
48		TransactionalViewChanges,
49	},
50	multi::transaction::read::MultiReadTransaction,
51	single::{SingleTransaction, read::SingleReadTransaction},
52};
53
54/// An active query transaction that holds a multi query transaction
55/// and provides query-only access to single storage.
56pub struct QueryTransaction {
57	pub(crate) multi: MultiReadTransaction,
58	pub(crate) single: SingleTransaction,
59}
60
61impl QueryTransaction {
62	/// Creates a new active query transaction
63	#[instrument(name = "transaction::query::new", level = "debug", skip_all)]
64	pub fn new(multi: MultiReadTransaction, single: SingleTransaction) -> Self {
65		Self {
66			multi,
67			single,
68		}
69	}
70
71	/// Get the transaction version
72	#[inline]
73	pub fn version(&self) -> CommitVersion {
74		self.multi.version()
75	}
76
77	/// Get the transaction ID
78	#[inline]
79	pub fn id(&self) -> TransactionId {
80		self.multi.tm.id()
81	}
82
83	/// Get a value by key
84	#[inline]
85	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionValues>> {
86		Ok(self.multi.get(key)?.map(|v| v.into_multi_version_values()))
87	}
88
89	/// Check if a key exists
90	#[inline]
91	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
92		self.multi.contains_key(key)
93	}
94
95	/// Get a prefix batch
96	#[inline]
97	pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
98		self.multi.prefix(prefix)
99	}
100
101	/// Get a reverse prefix batch
102	#[inline]
103	pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
104		self.multi.prefix_rev(prefix)
105	}
106
107	/// Read as of version exclusive
108	#[inline]
109	pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
110		self.multi.read_as_of_version_exclusive(version);
111		Ok(())
112	}
113
114	/// Create a streaming iterator for forward range queries.
115	#[inline]
116	pub fn range(
117		&self,
118		range: EncodedKeyRange,
119		batch_size: usize,
120	) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_> {
121		self.multi.range(range, batch_size)
122	}
123
124	/// Create a streaming iterator for reverse range queries.
125	#[inline]
126	pub fn range_rev(
127		&self,
128		range: EncodedKeyRange,
129		batch_size: usize,
130	) -> Box<dyn Iterator<Item = Result<MultiVersionValues>> + Send + '_> {
131		self.multi.range_rev(range, batch_size)
132	}
133
134	/// Execute a function with query access to the single transaction.
135	#[instrument(name = "transaction::query::with_single_query", level = "trace", skip(self, keys, f))]
136	pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
137	where
138		I: IntoIterator<Item = &'a EncodedKey> + Send,
139		F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
140		R: Send,
141	{
142		self.single.with_query(keys, f)
143	}
144
145	/// Execute a function with access to the multi query transaction.
146	/// This operates within the same transaction context.
147	#[instrument(name = "transaction::query::with_multi_query", level = "trace", skip(self, f))]
148	pub fn with_multi_query<F, R>(&mut self, f: F) -> Result<R>
149	where
150		F: FnOnce(&mut MultiReadTransaction) -> Result<R>,
151	{
152		f(&mut self.multi)
153	}
154
155	/// Begin a single-version query transaction for specific keys
156	#[instrument(name = "transaction::query::begin_single_query", level = "trace", skip(self, keys))]
157	pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
158	where
159		I: IntoIterator<Item = &'a EncodedKey>,
160	{
161		self.single.begin_query(keys)
162	}
163}
164
// No-op implementations of the `Transactional*Changes` traits for QueryTransaction.
// Query transactions don't track changes, so every `find_*` method returns `None`
// and every `is_*_deleted*` method returns `false`.
167
168impl TransactionalDictionaryChanges for QueryTransaction {
169	fn find_dictionary(&self, _id: DictionaryId) -> Option<&DictionaryDef> {
170		None
171	}
172
173	fn find_dictionary_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&DictionaryDef> {
174		None
175	}
176
177	fn is_dictionary_deleted(&self, _id: DictionaryId) -> bool {
178		false
179	}
180
181	fn is_dictionary_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
182		false
183	}
184}
185
186impl TransactionalFlowChanges for QueryTransaction {
187	fn find_flow(&self, _id: FlowId) -> Option<&FlowDef> {
188		None
189	}
190
191	fn find_flow_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&FlowDef> {
192		None
193	}
194
195	fn is_flow_deleted(&self, _id: FlowId) -> bool {
196		false
197	}
198
199	fn is_flow_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
200		false
201	}
202}
203
204impl TransactionalNamespaceChanges for QueryTransaction {
205	fn find_namespace(&self, _id: NamespaceId) -> Option<&Namespace> {
206		None
207	}
208
209	fn find_namespace_by_name(&self, _name: &str) -> Option<&Namespace> {
210		None
211	}
212
213	fn is_namespace_deleted(&self, _id: NamespaceId) -> bool {
214		false
215	}
216
217	fn is_namespace_deleted_by_name(&self, _name: &str) -> bool {
218		false
219	}
220}
221
222impl TransactionalProcedureChanges for QueryTransaction {
223	fn find_procedure(&self, _id: ProcedureId) -> Option<&ProcedureDef> {
224		None
225	}
226
227	fn find_procedure_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&ProcedureDef> {
228		None
229	}
230
231	fn is_procedure_deleted(&self, _id: ProcedureId) -> bool {
232		false
233	}
234
235	fn is_procedure_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
236		false
237	}
238}
239
240impl TransactionalTestChanges for QueryTransaction {
241	fn find_test(&self, _id: TestId) -> Option<&TestDef> {
242		None
243	}
244
245	fn find_test_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&TestDef> {
246		None
247	}
248
249	fn is_test_deleted(&self, _id: TestId) -> bool {
250		false
251	}
252
253	fn is_test_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
254		false
255	}
256}
257
258impl TransactionalRingBufferChanges for QueryTransaction {
259	fn find_ringbuffer(&self, _id: RingBufferId) -> Option<&RingBufferDef> {
260		None
261	}
262
263	fn find_ringbuffer_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&RingBufferDef> {
264		None
265	}
266
267	fn is_ringbuffer_deleted(&self, _id: RingBufferId) -> bool {
268		false
269	}
270
271	fn is_ringbuffer_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
272		false
273	}
274}
275
276impl TransactionalSeriesChanges for QueryTransaction {
277	fn find_series(&self, _id: SeriesId) -> Option<&SeriesDef> {
278		None
279	}
280
281	fn find_series_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&SeriesDef> {
282		None
283	}
284
285	fn is_series_deleted(&self, _id: SeriesId) -> bool {
286		false
287	}
288
289	fn is_series_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
290		false
291	}
292}
293
294impl TransactionalTableChanges for QueryTransaction {
295	fn find_table(&self, _id: TableId) -> Option<&TableDef> {
296		None
297	}
298
299	fn find_table_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&TableDef> {
300		None
301	}
302
303	fn is_table_deleted(&self, _id: TableId) -> bool {
304		false
305	}
306
307	fn is_table_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
308		false
309	}
310}
311
312impl TransactionalViewChanges for QueryTransaction {
313	fn find_view(&self, _id: ViewId) -> Option<&ViewDef> {
314		None
315	}
316
317	fn find_view_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&ViewDef> {
318		None
319	}
320
321	fn is_view_deleted(&self, _id: ViewId) -> bool {
322		false
323	}
324
325	fn is_view_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
326		false
327	}
328}
329
330impl TransactionalSumTypeChanges for QueryTransaction {
331	fn find_sumtype(&self, _id: SumTypeId) -> Option<&SumTypeDef> {
332		None
333	}
334
335	fn find_sumtype_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&SumTypeDef> {
336		None
337	}
338
339	fn is_sumtype_deleted(&self, _id: SumTypeId) -> bool {
340		false
341	}
342
343	fn is_sumtype_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
344		false
345	}
346}
347
348impl TransactionalSubscriptionChanges for QueryTransaction {
349	fn find_subscription(&self, _id: SubscriptionId) -> Option<&SubscriptionDef> {
350		None
351	}
352
353	fn is_subscription_deleted(&self, _id: SubscriptionId) -> bool {
354		false
355	}
356}
357
358impl TransactionalHandlerChanges for QueryTransaction {
359	fn find_handler_by_id(&self, _id: HandlerId) -> Option<&HandlerDef> {
360		None
361	}
362
363	fn find_handler_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&HandlerDef> {
364		None
365	}
366
367	fn is_handler_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
368		false
369	}
370}
371
372impl TransactionalUserChanges for QueryTransaction {
373	fn find_user(&self, _id: UserId) -> Option<&UserDef> {
374		None
375	}
376
377	fn find_user_by_name(&self, _name: &str) -> Option<&UserDef> {
378		None
379	}
380
381	fn is_user_deleted(&self, _id: UserId) -> bool {
382		false
383	}
384
385	fn is_user_deleted_by_name(&self, _name: &str) -> bool {
386		false
387	}
388}
389
390impl TransactionalRoleChanges for QueryTransaction {
391	fn find_role(&self, _id: RoleId) -> Option<&RoleDef> {
392		None
393	}
394
395	fn find_role_by_name(&self, _name: &str) -> Option<&RoleDef> {
396		None
397	}
398
399	fn is_role_deleted(&self, _id: RoleId) -> bool {
400		false
401	}
402
403	fn is_role_deleted_by_name(&self, _name: &str) -> bool {
404		false
405	}
406}
407
408impl TransactionalUserRoleChanges for QueryTransaction {
409	fn find_user_role(&self, _user: UserId, _role: RoleId) -> Option<&UserRoleDef> {
410		None
411	}
412
413	fn is_user_role_deleted(&self, _user: UserId, _role: RoleId) -> bool {
414		false
415	}
416}
417
418impl TransactionalPolicyChanges for QueryTransaction {
419	fn find_policy(&self, _id: PolicyId) -> Option<&PolicyDef> {
420		None
421	}
422
423	fn find_policy_by_name(&self, _name: &str) -> Option<&PolicyDef> {
424		None
425	}
426
427	fn is_policy_deleted(&self, _id: PolicyId) -> bool {
428		false
429	}
430
431	fn is_policy_deleted_by_name(&self, _name: &str) -> bool {
432		false
433	}
434}
435
436impl TransactionalMigrationChanges for QueryTransaction {
437	fn find_migration(&self, _id: MigrationId) -> Option<&MigrationDef> {
438		None
439	}
440
441	fn find_migration_by_name(&self, _name: &str) -> Option<&MigrationDef> {
442		None
443	}
444
445	fn is_migration_deleted(&self, _id: MigrationId) -> bool {
446		false
447	}
448
449	fn is_migration_deleted_by_name(&self, _name: &str) -> bool {
450		false
451	}
452}
453
454impl TransactionalUserAuthenticationChanges for QueryTransaction {
455	fn find_user_authentication(&self, _id: UserAuthenticationId) -> Option<&UserAuthenticationDef> {
456		None
457	}
458
459	fn find_user_authentication_by_user_and_method(
460		&self,
461		_user_id: UserId,
462		_method: &str,
463	) -> Option<&UserAuthenticationDef> {
464		None
465	}
466
467	fn is_user_authentication_deleted(&self, _id: UserAuthenticationId) -> bool {
468		false
469	}
470}
471
472impl TransactionalChanges for QueryTransaction {}