mod factory;
mod interceptor;
mod jsonrpc;
mod rest;
#[cfg(feature = "grpc")]
mod grpc;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
pub use factory::{ClientFactory, TenantTransportDecorator, TransportBuilder};
use futures::Stream;
#[cfg(feature = "grpc")]
#[cfg_attr(docsrs, doc(cfg(feature = "grpc")))]
pub use grpc::GrpcTransport;
pub use interceptor::{
CallInterceptor, PassthroughInterceptor, Request, Response, SERVICE_PARAMS, ServiceParams,
StaticParamsInjector, current_service_params,
};
pub use jsonrpc::{JsonRpcTransport, TransportConfig};
pub use rest::RestTransport;
use crate::error::{A2AError, Result};
use crate::types::{
AgentCard, CancelTaskRequest, DeleteTaskPushNotificationConfigRequest,
GetExtendedAgentCardRequest, GetTaskPushNotificationConfigRequest, GetTaskRequest,
ListTaskPushNotificationConfigsRequest, ListTaskPushNotificationConfigsResponse,
ListTasksRequest, ListTasksResponse, SendMessageRequest, SendMessageResponse, StreamResponse,
SubscribeToTaskRequest, Task, TaskPushNotificationConfig, TransportProtocol,
};
pub type EventStream = Pin<Box<dyn Stream<Item = Result<StreamResponse>> + Send>>;
pub trait Transport: Send + Sync {
fn send_message<'a>(
&'a self,
params: &'a ServiceParams,
req: &'a SendMessageRequest,
) -> Pin<Box<dyn Future<Output = Result<SendMessageResponse>> + Send + 'a>>;
fn send_streaming_message<'a>(
&'a self,
params: &'a ServiceParams,
req: &'a SendMessageRequest,
) -> Pin<Box<dyn Future<Output = Result<EventStream>> + Send + 'a>>;
fn get_task<'a>(
&'a self,
params: &'a ServiceParams,
req: &'a GetTaskRequest,
) -> Pin<Box<dyn Future<Output = Result<Task>> + Send + 'a>>;
fn list_tasks<'a>(
&'a self,
params: &'a ServiceParams,
req: &'a ListTasksRequest,
) -> Pin<Box<dyn Future<Output = Result<ListTasksResponse>> + Send + 'a>>;
fn cancel_task<'a>(
&'a self,
params: &'a ServiceParams,
req: &'a CancelTaskRequest,
) -> Pin<Box<dyn Future<Output = Result<Task>> + Send + 'a>>;
fn subscribe_to_task<'a>(
&'a self,
params: &'a ServiceParams,
req: &'a SubscribeToTaskRequest,
) -> Pin<Box<dyn Future<Output = Result<EventStream>> + Send + 'a>>;
fn create_task_push_config<'a>(
&'a self,
params: &'a ServiceParams,
req: &'a TaskPushNotificationConfig,
) -> Pin<Box<dyn Future<Output = Result<TaskPushNotificationConfig>> + Send + 'a>>;
fn get_task_push_config<'a>(
&'a self,
params: &'a ServiceParams,
req: &'a GetTaskPushNotificationConfigRequest,
) -> Pin<Box<dyn Future<Output = Result<TaskPushNotificationConfig>> + Send + 'a>>;
fn list_task_push_configs<'a>(
&'a self,
params: &'a ServiceParams,
req: &'a ListTaskPushNotificationConfigsRequest,
) -> Pin<Box<dyn Future<Output = Result<ListTaskPushNotificationConfigsResponse>> + Send + 'a>>;
fn delete_task_push_config<'a>(
&'a self,
params: &'a ServiceParams,
req: &'a DeleteTaskPushNotificationConfigRequest,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>>;
fn get_extended_agent_card<'a>(
&'a self,
params: &'a ServiceParams,
req: &'a GetExtendedAgentCardRequest,
) -> Pin<Box<dyn Future<Output = Result<AgentCard>> + Send + 'a>>;
fn get_agent_card(&self) -> Pin<Box<dyn Future<Output = Result<AgentCard>> + Send + '_>>;
fn destroy(&self) -> Pin<Box<dyn Future<Output = ()> + Send + '_>> {
Box::pin(async {})
}
}
#[derive(Debug, Clone, Default)]
pub struct ClientConfig {
pub push_config: Option<TaskPushNotificationConfig>,
pub accepted_output_modes: Vec<String>,
pub preferred_transports: Vec<TransportProtocol>,
}
pub struct Client {
transport: Box<dyn Transport>,
interceptors: Vec<Arc<dyn CallInterceptor>>,
card: std::sync::RwLock<Option<AgentCard>>,
config: ClientConfig,
}
impl std::fmt::Debug for Client {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Client")
.field("config", &self.config)
.finish_non_exhaustive()
}
}
impl Client {
#[must_use]
pub fn new(transport: Box<dyn Transport>) -> Self {
Self {
transport,
interceptors: Vec::new(),
card: std::sync::RwLock::new(None),
config: ClientConfig::default(),
}
}
pub fn from_url(base_url: impl Into<String>) -> Result<Self> {
let transport = JsonRpcTransport::from_url(base_url)?;
Ok(Self::new(Box::new(transport)))
}
#[must_use]
pub fn with_config(mut self, config: ClientConfig) -> Self {
self.config = config;
self
}
#[must_use]
pub fn with_interceptor(mut self, interceptor: impl CallInterceptor + 'static) -> Self {
self.interceptors.push(Arc::new(interceptor));
self
}
#[must_use]
pub fn with_interceptor_arc(mut self, interceptor: Arc<dyn CallInterceptor>) -> Self {
self.interceptors.push(interceptor);
self
}
pub fn set_card(&self, card: AgentCard) {
*self
.card
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(card);
}
#[must_use]
pub fn card(&self) -> Option<AgentCard> {
self.card
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone()
}
async fn intercept_before<P: Send + 'static>(
&self,
method: &str,
payload: P,
) -> Result<(P, ServiceParams)> {
if self.interceptors.is_empty() {
return Ok((payload, ServiceParams::default()));
}
let mut req = Request {
method: method.to_owned(),
card: self.card(),
service_params: ServiceParams::default(),
payload: Box::new(payload),
};
for interceptor in &self.interceptors {
interceptor.before(&mut req).await?;
}
let params = req.service_params;
req.payload.downcast::<P>().map_or_else(
|_| {
Err(A2AError::Other(
"interceptor changed request payload type".into(),
))
},
|p| Ok((*p, params)),
)
}
async fn intercept_after<R: Send + 'static>(
&self,
method: &str,
result: Result<R>,
) -> Result<R> {
use std::any::Any;
if self.interceptors.is_empty() {
return result;
}
let (payload, response_err) = match result {
Ok(r) => {
let boxed: Box<dyn Any + Send> = Box::new(r);
(Some(boxed), None)
}
Err(e) => (None, Some(e)),
};
let mut resp = Response {
method: method.to_owned(),
card: self.card(),
payload,
err: response_err,
};
for interceptor in &self.interceptors {
interceptor.after(&mut resp).await?;
}
if let Some(e) = resp.err {
return Err(e);
}
resp.payload.map_or_else(
|| {
Err(A2AError::Other(
"no response payload after interceptor".into(),
))
},
|p| {
p.downcast::<R>().map_or_else(
|_| {
Err(A2AError::Other(
"interceptor changed response payload type".into(),
))
},
|r| Ok(*r),
)
},
)
}
pub async fn send_message(&self, req: &SendMessageRequest) -> Result<SendMessageResponse> {
let (req, sp) = self.intercept_before("SendMessage", req.clone()).await?;
let result = SERVICE_PARAMS
.scope(sp.clone(), async {
self.transport.send_message(&sp, &req).await
})
.await;
self.intercept_after("SendMessage", result).await
}
pub async fn send_streaming_message(&self, req: &SendMessageRequest) -> Result<EventStream> {
let (req, sp) = self
.intercept_before("SendStreamingMessage", req.clone())
.await?;
if let Some(ref card) = self.card()
&& !card.supports_streaming()
{
let result = SERVICE_PARAMS
.scope(sp.clone(), async {
self.transport.send_message(&sp, &req).await
})
.await;
let result = self.intercept_after("SendStreamingMessage", result).await?;
let event = match result {
SendMessageResponse::Task(t) => StreamResponse::Task(t),
SendMessageResponse::Message(m) => StreamResponse::Message(m),
};
let stream: EventStream = Box::pin(futures::stream::once(async move { Ok(event) }));
return Ok(stream);
}
let stream = SERVICE_PARAMS
.scope(sp.clone(), async {
self.transport.send_streaming_message(&sp, &req).await
})
.await?;
Ok(stream)
}
pub async fn get_task(&self, req: &GetTaskRequest) -> Result<Task> {
let (req, sp) = self.intercept_before("GetTask", req.clone()).await?;
let result = SERVICE_PARAMS
.scope(sp.clone(), async {
self.transport.get_task(&sp, &req).await
})
.await;
self.intercept_after("GetTask", result).await
}
pub async fn list_tasks(&self, req: &ListTasksRequest) -> Result<ListTasksResponse> {
let (req, sp) = self.intercept_before("ListTasks", req.clone()).await?;
let result = SERVICE_PARAMS
.scope(sp.clone(), async {
self.transport.list_tasks(&sp, &req).await
})
.await;
self.intercept_after("ListTasks", result).await
}
pub async fn cancel_task(&self, req: &CancelTaskRequest) -> Result<Task> {
let (req, sp) = self.intercept_before("CancelTask", req.clone()).await?;
let result = SERVICE_PARAMS
.scope(sp.clone(), async {
self.transport.cancel_task(&sp, &req).await
})
.await;
self.intercept_after("CancelTask", result).await
}
pub async fn subscribe_to_task(&self, req: &SubscribeToTaskRequest) -> Result<EventStream> {
let (req, sp) = self
.intercept_before("SubscribeToTask", req.clone())
.await?;
SERVICE_PARAMS
.scope(sp.clone(), async {
self.transport.subscribe_to_task(&sp, &req).await
})
.await
}
pub async fn get_agent_card(&self) -> Result<AgentCard> {
if let Some(ref card) = self.card()
&& !card.supports_extended_card()
{
return Ok(card.clone());
}
let result = self.transport.get_agent_card().await;
let card = self.intercept_after("GetAgentCard", result).await?;
self.set_card(card.clone());
Ok(card)
}
pub async fn destroy(&self) {
self.transport.destroy().await;
}
}