use crate::datatypes::values::Value;
use crate::graph::schema::{InternedKey, SerdeDeserializeGuard, StringInterner};
use crate::graph::storage::mapped::mmap_vec::{MmapBytes, MmapOrVec};
use std::borrow::Cow;
use std::collections::HashMap;
use std::io;
use std::path::Path;
/// File name, relative to the store directory, of the `u64` offsets table
/// written by `EdgePropertyStore::save_to`.
pub const OFFSETS_FILE: &str = "edge_prop_offsets.bin";
/// File name, relative to the store directory, of the byte heap holding the
/// concatenated encoded property lists that the offsets table points into.
pub const HEAP_FILE: &str = "edge_prop_heap.bin";
/// File name of the legacy single-blob format: a zstd-compressed bincode
/// `HashMap<u32, Vec<(InternedKey, Value)>>`. It is read by the legacy load
/// path and removed (best effort) after a successful save.
pub const LEGACY_FILE: &str = "edge_properties.bin.zst";
/// Sizes of the two on-disk files of a columnar edge-property store, as
/// needed by `EdgePropertyStore::load_from` to map them.
#[derive(Debug, serde::Serialize, serde::Deserialize, Default, Clone, Copy)]
pub struct EdgePropertyStoreMeta {
// Number of `u64` entries in the offsets file (file size / 8 in `meta_for`).
pub offsets_len: usize,
// Length of the heap file in bytes.
pub heap_len: usize,
}
/// Read-only columnar view of persisted edge properties: an offsets table
/// plus a byte heap. The encoded properties of edge `i` occupy
/// `heap[offsets[i]..offsets[i + 1]]`.
#[derive(Debug)]
struct ColumnarBase {
// One entry per edge plus a trailing end offset (see `slot` / `len`).
offsets: MmapOrVec<u64>,
// Concatenated encoded property lists addressed by `offsets`.
heap: MmapBytes,
}
impl ColumnarBase {
    /// Returns the raw encoded bytes stored for `edge_idx`.
    ///
    /// * `None` — `edge_idx` is outside the rows covered by the offsets
    ///   table, or the offset pair for the row is inverted (corrupt table).
    /// * `Some(&[])` — the row exists but holds no bytes.
    /// * `Some(bytes)` — the encoded property list for the row.
    fn slot(&self, edge_idx: u32) -> Option<&[u8]> {
        let i = usize::try_from(edge_idx).ok()?;
        // `checked_add` keeps `u32::MAX` from wrapping on targets where
        // `usize` is only 32 bits wide.
        let next = i.checked_add(1)?;
        if next >= self.offsets.len() {
            return None;
        }
        let start = self.offsets.get(i) as usize;
        let end = self.offsets.get(next) as usize;
        if start == end {
            return Some(&[]);
        }
        if start > end {
            // Never hand an inverted range to the heap slice; treat the row
            // as absent instead.
            return None;
        }
        Some(self.heap.slice(start, end))
    }

    /// Number of rows addressable through the offsets table (entries minus
    /// the trailing end offset), saturating at `u32::MAX`.
    fn len(&self) -> u32 {
        let rows = self.offsets.len().saturating_sub(1);
        u32::try_from(rows).unwrap_or(u32::MAX)
    }
}
/// Decodes one heap slot into a property list.
///
/// The bytes are expected to be a bincode-encoded `Vec<(u64, Value)>`; each
/// `u64` is turned back into an `InternedKey`. Returns `None` when the bytes
/// cannot be deserialised.
fn decode_props(bytes: &[u8]) -> Option<Vec<(InternedKey, Value)>> {
    match bincode::deserialize::<Vec<(u64, Value)>>(bytes) {
        Ok(pairs) => {
            let mut props = Vec::with_capacity(pairs.len());
            for (raw_key, value) in pairs {
                props.push((InternedKey::from_u64(raw_key), value));
            }
            Some(props)
        }
        Err(_) => None,
    }
}
/// Appends the bincode encoding of `props` to `heap`.
///
/// Keys are written as their raw `u64` form, mirroring what `decode_props`
/// reads back. Serialisation failures are surfaced as `io::Error`.
fn encode_props_into(props: &[(InternedKey, Value)], heap: &mut Vec<u8>) -> io::Result<()> {
    let mut wire: Vec<(u64, &Value)> = Vec::with_capacity(props.len());
    for (key, value) in props {
        wire.push((key.as_u64(), value));
    }
    match bincode::serialize_into(heap, &wire) {
        Ok(()) => Ok(()),
        Err(err) => Err(io::Error::other(err)),
    }
}
/// Per-edge property storage made of an optional read-only columnar base
/// and an in-memory overlay of pending changes, keyed by edge index.
#[derive(Debug, Default)]
pub struct EdgePropertyStore {
// Persisted rows, if the store was loaded from the columnar files.
base: Option<ColumnarBase>,
// Pending edits: `Some(props)` overrides the base row, `None` is a
// tombstone that hides it (see `get` and `remove`).
overlay: HashMap<u32, Option<Vec<(InternedKey, Value)>>>,
}
impl EdgePropertyStore {
pub fn new() -> Self {
Self::default()
}
pub fn from_overlay(map: HashMap<u32, Vec<(InternedKey, Value)>>) -> Self {
Self {
base: None,
overlay: map.into_iter().map(|(k, v)| (k, Some(v))).collect(),
}
}
pub fn get(&self, edge_idx: u32) -> Option<Cow<'_, [(InternedKey, Value)]>> {
if let Some(entry) = self.overlay.get(&edge_idx) {
return entry.as_ref().map(|v| Cow::Borrowed(v.as_slice()));
}
let base = self.base.as_ref()?;
let bytes = base.slot(edge_idx)?;
if bytes.is_empty() {
return None;
}
let decoded = decode_props(bytes)?;
if decoded.is_empty() {
None
} else {
Some(Cow::Owned(decoded))
}
}
pub fn insert(&mut self, edge_idx: u32, props: Vec<(InternedKey, Value)>) {
if props.is_empty() {
self.remove(edge_idx);
return;
}
self.overlay.insert(edge_idx, Some(props));
}
pub fn remove(&mut self, edge_idx: u32) {
if let Some(base) = self.base.as_ref() {
if edge_idx < base.len() {
self.overlay.insert(edge_idx, None);
return;
}
}
self.overlay.remove(&edge_idx);
}
pub fn take(&mut self, edge_idx: u32) -> Option<Vec<(InternedKey, Value)>> {
let current = self
.get(edge_idx)
.map(|cow| cow.into_owned())
.filter(|v| !v.is_empty());
self.remove(edge_idx);
current
}
pub fn is_empty(&self) -> bool {
if self.base.as_ref().is_some_and(|b| b.len() > 0) {
return false;
}
self.overlay.values().all(|v| v.is_none())
}
pub fn upper_bound(&self) -> u32 {
let base_upper = self.base.as_ref().map(|b| b.len()).unwrap_or(0);
let overlay_upper = self
.overlay
.keys()
.max()
.copied()
.map(|k| k.saturating_add(1))
.unwrap_or(0);
base_upper.max(overlay_upper)
}
pub fn deep_clone(&self) -> Self {
let mut new = EdgePropertyStore::new();
if let Some(base) = self.base.as_ref() {
for edge_idx in 0..base.len() {
if let Some(cow) = self.get(edge_idx) {
new.insert(edge_idx, cow.into_owned());
}
}
}
for (idx, entry) in &self.overlay {
if let Some(props) = entry {
new.insert(*idx, props.clone());
}
}
new
}
pub fn save_to(&mut self, target_dir: &Path, upper_bound: u32) -> io::Result<()> {
let offsets_path = target_dir.join(OFFSETS_FILE);
let heap_path = target_dir.join(HEAP_FILE);
if let Some(base) = self.base.as_ref() {
if base.offsets.file_path() == Some(&offsets_path) {
self.base = None;
}
}
if self.is_empty() {
std::fs::write(&offsets_path, b"")?;
std::fs::write(&heap_path, b"")?;
let legacy = target_dir.join(LEGACY_FILE);
if legacy.exists() {
let _ = std::fs::remove_file(&legacy);
}
self.overlay.clear();
return Ok(());
}
let mut offsets: Vec<u64> = Vec::with_capacity(upper_bound as usize + 1);
let heap_hint: usize = self
.overlay
.values()
.map(|v| v.as_ref().map_or(0, |p| 32 + 16 * p.len()))
.sum();
let mut heap: Vec<u8> = Vec::with_capacity(heap_hint);
for edge_idx in 0..upper_bound {
offsets.push(heap.len() as u64);
if let Some(cow) = self.get(edge_idx) {
if !cow.is_empty() {
encode_props_into(cow.as_ref(), &mut heap)?;
}
}
}
offsets.push(heap.len() as u64);
MmapOrVec::from_vec(offsets).save_to_file(&offsets_path)?;
std::fs::write(&heap_path, &heap)?;
let legacy = target_dir.join(LEGACY_FILE);
if legacy.exists() {
let _ = std::fs::remove_file(&legacy);
}
self.overlay.clear();
Ok(())
}
pub fn load_from(
dir: &Path,
format_version: u8,
meta: EdgePropertyStoreMeta,
interner: &mut StringInterner,
) -> io::Result<Self> {
if format_version == 0 || (meta.offsets_len == 0 && !dir.join(OFFSETS_FILE).exists()) {
return Self::load_legacy(dir, interner);
}
let offsets_path = dir.join(OFFSETS_FILE);
let heap_path = dir.join(HEAP_FILE);
if !offsets_path.exists() {
return Ok(Self::new());
}
if meta.offsets_len == 0 {
return Ok(Self::new());
}
let offsets = MmapOrVec::<u64>::load_mapped(&offsets_path, meta.offsets_len)?;
let heap = MmapBytes::load_mapped(&heap_path, meta.heap_len)?;
Ok(Self {
base: Some(ColumnarBase { offsets, heap }),
overlay: HashMap::new(),
})
}
fn load_legacy(dir: &Path, interner: &mut StringInterner) -> io::Result<Self> {
let legacy = dir.join(LEGACY_FILE);
if !legacy.exists() {
return Ok(Self::new());
}
let compressed = std::fs::read(&legacy)?;
let bytes = zstd::decode_all(compressed.as_slice()).map_err(io::Error::other)?;
let _guard = SerdeDeserializeGuard::new(interner);
let map: HashMap<u32, Vec<(InternedKey, Value)>> =
bincode::deserialize(&bytes).map_err(io::Error::other)?;
Ok(Self::from_overlay(map))
}
pub fn meta_for(dir: &Path) -> EdgePropertyStoreMeta {
let offsets = dir.join(OFFSETS_FILE);
let heap = dir.join(HEAP_FILE);
EdgePropertyStoreMeta {
offsets_len: std::fs::metadata(&offsets)
.map(|m| m.len() as usize / std::mem::size_of::<u64>())
.unwrap_or(0),
heap_len: std::fs::metadata(&heap)
.map(|m| m.len() as usize)
.unwrap_or(0),
}
}
}
#[cfg(test)]
// The float literal 3.14 used below is arbitrary sample data, so the
// approximate-constant lint is silenced for this module.
#[allow(clippy::approx_constant)]
mod tests {
    use super::*;
    use crate::datatypes::values::Value;
    use crate::graph::schema::{SerdeSerializeGuard, StringInterner};
    use tempfile::TempDir;

    /// Interns `s` and returns its key.
    fn k(s: &str, interner: &mut StringInterner) -> InternedKey {
        interner.get_or_intern(s)
    }

    /// Loads the store in `dir` as format version 1, using metadata derived
    /// from the files on disk.
    fn reload(dir: &Path, interner: &mut StringInterner) -> EdgePropertyStore {
        let meta = EdgePropertyStore::meta_for(dir);
        EdgePropertyStore::load_from(dir, 1, meta, interner).unwrap()
    }

    #[test]
    fn empty_store_is_empty() {
        assert!(EdgePropertyStore::new().is_empty());
    }

    #[test]
    fn insert_and_get_overlay_hit() {
        let mut interner = StringInterner::new();
        let mut store = EdgePropertyStore::new();
        let expected = vec![(k("weight", &mut interner), Value::Float64(1.5))];
        store.insert(42, expected.clone());

        let found = store.get(42).expect("should hit overlay");
        assert_eq!(found.as_ref(), expected.as_slice());
        assert!(!store.is_empty());
    }

    #[test]
    fn remove_without_base_drops_entry() {
        let mut interner = StringInterner::new();
        let mut store = EdgePropertyStore::new();
        let key = k("x", &mut interner);
        store.insert(7, vec![(key, Value::Int64(1))]);
        store.remove(7);

        assert!(store.get(7).is_none());
        assert!(store.is_empty());
    }

    #[test]
    fn insert_empty_normalises_to_absent() {
        let mut store = EdgePropertyStore::new();
        store.insert(1, Vec::new());

        assert!(store.get(1).is_none());
        assert!(store.is_empty());
    }

    #[test]
    fn save_and_load_round_trip() {
        let dir = TempDir::new().unwrap();
        let mut interner = StringInterner::new();
        let mut store = EdgePropertyStore::new();
        let first = vec![
            (k("name", &mut interner), Value::String("alpha".into())),
            (k("rank", &mut interner), Value::Int64(7)),
        ];
        let last = vec![(k("weight", &mut interner), Value::Float64(3.14))];
        store.insert(0, first.clone());
        store.insert(3, last.clone());
        store.save_to(dir.path(), 4).unwrap();

        // Four rows are addressed by five offsets.
        let meta = EdgePropertyStore::meta_for(dir.path());
        assert_eq!(meta.offsets_len, 5);
        assert!(meta.heap_len > 0);

        let loaded = EdgePropertyStore::load_from(dir.path(), 1, meta, &mut interner).unwrap();
        assert_eq!(loaded.get(0).unwrap().as_ref(), first.as_slice());
        assert!(loaded.get(1).is_none());
        assert!(loaded.get(2).is_none());
        assert_eq!(loaded.get(3).unwrap().as_ref(), last.as_slice());
    }

    #[test]
    fn overlay_tombstones_hide_base() {
        let first_dir = TempDir::new().unwrap();
        let second_dir = TempDir::new().unwrap();
        let mut interner = StringInterner::new();

        let mut store = EdgePropertyStore::new();
        let key = k("a", &mut interner);
        store.insert(5, vec![(key, Value::Int64(99))]);
        store.save_to(first_dir.path(), 6).unwrap();

        let mut loaded = reload(first_dir.path(), &mut interner);
        assert!(loaded.get(5).is_some());
        loaded.remove(5);
        assert!(loaded.get(5).is_none());

        // The tombstone must also be honoured when writing a fresh copy.
        loaded.save_to(second_dir.path(), 6).unwrap();
        let rewritten = reload(second_dir.path(), &mut interner);
        assert!(rewritten.get(5).is_none());
    }

    #[test]
    fn take_returns_and_removes() {
        let mut interner = StringInterner::new();
        let mut store = EdgePropertyStore::new();
        let expected = vec![(k("t", &mut interner), Value::Boolean(true))];
        store.insert(11, expected.clone());

        let taken = store.take(11).unwrap();
        assert_eq!(taken, expected);
        assert!(store.get(11).is_none());
    }

    #[test]
    fn legacy_load_reads_hashmap_blob() {
        let dir = TempDir::new().unwrap();
        let mut interner = StringInterner::new();
        let legacy_key = k("legacy", &mut interner);
        let mut blob: HashMap<u32, Vec<(InternedKey, Value)>> = HashMap::new();
        blob.insert(2, vec![(legacy_key, Value::String("old".into()))]);
        {
            // Scope the guard so the interner can be borrowed mutably again
            // for loading.
            let _g = SerdeSerializeGuard::new(&interner);
            let raw = bincode::serialize(&blob).unwrap();
            let compressed = zstd::encode_all(raw.as_slice(), 3).unwrap();
            std::fs::write(dir.path().join(LEGACY_FILE), compressed).unwrap();
        }

        let default_meta = EdgePropertyStoreMeta::default();
        let loaded =
            EdgePropertyStore::load_from(dir.path(), 0, default_meta, &mut interner).unwrap();
        let found = loaded.get(2).expect("should load from legacy");
        assert_eq!(found.as_ref().len(), 1);
        assert_eq!(found.as_ref()[0].1, Value::String("old".into()));
    }

    #[test]
    fn empty_store_save_emits_zero_length_files_and_reload_preserves_semantics() {
        let dir = TempDir::new().unwrap();
        let mut interner = StringInterner::new();
        let mut store = EdgePropertyStore::new();
        store.save_to(dir.path(), 1_000_000).unwrap();

        let offsets_size = std::fs::metadata(dir.path().join(OFFSETS_FILE))
            .unwrap()
            .len();
        let heap_size = std::fs::metadata(dir.path().join(HEAP_FILE)).unwrap().len();
        assert_eq!(offsets_size, 0, "offsets file should be empty");
        assert_eq!(heap_size, 0, "heap file should be empty");

        let meta = EdgePropertyStore::meta_for(dir.path());
        assert_eq!(meta.offsets_len, 0);
        assert_eq!(meta.heap_len, 0);

        let loaded = EdgePropertyStore::load_from(dir.path(), 1, meta, &mut interner).unwrap();
        assert!(loaded.is_empty());
        for probe in [0, 999_999, u32::MAX] {
            assert!(loaded.get(probe).is_none());
        }
    }
}