1use reifydb_core::{
5 common::CommitVersion,
6 encoded::{row::EncodedRow, shape::RowShape},
7 interface::{
8 catalog::{shape::ShapeId, table::Table},
9 change::{Change, ChangeOrigin, Diff},
10 },
11 key::row::RowKey,
12 value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
13};
14use reifydb_transaction::{
15 change::{RowChange, TableRowInsertion},
16 interceptor::table_row::TableRowInterceptor,
17 transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction},
18};
19use reifydb_type::{
20 fragment::Fragment,
21 util::cowvec::CowVec,
22 value::{datetime::DateTime, row_number::RowNumber},
23};
24use smallvec::smallvec;
25
26use crate::Result;
27
28fn build_encoded_columns(shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Columns {
29 let fields = shape.fields();
30
31 let mut columns_vec: Vec<ColumnWithName> = Vec::with_capacity(fields.len());
32 for field in fields.iter() {
33 columns_vec.push(ColumnWithName {
34 name: Fragment::internal(&field.name),
35 data: ColumnBuffer::with_capacity(field.constraint.get_type(), 1),
36 });
37 }
38
39 for (i, _) in fields.iter().enumerate() {
40 columns_vec[i].data.push_value(shape.get_value(encoded, i));
41 }
42
43 Columns::with_system_columns(
44 columns_vec,
45 vec![row_number],
46 vec![DateTime::from_nanos(encoded.created_at_nanos())],
47 vec![DateTime::from_nanos(encoded.updated_at_nanos())],
48 )
49}
50
51fn build_table_insert_change(table: &Table, shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Change {
52 Change {
53 origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
54 version: CommitVersion(0),
55 diffs: smallvec![Diff::insert(build_encoded_columns(shape, row_number, encoded))],
56 changed_at: DateTime::default(),
57 }
58}
59
60fn build_table_update_change(table: &Table, row_number: RowNumber, pre: &EncodedRow, post: &EncodedRow) -> Change {
61 let shape: RowShape = (&table.columns).into();
62 Change {
63 origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
64 version: CommitVersion(0),
65 diffs: smallvec![Diff::update(
66 build_encoded_columns(&shape, row_number, pre),
67 build_encoded_columns(&shape, row_number, post),
68 )],
69 changed_at: DateTime::default(),
70 }
71}
72
73fn build_table_remove_change(table: &Table, row_number: RowNumber, encoded: &EncodedRow) -> Change {
74 let shape: RowShape = (&table.columns).into();
75 Change {
76 origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
77 version: CommitVersion(0),
78 diffs: smallvec![Diff::remove(build_encoded_columns(&shape, row_number, encoded))],
79 changed_at: DateTime::default(),
80 }
81}
82
83pub(crate) trait TableOperations {
84 fn insert_table(
85 &mut self,
86 table: &Table,
87 shape: &RowShape,
88 row: EncodedRow,
89 row_number: RowNumber,
90 ) -> Result<EncodedRow>;
91
92 fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow>;
93
94 fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow>;
95}
96
97impl TableOperations for CommandTransaction {
98 fn insert_table(
99 &mut self,
100 table: &Table,
101 shape: &RowShape,
102 row: EncodedRow,
103 row_number: RowNumber,
104 ) -> Result<EncodedRow> {
105 let row = TableRowInterceptor::pre_insert(self, table, row_number, row)?;
106
107 self.set(&RowKey::encoded(table.id, row_number), row.clone())?;
108
109 TableRowInterceptor::post_insert(self, table, row_number, &row)?;
110
111 self.track_row_change(RowChange::TableInsert(TableRowInsertion {
113 table_id: table.id,
114 row_number,
115 encoded: row.clone(),
116 }));
117
118 self.track_flow_change(build_table_insert_change(table, shape, row_number, &row));
120
121 Ok(row)
122 }
123
124 fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
125 let key = RowKey::encoded(table.id, id);
126
127 let pre = match self.get(&key)? {
128 Some(v) => v.row,
129 None => return Ok(row),
130 };
131
132 let row = TableRowInterceptor::pre_update(self, &table, id, row)?;
133
134 if self.get_committed(&key)?.is_some() {
135 self.mark_preexisting(&key)?;
136 }
137 self.set(&key, row.clone())?;
138
139 TableRowInterceptor::post_update(self, &table, id, &row, &pre)?;
140
141 self.track_flow_change(build_table_update_change(&table, id, &pre, &row));
142
143 Ok(row)
144 }
145
146 fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
147 let key = RowKey::encoded(table.id, id);
148
149 let displayed = match self.get(&key)? {
150 Some(v) => v.row,
151 None => return Ok(EncodedRow(CowVec::new(vec![]))),
152 };
153 let committed = self.get_committed(&key)?.map(|v| v.row);
154
155 TableRowInterceptor::pre_delete(self, &table, id)?;
156
157 let pre_for_cdc = committed.clone().unwrap_or_else(|| displayed.clone());
158
159 if committed.is_some() {
160 self.mark_preexisting(&key)?;
161 }
162 self.unset(&key, pre_for_cdc.clone())?;
163
164 TableRowInterceptor::post_delete(self, &table, id, &pre_for_cdc)?;
165
166 self.track_flow_change(build_table_remove_change(&table, id, &pre_for_cdc));
167
168 Ok(displayed)
169 }
170}
171
172impl TableOperations for AdminTransaction {
173 fn insert_table(
174 &mut self,
175 table: &Table,
176 shape: &RowShape,
177 row: EncodedRow,
178 row_number: RowNumber,
179 ) -> Result<EncodedRow> {
180 let row = TableRowInterceptor::pre_insert(self, table, row_number, row)?;
181
182 self.set(&RowKey::encoded(table.id, row_number), row.clone())?;
183
184 TableRowInterceptor::post_insert(self, table, row_number, &row)?;
185
186 self.track_row_change(RowChange::TableInsert(TableRowInsertion {
188 table_id: table.id,
189 row_number,
190 encoded: row.clone(),
191 }));
192
193 self.track_flow_change(build_table_insert_change(table, shape, row_number, &row));
195
196 Ok(row)
197 }
198
199 fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
200 let key = RowKey::encoded(table.id, id);
201
202 let pre = match self.get(&key)? {
203 Some(v) => v.row,
204 None => return Ok(row),
205 };
206
207 let row = TableRowInterceptor::pre_update(self, &table, id, row)?;
208
209 if self.get_committed(&key)?.is_some() {
210 self.mark_preexisting(&key)?;
211 }
212 self.set(&key, row.clone())?;
213
214 TableRowInterceptor::post_update(self, &table, id, &row, &pre)?;
215
216 self.track_flow_change(build_table_update_change(&table, id, &pre, &row));
217
218 Ok(row)
219 }
220
221 fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
222 let key = RowKey::encoded(table.id, id);
223
224 let displayed = match self.get(&key)? {
225 Some(v) => v.row,
226 None => return Ok(EncodedRow(CowVec::new(vec![]))),
227 };
228 let committed = self.get_committed(&key)?.map(|v| v.row);
229
230 TableRowInterceptor::pre_delete(self, &table, id)?;
231
232 let pre_for_cdc = committed.clone().unwrap_or_else(|| displayed.clone());
233
234 if committed.is_some() {
235 self.mark_preexisting(&key)?;
236 }
237 self.unset(&key, pre_for_cdc.clone())?;
238
239 TableRowInterceptor::post_delete(self, &table, id, &pre_for_cdc)?;
240
241 self.track_flow_change(build_table_remove_change(&table, id, &pre_for_cdc));
242
243 Ok(displayed)
244 }
245}
246
247impl TableOperations for Transaction<'_> {
248 fn insert_table(
249 &mut self,
250 table: &Table,
251 shape: &RowShape,
252 row: EncodedRow,
253 row_number: RowNumber,
254 ) -> Result<EncodedRow> {
255 match self {
256 Transaction::Command(txn) => txn.insert_table(table, shape, row, row_number),
257 Transaction::Admin(txn) => txn.insert_table(table, shape, row, row_number),
258 Transaction::Test(t) => t.inner.insert_table(table, shape, row, row_number),
259 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
260 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
261 }
262 }
263
264 fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
265 match self {
266 Transaction::Command(txn) => txn.update_table(table, id, row),
267 Transaction::Admin(txn) => txn.update_table(table, id, row),
268 Transaction::Test(t) => t.inner.update_table(table, id, row),
269 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
270 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
271 }
272 }
273
274 fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
275 match self {
276 Transaction::Command(txn) => txn.remove_from_table(table, id),
277 Transaction::Admin(txn) => txn.remove_from_table(table, id),
278 Transaction::Test(t) => t.inner.remove_from_table(table, id),
279 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
280 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
281 }
282 }
283}