use crate::core::edge_type::{MAX_SCHEMA_TYPE_ID, is_schemaless_edge_type, make_schemaless_id};
use crate::sync::{acquire_read, acquire_write};
use anyhow::{Result, anyhow};
use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use object_store::local::LocalFileSystem;
use object_store::path::Path as ObjectStorePath;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, RwLock};
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
Active,
Hidden {
since: DateTime<Utc>,
last_active_snapshot: String, },
Tombstone {
since: DateTime<Utc>,
},
}
use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
pub fn datetime_struct_fields() -> Fields {
Fields::from(vec![
Field::new(
"nanos_since_epoch",
ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
true,
),
Field::new("offset_seconds", ArrowDataType::Int32, true),
Field::new("timezone_name", ArrowDataType::Utf8, true),
])
}
pub fn time_struct_fields() -> Fields {
Fields::from(vec![
Field::new(
"nanos_since_midnight",
ArrowDataType::Time64(TimeUnit::Nanosecond),
true,
),
Field::new("offset_seconds", ArrowDataType::Int32, true),
])
}
pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == datetime_struct_fields())
}
pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
matches!(arrow_dt, ArrowDataType::Struct(fields) if *fields == time_struct_fields())
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
GCounter,
GSet,
ORSet,
LWWRegister,
LWWMap,
Rga,
VectorClock,
VCRegister,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
Geographic, Cartesian2D, Cartesian3D, }
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
String,
Int32,
Int64,
Float32,
Float64,
Bool,
Timestamp,
Date,
Time,
DateTime,
Duration,
CypherValue,
Point(PointType),
Vector { dimensions: usize },
Btic,
Crdt(CrdtType),
List(Box<DataType>),
Map(Box<DataType>, Box<DataType>),
}
impl DataType {
#[allow(non_upper_case_globals)]
pub const Float: DataType = DataType::Float64;
#[allow(non_upper_case_globals)]
pub const Int: DataType = DataType::Int64;
pub fn to_arrow(&self) -> ArrowDataType {
match self {
DataType::String => ArrowDataType::Utf8,
DataType::Int32 => ArrowDataType::Int32,
DataType::Int64 => ArrowDataType::Int64,
DataType::Float32 => ArrowDataType::Float32,
DataType::Float64 => ArrowDataType::Float64,
DataType::Bool => ArrowDataType::Boolean,
DataType::Timestamp => {
ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
}
DataType::Date => ArrowDataType::Date32,
DataType::Time => ArrowDataType::Struct(time_struct_fields()),
DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
DataType::Duration => ArrowDataType::LargeBinary, DataType::CypherValue => ArrowDataType::LargeBinary, DataType::Point(pt) => match pt {
PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
Field::new("latitude", ArrowDataType::Float64, false),
Field::new("longitude", ArrowDataType::Float64, false),
Field::new("crs", ArrowDataType::Utf8, false),
])),
PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
Field::new("x", ArrowDataType::Float64, false),
Field::new("y", ArrowDataType::Float64, false),
Field::new("crs", ArrowDataType::Utf8, false),
])),
PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
Field::new("x", ArrowDataType::Float64, false),
Field::new("y", ArrowDataType::Float64, false),
Field::new("z", ArrowDataType::Float64, false),
Field::new("crs", ArrowDataType::Utf8, false),
])),
},
DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
Arc::new(Field::new("item", ArrowDataType::Float32, true)),
*dimensions as i32,
),
DataType::Btic => ArrowDataType::FixedSizeBinary(24),
DataType::Crdt(_) => ArrowDataType::Binary, DataType::List(inner) => {
ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
}
DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
"item",
ArrowDataType::Struct(Fields::from(vec![
Field::new("key", key.to_arrow(), false),
Field::new("value", value.to_arrow(), true),
])),
true,
))),
}
}
}
fn default_created_at() -> DateTime<Utc> {
Utc::now()
}
fn default_state() -> SchemaElementState {
SchemaElementState::Active
}
fn default_version_1() -> u32 {
1
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
pub r#type: DataType,
pub nullable: bool,
#[serde(default = "default_version_1")]
pub added_in: u32, #[serde(default = "default_state")]
pub state: SchemaElementState,
#[serde(default)]
pub generation_expression: Option<String>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
pub id: u16, #[serde(default = "default_created_at")]
pub created_at: DateTime<Utc>,
#[serde(default = "default_state")]
pub state: SchemaElementState,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
pub id: u32,
pub src_labels: Vec<String>,
pub dst_labels: Vec<String>,
#[serde(default = "default_state")]
pub state: SchemaElementState,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
Unique { properties: Vec<String> },
Exists { property: String },
Check { expression: String },
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
Label(String),
EdgeType(String),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
pub name: String,
pub constraint_type: ConstraintType,
pub target: ConstraintTarget,
pub enabled: bool,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
name_to_id: HashMap<String, u32>,
id_to_name: HashMap<u32, String>,
next_local_id: u32,
}
impl SchemalessEdgeTypeRegistry {
pub fn new() -> Self {
Self {
name_to_id: HashMap::new(),
id_to_name: HashMap::new(),
next_local_id: 1,
}
}
pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
if let Some(&id) = self.name_to_id.get(type_name) {
return id;
}
let id = make_schemaless_id(self.next_local_id);
self.next_local_id += 1;
self.name_to_id.insert(type_name.to_string(), id);
self.id_to_name.insert(id, type_name.to_string());
id
}
pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
self.id_to_name.get(&type_id).map(String::as_str)
}
pub fn contains(&self, type_name: &str) -> bool {
self.name_to_id.contains_key(type_name)
}
pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
self.name_to_id
.iter()
.find(|(k, _)| k.eq_ignore_ascii_case(type_name))
.map(|(_, &id)| id)
}
pub fn all_type_ids(&self) -> Vec<u32> {
self.id_to_name.keys().copied().collect()
}
pub fn is_empty(&self) -> bool {
self.name_to_id.is_empty()
}
}
impl Default for SchemalessEdgeTypeRegistry {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
pub schema_version: u32,
pub labels: HashMap<String, LabelMeta>,
pub edge_types: HashMap<String, EdgeTypeMeta>,
pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
#[serde(default)]
pub indexes: Vec<IndexDefinition>,
#[serde(default)]
pub constraints: Vec<Constraint>,
#[serde(default)]
pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
impl Default for Schema {
fn default() -> Self {
Self {
schema_version: 1,
labels: HashMap::new(),
edge_types: HashMap::new(),
properties: HashMap::new(),
indexes: Vec::new(),
constraints: Vec::new(),
schemaless_registry: SchemalessEdgeTypeRegistry::new(),
}
}
}
impl Schema {
pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
self.labels
.iter()
.find(|(_, meta)| meta.id == label_id)
.map(|(name, _)| name.as_str())
}
pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
self.labels.get(label_name).map(|meta| meta.id)
}
pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
self.edge_types
.iter()
.find(|(_, meta)| meta.id == type_id)
.map(|(name, _)| name.as_str())
}
pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
self.edge_types.get(type_name).map(|meta| meta.id)
}
pub fn vector_index_for_property(
&self,
label: &str,
property: &str,
) -> Option<&VectorIndexConfig> {
self.indexes.iter().find_map(|idx| {
if let IndexDefinition::Vector(config) = idx
&& config.label == label
&& config.property == property
&& config.metadata.status == IndexStatus::Online
{
return Some(config);
}
None
})
}
pub fn fulltext_index_for_property(
&self,
label: &str,
property: &str,
) -> Option<&FullTextIndexConfig> {
self.indexes.iter().find_map(|idx| {
if let IndexDefinition::FullText(config) = idx
&& config.label == label
&& config.properties.iter().any(|p| p == property)
&& config.metadata.status == IndexStatus::Online
{
return Some(config);
}
None
})
}
pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
self.labels
.iter()
.find(|(k, _)| k.eq_ignore_ascii_case(name))
.map(|(_, v)| v)
}
pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
self.get_label_case_insensitive(label_name)
.map(|meta| meta.id)
}
pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
self.edge_types
.iter()
.find(|(k, _)| k.eq_ignore_ascii_case(name))
.map(|(_, v)| v)
}
pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
self.get_edge_type_case_insensitive(type_name)
.map(|meta| meta.id)
}
pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
self.edge_type_id_by_name_case_insensitive(type_name)
.or_else(|| {
self.schemaless_registry
.id_by_name_case_insensitive(type_name)
})
}
pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
if let Some(id) = self.edge_type_id_by_name(type_name) {
return id;
}
self.schemaless_registry.get_or_assign_id(type_name)
}
pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
if is_schemaless_edge_type(type_id) {
self.schemaless_registry
.type_name_by_id(type_id)
.map(str::to_owned)
} else {
self.edge_type_name_by_id(type_id).map(str::to_owned)
}
}
pub fn all_edge_type_ids(&self) -> Vec<u32> {
let mut ids: Vec<u32> = self.edge_types.values().map(|m| m.id).collect();
ids.extend(self.schemaless_registry.all_type_ids());
ids.sort_unstable();
ids
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum IndexStatus {
#[default]
Online,
Building,
Stale,
Failed,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct IndexMetadata {
#[serde(default)]
pub status: IndexStatus,
#[serde(default)]
pub last_built_at: Option<DateTime<Utc>>,
#[serde(default)]
pub row_count_at_build: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
Vector(VectorIndexConfig),
FullText(FullTextIndexConfig),
Scalar(ScalarIndexConfig),
Inverted(InvertedIndexConfig),
JsonFullText(JsonFtsIndexConfig),
}
impl IndexDefinition {
pub fn name(&self) -> &str {
match self {
IndexDefinition::Vector(c) => &c.name,
IndexDefinition::FullText(c) => &c.name,
IndexDefinition::Scalar(c) => &c.name,
IndexDefinition::Inverted(c) => &c.name,
IndexDefinition::JsonFullText(c) => &c.name,
}
}
pub fn label(&self) -> &str {
match self {
IndexDefinition::Vector(c) => &c.label,
IndexDefinition::FullText(c) => &c.label,
IndexDefinition::Scalar(c) => &c.label,
IndexDefinition::Inverted(c) => &c.label,
IndexDefinition::JsonFullText(c) => &c.label,
}
}
pub fn metadata(&self) -> &IndexMetadata {
match self {
IndexDefinition::Vector(c) => &c.metadata,
IndexDefinition::FullText(c) => &c.metadata,
IndexDefinition::Scalar(c) => &c.metadata,
IndexDefinition::Inverted(c) => &c.metadata,
IndexDefinition::JsonFullText(c) => &c.metadata,
}
}
pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
match self {
IndexDefinition::Vector(c) => &mut c.metadata,
IndexDefinition::FullText(c) => &mut c.metadata,
IndexDefinition::Scalar(c) => &mut c.metadata,
IndexDefinition::Inverted(c) => &mut c.metadata,
IndexDefinition::JsonFullText(c) => &mut c.metadata,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
pub name: String,
pub label: String,
pub property: String,
#[serde(default = "default_normalize")]
pub normalize: bool,
#[serde(default = "default_max_terms_per_doc")]
pub max_terms_per_doc: usize,
#[serde(default)]
pub metadata: IndexMetadata,
}
fn default_normalize() -> bool {
true
}
fn default_max_terms_per_doc() -> usize {
10_000
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorIndexConfig {
pub name: String,
pub label: String,
pub property: String,
pub index_type: VectorIndexType,
pub metric: DistanceMetric,
pub embedding_config: Option<EmbeddingConfig>,
#[serde(default)]
pub metadata: IndexMetadata,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
pub alias: String,
pub source_properties: Vec<String>,
pub batch_size: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
Flat,
IvfFlat {
num_partitions: u32,
},
IvfPq {
num_partitions: u32,
num_sub_vectors: u32,
bits_per_subvector: u8,
},
IvfSq {
num_partitions: u32,
},
IvfRq {
num_partitions: u32,
#[serde(default)]
num_bits: Option<u8>,
},
HnswFlat {
m: u32,
ef_construction: u32,
#[serde(default)]
num_partitions: Option<u32>,
},
HnswSq {
m: u32,
ef_construction: u32,
#[serde(default)]
num_partitions: Option<u32>,
},
HnswPq {
m: u32,
ef_construction: u32,
num_sub_vectors: u32,
#[serde(default)]
num_partitions: Option<u32>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
Cosine,
L2,
Dot,
}
impl DistanceMetric {
pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
assert_eq!(a.len(), b.len(), "vector dimension mismatch");
match self {
DistanceMetric::L2 => a.iter().zip(b).map(|(x, y)| (x - y).powi(2)).sum(),
DistanceMetric::Cosine => {
let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
let norm_a: f32 = a.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
let norm_b: f32 = b.iter().map(|x| x.powi(2)).sum::<f32>().sqrt();
let denom = norm_a * norm_b;
if denom == 0.0 { 1.0 } else { 1.0 - dot / denom }
}
DistanceMetric::Dot => {
let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
-dot
}
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
pub name: String,
pub label: String,
pub properties: Vec<String>,
pub tokenizer: TokenizerConfig,
pub with_positions: bool,
#[serde(default)]
pub metadata: IndexMetadata,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
Standard,
Whitespace,
Ngram { min: u8, max: u8 },
Custom { name: String },
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
pub name: String,
pub label: String,
pub column: String,
#[serde(default)]
pub paths: Vec<String>,
#[serde(default)]
pub with_positions: bool,
#[serde(default)]
pub metadata: IndexMetadata,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
pub name: String,
pub label: String,
pub properties: Vec<String>,
pub index_type: ScalarIndexType,
pub where_clause: Option<String>,
#[serde(default)]
pub metadata: IndexMetadata,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
BTree,
Hash,
Bitmap,
LabelList,
}
pub struct SchemaManager {
store: Arc<dyn ObjectStore>,
path: ObjectStorePath,
schema: RwLock<Arc<Schema>>,
}
impl SchemaManager {
pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref();
let parent = path
.parent()
.ok_or_else(|| anyhow!("Invalid schema path"))?;
let filename = path
.file_name()
.ok_or_else(|| anyhow!("Invalid schema filename"))?
.to_str()
.ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
let obj_path = ObjectStorePath::from(filename);
Self::load_from_store(store, &obj_path).await
}
pub async fn load_from_store(
store: Arc<dyn ObjectStore>,
path: &ObjectStorePath,
) -> Result<Self> {
match store.get(path).await {
Ok(result) => {
let bytes = result.bytes().await?;
let content = String::from_utf8(bytes.to_vec())?;
let schema: Schema = serde_json::from_str(&content)?;
Ok(Self {
store,
path: path.clone(),
schema: RwLock::new(Arc::new(schema)),
})
}
Err(object_store::Error::NotFound { .. }) => Ok(Self {
store,
path: path.clone(),
schema: RwLock::new(Arc::new(Schema::default())),
}),
Err(e) => Err(anyhow::Error::from(e)),
}
}
pub async fn save(&self) -> Result<()> {
let content = {
let schema_guard = acquire_read(&self.schema, "schema")?;
serde_json::to_string_pretty(&**schema_guard)?
};
self.store
.put(&self.path, content.into())
.await
.map_err(anyhow::Error::from)?;
Ok(())
}
pub fn path(&self) -> &ObjectStorePath {
&self.path
}
pub fn schema(&self) -> Arc<Schema> {
self.schema
.read()
.expect("Schema lock poisoned - a thread panicked while holding it")
.clone()
}
fn normalize_function_names(expr: &str) -> String {
let mut result = String::with_capacity(expr.len());
let mut chars = expr.chars().peekable();
while let Some(ch) = chars.next() {
if ch.is_alphabetic() {
let mut ident = String::new();
ident.push(ch);
while let Some(&next) = chars.peek() {
if next.is_alphanumeric() || next == '_' {
ident.push(chars.next().unwrap());
} else {
break;
}
}
if chars.peek() == Some(&'(') {
result.push_str(&ident.to_uppercase());
} else {
result.push_str(&ident); }
} else {
result.push(ch);
}
}
result
}
pub fn generated_column_name(expr: &str) -> String {
let normalized = Self::normalize_function_names(expr);
let sanitized = normalized
.replace(|c: char| !c.is_alphanumeric(), "_")
.trim_matches('_')
.to_string();
const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
const FNV_PRIME: u64 = 1099511628211;
let mut hash = FNV_OFFSET_BASIS;
for byte in normalized.as_bytes() {
hash ^= *byte as u64;
hash = hash.wrapping_mul(FNV_PRIME);
}
format!("_gen_{}_{:x}", sanitized, hash)
}
pub fn replace_schema(&self, new_schema: Schema) {
let mut schema = self
.schema
.write()
.expect("Schema lock poisoned - a thread panicked while holding it");
*schema = Arc::new(new_schema);
}
pub fn next_label_id(&self) -> u16 {
self.schema()
.labels
.values()
.map(|l| l.id)
.max()
.unwrap_or(0)
+ 1
}
pub fn next_type_id(&self) -> u32 {
let max_schema_id = self
.schema()
.edge_types
.values()
.map(|t| t.id)
.max()
.unwrap_or(0);
if max_schema_id >= MAX_SCHEMA_TYPE_ID {
panic!("Schema edge type ID exhaustion");
}
max_schema_id + 1
}
pub fn add_label(&self, name: &str) -> Result<u16> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
if schema.labels.contains_key(name) {
return Err(anyhow!("Label '{}' already exists", name));
}
let id = schema.labels.values().map(|l| l.id).max().unwrap_or(0) + 1;
schema.labels.insert(
name.to_string(),
LabelMeta {
id,
created_at: Utc::now(),
state: SchemaElementState::Active,
},
);
Ok(id)
}
pub fn add_edge_type(
&self,
name: &str,
src_labels: Vec<String>,
dst_labels: Vec<String>,
) -> Result<u32> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
if schema.edge_types.contains_key(name) {
return Err(anyhow!("Edge type '{}' already exists", name));
}
let id = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0) + 1;
if id >= MAX_SCHEMA_TYPE_ID {
return Err(anyhow!("Schema edge type ID exhaustion"));
}
schema.edge_types.insert(
name.to_string(),
EdgeTypeMeta {
id,
src_labels,
dst_labels,
state: SchemaElementState::Active,
},
);
Ok(id)
}
pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
let mut guard = acquire_write(&self.schema, "schema")
.expect("Schema lock poisoned - a thread panicked while holding it");
let schema = Arc::make_mut(&mut *guard);
schema.get_or_assign_edge_type_id(type_name)
}
pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
let schema = acquire_read(&self.schema, "schema")
.expect("Schema lock poisoned - a thread panicked while holding it");
schema.edge_type_name_by_id_unified(type_id)
}
pub fn add_property(
&self,
label_or_type: &str,
prop_name: &str,
data_type: DataType,
nullable: bool,
) -> Result<()> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
let version = schema.schema_version;
let props = schema
.properties
.entry(label_or_type.to_string())
.or_default();
if props.contains_key(prop_name) {
return Err(anyhow!(
"Property '{}' already exists for '{}'",
prop_name,
label_or_type
));
}
props.insert(
prop_name.to_string(),
PropertyMeta {
r#type: data_type,
nullable,
added_in: version,
state: SchemaElementState::Active,
generation_expression: None,
},
);
Ok(())
}
pub fn add_generated_property(
&self,
label_or_type: &str,
prop_name: &str,
data_type: DataType,
expr: String,
) -> Result<()> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
let version = schema.schema_version;
let props = schema
.properties
.entry(label_or_type.to_string())
.or_default();
if props.contains_key(prop_name) {
return Err(anyhow!("Property '{}' already exists", prop_name));
}
props.insert(
prop_name.to_string(),
PropertyMeta {
r#type: data_type,
nullable: true,
added_in: version,
state: SchemaElementState::Active,
generation_expression: Some(expr),
},
);
Ok(())
}
pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
schema.indexes.push(index_def);
Ok(())
}
pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
let schema = self.schema.read().expect("Schema lock poisoned");
schema.indexes.iter().find(|i| i.name() == name).cloned()
}
pub fn update_index_metadata(
&self,
index_name: &str,
f: impl FnOnce(&mut IndexMetadata),
) -> Result<()> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
let idx = schema
.indexes
.iter_mut()
.find(|i| i.name() == index_name)
.ok_or_else(|| anyhow!("Index '{}' not found", index_name))?;
f(idx.metadata_mut());
Ok(())
}
pub fn remove_index(&self, name: &str) -> Result<()> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
if let Some(pos) = schema.indexes.iter().position(|i| i.name() == name) {
schema.indexes.remove(pos);
Ok(())
} else {
Err(anyhow!("Index '{}' not found", name))
}
}
pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
if schema.constraints.iter().any(|c| c.name == constraint.name) {
return Err(anyhow!("Constraint '{}' already exists", constraint.name));
}
schema.constraints.push(constraint);
Ok(())
}
pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
if let Some(pos) = schema.constraints.iter().position(|c| c.name == name) {
schema.constraints.remove(pos);
Ok(())
} else if if_exists {
Ok(())
} else {
Err(anyhow!("Constraint '{}' not found", name))
}
}
pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
if let Some(props) = schema.properties.get_mut(label_or_type) {
if props.remove(prop_name).is_some() {
Ok(())
} else {
Err(anyhow!(
"Property '{}' not found for '{}'",
prop_name,
label_or_type
))
}
} else {
Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
}
}
pub fn rename_property(
&self,
label_or_type: &str,
old_name: &str,
new_name: &str,
) -> Result<()> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
if let Some(props) = schema.properties.get_mut(label_or_type) {
if let Some(meta) = props.remove(old_name) {
if props.contains_key(new_name) {
props.insert(old_name.to_string(), meta); return Err(anyhow!("Property '{}' already exists", new_name));
}
props.insert(new_name.to_string(), meta);
Ok(())
} else {
Err(anyhow!(
"Property '{}' not found for '{}'",
old_name,
label_or_type
))
}
} else {
Err(anyhow!("Label or Edge Type '{}' not found", label_or_type))
}
}
pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
if let Some(label_meta) = schema.labels.get_mut(name) {
label_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
Ok(())
} else if if_exists {
Ok(())
} else {
Err(anyhow!("Label '{}' not found", name))
}
}
pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
if let Some(edge_meta) = schema.edge_types.get_mut(name) {
edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
Ok(())
} else if if_exists {
Ok(())
} else {
Err(anyhow!("Edge Type '{}' not found", name))
}
}
}
pub fn validate_identifier(name: &str) -> Result<()> {
if name.is_empty() || name.len() > 64 {
return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
}
let first = name.chars().next().unwrap();
if !first.is_alphabetic() && first != '_' {
return Err(anyhow!(
"Identifier '{}' must start with letter or underscore",
name
));
}
if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
return Err(anyhow!(
"Identifier '{}' must contain only alphanumeric and underscore",
name
));
}
const RESERVED: &[&str] = &[
"MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
"UNION", "ORDER", "LIMIT",
];
if RESERVED.contains(&name.to_uppercase().as_str()) {
return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use object_store::local::LocalFileSystem;
use tempfile::tempdir;
#[tokio::test]
async fn test_schema_management() -> Result<()> {
let dir = tempdir()?;
let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
let path = ObjectStorePath::from("schema.json");
let manager = SchemaManager::load_from_store(store.clone(), &path).await?;
let lid = manager.add_label("Person")?;
assert_eq!(lid, 1);
assert!(manager.add_label("Person").is_err());
manager.add_property("Person", "name", DataType::String, false)?;
assert!(
manager
.add_property("Person", "name", DataType::String, false)
.is_err()
);
let tid = manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
assert_eq!(tid, 1);
manager.save().await?;
assert!(store.get(&path).await.is_ok());
let manager2 = SchemaManager::load_from_store(store, &path).await?;
assert!(manager2.schema().labels.contains_key("Person"));
assert!(
manager2
.schema()
.properties
.get("Person")
.unwrap()
.contains_key("name")
);
Ok(())
}
#[test]
fn test_normalize_function_names() {
assert_eq!(
SchemaManager::normalize_function_names("lower(email)"),
"LOWER(email)"
);
assert_eq!(
SchemaManager::normalize_function_names("LOWER(email)"),
"LOWER(email)"
);
assert_eq!(
SchemaManager::normalize_function_names("Lower(email)"),
"LOWER(email)"
);
assert_eq!(
SchemaManager::normalize_function_names("trim(lower(email))"),
"TRIM(LOWER(email))"
);
}
#[test]
fn test_generated_column_name_case_insensitive() {
let col1 = SchemaManager::generated_column_name("lower(email)");
let col2 = SchemaManager::generated_column_name("LOWER(email)");
let col3 = SchemaManager::generated_column_name("Lower(email)");
assert_eq!(col1, col2);
assert_eq!(col2, col3);
assert!(col1.starts_with("_gen_LOWER_email_"));
}
#[test]
fn test_index_metadata_serde_backward_compat() {
let json = r#"{
"type": "Scalar",
"name": "idx_person_name",
"label": "Person",
"properties": ["name"],
"index_type": "BTree",
"where_clause": null
}"#;
let def: IndexDefinition = serde_json::from_str(json).unwrap();
let meta = def.metadata();
assert_eq!(meta.status, IndexStatus::Online);
assert!(meta.last_built_at.is_none());
assert!(meta.row_count_at_build.is_none());
}
#[test]
fn test_index_metadata_serde_roundtrip() {
let now = Utc::now();
let def = IndexDefinition::Scalar(ScalarIndexConfig {
name: "idx_test".to_string(),
label: "Test".to_string(),
properties: vec!["prop".to_string()],
index_type: ScalarIndexType::BTree,
where_clause: None,
metadata: IndexMetadata {
status: IndexStatus::Building,
last_built_at: Some(now),
row_count_at_build: Some(42),
},
});
let json = serde_json::to_string(&def).unwrap();
let parsed: IndexDefinition = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.metadata().status, IndexStatus::Building);
assert_eq!(parsed.metadata().row_count_at_build, Some(42));
assert!(parsed.metadata().last_built_at.is_some());
}
#[tokio::test]
async fn test_update_index_metadata() -> Result<()> {
let dir = tempdir()?;
let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
let path = ObjectStorePath::from("schema.json");
let manager = SchemaManager::load_from_store(store, &path).await?;
manager.add_label("Person")?;
let idx = IndexDefinition::Scalar(ScalarIndexConfig {
name: "idx_test".to_string(),
label: "Person".to_string(),
properties: vec!["name".to_string()],
index_type: ScalarIndexType::BTree,
where_clause: None,
metadata: Default::default(),
});
manager.add_index(idx)?;
let initial = manager.get_index("idx_test").unwrap();
assert_eq!(initial.metadata().status, IndexStatus::Online);
manager.update_index_metadata("idx_test", |m| {
m.status = IndexStatus::Building;
m.row_count_at_build = Some(100);
})?;
let updated = manager.get_index("idx_test").unwrap();
assert_eq!(updated.metadata().status, IndexStatus::Building);
assert_eq!(updated.metadata().row_count_at_build, Some(100));
assert!(manager.update_index_metadata("nope", |_| {}).is_err());
Ok(())
}
#[test]
fn test_vector_index_for_property_skips_non_online() {
let mut schema = Schema::default();
schema.labels.insert(
"Document".to_string(),
LabelMeta {
id: 1,
created_at: chrono::Utc::now(),
state: SchemaElementState::Active,
},
);
schema
.indexes
.push(IndexDefinition::Vector(VectorIndexConfig {
name: "vec_doc_embedding".to_string(),
label: "Document".to_string(),
property: "embedding".to_string(),
index_type: VectorIndexType::Flat,
metric: DistanceMetric::Cosine,
embedding_config: None,
metadata: IndexMetadata {
status: IndexStatus::Stale,
..Default::default()
},
}));
assert!(
schema
.vector_index_for_property("Document", "embedding")
.is_none()
);
if let IndexDefinition::Vector(cfg) = &mut schema.indexes[0] {
cfg.metadata.status = IndexStatus::Online;
}
let result = schema.vector_index_for_property("Document", "embedding");
assert!(result.is_some());
assert_eq!(result.unwrap().metric, DistanceMetric::Cosine);
}
}