1use OperationType::Delete;
5use reifydb_core::{
6 encoded::row::EncodedRow,
7 interface::catalog::{
8 authentication::{Authentication, AuthenticationId},
9 binding::Binding,
10 config::{Config, ConfigKey},
11 dictionary::Dictionary,
12 flow::{Flow, FlowId, FlowNodeId},
13 handler::Handler,
14 id::{
15 BindingId, HandlerId, MigrationEventId, MigrationId, NamespaceId, ProcedureId, RingBufferId,
16 SeriesId, SinkId, SourceId, TableId, TestId, ViewId,
17 },
18 identity::{GrantedRole, Identity, Role, RoleId},
19 migration::{Migration, MigrationEvent},
20 namespace::Namespace,
21 policy::{Policy, PolicyId},
22 procedure::Procedure,
23 ringbuffer::RingBuffer,
24 series::Series,
25 shape::ShapeId,
26 sink::Sink,
27 source::Source,
28 sumtype::SumType,
29 table::Table,
30 test::Test,
31 view::View,
32 },
33 row::Ttl,
34};
35use reifydb_type::value::{dictionary::DictionaryId, identity::IdentityId, row_number::RowNumber, sumtype::SumTypeId};
36
37use crate::TransactionId;
38
/// Umbrella trait that bundles every per-entity `Transactional*Changes` trait declared in
/// this file.
///
/// It declares no methods of its own; a type satisfies it only by implementing all of the
/// listed supertraits.
pub trait TransactionalChanges:
    TransactionalBindingChanges
    + TransactionalDictionaryChanges
    + TransactionalFlowChanges
    + TransactionalHandlerChanges
    + TransactionalMigrationChanges
    + TransactionalNamespaceChanges
    + TransactionalProcedureChanges
    + TransactionalRingBufferChanges
    + TransactionalRoleChanges
    + TransactionalPolicyChanges
    + TransactionalSeriesChanges
    + TransactionalSinkChanges
    + TransactionalSourceChanges
    + TransactionalSumTypeChanges
    + TransactionalTableChanges
    + TransactionalTestChanges
    + TransactionalAuthenticationChanges
    + TransactionalIdentityChanges
    + TransactionalGrantedRoleChanges
    + TransactionalViewChanges
    + TransactionalConfigChanges
    + TransactionalRowTtlChanges
    + TransactionalOperatorTtlChanges
{
}
65
/// Binding lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalBindingChanges {
    /// Returns the binding with this id, or `None` if the implementor has none to report.
    fn find_binding(&self, id: BindingId) -> Option<&Binding>;

    /// Same lookup, keyed by namespace id and name.
    fn find_binding_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Binding>;

    /// Whether the binding with this id is reported as deleted.
    fn is_binding_deleted(&self, id: BindingId) -> bool;

    /// Same check, keyed by namespace id and name.
    fn is_binding_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
75
/// Row TTL lookups and deletion checks, keyed by [`ShapeId`].
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalRowTtlChanges {
    /// Returns the row TTL for this shape, or `None` if the implementor has none to report.
    fn find_row_ttl(&self, shape: ShapeId) -> Option<&Ttl>;

    /// Whether the row TTL for this shape is reported as deleted.
    fn is_row_ttl_deleted(&self, shape: ShapeId) -> bool;
}
81
/// Operator TTL lookups and deletion checks, keyed by [`FlowNodeId`].
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalOperatorTtlChanges {
    /// Returns the TTL for this flow node, or `None` if the implementor has none to report.
    fn find_operator_ttl(&self, node: FlowNodeId) -> Option<&Ttl>;

    /// Whether the TTL for this flow node is reported as deleted.
    fn is_operator_ttl_deleted(&self, node: FlowNodeId) -> bool;
}
87
/// Configuration lookup by [`ConfigKey`].
///
/// Unlike the other traits in this file, no deletion check is declared for configuration.
pub trait TransactionalConfigChanges {
    /// Returns the configuration entry for this key, or `None` if the implementor has none
    /// to report.
    fn find_config(&self, key: ConfigKey) -> Option<&Config>;
}
91
/// Dictionary lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalDictionaryChanges {
    /// Returns the dictionary with this id, or `None` if the implementor has none to report.
    fn find_dictionary(&self, id: DictionaryId) -> Option<&Dictionary>;

    /// Same lookup, keyed by namespace id and name.
    fn find_dictionary_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Dictionary>;

    /// Whether the dictionary with this id is reported as deleted.
    fn is_dictionary_deleted(&self, id: DictionaryId) -> bool;

    /// Same check, keyed by namespace id and name.
    fn is_dictionary_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
101
/// Namespace lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalNamespaceChanges {
    /// Returns the namespace with this id, or `None` if the implementor has none to report.
    fn find_namespace(&self, id: NamespaceId) -> Option<&Namespace>;

    /// Same lookup, keyed by name only.
    fn find_namespace_by_name(&self, name: &str) -> Option<&Namespace>;

    /// Whether the namespace with this id is reported as deleted.
    fn is_namespace_deleted(&self, id: NamespaceId) -> bool;

    /// Same check, keyed by name only.
    fn is_namespace_deleted_by_name(&self, name: &str) -> bool;
}
111
/// Flow lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalFlowChanges {
    /// Returns the flow with this id, or `None` if the implementor has none to report.
    fn find_flow(&self, id: FlowId) -> Option<&Flow>;

    /// Same lookup, keyed by namespace id and name.
    fn find_flow_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Flow>;

    /// Whether the flow with this id is reported as deleted.
    fn is_flow_deleted(&self, id: FlowId) -> bool;

    /// Same check, keyed by namespace id and name.
    fn is_flow_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
121
/// Table lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalTableChanges {
    /// Returns the table with this id, or `None` if the implementor has none to report.
    fn find_table(&self, id: TableId) -> Option<&Table>;

    /// Same lookup, keyed by namespace id and name.
    fn find_table_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Table>;

    /// Whether the table with this id is reported as deleted.
    fn is_table_deleted(&self, id: TableId) -> bool;

    /// Same check, keyed by namespace id and name.
    fn is_table_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
131
/// Procedure lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalProcedureChanges {
    /// Returns the procedure with this id, or `None` if the implementor has none to report.
    fn find_procedure(&self, id: ProcedureId) -> Option<&Procedure>;

    /// Same lookup, keyed by namespace id and name.
    fn find_procedure_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Procedure>;

    /// Whether the procedure with this id is reported as deleted.
    fn is_procedure_deleted(&self, id: ProcedureId) -> bool;

    /// Same check, keyed by namespace id and name.
    fn is_procedure_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
141
/// Lookups and deletion checks for catalog [`Test`] entries.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalTestChanges {
    /// Returns the test with this id, or `None` if the implementor has none to report.
    fn find_test(&self, id: TestId) -> Option<&Test>;

    /// Same lookup, keyed by namespace id and name.
    fn find_test_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Test>;

    /// Whether the test with this id is reported as deleted.
    fn is_test_deleted(&self, id: TestId) -> bool;

    /// Same check, keyed by namespace id and name.
    fn is_test_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
151
/// Ring buffer lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalRingBufferChanges {
    /// Returns the ring buffer with this id, or `None` if the implementor has none to report.
    fn find_ringbuffer(&self, id: RingBufferId) -> Option<&RingBuffer>;

    /// Same lookup, keyed by namespace id and name.
    fn find_ringbuffer_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&RingBuffer>;

    /// Whether the ring buffer with this id is reported as deleted.
    fn is_ringbuffer_deleted(&self, id: RingBufferId) -> bool;

    /// Same check, keyed by namespace id and name.
    fn is_ringbuffer_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
161
/// Series lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalSeriesChanges {
    /// Returns the series with this id, or `None` if the implementor has none to report.
    fn find_series(&self, id: SeriesId) -> Option<&Series>;

    /// Same lookup, keyed by namespace id and name.
    fn find_series_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Series>;

    /// Whether the series with this id is reported as deleted.
    fn is_series_deleted(&self, id: SeriesId) -> bool;

    /// Same check, keyed by namespace id and name.
    fn is_series_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
171
/// View lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalViewChanges {
    /// Returns the view with this id, or `None` if the implementor has none to report.
    fn find_view(&self, id: ViewId) -> Option<&View>;

    /// Same lookup, keyed by namespace id and name.
    fn find_view_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&View>;

    /// Whether the view with this id is reported as deleted.
    fn is_view_deleted(&self, id: ViewId) -> bool;

    /// Same check, keyed by namespace id and name.
    fn is_view_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
181
/// Sum type lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalSumTypeChanges {
    /// Returns the sum type with this id, or `None` if the implementor has none to report.
    fn find_sumtype(&self, id: SumTypeId) -> Option<&SumType>;

    /// Same lookup, keyed by namespace id and name.
    fn find_sumtype_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&SumType>;

    /// Whether the sum type with this id is reported as deleted.
    fn is_sumtype_deleted(&self, id: SumTypeId) -> bool;

    /// Same check, keyed by namespace id and name.
    fn is_sumtype_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
191
/// Handler lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalHandlerChanges {
    /// Returns the handler with this id, or `None` if the implementor has none to report.
    ///
    /// Note the `_by_id` suffix: the sibling traits in this file name the id lookup plain
    /// `find_<entity>`.
    fn find_handler_by_id(&self, id: HandlerId) -> Option<&Handler>;

    /// Same lookup, keyed by namespace id and name.
    fn find_handler_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Handler>;

    /// Whether the handler with this id is reported as deleted.
    fn is_handler_deleted(&self, id: HandlerId) -> bool;

    /// Same check, keyed by namespace id and name.
    fn is_handler_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
201
/// Identity lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalIdentityChanges {
    /// Returns the identity with this id, or `None` if the implementor has none to report.
    fn find_identity(&self, id: IdentityId) -> Option<&Identity>;

    /// Same lookup, keyed by name only.
    fn find_identity_by_name(&self, name: &str) -> Option<&Identity>;

    /// Whether the identity with this id is reported as deleted.
    fn is_identity_deleted(&self, id: IdentityId) -> bool;

    /// Same check, keyed by name only.
    fn is_identity_deleted_by_name(&self, name: &str) -> bool;
}
211
/// Role lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalRoleChanges {
    /// Returns the role with this id, or `None` if the implementor has none to report.
    fn find_role(&self, id: RoleId) -> Option<&Role>;

    /// Same lookup, keyed by name only.
    fn find_role_by_name(&self, name: &str) -> Option<&Role>;

    /// Whether the role with this id is reported as deleted.
    fn is_role_deleted(&self, id: RoleId) -> bool;

    /// Same check, keyed by name only.
    fn is_role_deleted_by_name(&self, name: &str) -> bool;
}
221
/// Authentication lookups and deletion checks.
///
/// Besides the id, an entry can be addressed by the pair of identity id and method string.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalAuthenticationChanges {
    /// Returns the authentication entry with this id, or `None` if the implementor has none
    /// to report.
    fn find_authentication(&self, id: AuthenticationId) -> Option<&Authentication>;

    /// Same lookup, keyed by identity id and authentication method.
    fn find_authentication_by_identity_and_method(
        &self,
        identity: IdentityId,
        method: &str,
    ) -> Option<&Authentication>;

    /// Whether the authentication entry with this id is reported as deleted.
    fn is_authentication_deleted(&self, id: AuthenticationId) -> bool;

    /// Same check, keyed by identity id and authentication method.
    fn is_authentication_deleted_by_identity_and_method(&self, identity: IdentityId, method: &str) -> bool;
}
235
/// Granted-role lookups and deletion checks, keyed by the (identity, role) pair.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalGrantedRoleChanges {
    /// Returns the grant of `role` to `identity`, or `None` if the implementor has none to
    /// report.
    fn find_granted_role(&self, identity: IdentityId, role: RoleId) -> Option<&GrantedRole>;

    /// Returns every grant the implementor reports for `identity`.
    fn find_granted_roles_for_identity(&self, identity: IdentityId) -> Vec<&GrantedRole>;

    /// Whether the grant of `role` to `identity` is reported as deleted.
    fn is_granted_role_deleted(&self, identity: IdentityId, role: RoleId) -> bool;
}
243
/// Policy lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalPolicyChanges {
    /// Returns the policy with this id, or `None` if the implementor has none to report.
    fn find_policy(&self, id: PolicyId) -> Option<&Policy>;

    /// Same lookup, keyed by name only.
    fn find_policy_by_name(&self, name: &str) -> Option<&Policy>;

    /// Whether the policy with this id is reported as deleted.
    fn is_policy_deleted(&self, id: PolicyId) -> bool;

    /// Same check, keyed by name only.
    fn is_policy_deleted_by_name(&self, name: &str) -> bool;
}
253
/// Source lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalSourceChanges {
    /// Returns the source with this id, or `None` if the implementor has none to report.
    fn find_source(&self, id: SourceId) -> Option<&Source>;

    /// Same lookup, keyed by namespace id and name.
    fn find_source_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Source>;

    /// Whether the source with this id is reported as deleted.
    fn is_source_deleted(&self, id: SourceId) -> bool;

    /// Same check, keyed by namespace id and name.
    fn is_source_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
263
/// Sink lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalSinkChanges {
    /// Returns the sink with this id, or `None` if the implementor has none to report.
    fn find_sink(&self, id: SinkId) -> Option<&Sink>;

    /// Same lookup, keyed by namespace id and name.
    fn find_sink_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Sink>;

    /// Whether the sink with this id is reported as deleted.
    fn is_sink_deleted(&self, id: SinkId) -> bool;

    /// Same check, keyed by namespace id and name.
    fn is_sink_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
}
273
/// Migration lookups and deletion checks.
///
/// NOTE(review): only signatures are declared here; the semantics described below are
/// inferred from the names and should be confirmed against the implementors.
pub trait TransactionalMigrationChanges {
    /// Returns the migration with this id, or `None` if the implementor has none to report.
    fn find_migration(&self, id: MigrationId) -> Option<&Migration>;

    /// Same lookup, keyed by name only.
    fn find_migration_by_name(&self, name: &str) -> Option<&Migration>;

    /// Whether the migration with this id is reported as deleted.
    fn is_migration_deleted(&self, id: MigrationId) -> bool;

    /// Same check, keyed by name only.
    fn is_migration_deleted_by_name(&self, name: &str) -> bool;
}
283
/// Catalog changes recorded under a single transaction id.
///
/// Holds one vector of [`Change`]s per catalog entity kind, plus `log`, to which the
/// `add_*_change` methods append one [`Operation`] for every change they record.
#[derive(Default, Debug, Clone)]
pub struct TransactionalCatalogChanges {
    /// Identifier of the transaction these changes belong to.
    pub txn_id: TransactionId,

    /// Recorded configuration changes.
    pub config: Vec<Change<Config>>,

    /// Recorded binding changes.
    pub binding: Vec<Change<Binding>>,

    /// Recorded dictionary changes.
    pub dictionary: Vec<Change<Dictionary>>,

    /// Recorded flow changes.
    pub flow: Vec<Change<Flow>>,

    /// Recorded handler changes.
    pub handler: Vec<Change<Handler>>,

    /// Recorded migration changes.
    pub migration: Vec<Change<Migration>>,

    /// Recorded migration event changes.
    pub migration_event: Vec<Change<MigrationEvent>>,

    /// Recorded namespace changes.
    pub namespace: Vec<Change<Namespace>>,

    /// Recorded procedure changes.
    pub procedure: Vec<Change<Procedure>>,

    /// Recorded ring buffer changes.
    pub ringbuffer: Vec<Change<RingBuffer>>,

    /// Recorded series changes.
    pub series: Vec<Change<Series>>,

    /// Recorded sink changes.
    pub sink: Vec<Change<Sink>>,

    /// Recorded source changes.
    pub source: Vec<Change<Source>>,
    /// Recorded sum type changes.
    pub sumtype: Vec<Change<SumType>>,

    /// Recorded test changes.
    pub test: Vec<Change<Test>>,

    /// Recorded table changes.
    pub table: Vec<Change<Table>>,

    /// Recorded identity changes.
    pub identity: Vec<Change<Identity>>,

    /// Recorded authentication changes.
    pub authentication: Vec<Change<Authentication>>,

    /// Recorded role changes.
    pub role: Vec<Change<Role>>,

    /// Recorded granted-role changes.
    pub granted_role: Vec<Change<GrantedRole>>,

    /// Recorded policy changes.
    pub policy: Vec<Change<Policy>>,

    /// Recorded view changes.
    pub view: Vec<Change<View>>,

    /// Recorded row TTL changes; each state carries the `ShapeId` it applies to.
    pub row_ttl: Vec<Change<(ShapeId, Ttl)>>,

    /// Recorded operator TTL changes; each state carries the `FlowNodeId` it applies to.
    pub operator_ttl: Vec<Change<(FlowNodeId, Ttl)>>,

    /// One entry per recorded change, in the order the changes were added, across all
    /// entity kinds.
    pub log: Vec<Operation>,
}
337
/// Snapshot of how many entries each change vector (and the operation log) held at the
/// moment `TransactionalCatalogChanges::savepoint` was called.
///
/// `TransactionalCatalogChanges::restore_savepoint` truncates every vector back to the
/// length stored here, discarding whatever was recorded afterwards.
///
/// `Debug` is derived so that savepoints can appear in logs and assertion output like the
/// other public types in this module.
#[derive(Debug)]
pub struct CatalogChangesSavepoint {
    binding_len: usize,
    config_len: usize,
    dictionary_len: usize,
    flow_len: usize,
    handler_len: usize,
    migration_len: usize,
    migration_event_len: usize,
    namespace_len: usize,
    procedure_len: usize,
    ringbuffer_len: usize,
    series_len: usize,
    sink_len: usize,
    source_len: usize,
    sumtype_len: usize,
    test_len: usize,
    table_len: usize,
    identity_len: usize,
    authentication_len: usize,
    role_len: usize,
    granted_role_len: usize,
    policy_len: usize,
    view_len: usize,
    row_ttl_len: usize,
    operator_ttl_len: usize,
    // Length of the cross-entity operation log.
    log_len: usize,
}
365
366impl TransactionalCatalogChanges {
367 pub fn savepoint(&self) -> CatalogChangesSavepoint {
368 CatalogChangesSavepoint {
369 binding_len: self.binding.len(),
370 config_len: self.config.len(),
371 dictionary_len: self.dictionary.len(),
372 flow_len: self.flow.len(),
373 handler_len: self.handler.len(),
374 migration_len: self.migration.len(),
375 migration_event_len: self.migration_event.len(),
376 namespace_len: self.namespace.len(),
377 procedure_len: self.procedure.len(),
378 ringbuffer_len: self.ringbuffer.len(),
379 series_len: self.series.len(),
380 sink_len: self.sink.len(),
381 source_len: self.source.len(),
382 sumtype_len: self.sumtype.len(),
383 test_len: self.test.len(),
384 table_len: self.table.len(),
385 identity_len: self.identity.len(),
386 authentication_len: self.authentication.len(),
387 role_len: self.role.len(),
388 granted_role_len: self.granted_role.len(),
389 policy_len: self.policy.len(),
390 view_len: self.view.len(),
391 row_ttl_len: self.row_ttl.len(),
392 operator_ttl_len: self.operator_ttl.len(),
393 log_len: self.log.len(),
394 }
395 }
396
397 pub fn restore_savepoint(&mut self, sp: CatalogChangesSavepoint) {
398 self.binding.truncate(sp.binding_len);
399 self.config.truncate(sp.config_len);
400 self.dictionary.truncate(sp.dictionary_len);
401 self.flow.truncate(sp.flow_len);
402 self.handler.truncate(sp.handler_len);
403 self.migration.truncate(sp.migration_len);
404 self.migration_event.truncate(sp.migration_event_len);
405 self.namespace.truncate(sp.namespace_len);
406 self.procedure.truncate(sp.procedure_len);
407 self.ringbuffer.truncate(sp.ringbuffer_len);
408 self.series.truncate(sp.series_len);
409 self.sink.truncate(sp.sink_len);
410 self.source.truncate(sp.source_len);
411 self.sumtype.truncate(sp.sumtype_len);
412 self.test.truncate(sp.test_len);
413 self.table.truncate(sp.table_len);
414 self.identity.truncate(sp.identity_len);
415 self.authentication.truncate(sp.authentication_len);
416 self.role.truncate(sp.role_len);
417 self.granted_role.truncate(sp.granted_role_len);
418 self.policy.truncate(sp.policy_len);
419 self.view.truncate(sp.view_len);
420 self.row_ttl.truncate(sp.row_ttl_len);
421 self.operator_ttl.truncate(sp.operator_ttl_len);
422 self.log.truncate(sp.log_len);
423 }
424
425 pub fn add_binding_change(&mut self, change: Change<Binding>) {
426 let id = change
427 .post
428 .as_ref()
429 .or(change.pre.as_ref())
430 .map(|b| b.id)
431 .expect("Change must have either pre or post state");
432 let op = change.op;
433 self.binding.push(change);
434 self.log.push(Operation::Binding {
435 id,
436 op,
437 });
438 }
439
440 pub fn add_config_change(&mut self, change: Change<Config>) {
441 let key = change
442 .post
443 .as_ref()
444 .or(change.pre.as_ref())
445 .map(|c| c.key)
446 .expect("Change must have either pre or post state");
447 let op = change.op;
448 self.config.push(change);
449 self.log.push(Operation::Config {
450 key,
451 op,
452 });
453 }
454
455 pub fn add_dictionary_change(&mut self, change: Change<Dictionary>) {
456 let id = change
457 .post
458 .as_ref()
459 .or(change.pre.as_ref())
460 .map(|d| d.id)
461 .expect("Change must have either pre or post state");
462 let op = change.op;
463 self.dictionary.push(change);
464 self.log.push(Operation::Dictionary {
465 id,
466 op,
467 });
468 }
469
470 pub fn add_flow_change(&mut self, change: Change<Flow>) {
471 let id = change
472 .post
473 .as_ref()
474 .or(change.pre.as_ref())
475 .map(|f| f.id)
476 .expect("Change must have either pre or post state");
477 let op = change.op;
478 self.flow.push(change);
479 self.log.push(Operation::Flow {
480 id,
481 op,
482 });
483 }
484
485 pub fn add_namespace_change(&mut self, change: Change<Namespace>) {
486 let id = change
487 .post
488 .as_ref()
489 .or(change.pre.as_ref())
490 .map(|s| s.id())
491 .expect("Change must have either pre or post state");
492 let op = change.op;
493 self.namespace.push(change);
494 self.log.push(Operation::Namespace {
495 id,
496 op,
497 });
498 }
499
500 pub fn add_handler_change(&mut self, change: Change<Handler>) {
501 let id = change
502 .post
503 .as_ref()
504 .or(change.pre.as_ref())
505 .map(|h| h.id)
506 .expect("Change must have either pre or post state");
507 let op = change.op;
508 self.handler.push(change);
509 self.log.push(Operation::Handler {
510 id,
511 op,
512 });
513 }
514
515 pub fn add_migration_change(&mut self, change: Change<Migration>) {
516 let id = change
517 .post
518 .as_ref()
519 .or(change.pre.as_ref())
520 .map(|m| m.id)
521 .expect("Change must have either pre or post state");
522 let op = change.op;
523 self.migration.push(change);
524 self.log.push(Operation::Migration {
525 id,
526 op,
527 });
528 }
529
530 pub fn add_migration_event_change(&mut self, change: Change<MigrationEvent>) {
531 let id = change
532 .post
533 .as_ref()
534 .or(change.pre.as_ref())
535 .map(|e| e.id)
536 .expect("Change must have either pre or post state");
537 let op = change.op;
538 self.migration_event.push(change);
539 self.log.push(Operation::MigrationEvent {
540 id,
541 op,
542 });
543 }
544
545 pub fn add_procedure_change(&mut self, change: Change<Procedure>) {
546 let id = change
547 .post
548 .as_ref()
549 .or(change.pre.as_ref())
550 .map(|p| p.id())
551 .expect("Change must have either pre or post state");
552 let op = change.op;
553 self.procedure.push(change);
554 self.log.push(Operation::Procedure {
555 id,
556 op,
557 });
558 }
559
560 pub fn add_test_change(&mut self, change: Change<Test>) {
561 let id = change
562 .post
563 .as_ref()
564 .or(change.pre.as_ref())
565 .map(|t| t.id)
566 .expect("Change must have either pre or post state");
567 let op = change.op;
568 self.test.push(change);
569 self.log.push(Operation::Test {
570 id,
571 op,
572 });
573 }
574
575 pub fn add_ringbuffer_change(&mut self, change: Change<RingBuffer>) {
576 let id = change
577 .post
578 .as_ref()
579 .or(change.pre.as_ref())
580 .map(|rb| rb.id)
581 .expect("Change must have either pre or post state");
582 let op = change.op;
583 self.ringbuffer.push(change);
584 self.log.push(Operation::RingBuffer {
585 id,
586 op,
587 });
588 }
589
590 pub fn add_series_change(&mut self, change: Change<Series>) {
591 let id = change
592 .post
593 .as_ref()
594 .or(change.pre.as_ref())
595 .map(|s| s.id)
596 .expect("Change must have either pre or post state");
597 let op = change.op;
598 self.series.push(change);
599 self.log.push(Operation::Series {
600 id,
601 op,
602 });
603 }
604
605 pub fn add_sink_change(&mut self, change: Change<Sink>) {
606 let id = change
607 .post
608 .as_ref()
609 .or(change.pre.as_ref())
610 .map(|s| s.id)
611 .expect("Change must have either pre or post state");
612 let op = change.op;
613 self.sink.push(change);
614 self.log.push(Operation::Sink {
615 id,
616 op,
617 });
618 }
619
620 pub fn add_source_change(&mut self, change: Change<Source>) {
621 let id = change
622 .post
623 .as_ref()
624 .or(change.pre.as_ref())
625 .map(|s| s.id)
626 .expect("Change must have either pre or post state");
627 let op = change.op;
628 self.source.push(change);
629 self.log.push(Operation::Source {
630 id,
631 op,
632 });
633 }
634
635 pub fn add_table_change(&mut self, change: Change<Table>) {
636 let id = change
637 .post
638 .as_ref()
639 .or(change.pre.as_ref())
640 .map(|t| t.id)
641 .expect("Change must have either pre or post state");
642 let op = change.op;
643 self.table.push(change);
644 self.log.push(Operation::Table {
645 id,
646 op,
647 });
648 }
649
650 pub fn add_view_change(&mut self, change: Change<View>) {
651 let id = change
652 .post
653 .as_ref()
654 .or(change.pre.as_ref())
655 .map(|v| v.id())
656 .expect("Change must have either pre or post state");
657 let op = change.op;
658 self.view.push(change);
659 self.log.push(Operation::View {
660 id,
661 op,
662 });
663 }
664
665 pub fn add_sumtype_change(&mut self, change: Change<SumType>) {
666 let id = change
667 .post
668 .as_ref()
669 .or(change.pre.as_ref())
670 .map(|s| s.id)
671 .expect("Change must have either pre or post state");
672 let op = change.op;
673 self.sumtype.push(change);
674 self.log.push(Operation::SumType {
675 id,
676 op,
677 });
678 }
679
680 pub fn add_identity_change(&mut self, change: Change<Identity>) {
681 let id = change
682 .post
683 .as_ref()
684 .or(change.pre.as_ref())
685 .map(|u| u.id)
686 .expect("Change must have either pre or post state");
687 let op = change.op;
688 self.identity.push(change);
689 self.log.push(Operation::Identity {
690 id,
691 op,
692 });
693 }
694
695 pub fn add_role_change(&mut self, change: Change<Role>) {
696 let id = change
697 .post
698 .as_ref()
699 .or(change.pre.as_ref())
700 .map(|r| r.id)
701 .expect("Change must have either pre or post state");
702 let op = change.op;
703 self.role.push(change);
704 self.log.push(Operation::Role {
705 id,
706 op,
707 });
708 }
709
710 pub fn add_granted_role_change(&mut self, change: Change<GrantedRole>) {
711 let identity = change
712 .post
713 .as_ref()
714 .or(change.pre.as_ref())
715 .map(|ur| ur.identity)
716 .expect("Change must have either pre or post state");
717 let role = change
718 .post
719 .as_ref()
720 .or(change.pre.as_ref())
721 .map(|ur| ur.role_id)
722 .expect("Change must have either pre or post state");
723 let op = change.op;
724 self.granted_role.push(change);
725 self.log.push(Operation::GrantedRole {
726 identity,
727 role,
728 op,
729 });
730 }
731
732 pub fn add_authentication_change(&mut self, change: Change<Authentication>) {
733 let id = change
734 .post
735 .as_ref()
736 .or(change.pre.as_ref())
737 .map(|a| a.id)
738 .expect("Change must have either pre or post state");
739 let op = change.op;
740 self.authentication.push(change);
741 self.log.push(Operation::Authentication {
742 id,
743 op,
744 });
745 }
746
747 pub fn add_policy_change(&mut self, change: Change<Policy>) {
748 let id = change
749 .post
750 .as_ref()
751 .or(change.pre.as_ref())
752 .map(|p| p.id)
753 .expect("Change must have either pre or post state");
754 let op = change.op;
755 self.policy.push(change);
756 self.log.push(Operation::Policy {
757 id,
758 op,
759 });
760 }
761
762 pub fn add_row_ttl_change(&mut self, change: Change<(ShapeId, Ttl)>) {
763 let shape = change
764 .post
765 .as_ref()
766 .or(change.pre.as_ref())
767 .map(|(s, _)| *s)
768 .expect("Change must have either pre or post state");
769 let op = change.op;
770 self.row_ttl.push(change);
771 self.log.push(Operation::Ttl {
772 shape,
773 op,
774 });
775 }
776
777 pub fn add_operator_ttl_change(&mut self, change: Change<(FlowNodeId, Ttl)>) {
778 let node = change
779 .post
780 .as_ref()
781 .or(change.pre.as_ref())
782 .map(|(n, _)| *n)
783 .expect("Change must have either pre or post state");
784 let op = change.op;
785 self.operator_ttl.push(change);
786 self.log.push(Operation::OperatorTtl {
787 node,
788 op,
789 });
790 }
791}
792
/// A single recorded change to a catalog entity of type `T`.
///
/// The `add_*_change` methods of `TransactionalCatalogChanges` require at least one of
/// `pre` / `post` to be `Some` and panic otherwise. The lookups in this file treat an entry
/// with no `post`, a matching `pre` and `op == Delete` as a deletion.
///
/// NOTE(review): presumably a create carries only `post`, an update both, and a delete only
/// `pre` — confirm with the code that builds these values.
#[derive(Debug, Clone)]
pub struct Change<T> {
    /// State before the operation, if any.
    pub pre: Option<T>,

    /// State after the operation, if any.
    pub post: Option<T>,

    /// Kind of operation this change represents.
    pub op: OperationType,
}
801
/// Kind of operation a [`Change`] / [`Operation`] represents.
///
/// Equality on this field-less enum is total, so `Eq` is derived alongside `PartialEq`;
/// `Hash` is derived so the type can be used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OperationType {
    /// The entity was created.
    Create,
    /// The entity was modified.
    Update,
    /// The entity was removed.
    Delete,
}
808
/// Entry of `TransactionalCatalogChanges::log`: identifies which catalog entity a recorded
/// change touched and with which [`OperationType`].
///
/// Every variant carries the identifier of the affected entity plus `op`. The
/// `add_*_change` methods push exactly one of these per recorded change.
#[derive(Debug, Clone)]
pub enum Operation {
    Binding {
        id: BindingId,
        op: OperationType,
    },
    Config {
        key: ConfigKey,
        op: OperationType,
    },
    Dictionary {
        id: DictionaryId,
        op: OperationType,
    },
    Flow {
        id: FlowId,
        op: OperationType,
    },
    Handler {
        id: HandlerId,
        op: OperationType,
    },
    Migration {
        id: MigrationId,
        op: OperationType,
    },
    MigrationEvent {
        id: MigrationEventId,
        op: OperationType,
    },
    Namespace {
        id: NamespaceId,
        op: OperationType,
    },
    Procedure {
        id: ProcedureId,
        op: OperationType,
    },
    RingBuffer {
        id: RingBufferId,
        op: OperationType,
    },
    Series {
        id: SeriesId,
        op: OperationType,
    },
    Sink {
        id: SinkId,
        op: OperationType,
    },
    Source {
        id: SourceId,
        op: OperationType,
    },
    SumType {
        id: SumTypeId,
        op: OperationType,
    },
    Test {
        id: TestId,
        op: OperationType,
    },
    Table {
        id: TableId,
        op: OperationType,
    },
    Identity {
        id: IdentityId,
        op: OperationType,
    },
    Authentication {
        id: AuthenticationId,
        op: OperationType,
    },
    Role {
        id: RoleId,
        op: OperationType,
    },
    /// A role grant, identified by the (identity, role) pair rather than a single id.
    GrantedRole {
        identity: IdentityId,
        role: RoleId,
        op: OperationType,
    },
    Policy {
        id: PolicyId,
        op: OperationType,
    },
    View {
        id: ViewId,
        op: OperationType,
    },
    /// A row TTL change (pushed by `add_row_ttl_change`), keyed by shape.
    Ttl {
        shape: ShapeId,
        op: OperationType,
    },
    /// An operator TTL change (pushed by `add_operator_ttl_change`), keyed by flow node.
    OperatorTtl {
        node: FlowNodeId,
        op: OperationType,
    },
}
909
910impl TransactionalCatalogChanges {
911 pub fn new(txn_id: TransactionId) -> Self {
912 Self {
913 txn_id,
914 binding: Vec::new(),
915 config: Vec::new(),
916 dictionary: Vec::new(),
917 flow: Vec::new(),
918 handler: Vec::new(),
919 migration: Vec::new(),
920 migration_event: Vec::new(),
921 namespace: Vec::new(),
922 procedure: Vec::new(),
923 ringbuffer: Vec::new(),
924 series: Vec::new(),
925 sink: Vec::new(),
926 source: Vec::new(),
927 sumtype: Vec::new(),
928 test: Vec::new(),
929 table: Vec::new(),
930 identity: Vec::new(),
931 authentication: Vec::new(),
932 role: Vec::new(),
933 granted_role: Vec::new(),
934 policy: Vec::new(),
935 view: Vec::new(),
936 row_ttl: Vec::new(),
937 operator_ttl: Vec::new(),
938 log: Vec::new(),
939 }
940 }
941
942 pub fn table_exists(&self, id: TableId) -> bool {
943 self.get_table(id).is_some()
944 }
945
946 pub fn get_table(&self, id: TableId) -> Option<&Table> {
947 for change in self.table.iter().rev() {
948 if let Some(table) = &change.post {
949 if table.id == id {
950 return Some(table);
951 }
952 } else if let Some(table) = &change.pre
953 && table.id == id && change.op == Delete
954 {
955 return None;
956 }
957 }
958 None
959 }
960
961 pub fn view_exists(&self, id: ViewId) -> bool {
962 self.get_view(id).is_some()
963 }
964
965 pub fn get_view(&self, id: ViewId) -> Option<&View> {
966 for change in self.view.iter().rev() {
967 if let Some(view) = &change.post {
968 if view.id() == id {
969 return Some(view);
970 }
971 } else if let Some(view) = &change.pre
972 && view.id() == id && change.op == Delete
973 {
974 return None;
975 }
976 }
977 None
978 }
979
980 pub fn get_row_ttl(&self, shape: ShapeId) -> Option<&Ttl> {
981 for change in self.row_ttl.iter().rev() {
982 if let Some((s, ttl)) = &change.post {
983 if *s == shape {
984 return Some(ttl);
985 }
986 } else if let Some((s, _)) = &change.pre
987 && *s == shape && change.op == Delete
988 {
989 return None;
990 }
991 }
992 None
993 }
994
995 pub fn get_pending_changes(&self) -> &[Operation] {
996 &self.log
997 }
998
999 pub fn txn_id(&self) -> TransactionId {
1000 self.txn_id
1001 }
1002
1003 pub fn namespace(&self) -> &[Change<Namespace>] {
1004 &self.namespace
1005 }
1006
1007 pub fn table(&self) -> &[Change<Table>] {
1008 &self.table
1009 }
1010
1011 pub fn view(&self) -> &[Change<View>] {
1012 &self.view
1013 }
1014
1015 pub fn clear(&mut self) {
1016 self.binding.clear();
1017 self.config.clear();
1018 self.dictionary.clear();
1019 self.flow.clear();
1020 self.handler.clear();
1021 self.migration.clear();
1022 self.migration_event.clear();
1023 self.namespace.clear();
1024 self.procedure.clear();
1025 self.ringbuffer.clear();
1026 self.series.clear();
1027 self.sink.clear();
1028 self.source.clear();
1029 self.sumtype.clear();
1030 self.test.clear();
1031 self.table.clear();
1032 self.identity.clear();
1033 self.authentication.clear();
1034 self.role.clear();
1035 self.granted_role.clear();
1036 self.policy.clear();
1037 self.view.clear();
1038 self.log.clear();
1039 }
1040}
1041
/// Describes one row inserted into a table.
#[derive(Debug, Clone)]
pub struct TableRowInsertion {
    /// Table the row was inserted into.
    pub table_id: TableId,
    /// Row number of the inserted row.
    pub row_number: RowNumber,
    /// The row in its encoded form.
    pub encoded: EncodedRow,
}
1048
/// A row-level change. Only table inserts are modelled at present.
#[derive(Debug, Clone)]
pub enum RowChange {
    /// A row was inserted into a table.
    TableInsert(TableRowInsertion),
}