reifydb-engine 0.4.12

Query execution and processing engine for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_core::{
	common::CommitVersion,
	encoded::{row::EncodedRow, shape::RowShape},
	interface::{
		catalog::{shape::ShapeId, table::Table},
		change::{Change, ChangeOrigin, Diff},
	},
	key::row::RowKey,
	value::column::{Column, columns::Columns, data::ColumnData},
};
use reifydb_transaction::{
	change::{RowChange, TableRowInsertion},
	interceptor::table_row::TableRowInterceptor,
	transaction::{Transaction, admin::AdminTransaction, command::CommandTransaction},
};
use reifydb_type::{
	fragment::Fragment,
	util::cowvec::CowVec,
	value::{datetime::DateTime, row_number::RowNumber},
};

use crate::Result;

/// Materialises one encoded row as a single-row [`Columns`] batch.
///
/// A [`Column`] is produced for every field of `shape`, named after the field
/// and holding the one value that `shape` reads from `encoded` at that field's
/// index. The batch also carries `row_number` and the row's `created_at` /
/// `updated_at` timestamps, converted from the nanosecond values stored on
/// `encoded`.
fn build_encoded_columns(shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Columns {
	let fields = shape.fields();

	// Build each column and fill its single value in one pass.
	let columns: Vec<Column> = fields
		.iter()
		.enumerate()
		.map(|(index, field)| {
			let name = Fragment::internal(&field.name);
			let mut data = ColumnData::with_capacity(field.constraint.get_type(), 1);
			data.push_value(shape.get_value(encoded, index));
			Column {
				name,
				data,
			}
		})
		.collect();

	let created_at = DateTime::from_nanos(encoded.created_at_nanos());
	let updated_at = DateTime::from_nanos(encoded.updated_at_nanos());

	Columns {
		row_numbers: CowVec::new(vec![row_number]),
		created_at: CowVec::new(vec![created_at]),
		updated_at: CowVec::new(vec![updated_at]),
		columns: CowVec::new(columns),
	}
}

/// Builds the flow [`Change`] for a single row inserted into `table`.
///
/// The change holds exactly one [`Diff::Insert`] whose `post` image is
/// `encoded` decoded through `shape`. `version` is set to `CommitVersion(0)`
/// and `changed_at` to `DateTime::default()`.
// NOTE(review): the zero version / default timestamp look like placeholders
// that are filled in later (e.g. at commit) — confirm against the consumer.
fn build_table_insert_change(table: &Table, shape: &RowShape, row_number: RowNumber, encoded: &EncodedRow) -> Change {
	let origin = ChangeOrigin::Shape(ShapeId::Table(table.id));
	let post = build_encoded_columns(shape, row_number, encoded);

	Change {
		origin,
		version: CommitVersion(0),
		diffs: vec![Diff::Insert {
			post,
		}],
		changed_at: DateTime::default(),
	}
}

/// Builds the flow [`Change`] for a single row of `table` that was updated.
///
/// The row shape is derived from `table.columns`; both the `pre` and `post`
/// encoded rows are decoded with that shape and placed into one
/// [`Diff::Update`]. `version` is set to `CommitVersion(0)` and `changed_at`
/// to `DateTime::default()`.
fn build_table_update_change(table: &Table, row_number: RowNumber, pre: &EncodedRow, post: &EncodedRow) -> Change {
	let shape: RowShape = (&table.columns).into();

	let before = build_encoded_columns(&shape, row_number, pre);
	let after = build_encoded_columns(&shape, row_number, post);
	let update = Diff::Update {
		pre: before,
		post: after,
	};

	Change {
		origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
		version: CommitVersion(0),
		diffs: vec![update],
		changed_at: DateTime::default(),
	}
}

/// Builds the flow [`Change`] for a single row removed from `table`.
///
/// The row shape is derived from `table.columns`; `encoded` (the row as it
/// was before removal) is decoded with that shape into the `pre` image of one
/// [`Diff::Remove`]. `version` is set to `CommitVersion(0)` and `changed_at`
/// to `DateTime::default()`.
fn build_table_remove_change(table: &Table, row_number: RowNumber, encoded: &EncodedRow) -> Change {
	let shape: RowShape = (&table.columns).into();
	let removed = build_encoded_columns(&shape, row_number, encoded);

	Change {
		origin: ChangeOrigin::Shape(ShapeId::Table(table.id)),
		version: CommitVersion(0),
		diffs: vec![Diff::Remove {
			pre: removed,
		}],
		changed_at: DateTime::default(),
	}
}

/// Row-level write operations on tables.
///
/// Implemented in this file for [`CommandTransaction`], [`AdminTransaction`]
/// and the [`Transaction`] enum that dispatches to them.
pub(crate) trait TableOperations {
	/// Inserts `row` into `table` under `row_number`.
	///
	/// `shape` is the row layout; the implementations in this file use it to
	/// decode the stored row when recording the flow change. Those
	/// implementations return the row produced by the pre-insert interceptor,
	/// which is the row that was written.
	fn insert_table(
		&mut self,
		table: &Table,
		shape: &RowShape,
		row: EncodedRow,
		row_number: RowNumber,
	) -> Result<EncodedRow>;

	/// Replaces the row stored in `table` under `id` with `row`.
	///
	/// In the implementations in this file, when no row is stored under `id`
	/// nothing is written and the input `row` is returned unchanged; otherwise
	/// the row produced by the pre-update interceptor is written and returned.
	fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow>;

	/// Removes the row stored in `table` under `id`.
	///
	/// In the implementations in this file, returns the removed row, or an
	/// empty `EncodedRow` when no row is stored under `id`.
	fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow>;
}

/// Table row writes performed through a [`CommandTransaction`].
impl TableOperations for CommandTransaction {
	/// Writes `row` to `table` under `row_number`, running the pre/post insert
	/// interceptors around the write and recording both a row change and a
	/// flow change. Returns the row as written.
	fn insert_table(
		&mut self,
		table: &Table,
		shape: &RowShape,
		row: EncodedRow,
		row_number: RowNumber,
	) -> Result<EncodedRow> {
		// The row returned by the pre-insert interceptor is the one persisted.
		let stored = TableRowInterceptor::pre_insert(self, table, row_number, row)?;

		let key = RowKey::encoded(table.id, row_number);
		self.set(&key, stored.clone())?;

		TableRowInterceptor::post_insert(self, table, row_number, &stored)?;

		// Record the insertion for post-commit event emission.
		let insertion = TableRowInsertion {
			table_id: table.id,
			row_number,
			encoded: stored.clone(),
		};
		self.track_row_change(RowChange::TableInsert(insertion));

		// Record the flow change for transactional view pre-commit processing.
		let flow_change = build_table_insert_change(table, shape, row_number, &stored);
		self.track_flow_change(flow_change);

		Ok(stored)
	}

	/// Overwrites the row stored under `id`, running the pre/post update
	/// interceptors and recording a flow change with the old and new rows.
	///
	/// When no row is stored under `id`, nothing is written, no interceptor
	/// runs, and the input `row` is returned as-is.
	fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
		let key = RowKey::encoded(table.id, id);

		let Some(previous) = self.get(&key)?.map(|found| found.row) else {
			return Ok(row);
		};

		let updated = TableRowInterceptor::pre_update(self, &table, id, row)?;
		self.set(&key, updated.clone())?;
		TableRowInterceptor::post_update(self, &table, id, &updated, &previous)?;

		let flow_change = build_table_update_change(&table, id, &previous, &updated);
		self.track_flow_change(flow_change);

		Ok(updated)
	}

	/// Removes the row stored under `id`, running the pre/post delete
	/// interceptors and recording a flow change with the removed row.
	///
	/// When no row is stored under `id`, nothing is removed, no interceptor
	/// runs, and an empty `EncodedRow` is returned.
	fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
		let key = RowKey::encoded(table.id, id);

		let Some(removed) = self.get(&key)?.map(|found| found.row) else {
			return Ok(EncodedRow(CowVec::new(vec![])));
		};

		TableRowInterceptor::pre_delete(self, &table, id)?;
		self.unset(&key, removed.clone())?;
		TableRowInterceptor::post_delete(self, &table, id, &removed)?;

		let flow_change = build_table_remove_change(&table, id, &removed);
		self.track_flow_change(flow_change);

		Ok(removed)
	}
}

/// Table row writes performed through an [`AdminTransaction`].
impl TableOperations for AdminTransaction {
	/// Writes `row` to `table` under `row_number`, running the pre/post insert
	/// interceptors around the write and recording both a row change and a
	/// flow change. Returns the row as written.
	fn insert_table(
		&mut self,
		table: &Table,
		shape: &RowShape,
		row: EncodedRow,
		row_number: RowNumber,
	) -> Result<EncodedRow> {
		// The row returned by the pre-insert interceptor is the one persisted.
		let written = TableRowInterceptor::pre_insert(self, table, row_number, row)?;

		let row_key = RowKey::encoded(table.id, row_number);
		self.set(&row_key, written.clone())?;

		TableRowInterceptor::post_insert(self, table, row_number, &written)?;

		// Record the insertion for post-commit event emission.
		let row_change = RowChange::TableInsert(TableRowInsertion {
			table_id: table.id,
			row_number,
			encoded: written.clone(),
		});
		self.track_row_change(row_change);

		// Record the flow change for transactional view pre-commit processing.
		let change = build_table_insert_change(table, shape, row_number, &written);
		self.track_flow_change(change);

		Ok(written)
	}

	/// Overwrites the row stored under `id`, running the pre/post update
	/// interceptors and recording a flow change with the old and new rows.
	///
	/// When no row is stored under `id`, nothing is written, no interceptor
	/// runs, and the input `row` is returned as-is.
	fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
		let row_key = RowKey::encoded(table.id, id);

		let Some(old_row) = self.get(&row_key)?.map(|entry| entry.row) else {
			return Ok(row);
		};

		let new_row = TableRowInterceptor::pre_update(self, &table, id, row)?;
		self.set(&row_key, new_row.clone())?;
		TableRowInterceptor::post_update(self, &table, id, &new_row, &old_row)?;

		let change = build_table_update_change(&table, id, &old_row, &new_row);
		self.track_flow_change(change);

		Ok(new_row)
	}

	/// Removes the row stored under `id`, running the pre/post delete
	/// interceptors and recording a flow change with the removed row.
	///
	/// When no row is stored under `id`, nothing is removed, no interceptor
	/// runs, and an empty `EncodedRow` is returned.
	fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
		let row_key = RowKey::encoded(table.id, id);

		let Some(old_row) = self.get(&row_key)?.map(|entry| entry.row) else {
			return Ok(EncodedRow(CowVec::new(vec![])));
		};

		TableRowInterceptor::pre_delete(self, &table, id)?;
		self.unset(&row_key, old_row.clone())?;
		TableRowInterceptor::post_delete(self, &table, id, &old_row)?;

		let change = build_table_remove_change(&table, id, &old_row);
		self.track_flow_change(change);

		Ok(old_row)
	}
}

/// Dispatches table row writes to the concrete transaction wrapped by
/// [`Transaction`]. Only the `Command`, `Admin` and `Test` variants accept
/// writes.
impl TableOperations for Transaction<'_> {
	/// Forwards the insert to the wrapped transaction.
	///
	/// # Panics
	///
	/// Panics when called on a `Query` or `Replica` transaction.
	fn insert_table(
		&mut self,
		table: &Table,
		shape: &RowShape,
		row: EncodedRow,
		row_number: RowNumber,
	) -> Result<EncodedRow> {
		match self {
			Self::Query(_) => panic!("Write operations not supported on Query transaction"),
			Self::Replica(_) => panic!("Write operations not supported on Replica transaction"),
			Self::Command(command) => command.insert_table(table, shape, row, row_number),
			Self::Admin(admin) => admin.insert_table(table, shape, row, row_number),
			Self::Test(test) => test.inner.insert_table(table, shape, row, row_number),
		}
	}

	/// Forwards the update to the wrapped transaction.
	///
	/// # Panics
	///
	/// Panics when called on a `Query` or `Replica` transaction.
	fn update_table(&mut self, table: Table, id: RowNumber, row: EncodedRow) -> Result<EncodedRow> {
		match self {
			Self::Query(_) => panic!("Write operations not supported on Query transaction"),
			Self::Replica(_) => panic!("Write operations not supported on Replica transaction"),
			Self::Command(command) => command.update_table(table, id, row),
			Self::Admin(admin) => admin.update_table(table, id, row),
			Self::Test(test) => test.inner.update_table(table, id, row),
		}
	}

	/// Forwards the removal to the wrapped transaction.
	///
	/// # Panics
	///
	/// Panics when called on a `Query` or `Replica` transaction.
	fn remove_from_table(&mut self, table: Table, id: RowNumber) -> Result<EncodedRow> {
		match self {
			Self::Query(_) => panic!("Write operations not supported on Query transaction"),
			Self::Replica(_) => panic!("Write operations not supported on Replica transaction"),
			Self::Command(command) => command.remove_from_table(table, id),
			Self::Admin(admin) => admin.remove_from_table(table, id),
			Self::Test(test) => test.inner.remove_from_table(table, id),
		}
	}
}