Skip to main content

reifydb_transaction/transaction/
query.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	common::CommitVersion,
8	encoded::key::{EncodedKey, EncodedKeyRange},
9	execution::ExecutionResult,
10	interface::{
11		catalog::{
12			authentication::{Authentication, AuthenticationId},
13			binding::Binding,
14			config::{Config, ConfigKey},
15			dictionary::Dictionary,
16			flow::{Flow, FlowId, FlowNodeId},
17			handler::Handler,
18			id::{
19				BindingId, HandlerId, MigrationId, NamespaceId, ProcedureId, RingBufferId, SeriesId,
20				SinkId, SourceId, TableId, TestId, ViewId,
21			},
22			identity::{GrantedRole, Identity, Role, RoleId},
23			migration::Migration,
24			namespace::Namespace,
25			policy::{Policy, PolicyId},
26			procedure::Procedure,
27			ringbuffer::RingBuffer,
28			series::Series,
29			shape::ShapeId,
30			sink::Sink,
31			source::Source,
32			sumtype::SumType,
33			table::Table,
34			test::Test,
35			view::View,
36		},
37		store::{MultiVersionBatch, MultiVersionRow},
38	},
39	row::Ttl,
40};
41use reifydb_type::{
42	Result,
43	params::Params,
44	value::{dictionary::DictionaryId, identity::IdentityId, sumtype::SumTypeId},
45};
46use tracing::instrument;
47
48use crate::{
49	TransactionId,
50	change::{
51		TransactionalAuthenticationChanges, TransactionalBindingChanges, TransactionalChanges,
52		TransactionalConfigChanges, TransactionalDictionaryChanges, TransactionalFlowChanges,
53		TransactionalGrantedRoleChanges, TransactionalHandlerChanges, TransactionalIdentityChanges,
54		TransactionalMigrationChanges, TransactionalNamespaceChanges, TransactionalOperatorTtlChanges,
55		TransactionalPolicyChanges, TransactionalProcedureChanges, TransactionalRingBufferChanges,
56		TransactionalRoleChanges, TransactionalRowTtlChanges, TransactionalSeriesChanges,
57		TransactionalSinkChanges, TransactionalSourceChanges, TransactionalSumTypeChanges,
58		TransactionalTableChanges, TransactionalTestChanges, TransactionalViewChanges,
59	},
60	multi::transaction::read::MultiReadTransaction,
61	single::{SingleTransaction, read::SingleReadTransaction},
62	transaction::{RqlExecutor, Transaction},
63};
64
65pub struct QueryTransaction {
66	pub(crate) multi: MultiReadTransaction,
67	pub(crate) single: SingleTransaction,
68
69	pub identity: IdentityId,
70
71	pub(crate) executor: Option<Arc<dyn RqlExecutor>>,
72}
73
74impl QueryTransaction {
75	#[instrument(name = "transaction::query::new", level = "debug", skip_all)]
76	pub fn new(multi: MultiReadTransaction, single: SingleTransaction, identity: IdentityId) -> Self {
77		Self {
78			multi,
79			single,
80			identity,
81			executor: None,
82		}
83	}
84
85	pub fn set_executor(&mut self, executor: Arc<dyn RqlExecutor>) {
86		self.executor = Some(executor);
87	}
88
89	pub fn rql(&mut self, rql: &str, params: Params) -> ExecutionResult {
90		let executor = self.executor.clone().expect("RqlExecutor not set");
91		executor.rql(&mut Transaction::Query(self), rql, params)
92	}
93
94	#[inline]
95	pub fn version(&self) -> CommitVersion {
96		self.multi.version()
97	}
98
99	#[inline]
100	pub fn id(&self) -> TransactionId {
101		self.multi.tm.id()
102	}
103
104	#[inline]
105	pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
106		Ok(self.multi.get(key)?.map(|v| v.into_multi_version_row()))
107	}
108
109	#[inline]
110	pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
111		self.multi.contains_key(key)
112	}
113
114	#[inline]
115	pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
116		self.multi.prefix(prefix)
117	}
118
119	#[inline]
120	pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
121		self.multi.prefix_rev(prefix)
122	}
123
124	#[inline]
125	pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
126		self.multi.read_as_of_version_exclusive(version);
127		Ok(())
128	}
129
130	#[inline]
131	pub fn range(
132		&self,
133		range: EncodedKeyRange,
134		batch_size: usize,
135	) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
136		self.multi.range(range, batch_size)
137	}
138
139	#[inline]
140	pub fn range_rev(
141		&self,
142		range: EncodedKeyRange,
143		batch_size: usize,
144	) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
145		self.multi.range_rev(range, batch_size)
146	}
147
148	#[instrument(name = "transaction::query::with_single_query", level = "trace", skip(self, keys, f))]
149	pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
150	where
151		I: IntoIterator<Item = &'a EncodedKey> + Send,
152		F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
153		R: Send,
154	{
155		self.single.with_query(keys, f)
156	}
157
158	#[instrument(name = "transaction::query::with_multi_query", level = "trace", skip(self, f))]
159	pub fn with_multi_query<F, R>(&mut self, f: F) -> Result<R>
160	where
161		F: FnOnce(&mut MultiReadTransaction) -> Result<R>,
162	{
163		f(&mut self.multi)
164	}
165
166	#[instrument(name = "transaction::query::begin_single_query", level = "trace", skip(self, keys))]
167	pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
168	where
169		I: IntoIterator<Item = &'a EncodedKey>,
170	{
171		self.single.begin_query(keys)
172	}
173}
174
175impl TransactionalDictionaryChanges for QueryTransaction {
176	fn find_dictionary(&self, _id: DictionaryId) -> Option<&Dictionary> {
177		None
178	}
179
180	fn find_dictionary_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Dictionary> {
181		None
182	}
183
184	fn is_dictionary_deleted(&self, _id: DictionaryId) -> bool {
185		false
186	}
187
188	fn is_dictionary_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
189		false
190	}
191}
192
193impl TransactionalFlowChanges for QueryTransaction {
194	fn find_flow(&self, _id: FlowId) -> Option<&Flow> {
195		None
196	}
197
198	fn find_flow_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Flow> {
199		None
200	}
201
202	fn is_flow_deleted(&self, _id: FlowId) -> bool {
203		false
204	}
205
206	fn is_flow_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
207		false
208	}
209}
210
211impl TransactionalNamespaceChanges for QueryTransaction {
212	fn find_namespace(&self, _id: NamespaceId) -> Option<&Namespace> {
213		None
214	}
215
216	fn find_namespace_by_name(&self, _name: &str) -> Option<&Namespace> {
217		None
218	}
219
220	fn is_namespace_deleted(&self, _id: NamespaceId) -> bool {
221		false
222	}
223
224	fn is_namespace_deleted_by_name(&self, _name: &str) -> bool {
225		false
226	}
227}
228
229impl TransactionalProcedureChanges for QueryTransaction {
230	fn find_procedure(&self, _id: ProcedureId) -> Option<&Procedure> {
231		None
232	}
233
234	fn find_procedure_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Procedure> {
235		None
236	}
237
238	fn is_procedure_deleted(&self, _id: ProcedureId) -> bool {
239		false
240	}
241
242	fn is_procedure_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
243		false
244	}
245}
246
247impl TransactionalTestChanges for QueryTransaction {
248	fn find_test(&self, _id: TestId) -> Option<&Test> {
249		None
250	}
251
252	fn find_test_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Test> {
253		None
254	}
255
256	fn is_test_deleted(&self, _id: TestId) -> bool {
257		false
258	}
259
260	fn is_test_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
261		false
262	}
263}
264
265impl TransactionalRingBufferChanges for QueryTransaction {
266	fn find_ringbuffer(&self, _id: RingBufferId) -> Option<&RingBuffer> {
267		None
268	}
269
270	fn find_ringbuffer_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&RingBuffer> {
271		None
272	}
273
274	fn is_ringbuffer_deleted(&self, _id: RingBufferId) -> bool {
275		false
276	}
277
278	fn is_ringbuffer_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
279		false
280	}
281}
282
283impl TransactionalSeriesChanges for QueryTransaction {
284	fn find_series(&self, _id: SeriesId) -> Option<&Series> {
285		None
286	}
287
288	fn find_series_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Series> {
289		None
290	}
291
292	fn is_series_deleted(&self, _id: SeriesId) -> bool {
293		false
294	}
295
296	fn is_series_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
297		false
298	}
299}
300
301impl TransactionalTableChanges for QueryTransaction {
302	fn find_table(&self, _id: TableId) -> Option<&Table> {
303		None
304	}
305
306	fn find_table_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Table> {
307		None
308	}
309
310	fn is_table_deleted(&self, _id: TableId) -> bool {
311		false
312	}
313
314	fn is_table_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
315		false
316	}
317}
318
319impl TransactionalViewChanges for QueryTransaction {
320	fn find_view(&self, _id: ViewId) -> Option<&View> {
321		None
322	}
323
324	fn find_view_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&View> {
325		None
326	}
327
328	fn is_view_deleted(&self, _id: ViewId) -> bool {
329		false
330	}
331
332	fn is_view_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
333		false
334	}
335}
336
337impl TransactionalSumTypeChanges for QueryTransaction {
338	fn find_sumtype(&self, _id: SumTypeId) -> Option<&SumType> {
339		None
340	}
341
342	fn find_sumtype_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&SumType> {
343		None
344	}
345
346	fn is_sumtype_deleted(&self, _id: SumTypeId) -> bool {
347		false
348	}
349
350	fn is_sumtype_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
351		false
352	}
353}
354
355impl TransactionalHandlerChanges for QueryTransaction {
356	fn find_handler_by_id(&self, _id: HandlerId) -> Option<&Handler> {
357		None
358	}
359
360	fn find_handler_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Handler> {
361		None
362	}
363
364	fn is_handler_deleted(&self, _id: HandlerId) -> bool {
365		false
366	}
367
368	fn is_handler_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
369		false
370	}
371}
372
373impl TransactionalIdentityChanges for QueryTransaction {
374	fn find_identity(&self, _id: IdentityId) -> Option<&Identity> {
375		None
376	}
377
378	fn find_identity_by_name(&self, _name: &str) -> Option<&Identity> {
379		None
380	}
381
382	fn is_identity_deleted(&self, _id: IdentityId) -> bool {
383		false
384	}
385
386	fn is_identity_deleted_by_name(&self, _name: &str) -> bool {
387		false
388	}
389}
390
391impl TransactionalRoleChanges for QueryTransaction {
392	fn find_role(&self, _id: RoleId) -> Option<&Role> {
393		None
394	}
395
396	fn find_role_by_name(&self, _name: &str) -> Option<&Role> {
397		None
398	}
399
400	fn is_role_deleted(&self, _id: RoleId) -> bool {
401		false
402	}
403
404	fn is_role_deleted_by_name(&self, _name: &str) -> bool {
405		false
406	}
407}
408
409impl TransactionalGrantedRoleChanges for QueryTransaction {
410	fn find_granted_role(&self, _identity: IdentityId, _role: RoleId) -> Option<&GrantedRole> {
411		None
412	}
413
414	fn find_granted_roles_for_identity(&self, _identity: IdentityId) -> Vec<&GrantedRole> {
415		Vec::new()
416	}
417
418	fn is_granted_role_deleted(&self, _identity: IdentityId, _role: RoleId) -> bool {
419		false
420	}
421}
422
423impl TransactionalPolicyChanges for QueryTransaction {
424	fn find_policy(&self, _id: PolicyId) -> Option<&Policy> {
425		None
426	}
427
428	fn find_policy_by_name(&self, _name: &str) -> Option<&Policy> {
429		None
430	}
431
432	fn is_policy_deleted(&self, _id: PolicyId) -> bool {
433		false
434	}
435
436	fn is_policy_deleted_by_name(&self, _name: &str) -> bool {
437		false
438	}
439}
440
441impl TransactionalMigrationChanges for QueryTransaction {
442	fn find_migration(&self, _id: MigrationId) -> Option<&Migration> {
443		None
444	}
445
446	fn find_migration_by_name(&self, _name: &str) -> Option<&Migration> {
447		None
448	}
449
450	fn is_migration_deleted(&self, _id: MigrationId) -> bool {
451		false
452	}
453
454	fn is_migration_deleted_by_name(&self, _name: &str) -> bool {
455		false
456	}
457}
458
459impl TransactionalAuthenticationChanges for QueryTransaction {
460	fn find_authentication(&self, _id: AuthenticationId) -> Option<&Authentication> {
461		None
462	}
463
464	fn find_authentication_by_identity_and_method(
465		&self,
466		_identity: IdentityId,
467		_method: &str,
468	) -> Option<&Authentication> {
469		None
470	}
471
472	fn is_authentication_deleted(&self, _id: AuthenticationId) -> bool {
473		false
474	}
475
476	fn is_authentication_deleted_by_identity_and_method(&self, _identity: IdentityId, _method: &str) -> bool {
477		false
478	}
479}
480
481impl TransactionalSourceChanges for QueryTransaction {
482	fn find_source(&self, _id: SourceId) -> Option<&Source> {
483		None
484	}
485
486	fn find_source_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Source> {
487		None
488	}
489
490	fn is_source_deleted(&self, _id: SourceId) -> bool {
491		false
492	}
493
494	fn is_source_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
495		false
496	}
497}
498
499impl TransactionalSinkChanges for QueryTransaction {
500	fn find_sink(&self, _id: SinkId) -> Option<&Sink> {
501		None
502	}
503
504	fn find_sink_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Sink> {
505		None
506	}
507
508	fn is_sink_deleted(&self, _id: SinkId) -> bool {
509		false
510	}
511
512	fn is_sink_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
513		false
514	}
515}
516
517impl TransactionalConfigChanges for QueryTransaction {
518	fn find_config(&self, _key: ConfigKey) -> Option<&Config> {
519		None
520	}
521}
522
523impl TransactionalRowTtlChanges for QueryTransaction {
524	fn find_row_ttl(&self, _shape: ShapeId) -> Option<&Ttl> {
525		None
526	}
527
528	fn is_row_ttl_deleted(&self, _shape: ShapeId) -> bool {
529		false
530	}
531}
532
533impl TransactionalOperatorTtlChanges for QueryTransaction {
534	fn find_operator_ttl(&self, _node: FlowNodeId) -> Option<&Ttl> {
535		None
536	}
537
538	fn is_operator_ttl_deleted(&self, _node: FlowNodeId) -> bool {
539		false
540	}
541}
542
543impl TransactionalBindingChanges for QueryTransaction {
544	fn find_binding(&self, _id: BindingId) -> Option<&Binding> {
545		None
546	}
547
548	fn find_binding_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Binding> {
549		None
550	}
551
552	fn is_binding_deleted(&self, _id: BindingId) -> bool {
553		false
554	}
555
556	fn is_binding_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
557		false
558	}
559}
560
561impl TransactionalChanges for QueryTransaction {}