Skip to main content

reifydb_transaction/
change.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use OperationType::Delete;
5use reifydb_core::{
6	encoded::row::EncodedRow,
7	interface::catalog::{
8		authentication::{Authentication, AuthenticationId},
9		binding::Binding,
10		config::{Config, ConfigKey},
11		dictionary::Dictionary,
12		flow::{Flow, FlowId},
13		handler::Handler,
14		id::{
15			BindingId, HandlerId, MigrationEventId, MigrationId, NamespaceId, ProcedureId, RingBufferId,
16			SeriesId, SinkId, SourceId, TableId, TestId, ViewId,
17		},
18		identity::{GrantedRole, Identity, Role, RoleId},
19		migration::{Migration, MigrationEvent},
20		namespace::Namespace,
21		policy::{Policy, PolicyId},
22		procedure::Procedure,
23		ringbuffer::RingBuffer,
24		series::Series,
25		shape::ShapeId,
26		sink::Sink,
27		source::Source,
28		sumtype::SumType,
29		table::Table,
30		test::Test,
31		view::View,
32	},
33	row::RowTtl,
34};
35use reifydb_type::value::{dictionary::DictionaryId, identity::IdentityId, row_number::RowNumber, sumtype::SumTypeId};
36
37use crate::TransactionId;
38
39pub trait TransactionalChanges:
40	TransactionalBindingChanges
41	+ TransactionalDictionaryChanges
42	+ TransactionalFlowChanges
43	+ TransactionalHandlerChanges
44	+ TransactionalMigrationChanges
45	+ TransactionalNamespaceChanges
46	+ TransactionalProcedureChanges
47	+ TransactionalRingBufferChanges
48	+ TransactionalRoleChanges
49	+ TransactionalPolicyChanges
50	+ TransactionalSeriesChanges
51	+ TransactionalSinkChanges
52	+ TransactionalSourceChanges
53	+ TransactionalSumTypeChanges
54	+ TransactionalTableChanges
55	+ TransactionalTestChanges
56	+ TransactionalAuthenticationChanges
57	+ TransactionalIdentityChanges
58	+ TransactionalGrantedRoleChanges
59	+ TransactionalViewChanges
60	+ TransactionalConfigChanges
61	+ TransactionalRowTtlChanges
62{
63}
64
65pub trait TransactionalBindingChanges {
66	fn find_binding(&self, id: BindingId) -> Option<&Binding>;
67
68	fn find_binding_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Binding>;
69
70	fn is_binding_deleted(&self, id: BindingId) -> bool;
71
72	fn is_binding_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
73}
74
75pub trait TransactionalRowTtlChanges {
76	fn find_row_ttl(&self, shape: ShapeId) -> Option<&RowTtl>;
77
78	fn is_row_ttl_deleted(&self, shape: ShapeId) -> bool;
79}
80
81pub trait TransactionalConfigChanges {
82	fn find_config(&self, key: ConfigKey) -> Option<&Config>;
83}
84
85pub trait TransactionalDictionaryChanges {
86	fn find_dictionary(&self, id: DictionaryId) -> Option<&Dictionary>;
87
88	fn find_dictionary_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Dictionary>;
89
90	fn is_dictionary_deleted(&self, id: DictionaryId) -> bool;
91
92	fn is_dictionary_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
93}
94
95pub trait TransactionalNamespaceChanges {
96	fn find_namespace(&self, id: NamespaceId) -> Option<&Namespace>;
97
98	fn find_namespace_by_name(&self, name: &str) -> Option<&Namespace>;
99
100	fn is_namespace_deleted(&self, id: NamespaceId) -> bool;
101
102	fn is_namespace_deleted_by_name(&self, name: &str) -> bool;
103}
104
105pub trait TransactionalFlowChanges {
106	fn find_flow(&self, id: FlowId) -> Option<&Flow>;
107
108	fn find_flow_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Flow>;
109
110	fn is_flow_deleted(&self, id: FlowId) -> bool;
111
112	fn is_flow_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
113}
114
115pub trait TransactionalTableChanges {
116	fn find_table(&self, id: TableId) -> Option<&Table>;
117
118	fn find_table_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Table>;
119
120	fn is_table_deleted(&self, id: TableId) -> bool;
121
122	fn is_table_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
123}
124
125pub trait TransactionalProcedureChanges {
126	fn find_procedure(&self, id: ProcedureId) -> Option<&Procedure>;
127
128	fn find_procedure_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Procedure>;
129
130	fn is_procedure_deleted(&self, id: ProcedureId) -> bool;
131
132	fn is_procedure_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
133}
134
135pub trait TransactionalTestChanges {
136	fn find_test(&self, id: TestId) -> Option<&Test>;
137
138	fn find_test_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Test>;
139
140	fn is_test_deleted(&self, id: TestId) -> bool;
141
142	fn is_test_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
143}
144
145pub trait TransactionalRingBufferChanges {
146	fn find_ringbuffer(&self, id: RingBufferId) -> Option<&RingBuffer>;
147
148	fn find_ringbuffer_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&RingBuffer>;
149
150	fn is_ringbuffer_deleted(&self, id: RingBufferId) -> bool;
151
152	fn is_ringbuffer_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
153}
154
155pub trait TransactionalSeriesChanges {
156	fn find_series(&self, id: SeriesId) -> Option<&Series>;
157
158	fn find_series_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Series>;
159
160	fn is_series_deleted(&self, id: SeriesId) -> bool;
161
162	fn is_series_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
163}
164
165pub trait TransactionalViewChanges {
166	fn find_view(&self, id: ViewId) -> Option<&View>;
167
168	fn find_view_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&View>;
169
170	fn is_view_deleted(&self, id: ViewId) -> bool;
171
172	fn is_view_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
173}
174
175pub trait TransactionalSumTypeChanges {
176	fn find_sumtype(&self, id: SumTypeId) -> Option<&SumType>;
177
178	fn find_sumtype_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&SumType>;
179
180	fn is_sumtype_deleted(&self, id: SumTypeId) -> bool;
181
182	fn is_sumtype_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
183}
184
185pub trait TransactionalHandlerChanges {
186	fn find_handler_by_id(&self, id: HandlerId) -> Option<&Handler>;
187
188	fn find_handler_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Handler>;
189
190	fn is_handler_deleted(&self, id: HandlerId) -> bool;
191
192	fn is_handler_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
193}
194
195pub trait TransactionalIdentityChanges {
196	fn find_identity(&self, id: IdentityId) -> Option<&Identity>;
197
198	fn find_identity_by_name(&self, name: &str) -> Option<&Identity>;
199
200	fn is_identity_deleted(&self, id: IdentityId) -> bool;
201
202	fn is_identity_deleted_by_name(&self, name: &str) -> bool;
203}
204
205pub trait TransactionalRoleChanges {
206	fn find_role(&self, id: RoleId) -> Option<&Role>;
207
208	fn find_role_by_name(&self, name: &str) -> Option<&Role>;
209
210	fn is_role_deleted(&self, id: RoleId) -> bool;
211
212	fn is_role_deleted_by_name(&self, name: &str) -> bool;
213}
214
215pub trait TransactionalAuthenticationChanges {
216	fn find_authentication(&self, id: AuthenticationId) -> Option<&Authentication>;
217
218	fn find_authentication_by_identity_and_method(
219		&self,
220		identity: IdentityId,
221		method: &str,
222	) -> Option<&Authentication>;
223
224	fn is_authentication_deleted(&self, id: AuthenticationId) -> bool;
225
226	fn is_authentication_deleted_by_identity_and_method(&self, identity: IdentityId, method: &str) -> bool;
227}
228
229pub trait TransactionalGrantedRoleChanges {
230	fn find_granted_role(&self, identity: IdentityId, role: RoleId) -> Option<&GrantedRole>;
231
232	fn find_granted_roles_for_identity(&self, identity: IdentityId) -> Vec<&GrantedRole>;
233
234	fn is_granted_role_deleted(&self, identity: IdentityId, role: RoleId) -> bool;
235}
236
237pub trait TransactionalPolicyChanges {
238	fn find_policy(&self, id: PolicyId) -> Option<&Policy>;
239
240	fn find_policy_by_name(&self, name: &str) -> Option<&Policy>;
241
242	fn is_policy_deleted(&self, id: PolicyId) -> bool;
243
244	fn is_policy_deleted_by_name(&self, name: &str) -> bool;
245}
246
247pub trait TransactionalSourceChanges {
248	fn find_source(&self, id: SourceId) -> Option<&Source>;
249
250	fn find_source_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Source>;
251
252	fn is_source_deleted(&self, id: SourceId) -> bool;
253
254	fn is_source_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
255}
256
257pub trait TransactionalSinkChanges {
258	fn find_sink(&self, id: SinkId) -> Option<&Sink>;
259
260	fn find_sink_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&Sink>;
261
262	fn is_sink_deleted(&self, id: SinkId) -> bool;
263
264	fn is_sink_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool;
265}
266
267pub trait TransactionalMigrationChanges {
268	fn find_migration(&self, id: MigrationId) -> Option<&Migration>;
269
270	fn find_migration_by_name(&self, name: &str) -> Option<&Migration>;
271
272	fn is_migration_deleted(&self, id: MigrationId) -> bool;
273
274	fn is_migration_deleted_by_name(&self, name: &str) -> bool;
275}
276
277#[derive(Default, Debug, Clone)]
278pub struct TransactionalCatalogChanges {
279	/// Transaction ID this change set belongs to
280	pub txn_id: TransactionId,
281	/// Configuration changes in order
282	pub config: Vec<Change<Config>>,
283	/// All binding definition changes in order (no coalescing)
284	pub binding: Vec<Change<Binding>>,
285	/// All dictionary definition changes in order (no coalescing)
286	pub dictionary: Vec<Change<Dictionary>>,
287	/// All flow definition changes in order (no coalescing)
288	pub flow: Vec<Change<Flow>>,
289	/// All handler definition changes in order (no coalescing)
290	pub handler: Vec<Change<Handler>>,
291	/// All migration definition changes in order (no coalescing)
292	pub migration: Vec<Change<Migration>>,
293	/// All migration event changes in order (no coalescing)
294	pub migration_event: Vec<Change<MigrationEvent>>,
295	/// All namespace definition changes in order (no coalescing)
296	pub namespace: Vec<Change<Namespace>>,
297	/// All procedure definition changes in order (no coalescing)
298	pub procedure: Vec<Change<Procedure>>,
299	/// All ring buffer definition changes in order (no coalescing)
300	pub ringbuffer: Vec<Change<RingBuffer>>,
301	/// All series definition changes in order (no coalescing)
302	pub series: Vec<Change<Series>>,
303	/// All sink definition changes in order (no coalescing)
304	pub sink: Vec<Change<Sink>>,
305	/// All source definition changes in order (no coalescing)
306	pub source: Vec<Change<Source>>,
307	pub sumtype: Vec<Change<SumType>>,
308	/// All test definition changes in order (no coalescing)
309	pub test: Vec<Change<Test>>,
310	/// All table definition changes in order (no coalescing)
311	pub table: Vec<Change<Table>>,
312	/// All identity definition changes in order (no coalescing)
313	pub identity: Vec<Change<Identity>>,
314	/// All authentication definition changes in order (no coalescing)
315	pub authentication: Vec<Change<Authentication>>,
316	/// All role definition changes in order (no coalescing)
317	pub role: Vec<Change<Role>>,
318	/// All granted-role changes in order (no coalescing)
319	pub granted_role: Vec<Change<GrantedRole>>,
320	/// All policy definition changes in order (no coalescing)
321	pub policy: Vec<Change<Policy>>,
322	/// All view definition changes in order (no coalescing)
323	pub view: Vec<Change<View>>,
324	/// All row TTL changes in order (no coalescing)
325	pub row_ttl: Vec<Change<(ShapeId, RowTtl)>>,
326	/// Order of operations for replay/rollback
327	pub log: Vec<Operation>,
328}
329
/// Snapshot of how long each change list of `TransactionalCatalogChanges`
/// (and its operation log) was at a given moment.
///
/// Produced by `TransactionalCatalogChanges::savepoint` and consumed by
/// `TransactionalCatalogChanges::restore_savepoint`, which truncates every
/// list back to the recorded length. The type is intentionally neither
/// `Clone` nor `Copy`, so a savepoint is used up when it is restored.
#[derive(Debug)]
pub struct CatalogChangesSavepoint {
	/// Number of binding changes at capture time.
	binding_len: usize,
	/// Number of config changes at capture time.
	config_len: usize,
	/// Number of dictionary changes at capture time.
	dictionary_len: usize,
	/// Number of flow changes at capture time.
	flow_len: usize,
	/// Number of handler changes at capture time.
	handler_len: usize,
	/// Number of migration changes at capture time.
	migration_len: usize,
	/// Number of migration event changes at capture time.
	migration_event_len: usize,
	/// Number of namespace changes at capture time.
	namespace_len: usize,
	/// Number of procedure changes at capture time.
	procedure_len: usize,
	/// Number of ring buffer changes at capture time.
	ringbuffer_len: usize,
	/// Number of series changes at capture time.
	series_len: usize,
	/// Number of sink changes at capture time.
	sink_len: usize,
	/// Number of source changes at capture time.
	source_len: usize,
	/// Number of sum type changes at capture time.
	sumtype_len: usize,
	/// Number of test changes at capture time.
	test_len: usize,
	/// Number of table changes at capture time.
	table_len: usize,
	/// Number of identity changes at capture time.
	identity_len: usize,
	/// Number of authentication changes at capture time.
	authentication_len: usize,
	/// Number of role changes at capture time.
	role_len: usize,
	/// Number of granted-role changes at capture time.
	granted_role_len: usize,
	/// Number of policy changes at capture time.
	policy_len: usize,
	/// Number of view changes at capture time.
	view_len: usize,
	/// Number of row TTL changes at capture time.
	row_ttl_len: usize,
	/// Number of operation log entries at capture time.
	log_len: usize,
}
356
357impl TransactionalCatalogChanges {
358	pub fn savepoint(&self) -> CatalogChangesSavepoint {
359		CatalogChangesSavepoint {
360			binding_len: self.binding.len(),
361			config_len: self.config.len(),
362			dictionary_len: self.dictionary.len(),
363			flow_len: self.flow.len(),
364			handler_len: self.handler.len(),
365			migration_len: self.migration.len(),
366			migration_event_len: self.migration_event.len(),
367			namespace_len: self.namespace.len(),
368			procedure_len: self.procedure.len(),
369			ringbuffer_len: self.ringbuffer.len(),
370			series_len: self.series.len(),
371			sink_len: self.sink.len(),
372			source_len: self.source.len(),
373			sumtype_len: self.sumtype.len(),
374			test_len: self.test.len(),
375			table_len: self.table.len(),
376			identity_len: self.identity.len(),
377			authentication_len: self.authentication.len(),
378			role_len: self.role.len(),
379			granted_role_len: self.granted_role.len(),
380			policy_len: self.policy.len(),
381			view_len: self.view.len(),
382			row_ttl_len: self.row_ttl.len(),
383			log_len: self.log.len(),
384		}
385	}
386
387	pub fn restore_savepoint(&mut self, sp: CatalogChangesSavepoint) {
388		self.binding.truncate(sp.binding_len);
389		self.config.truncate(sp.config_len);
390		self.dictionary.truncate(sp.dictionary_len);
391		self.flow.truncate(sp.flow_len);
392		self.handler.truncate(sp.handler_len);
393		self.migration.truncate(sp.migration_len);
394		self.migration_event.truncate(sp.migration_event_len);
395		self.namespace.truncate(sp.namespace_len);
396		self.procedure.truncate(sp.procedure_len);
397		self.ringbuffer.truncate(sp.ringbuffer_len);
398		self.series.truncate(sp.series_len);
399		self.sink.truncate(sp.sink_len);
400		self.source.truncate(sp.source_len);
401		self.sumtype.truncate(sp.sumtype_len);
402		self.test.truncate(sp.test_len);
403		self.table.truncate(sp.table_len);
404		self.identity.truncate(sp.identity_len);
405		self.authentication.truncate(sp.authentication_len);
406		self.role.truncate(sp.role_len);
407		self.granted_role.truncate(sp.granted_role_len);
408		self.policy.truncate(sp.policy_len);
409		self.view.truncate(sp.view_len);
410		self.row_ttl.truncate(sp.row_ttl_len);
411		self.log.truncate(sp.log_len);
412	}
413
414	pub fn add_binding_change(&mut self, change: Change<Binding>) {
415		let id = change
416			.post
417			.as_ref()
418			.or(change.pre.as_ref())
419			.map(|b| b.id)
420			.expect("Change must have either pre or post state");
421		let op = change.op;
422		self.binding.push(change);
423		self.log.push(Operation::Binding {
424			id,
425			op,
426		});
427	}
428
429	pub fn add_config_change(&mut self, change: Change<Config>) {
430		let key = change
431			.post
432			.as_ref()
433			.or(change.pre.as_ref())
434			.map(|c| c.key)
435			.expect("Change must have either pre or post state");
436		let op = change.op;
437		self.config.push(change);
438		self.log.push(Operation::Config {
439			key,
440			op,
441		});
442	}
443
444	pub fn add_dictionary_change(&mut self, change: Change<Dictionary>) {
445		let id = change
446			.post
447			.as_ref()
448			.or(change.pre.as_ref())
449			.map(|d| d.id)
450			.expect("Change must have either pre or post state");
451		let op = change.op;
452		self.dictionary.push(change);
453		self.log.push(Operation::Dictionary {
454			id,
455			op,
456		});
457	}
458
459	pub fn add_flow_change(&mut self, change: Change<Flow>) {
460		let id = change
461			.post
462			.as_ref()
463			.or(change.pre.as_ref())
464			.map(|f| f.id)
465			.expect("Change must have either pre or post state");
466		let op = change.op;
467		self.flow.push(change);
468		self.log.push(Operation::Flow {
469			id,
470			op,
471		});
472	}
473
474	pub fn add_namespace_change(&mut self, change: Change<Namespace>) {
475		let id = change
476			.post
477			.as_ref()
478			.or(change.pre.as_ref())
479			.map(|s| s.id())
480			.expect("Change must have either pre or post state");
481		let op = change.op;
482		self.namespace.push(change);
483		self.log.push(Operation::Namespace {
484			id,
485			op,
486		});
487	}
488
489	pub fn add_handler_change(&mut self, change: Change<Handler>) {
490		let id = change
491			.post
492			.as_ref()
493			.or(change.pre.as_ref())
494			.map(|h| h.id)
495			.expect("Change must have either pre or post state");
496		let op = change.op;
497		self.handler.push(change);
498		self.log.push(Operation::Handler {
499			id,
500			op,
501		});
502	}
503
504	pub fn add_migration_change(&mut self, change: Change<Migration>) {
505		let id = change
506			.post
507			.as_ref()
508			.or(change.pre.as_ref())
509			.map(|m| m.id)
510			.expect("Change must have either pre or post state");
511		let op = change.op;
512		self.migration.push(change);
513		self.log.push(Operation::Migration {
514			id,
515			op,
516		});
517	}
518
519	pub fn add_migration_event_change(&mut self, change: Change<MigrationEvent>) {
520		let id = change
521			.post
522			.as_ref()
523			.or(change.pre.as_ref())
524			.map(|e| e.id)
525			.expect("Change must have either pre or post state");
526		let op = change.op;
527		self.migration_event.push(change);
528		self.log.push(Operation::MigrationEvent {
529			id,
530			op,
531		});
532	}
533
534	pub fn add_procedure_change(&mut self, change: Change<Procedure>) {
535		let id = change
536			.post
537			.as_ref()
538			.or(change.pre.as_ref())
539			.map(|p| p.id())
540			.expect("Change must have either pre or post state");
541		let op = change.op;
542		self.procedure.push(change);
543		self.log.push(Operation::Procedure {
544			id,
545			op,
546		});
547	}
548
549	pub fn add_test_change(&mut self, change: Change<Test>) {
550		let id = change
551			.post
552			.as_ref()
553			.or(change.pre.as_ref())
554			.map(|t| t.id)
555			.expect("Change must have either pre or post state");
556		let op = change.op;
557		self.test.push(change);
558		self.log.push(Operation::Test {
559			id,
560			op,
561		});
562	}
563
564	pub fn add_ringbuffer_change(&mut self, change: Change<RingBuffer>) {
565		let id = change
566			.post
567			.as_ref()
568			.or(change.pre.as_ref())
569			.map(|rb| rb.id)
570			.expect("Change must have either pre or post state");
571		let op = change.op;
572		self.ringbuffer.push(change);
573		self.log.push(Operation::RingBuffer {
574			id,
575			op,
576		});
577	}
578
579	pub fn add_series_change(&mut self, change: Change<Series>) {
580		let id = change
581			.post
582			.as_ref()
583			.or(change.pre.as_ref())
584			.map(|s| s.id)
585			.expect("Change must have either pre or post state");
586		let op = change.op;
587		self.series.push(change);
588		self.log.push(Operation::Series {
589			id,
590			op,
591		});
592	}
593
594	pub fn add_sink_change(&mut self, change: Change<Sink>) {
595		let id = change
596			.post
597			.as_ref()
598			.or(change.pre.as_ref())
599			.map(|s| s.id)
600			.expect("Change must have either pre or post state");
601		let op = change.op;
602		self.sink.push(change);
603		self.log.push(Operation::Sink {
604			id,
605			op,
606		});
607	}
608
609	pub fn add_source_change(&mut self, change: Change<Source>) {
610		let id = change
611			.post
612			.as_ref()
613			.or(change.pre.as_ref())
614			.map(|s| s.id)
615			.expect("Change must have either pre or post state");
616		let op = change.op;
617		self.source.push(change);
618		self.log.push(Operation::Source {
619			id,
620			op,
621		});
622	}
623
624	pub fn add_table_change(&mut self, change: Change<Table>) {
625		let id = change
626			.post
627			.as_ref()
628			.or(change.pre.as_ref())
629			.map(|t| t.id)
630			.expect("Change must have either pre or post state");
631		let op = change.op;
632		self.table.push(change);
633		self.log.push(Operation::Table {
634			id,
635			op,
636		});
637	}
638
639	pub fn add_view_change(&mut self, change: Change<View>) {
640		let id = change
641			.post
642			.as_ref()
643			.or(change.pre.as_ref())
644			.map(|v| v.id())
645			.expect("Change must have either pre or post state");
646		let op = change.op;
647		self.view.push(change);
648		self.log.push(Operation::View {
649			id,
650			op,
651		});
652	}
653
654	pub fn add_sumtype_change(&mut self, change: Change<SumType>) {
655		let id = change
656			.post
657			.as_ref()
658			.or(change.pre.as_ref())
659			.map(|s| s.id)
660			.expect("Change must have either pre or post state");
661		let op = change.op;
662		self.sumtype.push(change);
663		self.log.push(Operation::SumType {
664			id,
665			op,
666		});
667	}
668
669	pub fn add_identity_change(&mut self, change: Change<Identity>) {
670		let id = change
671			.post
672			.as_ref()
673			.or(change.pre.as_ref())
674			.map(|u| u.id)
675			.expect("Change must have either pre or post state");
676		let op = change.op;
677		self.identity.push(change);
678		self.log.push(Operation::Identity {
679			id,
680			op,
681		});
682	}
683
684	pub fn add_role_change(&mut self, change: Change<Role>) {
685		let id = change
686			.post
687			.as_ref()
688			.or(change.pre.as_ref())
689			.map(|r| r.id)
690			.expect("Change must have either pre or post state");
691		let op = change.op;
692		self.role.push(change);
693		self.log.push(Operation::Role {
694			id,
695			op,
696		});
697	}
698
699	pub fn add_granted_role_change(&mut self, change: Change<GrantedRole>) {
700		let identity = change
701			.post
702			.as_ref()
703			.or(change.pre.as_ref())
704			.map(|ur| ur.identity)
705			.expect("Change must have either pre or post state");
706		let role = change
707			.post
708			.as_ref()
709			.or(change.pre.as_ref())
710			.map(|ur| ur.role_id)
711			.expect("Change must have either pre or post state");
712		let op = change.op;
713		self.granted_role.push(change);
714		self.log.push(Operation::GrantedRole {
715			identity,
716			role,
717			op,
718		});
719	}
720
721	pub fn add_authentication_change(&mut self, change: Change<Authentication>) {
722		let id = change
723			.post
724			.as_ref()
725			.or(change.pre.as_ref())
726			.map(|a| a.id)
727			.expect("Change must have either pre or post state");
728		let op = change.op;
729		self.authentication.push(change);
730		self.log.push(Operation::Authentication {
731			id,
732			op,
733		});
734	}
735
736	pub fn add_policy_change(&mut self, change: Change<Policy>) {
737		let id = change
738			.post
739			.as_ref()
740			.or(change.pre.as_ref())
741			.map(|p| p.id)
742			.expect("Change must have either pre or post state");
743		let op = change.op;
744		self.policy.push(change);
745		self.log.push(Operation::Policy {
746			id,
747			op,
748		});
749	}
750
751	pub fn add_row_ttl_change(&mut self, change: Change<(ShapeId, RowTtl)>) {
752		let shape = change
753			.post
754			.as_ref()
755			.or(change.pre.as_ref())
756			.map(|(s, _)| *s)
757			.expect("Change must have either pre or post state");
758		let op = change.op;
759		self.row_ttl.push(change);
760		self.log.push(Operation::RowTtl {
761			shape,
762			op,
763		});
764	}
765}
766
767/// Represents a single change
768#[derive(Debug, Clone)]
769pub struct Change<T> {
770	/// State before the change (None for CREATE)
771	pub pre: Option<T>,
772
773	/// State after the change (None for DELETE)
774	pub post: Option<T>,
775
776	/// Type of operation
777	pub op: OperationType,
778}
779
/// Kind of catalog operation carried by a `Change` or an `Operation` log entry.
///
/// All variants are field-less, so equality is total and the type derives
/// `Eq` in addition to `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationType {
	/// The entity was created.
	Create,
	/// The entity was updated.
	Update,
	/// The entity was deleted.
	Delete,
}
786
787/// Log entry for operation ordering
788#[derive(Debug, Clone)]
789pub enum Operation {
790	Binding {
791		id: BindingId,
792		op: OperationType,
793	},
794	Config {
795		key: ConfigKey,
796		op: OperationType,
797	},
798	Dictionary {
799		id: DictionaryId,
800		op: OperationType,
801	},
802	Flow {
803		id: FlowId,
804		op: OperationType,
805	},
806	Handler {
807		id: HandlerId,
808		op: OperationType,
809	},
810	Migration {
811		id: MigrationId,
812		op: OperationType,
813	},
814	MigrationEvent {
815		id: MigrationEventId,
816		op: OperationType,
817	},
818	Namespace {
819		id: NamespaceId,
820		op: OperationType,
821	},
822	Procedure {
823		id: ProcedureId,
824		op: OperationType,
825	},
826	RingBuffer {
827		id: RingBufferId,
828		op: OperationType,
829	},
830	Series {
831		id: SeriesId,
832		op: OperationType,
833	},
834	Sink {
835		id: SinkId,
836		op: OperationType,
837	},
838	Source {
839		id: SourceId,
840		op: OperationType,
841	},
842	SumType {
843		id: SumTypeId,
844		op: OperationType,
845	},
846	Test {
847		id: TestId,
848		op: OperationType,
849	},
850	Table {
851		id: TableId,
852		op: OperationType,
853	},
854	Identity {
855		id: IdentityId,
856		op: OperationType,
857	},
858	Authentication {
859		id: AuthenticationId,
860		op: OperationType,
861	},
862	Role {
863		id: RoleId,
864		op: OperationType,
865	},
866	GrantedRole {
867		identity: IdentityId,
868		role: RoleId,
869		op: OperationType,
870	},
871	Policy {
872		id: PolicyId,
873		op: OperationType,
874	},
875	View {
876		id: ViewId,
877		op: OperationType,
878	},
879	RowTtl {
880		shape: ShapeId,
881		op: OperationType,
882	},
883}
884
885impl TransactionalCatalogChanges {
886	pub fn new(txn_id: TransactionId) -> Self {
887		Self {
888			txn_id,
889			binding: Vec::new(),
890			config: Vec::new(),
891			dictionary: Vec::new(),
892			flow: Vec::new(),
893			handler: Vec::new(),
894			migration: Vec::new(),
895			migration_event: Vec::new(),
896			namespace: Vec::new(),
897			procedure: Vec::new(),
898			ringbuffer: Vec::new(),
899			series: Vec::new(),
900			sink: Vec::new(),
901			source: Vec::new(),
902			sumtype: Vec::new(),
903			test: Vec::new(),
904			table: Vec::new(),
905			identity: Vec::new(),
906			authentication: Vec::new(),
907			role: Vec::new(),
908			granted_role: Vec::new(),
909			policy: Vec::new(),
910			view: Vec::new(),
911			row_ttl: Vec::new(),
912			log: Vec::new(),
913		}
914	}
915
916	/// Check if a table exists in this transaction's view
917	pub fn table_exists(&self, id: TableId) -> bool {
918		self.get_table(id).is_some()
919	}
920
921	/// Get current state of a table within this transaction
922	pub fn get_table(&self, id: TableId) -> Option<&Table> {
923		// Find the last change for this table ID
924		for change in self.table.iter().rev() {
925			if let Some(table) = &change.post {
926				if table.id == id {
927					return Some(table);
928				}
929			} else if let Some(table) = &change.pre
930				&& table.id == id && change.op == Delete
931			{
932				// Table was deleted
933				return None;
934			}
935		}
936		None
937	}
938
939	/// Check if a view exists in this transaction's view
940	pub fn view_exists(&self, id: ViewId) -> bool {
941		self.get_view(id).is_some()
942	}
943
944	/// Get current state of a view within this transaction
945	pub fn get_view(&self, id: ViewId) -> Option<&View> {
946		// Find the last change for this view ID
947		for change in self.view.iter().rev() {
948			if let Some(view) = &change.post {
949				if view.id() == id {
950					return Some(view);
951				}
952			} else if let Some(view) = &change.pre
953				&& view.id() == id && change.op == Delete
954			{
955				// View was deleted
956				return None;
957			}
958		}
959		None
960	}
961
962	/// Get current state of a row TTL within this transaction
963	pub fn get_row_ttl(&self, shape: ShapeId) -> Option<&RowTtl> {
964		// Find the last change for this shape ID
965		for change in self.row_ttl.iter().rev() {
966			if let Some((s, ttl)) = &change.post {
967				if *s == shape {
968					return Some(ttl);
969				}
970			} else if let Some((s, _)) = &change.pre
971				&& *s == shape && change.op == Delete
972			{
973				// TTL was deleted
974				return None;
975			}
976		}
977		None
978	}
979
980	/// Get all pending changes for commit
981	pub fn get_pending_changes(&self) -> &[Operation] {
982		&self.log
983	}
984
985	/// Get the transaction ID
986	pub fn txn_id(&self) -> TransactionId {
987		self.txn_id
988	}
989
990	/// Get namespace definition changes
991	pub fn namespace(&self) -> &[Change<Namespace>] {
992		&self.namespace
993	}
994
995	/// Get table definition changes
996	pub fn table(&self) -> &[Change<Table>] {
997		&self.table
998	}
999
1000	/// Get view definition changes
1001	pub fn view(&self) -> &[Change<View>] {
1002		&self.view
1003	}
1004
1005	/// Clear all changes (for rollback)
1006	pub fn clear(&mut self) {
1007		self.binding.clear();
1008		self.config.clear();
1009		self.dictionary.clear();
1010		self.flow.clear();
1011		self.handler.clear();
1012		self.migration.clear();
1013		self.migration_event.clear();
1014		self.namespace.clear();
1015		self.procedure.clear();
1016		self.ringbuffer.clear();
1017		self.series.clear();
1018		self.sink.clear();
1019		self.source.clear();
1020		self.sumtype.clear();
1021		self.test.clear();
1022		self.table.clear();
1023		self.identity.clear();
1024		self.authentication.clear();
1025		self.role.clear();
1026		self.granted_role.clear();
1027		self.policy.clear();
1028		self.view.clear();
1029		self.log.clear();
1030	}
1031}
1032
1033/// Tracks a table row insertion for post-commit event emission
1034#[derive(Debug, Clone)]
1035pub struct TableRowInsertion {
1036	pub table_id: TableId,
1037	pub row_number: RowNumber,
1038	pub encoded: EncodedRow,
1039}
1040
1041/// Tracks row changes across different entity types for post-commit event emission
1042#[derive(Debug, Clone)]
1043pub enum RowChange {
1044	/// A row was inserted into a table
1045	TableInsert(TableRowInsertion),
1046	// Future variants:
1047	// ViewInsert(ViewRowInsertion),
1048	// RingBufferInsert(RingBufferRowInsertion),
1049	// TableUpdate(TableRowUpdate),
1050	// TableDelete(TableRowDelete),
1051}