import asyncio
import json
import httpx
async def test_a2a_server():
print("๐งช Validating A2A Server Discovery...")
server_url = "http://localhost:8080"
async with httpx.AsyncClient() as client:
try:
response = await client.get(f"{server_url}/agent-card")
if response.status_code == 200:
print("โ
Discovery endpoint reachable")
info = response.json()
print(f"๐ Agent Name: {info.get('name')}")
print(f"๐ Skills: {[s.get('name') for s in info.get('skills', [])]}")
else:
print(f"โ Discovery endpoint failed: {response.status_code}")
return
print("\n๐งช Sending Echo Message via JSON-RPC...")
test_message = "Hello Rust A2A Server! Echo this."
rpc_payload = {
"jsonrpc": "2.0",
"method": "tasks/send",
"params": {
"id": "test-task-python",
"message": {
"role": "user",
"parts": [{"kind": "text", "text": test_message}],
"messageId": "msg-python-1",
"kind": "message"
}
},
"id": 1
}
response = await client.post(f"{server_url}/", json=rpc_payload)
if response.status_code == 200:
result = response.json()
if "error" in result:
print(f"โ JSON-RPC Error: {result['error']}")
return
print("โ
Message processed")
task = result.get('result', {})
print(f"๐ Task State: {task.get('status', {}).get('state')}")
history = task.get('history', [])
if history:
agent_msgs = [m for m in history if m.get('role') == 'agent']
if agent_msgs:
last_msg = agent_msgs[-1]
print(f"๐ค Agent Response: {json.dumps(last_msg, indent=2)}")
else:
print("โ ๏ธ No agent response found in history.")
else:
print("โ ๏ธ No message history returned in the response.")
else:
print(f"โ Message processing failed: {response.status_code}")
print(response.text)
except Exception as e:
print(f"๐ฅ Error during validation: {str(e)}")
if __name__ == "__main__":
asyncio.run(test_a2a_server())