reifydb_catalog/transaction/
dictionary.rs1use async_trait::async_trait;
5use reifydb_core::interface::{
6 CommandTransaction, DictionaryDef, DictionaryId, NamespaceId, QueryTransaction, TransactionalChanges,
7 TransactionalDictionaryChanges, interceptor::WithInterceptors,
8};
9use reifydb_type::{
10 Fragment,
11 diagnostic::catalog::{dictionary_already_exists, dictionary_not_found},
12 error, internal, return_error,
13};
14use tracing::{instrument, warn};
15
16use crate::{
17 CatalogNamespaceQueryOperations, CatalogStore, store::dictionary::create::DictionaryToCreate,
18 transaction::MaterializedCatalogTransaction,
19};
20
/// Write-side catalog operations for dictionaries.
///
/// A blanket implementation for command transactions is provided in this file.
#[async_trait]
pub trait CatalogDictionaryCommandOperations: Send {
    /// Creates the dictionary described by `to_create` and returns its definition.
    ///
    /// # Errors
    ///
    /// The blanket implementation in this file reports a
    /// `dictionary_already_exists` diagnostic when a dictionary with the same
    /// name is already visible in the target namespace, and propagates any
    /// error raised by the underlying lookups, the store, or change tracking.
    async fn create_dictionary(&mut self, to_create: DictionaryToCreate) -> crate::Result<DictionaryDef>;
}
25
/// Hooks for recording dictionary definition changes on a transaction.
///
/// NOTE(review): only `track_dictionary_def_created` is exercised in this file
/// (it is called right after a dictionary is created in the store); how the
/// recorded changes are consumed is defined by the implementor — confirm there.
pub trait CatalogTrackDictionaryChangeOperations {
    /// Records `dictionary` as created.
    fn track_dictionary_def_created(&mut self, dictionary: DictionaryDef) -> crate::Result<()>;

    /// Records a change from the `pre` definition to the `post` definition.
    /// Presumably `pre` is the state before the update and `post` the state after — verify against implementors.
    fn track_dictionary_def_updated(&mut self, pre: DictionaryDef, post: DictionaryDef) -> crate::Result<()>;

    /// Records `dictionary` as deleted.
    fn track_dictionary_def_deleted(&mut self, dictionary: DictionaryDef) -> crate::Result<()>;
}
33
/// Read-side catalog operations for dictionaries.
///
/// The `find_*` methods return `Ok(None)` when nothing matches, whereas the
/// `get_*` methods turn a miss into an error (see the blanket implementation
/// in this file).
#[async_trait]
pub trait CatalogDictionaryQueryOperations: CatalogNamespaceQueryOperations + Send {
    /// Looks up a dictionary by `id`; `Ok(None)` when it is not visible.
    async fn find_dictionary(&mut self, id: DictionaryId) -> crate::Result<Option<DictionaryDef>>;

    /// Looks up a dictionary by `name` within `namespace`; `Ok(None)` when it is not visible.
    async fn find_dictionary_by_name(
        &mut self,
        namespace: NamespaceId,
        name: &str,
    ) -> crate::Result<Option<DictionaryDef>>;

    /// Like `find_dictionary`, but a missing dictionary is an error.
    ///
    /// The blanket implementation in this file reports the miss as an internal
    /// error describing a catalog inconsistency.
    async fn get_dictionary(&mut self, id: DictionaryId) -> crate::Result<DictionaryDef>;

    /// Like `find_dictionary_by_name`, but a missing dictionary is an error.
    ///
    /// `name` is converted into a `Fragment`; the blanket implementation in
    /// this file attaches it to the `dictionary_not_found` diagnostic.
    async fn get_dictionary_by_name(
        &mut self,
        namespace: NamespaceId,
        name: impl Into<Fragment> + Send,
    ) -> crate::Result<DictionaryDef>;
}
52
53#[async_trait]
54impl<
55 CT: CommandTransaction
56 + MaterializedCatalogTransaction
57 + CatalogTrackDictionaryChangeOperations
58 + WithInterceptors<CT>
59 + TransactionalChanges
60 + Send
61 + 'static,
62> CatalogDictionaryCommandOperations for CT
63{
64 #[instrument(name = "catalog::dictionary::create", level = "debug", skip(self, to_create))]
65 async fn create_dictionary(&mut self, to_create: DictionaryToCreate) -> reifydb_core::Result<DictionaryDef> {
66 if let Some(dictionary) =
67 self.find_dictionary_by_name(to_create.namespace, to_create.dictionary.as_str()).await?
68 {
69 let namespace = self.get_namespace(to_create.namespace).await?;
70 return_error!(dictionary_already_exists(
71 to_create.fragment.unwrap_or_else(|| Fragment::None),
72 &namespace.name,
73 &dictionary.name
74 ));
75 }
76 let result = CatalogStore::create_dictionary(self, to_create).await?;
77 self.track_dictionary_def_created(result.clone())?;
78 Ok(result)
79 }
80}
81
82#[async_trait]
83impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges + Send + 'static>
84 CatalogDictionaryQueryOperations for QT
85{
86 #[instrument(name = "catalog::dictionary::find", level = "trace", skip(self))]
87 async fn find_dictionary(&mut self, id: DictionaryId) -> reifydb_core::Result<Option<DictionaryDef>> {
88 if let Some(dictionary) = TransactionalDictionaryChanges::find_dictionary(self, id) {
91 return Ok(Some(dictionary.clone()));
92 }
93
94 if TransactionalDictionaryChanges::is_dictionary_deleted(self, id) {
97 return Ok(None);
98 }
99
100 if let Some(dictionary) = self.catalog().find_dictionary(id, self.version()) {
102 return Ok(Some(dictionary));
103 }
104
105 if let Some(dictionary) = CatalogStore::find_dictionary(self, id).await? {
107 warn!("Dictionary with ID {:?} found in storage but not in MaterializedCatalog", id);
108 return Ok(Some(dictionary));
109 }
110
111 Ok(None)
112 }
113
114 #[instrument(name = "catalog::dictionary::find_by_name", level = "trace", skip(self, name))]
115 async fn find_dictionary_by_name(
116 &mut self,
117 namespace: NamespaceId,
118 name: &str,
119 ) -> reifydb_core::Result<Option<DictionaryDef>> {
120 if let Some(dictionary) = TransactionalDictionaryChanges::find_dictionary_by_name(self, namespace, name)
123 {
124 return Ok(Some(dictionary.clone()));
125 }
126
127 if TransactionalDictionaryChanges::is_dictionary_deleted_by_name(self, namespace, name) {
130 return Ok(None);
131 }
132
133 if let Some(dictionary) = self.catalog().find_dictionary_by_name(namespace, name, self.version()) {
135 return Ok(Some(dictionary));
136 }
137
138 if let Some(dictionary) = CatalogStore::find_dictionary_by_name(self, namespace, name).await? {
140 warn!(
141 "Dictionary '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
142 name, namespace
143 );
144 return Ok(Some(dictionary));
145 }
146
147 Ok(None)
148 }
149
150 #[instrument(name = "catalog::dictionary::get", level = "trace", skip(self))]
151 async fn get_dictionary(&mut self, id: DictionaryId) -> reifydb_core::Result<DictionaryDef> {
152 self.find_dictionary(id).await?.ok_or_else(|| {
153 error!(internal!(
154 "Dictionary with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
155 id
156 ))
157 })
158 }
159
160 #[instrument(name = "catalog::dictionary::get_by_name", level = "trace", skip(self, name))]
161 async fn get_dictionary_by_name(
162 &mut self,
163 namespace: NamespaceId,
164 name: impl Into<Fragment> + Send,
165 ) -> reifydb_core::Result<DictionaryDef> {
166 let name = name.into();
167
168 let namespace_name = self
170 .find_namespace(namespace)
171 .await?
172 .map(|ns| ns.name)
173 .unwrap_or_else(|| format!("namespace_{}", namespace));
174
175 self.find_dictionary_by_name(namespace, name.text())
176 .await?
177 .ok_or_else(|| error!(dictionary_not_found(name.clone(), &namespace_name, name.text())))
178 }
179}