import random
import sys
import time
import pytest
from silk import GraphStore
sys.path.insert(0, ".")
from experiments.harness import Metric, check_metrics, print_table
# Minimal schema shared by every test: one node type ("entity") carrying a
# string name plus an integer sequence number, and one edge type (LINKS)
# connecting entities to entities.
ONTOLOGY = {
    "node_types": {
        "entity": {
            "properties": {
                "name": {"value_type": "string"},
                "seq": {"value_type": "int"},
            }
        }
    },
    "edge_types": {
        "LINKS": {
            "source_types": ["entity"],
            "target_types": ["entity"],
        }
    },
}
def _make_diverged_peers(n_shared: int, n_unique_each: int):
    """Build two peers with ``n_shared`` common nodes, then diverge them by
    adding ``n_unique_each`` distinct nodes to each side.

    Returns the (peer_a, peer_b) pair.
    """
    peer_a = GraphStore("peer-a", ONTOLOGY)
    for idx in range(n_shared):
        peer_a.add_node(f"shared-{idx}", "entity", f"S{idx}", {"name": f"s-{idx}", "seq": idx})
    # Clone the shared prefix, then let each replica grow independently.
    peer_b = GraphStore.from_snapshot("peer-b", peer_a.snapshot())
    for idx in range(n_unique_each):
        peer_a.add_node(f"a-{idx}", "entity", f"A{idx}", {"name": f"a-{idx}", "seq": n_shared + idx})
        peer_b.add_node(f"b-{idx}", "entity", f"B{idx}", {"name": f"b-{idx}", "seq": n_shared + idx})
    return peer_a, peer_b
def _sync_bidirectional(a, b):
    """Run one full offer/payload/merge exchange in each direction.

    Returns a pair: (entries merged into b, entries merged into a).
    """
    # Leg 1: b advertises its state, a answers with what b is missing.
    payload_for_b = a.receive_sync_offer(b.generate_sync_offer())
    merged_into_b = b.merge_sync_payload(payload_for_b)
    # Leg 2: the reverse direction.
    payload_for_a = b.receive_sync_offer(a.generate_sync_offer())
    merged_into_a = a.merge_sync_payload(payload_for_a)
    return merged_into_b, merged_into_a
def _assert_converged(a, b, label=""):
    """Assert both peers hold the same node ids and identical properties."""
    nodes_a = sorted(node["node_id"] for node in a.all_nodes())
    nodes_b = sorted(node["node_id"] for node in b.all_nodes())
    assert nodes_a == nodes_b, f"{label}: nodes diverged. A={len(nodes_a)}, B={len(nodes_b)}"
    # Node sets match; now compare the property map of every node.
    for nid in nodes_a:
        pa = a.get_node(nid)["properties"]
        pb = b.get_node(nid)["properties"]
        assert pa == pb, f"{label}: properties diverged for {nid}"
def test_message_loss_recovery():
    """A one-way exchange (simulating a lost return message) leaves A behind;
    a later full bidirectional round must still converge both peers."""
    a, b = _make_diverged_peers(50, 20)
    # Only the b-offer -> a-payload -> b-merge leg completes; the reverse
    # direction is "lost" on the wire.
    b.merge_sync_payload(a.receive_sync_offer(b.generate_sync_offer()))
    # A received nothing, so it must hold strictly fewer nodes than B.
    assert len(a.all_nodes()) < len(b.all_nodes())
    _sync_bidirectional(a, b)
    _assert_converged(a, b, "message loss recovery")
def test_duplicate_delivery_idempotent():
    """Merging the same payload twice must be a no-op the second time.

    Fix: the original captured the first merge count (``m1``) but never
    asserted on it, which left the ``m2 == 0`` check vacuous — if the first
    merge had inserted nothing, the duplicate check would pass trivially.
    """
    a, b = _make_diverged_peers(50, 20)
    offer_b = b.generate_sync_offer()
    payload = a.receive_sync_offer(offer_b)
    m1 = b.merge_sync_payload(payload)
    # b lacks a's 20 unique nodes, so the first merge must insert something.
    assert m1 > 0, "first merge should insert a's unique entries"
    nodes_after_first = sorted(n["node_id"] for n in b.all_nodes())
    m2 = b.merge_sync_payload(payload)
    nodes_after_second = sorted(n["node_id"] for n in b.all_nodes())
    assert m2 == 0, f"duplicate merge should insert 0 entries, got {m2}"
    assert nodes_after_first == nodes_after_second
def test_corrupted_payload_rejected():
    """Byte-flipped payloads must not corrupt the receiver: the merge either
    raises or rejects, the graph is unchanged, and integrity still holds.

    Fix: the original captured ``nodes_before``/``nodes_after`` but never
    compared them (and computed ``nodes_after`` only on the success path),
    dropping the no-change assertion that the parallel truncated-payload
    test performs.
    """
    a, b = _make_diverged_peers(50, 20)
    offer_b = b.generate_sync_offer()
    payload = a.receive_sync_offer(offer_b)
    # Flip three consecutive bytes in the middle of the payload.
    corrupted = bytearray(payload)
    mid = len(corrupted) // 2
    for off in range(3):
        corrupted[mid + off] ^= 0xFF
    corrupted = bytes(corrupted)
    nodes_before = sorted(n["node_id"] for n in b.all_nodes())
    try:
        b.merge_sync_payload(corrupted)
    except Exception:
        pass  # raising is an acceptable way to reject a corrupt payload
    nodes_after = sorted(n["node_id"] for n in b.all_nodes())
    assert nodes_before == nodes_after, "corrupted payload should not change graph"
    ok, errors = b.verify_integrity()
    assert ok, f"integrity check failed after corruption attempt: {errors}"
def test_truncated_payload_rejected():
    """A payload cut to half its length must be rejected without mutating
    the receiving graph or breaking its integrity."""
    a, b = _make_diverged_peers(50, 20)
    payload = a.receive_sync_offer(b.generate_sync_offer())
    truncated = payload[:len(payload) // 2]
    nodes_before = sorted(n["node_id"] for n in b.all_nodes())
    try:
        b.merge_sync_payload(truncated)
    except Exception:
        pass  # an exception is one valid way to reject the bad payload
    nodes_after = sorted(n["node_id"] for n in b.all_nodes())
    assert nodes_before == nodes_after, "truncated payload should not change graph"
    ok, errors = b.verify_integrity()
    assert ok, f"integrity failed after truncated payload: {errors}"
def test_convergence_under_random_loss():
    """With every merge dropped ~50% of the time, repeated rounds must still
    bring both peers to the full union of their node sets."""
    rng = random.Random(42)  # fixed seed keeps the run reproducible
    a, b = _make_diverged_peers(100, 50)
    # Union size: the 100 shared nodes are counted once, not twice.
    expected_total = len(a.all_nodes()) + len(b.all_nodes()) - 100
    for round_num in range(20):
        # a -> b leg, delivered only half the time.
        payload_ab = a.receive_sync_offer(b.generate_sync_offer())
        if rng.random() > 0.5:
            b.merge_sync_payload(payload_ab)
        # b -> a leg, equally lossy.
        payload_ba = b.receive_sync_offer(a.generate_sync_offer())
        if rng.random() > 0.5:
            a.merge_sync_payload(payload_ba)
        ids_a = {n["node_id"] for n in a.all_nodes()}
        ids_b = {n["node_id"] for n in b.all_nodes()}
        if ids_a == ids_b and len(ids_a) == expected_total:
            break  # converged early; stop syncing
    _assert_converged(a, b, f"random loss (converged in {round_num + 1} rounds)")
def test_three_peer_partition_heal():
    """Partition peer c away from {a, b}, let both sides make progress, then
    heal every pair and check all peers reach the full 31-node set."""
    a = GraphStore("peer-a", ONTOLOGY)
    b = GraphStore("peer-b", ONTOLOGY)
    c = GraphStore("peer-c", ONTOLOGY)
    a.add_node("root", "entity", "Root", {"name": "root", "seq": 0})
    # Seed all three peers with the shared root node.
    _sync_bidirectional(a, b)
    _sync_bidirectional(a, c)
    _assert_converged(a, b, "initial sync a-b")
    _assert_converged(a, c, "initial sync a-c")
    # Partition: a and b keep exchanging while c is isolated.
    for i in range(10):
        a.add_node(f"a-{i}", "entity", f"A{i}", {"name": f"a-{i}", "seq": i})
        b.add_node(f"b-{i}", "entity", f"B{i}", {"name": f"b-{i}", "seq": i})
    _sync_bidirectional(a, b)
    for i in range(10):
        c.add_node(f"c-{i}", "entity", f"C{i}", {"name": f"c-{i}", "seq": i})
    # Majority side: root + 10 from a + 10 from b. Isolated side: root + 10.
    assert len(a.all_nodes()) == 21
    assert len(c.all_nodes()) == 11
    # Heal the partition by syncing every pair.
    _sync_bidirectional(a, c)
    _sync_bidirectional(a, b)
    _sync_bidirectional(b, c)
    _assert_converged(a, b, "post-heal a-b")
    _assert_converged(a, c, "post-heal a-c")
    assert len(a.all_nodes()) == 31
def test_concurrent_property_conflict_resolution():
    """Both peers write the same property while disconnected; after syncing
    they must agree on a single winning value (either side's write)."""
    a = GraphStore("peer-a", ONTOLOGY)
    a.add_node("target", "entity", "Target", {"name": "original", "seq": 0})
    b = GraphStore.from_snapshot("peer-b", a.snapshot())
    a.update_property("target", "name", "from-a")
    # Pad b with unrelated writes before its conflicting update.
    for i in range(5):
        b.add_node(f"b-dummy-{i}", "entity", f"D{i}", {"name": f"d{i}", "seq": i})
    b.update_property("target", "name", "from-b")
    _sync_bidirectional(a, b)
    # Convergence already implies a and b hold the same winner.
    _assert_converged(a, b, "concurrent property conflict")
    val = a.get_node("target")["properties"]["name"]
    assert val in ("from-a", "from-b"), f"unexpected value: {val}"
def test_rapid_fire_sync():
    """Interleave 100 writes across both peers with a sync every 10 writes;
    the final state must hold all 100 nodes and pass integrity on both."""
    a = GraphStore("peer-a", ONTOLOGY)
    b = GraphStore("peer-b", ONTOLOGY)
    for i in range(100):
        # Even-numbered nodes are created on a, odd-numbered on b.
        target = a if i % 2 == 0 else b
        target.add_node(f"n-{i}", "entity", f"N{i}", {"name": f"n-{i}", "seq": i})
        if i % 10 == 9:
            _sync_bidirectional(a, b)
    _sync_bidirectional(a, b)
    _assert_converged(a, b, "rapid fire")
    assert len(a.all_nodes()) == 100
    ok_a, _ = a.verify_integrity()
    ok_b, _ = b.verify_integrity()
    assert ok_a and ok_b
if __name__ == "__main__":
    # Standalone runner: execute each scenario in order and tally results.
    tests = [
        ("F1: Message loss recovery", test_message_loss_recovery),
        ("F2: Duplicate delivery", test_duplicate_delivery_idempotent),
        ("F3: Corrupted payload", test_corrupted_payload_rejected),
        ("F4: Truncated payload", test_truncated_payload_rejected),
        ("F5: Convergence under 50% loss", test_convergence_under_random_loss),
        ("F6: Three-peer partition heal", test_three_peer_partition_heal),
        ("F7: Concurrent property conflict", test_concurrent_property_conflict_resolution),
        ("F8: Rapid fire sync", test_rapid_fire_sync),
    ]
    banner = "=" * 60
    passed = 0
    failed = 0
    for name, fn in tests:
        print(f"\n{banner}")
        print(f"EXP-06: {name}")
        print(f"{banner}")
        try:
            fn()
        except AssertionError as e:
            # A failed assertion counts as a test failure.
            print(f" RESULT: FAIL — {e}")
            failed += 1
        except Exception as e:
            # Anything else is an unexpected error, also counted as failed.
            print(f" RESULT: ERROR — {type(e).__name__}: {e}")
            failed += 1
        else:
            print(f" RESULT: PASS")
            passed += 1
    print(f"\n{banner}")
    print(f"EXP-06 Summary: {passed} passed, {failed} failed")
    print(f"{banner}")