use std::sync::Arc;
use rkyv::{
Place,
rancor::Fallible,
ser::{Allocator, Writer},
vec::{ArchivedVec, VecResolver},
with::{ArchiveWith, DeserializeWith, SerializeWith},
};
use selene_core::{DbString, EdgeId, LabelSet, NodeId, PropertyMap};
use serde::{Deserialize, Serialize};
use crate::{
core_provider::inconsistent,
graph::{GraphMeta, SeleneGraph},
};
use super::codec::{
decode_properties_blob, decode_rkyv, encode_properties_blob, encode_rkyv, validate_ids_unique,
};
/// Marker type for `#[rkyv(with = ArcBytes)]`: lets an `Arc<[u8]>` field be archived as an
/// `ArchivedVec<u8>` and rebuilt into a fresh `Arc<[u8]>` on deserialization.
struct ArcBytes;
/// Archives an `Arc<[u8]>` using the same archived representation as a byte vector.
impl ArchiveWith<Arc<[u8]>> for ArcBytes {
    type Archived = ArchivedVec<u8>;
    type Resolver = VecResolver;

    /// Resolves the archived vector in `out` from the bytes currently held by `field`.
    fn resolve_with(field: &Arc<[u8]>, resolver: Self::Resolver, out: Place<Self::Archived>) {
        // Deref-coerce the `Arc` to the plain byte slice it owns.
        let bytes: &[u8] = field;
        ArchivedVec::<u8>::resolve_from_slice(bytes, resolver, out);
    }
}
/// Serializes the bytes behind an `Arc<[u8]>` as an archived vector.
impl<S: Fallible + Allocator + Writer + ?Sized> SerializeWith<Arc<[u8]>, S> for ArcBytes {
    /// Writes the slice contents through `serializer` and returns the vector resolver.
    fn serialize_with(field: &Arc<[u8]>, serializer: &mut S) -> Result<Self::Resolver, S::Error> {
        // Deref-coerce the `Arc` to the plain byte slice it owns.
        let bytes: &[u8] = field;
        ArchivedVec::<u8>::serialize_from_slice(bytes, serializer)
    }
}
/// Rebuilds an owned `Arc<[u8]>` from an archived byte vector.
impl<D: Fallible + ?Sized> DeserializeWith<ArchivedVec<u8>, Arc<[u8]>, D> for ArcBytes {
    /// Copies the archived bytes into a new `Arc<[u8]>`; the deserializer itself is not used.
    fn deserialize_with(
        archived: &ArchivedVec<u8>,
        _deserializer: &mut D,
    ) -> Result<Arc<[u8]>, D::Error> {
        let bytes: &[u8] = archived.as_slice();
        Ok(Arc::from(bytes))
    }
}
/// Payload stored in the `CORE/META` section.
///
/// Produced by `encode_meta` from a `GraphMeta` plus a caller-supplied sequence number and
/// returned as-is by `decode_meta`. Field order is part of the archived layout; do not reorder.
#[derive(
    Clone,
    Debug,
    Eq,
    PartialEq,
    Deserialize,
    Serialize,
    rkyv::Archive,
    rkyv::Deserialize,
    rkyv::Serialize,
)]
pub struct MetaPayload {
    /// Copied from `GraphMeta::graph_id`.
    pub graph_id: selene_core::GraphId,
    /// Copied from `GraphMeta::generation`.
    pub generation: u64,
    /// Copied from `GraphMeta::next_node_id`.
    pub next_node_id: u64,
    /// Copied from `GraphMeta::next_edge_id`.
    pub next_edge_id: u64,
    /// `Some(0)` when the encoded `GraphMeta` had a bound type, `None` otherwise.
    // NOTE(review): presumably an index into a separately stored type table — confirm.
    pub bound_type_index: Option<u32>,
    /// Sequence number passed to `encode_meta` by the caller.
    pub sequence: u64,
}
/// Runtime form of one node row as read from or written to the `CORE/NODE` section.
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct NodeRow {
    /// Labels attached to the node.
    pub labels: LabelSet,
    /// Decoded property map of the node.
    pub properties: PropertyMap,
    /// Liveness flag; `encode_nodes` fills it from the node store's `is_alive` for the row.
    pub alive: bool,
}
/// Runtime form of one edge row as read from or written to the `CORE/EDGE` section.
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct EdgeRow {
    /// Label of the edge.
    pub label: DbString,
    /// Id of the node stored in the edge store's `source` column.
    pub source: NodeId,
    /// Id of the node stored in the edge store's `target` column.
    pub target: NodeId,
    /// Decoded property map of the edge.
    pub properties: PropertyMap,
    /// Liveness flag; `encode_edges` fills it from the edge store's `is_alive` for the row.
    pub alive: bool,
}
/// Archived (rkyv) form of a [`NodeRow`].
///
/// Identical to the runtime row except that the property map is carried as an already-encoded
/// byte blob. Field order is part of the archived layout; do not reorder.
#[derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
struct NodeArchiveRow {
    /// Labels attached to the node.
    labels: LabelSet,
    /// Output of `encode_properties_blob` for the node's properties.
    #[rkyv(with = ArcBytes)]
    properties_blob: Arc<[u8]>,
    /// Liveness flag carried over from the runtime row.
    alive: bool,
}
impl NodeArchiveRow {
    /// Converts a runtime row into its archive form by encoding the property map into a blob.
    ///
    /// `section` is forwarded to `encode_properties_blob`.
    ///
    /// # Errors
    /// Propagates any error returned by `encode_properties_blob`.
    fn from_runtime(row: NodeRow, section: &'static str) -> Result<Self, crate::ProviderError> {
        let NodeRow {
            labels,
            properties,
            alive,
        } = row;
        let properties_blob = encode_properties_blob(&properties, section)?;
        Ok(Self {
            labels,
            properties_blob,
            alive,
        })
    }

    /// Converts an archive row back into a runtime row by decoding the property blob.
    ///
    /// `section` is forwarded to `decode_properties_blob`.
    ///
    /// # Errors
    /// Propagates any error returned by `decode_properties_blob`.
    fn into_runtime(self, section: &'static str) -> Result<NodeRow, crate::ProviderError> {
        let Self {
            labels,
            properties_blob,
            alive,
        } = self;
        let properties = decode_properties_blob(&properties_blob, section)?;
        Ok(NodeRow {
            labels,
            properties,
            alive,
        })
    }
}
/// Archived (rkyv) form of an [`EdgeRow`].
///
/// Identical to the runtime row except that the property map is carried as an already-encoded
/// byte blob. Field order is part of the archived layout; do not reorder.
#[derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
struct EdgeArchiveRow {
    /// Label of the edge.
    label: DbString,
    /// Id taken from the runtime row's `source` field.
    source: NodeId,
    /// Id taken from the runtime row's `target` field.
    target: NodeId,
    /// Output of `encode_properties_blob` for the edge's properties.
    #[rkyv(with = ArcBytes)]
    properties_blob: Arc<[u8]>,
    /// Liveness flag carried over from the runtime row.
    alive: bool,
}
impl EdgeArchiveRow {
    /// Converts a runtime row into its archive form by encoding the property map into a blob.
    ///
    /// `section` is forwarded to `encode_properties_blob`.
    ///
    /// # Errors
    /// Propagates any error returned by `encode_properties_blob`.
    fn from_runtime(row: EdgeRow, section: &'static str) -> Result<Self, crate::ProviderError> {
        let EdgeRow {
            label,
            source,
            target,
            properties,
            alive,
        } = row;
        let properties_blob = encode_properties_blob(&properties, section)?;
        Ok(Self {
            label,
            source,
            target,
            properties_blob,
            alive,
        })
    }

    /// Converts an archive row back into a runtime row by decoding the property blob.
    ///
    /// `section` is forwarded to `decode_properties_blob`.
    ///
    /// # Errors
    /// Propagates any error returned by `decode_properties_blob`.
    fn into_runtime(self, section: &'static str) -> Result<EdgeRow, crate::ProviderError> {
        let Self {
            label,
            source,
            target,
            properties_blob,
            alive,
        } = self;
        let properties = decode_properties_blob(&properties_blob, section)?;
        Ok(EdgeRow {
            label,
            source,
            target,
            properties,
            alive,
        })
    }
}
/// Encodes the `CORE/META` section from graph metadata and a caller-supplied sequence number.
///
/// The identifier counters, generation and graph id are copied from `meta`.
///
/// # Errors
/// Propagates any error returned by `encode_rkyv`.
pub(in crate::core_provider) fn encode_meta(
    meta: &GraphMeta,
    sequence: u64,
) -> Result<Vec<u8>, crate::ProviderError> {
    // Only the presence of a bound type is recorded; the stored index is always 0.
    // NOTE(review): presumably the bound type lives at slot 0 of another section — confirm.
    let bound_type_index: Option<u32> = meta.bound_type.as_ref().map(|_| 0);

    let payload = MetaPayload {
        graph_id: meta.graph_id,
        generation: meta.generation,
        next_node_id: meta.next_node_id,
        next_edge_id: meta.next_edge_id,
        bound_type_index,
        sequence,
    };
    encode_rkyv(&payload, "CORE/META")
}
/// Decodes a `CORE/META` section back into a [`MetaPayload`].
///
/// # Errors
/// Propagates any error returned by `decode_rkyv`.
pub(in crate::core_provider) fn decode_meta(
    bytes: &[u8],
) -> Result<MetaPayload, crate::ProviderError> {
    let payload: MetaPayload = decode_rkyv(bytes, "CORE/META")?;
    Ok(payload)
}
/// Encodes every row of the graph's node store into the `CORE/NODE` section.
///
/// All rows in `0..node_store.len()` are written, each with its liveness flag, so rows that are
/// not alive are included as well.
///
/// # Errors
/// Returns an `inconsistent` error when a row index does not fit in `u32` or when the labels,
/// properties or `row_to_id` column has no entry for a row. Errors from property encoding and
/// from `encode_rkyv` are propagated.
pub(in crate::core_provider) fn encode_nodes(
    graph: &SeleneGraph,
) -> Result<Vec<u8>, crate::ProviderError> {
    const SECTION: &str = "CORE/NODE";
    let store = &graph.node_store;

    // Builds the `(id, archive row)` pair for a single physical row of the store.
    let archive_row = |row_index: usize| -> Result<_, crate::ProviderError> {
        let row = u32::try_from(row_index).map_err(|_| {
            inconsistent(format!(
                "node row index {row_index} exceeds u32::MAX; core snapshot sections use v1 row indexes"
            ))
        })?;
        let labels = store
            .labels
            .get(row_index)
            .ok_or_else(|| inconsistent(format!("node labels column missing row {row_index}")))?;
        let properties = store.properties.get(row_index).ok_or_else(|| {
            inconsistent(format!("node properties column missing row {row_index}"))
        })?;
        // NOTE(review): the property map is cloned only so `from_runtime` can encode it by
        // reference; a borrowing constructor would avoid this copy.
        let runtime = NodeRow {
            labels: labels.clone(),
            properties: properties.clone(),
            alive: store.is_alive(row),
        };
        let id = store.row_to_id.get(row_index).copied().ok_or_else(|| {
            inconsistent(format!("node row_to_id column missing row {row_index}"))
        })?;
        let archived = NodeArchiveRow::from_runtime(runtime, SECTION)?;
        Ok((id, archived))
    };

    let row_count = store.len();
    let mut rows = Vec::with_capacity(row_count);
    for row_index in 0..row_count {
        rows.push(archive_row(row_index)?);
    }
    encode_rkyv(&rows, SECTION)
}
/// Decodes a `CORE/NODE` section into `(id, row)` pairs in stored order.
///
/// The decoded ids are checked with `validate_ids_unique` (given `NodeId::TOMBSTONE`) before
/// any property blob is decoded.
///
/// # Errors
/// Propagates errors from `decode_rkyv`, `validate_ids_unique` and property-blob decoding;
/// the first failing row aborts the whole decode.
pub(in crate::core_provider) fn decode_nodes(
    bytes: &[u8],
) -> Result<Vec<(NodeId, NodeRow)>, crate::ProviderError> {
    const SECTION: &str = "CORE/NODE";

    let archived: Vec<(NodeId, NodeArchiveRow)> = decode_rkyv(bytes, SECTION)?;
    validate_ids_unique(&archived, NodeId::TOMBSTONE, SECTION)?;

    let mut decoded = Vec::with_capacity(archived.len());
    for (id, archive_row) in archived {
        let runtime = archive_row.into_runtime(SECTION)?;
        decoded.push((id, runtime));
    }
    Ok(decoded)
}
/// Encodes every row of the graph's edge store into the `CORE/EDGE` section.
///
/// All rows in `0..edge_store.len()` are written, each with its liveness flag, so rows that are
/// not alive are included as well.
///
/// # Errors
/// Returns an `inconsistent` error when a row index does not fit in `u32` or when the label,
/// source, target, properties or `row_to_id` column has no entry for a row. Errors from
/// property encoding and from `encode_rkyv` are propagated.
pub(in crate::core_provider) fn encode_edges(
    graph: &SeleneGraph,
) -> Result<Vec<u8>, crate::ProviderError> {
    const SECTION: &str = "CORE/EDGE";
    let store = &graph.edge_store;

    // Builds the `(id, archive row)` pair for a single physical row of the store.
    let archive_row = |row_index: usize| -> Result<_, crate::ProviderError> {
        let row = u32::try_from(row_index).map_err(|_| {
            inconsistent(format!(
                "edge row index {row_index} exceeds u32::MAX; core snapshot sections use v1 row indexes"
            ))
        })?;
        let label = store
            .label
            .get(row_index)
            .ok_or_else(|| inconsistent(format!("edge label column missing row {row_index}")))?;
        let source = store
            .source
            .get(row_index)
            .ok_or_else(|| inconsistent(format!("edge source column missing row {row_index}")))?;
        let target = store
            .target
            .get(row_index)
            .ok_or_else(|| inconsistent(format!("edge target column missing row {row_index}")))?;
        let properties = store.properties.get(row_index).ok_or_else(|| {
            inconsistent(format!("edge properties column missing row {row_index}"))
        })?;
        // NOTE(review): the property map is cloned only so `from_runtime` can encode it by
        // reference; a borrowing constructor would avoid this copy.
        let runtime = EdgeRow {
            label: label.clone(),
            source: *source,
            target: *target,
            properties: properties.clone(),
            alive: store.is_alive(row),
        };
        let id = store.row_to_id.get(row_index).copied().ok_or_else(|| {
            inconsistent(format!("edge row_to_id column missing row {row_index}"))
        })?;
        let archived = EdgeArchiveRow::from_runtime(runtime, SECTION)?;
        Ok((id, archived))
    };

    let row_count = store.len();
    let mut rows = Vec::with_capacity(row_count);
    for row_index in 0..row_count {
        rows.push(archive_row(row_index)?);
    }
    encode_rkyv(&rows, SECTION)
}
/// Decodes a `CORE/EDGE` section into `(id, row)` pairs in stored order.
///
/// The decoded ids are checked with `validate_ids_unique` (given `EdgeId::TOMBSTONE`) before
/// any property blob is decoded.
///
/// # Errors
/// Propagates errors from `decode_rkyv`, `validate_ids_unique` and property-blob decoding;
/// the first failing row aborts the whole decode.
pub(in crate::core_provider) fn decode_edges(
    bytes: &[u8],
) -> Result<Vec<(EdgeId, EdgeRow)>, crate::ProviderError> {
    const SECTION: &str = "CORE/EDGE";

    let archived: Vec<(EdgeId, EdgeArchiveRow)> = decode_rkyv(bytes, SECTION)?;
    validate_ids_unique(&archived, EdgeId::TOMBSTONE, SECTION)?;

    let mut decoded = Vec::with_capacity(archived.len());
    for (id, archive_row) in archived {
        let runtime = archive_row.into_runtime(SECTION)?;
        decoded.push((id, runtime));
    }
    Ok(decoded)
}