rsclaw 2026.5.20

AI Agent Engine Compatible with OpenClaw
# A2A v1.0 Python SDK Interop Harness

Manual verification that the rsclaw gateway speaks A2A v1.0 to Google's
official Python SDK.

## Setup

```bash
pip install a2a-sdk
# Or, while v1.0 is on the bleeding edge:
pip install git+https://github.com/google-a2a/a2a-python.git
```

## Running the gateway

```bash
cargo run -- gateway start --port 18888
```

Optional auth — set BEFORE starting the gateway:

```bash
export RSCLAW_A2A_BEARER_TOKENS="dev-token-1,dev-token-2"
# or:
export RSCLAW_A2A_API_KEYS="k-1,k-2"
```

## Test 1: Agent Card discovery

```python
import asyncio
import httpx

async def main():
    async with httpx.AsyncClient() as c:
        r = await c.get("http://localhost:18888/.well-known/agent.json")
        card = r.json()
    assert card["protocolVersion"] == "1.0"
    assert card["capabilities"]["streaming"] is True
    assert card["capabilities"]["pushNotifications"] is True
    assert card["capabilities"]["extendedAgentCard"] is True
    assert "bearer" in card["securitySchemes"]
    assert "apiKey" in card["securitySchemes"]
    print("OK:", card["name"], "skills:", len(card["skills"]))

asyncio.run(main())
```

## Test 2: SendMessage (synchronous)

```python
import asyncio, httpx, uuid

async def main():
    body = {
        "jsonrpc": "2.0",
        "id": "t1",
        "method": "SendMessage",
        "params": {
            "message": {
                "messageId": str(uuid.uuid4()),
                "role": "ROLE_USER",
                "parts": [{"type": "text", "text": "hi"}],
            }
        },
    }
    async with httpx.AsyncClient(timeout=120) as c:
        r = await c.post("http://localhost:18888/api/v1/a2a", json=body)
        result = r.json()["result"]
    assert result["status"]["state"] == "TASK_STATE_COMPLETED"
    print("task_id:", result["id"], "→", result["artifacts"][0]["parts"][0]["text"])

asyncio.run(main())
```

## Test 3: SendStreamingMessage (SSE)

```python
import asyncio, httpx, uuid, json

async def main():
    body = {
        "jsonrpc": "2.0",
        "id": "s1",
        "method": "SendStreamingMessage",
        "params": {
            "message": {
                "messageId": str(uuid.uuid4()),
                "role": "ROLE_USER",
                "parts": [{"type": "text", "text": "count to 3"}],
            }
        },
    }
    headers = {"Accept": "text/event-stream"}
    async with httpx.AsyncClient(timeout=120) as c:
        async with c.stream("POST", "http://localhost:18888/api/v1/a2a",
                            json=body, headers=headers) as r:
            async for line in r.aiter_lines():
                if line.startswith("data:"):
                    frame = json.loads(line[5:].strip())
                    res = frame.get("result", {})
                    print(res.get("kind"), res.get("status", {}).get("state"))
                    if res.get("final"):
                        break

asyncio.run(main())
```

## Test 4: GetTask / ListTasks / CancelTask

```python
import asyncio, httpx

async def main():
    async with httpx.AsyncClient(timeout=30) as c:
        listed = (await c.post("http://localhost:18888/api/v1/a2a", json={
            "jsonrpc": "2.0", "id": "l1", "method": "ListTasks",
            "params": {"pageSize": 5}
        })).json()
        tasks = listed["result"]["tasks"]
        if not tasks:
            print("no tasks yet — send one first via Test 2")
            return
        tid = tasks[0]["id"]
        got = (await c.post("http://localhost:18888/api/v1/a2a", json={
            "jsonrpc": "2.0", "id": "g1", "method": "GetTask",
            "params": {"id": tid}
        })).json()
        print(got["result"]["status"])

asyncio.run(main())
```

## Test 5: Push notification

Start a local sink first:

```bash
# Terminal A — echo every webhook to stdout
python -m http.server 9000 &
```

```python
import asyncio, httpx, uuid

async def main():
    tid = str(uuid.uuid4())
    async with httpx.AsyncClient() as c:
        # 1) Register a push config
        await c.post("http://localhost:18888/api/v1/a2a", json={
            "jsonrpc": "2.0", "id": "p1",
            "method": "CreateTaskPushNotificationConfig",
            "params": {
                "taskId": tid,
                "pushNotificationConfig": {
                    "id": "cfg-1",
                    "url": "http://localhost:9000/hook",
                    "token": "dev-secret",
                }
            }
        })
        # 2) Send a streaming task with that taskId.
        body = {
            "jsonrpc": "2.0", "id": "p2",
            "method": "SendStreamingMessage",
            "params": {
                "message": {
                    "messageId": str(uuid.uuid4()),
                    "role": "ROLE_USER",
                    "parts": [{"type": "text", "text": "hello"}],
                    "taskId": tid,
                }
            }
        }
        # ...stream the response and watch the python http.server log
        # for POSTs from the gateway with X-A2A-Signature: <HMAC>.

asyncio.run(main())
```

## Test 6: INPUT_REQUIRED suspend / resume (`wait_input`)

Requires a working LLM with tool-use support (DeepSeek, Qwen, Claude,
GPT, etc.). Verifies the `wait_input` built-in tool and the same-`taskId`
resume short-path end-to-end.

```python
import asyncio, httpx, json, uuid

BASE = "http://localhost:18888/api/v1/a2a"

async def main():
    # 1) Stream a turn that forces the LLM to call wait_input.
    body = {
        "jsonrpc": "2.0", "id": "w1",
        "method": "SendStreamingMessage",
        "params": {"message": {
            "messageId": str(uuid.uuid4()),
            "role": "ROLE_USER",
            "parts": [{"type": "text",
                "text": "Use the wait_input tool to ask the user for their "
                        "favorite color. Echo their answer back as the final "
                        "reply, no other text."}],
        }},
    }
    headers = {"Accept": "text/event-stream"}
    task_id = None
    seen_input_required = False
    artifact_text = None

    async with httpx.AsyncClient(timeout=120) as c:
        async with c.stream("POST", BASE, json=body, headers=headers) as r:
            async for line in r.aiter_lines():
                if not line.startswith("data:"): continue
                frame = json.loads(line[5:].strip())
                res = frame.get("result", {})
                if res.get("kind") == "status-update":
                    state = res["status"]["state"]
                    print("status:", state)
                    task_id = res["taskId"]
                    if state == "TASK_STATE_INPUT_REQUIRED" and not seen_input_required:
                        seen_input_required = True
                        # 2) Resume via SendMessage on the SAME taskId.
                        resume = await c.post(BASE, json={
                            "jsonrpc": "2.0", "id": "r1",
                            "method": "SendMessage",
                            "params": {"message": {
                                "messageId": str(uuid.uuid4()),
                                "taskId": task_id,         # ← same id triggers resume
                                "role": "ROLE_USER",
                                "parts": [{"type": "text", "text": "chartreuse"}],
                            }},
                        })
                        print("resume RPC ok:", resume.status_code)
                elif res.get("kind") == "artifact-update":
                    artifact_text = res["artifact"]["parts"][0]["text"]
                if res.get("final"): break

    assert seen_input_required, "expected TASK_STATE_INPUT_REQUIRED frame"
    assert artifact_text and "chartreuse" in artifact_text, \
        f"expected resumed text in final artifact, got: {artifact_text!r}"
    print("OK — wait_input resumed and final artifact contained the answer")

asyncio.run(main())
```

Expected SSE timeline:

```
SUBMITTED
WORKING
WORKING  (message: "calling tool wait_input")
INPUT_REQUIRED  (message: "What is your favorite color?")
   ↳ client POSTs SendMessage with same taskId carrying "chartreuse"
artifact-update  (parts[0].text contains "chartreuse")
COMPLETED  (final: true)
```

## Expected outcomes

- All 6 tests run without protocol errors
- Agent Card shows `protocolVersion: "1.0"` and all 3 capabilities `true`
- SendMessage returns `TASK_STATE_COMPLETED` with an artifact
- SendStreamingMessage delivers at least one `status-update` event with `final: true`
- Push sink receives signed POSTs with `X-A2A-Signature` and `X-A2A-Task-Id` headers
- `wait_input` round-trip: INPUT_REQUIRED observed → resume RPC succeeds → final artifact contains the resumed text

## Recording

Record outcomes (date, SDK version, pass/fail per test, any wire mismatches)
in `docs/a2a-interop.md` when you run them.