Skip to main content

uni_query/query/executor/
ddl_procedures.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::Result;
5use serde::Deserialize;
6use std::collections::HashMap;
7use uni_common::Value;
8use uni_common::{
9    UniError,
10    core::schema::{
11        Constraint, ConstraintTarget, ConstraintType, DataType, DistanceMetric, EmbeddingConfig,
12        IndexDefinition, ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
13        validate_identifier,
14    },
15};
16use uni_store::storage::StorageManager;
17
18#[derive(Deserialize)]
19struct LabelConfig {
20    #[serde(default)]
21    properties: HashMap<String, PropertyConfig>,
22    #[serde(default)]
23    indexes: Vec<IndexConfig>,
24    #[serde(default)]
25    constraints: Vec<ConstraintConfig>,
26}
27
28#[derive(Deserialize)]
29struct PropertyConfig {
30    #[serde(rename = "type")]
31    data_type: String,
32    #[serde(default = "default_nullable")]
33    nullable: bool,
34}
35
/// Serde default for `PropertyConfig::nullable`: a property is nullable
/// unless the config says otherwise.
fn default_nullable() -> bool {
    const NULLABLE_WHEN_UNSPECIFIED: bool = true;
    NULLABLE_WHEN_UNSPECIFIED
}
39
40#[derive(Deserialize)]
41struct IndexConfig {
42    property: Option<String>,
43    #[serde(rename = "type")]
44    index_type: String,
45    // Vector specific
46    #[expect(dead_code)]
47    dimensions: Option<usize>,
48    metric: Option<String>,
49    algorithm: Option<String>,
50    partitions: Option<u32>,
51    m: Option<u32>,
52    ef_construction: Option<u32>,
53    sub_vectors: Option<u32>,
54    num_bits: Option<u8>,
55    embedding: Option<EmbeddingOptions>,
56    // Generic
57    name: Option<String>,
58}
59
60/// Embedding configuration for vector indexes via procedure API.
61#[derive(Deserialize)]
62struct EmbeddingOptions {
63    alias: String,
64    source: Vec<String>,
65    #[serde(default = "default_batch_size")]
66    batch_size: usize,
67}
68
/// Serde default for `EmbeddingOptions::batch_size`.
fn default_batch_size() -> usize {
    const DEFAULT_EMBEDDING_BATCH_SIZE: usize = 32;
    DEFAULT_EMBEDDING_BATCH_SIZE
}
72
73#[derive(Deserialize)]
74struct ConstraintConfig {
75    #[serde(rename = "type")]
76    constraint_type: String,
77    properties: Vec<String>,
78    name: Option<String>,
79}
80
81pub async fn create_label(
82    storage: &StorageManager,
83    name: &str,
84    config_val: &Value,
85) -> Result<bool> {
86    validate_identifier(name)?;
87
88    if storage.schema_manager().schema().labels.contains_key(name) {
89        return Err(UniError::LabelAlreadyExists {
90            label: name.to_string(),
91        }
92        .into());
93    }
94
95    let json_val: serde_json::Value = config_val.clone().into();
96    let config: LabelConfig =
97        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
98            arg: "config".to_string(),
99            message: e.to_string(),
100        })?;
101
102    // Create label
103    storage.schema_manager().add_label(name)?;
104
105    // Add properties
106    for (prop_name, prop_config) in config.properties {
107        validate_identifier(&prop_name)?;
108        let data_type = parse_data_type(&prop_config.data_type)?;
109        storage
110            .schema_manager()
111            .add_property(name, &prop_name, data_type, prop_config.nullable)?;
112    }
113
114    // Add indexes
115    for idx in config.indexes {
116        if idx.property.is_none() {
117            return Err(UniError::InvalidArgument {
118                arg: "indexes".into(),
119                message: "Property name required for index definition".into(),
120            }
121            .into());
122        }
123        create_index_internal(storage, name, &idx).await?;
124    }
125
126    // Add constraints
127    for c in config.constraints {
128        create_constraint_internal(storage, name, &c, true).await?;
129    }
130
131    storage.schema_manager().save().await?;
132    Ok(true)
133}
134
135pub async fn create_edge_type(
136    storage: &StorageManager,
137    name: &str,
138    src_labels: Vec<String>,
139    dst_labels: Vec<String>,
140    config_val: &Value,
141) -> Result<bool> {
142    validate_identifier(name)?;
143
144    let json_val: serde_json::Value = config_val.clone().into();
145    let config: LabelConfig =
146        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
147            arg: "config".to_string(),
148            message: e.to_string(),
149        })?;
150
151    storage
152        .schema_manager()
153        .add_edge_type(name, src_labels, dst_labels)?;
154
155    for (prop_name, prop_config) in config.properties {
156        validate_identifier(&prop_name)?;
157        let data_type = parse_data_type(&prop_config.data_type)?;
158        storage
159            .schema_manager()
160            .add_property(name, &prop_name, data_type, prop_config.nullable)?;
161    }
162
163    // Constraints
164    for c in config.constraints {
165        create_constraint_internal(storage, name, &c, false).await?;
166    }
167
168    storage.schema_manager().save().await?;
169    Ok(true)
170}
171
172pub async fn create_index(
173    storage: &StorageManager,
174    label: &str,
175    property: &str,
176    config_val: &Value,
177) -> Result<bool> {
178    let json_val: serde_json::Value = config_val.clone().into();
179    let mut config: IndexConfig =
180        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
181            arg: "config".to_string(),
182            message: e.to_string(),
183        })?;
184
185    // Override property from args
186    config.property = Some(property.to_string());
187
188    create_index_internal(storage, label, &config).await?;
189    storage.schema_manager().save().await?;
190    Ok(true)
191}
192
193pub async fn create_constraint(
194    storage: &StorageManager,
195    label: &str,
196    constraint_type: &str,
197    properties: Vec<String>,
198) -> Result<bool> {
199    let config = ConstraintConfig {
200        constraint_type: constraint_type.to_string(),
201        properties,
202        name: None,
203    };
204    // Assume label target
205    create_constraint_internal(storage, label, &config, true).await?;
206    storage.schema_manager().save().await?;
207    Ok(true)
208}
209
210pub async fn drop_label(storage: &StorageManager, name: &str) -> Result<bool> {
211    storage.schema_manager().drop_label(name, true)?;
212    storage.schema_manager().save().await?;
213    Ok(true)
214}
215
216pub async fn drop_edge_type(storage: &StorageManager, name: &str) -> Result<bool> {
217    storage.schema_manager().drop_edge_type(name, true)?;
218    storage.schema_manager().save().await?;
219    Ok(true)
220}
221
222pub async fn drop_index(storage: &StorageManager, name: &str) -> Result<bool> {
223    storage.schema_manager().remove_index(name)?;
224    storage.schema_manager().save().await?;
225    Ok(true)
226}
227
228pub async fn drop_constraint(storage: &StorageManager, name: &str) -> Result<bool> {
229    storage.schema_manager().drop_constraint(name, true)?;
230    storage.schema_manager().save().await?;
231    Ok(true)
232}
233
234// Internal helpers
235
236async fn create_index_internal(
237    storage: &StorageManager,
238    label: &str,
239    config: &IndexConfig,
240) -> Result<()> {
241    let prop_name = config
242        .property
243        .as_ref()
244        .ok_or_else(|| UniError::InvalidArgument {
245            arg: "property".into(),
246            message: "Property is missing".into(),
247        })?;
248
249    let index_name = config.name.clone().unwrap_or_else(|| {
250        format!(
251            "{}_{}_{}",
252            label,
253            prop_name,
254            config.index_type.to_lowercase()
255        )
256    });
257
258    let def = match config.index_type.to_uppercase().as_str() {
259        "VECTOR" => {
260            let metric = match config.metric.as_deref().unwrap_or("cosine") {
261                "cosine" => DistanceMetric::Cosine,
262                "l2" | "euclidean" => DistanceMetric::L2,
263                "dot" => DistanceMetric::Dot,
264                _ => {
265                    return Err(UniError::InvalidArgument {
266                        arg: "metric".into(),
267                        message: "Invalid metric".into(),
268                    }
269                    .into());
270                }
271            };
272
273            // Parse embedding config from procedure options
274            let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
275                alias: emb.alias.clone(),
276                source_properties: emb.source.clone(),
277                batch_size: emb.batch_size,
278            });
279
280            let algorithm = config.algorithm.as_deref().unwrap_or("hnsw");
281            let index_type = match algorithm.to_lowercase().as_str() {
282                "flat" => VectorIndexType::Flat,
283                "ivf_flat" => VectorIndexType::IvfFlat {
284                    num_partitions: config.partitions.unwrap_or(256),
285                },
286                "ivf_pq" => VectorIndexType::IvfPq {
287                    num_partitions: config.partitions.unwrap_or(256),
288                    num_sub_vectors: config.sub_vectors.unwrap_or(16),
289                    bits_per_subvector: config.num_bits.unwrap_or(8),
290                },
291                "ivf_sq" => VectorIndexType::IvfSq {
292                    num_partitions: config.partitions.unwrap_or(256),
293                },
294                "ivf_rq" => VectorIndexType::IvfRq {
295                    num_partitions: config.partitions.unwrap_or(256),
296                    num_bits: config.num_bits,
297                },
298                "hnsw_flat" => VectorIndexType::HnswFlat {
299                    m: config.m.unwrap_or(16),
300                    ef_construction: config.ef_construction.unwrap_or(200),
301                    num_partitions: config.partitions,
302                },
303                "hnsw_pq" => VectorIndexType::HnswPq {
304                    m: config.m.unwrap_or(16),
305                    ef_construction: config.ef_construction.unwrap_or(200),
306                    num_sub_vectors: config.sub_vectors.unwrap_or(16),
307                    num_partitions: config.partitions,
308                },
309                _ => VectorIndexType::HnswSq {
310                    m: config.m.unwrap_or(16),
311                    ef_construction: config.ef_construction.unwrap_or(200),
312                    num_partitions: config.partitions,
313                },
314            };
315
316            IndexDefinition::Vector(VectorIndexConfig {
317                name: index_name,
318                label: label.to_string(),
319                property: prop_name.clone(),
320                index_type,
321                metric,
322                embedding_config,
323                metadata: Default::default(),
324            })
325        }
326        "SCALAR" | "BTREE" => IndexDefinition::Scalar(ScalarIndexConfig {
327            name: index_name,
328            label: label.to_string(),
329            properties: vec![prop_name.clone()],
330            index_type: ScalarIndexType::BTree,
331            where_clause: None,
332            metadata: Default::default(),
333        }),
334        "BITMAP" => IndexDefinition::Scalar(ScalarIndexConfig {
335            name: index_name,
336            label: label.to_string(),
337            properties: vec![prop_name.clone()],
338            index_type: ScalarIndexType::Bitmap,
339            where_clause: None,
340            metadata: Default::default(),
341        }),
342        "LABEL_LIST" | "LABELLIST" => IndexDefinition::Scalar(ScalarIndexConfig {
343            name: index_name,
344            label: label.to_string(),
345            properties: vec![prop_name.clone()],
346            index_type: ScalarIndexType::LabelList,
347            where_clause: None,
348            metadata: Default::default(),
349        }),
350        "INVERTED" => IndexDefinition::Inverted(uni_common::core::schema::InvertedIndexConfig {
351            name: index_name,
352            label: label.to_string(),
353            property: prop_name.clone(),
354            normalize: true,
355            max_terms_per_doc: 10_000,
356            metadata: Default::default(),
357        }),
358        _ => {
359            return Err(UniError::InvalidArgument {
360                arg: "type".into(),
361                message: format!("Unsupported index type: {}", config.index_type),
362            }
363            .into());
364        }
365    };
366
367    storage.schema_manager().add_index(def.clone())?;
368
369    // Trigger build if data exists
370    let count = if let Ok(ds) = storage.vertex_dataset(label) {
371        if let Ok(raw) = ds.open_raw().await {
372            raw.count_rows(None).await.unwrap_or(0)
373        } else {
374            0
375        }
376    } else {
377        0
378    };
379
380    tracing::debug!("create_index_internal count for {}: {}", label, count);
381
382    if count > 0 {
383        let idx_mgr = storage.index_manager();
384        match def {
385            IndexDefinition::Vector(cfg) => {
386                idx_mgr.create_vector_index(cfg).await?;
387            }
388            IndexDefinition::Scalar(cfg) => {
389                idx_mgr.create_scalar_index(cfg).await?;
390            }
391            IndexDefinition::Inverted(cfg) => {
392                idx_mgr.create_inverted_index(cfg).await?;
393            }
394            IndexDefinition::FullText(cfg) => {
395                idx_mgr.create_fts_index(cfg).await?;
396            }
397            IndexDefinition::JsonFullText(cfg) => {
398                idx_mgr.create_json_fts_index(cfg).await?;
399            }
400            _ => {}
401        }
402    }
403
404    Ok(())
405}
406
407async fn create_constraint_internal(
408    storage: &StorageManager,
409    target_name: &str,
410    config: &ConstraintConfig,
411    is_label: bool,
412) -> Result<()> {
413    let name = config.name.clone().unwrap_or_else(|| {
414        format!(
415            "{}_{}_{}",
416            target_name,
417            config.constraint_type.to_lowercase(),
418            config.properties.join("_")
419        )
420    });
421
422    let constraint_type = match config.constraint_type.to_uppercase().as_str() {
423        "UNIQUE" => ConstraintType::Unique {
424            properties: config.properties.clone(),
425        },
426        "EXISTS" => {
427            if config.properties.len() != 1 {
428                return Err(UniError::InvalidArgument {
429                    arg: "properties".into(),
430                    message: "EXISTS constraint requires exactly one property".into(),
431                }
432                .into());
433            }
434            ConstraintType::Exists {
435                property: config.properties[0].clone(),
436            }
437        }
438        _ => {
439            return Err(UniError::InvalidArgument {
440                arg: "type".into(),
441                message: format!("Unsupported constraint type: {}", config.constraint_type),
442            }
443            .into());
444        }
445    };
446
447    let target = if is_label {
448        ConstraintTarget::Label(target_name.to_string())
449    } else {
450        ConstraintTarget::EdgeType(target_name.to_string())
451    };
452
453    let constraint = Constraint {
454        name,
455        constraint_type,
456        target,
457        enabled: true,
458    };
459
460    storage.schema_manager().add_constraint(constraint)?;
461    Ok(())
462}
463
464fn parse_data_type(s: &str) -> Result<DataType> {
465    let s = s.trim();
466    if s.to_uppercase().starts_with("LIST<") && s.ends_with('>') {
467        let inner = &s[5..s.len() - 1];
468        let inner_type = parse_data_type(inner)?;
469        return Ok(DataType::List(Box::new(inner_type)));
470    }
471
472    match s.to_uppercase().as_str() {
473        "STRING" | "UTF8" => Ok(DataType::String),
474        "INT" | "INTEGER" | "INT64" => Ok(DataType::Int64),
475        "INT32" => Ok(DataType::Int32),
476        "FLOAT" | "FLOAT64" | "DOUBLE" => Ok(DataType::Float64),
477        "FLOAT32" => Ok(DataType::Float32),
478        "BOOL" | "BOOLEAN" => Ok(DataType::Bool),
479        "DATETIME" => Ok(DataType::DateTime),
480        "DATE" => Ok(DataType::Date),
481        "BTIC" => Ok(DataType::Btic),
482        "VECTOR" => Ok(DataType::Vector { dimensions: 0 }),
483        _ => Err(UniError::InvalidArgument {
484            arg: "type".into(),
485            message: format!("Unknown data type: {}", s),
486        }
487        .into()),
488    }
489}