import asyncio
import json
import httpx
import sys
# Default A2A server base URL; used as the validation target when no URL is
# passed as the first command-line argument.
CLOUD_RUN_URL = "https://a2a-server-rust-fgasxpwzoq-uc.a.run.app"
# Discovery endpoints, tried in order until one answers HTTP 200.
_DISCOVERY_PATHS = ("/agent-card", "/")

# JSON-RPC error code the server is expected to return for an unknown task id.
_TASK_NOT_FOUND_CODE = -32001


async def _discover_agent(client: "httpx.AsyncClient", server_url: str):
    """Return the decoded body of the first discovery path answering 200, else None."""
    for path in _DISCOVERY_PATHS:
        print(f" Trying GET {path} ...")
        resp = await client.get(f"{server_url}{path}")
        if resp.status_code == 200:
            print(f" ✅ GET {path} success")
            return resp.json()
        print(f" ❌ GET {path} failed: {resp.status_code}")
    return None


async def _probe_json_rpc(client: "httpx.AsyncClient", server_url: str) -> bool:
    """POST a ``tasks/get`` for a made-up task id and report whether the endpoint answered 200."""
    task_id = "cloud-run-test-task-1"
    print(" Trying JSON-RPC tasks/get ...")
    rpc_payload = {
        "jsonrpc": "2.0",
        "method": "tasks/get",
        "params": {
            "id": task_id,
            "contextId": "cloud-run-validation-context",
        },
        "id": 2,
    }
    resp = await client.post(f"{server_url}/", json=rpc_payload)
    if resp.status_code != 200:
        print(f" ❌ JSON-RPC failed: {resp.status_code}")
        return False

    rpc_resp = resp.json()
    # Look the error code up defensively: the reply may be a result object, or
    # carry an "error" member that is not a mapping.
    error = rpc_resp.get("error") if isinstance(rpc_resp, dict) else None
    if isinstance(error, dict) and error.get("code") == _TASK_NOT_FOUND_CODE:
        print(" ✅ JSON-RPC reachable (Task not found as expected)")
    else:
        print(f" ✅ JSON-RPC reachable: {rpc_resp}")
    return True


async def test_a2a_cloud_run(server_url: str) -> bool:
    """Run a two-step smoke check against an A2A server and print the progress.

    Step 1 fetches the agent card (``GET /agent-card``, falling back to
    ``GET /``) and prints the agent name and skill names. Step 2 posts a
    JSON-RPC ``tasks/get`` request for a task id that is not expected to
    exist; any HTTP 200 reply counts as "reachable".

    Args:
        server_url: Base URL of the server, without a trailing slash.

    Returns:
        True when both steps succeed, False when a step fails or an
        exception is raised while talking to the server.
    """
    print(f"🧪 Validating A2A Server at: {server_url}")
    async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
        try:
            print("\n🔍 Step 1: Discovery...")
            info = await _discover_agent(client, server_url)
            if info is None:
                return False
            if not isinstance(info, dict):
                print(f" ❌ Agent card is not a JSON object: {type(info).__name__}")
                return False

            print(f"📋 Agent Name: {info.get('name')}")
            # Tolerate a missing/null "skills" member and non-object entries.
            skills = [
                s.get("name")
                for s in (info.get("skills") or [])
                if isinstance(s, dict)
            ]
            print(f"📚 Skills: {skills}")

            print("\n🧪 Step 2: Probing JSON-RPC endpoint...")
            if not await _probe_json_rpc(client, server_url):
                return False

            print("\n✅ Basic validation complete. Remote A2A agent is responsive.")
            return True
        except Exception as e:
            # Script-level boundary: report any transport or decoding failure
            # instead of a traceback. The exception type is included because
            # some exceptions carry an empty message.
            print(f"💥 Error during validation: {type(e).__name__}: {e}")
            return False
if __name__ == "__main__":
    # The first command-line argument, when present, overrides the default URL.
    url = sys.argv[1] if len(sys.argv) > 1 else CLOUD_RUN_URL
    # Drop every trailing slash so that appending "/path" never yields "//path".
    url = url.rstrip("/")
    outcome = asyncio.run(test_a2a_cloud_run(url))
    # Exit non-zero only on an explicit False verdict; a None result (no
    # verdict reported) keeps the exit status at 0.
    if outcome is False:
        sys.exit(1)