// reifydb-catalog 0.4.13
//
// Database catalog and metadata management for ReifyDB
// Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_core::{
	interface::catalog::{
		column::ColumnIndex,
		id::{ColumnId, NamespaceId, TableId},
		property::ColumnPropertyKind,
		shape::ShapeId,
		table::Table,
	},
	key::{namespace_table::NamespaceTableKey, table::TableKey},
	retention::RetentionStrategy,
};
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use reifydb_type::{
	fragment::Fragment,
	value::{constraint::TypeConstraint, dictionary::DictionaryId},
};

use crate::{
	CatalogStore, Result,
	error::{CatalogError, CatalogObjectKind},
	store::{
		column::create::ColumnToCreate,
		retention_strategy::create::create_shape_retention_strategy,
		sequence::system::SystemSequence,
		table::shape::{table, table_namespace},
	},
};

/// Description of one column to create together with a new table.
///
/// When the table is created, every entry is converted into a `ColumnToCreate` and handed to the
/// column store; the entry's position in [`TableToCreate::columns`] becomes the column's index.
#[derive(Debug, Clone)]
pub struct TableColumnToCreate {
	/// Column name; its text is passed on as the name of the created column.
	pub name: Fragment,
	/// Fragment forwarded alongside the column definition.
	/// NOTE(review): presumably the source location used for diagnostics - confirm.
	pub fragment: Fragment,
	/// Type constraint forwarded unchanged to the created column.
	pub constraint: TypeConstraint,
	/// Column properties forwarded unchanged to the created column.
	pub properties: Vec<ColumnPropertyKind>,
	/// Auto-increment flag forwarded unchanged to the created column.
	pub auto_increment: bool,
	/// Optional dictionary id forwarded unchanged to the created column.
	pub dictionary_id: Option<DictionaryId>,
}

/// Request describing a table to be created in the catalog.
#[derive(Debug, Clone)]
pub struct TableToCreate {
	/// Table name; its text is stored in the table row and in the namespace link, and the
	/// fragment itself is attached to the "already exists" error.
	pub name: Fragment,
	/// Id of the namespace the table is created in.
	pub namespace: NamespaceId,
	/// Columns to create, in order; a column's position in this list becomes its column index.
	pub columns: Vec<TableColumnToCreate>,
	/// When `Some`, a retention strategy is registered for the new table's shape.
	pub retention_strategy: Option<RetentionStrategy>,
	/// Stored in the table row as `1` (true) or `0` (false).
	pub underlying: bool,
}

impl CatalogStore {
	pub(crate) fn create_table(txn: &mut AdminTransaction, to_create: TableToCreate) -> Result<Table> {
		let namespace_id = to_create.namespace;

		if let Some(table) = CatalogStore::find_table_by_name(
			&mut Transaction::Admin(&mut *txn),
			namespace_id,
			to_create.name.text(),
		)? {
			let namespace = CatalogStore::get_namespace(&mut Transaction::Admin(&mut *txn), namespace_id)?;
			return Err(CatalogError::AlreadyExists {
				kind: CatalogObjectKind::Table,
				namespace: namespace.name().to_string(),
				name: table.name,
				fragment: to_create.name.clone(),
			}
			.into());
		}

		let table_id = SystemSequence::next_table_id(txn)?;
		Self::store_table(txn, table_id, namespace_id, &to_create)?;
		Self::link_table_to_namespace(txn, namespace_id, table_id, to_create.name.text())?;

		if let Some(retention_strategy) = &to_create.retention_strategy {
			create_shape_retention_strategy(txn, ShapeId::Table(table_id), retention_strategy)?;
		}

		Self::insert_columns(txn, table_id, to_create)?;

		Self::get_table(&mut Transaction::Admin(&mut *txn), table_id)
	}

	fn store_table(
		txn: &mut AdminTransaction,
		table: TableId,
		namespace: NamespaceId,
		to_create: &TableToCreate,
	) -> Result<()> {
		let mut row = table::SHAPE.allocate();
		table::SHAPE.set_u64(&mut row, table::ID, table);
		table::SHAPE.set_u64(&mut row, table::NAMESPACE, namespace);
		table::SHAPE.set_utf8(&mut row, table::NAME, to_create.name.text());

		// Initialize with no primary key
		table::SHAPE.set_u64(&mut row, table::PRIMARY_KEY, 0u64);
		table::SHAPE.set_u8(
			&mut row,
			table::UNDERLYING,
			if to_create.underlying {
				1
			} else {
				0
			},
		);

		txn.set(&TableKey::encoded(table), row)?;

		Ok(())
	}

	fn link_table_to_namespace(
		txn: &mut AdminTransaction,
		namespace: NamespaceId,
		table: TableId,
		name: &str,
	) -> Result<()> {
		let mut row = table_namespace::SHAPE.allocate();
		table_namespace::SHAPE.set_u64(&mut row, table_namespace::ID, table);
		table_namespace::SHAPE.set_utf8(&mut row, table_namespace::NAME, name);
		txn.set(&NamespaceTableKey::encoded(namespace, table), row)?;
		Ok(())
	}

	fn insert_columns(txn: &mut AdminTransaction, table: TableId, to_create: TableToCreate) -> Result<()> {
		// Look up namespace name for error messages
		let namespace_name = Self::find_namespace(&mut Transaction::Admin(&mut *txn), to_create.namespace)?
			.map(|s| s.name().to_string())
			.unwrap_or_else(|| format!("namespace_{}", to_create.namespace));

		for (idx, column_to_create) in to_create.columns.into_iter().enumerate() {
			Self::create_column(
				txn,
				table,
				ColumnToCreate {
					fragment: Some(column_to_create.fragment.clone()),
					namespace_name: namespace_name.clone(),
					shape_name: to_create.name.text().to_string(),
					column: column_to_create.name.text().to_string(),
					constraint: column_to_create.constraint.clone(),
					properties: column_to_create.properties.clone(),
					index: ColumnIndex(idx as u8),
					auto_increment: column_to_create.auto_increment,
					dictionary_id: column_to_create.dictionary_id,
				},
			)?;
		}
		Ok(())
	}

	/// Create a table with a specific ID and column IDs. Used for bootstrapping system shapes.
	/// Skips duplicate check - caller must ensure uniqueness.
	pub(crate) fn create_table_with_id(
		txn: &mut AdminTransaction,
		table_id: TableId,
		to_create: TableToCreate,
		column_ids: &[ColumnId],
	) -> Result<Table> {
		assert_eq!(column_ids.len(), to_create.columns.len(), "column_ids length must match columns length");

		let namespace_id = to_create.namespace;

		Self::store_table(txn, table_id, namespace_id, &to_create)?;
		Self::link_table_to_namespace(txn, namespace_id, table_id, to_create.name.text())?;

		if let Some(retention_strategy) = &to_create.retention_strategy {
			create_shape_retention_strategy(txn, ShapeId::Table(table_id), retention_strategy)?;
		}

		Self::insert_columns_with_ids(txn, table_id, to_create, column_ids)?;

		Self::get_table(&mut Transaction::Admin(&mut *txn), table_id)
	}

	fn insert_columns_with_ids(
		txn: &mut AdminTransaction,
		table: TableId,
		to_create: TableToCreate,
		column_ids: &[ColumnId],
	) -> Result<()> {
		for (idx, (column_to_create, &col_id)) in
			to_create.columns.into_iter().zip(column_ids.iter()).enumerate()
		{
			Self::create_column_with_id(
				txn,
				col_id,
				table,
				ColumnToCreate {
					fragment: Some(column_to_create.fragment.clone()),
					namespace_name: String::new(),
					shape_name: String::new(),
					column: column_to_create.name.text().to_string(),
					constraint: column_to_create.constraint.clone(),
					properties: column_to_create.properties.clone(),
					index: ColumnIndex(idx as u8),
					auto_increment: column_to_create.auto_increment,
					dictionary_id: column_to_create.dictionary_id,
				},
			)?;
		}
		Ok(())
	}
}

#[cfg(test)]
pub mod tests {
	use reifydb_core::{
		interface::catalog::id::{NamespaceId, TableId},
		key::namespace_table::NamespaceTableKey,
	};
	use reifydb_engine::test_harness::create_test_admin_transaction;
	use reifydb_type::fragment::Fragment;

	use crate::{
		CatalogStore,
		store::table::{create::TableToCreate, shape::table_namespace},
		test_utils::ensure_test_namespace,
	};

	/// Builds a request for a table without columns, without a retention strategy and with
	/// `underlying` unset.
	fn bare_table(namespace: NamespaceId, name: &'static str) -> TableToCreate {
		TableToCreate {
			name: Fragment::internal(name),
			namespace,
			columns: Vec::new(),
			retention_strategy: None,
			underlying: false,
		}
	}

	/// Creating a table succeeds once; creating it again under the same name fails with `CA_003`.
	#[test]
	fn test_create_table() {
		let mut txn = create_test_admin_transaction();
		let namespace = ensure_test_namespace(&mut txn);
		let request = bare_table(namespace.id(), "test_table");

		// First creation should succeed
		let created = CatalogStore::create_table(&mut txn, request.clone()).unwrap();
		assert_eq!(created.id, TableId(16385));
		assert_eq!(created.namespace, NamespaceId(16385));
		assert_eq!(created.name, "test_table");

		// Second creation with the same name must be rejected
		let duplicate = CatalogStore::create_table(&mut txn, request);
		assert_eq!(duplicate.unwrap_err().diagnostic().code, "CA_003");
	}

	/// Every created table gets a link row under its namespace carrying the table id and name.
	#[test]
	fn test_table_linked_to_namespace() {
		let mut txn = create_test_admin_transaction();
		let namespace = ensure_test_namespace(&mut txn);

		for name in ["test_table", "another_table"] {
			CatalogStore::create_table(&mut txn, bare_table(namespace.id(), name)).unwrap();
		}

		let links = txn
			.range(NamespaceTableKey::full_scan(namespace.id()), 1024)
			.unwrap()
			.collect::<Result<Vec<_>, _>>()
			.unwrap();
		assert_eq!(links.len(), 2);

		// The scan is expected to yield the table created last first.
		let expected = [(16386, "another_table"), (16385, "test_table")];
		for (link, (id, name)) in links.iter().zip(expected) {
			let row = &link.row;
			assert_eq!(table_namespace::SHAPE.get_u64(row, table_namespace::ID), id);
			assert_eq!(table_namespace::SHAPE.get_utf8(row, table_namespace::NAME), name);
		}
	}
}