use super::*;
#[tokio::test]
async fn rest_send_message() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let body = serde_json::to_vec(&make_send_params()).unwrap();
let req = hyper::Request::builder()
.method("POST")
.uri(format!("http://{addr}/message:send"))
.header("content-type", "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 200);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let result: SendMessageResponse = serde_json::from_slice(&body).expect("parse response");
match result {
SendMessageResponse::Task(task) => {
assert_eq!(task.status.state, TaskState::Completed);
}
_ => panic!("expected Task variant"),
}
}
#[tokio::test]
async fn rest_send_streaming() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let body = serde_json::to_vec(&make_send_params()).unwrap();
let req = hyper::Request::builder()
.method("POST")
.uri(format!("http://{addr}/message:stream"))
.header("content-type", "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 200);
assert_eq!(
resp.headers()
.get("content-type")
.and_then(|v| v.to_str().ok()),
Some("text/event-stream")
);
}
#[tokio::test]
async fn rest_get_task_not_found() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let req = hyper::Request::builder()
.method("GET")
.uri(format!("http://{addr}/tasks/nonexistent"))
.body(Full::new(Bytes::new()))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 404);
}
#[tokio::test]
async fn rest_list_tasks() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let req = hyper::Request::builder()
.method("GET")
.uri(format!("http://{addr}/tasks"))
.body(Full::new(Bytes::new()))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 200);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let result: serde_json::Value = serde_json::from_slice(&body).expect("parse");
let tasks = result
.get("tasks")
.expect("response should contain 'tasks' field");
assert!(tasks.is_array(), "tasks should be an array");
}
#[tokio::test]
async fn rest_cancel_nonexistent_task() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let req = hyper::Request::builder()
.method("POST")
.uri(format!("http://{addr}/tasks/no-such-task:cancel"))
.body(Full::new(Bytes::new()))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 404);
}
#[tokio::test]
async fn rest_subscribe_nonexistent_task() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let req = hyper::Request::builder()
.method("POST")
.uri(format!("http://{addr}/tasks/no-such-task:subscribe"))
.body(Full::new(Bytes::new()))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 404);
}
#[tokio::test]
async fn rest_wellknown_agent_card() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let req = hyper::Request::builder()
.method("GET")
.uri(format!("http://{addr}/.well-known/agent-card.json"))
.body(Full::new(Bytes::new()))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 200);
assert_eq!(
resp.headers()
.get("content-type")
.and_then(|v| v.to_str().ok()),
Some("application/json")
);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let card: AgentCard = serde_json::from_slice(&body).expect("parse card");
assert_eq!(card.name, "Test Agent");
}
#[tokio::test]
async fn rest_extended_agent_card() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let req = hyper::Request::builder()
.method("GET")
.uri(format!("http://{addr}/extendedAgentCard"))
.body(Full::new(Bytes::new()))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 200);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let card: AgentCard = serde_json::from_slice(&body).expect("parse card");
assert_eq!(card.name, "Test Agent");
}
#[tokio::test]
async fn rest_not_found_route() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let req = hyper::Request::builder()
.method("GET")
.uri(format!("http://{addr}/nonexistent/route"))
.body(Full::new(Bytes::new()))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 404);
}
#[tokio::test]
async fn rest_push_config_crud() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let config = TaskPushNotificationConfig::new("task-1", "https://example.com/hook");
let body = serde_json::to_vec(&config).unwrap();
let req = hyper::Request::builder()
.method("POST")
.uri(format!(
"http://{addr}/tasks/task-1/pushNotificationConfigs"
))
.header("content-type", "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 200);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let created: TaskPushNotificationConfig = serde_json::from_slice(&body).expect("parse config");
let config_id_val = created
.id
.as_ref()
.expect("created config should have an ID");
assert!(!config_id_val.is_empty(), "config ID should be non-empty");
let config_id = created.id.unwrap();
let req = hyper::Request::builder()
.method("GET")
.uri(format!(
"http://{addr}/tasks/task-1/pushNotificationConfigs/{config_id}"
))
.body(Full::new(Bytes::new()))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 200);
let req = hyper::Request::builder()
.method("GET")
.uri(format!(
"http://{addr}/tasks/task-1/pushNotificationConfigs"
))
.body(Full::new(Bytes::new()))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 200);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let list_resp: a2a_protocol_types::responses::ListPushConfigsResponse =
serde_json::from_slice(&body).expect("parse list");
assert_eq!(list_resp.configs.len(), 1);
let req = hyper::Request::builder()
.method("DELETE")
.uri(format!(
"http://{addr}/tasks/task-1/pushNotificationConfigs/{config_id}"
))
.body(Full::new(Bytes::new()))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 200);
}
#[tokio::test]
async fn rest_push_config_not_supported_error_status() {
let handler = Arc::new(
RequestHandlerBuilder::new(SimpleExecutor)
.with_agent_card(minimal_agent_card())
.build()
.expect("build handler"),
);
let dispatcher = Arc::new(RestDispatcher::new(handler));
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind");
let addr = listener.local_addr().expect("local addr");
tokio::spawn(async move {
let (stream, _) = listener.accept().await.expect("accept");
let io = hyper_util::rt::TokioIo::new(stream);
let d = Arc::clone(&dispatcher);
let service = hyper::service::service_fn(move |req| {
let d = Arc::clone(&d);
async move { Ok::<_, std::convert::Infallible>(d.dispatch(req).await) }
});
let _ = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new())
.serve_connection(io, service)
.await;
});
let client = http_client();
let config = TaskPushNotificationConfig::new("task-1", "https://example.com/hook");
let body = serde_json::to_vec(&config).unwrap();
let req = hyper::Request::builder()
.method("POST")
.uri(format!(
"http://{addr}/tasks/task-1/pushNotificationConfigs"
))
.header("content-type", "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 400);
}
#[tokio::test]
async fn rest_send_then_get_task() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let body = serde_json::to_vec(&make_send_params()).unwrap();
let req = hyper::Request::builder()
.method("POST")
.uri(format!("http://{addr}/message:send"))
.header("content-type", "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap();
let resp = client.request(req).await.expect("send");
assert_eq!(resp.status(), 200);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let result: SendMessageResponse = serde_json::from_slice(&body).expect("parse response");
let task_id = match result {
SendMessageResponse::Task(t) => t.id.0,
_ => panic!("expected Task variant"),
};
let req = hyper::Request::builder()
.method("GET")
.uri(format!("http://{addr}/tasks/{task_id}"))
.body(Full::new(Bytes::new()))
.unwrap();
let resp = client.request(req).await.expect("get");
assert_eq!(resp.status(), 200);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let fetched: Task = serde_json::from_slice(&body).expect("parse fetched task");
assert_eq!(fetched.id.0, task_id);
assert_eq!(fetched.status.state, TaskState::Completed);
}
#[tokio::test]
async fn rest_response_has_a2a_version_header() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let body = serde_json::to_vec(&make_send_params()).unwrap();
let req = hyper::Request::builder()
.method("POST")
.uri(format!("http://{addr}/message:send"))
.header("content-type", "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap();
let resp = client.request(req).await.expect("send");
assert_eq!(resp.status(), 200);
assert_eq!(
resp.headers()
.get("A2A-Version")
.and_then(|v| v.to_str().ok()),
Some("1.0.0"),
"response should have A2A-Version: 1.0.0 header"
);
assert_eq!(
resp.headers()
.get("content-type")
.and_then(|v| v.to_str().ok()),
Some("application/a2a+json"),
"response should have application/a2a+json content type"
);
}
#[tokio::test]
async fn rest_tenant_prefix_routing() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let body = serde_json::to_vec(&make_send_params()).unwrap();
let req = hyper::Request::builder()
.method("POST")
.uri(format!("http://{addr}/tenants/acme/message:send"))
.header("content-type", "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap();
let resp = client.request(req).await.expect("send via tenant prefix");
assert_eq!(resp.status(), 200, "tenant-prefixed route should succeed");
}
#[tokio::test]
async fn rest_get_subscribe_allowed() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let body = serde_json::to_vec(&make_send_params()).unwrap();
let req = hyper::Request::builder()
.method("POST")
.uri(format!("http://{addr}/message:send"))
.header("content-type", "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap();
let resp = client.request(req).await.expect("send");
let body = resp.into_body().collect().await.unwrap().to_bytes();
let result: SendMessageResponse = serde_json::from_slice(&body).expect("parse");
let task_id = match result {
SendMessageResponse::Task(t) => t.id.0,
_ => panic!("expected Task"),
};
let req = hyper::Request::builder()
.method("GET")
.uri(format!("http://{addr}/tasks/{task_id}:subscribe"))
.body(Full::new(Bytes::new()))
.unwrap();
let resp = client.request(req).await.expect("subscribe via GET");
assert_ne!(
resp.status(),
404,
"GET /tasks/:id:subscribe should be routed"
);
}
async fn response_json(
client: &hyper_util::client::legacy::Client<
hyper_util::client::legacy::connect::HttpConnector,
Full<Bytes>,
>,
req: hyper::Request<Full<Bytes>>,
) -> (u16, serde_json::Value) {
let resp = client.request(req).await.expect("request");
let status = resp.status().as_u16();
let body = resp.into_body().collect().await.unwrap().to_bytes();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap_or_else(|e| {
panic!(
"response should be valid JSON: {e}\nbody: {}",
String::from_utf8_lossy(&body)
)
});
(status, json)
}
fn assert_aip193_error(json: &serde_json::Value, expected_code: u16) {
let error = json
.get("error")
.unwrap_or_else(|| panic!("AIP-193: response must have 'error' field.\nGot: {json}"));
assert_eq!(
error.get("code").and_then(|v| v.as_u64()),
Some(u64::from(expected_code)),
"AIP-193: error.code should be {expected_code}.\nGot: {json}"
);
assert!(
error.get("message").and_then(|v| v.as_str()).is_some(),
"AIP-193: error.message should be a non-null string.\nGot: {json}"
);
assert!(
error.get("status").and_then(|v| v.as_str()).is_some(),
"AIP-193: error.status (gRPC status) should be present.\nGot: {json}"
);
}
fn assert_aip193_error_with_details(
json: &serde_json::Value,
expected_code: u16,
expected_reason: &str,
) {
assert_aip193_error(json, expected_code);
let details = json["error"]
.get("details")
.and_then(|v| v.as_array())
.unwrap_or_else(|| panic!("AIP-193: error.details should be an array.\nGot: {json}"));
assert!(
!details.is_empty(),
"AIP-193: error.details should be non-empty.\nGot: {json}"
);
let info = &details[0];
assert_eq!(
info.get("@type").and_then(|v| v.as_str()),
Some("type.googleapis.com/google.rpc.ErrorInfo"),
"AIP-193: details[0].@type should be ErrorInfo.\nGot: {json}"
);
assert_eq!(
info.get("reason").and_then(|v| v.as_str()),
Some(expected_reason),
"AIP-193: details[0].reason should be {expected_reason}.\nGot: {json}"
);
assert_eq!(
info.get("domain").and_then(|v| v.as_str()),
Some("a2a-protocol.org"),
"AIP-193: details[0].domain should be 'a2a-protocol.org'.\nGot: {json}"
);
}
#[tokio::test]
async fn rest_error_task_not_found_has_aip193_body() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let req = hyper::Request::builder()
.method("GET")
.uri(format!("http://{addr}/tasks/nonexistent"))
.body(Full::new(Bytes::new()))
.unwrap();
let (status, json) = response_json(&client, req).await;
assert_eq!(status, 404);
assert_aip193_error_with_details(&json, 404, "TASK_NOT_FOUND");
}
#[tokio::test]
async fn rest_error_cancel_not_found_has_aip193_body() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let req = hyper::Request::builder()
.method("POST")
.uri(format!("http://{addr}/tasks/no-such-task:cancel"))
.body(Full::new(Bytes::new()))
.unwrap();
let (status, json) = response_json(&client, req).await;
assert_eq!(status, 404);
assert_aip193_error_with_details(&json, 404, "TASK_NOT_FOUND");
}
#[tokio::test]
async fn rest_error_subscribe_not_found_has_aip193_body() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let req = hyper::Request::builder()
.method("POST")
.uri(format!("http://{addr}/tasks/no-such-task:subscribe"))
.body(Full::new(Bytes::new()))
.unwrap();
let (status, json) = response_json(&client, req).await;
assert_eq!(status, 404);
assert_aip193_error_with_details(&json, 404, "TASK_NOT_FOUND");
}
#[tokio::test]
async fn rest_error_invalid_json_body_returns_error_object() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let req = hyper::Request::builder()
.method("POST")
.uri(format!("http://{addr}/message:send"))
.header("content-type", "application/json")
.body(Full::new(Bytes::from("{not valid json!!")))
.unwrap();
let (status, json) = response_json(&client, req).await;
assert_eq!(status, 400);
let error = json.get("error").expect("response must have 'error' field");
assert_eq!(error.get("code").and_then(|v| v.as_u64()), Some(400));
assert!(
error.get("message").and_then(|v| v.as_str()).is_some(),
"error.message should be present.\nGot: {json}"
);
}
#[tokio::test]
async fn rest_error_push_not_supported_has_aip193_body() {
let handler = Arc::new(
RequestHandlerBuilder::new(SimpleExecutor)
.with_agent_card(minimal_agent_card())
.build()
.expect("build handler"),
);
let dispatcher = Arc::new(RestDispatcher::new(handler));
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("bind");
let addr = listener.local_addr().expect("local addr");
tokio::spawn(async move {
let (stream, _) = listener.accept().await.expect("accept");
let io = hyper_util::rt::TokioIo::new(stream);
let d = Arc::clone(&dispatcher);
let service = hyper::service::service_fn(move |req| {
let d = Arc::clone(&d);
async move { Ok::<_, std::convert::Infallible>(d.dispatch(req).await) }
});
let _ = hyper_util::server::conn::auto::Builder::new(hyper_util::rt::TokioExecutor::new())
.serve_connection(io, service)
.await;
});
let client = http_client();
let config = TaskPushNotificationConfig::new("task-1", "https://example.com/hook");
let body = serde_json::to_vec(&config).unwrap();
let req = hyper::Request::builder()
.method("POST")
.uri(format!(
"http://{addr}/tasks/task-1/pushNotificationConfigs"
))
.header("content-type", "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap();
let (status, json) = response_json(&client, req).await;
assert_eq!(status, 400);
assert_aip193_error_with_details(&json, 400, "PUSH_NOTIFICATION_NOT_SUPPORTED");
}
#[tokio::test]
async fn rest_streaming_events_are_bare_stream_response() {
let (addr, _handle) = start_rest_server().await;
let client = http_client();
let body = serde_json::to_vec(&make_send_params()).unwrap();
let req = hyper::Request::builder()
.method("POST")
.uri(format!("http://{addr}/message:stream"))
.header("content-type", "application/json")
.body(Full::new(Bytes::from(body)))
.unwrap();
let resp = client.request(req).await.expect("request");
assert_eq!(resp.status(), 200);
let body = resp.into_body().collect().await.unwrap().to_bytes();
let body_str = String::from_utf8_lossy(&body);
let data_lines: Vec<&str> = body_str
.lines()
.filter(|l| l.starts_with("data: "))
.map(|l| l.strip_prefix("data: ").unwrap())
.collect();
assert!(
!data_lines.is_empty(),
"SSE stream should contain at least one data line, got body: {body_str}"
);
for data in &data_lines {
let parsed: serde_json::Value = serde_json::from_str(data)
.unwrap_or_else(|e| panic!("SSE data should be valid JSON: {e}\ndata: {data}"));
assert!(
parsed.get("jsonrpc").is_none(),
"REST SSE event must be bare StreamResponse, not JSON-RPC envelope.\n\
Got: {data}"
);
let is_known = parsed.get("task").is_some()
|| parsed.get("statusUpdate").is_some()
|| parsed.get("artifactUpdate").is_some();
assert!(
is_known,
"SSE event should be a recognized StreamResponse variant.\nGot: {data}"
);
}
}