reifydb_engine/transaction/catalog/
namespace.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackNamespaceChangeOperations;
6use reifydb_core::interface::{
7	Change, NamespaceDef, NamespaceId, OperationType, OperationType::Delete, TransactionalNamespaceChanges,
8};
9
10use crate::{StandardCommandTransaction, StandardQueryTransaction};
11
12impl CatalogTrackNamespaceChangeOperations for StandardCommandTransaction {
13	fn track_namespace_def_created(&mut self, namespace: NamespaceDef) -> reifydb_core::Result<()> {
14		let change = Change {
15			pre: None,
16			post: Some(namespace),
17			op: Create,
18		};
19		self.changes.add_namespace_def_change(change);
20		Ok(())
21	}
22
23	fn track_namespace_def_updated(&mut self, pre: NamespaceDef, post: NamespaceDef) -> reifydb_core::Result<()> {
24		let change = Change {
25			pre: Some(pre),
26			post: Some(post),
27			op: Update,
28		};
29		self.changes.add_namespace_def_change(change);
30		Ok(())
31	}
32
33	fn track_namespace_def_deleted(&mut self, namespace: NamespaceDef) -> reifydb_core::Result<()> {
34		let change = Change {
35			pre: Some(namespace),
36			post: None,
37			op: Delete,
38		};
39		self.changes.add_namespace_def_change(change);
40		Ok(())
41	}
42}
43
44impl TransactionalNamespaceChanges for StandardCommandTransaction {
45	fn find_namespace(&self, id: NamespaceId) -> Option<&NamespaceDef> {
46		// Find the last change for this namespace ID
47		for change in self.changes.namespace_def.iter().rev() {
48			if let Some(namespace) = &change.post {
49				if namespace.id == id {
50					return Some(namespace);
51				}
52			} else if let Some(namespace) = &change.pre {
53				if namespace.id == id && change.op == Delete {
54					// Namespace was deleted
55					return None;
56				}
57			}
58		}
59		None
60	}
61
62	fn find_namespace_by_name(&self, name: &str) -> Option<&NamespaceDef> {
63		self.changes
64			.namespace_def
65			.iter()
66			.rev()
67			.find_map(|change| change.post.as_ref().filter(|s| s.name == name))
68	}
69
70	fn is_namespace_deleted(&self, id: NamespaceId) -> bool {
71		self.changes
72			.namespace_def
73			.iter()
74			.rev()
75			.any(|change| change.op == Delete && change.pre.as_ref().map(|s| s.id) == Some(id))
76	}
77
78	fn is_namespace_deleted_by_name(&self, name: &str) -> bool {
79		self.changes
80			.namespace_def
81			.iter()
82			.rev()
83			.any(|change| change.op == Delete && change.pre.as_ref().map(|s| s.name.as_str()) == Some(name))
84	}
85}
86
87impl TransactionalNamespaceChanges for StandardQueryTransaction {
88	fn find_namespace(&self, _id: NamespaceId) -> Option<&NamespaceDef> {
89		None
90	}
91
92	fn find_namespace_by_name(&self, _name: &str) -> Option<&NamespaceDef> {
93		None
94	}
95
96	fn is_namespace_deleted(&self, _id: NamespaceId) -> bool {
97		false
98	}
99
100	fn is_namespace_deleted_by_name(&self, _name: &str) -> bool {
101		false
102	}
103}
104
105#[cfg(test)]
106mod tests {
107	use reifydb_catalog::transaction::CatalogTrackNamespaceChangeOperations;
108	use reifydb_core::interface::{
109		NamespaceDef, NamespaceId, Operation,
110		OperationType::{Create, Delete, Update},
111	};
112
113	use crate::test_utils::create_test_command_transaction;
114
115	// Helper function to create test namespace definition
116	async fn test_namespace_def(id: u64, name: &str) -> NamespaceDef {
117		NamespaceDef {
118			id: NamespaceId(id),
119			name: name.to_string(),
120		}
121	}
122
123	mod track_namespace_def_created {
124		use super::*;
125
126		#[tokio::test]
127		async fn test_successful_creation() {
128			let mut txn = create_test_command_transaction().await;
129			let namespace = test_namespace_def(1, "test_namespace").await;
130
131			let result = txn.track_namespace_def_created(namespace.clone());
132			assert!(result.is_ok());
133
134			// Verify the change was recorded in the Vec
135			assert_eq!(txn.changes.namespace_def.len(), 1);
136			let change = &txn.changes.namespace_def[0];
137			assert!(change.pre.is_none());
138			assert_eq!(change.post.as_ref().unwrap().name, "test_namespace");
139			assert_eq!(change.op, Create);
140
141			// Verify operation was logged
142			assert_eq!(txn.changes.log.len(), 1);
143			match &txn.changes.log[0] {
144				Operation::Namespace {
145					id,
146					op,
147				} if *id == namespace.id && *op == Create => {}
148				_ => panic!("Expected Namespace operation with Create"),
149			}
150		}
151	}
152
153	mod track_namespace_def_updated {
154		use super::*;
155
156		#[tokio::test]
157		async fn test_multiple_updates_no_coalescing() {
158			let mut txn = create_test_command_transaction().await;
159			let namespace_v1 = test_namespace_def(1, "namespace_v1").await;
160			let namespace_v2 = test_namespace_def(1, "namespace_v2").await;
161			let namespace_v3 = test_namespace_def(1, "namespace_v3").await;
162
163			// First update
164			txn.track_namespace_def_updated(namespace_v1.clone(), namespace_v2.clone()).unwrap();
165
166			// Should have one change
167			assert_eq!(txn.changes.namespace_def.len(), 1);
168			assert_eq!(txn.changes.namespace_def[0].pre.as_ref().unwrap().name, "namespace_v1");
169			assert_eq!(txn.changes.namespace_def[0].post.as_ref().unwrap().name, "namespace_v2");
170			assert_eq!(txn.changes.namespace_def[0].op, Update);
171
172			// Second update - should NOT coalesce
173			txn.track_namespace_def_updated(namespace_v2, namespace_v3.clone()).unwrap();
174
175			// Should now have TWO changes (no coalescing)
176			assert_eq!(txn.changes.namespace_def.len(), 2);
177
178			// First update unchanged
179			assert_eq!(txn.changes.namespace_def[0].pre.as_ref().unwrap().name, "namespace_v1");
180
181			// Second update recorded separately
182			assert_eq!(txn.changes.namespace_def[1].pre.as_ref().unwrap().name, "namespace_v2");
183			assert_eq!(txn.changes.namespace_def[1].post.as_ref().unwrap().name, "namespace_v3");
184
185			// Should have 2 log entries
186			assert_eq!(txn.changes.log.len(), 2);
187		}
188
189		#[tokio::test]
190		async fn test_create_then_update_no_coalescing() {
191			let mut txn = create_test_command_transaction().await;
192			let namespace_v1 = test_namespace_def(1, "namespace_v1").await;
193			let namespace_v2 = test_namespace_def(1, "namespace_v2").await;
194
195			// First track creation
196			txn.track_namespace_def_created(namespace_v1.clone()).unwrap();
197			assert_eq!(txn.changes.namespace_def.len(), 1);
198			assert_eq!(txn.changes.namespace_def[0].op, Create);
199
200			// Then track update - should NOT coalesce
201			txn.track_namespace_def_updated(namespace_v1, namespace_v2.clone()).unwrap();
202
203			// Should have TWO changes now
204			assert_eq!(txn.changes.namespace_def.len(), 2);
205
206			// First is still Create
207			assert_eq!(txn.changes.namespace_def[0].op, Create);
208			assert_eq!(txn.changes.namespace_def[0].post.as_ref().unwrap().name, "namespace_v1");
209
210			// Second is Update
211			assert_eq!(txn.changes.namespace_def[1].op, Update);
212		}
213
214		#[tokio::test]
215		async fn test_normal_update() {
216			let mut txn = create_test_command_transaction().await;
217			let namespace_v1 = test_namespace_def(1, "namespace_v1").await;
218			let namespace_v2 = test_namespace_def(1, "namespace_v2").await;
219
220			let result = txn.track_namespace_def_updated(namespace_v1.clone(), namespace_v2.clone());
221			assert!(result.is_ok());
222
223			// Verify the change was recorded
224			assert_eq!(txn.changes.namespace_def.len(), 1);
225			let change = &txn.changes.namespace_def[0];
226			assert_eq!(change.pre.as_ref().unwrap().name, "namespace_v1");
227			assert_eq!(change.post.as_ref().unwrap().name, "namespace_v2");
228			assert_eq!(change.op, Update);
229
230			// Verify operation was logged
231			assert_eq!(txn.changes.log.len(), 1);
232			match &txn.changes.log[0] {
233				Operation::Namespace {
234					id,
235					op,
236				} if *id == NamespaceId(1) && *op == Update => {}
237				_ => panic!("Expected Namespace operation with Update"),
238			}
239		}
240	}
241
242	mod track_namespace_def_deleted {
243		use super::*;
244
245		#[tokio::test]
246		async fn test_delete_after_create_no_coalescing() {
247			let mut txn = create_test_command_transaction().await;
248			let namespace = test_namespace_def(1, "test_namespace").await;
249
250			// First track creation
251			txn.track_namespace_def_created(namespace.clone()).unwrap();
252			assert_eq!(txn.changes.log.len(), 1);
253			assert_eq!(txn.changes.namespace_def.len(), 1);
254
255			// Then track deletion - should NOT remove, just add
256			let result = txn.track_namespace_def_deleted(namespace.clone());
257			assert!(result.is_ok());
258
259			// Should have TWO changes now (no coalescing)
260			assert_eq!(txn.changes.namespace_def.len(), 2);
261
262			// First is Create
263			assert_eq!(txn.changes.namespace_def[0].op, Create);
264
265			// Second is Delete
266			assert_eq!(txn.changes.namespace_def[1].op, Delete);
267			assert_eq!(txn.changes.namespace_def[1].pre.as_ref().unwrap().name, "test_namespace");
268
269			// Should have 2 log entries
270			assert_eq!(txn.changes.log.len(), 2);
271		}
272
273		#[tokio::test]
274		async fn test_delete_after_update_no_coalescing() {
275			let mut txn = create_test_command_transaction().await;
276			let namespace_v1 = test_namespace_def(1, "namespace_v1").await;
277			let namespace_v2 = test_namespace_def(1, "namespace_v2").await;
278
279			// First track update
280			txn.track_namespace_def_updated(namespace_v1.clone(), namespace_v2.clone()).unwrap();
281			assert_eq!(txn.changes.namespace_def.len(), 1);
282
283			// Then track deletion
284			let result = txn.track_namespace_def_deleted(namespace_v2);
285			assert!(result.is_ok());
286
287			// Should have TWO changes (no coalescing)
288			assert_eq!(txn.changes.namespace_def.len(), 2);
289
290			// First is Update
291			assert_eq!(txn.changes.namespace_def[0].op, Update);
292
293			// Second is Delete
294			assert_eq!(txn.changes.namespace_def[1].op, Delete);
295
296			// Should have 2 log entries
297			assert_eq!(txn.changes.log.len(), 2);
298		}
299
300		#[tokio::test]
301		async fn test_normal_delete() {
302			let mut txn = create_test_command_transaction().await;
303			let namespace = test_namespace_def(1, "test_namespace").await;
304
305			let result = txn.track_namespace_def_deleted(namespace.clone());
306			assert!(result.is_ok());
307
308			// Verify the change was recorded
309			assert_eq!(txn.changes.namespace_def.len(), 1);
310			let change = &txn.changes.namespace_def[0];
311			assert_eq!(change.pre.as_ref().unwrap().name, "test_namespace");
312			assert!(change.post.is_none());
313			assert_eq!(change.op, Delete);
314
315			// Verify operation was logged
316			assert_eq!(txn.changes.log.len(), 1);
317			match &txn.changes.log[0] {
318				Operation::Namespace {
319					id,
320					op,
321				} if *id == namespace.id && *op == Delete => {}
322				_ => panic!("Expected Namespace operation with Delete"),
323			}
324		}
325	}
326}