import json
import pytest
from silk import GraphStore
# Shared schema for most tests below: two node types ("entity" with optional
# status/cpu properties, "signal" with a required severity) and two edge
# types ("RUNS_ON" entity->entity, "OBSERVES" signal->entity).
# Serialized with json.dumps because GraphStore takes the ontology as a
# JSON string.
ONTOLOGY = json.dumps(
    {
        "node_types": {
            "entity": {
                "description": "A managed thing",
                "properties": {
                    "status": {"value_type": "string"},
                    "cpu": {"value_type": "float"},
                },
            },
            "signal": {
                "description": "An observed fact",
                "properties": {
                    "severity": {"value_type": "string", "required": True},
                },
            },
        },
        "edge_types": {
            "RUNS_ON": {
                "source_types": ["entity"],
                "target_types": ["entity"],
                "properties": {},
            },
            "OBSERVES": {
                "source_types": ["signal"],
                "target_types": ["entity"],
                "properties": {},
            },
        },
    }
)
def make_store(instance_id: str) -> GraphStore:
    """Construct a fresh GraphStore for *instance_id* using the shared test ontology."""
    store = GraphStore(instance_id, ONTOLOGY)
    return store
class TestSyncProtocol:
    """Offer/payload/merge sync handshake between two GraphStore peers.

    Protocol shape used throughout: peer B generates an offer describing
    what it already has, peer A answers with a payload of entries B is
    missing, and B merges that payload.
    """

    def test_sync_offer_is_bytes(self):
        """A sync offer is a non-empty bytes blob."""
        store = make_store("inst-a")
        offer = store.generate_sync_offer()
        assert isinstance(offer, bytes)
        assert len(offer) > 0

    def test_receive_sync_offer_is_bytes(self):
        """Answering a peer's offer yields a bytes payload."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")
        offer_a = store_a.generate_sync_offer()
        payload = store_b.receive_sync_offer(offer_a)
        assert isinstance(payload, bytes)

    def test_sync_a_to_b(self):
        """Nodes created on A arrive on B after one offer/payload round."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")
        store_a.add_node("s1", "entity", "Server 1", {"status": "alive"})
        store_a.add_node("s2", "entity", "Server 2", {"status": "dead"})
        offer_b = store_b.generate_sync_offer()
        payload = store_a.receive_sync_offer(offer_b)
        merged = store_b.merge_sync_payload(payload)
        # FIX: the following two asserts were fused onto a single source
        # line ("assert merged >= 2 assert ..."), which is a SyntaxError.
        # They are now separate statements.
        assert merged >= 2
        assert store_b.get_node("s1") is not None
        assert store_b.get_node("s2") is not None
        assert store_b.get_node("s1")["properties"]["status"] == "alive"

    def test_sync_bidirectional_convergence(self):
        """After one round in each direction, both peers hold both nodes."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")
        store_a.add_node("a1", "entity", "Node from A")
        store_b.add_node("b1", "entity", "Node from B")
        offer_b = store_b.generate_sync_offer()
        payload_a_to_b = store_a.receive_sync_offer(offer_b)
        store_b.merge_sync_payload(payload_a_to_b)
        offer_a = store_a.generate_sync_offer()
        payload_b_to_a = store_b.receive_sync_offer(offer_a)
        store_a.merge_sync_payload(payload_b_to_a)
        assert store_a.get_node("a1") is not None
        assert store_a.get_node("b1") is not None
        assert store_b.get_node("a1") is not None
        assert store_b.get_node("b1") is not None

    def test_sync_is_idempotent(self):
        """Re-running an identical sync merges nothing new and changes no state."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")
        store_a.add_node("n1", "entity", "Node 1")
        offer_b = store_b.generate_sync_offer()
        payload = store_a.receive_sync_offer(offer_b)
        merged1 = store_b.merge_sync_payload(payload)
        assert merged1 >= 1
        len_after_first = store_b.len()
        offer_b2 = store_b.generate_sync_offer()
        payload2 = store_a.receive_sync_offer(offer_b2)
        merged2 = store_b.merge_sync_payload(payload2)
        assert merged2 == 0
        assert store_b.len() == len_after_first

    def test_sync_with_edges(self):
        """Edges sync along with their endpoint nodes."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")
        store_a.add_node("svc", "entity", "API Service")
        store_a.add_node("srv", "entity", "Server")
        store_a.add_edge("e1", "RUNS_ON", "svc", "srv")
        offer_b = store_b.generate_sync_offer()
        payload = store_a.receive_sync_offer(offer_b)
        store_b.merge_sync_payload(payload)
        assert store_b.get_node("svc") is not None
        assert store_b.get_node("srv") is not None
        assert store_b.get_edge("e1") is not None
        edges = store_b.all_edges()
        assert len(edges) == 1
        assert edges[0]["edge_type"] == "RUNS_ON"

    def test_sync_graph_queries_work_after_merge(self):
        """BFS and shortest-path work on graph data that arrived via merge."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")
        store_a.add_node("a", "entity", "A")
        store_a.add_node("b", "entity", "B")
        store_a.add_node("c", "entity", "C")
        store_a.add_edge("ab", "RUNS_ON", "a", "b")
        store_a.add_edge("bc", "RUNS_ON", "b", "c")
        offer_b = store_b.generate_sync_offer()
        payload = store_a.receive_sync_offer(offer_b)
        store_b.merge_sync_payload(payload)
        reachable = store_b.bfs("a")
        assert "b" in reachable
        assert "c" in reachable
        path = store_b.shortest_path("a", "c")
        assert path is not None
        assert path == ["a", "b", "c"]
class TestSnapshot:
    """Snapshot export and re-hydration via GraphStore.from_snapshot."""

    def test_snapshot_roundtrip(self):
        """Nodes, edges, and properties survive a snapshot round-trip."""
        source = make_store("inst-a")
        source.add_node("s1", "entity", "Server 1", {"status": "alive"})
        source.add_node("s2", "entity", "Server 2")
        source.add_edge("e1", "RUNS_ON", "s1", "s2")
        blob = source.snapshot()
        assert isinstance(blob, bytes)
        assert len(blob) > 0
        restored = GraphStore.from_snapshot("inst-b", blob)
        for node_id in ("s1", "s2"):
            assert restored.get_node(node_id) is not None
        assert restored.get_edge("e1") is not None
        assert restored.get_node("s1")["properties"]["status"] == "alive"

    def test_snapshot_then_delta_sync(self):
        """Changes made after a snapshot reach the clone via delta sync."""
        origin = make_store("inst-a")
        origin.add_node("s1", "entity", "Server 1")
        clone = GraphStore.from_snapshot("inst-b", origin.snapshot())
        origin.add_node("s2", "entity", "Server 2")
        offer = clone.generate_sync_offer()
        delta = origin.receive_sync_offer(offer)
        merged = clone.merge_sync_payload(delta)
        assert merged >= 1
        assert clone.get_node("s2") is not None

    def test_snapshot_preserves_ontology(self):
        """A restored store keeps the schema and still rejects unknown node types."""
        origin = make_store("inst-a")
        restored = GraphStore.from_snapshot("inst-b", origin.snapshot())
        assert restored.node_type_names() == origin.node_type_names()
        assert restored.edge_type_names() == origin.edge_type_names()
        with pytest.raises(ValueError):
            restored.add_node("x", "potato", "Bad type")

    def test_snapshot_graph_algorithms(self):
        """Graph queries work on a store rebuilt from a snapshot."""
        origin = make_store("inst-a")
        origin.add_node("a", "entity", "A")
        origin.add_node("b", "entity", "B")
        origin.add_edge("ab", "RUNS_ON", "a", "b")
        restored = GraphStore.from_snapshot("inst-b", origin.snapshot())
        assert restored.shortest_path("a", "b") == ["a", "b"]
        assert "b" in restored.bfs("a")
class TestSyncConflictResolution:
    """Conflict handling when both peers mutate the same state concurrently."""

    def test_lww_concurrent_property_update(self):
        """Concurrent writes to one property converge to a single value."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")
        store_a.add_node("s1", "entity", "Server 1")
        offer_b = store_b.generate_sync_offer()
        payload = store_a.receive_sync_offer(offer_b)
        store_b.merge_sync_payload(payload)
        # FIX: these two updates were fused onto a single source line
        # ("store_a.update_property(...) store_b.update_property(...)"),
        # which is a SyntaxError. They are now separate statements.
        store_a.update_property("s1", "status", "alive")
        store_b.update_property("s1", "status", "dead")
        offer_b2 = store_b.generate_sync_offer()
        payload_a = store_a.receive_sync_offer(offer_b2)
        store_b.merge_sync_payload(payload_a)
        offer_a = store_a.generate_sync_offer()
        payload_b = store_b.receive_sync_offer(offer_a)
        store_a.merge_sync_payload(payload_b)
        val_a = store_a.get_node("s1")["properties"]["status"]
        val_b = store_b.get_node("s1")["properties"]["status"]
        assert val_a == val_b

    def test_add_wins_after_sync(self):
        """A concurrent re-add beats a remove: the node survives on both peers."""
        store_a = make_store("inst-a")
        store_b = make_store("inst-b")
        store_a.add_node("s1", "entity", "Server 1")
        offer_b = store_b.generate_sync_offer()
        payload = store_a.receive_sync_offer(offer_b)
        store_b.merge_sync_payload(payload)
        store_a.remove_node("s1")
        store_b.add_node("s1", "entity", "Server 1 resurrected")
        offer_a = store_a.generate_sync_offer()
        payload_for_a = store_b.receive_sync_offer(offer_a)
        store_a.merge_sync_payload(payload_for_a)
        offer_b2 = store_b.generate_sync_offer()
        payload_for_b = store_a.receive_sync_offer(offer_b2)
        store_b.merge_sync_payload(payload_for_b)
        assert store_a.get_node("s1") is not None
        assert store_b.get_node("s1") is not None
class TestEdgeValidationOnSync:
    """Ontology validation applied to edges arriving through sync."""

    def test_invalid_edge_quarantined_on_sync(self):
        """An edge legal under the sender's schema but illegal under the
        receiver's is quarantined rather than made queryable."""
        loose = {
            "node_types": {
                "server": {"properties": {}},
                "app": {"properties": {}},
            },
            "edge_types": {
                "RUNS_ON": {
                    "source_types": ["server", "app"],
                    "target_types": ["server", "app"],
                },
            },
        }
        sender = GraphStore("peer-a", loose)
        sender.add_node("s1", "server", "Server")
        sender.add_node("a1", "app", "App")
        sender.add_edge("bad", "RUNS_ON", "s1", "a1")
        tight = {
            "node_types": {
                "server": {"properties": {}},
                "app": {"properties": {}},
            },
            "edge_types": {
                "RUNS_ON": {
                    "source_types": ["app"],
                    "target_types": ["server"],
                },
            },
        }
        receiver = GraphStore("peer-b", tight)
        delta = sender.receive_sync_offer(receiver.generate_sync_offer())
        receiver.merge_sync_payload(delta)
        assert receiver.get_edge("bad") is None, "invalid edge should not be queryable"
        assert len(receiver.get_quarantined()) > 0, "invalid edge should be quarantined"
        assert receiver.get_node("s1") is not None
        assert receiver.get_node("a1") is not None

    def test_valid_edge_survives_sync(self):
        """A schema-conforming edge syncs intact with nothing quarantined."""
        schema = {
            "node_types": {"server": {"properties": {}}, "app": {"properties": {}}},
            "edge_types": {
                "RUNS_ON": {
                    "source_types": ["app"],
                    "target_types": ["server"],
                },
            },
        }
        sender = GraphStore("peer-a", schema)
        sender.add_node("s1", "server", "Server")
        sender.add_node("a1", "app", "App")
        sender.add_edge("e1", "RUNS_ON", "a1", "s1")
        receiver = GraphStore("peer-b", schema)
        delta = sender.receive_sync_offer(receiver.generate_sync_offer())
        receiver.merge_sync_payload(delta)
        synced = receiver.get_edge("e1")
        assert synced is not None
        assert synced["source_id"] == "a1"
        assert synced["target_id"] == "s1"
        assert len(receiver.get_quarantined()) == 0

    def test_nodes_before_edges_in_topological_order(self):
        """A bulk merge applies nodes before edges, so no edge is orphaned."""
        schema = {
            "node_types": {"server": {"properties": {}}, "app": {"properties": {}}},
            "edge_types": {
                "RUNS_ON": {
                    "source_types": ["app"],
                    "target_types": ["server"],
                },
            },
        }
        sender = GraphStore("peer-a", schema)
        for i in range(50):
            sender.add_node(f"s-{i}", "server", f"Server {i}")
            sender.add_node(f"a-{i}", "app", f"App {i}")
        for i in range(50):
            sender.add_edge(f"e-{i}", "RUNS_ON", f"a-{i}", f"s-{i}")
        receiver = GraphStore("peer-b", schema)
        delta = sender.receive_sync_offer(receiver.generate_sync_offer())
        receiver.merge_sync_payload(delta)
        assert len(receiver.get_quarantined()) == 0
        for i in range(50):
            edge = receiver.get_edge(f"e-{i}")
            assert edge is not None, f"edge e-{i} missing after sync"
            assert edge["source_id"] == f"a-{i}"
            assert edge["target_id"] == f"s-{i}"
class TestHLCTieBreaking:
    """Deterministic tie-breaking for concurrent writes from different peers."""

    def test_lower_instance_id_wins_tie(self):
        """Two clones writing the same property concurrently still converge
        to one value after a full cross-sync (tie broken deterministically)."""
        schema = {
            "node_types": {"entity": {"properties": {"value": {"value_type": "string"}}}},
            "edge_types": {},
        }
        seed = GraphStore("base", schema)
        seed.add_node("n1", "entity", "Node", {"value": "original"})
        peer_a = GraphStore.from_snapshot("aaa-peer", seed.snapshot())
        peer_b = GraphStore.from_snapshot("zzz-peer", seed.snapshot())
        peer_a.update_property("n1", "value", "from-aaa")
        peer_b.update_property("n1", "value", "from-zzz")
        delta_to_b = peer_a.receive_sync_offer(peer_b.generate_sync_offer())
        peer_b.merge_sync_payload(delta_to_b)
        delta_to_a = peer_b.receive_sync_offer(peer_a.generate_sync_offer())
        peer_a.merge_sync_payload(delta_to_a)
        val_a = peer_a.get_node("n1")["properties"]["value"]
        val_b = peer_b.get_node("n1")["properties"]["value"]
        assert val_a == val_b, f"peers diverged: a={val_a}, b={val_b}"
class TestMultiSubtypeSync:
    """Sync correctness for an ontology that declares node subtypes.

    The ontology nests subtype property schemas ("instance", "capability",
    "k8s_cluster", "alert", "guardrail") under three node types, and the
    tests verify that nodes of every subtype — plus their subtype-specific
    properties — survive the offer/payload/merge round-trip.
    """

    # Ontology with subtype declarations under each node type; passed as a
    # JSON string (same convention as the module-level ONTOLOGY).
    RICH_ONTOLOGY = json.dumps({
        "node_types": {
            "entity": {
                "properties": {},
                "subtypes": {
                    "instance": {"properties": {
                        "host": {"value_type": "string"},
                        "priority": {"value_type": "int"},
                    }},
                    "capability": {"properties": {
                        "name": {"value_type": "string"},
                        "role": {"value_type": "string"},
                        "status": {"value_type": "string"},
                    }},
                    "k8s_cluster": {"properties": {
                        "name": {"value_type": "string"},
                        "server_url": {"value_type": "string"},
                    }},
                },
            },
            "signal": {
                "properties": {},
                "subtypes": {
                    "alert": {"properties": {
                        "severity": {"value_type": "string"},
                    }},
                },
            },
            "rule": {
                "properties": {},
                "subtypes": {
                    "guardrail": {"properties": {
                        "scope": {"value_type": "string"},
                        "check_type": {"value_type": "string"},
                    }},
                },
            },
        },
        "edge_types": {
            "RUNS_ON": {
                "source_types": ["entity"],
                "target_types": ["entity"],
                "properties": {},
            },
            "DEPENDS_ON": {
                "source_types": ["entity"],
                "target_types": ["entity"],
                "properties": {},
            },
        },
    })

    def _make(self, instance_id: str) -> GraphStore:
        """Build a store on the subtype-rich ontology."""
        return GraphStore(instance_id, self.RICH_ONTOLOGY)

    def test_all_subtypes_sync_in_one_round(self):
        """Nodes of every subtype and both edge types cross in a single
        bidirectional sync round."""
        a = self._make("leader")
        b = self._make("joiner")
        a.add_node("inst-a", "entity", "leader", {"host": "10.0.0.1", "priority": 100}, subtype="instance")
        a.add_node("cap-runtime", "entity", "K3s", {"name": "K3s", "role": "container_runtime", "status": "installed"}, subtype="capability")
        a.add_node("cap-gw", "entity", "nginx", {"name": "nginx", "role": "gateway", "status": "installed"}, subtype="capability")
        a.add_node("cluster-k3s", "entity", "k3s", {"name": "k3s", "server_url": "https://10.0.0.1:6443"}, subtype="k8s_cluster")
        a.add_node("guard-1", "rule", "self-model", {"scope": "update", "check_type": "pre_flight"}, subtype="guardrail")
        a.add_edge("cap-runtime-RUNS_ON-inst-a", "RUNS_ON", "cap-runtime", "inst-a")
        a.add_edge("cap-gw-DEPENDS_ON-cap-runtime", "DEPENDS_ON", "cap-gw", "cap-runtime")
        b.add_node("inst-b", "entity", "joiner", {"host": "10.0.0.2", "priority": 50}, subtype="instance")
        # A -> B, then B -> A.
        offer_b = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer_b)
        b.merge_sync_payload(payload)
        offer_a = a.generate_sync_offer()
        payload2 = b.receive_sync_offer(offer_a)
        a.merge_sync_payload(payload2)
        assert b.get_node("inst-a") is not None, "instance node not synced"
        assert b.get_node("cap-runtime") is not None, "capability node not synced"
        assert b.get_node("cap-gw") is not None, "capability node not synced"
        assert b.get_node("cluster-k3s") is not None, "k8s_cluster node not synced"
        assert b.get_node("guard-1") is not None, "guardrail node not synced"
        assert b.get_edge("cap-runtime-RUNS_ON-inst-a") is not None, "RUNS_ON edge not synced"
        assert b.get_edge("cap-gw-DEPENDS_ON-cap-runtime") is not None, "DEPENDS_ON edge not synced"
        assert a.get_node("inst-b") is not None, "reverse sync failed"

    def test_subtype_properties_preserved_after_sync(self):
        """The subtype tag and subtype-specific property values survive sync."""
        a = self._make("alpha")
        b = self._make("beta")
        a.add_node("cap-1", "entity", "Docker", {"name": "Docker", "role": "container_runtime", "status": "installed"}, subtype="capability")
        a.add_node("guard-2", "rule", "disk check", {"scope": "deploy", "check_type": "post_flight"}, subtype="guardrail")
        offer_b = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer_b)
        b.merge_sync_payload(payload)
        cap = b.get_node("cap-1")
        assert cap is not None
        assert cap["subtype"] == "capability"
        assert cap["properties"]["name"] == "Docker"
        assert cap["properties"]["role"] == "container_runtime"
        assert cap["properties"]["status"] == "installed"
        guard = b.get_node("guard-2")
        assert guard is not None
        assert guard["subtype"] == "guardrail"
        assert guard["properties"]["scope"] == "deploy"

    def test_many_nodes_all_subtypes_converge(self):
        """50 nodes across all five subtypes all arrive in one sync round."""
        a = self._make("source")
        b = self._make("dest")
        for i in range(10):
            a.add_node(f"inst-{i}", "entity", f"inst-{i}", {"host": f"10.0.0.{i}", "priority": i}, subtype="instance")
            a.add_node(f"cap-{i}", "entity", f"cap-{i}", {"name": f"cap-{i}", "role": "test", "status": "installed"}, subtype="capability")
            a.add_node(f"cluster-{i}", "entity", f"cluster-{i}", {"name": f"c-{i}", "server_url": f"https://10.0.0.{i}:6443"}, subtype="k8s_cluster")
            a.add_node(f"alert-{i}", "signal", f"alert-{i}", {"severity": "warning"}, subtype="alert")
            a.add_node(f"guard-{i}", "rule", f"guard-{i}", {"scope": "update", "check_type": "pre_flight"}, subtype="guardrail")
        offer_b = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer_b)
        merged = b.merge_sync_payload(payload)
        assert merged > 0
        for i in range(10):
            assert b.get_node(f"inst-{i}") is not None, f"inst-{i} missing"
            assert b.get_node(f"cap-{i}") is not None, f"cap-{i} missing"
            assert b.get_node(f"cluster-{i}") is not None, f"cluster-{i} missing"
            assert b.get_node(f"alert-{i}") is not None, f"alert-{i} missing"
            assert b.get_node(f"guard-{i}") is not None, f"guard-{i} missing"
class TestGenesisDivergence:
    """Behaviour of stores created independently (no shared genesis entry)."""

    SIMPLE_ONT = json.dumps({
        "node_types": {
            "entity": {"properties": {"status": {"value_type": "string"}}},
        },
        "edge_types": {},
    })

    def test_different_instance_ids_produce_different_genesis(self):
        """Each fresh store starts with exactly one log entry, and the
        genesis hash differs between instance ids."""
        a = GraphStore("inst-a", self.SIMPLE_ONT)
        b = GraphStore("inst-b", self.SIMPLE_ONT)
        entries_a = a.entries_since(None)
        entries_b = b.entries_since(None)
        assert len(entries_a) == 1
        assert len(entries_b) == 1
        hash_a = entries_a[0]["hash"]
        hash_b = entries_b[0]["hash"]
        # FIX: the original compared the hashes and then did nothing in
        # either branch (if/else with two `pass` bodies), so the test
        # asserted nothing about divergence. Assert it explicitly, matching
        # the behaviour the test name promises.
        assert hash_a != hash_b, "genesis hash should incorporate the instance id"

    def test_sync_between_independent_stores_transfers_data(self):
        """Stores with unrelated genesis entries can still exchange nodes."""
        a = GraphStore("inst-a", self.SIMPLE_ONT)
        b = GraphStore("inst-b", self.SIMPLE_ONT)
        a.add_node("node-a", "entity", "from A", {"status": "active"})
        b.add_node("node-b", "entity", "from B", {"status": "active"})
        offer_b = b.generate_sync_offer()
        payload_a_to_b = a.receive_sync_offer(offer_b)
        b.merge_sync_payload(payload_a_to_b)
        offer_a = a.generate_sync_offer()
        payload_b_to_a = b.receive_sync_offer(offer_a)
        a.merge_sync_payload(payload_b_to_a)
        assert b.get_node("node-a") is not None, "A's node not synced to B"
        assert a.get_node("node-b") is not None, "B's node not synced to A"

    def test_sync_independent_stores_multiple_nodes(self):
        """Subtyped nodes and an edge sync both ways between independent stores."""
        ont = json.dumps({
            "node_types": {
                "entity": {"properties": {}, "subtypes": {
                    "server": {"properties": {"host": {"value_type": "string"}}},
                    "service": {"properties": {"name": {"value_type": "string"}}},
                }},
                "rule": {"properties": {}, "subtypes": {
                    "guardrail": {"properties": {"scope": {"value_type": "string"}}},
                }},
            },
            "edge_types": {
                "RUNS_ON": {"source_types": ["entity"], "target_types": ["entity"], "properties": {}},
            },
        })
        a = GraphStore("gamma", ont)
        b = GraphStore("delta", ont)
        a.add_node("srv-1", "entity", "server-1", {"host": "10.0.0.1"}, subtype="server")
        a.add_node("svc-api", "entity", "api", {"name": "api"}, subtype="service")
        a.add_node("guard-1", "rule", "disk-check", {"scope": "deploy"}, subtype="guardrail")
        a.add_edge("svc-api-RUNS_ON-srv-1", "RUNS_ON", "svc-api", "srv-1")
        b.add_node("srv-2", "entity", "server-2", {"host": "10.0.0.2"}, subtype="server")
        offer_b = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer_b)
        b.merge_sync_payload(payload)
        offer_a = a.generate_sync_offer()
        payload2 = b.receive_sync_offer(offer_a)
        a.merge_sync_payload(payload2)
        assert b.get_node("srv-1") is not None, "server not synced"
        assert b.get_node("svc-api") is not None, "service not synced"
        assert b.get_node("guard-1") is not None, "guardrail not synced"
        assert b.get_edge("svc-api-RUNS_ON-srv-1") is not None, "edge not synced"
        assert a.get_node("srv-2") is not None, "reverse sync failed"
class TestOutgoingEdgesAfterSync:
    """Regression tests: outgoing/incoming edge indexes must stay correct
    after sync merges, edge tombstone resurrection, and on-disk reloads.

    The `path=` constructor argument and the del/reopen sequence suggest a
    persistent (redb-backed) store mode — TODO confirm against GraphStore.
    """

    # Fleet-style ontology with instance/server/capability subtypes; passed
    # as a JSON string.
    FLEET_ONTOLOGY = json.dumps({
        "node_types": {
            "entity": {
                "properties": {},
                "subtypes": {
                    "instance": {"properties": {
                        "host": {"value_type": "string"},
                        "priority": {"value_type": "int"},
                        "status": {"value_type": "string"},
                    }},
                    "server": {"properties": {
                        "name": {"value_type": "string"},
                        "ip_v4": {"value_type": "string"},
                    }},
                    "capability": {"properties": {
                        "name": {"value_type": "string"},
                        "role": {"value_type": "string"},
                        "status": {"value_type": "string"},
                    }},
                },
            },
        },
        "edge_types": {
            "RUNS_ON": {
                "source_types": ["entity"],
                "target_types": ["entity"],
                "properties": {},
            },
            "MEMBER_OF": {
                "source_types": ["entity"],
                "target_types": ["entity"],
                "properties": {},
            },
        },
    })

    def _make(self, instance_id: str) -> GraphStore:
        """Build an in-memory store on the fleet ontology."""
        return GraphStore(instance_id, self.FLEET_ONTOLOGY)

    def test_outgoing_edges_after_sync_production_scenario(self):
        """Edges merged in from a peer must appear in outgoing_edges on both
        sides after a bidirectional sync (mirrors a real fleet scenario)."""
        gamma = self._make("gamma")
        delta = self._make("delta")
        gamma.add_node("inst-gamma", "entity", "gamma",
                       {"host": "5.78.44.251", "priority": 100, "status": "active"},
                       subtype="instance")
        gamma.add_node("server-gamma", "entity", "gamma-srv",
                       {"name": "gamma", "ip_v4": "5.78.44.251"},
                       subtype="server")
        gamma.add_edge("inst-gamma-RUNS_ON-server-gamma", "RUNS_ON",
                       "inst-gamma", "server-gamma")
        delta.add_node("inst-delta", "entity", "delta",
                       {"host": "5.78.81.60", "priority": 50, "status": "active"},
                       subtype="instance")
        delta.add_node("server-delta", "entity", "delta-srv",
                       {"name": "delta", "ip_v4": "5.78.81.60"},
                       subtype="server")
        delta.add_edge("inst-delta-RUNS_ON-server-delta", "RUNS_ON",
                       "inst-delta", "server-delta")
        # gamma -> delta, then delta -> gamma.
        offer_d = delta.generate_sync_offer()
        payload = gamma.receive_sync_offer(offer_d)
        delta.merge_sync_payload(payload)
        offer_g = gamma.generate_sync_offer()
        payload2 = delta.receive_sync_offer(offer_g)
        gamma.merge_sync_payload(payload2)
        delta_edges = delta.outgoing_edges("inst-gamma")
        assert len(delta_edges) > 0, (
            "outgoing_edges('inst-gamma') returned [] on delta after sync. "
            "all_edges shows: " + str([(e["edge_id"], e["source_id"], e["target_id"])
                                       for e in delta.all_edges()])
        )
        assert delta_edges[0]["edge_type"] == "RUNS_ON"
        assert delta_edges[0]["target_id"] == "server-gamma"
        gamma_edges = gamma.outgoing_edges("inst-delta")
        assert len(gamma_edges) > 0, "outgoing_edges('inst-delta') returned [] on gamma"
        assert gamma_edges[0]["target_id"] == "server-delta"

    def test_outgoing_edges_after_sync_with_capabilities(self):
        """Per-source outgoing counts and per-target incoming counts are
        correct after merging several edges that share one target."""
        gamma = self._make("gamma")
        delta = self._make("delta")
        gamma.add_node("inst-gamma", "entity", "gamma",
                       {"host": "10.0.0.1", "priority": 100, "status": "active"},
                       subtype="instance")
        gamma.add_node("server-gamma", "entity", "srv",
                       {"name": "gamma", "ip_v4": "10.0.0.1"},
                       subtype="server")
        gamma.add_node("cap-k3s", "entity", "K3s",
                       {"name": "K3s", "role": "container_runtime", "status": "installed"},
                       subtype="capability")
        gamma.add_node("cap-nginx", "entity", "nginx",
                       {"name": "nginx", "role": "gateway", "status": "installed"},
                       subtype="capability")
        # Three RUNS_ON edges all targeting server-gamma.
        gamma.add_edge("inst-gamma-RUNS_ON-server-gamma", "RUNS_ON",
                       "inst-gamma", "server-gamma")
        gamma.add_edge("cap-k3s-RUNS_ON-server-gamma", "RUNS_ON",
                       "cap-k3s", "server-gamma")
        gamma.add_edge("cap-nginx-RUNS_ON-server-gamma", "RUNS_ON",
                       "cap-nginx", "server-gamma")
        delta.add_node("inst-delta", "entity", "delta",
                       {"host": "10.0.0.2", "priority": 50, "status": "active"},
                       subtype="instance")
        offer_d = delta.generate_sync_offer()
        payload = gamma.receive_sync_offer(offer_d)
        delta.merge_sync_payload(payload)
        inst_edges = delta.outgoing_edges("inst-gamma")
        assert len(inst_edges) == 1, f"expected 1 RUNS_ON from inst-gamma, got {len(inst_edges)}"
        k3s_edges = delta.outgoing_edges("cap-k3s")
        assert len(k3s_edges) == 1, f"expected 1 RUNS_ON from cap-k3s, got {len(k3s_edges)}"
        nginx_edges = delta.outgoing_edges("cap-nginx")
        assert len(nginx_edges) == 1, f"expected 1 RUNS_ON from cap-nginx, got {len(nginx_edges)}"
        incoming = delta.incoming_edges("server-gamma")
        assert len(incoming) == 3, f"expected 3 incoming on server-gamma, got {len(incoming)}"

    def test_outgoing_edges_survives_tombstone_resurrection(self):
        """Removing and re-adding the same edge id, then syncing, yields
        exactly one live edge on the receiving peer."""
        a = self._make("store-a")
        b = self._make("store-b")
        a.add_node("n1", "entity", "n1",
                   {"host": "x", "priority": 1, "status": "active"}, subtype="instance")
        a.add_node("n2", "entity", "n2",
                   {"name": "s", "ip_v4": "x"}, subtype="server")
        a.add_edge("e1", "RUNS_ON", "n1", "n2")
        a.remove_edge("e1")
        a.add_edge("e1", "RUNS_ON", "n1", "n2")
        offer_b = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer_b)
        b.merge_sync_payload(payload)
        edges = b.outgoing_edges("n1")
        assert len(edges) == 1, f"expected 1 edge after resurrection sync, got {len(edges)}"
        assert edges[0]["edge_id"] == "e1"

    def test_outgoing_edges_after_persistent_reload(self, tmp_path):
        """Edge indexes survive closing and reopening a path-backed store."""
        path_a = str(tmp_path / "a.redb")
        a = GraphStore("inst-a", self.FLEET_ONTOLOGY, path=path_a)
        a.add_node("n1", "entity", "n1",
                   {"host": "x", "priority": 1, "status": "active"}, subtype="instance")
        a.add_node("n2", "entity", "n2",
                   {"name": "s", "ip_v4": "x"}, subtype="server")
        a.add_edge("e1", "RUNS_ON", "n1", "n2")
        assert len(a.outgoing_edges("n1")) == 1
        # Drop the handle before reopening the same file.
        del a
        a2 = GraphStore("inst-a", self.FLEET_ONTOLOGY, path=path_a)
        edges = a2.outgoing_edges("n1")
        assert len(edges) == 1, f"outgoing_edges empty after redb reload, got {len(edges)}"
        assert edges[0]["edge_id"] == "e1"

    def test_outgoing_edges_after_sync_then_reload(self, tmp_path):
        """Edges merged in via sync remain indexed after a close/reopen of
        the receiving path-backed store."""
        path_b = str(tmp_path / "b.redb")
        a = self._make("gamma")
        a.add_node("inst-gamma", "entity", "gamma",
                   {"host": "10.0.0.1", "priority": 100, "status": "active"},
                   subtype="instance")
        a.add_node("server-gamma", "entity", "srv",
                   {"name": "gamma", "ip_v4": "10.0.0.1"},
                   subtype="server")
        a.add_edge("inst-gamma-RUNS_ON-server-gamma", "RUNS_ON",
                   "inst-gamma", "server-gamma")
        b = GraphStore("delta", self.FLEET_ONTOLOGY, path=path_b)
        b.add_node("inst-delta", "entity", "delta",
                   {"host": "10.0.0.2", "priority": 50, "status": "active"},
                   subtype="instance")
        offer_b = b.generate_sync_offer()
        payload = a.receive_sync_offer(offer_b)
        b.merge_sync_payload(payload)
        assert len(b.outgoing_edges("inst-gamma")) == 1, "outgoing_edges failed before reload"
        # Drop the handle before reopening the same file.
        del b
        b2 = GraphStore("delta", self.FLEET_ONTOLOGY, path=path_b)
        edges = b2.outgoing_edges("inst-gamma")
        assert len(edges) == 1, (
            f"outgoing_edges('inst-gamma') empty after sync + redb reload. "
            f"all_edges: {[(e['edge_id'], e['source_id'], e['target_id']) for e in b2.all_edges()]}"
        )
        assert edges[0]["edge_type"] == "RUNS_ON"
        assert edges[0]["target_id"] == "server-gamma"