turbomcp_server/
server.rs

1//! Core MCP server implementation
2
3use std::sync::Arc;
4use tokio::sync::RwLock;
5
6use crate::{
7    config::ServerConfig,
8    error::ServerResult,
9    handlers::{PromptHandler, ResourceHandler, ToolHandler},
10    lifecycle::{HealthStatus, ServerLifecycle},
11    metrics::ServerMetrics,
12    middleware::{KeyExtractor, MiddlewareStack, RateLimitConfig, RateLimitMiddleware},
13    registry::HandlerRegistry,
14    routing::RequestRouter,
15};
16
17use bytes::Bytes;
18use tokio::time::{Duration, sleep};
19use turbomcp_core::RequestContext;
20use turbomcp_protocol::jsonrpc::{JsonRpcMessage, JsonRpcRequest, JsonRpcResponse};
21use turbomcp_transport::StdioTransport;
22use turbomcp_transport::core::{TransportError, TransportMessageMetadata};
23use turbomcp_transport::{Transport, TransportMessage};
24
25/// Check if logging should be enabled for STDIO transport
26///
27/// For MCP STDIO transport compliance, logging is disabled by default since stdout
28/// must be reserved exclusively for JSON-RPC messages. This can be overridden by
29/// setting the TURBOMCP_FORCE_LOGGING environment variable.
30fn should_log_for_stdio() -> bool {
31    std::env::var("TURBOMCP_FORCE_LOGGING").is_ok()
32}
33
34/// Handle for triggering graceful server shutdown
35///
36/// Provides external control over server shutdown with support for:
37/// - **Signal handling**: SIGTERM, SIGINT, custom signals
38/// - **Container orchestration**: Kubernetes graceful termination
39/// - **Health checks**: Coordinated shutdown with load balancers  
40/// - **Multi-service coordination**: Synchronized shutdown sequences
41/// - **Testing**: Controlled server lifecycle in tests
42///
43/// The handle is cloneable and thread-safe, allowing multiple components
44/// to coordinate shutdown or check shutdown status.
45#[derive(Debug, Clone)]
46pub struct ShutdownHandle {
47    lifecycle: Arc<ServerLifecycle>,
48}
49
50impl ShutdownHandle {
51    /// Trigger graceful server shutdown
52    pub async fn shutdown(&self) {
53        self.lifecycle.shutdown().await;
54    }
55
56    /// Check if shutdown has been initiated
57    pub async fn is_shutting_down(&self) -> bool {
58        use crate::lifecycle::ServerState;
59        matches!(
60            self.lifecycle.state().await,
61            ServerState::ShuttingDown | ServerState::Stopped
62        )
63    }
64}
65
66/// Main MCP server
67pub struct McpServer {
68    /// Server configuration
69    pub(crate) config: ServerConfig,
70    /// Handler registry
71    pub(crate) registry: Arc<HandlerRegistry>,
72    /// Request router
73    pub(crate) router: Arc<RequestRouter>,
74    /// Middleware stack
75    #[allow(dead_code)]
76    pub(crate) middleware: Arc<RwLock<MiddlewareStack>>,
77    /// Server lifecycle
78    pub(crate) lifecycle: Arc<ServerLifecycle>,
79    /// Server metrics
80    pub(crate) metrics: Arc<ServerMetrics>,
81}
82
83impl std::fmt::Debug for McpServer {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct("McpServer")
86            .field("config", &self.config)
87            .finish()
88    }
89}
90
91impl McpServer {
92    /// Create a new server
93    #[must_use]
94    pub fn new(config: ServerConfig) -> Self {
95        let registry = Arc::new(HandlerRegistry::new());
96        let router = Arc::new(RequestRouter::new(Arc::clone(&registry)));
97        let mut stack = MiddlewareStack::new();
98        // Auto-install rate limiting if enabled in config
99        if config.rate_limiting.enabled {
100            #[cfg(test)]
101            let rate_middleware = RateLimitMiddleware::new_for_testing(RateLimitConfig {
102                requests_per_second: config.rate_limiting.requests_per_second,
103                burst_capacity: config.rate_limiting.burst_capacity,
104                key_extractor: KeyExtractor::Global,
105            });
106
107            #[cfg(not(test))]
108            let rate_middleware = RateLimitMiddleware::new(RateLimitConfig {
109                requests_per_second: config.rate_limiting.requests_per_second,
110                burst_capacity: config.rate_limiting.burst_capacity,
111                key_extractor: KeyExtractor::Global,
112            });
113
114            stack.add(rate_middleware);
115        }
116        let middleware = Arc::new(RwLock::new(stack));
117        let lifecycle = Arc::new(ServerLifecycle::new());
118        let metrics = Arc::new(ServerMetrics::new());
119
120        Self {
121            config,
122            registry,
123            router,
124            middleware,
125            lifecycle,
126            metrics,
127        }
128    }
129
130    /// Get server configuration
131    #[must_use]
132    pub const fn config(&self) -> &ServerConfig {
133        &self.config
134    }
135
136    /// Get handler registry
137    #[must_use]
138    pub const fn registry(&self) -> &Arc<HandlerRegistry> {
139        &self.registry
140    }
141
142    /// Get request router
143    #[must_use]
144    pub const fn router(&self) -> &Arc<RequestRouter> {
145        &self.router
146    }
147
148    /// Get server lifecycle
149    #[must_use]
150    pub const fn lifecycle(&self) -> &Arc<ServerLifecycle> {
151        &self.lifecycle
152    }
153
154    /// Get server metrics
155    #[must_use]
156    pub const fn metrics(&self) -> &Arc<ServerMetrics> {
157        &self.metrics
158    }
159
160    /// Get a shutdown handle for graceful server termination
161    ///
162    /// This handle enables external control over server shutdown, essential for:
163    /// - **Production deployments**: Graceful shutdown on SIGTERM/SIGINT
164    /// - **Container orchestration**: Kubernetes graceful pod termination
165    /// - **Load balancer integration**: Health check coordination
166    /// - **Multi-component systems**: Coordinated shutdown sequences
167    /// - **Maintenance operations**: Planned downtime and updates
168    ///
169    /// # Examples
170    ///
171    /// ## Basic shutdown coordination
172    /// ```no_run
173    /// # use turbomcp_server::ServerBuilder;
174    /// # #[tokio::main]
175    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
176    /// let server = ServerBuilder::new().build();
177    /// let shutdown_handle = server.shutdown_handle();
178    ///
179    /// // Coordinate with other services
180    /// tokio::spawn(async move {
181    ///     // Wait for external shutdown signal
182    ///     tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
183    ///     println!("Shutdown signal received, terminating gracefully...");
184    ///     shutdown_handle.shutdown().await;
185    /// });
186    ///
187    /// // Server will gracefully shut down when signaled
188    /// // server.run_stdio().await?;
189    /// # Ok(())
190    /// # }
191    /// ```
192    ///
193    /// ## Container/Kubernetes deployment
194    /// ```no_run
195    /// # use turbomcp_server::ServerBuilder;
196    /// # use std::sync::Arc;
197    /// # #[tokio::main]
198    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
199    /// let server = ServerBuilder::new().build();
200    /// let shutdown_handle = server.shutdown_handle();
201    /// let shutdown_handle_clone = shutdown_handle.clone();
202    ///
203    /// // Handle multiple signal types with proper platform support
204    /// tokio::spawn(async move {
205    ///     #[cfg(unix)]
206    ///     {
207    ///         use tokio::signal::unix::{signal, SignalKind};
208    ///         let mut sigterm = signal(SignalKind::terminate()).unwrap();
209    ///         tokio::select! {
210    ///             _ = tokio::signal::ctrl_c() => {
211    ///                 println!("SIGINT received");
212    ///             }
213    ///             _ = sigterm.recv() => {
214    ///                 println!("SIGTERM received");
215    ///             }
216    ///         }
217    ///     }
218    ///     #[cfg(not(unix))]
219    ///     {
220    ///         tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C handler");
221    ///         println!("SIGINT received");
222    ///     }
223    ///     shutdown_handle_clone.shutdown().await;
224    /// });
225    ///
226    /// // Server handles graceful shutdown automatically
227    /// // server.run_tcp("0.0.0.0:8080").await?;
228    /// # Ok(())
229    /// # }
230    /// ```
231    pub fn shutdown_handle(&self) -> ShutdownHandle {
232        ShutdownHandle {
233            lifecycle: self.lifecycle.clone(),
234        }
235    }
236
237    /// Run the server with STDIO transport
238    pub async fn run_stdio(self) -> ServerResult<()> {
239        // For STDIO transport, disable logging unless explicitly overridden
240        // STDIO stdout must be reserved exclusively for JSON-RPC messages per MCP protocol
241        if should_log_for_stdio() {
242            tracing::info!("Starting MCP server with STDIO transport");
243        }
244        self.lifecycle.start().await;
245
246        // Initialize STDIO transport
247        let mut transport = StdioTransport::new();
248        if let Err(e) = transport.connect().await {
249            if should_log_for_stdio() {
250                tracing::error!(error = %e, "Failed to connect stdio transport");
251            } else {
252                // Critical errors can go to stderr for debugging
253                eprintln!("TurboMCP STDIO transport failed to connect: {}", e);
254            }
255            self.lifecycle.shutdown().await;
256            return Err(e.into());
257        }
258
259        self.run_with_transport_stdio_aware(transport).await
260    }
261
262    /// Get health status
263    pub async fn health(&self) -> HealthStatus {
264        self.lifecycle.health().await
265    }
266
267    /// Run server with HTTP transport - Simple HTTP/JSON-RPC server
268    /// Run server with HTTP transport
269    ///
270    /// This provides a working HTTP server with:
271    /// - Standard HTTP POST for request/response at `/mcp`
272    /// - Full MCP protocol compliance
273    /// - Graceful shutdown support
274    ///
275    /// Note: WebSocket and SSE support temporarily disabled due to DashMap lifetime
276    /// variance issues that require architectural changes to the handler registry.
277    #[cfg(feature = "http")]
278    pub async fn run_http<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
279        self,
280        _addr: A,
281    ) -> ServerResult<()> {
282        // HTTP support is now provided via compile-time routing in the macro-generated code
283        // This avoids all DashMap lifetime issues and provides maximum performance
284        Err(crate::ServerError::configuration(
285            "Direct HTTP support has been replaced with compile-time routing. \
286             Use the #[server] macro which generates into_router() and run_http_direct() methods \
287             with zero lifetime issues and maximum performance.",
288        ))
289    }
290
291    /// Run server with WebSocket transport (progressive enhancement - runtime configuration)
292    /// Note: WebSocket transport in this library is primarily client-oriented
293    /// For production WebSocket servers, consider using the ServerBuilder with WebSocket middleware
294    #[cfg(feature = "websocket")]
295    pub async fn run_websocket<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
296        self,
297        addr: A,
298    ) -> ServerResult<()> {
299        tracing::info!(
300            ?addr,
301            "WebSocket transport server mode not implemented - WebSocket transport is client-oriented"
302        );
303        tracing::info!(
304            "Consider using ServerBuilder with WebSocket middleware for WebSocket server functionality"
305        );
306        Err(crate::ServerError::configuration(
307            "WebSocket server transport not supported - use ServerBuilder with middleware",
308        ))
309    }
310
311    /// Run server with TCP transport (progressive enhancement - runtime configuration)
312    #[cfg(feature = "tcp")]
313    pub async fn run_tcp<A: std::net::ToSocketAddrs + Send + std::fmt::Debug>(
314        self,
315        addr: A,
316    ) -> ServerResult<()> {
317        use turbomcp_transport::TcpTransport;
318
319        tracing::info!(?addr, "Starting MCP server with TCP transport");
320        self.lifecycle.start().await;
321
322        // Convert ToSocketAddrs to SocketAddr
323        let socket_addr = match addr.to_socket_addrs() {
324            Ok(mut addrs) => match addrs.next() {
325                Some(addr) => addr,
326                None => {
327                    tracing::error!("No socket address resolved from provided address");
328                    self.lifecycle.shutdown().await;
329                    return Err(crate::ServerError::configuration("Invalid socket address"));
330                }
331            },
332            Err(e) => {
333                tracing::error!(error = %e, "Failed to resolve socket address");
334                self.lifecycle.shutdown().await;
335                return Err(crate::ServerError::configuration(format!(
336                    "Address resolution failed: {e}"
337                )));
338            }
339        };
340
341        let mut transport = TcpTransport::new_server(socket_addr);
342        if let Err(e) = transport.connect().await {
343            tracing::error!(error = %e, "Failed to connect TCP transport");
344            self.lifecycle.shutdown().await;
345            return Err(e.into());
346        }
347
348        self.run_with_transport(transport).await
349    }
350
351    /// Run server with Unix socket transport (progressive enhancement - runtime configuration)
352    #[cfg(all(feature = "unix", unix))]
353    pub async fn run_unix<P: AsRef<std::path::Path>>(self, path: P) -> ServerResult<()> {
354        use std::path::PathBuf;
355        use turbomcp_transport::UnixTransport;
356
357        tracing::info!(path = ?path.as_ref(), "Starting MCP server with Unix socket transport");
358        self.lifecycle.start().await;
359
360        let socket_path = PathBuf::from(path.as_ref());
361        let mut transport = UnixTransport::new_server(socket_path);
362        if let Err(e) = transport.connect().await {
363            tracing::error!(error = %e, "Failed to connect Unix socket transport");
364            self.lifecycle.shutdown().await;
365            return Err(e.into());
366        }
367
368        self.run_with_transport(transport).await
369    }
370
371    /// Generic transport runner (DRY principle)
372    async fn run_with_transport<T: Transport>(&self, mut transport: T) -> ServerResult<()> {
373        // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
374        let lifecycle_for_sigint = self.lifecycle.clone();
375        tokio::spawn(async move {
376            if let Err(e) = tokio::signal::ctrl_c().await {
377                tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
378                return;
379            }
380            tracing::info!("Ctrl+C received, initiating shutdown");
381            lifecycle_for_sigint.shutdown().await;
382        });
383
384        #[cfg(unix)]
385        {
386            let lifecycle_for_sigterm = self.lifecycle.clone();
387            tokio::spawn(async move {
388                use tokio::signal::unix::{SignalKind, signal};
389                match signal(SignalKind::terminate()) {
390                    Ok(mut sigterm) => {
391                        sigterm.recv().await;
392                        tracing::info!("SIGTERM received, initiating shutdown");
393                        lifecycle_for_sigterm.shutdown().await;
394                    }
395                    Err(e) => tracing::warn!(error = %e, "Failed to install SIGTERM handler"),
396                }
397            });
398        }
399
400        // Shutdown signal
401        let mut shutdown = self.lifecycle.shutdown_signal();
402
403        // Main message processing loop
404        loop {
405            tokio::select! {
406                _ = shutdown.recv() => {
407                    tracing::info!("Shutdown signal received");
408                    break;
409                }
410                res = transport.receive() => {
411                    match res {
412                        Ok(Some(message)) => {
413                            if let Err(e) = self.handle_transport_message(&mut transport, message).await {
414                                tracing::warn!(error = %e, "Failed to handle transport message");
415                            }
416                        }
417                        Ok(None) => {
418                            // No message available; sleep briefly to avoid busy loop
419                            sleep(Duration::from_millis(5)).await;
420                        }
421                        Err(e) => {
422                            match e {
423                                TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
424                                    tracing::info!("Transport receive channel disconnected; shutting down");
425                                    break;
426                                }
427                                _ => {
428                                    tracing::error!(error = %e, "Transport receive failed");
429                                    // Backoff on errors
430                                    sleep(Duration::from_millis(50)).await;
431                                }
432                            }
433                        }
434                    }
435                }
436            }
437        }
438
439        // Disconnect transport
440        if let Err(e) = transport.disconnect().await {
441            tracing::warn!(error = %e, "Error while disconnecting transport");
442        }
443
444        tracing::info!("Server shutdown complete");
445        Ok(())
446    }
447
448    /// STDIO-aware transport runner that respects MCP protocol logging requirements
449    async fn run_with_transport_stdio_aware<T: Transport>(
450        &self,
451        mut transport: T,
452    ) -> ServerResult<()> {
453        // Install signal handlers for graceful shutdown (Ctrl+C / SIGTERM)
454        let lifecycle_for_sigint = self.lifecycle.clone();
455        tokio::spawn(async move {
456            if let Err(e) = tokio::signal::ctrl_c().await {
457                if should_log_for_stdio() {
458                    tracing::warn!(error = %e, "Failed to install Ctrl+C handler");
459                }
460                return;
461            }
462            if should_log_for_stdio() {
463                tracing::info!("Ctrl+C received, initiating shutdown");
464            }
465            lifecycle_for_sigint.shutdown().await;
466        });
467
468        #[cfg(unix)]
469        {
470            let lifecycle_for_sigterm = self.lifecycle.clone();
471            tokio::spawn(async move {
472                use tokio::signal::unix::{SignalKind, signal};
473                match signal(SignalKind::terminate()) {
474                    Ok(mut sigterm) => {
475                        sigterm.recv().await;
476                        if should_log_for_stdio() {
477                            tracing::info!("SIGTERM received, initiating shutdown");
478                        }
479                        lifecycle_for_sigterm.shutdown().await;
480                    }
481                    Err(e) => {
482                        if should_log_for_stdio() {
483                            tracing::warn!(error = %e, "Failed to install SIGTERM handler");
484                        }
485                    }
486                }
487            });
488        }
489
490        // Shutdown signal
491        let mut shutdown = self.lifecycle.shutdown_signal();
492
493        // Main message processing loop
494        loop {
495            tokio::select! {
496                _ = shutdown.recv() => {
497                    if should_log_for_stdio() {
498                        tracing::info!("Shutdown signal received");
499                    }
500                    break;
501                }
502                res = transport.receive() => {
503                    match res {
504                        Ok(Some(message)) => {
505                            if let Err(e) = self.handle_transport_message_stdio_aware(&mut transport, message).await
506                                && should_log_for_stdio() {
507                                    tracing::warn!(error = %e, "Failed to handle transport message");
508                                }
509                        }
510                        Ok(None) => {
511                            // No message available; sleep briefly to avoid busy loop
512                            sleep(Duration::from_millis(5)).await;
513                        }
514                        Err(e) => {
515                            match e {
516                                TransportError::ReceiveFailed(msg) if msg.contains("disconnected") => {
517                                    if should_log_for_stdio() {
518                                        tracing::info!("Transport receive channel disconnected; shutting down");
519                                    }
520                                    break;
521                                }
522                                _ => {
523                                    if should_log_for_stdio() {
524                                        tracing::error!(error = %e, "Transport receive failed");
525                                    }
526                                    // Backoff on errors
527                                    sleep(Duration::from_millis(50)).await;
528                                }
529                            }
530                        }
531                    }
532                }
533            }
534        }
535
536        // Disconnect transport
537        if let Err(e) = transport.disconnect().await
538            && should_log_for_stdio()
539        {
540            tracing::warn!(error = %e, "Error while disconnecting transport");
541        }
542
543        if should_log_for_stdio() {
544            tracing::info!("Server shutdown complete");
545        }
546        Ok(())
547    }
548}
549
550impl McpServer {
551    async fn handle_transport_message(
552        &self,
553        transport: &mut dyn Transport,
554        message: TransportMessage,
555    ) -> ServerResult<()> {
556        // Convert bytes to str
557        let json_str = match std::str::from_utf8(&message.payload) {
558            Ok(s) => s,
559            Err(e) => {
560                tracing::warn!(error = %e, "Invalid UTF-8 in incoming message");
561                return Ok(());
562            }
563        };
564
565        // Parse JSON-RPC
566        let parsed = serde_json::from_str::<JsonRpcMessage>(json_str);
567        let response_json = match parsed {
568            Ok(JsonRpcMessage::Request(req)) => {
569                let ctx = RequestContext::new().with_metadata("transport", "stdio");
570                // Process through middleware stack before routing
571                let (req, ctx) = match self.middleware.read().await.process_request(req, ctx).await
572                {
573                    Ok(tuple) => tuple,
574                    Err(e) => {
575                        // Convert middleware error to JSON-RPC error response
576                        let error = turbomcp_protocol::jsonrpc::JsonRpcError {
577                            code: e.error_code(),
578                            message: e.to_string(),
579                            data: None,
580                        };
581                        let response = turbomcp_protocol::jsonrpc::JsonRpcResponse {
582                            jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
583                            id: turbomcp_protocol::jsonrpc::ResponseId::null(),
584                            payload: turbomcp_protocol::jsonrpc::JsonRpcResponsePayload::Error {
585                                error,
586                            },
587                        };
588                        let reply = TransportMessage::with_metadata(
589                            message.id,
590                            Bytes::from(
591                                serde_json::to_string(&response)
592                                    .unwrap_or_else(|_| "{}".to_string()),
593                            ),
594                            TransportMessageMetadata::with_content_type("application/json"),
595                        );
596                        let _ = transport.send(reply).await;
597                        return Ok(());
598                    }
599                };
600                // Process request through middleware
601                let (processed_req, updated_ctx) = match self
602                    .middleware
603                    .read()
604                    .await
605                    .process_request(req, ctx.clone())
606                    .await
607                {
608                    Ok(r) => r,
609                    Err(e) => {
610                        // Return error response for middleware rejection
611                        let error_response = turbomcp_protocol::jsonrpc::JsonRpcResponse {
612                            jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
613                            id: turbomcp_protocol::jsonrpc::ResponseId::null(),
614                            payload: turbomcp_protocol::jsonrpc::JsonRpcResponsePayload::Error {
615                                error: turbomcp_protocol::jsonrpc::JsonRpcError {
616                                    code: -32603,
617                                    message: format!("Middleware error: {e}"),
618                                    data: None,
619                                },
620                            },
621                        };
622                        let mut reply = TransportMessage::new(
623                            turbomcp_core::MessageId::from("error"),
624                            Bytes::from(
625                                serde_json::to_string(&error_response)
626                                    .unwrap_or_else(|_| "{}".to_string()),
627                            ),
628                        );
629                        reply.metadata =
630                            TransportMessageMetadata::with_content_type("application/json");
631                        let _ = transport.send(reply).await;
632                        return Ok(());
633                    }
634                };
635
636                let mut resp: JsonRpcResponse =
637                    self.router.route(processed_req, updated_ctx.clone()).await;
638                // Process response through middleware
639                resp = match self
640                    .middleware
641                    .read()
642                    .await
643                    .process_response(resp, &updated_ctx)
644                    .await
645                {
646                    Ok(r) => r,
647                    Err(e) => turbomcp_protocol::jsonrpc::JsonRpcResponse {
648                        jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
649                        id: turbomcp_protocol::jsonrpc::ResponseId::null(),
650                        payload: turbomcp_protocol::jsonrpc::JsonRpcResponsePayload::Error {
651                            error: turbomcp_protocol::jsonrpc::JsonRpcError {
652                                code: e.error_code(),
653                                message: e.to_string(),
654                                data: None,
655                            },
656                        },
657                    },
658                };
659
660                serde_json::to_string(&resp).ok()
661            }
662            Ok(JsonRpcMessage::RequestBatch(batch)) => {
663                // Convert batch to Vec<JsonRpcRequest>
664                let requests: Vec<JsonRpcRequest> = batch.items;
665                let ctx = RequestContext::new().with_metadata("transport", "stdio");
666                // Process each request through middleware by reusing the router’s batch processing
667                let responses = self.router.route_batch(requests, ctx).await;
668                serde_json::to_string(&responses).ok()
669            }
670            Ok(JsonRpcMessage::Notification(_note)) => {
671                // No response for notifications
672                None
673            }
674            // Ignore responses from client (server-initiated only)
675            Ok(
676                JsonRpcMessage::Response(_)
677                | JsonRpcMessage::ResponseBatch(_)
678                | JsonRpcMessage::MessageBatch(_),
679            ) => None,
680            Err(e) => {
681                tracing::warn!(error = %e, "Failed to parse JSON-RPC message");
682                // Return proper JSON-RPC parse error response (RFC compliant)
683                let error_response = turbomcp_protocol::jsonrpc::JsonRpcResponse::parse_error(
684                    Some(format!("Invalid JSON-RPC: {}", e)),
685                );
686                serde_json::to_string(&error_response).ok()
687            }
688        };
689
690        if let Some(resp_str) = response_json {
691            let reply = TransportMessage::with_metadata(
692                message.id,
693                Bytes::from(resp_str),
694                TransportMessageMetadata::with_content_type("application/json"),
695            );
696            if let Err(e) = transport.send(reply).await {
697                tracing::warn!(error = %e, "Failed to send response over transport");
698            }
699        }
700
701        Ok(())
702    }
703
704    /// STDIO-aware message handler that respects MCP protocol logging requirements
705    async fn handle_transport_message_stdio_aware(
706        &self,
707        transport: &mut dyn Transport,
708        message: TransportMessage,
709    ) -> ServerResult<()> {
710        // Convert bytes to str
711        let json_str = match std::str::from_utf8(&message.payload) {
712            Ok(s) => s,
713            Err(e) => {
714                if should_log_for_stdio() {
715                    tracing::warn!(error = %e, "Invalid UTF-8 in incoming message");
716                }
717                return Ok(());
718            }
719        };
720
721        // Parse JSON-RPC
722        let parsed = serde_json::from_str::<JsonRpcMessage>(json_str);
723        let response_json = match parsed {
724            Ok(JsonRpcMessage::Request(req)) => {
725                let ctx = RequestContext::new().with_metadata("transport", "stdio");
726                // Process through middleware stack before routing
727                let (req, ctx) = match self.middleware.read().await.process_request(req, ctx).await
728                {
729                    Ok(tuple) => tuple,
730                    Err(e) => {
731                        // Convert middleware error to JSON-RPC error response
732                        let error = turbomcp_protocol::jsonrpc::JsonRpcError {
733                            code: e.error_code(),
734                            message: e.to_string(),
735                            data: None,
736                        };
737                        let response = turbomcp_protocol::jsonrpc::JsonRpcResponse {
738                            jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
739                            id: turbomcp_protocol::jsonrpc::ResponseId::null(),
740                            payload: turbomcp_protocol::jsonrpc::JsonRpcResponsePayload::Error {
741                                error,
742                            },
743                        };
744                        let reply = TransportMessage::with_metadata(
745                            message.id,
746                            Bytes::from(
747                                serde_json::to_string(&response)
748                                    .unwrap_or_else(|_| "{}".to_string()),
749                            ),
750                            TransportMessageMetadata::with_content_type("application/json"),
751                        );
752                        let _ = transport.send(reply).await;
753                        return Ok(());
754                    }
755                };
756                // Process request through middleware
757                let (processed_req, updated_ctx) = match self
758                    .middleware
759                    .read()
760                    .await
761                    .process_request(req, ctx.clone())
762                    .await
763                {
764                    Ok(r) => r,
765                    Err(e) => {
766                        // Return error response for middleware rejection
767                        let error_response = turbomcp_protocol::jsonrpc::JsonRpcResponse {
768                            jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
769                            id: turbomcp_protocol::jsonrpc::ResponseId::null(),
770                            payload: turbomcp_protocol::jsonrpc::JsonRpcResponsePayload::Error {
771                                error: turbomcp_protocol::jsonrpc::JsonRpcError {
772                                    code: -32603,
773                                    message: format!("Middleware error: {e}"),
774                                    data: None,
775                                },
776                            },
777                        };
778                        let mut reply = TransportMessage::new(
779                            turbomcp_core::MessageId::from("error"),
780                            Bytes::from(
781                                serde_json::to_string(&error_response)
782                                    .unwrap_or_else(|_| "{}".to_string()),
783                            ),
784                        );
785                        reply.metadata =
786                            TransportMessageMetadata::with_content_type("application/json");
787                        let _ = transport.send(reply).await;
788                        return Ok(());
789                    }
790                };
791
792                let mut resp: JsonRpcResponse =
793                    self.router.route(processed_req, updated_ctx.clone()).await;
794                // Process response through middleware
795                resp = match self
796                    .middleware
797                    .read()
798                    .await
799                    .process_response(resp, &updated_ctx)
800                    .await
801                {
802                    Ok(r) => r,
803                    Err(e) => turbomcp_protocol::jsonrpc::JsonRpcResponse {
804                        jsonrpc: turbomcp_protocol::jsonrpc::JsonRpcVersion,
805                        id: turbomcp_protocol::jsonrpc::ResponseId::null(),
806                        payload: turbomcp_protocol::jsonrpc::JsonRpcResponsePayload::Error {
807                            error: turbomcp_protocol::jsonrpc::JsonRpcError {
808                                code: e.error_code(),
809                                message: e.to_string(),
810                                data: None,
811                            },
812                        },
813                    },
814                };
815
816                serde_json::to_string(&resp).ok()
817            }
818            Ok(JsonRpcMessage::RequestBatch(batch)) => {
819                // Convert batch to Vec<JsonRpcRequest>
820                let requests: Vec<JsonRpcRequest> = batch.items;
821                let ctx = RequestContext::new().with_metadata("transport", "stdio");
822                // Process each request through middleware by reusing the router's batch processing
823                let responses = self.router.route_batch(requests, ctx).await;
824                serde_json::to_string(&responses).ok()
825            }
826            Ok(JsonRpcMessage::Notification(_note)) => {
827                // No response for notifications
828                None
829            }
830            // Ignore responses from client (server-initiated only)
831            Ok(
832                JsonRpcMessage::Response(_)
833                | JsonRpcMessage::ResponseBatch(_)
834                | JsonRpcMessage::MessageBatch(_),
835            ) => None,
836            Err(e) => {
837                if should_log_for_stdio() {
838                    tracing::warn!(error = %e, "Failed to parse JSON-RPC message");
839                }
840                // Return proper JSON-RPC parse error response (RFC compliant)
841                let error_response = turbomcp_protocol::jsonrpc::JsonRpcResponse::parse_error(
842                    Some(format!("Invalid JSON-RPC: {}", e)),
843                );
844                serde_json::to_string(&error_response).ok()
845            }
846        };
847
848        if let Some(resp_str) = response_json {
849            let reply = TransportMessage::with_metadata(
850                message.id,
851                Bytes::from(resp_str),
852                TransportMessageMetadata::with_content_type("application/json"),
853            );
854            if let Err(e) = transport.send(reply).await
855                && should_log_for_stdio()
856            {
857                tracing::warn!(error = %e, "Failed to send response over transport");
858            }
859        }
860
861        Ok(())
862    }
863}
864
865/// Server builder for convenient server construction
866pub struct ServerBuilder {
867    /// Server configuration
868    config: ServerConfig,
869    /// Registry builder
870    registry: HandlerRegistry,
871}
872
873impl std::fmt::Debug for ServerBuilder {
874    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
875        f.debug_struct("ServerBuilder")
876            .field("config", &self.config)
877            .finish()
878    }
879}
880
881impl ServerBuilder {
882    /// Create a new server builder
883    #[must_use]
884    pub fn new() -> Self {
885        Self {
886            config: ServerConfig::default(),
887            registry: HandlerRegistry::new(),
888        }
889    }
890
891    /// Set server name
892    pub fn name(mut self, name: impl Into<String>) -> Self {
893        self.config.name = name.into();
894        self
895    }
896
897    /// Set server version
898    pub fn version(mut self, version: impl Into<String>) -> Self {
899        self.config.version = version.into();
900        self
901    }
902
903    /// Set server description
904    pub fn description(mut self, description: impl Into<String>) -> Self {
905        self.config.description = Some(description.into());
906        self
907    }
908
909    /// Add a tool handler
910    pub fn tool<T>(self, name: impl Into<String>, handler: T) -> ServerResult<Self>
911    where
912        T: ToolHandler + 'static,
913    {
914        self.registry.register_tool(name, handler)?;
915        Ok(self)
916    }
917
918    /// Add a prompt handler
919    pub fn prompt<P>(self, name: impl Into<String>, handler: P) -> ServerResult<Self>
920    where
921        P: PromptHandler + 'static,
922    {
923        self.registry.register_prompt(name, handler)?;
924        Ok(self)
925    }
926
927    /// Add a resource handler
928    pub fn resource<R>(self, name: impl Into<String>, handler: R) -> ServerResult<Self>
929    where
930        R: ResourceHandler + 'static,
931    {
932        self.registry.register_resource(name, handler)?;
933        Ok(self)
934    }
935
936    /// Add a filesystem root
937    pub fn root(self, uri: impl Into<String>, name: Option<String>) -> Self {
938        use turbomcp_protocol::types::Root;
939        self.registry.add_root(Root {
940            uri: uri.into(),
941            name,
942        });
943        self
944    }
945
946    /// Set multiple filesystem roots
947    pub fn roots(self, roots: Vec<turbomcp_protocol::types::Root>) -> Self {
948        self.registry.set_roots(roots);
949        self
950    }
951
952    /// Build the server
953    #[must_use]
954    pub fn build(self) -> McpServer {
955        let mut server = McpServer::new(self.config);
956        server.registry = Arc::new(self.registry);
957        server.router = Arc::new(RequestRouter::new(Arc::clone(&server.registry)));
958        server
959    }
960}
961
962impl Default for ServerBuilder {
963    fn default() -> Self {
964        Self::new()
965    }
966}
967
968#[cfg(test)]
969mod tests {
970    use serde_json::Value;
971
972    /// Test that invalid JSON-RPC parsing creates proper error responses
973    /// This validates the fix for the hanging vulnerability
974    #[test]
975    fn test_jsonrpc_parse_error_response() {
976        // Test the exact vulnerable pattern that was fixed
977        let invalid_json = r#"{"id": 1, "method": "tools/list"}"#; // Missing jsonrpc version
978
979        // Parse as we do in the server
980        let parse_result =
981            serde_json::from_str::<turbomcp_protocol::jsonrpc::JsonRpcMessage>(invalid_json);
982
983        // Should fail to parse (this is expected)
984        assert!(
985            parse_result.is_err(),
986            "Invalid JSON-RPC should fail to parse"
987        );
988
989        // Verify the error response format that the server now creates
990        let error_response = turbomcp_protocol::jsonrpc::JsonRpcResponse::parse_error(None);
991
992        // Verify it serializes to valid JSON
993        let serialized = serde_json::to_string(&error_response);
994        assert!(
995            serialized.is_ok(),
996            "Error response should serialize correctly"
997        );
998
999        let response_json = serialized.unwrap();
1000        let parsed_response: Value = serde_json::from_str(&response_json).unwrap();
1001
1002        // Verify response structure
1003        assert_eq!(parsed_response["jsonrpc"], "2.0");
1004        assert_eq!(parsed_response["error"]["code"], -32700);
1005        assert_eq!(parsed_response["error"]["message"], "Parse error");
1006        assert!(parsed_response["error"]["data"].is_null());
1007    }
1008}