Skip to main content

tap_http/
server.rs

//! HTTP server implementation for TAP DIDComm messages.
//!
//! This module provides a complete HTTP server implementation for the Transaction Authorization
//! Protocol (TAP). The server exposes endpoints for:
//!
//! - Processing DIDComm messages for TAP operations
//! - Health checks for monitoring system availability
//! - Serving `/.well-known/did.json` for `did:web` hosting (only when `enable_web_did` is set)
//!
//! The server is built using the Warp web framework and provides graceful shutdown capabilities.
10//!
//! # Features
//!
//! - HTTP/WebSocket messaging for DIDComm transport
//! - Message validation for TAP protocol compliance
//! - Configurable host, port, and endpoint paths
//! - TLS settings are accepted in the configuration, but TLS is not yet fully implemented
//!   (a warning is logged when the server is constructed)
//! - Graceful shutdown handling
//! - Health check monitoring endpoint
19//!
//! # Configuration
//!
//! The server can be configured with the `TapHttpConfig` struct, which allows setting:
//!
//! - Host address and port
//! - DIDComm endpoint path
//! - TLS configuration (certificate and key paths)
//! - Rate limiting options (accepted, but rate limiting is not yet implemented; a warning is
//!   logged when the server is constructed)
//! - Request timeout settings
29//!
30//! # Example
31//!
32//! ```rust,no_run
33//! use tap_http::{TapHttpConfig, TapHttpServer};
34//! use tap_node::{NodeConfig, TapNode};
35//! use std::time::Duration;
36//!
37//! #[tokio::main]
38//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
39//!     // Create a TAP Node
40//!     let node = TapNode::new(NodeConfig::default());
41//!     
42//!     // Configure the HTTP server with custom settings
43//!     let config = TapHttpConfig {
44//!         host: "0.0.0.0".to_string(),    // Listen on all interfaces
45//!         port: 8080,                     // Custom port
46//!         didcomm_endpoint: "/api/didcomm".to_string(),  // Custom endpoint path
47//!         request_timeout_secs: 60,       // 60-second timeout for outbound requests
48//!         ..TapHttpConfig::default()
49//!     };
50//!     
51//!     // Create and start the server
52//!     let mut server = TapHttpServer::new(config, node);
53//!     server.start().await?;
54//!     
55//!     // Wait for shutdown signal
56//!     tokio::signal::ctrl_c().await?;
57//!     
58//!     // Gracefully stop the server
59//!     server.stop().await?;
60//!     
61//!     Ok(())
62//! }
63//! ```
64
65use crate::config::TapHttpConfig;
66use crate::error::{Error, Result};
67use crate::event::{EventBus, EventLogger};
68use crate::handler::{handle_didcomm, handle_health_check, handle_well_known_did};
69use std::convert::Infallible;
70use std::net::SocketAddr;
71use std::sync::Arc;
72use tap_node::TapNode;
73use tokio::sync::oneshot;
74use tracing::{error, info, warn};
75use warp::{Filter, Rejection, Reply};
76
// Rate limiting will be implemented in a future update.
78
79/// TAP HTTP server for handling DIDComm messages.
80///
81/// This server implementation provides endpoints for:
82/// - `/didcomm` - For processing DIDComm messages via the TAP protocol
83/// - `/health` - For checking the server's operational status
84///
85/// The server requires a configuration and a TapNode instance to function.
86/// The TapNode is responsible for the actual message processing logic.
87pub struct TapHttpServer {
88    /// Server configuration.
89    config: TapHttpConfig,
90
91    /// TAP Node for message processing.
92    node: Arc<TapNode>,
93
94    /// Shutdown channel for graceful server termination.
95    shutdown_tx: Option<oneshot::Sender<()>>,
96
97    /// Event bus for tracking server events.
98    event_bus: Arc<EventBus>,
99}
100
101impl TapHttpServer {
102    /// Creates a new HTTP server with the given configuration and TAP Node.
103    ///
104    /// # Parameters
105    /// * `config` - The server configuration including host, port, and endpoint settings
106    /// * `node` - The TAP Node instance used for processing DIDComm messages
107    ///
108    /// # Returns
109    /// A new TapHttpServer instance that can be started with the `start` method
110    pub fn new(config: TapHttpConfig, node: TapNode) -> Self {
111        // Log if rate limiting is configured but not implemented yet
112        if config.rate_limit.is_some() {
113            warn!("Rate limiting is configured but not yet implemented");
114        }
115
116        // Log if TLS is configured but not implemented yet
117        if config.tls.is_some() {
118            warn!("TLS is configured but not yet fully implemented");
119        }
120
121        // Create the event bus
122        let event_bus = Arc::new(EventBus::new());
123
124        // Initialize event logger if configured
125        if let Some(logger_config) = &config.event_logger {
126            let event_logger = EventLogger::new(logger_config.clone());
127            event_bus.subscribe(event_logger);
128            info!("Event logging enabled");
129        }
130
131        Self {
132            config,
133            node: Arc::new(node),
134            shutdown_tx: None,
135            event_bus,
136        }
137    }
138
139    /// Starts the HTTP server.
140    ///
141    /// This method:
142    /// 1. Configures the server routes based on the provided configuration
143    /// 2. Sets up a graceful shutdown channel
144    /// 3. Starts the server in a separate Tokio task
145    ///
146    /// The server runs until the `stop` method is called.
147    ///
148    /// # Returns
149    /// * `Ok(())` - If the server started successfully
150    /// * `Err(Error)` - If there was an error starting the server
151    pub async fn start(&mut self) -> Result<()> {
152        let addr: SocketAddr = self
153            .config
154            .server_addr()
155            .parse()
156            .map_err(|e| Error::Http(format!("Invalid address: {}", e)))?;
157
158        // Clone Arc<TapNode> for use in route handlers
159        let node = self.node.clone();
160
161        // Clone the event bus for use in route handlers
162        let event_bus = self.event_bus.clone();
163
164        // Get the endpoint path from config
165        let endpoint_path = self
166            .config
167            .didcomm_endpoint
168            .trim_start_matches('/')
169            .to_string();
170
171        // Create DIDComm endpoint (1MB body size limit)
172        let didcomm_route = warp::path(endpoint_path)
173            .and(warp::post())
174            .and(warp::header::optional::<String>("content-type"))
175            .and(warp::body::content_length_limit(1024 * 1024))
176            .and(warp::body::bytes())
177            .and(with_node(node.clone()))
178            .and(with_event_bus(event_bus.clone()))
179            .and_then(handle_didcomm);
180
181        // Health check endpoint
182        let health_route = warp::path("health")
183            .and(warp::get())
184            .and(with_event_bus(event_bus.clone()))
185            .and_then(handle_health_check);
186
187        // Optionally add /.well-known/did.json for did:web hosting
188        let enable_web_did = self.config.enable_web_did;
189
190        // Build routes with or without the well-known endpoint
191        if enable_web_did {
192            info!("Web DID hosting enabled at /.well-known/did.json");
193
194            let max_agents = self.config.max_agents;
195            let well_known_route = warp::path(".well-known")
196                .and(warp::path("did.json"))
197                .and(warp::path::end())
198                .and(warp::get())
199                .and(warp::header::optional::<String>("host"))
200                .and(with_node(node.clone()))
201                .and(with_event_bus(event_bus.clone()))
202                .and(warp::any().map(move || max_agents))
203                .and_then(handle_well_known_did);
204
205            let routes = didcomm_route
206                .or(health_route)
207                .or(well_known_route)
208                .with(warp::log("tap_http"))
209                .with(warp::reply::with::header(
210                    "X-Content-Type-Options",
211                    "nosniff",
212                ))
213                .with(warp::reply::with::header("X-Frame-Options", "DENY"))
214                .with(warp::reply::with::header("Cache-Control", "no-store"))
215                .with(warp::reply::with::header(
216                    "Content-Security-Policy",
217                    "default-src 'none'",
218                ))
219                .recover(handle_rejection);
220
221            return self.spawn_server(routes, addr, event_bus).await;
222        }
223
224        // Combine routes without well-known endpoint
225        let routes = didcomm_route
226            .or(health_route)
227            .with(warp::log("tap_http"))
228            .with(warp::reply::with::header(
229                "X-Content-Type-Options",
230                "nosniff",
231            ))
232            .with(warp::reply::with::header("X-Frame-Options", "DENY"))
233            .with(warp::reply::with::header("Cache-Control", "no-store"))
234            .with(warp::reply::with::header(
235                "Content-Security-Policy",
236                "default-src 'none'",
237            ))
238            .recover(handle_rejection);
239
240        self.spawn_server(routes, addr, event_bus).await
241    }
242
243    /// Stops the HTTP server.
244    ///
245    /// This method sends a shutdown signal to the server, allowing it to terminate gracefully.
246    ///
247    /// # Returns
248    /// * `Ok(())` - If the server was stopped successfully
249    /// * `Err(Error)` - If there was an error stopping the server
250    pub async fn stop(&mut self) -> Result<()> {
251        if let Some(tx) = self.shutdown_tx.take() {
252            let _ = tx.send(());
253            info!("Sent shutdown signal to TAP HTTP server");
254        } else {
255            warn!("TAP HTTP server is not running");
256        }
257        Ok(())
258    }
259
260    /// Returns a reference to the underlying TAP Node.
261    ///
262    /// The TAP Node is responsible for processing DIDComm messages.
263    pub fn node(&self) -> &Arc<TapNode> {
264        &self.node
265    }
266
267    /// Returns a reference to the server configuration.
268    ///
269    /// The server configuration includes settings for the host, port, and endpoint.
270    pub fn config(&self) -> &TapHttpConfig {
271        &self.config
272    }
273
274    /// Returns a reference to the event bus.
275    ///
276    /// The event bus is used to publish and subscribe to server events.
277    pub fn event_bus(&self) -> &Arc<EventBus> {
278        &self.event_bus
279    }
280
281    /// Bind and spawn the warp server with the given routes.
282    async fn spawn_server<F>(
283        &mut self,
284        routes: F,
285        addr: SocketAddr,
286        event_bus: Arc<EventBus>,
287    ) -> Result<()>
288    where
289        F: Filter<Error = Infallible> + Clone + Send + Sync + 'static,
290        F::Extract: Reply,
291    {
292        let (tx, rx) = oneshot::channel::<()>();
293        self.shutdown_tx = Some(tx);
294
295        info!("Starting TAP HTTP server on {}", addr);
296
297        self.event_bus
298            .publish_server_started(addr.to_string())
299            .await;
300
301        let event_bus_clone = event_bus.clone();
302        let (_, server) = warp::serve(routes).bind_with_graceful_shutdown(addr, async move {
303            rx.await.ok();
304            info!("Shutting down TAP HTTP server");
305            event_bus_clone.publish_server_stopped().await;
306        });
307
308        tokio::spawn(server);
309
310        info!("TAP HTTP server started on {}", addr);
311        Ok(())
312    }
313
314    // Rate limiting functionality will be implemented in a future update
315}
316
317/// Helper function to provide the TAP Node to route handlers.
318fn with_node(
319    node: Arc<TapNode>,
320) -> impl Filter<Extract = (Arc<TapNode>,), Error = Infallible> + Clone {
321    warp::any().map(move || node.clone())
322}
323
324/// Helper function to provide the event bus to route handlers.
325fn with_event_bus(
326    event_bus: Arc<EventBus>,
327) -> impl Filter<Extract = (Arc<EventBus>,), Error = Infallible> + Clone {
328    warp::any().map(move || event_bus.clone())
329}
330
331/// Custom rejection for rate limited requests
332#[derive(Debug)]
333struct RateLimitedError;
334impl warp::reject::Reject for RateLimitedError {}
335
336/// Handler for rejections.
337async fn handle_rejection(err: Rejection) -> std::result::Result<impl Reply, Infallible> {
338    use crate::error::Error;
339
340    let error_response = if err.is_not_found() {
341        // Not found errors
342        let err = Error::Http("Resource not found".to_string());
343        err.to_response()
344    } else if err.find::<warp::reject::PayloadTooLarge>().is_some() {
345        // Payload too large
346        let err = Error::Http("Payload too large".to_string());
347        err.to_response()
348    } else if err.find::<warp::reject::UnsupportedMediaType>().is_some() {
349        // Unsupported media type
350        let err = Error::Http("Unsupported media type".to_string());
351        err.to_response()
352    } else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
353        // Method not allowed
354        let err = Error::Http("Method not allowed".to_string());
355        err.to_response()
356    } else if err.find::<RateLimitedError>().is_some() {
357        // Rate limiting
358        let err = Error::RateLimit("Too many requests, please try again later".to_string());
359        err.to_response()
360    } else {
361        // Unhandled error
362        error!("Unhandled rejection: {:?}", err);
363        let err = Error::Unknown("Internal server error".to_string());
364        err.to_response()
365    };
366
367    Ok(error_response)
368}