reifydb_catalog/transaction/
table.rs1use reifydb_core::interface::{
5 CommandTransaction, NamespaceId, QueryTransaction, TableDef, TableId, TransactionalChanges,
6 TransactionalTableChanges,
7 interceptor::{TableDefInterceptor, WithInterceptors},
8};
9use reifydb_type::{
10 IntoFragment,
11 diagnostic::catalog::{table_already_exists, table_not_found},
12 error, internal, return_error,
13};
14use tracing::{instrument, warn};
15
16use crate::{
17 CatalogNamespaceQueryOperations, CatalogStore, store::table::TableToCreate,
18 transaction::MaterializedCatalogTransaction,
19};
20
21pub trait CatalogTableCommandOperations {
22 fn create_table(&mut self, table: TableToCreate) -> crate::Result<TableDef>;
23
24 }
29
30pub trait CatalogTrackTableChangeOperations {
31 fn track_table_def_created(&mut self, table: TableDef) -> crate::Result<()>;
32
33 fn track_table_def_updated(&mut self, pre: TableDef, post: TableDef) -> crate::Result<()>;
34
35 fn track_table_def_deleted(&mut self, table: TableDef) -> crate::Result<()>;
36}
37
38pub trait CatalogTableQueryOperations: CatalogNamespaceQueryOperations {
39 fn find_table(&mut self, id: TableId) -> crate::Result<Option<TableDef>>;
40
41 fn find_table_by_name<'a>(
42 &mut self,
43 namespace: NamespaceId,
44 name: impl IntoFragment<'a>,
45 ) -> crate::Result<Option<TableDef>>;
46
47 fn get_table(&mut self, id: TableId) -> crate::Result<TableDef>;
48
49 fn get_table_by_name<'a>(
50 &mut self,
51 namespace: NamespaceId,
52 name: impl IntoFragment<'a>,
53 ) -> crate::Result<TableDef>;
54}
55
56impl<
57 CT: CommandTransaction
58 + MaterializedCatalogTransaction
59 + CatalogTrackTableChangeOperations
60 + WithInterceptors<CT>
61 + TransactionalChanges,
62> CatalogTableCommandOperations for CT
63{
64 #[instrument(level = "debug", skip(self, to_create))]
65 fn create_table(&mut self, to_create: TableToCreate) -> reifydb_core::Result<TableDef> {
66 if let Some(table) = self.find_table_by_name(to_create.namespace, &to_create.table)? {
67 let namespace = self.get_namespace(to_create.namespace)?;
68 return_error!(table_already_exists(to_create.fragment, &namespace.name, &table.name));
69 }
70 let result = CatalogStore::create_table(self, to_create)?;
71 self.track_table_def_created(result.clone())?;
72 TableDefInterceptor::post_create(self, &result)?;
73 Ok(result)
74 }
75}
76
77impl<QT: QueryTransaction + MaterializedCatalogTransaction + TransactionalChanges> CatalogTableQueryOperations for QT {
78 #[instrument(level = "trace", skip(self))]
79 fn find_table(&mut self, id: TableId) -> reifydb_core::Result<Option<TableDef>> {
80 if let Some(table) = TransactionalTableChanges::find_table(self, id) {
83 return Ok(Some(table.clone()));
84 }
85
86 if TransactionalTableChanges::is_table_deleted(self, id) {
89 return Ok(None);
90 }
91
92 if let Some(table) = self.catalog().find_table(id, self.version()) {
94 return Ok(Some(table));
95 }
96
97 if let Some(table) = CatalogStore::find_table(self, id)? {
99 warn!("Table with ID {:?} found in storage but not in MaterializedCatalog", id);
100 return Ok(Some(table));
101 }
102
103 Ok(None)
104 }
105
106 #[instrument(level = "trace", skip(self, name))]
107 fn find_table_by_name<'a>(
108 &mut self,
109 namespace: NamespaceId,
110 name: impl IntoFragment<'a>,
111 ) -> reifydb_core::Result<Option<TableDef>> {
112 let name = name.into_fragment();
113
114 if let Some(table) = TransactionalTableChanges::find_table_by_name(self, namespace, name.as_borrowed())
117 {
118 return Ok(Some(table.clone()));
119 }
120
121 if TransactionalTableChanges::is_table_deleted_by_name(self, namespace, name.as_borrowed()) {
124 return Ok(None);
125 }
126
127 if let Some(table) = self.catalog().find_table_by_name(namespace, name.text(), self.version()) {
129 return Ok(Some(table));
130 }
131
132 if let Some(table) = CatalogStore::find_table_by_name(self, namespace, name.text())? {
134 warn!(
135 "Table '{}' in namespace {:?} found in storage but not in MaterializedCatalog",
136 name.text(),
137 namespace
138 );
139 return Ok(Some(table));
140 }
141
142 Ok(None)
143 }
144
145 #[instrument(level = "trace", skip(self))]
146 fn get_table(&mut self, id: TableId) -> reifydb_core::Result<TableDef> {
147 self.find_table(id)?.ok_or_else(|| {
148 error!(internal!(
149 "Table with ID {:?} not found in catalog. This indicates a critical catalog inconsistency.",
150 id
151 ))
152 })
153 }
154
155 #[instrument(level = "trace", skip(self, name))]
156 fn get_table_by_name<'a>(
157 &mut self,
158 namespace: NamespaceId,
159 name: impl IntoFragment<'a>,
160 ) -> reifydb_core::Result<TableDef> {
161 let name = name.into_fragment();
162
163 let namespace_name = self
165 .find_namespace(namespace)?
166 .map(|ns| ns.name)
167 .unwrap_or_else(|| format!("namespace_{}", namespace));
168
169 self.find_table_by_name(namespace, name.as_borrowed())?
170 .ok_or_else(|| error!(table_not_found(name.as_borrowed(), &namespace_name, name.text())))
171 }
172}