httpmcp_rust/
transport.rs

1use crate::context::RequestContext;
2use crate::error::{McpError, Result};
3use crate::handlers::lifecycle::{handle_initialize, handle_ping};
4use crate::jsonrpc::{JsonRpcRequest, JsonRpcResponse};
5use crate::protocol::*;
6use crate::server::HttpMcpServer;
7use actix_web::{
8    get, post,
9    web::{self, Data},
10    HttpRequest, HttpResponse, Responder,
11};
12use actix_web_lab::sse;
13use futures::stream;
14use serde_json::Value;
15use std::sync::Arc;
16
17/// Configure actix-web application
18pub fn create_app(cfg: &mut web::ServiceConfig, server: Arc<HttpMcpServer>) {
19    if server.enable_cors {
20        cfg.default_service(web::to(|| async {
21            HttpResponse::Ok()
22                .insert_header(("Access-Control-Allow-Origin", "*"))
23                .insert_header(("Access-Control-Allow-Methods", "GET, POST, OPTIONS"))
24                .insert_header((
25                    "Access-Control-Allow-Headers",
26                    "Content-Type, Authorization, Accept, Last-Event-ID",
27                ))
28                .finish()
29        }));
30    }
31
32    cfg.app_data(Data::new(server.clone()))
33        .service(handle_post)
34        .service(handle_get);
35}
36
37/// POST /mcp - Handle JSON-RPC requests
38#[post("/mcp")]
39async fn handle_post(
40    req: HttpRequest,
41    body: web::Json<JsonRpcRequest>,
42    server: Data<Arc<HttpMcpServer>>,
43) -> Result<impl Responder> {
44    let ctx = create_request_context(&req);
45
46    // Validate OAuth if configured
47    if let Some(oauth) = &server.oauth_config {
48        oauth.validate_token(&ctx).await?;
49    }
50
51    // Validate JSON-RPC request
52    body.validate()?;
53
54    let response = route_request(&body, &ctx, &server).await?;
55
56    Ok(HttpResponse::Ok().json(response))
57}
58
59/// GET /mcp - SSE stream for server-to-client messages
60#[get("/mcp")]
61async fn handle_get(req: HttpRequest, server: Data<Arc<HttpMcpServer>>) -> Result<impl Responder> {
62    let ctx = create_request_context(&req);
63
64    // Validate OAuth if configured
65    if let Some(oauth) = &server.oauth_config {
66        oauth.validate_token(&ctx).await?;
67    }
68
69    // Check for Last-Event-ID header for resumption
70    let _last_event_id = req
71        .headers()
72        .get("Last-Event-ID")
73        .and_then(|v| v.to_str().ok())
74        .map(|s| s.to_string());
75
76    // Create SSE stream
77    let event_stream = stream::iter(vec![Ok::<_, actix_web::Error>(sse::Event::Data(
78        sse::Data::new("SSE stream connected").event("message"),
79    ))]);
80
81    Ok(sse::Sse::from_stream(event_stream))
82}
83
84/// Route JSON-RPC request to appropriate handler
85async fn route_request(
86    req: &JsonRpcRequest,
87    ctx: &RequestContext,
88    server: &HttpMcpServer,
89) -> Result<JsonRpcResponse> {
90    tracing::debug!("Routing request: method={}", req.method);
91
92    match req.method.as_str() {
93        // Lifecycle
94        "initialize" => {
95            handle_initialize(req, server.server_info.clone(), server.capabilities.clone())
96        }
97        "ping" => handle_ping(req),
98
99        // Resources
100        "resources/list" => handle_resources_list(req, ctx, server).await,
101        "resources/read" => handle_resources_read(req, ctx, server).await,
102        "resources/templates/list" => handle_resources_templates(req, ctx, server).await,
103        "resources/subscribe" => handle_resources_subscribe(req, ctx, server).await,
104
105        // Tools
106        "tools/list" => handle_tools_list(req, ctx, server).await,
107        "tools/call" => handle_tools_call(req, ctx, server).await,
108
109        // Prompts
110        "prompts/list" => handle_prompts_list(req, ctx, server).await,
111        "prompts/get" => handle_prompts_get(req, ctx, server).await,
112
113        // Logging
114        "logging/setLevel" => handle_logging_set_level(req),
115
116        _ => Err(McpError::MethodNotFound(req.method.clone())),
117    }
118}
119
120// ============================================================================
121// Resource Handlers
122// ============================================================================
123
124async fn handle_resources_list(
125    req: &JsonRpcRequest,
126    ctx: &RequestContext,
127    server: &HttpMcpServer,
128) -> Result<JsonRpcResponse> {
129    let params: ResourcesListParams =
130        serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
131            .unwrap_or(ResourcesListParams { cursor: None });
132
133    // Collect all resources from registered handlers
134    let mut all_resources = Vec::new();
135    for registered in server.resources.values() {
136        let (resources, _) = (registered.list_handler)(params.cursor.clone(), ctx.clone()).await?;
137        all_resources.extend(resources);
138    }
139
140    let result = ResourcesListResult {
141        resources: all_resources,
142        next_cursor: None,
143    };
144
145    Ok(JsonRpcResponse::success(
146        serde_json::to_value(result)?,
147        req.id.clone(),
148    ))
149}
150
151async fn handle_resources_read(
152    req: &JsonRpcRequest,
153    ctx: &RequestContext,
154    server: &HttpMcpServer,
155) -> Result<JsonRpcResponse> {
156    let params: ResourcesReadParams =
157        serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
158            .map_err(|e| McpError::InvalidParams(format!("Invalid params: {}", e)))?;
159
160    // Try to find matching resource handler
161    let mut contents = Vec::new();
162    for registered in server.resources.values() {
163        let result = (registered.read_handler)(params.uri.clone(), ctx.clone()).await?;
164        contents.extend(result);
165    }
166
167    if contents.is_empty() {
168        return Err(McpError::ResourceNotFound(params.uri));
169    }
170
171    let result = ResourcesReadResult { contents };
172
173    Ok(JsonRpcResponse::success(
174        serde_json::to_value(result)?,
175        req.id.clone(),
176    ))
177}
178
179async fn handle_resources_templates(
180    req: &JsonRpcRequest,
181    _ctx: &RequestContext,
182    _server: &HttpMcpServer,
183) -> Result<JsonRpcResponse> {
184    // Resource templates are not supported in the new function-based API
185    Ok(JsonRpcResponse::success(
186        serde_json::json!({ "resourceTemplates": [] }),
187        req.id.clone(),
188    ))
189}
190
191async fn handle_resources_subscribe(
192    req: &JsonRpcRequest,
193    _ctx: &RequestContext,
194    _server: &HttpMcpServer,
195) -> Result<JsonRpcResponse> {
196    // Resource subscription is not supported in the new function-based API
197    Ok(JsonRpcResponse::success(Value::Null, req.id.clone()))
198}
199
200// ============================================================================
201// Tool Handlers
202// ============================================================================
203
204async fn handle_tools_list(
205    req: &JsonRpcRequest,
206    _ctx: &RequestContext,
207    server: &HttpMcpServer,
208) -> Result<JsonRpcResponse> {
209    // Collect all registered tools
210    let tools: Vec<Tool> = server
211        .tools
212        .values()
213        .map(|registered| registered.meta.clone())
214        .collect();
215
216    let result = ToolsListResult {
217        tools,
218        next_cursor: None,
219    };
220
221    Ok(JsonRpcResponse::success(
222        serde_json::to_value(result)?,
223        req.id.clone(),
224    ))
225}
226
227async fn handle_tools_call(
228    req: &JsonRpcRequest,
229    ctx: &RequestContext,
230    server: &HttpMcpServer,
231) -> Result<JsonRpcResponse> {
232    let params: ToolsCallParams = serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
233        .map_err(|e| McpError::InvalidParams(format!("Invalid params: {}", e)))?;
234
235    // Find the registered tool
236    let registered = server
237        .tools
238        .get(&params.name)
239        .ok_or_else(|| McpError::ToolNotFound(params.name.clone()))?;
240
241    // Call the tool handler
242    let result_value =
243        (registered.handler)(params.arguments.unwrap_or_default(), ctx.clone()).await?;
244
245    // Convert result to ToolContent
246    let content = vec![ToolContent::Text {
247        text: result_value.to_string(),
248    }];
249
250    let result = ToolsCallResult {
251        content,
252        is_error: None,
253    };
254
255    Ok(JsonRpcResponse::success(
256        serde_json::to_value(result)?,
257        req.id.clone(),
258    ))
259}
260
261// ============================================================================
262// Prompt Handlers
263// ============================================================================
264
265async fn handle_prompts_list(
266    req: &JsonRpcRequest,
267    _ctx: &RequestContext,
268    server: &HttpMcpServer,
269) -> Result<JsonRpcResponse> {
270    // Collect all registered prompts
271    let prompts: Vec<Prompt> = server
272        .prompts
273        .values()
274        .map(|registered| registered.meta.clone())
275        .collect();
276
277    let result = PromptsListResult {
278        prompts,
279        next_cursor: None,
280    };
281
282    Ok(JsonRpcResponse::success(
283        serde_json::to_value(result)?,
284        req.id.clone(),
285    ))
286}
287
288async fn handle_prompts_get(
289    req: &JsonRpcRequest,
290    ctx: &RequestContext,
291    server: &HttpMcpServer,
292) -> Result<JsonRpcResponse> {
293    let params: PromptsGetParams =
294        serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
295            .map_err(|e| McpError::InvalidParams(format!("Invalid params: {}", e)))?;
296
297    // Find the registered prompt
298    let registered = server
299        .prompts
300        .get(&params.name)
301        .ok_or_else(|| McpError::PromptNotFound(params.name.clone()))?;
302
303    // Call the prompt handler
304    let (description, messages) =
305        (registered.handler)(params.name.clone(), params.arguments, ctx.clone()).await?;
306
307    let result = PromptsGetResult {
308        description,
309        messages,
310    };
311
312    Ok(JsonRpcResponse::success(
313        serde_json::to_value(result)?,
314        req.id.clone(),
315    ))
316}
317
318// ============================================================================
319// Logging Handlers
320// ============================================================================
321
322fn handle_logging_set_level(req: &JsonRpcRequest) -> Result<JsonRpcResponse> {
323    let _params: LoggingSetLevelParams =
324        serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
325            .map_err(|e| McpError::InvalidParams(format!("Invalid params: {}", e)))?;
326
327    // TODO: Implement actual log level setting
328    Ok(JsonRpcResponse::success(Value::Null, req.id.clone()))
329}
330
331// ============================================================================
332// Utilities
333// ============================================================================
334
335fn create_request_context(req: &HttpRequest) -> RequestContext {
336    RequestContext::new(
337        req.headers().clone(),
338        req.method().to_string(),
339        req.path().to_string(),
340        req.peer_addr(),
341    )
342}