#![cfg(all(feature = "compact-store", feature = "mmap"))]
use std::path::{Path, PathBuf};
use std::sync::Arc;
use grafeo_common::storage::section::Section;
use grafeo_common::utils::error::{Error, Result};
use grafeo_core::graph::compact::CompactStore;
use grafeo_core::graph::compact::section::CompactStoreSection;
use memmap2::Mmap;
use parking_lot::RwLock;
pub struct CompactStoreTiered {
state: RwLock<TierState>,
}
enum TierState {
InMemory(Arc<CompactStore>),
OnDisk {
path: PathBuf,
_mmap: Mmap,
store: Arc<CompactStore>,
},
}
impl CompactStoreTiered {
#[must_use]
pub fn new_in_memory(store: Arc<CompactStore>) -> Self {
Self {
state: RwLock::new(TierState::InMemory(store)),
}
}
#[must_use]
pub fn store(&self) -> Arc<CompactStore> {
match &*self.state.read() {
TierState::InMemory(store) => Arc::clone(store),
TierState::OnDisk { store, .. } => Arc::clone(store),
}
}
#[must_use]
pub fn is_on_disk(&self) -> bool {
matches!(&*self.state.read(), TierState::OnDisk { .. })
}
#[must_use]
pub fn path(&self) -> Option<PathBuf> {
match &*self.state.read() {
TierState::OnDisk { path, .. } => Some(path.clone()),
TierState::InMemory(_) => None,
}
}
pub fn persist(&self, path: &Path) -> Result<usize> {
let store = self.store();
let section = CompactStoreSection::new(store);
let bytes = section.serialize()?;
write_atomically(path, &bytes)?;
Ok(bytes.len())
}
pub fn persist_to_mmap(&self, path: &Path) -> Result<usize> {
let bytes = {
let store = self.store();
let section = CompactStoreSection::new(store);
section.serialize()?
};
write_atomically(path, &bytes)?;
let (mmap, store) = open_and_deserialize(path)?;
let written = bytes.len();
*self.state.write() = TierState::OnDisk {
path: path.to_path_buf(),
_mmap: mmap,
store,
};
Ok(written)
}
pub fn open_mmap(path: &Path) -> Result<Self> {
let (mmap, store) = open_and_deserialize(path)?;
Ok(Self {
state: RwLock::new(TierState::OnDisk {
path: path.to_path_buf(),
_mmap: mmap,
store,
}),
})
}
pub fn reload_to_ram(&self) {
let mut guard = self.state.write();
if let TierState::OnDisk { store, .. } = &*guard {
let cloned = Arc::clone(store);
*guard = TierState::InMemory(cloned);
}
}
#[must_use]
pub fn memory_bytes(&self) -> usize {
self.store().memory_bytes()
}
}
fn write_atomically(path: &Path, bytes: &[u8]) -> Result<()> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.map_err(|e| Error::Internal(format!("create dir for {}: {e}", parent.display())))?;
}
let tmp = path.with_extension("grafeo.tmp");
std::fs::write(&tmp, bytes)
.map_err(|e| Error::Internal(format!("write {}: {e}", tmp.display())))?;
std::fs::rename(&tmp, path).map_err(|e| {
Error::Internal(format!(
"rename {} -> {}: {e}",
tmp.display(),
path.display()
))
})?;
Ok(())
}
fn open_and_deserialize(path: &Path) -> Result<(Mmap, Arc<CompactStore>)> {
let file = std::fs::File::open(path)
.map_err(|e| Error::Internal(format!("open {}: {e}", path.display())))?;
#[allow(unsafe_code)]
let mmap = unsafe { Mmap::map(&file) }
.map_err(|e| Error::Internal(format!("mmap {}: {e}", path.display())))?;
let mut section = CompactStoreSection::empty();
section.deserialize(&mmap[..])?;
let store = section.store().ok_or_else(|| {
Error::Internal(format!(
"empty CompactStoreSection after deserialize of {}",
path.display()
))
})?;
Ok((mmap, store))
}
#[cfg(test)]
mod tests {
use super::*;
use grafeo_common::types::{PropertyKey, Value};
use grafeo_core::graph::compact::builder::from_graph_store;
use grafeo_core::graph::lpg::LpgStore;
use grafeo_core::graph::traits::GraphStore;
fn build_sample_store() -> Arc<CompactStore> {
let lpg = LpgStore::new().expect("lpg store");
for i in 0..16 {
let id = lpg.create_node(&["Person"]);
lpg.set_node_property(id, "age", Value::Int64(i as i64));
lpg.set_node_property(id, "name", Value::String(arcstr::format!("person-{i}")));
}
let compact = from_graph_store(&lpg).expect("compact");
Arc::new(compact)
}
#[test]
fn in_memory_roundtrip() {
let store = build_sample_store();
let expected = store.memory_bytes();
let tiered = CompactStoreTiered::new_in_memory(store);
assert!(!tiered.is_on_disk());
assert_eq!(tiered.memory_bytes(), expected);
}
#[test]
fn persist_and_mmap_round_trip() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = tmp.path().join("base.compact");
let store = build_sample_store();
let expected_nodes = store.node_count();
let tiered = CompactStoreTiered::new_in_memory(store);
let written = tiered.persist_to_mmap(&path).expect("persist_to_mmap");
assert!(written > 0);
assert!(tiered.is_on_disk());
assert_eq!(tiered.path().as_deref(), Some(path.as_path()));
let store_after = tiered.store();
assert_eq!(store_after.node_count(), expected_nodes);
}
#[test]
fn open_mmap_reads_existing_file() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = tmp.path().join("base.compact");
let expected_nodes = {
let store = build_sample_store();
let expected = store.node_count();
let tiered = CompactStoreTiered::new_in_memory(store);
tiered.persist_to_mmap(&path).expect("persist_to_mmap");
expected
};
let reopened = CompactStoreTiered::open_mmap(&path).expect("open_mmap");
assert!(reopened.is_on_disk());
assert_eq!(reopened.store().node_count(), expected_nodes);
}
#[test]
fn reload_to_ram_transitions_state() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = tmp.path().join("base.compact");
let tiered = CompactStoreTiered::new_in_memory(build_sample_store());
tiered.persist_to_mmap(&path).expect("persist_to_mmap");
assert!(tiered.is_on_disk());
tiered.reload_to_ram();
assert!(!tiered.is_on_disk());
assert!(tiered.path().is_none());
let store = tiered.store();
assert!(store.node_count() > 0);
}
#[test]
fn persist_without_mmap_keeps_memory_state() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = tmp.path().join("snapshot.compact");
let tiered = CompactStoreTiered::new_in_memory(build_sample_store());
let written = tiered.persist(&path).expect("persist");
assert!(written > 0);
assert!(
!tiered.is_on_disk(),
"persist() alone must not change tier state"
);
assert!(path.exists());
}
#[test]
fn spill_drops_original_arc() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = tmp.path().join("base.compact");
let tiered = CompactStoreTiered::new_in_memory(build_sample_store());
assert_eq!(Arc::strong_count(&tiered.store()), 2);
tiered.persist_to_mmap(&path).expect("persist_to_mmap");
let after = tiered.store();
assert_eq!(Arc::strong_count(&after), 2);
}
#[test]
fn store_values_survive_mmap_round_trip() {
let tmp = tempfile::tempdir().expect("tempdir");
let path = tmp.path().join("base.compact");
let tiered = CompactStoreTiered::new_in_memory(build_sample_store());
let before = tiered.store();
let first_id = before.nodes_by_label("Person").first().copied();
let first_name =
first_id.and_then(|id| before.get_node_property(id, &PropertyKey::new("name")));
tiered.persist_to_mmap(&path).expect("persist_to_mmap");
let after = tiered.store();
let after_name =
first_id.and_then(|id| after.get_node_property(id, &PropertyKey::new("name")));
assert_eq!(first_name, after_name);
}
}