reifydb_catalog/transaction/
dictionary.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use async_trait::async_trait;
5use reifydb_core::interface::{
6	CommandTransaction, DictionaryDef, DictionaryId, NamespaceId, QueryTransaction, TransactionalChanges,
7	TransactionalDictionaryChanges, interceptor::WithInterceptors,
8};
9use reifydb_type::{
10	Fragment,
11	diagnostic::catalog::{dictionary_already_exists, dictionary_not_found},
12	error, internal, return_error,
13};
14use tracing::{instrument, warn};
15
16use crate::{
17	CatalogNamespaceQueryOperations, CatalogStore, store::dictionary::create::DictionaryToCreate,
18	transaction::MaterializedCatalogTransaction,
19};
20
21#[async_trait]
22pub trait CatalogDictionaryCommandOperations: Send {
23	async fn create_dictionary(&mut self, to_create: DictionaryToCreate) -> crate::Result<DictionaryDef>;
24}
25
26pub trait CatalogTrackDictionaryChangeOperations {
27	fn track_dictionary_def_created(&mut self, dictionary: DictionaryDef) -> crate::Result<()>;
28
29	fn track_dictionary_def_updated(&mut self, pre: DictionaryDef, post: DictionaryDef) -> crate::Result<()>;
30
31	fn track_dictionary_def_deleted(&mut self, dictionary: DictionaryDef) -> crate::Result<()>;
32}
33
34#[async_trait]
35pub trait CatalogDictionaryQueryOperations: CatalogNamespaceQueryOperations + Send {
36	async fn find_dictionary(&mut self, id: DictionaryId) -> crate::Result<Option<DictionaryDef>>;
37
38	async fn find_dictionary_by_name(
39		&mut self,
40		namespace: NamespaceId,
41		name: &str,
42	) -> crate::Result<Option<DictionaryDef>>;
43
44	async fn get_dictionary(&mut self, id: DictionaryId) -> crate::Result<DictionaryDef>;
45
46	async fn get_dictionary_by_name(
47		&mut self,
48		namespace: NamespaceId,
49		name: impl Into<Fragment> + Send,
50	) -> crate::Result<DictionaryDef>;
51}
52
53#[async_trait]
54impl<
55	CT: CommandTransaction
56		+ MaterializedCatalogTransaction
57		+ CatalogTrackDictionaryChangeOperations
58		+ WithInterceptors<CT>
59		+ TransactionalChanges
60		+ Send
61		+ 'static,
62> CatalogDictionaryCommandOperations for CT
63{
64	#[instrument(name = "catalog::dictionary::create", level = "debug", skip(self, to_create))]
65	async fn create_dictionary(&mut self, to_create: DictionaryToCreate) -> reifydb_core::Result<DictionaryDef> {
66		if let Some(dictionary) =
67			self.find_dictionary_by_name(to_create.namespace, to_create.dictionary.as_str()).await?
68		{
69			let namespace = self.get_namespace(to_create.namespace).await?;
70			return_error!(dictionary_already_exists(
71				to_create.fragment.unwrap_or_else(|| Fragment::None),
72				&namespace.name,
73				&dictionary.name
74			));
75		}
76		let result = CatalogStore::create_dictionary(self, to_create).await?;
77		self.track_dictionary_def_created(result.clone())?;
78		Ok(result)
79	}
80}
81
82#[async_trait]
83impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges + Send + 'static>
84	CatalogDictionaryQueryOperations for QT
85{
86	#[instrument(name = "catalog::dictionary::find", level = "trace", skip(self))]
87	async fn find_dictionary(&mut self, id: DictionaryId) -> reifydb_core::Result<Option<DictionaryDef>> {
88		// 1. Check transactional changes first
89		// nop for QueryTransaction
90		if let Some(dictionary) = TransactionalDictionaryChanges::find_dictionary(self, id) {
91			return Ok(Some(dictionary.clone()));
92		}
93
94		// 2. Check if deleted
95		// nop for QueryTransaction
96		if TransactionalDictionaryChanges::is_dictionary_deleted(self, id) {
97			return Ok(None);
98		}
99
100		// 3. Check MaterializedCatalog
101		if let Some(dictionary) = self.catalog().find_dictionary(id, self.version()) {
102			return Ok(Some(dictionary));
103		}
104
105		// 4. Fall back to storage as defensive measure
106		if let Some(dictionary) = CatalogStore::find_dictionary(self, id).await? {
107			warn!("Dictionary with ID {:?} found in storage but not in MaterializedCatalog", id);
108			return Ok(Some(dictionary));
109		}
110
111		Ok(None)
112	}
113
114	#[instrument(name = "catalog::dictionary::find_by_name", level = "trace", skip(self, name))]
115	async fn find_dictionary_by_name(
116		&mut self,
117		namespace: NamespaceId,
118		name: &str,
119	) -> reifydb_core::Result<Option<DictionaryDef>> {
120		// 1. Check transactional changes first
121		// nop for QueryTransaction
122		if let Some(dictionary) = TransactionalDictionaryChanges::find_dictionary_by_name(self, namespace, name)
123		{
124			return Ok(Some(dictionary.clone()));
125		}
126
127		// 2. Check if deleted
128		// nop for QueryTransaction
129		if TransactionalDictionaryChanges::is_dictionary_deleted_by_name(self, namespace, name) {
130			return Ok(None);
131		}
132
133		// 3. Check MaterializedCatalog
134		if let Some(dictionary) = self.catalog().find_dictionary_by_name(namespace, name, self.version()) {
135			return Ok(Some(dictionary));
136		}
137
138		// 4. Fall back to storage as defensive measure
139		if let Some(dictionary) = CatalogStore::find_dictionary_by_name(self, namespace, name).await? {
140			warn!(
141				"Dictionary '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
142				name, namespace
143			);
144			return Ok(Some(dictionary));
145		}
146
147		Ok(None)
148	}
149
150	#[instrument(name = "catalog::dictionary::get", level = "trace", skip(self))]
151	async fn get_dictionary(&mut self, id: DictionaryId) -> reifydb_core::Result<DictionaryDef> {
152		self.find_dictionary(id).await?.ok_or_else(|| {
153			error!(internal!(
154				"Dictionary with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
155				id
156			))
157		})
158	}
159
160	#[instrument(name = "catalog::dictionary::get_by_name", level = "trace", skip(self, name))]
161	async fn get_dictionary_by_name(
162		&mut self,
163		namespace: NamespaceId,
164		name: impl Into<Fragment> + Send,
165	) -> reifydb_core::Result<DictionaryDef> {
166		let name = name.into();
167
168		// Try to get the namespace name for the error message
169		let namespace_name = self
170			.find_namespace(namespace)
171			.await?
172			.map(|ns| ns.name)
173			.unwrap_or_else(|| format!("namespace_{}", namespace));
174
175		self.find_dictionary_by_name(namespace, name.text())
176			.await?
177			.ok_or_else(|| error!(dictionary_not_found(name.clone(), &namespace_name, name.text())))
178	}
179}