import asyncio, base64, hashlib, hmac, json, os, sys, time, uuid
import http.server, socketserver, threading
import httpx
# Gateway under test; override the default with the A2A_BASE environment variable.
BASE = os.environ.get("A2A_BASE", "http://127.0.0.1:19074")
# JSON-RPC endpoint that every call in this script posts to.
RPC = BASE + "/api/v1/a2a"

# Local webhook receiver: port it listens on and the token registered with
# push-notification configs (also used as the HMAC key when checking signatures).
WEBHOOK_PORT = 8902
WEBHOOK_SECRET = "e2e-secret"
# One dict per webhook POST received, appended by the handler thread.
WEBHOOK_HITS: list = []

# Console tags used when printing a test outcome.
PASS = "[PASS]"
FAIL = "[FAIL]"
# Accumulated (test name, passed?, message) triples used for the final summary.
results: list[tuple[str, bool, str]] = []
def record(name: str, ok: bool, msg: str = ""):
    """Store one test outcome in ``results`` and echo it to stdout.

    The printed line is the PASS/FAIL tag followed by the test name and,
    when ``msg`` is non-empty, an em-dash separated detail message.
    """
    results.append((name, ok, msg))
    tag = PASS if ok else FAIL
    line = f"{tag} {name}"
    if msg:
        line += f" — {msg}"
    print(line)
def start_webhook_server():
    """Start a background HTTP server that records every webhook POST.

    The server listens on 127.0.0.1:WEBHOOK_PORT in a daemon thread. Each POST
    is appended to ``WEBHOOK_HITS`` as a dict with keys ``"sig"`` (the
    X-A2A-Signature header), ``"tid"`` (the X-A2A-Task-Id header) and
    ``"body"`` (the raw request bytes), and is answered with 200 / ``ok``.

    Returns:
        The running server object, so a caller may ``shutdown()`` it.
        Callers that ignore the return value keep working unchanged.
    """

    class H(http.server.BaseHTTPRequestHandler):
        def do_POST(self):
            # A missing or empty Content-Length is treated as "no body".
            ln = int(self.headers.get("Content-Length") or 0)
            body = self.rfile.read(ln) if ln else b""
            WEBHOOK_HITS.append({
                "sig": self.headers.get("X-A2A-Signature", ""),
                "tid": self.headers.get("X-A2A-Task-Id", ""),
                "body": body,
            })
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"ok")

        def log_message(self, *a, **k):
            # Silence the default per-request stderr logging.
            pass

    class _ReusableTCPServer(socketserver.TCPServer):
        # Without SO_REUSEADDR a re-run shortly after a previous run can fail
        # to bind with "Address already in use" while old sockets linger.
        allow_reuse_address = True

    srv = _ReusableTCPServer(("127.0.0.1", WEBHOOK_PORT), H)
    threading.Thread(target=srv.serve_forever, daemon=True).start()
    return srv
async def rpc(c: httpx.AsyncClient, method: str, params: dict, rpc_id: str = "1") -> dict:
    """Send one JSON-RPC 2.0 request to the ``RPC`` endpoint and return the decoded reply.

    Args:
        c: HTTP client used for the POST.
        method: JSON-RPC method name.
        params: JSON-RPC params object.
        rpc_id: Value placed in the request's ``id`` field.

    Returns:
        The parsed JSON body of the response.

    Raises:
        Whatever ``Response.raise_for_status`` raises for an error HTTP status.
    """
    envelope = {"jsonrpc": "2.0", "id": rpc_id, "method": method, "params": params}
    response = await c.post(RPC, json=envelope)
    response.raise_for_status()
    return response.json()
# Base64 of a tiny PNG image (the decoded bytes start with the PNG signature).
PNG_B64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNkYAAAAAYAAjCB0C8AAAAASUVORK5CYII="

# Alias kept on its own line: it was previously fused with the next statement,
# which made the module unparsable.
import base64 as _b64

# Synthetic payloads: each starts with bytes typical of its format and is then
# padded; they only need to be recognisable by type and have a known length.
_MP4 = b"\x00\x00\x00\x18ftypmp42" + b"\x00" * 40
_MP3 = b"\xff\xfb\x90\x00" + b"\x00" * 60
_PDF = (
    b"%PDF-1.4\n%\xe2\xe3\xcf\xd3\n1 0 obj<<>>endobj\n"
    b"xref\n0 1\n0000000000 65535 f\ntrailer<<>>\n%%EOF\n"
)
_BIN = b"\x42" + b"\x99" * 64 + b"\x00ENDOFTEST"

# Base64 text forms of the payloads above.
MP4_B64 = _b64.b64encode(_MP4).decode()
MP3_B64 = _b64.b64encode(_MP3).decode()
PDF_B64 = _b64.b64encode(_PDF).decode()
BIN_B64 = _b64.b64encode(_BIN).decode()

# (file name, MIME type, raw bytes, workspace bucket directory, file-name kind letter).
# The bucket and kind letter are what the ingestion test looks for on disk:
# a file under a2a/<bucket>/ whose name starts with a2a_<kind>_.
FILE_FIXTURES = [
    ("snapshot.png", "image/png", base64.b64decode(PNG_B64), "images", "i"),
    ("clip.mp4", "video/mp4", _MP4, "videos", "v"),
    ("voice.mp3", "audio/mpeg", _MP3, "audios", "a"),
    ("notes.pdf", "application/pdf", _PDF, "docs", "d"),
    ("rawbytes.bin", "application/octet-stream", _BIN, "files", "f"),
]
async def t_agent_card(c):
    """Fetch the public agent card and check what it advertises.

    Asserts that ``protocolVersion`` is "1.0", that the ``streaming``,
    ``pushNotifications`` and ``extendedAgentCard`` capabilities are all
    ``True``, and that both the "bearer" and "apiKey" security schemes exist.

    Returns:
        A short summary with the protocol version and the number of skills.

    Raises:
        AssertionError: if any advertised value differs from the expectation.
        httpx.HTTPStatusError: if the card endpoint answers with an error status.
    """
    r = await c.get(f"{BASE}/.well-known/agent.json")
    # Check the HTTP status first (as rpc() does) so a 404/500 is reported as
    # such instead of as a JSON decode error or a KeyError further down.
    r.raise_for_status()
    card = r.json()
    assert card["protocolVersion"] == "1.0", card.get("protocolVersion")
    assert card["capabilities"]["streaming"] is True
    assert card["capabilities"]["pushNotifications"] is True
    assert card["capabilities"]["extendedAgentCard"] is True
    assert "bearer" in card["securitySchemes"]
    assert "apiKey" in card["securitySchemes"]
    return f"v{card['protocolVersion']} {len(card['skills'])} skill(s)"
async def t_extended_agent_card(c):
    """Call GetExtendedAgentCard and check that it reports protocol version "1.0"."""
    reply = await rpc(c, "GetExtendedAgentCard", {})
    extended = reply["result"]
    assert extended["protocolVersion"] == "1.0", extended
    return "ok"
async def t_send_message_text(c):
    """Send a plain text message and expect the task to reach a terminal state.

    A FAILED task is reported (with up to 60 characters of its status message)
    rather than treated as a test failure; a COMPLETED task reports the first
    40 characters of its first artifact text.
    """
    message = {
        "messageId": str(uuid.uuid4()),
        "role": "user",
        "parts": [{"type": "text", "text": "reply with: ack"}],
    }
    reply = await rpc(c, "SendMessage", {"message": message})
    task = reply["result"]
    status = task["status"]
    assert status["state"] in ("TASK_STATE_COMPLETED", "TASK_STATE_FAILED"), status
    if status["state"] != "TASK_STATE_FAILED":
        text = task["artifacts"][0]["parts"][0]["text"]
        return f"completed: {text[:40]!r}"
    failure_parts = status.get("message", {}).get("parts", [{}])
    reason = failure_parts[0].get("text", "?")[:60]
    return f"FAILED (likely LLM): {reason}"
async def t_send_message_with_raw_image(c):
    """Send a text part plus a raw PNG part and look for the saved image on disk.

    Looks in ``~/.rsclaw-a2atest/workspace/a2a/images`` for any file named
    ``a2a_i_*.png``. If the directory does not exist the test only reports
    that fact; if it exists but holds no such file the test fails.
    """
    message = {
        "messageId": str(uuid.uuid4()),
        "role": "user",
        "parts": [
            {"type": "text", "text": "describe the image"},
            {"type": "raw", "bytes": PNG_B64, "mimeType": "image/png"},
        ],
    }
    reply = await rpc(c, "SendMessage", {"message": message})
    task = reply["result"]
    workspace = os.path.expanduser("~/.rsclaw-a2atest/workspace")
    image_dir = os.path.join(workspace, "a2a", "images")
    if os.path.isdir(image_dir):
        found = [
            entry for entry in os.listdir(image_dir)
            if entry.startswith("a2a_i_") and entry.endswith(".png")
        ]
        assert found, f"no a2a_i_*.png in {image_dir}"
        return f"wrote {found[-1]}, status={task['status']['state']}"
    return f"no a2a/images dir (status={task['status']['state']})"
async def t_send_message_with_data_part(c):
    """Send a text part plus a structured data part and check both are in the history.

    The first history entry of the returned task must contain a part of type
    "text" and a part of type "data".
    """
    message = {
        "messageId": str(uuid.uuid4()),
        "role": "user",
        "parts": [
            {"type": "text", "text": "parse the data"},
            {"type": "data", "data": {"key": "v1", "n": 42}},
        ],
    }
    reply = await rpc(c, "SendMessage", {"message": message})
    task = reply["result"]
    part_types = [part["type"] for part in task["history"][0]["parts"]]
    assert "text" in part_types and "data" in part_types, part_types
    return f"history parts={part_types}, status={task['status']['state']}"
async def t_send_message_all_file_types(c):
    """Send every fixture as a raw part in one message and verify each lands on disk.

    The workspace buckets are listed before and after the call. For each
    fixture, a file that is new in its bucket, has exactly the fixture's byte
    length and a name starting with ``a2a_<kind>_`` counts as ingested.
    Fails with a combined message naming every fixture that was not found.
    """
    workspace = os.path.expanduser("~/.rsclaw-a2atest/workspace")

    def sizes_by_bucket():
        # Map bucket -> {file name: size in bytes}; a missing directory counts as empty.
        listing = {}
        for fixture in FILE_FIXTURES:
            bucket = fixture[3]
            folder = os.path.join(workspace, "a2a", bucket)
            if os.path.isdir(folder):
                listing[bucket] = {
                    entry: os.path.getsize(os.path.join(folder, entry))
                    for entry in os.listdir(folder)
                }
            else:
                listing[bucket] = {}
        return listing

    before = sizes_by_bucket()
    intro = {"type": "text",
             "text": "Five attachments follow — just acknowledge receipt."}
    attachments = [
        {
            "type": "raw",
            "bytes": base64.b64encode(data).decode(),
            "mimeType": mime,
            "name": name,
        }
        for name, mime, data, _bucket, _kind in FILE_FIXTURES
    ]
    message = {"messageId": str(uuid.uuid4()), "role": "user", "parts": [intro] + attachments}
    reply = await rpc(c, "SendMessage", {"message": message})
    state = reply["result"]["status"]["state"]
    after = sizes_by_bucket()

    failures = []
    for name, _mime, data, bucket, kind in FILE_FIXTURES:
        new = set(after[bucket]) - set(before[bucket])
        prefix = f"a2a_{kind}_"
        hit = next(
            (fname for fname in new
             if after[bucket][fname] == len(data) and fname.startswith(prefix)),
            None,
        )
        if not hit:
            failures.append(f"{bucket}/{name} not ingested (new files: {new})")
    assert not failures, "; ".join(failures)
    return f"5 buckets, all 5 files ingested (status={state})"
async def t_streaming(c):
    """Stream a message over SSE and check the order of status updates.

    Collects the state of every ``status-update`` event until an event marked
    ``final`` arrives. The first state must be SUBMITTED, the second WORKING,
    and the last COMPLETED or FAILED.

    Returns:
        A summary with the number of status events and the first/last state.

    Raises:
        AssertionError: if fewer than two status updates arrive or the
            sequence is not the expected one.
    """
    body = {"jsonrpc": "2.0", "id": "s1", "method": "SendStreamingMessage",
            "params": {"message": {"messageId": str(uuid.uuid4()), "role": "user",
                                   "parts": [{"type": "text", "text": "reply ack"}]}}}
    states = []
    async with c.stream("POST", RPC, json=body, headers={"Accept": "text/event-stream"}) as r:
        async for line in r.aiter_lines():
            # Only SSE "data:" lines carry JSON-RPC frames.
            if not line.startswith("data:"):
                continue
            f = json.loads(line[5:].strip()).get("result", {})
            if f.get("kind") == "status-update":
                states.append(f["status"]["state"])
            if f.get("final"):
                break
    # Guard the indexing below: a short or empty stream should fail with a
    # readable message, not a bare IndexError.
    assert len(states) >= 2, f"expected at least 2 status updates, got {states}"
    assert states[0] == "TASK_STATE_SUBMITTED", states
    assert states[1] == "TASK_STATE_WORKING", states
    assert states[-1] in ("TASK_STATE_COMPLETED", "TASK_STATE_FAILED"), states
    return f"{len(states)} events: {states[0]}→{states[-1]}"
async def t_list_tasks(c):
    """Call ListTasks with a page size of 5 and check the reply has a ``tasks`` field."""
    reply = await rpc(c, "ListTasks", {"pageSize": 5})
    page = reply["result"]
    assert "tasks" in page, page
    count = len(page["tasks"])
    return f"{count} task(s)"
async def t_get_task(c):
    """Create a task with SendMessage, then fetch it by id with GetTask."""
    message = {
        "messageId": str(uuid.uuid4()),
        "role": "user",
        "parts": [{"type": "text", "text": "hi"}],
    }
    created = await rpc(c, "SendMessage", {"message": message})
    tid = created["result"]["id"]
    fetched = await rpc(c, "GetTask", {"id": tid})
    assert fetched["result"]["id"] == tid
    return f"id={tid[:8]}"
async def t_get_task_not_found(c):
    """GetTask on an unknown id must return a JSON-RPC error with code -32001."""
    reply = await rpc(c, "GetTask", {"id": "definitely-does-not-exist"})
    assert "error" in reply, reply
    assert reply["error"]["code"] == -32001, reply["error"]
    return "correct -32001"
async def t_subscribe_to_task_unknown(c):
    """Subscribe to a non-existent task and expect one FAILED event, then end of stream.

    Records the status state of every SSE data frame (``None`` when a frame
    has no status) until a frame marked ``final`` arrives.
    """
    request = {
        "jsonrpc": "2.0",
        "id": "sub1",
        "method": "SubscribeToTask",
        "params": {"id": "does-not-exist-either"},
    }
    seen = []
    async with c.stream("POST", RPC, json=request, headers={"Accept": "text/event-stream"}) as resp:
        async for raw in resp.aiter_lines():
            if raw.startswith("data:"):
                frame = json.loads(raw[5:].strip()).get("result", {})
                seen.append(frame.get("status", {}).get("state"))
                if frame.get("final"):
                    break
    assert seen == ["TASK_STATE_FAILED"], seen
    return "terminated cleanly"
async def t_cancel_task(c):
    """Start a streaming task, cancel it mid-flight and check the state sticks.

    A background coroutine consumes the SSE stream and captures the task id.
    Once the id is known, CancelTask must report CANCELED; after the stream
    ends and a short pause, GetTask must still report CANCELED.

    Raises:
        AssertionError: if no task id is observed or a state check fails.
        Any exception raised by the background stream before an id was seen.
    """
    sse_body = {"jsonrpc": "2.0", "id": "sx", "method": "SendStreamingMessage",
                "params": {"message": {"messageId": str(uuid.uuid4()), "role": "user",
                                       "parts": [{"type": "text", "text": "count to 100 slowly"}]}}}
    tid_holder = {"tid": None}

    async def stream():
        async with c.stream("POST", RPC, json=sse_body, headers={"Accept": "text/event-stream"}) as r:
            async for line in r.aiter_lines():
                if not line.startswith("data:"):
                    continue
                f = json.loads(line[5:].strip()).get("result", {})
                # Keep the last non-empty task id seen on any frame.
                tid_holder["tid"] = f.get("taskId") or tid_holder["tid"]
                if f.get("final"):
                    return

    stream_task = asyncio.create_task(stream())
    try:
        # Poll for up to ~3 s; stop early if the stream already ended.
        for _ in range(60):
            if tid_holder["tid"] or stream_task.done():
                break
            await asyncio.sleep(0.05)
        if stream_task.done():
            # Re-raise a stream failure here instead of masking it behind
            # the generic "no taskId observed" assertion.
            stream_task.result()
        assert tid_holder["tid"], "no taskId observed"
        cancel = await rpc(c, "CancelTask", {"id": tid_holder["tid"]})
        assert cancel["result"]["status"]["state"] == "TASK_STATE_CANCELED", cancel
        await stream_task
        # Give the server a moment, then confirm nothing overwrote the state.
        await asyncio.sleep(0.5)
        got = await rpc(c, "GetTask", {"id": tid_holder["tid"]})
        assert got["result"]["status"]["state"] == "TASK_STATE_CANCELED", got["result"]["status"]
        return "CANCELED stuck"
    finally:
        # On any failure above, do not leave the stream reader running on
        # the shared client while later tests execute.
        if not stream_task.done():
            stream_task.cancel()
            await asyncio.gather(stream_task, return_exceptions=True)
async def t_push_crud(c):
    """Exercise Create / List / Get / Delete of a push-notification config.

    Uses a fresh task id and config id "c1". Delete passes unless the reply
    has a null result together with an error.
    """
    tid = "push-crud-" + str(uuid.uuid4())[:8]
    config = {"id": "c1", "url": "http://x/", "token": "t"}
    created = await rpc(c, "CreateTaskPushNotificationConfig", {
        "taskId": tid,
        "pushNotificationConfig": config,
    })
    assert created["result"]["id"] == "c1", created

    listed = await rpc(c, "ListTaskPushNotificationConfigs", {"taskId": tid})
    assert any(entry.get("id") == "c1" for entry in listed["result"]["configs"]), listed

    # Get and Delete address the config with the same pair of ids.
    selector = {"taskId": tid, "pushNotificationConfigId": "c1"}
    got = await rpc(c, "GetTaskPushNotificationConfig", selector)
    assert got["result"]["url"] == "http://x/", got

    deleted = await rpc(c, "DeleteTaskPushNotificationConfig", selector)
    assert not (deleted.get("result") is None and deleted.get("error") is not None), deleted
    return "Create/List/Get/Delete all ok"
async def t_push_delivery(c):
    """Register the local webhook for a task, run the task, and verify deliveries.

    After the stream ends and a one-second grace period, at least three
    webhook hits are required. Every hit must carry a base64 HMAC-SHA256
    signature of its body keyed with ``WEBHOOK_SECRET`` and the expected task
    id. The hits must include SUBMITTED and WORKING status updates plus an
    artifact update or a COMPLETED/FAILED status.
    """
    WEBHOOK_HITS.clear()
    tid = "push-deliv-" + str(uuid.uuid4())[:8]
    hook_config = {
        "id": "cfg-1",
        "url": f"http://127.0.0.1:{WEBHOOK_PORT}/hook",
        "token": WEBHOOK_SECRET,
    }
    await rpc(c, "CreateTaskPushNotificationConfig", {
        "taskId": tid,
        "pushNotificationConfig": hook_config,
    })

    message = {
        "messageId": str(uuid.uuid4()),
        "role": "user",
        "parts": [{"type": "text", "text": "hi"}],
        "taskId": tid,
    }
    request = {"jsonrpc": "2.0", "id": "s1", "method": "SendStreamingMessage",
               "params": {"message": message}}
    # Drain the stream; only the "final" marker matters here.
    async with c.stream("POST", RPC, json=request, headers={"Accept": "text/event-stream"}) as resp:
        async for raw in resp.aiter_lines():
            if raw.startswith("data:"):
                frame = json.loads(raw[5:].strip()).get("result", {})
                if frame.get("final"):
                    break

    # Allow trailing webhook deliveries to arrive.
    await asyncio.sleep(1.0)
    assert len(WEBHOOK_HITS) >= 3, f"only {len(WEBHOOK_HITS)} hit(s)"

    key = WEBHOOK_SECRET.encode()
    kinds, states = [], []
    for hit in WEBHOOK_HITS:
        mac = hmac.new(key, hit["body"], hashlib.sha256)
        expected = base64.b64encode(mac.digest()).decode()
        assert hit["sig"] == expected, f"HMAC mismatch: {hit['sig']} vs {expected}"
        assert hit["tid"] == tid, f"task id mismatch: {hit['tid']}"
        payload = json.loads(hit["body"])
        kind = payload.get("kind")
        kinds.append(kind)
        if kind == "status-update":
            states.append(payload["status"]["state"])
    assert "TASK_STATE_SUBMITTED" in states and "TASK_STATE_WORKING" in states, states
    assert "artifact-update" in kinds or "TASK_STATE_COMPLETED" in states or "TASK_STATE_FAILED" in states, (kinds, states)
    return f"{len(WEBHOOK_HITS)} hits ({'/'.join(kinds)}), HMAC all valid"
async def _wait_input_round_trip(c, auth: bool, expected_state: str):
    """Drive a task that pauses for input, resume it, and check the answer is echoed.

    The agent is asked to call the ``wait_input`` tool (with ``auth: true``
    when ``auth`` is set). The first time the stream reports
    ``expected_state``, a follow-up SendMessage carrying "chartreuse" is sent
    for the same task. The last artifact text seen must contain that word.

    Args:
        c: HTTP client.
        auth: Whether to request the auth variant of the tool call.
        expected_state: Task state that signals the pause.
    """
    prompt = "Please provide your bearer token." if auth else "What is your favorite color?"
    auth_flag = ", auth: true" if auth else ""
    tool_call = (
        f"Call the wait_input tool with prompt='{prompt}'{auth_flag}"
        ". Once you receive the response, echo it back verbatim as the final reply."
    )
    message = {
        "messageId": str(uuid.uuid4()),
        "role": "user",
        "parts": [{"type": "text", "text": tool_call}],
    }
    request = {"jsonrpc": "2.0", "id": "w1", "method": "SendStreamingMessage",
               "params": {"message": message}}

    saw_state = False  # also means "already resumed": the reply is sent only once
    artifact_text = None
    tid = None
    async with c.stream("POST", RPC, json=request, headers={"Accept": "text/event-stream"}) as resp:
        async for raw in resp.aiter_lines():
            if not raw.startswith("data:"):
                continue
            frame = json.loads(raw[5:].strip()).get("result", {})
            tid = frame.get("taskId") or tid
            kind = frame.get("kind")
            if kind == "artifact-update":
                artifact_text = frame["artifact"]["parts"][0]["text"]
            elif kind == "status-update":
                state = frame["status"]["state"]
                if state == expected_state and not saw_state:
                    saw_state = True
                    answer = {
                        "messageId": str(uuid.uuid4()),
                        "taskId": tid,
                        "role": "user",
                        "parts": [{"type": "text", "text": "chartreuse"}],
                    }
                    await rpc(c, "SendMessage", {"message": answer}, rpc_id="r1")
            if frame.get("final"):
                break

    assert saw_state, f"never observed {expected_state}"
    assert artifact_text and "chartreuse" in artifact_text, \
        f"artifact text missing resumed value: {artifact_text!r}"
    return f"{expected_state} observed + artifact carried 'chartreuse'"
async def t_wait_input(c):
    """Run the pause/resume round trip for the plain input-required case."""
    summary = await _wait_input_round_trip(
        c,
        auth=False,
        expected_state="TASK_STATE_INPUT_REQUIRED",
    )
    return summary
async def t_wait_auth(c):
    """Run the pause/resume round trip for the auth-required case."""
    summary = await _wait_input_round_trip(
        c,
        auth=True,
        expected_state="TASK_STATE_AUTH_REQUIRED",
    )
    return summary
async def _probe_auth_mode(c: httpx.AsyncClient) -> bool:
    """Return True when an unauthenticated GetExtendedAgentCard call is answered with 401."""
    probe = {"jsonrpc": "2.0", "id": "probe", "method": "GetExtendedAgentCard", "params": {}}
    response = await c.post(RPC, json=probe)
    return response.status_code == 401
async def t_auth_rejects_no_token(c):
    """A request without any credentials must be answered with HTTP 401."""
    request = {"jsonrpc": "2.0", "id": "a1", "method": "GetExtendedAgentCard", "params": {}}
    response = await c.post(RPC, json=request)
    assert response.status_code == 401, f"expected 401, got {response.status_code}"
    return "401 returned"
async def t_auth_accepts_valid_bearer(c):
    """A request with the bearer token from A2A_BEARER_TOKEN must be answered with 200.

    Raises KeyError if the environment variable is not set.
    """
    token = os.environ["A2A_BEARER_TOKEN"]
    auth_headers = {"Authorization": f"Bearer {token}"}
    request = {"jsonrpc": "2.0", "id": "a2", "method": "GetExtendedAgentCard", "params": {}}
    response = await c.post(RPC, headers=auth_headers, json=request)
    assert response.status_code == 200, f"expected 200, got {response.status_code}: {response.text[:200]}"
    return "200 with valid bearer"
async def t_auth_rejects_bad_bearer(c):
    """A request with a made-up bearer token must be answered with HTTP 401."""
    bogus_headers = {"Authorization": "Bearer NOT-A-REAL-TOKEN"}
    request = {"jsonrpc": "2.0", "id": "a3", "method": "GetExtendedAgentCard", "params": {}}
    response = await c.post(RPC, headers=bogus_headers, json=request)
    assert response.status_code == 401, f"expected 401, got {response.status_code}"
    return "401 with invalid bearer"
async def t_auth_accepts_valid_apikey(c):
    """A request with the key from A2A_API_KEY in X-API-Key must be answered with 200.

    Reports a skip (without failing) when A2A_API_KEY is unset or empty.
    """
    key = os.environ.get("A2A_API_KEY")
    if key:
        request = {"jsonrpc": "2.0", "id": "a4", "method": "GetExtendedAgentCard", "params": {}}
        response = await c.post(RPC, headers={"X-API-Key": key}, json=request)
        assert response.status_code == 200, f"expected 200, got {response.status_code}"
        return "200 with valid X-API-Key"
    return "skipped — A2A_API_KEY not set"
# Functional test registry: (display name, coroutine function taking the HTTP client).
# main() runs these in list order when the gateway does not enforce auth.
TESTS = [
("agent_card", t_agent_card),
("extended_agent_card", t_extended_agent_card),
("send_message_text", t_send_message_text),
("send_message_raw_image", t_send_message_with_raw_image),
("send_message_data_part", t_send_message_with_data_part),
("send_message_all_file_types", t_send_message_all_file_types),
("streaming", t_streaming),
("list_tasks", t_list_tasks),
("get_task", t_get_task),
("get_task_not_found", t_get_task_not_found),
("subscribe_to_task_unknown", t_subscribe_to_task_unknown),
("cancel_task", t_cancel_task),
("push_crud", t_push_crud),
("push_delivery", t_push_delivery),
("wait_input", t_wait_input),
("wait_auth", t_wait_auth),
]
# Auth-only registry, same tuple shape. main() runs these only when the
# unauthenticated probe gets a 401 and A2A_BEARER_TOKEN is set.
AUTH_TESTS = [
("auth_rejects_no_token", t_auth_rejects_no_token),
("auth_accepts_valid_bearer", t_auth_accepts_valid_bearer),
("auth_rejects_bad_bearer", t_auth_rejects_bad_bearer),
("auth_accepts_valid_apikey", t_auth_accepts_valid_apikey),
]
async def main():
    """Run the suite against the gateway and exit with 0 on success, 1 on any failure.

    Starts the webhook receiver, probes whether the gateway enforces auth,
    then either runs the functional tests (no auth) or marks them skipped and
    runs the auth tests (auth enforced, and only if A2A_BEARER_TOKEN is set).
    Skipped functional tests are counted as passed in the summary.
    """
    start_webhook_server()
    # Brief pause so the receiver thread is serving before tests start.
    await asyncio.sleep(0.3)

    async def run_and_record(client, cases):
        # Run each case in order; any exception becomes a recorded failure.
        for label, test_fn in cases:
            try:
                outcome = await test_fn(client)
                record(label, True, outcome)
            except Exception as exc:
                record(label, False, f"{type(exc).__name__}: {exc}")

    async with httpx.AsyncClient(timeout=180) as c:
        auth_enforced = await _probe_auth_mode(c)
        if not auth_enforced:
            await run_and_record(c, TESTS)
        else:
            skip_note = "skipped — auth mode (use without RSCLAW_A2A_* env)"
            for label, _ in TESTS:
                print(f"[SKIP] {label} — {skip_note}")
                results.append((label, True, skip_note))
            print()
            print("=== gateway in auth-enforced mode; running auth-only tests ===")
            if "A2A_BEARER_TOKEN" in os.environ:
                await run_and_record(c, AUTH_TESTS)
            else:
                print("[SKIP] auth tests — set A2A_BEARER_TOKEN to a value present in "
                      "RSCLAW_A2A_BEARER_TOKENS on the gateway side")

    print()
    passed = sum(1 for _, ok, _ in results if ok)
    failed = len(results) - passed
    print(f"=== {passed}/{len(results)} passed, {failed} failed ===")
    sys.exit(1 if failed else 0)
# Run the suite only when executed as a script, so that importing this module
# (e.g. to reuse a helper or a fixture) does not start tests and call sys.exit.
if __name__ == "__main__":
    asyncio.run(main())