reifydb_catalog/transaction/
namespace.rs1use reifydb_core::{
5 interface::{
6 CommandTransaction, NamespaceDef, NamespaceId, QueryTransaction, TransactionalChanges,
7 TransactionalNamespaceChanges,
8 interceptor::{NamespaceDefInterceptor, WithInterceptors},
9 },
10 log_warn,
11};
12use reifydb_type::{
13 IntoFragment,
14 diagnostic::catalog::{namespace_already_exists, namespace_not_found},
15 error, internal, return_error,
16};
17
18use crate::{CatalogStore, store::namespace::NamespaceToCreate, transaction::MaterializedCatalogTransaction};
19
/// Namespace operations that require a command (write) transaction.
///
/// A blanket implementation for suitable transaction types is provided in this file.
pub trait CatalogNamespaceCommandOperations {
    /// Creates the namespace described by `to_create` and returns its definition.
    ///
    /// # Errors
    ///
    /// The blanket implementation in this file reports `namespace_already_exists`
    /// when a namespace with the same name is already visible to the transaction,
    /// and propagates any error from the store, change tracking, or interceptors.
    fn create_namespace(&mut self, to_create: NamespaceToCreate) -> crate::Result<NamespaceDef>;

}
29
/// Hooks for recording namespace definition changes on a transaction.
///
/// NOTE(review): no implementor is visible in this file; the descriptions below are
/// inferred from the method names and the one call site in `create_namespace` —
/// confirm against the implementing transaction types.
pub trait CatalogTrackNamespaceChangeOperations {
    /// Records that `namespace` was created.
    ///
    /// In this file it is called by `create_namespace` right after the store has
    /// created the namespace.
    fn track_namespace_def_created(&mut self, namespace: NamespaceDef) -> crate::Result<()>;

    /// Records a namespace update; `pre` and `post` are presumably the definition
    /// before and after the change — TODO confirm.
    fn track_namespace_def_updated(&mut self, pre: NamespaceDef, post: NamespaceDef) -> crate::Result<()>;

    /// Records that `namespace` was deleted (presumably; not exercised in this file).
    fn track_namespace_def_deleted(&mut self, namespace: NamespaceDef) -> crate::Result<()>;
}
37
/// Read-only namespace lookups available on a transaction.
///
/// The `find_*` methods return `Ok(None)` when no namespace matches, while the
/// `get_*` methods turn a missing namespace into an error.
pub trait CatalogNamespaceQueryOperations {
    /// Looks up a namespace by id; `Ok(None)` means it was not found.
    fn find_namespace(&mut self, id: NamespaceId) -> crate::Result<Option<NamespaceDef>>;

    /// Looks up a namespace by name; `Ok(None)` means it was not found.
    fn find_namespace_by_name<'a>(&mut self, name: impl IntoFragment<'a>) -> crate::Result<Option<NamespaceDef>>;

    /// Returns the namespace with the given id.
    ///
    /// The blanket implementation in this file raises an internal error when the
    /// namespace is missing.
    fn get_namespace(&mut self, id: NamespaceId) -> crate::Result<NamespaceDef>;

    /// Returns the namespace with the given name.
    ///
    /// The blanket implementation in this file reports `namespace_not_found` when
    /// the namespace is missing.
    fn get_namespace_by_name<'a>(&mut self, name: impl IntoFragment<'a>) -> crate::Result<NamespaceDef>;
}
47
48impl<
49 CT: CommandTransaction
50 + MaterializedCatalogTransaction
51 + CatalogTrackNamespaceChangeOperations
52 + WithInterceptors<CT>
53 + TransactionalChanges,
54> CatalogNamespaceCommandOperations for CT
55{
56 fn create_namespace(&mut self, to_create: NamespaceToCreate) -> reifydb_core::Result<NamespaceDef> {
57 if let Some(namespace) = self.find_namespace_by_name(&to_create.name)? {
58 return_error!(namespace_already_exists(to_create.namespace_fragment, &namespace.name));
59 }
60 let result = CatalogStore::create_namespace(self, to_create)?;
61 self.track_namespace_def_created(result.clone())?;
62 NamespaceDefInterceptor::post_create(self, &result)?;
63 Ok(result)
64 }
65}
66
67impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges> CatalogNamespaceQueryOperations
68 for QT
69{
70 fn find_namespace(&mut self, id: NamespaceId) -> reifydb_core::Result<Option<NamespaceDef>> {
71 if let Some(namespace) = TransactionalNamespaceChanges::find_namespace(self, id) {
73 return Ok(Some(namespace.clone()));
74 }
75
76 if TransactionalNamespaceChanges::is_namespace_deleted(self, id) {
79 return Ok(None);
80 }
81
82 if let Some(namespace) = self.catalog().find_namespace(id, self.version()) {
84 return Ok(Some(namespace));
85 }
86
87 if let Some(namespace) = CatalogStore::find_namespace(self, id)? {
89 log_warn!("Namespace with ID {:?} found in storage but not in MaterializedCatalog", id);
90 return Ok(Some(namespace));
91 }
92
93 Ok(None)
94 }
95
96 fn find_namespace_by_name<'a>(
97 &mut self,
98 name: impl IntoFragment<'a>,
99 ) -> reifydb_core::Result<Option<NamespaceDef>> {
100 let name = name.into_fragment();
101
102 if let Some(namespace) = TransactionalNamespaceChanges::find_namespace_by_name(self, name.as_borrowed())
105 {
106 return Ok(Some(namespace.clone()));
107 }
108
109 if TransactionalNamespaceChanges::is_namespace_deleted_by_name(self, name.as_borrowed()) {
112 return Ok(None);
113 }
114
115 if let Some(namespace) = self.catalog().find_namespace_by_name(name.text(), self.version()) {
117 return Ok(Some(namespace));
118 }
119
120 if let Some(namespace) = CatalogStore::find_namespace_by_name(self, name.text())? {
122 log_warn!("Namespace '{}' found in storage but not in MaterializedCatalog", name.text());
123 return Ok(Some(namespace));
124 }
125
126 Ok(None)
127 }
128
129 fn get_namespace(&mut self, id: NamespaceId) -> reifydb_core::Result<NamespaceDef> {
130 self.find_namespace(id)?.ok_or_else(|| {
131 error!(internal!(
132 "Namespace with ID {} not found in catalog. This indicates a critical catalog inconsistency.",
133 id
134 ))
135 })
136 }
137
138 fn get_namespace_by_name<'a>(&mut self, name: impl IntoFragment<'a>) -> reifydb_core::Result<NamespaceDef> {
139 let name = name.into_fragment();
140 self.find_namespace_by_name(name.as_borrowed())?
141 .ok_or_else(|| error!(namespace_not_found(name.as_borrowed(), name.text())))
142 }
143}