Skip to main content

uni_query/query/executor/
ddl_procedures.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::Result;
5use serde::Deserialize;
6use std::collections::HashMap;
7use uni_common::Value;
8use uni_common::{
9    UniError,
10    core::schema::{
11        Constraint, ConstraintTarget, ConstraintType, DataType, DistanceMetric, EmbeddingConfig,
12        IndexDefinition, ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
13        validate_identifier,
14    },
15};
16use uni_store::storage::StorageManager;
17
18#[derive(Deserialize)]
19struct LabelConfig {
20    #[serde(default)]
21    properties: HashMap<String, PropertyConfig>,
22    #[serde(default)]
23    indexes: Vec<IndexConfig>,
24    #[serde(default)]
25    constraints: Vec<ConstraintConfig>,
26    #[serde(default)]
27    pub description: Option<String>,
28}
29
30#[derive(Deserialize)]
31struct PropertyConfig {
32    #[serde(rename = "type")]
33    data_type: String,
34    #[serde(default = "default_nullable")]
35    nullable: bool,
36    #[serde(default)]
37    pub description: Option<String>,
38}
39
/// Serde default for `PropertyConfig::nullable`: properties are nullable
/// unless the config explicitly says otherwise.
const fn default_nullable() -> bool {
    true
}
43
44#[derive(Deserialize)]
45struct IndexConfig {
46    property: Option<String>,
47    #[serde(rename = "type")]
48    index_type: String,
49    // Vector specific
50    #[expect(dead_code)]
51    dimensions: Option<usize>,
52    metric: Option<String>,
53    algorithm: Option<String>,
54    partitions: Option<u32>,
55    m: Option<u32>,
56    ef_construction: Option<u32>,
57    sub_vectors: Option<u32>,
58    num_bits: Option<u8>,
59    embedding: Option<EmbeddingOptions>,
60    // Generic
61    name: Option<String>,
62}
63
64/// Embedding configuration for vector indexes via procedure API.
65#[derive(Deserialize)]
66struct EmbeddingOptions {
67    alias: String,
68    source: Vec<String>,
69    #[serde(default = "default_batch_size")]
70    batch_size: usize,
71    #[serde(default)]
72    document_prefix: Option<String>,
73    #[serde(default)]
74    query_prefix: Option<String>,
75}
76
/// Serde default for `EmbeddingOptions::batch_size` when the key is omitted.
const fn default_batch_size() -> usize {
    32_usize
}
80
81#[derive(Deserialize)]
82struct ConstraintConfig {
83    #[serde(rename = "type")]
84    constraint_type: String,
85    properties: Vec<String>,
86    name: Option<String>,
87}
88
89pub async fn create_label(
90    storage: &StorageManager,
91    name: &str,
92    config_val: &Value,
93) -> Result<bool> {
94    validate_identifier(name)?;
95
96    if storage.schema_manager().schema().labels.contains_key(name) {
97        return Err(UniError::LabelAlreadyExists {
98            label: name.to_string(),
99        }
100        .into());
101    }
102
103    let json_val: serde_json::Value = config_val.clone().into();
104    let config: LabelConfig =
105        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
106            arg: "config".to_string(),
107            message: e.to_string(),
108        })?;
109
110    // Create label
111    storage
112        .schema_manager()
113        .add_label_with_desc(name, config.description)?;
114
115    // Add properties
116    for (prop_name, prop_config) in config.properties {
117        validate_identifier(&prop_name)?;
118        let data_type = parse_data_type(&prop_config.data_type)?;
119        storage.schema_manager().add_property_with_desc(
120            name,
121            &prop_name,
122            data_type,
123            prop_config.nullable,
124            prop_config.description,
125        )?;
126    }
127
128    // Add indexes
129    for idx in config.indexes {
130        if idx.property.is_none() {
131            return Err(UniError::InvalidArgument {
132                arg: "indexes".into(),
133                message: "Property name required for index definition".into(),
134            }
135            .into());
136        }
137        create_index_internal(storage, name, &idx).await?;
138    }
139
140    // Add constraints
141    for c in config.constraints {
142        create_constraint_internal(storage, name, &c, true).await?;
143    }
144
145    storage.schema_manager().save().await?;
146    Ok(true)
147}
148
149pub async fn create_edge_type(
150    storage: &StorageManager,
151    name: &str,
152    src_labels: Vec<String>,
153    dst_labels: Vec<String>,
154    config_val: &Value,
155) -> Result<bool> {
156    validate_identifier(name)?;
157
158    let json_val: serde_json::Value = config_val.clone().into();
159    let config: LabelConfig =
160        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
161            arg: "config".to_string(),
162            message: e.to_string(),
163        })?;
164
165    storage.schema_manager().add_edge_type_with_desc(
166        name,
167        src_labels,
168        dst_labels,
169        config.description,
170    )?;
171
172    for (prop_name, prop_config) in config.properties {
173        validate_identifier(&prop_name)?;
174        let data_type = parse_data_type(&prop_config.data_type)?;
175        storage.schema_manager().add_property_with_desc(
176            name,
177            &prop_name,
178            data_type,
179            prop_config.nullable,
180            prop_config.description,
181        )?;
182    }
183
184    // Constraints
185    for c in config.constraints {
186        create_constraint_internal(storage, name, &c, false).await?;
187    }
188
189    storage.schema_manager().save().await?;
190    Ok(true)
191}
192
193pub async fn create_index(
194    storage: &StorageManager,
195    label: &str,
196    property: &str,
197    config_val: &Value,
198) -> Result<bool> {
199    let json_val: serde_json::Value = config_val.clone().into();
200    let mut config: IndexConfig =
201        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
202            arg: "config".to_string(),
203            message: e.to_string(),
204        })?;
205
206    // Override property from args
207    config.property = Some(property.to_string());
208
209    create_index_internal(storage, label, &config).await?;
210    storage.schema_manager().save().await?;
211    Ok(true)
212}
213
214pub async fn create_constraint(
215    storage: &StorageManager,
216    label: &str,
217    constraint_type: &str,
218    properties: Vec<String>,
219) -> Result<bool> {
220    let config = ConstraintConfig {
221        constraint_type: constraint_type.to_string(),
222        properties,
223        name: None,
224    };
225    // Assume label target
226    create_constraint_internal(storage, label, &config, true).await?;
227    storage.schema_manager().save().await?;
228    Ok(true)
229}
230
231pub async fn drop_label(storage: &StorageManager, name: &str) -> Result<bool> {
232    storage.schema_manager().drop_label(name, true)?;
233    storage.schema_manager().save().await?;
234    Ok(true)
235}
236
237pub async fn drop_edge_type(storage: &StorageManager, name: &str) -> Result<bool> {
238    storage.schema_manager().drop_edge_type(name, true)?;
239    storage.schema_manager().save().await?;
240    Ok(true)
241}
242
243pub async fn drop_index(storage: &StorageManager, name: &str) -> Result<bool> {
244    storage.schema_manager().remove_index(name)?;
245    storage.schema_manager().save().await?;
246    Ok(true)
247}
248
249pub async fn drop_constraint(storage: &StorageManager, name: &str) -> Result<bool> {
250    storage.schema_manager().drop_constraint(name, true)?;
251    storage.schema_manager().save().await?;
252    Ok(true)
253}
254
255// Internal helpers
256
257async fn create_index_internal(
258    storage: &StorageManager,
259    label: &str,
260    config: &IndexConfig,
261) -> Result<()> {
262    let prop_name = config
263        .property
264        .as_ref()
265        .ok_or_else(|| UniError::InvalidArgument {
266            arg: "property".into(),
267            message: "Property is missing".into(),
268        })?;
269
270    let index_name = config.name.clone().unwrap_or_else(|| {
271        format!(
272            "{}_{}_{}",
273            label,
274            prop_name,
275            config.index_type.to_lowercase()
276        )
277    });
278
279    let def = match config.index_type.to_uppercase().as_str() {
280        "VECTOR" => {
281            let metric = match config.metric.as_deref().unwrap_or("cosine") {
282                "cosine" => DistanceMetric::Cosine,
283                "l2" | "euclidean" => DistanceMetric::L2,
284                "dot" => DistanceMetric::Dot,
285                _ => {
286                    return Err(UniError::InvalidArgument {
287                        arg: "metric".into(),
288                        message: "Invalid metric".into(),
289                    }
290                    .into());
291                }
292            };
293
294            // Parse embedding config from procedure options
295            let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
296                alias: emb.alias.clone(),
297                source_properties: emb.source.clone(),
298                batch_size: emb.batch_size,
299                document_prefix: emb.document_prefix.clone(),
300                query_prefix: emb.query_prefix.clone(),
301            });
302
303            let algorithm = config.algorithm.as_deref().unwrap_or("hnsw");
304            let index_type = match algorithm.to_lowercase().as_str() {
305                "flat" => VectorIndexType::Flat,
306                "ivf_flat" => VectorIndexType::IvfFlat {
307                    num_partitions: config.partitions.unwrap_or(256),
308                },
309                "ivf_pq" => VectorIndexType::IvfPq {
310                    num_partitions: config.partitions.unwrap_or(256),
311                    num_sub_vectors: config.sub_vectors.unwrap_or(16),
312                    bits_per_subvector: config.num_bits.unwrap_or(8),
313                },
314                "ivf_sq" => VectorIndexType::IvfSq {
315                    num_partitions: config.partitions.unwrap_or(256),
316                },
317                "ivf_rq" => VectorIndexType::IvfRq {
318                    num_partitions: config.partitions.unwrap_or(256),
319                    num_bits: config.num_bits,
320                },
321                "hnsw_flat" => VectorIndexType::HnswFlat {
322                    m: config.m.unwrap_or(16),
323                    ef_construction: config.ef_construction.unwrap_or(200),
324                    num_partitions: config.partitions,
325                },
326                "hnsw_pq" => VectorIndexType::HnswPq {
327                    m: config.m.unwrap_or(16),
328                    ef_construction: config.ef_construction.unwrap_or(200),
329                    num_sub_vectors: config.sub_vectors.unwrap_or(16),
330                    num_partitions: config.partitions,
331                },
332                _ => VectorIndexType::HnswSq {
333                    m: config.m.unwrap_or(16),
334                    ef_construction: config.ef_construction.unwrap_or(200),
335                    num_partitions: config.partitions,
336                },
337            };
338
339            IndexDefinition::Vector(VectorIndexConfig {
340                name: index_name,
341                label: label.to_string(),
342                property: prop_name.clone(),
343                index_type,
344                metric,
345                embedding_config,
346                metadata: Default::default(),
347            })
348        }
349        "SCALAR" | "BTREE" => IndexDefinition::Scalar(ScalarIndexConfig {
350            name: index_name,
351            label: label.to_string(),
352            properties: vec![prop_name.clone()],
353            index_type: ScalarIndexType::BTree,
354            where_clause: None,
355            metadata: Default::default(),
356        }),
357        "BITMAP" => IndexDefinition::Scalar(ScalarIndexConfig {
358            name: index_name,
359            label: label.to_string(),
360            properties: vec![prop_name.clone()],
361            index_type: ScalarIndexType::Bitmap,
362            where_clause: None,
363            metadata: Default::default(),
364        }),
365        "LABEL_LIST" | "LABELLIST" => IndexDefinition::Scalar(ScalarIndexConfig {
366            name: index_name,
367            label: label.to_string(),
368            properties: vec![prop_name.clone()],
369            index_type: ScalarIndexType::LabelList,
370            where_clause: None,
371            metadata: Default::default(),
372        }),
373        "INVERTED" => IndexDefinition::Inverted(uni_common::core::schema::InvertedIndexConfig {
374            name: index_name,
375            label: label.to_string(),
376            property: prop_name.clone(),
377            normalize: true,
378            max_terms_per_doc: 10_000,
379            metadata: Default::default(),
380        }),
381        _ => {
382            return Err(UniError::InvalidArgument {
383                arg: "type".into(),
384                message: format!("Unsupported index type: {}", config.index_type),
385            }
386            .into());
387        }
388    };
389
390    storage.schema_manager().add_index(def.clone())?;
391
392    // Trigger build if data exists
393    let count = if let Ok(ds) = storage.vertex_dataset(label) {
394        if let Ok(raw) = ds.open_raw().await {
395            raw.count_rows(None).await.unwrap_or(0)
396        } else {
397            0
398        }
399    } else {
400        0
401    };
402
403    tracing::debug!("create_index_internal count for {}: {}", label, count);
404
405    if count > 0 {
406        let idx_mgr = storage.index_manager();
407        match def {
408            IndexDefinition::Vector(cfg) => {
409                idx_mgr.create_vector_index(cfg).await?;
410            }
411            IndexDefinition::Scalar(cfg) => {
412                idx_mgr.create_scalar_index(cfg).await?;
413            }
414            IndexDefinition::Inverted(cfg) => {
415                idx_mgr.create_inverted_index(cfg).await?;
416            }
417            IndexDefinition::FullText(cfg) => {
418                idx_mgr.create_fts_index(cfg).await?;
419            }
420            IndexDefinition::JsonFullText(cfg) => {
421                idx_mgr.create_json_fts_index(cfg).await?;
422            }
423            _ => {}
424        }
425    }
426
427    Ok(())
428}
429
430async fn create_constraint_internal(
431    storage: &StorageManager,
432    target_name: &str,
433    config: &ConstraintConfig,
434    is_label: bool,
435) -> Result<()> {
436    let name = config.name.clone().unwrap_or_else(|| {
437        format!(
438            "{}_{}_{}",
439            target_name,
440            config.constraint_type.to_lowercase(),
441            config.properties.join("_")
442        )
443    });
444
445    let constraint_type = match config.constraint_type.to_uppercase().as_str() {
446        "UNIQUE" => ConstraintType::Unique {
447            properties: config.properties.clone(),
448        },
449        "EXISTS" => {
450            if config.properties.len() != 1 {
451                return Err(UniError::InvalidArgument {
452                    arg: "properties".into(),
453                    message: "EXISTS constraint requires exactly one property".into(),
454                }
455                .into());
456            }
457            ConstraintType::Exists {
458                property: config.properties[0].clone(),
459            }
460        }
461        _ => {
462            return Err(UniError::InvalidArgument {
463                arg: "type".into(),
464                message: format!("Unsupported constraint type: {}", config.constraint_type),
465            }
466            .into());
467        }
468    };
469
470    let target = if is_label {
471        ConstraintTarget::Label(target_name.to_string())
472    } else {
473        ConstraintTarget::EdgeType(target_name.to_string())
474    };
475
476    let constraint = Constraint {
477        name,
478        constraint_type,
479        target,
480        enabled: true,
481    };
482
483    storage.schema_manager().add_constraint(constraint)?;
484    Ok(())
485}
486
487fn parse_data_type(s: &str) -> Result<DataType> {
488    let s = s.trim();
489    if s.to_uppercase().starts_with("LIST<") && s.ends_with('>') {
490        let inner = &s[5..s.len() - 1];
491        let inner_type = parse_data_type(inner)?;
492        return Ok(DataType::List(Box::new(inner_type)));
493    }
494
495    match s.to_uppercase().as_str() {
496        "STRING" | "UTF8" => Ok(DataType::String),
497        "INT" | "INTEGER" | "INT64" => Ok(DataType::Int64),
498        "INT32" => Ok(DataType::Int32),
499        "FLOAT" | "FLOAT64" | "DOUBLE" => Ok(DataType::Float64),
500        "FLOAT32" => Ok(DataType::Float32),
501        "BOOL" | "BOOLEAN" => Ok(DataType::Bool),
502        "DATETIME" => Ok(DataType::DateTime),
503        "DATE" => Ok(DataType::Date),
504        "BTIC" => Ok(DataType::Btic),
505        "VECTOR" => Ok(DataType::Vector { dimensions: 0 }),
506        _ => Err(UniError::InvalidArgument {
507            arg: "type".into(),
508            message: format!("Unknown data type: {}", s),
509        }
510        .into()),
511    }
512}