use anyhow::Result;
use serde::Deserialize;
use std::collections::HashMap;
use uni_common::Value;
use uni_common::{
UniError,
core::schema::{
Constraint, ConstraintTarget, ConstraintType, DataType, DistanceMetric, EmbeddingConfig,
IndexDefinition, ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
validate_identifier,
},
};
use uni_store::storage::StorageManager;
/// Configuration payload deserialized from the `config_val` argument of
/// `create_label` and `create_edge_type`.
///
/// Every field carries `#[serde(default)]`, so an input that omits a field
/// still deserializes and that field takes its `Default` value.
#[derive(Deserialize)]
struct LabelConfig {
/// Property definitions, keyed by property name.
#[serde(default)]
properties: HashMap<String, PropertyConfig>,
/// Index definitions. `create_label` creates one index per entry;
/// `create_edge_type` does not read this field.
#[serde(default)]
indexes: Vec<IndexConfig>,
/// Constraint definitions, applied by both `create_label` and
/// `create_edge_type`.
#[serde(default)]
constraints: Vec<ConstraintConfig>,
/// Optional description forwarded to the schema manager when the label or
/// edge type is added.
#[serde(default)]
pub description: Option<String>,
}
/// Definition of a single property inside a `LabelConfig`.
#[derive(Deserialize)]
struct PropertyConfig {
/// Type name read from the `type` key; converted with `parse_data_type`.
#[serde(rename = "type")]
data_type: String,
/// Whether the property may be null. Defaults to `true` (see
/// `default_nullable`) when the key is absent.
#[serde(default = "default_nullable")]
nullable: bool,
/// Optional description forwarded to the schema manager together with the
/// property.
#[serde(default)]
pub description: Option<String>,
}
/// Serde default for `PropertyConfig::nullable`: a property is nullable unless
/// the configuration says otherwise.
fn default_nullable() -> bool {
    const NULLABLE_BY_DEFAULT: bool = true;
    NULLABLE_BY_DEFAULT
}
/// Definition of a single index, as consumed by `create_index_internal`.
#[derive(Deserialize)]
struct IndexConfig {
/// Property to index. `create_index_internal` fails when this is `None`;
/// `create_index` overwrites it with its own `property` argument.
property: Option<String>,
/// Index kind read from the `type` key. Compared after upper-casing against
/// `VECTOR`, `SCALAR`/`BTREE`, `BITMAP`, `LABEL_LIST`/`LABELLIST` and
/// `INVERTED`.
#[serde(rename = "type")]
index_type: String,
/// Accepted in the input but never read by this file, hence the
/// `dead_code` expectation.
#[expect(dead_code)]
dimensions: Option<usize>,
/// Distance metric for vector indexes: `cosine` (used when absent),
/// `l2`/`euclidean` or `dot`.
metric: Option<String>,
/// Vector index algorithm name; `hnsw` is assumed when absent.
algorithm: Option<String>,
/// Partition count. The IVF variants fall back to 256 when absent; the HNSW
/// variants receive the optional value unchanged.
partitions: Option<u32>,
/// HNSW `m` parameter; falls back to 16 when absent.
m: Option<u32>,
/// HNSW `ef_construction` parameter; falls back to 200 when absent.
ef_construction: Option<u32>,
/// Number of sub-vectors for the PQ variants; falls back to 16 when absent.
sub_vectors: Option<u32>,
/// Bit width: `ivf_pq` falls back to 8 when absent, `ivf_rq` receives the
/// optional value unchanged.
num_bits: Option<u8>,
/// Optional embedding settings attached to a vector index.
embedding: Option<EmbeddingOptions>,
/// Explicit index name. When absent the name is built as
/// `{label}_{property}_{type lower-cased}`.
name: Option<String>,
}
/// Embedding settings of a vector index; copied field by field into an
/// `EmbeddingConfig` by `create_index_internal`.
#[derive(Deserialize)]
struct EmbeddingOptions {
/// Copied to `EmbeddingConfig::alias`.
/// NOTE(review): presumably identifies the embedding model — confirm.
alias: String,
/// Copied to `EmbeddingConfig::source_properties`.
source: Vec<String>,
/// Copied to `EmbeddingConfig::batch_size`; defaults to 32 (see
/// `default_batch_size`) when the key is absent.
#[serde(default = "default_batch_size")]
batch_size: usize,
/// Copied to `EmbeddingConfig::document_prefix`; `None` when absent.
#[serde(default)]
document_prefix: Option<String>,
/// Copied to `EmbeddingConfig::query_prefix`; `None` when absent.
#[serde(default)]
query_prefix: Option<String>,
}
/// Serde default for `EmbeddingOptions::batch_size`.
fn default_batch_size() -> usize {
    const DEFAULT_EMBEDDING_BATCH_SIZE: usize = 32;
    DEFAULT_EMBEDDING_BATCH_SIZE
}
/// Definition of a single constraint, as consumed by
/// `create_constraint_internal`.
#[derive(Deserialize)]
struct ConstraintConfig {
/// Constraint kind read from the `type` key. Compared after upper-casing
/// against `UNIQUE` and `EXISTS`.
#[serde(rename = "type")]
constraint_type: String,
/// Properties the constraint covers. `EXISTS` requires exactly one entry.
properties: Vec<String>,
/// Explicit constraint name. When absent the name is built as
/// `{target}_{type lower-cased}_{properties joined with '_'}`.
name: Option<String>,
}
/// Creates a vertex label together with its properties, indexes and
/// constraints, then saves the schema.
///
/// `config_val` is converted to JSON and deserialized into a `LabelConfig`.
///
/// Everything that can be checked without touching the schema (property
/// identifiers, property type names, presence of a property on every index
/// definition) is validated *before* the label is added. Otherwise a typo in
/// the configuration would leave a half-built label in the in-memory schema
/// and a corrected retry would be rejected with `LabelAlreadyExists`.
///
/// # Errors
///
/// * `name` or a property name is rejected by `validate_identifier`.
/// * `UniError::LabelAlreadyExists` when the schema already contains `name`.
/// * `UniError::InvalidArgument` when the configuration cannot be
///   deserialized, a property type is unknown, or an index definition has no
///   `property`.
/// * Any error reported by the schema manager, by index/constraint creation or
///   by the final save.
///
/// Returns `Ok(true)` on success.
pub async fn create_label(
    storage: &StorageManager,
    name: &str,
    config_val: &Value,
) -> Result<bool> {
    validate_identifier(name)?;
    if storage.schema_manager().schema().labels.contains_key(name) {
        return Err(UniError::LabelAlreadyExists {
            label: name.to_string(),
        }
        .into());
    }

    let json_val: serde_json::Value = config_val.clone().into();
    let config: LabelConfig =
        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
            arg: "config".to_string(),
            message: e.to_string(),
        })?;

    // Phase 1: pure validation, no schema mutation yet.
    let mut properties = Vec::with_capacity(config.properties.len());
    for (prop_name, prop_config) in config.properties {
        validate_identifier(&prop_name)?;
        let data_type = parse_data_type(&prop_config.data_type)?;
        properties.push((
            prop_name,
            data_type,
            prop_config.nullable,
            prop_config.description,
        ));
    }
    if config.indexes.iter().any(|idx| idx.property.is_none()) {
        return Err(UniError::InvalidArgument {
            arg: "indexes".into(),
            message: "Property name required for index definition".into(),
        }
        .into());
    }

    // Phase 2: apply the validated definition to the schema.
    storage
        .schema_manager()
        .add_label_with_desc(name, config.description)?;
    for (prop_name, data_type, nullable, description) in properties {
        storage.schema_manager().add_property_with_desc(
            name,
            &prop_name,
            data_type,
            nullable,
            description,
        )?;
    }
    // NOTE(review): index and constraint creation can still fail after the
    // label was added; rolling that back needs schema-manager support that is
    // not visible from this file.
    for idx in config.indexes {
        create_index_internal(storage, name, &idx).await?;
    }
    for c in config.constraints {
        create_constraint_internal(storage, name, &c, true).await?;
    }

    storage.schema_manager().save().await?;
    Ok(true)
}
/// Creates an edge type between `src_labels` and `dst_labels` together with
/// its properties and constraints, then saves the schema.
///
/// `config_val` is converted to JSON and deserialized into a `LabelConfig`.
/// Only `description`, `properties` and `constraints` are used; the `indexes`
/// section of the configuration is not applied to edge types.
///
/// Property identifiers and type names are validated *before* the edge type is
/// added, so a typo in the configuration does not leave a half-built edge type
/// in the in-memory schema.
///
/// NOTE(review): unlike `create_label` there is no explicit "already exists"
/// check here; presumably `add_edge_type_with_desc` handles that — confirm.
///
/// # Errors
///
/// * `name` or a property name is rejected by `validate_identifier`.
/// * `UniError::InvalidArgument` when the configuration cannot be
///   deserialized or a property type is unknown.
/// * Any error reported by the schema manager, by constraint creation or by
///   the final save.
///
/// Returns `Ok(true)` on success.
pub async fn create_edge_type(
    storage: &StorageManager,
    name: &str,
    src_labels: Vec<String>,
    dst_labels: Vec<String>,
    config_val: &Value,
) -> Result<bool> {
    validate_identifier(name)?;

    let json_val: serde_json::Value = config_val.clone().into();
    let config: LabelConfig =
        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
            arg: "config".to_string(),
            message: e.to_string(),
        })?;

    // Phase 1: pure validation, no schema mutation yet.
    let mut properties = Vec::with_capacity(config.properties.len());
    for (prop_name, prop_config) in config.properties {
        validate_identifier(&prop_name)?;
        let data_type = parse_data_type(&prop_config.data_type)?;
        properties.push((
            prop_name,
            data_type,
            prop_config.nullable,
            prop_config.description,
        ));
    }

    // Phase 2: apply the validated definition to the schema.
    storage.schema_manager().add_edge_type_with_desc(
        name,
        src_labels,
        dst_labels,
        config.description,
    )?;
    for (prop_name, data_type, nullable, description) in properties {
        storage.schema_manager().add_property_with_desc(
            name,
            &prop_name,
            data_type,
            nullable,
            description,
        )?;
    }
    // `false`: the constraints target an edge type, not a label.
    for c in config.constraints {
        create_constraint_internal(storage, name, &c, false).await?;
    }

    storage.schema_manager().save().await?;
    Ok(true)
}
/// Creates a single index on `label.property` and saves the schema.
///
/// `config_val` is converted to JSON and deserialized into an `IndexConfig`.
/// Whatever `property` the configuration itself carries is replaced by the
/// `property` argument.
///
/// # Errors
///
/// Returns `UniError::InvalidArgument` (argument `config`) when the
/// configuration cannot be deserialized, and forwards any error from
/// `create_index_internal` or from saving the schema.
///
/// Returns `Ok(true)` on success.
pub async fn create_index(
    storage: &StorageManager,
    label: &str,
    property: &str,
    config_val: &Value,
) -> Result<bool> {
    let raw: serde_json::Value = config_val.clone().into();
    let mut index_config: IndexConfig = match serde_json::from_value(raw) {
        Ok(parsed) => parsed,
        Err(err) => {
            return Err(UniError::InvalidArgument {
                arg: "config".to_string(),
                message: err.to_string(),
            }
            .into());
        }
    };

    // The explicit argument always wins over the payload.
    index_config.property = Some(property.to_owned());

    create_index_internal(storage, label, &index_config).await?;
    storage.schema_manager().save().await?;
    Ok(true)
}
/// Creates a constraint of kind `constraint_type` over `properties` on the
/// label `label` and saves the schema.
///
/// No explicit name is supplied, so `create_constraint_internal` derives one.
/// The constraint always targets a label (never an edge type).
///
/// # Errors
///
/// Forwards any error from `create_constraint_internal` or from saving the
/// schema.
///
/// Returns `Ok(true)` on success.
pub async fn create_constraint(
    storage: &StorageManager,
    label: &str,
    constraint_type: &str,
    properties: Vec<String>,
) -> Result<bool> {
    let spec = ConstraintConfig {
        name: None,
        constraint_type: constraint_type.to_owned(),
        properties,
    };
    let targets_label = true;
    create_constraint_internal(storage, label, &spec, targets_label).await?;
    storage.schema_manager().save().await?;
    Ok(true)
}
/// Drops the label `name` through the schema manager (passing `true` as the
/// second argument of `drop_label`) and saves the schema.
///
/// Returns `Ok(true)` on success; any schema-manager error is forwarded.
pub async fn drop_label(storage: &StorageManager, name: &str) -> Result<bool> {
    let schema = storage.schema_manager();
    schema.drop_label(name, true)?;
    schema.save().await?;
    Ok(true)
}
/// Drops the edge type `name` through the schema manager (passing `true` as
/// the second argument of `drop_edge_type`) and saves the schema.
///
/// Returns `Ok(true)` on success; any schema-manager error is forwarded.
pub async fn drop_edge_type(storage: &StorageManager, name: &str) -> Result<bool> {
    let schema = storage.schema_manager();
    schema.drop_edge_type(name, true)?;
    schema.save().await?;
    Ok(true)
}
/// Removes the index called `name` from the schema and saves the schema.
///
/// Returns `Ok(true)` on success; any schema-manager error is forwarded.
pub async fn drop_index(storage: &StorageManager, name: &str) -> Result<bool> {
    let schema = storage.schema_manager();
    schema.remove_index(name)?;
    schema.save().await?;
    Ok(true)
}
/// Drops the constraint called `name` through the schema manager (passing
/// `true` as the second argument of `drop_constraint`) and saves the schema.
///
/// Returns `Ok(true)` on success; any schema-manager error is forwarded.
pub async fn drop_constraint(storage: &StorageManager, name: &str) -> Result<bool> {
    let schema = storage.schema_manager();
    schema.drop_constraint(name, true)?;
    schema.save().await?;
    Ok(true)
}
async fn create_index_internal(
storage: &StorageManager,
label: &str,
config: &IndexConfig,
) -> Result<()> {
let prop_name = config
.property
.as_ref()
.ok_or_else(|| UniError::InvalidArgument {
arg: "property".into(),
message: "Property is missing".into(),
})?;
let index_name = config.name.clone().unwrap_or_else(|| {
format!(
"{}_{}_{}",
label,
prop_name,
config.index_type.to_lowercase()
)
});
let def = match config.index_type.to_uppercase().as_str() {
"VECTOR" => {
let metric = match config.metric.as_deref().unwrap_or("cosine") {
"cosine" => DistanceMetric::Cosine,
"l2" | "euclidean" => DistanceMetric::L2,
"dot" => DistanceMetric::Dot,
_ => {
return Err(UniError::InvalidArgument {
arg: "metric".into(),
message: "Invalid metric".into(),
}
.into());
}
};
let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
alias: emb.alias.clone(),
source_properties: emb.source.clone(),
batch_size: emb.batch_size,
document_prefix: emb.document_prefix.clone(),
query_prefix: emb.query_prefix.clone(),
});
let algorithm = config.algorithm.as_deref().unwrap_or("hnsw");
let index_type = match algorithm.to_lowercase().as_str() {
"flat" => VectorIndexType::Flat,
"ivf_flat" => VectorIndexType::IvfFlat {
num_partitions: config.partitions.unwrap_or(256),
},
"ivf_pq" => VectorIndexType::IvfPq {
num_partitions: config.partitions.unwrap_or(256),
num_sub_vectors: config.sub_vectors.unwrap_or(16),
bits_per_subvector: config.num_bits.unwrap_or(8),
},
"ivf_sq" => VectorIndexType::IvfSq {
num_partitions: config.partitions.unwrap_or(256),
},
"ivf_rq" => VectorIndexType::IvfRq {
num_partitions: config.partitions.unwrap_or(256),
num_bits: config.num_bits,
},
"hnsw_flat" => VectorIndexType::HnswFlat {
m: config.m.unwrap_or(16),
ef_construction: config.ef_construction.unwrap_or(200),
num_partitions: config.partitions,
},
"hnsw_pq" => VectorIndexType::HnswPq {
m: config.m.unwrap_or(16),
ef_construction: config.ef_construction.unwrap_or(200),
num_sub_vectors: config.sub_vectors.unwrap_or(16),
num_partitions: config.partitions,
},
_ => VectorIndexType::HnswSq {
m: config.m.unwrap_or(16),
ef_construction: config.ef_construction.unwrap_or(200),
num_partitions: config.partitions,
},
};
IndexDefinition::Vector(VectorIndexConfig {
name: index_name,
label: label.to_string(),
property: prop_name.clone(),
index_type,
metric,
embedding_config,
metadata: Default::default(),
})
}
"SCALAR" | "BTREE" => IndexDefinition::Scalar(ScalarIndexConfig {
name: index_name,
label: label.to_string(),
properties: vec![prop_name.clone()],
index_type: ScalarIndexType::BTree,
where_clause: None,
metadata: Default::default(),
}),
"BITMAP" => IndexDefinition::Scalar(ScalarIndexConfig {
name: index_name,
label: label.to_string(),
properties: vec![prop_name.clone()],
index_type: ScalarIndexType::Bitmap,
where_clause: None,
metadata: Default::default(),
}),
"LABEL_LIST" | "LABELLIST" => IndexDefinition::Scalar(ScalarIndexConfig {
name: index_name,
label: label.to_string(),
properties: vec![prop_name.clone()],
index_type: ScalarIndexType::LabelList,
where_clause: None,
metadata: Default::default(),
}),
"INVERTED" => IndexDefinition::Inverted(uni_common::core::schema::InvertedIndexConfig {
name: index_name,
label: label.to_string(),
property: prop_name.clone(),
normalize: true,
max_terms_per_doc: 10_000,
metadata: Default::default(),
}),
_ => {
return Err(UniError::InvalidArgument {
arg: "type".into(),
message: format!("Unsupported index type: {}", config.index_type),
}
.into());
}
};
storage.schema_manager().add_index(def.clone())?;
let count = if let Ok(ds) = storage.vertex_dataset(label) {
if let Ok(raw) = ds.open_raw().await {
raw.count_rows(None).await.unwrap_or(0)
} else {
0
}
} else {
0
};
tracing::debug!("create_index_internal count for {}: {}", label, count);
if count > 0 {
let idx_mgr = storage.index_manager();
match def {
IndexDefinition::Vector(cfg) => {
idx_mgr.create_vector_index(cfg).await?;
}
IndexDefinition::Scalar(cfg) => {
idx_mgr.create_scalar_index(cfg).await?;
}
IndexDefinition::Inverted(cfg) => {
idx_mgr.create_inverted_index(cfg).await?;
}
IndexDefinition::FullText(cfg) => {
idx_mgr.create_fts_index(cfg).await?;
}
IndexDefinition::JsonFullText(cfg) => {
idx_mgr.create_json_fts_index(cfg).await?;
}
_ => {}
}
}
Ok(())
}
/// Builds a `Constraint` from `config` and registers it with the schema
/// manager.
///
/// * `target_name` — the label or edge type the constraint applies to.
/// * `is_label` — `true` targets a label, `false` an edge type.
///
/// The constraint kind is matched case-insensitively: `UNIQUE` covers all
/// listed properties, `EXISTS` requires exactly one property. When
/// `config.name` is absent the name is
/// `{target_name}_{type lower-cased}_{properties joined with '_'}`.
/// The constraint is created enabled.
///
/// This function does not save the schema; callers do that afterwards.
///
/// # Errors
///
/// Returns `UniError::InvalidArgument` for an unsupported kind or for an
/// `EXISTS` constraint whose property count is not one; schema-manager errors
/// are forwarded.
async fn create_constraint_internal(
    storage: &StorageManager,
    target_name: &str,
    config: &ConstraintConfig,
    is_label: bool,
) -> Result<()> {
    let kind = config.constraint_type.to_uppercase();
    let constraint_type = match (kind.as_str(), config.properties.as_slice()) {
        ("UNIQUE", props) => ConstraintType::Unique {
            properties: props.to_vec(),
        },
        ("EXISTS", [only]) => ConstraintType::Exists {
            property: only.clone(),
        },
        ("EXISTS", _) => {
            return Err(UniError::InvalidArgument {
                arg: "properties".into(),
                message: "EXISTS constraint requires exactly one property".into(),
            }
            .into());
        }
        _ => {
            return Err(UniError::InvalidArgument {
                arg: "type".into(),
                message: format!("Unsupported constraint type: {}", config.constraint_type),
            }
            .into());
        }
    };

    let name = match &config.name {
        Some(explicit) => explicit.clone(),
        None => format!(
            "{}_{}_{}",
            target_name,
            config.constraint_type.to_lowercase(),
            config.properties.join("_")
        ),
    };

    let owner = target_name.to_string();
    let target = if is_label {
        ConstraintTarget::Label(owner)
    } else {
        ConstraintTarget::EdgeType(owner)
    };

    storage.schema_manager().add_constraint(Constraint {
        name,
        constraint_type,
        target,
        enabled: true,
    })?;
    Ok(())
}
/// Parses a textual type name into a `DataType`.
///
/// Surrounding whitespace is ignored and names are case-insensitive.
/// `LIST<T>` is parsed recursively, so nested lists such as
/// `LIST<LIST<INT>>` are accepted. `VECTOR` yields a vector type with
/// `dimensions: 0`.
///
/// # Errors
///
/// Returns `UniError::InvalidArgument` (argument `type`) for an unknown type
/// name, including an unknown or empty list element type.
fn parse_data_type(s: &str) -> Result<DataType> {
    const LIST_PREFIX: &str = "LIST<";

    let s = s.trim();

    // Detect the `LIST<` prefix on the original bytes. Upper-casing first and
    // then slicing the original string is unsafe: Unicode case mapping can
    // change byte lengths (e.g. `ı` -> `I`, `ﬆ` -> `ST`), so offset 5 may fall
    // inside a multi-byte character and the slice would panic. `str::get`
    // returns `None` instead when the cut is not on a char boundary.
    let list_inner = s
        .get(..LIST_PREFIX.len())
        .filter(|prefix| prefix.eq_ignore_ascii_case(LIST_PREFIX))
        .and_then(|_| s[LIST_PREFIX.len()..].strip_suffix('>'));
    if let Some(inner) = list_inner {
        let inner_type = parse_data_type(inner)?;
        return Ok(DataType::List(Box::new(inner_type)));
    }

    match s.to_uppercase().as_str() {
        "STRING" | "UTF8" => Ok(DataType::String),
        "INT" | "INTEGER" | "INT64" => Ok(DataType::Int64),
        "INT32" => Ok(DataType::Int32),
        "FLOAT" | "FLOAT64" | "DOUBLE" => Ok(DataType::Float64),
        "FLOAT32" => Ok(DataType::Float32),
        "BOOL" | "BOOLEAN" => Ok(DataType::Bool),
        "DATETIME" => Ok(DataType::DateTime),
        "DATE" => Ok(DataType::Date),
        "BTIC" => Ok(DataType::Btic),
        "VECTOR" => Ok(DataType::Vector { dimensions: 0 }),
        _ => Err(UniError::InvalidArgument {
            arg: "type".into(),
            message: format!("Unknown data type: {}", s),
        }
        .into()),
    }
}