Skip to main content

reifydb_transaction/
change.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use OperationType::Delete;
5use reifydb_core::{
6	encoded::row::EncodedRow,
7	interface::catalog::{
8		authentication::{Authentication, AuthenticationId},
9		binding::Binding,
10		config::{Config, ConfigKey},
11		dictionary::Dictionary,
12		flow::{Flow, FlowId, FlowNodeId},
13		handler::Handler,
14		id::{
15			BindingId, HandlerId, MigrationEventId, MigrationId, NamespaceId, ProcedureId, RingBufferId,
16			SeriesId, SinkId, SourceId, TableId, TestId, ViewId,
17		},
18		identity::{GrantedRole, Identity, Role, RoleId},
19		migration::{Migration, MigrationEvent},
20		namespace::Namespace,
21		policy::{Policy, PolicyId},
22		procedure::Procedure,
23		ringbuffer::RingBuffer,
24		series::Series,
25		shape::ShapeId,
26		sink::Sink,
27		source::Source,
28		sumtype::SumType,
29		table::Table,
30		test::Test,
31		view::View,
32	},
33	row::Ttl,
34};
35use reifydb_type::value::{dictionary::DictionaryId, identity::IdentityId, row_number::RowNumber, sumtype::SumTypeId};
36
37use crate::TransactionId;
38
39pub trait TransactionalChanges:
40	TransactionalBindingChanges
41	+ TransactionalDictionaryChanges
42	+ TransactionalFlowChanges
43	+ TransactionalHandlerChanges
44	+ TransactionalMigrationChanges
45	+ TransactionalNamespaceChanges
46	+ TransactionalProcedureChanges
47	+ TransactionalRingBufferChanges
48	+ TransactionalRoleChanges
49	+ TransactionalPolicyChanges
50	+ TransactionalSeriesChanges
51	+ TransactionalSinkChanges
52	+ TransactionalSourceChanges
53	+ TransactionalSumTypeChanges
54	+ TransactionalTableChanges
55	+ TransactionalTestChanges
56	+ TransactionalAuthenticationChanges
57	+ TransactionalIdentityChanges
58	+ TransactionalGrantedRoleChanges
59	+ TransactionalViewChanges
60	+ TransactionalConfigChanges
61	+ TransactionalRowTtlChanges
62	+ TransactionalOperatorTtlChanges
63{
64}
65
66pub trait TransactionalBindingChanges {
67	fn find_binding(&self, id: BindingId) -> Option<&Binding>;
68
69	fn find_binding_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Binding>;
70
71	fn is_binding_deleted(&self, id: BindingId) -> bool;
72
73	fn is_binding_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
74}
75
76pub trait TransactionalRowTtlChanges {
77	fn find_row_ttl(&self, shape: ShapeId) -> Option<&Ttl>;
78
79	fn is_row_ttl_deleted(&self, shape: ShapeId) -> bool;
80}
81
82pub trait TransactionalOperatorTtlChanges {
83	fn find_operator_ttl(&self, node: FlowNodeId) -> Option<&Ttl>;
84
85	fn is_operator_ttl_deleted(&self, node: FlowNodeId) -> bool;
86}
87
88pub trait TransactionalConfigChanges {
89	fn find_config(&self, key: ConfigKey) -> Option<&Config>;
90}
91
92pub trait TransactionalDictionaryChanges {
93	fn find_dictionary(&self, id: DictionaryId) -> Option<&Dictionary>;
94
95	fn find_dictionary_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Dictionary>;
96
97	fn is_dictionary_deleted(&self, id: DictionaryId) -> bool;
98
99	fn is_dictionary_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
100}
101
102pub trait TransactionalNamespaceChanges {
103	fn find_namespace(&self, id: NamespaceId) -> Option<&Namespace>;
104
105	fn find_namespace_by_name(&self, name: &str) -> Option<&Namespace>;
106
107	fn is_namespace_deleted(&self, id: NamespaceId) -> bool;
108
109	fn is_namespace_deleted_by_name(&self, name: &str) -> bool;
110}
111
112pub trait TransactionalFlowChanges {
113	fn find_flow(&self, id: FlowId) -> Option<&Flow>;
114
115	fn find_flow_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Flow>;
116
117	fn is_flow_deleted(&self, id: FlowId) -> bool;
118
119	fn is_flow_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
120}
121
122pub trait TransactionalTableChanges {
123	fn find_table(&self, id: TableId) -> Option<&Table>;
124
125	fn find_table_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Table>;
126
127	fn is_table_deleted(&self, id: TableId) -> bool;
128
129	fn is_table_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
130}
131
132pub trait TransactionalProcedureChanges {
133	fn find_procedure(&self, id: ProcedureId) -> Option<&Procedure>;
134
135	fn find_procedure_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Procedure>;
136
137	fn is_procedure_deleted(&self, id: ProcedureId) -> bool;
138
139	fn is_procedure_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
140}
141
142pub trait TransactionalTestChanges {
143	fn find_test(&self, id: TestId) -> Option<&Test>;
144
145	fn find_test_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Test>;
146
147	fn is_test_deleted(&self, id: TestId) -> bool;
148
149	fn is_test_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
150}
151
152pub trait TransactionalRingBufferChanges {
153	fn find_ringbuffer(&self, id: RingBufferId) -> Option<&RingBuffer>;
154
155	fn find_ringbuffer_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&RingBuffer>;
156
157	fn is_ringbuffer_deleted(&self, id: RingBufferId) -> bool;
158
159	fn is_ringbuffer_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
160}
161
162pub trait TransactionalSeriesChanges {
163	fn find_series(&self, id: SeriesId) -> Option<&Series>;
164
165	fn find_series_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Series>;
166
167	fn is_series_deleted(&self, id: SeriesId) -> bool;
168
169	fn is_series_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
170}
171
172pub trait TransactionalViewChanges {
173	fn find_view(&self, id: ViewId) -> Option<&View>;
174
175	fn find_view_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&View>;
176
177	fn is_view_deleted(&self, id: ViewId) -> bool;
178
179	fn is_view_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
180}
181
182pub trait TransactionalSumTypeChanges {
183	fn find_sumtype(&self, id: SumTypeId) -> Option<&SumType>;
184
185	fn find_sumtype_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&SumType>;
186
187	fn is_sumtype_deleted(&self, id: SumTypeId) -> bool;
188
189	fn is_sumtype_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
190}
191
192pub trait TransactionalHandlerChanges {
193	fn find_handler_by_id(&self, id: HandlerId) -> Option<&Handler>;
194
195	fn find_handler_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Handler>;
196
197	fn is_handler_deleted(&self, id: HandlerId) -> bool;
198
199	fn is_handler_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
200}
201
202pub trait TransactionalIdentityChanges {
203	fn find_identity(&self, id: IdentityId) -> Option<&Identity>;
204
205	fn find_identity_by_name(&self, name: &str) -> Option<&Identity>;
206
207	fn is_identity_deleted(&self, id: IdentityId) -> bool;
208
209	fn is_identity_deleted_by_name(&self, name: &str) -> bool;
210}
211
212pub trait TransactionalRoleChanges {
213	fn find_role(&self, id: RoleId) -> Option<&Role>;
214
215	fn find_role_by_name(&self, name: &str) -> Option<&Role>;
216
217	fn is_role_deleted(&self, id: RoleId) -> bool;
218
219	fn is_role_deleted_by_name(&self, name: &str) -> bool;
220}
221
222pub trait TransactionalAuthenticationChanges {
223	fn find_authentication(&self, id: AuthenticationId) -> Option<&Authentication>;
224
225	fn find_authentication_by_identity_and_method(
226		&self,
227		identity: IdentityId,
228		method: &str,
229	) -> Option<&Authentication>;
230
231	fn is_authentication_deleted(&self, id: AuthenticationId) -> bool;
232
233	fn is_authentication_deleted_by_identity_and_method(&self, identity: IdentityId, method: &str) -> bool;
234}
235
236pub trait TransactionalGrantedRoleChanges {
237	fn find_granted_role(&self, identity: IdentityId, role: RoleId) -> Option<&GrantedRole>;
238
239	fn find_granted_roles_for_identity(&self, identity: IdentityId) -> Vec<&GrantedRole>;
240
241	fn is_granted_role_deleted(&self, identity: IdentityId, role: RoleId) -> bool;
242}
243
244pub trait TransactionalPolicyChanges {
245	fn find_policy(&self, id: PolicyId) -> Option<&Policy>;
246
247	fn find_policy_by_name(&self, name: &str) -> Option<&Policy>;
248
249	fn is_policy_deleted(&self, id: PolicyId) -> bool;
250
251	fn is_policy_deleted_by_name(&self, name: &str) -> bool;
252}
253
254pub trait TransactionalSourceChanges {
255	fn find_source(&self, id: SourceId) -> Option<&Source>;
256
257	fn find_source_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Source>;
258
259	fn is_source_deleted(&self, id: SourceId) -> bool;
260
261	fn is_source_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
262}
263
264pub trait TransactionalSinkChanges {
265	fn find_sink(&self, id: SinkId) -> Option<&Sink>;
266
267	fn find_sink_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Sink>;
268
269	fn is_sink_deleted(&self, id: SinkId) -> bool;
270
271	fn is_sink_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
272}
273
274pub trait TransactionalMigrationChanges {
275	fn find_migration(&self, id: MigrationId) -> Option<&Migration>;
276
277	fn find_migration_by_name(&self, name: &str) -> Option<&Migration>;
278
279	fn is_migration_deleted(&self, id: MigrationId) -> bool;
280
281	fn is_migration_deleted_by_name(&self, name: &str) -> bool;
282}
283
284#[derive(Default, Debug, Clone)]
285pub struct TransactionalCatalogChanges {
286	pub txn_id: TransactionId,
287
288	pub config: Vec<Change<Config>>,
289
290	pub binding: Vec<Change<Binding>>,
291
292	pub dictionary: Vec<Change<Dictionary>>,
293
294	pub flow: Vec<Change<Flow>>,
295
296	pub handler: Vec<Change<Handler>>,
297
298	pub migration: Vec<Change<Migration>>,
299
300	pub migration_event: Vec<Change<MigrationEvent>>,
301
302	pub namespace: Vec<Change<Namespace>>,
303
304	pub procedure: Vec<Change<Procedure>>,
305
306	pub ringbuffer: Vec<Change<RingBuffer>>,
307
308	pub series: Vec<Change<Series>>,
309
310	pub sink: Vec<Change<Sink>>,
311
312	pub source: Vec<Change<Source>>,
313	pub sumtype: Vec<Change<SumType>>,
314
315	pub test: Vec<Change<Test>>,
316
317	pub table: Vec<Change<Table>>,
318
319	pub identity: Vec<Change<Identity>>,
320
321	pub authentication: Vec<Change<Authentication>>,
322
323	pub role: Vec<Change<Role>>,
324
325	pub granted_role: Vec<Change<GrantedRole>>,
326
327	pub policy: Vec<Change<Policy>>,
328
329	pub view: Vec<Change<View>>,
330
331	pub row_ttl: Vec<Change<(ShapeId, Ttl)>>,
332
333	pub operator_ttl: Vec<Change<(FlowNodeId, Ttl)>>,
334
335	pub log: Vec<Operation>,
336}
337
338pub struct CatalogChangesSavepoint {
339	binding_len: usize,
340	config_len: usize,
341	dictionary_len: usize,
342	flow_len: usize,
343	handler_len: usize,
344	migration_len: usize,
345	migration_event_len: usize,
346	namespace_len: usize,
347	procedure_len: usize,
348	ringbuffer_len: usize,
349	series_len: usize,
350	sink_len: usize,
351	source_len: usize,
352	sumtype_len: usize,
353	test_len: usize,
354	table_len: usize,
355	identity_len: usize,
356	authentication_len: usize,
357	role_len: usize,
358	granted_role_len: usize,
359	policy_len: usize,
360	view_len: usize,
361	row_ttl_len: usize,
362	operator_ttl_len: usize,
363	log_len: usize,
364}
365
366impl TransactionalCatalogChanges {
367	pub fn savepoint(&self) -> CatalogChangesSavepoint {
368		CatalogChangesSavepoint {
369			binding_len: self.binding.len(),
370			config_len: self.config.len(),
371			dictionary_len: self.dictionary.len(),
372			flow_len: self.flow.len(),
373			handler_len: self.handler.len(),
374			migration_len: self.migration.len(),
375			migration_event_len: self.migration_event.len(),
376			namespace_len: self.namespace.len(),
377			procedure_len: self.procedure.len(),
378			ringbuffer_len: self.ringbuffer.len(),
379			series_len: self.series.len(),
380			sink_len: self.sink.len(),
381			source_len: self.source.len(),
382			sumtype_len: self.sumtype.len(),
383			test_len: self.test.len(),
384			table_len: self.table.len(),
385			identity_len: self.identity.len(),
386			authentication_len: self.authentication.len(),
387			role_len: self.role.len(),
388			granted_role_len: self.granted_role.len(),
389			policy_len: self.policy.len(),
390			view_len: self.view.len(),
391			row_ttl_len: self.row_ttl.len(),
392			operator_ttl_len: self.operator_ttl.len(),
393			log_len: self.log.len(),
394		}
395	}
396
397	pub fn restore_savepoint(&mut self, sp: CatalogChangesSavepoint) {
398		self.binding.truncate(sp.binding_len);
399		self.config.truncate(sp.config_len);
400		self.dictionary.truncate(sp.dictionary_len);
401		self.flow.truncate(sp.flow_len);
402		self.handler.truncate(sp.handler_len);
403		self.migration.truncate(sp.migration_len);
404		self.migration_event.truncate(sp.migration_event_len);
405		self.namespace.truncate(sp.namespace_len);
406		self.procedure.truncate(sp.procedure_len);
407		self.ringbuffer.truncate(sp.ringbuffer_len);
408		self.series.truncate(sp.series_len);
409		self.sink.truncate(sp.sink_len);
410		self.source.truncate(sp.source_len);
411		self.sumtype.truncate(sp.sumtype_len);
412		self.test.truncate(sp.test_len);
413		self.table.truncate(sp.table_len);
414		self.identity.truncate(sp.identity_len);
415		self.authentication.truncate(sp.authentication_len);
416		self.role.truncate(sp.role_len);
417		self.granted_role.truncate(sp.granted_role_len);
418		self.policy.truncate(sp.policy_len);
419		self.view.truncate(sp.view_len);
420		self.row_ttl.truncate(sp.row_ttl_len);
421		self.operator_ttl.truncate(sp.operator_ttl_len);
422		self.log.truncate(sp.log_len);
423	}
424
425	pub fn add_binding_change(&mut self, change: Change<Binding>) {
426		let id = change
427			.post
428			.as_ref()
429			.or(change.pre.as_ref())
430			.map(|b| b.id)
431			.expect("Change must have either pre or post state");
432		let op = change.op;
433		self.binding.push(change);
434		self.log.push(Operation::Binding {
435			id,
436			op,
437		});
438	}
439
440	pub fn add_config_change(&mut self, change: Change<Config>) {
441		let key = change
442			.post
443			.as_ref()
444			.or(change.pre.as_ref())
445			.map(|c| c.key)
446			.expect("Change must have either pre or post state");
447		let op = change.op;
448		self.config.push(change);
449		self.log.push(Operation::Config {
450			key,
451			op,
452		});
453	}
454
455	pub fn add_dictionary_change(&mut self, change: Change<Dictionary>) {
456		let id = change
457			.post
458			.as_ref()
459			.or(change.pre.as_ref())
460			.map(|d| d.id)
461			.expect("Change must have either pre or post state");
462		let op = change.op;
463		self.dictionary.push(change);
464		self.log.push(Operation::Dictionary {
465			id,
466			op,
467		});
468	}
469
470	pub fn add_flow_change(&mut self, change: Change<Flow>) {
471		let id = change
472			.post
473			.as_ref()
474			.or(change.pre.as_ref())
475			.map(|f| f.id)
476			.expect("Change must have either pre or post state");
477		let op = change.op;
478		self.flow.push(change);
479		self.log.push(Operation::Flow {
480			id,
481			op,
482		});
483	}
484
485	pub fn add_namespace_change(&mut self, change: Change<Namespace>) {
486		let id = change
487			.post
488			.as_ref()
489			.or(change.pre.as_ref())
490			.map(|s| s.id())
491			.expect("Change must have either pre or post state");
492		let op = change.op;
493		self.namespace.push(change);
494		self.log.push(Operation::Namespace {
495			id,
496			op,
497		});
498	}
499
500	pub fn add_handler_change(&mut self, change: Change<Handler>) {
501		let id = change
502			.post
503			.as_ref()
504			.or(change.pre.as_ref())
505			.map(|h| h.id)
506			.expect("Change must have either pre or post state");
507		let op = change.op;
508		self.handler.push(change);
509		self.log.push(Operation::Handler {
510			id,
511			op,
512		});
513	}
514
515	pub fn add_migration_change(&mut self, change: Change<Migration>) {
516		let id = change
517			.post
518			.as_ref()
519			.or(change.pre.as_ref())
520			.map(|m| m.id)
521			.expect("Change must have either pre or post state");
522		let op = change.op;
523		self.migration.push(change);
524		self.log.push(Operation::Migration {
525			id,
526			op,
527		});
528	}
529
530	pub fn add_migration_event_change(&mut self, change: Change<MigrationEvent>) {
531		let id = change
532			.post
533			.as_ref()
534			.or(change.pre.as_ref())
535			.map(|e| e.id)
536			.expect("Change must have either pre or post state");
537		let op = change.op;
538		self.migration_event.push(change);
539		self.log.push(Operation::MigrationEvent {
540			id,
541			op,
542		});
543	}
544
545	pub fn add_procedure_change(&mut self, change: Change<Procedure>) {
546		let id = change
547			.post
548			.as_ref()
549			.or(change.pre.as_ref())
550			.map(|p| p.id())
551			.expect("Change must have either pre or post state");
552		let op = change.op;
553		self.procedure.push(change);
554		self.log.push(Operation::Procedure {
555			id,
556			op,
557		});
558	}
559
560	pub fn add_test_change(&mut self, change: Change<Test>) {
561		let id = change
562			.post
563			.as_ref()
564			.or(change.pre.as_ref())
565			.map(|t| t.id)
566			.expect("Change must have either pre or post state");
567		let op = change.op;
568		self.test.push(change);
569		self.log.push(Operation::Test {
570			id,
571			op,
572		});
573	}
574
575	pub fn add_ringbuffer_change(&mut self, change: Change<RingBuffer>) {
576		let id = change
577			.post
578			.as_ref()
579			.or(change.pre.as_ref())
580			.map(|rb| rb.id)
581			.expect("Change must have either pre or post state");
582		let op = change.op;
583		self.ringbuffer.push(change);
584		self.log.push(Operation::RingBuffer {
585			id,
586			op,
587		});
588	}
589
590	pub fn add_series_change(&mut self, change: Change<Series>) {
591		let id = change
592			.post
593			.as_ref()
594			.or(change.pre.as_ref())
595			.map(|s| s.id)
596			.expect("Change must have either pre or post state");
597		let op = change.op;
598		self.series.push(change);
599		self.log.push(Operation::Series {
600			id,
601			op,
602		});
603	}
604
605	pub fn add_sink_change(&mut self, change: Change<Sink>) {
606		let id = change
607			.post
608			.as_ref()
609			.or(change.pre.as_ref())
610			.map(|s| s.id)
611			.expect("Change must have either pre or post state");
612		let op = change.op;
613		self.sink.push(change);
614		self.log.push(Operation::Sink {
615			id,
616			op,
617		});
618	}
619
620	pub fn add_source_change(&mut self, change: Change<Source>) {
621		let id = change
622			.post
623			.as_ref()
624			.or(change.pre.as_ref())
625			.map(|s| s.id)
626			.expect("Change must have either pre or post state");
627		let op = change.op;
628		self.source.push(change);
629		self.log.push(Operation::Source {
630			id,
631			op,
632		});
633	}
634
635	pub fn add_table_change(&mut self, change: Change<Table>) {
636		let id = change
637			.post
638			.as_ref()
639			.or(change.pre.as_ref())
640			.map(|t| t.id)
641			.expect("Change must have either pre or post state");
642		let op = change.op;
643		self.table.push(change);
644		self.log.push(Operation::Table {
645			id,
646			op,
647		});
648	}
649
650	pub fn add_view_change(&mut self, change: Change<View>) {
651		let id = change
652			.post
653			.as_ref()
654			.or(change.pre.as_ref())
655			.map(|v| v.id())
656			.expect("Change must have either pre or post state");
657		let op = change.op;
658		self.view.push(change);
659		self.log.push(Operation::View {
660			id,
661			op,
662		});
663	}
664
665	pub fn add_sumtype_change(&mut self, change: Change<SumType>) {
666		let id = change
667			.post
668			.as_ref()
669			.or(change.pre.as_ref())
670			.map(|s| s.id)
671			.expect("Change must have either pre or post state");
672		let op = change.op;
673		self.sumtype.push(change);
674		self.log.push(Operation::SumType {
675			id,
676			op,
677		});
678	}
679
680	pub fn add_identity_change(&mut self, change: Change<Identity>) {
681		let id = change
682			.post
683			.as_ref()
684			.or(change.pre.as_ref())
685			.map(|u| u.id)
686			.expect("Change must have either pre or post state");
687		let op = change.op;
688		self.identity.push(change);
689		self.log.push(Operation::Identity {
690			id,
691			op,
692		});
693	}
694
695	pub fn add_role_change(&mut self, change: Change<Role>) {
696		let id = change
697			.post
698			.as_ref()
699			.or(change.pre.as_ref())
700			.map(|r| r.id)
701			.expect("Change must have either pre or post state");
702		let op = change.op;
703		self.role.push(change);
704		self.log.push(Operation::Role {
705			id,
706			op,
707		});
708	}
709
710	pub fn add_granted_role_change(&mut self, change: Change<GrantedRole>) {
711		let identity = change
712			.post
713			.as_ref()
714			.or(change.pre.as_ref())
715			.map(|ur| ur.identity)
716			.expect("Change must have either pre or post state");
717		let role = change
718			.post
719			.as_ref()
720			.or(change.pre.as_ref())
721			.map(|ur| ur.role_id)
722			.expect("Change must have either pre or post state");
723		let op = change.op;
724		self.granted_role.push(change);
725		self.log.push(Operation::GrantedRole {
726			identity,
727			role,
728			op,
729		});
730	}
731
732	pub fn add_authentication_change(&mut self, change: Change<Authentication>) {
733		let id = change
734			.post
735			.as_ref()
736			.or(change.pre.as_ref())
737			.map(|a| a.id)
738			.expect("Change must have either pre or post state");
739		let op = change.op;
740		self.authentication.push(change);
741		self.log.push(Operation::Authentication {
742			id,
743			op,
744		});
745	}
746
747	pub fn add_policy_change(&mut self, change: Change<Policy>) {
748		let id = change
749			.post
750			.as_ref()
751			.or(change.pre.as_ref())
752			.map(|p| p.id)
753			.expect("Change must have either pre or post state");
754		let op = change.op;
755		self.policy.push(change);
756		self.log.push(Operation::Policy {
757			id,
758			op,
759		});
760	}
761
762	pub fn add_row_ttl_change(&mut self, change: Change<(ShapeId, Ttl)>) {
763		let shape = change
764			.post
765			.as_ref()
766			.or(change.pre.as_ref())
767			.map(|(s, _)| *s)
768			.expect("Change must have either pre or post state");
769		let op = change.op;
770		self.row_ttl.push(change);
771		self.log.push(Operation::Ttl {
772			shape,
773			op,
774		});
775	}
776
777	pub fn add_operator_ttl_change(&mut self, change: Change<(FlowNodeId, Ttl)>) {
778		let node = change
779			.post
780			.as_ref()
781			.or(change.pre.as_ref())
782			.map(|(n, _)| *n)
783			.expect("Change must have either pre or post state");
784		let op = change.op;
785		self.operator_ttl.push(change);
786		self.log.push(Operation::OperatorTtl {
787			node,
788			op,
789		});
790	}
791}
792
793#[derive(Debug, Clone)]
794pub struct Change<T> {
795	pub pre: Option<T>,
796
797	pub post: Option<T>,
798
799	pub op: OperationType,
800}
801
802#[derive(Debug, Clone, Copy, PartialEq)]
803pub enum OperationType {
804	Create,
805	Update,
806	Delete,
807}
808
809#[derive(Debug, Clone)]
810pub enum Operation {
811	Binding {
812		id: BindingId,
813		op: OperationType,
814	},
815	Config {
816		key: ConfigKey,
817		op: OperationType,
818	},
819	Dictionary {
820		id: DictionaryId,
821		op: OperationType,
822	},
823	Flow {
824		id: FlowId,
825		op: OperationType,
826	},
827	Handler {
828		id: HandlerId,
829		op: OperationType,
830	},
831	Migration {
832		id: MigrationId,
833		op: OperationType,
834	},
835	MigrationEvent {
836		id: MigrationEventId,
837		op: OperationType,
838	},
839	Namespace {
840		id: NamespaceId,
841		op: OperationType,
842	},
843	Procedure {
844		id: ProcedureId,
845		op: OperationType,
846	},
847	RingBuffer {
848		id: RingBufferId,
849		op: OperationType,
850	},
851	Series {
852		id: SeriesId,
853		op: OperationType,
854	},
855	Sink {
856		id: SinkId,
857		op: OperationType,
858	},
859	Source {
860		id: SourceId,
861		op: OperationType,
862	},
863	SumType {
864		id: SumTypeId,
865		op: OperationType,
866	},
867	Test {
868		id: TestId,
869		op: OperationType,
870	},
871	Table {
872		id: TableId,
873		op: OperationType,
874	},
875	Identity {
876		id: IdentityId,
877		op: OperationType,
878	},
879	Authentication {
880		id: AuthenticationId,
881		op: OperationType,
882	},
883	Role {
884		id: RoleId,
885		op: OperationType,
886	},
887	GrantedRole {
888		identity: IdentityId,
889		role: RoleId,
890		op: OperationType,
891	},
892	Policy {
893		id: PolicyId,
894		op: OperationType,
895	},
896	View {
897		id: ViewId,
898		op: OperationType,
899	},
900	Ttl {
901		shape: ShapeId,
902		op: OperationType,
903	},
904	OperatorTtl {
905		node: FlowNodeId,
906		op: OperationType,
907	},
908}
909
910impl TransactionalCatalogChanges {
911	pub fn new(txn_id: TransactionId) -> Self {
912		Self {
913			txn_id,
914			binding: Vec::new(),
915			config: Vec::new(),
916			dictionary: Vec::new(),
917			flow: Vec::new(),
918			handler: Vec::new(),
919			migration: Vec::new(),
920			migration_event: Vec::new(),
921			namespace: Vec::new(),
922			procedure: Vec::new(),
923			ringbuffer: Vec::new(),
924			series: Vec::new(),
925			sink: Vec::new(),
926			source: Vec::new(),
927			sumtype: Vec::new(),
928			test: Vec::new(),
929			table: Vec::new(),
930			identity: Vec::new(),
931			authentication: Vec::new(),
932			role: Vec::new(),
933			granted_role: Vec::new(),
934			policy: Vec::new(),
935			view: Vec::new(),
936			row_ttl: Vec::new(),
937			operator_ttl: Vec::new(),
938			log: Vec::new(),
939		}
940	}
941
942	pub fn table_exists(&self, id: TableId) -> bool {
943		self.get_table(id).is_some()
944	}
945
946	pub fn get_table(&self, id: TableId) -> Option<&Table> {
947		for change in self.table.iter().rev() {
948			if let Some(table) = &change.post {
949				if table.id == id {
950					return Some(table);
951				}
952			} else if let Some(table) = &change.pre
953				&& table.id == id && change.op == Delete
954			{
955				return None;
956			}
957		}
958		None
959	}
960
961	pub fn view_exists(&self, id: ViewId) -> bool {
962		self.get_view(id).is_some()
963	}
964
965	pub fn get_view(&self, id: ViewId) -> Option<&View> {
966		for change in self.view.iter().rev() {
967			if let Some(view) = &change.post {
968				if view.id() == id {
969					return Some(view);
970				}
971			} else if let Some(view) = &change.pre
972				&& view.id() == id && change.op == Delete
973			{
974				return None;
975			}
976		}
977		None
978	}
979
980	pub fn get_row_ttl(&self, shape: ShapeId) -> Option<&Ttl> {
981		for change in self.row_ttl.iter().rev() {
982			if let Some((s, ttl)) = &change.post {
983				if *s == shape {
984					return Some(ttl);
985				}
986			} else if let Some((s, _)) = &change.pre
987				&& *s == shape && change.op == Delete
988			{
989				return None;
990			}
991		}
992		None
993	}
994
995	pub fn get_pending_changes(&self) -> &[Operation] {
996		&self.log
997	}
998
999	pub fn txn_id(&self) -> TransactionId {
1000		self.txn_id
1001	}
1002
1003	pub fn namespace(&self) -> &[Change<Namespace>] {
1004		&self.namespace
1005	}
1006
1007	pub fn table(&self) -> &[Change<Table>] {
1008		&self.table
1009	}
1010
1011	pub fn view(&self) -> &[Change<View>] {
1012		&self.view
1013	}
1014
1015	pub fn clear(&mut self) {
1016		self.binding.clear();
1017		self.config.clear();
1018		self.dictionary.clear();
1019		self.flow.clear();
1020		self.handler.clear();
1021		self.migration.clear();
1022		self.migration_event.clear();
1023		self.namespace.clear();
1024		self.procedure.clear();
1025		self.ringbuffer.clear();
1026		self.series.clear();
1027		self.sink.clear();
1028		self.source.clear();
1029		self.sumtype.clear();
1030		self.test.clear();
1031		self.table.clear();
1032		self.identity.clear();
1033		self.authentication.clear();
1034		self.role.clear();
1035		self.granted_role.clear();
1036		self.policy.clear();
1037		self.view.clear();
1038		self.log.clear();
1039	}
1040}
1041
1042#[derive(Debug, Clone)]
1043pub struct TableRowInsertion {
1044	pub table_id: TableId,
1045	pub row_number: RowNumber,
1046	pub encoded: EncodedRow,
1047}
1048
1049#[derive(Debug, Clone)]
1050pub enum RowChange {
1051	TableInsert(TableRowInsertion),
1052}