reifydb-catalog 0.4.13

Database catalog and metadata management for ReifyDB
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB

use reifydb_core::{
	interface::catalog::{
		column::ColumnIndex,
		id::{ColumnId, NamespaceId, SeriesId},
		property::ColumnPropertyKind,
		series::{Series, SeriesKey},
	},
	key::{
		namespace_series::NamespaceSeriesKey,
		series::{SeriesKey as SeriesStorageKey, SeriesMetadataKey},
	},
};
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use reifydb_type::{
	fragment::Fragment,
	value::{constraint::TypeConstraint, dictionary::DictionaryId, sumtype::SumTypeId},
};

use crate::{
	CatalogStore, Result,
	error::{CatalogError, CatalogObjectKind},
	store::{
		column::create::ColumnToCreate,
		sequence::system::SystemSequence,
		series::shape::{series, series_metadata, series_namespace},
	},
};

#[derive(Debug, Clone)]
pub struct SeriesColumnToCreate {
	pub name: Fragment,
	pub fragment: Fragment,
	pub constraint: TypeConstraint,
	pub properties: Vec<ColumnPropertyKind>,
	pub auto_increment: bool,
	pub dictionary_id: Option<DictionaryId>,
}

#[derive(Debug, Clone)]
pub struct SeriesToCreate {
	pub name: Fragment,
	pub namespace: NamespaceId,
	pub columns: Vec<SeriesColumnToCreate>,
	pub tag: Option<SumTypeId>,
	pub key: SeriesKey,
	pub underlying: bool,
}

impl CatalogStore {
	pub(crate) fn create_series(txn: &mut AdminTransaction, to_create: SeriesToCreate) -> Result<Series> {
		let namespace_id = to_create.namespace;
		Self::reject_existing_series(txn, namespace_id, &to_create.name)?;

		let series_id = SystemSequence::next_series_id(txn)?;
		Self::install_series(txn, series_id, namespace_id, &to_create)?;
		Self::insert_series_columns(txn, series_id, &to_create)?;
		Self::initialize_series_metadata(txn, series_id)?;
		Self::get_series(&mut Transaction::Admin(&mut *txn), series_id)
	}

	#[inline]
	fn reject_existing_series(
		txn: &mut AdminTransaction,
		namespace_id: NamespaceId,
		name: &Fragment,
	) -> Result<()> {
		let Some(series) = CatalogStore::find_series_by_name(
			&mut Transaction::Admin(&mut *txn),
			namespace_id,
			name.text(),
		)?
		else {
			return Ok(());
		};
		let namespace = CatalogStore::get_namespace(&mut Transaction::Admin(&mut *txn), namespace_id)?;
		Err(CatalogError::AlreadyExists {
			kind: CatalogObjectKind::Series,
			namespace: namespace.name().to_string(),
			name: series.name,
			fragment: name.clone(),
		}
		.into())
	}

	#[inline]
	fn install_series(
		txn: &mut AdminTransaction,
		series_id: SeriesId,
		namespace_id: NamespaceId,
		to_create: &SeriesToCreate,
	) -> Result<()> {
		Self::store_series(txn, series_id, namespace_id, to_create)?;
		Self::link_series_to_namespace(txn, namespace_id, series_id, to_create.name.text())
	}

	fn store_series(
		txn: &mut AdminTransaction,
		series_id: SeriesId,
		namespace: NamespaceId,
		to_create: &SeriesToCreate,
	) -> Result<()> {
		let mut row = series::SHAPE.allocate();
		series::SHAPE.set_u64(&mut row, series::ID, series_id);
		series::SHAPE.set_u64(&mut row, series::NAMESPACE, namespace);
		series::SHAPE.set_utf8(&mut row, series::NAME, to_create.name.text());
		series::SHAPE.set_u64(&mut row, series::TAG, to_create.tag.map(|t| *t).unwrap_or(0));
		series::SHAPE.set_utf8(&mut row, series::KEY_COLUMN, to_create.key.column());
		let (key_kind_u8, precision_u8) = match &to_create.key {
			SeriesKey::DateTime {
				precision,
				..
			} => (0u8, *precision as u8),
			SeriesKey::Integer {
				..
			} => (1u8, 0u8),
		};
		series::SHAPE.set_u8(&mut row, series::KEY_KIND, key_kind_u8);
		series::SHAPE.set_u8(&mut row, series::PRECISION, precision_u8);
		series::SHAPE.set_u64(&mut row, series::PRIMARY_KEY, 0u64);
		series::SHAPE.set_u8(
			&mut row,
			series::UNDERLYING,
			if to_create.underlying {
				1
			} else {
				0
			},
		);

		txn.set(&SeriesStorageKey::encoded(series_id), row)?;

		Ok(())
	}

	fn link_series_to_namespace(
		txn: &mut AdminTransaction,
		namespace: NamespaceId,
		series_id: SeriesId,
		name: &str,
	) -> Result<()> {
		let mut row = series_namespace::SHAPE.allocate();
		series_namespace::SHAPE.set_u64(&mut row, series_namespace::ID, series_id);
		series_namespace::SHAPE.set_utf8(&mut row, series_namespace::NAME, name);

		txn.set(&NamespaceSeriesKey::encoded(namespace, series_id), row)?;

		Ok(())
	}

	fn insert_series_columns(
		txn: &mut AdminTransaction,
		series_id: SeriesId,
		to_create: &SeriesToCreate,
	) -> Result<()> {
		for (idx, col) in to_create.columns.iter().enumerate() {
			CatalogStore::create_column(
				txn,
				series_id,
				ColumnToCreate {
					fragment: Some(col.fragment.clone()),
					namespace_name: String::new(),
					shape_name: String::new(),
					column: col.name.text().to_string(),
					constraint: col.constraint.clone(),
					properties: col.properties.clone(),
					index: ColumnIndex(idx as u8),
					auto_increment: col.auto_increment,
					dictionary_id: col.dictionary_id,
				},
			)?;
		}

		Ok(())
	}

	fn initialize_series_metadata(txn: &mut AdminTransaction, series_id: SeriesId) -> Result<()> {
		let mut row = series_metadata::SHAPE.allocate();
		series_metadata::SHAPE.set_u64(&mut row, series_metadata::ID, series_id);
		series_metadata::SHAPE.set_u64(&mut row, series_metadata::ROW_COUNT, 0u64);
		series_metadata::SHAPE.set_u64(&mut row, series_metadata::OLDEST_KEY, 0u64);
		series_metadata::SHAPE.set_u64(&mut row, series_metadata::NEWEST_KEY, 0u64);
		series_metadata::SHAPE.set_u64(&mut row, series_metadata::SEQUENCE_COUNTER, 0u64);

		txn.set(&SeriesMetadataKey::encoded(series_id), row)?;

		Ok(())
	}

	/// Create a series with a specific ID and column IDs. Used for bootstrapping system shapes.
	/// Skips duplicate check - caller must ensure uniqueness.
	pub(crate) fn create_series_with_id(
		txn: &mut AdminTransaction,
		series_id: SeriesId,
		to_create: SeriesToCreate,
		column_ids: &[ColumnId],
	) -> Result<Series> {
		assert_eq!(column_ids.len(), to_create.columns.len(), "column_ids length must match columns length");

		let namespace_id = to_create.namespace;
		Self::install_series(txn, series_id, namespace_id, &to_create)?;
		Self::insert_series_columns_with_ids(txn, series_id, &to_create, column_ids)?;
		Self::initialize_series_metadata(txn, series_id)?;
		Self::get_series(&mut Transaction::Admin(&mut *txn), series_id)
	}

	fn insert_series_columns_with_ids(
		txn: &mut AdminTransaction,
		series_id: SeriesId,
		to_create: &SeriesToCreate,
		column_ids: &[ColumnId],
	) -> Result<()> {
		for (idx, (col, &col_id)) in to_create.columns.iter().zip(column_ids.iter()).enumerate() {
			CatalogStore::create_column_with_id(
				txn,
				col_id,
				series_id,
				ColumnToCreate {
					fragment: Some(col.fragment.clone()),
					namespace_name: String::new(),
					shape_name: String::new(),
					column: col.name.text().to_string(),
					constraint: col.constraint.clone(),
					properties: col.properties.clone(),
					index: ColumnIndex(idx as u8),
					auto_increment: col.auto_increment,
					dictionary_id: col.dictionary_id,
				},
			)?;
		}

		Ok(())
	}
}