Skip to main content

reifydb_engine/transaction/operation/
table.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::CommitVersion,
6	encoded::{row::EncodedRow, shape::RowShape},
7	interface::{
8		catalog::{shape::ShapeId, table::Table},
9		change::{Change, ChangeOrigin, Diff},
10	},
11	key::row::RowKey,
12	value::column::{Column, columns::Columns, data::ColumnData},
13};
14use reifydb_transaction::{
15	change::{RowChange, TableRowInsertion},
16	interceptor::table_row::TableRowInterceptor,
17	transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction},
18};
19use reifydb_type::{
20	fragment::Fragment,
21	util::cowvec::CowVec,
22	value::{datetime::DateTime, row_number::RowNumber},
23};
24
25use crate::Result;
26
27fn build_encoded_columns(shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Columns {
28	let fields = shape.fields();
29
30	let mut columns_vec: Vec<Column> = Vec::with_capacity(fields.len());
31	for field in fields.iter() {
32		columns_vec.push(Column {
33			name: Fragment::internal(&field.name),
34			data: ColumnData::with_capacity(field.constraint.get_type(), 1),
35		});
36	}
37
38	for (i, _) in fields.iter().enumerate() {
39		columns_vec[i].data.push_value(shape.get_value(encoded, i));
40	}
41
42	Columns {
43		row_numbers: CowVec::new(vec![row_number]),
44		created_at: CowVec::new(vec![DateTime::from_nanos(encoded.created_at_nanos())]),
45		updated_at: CowVec::new(vec![DateTime::from_nanos(encoded.updated_at_nanos())]),
46		columns: CowVec::new(columns_vec),
47	}
48}
49
50fn build_table_insert_change(table: &Table, shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Change {
51	Change {
52		origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
53		version: CommitVersion(0),
54		diffs: vec![Diff::Insert {
55			post: build_encoded_columns(shape, row_number, encoded),
56		}],
57		changed_at: DateTime::default(),
58	}
59}
60
61fn build_table_update_change(table: &Table, row_number: RowNumber, pre: &EncodedRow, post: &EncodedRow) -> Change {
62	let shape: RowShape = (&table.columns).into();
63	Change {
64		origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
65		version: CommitVersion(0),
66		diffs: vec![Diff::Update {
67			pre: build_encoded_columns(&shape, row_number, pre),
68			post: build_encoded_columns(&shape, row_number, post),
69		}],
70		changed_at: DateTime::default(),
71	}
72}
73
74fn build_table_remove_change(table: &Table, row_number: RowNumber, encoded: &EncodedRow) -> Change {
75	let shape: RowShape = (&table.columns).into();
76	Change {
77		origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
78		version: CommitVersion(0),
79		diffs: vec![Diff::Remove {
80			pre: build_encoded_columns(&shape, row_number, encoded),
81		}],
82		changed_at: DateTime::default(),
83	}
84}
85
86pub(crate) trait TableOperations {
87	fn insert_table(
88		&mut self,
89		table: &Table,
90		shape: &RowShape,
91		row: EncodedRow,
92		row_number: RowNumber,
93	) -> Result<EncodedRow>;
94
95	fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow>;
96
97	fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow>;
98}
99
100impl TableOperations for CommandTransaction {
101	fn insert_table(
102		&mut self,
103		table: &Table,
104		shape: &RowShape,
105		row: EncodedRow,
106		row_number: RowNumber,
107	) -> Result<EncodedRow> {
108		let row = TableRowInterceptor::pre_insert(self, table, row_number, row)?;
109
110		self.set(&RowKey::encoded(table.id, row_number), row.clone())?;
111
112		TableRowInterceptor::post_insert(self, table, row_number, &row)?;
113
114		// Track insertion for post-commit event emission
115		self.track_row_change(RowChange::TableInsert(TableRowInsertion {
116			table_id: table.id,
117			row_number,
118			encoded: row.clone(),
119		}));
120
121		// Track flow change for transactional view pre-commit processing
122		self.track_flow_change(build_table_insert_change(table, shape, row_number, &row));
123
124		Ok(row)
125	}
126
127	fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
128		let key = RowKey::encoded(table.id, id);
129
130		let pre = match self.get(&key)? {
131			Some(v) => v.row,
132			None => return Ok(row),
133		};
134
135		let row = TableRowInterceptor::pre_update(self, &table, id, row)?;
136
137		self.set(&key, row.clone())?;
138
139		TableRowInterceptor::post_update(self, &table, id, &row, &pre)?;
140
141		self.track_flow_change(build_table_update_change(&table, id, &pre, &row));
142
143		Ok(row)
144	}
145
146	fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
147		let key = RowKey::encoded(table.id, id);
148
149		let deleted_values = match self.get(&key)? {
150			Some(v) => v.row,
151			None => return Ok(EncodedRow(CowVec::new(vec![]))),
152		};
153
154		TableRowInterceptor::pre_delete(self, &table, id)?;
155
156		self.unset(&key, deleted_values.clone())?;
157
158		TableRowInterceptor::post_delete(self, &table, id, &deleted_values)?;
159
160		self.track_flow_change(build_table_remove_change(&table, id, &deleted_values));
161
162		Ok(deleted_values)
163	}
164}
165
166impl TableOperations for AdminTransaction {
167	fn insert_table(
168		&mut self,
169		table: &Table,
170		shape: &RowShape,
171		row: EncodedRow,
172		row_number: RowNumber,
173	) -> Result<EncodedRow> {
174		let row = TableRowInterceptor::pre_insert(self, table, row_number, row)?;
175
176		self.set(&RowKey::encoded(table.id, row_number), row.clone())?;
177
178		TableRowInterceptor::post_insert(self, table, row_number, &row)?;
179
180		// Track insertion for post-commit event emission
181		self.track_row_change(RowChange::TableInsert(TableRowInsertion {
182			table_id: table.id,
183			row_number,
184			encoded: row.clone(),
185		}));
186
187		// Track flow change for transactional view pre-commit processing
188		self.track_flow_change(build_table_insert_change(table, shape, row_number, &row));
189
190		Ok(row)
191	}
192
193	fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
194		let key = RowKey::encoded(table.id, id);
195
196		let pre = match self.get(&key)? {
197			Some(v) => v.row,
198			None => return Ok(row),
199		};
200
201		let row = TableRowInterceptor::pre_update(self, &table, id, row)?;
202
203		self.set(&key, row.clone())?;
204
205		TableRowInterceptor::post_update(self, &table, id, &row, &pre)?;
206
207		self.track_flow_change(build_table_update_change(&table, id, &pre, &row));
208
209		Ok(row)
210	}
211
212	fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
213		let key = RowKey::encoded(table.id, id);
214
215		let deleted_values = match self.get(&key)? {
216			Some(v) => v.row,
217			None => return Ok(EncodedRow(CowVec::new(vec![]))),
218		};
219
220		TableRowInterceptor::pre_delete(self, &table, id)?;
221
222		self.unset(&key, deleted_values.clone())?;
223
224		TableRowInterceptor::post_delete(self, &table, id, &deleted_values)?;
225
226		self.track_flow_change(build_table_remove_change(&table, id, &deleted_values));
227
228		Ok(deleted_values)
229	}
230}
231
232impl TableOperations for Transaction<'_> {
233	fn insert_table(
234		&mut self,
235		table: &Table,
236		shape: &RowShape,
237		row: EncodedRow,
238		row_number: RowNumber,
239	) -> Result<EncodedRow> {
240		match self {
241			Transaction::Command(txn) => txn.insert_table(table, shape, row, row_number),
242			Transaction::Admin(txn) => txn.insert_table(table, shape, row, row_number),
243			Transaction::Test(t) => t.inner.insert_table(table, shape, row, row_number),
244			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
245			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
246		}
247	}
248
249	fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
250		match self {
251			Transaction::Command(txn) => txn.update_table(table, id, row),
252			Transaction::Admin(txn) => txn.update_table(table, id, row),
253			Transaction::Test(t) => t.inner.update_table(table, id, row),
254			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
255			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
256		}
257	}
258
259	fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
260		match self {
261			Transaction::Command(txn) => txn.remove_from_table(table, id),
262			Transaction::Admin(txn) => txn.remove_from_table(table, id),
263			Transaction::Test(t) => t.inner.remove_from_table(table, id),
264			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
265			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
266		}
267	}
268}