Skip to main content

rmcp_soddygo/transport/streamable_http_server/
session.rs

1//! Session management for the Streamable HTTP transport.
2//!
3//! A *session* groups the logically related interactions between a single MCP
4//! client and the server, starting from the `initialize` handshake. The server
5//! assigns each session a unique [`SessionId`] (returned to the client via the
6//! `Mcp-Session-Id` response header) and the client includes that ID on every
7//! subsequent request.
8//!
9//! Two tool calls carrying the same session ID come from the same logical
10//! session; different IDs mean different clients or conversations.
11//!
12//! # Implementations
13//!
14//! * [`local::LocalSessionManager`] — in-memory session store (default).
15//! * [`never::NeverSessionManager`] — rejects all session operations, used
16//!   when stateful mode is disabled.
17//!
18//! # Custom session managers
19//!
20//! Implement the [`SessionManager`] trait to back sessions with a database,
21//! Redis, or any other external store.
22
23use futures::Stream;
24
25pub use crate::transport::common::server_side_http::{ServerSseMessage, SessionId};
26use crate::{
27    RoleServer,
28    model::{ClientJsonRpcMessage, ServerJsonRpcMessage},
29};
30
31pub mod local;
32pub mod never;
33pub mod store;
34
35pub use store::{SessionState, SessionStore, SessionStoreError};
36
37/// Extension marker inserted into the `initialize` request extensions during a
38/// session restore replay. Handlers can check for its presence to distinguish a
39/// cross-instance restore from a genuine client-initiated `initialize` request.
40///
41/// ```rust,ignore
42/// if req.extensions().get::<SessionRestoreMarker>().is_some() {
43///     // this is a restore replay, not a fresh client connection
44/// }
45/// ```
46#[non_exhaustive]
47#[derive(Debug, Clone)]
48pub struct SessionRestoreMarker {
49    pub id: SessionId,
50}
51
52/// The outcome of a [`SessionManager::restore_session`] call.
53#[non_exhaustive]
54#[derive(Debug)]
55pub enum RestoreOutcome<T> {
56    /// The session was just re-created from external state; the caller must
57    /// spawn an MCP handler against the returned transport and replay the
58    /// `initialize` handshake.
59    Restored(T),
60    /// The session was already present in memory (e.g. a concurrent request
61    /// already restored it). The caller should proceed as if `has_session`
62    /// had returned `true` — no further action is required.
63    AlreadyPresent,
64    /// This session manager does not support external-store restore.
65    /// The caller should fall through to the normal 404 response.
66    NotSupported,
67}
68
69/// Controls how MCP sessions are created, validated, and closed.
70///
71/// The `StreamableHttpService` calls into this
72/// trait for every HTTP request that carries (or should carry) a session ID.
73///
74/// See the [module-level docs](self) for background on sessions.
75pub trait SessionManager: Send + Sync + 'static {
76    type Error: std::error::Error + Send + 'static;
77    type Transport: crate::transport::Transport<RoleServer>;
78
79    /// Create a new session and return its ID together with the transport
80    /// that will be used to exchange MCP messages within this session.
81    fn create_session(
82        &self,
83    ) -> impl Future<Output = Result<(SessionId, Self::Transport), Self::Error>> + Send;
84
85    /// Forward the first message (the `initialize` request) to the session.
86    fn initialize_session(
87        &self,
88        id: &SessionId,
89        message: ClientJsonRpcMessage,
90    ) -> impl Future<Output = Result<ServerJsonRpcMessage, Self::Error>> + Send;
91
92    /// Return `true` if a session with the given ID exists and is active.
93    fn has_session(&self, id: &SessionId)
94    -> impl Future<Output = Result<bool, Self::Error>> + Send;
95
96    /// Close and remove the session. Corresponds to an HTTP DELETE request
97    /// with `Mcp-Session-Id`.
98    fn close_session(&self, id: &SessionId)
99    -> impl Future<Output = Result<(), Self::Error>> + Send;
100
101    /// Route a client request into the session and return an SSE stream
102    /// carrying the server's response(s).
103    fn create_stream(
104        &self,
105        id: &SessionId,
106        message: ClientJsonRpcMessage,
107    ) -> impl Future<
108        Output = Result<impl Stream<Item = ServerSseMessage> + Send + Sync + 'static, Self::Error>,
109    > + Send;
110
111    /// Accept a notification, response, or error message from the client
112    /// without producing a response stream.
113    fn accept_message(
114        &self,
115        id: &SessionId,
116        message: ClientJsonRpcMessage,
117    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
118
119    /// Create an SSE stream not tied to a specific client request (HTTP GET).
120    fn create_standalone_stream(
121        &self,
122        id: &SessionId,
123    ) -> impl Future<
124        Output = Result<impl Stream<Item = ServerSseMessage> + Send + Sync + 'static, Self::Error>,
125    > + Send;
126
127    /// Resume an SSE stream from the given `Last-Event-ID`, replaying any
128    /// events the client missed.
129    fn resume(
130        &self,
131        id: &SessionId,
132        last_event_id: String,
133    ) -> impl Future<
134        Output = Result<impl Stream<Item = ServerSseMessage> + Send + Sync + 'static, Self::Error>,
135    > + Send;
136
137    /// Attempt to restore a previously-known session from external state,
138    /// creating a fresh in-memory session worker with the given `id`.
139    ///
140    /// See [`RestoreOutcome`] for the three possible results:
141    /// - [`RestoreOutcome::Restored`] — session re-created; caller must spawn
142    ///   an MCP handler and replay the `initialize` handshake.
143    /// - [`RestoreOutcome::AlreadyPresent`] — session is already in memory
144    ///   (e.g. a concurrent request restored it first); caller proceeds
145    ///   normally.
146    /// - [`RestoreOutcome::NotSupported`] (default) — this session manager
147    ///   does not support external-store restore; caller returns 404.
148    fn restore_session(
149        &self,
150        _id: SessionId,
151    ) -> impl Future<Output = Result<RestoreOutcome<Self::Transport>, Self::Error>> + Send {
152        futures::future::ready(Ok(RestoreOutcome::NotSupported))
153    }
154}