reifydb_catalog/transaction/
namespace.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	interface::{
6		CommandTransaction, NamespaceDef, NamespaceId, QueryTransaction, TransactionalChanges,
7		TransactionalNamespaceChanges,
8		interceptor::{NamespaceDefInterceptor, WithInterceptors},
9	},
10	log_warn,
11};
12use reifydb_type::{
13	IntoFragment,
14	diagnostic::catalog::{namespace_already_exists, namespace_not_found},
15	error, internal, return_error,
16};
17
18use crate::{CatalogStore, store::namespace::NamespaceToCreate, transaction::MaterializedCatalogTransaction};
19
20pub trait CatalogNamespaceCommandOperations {
21	fn create_namespace(&mut self, to_create: NamespaceToCreate) -> crate::Result<NamespaceDef>;
22
23	// TODO: Implement when update/delete are ready
24	// fn update_namespace(&mut self, namespace_id: NamespaceId, updates:
25	// NamespaceUpdates) -> crate::Result<NamespaceDef>; fn
26	// delete_namespace(&mut self, namespace_id: NamespaceId) ->
27	// crate::Result<()>;
28}
29
30pub trait CatalogTrackNamespaceChangeOperations {
31	fn track_namespace_def_created(&mut self, namespace: NamespaceDef) -> crate::Result<()>;
32
33	fn track_namespace_def_updated(&mut self, pre: NamespaceDef, post: NamespaceDef) -> crate::Result<()>;
34
35	fn track_namespace_def_deleted(&mut self, namespace: NamespaceDef) -> crate::Result<()>;
36}
37
38pub trait CatalogNamespaceQueryOperations {
39	fn find_namespace(&mut self, id: NamespaceId) -> crate::Result<Option<NamespaceDef>>;
40
41	fn find_namespace_by_name<'a>(&mut self, name: impl IntoFragment<'a>) -> crate::Result<Option<NamespaceDef>>;
42
43	fn get_namespace(&mut self, id: NamespaceId) -> crate::Result<NamespaceDef>;
44
45	fn get_namespace_by_name<'a>(&mut self, name: impl IntoFragment<'a>) -> crate::Result<NamespaceDef>;
46}
47
48impl<
49	CT: CommandTransaction
50		+ MaterializedCatalogTransaction
51		+ CatalogTrackNamespaceChangeOperations
52		+ WithInterceptors<CT>
53		+ TransactionalChanges,
54> CatalogNamespaceCommandOperations for CT
55{
56	fn create_namespace(&mut self, to_create: NamespaceToCreate) -> reifydb_core::Result<NamespaceDef> {
57		if let Some(namespace) = self.find_namespace_by_name(&to_create.name)? {
58			return_error!(namespace_already_exists(to_create.namespace_fragment, &namespace.name));
59		}
60		let result = CatalogStore::create_namespace(self, to_create)?;
61		self.track_namespace_def_created(result.clone())?;
62		NamespaceDefInterceptor::post_create(self, &result)?;
63		Ok(result)
64	}
65}
66
67impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges> CatalogNamespaceQueryOperations
68	for QT
69{
70	fn find_namespace(&mut self, id: NamespaceId) -> reifydb_core::Result<Option<NamespaceDef>> {
71		// 1. Check transactional changes first
72		if let Some(namespace) = TransactionalNamespaceChanges::find_namespace(self, id) {
73			return Ok(Some(namespace.clone()));
74		}
75
76		// 2. Check if deleted
77		// nop for QueryTransaction
78		if TransactionalNamespaceChanges::is_namespace_deleted(self, id) {
79			return Ok(None);
80		}
81
82		// 3. Check MaterializedCatalog
83		if let Some(namespace) = self.catalog().find_namespace(id, self.version()) {
84			return Ok(Some(namespace));
85		}
86
87		// 4. Fall back to storage as defensive measure
88		if let Some(namespace) = CatalogStore::find_namespace(self, id)? {
89			log_warn!("Namespace with ID {:?} found in storage but not in MaterializedCatalog", id);
90			return Ok(Some(namespace));
91		}
92
93		Ok(None)
94	}
95
96	fn find_namespace_by_name<'a>(
97		&mut self,
98		name: impl IntoFragment<'a>,
99	) -> reifydb_core::Result<Option<NamespaceDef>> {
100		let name = name.into_fragment();
101
102		// 1. Check transactional changes first
103		// nop for QueryTransaction
104		if let Some(namespace) = TransactionalNamespaceChanges::find_namespace_by_name(self, name.as_borrowed())
105		{
106			return Ok(Some(namespace.clone()));
107		}
108
109		// 2. Check if deleted
110		// nop for QueryTransaction
111		if TransactionalNamespaceChanges::is_namespace_deleted_by_name(self, name.as_borrowed()) {
112			return Ok(None);
113		}
114
115		// 3. Check MaterializedCatalog
116		if let Some(namespace) = self.catalog().find_namespace_by_name(name.text(), self.version()) {
117			return Ok(Some(namespace));
118		}
119
120		// 4. Fall back to storage as defensive measure
121		if let Some(namespace) = CatalogStore::find_namespace_by_name(self, name.text())? {
122			log_warn!("Namespace '{}' found in storage but not in MaterializedCatalog", name.text());
123			return Ok(Some(namespace));
124		}
125
126		Ok(None)
127	}
128
129	fn get_namespace(&mut self, id: NamespaceId) -> reifydb_core::Result<NamespaceDef> {
130		self.find_namespace(id)?.ok_or_else(|| {
131			error!(internal!(
132				"Namespace with ID {} not found in catalog. This indicates a critical catalog inconsistency.",
133				id
134			))
135		})
136	}
137
138	fn get_namespace_by_name<'a>(&mut self, name: impl IntoFragment<'a>) -> reifydb_core::Result<NamespaceDef> {
139		let name = name.into_fragment();
140		self.find_namespace_by_name(name.as_borrowed())?
141			.ok_or_else(|| error!(namespace_not_found(name.as_borrowed(), name.text())))
142	}
143}