import os
import re
import random
import pytest
from silk import GraphStore
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
def test_proof_coverage():
proof_path = os.path.join(ROOT, "PROOF.md")
with open(proof_path) as f:
proof_text = f.read()
invariants = set(re.findall(r"\bI-0[1-9]\b", proof_text))
theorems = set(re.findall(r"Theorem [1-9]", proof_text))
assert len(invariants) >= 6, f"Expected at least 6 invariants, found {invariants}"
assert len(theorems) >= 3, f"Expected at least 3 theorems, found {theorems}"
test_dir = os.path.join(ROOT, "pytests")
test_content = ""
for fname in os.listdir(test_dir):
if fname.startswith("test_") and fname.endswith(".py"):
with open(os.path.join(test_dir, fname)) as f:
test_content += f.read()
missing_invariants = []
for inv in sorted(invariants):
if inv not in test_content:
missing_invariants.append(inv)
missing_theorems = []
for thm in sorted(theorems):
if thm not in test_content:
missing_theorems.append(thm)
assert not missing_invariants, (
f"PROOF.md invariants without test coverage: {missing_invariants}. "
f"Add tests that reference these identifiers."
)
assert not missing_theorems, (
f"PROOF.md theorems without test coverage: {missing_theorems}. "
f"Add tests that reference these identifiers."
)
def test_serialization_determinism():
store = GraphStore("det-test", {
"node_types": {
"server": {
"properties": {
"name": {"value_type": "string", "required": True},
"port": {"value_type": "int"},
"tags": {"value_type": "list"},
}
}
},
"edge_types": {
"CONNECTS": {
"source_types": ["server"],
"target_types": ["server"],
}
}
})
store.add_node("s1", "server", "Alpha", {
"name": "alpha",
"port": 8080,
"tags": ["prod", "us-east"],
})
store.add_node("s2", "server", "Beta", {"name": "beta", "port": 443})
store.add_edge("e1", "CONNECTS", "s1", "s2")
store.update_property("s1", "port", 9090)
snap_bytes = store.snapshot()
for i in range(100):
snap_bytes_again = store.snapshot()
assert snap_bytes == snap_bytes_again, (
f"Serialization non-determinism detected on iteration {i}"
)
store2 = GraphStore.from_snapshot("det-test-2", snap_bytes)
assert sorted([n["node_id"] for n in store.all_nodes()]) == \
sorted([n["node_id"] for n in store2.all_nodes()])
def _random_store(seed, store_id, ontology):
rng = random.Random(seed)
store = GraphStore(store_id, ontology)
node_ids = []
num_nodes = rng.randint(3, 15)
for i in range(num_nodes):
nid = f"n-{store_id}-{i}"
props = {"name": f"node-{i}", "value": rng.randint(0, 100)}
store.add_node(nid, "entity", f"Label-{i}", props)
node_ids.append(nid)
if len(node_ids) >= 2:
num_edges = rng.randint(1, min(10, len(node_ids)))
for i in range(num_edges):
src = rng.choice(node_ids)
tgt = rng.choice(node_ids)
if src != tgt:
store.add_edge(f"e-{store_id}-{i}", "LINKS", src, tgt)
num_updates = rng.randint(0, 8)
for _ in range(num_updates):
nid = rng.choice(node_ids)
store.update_property(nid, "value", rng.randint(0, 1000))
if rng.random() < 0.3 and len(node_ids) > 2:
store.remove_node(rng.choice(node_ids))
return store
def test_sync_convergence_randomized():
ontology = {
"node_types": {
"entity": {
"properties": {
"name": {"value_type": "string"},
"value": {"value_type": "int"},
}
}
},
"edge_types": {
"LINKS": {
"source_types": ["entity"],
"target_types": ["entity"],
}
}
}
for scenario in range(20):
seed_a = scenario * 1000
seed_b = scenario * 1000 + 500
store_a = _random_store(seed_a, f"a-{scenario}", ontology)
store_b = _random_store(seed_b, f"b-{scenario}", ontology)
offer_b = store_b.generate_sync_offer()
payload_a = store_a.receive_sync_offer(offer_b)
store_b.merge_sync_payload(payload_a)
offer_a = store_a.generate_sync_offer()
payload_b = store_b.receive_sync_offer(offer_a)
store_a.merge_sync_payload(payload_b)
nodes_a = sorted([n["node_id"] for n in store_a.all_nodes()])
nodes_b = sorted([n["node_id"] for n in store_b.all_nodes()])
assert nodes_a == nodes_b, (
f"Scenario {scenario}: node sets diverged. "
f"A has {len(nodes_a)}, B has {len(nodes_b)}"
)
edges_a = sorted([e["edge_id"] for e in store_a.all_edges()])
edges_b = sorted([e["edge_id"] for e in store_b.all_edges()])
assert edges_a == edges_b, (
f"Scenario {scenario}: edge sets diverged."
)
for nid in nodes_a:
na = store_a.get_node(nid)
nb = store_b.get_node(nid)
assert na["properties"] == nb["properties"], (
f"Scenario {scenario}: properties diverged for {nid}"
)
VALIDATED_OPS = {"AddNode", "AddEdge", "UpdateProperty"}
SKIP_VALIDATION_OPS = {"RemoveNode", "RemoveEdge", "DefineOntology", "ExtendOntology", "Checkpoint", "DefineLens"}
ALL_KNOWN_OPS = VALIDATED_OPS | SKIP_VALIDATION_OPS
def test_all_graph_ops_have_validation_path():
entry_rs = os.path.join(ROOT, "src", "entry.rs")
with open(entry_rs) as f:
content = f.read()
variants = set(re.findall(r'#\[serde\(rename = "(\w+)"\)\]', content))
def to_pascal(s):
return "".join(word.capitalize() for word in s.split("_"))
actual_variants = {to_pascal(v) for v in variants}
unknown = actual_variants - ALL_KNOWN_OPS
assert not unknown, (
f"New GraphOp variants found without validation classification: {unknown}. "
f"Add them to VALIDATED_OPS or SKIP_VALIDATION_OPS in test_invariants.py."
)
removed = ALL_KNOWN_OPS - actual_variants
assert not removed, (
f"GraphOp variants in test list but not in entry.rs: {removed}. "
f"Remove them from the classification lists."
)
def test_validated_ops_actually_validate():
store = GraphStore("val-test", {
"node_types": {
"server": {
"properties": {
"name": {"value_type": "string", "required": True},
"port": {"value_type": "int"},
}
}
},
"edge_types": {
"CONNECTS": {
"source_types": ["server"],
"target_types": ["server"],
}
}
})
with pytest.raises((ValueError, Exception)):
store.add_node("x", "nonexistent_type", "X", {})
store.add_node("s1", "server", "S1", {"name": "s1"})
store.add_node("s2", "server", "S2", {"name": "s2"})
with pytest.raises((ValueError, Exception)):
store.add_edge("e1", "NONEXISTENT", "s1", "s2")
with pytest.raises((ValueError, Exception)):
store.update_property("s1", "port", "not_an_int")
def test_version_consistency():
cargo_path = os.path.join(ROOT, "Cargo.toml")
with open(cargo_path) as f:
cargo = f.read()
cargo_match = re.search(r'^version\s*=\s*"([^"]+)"', cargo, re.MULTILINE)
assert cargo_match, "No version found in Cargo.toml"
cargo_version = cargo_match.group(1)
pyproject_path = os.path.join(ROOT, "pyproject.toml")
with open(pyproject_path) as f:
pyproject = f.read()
py_match = re.search(r'^version\s*=\s*"([^"]+)"', pyproject, re.MULTILINE)
assert py_match, "No version found in pyproject.toml"
py_version = py_match.group(1)
changelog_path = os.path.join(ROOT, "CHANGELOG.md")
with open(changelog_path) as f:
changelog = f.read()
cl_match = re.search(r"^## \[([^\]]+)\]", changelog, re.MULTILINE)
cl_versions = re.findall(r"^## \[([^\]]+)\]", changelog, re.MULTILINE)
cl_versions = [v for v in cl_versions if v != "Unreleased"]
assert cl_versions, "No version entries in CHANGELOG.md"
changelog_version = cl_versions[0]
assert cargo_version == py_version, (
f"Version mismatch: Cargo.toml={cargo_version}, pyproject.toml={py_version}"
)
assert cargo_version == changelog_version, (
f"Version mismatch: Cargo.toml={cargo_version}, CHANGELOG.md={changelog_version}"
)
def test_verify_integrity():
store = GraphStore("int-test", {
"node_types": {
"entity": {
"properties": {"name": {"value_type": "string"}}
}
},
"edge_types": {
"LINKS": {
"source_types": ["entity"],
"target_types": ["entity"],
}
}
})
store.add_node("n1", "entity", "A", {"name": "alpha"})
store.add_node("n2", "entity", "B", {"name": "beta"})
store.add_edge("e1", "LINKS", "n1", "n2")
store.update_property("n1", "name", "alpha-updated")
ok, errors = store.verify_integrity()
assert ok, f"Integrity check failed on clean store: {errors}"
assert errors == []
def test_verify_integrity_after_sync():
ontology = {
"node_types": {
"entity": {
"properties": {"name": {"value_type": "string"}}
}
},
"edge_types": {}
}
store_a = GraphStore("a", ontology)
store_b = GraphStore("b", ontology)
store_a.add_node("n1", "entity", "A", {"name": "from-a"})
store_b.add_node("n2", "entity", "B", {"name": "from-b"})
offer = store_b.generate_sync_offer()
payload = store_a.receive_sync_offer(offer)
store_b.merge_sync_payload(payload)
offer = store_a.generate_sync_offer()
payload = store_b.receive_sync_offer(offer)
store_a.merge_sync_payload(payload)
ok_a, errors_a = store_a.verify_integrity()
ok_b, errors_b = store_b.verify_integrity()
assert ok_a, f"Store A integrity failed after sync: {errors_a}"
assert ok_b, f"Store B integrity failed after sync: {errors_b}"
def test_verify_integrity_after_compaction():
store = GraphStore("comp-test", {
"node_types": {
"entity": {
"properties": {"name": {"value_type": "string"}}
}
},
"edge_types": {}
})
for i in range(10):
store.add_node(f"n{i}", "entity", f"Node {i}", {"name": f"name-{i}"})
store.compact()
ok, errors = store.verify_integrity()
assert ok, f"Integrity failed after compaction: {errors}"