use reifydb_core::{
interface::catalog::{property::ColumnPropertyKind, shape::ShapeId},
key::{column::ColumnKey, columns::ColumnsKey},
};
use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
use reifydb_type::{
fragment::Fragment,
value::{
blob::Blob,
constraint::{Constraint, TypeConstraint},
dictionary::DictionaryId,
r#type::Type,
},
};
/// Serializes an optional column constraint into the byte form stored in the
/// column row's `CONSTRAINT` blob.
///
/// The first byte is a tag identifying the variant; the payload follows:
///
/// | tag | variant          | payload                                                    |
/// |-----|------------------|------------------------------------------------------------|
/// | `0` | no constraint    | none                                                       |
/// | `1` | `MaxBytes`       | the limit as a little-endian `u32`                         |
/// | `2` | `PrecisionScale` | one byte for the precision, one byte for the scale         |
/// | `3` | `Dictionary`     | little-endian bytes of `to_u64()`, then the id-type byte   |
/// | `4` | `SumType`        | little-endian bytes of `to_u64()`                          |
fn encode_constraint(constraint: &Option<Constraint>) -> Vec<u8> {
    // An absent constraint is encoded as the lone tag byte.
    let inner = match constraint {
        Some(inner) => inner,
        None => return vec![0],
    };

    let mut encoded: Vec<u8> = Vec::new();
    match inner {
        Constraint::MaxBytes(limit) => {
            let limit: u32 = (*limit).into();
            encoded.push(1);
            encoded.extend_from_slice(&limit.to_le_bytes());
        }
        Constraint::PrecisionScale(precision, scale) => {
            let precision: u8 = (*precision).into();
            let scale: u8 = (*scale).into();
            encoded.extend_from_slice(&[2, precision, scale]);
        }
        Constraint::Dictionary(dictionary, id_type) => {
            encoded.push(3);
            encoded.extend_from_slice(&dictionary.to_u64().to_le_bytes());
            encoded.push(id_type.to_u8());
        }
        Constraint::SumType(sum_type) => {
            encoded.push(4);
            encoded.extend_from_slice(&sum_type.to_u64().to_le_bytes());
        }
    }
    encoded
}
use reifydb_core::interface::catalog::{
column::{Column, ColumnIndex},
id::ColumnId,
};
use crate::{
CatalogStore, Result,
error::{CatalogError, CatalogObjectKind},
store::{
column::shape::{
column,
column::{AUTO_INCREMENT, CONSTRAINT, DICTIONARY_ID, ID, INDEX, NAME, PRIMITIVE, VALUE},
primitive_column,
},
sequence::system::SystemSequence,
},
};
/// Input describing a column to be created through `CatalogStore::create_column`
/// or `CatalogStore::create_column_with_id`.
pub(crate) struct ColumnToCreate {
    /// Source fragment attached to the `AutoIncrementInvalidType` error;
    /// `Fragment::None` is used when this is `None`.
    pub fragment: Option<Fragment>,
    /// Namespace name reported in the `ColumnAlreadyExists` error.
    pub namespace_name: String,
    /// Name of the owning shape, reported in the `ColumnAlreadyExists` error.
    pub shape_name: String,
    /// Name of the column; used for the duplicate lookup and stored in both rows.
    pub column: String,
    /// Type and optional constraint of the column; the type is stored as a `u8`
    /// and the constraint is stored as an encoded blob.
    pub constraint: TypeConstraint,
    /// Properties registered for the column, one `create_column_property` call each.
    pub properties: Vec<ColumnPropertyKind>,
    /// Index of the column as stored in the `INDEX` field of both rows.
    pub index: ColumnIndex,
    /// Whether the column is auto-incremented; `create_column` only accepts this
    /// for integer base types.
    pub auto_increment: bool,
    /// Optional dictionary id; persisted as `0` when `None`.
    pub dictionary_id: Option<DictionaryId>,
}
impl CatalogStore {
pub(crate) fn create_column(
txn: &mut AdminTransaction,
shape: impl Into<ShapeId>,
column_to_create: ColumnToCreate,
) -> Result<Column> {
let shape = shape.into();
if let Some(column) =
Self::find_column_by_name(&mut Transaction::Admin(&mut *txn), shape, &column_to_create.column)?
{
return Err(CatalogError::ColumnAlreadyExists {
kind: CatalogObjectKind::Table,
namespace: column_to_create.namespace_name.clone(),
name: column_to_create.shape_name.clone(),
column: column.name,
fragment: Fragment::None,
}
.into());
}
if column_to_create.auto_increment {
let base_type = column_to_create.constraint.get_type();
let is_integer_type = matches!(
base_type,
Type::Int1
| Type::Int2 | Type::Int4 | Type::Int8
| Type::Int16 | Type::Uint1 | Type::Uint2
| Type::Uint4 | Type::Uint8 | Type::Uint16
);
if !is_integer_type {
return Err(CatalogError::AutoIncrementInvalidType {
column: column_to_create.column.clone(),
ty: base_type,
fragment: column_to_create.fragment.unwrap_or(Fragment::None),
}
.into());
}
}
let id = SystemSequence::next_column_id(txn)?;
let mut row = column::SHAPE.allocate();
column::SHAPE.set_u64(&mut row, ID, id);
column::SHAPE.set_u64(&mut row, PRIMITIVE, shape);
column::SHAPE.set_utf8(&mut row, NAME, &column_to_create.column);
column::SHAPE.set_u8(&mut row, VALUE, column_to_create.constraint.get_type().to_u8());
column::SHAPE.set_u8(&mut row, INDEX, column_to_create.index);
column::SHAPE.set_bool(&mut row, AUTO_INCREMENT, column_to_create.auto_increment);
let constraint_bytes = encode_constraint(column_to_create.constraint.constraint());
let blob = Blob::from(constraint_bytes);
column::SHAPE.set_blob(&mut row, CONSTRAINT, &blob);
let dict_id_value = column_to_create.dictionary_id.map(u64::from).unwrap_or(0);
column::SHAPE.set_u64(&mut row, DICTIONARY_ID, dict_id_value);
txn.set(&ColumnsKey::encoded(id), row)?;
let mut row = primitive_column::SHAPE.allocate();
primitive_column::SHAPE.set_u64(&mut row, primitive_column::ID, id);
primitive_column::SHAPE.set_utf8(&mut row, primitive_column::NAME, &column_to_create.column);
primitive_column::SHAPE.set_u8(&mut row, primitive_column::INDEX, column_to_create.index);
txn.set(&ColumnKey::encoded(shape, id), row)?;
for policy in column_to_create.properties {
Self::create_column_property(txn, id, policy)?;
}
Ok(Column {
id,
name: column_to_create.column,
constraint: column_to_create.constraint,
index: column_to_create.index,
properties: Self::list_column_properties(&mut Transaction::Admin(&mut *txn), id)?,
auto_increment: column_to_create.auto_increment,
dictionary_id: column_to_create.dictionary_id,
})
}
pub(crate) fn create_column_with_id(
txn: &mut AdminTransaction,
id: ColumnId,
shape: impl Into<ShapeId>,
column_to_create: ColumnToCreate,
) -> Result<Column> {
let shape = shape.into();
let mut row = column::SHAPE.allocate();
column::SHAPE.set_u64(&mut row, ID, id);
column::SHAPE.set_u64(&mut row, PRIMITIVE, shape);
column::SHAPE.set_utf8(&mut row, NAME, &column_to_create.column);
column::SHAPE.set_u8(&mut row, VALUE, column_to_create.constraint.get_type().to_u8());
column::SHAPE.set_u8(&mut row, INDEX, column_to_create.index);
column::SHAPE.set_bool(&mut row, AUTO_INCREMENT, column_to_create.auto_increment);
let constraint_bytes = encode_constraint(column_to_create.constraint.constraint());
let blob = Blob::from(constraint_bytes);
column::SHAPE.set_blob(&mut row, CONSTRAINT, &blob);
let dict_id_value = column_to_create.dictionary_id.map(u64::from).unwrap_or(0);
column::SHAPE.set_u64(&mut row, DICTIONARY_ID, dict_id_value);
txn.set(&ColumnsKey::encoded(id), row)?;
let mut row = primitive_column::SHAPE.allocate();
primitive_column::SHAPE.set_u64(&mut row, primitive_column::ID, id);
primitive_column::SHAPE.set_utf8(&mut row, primitive_column::NAME, &column_to_create.column);
primitive_column::SHAPE.set_u8(&mut row, primitive_column::INDEX, column_to_create.index);
txn.set(&ColumnKey::encoded(shape, id), row)?;
for policy in column_to_create.properties {
Self::create_column_property(txn, id, policy)?;
}
Ok(Column {
id,
name: column_to_create.column,
constraint: column_to_create.constraint,
index: column_to_create.index,
properties: Self::list_column_properties(&mut Transaction::Admin(&mut *txn), id)?,
auto_increment: column_to_create.auto_increment,
dictionary_id: column_to_create.dictionary_id,
})
}
}
#[cfg(test)]
pub mod test {
    use reifydb_core::interface::catalog::{
        column::ColumnIndex,
        id::{ColumnId, TableId},
    };
    use reifydb_engine::test_harness::create_test_admin_transaction;
    use reifydb_transaction::transaction::Transaction;
    use reifydb_type::value::{constraint::TypeConstraint, r#type::Type};

    use crate::{CatalogStore, store::column::create::ColumnToCreate, test_utils::ensure_test_table};

    /// Builds an unconstrained column request for `test_namespace.test_table`
    /// with no fragment, no properties and no dictionary.
    fn column_spec(name: &str, ty: Type, index: ColumnIndex, auto_increment: bool) -> ColumnToCreate {
        ColumnToCreate {
            fragment: None,
            namespace_name: "test_namespace".to_string(),
            shape_name: "test_table".to_string(),
            column: name.to_string(),
            constraint: TypeConstraint::unconstrained(ty),
            properties: vec![],
            index,
            auto_increment,
            dictionary_id: None,
        }
    }

    /// Two columns created back to back can be read back by id with their
    /// name, type and auto-increment flag intact.
    #[test]
    fn test_create_column() {
        let mut txn = create_test_admin_transaction();
        ensure_test_table(&mut txn);

        CatalogStore::create_column(
            &mut txn,
            TableId(1),
            column_spec("col_1", Type::Boolean, ColumnIndex(0), false),
        )
        .unwrap();
        CatalogStore::create_column(
            &mut txn,
            TableId(1),
            column_spec("col_2", Type::Int2, ColumnIndex(1), false),
        )
        .unwrap();

        let first = CatalogStore::get_column(&mut Transaction::Admin(&mut txn), ColumnId(16385)).unwrap();
        assert_eq!(first.id, 16385);
        assert_eq!(first.name, "col_1");
        assert_eq!(first.constraint.get_type(), Type::Boolean);
        assert!(!first.auto_increment);

        let second = CatalogStore::get_column(&mut Transaction::Admin(&mut txn), ColumnId(16386)).unwrap();
        assert_eq!(second.id, 16386);
        assert_eq!(second.name, "col_2");
        assert_eq!(second.constraint.get_type(), Type::Int2);
        assert!(!second.auto_increment);
    }

    /// An integer column may be created with auto-increment enabled.
    #[test]
    fn test_create_column_with_auto_increment() {
        let mut txn = create_test_admin_transaction();
        ensure_test_table(&mut txn);

        CatalogStore::create_column(
            &mut txn,
            TableId(1),
            column_spec("id", Type::Uint8, ColumnIndex(0), true),
        )
        .unwrap();

        let stored = CatalogStore::get_column(&mut Transaction::Admin(&mut txn), ColumnId(16385)).unwrap();
        assert_eq!(stored.id, ColumnId(16385));
        assert_eq!(stored.name, "id");
        assert_eq!(stored.constraint.get_type(), Type::Uint8);
        assert!(stored.auto_increment);
    }

    /// Auto-increment on a non-integer type is rejected with `CA_006`.
    #[test]
    fn test_auto_increment_invalid_type() {
        let mut txn = create_test_admin_transaction();
        ensure_test_table(&mut txn);

        // The first case also pins the diagnostic message.
        let err = CatalogStore::create_column(
            &mut txn,
            TableId(1),
            column_spec("name", Type::Utf8, ColumnIndex(0), true),
        )
        .unwrap_err();
        let diagnostic = err.diagnostic();
        assert_eq!(diagnostic.code, "CA_006");
        assert!(diagnostic.message.contains("auto increment is not supported for type"));

        // The remaining cases only pin the diagnostic code.
        let rejected = vec![("is_active", Type::Boolean), ("price", Type::Float8)];
        for (name, ty) in rejected {
            let err = CatalogStore::create_column(
                &mut txn,
                TableId(1),
                column_spec(name, ty, ColumnIndex(0), true),
            )
            .unwrap_err();
            assert_eq!(err.diagnostic().code, "CA_006");
        }
    }

    /// Creating a second column with an existing name fails with `CA_005`.
    #[test]
    fn test_column_already_exists() {
        let mut txn = create_test_admin_transaction();
        ensure_test_table(&mut txn);

        CatalogStore::create_column(
            &mut txn,
            TableId(1),
            column_spec("col_1", Type::Boolean, ColumnIndex(0), false),
        )
        .unwrap();

        let err = CatalogStore::create_column(
            &mut txn,
            TableId(1),
            column_spec("col_1", Type::Boolean, ColumnIndex(1), false),
        )
        .unwrap_err();
        let diagnostic = err.diagnostic();
        assert_eq!(diagnostic.code, "CA_005");
    }
}