use std::fmt::Write as _;
use std::sync::Arc;
use async_trait::async_trait;
use futures::StreamExt;
use ra2a::client::{A2AClient, A2AClientBuilder, Client, ClientEvent};
use ra2a::types::{AgentCard, Message as A2aMessage};
use serde_json::Value;
use tokio::sync::RwLock;
use tracing::{debug, info};
use crate::error::ToolError;
use crate::tool::{BoxedTool, DynTool, ToolDefinition};
/// Builder that collects connection settings for a remote A2A agent.
///
/// Created by [`A2aAgent::new`]; call [`A2aAgentBuilder::connect`] to obtain the agent.
#[derive(Debug)]
pub struct A2aAgentBuilder {
/// URL given to [`A2aAgent::new`]; also quoted in log and error messages.
url: String,
/// Underlying `ra2a` client builder that the configuration setters forward to.
inner: A2AClientBuilder,
/// Optional name override; when `None`, `connect` uses the name from the fetched agent card.
name: Option<String>,
}
impl A2aAgentBuilder {
#[must_use]
pub fn bearer_auth(mut self, token: impl Into<String>) -> Self {
self.inner = self.inner.bearer_auth(token);
self
}
#[must_use]
pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
self.inner = self.inner.header(name, value);
self
}
#[must_use]
pub fn api_key(mut self, header_name: impl Into<String>, key: impl Into<String>) -> Self {
self.inner = self.inner.api_key(header_name, key);
self
}
#[must_use]
pub fn timeout(mut self, secs: u64) -> Self {
self.inner = self.inner.timeout(secs);
self
}
#[must_use]
pub fn name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
pub async fn connect(self) -> crate::Result<A2aAgent> {
info!(url = %self.url, "Connecting to A2A agent");
let client = self.inner.build().map_err(|e| {
crate::error::AgentError::runtime(format!(
"Failed to build A2A client for '{}': {e}",
self.url
))
})?;
let card = client.get_agent_card().await.map_err(|e| {
crate::error::AgentError::runtime(format!(
"Failed to fetch agent card from '{}': {e}",
self.url,
))
})?;
let name = self.name.unwrap_or_else(|| card.name.clone());
info!(
name = %name,
skills = card.skills.len(),
"A2A agent connected",
);
Ok(A2aAgent {
client: Arc::new(client),
card: Arc::new(RwLock::new(card)),
name,
})
}
}
/// Handle to a connected remote A2A agent.
///
/// Produced by [`A2aAgentBuilder::connect`].
pub struct A2aAgent {
/// Shared `ra2a` client used to fetch the agent card and to send messages.
client: Arc<A2AClient>,
/// Cached agent card; initialised on connect and replaced by `refresh_card`.
card: Arc<RwLock<AgentCard>>,
/// Agent name used in logs and error messages, and as the tool name after `into_tool`.
name: String,
}
impl std::fmt::Debug for A2aAgent {
    /// Formats the agent showing only its `name`; the remaining fields are
    /// omitted and the output is marked non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("A2aAgent");
        builder.field("name", &self.name);
        builder.finish_non_exhaustive()
    }
}
impl A2aAgent {
    /// Starts building a connection to the A2A agent at `url`.
    ///
    /// Despite its name, this returns an [`A2aAgentBuilder`]; call
    /// [`A2aAgentBuilder::connect`] on it to obtain the agent itself.
    // `new` deliberately returns the builder instead of `Self`, which is exactly
    // what `clippy::new_ret_no_self` flags, so the lint is silenced here.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(url: impl Into<String>) -> A2aAgentBuilder {
        let url = url.into();
        let inner = A2AClientBuilder::new(&url);
        A2aAgentBuilder {
            url,
            inner,
            name: None,
        }
    }

    /// Returns the agent's name.
    #[must_use]
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns a clone of the currently cached agent card.
    pub async fn agent_card(&self) -> AgentCard {
        let cached = self.card.read().await;
        cached.clone()
    }

    /// Fetches the agent card again, stores it in the cache, and returns it.
    ///
    /// # Errors
    ///
    /// Returns a runtime `AgentError` when the card cannot be fetched; the
    /// cached card is left untouched in that case.
    pub async fn refresh_card(&self) -> crate::Result<AgentCard> {
        // The fetch completes before the write lock is requested, so readers
        // are not blocked while the request is in flight.
        let card = self.client.get_agent_card().await.map_err(|e| {
            crate::error::AgentError::runtime(format!(
                "Failed to refresh agent card for '{}': {e}",
                self.name,
            ))
        })?;
        let mut cached = self.card.write().await;
        *cached = card.clone();
        Ok(card)
    }

    /// Sends `text` as a user message and returns the collected text response.
    ///
    /// # Errors
    ///
    /// See [`A2aAgent::send_message`].
    pub async fn send(&self, text: impl Into<String>) -> Result<String, ToolError> {
        self.send_message(A2aMessage::user_text(text)).await
    }

    /// Sends `message` and gathers every piece of text from the resulting event stream.
    ///
    /// Text is taken from message events, from the status message of task
    /// updates, and from the text parts of task artifacts. The pieces are
    /// joined with `'\n'` in arrival order.
    ///
    /// # Errors
    ///
    /// Returns `ToolError::Execution` when the send fails or the stream yields an error.
    pub async fn send_message(&self, message: A2aMessage) -> Result<String, ToolError> {
        /// Appends `text` to `buf`, inserting a newline first when `buf` already has content.
        fn append_line(buf: &mut String, text: &str) {
            if !buf.is_empty() {
                buf.push('\n');
            }
            buf.push_str(text);
        }

        let mut stream = self.client.send_message(message).await.map_err(|e| {
            ToolError::Execution(format!("A2A agent '{}' send failed: {e}", self.name))
        })?;

        let mut output = String::new();
        while let Some(result) = stream.next().await {
            let event = result.map_err(|e| {
                ToolError::Execution(format!("A2A agent '{}' stream error: {e}", self.name))
            })?;
            match event {
                ClientEvent::Message(msg) => {
                    if let Some(text) = msg.text_content() {
                        append_line(&mut output, &text);
                    }
                }
                ClientEvent::TaskUpdate { task, .. } => {
                    // Status text first, then artifact text, matching the order
                    // in which they appear on the task.
                    if let Some(ref msg) = task.status.message
                        && let Some(text) = msg.text_content()
                    {
                        append_line(&mut output, &text);
                    }
                    if let Some(ref artifacts) = task.artifacts {
                        for artifact in artifacts {
                            for part in &artifact.parts {
                                if let Some(text) = part.as_text() {
                                    append_line(&mut output, text);
                                }
                            }
                        }
                    }
                }
            }
        }

        debug!(agent = %self.name, len = output.len(), "A2A response received");
        Ok(output)
    }

    /// Consumes the agent and wraps it as a boxed tool.
    #[must_use]
    pub fn into_tool(self) -> BoxedTool {
        let agent = Arc::new(self);
        Box::new(A2aTool { agent })
    }

    /// Returns a reference to the underlying `ra2a` client.
    #[must_use]
    pub fn client(&self) -> &A2AClient {
        &self.client
    }
}
/// Adapter that exposes an [`A2aAgent`] through the `DynTool` interface.
struct A2aTool {
/// The wrapped agent; tool calls are forwarded to its `send` method.
agent: Arc<A2aAgent>,
}
impl std::fmt::Debug for A2aTool {
    /// Formats the tool showing only the wrapped agent's name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("A2aTool");
        builder.field("name", &self.agent.name);
        builder.finish()
    }
}
#[async_trait]
impl DynTool for A2aTool {
    /// The tool name is the wrapped agent's name.
    fn name(&self) -> &str {
        &self.agent.name
    }

    /// Builds a description from the cached agent card: the card's description
    /// followed by a bulleted list of its skills, if any.
    fn description(&self) -> String {
        // This method is synchronous, so the async lock can only be probed. If
        // it cannot be acquired immediately, fall back to a generic description.
        let Ok(card) = self.agent.card.try_read() else {
            return "A2A remote agent".to_owned();
        };

        let mut text = card.description.clone();
        if card.skills.is_empty() {
            return text;
        }

        text.push_str("\n\nSkills:");
        for skill in &card.skills {
            // The result of writing into a `String` is deliberately ignored.
            let _ = write!(text, "\n- {}: {}", skill.name, skill.description);
        }
        text
    }

    /// Describes the tool as taking a single required string argument, `message`.
    fn definition(&self) -> ToolDefinition {
        let schema = serde_json::json!({
            "type": "object",
            "properties": {
                "message": {
                    "type": "string",
                    "description": "The message to send to the remote agent"
                }
            },
            "required": ["message"]
        });
        let description = self.description();
        ToolDefinition::new(&self.agent.name, description, schema)
    }

    /// Sends the `message` argument to the remote agent and returns its text
    /// response as a JSON string.
    ///
    /// # Errors
    ///
    /// Returns `ToolError::InvalidArguments` when `message` is absent or not a
    /// string, and propagates any error from the agent's `send`.
    async fn call_json(&self, args: Value) -> Result<Value, ToolError> {
        let Some(text) = args
            .get("message")
            .and_then(Value::as_str)
            .map(str::to_owned)
        else {
            return Err(ToolError::InvalidArguments(
                "Missing required field 'message' (string)".into(),
            ));
        };

        let reply = self.agent.send(text).await?;
        Ok(Value::String(reply))
    }
}