turbomcp_server/
shared.rs

1//! Shared server wrappers for concurrent access
2//!
3//! This module provides thread-safe wrappers around McpServer instances that enable
4//! concurrent access across multiple async tasks without exposing Arc/Mutex complexity.
5
6use std::sync::Arc;
7use tokio::sync::Mutex;
8
9use crate::{
10    config::ServerConfig,
11    error::ServerResult,
12    lifecycle::{HealthStatus, ServerLifecycle},
13    metrics::ServerMetrics,
14    registry::HandlerRegistry,
15    routing::RequestRouter,
16    server::{McpServer, ShutdownHandle},
17};
18
19/// Thread-safe wrapper for sharing McpServer instances across async tasks
20///
21/// This wrapper encapsulates Arc/Mutex complexity and provides a clean API
22/// for concurrent access to server functionality. It addresses the limitations
23/// where server run methods consume `self` but configuration and monitoring
24/// need to be shared across multiple async tasks.
25///
26/// # Design Rationale
27///
28/// McpServer run methods consume `self` because:
29/// - They take ownership of the server to run the main event loop
30/// - Transport binding requires exclusive access
31/// - Graceful shutdown needs to control the entire server lifecycle
32///
33/// However, other operations like health checks, metrics, and configuration
34/// access need to be shared across multiple tasks for monitoring and management.
35///
36/// # Examples
37///
38/// ```rust,no_run
39/// use turbomcp_server::{McpServer, SharedServer, ServerConfig};
40///
41/// # async fn example() -> turbomcp_server::error::ServerResult<()> {
42/// let config = ServerConfig::default();
43/// let server = McpServer::new(config);
44/// let shared = SharedServer::new(server);
45///
46/// // Clone for sharing across tasks
47/// let shared1 = shared.clone();
48/// let shared2 = shared.clone();
49///
50/// // Both tasks can access server state concurrently
51/// let handle1 = tokio::spawn(async move {
52///     shared1.health().await
53/// });
54///
55/// let handle2 = tokio::spawn(async move {
56///     let _handle = shared2.shutdown_handle();
57///     "shutdown_ready"
58/// });
59///
60/// let (health, _shutdown_ready) = tokio::try_join!(handle1, handle2).unwrap();
61///
62/// // Run the server (consumes the shared server)
63/// // shared.run_stdio().await?;
64/// # Ok(())
65/// # }
66/// ```
67pub struct SharedServer {
68    inner: Arc<Mutex<Option<McpServer>>>,
69}
70
71impl SharedServer {
72    /// Create a new shared server wrapper
73    ///
74    /// Takes ownership of a McpServer and wraps it for thread-safe sharing.
75    /// The original server can no longer be accessed directly after this call.
76    pub fn new(server: McpServer) -> Self {
77        Self {
78            inner: Arc::new(Mutex::new(Some(server))),
79        }
80    }
81
82    /// Get server configuration
83    ///
84    /// Returns a clone of the server configuration.
85    pub async fn config(&self) -> Option<ServerConfig> {
86        self.inner.lock().await.as_ref().map(|s| s.config().clone())
87    }
88
89    /// Get handler registry
90    ///
91    /// Returns a clone of the Arc to the handler registry.
92    pub async fn registry(&self) -> Option<Arc<HandlerRegistry>> {
93        self.inner
94            .lock()
95            .await
96            .as_ref()
97            .map(|s| s.registry().clone())
98    }
99
100    /// Get request router
101    ///
102    /// Returns a clone of the Arc to the request router.
103    pub async fn router(&self) -> Option<Arc<RequestRouter>> {
104        self.inner.lock().await.as_ref().map(|s| s.router().clone())
105    }
106
107    /// Get server lifecycle
108    ///
109    /// Returns a clone of the Arc to the server lifecycle.
110    pub async fn lifecycle(&self) -> Option<Arc<ServerLifecycle>> {
111        self.inner
112            .lock()
113            .await
114            .as_ref()
115            .map(|s| s.lifecycle().clone())
116    }
117
118    /// Get server metrics
119    ///
120    /// Returns a clone of the Arc to the server metrics.
121    pub async fn metrics(&self) -> Option<Arc<ServerMetrics>> {
122        self.inner
123            .lock()
124            .await
125            .as_ref()
126            .map(|s| s.metrics().clone())
127    }
128
129    /// Get a shutdown handle for graceful server termination
130    ///
131    /// Returns a shutdown handle that can be used to gracefully terminate
132    /// the server from external tasks.
133    pub async fn shutdown_handle(&self) -> Option<ShutdownHandle> {
134        self.inner
135            .lock()
136            .await
137            .as_ref()
138            .map(|s| s.shutdown_handle())
139    }
140
141    /// Get health status
142    ///
143    /// Returns the current health status of the server.
144    pub async fn health(&self) -> Option<HealthStatus> {
145        match self.inner.lock().await.as_ref() {
146            Some(server) => Some(server.health().await),
147            None => None,
148        }
149    }
150
151    /// Run the server with STDIO transport
152    ///
153    /// This consumes the SharedServer and extracts the inner server to run it.
154    /// After calling this method, the SharedServer can no longer be used.
155    pub async fn run_stdio(self) -> ServerResult<()> {
156        let server = self.take_server().await?;
157        server.run_stdio().await
158    }
159
160    /// Run server with TCP transport
161    #[cfg(feature = "tcp")]
162    pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
163        self,
164        addr: A,
165    ) -> ServerResult<()> {
166        let server = self.take_server().await?;
167        server.run_tcp(addr).await
168    }
169
170    /// Run server with Unix socket transport
171    #[cfg(all(feature = "unix", unix))]
172    pub async fn run_unix<P: AsRef<std::path::Path>>(self, path: P) -> ServerResult<()> {
173        let server = self.take_server().await?;
174        server.run_unix(path).await
175    }
176
177    /// Extract the inner server for running
178    ///
179    /// This is a helper method that takes the server out of the Option,
180    /// making the SharedServer unusable afterwards.
181    async fn take_server(self) -> ServerResult<McpServer> {
182        let mut guard = self.inner.lock().await;
183        guard.take().ok_or_else(|| {
184            crate::ServerError::configuration("Server has already been consumed for running")
185        })
186    }
187
188    /// Check if the server is still available (hasn't been consumed)
189    pub async fn is_available(&self) -> bool {
190        self.inner.lock().await.is_some()
191    }
192}
193
194impl Clone for SharedServer {
195    /// Clone the shared server for use in multiple async tasks
196    ///
197    /// This creates a new reference to the same underlying server,
198    /// allowing multiple tasks to share access safely.
199    fn clone(&self) -> Self {
200        Self {
201            inner: Arc::clone(&self.inner),
202        }
203    }
204}
205
206impl std::fmt::Debug for SharedServer {
207    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208        f.debug_struct("SharedServer")
209            .field("inner", &"Arc<Mutex<Option<McpServer>>>")
210            .finish()
211    }
212}
213
214#[cfg(test)]
215mod tests {
216    use super::*;
217    use crate::ServerBuilder;
218
219    #[tokio::test]
220    async fn test_shared_server_creation() {
221        let server = ServerBuilder::new().build();
222        let shared = SharedServer::new(server);
223
224        // Test that we can clone the shared server
225        let _shared2 = shared.clone();
226    }
227
228    #[tokio::test]
229    async fn test_shared_server_cloning() {
230        let server = ServerBuilder::new().build();
231        let shared = SharedServer::new(server);
232
233        // Clone multiple times to test Arc behavior
234        let clones: Vec<_> = (0..10).map(|_| shared.clone()).collect();
235        assert_eq!(clones.len(), 10);
236
237        // All clones should reference the same underlying server
238        // This is verified by the fact that they can all be created without error
239    }
240
241    #[tokio::test]
242    async fn test_shared_server_api_surface() {
243        let server = ServerBuilder::new().build();
244        let shared = SharedServer::new(server);
245
246        // Test that SharedServer provides the expected API surface
247        // These calls should compile and return Some values when server is available
248
249        let _config = shared.config().await;
250        let _registry = shared.registry().await;
251        let _router = shared.router().await;
252        let _lifecycle = shared.lifecycle().await;
253        let _metrics = shared.metrics().await;
254        let _shutdown_handle = shared.shutdown_handle().await;
255        let _health = shared.health().await;
256        let _available = shared.is_available().await;
257
258        assert!(shared.is_available().await);
259    }
260
261    #[tokio::test]
262    async fn test_shared_server_type_compatibility() {
263        let server = ServerBuilder::new().build();
264        let shared = SharedServer::new(server);
265
266        // Test that the SharedServer can be used in generic contexts
267        fn takes_shared_server<T>(_server: T)
268        where
269            T: Clone + Send + Sync + 'static,
270        {
271        }
272
273        takes_shared_server(shared);
274    }
275
276    #[tokio::test]
277    async fn test_shared_server_send_sync() {
278        let server = ServerBuilder::new().build();
279        let shared = SharedServer::new(server);
280
281        // Test that SharedServer can be moved across task boundaries
282        let handle = tokio::spawn(async move {
283            let _cloned = shared.clone();
284            // SharedServer should be Send + Sync, allowing this to compile
285        });
286
287        handle.await.unwrap();
288    }
289
290    #[tokio::test]
291    async fn test_shared_server_thread_safety() {
292        let server = ServerBuilder::new().build();
293        let shared = SharedServer::new(server);
294
295        // Test that SharedServer can be shared across threads safely
296        let shared1 = shared.clone();
297        let shared2 = shared.clone();
298
299        // Verify that concurrent access doesn't corrupt state
300        let handle1 = tokio::spawn(async move { shared1.config().await });
301
302        let handle2 = tokio::spawn(async move { shared2.health().await });
303
304        let (config, health) = tokio::join!(handle1, handle2);
305        let _config = config.unwrap();
306        let _health = health.unwrap();
307
308        // Both should succeed when server is available
309        assert!(shared.is_available().await);
310    }
311
312    #[tokio::test]
313    async fn test_shared_server_consumption() {
314        let server = ServerBuilder::new().build();
315        let shared = SharedServer::new(server);
316        let shared_clone = shared.clone();
317
318        // Server should be available initially
319        assert!(shared.is_available().await);
320        assert!(shared_clone.is_available().await);
321
322        // Take the server (simulating run_stdio consumption)
323        let _server = shared.take_server().await.unwrap();
324
325        // Server should no longer be available
326        assert!(!shared_clone.is_available().await);
327
328        // Attempting to take again should fail
329        let result = shared_clone.take_server().await;
330        assert!(result.is_err());
331    }
332
333    #[tokio::test]
334    async fn test_shared_server_after_consumption() {
335        let server = ServerBuilder::new().build();
336        let shared = SharedServer::new(server);
337        let shared_clone = shared.clone();
338
339        // Consume the server
340        let _server = shared.take_server().await.unwrap();
341
342        // All methods should return None after consumption (using the clone)
343        assert!(shared_clone.config().await.is_none());
344        assert!(shared_clone.registry().await.is_none());
345        assert!(shared_clone.router().await.is_none());
346        assert!(shared_clone.lifecycle().await.is_none());
347        assert!(shared_clone.metrics().await.is_none());
348        assert!(shared_clone.shutdown_handle().await.is_none());
349        assert!(shared_clone.health().await.is_none());
350        assert!(!shared_clone.is_available().await);
351    }
352}