reifydb_catalog/materialized/
table.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	CommitVersion,
6	interface::{NamespaceId, TableDef, TableId},
7};
8
9use crate::materialized::{MaterializedCatalog, MultiVersionTableDef};
10
11impl MaterializedCatalog {
12	/// Find a table by ID at a specific version
13	pub fn find_table(&self, table: TableId, version: CommitVersion) -> Option<TableDef> {
14		self.tables.get(&table).and_then(|entry| {
15			let multi = entry.value();
16			multi.get(version)
17		})
18	}
19
20	/// Find a table by name in a namespace at a specific version
21	pub fn find_table_by_name(
22		&self,
23		namespace: NamespaceId,
24		name: &str,
25		version: CommitVersion,
26	) -> Option<TableDef> {
27		self.tables_by_name.get(&(namespace, name.to_string())).and_then(|entry| {
28			let table_id = *entry.value();
29			self.find_table(table_id, version)
30		})
31	}
32
33	pub fn set_table(&self, id: TableId, version: CommitVersion, table: Option<TableDef>) {
34		if let Some(entry) = self.tables.get(&id) {
35			if let Some(pre) = entry.value().get_latest() {
36				// Remove old name from index
37				self.tables_by_name.remove(&(pre.namespace, pre.name.clone()));
38			}
39		}
40
41		let multi = self.tables.get_or_insert_with(id, MultiVersionTableDef::new);
42		if let Some(new) = table {
43			self.tables_by_name.insert((new.namespace, new.name.clone()), id);
44			multi.value().insert(version, new);
45		} else {
46			multi.value().remove(version);
47		}
48	}
49}
50
51#[cfg(test)]
52mod tests {
53	use reifydb_core::interface::{ColumnDef, ColumnId, ColumnIndex};
54	use reifydb_type::{Type, TypeConstraint};
55
56	use super::*;
57
58	fn create_test_table(id: TableId, namespace: NamespaceId, name: &str) -> TableDef {
59		TableDef {
60			id,
61			namespace,
62			name: name.to_string(),
63			columns: vec![
64				ColumnDef {
65					id: ColumnId(1),
66					name: "id".to_string(),
67					constraint: TypeConstraint::unconstrained(Type::Int4),
68					policies: vec![],
69					index: ColumnIndex(0),
70					auto_increment: true,
71					dictionary_id: None,
72				},
73				ColumnDef {
74					id: ColumnId(2),
75					name: "name".to_string(),
76					constraint: TypeConstraint::unconstrained(Type::Utf8),
77					policies: vec![],
78					index: ColumnIndex(1),
79					auto_increment: false,
80					dictionary_id: None,
81				},
82			],
83			primary_key: None,
84		}
85	}
86
87	#[test]
88	fn test_set_and_find_table() {
89		let catalog = MaterializedCatalog::new();
90		let table_id = TableId(1);
91		let namespace_id = NamespaceId(1);
92		let table = create_test_table(table_id, namespace_id, "test_table");
93
94		// Set table at version 1
95		catalog.set_table(table_id, CommitVersion(1), Some(table.clone()));
96
97		// Find table at version 1
98		let found = catalog.find_table(table_id, CommitVersion(1));
99		assert_eq!(found, Some(table.clone()));
100
101		// Find table at later version (should return same table)
102		let found = catalog.find_table(table_id, CommitVersion(5));
103		assert_eq!(found, Some(table));
104
105		// Table shouldn't exist at version 0
106		let found = catalog.find_table(table_id, CommitVersion(0));
107		assert_eq!(found, None);
108	}
109
110	#[test]
111	fn test_find_table_by_name() {
112		let catalog = MaterializedCatalog::new();
113		let table_id = TableId(1);
114		let namespace_id = NamespaceId(1);
115		let table = create_test_table(table_id, namespace_id, "named_table");
116
117		// Set table
118		catalog.set_table(table_id, CommitVersion(1), Some(table.clone()));
119
120		// Find by name
121		let found = catalog.find_table_by_name(namespace_id, "named_table", CommitVersion(1));
122		assert_eq!(found, Some(table));
123
124		// Shouldn't find with wrong name
125		let found = catalog.find_table_by_name(namespace_id, "wrong_name", CommitVersion(1));
126		assert_eq!(found, None);
127
128		// Shouldn't find in wrong namespace
129		let found = catalog.find_table_by_name(NamespaceId(2), "named_table", CommitVersion(1));
130		assert_eq!(found, None);
131	}
132
133	#[test]
134	fn test_table_rename() {
135		let catalog = MaterializedCatalog::new();
136		let table_id = TableId(1);
137		let namespace_id = NamespaceId(1);
138
139		// Create and set initial table
140		let table_v1 = create_test_table(table_id, namespace_id, "old_name");
141		catalog.set_table(table_id, CommitVersion(1), Some(table_v1.clone()));
142
143		// Verify initial state
144		assert!(catalog.find_table_by_name(namespace_id, "old_name", CommitVersion(1)).is_some());
145		assert!(catalog.find_table_by_name(namespace_id, "new_name", CommitVersion(1)).is_none());
146
147		// Rename the table
148		let mut table_v2 = table_v1.clone();
149		table_v2.name = "new_name".to_string();
150		catalog.set_table(table_id, CommitVersion(2), Some(table_v2.clone()));
151
152		// Old name should be gone
153		assert!(catalog.find_table_by_name(namespace_id, "old_name", CommitVersion(2)).is_none());
154
155		// New name can be found
156		assert_eq!(
157			catalog.find_table_by_name(namespace_id, "new_name", CommitVersion(2)),
158			Some(table_v2.clone())
159		);
160
161		// Historical query at version 1 should still show old name
162		assert_eq!(catalog.find_table(table_id, CommitVersion(1)), Some(table_v1));
163
164		// Current version should show new name
165		assert_eq!(catalog.find_table(table_id, CommitVersion(2)), Some(table_v2));
166	}
167
168	#[test]
169	fn test_table_move_between_namespaces() {
170		let catalog = MaterializedCatalog::new();
171		let table_id = TableId(1);
172		let namespace1 = NamespaceId(1);
173		let namespace2 = NamespaceId(2);
174
175		// Create table in namespace1
176		let table_v1 = create_test_table(table_id, namespace1, "movable_table");
177		catalog.set_table(table_id, CommitVersion(1), Some(table_v1.clone()));
178
179		// Verify it's in namespace1
180		assert!(catalog.find_table_by_name(namespace1, "movable_table", CommitVersion(1)).is_some());
181		assert!(catalog.find_table_by_name(namespace2, "movable_table", CommitVersion(1)).is_none());
182
183		// Move to namespace2
184		let mut table_v2 = table_v1.clone();
185		table_v2.namespace = namespace2;
186		catalog.set_table(table_id, CommitVersion(2), Some(table_v2.clone()));
187
188		// Should no longer be in namespace1
189		assert!(catalog.find_table_by_name(namespace1, "movable_table", CommitVersion(2)).is_none());
190
191		// Should now be in namespace2
192		assert!(catalog.find_table_by_name(namespace2, "movable_table", CommitVersion(2)).is_some());
193	}
194
195	#[test]
196	fn test_table_deletion() {
197		let catalog = MaterializedCatalog::new();
198		let table_id = TableId(1);
199		let namespace_id = NamespaceId(1);
200
201		// Create and set table
202		let table = create_test_table(table_id, namespace_id, "deletable_table");
203		catalog.set_table(table_id, CommitVersion(1), Some(table.clone()));
204
205		// Verify it exists
206		assert_eq!(catalog.find_table(table_id, CommitVersion(1)), Some(table.clone()));
207		assert!(catalog.find_table_by_name(namespace_id, "deletable_table", CommitVersion(1)).is_some());
208
209		// Delete the table
210		catalog.set_table(table_id, CommitVersion(2), None);
211
212		// Should not exist at version 2
213		assert_eq!(catalog.find_table(table_id, CommitVersion(2)), None);
214		assert!(catalog.find_table_by_name(namespace_id, "deletable_table", CommitVersion(2)).is_none());
215
216		// Should still exist at version 1 (historical)
217		assert_eq!(catalog.find_table(table_id, CommitVersion(1)), Some(table));
218	}
219
220	#[test]
221	fn test_multiple_tables_in_namespace() {
222		let catalog = MaterializedCatalog::new();
223		let namespace_id = NamespaceId(1);
224
225		let table1 = create_test_table(TableId(1), namespace_id, "table1");
226		let table2 = create_test_table(TableId(2), namespace_id, "table2");
227		let table3 = create_test_table(TableId(3), namespace_id, "table3");
228
229		// Set multiple tables
230		catalog.set_table(TableId(1), CommitVersion(1), Some(table1.clone()));
231		catalog.set_table(TableId(2), CommitVersion(1), Some(table2.clone()));
232		catalog.set_table(TableId(3), CommitVersion(1), Some(table3.clone()));
233
234		// All should be findable
235		assert_eq!(catalog.find_table_by_name(namespace_id, "table1", CommitVersion(1)), Some(table1));
236		assert_eq!(catalog.find_table_by_name(namespace_id, "table2", CommitVersion(1)), Some(table2));
237		assert_eq!(catalog.find_table_by_name(namespace_id, "table3", CommitVersion(1)), Some(table3));
238	}
239
240	#[test]
241	fn test_table_versioning() {
242		let catalog = MaterializedCatalog::new();
243		let table_id = TableId(1);
244		let namespace_id = NamespaceId(1);
245
246		// Create multiple versions
247		let table_v1 = create_test_table(table_id, namespace_id, "table_v1");
248		let mut table_v2 = table_v1.clone();
249		table_v2.name = "table_v2".to_string();
250		let mut table_v3 = table_v2.clone();
251		table_v3.name = "table_v3".to_string();
252
253		// Set at different versions
254		catalog.set_table(table_id, CommitVersion(10), Some(table_v1.clone()));
255		catalog.set_table(table_id, CommitVersion(20), Some(table_v2.clone()));
256		catalog.set_table(table_id, CommitVersion(30), Some(table_v3.clone()));
257
258		// Query at different versions
259		assert_eq!(catalog.find_table(table_id, CommitVersion(5)), None);
260		assert_eq!(catalog.find_table(table_id, CommitVersion(10)), Some(table_v1.clone()));
261		assert_eq!(catalog.find_table(table_id, CommitVersion(15)), Some(table_v1));
262		assert_eq!(catalog.find_table(table_id, CommitVersion(20)), Some(table_v2.clone()));
263		assert_eq!(catalog.find_table(table_id, CommitVersion(25)), Some(table_v2));
264		assert_eq!(catalog.find_table(table_id, CommitVersion(30)), Some(table_v3.clone()));
265		assert_eq!(catalog.find_table(table_id, CommitVersion(100)), Some(table_v3));
266	}
267}