reifydb_catalog/transaction/
namespace.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use async_trait::async_trait;
5use reifydb_core::interface::{
6	CommandTransaction, NamespaceDef, NamespaceId, QueryTransaction, TransactionalChanges,
7	TransactionalNamespaceChanges,
8	interceptor::{NamespaceDefInterceptor, WithInterceptors},
9};
10use reifydb_type::{
11	Fragment,
12	diagnostic::catalog::{namespace_already_exists, namespace_not_found},
13	error, internal, return_error,
14};
15use tracing::{instrument, warn};
16
17use crate::{CatalogStore, store::namespace::NamespaceToCreate, transaction::MaterializedCatalogTransaction};
18
19#[async_trait]
20pub trait CatalogNamespaceCommandOperations: Send {
21	async fn create_namespace(&mut self, to_create: NamespaceToCreate) -> crate::Result<NamespaceDef>;
22
23	// TODO: Implement when update/delete are ready
24	// async fn update_namespace(&mut self, namespace_id: NamespaceId, updates:
25	// NamespaceUpdates) -> crate::Result<NamespaceDef>; async fn
26	// delete_namespace(&mut self, namespace_id: NamespaceId) ->
27	// crate::Result<()>;
28}
29
30pub trait CatalogTrackNamespaceChangeOperations {
31	fn track_namespace_def_created(&mut self, namespace: NamespaceDef) -> crate::Result<()>;
32
33	fn track_namespace_def_updated(&mut self, pre: NamespaceDef, post: NamespaceDef) -> crate::Result<()>;
34
35	fn track_namespace_def_deleted(&mut self, namespace: NamespaceDef) -> crate::Result<()>;
36}
37
38#[async_trait]
39pub trait CatalogNamespaceQueryOperations: Send {
40	async fn find_namespace(&mut self, id: NamespaceId) -> crate::Result<Option<NamespaceDef>>;
41
42	async fn find_namespace_by_name(&mut self, name: &str) -> crate::Result<Option<NamespaceDef>>;
43
44	async fn get_namespace(&mut self, id: NamespaceId) -> crate::Result<NamespaceDef>;
45
46	async fn get_namespace_by_name(&mut self, name: impl Into<Fragment> + Send) -> crate::Result<NamespaceDef>;
47}
48
49#[async_trait]
50impl<
51	CT: CommandTransaction
52		+ MaterializedCatalogTransaction
53		+ CatalogTrackNamespaceChangeOperations
54		+ WithInterceptors<CT>
55		+ TransactionalChanges
56		+ Send
57		+ 'static,
58> CatalogNamespaceCommandOperations for CT
59{
60	#[instrument(name = "catalog::namespace::create", level = "debug", skip(self, to_create))]
61	async fn create_namespace(&mut self, to_create: NamespaceToCreate) -> reifydb_core::Result<NamespaceDef> {
62		if let Some(namespace) = self.find_namespace_by_name(to_create.name.as_str()).await? {
63			return_error!(namespace_already_exists(
64				to_create.namespace_fragment.unwrap_or_else(|| Fragment::None),
65				&namespace.name
66			));
67		}
68		let result = CatalogStore::create_namespace(self, to_create).await?;
69		self.track_namespace_def_created(result.clone())?;
70		NamespaceDefInterceptor::post_create(self, &result).await?;
71		Ok(result)
72	}
73}
74
75#[async_trait]
76impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges + Send + 'static>
77	CatalogNamespaceQueryOperations for QT
78{
79	#[instrument(name = "catalog::namespace::find", level = "trace", skip(self))]
80	async fn find_namespace(&mut self, id: NamespaceId) -> reifydb_core::Result<Option<NamespaceDef>> {
81		// 1. Check transactional changes first
82		if let Some(namespace) = TransactionalNamespaceChanges::find_namespace(self, id) {
83			return Ok(Some(namespace.clone()));
84		}
85
86		// 2. Check if deleted
87		// nop for QueryTransaction
88		if TransactionalNamespaceChanges::is_namespace_deleted(self, id) {
89			return Ok(None);
90		}
91
92		// 3. Check MaterializedCatalog
93		if let Some(namespace) = self.catalog().find_namespace(id, self.version()) {
94			return Ok(Some(namespace));
95		}
96
97		// 4. Fall back to storage as defensive measure
98		if let Some(namespace) = CatalogStore::find_namespace(self, id).await? {
99			warn!("Namespace with ID {:?} found in storage but not in MaterializedCatalog", id);
100			return Ok(Some(namespace));
101		}
102
103		Ok(None)
104	}
105
106	#[instrument(name = "catalog::namespace::find_by_name", level = "trace", skip(self, name))]
107	async fn find_namespace_by_name(&mut self, name: &str) -> reifydb_core::Result<Option<NamespaceDef>> {
108		// 1. Check transactional changes first
109		// nop for QueryTransaction
110		if let Some(namespace) = TransactionalNamespaceChanges::find_namespace_by_name(self, name) {
111			return Ok(Some(namespace.clone()));
112		}
113
114		// 2. Check if deleted
115		// nop for QueryTransaction
116		if TransactionalNamespaceChanges::is_namespace_deleted_by_name(self, name) {
117			return Ok(None);
118		}
119
120		// 3. Check MaterializedCatalog
121		if let Some(namespace) = self.catalog().find_namespace_by_name(name, self.version()) {
122			return Ok(Some(namespace));
123		}
124
125		// 4. Fall back to storage as defensive measure
126		if let Some(namespace) = CatalogStore::find_namespace_by_name(self, name).await? {
127			warn!("Namespace '{}' found in storage but not in MaterializedCatalog", name);
128			return Ok(Some(namespace));
129		}
130
131		Ok(None)
132	}
133
134	#[instrument(name = "catalog::namespace::get", level = "trace", skip(self))]
135	async fn get_namespace(&mut self, id: NamespaceId) -> reifydb_core::Result<NamespaceDef> {
136		self.find_namespace(id).await?.ok_or_else(|| {
137			error!(internal!(
138				"Namespace with ID {} not found in catalog. This indicates a critical catalog inconsistency.",
139				id
140			))
141		})
142	}
143
144	#[instrument(name = "catalog::namespace::get_by_name", level = "trace", skip(self, name))]
145	async fn get_namespace_by_name(
146		&mut self,
147		name: impl Into<Fragment> + Send,
148	) -> reifydb_core::Result<NamespaceDef> {
149		let name = name.into();
150		self.find_namespace_by_name(name.text())
151			.await?
152			.ok_or_else(|| error!(namespace_not_found(name.clone(), name.text())))
153	}
154}