Skip to main content

uni_query/query/executor/
ddl_procedures.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::Result;
5use serde::Deserialize;
6use std::collections::HashMap;
7use uni_common::Value;
8use uni_common::{
9    UniError,
10    core::schema::{
11        Constraint, ConstraintTarget, ConstraintType, DataType, EmbeddingConfig, IndexDefinition,
12        ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, validate_identifier,
13    },
14};
15use uni_store::storage::StorageManager;
16
/// Deserialized shape of the `config` argument accepted by [`create_label`] and
/// [`create_edge_type`].
///
/// Every field carries `#[serde(default)]`, so each key may be omitted from the
/// incoming configuration.
#[derive(Deserialize)]
struct LabelConfig {
    /// Property definitions keyed by property name.
    #[serde(default)]
    properties: HashMap<String, PropertyConfig>,
    /// Index definitions. Applied by [`create_label`]; [`create_edge_type`] does not
    /// read this field.
    #[serde(default)]
    indexes: Vec<IndexConfig>,
    /// Constraint definitions to register against the label / edge type.
    #[serde(default)]
    constraints: Vec<ConstraintConfig>,
    /// Optional free-form description forwarded to the schema manager.
    #[serde(default)]
    pub description: Option<String>,
}
28
/// Configuration of a single property inside a [`LabelConfig`].
#[derive(Deserialize)]
struct PropertyConfig {
    /// Textual data type, read from the `type` key and interpreted by
    /// `parse_data_type` (e.g. `STRING`, `INT64`, `LIST<...>`, `MAP<...>`).
    #[serde(rename = "type")]
    data_type: String,
    /// Whether the property may be null; defaults to `true` when omitted.
    #[serde(default = "default_nullable")]
    nullable: bool,
    /// Optional free-form description forwarded to the schema manager.
    #[serde(default)]
    pub description: Option<String>,
}
38
/// Serde default for `PropertyConfig::nullable`: properties are nullable unless the
/// configuration says otherwise.
fn default_nullable() -> bool {
    const NULLABLE_WHEN_UNSPECIFIED: bool = true;
    NULLABLE_WHEN_UNSPECIFIED
}
42
/// Deserialized index definition, used both for entries of `LabelConfig::indexes` and
/// for the `config` argument of [`create_index`].
#[derive(Deserialize)]
struct IndexConfig {
    /// Indexed property. [`create_index`] overwrites this with its `property`
    /// argument; [`create_label`] rejects entries where it is absent.
    property: Option<String>,
    /// Index kind, read from the `type` key. `create_index_internal` matches it
    /// case-insensitively against `VECTOR`, `SCALAR`/`BTREE`, `BITMAP`,
    /// `LABEL_LIST`/`LABELLIST` and `INVERTED`.
    #[serde(rename = "type")]
    index_type: String,
    // Vector specific
    /// Accepted in the configuration but not read anywhere in this file, hence the
    /// `expect(dead_code)`.
    #[expect(dead_code)]
    dimensions: Option<usize>,
    /// Distance metric name, handed to `parse_vector_metric`.
    metric: Option<String>,
    /// Vector index algorithm name, passed as `type_name` to `build_vector_index_type`.
    algorithm: Option<String>,
    // The tuning knobs below are forwarded unchanged to `build_vector_index_type`.
    partitions: Option<u32>,
    m: Option<u32>,
    ef_construction: Option<u32>,
    sub_vectors: Option<u32>,
    num_bits: Option<u8>,
    // MUVERA-specific (algorithm == "muvera"): FDE encoder params + inner ANN type.
    k_sim: Option<u32>,
    reps: Option<u32>,
    d_proj: Option<u32>,
    seed: Option<u64>,
    inner: Option<String>,
    /// Optional embedding configuration attached to a vector index.
    embedding: Option<EmbeddingOptions>,
    // Generic
    /// Explicit index name. When absent, `create_index_internal` derives
    /// `<label>_<property>_<lowercased type>`.
    name: Option<String>,
}
68
/// Embedding configuration for vector indexes via procedure API.
///
/// Converted field-by-field into an `EmbeddingConfig` by `create_index_internal`.
#[derive(Deserialize)]
struct EmbeddingOptions {
    /// Copied into `EmbeddingConfig::alias`.
    alias: String,
    /// Copied into `EmbeddingConfig::source_properties`.
    source: Vec<String>,
    /// Batch size; defaults to the value returned by `default_batch_size` when omitted.
    #[serde(default = "default_batch_size")]
    batch_size: usize,
    /// Optional prefix copied into `EmbeddingConfig::document_prefix`.
    #[serde(default)]
    document_prefix: Option<String>,
    /// Optional prefix copied into `EmbeddingConfig::query_prefix`.
    #[serde(default)]
    query_prefix: Option<String>,
}
81
/// Serde default for `EmbeddingOptions::batch_size`, used when the configuration does
/// not specify one.
fn default_batch_size() -> usize {
    const BATCH_SIZE_WHEN_UNSPECIFIED: usize = 32;
    BATCH_SIZE_WHEN_UNSPECIFIED
}
85
/// Deserialized constraint definition, used for entries of `LabelConfig::constraints`
/// and built directly by [`create_constraint`].
#[derive(Deserialize)]
struct ConstraintConfig {
    /// Constraint kind, read from the `type` key. `create_constraint_internal` matches
    /// it case-insensitively against `UNIQUE` and `EXISTS`.
    #[serde(rename = "type")]
    constraint_type: String,
    /// Properties the constraint covers (`EXISTS` requires exactly one).
    properties: Vec<String>,
    /// Explicit constraint name. When absent, `create_constraint_internal` derives
    /// `<target>_<lowercased type>_<properties joined by '_'>`.
    name: Option<String>,
}
93
94pub async fn create_label(
95    storage: &StorageManager,
96    name: &str,
97    config_val: &Value,
98) -> Result<bool> {
99    validate_identifier(name)?;
100
101    if storage.schema_manager().schema().labels.contains_key(name) {
102        return Err(UniError::LabelAlreadyExists {
103            label: name.to_string(),
104        }
105        .into());
106    }
107
108    let json_val: serde_json::Value = config_val.clone().into();
109    let config: LabelConfig =
110        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
111            arg: "config".to_string(),
112            message: e.to_string(),
113        })?;
114
115    // Create label
116    storage
117        .schema_manager()
118        .add_label_with_desc(name, config.description)?;
119
120    // Add properties
121    for (prop_name, prop_config) in config.properties {
122        validate_identifier(&prop_name)?;
123        let data_type = parse_data_type(&prop_config.data_type)?;
124        storage.schema_manager().add_property_with_desc(
125            name,
126            &prop_name,
127            data_type,
128            prop_config.nullable,
129            prop_config.description,
130        )?;
131    }
132
133    // Add indexes
134    for idx in config.indexes {
135        if idx.property.is_none() {
136            return Err(UniError::InvalidArgument {
137                arg: "indexes".into(),
138                message: "Property name required for index definition".into(),
139            }
140            .into());
141        }
142        create_index_internal(storage, name, &idx).await?;
143    }
144
145    // Add constraints
146    for c in config.constraints {
147        create_constraint_internal(storage, name, &c, true).await?;
148    }
149
150    storage.schema_manager().save().await?;
151    Ok(true)
152}
153
154pub async fn create_edge_type(
155    storage: &StorageManager,
156    name: &str,
157    src_labels: Vec<String>,
158    dst_labels: Vec<String>,
159    config_val: &Value,
160) -> Result<bool> {
161    validate_identifier(name)?;
162
163    let json_val: serde_json::Value = config_val.clone().into();
164    let config: LabelConfig =
165        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
166            arg: "config".to_string(),
167            message: e.to_string(),
168        })?;
169
170    storage.schema_manager().add_edge_type_with_desc(
171        name,
172        src_labels,
173        dst_labels,
174        config.description,
175    )?;
176
177    for (prop_name, prop_config) in config.properties {
178        validate_identifier(&prop_name)?;
179        let data_type = parse_data_type(&prop_config.data_type)?;
180        storage.schema_manager().add_property_with_desc(
181            name,
182            &prop_name,
183            data_type,
184            prop_config.nullable,
185            prop_config.description,
186        )?;
187    }
188
189    // Constraints
190    for c in config.constraints {
191        create_constraint_internal(storage, name, &c, false).await?;
192    }
193
194    storage.schema_manager().save().await?;
195    Ok(true)
196}
197
198pub async fn create_index(
199    storage: &StorageManager,
200    label: &str,
201    property: &str,
202    config_val: &Value,
203) -> Result<bool> {
204    let json_val: serde_json::Value = config_val.clone().into();
205    let mut config: IndexConfig =
206        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
207            arg: "config".to_string(),
208            message: e.to_string(),
209        })?;
210
211    // Override property from args
212    config.property = Some(property.to_string());
213
214    create_index_internal(storage, label, &config).await?;
215    storage.schema_manager().save().await?;
216    Ok(true)
217}
218
219pub async fn create_constraint(
220    storage: &StorageManager,
221    label: &str,
222    constraint_type: &str,
223    properties: Vec<String>,
224) -> Result<bool> {
225    let config = ConstraintConfig {
226        constraint_type: constraint_type.to_string(),
227        properties,
228        name: None,
229    };
230    // Assume label target
231    create_constraint_internal(storage, label, &config, true).await?;
232    storage.schema_manager().save().await?;
233    Ok(true)
234}
235
236pub async fn drop_label(storage: &StorageManager, name: &str) -> Result<bool> {
237    storage.schema_manager().drop_label(name, true)?;
238    storage.schema_manager().save().await?;
239    Ok(true)
240}
241
242pub async fn drop_edge_type(storage: &StorageManager, name: &str) -> Result<bool> {
243    storage.schema_manager().drop_edge_type(name, true)?;
244    storage.schema_manager().save().await?;
245    Ok(true)
246}
247
248pub async fn drop_index(storage: &StorageManager, name: &str) -> Result<bool> {
249    storage.schema_manager().remove_index(name)?;
250    storage.schema_manager().save().await?;
251    Ok(true)
252}
253
254pub async fn drop_constraint(storage: &StorageManager, name: &str) -> Result<bool> {
255    storage.schema_manager().drop_constraint(name, true)?;
256    storage.schema_manager().save().await?;
257    Ok(true)
258}
259
260// Internal helpers
261
262async fn create_index_internal(
263    storage: &StorageManager,
264    label: &str,
265    config: &IndexConfig,
266) -> Result<()> {
267    let prop_name = config
268        .property
269        .as_ref()
270        .ok_or_else(|| UniError::InvalidArgument {
271            arg: "property".into(),
272            message: "Property is missing".into(),
273        })?;
274
275    let index_name = config.name.clone().unwrap_or_else(|| {
276        format!(
277            "{}_{}_{}",
278            label,
279            prop_name,
280            config.index_type.to_lowercase()
281        )
282    });
283
284    let def = match config.index_type.to_uppercase().as_str() {
285        "VECTOR" => {
286            // Distance metric + index type are parsed via the SAME helpers as the DDL
287            // `CREATE VECTOR INDEX` path so dense / native-multivector / MUVERA behave
288            // identically across both entry points (incl. the default ANN = IVF_PQ).
289            let metric =
290                uni_common::vector_index_opts::parse_vector_metric(config.metric.as_deref())
291                    .map_err(|e| UniError::InvalidArgument {
292                        arg: "metric".into(),
293                        message: e.to_string(),
294                    })?;
295
296            // Parse embedding config from procedure options
297            let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
298                alias: emb.alias.clone(),
299                source_properties: emb.source.clone(),
300                batch_size: emb.batch_size,
301                document_prefix: emb.document_prefix.clone(),
302                query_prefix: emb.query_prefix.clone(),
303            });
304
305            let index_type = uni_common::vector_index_opts::build_vector_index_type(
306                &uni_common::vector_index_opts::VectorIndexOpts {
307                    type_name: config.algorithm.as_deref(),
308                    partitions: config.partitions,
309                    m: config.m,
310                    ef_construction: config.ef_construction,
311                    sub_vectors: config.sub_vectors,
312                    num_bits: config.num_bits,
313                    k_sim: config.k_sim,
314                    reps: config.reps,
315                    d_proj: config.d_proj,
316                    seed: config.seed,
317                    inner: config.inner.as_deref(),
318                },
319            );
320
321            IndexDefinition::Vector(VectorIndexConfig {
322                name: index_name,
323                label: label.to_string(),
324                property: prop_name.clone(),
325                index_type,
326                metric,
327                embedding_config,
328                metadata: Default::default(),
329            })
330        }
331        "SCALAR" | "BTREE" => IndexDefinition::Scalar(ScalarIndexConfig {
332            name: index_name,
333            label: label.to_string(),
334            properties: vec![prop_name.clone()],
335            index_type: ScalarIndexType::BTree,
336            where_clause: None,
337            metadata: Default::default(),
338        }),
339        "BITMAP" => IndexDefinition::Scalar(ScalarIndexConfig {
340            name: index_name,
341            label: label.to_string(),
342            properties: vec![prop_name.clone()],
343            index_type: ScalarIndexType::Bitmap,
344            where_clause: None,
345            metadata: Default::default(),
346        }),
347        "LABEL_LIST" | "LABELLIST" => IndexDefinition::Scalar(ScalarIndexConfig {
348            name: index_name,
349            label: label.to_string(),
350            properties: vec![prop_name.clone()],
351            index_type: ScalarIndexType::LabelList,
352            where_clause: None,
353            metadata: Default::default(),
354        }),
355        "INVERTED" => IndexDefinition::Inverted(uni_common::core::schema::InvertedIndexConfig {
356            name: index_name,
357            label: label.to_string(),
358            property: prop_name.clone(),
359            normalize: true,
360            max_terms_per_doc: 10_000,
361            metadata: Default::default(),
362        }),
363        _ => {
364            return Err(UniError::InvalidArgument {
365                arg: "type".into(),
366                message: format!("Unsupported index type: {}", config.index_type),
367            }
368            .into());
369        }
370    };
371
372    storage.schema_manager().add_index(def.clone())?;
373
374    let idx_mgr = storage.index_manager();
375    match def {
376        // Vector indexes ALWAYS build, matching the DDL `CREATE VECTOR INDEX` path:
377        // `create_vector_index` handles an empty/not-yet-created dataset gracefully, and
378        // MUVERA must register (+ backfill when data exists) its derived FDE column even
379        // before any rows exist (create-before-ingest). `prepare_muvera_index` is a no-op
380        // for non-MUVERA vector indexes.
381        IndexDefinition::Vector(cfg) => {
382            idx_mgr.create_vector_index(cfg).await?;
383        }
384        // Non-vector indexes keep the build-if-data-exists optimization.
385        other => {
386            let count = if let Ok(ds) = storage.vertex_dataset(label) {
387                if let Ok(raw) = ds.open_raw().await {
388                    raw.count_rows(None).await.unwrap_or(0)
389                } else {
390                    0
391                }
392            } else {
393                0
394            };
395            if count > 0 {
396                match other {
397                    IndexDefinition::Scalar(cfg) => idx_mgr.create_scalar_index(cfg).await?,
398                    IndexDefinition::Inverted(cfg) => idx_mgr.create_inverted_index(cfg).await?,
399                    IndexDefinition::FullText(cfg) => idx_mgr.create_fts_index(cfg).await?,
400                    IndexDefinition::JsonFullText(cfg) => {
401                        idx_mgr.create_json_fts_index(cfg).await?
402                    }
403                    _ => {}
404                }
405            }
406        }
407    }
408
409    Ok(())
410}
411
412async fn create_constraint_internal(
413    storage: &StorageManager,
414    target_name: &str,
415    config: &ConstraintConfig,
416    is_label: bool,
417) -> Result<()> {
418    let name = config.name.clone().unwrap_or_else(|| {
419        format!(
420            "{}_{}_{}",
421            target_name,
422            config.constraint_type.to_lowercase(),
423            config.properties.join("_")
424        )
425    });
426
427    let constraint_type = match config.constraint_type.to_uppercase().as_str() {
428        "UNIQUE" => ConstraintType::Unique {
429            properties: config.properties.clone(),
430        },
431        "EXISTS" => {
432            if config.properties.len() != 1 {
433                return Err(UniError::InvalidArgument {
434                    arg: "properties".into(),
435                    message: "EXISTS constraint requires exactly one property".into(),
436                }
437                .into());
438            }
439            ConstraintType::Exists {
440                property: config.properties[0].clone(),
441            }
442        }
443        _ => {
444            return Err(UniError::InvalidArgument {
445                arg: "type".into(),
446                message: format!("Unsupported constraint type: {}", config.constraint_type),
447            }
448            .into());
449        }
450    };
451
452    let target = if is_label {
453        ConstraintTarget::Label(target_name.to_string())
454    } else {
455        ConstraintTarget::EdgeType(target_name.to_string())
456    };
457
458    let constraint = Constraint {
459        name,
460        constraint_type,
461        target,
462        enabled: true,
463    };
464
465    storage.schema_manager().add_constraint(constraint)?;
466    Ok(())
467}
468
469fn parse_data_type(s: &str) -> Result<DataType> {
470    let s = s.trim();
471    if s.to_uppercase().starts_with("LIST<") && s.ends_with('>') {
472        let inner = &s[5..s.len() - 1];
473        let inner_type = parse_data_type(inner)?;
474        return Ok(DataType::List(Box::new(inner_type)));
475    }
476    if s.to_uppercase().starts_with("MAP<") && s.ends_with('>') {
477        let (k_str, v_str) = split_map_kv(&s[4..s.len() - 1])?;
478        let key_type = parse_data_type(&k_str)?;
479        if !matches!(key_type, DataType::String) {
480            return Err(UniError::InvalidArgument {
481                arg: "type".into(),
482                message: format!("MAP key type must be STRING, got: {k_str}"),
483            }
484            .into());
485        }
486        let value_type = parse_data_type(&v_str)?;
487        return Ok(DataType::Map(Box::new(key_type), Box::new(value_type)));
488    }
489
490    match s.to_uppercase().as_str() {
491        "STRING" | "UTF8" => Ok(DataType::String),
492        "INT" | "INTEGER" | "INT64" => Ok(DataType::Int64),
493        "INT32" => Ok(DataType::Int32),
494        "FLOAT" | "FLOAT64" | "DOUBLE" => Ok(DataType::Float64),
495        "FLOAT32" => Ok(DataType::Float32),
496        "BOOL" | "BOOLEAN" => Ok(DataType::Bool),
497        "DATETIME" => Ok(DataType::DateTime),
498        "DATE" => Ok(DataType::Date),
499        "BTIC" => Ok(DataType::Btic),
500        "VECTOR" => Ok(DataType::Vector { dimensions: 0 }),
501        _ => Err(UniError::InvalidArgument {
502            arg: "type".into(),
503            message: format!("Unknown data type: {}", s),
504        }
505        .into()),
506    }
507}
508
509/// Split a `MAP<K, V>` inner string on the top-level comma, respecting `<>`/`()` depth so
510/// nested value types split at the right comma. Returns trimmed `(key, value)` strings.
511fn split_map_kv(inner: &str) -> Result<(String, String)> {
512    let mut depth = 0i32;
513    for (i, c) in inner.char_indices() {
514        match c {
515            '<' | '(' => depth += 1,
516            '>' | ')' => depth -= 1,
517            ',' if depth == 0 => {
518                let k = inner[..i].trim();
519                let v = inner[i + 1..].trim();
520                if k.is_empty() || v.is_empty() {
521                    return Err(UniError::InvalidArgument {
522                        arg: "type".into(),
523                        message: "MAP<K,V> requires both a key and a value type".into(),
524                    }
525                    .into());
526                }
527                return Ok((k.to_string(), v.to_string()));
528            }
529            _ => {}
530        }
531    }
532    Err(UniError::InvalidArgument {
533        arg: "type".into(),
534        message: "MAP<K,V> requires a comma separating key and value types".into(),
535    }
536    .into())
537}