Skip to main content

reifydb_engine/transaction/operation/
table.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::CommitVersion,
6	encoded::{row::EncodedRow, shape::RowShape},
7	interface::{
8		catalog::{shape::ShapeId, table::Table},
9		change::{Change, ChangeOrigin, Diff},
10	},
11	key::row::RowKey,
12	value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
13};
14use reifydb_transaction::{
15	change::{RowChange, TableRowInsertion},
16	interceptor::table_row::TableRowInterceptor,
17	transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction},
18};
19use reifydb_type::{
20	fragment::Fragment,
21	util::cowvec::CowVec,
22	value::{datetime::DateTime, row_number::RowNumber},
23};
24use smallvec::smallvec;
25
26use crate::Result;
27
28fn build_encoded_columns(shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Columns {
29	let fields = shape.fields();
30
31	let mut columns_vec: Vec<ColumnWithName> = Vec::with_capacity(fields.len());
32	for field in fields.iter() {
33		columns_vec.push(ColumnWithName {
34			name: Fragment::internal(&field.name),
35			data: ColumnBuffer::with_capacity(field.constraint.get_type(), 1),
36		});
37	}
38
39	for (i, _) in fields.iter().enumerate() {
40		columns_vec[i].data.push_value(shape.get_value(encoded, i));
41	}
42
43	Columns::with_system_columns(
44		columns_vec,
45		vec![row_number],
46		vec![DateTime::from_nanos(encoded.created_at_nanos())],
47		vec![DateTime::from_nanos(encoded.updated_at_nanos())],
48	)
49}
50
51fn build_table_insert_change(table: &Table, shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Change {
52	Change {
53		origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
54		version: CommitVersion(0),
55		diffs: smallvec![Diff::insert(build_encoded_columns(shape, row_number, encoded))],
56		changed_at: DateTime::default(),
57	}
58}
59
60fn build_table_update_change(table: &Table, row_number: RowNumber, pre: &EncodedRow, post: &EncodedRow) -> Change {
61	let shape: RowShape = (&table.columns).into();
62	Change {
63		origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
64		version: CommitVersion(0),
65		diffs: smallvec![Diff::update(
66			build_encoded_columns(&shape, row_number, pre),
67			build_encoded_columns(&shape, row_number, post),
68		)],
69		changed_at: DateTime::default(),
70	}
71}
72
73fn build_table_remove_change(table: &Table, row_number: RowNumber, encoded: &EncodedRow) -> Change {
74	let shape: RowShape = (&table.columns).into();
75	Change {
76		origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
77		version: CommitVersion(0),
78		diffs: smallvec![Diff::remove(build_encoded_columns(&shape, row_number, encoded))],
79		changed_at: DateTime::default(),
80	}
81}
82
83pub(crate) trait TableOperations {
84	fn insert_table(
85		&mut self,
86		table: &Table,
87		shape: &RowShape,
88		row: EncodedRow,
89		row_number: RowNumber,
90	) -> Result<EncodedRow>;
91
92	fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow>;
93
94	fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow>;
95}
96
97impl TableOperations for CommandTransaction {
98	fn insert_table(
99		&mut self,
100		table: &Table,
101		shape: &RowShape,
102		row: EncodedRow,
103		row_number: RowNumber,
104	) -> Result<EncodedRow> {
105		let row = TableRowInterceptor::pre_insert(self, table, row_number, row)?;
106
107		self.set(&RowKey::encoded(table.id, row_number), row.clone())?;
108
109		TableRowInterceptor::post_insert(self, table, row_number, &row)?;
110
111		// Track insertion for post-commit event emission
112		self.track_row_change(RowChange::TableInsert(TableRowInsertion {
113			table_id: table.id,
114			row_number,
115			encoded: row.clone(),
116		}));
117
118		// Track flow change for transactional view pre-commit processing
119		self.track_flow_change(build_table_insert_change(table, shape, row_number, &row));
120
121		Ok(row)
122	}
123
124	fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
125		let key = RowKey::encoded(table.id, id);
126
127		let pre = match self.get(&key)? {
128			Some(v) => v.row,
129			None => return Ok(row),
130		};
131
132		let row = TableRowInterceptor::pre_update(self, &table, id, row)?;
133
134		if self.get_committed(&key)?.is_some() {
135			self.mark_preexisting(&key)?;
136		}
137		self.set(&key, row.clone())?;
138
139		TableRowInterceptor::post_update(self, &table, id, &row, &pre)?;
140
141		self.track_flow_change(build_table_update_change(&table, id, &pre, &row));
142
143		Ok(row)
144	}
145
146	fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
147		let key = RowKey::encoded(table.id, id);
148
149		let displayed = match self.get(&key)? {
150			Some(v) => v.row,
151			None => return Ok(EncodedRow(CowVec::new(vec![]))),
152		};
153		let committed = self.get_committed(&key)?.map(|v| v.row);
154
155		TableRowInterceptor::pre_delete(self, &table, id)?;
156
157		let pre_for_cdc = committed.clone().unwrap_or_else(|| displayed.clone());
158
159		if committed.is_some() {
160			self.mark_preexisting(&key)?;
161		}
162		self.unset(&key, pre_for_cdc.clone())?;
163
164		TableRowInterceptor::post_delete(self, &table, id, &pre_for_cdc)?;
165
166		self.track_flow_change(build_table_remove_change(&table, id, &pre_for_cdc));
167
168		Ok(displayed)
169	}
170}
171
172impl TableOperations for AdminTransaction {
173	fn insert_table(
174		&mut self,
175		table: &Table,
176		shape: &RowShape,
177		row: EncodedRow,
178		row_number: RowNumber,
179	) -> Result<EncodedRow> {
180		let row = TableRowInterceptor::pre_insert(self, table, row_number, row)?;
181
182		self.set(&RowKey::encoded(table.id, row_number), row.clone())?;
183
184		TableRowInterceptor::post_insert(self, table, row_number, &row)?;
185
186		// Track insertion for post-commit event emission
187		self.track_row_change(RowChange::TableInsert(TableRowInsertion {
188			table_id: table.id,
189			row_number,
190			encoded: row.clone(),
191		}));
192
193		// Track flow change for transactional view pre-commit processing
194		self.track_flow_change(build_table_insert_change(table, shape, row_number, &row));
195
196		Ok(row)
197	}
198
199	fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
200		let key = RowKey::encoded(table.id, id);
201
202		let pre = match self.get(&key)? {
203			Some(v) => v.row,
204			None => return Ok(row),
205		};
206
207		let row = TableRowInterceptor::pre_update(self, &table, id, row)?;
208
209		if self.get_committed(&key)?.is_some() {
210			self.mark_preexisting(&key)?;
211		}
212		self.set(&key, row.clone())?;
213
214		TableRowInterceptor::post_update(self, &table, id, &row, &pre)?;
215
216		self.track_flow_change(build_table_update_change(&table, id, &pre, &row));
217
218		Ok(row)
219	}
220
221	fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
222		let key = RowKey::encoded(table.id, id);
223
224		let displayed = match self.get(&key)? {
225			Some(v) => v.row,
226			None => return Ok(EncodedRow(CowVec::new(vec![]))),
227		};
228		let committed = self.get_committed(&key)?.map(|v| v.row);
229
230		TableRowInterceptor::pre_delete(self, &table, id)?;
231
232		let pre_for_cdc = committed.clone().unwrap_or_else(|| displayed.clone());
233
234		if committed.is_some() {
235			self.mark_preexisting(&key)?;
236		}
237		self.unset(&key, pre_for_cdc.clone())?;
238
239		TableRowInterceptor::post_delete(self, &table, id, &pre_for_cdc)?;
240
241		self.track_flow_change(build_table_remove_change(&table, id, &pre_for_cdc));
242
243		Ok(displayed)
244	}
245}
246
247impl TableOperations for Transaction<'_> {
248	fn insert_table(
249		&mut self,
250		table: &Table,
251		shape: &RowShape,
252		row: EncodedRow,
253		row_number: RowNumber,
254	) -> Result<EncodedRow> {
255		match self {
256			Transaction::Command(txn) => txn.insert_table(table, shape, row, row_number),
257			Transaction::Admin(txn) => txn.insert_table(table, shape, row, row_number),
258			Transaction::Test(t) => t.inner.insert_table(table, shape, row, row_number),
259			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
260			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
261		}
262	}
263
264	fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
265		match self {
266			Transaction::Command(txn) => txn.update_table(table, id, row),
267			Transaction::Admin(txn) => txn.update_table(table, id, row),
268			Transaction::Test(t) => t.inner.update_table(table, id, row),
269			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
270			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
271		}
272	}
273
274	fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
275		match self {
276			Transaction::Command(txn) => txn.remove_from_table(table, id),
277			Transaction::Admin(txn) => txn.remove_from_table(table, id),
278			Transaction::Test(t) => t.inner.remove_from_table(table, id),
279			Transaction::Query(_) => panic!("Write operations not supported on Query transaction"),
280			Transaction::Replica(_) => panic!("Write operations not supported on Replica transaction"),
281		}
282	}
283}