use reifydb_core::{
interface::catalog::{
column::ColumnIndex,
id::{ColumnId, NamespaceId, TableId},
property::ColumnPropertyKind,
shape::ShapeId,
table::Table,
},
key::{namespace_table::NamespaceTableKey, table::TableKey},
retention::RetentionStrategy,
};
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use reifydb_type::{
fragment::Fragment,
value::{constraint::TypeConstraint, dictionary::DictionaryId},
};
use crate::{
CatalogStore, Result,
error::{CatalogError, CatalogObjectKind},
store::{
column::create::ColumnToCreate,
retention_strategy::create::create_shape_retention_strategy,
sequence::system::SystemSequence,
table::shape::{table, table_namespace},
},
};
#[derive(Debug, Clone)]
pub struct TableColumnToCreate {
pub name: Fragment,
pub fragment: Fragment,
pub constraint: TypeConstraint,
pub properties: Vec<ColumnPropertyKind>,
pub auto_increment: bool,
pub dictionary_id: Option<DictionaryId>,
}
#[derive(Debug, Clone)]
pub struct TableToCreate {
pub name: Fragment,
pub namespace: NamespaceId,
pub columns: Vec<TableColumnToCreate>,
pub retention_strategy: Option<RetentionStrategy>,
pub underlying: bool,
}
impl CatalogStore {
pub(crate) fn create_table(txn: &mut AdminTransaction, to_create: TableToCreate) -> Result<Table> {
let namespace_id = to_create.namespace;
if let Some(table) = CatalogStore::find_table_by_name(
&mut Transaction::Admin(&mut *txn),
namespace_id,
to_create.name.text(),
)? {
let namespace = CatalogStore::get_namespace(&mut Transaction::Admin(&mut *txn), namespace_id)?;
return Err(CatalogError::AlreadyExists {
kind: CatalogObjectKind::Table,
namespace: namespace.name().to_string(),
name: table.name,
fragment: to_create.name.clone(),
}
.into());
}
let table_id = SystemSequence::next_table_id(txn)?;
Self::store_table(txn, table_id, namespace_id, &to_create)?;
Self::link_table_to_namespace(txn, namespace_id, table_id, to_create.name.text())?;
if let Some(retention_strategy) = &to_create.retention_strategy {
create_shape_retention_strategy(txn, ShapeId::Table(table_id), retention_strategy)?;
}
Self::insert_columns(txn, table_id, to_create)?;
Self::get_table(&mut Transaction::Admin(&mut *txn), table_id)
}
fn store_table(
txn: &mut AdminTransaction,
table: TableId,
namespace: NamespaceId,
to_create: &TableToCreate,
) -> Result<()> {
let mut row = table::SHAPE.allocate();
table::SHAPE.set_u64(&mut row, table::ID, table);
table::SHAPE.set_u64(&mut row, table::NAMESPACE, namespace);
table::SHAPE.set_utf8(&mut row, table::NAME, to_create.name.text());
table::SHAPE.set_u64(&mut row, table::PRIMARY_KEY, 0u64);
table::SHAPE.set_u8(
&mut row,
table::UNDERLYING,
if to_create.underlying {
1
} else {
0
},
);
txn.set(&TableKey::encoded(table), row)?;
Ok(())
}
fn link_table_to_namespace(
txn: &mut AdminTransaction,
namespace: NamespaceId,
table: TableId,
name: &str,
) -> Result<()> {
let mut row = table_namespace::SHAPE.allocate();
table_namespace::SHAPE.set_u64(&mut row, table_namespace::ID, table);
table_namespace::SHAPE.set_utf8(&mut row, table_namespace::NAME, name);
txn.set(&NamespaceTableKey::encoded(namespace, table), row)?;
Ok(())
}
fn insert_columns(txn: &mut AdminTransaction, table: TableId, to_create: TableToCreate) -> Result<()> {
let namespace_name = Self::find_namespace(&mut Transaction::Admin(&mut *txn), to_create.namespace)?
.map(|s| s.name().to_string())
.unwrap_or_else(|| format!("namespace_{}", to_create.namespace));
for (idx, column_to_create) in to_create.columns.into_iter().enumerate() {
Self::create_column(
txn,
table,
ColumnToCreate {
fragment: Some(column_to_create.fragment.clone()),
namespace_name: namespace_name.clone(),
shape_name: to_create.name.text().to_string(),
column: column_to_create.name.text().to_string(),
constraint: column_to_create.constraint.clone(),
properties: column_to_create.properties.clone(),
index: ColumnIndex(idx as u8),
auto_increment: column_to_create.auto_increment,
dictionary_id: column_to_create.dictionary_id,
},
)?;
}
Ok(())
}
pub(crate) fn create_table_with_id(
txn: &mut AdminTransaction,
table_id: TableId,
to_create: TableToCreate,
column_ids: &[ColumnId],
) -> Result<Table> {
assert_eq!(column_ids.len(), to_create.columns.len(), "column_ids length must match columns length");
let namespace_id = to_create.namespace;
Self::store_table(txn, table_id, namespace_id, &to_create)?;
Self::link_table_to_namespace(txn, namespace_id, table_id, to_create.name.text())?;
if let Some(retention_strategy) = &to_create.retention_strategy {
create_shape_retention_strategy(txn, ShapeId::Table(table_id), retention_strategy)?;
}
Self::insert_columns_with_ids(txn, table_id, to_create, column_ids)?;
Self::get_table(&mut Transaction::Admin(&mut *txn), table_id)
}
fn insert_columns_with_ids(
txn: &mut AdminTransaction,
table: TableId,
to_create: TableToCreate,
column_ids: &[ColumnId],
) -> Result<()> {
for (idx, (column_to_create, &col_id)) in
to_create.columns.into_iter().zip(column_ids.iter()).enumerate()
{
Self::create_column_with_id(
txn,
col_id,
table,
ColumnToCreate {
fragment: Some(column_to_create.fragment.clone()),
namespace_name: String::new(),
shape_name: String::new(),
column: column_to_create.name.text().to_string(),
constraint: column_to_create.constraint.clone(),
properties: column_to_create.properties.clone(),
index: ColumnIndex(idx as u8),
auto_increment: column_to_create.auto_increment,
dictionary_id: column_to_create.dictionary_id,
},
)?;
}
Ok(())
}
}
#[cfg(test)]
pub mod tests {
use reifydb_core::{
interface::catalog::id::{NamespaceId, TableId},
key::namespace_table::NamespaceTableKey,
};
use reifydb_engine::test_harness::create_test_admin_transaction;
use reifydb_type::fragment::Fragment;
use crate::{
CatalogStore,
store::table::{create::TableToCreate, shape::table_namespace},
test_utils::ensure_test_namespace,
};
#[test]
fn test_create_table() {
let mut txn = create_test_admin_transaction();
let test_namespace = ensure_test_namespace(&mut txn);
let to_create = TableToCreate {
namespace: test_namespace.id(),
name: Fragment::internal("test_table"),
columns: vec![],
retention_strategy: None,
underlying: false,
};
let result = CatalogStore::create_table(&mut txn, to_create.clone()).unwrap();
assert_eq!(result.id, TableId(16385));
assert_eq!(result.namespace, NamespaceId(16385));
assert_eq!(result.name, "test_table");
let err = CatalogStore::create_table(&mut txn, to_create).unwrap_err();
assert_eq!(err.diagnostic().code, "CA_003");
}
#[test]
fn test_table_linked_to_namespace() {
let mut txn = create_test_admin_transaction();
let test_namespace = ensure_test_namespace(&mut txn);
let to_create = TableToCreate {
namespace: test_namespace.id(),
name: Fragment::internal("test_table"),
columns: vec![],
retention_strategy: None,
underlying: false,
};
CatalogStore::create_table(&mut txn, to_create).unwrap();
let to_create = TableToCreate {
namespace: test_namespace.id(),
name: Fragment::internal("another_table"),
columns: vec![],
retention_strategy: None,
underlying: false,
};
CatalogStore::create_table(&mut txn, to_create).unwrap();
let links: Vec<_> = txn
.range(NamespaceTableKey::full_scan(test_namespace.id()), 1024)
.unwrap()
.collect::<Result<Vec<_>, _>>()
.unwrap();
assert_eq!(links.len(), 2);
let link = &links[1];
let row = &link.row;
assert_eq!(table_namespace::SHAPE.get_u64(row, table_namespace::ID), 16385);
assert_eq!(table_namespace::SHAPE.get_utf8(row, table_namespace::NAME), "test_table");
let link = &links[0];
let row = &link.row;
assert_eq!(table_namespace::SHAPE.get_u64(row, table_namespace::ID), 16386);
assert_eq!(table_namespace::SHAPE.get_utf8(row, table_namespace::NAME), "another_table");
}
}