reifydb-catalog 0.4.13

Database catalog and metadata management for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_core::{
	interface::catalog::{
		column::ColumnIndex,
		id::{NamespaceId, RingBufferId, SeriesId, TableId, ViewId},
		series::SeriesKey,
		view::{
			View, ViewKind,
			ViewKind::{Deferred, Transactional},
			ViewStorageKind,
		},
	},
	key::{namespace_view::NamespaceViewKey, view::ViewKey},
};
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use reifydb_type::{
	fragment::Fragment,
	value::{constraint::TypeConstraint, sumtype::SumTypeId},
};

use crate::{
	CatalogStore, Result,
	error::{CatalogError, CatalogObjectKind},
	store::{
		column::create::ColumnToCreate,
		sequence::system::SystemSequence,
		view::shape::{view, view_namespace},
	},
};

/// Description of one column to attach to a view that is being created.
///
/// Each entry is converted into a `ColumnToCreate` by
/// `CatalogStore::insert_columns_for_view`.
#[derive(Debug, Clone)]
pub struct ViewColumnToCreate {
	/// Column name; its text becomes the name of the created column.
	pub name: Fragment,
	/// Fragment forwarded as the created column's `fragment`.
	/// NOTE(review): presumably the source location used for diagnostics - confirm.
	pub fragment: Fragment,
	/// Type constraint forwarded to the created column.
	pub constraint: TypeConstraint,
}

/// Storage configuration requested for a new view.
///
/// `CatalogStore::store_view` persists the chosen variant as the view row's
/// storage kind together with the variant's fields; row fields that do not
/// belong to the chosen variant are written as zero / empty string.
#[derive(Debug, Clone)]
pub enum ViewStorageConfig {
	/// View stored as a table.
	Table {
		/// Id written to the row's underlying-shape field.
		underlying: TableId,
	},
	/// View stored as a ring buffer.
	RingBuffer {
		/// Id written to the row's underlying-shape field.
		underlying: RingBufferId,
		/// Value written to the row's capacity field.
		/// NOTE(review): presumably the maximum number of retained rows - confirm.
		capacity: u64,
		/// Persisted as a `1`/`0` flag in the row.
		propagate_evictions: bool,
	},
	/// View stored as a series.
	Series {
		/// Id written to the row's underlying-shape field.
		underlying: SeriesId,
		/// Series key; its column name, kind (`0` = `DateTime`, `1` = `Integer`)
		/// and, for `DateTime`, its precision are persisted.
		key: SeriesKey,
		/// Optional sum type id; persisted as `0` when `None`.
		tag: Option<SumTypeId>,
	},
}

impl Default for ViewStorageConfig {
	fn default() -> Self {
		ViewStorageConfig::Table {
			underlying: TableId(0),
		}
	}
}

/// Everything `CatalogStore` needs to create a view.
#[derive(Debug, Clone)]
pub struct ViewToCreate {
	/// View name; its text is used for the duplicate-name lookup and is stored
	/// in both the view row and the namespace link row.
	pub name: Fragment,
	/// Id of the namespace the view is created in.
	pub namespace: NamespaceId,
	/// Columns of the view; a column's position in this list becomes its column index.
	pub columns: Vec<ViewColumnToCreate>,
	/// Storage configuration persisted with the view row.
	pub storage: ViewStorageConfig,
}

impl CatalogStore {
	pub(crate) fn create_deferred_view(txn: &mut AdminTransaction, to_create: ViewToCreate) -> Result<View> {
		Self::create_view(txn, to_create, Deferred)
	}

	pub(crate) fn create_transactional_view(txn: &mut AdminTransaction, to_create: ViewToCreate) -> Result<View> {
		Self::create_view(txn, to_create, Transactional)
	}

	/// Creates a view of the given `kind` in `to_create.namespace` and returns the stored [`View`].
	///
	/// Steps, in order: resolve the namespace, reject a duplicate name, allocate a view id,
	/// write the view row and its namespace link, create the columns, then read the view back.
	///
	/// # Errors
	///
	/// - Propagates the error of `get_namespace` when the namespace cannot be resolved.
	/// - Returns [`CatalogError::AlreadyExists`] when the namespace already holds a view with
	///   the same name.
	/// - Propagates any error from id allocation, the row writes, column creation or the final
	///   read-back.
	fn create_view(txn: &mut AdminTransaction, to_create: ViewToCreate, kind: ViewKind) -> Result<View> {
		let namespace_id = to_create.namespace;

		// Resolve the namespace before anything is written. The same lookup is performed later
		// while inserting columns, so a missing namespace would fail there anyway - but only
		// after a view id had been allocated and the view row and namespace link had already
		// been written into the transaction.
		let namespace = CatalogStore::get_namespace(&mut Transaction::Admin(&mut *txn), namespace_id)?;

		if let Some(view) = CatalogStore::find_view_by_name(
			&mut Transaction::Admin(&mut *txn),
			namespace_id,
			to_create.name.text(),
		)? {
			return Err(CatalogError::AlreadyExists {
				kind: CatalogObjectKind::View,
				namespace: namespace.name().to_string(),
				name: view.name().to_string(),
				fragment: to_create.name.clone(),
			}
			.into());
		}

		let view_id = SystemSequence::next_view_id(txn)?;
		Self::store_view(txn, view_id, namespace_id, &to_create, kind)?;
		Self::link_view_to_namespace(txn, namespace_id, view_id, to_create.name.text())?;

		Self::insert_columns_for_view(txn, view_id, to_create)?;

		Self::get_view(&mut Transaction::Admin(&mut *txn), view_id)
	}

	/// Encodes the catalog row for `view` and writes it under `ViewKey::encoded(view)`.
	///
	/// Every field of the row is populated: fields that do not apply to the chosen storage
	/// configuration are written as zero / empty string.
	fn store_view(
		txn: &mut AdminTransaction,
		view: ViewId,
		namespace: NamespaceId,
		to_create: &ViewToCreate,
		kind: ViewKind,
	) -> Result<()> {
		// Persisted discriminant of the view kind.
		let kind_tag: u8 = match kind {
			Deferred => 0,
			Transactional => 1,
		};

		let mut row = view::SHAPE.allocate();
		view::SHAPE.set_u64(&mut row, view::ID, view);
		view::SHAPE.set_u64(&mut row, view::NAMESPACE, namespace);
		view::SHAPE.set_utf8(&mut row, view::NAME, to_create.name.text());
		view::SHAPE.set_u8(&mut row, view::KIND, kind_tag);
		view::SHAPE.set_u64(&mut row, view::PRIMARY_KEY, 0u64);

		// Storage kind and underlying id are written per variant; the arm yields the
		// capacity and eviction flag, which are zero for everything but ring buffers.
		let (capacity, evictions_flag): (u64, u8) = match &to_create.storage {
			ViewStorageConfig::Table {
				underlying,
			} => {
				view::SHAPE.set_u8(&mut row, view::STORAGE_KIND, ViewStorageKind::Table as u8);
				view::SHAPE.set_u64(&mut row, view::UNDERLYING_SHAPE_ID, *underlying);
				(0, 0)
			}
			ViewStorageConfig::RingBuffer {
				underlying,
				capacity,
				propagate_evictions,
			} => {
				view::SHAPE.set_u8(&mut row, view::STORAGE_KIND, ViewStorageKind::RingBuffer as u8);
				view::SHAPE.set_u64(&mut row, view::UNDERLYING_SHAPE_ID, *underlying);
				(*capacity, u8::from(*propagate_evictions))
			}
			ViewStorageConfig::Series {
				underlying,
				..
			} => {
				view::SHAPE.set_u8(&mut row, view::STORAGE_KIND, ViewStorageKind::Series as u8);
				view::SHAPE.set_u64(&mut row, view::UNDERLYING_SHAPE_ID, *underlying);
				(0, 0)
			}
		};
		view::SHAPE.set_u64(&mut row, view::CAPACITY, capacity);
		view::SHAPE.set_u8(&mut row, view::PROPAGATE_EVICTIONS, evictions_flag);

		// Series key fields: only series storage carries real values here.
		match &to_create.storage {
			ViewStorageConfig::Series {
				key,
				tag,
				..
			} => {
				// Key kind is 0 for date-time keys and 1 for integer keys; precision is
				// only recorded for date-time keys.
				let (key_kind_tag, precision_tag): (u8, u8) = match key {
					SeriesKey::DateTime {
						precision,
						..
					} => (0, *precision as u8),
					SeriesKey::Integer {
						..
					} => (1, 0),
				};
				view::SHAPE.set_utf8(&mut row, view::KEY_COLUMN, key.column());
				view::SHAPE.set_u8(&mut row, view::KEY_KIND, key_kind_tag);
				view::SHAPE.set_u8(&mut row, view::PRECISION, precision_tag);
				// A missing tag is stored as id 0.
				view::SHAPE.set_u64(&mut row, view::TAG_ID, tag.map_or(0, |t| t.0));
			}
			ViewStorageConfig::Table {
				..
			}
			| ViewStorageConfig::RingBuffer {
				..
			} => {
				view::SHAPE.set_utf8(&mut row, view::KEY_COLUMN, "");
				view::SHAPE.set_u8(&mut row, view::KEY_KIND, 0u8);
				view::SHAPE.set_u8(&mut row, view::PRECISION, 0u8);
				view::SHAPE.set_u64(&mut row, view::TAG_ID, 0u64);
			}
		}

		txn.set(&ViewKey::encoded(view), row)?;

		Ok(())
	}

	/// Writes the link row that records `view` (id and `name`) under
	/// `NamespaceViewKey::encoded(namespace, view)`.
	fn link_view_to_namespace(
		txn: &mut AdminTransaction,
		namespace: NamespaceId,
		view: ViewId,
		name: &str,
	) -> Result<()> {
		let link_key = NamespaceViewKey::encoded(namespace, view);

		let mut link_row = view_namespace::SHAPE.allocate();
		view_namespace::SHAPE.set_u64(&mut link_row, view_namespace::ID, view);
		view_namespace::SHAPE.set_utf8(&mut link_row, view_namespace::NAME, name);

		txn.set(&link_key, link_row)?;
		Ok(())
	}

	/// Creates one catalog column on `view` for every entry of `to_create.columns`, in list order.
	///
	/// The namespace is resolved first (even when there are no columns) because its name is
	/// passed along with every column for use in error messages.
	///
	/// # Errors
	///
	/// Propagates the error of `get_namespace` or of the first failing `create_column` call;
	/// columns created before the failure are not undone here.
	fn insert_columns_for_view(txn: &mut AdminTransaction, view: ViewId, to_create: ViewToCreate) -> Result<()> {
		// Look up namespace name for error messages
		let namespace = Self::get_namespace(&mut Transaction::Admin(&mut *txn), to_create.namespace)?;

		for (idx, column_to_create) in to_create.columns.into_iter().enumerate() {
			Self::create_column(
				txn,
				view,
				ColumnToCreate {
					// `column_to_create` is owned by this iteration, so its fragment and
					// constraint are moved rather than cloned.
					fragment: Some(column_to_create.fragment),
					namespace_name: namespace.name().to_string(),
					shape_name: to_create.name.text().to_string(),
					column: column_to_create.name.text().to_string(),
					constraint: column_to_create.constraint,
					properties: vec![],
					// NOTE(review): `as u8` wraps for positions above 255, so a view with
					// more than 256 columns would get repeated indexes - confirm that an
					// upstream check rules this out.
					index: ColumnIndex(idx as u8),
					auto_increment: false,
					dictionary_id: None, // Views don't support dictionaries yet
				},
			)?;
		}
		Ok(())
	}
}

#[cfg(test)]
pub mod tests {
	use reifydb_core::{
		interface::catalog::id::{NamespaceId, ViewId},
		key::namespace_view::NamespaceViewKey,
	};
	use reifydb_engine::test_harness::create_test_admin_transaction;
	use reifydb_type::fragment::Fragment;

	use super::ViewStorageConfig;
	use crate::{
		CatalogStore,
		store::view::{create::ViewToCreate, shape::view_namespace},
		test_utils::ensure_test_namespace,
	};

	/// A deferred view can be created once; creating it again under the same name fails
	/// with diagnostic code `CA_003`.
	#[test]
	fn test_create_deferred_view() {
		let mut txn = create_test_admin_transaction();
		let namespace = ensure_test_namespace(&mut txn);

		let request = ViewToCreate {
			namespace: namespace.id(),
			name: Fragment::internal("test_view"),
			columns: vec![],
			storage: ViewStorageConfig::default(),
		};

		// The name is still free, so the first attempt goes through.
		let created = CatalogStore::create_deferred_view(&mut txn, request.clone()).unwrap();
		assert_eq!(created.id(), ViewId(16385));
		assert_eq!(created.namespace(), NamespaceId(16385));
		assert_eq!(created.name(), "test_view");

		// The very same request is now a duplicate.
		let duplicate = CatalogStore::create_deferred_view(&mut txn, request);
		assert_eq!(duplicate.unwrap_err().diagnostic().code, "CA_003");
	}

	/// Every created view leaves a link row (id + name) under its namespace.
	#[test]
	fn test_view_linked_to_namespace() {
		let mut txn = create_test_admin_transaction();
		let namespace = ensure_test_namespace(&mut txn);

		// Two views in the same namespace, created in this order.
		for view_name in ["test_view", "another_view"] {
			let request = ViewToCreate {
				namespace: namespace.id(),
				name: Fragment::internal(view_name),
				columns: vec![],
				storage: ViewStorageConfig::default(),
			};
			CatalogStore::create_deferred_view(&mut txn, request).unwrap();
		}

		let scan = txn.range(NamespaceViewKey::full_scan(namespace.id()), 1024).unwrap();
		let links = scan.collect::<Result<Vec<_>, _>>().unwrap();
		assert_eq!(links.len(), 2);

		// The scan is expected to yield the view created last at position 0.
		let (newer, older) = (&links[0].row, &links[1].row);

		assert_eq!(view_namespace::SHAPE.get_u64(older, view_namespace::ID), 16385);
		assert_eq!(view_namespace::SHAPE.get_utf8(older, view_namespace::NAME), "test_view");

		assert_eq!(view_namespace::SHAPE.get_u64(newer, view_namespace::ID), 16386);
		assert_eq!(view_namespace::SHAPE.get_utf8(newer, view_namespace::NAME), "another_view");
	}

	/// Creating a view in a namespace that was never created must fail.
	#[test]
	fn test_create_deferred_view_missing_namespace() {
		let mut txn = create_test_admin_transaction();

		// No namespace with id 999 is created by this test.
		let missing_namespace = NamespaceId(999);
		let request = ViewToCreate {
			namespace: missing_namespace,
			name: Fragment::internal("my_view"),
			columns: vec![],
			storage: ViewStorageConfig::default(),
		};

		let outcome = CatalogStore::create_deferred_view(&mut txn, request);
		assert!(outcome.is_err());
	}
}