reifydb_engine/transaction/catalog/
namespace.rs1use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackNamespaceChangeOperations;
6use reifydb_core::interface::{
7 Change, NamespaceDef, NamespaceId, OperationType, OperationType::Delete, TransactionalNamespaceChanges,
8};
9use reifydb_type::IntoFragment;
10
11use crate::{StandardCommandTransaction, StandardQueryTransaction};
12
13impl CatalogTrackNamespaceChangeOperations for StandardCommandTransaction {
14 fn track_namespace_def_created(&mut self, namespace: NamespaceDef) -> reifydb_core::Result<()> {
15 let change = Change {
16 pre: None,
17 post: Some(namespace),
18 op: Create,
19 };
20 self.changes.add_namespace_def_change(change);
21 Ok(())
22 }
23
24 fn track_namespace_def_updated(&mut self, pre: NamespaceDef, post: NamespaceDef) -> reifydb_core::Result<()> {
25 let change = Change {
26 pre: Some(pre),
27 post: Some(post),
28 op: Update,
29 };
30 self.changes.add_namespace_def_change(change);
31 Ok(())
32 }
33
34 fn track_namespace_def_deleted(&mut self, namespace: NamespaceDef) -> reifydb_core::Result<()> {
35 let change = Change {
36 pre: Some(namespace),
37 post: None,
38 op: Delete,
39 };
40 self.changes.add_namespace_def_change(change);
41 Ok(())
42 }
43}
44
45impl TransactionalNamespaceChanges for StandardCommandTransaction {
46 fn find_namespace(&self, id: NamespaceId) -> Option<&NamespaceDef> {
47 for change in self.changes.namespace_def.iter().rev() {
49 if let Some(namespace) = &change.post {
50 if namespace.id == id {
51 return Some(namespace);
52 }
53 } else if let Some(namespace) = &change.pre {
54 if namespace.id == id && change.op == Delete {
55 return None;
57 }
58 }
59 }
60 None
61 }
62
63 fn find_namespace_by_name<'a>(&self, name: impl IntoFragment<'a>) -> Option<&NamespaceDef> {
64 let name = name.into_fragment();
65 self.changes
66 .namespace_def
67 .iter()
68 .rev()
69 .find_map(|change| change.post.as_ref().filter(|s| s.name == name.text()))
70 }
71
72 fn is_namespace_deleted(&self, id: NamespaceId) -> bool {
73 self.changes
74 .namespace_def
75 .iter()
76 .rev()
77 .any(|change| change.op == Delete && change.pre.as_ref().map(|s| s.id) == Some(id))
78 }
79
80 fn is_namespace_deleted_by_name<'a>(&self, name: impl IntoFragment<'a>) -> bool {
81 let name = name.into_fragment();
82 self.changes.namespace_def.iter().rev().any(|change| {
83 change.op == Delete && change.pre.as_ref().map(|s| s.name.as_str()) == Some(name.text())
84 })
85 }
86}
87
88impl TransactionalNamespaceChanges for StandardQueryTransaction {
89 fn find_namespace(&self, _id: NamespaceId) -> Option<&NamespaceDef> {
90 None
91 }
92
93 fn find_namespace_by_name<'a>(&self, _name: impl IntoFragment<'a>) -> Option<&NamespaceDef> {
94 None
95 }
96
97 fn is_namespace_deleted(&self, _id: NamespaceId) -> bool {
98 false
99 }
100
101 fn is_namespace_deleted_by_name<'a>(&self, _name: impl IntoFragment<'a>) -> bool {
102 false
103 }
104}
105
#[cfg(test)]
mod tests {
    use reifydb_catalog::transaction::CatalogTrackNamespaceChangeOperations;
    use reifydb_core::interface::{
        NamespaceDef, NamespaceId, Operation,
        OperationType::{Create, Delete, Update},
    };

    use crate::test_utils::create_test_command_transaction;

    /// Builds a `NamespaceDef` fixture from a raw id and a name.
    fn test_namespace_def(id: u64, name: &str) -> NamespaceDef {
        let id = NamespaceId(id);
        let name = name.to_string();
        NamespaceDef {
            id,
            name,
        }
    }

    mod track_namespace_def_created {
        use super::*;

        /// A create yields one change (no `pre`, `post` set) and one matching log entry.
        #[test]
        fn test_successful_creation() {
            let mut txn = create_test_command_transaction();
            let created = test_namespace_def(1, "test_namespace");

            assert!(txn.track_namespace_def_created(created.clone()).is_ok());

            let tracked = &txn.changes.namespace_def;
            assert_eq!(tracked.len(), 1);
            let only = &tracked[0];
            assert!(only.pre.is_none());
            assert_eq!(only.post.as_ref().unwrap().name, "test_namespace");
            assert_eq!(only.op, Create);

            let log = &txn.changes.log;
            assert_eq!(log.len(), 1);
            assert!(
                matches!(&log[0], Operation::Namespace { id, op } if *id == created.id && *op == Create),
                "Expected Namespace operation with Create"
            );
        }
    }

    mod track_namespace_def_updated {
        use super::*;

        /// Two consecutive updates are kept as two separate changes and log entries.
        #[test]
        fn test_multiple_updates_no_coalescing() {
            let mut txn = create_test_command_transaction();
            let v1 = test_namespace_def(1, "namespace_v1");
            let v2 = test_namespace_def(1, "namespace_v2");
            let v3 = test_namespace_def(1, "namespace_v3");

            txn.track_namespace_def_updated(v1.clone(), v2.clone()).unwrap();

            let tracked = &txn.changes.namespace_def;
            assert_eq!(tracked.len(), 1);
            assert_eq!(tracked[0].pre.as_ref().unwrap().name, "namespace_v1");
            assert_eq!(tracked[0].post.as_ref().unwrap().name, "namespace_v2");
            assert_eq!(tracked[0].op, Update);

            txn.track_namespace_def_updated(v2, v3.clone()).unwrap();

            let tracked = &txn.changes.namespace_def;
            assert_eq!(tracked.len(), 2);

            // The first update is untouched by the second one.
            assert_eq!(tracked[0].pre.as_ref().unwrap().name, "namespace_v1");

            assert_eq!(tracked[1].pre.as_ref().unwrap().name, "namespace_v2");
            assert_eq!(tracked[1].post.as_ref().unwrap().name, "namespace_v3");

            assert_eq!(txn.changes.log.len(), 2);
        }

        /// A create followed by an update is kept as two separate changes.
        #[test]
        fn test_create_then_update_no_coalescing() {
            let mut txn = create_test_command_transaction();
            let v1 = test_namespace_def(1, "namespace_v1");
            let v2 = test_namespace_def(1, "namespace_v2");

            txn.track_namespace_def_created(v1.clone()).unwrap();
            assert_eq!(txn.changes.namespace_def.len(), 1);
            assert_eq!(txn.changes.namespace_def[0].op, Create);

            txn.track_namespace_def_updated(v1, v2.clone()).unwrap();

            let tracked = &txn.changes.namespace_def;
            assert_eq!(tracked.len(), 2);

            assert_eq!(tracked[0].op, Create);
            assert_eq!(tracked[0].post.as_ref().unwrap().name, "namespace_v1");

            assert_eq!(tracked[1].op, Update);
        }

        /// A single update yields one change with both images and one matching log entry.
        #[test]
        fn test_normal_update() {
            let mut txn = create_test_command_transaction();
            let v1 = test_namespace_def(1, "namespace_v1");
            let v2 = test_namespace_def(1, "namespace_v2");

            assert!(txn.track_namespace_def_updated(v1.clone(), v2.clone()).is_ok());

            let tracked = &txn.changes.namespace_def;
            assert_eq!(tracked.len(), 1);
            let only = &tracked[0];
            assert_eq!(only.pre.as_ref().unwrap().name, "namespace_v1");
            assert_eq!(only.post.as_ref().unwrap().name, "namespace_v2");
            assert_eq!(only.op, Update);

            let log = &txn.changes.log;
            assert_eq!(log.len(), 1);
            assert!(
                matches!(&log[0], Operation::Namespace { id, op } if *id == NamespaceId(1) && *op == Update),
                "Expected Namespace operation with Update"
            );
        }
    }

    mod track_namespace_def_deleted {
        use super::*;

        /// A delete after a create is kept as a second change; the create is not removed.
        #[test]
        fn test_delete_after_create_no_coalescing() {
            let mut txn = create_test_command_transaction();
            let doomed = test_namespace_def(1, "test_namespace");

            txn.track_namespace_def_created(doomed.clone()).unwrap();
            assert_eq!(txn.changes.log.len(), 1);
            assert_eq!(txn.changes.namespace_def.len(), 1);

            assert!(txn.track_namespace_def_deleted(doomed.clone()).is_ok());

            let tracked = &txn.changes.namespace_def;
            assert_eq!(tracked.len(), 2);

            assert_eq!(tracked[0].op, Create);

            assert_eq!(tracked[1].op, Delete);
            assert_eq!(tracked[1].pre.as_ref().unwrap().name, "test_namespace");

            assert_eq!(txn.changes.log.len(), 2);
        }

        /// A delete after an update is kept as a second change; the update is not removed.
        #[test]
        fn test_delete_after_update_no_coalescing() {
            let mut txn = create_test_command_transaction();
            let v1 = test_namespace_def(1, "namespace_v1");
            let v2 = test_namespace_def(1, "namespace_v2");

            txn.track_namespace_def_updated(v1.clone(), v2.clone()).unwrap();
            assert_eq!(txn.changes.namespace_def.len(), 1);

            assert!(txn.track_namespace_def_deleted(v2).is_ok());

            let tracked = &txn.changes.namespace_def;
            assert_eq!(tracked.len(), 2);

            assert_eq!(tracked[0].op, Update);

            assert_eq!(tracked[1].op, Delete);

            assert_eq!(txn.changes.log.len(), 2);
        }

        /// A lone delete yields one change (`pre` set, no `post`) and one matching log entry.
        #[test]
        fn test_normal_delete() {
            let mut txn = create_test_command_transaction();
            let doomed = test_namespace_def(1, "test_namespace");

            assert!(txn.track_namespace_def_deleted(doomed.clone()).is_ok());

            let tracked = &txn.changes.namespace_def;
            assert_eq!(tracked.len(), 1);
            let only = &tracked[0];
            assert_eq!(only.pre.as_ref().unwrap().name, "test_namespace");
            assert!(only.post.is_none());
            assert_eq!(only.op, Delete);

            let log = &txn.changes.log;
            assert_eq!(log.len(), 1);
            assert!(
                matches!(&log[0], Operation::Namespace { id, op } if *id == doomed.id && *op == Delete),
                "Expected Namespace operation with Delete"
            );
        }
    }
}