reifydb_catalog/transaction/
namespace.rs1use async_trait::async_trait;
5use reifydb_core::interface::{
6 CommandTransaction, NamespaceDef, NamespaceId, QueryTransaction, TransactionalChanges,
7 TransactionalNamespaceChanges,
8 interceptor::{NamespaceDefInterceptor, WithInterceptors},
9};
10use reifydb_type::{
11 Fragment,
12 diagnostic::catalog::{namespace_already_exists, namespace_not_found},
13 error, internal, return_error,
14};
15use tracing::{instrument, warn};
16
17use crate::{CatalogStore, store::namespace::NamespaceToCreate, transaction::MaterializedCatalogTransaction};
18
19#[async_trait]
20pub trait CatalogNamespaceCommandOperations: Send {
21 async fn create_namespace(&mut self, to_create: NamespaceToCreate) -> crate::Result<NamespaceDef>;
22
23 }
29
30pub trait CatalogTrackNamespaceChangeOperations {
31 fn track_namespace_def_created(&mut self, namespace: NamespaceDef) -> crate::Result<()>;
32
33 fn track_namespace_def_updated(&mut self, pre: NamespaceDef, post: NamespaceDef) -> crate::Result<()>;
34
35 fn track_namespace_def_deleted(&mut self, namespace: NamespaceDef) -> crate::Result<()>;
36}
37
38#[async_trait]
39pub trait CatalogNamespaceQueryOperations: Send {
40 async fn find_namespace(&mut self, id: NamespaceId) -> crate::Result<Option<NamespaceDef>>;
41
42 async fn find_namespace_by_name(&mut self, name: &str) -> crate::Result<Option<NamespaceDef>>;
43
44 async fn get_namespace(&mut self, id: NamespaceId) -> crate::Result<NamespaceDef>;
45
46 async fn get_namespace_by_name(&mut self, name: impl Into<Fragment> + Send) -> crate::Result<NamespaceDef>;
47}
48
49#[async_trait]
50impl<
51 CT: CommandTransaction
52 + MaterializedCatalogTransaction
53 + CatalogTrackNamespaceChangeOperations
54 + WithInterceptors<CT>
55 + TransactionalChanges
56 + Send
57 + 'static,
58> CatalogNamespaceCommandOperations for CT
59{
60 #[instrument(name = "catalog::namespace::create", level = "debug", skip(self, to_create))]
61 async fn create_namespace(&mut self, to_create: NamespaceToCreate) -> reifydb_core::Result<NamespaceDef> {
62 if let Some(namespace) = self.find_namespace_by_name(to_create.name.as_str()).await? {
63 return_error!(namespace_already_exists(
64 to_create.namespace_fragment.unwrap_or_else(|| Fragment::None),
65 &namespace.name
66 ));
67 }
68 let result = CatalogStore::create_namespace(self, to_create).await?;
69 self.track_namespace_def_created(result.clone())?;
70 NamespaceDefInterceptor::post_create(self, &result).await?;
71 Ok(result)
72 }
73}
74
75#[async_trait]
76impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges + Send + 'static>
77 CatalogNamespaceQueryOperations for QT
78{
79 #[instrument(name = "catalog::namespace::find", level = "trace", skip(self))]
80 async fn find_namespace(&mut self, id: NamespaceId) -> reifydb_core::Result<Option<NamespaceDef>> {
81 if let Some(namespace) = TransactionalNamespaceChanges::find_namespace(self, id) {
83 return Ok(Some(namespace.clone()));
84 }
85
86 if TransactionalNamespaceChanges::is_namespace_deleted(self, id) {
89 return Ok(None);
90 }
91
92 if let Some(namespace) = self.catalog().find_namespace(id, self.version()) {
94 return Ok(Some(namespace));
95 }
96
97 if let Some(namespace) = CatalogStore::find_namespace(self, id).await? {
99 warn!("Namespace with ID {:?} found in storage but not in MaterializedCatalog", id);
100 return Ok(Some(namespace));
101 }
102
103 Ok(None)
104 }
105
106 #[instrument(name = "catalog::namespace::find_by_name", level = "trace", skip(self, name))]
107 async fn find_namespace_by_name(&mut self, name: &str) -> reifydb_core::Result<Option<NamespaceDef>> {
108 if let Some(namespace) = TransactionalNamespaceChanges::find_namespace_by_name(self, name) {
111 return Ok(Some(namespace.clone()));
112 }
113
114 if TransactionalNamespaceChanges::is_namespace_deleted_by_name(self, name) {
117 return Ok(None);
118 }
119
120 if let Some(namespace) = self.catalog().find_namespace_by_name(name, self.version()) {
122 return Ok(Some(namespace));
123 }
124
125 if let Some(namespace) = CatalogStore::find_namespace_by_name(self, name).await? {
127 warn!("Namespace '{}' found in storage but not in MaterializedCatalog", name);
128 return Ok(Some(namespace));
129 }
130
131 Ok(None)
132 }
133
134 #[instrument(name = "catalog::namespace::get", level = "trace", skip(self))]
135 async fn get_namespace(&mut self, id: NamespaceId) -> reifydb_core::Result<NamespaceDef> {
136 self.find_namespace(id).await?.ok_or_else(|| {
137 error!(internal!(
138 "Namespace with ID {} not found in catalog. This indicates a critical catalog inconsistency.",
139 id
140 ))
141 })
142 }
143
144 #[instrument(name = "catalog::namespace::get_by_name", level = "trace", skip(self, name))]
145 async fn get_namespace_by_name(
146 &mut self,
147 name: impl Into<Fragment> + Send,
148 ) -> reifydb_core::Result<NamespaceDef> {
149 let name = name.into();
150 self.find_namespace_by_name(name.text())
151 .await?
152 .ok_or_else(|| error!(namespace_not_found(name.clone(), name.text())))
153 }
154}