1use anyhow::Result;
5use serde::Deserialize;
6use std::collections::HashMap;
7use uni_common::Value;
8use uni_common::{
9 UniError,
10 core::schema::{
11 Constraint, ConstraintTarget, ConstraintType, DataType, DistanceMetric, EmbeddingConfig,
12 IndexDefinition, ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
13 validate_identifier,
14 },
15};
16use uni_store::storage::{IndexManager, StorageManager};
17
18#[derive(Deserialize)]
19struct LabelConfig {
20 #[serde(default)]
21 properties: HashMap<String, PropertyConfig>,
22 #[serde(default)]
23 indexes: Vec<IndexConfig>,
24 #[serde(default)]
25 constraints: Vec<ConstraintConfig>,
26}
27
28#[derive(Deserialize)]
29struct PropertyConfig {
30 #[serde(rename = "type")]
31 data_type: String,
32 #[serde(default = "default_nullable")]
33 nullable: bool,
34}
35
/// Serde default for `PropertyConfig::nullable`: properties are nullable unless stated.
fn default_nullable() -> bool {
    const NULLABLE_UNLESS_STATED: bool = true;
    NULLABLE_UNLESS_STATED
}
39
40#[derive(Deserialize)]
41struct IndexConfig {
42 property: Option<String>,
43 #[serde(rename = "type")]
44 index_type: String,
45 dimensions: Option<usize>,
47 metric: Option<String>,
48 embedding: Option<EmbeddingOptions>,
49 name: Option<String>,
51}
52
53#[derive(Deserialize)]
55struct EmbeddingOptions {
56 alias: String,
57 source: Vec<String>,
58 #[serde(default = "default_batch_size")]
59 batch_size: usize,
60}
61
/// Serde default for `EmbeddingOptions::batch_size`.
fn default_batch_size() -> usize {
    const BATCH_SIZE_WHEN_UNSPECIFIED: usize = 32;
    BATCH_SIZE_WHEN_UNSPECIFIED
}
65
66#[derive(Deserialize)]
67struct ConstraintConfig {
68 #[serde(rename = "type")]
69 constraint_type: String,
70 properties: Vec<String>,
71 name: Option<String>,
72}
73
74pub async fn create_label(
75 storage: &StorageManager,
76 name: &str,
77 config_val: &Value,
78) -> Result<bool> {
79 validate_identifier(name)?;
80
81 if storage.schema_manager().schema().labels.contains_key(name) {
82 return Err(UniError::LabelAlreadyExists {
83 label: name.to_string(),
84 }
85 .into());
86 }
87
88 let json_val: serde_json::Value = config_val.clone().into();
89 let config: LabelConfig =
90 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
91 arg: "config".to_string(),
92 message: e.to_string(),
93 })?;
94
95 storage.schema_manager().add_label(name)?;
97
98 for (prop_name, prop_config) in config.properties {
100 validate_identifier(&prop_name)?;
101 let data_type = parse_data_type(&prop_config.data_type)?;
102 storage
103 .schema_manager()
104 .add_property(name, &prop_name, data_type, prop_config.nullable)?;
105 }
106
107 for idx in config.indexes {
109 if idx.property.is_none() {
110 return Err(UniError::InvalidArgument {
111 arg: "indexes".into(),
112 message: "Property name required for index definition".into(),
113 }
114 .into());
115 }
116 create_index_internal(storage, name, &idx).await?;
117 }
118
119 for c in config.constraints {
121 create_constraint_internal(storage, name, &c, true).await?;
122 }
123
124 storage.schema_manager().save().await?;
125 Ok(true)
126}
127
128pub async fn create_edge_type(
129 storage: &StorageManager,
130 name: &str,
131 src_labels: Vec<String>,
132 dst_labels: Vec<String>,
133 config_val: &Value,
134) -> Result<bool> {
135 validate_identifier(name)?;
136
137 let json_val: serde_json::Value = config_val.clone().into();
138 let config: LabelConfig =
139 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
140 arg: "config".to_string(),
141 message: e.to_string(),
142 })?;
143
144 storage
145 .schema_manager()
146 .add_edge_type(name, src_labels, dst_labels)?;
147
148 for (prop_name, prop_config) in config.properties {
149 validate_identifier(&prop_name)?;
150 let data_type = parse_data_type(&prop_config.data_type)?;
151 storage
152 .schema_manager()
153 .add_property(name, &prop_name, data_type, prop_config.nullable)?;
154 }
155
156 for c in config.constraints {
158 create_constraint_internal(storage, name, &c, false).await?;
159 }
160
161 storage.schema_manager().save().await?;
162 Ok(true)
163}
164
165pub async fn create_index(
166 storage: &StorageManager,
167 label: &str,
168 property: &str,
169 config_val: &Value,
170) -> Result<bool> {
171 let json_val: serde_json::Value = config_val.clone().into();
172 let mut config: IndexConfig =
173 serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
174 arg: "config".to_string(),
175 message: e.to_string(),
176 })?;
177
178 config.property = Some(property.to_string());
180
181 create_index_internal(storage, label, &config).await?;
182 storage.schema_manager().save().await?;
183 Ok(true)
184}
185
186pub async fn create_constraint(
187 storage: &StorageManager,
188 label: &str,
189 constraint_type: &str,
190 properties: Vec<String>,
191) -> Result<bool> {
192 let config = ConstraintConfig {
193 constraint_type: constraint_type.to_string(),
194 properties,
195 name: None,
196 };
197 create_constraint_internal(storage, label, &config, true).await?;
199 storage.schema_manager().save().await?;
200 Ok(true)
201}
202
203pub async fn drop_label(storage: &StorageManager, name: &str) -> Result<bool> {
204 storage.schema_manager().drop_label(name, true)?;
205 storage.schema_manager().save().await?;
206 Ok(true)
207}
208
209pub async fn drop_edge_type(storage: &StorageManager, name: &str) -> Result<bool> {
210 storage.schema_manager().drop_edge_type(name, true)?;
211 storage.schema_manager().save().await?;
212 Ok(true)
213}
214
215pub async fn drop_index(storage: &StorageManager, name: &str) -> Result<bool> {
216 storage.schema_manager().remove_index(name)?;
217 storage.schema_manager().save().await?;
218 Ok(true)
219}
220
221pub async fn drop_constraint(storage: &StorageManager, name: &str) -> Result<bool> {
222 storage.schema_manager().drop_constraint(name, true)?;
223 storage.schema_manager().save().await?;
224 Ok(true)
225}
226
227async fn create_index_internal(
230 storage: &StorageManager,
231 label: &str,
232 config: &IndexConfig,
233) -> Result<()> {
234 let prop_name = config
235 .property
236 .as_ref()
237 .ok_or_else(|| UniError::InvalidArgument {
238 arg: "property".into(),
239 message: "Property is missing".into(),
240 })?;
241
242 let index_name = config.name.clone().unwrap_or_else(|| {
243 format!(
244 "{}_{}_{}",
245 label,
246 prop_name,
247 config.index_type.to_lowercase()
248 )
249 });
250
251 let def = match config.index_type.to_uppercase().as_str() {
252 "VECTOR" => {
253 let _dimensions = config.dimensions;
254 let metric = match config.metric.as_deref().unwrap_or("cosine") {
255 "cosine" => DistanceMetric::Cosine,
256 "l2" | "euclidean" => DistanceMetric::L2,
257 "dot" => DistanceMetric::Dot,
258 _ => {
259 return Err(UniError::InvalidArgument {
260 arg: "metric".into(),
261 message: "Invalid metric".into(),
262 }
263 .into());
264 }
265 };
266
267 let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
269 alias: emb.alias.clone(),
270 source_properties: emb.source.clone(),
271 batch_size: emb.batch_size,
272 });
273
274 IndexDefinition::Vector(VectorIndexConfig {
275 name: index_name,
276 label: label.to_string(),
277 property: prop_name.clone(),
278 index_type: VectorIndexType::Hnsw {
279 m: 16,
280 ef_construction: 200,
281 ef_search: 50,
282 }, metric,
284 embedding_config,
285 })
286 }
287 "SCALAR" | "BTREE" => IndexDefinition::Scalar(ScalarIndexConfig {
288 name: index_name,
289 label: label.to_string(),
290 properties: vec![prop_name.clone()],
291 index_type: ScalarIndexType::BTree,
292 where_clause: None,
293 }),
294 "INVERTED" => IndexDefinition::Inverted(uni_common::core::schema::InvertedIndexConfig {
295 name: index_name,
296 label: label.to_string(),
297 property: prop_name.clone(),
298 normalize: true,
299 max_terms_per_doc: 10_000,
300 }),
301 _ => {
302 return Err(UniError::InvalidArgument {
303 arg: "type".into(),
304 message: format!("Unsupported index type: {}", config.index_type),
305 }
306 .into());
307 }
308 };
309
310 storage.schema_manager().add_index(def.clone())?;
311
312 let count = if let Ok(ds) = storage.vertex_dataset(label) {
314 if let Ok(raw) = ds.open_raw().await {
315 raw.count_rows(None).await.unwrap_or(0)
316 } else {
317 0
318 }
319 } else {
320 0
321 };
322
323 tracing::debug!("create_index_internal count for {}: {}", label, count);
324
325 if count > 0 {
326 let idx_mgr = IndexManager::new(
327 storage.base_path(),
328 storage.schema_manager_arc(),
329 storage.lancedb_store_arc(),
330 );
331 match def {
332 IndexDefinition::Vector(cfg) => {
333 idx_mgr.create_vector_index(cfg).await?;
334 }
335 IndexDefinition::Scalar(cfg) => {
336 idx_mgr.create_scalar_index(cfg).await?;
337 }
338 IndexDefinition::Inverted(cfg) => {
339 idx_mgr.create_inverted_index(cfg).await?;
340 }
341 _ => {}
342 }
343 }
344
345 Ok(())
346}
347
348async fn create_constraint_internal(
349 storage: &StorageManager,
350 target_name: &str,
351 config: &ConstraintConfig,
352 is_label: bool,
353) -> Result<()> {
354 let name = config.name.clone().unwrap_or_else(|| {
355 format!(
356 "{}_{}_{}",
357 target_name,
358 config.constraint_type.to_lowercase(),
359 config.properties.join("_")
360 )
361 });
362
363 let constraint_type = match config.constraint_type.to_uppercase().as_str() {
364 "UNIQUE" => ConstraintType::Unique {
365 properties: config.properties.clone(),
366 },
367 "EXISTS" => {
368 if config.properties.len() != 1 {
369 return Err(UniError::InvalidArgument {
370 arg: "properties".into(),
371 message: "EXISTS constraint requires exactly one property".into(),
372 }
373 .into());
374 }
375 ConstraintType::Exists {
376 property: config.properties[0].clone(),
377 }
378 }
379 _ => {
380 return Err(UniError::InvalidArgument {
381 arg: "type".into(),
382 message: format!("Unsupported constraint type: {}", config.constraint_type),
383 }
384 .into());
385 }
386 };
387
388 let target = if is_label {
389 ConstraintTarget::Label(target_name.to_string())
390 } else {
391 ConstraintTarget::EdgeType(target_name.to_string())
392 };
393
394 let constraint = Constraint {
395 name,
396 constraint_type,
397 target,
398 enabled: true,
399 };
400
401 storage.schema_manager().add_constraint(constraint)?;
402 Ok(())
403}
404
405fn parse_data_type(s: &str) -> Result<DataType> {
406 let s = s.trim();
407 if s.to_uppercase().starts_with("LIST<") && s.ends_with('>') {
408 let inner = &s[5..s.len() - 1];
409 let inner_type = parse_data_type(inner)?;
410 return Ok(DataType::List(Box::new(inner_type)));
411 }
412
413 match s.to_uppercase().as_str() {
414 "STRING" | "UTF8" => Ok(DataType::String),
415 "INT" | "INTEGER" | "INT64" => Ok(DataType::Int64),
416 "INT32" => Ok(DataType::Int32),
417 "FLOAT" | "FLOAT64" | "DOUBLE" => Ok(DataType::Float64),
418 "FLOAT32" => Ok(DataType::Float32),
419 "BOOL" | "BOOLEAN" => Ok(DataType::Bool),
420 "DATETIME" => Ok(DataType::DateTime),
421 "DATE" => Ok(DataType::Date),
422 "VECTOR" => Ok(DataType::Vector { dimensions: 0 }),
423 _ => Err(UniError::InvalidArgument {
424 arg: "type".into(),
425 message: format!("Unknown data type: {}", s),
426 }
427 .into()),
428 }
429}