reifydb_engine/transaction/catalog/
table.rs1use OperationType::{Create, Update};
5use reifydb_catalog::transaction::CatalogTrackTableChangeOperations;
6use reifydb_core::interface::{
7 Change, NamespaceId, OperationType, OperationType::Delete, TableDef, TableId, TransactionalTableChanges,
8};
9
10use crate::{StandardCommandTransaction, StandardQueryTransaction};
11
12impl CatalogTrackTableChangeOperations for StandardCommandTransaction {
13 fn track_table_def_created(&mut self, table: TableDef) -> reifydb_core::Result<()> {
14 let change = Change {
15 pre: None,
16 post: Some(table),
17 op: Create,
18 };
19 self.changes.add_table_def_change(change);
20 Ok(())
21 }
22
23 fn track_table_def_updated(&mut self, pre: TableDef, post: TableDef) -> reifydb_core::Result<()> {
24 let change = Change {
25 pre: Some(pre),
26 post: Some(post),
27 op: Update,
28 };
29 self.changes.add_table_def_change(change);
30 Ok(())
31 }
32
33 fn track_table_def_deleted(&mut self, table: TableDef) -> reifydb_core::Result<()> {
34 let change = Change {
35 pre: Some(table),
36 post: None,
37 op: Delete,
38 };
39 self.changes.add_table_def_change(change);
40 Ok(())
41 }
42}
43
44impl TransactionalTableChanges for StandardCommandTransaction {
45 fn find_table(&self, id: TableId) -> Option<&TableDef> {
46 for change in self.changes.table_def.iter().rev() {
48 if let Some(table) = &change.post {
49 if table.id == id {
50 return Some(table);
51 }
52 } else if let Some(table) = &change.pre {
53 if table.id == id && change.op == Delete {
54 return None;
56 }
57 }
58 }
59 None
60 }
61
62 fn find_table_by_name(&self, namespace: NamespaceId, name: &str) -> Option<&TableDef> {
63 self.changes
64 .table_def
65 .iter()
66 .rev()
67 .find_map(|change| change.post.as_ref().filter(|t| t.namespace == namespace && t.name == name))
68 }
69
70 fn is_table_deleted(&self, id: TableId) -> bool {
71 self.changes
72 .table_def
73 .iter()
74 .rev()
75 .any(|change| change.op == Delete && change.pre.as_ref().map(|t| t.id) == Some(id))
76 }
77
78 fn is_table_deleted_by_name(&self, namespace: NamespaceId, name: &str) -> bool {
79 self.changes.table_def.iter().rev().any(|change| {
80 change.op == Delete
81 && change
82 .pre
83 .as_ref()
84 .map(|t| t.namespace == namespace && t.name == name)
85 .unwrap_or(false)
86 })
87 }
88}
89
90impl TransactionalTableChanges for StandardQueryTransaction {
91 fn find_table(&self, _id: TableId) -> Option<&TableDef> {
92 None
93 }
94
95 fn find_table_by_name(&self, _namespace: NamespaceId, _name: &str) -> Option<&TableDef> {
96 None
97 }
98
99 fn is_table_deleted(&self, _id: TableId) -> bool {
100 false
101 }
102
103 fn is_table_deleted_by_name(&self, _namespace: NamespaceId, _name: &str) -> bool {
104 false
105 }
106}
107
#[cfg(test)]
mod tests {
    use reifydb_catalog::transaction::CatalogTrackTableChangeOperations;
    use reifydb_core::interface::{
        NamespaceId, Operation,
        OperationType::{Create, Delete, Update},
        TableDef, TableId,
    };

    use crate::test_utils::create_test_command_transaction;

    /// Builds a minimal `TableDef` (no columns, no primary key) for the tests.
    ///
    /// Synchronous on purpose: it performs no I/O and awaits nothing.
    fn test_table_def(id: u64, namespace_id: u64, name: &str) -> TableDef {
        TableDef {
            id: TableId(id),
            namespace: NamespaceId(namespace_id),
            name: name.to_string(),
            columns: vec![],
            primary_key: None,
        }
    }

    mod track_table_def_created {
        use super::*;

        /// A create is recorded with no pre-image and logged as a `Create`.
        #[tokio::test]
        async fn test_successful_creation() {
            let mut txn = create_test_command_transaction().await;

            let table = test_table_def(1, 1, "test_table");
            // Only the id is needed after the call, so the definition is moved.
            let table_id = table.id;
            let result = txn.track_table_def_created(table);
            assert!(result.is_ok());

            assert_eq!(txn.changes.table_def.len(), 1);
            let change = &txn.changes.table_def[0];
            assert!(change.pre.is_none());
            assert_eq!(change.post.as_ref().unwrap().name, "test_table");
            assert_eq!(change.op, Create);

            assert_eq!(txn.changes.log.len(), 1);
            match &txn.changes.log[0] {
                Operation::Table {
                    id,
                    op,
                } if *id == table_id && *op == Create => {}
                _ => panic!("Expected Table operation with Create"),
            }
        }
    }

    mod track_table_def_updated {
        use super::*;

        /// Two consecutive updates are kept as two separate change entries.
        #[tokio::test]
        async fn test_multiple_updates_no_coalescing() {
            let mut txn = create_test_command_transaction().await;
            let table_v1 = test_table_def(1, 1, "table_v1");
            let table_v2 = test_table_def(1, 1, "table_v2");
            let table_v3 = test_table_def(1, 1, "table_v3");

            // `table_v2` is reused as the pre-image of the second update.
            txn.track_table_def_updated(table_v1, table_v2.clone()).unwrap();

            assert_eq!(txn.changes.table_def.len(), 1);
            assert_eq!(txn.changes.table_def[0].pre.as_ref().unwrap().name, "table_v1");
            assert_eq!(txn.changes.table_def[0].post.as_ref().unwrap().name, "table_v2");
            assert_eq!(txn.changes.table_def[0].op, Update);

            txn.track_table_def_updated(table_v2, table_v3).unwrap();

            assert_eq!(txn.changes.table_def.len(), 2);

            assert_eq!(txn.changes.table_def[1].pre.as_ref().unwrap().name, "table_v2");
            assert_eq!(txn.changes.table_def[1].post.as_ref().unwrap().name, "table_v3");

            assert_eq!(txn.changes.log.len(), 2);
        }

        /// A create followed by an update yields two entries, in that order.
        #[tokio::test]
        async fn test_create_then_update_no_coalescing() {
            let mut txn = create_test_command_transaction().await;
            let table_v1 = test_table_def(1, 1, "table_v1");
            let table_v2 = test_table_def(1, 1, "table_v2");

            // `table_v1` is reused as the pre-image of the update below.
            txn.track_table_def_created(table_v1.clone()).unwrap();
            assert_eq!(txn.changes.table_def.len(), 1);
            assert_eq!(txn.changes.table_def[0].op, Create);

            txn.track_table_def_updated(table_v1, table_v2).unwrap();

            assert_eq!(txn.changes.table_def.len(), 2);

            assert_eq!(txn.changes.table_def[0].op, Create);

            assert_eq!(txn.changes.table_def[1].op, Update);

            assert_eq!(txn.changes.log.len(), 2);
        }
    }

    mod track_table_def_deleted {
        use super::*;

        /// A create followed by a delete yields two entries, in that order.
        #[tokio::test]
        async fn test_delete_after_create_no_coalescing() {
            let mut txn = create_test_command_transaction().await;
            let table = test_table_def(1, 1, "test_table");

            txn.track_table_def_created(table.clone()).unwrap();
            assert_eq!(txn.changes.table_def.len(), 1);

            let result = txn.track_table_def_deleted(table);
            assert!(result.is_ok());

            assert_eq!(txn.changes.table_def.len(), 2);

            assert_eq!(txn.changes.table_def[0].op, Create);

            assert_eq!(txn.changes.table_def[1].op, Delete);

            assert_eq!(txn.changes.log.len(), 2);
        }

        /// A delete is recorded with no post-image and logged as a `Delete`.
        #[tokio::test]
        async fn test_normal_delete() {
            let mut txn = create_test_command_transaction().await;
            let table = test_table_def(1, 1, "test_table");

            // Only the id is needed after the call, so the definition is moved.
            let table_id = table.id;
            let result = txn.track_table_def_deleted(table);
            assert!(result.is_ok());

            assert_eq!(txn.changes.table_def.len(), 1);
            let change = &txn.changes.table_def[0];
            assert_eq!(change.pre.as_ref().unwrap().name, "test_table");
            assert!(change.post.is_none());
            assert_eq!(change.op, Delete);

            assert_eq!(txn.changes.log.len(), 1);
            match &txn.changes.log[0] {
                Operation::Table {
                    id,
                    op,
                } if *id == table_id && *op == Delete => {}
                _ => panic!("Expected Table operation with Delete"),
            }
        }
    }
}