Skip to main content

mcp_memory/
http.rs

1//! MCP **Streamable HTTP** transport (the 2025-03-26 transport that
2//! superseded the older HTTP+SSE pair).
3//!
4//! * `POST /mcp` — the client sends one JSON-RPC message (or a batch array).
5//!   The reply is delivered as `application/json` by default, or as a one-shot
6//!   `text/event-stream` (SSE) event when the client `Accept`s it. A body of
7//!   only notifications gets `202 Accepted` with no content.
8//! * `GET /mcp` — opens a standalone server→client SSE stream. This server has
9//!   no server-initiated messages, so the stream simply stays open with
10//!   keep-alives; it exists for spec compliance.
11//!
12//! `/` is also wired to the same handlers for convenience. The JSON-RPC
13//! semantics are identical to the stdio and TCP transports — only framing
14//! differs (see [`crate::server::dispatch_http_body`]).
15
16use std::convert::Infallible;
17use parking_lot::RwLock;
18use std::sync::Arc;
19
20use axum::extract::{DefaultBodyLimit, State};
21use axum::http::{header, HeaderMap, StatusCode};
22use axum::response::sse::{Event, KeepAlive, Sse};
23use axum::response::{IntoResponse, Response};
24use axum::routing::post;
25use axum::{Json, Router};
26use futures::Stream;
27use serde_json::json;
28use tokio::net::TcpListener;
29use tracing::{error, info};
30
31use crate::errors::{MCSError, Result};
32use crate::kg::KnowledgeGraph;
33use crate::server;
34
35type AppState = Arc<RwLock<KnowledgeGraph>>;
36
37/// Build the axum router for the HTTP transport. Exposed so tests can drive it
38/// with `tower::ServiceExt::oneshot` without binding a socket.
39pub fn router(kg: AppState) -> Router {
40    Router::new()
41        .route("/mcp", post(post_handler).get(get_handler))
42        .route("/", post(post_handler).get(get_handler))
43        .layer(DefaultBodyLimit::max(server::MAX_REQUEST_BYTES))
44        .with_state(kg)
45}
46
47/// Bind `addr` and serve the HTTP transport until the process is killed.
48pub async fn run(addr: &str, kg: AppState) -> Result<()> {
49    let listener = TcpListener::bind(addr).await.map_err(MCSError::IoError)?;
50    info!("Listening for HTTP (Streamable) MCP on http://{addr}/mcp");
51    axum::serve(listener, router(kg)).await.map_err(MCSError::IoError)?;
52    Ok(())
53}
54
55fn wants_sse(headers: &HeaderMap) -> bool {
56    headers
57        .get(header::ACCEPT)
58        .and_then(|v| v.to_str().ok())
59        .is_some_and(|a| a.contains("text/event-stream"))
60}
61
62async fn post_handler(State(kg): State<AppState>, headers: HeaderMap, body: String) -> Response {
63    // The dispatch path locks the graph and may perform a blocking fsync, so
64    // run it off the async worker pool (keeps the HTTP reactor responsive).
65    let result = tokio::task::spawn_blocking(move || server::dispatch_http_body(&body, &kg)).await;
66
67    let outcome = match result {
68        Ok(inner) => inner,
69        Err(join_err) => {
70            error!("dispatch task panicked: {join_err}");
71            return (StatusCode::INTERNAL_SERVER_ERROR, "internal error").into_response();
72        }
73    };
74
75    match outcome {
76        // Body held only notifications → nothing to return.
77        Ok(None) => StatusCode::ACCEPTED.into_response(),
78        Ok(Some(value)) => {
79            if wants_sse(&headers) {
80                // One JSON-RPC reply delivered as a single SSE event, then close.
81                let json = serde_json::to_string(&value).unwrap();
82                let stream = futures::stream::once(async move {
83                    Ok::<Event, Infallible>(Event::default().data(json))
84                });
85                Sse::new(stream).into_response()
86            } else {
87                Json(value).into_response()
88            }
89        }
90        Err(e) => {
91            // Malformed JSON body → JSON-RPC parse error.
92            let resp = json!({
93                "jsonrpc": "2.0",
94                "error": { "code": -32700, "message": format!("Parse error: {e}") },
95                "id": null
96            });
97            (StatusCode::BAD_REQUEST, Json(resp)).into_response()
98        }
99    }
100}
101
102async fn get_handler() -> Sse<impl Stream<Item = std::result::Result<Event, Infallible>>> {
103    // No server-initiated messages: an open, keep-alive'd stream for compliance.
104    let stream = futures::stream::pending::<std::result::Result<Event, Infallible>>();
105    Sse::new(stream).keep_alive(KeepAlive::default())
106}