use std::collections;
use collections::HashSet;
use reifydb_core::{
interface::catalog::{
id::{ColumnId, PrimaryKeyId},
shape::ShapeId,
},
key::primary_key::PrimaryKeyKey,
return_internal_error,
};
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use reifydb_type::fragment::Fragment;
use crate::{
CatalogStore, Result,
error::CatalogError,
store::{
primary_key::shape::{
primary_key,
primary_key::{SHAPE, serialize_column_ids},
},
sequence::system::SystemSequence,
},
};
/// Input describing a primary key to be created by
/// `CatalogStore::create_primary_key`.
pub struct PrimaryKeyToCreate {
    /// Shape the primary key is created for. `create_primary_key` accepts
    /// tables, views and ring buffers and rejects the other shape kinds.
    pub shape: ShapeId,
    /// Ids of the columns forming the key. Must be non-empty, and every id
    /// must be one of the columns listed for `shape`.
    pub column_ids: Vec<ColumnId>,
}
impl CatalogStore {
    /// Creates a primary key over `to_create.column_ids` for `to_create.shape`
    /// and links it to that shape.
    ///
    /// All validation happens before anything is written: a request that is
    /// rejected (empty key, unknown column, unsupported shape) neither
    /// consumes a primary key id from the sequence nor leaves a primary key
    /// row behind in `txn`.
    ///
    /// # Errors
    ///
    /// - `CatalogError::PrimaryKeyEmpty` when `column_ids` is empty.
    /// - `CatalogError::PrimaryKeyColumnNotFound` when a column id is not
    ///   among the columns listed for the shape.
    /// - An internal error when the shape is a virtual table, a dictionary
    ///   or a series.
    /// - Any error propagated from listing the columns, allocating the id,
    ///   writing the row, or linking the key to its shape.
    pub(crate) fn create_primary_key(
        txn: &mut AdminTransaction,
        to_create: PrimaryKeyToCreate,
    ) -> Result<PrimaryKeyId> {
        if to_create.column_ids.is_empty() {
            return Err(CatalogError::PrimaryKeyEmpty {
                fragment: Fragment::None,
            }
            .into());
        }

        // Every requested column must belong to the target shape.
        let source_columns = Self::list_columns(&mut Transaction::Admin(&mut *txn), to_create.shape)?;
        let source_column_ids: HashSet<_> = source_columns.iter().map(|c| c.id).collect();
        if let Some(missing) = to_create
            .column_ids
            .iter()
            .find(|column_id| !source_column_ids.contains(*column_id))
        {
            return Err(CatalogError::PrimaryKeyColumnNotFound {
                fragment: Fragment::None,
                column_id: missing.0,
            }
            .into());
        }

        // Copy the shape out so the arms below can still borrow `to_create`.
        let shape = to_create.shape;

        // Only the supported arms allocate an id and write the row; the
        // unsupported arms bail out without having touched the transaction.
        match shape {
            ShapeId::Table(table_id) => {
                let id = Self::insert_primary_key_row(txn, &to_create)?;
                Self::set_table_primary_key(txn, table_id, id)?;
                Ok(id)
            }
            ShapeId::View(view_id) => {
                let id = Self::insert_primary_key_row(txn, &to_create)?;
                Self::set_view_primary_key(txn, view_id, id)?;
                Ok(id)
            }
            ShapeId::RingBuffer(ringbuffer_id) => {
                let id = Self::insert_primary_key_row(txn, &to_create)?;
                Self::set_ringbuffer_primary_key(txn, ringbuffer_id, id)?;
                Ok(id)
            }
            ShapeId::TableVirtual(_) => {
                return_internal_error!(
                    "Cannot create primary key for virtual table. Virtual tables do not support primary keys."
                );
            }
            ShapeId::Dictionary(_) => {
                return_internal_error!(
                    "Cannot create primary key for dictionary. Dictionaries have their own key structure."
                );
            }
            ShapeId::Series(_) => {
                return_internal_error!(
                    "Cannot create primary key for series. Series use timestamp-based key ordering."
                );
            }
        }
    }

    /// Allocates the next primary key id and stores the row (id, owning
    /// shape, serialized column ids) under that id's `PrimaryKeyKey`.
    fn insert_primary_key_row(
        txn: &mut AdminTransaction,
        to_create: &PrimaryKeyToCreate,
    ) -> Result<PrimaryKeyId> {
        let id = SystemSequence::next_primary_key_id(txn)?;

        let mut row = SHAPE.allocate();
        SHAPE.set_u64(&mut row, primary_key::ID, id.0);
        SHAPE.set_u64(&mut row, primary_key::SOURCE, to_create.shape.as_u64());
        SHAPE.set_blob(&mut row, primary_key::COLUMN_IDS, &serialize_column_ids(&to_create.column_ids));
        txn.set(&PrimaryKeyKey::encoded(id), row)?;

        Ok(id)
    }
}
#[cfg(test)]
pub mod tests {
    use reifydb_core::interface::catalog::{
        column::ColumnIndex,
        id::{ColumnId, PrimaryKeyId, TableId, ViewId},
        shape::ShapeId,
    };
    use reifydb_engine::test_harness::create_test_admin_transaction;
    use reifydb_transaction::transaction::Transaction;
    use reifydb_type::{
        fragment::Fragment,
        value::{constraint::TypeConstraint, r#type::Type},
    };

    use super::PrimaryKeyToCreate;
    use crate::{
        CatalogStore,
        store::{
            column::create::ColumnToCreate,
            table::create::TableToCreate,
            view::create::{ViewColumnToCreate, ViewStorageConfig, ViewToCreate},
        },
        test_utils::{ensure_test_namespace, ensure_test_table},
    };

    /// Builds the `ColumnToCreate` used throughout these tests: an
    /// unconstrained `Uint8` column in `test_namespace` with no properties
    /// and no dictionary. Only the fields that differ between tests are
    /// parameters.
    fn uint8_column(shape_name: &str, column: &str, index: u8, auto_increment: bool) -> ColumnToCreate {
        ColumnToCreate {
            fragment: None,
            namespace_name: "test_namespace".to_string(),
            shape_name: shape_name.to_string(),
            column: column.to_string(),
            constraint: TypeConstraint::unconstrained(Type::Uint8),
            properties: vec![],
            index: ColumnIndex(index),
            auto_increment,
            dictionary_id: None,
        }
    }

    /// A two-column key on a table is stored and read back with its columns
    /// in the order they were given.
    #[test]
    fn test_create_primary_key_for_table() {
        let mut txn = create_test_admin_transaction();
        let table = ensure_test_table(&mut txn);

        let col1 =
            CatalogStore::create_column(&mut txn, table.id, uint8_column("test_table", "id", 0, true)).unwrap();
        let col2 =
            CatalogStore::create_column(&mut txn, table.id, uint8_column("test_table", "tenant_id", 1, false))
                .unwrap();

        let primary_key_id = CatalogStore::create_primary_key(
            &mut txn,
            PrimaryKeyToCreate {
                shape: ShapeId::Table(table.id),
                column_ids: vec![col1.id, col2.id],
            },
        )
        .unwrap();

        // NOTE(review): 16385 is presumably the first id the primary key
        // sequence hands out in a fresh test transaction — confirm.
        assert_eq!(primary_key_id, PrimaryKeyId(16385));

        let found_pk = CatalogStore::find_primary_key(&mut Transaction::Admin(&mut txn), table.id)
            .unwrap()
            .expect("Primary key should exist");
        assert_eq!(found_pk.id, primary_key_id);
        assert_eq!(found_pk.columns.len(), 2);
        assert_eq!(found_pk.columns[0].id, col1.id);
        assert_eq!(found_pk.columns[0].name, "id");
        assert_eq!(found_pk.columns[1].id, col2.id);
        assert_eq!(found_pk.columns[1].name, "tenant_id");
    }

    /// A single-column key can be created on a deferred view and found again.
    #[test]
    fn test_create_primary_key_for_view() {
        let mut txn = create_test_admin_transaction();
        let namespace = ensure_test_namespace(&mut txn);

        let view = CatalogStore::create_deferred_view(
            &mut txn,
            ViewToCreate {
                name: Fragment::internal("test_view"),
                namespace: namespace.id(),
                columns: vec![
                    ViewColumnToCreate {
                        name: Fragment::internal("id"),
                        fragment: Fragment::None,
                        constraint: TypeConstraint::unconstrained(Type::Uint8),
                    },
                    ViewColumnToCreate {
                        name: Fragment::internal("name"),
                        fragment: Fragment::None,
                        constraint: TypeConstraint::unconstrained(Type::Utf8),
                    },
                ],
                storage: ViewStorageConfig::default(),
            },
        )
        .unwrap();

        let columns = CatalogStore::list_columns(&mut Transaction::Admin(&mut txn), view.id()).unwrap();
        assert_eq!(columns.len(), 2);

        let primary_key_id = CatalogStore::create_primary_key(
            &mut txn,
            PrimaryKeyToCreate {
                shape: ShapeId::View(view.id()),
                column_ids: vec![columns[0].id],
            },
        )
        .unwrap();

        // NOTE(review): same presumed first sequence value as in the table test.
        assert_eq!(primary_key_id, PrimaryKeyId(16385));

        let found_pk = CatalogStore::find_primary_key(&mut Transaction::Admin(&mut txn), view.id())
            .unwrap()
            .expect("Primary key should exist");
        assert_eq!(found_pk.id, primary_key_id);
        assert_eq!(found_pk.columns.len(), 1);
        assert_eq!(found_pk.columns[0].id, columns[0].id);
        assert_eq!(found_pk.columns[0].name, "id");
    }

    /// A three-column key keeps every column, in creation order.
    #[test]
    fn test_create_composite_primary_key() {
        let mut txn = create_test_admin_transaction();
        let table = ensure_test_table(&mut txn);

        let column_ids: Vec<ColumnId> = (0u8..3)
            .map(|i| {
                let name = format!("col_{}", i);
                CatalogStore::create_column(&mut txn, table.id, uint8_column("test_table", &name, i, false))
                    .unwrap()
                    .id
            })
            .collect();

        let primary_key_id = CatalogStore::create_primary_key(
            &mut txn,
            PrimaryKeyToCreate {
                shape: ShapeId::Table(table.id),
                column_ids: column_ids.clone(),
            },
        )
        .unwrap();

        let found_pk = CatalogStore::find_primary_key(&mut Transaction::Admin(&mut txn), table.id)
            .unwrap()
            .expect("Primary key should exist");
        assert_eq!(found_pk.id, primary_key_id);
        assert_eq!(found_pk.columns.len(), 3);
        for (i, (col, expected_id)) in found_pk.columns.iter().zip(&column_ids).enumerate() {
            assert_eq!(col.id, *expected_id);
            assert_eq!(col.name, format!("col_{}", i));
        }
    }

    /// The table has no primary key before creation and reports the new one
    /// afterwards.
    #[test]
    fn test_create_primary_key_updates_table() {
        let mut txn = create_test_admin_transaction();
        let table = ensure_test_table(&mut txn);

        let initial_pk = CatalogStore::find_primary_key(&mut Transaction::Admin(&mut txn), table.id).unwrap();
        assert!(initial_pk.is_none());

        let col =
            CatalogStore::create_column(&mut txn, table.id, uint8_column("test_table", "id", 0, true)).unwrap();

        let primary_key_id = CatalogStore::create_primary_key(
            &mut txn,
            PrimaryKeyToCreate {
                shape: ShapeId::Table(table.id),
                column_ids: vec![col.id],
            },
        )
        .unwrap();

        let updated_pk = CatalogStore::find_primary_key(&mut Transaction::Admin(&mut txn), table.id)
            .unwrap()
            .expect("Primary key should exist");
        assert_eq!(updated_pk.id, primary_key_id);
    }

    /// A table id that was never created fails with the "column not found"
    /// code (presumably because no columns are listed for it — confirm).
    #[test]
    fn test_create_primary_key_on_nonexistent_table() {
        let mut txn = create_test_admin_transaction();

        let err = CatalogStore::create_primary_key(
            &mut txn,
            PrimaryKeyToCreate {
                shape: ShapeId::Table(TableId(999)),
                column_ids: vec![ColumnId(1)],
            },
        )
        .expect_err("creating a primary key on a nonexistent table must fail");

        assert_eq!(err.code, "CA_021");
    }

    /// A view id that was never created fails with the "column not found" code.
    #[test]
    fn test_create_primary_key_on_nonexistent_view() {
        let mut txn = create_test_admin_transaction();

        let err = CatalogStore::create_primary_key(
            &mut txn,
            PrimaryKeyToCreate {
                shape: ShapeId::View(ViewId(999)),
                column_ids: vec![ColumnId(1)],
            },
        )
        .expect_err("creating a primary key on a nonexistent view must fail");

        assert_eq!(err.code, "CA_021");
    }

    /// An empty column list is rejected with the "empty primary key" code.
    #[test]
    fn test_create_empty_primary_key() {
        let mut txn = create_test_admin_transaction();
        let table = ensure_test_table(&mut txn);

        let err = CatalogStore::create_primary_key(
            &mut txn,
            PrimaryKeyToCreate {
                shape: ShapeId::Table(table.id),
                column_ids: vec![],
            },
        )
        .expect_err("creating a primary key without columns must fail");

        assert_eq!(err.code, "CA_020");
    }

    /// A column id that does not exist on the table is rejected.
    #[test]
    fn test_create_primary_key_with_nonexistent_column() {
        let mut txn = create_test_admin_transaction();
        let table = ensure_test_table(&mut txn);

        let err = CatalogStore::create_primary_key(
            &mut txn,
            PrimaryKeyToCreate {
                shape: ShapeId::Table(table.id),
                column_ids: vec![ColumnId(999)],
            },
        )
        .expect_err("creating a primary key over an unknown column must fail");

        assert_eq!(err.code, "CA_021");
    }

    /// A column that exists, but on another table, is rejected like an
    /// unknown column.
    #[test]
    fn test_create_primary_key_with_column_from_different_table() {
        let mut txn = create_test_admin_transaction();
        let table1 = ensure_test_table(&mut txn);

        let _col1 =
            CatalogStore::create_column(&mut txn, table1.id, uint8_column("test_table", "id", 0, false)).unwrap();

        let namespace =
            CatalogStore::get_namespace(&mut Transaction::Admin(&mut txn), table1.namespace).unwrap();
        let table2 = CatalogStore::create_table(
            &mut txn,
            TableToCreate {
                name: Fragment::internal("test_table2"),
                namespace: namespace.id(),
                columns: vec![],
                retention_strategy: None,
                underlying: false,
            },
        )
        .unwrap();

        let col2 =
            CatalogStore::create_column(&mut txn, table2.id, uint8_column("test_table2", "id", 0, false)).unwrap();

        // The key targets table1 but names a column that belongs to table2.
        let err = CatalogStore::create_primary_key(
            &mut txn,
            PrimaryKeyToCreate {
                shape: ShapeId::Table(table1.id),
                column_ids: vec![col2.id],
            },
        )
        .expect_err("creating a primary key over another table's column must fail");

        assert_eq!(err.code, "CA_021");
    }
}