use crate::constants::*;
use crate::error::{KiteError, Result};
use crate::types::*;
use crate::util::binary::*;
use crate::util::compression::{maybe_compress, CompressionOptions, CompressionType};
use crate::util::crc::crc32c;
use crate::util::hash::xxhash64_string;
use std::collections::HashMap;
#[derive(Debug, Clone)]
pub struct NodeData {
pub node_id: NodeId,
pub key: Option<String>,
pub labels: Vec<LabelId>,
pub props: HashMap<PropKeyId, PropValue>,
}
#[derive(Debug, Clone)]
pub struct EdgeData {
pub src: NodeId,
pub etype: ETypeId,
pub dst: NodeId,
pub props: HashMap<PropKeyId, PropValue>,
}
#[derive(Debug)]
pub struct SnapshotBuildInput {
pub generation: u64,
pub nodes: Vec<NodeData>,
pub edges: Vec<EdgeData>,
pub labels: HashMap<LabelId, String>,
pub etypes: HashMap<ETypeId, String>,
pub propkeys: HashMap<PropKeyId, String>,
pub compression: Option<CompressionOptions>,
}
struct StringTable {
strings: Vec<String>,
string_to_id: HashMap<String, StringId>,
}
impl StringTable {
fn new() -> Self {
let mut table = Self {
strings: vec![String::new()], string_to_id: HashMap::new(),
};
table.string_to_id.insert(String::new(), 0);
table
}
fn intern(&mut self, s: &str) -> StringId {
if let Some(&id) = self.string_to_id.get(s) {
return id;
}
let id = self.strings.len() as StringId;
self.strings.push(s.to_string());
self.string_to_id.insert(s.to_string(), id);
id
}
fn len(&self) -> usize {
self.strings.len()
}
}
struct CSRData {
offsets: Vec<u32>,
dst: Vec<u32>,
etype: Vec<u32>,
out_index: Option<Vec<u32>>,
}
fn build_out_edges_csr(
nodes: &[NodeData],
edges: &[EdgeData],
node_id_to_phys: &HashMap<NodeId, PhysNode>,
) -> CSRData {
let num_nodes = nodes.len();
let num_edges = edges.len();
let mut counts = vec![0u32; num_nodes];
for edge in edges {
if let Some(&src_phys) = node_id_to_phys.get(&edge.src) {
counts[src_phys as usize] += 1;
}
}
let mut offsets = vec![0u32; num_nodes + 1];
for i in 0..num_nodes {
offsets[i + 1] = offsets[i] + counts[i];
}
let mut dst_arr = vec![0u32; num_edges];
let mut etype_arr = vec![0u32; num_edges];
let mut edges_by_node: HashMap<PhysNode, Vec<(ETypeId, PhysNode)>> = HashMap::new();
for edge in edges {
if let (Some(&src_phys), Some(&dst_phys)) = (
node_id_to_phys.get(&edge.src),
node_id_to_phys.get(&edge.dst),
) {
edges_by_node
.entry(src_phys)
.or_default()
.push((edge.etype, dst_phys));
}
}
for (src_phys, mut node_edges) in edges_by_node {
node_edges.sort_by(|a, b| {
if a.0 != b.0 {
a.0.cmp(&b.0)
} else {
a.1.cmp(&b.1)
}
});
let mut pos = offsets[src_phys as usize] as usize;
for (etype, dst_phys) in node_edges {
dst_arr[pos] = dst_phys;
etype_arr[pos] = etype;
pos += 1;
}
}
CSRData {
offsets,
dst: dst_arr,
etype: etype_arr,
out_index: None,
}
}
fn build_in_edges_csr(nodes: &[NodeData], out_csr: &CSRData) -> CSRData {
let num_nodes = nodes.len();
let num_edges = out_csr.dst.len();
let mut counts = vec![0u32; num_nodes];
for &dst in &out_csr.dst {
counts[dst as usize] += 1;
}
let mut offsets = vec![0u32; num_nodes + 1];
for i in 0..num_nodes {
offsets[i + 1] = offsets[i] + counts[i];
}
let mut src_arr = vec![0u32; num_edges];
let mut etype_arr = vec![0u32; num_edges];
let mut out_index = vec![0u32; num_edges];
let mut in_edges_by_node: HashMap<PhysNode, Vec<(PhysNode, ETypeId, u32)>> = HashMap::new();
for src_phys in 0..num_nodes {
let start = out_csr.offsets[src_phys] as usize;
let end = out_csr.offsets[src_phys + 1] as usize;
for out_idx in start..end {
let dst_phys = out_csr.dst[out_idx];
let edge_etype = out_csr.etype[out_idx];
in_edges_by_node.entry(dst_phys).or_default().push((
src_phys as PhysNode,
edge_etype,
out_idx as u32,
));
}
}
for (dst_phys, mut node_in_edges) in in_edges_by_node {
node_in_edges.sort_by(|a, b| {
if a.1 != b.1 {
a.1.cmp(&b.1)
} else {
a.0.cmp(&b.0)
}
});
let mut pos = offsets[dst_phys as usize] as usize;
for (src_phys, etype, out_idx) in node_in_edges {
src_arr[pos] = src_phys;
etype_arr[pos] = etype;
out_index[pos] = out_idx;
pos += 1;
}
}
CSRData {
offsets,
dst: src_arr, etype: etype_arr,
out_index: Some(out_index),
}
}
struct KeyEntry {
hash64: u64,
string_id: StringId,
node_id: NodeId,
}
struct KeyIndexData {
entries: Vec<KeyEntry>,
buckets: Vec<u32>,
}
fn build_key_index(nodes: &[NodeData], node_key_strings: &[StringId]) -> KeyIndexData {
let mut raw_entries: Vec<KeyEntry> = Vec::new();
for (i, node) in nodes.iter().enumerate() {
if let Some(ref key) = node.key {
let string_id = node_key_strings[i];
raw_entries.push(KeyEntry {
hash64: xxhash64_string(key),
string_id,
node_id: node.node_id,
});
}
}
let num_buckets = std::cmp::max(16, raw_entries.len() * 2);
let mut buckets = vec![0u32; num_buckets + 1];
if raw_entries.is_empty() {
return KeyIndexData {
entries: raw_entries,
buckets,
};
}
let num_buckets_u64 = num_buckets as u64;
raw_entries.sort_by(|a, b| {
let a_bucket = (a.hash64 % num_buckets_u64) as usize;
let b_bucket = (b.hash64 % num_buckets_u64) as usize;
if a_bucket != b_bucket {
return a_bucket.cmp(&b_bucket);
}
if a.hash64 != b.hash64 {
return a.hash64.cmp(&b.hash64);
}
if a.string_id != b.string_id {
return a.string_id.cmp(&b.string_id);
}
a.node_id.cmp(&b.node_id)
});
let mut counts = vec![0u32; num_buckets];
for entry in &raw_entries {
let bucket = (entry.hash64 % num_buckets_u64) as usize;
counts[bucket] += 1;
}
for i in 0..num_buckets {
buckets[i + 1] = buckets[i] + counts[i];
}
KeyIndexData {
entries: raw_entries,
buckets,
}
}
struct VectorTable {
offsets: Vec<u64>,
data: Vec<u8>,
}
impl VectorTable {
fn new() -> Self {
Self {
offsets: vec![0],
data: Vec::new(),
}
}
fn push(&mut self, vec: &[f32]) -> u64 {
for v in vec {
self.data.extend_from_slice(&v.to_le_bytes());
}
let offset = self.data.len() as u64;
self.offsets.push(offset);
(self.offsets.len() - 2) as u64
}
fn is_empty(&self) -> bool {
self.offsets.len() <= 1
}
}
fn encode_prop_value(
value: &PropValue,
string_table: &StringTable,
vectors: &mut VectorTable,
) -> (u8, u64) {
match value {
PropValue::Null => (PropValueTag::Null as u8, 0),
PropValue::Bool(b) => (PropValueTag::Bool as u8, if *b { 1 } else { 0 }),
PropValue::I64(v) => (PropValueTag::I64 as u8, *v as u64),
PropValue::F64(v) => (PropValueTag::F64 as u8, v.to_bits()),
PropValue::String(s) => {
let string_id = string_table.string_to_id.get(s).copied().unwrap_or(0);
(PropValueTag::String as u8, string_id as u64)
}
PropValue::VectorF32(vec) => (PropValueTag::VectorF32 as u8, vectors.push(vec)),
}
}
struct SectionData {
id: SectionId,
data: Vec<u8>,
compression: CompressionType,
uncompressed_size: u32,
}
struct SnapshotBuildState {
phys_to_node_id: Vec<NodeId>,
node_id_to_phys: HashMap<NodeId, PhysNode>,
max_node_id: NodeId,
string_table: StringTable,
label_string_ids: Vec<StringId>,
etype_string_ids: Vec<StringId>,
propkey_string_ids: Vec<StringId>,
node_key_strings: Vec<StringId>,
out_csr: CSRData,
in_csr: CSRData,
key_index: KeyIndexData,
node_label_offsets: Vec<u32>,
node_label_ids: Vec<u32>,
has_properties: bool,
}
fn build_node_id_maps(nodes: &[NodeData]) -> (Vec<NodeId>, HashMap<NodeId, PhysNode>, NodeId) {
let phys_to_node_id: Vec<NodeId> = nodes.iter().map(|n| n.node_id).collect();
let mut node_id_to_phys: HashMap<NodeId, PhysNode> = HashMap::new();
let mut max_node_id: NodeId = 0;
for (i, node) in nodes.iter().enumerate() {
node_id_to_phys.insert(node.node_id, i as PhysNode);
if node.node_id > max_node_id {
max_node_id = node.node_id;
}
}
(phys_to_node_id, node_id_to_phys, max_node_id)
}
fn validate_edge_nodes(
edges: &[EdgeData],
node_id_to_phys: &HashMap<NodeId, PhysNode>,
) -> Result<()> {
for edge in edges {
let src_missing = !node_id_to_phys.contains_key(&edge.src);
let dst_missing = !node_id_to_phys.contains_key(&edge.dst);
if src_missing || dst_missing {
return Err(KiteError::InvalidSnapshot(format!(
"Edge references missing node(s): src={}, dst={}",
edge.src, edge.dst
)));
}
}
Ok(())
}
fn intern_name_table<'a, F>(
count: usize,
mut lookup: F,
string_table: &mut StringTable,
) -> Vec<StringId>
where
F: FnMut(usize) -> Option<&'a str>,
{
let mut ids: Vec<StringId> = vec![0];
for i in 1..=count {
let name = lookup(i);
ids.push(if let Some(n) = name {
string_table.intern(n)
} else {
0
});
}
ids
}
fn build_node_key_strings(nodes: &[NodeData], string_table: &mut StringTable) -> Vec<StringId> {
nodes
.iter()
.map(|node| {
node
.key
.as_ref()
.map(|k| string_table.intern(k))
.unwrap_or(0)
})
.collect()
}
fn intern_string_props(nodes: &[NodeData], edges: &[EdgeData], string_table: &mut StringTable) {
for node in nodes {
let mut sorted_props: Vec<_> = node.props.iter().collect();
sorted_props.sort_by_key(|(k, _)| *k);
for (_, value) in sorted_props {
if let PropValue::String(s) = value {
string_table.intern(s);
}
}
}
for edge in edges {
let mut sorted_props: Vec<_> = edge.props.iter().collect();
sorted_props.sort_by_key(|(k, _)| *k);
for (_, value) in sorted_props {
if let PropValue::String(s) = value {
string_table.intern(s);
}
}
}
}
fn build_node_labels(nodes: &[NodeData]) -> (Vec<u32>, Vec<u32>) {
let mut node_label_offsets: Vec<u32> = Vec::with_capacity(nodes.len() + 1);
let mut node_label_ids: Vec<u32> = Vec::new();
node_label_offsets.push(0);
for node in nodes {
let mut labels = node.labels.clone();
labels.sort_unstable();
labels.dedup();
node_label_ids.extend(labels.iter().copied());
node_label_offsets.push(node_label_ids.len() as u32);
}
(node_label_offsets, node_label_ids)
}
fn prepare_snapshot_state(
nodes: &[NodeData],
edges: &[EdgeData],
labels: &HashMap<LabelId, String>,
etypes: &HashMap<ETypeId, String>,
propkeys: &HashMap<PropKeyId, String>,
) -> Result<SnapshotBuildState> {
let (phys_to_node_id, node_id_to_phys, max_node_id) = build_node_id_maps(nodes);
validate_edge_nodes(edges, &node_id_to_phys)?;
let mut string_table = StringTable::new();
let label_string_ids = intern_name_table(
labels.len(),
|i| labels.get(&(i as LabelId)).map(|s| s.as_str()),
&mut string_table,
);
let etype_string_ids = intern_name_table(
etypes.len(),
|i| etypes.get(&(i as ETypeId)).map(|s| s.as_str()),
&mut string_table,
);
let propkey_string_ids = intern_name_table(
propkeys.len(),
|i| propkeys.get(&(i as PropKeyId)).map(|s| s.as_str()),
&mut string_table,
);
let node_key_strings = build_node_key_strings(nodes, &mut string_table);
let out_csr = build_out_edges_csr(nodes, edges, &node_id_to_phys);
let in_csr = build_in_edges_csr(nodes, &out_csr);
let key_index = build_key_index(nodes, &node_key_strings);
intern_string_props(nodes, edges, &mut string_table);
let has_properties =
nodes.iter().any(|n| !n.props.is_empty()) || edges.iter().any(|e| !e.props.is_empty());
let (node_label_offsets, node_label_ids) = build_node_labels(nodes);
Ok(SnapshotBuildState {
phys_to_node_id,
node_id_to_phys,
max_node_id,
string_table,
label_string_ids,
etype_string_ids,
propkey_string_ids,
node_key_strings,
out_csr,
in_csr,
key_index,
node_label_offsets,
node_label_ids,
has_properties,
})
}
fn encode_u32_slice(values: &[u32]) -> Vec<u8> {
let mut data = vec![0u8; values.len() * 4];
for (i, &value) in values.iter().enumerate() {
write_u32(&mut data, i * 4, value);
}
data
}
fn encode_u64_slice(values: &[u64]) -> Vec<u8> {
let mut data = vec![0u8; values.len() * 8];
for (i, &value) in values.iter().enumerate() {
write_u64(&mut data, i * 8, value);
}
data
}
fn add_basic_sections(
add_section: &mut impl FnMut(SectionId, Vec<u8>),
phys_to_node_id: &[NodeId],
node_id_to_phys: &HashMap<NodeId, PhysNode>,
max_node_id: NodeId,
out_csr: &CSRData,
in_csr: &CSRData,
num_edges: usize,
) {
{
let data = encode_u64_slice(phys_to_node_id);
add_section(SectionId::PhysToNodeId, data);
}
{
let size = (max_node_id + 1) as usize;
let mut data = vec![0u8; size * 4];
for i in 0..size {
write_i32(&mut data, i * 4, -1);
}
for (&node_id, &phys) in node_id_to_phys {
write_i32(&mut data, (node_id as usize) * 4, phys as i32);
}
add_section(SectionId::NodeIdToPhys, data);
}
{
let data = encode_u32_slice(&out_csr.offsets);
add_section(SectionId::OutOffsets, data);
}
{
let data = encode_u32_slice(&out_csr.dst);
add_section(SectionId::OutDst, data);
}
{
let data = encode_u32_slice(&out_csr.etype);
add_section(SectionId::OutEtype, data);
}
{
let data = encode_u32_slice(&in_csr.offsets);
add_section(SectionId::InOffsets, data);
}
{
let data = encode_u32_slice(&in_csr.dst);
add_section(SectionId::InSrc, data);
}
{
let data = encode_u32_slice(&in_csr.etype);
add_section(SectionId::InEtype, data);
}
{
let mut data = vec![0u8; num_edges * 4];
if let Some(ref out_index) = in_csr.out_index {
for (i, &idx) in out_index.iter().enumerate() {
write_u32(&mut data, i * 4, idx);
}
}
add_section(SectionId::InOutIndex, data);
}
}
fn add_string_table_sections(
add_section: &mut impl FnMut(SectionId, Vec<u8>),
string_table: &StringTable,
num_strings: usize,
) {
let encoded_strings: Vec<Vec<u8>> = string_table
.strings
.iter()
.map(|s| s.as_bytes().to_vec())
.collect();
let total_bytes: usize = encoded_strings.iter().map(|s| s.len()).sum();
let mut offsets_data = vec![0u8; (num_strings + 1) * 4];
let mut bytes_data = vec![0u8; total_bytes];
let mut byte_offset = 0usize;
for (i, encoded) in encoded_strings.iter().enumerate() {
write_u32(&mut offsets_data, i * 4, byte_offset as u32);
bytes_data[byte_offset..byte_offset + encoded.len()].copy_from_slice(encoded);
byte_offset += encoded.len();
}
write_u32(&mut offsets_data, num_strings * 4, byte_offset as u32);
add_section(SectionId::StringOffsets, offsets_data);
add_section(SectionId::StringBytes, bytes_data);
}
fn add_string_id_sections(
add_section: &mut impl FnMut(SectionId, Vec<u8>),
label_string_ids: &[StringId],
etype_string_ids: &[StringId],
propkey_string_ids: &[StringId],
node_key_strings: &[StringId],
) {
add_section(
SectionId::LabelStringIds,
encode_u32_slice(label_string_ids),
);
add_section(
SectionId::EtypeStringIds,
encode_u32_slice(etype_string_ids),
);
add_section(
SectionId::PropkeyStringIds,
encode_u32_slice(propkey_string_ids),
);
add_section(SectionId::NodeKeyString, encode_u32_slice(node_key_strings));
}
fn add_node_label_sections(
add_section: &mut impl FnMut(SectionId, Vec<u8>),
node_label_offsets: &[u32],
node_label_ids: &[u32],
) {
add_section(
SectionId::NodeLabelOffsets,
encode_u32_slice(node_label_offsets),
);
add_section(SectionId::NodeLabelIds, encode_u32_slice(node_label_ids));
}
fn add_key_index_sections(
add_section: &mut impl FnMut(SectionId, Vec<u8>),
key_index: &KeyIndexData,
) {
let mut data = vec![0u8; key_index.entries.len() * KEY_INDEX_ENTRY_SIZE];
for (i, entry) in key_index.entries.iter().enumerate() {
let offset = i * KEY_INDEX_ENTRY_SIZE;
write_u64(&mut data, offset, entry.hash64);
write_u32(&mut data, offset + 8, entry.string_id);
write_u32(&mut data, offset + 12, 0);
write_u64(&mut data, offset + 16, entry.node_id);
}
add_section(SectionId::KeyEntries, data);
add_section(SectionId::KeyBuckets, encode_u32_slice(&key_index.buckets));
}
fn add_node_prop_sections(
add_section: &mut impl FnMut(SectionId, Vec<u8>),
nodes: &[NodeData],
string_table: &StringTable,
vector_table: &mut VectorTable,
) {
let num_nodes = nodes.len();
let mut node_prop_offsets = vec![0u32; num_nodes + 1];
let mut node_prop_keys: Vec<u32> = Vec::new();
let mut node_prop_vals: Vec<(u8, u64)> = Vec::new();
for (i, node) in nodes.iter().enumerate() {
node_prop_offsets[i] = node_prop_keys.len() as u32;
let mut sorted_props: Vec<_> = node.props.iter().collect();
sorted_props.sort_by_key(|(k, _)| *k);
for (&key_id, value) in sorted_props {
node_prop_keys.push(key_id);
node_prop_vals.push(encode_prop_value(value, string_table, vector_table));
}
}
node_prop_offsets[num_nodes] = node_prop_keys.len() as u32;
add_section(
SectionId::NodePropOffsets,
encode_u32_slice(&node_prop_offsets),
);
add_section(SectionId::NodePropKeys, encode_u32_slice(&node_prop_keys));
let mut vals_data = vec![0u8; node_prop_vals.len() * PROP_VALUE_DISK_SIZE];
for (i, (tag, payload)) in node_prop_vals.iter().enumerate() {
let offset = i * PROP_VALUE_DISK_SIZE;
vals_data[offset] = *tag;
write_u64(&mut vals_data, offset + 8, *payload);
}
add_section(SectionId::NodePropVals, vals_data);
}
struct EdgePropSectionArgs<'a> {
edges: &'a [EdgeData],
node_id_to_phys: &'a HashMap<NodeId, PhysNode>,
out_csr: &'a CSRData,
string_table: &'a StringTable,
vector_table: &'a mut VectorTable,
num_nodes: usize,
num_edges: usize,
}
fn add_edge_prop_sections(
add_section: &mut impl FnMut(SectionId, Vec<u8>),
args: EdgePropSectionArgs<'_>,
) {
let mut edge_prop_map: HashMap<(PhysNode, ETypeId, PhysNode), &HashMap<PropKeyId, PropValue>> =
HashMap::new();
for edge in args.edges {
if !edge.props.is_empty() {
if let (Some(&src_phys), Some(&dst_phys)) = (
args.node_id_to_phys.get(&edge.src),
args.node_id_to_phys.get(&edge.dst),
) {
edge_prop_map.insert((src_phys, edge.etype, dst_phys), &edge.props);
}
}
}
let mut edge_prop_offsets = vec![0u32; args.num_edges + 1];
let mut edge_prop_keys: Vec<u32> = Vec::new();
let mut edge_prop_vals: Vec<(u8, u64)> = Vec::new();
let mut edge_idx = 0usize;
for src_phys in 0..args.num_nodes {
let start = args.out_csr.offsets[src_phys] as usize;
let end = args.out_csr.offsets[src_phys + 1] as usize;
for i in start..end {
edge_prop_offsets[edge_idx] = edge_prop_keys.len() as u32;
let dst_phys = args.out_csr.dst[i];
let etype = args.out_csr.etype[i];
if let Some(props) = edge_prop_map.get(&(src_phys as PhysNode, etype, dst_phys)) {
let mut sorted_props: Vec<_> = props.iter().collect();
sorted_props.sort_by_key(|(k, _)| *k);
for (&key_id, value) in sorted_props {
edge_prop_keys.push(key_id);
edge_prop_vals.push(encode_prop_value(
value,
args.string_table,
args.vector_table,
));
}
}
edge_idx += 1;
}
}
edge_prop_offsets[args.num_edges] = edge_prop_keys.len() as u32;
add_section(
SectionId::EdgePropOffsets,
encode_u32_slice(&edge_prop_offsets),
);
add_section(SectionId::EdgePropKeys, encode_u32_slice(&edge_prop_keys));
let mut vals_data = vec![0u8; edge_prop_vals.len() * PROP_VALUE_DISK_SIZE];
for (i, (tag, payload)) in edge_prop_vals.iter().enumerate() {
let offset = i * PROP_VALUE_DISK_SIZE;
vals_data[offset] = *tag;
write_u64(&mut vals_data, offset + 8, *payload);
}
add_section(SectionId::EdgePropVals, vals_data);
}
fn add_vector_sections(
add_section: &mut impl FnMut(SectionId, Vec<u8>),
vector_table: VectorTable,
) -> bool {
if vector_table.is_empty() {
return false;
}
let mut offsets_data = vec![0u8; vector_table.offsets.len() * 8];
for (i, &offset) in vector_table.offsets.iter().enumerate() {
write_u64(&mut offsets_data, i * 8, offset);
}
add_section(SectionId::VectorOffsets, offsets_data);
add_section(SectionId::VectorData, vector_table.data);
true
}
pub fn build_snapshot_to_memory(input: SnapshotBuildInput) -> Result<Vec<u8>> {
let SnapshotBuildInput {
generation,
mut nodes,
edges,
labels,
etypes,
propkeys,
compression,
} = input;
nodes.sort_by_key(|n| n.node_id);
let state = prepare_snapshot_state(&nodes, &edges, &labels, &etypes, &propkeys)?;
let compression_opts = compression.unwrap_or_default();
let mut section_data: Vec<SectionData> = Vec::new();
let num_nodes = nodes.len();
let num_edges = edges.len();
let num_strings = state.string_table.len();
let mut add_section = |id: SectionId, data: Vec<u8>| {
let (compressed, compression_type) = maybe_compress(&data, &compression_opts);
section_data.push(SectionData {
id,
uncompressed_size: data.len() as u32,
data: compressed,
compression: compression_type,
});
};
add_basic_sections(
&mut add_section,
&state.phys_to_node_id,
&state.node_id_to_phys,
state.max_node_id,
&state.out_csr,
&state.in_csr,
num_edges,
);
add_string_table_sections(&mut add_section, &state.string_table, num_strings);
add_string_id_sections(
&mut add_section,
&state.label_string_ids,
&state.etype_string_ids,
&state.propkey_string_ids,
&state.node_key_strings,
);
add_node_label_sections(
&mut add_section,
&state.node_label_offsets,
&state.node_label_ids,
);
add_key_index_sections(&mut add_section, &state.key_index);
let mut vector_table = VectorTable::new();
add_node_prop_sections(
&mut add_section,
&nodes,
&state.string_table,
&mut vector_table,
);
add_edge_prop_sections(
&mut add_section,
EdgePropSectionArgs {
edges: &edges,
node_id_to_phys: &state.node_id_to_phys,
out_csr: &state.out_csr,
string_table: &state.string_table,
vector_table: &mut vector_table,
num_nodes,
num_edges,
},
);
let has_vectors = add_vector_sections(&mut add_section, vector_table);
let header_size = SNAPSHOT_HEADER_SIZE;
let section_table_size = SectionId::COUNT * SECTION_ENTRY_SIZE;
let mut data_offset = align_up(header_size + section_table_size, SECTION_ALIGNMENT);
let mut section_offsets: HashMap<SectionId, (u64, u64, CompressionType, u32)> = HashMap::new();
for section in §ion_data {
section_offsets.insert(
section.id,
(
data_offset as u64,
section.data.len() as u64,
section.compression,
section.uncompressed_size,
),
);
data_offset = align_up(data_offset + section.data.len(), SECTION_ALIGNMENT);
}
let total_size = data_offset + 4;
let mut buffer = vec![0u8; total_size];
let mut offset = 0;
write_u32(&mut buffer, offset, MAGIC_SNAPSHOT);
offset += 4;
write_u32(&mut buffer, offset, VERSION_SNAPSHOT);
offset += 4;
write_u32(&mut buffer, offset, MIN_READER_SNAPSHOT);
offset += 4;
let mut flags = SnapshotFlags::HAS_IN_EDGES | SnapshotFlags::HAS_NODE_LABELS;
if state.has_properties {
flags |= SnapshotFlags::HAS_PROPERTIES;
}
if state.key_index.buckets.len() > 1 {
flags |= SnapshotFlags::HAS_KEY_BUCKETS;
}
if has_vectors {
flags |= SnapshotFlags::HAS_VECTORS;
}
write_u32(&mut buffer, offset, flags.bits());
offset += 4;
write_u64(&mut buffer, offset, generation);
offset += 8;
let created_unix_ns = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(0);
write_u64(&mut buffer, offset, created_unix_ns);
offset += 8;
write_u64(&mut buffer, offset, num_nodes as u64);
offset += 8;
write_u64(&mut buffer, offset, num_edges as u64);
offset += 8;
write_u64(&mut buffer, offset, state.max_node_id);
offset += 8;
write_u64(&mut buffer, offset, labels.len() as u64);
offset += 8;
write_u64(&mut buffer, offset, etypes.len() as u64);
offset += 8;
write_u64(&mut buffer, offset, propkeys.len() as u64);
offset += 8;
write_u64(&mut buffer, offset, num_strings as u64);
offset = header_size;
for id_num in 0..SectionId::COUNT {
let id = SectionId::from_u32(id_num as u32).ok_or_else(|| {
KiteError::Internal("section id out of range in snapshot writer".to_string())
})?;
let (sec_offset, sec_length, compression, uncompressed_size) = section_offsets
.get(&id)
.copied()
.unwrap_or((0, 0, CompressionType::None, 0));
write_u64(&mut buffer, offset, sec_offset);
offset += 8;
write_u64(&mut buffer, offset, sec_length);
offset += 8;
write_u32(&mut buffer, offset, compression as u32);
offset += 4;
write_u32(&mut buffer, offset, uncompressed_size);
offset += 4;
}
for section in §ion_data {
let (sec_offset, _, _, _) = section_offsets[§ion.id];
buffer[sec_offset as usize..sec_offset as usize + section.data.len()]
.copy_from_slice(§ion.data);
}
let footer_crc = crc32c(&buffer[..total_size - 4]);
write_u32(&mut buffer, total_size - 4, footer_crc);
Ok(buffer)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::util::crc::crc32c;
use std::io::Write;
use tempfile::NamedTempFile;
fn create_test_input() -> SnapshotBuildInput {
let nodes = vec![
NodeData {
node_id: 1,
key: Some("user:alice".to_string()),
labels: vec![1],
props: {
let mut props = HashMap::new();
props.insert(1, PropValue::String("Alice".to_string()));
props.insert(2, PropValue::I64(30));
props.insert(4, PropValue::VectorF32(vec![0.1, 0.2, 0.3]));
props
},
},
NodeData {
node_id: 2,
key: Some("user:bob".to_string()),
labels: vec![1],
props: {
let mut props = HashMap::new();
props.insert(1, PropValue::String("Bob".to_string()));
props.insert(2, PropValue::I64(25));
props
},
},
NodeData {
node_id: 3,
key: None,
labels: vec![2],
props: HashMap::new(),
},
];
let edges = vec![
EdgeData {
src: 1,
etype: 1,
dst: 2,
props: {
let mut props = HashMap::new();
props.insert(3, PropValue::F64(0.9));
props
},
},
EdgeData {
src: 2,
etype: 1,
dst: 1,
props: HashMap::new(),
},
EdgeData {
src: 1,
etype: 2,
dst: 3,
props: HashMap::new(),
},
];
let mut labels = HashMap::new();
labels.insert(1, "Person".to_string());
labels.insert(2, "Document".to_string());
let mut etypes = HashMap::new();
etypes.insert(1, "KNOWS".to_string());
etypes.insert(2, "CREATED".to_string());
let mut propkeys = HashMap::new();
propkeys.insert(1, "name".to_string());
propkeys.insert(2, "age".to_string());
propkeys.insert(3, "weight".to_string());
propkeys.insert(4, "embedding".to_string());
SnapshotBuildInput {
generation: 1,
nodes,
edges,
labels,
etypes,
propkeys,
compression: None,
}
}
#[test]
fn test_build_snapshot_to_memory() {
let input = create_test_input();
let buffer = build_snapshot_to_memory(input).expect("expected value");
assert!(buffer.len() > SNAPSHOT_HEADER_SIZE);
assert_eq!(read_u32(&buffer, 0), MAGIC_SNAPSHOT);
assert_eq!(read_u32(&buffer, 4), VERSION_SNAPSHOT);
assert_eq!(read_u32(&buffer, 8), MIN_READER_SNAPSHOT);
let generation = read_u64(&buffer, 16);
assert_eq!(generation, 1);
let num_nodes = read_u64(&buffer, 32);
assert_eq!(num_nodes, 3);
let num_edges = read_u64(&buffer, 40);
assert_eq!(num_edges, 3);
let max_node_id = read_u64(&buffer, 48);
assert_eq!(max_node_id, 3);
let crc_offset = buffer.len() - 4;
let stored_crc = read_u32(&buffer, crc_offset);
let computed_crc = crc32c(&buffer[..crc_offset]);
assert_eq!(stored_crc, computed_crc);
}
#[test]
fn test_snapshot_round_trip_includes_vector_properties() {
let input = create_test_input();
let buffer = build_snapshot_to_memory(input).expect("expected value");
let mut tmp = NamedTempFile::new().expect("expected value");
tmp.write_all(&buffer).expect("expected value");
tmp.flush().expect("expected value");
let snapshot =
crate::core::snapshot::reader::SnapshotData::load(tmp.path()).expect("expected value");
assert!(snapshot
.header
.flags
.contains(SnapshotFlags::HAS_PROPERTIES));
assert!(snapshot.header.flags.contains(SnapshotFlags::HAS_VECTORS));
let phys = snapshot.phys_node(1).expect("expected value");
let embedding = snapshot.node_prop(phys, 4).expect("expected value");
match embedding {
PropValue::VectorF32(v) => {
assert_eq!(v.len(), 3);
assert!((v[0] - 0.1).abs() < 1e-6);
assert!((v[1] - 0.2).abs() < 1e-6);
assert!((v[2] - 0.3).abs() < 1e-6);
}
other => panic!("expected VectorF32, got {other:?}"),
}
}
#[test]
fn test_build_empty_snapshot() {
let input = SnapshotBuildInput {
generation: 1,
nodes: vec![],
edges: vec![],
labels: HashMap::new(),
etypes: HashMap::new(),
propkeys: HashMap::new(),
compression: None,
};
let buffer = build_snapshot_to_memory(input).expect("expected value");
assert_eq!(read_u32(&buffer, 0), MAGIC_SNAPSHOT);
let num_nodes = read_u64(&buffer, 32);
let num_edges = read_u64(&buffer, 40);
assert_eq!(num_nodes, 0);
assert_eq!(num_edges, 0);
}
#[test]
fn test_build_snapshot_missing_nodes_returns_error() {
let mut etypes = HashMap::new();
etypes.insert(1, "REL".to_string());
let input = SnapshotBuildInput {
generation: 1,
nodes: vec![],
edges: vec![EdgeData {
src: 1,
etype: 1,
dst: 2,
props: HashMap::new(),
}],
labels: HashMap::new(),
etypes,
propkeys: HashMap::new(),
compression: None,
};
assert!(build_snapshot_to_memory(input).is_err());
}
#[test]
fn test_string_table() {
let mut table = StringTable::new();
assert_eq!(table.len(), 1);
let id1 = table.intern("hello");
assert_eq!(id1, 1);
let id2 = table.intern("world");
assert_eq!(id2, 2);
let id1_again = table.intern("hello");
assert_eq!(id1_again, 1);
assert_eq!(table.len(), 3);
}
#[test]
fn test_csr_building() {
let nodes = vec![
NodeData {
node_id: 1,
key: None,
labels: vec![],
props: HashMap::new(),
},
NodeData {
node_id: 2,
key: None,
labels: vec![],
props: HashMap::new(),
},
NodeData {
node_id: 3,
key: None,
labels: vec![],
props: HashMap::new(),
},
];
let edges = vec![
EdgeData {
src: 1,
etype: 1,
dst: 2,
props: HashMap::new(),
},
EdgeData {
src: 1,
etype: 1,
dst: 3,
props: HashMap::new(),
},
EdgeData {
src: 2,
etype: 2,
dst: 1,
props: HashMap::new(),
},
];
let mut node_id_to_phys = HashMap::new();
node_id_to_phys.insert(1, 0);
node_id_to_phys.insert(2, 1);
node_id_to_phys.insert(3, 2);
let out_csr = build_out_edges_csr(&nodes, &edges, &node_id_to_phys);
assert_eq!(out_csr.offsets, vec![0, 2, 3, 3]);
assert_eq!(out_csr.dst.len(), 3);
assert_eq!(out_csr.etype.len(), 3);
let in_csr = build_in_edges_csr(&nodes, &out_csr);
assert_eq!(in_csr.offsets, vec![0, 1, 2, 3]);
}
}