Skip to main content

uni_query/query/executor/
ddl_procedures.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::Result;
5use serde::Deserialize;
6use std::collections::HashMap;
7use uni_common::Value;
8use uni_common::{
9    UniError,
10    core::schema::{
11        Constraint, ConstraintTarget, ConstraintType, DataType, DistanceMetric, EmbeddingConfig,
12        IndexDefinition, ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
13        validate_identifier,
14    },
15};
16use uni_store::storage::{IndexManager, StorageManager};
17
/// JSON-deserialized configuration accepted by `create_label` and `create_edge_type`.
///
/// Every section is optional; an omitted section deserializes to an empty collection.
#[derive(Deserialize)]
struct LabelConfig {
    /// Property definitions keyed by property name.
    #[serde(default)]
    properties: HashMap<String, PropertyConfig>,
    /// Index definitions. Only `create_label` reads this field.
    #[serde(default)]
    indexes: Vec<IndexConfig>,
    /// Constraint definitions to register on the new label or edge type.
    #[serde(default)]
    constraints: Vec<ConstraintConfig>,
}
27
/// Declaration of a single property inside a `LabelConfig`.
#[derive(Deserialize)]
struct PropertyConfig {
    /// Type name read from the JSON `type` key; resolved with `parse_data_type`.
    #[serde(rename = "type")]
    data_type: String,
    /// Nullability flag forwarded to `add_property`; `default_nullable()` supplies
    /// the value when the key is omitted.
    #[serde(default = "default_nullable")]
    nullable: bool,
}
35
/// Serde fallback for `PropertyConfig::nullable`: properties are nullable unless
/// the config says otherwise.
fn default_nullable() -> bool {
    const NULLABLE_WHEN_UNSPECIFIED: bool = true;
    NULLABLE_WHEN_UNSPECIFIED
}
39
/// Index definition parsed from JSON, either as an entry of `LabelConfig::indexes`
/// or as the `config` argument of `create_index`.
#[derive(Deserialize)]
struct IndexConfig {
    /// Property to index. `create_index` overwrites this with its `property`
    /// argument; `create_label` rejects entries where it is missing.
    property: Option<String>,
    /// Index kind read from the JSON `type` key; `create_index_internal` matches it
    /// case-insensitively against `VECTOR`, `SCALAR`, `BTREE` and `INVERTED`.
    #[serde(rename = "type")]
    index_type: String,
    // Vector-specific options.
    /// Accepted in JSON but not consumed at runtime.
    #[expect(dead_code)]
    dimensions: Option<usize>,
    /// Distance metric name for vector indexes; "cosine" is used when absent.
    metric: Option<String>,
    /// Optional embedding settings attached to a vector index.
    embedding: Option<EmbeddingOptions>,
    // Generic
    /// Explicit index name; when absent a `{label}_{property}_{type}` name is generated.
    name: Option<String>,
}
53
/// Embedding configuration for vector indexes via procedure API.
///
/// Copied field-for-field into an `EmbeddingConfig` when a vector index is created.
#[derive(Deserialize)]
struct EmbeddingOptions {
    /// Forwarded as `EmbeddingConfig::alias`.
    alias: String,
    /// Forwarded as `EmbeddingConfig::source_properties`.
    source: Vec<String>,
    /// Forwarded as `EmbeddingConfig::batch_size`; `default_batch_size()` supplies
    /// the value when the key is omitted.
    #[serde(default = "default_batch_size")]
    batch_size: usize,
}
62
/// Serde fallback for `EmbeddingOptions::batch_size`.
fn default_batch_size() -> usize {
    const BATCH_SIZE_WHEN_UNSPECIFIED: usize = 32;
    BATCH_SIZE_WHEN_UNSPECIFIED
}
66
/// Constraint definition, either parsed from JSON inside a `LabelConfig` or built
/// directly by `create_constraint`.
#[derive(Deserialize)]
struct ConstraintConfig {
    /// Constraint kind read from the JSON `type` key; `create_constraint_internal`
    /// matches it case-insensitively against `UNIQUE` and `EXISTS`.
    #[serde(rename = "type")]
    constraint_type: String,
    /// Properties the constraint covers; `EXISTS` requires exactly one.
    properties: Vec<String>,
    /// Explicit constraint name; when absent a `{target}_{type}_{properties}` name
    /// is generated.
    name: Option<String>,
}
74
75pub async fn create_label(
76    storage: &StorageManager,
77    name: &str,
78    config_val: &Value,
79) -> Result<bool> {
80    validate_identifier(name)?;
81
82    if storage.schema_manager().schema().labels.contains_key(name) {
83        return Err(UniError::LabelAlreadyExists {
84            label: name.to_string(),
85        }
86        .into());
87    }
88
89    let json_val: serde_json::Value = config_val.clone().into();
90    let config: LabelConfig =
91        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
92            arg: "config".to_string(),
93            message: e.to_string(),
94        })?;
95
96    // Create label
97    storage.schema_manager().add_label(name)?;
98
99    // Add properties
100    for (prop_name, prop_config) in config.properties {
101        validate_identifier(&prop_name)?;
102        let data_type = parse_data_type(&prop_config.data_type)?;
103        storage
104            .schema_manager()
105            .add_property(name, &prop_name, data_type, prop_config.nullable)?;
106    }
107
108    // Add indexes
109    for idx in config.indexes {
110        if idx.property.is_none() {
111            return Err(UniError::InvalidArgument {
112                arg: "indexes".into(),
113                message: "Property name required for index definition".into(),
114            }
115            .into());
116        }
117        create_index_internal(storage, name, &idx).await?;
118    }
119
120    // Add constraints
121    for c in config.constraints {
122        create_constraint_internal(storage, name, &c, true).await?;
123    }
124
125    storage.schema_manager().save().await?;
126    Ok(true)
127}
128
129pub async fn create_edge_type(
130    storage: &StorageManager,
131    name: &str,
132    src_labels: Vec<String>,
133    dst_labels: Vec<String>,
134    config_val: &Value,
135) -> Result<bool> {
136    validate_identifier(name)?;
137
138    let json_val: serde_json::Value = config_val.clone().into();
139    let config: LabelConfig =
140        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
141            arg: "config".to_string(),
142            message: e.to_string(),
143        })?;
144
145    storage
146        .schema_manager()
147        .add_edge_type(name, src_labels, dst_labels)?;
148
149    for (prop_name, prop_config) in config.properties {
150        validate_identifier(&prop_name)?;
151        let data_type = parse_data_type(&prop_config.data_type)?;
152        storage
153            .schema_manager()
154            .add_property(name, &prop_name, data_type, prop_config.nullable)?;
155    }
156
157    // Constraints
158    for c in config.constraints {
159        create_constraint_internal(storage, name, &c, false).await?;
160    }
161
162    storage.schema_manager().save().await?;
163    Ok(true)
164}
165
166pub async fn create_index(
167    storage: &StorageManager,
168    label: &str,
169    property: &str,
170    config_val: &Value,
171) -> Result<bool> {
172    let json_val: serde_json::Value = config_val.clone().into();
173    let mut config: IndexConfig =
174        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
175            arg: "config".to_string(),
176            message: e.to_string(),
177        })?;
178
179    // Override property from args
180    config.property = Some(property.to_string());
181
182    create_index_internal(storage, label, &config).await?;
183    storage.schema_manager().save().await?;
184    Ok(true)
185}
186
187pub async fn create_constraint(
188    storage: &StorageManager,
189    label: &str,
190    constraint_type: &str,
191    properties: Vec<String>,
192) -> Result<bool> {
193    let config = ConstraintConfig {
194        constraint_type: constraint_type.to_string(),
195        properties,
196        name: None,
197    };
198    // Assume label target
199    create_constraint_internal(storage, label, &config, true).await?;
200    storage.schema_manager().save().await?;
201    Ok(true)
202}
203
204pub async fn drop_label(storage: &StorageManager, name: &str) -> Result<bool> {
205    storage.schema_manager().drop_label(name, true)?;
206    storage.schema_manager().save().await?;
207    Ok(true)
208}
209
210pub async fn drop_edge_type(storage: &StorageManager, name: &str) -> Result<bool> {
211    storage.schema_manager().drop_edge_type(name, true)?;
212    storage.schema_manager().save().await?;
213    Ok(true)
214}
215
216pub async fn drop_index(storage: &StorageManager, name: &str) -> Result<bool> {
217    storage.schema_manager().remove_index(name)?;
218    storage.schema_manager().save().await?;
219    Ok(true)
220}
221
222pub async fn drop_constraint(storage: &StorageManager, name: &str) -> Result<bool> {
223    storage.schema_manager().drop_constraint(name, true)?;
224    storage.schema_manager().save().await?;
225    Ok(true)
226}
227
228// Internal helpers
229
230async fn create_index_internal(
231    storage: &StorageManager,
232    label: &str,
233    config: &IndexConfig,
234) -> Result<()> {
235    let prop_name = config
236        .property
237        .as_ref()
238        .ok_or_else(|| UniError::InvalidArgument {
239            arg: "property".into(),
240            message: "Property is missing".into(),
241        })?;
242
243    let index_name = config.name.clone().unwrap_or_else(|| {
244        format!(
245            "{}_{}_{}",
246            label,
247            prop_name,
248            config.index_type.to_lowercase()
249        )
250    });
251
252    let def = match config.index_type.to_uppercase().as_str() {
253        "VECTOR" => {
254            let metric = match config.metric.as_deref().unwrap_or("cosine") {
255                "cosine" => DistanceMetric::Cosine,
256                "l2" | "euclidean" => DistanceMetric::L2,
257                "dot" => DistanceMetric::Dot,
258                _ => {
259                    return Err(UniError::InvalidArgument {
260                        arg: "metric".into(),
261                        message: "Invalid metric".into(),
262                    }
263                    .into());
264                }
265            };
266
267            // Parse embedding config from procedure options
268            let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
269                alias: emb.alias.clone(),
270                source_properties: emb.source.clone(),
271                batch_size: emb.batch_size,
272            });
273
274            IndexDefinition::Vector(VectorIndexConfig {
275                name: index_name,
276                label: label.to_string(),
277                property: prop_name.clone(),
278                index_type: VectorIndexType::Hnsw {
279                    m: 16,
280                    ef_construction: 200,
281                    ef_search: 50,
282                }, // Default params
283                metric,
284                embedding_config,
285                metadata: Default::default(),
286            })
287        }
288        "SCALAR" | "BTREE" => IndexDefinition::Scalar(ScalarIndexConfig {
289            name: index_name,
290            label: label.to_string(),
291            properties: vec![prop_name.clone()],
292            index_type: ScalarIndexType::BTree,
293            where_clause: None,
294            metadata: Default::default(),
295        }),
296        "INVERTED" => IndexDefinition::Inverted(uni_common::core::schema::InvertedIndexConfig {
297            name: index_name,
298            label: label.to_string(),
299            property: prop_name.clone(),
300            normalize: true,
301            max_terms_per_doc: 10_000,
302            metadata: Default::default(),
303        }),
304        _ => {
305            return Err(UniError::InvalidArgument {
306                arg: "type".into(),
307                message: format!("Unsupported index type: {}", config.index_type),
308            }
309            .into());
310        }
311    };
312
313    storage.schema_manager().add_index(def.clone())?;
314
315    // Trigger build if data exists
316    let count = if let Ok(ds) = storage.vertex_dataset(label) {
317        if let Ok(raw) = ds.open_raw().await {
318            raw.count_rows(None).await.unwrap_or(0)
319        } else {
320            0
321        }
322    } else {
323        0
324    };
325
326    tracing::debug!("create_index_internal count for {}: {}", label, count);
327
328    if count > 0 {
329        let idx_mgr = IndexManager::new(
330            storage.base_path(),
331            storage.schema_manager_arc(),
332            storage.lancedb_store_arc(),
333        );
334        match def {
335            IndexDefinition::Vector(cfg) => {
336                idx_mgr.create_vector_index(cfg).await?;
337            }
338            IndexDefinition::Scalar(cfg) => {
339                idx_mgr.create_scalar_index(cfg).await?;
340            }
341            IndexDefinition::Inverted(cfg) => {
342                idx_mgr.create_inverted_index(cfg).await?;
343            }
344            _ => {}
345        }
346    }
347
348    Ok(())
349}
350
351async fn create_constraint_internal(
352    storage: &StorageManager,
353    target_name: &str,
354    config: &ConstraintConfig,
355    is_label: bool,
356) -> Result<()> {
357    let name = config.name.clone().unwrap_or_else(|| {
358        format!(
359            "{}_{}_{}",
360            target_name,
361            config.constraint_type.to_lowercase(),
362            config.properties.join("_")
363        )
364    });
365
366    let constraint_type = match config.constraint_type.to_uppercase().as_str() {
367        "UNIQUE" => ConstraintType::Unique {
368            properties: config.properties.clone(),
369        },
370        "EXISTS" => {
371            if config.properties.len() != 1 {
372                return Err(UniError::InvalidArgument {
373                    arg: "properties".into(),
374                    message: "EXISTS constraint requires exactly one property".into(),
375                }
376                .into());
377            }
378            ConstraintType::Exists {
379                property: config.properties[0].clone(),
380            }
381        }
382        _ => {
383            return Err(UniError::InvalidArgument {
384                arg: "type".into(),
385                message: format!("Unsupported constraint type: {}", config.constraint_type),
386            }
387            .into());
388        }
389    };
390
391    let target = if is_label {
392        ConstraintTarget::Label(target_name.to_string())
393    } else {
394        ConstraintTarget::EdgeType(target_name.to_string())
395    };
396
397    let constraint = Constraint {
398        name,
399        constraint_type,
400        target,
401        enabled: true,
402    };
403
404    storage.schema_manager().add_constraint(constraint)?;
405    Ok(())
406}
407
408fn parse_data_type(s: &str) -> Result<DataType> {
409    let s = s.trim();
410    if s.to_uppercase().starts_with("LIST<") && s.ends_with('>') {
411        let inner = &s[5..s.len() - 1];
412        let inner_type = parse_data_type(inner)?;
413        return Ok(DataType::List(Box::new(inner_type)));
414    }
415
416    match s.to_uppercase().as_str() {
417        "STRING" | "UTF8" => Ok(DataType::String),
418        "INT" | "INTEGER" | "INT64" => Ok(DataType::Int64),
419        "INT32" => Ok(DataType::Int32),
420        "FLOAT" | "FLOAT64" | "DOUBLE" => Ok(DataType::Float64),
421        "FLOAT32" => Ok(DataType::Float32),
422        "BOOL" | "BOOLEAN" => Ok(DataType::Bool),
423        "DATETIME" => Ok(DataType::DateTime),
424        "DATE" => Ok(DataType::Date),
425        "VECTOR" => Ok(DataType::Vector { dimensions: 0 }),
426        _ => Err(UniError::InvalidArgument {
427            arg: "type".into(),
428            message: format!("Unknown data type: {}", s),
429        }
430        .into()),
431    }
432}