Skip to main content

uni_query/query/executor/
ddl_procedures.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use anyhow::Result;
5use serde::Deserialize;
6use std::collections::HashMap;
7use uni_common::Value;
8use uni_common::{
9    UniError,
10    core::schema::{
11        Constraint, ConstraintTarget, ConstraintType, DataType, EmbeddingConfig, IndexDefinition,
12        ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, validate_identifier,
13    },
14};
15use uni_store::storage::StorageManager;
16
/// Deserialized form of the `config` argument accepted by `create_label` and
/// `create_edge_type`. Every field falls back to its default when absent.
#[derive(Deserialize)]
struct LabelConfig {
    /// Property definitions keyed by property name.
    #[serde(default)]
    properties: HashMap<String, PropertyConfig>,
    /// Index definitions. Read by `create_label`; `create_edge_type` does not
    /// consume this field.
    #[serde(default)]
    indexes: Vec<IndexConfig>,
    /// Constraints to register on the newly created label or edge type.
    #[serde(default)]
    constraints: Vec<ConstraintConfig>,
    /// Optional description forwarded to the schema manager's `*_with_desc` call.
    #[serde(default)]
    pub description: Option<String>,
}
28
/// A single property definition inside `LabelConfig::properties`.
#[derive(Deserialize)]
struct PropertyConfig {
    /// Type name as written by the caller (input key `type`); converted to a
    /// `DataType` by `parse_data_type`.
    #[serde(rename = "type")]
    data_type: String,
    /// Whether the property may be null; `default_nullable` supplies the value
    /// when the key is omitted.
    #[serde(default = "default_nullable")]
    nullable: bool,
    /// Optional description forwarded to `add_property_with_desc`.
    #[serde(default)]
    pub description: Option<String>,
}
38
/// Serde default for `PropertyConfig::nullable`: a property is nullable unless
/// the caller says otherwise.
fn default_nullable() -> bool {
    true
}
42
/// Deserialized index options, used both for the entries of
/// `LabelConfig::indexes` and for the `config` argument of `create_index`.
#[derive(Deserialize)]
struct IndexConfig {
    /// Property to index. `create_index` overwrites this with its own argument;
    /// `create_index_internal` fails when it is absent.
    property: Option<String>,
    /// Index kind (input key `type`); matched case-insensitively by
    /// `create_index_internal`.
    #[serde(rename = "type")]
    index_type: String,
    // Vector specific
    /// Accepted in the input but never read in this file (hence the lint expectation).
    #[expect(dead_code)]
    dimensions: Option<usize>,
    /// Distance metric name, handed to `parse_vector_metric`.
    metric: Option<String>,
    /// Passed as `type_name` to `build_vector_index_type`.
    algorithm: Option<String>,
    // The following tuning options are forwarded unchanged to `VectorIndexOpts`.
    partitions: Option<u32>,
    m: Option<u32>,
    ef_construction: Option<u32>,
    sub_vectors: Option<u32>,
    num_bits: Option<u8>,
    // MUVERA-specific (algorithm == "muvera"): FDE encoder params + inner ANN type.
    k_sim: Option<u32>,
    reps: Option<u32>,
    d_proj: Option<u32>,
    seed: Option<u64>,
    inner: Option<String>,
    /// Optional auto-embedding settings; read by the VECTOR and SPARSE branches.
    embedding: Option<EmbeddingOptions>,
    // Sparse-specific: 8-bit weight quantization (default on).
    quantize: Option<bool>,
    // Generic
    /// Explicit index name. When absent, `create_index_internal` generates
    /// `{label}_{property}_{lower-cased type}`.
    name: Option<String>,
}
70
/// Embedding configuration for vector indexes via procedure API.
///
/// `create_index_internal` copies these fields one-to-one into an
/// `EmbeddingConfig` (with `source` becoming `source_properties`).
#[derive(Deserialize)]
struct EmbeddingOptions {
    /// Copied to `EmbeddingConfig::alias`.
    alias: String,
    /// Copied to `EmbeddingConfig::source_properties`.
    source: Vec<String>,
    /// Batch size; `default_batch_size` supplies the value when omitted.
    #[serde(default = "default_batch_size")]
    batch_size: usize,
    /// Optional prefix copied to `EmbeddingConfig::document_prefix`.
    #[serde(default)]
    document_prefix: Option<String>,
    /// Optional prefix copied to `EmbeddingConfig::query_prefix`.
    #[serde(default)]
    query_prefix: Option<String>,
}
83
/// Serde default for `EmbeddingOptions::batch_size`.
fn default_batch_size() -> usize {
    32
}
87
/// Deserialized constraint definition; also built directly by `create_constraint`.
#[derive(Deserialize)]
struct ConstraintConfig {
    /// Constraint kind (input key `type`); `create_constraint_internal` accepts
    /// `UNIQUE` and `EXISTS`, case-insensitively.
    #[serde(rename = "type")]
    constraint_type: String,
    /// Properties the constraint applies to.
    properties: Vec<String>,
    /// Explicit constraint name. When absent a name is generated from the
    /// target, the lower-cased type and the property list.
    name: Option<String>,
}
95
96pub async fn create_label(
97    storage: &StorageManager,
98    name: &str,
99    config_val: &Value,
100) -> Result<bool> {
101    validate_identifier(name)?;
102
103    if storage.schema_manager().schema().labels.contains_key(name) {
104        return Err(UniError::LabelAlreadyExists {
105            label: name.to_string(),
106        }
107        .into());
108    }
109
110    let json_val: serde_json::Value = config_val.clone().into();
111    let config: LabelConfig =
112        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
113            arg: "config".to_string(),
114            message: e.to_string(),
115        })?;
116
117    // Create label
118    storage
119        .schema_manager()
120        .add_label_with_desc(name, config.description)?;
121
122    // Add properties
123    for (prop_name, prop_config) in config.properties {
124        validate_identifier(&prop_name)?;
125        let data_type = parse_data_type(&prop_config.data_type)?;
126        storage.schema_manager().add_property_with_desc(
127            name,
128            &prop_name,
129            data_type,
130            prop_config.nullable,
131            prop_config.description,
132        )?;
133    }
134
135    // Add indexes
136    for idx in config.indexes {
137        if idx.property.is_none() {
138            return Err(UniError::InvalidArgument {
139                arg: "indexes".into(),
140                message: "Property name required for index definition".into(),
141            }
142            .into());
143        }
144        create_index_internal(storage, name, &idx).await?;
145    }
146
147    // Add constraints
148    for c in config.constraints {
149        create_constraint_internal(storage, name, &c, true).await?;
150    }
151
152    storage.schema_manager().save().await?;
153    Ok(true)
154}
155
156pub async fn create_edge_type(
157    storage: &StorageManager,
158    name: &str,
159    src_labels: Vec<String>,
160    dst_labels: Vec<String>,
161    config_val: &Value,
162) -> Result<bool> {
163    validate_identifier(name)?;
164
165    let json_val: serde_json::Value = config_val.clone().into();
166    let config: LabelConfig =
167        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
168            arg: "config".to_string(),
169            message: e.to_string(),
170        })?;
171
172    storage.schema_manager().add_edge_type_with_desc(
173        name,
174        src_labels,
175        dst_labels,
176        config.description,
177    )?;
178
179    for (prop_name, prop_config) in config.properties {
180        validate_identifier(&prop_name)?;
181        let data_type = parse_data_type(&prop_config.data_type)?;
182        storage.schema_manager().add_property_with_desc(
183            name,
184            &prop_name,
185            data_type,
186            prop_config.nullable,
187            prop_config.description,
188        )?;
189    }
190
191    // Constraints
192    for c in config.constraints {
193        create_constraint_internal(storage, name, &c, false).await?;
194    }
195
196    storage.schema_manager().save().await?;
197    Ok(true)
198}
199
200pub async fn create_index(
201    storage: &StorageManager,
202    label: &str,
203    property: &str,
204    config_val: &Value,
205) -> Result<bool> {
206    let json_val: serde_json::Value = config_val.clone().into();
207    let mut config: IndexConfig =
208        serde_json::from_value(json_val).map_err(|e| UniError::InvalidArgument {
209            arg: "config".to_string(),
210            message: e.to_string(),
211        })?;
212
213    // Override property from args
214    config.property = Some(property.to_string());
215
216    create_index_internal(storage, label, &config).await?;
217    storage.schema_manager().save().await?;
218    Ok(true)
219}
220
221pub async fn create_constraint(
222    storage: &StorageManager,
223    label: &str,
224    constraint_type: &str,
225    properties: Vec<String>,
226) -> Result<bool> {
227    let config = ConstraintConfig {
228        constraint_type: constraint_type.to_string(),
229        properties,
230        name: None,
231    };
232    // Assume label target
233    create_constraint_internal(storage, label, &config, true).await?;
234    storage.schema_manager().save().await?;
235    Ok(true)
236}
237
/// Drops the label `name` through the schema manager and persists the schema.
///
/// Returns `Ok(true)` on success; errors from `drop_label` or `save` propagate.
pub async fn drop_label(storage: &StorageManager, name: &str) -> Result<bool> {
    // NOTE(review): the meaning of the `true` flag is defined by the schema
    // manager and is not visible from this file — confirm.
    storage.schema_manager().drop_label(name, true)?;
    storage.schema_manager().save().await?;
    Ok(true)
}
243
/// Drops the edge type `name` through the schema manager and persists the schema.
///
/// Returns `Ok(true)` on success; errors from `drop_edge_type` or `save` propagate.
pub async fn drop_edge_type(storage: &StorageManager, name: &str) -> Result<bool> {
    // NOTE(review): the meaning of the `true` flag is defined by the schema
    // manager and is not visible from this file — confirm.
    storage.schema_manager().drop_edge_type(name, true)?;
    storage.schema_manager().save().await?;
    Ok(true)
}
249
/// Removes the index called `name` from the schema and persists the schema.
///
/// Only the schema manager is involved here; the index manager is not called.
/// NOTE(review): confirm whether physical index data is cleaned up elsewhere.
///
/// Returns `Ok(true)` on success; errors from `remove_index` or `save` propagate.
pub async fn drop_index(storage: &StorageManager, name: &str) -> Result<bool> {
    storage.schema_manager().remove_index(name)?;
    storage.schema_manager().save().await?;
    Ok(true)
}
255
/// Drops the constraint called `name` through the schema manager and persists
/// the schema.
///
/// Returns `Ok(true)` on success; errors from `drop_constraint` or `save` propagate.
pub async fn drop_constraint(storage: &StorageManager, name: &str) -> Result<bool> {
    // NOTE(review): the meaning of the `true` flag is defined by the schema
    // manager and is not visible from this file — confirm.
    storage.schema_manager().drop_constraint(name, true)?;
    storage.schema_manager().save().await?;
    Ok(true)
}
261
262// Internal helpers
263
264async fn create_index_internal(
265    storage: &StorageManager,
266    label: &str,
267    config: &IndexConfig,
268) -> Result<()> {
269    let prop_name = config
270        .property
271        .as_ref()
272        .ok_or_else(|| UniError::InvalidArgument {
273            arg: "property".into(),
274            message: "Property is missing".into(),
275        })?;
276
277    let index_name = config.name.clone().unwrap_or_else(|| {
278        format!(
279            "{}_{}_{}",
280            label,
281            prop_name,
282            config.index_type.to_lowercase()
283        )
284    });
285
286    let def = match config.index_type.to_uppercase().as_str() {
287        "VECTOR" => {
288            // Distance metric + index type are parsed via the SAME helpers as the DDL
289            // `CREATE VECTOR INDEX` path so dense / native-multivector / MUVERA behave
290            // identically across both entry points (incl. the default ANN = IVF_PQ).
291            let metric =
292                uni_common::vector_index_opts::parse_vector_metric(config.metric.as_deref())
293                    .map_err(|e| UniError::InvalidArgument {
294                        arg: "metric".into(),
295                        message: e.to_string(),
296                    })?;
297
298            // Parse embedding config from procedure options
299            let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
300                alias: emb.alias.clone(),
301                source_properties: emb.source.clone(),
302                batch_size: emb.batch_size,
303                document_prefix: emb.document_prefix.clone(),
304                query_prefix: emb.query_prefix.clone(),
305            });
306
307            let index_type = uni_common::vector_index_opts::build_vector_index_type(
308                &uni_common::vector_index_opts::VectorIndexOpts {
309                    type_name: config.algorithm.as_deref(),
310                    partitions: config.partitions,
311                    m: config.m,
312                    ef_construction: config.ef_construction,
313                    sub_vectors: config.sub_vectors,
314                    num_bits: config.num_bits,
315                    k_sim: config.k_sim,
316                    reps: config.reps,
317                    d_proj: config.d_proj,
318                    seed: config.seed,
319                    inner: config.inner.as_deref(),
320                },
321            );
322
323            IndexDefinition::Vector(VectorIndexConfig {
324                name: index_name,
325                label: label.to_string(),
326                property: prop_name.clone(),
327                index_type,
328                metric,
329                embedding_config,
330                metadata: Default::default(),
331            })
332        }
333        "SCALAR" | "BTREE" => IndexDefinition::Scalar(ScalarIndexConfig {
334            name: index_name,
335            label: label.to_string(),
336            properties: vec![prop_name.clone()],
337            index_type: ScalarIndexType::BTree,
338            where_clause: None,
339            metadata: Default::default(),
340        }),
341        "BITMAP" => IndexDefinition::Scalar(ScalarIndexConfig {
342            name: index_name,
343            label: label.to_string(),
344            properties: vec![prop_name.clone()],
345            index_type: ScalarIndexType::Bitmap,
346            where_clause: None,
347            metadata: Default::default(),
348        }),
349        "LABEL_LIST" | "LABELLIST" => IndexDefinition::Scalar(ScalarIndexConfig {
350            name: index_name,
351            label: label.to_string(),
352            properties: vec![prop_name.clone()],
353            index_type: ScalarIndexType::LabelList,
354            where_clause: None,
355            metadata: Default::default(),
356        }),
357        "INVERTED" => IndexDefinition::Inverted(uni_common::core::schema::InvertedIndexConfig {
358            name: index_name,
359            label: label.to_string(),
360            property: prop_name.clone(),
361            normalize: true,
362            max_terms_per_doc: 10_000,
363            metadata: Default::default(),
364        }),
365        "SPARSE" => {
366            // The term-space cardinality comes from the declared
367            // `DataType::SparseVector { dimensions }` of the column.
368            let dimensions = storage
369                .schema_manager()
370                .schema()
371                .properties
372                .get(label)
373                .and_then(|props| props.get(prop_name))
374                .and_then(|meta| match &meta.r#type {
375                    uni_common::DataType::SparseVector { dimensions } => Some(*dimensions),
376                    _ => None,
377                })
378                .ok_or_else(|| UniError::InvalidArgument {
379                    arg: "property".into(),
380                    message: format!(
381                        "Property '{prop_name}' is not a SparseVector column; cannot create a sparse index"
382                    ),
383                })?;
384            // Auto-embed config, same shape as the VECTOR arm.
385            let embedding_config = config.embedding.as_ref().map(|emb| EmbeddingConfig {
386                alias: emb.alias.clone(),
387                source_properties: emb.source.clone(),
388                batch_size: emb.batch_size,
389                document_prefix: emb.document_prefix.clone(),
390                query_prefix: emb.query_prefix.clone(),
391            });
392            IndexDefinition::Sparse(uni_common::core::schema::SparseVectorIndexConfig {
393                name: index_name,
394                label: label.to_string(),
395                property: prop_name.clone(),
396                dimensions,
397                // `OPTIONS{type:'sparse', quantize:false}` stores lossless f32.
398                quantize: config.quantize.unwrap_or(true),
399                embedding_config,
400                metadata: Default::default(),
401            })
402        }
403        _ => {
404            return Err(UniError::InvalidArgument {
405                arg: "type".into(),
406                message: format!("Unsupported index type: {}", config.index_type),
407            }
408            .into());
409        }
410    };
411
412    storage.schema_manager().add_index(def.clone())?;
413
414    let idx_mgr = storage.index_manager();
415    match def {
416        // Vector indexes ALWAYS build, matching the DDL `CREATE VECTOR INDEX` path:
417        // `create_vector_index` handles an empty/not-yet-created dataset gracefully, and
418        // MUVERA must register (+ backfill when data exists) its derived FDE column even
419        // before any rows exist (create-before-ingest). `prepare_muvera_index` is a no-op
420        // for non-MUVERA vector indexes.
421        IndexDefinition::Vector(cfg) => {
422            idx_mgr.create_vector_index(cfg).await?;
423        }
424        // Sparse ALSO always builds, like Vector: `create_sparse_vector_index`
425        // handles an empty table (create-before-ingest → populated on flush) and
426        // backfills already-flushed rows via the storage *backend*. It must NOT be
427        // gated by the raw-dataset `count` below, which reads 0 for a flushed
428        // LanceDB-managed table (the table is not at the raw `{base}/vertices_<L>`
429        // path); that gate would silently leave the index empty after a flush.
430        IndexDefinition::Sparse(cfg) => {
431            idx_mgr.create_sparse_vector_index(cfg).await?;
432        }
433        // Remaining non-vector indexes keep the build-if-data-exists optimization.
434        other => {
435            // Build-if-data-exists gate, read through the `StorageBackend`
436            // (correct `.lance` path). With the prior raw-dataset read this
437            // counted 0 for any flushed table, so the physical index was
438            // silently skipped after a flush (#115). A not-yet-flushed table
439            // legitimately has no rows here and is populated on the next flush.
440            let backend = storage.backend();
441            let table = uni_store::backend::table_names::vertex_table_name(label);
442            let count = if backend.table_exists(&table).await? {
443                backend.count_rows(&table, None).await?
444            } else {
445                0
446            };
447            if count > 0 {
448                match other {
449                    IndexDefinition::Scalar(cfg) => idx_mgr.create_scalar_index(cfg).await?,
450                    IndexDefinition::Inverted(cfg) => idx_mgr.create_inverted_index(cfg).await?,
451                    IndexDefinition::FullText(cfg) => idx_mgr.create_fts_index(cfg).await?,
452                    IndexDefinition::JsonFullText(cfg) => {
453                        idx_mgr.create_json_fts_index(cfg).await?
454                    }
455                    _ => {}
456                }
457            }
458        }
459    }
460
461    Ok(())
462}
463
464async fn create_constraint_internal(
465    storage: &StorageManager,
466    target_name: &str,
467    config: &ConstraintConfig,
468    is_label: bool,
469) -> Result<()> {
470    let name = config.name.clone().unwrap_or_else(|| {
471        format!(
472            "{}_{}_{}",
473            target_name,
474            config.constraint_type.to_lowercase(),
475            config.properties.join("_")
476        )
477    });
478
479    let constraint_type = match config.constraint_type.to_uppercase().as_str() {
480        "UNIQUE" => ConstraintType::Unique {
481            properties: config.properties.clone(),
482        },
483        "EXISTS" => {
484            if config.properties.len() != 1 {
485                return Err(UniError::InvalidArgument {
486                    arg: "properties".into(),
487                    message: "EXISTS constraint requires exactly one property".into(),
488                }
489                .into());
490            }
491            ConstraintType::Exists {
492                property: config.properties[0].clone(),
493            }
494        }
495        _ => {
496            return Err(UniError::InvalidArgument {
497                arg: "type".into(),
498                message: format!("Unsupported constraint type: {}", config.constraint_type),
499            }
500            .into());
501        }
502    };
503
504    let target = if is_label {
505        ConstraintTarget::Label(target_name.to_string())
506    } else {
507        ConstraintTarget::EdgeType(target_name.to_string())
508    };
509
510    let constraint = Constraint {
511        name,
512        constraint_type,
513        target,
514        enabled: true,
515    };
516
517    storage.schema_manager().add_constraint(constraint)?;
518    Ok(())
519}
520
521fn parse_data_type(s: &str) -> Result<DataType> {
522    let s = s.trim();
523    if s.to_uppercase().starts_with("LIST<") && s.ends_with('>') {
524        let inner = &s[5..s.len() - 1];
525        let inner_type = parse_data_type(inner)?;
526        return Ok(DataType::List(Box::new(inner_type)));
527    }
528    if s.to_uppercase().starts_with("MAP<") && s.ends_with('>') {
529        let (k_str, v_str) = split_map_kv(&s[4..s.len() - 1])?;
530        let key_type = parse_data_type(&k_str)?;
531        if !matches!(key_type, DataType::String) {
532            return Err(UniError::InvalidArgument {
533                arg: "type".into(),
534                message: format!("MAP key type must be STRING, got: {k_str}"),
535            }
536            .into());
537        }
538        let value_type = parse_data_type(&v_str)?;
539        return Ok(DataType::Map(Box::new(key_type), Box::new(value_type)));
540    }
541
542    match s.to_uppercase().as_str() {
543        "STRING" | "UTF8" => Ok(DataType::String),
544        "INT" | "INTEGER" | "INT64" => Ok(DataType::Int64),
545        "INT32" => Ok(DataType::Int32),
546        "FLOAT" | "FLOAT64" | "DOUBLE" => Ok(DataType::Float64),
547        "FLOAT32" => Ok(DataType::Float32),
548        "BOOL" | "BOOLEAN" => Ok(DataType::Bool),
549        "DATETIME" => Ok(DataType::DateTime),
550        "DATE" => Ok(DataType::Date),
551        "BTIC" => Ok(DataType::Btic),
552        "VECTOR" => Ok(DataType::Vector { dimensions: 0 }),
553        _ => Err(UniError::InvalidArgument {
554            arg: "type".into(),
555            message: format!("Unknown data type: {}", s),
556        }
557        .into()),
558    }
559}
560
561/// Split a `MAP<K, V>` inner string on the top-level comma, respecting `<>`/`()` depth so
562/// nested value types split at the right comma. Returns trimmed `(key, value)` strings.
563fn split_map_kv(inner: &str) -> Result<(String, String)> {
564    let mut depth = 0i32;
565    for (i, c) in inner.char_indices() {
566        match c {
567            '<' | '(' => depth += 1,
568            '>' | ')' => depth -= 1,
569            ',' if depth == 0 => {
570                let k = inner[..i].trim();
571                let v = inner[i + 1..].trim();
572                if k.is_empty() || v.is_empty() {
573                    return Err(UniError::InvalidArgument {
574                        arg: "type".into(),
575                        message: "MAP<K,V> requires both a key and a value type".into(),
576                    }
577                    .into());
578                }
579                return Ok((k.to_string(), v.to_string()));
580            }
581            _ => {}
582        }
583    }
584    Err(UniError::InvalidArgument {
585        arg: "type".into(),
586        message: "MAP<K,V> requires a comma separating key and value types".into(),
587    }
588    .into())
589}