1use reifydb_core::{
5 common::CommitVersion,
6 encoded::{row::EncodedRow, shape::RowShape},
7 interface::{
8 catalog::{shape::ShapeId, table::Table},
9 change::{Change, ChangeOrigin, Diff},
10 },
11 key::row::RowKey,
12 value::column::{Column, columns::Columns, data::ColumnData},
13};
14use reifydb_transaction::{
15 change::{RowChange, TableRowInsertion},
16 interceptor::table_row::TableRowInterceptor,
17 transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction},
18};
19use reifydb_type::{
20 fragment::Fragment,
21 util::cowvec::CowVec,
22 value::{datetime::DateTime, row_number::RowNumber},
23};
24
25use crate::Result;
26
27fn build_encoded_columns(shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Columns {
28 let fields = shape.fields();
29
30 let mut columns_vec: Vec<Column> = Vec::with_capacity(fields.len());
31 for field in fields.iter() {
32 columns_vec.push(Column {
33 name: Fragment::internal(&field.name),
34 data: ColumnData::with_capacity(field.constraint.get_type(), 1),
35 });
36 }
37
38 for (i, _) in fields.iter().enumerate() {
39 columns_vec[i].data.push_value(shape.get_value(encoded, i));
40 }
41
42 Columns {
43 row_numbers: CowVec::new(vec![row_number]),
44 created_at: CowVec::new(vec![DateTime::from_nanos(encoded.created_at_nanos())]),
45 updated_at: CowVec::new(vec![DateTime::from_nanos(encoded.updated_at_nanos())]),
46 columns: CowVec::new(columns_vec),
47 }
48}
49
50fn build_table_insert_change(table: &Table, shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Change {
51 Change {
52 origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
53 version: CommitVersion(0),
54 diffs: vec![Diff::Insert {
55 post: build_encoded_columns(shape, row_number, encoded),
56 }],
57 changed_at: DateTime::default(),
58 }
59}
60
61fn build_table_update_change(table: &Table, row_number: RowNumber, pre: &EncodedRow, post: &EncodedRow) -> Change {
62 let shape: RowShape = (&table.columns).into();
63 Change {
64 origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
65 version: CommitVersion(0),
66 diffs: vec![Diff::Update {
67 pre: build_encoded_columns(&shape, row_number, pre),
68 post: build_encoded_columns(&shape, row_number, post),
69 }],
70 changed_at: DateTime::default(),
71 }
72}
73
74fn build_table_remove_change(table: &Table, row_number: RowNumber, encoded: &EncodedRow) -> Change {
75 let shape: RowShape = (&table.columns).into();
76 Change {
77 origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
78 version: CommitVersion(0),
79 diffs: vec![Diff::Remove {
80 pre: build_encoded_columns(&shape, row_number, encoded),
81 }],
82 changed_at: DateTime::default(),
83 }
84}
85
86pub(crate) trait TableOperations {
87 fn insert_table(
88 &mut self,
89 table: &Table,
90 shape: &RowShape,
91 row: EncodedRow,
92 row_number: RowNumber,
93 ) -> Result<EncodedRow>;
94
95 fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow>;
96
97 fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow>;
98}
99
100impl TableOperations for CommandTransaction {
101 fn insert_table(
102 &mut self,
103 table: &Table,
104 shape: &RowShape,
105 row: EncodedRow,
106 row_number: RowNumber,
107 ) -> Result<EncodedRow> {
108 let row = TableRowInterceptor::pre_insert(self, table, row_number, row)?;
109
110 self.set(&RowKey::encoded(table.id, row_number), row.clone())?;
111
112 TableRowInterceptor::post_insert(self, table, row_number, &row)?;
113
114 self.track_row_change(RowChange::TableInsert(TableRowInsertion {
116 table_id: table.id,
117 row_number,
118 encoded: row.clone(),
119 }));
120
121 self.track_flow_change(build_table_insert_change(table, shape, row_number, &row));
123
124 Ok(row)
125 }
126
127 fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
128 let key = RowKey::encoded(table.id, id);
129
130 let pre = match self.get(&key)? {
131 Some(v) => v.row,
132 None => return Ok(row),
133 };
134
135 let row = TableRowInterceptor::pre_update(self, &table, id, row)?;
136
137 self.set(&key, row.clone())?;
138
139 TableRowInterceptor::post_update(self, &table, id, &row, &pre)?;
140
141 self.track_flow_change(build_table_update_change(&table, id, &pre, &row));
142
143 Ok(row)
144 }
145
146 fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
147 let key = RowKey::encoded(table.id, id);
148
149 let deleted_values = match self.get(&key)? {
150 Some(v) => v.row,
151 None => return Ok(EncodedRow(CowVec::new(vec![]))),
152 };
153
154 TableRowInterceptor::pre_delete(self, &table, id)?;
155
156 self.unset(&key, deleted_values.clone())?;
157
158 TableRowInterceptor::post_delete(self, &table, id, &deleted_values)?;
159
160 self.track_flow_change(build_table_remove_change(&table, id, &deleted_values));
161
162 Ok(deleted_values)
163 }
164}
165
166impl TableOperations for AdminTransaction {
167 fn insert_table(
168 &mut self,
169 table: &Table,
170 shape: &RowShape,
171 row: EncodedRow,
172 row_number: RowNumber,
173 ) -> Result<EncodedRow> {
174 let row = TableRowInterceptor::pre_insert(self, table, row_number, row)?;
175
176 self.set(&RowKey::encoded(table.id, row_number), row.clone())?;
177
178 TableRowInterceptor::post_insert(self, table, row_number, &row)?;
179
180 self.track_row_change(RowChange::TableInsert(TableRowInsertion {
182 table_id: table.id,
183 row_number,
184 encoded: row.clone(),
185 }));
186
187 self.track_flow_change(build_table_insert_change(table, shape, row_number, &row));
189
190 Ok(row)
191 }
192
193 fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
194 let key = RowKey::encoded(table.id, id);
195
196 let pre = match self.get(&key)? {
197 Some(v) => v.row,
198 None => return Ok(row),
199 };
200
201 let row = TableRowInterceptor::pre_update(self, &table, id, row)?;
202
203 self.set(&key, row.clone())?;
204
205 TableRowInterceptor::post_update(self, &table, id, &row, &pre)?;
206
207 self.track_flow_change(build_table_update_change(&table, id, &pre, &row));
208
209 Ok(row)
210 }
211
212 fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
213 let key = RowKey::encoded(table.id, id);
214
215 let deleted_values = match self.get(&key)? {
216 Some(v) => v.row,
217 None => return Ok(EncodedRow(CowVec::new(vec![]))),
218 };
219
220 TableRowInterceptor::pre_delete(self, &table, id)?;
221
222 self.unset(&key, deleted_values.clone())?;
223
224 TableRowInterceptor::post_delete(self, &table, id, &deleted_values)?;
225
226 self.track_flow_change(build_table_remove_change(&table, id, &deleted_values));
227
228 Ok(deleted_values)
229 }
230}
231
232impl TableOperations for Transaction<'_> {
233 fn insert_table(
234 &mut self,
235 table: &Table,
236 shape: &RowShape,
237 row: EncodedRow,
238 row_number: RowNumber,
239 ) -> Result<EncodedRow> {
240 match self {
241 Transaction::Command(txn) => txn.insert_table(table, shape, row, row_number),
242 Transaction::Admin(txn) => txn.insert_table(table, shape, row, row_number),
243 Transaction::Test(t) => t.inner.insert_table(table, shape, row, row_number),
244 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
245 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
246 }
247 }
248
249 fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
250 match self {
251 Transaction::Command(txn) => txn.update_table(table, id, row),
252 Transaction::Admin(txn) => txn.update_table(table, id, row),
253 Transaction::Test(t) => t.inner.update_table(table, id, row),
254 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
255 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
256 }
257 }
258
259 fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
260 match self {
261 Transaction::Command(txn) => txn.remove_from_table(table, id),
262 Transaction::Admin(txn) => txn.remove_from_table(table, id),
263 Transaction::Test(t) => t.inner.remove_from_table(table, id),
264 Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
265 Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
266 }
267 }
268}