1use anyhow::Result;
5use serde::Deserialize;
6use std::collections::HashMap;
7use uni_common::Value;
8use uni_common::{
9 UniError,
10 core::schema::{
11 Constraint, ConstraintTarget, ConstraintType, DataType, EmbeddingConfig, IndexDefinition,
12 ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, validate_identifier,
13 },
14};
15use uni_store::storage::StorageManager;
16
17#[derive(Deserialize)]
18struct LabelConfig {
19 #[serde(default)]
20 properties: HashMap<String, PropertyConfig>,
21 #[serde(default)]
22 indexes: Vec<IndexConfig>,
23 #[serde(default)]
24 constraints: Vec<ConstraintConfig>,
25 #[serde(default)]
26 pub description: Option<String>,
27}
28
29#[derive(Deserialize)]
30struct PropertyConfig {
31 #[serde(rename = "type")]
32 data_type: String,
33 #[serde(default = "default_nullable")]
34 nullable: bool,
35 #[serde(default)]
36 pub description: Option<String>,
37}
38
/// Serde default for `PropertyConfig::nullable`: properties are nullable
/// unless the configuration says otherwise.
fn default_nullable() -> bool {
    const NULLABLE_WHEN_UNSPECIFIED: bool = true;
    NULLABLE_WHEN_UNSPECIFIED
}
42
43#[derive(Deserialize)]
44struct IndexConfig {
45 property: Option<String>,
46 #[serde(rename = "type")]
47 index_type: String,
48 #[expect(dead_code)]
50 dimensions: Option<usize>,
51 metric: Option<String>,
52 algorithm: Option<String>,
53 partitions: Option<u32>,
54 m: Option<u32>,
55 ef_construction: Option<u32>,
56 sub_vectors: Option<u32>,
57 num_bits: Option<u8>,
58 k_sim: Option<u32>,
60 reps: Option<u32>,
61 d_proj: Option<u32>,
62 seed: Option<u64>,
63 inner: Option<String>,
64 embedding: Option<EmbeddingOptions>,
65 name: Option<String>,
67}
68
69#[derive(Deserialize)]
71struct EmbeddingOptions {
72 alias: String,
73 source: Vec<String>,
74 #[serde(default = "default_batch_size")]
75 batch_size: usize,
76 #[serde(default)]
77 document_prefix: Option<String>,
78 #[serde(default)]
79 query_prefix: Option<String>,
80}
81
/// Serde default for `EmbeddingOptions::batch_size`.
fn default_batch_size() -> usize {
    const BATCH_SIZE_WHEN_UNSPECIFIED: usize = 32;
    BATCH_SIZE_WHEN_UNSPECIFIED
}
85
86#[derive(Deserialize)]
87struct ConstraintConfig {
88 #[serde(rename = "type")]
89 constraint_type: String,
90 properties: Vec<String>,
91 name: Option<String>,
92}
93
94pub async fn create_label(
95 storage: &StorageManager,
96 name: &str,
97 config_val: &Value,
98) -> Result<bool> {
99 validate_identifier(name)?;
100
101 if storage.schema_manager().schema().labels.contains_key(name) {
102 return Err(UniError::LabelAlreadyExists {
103 label: name.to_string(),
104 }
105 .into());
106 }
107
108 let json_val: serde_json::Value = config_val.clone().into();
109 let config: LabelConfig =
110 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
111 arg: "config".to_string(),
112 message: e.to_string(),
113 })?;
114
115 storage
117 .schema_manager()
118 .add_label_with_desc(name, config.description)?;
119
120 for (prop_name, prop_config) in config.properties {
122 validate_identifier(&prop_name)?;
123 let data_type = parse_data_type(&prop_config.data_type)?;
124 storage.schema_manager().add_property_with_desc(
125 name,
126 &prop_name,
127 data_type,
128 prop_config.nullable,
129 prop_config.description,
130 )?;
131 }
132
133 for idx in config.indexes {
135 if idx.property.is_none() {
136 return Err(UniError::InvalidArgument {
137 arg: "indexes".into(),
138 message: "Property name required for index definition".into(),
139 }
140 .into());
141 }
142 create_index_internal(storage, name, &idx).await?;
143 }
144
145 for c in config.constraints {
147 create_constraint_internal(storage, name, &c, true).await?;
148 }
149
150 storage.schema_manager().save().await?;
151 Ok(true)
152}
153
154pub async fn create_edge_type(
155 storage: &StorageManager,
156 name: &str,
157 src_labels: Vec<String>,
158 dst_labels: Vec<String>,
159 config_val: &Value,
160) -> Result<bool> {
161 validate_identifier(name)?;
162
163 let json_val: serde_json::Value = config_val.clone().into();
164 let config: LabelConfig =
165 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
166 arg: "config".to_string(),
167 message: e.to_string(),
168 })?;
169
170 storage.schema_manager().add_edge_type_with_desc(
171 name,
172 src_labels,
173 dst_labels,
174 config.description,
175 )?;
176
177 for (prop_name, prop_config) in config.properties {
178 validate_identifier(&prop_name)?;
179 let data_type = parse_data_type(&prop_config.data_type)?;
180 storage.schema_manager().add_property_with_desc(
181 name,
182 &prop_name,
183 data_type,
184 prop_config.nullable,
185 prop_config.description,
186 )?;
187 }
188
189 for c in config.constraints {
191 create_constraint_internal(storage, name, &c, false).await?;
192 }
193
194 storage.schema_manager().save().await?;
195 Ok(true)
196}
197
198pub async fn create_index(
199 storage: &StorageManager,
200 label: &str,
201 property: &str,
202 config_val: &Value,
203) -> Result<bool> {
204 let json_val: serde_json::Value = config_val.clone().into();
205 let mut config: IndexConfig =
206 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
207 arg: "config".to_string(),
208 message: e.to_string(),
209 })?;
210
211 config.property = Some(property.to_string());
213
214 create_index_internal(storage, label, &config).await?;
215 storage.schema_manager().save().await?;
216 Ok(true)
217}
218
219pub async fn create_constraint(
220 storage: &StorageManager,
221 label: &str,
222 constraint_type: &str,
223 properties: Vec<String>,
224) -> Result<bool> {
225 let config = ConstraintConfig {
226 constraint_type: constraint_type.to_string(),
227 properties,
228 name: None,
229 };
230 create_constraint_internal(storage, label, &config, true).await?;
232 storage.schema_manager().save().await?;
233 Ok(true)
234}
235
236pub async fn drop_label(storage: &StorageManager, name: &str) -> Result<bool> {
237 storage.schema_manager().drop_label(name, true)?;
238 storage.schema_manager().save().await?;
239 Ok(true)
240}
241
242pub async fn drop_edge_type(storage: &StorageManager, name: &str) -> Result<bool> {
243 storage.schema_manager().drop_edge_type(name, true)?;
244 storage.schema_manager().save().await?;
245 Ok(true)
246}
247
248pub async fn drop_index(storage: &StorageManager, name: &str) -> Result<bool> {
249 storage.schema_manager().remove_index(name)?;
250 storage.schema_manager().save().await?;
251 Ok(true)
252}
253
254pub async fn drop_constraint(storage: &StorageManager, name: &str) -> Result<bool> {
255 storage.schema_manager().drop_constraint(name, true)?;
256 storage.schema_manager().save().await?;
257 Ok(true)
258}
259
260async fn create_index_internal(
263 storage: &StorageManager,
264 label: &str,
265 config: &IndexConfig,
266) -> Result<()> {
267 let prop_name = config
268 .property
269 .as_ref()
270 .ok_or_else(|| UniError::InvalidArgument {
271 arg: "property".into(),
272 message: "Property is missing".into(),
273 })?;
274
275 let index_name = config.name.clone().unwrap_or_else(|| {
276 format!(
277 "{}_{}_{}",
278 label,
279 prop_name,
280 config.index_type.to_lowercase()
281 )
282 });
283
284 let def = match config.index_type.to_uppercase().as_str() {
285 "VECTOR" => {
286 let metric =
290 uni_common::vector_index_opts::parse_vector_metric(config.metric.as_deref())
291 .map_err(|e| UniError::InvalidArgument {
292 arg: "metric".into(),
293 message: e.to_string(),
294 })?;
295
296 let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
298 alias: emb.alias.clone(),
299 source_properties: emb.source.clone(),
300 batch_size: emb.batch_size,
301 document_prefix: emb.document_prefix.clone(),
302 query_prefix: emb.query_prefix.clone(),
303 });
304
305 let index_type = uni_common::vector_index_opts::build_vector_index_type(
306 &uni_common::vector_index_opts::VectorIndexOpts {
307 type_name: config.algorithm.as_deref(),
308 partitions: config.partitions,
309 m: config.m,
310 ef_construction: config.ef_construction,
311 sub_vectors: config.sub_vectors,
312 num_bits: config.num_bits,
313 k_sim: config.k_sim,
314 reps: config.reps,
315 d_proj: config.d_proj,
316 seed: config.seed,
317 inner: config.inner.as_deref(),
318 },
319 );
320
321 IndexDefinition::Vector(VectorIndexConfig {
322 name: index_name,
323 label: label.to_string(),
324 property: prop_name.clone(),
325 index_type,
326 metric,
327 embedding_config,
328 metadata: Default::default(),
329 })
330 }
331 "SCALAR" | "BTREE" => IndexDefinition::Scalar(ScalarIndexConfig {
332 name: index_name,
333 label: label.to_string(),
334 properties: vec![prop_name.clone()],
335 index_type: ScalarIndexType::BTree,
336 where_clause: None,
337 metadata: Default::default(),
338 }),
339 "BITMAP" => IndexDefinition::Scalar(ScalarIndexConfig {
340 name: index_name,
341 label: label.to_string(),
342 properties: vec![prop_name.clone()],
343 index_type: ScalarIndexType::Bitmap,
344 where_clause: None,
345 metadata: Default::default(),
346 }),
347 "LABEL_LIST" | "LABELLIST" => IndexDefinition::Scalar(ScalarIndexConfig {
348 name: index_name,
349 label: label.to_string(),
350 properties: vec![prop_name.clone()],
351 index_type: ScalarIndexType::LabelList,
352 where_clause: None,
353 metadata: Default::default(),
354 }),
355 "INVERTED" => IndexDefinition::Inverted(uni_common::core::schema::InvertedIndexConfig {
356 name: index_name,
357 label: label.to_string(),
358 property: prop_name.clone(),
359 normalize: true,
360 max_terms_per_doc: 10_000,
361 metadata: Default::default(),
362 }),
363 _ => {
364 return Err(UniError::InvalidArgument {
365 arg: "type".into(),
366 message: format!("Unsupported index type: {}", config.index_type),
367 }
368 .into());
369 }
370 };
371
372 storage.schema_manager().add_index(def.clone())?;
373
374 let idx_mgr = storage.index_manager();
375 match def {
376 IndexDefinition::Vector(cfg) => {
382 idx_mgr.create_vector_index(cfg).await?;
383 }
384 other => {
386 let count = if let Ok(ds) = storage.vertex_dataset(label) {
387 if let Ok(raw) = ds.open_raw().await {
388 raw.count_rows(None).await.unwrap_or(0)
389 } else {
390 0
391 }
392 } else {
393 0
394 };
395 if count > 0 {
396 match other {
397 IndexDefinition::Scalar(cfg) => idx_mgr.create_scalar_index(cfg).await?,
398 IndexDefinition::Inverted(cfg) => idx_mgr.create_inverted_index(cfg).await?,
399 IndexDefinition::FullText(cfg) => idx_mgr.create_fts_index(cfg).await?,
400 IndexDefinition::JsonFullText(cfg) => {
401 idx_mgr.create_json_fts_index(cfg).await?
402 }
403 _ => {}
404 }
405 }
406 }
407 }
408
409 Ok(())
410}
411
412async fn create_constraint_internal(
413 storage: &StorageManager,
414 target_name: &str,
415 config: &ConstraintConfig,
416 is_label: bool,
417) -> Result<()> {
418 let name = config.name.clone().unwrap_or_else(|| {
419 format!(
420 "{}_{}_{}",
421 target_name,
422 config.constraint_type.to_lowercase(),
423 config.properties.join("_")
424 )
425 });
426
427 let constraint_type = match config.constraint_type.to_uppercase().as_str() {
428 "UNIQUE" => ConstraintType::Unique {
429 properties: config.properties.clone(),
430 },
431 "EXISTS" => {
432 if config.properties.len() != 1 {
433 return Err(UniError::InvalidArgument {
434 arg: "properties".into(),
435 message: "EXISTS constraint requires exactly one property".into(),
436 }
437 .into());
438 }
439 ConstraintType::Exists {
440 property: config.properties[0].clone(),
441 }
442 }
443 _ => {
444 return Err(UniError::InvalidArgument {
445 arg: "type".into(),
446 message: format!("Unsupported constraint type: {}", config.constraint_type),
447 }
448 .into());
449 }
450 };
451
452 let target = if is_label {
453 ConstraintTarget::Label(target_name.to_string())
454 } else {
455 ConstraintTarget::EdgeType(target_name.to_string())
456 };
457
458 let constraint = Constraint {
459 name,
460 constraint_type,
461 target,
462 enabled: true,
463 };
464
465 storage.schema_manager().add_constraint(constraint)?;
466 Ok(())
467}
468
469fn parse_data_type(s: &str) -> Result<DataType> {
470 let s = s.trim();
471 if s.to_uppercase().starts_with("LIST<") && s.ends_with('>') {
472 let inner = &s[5..s.len() - 1];
473 let inner_type = parse_data_type(inner)?;
474 return Ok(DataType::List(Box::new(inner_type)));
475 }
476 if s.to_uppercase().starts_with("MAP<") && s.ends_with('>') {
477 let (k_str, v_str) = split_map_kv(&s[4..s.len() - 1])?;
478 let key_type = parse_data_type(&k_str)?;
479 if !matches!(key_type, DataType::String) {
480 return Err(UniError::InvalidArgument {
481 arg: "type".into(),
482 message: format!("MAP key type must be STRING, got: {k_str}"),
483 }
484 .into());
485 }
486 let value_type = parse_data_type(&v_str)?;
487 return Ok(DataType::Map(Box::new(key_type), Box::new(value_type)));
488 }
489
490 match s.to_uppercase().as_str() {
491 "STRING" | "UTF8" => Ok(DataType::String),
492 "INT" | "INTEGER" | "INT64" => Ok(DataType::Int64),
493 "INT32" => Ok(DataType::Int32),
494 "FLOAT" | "FLOAT64" | "DOUBLE" => Ok(DataType::Float64),
495 "FLOAT32" => Ok(DataType::Float32),
496 "BOOL" | "BOOLEAN" => Ok(DataType::Bool),
497 "DATETIME" => Ok(DataType::DateTime),
498 "DATE" => Ok(DataType::Date),
499 "BTIC" => Ok(DataType::Btic),
500 "VECTOR" => Ok(DataType::Vector { dimensions: 0 }),
501 _ => Err(UniError::InvalidArgument {
502 arg: "type".into(),
503 message: format!("Unknown data type: {}", s),
504 }
505 .into()),
506 }
507}
508
509fn split_map_kv(inner: &str) -> Result<(String, String)> {
512 let mut depth = 0i32;
513 for (i, c) in inner.char_indices() {
514 match c {
515 '<' | '(' => depth += 1,
516 '>' | ')' => depth -= 1,
517 ',' if depth == 0 => {
518 let k = inner[..i].trim();
519 let v = inner[i + 1..].trim();
520 if k.is_empty() || v.is_empty() {
521 return Err(UniError::InvalidArgument {
522 arg: "type".into(),
523 message: "MAP<K,V> requires both a key and a value type".into(),
524 }
525 .into());
526 }
527 return Ok((k.to_string(), v.to_string()));
528 }
529 _ => {}
530 }
531 }
532 Err(UniError::InvalidArgument {
533 arg: "type".into(),
534 message: "MAP<K,V> requires a comma separating key and value types".into(),
535 }
536 .into())
537}