reifydb_catalog/transaction/
dictionary.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::interface::{
5	CommandTransaction, DictionaryDef, DictionaryId, NamespaceId, QueryTransaction, TransactionalChanges,
6	TransactionalDictionaryChanges, interceptor::WithInterceptors,
7};
8use reifydb_type::{
9	IntoFragment,
10	diagnostic::catalog::{dictionary_already_exists, dictionary_not_found},
11	error, internal, return_error,
12};
13use tracing::{instrument, warn};
14
15use crate::{
16	CatalogNamespaceQueryOperations, CatalogStore, store::dictionary::create::DictionaryToCreate,
17	transaction::MaterializedCatalogTransaction,
18};
19
20pub trait CatalogDictionaryCommandOperations {
21	fn create_dictionary(&mut self, to_create: DictionaryToCreate) -> crate::Result<DictionaryDef>;
22}
23
24pub trait CatalogTrackDictionaryChangeOperations {
25	fn track_dictionary_def_created(&mut self, dictionary: DictionaryDef) -> crate::Result<()>;
26
27	fn track_dictionary_def_updated(&mut self, pre: DictionaryDef, post: DictionaryDef) -> crate::Result<()>;
28
29	fn track_dictionary_def_deleted(&mut self, dictionary: DictionaryDef) -> crate::Result<()>;
30}
31
32pub trait CatalogDictionaryQueryOperations: CatalogNamespaceQueryOperations {
33	fn find_dictionary(&mut self, id: DictionaryId) -> crate::Result<Option<DictionaryDef>>;
34
35	fn find_dictionary_by_name<'a>(
36		&mut self,
37		namespace: NamespaceId,
38		name: impl IntoFragment<'a>,
39	) -> crate::Result<Option<DictionaryDef>>;
40
41	fn get_dictionary(&mut self, id: DictionaryId) -> crate::Result<DictionaryDef>;
42
43	fn get_dictionary_by_name<'a>(
44		&mut self,
45		namespace: NamespaceId,
46		name: impl IntoFragment<'a>,
47	) -> crate::Result<DictionaryDef>;
48}
49
50impl<
51	CT: CommandTransaction
52		+ MaterializedCatalogTransaction
53		+ CatalogTrackDictionaryChangeOperations
54		+ WithInterceptors<CT>
55		+ TransactionalChanges,
56> CatalogDictionaryCommandOperations for CT
57{
58	#[instrument(level = "debug", skip(self, to_create))]
59	fn create_dictionary(&mut self, to_create: DictionaryToCreate) -> reifydb_core::Result<DictionaryDef> {
60		if let Some(dictionary) = self.find_dictionary_by_name(to_create.namespace, &to_create.dictionary)? {
61			let namespace = self.get_namespace(to_create.namespace)?;
62			return_error!(dictionary_already_exists(to_create.fragment, &namespace.name, &dictionary.name));
63		}
64		let result = CatalogStore::create_dictionary(self, to_create)?;
65		self.track_dictionary_def_created(result.clone())?;
66		Ok(result)
67	}
68}
69
70impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges> CatalogDictionaryQueryOperations
71	for QT
72{
73	#[instrument(level = "trace", skip(self))]
74	fn find_dictionary(&mut self, id: DictionaryId) -> reifydb_core::Result<Option<DictionaryDef>> {
75		// 1. Check transactional changes first
76		// nop for QueryTransaction
77		if let Some(dictionary) = TransactionalDictionaryChanges::find_dictionary(self, id) {
78			return Ok(Some(dictionary.clone()));
79		}
80
81		// 2. Check if deleted
82		// nop for QueryTransaction
83		if TransactionalDictionaryChanges::is_dictionary_deleted(self, id) {
84			return Ok(None);
85		}
86
87		// 3. Check MaterializedCatalog
88		if let Some(dictionary) = self.catalog().find_dictionary(id, self.version()) {
89			return Ok(Some(dictionary));
90		}
91
92		// 4. Fall back to storage as defensive measure
93		if let Some(dictionary) = CatalogStore::find_dictionary(self, id)? {
94			warn!("Dictionary with ID {:?} found in storage but not in MaterializedCatalog", id);
95			return Ok(Some(dictionary));
96		}
97
98		Ok(None)
99	}
100
101	#[instrument(level = "trace", skip(self, name))]
102	fn find_dictionary_by_name<'a>(
103		&mut self,
104		namespace: NamespaceId,
105		name: impl IntoFragment<'a>,
106	) -> reifydb_core::Result<Option<DictionaryDef>> {
107		let name = name.into_fragment();
108
109		// 1. Check transactional changes first
110		// nop for QueryTransaction
111		if let Some(dictionary) =
112			TransactionalDictionaryChanges::find_dictionary_by_name(self, namespace, name.as_borrowed())
113		{
114			return Ok(Some(dictionary.clone()));
115		}
116
117		// 2. Check if deleted
118		// nop for QueryTransaction
119		if TransactionalDictionaryChanges::is_dictionary_deleted_by_name(self, namespace, name.as_borrowed()) {
120			return Ok(None);
121		}
122
123		// 3. Check MaterializedCatalog
124		if let Some(dictionary) = self.catalog().find_dictionary_by_name(namespace, name.text(), self.version())
125		{
126			return Ok(Some(dictionary));
127		}
128
129		// 4. Fall back to storage as defensive measure
130		if let Some(dictionary) = CatalogStore::find_dictionary_by_name(self, namespace, name.text())? {
131			warn!(
132				"Dictionary '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
133				name.text(),
134				namespace
135			);
136			return Ok(Some(dictionary));
137		}
138
139		Ok(None)
140	}
141
142	#[instrument(level = "trace", skip(self))]
143	fn get_dictionary(&mut self, id: DictionaryId) -> reifydb_core::Result<DictionaryDef> {
144		self.find_dictionary(id)?.ok_or_else(|| {
145			error!(internal!(
146				"Dictionary with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
147				id
148			))
149		})
150	}
151
152	#[instrument(level = "trace", skip(self, name))]
153	fn get_dictionary_by_name<'a>(
154		&mut self,
155		namespace: NamespaceId,
156		name: impl IntoFragment<'a>,
157	) -> reifydb_core::Result<DictionaryDef> {
158		let name = name.into_fragment();
159
160		// Try to get the namespace name for the error message
161		let namespace_name = self
162			.find_namespace(namespace)?
163			.map(|ns| ns.name)
164			.unwrap_or_else(|| format!("namespace_{}", namespace));
165
166		self.find_dictionary_by_name(namespace, name.as_borrowed())?
167			.ok_or_else(|| error!(dictionary_not_found(name.as_borrowed(), &namespace_name, name.text())))
168	}
169}