a2a_rs/adapter/transport/http/
server.rs1use std::sync::Arc;
6
7use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
8
9#[cfg(feature = "tracing")]
10use tracing::{debug, error, info, instrument};
11
12use crate::{
13 adapter::{
14 auth::{NoopAuthenticator, with_auth},
15 error::HttpServerError,
16 },
17 domain::{
18 A2AError,
19 generated::{A2aService, A2aServiceExt},
20 },
21 port::Authenticator,
22 services::server::AgentInfoProvider,
23};
24
25pub struct HttpServer<P, A, Auth = NoopAuthenticator>
27where
28 P: A2aService + Send + Sync + 'static,
29 A: AgentInfoProvider + Send + Sync + 'static,
30 Auth: Authenticator + Send + Sync + 'static,
31{
32 processor: Arc<P>,
35 agent_info: Arc<A>,
37 address: String,
39 authenticator: Option<Arc<Auth>>,
41}
42
43impl<P, A> HttpServer<P, A>
44where
45 P: A2aService + Send + Sync + 'static,
46 A: AgentInfoProvider + Send + Sync + 'static,
47{
48 pub fn new(processor: P, agent_info: A, address: String) -> Self {
50 Self {
51 processor: Arc::new(processor),
52 agent_info: Arc::new(agent_info),
53 address,
54 authenticator: None,
55 }
56 }
57}
58
59impl<P, A, Auth> HttpServer<P, A, Auth>
60where
61 P: A2aService + Send + Sync + 'static,
62 A: AgentInfoProvider + Send + Sync + 'static,
63 Auth: Authenticator + Clone + Send + Sync + 'static,
64{
65 pub fn with_auth(processor: P, agent_info: A, address: String, authenticator: Auth) -> Self {
67 Self {
68 processor: Arc::new(processor),
69 agent_info: Arc::new(agent_info),
70 address,
71 authenticator: Some(Arc::new(authenticator)),
72 }
73 }
74
75 #[cfg_attr(feature = "tracing", instrument(skip(self), fields(
77 server.address = %self.address,
78 server.has_auth = self.authenticator.is_some()
79 )))]
80 pub async fn start(&self) -> Result<(), A2AError> {
81 #[cfg(feature = "tracing")]
82 info!("Starting HTTP server");
83
84 let processor = self.processor.clone();
85 let agent_info = self.agent_info.clone();
86
87 let connect_router = processor.register(connectrpc::Router::new());
89
90 let mut app = Router::new()
91 .route("/.well-known/agent-card.json", get(handle_agent_card))
93 .route("/agent-card", get(handle_agent_card))
95 .route("/skills", get(handle_skills))
96 .route("/skills/{id}", get(handle_skill_by_id))
97 .fallback_service(connect_router.into_axum_service())
98 .with_state(ServerState {
99 agent_info: agent_info.clone(),
100 });
101
102 if let Some(auth) = &self.authenticator {
104 let auth_clone = auth.clone();
106
107 app = with_auth(app, (*auth_clone).clone());
109 }
110
111 let listener = tokio::net::TcpListener::bind(&self.address)
112 .await
113 .map_err(HttpServerError::Io)?;
114
115 #[cfg(feature = "tracing")]
116 info!("HTTP server listening on {}", self.address);
117
118 axum::serve(listener, app).await.map_err(|e| {
119 #[cfg(feature = "tracing")]
120 error!("Server error: {}", e);
121 HttpServerError::Server(format!("Server error: {}", e))
122 })?;
123
124 Ok(())
125 }
126}
127
128struct ServerState<A>
129where
130 A: AgentInfoProvider + Send + Sync + 'static,
131{
132 agent_info: Arc<A>,
133}
134
135impl<A> Clone for ServerState<A>
136where
137 A: AgentInfoProvider + Send + Sync + 'static,
138{
139 fn clone(&self) -> Self {
140 Self {
141 agent_info: self.agent_info.clone(),
142 }
143 }
144}
145
146#[cfg_attr(feature = "tracing", instrument(skip(state)))]
148async fn handle_agent_card<A>(State(state): State<ServerState<A>>) -> impl IntoResponse
149where
150 A: AgentInfoProvider + Send + Sync + 'static,
151{
152 #[cfg(feature = "tracing")]
153 debug!("Fetching agent card");
154 match state.agent_info.get_agent_card().await {
155 Ok(card) => {
156 #[cfg(feature = "tracing")]
157 debug!("Agent card retrieved successfully");
158 (StatusCode::OK, Json(card)).into_response()
159 }
160 Err(e) => {
161 (
163 StatusCode::INTERNAL_SERVER_ERROR,
164 Json(serde_json::json!({
165 "error": e.to_string()
166 })),
167 )
168 .into_response()
169 }
170 }
171}
172
173async fn handle_skills<A>(State(state): State<ServerState<A>>) -> impl IntoResponse
175where
176 A: AgentInfoProvider + Send + Sync + 'static,
177{
178 match state.agent_info.get_skills().await {
179 Ok(skills) => (StatusCode::OK, Json(skills)).into_response(),
180 Err(e) => (
181 StatusCode::INTERNAL_SERVER_ERROR,
182 Json(serde_json::json!({
183 "error": e.to_string()
184 })),
185 )
186 .into_response(),
187 }
188}
189
190async fn handle_skill_by_id<A>(
192 State(state): State<ServerState<A>>,
193 axum::extract::Path(id): axum::extract::Path<String>,
194) -> impl IntoResponse
195where
196 A: AgentInfoProvider + Send + Sync + 'static,
197{
198 match state.agent_info.get_skill_by_id(&id).await {
199 Ok(Some(skill)) => (StatusCode::OK, Json(skill)).into_response(),
200 Ok(None) => (
201 StatusCode::NOT_FOUND,
202 Json(serde_json::json!({
203 "error": format!("Skill with ID '{}' not found", id)
204 })),
205 )
206 .into_response(),
207 Err(e) => (
208 StatusCode::INTERNAL_SERVER_ERROR,
209 Json(serde_json::json!({
210 "error": e.to_string()
211 })),
212 )
213 .into_response(),
214 }
215}