reifydb_engine/transaction/catalog/
table.rs1use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackTableChangeOperations;
6use reifydb_core::interface::{
7 Change, NamespaceId, OperationType, OperationType::Delete, TableDef, TableId, TransactionalTableChanges,
8};
9use reifydb_type::IntoFragment;
10
11use crate::{StandardCommandTransaction, StandardQueryTransaction};
12
13impl CatalogTrackTableChangeOperations for StandardCommandTransaction {
14 fn track_table_def_created(&mut self, table: TableDef) -> reifydb_core::Result<()> {
15 let change = Change {
16 pre: None,
17 post: Some(table),
18 op: Create,
19 };
20 self.changes.add_table_def_change(change);
21 Ok(())
22 }
23
24 fn track_table_def_updated(&mut self, pre: TableDef, post: TableDef) -> reifydb_core::Result<()> {
25 let change = Change {
26 pre: Some(pre),
27 post: Some(post),
28 op: Update,
29 };
30 self.changes.add_table_def_change(change);
31 Ok(())
32 }
33
34 fn track_table_def_deleted(&mut self, table: TableDef) -> reifydb_core::Result<()> {
35 let change = Change {
36 pre: Some(table),
37 post: None,
38 op: Delete,
39 };
40 self.changes.add_table_def_change(change);
41 Ok(())
42 }
43}
44
45impl TransactionalTableChanges for StandardCommandTransaction {
46 fn find_table(&self, id: TableId) -> Option<&TableDef> {
47 for change in self.changes.table_def.iter().rev() {
49 if let Some(table) = &change.post {
50 if table.id == id {
51 return Some(table);
52 }
53 } else if let Some(table) = &change.pre {
54 if table.id == id && change.op == Delete {
55 return None;
57 }
58 }
59 }
60 None
61 }
62
63 fn find_table_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> Option<&TableDef> {
64 let name = name.into_fragment();
65 self.changes.table_def.iter().rev().find_map(|change| {
66 change.post.as_ref().filter(|t| t.namespace == namespace && t.name == name.text())
67 })
68 }
69
70 fn is_table_deleted(&self, id: TableId) -> bool {
71 self.changes
72 .table_def
73 .iter()
74 .rev()
75 .any(|change| change.op == Delete && change.pre.as_ref().map(|t| t.id) == Some(id))
76 }
77
78 fn is_table_deleted_by_name<'a>(&self, namespace: NamespaceId, name: impl IntoFragment<'a>) -> bool {
79 let name = name.into_fragment();
80 self.changes.table_def.iter().rev().any(|change| {
81 change.op == Delete
82 && change
83 .pre
84 .as_ref()
85 .map(|t| t.namespace == namespace && t.name == name.text())
86 .unwrap_or(false)
87 })
88 }
89}
90
91impl TransactionalTableChanges for StandardQueryTransaction {
92 fn find_table(&self, _id: TableId) -> Option<&TableDef> {
93 None
94 }
95
96 fn find_table_by_name<'a>(&self, _namespace: NamespaceId, _name: impl IntoFragment<'a>) -> Option<&TableDef> {
97 None
98 }
99
100 fn is_table_deleted(&self, _id: TableId) -> bool {
101 false
102 }
103
104 fn is_table_deleted_by_name<'a>(&self, _namespace: NamespaceId, _name: impl IntoFragment<'a>) -> bool {
105 false
106 }
107}
108
109#[cfg(test)]
110mod tests {
111 use reifydb_catalog::transaction::CatalogTrackTableChangeOperations;
112 use reifydb_core::interface::{
113 NamespaceId, Operation,
114 OperationType::{Create, Delete, Update},
115 TableDef, TableId,
116 };
117
118 use crate::test_utils::create_test_command_transaction;
119
120 fn test_table_def(id: u64, namespace_id: u64, name: &str) -> TableDef {
122 TableDef {
123 id: TableId(id),
124 namespace: NamespaceId(namespace_id),
125 name: name.to_string(),
126 columns: vec![],
127 primary_key: None,
128 }
129 }
130
131 mod track_table_def_created {
132 use super::*;
133
134 #[test]
135 fn test_successful_creation() {
136 let mut txn = create_test_command_transaction();
137
138 let table = test_table_def(1, 1, "test_table");
139 let result = txn.track_table_def_created(table.clone());
140 assert!(result.is_ok());
141
142 assert_eq!(txn.changes.table_def.len(), 1);
144 let change = &txn.changes.table_def[0];
145 assert!(change.pre.is_none());
146 assert_eq!(change.post.as_ref().unwrap().name, "test_table");
147 assert_eq!(change.op, Create);
148
149 assert_eq!(txn.changes.log.len(), 1);
151 match &txn.changes.log[0] {
152 Operation::Table {
153 id,
154 op,
155 } if *id == table.id && *op == Create => {}
156 _ => panic!("Expected Table operation with Create"),
157 }
158 }
159 }
160
161 mod track_table_def_updated {
162 use super::*;
163
164 #[test]
165 fn test_multiple_updates_no_coalescing() {
166 let mut txn = create_test_command_transaction();
167 let table_v1 = test_table_def(1, 1, "table_v1");
168 let table_v2 = test_table_def(1, 1, "table_v2");
169 let table_v3 = test_table_def(1, 1, "table_v3");
170
171 txn.track_table_def_updated(table_v1.clone(), table_v2.clone()).unwrap();
173
174 assert_eq!(txn.changes.table_def.len(), 1);
176 assert_eq!(txn.changes.table_def[0].pre.as_ref().unwrap().name, "table_v1");
177 assert_eq!(txn.changes.table_def[0].post.as_ref().unwrap().name, "table_v2");
178 assert_eq!(txn.changes.table_def[0].op, Update);
179
180 txn.track_table_def_updated(table_v2, table_v3.clone()).unwrap();
182
183 assert_eq!(txn.changes.table_def.len(), 2);
185
186 assert_eq!(txn.changes.table_def[1].pre.as_ref().unwrap().name, "table_v2");
188 assert_eq!(txn.changes.table_def[1].post.as_ref().unwrap().name, "table_v3");
189
190 assert_eq!(txn.changes.log.len(), 2);
192 }
193
194 #[test]
195 fn test_create_then_update_no_coalescing() {
196 let mut txn = create_test_command_transaction();
197 let table_v1 = test_table_def(1, 1, "table_v1");
198 let table_v2 = test_table_def(1, 1, "table_v2");
199
200 txn.track_table_def_created(table_v1.clone()).unwrap();
202 assert_eq!(txn.changes.table_def.len(), 1);
203 assert_eq!(txn.changes.table_def[0].op, Create);
204
205 txn.track_table_def_updated(table_v1, table_v2.clone()).unwrap();
207
208 assert_eq!(txn.changes.table_def.len(), 2);
210
211 assert_eq!(txn.changes.table_def[0].op, Create);
213
214 assert_eq!(txn.changes.table_def[1].op, Update);
216
217 assert_eq!(txn.changes.log.len(), 2);
219 }
220 }
221
222 mod track_table_def_deleted {
223 use super::*;
224
225 #[test]
226 fn test_delete_after_create_no_coalescing() {
227 let mut txn = create_test_command_transaction();
228 let table = test_table_def(1, 1, "test_table");
229
230 txn.track_table_def_created(table.clone()).unwrap();
232 assert_eq!(txn.changes.table_def.len(), 1);
233
234 let result = txn.track_table_def_deleted(table.clone());
236 assert!(result.is_ok());
237
238 assert_eq!(txn.changes.table_def.len(), 2);
240
241 assert_eq!(txn.changes.table_def[0].op, Create);
243
244 assert_eq!(txn.changes.table_def[1].op, Delete);
246
247 assert_eq!(txn.changes.log.len(), 2);
249 }
250
251 #[test]
252 fn test_normal_delete() {
253 let mut txn = create_test_command_transaction();
254 let table = test_table_def(1, 1, "test_table");
255
256 let result = txn.track_table_def_deleted(table.clone());
257 assert!(result.is_ok());
258
259 assert_eq!(txn.changes.table_def.len(), 1);
261 let change = &txn.changes.table_def[0];
262 assert_eq!(change.pre.as_ref().unwrap().name, "test_table");
263 assert!(change.post.is_none());
264 assert_eq!(change.op, Delete);
265
266 assert_eq!(txn.changes.log.len(), 1);
268 match &txn.changes.log[0] {
269 Operation::Table {
270 id,
271 op,
272 } if *id == table.id && *op == Delete => {}
273 _ => panic!("Expected Table operation with Delete"),
274 }
275 }
276 }
277}