use crate::core::edge_type::{
MAX_SCHEMA_TYPE_ID, VIRTUAL_EDGE_TYPE_ID_SENTINEL, VIRTUAL_EDGE_TYPE_ID_START,
is_schemaless_edge_type, make_schemaless_id,
};
use crate::sync::{acquire_read, acquire_write};
use anyhow::{Result, anyhow};
use chrono::{DateTime, Utc};
use object_store::ObjectStore;
use object_store::ObjectStoreExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path as ObjectStorePath;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
use std::sync::{Arc, RwLock};
/// Lifecycle state recorded on schema elements (`LabelMeta`, `EdgeTypeMeta`
/// and `PropertyMeta` each carry a `state` field of this type).
///
/// Serialized with serde; `#[non_exhaustive]`, so more states may be added.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum SchemaElementState {
/// Normal state; also the value produced by `default_state()` when the
/// field is missing from serialized metadata.
Active,
/// Hidden state, carrying the time it started and a snapshot identifier.
// NOTE(review): the exact meaning of `last_active_snapshot` is not visible here — confirm with the snapshot code.
Hidden {
since: DateTime<Utc>,
last_active_snapshot: String, },
/// State set by `SchemaManager::drop_label` / `drop_edge_type`, which mark
/// the entry instead of removing it from the map.
Tombstone {
since: DateTime<Utc>,
},
}
use arrow_schema::{DataType as ArrowDataType, Field, Fields, TimeUnit};
/// Arrow struct layout used for `DataType::DateTime` columns.
///
/// Three nullable children, in this order: `nanos_since_epoch` (nanosecond
/// timestamp without a timezone), `offset_seconds` (`Int32`) and
/// `timezone_name` (`Utf8`).
pub fn datetime_struct_fields() -> Fields {
    let instant = Field::new(
        "nanos_since_epoch",
        ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
        true,
    );
    let offset = Field::new("offset_seconds", ArrowDataType::Int32, true);
    let zone = Field::new("timezone_name", ArrowDataType::Utf8, true);
    Fields::from(vec![instant, offset, zone])
}
/// Arrow struct layout used for `DataType::Time` columns.
///
/// Two nullable children, in this order: `nanos_since_midnight`
/// (`Time64` in nanoseconds) and `offset_seconds` (`Int32`).
pub fn time_struct_fields() -> Fields {
    let time_of_day = Field::new(
        "nanos_since_midnight",
        ArrowDataType::Time64(TimeUnit::Nanosecond),
        true,
    );
    let offset = Field::new("offset_seconds", ArrowDataType::Int32, true);
    Fields::from(vec![time_of_day, offset])
}
/// Returns `true` when `arrow_dt` is a struct whose fields equal
/// [`datetime_struct_fields`] exactly.
pub fn is_datetime_struct(arrow_dt: &ArrowDataType) -> bool {
    match arrow_dt {
        ArrowDataType::Struct(children) => *children == datetime_struct_fields(),
        _ => false,
    }
}
/// Returns `true` when `arrow_dt` is a struct whose fields equal
/// [`time_struct_fields`] exactly.
pub fn is_time_struct(arrow_dt: &ArrowDataType) -> bool {
    match arrow_dt {
        ArrowDataType::Struct(children) => *children == time_struct_fields(),
        _ => false,
    }
}
/// Kind of CRDT carried by a `DataType::Crdt` property.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum CrdtType {
    GCounter,
    GSet,
    ORSet,
    LWWRegister,
    LWWMap,
    Rga,
    VectorClock,
    VCRegister,
}

impl CrdtType {
    /// Returns the variant's name as a static string; each string is
    /// spelled exactly like the Rust variant identifier.
    #[must_use]
    pub fn type_name(&self) -> &'static str {
        match *self {
            Self::GCounter => "GCounter",
            Self::GSet => "GSet",
            Self::ORSet => "ORSet",
            Self::LWWRegister => "LWWRegister",
            Self::LWWMap => "LWWMap",
            Self::Rga => "Rga",
            Self::VectorClock => "VectorClock",
            Self::VCRegister => "VCRegister",
        }
    }
}
/// Coordinate layout of a `DataType::Point`.
///
/// `DataType::to_arrow` maps `Geographic` to a struct of
/// `latitude`/`longitude`/`crs`, `Cartesian2D` to `x`/`y`/`crs` and
/// `Cartesian3D` to `x`/`y`/`z`/`crs`.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub enum PointType {
Geographic, Cartesian2D, Cartesian3D, }
/// Logical type of a schema property.
///
/// `to_arrow` gives the Arrow representation of each variant and `accepts`
/// decides which runtime values a variant admits. `#[non_exhaustive]`, so
/// more types may be added.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DataType {
String,
Int32,
Int64,
Float32,
Float64,
Bool,
Timestamp,
Date,
Time,
DateTime,
Duration,
CypherValue,
Bytes,
/// Point in the coordinate layout given by the [`PointType`].
Point(PointType),
/// Vector with a fixed number of `dimensions` (stored as a fixed-size list of `Float32`).
Vector { dimensions: usize },
Btic,
/// CRDT value of the given kind.
Crdt(CrdtType),
/// Homogeneous list of the boxed element type.
List(Box<DataType>),
/// Map from the first boxed type (key) to the second (value).
Map(Box<DataType>, Box<DataType>),
}
impl DataType {
/// Alias for [`DataType::Float64`].
// The lint is silenced because the constant is deliberately spelled like a variant.
#[allow(non_upper_case_globals)]
pub const Float: DataType = DataType::Float64;
/// Alias for [`DataType::Int64`].
// The lint is silenced because the constant is deliberately spelled like a variant.
#[allow(non_upper_case_globals)]
pub const Int: DataType = DataType::Int64;
pub fn to_arrow(&self) -> ArrowDataType {
match self {
DataType::String => ArrowDataType::Utf8,
DataType::Int32 => ArrowDataType::Int32,
DataType::Int64 => ArrowDataType::Int64,
DataType::Float32 => ArrowDataType::Float32,
DataType::Float64 => ArrowDataType::Float64,
DataType::Bool => ArrowDataType::Boolean,
DataType::Timestamp => {
ArrowDataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into()))
}
DataType::Date => ArrowDataType::Date32,
DataType::Time => ArrowDataType::Struct(time_struct_fields()),
DataType::DateTime => ArrowDataType::Struct(datetime_struct_fields()),
DataType::Duration => ArrowDataType::LargeBinary, DataType::CypherValue => ArrowDataType::LargeBinary, DataType::Bytes => ArrowDataType::LargeBinary, DataType::Point(pt) => match pt {
PointType::Geographic => ArrowDataType::Struct(Fields::from(vec![
Field::new("latitude", ArrowDataType::Float64, false),
Field::new("longitude", ArrowDataType::Float64, false),
Field::new("crs", ArrowDataType::Utf8, false),
])),
PointType::Cartesian2D => ArrowDataType::Struct(Fields::from(vec![
Field::new("x", ArrowDataType::Float64, false),
Field::new("y", ArrowDataType::Float64, false),
Field::new("crs", ArrowDataType::Utf8, false),
])),
PointType::Cartesian3D => ArrowDataType::Struct(Fields::from(vec![
Field::new("x", ArrowDataType::Float64, false),
Field::new("y", ArrowDataType::Float64, false),
Field::new("z", ArrowDataType::Float64, false),
Field::new("crs", ArrowDataType::Utf8, false),
])),
},
DataType::Vector { dimensions } => ArrowDataType::FixedSizeList(
Arc::new(Field::new("item", ArrowDataType::Float32, true)),
*dimensions as i32,
),
DataType::Btic => ArrowDataType::FixedSizeBinary(24),
DataType::Crdt(_) => ArrowDataType::Binary, DataType::List(inner) => {
ArrowDataType::List(Arc::new(Field::new("item", inner.to_arrow(), true)))
}
DataType::Map(key, value) => ArrowDataType::List(Arc::new(Field::new(
"item",
ArrowDataType::Struct(Fields::from(vec![
Field::new("key", key.to_arrow(), false),
Field::new("value", value.to_arrow(), true),
])),
true,
))),
}
}
/// Reports whether `value` is admissible for a property of this type.
///
/// `Null` is accepted by every type. `CypherValue`, `Crdt` and `Point`
/// accept any value. The check looks only at the outer variant of
/// `value`: integer ranges, list elements and vector lengths are not
/// inspected.
pub fn accepts(&self, value: &crate::value::Value) -> bool {
    use crate::value::{TemporalValue, Value};

    if matches!(value, Value::Null) {
        return true;
    }

    // Shape predicates shared by several arms below.
    let is_string = matches!(value, Value::String(_));
    let is_int = matches!(value, Value::Int(_));
    let is_list = matches!(value, Value::List(_));
    let is_datetime = matches!(
        value,
        Value::Temporal(TemporalValue::DateTime { .. } | TemporalValue::LocalDateTime { .. })
    );

    match self {
        DataType::CypherValue | DataType::Crdt(_) | DataType::Point(_) => true,
        DataType::String => is_string,
        DataType::Int32 | DataType::Int64 => is_int,
        // Integers are accepted for float columns as well.
        DataType::Float32 | DataType::Float64 => is_int || matches!(value, Value::Float(_)),
        DataType::Bool => matches!(value, Value::Bool(_)),
        DataType::Timestamp => is_string || is_int || is_datetime,
        DataType::DateTime => is_datetime,
        DataType::Date => {
            is_int || matches!(value, Value::Temporal(TemporalValue::Date { .. }))
        }
        DataType::Time => {
            is_int
                || matches!(
                    value,
                    Value::Temporal(
                        TemporalValue::Time { .. } | TemporalValue::LocalTime { .. }
                    )
                )
        }
        DataType::Duration => {
            matches!(value, Value::Temporal(TemporalValue::Duration { .. }))
        }
        DataType::Bytes => matches!(value, Value::Bytes(_)),
        DataType::Btic => {
            is_string
                || is_list
                || matches!(value, Value::Temporal(TemporalValue::Btic { .. }))
        }
        DataType::Vector { .. } => is_list || matches!(value, Value::Vector(_)),
        DataType::List(_) => is_list,
        DataType::Map(_, _) => matches!(value, Value::Map(_)),
    }
}
}
/// Serde default for `LabelMeta::created_at`: the current UTC time at the
/// moment of deserialization.
fn default_created_at() -> DateTime<Utc> {
Utc::now()
}
/// Serde default for the `state` fields: `SchemaElementState::Active`.
fn default_state() -> SchemaElementState {
SchemaElementState::Active
}
/// Serde default for `PropertyMeta::added_in`: version `1`.
fn default_version_1() -> u32 {
1
}
/// Metadata for one property of a label or edge type.
///
/// * `r#type` / `nullable` — declared type and nullability.
/// * `added_in` — schema version at the time the property was added
///   (defaults to `1` when missing from serialized data).
/// * `state` — lifecycle state (defaults to `Active`).
/// * `generation_expression` — set by `SchemaManager::add_generated_property`;
///   `None` for properties added through `add_property*`.
/// * `description` — optional free text; omitted from output when `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PropertyMeta {
pub r#type: DataType,
pub nullable: bool,
#[serde(default = "default_version_1")]
pub added_in: u32, #[serde(default = "default_state")]
pub state: SchemaElementState,
#[serde(default)]
pub generation_expression: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
}
/// Metadata for a vertex label.
///
/// * `id` — numeric label id; `SchemaManager::add_label_with_desc` assigns
///   `max existing id + 1`.
/// * `created_at` — creation time; when missing from serialized data it
///   defaults to the time of deserialization.
/// * `state` — lifecycle state (defaults to `Active`).
/// * `description` — optional free text; omitted from output when `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LabelMeta {
pub id: u16, #[serde(default = "default_created_at")]
pub created_at: DateTime<Utc>,
#[serde(default = "default_state")]
pub state: SchemaElementState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
}
/// Metadata for a schema-declared edge type.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct EdgeTypeMeta {
/// Numeric edge type id; `SchemaManager::add_edge_type_with_desc` assigns `max existing id + 1`.
pub id: u32,
/// Label names recorded for the source side of the edge.
pub src_labels: Vec<String>,
/// Label names recorded for the destination side of the edge.
pub dst_labels: Vec<String>,
/// Lifecycle state (defaults to `Active` when missing from serialized data).
#[serde(default = "default_state")]
pub state: SchemaElementState,
/// Optional free text; omitted from output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
}
/// Kind of constraint together with the properties or expression it covers.
// NOTE(review): enforcement semantics live outside this file — confirm before relying on the variant names alone.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintType {
/// Names one or more properties.
Unique { properties: Vec<String> },
/// Names a single property.
Exists { property: String },
/// Carries an expression stored as text.
Check { expression: String },
}
/// Schema element a constraint is attached to, identified by name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ConstraintTarget {
/// A vertex label name.
Label(String),
/// An edge type name.
EdgeType(String),
}
/// A named constraint stored in `Schema::constraints`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Constraint {
/// Constraint name; `SchemaManager::add_constraint` rejects duplicates of it.
pub name: String,
/// What the constraint checks.
pub constraint_type: ConstraintType,
/// Label or edge type the constraint applies to.
pub target: ConstraintTarget,
/// Enabled flag stored with the constraint.
pub enabled: bool,
}
/// Bidirectional name <-> id table for edge types that are not declared in
/// `Schema::edge_types`.
///
/// Ids are produced by `make_schemaless_id` from a local counter that
/// starts at 1.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct SchemalessEdgeTypeRegistry {
    name_to_id: HashMap<String, u32>,
    id_to_name: HashMap<u32, String>,
    next_local_id: u32,
}

impl SchemalessEdgeTypeRegistry {
    /// Creates an empty registry whose first local id is 1.
    pub fn new() -> Self {
        let name_to_id = HashMap::new();
        let id_to_name = HashMap::new();
        Self {
            name_to_id,
            id_to_name,
            next_local_id: 1,
        }
    }

    /// Returns the id registered for `type_name`, assigning and recording
    /// a new one when the name has not been seen before.
    pub fn get_or_assign_id(&mut self, type_name: &str) -> u32 {
        if let Some(known) = self.id_by_name(type_name) {
            return known;
        }
        let assigned = make_schemaless_id(self.next_local_id);
        self.next_local_id += 1;
        let owned_name = type_name.to_string();
        self.name_to_id.insert(owned_name.clone(), assigned);
        self.id_to_name.insert(assigned, owned_name);
        assigned
    }

    /// Looks up the name registered for `type_id`.
    pub fn type_name_by_id(&self, type_id: u32) -> Option<&str> {
        let name = self.id_to_name.get(&type_id)?;
        Some(name.as_str())
    }

    /// Returns `true` when `type_name` has an id in this registry.
    pub fn contains(&self, type_name: &str) -> bool {
        self.name_to_id.get(type_name).is_some()
    }

    /// Looks up the id registered for exactly `type_name`.
    pub fn id_by_name(&self, type_name: &str) -> Option<u32> {
        match self.name_to_id.get(type_name) {
            Some(id) => Some(*id),
            None => None,
        }
    }

    /// Looks up an id ignoring ASCII case.
    ///
    /// When several registered names differ only by case, whichever the
    /// map iteration reaches first is returned.
    pub fn id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
        self.name_to_id
            .iter()
            .find_map(|(name, id)| name.eq_ignore_ascii_case(type_name).then_some(*id))
    }

    /// Returns every registered id, in map iteration order.
    pub fn all_type_ids(&self) -> Vec<u32> {
        let mut ids = Vec::with_capacity(self.id_to_name.len());
        ids.extend(self.id_to_name.keys().copied());
        ids
    }

    /// Returns `true` when no edge type has been registered.
    pub fn is_empty(&self) -> bool {
        self.name_to_id.len() == 0
    }
}

impl Default for SchemalessEdgeTypeRegistry {
    /// Same as [`SchemalessEdgeTypeRegistry::new`].
    fn default() -> Self {
        SchemalessEdgeTypeRegistry::new()
    }
}
/// First label id of the virtual range.
pub const VIRTUAL_LABEL_ID_START: u16 = 0xFF00;
/// Sentinel label id; it is excluded from the virtual range.
pub const VIRTUAL_LABEL_ID_SENTINEL: u16 = 0xFFFF;

/// Returns `true` when `id` lies in
/// `VIRTUAL_LABEL_ID_START..VIRTUAL_LABEL_ID_SENTINEL` (start inclusive,
/// sentinel exclusive).
#[inline]
pub fn is_virtual_label_id(id: u16) -> bool {
    let at_or_above_start = id >= VIRTUAL_LABEL_ID_START;
    let below_sentinel = id < VIRTUAL_LABEL_ID_SENTINEL;
    at_or_above_start && below_sentinel
}
/// The complete schema document that `SchemaManager` loads and saves as JSON.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Schema {
/// Version counter; starts at 1 and is advanced by `bump_version` on most mutations.
pub schema_version: u32,
/// Vertex labels keyed by name.
pub labels: HashMap<String, LabelMeta>,
/// Schema-declared edge types keyed by name.
pub edge_types: HashMap<String, EdgeTypeMeta>,
/// Properties keyed first by owning label / edge type name, then by property name.
pub properties: HashMap<String, HashMap<String, PropertyMeta>>,
/// Index definitions (empty when missing from serialized data).
#[serde(default)]
pub indexes: Vec<IndexDefinition>,
/// Constraints (empty when missing from serialized data).
#[serde(default)]
pub constraints: Vec<Constraint>,
/// Ids for edge types that are not declared in `edge_types`.
#[serde(default)]
pub schemaless_registry: SchemalessEdgeTypeRegistry,
}
impl Default for Schema {
fn default() -> Self {
Self {
schema_version: 1,
labels: HashMap::new(),
edge_types: HashMap::new(),
properties: HashMap::new(),
indexes: Vec::new(),
constraints: Vec::new(),
schemaless_registry: SchemalessEdgeTypeRegistry::new(),
}
}
}
impl Schema {
    /// Advances `schema_version` by one, wrapping on overflow.
    fn bump_version(&mut self) {
        let next = self.schema_version.wrapping_add(1);
        self.schema_version = next;
    }

    /// Finds the name of the label whose id is `label_id` (linear scan).
    pub fn label_name_by_id(&self, label_id: u16) -> Option<&str> {
        self.labels
            .iter()
            .find_map(|(name, meta)| (meta.id == label_id).then_some(name.as_str()))
    }

    /// Returns the id of the label named exactly `label_name`.
    pub fn label_id_by_name(&self, label_name: &str) -> Option<u16> {
        let meta = self.labels.get(label_name)?;
        Some(meta.id)
    }

    /// Finds the name of the schema-declared edge type whose id is
    /// `type_id` (linear scan; the schemaless registry is not consulted).
    pub fn edge_type_name_by_id(&self, type_id: u32) -> Option<&str> {
        self.edge_types
            .iter()
            .find_map(|(name, meta)| (meta.id == type_id).then_some(name.as_str()))
    }

    /// Returns the id of the schema-declared edge type named exactly
    /// `type_name`.
    pub fn edge_type_id_by_name(&self, type_name: &str) -> Option<u32> {
        let meta = self.edge_types.get(type_name)?;
        Some(meta.id)
    }

    /// Returns the first vector index on `label.property` whose status is
    /// `Online`.
    pub fn vector_index_for_property(
        &self,
        label: &str,
        property: &str,
    ) -> Option<&VectorIndexConfig> {
        self.indexes.iter().find_map(|definition| match definition {
            IndexDefinition::Vector(config)
                if config.label == label
                    && config.property == property
                    && config.metadata.status == IndexStatus::Online =>
            {
                Some(config)
            }
            _ => None,
        })
    }

    /// Returns the first full-text index on `label` that lists `property`
    /// and whose status is `Online`.
    pub fn fulltext_index_for_property(
        &self,
        label: &str,
        property: &str,
    ) -> Option<&FullTextIndexConfig> {
        self.indexes.iter().find_map(|definition| match definition {
            IndexDefinition::FullText(config)
                if config.label == label
                    && config.properties.iter().any(|p| p == property)
                    && config.metadata.status == IndexStatus::Online =>
            {
                Some(config)
            }
            _ => None,
        })
    }

    /// Looks up label metadata ignoring ASCII case.
    pub fn get_label_case_insensitive(&self, name: &str) -> Option<&LabelMeta> {
        self.labels
            .iter()
            .find_map(|(key, meta)| key.eq_ignore_ascii_case(name).then_some(meta))
    }

    /// Returns the label name as stored in the schema for a
    /// case-insensitive match of `name`.
    pub fn canonical_label_name(&self, name: &str) -> Option<String> {
        self.labels
            .keys()
            .find(|key| key.eq_ignore_ascii_case(name))
            .cloned()
    }

    /// Returns the label id for a case-insensitive match of `label_name`.
    pub fn label_id_by_name_case_insensitive(&self, label_name: &str) -> Option<u16> {
        let meta = self.get_label_case_insensitive(label_name)?;
        Some(meta.id)
    }

    /// Looks up schema-declared edge type metadata ignoring ASCII case.
    pub fn get_edge_type_case_insensitive(&self, name: &str) -> Option<&EdgeTypeMeta> {
        self.edge_types
            .iter()
            .find_map(|(key, meta)| key.eq_ignore_ascii_case(name).then_some(meta))
    }

    /// Returns the schema-declared edge type id for a case-insensitive
    /// match of `type_name`.
    pub fn edge_type_id_by_name_case_insensitive(&self, type_name: &str) -> Option<u32> {
        let meta = self.get_edge_type_case_insensitive(type_name)?;
        Some(meta.id)
    }

    /// Case-insensitive id lookup that tries schema-declared edge types
    /// first and the schemaless registry second.
    pub fn edge_type_id_unified_case_insensitive(&self, type_name: &str) -> Option<u32> {
        match self.edge_type_id_by_name_case_insensitive(type_name) {
            declared @ Some(_) => declared,
            None => self
                .schemaless_registry
                .id_by_name_case_insensitive(type_name),
        }
    }

    /// Returns the id for `type_name`, registering it in the schemaless
    /// registry when neither the schema nor the registry knows it.
    pub fn get_or_assign_edge_type_id(&mut self, type_name: &str) -> u32 {
        match self.edge_type_id_unified(type_name) {
            Some(known) => known,
            None => self.schemaless_registry.get_or_assign_id(type_name),
        }
    }

    /// Exact-name id lookup that tries schema-declared edge types first
    /// and the schemaless registry second.
    pub fn edge_type_id_unified(&self, type_name: &str) -> Option<u32> {
        match self.edge_type_id_by_name(type_name) {
            declared @ Some(_) => declared,
            None => self.schemaless_registry.id_by_name(type_name),
        }
    }

    /// Resolves `type_id` to an owned name, consulting the schemaless
    /// registry when `is_schemaless_edge_type(type_id)` holds and the
    /// declared edge types otherwise.
    pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
        let borrowed = if is_schemaless_edge_type(type_id) {
            self.schemaless_registry.type_name_by_id(type_id)
        } else {
            self.edge_type_name_by_id(type_id)
        };
        borrowed.map(str::to_owned)
    }

    /// Returns all declared and schemaless edge type ids in ascending order.
    pub fn all_edge_type_ids(&self) -> Vec<u32> {
        let mut ids: Vec<u32> = self
            .edge_types
            .values()
            .map(|meta| meta.id)
            .chain(self.schemaless_registry.all_type_ids())
            .collect();
        ids.sort_unstable();
        ids
    }
}
/// Status recorded in an index's metadata.
///
/// `Online` is the default. `Schema::vector_index_for_property` and
/// `Schema::fulltext_index_for_property` only return indexes whose status
/// is `Online`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub enum IndexStatus {
#[default]
Online,
Building,
Stale,
Failed,
}
/// Bookkeeping attached to every index definition.
///
/// All fields fall back to their defaults when missing from serialized data.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct IndexMetadata {
/// Current status (default `Online`).
#[serde(default)]
pub status: IndexStatus,
/// Time of the last build, if recorded.
#[serde(default)]
pub last_built_at: Option<DateTime<Utc>>,
/// Row count at the time of the last build, if recorded.
#[serde(default)]
pub row_count_at_build: Option<u64>,
}
/// One index definition of any supported kind.
///
/// Serialized with a `"type"` tag field naming the variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
#[non_exhaustive]
pub enum IndexDefinition {
    Vector(VectorIndexConfig),
    FullText(FullTextIndexConfig),
    Scalar(ScalarIndexConfig),
    Inverted(InvertedIndexConfig),
    JsonFullText(JsonFtsIndexConfig),
}

impl IndexDefinition {
    /// Name of the index, whichever kind it is.
    pub fn name(&self) -> &str {
        match self {
            Self::Vector(cfg) => cfg.name.as_str(),
            Self::FullText(cfg) => cfg.name.as_str(),
            Self::Scalar(cfg) => cfg.name.as_str(),
            Self::Inverted(cfg) => cfg.name.as_str(),
            Self::JsonFullText(cfg) => cfg.name.as_str(),
        }
    }

    /// Label the index is defined on.
    pub fn label(&self) -> &str {
        match self {
            Self::Vector(cfg) => cfg.label.as_str(),
            Self::FullText(cfg) => cfg.label.as_str(),
            Self::Scalar(cfg) => cfg.label.as_str(),
            Self::Inverted(cfg) => cfg.label.as_str(),
            Self::JsonFullText(cfg) => cfg.label.as_str(),
        }
    }

    /// Shared read-only access to the index metadata.
    pub fn metadata(&self) -> &IndexMetadata {
        match self {
            Self::Vector(cfg) => &cfg.metadata,
            Self::FullText(cfg) => &cfg.metadata,
            Self::Scalar(cfg) => &cfg.metadata,
            Self::Inverted(cfg) => &cfg.metadata,
            Self::JsonFullText(cfg) => &cfg.metadata,
        }
    }

    /// Mutable access to the index metadata.
    pub fn metadata_mut(&mut self) -> &mut IndexMetadata {
        match self {
            Self::Vector(cfg) => &mut cfg.metadata,
            Self::FullText(cfg) => &mut cfg.metadata,
            Self::Scalar(cfg) => &mut cfg.metadata,
            Self::Inverted(cfg) => &mut cfg.metadata,
            Self::JsonFullText(cfg) => &mut cfg.metadata,
        }
    }
}
/// Configuration of an inverted index on a single property of a label.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InvertedIndexConfig {
pub name: String,
pub label: String,
pub property: String,
/// Defaults to `true` when missing from serialized data.
#[serde(default = "default_normalize")]
pub normalize: bool,
/// Defaults to `10_000` when missing from serialized data.
#[serde(default = "default_max_terms_per_doc")]
pub max_terms_per_doc: usize,
/// Status and build bookkeeping.
#[serde(default)]
pub metadata: IndexMetadata,
}
/// Serde default for `InvertedIndexConfig::normalize`.
fn default_normalize() -> bool {
true
}
/// Serde default for `InvertedIndexConfig::max_terms_per_doc`.
fn default_max_terms_per_doc() -> usize {
10_000
}
/// Configuration of a vector index on a single property of a label.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct VectorIndexConfig {
pub name: String,
pub label: String,
pub property: String,
/// Index structure and its tuning parameters.
pub index_type: VectorIndexType,
/// Distance function associated with the index.
pub metric: DistanceMetric,
/// Optional embedding settings.
pub embedding_config: Option<EmbeddingConfig>,
/// Status and build bookkeeping.
#[serde(default)]
pub metadata: IndexMetadata,
}
/// Embedding settings attached to a vector index.
// NOTE(review): how `alias` resolves to a model and how the prefixes are applied is not visible in this file — confirm with the embedding code.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct EmbeddingConfig {
pub alias: String,
pub source_properties: Vec<String>,
pub batch_size: usize,
/// `None` when missing from serialized data.
#[serde(default)]
pub document_prefix: Option<String>,
/// `None` when missing from serialized data.
#[serde(default)]
pub query_prefix: Option<String>,
}
/// Vector index structure together with its tuning parameters.
///
/// Fields marked `#[serde(default)]` may be omitted from serialized data
/// and then deserialize to `None`. `#[non_exhaustive]`, so more index
/// types may be added.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum VectorIndexType {
Flat,
IvfFlat {
num_partitions: u32,
},
IvfPq {
num_partitions: u32,
num_sub_vectors: u32,
bits_per_subvector: u8,
},
IvfSq {
num_partitions: u32,
},
IvfRq {
num_partitions: u32,
#[serde(default)]
num_bits: Option<u8>,
},
HnswFlat {
m: u32,
ef_construction: u32,
#[serde(default)]
num_partitions: Option<u32>,
},
HnswSq {
m: u32,
ef_construction: u32,
#[serde(default)]
num_partitions: Option<u32>,
},
HnswPq {
m: u32,
ef_construction: u32,
num_sub_vectors: u32,
#[serde(default)]
num_partitions: Option<u32>,
},
}
/// Distance function for vector comparison; `compute_distance` implements
/// each variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum DistanceMetric {
/// `1 - cos(a, b)`, or `1.0` when either vector has zero norm.
Cosine,
/// Sum of squared component differences (no square root is taken).
L2,
/// Negated dot product.
Dot,
}
impl DistanceMetric {
    /// Computes the distance between `a` and `b` under this metric.
    ///
    /// * `L2` — sum of squared differences (not square-rooted).
    /// * `Cosine` — `1 - dot / (|a| * |b|)`, or `1.0` when the product of
    ///   the norms is zero.
    /// * `Dot` — the negated dot product.
    ///
    /// # Panics
    ///
    /// Panics with "vector dimension mismatch" when the slices differ in
    /// length.
    pub fn compute_distance(&self, a: &[f32], b: &[f32]) -> f32 {
        assert_eq!(a.len(), b.len(), "vector dimension mismatch");

        let inner_product = || -> f32 { a.iter().zip(b).map(|(x, y)| x * y).sum() };
        let magnitude = |v: &[f32]| -> f32 { v.iter().map(|x| x.powi(2)).sum::<f32>().sqrt() };

        match self {
            DistanceMetric::L2 => a
                .iter()
                .zip(b)
                .map(|(x, y)| {
                    let diff = x - y;
                    diff.powi(2)
                })
                .sum(),
            DistanceMetric::Cosine => {
                let dot = inner_product();
                let norm_a = magnitude(a);
                let norm_b = magnitude(b);
                let denom = norm_a * norm_b;
                if denom == 0.0 {
                    return 1.0;
                }
                1.0 - dot / denom
            }
            DistanceMetric::Dot => -inner_product(),
        }
    }
}
/// Configuration of a full-text index over one or more properties of a label.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct FullTextIndexConfig {
pub name: String,
pub label: String,
/// Properties covered; `Schema::fulltext_index_for_property` matches any of them.
pub properties: Vec<String>,
pub tokenizer: TokenizerConfig,
pub with_positions: bool,
/// Status and build bookkeeping.
#[serde(default)]
pub metadata: IndexMetadata,
}
/// Tokenizer selection for a full-text index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum TokenizerConfig {
Standard,
Whitespace,
/// N-gram tokenizer with a `min` and `max` setting.
Ngram { min: u8, max: u8 },
/// Tokenizer referred to by name.
Custom { name: String },
}
/// Configuration of a JSON full-text index on a column of a label.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonFtsIndexConfig {
pub name: String,
pub label: String,
pub column: String,
/// Empty when missing from serialized data.
#[serde(default)]
pub paths: Vec<String>,
/// `false` when missing from serialized data.
#[serde(default)]
pub with_positions: bool,
/// Status and build bookkeeping.
#[serde(default)]
pub metadata: IndexMetadata,
}
/// Configuration of a scalar index over one or more properties of a label.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ScalarIndexConfig {
pub name: String,
pub label: String,
pub properties: Vec<String>,
pub index_type: ScalarIndexType,
/// Optional predicate stored as text.
pub where_clause: Option<String>,
/// Status and build bookkeeping.
#[serde(default)]
pub metadata: IndexMetadata,
}
/// Structure used by a scalar index.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[non_exhaustive]
pub enum ScalarIndexType {
BTree,
Hash,
Bitmap,
LabelList,
}
/// Owns the in-memory schema and knows where it is persisted.
///
/// Readers obtain a cheap `Arc<Schema>` clone through `schema()`; the
/// mutating methods take the write lock and edit through `Arc::make_mut`.
/// Changes are only persisted when `save` is called.
pub struct SchemaManager {
// Object store holding the schema document.
store: Arc<dyn ObjectStore>,
// Location of the schema document inside `store`.
path: ObjectStorePath,
// Current schema.
schema: RwLock<Arc<Schema>>,
}
impl SchemaManager {
/// Loads the schema from a file on the local file system.
///
/// The parent directory becomes the root of a `LocalFileSystem` store and
/// the file name becomes the object path inside it; the rest is delegated
/// to [`Self::load_from_store`].
///
/// # Errors
///
/// Fails when `path` has no parent or file name, when the file name is
/// not valid UTF-8, when the local store cannot be created, or when
/// `load_from_store` fails.
pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
    let path = path.as_ref();
    let parent = match path.parent() {
        // `Path::parent` yields `Some("")` for a bare relative file name
        // such as "schema.json"; an empty prefix is not a usable
        // directory, so fall back to the current directory.
        Some(dir) if dir.as_os_str().is_empty() => Path::new("."),
        Some(dir) => dir,
        None => return Err(anyhow!("Invalid schema path")),
    };
    let filename = path
        .file_name()
        .ok_or_else(|| anyhow!("Invalid schema filename"))?
        .to_str()
        .ok_or_else(|| anyhow!("Invalid utf8 filename"))?;
    let store = Arc::new(LocalFileSystem::new_with_prefix(parent)?);
    let obj_path = ObjectStorePath::from(filename);
    Self::load_from_store(store, &obj_path).await
}
pub async fn load_from_store(
store: Arc<dyn ObjectStore>,
path: &ObjectStorePath,
) -> Result<Self> {
match store.get(path).await {
Ok(result) => {
let bytes = result.bytes().await?;
let content = String::from_utf8(bytes.to_vec())?;
let mut schema: Schema = serde_json::from_str(&content)?;
let original_len = schema.indexes.len();
if original_len > 0 {
let mut seen: std::collections::HashSet<String> =
std::collections::HashSet::with_capacity(original_len);
let mut dedup: Vec<IndexDefinition> = schema
.indexes
.iter()
.rev()
.filter(|idx| seen.insert(idx.name().to_string()))
.cloned()
.collect();
dedup.reverse();
if dedup.len() != original_len {
tracing::warn!(
collapsed = original_len - dedup.len(),
kept = dedup.len(),
"schema.indexes: collapsed duplicate entries on load (issue #63)"
);
schema.indexes = dedup;
}
}
Ok(Self {
store,
path: path.clone(),
schema: RwLock::new(Arc::new(schema)),
})
}
Err(object_store::Error::NotFound { .. }) => Ok(Self {
store,
path: path.clone(),
schema: RwLock::new(Arc::new(Schema::default())),
}),
Err(e) => Err(anyhow::Error::from(e)),
}
}
pub async fn save(&self) -> Result<()> {
let content = {
let schema_guard = acquire_read(&self.schema, "schema")?;
serde_json::to_string_pretty(&**schema_guard)?
};
self.store
.put(&self.path, content.into())
.await
.map_err(anyhow::Error::from)?;
Ok(())
}
/// Returns the object-store path the schema is loaded from and saved to.
pub fn path(&self) -> &ObjectStorePath {
&self.path
}
/// Returns a shared handle to the current schema.
///
/// # Panics
///
/// Panics when the schema lock is poisoned.
pub fn schema(&self) -> Arc<Schema> {
    let guard = self
        .schema
        .read()
        .expect("Schema lock poisoned - a thread panicked while holding it");
    Arc::clone(&guard)
}
/// Upper-cases every identifier that is immediately followed by `(`.
///
/// An identifier starts at an alphabetic character and continues over
/// alphanumerics and `_`. Everything else, including identifiers not
/// directly followed by `(`, is copied unchanged.
fn normalize_function_names(expr: &str) -> String {
    let mut out = String::with_capacity(expr.len());
    let mut rest = expr;

    while let Some(first) = rest.chars().next() {
        if !first.is_alphabetic() {
            out.push(first);
            rest = &rest[first.len_utf8()..];
            continue;
        }

        // Find the byte offset just past the identifier.
        let end = rest
            .char_indices()
            .skip(1)
            .find(|&(_, c)| !(c.is_alphanumeric() || c == '_'))
            .map_or(rest.len(), |(offset, _)| offset);
        let (word, tail) = rest.split_at(end);

        if tail.starts_with('(') {
            out.push_str(&word.to_uppercase());
        } else {
            out.push_str(word);
        }
        rest = tail;
    }

    out
}
/// Derives a column name of the form `_gen_<sanitized>_<hash>` from an
/// expression.
///
/// The expression is first passed through `normalize_function_names`.
/// `<sanitized>` is that text with every non-alphanumeric character
/// replaced by `_` and leading/trailing `_` trimmed; `<hash>` is the
/// lowercase hex of a 64-bit FNV-1a hash over the normalized text's bytes.
pub fn generated_column_name(expr: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
    const FNV_PRIME: u64 = 1099511628211;

    let normalized = Self::normalize_function_names(expr);

    let underscored: String = normalized
        .chars()
        .map(|c| if c.is_alphanumeric() { c } else { '_' })
        .collect();
    let sanitized = underscored.trim_matches('_');

    let hash = normalized.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });

    format!("_gen_{}_{:x}", sanitized, hash)
}
/// Replaces the in-memory schema wholesale with `new_schema`.
///
/// # Panics
///
/// Panics when the schema lock is poisoned.
pub fn replace_schema(&self, new_schema: Schema) {
    let mut slot = self
        .schema
        .write()
        .expect("Schema lock poisoned - a thread panicked while holding it");
    let replacement = Arc::new(new_schema);
    *slot = replacement;
}
/// Builds a new manager whose schema is this manager's schema plus the
/// additions in `overlay`.
///
/// Added labels and edge types are inserted by name (replacing same-named
/// entries). Each added property is inserted under its owner as an
/// `Active`, non-generated property whose `added_in` is the schema
/// version of the copy. The new manager shares this manager's store and
/// path; this manager is left untouched.
#[must_use]
pub fn with_overlay(&self, overlay: &crate::core::fork::SchemaDelta) -> Arc<Self> {
    let primary = self.schema();
    let mut merged = (*primary).clone();

    if !overlay.is_empty() {
        for (name, label) in &overlay.added_labels {
            merged.labels.insert(name.clone(), label.clone());
        }
        for (name, edge_type) in &overlay.added_edge_types {
            merged.edge_types.insert(name.clone(), edge_type.clone());
        }

        // The version is not changed while merging, so read it once.
        let version = merged.schema_version;
        for addition in &overlay.added_properties {
            let meta = PropertyMeta {
                r#type: addition.data_type.clone(),
                nullable: addition.nullable,
                added_in: version,
                state: SchemaElementState::Active,
                generation_expression: None,
                description: None,
            };
            merged
                .properties
                .entry(addition.owner.clone())
                .or_default()
                .insert(addition.property.clone(), meta);
        }
    }

    Arc::new(Self {
        store: self.store.clone(),
        path: self.path.clone(),
        schema: RwLock::new(Arc::new(merged)),
    })
}
/// Returns one more than the highest label id currently in the schema
/// (`1` for an empty schema).
///
/// The addition saturates at `u16::MAX` instead of overflowing, so a
/// schema that already holds id `0xFFFF` never makes this panic or wrap
/// around to `0`. No check against the virtual label range is made here;
/// `add_label_with_desc` performs that check when a label is added.
pub fn next_label_id(&self) -> u16 {
    self.schema()
        .labels
        .values()
        .map(|l| l.id)
        .max()
        .unwrap_or(0)
        .saturating_add(1)
}
/// Returns one more than the highest schema-declared edge type id
/// (`1` when none exist).
///
/// # Panics
///
/// Panics with "Schema edge type ID exhaustion" when the highest id has
/// already reached `MAX_SCHEMA_TYPE_ID`.
pub fn next_type_id(&self) -> u32 {
    let snapshot = self.schema();
    let highest = snapshot
        .edge_types
        .values()
        .map(|t| t.id)
        .max()
        .unwrap_or(0);
    assert!(
        highest < MAX_SCHEMA_TYPE_ID,
        "Schema edge type ID exhaustion"
    );
    highest + 1
}
/// Adds a label without a description; see `add_label_with_desc` for the
/// id assignment and error conditions.
pub fn add_label(&self, name: &str) -> Result<u16> {
self.add_label_with_desc(name, None)
}
/// Adds a new `Active` label and returns its id.
///
/// The id is one more than the highest existing label id. On success the
/// schema version is bumped.
///
/// # Errors
///
/// Fails when the write lock cannot be acquired, when a label named
/// `name` already exists, or when the next id would fall at or above
/// `VIRTUAL_LABEL_ID_START`.
pub fn add_label_with_desc(&self, name: &str, description: Option<String>) -> Result<u16> {
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);
    if schema.labels.contains_key(name) {
        return Err(anyhow!("Label '{}' already exists", name));
    }
    let highest = schema.labels.values().map(|l| l.id).max().unwrap_or(0);
    // Saturate rather than overflow: a loaded schema holding id `u16::MAX`
    // must land in the exhaustion error below instead of panicking (debug)
    // or wrapping to 0 and slipping past the range check (release).
    let id = highest.saturating_add(1);
    if id >= VIRTUAL_LABEL_ID_START {
        return Err(anyhow!(
            "Native label space exhausted (next id {id:#x} would enter the \
             virtual range {VIRTUAL_LABEL_ID_START:#x}..{VIRTUAL_LABEL_ID_SENTINEL:#x} \
             reserved for catalog-resolved labels)"
        ));
    }
    schema.labels.insert(
        name.to_string(),
        LabelMeta {
            id,
            created_at: Utc::now(),
            state: SchemaElementState::Active,
            description,
        },
    );
    schema.bump_version();
    Ok(id)
}
/// Adds an edge type without a description; see
/// `add_edge_type_with_desc` for the id assignment and error conditions.
pub fn add_edge_type(
&self,
name: &str,
src_labels: Vec<String>,
dst_labels: Vec<String>,
) -> Result<u32> {
self.add_edge_type_with_desc(name, src_labels, dst_labels, None)
}
/// Adds a new `Active` schema-declared edge type and returns its id.
///
/// The id is one more than the highest existing declared edge type id.
/// On success the schema version is bumped.
///
/// # Errors
///
/// Fails when the write lock cannot be acquired, when an edge type named
/// `name` already exists, or when the next id would fall at or above
/// `VIRTUAL_EDGE_TYPE_ID_START`.
pub fn add_edge_type_with_desc(
    &self,
    name: &str,
    src_labels: Vec<String>,
    dst_labels: Vec<String>,
    description: Option<String>,
) -> Result<u32> {
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);
    if schema.edge_types.contains_key(name) {
        return Err(anyhow!("Edge type '{}' already exists", name));
    }
    let highest = schema.edge_types.values().map(|t| t.id).max().unwrap_or(0);
    // Saturate rather than overflow: a loaded schema holding id `u32::MAX`
    // must land in the exhaustion error below instead of panicking (debug)
    // or wrapping to 0 and slipping past the range check (release).
    let id = highest.saturating_add(1);
    if id >= VIRTUAL_EDGE_TYPE_ID_START {
        return Err(anyhow!(
            "Native edge type space exhausted (next id {id:#x} would enter the \
             virtual range {VIRTUAL_EDGE_TYPE_ID_START:#x}..{VIRTUAL_EDGE_TYPE_ID_SENTINEL:#x} \
             reserved for catalog-resolved edge types)"
        ));
    }
    schema.edge_types.insert(
        name.to_string(),
        EdgeTypeMeta {
            id,
            src_labels,
            dst_labels,
            state: SchemaElementState::Active,
            description,
        },
    );
    schema.bump_version();
    Ok(id)
}
pub fn get_or_assign_edge_type_id(&self, type_name: &str) -> u32 {
{
let guard = acquire_read(&self.schema, "schema")
.expect("Schema lock poisoned - a thread panicked while holding it");
if let Some(id) = guard.edge_type_id_unified(type_name) {
return id;
}
}
let mut guard = acquire_write(&self.schema, "schema")
.expect("Schema lock poisoned - a thread panicked while holding it");
let schema = Arc::make_mut(&mut *guard);
schema.get_or_assign_edge_type_id(type_name)
}
/// Resolves `type_id` to an edge type name via
/// `Schema::edge_type_name_by_id_unified`, under the read lock.
///
/// # Panics
///
/// Panics when the read lock cannot be acquired.
pub fn edge_type_name_by_id_unified(&self, type_id: u32) -> Option<String> {
    let reader = acquire_read(&self.schema, "schema")
        .expect("Schema lock poisoned - a thread panicked while holding it");
    let resolved = reader.edge_type_name_by_id_unified(type_id);
    resolved
}
/// Adds a property without a description; see `add_property_with_desc`
/// for validation and error conditions.
pub fn add_property(
&self,
label_or_type: &str,
prop_name: &str,
data_type: DataType,
nullable: bool,
) -> Result<()> {
self.add_property_with_desc(label_or_type, prop_name, data_type, nullable, None)
}
/// Adds an `Active`, non-generated property to `label_or_type`.
///
/// `added_in` records the schema version before the bump. The owner's
/// property map is created when absent (this happens even if the call
/// then fails on a duplicate). The owner name itself is not checked
/// against the labels or edge types.
///
/// # Errors
///
/// Fails when `prop_name` is rejected by `validate_property_name`, when
/// the write lock cannot be acquired, or when the owner already has a
/// property of that name.
pub fn add_property_with_desc(
    &self,
    label_or_type: &str,
    prop_name: &str,
    data_type: DataType,
    nullable: bool,
    description: Option<String>,
) -> Result<()> {
    use std::collections::hash_map::Entry;

    validate_property_name(prop_name)?;
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);
    let added_in = schema.schema_version;

    let owner_props = schema
        .properties
        .entry(label_or_type.to_string())
        .or_default();
    match owner_props.entry(prop_name.to_string()) {
        Entry::Occupied(_) => {
            return Err(anyhow!(
                "Property '{}' already exists for '{}'",
                prop_name,
                label_or_type
            ));
        }
        Entry::Vacant(slot) => {
            slot.insert(PropertyMeta {
                r#type: data_type,
                nullable,
                added_in,
                state: SchemaElementState::Active,
                generation_expression: None,
                description,
            });
        }
    }

    schema.bump_version();
    Ok(())
}
/// Adds an `Active`, nullable property whose value is described by the
/// generation expression `expr`.
///
/// Only the reserved-name list is checked, so names starting with `_`
/// are permitted here. `added_in` records the schema version before the
/// bump.
///
/// # Errors
///
/// Fails when `prop_name` is reserved, when the write lock cannot be
/// acquired, or when the owner already has a property of that name.
pub fn add_generated_property(
    &self,
    label_or_type: &str,
    prop_name: &str,
    data_type: DataType,
    expr: String,
) -> Result<()> {
    use std::collections::hash_map::Entry;

    validate_reserved_property_name(prop_name)?;
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);
    let added_in = schema.schema_version;

    let owner_props = schema
        .properties
        .entry(label_or_type.to_string())
        .or_default();
    match owner_props.entry(prop_name.to_string()) {
        Entry::Occupied(_) => {
            return Err(anyhow!("Property '{}' already exists", prop_name));
        }
        Entry::Vacant(slot) => {
            slot.insert(PropertyMeta {
                r#type: data_type,
                nullable: true,
                added_in,
                state: SchemaElementState::Active,
                generation_expression: Some(expr),
                description: None,
            });
        }
    }

    schema.bump_version();
    Ok(())
}
/// Sets or clears the description of label `name`.
///
/// The schema version is not bumped.
///
/// # Errors
///
/// Fails when the write lock cannot be acquired or the label does not exist.
pub fn set_label_description(&self, name: &str, description: Option<String>) -> Result<()> {
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);
    match schema.labels.get_mut(name) {
        Some(meta) => {
            meta.description = description;
            Ok(())
        }
        None => Err(anyhow!("Label '{}' does not exist", name)),
    }
}
/// Sets or clears the description of schema-declared edge type `name`.
///
/// The schema version is not bumped.
///
/// # Errors
///
/// Fails when the write lock cannot be acquired or the edge type does
/// not exist.
pub fn set_edge_type_description(&self, name: &str, description: Option<String>) -> Result<()> {
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);
    match schema.edge_types.get_mut(name) {
        Some(meta) => {
            meta.description = description;
            Ok(())
        }
        None => Err(anyhow!("Edge type '{}' does not exist", name)),
    }
}
/// Sets or clears the description of property `prop_name` on `entity`.
///
/// The schema version is not bumped.
///
/// # Errors
///
/// Fails when the write lock cannot be acquired, when `entity` has no
/// property map, or when the property does not exist on it.
pub fn set_property_description(
    &self,
    entity: &str,
    prop_name: &str,
    description: Option<String>,
) -> Result<()> {
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);

    let Some(props) = schema.properties.get_mut(entity) else {
        return Err(anyhow!("Entity '{}' does not exist", entity));
    };
    let Some(meta) = props.get_mut(prop_name) else {
        return Err(anyhow!(
            "Property '{}' does not exist on '{}'",
            prop_name,
            entity
        ));
    };
    meta.description = description;
    Ok(())
}
/// Adds `index_def`, replacing in place any existing index with the
/// same name; bumps the schema version either way.
///
/// # Errors
///
/// Fails only when the write lock cannot be acquired.
pub fn add_index(&self, index_def: IndexDefinition) -> Result<()> {
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);

    let same_name_at = schema
        .indexes
        .iter()
        .position(|existing| existing.name() == index_def.name());
    match same_name_at {
        Some(at) => schema.indexes[at] = index_def,
        None => schema.indexes.push(index_def),
    }

    schema.bump_version();
    Ok(())
}
/// Returns a copy of the first index definition named `name`, if any.
///
/// # Panics
///
/// Panics when the schema lock is poisoned.
pub fn get_index(&self, name: &str) -> Option<IndexDefinition> {
    let current = self.schema.read().expect("Schema lock poisoned");
    current
        .indexes
        .iter()
        .find_map(|definition| (definition.name() == name).then(|| definition.clone()))
}
/// Applies `f` to the metadata of the first index named `index_name`.
///
/// The schema version is not bumped.
///
/// # Errors
///
/// Fails when the write lock cannot be acquired or no such index exists.
pub fn update_index_metadata(
    &self,
    index_name: &str,
    f: impl FnOnce(&mut IndexMetadata),
) -> Result<()> {
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);
    match schema
        .indexes
        .iter_mut()
        .find(|definition| definition.name() == index_name)
    {
        Some(definition) => {
            f(definition.metadata_mut());
            Ok(())
        }
        None => Err(anyhow!("Index '{}' not found", index_name)),
    }
}
/// Removes the first index named `name` and bumps the schema version.
///
/// # Errors
///
/// Fails when the write lock cannot be acquired or no such index exists.
pub fn remove_index(&self, name: &str) -> Result<()> {
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);

    let Some(at) = schema.indexes.iter().position(|i| i.name() == name) else {
        return Err(anyhow!("Index '{}' not found", name));
    };
    schema.indexes.remove(at);
    schema.bump_version();
    Ok(())
}
/// Appends `constraint` and bumps the schema version.
///
/// # Errors
///
/// Fails when the write lock cannot be acquired or a constraint with
/// the same name already exists.
pub fn add_constraint(&self, constraint: Constraint) -> Result<()> {
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);

    for existing in &schema.constraints {
        if existing.name == constraint.name {
            return Err(anyhow!("Constraint '{}' already exists", constraint.name));
        }
    }

    schema.constraints.push(constraint);
    schema.bump_version();
    Ok(())
}
/// Removes the constraint named `name` and bumps the schema version.
///
/// When no such constraint exists the call succeeds without changes if
/// `if_exists` is `true`.
///
/// # Errors
///
/// Fails when the write lock cannot be acquired, or when the constraint
/// is missing and `if_exists` is `false`.
pub fn drop_constraint(&self, name: &str, if_exists: bool) -> Result<()> {
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);

    match schema.constraints.iter().position(|c| c.name == name) {
        Some(at) => {
            schema.constraints.remove(at);
            schema.bump_version();
            Ok(())
        }
        None if if_exists => Ok(()),
        None => Err(anyhow!("Constraint '{}' not found", name)),
    }
}
/// Removes property `prop_name` from `label_or_type` and bumps the
/// schema version.
///
/// # Errors
///
/// Fails when the write lock cannot be acquired, when the owner has no
/// property map, or when the property does not exist on it.
pub fn drop_property(&self, label_or_type: &str, prop_name: &str) -> Result<()> {
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);

    let props = schema
        .properties
        .get_mut(label_or_type)
        .ok_or_else(|| anyhow!("Label or Edge Type '{}' not found", label_or_type))?;
    props.remove(prop_name).ok_or_else(|| {
        anyhow!(
            "Property '{}' not found for '{}'",
            prop_name,
            label_or_type
        )
    })?;

    schema.bump_version();
    Ok(())
}
pub fn rename_property(
&self,
label_or_type: &str,
old_name: &str,
new_name: &str,
) -> Result<()> {
let mut guard = acquire_write(&self.schema, "schema")?;
let schema = Arc::make_mut(&mut *guard);
let Some(props) = schema.properties.get_mut(label_or_type) else {
return Err(anyhow!("Label or Edge Type '{}' not found", label_or_type));
};
let Some(meta) = props.remove(old_name) else {
return Err(anyhow!(
"Property '{}' not found for '{}'",
old_name,
label_or_type
));
};
if props.contains_key(new_name) {
props.insert(old_name.to_string(), meta); return Err(anyhow!("Property '{}' already exists", new_name));
}
props.insert(new_name.to_string(), meta);
schema.bump_version();
Ok(())
}
/// Marks label `name` as `Tombstone` (stamped with the current UTC time)
/// and bumps the schema version; the entry stays in the map.
///
/// When the label is missing the call succeeds without changes if
/// `if_exists` is `true`.
///
/// # Errors
///
/// Fails when the write lock cannot be acquired, or when the label is
/// missing and `if_exists` is `false`.
pub fn drop_label(&self, name: &str, if_exists: bool) -> Result<()> {
    let mut guard = acquire_write(&self.schema, "schema")?;
    let schema = Arc::make_mut(&mut *guard);

    match schema.labels.get_mut(name) {
        Some(meta) => {
            meta.state = SchemaElementState::Tombstone { since: Utc::now() };
            schema.bump_version();
            Ok(())
        }
        None if if_exists => Ok(()),
        None => Err(anyhow!("Label '{}' not found", name)),
    }
}
/// Marks edge type `name` as dropped and bumps the schema version.
///
/// The edge type entry is kept; its state is set to
/// `SchemaElementState::Tombstone` stamped with the current UTC time.
/// When the edge type is unknown the call succeeds without touching the
/// schema if `if_exists` is `true`, and fails otherwise.
///
/// # Errors
/// Propagates any error from `acquire_write`, and fails when the edge type
/// is missing and `if_exists` is `false`.
pub fn drop_edge_type(&self, name: &str, if_exists: bool) -> Result<()> {
    let mut guard = acquire_write(&self.schema, "schema")?;
    // Check existence through the shared handle so that the no-op and error
    // paths never trigger the clone `Arc::make_mut` may perform.
    if !guard.edge_types.contains_key(name) {
        return if if_exists {
            Ok(())
        } else {
            Err(anyhow!("Edge Type '{}' not found", name))
        };
    }
    let schema = Arc::make_mut(&mut *guard);
    if let Some(edge_meta) = schema.edge_types.get_mut(name) {
        edge_meta.state = SchemaElementState::Tombstone { since: Utc::now() };
    }
    schema.bump_version();
    Ok(())
}
}
/// Checks that `name` is usable as a schema identifier.
///
/// An identifier is accepted when it
/// - is 1 to 64 characters long (counted as `char`s, not bytes, so that
///   non-ASCII identifiers are measured the way the error message states),
/// - starts with an alphabetic character or `_`,
/// - consists only of alphanumeric characters and `_`,
/// - is not one of the reserved keywords, compared case-insensitively.
///
/// # Errors
/// Returns an error that names the identifier and the rule it violates.
pub fn validate_identifier(name: &str) -> Result<()> {
    const MAX_IDENTIFIER_CHARS: usize = 64;
    const RESERVED: &[&str] = &[
        "MATCH", "CREATE", "DELETE", "SET", "RETURN", "WHERE", "MERGE", "CALL", "YIELD", "WITH",
        "UNION", "ORDER", "LIMIT",
    ];

    // `chars().next()` is `None` exactly for the empty string, which lets the
    // first character be read without an `unwrap`.
    let Some(first) = name.chars().next() else {
        return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
    };
    if name.chars().count() > MAX_IDENTIFIER_CHARS {
        return Err(anyhow!("Identifier '{}' must be 1-64 characters", name));
    }
    if !first.is_alphabetic() && first != '_' {
        return Err(anyhow!(
            "Identifier '{}' must start with letter or underscore",
            name
        ));
    }
    if !name.chars().all(|c| c.is_alphanumeric() || c == '_') {
        return Err(anyhow!(
            "Identifier '{}' must contain only alphanumeric and underscore",
            name
        ));
    }
    if RESERVED.contains(&name.to_uppercase().as_str()) {
        return Err(anyhow!("Identifier '{}' cannot be a reserved word", name));
    }
    Ok(())
}
/// Checks that `name` may be used for a user-defined property.
///
/// Any name beginning with `_` is refused outright; every other name is
/// passed on to `validate_reserved_property_name`.
///
/// # Errors
/// Returns an error for `_`-prefixed names, or whatever error
/// `validate_reserved_property_name` reports.
pub fn validate_property_name(name: &str) -> Result<()> {
    match name.chars().next() {
        Some('_') => Err(anyhow!(
            "Property name '{}' is reserved: names starting with '_' are reserved by the storage layer",
            name
        )),
        _ => validate_reserved_property_name(name),
    }
}
/// Rejects property names that exactly match one of the names the storage
/// layer reserves.
///
/// The comparison is exact and case-sensitive.
///
/// # Errors
/// Returns an error naming the property when it is one of the reserved names.
fn validate_reserved_property_name(name: &str) -> Result<()> {
    let is_reserved = matches!(
        name,
        "ext_id" | "overflow_json" | "eid" | "src_vid" | "dst_vid" | "op" | "__set_struct__"
    );
    if !is_reserved {
        return Ok(());
    }
    Err(anyhow!(
        "Property name '{}' is reserved by the storage layer; please choose a different name",
        name
    ))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::value::{TemporalValue, Value};
use object_store::local::LocalFileSystem;
use tempfile::tempdir;
/// Pins which `Value`s each sampled `DataType` accepts and rejects.
#[test]
fn test_datatype_accepts_matrix() {
    // Fresh zero-valued DateTime temporal for every assertion that needs one.
    let epoch_datetime = || {
        Value::Temporal(TemporalValue::DateTime {
            nanos_since_epoch: 0,
            offset_seconds: 0,
            timezone_name: None,
        })
    };
    let iso_text = || Value::String("2026-01-01T00:00:00Z".into());

    // Null is accepted by every sampled type.
    let nullable_samples = [
        DataType::String,
        DataType::Int64,
        DataType::Bool,
        DataType::DateTime,
        DataType::Float64,
    ];
    for ty in nullable_samples {
        assert!(ty.accepts(&Value::Null), "{ty:?} must accept Null");
    }

    // Values whose kind matches the declared type.
    assert!(DataType::String.accepts(&Value::String("x".into())));
    assert!(DataType::Int64.accepts(&Value::Int(1)));
    assert!(DataType::Bool.accepts(&Value::Bool(true)));
    assert!(DataType::DateTime.accepts(&epoch_datetime()));
    assert!(DataType::Timestamp.accepts(&epoch_datetime()));
    assert!(DataType::CypherValue.accepts(&Value::Map(Default::default())));

    // Integer values are also accepted by Float64 and Int32.
    assert!(
        DataType::Float64.accepts(&Value::Int(3)),
        "Int widens to Float"
    );
    assert!(DataType::Int32.accepts(&Value::Int(3)), "Int fits Int32");

    // The same ISO string is accepted for Timestamp but refused for DateTime.
    assert!(
        DataType::Timestamp.accepts(&iso_text()),
        "storage parses strings for non-struct Timestamp columns"
    );
    assert!(
        !DataType::DateTime.accepts(&iso_text()),
        "String into a DateTime struct column nulls silently — reject here"
    );

    // Mismatched kinds are refused.
    assert!(!DataType::Bool.accepts(&Value::Int(1)));
    assert!(!DataType::Int64.accepts(&Value::Bool(true)));
    assert!(!DataType::Int64.accepts(&Value::Float(1.5)));
    assert!(
        !DataType::String.accepts(&Value::Int(10)),
        "no implicit stringification"
    );
    assert!(!DataType::Duration.accepts(&Value::String("P1D".into())));
}
/// Adds a label, a property and an edge type, saves the schema, and checks
/// that a second manager loaded from the same store sees them.
#[tokio::test]
async fn test_schema_management() -> Result<()> {
    let dir = tempdir()?;
    let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
    let path = ObjectStorePath::from("schema.json");
    let manager = SchemaManager::load_from_store(store.clone(), &path).await?;

    // The first label receives id 1; adding the same name again fails.
    let label_id = manager.add_label("Person")?;
    assert_eq!(label_id, 1);
    let duplicate_label = manager.add_label("Person");
    assert!(duplicate_label.is_err());

    // Same rule for a property registered twice on one label.
    manager.add_property("Person", "name", DataType::String, false)?;
    let duplicate_property = manager.add_property("Person", "name", DataType::String, false);
    assert!(duplicate_property.is_err());

    let edge_type_id =
        manager.add_edge_type("knows", vec!["Person".into()], vec!["Person".into()])?;
    assert_eq!(edge_type_id, 1);

    // Saving must leave an object at `path` in the store.
    manager.save().await?;
    assert!(store.get(&path).await.is_ok());

    // A manager loaded afterwards sees the saved label and property.
    let reloaded = SchemaManager::load_from_store(store, &path).await?;
    assert!(reloaded.schema().labels.contains_key("Person"));
    let reloaded_schema = reloaded.schema();
    let person_props = reloaded_schema.properties.get("Person").unwrap();
    assert!(person_props.contains_key("name"));
    Ok(())
}
/// Checks that reserved property names are refused by `add_property` and
/// `add_generated_property`, while names that merely resemble them pass.
#[tokio::test]
async fn test_reserved_property_names_rejected() -> Result<()> {
    let dir = tempdir()?;
    let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
    let path = ObjectStorePath::from("schema.json");
    let manager = SchemaManager::load_from_store(store, &path).await?;
    manager.add_label("Tiny")?;

    // Exact-match reserved names: the error text must mention "reserved".
    // `let ... else` keeps the failure message from being formatted on every
    // iteration, which `expect_err(&format!(..))` would do.
    for reserved in [
        "ext_id",
        "overflow_json",
        "eid",
        "src_vid",
        "dst_vid",
        "op",
        "__set_struct__",
    ] {
        let Err(err) = manager.add_property("Tiny", reserved, DataType::String, true) else {
            panic!("expected '{reserved}' to be rejected");
        };
        assert!(
            err.to_string().contains("reserved"),
            "error for '{reserved}' should mention 'reserved', got: {err}"
        );
    }

    // Underscore-prefixed names are refused as well.
    for reserved in ["_vid", "_uid", "_eid", "_version", "_created_at"] {
        assert!(
            manager
                .add_property("Tiny", reserved, DataType::String, true)
                .is_err(),
            "expected '{reserved}' to be rejected"
        );
    }

    // Names that only contain a reserved name as a substring are accepted.
    manager.add_property("Tiny", "ext_id_foo", DataType::String, true)?;
    manager.add_property("Tiny", "user_op", DataType::String, true)?;
    manager.add_property("Tiny", "type_name", DataType::String, true)?;

    // The check also applies to properties declared on an edge type ...
    manager.add_edge_type("knows", vec!["Tiny".into()], vec!["Tiny".into()])?;
    assert!(
        manager
            .add_property("knows", "src_vid", DataType::Int64, true)
            .is_err()
    );
    // ... and to generated properties.
    assert!(
        manager
            .add_generated_property(
                "Tiny",
                "ext_id",
                DataType::String,
                "concat('x', name)".into()
            )
            .is_err()
    );
    Ok(())
}
/// Function names in an expression are upper-cased regardless of input
/// casing, including nested calls; the argument text is left as written.
#[test]
fn test_normalize_function_names() {
    // (input expression, expected normalized form)
    let cases = [
        ("lower(email)", "LOWER(email)"),
        ("LOWER(email)", "LOWER(email)"),
        ("Lower(email)", "LOWER(email)"),
        ("trim(lower(email))", "TRIM(LOWER(email))"),
    ];
    for (input, expected) in cases {
        assert_eq!(SchemaManager::normalize_function_names(input), expected);
    }
}
/// The generated column name must not depend on how the function name in
/// the expression is cased.
#[test]
fn test_generated_column_name_case_insensitive() {
    let [from_lower, from_upper, from_mixed] = ["lower(email)", "LOWER(email)", "Lower(email)"]
        .map(|expr| SchemaManager::generated_column_name(expr));
    assert_eq!(from_lower, from_upper);
    assert_eq!(from_upper, from_mixed);
    assert!(from_lower.starts_with("_gen_LOWER_email_"));
}
/// An index definition serialized without any metadata fields must still
/// deserialize, with the metadata falling back to its defaults.
#[test]
fn test_index_metadata_serde_backward_compat() {
    // Payload with no `metadata` entry at all.
    let legacy_json = r#"{
"type": "Scalar",
"name": "idx_person_name",
"label": "Person",
"properties": ["name"],
"index_type": "BTree",
"where_clause": null
}"#;
    let parsed = serde_json::from_str::<IndexDefinition>(legacy_json).unwrap();
    let metadata = parsed.metadata();
    assert_eq!(metadata.status, IndexStatus::Online);
    assert!(metadata.last_built_at.is_none());
    assert!(metadata.row_count_at_build.is_none());
}
/// Non-default index metadata must survive a JSON serialize/deserialize
/// round trip.
#[test]
fn test_index_metadata_serde_roundtrip() {
    let built_at = Utc::now();
    let metadata = IndexMetadata {
        status: IndexStatus::Building,
        last_built_at: Some(built_at),
        row_count_at_build: Some(42),
    };
    let original = IndexDefinition::Scalar(ScalarIndexConfig {
        name: "idx_test".to_string(),
        label: "Test".to_string(),
        properties: vec!["prop".to_string()],
        index_type: ScalarIndexType::BTree,
        where_clause: None,
        metadata,
    });

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: IndexDefinition = serde_json::from_str(&encoded).unwrap();

    let restored = decoded.metadata();
    assert_eq!(restored.status, IndexStatus::Building);
    assert_eq!(restored.row_count_at_build, Some(42));
    assert!(restored.last_built_at.is_some());
}
/// `update_index_metadata` applies the closure to the named index and fails
/// for a name that is not registered.
#[tokio::test]
async fn test_update_index_metadata() -> Result<()> {
    let dir = tempdir()?;
    let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
    let path = ObjectStorePath::from("schema.json");
    let manager = SchemaManager::load_from_store(store, &path).await?;
    manager.add_label("Person")?;

    // Register an index carrying default metadata.
    manager.add_index(IndexDefinition::Scalar(ScalarIndexConfig {
        name: "idx_test".to_string(),
        label: "Person".to_string(),
        properties: vec!["name".to_string()],
        index_type: ScalarIndexType::BTree,
        where_clause: None,
        metadata: Default::default(),
    }))?;
    let before = manager.get_index("idx_test").unwrap();
    assert_eq!(before.metadata().status, IndexStatus::Online);

    // Mutate two fields and read them back.
    manager.update_index_metadata("idx_test", |meta| {
        meta.status = IndexStatus::Building;
        meta.row_count_at_build = Some(100);
    })?;
    let after = manager.get_index("idx_test").unwrap();
    assert_eq!(after.metadata().status, IndexStatus::Building);
    assert_eq!(after.metadata().row_count_at_build, Some(100));

    // Unknown index name.
    let missing = manager.update_index_metadata("nope", |_| {});
    assert!(missing.is_err());
    Ok(())
}
/// `add_index` replaces an existing definition that has the same name
/// instead of appending a second entry.
#[tokio::test]
async fn test_add_index_is_upsert_by_name() -> Result<()> {
    let dir = tempdir()?;
    let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
    let path = ObjectStorePath::from("schema.json");
    let manager = SchemaManager::load_from_store(store, &path).await?;
    manager.add_label("Person")?;

    let building = IndexDefinition::Scalar(ScalarIndexConfig {
        name: "idx_test".to_string(),
        label: "Person".to_string(),
        properties: vec!["name".to_string()],
        index_type: ScalarIndexType::BTree,
        where_clause: None,
        metadata: IndexMetadata {
            status: IndexStatus::Building,
            ..Default::default()
        },
    });

    // First insertion creates the entry; an identical one must not add another.
    manager.add_index(building.clone())?;
    assert_eq!(manager.schema().indexes.len(), 1);
    manager.add_index(building.clone())?;
    assert_eq!(
        manager.schema().indexes.len(),
        1,
        "duplicate add_index by name must not append"
    );

    // Same name with changed metadata: the stored definition is replaced.
    let IndexDefinition::Scalar(mut online_cfg) = building else {
        unreachable!()
    };
    online_cfg.metadata.status = IndexStatus::Online;
    online_cfg.metadata.row_count_at_build = Some(42);
    manager.add_index(IndexDefinition::Scalar(online_cfg))?;
    assert_eq!(manager.schema().indexes.len(), 1);
    let stored = manager.get_index("idx_test").unwrap();
    assert_eq!(stored.metadata().status, IndexStatus::Online);
    assert_eq!(stored.metadata().row_count_at_build, Some(42));

    // A different name is a genuinely new entry.
    manager.add_index(IndexDefinition::Scalar(ScalarIndexConfig {
        name: "idx_other".to_string(),
        label: "Person".to_string(),
        properties: vec!["age".to_string()],
        index_type: ScalarIndexType::BTree,
        where_clause: None,
        metadata: IndexMetadata::default(),
    }))?;
    assert_eq!(manager.schema().indexes.len(), 2);
    Ok(())
}
#[tokio::test]
async fn test_load_dedups_bloated_indexes() -> Result<()> {
let dir = tempdir()?;
let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
let path = ObjectStorePath::from("schema.json");
let mut schema = Schema::default();
schema.labels.insert(
"Person".to_string(),
LabelMeta {
id: 1,
created_at: chrono::Utc::now(),
state: SchemaElementState::Active,
description: None,
},
);
let make = |status: IndexStatus, count: Option<u64>| {
IndexDefinition::Scalar(ScalarIndexConfig {
name: "idx_dup".to_string(),
label: "Person".to_string(),
properties: vec!["name".to_string()],
index_type: ScalarIndexType::BTree,
where_clause: None,
metadata: IndexMetadata {
status,
row_count_at_build: count,
..Default::default()
},
})
};
for _ in 0..49 {
schema.indexes.push(make(IndexStatus::Building, None));
}
schema.indexes.push(make(IndexStatus::Online, Some(123)));
let json = serde_json::to_string_pretty(&schema)?;
store.put(&path, json.into()).await?;
let manager = SchemaManager::load_from_store(store, &path).await?;
let schema = manager.schema();
assert_eq!(
schema.indexes.len(),
1,
"load() must collapse 50 duplicates by name to 1"
);
assert_eq!(schema.indexes[0].metadata().status, IndexStatus::Online);
assert_eq!(schema.indexes[0].metadata().row_count_at_build, Some(123));
Ok(())
}
/// `vector_index_for_property` returns nothing while the matching index is
/// `Stale` and returns it once its status is `Online`.
#[test]
fn test_vector_index_for_property_skips_non_online() {
    let mut schema = Schema::default();
    schema.labels.insert(
        "Document".to_string(),
        LabelMeta {
            id: 1,
            created_at: Utc::now(),
            state: SchemaElementState::Active,
            description: None,
        },
    );
    let stale_index = IndexDefinition::Vector(VectorIndexConfig {
        name: "vec_doc_embedding".to_string(),
        label: "Document".to_string(),
        property: "embedding".to_string(),
        index_type: VectorIndexType::Flat,
        metric: DistanceMetric::Cosine,
        embedding_config: None,
        metadata: IndexMetadata {
            status: IndexStatus::Stale,
            ..Default::default()
        },
    });
    schema.indexes.push(stale_index);

    // While the index is Stale the lookup finds nothing.
    let while_stale = schema.vector_index_for_property("Document", "embedding");
    assert!(while_stale.is_none());

    // Flip the same entry to Online and look it up again.
    if let Some(IndexDefinition::Vector(cfg)) = schema.indexes.first_mut() {
        cfg.metadata.status = IndexStatus::Online;
    }
    let once_online = schema.vector_index_for_property("Document", "embedding");
    assert!(once_online.is_some());
    assert_eq!(once_online.unwrap().metric, DistanceMetric::Cosine);
}
/// An overlay built from an empty delta starts with the primary's labels,
/// and labels added to the overlay do not appear on the primary.
#[tokio::test]
async fn with_overlay_empty_clones_primary_in_isolation() -> Result<()> {
    use crate::core::fork::SchemaDelta;

    let dir = tempdir()?;
    let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
    let path = ObjectStorePath::from("schema.json");
    let primary = SchemaManager::load_from_store(store, &path).await?;
    primary.add_label("Person")?;

    let empty_delta = SchemaDelta::empty();
    let overlay = primary.with_overlay(&empty_delta);
    assert_eq!(overlay.schema().labels.len(), 1);

    // Writing to the overlay must leave the primary untouched.
    overlay.add_label("Forked")?;
    let overlay_has_fork = overlay.schema().labels.contains_key("Forked");
    let primary_has_fork = primary.schema().labels.contains_key("Forked");
    assert!(overlay_has_fork);
    assert!(!primary_has_fork);
    Ok(())
}
/// An overlay built from a delta exposes the primary's labels plus the
/// delta's added labels and edge types; the primary itself is unchanged.
#[tokio::test]
async fn with_overlay_merges_added_labels_and_edge_types() -> Result<()> {
    use crate::core::fork::SchemaDelta;
    use chrono::Utc;

    let dir = tempdir()?;
    let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
    let path = ObjectStorePath::from("schema.json");
    let primary = SchemaManager::load_from_store(store, &path).await?;
    primary.add_label("Existing")?;

    // Delta carrying one new label and one new edge type between its nodes.
    let new_label = LabelMeta {
        id: 99,
        created_at: Utc::now(),
        state: SchemaElementState::Active,
        description: None,
    };
    let new_edge = EdgeTypeMeta {
        id: 99,
        src_labels: vec!["NewLabel".into()],
        dst_labels: vec!["NewLabel".into()],
        state: SchemaElementState::Active,
        description: None,
    };
    let delta = SchemaDelta {
        added_labels: vec![("NewLabel".to_string(), new_label)],
        added_edge_types: vec![("NewEdge".to_string(), new_edge)],
        added_properties: vec![],
    };

    let overlay = primary.with_overlay(&delta);
    let merged = overlay.schema();
    for label in ["Existing", "NewLabel"] {
        assert!(merged.labels.contains_key(label));
    }
    assert!(merged.edge_types.contains_key("NewEdge"));

    // The primary never sees the overlay's additions.
    assert!(!primary.schema().labels.contains_key("NewLabel"));
    Ok(())
}
/// Sixteen threads asking for the id of the same undeclared edge type must
/// all get one id, and a declared edge type must keep its declared id.
#[tokio::test]
async fn test_get_or_assign_edge_type_id_concurrent() -> Result<()> {
    let dir = tempdir()?;
    let store = Arc::new(LocalFileSystem::new_with_prefix(dir.path())?);
    let path = ObjectStorePath::from("schema.json");
    let manager = Arc::new(SchemaManager::load_from_store(store, &path).await?);

    // Spawn every thread before joining any of them; collecting into a Vec
    // first keeps the lazy iterator from running the racers one at a time.
    let racers: Vec<_> = (0..16)
        .map(|_| {
            let shared = manager.clone();
            std::thread::spawn(move || shared.get_or_assign_edge_type_id("RACED"))
        })
        .collect();
    let ids: Vec<u32> = racers
        .into_iter()
        .map(|racer| racer.join().unwrap())
        .collect();

    let winner = ids[0];
    assert!(
        ids.iter().all(|&id| id == winner),
        "all racers must observe one id, got {ids:?}"
    );
    // A later lookup returns the same id again.
    assert_eq!(manager.get_or_assign_edge_type_id("RACED"), winner);

    // For an explicitly declared edge type the lookup yields the declared id.
    manager.add_label("A")?;
    let declared_id = manager.add_edge_type("DECLARED", vec!["A".into()], vec!["A".into()])?;
    assert_eq!(manager.get_or_assign_edge_type_id("DECLARED"), declared_id);
    Ok(())
}
}