use reifydb_catalog::{
catalog::table::TableColumnToCreate,
error::{CatalogError, CatalogObjectKind},
};
use reifydb_transaction::transaction::Transaction;
use reifydb_type::{
fragment::Fragment,
value::constraint::{Constraint, TypeConstraint},
};
use crate::{
Result,
ast::ast::{AstColumnProperty, AstCreateTable, AstType},
convert_data_type_with_constraints,
diagnostic::AstError,
plan::logical::{Compiler, CreateTableNode, LogicalPlan},
};
impl<'bump> Compiler<'bump> {
pub(crate) fn compile_create_table(
&self,
ast: AstCreateTable<'bump>,
tx: &mut Transaction<'_>,
) -> Result<LogicalPlan<'bump>> {
let mut columns: Vec<TableColumnToCreate> = vec![];
let table_ns_segments: Vec<&str> = ast.table.namespace.iter().map(|n| n.text()).collect();
for col in ast.columns.into_iter() {
let column_name = col.name.text().to_string();
let mut constraint = match &col.ty {
AstType::Qualified {
namespace,
name,
} => {
let ns_name = namespace.text();
let type_name = name.text();
let ns = self.catalog.find_namespace_by_segments(tx, &[ns_name])?;
let sumtype = ns
.and_then(|ns| {
self.catalog
.find_sumtype_by_name(tx, ns.id(), type_name)
.transpose()
})
.transpose()?;
match sumtype {
Some(def) => TypeConstraint::sumtype(def.id),
None => {
return Err(CatalogError::NotFound {
kind: CatalogObjectKind::Enum,
namespace: ns_name.to_string(),
name: type_name.to_string(),
fragment: Fragment::merge_all([
namespace.to_owned(),
name.to_owned(),
]),
}
.into());
}
}
}
_ => match convert_data_type_with_constraints(&col.ty) {
Ok(c) => c,
Err(_) => {
return Err(AstError::UnrecognizedType {
fragment: col.ty.name_fragment().to_owned(),
}
.into());
}
},
};
let column_type = constraint.get_type();
let name = col.name.to_owned();
let ty_fragment = col.ty.name_fragment().to_owned();
let fragment = Fragment::merge_all([name.clone(), ty_fragment]);
let mut auto_increment = false;
let mut dictionary_id = None;
let properties = vec![];
for property in &col.properties {
match property {
AstColumnProperty::AutoIncrement => auto_increment = true,
AstColumnProperty::Dictionary(dict_ident) => {
let dict_ns_segments: Vec<&str> = if dict_ident.namespace.is_empty() {
table_ns_segments.clone()
} else {
dict_ident.namespace.iter().map(|n| n.text()).collect()
};
let dict_name = dict_ident.name.text();
let Some(namespace) = self
.catalog
.find_namespace_by_segments(tx, &dict_ns_segments)?
else {
return Err(CatalogError::NotFound {
kind: CatalogObjectKind::Dictionary,
namespace: dict_ns_segments.join("::"),
name: dict_name.to_string(),
fragment: dict_ident.name.to_owned(),
}
.into());
};
let Some(dictionary) = self.catalog.find_dictionary_by_name(
tx,
namespace.id(),
dict_name,
)?
else {
return Err(CatalogError::NotFound {
kind: CatalogObjectKind::Dictionary,
namespace: dict_ns_segments.join("::"),
name: dict_name.to_string(),
fragment: dict_ident.name.to_owned(),
}
.into());
};
if column_type != dictionary.value_type {
return Err(CatalogError::DictionaryTypeMismatch {
column: column_name.clone(),
column_type,
dictionary: dict_name.to_string(),
dictionary_value_type: dictionary.value_type,
fragment: col.name.to_owned(),
}
.into());
}
dictionary_id = Some(dictionary.id);
constraint = TypeConstraint::with_constraint(
constraint.get_type(),
Constraint::Dictionary(dictionary.id, dictionary.id_type),
);
}
AstColumnProperty::Saturation(_) => {
}
AstColumnProperty::Default(_) => {
}
}
}
columns.push(TableColumnToCreate {
name,
fragment,
constraint,
properties,
auto_increment,
dictionary_id,
});
}
let table = ast.table;
let ttl = ast.ttl.map(Self::compile_row_ttl).transpose()?;
Ok(LogicalPlan::CreateTable(CreateTableNode {
table,
if_not_exists: ast.if_not_exists,
columns,
ttl,
}))
}
}