turbomcp_server/
shared.rs1use std::sync::Arc;
7use tokio::sync::Mutex;
8
9use crate::{
10 config::ServerConfig,
11 error::ServerResult,
12 lifecycle::{HealthStatus, ServerLifecycle},
13 metrics::ServerMetrics,
14 registry::HandlerRegistry,
15 routing::RequestRouter,
16 server::{McpServer, ShutdownHandle},
17};
18
19pub struct SharedServer {
68 inner: Arc<Mutex<Option<McpServer>>>,
69}
70
71impl SharedServer {
72 pub fn new(server: McpServer) -> Self {
77 Self {
78 inner: Arc::new(Mutex::new(Some(server))),
79 }
80 }
81
82 pub async fn config(&self) -> Option<ServerConfig> {
86 self.inner.lock().await.as_ref().map(|s| s.config().clone())
87 }
88
89 pub async fn registry(&self) -> Option<Arc<HandlerRegistry>> {
93 self.inner
94 .lock()
95 .await
96 .as_ref()
97 .map(|s| s.registry().clone())
98 }
99
100 pub async fn router(&self) -> Option<Arc<RequestRouter>> {
104 self.inner.lock().await.as_ref().map(|s| s.router().clone())
105 }
106
107 pub async fn lifecycle(&self) -> Option<Arc<ServerLifecycle>> {
111 self.inner
112 .lock()
113 .await
114 .as_ref()
115 .map(|s| s.lifecycle().clone())
116 }
117
118 pub async fn metrics(&self) -> Option<Arc<ServerMetrics>> {
122 self.inner
123 .lock()
124 .await
125 .as_ref()
126 .map(|s| s.metrics().clone())
127 }
128
129 pub async fn shutdown_handle(&self) -> Option<ShutdownHandle> {
134 self.inner
135 .lock()
136 .await
137 .as_ref()
138 .map(|s| s.shutdown_handle())
139 }
140
141 pub async fn health(&self) -> Option<HealthStatus> {
145 match self.inner.lock().await.as_ref() {
146 Some(server) => Some(server.health().await),
147 None => None,
148 }
149 }
150
151 pub async fn run_stdio(self) -> ServerResult<()> {
156 let server = self.take_server().await?;
157 server.run_stdio().await
158 }
159
160 #[cfg(feature = "tcp")]
162 pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
163 self,
164 addr: A,
165 ) -> ServerResult<()> {
166 let server = self.take_server().await?;
167 server.run_tcp(addr).await
168 }
169
170 #[cfg(all(feature = "unix", unix))]
172 pub async fn run_unix<P: AsRef<std::path::Path>>(self, path: P) -> ServerResult<()> {
173 let server = self.take_server().await?;
174 server.run_unix(path).await
175 }
176
177 async fn take_server(self) -> ServerResult<McpServer> {
182 let mut guard = self.inner.lock().await;
183 guard.take().ok_or_else(|| {
184 crate::ServerError::configuration("Server has already been consumed for running")
185 })
186 }
187
188 pub async fn is_available(&self) -> bool {
190 self.inner.lock().await.is_some()
191 }
192}
193
194impl Clone for SharedServer {
195 fn clone(&self) -> Self {
200 Self {
201 inner: Arc::clone(&self.inner),
202 }
203 }
204}
205
206impl std::fmt::Debug for SharedServer {
207 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208 f.debug_struct("SharedServer")
209 .field("inner", &"Arc<Mutex<Option<McpServer>>>")
210 .finish()
211 }
212}
213
214#[cfg(test)]
215mod tests {
216 use super::*;
217 use crate::ServerBuilder;
218
219 #[tokio::test]
220 async fn test_shared_server_creation() {
221 let server = ServerBuilder::new().build();
222 let shared = SharedServer::new(server);
223
224 let _shared2 = shared.clone();
226 }
227
228 #[tokio::test]
229 async fn test_shared_server_cloning() {
230 let server = ServerBuilder::new().build();
231 let shared = SharedServer::new(server);
232
233 let clones: Vec<_> = (0..10).map(|_| shared.clone()).collect();
235 assert_eq!(clones.len(), 10);
236
237 }
240
241 #[tokio::test]
242 async fn test_shared_server_api_surface() {
243 let server = ServerBuilder::new().build();
244 let shared = SharedServer::new(server);
245
246 let _config = shared.config().await;
250 let _registry = shared.registry().await;
251 let _router = shared.router().await;
252 let _lifecycle = shared.lifecycle().await;
253 let _metrics = shared.metrics().await;
254 let _shutdown_handle = shared.shutdown_handle().await;
255 let _health = shared.health().await;
256 let _available = shared.is_available().await;
257
258 assert!(shared.is_available().await);
259 }
260
261 #[tokio::test]
262 async fn test_shared_server_type_compatibility() {
263 let server = ServerBuilder::new().build();
264 let shared = SharedServer::new(server);
265
266 fn takes_shared_server<T>(_server: T)
268 where
269 T: Clone + Send + Sync + 'static,
270 {
271 }
272
273 takes_shared_server(shared);
274 }
275
276 #[tokio::test]
277 async fn test_shared_server_send_sync() {
278 let server = ServerBuilder::new().build();
279 let shared = SharedServer::new(server);
280
281 let handle = tokio::spawn(async move {
283 let _cloned = shared.clone();
284 });
286
287 handle.await.unwrap();
288 }
289
290 #[tokio::test]
291 async fn test_shared_server_thread_safety() {
292 let server = ServerBuilder::new().build();
293 let shared = SharedServer::new(server);
294
295 let shared1 = shared.clone();
297 let shared2 = shared.clone();
298
299 let handle1 = tokio::spawn(async move { shared1.config().await });
301
302 let handle2 = tokio::spawn(async move { shared2.health().await });
303
304 let (config, health) = tokio::join!(handle1, handle2);
305 let _config = config.unwrap();
306 let _health = health.unwrap();
307
308 assert!(shared.is_available().await);
310 }
311
312 #[tokio::test]
313 async fn test_shared_server_consumption() {
314 let server = ServerBuilder::new().build();
315 let shared = SharedServer::new(server);
316 let shared_clone = shared.clone();
317
318 assert!(shared.is_available().await);
320 assert!(shared_clone.is_available().await);
321
322 let _server = shared.take_server().await.unwrap();
324
325 assert!(!shared_clone.is_available().await);
327
328 let result = shared_clone.take_server().await;
330 assert!(result.is_err());
331 }
332
333 #[tokio::test]
334 async fn test_shared_server_after_consumption() {
335 let server = ServerBuilder::new().build();
336 let shared = SharedServer::new(server);
337 let shared_clone = shared.clone();
338
339 let _server = shared.take_server().await.unwrap();
341
342 assert!(shared_clone.config().await.is_none());
344 assert!(shared_clone.registry().await.is_none());
345 assert!(shared_clone.router().await.is_none());
346 assert!(shared_clone.lifecycle().await.is_none());
347 assert!(shared_clone.metrics().await.is_none());
348 assert!(shared_clone.shutdown_handle().await.is_none());
349 assert!(shared_clone.health().await.is_none());
350 assert!(!shared_clone.is_available().await);
351 }
352}