use std::collections::HashSet;
use selene_core::{DbString, EdgeId, GraphId, LabelSet, NodeId, PropertyMap, Value, db_string};
use super::{
COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_BASIS_POINTS,
COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS, CompactionStats, compact_core,
};
use crate::error::GraphError;
use crate::store::RowIndex;
use crate::{AdjacencyEntry, CompactionReport, SeleneGraph, SharedGraph};
fn prop(key: &str, value: Value) -> PropertyMap {
PropertyMap::from_pairs([(db_string(key).unwrap(), value)]).unwrap()
}
fn adjacency_summary(entry: &AdjacencyEntry) -> Vec<(NodeId, EdgeId, DbString)> {
let mut edges: Vec<(NodeId, EdgeId, DbString)> = entry
.iter()
.map(|edge| (edge.neighbor, edge.edge_id, edge.label.clone()))
.collect();
edges.sort_by_key(|(neighbor, edge_id, _)| (*neighbor, *edge_id));
edges
}
fn graph_with_a_deletion() -> SharedGraph {
let shared = SharedGraph::new(GraphId::new(1));
let la = db_string("cmp.a").unwrap();
let lb = db_string("cmp.b").unwrap();
let el = db_string("cmp.e").unwrap();
let mut txn = shared.begin_write();
{
let mut m = txn.mutator();
let n1 = m
.create_node(LabelSet::single(la.clone()), prop("name", Value::Int(1)))
.unwrap();
let n2 = m
.create_node(LabelSet::single(lb), prop("name", Value::Int(2)))
.unwrap();
let n3 = m
.create_node(LabelSet::single(la.clone()), prop("name", Value::Int(3)))
.unwrap();
let n4 = m
.create_node(LabelSet::single(la), prop("name", Value::Int(4)))
.unwrap();
assert_eq!(
(n1, n2, n3, n4),
(
NodeId::new(1),
NodeId::new(2),
NodeId::new(3),
NodeId::new(4)
)
);
m.create_edge(el.clone(), n1, n3, prop("w", Value::Int(10)))
.unwrap(); m.create_edge(el.clone(), n1, n2, PropertyMap::new())
.unwrap(); m.create_edge(el, n3, n4, PropertyMap::new()).unwrap(); m.delete_node(n2).unwrap();
}
txn.commit().unwrap();
shared
}
#[test]
fn compaction_drops_dead_rows_and_renumbers_dense() {
let shared = graph_with_a_deletion();
let before = shared.read();
let next_node_before = before.meta.next_node_id;
let next_edge_before = before.meta.next_edge_id;
let live_nodes = before.node_count();
let len_before = before.node_store.len();
let compacted = compact_core(&before).unwrap();
let g = &compacted.graph;
assert_eq!(g.node_store.len(), live_nodes);
assert!(
len_before > g.node_store.len(),
"dead rows existed to reclaim"
);
assert_eq!(g.node_count(), live_nodes);
for row in 0..g.node_store.len() as u32 {
let id = g
.node_id_for_row(RowIndex::new(row))
.expect("every dense row has a live external id");
assert!(g.is_node_alive(id), "dense row {row} ({id}) must be alive");
}
assert_eq!(
compacted.report.reclaimed_nodes,
(len_before - g.node_store.len()) as u64
);
assert!(
compacted.report.reclaimed_edges >= 1,
"the n1->n2 edge (id 2) was reclaimed"
);
assert_eq!(g.row_for_node_id(NodeId::new(1)), Some(RowIndex::new(0)));
assert_eq!(g.row_for_node_id(NodeId::new(3)), Some(RowIndex::new(1)));
assert_eq!(g.row_for_node_id(NodeId::new(4)), Some(RowIndex::new(2)));
assert!(!before.is_node_alive(NodeId::new(2)));
assert!(
before.row_for_node_id(NodeId::new(2)).is_some(),
"pre-compaction: deleted id still mapped (NotAlive, Option B)"
);
assert!(
g.row_for_node_id(NodeId::new(2)).is_none(),
"post-compaction: deleted id reclaimed (NotFound)"
);
assert!(!g.is_node_alive(NodeId::new(2)));
assert_eq!(g.meta.next_node_id, next_node_before);
assert_eq!(g.meta.next_edge_id, next_edge_before);
}
#[test]
fn compaction_preserves_observable_reads() {
let shared = graph_with_a_deletion();
let before = shared.read();
let compacted = compact_core(&before).unwrap();
let g = &compacted.graph;
for id in [NodeId::new(1), NodeId::new(3), NodeId::new(4)] {
assert_eq!(g.node_labels(id), before.node_labels(id), "labels {id}");
assert_eq!(
g.node_properties(id),
before.node_properties(id),
"properties {id}"
);
assert_eq!(
g.outgoing_edges(id).map(adjacency_summary),
before.outgoing_edges(id).map(adjacency_summary),
"outgoing adjacency {id}"
);
assert_eq!(
g.incoming_edges(id).map(adjacency_summary),
before.incoming_edges(id).map(adjacency_summary),
"incoming adjacency {id}"
);
}
for id in [EdgeId::new(1), EdgeId::new(3)] {
assert_eq!(
g.edge_endpoints(id),
before.edge_endpoints(id),
"endpoints {id}"
);
assert_eq!(g.edge_label(id), before.edge_label(id), "edge label {id}");
assert_eq!(
g.edge_properties(id),
before.edge_properties(id),
"edge properties {id}"
);
}
let la = db_string("cmp.a").unwrap();
let external_with_label = |graph: &SeleneGraph| -> HashSet<NodeId> {
graph
.nodes_with_label(&la)
.map(|bitmap| {
bitmap
.iter()
.map(|row| graph.node_id_for_row(RowIndex::new(row)).unwrap())
.collect()
})
.unwrap_or_default()
};
assert_eq!(external_with_label(g), external_with_label(&before));
assert_eq!(
external_with_label(g),
HashSet::from([NodeId::new(1), NodeId::new(3), NodeId::new(4)])
);
}
#[test]
fn compacted_graph_republishes_cleanly() {
let shared = graph_with_a_deletion();
let compacted = compact_core(&shared.read()).unwrap();
let republished = SharedGraph::from_graph(compacted.graph);
let g = republished.read();
assert_eq!(g.node_count(), 3);
assert_eq!(g.edge_count(), 2);
assert!(g.is_node_alive(NodeId::new(1)));
assert!(g.row_for_node_id(NodeId::new(2)).is_none());
assert_eq!(
g.edge_endpoints(EdgeId::new(1)),
Some((NodeId::new(1), NodeId::new(3)))
);
assert_eq!(
g.edge_endpoints(EdgeId::new(3)),
Some((NodeId::new(3), NodeId::new(4)))
);
}
#[test]
fn compacting_a_dense_graph_is_idempotent() {
let shared = SharedGraph::new(GraphId::new(1));
let la = db_string("cmp.idem").unwrap();
let mut txn = shared.begin_write();
{
let mut m = txn.mutator();
m.create_node(LabelSet::single(la.clone()), PropertyMap::new())
.unwrap();
m.create_node(LabelSet::single(la), PropertyMap::new())
.unwrap();
}
txn.commit().unwrap();
let before = shared.read();
let compacted = compact_core(&before).unwrap();
let g = &compacted.graph;
assert_eq!(compacted.report, CompactionReport::default());
assert_eq!(g.node_store.len(), before.node_store.len());
for id in [NodeId::new(1), NodeId::new(2)] {
assert_eq!(
g.row_for_node_id(id),
before.row_for_node_id(id),
"dense graph: {id} must keep its row"
);
}
assert_eq!(g.row_for_node_id(NodeId::new(1)), Some(RowIndex::new(0)));
assert_eq!(g.row_for_node_id(NodeId::new(2)), Some(RowIndex::new(1)));
assert_eq!(g.meta.next_node_id, before.meta.next_node_id);
}
#[test]
fn compaction_of_empty_graph_is_a_clean_noop() {
let shared = SharedGraph::new(GraphId::new(1));
let before = shared.read();
let compacted = compact_core(&before).unwrap();
assert_eq!(compacted.graph.node_store.len(), 0);
assert_eq!(compacted.graph.edge_store.len(), 0);
assert_eq!(compacted.graph.node_count(), 0);
assert_eq!(compacted.report, CompactionReport::default());
assert_eq!(compacted.graph.meta.next_node_id, before.meta.next_node_id);
let _ = SharedGraph::from_graph(compacted.graph);
}
#[test]
fn compaction_stats_reclaimable_ratio_uses_allocated_rows() {
let stats = CompactionStats {
allocated_nodes: 800,
live_nodes: 600,
reclaimable_nodes: 200,
allocated_edges: 200,
live_edges: 100,
reclaimable_edges: 100,
};
assert_eq!(stats.allocated_rows(), 1_000);
assert_eq!(stats.reclaimable_rows(), 300);
assert_eq!(stats.reclaimable_row_basis_points(), 3_000);
}
#[test]
fn compaction_recommendation_requires_dead_row_floor_and_ratio() {
let below_floor = CompactionStats {
allocated_nodes: COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS * 2,
live_nodes: COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS + 1,
reclaimable_nodes: COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS - 1,
..CompactionStats::default()
};
let below_ratio = CompactionStats {
allocated_nodes: 100_000,
live_nodes: 100_000 - COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS,
reclaimable_nodes: COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS,
..CompactionStats::default()
};
let recommended = CompactionStats {
allocated_nodes: COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS * 4,
live_nodes: COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS * 3,
reclaimable_nodes: COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_ROWS,
..CompactionStats::default()
};
assert_eq!(CompactionStats::default().reclaimable_row_basis_points(), 0);
assert!(!below_floor.compaction_recommended());
assert!(!below_ratio.compaction_recommended());
assert_eq!(
recommended.reclaimable_row_basis_points(),
COMPACTION_RECOMMENDATION_MIN_RECLAIMABLE_BASIS_POINTS
);
assert!(recommended.compaction_recommended());
}
#[test]
fn compaction_of_an_all_deleted_graph_reclaims_everything() {
let shared = SharedGraph::new(GraphId::new(1));
let la = db_string("cmp.alldel").unwrap();
let el = db_string("cmp.alldele").unwrap();
let mut txn = shared.begin_write();
{
let mut m = txn.mutator();
let n1 = m
.create_node(LabelSet::single(la.clone()), PropertyMap::new())
.unwrap();
let n2 = m
.create_node(LabelSet::single(la), PropertyMap::new())
.unwrap();
m.create_edge(el, n1, n2, PropertyMap::new()).unwrap();
m.delete_node(n1).unwrap(); m.delete_node(n2).unwrap();
}
txn.commit().unwrap();
let before = shared.read();
assert_eq!(before.node_count(), 0);
let rows_before = before.node_store.len() as u64;
let next_node_before = before.meta.next_node_id;
let next_edge_before = before.meta.next_edge_id;
let compacted = compact_core(&before).unwrap();
let g = &compacted.graph;
assert_eq!(g.node_store.len(), 0);
assert_eq!(g.edge_store.len(), 0);
assert_eq!(g.node_count(), 0);
assert_eq!(compacted.report.reclaimed_nodes, rows_before);
assert!(compacted.report.reclaimed_edges >= 1);
assert!(g.row_for_node_id(NodeId::new(1)).is_none());
assert!(g.row_for_node_id(NodeId::new(2)).is_none());
assert_eq!(g.meta.next_node_id, next_node_before);
assert_eq!(g.meta.next_edge_id, next_edge_before);
let _ = SharedGraph::from_graph(compacted.graph);
}
#[test]
fn aborted_tx_leaves_no_hole_so_store_stays_dense() {
let shared = SharedGraph::new(GraphId::new(1));
let la = db_string("cmp.hole").unwrap();
let mut txn = shared.begin_write();
{
let mut m = txn.mutator();
let n1 = m
.create_node(LabelSet::single(la.clone()), PropertyMap::new())
.unwrap();
assert_eq!(n1, NodeId::new(1));
}
txn.commit().unwrap();
{
let mut txn = shared.begin_write();
let mut m = txn.mutator();
let n2 = m
.create_node(LabelSet::single(la.clone()), PropertyMap::new())
.unwrap();
assert_eq!(n2, NodeId::new(2));
}
let mut txn = shared.begin_write();
{
let mut m = txn.mutator();
let n3 = m
.create_node(LabelSet::single(la), PropertyMap::new())
.unwrap();
assert_eq!(n3, NodeId::new(3));
}
txn.commit().unwrap();
let before = shared.read();
assert_eq!(before.node_count(), 2, "n1 + n3 alive; id 2 burned");
assert_eq!(
before.node_store.len(),
2,
"append leaves NO interior hole row for the burned id"
);
assert_eq!(
before.row_for_node_id(NodeId::new(1)),
Some(RowIndex::new(0))
);
assert_eq!(
before.row_for_node_id(NodeId::new(3)),
Some(RowIndex::new(1)),
"id 3 appended at the dense end, not arith row 2"
);
assert!(
before.row_for_node_id(NodeId::new(2)).is_none(),
"aborted id was never committed -> NotFound"
);
let next_node_before = before.meta.next_node_id;
let compacted = compact_core(&before).unwrap();
let g = &compacted.graph;
assert_eq!(g.node_store.len(), 2);
assert_eq!(g.node_count(), 2);
assert_eq!(compacted.report.reclaimed_nodes, 0, "nothing to reclaim");
assert_eq!(g.row_for_node_id(NodeId::new(1)), Some(RowIndex::new(0)));
assert_eq!(g.row_for_node_id(NodeId::new(3)), Some(RowIndex::new(1)));
assert!(g.row_for_node_id(NodeId::new(2)).is_none());
assert_eq!(
g.meta.next_node_id, next_node_before,
"burned id 2 must never be reissued"
);
}
#[test]
fn republished_compacted_graph_allocates_without_reuse() {
let shared = graph_with_a_deletion();
let next_node_before = shared.read().meta.next_node_id;
let compacted = compact_core(&shared.read()).unwrap();
let republished = SharedGraph::from_graph(compacted.graph);
let new_id = {
let mut txn = republished.begin_write();
let id = {
let mut m = txn.mutator();
m.create_node(
LabelSet::single(db_string("cmp.a").unwrap()),
PropertyMap::new(),
)
.unwrap()
};
txn.commit().unwrap();
id
};
let g = republished.read();
assert_eq!(
new_id,
NodeId::new(next_node_before),
"the next create must take the preserved high-water mark, not a reclaimed id"
);
assert!(g.is_node_alive(new_id));
assert_eq!(g.row_for_node_id(new_id), Some(RowIndex::new(3)));
assert_eq!(g.node_store.len(), 4, "store stays dense, no hole re-bloat");
assert!(g.is_node_alive(NodeId::new(1)));
assert!(g.is_node_alive(NodeId::new(3)));
assert!(g.is_node_alive(NodeId::new(4)));
assert!(g.row_for_node_id(NodeId::new(2)).is_none());
}
fn graph_with_one_live_node() -> SeleneGraph {
let mut graph = SeleneGraph::new(GraphId::new(1));
graph
.node_store
.labels
.push(LabelSet::single(db_string("cmp.live").unwrap()));
graph.node_store.properties.push(PropertyMap::new());
graph.node_store.row_to_id.push(NodeId::new(1));
graph.node_store.alive_mut().insert(0);
graph
.node_id_to_row
.insert(NodeId::new(1), RowIndex::new(0));
graph
}
fn expect_compact_inconsistent(graph: &SeleneGraph) -> GraphError {
match compact_core(graph) {
Ok(_) => panic!("compaction should have failed loudly on a corrupt graph"),
Err(err) => {
assert!(
matches!(err, GraphError::Inconsistent { .. }),
"expected GraphError::Inconsistent, got {err:?}",
);
err
}
}
}
#[test]
fn compaction_rejects_edge_with_dead_endpoint() {
let mut graph = graph_with_one_live_node();
graph
.edge_store
.label
.push(db_string("cmp.dangling").unwrap());
graph.edge_store.source.push(NodeId::new(1));
graph.edge_store.target.push(NodeId::new(99)); graph.edge_store.properties.push(PropertyMap::new());
graph.edge_store.row_to_id.push(EdgeId::new(1));
graph.edge_store.alive_mut().insert(0);
graph
.edge_id_to_row
.insert(EdgeId::new(1), RowIndex::new(0));
let GraphError::Inconsistent { reason } = expect_compact_inconsistent(&graph) else {
unreachable!("helper returns only Inconsistent");
};
assert!(
reason.contains("survives compaction but endpoint"),
"expected dead-endpoint rejection, got: {reason}",
);
}
#[test]
fn compaction_rejects_alive_node_row_with_no_external_id() {
let mut graph = SeleneGraph::new(GraphId::new(1));
graph
.node_store
.labels
.push(LabelSet::single(db_string("cmp.noid").unwrap()));
graph.node_store.properties.push(PropertyMap::new());
graph.node_store.row_to_id.push(NodeId::TOMBSTONE); graph.node_store.alive_mut().insert(0);
let GraphError::Inconsistent { reason } = expect_compact_inconsistent(&graph) else {
unreachable!("helper returns only Inconsistent");
};
assert!(
reason.contains("alive node row") && reason.contains("no external id"),
"expected missing-node-id rejection, got: {reason}",
);
}
#[test]
fn compaction_rejects_alive_edge_row_with_no_external_id() {
let mut graph = graph_with_one_live_node();
graph
.edge_store
.label
.push(db_string("cmp.noid.edge").unwrap());
graph.edge_store.source.push(NodeId::new(1));
graph.edge_store.target.push(NodeId::new(1));
graph.edge_store.properties.push(PropertyMap::new());
graph.edge_store.row_to_id.push(EdgeId::TOMBSTONE); graph.edge_store.alive_mut().insert(0);
let GraphError::Inconsistent { reason } = expect_compact_inconsistent(&graph) else {
unreachable!("helper returns only Inconsistent");
};
assert!(
reason.contains("alive edge row") && reason.contains("no external id"),
"expected missing-edge-id rejection, got: {reason}",
);
}