1use reifydb_core::{
5 CommitVersion,
6 interface::{NamespaceId, TableDef, TableId},
7};
8
9use crate::materialized::{MaterializedCatalog, MultiVersionTableDef};
10
11impl MaterializedCatalog {
12 pub fn find_table(&self, table: TableId, version: CommitVersion) -> Option<TableDef> {
14 self.tables.get(&table).and_then(|entry| {
15 let multi = entry.value();
16 multi.get(version)
17 })
18 }
19
20 pub fn find_table_by_name(
22 &self,
23 namespace: NamespaceId,
24 name: &str,
25 version: CommitVersion,
26 ) -> Option<TableDef> {
27 self.tables_by_name.get(&(namespace, name.to_string())).and_then(|entry| {
28 let table_id = *entry.value();
29 self.find_table(table_id, version)
30 })
31 }
32
33 pub fn set_table(&self, id: TableId, version: CommitVersion, table: Option<TableDef>) {
34 if let Some(entry) = self.tables.get(&id) {
35 if let Some(pre) = entry.value().get_latest() {
36 self.tables_by_name.remove(&(pre.namespace, pre.name.clone()));
38 }
39 }
40
41 let multi = self.tables.get_or_insert_with(id, MultiVersionTableDef::new);
42 if let Some(new) = table {
43 self.tables_by_name.insert((new.namespace, new.name.clone()), id);
44 multi.value().insert(version, new);
45 } else {
46 multi.value().remove(version);
47 }
48 }
49}
50
51#[cfg(test)]
52mod tests {
53 use reifydb_core::interface::{ColumnDef, ColumnId, ColumnIndex};
54 use reifydb_type::{Type, TypeConstraint};
55
56 use super::*;
57
58 fn create_test_table(id: TableId, namespace: NamespaceId, name: &str) -> TableDef {
59 TableDef {
60 id,
61 namespace,
62 name: name.to_string(),
63 columns: vec![
64 ColumnDef {
65 id: ColumnId(1),
66 name: "id".to_string(),
67 constraint: TypeConstraint::unconstrained(Type::Int4),
68 policies: vec![],
69 index: ColumnIndex(0),
70 auto_increment: true,
71 dictionary_id: None,
72 },
73 ColumnDef {
74 id: ColumnId(2),
75 name: "name".to_string(),
76 constraint: TypeConstraint::unconstrained(Type::Utf8),
77 policies: vec![],
78 index: ColumnIndex(1),
79 auto_increment: false,
80 dictionary_id: None,
81 },
82 ],
83 primary_key: None,
84 }
85 }
86
87 #[test]
88 fn test_set_and_find_table() {
89 let catalog = MaterializedCatalog::new();
90 let table_id = TableId(1);
91 let namespace_id = NamespaceId(1);
92 let table = create_test_table(table_id, namespace_id, "test_table");
93
94 catalog.set_table(table_id, CommitVersion(1), Some(table.clone()));
96
97 let found = catalog.find_table(table_id, CommitVersion(1));
99 assert_eq!(found, Some(table.clone()));
100
101 let found = catalog.find_table(table_id, CommitVersion(5));
103 assert_eq!(found, Some(table));
104
105 let found = catalog.find_table(table_id, CommitVersion(0));
107 assert_eq!(found, None);
108 }
109
110 #[test]
111 fn test_find_table_by_name() {
112 let catalog = MaterializedCatalog::new();
113 let table_id = TableId(1);
114 let namespace_id = NamespaceId(1);
115 let table = create_test_table(table_id, namespace_id, "named_table");
116
117 catalog.set_table(table_id, CommitVersion(1), Some(table.clone()));
119
120 let found = catalog.find_table_by_name(namespace_id, "named_table", CommitVersion(1));
122 assert_eq!(found, Some(table));
123
124 let found = catalog.find_table_by_name(namespace_id, "wrong_name", CommitVersion(1));
126 assert_eq!(found, None);
127
128 let found = catalog.find_table_by_name(NamespaceId(2), "named_table", CommitVersion(1));
130 assert_eq!(found, None);
131 }
132
133 #[test]
134 fn test_table_rename() {
135 let catalog = MaterializedCatalog::new();
136 let table_id = TableId(1);
137 let namespace_id = NamespaceId(1);
138
139 let table_v1 = create_test_table(table_id, namespace_id, "old_name");
141 catalog.set_table(table_id, CommitVersion(1), Some(table_v1.clone()));
142
143 assert!(catalog.find_table_by_name(namespace_id, "old_name", CommitVersion(1)).is_some());
145 assert!(catalog.find_table_by_name(namespace_id, "new_name", CommitVersion(1)).is_none());
146
147 let mut table_v2 = table_v1.clone();
149 table_v2.name = "new_name".to_string();
150 catalog.set_table(table_id, CommitVersion(2), Some(table_v2.clone()));
151
152 assert!(catalog.find_table_by_name(namespace_id, "old_name", CommitVersion(2)).is_none());
154
155 assert_eq!(
157 catalog.find_table_by_name(namespace_id, "new_name", CommitVersion(2)),
158 Some(table_v2.clone())
159 );
160
161 assert_eq!(catalog.find_table(table_id, CommitVersion(1)), Some(table_v1));
163
164 assert_eq!(catalog.find_table(table_id, CommitVersion(2)), Some(table_v2));
166 }
167
168 #[test]
169 fn test_table_move_between_namespaces() {
170 let catalog = MaterializedCatalog::new();
171 let table_id = TableId(1);
172 let namespace1 = NamespaceId(1);
173 let namespace2 = NamespaceId(2);
174
175 let table_v1 = create_test_table(table_id, namespace1, "movable_table");
177 catalog.set_table(table_id, CommitVersion(1), Some(table_v1.clone()));
178
179 assert!(catalog.find_table_by_name(namespace1, "movable_table", CommitVersion(1)).is_some());
181 assert!(catalog.find_table_by_name(namespace2, "movable_table", CommitVersion(1)).is_none());
182
183 let mut table_v2 = table_v1.clone();
185 table_v2.namespace = namespace2;
186 catalog.set_table(table_id, CommitVersion(2), Some(table_v2.clone()));
187
188 assert!(catalog.find_table_by_name(namespace1, "movable_table", CommitVersion(2)).is_none());
190
191 assert!(catalog.find_table_by_name(namespace2, "movable_table", CommitVersion(2)).is_some());
193 }
194
195 #[test]
196 fn test_table_deletion() {
197 let catalog = MaterializedCatalog::new();
198 let table_id = TableId(1);
199 let namespace_id = NamespaceId(1);
200
201 let table = create_test_table(table_id, namespace_id, "deletable_table");
203 catalog.set_table(table_id, CommitVersion(1), Some(table.clone()));
204
205 assert_eq!(catalog.find_table(table_id, CommitVersion(1)), Some(table.clone()));
207 assert!(catalog.find_table_by_name(namespace_id, "deletable_table", CommitVersion(1)).is_some());
208
209 catalog.set_table(table_id, CommitVersion(2), None);
211
212 assert_eq!(catalog.find_table(table_id, CommitVersion(2)), None);
214 assert!(catalog.find_table_by_name(namespace_id, "deletable_table", CommitVersion(2)).is_none());
215
216 assert_eq!(catalog.find_table(table_id, CommitVersion(1)), Some(table));
218 }
219
220 #[test]
221 fn test_multiple_tables_in_namespace() {
222 let catalog = MaterializedCatalog::new();
223 let namespace_id = NamespaceId(1);
224
225 let table1 = create_test_table(TableId(1), namespace_id, "table1");
226 let table2 = create_test_table(TableId(2), namespace_id, "table2");
227 let table3 = create_test_table(TableId(3), namespace_id, "table3");
228
229 catalog.set_table(TableId(1), CommitVersion(1), Some(table1.clone()));
231 catalog.set_table(TableId(2), CommitVersion(1), Some(table2.clone()));
232 catalog.set_table(TableId(3), CommitVersion(1), Some(table3.clone()));
233
234 assert_eq!(catalog.find_table_by_name(namespace_id, "table1", CommitVersion(1)), Some(table1));
236 assert_eq!(catalog.find_table_by_name(namespace_id, "table2", CommitVersion(1)), Some(table2));
237 assert_eq!(catalog.find_table_by_name(namespace_id, "table3", CommitVersion(1)), Some(table3));
238 }
239
240 #[test]
241 fn test_table_versioning() {
242 let catalog = MaterializedCatalog::new();
243 let table_id = TableId(1);
244 let namespace_id = NamespaceId(1);
245
246 let table_v1 = create_test_table(table_id, namespace_id, "table_v1");
248 let mut table_v2 = table_v1.clone();
249 table_v2.name = "table_v2".to_string();
250 let mut table_v3 = table_v2.clone();
251 table_v3.name = "table_v3".to_string();
252
253 catalog.set_table(table_id, CommitVersion(10), Some(table_v1.clone()));
255 catalog.set_table(table_id, CommitVersion(20), Some(table_v2.clone()));
256 catalog.set_table(table_id, CommitVersion(30), Some(table_v3.clone()));
257
258 assert_eq!(catalog.find_table(table_id, CommitVersion(5)), None);
260 assert_eq!(catalog.find_table(table_id, CommitVersion(10)), Some(table_v1.clone()));
261 assert_eq!(catalog.find_table(table_id, CommitVersion(15)), Some(table_v1));
262 assert_eq!(catalog.find_table(table_id, CommitVersion(20)), Some(table_v2.clone()));
263 assert_eq!(catalog.find_table(table_id, CommitVersion(25)), Some(table_v2));
264 assert_eq!(catalog.find_table(table_id, CommitVersion(30)), Some(table_v3.clone()));
265 assert_eq!(catalog.find_table(table_id, CommitVersion(100)), Some(table_v3));
266 }
267}