reifydb_engine/transaction/catalog/
table.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackTableChangeOperations;
6use reifydb_core::interface::{
7	Change, NamespaceId, OperationType, OperationType::Delete, TableDef, TableId, TransactionalTableChanges,
8};
9
10use crate::{StandardCommandTransaction, StandardQueryTransaction};
11
12impl CatalogTrackTableChangeOperations for StandardCommandTransaction {
13	fn track_table_def_created(&mut self, table: TableDef) -> reifydb_core::Result<()> {
14		let change = Change {
15			pre: None,
16			post: Some(table),
17			op: Create,
18		};
19		self.changes.add_table_def_change(change);
20		Ok(())
21	}
22
23	fn track_table_def_updated(&mut self, pre: TableDef, post: TableDef) -> reifydb_core::Result<()> {
24		let change = Change {
25			pre: Some(pre),
26			post: Some(post),
27			op: Update,
28		};
29		self.changes.add_table_def_change(change);
30		Ok(())
31	}
32
33	fn track_table_def_deleted(&mut self, table: TableDef) -> reifydb_core::Result<()> {
34		let change = Change {
35			pre: Some(table),
36			post: None,
37			op: Delete,
38		};
39		self.changes.add_table_def_change(change);
40		Ok(())
41	}
42}
43
44impl TransactionalTableChanges for StandardCommandTransaction {
45	fn find_table(&self, id: TableId) -> Option<&TableDef> {
46		// Find the last change for this table ID
47		for change in self.changes.table_def.iter().rev() {
48			if let Some(table) = &change.post {
49				if table.id == id {
50					return Some(table);
51				}
52			} else if let Some(table) = &change.pre {
53				if table.id == id && change.op == Delete {
54					// Table was deleted
55					return None;
56				}
57			}
58		}
59		None
60	}
61
62	fn find_table_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&TableDef> {
63		self.changes
64			.table_def
65			.iter()
66			.rev()
67			.find_map(|change| change.post.as_ref().filter(|t| t.namespace == namespace && t.name == name))
68	}
69
70	fn is_table_deleted(&self, id: TableId) -> bool {
71		self.changes
72			.table_def
73			.iter()
74			.rev()
75			.any(|change| change.op == Delete && change.pre.as_ref().map(|t| t.id) == Some(id))
76	}
77
78	fn is_table_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
79		self.changes.table_def.iter().rev().any(|change| {
80			change.op == Delete
81				&& change
82					.pre
83					.as_ref()
84					.map(|t| t.namespace == namespace && t.name == name)
85					.unwrap_or(false)
86		})
87	}
88}
89
90impl TransactionalTableChanges for StandardQueryTransaction {
91	fn find_table(&self, _id: TableId) -> Option<&TableDef> {
92		None
93	}
94
95	fn find_table_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&TableDef> {
96		None
97	}
98
99	fn is_table_deleted(&self, _id: TableId) -> bool {
100		false
101	}
102
103	fn is_table_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
104		false
105	}
106}
107
#[cfg(test)]
mod tests {
	use reifydb_catalog::transaction::CatalogTrackTableChangeOperations;
	use reifydb_core::interface::{
		NamespaceId, Operation,
		OperationType::{Create, Delete, Update},
		TableDef, TableId,
	};

	use crate::test_utils::create_test_command_transaction;

	/// Builds a minimal table definition (no columns, no primary key) for tests.
	///
	/// Plain synchronous function: it performs no awaiting, so callers do not
	/// need to `.await` it.
	fn test_table_def(id: u64, namespace_id: u64, name: &str) -> TableDef {
		TableDef {
			id: TableId(id),
			namespace: NamespaceId(namespace_id),
			name: name.to_string(),
			columns: vec![],
			primary_key: None,
		}
	}

	mod track_table_def_created {
		use super::*;

		/// A create is recorded as one change (no pre-image) and one log entry.
		#[tokio::test]
		async fn test_successful_creation() {
			let mut txn = create_test_command_transaction().await;

			let table = test_table_def(1, 1, "test_table");
			let result = txn.track_table_def_created(table.clone());
			assert!(result.is_ok());

			// Verify the change was recorded
			assert_eq!(txn.changes.table_def.len(), 1);
			let change = &txn.changes.table_def[0];
			assert!(change.pre.is_none());
			assert_eq!(change.post.as_ref().unwrap().name, "test_table");
			assert_eq!(change.op, Create);

			// Verify operation was logged
			assert_eq!(txn.changes.log.len(), 1);
			match &txn.changes.log[0] {
				Operation::Table {
					id,
					op,
				} if *id == table.id && *op == Create => {}
				_ => panic!("Expected Table operation with Create"),
			}
		}
	}

	mod track_table_def_updated {
		use super::*;

		/// Two consecutive updates of the same table stay two separate changes.
		#[tokio::test]
		async fn test_multiple_updates_no_coalescing() {
			let mut txn = create_test_command_transaction().await;
			let table_v1 = test_table_def(1, 1, "table_v1");
			let table_v2 = test_table_def(1, 1, "table_v2");
			let table_v3 = test_table_def(1, 1, "table_v3");

			// First update (table_v2 is needed again below, so it is cloned)
			txn.track_table_def_updated(table_v1, table_v2.clone()).unwrap();

			// Should have one change
			assert_eq!(txn.changes.table_def.len(), 1);
			assert_eq!(txn.changes.table_def[0].pre.as_ref().unwrap().name, "table_v1");
			assert_eq!(txn.changes.table_def[0].post.as_ref().unwrap().name, "table_v2");
			assert_eq!(txn.changes.table_def[0].op, Update);

			// Second update - should NOT coalesce
			txn.track_table_def_updated(table_v2, table_v3).unwrap();

			// Should now have TWO changes
			assert_eq!(txn.changes.table_def.len(), 2);

			// Second update recorded separately
			assert_eq!(txn.changes.table_def[1].pre.as_ref().unwrap().name, "table_v2");
			assert_eq!(txn.changes.table_def[1].post.as_ref().unwrap().name, "table_v3");

			// Should have 2 log entries
			assert_eq!(txn.changes.log.len(), 2);
		}

		/// A create followed by an update stays two separate changes.
		#[tokio::test]
		async fn test_create_then_update_no_coalescing() {
			let mut txn = create_test_command_transaction().await;
			let table_v1 = test_table_def(1, 1, "table_v1");
			let table_v2 = test_table_def(1, 1, "table_v2");

			// First track creation (table_v1 is needed again below, so it is cloned)
			txn.track_table_def_created(table_v1.clone()).unwrap();
			assert_eq!(txn.changes.table_def.len(), 1);
			assert_eq!(txn.changes.table_def[0].op, Create);

			// Then track update - should NOT coalesce
			txn.track_table_def_updated(table_v1, table_v2).unwrap();

			// Should have TWO changes now
			assert_eq!(txn.changes.table_def.len(), 2);

			// First is still Create
			assert_eq!(txn.changes.table_def[0].op, Create);

			// Second is Update
			assert_eq!(txn.changes.table_def[1].op, Update);

			// Should have 2 log entries
			assert_eq!(txn.changes.log.len(), 2);
		}
	}

	mod track_table_def_deleted {
		use super::*;

		/// A create followed by a delete stays two separate changes.
		#[tokio::test]
		async fn test_delete_after_create_no_coalescing() {
			let mut txn = create_test_command_transaction().await;
			let table = test_table_def(1, 1, "test_table");

			// First track creation (table is needed again below, so it is cloned)
			txn.track_table_def_created(table.clone()).unwrap();
			assert_eq!(txn.changes.table_def.len(), 1);

			// Then track deletion
			let result = txn.track_table_def_deleted(table);
			assert!(result.is_ok());

			// Should have TWO changes now (no coalescing)
			assert_eq!(txn.changes.table_def.len(), 2);

			// First is Create
			assert_eq!(txn.changes.table_def[0].op, Create);

			// Second is Delete
			assert_eq!(txn.changes.table_def[1].op, Delete);

			// Should have 2 log entries
			assert_eq!(txn.changes.log.len(), 2);
		}

		/// A delete is recorded as one change (no post-image) and one log entry.
		#[tokio::test]
		async fn test_normal_delete() {
			let mut txn = create_test_command_transaction().await;
			let table = test_table_def(1, 1, "test_table");

			let result = txn.track_table_def_deleted(table.clone());
			assert!(result.is_ok());

			// Verify the change was recorded
			assert_eq!(txn.changes.table_def.len(), 1);
			let change = &txn.changes.table_def[0];
			assert_eq!(change.pre.as_ref().unwrap().name, "test_table");
			assert!(change.post.is_none());
			assert_eq!(change.op, Delete);

			// Verify operation was logged
			assert_eq!(txn.changes.log.len(), 1);
			match &txn.changes.log[0] {
				Operation::Table {
					id,
					op,
				} if *id == table.id && *op == Delete => {}
				_ => panic!("Expected Table operation with Delete"),
			}
		}
	}
}