use std::collections::BTreeSet;
use std::sync::Arc;
use rkyv::{
Place,
rancor::Fallible,
ser::{Allocator, Writer},
vec::{ArchivedVec, VecResolver},
with::{ArchiveWith, DeserializeWith, SerializeWith},
};
use selene_core::{
DbString, EdgeId, HnswIndexConfig, IvfIndexConfig, LabelSet, NodeId, PropertyMap,
};
use serde::{Deserialize, Serialize};
use crate::core_provider::{inconsistent, invalid_payload};
use crate::graph::{GraphMeta, SeleneGraph};
use crate::typed_index::TypedIndexKind;
use crate::vector_index::{MAX_IVF_TARGET_CENTROIDS, VectorIndexKind};
mod codec;
mod gtyp;
use codec::{
decode_properties_blob, decode_rkyv, encode_properties_blob, encode_rkyv, validate_ids_unique,
validate_sorted_unique,
};
pub(super) use codec::ensure_section_within_cap;
pub(super) use gtyp::{decode_graph_types, encode_graph_types};
/// rkyv `with`-adapter for `Arc<[u8]>` fields.
///
/// The impls in this file archive the shared byte slice as an `ArchivedVec<u8>`
/// and rebuild a fresh `Arc<[u8]>` from the archived bytes on deserialization.
struct ArcBytes;
/// Archives an `Arc<[u8]>` field with the layout of a byte vector.
impl ArchiveWith<Arc<[u8]>> for ArcBytes {
    type Archived = ArchivedVec<u8>;
    type Resolver = VecResolver;

    /// Resolves the archived vector in `out` from the bytes behind `field`.
    fn resolve_with(field: &Arc<[u8]>, resolver: Self::Resolver, out: Place<Self::Archived>) {
        // Borrow the shared allocation as a plain slice; no bytes are copied here.
        let bytes: &[u8] = field;
        ArchivedVec::resolve_from_slice(bytes, resolver, out);
    }
}
/// Serializes an `Arc<[u8]>` field by writing its bytes as an archived vector.
impl<S> SerializeWith<Arc<[u8]>, S> for ArcBytes
where
    S: Fallible + Allocator + Writer + ?Sized,
{
    /// Writes the bytes behind `field` through `serializer` and returns the
    /// resolver for the resulting `ArchivedVec<u8>`.
    fn serialize_with(field: &Arc<[u8]>, serializer: &mut S) -> Result<Self::Resolver, S::Error> {
        let bytes: &[u8] = field;
        ArchivedVec::serialize_from_slice(bytes, serializer)
    }
}
/// Rebuilds an owned `Arc<[u8]>` from an archived byte vector.
impl<D> DeserializeWith<ArchivedVec<u8>, Arc<[u8]>, D> for ArcBytes
where
    D: Fallible + ?Sized,
{
    /// Copies the archived bytes into a newly allocated `Arc<[u8]>`.
    ///
    /// The deserializer is not consulted, so this never returns an error.
    fn deserialize_with(
        field: &ArchivedVec<u8>,
        _deserializer: &mut D,
    ) -> Result<Arc<[u8]>, D::Error> {
        let owned: Arc<[u8]> = field.as_slice().into();
        Ok(owned)
    }
}
/// Payload stored in the `CORE/META` section (written by `encode_meta`, read by
/// `decode_meta`).
///
/// The struct is archived with rkyv, so its field order is part of the encoded
/// layout.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
PartialEq,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct MetaPayload {
/// Copied from `GraphMeta::graph_id` when encoding.
pub graph_id: selene_core::GraphId,
/// Copied from `GraphMeta::generation` when encoding.
pub generation: u64,
/// Copied from `GraphMeta::next_node_id` when encoding.
pub next_node_id: u64,
/// Copied from `GraphMeta::next_edge_id` when encoding.
pub next_edge_id: u64,
/// `encode_meta` writes `Some(0)` when the graph has a bound type and `None`
/// otherwise.
pub bound_type_index: Option<u32>,
/// Sequence value supplied by the caller of `encode_meta`.
pub sequence: u64,
}
/// Runtime form of one node row as produced by `decode_nodes`.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct NodeRow {
/// Labels of the node row.
pub labels: LabelSet,
/// Decoded property map of the node row.
pub properties: PropertyMap,
/// Liveness flag; `encode_nodes` records it from the node store's `is_alive`.
pub alive: bool,
}
/// Runtime form of one edge row as produced by `decode_edges`.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct EdgeRow {
/// Label of the edge row.
pub label: DbString,
/// Source node id of the edge row.
pub source: NodeId,
/// Target node id of the edge row.
pub target: NodeId,
/// Decoded property map of the edge row.
pub properties: PropertyMap,
/// Liveness flag; `encode_edges` records it from the edge store's `is_alive`.
pub alive: bool,
}
/// Archived counterpart of `NodeRow` used inside the `CORE/NODE` section.
///
/// It differs from `NodeRow` only in that the property map is carried as an
/// already-encoded byte blob. Field order is part of the rkyv layout.
#[derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize)]
struct NodeArchiveRow {
labels: LabelSet,
/// Output of `encode_properties_blob`; archived as a byte vector via `ArcBytes`.
#[rkyv(with = ArcBytes)]
properties_blob: Arc<[u8]>,
alive: bool,
}
impl NodeArchiveRow {
    /// Converts a runtime node row into its archived form by encoding the
    /// property map into a blob.
    ///
    /// `section` is forwarded to `encode_properties_blob`.
    ///
    /// # Errors
    /// Returns whatever error `encode_properties_blob` reports.
    fn from_runtime(row: NodeRow, section: &'static str) -> Result<Self, crate::ProviderError> {
        let NodeRow {
            labels,
            properties,
            alive,
        } = row;
        let properties_blob = encode_properties_blob(&properties, section)?;
        Ok(Self {
            labels,
            properties_blob,
            alive,
        })
    }

    /// Converts an archived node row back into its runtime form by decoding
    /// the property blob.
    ///
    /// `section` is forwarded to `decode_properties_blob`.
    ///
    /// # Errors
    /// Returns whatever error `decode_properties_blob` reports.
    fn into_runtime(self, section: &'static str) -> Result<NodeRow, crate::ProviderError> {
        let Self {
            labels,
            properties_blob,
            alive,
        } = self;
        let properties = decode_properties_blob(&properties_blob, section)?;
        Ok(NodeRow {
            labels,
            properties,
            alive,
        })
    }
}
/// Archived counterpart of `EdgeRow` used inside the `CORE/EDGE` section.
///
/// It differs from `EdgeRow` only in that the property map is carried as an
/// already-encoded byte blob. Field order is part of the rkyv layout.
#[derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize)]
struct EdgeArchiveRow {
label: DbString,
source: NodeId,
target: NodeId,
/// Output of `encode_properties_blob`; archived as a byte vector via `ArcBytes`.
#[rkyv(with = ArcBytes)]
properties_blob: Arc<[u8]>,
alive: bool,
}
impl EdgeArchiveRow {
    /// Converts a runtime edge row into its archived form by encoding the
    /// property map into a blob.
    ///
    /// `section` is forwarded to `encode_properties_blob`.
    ///
    /// # Errors
    /// Returns whatever error `encode_properties_blob` reports.
    fn from_runtime(row: EdgeRow, section: &'static str) -> Result<Self, crate::ProviderError> {
        let EdgeRow {
            label,
            source,
            target,
            properties,
            alive,
        } = row;
        let properties_blob = encode_properties_blob(&properties, section)?;
        Ok(Self {
            label,
            source,
            target,
            properties_blob,
            alive,
        })
    }

    /// Converts an archived edge row back into its runtime form by decoding
    /// the property blob.
    ///
    /// `section` is forwarded to `decode_properties_blob`.
    ///
    /// # Errors
    /// Returns whatever error `decode_properties_blob` reports.
    fn into_runtime(self, section: &'static str) -> Result<EdgeRow, crate::ProviderError> {
        let Self {
            label,
            source,
            target,
            properties_blob,
            alive,
        } = self;
        let properties = decode_properties_blob(&properties_blob, section)?;
        Ok(EdgeRow {
            label,
            source,
            target,
            properties,
            alive,
        })
    }
}
/// Key of one single-property index registration in the `CORE/SCMA` section.
///
/// The derived `Ord` is the order `decode_schemas` sorts rows by.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
Ord,
PartialEq,
PartialOrd,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct SchemaKey {
/// Label the index is registered under.
pub label: DbString,
/// Property the index is registered on.
pub property: DbString,
}
/// Value of one single-property index registration in the `CORE/SCMA` section.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
PartialEq,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct SchemaEntry {
/// Index kind, taken from the registration's `kind()` when encoding.
pub kind: TypedIndexKind,
/// Optional name, cloned from the registration when encoding.
pub name: Option<DbString>,
}
/// Version byte that `encode_schemas` writes as the first byte of the
/// `CORE/SCMA` payload; `decode_schemas` rejects any other value.
pub(super) const SCMA_VERSION: u8 = 2;
/// Key of one composite index registration in the `CORE/CPIX` section.
///
/// The derived `Ord` is the order `decode_composite_schemas` sorts rows by.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
Ord,
PartialEq,
PartialOrd,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct CompositeSchemaKey {
/// Label the composite index is registered under.
pub label: DbString,
/// Declared properties of the registration, in the order the registration
/// yields them. Decoding requires at least two, with no repeats.
pub properties: Vec<DbString>,
}
/// Value of one composite index registration in the `CORE/CPIX` section.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
PartialEq,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct CompositeSchemaEntry {
/// Index kinds of the registration. Decoding requires exactly as many kinds
/// as the key has properties.
pub kinds: Vec<TypedIndexKind>,
/// Optional name, cloned from the registration when encoding.
pub name: Option<DbString>,
}
/// Key of one vector index registration in the `CORE/VIDX` section.
///
/// The derived `Ord` is the order `decode_vector_schemas` sorts rows by.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
Ord,
PartialEq,
PartialOrd,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct VectorSchemaKey {
/// Label the vector index is registered under.
pub label: DbString,
/// Property the vector index is registered on.
pub property: DbString,
}
/// Value of one vector index registration in the `CORE/VIDX` section.
///
/// The constraints noted on the fields are the ones enforced by
/// `validate_vector_schema_rows` when a section is decoded.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
PartialEq,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct VectorSchemaEntry {
/// Vector index kind, taken from the registration's `kind()` when encoding.
pub kind: VectorIndexKind,
/// Vector dimension; decoding rejects zero.
pub dimension: u32,
/// HNSW configuration; decoding requires it to be present exactly when
/// `kind.hnsw_metric()` is `Some`.
pub hnsw_config: Option<HnswIndexConfig>,
/// IVF configuration; decoding rejects it when `kind.ivf_metric()` is `None`
/// and accepts its absence otherwise.
pub ivf_config: Option<IvfIndexConfig>,
/// Optional name, cloned from the registration when encoding.
pub name: Option<DbString>,
}
/// Key of one text index registration in the `CORE/TIDX` section.
///
/// The derived `Ord` is the order `decode_text_schemas` sorts rows by.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
Ord,
PartialEq,
PartialOrd,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct TextSchemaKey {
/// Label the text index is registered under.
pub label: DbString,
/// Property the text index is registered on.
pub property: DbString,
}
/// Value of one text index registration in the `CORE/TIDX` section.
#[derive(
Clone,
Debug,
Deserialize,
Eq,
PartialEq,
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Serialize,
)]
pub struct TextSchemaEntry {
/// Optional name, cloned from the registration when encoding.
pub name: Option<DbString>,
}
/// Encodes the `CORE/META` section from the graph metadata and a
/// caller-supplied sequence value.
///
/// `bound_type_index` is written as `Some(0)` when `meta.bound_type` is set and
/// as `None` otherwise.
///
/// # Errors
/// Returns whatever error `encode_rkyv` reports.
pub(super) fn encode_meta(
    meta: &GraphMeta,
    sequence: u64,
) -> Result<Vec<u8>, crate::ProviderError> {
    let bound_type_index = meta.bound_type.as_ref().map(|_| 0);
    let payload = MetaPayload {
        graph_id: meta.graph_id,
        generation: meta.generation,
        next_node_id: meta.next_node_id,
        next_edge_id: meta.next_edge_id,
        bound_type_index,
        sequence,
    };
    encode_rkyv(&payload, "CORE/META")
}
/// Decodes the `CORE/META` section into a `MetaPayload`.
///
/// # Errors
/// Returns whatever error `decode_rkyv` reports.
pub(super) fn decode_meta(bytes: &[u8]) -> Result<MetaPayload, crate::ProviderError> {
    let payload: MetaPayload = decode_rkyv(bytes, "CORE/META")?;
    Ok(payload)
}
/// Encodes the `CORE/NODE` section from the graph's node store.
///
/// Every row index in `0..node_store.len()` is emitted in order as an
/// `(id, NodeArchiveRow)` pair; rows that are not alive are still written, with
/// `alive` set to `false`.
///
/// # Errors
/// Reports an inconsistency when a row index does not fit in `u32` or when the
/// labels, properties or `row_to_id` column has no entry for a row, and
/// propagates failures from property-blob encoding and from `encode_rkyv`.
pub(super) fn encode_nodes(graph: &SeleneGraph) -> Result<Vec<u8>, crate::ProviderError> {
    let store = &graph.node_store;
    let row_count = store.len();
    let mut archived = Vec::with_capacity(row_count);

    for row_index in 0..row_count {
        // The liveness lookup takes a u32 row, so larger indexes cannot be encoded.
        let row = u32::try_from(row_index).map_err(|_| {
            inconsistent(format!(
                "node row index {row_index} exceeds u32::MAX; core snapshot sections use v1 row indexes"
            ))
        })?;

        let labels = store
            .labels
            .get(row_index)
            .ok_or_else(|| inconsistent(format!("node labels column missing row {row_index}")))?;
        let properties = store.properties.get(row_index).ok_or_else(|| {
            inconsistent(format!("node properties column missing row {row_index}"))
        })?;

        let runtime = NodeRow {
            labels: labels.clone(),
            properties: properties.clone(),
            alive: store.is_alive(row),
        };

        let id = store.row_to_id.get(row_index).copied().ok_or_else(|| {
            inconsistent(format!("node row_to_id column missing row {row_index}"))
        })?;

        let archived_row = NodeArchiveRow::from_runtime(runtime, "CORE/NODE")?;
        archived.push((id, archived_row));
    }

    encode_rkyv(&archived, "CORE/NODE")
}
/// Decodes the `CORE/NODE` section into `(NodeId, NodeRow)` pairs, preserving
/// the encoded row order.
///
/// The decoded ids are checked with `validate_ids_unique` (given
/// `NodeId::TOMBSTONE`) before any property blob is decoded.
///
/// # Errors
/// Propagates failures from `decode_rkyv`, from the id validation, and from
/// decoding the first property blob that fails.
pub(super) fn decode_nodes(bytes: &[u8]) -> Result<Vec<(NodeId, NodeRow)>, crate::ProviderError> {
    let archived: Vec<(NodeId, NodeArchiveRow)> = decode_rkyv(bytes, "CORE/NODE")?;
    validate_ids_unique(&archived, NodeId::TOMBSTONE, "CORE/NODE")?;

    let mut decoded = Vec::with_capacity(archived.len());
    for (id, archived_row) in archived {
        let runtime = archived_row.into_runtime("CORE/NODE")?;
        decoded.push((id, runtime));
    }
    Ok(decoded)
}
/// Encodes the `CORE/EDGE` section from the graph's edge store.
///
/// Every row index in `0..edge_store.len()` is emitted in order as an
/// `(id, EdgeArchiveRow)` pair; rows that are not alive are still written, with
/// `alive` set to `false`.
///
/// # Errors
/// Reports an inconsistency when a row index does not fit in `u32` or when the
/// label, source, target, properties or `row_to_id` column has no entry for a
/// row, and propagates failures from property-blob encoding and from
/// `encode_rkyv`.
pub(super) fn encode_edges(graph: &SeleneGraph) -> Result<Vec<u8>, crate::ProviderError> {
    let store = &graph.edge_store;
    let row_count = store.len();
    let mut archived = Vec::with_capacity(row_count);

    for row_index in 0..row_count {
        // The liveness lookup takes a u32 row, so larger indexes cannot be encoded.
        let row = u32::try_from(row_index).map_err(|_| {
            inconsistent(format!(
                "edge row index {row_index} exceeds u32::MAX; core snapshot sections use v1 row indexes"
            ))
        })?;

        let label = store
            .label
            .get(row_index)
            .ok_or_else(|| inconsistent(format!("edge label column missing row {row_index}")))?;
        let source = store
            .source
            .get(row_index)
            .ok_or_else(|| inconsistent(format!("edge source column missing row {row_index}")))?;
        let target = store
            .target
            .get(row_index)
            .ok_or_else(|| inconsistent(format!("edge target column missing row {row_index}")))?;
        let properties = store.properties.get(row_index).ok_or_else(|| {
            inconsistent(format!("edge properties column missing row {row_index}"))
        })?;

        let runtime = EdgeRow {
            label: label.clone(),
            source: *source,
            target: *target,
            properties: properties.clone(),
            alive: store.is_alive(row),
        };

        let id = store.row_to_id.get(row_index).copied().ok_or_else(|| {
            inconsistent(format!("edge row_to_id column missing row {row_index}"))
        })?;

        let archived_row = EdgeArchiveRow::from_runtime(runtime, "CORE/EDGE")?;
        archived.push((id, archived_row));
    }

    encode_rkyv(&archived, "CORE/EDGE")
}
/// Decodes the `CORE/EDGE` section into `(EdgeId, EdgeRow)` pairs, preserving
/// the encoded row order.
///
/// The decoded ids are checked with `validate_ids_unique` (given
/// `EdgeId::TOMBSTONE`) before any property blob is decoded.
///
/// # Errors
/// Propagates failures from `decode_rkyv`, from the id validation, and from
/// decoding the first property blob that fails.
pub(super) fn decode_edges(bytes: &[u8]) -> Result<Vec<(EdgeId, EdgeRow)>, crate::ProviderError> {
    let archived: Vec<(EdgeId, EdgeArchiveRow)> = decode_rkyv(bytes, "CORE/EDGE")?;
    validate_ids_unique(&archived, EdgeId::TOMBSTONE, "CORE/EDGE")?;

    let mut decoded = Vec::with_capacity(archived.len());
    for (id, archived_row) in archived {
        let runtime = archived_row.into_runtime("CORE/EDGE")?;
        decoded.push((id, runtime));
    }
    Ok(decoded)
}
/// Encodes the single-property index registrations as the `CORE/SCMA` section.
///
/// The payload is one `SCMA_VERSION` byte followed by the rkyv encoding of the
/// `(SchemaKey, SchemaEntry)` rows, sorted by the string values of
/// `(label, property)`.
///
/// # Errors
/// Propagates failures from `encode_rkyv` and from `ensure_section_within_cap`,
/// which is consulted with the full payload length (version byte included).
pub(super) fn encode_schemas(graph: &SeleneGraph) -> Result<Vec<u8>, crate::ProviderError> {
    let mut rows: Vec<(SchemaKey, SchemaEntry)> = graph
        .property_index
        .iter()
        .map(|((label, property), entry)| {
            (
                SchemaKey {
                    label: label.clone(),
                    property: property.clone(),
                },
                SchemaEntry {
                    kind: entry.kind(),
                    name: entry.name.clone(),
                },
            )
        })
        .collect();
    rows.sort_by(schema_wire_cmp);

    let body = encode_rkyv(&rows, "CORE/SCMA")?;
    // Reserve room for the version byte plus the whole body up front, so that
    // appending the body never has to grow (and copy) the buffer.
    let mut payload = Vec::with_capacity(body.len() + 1);
    payload.push(SCMA_VERSION);
    payload.extend_from_slice(&body);

    // The version prefix makes the section one byte longer than the rkyv body,
    // so the cap is checked against the final length.
    ensure_section_within_cap("CORE/SCMA", payload.len())?;
    Ok(payload)
}
/// Decodes the `CORE/SCMA` section into `(SchemaKey, SchemaEntry)` rows.
///
/// The first byte must equal `SCMA_VERSION`; the remainder is decoded with
/// rkyv. Rows are re-sorted by the derived `SchemaKey` ordering and then passed
/// to `validate_sorted_unique`.
///
/// # Errors
/// Returns an invalid-payload error for an empty section or an unsupported
/// version byte, and propagates failures from `decode_rkyv` and
/// `validate_sorted_unique`.
pub(super) fn decode_schemas(
    bytes: &[u8],
) -> Result<Vec<(SchemaKey, SchemaEntry)>, crate::ProviderError> {
    let (version, body) = match bytes.split_first() {
        Some((&version, body)) => (version, body),
        None => {
            return Err(invalid_payload(
                "CORE/SCMA section is empty (missing version byte)".to_owned(),
            ));
        }
    };
    if version != SCMA_VERSION {
        return Err(invalid_payload(format!(
            "CORE/SCMA section version {version} is unsupported (expected {SCMA_VERSION})"
        )));
    }

    let mut rows: Vec<(SchemaKey, SchemaEntry)> = decode_rkyv(body, "CORE/SCMA")?;
    // Wire order is by string value; reorder by key `Ord` before validating.
    rows.sort_unstable_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
    validate_sorted_unique(&rows, "CORE/SCMA")?;
    Ok(rows)
}
/// Wire ordering for `CORE/SCMA` rows: by label string, then by property
/// string. The value half of each row is ignored.
fn schema_wire_cmp<V>(lhs: &(SchemaKey, V), rhs: &(SchemaKey, V)) -> std::cmp::Ordering {
    let (left, right) = (&lhs.0, &rhs.0);
    left.label
        .as_str()
        .cmp(right.label.as_str())
        .then_with(|| left.property.as_str().cmp(right.property.as_str()))
}
/// Encodes the composite index registrations as the `CORE/CPIX` section.
///
/// Each registration becomes a `(CompositeSchemaKey, CompositeSchemaEntry)` row
/// built from the map key's label and the entry's declared properties, kinds
/// and name. Rows are sorted with `composite_schema_wire_cmp` before encoding.
///
/// # Errors
/// Returns whatever error `encode_rkyv` reports.
pub(super) fn encode_composite_schemas(
    graph: &SeleneGraph,
) -> Result<Vec<u8>, crate::ProviderError> {
    let mut rows = graph
        .composite_property_index
        .iter()
        .map(|((label, _), entry)| {
            let key = CompositeSchemaKey {
                label: label.clone(),
                properties: entry.declared_properties.iter().cloned().collect(),
            };
            let value = CompositeSchemaEntry {
                kinds: entry.kinds().iter().copied().collect(),
                name: entry.name.clone(),
            };
            (key, value)
        })
        .collect::<Vec<(CompositeSchemaKey, CompositeSchemaEntry)>>();
    rows.sort_by(composite_schema_wire_cmp);
    encode_rkyv(&rows, "CORE/CPIX")
}
/// Decodes the `CORE/CPIX` section into composite index rows.
///
/// Rows are validated with `validate_composite_schema_rows` in their encoded
/// order and then sorted by the derived `CompositeSchemaKey` ordering.
///
/// # Errors
/// Propagates failures from `decode_rkyv` and from the row validation.
pub(super) fn decode_composite_schemas(
    bytes: &[u8],
) -> Result<Vec<(CompositeSchemaKey, CompositeSchemaEntry)>, crate::ProviderError> {
    let mut decoded: Vec<(CompositeSchemaKey, CompositeSchemaEntry)> =
        decode_rkyv(bytes, "CORE/CPIX")?;
    validate_composite_schema_rows(&decoded)?;
    decoded.sort_unstable_by(|(lhs, _), (rhs, _)| lhs.cmp(rhs));
    Ok(decoded)
}
/// Wire ordering for `CORE/CPIX` rows: by label string, then lexicographically
/// by the property strings in their stored order.
fn composite_schema_wire_cmp(
    lhs: &(CompositeSchemaKey, CompositeSchemaEntry),
    rhs: &(CompositeSchemaKey, CompositeSchemaEntry),
) -> std::cmp::Ordering {
    let (left, right) = (&lhs.0, &rhs.0);
    match left.label.as_str().cmp(right.label.as_str()) {
        std::cmp::Ordering::Equal => {
            let left_properties = left.properties.iter().map(|property| property.as_str());
            let right_properties = right.properties.iter().map(|property| property.as_str());
            left_properties.cmp(right_properties)
        }
        by_label => by_label,
    }
}
/// Encodes the vector index registrations as the `CORE/VIDX` section.
///
/// Each registration becomes a `(VectorSchemaKey, VectorSchemaEntry)` row; rows
/// are sorted with `vector_schema_wire_cmp` before encoding.
///
/// # Errors
/// Returns whatever error `encode_rkyv` reports.
pub(super) fn encode_vector_schemas(graph: &SeleneGraph) -> Result<Vec<u8>, crate::ProviderError> {
    let mut rows = graph
        .vector_index
        .iter()
        .map(|((label, property), entry)| {
            let key = VectorSchemaKey {
                label: label.clone(),
                property: property.clone(),
            };
            let value = VectorSchemaEntry {
                kind: entry.kind(),
                dimension: entry.dimension(),
                hnsw_config: entry.hnsw_config(),
                ivf_config: entry.ivf_config(),
                name: entry.name.clone(),
            };
            (key, value)
        })
        .collect::<Vec<(VectorSchemaKey, VectorSchemaEntry)>>();
    rows.sort_by(vector_schema_wire_cmp);
    encode_rkyv(&rows, "CORE/VIDX")
}
/// Decodes the `CORE/VIDX` section into vector index rows.
///
/// Rows are sorted by the derived `VectorSchemaKey` ordering and then checked
/// with `validate_vector_schema_rows`.
///
/// # Errors
/// Propagates failures from `decode_rkyv` and from the row validation.
pub(super) fn decode_vector_schemas(
    bytes: &[u8],
) -> Result<Vec<(VectorSchemaKey, VectorSchemaEntry)>, crate::ProviderError> {
    let mut decoded: Vec<(VectorSchemaKey, VectorSchemaEntry)> = decode_rkyv(bytes, "CORE/VIDX")?;
    decoded.sort_unstable_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
    validate_vector_schema_rows(&decoded)?;
    Ok(decoded)
}
/// Encodes the text index registrations as the `CORE/TIDX` section.
///
/// Each registration becomes a `(TextSchemaKey, TextSchemaEntry)` row; rows are
/// sorted with `text_schema_wire_cmp` before encoding.
///
/// # Errors
/// Returns whatever error `encode_rkyv` reports.
pub(super) fn encode_text_schemas(graph: &SeleneGraph) -> Result<Vec<u8>, crate::ProviderError> {
    let mut rows = graph
        .text_index
        .iter()
        .map(|((label, property), entry)| {
            let key = TextSchemaKey {
                label: label.clone(),
                property: property.clone(),
            };
            let value = TextSchemaEntry {
                name: entry.name.clone(),
            };
            (key, value)
        })
        .collect::<Vec<(TextSchemaKey, TextSchemaEntry)>>();
    rows.sort_by(text_schema_wire_cmp);
    encode_rkyv(&rows, "CORE/TIDX")
}
/// Decodes the `CORE/TIDX` section into text index rows.
///
/// Rows are sorted by the derived `TextSchemaKey` ordering and then passed to
/// `validate_sorted_unique`.
///
/// # Errors
/// Propagates failures from `decode_rkyv` and `validate_sorted_unique`.
pub(super) fn decode_text_schemas(
    bytes: &[u8],
) -> Result<Vec<(TextSchemaKey, TextSchemaEntry)>, crate::ProviderError> {
    let mut decoded: Vec<(TextSchemaKey, TextSchemaEntry)> = decode_rkyv(bytes, "CORE/TIDX")?;
    decoded.sort_unstable_by(|lhs, rhs| lhs.0.cmp(&rhs.0));
    validate_sorted_unique(&decoded, "CORE/TIDX")?;
    Ok(decoded)
}
/// Wire ordering for `CORE/VIDX` rows: by label string, then by property
/// string. The value half of each row is ignored.
fn vector_schema_wire_cmp(
    lhs: &(VectorSchemaKey, VectorSchemaEntry),
    rhs: &(VectorSchemaKey, VectorSchemaEntry),
) -> std::cmp::Ordering {
    let (left, right) = (&lhs.0, &rhs.0);
    left.label
        .as_str()
        .cmp(right.label.as_str())
        .then_with(|| left.property.as_str().cmp(right.property.as_str()))
}
/// Wire ordering for `CORE/TIDX` rows: by label string, then by property
/// string. The value half of each row is ignored.
fn text_schema_wire_cmp(
    lhs: &(TextSchemaKey, TextSchemaEntry),
    rhs: &(TextSchemaKey, TextSchemaEntry),
) -> std::cmp::Ordering {
    let (left, right) = (&lhs.0, &rhs.0);
    left.label
        .as_str()
        .cmp(right.label.as_str())
        .then_with(|| left.property.as_str().cmp(right.property.as_str()))
}
/// Validates decoded `CORE/CPIX` rows.
///
/// Each row must have at least two properties, exactly one kind per property,
/// and no repeated property. Two rows may not register the same label with the
/// same set of properties, regardless of property order.
///
/// # Errors
/// Returns an invalid-payload error describing the first violated rule.
fn validate_composite_schema_rows(
    rows: &[(CompositeSchemaKey, CompositeSchemaEntry)],
) -> Result<(), crate::ProviderError> {
    let mut registered = BTreeSet::new();
    for (key, entry) in rows {
        let property_count = key.properties.len();
        let kind_count = entry.kinds.len();

        if property_count < 2 {
            return Err(invalid_payload(format!(
                "CORE/CPIX row for label {} has fewer than two properties",
                key.label
            )));
        }
        if property_count != kind_count {
            return Err(invalid_payload(format!(
                "CORE/CPIX row for label {} has {} properties but {} kinds",
                key.label, property_count, kind_count
            )));
        }

        // Sorting brings equal properties next to each other and gives an
        // order-independent identity for the duplicate-registration check.
        let mut canonical = key.properties.clone();
        canonical.sort_unstable();
        let has_repeat = canonical
            .iter()
            .zip(canonical.iter().skip(1))
            .any(|(current, next)| current == next);
        if has_repeat {
            return Err(invalid_payload(format!(
                "CORE/CPIX row for label {} repeats a property",
                key.label
            )));
        }

        let newly_registered = registered.insert((key.label.clone(), canonical));
        if !newly_registered {
            return Err(invalid_payload(format!(
                "CORE/CPIX rows contain duplicate composite registration for label {}",
                key.label
            )));
        }
    }
    Ok(())
}
/// Validates decoded `CORE/VIDX` rows.
///
/// The rows are first passed to `validate_sorted_unique`. Then, per row and in
/// this order:
/// - the dimension must be non-zero;
/// - an HNSW config must be present exactly when the kind has an HNSW metric;
/// - an IVF config, when present, requires the kind to have an IVF metric and
///   a `target_centroids` in `1..=MAX_IVF_TARGET_CENTROIDS`;
/// - an HNSW config must have non-zero `max_neighbors` and `ef_construction`,
///   with `ef_construction >= max_neighbors`.
///
/// # Errors
/// Returns an invalid-payload error for the first violated rule, or the error
/// from `validate_sorted_unique`.
fn validate_vector_schema_rows(
    rows: &[(VectorSchemaKey, VectorSchemaEntry)],
) -> Result<(), crate::ProviderError> {
    validate_sorted_unique(rows, "CORE/VIDX")?;

    for (key, entry) in rows {
        if entry.dimension == 0 {
            return Err(invalid_payload(format!(
                "CORE/VIDX row for ({}, {}) has zero vector dimension",
                key.label, key.property
            )));
        }

        let kind_uses_hnsw = entry.kind.hnsw_metric().is_some();
        if kind_uses_hnsw != entry.hnsw_config.is_some() {
            return Err(invalid_payload(format!(
                "CORE/VIDX row for ({}, {}) has inconsistent HNSW config",
                key.label, key.property
            )));
        }

        // NOTE(review): unlike HNSW, an IVF kind without an IVF config is
        // accepted here. Presumably intentional; confirm against the writers.
        let kind_uses_ivf = entry.kind.ivf_metric().is_some();
        match (kind_uses_ivf, entry.ivf_config) {
            (true, Some(config))
                if config.target_centroids == 0
                    || config.target_centroids > MAX_IVF_TARGET_CENTROIDS =>
            {
                return Err(invalid_payload(format!(
                    "CORE/VIDX row for ({}, {}) has invalid IVF config",
                    key.label, key.property
                )));
            }
            (false, Some(_)) => {
                return Err(invalid_payload(format!(
                    "CORE/VIDX row for ({}, {}) has inconsistent IVF config",
                    key.label, key.property
                )));
            }
            _ => {}
        }

        if let Some(config) = entry.hnsw_config {
            let out_of_range = config.max_neighbors == 0
                || config.ef_construction == 0
                || config.ef_construction < config.max_neighbors;
            if out_of_range {
                return Err(invalid_payload(format!(
                    "CORE/VIDX row for ({}, {}) has invalid HNSW config",
                    key.label, key.property
                )));
            }
        }
    }
    Ok(())
}