reifydb_catalog/transaction/
table.rs1use reifydb_core::{
5 interface::{
6 CommandTransaction, NamespaceId, QueryTransaction, TableDef, TableId, TransactionalChanges,
7 TransactionalTableChanges,
8 interceptor::{TableDefInterceptor, WithInterceptors},
9 },
10 log_warn,
11};
12use reifydb_type::{
13 IntoFragment,
14 diagnostic::catalog::{table_already_exists, table_not_found},
15 error, internal, return_error,
16};
17
18use crate::{
19 CatalogNamespaceQueryOperations, CatalogStore, store::table::TableToCreate,
20 transaction::MaterializedCatalogTransaction,
21};
22
23pub trait CatalogTableCommandOperations {
24 fn create_table(&mut self, table: TableToCreate) -> crate::Result<TableDef>;
25
26 }
31
32pub trait CatalogTrackTableChangeOperations {
33 fn track_table_def_created(&mut self, table: TableDef) -> crate::Result<()>;
35
36 fn track_table_def_updated(&mut self, pre: TableDef, post: TableDef) -> crate::Result<()>;
37
38 fn track_table_def_deleted(&mut self, table: TableDef) -> crate::Result<()>;
39}
40
41pub trait CatalogTableQueryOperations: CatalogNamespaceQueryOperations {
42 fn find_table(&mut self, id: TableId) -> crate::Result<Option<TableDef>>;
43
44 fn find_table_by_name<'a>(
45 &mut self,
46 namespace: NamespaceId,
47 name: impl IntoFragment<'a>,
48 ) -> crate::Result<Option<TableDef>>;
49
50 fn get_table(&mut self, id: TableId) -> crate::Result<TableDef>;
51
52 fn get_table_by_name<'a>(
53 &mut self,
54 namespace: NamespaceId,
55 name: impl IntoFragment<'a>,
56 ) -> crate::Result<TableDef>;
57}
58
59impl<
60 CT: CommandTransaction
61 + MaterializedCatalogTransaction
62 + CatalogTrackTableChangeOperations
63 + WithInterceptors<CT>
64 + TransactionalChanges,
65> CatalogTableCommandOperations for CT
66{
67 fn create_table(&mut self, to_create: TableToCreate) -> reifydb_core::Result<TableDef> {
68 if let Some(table) = self.find_table_by_name(to_create.namespace, &to_create.table)? {
69 let namespace = self.get_namespace(to_create.namespace)?;
70 return_error!(table_already_exists(to_create.fragment, &namespace.name, &table.name));
71 }
72 let result = CatalogStore::create_table(self, to_create)?;
73 self.track_table_def_created(result.clone())?;
74 TableDefInterceptor::post_create(self, &result)?;
75 Ok(result)
76 }
77}
78
79impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges> CatalogTableQueryOperations for QT {
80 fn find_table(&mut self, id: TableId) -> reifydb_core::Result<Option<TableDef>> {
81 if let Some(table) = TransactionalTableChanges::find_table(self, id) {
84 return Ok(Some(table.clone()));
85 }
86
87 if TransactionalTableChanges::is_table_deleted(self, id) {
90 return Ok(None);
91 }
92
93 if let Some(table) = self.catalog().find_table(id, self.version()) {
95 return Ok(Some(table));
96 }
97
98 if let Some(table) = CatalogStore::find_table(self, id)? {
100 log_warn!("Table with ID {:?} found in storage but not in MaterializedCatalog", id);
101 return Ok(Some(table));
102 }
103
104 Ok(None)
105 }
106
107 fn find_table_by_name<'a>(
108 &mut self,
109 namespace: NamespaceId,
110 name: impl IntoFragment<'a>,
111 ) -> reifydb_core::Result<Option<TableDef>> {
112 let name = name.into_fragment();
113
114 if let Some(table) = TransactionalTableChanges::find_table_by_name(self, namespace, name.as_borrowed())
117 {
118 return Ok(Some(table.clone()));
119 }
120
121 if TransactionalTableChanges::is_table_deleted_by_name(self, namespace, name.as_borrowed()) {
124 return Ok(None);
125 }
126
127 if let Some(table) = self.catalog().find_table_by_name(namespace, name.text(), self.version()) {
129 return Ok(Some(table));
130 }
131
132 if let Some(table) = CatalogStore::find_table_by_name(self, namespace, name.text())? {
134 log_warn!(
135 "Table '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
136 name.text(),
137 namespace
138 );
139 return Ok(Some(table));
140 }
141
142 Ok(None)
143 }
144
145 fn get_table(&mut self, id: TableId) -> reifydb_core::Result<TableDef> {
146 self.find_table(id)?.ok_or_else(|| {
147 error!(internal!(
148 "Table with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
149 id
150 ))
151 })
152 }
153
154 fn get_table_by_name<'a>(
155 &mut self,
156 namespace: NamespaceId,
157 name: impl IntoFragment<'a>,
158 ) -> reifydb_core::Result<TableDef> {
159 let name = name.into_fragment();
160
161 let namespace_name = self
163 .find_namespace(namespace)?
164 .map(|ns| ns.name)
165 .unwrap_or_else(|| format!("namespace_{}", namespace));
166
167 self.find_table_by_name(namespace, name.as_borrowed())?
168 .ok_or_else(|| error!(table_not_found(name.as_borrowed(), &namespace_name, name.text())))
169 }
170}