use selene_core::{Change, EdgeId, GraphId, LabelSet, NodeId, PropertyMap, db_string};
use super::{append_wal, node_created, sample_shared_graph, temp_dir, write_snapshot};
use crate::store::RowIndex;
use crate::{SeleneGraph, SharedGraph};
fn non_identity_graph() -> SeleneGraph {
let nlabel = db_string("nidr.node").unwrap();
let elabel = db_string("nidr.edge").unwrap();
let hole_elabel = db_string("__selene_hole").unwrap();
let mut g = SeleneGraph::new(GraphId::new(1));
let push_node = |g: &mut SeleneGraph, id: NodeId, alive: bool| {
let row = g.node_store.row_to_id.len() as u32;
g.node_store.labels.push(if id == NodeId::TOMBSTONE {
LabelSet::new()
} else {
LabelSet::single(nlabel.clone())
});
g.node_store.properties.push(PropertyMap::new());
g.node_store.row_to_id.push(id);
if alive {
g.node_store.alive_mut().insert(row);
}
};
push_node(&mut g, NodeId::new(5), true);
push_node(&mut g, NodeId::new(8), true);
push_node(&mut g, NodeId::TOMBSTONE, false); push_node(&mut g, NodeId::new(15), false); push_node(&mut g, NodeId::new(20), true);
g.meta.next_node_id = 21;
let push_edge =
|g: &mut SeleneGraph, id: EdgeId, source: NodeId, target: NodeId, alive: bool| {
let row = g.edge_store.row_to_id.len() as u32;
g.edge_store.label.push(if id == EdgeId::TOMBSTONE {
hole_elabel.clone()
} else {
elabel.clone()
});
g.edge_store.source.push(source);
g.edge_store.target.push(target);
g.edge_store.properties.push(PropertyMap::new());
g.edge_store.row_to_id.push(id);
if alive {
g.edge_store.alive_mut().insert(row);
}
};
push_edge(&mut g, EdgeId::new(3), NodeId::new(5), NodeId::new(8), true);
push_edge(
&mut g,
EdgeId::TOMBSTONE,
NodeId::TOMBSTONE,
NodeId::TOMBSTONE,
false,
);
push_edge(
&mut g,
EdgeId::new(7),
NodeId::new(20),
NodeId::new(5),
true,
);
g.meta.next_edge_id = 8;
g
}
#[test]
fn non_identity_snapshot_round_trips_positionally() {
let dir = temp_dir("nodeid-split-positional");
let shared = SharedGraph::from_graph(non_identity_graph());
write_snapshot(&dir, &shared, 1);
let recovered = SharedGraph::recover(&dir, GraphId::new(1)).expect("recovery succeeds");
let g = recovered.read();
assert_eq!(g.row_for_node_id(NodeId::new(5)), Some(RowIndex::new(0)));
assert_eq!(g.row_for_node_id(NodeId::new(8)), Some(RowIndex::new(1)));
assert_eq!(g.row_for_node_id(NodeId::new(15)), Some(RowIndex::new(3)));
assert_eq!(g.row_for_node_id(NodeId::new(20)), Some(RowIndex::new(4)));
assert_ne!(g.row_for_node_id(NodeId::new(20)), Some(RowIndex::new(19)));
assert_eq!(g.node_id_for_row(RowIndex::new(0)), Some(NodeId::new(5)));
assert_eq!(g.node_id_for_row(RowIndex::new(4)), Some(NodeId::new(20)));
assert_eq!(g.node_id_for_row(RowIndex::new(2)), None);
assert_eq!(g.node_store.len(), 5, "the interior hole row is preserved");
assert!(g.is_node_alive(NodeId::new(5)));
assert!(g.is_node_alive(NodeId::new(8)));
assert!(g.is_node_alive(NodeId::new(20)));
assert!(!g.is_node_alive(NodeId::new(15)));
assert_eq!(g.row_for_node_id(NodeId::new(15)), Some(RowIndex::new(3)));
assert_eq!(g.row_for_node_id(NodeId::new(3)), None);
assert!(!g.is_node_alive(NodeId::new(3)));
assert_eq!(g.node_count(), 3, "only the three alive nodes are counted");
assert_eq!(
g.node_labels(NodeId::new(20))
.unwrap()
.iter()
.cloned()
.collect::<Vec<_>>(),
vec![db_string("nidr.node").unwrap()]
);
assert_eq!(g.meta.next_node_id, 21);
assert_eq!(g.meta.next_edge_id, 8);
assert_eq!(g.row_for_edge_id(EdgeId::new(3)), Some(RowIndex::new(0)));
assert_eq!(g.row_for_edge_id(EdgeId::new(7)), Some(RowIndex::new(2)));
assert_ne!(g.row_for_edge_id(EdgeId::new(7)), Some(RowIndex::new(6)));
assert_eq!(g.edge_id_for_row(RowIndex::new(1)), None); assert_eq!(g.edge_store.len(), 3, "the interior edge hole is preserved");
assert!(g.is_edge_alive(EdgeId::new(3)));
assert!(g.is_edge_alive(EdgeId::new(7)));
assert_eq!(
g.edge_endpoints(EdgeId::new(3)),
Some((NodeId::new(5), NodeId::new(8)))
);
assert_eq!(
g.edge_endpoints(EdgeId::new(7)),
Some((NodeId::new(20), NodeId::new(5)))
);
assert!(
g.outgoing_edges(NodeId::new(20))
.unwrap()
.iter()
.any(|e| e.neighbor == NodeId::new(5) && e.edge_id == EdgeId::new(7))
);
assert!(
g.incoming_edges(NodeId::new(8))
.unwrap()
.iter()
.any(|e| e.neighbor == NodeId::new(5) && e.edge_id == EdgeId::new(3))
);
}
fn descending_first_graph() -> SeleneGraph {
let nlabel = db_string("nidr2.node").unwrap();
let mut g = SeleneGraph::new(GraphId::new(2));
for id in [10u64, 3, 7] {
let row = g.node_store.row_to_id.len() as u32;
g.node_store.labels.push(LabelSet::single(nlabel.clone()));
g.node_store.properties.push(PropertyMap::new());
g.node_store.row_to_id.push(NodeId::new(id));
g.node_store.alive_mut().insert(row);
}
g.meta.next_node_id = 11;
g
}
#[test]
fn out_of_order_positional_snapshot_round_trips() {
let dir = temp_dir("nodeid-split-out-of-order");
let shared = SharedGraph::from_graph(descending_first_graph());
write_snapshot(&dir, &shared, 1);
let recovered = SharedGraph::recover(&dir, GraphId::new(2)).expect("recovery succeeds");
let g = recovered.read();
assert_eq!(g.row_for_node_id(NodeId::new(10)), Some(RowIndex::new(0)));
assert_eq!(g.row_for_node_id(NodeId::new(3)), Some(RowIndex::new(1)));
assert_eq!(g.row_for_node_id(NodeId::new(7)), Some(RowIndex::new(2)));
assert_eq!(g.node_id_for_row(RowIndex::new(0)), Some(NodeId::new(10)));
assert_eq!(g.node_id_for_row(RowIndex::new(1)), Some(NodeId::new(3)));
assert_eq!(g.node_id_for_row(RowIndex::new(2)), Some(NodeId::new(7)));
assert!(g.is_node_alive(NodeId::new(10)));
assert!(g.is_node_alive(NodeId::new(3)));
assert!(g.is_node_alive(NodeId::new(7)));
assert_eq!(g.node_count(), 3);
assert_eq!(g.node_store.len(), 3, "no leftover pad row");
}
#[test]
fn recovered_store_continues_id_allocation_without_clobber() {
let dir = temp_dir("nodeid-split-realloc");
let shared = SharedGraph::from_graph(non_identity_graph());
write_snapshot(&dir, &shared, 1);
let recovered = SharedGraph::recover(&dir, GraphId::new(1)).expect("recovery succeeds");
let new_id = {
let mut txn = recovered.begin_write();
let id = {
let mut m = txn.mutator();
m.create_node(
LabelSet::single(db_string("nidr.node").unwrap()),
PropertyMap::new(),
)
.unwrap()
};
txn.commit().unwrap();
id
};
let g = recovered.read();
assert_eq!(new_id, NodeId::new(21), "allocation resumes at the floor");
assert!(g.is_node_alive(NodeId::new(21)));
assert_eq!(g.row_for_node_id(NodeId::new(5)), Some(RowIndex::new(0)));
assert_eq!(g.row_for_node_id(NodeId::new(20)), Some(RowIndex::new(4)));
assert!(g.is_node_alive(NodeId::new(5)));
assert!(!g.is_node_alive(NodeId::new(15)));
assert!(g.row_for_node_id(NodeId::new(15)).is_some());
assert!(g.row_for_node_id(NodeId::new(3)).is_none());
}
fn compacted_sample() -> SharedGraph {
let shared = sample_shared_graph();
{
let mut txn = shared.begin_write();
{
let mut m = txn.mutator();
m.delete_node(NodeId::new(2)).unwrap();
m.delete_node(NodeId::new(3)).unwrap();
}
txn.commit().unwrap();
}
let report = shared.compact().unwrap();
assert_eq!(report.reclaimed_nodes, 3, "rows 2, 3, 5 reclaimed");
assert!(
report.reclaimed_edges >= 1,
"the cascade-deleted edge reclaimed"
);
{
let g = shared.read();
assert_eq!(g.node_store.len(), 2, "live store densified to 1 and 4");
assert_eq!(g.meta.next_node_id, 6, "high-water preserved");
}
shared
}
#[test]
fn compacted_snapshot_recovers_dense() {
let dir = temp_dir("compacted-snapshot-dense");
let shared = compacted_sample();
write_snapshot(&dir, &shared, 3);
let recovered = SharedGraph::recover(&dir, GraphId::new(7)).unwrap();
let g = recovered.read();
assert_eq!(g.node_store.len(), 2, "recovered store is dense (no holes)");
assert_eq!(g.node_count(), 2);
assert!(g.is_node_alive(NodeId::new(1)));
assert!(g.is_node_alive(NodeId::new(4)));
assert!(g.row_for_node_id(NodeId::new(2)).is_none());
assert!(g.row_for_node_id(NodeId::new(3)).is_none());
assert!(g.row_for_node_id(NodeId::new(5)).is_none());
assert_eq!(g.meta.next_node_id, 6);
let _ = std::fs::remove_dir_all(dir);
}
#[test]
fn post_compaction_wal_create_recovers_dense_without_rebloat() {
let dir = temp_dir("compacted-snapshot-wal-rebloat");
let shared = compacted_sample();
write_snapshot(&dir, &shared, 3);
append_wal(&dir, 3, &[node_created(6)]);
let recovered = SharedGraph::recover(&dir, GraphId::new(7)).unwrap();
let g = recovered.read();
assert!(g.is_node_alive(NodeId::new(1)));
assert!(g.is_node_alive(NodeId::new(4)));
assert!(
g.is_node_alive(NodeId::new(6)),
"the WAL-created node recovered"
);
assert!(g.row_for_node_id(NodeId::new(2)).is_none());
assert!(g.row_for_node_id(NodeId::new(3)).is_none());
assert!(g.row_for_node_id(NodeId::new(5)).is_none());
assert_eq!(
g.node_store.len(),
3,
"no hole re-bloat from the WAL-created id"
);
assert_eq!(g.row_for_node_id(NodeId::new(6)), Some(RowIndex::new(2)));
assert_eq!(g.meta.next_node_id, 7);
let _ = std::fs::remove_dir_all(dir);
}
#[test]
fn post_compaction_wal_edge_create_recovers_dense_without_rebloat() {
let dir = temp_dir("compacted-snapshot-wal-edge");
let shared = compacted_sample();
write_snapshot(&dir, &shared, 3);
let edge = Change::EdgeCreated {
id: EdgeId::new(2),
label: db_string("recover.wal.edge").unwrap(),
source: NodeId::new(1),
target: NodeId::new(4),
properties: PropertyMap::new(),
};
append_wal(&dir, 3, &[edge]);
let recovered = SharedGraph::recover(&dir, GraphId::new(7)).unwrap();
let g = recovered.read();
assert!(g.is_edge_alive(EdgeId::new(2)));
assert_eq!(
g.edge_store.len(),
1,
"no edge re-bloat: the WAL edge appended at the dense end"
);
assert_eq!(g.row_for_edge_id(EdgeId::new(2)), Some(RowIndex::new(0)));
assert_eq!(
g.edge_endpoints(EdgeId::new(2)),
Some((NodeId::new(1), NodeId::new(4)))
);
assert!(
g.outgoing_edges(NodeId::new(1))
.unwrap()
.iter()
.any(|e| e.edge_id == EdgeId::new(2) && e.neighbor == NodeId::new(4))
);
assert_eq!(g.meta.next_edge_id, 3);
let _ = std::fs::remove_dir_all(dir);
}
#[test]
fn post_compaction_wal_delete_of_survivor_replays_against_compacted_snapshot() {
let dir = temp_dir("compacted-snapshot-wal-delete");
let shared = compacted_sample(); write_snapshot(&dir, &shared, 3);
append_wal(&dir, 3, &[Change::NodeDeleted { id: NodeId::new(1) }]);
let recovered = SharedGraph::recover(&dir, GraphId::new(7)).unwrap();
let g = recovered.read();
assert!(
!g.is_node_alive(NodeId::new(1)),
"the post-compaction delete of a survivor applied"
);
assert!(
g.is_node_alive(NodeId::new(4)),
"the other survivor is untouched"
);
assert!(g.row_for_node_id(NodeId::new(2)).is_none());
assert!(g.row_for_node_id(NodeId::new(5)).is_none());
let _ = std::fs::remove_dir_all(dir);
}