use car_a2a::{
serve, serve_with_auth, A2aClient, A2aDispatcher, AgentCapabilities, AgentCard,
AgentCardSource, AgentInterface, AgentProvider, AuthValidator, BearerKeyAuth, ClientAuth,
InMemoryTaskStore, TransportProtocol,
};
use car_engine::{Runtime, ToolExecutor};
use car_ir::ToolSchema;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
#[derive(Default)]
struct EchoTool {
gate: Option<Arc<tokio::sync::Notify>>,
}
#[async_trait::async_trait]
impl ToolExecutor for EchoTool {
async fn execute(&self, tool: &str, params: &Value) -> Result<Value, String> {
if tool == "wait_then_echo" {
if let Some(gate) = &self.gate {
gate.notified().await;
}
}
Ok(json!({ "echoed": params.clone() }))
}
}
fn static_card(url: &str) -> AgentCard {
AgentCard {
name: "CAR test".into(),
description: "integration test".into(),
url: url.into(),
version: "1.0.0".into(),
protocol_version: "1.0".into(),
preferred_transport: Some("JSONRPC".into()),
provider: AgentProvider {
organization: "Parslee".into(),
url: None,
},
capabilities: AgentCapabilities {
streaming: true,
push_notifications: true,
state_transition_history: false,
extended_agent_card: false,
extensions: Vec::new(),
},
default_input_modes: vec!["data".into()],
default_output_modes: vec!["data".into()],
skills: vec![],
documentation_url: None,
icon_url: None,
supported_interfaces: vec![],
additional_interfaces: vec![AgentInterface {
url: url.into(),
protocol_binding: "JSONRPC".into(),
transport: Some(TransportProtocol::JsonRpc),
tenant: None,
protocol_version: "1.0".into(),
}],
security_schemes: HashMap::new(),
supports_authenticated_extended_card: false,
security_requirements: vec![],
signatures: vec![],
}
}
async fn boot() -> (std::net::SocketAddr, tokio::task::JoinHandle<()>) {
boot_with_gate(None).await.0
}
async fn boot_with_gate(
gate: Option<Arc<tokio::sync::Notify>>,
) -> ((std::net::SocketAddr, tokio::task::JoinHandle<()>), ()) {
let runtime = Arc::new(Runtime::new());
for name in ["echo", "wait_then_echo"] {
runtime
.register_tool_schema(ToolSchema {
name: name.into(),
description: "Echoes parameters back".into(),
parameters: json!({"type": "object"}),
returns: None,
idempotent: true,
cache_ttl_secs: None,
rate_limit: None,
})
.await;
}
runtime.set_executor(Arc::new(EchoTool { gate })).await;
let store = Arc::new(InMemoryTaskStore::new());
let card_factory: Arc<AgentCardSource> = Arc::new(|| static_card("http://127.0.0.1:0"));
let dispatcher = A2aDispatcher::new(runtime, store, card_factory);
let (addr, handle) = serve(dispatcher, "127.0.0.1:0".parse().unwrap())
.await
.expect("bind");
((addr, handle), ())
}
async fn boot_with_auth(
auth: Arc<dyn AuthValidator>,
) -> (std::net::SocketAddr, tokio::task::JoinHandle<()>) {
let runtime = Arc::new(Runtime::new());
let store = Arc::new(InMemoryTaskStore::new());
let card_factory: Arc<AgentCardSource> = Arc::new(|| static_card("http://127.0.0.1:0"));
let dispatcher = A2aDispatcher::new(runtime, store, card_factory);
let (addr, handle) = serve_with_auth(dispatcher, "127.0.0.1:0".parse().unwrap(), auth)
.await
.expect("bind");
(addr, handle)
}
#[tokio::test]
async fn auth_blocks_request_without_credentials() {
let auth: Arc<dyn AuthValidator> = Arc::new(BearerKeyAuth::single("secret"));
let (addr, _handle) = boot_with_auth(auth).await;
let resp = reqwest::Client::new()
.post(format!("http://{}/", addr))
.json(&json!({
"jsonrpc": "2.0",
"method": "agent/getAuthenticatedExtendedCard",
"params": null,
"id": 1
}))
.send()
.await
.expect("send");
assert_eq!(resp.status().as_u16(), 401);
assert!(resp.headers().contains_key("www-authenticate"));
}
#[tokio::test]
async fn auth_allows_request_with_valid_bearer() {
let auth: Arc<dyn AuthValidator> = Arc::new(BearerKeyAuth::single("secret"));
let (addr, _handle) = boot_with_auth(auth).await;
let resp = reqwest::Client::new()
.post(format!("http://{}/", addr))
.header("Authorization", "Bearer secret")
.json(&json!({
"jsonrpc": "2.0",
"method": "agent/getAuthenticatedExtendedCard",
"params": null,
"id": 1
}))
.send()
.await
.expect("send");
assert!(resp.status().is_success());
}
#[tokio::test]
async fn auth_does_not_block_well_known_agent_card() {
let auth: Arc<dyn AuthValidator> = Arc::new(BearerKeyAuth::single("secret"));
let (addr, _handle) = boot_with_auth(auth).await;
let resp = reqwest::get(format!("http://{}/.well-known/agent-card.json", addr))
.await
.expect("send");
assert!(resp.status().is_success());
}
#[tokio::test]
async fn message_send_invokes_real_tool_and_yields_artifact() {
let (addr, _handle) = boot().await;
let client = reqwest::Client::new();
let body = json!({
"jsonrpc": "2.0",
"method": "message/send",
"params": {
"message": {
"messageId": "m-int-1",
"role": "user",
"parts": [{
"kind": "data",
"data": { "tool": "echo", "parameters": { "n": 42 } }
}]
},
"configuration": { "blocking": true }
},
"id": 1
});
let resp = client
.post(format!("http://{}/", addr))
.json(&body)
.send()
.await
.expect("send");
assert!(resp.status().is_success());
let envelope: Value = resp.json().await.expect("json");
let task = &envelope["result"];
assert_eq!(task["status"]["state"], "completed");
let artifacts = task["artifacts"].as_array().expect("artifacts");
assert_eq!(artifacts.len(), 1);
let data_part = artifacts[0]["parts"]
.as_array()
.unwrap()
.iter()
.find(|p| p["kind"] == "data")
.expect("data part");
assert_eq!(data_part["data"]["echoed"]["n"], 42);
}
#[tokio::test]
async fn agent_card_endpoint_serves_well_known() {
let (addr, _handle) = boot().await;
let resp = reqwest::get(format!("http://{}/.well-known/agent-card.json", addr))
.await
.expect("get");
assert!(resp.status().is_success());
let card: Value = resp.json().await.expect("json");
assert_eq!(card["version"], "1.0.0");
assert_eq!(card["additionalInterfaces"][0]["transport"], "JSONRPC");
}
#[tokio::test]
async fn message_stream_returns_sse_with_task_lifecycle() {
let (addr, _handle) = boot().await;
let client = reqwest::Client::new();
let body = json!({
"jsonrpc": "2.0",
"method": "message/stream",
"params": {
"message": {
"messageId": "m-stream",
"role": "user",
"parts": [{
"kind": "data",
"data": { "tool": "echo", "parameters": { "x": 1 } }
}]
}
},
"id": 1
});
let resp = client
.post(format!("http://{}/", addr))
.json(&body)
.send()
.await
.expect("send");
assert!(resp.status().is_success());
let ctype = resp
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.to_string();
assert!(
ctype.starts_with("text/event-stream"),
"expected SSE response, got {}",
ctype
);
let body_text = resp.text().await.expect("body");
assert!(body_text.contains("data:"), "no SSE data frames in body");
assert!(
body_text.contains("\"jsonrpc\":\"2.0\""),
"frames missing JSON-RPC envelope"
);
assert!(
body_text.contains("\"result\":"),
"frames missing `result` field"
);
assert!(
body_text.contains("\"final\":true"),
"no final-status frame in SSE body"
);
}
#[tokio::test]
async fn tasks_get_after_async_send_reflects_terminal_state() {
let (addr, _handle) = boot().await;
let client = reqwest::Client::new();
let send = client
.post(format!("http://{}/", addr))
.json(&json!({
"jsonrpc": "2.0",
"method": "message/send",
"params": {
"message": {
"messageId": "m-async",
"role": "user",
"parts": [{
"kind": "data",
"data": { "tool": "echo", "parameters": {} }
}]
}
},
"id": 1
}))
.send()
.await
.expect("send");
let env: Value = send.json().await.expect("json");
let task_id = env["result"]["id"].as_str().unwrap().to_string();
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
let mut last_state = String::new();
while std::time::Instant::now() < deadline {
let resp: Value = client
.post(format!("http://{}/", addr))
.json(&json!({
"jsonrpc": "2.0",
"method": "tasks/get",
"params": { "id": task_id },
"id": 2
}))
.send()
.await
.expect("send")
.json()
.await
.expect("json");
last_state = resp["result"]["status"]["state"]
.as_str()
.unwrap_or("")
.to_string();
if last_state == "completed" || last_state == "failed" || last_state == "canceled" {
return;
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
panic!("task never reached terminal state, last={}", last_state);
}
#[tokio::test]
async fn dispatcher_accepts_a2a_v1_method_names() {
let (addr, _handle) = boot().await;
let client = reqwest::Client::new();
let body = json!({
"jsonrpc": "2.0",
"method": "SendMessage",
"params": {
"message": {
"messageId": "m-v1",
"role": "user",
"parts": [{
"kind": "data",
"data": { "tool": "echo", "parameters": { "v": 1 } }
}]
},
"configuration": { "blocking": true }
},
"id": 1
});
let resp = client
.post(format!("http://{}/", addr))
.json(&body)
.send()
.await
.expect("send");
assert!(resp.status().is_success());
let envelope: Value = resp.json().await.expect("json");
assert_eq!(envelope["result"]["status"]["state"], "completed");
let task_id = envelope["result"]["id"].as_str().unwrap().to_string();
let get_body = json!({
"jsonrpc": "2.0",
"method": "GetTask",
"params": { "id": task_id },
"id": 2
});
let get_resp: Value = client
.post(format!("http://{}/", addr))
.json(&get_body)
.send()
.await
.expect("send")
.json()
.await
.expect("json");
assert_eq!(get_resp["result"]["status"]["state"], "completed");
}
#[tokio::test]
async fn dispatcher_accepts_v1_streaming_method_name() {
let (addr, _handle) = boot().await;
let resp = reqwest::Client::new()
.post(format!("http://{}/", addr))
.json(&json!({
"jsonrpc": "2.0",
"method": "SendStreamingMessage",
"params": {
"message": {
"messageId": "m-v1-stream",
"role": "user",
"parts": [{
"kind": "data",
"data": { "tool": "echo", "parameters": { "x": 1 } }
}]
}
},
"id": 1
}))
.send()
.await
.expect("send");
let ctype = resp
.headers()
.get("content-type")
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.to_string();
assert!(
ctype.starts_with("text/event-stream"),
"expected SSE on SendStreamingMessage, got {}",
ctype
);
}
#[tokio::test]
async fn push_delivery_posts_task_snapshot_to_webhook() {
use std::sync::Mutex;
use tokio::sync::oneshot;
#[derive(Clone)]
struct Captured {
body: Arc<Mutex<Option<Value>>>,
auth_header: Arc<Mutex<Option<String>>>,
signal: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}
let (tx, rx) = oneshot::channel();
let captured = Captured {
body: Arc::new(Mutex::new(None)),
auth_header: Arc::new(Mutex::new(None)),
signal: Arc::new(Mutex::new(Some(tx))),
};
use axum::http::HeaderMap as AxumHeaderMap;
use axum::Json as AxumJson;
let captured_for_handler = captured.clone();
let app = axum::Router::new().route(
"/hook",
axum::routing::post(
move |headers: AxumHeaderMap, AxumJson(body): AxumJson<Value>| {
let cap = captured_for_handler.clone();
async move {
*cap.body.lock().unwrap() = Some(body);
*cap.auth_header.lock().unwrap() = headers
.get("authorization")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
if let Some(sig) = cap.signal.lock().unwrap().take() {
let _ = sig.send(());
}
axum::http::StatusCode::OK
}
},
),
);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let webhook_addr = listener.local_addr().unwrap();
tokio::spawn(async move {
let _ = axum::serve(listener, app).await;
});
let gate = Arc::new(tokio::sync::Notify::new());
let ((bridge_addr, _bridge_handle), ()) = boot_with_gate(Some(gate.clone())).await;
let client = A2aClient::new(format!("http://{}", bridge_addr));
use car_a2a::types::{Message, MessageRole, Part};
let msg = Message {
message_id: "m-push-1".into(),
role: MessageRole::User,
parts: vec![Part::Data(car_a2a::types::DataPart {
data: json!({ "tool": "wait_then_echo", "parameters": { "v": 1 } }),
metadata: HashMap::new(),
})],
task_id: None,
context_id: None,
metadata: HashMap::new(),
};
let send_result = client.send_message(msg, false).await.expect("send");
let task = match send_result {
car_a2a::types::SendMessageResult::Task(t) => t,
_ => panic!("expected Task"),
};
let cfg_id = client
.set_push_config(
&task.id,
car_a2a::types::PushNotificationConfig {
url: format!("http://{}/hook", webhook_addr),
token: Some("hook-token-42".into()),
authentication: None,
},
)
.await
.expect("set push");
assert!(!cfg_id.is_empty());
gate.notify_one();
let result = tokio::time::timeout(std::time::Duration::from_secs(10), rx).await;
assert!(result.is_ok(), "webhook never received a POST");
let body = captured.body.lock().unwrap().clone().expect("body");
assert_eq!(body["id"], task.id);
let auth = captured.auth_header.lock().unwrap().clone().expect("auth");
assert_eq!(auth, "Bearer hook-token-42");
}
#[tokio::test]
async fn outbound_client_round_trips_against_inbound_bridge() {
use car_a2a::types::{Message, MessageRole, Part, TextPart};
let (addr, _handle) = boot().await;
let client = A2aClient::new(format!("http://{}", addr));
let card = client.agent_card().await.expect("card");
assert_eq!(card.protocol_version, "1.0");
let msg = Message {
message_id: "m-client".into(),
role: MessageRole::User,
parts: vec![Part::Data(car_a2a::types::DataPart {
data: json!({ "tool": "echo", "parameters": { "n": 7 } }),
metadata: HashMap::new(),
})],
task_id: None,
context_id: None,
metadata: HashMap::new(),
};
let result = client.send_message(msg, true).await.expect("send");
let task = match result {
car_a2a::types::SendMessageResult::Task(t) => t,
car_a2a::types::SendMessageResult::Message(_) => panic!("expected Task"),
};
assert_eq!(task.status.state, car_a2a::TaskState::Completed);
assert_eq!(task.artifacts.len(), 1);
let fetched = client.get_task(&task.id).await.expect("get");
assert_eq!(fetched.id, task.id);
}
#[tokio::test]
async fn outbound_client_sends_bearer_when_configured() {
let auth: Arc<dyn AuthValidator> = Arc::new(BearerKeyAuth::single("client-key"));
let (addr, _handle) = boot_with_auth(auth).await;
let client = A2aClient::new(format!("http://{}", addr))
.with_auth(ClientAuth::Bearer("client-key".into()));
let _ = client.agent_card().await.expect("card");
let card_via_rpc: Value = client
.call("agent/getAuthenticatedExtendedCard", &Value::Null)
.await
.expect("rpc");
assert_eq!(card_via_rpc["protocolVersion"], "1.0");
}