1use std::convert::Infallible;
17use std::sync::{Arc, Mutex};
18
19use axum::extract::{DefaultBodyLimit, State};
20use axum::http::{header, HeaderMap, StatusCode};
21use axum::response::sse::{Event, KeepAlive, Sse};
22use axum::response::{IntoResponse, Response};
23use axum::routing::post;
24use axum::{Json, Router};
25use futures::Stream;
26use serde_json::json;
27use tokio::net::TcpListener;
28use tracing::{error, info};
29
30use crate::errors::{MCSError, Result};
31use crate::kg::KnowledgeGraph;
32use crate::server;
33
/// Shared handler state: a single [`KnowledgeGraph`] guarded by a `Mutex` and
/// reference-counted with `Arc` so it can be handed to every request.
type AppState = Arc<Mutex<KnowledgeGraph>>;
35
36pub fn router(kg: AppState) -> Router {
39 Router::new()
40 .route("/mcp", post(post_handler).get(get_handler))
41 .route("/", post(post_handler).get(get_handler))
42 .layer(DefaultBodyLimit::max(server::MAX_REQUEST_BYTES))
43 .with_state(kg)
44}
45
46pub async fn run(addr: &str, kg: AppState) -> Result<()> {
48 let listener = TcpListener::bind(addr).await.map_err(MCSError::IoError)?;
49 info!("Listening for HTTP (Streamable) MCP on http://{addr}/mcp");
50 axum::serve(listener, router(kg)).await.map_err(MCSError::IoError)?;
51 Ok(())
52}
53
54fn wants_sse(headers: &HeaderMap) -> bool {
55 headers
56 .get(header::ACCEPT)
57 .and_then(|v| v.to_str().ok())
58 .is_some_and(|a| a.contains("text/event-stream"))
59}
60
61async fn post_handler(State(kg): State<AppState>, headers: HeaderMap, body: String) -> Response {
62 let result = tokio::task::spawn_blocking(move || server::dispatch_http_body(&body, &kg)).await;
65
66 let outcome = match result {
67 Ok(inner) => inner,
68 Err(join_err) => {
69 error!("dispatch task panicked: {join_err}");
70 return (StatusCode::INTERNAL_SERVER_ERROR, "internal error").into_response();
71 }
72 };
73
74 match outcome {
75 Ok(None) => StatusCode::ACCEPTED.into_response(),
77 Ok(Some(value)) => {
78 if wants_sse(&headers) {
79 let json = serde_json::to_string(&value).unwrap();
81 let stream = futures::stream::once(async move {
82 Ok::<Event, Infallible>(Event::default().data(json))
83 });
84 Sse::new(stream).into_response()
85 } else {
86 Json(value).into_response()
87 }
88 }
89 Err(e) => {
90 let resp = json!({
92 "jsonrpc": "2.0",
93 "error": { "code": -32700, "message": format!("Parse error: {e}") },
94 "id": null
95 });
96 (StatusCode::BAD_REQUEST, Json(resp)).into_response()
97 }
98 }
99}
100
101async fn get_handler() -> Sse<impl Stream<Item = std::result::Result<Event, Infallible>>> {
102 let stream = futures::stream::pending::<std::result::Result<Event, Infallible>>();
104 Sse::new(stream).keep_alive(KeepAlive::default())
105}