1use std::sync::Arc;
5
6use reifydb_core::{
7 common::CommitVersion,
8 encoded::key::{EncodedKey, EncodedKeyRange},
9 execution::ExecutionResult,
10 interface::{
11 catalog::{
12 authentication::{Authentication, AuthenticationId},
13 binding::Binding,
14 config::{Config, ConfigKey},
15 dictionary::Dictionary,
16 flow::{Flow, FlowId},
17 handler::Handler,
18 id::{
19 BindingId, HandlerId, MigrationId, NamespaceId, ProcedureId, RingBufferId, SeriesId,
20 SinkId, SourceId, TableId, TestId, ViewId,
21 },
22 identity::{GrantedRole, Identity, Role, RoleId},
23 migration::Migration,
24 namespace::Namespace,
25 policy::{Policy, PolicyId},
26 procedure::Procedure,
27 ringbuffer::RingBuffer,
28 series::Series,
29 shape::ShapeId,
30 sink::Sink,
31 source::Source,
32 sumtype::SumType,
33 table::Table,
34 test::Test,
35 view::View,
36 },
37 store::{MultiVersionBatch, MultiVersionRow},
38 },
39 row::RowTtl,
40};
41use reifydb_type::{
42 Result,
43 params::Params,
44 value::{dictionary::DictionaryId, identity::IdentityId, sumtype::SumTypeId},
45};
46use tracing::instrument;
47
48use crate::{
49 TransactionId,
50 change::{
51 TransactionalAuthenticationChanges, TransactionalBindingChanges, TransactionalChanges,
52 TransactionalConfigChanges, TransactionalDictionaryChanges, TransactionalFlowChanges,
53 TransactionalGrantedRoleChanges, TransactionalHandlerChanges, TransactionalIdentityChanges,
54 TransactionalMigrationChanges, TransactionalNamespaceChanges, TransactionalPolicyChanges,
55 TransactionalProcedureChanges, TransactionalRingBufferChanges, TransactionalRoleChanges,
56 TransactionalRowTtlChanges, TransactionalSeriesChanges, TransactionalSinkChanges,
57 TransactionalSourceChanges, TransactionalSumTypeChanges, TransactionalTableChanges,
58 TransactionalTestChanges, TransactionalViewChanges,
59 },
60 multi::transaction::read::MultiReadTransaction,
61 single::{SingleTransaction, read::SingleReadTransaction},
62 transaction::{RqlExecutor, Transaction},
63};
64
65pub struct QueryTransaction {
68 pub(crate) multi: MultiReadTransaction,
69 pub(crate) single: SingleTransaction,
70
71 pub identity: IdentityId,
73
74 pub(crate) executor: Option<Arc<dyn RqlExecutor>>,
76}
77
78impl QueryTransaction {
79 #[instrument(name = "transaction::query::new", level = "debug", skip_all)]
81 pub fn new(multi: MultiReadTransaction, single: SingleTransaction, identity: IdentityId) -> Self {
82 Self {
83 multi,
84 single,
85 identity,
86 executor: None,
87 }
88 }
89
90 pub fn set_executor(&mut self, executor: Arc<dyn RqlExecutor>) {
92 self.executor = Some(executor);
93 }
94
95 pub fn rql(&mut self, rql: &str, params: Params) -> ExecutionResult {
99 let executor = self.executor.clone().expect("RqlExecutor not set");
100 executor.rql(&mut Transaction::Query(self), rql, params)
101 }
102
103 #[inline]
105 pub fn version(&self) -> CommitVersion {
106 self.multi.version()
107 }
108
109 #[inline]
111 pub fn id(&self) -> TransactionId {
112 self.multi.tm.id()
113 }
114
115 #[inline]
117 pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
118 Ok(self.multi.get(key)?.map(|v| v.into_multi_version_row()))
119 }
120
121 #[inline]
123 pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
124 self.multi.contains_key(key)
125 }
126
127 #[inline]
129 pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
130 self.multi.prefix(prefix)
131 }
132
133 #[inline]
135 pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
136 self.multi.prefix_rev(prefix)
137 }
138
139 #[inline]
141 pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
142 self.multi.read_as_of_version_exclusive(version);
143 Ok(())
144 }
145
146 #[inline]
148 pub fn range(
149 &self,
150 range: EncodedKeyRange,
151 batch_size: usize,
152 ) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
153 self.multi.range(range, batch_size)
154 }
155
156 #[inline]
158 pub fn range_rev(
159 &self,
160 range: EncodedKeyRange,
161 batch_size: usize,
162 ) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
163 self.multi.range_rev(range, batch_size)
164 }
165
166 #[instrument(name = "transaction::query::with_single_query", level = "trace", skip(self, keys, f))]
168 pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
169 where
170 I: IntoIterator<Item = &'a EncodedKey> + Send,
171 F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
172 R: Send,
173 {
174 self.single.with_query(keys, f)
175 }
176
177 #[instrument(name = "transaction::query::with_multi_query", level = "trace", skip(self, f))]
180 pub fn with_multi_query<F, R>(&mut self, f: F) -> Result<R>
181 where
182 F: FnOnce(&mut MultiReadTransaction) -> Result<R>,
183 {
184 f(&mut self.multi)
185 }
186
187 #[instrument(name = "transaction::query::begin_single_query", level = "trace", skip(self, keys))]
189 pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
190 where
191 I: IntoIterator<Item = &'a EncodedKey>,
192 {
193 self.single.begin_query(keys)
194 }
195}
196
197impl TransactionalDictionaryChanges for QueryTransaction {
201 fn find_dictionary(&self, _id: DictionaryId) -> Option<&Dictionary> {
202 None
203 }
204
205 fn find_dictionary_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Dictionary> {
206 None
207 }
208
209 fn is_dictionary_deleted(&self, _id: DictionaryId) -> bool {
210 false
211 }
212
213 fn is_dictionary_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
214 false
215 }
216}
217
218impl TransactionalFlowChanges for QueryTransaction {
219 fn find_flow(&self, _id: FlowId) -> Option<&Flow> {
220 None
221 }
222
223 fn find_flow_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Flow> {
224 None
225 }
226
227 fn is_flow_deleted(&self, _id: FlowId) -> bool {
228 false
229 }
230
231 fn is_flow_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
232 false
233 }
234}
235
236impl TransactionalNamespaceChanges for QueryTransaction {
237 fn find_namespace(&self, _id: NamespaceId) -> Option<&Namespace> {
238 None
239 }
240
241 fn find_namespace_by_name(&self, _name: &str) -> Option<&Namespace> {
242 None
243 }
244
245 fn is_namespace_deleted(&self, _id: NamespaceId) -> bool {
246 false
247 }
248
249 fn is_namespace_deleted_by_name(&self, _name: &str) -> bool {
250 false
251 }
252}
253
254impl TransactionalProcedureChanges for QueryTransaction {
255 fn find_procedure(&self, _id: ProcedureId) -> Option<&Procedure> {
256 None
257 }
258
259 fn find_procedure_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Procedure> {
260 None
261 }
262
263 fn is_procedure_deleted(&self, _id: ProcedureId) -> bool {
264 false
265 }
266
267 fn is_procedure_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
268 false
269 }
270}
271
272impl TransactionalTestChanges for QueryTransaction {
273 fn find_test(&self, _id: TestId) -> Option<&Test> {
274 None
275 }
276
277 fn find_test_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Test> {
278 None
279 }
280
281 fn is_test_deleted(&self, _id: TestId) -> bool {
282 false
283 }
284
285 fn is_test_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
286 false
287 }
288}
289
290impl TransactionalRingBufferChanges for QueryTransaction {
291 fn find_ringbuffer(&self, _id: RingBufferId) -> Option<&RingBuffer> {
292 None
293 }
294
295 fn find_ringbuffer_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&RingBuffer> {
296 None
297 }
298
299 fn is_ringbuffer_deleted(&self, _id: RingBufferId) -> bool {
300 false
301 }
302
303 fn is_ringbuffer_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
304 false
305 }
306}
307
308impl TransactionalSeriesChanges for QueryTransaction {
309 fn find_series(&self, _id: SeriesId) -> Option<&Series> {
310 None
311 }
312
313 fn find_series_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Series> {
314 None
315 }
316
317 fn is_series_deleted(&self, _id: SeriesId) -> bool {
318 false
319 }
320
321 fn is_series_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
322 false
323 }
324}
325
326impl TransactionalTableChanges for QueryTransaction {
327 fn find_table(&self, _id: TableId) -> Option<&Table> {
328 None
329 }
330
331 fn find_table_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Table> {
332 None
333 }
334
335 fn is_table_deleted(&self, _id: TableId) -> bool {
336 false
337 }
338
339 fn is_table_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
340 false
341 }
342}
343
344impl TransactionalViewChanges for QueryTransaction {
345 fn find_view(&self, _id: ViewId) -> Option<&View> {
346 None
347 }
348
349 fn find_view_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&View> {
350 None
351 }
352
353 fn is_view_deleted(&self, _id: ViewId) -> bool {
354 false
355 }
356
357 fn is_view_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
358 false
359 }
360}
361
362impl TransactionalSumTypeChanges for QueryTransaction {
363 fn find_sumtype(&self, _id: SumTypeId) -> Option<&SumType> {
364 None
365 }
366
367 fn find_sumtype_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&SumType> {
368 None
369 }
370
371 fn is_sumtype_deleted(&self, _id: SumTypeId) -> bool {
372 false
373 }
374
375 fn is_sumtype_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
376 false
377 }
378}
379
380impl TransactionalHandlerChanges for QueryTransaction {
381 fn find_handler_by_id(&self, _id: HandlerId) -> Option<&Handler> {
382 None
383 }
384
385 fn find_handler_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Handler> {
386 None
387 }
388
389 fn is_handler_deleted(&self, _id: HandlerId) -> bool {
390 false
391 }
392
393 fn is_handler_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
394 false
395 }
396}
397
398impl TransactionalIdentityChanges for QueryTransaction {
399 fn find_identity(&self, _id: IdentityId) -> Option<&Identity> {
400 None
401 }
402
403 fn find_identity_by_name(&self, _name: &str) -> Option<&Identity> {
404 None
405 }
406
407 fn is_identity_deleted(&self, _id: IdentityId) -> bool {
408 false
409 }
410
411 fn is_identity_deleted_by_name(&self, _name: &str) -> bool {
412 false
413 }
414}
415
416impl TransactionalRoleChanges for QueryTransaction {
417 fn find_role(&self, _id: RoleId) -> Option<&Role> {
418 None
419 }
420
421 fn find_role_by_name(&self, _name: &str) -> Option<&Role> {
422 None
423 }
424
425 fn is_role_deleted(&self, _id: RoleId) -> bool {
426 false
427 }
428
429 fn is_role_deleted_by_name(&self, _name: &str) -> bool {
430 false
431 }
432}
433
434impl TransactionalGrantedRoleChanges for QueryTransaction {
435 fn find_granted_role(&self, _identity: IdentityId, _role: RoleId) -> Option<&GrantedRole> {
436 None
437 }
438
439 fn find_granted_roles_for_identity(&self, _identity: IdentityId) -> Vec<&GrantedRole> {
440 Vec::new()
441 }
442
443 fn is_granted_role_deleted(&self, _identity: IdentityId, _role: RoleId) -> bool {
444 false
445 }
446}
447
448impl TransactionalPolicyChanges for QueryTransaction {
449 fn find_policy(&self, _id: PolicyId) -> Option<&Policy> {
450 None
451 }
452
453 fn find_policy_by_name(&self, _name: &str) -> Option<&Policy> {
454 None
455 }
456
457 fn is_policy_deleted(&self, _id: PolicyId) -> bool {
458 false
459 }
460
461 fn is_policy_deleted_by_name(&self, _name: &str) -> bool {
462 false
463 }
464}
465
466impl TransactionalMigrationChanges for QueryTransaction {
467 fn find_migration(&self, _id: MigrationId) -> Option<&Migration> {
468 None
469 }
470
471 fn find_migration_by_name(&self, _name: &str) -> Option<&Migration> {
472 None
473 }
474
475 fn is_migration_deleted(&self, _id: MigrationId) -> bool {
476 false
477 }
478
479 fn is_migration_deleted_by_name(&self, _name: &str) -> bool {
480 false
481 }
482}
483
484impl TransactionalAuthenticationChanges for QueryTransaction {
485 fn find_authentication(&self, _id: AuthenticationId) -> Option<&Authentication> {
486 None
487 }
488
489 fn find_authentication_by_identity_and_method(
490 &self,
491 _identity: IdentityId,
492 _method: &str,
493 ) -> Option<&Authentication> {
494 None
495 }
496
497 fn is_authentication_deleted(&self, _id: AuthenticationId) -> bool {
498 false
499 }
500
501 fn is_authentication_deleted_by_identity_and_method(&self, _identity: IdentityId, _method: &str) -> bool {
502 false
503 }
504}
505
506impl TransactionalSourceChanges for QueryTransaction {
507 fn find_source(&self, _id: SourceId) -> Option<&Source> {
508 None
509 }
510
511 fn find_source_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Source> {
512 None
513 }
514
515 fn is_source_deleted(&self, _id: SourceId) -> bool {
516 false
517 }
518
519 fn is_source_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
520 false
521 }
522}
523
524impl TransactionalSinkChanges for QueryTransaction {
525 fn find_sink(&self, _id: SinkId) -> Option<&Sink> {
526 None
527 }
528
529 fn find_sink_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Sink> {
530 None
531 }
532
533 fn is_sink_deleted(&self, _id: SinkId) -> bool {
534 false
535 }
536
537 fn is_sink_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
538 false
539 }
540}
541
542impl TransactionalConfigChanges for QueryTransaction {
543 fn find_config(&self, _key: ConfigKey) -> Option<&Config> {
544 None
545 }
546}
547
548impl TransactionalRowTtlChanges for QueryTransaction {
549 fn find_row_ttl(&self, _shape: ShapeId) -> Option<&RowTtl> {
550 None
551 }
552
553 fn is_row_ttl_deleted(&self, _shape: ShapeId) -> bool {
554 false
555 }
556}
557
558impl TransactionalBindingChanges for QueryTransaction {
559 fn find_binding(&self, _id: BindingId) -> Option<&Binding> {
560 None
561 }
562
563 fn find_binding_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Binding> {
564 None
565 }
566
567 fn is_binding_deleted(&self, _id: BindingId) -> bool {
568 false
569 }
570
571 fn is_binding_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
572 false
573 }
574}
575
576impl TransactionalChanges for QueryTransaction {}