Skip to main content

mcp/
http.rs

1//! HTTP Server Adapter for MCP
2//!
3//! This module defines a pluggable interface for HTTP servers that expose MCP protocols.
4//! The adapter pattern allows different HTTP frameworks (axum, actix, warp, etc.) to be
5//! swapped without changing the MCPServerBuilder API.
6//!
7//! # Design
8//!
9//! ```text
10//! MCPServerBuilder
11//!        ↓
12//!   (configures)
13//!        ↓
14//! HttpServerAdapter (trait)
15//!        ↓ (implements)
16//!   ┌────┴────┬─────────────┐
17//!   ↓         ↓             ↓
18//! AxumAdapter ActixAdapter OtherAdapter
19//! ```
20//!
21//! This allows users to swap HTTP frameworks without changing their code.
22
23use crate::builder_utils::IpFilter;
24use crate::events::McpEventHandler;
25use crate::protocol::ToolProtocol;
26use std::error::Error;
27use std::net::SocketAddr;
28use std::sync::Arc;
29
30#[cfg(feature = "server")]
31use axum::Router;
32
33/// Standard error body returned when a request is rejected because of a missing
34/// or invalid bearer token.
35///
36/// Returned in two shapes:
37/// - `Legacy` — flat `{ "error": "Unauthorized", "message": "..." }` for the
38///   `/tools/list`, `/tools/execute`, `/resources/list`, `/resources/read`
39///   legacy endpoints so they remain a superset of the previous wire format.
40/// - `Rpc` — the JSON-RPC 2.0 error object used by streamable-HTTP transports.
41pub const BEARER_TOKEN_REQUIRED_MESSAGE: &str =
42    "This MCP endpoint requires a Bearer token in the `Authorization: Bearer <token>` header. \
43     Either supply a valid token, or set the server's `MENTISDB_BEARER_TOKEN_ACCESS=false` to \
44     disable bearer-token enforcement for the daemon.";
45
46/// Build a flat JSON body returned for bearer-token rejections on legacy
47/// (`/tools/list`, `/tools/execute`, `/resources/list`, `/resources/read`)
48/// endpoints.
49#[cfg(feature = "server")]
50fn bearer_token_required_body() -> serde_json::Value {
51    serde_json::json!({
52        "error": "Unauthorized",
53        "message": BEARER_TOKEN_REQUIRED_MESSAGE,
54        "error_description": BEARER_TOKEN_REQUIRED_MESSAGE,
55        "hint": "Send `Authorization: Bearer <token>` with a token issued by `mentisdb bearertoken create --alias <name>`.",
56    })
57}
58
59/// Configuration for an HTTP MCP server
60pub struct HttpServerConfig {
61    /// Socket address to bind to (e.g., "127.0.0.1:8080")
62    pub addr: SocketAddr,
63    /// Optional bearer token for authentication
64    pub bearer_token: Option<String>,
65    /// Optional dynamic bearer-token authorizer.
66    ///
67    /// When present, this authorizer is consulted after the static
68    /// [`bearer_token`](Self::bearer_token) check. A request is accepted when
69    /// either configured mechanism accepts the supplied bearer token.
70    pub bearer_authorizer: Option<Arc<dyn BearerTokenAuthorizer>>,
71    /// IP filter controlling which client addresses are allowed
72    pub ip_filter: IpFilter,
73    /// Optional event handler for MCP server lifecycle and request events
74    pub event_handler: Option<Arc<dyn McpEventHandler>>,
75}
76
77impl Clone for HttpServerConfig {
78    fn clone(&self) -> Self {
79        Self {
80            addr: self.addr,
81            bearer_token: self.bearer_token.clone(),
82            bearer_authorizer: self.bearer_authorizer.clone(),
83            ip_filter: self.ip_filter.clone(),
84            event_handler: self.event_handler.clone(),
85        }
86    }
87}
88
89impl std::fmt::Debug for HttpServerConfig {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        f.debug_struct("HttpServerConfig")
92            .field("addr", &self.addr)
93            .field("has_bearer_token", &self.bearer_token.is_some())
94            .field("has_bearer_authorizer", &self.bearer_authorizer.is_some())
95            .field("ip_filter", &self.ip_filter)
96            .field("has_event_handler", &self.event_handler.is_some())
97            .finish()
98    }
99}
100
101/// Per-request context passed to a dynamic bearer-token authorizer.
102///
103/// The MCP runtime builds this value before dispatching a request to the
104/// protocol implementation. Servers can inspect it to make authorization
105/// decisions that depend on more than the raw token string.
106///
107/// # Payload shape
108///
109/// `payload` is deliberately transport-shaped:
110///
111/// - streamable HTTP JSON-RPC requests receive the `params` object, such as
112///   `{ "name": "tool_name", "arguments": { ... } }` for `tools/call`.
113/// - legacy `/tools/execute` requests receive the full request body, usually
114///   `{ "tool": "tool_name", "parameters": { ... } }`.
115/// - metadata-style routes such as `/tools/list` and `/resources/list` receive
116///   `None` when there is no useful body to authorize.
117///
118/// Server crates keep ownership of policy. For example, a memory server can
119/// inspect tool arguments inside `payload` and allow a token for one chain while
120/// denying the same token for another.
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct BearerAuthContext {
123    /// Client socket address reported by the HTTP framework.
124    pub client_addr: SocketAddr,
125    /// HTTP route that received the request.
126    pub route: String,
127    /// MCP method or legacy action being authorized.
128    pub action: String,
129    /// Parsed request payload or JSON-RPC params, when available.
130    ///
131    /// This value is cloned from the already-parsed request body so authorizers
132    /// do not need to parse JSON a second time.
133    pub payload: Option<serde_json::Value>,
134}
135
136/// Dynamic bearer-token authorization hook for MCP HTTP transports.
137///
138/// Implement this trait when a server needs revocable tokens, scoped access,
139/// or token lookup from durable storage instead of a single static configured
140/// secret.
141///
142/// # Examples
143///
144/// ```
145/// use mcp::{BearerAuthContext, BearerTokenAuthorizer};
146///
147/// struct ToolListOnly;
148///
149/// impl BearerTokenAuthorizer for ToolListOnly {
150///     fn authorize_bearer_token(&self, token: &str, context: &BearerAuthContext) -> bool {
151///         token == "good-token" && context.action == "tools/list"
152///     }
153/// }
154/// ```
155pub trait BearerTokenAuthorizer: Send + Sync {
156    /// Return `true` when a request without a bearer token should be allowed.
157    ///
158    /// The default is fail-closed. Override this only when the embedding server
159    /// has an explicit runtime mode where unauthenticated requests are expected.
160    fn allow_missing_bearer_token(&self, _context: &BearerAuthContext) -> bool {
161        false
162    }
163
164    /// Return `true` when `token` is allowed for `context`.
165    ///
166    /// Implementations should avoid logging raw token values. If comparing
167    /// against stored secrets, prefer constant-time hash comparison in the
168    /// server crate.
169    fn authorize_bearer_token(&self, token: &str, context: &BearerAuthContext) -> bool;
170}
171
172/// A running HTTP server instance
173pub struct HttpServerInstance {
174    /// Socket address the server is listening on
175    pub addr: SocketAddr,
176    /// Handle for shutting down the server
177    /// Type erased to allow different framework implementations
178    shutdown_handle: Box<dyn std::any::Any + Send + Sync>,
179}
180
181impl HttpServerInstance {
182    /// Create a new server instance with the given address and shutdown handle
183    pub fn new(addr: SocketAddr, shutdown_handle: Box<dyn std::any::Any + Send + Sync>) -> Self {
184        Self {
185            addr,
186            shutdown_handle,
187        }
188    }
189
190    /// Get the server's socket address
191    pub fn get_addr(&self) -> SocketAddr {
192        self.addr
193    }
194
195    /// Get mutable reference to the shutdown handle for advanced usage
196    pub fn shutdown_handle_mut(&mut self) -> &mut Box<dyn std::any::Any + Send + Sync> {
197        &mut self.shutdown_handle
198    }
199}
200
201/// Trait for HTTP server implementations
202///
203/// Implementations of this trait provide HTTP endpoints for MCP protocols.
204/// Different HTTP frameworks can be swapped by implementing this trait.
205#[async_trait::async_trait]
206pub trait HttpServerAdapter: Send + Sync {
207    /// Start the HTTP server with the given configuration and tool protocol
208    ///
209    /// # Arguments
210    ///
211    /// * `config` - Server configuration (address, auth, IP filtering)
212    /// * `protocol` - The ToolProtocol implementation to expose
213    ///
214    /// # Endpoints
215    ///
216    /// The server must provide the following endpoints:
217    /// - `POST /tools/list` - List all available tools from the protocol
218    /// - `POST /tools/execute` - Execute a tool with given parameters
219    /// - `POST /resources/list` - List all available resources (if protocol supports)
220    /// - `POST /resources/read` - Read a resource by URI (if protocol supports)
221    ///
222    /// # Returns
223    ///
224    /// A running server instance, or an error if startup fails
225    async fn start(
226        &self,
227        config: HttpServerConfig,
228        protocol: Arc<dyn ToolProtocol>,
229    ) -> Result<HttpServerInstance, Box<dyn Error + Send + Sync>>;
230
231    /// Get the name of this adapter (for logging/debugging)
232    fn name(&self) -> &str {
233        "unknown"
234    }
235}
236
237/// Build an Axum router that exposes a [`ToolProtocol`] over the shared HTTP MCP surface.
238///
239/// The returned router serves:
240/// - `POST /tools/list`
241/// - `POST /tools/execute`
242/// - `POST /resources/list`
243/// - `POST /resources/read`
244///
245/// This helper is useful when a crate wants to reuse the shared MCP transport
246/// but still compose extra routes of its own, such as a `/health` endpoint.
247#[cfg(feature = "server")]
248pub fn axum_router(config: &HttpServerConfig, protocol: Arc<dyn ToolProtocol>) -> Router {
249    use crate::events::McpEvent;
250    use axum::{
251        extract::ConnectInfo, http::HeaderMap, http::StatusCode, response::IntoResponse,
252        routing::post, Json, Router,
253    };
254    use serde_json::json;
255    use sha2::{Digest, Sha256};
256    use subtle::ConstantTimeEq;
257
258    fn bearer_from_headers(headers: &HeaderMap) -> Option<&str> {
259        headers
260            .get("Authorization")
261            .and_then(|v| v.to_str().ok())
262            .and_then(|v| v.strip_prefix("Bearer "))
263    }
264
265    /// Validate the Authorization header against static or dynamic bearer auth.
266    ///
267    /// Returns `true` when neither auth mechanism is configured (open server),
268    /// when the supplied `Bearer <token>` matches the static token, or when the
269    /// dynamic authorizer accepts it. Static comparison uses
270    /// `subtle::ConstantTimeEq` on SHA-256 digests so the compiler cannot
271    /// short-circuit the comparison and leak token length via timing.
272    fn check_auth(
273        expected_token: &Option<String>,
274        authorizer: &Option<Arc<dyn BearerTokenAuthorizer>>,
275        headers: &HeaderMap,
276        context: BearerAuthContext,
277    ) -> bool {
278        if expected_token.is_none() && authorizer.is_none() {
279            return true;
280        }
281
282        let Some(provided) = bearer_from_headers(headers) else {
283            return authorizer
284                .as_ref()
285                .is_some_and(|auth| auth.allow_missing_bearer_token(&context));
286        };
287
288        if let Some(expected) = expected_token.as_deref() {
289            let expected_hash = Sha256::digest(expected.as_bytes());
290            let provided_hash = Sha256::digest(provided.as_bytes());
291            if bool::from(expected_hash.ct_eq(&provided_hash)) {
292                return true;
293            }
294        }
295
296        authorizer
297            .as_ref()
298            .is_some_and(|auth| auth.authorize_bearer_token(provided, &context))
299    }
300
301    let bearer_token = Arc::new(config.bearer_token.clone());
302    let bearer_authorizer = Arc::new(config.bearer_authorizer.clone());
303    let ip_filter = Arc::new(config.ip_filter.clone());
304
305    let token_list = bearer_token.clone();
306    let authz_list = bearer_authorizer.clone();
307    let ips_list = ip_filter.clone();
308    let token_exec = bearer_token.clone();
309    let authz_exec = bearer_authorizer.clone();
310    let ips_exec = ip_filter.clone();
311    let token_res_list = bearer_token.clone();
312    let authz_res_list = bearer_authorizer.clone();
313    let ips_res_list = ip_filter.clone();
314    let token_res_read = bearer_token.clone();
315    let authz_res_read = bearer_authorizer.clone();
316    let ips_res_read = ip_filter.clone();
317
318    let eh_list = config.event_handler.clone();
319    let eh_exec = config.event_handler.clone();
320
321    let protocol_list = protocol.clone();
322    let protocol_exec = protocol.clone();
323    let protocol_res_list = protocol.clone();
324    let protocol_res_read = protocol.clone();
325
326    Router::new()
327        .route(
328            "/tools/list",
329            post(
330                move |ConnectInfo(addr): ConnectInfo<SocketAddr>, headers: HeaderMap| {
331                    let token = token_list.clone();
332                    let authz = authz_list.clone();
333                    let allowed = ips_list.clone();
334                    let proto = protocol_list.clone();
335                    let eh = eh_list.clone();
336                    async move {
337                        if !allowed.is_allowed(addr.ip()) {
338                            if let Some(ref handler) = eh {
339                                handler
340                                    .on_mcp_event(&McpEvent::RequestRejected {
341                                        client_addr: addr.ip().to_string(),
342                                        reason: "IP not allowed".to_string(),
343                                    })
344                                    .await;
345                            }
346                            return (
347                                StatusCode::FORBIDDEN,
348                                Json(json!({"error": "Access denied"})),
349                            )
350                                .into_response();
351                        }
352
353                        if !check_auth(
354                            &token,
355                            &authz,
356                            &headers,
357                            BearerAuthContext {
358                                client_addr: addr,
359                                route: "/tools/list".to_string(),
360                                action: "tools/list".to_string(),
361                                payload: None,
362                            },
363                        ) {
364                            return (StatusCode::UNAUTHORIZED, Json(bearer_token_required_body()))
365                                .into_response();
366                        }
367
368                        if let Some(ref handler) = eh {
369                            handler
370                                .on_mcp_event(&McpEvent::ToolListRequested {
371                                    client_addr: addr.ip().to_string(),
372                                })
373                                .await;
374                        }
375
376                        match proto.list_tools().await {
377                            Ok(tools) => {
378                                let tool_count = tools.len();
379                                if let Some(ref handler) = eh {
380                                    handler
381                                        .on_mcp_event(&McpEvent::ToolListReturned {
382                                            client_addr: addr.ip().to_string(),
383                                            tool_count,
384                                        })
385                                        .await;
386                                }
387                                (StatusCode::OK, Json(json!({"tools": tools}))).into_response()
388                            }
389                            Err(e) => (
390                                StatusCode::INTERNAL_SERVER_ERROR,
391                                Json(json!({"error": e.to_string()})),
392                            )
393                                .into_response(),
394                        }
395                    }
396                },
397            ),
398        )
399        .route(
400            "/tools/execute",
401            post(
402                move |ConnectInfo(addr): ConnectInfo<SocketAddr>,
403                      headers: HeaderMap,
404                      Json(payload): Json<serde_json::Value>| {
405                    let token = token_exec.clone();
406                    let authz = authz_exec.clone();
407                    let allowed = ips_exec.clone();
408                    let proto = protocol_exec.clone();
409                    let eh = eh_exec.clone();
410                    async move {
411                        if !allowed.is_allowed(addr.ip()) {
412                            if let Some(ref handler) = eh {
413                                handler
414                                    .on_mcp_event(&McpEvent::RequestRejected {
415                                        client_addr: addr.ip().to_string(),
416                                        reason: "IP not allowed".to_string(),
417                                    })
418                                    .await;
419                            }
420                            return (
421                                StatusCode::FORBIDDEN,
422                                Json(json!({"error": "Access denied"})),
423                            )
424                                .into_response();
425                        }
426
427                        if !check_auth(
428                            &token,
429                            &authz,
430                            &headers,
431                            BearerAuthContext {
432                                client_addr: addr,
433                                route: "/tools/execute".to_string(),
434                                action: "tools/execute".to_string(),
435                                payload: Some(payload.clone()),
436                            },
437                        ) {
438                            return (StatusCode::UNAUTHORIZED, Json(bearer_token_required_body()))
439                                .into_response();
440                        }
441
442                        let tool_name = payload["tool"].as_str().unwrap_or("").to_string();
443                        let params = payload["parameters"].clone();
444
445                        if let Some(ref handler) = eh {
446                            handler
447                                .on_mcp_event(&McpEvent::ToolCallReceived {
448                                    client_addr: addr.ip().to_string(),
449                                    tool_name: tool_name.clone(),
450                                    parameters: params.clone(),
451                                })
452                                .await;
453                        }
454
455                        let exec_start = std::time::Instant::now();
456                        match proto.execute(&tool_name, params).await {
457                            Ok(result) => {
458                                let duration_ms = exec_start.elapsed().as_millis() as u64;
459                                let success = result.success;
460                                let error = result.error.clone();
461                                if let Some(ref handler) = eh {
462                                    handler
463                                        .on_mcp_event(&McpEvent::ToolCallCompleted {
464                                            client_addr: addr.ip().to_string(),
465                                            tool_name: tool_name.clone(),
466                                            success,
467                                            error,
468                                            duration_ms,
469                                        })
470                                        .await;
471                                }
472                                (StatusCode::OK, Json(json!({"result": result}))).into_response()
473                            }
474                            Err(e) => {
475                                let duration_ms = exec_start.elapsed().as_millis() as u64;
476                                let err_msg = e.to_string();
477                                if let Some(ref handler) = eh {
478                                    handler
479                                        .on_mcp_event(&McpEvent::ToolError {
480                                            source: addr.ip().to_string(),
481                                            tool_name: tool_name.clone(),
482                                            error: err_msg.clone(),
483                                            duration_ms,
484                                        })
485                                        .await;
486                                }
487                                (StatusCode::BAD_REQUEST, Json(json!({"error": err_msg})))
488                                    .into_response()
489                            }
490                        }
491                    }
492                },
493            ),
494        )
495        .route(
496            "/resources/list",
497            post(
498                move |ConnectInfo(addr): ConnectInfo<SocketAddr>, headers: HeaderMap| {
499                    let token = token_res_list.clone();
500                    let authz = authz_res_list.clone();
501                    let allowed = ips_res_list.clone();
502                    let proto = protocol_res_list.clone();
503                    async move {
504                        if !allowed.is_allowed(addr.ip()) {
505                            return (
506                                StatusCode::FORBIDDEN,
507                                Json(json!({"error": "Access denied"})),
508                            )
509                                .into_response();
510                        }
511
512                        if !check_auth(
513                            &token,
514                            &authz,
515                            &headers,
516                            BearerAuthContext {
517                                client_addr: addr,
518                                route: "/resources/list".to_string(),
519                                action: "resources/list".to_string(),
520                                payload: None,
521                            },
522                        ) {
523                            return (StatusCode::UNAUTHORIZED, Json(bearer_token_required_body()))
524                                .into_response();
525                        }
526
527                        if !proto.supports_resources() {
528                            return (
529                                StatusCode::NOT_IMPLEMENTED,
530                                Json(json!({"error": "Resources not supported"})),
531                            )
532                                .into_response();
533                        }
534
535                        match proto.list_resources().await {
536                            Ok(resources) => {
537                                (StatusCode::OK, Json(json!({"resources": resources})))
538                                    .into_response()
539                            }
540                            Err(e) => (
541                                StatusCode::INTERNAL_SERVER_ERROR,
542                                Json(json!({"error": e.to_string()})),
543                            )
544                                .into_response(),
545                        }
546                    }
547                },
548            ),
549        )
550        .route(
551            "/resources/read",
552            post(
553                move |ConnectInfo(addr): ConnectInfo<SocketAddr>,
554                      headers: HeaderMap,
555                      Json(payload): Json<serde_json::Value>| {
556                    let token = token_res_read.clone();
557                    let authz = authz_res_read.clone();
558                    let allowed = ips_res_read.clone();
559                    let proto = protocol_res_read.clone();
560                    async move {
561                        if !allowed.is_allowed(addr.ip()) {
562                            return (
563                                StatusCode::FORBIDDEN,
564                                Json(json!({"error": "Access denied"})),
565                            )
566                                .into_response();
567                        }
568
569                        if !check_auth(
570                            &token,
571                            &authz,
572                            &headers,
573                            BearerAuthContext {
574                                client_addr: addr,
575                                route: "/resources/read".to_string(),
576                                action: "resources/read".to_string(),
577                                payload: Some(payload.clone()),
578                            },
579                        ) {
580                            return (StatusCode::UNAUTHORIZED, Json(bearer_token_required_body()))
581                                .into_response();
582                        }
583
584                        if !proto.supports_resources() {
585                            return (
586                                StatusCode::NOT_IMPLEMENTED,
587                                Json(json!({"error": "Resources not supported"})),
588                            )
589                                .into_response();
590                        }
591
592                        let uri = payload["uri"].as_str().unwrap_or("");
593
594                        match proto.read_resource(uri).await {
595                            Ok(content) => (
596                                StatusCode::OK,
597                                Json(json!({"uri": uri, "content": content})),
598                            )
599                                .into_response(),
600                            Err(e) => {
601                                (StatusCode::NOT_FOUND, Json(json!({"error": e.to_string()})))
602                                    .into_response()
603                            }
604                        }
605                    }
606                },
607            ),
608        )
609}
610
611/// Default Axum-based HTTP server adapter
612///
613/// Provides a full MCP-compatible HTTP server using the Axum framework.
614/// Only available when the `server` feature is enabled.
615#[cfg(feature = "server")]
616pub struct AxumHttpAdapter;
617
618#[cfg(feature = "server")]
619#[async_trait::async_trait]
620impl HttpServerAdapter for AxumHttpAdapter {
621    async fn start(
622        &self,
623        config: HttpServerConfig,
624        protocol: Arc<dyn ToolProtocol>,
625    ) -> Result<HttpServerInstance, Box<dyn Error + Send + Sync>> {
626        use crate::events::McpEvent;
627        use tokio::net::TcpListener;
628        let app =
629            axum_router(&config, protocol).into_make_service_with_connect_info::<SocketAddr>();
630
631        // Bind and start server
632        let listener = TcpListener::bind(config.addr).await?;
633        let addr = listener.local_addr()?;
634
635        // Fire ServerStarted event
636        if let Some(ref handler) = config.event_handler {
637            handler
638                .on_mcp_event(&McpEvent::ServerStarted {
639                    addr: addr.to_string(),
640                })
641                .await;
642        }
643
644        let server_handle = tokio::spawn(async move { axum::serve(listener, app).await });
645
646        Ok(HttpServerInstance::new(addr, Box::new(server_handle)))
647    }
648
649    fn name(&self) -> &str {
650        "axum"
651    }
652}