Skip to main content

a2a_rs/adapter/transport/http/
server.rs

1//! HTTP server adapter for the A2A protocol
2
3// This module is already conditionally compiled with #[cfg(feature = "http-server")] in mod.rs
4
5use std::sync::Arc;
6
7use axum::{Json, Router, extract::State, http::StatusCode, response::IntoResponse, routing::get};
8
9#[cfg(feature = "tracing")]
10use tracing::{debug, error, info, instrument};
11
12use crate::{
13    adapter::{
14        auth::{NoopAuthenticator, with_auth},
15        error::HttpServerError,
16    },
17    domain::{
18        A2AError,
19        generated::{A2aService, A2aServiceExt},
20    },
21    port::Authenticator,
22    services::server::AgentInfoProvider,
23};
24
25/// HTTP server for the A2A protocol
26pub struct HttpServer<P, A, Auth = NoopAuthenticator>
27where
28    P: A2aService + Send + Sync + 'static,
29    A: AgentInfoProvider + Send + Sync + 'static,
30    Auth: Authenticator + Send + Sync + 'static,
31{
32    /// The `A2aService` implementation this server dispatches requests to
33    /// (e.g. [`ConnectRpcAdapter`](crate::adapter::ConnectRpcAdapter)).
34    processor: Arc<P>,
35    /// Agent info provider
36    agent_info: Arc<A>,
37    /// Server address
38    address: String,
39    /// Authenticator
40    authenticator: Option<Arc<Auth>>,
41}
42
43impl<P, A> HttpServer<P, A>
44where
45    P: A2aService + Send + Sync + 'static,
46    A: AgentInfoProvider + Send + Sync + 'static,
47{
48    /// Create a new HTTP server with the given processor and agent info provider
49    pub fn new(processor: P, agent_info: A, address: String) -> Self {
50        Self {
51            processor: Arc::new(processor),
52            agent_info: Arc::new(agent_info),
53            address,
54            authenticator: None,
55        }
56    }
57}
58
59impl<P, A, Auth> HttpServer<P, A, Auth>
60where
61    P: A2aService + Send + Sync + 'static,
62    A: AgentInfoProvider + Send + Sync + 'static,
63    Auth: Authenticator + Clone + Send + Sync + 'static,
64{
65    /// Create a new HTTP server with authentication
66    pub fn with_auth(processor: P, agent_info: A, address: String, authenticator: Auth) -> Self {
67        Self {
68            processor: Arc::new(processor),
69            agent_info: Arc::new(agent_info),
70            address,
71            authenticator: Some(Arc::new(authenticator)),
72        }
73    }
74
75    /// Start the HTTP server
76    #[cfg_attr(feature = "tracing", instrument(skip(self), fields(
77        server.address = %self.address,
78        server.has_auth = self.authenticator.is_some()
79    )))]
80    pub async fn start(&self) -> Result<(), A2AError> {
81        #[cfg(feature = "tracing")]
82        info!("Starting HTTP server");
83
84        let processor = self.processor.clone();
85        let agent_info = self.agent_info.clone();
86
87        // Register the ConnectRPC service
88        let connect_router = processor.register(connectrpc::Router::new());
89
90        let mut app = Router::new()
91            // v1.0.0 well-known URI endpoint (RFC 8615)
92            .route("/.well-known/agent-card.json", get(handle_agent_card))
93            // Backward compatibility routes
94            .route("/agent-card", get(handle_agent_card))
95            .route("/skills", get(handle_skills))
96            .route("/skills/{id}", get(handle_skill_by_id))
97            .fallback_service(connect_router.into_axum_service())
98            .with_state(ServerState {
99                agent_info: agent_info.clone(),
100            });
101
102        // Apply authentication if provided
103        if let Some(auth) = &self.authenticator {
104            // Clone the authenticator for the middleware
105            let auth_clone = auth.clone();
106
107            // Create an auth router with the authenticator
108            app = with_auth(app, (*auth_clone).clone());
109        }
110
111        let listener = tokio::net::TcpListener::bind(&self.address)
112            .await
113            .map_err(HttpServerError::Io)?;
114
115        #[cfg(feature = "tracing")]
116        info!("HTTP server listening on {}", self.address);
117
118        axum::serve(listener, app).await.map_err(|e| {
119            #[cfg(feature = "tracing")]
120            error!("Server error: {}", e);
121            HttpServerError::Server(format!("Server error: {}", e))
122        })?;
123
124        Ok(())
125    }
126}
127
128struct ServerState<A>
129where
130    A: AgentInfoProvider + Send + Sync + 'static,
131{
132    agent_info: Arc<A>,
133}
134
135impl<A> Clone for ServerState<A>
136where
137    A: AgentInfoProvider + Send + Sync + 'static,
138{
139    fn clone(&self) -> Self {
140        Self {
141            agent_info: self.agent_info.clone(),
142        }
143    }
144}
145
146/// Handle a request for the agent card
147#[cfg_attr(feature = "tracing", instrument(skip(state)))]
148async fn handle_agent_card<A>(State(state): State<ServerState<A>>) -> impl IntoResponse
149where
150    A: AgentInfoProvider + Send + Sync + 'static,
151{
152    #[cfg(feature = "tracing")]
153    debug!("Fetching agent card");
154    match state.agent_info.get_agent_card().await {
155        Ok(card) => {
156            #[cfg(feature = "tracing")]
157            debug!("Agent card retrieved successfully");
158            (StatusCode::OK, Json(card)).into_response()
159        }
160        Err(e) => {
161            // Map A2AError to HTTP response
162            (
163                StatusCode::INTERNAL_SERVER_ERROR,
164                Json(serde_json::json!({
165                    "error": e.to_string()
166                })),
167            )
168                .into_response()
169        }
170    }
171}
172
173/// Handle a request for all agent skills
174async fn handle_skills<A>(State(state): State<ServerState<A>>) -> impl IntoResponse
175where
176    A: AgentInfoProvider + Send + Sync + 'static,
177{
178    match state.agent_info.get_skills().await {
179        Ok(skills) => (StatusCode::OK, Json(skills)).into_response(),
180        Err(e) => (
181            StatusCode::INTERNAL_SERVER_ERROR,
182            Json(serde_json::json!({
183                "error": e.to_string()
184            })),
185        )
186            .into_response(),
187    }
188}
189
190/// Handle a request for a specific agent skill by ID
191async fn handle_skill_by_id<A>(
192    State(state): State<ServerState<A>>,
193    axum::extract::Path(id): axum::extract::Path<String>,
194) -> impl IntoResponse
195where
196    A: AgentInfoProvider + Send + Sync + 'static,
197{
198    match state.agent_info.get_skill_by_id(&id).await {
199        Ok(Some(skill)) => (StatusCode::OK, Json(skill)).into_response(),
200        Ok(None) => (
201            StatusCode::NOT_FOUND,
202            Json(serde_json::json!({
203                "error": format!("Skill with ID '{}' not found", id)
204            })),
205        )
206            .into_response(),
207        Err(e) => (
208            StatusCode::INTERNAL_SERVER_ERROR,
209            Json(serde_json::json!({
210                "error": e.to_string()
211            })),
212        )
213            .into_response(),
214    }
215}