rmcp_soddygo/transport/streamable_http_server/session.rs
1//! Session management for the Streamable HTTP transport.
2//!
3//! A *session* groups the logically related interactions between a single MCP
4//! client and the server, starting from the `initialize` handshake. The server
5//! assigns each session a unique [`SessionId`] (returned to the client via the
6//! `Mcp-Session-Id` response header) and the client includes that ID on every
7//! subsequent request.
8//!
9//! Two tool calls carrying the same session ID come from the same logical
10//! session; different IDs mean different clients or conversations.
11//!
12//! # Implementations
13//!
14//! * [`local::LocalSessionManager`] — in-memory session store (default).
15//! * [`never::NeverSessionManager`] — rejects all session operations, used
16//! when stateful mode is disabled.
17//!
18//! # Custom session managers
19//!
20//! Implement the [`SessionManager`] trait to back sessions with a database,
21//! Redis, or any other external store.
22
23use futures::Stream;
24
25pub use crate::transport::common::server_side_http::{ServerSseMessage, SessionId};
26use crate::{
27 RoleServer,
28 model::{ClientJsonRpcMessage, ServerJsonRpcMessage},
29};
30
31pub mod local;
32pub mod never;
33pub mod store;
34
35pub use store::{SessionState, SessionStore, SessionStoreError};
36
37/// Extension marker inserted into the `initialize` request extensions during a
38/// session restore replay. Handlers can check for its presence to distinguish a
39/// cross-instance restore from a genuine client-initiated `initialize` request.
40///
41/// ```rust,ignore
42/// if req.extensions().get::<SessionRestoreMarker>().is_some() {
43/// // this is a restore replay, not a fresh client connection
44/// }
45/// ```
46#[non_exhaustive]
47#[derive(Debug, Clone)]
48pub struct SessionRestoreMarker {
49 pub id: SessionId,
50}
51
/// Result of a [`SessionManager::restore_session`] call.
///
/// Each variant tells the caller what to do next with the request that
/// triggered the restore attempt.
#[derive(Debug)]
#[non_exhaustive]
pub enum RestoreOutcome<T> {
    /// The session has just been re-created from external state. The caller
    /// is responsible for spawning an MCP handler against the returned
    /// transport and replaying the `initialize` handshake.
    Restored(T),
    /// The session is already held in memory — for instance because a
    /// concurrent request restored it first. No further action is required;
    /// the caller continues exactly as if `has_session` had returned `true`.
    AlreadyPresent,
    /// Restoring from an external store is not supported by this session
    /// manager. The caller should fall through to the normal 404 response.
    NotSupported,
}
68
69/// Controls how MCP sessions are created, validated, and closed.
70///
71/// The `StreamableHttpService` calls into this
72/// trait for every HTTP request that carries (or should carry) a session ID.
73///
74/// See the [module-level docs](self) for background on sessions.
75pub trait SessionManager: Send + Sync + 'static {
76 type Error: std::error::Error + Send + 'static;
77 type Transport: crate::transport::Transport<RoleServer>;
78
79 /// Create a new session and return its ID together with the transport
80 /// that will be used to exchange MCP messages within this session.
81 fn create_session(
82 &self,
83 ) -> impl Future<Output = Result<(SessionId, Self::Transport), Self::Error>> + Send;
84
85 /// Forward the first message (the `initialize` request) to the session.
86 fn initialize_session(
87 &self,
88 id: &SessionId,
89 message: ClientJsonRpcMessage,
90 ) -> impl Future<Output = Result<ServerJsonRpcMessage, Self::Error>> + Send;
91
92 /// Return `true` if a session with the given ID exists and is active.
93 fn has_session(&self, id: &SessionId)
94 -> impl Future<Output = Result<bool, Self::Error>> + Send;
95
96 /// Close and remove the session. Corresponds to an HTTP DELETE request
97 /// with `Mcp-Session-Id`.
98 fn close_session(&self, id: &SessionId)
99 -> impl Future<Output = Result<(), Self::Error>> + Send;
100
101 /// Route a client request into the session and return an SSE stream
102 /// carrying the server's response(s).
103 fn create_stream(
104 &self,
105 id: &SessionId,
106 message: ClientJsonRpcMessage,
107 ) -> impl Future<
108 Output = Result<impl Stream<Item = ServerSseMessage> + Send + Sync + 'static, Self::Error>,
109 > + Send;
110
111 /// Accept a notification, response, or error message from the client
112 /// without producing a response stream.
113 fn accept_message(
114 &self,
115 id: &SessionId,
116 message: ClientJsonRpcMessage,
117 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
118
119 /// Create an SSE stream not tied to a specific client request (HTTP GET).
120 fn create_standalone_stream(
121 &self,
122 id: &SessionId,
123 ) -> impl Future<
124 Output = Result<impl Stream<Item = ServerSseMessage> + Send + Sync + 'static, Self::Error>,
125 > + Send;
126
127 /// Resume an SSE stream from the given `Last-Event-ID`, replaying any
128 /// events the client missed.
129 fn resume(
130 &self,
131 id: &SessionId,
132 last_event_id: String,
133 ) -> impl Future<
134 Output = Result<impl Stream<Item = ServerSseMessage> + Send + Sync + 'static, Self::Error>,
135 > + Send;
136
137 /// Attempt to restore a previously-known session from external state,
138 /// creating a fresh in-memory session worker with the given `id`.
139 ///
140 /// See [`RestoreOutcome`] for the three possible results:
141 /// - [`RestoreOutcome::Restored`] — session re-created; caller must spawn
142 /// an MCP handler and replay the `initialize` handshake.
143 /// - [`RestoreOutcome::AlreadyPresent`] — session is already in memory
144 /// (e.g. a concurrent request restored it first); caller proceeds
145 /// normally.
146 /// - [`RestoreOutcome::NotSupported`] (default) — this session manager
147 /// does not support external-store restore; caller returns 404.
148 fn restore_session(
149 &self,
150 _id: SessionId,
151 ) -> impl Future<Output = Result<RestoreOutcome<Self::Transport>, Self::Error>> + Send {
152 futures::future::ready(Ok(RestoreOutcome::NotSupported))
153 }
154}