reifydb_engine/transaction/catalog/
table.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackTableChangeOperations;
6use reifydb_core::interface::{
7	Change, NamespaceId, OperationType, OperationType::Delete, TableDef, TableId, TransactionalTableChanges,
8};
9use reifydb_type::IntoFragment;
10
11use crate::{StandardCommandTransaction, StandardQueryTransaction};
12
13impl CatalogTrackTableChangeOperations for StandardCommandTransaction {
14	fn track_table_def_created(&mut self, table: TableDef) -> reifydb_core::Result<()> {
15		let change = Change {
16			pre: None,
17			post: Some(table),
18			op: Create,
19		};
20		self.changes.add_table_def_change(change);
21		Ok(())
22	}
23
24	fn track_table_def_updated(&mut self, pre: TableDef, post: TableDef) -> reifydb_core::Result<()> {
25		let change = Change {
26			pre: Some(pre),
27			post: Some(post),
28			op: Update,
29		};
30		self.changes.add_table_def_change(change);
31		Ok(())
32	}
33
34	fn track_table_def_deleted(&mut self, table: TableDef) -> reifydb_core::Result<()> {
35		let change = Change {
36			pre: Some(table),
37			post: None,
38			op: Delete,
39		};
40		self.changes.add_table_def_change(change);
41		Ok(())
42	}
43}
44
45impl TransactionalTableChanges for StandardCommandTransaction {
46	fn find_table(&self, id: TableId) -> Option<&TableDef> {
47		// Find the last change for this table ID
48		for change in self.changes.table_def.iter().rev() {
49			if let Some(table) = &change.post {
50				if table.id == id {
51					return Some(table);
52				}
53			} else if let Some(table) = &change.pre {
54				if table.id == id && change.op == Delete {
55					// Table was deleted
56					return None;
57				}
58			}
59		}
60		None
61	}
62
63	fn find_table_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&TableDef> {
64		let name = name.into_fragment();
65		self.changes.table_def.iter().rev().find_map(|change| {
66			change.post.as_ref().filter(|t| t.namespace == namespace && t.name == name.text())
67		})
68	}
69
70	fn is_table_deleted(&self, id: TableId) -> bool {
71		self.changes
72			.table_def
73			.iter()
74			.rev()
75			.any(|change| change.op == Delete && change.pre.as_ref().map(|t| t.id) == Some(id))
76	}
77
78	fn is_table_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool {
79		let name = name.into_fragment();
80		self.changes.table_def.iter().rev().any(|change| {
81			change.op == Delete
82				&& change
83					.pre
84					.as_ref()
85					.map(|t| t.namespace == namespace && t.name == name.text())
86					.unwrap_or(false)
87		})
88	}
89}
90
91impl TransactionalTableChanges for StandardQueryTransaction {
92	fn find_table(&self, _id: TableId) -> Option<&TableDef> {
93		None
94	}
95
96	fn find_table_by_name<'a>(&self, _namespace: NamespaceId, _name: impl IntoFragment<'a>) -> Option<&TableDef> {
97		None
98	}
99
100	fn is_table_deleted(&self, _id: TableId) -> bool {
101		false
102	}
103
104	fn is_table_deleted_by_name<'a>(&self, _namespace: NamespaceId, _name: impl IntoFragment<'a>) -> bool {
105		false
106	}
107}
108
109#[cfg(test)]
110mod tests {
111	use reifydb_catalog::transaction::CatalogTrackTableChangeOperations;
112	use reifydb_core::interface::{
113		NamespaceId, Operation,
114		OperationType::{Create, Delete, Update},
115		TableDef, TableId,
116	};
117
118	use crate::test_utils::create_test_command_transaction;
119
120	// Helper functions to create test definitions
121	fn test_table_def(id: u64, namespace_id: u64, name: &str) -> TableDef {
122		TableDef {
123			id: TableId(id),
124			namespace: NamespaceId(namespace_id),
125			name: name.to_string(),
126			columns: vec![],
127			primary_key: None,
128		}
129	}
130
131	mod track_table_def_created {
132		use super::*;
133
134		#[test]
135		fn test_successful_creation() {
136			let mut txn = create_test_command_transaction();
137
138			let table = test_table_def(1, 1, "test_table");
139			let result = txn.track_table_def_created(table.clone());
140			assert!(result.is_ok());
141
142			// Verify the change was recorded
143			assert_eq!(txn.changes.table_def.len(), 1);
144			let change = &txn.changes.table_def[0];
145			assert!(change.pre.is_none());
146			assert_eq!(change.post.as_ref().unwrap().name, "test_table");
147			assert_eq!(change.op, Create);
148
149			// Verify operation was logged
150			assert_eq!(txn.changes.log.len(), 1);
151			match &txn.changes.log[0] {
152				Operation::Table {
153					id,
154					op,
155				} if *id == table.id && *op == Create => {}
156				_ => panic!("Expected Table operation with Create"),
157			}
158		}
159	}
160
161	mod track_table_def_updated {
162		use super::*;
163
164		#[test]
165		fn test_multiple_updates_no_coalescing() {
166			let mut txn = create_test_command_transaction();
167			let table_v1 = test_table_def(1, 1, "table_v1");
168			let table_v2 = test_table_def(1, 1, "table_v2");
169			let table_v3 = test_table_def(1, 1, "table_v3");
170
171			// First update
172			txn.track_table_def_updated(table_v1.clone(), table_v2.clone()).unwrap();
173
174			// Should have one change
175			assert_eq!(txn.changes.table_def.len(), 1);
176			assert_eq!(txn.changes.table_def[0].pre.as_ref().unwrap().name, "table_v1");
177			assert_eq!(txn.changes.table_def[0].post.as_ref().unwrap().name, "table_v2");
178			assert_eq!(txn.changes.table_def[0].op, Update);
179
180			// Second update - should NOT coalesce
181			txn.track_table_def_updated(table_v2, table_v3.clone()).unwrap();
182
183			// Should now have TWO changes
184			assert_eq!(txn.changes.table_def.len(), 2);
185
186			// Second update recorded separately
187			assert_eq!(txn.changes.table_def[1].pre.as_ref().unwrap().name, "table_v2");
188			assert_eq!(txn.changes.table_def[1].post.as_ref().unwrap().name, "table_v3");
189
190			// Should have 2 log entries
191			assert_eq!(txn.changes.log.len(), 2);
192		}
193
194		#[test]
195		fn test_create_then_update_no_coalescing() {
196			let mut txn = create_test_command_transaction();
197			let table_v1 = test_table_def(1, 1, "table_v1");
198			let table_v2 = test_table_def(1, 1, "table_v2");
199
200			// First track creation
201			txn.track_table_def_created(table_v1.clone()).unwrap();
202			assert_eq!(txn.changes.table_def.len(), 1);
203			assert_eq!(txn.changes.table_def[0].op, Create);
204
205			// Then track update - should NOT coalesce
206			txn.track_table_def_updated(table_v1, table_v2.clone()).unwrap();
207
208			// Should have TWO changes now
209			assert_eq!(txn.changes.table_def.len(), 2);
210
211			// First is still Create
212			assert_eq!(txn.changes.table_def[0].op, Create);
213
214			// Second is Update
215			assert_eq!(txn.changes.table_def[1].op, Update);
216
217			// Should have 2 log entries
218			assert_eq!(txn.changes.log.len(), 2);
219		}
220	}
221
222	mod track_table_def_deleted {
223		use super::*;
224
225		#[test]
226		fn test_delete_after_create_no_coalescing() {
227			let mut txn = create_test_command_transaction();
228			let table = test_table_def(1, 1, "test_table");
229
230			// First track creation
231			txn.track_table_def_created(table.clone()).unwrap();
232			assert_eq!(txn.changes.table_def.len(), 1);
233
234			// Then track deletion
235			let result = txn.track_table_def_deleted(table.clone());
236			assert!(result.is_ok());
237
238			// Should have TWO changes now (no coalescing)
239			assert_eq!(txn.changes.table_def.len(), 2);
240
241			// First is Create
242			assert_eq!(txn.changes.table_def[0].op, Create);
243
244			// Second is Delete
245			assert_eq!(txn.changes.table_def[1].op, Delete);
246
247			// Should have 2 log entries
248			assert_eq!(txn.changes.log.len(), 2);
249		}
250
251		#[test]
252		fn test_normal_delete() {
253			let mut txn = create_test_command_transaction();
254			let table = test_table_def(1, 1, "test_table");
255
256			let result = txn.track_table_def_deleted(table.clone());
257			assert!(result.is_ok());
258
259			// Verify the change was recorded
260			assert_eq!(txn.changes.table_def.len(), 1);
261			let change = &txn.changes.table_def[0];
262			assert_eq!(change.pre.as_ref().unwrap().name, "test_table");
263			assert!(change.post.is_none());
264			assert_eq!(change.op, Delete);
265
266			// Verify operation was logged
267			assert_eq!(txn.changes.log.len(), 1);
268			match &txn.changes.log[0] {
269				Operation::Table {
270					id,
271					op,
272				} if *id == table.id && *op == Delete => {}
273				_ => panic!("Expected Table operation with Delete"),
274			}
275		}
276	}
277}