rust_mcp_sdk/hyper_servers/
hyper_runtime.rs

1use std::{sync::Arc, time::Duration};
2
3use crate::{
4    mcp_http::McpAppState,
5    mcp_server::HyperServer,
6    schema::{
7        schema_utils::{NotificationFromServer, RequestFromServer, ResultFromClient},
8        CreateMessageRequestParams, CreateMessageResult, InitializeRequestParams,
9        ListRootsRequestParams, ListRootsResult, LoggingMessageNotificationParams,
10        PromptListChangedNotificationParams, ResourceListChangedNotificationParams,
11        ResourceUpdatedNotificationParams, ToolListChangedNotificationParams,
12    },
13    McpServer,
14};
15
16use axum_server::Handle;
17use rust_mcp_transport::SessionId;
18use tokio::task::JoinHandle;
19
20use crate::{
21    error::SdkResult,
22    mcp_server::{
23        error::{TransportServerError, TransportServerResult},
24        ServerRuntime,
25    },
26};
27
28pub struct HyperRuntime {
29    pub(crate) state: Arc<McpAppState>,
30    pub(crate) server_task: JoinHandle<Result<(), TransportServerError>>,
31    pub(crate) server_handle: Handle,
32}
33
34impl HyperRuntime {
35    pub async fn create(server: HyperServer) -> SdkResult<Self> {
36        let addr = server.options.resolve_server_address().await?;
37        let state = server.state();
38
39        let server_handle = server.server_handle();
40
41        let server_task = tokio::spawn(async move {
42            #[cfg(feature = "ssl")]
43            if server.options.enable_ssl {
44                server.start_ssl(addr).await
45            } else {
46                server.start_http(addr).await
47            }
48
49            #[cfg(not(feature = "ssl"))]
50            if server.options.enable_ssl {
51                panic!("SSL requested but the 'ssl' feature is not enabled");
52            } else {
53                server.start_http(addr).await
54            }
55        });
56
57        Ok(Self {
58            state,
59            server_task,
60            server_handle,
61        })
62    }
63
64    pub fn graceful_shutdown(&self, timeout: Option<Duration>) {
65        self.server_handle.graceful_shutdown(timeout);
66    }
67
68    pub async fn await_server(self) -> SdkResult<()> {
69        let result = self.server_task.await?;
70        result.map_err(|err| err.into())
71    }
72
73    /// Returns a list of active session IDs from the session store.
74    pub async fn sessions(&self) -> Vec<String> {
75        self.state.session_store.keys().await
76    }
77
78    /// Retrieves the runtime associated with the given session ID from the session store.
79    pub async fn runtime_by_session(
80        &self,
81        session_id: &SessionId,
82    ) -> TransportServerResult<Arc<ServerRuntime>> {
83        self.state.session_store.get(session_id).await.ok_or(
84            TransportServerError::SessionIdInvalid(session_id.to_string()),
85        )
86    }
87
88    pub async fn send_request(
89        &self,
90        session_id: &SessionId,
91        request: RequestFromServer,
92        timeout: Option<Duration>,
93    ) -> SdkResult<ResultFromClient> {
94        let runtime = self.runtime_by_session(session_id).await?;
95        runtime.request(request, timeout).await
96    }
97
98    pub async fn send_notification(
99        &self,
100        session_id: &SessionId,
101        notification: NotificationFromServer,
102    ) -> SdkResult<()> {
103        let runtime = self.runtime_by_session(session_id).await?;
104        runtime.send_notification(notification).await
105    }
106
107    /// Request a list of root URIs from the client. Roots allow
108    /// servers to ask for specific directories or files to operate on. A common example
109    /// for roots is providing a set of repositories or directories a server should operate on.
110    /// This request is typically used when the server needs to understand the file system
111    /// structure or access specific locations that the client has permission to read from
112    pub async fn list_roots(
113        &self,
114        session_id: &SessionId,
115        params: Option<ListRootsRequestParams>,
116    ) -> SdkResult<ListRootsResult> {
117        let runtime = self.runtime_by_session(session_id).await?;
118        runtime.list_roots(params).await
119    }
120
121    pub async fn send_logging_message(
122        &self,
123        session_id: &SessionId,
124        params: LoggingMessageNotificationParams,
125    ) -> SdkResult<()> {
126        let runtime = self.runtime_by_session(session_id).await?;
127        runtime.send_logging_message(params).await
128    }
129
130    /// An optional notification from the server to the client, informing it that
131    /// the list of prompts it offers has changed.
132    /// This may be issued by servers without any previous subscription from the client.
133    pub async fn send_prompt_list_changed(
134        &self,
135        session_id: &SessionId,
136        params: Option<PromptListChangedNotificationParams>,
137    ) -> SdkResult<()> {
138        let runtime = self.runtime_by_session(session_id).await?;
139        runtime.send_prompt_list_changed(params).await
140    }
141
142    /// An optional notification from the server to the client,
143    /// informing it that the list of resources it can read from has changed.
144    /// This may be issued by servers without any previous subscription from the client.
145    pub async fn send_resource_list_changed(
146        &self,
147        session_id: &SessionId,
148        params: Option<ResourceListChangedNotificationParams>,
149    ) -> SdkResult<()> {
150        let runtime = self.runtime_by_session(session_id).await?;
151        runtime.send_resource_list_changed(params).await
152    }
153
154    /// A notification from the server to the client, informing it that
155    /// a resource has changed and may need to be read again.
156    ///  This should only be sent if the client previously sent a resources/subscribe request.
157    pub async fn send_resource_updated(
158        &self,
159        session_id: &SessionId,
160        params: ResourceUpdatedNotificationParams,
161    ) -> SdkResult<()> {
162        let runtime = self.runtime_by_session(session_id).await?;
163        runtime.send_resource_updated(params).await
164    }
165
166    /// An optional notification from the server to the client, informing it that
167    /// the list of tools it offers has changed.
168    /// This may be issued by servers without any previous subscription from the client.
169    pub async fn send_tool_list_changed(
170        &self,
171        session_id: &SessionId,
172        params: Option<ToolListChangedNotificationParams>,
173    ) -> SdkResult<()> {
174        let runtime = self.runtime_by_session(session_id).await?;
175        runtime.send_tool_list_changed(params).await
176    }
177
178    /// A ping request to check that the other party is still alive.
179    /// The receiver must promptly respond, or else may be disconnected.
180    ///
181    /// This function creates a `PingRequest` with no specific parameters, sends the request and awaits the response
182    /// Once the response is received, it attempts to convert it into the expected
183    /// result type.
184    ///
185    /// # Returns
186    /// A `SdkResult` containing the `rust_mcp_schema::Result` if the request is successful.
187    /// If the request or conversion fails, an error is returned.
188    pub async fn ping(
189        &self,
190        session_id: &SessionId,
191        timeout: Option<Duration>,
192    ) -> SdkResult<crate::schema::Result> {
193        let runtime = self.runtime_by_session(session_id).await?;
194        runtime.ping(timeout).await
195    }
196
197    /// A request from the server to sample an LLM via the client.
198    /// The client has full discretion over which model to select.
199    /// The client should also inform the user before beginning sampling,
200    /// to allow them to inspect the request (human in the loop)
201    /// and decide whether to approve it.
202    pub async fn create_message(
203        &self,
204        session_id: &SessionId,
205        params: CreateMessageRequestParams,
206    ) -> SdkResult<CreateMessageResult> {
207        let runtime = self.runtime_by_session(session_id).await?;
208        runtime.create_message(params).await
209    }
210
211    pub async fn client_info(
212        &self,
213        session_id: &SessionId,
214    ) -> SdkResult<Option<InitializeRequestParams>> {
215        let runtime = self.runtime_by_session(session_id).await?;
216        Ok(runtime.client_info())
217    }
218}