1use crate::context::RequestContext;
2use crate::error::{McpError, Result};
3use crate::handlers::lifecycle::{handle_initialize, handle_ping};
4use crate::jsonrpc::{JsonRpcRequest, JsonRpcResponse};
5use crate::protocol::*;
6use crate::server::HttpMcpServer;
7use actix_web::{
8    get, post,
9    web::{self, Data},
10    HttpRequest, HttpResponse, Responder,
11};
12use actix_web_lab::sse;
13use serde_json::Value;
14use std::sync::Arc;
15
16pub fn create_app(cfg: &mut web::ServiceConfig, server: Arc<HttpMcpServer>) {
18    if server.enable_cors {
19        cfg.default_service(web::to(|| async {
20            HttpResponse::Ok()
21                .insert_header(("Access-Control-Allow-Origin", "*"))
22                .insert_header(("Access-Control-Allow-Methods", "GET, POST, OPTIONS"))
23                .insert_header((
24                    "Access-Control-Allow-Headers",
25                    "Content-Type, Authorization, Accept, Last-Event-ID, mcp-protocol-version",
26                ))
27                .finish()
28        }));
29    }
30
31    cfg.app_data(Data::new(server.clone()))
32        .service(handle_post)
33        .service(handle_get);
34}
35
36#[post("/mcp")]
38async fn handle_post(
39    req: HttpRequest,
40    body: web::Json<JsonRpcRequest>,
41    server: Data<Arc<HttpMcpServer>>,
42) -> Result<impl Responder> {
43    let ctx = create_request_context(&req);
44
45    if let Some(oauth) = &server.oauth_config {
47        oauth.validate_token(&ctx).await?;
48    }
49
50    body.validate()?;
52
53    let is_notification = body.id.is_none();
55
56    let accept_sse = req
58        .headers()
59        .get("accept")
60        .and_then(|v| v.to_str().ok())
61        .map(|s| s.contains("text/event-stream"))
62        .unwrap_or(false);
63
64    let response = route_request(&body, &ctx, &server).await?;
66
67    if is_notification {
69        tracing::debug!("Notification received ({}), returning 204 No Content", body.method);
70        let mut resp = HttpResponse::NoContent();
71        if server.enable_cors {
72            resp.insert_header(("Access-Control-Allow-Origin", "*"));
73        }
74        return Ok(resp.finish());
75    }
76
77    if accept_sse {
79        let subscriber_count = server.response_tx.receiver_count();
80        tracing::debug!("Broadcasting response to {} subscribers", subscriber_count);
81
82        if subscriber_count > 0 {
84            let _ = server.response_tx.send(response);
85            let mut resp = HttpResponse::Accepted();
86            if server.enable_cors {
87                resp.insert_header(("Access-Control-Allow-Origin", "*"));
88            }
89            return Ok(resp.finish());
90        }
91
92        tracing::warn!("No SSE subscribers, falling back to direct HTTP response");
94    }
95
96    let mut resp = HttpResponse::Ok();
98    if server.enable_cors {
99        resp.insert_header(("Access-Control-Allow-Origin", "*"));
100    }
101    Ok(resp.json(response))
102}
103
104#[get("/mcp")]
106async fn handle_get(req: HttpRequest, server: Data<Arc<HttpMcpServer>>) -> Result<impl Responder> {
107    let ctx = create_request_context(&req);
108
109    if let Some(oauth) = &server.oauth_config {
111        oauth.validate_token(&ctx).await?;
112    }
113
114    let _last_event_id = req
116        .headers()
117        .get("Last-Event-ID")
118        .and_then(|v| v.to_str().ok())
119        .map(|s| s.to_string());
120
121    let mut rx = server.response_tx.subscribe();
123
124    tracing::debug!("SSE stream connected");
125
126    let event_stream = async_stream::stream! {
128        loop {
129            match rx.recv().await {
130                Ok(response) => {
131                    if let Ok(json) = serde_json::to_string(&response) {
132                        tracing::debug!("Sending response via SSE: {}", json);
133                        yield Ok::<_, actix_web::Error>(sse::Event::Data(
135                            sse::Data::new(json)
136                        ));
137                    }
138                }
139                Err(_) => break,
140            }
141        }
142    };
143
144    Ok(sse::Sse::from_stream(event_stream))
145}
146
147async fn route_request(
149    req: &JsonRpcRequest,
150    ctx: &RequestContext,
151    server: &HttpMcpServer,
152) -> Result<JsonRpcResponse> {
153    tracing::debug!("Routing request: method={}", req.method);
154
155    match req.method.as_str() {
156        "initialize" => {
158            handle_initialize(req, server.server_info.clone(), server.capabilities.clone())
159        }
160        "ping" => handle_ping(req),
161
162        "notifications/initialized" => handle_notifications_initialized(req),
164
165        "resources/list" => handle_resources_list(req, ctx, server).await,
167        "resources/read" => handle_resources_read(req, ctx, server).await,
168        "resources/templates/list" => handle_resources_templates(req, ctx, server).await,
169        "resources/subscribe" => handle_resources_subscribe(req, ctx, server).await,
170
171        "tools/list" => handle_tools_list(req, ctx, server).await,
173        "tools/call" => handle_tools_call(req, ctx, server).await,
174
175        "prompts/list" => handle_prompts_list(req, ctx, server).await,
177        "prompts/get" => handle_prompts_get(req, ctx, server).await,
178
179        "logging/setLevel" => handle_logging_set_level(req),
181
182        _ => Err(McpError::MethodNotFound(req.method.clone())),
183    }
184}
185
186async fn handle_resources_list(
191    req: &JsonRpcRequest,
192    ctx: &RequestContext,
193    server: &HttpMcpServer,
194) -> Result<JsonRpcResponse> {
195    let params: ResourcesListParams =
196        serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
197            .unwrap_or(ResourcesListParams { cursor: None });
198
199    let mut all_resources = Vec::new();
201    for registered in server.resources.values() {
202        let (resources, _) = (registered.list_handler)(params.cursor.clone(), ctx.clone()).await?;
203        all_resources.extend(resources);
204    }
205
206    let result = ResourcesListResult {
207        resources: all_resources,
208        next_cursor: None,
209    };
210
211    Ok(JsonRpcResponse::success(
212        serde_json::to_value(result)?,
213        req.id.clone(),
214    ))
215}
216
217async fn handle_resources_read(
218    req: &JsonRpcRequest,
219    ctx: &RequestContext,
220    server: &HttpMcpServer,
221) -> Result<JsonRpcResponse> {
222    let params: ResourcesReadParams =
223        serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
224            .map_err(|e| McpError::InvalidParams(format!("Invalid params: {}", e)))?;
225
226    let mut contents = Vec::new();
228    for registered in server.resources.values() {
229        let result = (registered.read_handler)(params.uri.clone(), ctx.clone()).await?;
230        contents.extend(result);
231    }
232
233    if contents.is_empty() {
234        return Err(McpError::ResourceNotFound(params.uri));
235    }
236
237    let result = ResourcesReadResult { contents };
238
239    Ok(JsonRpcResponse::success(
240        serde_json::to_value(result)?,
241        req.id.clone(),
242    ))
243}
244
245async fn handle_resources_templates(
246    req: &JsonRpcRequest,
247    _ctx: &RequestContext,
248    _server: &HttpMcpServer,
249) -> Result<JsonRpcResponse> {
250    Ok(JsonRpcResponse::success(
252        serde_json::json!({ "resourceTemplates": [] }),
253        req.id.clone(),
254    ))
255}
256
257async fn handle_resources_subscribe(
258    req: &JsonRpcRequest,
259    _ctx: &RequestContext,
260    _server: &HttpMcpServer,
261) -> Result<JsonRpcResponse> {
262    Ok(JsonRpcResponse::success(Value::Null, req.id.clone()))
264}
265
266async fn handle_tools_list(
271    req: &JsonRpcRequest,
272    _ctx: &RequestContext,
273    server: &HttpMcpServer,
274) -> Result<JsonRpcResponse> {
275    let tools: Vec<Tool> = server
277        .tools
278        .values()
279        .map(|registered| registered.meta.clone())
280        .collect();
281
282    let result = ToolsListResult {
283        tools,
284        next_cursor: None,
285    };
286
287    Ok(JsonRpcResponse::success(
288        serde_json::to_value(result)?,
289        req.id.clone(),
290    ))
291}
292
293async fn handle_tools_call(
294    req: &JsonRpcRequest,
295    ctx: &RequestContext,
296    server: &HttpMcpServer,
297) -> Result<JsonRpcResponse> {
298    let params: ToolsCallParams = serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
299        .map_err(|e| McpError::InvalidParams(format!("Invalid params: {}", e)))?;
300
301    let registered = server
303        .tools
304        .get(¶ms.name)
305        .ok_or_else(|| McpError::ToolNotFound(params.name.clone()))?;
306
307    let result_value =
309        (registered.handler)(params.arguments.unwrap_or_default(), ctx.clone()).await?;
310
311    let content = vec![ToolContent::Text {
313        text: result_value.to_string(),
314    }];
315
316    let result = ToolsCallResult {
317        content,
318        is_error: None,
319    };
320
321    Ok(JsonRpcResponse::success(
322        serde_json::to_value(result)?,
323        req.id.clone(),
324    ))
325}
326
327async fn handle_prompts_list(
332    req: &JsonRpcRequest,
333    _ctx: &RequestContext,
334    server: &HttpMcpServer,
335) -> Result<JsonRpcResponse> {
336    let prompts: Vec<Prompt> = server
338        .prompts
339        .values()
340        .map(|registered| registered.meta.clone())
341        .collect();
342
343    let result = PromptsListResult {
344        prompts,
345        next_cursor: None,
346    };
347
348    Ok(JsonRpcResponse::success(
349        serde_json::to_value(result)?,
350        req.id.clone(),
351    ))
352}
353
354async fn handle_prompts_get(
355    req: &JsonRpcRequest,
356    ctx: &RequestContext,
357    server: &HttpMcpServer,
358) -> Result<JsonRpcResponse> {
359    let params: PromptsGetParams =
360        serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
361            .map_err(|e| McpError::InvalidParams(format!("Invalid params: {}", e)))?;
362
363    let registered = server
365        .prompts
366        .get(¶ms.name)
367        .ok_or_else(|| McpError::PromptNotFound(params.name.clone()))?;
368
369    let (description, messages) =
371        (registered.handler)(params.name.clone(), params.arguments, ctx.clone()).await?;
372
373    let result = PromptsGetResult {
374        description,
375        messages,
376    };
377
378    Ok(JsonRpcResponse::success(
379        serde_json::to_value(result)?,
380        req.id.clone(),
381    ))
382}
383
384fn handle_notifications_initialized(req: &JsonRpcRequest) -> Result<JsonRpcResponse> {
389    tracing::debug!("Client initialized notification received");
390    Ok(JsonRpcResponse::success(serde_json::json!({}), req.id.clone()))
392}
393
394fn handle_logging_set_level(req: &JsonRpcRequest) -> Result<JsonRpcResponse> {
399    let _params: LoggingSetLevelParams =
400        serde_json::from_value(req.params.clone().unwrap_or(Value::Null))
401            .map_err(|e| McpError::InvalidParams(format!("Invalid params: {}", e)))?;
402
403    Ok(JsonRpcResponse::success(serde_json::json!({}), req.id.clone()))
405}
406
407fn create_request_context(req: &HttpRequest) -> RequestContext {
412    RequestContext::new(
413        req.headers().clone(),
414        req.method().to_string(),
415        req.path().to_string(),
416        req.peer_addr(),
417    )
418}