reifydb_catalog/transaction/
dictionary.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	interface::{
6		CommandTransaction, DictionaryDef, DictionaryId, NamespaceId, QueryTransaction, TransactionalChanges,
7		TransactionalDictionaryChanges, interceptor::WithInterceptors,
8	},
9	log_warn,
10};
11use reifydb_type::{
12	IntoFragment,
13	diagnostic::catalog::{dictionary_already_exists, dictionary_not_found},
14	error, internal, return_error,
15};
16
17use crate::{
18	CatalogNamespaceQueryOperations, CatalogStore, store::dictionary::create::DictionaryToCreate,
19	transaction::MaterializedCatalogTransaction,
20};
21
22pub trait CatalogDictionaryCommandOperations {
23	fn create_dictionary(&mut self, to_create: DictionaryToCreate) -> crate::Result<DictionaryDef>;
24}
25
26pub trait CatalogTrackDictionaryChangeOperations {
27	fn track_dictionary_def_created(&mut self, dictionary: DictionaryDef) -> crate::Result<()>;
28
29	fn track_dictionary_def_updated(&mut self, pre: DictionaryDef, post: DictionaryDef) -> crate::Result<()>;
30
31	fn track_dictionary_def_deleted(&mut self, dictionary: DictionaryDef) -> crate::Result<()>;
32}
33
34pub trait CatalogDictionaryQueryOperations: CatalogNamespaceQueryOperations {
35	fn find_dictionary(&mut self, id: DictionaryId) -> crate::Result<Option<DictionaryDef>>;
36
37	fn find_dictionary_by_name<'a>(
38		&mut self,
39		namespace: NamespaceId,
40		name: impl IntoFragment<'a>,
41	) -> crate::Result<Option<DictionaryDef>>;
42
43	fn get_dictionary(&mut self, id: DictionaryId) -> crate::Result<DictionaryDef>;
44
45	fn get_dictionary_by_name<'a>(
46		&mut self,
47		namespace: NamespaceId,
48		name: impl IntoFragment<'a>,
49	) -> crate::Result<DictionaryDef>;
50}
51
52impl<
53	CT: CommandTransaction
54		+ MaterializedCatalogTransaction
55		+ CatalogTrackDictionaryChangeOperations
56		+ WithInterceptors<CT>
57		+ TransactionalChanges,
58> CatalogDictionaryCommandOperations for CT
59{
60	fn create_dictionary(&mut self, to_create: DictionaryToCreate) -> reifydb_core::Result<DictionaryDef> {
61		if let Some(dictionary) = self.find_dictionary_by_name(to_create.namespace, &to_create.dictionary)? {
62			let namespace = self.get_namespace(to_create.namespace)?;
63			return_error!(dictionary_already_exists(to_create.fragment, &namespace.name, &dictionary.name));
64		}
65		let result = CatalogStore::create_dictionary(self, to_create)?;
66		self.track_dictionary_def_created(result.clone())?;
67		Ok(result)
68	}
69}
70
71impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges> CatalogDictionaryQueryOperations
72	for QT
73{
74	fn find_dictionary(&mut self, id: DictionaryId) -> reifydb_core::Result<Option<DictionaryDef>> {
75		// 1. Check transactional changes first
76		// nop for QueryTransaction
77		if let Some(dictionary) = TransactionalDictionaryChanges::find_dictionary(self, id) {
78			return Ok(Some(dictionary.clone()));
79		}
80
81		// 2. Check if deleted
82		// nop for QueryTransaction
83		if TransactionalDictionaryChanges::is_dictionary_deleted(self, id) {
84			return Ok(None);
85		}
86
87		// 3. Check MaterializedCatalog
88		if let Some(dictionary) = self.catalog().find_dictionary(id, self.version()) {
89			return Ok(Some(dictionary));
90		}
91
92		// 4. Fall back to storage as defensive measure
93		if let Some(dictionary) = CatalogStore::find_dictionary(self, id)? {
94			log_warn!("Dictionary with ID {:?} found in storage but not in MaterializedCatalog", id);
95			return Ok(Some(dictionary));
96		}
97
98		Ok(None)
99	}
100
101	fn find_dictionary_by_name<'a>(
102		&mut self,
103		namespace: NamespaceId,
104		name: impl IntoFragment<'a>,
105	) -> reifydb_core::Result<Option<DictionaryDef>> {
106		let name = name.into_fragment();
107
108		// 1. Check transactional changes first
109		// nop for QueryTransaction
110		if let Some(dictionary) =
111			TransactionalDictionaryChanges::find_dictionary_by_name(self, namespace, name.as_borrowed())
112		{
113			return Ok(Some(dictionary.clone()));
114		}
115
116		// 2. Check if deleted
117		// nop for QueryTransaction
118		if TransactionalDictionaryChanges::is_dictionary_deleted_by_name(self, namespace, name.as_borrowed()) {
119			return Ok(None);
120		}
121
122		// 3. Check MaterializedCatalog
123		if let Some(dictionary) = self.catalog().find_dictionary_by_name(namespace, name.text(), self.version())
124		{
125			return Ok(Some(dictionary));
126		}
127
128		// 4. Fall back to storage as defensive measure
129		if let Some(dictionary) = CatalogStore::find_dictionary_by_name(self, namespace, name.text())? {
130			log_warn!(
131				"Dictionary '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
132				name.text(),
133				namespace
134			);
135			return Ok(Some(dictionary));
136		}
137
138		Ok(None)
139	}
140
141	fn get_dictionary(&mut self, id: DictionaryId) -> reifydb_core::Result<DictionaryDef> {
142		self.find_dictionary(id)?.ok_or_else(|| {
143			error!(internal!(
144				"Dictionary with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
145				id
146			))
147		})
148	}
149
150	fn get_dictionary_by_name<'a>(
151		&mut self,
152		namespace: NamespaceId,
153		name: impl IntoFragment<'a>,
154	) -> reifydb_core::Result<DictionaryDef> {
155		let name = name.into_fragment();
156
157		// Try to get the namespace name for the error message
158		let namespace_name = self
159			.find_namespace(namespace)?
160			.map(|ns| ns.name)
161			.unwrap_or_else(|| format!("namespace_{}", namespace));
162
163		self.find_dictionary_by_name(namespace, name.as_borrowed())?
164			.ok_or_else(|| error!(dictionary_not_found(name.as_borrowed(), &namespace_name, name.text())))
165	}
166}