use crate::api::Uni;
use std::path::Path;
use uni_common::core::schema::{
DataType, DistanceMetric, EmbeddingConfig, FullTextIndexConfig, IndexDefinition,
ScalarIndexConfig, ScalarIndexType, TokenizerConfig, VectorIndexConfig, VectorIndexType,
};
use uni_common::{Result, UniError};
/// Fluent builder that queues [`SchemaChange`]s against a [`Uni`] database.
///
/// Nothing reaches the database until `apply` runs. Dropping the builder discards every
/// queued change. `current` only reads the database's schema and does not apply anything.
#[must_use = "schema builders do nothing until .apply() is called"]
pub struct SchemaBuilder<'a> {
    /// Database whose schema manager receives the changes.
    pub(crate) db: &'a Uni,
    /// Changes queued so far; `apply` executes them in this order.
    pending: Vec<SchemaChange>,
}
/// A single queued schema mutation; `SchemaBuilder::apply` executes these in queue order.
pub enum SchemaChange {
    /// Register a label. `apply` ignores a schema-manager error whose message contains
    /// "already exists".
    AddLabel {
        /// Label name.
        name: String,
    },
    /// Add a property to a label or an edge type (both `LabelBuilder` and
    /// `EdgeTypeBuilder` queue this variant). "already exists" errors are ignored by `apply`.
    AddProperty {
        /// Name of the label or edge type that owns the property.
        label_or_type: String,
        /// Property name.
        name: String,
        /// Declared property type.
        data_type: DataType,
        /// Whether the property may be absent/null.
        nullable: bool,
    },
    /// Register an index. After all changes succeed, `apply` rebuilds indexes for the
    /// index's label. Unlike the other variants, any error here aborts `apply`.
    AddIndex(IndexDefinition),
    /// Register an edge type between the given labels. "already exists" errors are
    /// ignored by `apply`.
    AddEdgeType {
        /// Edge type name.
        name: String,
        /// Labels accepted on the `from` side.
        from_labels: Vec<String>,
        /// Labels accepted on the `to` side.
        to_labels: Vec<String>,
    },
}
impl<'a> SchemaBuilder<'a> {
pub fn new(db: &'a Uni) -> Self {
Self {
db,
pending: Vec::new(),
}
}
pub fn current(&self) -> std::sync::Arc<uni_common::core::schema::Schema> {
self.db.inner.schema.schema()
}
pub fn with_changes(mut self, changes: Vec<SchemaChange>) -> Self {
self.pending.extend(changes);
self
}
pub fn label(self, name: &str) -> LabelBuilder<'a> {
LabelBuilder::new(self, name.to_string())
}
pub fn edge_type(self, name: &str, from: &[&str], to: &[&str]) -> EdgeTypeBuilder<'a> {
EdgeTypeBuilder::new(
self,
name.to_string(),
from.iter().map(|s| s.to_string()).collect(),
to.iter().map(|s| s.to_string()).collect(),
)
}
pub async fn apply(self) -> Result<()> {
let manager = &self.db.inner.schema;
let mut indexes_to_build = Vec::new();
for change in self.pending {
match change {
SchemaChange::AddLabel { name } => match manager.add_label(&name) {
Ok(_) => {}
Err(e) if e.to_string().contains("already exists") => {}
Err(e) => {
return Err(UniError::Schema {
message: e.to_string(),
});
}
},
SchemaChange::AddProperty {
label_or_type,
name,
data_type,
nullable,
} => match manager.add_property(&label_or_type, &name, data_type, nullable) {
Ok(_) => {}
Err(e) if e.to_string().contains("already exists") => {}
Err(e) => {
return Err(UniError::Schema {
message: e.to_string(),
});
}
},
SchemaChange::AddIndex(idx) => {
manager
.add_index(idx.clone())
.map_err(|e| UniError::Schema {
message: e.to_string(),
})?;
indexes_to_build.push(idx.label().to_string());
}
SchemaChange::AddEdgeType {
name,
from_labels,
to_labels,
} => match manager.add_edge_type(&name, from_labels, to_labels) {
Ok(_) => {}
Err(e) if e.to_string().contains("already exists") => {}
Err(e) => {
return Err(UniError::Schema {
message: e.to_string(),
});
}
},
}
}
manager.save().await.map_err(UniError::Internal)?;
indexes_to_build.sort();
indexes_to_build.dedup();
for label in indexes_to_build {
self.db.indexes().rebuild(&label, false).await?;
}
Ok(())
}
}
/// Sub-builder that collects property and index changes for one label.
///
/// The `AddLabel` change itself is queued when this builder is finished. That happens via
/// `done`, or implicitly via `label`, `edge_type` or `apply`.
#[must_use = "builders do nothing until .done() or .apply() is called"]
pub struct LabelBuilder<'a> {
    /// Parent builder that owns the change queue.
    builder: SchemaBuilder<'a>,
    /// Label being declared; used as `label_or_type` for every queued property.
    name: String,
}
impl<'a> LabelBuilder<'a> {
    /// Wraps `builder` so subsequent calls target the label `name`.
    fn new(builder: SchemaBuilder<'a>, name: String) -> Self {
        Self { builder, name }
    }

    /// Queues an `AddProperty` change owned by this label.
    fn push_property(&mut self, name: &str, data_type: DataType, nullable: bool) {
        self.builder.pending.push(SchemaChange::AddProperty {
            label_or_type: self.name.clone(),
            name: name.to_string(),
            data_type,
            nullable,
        });
    }

    /// Declares a non-nullable property `name` of type `data_type`.
    pub fn property(mut self, name: &str, data_type: DataType) -> Self {
        self.push_property(name, data_type, false);
        self
    }

    /// Declares a nullable property `name` of type `data_type`.
    pub fn property_nullable(mut self, name: &str, data_type: DataType) -> Self {
        self.push_property(name, data_type, true);
        self
    }

    /// Declares a non-nullable `DataType::Vector` property with `dimensions` components.
    pub fn vector(self, name: &str, dimensions: usize) -> Self {
        self.property(name, DataType::Vector { dimensions })
    }

    /// Queues an index on `property` of this label.
    ///
    /// Generated index names:
    /// - vector and scalar indexes: `idx_<label>_<property>`;
    /// - full-text indexes: `fts_<label>_<property>` (standard tokenizer, positions enabled).
    ///
    /// `IndexType::Inverted` uses the supplied config verbatim; neither `property` nor this
    /// label's name is applied to it.
    pub fn index(mut self, property: &str, index_type: IndexType) -> Self {
        let idx = match index_type {
            IndexType::Vector(cfg) => IndexDefinition::Vector(VectorIndexConfig {
                name: format!("idx_{}_{}", self.name, property),
                label: self.name.clone(),
                property: property.to_string(),
                index_type: cfg.algorithm.into_internal(),
                metric: cfg.metric.into_internal(),
                embedding_config: cfg.embedding.map(|e| e.into_internal()),
                metadata: Default::default(),
            }),
            IndexType::FullText => IndexDefinition::FullText(FullTextIndexConfig {
                name: format!("fts_{}_{}", self.name, property),
                label: self.name.clone(),
                properties: vec![property.to_string()],
                tokenizer: TokenizerConfig::Standard,
                with_positions: true,
                metadata: Default::default(),
            }),
            IndexType::Scalar(stype) => IndexDefinition::Scalar(ScalarIndexConfig {
                name: format!("idx_{}_{}", self.name, property),
                label: self.name.clone(),
                properties: vec![property.to_string()],
                index_type: stype.into_internal(),
                where_clause: None,
                metadata: Default::default(),
            }),
            IndexType::Inverted(config) => IndexDefinition::Inverted(config),
        };
        self.builder.pending.push(SchemaChange::AddIndex(idx));
        self
    }

    /// Finishes this label and returns the parent builder.
    ///
    /// The `AddLabel` change is hoisted ahead of the queued property/index changes, so
    /// `apply` registers the label before anything that targets it. It goes after the
    /// labels already at the front of the queue, which keeps labels in declaration order.
    /// Inserting at index 0 would reverse that order.
    pub fn done(mut self) -> SchemaBuilder<'a> {
        let at = self
            .builder
            .pending
            .iter()
            .take_while(|change| matches!(change, SchemaChange::AddLabel { .. }))
            .count();
        self.builder
            .pending
            .insert(at, SchemaChange::AddLabel { name: self.name });
        self.builder
    }

    /// Finishes this label and starts declaring the label `name`.
    pub fn label(self, name: &str) -> LabelBuilder<'a> {
        self.done().label(name)
    }

    /// Finishes this label and starts declaring the edge type `name`.
    pub fn edge_type(self, name: &str, from: &[&str], to: &[&str]) -> EdgeTypeBuilder<'a> {
        self.done().edge_type(name, from, to)
    }

    /// Finishes this label and applies every queued change (see `SchemaBuilder::apply`).
    pub async fn apply(self) -> Result<()> {
        self.done().apply().await
    }
}
/// Sub-builder that collects property changes for one edge type.
///
/// The `AddEdgeType` change itself is queued when this builder is finished. That happens
/// via `done`, or implicitly via `label`, `edge_type` or `apply`.
#[must_use = "builders do nothing until .done() or .apply() is called"]
pub struct EdgeTypeBuilder<'a> {
    /// Parent builder that owns the change queue.
    builder: SchemaBuilder<'a>,
    /// Edge type being declared; used as `label_or_type` for every queued property.
    name: String,
    /// Labels accepted on the `from` side.
    from_labels: Vec<String>,
    /// Labels accepted on the `to` side.
    to_labels: Vec<String>,
}
impl<'a> EdgeTypeBuilder<'a> {
    /// Wraps `builder` so subsequent calls target the edge type `name`, connecting
    /// `from_labels` to `to_labels`.
    fn new(
        builder: SchemaBuilder<'a>,
        name: String,
        from_labels: Vec<String>,
        to_labels: Vec<String>,
    ) -> Self {
        Self {
            builder,
            name,
            from_labels,
            to_labels,
        }
    }

    /// Queues an `AddProperty` change owned by this edge type.
    fn push_property(&mut self, name: &str, data_type: DataType, nullable: bool) {
        self.builder.pending.push(SchemaChange::AddProperty {
            label_or_type: self.name.clone(),
            name: name.to_string(),
            data_type,
            nullable,
        });
    }

    /// Declares a non-nullable property `name` of type `data_type`.
    pub fn property(mut self, name: &str, data_type: DataType) -> Self {
        self.push_property(name, data_type, false);
        self
    }

    /// Declares a nullable property `name` of type `data_type`.
    pub fn property_nullable(mut self, name: &str, data_type: DataType) -> Self {
        self.push_property(name, data_type, true);
        self
    }

    /// Finishes this edge type and returns the parent builder.
    ///
    /// The `AddEdgeType` change is hoisted ahead of the queued property/index changes, so
    /// `apply` registers it before its own properties. It is placed after the label and
    /// edge-type declarations already at the front of the queue. This keeps declaration
    /// order and ensures labels declared earlier in the chain are registered first.
    /// Inserting at index 0 would put the edge type ahead of those labels.
    pub fn done(mut self) -> SchemaBuilder<'a> {
        let at = self
            .builder
            .pending
            .iter()
            .take_while(|change| {
                matches!(
                    change,
                    SchemaChange::AddLabel { .. } | SchemaChange::AddEdgeType { .. }
                )
            })
            .count();
        self.builder.pending.insert(
            at,
            SchemaChange::AddEdgeType {
                name: self.name,
                from_labels: self.from_labels,
                to_labels: self.to_labels,
            },
        );
        self.builder
    }

    /// Finishes this edge type and starts declaring the label `name`.
    pub fn label(self, name: &str) -> LabelBuilder<'a> {
        self.done().label(name)
    }

    /// Finishes this edge type and starts declaring another edge type `name`.
    pub fn edge_type(self, name: &str, from: &[&str], to: &[&str]) -> EdgeTypeBuilder<'a> {
        self.done().edge_type(name, from, to)
    }

    /// Finishes this edge type and applies every queued change (see `SchemaBuilder::apply`).
    pub async fn apply(self) -> Result<()> {
        self.done().apply().await
    }
}
/// Serializable description of a label.
// NOTE(review): not populated anywhere in this file; field meanings below are presumed
// from names — confirm against the producer.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LabelInfo {
    /// Label name.
    pub name: String,
    /// Presumably the number of entities carrying this label — confirm with the producer.
    pub count: usize,
    /// Properties declared on the label.
    pub properties: Vec<PropertyInfo>,
    /// Indexes defined on the label.
    pub indexes: Vec<IndexInfo>,
    /// Constraints defined on the label.
    pub constraints: Vec<ConstraintInfo>,
}
/// Serializable description of an edge type.
// NOTE(review): not populated anywhere in this file; field meanings below are presumed
// from names — confirm against the producer.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EdgeTypeInfo {
    /// Edge type name.
    pub name: String,
    /// Presumably the number of edges of this type — confirm with the producer.
    pub count: usize,
    /// Labels allowed on the source side.
    pub source_labels: Vec<String>,
    /// Labels allowed on the target side.
    pub target_labels: Vec<String>,
    /// Properties declared on the edge type.
    pub properties: Vec<PropertyInfo>,
    /// Indexes defined on the edge type.
    pub indexes: Vec<IndexInfo>,
    /// Constraints defined on the edge type.
    pub constraints: Vec<ConstraintInfo>,
}
/// Serializable description of a single property.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PropertyInfo {
    /// Property name.
    pub name: String,
    /// Data type rendered as text (exact format decided by the producer — TODO confirm).
    pub data_type: String,
    /// Whether the property may be absent/null.
    pub nullable: bool,
    /// Whether some index covers this property.
    pub is_indexed: bool,
}
/// Serializable description of an index.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct IndexInfo {
    /// Index name.
    pub name: String,
    /// Index kind rendered as text.
    pub index_type: String,
    /// Properties covered by the index.
    pub properties: Vec<String>,
    /// Index status as free text; the set of possible values is not defined in this file.
    pub status: String,
}
/// Serializable description of a constraint.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ConstraintInfo {
    /// Constraint name.
    pub name: String,
    /// Constraint kind rendered as text.
    pub constraint_type: String,
    /// Properties the constraint applies to.
    pub properties: Vec<String>,
    /// Whether the constraint is currently enabled.
    pub enabled: bool,
}
/// Index kind requested through `LabelBuilder::index`.
#[non_exhaustive]
pub enum IndexType {
    /// Vector index configured by algorithm, metric and optional embedding source.
    Vector(VectorIndexCfg),
    /// Full-text index; `LabelBuilder::index` uses the standard tokenizer with positions.
    FullText,
    /// Scalar index of the given kind.
    Scalar(ScalarType),
    /// Inverted index from a fully specified config, passed through unchanged.
    Inverted(uni_common::core::schema::InvertedIndexConfig),
}
/// Public configuration for a vector index.
pub struct VectorIndexCfg {
    /// Index-building algorithm and its tuning parameters.
    pub algorithm: VectorAlgo,
    /// Distance metric used by the index.
    pub metric: VectorMetric,
    /// Optional embedding configuration, forwarded as the index's `embedding_config`.
    pub embedding: Option<EmbeddingCfg>,
}
/// Public embedding configuration attached to a vector index.
///
/// Every field is forwarded unchanged to the storage-layer `EmbeddingConfig`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EmbeddingCfg {
    /// Embedding alias (presumably names the embedding model/provider — TODO confirm).
    pub alias: String,
    /// Properties whose values presumably feed the embedding — confirm with the consumer.
    pub source_properties: Vec<String>,
    /// Batch size forwarded as `EmbeddingConfig::batch_size`.
    pub batch_size: usize,
}
impl EmbeddingCfg {
    /// Converts into the storage-layer [`EmbeddingConfig`], moving each field across as-is.
    fn into_internal(self) -> EmbeddingConfig {
        let Self {
            alias,
            source_properties,
            batch_size,
        } = self;
        EmbeddingConfig {
            alias,
            source_properties,
            batch_size,
        }
    }
}
/// Vector-index algorithm choice with its tuning parameters.
///
/// Converted to the storage-layer `VectorIndexType` when an index is built. `Hnsw` maps to
/// the same internal variant as `HnswSq`, and `IvfPq` always uses 8 bits per sub-vector.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub enum VectorAlgo {
    /// Flat index (no partition parameters).
    Flat,
    /// IVF with `partitions` partitions.
    IvfFlat {
        partitions: u32,
    },
    /// IVF + PQ with `partitions` partitions and `sub_vectors` sub-vectors.
    IvfPq {
        partitions: u32,
        sub_vectors: u32,
    },
    /// IVF + SQ with `partitions` partitions.
    IvfSq {
        partitions: u32,
    },
    /// IVF + RQ with `partitions` partitions and an optional bit width.
    IvfRq {
        partitions: u32,
        num_bits: Option<u8>,
    },
    /// HNSW; converted to the same internal type as [`VectorAlgo::HnswSq`].
    Hnsw {
        m: u32,
        ef_construction: u32,
        partitions: Option<u32>,
    },
    /// HNSW, flat variant.
    HnswFlat {
        m: u32,
        ef_construction: u32,
        partitions: Option<u32>,
    },
    /// HNSW, SQ variant.
    HnswSq {
        m: u32,
        ef_construction: u32,
        partitions: Option<u32>,
    },
    /// HNSW, PQ variant with `sub_vectors` sub-vectors.
    HnswPq {
        m: u32,
        ef_construction: u32,
        sub_vectors: u32,
        partitions: Option<u32>,
    },
}
impl VectorAlgo {
    /// Translates the public algorithm choice into the storage-layer [`VectorIndexType`].
    ///
    /// Public `partitions` / `sub_vectors` become `num_partitions` / `num_sub_vectors`.
    /// `IvfPq` always gets `bits_per_subvector: 8`. Plain `Hnsw` yields the same
    /// `VectorIndexType::HnswSq` as `HnswSq`.
    fn into_internal(self) -> VectorIndexType {
        match self {
            Self::Flat => VectorIndexType::Flat,
            Self::IvfFlat {
                partitions: num_partitions,
            } => VectorIndexType::IvfFlat { num_partitions },
            Self::IvfPq {
                partitions: num_partitions,
                sub_vectors: num_sub_vectors,
            } => VectorIndexType::IvfPq {
                num_partitions,
                num_sub_vectors,
                bits_per_subvector: 8,
            },
            Self::IvfSq {
                partitions: num_partitions,
            } => VectorIndexType::IvfSq { num_partitions },
            Self::IvfRq {
                partitions: num_partitions,
                num_bits,
            } => VectorIndexType::IvfRq {
                num_partitions,
                num_bits,
            },
            Self::HnswFlat {
                m,
                ef_construction,
                partitions: num_partitions,
            } => VectorIndexType::HnswFlat {
                m,
                ef_construction,
                num_partitions,
            },
            // `Hnsw` is an alias for `HnswSq` at the storage layer.
            Self::Hnsw {
                m,
                ef_construction,
                partitions: num_partitions,
            }
            | Self::HnswSq {
                m,
                ef_construction,
                partitions: num_partitions,
            } => VectorIndexType::HnswSq {
                m,
                ef_construction,
                num_partitions,
            },
            Self::HnswPq {
                m,
                ef_construction,
                sub_vectors: num_sub_vectors,
                partitions: num_partitions,
            } => VectorIndexType::HnswPq {
                m,
                ef_construction,
                num_sub_vectors,
                num_partitions,
            },
        }
    }
}
/// Distance metric for a vector index; each variant maps to the `DistanceMetric` of the
/// same name.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum VectorMetric {
    /// Cosine distance.
    Cosine,
    /// Euclidean (L2) distance.
    L2,
    /// Dot-product distance.
    Dot,
}
impl VectorMetric {
    /// Maps the public metric onto the storage-layer [`DistanceMetric`] of the same name.
    fn into_internal(self) -> DistanceMetric {
        match self {
            Self::Cosine => DistanceMetric::Cosine,
            Self::L2 => DistanceMetric::L2,
            Self::Dot => DistanceMetric::Dot,
        }
    }
}
/// Scalar index kind; each variant maps to the `ScalarIndexType` of the same name.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ScalarType {
    /// B-tree index.
    BTree,
    /// Hash index.
    Hash,
    /// Bitmap index.
    Bitmap,
    /// Label-list index.
    LabelList,
}
impl ScalarType {
    /// Maps the public scalar kind onto the storage-layer [`ScalarIndexType`] of the same name.
    fn into_internal(self) -> ScalarIndexType {
        match self {
            Self::BTree => ScalarIndexType::BTree,
            Self::Hash => ScalarIndexType::Hash,
            Self::Bitmap => ScalarIndexType::Bitmap,
            Self::LabelList => ScalarIndexType::LabelList,
        }
    }
}
impl Uni {
    /// Starts a fluent schema edit against this database.
    pub fn schema(&self) -> SchemaBuilder<'_> {
        SchemaBuilder::new(self)
    }

    /// Reads a JSON-encoded schema from `path` and hands it to the manager's `replace_schema`.
    ///
    /// This method does not call `save` itself.
    ///
    /// # Errors
    ///
    /// Returns `UniError::Io` if the file cannot be read, or `UniError::Schema` if the
    /// contents do not deserialize into a schema.
    pub async fn load_schema(&self, path: impl AsRef<Path>) -> Result<()> {
        let raw = tokio::fs::read_to_string(path).await.map_err(UniError::Io)?;
        let parsed = serde_json::from_str::<uni_common::core::schema::Schema>(&raw).map_err(
            |err| UniError::Schema {
                message: err.to_string(),
            },
        )?;
        self.inner.schema.replace_schema(parsed);
        Ok(())
    }

    /// Writes the current schema to `path` as pretty-printed JSON.
    ///
    /// # Errors
    ///
    /// Returns `UniError::Schema` if serialization fails, or `UniError::Io` if the write fails.
    pub async fn save_schema(&self, path: impl AsRef<Path>) -> Result<()> {
        // The schema snapshot is a temporary, so it is released before the write is awaited.
        let json = serde_json::to_string_pretty(&self.inner.schema.schema()).map_err(|err| {
            UniError::Schema {
                message: err.to_string(),
            }
        })?;
        tokio::fs::write(path, json).await.map_err(UniError::Io)
    }
}