httpmcp_rust/
transport.rs

1use crate::context::RequestContext;
2use crate::error::{McpError, Result};
3use crate::handlers::lifecycle::{handle_initialize, handle_ping};
4use crate::jsonrpc::{JsonRpcRequest, JsonRpcResponse};
5use crate::protocol::*;
6use crate::server::HttpMcpServer;
7use actix_web::{
8    get, post,
9    web::{self, Data},
10    HttpRequest, HttpResponse, Responder,
11};
12use actix_web_lab::sse;
13use serde_json::Value;
14use std::sync::Arc;
15
16/// Configure actix-web application
17pub fn create_app(cfg: &mut web::ServiceConfig, server: Arc<HttpMcpServer>) {
18    if server.enable_cors {
19        cfg.default_service(web::to(|| async {
20            HttpResponse::Ok()
21                .insert_header(("Access-Control-Allow-Origin", "*"))
22                .insert_header((
23                    "Access-Control-Allow-Methods",
24                    "GET, POST, PUT, DELETE, PATCH, OPTIONS",
25                ))
26                .insert_header(("Access-Control-Allow-Headers", "*"))
27                .finish()
28        }));
29    }
30
31    cfg.app_data(Data::new(server.clone()))
32        .service(handle_post)
33        .service(handle_get);
34
35    // Register custom endpoints dynamically
36    for endpoint in &server.endpoints {
37        let route = endpoint.route.clone();
38        let method = endpoint.method.clone();
39        let handler = endpoint.handler.clone();
40        let server_clone = server.clone();
41
42        cfg.route(
43            &route,
44            web::method(parse_http_method(&method)).to(
45                move |req: HttpRequest, body: Option<web::Json<Value>>| {
46                    let handler = handler.clone();
47                    let server_clone = server_clone.clone();
48                    async move {
49                        let ctx = create_request_context(&req);
50
51                        // Validate OAuth if configured
52                        if let Some(oauth) = &server_clone.oauth_config {
53                            if let Err(e) = oauth.validate_token(&ctx).await {
54                                return Ok::<HttpResponse, actix_web::Error>(
55                                    HttpResponse::Unauthorized().json(serde_json::json!({
56                                        "error": e.to_string()
57                                    })),
58                                );
59                            }
60                        }
61
62                        let body_value = body.map(|json| json.into_inner());
63                        match handler(ctx, body_value).await {
64                            Ok(response) => Ok(response),
65                            Err(e) => {
66                                Ok(HttpResponse::InternalServerError().json(serde_json::json!({
67                                    "error": e.to_string()
68                                })))
69                            }
70                        }
71                    }
72                },
73            ),
74        );
75    }
76}
77
78/// Parse HTTP method string to actix-web Method
79fn parse_http_method(method: &str) -> actix_web::http::Method {
80    match method.to_uppercase().as_str() {
81        "GET" => actix_web::http::Method::GET,
82        "POST" => actix_web::http::Method::POST,
83        "PUT" => actix_web::http::Method::PUT,
84        "DELETE" => actix_web::http::Method::DELETE,
85        "PATCH" => actix_web::http::Method::PATCH,
86        "HEAD" => actix_web::http::Method::HEAD,
87        "OPTIONS" => actix_web::http::Method::OPTIONS,
88        _ => actix_web::http::Method::GET,
89    }
90}
91
92/// POST /mcp - Handle JSON-RPC requests
93#[post("/mcp")]
94async fn handle_post(
95    req: HttpRequest,
96    body: web::Json<JsonRpcRequest>,
97    server: Data<Arc<HttpMcpServer>>,
98) -> Result<impl Responder> {
99    let ctx = create_request_context(&req);
100
101    // Validate OAuth if configured
102    if let Some(oauth) = &server.oauth_config {
103        oauth.validate_token(&ctx).await?;
104    }
105
106    // Validate JSON-RPC request
107    body.validate()?;
108
109    // Check if this is a notification (no id field)
110    let is_notification = body.id.is_none();
111
112    // Check if client accepts SSE (streaming mode)
113    let accept_sse = req
114        .headers()
115        .get("accept")
116        .and_then(|v| v.to_str().ok())
117        .map(|s| s.contains("text/event-stream"))
118        .unwrap_or(false);
119
120    // Route and execute the request
121    let response = route_request(&body, &ctx, &server).await?;
122
123    // Notifications MUST NOT receive a response per JSON-RPC 2.0 spec
124    if is_notification {
125        tracing::debug!(
126            "Notification received ({}), returning 204 No Content",
127            body.method
128        );
129        let mut resp = HttpResponse::NoContent();
130        if server.enable_cors {
131            resp.insert_header(("Access-Control-Allow-Origin", "*"));
132        }
133        return Ok(resp.finish());
134    }
135
136    // For SSE mode, broadcast response and return 202 Accepted
137    if accept_sse {
138        let subscriber_count = server.response_tx.receiver_count();
139        tracing::debug!("Broadcasting response to {} subscribers", subscriber_count);
140
141        // If there are active SSE subscribers, send via broadcast
142        if subscriber_count > 0 {
143            let _ = server.response_tx.send(response);
144            let mut resp = HttpResponse::Accepted();
145            if server.enable_cors {
146                resp.insert_header(("Access-Control-Allow-Origin", "*"));
147            }
148            return Ok(resp.finish());
149        }
150
151        // If no subscribers, fallback to direct response
152        tracing::warn!("No SSE subscribers, falling back to direct HTTP response");
153    }
154
155    // For non-SSE mode or fallback, return JSON response directly
156    let mut resp = HttpResponse::Ok();
157    if server.enable_cors {
158        resp.insert_header(("Access-Control-Allow-Origin", "*"));
159    }
160    Ok(resp.json(response))
161}
162
163/// GET /mcp - SSE stream for server-to-client messages
164#[get("/mcp")]
165async fn handle_get(req: HttpRequest, server: Data<Arc<HttpMcpServer>>) -> Result<impl Responder> {
166    let ctx = create_request_context(&req);
167
168    // Validate OAuth if configured
169    if let Some(oauth) = &server.oauth_config {
170        oauth.validate_token(&ctx).await?;
171    }
172
173    // Check for Last-Event-ID header for resumption
174    let _last_event_id = req
175        .headers()
176        .get("Last-Event-ID")
177        .and_then(|v| v.to_str().ok())
178        .map(|s| s.to_string());
179
180    // Subscribe to response broadcast channel
181    let mut rx = server.response_tx.subscribe();
182
183    tracing::debug!("SSE stream connected");
184
185    // Create SSE stream from broadcast channel
186    let event_stream = async_stream::stream! {
187        while let Ok(response) = rx.recv().await {
188            if let Ok(json) = serde_json::to_string(&response) {
189                tracing::debug!("Sending response via SSE: {}", json);
190                // Send as "message" event with the JSON-RPC response
191                yield Ok::<_, actix_web::Error>(sse::Event::Data(
192                    sse::Data::new(json)
193                ));
194            }
195        }
196    };
197
198    Ok(sse::Sse::from_stream(event_stream))
199}
200
201/// Route JSON-RPC request to appropriate handler
202async fn route_request(
203    req: &JsonRpcRequest,
204    ctx: &RequestContext,
205    server: &HttpMcpServer,
206) -> Result<JsonRpcResponse> {
207    tracing::debug!("Routing request: method={}", req.method);
208
209    match req.method.as_str() {
210        // Lifecycle
211        "initialize" => {
212            handle_initialize(req, server.server_info.clone(), server.capabilities.clone())
213        }
214        "ping" => handle_ping(req),
215
216        // Notifications
217        "notifications/initialized" => handle_notifications_initialized(req),
218
219        // Resources
220        "resources/list" => handle_resources_list(req, ctx, server).await,
221        "resources/read" => handle_resources_read(req, ctx, server).await,
222        "resources/templates/list" => handle_resources_templates(req, ctx, server).await,
223        "resources/subscribe" => handle_resources_subscribe(req, ctx, server).await,
224
225        // Tools
226        "tools/list" => handle_tools_list(req, ctx, server).await,
227        "tools/call" => handle_tools_call(req, ctx, server).await,
228
229        // Prompts
230        "prompts/list" => handle_prompts_list(req, ctx, server).await,
231        "prompts/get" => handle_prompts_get(req, ctx, server).await,
232
233        // Logging
234        "logging/setLevel" => handle_logging_set_level(req),
235
236        _ => Err(McpError::MethodNotFound(req.method.clone())),
237    }
238}
239
240// ============================================================================
241// Resource Handlers
242// ============================================================================
243
244async fn handle_resources_list(
245    req: &JsonRpcRequest,
246    ctx: &RequestContext,
247    server: &HttpMcpServer,
248) -> Result<JsonRpcResponse> {
249    let params: ResourcesListParams =
250        serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
251            .unwrap_or(ResourcesListParams { cursor: None });
252
253    // Collect all resources from registered handlers
254    let mut all_resources = Vec::new();
255    for registered in server.resources.values() {
256        let (resources, _) = (registered.list_handler)(params.cursor.clone(), ctx.clone()).await?;
257        all_resources.extend(resources);
258    }
259
260    let result = ResourcesListResult {
261        resources: all_resources,
262        next_cursor: None,
263    };
264
265    Ok(JsonRpcResponse::success(
266        serde_json::to_value(result)?,
267        req.id.clone(),
268    ))
269}
270
271async fn handle_resources_read(
272    req: &JsonRpcRequest,
273    ctx: &RequestContext,
274    server: &HttpMcpServer,
275) -> Result<JsonRpcResponse> {
276    let params: ResourcesReadParams =
277        serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
278            .map_err(|e| McpError::InvalidParams(format!("Invalid params: {}", e)))?;
279
280    // Try to find matching resource handler
281    let mut contents = Vec::new();
282    for registered in server.resources.values() {
283        let result = (registered.read_handler)(params.uri.clone(), ctx.clone()).await?;
284        contents.extend(result);
285    }
286
287    if contents.is_empty() {
288        return Err(McpError::ResourceNotFound(params.uri));
289    }
290
291    let result = ResourcesReadResult { contents };
292
293    Ok(JsonRpcResponse::success(
294        serde_json::to_value(result)?,
295        req.id.clone(),
296    ))
297}
298
299async fn handle_resources_templates(
300    req: &JsonRpcRequest,
301    _ctx: &RequestContext,
302    _server: &HttpMcpServer,
303) -> Result<JsonRpcResponse> {
304    // Resource templates are not supported in the new function-based API
305    Ok(JsonRpcResponse::success(
306        serde_json::json!({ "resourceTemplates": [] }),
307        req.id.clone(),
308    ))
309}
310
311async fn handle_resources_subscribe(
312    req: &JsonRpcRequest,
313    _ctx: &RequestContext,
314    _server: &HttpMcpServer,
315) -> Result<JsonRpcResponse> {
316    // Resource subscription is not supported in the new function-based API
317    Ok(JsonRpcResponse::success(Value::Null, req.id.clone()))
318}
319
320// ============================================================================
321// Tool Handlers
322// ============================================================================
323
324async fn handle_tools_list(
325    req: &JsonRpcRequest,
326    _ctx: &RequestContext,
327    server: &HttpMcpServer,
328) -> Result<JsonRpcResponse> {
329    // Collect all registered tools
330    let tools: Vec<Tool> = server
331        .tools
332        .values()
333        .map(|registered| registered.meta.clone())
334        .collect();
335
336    let result = ToolsListResult {
337        tools,
338        next_cursor: None,
339    };
340
341    Ok(JsonRpcResponse::success(
342        serde_json::to_value(result)?,
343        req.id.clone(),
344    ))
345}
346
347async fn handle_tools_call(
348    req: &JsonRpcRequest,
349    ctx: &RequestContext,
350    server: &HttpMcpServer,
351) -> Result<JsonRpcResponse> {
352    let params: ToolsCallParams = serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
353        .map_err(|e| McpError::InvalidParams(format!("Invalid params: {}", e)))?;
354
355    // Find the registered tool
356    let registered = server
357        .tools
358        .get(&params.name)
359        .ok_or_else(|| McpError::ToolNotFound(params.name.clone()))?;
360
361    // Call the tool handler
362    let result_value =
363        (registered.handler)(params.arguments.unwrap_or_default(), ctx.clone()).await?;
364
365    // Convert result to ToolContent
366    let content = vec![ToolContent::Text {
367        text: result_value.to_string(),
368    }];
369
370    let result = ToolsCallResult {
371        content,
372        is_error: None,
373    };
374
375    Ok(JsonRpcResponse::success(
376        serde_json::to_value(result)?,
377        req.id.clone(),
378    ))
379}
380
381// ============================================================================
382// Prompt Handlers
383// ============================================================================
384
385async fn handle_prompts_list(
386    req: &JsonRpcRequest,
387    _ctx: &RequestContext,
388    server: &HttpMcpServer,
389) -> Result<JsonRpcResponse> {
390    // Collect all registered prompts
391    let prompts: Vec<Prompt> = server
392        .prompts
393        .values()
394        .map(|registered| registered.meta.clone())
395        .collect();
396
397    let result = PromptsListResult {
398        prompts,
399        next_cursor: None,
400    };
401
402    Ok(JsonRpcResponse::success(
403        serde_json::to_value(result)?,
404        req.id.clone(),
405    ))
406}
407
408async fn handle_prompts_get(
409    req: &JsonRpcRequest,
410    ctx: &RequestContext,
411    server: &HttpMcpServer,
412) -> Result<JsonRpcResponse> {
413    let params: PromptsGetParams =
414        serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
415            .map_err(|e| McpError::InvalidParams(format!("Invalid params: {}", e)))?;
416
417    // Find the registered prompt
418    let registered = server
419        .prompts
420        .get(&params.name)
421        .ok_or_else(|| McpError::PromptNotFound(params.name.clone()))?;
422
423    // Call the prompt handler
424    let (description, messages) =
425        (registered.handler)(params.name.clone(), params.arguments, ctx.clone()).await?;
426
427    let result = PromptsGetResult {
428        description,
429        messages,
430    };
431
432    Ok(JsonRpcResponse::success(
433        serde_json::to_value(result)?,
434        req.id.clone(),
435    ))
436}
437
438// ============================================================================
439// Notification Handlers
440// ============================================================================
441
442fn handle_notifications_initialized(req: &JsonRpcRequest) -> Result<JsonRpcResponse> {
443    tracing::debug!("Client initialized notification received");
444    // Return empty object instead of null
445    Ok(JsonRpcResponse::success(
446        serde_json::json!({}),
447        req.id.clone(),
448    ))
449}
450
451// ============================================================================
452// Logging Handlers
453// ============================================================================
454
455fn handle_logging_set_level(req: &JsonRpcRequest) -> Result<JsonRpcResponse> {
456    let _params: LoggingSetLevelParams =
457        serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
458            .map_err(|e| McpError::InvalidParams(format!("Invalid params: {}", e)))?;
459
460    // TODO: Implement actual log level setting
461    Ok(JsonRpcResponse::success(
462        serde_json::json!({}),
463        req.id.clone(),
464    ))
465}
466
467// ============================================================================
468// Utilities
469// ============================================================================
470
471fn create_request_context(req: &HttpRequest) -> RequestContext {
472    RequestContext::new(
473        req.headers().clone(),
474        req.method().to_string(),
475        req.path().to_string(),
476        req.peer_addr(),
477    )
478}