import json
import os
import tempfile
import pytest
from silk import GraphStore
# Ontology shared by every store in this module, serialized to a JSON string
# because that is the form passed to the GraphStore constructor below.
# It declares a single node type ("entity") with no properties and no edge
# types, which is all these tests use.
ONTOLOGY = json.dumps(
    {
        "node_types": {"entity": {"description": "A thing", "properties": {}}},
        "edge_types": {},
    }
)
class TestPersistentMerge:
    """Check that data synced into a file-backed GraphStore outlives its handle.

    Every test follows the same shape: build a source store without a
    ``path``, pull its entries into a second store created with
    ``path=...``, drop that second store, reopen the same file with
    ``GraphStore.open`` and verify what is visible through the new handle.
    """

    @staticmethod
    def _pull(source, target):
        """Run one sync round that brings *source*'s entries into *target*.

        Args:
            source: Store that answers the sync offer.
            target: Store that issues the offer and merges the reply.

        Returns:
            Whatever ``target.merge_sync_payload`` returns; the tests treat
            it as a count of merged entries.
        """
        offer = target.generate_sync_offer()
        payload = source.receive_sync_offer(offer)
        return target.merge_sync_payload(payload)

    def test_merged_entries_survive_reopen(self):
        """Merged nodes and the total entry count are unchanged after reopen."""
        with tempfile.TemporaryDirectory() as workdir:
            db_path = os.path.join(workdir, "peer_b.silk")

            source = GraphStore("inst-a", ONTOLOGY)
            for node_id, label in (("n1", "Node 1"), ("n2", "Node 2")):
                source.add_node(node_id, "entity", label)

            store_b = GraphStore("inst-b", ONTOLOGY, path=db_path)
            # A brand-new store already reports one entry before any sync
            # (presumably a genesis/ontology entry -- confirm against silk).
            assert store_b.len() == 1

            merged = self._pull(source, store_b)
            assert merged >= 2
            for node_id in ("n1", "n2"):
                assert store_b.get_node(node_id) is not None
            len_before = store_b.len()

            # Drop the only reference before reopening the same file; this
            # presumably releases the underlying storage handle.
            del store_b

            store_b2 = GraphStore.open(db_path)
            assert store_b2.len() == len_before, f"reopen lost entries: {store_b2.len()} vs {len_before}"
            assert store_b2.get_node("n1") is not None, "n1 lost after reopen"
            assert store_b2.get_node("n2") is not None, "n2 lost after reopen"

    def test_merged_entries_heads_survive_reopen(self):
        """The set of heads after a merge is the same once the file is reopened."""
        with tempfile.TemporaryDirectory() as workdir:
            db_path = os.path.join(workdir, "peer_b.silk")

            source = GraphStore("inst-a", ONTOLOGY)
            source.add_node("n1", "entity", "Node 1")

            store_b = GraphStore("inst-b", ONTOLOGY, path=db_path)
            self._pull(source, store_b)
            heads_before = store_b.heads()

            # Release the writer before opening a second handle on the file.
            del store_b

            store_b2 = GraphStore.open(db_path)
            # Compared as sets, so the order heads are returned in is ignored.
            assert set(store_b2.heads()) == set(heads_before), "heads changed after reopen"

    def test_merged_graph_queries_survive_reopen(self):
        """``all_nodes`` on the reopened store lists exactly the merged nodes."""
        with tempfile.TemporaryDirectory() as workdir:
            db_path = os.path.join(workdir, "peer_b.silk")

            source = GraphStore("inst-a", ONTOLOGY)
            for node_id, label in (("a", "A"), ("b", "B")):
                source.add_node(node_id, "entity", label)

            store_b = GraphStore("inst-b", ONTOLOGY, path=db_path)
            self._pull(source, store_b)

            # Release the writer before opening a second handle on the file.
            del store_b

            store_b2 = GraphStore.open(db_path)
            node_ids = {node["node_id"] for node in store_b2.all_nodes()}
            assert node_ids == {"a", "b"}, f"got {node_ids}"

    def test_snapshot_into_persistent_store_survives_reopen(self):
        """Nodes pulled into a fresh persistent peer are still readable after reopen."""
        with tempfile.TemporaryDirectory() as workdir:
            db_path = os.path.join(workdir, "new_peer.silk")

            source = GraphStore("inst-a", ONTOLOGY)
            for node_id, label in (("x", "X"), ("y", "Y")):
                source.add_node(node_id, "entity", label)

            store_new = GraphStore("inst-new", ONTOLOGY, path=db_path)
            self._pull(source, store_new)
            for node_id in ("x", "y"):
                assert store_new.get_node(node_id) is not None

            # Release the writer before opening a second handle on the file.
            del store_new

            store_new2 = GraphStore.open(db_path)
            assert store_new2.get_node("x") is not None, "x lost"
            assert store_new2.get_node("y") is not None, "y lost"