1use anyhow::Result;
5use serde::Deserialize;
6use std::collections::HashMap;
7use uni_common::Value;
8use uni_common::{
9 UniError,
10 core::schema::{
11 Constraint, ConstraintTarget, ConstraintType, DataType, DistanceMetric, EmbeddingConfig,
12 IndexDefinition, ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
13 validate_identifier,
14 },
15};
16use uni_store::storage::StorageManager;
17
18#[derive(Deserialize)]
19struct LabelConfig {
20 #[serde(default)]
21 properties: HashMap<String, PropertyConfig>,
22 #[serde(default)]
23 indexes: Vec<IndexConfig>,
24 #[serde(default)]
25 constraints: Vec<ConstraintConfig>,
26 #[serde(default)]
27 pub description: Option<String>,
28}
29
/// Declaration of a single property inside a label / edge-type config.
#[derive(Deserialize)]
struct PropertyConfig {
    /// Type name as written by the user (e.g. `"STRING"`, `"LIST<INT>"`);
    /// parsed with `parse_data_type` before the property is registered.
    #[serde(rename = "type")]
    data_type: String,
    /// Whether the property may hold nulls; `true` when the key is omitted.
    #[serde(default = "default_nullable")]
    nullable: bool,
    /// Optional free-text description forwarded to the schema manager.
    #[serde(default)]
    pub description: Option<String>,
}
39
/// Serde default for `PropertyConfig::nullable`: a property accepts nulls
/// unless the config explicitly says `nullable: false`.
fn default_nullable() -> bool {
    const NULLABLE_BY_DEFAULT: bool = true;
    NULLABLE_BY_DEFAULT
}
43
/// Index definition, either an entry of a label config's `indexes` list or the
/// options map passed to [`create_index`].
///
/// `property`, `type` and `name` apply to every index kind; the remaining
/// tuning options are only read by `create_index_internal` for `VECTOR` indexes.
#[derive(Deserialize)]
struct IndexConfig {
    /// Indexed property. Required in practice; [`create_index`] overwrites it
    /// with its own `property` argument.
    property: Option<String>,
    /// Index kind, matched case-insensitively: `VECTOR`, `SCALAR`/`BTREE`,
    /// `BITMAP`, `LABEL_LIST`/`LABELLIST` or `INVERTED`.
    #[serde(rename = "type")]
    index_type: String,
    /// Accepted in the config but not read anywhere in this module.
    #[expect(dead_code)]
    dimensions: Option<usize>,
    /// Vector distance metric; `cosine` when omitted.
    metric: Option<String>,
    /// Vector index algorithm (e.g. `flat`, `ivf_pq`, `hnsw_sq`); `hnsw` when omitted.
    algorithm: Option<String>,
    /// Partition count: IVF algorithms default it to 256, HNSW algorithms
    /// receive the `Option` unchanged.
    partitions: Option<u32>,
    /// HNSW `m` parameter; 16 when omitted.
    m: Option<u32>,
    /// HNSW `ef_construction` parameter; 200 when omitted.
    ef_construction: Option<u32>,
    /// PQ sub-vector count for `ivf_pq` / `hnsw_pq`; 16 when omitted.
    sub_vectors: Option<u32>,
    /// Bits per code: `ivf_pq` defaults it to 8, `ivf_rq` receives the `Option` unchanged.
    num_bits: Option<u8>,
    /// Optional embedding settings attached to a vector index.
    embedding: Option<EmbeddingOptions>,
    /// Explicit index name; defaults to `{label}_{property}_{type}`.
    name: Option<String>,
}
63
/// Embedding settings nested under an index's `embedding` key; copied
/// field-for-field into an `EmbeddingConfig` when a vector index is created.
#[derive(Deserialize)]
struct EmbeddingOptions {
    /// Forwarded as `EmbeddingConfig::alias` (presumably selects the embedding
    /// model/provider — confirm against the embedding service).
    alias: String,
    /// Property names forwarded as `EmbeddingConfig::source_properties`.
    source: Vec<String>,
    /// Batch size forwarded to the embedding config; 32 when omitted.
    #[serde(default = "default_batch_size")]
    batch_size: usize,
    /// Optional prefix forwarded as `EmbeddingConfig::document_prefix`.
    #[serde(default)]
    document_prefix: Option<String>,
    /// Optional prefix forwarded as `EmbeddingConfig::query_prefix`.
    #[serde(default)]
    query_prefix: Option<String>,
}
76
/// Serde default for `EmbeddingOptions::batch_size` when the key is omitted.
fn default_batch_size() -> usize {
    const DEFAULT_EMBEDDING_BATCH_SIZE: usize = 32;
    DEFAULT_EMBEDDING_BATCH_SIZE
}
80
/// Constraint definition from a label / edge-type config's `constraints` list
/// (also built directly by [`create_constraint`]).
#[derive(Deserialize)]
struct ConstraintConfig {
    /// Constraint kind, matched case-insensitively: `UNIQUE` or `EXISTS`.
    #[serde(rename = "type")]
    constraint_type: String,
    /// Constrained property names; `EXISTS` requires exactly one.
    properties: Vec<String>,
    /// Explicit constraint name; defaults to `{target}_{type}_{properties joined by "_"}`.
    name: Option<String>,
}
88
89pub async fn create_label(
90 storage: &StorageManager,
91 name: &str,
92 config_val: &Value,
93) -> Result<bool> {
94 validate_identifier(name)?;
95
96 if storage.schema_manager().schema().labels.contains_key(name) {
97 return Err(UniError::LabelAlreadyExists {
98 label: name.to_string(),
99 }
100 .into());
101 }
102
103 let json_val: serde_json::Value = config_val.clone().into();
104 let config: LabelConfig =
105 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
106 arg: "config".to_string(),
107 message: e.to_string(),
108 })?;
109
110 storage
112 .schema_manager()
113 .add_label_with_desc(name, config.description)?;
114
115 for (prop_name, prop_config) in config.properties {
117 validate_identifier(&prop_name)?;
118 let data_type = parse_data_type(&prop_config.data_type)?;
119 storage.schema_manager().add_property_with_desc(
120 name,
121 &prop_name,
122 data_type,
123 prop_config.nullable,
124 prop_config.description,
125 )?;
126 }
127
128 for idx in config.indexes {
130 if idx.property.is_none() {
131 return Err(UniError::InvalidArgument {
132 arg: "indexes".into(),
133 message: "Property name required for index definition".into(),
134 }
135 .into());
136 }
137 create_index_internal(storage, name, &idx).await?;
138 }
139
140 for c in config.constraints {
142 create_constraint_internal(storage, name, &c, true).await?;
143 }
144
145 storage.schema_manager().save().await?;
146 Ok(true)
147}
148
149pub async fn create_edge_type(
150 storage: &StorageManager,
151 name: &str,
152 src_labels: Vec<String>,
153 dst_labels: Vec<String>,
154 config_val: &Value,
155) -> Result<bool> {
156 validate_identifier(name)?;
157
158 let json_val: serde_json::Value = config_val.clone().into();
159 let config: LabelConfig =
160 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
161 arg: "config".to_string(),
162 message: e.to_string(),
163 })?;
164
165 storage.schema_manager().add_edge_type_with_desc(
166 name,
167 src_labels,
168 dst_labels,
169 config.description,
170 )?;
171
172 for (prop_name, prop_config) in config.properties {
173 validate_identifier(&prop_name)?;
174 let data_type = parse_data_type(&prop_config.data_type)?;
175 storage.schema_manager().add_property_with_desc(
176 name,
177 &prop_name,
178 data_type,
179 prop_config.nullable,
180 prop_config.description,
181 )?;
182 }
183
184 for c in config.constraints {
186 create_constraint_internal(storage, name, &c, false).await?;
187 }
188
189 storage.schema_manager().save().await?;
190 Ok(true)
191}
192
193pub async fn create_index(
194 storage: &StorageManager,
195 label: &str,
196 property: &str,
197 config_val: &Value,
198) -> Result<bool> {
199 let json_val: serde_json::Value = config_val.clone().into();
200 let mut config: IndexConfig =
201 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
202 arg: "config".to_string(),
203 message: e.to_string(),
204 })?;
205
206 config.property = Some(property.to_string());
208
209 create_index_internal(storage, label, &config).await?;
210 storage.schema_manager().save().await?;
211 Ok(true)
212}
213
214pub async fn create_constraint(
215 storage: &StorageManager,
216 label: &str,
217 constraint_type: &str,
218 properties: Vec<String>,
219) -> Result<bool> {
220 let config = ConstraintConfig {
221 constraint_type: constraint_type.to_string(),
222 properties,
223 name: None,
224 };
225 create_constraint_internal(storage, label, &config, true).await?;
227 storage.schema_manager().save().await?;
228 Ok(true)
229}
230
231pub async fn drop_label(storage: &StorageManager, name: &str) -> Result<bool> {
232 storage.schema_manager().drop_label(name, true)?;
233 storage.schema_manager().save().await?;
234 Ok(true)
235}
236
237pub async fn drop_edge_type(storage: &StorageManager, name: &str) -> Result<bool> {
238 storage.schema_manager().drop_edge_type(name, true)?;
239 storage.schema_manager().save().await?;
240 Ok(true)
241}
242
243pub async fn drop_index(storage: &StorageManager, name: &str) -> Result<bool> {
244 storage.schema_manager().remove_index(name)?;
245 storage.schema_manager().save().await?;
246 Ok(true)
247}
248
249pub async fn drop_constraint(storage: &StorageManager, name: &str) -> Result<bool> {
250 storage.schema_manager().drop_constraint(name, true)?;
251 storage.schema_manager().save().await?;
252 Ok(true)
253}
254
255async fn create_index_internal(
258 storage: &StorageManager,
259 label: &str,
260 config: &IndexConfig,
261) -> Result<()> {
262 let prop_name = config
263 .property
264 .as_ref()
265 .ok_or_else(|| UniError::InvalidArgument {
266 arg: "property".into(),
267 message: "Property is missing".into(),
268 })?;
269
270 let index_name = config.name.clone().unwrap_or_else(|| {
271 format!(
272 "{}_{}_{}",
273 label,
274 prop_name,
275 config.index_type.to_lowercase()
276 )
277 });
278
279 let def = match config.index_type.to_uppercase().as_str() {
280 "VECTOR" => {
281 let metric = match config.metric.as_deref().unwrap_or("cosine") {
282 "cosine" => DistanceMetric::Cosine,
283 "l2" | "euclidean" => DistanceMetric::L2,
284 "dot" => DistanceMetric::Dot,
285 _ => {
286 return Err(UniError::InvalidArgument {
287 arg: "metric".into(),
288 message: "Invalid metric".into(),
289 }
290 .into());
291 }
292 };
293
294 let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
296 alias: emb.alias.clone(),
297 source_properties: emb.source.clone(),
298 batch_size: emb.batch_size,
299 document_prefix: emb.document_prefix.clone(),
300 query_prefix: emb.query_prefix.clone(),
301 });
302
303 let algorithm = config.algorithm.as_deref().unwrap_or("hnsw");
304 let index_type = match algorithm.to_lowercase().as_str() {
305 "flat" => VectorIndexType::Flat,
306 "ivf_flat" => VectorIndexType::IvfFlat {
307 num_partitions: config.partitions.unwrap_or(256),
308 },
309 "ivf_pq" => VectorIndexType::IvfPq {
310 num_partitions: config.partitions.unwrap_or(256),
311 num_sub_vectors: config.sub_vectors.unwrap_or(16),
312 bits_per_subvector: config.num_bits.unwrap_or(8),
313 },
314 "ivf_sq" => VectorIndexType::IvfSq {
315 num_partitions: config.partitions.unwrap_or(256),
316 },
317 "ivf_rq" => VectorIndexType::IvfRq {
318 num_partitions: config.partitions.unwrap_or(256),
319 num_bits: config.num_bits,
320 },
321 "hnsw_flat" => VectorIndexType::HnswFlat {
322 m: config.m.unwrap_or(16),
323 ef_construction: config.ef_construction.unwrap_or(200),
324 num_partitions: config.partitions,
325 },
326 "hnsw_pq" => VectorIndexType::HnswPq {
327 m: config.m.unwrap_or(16),
328 ef_construction: config.ef_construction.unwrap_or(200),
329 num_sub_vectors: config.sub_vectors.unwrap_or(16),
330 num_partitions: config.partitions,
331 },
332 _ => VectorIndexType::HnswSq {
333 m: config.m.unwrap_or(16),
334 ef_construction: config.ef_construction.unwrap_or(200),
335 num_partitions: config.partitions,
336 },
337 };
338
339 IndexDefinition::Vector(VectorIndexConfig {
340 name: index_name,
341 label: label.to_string(),
342 property: prop_name.clone(),
343 index_type,
344 metric,
345 embedding_config,
346 metadata: Default::default(),
347 })
348 }
349 "SCALAR" | "BTREE" => IndexDefinition::Scalar(ScalarIndexConfig {
350 name: index_name,
351 label: label.to_string(),
352 properties: vec![prop_name.clone()],
353 index_type: ScalarIndexType::BTree,
354 where_clause: None,
355 metadata: Default::default(),
356 }),
357 "BITMAP" => IndexDefinition::Scalar(ScalarIndexConfig {
358 name: index_name,
359 label: label.to_string(),
360 properties: vec![prop_name.clone()],
361 index_type: ScalarIndexType::Bitmap,
362 where_clause: None,
363 metadata: Default::default(),
364 }),
365 "LABEL_LIST" | "LABELLIST" => IndexDefinition::Scalar(ScalarIndexConfig {
366 name: index_name,
367 label: label.to_string(),
368 properties: vec![prop_name.clone()],
369 index_type: ScalarIndexType::LabelList,
370 where_clause: None,
371 metadata: Default::default(),
372 }),
373 "INVERTED" => IndexDefinition::Inverted(uni_common::core::schema::InvertedIndexConfig {
374 name: index_name,
375 label: label.to_string(),
376 property: prop_name.clone(),
377 normalize: true,
378 max_terms_per_doc: 10_000,
379 metadata: Default::default(),
380 }),
381 _ => {
382 return Err(UniError::InvalidArgument {
383 arg: "type".into(),
384 message: format!("Unsupported index type: {}", config.index_type),
385 }
386 .into());
387 }
388 };
389
390 storage.schema_manager().add_index(def.clone())?;
391
392 let count = if let Ok(ds) = storage.vertex_dataset(label) {
394 if let Ok(raw) = ds.open_raw().await {
395 raw.count_rows(None).await.unwrap_or(0)
396 } else {
397 0
398 }
399 } else {
400 0
401 };
402
403 tracing::debug!("create_index_internal count for {}: {}", label, count);
404
405 if count > 0 {
406 let idx_mgr = storage.index_manager();
407 match def {
408 IndexDefinition::Vector(cfg) => {
409 idx_mgr.create_vector_index(cfg).await?;
410 }
411 IndexDefinition::Scalar(cfg) => {
412 idx_mgr.create_scalar_index(cfg).await?;
413 }
414 IndexDefinition::Inverted(cfg) => {
415 idx_mgr.create_inverted_index(cfg).await?;
416 }
417 IndexDefinition::FullText(cfg) => {
418 idx_mgr.create_fts_index(cfg).await?;
419 }
420 IndexDefinition::JsonFullText(cfg) => {
421 idx_mgr.create_json_fts_index(cfg).await?;
422 }
423 _ => {}
424 }
425 }
426
427 Ok(())
428}
429
430async fn create_constraint_internal(
431 storage: &StorageManager,
432 target_name: &str,
433 config: &ConstraintConfig,
434 is_label: bool,
435) -> Result<()> {
436 let name = config.name.clone().unwrap_or_else(|| {
437 format!(
438 "{}_{}_{}",
439 target_name,
440 config.constraint_type.to_lowercase(),
441 config.properties.join("_")
442 )
443 });
444
445 let constraint_type = match config.constraint_type.to_uppercase().as_str() {
446 "UNIQUE" => ConstraintType::Unique {
447 properties: config.properties.clone(),
448 },
449 "EXISTS" => {
450 if config.properties.len() != 1 {
451 return Err(UniError::InvalidArgument {
452 arg: "properties".into(),
453 message: "EXISTS constraint requires exactly one property".into(),
454 }
455 .into());
456 }
457 ConstraintType::Exists {
458 property: config.properties[0].clone(),
459 }
460 }
461 _ => {
462 return Err(UniError::InvalidArgument {
463 arg: "type".into(),
464 message: format!("Unsupported constraint type: {}", config.constraint_type),
465 }
466 .into());
467 }
468 };
469
470 let target = if is_label {
471 ConstraintTarget::Label(target_name.to_string())
472 } else {
473 ConstraintTarget::EdgeType(target_name.to_string())
474 };
475
476 let constraint = Constraint {
477 name,
478 constraint_type,
479 target,
480 enabled: true,
481 };
482
483 storage.schema_manager().add_constraint(constraint)?;
484 Ok(())
485}
486
487fn parse_data_type(s: &str) -> Result<DataType> {
488 let s = s.trim();
489 if s.to_uppercase().starts_with("LIST<") && s.ends_with('>') {
490 let inner = &s[5..s.len() - 1];
491 let inner_type = parse_data_type(inner)?;
492 return Ok(DataType::List(Box::new(inner_type)));
493 }
494
495 match s.to_uppercase().as_str() {
496 "STRING" | "UTF8" => Ok(DataType::String),
497 "INT" | "INTEGER" | "INT64" => Ok(DataType::Int64),
498 "INT32" => Ok(DataType::Int32),
499 "FLOAT" | "FLOAT64" | "DOUBLE" => Ok(DataType::Float64),
500 "FLOAT32" => Ok(DataType::Float32),
501 "BOOL" | "BOOLEAN" => Ok(DataType::Bool),
502 "DATETIME" => Ok(DataType::DateTime),
503 "DATE" => Ok(DataType::Date),
504 "BTIC" => Ok(DataType::Btic),
505 "VECTOR" => Ok(DataType::Vector { dimensions: 0 }),
506 _ => Err(UniError::InvalidArgument {
507 arg: "type".into(),
508 message: format!("Unknown data type: {}", s),
509 }
510 .into()),
511 }
512}