use crate::graph::features::timeseries::{NodeTimeseries, TimeseriesConfig};
use crate::graph::schema::{
CompositeIndexKey, ConnectionTypeInfo, ConnectivityTriple, DirGraph, EmbeddingStore, IndexKey,
PropertyStorage, SaveMetadata, SchemaDefinition, SerdeDeserializeGuard, SerdeSerializeGuard,
SpatialConfig, StringInterner, StripPropertiesGuard, TemporalConfig,
};
use crate::graph::storage::column_store::ColumnStore;
use crate::graph::storage::{GraphRead, GraphWrite};
use bincode::Options;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use memmap2::Mmap;
use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Read, Write};
use std::sync::Arc;
/// Builds the bincode configuration used by `bincode_ser` / `bincode_deser`.
///
/// Integers are fixed-width and little-endian, trailing bytes after a value are
/// tolerated, and a single value may not exceed 2 GiB.
fn bincode_options() -> impl bincode::Options {
    // Upper bound handed to `with_limit` (2 GiB).
    const SIZE_LIMIT_BYTES: u64 = 2 * 1024 * 1024 * 1024;

    let encoding = bincode::options()
        .with_fixint_encoding()
        .with_little_endian();
    encoding
        .allow_trailing_bytes()
        .with_limit(SIZE_LIMIT_BYTES)
}
// Four-byte file prefix ("RGF" + 0x03) of the v3 format; `load_file` rejects files
// carrying it with `V3_HARD_BREAK_MSG`.
const V3_MAGIC: [u8; 4] = [0x52, 0x47, 0x46, 0x03];
// Four-byte file prefix ("RGF" + 0x04) written by `write_graph_v3` and accepted by `load_file`.
const V4_MAGIC: [u8; 4] = [0x52, 0x47, 0x46, 0x04];
// Core data version recorded both in the file header and in `FileMetadata::core_data_version`.
const CURRENT_CORE_DATA_VERSION: u32 = 2;
/// Describes one compressed column-store section of a single-file graph.
///
/// `write_graph_v3` emits one entry per column store, in type-name order.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct V3ColumnSection {
// Node type whose column store this section holds.
type_name: String,
// Byte length of the zstd-compressed section as written to the file.
compressed_size: u64,
// Row count reported by the column store at save time.
row_count: u32,
// Property name -> column type tag, for the columns present in the store.
columns: HashMap<String, String>, }
/// Graph-level metadata persisted next to the graph data (serialised as JSON).
///
/// Every field carries a serde default, so documents that omit a field still
/// deserialise. `from_graph` fills the graph-derived fields; the section sizes and
/// `column_sections` are filled by `write_graph_v3` once the sections are compressed.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub(crate) struct FileMetadata {
// Set to `CURRENT_CORE_DATA_VERSION` by `from_graph`.
#[serde(default)]
core_data_version: u32,
// Crate version (`CARGO_PKG_VERSION`) of the library that produced the metadata.
#[serde(default)]
library_version: String,
#[serde(default)]
schema_definition: Option<SchemaDefinition>,
// Index key lists copied from the graph.
#[serde(default)]
property_index_keys: Vec<IndexKey>,
#[serde(default)]
composite_index_keys: Vec<CompositeIndexKey>,
#[serde(default)]
range_index_keys: Vec<IndexKey>,
// Per-type metadata; `strip_heavy_metadata` clears both maps.
#[serde(default)]
node_type_metadata: HashMap<String, HashMap<String, String>>,
#[serde(default)]
connection_type_metadata: HashMap<String, ConnectionTypeInfo>,
#[serde(default)]
id_field_aliases: FxHashMap<String, String>,
#[serde(default)]
title_field_aliases: FxHashMap<String, String>,
// Falls back to `default_auto_vacuum_threshold()` when absent from the document.
#[serde(default = "crate::graph::dir_graph::default_auto_vacuum_threshold")]
auto_vacuum_threshold: Option<f64>,
#[serde(default)]
parent_types: HashMap<String, String>,
#[serde(default)]
spatial_configs: HashMap<String, SpatialConfig>,
#[serde(default)]
timeseries_configs: HashMap<String, TimeseriesConfig>,
#[serde(default)]
temporal_node_configs: HashMap<String, TemporalConfig>,
#[serde(default)]
temporal_edge_configs: HashMap<String, Vec<TemporalConfig>>,
// Falls back to `default_ts_data_version()` (2) when absent.
#[serde(default = "default_ts_data_version")]
timeseries_data_version: u32,
// Byte lengths of the compressed sections written by `write_graph_v3`;
// 0 for an optional section that was not written.
#[serde(default)]
topology_compressed_size: u64,
#[serde(default)]
column_sections: Vec<V3ColumnSection>,
#[serde(default)]
embeddings_compressed_size: u64,
#[serde(default)]
timeseries_compressed_size: u64,
#[serde(default)]
secondary_labels_compressed_size: u64,
// Optional caches; omitted from the serialised document when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
edge_type_counts: Option<HashMap<String, usize>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
type_connectivity: Option<Vec<ConnectivityTriple>>,
}
/// Serde fallback for `FileMetadata::timeseries_data_version` when the field is absent.
fn default_ts_data_version() -> u32 {
    const FALLBACK_TIMESERIES_DATA_VERSION: u32 = 2;
    FALLBACK_TIMESERIES_DATA_VERSION
}
impl FileMetadata {
pub(crate) fn from_graph(graph: &DirGraph) -> Self {
FileMetadata {
core_data_version: CURRENT_CORE_DATA_VERSION,
library_version: env!("CARGO_PKG_VERSION").to_string(),
schema_definition: graph.schema_definition.clone(),
property_index_keys: graph.property_index_keys.clone(),
composite_index_keys: graph.composite_index_keys.clone(),
range_index_keys: graph.range_index_keys.clone(),
node_type_metadata: graph.node_type_metadata.clone(),
connection_type_metadata: graph.connection_type_metadata.clone(),
id_field_aliases: graph.id_field_aliases.clone(),
title_field_aliases: graph.title_field_aliases.clone(),
auto_vacuum_threshold: graph.auto_vacuum_threshold,
parent_types: graph.parent_types.clone(),
spatial_configs: graph.spatial_configs.clone(),
timeseries_configs: graph.timeseries_configs.clone(),
temporal_node_configs: graph.temporal_node_configs.clone(),
temporal_edge_configs: graph.temporal_edge_configs.clone(),
timeseries_data_version: 2,
topology_compressed_size: 0,
column_sections: Vec::new(),
embeddings_compressed_size: 0,
timeseries_compressed_size: 0,
secondary_labels_compressed_size: 0,
edge_type_counts: if graph.has_edge_type_counts_cache() {
Some(graph.get_edge_type_counts())
} else {
None
},
type_connectivity: graph.get_type_connectivity(),
}
}
#[allow(dead_code)]
pub(crate) fn apply_to(self, graph: &mut DirGraph) {
self.apply_to_with(graph, true)
}
pub(crate) fn apply_to_with(self, graph: &mut DirGraph, derive_type_connectivity: bool) {
graph.schema_definition = self.schema_definition;
graph.property_index_keys = self.property_index_keys;
graph.composite_index_keys = self.composite_index_keys;
graph.range_index_keys = self.range_index_keys;
graph.node_type_metadata = self.node_type_metadata;
graph.connection_type_metadata = self.connection_type_metadata;
graph.id_field_aliases = self.id_field_aliases;
graph.title_field_aliases = self.title_field_aliases;
graph.auto_vacuum_threshold = self.auto_vacuum_threshold;
graph.parent_types = self.parent_types;
graph.spatial_configs = self.spatial_configs;
graph.timeseries_configs = self.timeseries_configs;
graph.temporal_node_configs = self.temporal_node_configs;
graph.temporal_edge_configs = self.temporal_edge_configs;
graph.save_metadata = SaveMetadata {
format_version: 3,
library_version: self.library_version,
};
if let Some(counts) = self.edge_type_counts {
*graph.edge_type_counts_cache.write().unwrap() = Some(counts);
}
if let Some(triples) = self.type_connectivity {
*graph.type_connectivity_cache.write().unwrap() = Some(triples);
} else if derive_type_connectivity && !graph.connection_type_metadata.is_empty() {
let edge_counts = graph.edge_type_counts_cache.read().unwrap();
let mut triples = Vec::new();
for (conn_type, info) in &graph.connection_type_metadata {
let count = edge_counts
.as_ref()
.and_then(|c| c.get(conn_type).copied())
.unwrap_or(0);
for src in &info.source_types {
for tgt in &info.target_types {
triples.push(crate::graph::schema::ConnectivityTriple {
src: src.clone(),
conn: conn_type.clone(),
tgt: tgt.clone(),
count,
});
}
}
}
if !triples.is_empty() {
*graph.type_connectivity_cache.write().unwrap() = Some(triples);
}
}
}
}
/// Builds the metadata snapshot for `graph`; a thin wrapper over `FileMetadata::from_graph`.
pub(crate) fn build_disk_metadata(graph: &DirGraph) -> FileMetadata {
FileMetadata::from_graph(graph)
}
/// Drops the stored connectivity triples from `meta` so they are omitted when it is
/// serialised (the field is skipped when `None`).
pub(crate) fn strip_type_connectivity(meta: &mut FileMetadata) {
meta.type_connectivity = None;
}
/// Empties the node-type and connection-type metadata maps of `meta`.
// NOTE(review): presumably used when these maps are persisted through the
// `*_type_metadata.bin.zst` sidecars instead — confirm against the caller.
pub(crate) fn strip_heavy_metadata(meta: &mut FileMetadata) {
meta.node_type_metadata.clear();
meta.connection_type_metadata.clear();
}
// Eight-byte magic that opens the decompressed `node_type_metadata.bin.zst` payload.
const NODE_TYPE_META_MAGIC: &[u8; 8] = b"KGLNTM1\0";
// Payload version written after the magic; the reader ignores any other version.
const NODE_TYPE_META_VERSION: u32 = 1;
/// Writes `graph.node_type_metadata` to `<dir>/node_type_metadata.bin.zst`.
///
/// Nothing is written when the map is empty. The payload is: magic, version, type
/// count, then per type (sorted by name) its name, property count and the
/// name-sorted `(key, value)` pairs. Strings are length-prefixed with a
/// little-endian `u32`. The payload is zstd-compressed at level 3.
///
/// # Errors
/// Returns a message describing the compression or file-write failure.
pub(crate) fn write_node_type_metadata_bin(
    dir: &std::path::Path,
    graph: &DirGraph,
) -> Result<(), String> {
    /// Appends a count as a little-endian `u32`.
    fn put_count(buf: &mut Vec<u8>, count: usize) {
        buf.extend_from_slice(&(count as u32).to_le_bytes());
    }
    /// Appends a string as its `u32` byte length followed by its bytes.
    fn put_str(buf: &mut Vec<u8>, text: &str) {
        put_count(buf, text.len());
        buf.extend_from_slice(text.as_bytes());
    }

    let metadata = &graph.node_type_metadata;
    if metadata.is_empty() {
        return Ok(());
    }

    // Sort for a deterministic byte layout regardless of hash-map iteration order.
    let mut by_type: Vec<(&String, &HashMap<String, String>)> = metadata.iter().collect();
    by_type.sort_by(|lhs, rhs| lhs.0.cmp(rhs.0));

    let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
    buf.extend_from_slice(NODE_TYPE_META_MAGIC);
    buf.extend_from_slice(&NODE_TYPE_META_VERSION.to_le_bytes());
    put_count(&mut buf, by_type.len());

    for (type_name, props) in by_type {
        put_str(&mut buf, type_name);

        let mut sorted_props: Vec<(&String, &String)> = props.iter().collect();
        sorted_props.sort_by(|lhs, rhs| lhs.0.cmp(rhs.0));
        put_count(&mut buf, sorted_props.len());
        for (key, value) in sorted_props {
            put_str(&mut buf, key);
            put_str(&mut buf, value);
        }
    }

    let compressed = zstd::encode_all(buf.as_slice(), 3)
        .map_err(|e| format!("node_type_metadata compression failed: {}", e))?;
    std::fs::write(dir.join("node_type_metadata.bin.zst"), compressed)
        .map_err(|e| format!("Failed to write node_type_metadata.bin.zst: {}", e))
}
/// Reads `<dir>/node_type_metadata.bin.zst`, if present.
///
/// Returns `Ok(None)` when the file does not exist, or when its magic or version is
/// not the one this build writes. Otherwise returns the decoded
/// `type name -> (property -> value)` map.
///
/// # Errors
/// Propagates read and decompression failures, and returns `InvalidData` when the
/// payload is truncated or contains invalid UTF-8.
pub(crate) fn read_node_type_metadata_bin(
    dir: &std::path::Path,
) -> io::Result<Option<HashMap<String, HashMap<String, String>>>> {
    let path = dir.join("node_type_metadata.bin.zst");
    if !path.exists() {
        return Ok(None);
    }
    let compressed = std::fs::read(&path)?;
    let bytes = zstd::decode_all(compressed.as_slice()).map_err(io::Error::other)?;
    if bytes.len() < 16 || &bytes[..8] != NODE_TYPE_META_MAGIC {
        return Ok(None);
    }
    let version = u32::from_le_bytes(bytes[8..12].try_into().unwrap());
    if version != NODE_TYPE_META_VERSION {
        return Ok(None);
    }
    let num_types = u32::from_le_bytes(bytes[12..16].try_into().unwrap()) as usize;
    let mut cursor = 16usize;

    // The counts come from the file and must not drive pre-allocation unchecked: a
    // corrupt header could otherwise request an enormous table. Every type entry needs
    // at least 8 bytes (name length + property count), as does every property pair
    // (two length prefixes), so the remaining byte count bounds the real number.
    let max_types = bytes.len().saturating_sub(cursor) / 8;
    let mut out = HashMap::with_capacity(num_types.min(max_types));
    for _ in 0..num_types {
        let name = read_lp_string(&bytes, &mut cursor)?;
        let num_props = read_u32(&bytes, &mut cursor)? as usize;
        let max_props = bytes.len().saturating_sub(cursor) / 8;
        let mut props = HashMap::with_capacity(num_props.min(max_props));
        for _ in 0..num_props {
            let k = read_lp_string(&bytes, &mut cursor)?;
            let v = read_lp_string(&bytes, &mut cursor)?;
            props.insert(k, v);
        }
        out.insert(name, props);
    }
    Ok(Some(out))
}
// Eight-byte magic that opens the decompressed `connection_type_metadata.bin.zst` payload.
const CONN_TYPE_META_MAGIC: &[u8; 8] = b"KGLCTM1\0";
// Payload version written after the magic; the reader ignores any other version.
const CONN_TYPE_META_VERSION: u32 = 1;
/// Writes `graph.connection_type_metadata` to `<dir>/connection_type_metadata.bin.zst`.
///
/// Nothing is written when the map is empty. The payload is: magic, version,
/// connection count, then per connection type (sorted by name) its name, the sorted
/// source types, the sorted target types, and the key-sorted property types. Counts
/// and string lengths are little-endian `u32`. The payload is zstd-compressed at
/// level 3.
///
/// # Errors
/// Returns a message describing the compression or file-write failure.
pub(crate) fn write_connection_type_metadata_bin(
    dir: &std::path::Path,
    graph: &DirGraph,
) -> Result<(), String> {
    /// Appends a count as a little-endian `u32`.
    fn put_count(buf: &mut Vec<u8>, count: usize) {
        buf.extend_from_slice(&(count as u32).to_le_bytes());
    }
    /// Appends a string as its `u32` byte length followed by its bytes.
    fn put_str(buf: &mut Vec<u8>, text: &str) {
        put_count(buf, text.len());
        buf.extend_from_slice(text.as_bytes());
    }
    /// Appends a count followed by each string of an already-sorted list.
    fn put_str_list(buf: &mut Vec<u8>, items: &[&String]) {
        put_count(buf, items.len());
        for item in items {
            put_str(buf, item);
        }
    }

    let metadata = &graph.connection_type_metadata;
    if metadata.is_empty() {
        return Ok(());
    }

    // Sort everything so the byte layout does not depend on hash iteration order.
    let mut by_conn: Vec<_> = metadata.iter().collect();
    by_conn.sort_by(|lhs, rhs| lhs.0.cmp(rhs.0));

    let mut buf: Vec<u8> = Vec::with_capacity(64 * 1024);
    buf.extend_from_slice(CONN_TYPE_META_MAGIC);
    buf.extend_from_slice(&CONN_TYPE_META_VERSION.to_le_bytes());
    put_count(&mut buf, by_conn.len());

    for (conn_name, info) in by_conn {
        put_str(&mut buf, conn_name);

        let mut sources: Vec<&String> = info.source_types.iter().collect();
        sources.sort();
        put_str_list(&mut buf, &sources);

        let mut targets: Vec<&String> = info.target_types.iter().collect();
        targets.sort();
        put_str_list(&mut buf, &targets);

        let mut props: Vec<(&String, &String)> = info.property_types.iter().collect();
        props.sort_by(|lhs, rhs| lhs.0.cmp(rhs.0));
        put_count(&mut buf, props.len());
        for (key, value) in props {
            put_str(&mut buf, key);
            put_str(&mut buf, value);
        }
    }

    let compressed = zstd::encode_all(buf.as_slice(), 3)
        .map_err(|e| format!("connection_type_metadata compression failed: {}", e))?;
    std::fs::write(dir.join("connection_type_metadata.bin.zst"), compressed)
        .map_err(|e| format!("Failed to write connection_type_metadata.bin.zst: {}", e))
}
/// Reads `<dir>/connection_type_metadata.bin.zst`, if present.
///
/// Returns `Ok(None)` when the file does not exist, or when its magic or version is
/// not the one this build writes. Otherwise returns the decoded
/// `connection type -> ConnectionTypeInfo` map.
///
/// # Errors
/// Propagates read and decompression failures, and returns `InvalidData` when the
/// payload is truncated or contains invalid UTF-8.
pub(crate) fn read_connection_type_metadata_bin(
    dir: &std::path::Path,
) -> io::Result<Option<HashMap<String, crate::graph::schema::ConnectionTypeInfo>>> {
    use crate::graph::schema::ConnectionTypeInfo;

    /// Largest number of records of at least `min_record_bytes` each that can still
    /// fit after `cursor`. Used to keep file-supplied counts from driving huge
    /// pre-allocations when the payload is corrupt.
    fn max_records(bytes: &[u8], cursor: usize, min_record_bytes: usize) -> usize {
        bytes.len().saturating_sub(cursor) / min_record_bytes
    }

    let path = dir.join("connection_type_metadata.bin.zst");
    if !path.exists() {
        return Ok(None);
    }
    let compressed = std::fs::read(&path)?;
    let bytes = zstd::decode_all(compressed.as_slice()).map_err(io::Error::other)?;
    if bytes.len() < 16 || &bytes[..8] != CONN_TYPE_META_MAGIC {
        return Ok(None);
    }
    let version = u32::from_le_bytes(bytes[8..12].try_into().unwrap());
    if version != CONN_TYPE_META_VERSION {
        return Ok(None);
    }
    let num_conns = u32::from_le_bytes(bytes[12..16].try_into().unwrap()) as usize;
    let mut cursor = 16usize;

    // A connection entry is at least 16 bytes: a name length plus three counts.
    let mut out = HashMap::with_capacity(num_conns.min(max_records(&bytes, cursor, 16)));
    for _ in 0..num_conns {
        let name = read_lp_string(&bytes, &mut cursor)?;

        // Each listed type is at least a 4-byte length prefix.
        let num_sources = read_u32(&bytes, &mut cursor)? as usize;
        let mut source_types = std::collections::HashSet::with_capacity(
            num_sources.min(max_records(&bytes, cursor, 4)),
        );
        for _ in 0..num_sources {
            source_types.insert(read_lp_string(&bytes, &mut cursor)?);
        }

        let num_targets = read_u32(&bytes, &mut cursor)? as usize;
        let mut target_types = std::collections::HashSet::with_capacity(
            num_targets.min(max_records(&bytes, cursor, 4)),
        );
        for _ in 0..num_targets {
            target_types.insert(read_lp_string(&bytes, &mut cursor)?);
        }

        // Each property pair is at least two 4-byte length prefixes.
        let num_props = read_u32(&bytes, &mut cursor)? as usize;
        let mut property_types =
            HashMap::with_capacity(num_props.min(max_records(&bytes, cursor, 8)));
        for _ in 0..num_props {
            let k = read_lp_string(&bytes, &mut cursor)?;
            let v = read_lp_string(&bytes, &mut cursor)?;
            property_types.insert(k, v);
        }

        out.insert(
            name,
            ConnectionTypeInfo {
                source_types,
                target_types,
                property_types,
            },
        );
    }
    Ok(Some(out))
}
/// Reads a little-endian `u32` at `*cursor` and advances the cursor by four bytes.
///
/// # Errors
/// Returns `InvalidData` when fewer than four bytes remain at `*cursor`; the cursor
/// is left untouched in that case. The bounds check uses checked arithmetic, so an
/// out-of-range cursor yields the error instead of overflowing.
#[inline]
fn read_u32(bytes: &[u8], cursor: &mut usize) -> io::Result<u32> {
    let start = *cursor;
    let field = start
        .checked_add(4)
        .and_then(|end| bytes.get(start..end))
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "metadata sidecar truncated"))?;
    let mut raw = [0u8; 4];
    raw.copy_from_slice(field);
    *cursor = start + 4;
    Ok(u32::from_le_bytes(raw))
}
/// Reads a length-prefixed UTF-8 string: a little-endian `u32` byte length followed
/// by that many bytes. Advances `*cursor` past the string on success.
///
/// # Errors
/// Returns `InvalidData` when the length prefix or the string bytes run past the end
/// of `bytes`, and an error wrapping the `Utf8Error` when the bytes are not valid
/// UTF-8. The end offset is computed with checked arithmetic so that an oversized
/// length cannot wrap around on targets with a narrow `usize`.
#[inline]
fn read_lp_string(bytes: &[u8], cursor: &mut usize) -> io::Result<String> {
    let len = read_u32(bytes, cursor)? as usize;
    let start = *cursor;
    let raw = start
        .checked_add(len)
        .and_then(|end| bytes.get(start..end))
        .ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidData,
                "metadata sidecar string truncated",
            )
        })?;
    let s = std::str::from_utf8(raw)
        .map_err(io::Error::other)?
        .to_string();
    // Only advance once the string has been validated.
    *cursor = start + len;
    Ok(s)
}
// Eight-byte magic expected at the start of a type-indices payload.
const TYPE_INDICES_MAGIC: &[u8; 8] = b"KGLTIDX1";
// Payload version accepted by `read_type_indices_bin`; other versions are ignored.
const TYPE_INDICES_VERSION: u32 = 1;
/// Decodes a type-indices payload into `type name -> node indices`.
///
/// Layout (all little-endian): 8-byte magic, `u32` version, `u32` type count,
/// `u64` total node count, then `type count` `u64` interned type keys,
/// `type count + 1` `u64` offsets into the node array, and `total nodes` `u32` node
/// indices. Type `i` owns the nodes in `offsets[i]..offsets[i + 1]`.
///
/// Returns `Ok(None)` when the magic or version is not recognised. Types whose key
/// cannot be resolved through `interner` are skipped.
///
/// # Errors
/// Returns `InvalidData` when the payload is shorter than its header declares, or
/// when a decoded type's offsets are inverted or point past the node array. Sizes are
/// computed with checked arithmetic, so corrupt counts produce an error rather than
/// an overflow or an out-of-bounds panic.
pub(crate) fn read_type_indices_bin(
    payload: &[u8],
    interner: &crate::graph::storage::interner::StringInterner,
) -> io::Result<Option<std::collections::HashMap<String, Vec<petgraph::graph::NodeIndex>>>> {
    fn corrupt(msg: &'static str) -> io::Error {
        io::Error::new(io::ErrorKind::InvalidData, msg)
    }

    if payload.len() < 24 || &payload[..8] != TYPE_INDICES_MAGIC {
        return Ok(None);
    }
    let version = u32::from_le_bytes(payload[8..12].try_into().unwrap());
    if version != TYPE_INDICES_VERSION {
        return Ok(None);
    }
    let num_types = u32::from_le_bytes(payload[12..16].try_into().unwrap()) as usize;
    let total_nodes_raw = u64::from_le_bytes(payload[16..24].try_into().unwrap());

    // Work out where each region starts; any overflow means the header describes more
    // data than could possibly be present.
    let type_keys_offset = 24usize;
    let layout = usize::try_from(total_nodes_raw).ok().and_then(|total_nodes| {
        let offsets_offset = type_keys_offset.checked_add(num_types.checked_mul(8)?)?;
        let nodes_offset =
            offsets_offset.checked_add(num_types.checked_add(1)?.checked_mul(8)?)?;
        let expected_len = nodes_offset.checked_add(total_nodes.checked_mul(4)?)?;
        Some((total_nodes, offsets_offset, nodes_offset, expected_len))
    });
    let (total_nodes, offsets_offset, nodes_offset) = match layout {
        Some((total, offsets, nodes, expected_len)) if payload.len() >= expected_len => {
            (total, offsets, nodes)
        }
        _ => return Err(corrupt("type_indices.bin.zst is truncated")),
    };

    let mut out =
        std::collections::HashMap::<String, Vec<petgraph::graph::NodeIndex>>::with_capacity(
            num_types,
        );
    for i in 0..num_types {
        let tkey_base = type_keys_offset + i * 8;
        let type_key = u64::from_le_bytes(payload[tkey_base..tkey_base + 8].try_into().unwrap());
        let off_base = offsets_offset + i * 8;
        let off_start = u64::from_le_bytes(payload[off_base..off_base + 8].try_into().unwrap());
        let off_end =
            u64::from_le_bytes(payload[off_base + 8..off_base + 16].try_into().unwrap());

        let name = match interner.try_resolve(crate::graph::schema::InternedKey::from_u64(type_key))
        {
            Some(s) => s.to_string(),
            // Unknown type key: nothing to attach the nodes to.
            None => continue,
        };

        // The offsets come from the file; make sure they describe a forward range
        // inside the node array before slicing with them.
        let (start, end) = match (usize::try_from(off_start), usize::try_from(off_end)) {
            (Ok(s), Ok(e)) if s <= e && e <= total_nodes => (s, e),
            _ => {
                return Err(corrupt(
                    "type_indices.bin.zst contains out-of-range node offsets",
                ))
            }
        };

        let nodes: Vec<petgraph::graph::NodeIndex> = payload
            [nodes_offset + start * 4..nodes_offset + end * 4]
            .chunks_exact(4)
            .map(|chunk| {
                let idx = u32::from_le_bytes(chunk.try_into().unwrap()) as usize;
                petgraph::graph::NodeIndex::new(idx)
            })
            .collect();
        out.insert(name, nodes);
    }
    Ok(Some(out))
}
/// Writes every string held by `graph.interner` to `<dir>/interner.bin.zst`.
///
/// The strings are collected in the interner's iteration order, bincode-serialised
/// as a `Vec<String>`, and zstd-compressed at level 3.
///
/// # Errors
/// Returns a message describing the serialisation, compression or write failure.
pub(crate) fn write_interner_bin(dir: &std::path::Path, graph: &DirGraph) -> Result<(), String> {
    let mut originals: Vec<String> = Vec::new();
    originals.extend(graph.interner.iter().map(|(_, text)| text.to_string()));

    let serialized = bincode::serialize(&originals)
        .map_err(|e| format!("interner serialization failed: {}", e))?;
    let compressed = zstd::encode_all(serialized.as_slice(), 3)
        .map_err(|e| format!("interner compression failed: {}", e))?;

    let target = dir.join("interner.bin.zst");
    std::fs::write(target, compressed)
        .map_err(|e| format!("Failed to write interner.bin.zst: {}", e))
}
pub(crate) fn read_interner_bin(dir: &std::path::Path, graph: &mut DirGraph) -> io::Result<bool> {
let path = dir.join("interner.bin.zst");
if !path.exists() {
return Ok(false);
}
let compressed = std::fs::read(&path)?;
let bytes = zstd::decode_all(compressed.as_slice()).map_err(io::Error::other)?;
let originals: Vec<String> = bincode::deserialize(&bytes).map_err(io::Error::other)?;
for s in &originals {
graph.interner.get_or_intern(s);
}
Ok(true)
}
// Eight-byte magic expected at the start of an id-indices payload.
const ID_INDICES_MAGIC: &[u8; 8] = b"KGLIIDX1";
// Payload version accepted by `read_id_indices_bin`; other versions are ignored.
const ID_INDICES_VERSION: u32 = 1;
/// Decodes an id-indices payload into `type name -> TypeIdIndex`.
///
/// Layout (little-endian): 8-byte magic, `u32` version, `u32` block count, then one
/// block per type. A block starts with a 24-byte header (`u64` interned type key at
/// offset 0, variant tag byte at offset 8, `u64` entry count at offset 16) followed by:
/// * tag `0` (integer ids): `count` `u32` keys, then `count` `u32` node indices;
/// * tag `1` (general ids): a `u64` blob length and a bincode-encoded
///   `HashMap<Value, NodeIndex>` blob.
///
/// Returns `Ok(None)` when the magic or version is not recognised. Blocks whose type
/// key cannot be resolved through `interner` are skipped, but are still validated
/// for length.
///
/// # Errors
/// Returns `InvalidData` on truncation or an unknown variant tag, and wraps bincode
/// failures with `io::Error::other`. Counts and lengths read from the payload are
/// validated against the bytes actually remaining before they are used for
/// arithmetic or pre-allocation.
pub(crate) fn read_id_indices_bin(
    payload: &[u8],
    interner: &crate::graph::storage::interner::StringInterner,
) -> io::Result<Option<std::collections::HashMap<String, crate::graph::schema::TypeIdIndex>>> {
    use crate::graph::schema::TypeIdIndex;

    fn corrupt(msg: &'static str) -> io::Error {
        io::Error::new(io::ErrorKind::InvalidData, msg)
    }

    if payload.len() < 16 || &payload[..8] != ID_INDICES_MAGIC {
        return Ok(None);
    }
    let version = u32::from_le_bytes(payload[8..12].try_into().unwrap());
    if version != ID_INDICES_VERSION {
        return Ok(None);
    }
    let num_types = u32::from_le_bytes(payload[12..16].try_into().unwrap()) as usize;

    // Invariant: `cursor <= payload.len()`; it only advances past validated spans.
    let mut cursor = 16usize;
    // Every block needs at least its 24-byte header, which bounds the block count.
    let max_blocks = (payload.len() - cursor) / 24;
    let mut out = std::collections::HashMap::<String, TypeIdIndex>::with_capacity(
        num_types.min(max_blocks),
    );

    for _ in 0..num_types {
        if payload.len() - cursor < 24 {
            return Err(corrupt("id_indices.bin.zst truncated at block header"));
        }
        let type_key = u64::from_le_bytes(payload[cursor..cursor + 8].try_into().unwrap());
        let variant_tag = payload[cursor + 8];
        let num_entries_raw =
            u64::from_le_bytes(payload[cursor + 16..cursor + 24].try_into().unwrap());
        cursor += 24;
        let remaining = payload.len() - cursor;

        let name = interner
            .try_resolve(crate::graph::schema::InternedKey::from_u64(type_key))
            .map(|s| s.to_string());

        match variant_tag {
            0 => {
                // Each entry takes 8 bytes (4-byte key + 4-byte node index).
                let num_entries = usize::try_from(num_entries_raw)
                    .ok()
                    .filter(|n| *n <= remaining / 8)
                    .ok_or_else(|| corrupt("id_indices Integer block truncated"))?;
                let keys_size = 4 * num_entries;
                let (keys_bytes, idxs_bytes) =
                    payload[cursor..cursor + 2 * keys_size].split_at(keys_size);
                cursor += 2 * keys_size;

                if let Some(name) = name {
                    let mut map =
                        std::collections::HashMap::<u32, petgraph::graph::NodeIndex>::with_capacity(
                            num_entries,
                        );
                    let pairs = keys_bytes.chunks_exact(4).zip(idxs_bytes.chunks_exact(4));
                    for (key_chunk, idx_chunk) in pairs {
                        let k = u32::from_le_bytes(key_chunk.try_into().unwrap());
                        let v = u32::from_le_bytes(idx_chunk.try_into().unwrap()) as usize;
                        map.insert(k, petgraph::graph::NodeIndex::new(v));
                    }
                    out.insert(name, TypeIdIndex::Integer(map));
                }
            }
            1 => {
                if remaining < 8 {
                    return Err(corrupt("id_indices General block missing blob length"));
                }
                let blob_len_raw =
                    u64::from_le_bytes(payload[cursor..cursor + 8].try_into().unwrap());
                cursor += 8;
                let blob_len = usize::try_from(blob_len_raw)
                    .ok()
                    .filter(|n| *n <= payload.len() - cursor)
                    .ok_or_else(|| corrupt("id_indices General blob truncated"))?;
                let blob = &payload[cursor..cursor + blob_len];
                cursor += blob_len;

                // The header's entry count is not needed here: the blob is
                // self-describing.
                if let Some(name) = name {
                    let inner: std::collections::HashMap<
                        crate::datatypes::values::Value,
                        petgraph::graph::NodeIndex,
                    > = bincode::deserialize(blob).map_err(io::Error::other)?;
                    out.insert(name, TypeIdIndex::General(inner));
                }
            }
            other => {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("id_indices unknown variant tag {}", other),
                ));
            }
        }
    }
    Ok(Some(out))
}
// Eight-byte magic that opens the decompressed `type_connectivity.bin.zst` payload.
const TYPE_CONN_MAGIC: &[u8; 8] = b"KGLTCN1\0";
// Payload version written after the magic; the reader ignores any other version.
const TYPE_CONN_VERSION: u32 = 1;
// Eight-byte magic that opens an encoded secondary-label index payload.
const SECONDARY_LABELS_MAGIC: &[u8; 8] = b"KGLSLBL1";
// Payload version written after the magic; the decoder ignores any other version.
const SECONDARY_LABELS_VERSION: u32 = 1;
/// Writes the graph's type-connectivity triples to `<dir>/type_connectivity.bin.zst`.
///
/// Nothing is written when the graph has no triples. The payload is: magic, version,
/// `u32` triple count, then per triple four little-endian `u64` words — the interned
/// keys of source type, connection type and target type, followed by the edge count.
/// Keys are obtained from a clone of the graph's interner, so the graph itself is
/// not modified. The payload is zstd-compressed at level 3.
///
/// # Errors
/// Returns a message describing the compression or file-write failure.
pub(crate) fn write_type_connectivity_bin(
    dir: &std::path::Path,
    graph: &DirGraph,
) -> Result<(), String> {
    let triples = match graph.get_type_connectivity() {
        Some(found) if !found.is_empty() => found,
        _ => return Ok(()),
    };

    let triple_count = triples.len() as u32;
    let mut buf: Vec<u8> = Vec::with_capacity(16 + (triples.len() * 32));
    buf.extend_from_slice(TYPE_CONN_MAGIC);
    buf.extend_from_slice(&TYPE_CONN_VERSION.to_le_bytes());
    buf.extend_from_slice(&triple_count.to_le_bytes());

    let mut scratch_interner = graph.interner.clone();
    for triple in &triples {
        let words = [
            scratch_interner.get_or_intern(&triple.src).as_u64(),
            scratch_interner.get_or_intern(&triple.conn).as_u64(),
            scratch_interner.get_or_intern(&triple.tgt).as_u64(),
            triple.count as u64,
        ];
        for word in words {
            buf.extend_from_slice(&word.to_le_bytes());
        }
    }

    let compressed = zstd::encode_all(buf.as_slice(), 3)
        .map_err(|e| format!("type_connectivity compression failed: {}", e))?;
    std::fs::write(dir.join("type_connectivity.bin.zst"), compressed)
        .map_err(|e| format!("Failed to write type_connectivity.bin.zst: {}", e))
}
/// Reads `<dir>/type_connectivity.bin.zst`, if present.
///
/// Returns `Ok(None)` when the file does not exist, or when its magic or version is
/// not the one this build writes. Each 32-byte entry holds three interned keys and a
/// count; entries with any key that `graph.interner` cannot resolve are dropped.
///
/// # Errors
/// Propagates read and decompression failures, and returns `InvalidData` when the
/// payload is shorter than the declared entry count requires.
pub(crate) fn read_type_connectivity_bin(
    dir: &std::path::Path,
    graph: &DirGraph,
) -> io::Result<Option<Vec<crate::graph::schema::ConnectivityTriple>>> {
    let source = dir.join("type_connectivity.bin.zst");
    if !source.exists() {
        return Ok(None);
    }
    let compressed = std::fs::read(&source)?;
    let payload = zstd::decode_all(compressed.as_slice()).map_err(io::Error::other)?;

    let header_ok = payload.len() >= 16 && &payload[..8] == TYPE_CONN_MAGIC;
    if !header_ok {
        return Ok(None);
    }
    if u32::from_le_bytes(payload[8..12].try_into().unwrap()) != TYPE_CONN_VERSION {
        return Ok(None);
    }

    let n = u32::from_le_bytes(payload[12..16].try_into().unwrap()) as usize;
    let entry_bytes = 32usize;
    let expected_len = 16 + n * entry_bytes;
    if payload.len() < expected_len {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "type_connectivity.bin.zst is truncated",
        ));
    }

    // Resolves an interned key back to its owned string, if the interner knows it.
    let resolve = |raw_key: u64| {
        graph
            .interner
            .try_resolve(crate::graph::schema::InternedKey::from_u64(raw_key))
            .map(|s| s.to_string())
    };

    let mut triples = Vec::with_capacity(n);
    for entry in payload[16..expected_len].chunks_exact(entry_bytes) {
        // Pulls the `i`-th little-endian u64 word out of the entry.
        let word = |i: usize| u64::from_le_bytes(entry[i * 8..i * 8 + 8].try_into().unwrap());
        let count = word(3);
        let src = resolve(word(0));
        let conn = resolve(word(1));
        let tgt = resolve(word(2));
        if let (Some(src), Some(conn), Some(tgt)) = (src, conn, tgt) {
            triples.push(crate::graph::schema::ConnectivityTriple {
                src,
                conn,
                tgt,
                count: count as usize,
            });
        }
    }
    Ok(Some(triples))
}
/// Encodes the graph's secondary-label index into its binary payload.
///
/// Returns `None` when the graph has no secondary labels or the index is empty.
/// Layout (little-endian): magic, `u32` version, `u32` label count, then per label
/// (sorted by resolved name) a `u32` name length, the name bytes, a `u32` node count
/// and one `u32` per node index.
fn encode_secondary_label_index(graph: &DirGraph) -> Option<Vec<u8>> {
    if !graph.has_secondary_labels || graph.secondary_label_index.is_empty() {
        return None;
    }
    let n = graph.secondary_label_index.len() as u32;

    let mut entries: Vec<(
        &crate::graph::schema::InternedKey,
        &Vec<petgraph::graph::NodeIndex>,
    )> = graph.secondary_label_index.iter().collect();
    // The sort key allocates a `String`; `sort_by_cached_key` builds it once per entry
    // instead of once per comparison and, like `sort_by_key`, is stable.
    entries.sort_by_cached_key(|(k, _)| graph.interner.resolve(**k).to_string());

    // Header plus the node indices; label names grow the buffer as needed.
    let node_total: usize = entries.iter().map(|(_, nodes)| nodes.len()).sum();
    let mut payload: Vec<u8> = Vec::with_capacity(16 + entries.len() * 8 + node_total * 4);
    payload.extend_from_slice(SECONDARY_LABELS_MAGIC);
    payload.extend_from_slice(&SECONDARY_LABELS_VERSION.to_le_bytes());
    payload.extend_from_slice(&n.to_le_bytes());

    for (key, nodes) in entries {
        let name = graph.interner.resolve(*key);
        let name_bytes = name.as_bytes();
        payload.extend_from_slice(&(name_bytes.len() as u32).to_le_bytes());
        payload.extend_from_slice(name_bytes);
        payload.extend_from_slice(&(nodes.len() as u32).to_le_bytes());
        for idx in nodes {
            payload.extend_from_slice(&(idx.index() as u32).to_le_bytes());
        }
    }
    Some(payload)
}
/// Decodes a secondary-label payload and installs it on `graph`.
///
/// Returns `Ok(false)` when the magic or version is not recognised, and `Ok(true)`
/// once the payload has been decoded. Label names are interned into
/// `graph.interner`. Node indices that do not exist in `graph.graph` are dropped,
/// labels left without nodes are removed, and the graph's index is only replaced
/// (and `has_secondary_labels` set) when something remains.
///
/// # Errors
/// Returns `InvalidData` when the payload is truncated or a label name is not valid
/// UTF-8. Bounds checks are written as comparisons against the bytes remaining, so
/// they cannot wrap, and the file-supplied label count is clamped before it is used
/// for pre-allocation.
fn decode_secondary_label_index(payload: &[u8], graph: &mut DirGraph) -> io::Result<bool> {
    fn truncated(what: &'static str) -> io::Error {
        io::Error::new(io::ErrorKind::InvalidData, what)
    }

    if payload.len() < 16 || &payload[..8] != SECONDARY_LABELS_MAGIC {
        return Ok(false);
    }
    let version = u32::from_le_bytes(payload[8..12].try_into().unwrap());
    if version != SECONDARY_LABELS_VERSION {
        return Ok(false);
    }
    let n = u32::from_le_bytes(payload[12..16].try_into().unwrap()) as usize;

    // Invariant: `cursor <= payload.len()`; it only advances past validated spans.
    let mut cursor = 16usize;
    // Each label needs at least 8 bytes (name length + node count), so a larger
    // declared count is corrupt and must not size the table.
    let max_labels = (payload.len() - cursor) / 8;
    let mut index: HashMap<crate::graph::schema::InternedKey, Vec<petgraph::graph::NodeIndex>> =
        HashMap::with_capacity(n.min(max_labels));

    for _ in 0..n {
        if payload.len() - cursor < 4 {
            return Err(truncated("secondary_labels payload truncated (name len)"));
        }
        let name_len = u32::from_le_bytes(payload[cursor..cursor + 4].try_into().unwrap()) as usize;
        cursor += 4;

        // The name must be followed by the 4-byte node count.
        let remaining = payload.len() - cursor;
        if remaining < 4 || name_len > remaining - 4 {
            return Err(truncated("secondary_labels payload truncated (name bytes)"));
        }
        let name = std::str::from_utf8(&payload[cursor..cursor + name_len])
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
            .to_string();
        cursor += name_len;

        let num_nodes =
            u32::from_le_bytes(payload[cursor..cursor + 4].try_into().unwrap()) as usize;
        cursor += 4;
        if num_nodes > (payload.len() - cursor) / 4 {
            return Err(truncated("secondary_labels payload truncated (node list)"));
        }

        let key = graph.interner.get_or_intern(&name);
        let mut nodes = Vec::with_capacity(num_nodes);
        for chunk in payload[cursor..cursor + num_nodes * 4].chunks_exact(4) {
            let raw = u32::from_le_bytes(chunk.try_into().unwrap());
            nodes.push(petgraph::graph::NodeIndex::new(raw as usize));
        }
        cursor += num_nodes * 4;
        index.insert(key, nodes);
    }

    // Discard references to nodes that are not present in the loaded graph.
    for bucket in index.values_mut() {
        bucket.retain(|idx| graph.graph.node_weight(*idx).is_some());
    }
    index.retain(|_, bucket| !bucket.is_empty());
    if !index.is_empty() {
        graph.secondary_label_index = index;
        graph.has_secondary_labels = true;
    }
    Ok(true)
}
/// Writes the encoded secondary-label index to `<dir>/secondary_labels.bin.zst`.
///
/// Nothing is written when `encode_secondary_label_index` yields no payload. The
/// payload is zstd-compressed at level 3.
///
/// # Errors
/// Returns a message describing the compression or file-write failure.
pub(crate) fn write_secondary_labels_bin(
    dir: &std::path::Path,
    graph: &DirGraph,
) -> Result<(), String> {
    match encode_secondary_label_index(graph) {
        None => Ok(()),
        Some(encoded) => {
            let compressed = zstd::encode_all(encoded.as_slice(), 3)
                .map_err(|e| format!("secondary_labels compression failed: {}", e))?;
            let target = dir.join("secondary_labels.bin.zst");
            std::fs::write(target, compressed)
                .map_err(|e| format!("Failed to write secondary_labels.bin.zst: {}", e))
        }
    }
}
pub(crate) fn read_secondary_labels_bin(
dir: &std::path::Path,
graph: &mut DirGraph,
) -> io::Result<bool> {
let path = dir.join("secondary_labels.bin.zst");
if !path.exists() {
return Ok(false);
}
let compressed = std::fs::read(&path)?;
let payload = zstd::decode_all(compressed.as_slice()).map_err(io::Error::other)?;
decode_secondary_label_index(&payload, graph)
}
/// Readies `graph` for saving: stamps it with the current `SaveMetadata` and
/// repopulates its index-key lists.
///
/// Uses `Arc::make_mut`, so the graph is cloned first if the `Arc` is shared.
pub fn prepare_save(graph: &mut Arc<DirGraph>) {
    let owned: &mut DirGraph = Arc::make_mut(graph);
    owned.save_metadata = SaveMetadata::current();
    owned.populate_index_keys();
}
/// Compresses `data` with zstd at level 1.
fn zstd_compress(data: &[u8]) -> io::Result<Vec<u8>> {
    const LEVEL: i32 = 1;
    // A byte slice is itself a reader, so it can be fed to the encoder directly.
    zstd::encode_all(data, LEVEL)
}
/// Decompresses a complete zstd stream held in `data`.
fn zstd_decompress(data: &[u8]) -> io::Result<Vec<u8>> {
    // A byte slice is itself a reader, so it can be fed to the decoder directly.
    let decoded = zstd::decode_all(data)?;
    Ok(decoded)
}
/// Serialises `val` with the shared `bincode_options()` configuration, wrapping any
/// bincode failure with `io::Error::other`.
fn bincode_ser<T>(val: &T) -> io::Result<Vec<u8>>
where
    T: Serialize,
{
    match bincode_options().serialize(val) {
        Ok(encoded) => Ok(encoded),
        Err(err) => Err(io::Error::other(err)),
    }
}
/// Deserialises a `T` from `buf` with the shared `bincode_options()` configuration.
///
/// # Errors
/// Returns an `io::Error` whose message is prefixed with
/// `bincode deserialization failed: `.
fn bincode_deser<'a, T>(buf: &'a [u8]) -> io::Result<T>
where
    T: Deserialize<'a>,
{
    let decoded = bincode_options().deserialize(buf);
    decoded.map_err(|e| io::Error::other(format!("bincode deserialization failed: {}", e)))
}
/// Debug-build check that every key in every column-store schema resolves through
/// `graph.interner`.
///
/// # Panics
/// Panics, naming the node type and the raw key, on the first schema key that the
/// interner cannot resolve.
#[cfg(debug_assertions)]
fn debug_assert_column_keys_registered(graph: &DirGraph) {
    for (type_name, store) in &graph.column_stores {
        for (_slot, key) in store.schema().iter() {
            let is_registered = graph.interner.try_resolve(key).is_some();
            assert!(
                is_registered,
                "kglite invariant violation: ColumnStore for type '{}' contains \
                 InternedKey {} but the source string is not registered in \
                 graph.interner. A writer synthesized the key via \
                 `InternedKey::from_str()` without first calling \
                 `interner.get_or_intern(...)`. save() would silently corrupt \
                 the data — failing fast here so the offending writer is \
                 caught at write time, not at load time on the user's machine.",
                type_name,
                key.as_u64()
            );
        }
    }
}
/// Writes `graph` to `path` as a single v4-format file.
///
/// File layout:
/// 1. `V4_MAGIC` (4 bytes)
/// 2. `CURRENT_CORE_DATA_VERSION` (`u32`, little-endian)
/// 3. metadata JSON length (`u32`, little-endian) followed by the JSON itself
/// 4. zstd-compressed topology (the graph serialised with properties stripped)
/// 5. one zstd-compressed section per column store, in type-name order
/// 6. optional zstd-compressed embeddings, timeseries and secondary-label sections
///
/// The size of every section is recorded in the metadata so a reader can slice the
/// file; absent optional sections are recorded with size 0.
///
/// # Errors
/// Propagates serialisation, compression and I/O failures. Returns `InvalidData`,
/// before the output file is created, if the metadata JSON does not fit the header's
/// `u32` length field.
pub fn write_graph_v3(graph: &DirGraph, path: &str) -> io::Result<()> {
    #[cfg(debug_assertions)]
    debug_assert_column_keys_registered(graph);

    // Topology: both guards must be alive while the graph is serialised.
    let topology_raw = {
        let _strip = StripPropertiesGuard::new();
        let _guard = SerdeSerializeGuard::new(&graph.interner);
        bincode_ser(&graph.graph)?
    };
    let topology_compressed = zstd_compress(&topology_raw)?;
    drop(topology_raw);

    // Column stores, sorted by type name so the section order is deterministic.
    let mut column_sections_meta: Vec<V3ColumnSection> = Vec::new();
    let mut column_sections_data: Vec<Vec<u8>> = Vec::new();
    let mut column_stores_sorted: Vec<(&String, &Arc<ColumnStore>)> =
        graph.column_stores.iter().collect();
    column_stores_sorted.sort_by(|a, b| a.0.cmp(b.0));
    for (type_name, store) in column_stores_sorted {
        let packed = store.write_packed(&graph.interner)?;
        let compressed = zstd_compress(&packed)?;
        drop(packed);

        let mut cols = HashMap::new();
        for (slot, ik) in store.schema().iter() {
            let prop_name = graph.interner.resolve(ik);
            if let Some(col) = store.columns_ref().get(slot as usize) {
                cols.insert(prop_name.to_string(), col.type_tag().to_string());
            }
        }
        column_sections_meta.push(V3ColumnSection {
            type_name: type_name.clone(),
            compressed_size: compressed.len() as u64,
            row_count: store.row_count(),
            columns: cols,
        });
        column_sections_data.push(compressed);
    }

    // Optional sections are only produced when there is something to store.
    let embedding_compressed = if !graph.embeddings.is_empty() {
        let raw = bincode_ser(&graph.embeddings)?;
        Some(zstd_compress(&raw)?)
    } else {
        None
    };
    let timeseries_compressed = if !graph.timeseries_store.is_empty() {
        let raw = bincode_ser(&graph.timeseries_store)?;
        Some(zstd_compress(&raw)?)
    } else {
        None
    };
    let secondary_labels_compressed = match encode_secondary_label_index(graph) {
        Some(payload) => Some(zstd_compress(&payload)?),
        None => None,
    };

    let section_len = |section: &Option<Vec<u8>>| section.as_ref().map_or(0, |b| b.len() as u64);
    let mut metadata = FileMetadata::from_graph(graph);
    metadata.topology_compressed_size = topology_compressed.len() as u64;
    metadata.column_sections = column_sections_meta;
    metadata.embeddings_compressed_size = section_len(&embedding_compressed);
    metadata.timeseries_compressed_size = section_len(&timeseries_compressed);
    metadata.secondary_labels_compressed_size = section_len(&secondary_labels_compressed);

    // Serialise via `serde_json::Value` first, exactly as before.
    let metadata_value = serde_json::to_value(&metadata).map_err(io::Error::other)?;
    let metadata_json = serde_json::to_vec(&metadata_value).map_err(io::Error::other)?;
    // The header stores the JSON length as a u32; refuse to write a file whose length
    // field would silently wrap and make the file unreadable.
    let metadata_len = u32::try_from(metadata_json.len()).map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidData,
            "metadata JSON is too large for the file header's u32 length field",
        )
    })?;

    let file = File::create(path)?;
    let mut writer = BufWriter::new(file);
    writer.write_all(&V4_MAGIC)?;
    writer.write_all(&CURRENT_CORE_DATA_VERSION.to_le_bytes())?;
    writer.write_all(&metadata_len.to_le_bytes())?;
    writer.write_all(&metadata_json)?;
    writer.write_all(&topology_compressed)?;
    for section_data in &column_sections_data {
        writer.write_all(section_data)?;
    }
    if let Some(emb_data) = &embedding_compressed {
        writer.write_all(emb_data)?;
    }
    if let Some(ts_data) = &timeseries_compressed {
        writer.write_all(ts_data)?;
    }
    if let Some(sl_data) = &secondary_labels_compressed {
        writer.write_all(sl_data)?;
    }
    // Flush explicitly so write errors surface instead of being lost on drop.
    writer.flush()?;
    Ok(())
}
/// Saves an in-memory graph to the single file at `path`.
///
/// Runs `prepare_save`, switches the graph to columnar storage, and then writes it
/// with `write_graph_v3`.
pub fn save_inmemory(graph: &mut Arc<DirGraph>, path: &str) -> io::Result<()> {
    prepare_save(graph);
    Arc::make_mut(graph).enable_columnar();
    write_graph_v3(graph, path)
}
/// Saves `graph` to `path`, choosing the writer by storage backend.
///
/// Disk-backed graphs are saved through `DirGraph::save_disk`; all others go through
/// `save_inmemory`, with its I/O error converted to a string.
pub fn save_graph(graph: &mut Arc<DirGraph>, path: &str) -> Result<(), String> {
    if !graph.graph.is_disk() {
        return save_inmemory(graph, path).map_err(|e| e.to_string());
    }
    Arc::make_mut(graph).save_disk(path)
}
// Files of at least this many bytes (64 KiB) are memory-mapped by `load_file`;
// smaller files are read fully into memory.
const FILE_MMAP_THRESHOLD: u64 = 65_536;
/// Loads a graph from `path`.
///
/// A directory is loaded as a disk graph via `load_disk_dir`. A regular file is
/// memory-mapped when it is at least `FILE_MMAP_THRESHOLD` bytes long and read into
/// memory otherwise; its first four bytes then select the loader.
///
/// # Errors
/// Propagates I/O failures and returns an error when the file is shorter than four
/// bytes, carries the unsupported v3 magic, or has an unrecognised magic.
pub fn load_file(path: &str) -> io::Result<Arc<DirGraph>> {
    /// Dispatches on the four-byte magic at the start of `bytes`.
    fn load_by_magic(bytes: &[u8]) -> io::Result<Arc<DirGraph>> {
        if bytes.len() < 4 {
            return Err(io::Error::other(
                "File is too small to be a valid kglite file.",
            ));
        }
        if bytes[..4] == V4_MAGIC {
            load_v4(bytes)
        } else if bytes[..4] == V3_MAGIC {
            Err(io::Error::other(V3_HARD_BREAK_MSG))
        } else {
            Err(io::Error::other(
                "Unrecognized file format. This file was saved with an older version of kglite. \
                 Please rebuild the graph with the current version and save again.",
            ))
        }
    }

    let p = std::path::Path::new(path);
    if p.is_dir() {
        return load_disk_dir(p);
    }

    let file = File::open(path)?;
    if file.metadata()?.len() >= FILE_MMAP_THRESHOLD {
        // SAFETY (NOTE(review): assumption to confirm): the mapping is only read, and
        // it is assumed that no other process truncates or rewrites the file while it
        // is mapped.
        let mmap = unsafe { Mmap::map(&file)? };
        load_by_magic(&mmap)
    } else {
        let buf = std::fs::read(path)?;
        load_by_magic(&buf)
    }
}
/// Error text returned by `load_file` when a file starts with `V3_MAGIC`:
/// v3 files are rejected outright rather than migrated.
const V3_HARD_BREAK_MSG: &str = "kglite .kgl file format v3 is not supported by this binary. \
kglite 0.10+ uses v4 — the Value enum gained structured Node / \
Relationship / Path / List / Map variants, which changes the \
serialised property representation. Rebuild your graph from its \
original source (CSV, DataFrame, dataset loader) and save again, \
or downgrade kglite to the 0.9.x line if you need to read this \
file.";
/// Loads a disk-backed graph from the directory `dir`.
///
/// The directory must contain `disk_graph_meta.json`; otherwise an
/// `InvalidData` error is returned. The remaining artefacts are loaded in a
/// fixed order, because later stages use the interner and indices populated
/// by earlier ones:
///
/// 1. `metadata.json` (optional), with node/connection type metadata replaced
///    by the binary sidecars when those are present.
/// 2. The string interner (binary form first, `interner.json` as fallback).
/// 3. The `DiskGraph` itself, optionally prefetched when `KGLITE_PREFETCH` is set.
/// 4. Type indices (base file, then `type_indices.bin.zst`, then a full slot scan).
/// 5. Type schemas derived from `node_type_metadata`.
/// 6. Column stores (memory-mapped `columns.bin` and/or per-type sidecars).
/// 7. ID indices, and the type-connectivity cache when
///    `KGLITE_EAGER_TYPE_CONNECTIVITY` is set.
/// 8. Embeddings, timeseries and secondary labels, all best-effort.
///
/// Each stage is timed through `stage_timer` / `log_stage`.
fn load_disk_dir(dir: &std::path::Path) -> io::Result<Arc<DirGraph>> {
use crate::graph::io::load_timing::{log_stage, stage_timer};
use crate::graph::schema::GraphBackend;
let _load_t = stage_timer();
// Presence of this file is what identifies `dir` as a disk graph.
if !dir.join("disk_graph_meta.json").exists() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Directory does not contain a valid disk graph (missing disk_graph_meta.json)",
));
}
let mut graph = DirGraph::new();
let t = stage_timer();
// Stage 1: graph-level metadata. A missing metadata.json is tolerated; a
// malformed one is an error.
if dir.join("metadata.json").exists() {
let meta_bytes = std::fs::read(dir.join("metadata.json"))?;
let mut meta: FileMetadata = serde_json::from_slice(&meta_bytes)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
// When the binary sidecars exist they replace the JSON-provided maps.
if let Some(ntm) = read_node_type_metadata_bin(dir)? {
meta.node_type_metadata = ntm;
}
if let Some(ctm) = read_connection_type_metadata_bin(dir)? {
meta.connection_type_metadata = ctm;
}
// NOTE(review): the meaning of the `false` flag is defined by
// `apply_to_with`, which is not visible here.
meta.apply_to_with(&mut graph, false);
}
log_stage("metadata_json", t);
let t = stage_timer();
// Stage 2: interner. The JSON file is consulted only when the binary
// reader reports that it loaded nothing.
let loaded_from_bin = read_interner_bin(dir, &mut graph)?;
if !loaded_from_bin && dir.join("interner.json").exists() {
let interner_str = std::fs::read_to_string(dir.join("interner.json"))?;
let interner_map: std::collections::HashMap<String, String> =
serde_json::from_str(&interner_str)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
// Only the map's values are re-interned; its keys are not used.
for original in interner_map.values() {
graph.interner.get_or_intern(original);
}
}
log_stage("interner_load", t);
let t = stage_timer();
// Stage 3: the disk graph itself. It receives the interner mutably.
let (disk_graph, temp_dir) =
crate::graph::storage::disk::graph::DiskGraph::load_from_dir(dir, &mut graph.interner)?;
log_stage("disk_graph_load", t);
// Opt-in prefetch: any value of KGLITE_PREFETCH enables it.
if std::env::var_os("KGLITE_PREFETCH").is_some() {
let t = stage_timer();
disk_graph.prefetch_hot_regions();
log_stage("prefetch_hot_regions", t);
}
graph.graph = GraphBackend::Disk(Box::new(disk_graph));
// Record the temp dir on the graph; if the lock cannot be taken the path
// is simply not recorded.
if let Ok(mut dirs) = graph.temp_dirs.lock() {
dirs.push(temp_dir);
}
let t = stage_timer();
// Stage 4: type indices, using the first source that succeeds.
if let GraphBackend::Disk(ref dg) = graph.graph {
let mut loaded = false;
// 4a: base file via `TypeIndexBase::load_from`.
if let Some(base) =
crate::graph::storage::disk::type_index::TypeIndexBase::load_from(dir, &graph.interner)?
{
graph.type_indices =
crate::graph::storage::disk::type_index::TypeIndexStore::from_base(base);
loaded = true;
}
// 4b: compressed `type_indices.bin.zst`. Read and decode failures are
// ignored so that the scan below can still run.
if !loaded {
let ti_path = dir.join("type_indices.bin.zst");
if ti_path.exists() {
if let Ok(compressed) = std::fs::read(&ti_path) {
if let Ok(bytes) = zstd::decode_all(compressed.as_slice()) {
match read_type_indices_bin(&bytes, &graph.interner) {
Ok(Some(indices)) => {
graph.type_indices.replace_with(indices);
loaded = true;
}
_ => {
// The custom reader did not yield indices; try plain bincode.
if let Ok(indices) = bincode::deserialize(&bytes) {
graph.type_indices.replace_with(indices);
loaded = true;
}
}
}
}
}
}
}
// 4c: rebuild by scanning every node slot and grouping alive slots by
// their resolved type name. Slots whose type key does not resolve are skipped.
if !loaded {
let mut new_type_indices: std::collections::HashMap<
String,
Vec<petgraph::graph::NodeIndex>,
> = std::collections::HashMap::new();
for i in 0..dg.node_slots.len() {
let slot = dg.node_slots.get(i);
if slot.is_alive() {
let key = crate::graph::schema::InternedKey::from_u64(slot.node_type);
if let Some(type_name) = graph.interner.try_resolve(key) {
new_type_indices
.entry(type_name.to_string())
.or_default()
.push(petgraph::graph::NodeIndex::new(i));
}
}
}
graph.type_indices.replace_with(new_type_indices);
}
}
log_stage("type_indices_load", t);
// Stage 5: one TypeSchema per node type, built from the property names
// recorded in `node_type_metadata`.
for (node_type, props) in &graph.node_type_metadata {
let mut schema = crate::graph::schema::TypeSchema::new();
for prop_name in props.keys() {
let key = graph.interner.get_or_intern(prop_name);
schema.add_key(key);
}
graph
.type_schemas
.insert(node_type.clone(), std::sync::Arc::new(schema));
}
// Stage 6: column stores. For each of the three files the copy under
// `seg_000/` is preferred over the one at the directory root.
let mmap_path = {
let seg0 = dir.join("seg_000/columns.bin");
if seg0.exists() {
seg0
} else {
dir.join("columns.bin")
}
};
let meta_bin_path = {
let seg0 = dir.join("seg_000/columns_meta.bin.zst");
if seg0.exists() {
seg0
} else {
dir.join("columns_meta.bin.zst")
}
};
let meta_json_path = {
let seg0 = dir.join("seg_000/columns_meta.json");
if seg0.exists() {
seg0
} else {
dir.join("columns_meta.json")
}
};
// The mmap path is usable only with column metadata in one of its two forms.
let has_mmap = mmap_path.exists() && (meta_bin_path.exists() || meta_json_path.exists());
let t = stage_timer();
if has_mmap {
use crate::graph::io::ntriples::ColumnTypeMeta;
use memmap2::MmapMut;
// The file is opened read/write and mapped mutably.
let file = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&mmap_path)?;
// NOTE(review): no SAFETY justification is given for this mapping;
// it presumably relies on the file not being resized elsewhere — confirm.
let mmap = unsafe { MmapMut::map_mut(&file)? };
let mmap_arc = std::sync::Arc::new(mmap);
// The compressed binary metadata is preferred over the JSON form.
let type_metas: Vec<ColumnTypeMeta> = if meta_bin_path.exists() {
let compressed = std::fs::read(&meta_bin_path)?;
let bytes = zstd::decode_all(compressed.as_slice()).map_err(io::Error::other)?;
bincode::deserialize(&bytes).map_err(io::Error::other)?
} else {
let meta_json = std::fs::read_to_string(&meta_json_path)?;
serde_json::from_str(&meta_json).map_err(io::Error::other)?
};
// Every per-type store shares the same mapping.
for tm in type_metas {
let store = tm.to_mmap_store(std::sync::Arc::clone(&mmap_arc));
let cs = crate::graph::storage::column_store::ColumnStore::from_mmap_store(
std::sync::Arc::new(store),
);
graph.column_stores.insert(tm.type_name, Arc::new(cs));
}
// Sidecars then cover the types the mmap metadata did not provide;
// `load_column_sidecars` skips types that already have a store.
load_column_sidecars(dir, &mut graph)?;
} else {
load_column_sidecars(dir, &mut graph)?;
}
log_stage("column_stores_load", t);
graph.sync_disk_column_stores();
let t = stage_timer();
// Stage 7: ID indices. The base file is tried first, then the compressed
// file; read and decode failures of the latter are ignored.
if crate::graph::storage::GraphRead::is_disk(&graph.graph) {
if let Some(base) =
crate::graph::storage::disk::id_index::IdIndexBase::load_from(dir, &graph.interner)?
{
graph.id_indices = crate::graph::storage::disk::id_index::IdIndexStore::from_base(base);
} else {
let id_indices_path = dir.join("id_indices.bin.zst");
if id_indices_path.exists() {
if let Ok(compressed) = std::fs::read(&id_indices_path) {
if let Ok(bytes) = zstd::decode_all(compressed.as_slice()) {
match read_id_indices_bin(&bytes, &graph.interner) {
Ok(Some(indices)) => graph.id_indices.replace_with(indices),
_ => {
// Fall back to plain bincode, as for the type indices.
if let Ok(indices) = bincode::deserialize(&bytes) {
graph.id_indices.replace_with(indices);
}
}
}
}
}
}
}
}
log_stage("id_indices_load", t);
let t = stage_timer();
// The type-connectivity cache is filled eagerly only when the environment
// variable is set and no cache exists yet. An empty triple list is not stored.
if std::env::var_os("KGLITE_EAGER_TYPE_CONNECTIVITY").is_some()
&& !graph.has_type_connectivity_cache()
{
if let Ok(Some(triples)) = read_type_connectivity_bin(dir, &graph) {
if !triples.is_empty() {
// Panics if the lock is poisoned.
*graph.type_connectivity_cache.write().unwrap() = Some(triples);
}
}
}
log_stage("type_connectivity_load", t);
// Stage 8: optional payloads. Any read, decompress or decode failure leaves
// the graph's existing (default) value in place.
let emb_path = dir.join("embeddings.bin.zst");
if emb_path.exists() {
if let Ok(compressed) = std::fs::read(&emb_path) {
if let Ok(bytes) = zstd::decode_all(compressed.as_slice()) {
if let Ok(embeddings) =
bincode::deserialize::<HashMap<(String, String), EmbeddingStore>>(&bytes)
{
graph.embeddings = embeddings;
}
}
}
}
let ts_path = dir.join("timeseries.bin.zst");
if ts_path.exists() {
if let Ok(compressed) = std::fs::read(&ts_path) {
if let Ok(bytes) = zstd::decode_all(compressed.as_slice()) {
if let Ok(ts_store) = bincode::deserialize::<HashMap<usize, NodeTimeseries>>(&bytes)
{
graph.timeseries_store = ts_store;
}
}
}
}
// The result is discarded on purpose: secondary labels are best-effort too.
let _ = read_secondary_labels_bin(dir, &mut graph);
graph.build_connection_types_cache();
log_stage("load_disk_dir_total", _load_t);
Ok(Arc::new(graph))
}
fn load_column_sidecars(
dir: &std::path::Path,
graph: &mut crate::graph::dir_graph::DirGraph,
) -> io::Result<()> {
use rayon::prelude::*;
let columns_dir = dir.join("columns");
if !columns_dir.exists() {
return Ok(());
}
struct Job {
type_name: String,
col_file: std::path::PathBuf,
schema: Arc<crate::graph::schema::TypeSchema>,
type_meta: std::collections::HashMap<String, String>,
legacy_row_count: u32,
}
let mut jobs: Vec<Job> = Vec::new();
for entry in std::fs::read_dir(&columns_dir)? {
let entry = entry?;
if !entry.file_type()?.is_dir() {
continue;
}
let type_name = entry.file_name().to_string_lossy().to_string();
if graph.column_stores.contains_key(&type_name) {
continue;
}
let col_file = entry.path().join("columns.zst");
if !col_file.exists() {
continue;
}
let schema = graph
.type_schemas
.get(&type_name)
.cloned()
.unwrap_or_else(|| std::sync::Arc::new(crate::graph::schema::TypeSchema::new()));
let type_meta = graph
.node_type_metadata
.get(&type_name)
.cloned()
.unwrap_or_default();
let legacy_row_count = graph
.type_indices
.get(&type_name)
.map(|v| v.len() as u32)
.unwrap_or(0);
jobs.push(Job {
type_name,
col_file,
schema,
type_meta,
legacy_row_count,
});
}
let interner = &graph.interner;
let results: Vec<io::Result<(String, crate::graph::storage::column_store::ColumnStore)>> = jobs
.into_par_iter()
.map(
|job| -> io::Result<(String, crate::graph::storage::column_store::ColumnStore)> {
let compressed = std::fs::read(&job.col_file)?;
let decoded = zstd::decode_all(compressed.as_slice()).map_err(io::Error::other)?;
let (packed_slice, row_count): (&[u8], u32) =
if decoded.len() >= 12 && &decoded[..8] == b"KGLCOLv1" {
let rc = u32::from_le_bytes(decoded[8..12].try_into().unwrap());
(&decoded[12..], rc)
} else {
(decoded.as_slice(), job.legacy_row_count)
};
let store = crate::graph::storage::column_store::ColumnStore::load_packed(
job.schema,
&job.type_meta,
interner,
packed_slice,
row_count,
None,
)?;
Ok((job.type_name, store))
},
)
.collect();
for r in results {
let (type_name, store) = r?;
graph.column_stores.insert(type_name, Arc::new(store));
}
Ok(())
}
/// Decodes a v4 single-file graph image held entirely in `buf`.
///
/// Layout, in the order it is read below:
///
/// | bytes            | content                                             |
/// |------------------|-----------------------------------------------------|
/// | `0..4`           | magic (already checked by the caller)               |
/// | `4..8`           | core data version, little-endian `u32`              |
/// | `8..12`          | metadata length, little-endian `u32`                |
/// | `12..`           | JSON-encoded `FileMetadata`                         |
/// | next             | compressed topology (`topology_compressed_size`)    |
/// | next             | one compressed section per `column_sections` entry  |
/// | next (optional)  | embeddings, timeseries, secondary labels            |
///
/// # Errors
/// Returns an error when the header, metadata, topology or any column
/// section is truncated, when the core data version is newer than
/// `CURRENT_CORE_DATA_VERSION`, or when any section fails to decompress or
/// deserialize.
///
/// The three trailing sections are best-effort with respect to truncation:
/// a section that does not fit in `buf` is skipped, and so is everything
/// after it, because the start of later sections can no longer be located.
fn load_v4(buf: &[u8]) -> io::Result<Arc<DirGraph>> {
    /// Returns `start + len` when that range fits inside a buffer of
    /// `buf_len` bytes, and `None` on overflow or when it runs past the end.
    fn section_end(buf_len: usize, start: usize, len: usize) -> Option<usize> {
        start.checked_add(len).filter(|&end| end <= buf_len)
    }

    /// Slices the next optional trailing section of `len` bytes and advances
    /// `cursor`. A zero-length section is absent and leaves `cursor` alone.
    /// A section that does not fit clears `cursor`, so later sections are
    /// skipped as well.
    fn take_optional_section<'a>(
        buf: &'a [u8],
        cursor: &mut Option<usize>,
        len: usize,
    ) -> Option<&'a [u8]> {
        if len == 0 {
            return None;
        }
        let start = (*cursor)?;
        let end = section_end(buf.len(), start, len);
        *cursor = end;
        end.map(|end| &buf[start..end])
    }

    if buf.len() < 12 {
        return Err(io::Error::other(
            "v4 file is truncated — header incomplete.",
        ));
    }
    let core_version = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]);
    let metadata_len = u32::from_le_bytes([buf[8], buf[9], buf[10], buf[11]]) as usize;
    if core_version > CURRENT_CORE_DATA_VERSION {
        return Err(io::Error::other(format!(
            "File uses core data version {} but this library only supports up to version {}. \
             Please upgrade kglite.",
            core_version, CURRENT_CORE_DATA_VERSION,
        )));
    }

    let metadata_end = section_end(buf.len(), 12, metadata_len)
        .ok_or_else(|| io::Error::other("v4 file is truncated — metadata incomplete."))?;
    let metadata: FileMetadata = serde_json::from_slice(&buf[12..metadata_end])
        .map_err(|e| io::Error::other(format!("Failed to parse v4 metadata: {}", e)))?;

    // The topology size comes from the file, so it must be validated before
    // slicing; an unchecked slice would panic on a truncated file.
    let topology_start = metadata_end;
    let topology_end = section_end(
        buf.len(),
        topology_start,
        metadata.topology_compressed_size as usize,
    )
    .ok_or_else(|| io::Error::other("v4 file is truncated — topology section incomplete."))?;
    let topology_raw = zstd_decompress(&buf[topology_start..topology_end])?;

    let mut interner = StringInterner::new();
    let graph: crate::graph::schema::GraphBackend = {
        // The guard stays alive for the duration of the topology
        // deserialization and is dropped right after it.
        let _guard = SerdeDeserializeGuard::new(&mut interner);
        bincode_deser(&topology_raw)?
    };
    drop(topology_raw);

    // Copy what is still needed out of `metadata` before handing it to
    // `apply_to`.
    let column_sections = metadata.column_sections.clone();
    let embeddings_compressed_size = metadata.embeddings_compressed_size;
    let timeseries_compressed_size = metadata.timeseries_compressed_size;
    let secondary_labels_compressed_size = metadata.secondary_labels_compressed_size;

    let mut dir_graph = DirGraph::from_graph(graph);
    dir_graph.interner = interner;
    metadata.apply_to(&mut dir_graph);
    dir_graph.rebuild_type_indices_and_compact();
    dir_graph.build_connection_types_cache();

    let mut section_offset = topology_end;

    // Per-load scratch directory handed to `ColumnStore::load_packed`; it is
    // recorded on the graph's `temp_dirs` list.
    let temp_dir = std::env::temp_dir().join(format!(
        "kglite_v3_{}_{:x}",
        std::process::id(),
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos()
    ));
    if let Ok(mut dirs) = dir_graph.temp_dirs.lock() {
        dirs.push(temp_dir.clone());
    }

    for section_meta in &column_sections {
        let section_end_offset = section_end(
            buf.len(),
            section_offset,
            section_meta.compressed_size as usize,
        )
        .ok_or_else(|| {
            io::Error::other(format!(
                "v4 file truncated — column section '{}' incomplete.",
                section_meta.type_name
            ))
        })?;
        let compressed = &buf[section_offset..section_end_offset];
        let packed = zstd_decompress(compressed)?;
        {
            // Make sure every column name is interned, then build the schema
            // from the corresponding keys.
            let col_keys: Vec<crate::graph::schema::InternedKey> = section_meta
                .columns
                .keys()
                .map(|name| {
                    dir_graph.interner.get_or_intern(name);
                    crate::graph::schema::InternedKey::from_str(name)
                })
                .collect();
            let column_schema = Arc::new(crate::graph::schema::TypeSchema::from_keys(col_keys));
            let type_meta = dir_graph
                .node_type_metadata
                .get(&section_meta.type_name)
                .cloned()
                .unwrap_or_default();
            let type_temp_dir = temp_dir.join(&section_meta.type_name);
            std::fs::create_dir_all(&type_temp_dir)?;
            let store = ColumnStore::load_packed(
                column_schema,
                &type_meta,
                &dir_graph.interner,
                &packed,
                section_meta.row_count,
                Some(&type_temp_dir),
            )?;
            drop(packed);
            dir_graph
                .column_stores
                .insert(section_meta.type_name.clone(), Arc::new(store));
        }
        section_offset = section_end_offset;
    }

    // Point every node at its row in the freshly loaded column store. The
    // row id is the node's position within its type index.
    for (type_name, store) in &dir_graph.column_stores {
        let has_id_title = store.has_id_title_columns();
        if let Some(indices) = dir_graph.type_indices.get(type_name) {
            for (row_id, node_idx) in indices.iter().enumerate() {
                if let Some(node) = dir_graph.graph.node_weight_mut(node_idx) {
                    node.properties = PropertyStorage::Columnar {
                        store: Arc::clone(store),
                        row_id: row_id as u32,
                    };
                    // When the store carries id/title columns, the inline
                    // copies on the node are cleared.
                    if has_id_title {
                        node.id = Value::Null;
                        node.title = Value::Null;
                    }
                }
            }
        }
    }
    dir_graph.rebuild_indices_from_keys();

    // Optional trailing sections, laid out back to back after the columns.
    let mut cursor = Some(section_offset);
    if let Some(emb_compressed) =
        take_optional_section(buf, &mut cursor, embeddings_compressed_size as usize)
    {
        let emb_raw = zstd_decompress(emb_compressed)?;
        let embeddings: HashMap<(String, String), EmbeddingStore> = bincode_deser(&emb_raw)?;
        dir_graph.embeddings = embeddings;
    }
    if let Some(ts_compressed) =
        take_optional_section(buf, &mut cursor, timeseries_compressed_size as usize)
    {
        let ts_raw = zstd_decompress(ts_compressed)?;
        let ts_store: HashMap<usize, NodeTimeseries> = bincode_deser(&ts_raw)?;
        dir_graph.timeseries_store = ts_store;
    }
    if let Some(sl_compressed) =
        take_optional_section(buf, &mut cursor, secondary_labels_compressed_size as usize)
    {
        let sl_raw = zstd_decompress(sl_compressed)?;
        decode_secondary_label_index(&sl_raw, &mut dir_graph)?;
    }

    Ok(Arc::new(dir_graph))
}
use crate::datatypes::values::Value;
/// Four-byte magic written at the start of an embedding export file and
/// checked by `import_embeddings_from_file`.
const KGLE_MAGIC: [u8; 4] = *b"KGLE";
/// Embedding export format version, stored as a little-endian `u32` right
/// after the magic. The importer rejects files with a higher version.
const KGLE_VERSION: u32 = 1;
/// One embedding store as written to / read from an embedding export file.
///
/// * `node_type` — node type the embeddings belong to.
/// * `text_column` — the store name with a trailing `_emb` removed on export;
///   the importer appends `_emb` again to form the store key.
/// * `dimension` — the store's `dimension` value, passed to
///   `EmbeddingStore::new` on import.
/// * `entries` — `(node id, embedding vector)` pairs; ids are resolved back to
///   nodes on import.
///
/// NOTE(review): this struct is serialized with bincode, so its field order
/// is presumably part of the on-disk format — do not reorder without a
/// version bump.
#[derive(Serialize, Deserialize)]
struct ExportedEmbeddingStore {
node_type: String,
text_column: String, dimension: usize,
entries: Vec<(Value, Vec<f32>)>, }
/// Selects which embedding stores `export_embeddings_to_file` writes.
pub enum EmbeddingExportFilter {
/// Export every store whose node type appears in the list.
Types(Vec<String>),
/// Export per node type: the node type must be a key of the map. A
/// non-empty list restricts the export to those text columns; an empty
/// list exports all stores of that type.
TypeProperties(HashMap<String, Vec<String>>),
}
/// Summary returned by `export_embeddings_to_file`.
pub struct ExportStats {
/// Number of embedding stores written.
pub stores: usize,
/// Total number of embedding vectors written across all stores.
pub embeddings: usize,
}
/// Summary returned by `import_embeddings_from_file`.
pub struct ImportStats {
/// Number of stores inserted into the graph (those with at least one
/// matched entry).
pub stores: usize,
/// Total number of entries whose node id was found in the graph.
pub imported: usize,
/// Total number of entries whose node id was not found.
pub skipped: usize,
/// Number of non-empty stores discarded because none of their entries
/// matched a node.
pub dropped_stores: usize,
}
/// Writes the graph's embedding stores to `path`.
///
/// The file consists of `KGLE_MAGIC`, `KGLE_VERSION` as a little-endian
/// `u32`, and then a gzip stream holding the bincode-encoded list of
/// `ExportedEmbeddingStore` values. Each embedding is keyed by the owning
/// node's id rather than by its node index.
///
/// # Parameters
/// * `graph` — source of the embedding stores and node ids.
/// * `path` — destination file; created or truncated.
/// * `filter` — optional restriction on which stores are exported; `None`
///   exports everything.
///
/// # Errors
/// Returns an error when the file cannot be created or written, when
/// serialization fails, or when the gzip stream cannot be finalized.
pub fn export_embeddings_to_file(
    graph: &DirGraph,
    path: &str,
    filter: Option<&EmbeddingExportFilter>,
) -> io::Result<ExportStats> {
    /// Decides whether the store for (`node_type`, `text_column`) passes `filter`.
    fn is_selected(
        filter: Option<&EmbeddingExportFilter>,
        node_type: &str,
        text_column: &str,
    ) -> bool {
        match filter {
            None => true,
            Some(EmbeddingExportFilter::Types(types)) => {
                types.iter().any(|t| t.as_str() == node_type)
            }
            Some(EmbeddingExportFilter::TypeProperties(map)) => match map.get(node_type) {
                None => false,
                // An empty property list means "all stores of this type".
                Some(props) => props.is_empty() || props.iter().any(|p| p.as_str() == text_column),
            },
        }
    }

    let mut exported_stores: Vec<ExportedEmbeddingStore> = Vec::new();
    let mut total_embeddings = 0usize;

    for ((node_type, store_name), store) in &graph.embeddings {
        let text_column = store_name
            .strip_suffix("_emb")
            .unwrap_or(store_name.as_str());
        if !is_selected(filter, node_type.as_str(), text_column) {
            continue;
        }

        // Slots whose node no longer exists, or that have no embedding, are
        // left out of the export.
        let mut entries: Vec<(Value, Vec<f32>)> = Vec::with_capacity(store.len());
        for &node_index in &store.slot_to_node {
            if let Some(node) = graph
                .graph
                .node_weight(petgraph::graph::NodeIndex::new(node_index))
            {
                if let Some(embedding) = store.get_embedding(node_index) {
                    entries.push((node.id().into_owned(), embedding.to_vec()));
                }
            }
        }

        total_embeddings += entries.len();
        exported_stores.push(ExportedEmbeddingStore {
            node_type: node_type.clone(),
            text_column: text_column.to_string(),
            dimension: store.dimension,
            entries,
        });
    }

    let file = File::create(path)?;
    let mut writer = BufWriter::new(file);
    writer.write_all(&KGLE_MAGIC)?;
    writer.write_all(&KGLE_VERSION.to_le_bytes())?;

    let mut gz = GzEncoder::new(&mut writer, Compression::new(3));
    bincode_options()
        .serialize_into(&mut gz, &exported_stores)
        .map_err(|e| io::Error::other(format!("Failed to serialize embeddings: {}", e)))?;
    // Finish the gzip stream explicitly: dropping the encoder would also try
    // to finish it, but would discard any error from writing the trailer.
    gz.finish()?;
    writer.flush()?;

    Ok(ExportStats {
        stores: exported_stores.len(),
        embeddings: total_embeddings,
    })
}
/// Reads an embedding export file from `path` and attaches its embeddings to
/// `graph`.
///
/// The file must start with `KGLE_MAGIC` followed by a little-endian `u32`
/// version no greater than `KGLE_VERSION`; the remainder is a gzip stream of
/// bincode-encoded `ExportedEmbeddingStore` values.
///
/// For each exported store the node-id index of its node type is built, and
/// every entry is resolved to a node by id. Matched entries are added to a
/// new `EmbeddingStore`; unmatched ones are counted as skipped. A store is
/// inserted under `(node_type, "<text_column>_emb")` only if at least one
/// entry matched; a non-empty store with no matches is counted as dropped.
///
/// # Errors
/// Returns an error when the file cannot be read, is shorter than the
/// 8-byte header, has the wrong magic, has a newer version, or fails to
/// decompress or deserialize.
pub fn import_embeddings_from_file(graph: &mut DirGraph, path: &str) -> io::Result<ImportStats> {
    let file = File::open(path)?;
    let mut reader = BufReader::new(file);
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf)?;

    if buf.len() < 8 {
        return Err(io::Error::other(
            "File is too small to be a valid .kgle file.",
        ));
    }
    if buf[..4] != KGLE_MAGIC {
        return Err(io::Error::other(
            "Not a valid .kgle file (bad magic bytes).",
        ));
    }
    let version = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]);
    if version > KGLE_VERSION {
        return Err(io::Error::other(format!(
            "Embedding file version {} is newer than supported version {}. Please upgrade kglite.",
            version, KGLE_VERSION,
        )));
    }

    let gz = GzDecoder::new(&buf[8..]);
    let exported_stores: Vec<ExportedEmbeddingStore> = bincode_options()
        .deserialize_from(gz)
        .map_err(|e| io::Error::other(format!("Failed to deserialize embedding data: {}", e)))?;

    let mut total_imported = 0usize;
    let mut total_skipped = 0usize;
    let mut stores_count = 0usize;
    let mut dropped_stores = 0usize;

    for exported in exported_stores {
        graph.build_id_index(&exported.node_type);
        let mut store = crate::graph::schema::EmbeddingStore::new(exported.dimension);

        // Size the reservation from the vectors actually present in the file.
        // `dimension` is untrusted input, so `entries.len() * dimension` could
        // overflow or request an absurd allocation for a corrupt file. For a
        // well-formed file both computations give the same number.
        let float_count: usize = exported.entries.iter().map(|(_, v)| v.len()).sum();
        store.data.reserve(float_count);

        // NOTE(review): vector lengths are not checked against
        // `exported.dimension` here; confirm `set_embedding` handles a mismatch.
        let mut imported = 0usize;
        let mut skipped = 0usize;
        for (id, vec) in &exported.entries {
            match graph.lookup_by_id(&exported.node_type, id) {
                Some(node_idx) => {
                    store.set_embedding(node_idx.index(), vec);
                    imported += 1;
                }
                None => {
                    skipped += 1;
                }
            }
        }

        if imported > 0 {
            let key = (exported.node_type, format!("{}_emb", exported.text_column));
            graph.embeddings.insert(key, store);
            stores_count += 1;
        } else if !exported.entries.is_empty() {
            // Nothing matched: the store is not attached, but the caller can
            // see that it was discarded.
            dropped_stores += 1;
        }
        total_imported += imported;
        total_skipped += skipped;
    }

    Ok(ImportStats {
        stores: stores_count,
        imported: total_imported,
        skipped: total_skipped,
        dropped_stores,
    })
}