use std::collections::HashMap;
use std::sync::Arc;
use serde_json::Value;
use tracing::{error, info};
use uuid::Uuid;
use crate::a2a::types::{AgentCard, JsonRpcRequest, Message, Part, Task, TaskArtifactUpdateEvent, TaskState, Artifact};
use crate::config::auth::{AuthConfig, AuthType};
use crate::error::{AgentKitError, Result};
#[derive(Clone)]
pub struct RemoteAgentConnection {
pub card: AgentCard,
pub auth: AuthConfig,
http: reqwest::Client,
}
impl RemoteAgentConnection {
pub fn new(card: AgentCard, auth: AuthConfig) -> Self {
Self {
card,
auth,
http: reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(60))
.build()
.expect("reqwest client build failed"),
}
}
pub async fn send_message(
&self,
request: Message,
propagation_headers: HashMap<String, String>,
) -> Result<Task> {
let rpc_id = Uuid::new_v4().to_string();
let rpc = JsonRpcRequest {
jsonrpc: "2.0".to_string(),
method: "message/send".to_string(),
id: Some(serde_json::json!(rpc_id)),
params: serde_json::to_value(&request)
.map_err(|e| AgentKitError::A2aClient(e.to_string()))?,
};
let url = &self.card.url;
info!("A2A → {} ({})", self.card.name, url);
let mut req_builder = self.http.post(url).json(&rpc);
for (k, v) in &propagation_headers {
req_builder = req_builder.header(k, v);
}
if self.auth.auth_type != AuthType::NoAuth {
}
let body = req_builder
.send()
.await?
.error_for_status()
.map_err(|e| AgentKitError::A2aClient(e.to_string()))?
.text()
.await
.map_err(|e| AgentKitError::A2aClient(e.to_string()))?;
let mut collected_parts: Vec<Part> = Vec::new();
let mut task_id = Uuid::new_v4().to_string();
let mut context_id = request
.context_id
.clone()
.unwrap_or_else(|| Uuid::new_v4().to_string());
for line in body.lines() {
let data = match line.strip_prefix("data: ") {
Some(d) if !d.is_empty() => d,
_ => continue,
};
let envelope: serde_json::Value = match serde_json::from_str(data) {
Ok(v) => v,
Err(_) => continue,
};
let result = match envelope.get("result") {
Some(r) => r,
None => continue,
};
match result.get("kind").and_then(Value::as_str) {
Some("artifact-update") => {
if let Ok(ev) =
serde_json::from_value::<TaskArtifactUpdateEvent>(result.clone())
{
collected_parts.extend(ev.artifact.parts);
task_id = ev.task_id;
context_id = ev.context_id;
}
}
Some("status-update") => {
if let Some(tid) = result.get("taskId").and_then(Value::as_str) {
task_id = tid.to_string();
}
if let Some(cid) = result.get("contextId").and_then(Value::as_str) {
context_id = cid.to_string();
}
}
_ => {}
}
}
info!("A2A ← {} — {} part(s) collected", self.card.name, collected_parts.len());
let artifacts = if collected_parts.is_empty() {
vec![]
} else {
vec![Artifact {
artifact_id: Uuid::new_v4().to_string(),
name: None,
description: None,
parts: collected_parts,
metadata: None,
}]
};
Ok(Task {
id: task_id,
context_id,
state: TaskState::Completed,
artifacts,
history: vec![],
metadata: None,
})
}
}
pub struct RemoteAgentManager {
connections: HashMap<String, Arc<RemoteAgentConnection>>,
pub agents_description: String,
}
impl RemoteAgentManager {
pub fn new() -> Self {
Self {
connections: HashMap::new(),
agents_description: String::new(),
}
}
pub async fn connect(&mut self, addresses: Vec<(String, AuthConfig)>) -> Result<()> {
let http = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(60))
.build()
.map_err(|e| AgentKitError::A2aClient(e.to_string()))?;
for (i, (url, auth)) in addresses.into_iter().enumerate() {
let card_url = format!("{url}/.well-known/agent.json");
info!("Connecting to remote agent {i}: {url}");
match http.get(&card_url).send().await {
Ok(resp) => match resp.json::<AgentCard>().await {
Ok(card) => {
info!("Connected to remote agent '{}'", card.name);
self.connections.insert(
card.name.clone(),
Arc::new(RemoteAgentConnection::new(card, auth)),
);
}
Err(e) => error!("Failed to parse agent card from {url}: {e}"),
},
Err(e) => error!("Failed to reach remote agent at {url}: {e}"),
}
}
self.agents_description = self
.connections
.values()
.map(|c| {
serde_json::json!({
"name": c.card.name,
"description": c.card.description
})
.to_string()
})
.collect::<Vec<_>>()
.join("\n");
info!(
"Remote agent manager ready with {} agent(s)",
self.connections.len()
);
Ok(())
}
pub fn take_connections(&self) -> Vec<Arc<RemoteAgentConnection>> {
self.connections.values().cloned().collect()
}
pub fn list_agents(&self) -> Vec<Value> {
self.connections
.values()
.map(|c| serde_json::json!({"name": c.card.name, "description": c.card.description}))
.collect()
}
pub async fn send_message(
&self,
agent_name: &str,
task: &str,
context_id: Option<String>,
task_id: Option<String>,
propagation_headers: HashMap<String, String>,
) -> Result<Task> {
let conn = self.connections.get(agent_name).ok_or_else(|| {
AgentKitError::RemoteAgentNotFound { name: agent_name.to_string() }
})?;
let ctx_id = context_id.unwrap_or_else(|| Uuid::new_v4().to_string());
let message = Message {
kind: "message".to_string(),
role: crate::a2a::types::Role::User,
parts: vec![Part::Text { text: task.to_string(), metadata: None }],
message_id: Uuid::new_v4().to_string(),
context_id: Some(ctx_id),
task_id,
metadata: None,
};
conn.send_message(message, propagation_headers).await
}
}
impl Default for RemoteAgentManager {
fn default() -> Self { Self::new() }
}