1use OperationType::Delete;
5use reifydb_core::{
6 encoded::row::EncodedRow,
7 interface::catalog::{
8 authentication::{Authentication, AuthenticationId},
9 binding::Binding,
10 config::{Config, ConfigKey},
11 dictionary::Dictionary,
12 flow::{Flow, FlowId},
13 handler::Handler,
14 id::{
15 BindingId, HandlerId, MigrationEventId, MigrationId, NamespaceId, ProcedureId, RingBufferId,
16 SeriesId, SinkId, SourceId, TableId, TestId, ViewId,
17 },
18 identity::{GrantedRole, Identity, Role, RoleId},
19 migration::{Migration, MigrationEvent},
20 namespace::Namespace,
21 policy::{Policy, PolicyId},
22 procedure::Procedure,
23 ringbuffer::RingBuffer,
24 series::Series,
25 shape::ShapeId,
26 sink::Sink,
27 source::Source,
28 sumtype::SumType,
29 table::Table,
30 test::Test,
31 view::View,
32 },
33 row::RowTtl,
34};
35use reifydb_type::value::{dictionary::DictionaryId, identity::IdentityId, row_number::RowNumber, sumtype::SumTypeId};
36
37use crate::TransactionId;
38
/// Umbrella trait that bundles every per-entity `Transactional*Changes` trait
/// listed below as a supertrait.
///
/// It declares no methods of its own; a type can only implement it once it
/// implements all of the listed supertraits.
pub trait TransactionalChanges:
    TransactionalBindingChanges
    + TransactionalDictionaryChanges
    + TransactionalFlowChanges
    + TransactionalHandlerChanges
    + TransactionalMigrationChanges
    + TransactionalNamespaceChanges
    + TransactionalProcedureChanges
    + TransactionalRingBufferChanges
    + TransactionalRoleChanges
    + TransactionalPolicyChanges
    + TransactionalSeriesChanges
    + TransactionalSinkChanges
    + TransactionalSourceChanges
    + TransactionalSumTypeChanges
    + TransactionalTableChanges
    + TransactionalTestChanges
    + TransactionalAuthenticationChanges
    + TransactionalIdentityChanges
    + TransactionalGrantedRoleChanges
    + TransactionalViewChanges
    + TransactionalConfigChanges
    + TransactionalRowTtlChanges
{
}
64
/// Lookups for `Binding` entries and their deletion status, addressed either by
/// `BindingId` or by a `NamespaceId` plus name.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalBindingChanges {
    /// Looks up a binding by id; `None` when the implementor has nothing to report.
    fn find_binding(&self, id: BindingId) -> Option<&Binding>;

    /// Looks up a binding by namespace and name.
    fn find_binding_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Binding>;

    /// Reports whether the binding with this id is considered deleted.
    fn is_binding_deleted(&self, id: BindingId) -> bool;

    /// Reports whether the binding with this namespace and name is considered deleted.
    fn is_binding_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
74
/// Lookups for `RowTtl` settings and their deletion status, keyed by `ShapeId`.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalRowTtlChanges {
    /// Looks up the row TTL for `shape`; `None` when the implementor has nothing to report.
    fn find_row_ttl(&self, shape: ShapeId) -> Option<&RowTtl>;

    /// Reports whether the row TTL for `shape` is considered deleted.
    fn is_row_ttl_deleted(&self, shape: ShapeId) -> bool;
}
80
/// Lookup for `Config` entries keyed by `ConfigKey`.
///
/// Unlike the sibling traits, no deletion query is declared here.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answer reflects changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalConfigChanges {
    /// Looks up the config entry for `key`; `None` when the implementor has nothing to report.
    fn find_config(&self, key: ConfigKey) -> Option<&Config>;
}
84
/// Lookups for `Dictionary` entries and their deletion status, addressed either
/// by `DictionaryId` or by a `NamespaceId` plus name.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalDictionaryChanges {
    /// Looks up a dictionary by id; `None` when the implementor has nothing to report.
    fn find_dictionary(&self, id: DictionaryId) -> Option<&Dictionary>;

    /// Looks up a dictionary by namespace and name.
    fn find_dictionary_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Dictionary>;

    /// Reports whether the dictionary with this id is considered deleted.
    fn is_dictionary_deleted(&self, id: DictionaryId) -> bool;

    /// Reports whether the dictionary with this namespace and name is considered deleted.
    fn is_dictionary_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
94
/// Lookups for `Namespace` entries and their deletion status, addressed either
/// by `NamespaceId` or by name alone.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalNamespaceChanges {
    /// Looks up a namespace by id; `None` when the implementor has nothing to report.
    fn find_namespace(&self, id: NamespaceId) -> Option<&Namespace>;

    /// Looks up a namespace by name.
    fn find_namespace_by_name(&self, name: &str) -> Option<&Namespace>;

    /// Reports whether the namespace with this id is considered deleted.
    fn is_namespace_deleted(&self, id: NamespaceId) -> bool;

    /// Reports whether the namespace with this name is considered deleted.
    fn is_namespace_deleted_by_name(&self, name: &str) -> bool;
}
104
/// Lookups for `Flow` entries and their deletion status, addressed either by
/// `FlowId` or by a `NamespaceId` plus name.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalFlowChanges {
    /// Looks up a flow by id; `None` when the implementor has nothing to report.
    fn find_flow(&self, id: FlowId) -> Option<&Flow>;

    /// Looks up a flow by namespace and name.
    fn find_flow_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Flow>;

    /// Reports whether the flow with this id is considered deleted.
    fn is_flow_deleted(&self, id: FlowId) -> bool;

    /// Reports whether the flow with this namespace and name is considered deleted.
    fn is_flow_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
114
/// Lookups for `Table` entries and their deletion status, addressed either by
/// `TableId` or by a `NamespaceId` plus name.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalTableChanges {
    /// Looks up a table by id; `None` when the implementor has nothing to report.
    fn find_table(&self, id: TableId) -> Option<&Table>;

    /// Looks up a table by namespace and name.
    fn find_table_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Table>;

    /// Reports whether the table with this id is considered deleted.
    fn is_table_deleted(&self, id: TableId) -> bool;

    /// Reports whether the table with this namespace and name is considered deleted.
    fn is_table_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
124
/// Lookups for `Procedure` entries and their deletion status, addressed either
/// by `ProcedureId` or by a `NamespaceId` plus name.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalProcedureChanges {
    /// Looks up a procedure by id; `None` when the implementor has nothing to report.
    fn find_procedure(&self, id: ProcedureId) -> Option<&Procedure>;

    /// Looks up a procedure by namespace and name.
    fn find_procedure_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Procedure>;

    /// Reports whether the procedure with this id is considered deleted.
    fn is_procedure_deleted(&self, id: ProcedureId) -> bool;

    /// Reports whether the procedure with this namespace and name is considered deleted.
    fn is_procedure_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
134
/// Lookups for `Test` entries and their deletion status, addressed either by
/// `TestId` or by a `NamespaceId` plus name.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalTestChanges {
    /// Looks up a test by id; `None` when the implementor has nothing to report.
    fn find_test(&self, id: TestId) -> Option<&Test>;

    /// Looks up a test by namespace and name.
    fn find_test_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Test>;

    /// Reports whether the test with this id is considered deleted.
    fn is_test_deleted(&self, id: TestId) -> bool;

    /// Reports whether the test with this namespace and name is considered deleted.
    fn is_test_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
144
/// Lookups for `RingBuffer` entries and their deletion status, addressed either
/// by `RingBufferId` or by a `NamespaceId` plus name.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalRingBufferChanges {
    /// Looks up a ring buffer by id; `None` when the implementor has nothing to report.
    fn find_ringbuffer(&self, id: RingBufferId) -> Option<&RingBuffer>;

    /// Looks up a ring buffer by namespace and name.
    fn find_ringbuffer_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&RingBuffer>;

    /// Reports whether the ring buffer with this id is considered deleted.
    fn is_ringbuffer_deleted(&self, id: RingBufferId) -> bool;

    /// Reports whether the ring buffer with this namespace and name is considered deleted.
    fn is_ringbuffer_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
154
/// Lookups for `Series` entries and their deletion status, addressed either by
/// `SeriesId` or by a `NamespaceId` plus name.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalSeriesChanges {
    /// Looks up a series by id; `None` when the implementor has nothing to report.
    fn find_series(&self, id: SeriesId) -> Option<&Series>;

    /// Looks up a series by namespace and name.
    fn find_series_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Series>;

    /// Reports whether the series with this id is considered deleted.
    fn is_series_deleted(&self, id: SeriesId) -> bool;

    /// Reports whether the series with this namespace and name is considered deleted.
    fn is_series_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
164
/// Lookups for `View` entries and their deletion status, addressed either by
/// `ViewId` or by a `NamespaceId` plus name.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalViewChanges {
    /// Looks up a view by id; `None` when the implementor has nothing to report.
    fn find_view(&self, id: ViewId) -> Option<&View>;

    /// Looks up a view by namespace and name.
    fn find_view_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&View>;

    /// Reports whether the view with this id is considered deleted.
    fn is_view_deleted(&self, id: ViewId) -> bool;

    /// Reports whether the view with this namespace and name is considered deleted.
    fn is_view_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
174
/// Lookups for `SumType` entries and their deletion status, addressed either by
/// `SumTypeId` or by a `NamespaceId` plus name.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalSumTypeChanges {
    /// Looks up a sum type by id; `None` when the implementor has nothing to report.
    fn find_sumtype(&self, id: SumTypeId) -> Option<&SumType>;

    /// Looks up a sum type by namespace and name.
    fn find_sumtype_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&SumType>;

    /// Reports whether the sum type with this id is considered deleted.
    fn is_sumtype_deleted(&self, id: SumTypeId) -> bool;

    /// Reports whether the sum type with this namespace and name is considered deleted.
    fn is_sumtype_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
184
/// Lookups for `Handler` entries and their deletion status, addressed either by
/// `HandlerId` or by a `NamespaceId` plus name.
///
/// Note the id lookup is named `find_handler_by_id`, whereas the sibling traits
/// use the bare `find_<entity>` form.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalHandlerChanges {
    /// Looks up a handler by id; `None` when the implementor has nothing to report.
    fn find_handler_by_id(&self, id: HandlerId) -> Option<&Handler>;

    /// Looks up a handler by namespace and name.
    fn find_handler_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Handler>;

    /// Reports whether the handler with this id is considered deleted.
    fn is_handler_deleted(&self, id: HandlerId) -> bool;

    /// Reports whether the handler with this namespace and name is considered deleted.
    fn is_handler_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
194
/// Lookups for `Identity` entries and their deletion status, addressed either by
/// `IdentityId` or by name alone.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalIdentityChanges {
    /// Looks up an identity by id; `None` when the implementor has nothing to report.
    fn find_identity(&self, id: IdentityId) -> Option<&Identity>;

    /// Looks up an identity by name.
    fn find_identity_by_name(&self, name: &str) -> Option<&Identity>;

    /// Reports whether the identity with this id is considered deleted.
    fn is_identity_deleted(&self, id: IdentityId) -> bool;

    /// Reports whether the identity with this name is considered deleted.
    fn is_identity_deleted_by_name(&self, name: &str) -> bool;
}
204
/// Lookups for `Role` entries and their deletion status, addressed either by
/// `RoleId` or by name alone.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalRoleChanges {
    /// Looks up a role by id; `None` when the implementor has nothing to report.
    fn find_role(&self, id: RoleId) -> Option<&Role>;

    /// Looks up a role by name.
    fn find_role_by_name(&self, name: &str) -> Option<&Role>;

    /// Reports whether the role with this id is considered deleted.
    fn is_role_deleted(&self, id: RoleId) -> bool;

    /// Reports whether the role with this name is considered deleted.
    fn is_role_deleted_by_name(&self, name: &str) -> bool;
}
214
/// Lookups for `Authentication` entries and their deletion status, addressed
/// either by `AuthenticationId` or by an `IdentityId` plus a method string.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types. The accepted values of `method` are not
/// defined here.
pub trait TransactionalAuthenticationChanges {
    /// Looks up an authentication entry by id; `None` when the implementor has
    /// nothing to report.
    fn find_authentication(&self, id: AuthenticationId) -> Option<&Authentication>;

    /// Looks up an authentication entry by identity and method.
    fn find_authentication_by_identity_and_method(
        &self,
        identity: IdentityId,
        method: &str,
    ) -> Option<&Authentication>;

    /// Reports whether the authentication entry with this id is considered deleted.
    fn is_authentication_deleted(&self, id: AuthenticationId) -> bool;

    /// Reports whether the authentication entry for this identity and method is
    /// considered deleted.
    fn is_authentication_deleted_by_identity_and_method(&self, identity: IdentityId, method: &str) -> bool;
}
228
/// Lookups for `GrantedRole` entries and their deletion status, addressed by the
/// pair of `IdentityId` and `RoleId`.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalGrantedRoleChanges {
    /// Looks up the grant of `role` to `identity`; `None` when the implementor has
    /// nothing to report.
    fn find_granted_role(&self, identity: IdentityId, role: RoleId) -> Option<&GrantedRole>;

    /// Collects the grants the implementor knows about for `identity`.
    fn find_granted_roles_for_identity(&self, identity: IdentityId) -> Vec<&GrantedRole>;

    /// Reports whether the grant of `role` to `identity` is considered deleted.
    fn is_granted_role_deleted(&self, identity: IdentityId, role: RoleId) -> bool;
}
236
/// Lookups for `Policy` entries and their deletion status, addressed either by
/// `PolicyId` or by name alone.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalPolicyChanges {
    /// Looks up a policy by id; `None` when the implementor has nothing to report.
    fn find_policy(&self, id: PolicyId) -> Option<&Policy>;

    /// Looks up a policy by name.
    fn find_policy_by_name(&self, name: &str) -> Option<&Policy>;

    /// Reports whether the policy with this id is considered deleted.
    fn is_policy_deleted(&self, id: PolicyId) -> bool;

    /// Reports whether the policy with this name is considered deleted.
    fn is_policy_deleted_by_name(&self, name: &str) -> bool;
}
246
/// Lookups for `Source` entries and their deletion status, addressed either by
/// `SourceId` or by a `NamespaceId` plus name.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalSourceChanges {
    /// Looks up a source by id; `None` when the implementor has nothing to report.
    fn find_source(&self, id: SourceId) -> Option<&Source>;

    /// Looks up a source by namespace and name.
    fn find_source_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Source>;

    /// Reports whether the source with this id is considered deleted.
    fn is_source_deleted(&self, id: SourceId) -> bool;

    /// Reports whether the source with this namespace and name is considered deleted.
    fn is_source_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
256
/// Lookups for `Sink` entries and their deletion status, addressed either by
/// `SinkId` or by a `NamespaceId` plus name.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalSinkChanges {
    /// Looks up a sink by id; `None` when the implementor has nothing to report.
    fn find_sink(&self, id: SinkId) -> Option<&Sink>;

    /// Looks up a sink by namespace and name.
    fn find_sink_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Sink>;

    /// Reports whether the sink with this id is considered deleted.
    fn is_sink_deleted(&self, id: SinkId) -> bool;

    /// Reports whether the sink with this namespace and name is considered deleted.
    fn is_sink_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
266
/// Lookups for `Migration` entries and their deletion status, addressed either
/// by `MigrationId` or by name alone.
///
/// NOTE(review): implementors are not visible in this section; judging by the
/// trait name the answers reflect changes made inside a transaction — confirm
/// against the implementing types.
pub trait TransactionalMigrationChanges {
    /// Looks up a migration by id; `None` when the implementor has nothing to report.
    fn find_migration(&self, id: MigrationId) -> Option<&Migration>;

    /// Looks up a migration by name.
    fn find_migration_by_name(&self, name: &str) -> Option<&Migration>;

    /// Reports whether the migration with this id is considered deleted.
    fn is_migration_deleted(&self, id: MigrationId) -> bool;

    /// Reports whether the migration with this name is considered deleted.
    fn is_migration_deleted_by_name(&self, name: &str) -> bool;
}
276
/// Accumulates the catalog changes recorded under one `TransactionId`.
///
/// Every entity kind has its own list of [`Change`] values. The `add_*_change`
/// methods append to the matching list and also push one [`Operation`] onto
/// `log`, so `log` holds the order in which those methods were called across
/// all entity kinds.
#[derive(Default, Debug, Clone)]
pub struct TransactionalCatalogChanges {
    /// Transaction id supplied to `new`.
    pub txn_id: TransactionId,
    /// Recorded `Config` changes.
    pub config: Vec<Change<Config>>,
    /// Recorded `Binding` changes.
    pub binding: Vec<Change<Binding>>,
    /// Recorded `Dictionary` changes.
    pub dictionary: Vec<Change<Dictionary>>,
    /// Recorded `Flow` changes.
    pub flow: Vec<Change<Flow>>,
    /// Recorded `Handler` changes.
    pub handler: Vec<Change<Handler>>,
    /// Recorded `Migration` changes.
    pub migration: Vec<Change<Migration>>,
    /// Recorded `MigrationEvent` changes.
    pub migration_event: Vec<Change<MigrationEvent>>,
    /// Recorded `Namespace` changes.
    pub namespace: Vec<Change<Namespace>>,
    /// Recorded `Procedure` changes.
    pub procedure: Vec<Change<Procedure>>,
    /// Recorded `RingBuffer` changes.
    pub ringbuffer: Vec<Change<RingBuffer>>,
    /// Recorded `Series` changes.
    pub series: Vec<Change<Series>>,
    /// Recorded `Sink` changes.
    pub sink: Vec<Change<Sink>>,
    /// Recorded `Source` changes.
    pub source: Vec<Change<Source>>,
    /// Recorded `SumType` changes.
    pub sumtype: Vec<Change<SumType>>,
    /// Recorded `Test` changes.
    pub test: Vec<Change<Test>>,
    /// Recorded `Table` changes.
    pub table: Vec<Change<Table>>,
    /// Recorded `Identity` changes.
    pub identity: Vec<Change<Identity>>,
    /// Recorded `Authentication` changes.
    pub authentication: Vec<Change<Authentication>>,
    /// Recorded `Role` changes.
    pub role: Vec<Change<Role>>,
    /// Recorded `GrantedRole` changes.
    pub granted_role: Vec<Change<GrantedRole>>,
    /// Recorded `Policy` changes.
    pub policy: Vec<Change<Policy>>,
    /// Recorded `View` changes.
    pub view: Vec<Change<View>>,
    /// Recorded row-TTL changes; each state carries the `ShapeId` it applies to.
    pub row_ttl: Vec<Change<(ShapeId, RowTtl)>>,
    /// One entry per recorded change, in the order the changes were added.
    pub log: Vec<Operation>,
}
329
/// Lengths of every change list (and of the log) of a
/// `TransactionalCatalogChanges` at the moment `savepoint` was called.
///
/// `restore_savepoint` truncates each list back to the length stored here, which
/// drops everything appended after the savepoint was taken.
pub struct CatalogChangesSavepoint {
    binding_len: usize,
    config_len: usize,
    dictionary_len: usize,
    flow_len: usize,
    handler_len: usize,
    migration_len: usize,
    migration_event_len: usize,
    namespace_len: usize,
    procedure_len: usize,
    ringbuffer_len: usize,
    series_len: usize,
    sink_len: usize,
    source_len: usize,
    sumtype_len: usize,
    test_len: usize,
    table_len: usize,
    identity_len: usize,
    authentication_len: usize,
    role_len: usize,
    granted_role_len: usize,
    policy_len: usize,
    view_len: usize,
    row_ttl_len: usize,
    // Length of the cross-entity operation log.
    log_len: usize,
}
356
357impl TransactionalCatalogChanges {
358 pub fn savepoint(&self) -> CatalogChangesSavepoint {
359 CatalogChangesSavepoint {
360 binding_len: self.binding.len(),
361 config_len: self.config.len(),
362 dictionary_len: self.dictionary.len(),
363 flow_len: self.flow.len(),
364 handler_len: self.handler.len(),
365 migration_len: self.migration.len(),
366 migration_event_len: self.migration_event.len(),
367 namespace_len: self.namespace.len(),
368 procedure_len: self.procedure.len(),
369 ringbuffer_len: self.ringbuffer.len(),
370 series_len: self.series.len(),
371 sink_len: self.sink.len(),
372 source_len: self.source.len(),
373 sumtype_len: self.sumtype.len(),
374 test_len: self.test.len(),
375 table_len: self.table.len(),
376 identity_len: self.identity.len(),
377 authentication_len: self.authentication.len(),
378 role_len: self.role.len(),
379 granted_role_len: self.granted_role.len(),
380 policy_len: self.policy.len(),
381 view_len: self.view.len(),
382 row_ttl_len: self.row_ttl.len(),
383 log_len: self.log.len(),
384 }
385 }
386
387 pub fn restore_savepoint(&mut self, sp: CatalogChangesSavepoint) {
388 self.binding.truncate(sp.binding_len);
389 self.config.truncate(sp.config_len);
390 self.dictionary.truncate(sp.dictionary_len);
391 self.flow.truncate(sp.flow_len);
392 self.handler.truncate(sp.handler_len);
393 self.migration.truncate(sp.migration_len);
394 self.migration_event.truncate(sp.migration_event_len);
395 self.namespace.truncate(sp.namespace_len);
396 self.procedure.truncate(sp.procedure_len);
397 self.ringbuffer.truncate(sp.ringbuffer_len);
398 self.series.truncate(sp.series_len);
399 self.sink.truncate(sp.sink_len);
400 self.source.truncate(sp.source_len);
401 self.sumtype.truncate(sp.sumtype_len);
402 self.test.truncate(sp.test_len);
403 self.table.truncate(sp.table_len);
404 self.identity.truncate(sp.identity_len);
405 self.authentication.truncate(sp.authentication_len);
406 self.role.truncate(sp.role_len);
407 self.granted_role.truncate(sp.granted_role_len);
408 self.policy.truncate(sp.policy_len);
409 self.view.truncate(sp.view_len);
410 self.row_ttl.truncate(sp.row_ttl_len);
411 self.log.truncate(sp.log_len);
412 }
413
414 pub fn add_binding_change(&mut self, change: Change<Binding>) {
415 let id = change
416 .post
417 .as_ref()
418 .or(change.pre.as_ref())
419 .map(|b| b.id)
420 .expect("Change must have either pre or post state");
421 let op = change.op;
422 self.binding.push(change);
423 self.log.push(Operation::Binding {
424 id,
425 op,
426 });
427 }
428
429 pub fn add_config_change(&mut self, change: Change<Config>) {
430 let key = change
431 .post
432 .as_ref()
433 .or(change.pre.as_ref())
434 .map(|c| c.key)
435 .expect("Change must have either pre or post state");
436 let op = change.op;
437 self.config.push(change);
438 self.log.push(Operation::Config {
439 key,
440 op,
441 });
442 }
443
444 pub fn add_dictionary_change(&mut self, change: Change<Dictionary>) {
445 let id = change
446 .post
447 .as_ref()
448 .or(change.pre.as_ref())
449 .map(|d| d.id)
450 .expect("Change must have either pre or post state");
451 let op = change.op;
452 self.dictionary.push(change);
453 self.log.push(Operation::Dictionary {
454 id,
455 op,
456 });
457 }
458
459 pub fn add_flow_change(&mut self, change: Change<Flow>) {
460 let id = change
461 .post
462 .as_ref()
463 .or(change.pre.as_ref())
464 .map(|f| f.id)
465 .expect("Change must have either pre or post state");
466 let op = change.op;
467 self.flow.push(change);
468 self.log.push(Operation::Flow {
469 id,
470 op,
471 });
472 }
473
474 pub fn add_namespace_change(&mut self, change: Change<Namespace>) {
475 let id = change
476 .post
477 .as_ref()
478 .or(change.pre.as_ref())
479 .map(|s| s.id())
480 .expect("Change must have either pre or post state");
481 let op = change.op;
482 self.namespace.push(change);
483 self.log.push(Operation::Namespace {
484 id,
485 op,
486 });
487 }
488
489 pub fn add_handler_change(&mut self, change: Change<Handler>) {
490 let id = change
491 .post
492 .as_ref()
493 .or(change.pre.as_ref())
494 .map(|h| h.id)
495 .expect("Change must have either pre or post state");
496 let op = change.op;
497 self.handler.push(change);
498 self.log.push(Operation::Handler {
499 id,
500 op,
501 });
502 }
503
504 pub fn add_migration_change(&mut self, change: Change<Migration>) {
505 let id = change
506 .post
507 .as_ref()
508 .or(change.pre.as_ref())
509 .map(|m| m.id)
510 .expect("Change must have either pre or post state");
511 let op = change.op;
512 self.migration.push(change);
513 self.log.push(Operation::Migration {
514 id,
515 op,
516 });
517 }
518
519 pub fn add_migration_event_change(&mut self, change: Change<MigrationEvent>) {
520 let id = change
521 .post
522 .as_ref()
523 .or(change.pre.as_ref())
524 .map(|e| e.id)
525 .expect("Change must have either pre or post state");
526 let op = change.op;
527 self.migration_event.push(change);
528 self.log.push(Operation::MigrationEvent {
529 id,
530 op,
531 });
532 }
533
534 pub fn add_procedure_change(&mut self, change: Change<Procedure>) {
535 let id = change
536 .post
537 .as_ref()
538 .or(change.pre.as_ref())
539 .map(|p| p.id())
540 .expect("Change must have either pre or post state");
541 let op = change.op;
542 self.procedure.push(change);
543 self.log.push(Operation::Procedure {
544 id,
545 op,
546 });
547 }
548
549 pub fn add_test_change(&mut self, change: Change<Test>) {
550 let id = change
551 .post
552 .as_ref()
553 .or(change.pre.as_ref())
554 .map(|t| t.id)
555 .expect("Change must have either pre or post state");
556 let op = change.op;
557 self.test.push(change);
558 self.log.push(Operation::Test {
559 id,
560 op,
561 });
562 }
563
564 pub fn add_ringbuffer_change(&mut self, change: Change<RingBuffer>) {
565 let id = change
566 .post
567 .as_ref()
568 .or(change.pre.as_ref())
569 .map(|rb| rb.id)
570 .expect("Change must have either pre or post state");
571 let op = change.op;
572 self.ringbuffer.push(change);
573 self.log.push(Operation::RingBuffer {
574 id,
575 op,
576 });
577 }
578
579 pub fn add_series_change(&mut self, change: Change<Series>) {
580 let id = change
581 .post
582 .as_ref()
583 .or(change.pre.as_ref())
584 .map(|s| s.id)
585 .expect("Change must have either pre or post state");
586 let op = change.op;
587 self.series.push(change);
588 self.log.push(Operation::Series {
589 id,
590 op,
591 });
592 }
593
594 pub fn add_sink_change(&mut self, change: Change<Sink>) {
595 let id = change
596 .post
597 .as_ref()
598 .or(change.pre.as_ref())
599 .map(|s| s.id)
600 .expect("Change must have either pre or post state");
601 let op = change.op;
602 self.sink.push(change);
603 self.log.push(Operation::Sink {
604 id,
605 op,
606 });
607 }
608
609 pub fn add_source_change(&mut self, change: Change<Source>) {
610 let id = change
611 .post
612 .as_ref()
613 .or(change.pre.as_ref())
614 .map(|s| s.id)
615 .expect("Change must have either pre or post state");
616 let op = change.op;
617 self.source.push(change);
618 self.log.push(Operation::Source {
619 id,
620 op,
621 });
622 }
623
624 pub fn add_table_change(&mut self, change: Change<Table>) {
625 let id = change
626 .post
627 .as_ref()
628 .or(change.pre.as_ref())
629 .map(|t| t.id)
630 .expect("Change must have either pre or post state");
631 let op = change.op;
632 self.table.push(change);
633 self.log.push(Operation::Table {
634 id,
635 op,
636 });
637 }
638
639 pub fn add_view_change(&mut self, change: Change<View>) {
640 let id = change
641 .post
642 .as_ref()
643 .or(change.pre.as_ref())
644 .map(|v| v.id())
645 .expect("Change must have either pre or post state");
646 let op = change.op;
647 self.view.push(change);
648 self.log.push(Operation::View {
649 id,
650 op,
651 });
652 }
653
654 pub fn add_sumtype_change(&mut self, change: Change<SumType>) {
655 let id = change
656 .post
657 .as_ref()
658 .or(change.pre.as_ref())
659 .map(|s| s.id)
660 .expect("Change must have either pre or post state");
661 let op = change.op;
662 self.sumtype.push(change);
663 self.log.push(Operation::SumType {
664 id,
665 op,
666 });
667 }
668
669 pub fn add_identity_change(&mut self, change: Change<Identity>) {
670 let id = change
671 .post
672 .as_ref()
673 .or(change.pre.as_ref())
674 .map(|u| u.id)
675 .expect("Change must have either pre or post state");
676 let op = change.op;
677 self.identity.push(change);
678 self.log.push(Operation::Identity {
679 id,
680 op,
681 });
682 }
683
684 pub fn add_role_change(&mut self, change: Change<Role>) {
685 let id = change
686 .post
687 .as_ref()
688 .or(change.pre.as_ref())
689 .map(|r| r.id)
690 .expect("Change must have either pre or post state");
691 let op = change.op;
692 self.role.push(change);
693 self.log.push(Operation::Role {
694 id,
695 op,
696 });
697 }
698
699 pub fn add_granted_role_change(&mut self, change: Change<GrantedRole>) {
700 let identity = change
701 .post
702 .as_ref()
703 .or(change.pre.as_ref())
704 .map(|ur| ur.identity)
705 .expect("Change must have either pre or post state");
706 let role = change
707 .post
708 .as_ref()
709 .or(change.pre.as_ref())
710 .map(|ur| ur.role_id)
711 .expect("Change must have either pre or post state");
712 let op = change.op;
713 self.granted_role.push(change);
714 self.log.push(Operation::GrantedRole {
715 identity,
716 role,
717 op,
718 });
719 }
720
721 pub fn add_authentication_change(&mut self, change: Change<Authentication>) {
722 let id = change
723 .post
724 .as_ref()
725 .or(change.pre.as_ref())
726 .map(|a| a.id)
727 .expect("Change must have either pre or post state");
728 let op = change.op;
729 self.authentication.push(change);
730 self.log.push(Operation::Authentication {
731 id,
732 op,
733 });
734 }
735
736 pub fn add_policy_change(&mut self, change: Change<Policy>) {
737 let id = change
738 .post
739 .as_ref()
740 .or(change.pre.as_ref())
741 .map(|p| p.id)
742 .expect("Change must have either pre or post state");
743 let op = change.op;
744 self.policy.push(change);
745 self.log.push(Operation::Policy {
746 id,
747 op,
748 });
749 }
750
751 pub fn add_row_ttl_change(&mut self, change: Change<(ShapeId, RowTtl)>) {
752 let shape = change
753 .post
754 .as_ref()
755 .or(change.pre.as_ref())
756 .map(|(s, _)| *s)
757 .expect("Change must have either pre or post state");
758 let op = change.op;
759 self.row_ttl.push(change);
760 self.log.push(Operation::RowTtl {
761 shape,
762 op,
763 });
764 }
765}
766
/// One recorded change to a catalog entity of type `T`.
///
/// The `add_*_change` methods of `TransactionalCatalogChanges` read the entity
/// from `post`, falling back to `pre`, and panic when both are `None`.
#[derive(Debug, Clone)]
pub struct Change<T> {
    /// Presumably the entity state before the operation; the lookups in this file
    /// consult it only when `post` is `None` (the delete case).
    pub pre: Option<T>,

    /// Presumably the entity state after the operation; the lookups in this file
    /// return it as the entity's current value.
    pub post: Option<T>,

    /// Kind of operation this change represents.
    pub op: OperationType,
}
779
/// Kind of operation carried by a `Change` and by each `Operation` log entry.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OperationType {
    Create,
    Update,
    /// The `get_*` lookups in this file compare against this variant to detect
    /// that an entity was removed.
    Delete,
}
786
/// Entry of the `TransactionalCatalogChanges::log` list: names the entity that a
/// recorded change touched and the kind of operation applied to it.
///
/// There is one variant per entity kind. Each carries the entity's key — an id
/// for most kinds, a `ConfigKey` for `Config`, an identity/role pair for
/// `GrantedRole` and a `ShapeId` for `RowTtl` — plus the `OperationType`.
#[derive(Debug, Clone)]
pub enum Operation {
    Binding {
        id: BindingId,
        op: OperationType,
    },
    Config {
        key: ConfigKey,
        op: OperationType,
    },
    Dictionary {
        id: DictionaryId,
        op: OperationType,
    },
    Flow {
        id: FlowId,
        op: OperationType,
    },
    Handler {
        id: HandlerId,
        op: OperationType,
    },
    Migration {
        id: MigrationId,
        op: OperationType,
    },
    MigrationEvent {
        id: MigrationEventId,
        op: OperationType,
    },
    Namespace {
        id: NamespaceId,
        op: OperationType,
    },
    Procedure {
        id: ProcedureId,
        op: OperationType,
    },
    RingBuffer {
        id: RingBufferId,
        op: OperationType,
    },
    Series {
        id: SeriesId,
        op: OperationType,
    },
    Sink {
        id: SinkId,
        op: OperationType,
    },
    Source {
        id: SourceId,
        op: OperationType,
    },
    SumType {
        id: SumTypeId,
        op: OperationType,
    },
    Test {
        id: TestId,
        op: OperationType,
    },
    Table {
        id: TableId,
        op: OperationType,
    },
    Identity {
        id: IdentityId,
        op: OperationType,
    },
    Authentication {
        id: AuthenticationId,
        op: OperationType,
    },
    Role {
        id: RoleId,
        op: OperationType,
    },
    GrantedRole {
        identity: IdentityId,
        role: RoleId,
        op: OperationType,
    },
    Policy {
        id: PolicyId,
        op: OperationType,
    },
    View {
        id: ViewId,
        op: OperationType,
    },
    RowTtl {
        shape: ShapeId,
        op: OperationType,
    },
}
884
885impl TransactionalCatalogChanges {
886 pub fn new(txn_id: TransactionId) -> Self {
887 Self {
888 txn_id,
889 binding: Vec::new(),
890 config: Vec::new(),
891 dictionary: Vec::new(),
892 flow: Vec::new(),
893 handler: Vec::new(),
894 migration: Vec::new(),
895 migration_event: Vec::new(),
896 namespace: Vec::new(),
897 procedure: Vec::new(),
898 ringbuffer: Vec::new(),
899 series: Vec::new(),
900 sink: Vec::new(),
901 source: Vec::new(),
902 sumtype: Vec::new(),
903 test: Vec::new(),
904 table: Vec::new(),
905 identity: Vec::new(),
906 authentication: Vec::new(),
907 role: Vec::new(),
908 granted_role: Vec::new(),
909 policy: Vec::new(),
910 view: Vec::new(),
911 row_ttl: Vec::new(),
912 log: Vec::new(),
913 }
914 }
915
916 pub fn table_exists(&self, id: TableId) -> bool {
918 self.get_table(id).is_some()
919 }
920
921 pub fn get_table(&self, id: TableId) -> Option<&Table> {
923 for change in self.table.iter().rev() {
925 if let Some(table) = &change.post {
926 if table.id == id {
927 return Some(table);
928 }
929 } else if let Some(table) = &change.pre
930 && table.id == id && change.op == Delete
931 {
932 return None;
934 }
935 }
936 None
937 }
938
939 pub fn view_exists(&self, id: ViewId) -> bool {
941 self.get_view(id).is_some()
942 }
943
944 pub fn get_view(&self, id: ViewId) -> Option<&View> {
946 for change in self.view.iter().rev() {
948 if let Some(view) = &change.post {
949 if view.id() == id {
950 return Some(view);
951 }
952 } else if let Some(view) = &change.pre
953 && view.id() == id && change.op == Delete
954 {
955 return None;
957 }
958 }
959 None
960 }
961
962 pub fn get_row_ttl(&self, shape: ShapeId) -> Option<&RowTtl> {
964 for change in self.row_ttl.iter().rev() {
966 if let Some((s, ttl)) = &change.post {
967 if *s == shape {
968 return Some(ttl);
969 }
970 } else if let Some((s, _)) = &change.pre
971 && *s == shape && change.op == Delete
972 {
973 return None;
975 }
976 }
977 None
978 }
979
980 pub fn get_pending_changes(&self) -> &[Operation] {
982 &self.log
983 }
984
985 pub fn txn_id(&self) -> TransactionId {
987 self.txn_id
988 }
989
990 pub fn namespace(&self) -> &[Change<Namespace>] {
992 &self.namespace
993 }
994
995 pub fn table(&self) -> &[Change<Table>] {
997 &self.table
998 }
999
1000 pub fn view(&self) -> &[Change<View>] {
1002 &self.view
1003 }
1004
1005 pub fn clear(&mut self) {
1007 self.binding.clear();
1008 self.config.clear();
1009 self.dictionary.clear();
1010 self.flow.clear();
1011 self.handler.clear();
1012 self.migration.clear();
1013 self.migration_event.clear();
1014 self.namespace.clear();
1015 self.procedure.clear();
1016 self.ringbuffer.clear();
1017 self.series.clear();
1018 self.sink.clear();
1019 self.source.clear();
1020 self.sumtype.clear();
1021 self.test.clear();
1022 self.table.clear();
1023 self.identity.clear();
1024 self.authentication.clear();
1025 self.role.clear();
1026 self.granted_role.clear();
1027 self.policy.clear();
1028 self.view.clear();
1029 self.log.clear();
1030 }
1031}
1032
/// Data for one row inserted into a table: the target table, the row number and
/// the encoded row.
///
/// NOTE(review): nothing in this section constructs or consumes this type;
/// confirm its usage against callers.
#[derive(Debug, Clone)]
pub struct TableRowInsertion {
    /// Table the row belongs to.
    pub table_id: TableId,
    /// Row number of the inserted row.
    pub row_number: RowNumber,
    /// The row in its encoded form.
    pub encoded: EncodedRow,
}
1040
/// A row-level change; the only variant declared here is a table insert.
#[derive(Debug, Clone)]
pub enum RowChange {
    /// A row inserted into a table.
    TableInsert(TableRowInsertion),
}