// File: uni_db/api/schema.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::api::Uni;
5use std::path::Path;
6use uni_common::core::schema::{
7    DataType, DistanceMetric, EmbeddingConfig, FullTextIndexConfig, IndexDefinition,
8    ScalarIndexConfig, ScalarIndexType, TokenizerConfig, VectorIndexConfig, VectorIndexType,
9};
10use uni_common::{Result, UniError};
11
12/// Builder for defining and modifying the graph schema.
13///
14/// Use this builder to define labels, edge types, properties, and indexes.
15/// Changes are batched and applied atomically when `.apply()` is called.
16///
17/// # Example
18///
19/// ```no_run
20/// # async fn example(db: &uni_db::Uni) -> uni_db::Result<()> {
21/// db.schema()
22///     .label("Person")
23///         .property("name", uni_db::DataType::String)
24///         .property("age", uni_db::DataType::Int64)
25///         .vector("embedding", 1536) // Adds property AND vector index
26///         .index("name", uni_db::IndexType::Scalar(uni_db::ScalarType::BTree))
27///     .edge_type("KNOWS", &["Person"], &["Person"])
28///         .property("since", uni_db::DataType::Date)
29///     .apply()
30///     .await?;
31/// # Ok(())
32/// # }
33/// ```
34#[must_use = "schema builders do nothing until .apply() or .current() is called"]
35pub struct SchemaBuilder<'a> {
36    pub(crate) db: &'a Uni,
37    pending: Vec<SchemaChange>,
38}
39
40pub enum SchemaChange {
41    AddLabel {
42        name: String,
43    },
44    AddProperty {
45        label_or_type: String,
46        name: String,
47        data_type: DataType,
48        nullable: bool,
49    },
50    AddIndex(IndexDefinition),
51    AddEdgeType {
52        name: String,
53        from_labels: Vec<String>,
54        to_labels: Vec<String>,
55    },
56}
57
58impl<'a> SchemaBuilder<'a> {
59    pub fn new(db: &'a Uni) -> Self {
60        Self {
61            db,
62            pending: Vec::new(),
63        }
64    }
65
66    /// Get the current schema (read-only snapshot).
67    pub fn current(&self) -> std::sync::Arc<uni_common::core::schema::Schema> {
68        self.db.inner.schema.schema()
69    }
70
71    /// Add pre-built schema changes to this builder.
72    pub fn with_changes(mut self, changes: Vec<SchemaChange>) -> Self {
73        self.pending.extend(changes);
74        self
75    }
76
77    /// Create a label (node type) in the schema.
78    ///
79    /// Labels can be **schemaless** (no properties defined) or **typed** (with properties).
80    ///
81    /// # Schemaless Labels
82    ///
83    /// Labels without property definitions support flexible, dynamic properties:
84    /// - Properties not in schema are stored in `overflow_json` (JSONB binary)
85    /// - Queries on overflow properties are automatically rewritten to JSONB functions
86    /// - No schema migration needed to add new properties
87    ///
88    /// # Example: Schemaless Label
89    ///
90    /// ```ignore
91    /// // Create label with no properties
92    /// db.schema().label("Document").apply().await?;
93    ///
94    /// // Create with arbitrary properties
95    /// db.execute("CREATE (:Document {title: 'Article', author: 'Alice', year: 2024})").await?;
96    ///
97    /// // Query works transparently (automatic query rewriting)
98    /// db.query("MATCH (d:Document) WHERE d.author = 'Alice' RETURN d.title, d.year").await?;
99    /// ```
100    ///
101    /// # Example: Typed Label with Overflow
102    ///
103    /// ```ignore
104    /// // Define core properties in schema
105    /// db.schema()
106    ///     .label("Person")
107    ///     .property("name", DataType::String)
108    ///     .property("age", DataType::Int)
109    ///     .apply().await?;
110    ///
111    /// // Can still add overflow properties dynamically
112    /// db.execute("CREATE (:Person {name: 'Bob', age: 25, city: 'NYC'})").await?;
113    /// //                                                   ^^^^^^^^^^^
114    /// //                                                   overflow property
115    ///
116    /// // Query mixing schema and overflow properties
117    /// db.query("MATCH (p:Person) WHERE p.name = 'Bob' AND p.city = 'NYC' RETURN p.age").await?;
118    /// ```
119    ///
120    /// **Performance Note**: Schema properties use typed columns (faster filtering/sorting),
121    /// while overflow properties use JSONB (flexible but slower). Use schema properties
122    /// for core, frequently-queried fields.
123    pub fn label(self, name: &str) -> LabelBuilder<'a> {
124        LabelBuilder::new(self, name.to_string())
125    }
126
127    pub fn edge_type(self, name: &str, from: &[&str], to: &[&str]) -> EdgeTypeBuilder<'a> {
128        EdgeTypeBuilder::new(
129            self,
130            name.to_string(),
131            from.iter().map(|s| s.to_string()).collect(),
132            to.iter().map(|s| s.to_string()).collect(),
133        )
134    }
135
136    pub async fn apply(self) -> Result<()> {
137        let manager = &self.db.inner.schema;
138        let mut indexes_to_build = Vec::new();
139
140        for change in self.pending {
141            match change {
142                SchemaChange::AddLabel { name } => match manager.add_label(&name) {
143                    Ok(_) => {}
144                    Err(e) if e.to_string().contains("already exists") => {}
145                    Err(e) => {
146                        return Err(UniError::Schema {
147                            message: e.to_string(),
148                        });
149                    }
150                },
151                SchemaChange::AddProperty {
152                    label_or_type,
153                    name,
154                    data_type,
155                    nullable,
156                } => match manager.add_property(&label_or_type, &name, data_type, nullable) {
157                    Ok(_) => {}
158                    Err(e) if e.to_string().contains("already exists") => {}
159                    Err(e) => {
160                        return Err(UniError::Schema {
161                            message: e.to_string(),
162                        });
163                    }
164                },
165                SchemaChange::AddIndex(idx) => {
166                    manager
167                        .add_index(idx.clone())
168                        .map_err(|e| UniError::Schema {
169                            message: e.to_string(),
170                        })?;
171                    // Track index to trigger build after saving schema
172                    indexes_to_build.push(idx.label().to_string());
173                }
174                SchemaChange::AddEdgeType {
175                    name,
176                    from_labels,
177                    to_labels,
178                } => match manager.add_edge_type(&name, from_labels, to_labels) {
179                    Ok(_) => {}
180                    Err(e) if e.to_string().contains("already exists") => {}
181                    Err(e) => {
182                        return Err(UniError::Schema {
183                            message: e.to_string(),
184                        });
185                    }
186                },
187            }
188        }
189
190        manager.save().await.map_err(UniError::Internal)?;
191
192        // Trigger index builds for affected labels
193        // We use a set to avoid rebuilding same label multiple times if multiple indexes added
194        indexes_to_build.sort();
195        indexes_to_build.dedup();
196        for label in indexes_to_build {
197            // Trigger async rebuild
198            // Note: If synchronous behavior is desired, pass false.
199            // But usually schema changes should be fast, so async build is better?
200            // The prompt says "Indexes Not Built During Schema Changes", implying they should be.
201            // Let's do it synchronously to ensure they are ready, matching user expectation.
202            self.db.indexes().rebuild(&label, false).await?;
203        }
204
205        Ok(())
206    }
207}
208
209#[must_use = "builders do nothing until .done() or .apply() is called"]
210pub struct LabelBuilder<'a> {
211    builder: SchemaBuilder<'a>,
212    name: String,
213}
214
215impl<'a> LabelBuilder<'a> {
216    fn new(builder: SchemaBuilder<'a>, name: String) -> Self {
217        Self { builder, name }
218    }
219
220    pub fn property(mut self, name: &str, data_type: DataType) -> Self {
221        self.builder.pending.push(SchemaChange::AddProperty {
222            label_or_type: self.name.clone(),
223            name: name.to_string(),
224            data_type,
225            nullable: false,
226        });
227        self
228    }
229
230    pub fn property_nullable(mut self, name: &str, data_type: DataType) -> Self {
231        self.builder.pending.push(SchemaChange::AddProperty {
232            label_or_type: self.name.clone(),
233            name: name.to_string(),
234            data_type,
235            nullable: true,
236        });
237        self
238    }
239
240    pub fn vector(self, name: &str, dimensions: usize) -> Self {
241        self.property(name, DataType::Vector { dimensions })
242    }
243
244    pub fn index(mut self, property: &str, index_type: IndexType) -> Self {
245        let idx = match index_type {
246            IndexType::Vector(cfg) => IndexDefinition::Vector(VectorIndexConfig {
247                name: format!("idx_{}_{}", self.name, property),
248                label: self.name.clone(),
249                property: property.to_string(),
250                index_type: cfg.algorithm.into_internal(),
251                metric: cfg.metric.into_internal(),
252                embedding_config: cfg.embedding.map(|e| e.into_internal()),
253                metadata: Default::default(),
254            }),
255            IndexType::FullText => IndexDefinition::FullText(FullTextIndexConfig {
256                name: format!("fts_{}_{}", self.name, property),
257                label: self.name.clone(),
258                properties: vec![property.to_string()],
259                tokenizer: TokenizerConfig::Standard,
260                with_positions: true,
261                metadata: Default::default(),
262            }),
263            IndexType::Scalar(stype) => IndexDefinition::Scalar(ScalarIndexConfig {
264                name: format!("idx_{}_{}", self.name, property),
265                label: self.name.clone(),
266                properties: vec![property.to_string()],
267                index_type: stype.into_internal(),
268                where_clause: None,
269                metadata: Default::default(),
270            }),
271            IndexType::Inverted(config) => IndexDefinition::Inverted(config),
272        };
273        self.builder.pending.push(SchemaChange::AddIndex(idx));
274        self
275    }
276
277    pub fn done(mut self) -> SchemaBuilder<'a> {
278        self.builder
279            .pending
280            .insert(0, SchemaChange::AddLabel { name: self.name });
281        self.builder
282    }
283
284    // Chaining
285    pub fn label(self, name: &str) -> LabelBuilder<'a> {
286        self.done().label(name)
287    }
288
289    pub fn edge_type(self, name: &str, from: &[&str], to: &[&str]) -> EdgeTypeBuilder<'a> {
290        self.done().edge_type(name, from, to)
291    }
292
293    pub async fn apply(self) -> Result<()> {
294        self.done().apply().await
295    }
296}
297
298#[must_use = "builders do nothing until .done() or .apply() is called"]
299pub struct EdgeTypeBuilder<'a> {
300    builder: SchemaBuilder<'a>,
301    name: String,
302    from_labels: Vec<String>,
303    to_labels: Vec<String>,
304}
305
306impl<'a> EdgeTypeBuilder<'a> {
307    fn new(
308        builder: SchemaBuilder<'a>,
309        name: String,
310        from_labels: Vec<String>,
311        to_labels: Vec<String>,
312    ) -> Self {
313        Self {
314            builder,
315            name,
316            from_labels,
317            to_labels,
318        }
319    }
320
321    pub fn property(mut self, name: &str, data_type: DataType) -> Self {
322        self.builder.pending.push(SchemaChange::AddProperty {
323            label_or_type: self.name.clone(),
324            name: name.to_string(),
325            data_type,
326            nullable: false,
327        });
328        self
329    }
330
331    pub fn property_nullable(mut self, name: &str, data_type: DataType) -> Self {
332        self.builder.pending.push(SchemaChange::AddProperty {
333            label_or_type: self.name.clone(),
334            name: name.to_string(),
335            data_type,
336            nullable: true,
337        });
338        self
339    }
340
341    pub fn done(mut self) -> SchemaBuilder<'a> {
342        self.builder.pending.insert(
343            0,
344            SchemaChange::AddEdgeType {
345                name: self.name,
346                from_labels: self.from_labels,
347                to_labels: self.to_labels,
348            },
349        );
350        self.builder
351    }
352
353    pub fn label(self, name: &str) -> LabelBuilder<'a> {
354        self.done().label(name)
355    }
356
357    pub fn edge_type(self, name: &str, from: &[&str], to: &[&str]) -> EdgeTypeBuilder<'a> {
358        self.done().edge_type(name, from, to)
359    }
360
361    pub async fn apply(self) -> Result<()> {
362        self.done().apply().await
363    }
364}
365
366#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
367pub struct LabelInfo {
368    pub name: String,
369    pub count: usize,
370    pub properties: Vec<PropertyInfo>,
371    pub indexes: Vec<IndexInfo>,
372    pub constraints: Vec<ConstraintInfo>,
373}
374
375#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
376pub struct EdgeTypeInfo {
377    pub name: String,
378    pub count: usize,
379    pub source_labels: Vec<String>,
380    pub target_labels: Vec<String>,
381    pub properties: Vec<PropertyInfo>,
382    pub indexes: Vec<IndexInfo>,
383    pub constraints: Vec<ConstraintInfo>,
384}
385
386#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
387pub struct PropertyInfo {
388    pub name: String,
389    pub data_type: String,
390    pub nullable: bool,
391    pub is_indexed: bool,
392}
393
394#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
395pub struct IndexInfo {
396    pub name: String,
397    pub index_type: String,
398    pub properties: Vec<String>,
399    pub status: String,
400}
401
402#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
403pub struct ConstraintInfo {
404    pub name: String,
405    pub constraint_type: String,
406    pub properties: Vec<String>,
407    pub enabled: bool,
408}
409
410#[non_exhaustive]
411pub enum IndexType {
412    Vector(VectorIndexCfg),
413    FullText,
414    Scalar(ScalarType),
415    Inverted(uni_common::core::schema::InvertedIndexConfig),
416}
417
418pub struct VectorIndexCfg {
419    pub algorithm: VectorAlgo,
420    pub metric: VectorMetric,
421    pub embedding: Option<EmbeddingCfg>,
422}
423
/// Embedding configuration for auto-embedding during index writes.
///
/// Derives the common traits so configurations can be logged, duplicated and
/// compared by callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmbeddingCfg {
    /// Model alias from the Uni-Xervo catalog (for example: "embed/default").
    pub alias: String,
    /// Names of the properties used as embedding input — presumably their text
    /// is what gets embedded; confirm against the embedding pipeline.
    pub source_properties: Vec<String>,
    /// Batch size forwarded to the internal embedding configuration.
    pub batch_size: usize,
}
431
432impl EmbeddingCfg {
433    fn into_internal(self) -> EmbeddingConfig {
434        EmbeddingConfig {
435            alias: self.alias,
436            source_properties: self.source_properties,
437            batch_size: self.batch_size,
438        }
439    }
440}
441
/// Vector index algorithm together with its build parameters.
///
/// Derives the common traits so values can be logged, duplicated and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum VectorAlgo {
    /// HNSW index; `m` and `ef_construction` are forwarded to the internal
    /// index type, which additionally receives a fixed `ef_search`.
    Hnsw { m: u32, ef_construction: u32 },
    /// IVF-PQ index; `partitions` and `sub_vectors` are forwarded to the internal
    /// index type, which additionally receives a fixed bits-per-subvector value.
    IvfPq { partitions: u32, sub_vectors: u32 },
    /// Flat index with no parameters.
    Flat,
}
448
449impl VectorAlgo {
450    fn into_internal(self) -> VectorIndexType {
451        match self {
452            VectorAlgo::Hnsw { m, ef_construction } => VectorIndexType::Hnsw {
453                m,
454                ef_construction,
455                ef_search: 50,
456            },
457            VectorAlgo::IvfPq {
458                partitions,
459                sub_vectors,
460            } => VectorIndexType::IvfPq {
461                num_partitions: partitions,
462                num_sub_vectors: sub_vectors,
463                bits_per_subvector: 8,
464            },
465            VectorAlgo::Flat => VectorIndexType::Flat,
466        }
467    }
468}
469
/// Distance metric for a vector index.
///
/// Derives the common traits so values can be logged, copied and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum VectorMetric {
    /// Maps to the internal cosine metric.
    Cosine,
    /// Maps to the internal L2 metric.
    L2,
    /// Maps to the internal dot-product metric.
    Dot,
}
476
477impl VectorMetric {
478    fn into_internal(self) -> DistanceMetric {
479        match self {
480            VectorMetric::Cosine => DistanceMetric::Cosine,
481            VectorMetric::L2 => DistanceMetric::L2,
482            VectorMetric::Dot => DistanceMetric::Dot,
483        }
484    }
485}
486
/// Kind of scalar index.
///
/// Derives the common traits so values can be logged, copied and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ScalarType {
    /// Maps to the internal B-tree scalar index.
    BTree,
    /// Maps to the internal hash scalar index.
    Hash,
    /// Maps to the internal bitmap scalar index.
    Bitmap,
}
493
494impl ScalarType {
495    fn into_internal(self) -> ScalarIndexType {
496        match self {
497            ScalarType::BTree => ScalarIndexType::BTree,
498            ScalarType::Hash => ScalarIndexType::Hash,
499            ScalarType::Bitmap => ScalarIndexType::Bitmap,
500        }
501    }
502}
503
504impl Uni {
505    pub fn schema(&self) -> SchemaBuilder<'_> {
506        SchemaBuilder::new(self)
507    }
508
509    pub async fn load_schema(&self, path: impl AsRef<Path>) -> Result<()> {
510        // We can't easily "replace" the SchemaManager's schema in-place if it's already Arc-ed around.
511        // But SchemaManager has internal RwLock<Schema>.
512        // Let's check if we can add a method to SchemaManager to reload.
513        let content = tokio::fs::read_to_string(path)
514            .await
515            .map_err(UniError::Io)?;
516        let schema: uni_common::core::schema::Schema =
517            serde_json::from_str(&content).map_err(|e| UniError::Schema {
518                message: e.to_string(),
519            })?;
520
521        // We need a way to update the schema in SchemaManager.
522        // I'll add a `replace_schema` or similar to SchemaManager.
523        self.inner.schema.replace_schema(schema);
524        Ok(())
525    }
526
527    pub async fn save_schema(&self, path: impl AsRef<Path>) -> Result<()> {
528        let content = serde_json::to_string_pretty(&self.inner.schema.schema()).map_err(|e| {
529            UniError::Schema {
530                message: e.to_string(),
531            }
532        })?;
533        tokio::fs::write(path, content)
534            .await
535            .map_err(UniError::Io)?;
536        Ok(())
537    }
538}