reifydb_catalog/transaction/
table.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::{
5	interface::{
6		CommandTransaction, NamespaceId, QueryTransaction, TableDef, TableId, TransactionalChanges,
7		TransactionalTableChanges,
8		interceptor::{TableDefInterceptor, WithInterceptors},
9	},
10	log_warn,
11};
12use reifydb_type::{
13	IntoFragment,
14	diagnostic::catalog::{table_already_exists, table_not_found},
15	error, internal, return_error,
16};
17
18use crate::{
19	CatalogNamespaceQueryOperations, CatalogStore, store::table::TableToCreate,
20	transaction::MaterializedCatalogTransaction,
21};
22
/// Table operations that modify the catalog.
///
/// This file provides a blanket implementation for command transactions that
/// also expose the materialized catalog, change tracking and interceptors.
pub trait CatalogTableCommandOperations {
	/// Creates the table described by `table` and returns its definition.
	///
	/// # Errors
	///
	/// The blanket implementation in this file fails with a
	/// `table_already_exists` diagnostic when a table with the same name is
	/// already visible in the target namespace, and propagates any error
	/// raised by the store, the change tracker or the post-create
	/// interceptor.
	fn create_table(&mut self, table: TableToCreate) -> crate::Result<TableDef>;

	// TODO: Implement when update/delete are ready
	// fn update_table(&mut self, table_id: TableId, updates: TableUpdates) -> crate::Result<TableDef>;
	// fn delete_table(&mut self, table_id: TableId) -> crate::Result<()>;
}
31
/// Hooks for recording table definition changes made by a transaction.
///
/// NOTE(review): the exact semantics live with the implementors, which are
/// not visible from this file. The only call site here is `create_table`,
/// which invokes `track_table_def_created` right after the table has been
/// written through `CatalogStore` — confirm the rest against implementors.
pub trait CatalogTrackTableChangeOperations {
	// Table tracking methods

	/// Records that `table` was created.
	fn track_table_def_created(&mut self, table: TableDef) -> crate::Result<()>;

	/// Records that a table definition changed from `pre` to `post`.
	fn track_table_def_updated(&mut self, pre: TableDef, post: TableDef) -> crate::Result<()>;

	/// Records that `table` was deleted.
	fn track_table_def_deleted(&mut self, table: TableDef) -> crate::Result<()>;
}
40
/// Read-only table lookups against the catalog.
///
/// The `find_*` methods treat a missing table as a normal outcome and return
/// `Ok(None)`; the `get_*` methods treat it as an error. All methods take
/// `&mut self` because the underlying transaction is borrowed mutably for
/// lookups.
pub trait CatalogTableQueryOperations: CatalogNamespaceQueryOperations {
	/// Looks up a table by its id, returning `Ok(None)` when it is not found.
	fn find_table(&mut self, id: TableId) -> crate::Result<Option<TableDef>>;

	/// Looks up a table by `name` within `namespace`, returning `Ok(None)`
	/// when it is not found.
	fn find_table_by_name<'a>(
		&mut self,
		namespace: NamespaceId,
		name: impl IntoFragment<'a>,
	) -> crate::Result<Option<TableDef>>;

	/// Returns the table with the given id.
	///
	/// # Errors
	///
	/// The blanket implementation in this file reports a missing table as an
	/// internal error, since an id that cannot be resolved is treated as a
	/// catalog inconsistency.
	fn get_table(&mut self, id: TableId) -> crate::Result<TableDef>;

	/// Returns the table called `name` within `namespace`.
	///
	/// # Errors
	///
	/// The blanket implementation in this file fails with a `table_not_found`
	/// diagnostic when no such table is visible.
	fn get_table_by_name<'a>(
		&mut self,
		namespace: NamespaceId,
		name: impl IntoFragment<'a>,
	) -> crate::Result<TableDef>;
}
58
59impl<
60	CT: CommandTransaction
61		+ MaterializedCatalogTransaction
62		+ CatalogTrackTableChangeOperations
63		+ WithInterceptors<CT>
64		+ TransactionalChanges,
65> CatalogTableCommandOperations for CT
66{
67	fn create_table(&mut self, to_create: TableToCreate) -> reifydb_core::Result<TableDef> {
68		if let Some(table) = self.find_table_by_name(to_create.namespace, &to_create.table)? {
69			let namespace = self.get_namespace(to_create.namespace)?;
70			return_error!(table_already_exists(to_create.fragment, &namespace.name, &table.name));
71		}
72		let result = CatalogStore::create_table(self, to_create)?;
73		self.track_table_def_created(result.clone())?;
74		TableDefInterceptor::post_create(self, &result)?;
75		Ok(result)
76	}
77}
78
79impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges> CatalogTableQueryOperations for QT {
80	fn find_table(&mut self, id: TableId) -> reifydb_core::Result<Option<TableDef>> {
81		// 1. Check transactional changes first
82		// nop for QueryTransaction
83		if let Some(table) = TransactionalTableChanges::find_table(self, id) {
84			return Ok(Some(table.clone()));
85		}
86
87		// 2. Check if deleted
88		// nop for QueryTransaction
89		if TransactionalTableChanges::is_table_deleted(self, id) {
90			return Ok(None);
91		}
92
93		// 3. Check MaterializedCatalog
94		if let Some(table) = self.catalog().find_table(id, self.version()) {
95			return Ok(Some(table));
96		}
97
98		// 4. Fall back to storage as defensive measure
99		if let Some(table) = CatalogStore::find_table(self, id)? {
100			log_warn!("Table with ID {:?} found in storage but not in MaterializedCatalog", id);
101			return Ok(Some(table));
102		}
103
104		Ok(None)
105	}
106
107	fn find_table_by_name<'a>(
108		&mut self,
109		namespace: NamespaceId,
110		name: impl IntoFragment<'a>,
111	) -> reifydb_core::Result<Option<TableDef>> {
112		let name = name.into_fragment();
113
114		// 1. Check transactional changes first
115		// nop for QueryTransaction
116		if let Some(table) = TransactionalTableChanges::find_table_by_name(self, namespace, name.as_borrowed())
117		{
118			return Ok(Some(table.clone()));
119		}
120
121		// 2. Check if deleted
122		// nop for QueryTransaction
123		if TransactionalTableChanges::is_table_deleted_by_name(self, namespace, name.as_borrowed()) {
124			return Ok(None);
125		}
126
127		// 3. Check MaterializedCatalog
128		if let Some(table) = self.catalog().find_table_by_name(namespace, name.text(), self.version()) {
129			return Ok(Some(table));
130		}
131
132		// 4. Fall back to storage as defensive measure
133		if let Some(table) = CatalogStore::find_table_by_name(self, namespace, name.text())? {
134			log_warn!(
135				"Table '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
136				name.text(),
137				namespace
138			);
139			return Ok(Some(table));
140		}
141
142		Ok(None)
143	}
144
145	fn get_table(&mut self, id: TableId) -> reifydb_core::Result<TableDef> {
146		self.find_table(id)?.ok_or_else(|| {
147			error!(internal!(
148				"Table with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
149				id
150			))
151		})
152	}
153
154	fn get_table_by_name<'a>(
155		&mut self,
156		namespace: NamespaceId,
157		name: impl IntoFragment<'a>,
158	) -> reifydb_core::Result<TableDef> {
159		let name = name.into_fragment();
160
161		// Try to get the namespace name for the error message
162		let namespace_name = self
163			.find_namespace(namespace)?
164			.map(|ns| ns.name)
165			.unwrap_or_else(|| format!("namespace_{}", namespace));
166
167		self.find_table_by_name(namespace, name.as_borrowed())?
168			.ok_or_else(|| error!(table_not_found(name.as_borrowed(), &namespace_name, name.text())))
169	}
170}