use std::sync::Arc;
use arc_swap::ArcSwap;
use arcstr::ArcStr;
use grafeo_common::types::{EdgeId, EpochId, NodeId, PropertyKey, TransactionId, Value};
use grafeo_common::utils::hash::{FxHashMap, FxHashSet};
use parking_lot::RwLock;
use super::CompactStore;
use crate::graph::Direction;
use crate::graph::lpg::{CompareOp, Edge, LpgStore, Node};
use crate::graph::traits::{GraphStore, GraphStoreMut, GraphStoreSearch};
#[cfg(feature = "vector-index")]
use crate::index::vector::DistanceMetric;
use crate::statistics::Statistics;
pub struct LayeredStore {
base: ArcSwap<CompactStore>,
overlay: Arc<LpgStore>,
dirty_node_ids: RwLock<FxHashSet<NodeId>>,
dirty_edge_ids: RwLock<FxHashSet<EdgeId>>,
deleted_from_base_nodes: RwLock<FxHashSet<NodeId>>,
deleted_from_base_edges: RwLock<FxHashSet<EdgeId>>,
}
impl std::fmt::Debug for LayeredStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LayeredStore")
.field("base_node_count", &self.base.load().node_count())
.field("overlay_node_count", &self.overlay.node_count())
.field("dirty_nodes", &self.dirty_node_ids.read().len())
.field(
"deleted_base_nodes",
&self.deleted_from_base_nodes.read().len(),
)
.finish_non_exhaustive()
}
}
impl LayeredStore {
pub fn new(
base: CompactStore,
max_node_id: u64,
max_edge_id: u64,
) -> Result<Self, grafeo_common::memory::AllocError> {
let overlay = Arc::new(LpgStore::new()?);
overlay.set_next_node_id(max_node_id + 1);
overlay.set_next_edge_id(max_edge_id + 1);
Ok(Self {
base: ArcSwap::new(Arc::new(base)),
overlay,
dirty_node_ids: RwLock::new(FxHashSet::default()),
dirty_edge_ids: RwLock::new(FxHashSet::default()),
deleted_from_base_nodes: RwLock::new(FxHashSet::default()),
deleted_from_base_edges: RwLock::new(FxHashSet::default()),
})
}
#[must_use]
pub fn base_store_arc(&self) -> Arc<CompactStore> {
self.base.load_full()
}
pub fn swap_base(&self, new_base: Arc<CompactStore>) -> Arc<CompactStore> {
self.base.swap(new_base)
}
#[must_use]
pub fn overlay_store(&self) -> &Arc<LpgStore> {
&self.overlay
}
#[must_use]
pub fn overlay_mutation_count(&self) -> usize {
self.dirty_node_ids.read().len()
+ self.dirty_edge_ids.read().len()
+ self.deleted_from_base_nodes.read().len()
+ self.deleted_from_base_edges.read().len()
}
#[must_use]
pub fn memory_bytes(&self) -> usize {
let (store_mem, index_mem, mvcc_mem, pool_mem) = self.overlay.memory_breakdown();
self.base.load().memory_bytes()
+ store_mem.total_bytes
+ index_mem.total_bytes
+ mvcc_mem.total_bytes
+ pool_mem.total_bytes
}
#[inline]
fn is_node_dirty(&self, id: NodeId) -> bool {
self.dirty_node_ids.read().contains(&id)
}
#[inline]
fn is_node_deleted_from_base(&self, id: NodeId) -> bool {
self.deleted_from_base_nodes.read().contains(&id)
}
#[inline]
fn is_edge_dirty(&self, id: EdgeId) -> bool {
self.dirty_edge_ids.read().contains(&id)
}
#[inline]
fn is_edge_deleted_from_base(&self, id: EdgeId) -> bool {
self.deleted_from_base_edges.read().contains(&id)
}
}
impl GraphStore for LayeredStore {
fn get_node(&self, id: NodeId) -> Option<Node> {
if self.is_node_deleted_from_base(id) {
return None;
}
if self.is_node_dirty(id) {
return self.overlay.get_node(id);
}
self.base
.load()
.get_node(id)
.or_else(|| self.overlay.get_node(id))
}
fn get_edge(&self, id: EdgeId) -> Option<Edge> {
if self.is_edge_deleted_from_base(id) {
return None;
}
if self.is_edge_dirty(id) {
return self.overlay.get_edge(id);
}
self.base
.load()
.get_edge(id)
.or_else(|| self.overlay.get_edge(id))
}
fn get_node_versioned(
&self,
id: NodeId,
epoch: EpochId,
transaction_id: TransactionId,
) -> Option<Node> {
if self.is_node_deleted_from_base(id) {
return None;
}
if self.is_node_dirty(id) {
return self.overlay.get_node_versioned(id, epoch, transaction_id);
}
self.base
.load()
.get_node(id)
.or_else(|| self.overlay.get_node_versioned(id, epoch, transaction_id))
}
fn get_edge_versioned(
&self,
id: EdgeId,
epoch: EpochId,
transaction_id: TransactionId,
) -> Option<Edge> {
if self.is_edge_deleted_from_base(id) {
return None;
}
if self.is_edge_dirty(id) {
return self.overlay.get_edge_versioned(id, epoch, transaction_id);
}
self.base
.load()
.get_edge(id)
.or_else(|| self.overlay.get_edge_versioned(id, epoch, transaction_id))
}
fn get_node_at_epoch(&self, id: NodeId, epoch: EpochId) -> Option<Node> {
if self.is_node_deleted_from_base(id) {
return None;
}
if self.is_node_dirty(id) {
return self.overlay.get_node_at_epoch(id, epoch);
}
self.base
.load()
.get_node(id)
.or_else(|| self.overlay.get_node_at_epoch(id, epoch))
}
fn get_edge_at_epoch(&self, id: EdgeId, epoch: EpochId) -> Option<Edge> {
if self.is_edge_deleted_from_base(id) {
return None;
}
if self.is_edge_dirty(id) {
return self.overlay.get_edge_at_epoch(id, epoch);
}
self.base
.load()
.get_edge(id)
.or_else(|| self.overlay.get_edge_at_epoch(id, epoch))
}
fn get_node_property(&self, id: NodeId, key: &PropertyKey) -> Option<Value> {
if self.is_node_deleted_from_base(id) {
return None;
}
if self.is_node_dirty(id) {
return self.overlay.get_node_property(id, key);
}
self.base
.load()
.get_node_property(id, key)
.or_else(|| self.overlay.get_node_property(id, key))
}
fn get_edge_property(&self, id: EdgeId, key: &PropertyKey) -> Option<Value> {
if self.is_edge_deleted_from_base(id) {
return None;
}
if self.is_edge_dirty(id) {
return self.overlay.get_edge_property(id, key);
}
self.base
.load()
.get_edge_property(id, key)
.or_else(|| self.overlay.get_edge_property(id, key))
}
fn get_node_property_batch(&self, ids: &[NodeId], key: &PropertyKey) -> Vec<Option<Value>> {
ids.iter()
.map(|id| self.get_node_property(*id, key))
.collect()
}
fn get_nodes_properties_batch(&self, ids: &[NodeId]) -> Vec<FxHashMap<PropertyKey, Value>> {
ids.iter()
.map(|id| {
self.get_node(*id)
.map(|n| {
n.properties
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
})
.unwrap_or_default()
})
.collect()
}
fn get_nodes_properties_selective_batch(
&self,
ids: &[NodeId],
keys: &[PropertyKey],
) -> Vec<FxHashMap<PropertyKey, Value>> {
ids.iter()
.map(|id| {
let mut map = FxHashMap::default();
for key in keys {
if let Some(v) = self.get_node_property(*id, key) {
map.insert(key.clone(), v);
}
}
map
})
.collect()
}
fn get_edges_properties_selective_batch(
&self,
ids: &[EdgeId],
keys: &[PropertyKey],
) -> Vec<FxHashMap<PropertyKey, Value>> {
ids.iter()
.map(|id| {
let mut map = FxHashMap::default();
for key in keys {
if let Some(v) = self.get_edge_property(*id, key) {
map.insert(key.clone(), v);
}
}
map
})
.collect()
}
fn neighbors(&self, node: NodeId, direction: Direction) -> Vec<NodeId> {
let deleted_nodes = self.deleted_from_base_nodes.read();
let mut results = Vec::new();
if !deleted_nodes.contains(&node) && !self.is_node_dirty(node) {
for nid in self.base.load().neighbors(node, direction) {
if !deleted_nodes.contains(&nid) {
results.push(nid);
}
}
}
for nid in self.overlay.neighbors(node, direction) {
if !deleted_nodes.contains(&nid) {
results.push(nid);
}
}
results.sort_unstable();
results.dedup();
results
}
fn edges_from(&self, node: NodeId, direction: Direction) -> Vec<(NodeId, EdgeId)> {
let deleted_nodes = self.deleted_from_base_nodes.read();
let deleted_edges = self.deleted_from_base_edges.read();
let mut results = Vec::new();
if !deleted_nodes.contains(&node) && !self.is_node_dirty(node) {
for (target, eid) in self.base.load().edges_from(node, direction) {
if !deleted_nodes.contains(&target) && !deleted_edges.contains(&eid) {
results.push((target, eid));
}
}
}
for (target, eid) in self.overlay.edges_from(node, direction) {
if !deleted_nodes.contains(&target) && !deleted_edges.contains(&eid) {
results.push((target, eid));
}
}
results.sort_unstable_by_key(|&(_, eid)| eid);
results.dedup_by_key(|&mut (_, eid)| eid);
results
}
fn out_degree(&self, node: NodeId) -> usize {
self.edges_from(node, Direction::Outgoing).len()
}
fn in_degree(&self, node: NodeId) -> usize {
self.edges_from(node, Direction::Incoming).len()
}
fn has_backward_adjacency(&self) -> bool {
self.base.load().has_backward_adjacency() || self.overlay.has_backward_adjacency()
}
fn node_ids(&self) -> Vec<NodeId> {
let deleted = self.deleted_from_base_nodes.read();
let mut ids: Vec<NodeId> = self
.base
.load()
.node_ids()
.into_iter()
.filter(|id| !deleted.contains(id))
.collect();
ids.extend(self.overlay.node_ids());
ids.sort_unstable();
ids.dedup();
ids
}
fn nodes_by_label(&self, label: &str) -> Vec<NodeId> {
let deleted = self.deleted_from_base_nodes.read();
let dirty = self.dirty_node_ids.read();
let mut ids: Vec<NodeId> = self
.base
.load()
.nodes_by_label(label)
.into_iter()
.filter(|id| !deleted.contains(id) && !dirty.contains(id))
.collect();
ids.extend(
self.overlay
.nodes_by_label(label)
.into_iter()
.filter(|id| !deleted.contains(id)),
);
ids.sort_unstable();
ids.dedup();
ids
}
fn node_count(&self) -> usize {
let base_count = self.base.load().node_count();
let deleted = self.deleted_from_base_nodes.read().len();
let overlay_count = self.overlay.node_count();
let promoted = self
.dirty_node_ids
.read()
.iter()
.filter(|id| self.base.load().get_node(**id).is_some())
.count();
base_count - deleted - promoted + overlay_count
}
fn edge_count(&self) -> usize {
let base_count = self.base.load().edge_count();
let deleted = self.deleted_from_base_edges.read().len();
let overlay_count = self.overlay.edge_count();
let promoted = self
.dirty_edge_ids
.read()
.iter()
.filter(|id| self.base.load().get_edge(**id).is_some())
.count();
base_count - deleted - promoted + overlay_count
}
fn edge_type(&self, id: EdgeId) -> Option<ArcStr> {
if self.is_edge_deleted_from_base(id) {
return None;
}
if self.is_edge_dirty(id) {
return self.overlay.edge_type(id);
}
self.base
.load()
.edge_type(id)
.or_else(|| self.overlay.edge_type(id))
}
fn find_nodes_by_property(&self, property: &str, value: &Value) -> Vec<NodeId> {
let deleted = self.deleted_from_base_nodes.read();
let dirty = self.dirty_node_ids.read();
let mut results: Vec<NodeId> = self
.base
.load()
.find_nodes_by_property(property, value)
.into_iter()
.filter(|id| !deleted.contains(id) && !dirty.contains(id))
.collect();
results.extend(self.overlay.find_nodes_by_property(property, value));
results
}
fn find_nodes_by_properties(&self, conditions: &[(&str, Value)]) -> Vec<NodeId> {
if conditions.is_empty() {
return self.node_ids();
}
let deleted = self.deleted_from_base_nodes.read();
let dirty = self.dirty_node_ids.read();
let mut results: Vec<NodeId> = self
.base
.load()
.find_nodes_by_properties(conditions)
.into_iter()
.filter(|id| !deleted.contains(id) && !dirty.contains(id))
.collect();
results.extend(self.overlay.find_nodes_by_properties(conditions));
results
}
fn find_nodes_in_range(
&self,
property: &str,
min: Option<&Value>,
max: Option<&Value>,
min_inclusive: bool,
max_inclusive: bool,
) -> Vec<NodeId> {
let deleted = self.deleted_from_base_nodes.read();
let dirty = self.dirty_node_ids.read();
let mut results: Vec<NodeId> = self
.base
.load()
.find_nodes_in_range(property, min, max, min_inclusive, max_inclusive)
.into_iter()
.filter(|id| !deleted.contains(id) && !dirty.contains(id))
.collect();
results.extend(self.overlay.find_nodes_in_range(
property,
min,
max,
min_inclusive,
max_inclusive,
));
results
}
fn node_property_might_match(
&self,
property: &PropertyKey,
op: CompareOp,
value: &Value,
) -> bool {
self.base
.load()
.node_property_might_match(property, op, value)
|| self.overlay.node_property_might_match(property, op, value)
}
fn edge_property_might_match(
&self,
property: &PropertyKey,
op: CompareOp,
value: &Value,
) -> bool {
self.base
.load()
.edge_property_might_match(property, op, value)
|| self.overlay.edge_property_might_match(property, op, value)
}
fn statistics(&self) -> Arc<Statistics> {
let base_stats = self.base.load().statistics();
let mut combined = (*base_stats).clone();
combined.total_nodes = self.node_count() as u64;
combined.total_edges = self.edge_count() as u64;
for label in self.overlay.all_labels() {
let count = self.overlay.nodes_by_label(&label).len() as u64;
if let Some(existing) = combined.get_label(&label) {
combined.update_label(
&label,
crate::statistics::LabelStatistics::new(existing.node_count + count),
);
} else {
combined.update_label(&label, crate::statistics::LabelStatistics::new(count));
}
}
Arc::new(combined)
}
fn estimate_label_cardinality(&self, label: &str) -> f64 {
self.base.load().estimate_label_cardinality(label)
+ self.overlay.estimate_label_cardinality(label)
}
fn estimate_avg_degree(&self, edge_type: &str, outgoing: bool) -> f64 {
let base_est = self.base.load().estimate_avg_degree(edge_type, outgoing);
let overlay_est = self.overlay.estimate_avg_degree(edge_type, outgoing);
let base_edges = self.base.load().edge_count() as f64;
let overlay_edges = self.overlay.edge_count() as f64;
let total = base_edges + overlay_edges;
if total == 0.0 {
return 0.0;
}
(base_est * base_edges + overlay_est * overlay_edges) / total
}
fn current_epoch(&self) -> EpochId {
self.overlay.current_epoch()
}
fn all_labels(&self) -> Vec<String> {
let mut labels: FxHashSet<String> = self.base.load().all_labels().into_iter().collect();
labels.extend(self.overlay.all_labels());
labels.into_iter().collect()
}
fn all_edge_types(&self) -> Vec<String> {
let mut types: FxHashSet<String> = self.base.load().all_edge_types().into_iter().collect();
types.extend(self.overlay.all_edge_types());
types.into_iter().collect()
}
fn all_property_keys(&self) -> Vec<String> {
let mut keys: FxHashSet<String> =
self.base.load().all_property_keys().into_iter().collect();
keys.extend(self.overlay.all_property_keys());
keys.into_iter().collect()
}
fn is_node_visible_at_epoch(&self, id: NodeId, epoch: EpochId) -> bool {
if self.is_node_deleted_from_base(id) {
return false;
}
if self.is_node_dirty(id) {
return self.overlay.is_node_visible_at_epoch(id, epoch);
}
let base = self.base.load();
if base.get_node(id).is_some() {
base.is_node_visible_at_epoch(id, epoch)
} else {
self.overlay.is_node_visible_at_epoch(id, epoch)
}
}
fn is_node_visible_versioned(
&self,
id: NodeId,
epoch: EpochId,
transaction_id: TransactionId,
) -> bool {
if self.is_node_deleted_from_base(id) {
return false;
}
if self.is_node_dirty(id) {
return self
.overlay
.is_node_visible_versioned(id, epoch, transaction_id);
}
let base = self.base.load();
if base.get_node(id).is_some() {
base.is_node_visible_versioned(id, epoch, transaction_id)
} else {
self.overlay
.is_node_visible_versioned(id, epoch, transaction_id)
}
}
fn is_edge_visible_at_epoch(&self, id: EdgeId, epoch: EpochId) -> bool {
if self.is_edge_deleted_from_base(id) {
return false;
}
if self.is_edge_dirty(id) {
return self.overlay.is_edge_visible_at_epoch(id, epoch);
}
let base = self.base.load();
if base.get_edge(id).is_some() {
base.is_edge_visible_at_epoch(id, epoch)
} else {
self.overlay.is_edge_visible_at_epoch(id, epoch)
}
}
fn is_edge_visible_versioned(
&self,
id: EdgeId,
epoch: EpochId,
transaction_id: TransactionId,
) -> bool {
if self.is_edge_deleted_from_base(id) {
return false;
}
if self.is_edge_dirty(id) {
return self
.overlay
.is_edge_visible_versioned(id, epoch, transaction_id);
}
let base = self.base.load();
if base.get_edge(id).is_some() {
base.is_edge_visible_versioned(id, epoch, transaction_id)
} else {
self.overlay
.is_edge_visible_versioned(id, epoch, transaction_id)
}
}
fn filter_visible_node_ids(&self, ids: &[NodeId], epoch: EpochId) -> Vec<NodeId> {
ids.iter()
.copied()
.filter(|id| self.is_node_visible_at_epoch(*id, epoch))
.collect()
}
fn filter_visible_node_ids_versioned(
&self,
ids: &[NodeId],
epoch: EpochId,
transaction_id: TransactionId,
) -> Vec<NodeId> {
ids.iter()
.copied()
.filter(|id| self.is_node_visible_versioned(*id, epoch, transaction_id))
.collect()
}
fn get_node_history(&self, id: NodeId) -> Vec<(EpochId, Option<EpochId>, Node)> {
if self.is_node_dirty(id) {
return self.overlay.get_node_history(id);
}
Vec::new()
}
fn get_edge_history(&self, id: EdgeId) -> Vec<(EpochId, Option<EpochId>, Edge)> {
if self.is_edge_dirty(id) {
return self.overlay.get_edge_history(id);
}
Vec::new()
}
}
impl GraphStoreSearch for LayeredStore {
#[cfg(feature = "text-index")]
fn has_text_index(&self, label: &str, property: &str) -> bool {
self.overlay.has_text_index(label, property)
}
#[cfg(feature = "text-index")]
fn score_text(&self, node_id: NodeId, label: &str, property: &str, query: &str) -> Option<f64> {
if self.is_node_deleted_from_base(node_id) {
return None;
}
self.overlay.score_text(node_id, label, property, query)
}
#[cfg(feature = "text-index")]
fn text_search(
&self,
label: &str,
property: &str,
query: &str,
k: usize,
) -> Vec<(NodeId, f64)> {
let deleted = self.deleted_from_base_nodes.read();
let mut results = self
.overlay
.text_search(label, property, query, k + deleted.len());
results.retain(|(id, _)| !deleted.contains(id));
results.truncate(k);
results
}
#[cfg(feature = "text-index")]
fn text_search_with_threshold(
&self,
label: &str,
property: &str,
query: &str,
threshold: f64,
) -> Vec<(NodeId, f64)> {
let deleted = self.deleted_from_base_nodes.read();
let mut results = self
.overlay
.text_search_with_threshold(label, property, query, threshold);
results.retain(|(id, _)| !deleted.contains(id));
results
}
#[cfg(feature = "vector-index")]
fn has_vector_index(&self, label: &str, property: &str) -> bool {
self.overlay.has_vector_index(label, property)
}
#[cfg(feature = "vector-index")]
fn vector_index_metric(&self, label: &str, property: &str) -> Option<DistanceMetric> {
self.overlay.vector_index_metric(label, property)
}
#[cfg(feature = "vector-index")]
fn vector_search(
&self,
label: Option<&str>,
property: &str,
query: &[f32],
k: usize,
metric: DistanceMetric,
) -> Vec<(NodeId, f64)> {
let deleted = self.deleted_from_base_nodes.read();
let mut results =
self.overlay
.vector_search(label, property, query, k + deleted.len(), metric);
results.retain(|(id, _)| !deleted.contains(id));
results.truncate(k);
results
}
#[cfg(feature = "vector-index")]
fn vector_search_with_threshold(
&self,
label: Option<&str>,
property: &str,
query: &[f32],
threshold: f64,
metric: DistanceMetric,
) -> Vec<(NodeId, f64)> {
let deleted = self.deleted_from_base_nodes.read();
let mut results = self
.overlay
.vector_search_with_threshold(label, property, query, threshold, metric);
results.retain(|(id, _)| !deleted.contains(id));
results
}
}
impl GraphStoreMut for LayeredStore {
fn create_node(&self, labels: &[&str]) -> NodeId {
let id = self.overlay.create_node(labels);
self.dirty_node_ids.write().insert(id);
id
}
fn create_node_versioned(
&self,
labels: &[&str],
epoch: EpochId,
transaction_id: TransactionId,
) -> NodeId {
let id = self
.overlay
.create_node_versioned(labels, epoch, transaction_id);
self.dirty_node_ids.write().insert(id);
id
}
fn create_edge(&self, src: NodeId, dst: NodeId, edge_type: &str) -> EdgeId {
self.ensure_in_overlay(src);
self.ensure_in_overlay(dst);
let id = self.overlay.create_edge(src, dst, edge_type);
self.dirty_edge_ids.write().insert(id);
id
}
fn create_edge_versioned(
&self,
src: NodeId,
dst: NodeId,
edge_type: &str,
epoch: EpochId,
transaction_id: TransactionId,
) -> EdgeId {
self.ensure_in_overlay(src);
self.ensure_in_overlay(dst);
let id = self
.overlay
.create_edge_versioned(src, dst, edge_type, epoch, transaction_id);
self.dirty_edge_ids.write().insert(id);
id
}
fn batch_create_edges(&self, edges: &[(NodeId, NodeId, &str)]) -> Vec<EdgeId> {
for &(src, dst, _) in edges {
self.ensure_in_overlay(src);
self.ensure_in_overlay(dst);
}
let ids = self.overlay.batch_create_edges(edges);
let mut dirty = self.dirty_edge_ids.write();
for &id in &ids {
dirty.insert(id);
}
ids
}
fn delete_node(&self, id: NodeId) -> bool {
if self.is_node_dirty(id) {
return self.overlay.delete_node(id);
}
if self.base.load().get_node(id).is_some() {
self.deleted_from_base_nodes.write().insert(id);
return true;
}
false
}
fn delete_node_versioned(
&self,
id: NodeId,
epoch: EpochId,
transaction_id: TransactionId,
) -> bool {
if self.is_node_dirty(id) {
return self
.overlay
.delete_node_versioned(id, epoch, transaction_id);
}
if self.base.load().get_node(id).is_some() {
self.deleted_from_base_nodes.write().insert(id);
return true;
}
false
}
fn delete_node_edges(&self, node_id: NodeId) {
if self.is_node_dirty(node_id) {
self.overlay.delete_node_edges(node_id);
}
for (_, eid) in self.base.load().edges_from(node_id, Direction::Both) {
self.deleted_from_base_edges.write().insert(eid);
}
}
fn delete_edge(&self, id: EdgeId) -> bool {
if self.is_edge_dirty(id) {
return self.overlay.delete_edge(id);
}
if self.base.load().get_edge(id).is_some() {
self.deleted_from_base_edges.write().insert(id);
return true;
}
false
}
fn delete_edge_versioned(
&self,
id: EdgeId,
epoch: EpochId,
transaction_id: TransactionId,
) -> bool {
if self.is_edge_dirty(id) {
return self
.overlay
.delete_edge_versioned(id, epoch, transaction_id);
}
if self.base.load().get_edge(id).is_some() {
self.deleted_from_base_edges.write().insert(id);
return true;
}
false
}
fn set_node_property(&self, id: NodeId, key: &str, value: Value) {
self.ensure_in_overlay(id);
self.overlay.set_node_property(id, key, value);
}
fn set_node_property_versioned(
&self,
id: NodeId,
key: &str,
value: Value,
transaction_id: TransactionId,
) {
self.ensure_in_overlay(id);
self.overlay
.set_node_property_versioned(id, key, value, transaction_id);
}
fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) {
self.ensure_edge_in_overlay(id);
self.overlay.set_edge_property(id, key, value);
}
fn set_edge_property_versioned(
&self,
id: EdgeId,
key: &str,
value: Value,
transaction_id: TransactionId,
) {
self.ensure_edge_in_overlay(id);
self.overlay
.set_edge_property_versioned(id, key, value, transaction_id);
}
fn remove_node_property(&self, id: NodeId, key: &str) -> Option<Value> {
self.ensure_in_overlay(id);
self.overlay.remove_node_property(id, key)
}
fn remove_node_property_versioned(
&self,
id: NodeId,
key: &str,
transaction_id: TransactionId,
) -> Option<Value> {
self.ensure_in_overlay(id);
self.overlay
.remove_node_property_versioned(id, key, transaction_id)
}
fn remove_edge_property(&self, id: EdgeId, key: &str) -> Option<Value> {
self.ensure_edge_in_overlay(id);
self.overlay.remove_edge_property(id, key)
}
fn remove_edge_property_versioned(
&self,
id: EdgeId,
key: &str,
transaction_id: TransactionId,
) -> Option<Value> {
self.ensure_edge_in_overlay(id);
self.overlay
.remove_edge_property_versioned(id, key, transaction_id)
}
fn add_label(&self, node_id: NodeId, label: &str) -> bool {
self.ensure_in_overlay(node_id);
self.overlay.add_label(node_id, label)
}
fn add_label_versioned(
&self,
node_id: NodeId,
label: &str,
transaction_id: TransactionId,
) -> bool {
self.ensure_in_overlay(node_id);
self.overlay
.add_label_versioned(node_id, label, transaction_id)
}
fn remove_label(&self, node_id: NodeId, label: &str) -> bool {
self.ensure_in_overlay(node_id);
self.overlay.remove_label(node_id, label)
}
fn remove_label_versioned(
&self,
node_id: NodeId,
label: &str,
transaction_id: TransactionId,
) -> bool {
self.ensure_in_overlay(node_id);
self.overlay
.remove_label_versioned(node_id, label, transaction_id)
}
}
impl LayeredStore {
fn ensure_in_overlay(&self, id: NodeId) {
if self.is_node_dirty(id) {
return; }
let Some(base_node) = self.base.load().get_node(id) else {
return; };
let saved_next = self.overlay.next_node_id();
self.overlay.set_next_node_id(id.as_u64());
let labels: Vec<&str> = base_node.labels.iter().map(|l| l.as_str()).collect();
let promoted_id = self.overlay.create_node(&labels);
debug_assert_eq!(
promoted_id, id,
"promoted node should reuse the original ID"
);
self.overlay.set_next_node_id(saved_next);
for (key, value) in base_node.properties.iter() {
self.overlay
.set_node_property(id, key.as_str(), value.clone());
}
self.dirty_node_ids.write().insert(id);
}
fn ensure_edge_in_overlay(&self, id: EdgeId) {
if self.is_edge_dirty(id) {
return;
}
let Some(base_edge) = self.base.load().get_edge(id) else {
return;
};
self.ensure_in_overlay(base_edge.src);
self.ensure_in_overlay(base_edge.dst);
let saved_next = self.overlay.next_edge_id();
self.overlay.set_next_edge_id(id.as_u64());
let promoted_id =
self.overlay
.create_edge(base_edge.src, base_edge.dst, base_edge.edge_type.as_str());
debug_assert_eq!(
promoted_id, id,
"promoted edge should reuse the original ID"
);
self.overlay.set_next_edge_id(saved_next);
for (key, value) in base_edge.properties.iter() {
self.overlay
.set_edge_property(id, key.as_str(), value.clone());
}
self.dirty_edge_ids.write().insert(id);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::compact::from_graph_store_preserving_ids;
fn build_test_layered() -> LayeredStore {
let store = LpgStore::new().unwrap();
let alix = store.create_node(&["Person"]);
store.set_node_property(alix, "name", Value::from("Alix"));
store.set_node_property(alix, "age", Value::Int64(30));
let gus = store.create_node(&["Person"]);
store.set_node_property(gus, "name", Value::from("Gus"));
store.set_node_property(gus, "age", Value::Int64(25));
let amsterdam = store.create_node(&["City"]);
store.set_node_property(amsterdam, "name", Value::from("Amsterdam"));
let e1 = store.create_edge(alix, amsterdam, "LIVES_IN");
store.set_edge_property(e1, "since", Value::Int64(2020));
let e2 = store.create_edge(gus, amsterdam, "LIVES_IN");
store.set_edge_property(e2, "since", Value::Int64(2022));
let compact = from_graph_store_preserving_ids(&store).unwrap();
let max_nid = store
.node_ids()
.into_iter()
.map(|id| id.as_u64())
.max()
.unwrap_or(0);
let max_eid = 10u64; LayeredStore::new(compact, max_nid, max_eid).unwrap()
}
#[test]
fn test_read_through_base() {
let layered = build_test_layered();
assert_eq!(layered.node_count(), 3);
assert_eq!(layered.edge_count(), 2);
let persons = layered.nodes_by_label("Person");
assert_eq!(persons.len(), 2);
}
#[test]
fn test_create_node_in_overlay() {
let layered = build_test_layered();
let vincent = layered.create_node(&["Person"]);
layered.set_node_property(vincent, "name", Value::from("Vincent"));
assert_eq!(layered.node_count(), 4);
let node = layered.get_node(vincent).unwrap();
assert_eq!(
node.properties.get(&PropertyKey::new("name")),
Some(&Value::String(ArcStr::from("Vincent")))
);
}
#[test]
fn test_delete_base_node() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
assert_eq!(persons.len(), 2);
let deleted = layered.delete_node(persons[0]);
assert!(deleted);
assert!(layered.get_node(persons[0]).is_none());
let remaining_persons = layered.nodes_by_label("Person");
assert_eq!(remaining_persons.len(), 1);
assert_eq!(layered.node_count(), 2);
}
#[test]
fn test_modify_base_node_property() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
let original_age = layered
.get_node_property(first, &PropertyKey::new("age"))
.unwrap();
assert!(matches!(original_age, Value::Int64(_)));
layered.set_node_property(first, "age", Value::Int64(99));
let new_age = layered
.get_node_property(first, &PropertyKey::new("age"))
.unwrap();
assert_eq!(new_age, Value::Int64(99));
}
#[test]
fn test_create_edge_between_base_and_overlay() {
let layered = build_test_layered();
let paris = layered.create_node(&["City"]);
layered.set_node_property(paris, "name", Value::from("Paris"));
let persons = layered.nodes_by_label("Person");
let first_person = persons[0];
let eid = layered.create_edge(first_person, paris, "VISITS");
assert!(layered.get_edge(eid).is_some());
let edge = layered.get_edge(eid).unwrap();
assert_eq!(edge.src, first_person);
assert_eq!(edge.dst, paris);
}
#[test]
fn test_traversal_merges_layers() {
let layered = build_test_layered();
let cities = layered.nodes_by_label("City");
let amsterdam = cities[0];
let incoming = layered.edges_from(amsterdam, Direction::Incoming);
assert_eq!(incoming.len(), 2);
}
#[test]
fn test_node_ids_combines_layers() {
let layered = build_test_layered();
let initial = layered.node_ids();
assert_eq!(initial.len(), 3);
layered.create_node(&["New"]);
let after = layered.node_ids();
assert_eq!(after.len(), 4);
}
#[test]
fn test_delete_edge() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
assert_eq!(edges.len(), 1);
let (_, eid) = edges[0];
let deleted = layered.delete_edge(eid);
assert!(deleted);
let after = layered.edges_from(persons[0], Direction::Outgoing);
assert_eq!(after.len(), 0);
}
#[test]
fn test_all_labels_combines() {
let layered = build_test_layered();
layered.create_node(&["NewLabel"]);
let labels = layered.all_labels();
assert!(labels.contains(&"Person".to_string()));
assert!(labels.contains(&"City".to_string()));
assert!(labels.contains(&"NewLabel".to_string()));
}
#[test]
fn test_get_edge_from_base() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
assert_eq!(edges.len(), 1);
let (_, eid) = edges[0];
let edge = layered.get_edge(eid);
assert!(edge.is_some(), "edge should be readable from base");
let edge = edge.unwrap();
assert_eq!(edge.edge_type.as_str(), "LIVES_IN");
assert_eq!(layered.edge_type(eid).as_deref(), Some("LIVES_IN"));
let since = layered.get_edge_property(eid, &PropertyKey::new("since"));
assert!(
since.is_some(),
"edge property should be readable from base"
);
}
#[test]
fn test_get_node_property_batch_across_layers() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let vincent = layered.create_node(&["Person"]);
layered.set_node_property(vincent, "name", Value::from("Vincent"));
let all_ids: Vec<NodeId> = persons
.iter()
.copied()
.chain(std::iter::once(vincent))
.collect();
let names = layered.get_node_property_batch(&all_ids, &PropertyKey::new("name"));
for name in &names {
assert!(name.is_some(), "every node should have a name property");
}
assert_eq!(
names.last().unwrap().as_ref().unwrap(),
&Value::String(ArcStr::from("Vincent"))
);
}
#[test]
fn test_out_degree_both_layers() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first_person = persons[0];
assert_eq!(layered.out_degree(first_person), 1);
let vincent = layered.create_node(&["Person"]);
let berlin = layered.create_node(&["City"]);
layered.create_edge(vincent, berlin, "VISITS");
assert_eq!(layered.out_degree(vincent), 1);
assert_eq!(layered.out_degree(first_person), 1);
}
#[test]
fn test_in_degree_both_layers() {
let layered = build_test_layered();
let cities = layered.nodes_by_label("City");
let amsterdam = cities[0];
assert_eq!(layered.in_degree(amsterdam), 2);
let jules = layered.create_node(&["Person"]);
let berlin = layered.create_node(&["City"]);
layered.create_edge(jules, berlin, "LIVES_IN");
assert_eq!(layered.in_degree(berlin), 1);
assert_eq!(layered.in_degree(amsterdam), 2);
}
#[test]
fn test_set_node_property_promotes_base_node() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
assert_eq!(layered.overlay_mutation_count(), 0);
layered.set_node_property(first, "city", Value::from("Amsterdam"));
assert!(layered.overlay_mutation_count() > 0);
let city = layered
.get_node_property(first, &PropertyKey::new("city"))
.unwrap();
assert_eq!(city, Value::String(ArcStr::from("Amsterdam")));
}
#[test]
fn test_set_edge_property_promotes_base_edge() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
let (_, eid) = edges[0];
assert_eq!(layered.overlay_mutation_count(), 0);
layered.set_edge_property(eid, "weight", Value::Float64(1.5));
assert!(layered.overlay_mutation_count() > 0);
let weight = layered
.get_edge_property(eid, &PropertyKey::new("weight"))
.unwrap();
assert_eq!(weight, Value::Float64(1.5));
}
#[test]
fn test_remove_node_property() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
assert!(
layered
.get_node_property(first, &PropertyKey::new("age"))
.is_some()
);
let removed = layered.remove_node_property(first, "age");
assert!(removed.is_some());
assert!(
layered
.get_node_property(first, &PropertyKey::new("age"))
.is_none()
);
}
#[test]
fn test_remove_edge_property() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
let (_, eid) = edges[0];
let removed = layered.remove_edge_property(eid, "since");
assert!(removed.is_some());
assert!(
layered
.get_edge_property(eid, &PropertyKey::new("since"))
.is_none()
);
}
#[test]
fn test_add_label_to_base_node() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
let added = layered.add_label(first, "Employee");
assert!(added);
let node = layered.get_node(first).unwrap();
let label_strs: Vec<&str> = node.labels.iter().map(|l| l.as_str()).collect();
assert!(label_strs.contains(&"Person"));
assert!(label_strs.contains(&"Employee"));
}
#[test]
fn test_remove_label_from_base_node() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
let removed = layered.remove_label(first, "Person");
assert!(removed);
let after_persons = layered.nodes_by_label("Person");
assert!(!after_persons.contains(&first));
assert!(layered.get_node(first).is_some());
}
#[test]
fn test_delete_node_edges_cascade() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first_person = persons[0];
let edges_before = layered.edges_from(first_person, Direction::Outgoing);
assert_eq!(edges_before.len(), 1);
layered.delete_node_edges(first_person);
let edges_after = layered.edges_from(first_person, Direction::Outgoing);
assert_eq!(edges_after.len(), 0);
}
#[test]
fn test_batch_create_edges_cross_layer() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let base_person = persons[0];
let berlin = layered.create_node(&["City"]);
let paris = layered.create_node(&["City"]);
let edge_specs: Vec<(NodeId, NodeId, &str)> = vec![
(base_person, berlin, "VISITS"),
(base_person, paris, "VISITS"),
];
let eids = layered.batch_create_edges(&edge_specs);
assert_eq!(eids.len(), 2);
for eid in &eids {
let edge = layered.get_edge(*eid);
assert!(edge.is_some());
assert_eq!(edge.unwrap().edge_type.as_str(), "VISITS");
}
}
#[test]
fn test_promotion_copies_all_properties() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
let original_name = layered
.get_node_property(first, &PropertyKey::new("name"))
.unwrap();
let original_age = layered
.get_node_property(first, &PropertyKey::new("age"))
.unwrap();
layered.set_node_property(first, "city", Value::from("Berlin"));
let after_name = layered
.get_node_property(first, &PropertyKey::new("name"))
.unwrap();
let after_age = layered
.get_node_property(first, &PropertyKey::new("age"))
.unwrap();
assert_eq!(original_name, after_name);
assert_eq!(original_age, after_age);
let city = layered
.get_node_property(first, &PropertyKey::new("city"))
.unwrap();
assert_eq!(city, Value::String(ArcStr::from("Berlin")));
}
#[test]
fn test_promotion_is_idempotent() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
layered.set_node_property(first, "x", Value::Int64(1));
let count_after_first = layered.node_count();
layered.set_node_property(first, "y", Value::Int64(2));
let count_after_second = layered.node_count();
assert_eq!(count_after_first, count_after_second);
assert_eq!(
layered
.get_node_property(first, &PropertyKey::new("x"))
.unwrap(),
Value::Int64(1)
);
assert_eq!(
layered
.get_node_property(first, &PropertyKey::new("y"))
.unwrap(),
Value::Int64(2)
);
}
#[test]
fn test_edge_promotion_promotes_endpoints() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
let (_, eid) = edges[0];
layered.set_edge_property(eid, "weight", Value::Float64(0.5));
let edge = layered.get_edge(eid).unwrap();
let src_node = layered.get_node(edge.src);
let dst_node = layered.get_node(edge.dst);
assert!(
src_node.is_some(),
"source node should be accessible after edge promotion"
);
assert!(
dst_node.is_some(),
"destination node should be accessible after edge promotion"
);
}
#[test]
fn test_deleted_base_node_invisible() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let target = persons[0];
let deleted = layered.delete_node(target);
assert!(deleted);
assert!(layered.get_node(target).is_none());
assert!(
layered
.get_node_property(target, &PropertyKey::new("name"))
.is_none()
);
}
#[test]
fn test_deleted_node_excluded_from_nodes_by_label() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
assert_eq!(persons.len(), 2);
let target = persons[0];
layered.delete_node(target);
let after = layered.nodes_by_label("Person");
assert_eq!(after.len(), 1);
assert!(!after.contains(&target));
}
#[test]
fn test_deleted_node_excluded_from_node_ids() {
let layered = build_test_layered();
let all_before = layered.node_ids();
assert_eq!(all_before.len(), 3);
let persons = layered.nodes_by_label("Person");
let target = persons[0];
layered.delete_node(target);
let all_after = layered.node_ids();
assert_eq!(all_after.len(), 2);
assert!(!all_after.contains(&target));
}
#[test]
fn test_deleted_edge_excluded_from_edges_from() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
let edges = layered.edges_from(first, Direction::Outgoing);
assert_eq!(edges.len(), 1);
let (_, eid) = edges[0];
layered.delete_edge(eid);
let after = layered.edges_from(first, Direction::Outgoing);
assert_eq!(after.len(), 0);
}
#[test]
fn test_deleted_edge_excluded_from_neighbors() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
let neighbors_before = layered.neighbors(first, Direction::Outgoing);
assert_eq!(neighbors_before.len(), 1);
let target_node = neighbors_before[0];
layered.delete_node(target_node);
let neighbors_after = layered.neighbors(first, Direction::Outgoing);
assert_eq!(
neighbors_after.len(),
0,
"deleted node should not appear in neighbors"
);
}
#[test]
fn test_node_count_reflects_deletions() {
let layered = build_test_layered();
assert_eq!(layered.node_count(), 3);
let persons = layered.nodes_by_label("Person");
layered.delete_node(persons[0]);
assert_eq!(layered.node_count(), 2);
layered.delete_node(persons[1]);
assert_eq!(layered.node_count(), 1);
}
#[test]
fn test_edge_count_reflects_deletions() {
let layered = build_test_layered();
assert_eq!(layered.edge_count(), 2);
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
let (_, eid) = edges[0];
layered.delete_edge(eid);
assert_eq!(layered.edge_count(), 1);
}
#[test]
fn test_find_nodes_by_property_across_layers() {
let layered = build_test_layered();
let age_30 = layered.find_nodes_by_property("age", &Value::Int64(30));
assert_eq!(age_30.len(), 1);
let vincent = layered.create_node(&["Person"]);
layered.set_node_property(vincent, "age", Value::Int64(30));
let age_30_after = layered.find_nodes_by_property("age", &Value::Int64(30));
assert_eq!(age_30_after.len(), 2);
assert!(age_30_after.contains(&vincent));
}
#[test]
fn test_find_nodes_in_range_across_layers() {
let layered = build_test_layered();
let mia = layered.create_node(&["Person"]);
layered.set_node_property(mia, "age", Value::Int64(35));
let in_range = layered.find_nodes_in_range(
"age",
Some(&Value::Int64(25)),
Some(&Value::Int64(35)),
true,
true,
);
assert!(
in_range.len() >= 3,
"expected at least 3 nodes in range, got {}",
in_range.len()
);
assert!(in_range.contains(&mia));
}
#[test]
fn test_statistics_reflects_overlay() {
let layered = build_test_layered();
let stats_before = layered.statistics();
let nodes_before = stats_before.total_nodes;
layered.create_node(&["Person"]);
layered.create_node(&["City"]);
let stats_after = layered.statistics();
assert_eq!(stats_after.total_nodes, nodes_before + 2);
}
#[test]
fn test_all_edge_types_combines_layers() {
let layered = build_test_layered();
let types_before = layered.all_edge_types();
assert!(types_before.contains(&"LIVES_IN".to_string()));
let persons = layered.nodes_by_label("Person");
let butch = layered.create_node(&["Person"]);
layered.create_edge(persons[0], butch, "KNOWS");
let types_after = layered.all_edge_types();
assert!(types_after.contains(&"LIVES_IN".to_string()));
assert!(types_after.contains(&"KNOWS".to_string()));
}
#[test]
fn test_all_property_keys_combines_layers() {
let layered = build_test_layered();
let keys_before = layered.all_property_keys();
assert!(keys_before.contains(&"name".to_string()));
assert!(keys_before.contains(&"age".to_string()));
let mia = layered.create_node(&["Person"]);
layered.set_node_property(mia, "email", Value::from("mia@example.com"));
let keys_after = layered.all_property_keys();
assert!(keys_after.contains(&"email".to_string()));
assert!(keys_after.contains(&"name".to_string()));
}
#[test]
fn test_overlay_mutation_count() {
let layered = build_test_layered();
assert_eq!(layered.overlay_mutation_count(), 0);
layered.create_node(&["Person"]);
assert_eq!(layered.overlay_mutation_count(), 1);
let persons = layered.nodes_by_label("Person");
layered.delete_node(persons[0]);
assert_eq!(layered.overlay_mutation_count(), 2);
}
#[test]
fn test_memory_bytes_nonzero() {
let layered = build_test_layered();
assert!(
layered.memory_bytes() > 0,
"memory_bytes should be positive for a non-empty store"
);
}
#[test]
fn test_versioned_node_reads() {
let layered = build_test_layered();
let epoch = EpochId::from(u64::MAX);
let txn_id = TransactionId::from(1);
let persons = layered.nodes_by_label("Person");
let first = persons[0];
assert!(
layered.get_node_versioned(first, epoch, txn_id).is_some(),
"versioned read should fall through to base"
);
assert!(
layered.get_node_at_epoch(first, epoch).is_some(),
"base node should be visible at epoch 0"
);
let hans = layered.create_node_versioned(&["Person"], epoch, txn_id);
layered.set_node_property(hans, "name", Value::from("Hans"));
let node = layered.get_node_versioned(hans, epoch, txn_id).unwrap();
assert_eq!(
node.properties.get(&PropertyKey::new("name")),
Some(&Value::String(ArcStr::from("Hans")))
);
assert!(layered.get_node_at_epoch(hans, epoch).is_some());
layered.delete_node(first);
assert!(
layered.get_node_versioned(first, epoch, txn_id).is_none(),
"versioned read should return None for deleted base node"
);
assert!(
layered.get_node_at_epoch(first, epoch).is_none(),
"deleted base node should not be visible at epoch"
);
}
#[test]
fn test_versioned_edge_reads() {
let layered = build_test_layered();
let epoch = EpochId::from(u64::MAX);
let txn_id = TransactionId::from(1);
let persons = layered.nodes_by_label("Person");
let base_edges = layered.edges_from(persons[0], Direction::Outgoing);
let (_, base_eid) = base_edges[0];
assert!(
layered
.get_edge_versioned(base_eid, epoch, txn_id)
.is_some(),
"versioned read should fall through to base edge"
);
assert!(
layered.get_edge_at_epoch(base_eid, epoch).is_some(),
"base edge should be visible at epoch 0"
);
let barcelona = layered.create_node(&["City"]);
let overlay_eid =
layered.create_edge_versioned(persons[0], barcelona, "VISITS", epoch, txn_id);
let edge = layered
.get_edge_versioned(overlay_eid, epoch, txn_id)
.unwrap();
assert_eq!(edge.edge_type.as_str(), "VISITS");
layered.delete_edge(base_eid);
assert!(
layered
.get_edge_versioned(base_eid, epoch, txn_id)
.is_none(),
"versioned read should return None for deleted base edge"
);
assert!(
layered.get_edge_at_epoch(base_eid, epoch).is_none(),
"deleted base edge should not be visible at epoch"
);
}
#[test]
fn test_node_visibility() {
let layered = build_test_layered();
let epoch = EpochId::from(u64::MAX);
let txn_id = TransactionId::from(1);
let persons = layered.nodes_by_label("Person");
let target = persons[0];
assert!(layered.is_node_visible_at_epoch(target, epoch));
assert!(layered.is_node_visible_versioned(target, epoch, txn_id));
let beatrix = layered.create_node(&["Person"]);
assert!(layered.is_node_visible_at_epoch(beatrix, epoch));
let butch = layered.create_node_versioned(&["Person"], epoch, txn_id);
assert!(layered.is_node_visible_versioned(butch, epoch, txn_id));
layered.delete_node(target);
assert!(!layered.is_node_visible_at_epoch(target, epoch));
assert!(!layered.is_node_visible_versioned(target, epoch, txn_id));
}
#[test]
fn test_edge_visibility() {
let layered = build_test_layered();
let epoch = EpochId::from(u64::MAX);
let txn_id = TransactionId::from(1);
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
let (_, eid) = edges[0];
assert!(layered.is_edge_visible_at_epoch(eid, epoch));
assert!(layered.is_edge_visible_versioned(eid, epoch, txn_id));
layered.delete_edge(eid);
assert!(!layered.is_edge_visible_at_epoch(eid, epoch));
assert!(!layered.is_edge_visible_versioned(eid, epoch, txn_id));
}
#[test]
fn test_filter_visible_node_ids() {
let layered = build_test_layered();
let epoch = EpochId::from(u64::MAX);
let txn_id = TransactionId::from(1);
let all_ids = layered.node_ids();
assert_eq!(all_ids.len(), 3);
assert_eq!(layered.filter_visible_node_ids(&all_ids, epoch).len(), 3);
assert_eq!(
layered
.filter_visible_node_ids_versioned(&all_ids, epoch, txn_id)
.len(),
3
);
let persons = layered.nodes_by_label("Person");
layered.delete_node(persons[0]);
let visible_epoch = layered.filter_visible_node_ids(&all_ids, epoch);
assert_eq!(visible_epoch.len(), 2);
assert!(!visible_epoch.contains(&persons[0]));
let visible_versioned = layered.filter_visible_node_ids_versioned(&all_ids, epoch, txn_id);
assert_eq!(visible_versioned.len(), 2);
assert!(!visible_versioned.contains(&persons[0]));
}
#[test]
fn test_history_base_only_and_dirty() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
let edges = layered.edges_from(first, Direction::Outgoing);
let (_, eid) = edges[0];
assert!(
layered.get_node_history(first).is_empty(),
"base-only node should have no history entries"
);
assert!(
layered.get_edge_history(eid).is_empty(),
"base-only edge should have no history entries"
);
layered.set_node_property(first, "age", Value::Int64(42));
layered.set_edge_property(eid, "weight", Value::Float64(2.0));
let _ = layered.get_node_history(first);
let _ = layered.get_edge_history(eid);
}
#[test]
fn test_find_nodes_by_properties_across_layers() {
let layered = build_test_layered();
let results = layered
.find_nodes_by_properties(&[("name", Value::from("Alix")), ("age", Value::Int64(30))]);
assert_eq!(results.len(), 1);
let mia = layered.create_node(&["Person"]);
layered.set_node_property(mia, "name", Value::from("Alix"));
layered.set_node_property(mia, "age", Value::Int64(30));
let results_after = layered
.find_nodes_by_properties(&[("name", Value::from("Alix")), ("age", Value::Int64(30))]);
assert_eq!(results_after.len(), 2);
assert!(results_after.contains(&mia));
}
#[test]
fn test_find_nodes_by_properties_empty_conditions() {
let layered = build_test_layered();
let results = layered.find_nodes_by_properties(&[]);
assert_eq!(results.len(), layered.node_ids().len());
}
#[test]
fn test_get_nodes_properties_selective_batch() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let vincent = layered.create_node(&["Person"]);
layered.set_node_property(vincent, "name", Value::from("Vincent"));
layered.set_node_property(vincent, "age", Value::Int64(38));
let all_ids: Vec<NodeId> = persons
.iter()
.copied()
.chain(std::iter::once(vincent))
.collect();
let keys = vec![PropertyKey::new("name"), PropertyKey::new("age")];
let batch = layered.get_nodes_properties_selective_batch(&all_ids, &keys);
assert_eq!(batch.len(), all_ids.len());
for map in &batch {
for key in map.keys() {
assert!(
keys.contains(key),
"unexpected key {:?} in selective batch",
key
);
}
}
let vincent_map = &batch[batch.len() - 1];
assert_eq!(
vincent_map.get(&PropertyKey::new("name")),
Some(&Value::String(ArcStr::from("Vincent")))
);
assert_eq!(
vincent_map.get(&PropertyKey::new("age")),
Some(&Value::Int64(38))
);
}
#[test]
fn test_get_edges_properties_selective_batch() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let edges_a = layered.edges_from(persons[0], Direction::Outgoing);
let edges_b = layered.edges_from(persons[1], Direction::Outgoing);
let edge_ids: Vec<EdgeId> = edges_a
.iter()
.chain(edges_b.iter())
.map(|(_, eid)| *eid)
.collect();
let keys = vec![PropertyKey::new("since")];
let batch = layered.get_edges_properties_selective_batch(&edge_ids, &keys);
assert_eq!(batch.len(), edge_ids.len());
for map in &batch {
assert!(
map.contains_key(&PropertyKey::new("since")),
"each edge should have the 'since' property"
);
}
}
#[test]
fn test_estimate_label_cardinality() {
let layered = build_test_layered();
let person_card = layered.estimate_label_cardinality("Person");
assert!(
person_card >= 2.0,
"should estimate at least 2 Person nodes, got {}",
person_card
);
let city_card = layered.estimate_label_cardinality("City");
assert!(
city_card >= 1.0,
"should estimate at least 1 City node, got {}",
city_card
);
let missing_card = layered.estimate_label_cardinality("NonExistent");
assert!(
missing_card >= 0.0,
"cardinality for unknown label should be non-negative"
);
}
#[test]
fn test_estimate_avg_degree() {
let layered = build_test_layered();
let avg_out = layered.estimate_avg_degree("LIVES_IN", true);
assert!(
avg_out > 0.0,
"average out-degree for LIVES_IN should be positive"
);
let avg_in = layered.estimate_avg_degree("LIVES_IN", false);
assert!(
avg_in > 0.0,
"average in-degree for LIVES_IN should be positive"
);
}
#[test]
fn test_estimate_avg_degree_empty() {
let store = LpgStore::new().unwrap();
let compact = from_graph_store_preserving_ids(&store).unwrap();
let layered = LayeredStore::new(compact, 0, 0).unwrap();
let avg = layered.estimate_avg_degree("NONEXISTENT", true);
assert_eq!(avg, 0.0, "empty store should have avg degree 0");
}
#[test]
fn test_node_property_might_match() {
let layered = build_test_layered();
let might = layered.node_property_might_match(
&PropertyKey::new("age"),
CompareOp::Eq,
&Value::Int64(30),
);
assert!(might, "zone map should indicate age might match 30");
}
#[test]
fn test_edge_property_might_match() {
let layered = build_test_layered();
let might = layered.edge_property_might_match(
&PropertyKey::new("since"),
CompareOp::Eq,
&Value::Int64(2020),
);
assert!(might, "zone map should indicate since might match 2020");
}
#[test]
fn test_layered_delete_and_recreate_node() {
let layered = build_test_layered();
let persons_before = layered.nodes_by_label("Person");
assert_eq!(persons_before.len(), 2);
let target = persons_before[0];
let other = persons_before[1];
let other_neighbors_before = layered.neighbors(other, Direction::Outgoing);
assert!(layered.delete_node(target));
let replacement = layered.create_node(&["Person"]);
layered.set_node_property(replacement, "name", Value::from("Shosanna"));
assert_eq!(layered.node_count(), 3);
let persons_after = layered.nodes_by_label("Person");
assert_eq!(persons_after.len(), 2);
assert!(persons_after.contains(&other));
assert!(persons_after.contains(&replacement));
assert!(
!persons_after.contains(&target),
"deleted base node must not reappear"
);
let other_neighbors_after = layered.neighbors(other, Direction::Outgoing);
assert_eq!(other_neighbors_before, other_neighbors_after);
let all_ids = layered.node_ids();
assert!(!all_ids.contains(&target));
assert!(all_ids.contains(&replacement));
}
#[test]
fn test_layered_promote_node_on_mutation() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let target = persons[0];
let next_id_before = layered.overlay.next_node_id();
let base_node = layered.base.load().get_node(target).unwrap();
let base_labels: Vec<String> = base_node
.labels
.iter()
.map(|l| l.as_str().to_string())
.collect();
let base_name = layered
.base
.load()
.get_node_property(target, &PropertyKey::new("name"))
.unwrap();
layered.set_node_property(target, "city", Value::from("Amsterdam"));
let promoted = layered.overlay.get_node(target).unwrap();
let promoted_labels: Vec<String> = promoted
.labels
.iter()
.map(|l| l.as_str().to_string())
.collect();
assert_eq!(promoted_labels, base_labels);
let name_after = layered
.get_node_property(target, &PropertyKey::new("name"))
.unwrap();
assert_eq!(name_after, base_name);
assert_eq!(
layered.get_node_property(target, &PropertyKey::new("city")),
Some(Value::String(ArcStr::from("Amsterdam")))
);
let next_id_after = layered.overlay.next_node_id();
assert_eq!(
next_id_before, next_id_after,
"overlay next_node_id should be restored after promotion"
);
let fresh = layered.create_node(&["Person"]);
assert_ne!(fresh, target);
for &p in &persons {
assert_ne!(fresh, p);
}
}
#[test]
fn test_layered_promote_edge_on_mutation() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
assert_eq!(edges.len(), 1);
let (target_dst, target_eid) = edges[0];
let base_edge = layered.base.load().get_edge(target_eid).unwrap();
let base_since = base_edge
.properties
.get(&PropertyKey::new("since"))
.cloned()
.unwrap();
layered.set_edge_property(target_eid, "weight", Value::Float64(0.75));
assert!(layered.overlay.get_edge(target_eid).is_some());
let since_after = layered
.get_edge_property(target_eid, &PropertyKey::new("since"))
.unwrap();
assert_eq!(since_after, base_since);
assert_eq!(
layered.get_edge_property(target_eid, &PropertyKey::new("weight")),
Some(Value::Float64(0.75))
);
assert!(
layered.overlay.get_node(persons[0]).is_some(),
"edge source must be in the overlay after promotion"
);
assert!(
layered.overlay.get_node(target_dst).is_some(),
"edge destination must be in the overlay after promotion"
);
assert!(
layered
.get_node_property(persons[0], &PropertyKey::new("name"))
.is_some()
);
assert!(
layered
.get_node_property(target_dst, &PropertyKey::new("name"))
.is_some()
);
}
#[test]
fn test_layered_is_node_dirty_after_mutation() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let target = persons[0];
assert!(
!layered.is_node_dirty(target),
"base-only node should start clean"
);
layered.set_node_property(target, "city", Value::from("Berlin"));
assert!(
layered.is_node_dirty(target),
"node must be dirty after a mutating set_node_property"
);
}
#[test]
fn test_base_store_and_overlay_store_accessors() {
let layered = build_test_layered();
assert_eq!(layered.base_store_arc().node_count(), 3);
assert_eq!(layered.base_store_arc().edge_count(), 2);
let arc = layered.base_store_arc();
assert_eq!(arc.node_count(), 3);
assert_eq!(layered.overlay_store().node_count(), 0);
}
#[test]
fn test_has_backward_adjacency() {
let layered = build_test_layered();
assert!(layered.has_backward_adjacency());
}
#[test]
fn test_debug_format_does_not_panic() {
let layered = build_test_layered();
let s = format!("{layered:?}");
assert!(s.contains("LayeredStore"));
}
#[test]
fn test_current_epoch_delegates_to_overlay() {
let layered = build_test_layered();
let epoch = layered.current_epoch();
assert_eq!(epoch, layered.overlay_store().current_epoch());
}
#[test]
fn test_delete_overlay_only_node() {
let layered = build_test_layered();
let beatrix = layered.create_node(&["Person"]);
assert!(layered.get_node(beatrix).is_some());
let deleted = layered.delete_node(beatrix);
assert!(deleted);
assert!(
layered.get_node(beatrix).is_none(),
"overlay-only node should be unreadable after delete"
);
let missing = NodeId::from(9_999_999u64);
assert!(!layered.delete_node(missing));
}
#[test]
fn test_delete_overlay_only_edge() {
let layered = build_test_layered();
let django = layered.create_node(&["Person"]);
let shosanna = layered.create_node(&["Person"]);
let eid = layered.create_edge(django, shosanna, "KNOWS");
assert!(layered.get_edge(eid).is_some());
let deleted = layered.delete_edge(eid);
assert!(deleted);
assert!(
layered.get_edge(eid).is_none(),
"overlay-only edge should be unreadable after delete"
);
let missing = EdgeId::from(9_999_999u64);
assert!(!layered.delete_edge(missing));
}
#[test]
fn test_delete_then_recreate_node_with_same_label() {
let layered = build_test_layered();
let persons_before = layered.nodes_by_label("Person");
assert_eq!(persons_before.len(), 2);
layered.delete_node(persons_before[0]);
let hans = layered.create_node(&["Person"]);
layered.set_node_property(hans, "name", Value::from("Hans"));
let persons_after = layered.nodes_by_label("Person");
assert_eq!(persons_after.len(), 2);
assert!(persons_after.contains(&hans));
assert!(!persons_after.contains(&persons_before[0]));
}
#[test]
fn test_neighbors_from_promoted_node_with_new_overlay_edges() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
layered.set_node_property(first, "touched", Value::Bool(true));
let paris = layered.create_node(&["City"]);
layered.set_node_property(paris, "name", Value::from("Paris"));
let _ = layered.create_edge(first, paris, "VISITS");
let outgoing = layered.neighbors(first, Direction::Outgoing);
assert!(
outgoing.contains(&paris),
"overlay-created edge target should appear in neighbors"
);
}
#[test]
fn test_edges_from_promoted_node_has_overlay_edges() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
let berlin = layered.create_node(&["City"]);
layered.set_node_property(first, "touched", Value::Bool(true));
let new_eid = layered.create_edge(first, berlin, "VISITS");
let edges = layered.edges_from(first, Direction::Outgoing);
let found_ids: Vec<EdgeId> = edges.iter().map(|(_, e)| *e).collect();
assert!(
found_ids.contains(&new_eid),
"new overlay edge should be reachable via edges_from after promotion"
);
}
#[test]
fn test_edge_count_with_overlay_adds() {
let layered = build_test_layered();
assert_eq!(layered.edge_count(), 2);
let persons = layered.nodes_by_label("Person");
let vincent = layered.create_node(&["Person"]);
let _ = layered.create_edge(persons[0], vincent, "KNOWS");
let _ = layered.create_edge(persons[1], vincent, "KNOWS");
assert_eq!(layered.edge_count(), 4);
}
#[test]
fn test_edge_count_with_base_edge_promoted_is_not_double_counted() {
let layered = build_test_layered();
assert_eq!(layered.edge_count(), 2);
let persons = layered.nodes_by_label("Person");
let base_edges = layered.edges_from(persons[0], Direction::Outgoing);
let (_, base_eid) = base_edges[0];
layered.set_edge_property(base_eid, "weight", Value::Float64(1.0));
assert_eq!(layered.edge_count(), 2);
}
#[test]
fn test_delete_edge_then_recreate() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
let edges = layered.edges_from(first, Direction::Outgoing);
let (target, base_eid) = edges[0];
assert!(layered.delete_edge(base_eid));
assert_eq!(layered.edges_from(first, Direction::Outgoing).len(), 0);
let new_eid = layered.create_edge(first, target, "LIVES_IN");
assert_ne!(new_eid, base_eid);
let edges_after = layered.edges_from(first, Direction::Outgoing);
assert_eq!(edges_after.len(), 1);
assert_eq!(edges_after[0].1, new_eid);
}
#[test]
fn test_overlay_mutation_count_tracks_all_four_kinds() {
let layered = build_test_layered();
assert_eq!(layered.overlay_mutation_count(), 0);
layered.create_node(&["Person"]);
assert_eq!(
layered.overlay_mutation_count(),
1,
"kind 1 (dirty node) must increment by exactly 1"
);
let a = layered.create_node(&["Person"]);
let b = layered.create_node(&["Person"]);
let _ = layered.create_edge(a, b, "KNOWS");
assert_eq!(
layered.overlay_mutation_count(),
4,
"kind 2 (2 nodes + 1 edge) must add exactly 3, for total 4"
);
let persons = layered.nodes_by_label("Person");
let base_person = *persons
.iter()
.find(|id| layered.base.load().get_node(**id).is_some())
.expect("fixture must have at least one base node");
let before_delete_node = layered.overlay_mutation_count();
layered.delete_node(base_person);
assert_eq!(
layered.overlay_mutation_count(),
before_delete_node + 1,
"kind 3 (deleted base node) must increment by exactly 1"
);
let persons2 = layered.nodes_by_label("Person");
let (other_base, base_eid) = persons2
.iter()
.find_map(|id| {
layered.base.load().get_node(*id)?;
let edges = layered.edges_from(*id, Direction::Outgoing);
edges.first().map(|(_, eid)| (*id, *eid))
})
.expect("fixture must have at least one base edge to delete");
let _ = other_base;
let before_delete_edge = layered.overlay_mutation_count();
layered.delete_edge(base_eid);
assert_eq!(
layered.overlay_mutation_count(),
before_delete_edge + 1,
"kind 4 (deleted base edge) must increment by exactly 1"
);
}
#[test]
fn test_get_node_history_base_edge_returns_empty() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
let (_, base_eid) = edges[0];
assert!(layered.get_edge_history(base_eid).is_empty());
assert!(layered.get_node_history(persons[0]).is_empty());
}
#[test]
fn test_base_store_accessors() {
let layered = build_test_layered();
assert_eq!(layered.base_store_arc().node_count(), 3);
let arc_clone = layered.base_store_arc();
assert_eq!(arc_clone.node_count(), 3);
assert!(Arc::strong_count(&arc_clone) >= 2);
}
#[test]
fn test_overlay_store_accessor() {
let layered = build_test_layered();
assert_eq!(layered.overlay_store().node_count(), 0);
layered.create_node(&["Person"]);
assert_eq!(layered.overlay_store().node_count(), 1);
}
#[test]
fn test_debug_impl_renders() {
let layered = build_test_layered();
layered.create_node(&["Person"]);
let persons = layered.nodes_by_label("Person");
layered.delete_node(persons[0]);
let rendered = format!("{layered:?}");
assert!(rendered.contains("LayeredStore"));
assert!(rendered.contains("base_node_count"));
assert!(rendered.contains("overlay_node_count"));
assert!(rendered.contains("dirty_nodes"));
assert!(rendered.contains("deleted_base_nodes"));
}
#[test]
fn test_get_nodes_properties_batch_full() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let vincent = layered.create_node(&["Person"]);
layered.set_node_property(vincent, "name", Value::from("Vincent"));
let ids: Vec<NodeId> = persons
.iter()
.copied()
.chain(std::iter::once(vincent))
.collect();
let batch = layered.get_nodes_properties_batch(&ids);
assert_eq!(batch.len(), ids.len());
for map in batch.iter().take(persons.len()) {
assert!(map.contains_key(&PropertyKey::new("name")));
assert!(map.contains_key(&PropertyKey::new("age")));
}
let vincent_map = &batch[batch.len() - 1];
assert_eq!(
vincent_map.get(&PropertyKey::new("name")),
Some(&Value::String(ArcStr::from("Vincent")))
);
let missing = NodeId::new(999_999);
let batch_missing = layered.get_nodes_properties_batch(&[missing]);
assert_eq!(batch_missing.len(), 1);
assert!(batch_missing[0].is_empty());
}
#[test]
fn test_get_node_deleted_returns_none() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let target = persons[0];
layered.delete_node(target);
assert!(layered.get_node(target).is_none());
let batch = layered.get_nodes_properties_batch(&[target]);
assert!(batch[0].is_empty());
}
#[test]
fn test_get_edge_dirty_path_via_promotion() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
let (_, eid) = edges[0];
layered.set_edge_property(eid, "weight", Value::Float64(1.25));
let edge = layered.get_edge(eid).unwrap();
assert_eq!(edge.edge_type.as_str(), "LIVES_IN");
assert_eq!(layered.edge_type(eid).as_deref(), Some("LIVES_IN"));
let weight = layered
.get_edge_property(eid, &PropertyKey::new("weight"))
.unwrap();
assert_eq!(weight, Value::Float64(1.25));
}
#[test]
fn test_get_edge_property_deleted() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
let (_, eid) = edges[0];
layered.delete_edge(eid);
assert!(
layered
.get_edge_property(eid, &PropertyKey::new("since"))
.is_none(),
"deleted edge should not expose properties"
);
assert!(layered.edge_type(eid).is_none());
}
#[test]
fn test_get_node_at_epoch_and_versioned_dirty() {
let layered = build_test_layered();
let epoch = EpochId::from(u64::MAX);
let txn_id = TransactionId::from(1);
let jules = layered.create_node(&["Person"]);
layered.set_node_property(jules, "name", Value::from("Jules"));
assert!(layered.get_node_at_epoch(jules, epoch).is_some());
assert!(layered.get_node_versioned(jules, epoch, txn_id).is_some());
}
#[test]
fn test_get_edge_at_epoch_and_versioned_dirty() {
let layered = build_test_layered();
let epoch = EpochId::from(u64::MAX);
let txn_id = TransactionId::from(1);
let django = layered.create_node(&["Person"]);
let prague = layered.create_node(&["City"]);
let eid = layered.create_edge(django, prague, "VISITS");
assert!(layered.get_edge_at_epoch(eid, epoch).is_some());
assert!(layered.get_edge_versioned(eid, epoch, txn_id).is_some());
}
#[test]
fn test_delete_nonexistent_node_returns_false() {
let layered = build_test_layered();
let missing = NodeId::new(999_999);
assert!(!layered.delete_node(missing));
let txn_id = TransactionId::from(1);
let epoch = EpochId::from(u64::MAX);
assert!(!layered.delete_node_versioned(missing, epoch, txn_id));
}
#[test]
fn test_delete_nonexistent_edge_returns_false() {
let layered = build_test_layered();
let missing = EdgeId::new(999_999);
assert!(!layered.delete_edge(missing));
let txn_id = TransactionId::from(1);
let epoch = EpochId::from(u64::MAX);
assert!(!layered.delete_edge_versioned(missing, epoch, txn_id));
}
#[test]
fn test_delete_dirty_node_via_overlay() {
let layered = build_test_layered();
let shosanna = layered.create_node(&["Person"]);
assert!(layered.get_node(shosanna).is_some());
assert!(layered.delete_node(shosanna));
assert!(layered.get_node(shosanna).is_none());
}
#[test]
fn test_delete_dirty_edge_via_overlay() {
let layered = build_test_layered();
let hans = layered.create_node(&["Person"]);
let berlin = layered.create_node(&["City"]);
let eid = layered.create_edge(hans, berlin, "LIVES_IN");
assert!(layered.delete_edge(eid));
assert!(layered.get_edge(eid).is_none());
}
#[test]
fn test_delete_base_node_versioned() {
let layered = build_test_layered();
let epoch = EpochId::from(u64::MAX);
let txn_id = TransactionId::from(1);
let persons = layered.nodes_by_label("Person");
assert!(layered.delete_node_versioned(persons[0], epoch, txn_id));
assert!(layered.get_node(persons[0]).is_none());
}
#[test]
fn test_delete_base_edge_versioned() {
let layered = build_test_layered();
let epoch = EpochId::from(u64::MAX);
let txn_id = TransactionId::from(1);
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
let (_, eid) = edges[0];
assert!(layered.delete_edge_versioned(eid, epoch, txn_id));
assert!(layered.get_edge(eid).is_none());
}
#[test]
fn test_delete_node_edges_on_dirty_source() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
layered.set_node_property(first, "city", Value::from("Berlin"));
assert!(layered.overlay.get_node(first).is_some());
layered.delete_node_edges(first);
let remaining = layered.edges_from(first, Direction::Outgoing);
assert!(
remaining.is_empty(),
"edges from a dirty source should be fully removed"
);
}
#[test]
fn test_set_node_property_versioned_promotes_base() {
let layered = build_test_layered();
let txn_id = TransactionId::from(42);
let persons = layered.nodes_by_label("Person");
let first = persons[0];
layered.set_node_property_versioned(first, "city", Value::from("Paris"), txn_id);
let city = layered
.get_node_property(first, &PropertyKey::new("city"))
.unwrap();
assert_eq!(city, Value::String(ArcStr::from("Paris")));
}
#[test]
fn test_set_edge_property_versioned_promotes_base() {
let layered = build_test_layered();
let txn_id = TransactionId::from(7);
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
let (_, eid) = edges[0];
layered.set_edge_property_versioned(eid, "weight", Value::Float64(3.5), txn_id);
let weight = layered
.get_edge_property(eid, &PropertyKey::new("weight"))
.unwrap();
assert_eq!(weight, Value::Float64(3.5));
}
#[test]
fn test_remove_node_property_versioned_on_overlay_node() {
let layered = build_test_layered();
let txn_id = TransactionId::from(101);
let mia = layered.create_node(&["Person"]);
layered.set_node_property(mia, "email", Value::from("mia@example.com"));
let removed = layered.remove_node_property_versioned(mia, "email", txn_id);
assert_eq!(
removed,
Some(Value::String(ArcStr::from("mia@example.com")))
);
assert!(
layered
.get_node_property(mia, &PropertyKey::new("email"))
.is_none()
);
}
#[test]
fn test_remove_edge_property_versioned_on_overlay_edge() {
let layered = build_test_layered();
let txn_id = TransactionId::from(202);
let django = layered.create_node(&["Person"]);
let paris = layered.create_node(&["City"]);
let eid = layered.create_edge(django, paris, "VISITS");
layered.set_edge_property(eid, "year", Value::Int64(2024));
let removed = layered.remove_edge_property_versioned(eid, "year", txn_id);
assert_eq!(removed, Some(Value::Int64(2024)));
assert!(
layered
.get_edge_property(eid, &PropertyKey::new("year"))
.is_none()
);
}
#[test]
fn test_add_and_remove_label_versioned_on_overlay_node() {
let layered = build_test_layered();
let txn_id = TransactionId::from(11);
let butch = layered.create_node(&["Person"]);
assert!(layered.add_label_versioned(butch, "Employee", txn_id));
let node = layered.get_node(butch).unwrap();
let labels: Vec<&str> = node.labels.iter().map(|l| l.as_str()).collect();
assert!(labels.contains(&"Employee"));
assert!(labels.contains(&"Person"));
assert!(layered.remove_label_versioned(butch, "Employee", txn_id));
let node = layered.get_node(butch).unwrap();
let labels: Vec<&str> = node.labels.iter().map(|l| l.as_str()).collect();
assert!(!labels.contains(&"Employee"));
}
#[test]
fn test_ensure_in_overlay_noop_for_nonexistent_node() {
let layered = build_test_layered();
let missing = NodeId::new(999_999);
layered.set_node_property(missing, "name", Value::from("Ghost"));
assert!(layered.base_store_arc().get_node(missing).is_none());
}
#[test]
fn test_ensure_edge_in_overlay_noop_for_nonexistent_edge() {
let layered = build_test_layered();
let missing = EdgeId::new(999_999);
layered.set_edge_property(missing, "weight", Value::Float64(1.0));
assert!(layered.base_store_arc().get_edge(missing).is_none());
}
#[test]
fn test_ensure_edge_in_overlay_idempotent() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let edges = layered.edges_from(persons[0], Direction::Outgoing);
let (_, eid) = edges[0];
layered.set_edge_property(eid, "weight", Value::Float64(1.0));
layered.set_edge_property(eid, "weight", Value::Float64(2.0));
let weight = layered
.get_edge_property(eid, &PropertyKey::new("weight"))
.unwrap();
assert_eq!(weight, Value::Float64(2.0));
}
#[test]
fn test_neighbors_incoming_with_deleted_source() {
let layered = build_test_layered();
let cities = layered.nodes_by_label("City");
let amsterdam = cities[0];
let persons = layered.nodes_by_label("Person");
layered.delete_node(persons[0]);
let incoming = layered.neighbors(amsterdam, Direction::Incoming);
assert!(!incoming.contains(&persons[0]));
assert_eq!(incoming.len(), 1);
}
#[test]
fn test_edges_from_dirty_source_merges_layers() {
let layered = build_test_layered();
let persons = layered.nodes_by_label("Person");
let first = persons[0];
layered.set_node_property(first, "city", Value::from("Berlin"));
let prague = layered.create_node(&["City"]);
layered.create_edge(first, prague, "VISITS");
let outgoing = layered.edges_from(first, Direction::Outgoing);
assert!(
outgoing.iter().any(|(target, _)| *target == prague),
"new overlay edge should appear in edges_from"
);
}
}