1use anyhow::Result;
5use serde::Deserialize;
6use std::collections::HashMap;
7use uni_common::Value;
8use uni_common::{
9 UniError,
10 core::schema::{
11 Constraint, ConstraintTarget, ConstraintType, DataType, EmbeddingConfig, IndexDefinition,
12 ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, validate_identifier,
13 },
14};
15use uni_store::storage::StorageManager;
16
/// Configuration accepted by [`create_label`] and [`create_edge_type`], deserialized from the
/// caller-supplied `config` value. Every field has a serde default, so an empty object is valid.
#[derive(Deserialize)]
struct LabelConfig {
    /// Property name -> property definition. Each name is checked with `validate_identifier`
    /// and each type string is parsed by `parse_data_type` when the label/edge type is created.
    #[serde(default)]
    properties: HashMap<String, PropertyConfig>,
    /// Index definitions; each entry must name a `property`. Only `create_label` reads this
    /// field — `create_edge_type` does not.
    #[serde(default)]
    indexes: Vec<IndexConfig>,
    /// Constraint definitions applied to the new label or edge type.
    #[serde(default)]
    constraints: Vec<ConstraintConfig>,
    /// Optional human-readable description, passed to the schema manager with the name.
    #[serde(default)]
    pub description: Option<String>,
}
28
/// Definition of a single property inside [`LabelConfig::properties`].
#[derive(Deserialize)]
struct PropertyConfig {
    /// Type name as written by the user (JSON key `type`), e.g. `STRING` or `LIST<INT>`;
    /// converted with `parse_data_type`.
    #[serde(rename = "type")]
    data_type: String,
    /// Whether the property accepts nulls; `true` when the key is omitted.
    #[serde(default = "default_nullable")]
    nullable: bool,
    /// Optional human-readable description forwarded to the schema manager.
    #[serde(default)]
    pub description: Option<String>,
}
38
/// Serde default for `PropertyConfig::nullable`: a property is nullable unless the config
/// explicitly says otherwise.
fn default_nullable() -> bool {
    const NULLABLE_UNLESS_SPECIFIED: bool = true;
    NULLABLE_UNLESS_SPECIFIED
}
42
/// An index definition, either embedded in [`LabelConfig::indexes`] or passed to
/// [`create_index`] (which overwrites `property` with its explicit argument).
#[derive(Deserialize)]
struct IndexConfig {
    /// Indexed property. Required by `create_index_internal`; `create_label` rejects entries
    /// without it.
    property: Option<String>,
    /// Index kind (JSON key `type`), matched case-insensitively in `create_index_internal`.
    #[serde(rename = "type")]
    index_type: String,
    // Accepted in the config but never read by this module. For SPARSE indexes the dimension
    // count is taken from the property's schema type instead.
    #[expect(dead_code)]
    dimensions: Option<usize>,
    /// VECTOR only: metric name handed to `vector_index_opts::parse_vector_metric`.
    metric: Option<String>,
    /// VECTOR only: algorithm name, forwarded as `VectorIndexOpts::type_name`.
    algorithm: Option<String>,
    // VECTOR only: the following tuning knobs are forwarded unchanged to
    // `vector_index_opts::VectorIndexOpts`; their meaning is defined there.
    partitions: Option<u32>,
    m: Option<u32>,
    ef_construction: Option<u32>,
    sub_vectors: Option<u32>,
    num_bits: Option<u8>,
    k_sim: Option<u32>,
    reps: Option<u32>,
    d_proj: Option<u32>,
    seed: Option<u64>,
    inner: Option<String>,
    /// VECTOR / SPARSE: optional embedding settings converted into `EmbeddingConfig`.
    embedding: Option<EmbeddingOptions>,
    /// SPARSE only: defaults to `true` when omitted.
    quantize: Option<bool>,
    /// Explicit index name; otherwise `{label}_{property}_{type}` (type lowercased).
    name: Option<String>,
}
70
/// Embedding settings attached to a VECTOR or SPARSE index; copied field-by-field into the
/// schema's `EmbeddingConfig`.
#[derive(Deserialize)]
struct EmbeddingOptions {
    /// Copied to `EmbeddingConfig::alias`.
    alias: String,
    /// Property names copied to `EmbeddingConfig::source_properties`.
    source: Vec<String>,
    /// Copied to `EmbeddingConfig::batch_size`; 32 when omitted.
    #[serde(default = "default_batch_size")]
    batch_size: usize,
    /// Optional prefix copied to `EmbeddingConfig::document_prefix`.
    #[serde(default)]
    document_prefix: Option<String>,
    /// Optional prefix copied to `EmbeddingConfig::query_prefix`.
    #[serde(default)]
    query_prefix: Option<String>,
}
83
/// Serde default for `EmbeddingOptions::batch_size` when the config omits it.
fn default_batch_size() -> usize {
    const DEFAULT_EMBEDDING_BATCH: usize = 32;
    DEFAULT_EMBEDDING_BATCH
}
87
/// A constraint definition, either embedded in [`LabelConfig::constraints`] or built by
/// [`create_constraint`].
#[derive(Deserialize)]
struct ConstraintConfig {
    /// Constraint kind (JSON key `type`), matched case-insensitively: `UNIQUE` or `EXISTS`.
    #[serde(rename = "type")]
    constraint_type: String,
    /// Properties covered by the constraint (required key — no serde default). `EXISTS`
    /// requires exactly one.
    properties: Vec<String>,
    /// Explicit constraint name; otherwise `{target}_{type}_{props joined by '_'}`.
    name: Option<String>,
}
95
96pub async fn create_label(
97 storage: &StorageManager,
98 name: &str,
99 config_val: &Value,
100) -> Result<bool> {
101 validate_identifier(name)?;
102
103 if storage.schema_manager().schema().labels.contains_key(name) {
104 return Err(UniError::LabelAlreadyExists {
105 label: name.to_string(),
106 }
107 .into());
108 }
109
110 let json_val: serde_json::Value = config_val.clone().into();
111 let config: LabelConfig =
112 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
113 arg: "config".to_string(),
114 message: e.to_string(),
115 })?;
116
117 storage
119 .schema_manager()
120 .add_label_with_desc(name, config.description)?;
121
122 for (prop_name, prop_config) in config.properties {
124 validate_identifier(&prop_name)?;
125 let data_type = parse_data_type(&prop_config.data_type)?;
126 storage.schema_manager().add_property_with_desc(
127 name,
128 &prop_name,
129 data_type,
130 prop_config.nullable,
131 prop_config.description,
132 )?;
133 }
134
135 for idx in config.indexes {
137 if idx.property.is_none() {
138 return Err(UniError::InvalidArgument {
139 arg: "indexes".into(),
140 message: "Property name required for index definition".into(),
141 }
142 .into());
143 }
144 create_index_internal(storage, name, &idx).await?;
145 }
146
147 for c in config.constraints {
149 create_constraint_internal(storage, name, &c, true).await?;
150 }
151
152 storage.schema_manager().save().await?;
153 Ok(true)
154}
155
156pub async fn create_edge_type(
157 storage: &StorageManager,
158 name: &str,
159 src_labels: Vec<String>,
160 dst_labels: Vec<String>,
161 config_val: &Value,
162) -> Result<bool> {
163 validate_identifier(name)?;
164
165 let json_val: serde_json::Value = config_val.clone().into();
166 let config: LabelConfig =
167 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
168 arg: "config".to_string(),
169 message: e.to_string(),
170 })?;
171
172 storage.schema_manager().add_edge_type_with_desc(
173 name,
174 src_labels,
175 dst_labels,
176 config.description,
177 )?;
178
179 for (prop_name, prop_config) in config.properties {
180 validate_identifier(&prop_name)?;
181 let data_type = parse_data_type(&prop_config.data_type)?;
182 storage.schema_manager().add_property_with_desc(
183 name,
184 &prop_name,
185 data_type,
186 prop_config.nullable,
187 prop_config.description,
188 )?;
189 }
190
191 for c in config.constraints {
193 create_constraint_internal(storage, name, &c, false).await?;
194 }
195
196 storage.schema_manager().save().await?;
197 Ok(true)
198}
199
200pub async fn create_index(
201 storage: &StorageManager,
202 label: &str,
203 property: &str,
204 config_val: &Value,
205) -> Result<bool> {
206 let json_val: serde_json::Value = config_val.clone().into();
207 let mut config: IndexConfig =
208 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
209 arg: "config".to_string(),
210 message: e.to_string(),
211 })?;
212
213 config.property = Some(property.to_string());
215
216 create_index_internal(storage, label, &config).await?;
217 storage.schema_manager().save().await?;
218 Ok(true)
219}
220
221pub async fn create_constraint(
222 storage: &StorageManager,
223 label: &str,
224 constraint_type: &str,
225 properties: Vec<String>,
226) -> Result<bool> {
227 let config = ConstraintConfig {
228 constraint_type: constraint_type.to_string(),
229 properties,
230 name: None,
231 };
232 create_constraint_internal(storage, label, &config, true).await?;
234 storage.schema_manager().save().await?;
235 Ok(true)
236}
237
/// Removes the label `name` from the schema and persists the schema.
///
/// Returns `Ok(true)` when both the drop and the save succeed.
///
/// # Errors
///
/// Propagates errors from `SchemaManager::drop_label` and `SchemaManager::save`.
pub async fn drop_label(storage: &StorageManager, name: &str) -> Result<bool> {
    // NOTE(review): the meaning of the `true` flag is defined by `drop_label`
    // (presumably "if exists" or "cascade") — TODO confirm.
    storage.schema_manager().drop_label(name, true)?;
    storage.schema_manager().save().await?;
    Ok(true)
}
243
/// Removes the edge type `name` from the schema and persists the schema.
///
/// Returns `Ok(true)` when both the drop and the save succeed.
///
/// # Errors
///
/// Propagates errors from `SchemaManager::drop_edge_type` and `SchemaManager::save`.
pub async fn drop_edge_type(storage: &StorageManager, name: &str) -> Result<bool> {
    // NOTE(review): the meaning of the `true` flag is defined by `drop_edge_type`
    // (presumably "if exists" or "cascade") — TODO confirm.
    storage.schema_manager().drop_edge_type(name, true)?;
    storage.schema_manager().save().await?;
    Ok(true)
}
249
/// Removes the index definition named `name` from the schema and persists the schema.
///
/// Returns `Ok(true)` when both the removal and the save succeed.
///
/// # Errors
///
/// Propagates errors from `SchemaManager::remove_index` and `SchemaManager::save`.
pub async fn drop_index(storage: &StorageManager, name: &str) -> Result<bool> {
    // Only the schema entry is removed here. Whether the physical index is also dropped is
    // up to `remove_index` — TODO confirm.
    storage.schema_manager().remove_index(name)?;
    storage.schema_manager().save().await?;
    Ok(true)
}
255
/// Removes the constraint named `name` from the schema and persists the schema.
///
/// Returns `Ok(true)` when both the drop and the save succeed.
///
/// # Errors
///
/// Propagates errors from `SchemaManager::drop_constraint` and `SchemaManager::save`.
pub async fn drop_constraint(storage: &StorageManager, name: &str) -> Result<bool> {
    // NOTE(review): the meaning of the `true` flag is defined by `drop_constraint`
    // (presumably "if exists") — TODO confirm.
    storage.schema_manager().drop_constraint(name, true)?;
    storage.schema_manager().save().await?;
    Ok(true)
}
261
262async fn create_index_internal(
265 storage: &StorageManager,
266 label: &str,
267 config: &IndexConfig,
268) -> Result<()> {
269 let prop_name = config
270 .property
271 .as_ref()
272 .ok_or_else(|| UniError::InvalidArgument {
273 arg: "property".into(),
274 message: "Property is missing".into(),
275 })?;
276
277 let index_name = config.name.clone().unwrap_or_else(|| {
278 format!(
279 "{}_{}_{}",
280 label,
281 prop_name,
282 config.index_type.to_lowercase()
283 )
284 });
285
286 let def = match config.index_type.to_uppercase().as_str() {
287 "VECTOR" => {
288 let metric =
292 uni_common::vector_index_opts::parse_vector_metric(config.metric.as_deref())
293 .map_err(|e| UniError::InvalidArgument {
294 arg: "metric".into(),
295 message: e.to_string(),
296 })?;
297
298 let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
300 alias: emb.alias.clone(),
301 source_properties: emb.source.clone(),
302 batch_size: emb.batch_size,
303 document_prefix: emb.document_prefix.clone(),
304 query_prefix: emb.query_prefix.clone(),
305 });
306
307 let index_type = uni_common::vector_index_opts::build_vector_index_type(
308 &uni_common::vector_index_opts::VectorIndexOpts {
309 type_name: config.algorithm.as_deref(),
310 partitions: config.partitions,
311 m: config.m,
312 ef_construction: config.ef_construction,
313 sub_vectors: config.sub_vectors,
314 num_bits: config.num_bits,
315 k_sim: config.k_sim,
316 reps: config.reps,
317 d_proj: config.d_proj,
318 seed: config.seed,
319 inner: config.inner.as_deref(),
320 },
321 );
322
323 IndexDefinition::Vector(VectorIndexConfig {
324 name: index_name,
325 label: label.to_string(),
326 property: prop_name.clone(),
327 index_type,
328 metric,
329 embedding_config,
330 metadata: Default::default(),
331 })
332 }
333 "SCALAR" | "BTREE" => IndexDefinition::Scalar(ScalarIndexConfig {
334 name: index_name,
335 label: label.to_string(),
336 properties: vec![prop_name.clone()],
337 index_type: ScalarIndexType::BTree,
338 where_clause: None,
339 metadata: Default::default(),
340 }),
341 "BITMAP" => IndexDefinition::Scalar(ScalarIndexConfig {
342 name: index_name,
343 label: label.to_string(),
344 properties: vec![prop_name.clone()],
345 index_type: ScalarIndexType::Bitmap,
346 where_clause: None,
347 metadata: Default::default(),
348 }),
349 "LABEL_LIST" | "LABELLIST" => IndexDefinition::Scalar(ScalarIndexConfig {
350 name: index_name,
351 label: label.to_string(),
352 properties: vec![prop_name.clone()],
353 index_type: ScalarIndexType::LabelList,
354 where_clause: None,
355 metadata: Default::default(),
356 }),
357 "INVERTED" => IndexDefinition::Inverted(uni_common::core::schema::InvertedIndexConfig {
358 name: index_name,
359 label: label.to_string(),
360 property: prop_name.clone(),
361 normalize: true,
362 max_terms_per_doc: 10_000,
363 metadata: Default::default(),
364 }),
365 "SPARSE" => {
366 let dimensions = storage
369 .schema_manager()
370 .schema()
371 .properties
372 .get(label)
373 .and_then(|props| props.get(prop_name))
374 .and_then(|meta| match &meta.r#type {
375 uni_common::DataType::SparseVector { dimensions } => Some(*dimensions),
376 _ => None,
377 })
378 .ok_or_else(|| UniError::InvalidArgument {
379 arg: "property".into(),
380 message: format!(
381 "Property '{prop_name}' is not a SparseVector column; cannot create a sparse index"
382 ),
383 })?;
384 let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
386 alias: emb.alias.clone(),
387 source_properties: emb.source.clone(),
388 batch_size: emb.batch_size,
389 document_prefix: emb.document_prefix.clone(),
390 query_prefix: emb.query_prefix.clone(),
391 });
392 IndexDefinition::Sparse(uni_common::core::schema::SparseVectorIndexConfig {
393 name: index_name,
394 label: label.to_string(),
395 property: prop_name.clone(),
396 dimensions,
397 quantize: config.quantize.unwrap_or(true),
399 embedding_config,
400 metadata: Default::default(),
401 })
402 }
403 _ => {
404 return Err(UniError::InvalidArgument {
405 arg: "type".into(),
406 message: format!("Unsupported index type: {}", config.index_type),
407 }
408 .into());
409 }
410 };
411
412 storage.schema_manager().add_index(def.clone())?;
413
414 let idx_mgr = storage.index_manager();
415 match def {
416 IndexDefinition::Vector(cfg) => {
422 idx_mgr.create_vector_index(cfg).await?;
423 }
424 IndexDefinition::Sparse(cfg) => {
431 idx_mgr.create_sparse_vector_index(cfg).await?;
432 }
433 other => {
435 let backend = storage.backend();
441 let table = uni_store::backend::table_names::vertex_table_name(label);
442 let count = if backend.table_exists(&table).await? {
443 backend.count_rows(&table, None).await?
444 } else {
445 0
446 };
447 if count > 0 {
448 match other {
449 IndexDefinition::Scalar(cfg) => idx_mgr.create_scalar_index(cfg).await?,
450 IndexDefinition::Inverted(cfg) => idx_mgr.create_inverted_index(cfg).await?,
451 IndexDefinition::FullText(cfg) => idx_mgr.create_fts_index(cfg).await?,
452 IndexDefinition::JsonFullText(cfg) => {
453 idx_mgr.create_json_fts_index(cfg).await?
454 }
455 _ => {}
456 }
457 }
458 }
459 }
460
461 Ok(())
462}
463
464async fn create_constraint_internal(
465 storage: &StorageManager,
466 target_name: &str,
467 config: &ConstraintConfig,
468 is_label: bool,
469) -> Result<()> {
470 let name = config.name.clone().unwrap_or_else(|| {
471 format!(
472 "{}_{}_{}",
473 target_name,
474 config.constraint_type.to_lowercase(),
475 config.properties.join("_")
476 )
477 });
478
479 let constraint_type = match config.constraint_type.to_uppercase().as_str() {
480 "UNIQUE" => ConstraintType::Unique {
481 properties: config.properties.clone(),
482 },
483 "EXISTS" => {
484 if config.properties.len() != 1 {
485 return Err(UniError::InvalidArgument {
486 arg: "properties".into(),
487 message: "EXISTS constraint requires exactly one property".into(),
488 }
489 .into());
490 }
491 ConstraintType::Exists {
492 property: config.properties[0].clone(),
493 }
494 }
495 _ => {
496 return Err(UniError::InvalidArgument {
497 arg: "type".into(),
498 message: format!("Unsupported constraint type: {}", config.constraint_type),
499 }
500 .into());
501 }
502 };
503
504 let target = if is_label {
505 ConstraintTarget::Label(target_name.to_string())
506 } else {
507 ConstraintTarget::EdgeType(target_name.to_string())
508 };
509
510 let constraint = Constraint {
511 name,
512 constraint_type,
513 target,
514 enabled: true,
515 };
516
517 storage.schema_manager().add_constraint(constraint)?;
518 Ok(())
519}
520
521fn parse_data_type(s: &str) -> Result<DataType> {
522 let s = s.trim();
523 if s.to_uppercase().starts_with("LIST<") && s.ends_with('>') {
524 let inner = &s[5..s.len() - 1];
525 let inner_type = parse_data_type(inner)?;
526 return Ok(DataType::List(Box::new(inner_type)));
527 }
528 if s.to_uppercase().starts_with("MAP<") && s.ends_with('>') {
529 let (k_str, v_str) = split_map_kv(&s[4..s.len() - 1])?;
530 let key_type = parse_data_type(&k_str)?;
531 if !matches!(key_type, DataType::String) {
532 return Err(UniError::InvalidArgument {
533 arg: "type".into(),
534 message: format!("MAP key type must be STRING, got: {k_str}"),
535 }
536 .into());
537 }
538 let value_type = parse_data_type(&v_str)?;
539 return Ok(DataType::Map(Box::new(key_type), Box::new(value_type)));
540 }
541
542 match s.to_uppercase().as_str() {
543 "STRING" | "UTF8" => Ok(DataType::String),
544 "INT" | "INTEGER" | "INT64" => Ok(DataType::Int64),
545 "INT32" => Ok(DataType::Int32),
546 "FLOAT" | "FLOAT64" | "DOUBLE" => Ok(DataType::Float64),
547 "FLOAT32" => Ok(DataType::Float32),
548 "BOOL" | "BOOLEAN" => Ok(DataType::Bool),
549 "DATETIME" => Ok(DataType::DateTime),
550 "DATE" => Ok(DataType::Date),
551 "BTIC" => Ok(DataType::Btic),
552 "VECTOR" => Ok(DataType::Vector { dimensions: 0 }),
553 _ => Err(UniError::InvalidArgument {
554 arg: "type".into(),
555 message: format!("Unknown data type: {}", s),
556 }
557 .into()),
558 }
559}
560
561fn split_map_kv(inner: &str) -> Result<(String, String)> {
564 let mut depth = 0i32;
565 for (i, c) in inner.char_indices() {
566 match c {
567 '<' | '(' => depth += 1,
568 '>' | ')' => depth -= 1,
569 ',' if depth == 0 => {
570 let k = inner[..i].trim();
571 let v = inner[i + 1..].trim();
572 if k.is_empty() || v.is_empty() {
573 return Err(UniError::InvalidArgument {
574 arg: "type".into(),
575 message: "MAP<K,V> requires both a key and a value type".into(),
576 }
577 .into());
578 }
579 return Ok((k.to_string(), v.to_string()));
580 }
581 _ => {}
582 }
583 }
584 Err(UniError::InvalidArgument {
585 arg: "type".into(),
586 message: "MAP<K,V> requires a comma separating key and value types".into(),
587 }
588 .into())
589}