1use anyhow::Result;
5use serde::Deserialize;
6use std::collections::HashMap;
7use uni_common::Value;
8use uni_common::{
9 UniError,
10 core::schema::{
11 Constraint, ConstraintTarget, ConstraintType, DataType, DistanceMetric, EmbeddingConfig,
12 IndexDefinition, ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
13 validate_identifier,
14 },
15};
16use uni_store::storage::{IndexManager, StorageManager};
17
/// Optional settings accepted by `create_label` and `create_edge_type`,
/// deserialized from the caller-supplied config value.
///
/// Every section is `#[serde(default)]`, so an omitted key yields an empty
/// map / list.
#[derive(Deserialize)]
struct LabelConfig {
    /// Property name -> declaration (data type and nullability).
    #[serde(default)]
    properties: HashMap<String, PropertyConfig>,
    /// Index definitions. `create_label` builds these; `create_edge_type`
    /// does not create them.
    #[serde(default)]
    indexes: Vec<IndexConfig>,
    /// Constraint definitions applied to the label / edge type.
    #[serde(default)]
    constraints: Vec<ConstraintConfig>,
}
27
/// Declaration of a single property inside `LabelConfig::properties`.
#[derive(Deserialize)]
struct PropertyConfig {
    /// Type name (JSON key `"type"`), parsed by `parse_data_type`,
    /// e.g. `"STRING"` or `"LIST<INT>"`.
    #[serde(rename = "type")]
    data_type: String,
    /// Whether the property may be null; defaults to `true`
    /// (see `default_nullable`).
    #[serde(default = "default_nullable")]
    nullable: bool,
}
35
/// Serde default for `PropertyConfig::nullable`: a property accepts nulls
/// unless the config explicitly says otherwise.
fn default_nullable() -> bool {
    const NULLABLE_UNLESS_STATED: bool = true;
    NULLABLE_UNLESS_STATED
}
39
/// One index definition, taken either from `LabelConfig::indexes` or from the
/// config value passed to `create_index`.
#[derive(Deserialize)]
struct IndexConfig {
    /// Indexed property. Optional in the JSON: `create_index` overwrites it
    /// with its own `property` argument, and `create_label` rejects entries
    /// where it is missing.
    property: Option<String>,
    /// Index kind (JSON key `"type"`), e.g. `VECTOR`, `SCALAR`/`BTREE` or
    /// `INVERTED`; compared after uppercasing.
    #[serde(rename = "type")]
    index_type: String,
    /// Accepted in the config but otherwise unread (hence the
    /// `#[expect(dead_code)]`).
    #[expect(dead_code)]
    dimensions: Option<usize>,
    /// Vector distance metric name; `cosine` is used when omitted.
    metric: Option<String>,
    /// Optional embedding-generation settings for vector indexes.
    embedding: Option<EmbeddingOptions>,
    /// Explicit index name; when absent a `{label}_{property}_{type}` name is
    /// generated.
    name: Option<String>,
}
53
/// Embedding settings attached to a vector index; copied field-for-field into
/// the schema's `EmbeddingConfig`.
#[derive(Deserialize)]
struct EmbeddingOptions {
    /// Copied to `EmbeddingConfig::alias`.
    /// NOTE(review): presumably identifies the embedding model/provider — confirm.
    alias: String,
    /// Property names copied to `EmbeddingConfig::source_properties`.
    source: Vec<String>,
    /// Batch size; defaults to 32 (see `default_batch_size`).
    #[serde(default = "default_batch_size")]
    batch_size: usize,
}
62
/// Serde default for `EmbeddingOptions::batch_size` when the config omits it.
fn default_batch_size() -> usize {
    const FALLBACK_BATCH_SIZE: usize = 32;
    FALLBACK_BATCH_SIZE
}
66
/// One constraint definition, taken from `LabelConfig::constraints` or built
/// by `create_constraint`.
#[derive(Deserialize)]
struct ConstraintConfig {
    /// Constraint kind (JSON key `"type"`): `UNIQUE` or `EXISTS`, compared
    /// after uppercasing.
    #[serde(rename = "type")]
    constraint_type: String,
    /// Properties covered by the constraint (`EXISTS` takes exactly one).
    properties: Vec<String>,
    /// Explicit constraint name; when absent one is derived from the target
    /// name, the type and the property names.
    name: Option<String>,
}
74
75pub async fn create_label(
76 storage: &StorageManager,
77 name: &str,
78 config_val: &Value,
79) -> Result<bool> {
80 validate_identifier(name)?;
81
82 if storage.schema_manager().schema().labels.contains_key(name) {
83 return Err(UniError::LabelAlreadyExists {
84 label: name.to_string(),
85 }
86 .into());
87 }
88
89 let json_val: serde_json::Value = config_val.clone().into();
90 let config: LabelConfig =
91 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
92 arg: "config".to_string(),
93 message: e.to_string(),
94 })?;
95
96 storage.schema_manager().add_label(name)?;
98
99 for (prop_name, prop_config) in config.properties {
101 validate_identifier(&prop_name)?;
102 let data_type = parse_data_type(&prop_config.data_type)?;
103 storage
104 .schema_manager()
105 .add_property(name, &prop_name, data_type, prop_config.nullable)?;
106 }
107
108 for idx in config.indexes {
110 if idx.property.is_none() {
111 return Err(UniError::InvalidArgument {
112 arg: "indexes".into(),
113 message: "Property name required for index definition".into(),
114 }
115 .into());
116 }
117 create_index_internal(storage, name, &idx).await?;
118 }
119
120 for c in config.constraints {
122 create_constraint_internal(storage, name, &c, true).await?;
123 }
124
125 storage.schema_manager().save().await?;
126 Ok(true)
127}
128
129pub async fn create_edge_type(
130 storage: &StorageManager,
131 name: &str,
132 src_labels: Vec<String>,
133 dst_labels: Vec<String>,
134 config_val: &Value,
135) -> Result<bool> {
136 validate_identifier(name)?;
137
138 let json_val: serde_json::Value = config_val.clone().into();
139 let config: LabelConfig =
140 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
141 arg: "config".to_string(),
142 message: e.to_string(),
143 })?;
144
145 storage
146 .schema_manager()
147 .add_edge_type(name, src_labels, dst_labels)?;
148
149 for (prop_name, prop_config) in config.properties {
150 validate_identifier(&prop_name)?;
151 let data_type = parse_data_type(&prop_config.data_type)?;
152 storage
153 .schema_manager()
154 .add_property(name, &prop_name, data_type, prop_config.nullable)?;
155 }
156
157 for c in config.constraints {
159 create_constraint_internal(storage, name, &c, false).await?;
160 }
161
162 storage.schema_manager().save().await?;
163 Ok(true)
164}
165
166pub async fn create_index(
167 storage: &StorageManager,
168 label: &str,
169 property: &str,
170 config_val: &Value,
171) -> Result<bool> {
172 let json_val: serde_json::Value = config_val.clone().into();
173 let mut config: IndexConfig =
174 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
175 arg: "config".to_string(),
176 message: e.to_string(),
177 })?;
178
179 config.property = Some(property.to_string());
181
182 create_index_internal(storage, label, &config).await?;
183 storage.schema_manager().save().await?;
184 Ok(true)
185}
186
187pub async fn create_constraint(
188 storage: &StorageManager,
189 label: &str,
190 constraint_type: &str,
191 properties: Vec<String>,
192) -> Result<bool> {
193 let config = ConstraintConfig {
194 constraint_type: constraint_type.to_string(),
195 properties,
196 name: None,
197 };
198 create_constraint_internal(storage, label, &config, true).await?;
200 storage.schema_manager().save().await?;
201 Ok(true)
202}
203
204pub async fn drop_label(storage: &StorageManager, name: &str) -> Result<bool> {
205 storage.schema_manager().drop_label(name, true)?;
206 storage.schema_manager().save().await?;
207 Ok(true)
208}
209
210pub async fn drop_edge_type(storage: &StorageManager, name: &str) -> Result<bool> {
211 storage.schema_manager().drop_edge_type(name, true)?;
212 storage.schema_manager().save().await?;
213 Ok(true)
214}
215
216pub async fn drop_index(storage: &StorageManager, name: &str) -> Result<bool> {
217 storage.schema_manager().remove_index(name)?;
218 storage.schema_manager().save().await?;
219 Ok(true)
220}
221
222pub async fn drop_constraint(storage: &StorageManager, name: &str) -> Result<bool> {
223 storage.schema_manager().drop_constraint(name, true)?;
224 storage.schema_manager().save().await?;
225 Ok(true)
226}
227
228async fn create_index_internal(
231 storage: &StorageManager,
232 label: &str,
233 config: &IndexConfig,
234) -> Result<()> {
235 let prop_name = config
236 .property
237 .as_ref()
238 .ok_or_else(|| UniError::InvalidArgument {
239 arg: "property".into(),
240 message: "Property is missing".into(),
241 })?;
242
243 let index_name = config.name.clone().unwrap_or_else(|| {
244 format!(
245 "{}_{}_{}",
246 label,
247 prop_name,
248 config.index_type.to_lowercase()
249 )
250 });
251
252 let def = match config.index_type.to_uppercase().as_str() {
253 "VECTOR" => {
254 let metric = match config.metric.as_deref().unwrap_or("cosine") {
255 "cosine" => DistanceMetric::Cosine,
256 "l2" | "euclidean" => DistanceMetric::L2,
257 "dot" => DistanceMetric::Dot,
258 _ => {
259 return Err(UniError::InvalidArgument {
260 arg: "metric".into(),
261 message: "Invalid metric".into(),
262 }
263 .into());
264 }
265 };
266
267 let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
269 alias: emb.alias.clone(),
270 source_properties: emb.source.clone(),
271 batch_size: emb.batch_size,
272 });
273
274 IndexDefinition::Vector(VectorIndexConfig {
275 name: index_name,
276 label: label.to_string(),
277 property: prop_name.clone(),
278 index_type: VectorIndexType::Hnsw {
279 m: 16,
280 ef_construction: 200,
281 ef_search: 50,
282 }, metric,
284 embedding_config,
285 metadata: Default::default(),
286 })
287 }
288 "SCALAR" | "BTREE" => IndexDefinition::Scalar(ScalarIndexConfig {
289 name: index_name,
290 label: label.to_string(),
291 properties: vec![prop_name.clone()],
292 index_type: ScalarIndexType::BTree,
293 where_clause: None,
294 metadata: Default::default(),
295 }),
296 "INVERTED" => IndexDefinition::Inverted(uni_common::core::schema::InvertedIndexConfig {
297 name: index_name,
298 label: label.to_string(),
299 property: prop_name.clone(),
300 normalize: true,
301 max_terms_per_doc: 10_000,
302 metadata: Default::default(),
303 }),
304 _ => {
305 return Err(UniError::InvalidArgument {
306 arg: "type".into(),
307 message: format!("Unsupported index type: {}", config.index_type),
308 }
309 .into());
310 }
311 };
312
313 storage.schema_manager().add_index(def.clone())?;
314
315 let count = if let Ok(ds) = storage.vertex_dataset(label) {
317 if let Ok(raw) = ds.open_raw().await {
318 raw.count_rows(None).await.unwrap_or(0)
319 } else {
320 0
321 }
322 } else {
323 0
324 };
325
326 tracing::debug!("create_index_internal count for {}: {}", label, count);
327
328 if count > 0 {
329 let idx_mgr = IndexManager::new(
330 storage.base_path(),
331 storage.schema_manager_arc(),
332 storage.lancedb_store_arc(),
333 );
334 match def {
335 IndexDefinition::Vector(cfg) => {
336 idx_mgr.create_vector_index(cfg).await?;
337 }
338 IndexDefinition::Scalar(cfg) => {
339 idx_mgr.create_scalar_index(cfg).await?;
340 }
341 IndexDefinition::Inverted(cfg) => {
342 idx_mgr.create_inverted_index(cfg).await?;
343 }
344 _ => {}
345 }
346 }
347
348 Ok(())
349}
350
351async fn create_constraint_internal(
352 storage: &StorageManager,
353 target_name: &str,
354 config: &ConstraintConfig,
355 is_label: bool,
356) -> Result<()> {
357 let name = config.name.clone().unwrap_or_else(|| {
358 format!(
359 "{}_{}_{}",
360 target_name,
361 config.constraint_type.to_lowercase(),
362 config.properties.join("_")
363 )
364 });
365
366 let constraint_type = match config.constraint_type.to_uppercase().as_str() {
367 "UNIQUE" => ConstraintType::Unique {
368 properties: config.properties.clone(),
369 },
370 "EXISTS" => {
371 if config.properties.len() != 1 {
372 return Err(UniError::InvalidArgument {
373 arg: "properties".into(),
374 message: "EXISTS constraint requires exactly one property".into(),
375 }
376 .into());
377 }
378 ConstraintType::Exists {
379 property: config.properties[0].clone(),
380 }
381 }
382 _ => {
383 return Err(UniError::InvalidArgument {
384 arg: "type".into(),
385 message: format!("Unsupported constraint type: {}", config.constraint_type),
386 }
387 .into());
388 }
389 };
390
391 let target = if is_label {
392 ConstraintTarget::Label(target_name.to_string())
393 } else {
394 ConstraintTarget::EdgeType(target_name.to_string())
395 };
396
397 let constraint = Constraint {
398 name,
399 constraint_type,
400 target,
401 enabled: true,
402 };
403
404 storage.schema_manager().add_constraint(constraint)?;
405 Ok(())
406}
407
408fn parse_data_type(s: &str) -> Result<DataType> {
409 let s = s.trim();
410 if s.to_uppercase().starts_with("LIST<") && s.ends_with('>') {
411 let inner = &s[5..s.len() - 1];
412 let inner_type = parse_data_type(inner)?;
413 return Ok(DataType::List(Box::new(inner_type)));
414 }
415
416 match s.to_uppercase().as_str() {
417 "STRING" | "UTF8" => Ok(DataType::String),
418 "INT" | "INTEGER" | "INT64" => Ok(DataType::Int64),
419 "INT32" => Ok(DataType::Int32),
420 "FLOAT" | "FLOAT64" | "DOUBLE" => Ok(DataType::Float64),
421 "FLOAT32" => Ok(DataType::Float32),
422 "BOOL" | "BOOLEAN" => Ok(DataType::Bool),
423 "DATETIME" => Ok(DataType::DateTime),
424 "DATE" => Ok(DataType::Date),
425 "VECTOR" => Ok(DataType::Vector { dimensions: 0 }),
426 _ => Err(UniError::InvalidArgument {
427 arg: "type".into(),
428 message: format!("Unknown data type: {}", s),
429 }
430 .into()),
431 }
432}