import json
import pytest
from silk import GraphStore
def _ontology_with_subtypes():
return {
"node_types": {
"entity": {
"subtypes": {
"server": {
"properties": {
"hostname": {"value_type": "string", "required": True},
"status": {"value_type": "string"},
}
},
"project": {
"properties": {
"name": {"value_type": "string", "required": True},
"slug": {"value_type": "string", "required": True},
}
},
}
},
"signal": {
"subtypes": {
"alert": {
"properties": {
"severity": {"value_type": "string", "required": True},
"message": {"value_type": "string"},
}
},
}
},
"rule": {
"properties": {
"name": {"value_type": "string", "required": True},
}
},
},
"edge_types": {
"OBSERVES": {
"source_types": ["signal"],
"target_types": ["entity"],
"properties": {},
},
"GUARDS": {
"source_types": ["rule"],
"target_types": ["entity"],
"properties": {},
},
},
}
def _make_store(ontology=None):
if ontology is None:
ontology = _ontology_with_subtypes()
return GraphStore("test", json.dumps(ontology))
def test_add_node_with_valid_subtype():
store = _make_store()
store.add_node("srv-1", "entity", "server-1", {"hostname": "web01"}, subtype="server")
node = store.get_node("srv-1")
assert node is not None
assert node["node_type"] == "entity"
assert node["subtype"] == "server"
assert node["properties"]["hostname"] == "web01"
def test_add_node_different_subtypes():
store = _make_store()
store.add_node("srv-1", "entity", "srv", {"hostname": "web01"}, subtype="server")
store.add_node("proj-1", "entity", "proj", {"name": "api", "slug": "api"}, subtype="project")
srv = store.get_node("srv-1")
proj = store.get_node("proj-1")
assert srv["subtype"] == "server"
assert proj["subtype"] == "project"
def test_get_node_returns_subtype():
store = _make_store()
store.add_node("srv-1", "entity", "srv", {"hostname": "web01"}, subtype="server")
node = store.get_node("srv-1")
assert "subtype" in node
assert node["subtype"] == "server"
def test_get_node_no_subtype_when_type_has_no_subtypes():
store = _make_store()
store.add_node("r1", "rule", "rule-1", {"name": "cpu-limit"})
node = store.get_node("r1")
assert node["subtype"] is None
def test_missing_subtype_when_required():
store = _make_store()
with pytest.raises(ValueError, match="subtype"):
store.add_node("srv-1", "entity", "srv", {"hostname": "web01"})
def test_unknown_subtype_accepted():
store = _make_store()
store.add_node("x", "entity", "x", {"hostname": "h"}, subtype="nonexistent")
node = store.get_node("x")
assert node is not None
assert node["subtype"] == "nonexistent"
def test_subtype_required_properties_enforced():
store = _make_store()
with pytest.raises(ValueError, match="hostname"):
store.add_node("srv-1", "entity", "srv", {}, subtype="server")
def test_subtype_required_properties_different_per_subtype():
store = _make_store()
with pytest.raises(ValueError, match="name"):
store.add_node("proj-1", "entity", "proj", {}, subtype="project")
with pytest.raises(ValueError, match="hostname"):
store.add_node("srv-1", "entity", "srv", {"name": "x"}, subtype="server")
def test_subtype_property_type_validation():
store = _make_store()
with pytest.raises(ValueError):
store.add_node("srv-1", "entity", "srv", {"hostname": 123}, subtype="server")
def test_subtype_unknown_property_accepted():
store = _make_store()
store.add_node("srv-1", "entity", "srv", {"hostname": "h", "bogus": "x"}, subtype="server")
node = store.get_node("srv-1")
assert node["properties"]["bogus"] == "x"
def test_type_without_subtypes_works_as_before():
store = _make_store()
store.add_node("r1", "rule", "rule-1", {"name": "cpu-limit"})
node = store.get_node("r1")
assert node["node_type"] == "rule"
assert node["properties"]["name"] == "cpu-limit"
def test_type_without_subtypes_accepts_subtype_arg():
store = _make_store()
store.add_node("r1", "rule", "rule-1", {"name": "x"}, subtype="guardrail")
node = store.get_node("r1")
assert node["subtype"] == "guardrail"
def test_type_without_subtypes_validates_properties():
store = _make_store()
with pytest.raises(ValueError, match="name"):
store.add_node("r1", "rule", "rule-1", {})
def test_edge_uses_top_level_type_not_subtype():
store = _make_store()
store.add_node("a1", "signal", "alert", {"severity": "high"}, subtype="alert")
store.add_node("srv-1", "entity", "srv", {"hostname": "web01"}, subtype="server")
store.add_node("proj-1", "entity", "proj", {"name": "api", "slug": "api"}, subtype="project")
store.add_edge("e1", "OBSERVES", "a1", "srv-1")
store.add_edge("e2", "OBSERVES", "a1", "proj-1")
def test_edge_rejects_wrong_top_level_type():
store = _make_store()
store.add_node("srv-1", "entity", "srv", {"hostname": "web01"}, subtype="server")
store.add_node("srv-2", "entity", "srv2", {"hostname": "web02"}, subtype="server")
with pytest.raises(ValueError, match="OBSERVES"):
store.add_edge("e1", "OBSERVES", "srv-1", "srv-2")
def test_query_by_type_returns_all_subtypes():
store = _make_store()
store.add_node("srv-1", "entity", "srv", {"hostname": "h1"}, subtype="server")
store.add_node("proj-1", "entity", "proj", {"name": "a", "slug": "a"}, subtype="project")
nodes = store.query_nodes_by_type("entity")
assert len(nodes) == 2
subtypes = {n["subtype"] for n in nodes}
assert subtypes == {"server", "project"}
def test_sync_preserves_subtype():
ontology = json.dumps(_ontology_with_subtypes())
store_a = GraphStore("a", ontology)
store_a.add_node("srv-1", "entity", "srv", {"hostname": "h1"}, subtype="server")
snap = store_a.snapshot()
store_b = GraphStore.from_snapshot("b", snap)
node = store_b.get_node("srv-1")
assert node is not None
assert node["subtype"] == "server"
def test_subscription_event_includes_subtype():
store = _make_store()
events = []
store.subscribe(lambda e: events.append(e))
store.add_node("srv-1", "entity", "srv", {"hostname": "h1"}, subtype="server")
assert len(events) == 1
assert events[0]["op"] == "add_node"
assert events[0]["node_type"] == "entity"
assert events[0]["subtype"] == "server"
def test_subscription_event_no_subtype_when_none():
store = _make_store()
events = []
store.subscribe(lambda e: events.append(e))
store.add_node("r1", "rule", "rule-1", {"name": "x"})
assert len(events) == 1
assert events[0].get("subtype") is None