reifydb_catalog/materialized/
namespace.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	CommitVersion,
6	interface::{NamespaceDef, NamespaceId},
7};
8
9use crate::materialized::{MaterializedCatalog, MultiVersionNamespaceDef};
10
11impl MaterializedCatalog {
12	/// Find a namespace by ID at a specific version
13	pub fn find_namespace(&self, namespace: NamespaceId, version: CommitVersion) -> Option<NamespaceDef> {
14		self.namespaces.get(&namespace).and_then(|entry| {
15			let multi = entry.value();
16			multi.get(version)
17		})
18	}
19
20	/// Find a namespace by name at a specific version
21	pub fn find_namespace_by_name(&self, namespace: &str, version: CommitVersion) -> Option<NamespaceDef> {
22		self.namespaces_by_name.get(namespace).and_then(|entry| {
23			let namespace_id = *entry.value();
24			self.find_namespace(namespace_id, version)
25		})
26	}
27
28	pub fn set_namespace(&self, id: NamespaceId, version: CommitVersion, namespace: Option<NamespaceDef>) {
29		// Look up the current namespace to update the index
30		if let Some(entry) = self.namespaces.get(&id) {
31			if let Some(pre) = entry.value().get_latest() {
32				// Remove old name from index
33				self.namespaces_by_name.remove(&pre.name);
34			}
35		}
36
37		let multi = self.namespaces.get_or_insert_with(id, MultiVersionNamespaceDef::new);
38		if let Some(new) = namespace {
39			self.namespaces_by_name.insert(new.name.clone(), id);
40			multi.value().insert(version, new);
41		} else {
42			multi.value().remove(version);
43		}
44	}
45}
46
47#[cfg(test)]
48mod tests {
49	use super::*;
50
51	fn create_test_namespace(id: NamespaceId, name: &str) -> NamespaceDef {
52		NamespaceDef {
53			id,
54			name: name.to_string(),
55		}
56	}
57
58	#[test]
59	fn test_set_and_find_namespace() {
60		let catalog = MaterializedCatalog::new();
61		let namespace_id = NamespaceId(1);
62		let namespace = create_test_namespace(namespace_id, "test_namespace");
63
64		// Set namespace at version 1
65		catalog.set_namespace(namespace_id, CommitVersion(1), Some(namespace.clone()));
66
67		// Find namespace at version 1
68		let found = catalog.find_namespace(namespace_id, CommitVersion(1));
69		assert_eq!(found, Some(namespace.clone()));
70
71		// Find namespace at later version (should return same
72		// namespace)
73		let found = catalog.find_namespace(namespace_id, CommitVersion(5));
74		assert_eq!(found, Some(namespace));
75
76		// Namespace shouldn't exist at version 0
77		let found = catalog.find_namespace(namespace_id, CommitVersion(0));
78		assert_eq!(found, None);
79	}
80
81	#[test]
82	fn test_find_namespace_by_name() {
83		let catalog = MaterializedCatalog::new();
84		let namespace_id = NamespaceId(1);
85		let namespace = create_test_namespace(namespace_id, "named_namespace");
86
87		// Set namespace
88		catalog.set_namespace(namespace_id, CommitVersion(1), Some(namespace.clone()));
89
90		// Find by name
91		let found = catalog.find_namespace_by_name("named_namespace", CommitVersion(1));
92		assert_eq!(found, Some(namespace));
93
94		// Shouldn't find with wrong name
95		let found = catalog.find_namespace_by_name("wrong_name", CommitVersion(1));
96		assert_eq!(found, None);
97	}
98
99	#[test]
100	fn test_namespace_rename() {
101		let catalog = MaterializedCatalog::new();
102		let namespace_id = NamespaceId(1);
103
104		// Create and set initial namespace
105		let namespace_v1 = create_test_namespace(namespace_id, "old_name");
106		catalog.set_namespace(namespace_id, CommitVersion(1), Some(namespace_v1.clone()));
107
108		// Verify initial state
109		assert!(catalog.find_namespace_by_name("old_name", CommitVersion(1)).is_some());
110		assert!(catalog.find_namespace_by_name("new_name", CommitVersion(1)).is_none());
111
112		// Rename the namespace
113		let mut namespace_v2 = namespace_v1.clone();
114		namespace_v2.name = "new_name".to_string();
115		catalog.set_namespace(namespace_id, CommitVersion(2), Some(namespace_v2.clone()));
116
117		// Old name should be gone
118		assert!(catalog.find_namespace_by_name("old_name", CommitVersion(2)).is_none());
119
120		// New name can be found
121		assert_eq!(catalog.find_namespace_by_name("new_name", CommitVersion(2)), Some(namespace_v2.clone()));
122
123		// Historical query at version 1 should still show old name
124		assert_eq!(catalog.find_namespace(namespace_id, CommitVersion(1)), Some(namespace_v1));
125
126		// Current version should show new name
127		assert_eq!(catalog.find_namespace(namespace_id, CommitVersion(2)), Some(namespace_v2));
128	}
129
130	#[test]
131	fn test_namespace_deletion() {
132		let catalog = MaterializedCatalog::new();
133		let namespace_id = NamespaceId(1);
134
135		// Create and set namespace
136		let namespace = create_test_namespace(namespace_id, "deletable_namespace");
137		catalog.set_namespace(namespace_id, CommitVersion(1), Some(namespace.clone()));
138
139		// Verify it exists
140		assert_eq!(catalog.find_namespace(namespace_id, CommitVersion(1)), Some(namespace.clone()));
141		assert!(catalog.find_namespace_by_name("deletable_namespace", CommitVersion(1)).is_some());
142
143		// Delete the namespace
144		catalog.set_namespace(namespace_id, CommitVersion(2), None);
145
146		// Should not exist at version 2
147		assert_eq!(catalog.find_namespace(namespace_id, CommitVersion(2)), None);
148		assert!(catalog.find_namespace_by_name("deletable_namespace", CommitVersion(2)).is_none());
149
150		// Should still exist at version 1 (historical)
151		assert_eq!(catalog.find_namespace(namespace_id, CommitVersion(1)), Some(namespace));
152	}
153
154	#[test]
155	fn test_multiple_namespaces() {
156		let catalog = MaterializedCatalog::new();
157
158		let namespace1 = create_test_namespace(NamespaceId(1), "namespace1");
159		let namespace2 = create_test_namespace(NamespaceId(2), "namespace2");
160		let namespace3 = create_test_namespace(NamespaceId(3), "namespace3");
161
162		// Set multiple namespaces
163		catalog.set_namespace(NamespaceId(1), CommitVersion(1), Some(namespace1.clone()));
164		catalog.set_namespace(NamespaceId(2), CommitVersion(1), Some(namespace2.clone()));
165		catalog.set_namespace(NamespaceId(3), CommitVersion(1), Some(namespace3.clone()));
166
167		// All should be findable
168		assert_eq!(catalog.find_namespace_by_name("namespace1", CommitVersion(1)), Some(namespace1));
169		assert_eq!(catalog.find_namespace_by_name("namespace2", CommitVersion(1)), Some(namespace2));
170		assert_eq!(catalog.find_namespace_by_name("namespace3", CommitVersion(1)), Some(namespace3));
171	}
172
173	#[test]
174	fn test_namespace_versioning() {
175		let catalog = MaterializedCatalog::new();
176		let namespace_id = NamespaceId(2);
177
178		// Create multiple versions
179		let namespace_v1 = create_test_namespace(namespace_id, "namespace_v1");
180		let mut namespace_v2 = namespace_v1.clone();
181		namespace_v2.name = "namespace_v2".to_string();
182		let mut namespace_v3 = namespace_v2.clone();
183		namespace_v3.name = "namespace_v3".to_string();
184
185		// Set at different versions
186		catalog.set_namespace(namespace_id, CommitVersion(10), Some(namespace_v1.clone()));
187		catalog.set_namespace(namespace_id, CommitVersion(20), Some(namespace_v2.clone()));
188		catalog.set_namespace(namespace_id, CommitVersion(30), Some(namespace_v3.clone()));
189
190		// Query at different versions
191		assert_eq!(catalog.find_namespace(namespace_id, CommitVersion(5)), None);
192		assert_eq!(catalog.find_namespace(namespace_id, CommitVersion(10)), Some(namespace_v1.clone()));
193		assert_eq!(catalog.find_namespace(namespace_id, CommitVersion(15)), Some(namespace_v1));
194		assert_eq!(catalog.find_namespace(namespace_id, CommitVersion(20)), Some(namespace_v2.clone()));
195		assert_eq!(catalog.find_namespace(namespace_id, CommitVersion(25)), Some(namespace_v2));
196		assert_eq!(catalog.find_namespace(namespace_id, CommitVersion(30)), Some(namespace_v3.clone()));
197		assert_eq!(catalog.find_namespace(namespace_id, CommitVersion(100)), Some(namespace_v3));
198	}
199}