reifydb-catalog 0.4.13

Database catalog and metadata management for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_core::{
	common::CommitVersion,
	interface::catalog::{
		id::{NamespaceId, TableId},
		table::Table,
	},
};

use crate::materialized::{MaterializedCatalog, MultiVersionTable};

impl MaterializedCatalog {
	/// Find a table by ID at a specific version
	pub fn find_table_at(&self, table: TableId, version: CommitVersion) -> Option<Table> {
		self.tables.get(&table).and_then(|entry| {
			let multi = entry.value();
			multi.get(version)
		})
	}

	/// Find a table by name in a namespace at a specific version
	pub fn find_table_by_name_at(
		&self,
		namespace: NamespaceId,
		name: &str,
		version: CommitVersion,
	) -> Option<Table> {
		self.tables_by_name.get(&(namespace, name.to_string())).and_then(|entry| {
			let table_id = *entry.value();
			self.find_table_at(table_id, version)
		})
	}

	/// Find a table by ID (returns latest version)
	pub fn find_table(&self, table: TableId) -> Option<Table> {
		self.tables.get(&table).and_then(|entry| {
			let multi = entry.value();
			multi.get_latest()
		})
	}

	/// Find a table by name in a namespace (returns latest version)
	pub fn find_table_by_name(&self, namespace: NamespaceId, name: &str) -> Option<Table> {
		self.tables_by_name.get(&(namespace, name.to_string())).and_then(|entry| {
			let table_id = *entry.value();
			self.find_table(table_id)
		})
	}

	/// List the latest version of all tables.
	pub fn list_tables(&self) -> Vec<Table> {
		self.tables.iter().filter_map(|entry| entry.value().get_latest()).collect()
	}

	pub fn set_table(&self, id: TableId, version: CommitVersion, table: Option<Table>) {
		if let Some(entry) = self.tables.get(&id)
			&& let Some(pre) = entry.value().get_latest()
		{
			// Remove old name from index
			self.tables_by_name.remove(&(pre.namespace, pre.name.clone()));
		}

		let multi = self.tables.get_or_insert_with(id, MultiVersionTable::new);
		if let Some(new) = table {
			self.tables_by_name.insert((new.namespace, new.name.clone()), id);
			multi.value().insert(version, new);
		} else {
			multi.value().remove(version);
		}
	}
}

#[cfg(test)]
pub mod tests {
	use reifydb_core::interface::catalog::{
		column::{Column, ColumnIndex},
		id::ColumnId,
	};
	use reifydb_type::value::{constraint::TypeConstraint, r#type::Type};

	use super::*;

	fn create_test_table(id: TableId, namespace: NamespaceId, name: &str) -> Table {
		Table {
			id,
			namespace,
			name: name.to_string(),
			columns: vec![
				Column {
					id: ColumnId(1),
					name: "id".to_string(),
					constraint: TypeConstraint::unconstrained(Type::Int4),
					properties: vec![],
					index: ColumnIndex(0),
					auto_increment: true,
					dictionary_id: None,
				},
				Column {
					id: ColumnId(2),
					name: "name".to_string(),
					constraint: TypeConstraint::unconstrained(Type::Utf8),
					properties: vec![],
					index: ColumnIndex(1),
					auto_increment: false,
					dictionary_id: None,
				},
			],
			primary_key: None,
			underlying: false,
		}
	}

	#[test]
	fn test_set_and_find_table() {
		let catalog = MaterializedCatalog::new();
		let table_id = TableId(1);
		let namespace_id = NamespaceId::SYSTEM;
		let table = create_test_table(table_id, namespace_id, "test_table");

		// Set table at version 1
		catalog.set_table(table_id, CommitVersion(1), Some(table.clone()));

		// Find table at version 1
		let found = catalog.find_table_at(table_id, CommitVersion(1));
		assert_eq!(found, Some(table.clone()));

		// Find table at later version (should return same table)
		let found = catalog.find_table_at(table_id, CommitVersion(5));
		assert_eq!(found, Some(table));

		// Table shouldn't exist at version 0
		let found = catalog.find_table_at(table_id, CommitVersion(0));
		assert_eq!(found, None);
	}

	#[test]
	fn test_find_table_by_name() {
		let catalog = MaterializedCatalog::new();
		let table_id = TableId(1);
		let namespace_id = NamespaceId::SYSTEM;
		let table = create_test_table(table_id, namespace_id, "named_table");

		// Set table
		catalog.set_table(table_id, CommitVersion(1), Some(table.clone()));

		// Find by name
		let found = catalog.find_table_by_name_at(namespace_id, "named_table", CommitVersion(1));
		assert_eq!(found, Some(table));

		// Shouldn't find with wrong name
		let found = catalog.find_table_by_name_at(namespace_id, "wrong_name", CommitVersion(1));
		assert_eq!(found, None);

		// Shouldn't find in wrong namespace
		let found = catalog.find_table_by_name_at(NamespaceId::DEFAULT, "named_table", CommitVersion(1));
		assert_eq!(found, None);
	}

	#[test]
	fn test_table_rename() {
		let catalog = MaterializedCatalog::new();
		let table_id = TableId(1);
		let namespace_id = NamespaceId::SYSTEM;

		// Create and set initial table
		let table_v1 = create_test_table(table_id, namespace_id, "old_name");
		catalog.set_table(table_id, CommitVersion(1), Some(table_v1.clone()));

		// Verify initial state
		assert!(catalog.find_table_by_name_at(namespace_id, "old_name", CommitVersion(1)).is_some());
		assert!(catalog.find_table_by_name_at(namespace_id, "new_name", CommitVersion(1)).is_none());

		// Rename the table
		let mut table_v2 = table_v1.clone();
		table_v2.name = "new_name".to_string();
		catalog.set_table(table_id, CommitVersion(2), Some(table_v2.clone()));

		// Old name should be gone
		assert!(catalog.find_table_by_name_at(namespace_id, "old_name", CommitVersion(2)).is_none());

		// New name can be found
		assert_eq!(
			catalog.find_table_by_name_at(namespace_id, "new_name", CommitVersion(2)),
			Some(table_v2.clone())
		);

		// Historical query at version 1 should still show old name
		assert_eq!(catalog.find_table_at(table_id, CommitVersion(1)), Some(table_v1));

		// Current version should show new name
		assert_eq!(catalog.find_table_at(table_id, CommitVersion(2)), Some(table_v2));
	}

	#[test]
	fn test_table_move_between_namespaces() {
		let catalog = MaterializedCatalog::new();
		let table_id = TableId(1);
		let namespace1 = NamespaceId::SYSTEM;
		let namespace2 = NamespaceId::DEFAULT;

		// Create table in namespace1
		let table_v1 = create_test_table(table_id, namespace1, "movable_table");
		catalog.set_table(table_id, CommitVersion(1), Some(table_v1.clone()));

		// Verify it's in namespace1
		assert!(catalog.find_table_by_name_at(namespace1, "movable_table", CommitVersion(1)).is_some());
		assert!(catalog.find_table_by_name_at(namespace2, "movable_table", CommitVersion(1)).is_none());

		// Move to namespace2
		let mut table_v2 = table_v1.clone();
		table_v2.namespace = namespace2;
		catalog.set_table(table_id, CommitVersion(2), Some(table_v2.clone()));

		// Should no longer be in namespace1
		assert!(catalog.find_table_by_name_at(namespace1, "movable_table", CommitVersion(2)).is_none());

		// Should now be in namespace2
		assert!(catalog.find_table_by_name_at(namespace2, "movable_table", CommitVersion(2)).is_some());
	}

	#[test]
	fn test_table_deletion() {
		let catalog = MaterializedCatalog::new();
		let table_id = TableId(1);
		let namespace_id = NamespaceId::SYSTEM;

		// Create and set table
		let table = create_test_table(table_id, namespace_id, "deletable_table");
		catalog.set_table(table_id, CommitVersion(1), Some(table.clone()));

		// Verify it exists
		assert_eq!(catalog.find_table_at(table_id, CommitVersion(1)), Some(table.clone()));
		assert!(catalog.find_table_by_name_at(namespace_id, "deletable_table", CommitVersion(1)).is_some());

		// Delete the table
		catalog.set_table(table_id, CommitVersion(2), None);

		// Should not exist at version 2
		assert_eq!(catalog.find_table_at(table_id, CommitVersion(2)), None);
		assert!(catalog.find_table_by_name_at(namespace_id, "deletable_table", CommitVersion(2)).is_none());

		// Should still exist at version 1 (historical)
		assert_eq!(catalog.find_table_at(table_id, CommitVersion(1)), Some(table));
	}

	#[test]
	fn test_multiple_tables_in_namespace() {
		let catalog = MaterializedCatalog::new();
		let namespace_id = NamespaceId::SYSTEM;

		let table1 = create_test_table(TableId(1), namespace_id, "table1");
		let table2 = create_test_table(TableId(2), namespace_id, "table2");
		let table3 = create_test_table(TableId(3), namespace_id, "table3");

		// Set multiple tables
		catalog.set_table(TableId(1), CommitVersion(1), Some(table1.clone()));
		catalog.set_table(TableId(2), CommitVersion(1), Some(table2.clone()));
		catalog.set_table(TableId(3), CommitVersion(1), Some(table3.clone()));

		// All should be findable
		assert_eq!(catalog.find_table_by_name_at(namespace_id, "table1", CommitVersion(1)), Some(table1));
		assert_eq!(catalog.find_table_by_name_at(namespace_id, "table2", CommitVersion(1)), Some(table2));
		assert_eq!(catalog.find_table_by_name_at(namespace_id, "table3", CommitVersion(1)), Some(table3));
	}

	#[test]
	fn test_table_versioning() {
		let catalog = MaterializedCatalog::new();
		let table_id = TableId(1);
		let namespace_id = NamespaceId::SYSTEM;

		// Create multiple versions
		let table_v1 = create_test_table(table_id, namespace_id, "table_v1");
		let mut table_v2 = table_v1.clone();
		table_v2.name = "table_v2".to_string();
		let mut table_v3 = table_v2.clone();
		table_v3.name = "table_v3".to_string();

		// Set at different versions
		catalog.set_table(table_id, CommitVersion(10), Some(table_v1.clone()));
		catalog.set_table(table_id, CommitVersion(20), Some(table_v2.clone()));
		catalog.set_table(table_id, CommitVersion(30), Some(table_v3.clone()));

		// Query at different versions
		assert_eq!(catalog.find_table_at(table_id, CommitVersion(5)), None);
		assert_eq!(catalog.find_table_at(table_id, CommitVersion(10)), Some(table_v1.clone()));
		assert_eq!(catalog.find_table_at(table_id, CommitVersion(15)), Some(table_v1));
		assert_eq!(catalog.find_table_at(table_id, CommitVersion(20)), Some(table_v2.clone()));
		assert_eq!(catalog.find_table_at(table_id, CommitVersion(25)), Some(table_v2));
		assert_eq!(catalog.find_table_at(table_id, CommitVersion(30)), Some(table_v3.clone()));
		assert_eq!(catalog.find_table_at(table_id, CommitVersion(100)), Some(table_v3));
	}

	#[test]
	fn test_find_latest_table() {
		let catalog = MaterializedCatalog::new();
		let table_id = TableId(1);
		let namespace_id = NamespaceId::SYSTEM;

		// Empty catalog should return None
		assert_eq!(catalog.find_table(table_id), None);

		// Create multiple versions
		let table_v1 = create_test_table(table_id, namespace_id, "table_v1");
		let mut table_v2 = table_v1.clone();
		table_v2.name = "table_v2".to_string();

		catalog.set_table(table_id, CommitVersion(10), Some(table_v1));
		catalog.set_table(table_id, CommitVersion(20), Some(table_v2.clone()));

		// Should return latest (v2)
		assert_eq!(catalog.find_table(table_id), Some(table_v2));
	}

	#[test]
	fn test_find_latest_table_deleted() {
		let catalog = MaterializedCatalog::new();
		let table_id = TableId(1);
		let namespace_id = NamespaceId::SYSTEM;

		let table = create_test_table(table_id, namespace_id, "test_table");
		catalog.set_table(table_id, CommitVersion(10), Some(table));

		// Delete at latest version
		catalog.set_table(table_id, CommitVersion(20), None);

		// Should return None (deleted at latest)
		assert_eq!(catalog.find_table(table_id), None);
	}

	#[test]
	fn test_find_latest_table_by_name() {
		let catalog = MaterializedCatalog::new();
		let namespace_id = NamespaceId::SYSTEM;
		let table_id = TableId(1);

		// Empty catalog should return None
		assert_eq!(catalog.find_table_by_name(namespace_id, "test_table"), None);

		// Create table
		let table_v1 = create_test_table(table_id, namespace_id, "test_table");
		let mut table_v2 = table_v1.clone();
		table_v2.name = "renamed_table".to_string();

		catalog.set_table(table_id, CommitVersion(10), Some(table_v1));
		catalog.set_table(table_id, CommitVersion(20), Some(table_v2.clone()));

		// Old name should not be found
		assert_eq!(catalog.find_table_by_name(namespace_id, "test_table"), None);

		// New name should be found with latest version
		assert_eq!(catalog.find_table_by_name(namespace_id, "renamed_table"), Some(table_v2));
	}
}