rsclaw 2026.5.20

AI Agent Engine Compatible with OpenClaw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
#!/usr/bin/env python3
"""A2A v1.0 end-to-end test matrix against a live rsclaw gateway.

Companion to `tests/a2a_interop_python.md` (which documents individual
tests in prose). This file runs the whole matrix below in one shot and
exits non-zero on any failure — useful as a regression check before
landing A2A-touching changes.

Coverage:
  AgentCard discovery
  GetExtendedAgentCard
  SendMessage          (text only)
  SendMessage          (with raw image part → verifies workspace/a2a/images/)
  SendMessage          (with data part → verifies history preserves Data part)
  SendMessage          (one raw part per file kind → verifies each lands in
                        workspace/a2a/{images,videos,audios,docs,files}/)
  SendStreamingMessage (text)
  ListTasks            (pageSize=5; checks a `tasks` list comes back)
  GetTask              (success + not-found)
  SubscribeToTask      (nonexistent → terminal Failed + stream closes)
  CancelTask           (in-flight → CANCELED state persists, not flipped)
  Push notification    (Create/Get/List/Delete + HMAC-verified delivery
                        across status-update + artifact-update frames)
  wait_input           (INPUT_REQUIRED suspend/resume round-trip)
  wait_input(auth=true) (AUTH_REQUIRED variant)
  Auth mode            (only when the gateway answers 401 to an
                        unauthenticated call and A2A_BEARER_TOKEN is set:
                        no token / valid bearer / bad bearer / X-API-Key)

Each test prints `[PASS]` / `[FAIL]` and a one-line reason. Exit code
matches: 0 if all green, 1 otherwise.

Prerequisites
-------------
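1. A running rsclaw gateway. The default profile setup:

       mkdir -p ~/.rsclaw-a2atest
       cp ~/.rsclaw/rsclaw.json5 ~/.rsclaw-a2atest/rsclaw.json5
       target/debug/rsclaw --profile a2atest gateway run

   gives you an isolated instance on port 19074 sharing your providers.
   Override with `A2A_BASE=http://host:port` if you point elsewhere.
   The file-ingest tests read `~/.rsclaw-a2atest/workspace` directly, so
   they assume the gateway runs locally under this profile.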

2. A provider with tool-use support is required for `wait_input` /
   `wait_auth` (verified against deepseek/qwen). The non-wait_input
   tests pass even on a rate-limited provider since they only check
   protocol-level behavior.

3. `pip install httpx`.

Run
---
       python3 tests/a2a_e2e_runner.py
"""
import asyncio, base64, hashlib, hmac, json, os, sys, time, uuid
import http.server, socketserver, threading
import httpx

# Gateway under test. A trailing "/" is stripped so RPC never ends up with a
# double slash ("...:19074//api/v1/a2a").
BASE = os.environ.get("A2A_BASE", "http://127.0.0.1:19074").rstrip("/")
RPC = f"{BASE}/api/v1/a2a"
# Local listener for the push-notification delivery test. Override with
# A2A_WEBHOOK_PORT when 8902 is already taken on this machine.
WEBHOOK_PORT = int(os.environ.get("A2A_WEBHOOK_PORT", "8902"))
# Shared HMAC key registered as the push config "token"; deliveries are
# verified against it.
WEBHOOK_SECRET = "e2e-secret"
# Appended to by the webhook listener thread: one dict per received POST.
WEBHOOK_HITS: list[dict] = []

PASS, FAIL = "[PASS]", "[FAIL]"
# (test-name, passed?, message) for every recorded outcome.
results: list[tuple[str, bool, str]] = []


def record(name: str, ok: bool, msg: str = ""):
    """Store one test outcome in `results` and print a one-line verdict.

    Prints "[PASS] name — msg" or "[FAIL] name — msg"; the " — msg" suffix
    is omitted when *msg* is empty.
    """
    results.append((name, ok, msg))
    verdict = PASS if ok else FAIL
    # Separator keeps the test name and its message readable.
    suffix = f" — {msg}" if msg else ""
    print(f"{verdict} {name}{suffix}")


def start_webhook_server():
    """Start a background HTTP listener that records push-notification POSTs.

    Every POST to 127.0.0.1:WEBHOOK_PORT (any path) appends a dict with the
    raw body and the `X-A2A-Signature` / `X-A2A-Task-Id` header values to
    WEBHOOK_HITS, then replies 200 "ok". The server loop runs on a daemon
    thread.

    Returns the server object so a caller can `shutdown()` it; callers that
    ignore the return value are unaffected.
    """
    class H(http.server.BaseHTTPRequestHandler):
        def do_POST(self):
            ln = int(self.headers.get("Content-Length") or 0)
            body = self.rfile.read(ln) if ln else b""
            WEBHOOK_HITS.append({
                "sig": self.headers.get("X-A2A-Signature", ""),
                "tid": self.headers.get("X-A2A-Task-Id", ""),
                "body": body,
            })
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"ok")

        def log_message(self, *a, **k):
            # Silence the default per-request stderr logging.
            pass

    class _ReusableServer(socketserver.TCPServer):
        # SO_REUSEADDR: lets a quick re-run bind the port while sockets from
        # the previous run are still in TIME_WAIT ("Address already in use").
        allow_reuse_address = True

    srv = _ReusableServer(("127.0.0.1", WEBHOOK_PORT), H)
    threading.Thread(target=srv.serve_forever, daemon=True).start()
    return srv


async def rpc(c: httpx.AsyncClient, method: str, params: dict, rpc_id: str = "1") -> dict:
    """POST a single JSON-RPC 2.0 request to RPC and return the decoded reply.

    Raises on a non-2xx HTTP status (via `raise_for_status`); JSON-RPC level
    errors are not raised and come back in the returned dict under "error".
    """
    envelope = {"jsonrpc": "2.0", "id": rpc_id, "method": method, "params": params}
    response = await c.post(RPC, json=envelope)
    response.raise_for_status()
    return response.json()


# 1x1 transparent PNG.
PNG_B64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNkYAAAAAYAAjCB0C8AAAAASUVORK5CYII="

# Minimal fixtures for the multi-part file-transfer test. None of these has to
# be valid for the agent to *process* — the test only asserts ingest routing:
# bytes land in the right `workspace/a2a/<bucket>/` directory with the right
# canonical filename prefix (`a2a_<kind>_…`). Real validity matters for
# downstream OCR / video / audio pipelines, not for this protocol-layer test.
import base64 as _b64  # alias of the already-imported `base64` module (same object).
_MP4 = b"\x00\x00\x00\x18ftypmp42" + b"\x00" * 40
_MP3 = b"\xff\xfb\x90\x00" + b"\x00" * 60
_PDF = (
    b"%PDF-1.4\n%\xe2\xe3\xcf\xd3\n1 0 obj<<>>endobj\n"
    b"xref\n0 1\n0000000000 65535 f\ntrailer<<>>\n%%EOF\n"
)
_BIN = b"\x42" + b"\x99" * 64 + b"\x00ENDOFTEST"
# Pre-encoded forms of the fixtures. The tests below do not use these; they
# base64-encode the FILE_FIXTURES bytes at send time.
MP4_B64 = _b64.b64encode(_MP4).decode()
MP3_B64 = _b64.b64encode(_MP3).decode()
PDF_B64 = _b64.b64encode(_PDF).decode()
BIN_B64 = _b64.b64encode(_BIN).decode()
# One row per file kind (image, video, audio, doc, binary). Bucket names mirror
# `src/a2a/files.rs` (`workspace/a2a/{images,videos,audios,docs,files}`).
# Row layout (5 fields):
#   (logical-name, mime, raw-bytes, expected-bucket, expected-kind-letter)
# The bytes are base64-encoded when sent; their length is the size the
# ingested file is expected to have on disk.
FILE_FIXTURES = [
    ("snapshot.png", "image/png",                base64.b64decode(PNG_B64), "images", "i"),
    ("clip.mp4",     "video/mp4",                _MP4,                       "videos", "v"),
    ("voice.mp3",    "audio/mpeg",               _MP3,                       "audios", "a"),
    ("notes.pdf",    "application/pdf",          _PDF,                       "docs",   "d"),
    ("rawbytes.bin", "application/octet-stream", _BIN,                       "files",  "f"),
]


async def t_agent_card(c):
    """Fetch the well-known AgentCard and check the advertised v1.0 features.

    Asserts protocolVersion "1.0", the streaming / pushNotifications /
    extendedAgentCard capability flags, and both the "bearer" and "apiKey"
    security schemes. Returns a short summary with the skill count.
    """
    response = await c.get(f"{BASE}/.well-known/agent.json")
    card = response.json()
    version = card["protocolVersion"]
    assert version == "1.0", card.get("protocolVersion")
    caps = card["capabilities"]
    for flag in ("streaming", "pushNotifications", "extendedAgentCard"):
        assert caps[flag] is True
    schemes = card["securitySchemes"]
    for scheme in ("bearer", "apiKey"):
        assert scheme in schemes
    return f"v{card['protocolVersion']} {len(card['skills'])} skill(s)"


async def t_extended_agent_card(c):
    """Call GetExtendedAgentCard over JSON-RPC and check it reports v1.0."""
    card = (await rpc(c, "GetExtendedAgentCard", {}))["result"]
    assert card["protocolVersion"] == "1.0", card
    return "ok"


async def t_send_message_text(c):
    """Send a plain-text SendMessage and summarise the resulting task.

    The task must end COMPLETED or FAILED. FAILED is tolerated (reported as
    "likely LLM" with the first 60 chars of the status message text);
    otherwise the first 40 chars of the first artifact's text are returned.
    """
    message = {"messageId": str(uuid.uuid4()), "role": "user",
               "parts": [{"type": "text", "text": "reply with: ack"}]}
    task = (await rpc(c, "SendMessage", {"message": message}))["result"]
    status = task["status"]
    state = status["state"]
    assert state in ("TASK_STATE_COMPLETED", "TASK_STATE_FAILED"), status
    if state == "TASK_STATE_FAILED":
        reason = status.get("message", {}).get("parts", [{}])[0].get("text", "?")
        return f"FAILED (likely LLM): {reason[:60]}"
    first_text = task["artifacts"][0]["parts"][0]["text"]
    return f"completed: {first_text[:40]!r}"


async def t_send_message_with_raw_image(c):
    """Send a PNG as a Raw part and verify ingest wrote a NEW image file.

    Ingest happens before the agent runs, so the file should land in
    `workspace/a2a/images/` even if the LLM call fails. The directory is
    listed before and after the request, so a leftover `a2a_i_*.png` from an
    earlier run cannot satisfy the check.

    The workspace path assumes the default `a2atest` profile on this
    machine. If the images directory does not exist after the call, the test
    soft-passes with a "no a2a/images dir" note (same as before).
    """
    ws = os.path.expanduser("~/.rsclaw-a2atest/workspace")
    img_dir = os.path.join(ws, "a2a", "images")

    def a2a_pngs():
        # Canonical ingest names for images: a2a_i_<...>.png
        if not os.path.isdir(img_dir):
            return set()
        return {f for f in os.listdir(img_dir)
                if f.startswith("a2a_i_") and f.endswith(".png")}

    before = a2a_pngs()
    res = await rpc(c, "SendMessage", {
        "message": {
            "messageId": str(uuid.uuid4()),
            "role": "user",
            "parts": [
                {"type": "text", "text": "describe the image"},
                {"type": "raw", "bytes": PNG_B64, "mimeType": "image/png"},
            ],
        }})
    r = res["result"]
    if not os.path.isdir(img_dir):
        return f"no a2a/images dir (status={r['status']['state']})"
    # Sorted so the reported name is deterministic (names embed a timestamp).
    new = sorted(a2a_pngs() - before)
    assert new, f"no new a2a_i_*.png in {img_dir}"
    return f"wrote {new[-1]}, status={r['status']['state']}"


async def t_send_message_with_data_part(c):
    """Send text + structured-data parts and check both appear in history.

    Only `history[0].parts` is inspected, so the outcome does not depend on
    whether the LLM call succeeds.
    """
    outgoing = [
        {"type": "text", "text": "parse the data"},
        {"type": "data", "data": {"key": "v1", "n": 42}},
    ]
    res = await rpc(c, "SendMessage", {
        "message": {"messageId": str(uuid.uuid4()), "role": "user", "parts": outgoing}})
    task = res["result"]
    part_types = [part["type"] for part in task["history"][0]["parts"]]
    assert "text" in part_types and "data" in part_types, part_types
    return f"history parts={part_types}, status={task['status']['state']}"


async def t_send_message_all_file_types(c):
    """Send one SendMessage with a raw part per FILE_FIXTURES row; audit ingest.

    Each fixture must produce a new file in its own `workspace/a2a/<bucket>/`
    directory. That file's size must equal the fixture's byte length and its
    name must start with the canonical `a2a_<kind>_` prefix. A new file is
    credited to at most one fixture, so two fixtures sharing a bucket cannot
    both match the same file.

    The agent's reply itself is irrelevant for this test — ingest runs
    BEFORE the LLM call, so the file write is testable even with a
    rate-limited / offline provider.
    """
    ws = os.path.expanduser("~/.rsclaw-a2atest/workspace")

    def snap():
        # {bucket: {filename: size}} for each bucket named in FILE_FIXTURES;
        # a bucket directory that does not exist maps to {}.
        out = {}
        for _, _, _, bucket, _ in FILE_FIXTURES:
            p = os.path.join(ws, "a2a", bucket)
            out[bucket] = (
                {f: os.path.getsize(os.path.join(p, f)) for f in os.listdir(p)}
                if os.path.isdir(p)
                else {}
            )
        return out

    before = snap()
    parts = [{"type": "text",
              "text": "Five attachments follow — just acknowledge receipt."}]
    for name, mime, data, _bucket, _kind in FILE_FIXTURES:
        parts.append({
            "type": "raw",
            "bytes": base64.b64encode(data).decode(),
            "mimeType": mime,
            "name": name,
        })
    res = await rpc(c, "SendMessage", {
        "message": {"messageId": str(uuid.uuid4()), "role": "user", "parts": parts}})
    state = res["result"]["status"]["state"]
    after = snap()

    failures = []
    matched = []
    claimed = set()  # (bucket, filename) pairs already credited to a fixture
    for name, _mime, data, bucket, kind in FILE_FIXTURES:
        new = sorted(set(after[bucket]) - set(before[bucket]))
        hit = next(
            (fname for fname in new
             if (bucket, fname) not in claimed
             and after[bucket][fname] == len(data)
             and fname.startswith(f"a2a_{kind}_")),
            None,
        )
        if hit:
            claimed.add((bucket, hit))
            matched.append(f"{bucket}:{hit}")
        else:
            failures.append(f"{bucket}/{name} not ingested (new files: {new})")

    assert not failures, "; ".join(failures)
    buckets = {bucket for _, _, _, bucket, _ in FILE_FIXTURES}
    return f"{len(buckets)} buckets, all {len(matched)} files ingested (status={state})"


async def t_streaming(c):
    """Stream a short SendStreamingMessage and check the status sequence.

    Collects the state of every `status-update` SSE frame until one has
    `final` set (or the stream ends). Expects SUBMITTED first, WORKING
    second, and COMPLETED or FAILED last.
    """
    body = {"jsonrpc": "2.0", "id": "s1", "method": "SendStreamingMessage",
            "params": {"message": {"messageId": str(uuid.uuid4()), "role": "user",
                                   "parts": [{"type": "text", "text": "reply ack"}]}}}
    states = []
    async with c.stream("POST", RPC, json=body, headers={"Accept": "text/event-stream"}) as r:
        # A non-2xx reply has no SSE frames; fail with the HTTP status instead.
        r.raise_for_status()
        async for line in r.aiter_lines():
            if not line.startswith("data:"):
                continue
            f = json.loads(line[5:].strip()).get("result", {})
            if f.get("kind") == "status-update":
                states.append(f["status"]["state"])
            if f.get("final"):
                break
    # Check the length first so a short stream fails with a readable message
    # rather than an IndexError.
    assert len(states) >= 2, f"expected >= 2 status updates, got {states}"
    assert states[0] == "TASK_STATE_SUBMITTED", states
    assert states[1] == "TASK_STATE_WORKING", states
    assert states[-1] in ("TASK_STATE_COMPLETED", "TASK_STATE_FAILED"), states
    return f"{len(states)} events: {states[0]} → {states[-1]}"


async def t_list_tasks(c):
    """Call ListTasks with pageSize=5 and report how many tasks came back."""
    page = (await rpc(c, "ListTasks", {"pageSize": 5}))["result"]
    assert "tasks" in page, page
    return f"{len(page['tasks'])} task(s)"


async def t_get_task(c):
    """Create a task via SendMessage, then fetch it back with GetTask.

    Returns the first 8 characters of the task id on success.
    """
    created = await rpc(c, "SendMessage", {
        "message": {"messageId": str(uuid.uuid4()), "role": "user",
                    "parts": [{"type": "text", "text": "hi"}]}})
    task_id = created["result"]["id"]
    fetched = (await rpc(c, "GetTask", {"id": task_id}))["result"]
    assert fetched["id"] == task_id
    return f"id={task_id[:8]}"


async def t_get_task_not_found(c):
    """GetTask on an unknown id must return JSON-RPC error code -32001."""
    reply = await rpc(c, "GetTask", {"id": "definitely-does-not-exist"})
    assert "error" in reply, reply
    err = reply["error"]
    assert err["code"] == -32001, err
    return "correct -32001"


async def t_subscribe_to_task_unknown(c):
    """SubscribeToTask on a nonexistent id should yield exactly one FAILED frame.

    Records the status state of every SSE `data:` frame until one has
    `final` set (or the stream ends), then expects ["TASK_STATE_FAILED"].
    """
    request = {"jsonrpc": "2.0", "id": "sub1", "method": "SubscribeToTask",
               "params": {"id": "does-not-exist-either"}}
    seen = []
    async with c.stream("POST", RPC, json=request, headers={"Accept": "text/event-stream"}) as resp:
        async for raw in resp.aiter_lines():
            if raw.startswith("data:"):
                frame = json.loads(raw[5:].strip()).get("result", {})
                seen.append(frame.get("status", {}).get("state"))
                if frame.get("final"):
                    break
    assert seen == ["TASK_STATE_FAILED"], seen
    return "terminated cleanly"


async def t_cancel_task(c):
    """Race a streaming task against CancelTask; the cancellation must stick.

    Starts a long SendStreamingMessage in a background asyncio task and polls
    (up to ~3 s) for the first frame carrying a taskId. It then cancels that
    task, waits for the stream to finish, and checks that GetTask still
    reports CANCELED (not flipped to COMPLETED by a late reply).

    If the background stream is still running on the way out (e.g. an
    assertion failed), it is cancelled. Otherwise the SSE connection and a
    pending asyncio task would leak into later tests.
    """
    sse_body = {"jsonrpc": "2.0", "id": "sx", "method": "SendStreamingMessage",
                "params": {"message": {"messageId": str(uuid.uuid4()), "role": "user",
                                       "parts": [{"type": "text", "text": "count to 100 slowly"}]}}}
    tid_holder = {"tid": None}

    async def stream():
        # Consume SSE frames, remembering the most recent non-empty taskId.
        async with c.stream("POST", RPC, json=sse_body, headers={"Accept": "text/event-stream"}) as r:
            async for line in r.aiter_lines():
                if not line.startswith("data:"):
                    continue
                f = json.loads(line[5:].strip()).get("result", {})
                tid_holder["tid"] = f.get("taskId") or tid_holder["tid"]
                if f.get("final"):
                    return

    stream_task = asyncio.create_task(stream())
    try:
        for _ in range(60):
            if tid_holder["tid"] or stream_task.done():
                break
            await asyncio.sleep(0.05)
        if not tid_holder["tid"] and stream_task.done():
            # Re-raise whatever ended the stream early (HTTP/JSON error)
            # instead of hiding it behind the generic assertion below.
            stream_task.result()
        assert tid_holder["tid"], "no taskId observed"
        cancel = await rpc(c, "CancelTask", {"id": tid_holder["tid"]})
        assert cancel["result"]["status"]["state"] == "TASK_STATE_CANCELED", cancel
        await stream_task
    finally:
        if not stream_task.done():
            stream_task.cancel()
            try:
                await stream_task
            except asyncio.CancelledError:
                pass
    await asyncio.sleep(0.5)
    got = await rpc(c, "GetTask", {"id": tid_holder["tid"]})
    assert got["result"]["status"]["state"] == "TASK_STATE_CANCELED", got["result"]["status"]
    return "CANCELED stuck"


async def t_push_crud(c):
    """Round-trip one push-notification config through Create/List/Get/Delete.

    Uses a fresh random task id and a config with id "c1". Delete passes as
    long as the reply carries no JSON-RPC "error".
    """
    task_id = "push-crud-" + str(uuid.uuid4())[:8]
    scoped = {"taskId": task_id, "pushNotificationConfigId": "c1"}

    created = await rpc(c, "CreateTaskPushNotificationConfig", {
        "taskId": task_id,
        "pushNotificationConfig": {"id": "c1", "url": "http://x/", "token": "t"},
    })
    assert created["result"]["id"] == "c1", created

    listed = await rpc(c, "ListTaskPushNotificationConfigs", {"taskId": task_id})
    assert any(cfg.get("id") == "c1" for cfg in listed["result"]["configs"]), listed

    fetched = await rpc(c, "GetTaskPushNotificationConfig", dict(scoped))
    assert fetched["result"]["url"] == "http://x/", fetched

    deleted = await rpc(c, "DeleteTaskPushNotificationConfig", dict(scoped))
    assert deleted.get("result") is not None or deleted.get("error") is None, deleted
    return "Create/List/Get/Delete all ok"


async def t_push_delivery(c):
    """Register a webhook, run a streaming task, and audit the deliveries.

    Expects at least three POSTs on the local listener. Each must carry in
    `X-A2A-Signature` the base64 HMAC-SHA256 of its raw body keyed with
    WEBHOOK_SECRET, and the task id in `X-A2A-Task-Id`. The status-update
    bodies must include SUBMITTED and WORKING. There must also be either an
    artifact-update or a COMPLETED/FAILED status.
    """
    WEBHOOK_HITS.clear()
    tid = "push-deliv-" + str(uuid.uuid4())[:8]
    created = await rpc(c, "CreateTaskPushNotificationConfig", {
        "taskId": tid,
        "pushNotificationConfig": {
            "id": "cfg-1",
            "url": f"http://127.0.0.1:{WEBHOOK_PORT}/hook",
            "token": WEBHOOK_SECRET,
        }})
    # Fail fast with the server's reason instead of a later "0 hit(s)".
    assert "error" not in created, created
    body = {"jsonrpc": "2.0", "id": "s1", "method": "SendStreamingMessage",
            "params": {"message": {"messageId": str(uuid.uuid4()), "role": "user",
                                   "parts": [{"type": "text", "text": "hi"}],
                                   "taskId": tid}}}
    async with c.stream("POST", RPC, json=body, headers={"Accept": "text/event-stream"}) as r:
        async for line in r.aiter_lines():
            if not line.startswith("data:"):
                continue
            f = json.loads(line[5:].strip()).get("result", {})
            if f.get("final"):
                break
    # Give any in-flight webhook deliveries a moment to land.
    await asyncio.sleep(1.0)
    # Snapshot: the listener thread may still append while we iterate.
    hits = list(WEBHOOK_HITS)
    assert len(hits) >= 3, f"only {len(hits)} hit(s)"
    kinds = []
    states = []
    for h in hits:
        expected = base64.b64encode(
            hmac.new(WEBHOOK_SECRET.encode(), h["body"], hashlib.sha256).digest()
        ).decode()
        assert h["sig"] == expected, f"HMAC mismatch: {h['sig']} vs {expected}"
        assert h["tid"] == tid, f"task id mismatch: {h['tid']}"
        d = json.loads(h["body"])
        kinds.append(d.get("kind"))
        if d.get("kind") == "status-update":
            states.append(d["status"]["state"])
    assert "TASK_STATE_SUBMITTED" in states and "TASK_STATE_WORKING" in states, states
    assert "artifact-update" in kinds or "TASK_STATE_COMPLETED" in states or "TASK_STATE_FAILED" in states, (kinds, states)
    # str(): a body without "kind" yields None, which str.join would reject.
    return f"{len(hits)} hits ({'/'.join(str(k) for k in kinds)}), HMAC all valid"


async def _wait_input_round_trip(c, auth: bool, expected_state: str):
    """Common driver for wait_input / wait_input(auth=true).

    Asks the agent, via a streaming SendMessage, to call its `wait_input`
    tool (adding `auth: true` when *auth* is set) and then echo the reply.
    The first time the stream reports *expected_state*, a follow-up
    SendMessage on the same task supplies "chartreuse" as the answer.

    Passes when *expected_state* was observed and the text of the
    artifact-update frames contains "chartreuse". Text from every artifact
    part is concatenated before the check, so an echo streamed across
    several chunks or parts is still found.
    """
    prompt = "What is your favorite color?" if not auth else "Please provide your bearer token."
    tool_call = (
        "Call the wait_input tool with prompt='" + prompt + "'"
        + (", auth: true" if auth else "")
        + ". Once you receive the response, echo it back verbatim as the final reply."
    )
    body = {"jsonrpc": "2.0", "id": "w1", "method": "SendStreamingMessage",
            "params": {"message": {"messageId": str(uuid.uuid4()), "role": "user",
                                   "parts": [{"type": "text", "text": tool_call}]}}}
    saw_state = False
    artifact_chunks: list[str] = []
    tid = None

    async with c.stream("POST", RPC, json=body, headers={"Accept": "text/event-stream"}) as r:
        async for line in r.aiter_lines():
            if not line.startswith("data:"):
                continue
            f = json.loads(line[5:].strip()).get("result", {})
            tid = f.get("taskId") or tid
            kind = f.get("kind")
            if kind == "status-update":
                # Resume exactly once, on the first frame in the expected state.
                if f["status"]["state"] == expected_state and not saw_state:
                    saw_state = True
                    assert tid, f"{expected_state} observed before any taskId"
                    await rpc(c, "SendMessage", {
                        "message": {
                            "messageId": str(uuid.uuid4()),
                            "taskId": tid,
                            "role": "user",
                            "parts": [{"type": "text", "text": "chartreuse"}],
                        }}, rpc_id="r1")
            elif kind == "artifact-update":
                for part in f["artifact"].get("parts", []):
                    artifact_chunks.append(part.get("text") or "")
            if f.get("final"):
                break

    saw_artifact_text = "".join(artifact_chunks)
    assert saw_state, f"never observed {expected_state}"
    assert "chartreuse" in saw_artifact_text, \
        f"artifact text missing resumed value: {saw_artifact_text!r}"
    return f"{expected_state} observed + artifact carried 'chartreuse'"


async def t_wait_input(c):
    """wait_input round-trip: expect INPUT_REQUIRED, resume, check the echo."""
    target_state = "TASK_STATE_INPUT_REQUIRED"
    return await _wait_input_round_trip(
        c,
        auth=False,
        expected_state=target_state,
    )


async def t_wait_auth(c):
    """wait_input(auth=true) round-trip: expect AUTH_REQUIRED, resume, check the echo."""
    target_state = "TASK_STATE_AUTH_REQUIRED"
    return await _wait_input_round_trip(
        c,
        auth=True,
        expected_state=target_state,
    )


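# ---------- auth-mode tests -------------------------------------------------
# Run only when the gateway answers 401 to an unauthenticated probe and
# A2A_BEARER_TOKEN is set in this process's environment.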

async def _probe_auth_mode(c: httpx.AsyncClient) -> bool:
    """Return True if the gateway enforces auth on the A2A JSON-RPC endpoint.

    Sends one unauthenticated GetExtendedAgentCard call. Only an HTTP 401
    counts as "auth enforced", matching what the auth-mode tests assert. Any
    other status returns False: 2xx in dev mode, or an unrelated error. In
    that case the normal protocol matrix runs and will surface the problem.
    """
    probe = {"jsonrpc": "2.0", "id": "probe", "method": "GetExtendedAgentCard", "params": {}}
    r = await c.post(RPC, json=probe)
    return r.status_code == 401


async def t_auth_rejects_no_token(c):
    """An unauthenticated GetExtendedAgentCard call must get HTTP 401."""
    payload = {"jsonrpc": "2.0", "id": "a1", "method": "GetExtendedAgentCard", "params": {}}
    resp = await c.post(RPC, json=payload)
    status = resp.status_code
    assert status == 401, f"expected 401, got {status}"
    return "401 returned"


async def t_auth_accepts_valid_bearer(c):
    """A request with the configured bearer token must get HTTP 200.

    Reads the token from A2A_BEARER_TOKEN; raises KeyError if it is unset.
    """
    token = os.environ["A2A_BEARER_TOKEN"]
    payload = {"jsonrpc": "2.0", "id": "a2", "method": "GetExtendedAgentCard", "params": {}}
    resp = await c.post(RPC, headers={"Authorization": f"Bearer {token}"}, json=payload)
    code = resp.status_code
    assert code == 200, f"expected 200, got {code}: {resp.text[:200]}"
    return "200 with valid bearer"


async def t_auth_rejects_bad_bearer(c):
    """A request with a bogus bearer token must get HTTP 401."""
    bogus = {"Authorization": "Bearer NOT-A-REAL-TOKEN"}
    payload = {"jsonrpc": "2.0", "id": "a3", "method": "GetExtendedAgentCard", "params": {}}
    resp = await c.post(RPC, headers=bogus, json=payload)
    code = resp.status_code
    assert code == 401, f"expected 401, got {code}"
    return "401 with invalid bearer"


async def t_auth_accepts_valid_apikey(c):
    """A request with the configured X-API-Key must get HTTP 200.

    Returns a "skipped" message, without contacting the gateway, when
    A2A_API_KEY is unset or empty.
    """
    api_key = os.environ.get("A2A_API_KEY")
    if api_key:
        payload = {"jsonrpc": "2.0", "id": "a4", "method": "GetExtendedAgentCard", "params": {}}
        resp = await c.post(RPC, headers={"X-API-Key": api_key}, json=payload)
        code = resp.status_code
        assert code == 200, f"expected 200, got {code}"
        return "200 with valid X-API-Key"
    return "skipped — A2A_API_KEY not set"


# ---------- runner ----------

# Protocol matrix, run in this order by main() when auth is not enforced.
TESTS = [
    # Discovery.
    ("agent_card",                  t_agent_card),
    ("extended_agent_card",         t_extended_agent_card),
    # SendMessage variants.
    ("send_message_text",           t_send_message_text),
    ("send_message_raw_image",      t_send_message_with_raw_image),
    ("send_message_data_part",      t_send_message_with_data_part),
    ("send_message_all_file_types", t_send_message_all_file_types),
    # Streaming and task lifecycle.
    ("streaming",                   t_streaming),
    ("list_tasks",                  t_list_tasks),
    ("get_task",                    t_get_task),
    ("get_task_not_found",          t_get_task_not_found),
    ("subscribe_to_task_unknown",   t_subscribe_to_task_unknown),
    ("cancel_task",                 t_cancel_task),
    # Push notifications.
    ("push_crud",                   t_push_crud),
    ("push_delivery",               t_push_delivery),
    # Interactive suspend/resume.
    ("wait_input",                  t_wait_input),
    ("wait_auth",                   t_wait_auth),
]


# Auth-only checks, run by main() instead of TESTS when the gateway enforces
# auth and A2A_BEARER_TOKEN is set.
AUTH_TESTS = [
    ("auth_rejects_no_token", t_auth_rejects_no_token),
    ("auth_accepts_valid_bearer", t_auth_accepts_valid_bearer),
    ("auth_rejects_bad_bearer", t_auth_rejects_bad_bearer),
    ("auth_accepts_valid_apikey", t_auth_accepts_valid_apikey),
]


async def _run_suite(c, suite):
    """Run each (name, async-test-fn) pair in *suite* and record PASS/FAIL.

    Any `Exception` a test raises (including a failed `assert`) is recorded as
    a failure with its type and message; the rest of the suite still runs.
    """
    for name, fn in suite:
        try:
            msg = await fn(c)
        except Exception as e:
            record(name, False, f"{type(e).__name__}: {e}")
        else:
            record(name, True, msg)


async def main():
    """Entry point: run the matrix against BASE and exit with its status.

    Starts the webhook listener, then probes the gateway. When it does not
    answer 401 to an unauthenticated call, every entry in TESTS runs. When it
    does, TESTS are skipped and AUTH_TESTS run instead, provided
    A2A_BEARER_TOKEN is set.

    Skipped tests are counted separately, so they never inflate the pass
    count. Exits 0 when nothing failed, 1 otherwise.
    """
    start_webhook_server()
    await asyncio.sleep(0.3)  # brief settle delay before the first request
    skipped = 0
    async with httpx.AsyncClient(timeout=180) as c:
        auth_enforced = await _probe_auth_mode(c)
        if not auth_enforced:
            await _run_suite(c, TESTS)
        else:
            for name, _fn in TESTS:
                print(f"[SKIP] {name} — auth mode (use without RSCLAW_A2A_* env)")
                skipped += 1
            print()
            print("=== gateway in auth-enforced mode; running auth-only tests ===")
            if "A2A_BEARER_TOKEN" not in os.environ:
                print("[SKIP] auth tests — set A2A_BEARER_TOKEN to a value present in "
                      "RSCLAW_A2A_BEARER_TOKENS on the gateway side")
                skipped += len(AUTH_TESTS)
            else:
                await _run_suite(c, AUTH_TESTS)
    print()
    passed = sum(1 for _, ok, _ in results if ok)
    failed = len(results) - passed
    print(f"=== {passed}/{len(results)} passed, {failed} failed, {skipped} skipped ===")
    sys.exit(0 if failed == 0 else 1)


# Only run when executed as a script, so importing this module (e.g. to reuse
# the helpers or fixtures) does not start the webhook server and hit the gateway.
if __name__ == "__main__":
    asyncio.run(main())