1use std::sync::Arc;
5
6use reifydb_core::{
7 common::CommitVersion,
8 encoded::key::{EncodedKey, EncodedKeyRange},
9 execution::ExecutionResult,
10 interface::{
11 catalog::{
12 authentication::{Authentication, AuthenticationId},
13 binding::Binding,
14 config::{Config, ConfigKey},
15 dictionary::Dictionary,
16 flow::{Flow, FlowId, FlowNodeId},
17 handler::Handler,
18 id::{
19 BindingId, HandlerId, MigrationId, NamespaceId, ProcedureId, RingBufferId, SeriesId,
20 SinkId, SourceId, TableId, TestId, ViewId,
21 },
22 identity::{GrantedRole, Identity, Role, RoleId},
23 migration::Migration,
24 namespace::Namespace,
25 policy::{Policy, PolicyId},
26 procedure::Procedure,
27 ringbuffer::RingBuffer,
28 series::Series,
29 shape::ShapeId,
30 sink::Sink,
31 source::Source,
32 sumtype::SumType,
33 table::Table,
34 test::Test,
35 view::View,
36 },
37 store::{MultiVersionBatch, MultiVersionRow},
38 },
39 row::Ttl,
40};
41use reifydb_type::{
42 Result,
43 params::Params,
44 value::{dictionary::DictionaryId, identity::IdentityId, sumtype::SumTypeId},
45};
46use tracing::instrument;
47
48use crate::{
49 TransactionId,
50 change::{
51 TransactionalAuthenticationChanges, TransactionalBindingChanges, TransactionalChanges,
52 TransactionalConfigChanges, TransactionalDictionaryChanges, TransactionalFlowChanges,
53 TransactionalGrantedRoleChanges, TransactionalHandlerChanges, TransactionalIdentityChanges,
54 TransactionalMigrationChanges, TransactionalNamespaceChanges, TransactionalOperatorTtlChanges,
55 TransactionalPolicyChanges, TransactionalProcedureChanges, TransactionalRingBufferChanges,
56 TransactionalRoleChanges, TransactionalRowTtlChanges, TransactionalSeriesChanges,
57 TransactionalSinkChanges, TransactionalSourceChanges, TransactionalSumTypeChanges,
58 TransactionalTableChanges, TransactionalTestChanges, TransactionalViewChanges,
59 },
60 multi::transaction::read::MultiReadTransaction,
61 single::{SingleTransaction, read::SingleReadTransaction},
62 transaction::{RqlExecutor, Transaction},
63};
64
/// Query transaction bundling a multi-version read transaction, a single transaction,
/// an identity and an optional RQL executor.
pub struct QueryTransaction {
    /// Multi-version read transaction that `version`, `id`, `get`, `prefix` and `range` delegate to.
    pub(crate) multi: MultiReadTransaction,
    /// Single transaction used by `with_single_query` and `begin_single_query`.
    pub(crate) single: SingleTransaction,

    /// Identity supplied when the transaction was constructed.
    // NOTE(review): presumably the identity the queries run as — confirm against callers.
    pub identity: IdentityId,

    /// Executor used by `rql`; `None` until `set_executor` has been called.
    pub(crate) executor: Option<Arc<dyn RqlExecutor>>,
}
73
74impl QueryTransaction {
75 #[instrument(name = "transaction::query::new", level = "debug", skip_all)]
76 pub fn new(multi: MultiReadTransaction, single: SingleTransaction, identity: IdentityId) -> Self {
77 Self {
78 multi,
79 single,
80 identity,
81 executor: None,
82 }
83 }
84
85 pub fn set_executor(&mut self, executor: Arc<dyn RqlExecutor>) {
86 self.executor = Some(executor);
87 }
88
89 pub fn rql(&mut self, rql: &str, params: Params) -> ExecutionResult {
90 let executor = self.executor.clone().expect("RqlExecutor not set");
91 executor.rql(&mut Transaction::Query(self), rql, params)
92 }
93
94 #[inline]
95 pub fn version(&self) -> CommitVersion {
96 self.multi.version()
97 }
98
99 #[inline]
100 pub fn id(&self) -> TransactionId {
101 self.multi.tm.id()
102 }
103
104 #[inline]
105 pub fn get(&mut self, key: &EncodedKey) -> Result<Option<MultiVersionRow>> {
106 Ok(self.multi.get(key)?.map(|v| v.into_multi_version_row()))
107 }
108
109 #[inline]
110 pub fn contains_key(&mut self, key: &EncodedKey) -> Result<bool> {
111 self.multi.contains_key(key)
112 }
113
114 #[inline]
115 pub fn prefix(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
116 self.multi.prefix(prefix)
117 }
118
119 #[inline]
120 pub fn prefix_rev(&mut self, prefix: &EncodedKey) -> Result<MultiVersionBatch> {
121 self.multi.prefix_rev(prefix)
122 }
123
124 #[inline]
125 pub fn read_as_of_version_exclusive(&mut self, version: CommitVersion) -> Result<()> {
126 self.multi.read_as_of_version_exclusive(version);
127 Ok(())
128 }
129
130 #[inline]
131 pub fn range(
132 &self,
133 range: EncodedKeyRange,
134 batch_size: usize,
135 ) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
136 self.multi.range(range, batch_size)
137 }
138
139 #[inline]
140 pub fn range_rev(
141 &self,
142 range: EncodedKeyRange,
143 batch_size: usize,
144 ) -> Box<dyn Iterator<Item = Result<MultiVersionRow>> + Send + '_> {
145 self.multi.range_rev(range, batch_size)
146 }
147
148 #[instrument(name = "transaction::query::with_single_query", level = "trace", skip(self, keys, f))]
149 pub fn with_single_query<'a, I, F, R>(&self, keys: I, f: F) -> Result<R>
150 where
151 I: IntoIterator<Item = &'a EncodedKey> + Send,
152 F: FnOnce(&mut SingleReadTransaction<'_>) -> Result<R> + Send,
153 R: Send,
154 {
155 self.single.with_query(keys, f)
156 }
157
158 #[instrument(name = "transaction::query::with_multi_query", level = "trace", skip(self, f))]
159 pub fn with_multi_query<F, R>(&mut self, f: F) -> Result<R>
160 where
161 F: FnOnce(&mut MultiReadTransaction) -> Result<R>,
162 {
163 f(&mut self.multi)
164 }
165
166 #[instrument(name = "transaction::query::begin_single_query", level = "trace", skip(self, keys))]
167 pub fn begin_single_query<'a, I>(&self, keys: I) -> Result<SingleReadTransaction<'_>>
168 where
169 I: IntoIterator<Item = &'a EncodedKey>,
170 {
171 self.single.begin_query(keys)
172 }
173}
174
175impl TransactionalDictionaryChanges for QueryTransaction {
176 fn find_dictionary(&self, _id: DictionaryId) -> Option<&Dictionary> {
177 None
178 }
179
180 fn find_dictionary_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Dictionary> {
181 None
182 }
183
184 fn is_dictionary_deleted(&self, _id: DictionaryId) -> bool {
185 false
186 }
187
188 fn is_dictionary_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
189 false
190 }
191}
192
193impl TransactionalFlowChanges for QueryTransaction {
194 fn find_flow(&self, _id: FlowId) -> Option<&Flow> {
195 None
196 }
197
198 fn find_flow_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Flow> {
199 None
200 }
201
202 fn is_flow_deleted(&self, _id: FlowId) -> bool {
203 false
204 }
205
206 fn is_flow_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
207 false
208 }
209}
210
211impl TransactionalNamespaceChanges for QueryTransaction {
212 fn find_namespace(&self, _id: NamespaceId) -> Option<&Namespace> {
213 None
214 }
215
216 fn find_namespace_by_name(&self, _name: &str) -> Option<&Namespace> {
217 None
218 }
219
220 fn is_namespace_deleted(&self, _id: NamespaceId) -> bool {
221 false
222 }
223
224 fn is_namespace_deleted_by_name(&self, _name: &str) -> bool {
225 false
226 }
227}
228
229impl TransactionalProcedureChanges for QueryTransaction {
230 fn find_procedure(&self, _id: ProcedureId) -> Option<&Procedure> {
231 None
232 }
233
234 fn find_procedure_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Procedure> {
235 None
236 }
237
238 fn is_procedure_deleted(&self, _id: ProcedureId) -> bool {
239 false
240 }
241
242 fn is_procedure_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
243 false
244 }
245}
246
247impl TransactionalTestChanges for QueryTransaction {
248 fn find_test(&self, _id: TestId) -> Option<&Test> {
249 None
250 }
251
252 fn find_test_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Test> {
253 None
254 }
255
256 fn is_test_deleted(&self, _id: TestId) -> bool {
257 false
258 }
259
260 fn is_test_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
261 false
262 }
263}
264
265impl TransactionalRingBufferChanges for QueryTransaction {
266 fn find_ringbuffer(&self, _id: RingBufferId) -> Option<&RingBuffer> {
267 None
268 }
269
270 fn find_ringbuffer_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&RingBuffer> {
271 None
272 }
273
274 fn is_ringbuffer_deleted(&self, _id: RingBufferId) -> bool {
275 false
276 }
277
278 fn is_ringbuffer_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
279 false
280 }
281}
282
283impl TransactionalSeriesChanges for QueryTransaction {
284 fn find_series(&self, _id: SeriesId) -> Option<&Series> {
285 None
286 }
287
288 fn find_series_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Series> {
289 None
290 }
291
292 fn is_series_deleted(&self, _id: SeriesId) -> bool {
293 false
294 }
295
296 fn is_series_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
297 false
298 }
299}
300
301impl TransactionalTableChanges for QueryTransaction {
302 fn find_table(&self, _id: TableId) -> Option<&Table> {
303 None
304 }
305
306 fn find_table_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Table> {
307 None
308 }
309
310 fn is_table_deleted(&self, _id: TableId) -> bool {
311 false
312 }
313
314 fn is_table_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
315 false
316 }
317}
318
319impl TransactionalViewChanges for QueryTransaction {
320 fn find_view(&self, _id: ViewId) -> Option<&View> {
321 None
322 }
323
324 fn find_view_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&View> {
325 None
326 }
327
328 fn is_view_deleted(&self, _id: ViewId) -> bool {
329 false
330 }
331
332 fn is_view_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
333 false
334 }
335}
336
337impl TransactionalSumTypeChanges for QueryTransaction {
338 fn find_sumtype(&self, _id: SumTypeId) -> Option<&SumType> {
339 None
340 }
341
342 fn find_sumtype_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&SumType> {
343 None
344 }
345
346 fn is_sumtype_deleted(&self, _id: SumTypeId) -> bool {
347 false
348 }
349
350 fn is_sumtype_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
351 false
352 }
353}
354
355impl TransactionalHandlerChanges for QueryTransaction {
356 fn find_handler_by_id(&self, _id: HandlerId) -> Option<&Handler> {
357 None
358 }
359
360 fn find_handler_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Handler> {
361 None
362 }
363
364 fn is_handler_deleted(&self, _id: HandlerId) -> bool {
365 false
366 }
367
368 fn is_handler_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
369 false
370 }
371}
372
373impl TransactionalIdentityChanges for QueryTransaction {
374 fn find_identity(&self, _id: IdentityId) -> Option<&Identity> {
375 None
376 }
377
378 fn find_identity_by_name(&self, _name: &str) -> Option<&Identity> {
379 None
380 }
381
382 fn is_identity_deleted(&self, _id: IdentityId) -> bool {
383 false
384 }
385
386 fn is_identity_deleted_by_name(&self, _name: &str) -> bool {
387 false
388 }
389}
390
391impl TransactionalRoleChanges for QueryTransaction {
392 fn find_role(&self, _id: RoleId) -> Option<&Role> {
393 None
394 }
395
396 fn find_role_by_name(&self, _name: &str) -> Option<&Role> {
397 None
398 }
399
400 fn is_role_deleted(&self, _id: RoleId) -> bool {
401 false
402 }
403
404 fn is_role_deleted_by_name(&self, _name: &str) -> bool {
405 false
406 }
407}
408
409impl TransactionalGrantedRoleChanges for QueryTransaction {
410 fn find_granted_role(&self, _identity: IdentityId, _role: RoleId) -> Option<&GrantedRole> {
411 None
412 }
413
414 fn find_granted_roles_for_identity(&self, _identity: IdentityId) -> Vec<&GrantedRole> {
415 Vec::new()
416 }
417
418 fn is_granted_role_deleted(&self, _identity: IdentityId, _role: RoleId) -> bool {
419 false
420 }
421}
422
423impl TransactionalPolicyChanges for QueryTransaction {
424 fn find_policy(&self, _id: PolicyId) -> Option<&Policy> {
425 None
426 }
427
428 fn find_policy_by_name(&self, _name: &str) -> Option<&Policy> {
429 None
430 }
431
432 fn is_policy_deleted(&self, _id: PolicyId) -> bool {
433 false
434 }
435
436 fn is_policy_deleted_by_name(&self, _name: &str) -> bool {
437 false
438 }
439}
440
441impl TransactionalMigrationChanges for QueryTransaction {
442 fn find_migration(&self, _id: MigrationId) -> Option<&Migration> {
443 None
444 }
445
446 fn find_migration_by_name(&self, _name: &str) -> Option<&Migration> {
447 None
448 }
449
450 fn is_migration_deleted(&self, _id: MigrationId) -> bool {
451 false
452 }
453
454 fn is_migration_deleted_by_name(&self, _name: &str) -> bool {
455 false
456 }
457}
458
459impl TransactionalAuthenticationChanges for QueryTransaction {
460 fn find_authentication(&self, _id: AuthenticationId) -> Option<&Authentication> {
461 None
462 }
463
464 fn find_authentication_by_identity_and_method(
465 &self,
466 _identity: IdentityId,
467 _method: &str,
468 ) -> Option<&Authentication> {
469 None
470 }
471
472 fn is_authentication_deleted(&self, _id: AuthenticationId) -> bool {
473 false
474 }
475
476 fn is_authentication_deleted_by_identity_and_method(&self, _identity: IdentityId, _method: &str) -> bool {
477 false
478 }
479}
480
481impl TransactionalSourceChanges for QueryTransaction {
482 fn find_source(&self, _id: SourceId) -> Option<&Source> {
483 None
484 }
485
486 fn find_source_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Source> {
487 None
488 }
489
490 fn is_source_deleted(&self, _id: SourceId) -> bool {
491 false
492 }
493
494 fn is_source_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
495 false
496 }
497}
498
499impl TransactionalSinkChanges for QueryTransaction {
500 fn find_sink(&self, _id: SinkId) -> Option<&Sink> {
501 None
502 }
503
504 fn find_sink_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Sink> {
505 None
506 }
507
508 fn is_sink_deleted(&self, _id: SinkId) -> bool {
509 false
510 }
511
512 fn is_sink_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
513 false
514 }
515}
516
517impl TransactionalConfigChanges for QueryTransaction {
518 fn find_config(&self, _key: ConfigKey) -> Option<&Config> {
519 None
520 }
521}
522
523impl TransactionalRowTtlChanges for QueryTransaction {
524 fn find_row_ttl(&self, _shape: ShapeId) -> Option<&Ttl> {
525 None
526 }
527
528 fn is_row_ttl_deleted(&self, _shape: ShapeId) -> bool {
529 false
530 }
531}
532
533impl TransactionalOperatorTtlChanges for QueryTransaction {
534 fn find_operator_ttl(&self, _node: FlowNodeId) -> Option<&Ttl> {
535 None
536 }
537
538 fn is_operator_ttl_deleted(&self, _node: FlowNodeId) -> bool {
539 false
540 }
541}
542
543impl TransactionalBindingChanges for QueryTransaction {
544 fn find_binding(&self, _id: BindingId) -> Option<&Binding> {
545 None
546 }
547
548 fn find_binding_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&Binding> {
549 None
550 }
551
552 fn is_binding_deleted(&self, _id: BindingId) -> bool {
553 false
554 }
555
556 fn is_binding_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
557 false
558 }
559}
560
// Empty impl: `QueryTransaction` implements `TransactionalChanges` without defining any items here.
// NOTE(review): presumably an umbrella trait over the per-object `Transactional*Changes` traits — confirm.
impl TransactionalChanges for QueryTransaction {}