Skip to main content

model_context_protocol/server/
http.rs

1//! HTTP transport for MCP Server.
2//!
3//! This module provides `McpHttpServer` which wraps the core `McpServer`
4//! and handles HTTP/SSE I/O using actix-web.
5//!
6//! # Example
7//!
8//! ```ignore
9//! use mcp::server::{McpServerConfig, http::McpHttpServer};
10//!
11//! let config = McpServerConfig::builder()
12//!     .name("my-server")
13//!     .version("1.0.0")
14//!     .with_tool(MyTool)
15//!     .build();
16//!
17//! McpHttpServer::run(config, "127.0.0.1", 8080).await?;
18//! ```
19
20use std::sync::Arc;
21
22use actix_web::{web, App, HttpResponse, HttpServer};
23use tokio::sync::mpsc;
24
25use super::{McpServer, McpServerConfig, ServerError};
26use crate::protocol::{ClientInbound, JsonRpcId, JsonRpcMessage, JsonRpcResponse, ServerOutbound};
27
28/// Application state shared across HTTP handlers.
29struct AppState {
30    inbound_tx: mpsc::Sender<ClientInbound>,
31    server: Arc<McpServer>,
32}
33
34/// MCP Server with HTTP transport.
35///
36/// This server exposes HTTP endpoints for JSON-RPC communication and
37/// optionally SSE for server-to-client streaming. It wraps the core
38/// `McpServer` and bridges HTTP I/O to the internal channel-based
39/// communication.
40pub struct McpHttpServer;
41
42impl McpHttpServer {
43    /// Runs an MCP server with HTTP transport.
44    ///
45    /// This starts an HTTP server on the specified host and port.
46    /// The function blocks until the server stops.
47    ///
48    /// # Endpoints
49    ///
50    /// - `POST /rpc` - JSON-RPC endpoint for all MCP methods
51    /// - `GET /tools` - List available tools
52    /// - `POST /call` - Direct tool call endpoint
53    /// - `GET /sse` - Server-Sent Events for server-to-client notifications
54    ///
55    /// # Example
56    ///
57    /// ```ignore
58    /// let config = McpServerConfig::builder()
59    ///     .name("my-server")
60    ///     .version("1.0.0")
61    ///     .with_tool(MyTool)
62    ///     .build();
63    ///
64    /// McpHttpServer::run(config, "127.0.0.1", 8080).await?;
65    /// ```
66    pub async fn run(config: McpServerConfig, host: &str, port: u16) -> Result<(), ServerError> {
67        let (server, mut channels) = McpServer::new(config);
68        let inbound_tx = channels.inbound_tx.clone();
69
70        // Spawn a task to handle outbound messages (for SSE or logging)
71        let _outbound_handle = tokio::spawn(async move {
72            while let Some(outbound) = channels.outbound_rx.recv().await {
73                // For now, just log outbound notifications
74                // SSE streaming could be implemented here
75                match &outbound {
76                    ServerOutbound::Notification(n) => {
77                        eprintln!("[MCP] Notification: {}", n.method);
78                    }
79                    ServerOutbound::Request(r) => {
80                        eprintln!("[MCP] Server request: {}", r.method);
81                    }
82                    _ => {}
83                }
84            }
85        });
86
87        let state = web::Data::new(AppState {
88            inbound_tx,
89            server: Arc::clone(&server),
90        });
91
92        HttpServer::new(move || {
93            let state = state.clone();
94
95            App::new()
96                .app_data(state)
97                .route("/rpc", web::post().to(handle_rpc))
98                .route("/tools", web::get().to(handle_tools_list))
99                .route("/call", web::post().to(handle_tool_call))
100                .route("/health", web::get().to(handle_health))
101        })
102        .bind((host, port))
103        .map_err(|e| ServerError::Io(std::io::Error::new(std::io::ErrorKind::AddrInUse, e)))?
104        .run()
105        .await
106        .map_err(|e| ServerError::Io(std::io::Error::other(e)))
107    }
108}
109
110/// Handles JSON-RPC requests.
111async fn handle_rpc(state: web::Data<AppState>, body: String) -> HttpResponse {
112    let message = match JsonRpcMessage::parse(&body) {
113        Ok(m) => m,
114        Err(e) => {
115            let error_response = JsonRpcResponse::error(
116                JsonRpcId::Null,
117                -32700,
118                format!("Parse error: {}", e),
119                None,
120            );
121            return HttpResponse::Ok().json(error_response);
122        }
123    };
124
125    // For HTTP, we need to handle request/response synchronously
126    // since HTTP doesn't maintain a persistent connection
127    match message {
128        JsonRpcMessage::Request(request) => {
129            // Call the server directly for HTTP since it's stateless
130            let response = handle_request_directly(&state.server, request).await;
131            HttpResponse::Ok().json(response)
132        }
133        JsonRpcMessage::Notification(notification) => {
134            // Handle notification
135            let inbound = ClientInbound::Notification(notification);
136            let _ = state.inbound_tx.send(inbound).await;
137            HttpResponse::NoContent().finish()
138        }
139        JsonRpcMessage::Response(_) => HttpResponse::BadRequest().json(serde_json::json!({
140            "error": "Unexpected response message"
141        })),
142    }
143}
144
145/// Handles a request directly using the server.
146async fn handle_request_directly(
147    server: &McpServer,
148    request: crate::protocol::JsonRpcRequest,
149) -> JsonRpcResponse {
150    match request.method.as_str() {
151        "initialize" => {
152            JsonRpcResponse::success(
153                request.id,
154                serde_json::json!({
155                    "protocolVersion": crate::protocol::MCP_PROTOCOL_VERSION,
156                    "serverInfo": server.server_info(),
157                    "capabilities": {} // TODO: expose capabilities
158                }),
159            )
160        }
161        "tools/list" => {
162            let tools = server.list_tools();
163            JsonRpcResponse::success(request.id, serde_json::json!({ "tools": tools }))
164        }
165        "tools/call" => {
166            let params = match request.params {
167                Some(p) => p,
168                None => {
169                    return JsonRpcResponse::error(
170                        request.id,
171                        -32602,
172                        "Missing params".to_string(),
173                        None,
174                    );
175                }
176            };
177
178            let name = match params.get("name").and_then(|n| n.as_str()) {
179                Some(n) => n,
180                None => {
181                    return JsonRpcResponse::error(
182                        request.id,
183                        -32602,
184                        "Missing tool name".to_string(),
185                        None,
186                    );
187                }
188            };
189
190            let arguments = params
191                .get("arguments")
192                .cloned()
193                .unwrap_or(serde_json::json!({}));
194
195            let result = server.call_tool(name, arguments).await;
196
197            match result {
198                Ok(content) => JsonRpcResponse::success(
199                    request.id,
200                    serde_json::json!({
201                        "content": content,
202                        "isError": false
203                    }),
204                ),
205                Err(e) => JsonRpcResponse::success(
206                    request.id,
207                    serde_json::json!({
208                        "content": [{ "type": "text", "text": e.to_string() }],
209                        "isError": true
210                    }),
211                ),
212            }
213        }
214        "ping" => JsonRpcResponse::success(request.id, serde_json::json!({})),
215        _ => JsonRpcResponse::error(
216            request.id,
217            -32601,
218            format!("Method not found: {}", request.method),
219            None,
220        ),
221    }
222}
223
224/// Handles GET /tools endpoint.
225async fn handle_tools_list(state: web::Data<AppState>) -> HttpResponse {
226    let tools = state.server.list_tools();
227    HttpResponse::Ok().json(tools)
228}
229
230/// Request body for the /call endpoint.
231#[derive(serde::Deserialize)]
232struct CallToolRequest {
233    name: String,
234    arguments: serde_json::Value,
235}
236
237/// Handles POST /call endpoint.
238async fn handle_tool_call(
239    state: web::Data<AppState>,
240    body: web::Json<CallToolRequest>,
241) -> HttpResponse {
242    let result = state
243        .server
244        .call_tool(&body.name, body.arguments.clone())
245        .await;
246
247    match result {
248        Ok(content) => HttpResponse::Ok().json(content),
249        Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({
250            "error": e.to_string()
251        })),
252    }
253}
254
255/// Handles GET /health endpoint.
256async fn handle_health(state: web::Data<AppState>) -> HttpResponse {
257    let status = state.server.status();
258    HttpResponse::Ok().json(serde_json::json!({
259        "status": format!("{:?}", status),
260        "name": state.server.name(),
261        "version": state.server.version()
262    }))
263}
264
265#[cfg(test)]
266mod tests {
267    #[test]
268    fn test_http_server_module_exists() {
269        // Basic module existence test
270        // Full integration tests would require actix-web test utilities
271    }
272}