use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use bytes::Bytes;
use http_body_util::{BodyExt, Full};
use a2a_protocol_types::error::A2aResult;
use a2a_protocol_types::events::{StreamResponse, TaskStatusUpdateEvent};
use a2a_protocol_types::task::{ContextId, TaskState, TaskStatus};
use a2a_protocol_types::{AgentCapabilities, AgentCard, AgentInterface};
use a2a_protocol_server::builder::RequestHandlerBuilder;
use a2a_protocol_server::dispatch::cors::CorsConfig;
use a2a_protocol_server::dispatch::JsonRpcDispatcher;
use a2a_protocol_server::executor::AgentExecutor;
use a2a_protocol_server::request_context::RequestContext;
use a2a_protocol_server::streaming::EventQueueWriter;
struct EchoExecutor;
impl AgentExecutor for EchoExecutor {
fn execute<'a>(
&'a self,
ctx: &'a RequestContext,
queue: &'a dyn EventQueueWriter,
) -> Pin<Box<dyn Future<Output = A2aResult<()>> + Send + 'a>> {
Box::pin(async move {
queue
.write(StreamResponse::StatusUpdate(TaskStatusUpdateEvent {
task_id: ctx.task_id.clone(),
context_id: ContextId::new(ctx.context_id.clone()),
status: TaskStatus::with_timestamp(TaskState::Completed),
metadata: None,
}))
.await?;
Ok(())
})
}
}
fn make_agent_card() -> AgentCard {
AgentCard {
url: None,
name: "test-agent".into(),
version: "1.0".into(),
description: "Test agent".into(),
supported_interfaces: vec![AgentInterface {
url: "http://localhost:8080".into(),
protocol_binding: "JSONRPC".into(),
protocol_version: "1.0.0".into(),
tenant: None,
}],
provider: None,
icon_url: None,
documentation_url: None,
capabilities: AgentCapabilities::none(),
security_schemes: None,
security_requirements: None,
default_input_modes: vec!["text/plain".into()],
default_output_modes: vec!["text/plain".into()],
skills: vec![],
signatures: None,
}
}
fn make_dispatcher_with_cors() -> Arc<JsonRpcDispatcher> {
let handler = Arc::new(RequestHandlerBuilder::new(EchoExecutor).build().unwrap());
Arc::new(JsonRpcDispatcher::new(handler).with_cors(CorsConfig::permissive()))
}
fn make_dispatcher_with_agent_card() -> Arc<JsonRpcDispatcher> {
let handler = Arc::new(
RequestHandlerBuilder::new(EchoExecutor)
.with_agent_card(make_agent_card())
.build()
.unwrap(),
);
Arc::new(JsonRpcDispatcher::new(handler))
}
fn make_dispatcher_with_cors_and_card() -> Arc<JsonRpcDispatcher> {
let handler = Arc::new(
RequestHandlerBuilder::new(EchoExecutor)
.with_agent_card(make_agent_card())
.build()
.unwrap(),
);
Arc::new(JsonRpcDispatcher::new(handler).with_cors(CorsConfig::new("https://example.com")))
}
fn make_plain_dispatcher() -> Arc<JsonRpcDispatcher> {
let handler = Arc::new(RequestHandlerBuilder::new(EchoExecutor).build().unwrap());
Arc::new(JsonRpcDispatcher::new(handler))
}
async fn start_server(dispatcher: Arc<JsonRpcDispatcher>) -> std::net::SocketAddr {
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
loop {
let (stream, _) = match listener.accept().await {
Ok(s) => s,
Err(_) => break,
};
let io = hyper_util::rt::TokioIo::new(stream);
let d = Arc::clone(&dispatcher);
tokio::spawn(async move {
let service = hyper::service::service_fn(move |req| {
let d = Arc::clone(&d);
async move { Ok::<_, std::convert::Infallible>(d.dispatch(req).await) }
});
let _ = hyper_util::server::conn::auto::Builder::new(
hyper_util::rt::TokioExecutor::new(),
)
.serve_connection(io, service)
.await;
});
}
});
addr
}
async fn http_request(
addr: std::net::SocketAddr,
method: &str,
path: &str,
body: Option<&str>,
content_type: Option<&str>,
) -> (u16, String, hyper::HeaderMap) {
let client = hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
.build_http::<Full<Bytes>>();
let mut builder = hyper::Request::builder()
.method(method)
.uri(format!("http://{addr}{path}"));
if let Some(ct) = content_type {
builder = builder.header("content-type", ct);
}
let body_bytes = body.unwrap_or("").as_bytes().to_vec();
let req = builder.body(Full::new(Bytes::from(body_bytes))).unwrap();
let resp = client.request(req).await.unwrap();
let status = resp.status().as_u16();
let headers = resp.headers().clone();
let body = resp.collect().await.unwrap().to_bytes();
(status, String::from_utf8_lossy(&body).into_owned(), headers)
}
async fn post_jsonrpc(addr: std::net::SocketAddr, body: &str) -> (u16, String, hyper::HeaderMap) {
http_request(addr, "POST", "/", Some(body), Some("application/json")).await
}
#[test]
fn debug_impl_for_jsonrpc_dispatcher() {
let handler = Arc::new(RequestHandlerBuilder::new(EchoExecutor).build().unwrap());
let dispatcher = JsonRpcDispatcher::new(handler);
let debug_str = format!("{:?}", dispatcher);
assert!(
debug_str.contains("JsonRpcDispatcher"),
"Debug output should contain 'JsonRpcDispatcher'"
);
}
#[test]
fn with_cors_sets_cors_config() {
let handler = Arc::new(RequestHandlerBuilder::new(EchoExecutor).build().unwrap());
let dispatcher = JsonRpcDispatcher::new(handler).with_cors(CorsConfig::permissive());
let debug_str = format!("{:?}", dispatcher);
assert!(debug_str.contains("JsonRpcDispatcher"));
}
#[tokio::test]
async fn cors_preflight_returns_204_with_cors_headers() {
let addr = start_server(make_dispatcher_with_cors()).await;
let (status, _, headers) = http_request(addr, "OPTIONS", "/", None, None).await;
assert_eq!(status, 204, "CORS preflight should return 204");
assert_eq!(
headers.get("access-control-allow-origin").unwrap(),
"*",
"should have CORS origin header"
);
assert!(
headers.get("access-control-allow-methods").is_some(),
"should have CORS methods header"
);
}
#[tokio::test]
async fn options_without_cors_returns_204_no_cors_headers() {
let addr = start_server(make_plain_dispatcher()).await;
let (status, _, headers) = http_request(addr, "OPTIONS", "/", None, None).await;
assert_eq!(status, 204, "OPTIONS without CORS should return 204");
assert!(
headers.get("access-control-allow-origin").is_none(),
"should NOT have CORS headers when CORS not configured"
);
}
#[tokio::test]
async fn agent_card_get_returns_card_when_configured() {
let addr = start_server(make_dispatcher_with_agent_card()).await;
let (status, body, _) =
http_request(addr, "GET", "/.well-known/agent-card.json", None, None).await;
assert_eq!(status, 200, "agent card GET should return 200");
let v: serde_json::Value = serde_json::from_str(&body).unwrap();
assert_eq!(v["name"], "test-agent");
}
#[tokio::test]
async fn agent_card_get_returns_404_when_not_configured() {
let addr = start_server(make_plain_dispatcher()).await;
let (status, body, _) =
http_request(addr, "GET", "/.well-known/agent-card.json", None, None).await;
assert_eq!(
status, 404,
"agent card GET should return 404 when not configured"
);
assert!(body.contains("agent card not configured"));
}
#[tokio::test]
async fn agent_card_get_with_cors_has_cors_headers() {
let addr = start_server(make_dispatcher_with_cors_and_card()).await;
let (status, _, headers) =
http_request(addr, "GET", "/.well-known/agent-card.json", None, None).await;
assert_eq!(status, 200);
assert_eq!(
headers.get("access-control-allow-origin").unwrap(),
"https://example.com",
"agent card response should include CORS headers"
);
}
#[tokio::test]
async fn regular_dispatch_with_cors_has_cors_headers() {
let addr = start_server(make_dispatcher_with_cors()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "GetTask",
"id": "cors-test",
"params": { "id": "nonexistent" }
});
let (status, _, headers) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
assert_eq!(
headers.get("access-control-allow-origin").unwrap(),
"*",
"regular dispatch should include CORS headers"
);
}
#[tokio::test]
async fn unsupported_content_type_returns_parse_error() {
let addr = start_server(make_plain_dispatcher()).await;
let (status, body, _) = http_request(addr, "POST", "/", Some("{}"), Some("text/xml")).await;
assert_eq!(status, 200);
assert!(
body.contains("Parse error"),
"should return parse error for unsupported content type"
);
assert!(body.contains("unsupported Content-Type"));
}
#[tokio::test]
async fn a2a_content_type_is_accepted() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "GetTask",
"id": "ct-test",
"params": { "id": "nonexistent" }
});
let (status, resp, _) = http_request(
addr,
"POST",
"/",
Some(&body.to_string()),
Some("application/a2a+json"),
)
.await;
assert_eq!(status, 200);
assert!(
!resp.contains("unsupported Content-Type"),
"application/a2a+json should be accepted"
);
}
#[tokio::test]
async fn invalid_json_body_returns_parse_error() {
let addr = start_server(make_plain_dispatcher()).await;
let (status, body, _) = http_request(
addr,
"POST",
"/",
Some("not json at all"),
Some("application/json"),
)
.await;
assert_eq!(status, 200);
assert!(body.contains("Parse error"));
}
#[tokio::test]
async fn valid_json_but_invalid_rpc_returns_parse_error() {
let addr = start_server(make_plain_dispatcher()).await;
let (status, body, _) = post_jsonrpc(addr, r#"{"foo": "bar"}"#).await;
assert_eq!(status, 200);
assert!(body.contains("Parse error") || body.contains("error"));
}
#[tokio::test]
async fn send_message_returns_completed_task() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "SendMessage",
"id": "send-1",
"params": {
"message": {
"messageId": "msg-1",
"role": "ROLE_USER",
"parts": [{"text": "hello"}]
}
}
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("result").is_some() || v.get("error").is_some(),
"SendMessage should return a result or error"
);
}
#[tokio::test]
async fn send_message_missing_params_returns_error() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "SendMessage",
"id": "send-no-params"
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("error").is_some(),
"SendMessage without params should error"
);
}
#[tokio::test]
async fn send_message_invalid_params_returns_error() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "SendMessage",
"id": "send-bad-params",
"params": { "invalid": true }
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("error").is_some(),
"SendMessage with invalid params should error"
);
}
#[tokio::test]
async fn send_message_in_batch_returns_result() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!([{
"jsonrpc": "2.0",
"method": "SendMessage",
"id": "batch-send",
"params": {
"message": {
"messageId": "msg-batch",
"role": "ROLE_USER",
"parts": [{"text": "hello from batch"}]
}
}
}]);
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
let arr = v.as_array().unwrap();
assert_eq!(arr.len(), 1);
assert!(
arr[0].get("result").is_some() || arr[0].get("error").is_some(),
"batch SendMessage should return result or error"
);
}
#[tokio::test]
async fn get_task_not_found() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "GetTask",
"id": "get-1",
"params": { "id": "nonexistent-task" }
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("error").is_some(),
"GetTask for nonexistent should error"
);
}
#[tokio::test]
async fn get_task_missing_params() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "GetTask",
"id": "get-no-params"
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(v.get("error").is_some());
}
#[tokio::test]
async fn list_tasks_returns_result() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "ListTasks",
"id": "list-1",
"params": {}
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("result").is_some(),
"ListTasks with empty params should return a result, got: {v}"
);
}
#[tokio::test]
async fn list_tasks_missing_params() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "ListTasks",
"id": "list-no-params"
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(v.get("error").is_some());
}
#[tokio::test]
async fn cancel_task_not_found() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "CancelTask",
"id": "cancel-1",
"params": { "id": "nonexistent-task" }
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("error").is_some(),
"CancelTask for nonexistent should error"
);
}
#[tokio::test]
async fn cancel_task_missing_params() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "CancelTask",
"id": "cancel-no-params"
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(v.get("error").is_some());
}
#[tokio::test]
async fn create_push_config_missing_params() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "CreateTaskPushNotificationConfig",
"id": "push-create-no-params"
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(v.get("error").is_some());
}
#[tokio::test]
async fn create_push_config_with_params() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "CreateTaskPushNotificationConfig",
"id": "push-create",
"params": {
"taskId": "task-1",
"pushNotificationConfig": {
"url": "https://example.com/webhook",
"token": "test-token"
}
}
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("error").is_some(),
"CreateTaskPushNotificationConfig with malformed params should return error, got: {v}"
);
}
#[tokio::test]
async fn get_push_config_missing_params() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "GetTaskPushNotificationConfig",
"id": "push-get-no-params"
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(v.get("error").is_some());
}
#[tokio::test]
async fn get_push_config_with_params() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "GetTaskPushNotificationConfig",
"id": "push-get",
"params": {
"taskId": "task-1",
"pushNotificationConfigId": "config-1"
}
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("error").is_some(),
"GetTaskPushNotificationConfig for nonexistent should return error, got: {v}"
);
}
#[tokio::test]
async fn list_push_configs_missing_params() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "ListTaskPushNotificationConfigs",
"id": "push-list-no-params"
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(v.get("error").is_some());
}
#[tokio::test]
async fn list_push_configs_with_params() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "ListTaskPushNotificationConfigs",
"id": "push-list",
"params": {
"taskId": "task-1"
}
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("result").is_some(),
"ListTaskPushNotificationConfigs should return result, got: {v}"
);
}
#[tokio::test]
async fn delete_push_config_missing_params() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "DeleteTaskPushNotificationConfig",
"id": "push-delete-no-params"
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(v.get("error").is_some());
}
#[tokio::test]
async fn delete_push_config_with_params() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "DeleteTaskPushNotificationConfig",
"id": "push-delete",
"params": {
"taskId": "task-1",
"pushNotificationConfigId": "config-1"
}
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(v.get("error").is_some(), "DeleteTaskPushNotificationConfig with mismatched param names should return error, got: {v}");
}
#[tokio::test]
async fn get_extended_agent_card_returns_error_when_not_configured() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "GetExtendedAgentCard",
"id": "ext-card"
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("error").is_some(),
"GetExtendedAgentCard should error when not configured"
);
}
#[tokio::test]
async fn unknown_method_returns_method_not_found() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "CompletelyFakeMethod",
"id": "fake-1",
"params": {}
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
assert!(resp.contains("Method not found"));
}
#[tokio::test]
async fn send_streaming_message_returns_sse() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "SendStreamingMessage",
"id": "stream-1",
"params": {
"message": {
"messageId": "msg-stream",
"role": "ROLE_USER",
"parts": [{"text": "hello stream"}]
}
}
});
let (status, resp, headers) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let ct = headers
.get("content-type")
.map(|v| v.to_str().unwrap_or(""));
assert!(
ct.is_some_and(|c| c.contains("text/event-stream")),
"SendStreamingMessage should return SSE, got content-type: {:?}, body: {}",
ct,
&resp[..resp.len().min(200)]
);
}
#[tokio::test]
async fn send_streaming_message_missing_params_returns_error() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "SendStreamingMessage",
"id": "stream-no-params"
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("error").is_some(),
"SendStreamingMessage without params should error"
);
}
#[tokio::test]
async fn subscribe_to_task_missing_params_returns_error() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "SubscribeToTask",
"id": "sub-no-params"
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("error").is_some(),
"SubscribeToTask without params should error"
);
}
#[tokio::test]
async fn subscribe_to_task_nonexistent_returns_error() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "SubscribeToTask",
"id": "sub-nonexist",
"params": { "id": "nonexistent-task-id" }
});
let (status, resp, headers) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let ct = headers
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if ct.contains("text/event-stream") {
assert!(!resp.is_empty());
} else {
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("error").is_some(),
"SubscribeToTask for nonexistent task should error"
);
}
}
#[tokio::test]
async fn batch_with_multiple_methods() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!([
{
"jsonrpc": "2.0",
"method": "GetTask",
"id": "batch-get",
"params": { "id": "nonexistent" }
},
{
"jsonrpc": "2.0",
"method": "CancelTask",
"id": "batch-cancel",
"params": { "id": "nonexistent" }
},
{
"jsonrpc": "2.0",
"method": "ListTasks",
"id": "batch-list",
"params": {}
},
{
"jsonrpc": "2.0",
"method": "CompletelyFakeMethod",
"id": "batch-fake",
"params": {}
}
]);
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
let arr = v.as_array().unwrap();
assert_eq!(arr.len(), 4, "batch should return 4 responses");
let fake_resp = arr
.iter()
.find(|r| r["id"] == "batch-fake")
.expect("should find batch-fake response");
assert!(fake_resp.get("error").is_some());
}
#[tokio::test]
async fn batch_with_push_methods() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!([
{
"jsonrpc": "2.0",
"method": "CreateTaskPushNotificationConfig",
"id": "batch-push-create",
"params": {
"taskId": "task-1",
"pushNotificationConfig": {
"url": "https://example.com/webhook",
"token": "test-token"
}
}
},
{
"jsonrpc": "2.0",
"method": "GetTaskPushNotificationConfig",
"id": "batch-push-get",
"params": {
"taskId": "task-1",
"pushNotificationConfigId": "config-1"
}
},
{
"jsonrpc": "2.0",
"method": "ListTaskPushNotificationConfigs",
"id": "batch-push-list",
"params": {
"taskId": "task-1"
}
},
{
"jsonrpc": "2.0",
"method": "DeleteTaskPushNotificationConfig",
"id": "batch-push-delete",
"params": {
"taskId": "task-1",
"pushNotificationConfigId": "config-1"
}
}
]);
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
let arr = v.as_array().unwrap();
assert_eq!(arr.len(), 4, "batch should return 4 responses");
}
#[tokio::test]
async fn batch_with_extended_agent_card() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!([{
"jsonrpc": "2.0",
"method": "GetExtendedAgentCard",
"id": "batch-ext-card"
}]);
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
let arr = v.as_array().unwrap();
assert_eq!(arr.len(), 1);
assert!(arr[0].get("error").is_some());
}
#[tokio::test]
async fn no_content_type_header_still_processes_request() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "GetTask",
"id": "no-ct",
"params": { "id": "nonexistent" }
});
let (status, resp, _) = http_request(addr, "POST", "/", Some(&body.to_string()), None).await;
assert_eq!(status, 200);
let _v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
!resp.contains("unsupported Content-Type"),
"missing content-type should not trigger unsupported error"
);
}
#[tokio::test]
async fn send_message_in_batch_with_bad_params() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!([{
"jsonrpc": "2.0",
"method": "SendMessage",
"id": "batch-send-bad",
"params": { "invalid_field": true }
}]);
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
let arr = v.as_array().unwrap();
assert_eq!(arr.len(), 1);
assert!(
arr[0].get("error").is_some(),
"SendMessage with bad params in batch should error"
);
}
#[tokio::test]
async fn agent_card_404_with_cors_has_cors_headers() {
let dispatcher = make_dispatcher_with_cors(); let addr = start_server(dispatcher).await;
let (status, _, headers) =
http_request(addr, "GET", "/.well-known/agent-card.json", None, None).await;
assert_eq!(status, 404);
assert!(
headers.get("access-control-allow-origin").is_some(),
"404 agent card response with CORS should still have CORS headers"
);
}
#[tokio::test]
async fn send_streaming_message_in_batch_returns_error() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!([{
"jsonrpc": "2.0",
"method": "SendStreamingMessage",
"id": "batch-stream",
"params": {
"message": {
"messageId": "msg-batch-stream",
"role": "ROLE_USER",
"parts": [{"text": "hello"}]
}
}
}]);
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
let arr = v.as_array().unwrap();
assert_eq!(arr.len(), 1);
assert!(
arr[0].get("error").is_some(),
"SendStreamingMessage in batch should return error"
);
let err_msg = arr[0]["error"]["message"].as_str().unwrap_or("");
assert!(
err_msg.contains("not supported in batch"),
"error message should mention batch unsupported, got: {err_msg}"
);
}
#[tokio::test]
async fn subscribe_to_task_in_batch_returns_error() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!([{
"jsonrpc": "2.0",
"method": "SubscribeToTask",
"id": "batch-subscribe",
"params": { "id": "some-task-id" }
}]);
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
let arr = v.as_array().unwrap();
assert_eq!(arr.len(), 1);
assert!(
arr[0].get("error").is_some(),
"SubscribeToTask in batch should return error"
);
let err_msg = arr[0]["error"]["message"].as_str().unwrap_or("");
assert!(
err_msg.contains("not supported in batch"),
"error message should mention batch unsupported, got: {err_msg}"
);
}
#[test]
fn with_config_constructor() {
use a2a_protocol_server::dispatch::DispatchConfig;
let handler = Arc::new(RequestHandlerBuilder::new(EchoExecutor).build().unwrap());
let config = DispatchConfig {
max_request_body_size: 1024,
..DispatchConfig::default()
};
let dispatcher = JsonRpcDispatcher::with_config(handler, config);
let debug_str = format!("{:?}", dispatcher);
assert!(debug_str.contains("JsonRpcDispatcher"));
}
#[tokio::test]
async fn empty_batch_returns_parse_error() {
let addr = start_server(make_plain_dispatcher()).await;
let (status, resp, _) = post_jsonrpc(addr, "[]").await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("error").is_some(),
"empty batch should return parse error"
);
assert!(
resp.contains("empty batch"),
"error message should mention empty batch"
);
}
#[tokio::test]
async fn batch_with_invalid_item_returns_individual_error() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!([
42,
{
"jsonrpc": "2.0",
"method": "GetTask",
"id": "batch-valid",
"params": { "id": "nonexistent" }
}
]);
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
let arr = v.as_array().unwrap();
assert_eq!(
arr.len(),
2,
"should return 2 responses (one error, one real)"
);
assert!(
arr[0].get("error").is_some(),
"invalid batch item should produce error"
);
}
#[tokio::test]
async fn dispatcher_trait_impl_works() {
use a2a_protocol_server::serve::Dispatcher;
use http_body_util::Full;
let handler = Arc::new(RequestHandlerBuilder::new(EchoExecutor).build().unwrap());
let dispatcher = JsonRpcDispatcher::new(handler);
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "GetTask",
"id": "trait-test",
"params": { "id": "nonexistent" }
});
let _req = hyper::Request::builder()
.method("POST")
.uri("/")
.header("content-type", "application/json")
.body(Full::new(Bytes::from(body.to_string())))
.unwrap();
let _: &dyn Dispatcher = &dispatcher;
}
#[tokio::test]
async fn send_streaming_message_error_returns_json_error() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "SendStreamingMessage",
"id": "stream-bad",
"params": { "bad_field": true }
});
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("error").is_some(),
"SendStreamingMessage with bad params should return error"
);
}
#[tokio::test]
async fn subscribe_to_active_task_returns_sse() {
let addr = start_server(make_plain_dispatcher()).await;
let send_body = serde_json::json!({
"jsonrpc": "2.0",
"method": "SendStreamingMessage",
"id": "sub-setup",
"params": {
"message": {
"messageId": "msg-sub-setup",
"role": "ROLE_USER",
"parts": [{"text": "hello"}]
}
}
});
let _ = post_jsonrpc(addr, &send_body.to_string()).await;
}
#[tokio::test]
async fn batch_with_multiple_invalid_items() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!([
"not a valid rpc object",
null,
{
"jsonrpc": "2.0",
"method": "GetTask",
"id": "batch-ok",
"params": { "id": "nonexistent" }
}
]);
let (status, resp, _) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
let arr = v.as_array().unwrap();
assert_eq!(arr.len(), 3, "should return 3 responses");
assert!(arr[0].get("error").is_some());
assert!(arr[1].get("error").is_some());
}
#[tokio::test]
async fn subscribe_to_task_error_returns_json_error() {
let addr = start_server(make_plain_dispatcher()).await;
let body = serde_json::json!({
"jsonrpc": "2.0",
"method": "SubscribeToTask",
"id": "sub-error",
"params": { "id": "definitely-nonexistent-task" }
});
let (status, resp, headers) = post_jsonrpc(addr, &body.to_string()).await;
assert_eq!(status, 200);
let ct = headers
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if !ct.contains("text/event-stream") {
let v: serde_json::Value = serde_json::from_str(&resp).unwrap();
assert!(
v.get("error").is_some(),
"SubscribeToTask error should return JSON error"
);
}
}