import json
import pytest
from silk import GraphStore
ONTOLOGY = json.dumps(
{
"node_types": {
"entity": {
"description": "A thing",
"properties": {
"status": {"value_type": "string"},
"cpu": {"value_type": "float"},
},
},
"signal": {
"description": "An observation",
"properties": {
"severity": {"value_type": "string", "required": True},
},
},
},
"edge_types": {
"OBSERVES": {
"source_types": ["signal"],
"target_types": ["entity"],
"properties": {},
},
},
}
)
@pytest.fixture
def store():
return GraphStore("test-instance", ONTOLOGY)
def test_subscription_fires_on_add_node(store):
events = []
store.subscribe(lambda e: events.append(e))
store.add_node("s1", "entity", "Server 1", {"status": "alive"})
assert len(events) == 1
assert events[0]["op"] == "add_node"
assert events[0]["node_id"] == "s1"
def test_subscription_fires_on_update_property(store):
store.add_node("s1", "entity", "Server 1", {"status": "alive"})
events = []
store.subscribe(lambda e: events.append(e))
store.update_property("s1", "status", "dead")
assert len(events) == 1
assert events[0]["op"] == "update_property"
assert events[0]["entity_id"] == "s1"
assert events[0]["key"] == "status"
assert events[0]["value"] == "dead"
def test_subscription_fires_on_add_edge(store):
store.add_node("s1", "entity", "Server 1")
store.add_node("sig1", "signal", "Alert", {"severity": "high"})
events = []
store.subscribe(lambda e: events.append(e))
store.add_edge("e1", "OBSERVES", "sig1", "s1")
assert len(events) == 1
assert events[0]["op"] == "add_edge"
assert events[0]["edge_id"] == "e1"
assert events[0]["edge_type"] == "OBSERVES"
assert events[0]["source_id"] == "sig1"
assert events[0]["target_id"] == "s1"
def test_subscription_fires_on_remove_node(store):
store.add_node("s1", "entity", "Server 1")
events = []
store.subscribe(lambda e: events.append(e))
store.remove_node("s1")
assert len(events) == 1
assert events[0]["op"] == "remove_node"
assert events[0]["node_id"] == "s1"
def test_subscription_fires_on_remove_edge(store):
store.add_node("s1", "entity", "Server 1")
store.add_node("sig1", "signal", "Alert", {"severity": "high"})
store.add_edge("e1", "OBSERVES", "sig1", "s1")
events = []
store.subscribe(lambda e: events.append(e))
store.remove_edge("e1")
assert len(events) == 1
assert events[0]["op"] == "remove_edge"
assert events[0]["edge_id"] == "e1"
def test_event_fields_add_node(store):
events = []
store.subscribe(lambda e: events.append(e))
store.add_node("s1", "entity", "Server 1", {"status": "alive"})
e = events[0]
assert isinstance(e["hash"], str) and len(e["hash"]) == 64
assert e["op"] == "add_node"
assert e["node_id"] == "s1"
assert e["node_type"] == "entity"
assert e["author"] == "test-instance"
assert isinstance(e["physical_ms"], int) and e["physical_ms"] > 0
assert isinstance(e["logical"], int)
assert e["local"] is True
def test_event_fields_update_property(store):
store.add_node("s1", "entity", "Server 1", {"status": "alive"})
events = []
store.subscribe(lambda e: events.append(e))
store.update_property("s1", "cpu", 85.5)
e = events[0]
assert e["op"] == "update_property"
assert e["entity_id"] == "s1"
assert e["key"] == "cpu"
assert e["value"] == 85.5
assert e["local"] is True
def test_event_fields_add_edge(store):
store.add_node("s1", "entity", "Server 1")
store.add_node("sig1", "signal", "Alert", {"severity": "high"})
events = []
store.subscribe(lambda e: events.append(e))
store.add_edge("e1", "OBSERVES", "sig1", "s1")
e = events[0]
assert e["op"] == "add_edge"
assert e["edge_id"] == "e1"
assert e["edge_type"] == "OBSERVES"
assert e["source_id"] == "sig1"
assert e["target_id"] == "s1"
assert e["local"] is True
def test_event_local_true_for_local_write(store):
events = []
store.subscribe(lambda e: events.append(e))
store.add_node("s1", "entity", "Server 1")
assert events[0]["local"] is True
def test_event_local_false_for_merge(store):
store_b = GraphStore("inst-b", ONTOLOGY)
store_b.add_node("s1", "entity", "Server 1", {"status": "alive"})
events = []
store.subscribe(lambda e: events.append(e))
offer_a = store.generate_sync_offer()
payload_for_a = store_b.receive_sync_offer(offer_a)
store.merge_sync_payload(payload_for_a)
add_events = [e for e in events if e["op"] == "add_node"]
assert len(add_events) == 1
assert add_events[0]["local"] is False
assert add_events[0]["author"] == "inst-b"
def test_multiple_subscribers_all_fire(store):
events_a = []
events_b = []
store.subscribe(lambda e: events_a.append(e))
store.subscribe(lambda e: events_b.append(e))
store.add_node("s1", "entity", "Server 1")
assert len(events_a) == 1
assert len(events_b) == 1
assert events_a[0]["hash"] == events_b[0]["hash"]
def test_unsubscribe_stops_callbacks(store):
events = []
sub_id = store.subscribe(lambda e: events.append(e))
store.add_node("s1", "entity", "Server 1")
assert len(events) == 1
store.unsubscribe(sub_id)
store.add_node("s2", "entity", "Server 2")
assert len(events) == 1
def test_subscriber_error_does_not_block_write(store):
def bad_callback(event):
raise RuntimeError("subscriber bug")
store.subscribe(bad_callback)
store.add_node("s1", "entity", "Server 1")
node = store.get_node("s1")
assert node is not None
assert node["node_id"] == "s1"
def test_subscriber_receives_events_in_order(store):
events = []
store.subscribe(lambda e: events.append(e))
store.add_node("s1", "entity", "Server 1")
store.add_node("s2", "entity", "Server 2")
store.update_property("s1", "status", "dead")
assert len(events) == 3
assert events[0]["node_id"] == "s1"
assert events[1]["node_id"] == "s2"
assert events[2]["entity_id"] == "s1"