tap_http/server.rs
1//! HTTP server implementation for TAP DIDComm messages.
2//!
3//! This module provides a complete HTTP server implementation for the Transaction Authorization
4//! Protocol (TAP). The server exposes endpoints for:
5//!
6//! - Processing DIDComm messages for TAP operations
//! - Health checks for monitoring system availability
//! - Optionally serving `/.well-known/did.json` for `did:web` hosting
8//!
9//! The server is built using the Warp web framework and provides graceful shutdown capabilities.
10//!
11//! # Features
12//!
13//! - HTTP/WebSocket messaging for DIDComm transport
14//! - Message validation for TAP protocol compliance
15//! - Configurable host, port, and endpoint paths
//! - Optional TLS configuration (accepted, but not yet fully implemented)
17//! - Graceful shutdown handling
18//! - Health check monitoring endpoint
19//!
20//! # Configuration
21//!
22//! The server can be configured with the `TapHttpConfig` struct, which allows setting:
23//!
24//! - Host address and port
25//! - DIDComm endpoint path
26//! - TLS configuration (certificate and key paths)
//! - Rate limiting options (accepted, but not yet enforced)
28//! - Request timeout settings
29//!
30//! # Example
31//!
32//! ```rust,no_run
33//! use tap_http::{TapHttpConfig, TapHttpServer};
34//! use tap_node::{NodeConfig, TapNode};
35//! use std::time::Duration;
36//!
37//! #[tokio::main]
38//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
39//! // Create a TAP Node
40//! let node = TapNode::new(NodeConfig::default());
41//!
42//! // Configure the HTTP server with custom settings
43//! let config = TapHttpConfig {
44//! host: "0.0.0.0".to_string(), // Listen on all interfaces
45//! port: 8080, // Custom port
46//! didcomm_endpoint: "/api/didcomm".to_string(), // Custom endpoint path
47//! request_timeout_secs: 60, // 60-second timeout for outbound requests
48//! ..TapHttpConfig::default()
49//! };
50//!
51//! // Create and start the server
52//! let mut server = TapHttpServer::new(config, node);
53//! server.start().await?;
54//!
55//! // Wait for shutdown signal
56//! tokio::signal::ctrl_c().await?;
57//!
58//! // Gracefully stop the server
59//! server.stop().await?;
60//!
61//! Ok(())
62//! }
63//! ```
64
65use crate::config::TapHttpConfig;
66use crate::error::{Error, Result};
67use crate::event::{EventBus, EventLogger};
68use crate::handler::{handle_didcomm, handle_health_check, handle_well_known_did};
69use std::convert::Infallible;
70use std::net::SocketAddr;
71use std::sync::Arc;
72use tap_node::TapNode;
73use tokio::sync::oneshot;
74use tracing::{error, info, warn};
75use warp::{Filter, Rejection, Reply};
76
// Rate limiting will be implemented in a future update.
78
79/// TAP HTTP server for handling DIDComm messages.
80///
81/// This server implementation provides endpoints for:
82/// - `/didcomm` - For processing DIDComm messages via the TAP protocol
83/// - `/health` - For checking the server's operational status
84///
85/// The server requires a configuration and a TapNode instance to function.
86/// The TapNode is responsible for the actual message processing logic.
87pub struct TapHttpServer {
88 /// Server configuration.
89 config: TapHttpConfig,
90
91 /// TAP Node for message processing.
92 node: Arc<TapNode>,
93
94 /// Shutdown channel for graceful server termination.
95 shutdown_tx: Option<oneshot::Sender<()>>,
96
97 /// Event bus for tracking server events.
98 event_bus: Arc<EventBus>,
99}
100
101impl TapHttpServer {
102 /// Creates a new HTTP server with the given configuration and TAP Node.
103 ///
104 /// # Parameters
105 /// * `config` - The server configuration including host, port, and endpoint settings
106 /// * `node` - The TAP Node instance used for processing DIDComm messages
107 ///
108 /// # Returns
109 /// A new TapHttpServer instance that can be started with the `start` method
110 pub fn new(config: TapHttpConfig, node: TapNode) -> Self {
111 // Log if rate limiting is configured but not implemented yet
112 if config.rate_limit.is_some() {
113 warn!("Rate limiting is configured but not yet implemented");
114 }
115
116 // Log if TLS is configured but not implemented yet
117 if config.tls.is_some() {
118 warn!("TLS is configured but not yet fully implemented");
119 }
120
121 // Create the event bus
122 let event_bus = Arc::new(EventBus::new());
123
124 // Initialize event logger if configured
125 if let Some(logger_config) = &config.event_logger {
126 let event_logger = EventLogger::new(logger_config.clone());
127 event_bus.subscribe(event_logger);
128 info!("Event logging enabled");
129 }
130
131 Self {
132 config,
133 node: Arc::new(node),
134 shutdown_tx: None,
135 event_bus,
136 }
137 }
138
139 /// Starts the HTTP server.
140 ///
141 /// This method:
142 /// 1. Configures the server routes based on the provided configuration
143 /// 2. Sets up a graceful shutdown channel
144 /// 3. Starts the server in a separate Tokio task
145 ///
146 /// The server runs until the `stop` method is called.
147 ///
148 /// # Returns
149 /// * `Ok(())` - If the server started successfully
150 /// * `Err(Error)` - If there was an error starting the server
151 pub async fn start(&mut self) -> Result<()> {
152 let addr: SocketAddr = self
153 .config
154 .server_addr()
155 .parse()
156 .map_err(|e| Error::Http(format!("Invalid address: {}", e)))?;
157
158 // Clone Arc<TapNode> for use in route handlers
159 let node = self.node.clone();
160
161 // Clone the event bus for use in route handlers
162 let event_bus = self.event_bus.clone();
163
164 // Get the endpoint path from config
165 let endpoint_path = self
166 .config
167 .didcomm_endpoint
168 .trim_start_matches('/')
169 .to_string();
170
171 // Create DIDComm endpoint (1MB body size limit)
172 let didcomm_route = warp::path(endpoint_path)
173 .and(warp::post())
174 .and(warp::header::optional::<String>("content-type"))
175 .and(warp::body::content_length_limit(1024 * 1024))
176 .and(warp::body::bytes())
177 .and(with_node(node.clone()))
178 .and(with_event_bus(event_bus.clone()))
179 .and_then(handle_didcomm);
180
181 // Health check endpoint
182 let health_route = warp::path("health")
183 .and(warp::get())
184 .and(with_event_bus(event_bus.clone()))
185 .and_then(handle_health_check);
186
187 // Optionally add /.well-known/did.json for did:web hosting
188 let enable_web_did = self.config.enable_web_did;
189
190 // Build routes with or without the well-known endpoint
191 if enable_web_did {
192 info!("Web DID hosting enabled at /.well-known/did.json");
193
194 let max_agents = self.config.max_agents;
195 let well_known_route = warp::path(".well-known")
196 .and(warp::path("did.json"))
197 .and(warp::path::end())
198 .and(warp::get())
199 .and(warp::header::optional::<String>("host"))
200 .and(with_node(node.clone()))
201 .and(with_event_bus(event_bus.clone()))
202 .and(warp::any().map(move || max_agents))
203 .and_then(handle_well_known_did);
204
205 let routes = didcomm_route
206 .or(health_route)
207 .or(well_known_route)
208 .with(warp::log("tap_http"))
209 .with(warp::reply::with::header(
210 "X-Content-Type-Options",
211 "nosniff",
212 ))
213 .with(warp::reply::with::header("X-Frame-Options", "DENY"))
214 .with(warp::reply::with::header("Cache-Control", "no-store"))
215 .with(warp::reply::with::header(
216 "Content-Security-Policy",
217 "default-src 'none'",
218 ))
219 .recover(handle_rejection);
220
221 return self.spawn_server(routes, addr, event_bus).await;
222 }
223
224 // Combine routes without well-known endpoint
225 let routes = didcomm_route
226 .or(health_route)
227 .with(warp::log("tap_http"))
228 .with(warp::reply::with::header(
229 "X-Content-Type-Options",
230 "nosniff",
231 ))
232 .with(warp::reply::with::header("X-Frame-Options", "DENY"))
233 .with(warp::reply::with::header("Cache-Control", "no-store"))
234 .with(warp::reply::with::header(
235 "Content-Security-Policy",
236 "default-src 'none'",
237 ))
238 .recover(handle_rejection);
239
240 self.spawn_server(routes, addr, event_bus).await
241 }
242
243 /// Stops the HTTP server.
244 ///
245 /// This method sends a shutdown signal to the server, allowing it to terminate gracefully.
246 ///
247 /// # Returns
248 /// * `Ok(())` - If the server was stopped successfully
249 /// * `Err(Error)` - If there was an error stopping the server
250 pub async fn stop(&mut self) -> Result<()> {
251 if let Some(tx) = self.shutdown_tx.take() {
252 let _ = tx.send(());
253 info!("Sent shutdown signal to TAP HTTP server");
254 } else {
255 warn!("TAP HTTP server is not running");
256 }
257 Ok(())
258 }
259
260 /// Returns a reference to the underlying TAP Node.
261 ///
262 /// The TAP Node is responsible for processing DIDComm messages.
263 pub fn node(&self) -> &Arc<TapNode> {
264 &self.node
265 }
266
267 /// Returns a reference to the server configuration.
268 ///
269 /// The server configuration includes settings for the host, port, and endpoint.
270 pub fn config(&self) -> &TapHttpConfig {
271 &self.config
272 }
273
274 /// Returns a reference to the event bus.
275 ///
276 /// The event bus is used to publish and subscribe to server events.
277 pub fn event_bus(&self) -> &Arc<EventBus> {
278 &self.event_bus
279 }
280
281 /// Bind and spawn the warp server with the given routes.
282 async fn spawn_server<F>(
283 &mut self,
284 routes: F,
285 addr: SocketAddr,
286 event_bus: Arc<EventBus>,
287 ) -> Result<()>
288 where
289 F: Filter<Error = Infallible> + Clone + Send + Sync + 'static,
290 F::Extract: Reply,
291 {
292 let (tx, rx) = oneshot::channel::<()>();
293 self.shutdown_tx = Some(tx);
294
295 info!("Starting TAP HTTP server on {}", addr);
296
297 self.event_bus
298 .publish_server_started(addr.to_string())
299 .await;
300
301 let event_bus_clone = event_bus.clone();
302 let (_, server) = warp::serve(routes).bind_with_graceful_shutdown(addr, async move {
303 rx.await.ok();
304 info!("Shutting down TAP HTTP server");
305 event_bus_clone.publish_server_stopped().await;
306 });
307
308 tokio::spawn(server);
309
310 info!("TAP HTTP server started on {}", addr);
311 Ok(())
312 }
313
314 // Rate limiting functionality will be implemented in a future update
315}
316
317/// Helper function to provide the TAP Node to route handlers.
318fn with_node(
319 node: Arc<TapNode>,
320) -> impl Filter<Extract = (Arc<TapNode>,), Error = Infallible> + Clone {
321 warp::any().map(move || node.clone())
322}
323
324/// Helper function to provide the event bus to route handlers.
325fn with_event_bus(
326 event_bus: Arc<EventBus>,
327) -> impl Filter<Extract = (Arc<EventBus>,), Error = Infallible> + Clone {
328 warp::any().map(move || event_bus.clone())
329}
330
331/// Custom rejection for rate limited requests
332#[derive(Debug)]
333struct RateLimitedError;
334impl warp::reject::Reject for RateLimitedError {}
335
336/// Handler for rejections.
337async fn handle_rejection(err: Rejection) -> std::result::Result<impl Reply, Infallible> {
338 use crate::error::Error;
339
340 let error_response = if err.is_not_found() {
341 // Not found errors
342 let err = Error::Http("Resource not found".to_string());
343 err.to_response()
344 } else if err.find::<warp::reject::PayloadTooLarge>().is_some() {
345 // Payload too large
346 let err = Error::Http("Payload too large".to_string());
347 err.to_response()
348 } else if err.find::<warp::reject::UnsupportedMediaType>().is_some() {
349 // Unsupported media type
350 let err = Error::Http("Unsupported media type".to_string());
351 err.to_response()
352 } else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
353 // Method not allowed
354 let err = Error::Http("Method not allowed".to_string());
355 err.to_response()
356 } else if err.find::<RateLimitedError>().is_some() {
357 // Rate limiting
358 let err = Error::RateLimit("Too many requests, please try again later".to_string());
359 err.to_response()
360 } else {
361 // Unhandled error
362 error!("Unhandled rejection: {:?}", err);
363 let err = Error::Unknown("Internal server error".to_string());
364 err.to_response()
365 };
366
367 Ok(error_response)
368}