import pytest
import json
from silk import GraphStore
ONTOLOGY = {
"node_types": {
"entity": {
"properties": {
"status": {"value_type": "string"}
}
}
},
"edge_types": {
"LINKS": {
"source_types": ["entity"],
"target_types": ["entity"],
"properties": {}
}
}
}
def _store(instance_id="test"):
return GraphStore(instance_id, ONTOLOGY)
def test_compact_returns_hash():
store = _store()
store.add_node("n1", "entity", "Node 1")
h = store.compact()
assert isinstance(h, str)
assert len(h) == 64
def test_compact_reduces_oplog():
store = _store()
store.add_node("n1", "entity", "A")
store.add_node("n2", "entity", "B")
store.add_node("n3", "entity", "C")
store.add_edge("e1", "LINKS", "n1", "n2")
before = store.len()
assert before >= 5
store.compact()
assert store.len() == 1
def test_compact_preserves_nodes():
store = _store()
store.add_node("n1", "entity", "A", {"status": "active"})
store.add_node("n2", "entity", "B", {"status": "idle"})
store.compact()
assert store.get_node("n1") is not None
assert store.get_node("n1")["properties"]["status"] == "active"
assert store.get_node("n2") is not None
assert store.get_node("n2")["properties"]["status"] == "idle"
def test_compact_preserves_edges():
store = _store()
store.add_node("n1", "entity", "A")
store.add_node("n2", "entity", "B")
store.add_edge("e1", "LINKS", "n1", "n2")
store.compact()
assert store.get_edge("e1") is not None
assert store.get_edge("e1")["source_id"] == "n1"
assert store.get_edge("e1")["target_id"] == "n2"
def test_compact_tombstoned_nodes_excluded():
store = _store()
store.add_node("n1", "entity", "Keep")
store.add_node("n2", "entity", "Remove")
store.remove_node("n2")
store.compact()
assert store.get_node("n1") is not None
assert store.get_node("n2") is None
assert len(store.all_nodes()) == 1
def test_write_after_compact():
store = _store()
store.add_node("n1", "entity", "Before")
store.compact()
store.add_node("n2", "entity", "After")
assert store.get_node("n1") is not None
assert store.get_node("n2") is not None
assert store.len() == 2
def test_multiple_compactions():
store = _store()
store.add_node("n1", "entity", "A")
store.compact()
store.add_node("n2", "entity", "B")
store.compact()
store.add_node("n3", "entity", "C")
store.compact()
assert store.len() == 1
assert len(store.all_nodes()) == 3
def test_snapshot_after_compact():
store_a = _store("a")
store_a.add_node("n1", "entity", "A")
store_a.add_node("n2", "entity", "B")
store_a.compact()
snap = store_a.snapshot()
store_b = GraphStore.from_snapshot("b", snap)
assert store_b.get_node("n1") is not None
assert store_b.get_node("n2") is not None
def test_sync_after_compact():
store_a = _store("a")
store_b = _store("b")
store_a.add_node("n1", "entity", "From A")
store_a.compact()
snap = store_a.snapshot()
store_b = GraphStore.from_snapshot("b", snap)
assert store_b.get_node("n1") is not None
def test_compact_preserves_ontology_extensions():
store = _store()
store.extend_ontology({
"node_types": {"service": {"properties": {}}}
})
store.add_node("svc-1", "service", "API")
store.compact()
assert store.get_node("svc-1") is not None
store.add_node("svc-2", "service", "Web")
assert store.get_node("svc-2") is not None
def test_query_builder_after_compact():
from silk import Query
store = _store()
store.add_node("n1", "entity", "A", {"status": "active"})
store.add_node("n2", "entity", "B", {"status": "idle"})
store.compact()
active = Query(store).nodes("entity").where(status="active").collect()
assert len(active) == 1
assert active[0]["node_id"] == "n1"
def test_compact_persistent_store(tmp_path):
path = str(tmp_path / "test.redb")
store = GraphStore("test", ONTOLOGY, path=path)
store.add_node("n1", "entity", "Persistent")
store.add_node("n2", "entity", "Also persistent")
before = store.len()
store.compact()
assert store.len() == 1
del store
store2 = GraphStore.open(path)
assert store2.get_node("n1") is not None
assert store2.get_node("n2") is not None
assert store2.len() == 1
def test_create_checkpoint_returns_bytes():
store = _store()
store.add_node("n1", "entity", "Node")
before = store.len()
checkpoint_bytes = store.create_checkpoint()
assert isinstance(checkpoint_bytes, bytes)
assert len(checkpoint_bytes) > 0
assert store.len() == before
def test_compact_safe_no_peers():
store = _store()
store.add_node("n1", "entity", "Node")
safe, reasons = store.verify_compaction_safe()
assert safe
assert reasons == []
store.compact()
assert store.len() == 1
def test_compact_unsafe_with_unsynced_peer():
store = _store()
store.add_node("n1", "entity", "Node")
store.register_peer("remote-1", "tcp://remote:7701")
safe, reasons = store.verify_compaction_safe()
assert not safe
assert len(reasons) == 1
assert "remote-1" in reasons[0]
assert "never synced" in reasons[0]
def test_compact_rejects_when_unsafe():
store = _store()
store.add_node("n1", "entity", "Node")
store.register_peer("remote-1", "tcp://remote:7701")
with pytest.raises(RuntimeError, match="compaction is unsafe"):
store.compact()
def test_compact_force_bypasses_safety():
store = _store()
store.add_node("n1", "entity", "Node")
store.register_peer("remote-1", "tcp://remote:7701")
store.compact(safe=False)
assert store.len() == 1
def test_compact_safe_after_sync():
store = _store()
store.add_node("n1", "entity", "Node")
store.register_peer("remote-1", "tcp://remote:7701")
store.record_sync("remote-1")
safe, reasons = store.verify_compaction_safe()
assert safe
assert reasons == []
store.compact()
assert store.len() == 1
def test_compact_unsafe_partial_sync():
store = _store()
store.add_node("n1", "entity", "Node")
store.register_peer("peer-a", "tcp://a:7701")
store.register_peer("peer-b", "tcp://b:7701")
store.record_sync("peer-a")
safe, reasons = store.verify_compaction_safe()
assert not safe
assert len(reasons) == 1
assert "peer-b" in reasons[0]