use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use super::{
CallInterceptor, Client, ClientConfig, EventStream, JsonRpcTransport, RestTransport,
ServiceParams, Transport, TransportConfig,
};
use crate::error::{A2AError, Result};
use crate::types::{
AgentCard, AgentInterface, CancelTaskRequest, DeleteTaskPushNotificationConfigRequest,
GetExtendedAgentCardRequest, GetTaskPushNotificationConfigRequest, GetTaskRequest,
ListTaskPushNotificationConfigsRequest, ListTaskPushNotificationConfigsResponse,
ListTasksRequest, ListTasksResponse, SendMessageRequest, SendMessageResponse,
SubscribeToTaskRequest, Task, TaskPushNotificationConfig, TransportProtocol,
};
/// Boxed, pinned, `Send` future returned by [`TransportBuilder::build`].
///
/// It resolves to a boxed [`Transport`] on success or to the crate error on
/// failure, and may borrow data for the lifetime `'a`.
type TransportBuildFuture<'a> =
Pin<Box<dyn Future<Output = Result<Box<dyn Transport>>> + Send + 'a>>;
/// Asynchronous factory for [`Transport`] instances.
///
/// Implementations must be `Send + Sync`; [`ClientFactory`] keeps them behind
/// `Arc` in a map keyed by protocol name.
pub trait TransportBuilder: Send + Sync {
/// Builds a transport for the given `endpoint`.
///
/// The returned future may borrow both `self` and `endpoint` for `'a` and
/// resolves to an error when the transport cannot be constructed.
fn build<'a>(&'a self, endpoint: &'a AgentInterface) -> TransportBuildFuture<'a>;
}
/// Built-in [`TransportBuilder`] that produces [`JsonRpcTransport`] instances.
struct JsonRpcBuilder;

impl TransportBuilder for JsonRpcBuilder {
    /// Creates a JSON-RPC transport whose configuration is derived from
    /// `endpoint.url`; construction errors are propagated to the caller.
    fn build<'a>(&'a self, endpoint: &'a AgentInterface) -> TransportBuildFuture<'a> {
        Box::pin(async move {
            let config = TransportConfig::new(&endpoint.url);
            let rpc = JsonRpcTransport::new(config)?;
            Ok(Box::new(rpc) as Box<dyn Transport>)
        })
    }
}
/// Built-in [`TransportBuilder`] that produces [`RestTransport`] instances.
struct RestBuilder;

impl TransportBuilder for RestBuilder {
    /// Creates a REST transport pointed at `endpoint.url`; construction errors
    /// are propagated to the caller.
    fn build<'a>(&'a self, endpoint: &'a AgentInterface) -> TransportBuildFuture<'a> {
        Box::pin(async move {
            let rest = RestTransport::new(&endpoint.url)?;
            Ok(Box::new(rest) as Box<dyn Transport>)
        })
    }
}
/// Builds [`Client`]s from agent cards using a registry of transport builders.
pub struct ClientFactory {
    /// Client configuration; its `preferred_transports` list ranks candidate
    /// interfaces when a client is created from a card.
    config: ClientConfig,
    /// Interceptors attached, in insertion order, to every created client.
    interceptors: Vec<Arc<dyn CallInterceptor>>,
    /// Transport builders keyed by protocol binding name.
    builders: HashMap<String, Arc<dyn TransportBuilder>>,
}

impl std::fmt::Debug for ClientFactory {
    /// Prints only `config`; the remaining fields are elided (shown as `..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ClientFactory");
        out.field("config", &self.config);
        out.finish_non_exhaustive()
    }
}
impl ClientFactory {
    /// Creates a factory with the default [`ClientConfig`], no interceptors,
    /// and the built-in builders registered under
    /// `TransportProtocol::JSONRPC` and `TransportProtocol::HTTP_JSON`.
    #[must_use]
    pub fn new() -> Self {
        let mut builders: HashMap<String, Arc<dyn TransportBuilder>> = HashMap::new();
        builders.insert(
            TransportProtocol::JSONRPC.to_owned(),
            Arc::new(JsonRpcBuilder),
        );
        builders.insert(
            TransportProtocol::HTTP_JSON.to_owned(),
            Arc::new(RestBuilder),
        );
        Self {
            config: ClientConfig::default(),
            interceptors: Vec::new(),
            builders,
        }
    }

    /// Replaces the factory's configuration with `config`.
    #[must_use]
    pub fn with_config(mut self, config: ClientConfig) -> Self {
        self.config = config;
        self
    }

    /// Appends `interceptor` to the list attached to every created client.
    #[must_use]
    pub fn with_interceptor(mut self, interceptor: Arc<dyn CallInterceptor>) -> Self {
        self.interceptors.push(interceptor);
        self
    }

    /// Registers `builder` for `protocol`, replacing any builder previously
    /// registered under the same protocol name (including the built-in ones).
    #[must_use]
    pub fn with_transport(
        mut self,
        protocol: impl Into<String>,
        builder: Arc<dyn TransportBuilder>,
    ) -> Self {
        self.builders.insert(protocol.into(), builder);
        self
    }

    /// Creates a [`Client`] for the agent described by `card`.
    ///
    /// Interfaces whose protocol binding has no registered builder are
    /// skipped. The remaining candidates are tried in the order given by
    /// `config.preferred_transports`; protocols not listed there come last and
    /// keep their relative order from the card. The first transport that
    /// builds successfully is used. When the chosen interface declares a
    /// non-empty tenant, the transport is wrapped in a
    /// [`TenantTransportDecorator`]. The resulting client receives a clone of
    /// `card` and every interceptor registered on this factory.
    ///
    /// # Errors
    ///
    /// - [`A2AError::InvalidParams`] if the card lists no interfaces.
    /// - [`A2AError::Other`] if no interface uses a registered protocol, or if
    ///   every candidate transport fails to build (the message aggregates the
    ///   individual failures).
    pub async fn create_from_card(&self, card: &AgentCard) -> Result<Client> {
        if card.supported_interfaces.is_empty() {
            return Err(A2AError::InvalidParams(
                "agent card has no supported interfaces".into(),
            ));
        }

        let mut candidates: Vec<(&AgentInterface, &Arc<dyn TransportBuilder>, usize)> = Vec::new();
        for iface in &card.supported_interfaces {
            let protocol = iface.protocol_binding.as_str();
            if let Some(builder) = self.builders.get(protocol) {
                // Unlisted protocols get `usize::MAX` so they sort after every
                // explicitly preferred one.
                let priority = self
                    .config
                    .preferred_transports
                    .iter()
                    .position(|p| p.as_str() == protocol)
                    .unwrap_or(usize::MAX);
                candidates.push((iface, builder, priority));
            }
        }

        if candidates.is_empty() {
            let protocols: Vec<_> = card
                .supported_interfaces
                .iter()
                .map(|i| i.protocol_binding.to_string())
                .collect();
            return Err(A2AError::Other(format!(
                "no compatible transports found: available protocols - [{}]",
                protocols.join(", ")
            )));
        }

        // Stable sort: candidates with equal priority keep the card's order.
        candidates.sort_by_key(|&(_, _, priority)| priority);

        let mut errors = Vec::new();
        for (iface, builder, _) in candidates {
            let mut transport = match builder.build(iface).await {
                Ok(t) => t,
                Err(e) => {
                    // Remember the failure and fall through to the next candidate.
                    errors.push(format!("{}: {e}", iface.url));
                    continue;
                }
            };

            if let Some(ref tenant) = iface.tenant
                && !tenant.is_empty()
            {
                transport = Box::new(TenantTransportDecorator {
                    inner: transport,
                    tenant: tenant.clone(),
                });
            }

            let mut client = Client::new(transport);
            client.set_card(card.clone());
            for interceptor in &self.interceptors {
                client = client.with_interceptor_arc(Arc::clone(interceptor));
            }
            return Ok(client);
        }

        Err(A2AError::Other(format!(
            "all transports failed: {}",
            errors.join("; ")
        )))
    }

    /// Fetches the agent card for `base_url` and builds a client from it.
    ///
    /// The card location is derived from `base_url` via `crate::agent_card_url`.
    ///
    /// # Errors
    ///
    /// Returns [`A2AError::Other`] if the request fails, if the server answers
    /// with an HTTP error status (4xx/5xx), or if the body cannot be parsed as
    /// an [`AgentCard`]. Errors from [`Self::create_from_card`] are propagated
    /// unchanged.
    pub async fn create_from_url(&self, base_url: &str) -> Result<Client> {
        let card_url = crate::agent_card_url(base_url);
        let resp = reqwest::get(&card_url)
            .await
            // Reject 4xx/5xx responses here; otherwise an error page reaches the
            // JSON parser and is reported as a misleading parse failure.
            .and_then(reqwest::Response::error_for_status)
            .map_err(|e| A2AError::Other(format!("failed to fetch agent card: {e}")))?;
        let card: AgentCard = resp
            .json()
            .await
            .map_err(|e| A2AError::Other(format!("failed to parse agent card: {e}")))?;
        self.create_from_card(&card).await
    }
}
impl Default for ClientFactory {
fn default() -> Self {
Self::new()
}
}
/// [`Transport`] wrapper that writes a fixed tenant into each request before
/// delegating the call to the wrapped transport.
pub struct TenantTransportDecorator {
    /// Transport that actually performs the calls.
    inner: Box<dyn Transport>,
    /// Tenant written into every forwarded request.
    tenant: String,
}

impl std::fmt::Debug for TenantTransportDecorator {
    /// Prints only `tenant`; the wrapped transport is elided (shown as `..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("TenantTransportDecorator");
        out.field("tenant", &self.tenant);
        out.finish_non_exhaustive()
    }
}
// Every request-carrying method follows the same recipe: clone the caller's
// request up front, overwrite its `tenant` with this decorator's tenant, then
// hand the modified copy to the wrapped transport inside the returned future.
impl Transport for TenantTransportDecorator {
    /// Forwards `send_message` with the request's tenant set to this decorator's tenant.
    fn send_message<'a>(
        &'a self,
        params: &'a ServiceParams,
        req: &'a SendMessageRequest,
    ) -> Pin<Box<dyn Future<Output = Result<SendMessageResponse>> + Send + 'a>> {
        let scoped = {
            let mut copy = req.clone();
            copy.tenant = Some(self.tenant.clone());
            copy
        };
        Box::pin(async move { self.inner.send_message(params, &scoped).await })
    }

    /// Forwards `send_streaming_message` with the request's tenant set to this decorator's tenant.
    fn send_streaming_message<'a>(
        &'a self,
        params: &'a ServiceParams,
        req: &'a SendMessageRequest,
    ) -> Pin<Box<dyn Future<Output = Result<EventStream>> + Send + 'a>> {
        let scoped = {
            let mut copy = req.clone();
            copy.tenant = Some(self.tenant.clone());
            copy
        };
        Box::pin(async move { self.inner.send_streaming_message(params, &scoped).await })
    }

    /// Forwards `get_task` with the request's tenant set to this decorator's tenant.
    fn get_task<'a>(
        &'a self,
        params: &'a ServiceParams,
        req: &'a GetTaskRequest,
    ) -> Pin<Box<dyn Future<Output = Result<Task>> + Send + 'a>> {
        let scoped = {
            let mut copy = req.clone();
            copy.tenant = Some(self.tenant.clone());
            copy
        };
        Box::pin(async move { self.inner.get_task(params, &scoped).await })
    }

    /// Forwards `list_tasks` with the request's tenant set to this decorator's tenant.
    fn list_tasks<'a>(
        &'a self,
        params: &'a ServiceParams,
        req: &'a ListTasksRequest,
    ) -> Pin<Box<dyn Future<Output = Result<ListTasksResponse>> + Send + 'a>> {
        let scoped = {
            let mut copy = req.clone();
            copy.tenant = Some(self.tenant.clone());
            copy
        };
        Box::pin(async move { self.inner.list_tasks(params, &scoped).await })
    }

    /// Forwards `cancel_task` with the request's tenant set to this decorator's tenant.
    fn cancel_task<'a>(
        &'a self,
        params: &'a ServiceParams,
        req: &'a CancelTaskRequest,
    ) -> Pin<Box<dyn Future<Output = Result<Task>> + Send + 'a>> {
        let scoped = {
            let mut copy = req.clone();
            copy.tenant = Some(self.tenant.clone());
            copy
        };
        Box::pin(async move { self.inner.cancel_task(params, &scoped).await })
    }

    /// Forwards `subscribe_to_task` with the request's tenant set to this decorator's tenant.
    fn subscribe_to_task<'a>(
        &'a self,
        params: &'a ServiceParams,
        req: &'a SubscribeToTaskRequest,
    ) -> Pin<Box<dyn Future<Output = Result<EventStream>> + Send + 'a>> {
        let scoped = {
            let mut copy = req.clone();
            copy.tenant = Some(self.tenant.clone());
            copy
        };
        Box::pin(async move { self.inner.subscribe_to_task(params, &scoped).await })
    }

    /// Forwards `create_task_push_config` with the config's tenant set to this decorator's tenant.
    fn create_task_push_config<'a>(
        &'a self,
        params: &'a ServiceParams,
        req: &'a TaskPushNotificationConfig,
    ) -> Pin<Box<dyn Future<Output = Result<TaskPushNotificationConfig>> + Send + 'a>> {
        let scoped = {
            let mut copy = req.clone();
            copy.tenant = Some(self.tenant.clone());
            copy
        };
        Box::pin(async move { self.inner.create_task_push_config(params, &scoped).await })
    }

    /// Forwards `get_task_push_config` with the request's tenant set to this decorator's tenant.
    fn get_task_push_config<'a>(
        &'a self,
        params: &'a ServiceParams,
        req: &'a GetTaskPushNotificationConfigRequest,
    ) -> Pin<Box<dyn Future<Output = Result<TaskPushNotificationConfig>> + Send + 'a>> {
        let scoped = {
            let mut copy = req.clone();
            copy.tenant = Some(self.tenant.clone());
            copy
        };
        Box::pin(async move { self.inner.get_task_push_config(params, &scoped).await })
    }

    /// Forwards `list_task_push_configs` with the request's tenant set to this decorator's tenant.
    fn list_task_push_configs<'a>(
        &'a self,
        params: &'a ServiceParams,
        req: &'a ListTaskPushNotificationConfigsRequest,
    ) -> Pin<Box<dyn Future<Output = Result<ListTaskPushNotificationConfigsResponse>> + Send + 'a>>
    {
        let scoped = {
            let mut copy = req.clone();
            copy.tenant = Some(self.tenant.clone());
            copy
        };
        Box::pin(async move { self.inner.list_task_push_configs(params, &scoped).await })
    }

    /// Forwards `delete_task_push_config` with the request's tenant set to this decorator's tenant.
    fn delete_task_push_config<'a>(
        &'a self,
        params: &'a ServiceParams,
        req: &'a DeleteTaskPushNotificationConfigRequest,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
        let scoped = {
            let mut copy = req.clone();
            copy.tenant = Some(self.tenant.clone());
            copy
        };
        Box::pin(async move { self.inner.delete_task_push_config(params, &scoped).await })
    }

    /// Forwards `get_extended_agent_card` with the request's tenant set to this decorator's tenant.
    fn get_extended_agent_card<'a>(
        &'a self,
        params: &'a ServiceParams,
        req: &'a GetExtendedAgentCardRequest,
    ) -> Pin<Box<dyn Future<Output = Result<AgentCard>> + Send + 'a>> {
        let scoped = {
            let mut copy = req.clone();
            copy.tenant = Some(self.tenant.clone());
            copy
        };
        Box::pin(async move { self.inner.get_extended_agent_card(params, &scoped).await })
    }

    /// Delegates straight to the wrapped transport; this call takes no request,
    /// so no tenant is applied.
    fn get_agent_card(&self) -> Pin<Box<dyn Future<Output = Result<AgentCard>> + Send + '_>> {
        self.inner.get_agent_card()
    }
}