import sys
import pytest
from silk import GraphStore
sys.path.insert(0, ".")
from experiments.harness import Metric, check_metrics
# Schema handed to every GraphStore constructed in this file.
#
# It declares a single node type ("server") with three typed properties and a
# single edge type ("CONNECTS") that may link a server to a server.
# NOTE(review): no properties are declared for CONNECTS edges, although the
# edge test in this file sets "weight" and "label" on one — whether the store
# validates edge properties against this schema cannot be told from here.
ONTOLOGY = {
    "node_types": {
        "server": {
            "properties": {
                prop: {"value_type": value_type}
                for prop, value_type in (
                    ("status", "string"),
                    ("name", "string"),
                    ("port", "int"),
                )
            },
        },
    },
    "edge_types": {
        "CONNECTS": {
            "source_types": ["server"],
            "target_types": ["server"],
        },
    },
}
def _sync(sender, receiver):
    """Run one offer/payload/merge exchange in each direction between two stores.

    First ``receiver`` generates an offer, ``sender`` turns it into a payload
    and ``receiver`` merges that payload; then the same three steps are
    repeated with the roles swapped.

    Args:
        sender: Store that answers the first offer and merges second.
        receiver: Store that generates the first offer and merges first.
    """
    # NOTE(review): presumably an offer summarises what the offering store
    # already holds so the other side can reply with the difference —
    # confirm against the store's sync API.
    for source, target in ((sender, receiver), (receiver, sender)):
        target_offer = target.generate_sync_offer()
        delta = source.receive_sync_offer(target_offer)
        target.merge_sync_payload(delta)
def _get_props(store, node_id):
    """Return the ``"properties"`` entry of a node, or ``None`` if there is no node.

    Args:
        store: Object exposing ``get_node(node_id)``.
        node_id: Identifier passed through to ``store.get_node``.

    Returns:
        ``node["properties"]`` when ``get_node`` returns a truthy value,
        otherwise ``None`` (this includes any falsy lookup result, not only
        ``None``).
    """
    found = store.get_node(node_id)
    if not found:
        return None
    return found["properties"]
def test_compaction_per_property_clock():
    """Compare s1's properties after a compacted sync with an uncompacted reference.

    Peer A creates ``s1`` and syncs it to peer B. B then changes ``status``
    while A adds ten unrelated nodes and changes ``name``. A reference store
    that syncs with both peers (no compaction involved) records the expected
    outcome. A is then compacted, rebuilt from its snapshot, and synced with
    B; the rebuilt store must report the same ``status`` and ``name`` for
    ``s1`` as the reference.
    """
    a = GraphStore("peer-a", ONTOLOGY)
    a.add_node("s1", "server", "S1", {"status": "up", "name": "alpha"})
    b = GraphStore("peer-b", ONTOLOGY)
    _sync(a, b)

    # Diverge: B touches one property of s1, A touches a different one.
    b.update_property("s1", "status", "down")
    # NOTE(review): presumably the dummy nodes exist to advance A's clock
    # before its update to s1 — confirm against the experiment write-up.
    for i in range(10):
        a.add_node(
            f"dummy-{i}", "server", f"D{i}", {"status": "up", "name": f"d{i}"}
        )
    a.update_property("s1", "name", "beta")

    # Reference outcome: merge both peers into a store that never compacts.
    ref = GraphStore("ref", ONTOLOGY)
    _sync(a, ref)
    _sync(b, ref)
    ref_props = _get_props(ref, "s1")
    # Fail with a readable message instead of a TypeError on the subscripts
    # below if the reference store somehow lacks s1.
    assert ref_props is not None, "s1 should exist in the reference store"
    print(f" Reference (no compaction): status={ref_props['status']}, name={ref_props['name']}")

    # Compacted path: compact A, rebuild it from the snapshot, sync with B.
    a.compact()
    snap = a.snapshot()
    a_fresh = GraphStore.from_snapshot("peer-a-fresh", snap)
    _sync(b, a_fresh)
    props = _get_props(a_fresh, "s1")
    assert props is not None, "s1 should exist after sync"
    print(f" Compacted sync: status={props.get('status')}, name={props.get('name')}")

    check_metrics([
        Metric(
            name="compaction_status_matches_reference",
            measured=1 if props["status"] == ref_props["status"] else 0,
            threshold=1,
            op="==",
        ),
        Metric(
            name="compaction_name_matches_reference",
            measured=1 if props["name"] == ref_props["name"] else 0,
            threshold=1,
            op="==",
        ),
    ], label="EXP-02 per-property clock preservation")
def test_compaction_no_zombie_resurrection():
    """Observe whether a deleted node reappears after a compacted sync.

    Peer A creates ``s1``, syncs it to B, removes it and compacts. B, which
    still holds ``s1``, then updates its ``status``. A is rebuilt from its
    snapshot and synced with B, and the resulting state of ``s1`` is printed.

    Only the setup steps are asserted. The final outcome (zombie or not) is
    reported via ``print`` and never fails the test.
    """
    a = GraphStore("peer-a", ONTOLOGY)
    a.add_node("s1", "server", "S1", {"status": "up", "name": "ghost"})
    b = GraphStore("peer-b", ONTOLOGY)
    _sync(a, b)
    assert b.get_node("s1") is not None, "B should have s1"

    a.remove_node("s1")
    assert a.get_node("s1") is None, "A should not have s1 after delete"
    a.compact()

    # B has not seen the removal yet and keeps editing the node.
    b.update_property("s1", "status", "down")

    snap = a.snapshot()
    a_fresh = GraphStore.from_snapshot("peer-a-fresh", snap)
    _sync(b, a_fresh)
    node = a_fresh.get_node("s1")
    print(f" s1 after zombie test: {node}")
    # Plain string literals: these messages have no placeholders.
    if node:
        print(" -> ZOMBIE: s1 reappeared after compaction")
        print(" -> This is expected IF safety precondition was violated")
    else:
        print(" -> No zombie: s1 stayed dead")
def test_compaction_edge_property_clocks():
    """Check edge properties on a store rebuilt from a compacted snapshot.

    Peer A creates two servers joined by edge ``e1`` (``weight`` 1), adds an
    unrelated node, and raises ``weight`` to 99 before syncing to B. B then
    sets ``label`` on ``e1``. After A is compacted, rebuilt from its snapshot
    and synced with B, ``e1`` must carry ``weight == 99`` and
    ``label == "primary"``.
    """
    # NOTE(review): ONTOLOGY declares no properties for CONNECTS edges;
    # whether the store accepts undeclared edge properties is not visible
    # from this file.
    a = GraphStore("peer-a", ONTOLOGY)
    a.add_node("s1", "server", "S1", {"status": "up", "name": "s1"})
    a.add_node("s2", "server", "S2", {"status": "up", "name": "s2"})
    a.add_edge("e1", "CONNECTS", "s1", "s2", {"weight": 1})
    a.add_node("dummy", "server", "D", {"status": "up", "name": "d"})
    a.update_property("e1", "weight", 99)

    b = GraphStore("peer-b", ONTOLOGY)
    _sync(a, b)
    b.update_property("e1", "label", "primary")

    a.compact()
    a_fresh = GraphStore.from_snapshot("peer-a-fresh", a.snapshot())
    _sync(b, a_fresh)

    edge = a_fresh.get_edge("e1")
    assert edge is not None, "e1 should exist"
    edge_props = edge["properties"]
    print(f" edge props after compacted sync: {edge_props}")

    weight_ok = 1 if edge_props.get("weight") == 99 else 0
    label_ok = 1 if edge_props.get("label") == "primary" else 0
    metrics = [
        Metric(
            name="edge_weight_preserved",
            measured=weight_ok,
            threshold=1,
            op="==",
        ),
        Metric(
            name="edge_label_from_concurrent_peer",
            measured=label_ok,
            threshold=1,
            op="==",
        ),
    ]
    check_metrics(metrics, label="EXP-02 edge property clocks")
def test_double_compaction_preserves_state():
    """Check that compacting twice leaves existing node properties unchanged.

    A store with two servers, one edge and one property update is compacted
    and its node properties recorded. A third server is added, the store is
    compacted again, and the properties of ``s1`` and ``s2`` must equal the
    recorded ones while ``s3`` must be present. The edge ``e1`` is created
    but not inspected.
    """
    store = GraphStore("peer-a", ONTOLOGY)

    def properties_by_id():
        # Map every node currently in the store to its properties.
        return {
            entry["node_id"]: entry["properties"]
            for entry in store.all_nodes()
        }

    store.add_node("s1", "server", "S1", {"status": "up", "name": "alpha"})
    store.add_node("s2", "server", "S2", {"status": "down", "name": "beta"})
    store.add_edge("e1", "CONNECTS", "s1", "s2")
    store.update_property("s1", "status", "down")

    store.compact()
    state_after_first = properties_by_id()

    store.add_node("s3", "server", "S3", {"status": "up", "name": "gamma"})
    store.compact()
    state_after_second = properties_by_id()

    for node_id in ("s1", "s2"):
        assert state_after_second[node_id] == state_after_first[node_id]
    assert "s3" in state_after_second
def test_compaction_add_wins_semantics():
    """Check that B's edit to s1 survives A's removal plus compaction.

    After both peers share ``s1``, A removes it while B adds five unrelated
    nodes and sets ``s1``'s ``status`` to ``"revived"``. A is compacted,
    rebuilt from its snapshot and synced with B; the rebuilt store must
    contain ``s1`` with ``status == "revived"``.
    """
    # NOTE(review): the assertion message below talks about B's "add", but
    # the operation B actually performs on s1 is update_property.
    a = GraphStore("peer-a", ONTOLOGY)
    a.add_node("s1", "server", "S1", {"status": "up", "name": "test"})
    b = GraphStore("peer-b", ONTOLOGY)
    _sync(a, b)

    a.remove_node("s1")

    for idx in range(5):
        filler_props = {"status": "up", "name": f"b{idx}"}
        b.add_node(f"b-{idx}", "server", f"B{idx}", filler_props)
    b.update_property("s1", "status", "revived")

    a.compact()
    a_fresh = GraphStore.from_snapshot("peer-a-fresh", a.snapshot())
    _sync(b, a_fresh)

    node = a_fresh.get_node("s1")
    print(f" s1 after add-wins test: {node}")
    missing_message = (
        "s1 should exist — B's concurrent add should win over A's remove"
    )
    assert node is not None, missing_message
    assert node["properties"]["status"] == "revived"
if __name__ == "__main__":
    # Stand-alone runner: execute every experiment in order, print a
    # PASS/FAIL/ERROR line for each, then a summary. The process exits with
    # status 1 when at least one experiment did not pass, so shell scripts
    # and CI can detect failures.
    tests = [
        ("Per-property clock preservation", test_compaction_per_property_clock),
        ("Zombie resurrection", test_compaction_no_zombie_resurrection),
        ("Edge property clocks", test_compaction_edge_property_clocks),
        ("Double compaction", test_double_compaction_preserves_state),
        ("Add-wins after compaction", test_compaction_add_wins_semantics),
    ]
    banner = "=" * 60
    passed = 0
    failed = 0
    for name, fn in tests:
        print(f"\n{banner}")
        print(f"EXP-02: {name}")
        print(banner)
        try:
            fn()
        except AssertionError as e:
            # An assertion inside the experiment did not hold.
            print(f" RESULT: FAIL — {e}")
            failed += 1
        except Exception as e:
            # Top-level boundary: report any other error and keep going so
            # the remaining experiments still run.
            print(f" RESULT: ERROR — {type(e).__name__}: {e}")
            failed += 1
        else:
            print(" RESULT: PASS")
            passed += 1
    print(f"\n{banner}")
    print(f"EXP-02 Summary: {passed} passed, {failed} failed")
    print(banner)
    if failed:
        sys.exit(1)