1use anyhow::Result;
5use serde::Deserialize;
6use std::collections::HashMap;
7use uni_common::Value;
8use uni_common::{
9 UniError,
10 core::schema::{
11 Constraint, ConstraintTarget, ConstraintType, DataType, DistanceMetric, EmbeddingConfig,
12 IndexDefinition, ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
13 validate_identifier,
14 },
15};
16use uni_store::storage::StorageManager;
17
/// Configuration payload deserialized by `create_label` and `create_edge_type`.
///
/// Every section is optional; a missing section deserializes to an empty collection.
#[derive(Deserialize)]
struct LabelConfig {
    /// Property definitions keyed by property name.
    #[serde(default)]
    properties: HashMap<String, PropertyConfig>,
    /// Index definitions to create alongside the label.
    #[serde(default)]
    indexes: Vec<IndexConfig>,
    /// Constraint definitions to register on the label or edge type.
    #[serde(default)]
    constraints: Vec<ConstraintConfig>,
}
27
/// Definition of a single property inside a `LabelConfig`.
#[derive(Deserialize)]
struct PropertyConfig {
    /// Textual type name (JSON key `type`); resolved with `parse_data_type`.
    #[serde(rename = "type")]
    data_type: String,
    /// Whether the property may be null; `true` when omitted from the config.
    #[serde(default = "default_nullable")]
    nullable: bool,
}
35
/// Serde default for the `nullable` flag of a property: nullable unless the config says otherwise.
fn default_nullable() -> bool {
    const NULLABLE_WHEN_OMITTED: bool = true;
    NULLABLE_WHEN_OMITTED
}
39
/// Index definition as supplied in a label config or passed to `create_index`.
///
/// Which optional fields are read depends on `index_type` and, for vector indexes,
/// on `algorithm`; see `create_index_internal`.
#[derive(Deserialize)]
struct IndexConfig {
    /// Indexed property. `create_index` overwrites this with its `property` argument.
    property: Option<String>,
    /// Index kind (JSON key `type`), matched case-insensitively: `VECTOR`, `SCALAR`/`BTREE`,
    /// `BITMAP`, `LABEL_LIST`/`LABELLIST` or `INVERTED`.
    #[serde(rename = "type")]
    index_type: String,
    /// Accepted in the config but never read by this module (hence the lint expectation).
    #[expect(dead_code)]
    dimensions: Option<usize>,
    /// Vector distance metric: `cosine`, `l2`/`euclidean` or `dot`; `cosine` when omitted.
    metric: Option<String>,
    /// Vector index algorithm name; `hnsw` when omitted.
    algorithm: Option<String>,
    /// Partition count: defaults to 256 for the IVF algorithms, passed through as optional
    /// for the HNSW algorithms.
    partitions: Option<u32>,
    /// HNSW `m` parameter; 16 when omitted.
    m: Option<u32>,
    /// HNSW `ef_construction` parameter; 200 when omitted.
    ef_construction: Option<u32>,
    /// Number of sub-vectors for the PQ algorithms; 16 when omitted.
    sub_vectors: Option<u32>,
    /// Bit width: defaults to 8 for `ivf_pq`, passed through as optional for `ivf_rq`.
    num_bits: Option<u8>,
    /// Optional embedding settings attached to a vector index.
    embedding: Option<EmbeddingOptions>,
    /// Explicit index name; when omitted a name of the form `{label}_{property}_{type}`
    /// (type lowercased) is generated.
    name: Option<String>,
}
59
/// Embedding settings of a vector index; copied into an `EmbeddingConfig`.
#[derive(Deserialize)]
struct EmbeddingOptions {
    /// Embedding alias, forwarded unchanged.
    alias: String,
    /// Source property names; forwarded as `source_properties`.
    source: Vec<String>,
    /// Batch size; 32 when omitted.
    #[serde(default = "default_batch_size")]
    batch_size: usize,
}
68
/// Serde default for the embedding `batch_size` when the config omits it.
fn default_batch_size() -> usize {
    const BATCH_SIZE_WHEN_OMITTED: usize = 32;
    BATCH_SIZE_WHEN_OMITTED
}
72
/// Constraint definition as supplied in a label/edge-type config or built by `create_constraint`.
#[derive(Deserialize)]
struct ConstraintConfig {
    /// Constraint kind (JSON key `type`), matched case-insensitively: `UNIQUE` or `EXISTS`.
    #[serde(rename = "type")]
    constraint_type: String,
    /// Properties the constraint covers; `EXISTS` requires exactly one.
    properties: Vec<String>,
    /// Explicit constraint name; when omitted a name of the form
    /// `{target}_{type}_{properties joined by '_'}` (type lowercased) is generated.
    name: Option<String>,
}
80
81pub async fn create_label(
82 storage: &StorageManager,
83 name: &str,
84 config_val: &Value,
85) -> Result<bool> {
86 validate_identifier(name)?;
87
88 if storage.schema_manager().schema().labels.contains_key(name) {
89 return Err(UniError::LabelAlreadyExists {
90 label: name.to_string(),
91 }
92 .into());
93 }
94
95 let json_val: serde_json::Value = config_val.clone().into();
96 let config: LabelConfig =
97 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
98 arg: "config".to_string(),
99 message: e.to_string(),
100 })?;
101
102 storage.schema_manager().add_label(name)?;
104
105 for (prop_name, prop_config) in config.properties {
107 validate_identifier(&prop_name)?;
108 let data_type = parse_data_type(&prop_config.data_type)?;
109 storage
110 .schema_manager()
111 .add_property(name, &prop_name, data_type, prop_config.nullable)?;
112 }
113
114 for idx in config.indexes {
116 if idx.property.is_none() {
117 return Err(UniError::InvalidArgument {
118 arg: "indexes".into(),
119 message: "Property name required for index definition".into(),
120 }
121 .into());
122 }
123 create_index_internal(storage, name, &idx).await?;
124 }
125
126 for c in config.constraints {
128 create_constraint_internal(storage, name, &c, true).await?;
129 }
130
131 storage.schema_manager().save().await?;
132 Ok(true)
133}
134
135pub async fn create_edge_type(
136 storage: &StorageManager,
137 name: &str,
138 src_labels: Vec<String>,
139 dst_labels: Vec<String>,
140 config_val: &Value,
141) -> Result<bool> {
142 validate_identifier(name)?;
143
144 let json_val: serde_json::Value = config_val.clone().into();
145 let config: LabelConfig =
146 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
147 arg: "config".to_string(),
148 message: e.to_string(),
149 })?;
150
151 storage
152 .schema_manager()
153 .add_edge_type(name, src_labels, dst_labels)?;
154
155 for (prop_name, prop_config) in config.properties {
156 validate_identifier(&prop_name)?;
157 let data_type = parse_data_type(&prop_config.data_type)?;
158 storage
159 .schema_manager()
160 .add_property(name, &prop_name, data_type, prop_config.nullable)?;
161 }
162
163 for c in config.constraints {
165 create_constraint_internal(storage, name, &c, false).await?;
166 }
167
168 storage.schema_manager().save().await?;
169 Ok(true)
170}
171
172pub async fn create_index(
173 storage: &StorageManager,
174 label: &str,
175 property: &str,
176 config_val: &Value,
177) -> Result<bool> {
178 let json_val: serde_json::Value = config_val.clone().into();
179 let mut config: IndexConfig =
180 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
181 arg: "config".to_string(),
182 message: e.to_string(),
183 })?;
184
185 config.property = Some(property.to_string());
187
188 create_index_internal(storage, label, &config).await?;
189 storage.schema_manager().save().await?;
190 Ok(true)
191}
192
193pub async fn create_constraint(
194 storage: &StorageManager,
195 label: &str,
196 constraint_type: &str,
197 properties: Vec<String>,
198) -> Result<bool> {
199 let config = ConstraintConfig {
200 constraint_type: constraint_type.to_string(),
201 properties,
202 name: None,
203 };
204 create_constraint_internal(storage, label, &config, true).await?;
206 storage.schema_manager().save().await?;
207 Ok(true)
208}
209
210pub async fn drop_label(storage: &StorageManager, name: &str) -> Result<bool> {
211 storage.schema_manager().drop_label(name, true)?;
212 storage.schema_manager().save().await?;
213 Ok(true)
214}
215
216pub async fn drop_edge_type(storage: &StorageManager, name: &str) -> Result<bool> {
217 storage.schema_manager().drop_edge_type(name, true)?;
218 storage.schema_manager().save().await?;
219 Ok(true)
220}
221
222pub async fn drop_index(storage: &StorageManager, name: &str) -> Result<bool> {
223 storage.schema_manager().remove_index(name)?;
224 storage.schema_manager().save().await?;
225 Ok(true)
226}
227
228pub async fn drop_constraint(storage: &StorageManager, name: &str) -> Result<bool> {
229 storage.schema_manager().drop_constraint(name, true)?;
230 storage.schema_manager().save().await?;
231 Ok(true)
232}
233
234async fn create_index_internal(
237 storage: &StorageManager,
238 label: &str,
239 config: &IndexConfig,
240) -> Result<()> {
241 let prop_name = config
242 .property
243 .as_ref()
244 .ok_or_else(|| UniError::InvalidArgument {
245 arg: "property".into(),
246 message: "Property is missing".into(),
247 })?;
248
249 let index_name = config.name.clone().unwrap_or_else(|| {
250 format!(
251 "{}_{}_{}",
252 label,
253 prop_name,
254 config.index_type.to_lowercase()
255 )
256 });
257
258 let def = match config.index_type.to_uppercase().as_str() {
259 "VECTOR" => {
260 let metric = match config.metric.as_deref().unwrap_or("cosine") {
261 "cosine" => DistanceMetric::Cosine,
262 "l2" | "euclidean" => DistanceMetric::L2,
263 "dot" => DistanceMetric::Dot,
264 _ => {
265 return Err(UniError::InvalidArgument {
266 arg: "metric".into(),
267 message: "Invalid metric".into(),
268 }
269 .into());
270 }
271 };
272
273 let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
275 alias: emb.alias.clone(),
276 source_properties: emb.source.clone(),
277 batch_size: emb.batch_size,
278 });
279
280 let algorithm = config.algorithm.as_deref().unwrap_or("hnsw");
281 let index_type = match algorithm.to_lowercase().as_str() {
282 "flat" => VectorIndexType::Flat,
283 "ivf_flat" => VectorIndexType::IvfFlat {
284 num_partitions: config.partitions.unwrap_or(256),
285 },
286 "ivf_pq" => VectorIndexType::IvfPq {
287 num_partitions: config.partitions.unwrap_or(256),
288 num_sub_vectors: config.sub_vectors.unwrap_or(16),
289 bits_per_subvector: config.num_bits.unwrap_or(8),
290 },
291 "ivf_sq" => VectorIndexType::IvfSq {
292 num_partitions: config.partitions.unwrap_or(256),
293 },
294 "ivf_rq" => VectorIndexType::IvfRq {
295 num_partitions: config.partitions.unwrap_or(256),
296 num_bits: config.num_bits,
297 },
298 "hnsw_flat" => VectorIndexType::HnswFlat {
299 m: config.m.unwrap_or(16),
300 ef_construction: config.ef_construction.unwrap_or(200),
301 num_partitions: config.partitions,
302 },
303 "hnsw_pq" => VectorIndexType::HnswPq {
304 m: config.m.unwrap_or(16),
305 ef_construction: config.ef_construction.unwrap_or(200),
306 num_sub_vectors: config.sub_vectors.unwrap_or(16),
307 num_partitions: config.partitions,
308 },
309 _ => VectorIndexType::HnswSq {
310 m: config.m.unwrap_or(16),
311 ef_construction: config.ef_construction.unwrap_or(200),
312 num_partitions: config.partitions,
313 },
314 };
315
316 IndexDefinition::Vector(VectorIndexConfig {
317 name: index_name,
318 label: label.to_string(),
319 property: prop_name.clone(),
320 index_type,
321 metric,
322 embedding_config,
323 metadata: Default::default(),
324 })
325 }
326 "SCALAR" | "BTREE" => IndexDefinition::Scalar(ScalarIndexConfig {
327 name: index_name,
328 label: label.to_string(),
329 properties: vec![prop_name.clone()],
330 index_type: ScalarIndexType::BTree,
331 where_clause: None,
332 metadata: Default::default(),
333 }),
334 "BITMAP" => IndexDefinition::Scalar(ScalarIndexConfig {
335 name: index_name,
336 label: label.to_string(),
337 properties: vec![prop_name.clone()],
338 index_type: ScalarIndexType::Bitmap,
339 where_clause: None,
340 metadata: Default::default(),
341 }),
342 "LABEL_LIST" | "LABELLIST" => IndexDefinition::Scalar(ScalarIndexConfig {
343 name: index_name,
344 label: label.to_string(),
345 properties: vec![prop_name.clone()],
346 index_type: ScalarIndexType::LabelList,
347 where_clause: None,
348 metadata: Default::default(),
349 }),
350 "INVERTED" => IndexDefinition::Inverted(uni_common::core::schema::InvertedIndexConfig {
351 name: index_name,
352 label: label.to_string(),
353 property: prop_name.clone(),
354 normalize: true,
355 max_terms_per_doc: 10_000,
356 metadata: Default::default(),
357 }),
358 _ => {
359 return Err(UniError::InvalidArgument {
360 arg: "type".into(),
361 message: format!("Unsupported index type: {}", config.index_type),
362 }
363 .into());
364 }
365 };
366
367 storage.schema_manager().add_index(def.clone())?;
368
369 let count = if let Ok(ds) = storage.vertex_dataset(label) {
371 if let Ok(raw) = ds.open_raw().await {
372 raw.count_rows(None).await.unwrap_or(0)
373 } else {
374 0
375 }
376 } else {
377 0
378 };
379
380 tracing::debug!("create_index_internal count for {}: {}", label, count);
381
382 if count > 0 {
383 let idx_mgr = storage.index_manager();
384 match def {
385 IndexDefinition::Vector(cfg) => {
386 idx_mgr.create_vector_index(cfg).await?;
387 }
388 IndexDefinition::Scalar(cfg) => {
389 idx_mgr.create_scalar_index(cfg).await?;
390 }
391 IndexDefinition::Inverted(cfg) => {
392 idx_mgr.create_inverted_index(cfg).await?;
393 }
394 IndexDefinition::FullText(cfg) => {
395 idx_mgr.create_fts_index(cfg).await?;
396 }
397 IndexDefinition::JsonFullText(cfg) => {
398 idx_mgr.create_json_fts_index(cfg).await?;
399 }
400 _ => {}
401 }
402 }
403
404 Ok(())
405}
406
407async fn create_constraint_internal(
408 storage: &StorageManager,
409 target_name: &str,
410 config: &ConstraintConfig,
411 is_label: bool,
412) -> Result<()> {
413 let name = config.name.clone().unwrap_or_else(|| {
414 format!(
415 "{}_{}_{}",
416 target_name,
417 config.constraint_type.to_lowercase(),
418 config.properties.join("_")
419 )
420 });
421
422 let constraint_type = match config.constraint_type.to_uppercase().as_str() {
423 "UNIQUE" => ConstraintType::Unique {
424 properties: config.properties.clone(),
425 },
426 "EXISTS" => {
427 if config.properties.len() != 1 {
428 return Err(UniError::InvalidArgument {
429 arg: "properties".into(),
430 message: "EXISTS constraint requires exactly one property".into(),
431 }
432 .into());
433 }
434 ConstraintType::Exists {
435 property: config.properties[0].clone(),
436 }
437 }
438 _ => {
439 return Err(UniError::InvalidArgument {
440 arg: "type".into(),
441 message: format!("Unsupported constraint type: {}", config.constraint_type),
442 }
443 .into());
444 }
445 };
446
447 let target = if is_label {
448 ConstraintTarget::Label(target_name.to_string())
449 } else {
450 ConstraintTarget::EdgeType(target_name.to_string())
451 };
452
453 let constraint = Constraint {
454 name,
455 constraint_type,
456 target,
457 enabled: true,
458 };
459
460 storage.schema_manager().add_constraint(constraint)?;
461 Ok(())
462}
463
464fn parse_data_type(s: &str) -> Result<DataType> {
465 let s = s.trim();
466 if s.to_uppercase().starts_with("LIST<") && s.ends_with('>') {
467 let inner = &s[5..s.len() - 1];
468 let inner_type = parse_data_type(inner)?;
469 return Ok(DataType::List(Box::new(inner_type)));
470 }
471
472 match s.to_uppercase().as_str() {
473 "STRING" | "UTF8" => Ok(DataType::String),
474 "INT" | "INTEGER" | "INT64" => Ok(DataType::Int64),
475 "INT32" => Ok(DataType::Int32),
476 "FLOAT" | "FLOAT64" | "DOUBLE" => Ok(DataType::Float64),
477 "FLOAT32" => Ok(DataType::Float32),
478 "BOOL" | "BOOLEAN" => Ok(DataType::Bool),
479 "DATETIME" => Ok(DataType::DateTime),
480 "DATE" => Ok(DataType::Date),
481 "BTIC" => Ok(DataType::Btic),
482 "VECTOR" => Ok(DataType::Vector { dimensions: 0 }),
483 _ => Err(UniError::InvalidArgument {
484 arg: "type".into(),
485 message: format!("Unknown data type: {}", s),
486 }
487 .into()),
488 }
489}