Skip to main content

reifydb_transaction/transaction/
query.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	common::CommitVersion,
8	encoded::key::{EncodedKey, EncodedKeyRange},
9	execution::ExecutionResult,
10	interface::{
11		catalog::{
12			authentication::{Authentication, AuthenticationId},
13			binding::Binding,
14			config::{Config, ConfigKey},
15			dictionary::Dictionary,
16			flow::{Flow, FlowId},
17			handler::Handler,
18			id::{
19				BindingId, HandlerId, MigrationId, NamespaceId, ProcedureId, RingBufferId, SeriesId,
20				SinkId, SourceId, TableId, TestId, ViewId,
21			},
22			identity::{GrantedRole, Identity, Role, RoleId},
23			migration::Migration,
24			namespace::Namespace,
25			policy::{Policy, PolicyId},
26			procedure::Procedure,
27			ringbuffer::RingBuffer,
28			series::Series,
29			shape::ShapeId,
30			sink::Sink,
31			source::Source,
32			sumtype::SumType,
33			table::Table,
34			test::Test,
35			view::View,
36		},
37		store::{MultiVersionBatch, MultiVersionRow},
38	},
39	row::RowTtl,
40};
41use reifydb_type::{
42	Result,
43	params::Params,
44	value::{dictionary::DictionaryId, identity::IdentityId, sumtype::SumTypeId},
45};
46use tracing::instrument;
47
48use crate::{
49	TransactionId,
50	change::{
51		TransactionalAuthenticationChanges, TransactionalBindingChanges, TransactionalChanges,
52		TransactionalConfigChanges, TransactionalDictionaryChanges, TransactionalFlowChanges,
53		TransactionalGrantedRoleChanges, TransactionalHandlerChanges, TransactionalIdentityChanges,
54		TransactionalMigrationChanges, TransactionalNamespaceChanges, TransactionalPolicyChanges,
55		TransactionalProcedureChanges, TransactionalRingBufferChanges, TransactionalRoleChanges,
56		TransactionalRowTtlChanges, TransactionalSeriesChanges, TransactionalSinkChanges,
57		TransactionalSourceChanges, TransactionalSumTypeChanges, TransactionalTableChanges,
58		TransactionalTestChanges, TransactionalViewChanges,
59	},
60	multi::transaction::read::MultiReadTransaction,
61	single::{SingleTransaction, read::SingleReadTransaction},
62	transaction::{RqlExecutor, Transaction},
63};
64
65/// An active query transaction that holds a multi query transaction
66/// and provides query-only access to single storage.
67pub struct QueryTransaction {
68	pub(crate) multi: MultiReadTransaction,
69	pub(crate) single: SingleTransaction,
70
71	/// The identity executing this transaction.
72	pub identity: IdentityId,
73
74	/// Optional RQL executor for running RQL within this transaction.
75	pub(crate) executor: Option<Arc<dyn RqlExecutor>>,
76}
77
78impl QueryTransaction {
79	/// Creates a new active query transaction
80	#[instrument(name = "transaction::query::new", level = "debug", skip_all)]
81	pub fn new(multi: MultiReadTransaction, single: SingleTransaction, identity: IdentityId) -> Self {
82		Self {
83			multi,
84			single,
85			identity,
86			executor: None,
87		}
88	}
89
90	/// Set the RQL executor for this transaction.
91	pub fn set_executor(&mut self, executor: Arc<dyn RqlExecutor>) {
92		self.executor = Some(executor);
93	}
94
95	/// Execute RQL within this transaction using the attached executor.
96	///
97	/// Panics if no `RqlExecutor` has been set on this transaction.
98	pub fn rql(&mut self, rql: &str, params: Params) -> ExecutionResult {
99		let executor = self.executor.clone().expect("RqlExecutor not set");
100		executor.rql(&mut Transaction::Query(self), rql, params)
101	}
102
103	/// Get the transaction version
104	#[inline]
105	pub fn version(&self) -> CommitVersion {
106		self.multi.version()
107	}
108
109	/// Get the transaction ID
110	#[inline]
111	pub fn id(&self) -> TransactionId {
112		self.multi.tm.id()
113	}
114
115	/// Get a value by key
116	#[inline]
117	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
118		Ok(self.multi.get(key)?.map(|v| v.into_multi_version_row()))
119	}
120
121	/// Check if a key exists
122	#[inline]
123	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
124		self.multi.contains_key(key)
125	}
126
127	/// Get a prefix batch
128	#[inline]
129	pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
130		self.multi.prefix(prefix)
131	}
132
133	/// Get a reverse prefix batch
134	#[inline]
135	pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
136		self.multi.prefix_rev(prefix)
137	}
138
139	/// Read as of version exclusive
140	#[inline]
141	pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
142		self.multi.read_as_of_version_exclusive(version);
143		Ok(())
144	}
145
146	/// Create a streaming iterator for forward range queries.
147	#[inline]
148	pub fn range(
149		&self,
150		range: EncodedKeyRange,
151		batch_size: usize,
152	) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
153		self.multi.range(range, batch_size)
154	}
155
156	/// Create a streaming iterator for reverse range queries.
157	#[inline]
158	pub fn range_rev(
159		&self,
160		range: EncodedKeyRange,
161		batch_size: usize,
162	) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
163		self.multi.range_rev(range, batch_size)
164	}
165
166	/// Execute a function with query access to the single transaction.
167	#[instrument(name = "transaction::query::with_single_query", level = "trace", skip(self, keys, f))]
168	pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
169	where
170		I: IntoIterator<Item = &'a EncodedKey> + Send,
171		F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
172		R: Send,
173	{
174		self.single.with_query(keys, f)
175	}
176
177	/// Execute a function with access to the multi query transaction.
178	/// This operates within the same transaction context.
179	#[instrument(name = "transaction::query::with_multi_query", level = "trace", skip(self, f))]
180	pub fn with_multi_query<F, R>(&mut self, f: F) -> Result<R>
181	where
182		F: FnOnce(&mut MultiReadTransaction) -> Result<R>,
183	{
184		f(&mut self.multi)
185	}
186
187	/// Begin a single-version query transaction for specific keys
188	#[instrument(name = "transaction::query::begin_single_query", level = "trace", skip(self, keys))]
189	pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
190	where
191		I: IntoIterator<Item = &'a EncodedKey>,
192	{
193		self.single.begin_query(keys)
194	}
195}
196
// No-op implementations of the `Transactional*Changes` traits for QueryTransaction.
// Query transactions don't track changes, so every lookup returns `None` (or an
// empty `Vec`) and every deletion check returns `false`.
199
200impl TransactionalDictionaryChanges for QueryTransaction {
201	fn find_dictionary(&self, _id: DictionaryId) -> Option<&Dictionary> {
202		None
203	}
204
205	fn find_dictionary_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Dictionary> {
206		None
207	}
208
209	fn is_dictionary_deleted(&self, _id: DictionaryId) -> bool {
210		false
211	}
212
213	fn is_dictionary_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
214		false
215	}
216}
217
218impl TransactionalFlowChanges for QueryTransaction {
219	fn find_flow(&self, _id: FlowId) -> Option<&Flow> {
220		None
221	}
222
223	fn find_flow_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Flow> {
224		None
225	}
226
227	fn is_flow_deleted(&self, _id: FlowId) -> bool {
228		false
229	}
230
231	fn is_flow_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
232		false
233	}
234}
235
236impl TransactionalNamespaceChanges for QueryTransaction {
237	fn find_namespace(&self, _id: NamespaceId) -> Option<&Namespace> {
238		None
239	}
240
241	fn find_namespace_by_name(&self, _name: &str) -> Option<&Namespace> {
242		None
243	}
244
245	fn is_namespace_deleted(&self, _id: NamespaceId) -> bool {
246		false
247	}
248
249	fn is_namespace_deleted_by_name(&self, _name: &str) -> bool {
250		false
251	}
252}
253
254impl TransactionalProcedureChanges for QueryTransaction {
255	fn find_procedure(&self, _id: ProcedureId) -> Option<&Procedure> {
256		None
257	}
258
259	fn find_procedure_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Procedure> {
260		None
261	}
262
263	fn is_procedure_deleted(&self, _id: ProcedureId) -> bool {
264		false
265	}
266
267	fn is_procedure_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
268		false
269	}
270}
271
272impl TransactionalTestChanges for QueryTransaction {
273	fn find_test(&self, _id: TestId) -> Option<&Test> {
274		None
275	}
276
277	fn find_test_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Test> {
278		None
279	}
280
281	fn is_test_deleted(&self, _id: TestId) -> bool {
282		false
283	}
284
285	fn is_test_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
286		false
287	}
288}
289
290impl TransactionalRingBufferChanges for QueryTransaction {
291	fn find_ringbuffer(&self, _id: RingBufferId) -> Option<&RingBuffer> {
292		None
293	}
294
295	fn find_ringbuffer_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&RingBuffer> {
296		None
297	}
298
299	fn is_ringbuffer_deleted(&self, _id: RingBufferId) -> bool {
300		false
301	}
302
303	fn is_ringbuffer_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
304		false
305	}
306}
307
308impl TransactionalSeriesChanges for QueryTransaction {
309	fn find_series(&self, _id: SeriesId) -> Option<&Series> {
310		None
311	}
312
313	fn find_series_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Series> {
314		None
315	}
316
317	fn is_series_deleted(&self, _id: SeriesId) -> bool {
318		false
319	}
320
321	fn is_series_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
322		false
323	}
324}
325
326impl TransactionalTableChanges for QueryTransaction {
327	fn find_table(&self, _id: TableId) -> Option<&Table> {
328		None
329	}
330
331	fn find_table_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Table> {
332		None
333	}
334
335	fn is_table_deleted(&self, _id: TableId) -> bool {
336		false
337	}
338
339	fn is_table_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
340		false
341	}
342}
343
344impl TransactionalViewChanges for QueryTransaction {
345	fn find_view(&self, _id: ViewId) -> Option<&View> {
346		None
347	}
348
349	fn find_view_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&View> {
350		None
351	}
352
353	fn is_view_deleted(&self, _id: ViewId) -> bool {
354		false
355	}
356
357	fn is_view_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
358		false
359	}
360}
361
362impl TransactionalSumTypeChanges for QueryTransaction {
363	fn find_sumtype(&self, _id: SumTypeId) -> Option<&SumType> {
364		None
365	}
366
367	fn find_sumtype_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&SumType> {
368		None
369	}
370
371	fn is_sumtype_deleted(&self, _id: SumTypeId) -> bool {
372		false
373	}
374
375	fn is_sumtype_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
376		false
377	}
378}
379
380impl TransactionalHandlerChanges for QueryTransaction {
381	fn find_handler_by_id(&self, _id: HandlerId) -> Option<&Handler> {
382		None
383	}
384
385	fn find_handler_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Handler> {
386		None
387	}
388
389	fn is_handler_deleted(&self, _id: HandlerId) -> bool {
390		false
391	}
392
393	fn is_handler_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
394		false
395	}
396}
397
398impl TransactionalIdentityChanges for QueryTransaction {
399	fn find_identity(&self, _id: IdentityId) -> Option<&Identity> {
400		None
401	}
402
403	fn find_identity_by_name(&self, _name: &str) -> Option<&Identity> {
404		None
405	}
406
407	fn is_identity_deleted(&self, _id: IdentityId) -> bool {
408		false
409	}
410
411	fn is_identity_deleted_by_name(&self, _name: &str) -> bool {
412		false
413	}
414}
415
416impl TransactionalRoleChanges for QueryTransaction {
417	fn find_role(&self, _id: RoleId) -> Option<&Role> {
418		None
419	}
420
421	fn find_role_by_name(&self, _name: &str) -> Option<&Role> {
422		None
423	}
424
425	fn is_role_deleted(&self, _id: RoleId) -> bool {
426		false
427	}
428
429	fn is_role_deleted_by_name(&self, _name: &str) -> bool {
430		false
431	}
432}
433
434impl TransactionalGrantedRoleChanges for QueryTransaction {
435	fn find_granted_role(&self, _identity: IdentityId, _role: RoleId) -> Option<&GrantedRole> {
436		None
437	}
438
439	fn find_granted_roles_for_identity(&self, _identity: IdentityId) -> Vec<&GrantedRole> {
440		Vec::new()
441	}
442
443	fn is_granted_role_deleted(&self, _identity: IdentityId, _role: RoleId) -> bool {
444		false
445	}
446}
447
448impl TransactionalPolicyChanges for QueryTransaction {
449	fn find_policy(&self, _id: PolicyId) -> Option<&Policy> {
450		None
451	}
452
453	fn find_policy_by_name(&self, _name: &str) -> Option<&Policy> {
454		None
455	}
456
457	fn is_policy_deleted(&self, _id: PolicyId) -> bool {
458		false
459	}
460
461	fn is_policy_deleted_by_name(&self, _name: &str) -> bool {
462		false
463	}
464}
465
466impl TransactionalMigrationChanges for QueryTransaction {
467	fn find_migration(&self, _id: MigrationId) -> Option<&Migration> {
468		None
469	}
470
471	fn find_migration_by_name(&self, _name: &str) -> Option<&Migration> {
472		None
473	}
474
475	fn is_migration_deleted(&self, _id: MigrationId) -> bool {
476		false
477	}
478
479	fn is_migration_deleted_by_name(&self, _name: &str) -> bool {
480		false
481	}
482}
483
484impl TransactionalAuthenticationChanges for QueryTransaction {
485	fn find_authentication(&self, _id: AuthenticationId) -> Option<&Authentication> {
486		None
487	}
488
489	fn find_authentication_by_identity_and_method(
490		&self,
491		_identity: IdentityId,
492		_method: &str,
493	) -> Option<&Authentication> {
494		None
495	}
496
497	fn is_authentication_deleted(&self, _id: AuthenticationId) -> bool {
498		false
499	}
500
501	fn is_authentication_deleted_by_identity_and_method(&self, _identity: IdentityId, _method: &str) -> bool {
502		false
503	}
504}
505
506impl TransactionalSourceChanges for QueryTransaction {
507	fn find_source(&self, _id: SourceId) -> Option<&Source> {
508		None
509	}
510
511	fn find_source_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Source> {
512		None
513	}
514
515	fn is_source_deleted(&self, _id: SourceId) -> bool {
516		false
517	}
518
519	fn is_source_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
520		false
521	}
522}
523
524impl TransactionalSinkChanges for QueryTransaction {
525	fn find_sink(&self, _id: SinkId) -> Option<&Sink> {
526		None
527	}
528
529	fn find_sink_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Sink> {
530		None
531	}
532
533	fn is_sink_deleted(&self, _id: SinkId) -> bool {
534		false
535	}
536
537	fn is_sink_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
538		false
539	}
540}
541
542impl TransactionalConfigChanges for QueryTransaction {
543	fn find_config(&self, _key: ConfigKey) -> Option<&Config> {
544		None
545	}
546}
547
548impl TransactionalRowTtlChanges for QueryTransaction {
549	fn find_row_ttl(&self, _shape: ShapeId) -> Option<&RowTtl> {
550		None
551	}
552
553	fn is_row_ttl_deleted(&self, _shape: ShapeId) -> bool {
554		false
555	}
556}
557
558impl TransactionalBindingChanges for QueryTransaction {
559	fn find_binding(&self, _id: BindingId) -> Option<&Binding> {
560		None
561	}
562
563	fn find_binding_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Binding> {
564		None
565	}
566
567	fn is_binding_deleted(&self, _id: BindingId) -> bool {
568		false
569	}
570
571	fn is_binding_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
572		false
573	}
574}
575
576impl TransactionalChanges for QueryTransaction {}