reifydb_catalog/transaction/
namespace.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::interface::{
5	CommandTransaction, NamespaceDef, NamespaceId, QueryTransaction, TransactionalChanges,
6	TransactionalNamespaceChanges,
7	interceptor::{NamespaceDefInterceptor, WithInterceptors},
8};
9use reifydb_type::{
10	IntoFragment,
11	diagnostic::catalog::{namespace_already_exists, namespace_not_found},
12	error, internal, return_error,
13};
14use tracing::{instrument, warn};
15
16use crate::{CatalogStore, store::namespace::NamespaceToCreate, transaction::MaterializedCatalogTransaction};
17
18pub trait CatalogNamespaceCommandOperations {
19	fn create_namespace(&mut self, to_create: NamespaceToCreate) -> crate::Result<NamespaceDef>;
20
21	// TODO: Implement when update/delete are ready
22	// fn update_namespace(&mut self, namespace_id: NamespaceId, updates:
23	// NamespaceUpdates) -> crate::Result<NamespaceDef>; fn
24	// delete_namespace(&mut self, namespace_id: NamespaceId) ->
25	// crate::Result<()>;
26}
27
28pub trait CatalogTrackNamespaceChangeOperations {
29	fn track_namespace_def_created(&mut self, namespace: NamespaceDef) -> crate::Result<()>;
30
31	fn track_namespace_def_updated(&mut self, pre: NamespaceDef, post: NamespaceDef) -> crate::Result<()>;
32
33	fn track_namespace_def_deleted(&mut self, namespace: NamespaceDef) -> crate::Result<()>;
34}
35
36pub trait CatalogNamespaceQueryOperations {
37	fn find_namespace(&mut self, id: NamespaceId) -> crate::Result<Option<NamespaceDef>>;
38
39	fn find_namespace_by_name<'a>(&mut self, name: impl IntoFragment<'a>) -> crate::Result<Option<NamespaceDef>>;
40
41	fn get_namespace(&mut self, id: NamespaceId) -> crate::Result<NamespaceDef>;
42
43	fn get_namespace_by_name<'a>(&mut self, name: impl IntoFragment<'a>) -> crate::Result<NamespaceDef>;
44}
45
46impl<
47	CT: CommandTransaction
48		+ MaterializedCatalogTransaction
49		+ CatalogTrackNamespaceChangeOperations
50		+ WithInterceptors<CT>
51		+ TransactionalChanges,
52> CatalogNamespaceCommandOperations for CT
53{
54	#[instrument(level = "debug", skip(self, to_create))]
55	fn create_namespace(&mut self, to_create: NamespaceToCreate) -> reifydb_core::Result<NamespaceDef> {
56		if let Some(namespace) = self.find_namespace_by_name(&to_create.name)? {
57			return_error!(namespace_already_exists(to_create.namespace_fragment, &namespace.name));
58		}
59		let result = CatalogStore::create_namespace(self, to_create)?;
60		self.track_namespace_def_created(result.clone())?;
61		NamespaceDefInterceptor::post_create(self, &result)?;
62		Ok(result)
63	}
64}
65
66impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges> CatalogNamespaceQueryOperations
67	for QT
68{
69	#[instrument(level = "trace", skip(self))]
70	fn find_namespace(&mut self, id: NamespaceId) -> reifydb_core::Result<Option<NamespaceDef>> {
71		// 1. Check transactional changes first
72		if let Some(namespace) = TransactionalNamespaceChanges::find_namespace(self, id) {
73			return Ok(Some(namespace.clone()));
74		}
75
76		// 2. Check if deleted
77		// nop for QueryTransaction
78		if TransactionalNamespaceChanges::is_namespace_deleted(self, id) {
79			return Ok(None);
80		}
81
82		// 3. Check MaterializedCatalog
83		if let Some(namespace) = self.catalog().find_namespace(id, self.version()) {
84			return Ok(Some(namespace));
85		}
86
87		// 4. Fall back to storage as defensive measure
88		if let Some(namespace) = CatalogStore::find_namespace(self, id)? {
89			warn!("Namespace with ID {:?} found in storage but not in MaterializedCatalog", id);
90			return Ok(Some(namespace));
91		}
92
93		Ok(None)
94	}
95
96	#[instrument(level = "trace", skip(self, name))]
97	fn find_namespace_by_name<'a>(
98		&mut self,
99		name: impl IntoFragment<'a>,
100	) -> reifydb_core::Result<Option<NamespaceDef>> {
101		let name = name.into_fragment();
102
103		// 1. Check transactional changes first
104		// nop for QueryTransaction
105		if let Some(namespace) = TransactionalNamespaceChanges::find_namespace_by_name(self, name.as_borrowed())
106		{
107			return Ok(Some(namespace.clone()));
108		}
109
110		// 2. Check if deleted
111		// nop for QueryTransaction
112		if TransactionalNamespaceChanges::is_namespace_deleted_by_name(self, name.as_borrowed()) {
113			return Ok(None);
114		}
115
116		// 3. Check MaterializedCatalog
117		if let Some(namespace) = self.catalog().find_namespace_by_name(name.text(), self.version()) {
118			return Ok(Some(namespace));
119		}
120
121		// 4. Fall back to storage as defensive measure
122		if let Some(namespace) = CatalogStore::find_namespace_by_name(self, name.text())? {
123			warn!("Namespace '{}' found in storage but not in MaterializedCatalog", name.text());
124			return Ok(Some(namespace));
125		}
126
127		Ok(None)
128	}
129
130	#[instrument(level = "trace", skip(self))]
131	fn get_namespace(&mut self, id: NamespaceId) -> reifydb_core::Result<NamespaceDef> {
132		self.find_namespace(id)?.ok_or_else(|| {
133			error!(internal!(
134				"Namespace with ID {} not found in catalog. This indicates a critical catalog inconsistency.",
135				id
136			))
137		})
138	}
139
140	#[instrument(level = "trace", skip(self, name))]
141	fn get_namespace_by_name<'a>(&mut self, name: impl IntoFragment<'a>) -> reifydb_core::Result<NamespaceDef> {
142		let name = name.into_fragment();
143		self.find_namespace_by_name(name.as_borrowed())?
144			.ok_or_else(|| error!(namespace_not_found(name.as_borrowed(), name.text())))
145	}
146}