Skip to main content

uni_query/query/executor/
ddl_procedures.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::Result;
5use serde::Deserialize;
6use std::collections::HashMap;
7use uni_common::Value;
8use uni_common::{
9    UniError,
10    core::schema::{
11        Constraint, ConstraintTarget, ConstraintType, DataType, DistanceMetric, EmbeddingConfig,
12        IndexDefinition, ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
13        validate_identifier,
14    },
15};
16use uni_store::storage::{IndexManager, StorageManager};
17
/// Procedure-level configuration map deserialized by `create_label` and
/// `create_edge_type`.
///
/// Every section carries `#[serde(default)]`, so an empty map is accepted.
#[derive(Deserialize)]
struct LabelConfig {
    /// Property name -> property definition.
    #[serde(default)]
    properties: HashMap<String, PropertyConfig>,
    /// Index definitions; only `create_label` turns these into indexes.
    #[serde(default)]
    indexes: Vec<IndexConfig>,
    /// Constraint definitions created on the new label / edge type.
    #[serde(default)]
    constraints: Vec<ConstraintConfig>,
}
27
/// Definition of a single property inside a `LabelConfig`.
#[derive(Deserialize)]
struct PropertyConfig {
    /// Type name (JSON key `"type"`), converted by `parse_data_type`.
    #[serde(rename = "type")]
    data_type: String,
    /// Whether the property may be null; `true` when the key is omitted.
    #[serde(default = "default_nullable")]
    nullable: bool,
}

/// Serde default for `PropertyConfig::nullable`: properties are nullable
/// unless the config says otherwise.
fn default_nullable() -> bool {
    true
}
39
/// Definition of an index, either nested in a `LabelConfig` or passed to
/// `create_index`.
#[derive(Deserialize)]
struct IndexConfig {
    /// Indexed property. Required by `create_index_internal`; `create_index`
    /// overwrites it with its explicit `property` argument.
    property: Option<String>,
    /// Index kind (JSON key `"type"`), matched case-insensitively against
    /// `VECTOR`, `SCALAR`/`BTREE` and `INVERTED` in `create_index_internal`.
    #[serde(rename = "type")]
    index_type: String,
    // Vector specific
    /// Declared vector dimensionality; currently not used when the index
    /// definition is built.
    dimensions: Option<usize>,
    /// Distance metric name (`cosine`, `l2`/`euclidean`, `dot`); cosine when omitted.
    metric: Option<String>,
    /// Optional embedding configuration attached to a vector index.
    embedding: Option<EmbeddingOptions>,
    // Generic
    /// Explicit index name; defaults to `<label>_<property>_<type>` when omitted.
    name: Option<String>,
}
52
/// Embedding configuration for vector indexes via procedure API.
///
/// Converted field-by-field into an `EmbeddingConfig` by `create_index_internal`.
#[derive(Deserialize)]
struct EmbeddingOptions {
    /// Copied into `EmbeddingConfig::alias`.
    alias: String,
    /// Source property names, copied into `EmbeddingConfig::source_properties`.
    source: Vec<String>,
    /// Copied into `EmbeddingConfig::batch_size`; 32 when the key is omitted.
    #[serde(default = "default_batch_size")]
    batch_size: usize,
}

/// Serde default for `EmbeddingOptions::batch_size`.
fn default_batch_size() -> usize {
    32
}
65
/// Definition of a constraint, nested in a `LabelConfig` or built by
/// `create_constraint`.
#[derive(Deserialize)]
struct ConstraintConfig {
    /// Constraint kind (JSON key `"type"`); `create_constraint_internal`
    /// accepts `UNIQUE` and `EXISTS`, case-insensitively.
    #[serde(rename = "type")]
    constraint_type: String,
    /// Properties covered by the constraint.
    properties: Vec<String>,
    /// Explicit constraint name; derived from target, type and properties
    /// when omitted.
    name: Option<String>,
}
73
74pub async fn create_label(
75    storage: &StorageManager,
76    name: &str,
77    config_val: &Value,
78) -> Result<bool> {
79    validate_identifier(name)?;
80
81    if storage.schema_manager().schema().labels.contains_key(name) {
82        return Err(UniError::LabelAlreadyExists {
83            label: name.to_string(),
84        }
85        .into());
86    }
87
88    let json_val: serde_json::Value = config_val.clone().into();
89    let config: LabelConfig =
90        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
91            arg: "config".to_string(),
92            message: e.to_string(),
93        })?;
94
95    // Create label
96    storage.schema_manager().add_label(name)?;
97
98    // Add properties
99    for (prop_name, prop_config) in config.properties {
100        validate_identifier(&prop_name)?;
101        let data_type = parse_data_type(&prop_config.data_type)?;
102        storage
103            .schema_manager()
104            .add_property(name, &prop_name, data_type, prop_config.nullable)?;
105    }
106
107    // Add indexes
108    for idx in config.indexes {
109        if idx.property.is_none() {
110            return Err(UniError::InvalidArgument {
111                arg: "indexes".into(),
112                message: "Property name required for index definition".into(),
113            }
114            .into());
115        }
116        create_index_internal(storage, name, &idx).await?;
117    }
118
119    // Add constraints
120    for c in config.constraints {
121        create_constraint_internal(storage, name, &c, true).await?;
122    }
123
124    storage.schema_manager().save().await?;
125    Ok(true)
126}
127
128pub async fn create_edge_type(
129    storage: &StorageManager,
130    name: &str,
131    src_labels: Vec<String>,
132    dst_labels: Vec<String>,
133    config_val: &Value,
134) -> Result<bool> {
135    validate_identifier(name)?;
136
137    let json_val: serde_json::Value = config_val.clone().into();
138    let config: LabelConfig =
139        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
140            arg: "config".to_string(),
141            message: e.to_string(),
142        })?;
143
144    storage
145        .schema_manager()
146        .add_edge_type(name, src_labels, dst_labels)?;
147
148    for (prop_name, prop_config) in config.properties {
149        validate_identifier(&prop_name)?;
150        let data_type = parse_data_type(&prop_config.data_type)?;
151        storage
152            .schema_manager()
153            .add_property(name, &prop_name, data_type, prop_config.nullable)?;
154    }
155
156    // Constraints
157    for c in config.constraints {
158        create_constraint_internal(storage, name, &c, false).await?;
159    }
160
161    storage.schema_manager().save().await?;
162    Ok(true)
163}
164
165pub async fn create_index(
166    storage: &StorageManager,
167    label: &str,
168    property: &str,
169    config_val: &Value,
170) -> Result<bool> {
171    let json_val: serde_json::Value = config_val.clone().into();
172    let mut config: IndexConfig =
173        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
174            arg: "config".to_string(),
175            message: e.to_string(),
176        })?;
177
178    // Override property from args
179    config.property = Some(property.to_string());
180
181    create_index_internal(storage, label, &config).await?;
182    storage.schema_manager().save().await?;
183    Ok(true)
184}
185
186pub async fn create_constraint(
187    storage: &StorageManager,
188    label: &str,
189    constraint_type: &str,
190    properties: Vec<String>,
191) -> Result<bool> {
192    let config = ConstraintConfig {
193        constraint_type: constraint_type.to_string(),
194        properties,
195        name: None,
196    };
197    // Assume label target
198    create_constraint_internal(storage, label, &config, true).await?;
199    storage.schema_manager().save().await?;
200    Ok(true)
201}
202
203pub async fn drop_label(storage: &StorageManager, name: &str) -> Result<bool> {
204    storage.schema_manager().drop_label(name, true)?;
205    storage.schema_manager().save().await?;
206    Ok(true)
207}
208
209pub async fn drop_edge_type(storage: &StorageManager, name: &str) -> Result<bool> {
210    storage.schema_manager().drop_edge_type(name, true)?;
211    storage.schema_manager().save().await?;
212    Ok(true)
213}
214
215pub async fn drop_index(storage: &StorageManager, name: &str) -> Result<bool> {
216    storage.schema_manager().remove_index(name)?;
217    storage.schema_manager().save().await?;
218    Ok(true)
219}
220
221pub async fn drop_constraint(storage: &StorageManager, name: &str) -> Result<bool> {
222    storage.schema_manager().drop_constraint(name, true)?;
223    storage.schema_manager().save().await?;
224    Ok(true)
225}
226
227// Internal helpers
228
229async fn create_index_internal(
230    storage: &StorageManager,
231    label: &str,
232    config: &IndexConfig,
233) -> Result<()> {
234    let prop_name = config
235        .property
236        .as_ref()
237        .ok_or_else(|| UniError::InvalidArgument {
238            arg: "property".into(),
239            message: "Property is missing".into(),
240        })?;
241
242    let index_name = config.name.clone().unwrap_or_else(|| {
243        format!(
244            "{}_{}_{}",
245            label,
246            prop_name,
247            config.index_type.to_lowercase()
248        )
249    });
250
251    let def = match config.index_type.to_uppercase().as_str() {
252        "VECTOR" => {
253            let _dimensions = config.dimensions;
254            let metric = match config.metric.as_deref().unwrap_or("cosine") {
255                "cosine" => DistanceMetric::Cosine,
256                "l2" | "euclidean" => DistanceMetric::L2,
257                "dot" => DistanceMetric::Dot,
258                _ => {
259                    return Err(UniError::InvalidArgument {
260                        arg: "metric".into(),
261                        message: "Invalid metric".into(),
262                    }
263                    .into());
264                }
265            };
266
267            // Parse embedding config from procedure options
268            let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
269                alias: emb.alias.clone(),
270                source_properties: emb.source.clone(),
271                batch_size: emb.batch_size,
272            });
273
274            IndexDefinition::Vector(VectorIndexConfig {
275                name: index_name,
276                label: label.to_string(),
277                property: prop_name.clone(),
278                index_type: VectorIndexType::Hnsw {
279                    m: 16,
280                    ef_construction: 200,
281                    ef_search: 50,
282                }, // Default params
283                metric,
284                embedding_config,
285            })
286        }
287        "SCALAR" | "BTREE" => IndexDefinition::Scalar(ScalarIndexConfig {
288            name: index_name,
289            label: label.to_string(),
290            properties: vec![prop_name.clone()],
291            index_type: ScalarIndexType::BTree,
292            where_clause: None,
293        }),
294        "INVERTED" => IndexDefinition::Inverted(uni_common::core::schema::InvertedIndexConfig {
295            name: index_name,
296            label: label.to_string(),
297            property: prop_name.clone(),
298            normalize: true,
299            max_terms_per_doc: 10_000,
300        }),
301        _ => {
302            return Err(UniError::InvalidArgument {
303                arg: "type".into(),
304                message: format!("Unsupported index type: {}", config.index_type),
305            }
306            .into());
307        }
308    };
309
310    storage.schema_manager().add_index(def.clone())?;
311
312    // Trigger build if data exists
313    let count = if let Ok(ds) = storage.vertex_dataset(label) {
314        if let Ok(raw) = ds.open_raw().await {
315            raw.count_rows(None).await.unwrap_or(0)
316        } else {
317            0
318        }
319    } else {
320        0
321    };
322
323    tracing::debug!("create_index_internal count for {}: {}", label, count);
324
325    if count > 0 {
326        let idx_mgr = IndexManager::new(
327            storage.base_path(),
328            storage.schema_manager_arc(),
329            storage.lancedb_store_arc(),
330        );
331        match def {
332            IndexDefinition::Vector(cfg) => {
333                idx_mgr.create_vector_index(cfg).await?;
334            }
335            IndexDefinition::Scalar(cfg) => {
336                idx_mgr.create_scalar_index(cfg).await?;
337            }
338            IndexDefinition::Inverted(cfg) => {
339                idx_mgr.create_inverted_index(cfg).await?;
340            }
341            _ => {}
342        }
343    }
344
345    Ok(())
346}
347
348async fn create_constraint_internal(
349    storage: &StorageManager,
350    target_name: &str,
351    config: &ConstraintConfig,
352    is_label: bool,
353) -> Result<()> {
354    let name = config.name.clone().unwrap_or_else(|| {
355        format!(
356            "{}_{}_{}",
357            target_name,
358            config.constraint_type.to_lowercase(),
359            config.properties.join("_")
360        )
361    });
362
363    let constraint_type = match config.constraint_type.to_uppercase().as_str() {
364        "UNIQUE" => ConstraintType::Unique {
365            properties: config.properties.clone(),
366        },
367        "EXISTS" => {
368            if config.properties.len() != 1 {
369                return Err(UniError::InvalidArgument {
370                    arg: "properties".into(),
371                    message: "EXISTS constraint requires exactly one property".into(),
372                }
373                .into());
374            }
375            ConstraintType::Exists {
376                property: config.properties[0].clone(),
377            }
378        }
379        _ => {
380            return Err(UniError::InvalidArgument {
381                arg: "type".into(),
382                message: format!("Unsupported constraint type: {}", config.constraint_type),
383            }
384            .into());
385        }
386    };
387
388    let target = if is_label {
389        ConstraintTarget::Label(target_name.to_string())
390    } else {
391        ConstraintTarget::EdgeType(target_name.to_string())
392    };
393
394    let constraint = Constraint {
395        name,
396        constraint_type,
397        target,
398        enabled: true,
399    };
400
401    storage.schema_manager().add_constraint(constraint)?;
402    Ok(())
403}
404
405fn parse_data_type(s: &str) -> Result<DataType> {
406    let s = s.trim();
407    if s.to_uppercase().starts_with("LIST<") && s.ends_with('>') {
408        let inner = &s[5..s.len() - 1];
409        let inner_type = parse_data_type(inner)?;
410        return Ok(DataType::List(Box::new(inner_type)));
411    }
412
413    match s.to_uppercase().as_str() {
414        "STRING" | "UTF8" => Ok(DataType::String),
415        "INT" | "INTEGER" | "INT64" => Ok(DataType::Int64),
416        "INT32" => Ok(DataType::Int32),
417        "FLOAT" | "FLOAT64" | "DOUBLE" => Ok(DataType::Float64),
418        "FLOAT32" => Ok(DataType::Float32),
419        "BOOL" | "BOOLEAN" => Ok(DataType::Bool),
420        "DATETIME" => Ok(DataType::DateTime),
421        "DATE" => Ok(DataType::Date),
422        "VECTOR" => Ok(DataType::Vector { dimensions: 0 }),
423        _ => Err(UniError::InvalidArgument {
424            arg: "type".into(),
425            message: format!("Unknown data type: {}", s),
426        }
427        .into()),
428    }
429}