a2a_protocol_server/dispatch/grpc/
dispatcher.rs1use std::net::SocketAddr;
9use std::sync::Arc;
10
11use super::service::GrpcServiceImpl;
12use super::{A2aServiceServer, GrpcConfig};
13use crate::handler::RequestHandler;
14
15pub struct GrpcDispatcher {
21 handler: Arc<RequestHandler>,
22 config: GrpcConfig,
23}
24
25impl GrpcDispatcher {
26 #[must_use]
28 pub const fn new(handler: Arc<RequestHandler>, config: GrpcConfig) -> Self {
29 Self { handler, config }
30 }
31
32 pub async fn serve(self, addr: impl tokio::net::ToSocketAddrs) -> std::io::Result<()> {
41 let addr = super::helpers::resolve_addr(addr).await?;
42 let svc = self.into_service();
43
44 trace_info!(
45 addr = %addr,
46 "A2A gRPC server listening"
47 );
48
49 tonic::transport::Server::builder()
50 .concurrency_limit_per_connection(self.config.concurrency_limit)
51 .add_service(svc)
52 .serve(addr)
53 .await
54 .map_err(std::io::Error::other)
55 }
56
57 pub async fn serve_with_addr(
66 self,
67 addr: impl tokio::net::ToSocketAddrs,
68 ) -> std::io::Result<SocketAddr> {
69 let listener = tokio::net::TcpListener::bind(addr).await?;
70 self.serve_with_listener(listener)
71 }
72
73 pub fn serve_with_listener(
86 self,
87 listener: tokio::net::TcpListener,
88 ) -> std::io::Result<SocketAddr> {
89 let local_addr = listener.local_addr()?;
90 let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener);
91 let svc = self.into_service();
92
93 trace_info!(
94 %local_addr,
95 "A2A gRPC server listening"
96 );
97
98 let limit = self.config.concurrency_limit;
99 tokio::spawn(async move {
100 let _ = tonic::transport::Server::builder()
101 .concurrency_limit_per_connection(limit)
102 .add_service(svc)
103 .serve_with_incoming(incoming)
104 .await;
105 });
106
107 Ok(local_addr)
108 }
109
110 #[must_use]
115 pub fn into_service(&self) -> A2aServiceServer<GrpcServiceImpl> {
116 let inner = GrpcServiceImpl {
117 handler: Arc::clone(&self.handler),
118 config: self.config.clone(),
119 };
120 A2aServiceServer::new(inner)
121 .max_decoding_message_size(self.config.max_message_size)
122 .max_encoding_message_size(self.config.max_message_size)
123 }
124}
125
126impl std::fmt::Debug for GrpcDispatcher {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 f.debug_struct("GrpcDispatcher")
129 .field("handler", &"RequestHandler { .. }")
130 .field("config", &self.config)
131 .finish()
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Formatting a dispatcher with `{:?}` must succeed, name the type, and
    /// show the handler only as an opaque placeholder.
    #[test]
    fn grpc_dispatcher_debug_does_not_panic() {
        use crate::agent_executor;
        use crate::RequestHandlerBuilder;
        use std::sync::Arc;

        struct DummyExec;
        agent_executor!(DummyExec, |_ctx, _queue| async { Ok(()) });

        let handler = Arc::new(
            RequestHandlerBuilder::new(DummyExec)
                .build()
                .expect("building a handler from a no-op executor should succeed"),
        );
        let dispatcher = GrpcDispatcher::new(handler, GrpcConfig::default());

        // Previously the rendered string was discarded; pin the parts of the
        // output that the hand-written `Debug` impl is responsible for.
        let rendered = format!("{dispatcher:?}");
        assert!(
            rendered.starts_with("GrpcDispatcher"),
            "unexpected Debug output: {rendered}"
        );
        assert!(
            rendered.contains("RequestHandler { .. }"),
            "handler placeholder missing from Debug output: {rendered}"
        );
    }
}
150}