import json
import threading
import time
import pytest
from silk import GraphStore
# Minimal ontology shared by every test in this module: a single "entity"
# node type carrying one string-valued "name" property, and no edge types.
# It is serialized once to JSON text and handed to GraphStore by make_store().
ONTOLOGY: str = json.dumps(
    {
        "node_types": {
            "entity": {
                "properties": {
                    "name": {"value_type": "string"},
                },
            },
        },
        "edge_types": {},
    }
)
def make_store(instance_id: str = "test") -> GraphStore:
    """Build a fresh ``GraphStore`` configured with the module ``ONTOLOGY``.

    Args:
        instance_id: Identifier passed as the first ``GraphStore`` argument;
            defaults to ``"test"``.

    Returns:
        The newly constructed store.
    """
    store = GraphStore(instance_id, ONTOLOGY)
    return store
class TestTailBasics:
    """Basic ``subscribe_from`` / ``next_batch`` behaviour on a single store."""

    def test_subscribe_from_empty_returns_all(self):
        """Subscribing from an empty cursor yields every entry in one batch."""
        graph = make_store()
        graph.add_node("n1", "entity", "N1", {"name": "one"})
        graph.add_node("n2", "entity", "N2", {"name": "two"})

        tail = graph.subscribe_from([])
        batch = tail.next_batch(timeout_ms=100, max_count=100)

        # Two nodes were added, yet three entries are expected; presumably the
        # extra one is an initial entry created with the store -- TODO confirm.
        assert len(batch) == 3
        tail.close()

    def test_subscribe_from_current_heads_returns_empty(self):
        """Subscribing at the current heads yields nothing new."""
        graph = make_store()
        graph.add_node("n1", "entity", "N1", {"name": "one"})

        tail = graph.subscribe_from(graph.heads())
        batch = tail.next_batch(timeout_ms=50, max_count=100)

        assert batch == []
        tail.close()

    def test_subscribe_from_partial_cursor_returns_delta(self):
        """Only entries appended after the captured cursor are returned."""
        graph = make_store()
        graph.add_node("n1", "entity", "N1", {"name": "one"})
        heads_after_n1 = graph.heads()
        graph.add_node("n2", "entity", "N2", {"name": "two"})
        graph.add_node("n3", "entity", "N3", {"name": "three"})

        tail = graph.subscribe_from(heads_after_n1)
        batch = tail.next_batch(timeout_ms=100, max_count=100)

        # n2 and n3 were added after the cursor was taken.
        assert len(batch) == 2
        tail.close()

    def test_next_batch_advances_cursor(self):
        """After draining a batch the subscription cursor equals the heads."""
        graph = make_store()
        graph.add_node("n1", "entity", "N1", {"name": "one"})

        tail = graph.subscribe_from([])
        # Before any batch is read the cursor is the one we subscribed with.
        assert tail.current_cursor() == []

        batch = tail.next_batch(timeout_ms=100, max_count=100)
        assert len(batch) >= 2

        cursor_after = tail.current_cursor()
        assert cursor_after == graph.heads()
        tail.close()

    def test_next_batch_max_count(self):
        """``max_count`` caps the size of a returned batch."""
        graph = make_store()
        for idx in range(10):
            graph.add_node(f"n{idx}", "entity", f"N{idx}", {"name": f"node-{idx}"})

        tail = graph.subscribe_from([])
        limited = tail.next_batch(timeout_ms=100, max_count=3)

        assert len(limited) == 3
        tail.close()

    def test_next_batch_timeout_returns_empty(self):
        """With nothing to deliver, ``next_batch`` waits and returns ``[]``."""
        graph = make_store()
        tail = graph.subscribe_from(graph.heads())

        started = time.perf_counter()
        batch = tail.next_batch(timeout_ms=100, max_count=10)
        elapsed_ms = (time.perf_counter() - started) * 1000

        assert batch == []
        # Loose bounds around the 100 ms timeout: the call must actually wait,
        # but must not hang far beyond the requested timeout.
        assert 50 < elapsed_ms < 500
        tail.close()
class TestTailNotification:
    """Threads waiting in ``next_batch`` are woken by appends to the store."""

    def test_waiter_wakes_on_local_append(self):
        """A single waiting thread receives the node added from the main thread."""
        graph = make_store()
        tail = graph.subscribe_from(graph.heads())
        delivered = []

        def wait_for_batch():
            batch = tail.next_batch(timeout_ms=3000, max_count=10)
            delivered.append(batch)

        worker = threading.Thread(target=wait_for_batch)
        worker.start()
        # Short pause so the worker has a chance to start waiting before the
        # append happens.
        time.sleep(0.05)
        graph.add_node("n1", "entity", "N1", {"name": "from-main"})

        # The join timeout (2 s) is shorter than the waiter's own timeout
        # (3 s), so a thread that is still alive here was never woken.
        worker.join(timeout=2)
        assert not worker.is_alive(), "waiter did not wake"
        assert len(delivered) == 1
        assert len(delivered[0]) == 1
        tail.close()

    def test_multiple_waiters_all_wake(self):
        """Two subscriptions waiting concurrently both receive the new entry."""
        graph = make_store()
        tails = [graph.subscribe_from(graph.heads()) for _ in range(2)]
        collected = [[], []]

        def wait_for_batch(slot):
            batch = tails[slot].next_batch(timeout_ms=3000, max_count=10)
            collected[slot].extend(batch)

        workers = [
            threading.Thread(target=wait_for_batch, args=(slot,))
            for slot in range(2)
        ]
        for worker in workers:
            worker.start()
        # Short pause so both workers have a chance to start waiting.
        time.sleep(0.05)
        graph.add_node("n1", "entity", "N1", {"name": "one"})

        for worker in workers:
            worker.join(timeout=2)
            assert not worker.is_alive()
        assert len(collected[0]) == 1
        assert len(collected[1]) == 1

        for tail in tails:
            tail.close()
class TestTailSyncIntegration:
    """Subscriptions observe entries that arrive through a sync merge."""

    def test_waiter_wakes_on_sync_merge(self):
        """A waiter on peer-b wakes when peer-a's data is merged into peer-b."""
        peer_a = make_store("peer-a")
        peer_b = make_store("peer-b")
        # The node is created on peer-a only; peer-b learns of it via sync.
        peer_a.add_node("from-b-will-sync", "entity", "remote", {"name": "hi"})

        tail = peer_b.subscribe_from(peer_b.heads())
        delivered = []

        def wait_for_batch():
            batch = tail.next_batch(timeout_ms=3000, max_count=100)
            delivered.append(batch)

        worker = threading.Thread(target=wait_for_batch)
        worker.start()
        # Short pause so the worker has a chance to start waiting.
        time.sleep(0.05)

        # Sync round: peer-b makes an offer, peer-a answers with a payload,
        # and peer-b merges that payload.
        sync_offer = peer_b.generate_sync_offer()
        sync_payload = peer_a.receive_sync_offer(sync_offer)
        merged_count = peer_b.merge_sync_payload(sync_payload)
        assert merged_count > 0

        worker.join(timeout=2)
        assert not worker.is_alive(), "subscriber did not wake on merge"
        assert len(delivered) == 1
        assert len(delivered[0]) >= 1
        tail.close()
class TestTailIndependence:
    """Separate subscriptions on one store keep separate cursors."""

    def test_two_subscribers_independent_cursors(self):
        """Each subscriber sees the delta relative to its own start cursor."""
        store = make_store()
        store.add_node("n1", "entity", "N1", {"name": "one"})
        cursor_at_n1 = store.heads()
        store.add_node("n2", "entity", "N2", {"name": "two"})

        # Two independent subscriptions: one from the very beginning and one
        # from the cursor captured after n1. These are two statements; they
        # were previously fused onto one line, which is not valid Python.
        sub_old = store.subscribe_from([])
        sub_new = store.subscribe_from(cursor_at_n1)

        old_entries = sub_old.next_batch(timeout_ms=100, max_count=100)
        new_entries = sub_new.next_batch(timeout_ms=100, max_count=100)

        # From the empty cursor everything is returned; from the n1 cursor
        # only the entry for n2 is returned.
        assert len(old_entries) == 3
        assert len(new_entries) == 1

        sub_old.close()
        sub_new.close()
class TestTailErrors:
    """Invalid cursors passed to ``subscribe_from`` are rejected."""

    def test_subscribe_from_unknown_hash_raises(self):
        """A well-formed but unknown hash is rejected eagerly or lazily.

        Either ``subscribe_from`` raises straight away, or it returns a
        subscription whose first ``next_batch`` call must raise.
        """
        graph = make_store()
        unknown_hash = "00" * 32
        try:
            tail = graph.subscribe_from([unknown_hash])
            with pytest.raises(Exception):
                tail.next_batch(timeout_ms=100, max_count=10)
            tail.close()
        except Exception:
            # An exception from subscribe_from itself is an accepted outcome.
            # NOTE(review): this handler also swallows any exception raised
            # by close().
            pass

    def test_subscribe_from_invalid_hex_raises(self):
        """A cursor entry that is not valid hex makes ``subscribe_from`` raise."""
        graph = make_store()
        with pytest.raises(Exception):
            graph.subscribe_from(["not-a-real-hash"])
class TestTailRetention:
    """Interaction between subscriber cursors and store compaction."""

    @staticmethod
    def _mentions_cursor(reason):
        """Return True if *reason* mentions a cursor or a subscriber."""
        lowered = reason.lower()
        return "cursor" in lowered or "subscriber" in lowered

    def test_stale_cursor_raises_after_compaction(self):
        """Reading from a cursor taken before compaction raises."""
        graph = make_store()
        graph.add_node("n1", "entity", "N1", {"name": "one"})
        stale_cursor = graph.heads()
        graph.add_node("n2", "entity", "N2", {"name": "two"})

        tail = graph.subscribe_from(stale_cursor)
        # presumably safe=False compacts without the safety verification --
        # TODO confirm against the GraphStore API.
        graph.compact(safe=False)

        with pytest.raises(Exception):
            tail.next_batch(timeout_ms=100, max_count=10)
        tail.close()

    def test_register_cursor_blocks_compaction(self):
        """A registered cursor behind the heads makes compaction unsafe."""
        graph = make_store()
        graph.add_node("n1", "entity", "N1", {"name": "one"})
        lagging_cursor = graph.heads()
        graph.add_node("n2", "entity", "N2", {"name": "two"})

        graph.register_subscriber_cursor(lagging_cursor)
        is_safe, reasons = graph.verify_compaction_safe()

        assert not is_safe
        # At least one reported reason must point at the cursor/subscriber.
        assert any(self._mentions_cursor(reason) for reason in reasons)

    def test_register_cursor_at_head_allows_compaction(self):
        """A registered cursor at the heads is not reported as a blocker."""
        graph = make_store()
        graph.add_node("n1", "entity", "N1", {"name": "one"})

        graph.register_subscriber_cursor(graph.heads())
        is_safe, reasons = graph.verify_compaction_safe()

        # Compaction may still be unsafe for unrelated reasons; only reasons
        # that mention the cursor/subscriber count as a failure here.
        if not is_safe:
            cursor_reasons = [
                reason for reason in reasons if self._mentions_cursor(reason)
            ]
            assert not cursor_reasons, f"cursor at head should not block: {cursor_reasons}"

    def test_unregister_cursor_unblocks_compaction(self):
        """Unregistering a lagging cursor removes it from the blockers."""
        graph = make_store()
        graph.add_node("n1", "entity", "N1", {"name": "one"})
        lagging_cursor = graph.heads()
        graph.add_node("n2", "entity", "N2", {"name": "two"})

        graph.register_subscriber_cursor(lagging_cursor)
        safe_while_registered, _ = graph.verify_compaction_safe()
        assert not safe_while_registered

        graph.unregister_subscriber_cursor(lagging_cursor)
        safe_after_unregister, reasons = graph.verify_compaction_safe()

        # As above, tolerate unrelated reasons but not cursor-related ones.
        if not safe_after_unregister:
            cursor_reasons = [
                reason for reason in reasons if self._mentions_cursor(reason)
            ]
            assert not cursor_reasons