reifydb_catalog/transaction/
namespace.rs1use reifydb_core::interface::{
5 CommandTransaction, NamespaceDef, NamespaceId, QueryTransaction, TransactionalChanges,
6 TransactionalNamespaceChanges,
7 interceptor::{NamespaceDefInterceptor, WithInterceptors},
8};
9use reifydb_type::{
10 IntoFragment,
11 diagnostic::catalog::{namespace_already_exists, namespace_not_found},
12 error, internal, return_error,
13};
14use tracing::{instrument, warn};
15
16use crate::{CatalogStore, store::namespace::NamespaceToCreate, transaction::MaterializedCatalogTransaction};
17
18pub trait CatalogNamespaceCommandOperations {
19 fn create_namespace(&mut self, to_create: NamespaceToCreate) -> crate::Result<NamespaceDef>;
20
21 }
27
28pub trait CatalogTrackNamespaceChangeOperations {
29 fn track_namespace_def_created(&mut self, namespace: NamespaceDef) -> crate::Result<()>;
30
31 fn track_namespace_def_updated(&mut self, pre: NamespaceDef, post: NamespaceDef) -> crate::Result<()>;
32
33 fn track_namespace_def_deleted(&mut self, namespace: NamespaceDef) -> crate::Result<()>;
34}
35
36pub trait CatalogNamespaceQueryOperations {
37 fn find_namespace(&mut self, id: NamespaceId) -> crate::Result<Option<NamespaceDef>>;
38
39 fn find_namespace_by_name<'a>(&mut self, name: impl IntoFragment<'a>) -> crate::Result<Option<NamespaceDef>>;
40
41 fn get_namespace(&mut self, id: NamespaceId) -> crate::Result<NamespaceDef>;
42
43 fn get_namespace_by_name<'a>(&mut self, name: impl IntoFragment<'a>) -> crate::Result<NamespaceDef>;
44}
45
46impl<
47 CT: CommandTransaction
48 + MaterializedCatalogTransaction
49 + CatalogTrackNamespaceChangeOperations
50 + WithInterceptors<CT>
51 + TransactionalChanges,
52> CatalogNamespaceCommandOperations for CT
53{
54 #[instrument(level = "debug", skip(self, to_create))]
55 fn create_namespace(&mut self, to_create: NamespaceToCreate) -> reifydb_core::Result<NamespaceDef> {
56 if let Some(namespace) = self.find_namespace_by_name(&to_create.name)? {
57 return_error!(namespace_already_exists(to_create.namespace_fragment, &namespace.name));
58 }
59 let result = CatalogStore::create_namespace(self, to_create)?;
60 self.track_namespace_def_created(result.clone())?;
61 NamespaceDefInterceptor::post_create(self, &result)?;
62 Ok(result)
63 }
64}
65
66impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges> CatalogNamespaceQueryOperations
67 for QT
68{
69 #[instrument(level = "trace", skip(self))]
70 fn find_namespace(&mut self, id: NamespaceId) -> reifydb_core::Result<Option<NamespaceDef>> {
71 if let Some(namespace) = TransactionalNamespaceChanges::find_namespace(self, id) {
73 return Ok(Some(namespace.clone()));
74 }
75
76 if TransactionalNamespaceChanges::is_namespace_deleted(self, id) {
79 return Ok(None);
80 }
81
82 if let Some(namespace) = self.catalog().find_namespace(id, self.version()) {
84 return Ok(Some(namespace));
85 }
86
87 if let Some(namespace) = CatalogStore::find_namespace(self, id)? {
89 warn!("Namespace with ID {:?} found in storage but not in MaterializedCatalog", id);
90 return Ok(Some(namespace));
91 }
92
93 Ok(None)
94 }
95
96 #[instrument(level = "trace", skip(self, name))]
97 fn find_namespace_by_name<'a>(
98 &mut self,
99 name: impl IntoFragment<'a>,
100 ) -> reifydb_core::Result<Option<NamespaceDef>> {
101 let name = name.into_fragment();
102
103 if let Some(namespace) = TransactionalNamespaceChanges::find_namespace_by_name(self, name.as_borrowed())
106 {
107 return Ok(Some(namespace.clone()));
108 }
109
110 if TransactionalNamespaceChanges::is_namespace_deleted_by_name(self, name.as_borrowed()) {
113 return Ok(None);
114 }
115
116 if let Some(namespace) = self.catalog().find_namespace_by_name(name.text(), self.version()) {
118 return Ok(Some(namespace));
119 }
120
121 if let Some(namespace) = CatalogStore::find_namespace_by_name(self, name.text())? {
123 warn!("Namespace '{}' found in storage but not in MaterializedCatalog", name.text());
124 return Ok(Some(namespace));
125 }
126
127 Ok(None)
128 }
129
130 #[instrument(level = "trace", skip(self))]
131 fn get_namespace(&mut self, id: NamespaceId) -> reifydb_core::Result<NamespaceDef> {
132 self.find_namespace(id)?.ok_or_else(|| {
133 error!(internal!(
134 "Namespace with ID {} not found in catalog. This indicates a critical catalog inconsistency.",
135 id
136 ))
137 })
138 }
139
140 #[instrument(level = "trace", skip(self, name))]
141 fn get_namespace_by_name<'a>(&mut self, name: impl IntoFragment<'a>) -> reifydb_core::Result<NamespaceDef> {
142 let name = name.into_fragment();
143 self.find_namespace_by_name(name.as_borrowed())?
144 .ok_or_else(|| error!(namespace_not_found(name.as_borrowed(), name.text())))
145 }
146}