import copy
import os
import pickle
import statistics
import sys
import time
import pytest
sys.path.insert(0, ".")
from experiments.harness import Metric, check_metrics, print_table
TERMINUSDB_URL = os.environ.get("TERMINUSDB_URL", "http://127.0.0.1:6363/")
TERMINUSDB_PASS = os.environ.get("TERMINUSDB_PASS", "bench123")
def _silk_available():
try:
from silk import GraphStore
return True
except ImportError:
return False
def _terminusdb_available():
try:
from terminusdb import Client
c = Client(TERMINUSDB_URL)
c.connect(user="admin", key=TERMINUSDB_PASS)
c.info()
return True
except Exception:
return False
def _timed(fn) -> float:
t0 = time.perf_counter()
fn()
return (time.perf_counter() - t0) * 1000
def silk_write(n):
from silk import GraphStore
ont = {"node_types": {"entity": {"properties": {"name": {"value_type": "string"}, "status": {"value_type": "string"}, "seq": {"value_type": "int"}}}}, "edge_types": {}}
s = GraphStore("bench", ont)
for i in range(n):
s.add_node(f"n-{i}", "entity", f"N{i}", {"name": f"node-{i}", "status": "active", "seq": i})
return s
def silk_update(s, n):
for i in range(n):
s.update_property(f"n-{i % 100}", "status", f"updated-{i}")
def silk_query_all(s):
return s.all_nodes()
def silk_snapshot_size(s):
return len(s.snapshot())
def silk_bfs(s, start):
return s.bfs(start)
def silk_dfs(s, start):
return s.dfs(start)
def nx_write(n):
import networkx as nx
g = nx.DiGraph()
for i in range(n):
g.add_node(f"n-{i}", name=f"node-{i}", status="active", seq=i, _type="entity")
return g
def nx_update(g, n):
for i in range(n):
g.nodes[f"n-{i % 100}"]["status"] = f"updated-{i}"
def nx_query_all(g):
return list(g.nodes(data=True))
def nx_snapshot_size(g):
return len(pickle.dumps(g))
def nx_bfs(g, start):
import networkx as nx
return list(nx.bfs_tree(g, start))
def nx_dfs(g, start):
import networkx as nx
return list(nx.dfs_tree(g, start))
def tdb_setup(db_name):
from terminusdb import Client
c = Client(TERMINUSDB_URL)
c.connect(user="admin", key=TERMINUSDB_PASS)
try:
c.delete_database(db_name)
except Exception:
pass
c.create_database(db_name)
c.insert_document({
"@type": "Class", "@id": "Entity",
"name": "xsd:string", "status": "xsd:string", "seq": "xsd:integer",
}, graph_type="schema")
return c
def tdb_write(c, n):
docs = [{"@type": "Entity", "name": f"node-{i}", "status": "active", "seq": i} for i in range(n)]
c.insert_document(docs)
def tdb_update(c, doc_id, key, value):
doc = c.get_document(doc_id)
doc[key] = value
c.replace_document(doc)
def tdb_query_all(c):
return list(c.query_document({"@type": "Entity"}))
def run_comparison(n=1000, rounds=5):
results = []
if _silk_available():
write_times = []
query_times = []
update_times = []
for _ in range(rounds):
t = _timed(lambda: silk_write(n))
write_times.append(t)
s = silk_write(n)
for _ in range(rounds):
query_times.append(_timed(lambda: silk_query_all(s)))
for _ in range(rounds):
update_times.append(_timed(lambda: silk_update(s, 100)))
snap = silk_snapshot_size(s)
mem = s.memory_usage()["total_bytes"]
results.append({
"system": "silk",
"write_ms": round(statistics.median(write_times), 2),
"write_ops_sec": int(n / (statistics.median(write_times) / 1000)),
"query_ms": round(statistics.median(query_times), 2),
"update_100_ms": round(statistics.median(update_times), 2),
"snapshot_kb": round(snap / 1024, 1),
"memory_kb": round(mem / 1024, 1),
})
write_times = []
query_times = []
update_times = []
for _ in range(rounds):
write_times.append(_timed(lambda: nx_write(n)))
g = nx_write(n)
for _ in range(rounds):
query_times.append(_timed(lambda: nx_query_all(g)))
for _ in range(rounds):
update_times.append(_timed(lambda: nx_update(g, 100)))
snap = nx_snapshot_size(g)
results.append({
"system": "networkx",
"write_ms": round(statistics.median(write_times), 2),
"write_ops_sec": int(n / (statistics.median(write_times) / 1000)),
"query_ms": round(statistics.median(query_times), 2),
"update_100_ms": round(statistics.median(update_times), 2),
"snapshot_kb": round(snap / 1024, 1),
"memory_kb": round(snap / 1024, 1), })
if _terminusdb_available():
write_times = []
query_times = []
for _ in range(rounds):
c = tdb_setup(f"bench_comp_{_}")
write_times.append(_timed(lambda: tdb_write(c, n)))
c = tdb_setup("bench_comp_q")
tdb_write(c, n)
for _ in range(rounds):
query_times.append(_timed(lambda: tdb_query_all(c)))
results.append({
"system": "terminusdb",
"write_ms": round(statistics.median(write_times), 2),
"write_ops_sec": int(n / (statistics.median(write_times) / 1000)),
"query_ms": round(statistics.median(query_times), 2),
"update_100_ms": "N/A",
"snapshot_kb": "N/A",
"memory_kb": "N/A (server)",
})
return results
def run_traversal_comparison(n=1000, rounds=10):
from silk import GraphStore
import networkx as nx
ont = {"node_types": {"entity": {"properties": {}}}, "edge_types": {"NEXT": {"source_types": ["entity"], "target_types": ["entity"]}}}
s = GraphStore("trav", ont)
g = nx.DiGraph()
for i in range(n):
s.add_node(f"n-{i}", "entity", f"N{i}")
g.add_node(f"n-{i}")
for i in range(n - 1):
s.add_edge(f"e-{i}", "NEXT", f"n-{i}", f"n-{i+1}")
g.add_edge(f"n-{i}", f"n-{i+1}")
results = []
for name, silk_fn, nx_fn in [
("BFS", lambda: s.bfs("n-0"), lambda: list(nx.bfs_tree(g, "n-0"))),
("DFS", lambda: s.dfs("n-0"), lambda: list(nx.dfs_tree(g, "n-0"))),
]:
silk_times = [_timed(silk_fn) for _ in range(rounds)]
nx_times = [_timed(nx_fn) for _ in range(rounds)]
results.append({
"algorithm": name,
"silk_ms": round(statistics.median(silk_times), 3),
"networkx_ms": round(statistics.median(nx_times), 3),
"ratio": round(statistics.median(silk_times) / statistics.median(nx_times), 2) if statistics.median(nx_times) > 0 else 0,
})
return results
def test_silk_faster_than_terminusdb_on_write():
if not _terminusdb_available():
pytest.skip("TerminusDB not running")
results = run_comparison(n=500, rounds=3)
silk = next((r for r in results if r["system"] == "silk"), None)
tdb = next((r for r in results if r["system"] == "terminusdb"), None)
if silk is None or tdb is None:
pytest.skip("silk or terminusdb not available")
check_metrics([
Metric(
name="silk_vs_terminusdb_write",
measured=round(silk["write_ms"] / tdb["write_ms"], 2),
threshold=1.0,
op="<",
unit="x (lower = silk faster)",
),
], label="EXP-07 silk vs terminusdb write")
def test_networkx_baseline_overhead():
results = run_comparison(n=1000, rounds=3)
silk = next((r for r in results if r["system"] == "silk"), None)
nxr = next((r for r in results if r["system"] == "networkx"), None)
if silk is None or nxr is None:
pytest.skip("silk or networkx not available")
ratio = silk["write_ms"] / nxr["write_ms"] if nxr["write_ms"] > 0 else float("inf")
check_metrics([
Metric(
name="silk_vs_networkx_write_overhead",
measured=round(ratio, 1),
threshold=12.0,
op="<",
unit="x",
),
], label="EXP-07 CRDT overhead vs plain graph")
if __name__ == "__main__":
import platform
print(f"EXP-07: Graph System Comparison")
print(f" platform: {platform.machine()} / {platform.system()}")
print(f" python: {platform.python_version()}")
print(f" terminusdb: {'available' if _terminusdb_available() else 'not running'}")
print()
print("--- Write / Query / Snapshot (1000 entities) ---")
results = run_comparison(n=1000, rounds=5)
print_table(results, ["system", "write_ms", "write_ops_sec", "query_ms", "update_100_ms", "snapshot_kb", "memory_kb"])
print()
print("--- Traversal: BFS/DFS on 1000-node chain ---")
trav = run_traversal_comparison(n=1000, rounds=10)
print_table(trav, ["algorithm", "silk_ms", "networkx_ms", "ratio"])