use std::collections::BTreeSet;
use selene_core::{DbString, HnswIndexConfig, IvfIndexConfig};
use serde::{Deserialize, Serialize};
use crate::{
core_provider::invalid_payload,
graph::SeleneGraph,
typed_index::TypedIndexKind,
vector_index::{MAX_IVF_TARGET_CENTROIDS, VectorIndexKind},
};
use super::codec::{decode_rkyv, encode_rkyv, ensure_section_within_cap, validate_sorted_unique};
/// Tags a `CORE/SCMA` row as describing a node-property or an edge-property index.
///
/// The derived `Ord` follows declaration order, so `Node` compares less than
/// `Edge`; both `SchemaKey`'s derived ordering and `schema_wire_cmp` compare
/// this value first.
#[derive(
Clone,
Copy,
Debug,
Deserialize,
Eq,
Ord,
PartialEq,
PartialOrd,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub enum SchemaEntityKind {
/// Set by `encode_schemas` on rows taken from `graph.property_index`.
Node,
/// Set by `encode_schemas` on rows taken from `graph.edge_property_index`.
Edge,
}
/// Key half of a `CORE/SCMA` row: an entity kind plus a (label, property) pair.
///
/// The derived `Ord` compares `entity`, then `label`, then `property`, each
/// with the field type's own `Ord`; `decode_schemas` sorts decoded rows by it.
/// The encoder orders rows with `schema_wire_cmp` instead, which compares the
/// label and property through `as_str()`.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
Ord,
PartialEq,
PartialOrd,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct SchemaKey {
/// Whether the row came from the node or the edge property index.
pub entity: SchemaEntityKind,
/// First component of the index map key the row was built from.
pub label: DbString,
/// Second component of the index map key the row was built from.
pub property: DbString,
}
/// Value half of a `CORE/SCMA` row.
///
/// `encode_schemas` fills `kind` from the index entry's `kind()` and `name`
/// from a clone of the entry's `name` field.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
PartialEq,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct SchemaEntry {
/// Typed index kind reported by the index entry.
pub kind: TypedIndexKind,
/// Name carried by the index entry, when it has one.
pub name: Option<DbString>,
}
/// Version tag of the `CORE/SCMA` section: `encode_schemas` writes it as the
/// payload's first byte and `decode_schemas` rejects any other value.
pub(in crate::core_provider) const SCMA_VERSION: u8 = 3;
/// Key half of a `CORE/CPIX` row: a label plus the properties of one
/// composite index registration.
///
/// `validate_composite_schema_rows` requires at least two properties, no
/// repeated property within a key, and no two rows whose label and sorted
/// property list are equal. The derived `Ord` (label, then `properties`) is
/// what `decode_composite_schemas` sorts by.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
Ord,
PartialEq,
PartialOrd,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct CompositeSchemaKey {
/// First component of the composite index map key.
pub label: DbString,
/// Collected from the index entry's `declared_properties`, in that
/// collection's iteration order.
pub properties: Vec<DbString>,
}
/// Value half of a `CORE/CPIX` row.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
PartialEq,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct CompositeSchemaEntry {
/// Collected from the index entry's `kinds()`. Validation requires the same
/// length as the key's `properties`; presumably `kinds[i]` describes
/// `properties[i]` — TODO confirm against the index implementation.
pub kinds: Vec<TypedIndexKind>,
/// Name carried by the index entry, when it has one.
pub name: Option<DbString>,
}
/// Key half of a `CORE/VIDX` row: the (label, property) pair a vector index
/// is registered under.
///
/// The derived `Ord` compares `label` then `property`;
/// `decode_vector_schemas` sorts decoded rows by it, while the encoder uses
/// `vector_schema_wire_cmp` (comparison through `as_str()`).
#[derive(
Clone,
Debug,
Deserialize,
Eq,
Ord,
PartialEq,
PartialOrd,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct VectorSchemaKey {
/// First component of the vector index map key.
pub label: DbString,
/// Second component of the vector index map key.
pub property: DbString,
}
/// Value half of a `CORE/VIDX` row, describing one vector index.
///
/// The per-field constraints below are the ones enforced by
/// `validate_vector_schema_rows` when a section is decoded.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
PartialEq,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct VectorSchemaEntry {
/// Index kind reported by the index entry's `kind()`.
pub kind: VectorIndexKind,
/// Vector dimension; a value of zero is rejected on decode.
pub dimension: u32,
/// Must be `Some` exactly when `kind.hnsw_metric()` is `Some`. A present
/// config is rejected when `max_neighbors` or `ef_construction` is zero, or
/// when `ef_construction < max_neighbors`.
pub hnsw_config: Option<HnswIndexConfig>,
/// Only accepted when `kind.ivf_metric()` is `Some`, and may be `None` even
/// then. A present config must have `target_centroids` in
/// `1..=MAX_IVF_TARGET_CENTROIDS`.
pub ivf_config: Option<IvfIndexConfig>,
/// Name carried by the index entry, when it has one.
pub name: Option<DbString>,
}
/// Key half of a `CORE/TIDX` row: the (label, property) pair a text index is
/// registered under.
///
/// The derived `Ord` compares `label` then `property`; `decode_text_schemas`
/// sorts decoded rows by it, while the encoder uses `text_schema_wire_cmp`
/// (comparison through `as_str()`).
#[derive(
Clone,
Debug,
Deserialize,
Eq,
Ord,
PartialEq,
PartialOrd,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct TextSchemaKey {
/// First component of the text index map key.
pub label: DbString,
/// Second component of the text index map key.
pub property: DbString,
}
/// Value half of a `CORE/TIDX` row; the only persisted attribute of a text
/// index is its optional name.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
PartialEq,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct TextSchemaEntry {
/// Name carried by the index entry, when it has one.
pub name: Option<DbString>,
}
/// Serializes every single-property index registration on `graph` into the
/// `CORE/SCMA` section payload.
///
/// Rows are gathered from `graph.property_index` (tagged
/// `SchemaEntityKind::Node`) and `graph.edge_property_index` (tagged
/// `SchemaEntityKind::Edge`), ordered with `schema_wire_cmp`, and encoded with
/// `encode_rkyv`. The returned buffer is the `SCMA_VERSION` byte followed by
/// the encoded rows.
///
/// # Errors
///
/// Propagates any error from `encode_rkyv` or from
/// `ensure_section_within_cap`, which is given the full payload length
/// including the version byte.
pub(in crate::core_provider) fn encode_schemas(
    graph: &SeleneGraph,
) -> Result<Vec<u8>, crate::ProviderError> {
    let node_rows = graph
        .property_index
        .iter()
        .map(|((label, property), entry)| {
            (
                SchemaKey {
                    entity: SchemaEntityKind::Node,
                    label: label.clone(),
                    property: property.clone(),
                },
                SchemaEntry {
                    kind: entry.kind(),
                    name: entry.name.clone(),
                },
            )
        });
    let edge_rows = graph
        .edge_property_index
        .iter()
        .map(|((label, property), entry)| {
            (
                SchemaKey {
                    entity: SchemaEntityKind::Edge,
                    label: label.clone(),
                    property: property.clone(),
                },
                SchemaEntry {
                    kind: entry.kind(),
                    name: entry.name.clone(),
                },
            )
        });

    let mut rows: Vec<(SchemaKey, SchemaEntry)> = node_rows.chain(edge_rows).collect();
    // Wire order is by string content so the encoded bytes do not depend on
    // the iteration order of the in-memory indexes.
    rows.sort_by(schema_wire_cmp);

    let body = encode_rkyv(&rows, "CORE/SCMA")?;
    // Reserve room for the version byte and the whole body in one allocation
    // instead of growing a one-byte buffer.
    let mut payload = Vec::with_capacity(body.len() + 1);
    payload.push(SCMA_VERSION);
    payload.extend(body);
    // The cap is checked on the final payload, i.e. including the version byte.
    ensure_section_within_cap("CORE/SCMA", payload.len())?;
    Ok(payload)
}
/// Parses a `CORE/SCMA` section payload back into schema rows.
///
/// The first byte must equal `SCMA_VERSION`; the remaining bytes are handed to
/// `decode_rkyv`. Decoded rows are re-sorted by `SchemaKey`'s derived `Ord`
/// and then checked with `validate_sorted_unique`.
///
/// # Errors
///
/// Returns an `invalid_payload` error when `bytes` is empty or carries a
/// different version byte, and propagates errors from `decode_rkyv` and
/// `validate_sorted_unique`.
pub(in crate::core_provider) fn decode_schemas(
    bytes: &[u8],
) -> Result<Vec<(SchemaKey, SchemaEntry)>, crate::ProviderError> {
    let (version, body) = match bytes.split_first() {
        Some((&tag, tail)) => (tag, tail),
        None => {
            return Err(invalid_payload(
                "CORE/SCMA section is empty (missing version byte)".to_owned(),
            ));
        }
    };
    if version != SCMA_VERSION {
        return Err(invalid_payload(format!(
            "CORE/SCMA section version {version} is unsupported (expected {SCMA_VERSION})"
        )));
    }

    let mut rows: Vec<(SchemaKey, SchemaEntry)> = decode_rkyv(body, "CORE/SCMA")?;
    // The wire order is string-based; re-sort by the key's own `Ord` before
    // running the sorted/unique validation.
    rows.sort_unstable_by(|a, b| a.0.cmp(&b.0));
    validate_sorted_unique(&rows, "CORE/SCMA")?;
    Ok(rows)
}
/// Wire-order comparator for `CORE/SCMA` rows.
///
/// Compares the entity kind first, then the label and the property by their
/// `as_str()` contents. The value half of the row is ignored.
fn schema_wire_cmp<V>(lhs: &(SchemaKey, V), rhs: &(SchemaKey, V)) -> std::cmp::Ordering {
    let (left, right) = (&lhs.0, &rhs.0);
    left.entity
        .cmp(&right.entity)
        .then_with(|| left.label.as_str().cmp(right.label.as_str()))
        .then_with(|| left.property.as_str().cmp(right.property.as_str()))
}
/// Serializes every composite index registration on `graph` into the
/// `CORE/CPIX` section payload.
///
/// Only the label part of each map key is used; the property list comes from
/// the entry's `declared_properties` and the kinds from its `kinds()`. Rows
/// are ordered with `composite_schema_wire_cmp` before being passed to
/// `encode_rkyv`.
///
/// # Errors
///
/// Propagates any error from `encode_rkyv`.
pub(in crate::core_provider) fn encode_composite_schemas(
    graph: &SeleneGraph,
) -> Result<Vec<u8>, crate::ProviderError> {
    let mut rows: Vec<(CompositeSchemaKey, CompositeSchemaEntry)> = graph
        .composite_property_index
        .iter()
        .map(|((label, _), entry)| {
            let key = CompositeSchemaKey {
                label: label.clone(),
                properties: entry.declared_properties.iter().cloned().collect(),
            };
            let value = CompositeSchemaEntry {
                kinds: entry.kinds().iter().copied().collect(),
                name: entry.name.clone(),
            };
            (key, value)
        })
        .collect();

    rows.sort_by(composite_schema_wire_cmp);
    encode_rkyv(&rows, "CORE/CPIX")
}
/// Parses a `CORE/CPIX` section payload back into composite schema rows.
///
/// Rows are validated with `validate_composite_schema_rows` in decoded order
/// and then sorted by `CompositeSchemaKey`'s derived `Ord`.
///
/// # Errors
///
/// Propagates errors from `decode_rkyv` and from
/// `validate_composite_schema_rows`.
pub(in crate::core_provider) fn decode_composite_schemas(
    bytes: &[u8],
) -> Result<Vec<(CompositeSchemaKey, CompositeSchemaEntry)>, crate::ProviderError> {
    let mut rows: Vec<(CompositeSchemaKey, CompositeSchemaEntry)> =
        decode_rkyv(bytes, "CORE/CPIX")?;
    validate_composite_schema_rows(&rows)?;
    rows.sort_unstable_by(|(left_key, _), (right_key, _)| left_key.cmp(right_key));
    Ok(rows)
}
/// Wire-order comparator for `CORE/CPIX` rows.
///
/// Compares labels by their `as_str()` contents and, when they are equal,
/// compares the property lists lexicographically by each property's
/// `as_str()` contents. The value half of the row is ignored.
fn composite_schema_wire_cmp(
    lhs: &(CompositeSchemaKey, CompositeSchemaEntry),
    rhs: &(CompositeSchemaKey, CompositeSchemaEntry),
) -> std::cmp::Ordering {
    let (left, right) = (&lhs.0, &rhs.0);
    match left.label.as_str().cmp(right.label.as_str()) {
        std::cmp::Ordering::Equal => {
            let left_names = left.properties.iter().map(|name| name.as_str());
            let right_names = right.properties.iter().map(|name| name.as_str());
            left_names.cmp(right_names)
        }
        unequal => unequal,
    }
}
/// Serializes every vector index registration on `graph` into the
/// `CORE/VIDX` section payload.
///
/// Each row records the entry's `kind()`, `dimension()`, `hnsw_config()`,
/// `ivf_config()` and a clone of its `name`. Rows are ordered with
/// `vector_schema_wire_cmp` before being passed to `encode_rkyv`.
///
/// # Errors
///
/// Propagates any error from `encode_rkyv`.
pub(in crate::core_provider) fn encode_vector_schemas(
    graph: &SeleneGraph,
) -> Result<Vec<u8>, crate::ProviderError> {
    let mut rows: Vec<(VectorSchemaKey, VectorSchemaEntry)> = graph
        .vector_index
        .iter()
        .map(|((label, property), entry)| {
            let key = VectorSchemaKey {
                label: label.clone(),
                property: property.clone(),
            };
            let value = VectorSchemaEntry {
                kind: entry.kind(),
                dimension: entry.dimension(),
                hnsw_config: entry.hnsw_config(),
                ivf_config: entry.ivf_config(),
                name: entry.name.clone(),
            };
            (key, value)
        })
        .collect();

    rows.sort_by(vector_schema_wire_cmp);
    encode_rkyv(&rows, "CORE/VIDX")
}
/// Parses a `CORE/VIDX` section payload back into vector schema rows.
///
/// Decoded rows are sorted by `VectorSchemaKey`'s derived `Ord` and then
/// checked with `validate_vector_schema_rows`.
///
/// # Errors
///
/// Propagates errors from `decode_rkyv` and from
/// `validate_vector_schema_rows`.
pub(in crate::core_provider) fn decode_vector_schemas(
    bytes: &[u8],
) -> Result<Vec<(VectorSchemaKey, VectorSchemaEntry)>, crate::ProviderError> {
    let mut rows: Vec<(VectorSchemaKey, VectorSchemaEntry)> = decode_rkyv(bytes, "CORE/VIDX")?;
    // Validation includes a sorted/unique check, so the sort has to come first.
    rows.sort_unstable_by(|a, b| a.0.cmp(&b.0));
    validate_vector_schema_rows(&rows).map(|()| rows)
}
/// Serializes every text index registration on `graph` into the `CORE/TIDX`
/// section payload.
///
/// Each row pairs the (label, property) map key with a clone of the entry's
/// `name`. Rows are ordered with `text_schema_wire_cmp` before being passed
/// to `encode_rkyv`.
///
/// # Errors
///
/// Propagates any error from `encode_rkyv`.
pub(in crate::core_provider) fn encode_text_schemas(
    graph: &SeleneGraph,
) -> Result<Vec<u8>, crate::ProviderError> {
    let mut rows: Vec<(TextSchemaKey, TextSchemaEntry)> = graph
        .text_index
        .iter()
        .map(|((label, property), entry)| {
            let key = TextSchemaKey {
                label: label.clone(),
                property: property.clone(),
            };
            let value = TextSchemaEntry {
                name: entry.name.clone(),
            };
            (key, value)
        })
        .collect();

    rows.sort_by(text_schema_wire_cmp);
    encode_rkyv(&rows, "CORE/TIDX")
}
/// Parses a `CORE/TIDX` section payload back into text schema rows.
///
/// Decoded rows are sorted by `TextSchemaKey`'s derived `Ord` and then
/// checked with `validate_sorted_unique`.
///
/// # Errors
///
/// Propagates errors from `decode_rkyv` and from `validate_sorted_unique`.
pub(in crate::core_provider) fn decode_text_schemas(
    bytes: &[u8],
) -> Result<Vec<(TextSchemaKey, TextSchemaEntry)>, crate::ProviderError> {
    let mut rows: Vec<(TextSchemaKey, TextSchemaEntry)> = decode_rkyv(bytes, "CORE/TIDX")?;
    rows.sort_unstable_by(|a, b| a.0.cmp(&b.0));
    validate_sorted_unique(&rows, "CORE/TIDX")?;
    Ok(rows)
}
/// Wire-order comparator for `CORE/VIDX` rows.
///
/// Compares the label and then the property by their `as_str()` contents.
/// The value half of the row is ignored.
fn vector_schema_wire_cmp(
    lhs: &(VectorSchemaKey, VectorSchemaEntry),
    rhs: &(VectorSchemaKey, VectorSchemaEntry),
) -> std::cmp::Ordering {
    let (left, right) = (&lhs.0, &rhs.0);
    left.label
        .as_str()
        .cmp(right.label.as_str())
        .then_with(|| left.property.as_str().cmp(right.property.as_str()))
}
/// Wire-order comparator for `CORE/TIDX` rows.
///
/// Compares the label and then the property by their `as_str()` contents.
/// The value half of the row is ignored.
fn text_schema_wire_cmp(
    lhs: &(TextSchemaKey, TextSchemaEntry),
    rhs: &(TextSchemaKey, TextSchemaEntry),
) -> std::cmp::Ordering {
    let (left, right) = (&lhs.0, &rhs.0);
    left.label
        .as_str()
        .cmp(right.label.as_str())
        .then_with(|| left.property.as_str().cmp(right.property.as_str()))
}
/// Checks the structural invariants of decoded `CORE/CPIX` rows.
///
/// For each row, in order, the first failing check produces the error:
/// 1. the key lists at least two properties;
/// 2. the entry has exactly one kind per property;
/// 3. no property appears twice in the key;
/// 4. no earlier row has the same label and the same set of properties
///    (property order is ignored for this comparison).
///
/// # Errors
///
/// Returns an `invalid_payload` error describing the first violated rule.
fn validate_composite_schema_rows(
    rows: &[(CompositeSchemaKey, CompositeSchemaEntry)],
) -> Result<(), crate::ProviderError> {
    let mut registered = BTreeSet::new();

    for (key, entry) in rows {
        let property_count = key.properties.len();
        if property_count < 2 {
            return Err(invalid_payload(format!(
                "CORE/CPIX row for label {} has fewer than two properties",
                key.label
            )));
        }

        let kind_count = entry.kinds.len();
        if property_count != kind_count {
            return Err(invalid_payload(format!(
                "CORE/CPIX row for label {} has {} properties but {} kinds",
                key.label, property_count, kind_count
            )));
        }

        // Sorting makes repeated properties adjacent and gives an
        // order-independent identity for the duplicate check below.
        let mut sorted_properties = key.properties.clone();
        sorted_properties.sort_unstable();
        let has_repeat = sorted_properties
            .iter()
            .zip(sorted_properties.iter().skip(1))
            .any(|(current, next)| current == next);
        if has_repeat {
            return Err(invalid_payload(format!(
                "CORE/CPIX row for label {} repeats a property",
                key.label
            )));
        }

        let first_occurrence = registered.insert((key.label.clone(), sorted_properties));
        if !first_occurrence {
            return Err(invalid_payload(format!(
                "CORE/CPIX rows contain duplicate composite registration for label {}",
                key.label
            )));
        }
    }

    Ok(())
}
/// Checks the invariants of decoded `CORE/VIDX` rows.
///
/// The slice is first passed to `validate_sorted_unique`. Then, for each row
/// in order, the first failing check produces the error:
/// 1. `dimension` is non-zero;
/// 2. an HNSW config is present exactly when `kind.hnsw_metric()` is `Some`;
/// 3. an IVF config, if present, belongs to a kind whose `ivf_metric()` is
///    `Some` and has `target_centroids` in `1..=MAX_IVF_TARGET_CENTROIDS`;
/// 4. an HNSW config, if present, has non-zero `max_neighbors` and
///    `ef_construction`, with `ef_construction >= max_neighbors`.
///
/// # Errors
///
/// Propagates the error from `validate_sorted_unique`, or returns an
/// `invalid_payload` error describing the first violated rule.
fn validate_vector_schema_rows(
    rows: &[(VectorSchemaKey, VectorSchemaEntry)],
) -> Result<(), crate::ProviderError> {
    validate_sorted_unique(rows, "CORE/VIDX")?;

    for (key, entry) in rows {
        if entry.dimension == 0 {
            return Err(invalid_payload(format!(
                "CORE/VIDX row for ({}, {}) has zero vector dimension",
                key.label, key.property
            )));
        }

        let expects_hnsw = entry.kind.hnsw_metric().is_some();
        if expects_hnsw != entry.hnsw_config.is_some() {
            return Err(invalid_payload(format!(
                "CORE/VIDX row for ({}, {}) has inconsistent HNSW config",
                key.label, key.property
            )));
        }

        // An IVF-capable kind may omit its config; only a config that is
        // actually present gets range-checked.
        let allows_ivf = entry.kind.ivf_metric().is_some();
        match entry.ivf_config {
            Some(config) if allows_ivf => {
                if config.target_centroids == 0
                    || config.target_centroids > MAX_IVF_TARGET_CENTROIDS
                {
                    return Err(invalid_payload(format!(
                        "CORE/VIDX row for ({}, {}) has invalid IVF config",
                        key.label, key.property
                    )));
                }
            }
            Some(_) => {
                return Err(invalid_payload(format!(
                    "CORE/VIDX row for ({}, {}) has inconsistent IVF config",
                    key.label, key.property
                )));
            }
            None => {}
        }

        let hnsw_out_of_range = entry.hnsw_config.is_some_and(|config| {
            config.max_neighbors == 0
                || config.ef_construction == 0
                || config.ef_construction < config.max_neighbors
        });
        if hnsw_out_of_range {
            return Err(invalid_payload(format!(
                "CORE/VIDX row for ({}, {}) has invalid HNSW config",
                key.label, key.property
            )));
        }
    }

    Ok(())
}