use std::net::SocketAddr;
use std::sync::Arc;
use super::service::GrpcServiceImpl;
use super::{A2aServiceServer, GrpcConfig};
use crate::handler::RequestHandler;
/// Serves the A2A gRPC service over `tonic`.
///
/// Pairs a shared [`RequestHandler`] with the [`GrpcConfig`] that is applied
/// when the service and the transport server are built.
pub struct GrpcDispatcher {
/// Shared request handler; `into_service` hands an `Arc` clone of it to the
/// service implementation it builds.
handler: Arc<RequestHandler>,
/// gRPC settings. The methods in this file read `concurrency_limit` and
/// `max_message_size` from it and clone it into the service implementation.
config: GrpcConfig,
}
impl GrpcDispatcher {
    /// Creates a dispatcher from a shared request handler and a gRPC config.
    #[must_use]
    pub const fn new(handler: Arc<RequestHandler>, config: GrpcConfig) -> Self {
        Self { config, handler }
    }

    /// Runs the gRPC server on `addr` and resolves once the server future
    /// completes.
    ///
    /// The address is first resolved through `super::helpers::resolve_addr`,
    /// then handed to `tonic`'s `Server::serve`.
    ///
    /// # Errors
    ///
    /// Returns an error if address resolution fails, or if the `tonic` server
    /// returns an error (wrapped with [`std::io::Error::other`]).
    pub async fn serve(self, addr: impl tokio::net::ToSocketAddrs) -> std::io::Result<()> {
        let resolved = super::helpers::resolve_addr(addr).await?;
        let service = self.into_service();

        trace_info!(
            addr = %resolved,
            "A2A gRPC server listening"
        );

        let per_connection_limit = self.config.concurrency_limit;
        let outcome = tonic::transport::Server::builder()
            .concurrency_limit_per_connection(per_connection_limit)
            .add_service(service)
            .serve(resolved)
            .await;

        outcome.map_err(std::io::Error::other)
    }

    /// Binds a TCP listener on `addr`, starts the server in the background via
    /// [`Self::serve_with_listener`], and returns the bound local address.
    ///
    /// # Errors
    ///
    /// Returns an error if binding the listener fails or if
    /// [`Self::serve_with_listener`] fails.
    pub async fn serve_with_addr(
        self,
        addr: impl tokio::net::ToSocketAddrs,
    ) -> std::io::Result<SocketAddr> {
        tokio::net::TcpListener::bind(addr)
            .await
            .and_then(|listener| self.serve_with_listener(listener))
    }

    /// Starts the server on an already-bound `listener` and returns the
    /// listener's local address without waiting for the server to finish.
    ///
    /// The server runs on a task created with `tokio::spawn`, so this must be
    /// called from within a Tokio runtime. The task's handle is dropped and
    /// the server's final result is discarded, so errors raised while serving
    /// are not reported to the caller.
    ///
    /// # Errors
    ///
    /// Returns an error if the listener's local address cannot be read.
    pub fn serve_with_listener(
        self,
        listener: tokio::net::TcpListener,
    ) -> std::io::Result<SocketAddr> {
        let local_addr = listener.local_addr()?;
        let connections = tokio_stream::wrappers::TcpListenerStream::new(listener);
        let service = self.into_service();

        trace_info!(
            %local_addr,
            "A2A gRPC server listening"
        );

        let per_connection_limit = self.config.concurrency_limit;
        let server = async move {
            // Best-effort background serving: the outcome is intentionally ignored.
            let _ = tonic::transport::Server::builder()
                .concurrency_limit_per_connection(per_connection_limit)
                .add_service(service)
                .serve_with_incoming(connections)
                .await;
        };
        tokio::spawn(server);

        Ok(local_addr)
    }

    /// Builds the `tonic` service for this dispatcher.
    ///
    /// Despite the `into_` prefix this only borrows `self`: the handler is
    /// shared through an `Arc` clone and the config is cloned. The configured
    /// `max_message_size` is applied to both decoding and encoding.
    #[must_use]
    pub fn into_service(&self) -> A2aServiceServer<GrpcServiceImpl> {
        let max_message_size = self.config.max_message_size;
        let handler = Arc::clone(&self.handler);
        let config = self.config.clone();

        A2aServiceServer::new(GrpcServiceImpl { handler, config })
            .max_decoding_message_size(max_message_size)
            .max_encoding_message_size(max_message_size)
    }
}
impl std::fmt::Debug for GrpcDispatcher {
    /// Formats the dispatcher as a struct, printing a fixed placeholder string
    /// for the handler and the `Debug` output of the config.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("GrpcDispatcher");
        out.field("handler", &"RequestHandler { .. }");
        out.field("config", &self.config);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Formatting a dispatcher with `{:?}` must complete without panicking.
    #[test]
    fn grpc_dispatcher_debug_does_not_panic() {
        use crate::{agent_executor, RequestHandlerBuilder};
        use std::sync::Arc;

        // Throwaway executor declared through the `agent_executor!` macro.
        struct DummyExec;
        agent_executor!(DummyExec, |_ctx, _queue| async { Ok(()) });

        let request_handler = RequestHandlerBuilder::new(DummyExec).build().unwrap();
        let subject = GrpcDispatcher::new(Arc::new(request_handler), GrpcConfig::default());

        // Only the absence of a panic matters; the rendered text is discarded.
        let _ = format!("{subject:?}");
    }
}