reifydb_engine/transaction/catalog/
namespace.rs1use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackNamespaceChangeOperations;
6use reifydb_core::interface::{
7 Change, NamespaceDef, NamespaceId, OperationType, OperationType::Delete, TransactionalNamespaceChanges,
8};
9
10use crate::{StandardCommandTransaction, StandardQueryTransaction};
11
12impl CatalogTrackNamespaceChangeOperations for StandardCommandTransaction {
13 fn track_namespace_def_created(&mut self, namespace: NamespaceDef) -> reifydb_core::Result<()> {
14 let change = Change {
15 pre: None,
16 post: Some(namespace),
17 op: Create,
18 };
19 self.changes.add_namespace_def_change(change);
20 Ok(())
21 }
22
23 fn track_namespace_def_updated(&mut self, pre: NamespaceDef, post: NamespaceDef) -> reifydb_core::Result<()> {
24 let change = Change {
25 pre: Some(pre),
26 post: Some(post),
27 op: Update,
28 };
29 self.changes.add_namespace_def_change(change);
30 Ok(())
31 }
32
33 fn track_namespace_def_deleted(&mut self, namespace: NamespaceDef) -> reifydb_core::Result<()> {
34 let change = Change {
35 pre: Some(namespace),
36 post: None,
37 op: Delete,
38 };
39 self.changes.add_namespace_def_change(change);
40 Ok(())
41 }
42}
43
44impl TransactionalNamespaceChanges for StandardCommandTransaction {
45 fn find_namespace(&self, id: NamespaceId) -> Option<&NamespaceDef> {
46 for change in self.changes.namespace_def.iter().rev() {
48 if let Some(namespace) = &change.post {
49 if namespace.id == id {
50 return Some(namespace);
51 }
52 } else if let Some(namespace) = &change.pre {
53 if namespace.id == id && change.op == Delete {
54 return None;
56 }
57 }
58 }
59 None
60 }
61
62 fn find_namespace_by_name(&self, name: &str) -> Option<&NamespaceDef> {
63 self.changes
64 .namespace_def
65 .iter()
66 .rev()
67 .find_map(|change| change.post.as_ref().filter(|s| s.name == name))
68 }
69
70 fn is_namespace_deleted(&self, id: NamespaceId) -> bool {
71 self.changes
72 .namespace_def
73 .iter()
74 .rev()
75 .any(|change| change.op == Delete && change.pre.as_ref().map(|s| s.id) == Some(id))
76 }
77
78 fn is_namespace_deleted_by_name(&self, name: &str) -> bool {
79 self.changes
80 .namespace_def
81 .iter()
82 .rev()
83 .any(|change| change.op == Delete && change.pre.as_ref().map(|s| s.name.as_str()) == Some(name))
84 }
85}
86
87impl TransactionalNamespaceChanges for StandardQueryTransaction {
88 fn find_namespace(&self, _id: NamespaceId) -> Option<&NamespaceDef> {
89 None
90 }
91
92 fn find_namespace_by_name(&self, _name: &str) -> Option<&NamespaceDef> {
93 None
94 }
95
96 fn is_namespace_deleted(&self, _id: NamespaceId) -> bool {
97 false
98 }
99
100 fn is_namespace_deleted_by_name(&self, _name: &str) -> bool {
101 false
102 }
103}
104
#[cfg(test)]
mod tests {
    //! Tests for namespace change tracking on a command transaction.
    //!
    //! They check that every tracked operation is appended as its own entry to both
    //! `changes.namespace_def` and `changes.log`, i.e. that consecutive operations on
    //! the same namespace are not coalesced.

    use reifydb_catalog::transaction::CatalogTrackNamespaceChangeOperations;
    use reifydb_core::interface::{
        NamespaceDef, NamespaceId, Operation,
        OperationType::{Create, Delete, Update},
    };

    use crate::test_utils::create_test_command_transaction;

    /// Builds a `NamespaceDef` fixture with the given id and name.
    ///
    /// Plain synchronous constructor: there is nothing to await here.
    fn test_namespace_def(id: u64, name: &str) -> NamespaceDef {
        NamespaceDef {
            id: NamespaceId(id),
            name: name.to_string(),
        }
    }

    mod track_namespace_def_created {
        use super::*;

        #[tokio::test]
        async fn test_successful_creation() {
            let mut txn = create_test_command_transaction().await;
            let namespace = test_namespace_def(1, "test_namespace");

            // Cloned because `namespace.id` is checked against the log below.
            let result = txn.track_namespace_def_created(namespace.clone());
            assert!(result.is_ok());

            // Exactly one change: no pre state, post is the created namespace.
            assert_eq!(txn.changes.namespace_def.len(), 1);
            let change = &txn.changes.namespace_def[0];
            assert!(change.pre.is_none());
            assert_eq!(change.post.as_ref().unwrap().name, "test_namespace");
            assert_eq!(change.op, Create);

            // Exactly one matching log entry.
            assert_eq!(txn.changes.log.len(), 1);
            assert!(
                matches!(
                    &txn.changes.log[0],
                    Operation::Namespace { id, op } if *id == namespace.id && *op == Create
                ),
                "Expected Namespace operation with Create"
            );
        }
    }

    mod track_namespace_def_updated {
        use super::*;

        #[tokio::test]
        async fn test_multiple_updates_no_coalescing() {
            let mut txn = create_test_command_transaction().await;
            let namespace_v1 = test_namespace_def(1, "namespace_v1");
            let namespace_v2 = test_namespace_def(1, "namespace_v2");
            let namespace_v3 = test_namespace_def(1, "namespace_v3");

            // First update: v1 -> v2 (v2 is cloned because it is the pre state of the next update).
            txn.track_namespace_def_updated(namespace_v1, namespace_v2.clone()).unwrap();

            assert_eq!(txn.changes.namespace_def.len(), 1);
            assert_eq!(txn.changes.namespace_def[0].pre.as_ref().unwrap().name, "namespace_v1");
            assert_eq!(txn.changes.namespace_def[0].post.as_ref().unwrap().name, "namespace_v2");
            assert_eq!(txn.changes.namespace_def[0].op, Update);

            // Second update: v2 -> v3 must be recorded as a separate change.
            txn.track_namespace_def_updated(namespace_v2, namespace_v3).unwrap();

            assert_eq!(txn.changes.namespace_def.len(), 2);

            // The first change is left untouched.
            assert_eq!(txn.changes.namespace_def[0].pre.as_ref().unwrap().name, "namespace_v1");

            // The second change carries the v2 -> v3 transition.
            assert_eq!(txn.changes.namespace_def[1].pre.as_ref().unwrap().name, "namespace_v2");
            assert_eq!(txn.changes.namespace_def[1].post.as_ref().unwrap().name, "namespace_v3");

            assert_eq!(txn.changes.log.len(), 2);
        }

        #[tokio::test]
        async fn test_create_then_update_no_coalescing() {
            let mut txn = create_test_command_transaction().await;
            let namespace_v1 = test_namespace_def(1, "namespace_v1");
            let namespace_v2 = test_namespace_def(1, "namespace_v2");

            // Cloned because v1 is also the pre state of the update below.
            txn.track_namespace_def_created(namespace_v1.clone()).unwrap();
            assert_eq!(txn.changes.namespace_def.len(), 1);
            assert_eq!(txn.changes.namespace_def[0].op, Create);

            txn.track_namespace_def_updated(namespace_v1, namespace_v2).unwrap();

            // Create and Update stay two separate changes.
            assert_eq!(txn.changes.namespace_def.len(), 2);

            assert_eq!(txn.changes.namespace_def[0].op, Create);
            assert_eq!(txn.changes.namespace_def[0].post.as_ref().unwrap().name, "namespace_v1");

            assert_eq!(txn.changes.namespace_def[1].op, Update);
        }

        #[tokio::test]
        async fn test_normal_update() {
            let mut txn = create_test_command_transaction().await;
            let namespace_v1 = test_namespace_def(1, "namespace_v1");
            let namespace_v2 = test_namespace_def(1, "namespace_v2");

            let result = txn.track_namespace_def_updated(namespace_v1, namespace_v2);
            assert!(result.is_ok());

            // Exactly one change with both pre and post state.
            assert_eq!(txn.changes.namespace_def.len(), 1);
            let change = &txn.changes.namespace_def[0];
            assert_eq!(change.pre.as_ref().unwrap().name, "namespace_v1");
            assert_eq!(change.post.as_ref().unwrap().name, "namespace_v2");
            assert_eq!(change.op, Update);

            // Exactly one matching log entry.
            assert_eq!(txn.changes.log.len(), 1);
            assert!(
                matches!(
                    &txn.changes.log[0],
                    Operation::Namespace { id, op } if *id == NamespaceId(1) && *op == Update
                ),
                "Expected Namespace operation with Update"
            );
        }
    }

    mod track_namespace_def_deleted {
        use super::*;

        #[tokio::test]
        async fn test_delete_after_create_no_coalescing() {
            let mut txn = create_test_command_transaction().await;
            let namespace = test_namespace_def(1, "test_namespace");

            // Cloned because the same definition is deleted below.
            txn.track_namespace_def_created(namespace.clone()).unwrap();
            assert_eq!(txn.changes.log.len(), 1);
            assert_eq!(txn.changes.namespace_def.len(), 1);

            let result = txn.track_namespace_def_deleted(namespace);
            assert!(result.is_ok());

            // The delete does not cancel out the create: both are kept.
            assert_eq!(txn.changes.namespace_def.len(), 2);

            assert_eq!(txn.changes.namespace_def[0].op, Create);

            assert_eq!(txn.changes.namespace_def[1].op, Delete);
            assert_eq!(txn.changes.namespace_def[1].pre.as_ref().unwrap().name, "test_namespace");

            assert_eq!(txn.changes.log.len(), 2);
        }

        #[tokio::test]
        async fn test_delete_after_update_no_coalescing() {
            let mut txn = create_test_command_transaction().await;
            let namespace_v1 = test_namespace_def(1, "namespace_v1");
            let namespace_v2 = test_namespace_def(1, "namespace_v2");

            // v2 is cloned because it is deleted below.
            txn.track_namespace_def_updated(namespace_v1, namespace_v2.clone()).unwrap();
            assert_eq!(txn.changes.namespace_def.len(), 1);

            let result = txn.track_namespace_def_deleted(namespace_v2);
            assert!(result.is_ok());

            // Update and Delete stay two separate changes.
            assert_eq!(txn.changes.namespace_def.len(), 2);

            assert_eq!(txn.changes.namespace_def[0].op, Update);

            assert_eq!(txn.changes.namespace_def[1].op, Delete);

            assert_eq!(txn.changes.log.len(), 2);
        }

        #[tokio::test]
        async fn test_normal_delete() {
            let mut txn = create_test_command_transaction().await;
            let namespace = test_namespace_def(1, "test_namespace");

            // Cloned because `namespace.id` is checked against the log below.
            let result = txn.track_namespace_def_deleted(namespace.clone());
            assert!(result.is_ok());

            // Exactly one change: pre is the deleted namespace, no post state.
            assert_eq!(txn.changes.namespace_def.len(), 1);
            let change = &txn.changes.namespace_def[0];
            assert_eq!(change.pre.as_ref().unwrap().name, "test_namespace");
            assert!(change.post.is_none());
            assert_eq!(change.op, Delete);

            // Exactly one matching log entry.
            assert_eq!(txn.changes.log.len(), 1);
            assert!(
                matches!(
                    &txn.changes.log[0],
                    Operation::Namespace { id, op } if *id == namespace.id && *op == Delete
                ),
                "Expected Namespace operation with Delete"
            );
        }
    }
}