reifydb_catalog/transaction/
table.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use reifydb_core::interface::{
5	CommandTransaction, NamespaceId, QueryTransaction, TableDef, TableId, TransactionalChanges,
6	TransactionalTableChanges,
7	interceptor::{TableDefInterceptor, WithInterceptors},
8};
9use reifydb_type::{
10	IntoFragment,
11	diagnostic::catalog::{table_already_exists, table_not_found},
12	error, internal, return_error,
13};
14use tracing::{instrument, warn};
15
16use crate::{
17	CatalogNamespaceQueryOperations, CatalogStore, store::table::TableToCreate,
18	transaction::MaterializedCatalogTransaction,
19};
20
21pub trait CatalogTableCommandOperations {
22	fn create_table(&mut self, table: TableToCreate) -> crate::Result<TableDef>;
23
24	// TODO: Implement when update/delete are ready
25	// fn update_table(&mut self, table_id: TableId, updates: TableUpdates)
26	// -> crate::Result<TableDef>; fn delete_table(&mut self, table_id:
27	// TableId) -> crate::Result<()>;
28}
29
30pub trait CatalogTrackTableChangeOperations {
31	fn track_table_def_created(&mut self, table: TableDef) -> crate::Result<()>;
32
33	fn track_table_def_updated(&mut self, pre: TableDef, post: TableDef) -> crate::Result<()>;
34
35	fn track_table_def_deleted(&mut self, table: TableDef) -> crate::Result<()>;
36}
37
38pub trait CatalogTableQueryOperations: CatalogNamespaceQueryOperations {
39	fn find_table(&mut self, id: TableId) -> crate::Result<Option<TableDef>>;
40
41	fn find_table_by_name<'a>(
42		&mut self,
43		namespace: NamespaceId,
44		name: impl IntoFragment<'a>,
45	) -> crate::Result<Option<TableDef>>;
46
47	fn get_table(&mut self, id: TableId) -> crate::Result<TableDef>;
48
49	fn get_table_by_name<'a>(
50		&mut self,
51		namespace: NamespaceId,
52		name: impl IntoFragment<'a>,
53	) -> crate::Result<TableDef>;
54}
55
56impl<
57	CT: CommandTransaction
58		+ MaterializedCatalogTransaction
59		+ CatalogTrackTableChangeOperations
60		+ WithInterceptors<CT>
61		+ TransactionalChanges,
62> CatalogTableCommandOperations for CT
63{
64	#[instrument(level = "debug", skip(self, to_create))]
65	fn create_table(&mut self, to_create: TableToCreate) -> reifydb_core::Result<TableDef> {
66		if let Some(table) = self.find_table_by_name(to_create.namespace, &to_create.table)? {
67			let namespace = self.get_namespace(to_create.namespace)?;
68			return_error!(table_already_exists(to_create.fragment, &namespace.name, &table.name));
69		}
70		let result = CatalogStore::create_table(self, to_create)?;
71		self.track_table_def_created(result.clone())?;
72		TableDefInterceptor::post_create(self, &result)?;
73		Ok(result)
74	}
75}
76
77impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges> CatalogTableQueryOperations for QT {
78	#[instrument(level = "trace", skip(self))]
79	fn find_table(&mut self, id: TableId) -> reifydb_core::Result<Option<TableDef>> {
80		// 1. Check transactional changes first
81		// nop for QueryTransaction
82		if let Some(table) = TransactionalTableChanges::find_table(self, id) {
83			return Ok(Some(table.clone()));
84		}
85
86		// 2. Check if deleted
87		// nop for QueryTransaction
88		if TransactionalTableChanges::is_table_deleted(self, id) {
89			return Ok(None);
90		}
91
92		// 3. Check MaterializedCatalog
93		if let Some(table) = self.catalog().find_table(id, self.version()) {
94			return Ok(Some(table));
95		}
96
97		// 4. Fall back to storage as defensive measure
98		if let Some(table) = CatalogStore::find_table(self, id)? {
99			warn!("Table with ID {:?} found in storage but not in MaterializedCatalog", id);
100			return Ok(Some(table));
101		}
102
103		Ok(None)
104	}
105
106	#[instrument(level = "trace", skip(self, name))]
107	fn find_table_by_name<'a>(
108		&mut self,
109		namespace: NamespaceId,
110		name: impl IntoFragment<'a>,
111	) -> reifydb_core::Result<Option<TableDef>> {
112		let name = name.into_fragment();
113
114		// 1. Check transactional changes first
115		// nop for QueryTransaction
116		if let Some(table) = TransactionalTableChanges::find_table_by_name(self, namespace, name.as_borrowed())
117		{
118			return Ok(Some(table.clone()));
119		}
120
121		// 2. Check if deleted
122		// nop for QueryTransaction
123		if TransactionalTableChanges::is_table_deleted_by_name(self, namespace, name.as_borrowed()) {
124			return Ok(None);
125		}
126
127		// 3. Check MaterializedCatalog
128		if let Some(table) = self.catalog().find_table_by_name(namespace, name.text(), self.version()) {
129			return Ok(Some(table));
130		}
131
132		// 4. Fall back to storage as defensive measure
133		if let Some(table) = CatalogStore::find_table_by_name(self, namespace, name.text())? {
134			warn!(
135				"Table '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
136				name.text(),
137				namespace
138			);
139			return Ok(Some(table));
140		}
141
142		Ok(None)
143	}
144
145	#[instrument(level = "trace", skip(self))]
146	fn get_table(&mut self, id: TableId) -> reifydb_core::Result<TableDef> {
147		self.find_table(id)?.ok_or_else(|| {
148			error!(internal!(
149				"Table with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
150				id
151			))
152		})
153	}
154
155	#[instrument(level = "trace", skip(self, name))]
156	fn get_table_by_name<'a>(
157		&mut self,
158		namespace: NamespaceId,
159		name: impl IntoFragment<'a>,
160	) -> reifydb_core::Result<TableDef> {
161		let name = name.into_fragment();
162
163		// Try to get the namespace name for the error message
164		let namespace_name = self
165			.find_namespace(namespace)?
166			.map(|ns| ns.name)
167			.unwrap_or_else(|| format!("namespace_{}", namespace));
168
169		self.find_table_by_name(namespace, name.as_borrowed())?
170			.ok_or_else(|| error!(table_not_found(name.as_borrowed(), &namespace_name, name.text())))
171	}
172}