from __future__ import annotations
import argparse
import asyncio
import json
import sys
import requests
import websockets
def pp(label: str, msg: dict):
print(f" {label}: {json.dumps(msg, indent=2)}")
async def send(ws, msg: dict) -> dict:
raw = json.dumps(msg)
await ws.send(raw)
resp = json.loads(await ws.recv())
return resp
async def demo_ws(host: str, port: int, token: str | None):
url = f"ws://{host}:{port}/ws"
print(f"\n{'='*60}")
print(f"WebSocket demo — {url}")
print(f"{'='*60}")
async with websockets.connect(url) as ws:
print("\n1. Connect + auth")
hello = {"type": "hello"}
if token:
hello["token"] = token
resp = await send(ws, hello)
pp("hello_ok", resp)
print("\n2. Schema setup (batch)")
resp = await send(ws, {
"type": "batch",
"request_id": "setup",
"statements": [
{"query": "CREATE NODE TABLE IF NOT EXISTS Person(name STRING, age INT64, PRIMARY KEY(name))"},
{"query": "CREATE NODE TABLE IF NOT EXISTS City(name STRING, PRIMARY KEY(name))"},
{"query": "CREATE REL TABLE IF NOT EXISTS LIVES_IN(FROM Person TO City)"},
{"query": "CREATE REL TABLE IF NOT EXISTS KNOWS(FROM Person TO Person, since INT64)"},
],
})
pp("batch_result", resp)
print("\n3. Insert data (execute with params)")
people = [
("Alice", 30), ("Bob", 42), ("Carol", 28),
("Dave", 35), ("Eve", 22), ("Frank", 50),
]
for name, age in people:
resp = await send(ws, {
"type": "execute",
"query": "MERGE (:Person {name: $name, age: $age})",
"params": {"name": name, "age": age},
})
print(f" Inserted {len(people)} people")
cities = ["New York", "London", "Tokyo"]
for city in cities:
await send(ws, {
"type": "execute",
"query": "MERGE (:City {name: $name})",
"params": {"name": city},
})
print(f" Inserted {len(cities)} cities")
edges = [
("Alice", "New York"), ("Bob", "London"), ("Carol", "Tokyo"),
("Dave", "New York"), ("Eve", "London"), ("Frank", "Tokyo"),
]
for person, city in edges:
await send(ws, {
"type": "execute",
"query": "MATCH (p:Person {name: $p}), (c:City {name: $c}) MERGE (p)-[:LIVES_IN]->(c)",
"params": {"p": person, "c": city},
})
knows = [
("Alice", "Bob", 2020), ("Alice", "Carol", 2021),
("Bob", "Dave", 2019), ("Carol", "Eve", 2022),
("Dave", "Frank", 2018),
]
for a, b, since in knows:
await send(ws, {
"type": "execute",
"query": "MATCH (a:Person {name: $a}), (b:Person {name: $b}) MERGE (a)-[:KNOWS {since: $since}]->(b)",
"params": {"a": a, "b": b, "since": since},
})
print(f" Created {len(edges)} LIVES_IN + {len(knows)} KNOWS relationships")
print("\n4. Query — scalars")
resp = await send(ws, {
"type": "execute",
"query": "MATCH (p:Person) WHERE p.age > $min RETURN p.name, p.age ORDER BY p.age DESC",
"params": {"min": 25},
"request_id": "q-scalars",
})
pp("result", resp)
print("\n5. Query — nodes + rels")
resp = await send(ws, {
"type": "execute",
"query": "MATCH (a:Person)-[r:KNOWS]->(b:Person) RETURN a, r, b LIMIT 3",
"request_id": "q-nodes",
})
pp("result", resp)
print("\n6. Streaming (fetch_size=2)")
resp = await send(ws, {
"type": "execute",
"query": "MATCH (p:Person) RETURN p.name ORDER BY p.name",
"fetch_size": 2,
"request_id": "stream-1",
})
pp("batch 1", resp)
batch = 1
while resp.get("has_more"):
stream_id = resp["stream_id"]
resp = await send(ws, {
"type": "fetch",
"stream_id": stream_id,
})
batch += 1
pp(f"batch {batch}", resp)
print(f" Streamed {batch} batches total")
print("\n7. Streaming — early close_stream")
resp = await send(ws, {
"type": "execute",
"query": "MATCH (p:Person) RETURN p.name ORDER BY p.name",
"fetch_size": 1,
})
pp("first batch", resp)
if resp.get("has_more"):
close_resp = await send(ws, {
"type": "close_stream",
"stream_id": resp["stream_id"],
})
pp("close_stream", close_resp)
print("\n8. Transaction — commit")
resp = await send(ws, {"type": "begin", "request_id": "tx1"})
pp("begin", resp)
resp = await send(ws, {
"type": "execute",
"query": "MERGE (:Person {name: 'Grace', age: 29})",
})
pp("insert", resp)
resp = await send(ws, {"type": "commit", "request_id": "tx1"})
pp("commit", resp)
resp = await send(ws, {
"type": "execute",
"query": "MATCH (p:Person {name: 'Grace'}) RETURN p.name, p.age",
})
pp("verify", resp)
print("\n9. Transaction — rollback")
resp = await send(ws, {"type": "begin"})
pp("begin", resp)
resp = await send(ws, {
"type": "execute",
"query": "MERGE (:Person {name: 'Heidi', age: 99})",
})
pp("insert", resp)
resp = await send(ws, {"type": "rollback"})
pp("rollback", resp)
resp = await send(ws, {
"type": "execute",
"query": "MATCH (p:Person {name: 'Heidi'}) RETURN p.name",
})
pp("verify (should be empty)", resp)
print("\n10. Error handling")
resp = await send(ws, {
"type": "execute",
"query": "THIS IS NOT VALID CYPHER",
"request_id": "bad-query",
})
pp("error", resp)
print("\n11. Graceful close")
resp = await send(ws, {"type": "close"})
pp("close", resp)
def demo_http(host: str, port: int, token: str | None):
base = f"http://{host}:{port}"
headers = {}
if token:
headers["Authorization"] = f"Bearer {token}"
print(f"\n{'='*60}")
print(f"HTTP demo — {base}")
print(f"{'='*60}")
print("\n12. HTTP execute")
resp = requests.post(
f"{base}/v1/execute",
json={"query": "MATCH (p:Person) RETURN p.name, p.age ORDER BY p.age LIMIT 3"},
headers=headers,
)
pp("result", resp.json())
print("\n13. HTTP batch")
resp = requests.post(
f"{base}/v1/batch",
json={
"statements": [
{"query": "MATCH (p:Person) RETURN count(p) AS total"},
{"query": "MATCH ()-[r:KNOWS]->() RETURN count(r) AS total"},
]
},
headers=headers,
)
pp("batch_result", resp.json())
print("\n14. HTTP pipeline (transactional)")
resp = requests.post(
f"{base}/v1/pipeline",
json={
"statements": [
{"query": "MERGE (:Person {name: 'Ivan', age: 33})"},
{"query": "MATCH (p:Person {name: 'Ivan'}) RETURN p.name, p.age"},
]
},
headers=headers,
)
pp("pipeline_result", resp.json())
print("\n15. HTTP pipeline — error + rollback")
resp = requests.post(
f"{base}/v1/pipeline",
json={
"statements": [
{"query": "MERGE (:Person {name: 'Judy', age: 44})"},
{"query": "INVALID QUERY"},
]
},
headers=headers,
)
pp("pipeline_result (Judy rolled back)", resp.json())
resp = requests.post(
f"{base}/v1/execute",
json={"query": "MATCH (p:Person {name: 'Judy'}) RETURN p.name"},
headers=headers,
)
pp("verify Judy absent", resp.json())
print("\n16. Health check")
resp = requests.get(f"{base}/health")
print(f" GET /health: {resp.status_code} {resp.text}")
def main():
parser = argparse.ArgumentParser(description="Strana / graphd demo")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=7688)
parser.add_argument("--token", default=None)
args = parser.parse_args()
try:
asyncio.run(demo_ws(args.host, args.port, args.token))
demo_http(args.host, args.port, args.token)
except ConnectionRefusedError:
print(f"\nError: cannot connect to graphd at {args.host}:{args.port}", file=sys.stderr)
print("Start it with: graphd --data-dir /tmp/demo-data", file=sys.stderr)
sys.exit(1)
print(f"\n{'='*60}")
print("Demo complete.")
print(f"{'='*60}")
if __name__ == "__main__":
main()